English

チュートリアル - STEP2

1. 概要

このチュートリアルで実行するコンポーネントを以下の図に示します。

構成STEP2

はじめに」で示した各コンポーネントの役割を、以下に再掲します。

STEP1のチュートリアルでは、WriterReaderBroker を同一のマシンで実行しましたが、 このチュートリアルでは、WriterReaderBroker を別々のマシンで実行します。

前提条件

  1. チュートリアルを実行するマシン(Writer, Reader, Broker)に Docker Engine がインストールされていること
  2. Brokerとなるマシンで TCP/1883, TCP/9092 のポートが利用可能なこと
    • ブローカーがTCPの待ち受けポートとして利用します
  3. Writer, ReaderのマシンからBrokerのマシンの TCP/1883, TCP/9092 にアクセス可能なこと
    • ファイアウォールなどで通信がブロックされないように設定してください

実行例について

このチュートリアルではWriter, Reader, Broker が別のマシンになり、それぞれにホスト環境、コンテナ環境があるので6つの環境での実行例がでてきます。 実行例におけるホスト名、ユーザ名などを以下の表のように定めます。

役割 環境 ホスト名 ユーザ名 IPアドレス
Broker ホスト環境 host-broker user00 192.168.1.XXX
Reader ホスト環境 host-reader user00 -
Writer ホスト環境 host-writer user00 -
Broker コンテナ環境 broker user01 -
Reader コンテナ環境 reader user01 -
Writer コンテナ環境 writer user01 -

ホスト環境の値については、実際にチュートリアルを実行する環境に合わせて適宜読み替えてください。

実行例を示す際は、コマンドプロンプトにホスト名、ユーザ名を示すことで、どの環境で実行しているのかを区別できるように表記しています。 たとえば、Writerのコンテナ環境での実行例は以下のように表記します。

[user01@writer]$ ls

プロンプトの[] の中の@の前の部分がユーザ名を、後の部分がホスト名を表しています。

Writer のホスト環境の場合は以下のようになります。

[user00@host-writer]$ ls

2. 実行環境を準備する

Broker, Reader, Writer のそれぞれの環境を順に準備します。 それぞれのマシンのターミナルを開き操作を行ってください。

2.1. Brokerを準備する

2.1.1. バックエンドシステムを準備する

SINETStreamが利用するバックエンドのメッセージングシステム(Kafka, MQTT)をdockerコンテナで実行します。

Brokerのホスト環境で以下のコマンドを実行してください。

[user00@host-broker]$ docker run -d --name broker --hostname broker \
                      -p 1883:1883 -p 9092:9092 harbor.vcloud.nii.ac.jp/sinetstream/tutorial:1.8

コンテナが正常に起動したことを確認するために、状態を表示させます。

[user00@host-broker]$ docker ps -l
CONTAINER ID        IMAGE                        COMMAND                  CREATED              STATUS              PORTS                                            NAMES
xxxxxxxxxxxx        sinetstream/tutorial:1.8   "/usr/local/bin/supe…"   About a minute ago   Up About a minute   0.0.0.0:1883->1883/tcp, 0.0.0.0:9092->9092/tcp   broker

STATUSUp と表示されていれば、コンテナが正常に起動しています。

起動したコンテナでは、SINETStreamが利用するメッセージングシステム Kafka, MQTT(Mosquitto) のブローカーが実行されています。 コンテナで実行しているプロセスを確認してみると以下のようになります。

[user00@host-broker]$ docker exec -t broker ps ax
  PID TTY      STAT   TIME COMMAND
    1 ?        Ss     0:00 /usr/bin/python3 /usr/local/bin/supervisord -n -c /et
    9 ?        Sl     0:05 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPaus
   10 ?        S      0:00 /usr/sbin/mosquitto -c /etc/mosquitto/mosquitto.conf
   12 ?        Sl     0:01 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGC
  822 pts/0    Rs+    0:00 ps ax

上記の実行例に表示された、それぞれのプロセスの役割を以下に示します。

2.2. Readerを準備する

Readerを実行する環境をコンテナで用意します。

2.2.1. Readerの実行環境となるコンテナを起動する

Readerのホスト環境で以下のコマンドを実行してください。

[user00@host-reader]$ docker run -d --name reader --hostname reader -e ENABLE_BROKER=false \
                      --add-host=broker:192.168.1.xxx harbor.vcloud.nii.ac.jp/sinetstream/tutorial:1.8

192.168.1.XXXには実際に使用する環境のBrokerのIPアドレスを指定してください。

コンテナが正常に起動したことを確認するために、状態を表示させます。

[user00@host-reader]$ docker ps -l
CONTAINER ID        IMAGE                        COMMAND                  CREATED             STATUS              PORTS                NAMES
xxxxxxxxxxxx        sinetstream/tutorial:1.8   "/usr/local/bin/supe…"   About a minute ago  Up About a minute   1883/tcp, 9092/tcp   reader

STATUSUp と表示されていれば、コンテナが正常に起動しています。

ここで実行したコンテナイメージはBrokerと同じものですが、コンテナ起動時に-e ENABLE_BROKER=falseを指定することで、ブローカーが実行されないようになっています。 コンテナ内のプロセスの一覧を表示してそのことを確認してみます。

[user00@host-reader]$ docker exec -t reader ps ax
  PID TTY      STAT   TIME COMMAND
    1 ?        Ss     0:00 /usr/bin/python3 /usr/local/bin/supervisord -n -c /et
   30 pts/0    Rs+    0:00 ps ax

Brokerコンテナでプロセス一覧を確認した時の結果と異なり、Kafkaブローカー、MQTTブローカー、ZooKeeperが実行されていないことがわかります。

Reader用のコンテナを起動する際に指定した --add-hostReaderコンテナの /etc/hosts に、Broker の IPアドレスを登録するためのものです。 Kafkaブローカーを利用するためにはサーバアドレスの名前解決が必要となるため、このパラメータの指定を追加しています。 Readerコンテナの /etc/hosts を表示して Broker のIPアドレスが登録されていることを確認します。

[user00@host-reader]$ docker exec -t reader cat /etc/hosts
127.0.0.1       localhost
::1     localhost ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
192.168.1.XXX   broker
172.17.0.3      reader

192.168.1.XXXにはコンテナ起動時に指定したBrokerのIPアドレスが表示されます。また reader のIPアドレスは実行環境によって異なる値が表示されます。

2.2.2. SINETStreamをインストールする

SINETStreamのPython3ライブラリをコンテナ環境にインストールします。 まずReaderのホスト環境からコンテナ環境にはいります。

[user00@host-reader]$ docker exec -it -u user01 reader bash

次にコンテナ環境で SINETStream のライブラリをインストールします。 以下のコマンドを実行してください。

[user01@reader]$ pip3 install --user sinetstream-kafka sinetstream-mqtt
Collecting sinetstream-kafka
(中略)
Successfully installed avro-python3-1.10.0 kafka-python-2.0.2 paho-mqtt-1.5.1 promise-2.3 pycryptodome-3.9.9 pyyaml-3.13 sinetstream-1.4.0 sinetstream-kafka-1.4.0 sinetstream-mqtt-1.4.0 six-1.15.0

最後に Successfully installed ...と表示されていれば、ライブラリのインストールに成功しています。 確認のためインストールされている Python3 ライブラリの一覧を表示してみます。

[user01@reader]$ pip3 list
Package           Version
----------------- --------
avro-python3      1.10.0
kafka-python      2.0.2
paho-mqtt         1.5.1
pip               20.2.4
promise           2.3
pycryptodome      3.9.9
PyYAML            3.13
setuptools        50.3.2
sinetstream       1.4.0
sinetstream-kafka 1.4.0
sinetstream-mqtt  1.4.0
six               1.15.0
supervisor        4.2.1

SINETStream以外のライブラリの Version 表示については上記と異なる場合があります。

2.2.3. Readerのプログラムと設定ファイルを準備する

手順を以下に示します。

  1. Reader用のディレクトリを作成する
  2. SINETStream の設定ファイルを準備する
  3. Readerのプログラムを準備する

この節ではコンテナ環境にてコマンドの実行を行います。2.2.2 でコンテナ環境の操作を行っていた状態が継続されていることを想定しています。

ディレクトリを作成し、そのディレクトリに移動します。

[user01@reader]$ mkdir -p ~/sinetstream/reader
[user01@reader]$ cd ~/sinetstream/reader

SINETStreamの設定ファイルを準備します。 このチュートリアルのための設定ファイルを GitHub に用意していますので、それを取得します。

[user01@reader]$ ss_url=https://raw.githubusercontent.com/nii-gakunin-cloud/sinetstream/main
[user01@reader]$ curl -O ${ss_url}/docs/tutorial/.sinetstream_config.yml

SINETStreamのPython3 APIを用いて作成されたReaderのサンプルプログラムをGitHubから取得します。 取得したプログラムには実行権限を付与します。

[user01@reader]$ curl -O ${ss_url}/python/sample/text/consumer.py
[user01@reader]$ chmod a+x consumer.py

ここまでの手順が正しく行われたことを確認します。 ディレクトリとファイルが以下の実行例と同じになっていることを確認してください。

[user01@reader]$ pwd
/home/user01/sinetstream/reader
[user01@reader]$ ls -a
.  ..  .sinetstream_config.yml  consumer.py

2.3. Writerを準備する

Writerを実行する環境をコンテナで用意します。

2.3.1. Writerの実行環境となるコンテナを起動する

Writerのホスト環境で以下のコマンドを実行してください。

[user00@host-writer]$ docker run -d --name writer --hostname writer -e ENABLE_BROKER=false \
                      --add-host=broker:192.168.1.xxx harbor.vcloud.nii.ac.jp/sinetstream/tutorial:1.8

192.168.1.XXXには実際に使用する環境のBrokerのIPアドレスを指定してください。

コンテナが正常に起動したことを確認するために、状態を表示させます。

[user00@host-writer]$ docker ps -l
CONTAINER ID        IMAGE                        COMMAND                  CREATED             STATUS              PORTS                NAMES
xxxxxxxxxxxx        sinetstream/tutorial:1.8   "/usr/local/bin/supe…"   About a minute ago  Up About a minute   1883/tcp, 9092/tcp   writer

STATUSUp と表示されていれば、コンテナが正常に起動しています。

Readerコンテナの場合と同様、起動時に -e ENABLE_BROKER=falseを指定したので、コンテナ内ではブローカーが実行されません。 そのことを確認します。

[user00@host-writer]$ docker exec -t writer ps ax
  PID TTY      STAT   TIME COMMAND
    1 ?        Ss     0:00 /usr/bin/python3 /usr/local/bin/supervisord -n -c /et
   31 pts/0    Rs+    0:00 ps ax

また--add-host の指定により Writerコンテナの /etc/hosts に Broker のIPアドレスが登録されていることを確認します。

[user00@host-writer]$ docker exec -t writer cat /etc/hosts
127.0.0.1       localhost
::1     localhost ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
192.168.1.XXX   broker
172.17.0.4      writer

192.168.1.XXXにはコンテナ起動時に指定したBrokerのIPアドレスが表示されます。また writer のIPアドレスは実行環境によって異なる値が表示されます。

2.3.2. SINETStreamをインストールする

SINETStreamのPython3ライブラリをコンテナ環境にインストールします。 まずWriterのホスト環境からコンテナ環境にはいります。

[user00@host-writer]$ docker exec -it -u user01 writer bash

次にコンテナ環境で SINETStream のライブラリをインストールします。 以下のコマンドを実行してください。

[user01@writer]$ pip3 install --user sinetstream-kafka sinetstream-mqtt
Collecting sinetstream-kafka
(中略)
Successfully installed avro-python3-1.10.0 kafka-python-2.0.2 paho-mqtt-1.5.1 promise-2.3 pycryptodome-3.9.9 pyyaml-3.13 sinetstream-1.4.0 sinetstream-kafka-1.4.0 sinetstream-mqtt-1.4.0 six-1.15.0

最後に Successfully installed ...と表示されていれば、ライブラリのインストールに成功しています。 確認のためインストールされている Python3 ライブラリの一覧を表示してみます。

[user01@writer]$ pip3 list
Package           Version
----------------- --------
avro-python3      1.10.0
kafka-python      2.0.2
paho-mqtt         1.5.1
pip               20.2.4
promise           2.3
pycryptodome      3.9.9
PyYAML            3.13
setuptools        50.3.2
sinetstream       1.4.0
sinetstream-kafka 1.4.0
sinetstream-mqtt  1.4.0
six               1.15.0
supervisor        4.2.1

SINETStream以外のライブラリの Version 表示については上記と異なる場合があります。

2.3.3. Writerのプログラムと設定ファイルを準備する

手順を以下に示します。

  1. Writer用のディレクトリを作成する
  2. SINETStream の設定ファイルを準備する
  3. Writerのプログラムを準備する

この節ではコンテナ環境にてコマンドの実行を行います。2.3.2 でコンテナ環境の操作を行っていた状態が継続されていることを想定しています。

ディレクトリを作成し、そのディレクトリに移動します。

[user01@writer]$ mkdir -p ~/sinetstream/writer
[user01@writer]$ cd ~/sinetstream/writer

SINETStreamの設定ファイルを準備します。 このチュートリアルのための設定ファイルを GitHub に用意していますので、それを取得します。

[user01@writer]$ ss_url=https://raw.githubusercontent.com/nii-gakunin-cloud/sinetstream/main
[user01@writer]$ curl -O ${ss_url}/docs/tutorial/.sinetstream_config.yml

SINETStreamのPython3 APIを用いて作成されたWriterのサンプルプログラムをGitHubから取得します。 取得したプログラムには実行権限を付与します。

[user01@writer]$ curl -O ${ss_url}/python/sample/text/producer.py
[user01@writer]$ chmod a+x producer.py

ここまでの手順が正しく行われたことを確認します。 ディレクトリとファイルが以下の実行例と同じになっていることを確認してください。

[user01@writer]$ pwd
/home/user01/sinetstream/writer
[user01@writer]$ ls -a
.  ..  .sinetstream_config.yml  producer.py

3. Reader, Writerを実行する

ReaderWriterを実行して SINETStream を利用したメッセージの送受信が行えることを確認します。

SINETStreamで利用可能なメッセージングシステムは KafkaMQTT(Mosquitto) などがあります。 ここでは、まず Kafkaブローカーとメッセージの送受信が行えることを確認します。 その後、設定変更のみでプログラムを変更することなくMQTTブローカーともメッセージの送受信が行えることを確認します。

3.1. Kafkaブローカーとの間でメッセージの送受信を行う

ここからは、ReaderWriterのプログラムを同時に実行します。 実行するためのターミナルをそれぞれのマシンで開いてください。

Readerの実行

Readerのターミナルにて、ホスト環境からコンテナ環境に入ります。 以下のコマンドを実行してください。

既にコンテナ環境に入っている場合は実行する必要はありません。

[user00@host-reader]$ docker exec -it -u user01 reader bash

Reader用のディレクトリに移動してください。

[user01@reader]$ cd ~/sinetstream/reader

Readerのプログラムを実行します。 引数に指定しているservice-tutorial-kafkaはKafkaブローカーを指定するサービス名です。

[user01@reader]$ ./consumer.py -s service-tutorial-kafka
Press ctrl-c to exit the program.
: service=service-tutorial-kafka

コマンドラインで指定したサービス名が : の後に表示されます。

Writerの実行

Writerのターミナルにて、ホスト環境からコンテナ環境に入ります。 以下のコマンドを実行してください。

既にコンテナ環境に入っている場合は実行する必要はありません。

[user00@host-writer]$ docker exec -it -u user01 writer bash

Writer用のディレクトリに移動してください。

[user01@writer]$ cd ~/sinetstream/writer

Writerのプログラムを実行します。 引数に指定しているservice-tutorial-kafkaはKafkaブローカーを指定するサービス名です。

[user01@writer]$ ./producer.py -s service-tutorial-kafka
Press ctrl-c to exit the program.
: service=service-tutorial-kafka

コマンドラインで指定したサービス名が : の後に表示されます。

メッセージの送受信

Writerのターミナルにて、メッセージとなるテキストを入力し最後に改行を打ち込んでください。 改行までに入力された文字列がメッセージとして Kafka ブローカーに送信されます。

Readerのプログラムは Kafkaブローカーに送られたメッセージを受信してターミナルに表示します。 先ほど Writer で送信したメッセージが表示されていることを確認してください。

メッセージがブローカーによって配送されていることを確認する

Writerから送信したメッセージが Broker によってReaderに配送されていることを確認するために、ブローカーを一時的に停止してみます。

Brokerのホスト環境で以下のコマンドを実行しBrokerコンテナを停止させてください。

[user00@host-broker]$ docker stop broker

Brokerコンテナが停止した状態で Writer のターミナルから実行中のサンプルプログラム producer.pyでメッセージの送信を行ってください。 Kafkaブローカーが停止しているため、Reader側でメッセージの受信ができずWriter側で入力した文字列が表示されないことが確認できます。

確認が済んだらBrokerのホスト環境で以下のコマンドを実行してBrokerコンテナを再開させてください。

[user00@host-broker]$ docker start broker

Reader, Writer の停止

ReaderWriter のサンプルプログラムを停止します。 サンプルプログラムを実行しているそれぞれのターミナルで ctrl-c を打ち込んでください。

3.2. MQTTブローカー(Mosquitto)との間でメッセージの送受信を行う

Kafkaブローカーと同じ操作を行い、MQTTブローカーを利用した場合もメッセージの送受信が行えることを確認します。 先ほどはプログラムの引数に Kafka ブローカーを指定するサービス名として service-tutorial-kafkaを指定しました。 ここでは代わりに MQTTブローカーを指定するためのサービス名 service-tutorial-mqttを指定します。

Readerの実行

Readerのターミナルにて、以下のコマンドを実行してください。

[user01@reader]$ ./consumer.py -s service-tutorial-mqtt
Press ctrl-c to exit the program.
: service=service-tutorial-mqtt

Readerのターミナルでは、コンテナ環境のReader用ディレクトリにいることを想定しています。

Writerの実行

Writerのターミナルにて、以下のコマンドを実行してください。

[user01@writer]$ ./producer.py -s service-tutorial-mqtt
Press ctrl-c to exit the program.
: service=service-tutorial-mqtt

Writerのターミナルでは、コンテナ環境のWriter用ディレクトリにいることを想定しています。

メッセージの送受信

Kafkaブローカーの場合と同様の操作を行い、MQTTブローカーを用いた場合もメッセージの送受信が行えることを確認します。 Writerのターミナルから文字列を入力して改行を打ち込むとReaderのターミナルに入力した文字列に対応するメッセージが表示されることを確認してください。

Reader, Writer の停止

メッセージの送受信が行えたことを確認したら ReaderWriter のサンプルプログラムを停止します。 それぞれのターミナルで ctrl-c を打ち込んでください。

3.3. コンテナの停止、削除

最後にこのチュートリアルで使用したコンテナの停止、削除を行います。

コンテナの操作はホスト環境で実行します。 コンテナ環境からホスト環境に戻る場合は exit を実行してください。 例えば Readerのターミナルでコンテナ環境からホスト環境に戻る場合、以下のようになります。

[user01@reader]$ exit
exit
[user00@host-reader]$

ホスト環境に戻ったら、それぞれのマシンで以下のコマンドを実行してください。

Broker

[user00@host-broker]$ docker stop broker
[user00@host-broker]$ docker rm broker

Reader

[user00@host-reader]$ docker stop reader
[user00@host-reader]$ docker rm reader

Writer

[user00@host-writer]$ docker stop writer
[user00@host-writer]$ docker rm writer

4. SINETStreamについて

チュートリアルで使用した SINETStream の設定ファイルとAPIについて簡単な説明を行います。

この章の記述は STEP1 のものと全く同じ内容になっています。

4.1. 設定ファイル

Reader, Writer で使用した SINETStream の設定ファイル .sinetstream_config.yml の内容を以下に示します。

service-tutorial-kafka:
    type: kafka
    brokers: "broker:9092"
    topic: topic-tutorial-kafka
    value_type: text

service-tutorial-mqtt:
    type: mqtt
    brokers: "broker:1883"
    topic: topic-tutorial-mqtt
    value_type: text

設定ファイルは階層化したキー、バリューをYAMLの形式で記述しています。

トップレベルのキーには SINETStreamでサービス名と呼んでいる名前を記しています。 上記の例では service-tutorial-kafkaservice-tutorial-mqtt がこれにあたります。 サービス名はブローカーとの接続に関する種々のパラメータをまとめて扱うためのラベル名になります。 Reader, Writerのサンプルプログラムを実行する際に指定したサービス名は、設定ファイルに記述した、この値に対応しています。

サービス名の子要素にブローカーとの接続に関する具体的なパラメータを記述します。 サービス名service-tutorial-kafka に対応するパラメータは以下の部分になります。

    type: kafka
    brokers: "broker:9092"
    topic: topic-tutorial-kafka
    value_type: text

各パラメータに関する簡単な説明を以下に記します。

設定ファイルに関する詳細についてはユーザガイドを参照してください。

4.2. SINETStream API

Reader

Readerのサンプルプログラムconsumer.py で SINETStream API を使用している箇所を以下に示します。

with MessageReader(service) as reader:
    for message in reader:
        print(f"topic={message.topic} value='{message.value}'")

サンプルプログラムconsumer.py 全体のコードはGitHubで確認できます。

はじめにメッセージを受信するための MessageReader のオブジェクトを作成します。 その際、引数としてサービス名を指定します。 MessageReaderは通常Pythonのwith文で実行します。 これにより、ブローカーとの接続、切断が with文のブロックの境界で実行されます。 with文が返した値 reader はイテラブルなオブジェクトになっています。 for文などによりreaderから順次取得した値が、ブローカーから受信したメッセージとなります。

Writer

Writerのサンプルプログラムproducer.py で SINETStream API を使用している箇所を以下に示します。

with MessageWriter(service) as writer:
    while True:
        message = get_message()
        writer.publish(message)

サンプルプログラムproducer.py 全体のコードはGitHubで確認できます。

メッセージを送信するための MessageWriter のオブジェクトを作成します。 その際、引数としてサービス名を指定します。 MessageWriterは通常Pythonのwith文で実行します。 これにより、ブローカーとの接続、切断が with文のブロックの境界で実行されます。 with文が返した値 writer に対して .publish(message)を呼び出すことでメッセージをブローカーに送信することができます。

SINETStreamのPython APIに関する詳細についてはユーザガイドを参照してください。

4.3. SINETStream API(非同期API)

SINETStream v1.4 では非同期APIを利用することができます。 ここまで記してきたのと同様の処理を非同期APIで実行するサンプルプログラムをGithubに用意しています。

Reader(非同期API)

非同期APIを用いたReaderのサンプルプログラムconsumer.pyでSINETStream APIを使用している箇所を以下に示します。

reader = AsyncMessageReader(service)
reader.on_message = show_message
reader.open()

はじめにメッセージを受信するための AsyncMessageReader のオブジェクトを作成します。 その際、引数としてサービス名を指定します。

次に.on_messageプロパティにメッセージを受信した際に呼び出すコールバック関数を指定します。 コールバック関数は引数から受信したメッセージを受け取ることができます。 サンプルプログラムでは以下のようなコールバック関数を用いています。

def show_message(message):
    ts = datetime.fromtimestamp(message.timestamp)
    print(f"[{ts}] topic={message.topic} value='{message.value}'")

最後にreader.open()を呼び出すことでブローカーに接続されます。

Writer(非同期API)

非同期APIを用いたWriterのサンプルプログラムproducer.pyでSINETStream APIを使用している箇所を以下に示します。

with AsyncMessageWriter(service) as writer:
    while True:
        message = get_message()
        writer.publish(message)

メッセージを送信するための AsyncMessageWriter のオブジェクトを作成します。 その際、引数としてサービス名を指定します。 AsyncMessageWriterは通常Pythonのwith文で実行します。 これにより、ブローカーとの接続、切断が with文のブロックの境界で実行されます。 with文が返した値 writer に対して .publish(message)を呼び出すことでメッセージをブローカーに送信することができます。

同期APIでは.publish()が送信処理の完了までブロックしますが、非同期APIの場合はブロックすることなく返ります。 また非同期APIの.publish()は処理結果が確定した後の処理を指定するための Promiseオブジェクトを返します。