SINETStream User Guide
Python API
1. Example 2. Summary of Python API classes 2.1 MessageReader Class 2.2 AsyncMessageReader Class 2.3 MessageWriter Class 2.4 AsyncMessageWriter Class 2.5 Message Class 2.6 Metrics Class 2.7 Summary of exception 3. Messaging system-specific parameters 3.1 Apache Kafka 3.2 MQTT (Eclipse Paho) 3.3 S3 4. How to show a cheat sheet
1. Example
First, a simple example is shown.
This example uses two services, namely service-1
and service-2
, each with a different messaging system as its backend.
The backend of service-1
is Apache Kafka, which consists of four brokers, namely kafka-1
thru kafka-4
.
The backend of service-2
is MQTT, which consists of one broker, 192.168.2.105
.
Creating a configuration file
The configuration file contains settings for the clients to connect to the broker. Please refer to the Configuration file for details.
In this example, we create the following configuration file .sinetstream_config.yml
in the current directory on the client machine.
service-1:
type: kafka
brokers:
- kafka-1:9092
- kafka-2:9092
- kafka-3:9092
- kafka-4:9092
service-2:
type: mqtt
brokers: 192.168.2.105:1883
username_pw_set:
username: user01
password: pass01
Sending Messages
The following code sends two messages to the topic topic-1
of the messaging system associated with the service service-1
.
from sinetstream import MessageWriter
writer = MessageWriter('service-1', 'topic-1')
with writer as f:
f.publish(b'Hello! This is the 1st message.')
f.publish(b'Hello! This is the 2nd message.')
First, create an instance of the MessageWriter object by specifying the service name and the topic name.
Open this instance with a with
statement and send messages to the broker by invoking the publish()
method in the block.
The MessageWriter object automatically connects to the messaging system when entering the
with
block, and it automatically closes the connection when exiting thewith
block.
By default, the argument of publish()
is a byte sequence.
To transfer an object other than a byte sequence, specify value_type or value_serializer in the constructor of the MessageWriter Class.
Receiving Messages
The following code receives messages from the topic topic-1
of the messaging system associated with the service service-1
.
from sinetstream import MessageReader
reader = MessageReader('service-1', 'topic-1')
with reader as f:
for msg in f:
print(msg.value)
First, create an instance of the MessageReader object by specifying the service name and the topic name.
Open this instance with a with
statement, loop the iterator for f
in the block, and receive messages from the broker by referring to the value
property of the iterator.
The MessageReader object automatically connects to the messaging system when entering the
with
block, and it automatically closes the connection when exiting thewith
block.
By default, the message reader process does not time out and the for
statement becomes an infinite loop.
To exit the for
loop, specify the receive_timeout_ms
parameter in the constructor of the MessageReader class or perform a signal handling.
2. Summary of Python API classes
- sinetstream.MessageReader
- The class to receive messages from the messaging system.
- sinetstream.AsyncMessageReader
- The class to receive messages from the messaging system. (Asynchronous API)
- sinetstream.MessageWriter
- The class to send messages to the messaging system.
- sinetstream. AsyncMessageWriter
- The class to send messages to the messaging system. (Asynchronous API)
- sinetstream.Message
- The class to represent a message.
- sinetstream.SinetError
- The parent class of all the exception classes in SINETStream
2.1 MessageReader Class
MessageReader()
The constructor of the MessageReader class
MessageReader(
service,
topics=None,
consistency=AT_MOST_ONCE,
client_id=DEFAULT_CLIENT_ID,
value_type=None,
value_deserializer=None,
receive_timeout_ms=float("inf"),
**kwargs)
Parameters
- service
- Service name.
- The name must be defined in the configuration file.
- topics
- Topic name.
- Specify a
str
or alist
for a single topic. - Specify a
list
when subscribing for multiple topics. - If not specified, the value specified in the configuration file will be used as default.
- consistency
- The reliability of the message delivery.
- AT_MOST_ONCE (=0)
- A message may not arrive.
- AT_LEAST_ONCE (=1)
- A message always arrives but may arrive many times.
- EXACTLY_ONCE (=2)
- A message always arrives only once.
- client_id
- Client name
- If any of DEFAULT_CLIENT_ID, None, or an empty string is specified, the library will automatically generate a value.
- The generated value can be obtained as a property of this object.
- value_type
- The type name of message payload.
MessageReader
will treat the payload as the type specified here.- When using the standard package, the following two type names are supported.
- Set
"byte_array"
(default) to treat the payload asbytes
. - Set
"text"
to treat the payload asstr
.
- Set
- When using a plugin pacakge, other type names may be supported.
- When using the image type plugin provided with SINETStream v1.1 (or later), the following type name is supported.
- Set
"image"
to treat the payload asnumpy.ndarray
, which is the image data type in OpenCV. - The color order in
numpy.ndarray
is BGR, which is consistent with OpenCV.
- Set
- value_deserializer
- The function used to decode the value from the byte array in the message.
- If not specified, an appropriate deserializer function will be used according to
value_type
.
- receive_timeout_ms
- Maximum time (ms) to wait for message to arrive.
- Once timed out, no more messages can be read from this connection.
- data_encryption
- Enable or disable message encryption and decryption.
- kwargs
- Specify the messaging system-specific parameters as YAML mappings.
The parameters specified in kwargs
are passed to the constructor of the backend messaging system.
Please refer to the Messaging system-specific parameters of for details.
For the arguments other than service
, their default values can be specified in the configuration file.
If the same parameter is specified in both the configuration file and the constructor argument, the value specified in the constructor argument takes precedence.
** Limitation: SINETStream downgrades to AT_LEAST_ONCE
even if EXACTLY_ONCE
is specified for Kafka consistency
. **
Exception
- NoServiceError
- The specified service name is not defined in the configuration file.
- NoConfigError
- The configuration file does not exist or cannot be read
- InvalidArgumentError
- The format of the specified argument is incorrect, e.g., the value of consistency is out of range or the topic name includes invalid character, etc.
- UnsupportedServiceTypeError
- The plugin for the specified service type is not installed.
Properties
The following properties can be used to get the parameter value specified in the configuration file or in the constructor.
client_id
consistency
topics
value_type
MessageReader.open()
Connects to the broker of the messaging system.
Implicitly invoked when using MessageReader in a with
statement; not intended for explicit invocation.
Returned value
A handler that mentains the connection status with the messaging system.
Exception
- ConnectionError
- Error connecting to the broker.
- AlreadyConnectedError
open()
is called again for an object that is already connected.
MessageReader.close()
Disconnects from the broker of the messaging system.
Implicitly invoked when using MessageReader in a with
statement; not intended for explicit invocation.
MessageReader.__iter__()
Returns an iterator for the messages received from the messaging system.
Exception
The following errors may occur when calling next()
to the iterator.
- AuthorizationError
- Tried to receive messages from an unauthorized topic.
- InvalidMessageError
- Invalid message format in SINETStream.
AuthorizationError does not occur in the following cases:
- When using MQTT (Mosquitto)
- Because the MQTT broker raises no error for unauthorized operation.
2.2 AsyncMessageReader Class
AsyncMessageReader()
The constructor of the AsyncMessageReader class
AsyncMessageReader(
service,
topics=None,
consistency=AT_MOST_ONCE,
client_id=DEFAULT_CLIENT_ID,
value_type="byte_array",
value_deserializer=None,
**kwargs)
Parameters
- service
- Service name.
- The name must be defined in the configuration file.
- topics
- Topic name.
- Specify a str or a list for a single topic.
- Specify a list when subscribing for multiple topics.
- If not specified, the value specified in the configuration file will be used as default.
- consistency
- The reliability of the message delivery.
- AT_MOST_ONCE (=0)
- A message may not arrive.
- AT_LEAST_ONCE (=1)
- A message always arrives but may arrive many times.
- EXACTLY_ONCE (=2)
- A message always arrives only once.
- client_id
- Client name
- If any of DEFAULT_CLIENT_ID, None, or an empty string is specified, the library will automatically generate a value.
- The generated value can be obtained as a property of this object.
- value_type
- The type name of message payload.
- AsyncMessageReader will treat the payload as the type specified here.
- When using the standard package, the following two type names are supported.
- Set ‘“byte_array”’ (default) to treat the payload as bytes.
- Set ‘“text”’ to treat the payload as str.
- When using a plugin pacakge, other type names may be supported.
- When using the image type plugin provided with SINETStream, the following type name is supported.
- Set ‘“image”’ to treat the payload as ‘numpy.ndarray’, which is the image data type in OpenCV.
- The color order in ‘numpy.ndarray’ is BGR, which is consistent with OpenCV.
- value_deserializer
- The function used to decode the value from the byte array in the message.
- If not specified, an appropriate deserializer function will be used according to value_type.
- data_encryption
- Enable or disable message encryption and decryption.
- kwargs
- Specify the messaging system-specific parameters as YAML mappings.
The parameters specified in ‘kwargs’ are passed to the constructor of the backend messaging system. Please refer to the Messaging system-specific parameters for details.
For the arguments other than ‘service’, their default values can be specified in the configuration file. If the same parameter is specified in both the configuration file and the constructor argument, the value specified in the constructor argument takes precedence.
** Limitation: SINETStream downgrades to ‘AT_LEAST_ONCE’ even if ‘EXACTLY_ONCE’ is specified for Kafka ‘consistency’. **
Exception
- NoServiceError
- The specified service name is not defined in the configuration file.
- NoConfigError
- The configuration file does not exist or cannot be read
- InvalidArgumentError
- The format of the specified argument is incorrect, e.g., the value of consistency is out of range or the topic name includes invalid character, etc.
- UnsupportedServiceTypeError
- The plugin for the specified service type is not installed.
Properties
The following properties can be used to get the parameter value specified in the configuration file or in the constructor.
client_id
consistency
topics
value_type
AsyncMessageReader.open()
Connects to the broker of the messaging system.
Returned value
A handler that mentains the connection status with the messaging system.
Exception
- ConnectionError
- Error connecting to the broker.
- AlreadyConnectedError
- ‘open()’ is called again for an object that is already connected.
AsyncMessageReader.close()
Disconnects from the broker of the messaging system. Implicitly invoked when using AsyncMessageReader in a ‘with’ statement; not intended for explicit invocation.
Property: ‘AsyncMessageReader.on_message’
Set the callback function to be invoked when the message is received.
2.3 MessageWriter Class
MessageWriter()
MessageWriter(
service,
topic,
consistency=AT_MOST_ONCE,
client_id=DEFAULT_CLIENT_ID,
value_serializer=None,
**kwargs)
The constructor of MessageWriter class
Parameters
- service
- Service name
- The name must be defined in the configuration file.
- topic
- Topic name
- If not specified, the value specified in the configuration file will be used as default.
- consistency
- Specify the reliability of message delivery.
- AT_MOST_ONCE (=0)
- A message may not be delivered.
- AT_LEAST_ONCE (=1)
- A message is always delivered but may be delivered many times.
- EXACTLY_ONCE (=2)
- A message is always delivered only once.
- client_id
- Client name.
- If any of DEFAULT_CLIENT_ID, None, or an empty string is specified, the library will automatically generate a value.
- value_type
- The type name of message payload.
MessageWriter.publish()
will treat the given data as the type specified here.- When using the standard package, the following two type names are supported.
- Set
"byte_array"
(default) to treat the payload asbytes
. - Set
"text"
to treat the payload asstr
.
- Set
- When using a plugin pacakge, other type names may be supported.
- When using the image type plugin provided with SINETStream v1.1 (or later), the following type name is supported.
- Set
"image"
to treat the payload asnumpy.ndarray
, which is the image data type in OpenCV. - The color order in
numpy.ndarray
is BGR, which is consistent with OpenCV.
- Set
- value_serializer
- The function used to encode the value to the byte array in the message.
- If not specified, an appropriate serializer function will be used according to
value_type
.
- data_encryption
- Enable or disable message encryption and decryption.
- kwargs
- Specify the messaging system-specific parameters as YAML mappings.
The parameters specified in kwargs
are passed to the constructor of the backend messaging system.
Please refer to the Messaging system-specific parameters of for details.
For the arguments other than service
, their default values can be specified in the configuration file.
If the same parameter is specified in both the configuration file and the constructor argument, the value specified in the constructor argument takes precedence.
** Limitations: SINETStream downgrades to AT_LEAST_ONCE
even if EXACTLY_ONCE
is specified for Kafka consistency
. **
Exception
- NoServiceError
- The
service
corresponding to the value specified for service does not exist in the configuration file
- The
- NoConfigError
- The configuration file does not exist or cannot be read
- InvalidArgumentError
- The format of the specified argument is incorrect.
When the value of
consistency
is out of range or a character string that is not allowed as atopic
name
- The format of the specified argument is incorrect.
When the value of
- UnsupportedServiceTypeError
- The plugin for the messaging system corresponding to
type
specified in the configuration file is not installed
- The plugin for the messaging system corresponding to
Properties
The following properties can be used to get the parameter value specified in the configuration file or in the constructor.
client_id
consistency
topic
value_type
MessageWriter.open()
Connects to the broker of the messaging system.
Implicitly invoked when using MessageWriter in a with
statement; not intended for explicit invocation.
Returned value
A handler that mentains the connection status with the messaging system
Exception
- ConnectionError
- Error connecting to the broker
- AlreadyConnectedError
open()
is called again for an object that is already connected
MessageWriter.close()
Disconnects from the broker of the messaging system.
Implicitly invoked when using MessageWriter in a with
statement; not intended for explicit invocation.
MessageWriter.publish(message)
Sends a message to the broker of the messaging system.
The message is serialized according to the value_type
parameter value or using the function specified by value_serializer
.
Exception
- InvalidMessageError
- The type of
message
does not matchvalue_type
or the function specified byvalue_serializer
.
- The type of
- AuthorizationError
- Tried to send messages to an unauthorized topic.
AuthorizationError does not occur in the following cases:
- When using MQTT (Mosquitto)
- Because the MQTT broker raises no error for unauthorized operation.
- When using Kafka with
Consistency
set toAT_MOST_ONCE
- Because the client does not wait for a response from the broker after sending a message. Therefore, the client cannot detect an error on the broker side.
2.4 AsyncMessageWriter Class
AsyncMessageWriter()
AsyncMessageWriter(
service,
topic,
consistency=AT_MOST_ONCE,
client_id=DEFAULT_CLIENT_ID,
value_type="byte_array",
value_serializer=None,
**kwargs)
The constructor of MessageWriter class
Parameters
- service
- Service name
- The name must be defined in the configuration file.
- topic
- Topic name
- If not specified, the value specified in the configuration file will be used as default.
- consistency
- Specify the reliability of message delivery.
- AT_MOST_ONCE (=0)
- A message may not be delivered.
- AT_LEAST_ONCE (=1)
- A message is always delivered but may be delivered many times.
- EXACTLY_ONCE (=2)
- A message is always delivered only once.
- client_id
- Client name.
- If any of DEFAULT_CLIENT_ID, None, or an empty string is specified, the library will automatically generate a value.
- value_type
- The type name of message payload.
- ‘MessageWriter.publish()’ will treat the given data as the type specified here.
- When using the standard package, the following two type names are supported.
- Set ‘“byte_array”’ (default) to treat the payload as ‘bytes’.
- Set ‘“text”’ to treat the payload as ‘str’.
- When using a plugin pacakge, other type names may be supported.
- When using the image type plugin provided with SINETStream, the following type name is supported.
- Set ‘“image”’ to treat the payload as ‘numpy.ndarray’, which is the image data type in OpenCV.
- The color order in ‘numpy.ndarray’ is BGR, which is consistent with OpenCV.
- value_serializer
- The function used to encode the value to the byte array in the message.
- If not specified, an appropriate serializer function will be used according to ‘value_type’.
- data_encryption
- Enable or disable message encryption and decryption.
- kwargs
- Specify the messaging system-specific parameters as YAML mappings.
The parameters specified in ‘kwargs’ are passed to the constructor of the backend messaging system. Please refer to the Messaging system-specific parameters for details.
For the arguments other than service, their default values can be specified in the configuration file. If the same parameter is specified in both the configuration file and the constructor argument, the value specified in the constructor argument takes precedence.
** Limitations: SINETStream downgrades to ‘AT_LEAST_ONCE’ even if ‘EXACTLY_ONCE’ is specified for Kafka ‘consistency’. **
Exception
- NoServiceError
- The service corresponding to the value specified for service does not exist in the configuration file
- NoConfigError
- The configuration file does not exist or cannot be read
- InvalidArgumentError
- The format of the specified argument is incorrect. When the value of consistency is out of range or a character string that is not allowed as a topic name
- UnsupportedServiceTypeError
- The plugin for the messaging system corresponding to type specified in the configuration file is not installed
Properties
The following properties can be used to get the parameter value specified in the configuration file or in the constructor.
client_id
consistency
topic
value_type
AsyncMessageWriter.open()
Connects to the broker of the messaging system. Implicitly invoked when using ‘AsyncMessageWriter’ in a with statement; not intended for explicit invocation.
Returned value
A handler that mentains the connection status with the messaging system
Exception
- ConnectionError
- Error connecting to the broker
- AlreadyConnectedError
- ‘open()’ is called again for an object that is already connected
‘AsyncMessageWriter.close()’
Disconnects from the broker of the messaging system. Implicitly invoked when using ‘AsyncMessageWriter’ in a with statement; not intended for explicit invocation.
‘AsyncMessageWriter.publish(message)’
Sends a message to the broker of the messaging system. The message is serialized according to the ‘value_type’ parameter or using the function specified by ‘value_serializer’ of AsyncMessageWriter and then sent it to broker.
‘publish(message)’ is an asynchronous process and returns a promise object of promise. By using the methods ‘.then()’ and ‘.catch()’ of the Promise object, It is possible to set processing according to the transmission result (success or failure). A usage example is shown as below.
with AsyncMessageWriter('service-1') as writer:
writer.publish("message 1").then(lambda _: print("success")).catch(lambda _: print("failure"))
Exception
- InvalidMessageError
- The type of message does not match ‘value_type’ or the function specified by ‘value_serializer’.
- AuthorizationError
- Tried to send messages to an unauthorized topic.
Depending on the messaging system, even if an unauthorized operation is performed, an AuthorizationError Exception may not occur in the following cases:
- When using MQTT (Mosquitto)
- Because the MQTT broker raises no error for unauthorized operation.
- When using Kafka with ‘Consistency’ set to ‘AT_MOST_ONCE’
- Because the client does not wait for a response from the broker after sending a message. Therefore, the client cannot detect an error on the broker side.
2.5 Message class
The wrapper class for the message object provided by the messaging system.
Property
All the properties are read only.
- value
- The payload of the message (given by
Raw.value
for Kafka andraw.payload
for MQTT). - By default, a byte sequence is obtained.
- The type of
value
depends on thevalue_type
parameter or thevalue_deserializer
function specified in MessageWriter.- If
value_type
is"byte_array"
(default), the type isbytes
. - If
value_type
is"text"
, the type isstr
. - If
value_deserializer
is specified, an object converted by this function is obtained.
- If
- The payload of the message (given by
- topic
- The topic name.
- timestamp
- The time the message was sent (UNIX time)
- Unit: second
- Type: float
0
indicates no time is set
- The time the message was sent (UNIX time)
- timestamp_us
- The time the message was sent (UNIX time)
- Unit: microsecond
- Type: int
0
indicates no time is set
- The time the message was sent (UNIX time)
- raw
- The message object provided by the messaging system.
2.6 Metrics Class
Metrics class You can get metrics information by referencing the metrics property for Reader/Writer objects. After close() Reader/Writer objects, you can get metrics information when it is closed (but raw is None).
- MessageReader.metrics
- MessageWriter.metrics
- AsyncMessageReader.metrics
- AsyncMessageWriter.metrics
The Reader/Writer metrics are reset when the reset_metrics() method was called from the Reader/Writer class.
If the reset_raw
argument is set to True, the metrics of the backend messaging system will also be reset if possible.
- MessageReader.reset_metrics(reset_raw=False)
- MessageWriter.reset_metrics(reset_raw=False)
- AsyncMessageReader.reset_metrics(reset_raw=False)
- AsyncMessageWriter.reset_metrics(reset_raw=False)
Eclipse Paho, an MQTT client library used in the SINETStream MQTT plugin, does not provide metrics collection capability. The Kafka client library has the capability, but does not provide the reset function.
The metrics are measured at the boundary of the SINETStream main library and the specified messaging system plugin. Therefore, a stream of encrypted massages will be measured if the data encryption function provided by SINETStream is used.
Property
- start_time, start_time_ms
- float
- The Unix time when the measurement was started.
- The unit of the start_time is seconds.
- The unit of the start_time_ms is milliseconds.
- The time when the Reader/Writer object was created or reset.
- end_time, end_time_ms
- float
- The Unix time when the measurement was completed.
- The unit of the end_time is seconds.
- The unit of the end_time_ms is milliseconds.
- The time referenced in the metrics property.
- time, time_ms
- float
- Measurement time (= EndTime - StartTime).
- The unit of the time is seconds.
- The unit of the time_ms is milliseconds.
- = end_time - start_time
- msg_count_total
- int
- The cumulative number of messages sent and received.
- msg_count_rate
- float
- The rate of the number of messages sent and received.
- = msg_count_total / time
- return 0 if time is 0.
- msg_bytes_total
- int
- The Cumulative amount of messages sent and received in bytes.
- msg_bytes_rate
- float
- The rate of the amount of messages sent and received.
- = msg_bytes_total / time
- return 0 if time is 0.
- msg_size_min
- int
- The minimum size of messages sent and received in bytes.
- msg_size_avg
- float
- The average size of messages sent and received in bytes.
- = msg_bytes_total / msg_count_total
- return 0 if msg_count_total is 0.
- msg_size_max
- int
- The maximum size of messages sent and received in bytes.
- error_count_total
- int
- The cumulative number of errors.
- error_count_rate
- float
- The error rate.
- = error_count_total / time
- return 0 if time is 0.
- raw
- The metrics provided by the specified messaging system client library.
Examples
Display the number of received messages and its amount in bytes:
from sinetstream import MessageReader
reader = MessageReader('service-1', 'topic-001')
# (1)
with reader as f:
for msg in f:
pass
m = reader.metrics # Statistics on the accumulation from (1)
print(f'COUNT: {m.msg_count_total}')
print(f'BYTES: {m.msg_bytes_total}')
Display the receive rate for every 10 messages:
from sinetstream import MessageReader
reader = MessageReader('service-1', 'topic-001')
with reader as f:
count = 0
for msg in f:
count += 1
if (count == 10):
count = 0
m = reader.metrics
reader.reset_metrics()
print(f'COUNT/s: {m.msg_count_rate}')
print(f'BYTES/s: {m.msg_bytes_rate}')
2.7 Summary of exception
exception | origin method | reason |
---|---|---|
NoServiceError |
MessageReader() , MessageWriter() , AsyncMessageReader() , AsyncMessageWriter() |
The specified service name is not defined in the configuration file. |
UnsupportedServiceTypeError |
MessageReader() , MessageWriter() , AsyncMessageReader() , AsyncMessageWriter() |
The specified service type is not supported or the corresponding plugin is not installed. |
NoConfigError |
MessageReader() , MessageWriter() , AsyncMessageReader() , AsyncMessageWriter() |
The configuration file does not exist. |
InvalidArgumentError |
MessageReader() , MessageWriter() , AsyncMessageReader() , AsyncMessageWriter() , MessageReader.open() , MessageWriter.open() , MessageWriter.publish() , AsyncMessageReader().open() , AsyncMessageWriter().open() |
The argument is incorrect. |
ConnectionError |
MessageReader.open() , MessageWriter.open() , MessageWriter.publish() , AsyncMessageReader().open() , AsyncMessageWriter().open() |
Error connecting to the broker. |
AlreadyConnectedError |
MessageReader.open() , MessageWriter.open() , AsyncMessageReader().open() , AsyncMessageWriter().open() |
Already connected to a broker. |
InvalidMessageError |
MessageWriter.publish() , MessageReader.__iter__().__next__() |
The message format is incorrect. |
AuthorizationError |
MessageWriter.publish() , MessageReader.__iter__().__next__() |
An unauthorized operation was conducted |
3. Messaging system-specific parameters
kwargs
can be used to transparently pass parameters to the backend messaging system.
The actual parameters that can be passed depend on the backend.
No validation is performed.
3.1 Apache Kafka
Basically, the constructor arguments to
KafkaConsumer and
KafkaProducer of
kafka-python can be specified as parameters.
If the parameter is valid only in KafkaConsumer
or KafkaProducer
, it affects MessageReader
or MessageWriter
, respectively.
3.2 MQTT (Eclipse Paho)
Basically, the constructor arguments and the setter function (XXX_set
) arguments of
paho.mqtt.client.Client
can be specified as parameters.
3.3 S3
4. How to show a cheat sheet
After installing SINETStream, run python3 -m sinetstream
to show a cheat sheet.
$ python3 -m sinetstream
==================================================
Default parameters:
MessageReader(
service=SERVICE, # Service name defined in the configuration file. (REQUIRED)
topics=TOPICS, # The topic to receive.
config=CONFIG, # Config name on the config-server.
consistency=AT_MOST_ONCE, # consistency: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE
client_id=DEFAULT_CLIENT_ID, # If not specified, the value is automatically generated.
value_type=BYTE_ARRAY, # The type of message.
value_deserializer=None # If not specified, use default deserializer according to valueType.
)
MessageWriter(
service=SERVICE, # Service name defined in the configuration file. (REQUIRED)
topic=TOPIC, # The topic to send.
config=CONFIG, # Config name on the config-server.
consistency=AT_MOST_ONCE, # consistency: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE
client_id=DEFAULT_CLIENT_ID, # If not specified, the value is automatically generated.
value_type=BYTE_ARRAY, # The type of message.
value_serializer=None # If not specified, use default serializer according to valueType.
)
AsyncMessageReader(
service=SERVICE, # Service name defined in the configuration file. (REQUIRED)
topics=TOPICS, # The topic to receive.
config=CONFIG, # Config name on the config-server.
consistency=AT_MOST_ONCE, # consistency: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE
client_id=DEFAULT_CLIENT_ID, # If not specified, the value is automatically generated.
value_type=BYTE_ARRAY, # The type of message.
value_deserializer=None # If not specified, use default deserializer according to valueType.
)
AsyncMessageWriter(
service=SERVICE, # Service name defined in the configuration file. (REQUIRED)
topic=TOPIC, # The topic to send.
config=CONFIG, # Config name on the config-server.
consistency=AT_MOST_ONCE, # consistency: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE
client_id=DEFAULT_CLIENT_ID, # If not specified, the value is automatically generated.
value_type=BYTE_ARRAY, # The type of message.
value_serializer=None # If not specified, use default serializer according to valueType.
)