English

SINETStream ユーザガイド

Java API

1. 使用例
2. Java API クラス一覧
 2.1 MessageWriterFactory クラス
 2.2 MessageWriter クラス
 2.3 AsyncMessageWriter クラス
 2.4 MessageReaderFactory クラス
 2.5 MessageReader クラス
 2.6 AsyncMessageReader クラス
 2.7 Message クラス
 2.8 Metrics クラス
 2.9 例外一覧
3. メッセージングシステム固有のパラメータ
4. チートシートの表示方法

1. 使用例

はじめに簡単な使用例を示す。

この例では、異なるメッセージングシステムをバックエンドとする二つのサービス service-1service-2 を利用する。 service-1 のバックエンドは Apache Kafka で、4台のブローカー kafka-1kafka-4 で構成される。 service-2 のバックエンドは MQTT で、1台のブローカー 192.168.2.105 で構成される。

設定ファイル作成

設定ファイルは、クライアントがブローカーに接続するための設定が記述されたファイルである。 詳細は 設定ファイル を参照すること。

この例では、以下の内容の設定ファイル .sinetstream_config.yml をクライアントマシンのカレントディレクトリに作成する。

service-1:
  type: kafka
  brokers:
    - kafka-1:9092
    - kafka-2:9092
    - kafka-3:9092
    - kafka-4:9092
service-2:
  type: mqtt
  brokers: 192.168.2.105:1883
  username_pw_set:
    username: user01
    password: pass01

メッセージ送信

サービス名 service-1 に対応するメッセージングシステムのトピック topic-1 に対してメッセージを送信する例を示す。

MessageWriterFactory<String> factory =
    MessageWriterFactory.<String>builder()
        .service("service-1")
        .topic("topic-1")
        .consistency(AT_LEAST_ONCE)
        .build();

try (MessageWriter<String> writer = factory.getWriter()) {
    writer.write("Hello! This is the 1st message.");
    writer.write("Hello! This is the 2nd message.");
}

まず、パラメータ service, topic, consistency を指定してファクトリオブジェクト factory を作成する。 この factory に対して getWriter() を呼び出し、メッセージを送信するためのライターを得る。 その後、ライターの write() を呼び出してメッセージをブローカーに送信する。

メッセージ受信

サービス名 service-1 に対応するメッセージングシステムのトピック topic-1 からメッセージを受信する例を示す。

MessageReaderFactory<String> factory =
    MessageReaderFactory.<String>builder()
        .service("service-1")
        .topic("topic-1")
        .consistency(AT_LEAST_ONCE)
        .receiveTimeout(Duration.ofSeconds(60))
        .build();

try (MessageReader<String> reader = factory.getReader()) {
    Message<String> msg;
    while (Objects.nonNull(msg = reader.read())) {
        System.out.println(msg.getValue());
    }
}

まず、パラメータ service, topic, consistency, receiveTimeout を指定してファクトリオブジェクト factory を作成する。 この factory に対して getReader() を呼び出し、メッセージを受信するためのリーダーを得る。 その後、リーダーの read() を呼び出してブローカーからメッセージを受信する。 リーダーの read() を呼び出したあと、receiveTimeout に指定した時間メッセージが取得できなかった場合、read()null を返しループが終了する。

2. Java API クラス一覧

主要クラス

2.1 MessageWriterFactory クラス

MessageWriter を取得するためのファクトリクラス。

複数のパラメータを指定して MessageWriter のインスタンスを構築するためのビルダークラス MessageWriterFactoryBuilder が内部クラスとして用意されている。 ビルダークラスでは以下のパラメータを指定できる。

ビルダークラスのインスタンスを取得するには MessageWriterFactory.builder() を呼び出す。 また、ビルダーオブジェクトからファクトリオブジェクトを得るには build() を呼び出す。 以下に例を示す。

MessageWriterFactory<String> factory =
    MessageWriterFactory.<String>builder()
        .service("service-1")
        .topic("topic-1")
        .consistency(AT_LEAST_ONCE)
        .build();

2.2 MessageWriter クラス

ブローカーにメッセージを送信するクラス。

ファクトリクラスのインスタンスに対して getWriter() を呼び出すことで、ライタークラス MessageWriter のインスタンスが取得できる。 MessageWriter には AutoCloseable が実装されているので try-with-resources 文を利用できる。 メッセージを送信するメソッドwrite()は送信処理が完了するまでブロックする。

以下に例を示す。

MessageWriterFactory<String> factory = MessageWriterFactory.<String>builder()
        .service("service-1").build();

try (MessageWriter<String> writer = factory.getWriter()) {
    writer.write("message-1");
}

2.3 AsyncMessageWriter クラス

ブローカーにメッセージを送信するクラス。

ファクトリクラスのインスタンスに対して getAsyncWriter() を呼び出すことで、ライタークラス AsyncMessageWriter のインスタンスが取得できる。 AsyncMessageWriter には AutoCloseable が実装されているので try-with-resources 文を利用できる。 メッセージを送信するメソッドwrite()は非同期処理であり JDeferredPromiseオブジェクトを返す。

以下に例を示す。

MessageWriterFactory<String> factory = MessageWriterFactory.<String>builder()
        .service("service-1").build();

try (AsyncMessageWriter<String> writer = factory.getAsyncWriter()) {
    writer.write("message-1")
          .done(result -> System.out.println("write task done"))
          .fail(result -> System.out.println("write task failed"))
}

write()メソッドが返すPromiseオブジェクトのメソッド .done(), .fail()を用いることで、 送信結果(成功、失敗)に応じた処理を設定することができる。Promiseの主なメソッドを以下に示す。

2.4 MessageReaderFactory クラス

MessageReader を取得するためのファクトリクラス。

複数のパラメータを指定して MessageReader のインスタンスを構築するためのビルダークラス MessageReaderFactoryBuilder が内部クラスとして用意されている。 ビルダークラスでは以下のパラメータを指定できる。

ビルダークラスのインスタンスを取得するには MessageReaderFactory.builder() を呼び出す。 また、ビルダーオブジェクトからファクトリオブジェクトを得るには build() を呼び出す。 以下に例を示す。

MessageReaderFactory<String> factory =
    MessageReaderFactory.<String>builder()
        .service("service-1")
        .topic("topic-1")
        .consistency(AT_LEAST_ONCE)
        .build();

2.5 MessageReader クラス

ブローカーからメッセージを受信するクラス。

ファクトリクラスのインスタンスに対して getReader() を呼び出すことで、リーダークラス MessageReader のインスタンスが取得できる。 MessageReader には AutoCloseable が実装されているので try-with-resources 文を利用できる。 メッセージを受信するメソッドread()はメッセージを受信するかreceiveTimeout()に指定されているタイムアウト時間が経過するまではブロックする。

以下に例を示す。

MessageReaderFactory<String> factory = MessageReaderFactory.<String>builder()
        .service("service-1").receiveTimeout(Duration.ofSeconds(60)).build();

try (MessageReader<String> reader = factory.getReader()) {
    Message<String> msg;
    while (Objects.nonNull(msg = reader.read())) {
        System.out.println("TOPIC: " + msg.getTopic() + " MESSAGE: " + msg.getValue());
    }
}

read() メソッドの返り値は Message<T> クラスのインスタンスになる。

2.6 AsyncMessageReader クラス

ブローカーからメッセージを受信するクラス。

ファクトリクラスのインスタンスに対して getAsyncReader() を呼び出すことで、 リーダークラス AsyncMessageReader のインスタンスが取得できる。

受信したメッセージを処理するにはaddOnMessageCallback()メソッドでメッセージを受信した際に呼び出されるコールバックを設定する。 コールバックの引数によって受信したメッセージが受け渡される。

以下に例を示す。

MessageReaderFactory<String> factory = MessageReaderFactory.<String>builder()
        .service("service-1").receiveTimeout(Duration.ofSeconds(60)).build();

AsyncMessageReader<String> reader = factory.getAsyncReader();
reader.addOnMessageCallback((msg) -> {
    System.out.println("TOPIC: " + msg.getTopic() + " MESSAGE: " + msg.getValue());
});

// 他の処理など

reader.close();

2.7 Message クラス

ブローカーから受信したメッセージのクラス。

2.8 Metrics クラス

メトリクス情報のクラス。 Reader/Writerオブジェクトに対してgetMetrics()メソッドを呼び出すと得られる。

Reader/Writerオブジェクトに対してresetMetrics()メソッドを呼び出すとReader/Writerの統計情報がリセットされる。 引数 reset_raw にtrueを指定した場合に限り、SINETStreamの統計情報だけでなくメッセージングシステム固有の統計情報もリセットされる(可能であれば)。

引数 reset_raw なしのresetMetrics()メソッドはreset_raw=falseを指定したのと同じである。

Eclipse Paho(SINETStreamのMQTTプラグインで使用しているMQTTクライアントライブラリ)は統計情報を提供してない。 Kafkaにはメッセージングシステム固有の統計情報があるがリセット機能はない。

統計情報はSINETStreamメインライブラリとメッセージングシステムプラグインの境界で測定した値が使われる。 したがって、SINETStreamの暗号化機能が有効の場合は暗号化されたメッセージが測定される。 統計情報の更新タイミングはWriterではメッセージングシステムプラグインにデータ渡す直前(メッセージングシステムが実際に送信したかは関知しない)、 Readerではメッセージングシステムプラグインからデータを受け取った直後である。 圧縮に関する統計統計情報は例外で圧縮処理の前後で測定される。

  <writer>                      <reader>
  Application                   Application
    ↓                            ↑
  value serializer              value deserializer
    ↓                            ↑                ←msg_uncompressed_bytes_total
  compressor                    decompressor
    ↓                            ↑                ←msg_compressed_bytes_total
  Avro serializer               Avro deserializer
    ↓                            ↑
  encrypt                       decrypt
- - ↓  - - - - - - - - - - - - - ↑ - - - - - - - -←メトリクス測定境界
  messaging system → broker → messaging system

プロパティ

使用例

受信したメッセージ数・バイト数を表示する。

try (MessageReader<String> reader = factory.getReader()) {
    // (1)
    Message<String> msg;
    while (Objects.nonNull(msg = reader.read())) {
        ;
    }
    Metrics metrics = reader.getMetrics();  // (1) からの累積の統計情報が得られる
    System.out.println("COUNT: " + metrics.getMsgCountTotal());
    System.out.println("BYTES: " + metrics.getMsgBytesTotal());
}

10メッセージごとに受信レートを表示する。

try (MessageReader<String> reader = factory.getReader()) {
    Message<String> msg;
    int count = 0;
    while (Objects.nonNull(msg = reader.read())) {
        count++;
        if (count == 10) {
            count = 0;
            Metrics metrics = reader.getMetrics();
            reader.resetMetrics();
            System.out.println("COUNT/s: " + metrics.getMsgCountRate());
            System.out.println("BYTES/s: " + metrics.getMsgBytesRate());
        }
    }
}

2.9 例外一覧

例外名 メソッド名  
NoConfigException MessageReaderFactory#getReader() MessageReaderFactory#getAsyncReader() MessageWriterFactory#getWriter() MessageWriterFactory#getAsyncWriter() 設定ファイルを読み込めない
NoServiceException MessageReaderFactory#getReader() MessageReaderFactory#getAsyncReader() MessageWriterFactory#getWriter() MessageWriterFactory#getAsyncWriter() 指定したサービス名に対応するエントリが設定ファイルに存在しない
UnsupportedServiceException MessageReaderFactory#getReader() MessageReaderFactory#getAsyncReader() MessageWriterFactory#getWriter() MessageWriterFactory#getAsyncWriter() サポートしていないメッセージングシステムが指定された
ConnectionException MessageReaderFactory#getReader() MessageReaderFactory#getAsyncReader() MessageWriterFactory#getWriter() MessageWriterFactory#getAsyncWriter() ブローカーに接続できない
InvalidConfigurationException MessageReaderFactory#getReader() MessageReaderFactory#getAsyncReader() MessageWriterFactory#getWriter() MessageWriterFactory#getAsyncWriter() 設定ファイルの記述内容に誤りがある
SinetStreamIOException MessageReaderFactory#getReader() MessageReaderFactory#getAsyncReader() MessageWriterFactory#getWriter() MessageWriterFactory#getAsyncWriter() MessageReader<T>#read() MessageReader<T>#close() MessageWriter<T>#write(T) MessageWriter<T>#close() メッセージングシステムとのIOでエラーが発生した
SinetStreamException MessageReaderFactory#getReader() MessageReaderFactory#getAsyncReader() MessageWriterFactory#getWriter() MessageWriterFactory#getAsyncWriter() MessageReader<T>#read() MessageReader<T>#close() MessageWriter<T>write(T) MessageWriter<T>close() SINETStreamに関するその他のエラー
InvalidMessageException MessageReader<T>#read() メッセージのフォーマットが間違っている
AuthenticationException MessageReaderFactory#getReader() MessageReaderFactory#getAsyncReader() MessageWriterFactory#getWriter() MessageWriterFactory#getAsyncWriter() ブローカーとの接続で認証エラーになった
AuthorizationException MessageReader<T>#read() MessageWriter<T>#write() 認可されない操作を行った(*)

(*) メッセージングシステムのタイプやConsistencyのパラメータによっては認可されない操作を行っても例外が発生しない場合がある。

  1. MQTT(Mosquitto)では認可されない操作を行ってもブローカーがエラーを返さないため例外が発生しない
  2. Kafkaのブローカーに対してConsistencyAT_MOST_ONCEを指定してメッセージの送信を行った場合、ブローカーからの応答を待たずに送信処理が完了するため、認可されない操作を行った場合も例外が発生しない

3. メッセージングシステム固有のパラメータ

4. チートシートの表示方法

APIのjarファイルを java -jar の後に指定して実行すると、チートシートが表示される。

$ java -jar SINETStream-api-1.6.0.jar

==================================================
MessageWriter example
--------------------------------------------------
MessageWriterFactory<String> factory = MessageWriterFactory.<String>builder()
        .service("service-1")
        .topic("topic-1")
        .build();
try (MessageWriter<String> writer = factory.getWriter()) {
    writer.writer("message");
}
--------------------------------------------------
MessageWriterFactory parameters:
    service(String service)
        Service name defined in the configuration file. (REQUIRED)
    clientId(String clientId)
        If not specified, the value is automatically generated.
    configName(String configName)
        configuration name.
    consistency(Consistency consistency[=AT_MOST_ONCE])
        consistency: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE
    dataEncryption(Boolean dataEncryption[=false])
        Message encryption.
    parameter(String key, Object parameter)
        Rewrites the parameters described in the configuration file only for the specified key / value pairs.
    parameters(Map parameters)
        Overwrites the parameters described in the configuration file with the specified values.
    serializer(Serializer serializer)
        If not specified, use default serializer according to valueType.
    topic(String topic)
        The topic to send.
    valueType(ValueType valueType[=BYTE_ARRAY])
        The type of message.
==================================================
MessageReader example
--------------------------------------------------
MessageReaderFactory<String> factory = MessageReaderFactory.<String>builder()
        .service("service-1")
        .topic("topic-1")
        .build();
try (MessageReader<String> reader = factory.getReader()) {
    Message<String> msg;
    while (Objects.nonNull(msg = reader.read)) {
        System.out.println(msg.getValue());
    }
}
--------------------------------------------------
MessageReaderFactory parameters:
    service(String service)
        Service name defined in the configuration file. (REQUIRED)
    clientId(String clientId)
        If not specified, the value is automatically generated.
    configName(String configName)
        configuration name.
    consistency(Consistency consistency[=AT_MOST_ONCE])
        consistency: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE
    dataEncryption(Boolean dataEncryption[=false])
        Message encryption.
    deserializer(Deserializer deserializer)
        If not specified, use default deserializer according to valueType.
    parameter(String key, Object parameter)
        Rewrites the parameters described in the configuration file only for the specified key / value pairs.
    parameters(Map parameters)
        Overwrites the parameters described in the configuration file with the specified values.
    topic(String topic)
        The topic to receive.
    topics(Collection topics)
        A list of topics to receive.
    valueType(ValueType valueType[=BYTE_ARRAY])
        The type of message.