SINETStream ユーザガイド
Java API
1. 使用例 2. Java API クラス一覧 2.1 MessageWriterFactory クラス 2.2 MessageWriter クラス 2.3 AsyncMessageWriter クラス 2.4 MessageReaderFactory クラス 2.5 MessageReader クラス 2.6 AsyncMessageReader クラス 2.7 Message クラス 2.8 Metrics クラス 2.9 例外一覧 3. メッセージングシステム固有のパラメータ 4. チートシートの表示方法
1. 使用例
はじめに簡単な使用例を示す。
この例では、異なるメッセージングシステムをバックエンドとする二つのサービス service-1 と service-2 を利用する。
service-1 のバックエンドは Apache Kafka で、4台のブローカー kafka-1 ~ kafka-4 で構成される。
service-2 のバックエンドは MQTT で、1台のブローカー 192.168.2.105 で構成される。
設定ファイル作成
設定ファイルは、クライアントがブローカーに接続するための設定が記述されたファイルである。 詳細は 設定ファイル を参照すること。
この例では、以下の内容の設定ファイル .sinetstream_config.yml をクライアントマシンのカレントディレクトリに作成する。
service-1:
type: kafka
brokers:
- kafka-1:9092
- kafka-2:9092
- kafka-3:9092
- kafka-4:9092
service-2:
type: mqtt
brokers: 192.168.2.105:1883
username_pw_set:
username: user01
password: pass01
メッセージ送信
サービス名 service-1 に対応するメッセージングシステムのトピック topic-1 に対してメッセージを送信する例を示す。
MessageWriterFactory<String> factory =
MessageWriterFactory.<String>builder()
.service("service-1")
.topic("topic-1")
.consistency(AT_LEAST_ONCE)
.build();
try (MessageWriter<String> writer = factory.getWriter()) {
writer.write("Hello! This is the 1st message.");
writer.write("Hello! This is the 2nd message.");
}
まず、パラメータ service, topic, consistency を指定してファクトリオブジェクト factory を作成する。
この factory に対して getWriter() を呼び出し、メッセージを送信するためのライターを得る。
その後、ライターの write() を呼び出してメッセージをブローカーに送信する。
メッセージ受信
サービス名 service-1 に対応するメッセージングシステムのトピック topic-1 からメッセージを受信する例を示す。
MessageReaderFactory<String> factory =
MessageReaderFactory.<String>builder()
.service("service-1")
.topic("topic-1")
.consistency(AT_LEAST_ONCE)
.receiveTimeout(Duration.ofSeconds(60))
.build();
try (MessageReader<String> reader = factory.getReader()) {
Message<String> msg;
while (Objects.nonNull(msg = reader.read())) {
System.out.println(msg.getValue());
}
}
まず、パラメータ service, topic, consistency, receiveTimeout を指定してファクトリオブジェクト factory を作成する。
この factory に対して getReader() を呼び出し、メッセージを受信するためのリーダーを得る。
その後、リーダーの read() を呼び出してブローカーからメッセージを受信する。
リーダーの read() を呼び出したあと、receiveTimeout に指定した時間メッセージが取得できなかった場合、read() が null を返しループが終了する。
2. Java API クラス一覧
主要クラス
- jp.ad.sinet.stream.api.MessageWriter
- メッセージを送信するクラス
- jp.ad.sinet.stream.api.AsyncMessageWriter
- メッセージを送信するクラス(同非期API)
- jp.ad.sinet.stream.api.MessageReader
- メッセージを受信するクラス
- jp.ad.sinet.stream.api.AsyncMessageReader
- メッセージを受信するクラス(非同期API)
- jp.ad.sinet.stream.utils.MessageWriterFactory
MessageWriterオブジェクトを作成するためのファクトリクラス
- jp.ad.sinet.stream.utils.MessageReaderFactory
MessageReaderオブジェクトを作成するためのファクトリクラス
2.1 MessageWriterFactory クラス
MessageWriter を取得するためのファクトリクラス。
複数のパラメータを指定して MessageWriter のインスタンスを構築するためのビルダークラス MessageWriterFactoryBuilder が内部クラスとして用意されている。
ビルダークラスでは以下のパラメータを指定できる。
- service(String)
- サービス名
- 対応するサービスが設定ファイルに記述されている必要がある
- topic(String)
- メッセージの送信先となるトピック名
- configName(String)
- コンフィグ名
- コンフィグ名が指定されるとコンフィグサーバからコンフィグ情報を取得する。
- コンフィグ情報のなかで定義されているサービスが1つしかないとわかっている場合はサービス名にnullを指定してもよい。
- コンフィグ名が指定されなかった場合、コンフィグ情報を得るために設定ファイルが読み込まれる。
- clientId(String)
- クライアントID
- 指定されなかった場合は、値をライブラリ内部で自動生成する
- consistency(Consistency)
- 列挙型で
AT_MOST_ONCE,AT_LEAST_ONCE,EXACTLY_ONCEのいずれかの値をとる - デフォルト値は
AT_MOST_ONCE
- 列挙型で
- valueType(ValueType)
- メッセージのデータ本体部分(ペイロード)のタイプ
MessageWriter.write()が送信するメッセージは、このパラメータに対応するシリアライザによってバイト列に変換される- 標準パッケージでは
SimpleValueType.BYTE_ARRAY,SimpleValueType.TEXTの何れかを指定するSimpleValueType.BYTE_ARRAY(デフォルト値)を指定した場合、ペイロードの型はbyte[]として処理するSimpleValueType.TEXTを指定した場合、ペイロードの型はjava.lang.Stringとして処理する
- 追加パッケージをインストールすることにより、サポートする
ValueTypeの種類を増やすことができる- SINETStream v1.1 以降では画像タイプを追加するパッケージを提供している
- 画像タイプを指定する場合は
valueType()の引数にnew ValueTypeFactory().get("image")を設定する - 画像タイプを指定した場合、ペイロードの型は
java.awt.image.BufferedImageとして処理する
- serializer(Serializer<T>)
- メッセージのデータ本体のシリアライザ
- このパラメータを指定しない場合、シリアライザは
valueTypeの値から決定される
- dataEncryption(Boolean)
- メッセージを暗号化の有効、無効の指定
- 暗号化を有効にする場合は、暗号化に関するパラメータ
cryptoが設定ファイルなどで指定されている必要がある
- parameter(String key, Object value)
- メッセージングシステム固有のパラメータを指定する。設定ファイルと異なる値を指定したい場合にマッピングを指定する
- parameters(Map<String, Object> parameters)
- メッセージングシステム固有のパラメータを指定する。設定ファイルと異なる値を複数のマッピングペアで指定する
ビルダークラスのインスタンスを取得するには MessageWriterFactory.builder() を呼び出す。
また、ビルダーオブジェクトからファクトリオブジェクトを得るには build() を呼び出す。
以下に例を示す。
MessageWriterFactory<String> factory =
MessageWriterFactory.<String>builder()
.service("service-1")
.topic("topic-1")
.consistency(AT_LEAST_ONCE)
.build();
2.2 MessageWriter クラス
ブローカーにメッセージを送信するクラス。
ファクトリクラスのインスタンスに対して getWriter() を呼び出すことで、ライタークラス MessageWriter のインスタンスが取得できる。
MessageWriter には AutoCloseable が実装されているので try-with-resources 文を利用できる。
メッセージを送信するメソッドwrite()は送信処理が完了するまでブロックする。
以下に例を示す。
MessageWriterFactory<String> factory = MessageWriterFactory.<String>builder()
.service("service-1").build();
try (MessageWriter<String> writer = factory.getWriter()) {
writer.write("message-1");
}
2.3 AsyncMessageWriter クラス
ブローカーにメッセージを送信するクラス。
ファクトリクラスのインスタンスに対して getAsyncWriter() を呼び出すことで、ライタークラス AsyncMessageWriter のインスタンスが取得できる。
AsyncMessageWriter には AutoCloseable が実装されているので try-with-resources 文を利用できる。
メッセージを送信するメソッドwrite()は非同期処理であり JDeferredのPromiseオブジェクトを返す。
以下に例を示す。
MessageWriterFactory<String> factory = MessageWriterFactory.<String>builder()
.service("service-1").build();
try (AsyncMessageWriter<String> writer = factory.getAsyncWriter()) {
writer.write("message-1")
.done(result -> System.out.println("write task done"))
.fail(result -> System.out.println("write task failed"))
}
write()メソッドが返すPromiseオブジェクトのメソッド .done(), .fail()を用いることで、
送信結果(成功、失敗)に応じた処理を設定することができる。Promiseの主なメソッドを以下に示す。
done()– 遅延オブジェクトの処理が正常に完了した場合にトリガーされるfail()– 遅延オブジェクトの処理中に例外が発生したした場合にトリガーされるalways()– 遅延オブジェクトの処理結果によらず全ての場合にトリガーされる
2.4 MessageReaderFactory クラス
MessageReader を取得するためのファクトリクラス。
複数のパラメータを指定して MessageReader のインスタンスを構築するためのビルダークラス
MessageReaderFactoryBuilder が内部クラスとして用意されている。
ビルダークラスでは以下のパラメータを指定できる。
- service(String)
- サービス名
- 対応するサービスが設定ファイルに記述されている必要がある
- topic(String)
- メッセージの受信元となるトピック名
- topicsパラメータが指定されているときは無視される
- configName(String)
- コンフィグ名
- コンフィグ名が指定されるとコンフィグサーバからコンフィグ情報を取得する。
- コンフィグ情報のなかで定義されているサービスが1つしかないとわかっている場合はサービス名にnullを指定してもよい。
- コンフィグ名が指定されなかった場合、コンフィグ情報を得るために設定ファイルが読み込まれる。
- topics(Collection<String>)
- メッセージの受信元となるトピックのコレクション
MessageReaderでは複数のトピックからメッセージを受信することができる
- clientId(String)
- クライアントID
- 指定されなかった場合は、値をライブラリ内部で自動生成する
- consistency(Consistency)
- 列挙型で
AT_MOST_ONCE,AT_LEAST_ONCE,EXACTLY_ONCEのいずれかの値をとる - デフォルト値は
AT_MOST_ONCE
- 列挙型で
- valueType(ValueType)
- メッセージのデータ本体部分(ペイロード)のタイプ
MessageReader.read()で受信するメッセージは、このパラメータに対応するデシリアライザによってバイト列から変換される- 標準パッケージでは
SimpleValueType.BYTE_ARRAY,SimpleValueType.TEXTの何れかを指定するSimpleValueType.BYTE_ARRAY(デフォルト値)を指定した場合、ペイロードの型はbyte[]として処理するSimpleValueType.TEXTを指定した場合、ペイロードの型はjava.lang.Stringとして処理する
- 追加パッケージをインストールすることにより、サポートする
ValueTypeの種類を増やすことができる- SINETStream v1.1 以降では画像タイプを追加するパッケージを提供している
- 画像タイプを指定する場合は
valueType()の引数にnew ValueTypeFactory().get("image")を設定する - 画像タイプを指定した場合、ペイロードの型は
java.awt.image.BufferedImageとして処理する
- deserializer(Serializer<T>)
- メッセージのデータ本体のデシリアライザ
- このパラメータを指定しない場合、デシリアライザは
valueTypeの値から決定される
- dataEncryption(Boolean)
- 暗号化されたメッセージの復号処理の有効、無効の指定
- 暗号化を有効にする場合は、暗号化に関するパラメータ
cryptoが設定ファイルなどで指定されている必要がある
- receiveTimeout(Duration)
MessageReaderのread()メソッドがメッセージの到着を待つ最大待ち時間
- parameter(String key, Object value)
- メッセージングシステム固有のパラメータを指定する。設定ファイルと異なる値を指定したい場合にマッピングを指定する
- parameters(Map<String, Object> parameters)
- メッセージングシステム固有のパラメータを指定する。設定ファイルと異なる値を複数のマッピングペアで指定する
ビルダークラスのインスタンスを取得するには MessageReaderFactory.builder() を呼び出す。
また、ビルダーオブジェクトからファクトリオブジェクトを得るには build() を呼び出す。
以下に例を示す。
MessageReaderFactory<String> factory =
MessageReaderFactory.<String>builder()
.service("service-1")
.topic("topic-1")
.consistency(AT_LEAST_ONCE)
.build();
2.5 MessageReader クラス
ブローカーからメッセージを受信するクラス。
ファクトリクラスのインスタンスに対して getReader() を呼び出すことで、リーダークラス MessageReader のインスタンスが取得できる。
MessageReader には AutoCloseable が実装されているので try-with-resources 文を利用できる。
メッセージを受信するメソッドread()はメッセージを受信するかreceiveTimeout()に指定されているタイムアウト時間が経過するまではブロックする。
以下に例を示す。
MessageReaderFactory<String> factory = MessageReaderFactory.<String>builder()
.service("service-1").receiveTimeout(Duration.ofSeconds(60)).build();
try (MessageReader<String> reader = factory.getReader()) {
Message<String> msg;
while (Objects.nonNull(msg = reader.read())) {
System.out.println("TOPIC: " + msg.getTopic() + " MESSAGE: " + msg.getValue());
}
}
read() メソッドの返り値は Message<T> クラスのインスタンスになる。
2.6 AsyncMessageReader クラス
ブローカーからメッセージを受信するクラス。
ファクトリクラスのインスタンスに対して getAsyncReader() を呼び出すことで、
リーダークラス AsyncMessageReader のインスタンスが取得できる。
受信したメッセージを処理するにはaddOnMessageCallback()メソッドでメッセージを受信した際に呼び出されるコールバックを設定する。
コールバックの引数によって受信したメッセージが受け渡される。
以下に例を示す。
MessageReaderFactory<String> factory = MessageReaderFactory.<String>builder()
.service("service-1").receiveTimeout(Duration.ofSeconds(60)).build();
AsyncMessageReader<String> reader = factory.getAsyncReader();
reader.addOnMessageCallback((msg) -> {
System.out.println("TOPIC: " + msg.getTopic() + " MESSAGE: " + msg.getValue());
});
// 他の処理など
reader.close();
2.7 Message クラス
ブローカーから受信したメッセージのクラス。
- getTopic()
- トピック名を取得する。
- getValue()
- メッセージの値を取得する。
- getTimestamp()
- メッセージ送信時刻(Unix時間;単位は秒)を取得する。
- 値
0は時刻が設定されてないことを示す。
- getTimestampMicroseconds()
- メッセージ送信時刻(Unix時間;単位はマイクロ秒単位)を取得する。
- 値
0は時刻が設定されてないことを示す。
2.8 Metrics クラス
メトリクス情報のクラス。 Reader/Writerオブジェクトに対してgetMetrics()メソッドを呼び出すと得られる。
- MessageReader#getMetrics()
- AsyncMessageReader#getMetrics()
- MessageWriter#getMetrics()
- AsyncMessageWriter#getMetrics()
Reader/Writerオブジェクトに対してresetMetrics()メソッドを呼び出すとReader/Writerの統計情報がリセットされる。
引数 reset_raw にtrueを指定した場合に限り、SINETStreamの統計情報だけでなくメッセージングシステム固有の統計情報もリセットされる(可能であれば)。
- MessageReader#resetMetrics(boolean reset_raw)
- MessageWriter#resetMetrics(boolean reset_raw)
- AsyncMessageReader#resetMetrics(boolean reset_raw)
- AsyncMessageWriter#resetMetrics(boolean reset_raw)
引数 reset_raw なしのresetMetrics()メソッドはreset_raw=falseを指定したのと同じである。
- MessageReader#resetMetrics()
- MessageWriter#resetMetrics()
- AsyncMessageReader#resetMetrics()
- AsyncMessageWriter#resetMetrics()
Eclipse Paho(SINETStreamのMQTTプラグインで使用しているMQTTクライアントライブラリ)は統計情報を提供してない。 Kafkaにはメッセージングシステム固有の統計情報があるがリセット機能はない。
統計情報はSINETStreamメインライブラリとメッセージングシステムプラグインの境界で測定した値が使われる。 したがって、SINETStreamの暗号化機能が有効の場合は暗号化されたメッセージが測定される。 統計情報の更新タイミングはWriterではメッセージングシステムプラグインにデータ渡す直前(メッセージングシステムが実際に送信したかは関知しない)、 Readerではメッセージングシステムプラグインからデータを受け取った直後である。 圧縮に関する統計統計情報は例外で圧縮処理の前後で測定される。
<writer> <reader>
Application Application
↓ ↑
value serializer value deserializer
↓ ↑ ←msg_uncompressed_bytes_total
compressor decompressor
↓ ↑ ←msg_compressed_bytes_total
Avro serializer Avro deserializer
↓ ↑
encrypt decrypt
- - ↓ - - - - - - - - - - - - - ↑ - - - - - - - -←メトリクス測定境界
messaging system → broker → messaging system
プロパティ
- getStartTime(), getStartTimeMillis()
- 測定を開始した時刻(Unix時間)
- getStartTime()が返す時刻の単位は秒
- getStartTimeMillis()が返す時刻の単位はミリ秒
- Reader/Writerオブジェクトを作成した時刻、またはリセットした時刻。
- 測定を開始した時刻(Unix時間)
- getEndTime(), getEndTimeMillis()
- 測定を終了した時刻(Unix時間)
- getEndTime()が返す時刻の単位は秒
- getEndTimeMillis()が返す時刻の単位はミリ秒
- getMetrics()メソッドを呼んだ時刻
- 測定を終了した時刻(Unix時間)
- getTime(), getTimeMillis()
- 測定時間 (= EndTime - StartTime)
- getTime()が返す時間の単位は秒
- getTimeMillis()が返す時間の単位はミリ秒
- 測定時間 (= EndTime - StartTime)
- getMsgCountTotal()
- 累積送受信メッセージ数
- getMsgCountRate()
- 送受信メッセージ数レート
- = msg_count_total / time
- timeが0のときは0を返す。
- getMsgUncompressedBytesTotal()
- ユーザデータ累積送受信メッセージ量(bytes)
- value serializerを通した直後・value deserializerを通す直前
- getMsgCompressedBytesTotal()
- ユーザデータ圧縮後累積送受信メッセージ量(bytes)
- getMsgCompressionRatio()
- メッセージ圧縮率 (0に近い方が高圧縮率、1に近い方が低圧縮率)
- = msg_compression_ratio / msg_uncompressed_bytes_total
- getMsgBytesTotal()
- 累積送受信メッセージ量(bytes)
- getMsgBytesRate()
- 送受信メッセージ量レート
- = msg_bytes_total / time
- timeが0のときは0を返す。
- getMsgSizeMin()
- 最小送受信メッセージサイズ(bytes)
- getMsgSizeAvg()
- 平均送受信メッセージサイズ(bytes)
- = msg_bytes_total / msg_count_total
- msg_count_totalが0のときは0を返す。
- getMsgSizeMax()
- 最大送受信メッセージサイズ(bytes)
- getErrorCountTotal()
- 累積エラー数
- getErrorCountRate()
- エラーレート
- = error_count_total / time
- timeが0のときは0を返す。
- getRaw()
- メッセージングシステム固有の統計情報
使用例
受信したメッセージ数・バイト数を表示する。
try (MessageReader<String> reader = factory.getReader()) {
// (1)
Message<String> msg;
while (Objects.nonNull(msg = reader.read())) {
;
}
Metrics metrics = reader.getMetrics(); // (1) からの累積の統計情報が得られる
System.out.println("COUNT: " + metrics.getMsgCountTotal());
System.out.println("BYTES: " + metrics.getMsgBytesTotal());
}
10メッセージごとに受信レートを表示する。
try (MessageReader<String> reader = factory.getReader()) {
Message<String> msg;
int count = 0;
while (Objects.nonNull(msg = reader.read())) {
count++;
if (count == 10) {
count = 0;
Metrics metrics = reader.getMetrics();
reader.resetMetrics();
System.out.println("COUNT/s: " + metrics.getMsgCountRate());
System.out.println("BYTES/s: " + metrics.getMsgBytesRate());
}
}
}
2.9 例外一覧
| 例外名 | メソッド名 | |
|---|---|---|
| NoConfigException | MessageReaderFactory#getReader() MessageReaderFactory#getAsyncReader() MessageWriterFactory#getWriter() MessageWriterFactory#getAsyncWriter() | 設定ファイルを読み込めない |
| NoServiceException | MessageReaderFactory#getReader() MessageReaderFactory#getAsyncReader() MessageWriterFactory#getWriter() MessageWriterFactory#getAsyncWriter() | 指定したサービス名に対応するエントリが設定ファイルに存在しない |
| UnsupportedServiceException | MessageReaderFactory#getReader() MessageReaderFactory#getAsyncReader() MessageWriterFactory#getWriter() MessageWriterFactory#getAsyncWriter() | サポートしていないメッセージングシステムが指定された |
| ConnectionException | MessageReaderFactory#getReader() MessageReaderFactory#getAsyncReader() MessageWriterFactory#getWriter() MessageWriterFactory#getAsyncWriter() | ブローカーに接続できない |
| InvalidConfigurationException | MessageReaderFactory#getReader() MessageReaderFactory#getAsyncReader() MessageWriterFactory#getWriter() MessageWriterFactory#getAsyncWriter() | 設定ファイルの記述内容に誤りがある |
| SinetStreamIOException | MessageReaderFactory#getReader() MessageReaderFactory#getAsyncReader() MessageWriterFactory#getWriter() MessageWriterFactory#getAsyncWriter() MessageReader<T>#read() MessageReader<T>#close() MessageWriter<T>#write(T) MessageWriter<T>#close() | メッセージングシステムとのIOでエラーが発生した |
| SinetStreamException | MessageReaderFactory#getReader() MessageReaderFactory#getAsyncReader() MessageWriterFactory#getWriter() MessageWriterFactory#getAsyncWriter() MessageReader<T>#read() MessageReader<T>#close() MessageWriter<T>write(T) MessageWriter<T>close() | SINETStreamに関するその他のエラー |
| InvalidMessageException | MessageReader<T>#read() | メッセージのフォーマットが間違っている |
| AuthenticationException | MessageReaderFactory#getReader() MessageReaderFactory#getAsyncReader() MessageWriterFactory#getWriter() MessageWriterFactory#getAsyncWriter() | ブローカーとの接続で認証エラーになった |
| AuthorizationException | MessageReader<T>#read() MessageWriter<T>#write() | 認可されない操作を行った(*) |
(*) メッセージングシステムのタイプやConsistencyのパラメータによっては認可されない操作を行っても例外が発生しない場合がある。
- MQTT(Mosquitto)では認可されない操作を行ってもブローカーがエラーを返さないため例外が発生しない
- Kafkaのブローカーに対して
ConsistencyにAT_MOST_ONCEを指定してメッセージの送信を行った場合、ブローカーからの応答を待たずに送信処理が完了するため、認可されない操作を行った場合も例外が発生しない
3. メッセージングシステム固有のパラメータ
4. チートシートの表示方法
APIのjarファイルを java -jar の後に指定して実行すると、チートシートが表示される。
$ java -jar SINETStream-api-1.6.0.jar
==================================================
MessageWriter example
--------------------------------------------------
MessageWriterFactory<String> factory = MessageWriterFactory.<String>builder()
.service("service-1")
.topic("topic-1")
.build();
try (MessageWriter<String> writer = factory.getWriter()) {
writer.writer("message");
}
--------------------------------------------------
MessageWriterFactory parameters:
service(String service)
Service name defined in the configuration file. (REQUIRED)
clientId(String clientId)
If not specified, the value is automatically generated.
configName(String configName)
configuration name.
consistency(Consistency consistency[=AT_MOST_ONCE])
consistency: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE
dataEncryption(Boolean dataEncryption[=false])
Message encryption.
parameter(String key, Object parameter)
Rewrites the parameters described in the configuration file only for the specified key / value pairs.
parameters(Map parameters)
Overwrites the parameters described in the configuration file with the specified values.
serializer(Serializer serializer)
If not specified, use default serializer according to valueType.
topic(String topic)
The topic to send.
valueType(ValueType valueType[=BYTE_ARRAY])
The type of message.
==================================================
MessageReader example
--------------------------------------------------
MessageReaderFactory<String> factory = MessageReaderFactory.<String>builder()
.service("service-1")
.topic("topic-1")
.build();
try (MessageReader<String> reader = factory.getReader()) {
Message<String> msg;
while (Objects.nonNull(msg = reader.read)) {
System.out.println(msg.getValue());
}
}
--------------------------------------------------
MessageReaderFactory parameters:
service(String service)
Service name defined in the configuration file. (REQUIRED)
clientId(String clientId)
If not specified, the value is automatically generated.
configName(String configName)
configuration name.
consistency(Consistency consistency[=AT_MOST_ONCE])
consistency: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE
dataEncryption(Boolean dataEncryption[=false])
Message encryption.
deserializer(Deserializer deserializer)
If not specified, use default deserializer according to valueType.
parameter(String key, Object parameter)
Rewrites the parameters described in the configuration file only for the specified key / value pairs.
parameters(Map parameters)
Overwrites the parameters described in the configuration file with the specified values.
topic(String topic)
The topic to receive.
topics(Collection topics)
A list of topics to receive.
valueType(ValueType valueType[=BYTE_ARRAY])
The type of message.