English

チュートリアル - STEP1

1. 概要

このチュートリアルで実行するコンポーネントを以下の図に示します。

構成STEP1

はじめに」で示した各コンポーネントの役割を、以下に再掲します。

STEP1のチュートリアルでは、WriterReaderBroker の全てを一つのマシンで実行します。 具体的には、チュートリアルの実行環境で一つのコンテナを起動し、その中でチュートリアルの全てのコンポーネントを実行します。

前提条件

チュートリアルを実行するマシンに Docker Engine がインストールされている必要があります。Docker Engine のインストールについては「Docker Engine インストール手順へのリンク」などを参照してください。

実行例について

以降の説明では、コマンドの実行例を表記する箇所があります。 このチュートリアルでは Docker コンテナを利用するので、ホスト環境と、コンテナ環境の実行例を区別する必要があります。 そのために表記に以下の規則を設けます。

2. 実行環境を準備する

2.1. バックエンドシステムを準備する

SINETStreamが利用するバックエンドのメッセージングシステム(Kafka, MQTT)をdockerコンテナで実行します。

ホスト環境で以下のコマンドを実行してください。

$ docker run -d --name tutorial --hostname broker harbor.vcloud.nii.ac.jp/sinetstream/tutorial:1.8

コンテナが正常に起動したことを確認するために、状態を表示させます。

$ docker ps -l
CONTAINER ID        IMAGE                        COMMAND                  CREATED              STATUS              PORTS                NAMES
xxxxxxxxxxxx        sinetstream/tutorial:1.8   "/usr/local/bin/supe…"   About a minute ago   Up About a minute   1883/tcp, 9092/tcp   tutorial

STATUSUp と表示されていれば、コンテナが正常に起動しています。

起動したコンテナでは、SINETStreamが利用するメッセージングシステム Kafka, MQTT(Mosquitto) のブローカーが実行されています。

2.2. SINETStreamをインストールする

Reader, Writer が利用する SINETStream のPython3 ライブラリをコンテナ環境にインストールします。

まず、ホスト環境からコンテナ環境に入ります。 以下のコマンドを実行してください。

$ docker exec -it -u user01 tutorial bash

次にコンテナ環境で SINETStream のライブラリをインストールします。 以下のコマンドを実行してください。

[user01@broker]$ pip3 install --user sinetstream-kafka sinetstream-mqtt
Collecting sinetstream-kafka
(中略)
Successfully installed avro-python3-1.10.0 kafka-python-2.0.2 paho-mqtt-1.5.1 promise-2.3 pycryptodome-3.9.9 pyyaml-3.13 sinetstream-1.4.0 sinetstream-kafka-1.4.0 sinetstream-mqtt-1.4.0 six-1.15.0

最後に Successfully installed ...と表示されていれば、ライブラリのインストールに成功しています。 確認のためインストールされている Python3 ライブラリの一覧を表示してみます。

[user01@broker]$ pip3 list
Package           Version
----------------- --------
avro-python3      1.10.0
kafka-python      2.0.2
paho-mqtt         1.5.1
pip               20.2.4
promise           2.3
pycryptodome      3.9.9
PyYAML            3.13
setuptools        50.3.2
sinetstream       1.4.0
sinetstream-kafka 1.4.0
sinetstream-mqtt  1.4.0
six               1.15.0
supervisor        4.2.1

SINETStream以外のライブラリの Version 表示については上記と異なる場合があります。

2.3. Readerを準備する

コンテナ環境に Reader を準備します。 手順を以下に示します。

  1. Reader用のディレクトリを作成する
  2. SINETStream の設定ファイルを準備する
  3. Readerのプログラムを準備する

この節ではコンテナ環境にてコマンドの実行を行います。 2.2 でコンテナ環境の操作を行っていた状態が継続されていることを想定しています。

ディレクトリを作成し、そのディレクトリに移動します。

[user01@broker]$ mkdir -p ~/sinetstream/reader
[user01@broker]$ cd ~/sinetstream/reader

SINETStreamの設定ファイルを準備します。 このチュートリアルのための設定ファイルを GitHub に用意していますので、それを取得します。

[user01@broker]$ ss_url=https://raw.githubusercontent.com/nii-gakunin-cloud/sinetstream/main
[user01@broker]$ curl -O ${ss_url}/docs/tutorial/.sinetstream_config.yml

SINETStreamのPython3 APIを用いて作成されたReaderのサンプルプログラムをGitHubから取得します。 取得したプログラムには実行権限を付与します。

[user01@broker]$ curl -O ${ss_url}/python/sample/text/consumer.py
[user01@broker]$ chmod a+x consumer.py

ここまでの手順が正しく行われたことを確認します。 ディレクトリとファイルが以下の実行例と同じになっていることを確認してください。

[user01@broker]$ pwd
/home/user01/sinetstream/reader
[user01@broker]$ ls -a
.  ..  .sinetstream_config.yml  consumer.py

2.4. Writerを準備する

コンテナ環境に Writer を準備します。 手順を以下に示します。

  1. Writer用のディレクトリを作成する
  2. SINETStream の設定ファイルを準備する
  3. Writerのプログラムを準備する

ディレクトリを作成し、そのディレクトリに移動します。

[user01@broker]$ mkdir -p ~/sinetstream/writer
[user01@broker]$ cd ~/sinetstream/writer

GitHubからSINETStreamの設定ファイルを取得します。

[user01@broker]$ ss_url=https://raw.githubusercontent.com/nii-gakunin-cloud/sinetstream/main
[user01@broker]$ curl -O ${ss_url}/docs/tutorial/.sinetstream_config.yml

SINETStreamのPython3 APIを用いて作成されたWriterのサンプルプログラムをGitHubから取得します。 取得したプログラムには実行権限を付与します。

[user01@broker]$ curl -O ${ss_url}/python/sample/text/producer.py
[user01@broker]$ chmod a+x producer.py

ここまでの手順が正しく行われたことを確認します。 ディレクトリとファイルが以下の実行例と同じになっていることを確認してください。

[user01@broker]$ pwd
/home/user01/sinetstream/writer
[user01@broker]$ ls -a
.  ..  .sinetstream_config.yml  producer.py

3. Reader, Writerを実行する

ReaderWriterを実行して SINETStream を利用したメッセージの送受信が行えることを確認します。

SINETStreamで利用可能なメッセージングシステムには KafkaMQTT(Mosquitto) などがあります。 ここでは、まず Kafkaブローカーとメッセージの送受信が行えることを確認します。 その後、設定変更のみでプログラムを変更することなくMQTTブローカーともメッセージの送受信が行えることを確認します。

3.1. Kafkaブローカーとの間でメッセージの送受信を行う

ここからは、ReaderWriterのプログラムを同時に実行します。 それぞれを実行するためのターミナルをホスト環境で開いてください。

Readerの実行

Reader用のターミナルにて、ホスト環境からコンテナ環境に入ります。 以下のコマンドを実行してください。

$ docker exec -it -u user01 tutorial bash

Reader用のディレクトリに移動してください。

[user01@broker]$ cd ~/sinetstream/reader

Readerのプログラムを実行します。 引数に指定しているservice-tutorial-kafkaはKafkaブローカーを指定するサービス名です。

[user01@broker]$ ./consumer.py -s service-tutorial-kafka
Press ctrl-c to exit the program.
: service=service-tutorial-kafka

コマンドラインで指定したサービス名が : の後に表示されます。

Writerの実行

Writer用のターミナルにて、ホスト環境からコンテナ環境に入ります。 以下のコマンドを実行してください。

$ docker exec -it -u user01 tutorial bash

Writer用のディレクトリに移動してください。

[user01@broker]$ cd ~/sinetstream/writer

Writerのプログラムを実行します。 引数に指定しているservice-tutorial-kafkaはKafkaブローカーを指定するサービス名です。

[user01@broker]$ ./producer.py -s service-tutorial-kafka
Press ctrl-c to exit the program.
: service=service-tutorial-kafka

コマンドラインで指定したサービス名が : の後に表示されます。

メッセージの送受信

Writer用のターミナルにて、メッセージとなるテキストを入力し最後に改行を打ち込んでください。 改行までに入力された文字列がメッセージとして Kafka ブローカーに送信されます。

Readerのプログラムは Kafkaブローカーに送られたメッセージを受信してターミナルに表示しています。 先ほど Writer で送信したメッセージが表示されていることを確認してください。

Reader, Writer の停止

メッセージの送受信が行えたことを確認したら ReaderWriterのサンプルプログラムを停止させます。 それぞれのターミナルで ctrl-c を打ち込んでください。

3.2. MQTTブローカー(Mosquitto)との間でメッセージの送受信を行う

Kafkaブローカーと同じ操作を行い、MQTTブローカーを利用した場合もメッセージの送受信が行えることを確認します。 先ほどはプログラムの引数に Kafka ブローカーを指定するサービス名として service-tutorial-kafkaを指定しました。 ここでは代わりに MQTTブローカーを指定するためのサービス名 service-tutorial-mqttを指定します。

Readerの実行

Readerのターミナルにて、以下のコマンドを実行してください。

[user01@broker]$ ./consumer.py -s service-tutorial-mqtt
Press ctrl-c to exit the program.
: service=service-tutorial-mqtt

Readerのターミナルは、コンテナ環境のReader用ディレクトリにいることを想定しています。

Writerの実行

Writerのターミナルにて、以下のコマンドを実行してください。

[user01@broker]$ ./producer.py -s service-tutorial-mqtt
Press ctrl-c to exit the program.
: service=service-tutorial-mqtt

Writerターミナルは、コンテナ環境のWriter用ディレクトリにいることを想定しています。

メッセージの送受信

Kafkaブローカーの場合と同様の操作を行い、MQTTブローカーを用いた場合もメッセージの送受信が行えることを確認します。 Writerのターミナルから文字列を入力して改行を打ち込むとReaderのターミナルに入力した文字列に対応するメッセージが表示されることを確認してください。

Reader, Writer の停止

メッセージの送受信が行えたことを確認したら ReaderWriter のサンプルプログラムを停止します。 それぞれのターミナルで ctrl-c を打ち込んでください。

3.3. コンテナの停止、削除

最後にこのチュートリアルで使用したコンテナの停止、削除を行います。

コンテナの操作はホスト環境で実行します。 コンテナ環境から、ホスト環境に戻る場合は exit を実行してください。

[user01@broker]$ exit
exit
$

ホスト環境に戻ったあとで、以下のコマンドを実行してください。

$ docker stop tutorial
$ docker rm tutorial

4. SINETStreamについて

チュートリアルで使用した SINETStream の設定ファイルとAPIについて簡単な説明を行います。

4.1. 設定ファイル

Reader, Writer で使用した SINETStream の設定ファイル .sinetstream_config.yml の内容を以下に示します。

service-tutorial-kafka:
    type: kafka
    brokers: "broker:9092"
    topic: topic-tutorial-kafka
    value_type: text

service-tutorial-mqtt:
    type: mqtt
    brokers: "broker:1883"
    topic: topic-tutorial-mqtt
    value_type: text

設定ファイルは階層化したキー、バリューをYAMLの形式で記述しています。

トップレベルのキーには SINETStreamでサービス名と呼んでいる名前を記しています。 上記の例では service-tutorial-kafkaservice-tutorial-mqtt がこれにあたります。 サービス名はブローカーとの接続に関する種々のパラメータをまとめて扱うためのラベル名になります。 Reader, Writerのサンプルプログラムを実行する際に指定したサービス名は、設定ファイルに記述した、この値に対応しています。

サービス名の子要素にブローカーとの接続に関する具体的なパラメータを記述します。 サービス名service-tutorial-kafka に対応するパラメータは以下の部分になります。

    type: kafka
    brokers: "broker:9092"
    topic: topic-tutorial-kafka
    value_type: text

各パラメータに関する簡単な説明を以下に記します。

設定ファイルに関する詳細についてはユーザガイドを参照してください。

4.2. SINETStream API

Reader

Readerのサンプルプログラムconsumer.py で SINETStream API を使用している箇所を以下に示します。

with MessageReader(service) as reader:
    for message in reader:
        print(f"topic={message.topic} value='{message.value}'")

サンプルプログラムconsumer.py 全体のコードはGitHubで確認できます。

はじめにメッセージを受信するための MessageReader のオブジェクトを作成します。 その際、引数としてサービス名を指定します。 MessageReaderは通常Pythonのwith文で実行します。 これにより、ブローカーとの接続、切断が with文のブロックの境界で実行されます。 with文が返した値 reader はイテラブルなオブジェクトになっています。 for文などによりreaderから順次取得した値が、ブローカーから受信したメッセージとなります。

Writer

Writerのサンプルプログラムproducer.py で SINETStream API を使用している箇所を以下に示します。

with MessageWriter(service) as writer:
    while True:
        message = get_message()
        writer.publish(message)

サンプルプログラムproducer.py 全体のコードはGitHubで確認できます。

メッセージを送信するための MessageWriter のオブジェクトを作成します。 その際、引数としてサービス名を指定します。 MessageWriterは通常Pythonのwith文で実行します。 これにより、ブローカーとの接続、切断が with文のブロックの境界で実行されます。 with文が返した値 writer に対して .publish(message)を呼び出すことでメッセージをブローカーに送信することができます。

SINETStreamのPython APIに関する詳細についてはユーザガイドを参照してください。

4.3. SINETStream API(非同期API)

SINETStream v1.4 では非同期APIを利用することができます。 ここまで記してきたのと同様の処理を非同期APIで実行するサンプルプログラムをGithubに用意しています。

Reader(非同期API)

非同期APIを用いたReaderのサンプルプログラムconsumer.pyでSINETStream APIを使用している箇所を以下に示します。

reader = AsyncMessageReader(service)
reader.on_message = show_message
reader.open()

はじめにメッセージを受信するための AsyncMessageReader のオブジェクトを作成します。 その際、引数としてサービス名を指定します。

次に.on_messageプロパティにメッセージを受信した際に呼び出すコールバック関数を指定します。 コールバック関数は引数から受信したメッセージを受け取ることができます。 サンプルプログラムでは以下のようなコールバック関数を用いています。

def show_message(message):
    ts = datetime.fromtimestamp(message.timestamp)
    print(f"[{ts}] topic={message.topic} value='{message.value}'")

最後にreader.open()を呼び出すことでブローカーに接続されます。

Writer(非同期API)

非同期APIを用いたWriterのサンプルプログラムproducer.pyでSINETStream APIを使用している箇所を以下に示します。

with AsyncMessageWriter(service) as writer:
    while True:
        message = get_message()
        writer.publish(message)

メッセージを送信するための AsyncMessageWriter のオブジェクトを作成します。 その際、引数としてサービス名を指定します。 AsyncMessageWriterは通常Pythonのwith文で実行します。 これにより、ブローカーとの接続、切断が with文のブロックの境界で実行されます。 with文が返した値 writer に対して .publish(message)を呼び出すことでメッセージをブローカーに送信することができます。

同期APIでは.publish()が送信処理の完了までブロックしますが、非同期APIの場合はブロックすることなく返ります。 また非同期APIの.publish()は処理結果が確定した後の処理を指定するための Promiseオブジェクトを返します。