English

プラグイン開発ガイド(Messaging system / Python)

新たなメッセージングシステムをSINETStream (Python)で扱えるようにするためのプラグインを開発する手順について説明します。

1. はじめに

SINETStream v1.2では以下に示すメッセージングシステムに対応しています。

上記のメッセージングシステムに対応するための処理はプラグインとして実装されています。 さらに、新たなプラグインを実装することで、上記に示したメッセージングシステム以外のものをSINETStreamで扱えるようになります。

このドキュメントでは新たなメッセージングシステムをサポートするためのプラグインを開発する手順について説明します。

1.1 対象者

このドキュメントが対象としている読者を以下に示します。

1.2 前提知識

このドキュメントの説明は、以下の知識を有していることを前提としています。

2. SINETStream の内部構造について

プラグインを開発する手順を説明する前に、開発の際に必要となるSINETStreamの内部構造について説明します。

2.1 モジュール構成

SINETStreamのモジュール構成を以下の図に示します。

モジュール構成

青枠で示した部分がSINETStream本体です。この部分はメッセージングシステムによらない機能を提供しています。 緑枠で示した部分がSINETStreamのプラグインです。 メッセージングシステム固有の処理は緑枠で示したプラグインに閉じた実装となっています。

モジュールの各部についての簡単な説明を以下に記します。

2.2 処理シーケンス

SINETStreamでメッセージの送受信を行うためのクラスMessageReader, MesageWriter, AsyncMessageReader, AsyncMessageWriter の処理シーケンスについて説明します。

2.2.1 MessageReader

以下に示すようなメッセージ受信処理を SINETStreamで行った場合のシーケンス図を示します。 ここではKafkaブローカーからメッセージを受信することを想定しています。

with MessageReader(service='kafka-service') as reader:
    for message in reader:
        print(f'value={message.value}')

MessageReader

図中のクラスについての簡単な説明を以下に記します。

2.2.2 MessageWriter

以下に示すようなメッセージ送信処理を SINETStreamで行った場合のシーケンス図を示します。 ここではKafkaブローカーにメッセージを送信することを想定しています。

with MessageWriter(service='kafka-service') as writer:
    for message in messages:
        writer.publish(message)

MessageWriter

図中のクラスについての簡単な説明を以下に記します。

2.2.3 AsyncMessageReader

以下に示すようなメッセージ受信処理を SINETStreamの非同期APIで行った場合のシーケンス図を示します。 ここではKafkaブローカーからメッセージを受信することを想定しています。

def show_message(message):
    print(f'value={message.value}')

reader = AsyncMessageReader(service='kafka-service')
reader.on_message = show_message
reader.open()

# 他の処理...

reader.close()

AsyncMessageReader

図中のクラスについての簡単な説明を以下に記します。

2.2.4 AsyncMessageWriter

以下に示すようなメッセージ送信処理をSINETStreamの非同期APIで行った場合のシーケンス図を示します。 ここではKafkaブローカーにメッセージを送信することを想定しています。

def on_publish(_):
    print("success")

with AsyncMessageWriter(service='kafka-service') as writer:
    for message in messages:
        writer.publish(message).then(on_publish)

AsyncMessageWriter

図中のクラスについての簡単な説明を以下に記します。

3. プラグインの実装方法

3.1 概要

Python Packaging User Guide - Creating and discovering plugins にあるように Python でプラグインを作成する場合、主に3つの方法があります。

  1. 命名規則を用いる方法
  2. 名前空間パッケージを用いる方法
  3. パッケージメタデータを用いる方法

SINETStreamでは3番目のパッケージメタデータを用いてプラグインを実現しています。

SINETStreamのプラグインを作成するためには以下の作業が必要となります。

それぞれの作業項目の詳細について以下に記します。

3.2 プラグインに定められているメソッドを実装したクラスの作成

プラグインで必要となるメソッドはメッセージを受信するためのクラスとメッセージを送信するためのクラスで異なります。 それぞれについて順に説明します。

3.2.1 メッセージ受信のためのクラス

メッセージ受信を行うプラグインで必要となるメソッドについて説明します。

メッセージ受信を行うプラグインのクラスでは、以下の3つのメソッドを実装する必要があります。

プラグインが上記のメソッドを実装することを確認するために、抽象基底クラス sinetstream.spi.PluginMessageReader を利用することができます。PluginMessageReaderでは上記の3つのメソッドが抽象メソッドとして定義されています。

メッセージ受信を行うクラスのコンストラクタは引数から params を受け取ります。 paramsには、SINETStreamの設定ファイル、あるいはMessageReaderのコンストラクタで指定されたパラメータがdict型の変数として渡されます。

3.2.2 メッセージ送信のためのクラス

メッセージ送信を行うプラグインで必要となるインターフェースについて説明します。

メッセージ送信を行うプラグインのクラスでは、以下の3つのメソッドを実装する必要があります。

プラグインが上記のメソッドを実装することを確認するために、抽象基底クラス sinetstream.spi.PluginMessageWriter を利用することができます。PluginMessageWriterでは上記の3つのメソッドが抽象メソッドとして定義されています。

メッセージ送信を行うクラスのコンストラクタは引数から params を受け取ります。 paramsには、SINETStreamの設定ファイル、あるいはMessageWriterのコンストラクタで指定されたパラメータがdict型の変数として渡されます。

3.2.3 メッセージ受信(非同期API)のためのクラス

非同期APIのメッセージ受信を行うプラグインで必要となるメソッドについて説明します。 以下の2つのメソッドと2つのプロパティを実装する必要があります。

プラグインが上記のメソッドを実装することを確認するために、抽象基底クラス sinetstream.spi.PluginAsyncMessageReader を利用することができます。PluginAsyncMessageReaderでは上記のメソッド、プロパティが抽象メソッドとして定義されています。

メッセージ受信を行うクラスのコンストラクタは引数から params を受け取ります。 paramsには、SINETStreamの設定ファイル、あるいはAsyncMessageReaderのコンストラクタで指定されたパラメータがdict型の変数として渡されます。

3.2.4 メッセージ送信(非同期API)のためのクラス

非同期APIのメッセージ送信を行うプラグインで必要となるインターフェースについて説明します。 以下の3つのメソッドを実装する必要があります。

プラグインが上記のメソッドを実装することを確認するために、抽象基底クラス sinetstream.spi.PluginAsyncMessageWriter を利用することができます。PluginAsyncMessageWriterでは上記の3つのメソッドが抽象メソッドとして定義されています。

メッセージ送信を行うクラスのコンストラクタは引数から params を受け取ります。 paramsには、SINETStreamの設定ファイル、あるいはAsyncMessageWriterのコンストラクタで指定されたパラメータがdict型の変数として渡されます。

3.3 パッケージメタデータの作成

setuptools のエントリポイントにクラスを登録することで、 SINETStreamがプラグインを見つけることができるようになります。 これは登録されたエントリポイントをsetuptoolsが検出する機能を利用して実現しています。 setuptoolsはPythonの配布パッケージのビルドなどを行うためのツールです。

登録されているエントリポイントからSINETStreamで必要となるクラスを探し出すことができるようにするためには、 エントリポイントのグループと名前を適切に設定する必要があります。

SINETStreamではグループをMessageReader, MessageWriter, AsyncMessageReader, AsyncMessageWriter のうちどのクラスに対応するのかを識別するために用いています。 以下に示すいずれかのグループを指定してください。

1つのプラグインで必ずしも上記全てのグループに対応する実装を提供する必要はありません。 必要なグループに関する記述のみを行ってください。

エントリポイントにはグループ内で識別するための名前がつけられます。 SINETSteramではエントリポイントに付けられた名前が、メッセージングシステムのタイプに対応づけられます。

例えばKafkaプラグインのメッセージ送信を行うクラス sinetstream_plugins.kafka:KafkaWriter に対応するエントリポイントは sinetstream.writerグループに所属する kafka という名前にします。

パッケージメタデータに上記の例のエントリポイントに対応する設定を行うには setup.cfg に以下の記述を行います。

[options.entry_points]
sinetstream.writer =
    kafka = sinetstream_plugins.kafka:KafkaWriter

エントリポイントの詳細については setuptools documentation - Entry Points を参照してください。

4. プラグインの実装例

プラグインを実装する具体的な手順を示すために、簡単な実装例を示します。

ここで示す実装例では実際のブローカーにアクセスするのではなく、 プロセス内でqueue.Queueオブジェクトを利用したデータの受け渡しを行う処理をSINETStreamのプラグインとして実現します。

4.1 ファイル構成

以下のファイルを作成します。

4.2 プラグイン実装

プラグインの実装を行うモジュールファイルqueue.pyについて説明します。

4.2.1 モジュールレベル

queue.Queueを格納するdict型変数をモジュールレベルで定義します。

queues = defaultdict(Queue)

queuesはトピック名をキー、Queueオブジェクトを値とする dict変数です。 defaultdictを利用することで、トピック名に対応する値が queuesに無い場合は自動的に作成された Queueオブジェクトが取得できます。

4.2.2 メッセージ送信のためのクラス

メッセージ送信を行うプラグインのクラスQueueWriterを実装します。

まずクラス定義を行います。

class QueueWriter(PluginMessageWriter):

ここでは抽象基底クラスPluginMessageWriterを継承したクラスを定義します。 プラグインクラスの実装においてPluginMessageWriterを継承することは必須ではありません。 しかし開発環境によっては抽象基底クラスを継承することにより、 プラグイン実装に必要となるメソッドに関する情報などの支援を受けられる場合があります。

次にコンストラクタを定義します。

    def __init__(self, params):
        self._queue = None
        self._topic = params.get('topic')
        if self._topic is None or not isinstance(self._topic, str):
            raise InvalidArgumentError()

引数のparamsにはSINETStreamの設定ファイル、 あるいはMessageWriterのコンストラクタで指定されたパラメータがdict型の変数として渡されます。 ここでは、パラメータtopicの値を送信対象のトピック名としてインスタンス変数に格納しています。

次にプラグインで実装する必要のあるメソッドを定義します。

    def open(self):
        self._queue = queues[self._topic]

    def close(self):
        self._queue = None

    def publish(self, value):
        self._queue.put(value)

open()の際に queuesからトピック名に対応する Queueのオブジェクトを取得します。 publish(value)では open()の際に格納した Queueオブジェクトにメッセージをput()します。 publish()で送られたメッセージは Queueオブジェクトを通して受信側に受け渡されます。

4.2.3 メッセージ受信のためのクラス

メッセージ受信を行うプラグインのクラスQueueReaderを実装します。

まずクラス定義を行います。

class QueueReader(PluginMessageReader):

ここでは抽象基底クラスPluginMessageReaderを継承したクラスを定義します。

次にコンストラクタを定義します。

    def __init__(self, params):
        self._queue = None
        self._topic = params.get('topic')
        if self._topic is None or not isinstance(self._topic, str):
            raise InvalidArgumentError()
        timeout_ms = params.get('receive_timeout_ms', inf)
        self._timeout = timeout_ms / 1000.0 if timeout_ms != inf else None

引数のparamsにはSINETStreamの設定ファイル、あるいはMessageReaderのコンストラクタで指定されたパラメータがdict型の変数として渡されます。 ここでは、パラメータtopicの値を受信対象のトピック名としてインスタンス変数に格納しています。 またreceive_timeout_msをメッセージ受信のタイムアウト値(ms)としてインスタンス変数に格納します。

次にプラグインで実装する必要のあるメソッドを定義します。

    def open(self):
        self._queue = queues[self._topic]

    def close(self):
        self._queue = None

    def __iter__(self):
        while True:
            try:
                value = self._queue.get(timeout=self._timeout)
                raw = value
                yield value, self._topic, raw
            except Empty:
                raise StopIteration()

open()の際に queuesからトピック名に対応する Queueのオブジェクトを取得します。 __iter__(value)open()の際に格納した Queueオブジェクトからメッセージ取得するイテレータを返します。 MessageWriterで送信されたメッセージはQueueオブジェクトを通して受け取ることができます。

4.2.4 メッセージ送信(非同期API)のためのクラス

非同期APIでメッセージ送信を行うプラグインのクラスQueueAsyncWriterを実装します。

まずクラス定義を行います。

class QueueAsyncWriter(PluginAsyncMessageWriter):

ここでは抽象基底クラスPluginAsyncMessageWriterを継承したクラスを定義します。 プラグインクラスの実装においてPluginAsyncMessageWriterを継承することは必須ではありません。 しかし開発環境によっては抽象基底クラスを継承することにより、 プラグイン実装に必要となるメソッドに関する情報などの支援を受けられる場合があります。

次にコンストラクタを定義します。

    def __init__(self, params):
        self._queue = None
        self._topic = params.get('topic')
        if self._topic is None or not isinstance(self._topic, str):
            raise InvalidArgumentError()
        self._executor = None

引数のparamsにはSINETStreamの設定ファイル、あるいはMessageWriterのコンストラクタで指定されたパラメータがdict型の変数として渡されます。 ここでは、パラメータtopicの値を送信対象のトピック名としてインスタンス変数に格納しています。

次にプラグインで実装する必要のあるメソッドを定義します。

    def open(self):
        self._executor = ThreadPoolExecutor()
        self._queue = queues[self._topic]

    def close(self):
        self._queue = None
        self._executor.shutdown()

    def publish(self, value):
        future = self._executor.submit(lambda: self._queue.put(value))
        return Promise.cast(future)

open()の際に queuesからトピック名に対応する Queueのオブジェクトを取得します。 またメッセージ送信処理を行うスレッドプールを作成しておきます。 publish(value)では open()の際に作成したスレッドプールに送信処理を行うタスクを実行するように依頼します。 また送信処理を表すPromiseオブジェクトを結果として返します。 close()ではopen()の際に作成したスレッドプールの資源解放を行います。

4.2.5 メッセージ受信(非同期API)のためのクラス

非同期APIでメッセージ受信を行うプラグインのクラスQueueAsyncReaderを実装します。

まずクラス定義を行います。

class QueueAsyncReader(PluginAsyncMessageReader):

ここでは抽象基底クラスPluginAsyncMessageReaderを継承したクラスを定義します。

次にコンストラクタを定義します。

    def __init__(self, params):
        self._queue = None
        self._topic = params.get('topic')
        if self._topic is None or not isinstance(self._topic, str):
            raise InvalidArgumentError()
        self._on_message = None
        self._on_failure = None
        self._reader_executor = None
        self._future = None
        self._closed = True

引数のparamsにはSINETStreamの設定ファイル、あるいはMessageReaderのコンストラクタで指定されたパラメータがdict型の変数として渡されます。 ここでは、パラメータtopicの値を受信対象のトピック名としてインスタンス変数に格納しています。

次にプラグインで実装する必要のあるメソッドを定義します。

    def open(self):
        if self._closed:
            self._queue = queues[self._topic]
            self._reader_executor = ThreadPoolExecutor(max_workers=1)
            self._closed = False
            self._future = self._reader_executor.submit(self._reader_loop)

    def _reader_loop(self):
        while not self._closed:
            try:
                value = self._queue.get(timeout=0.1)
                raw = value
                if self._on_message is not None:
                    self._on_message(value, self._topic, raw),
            except Empty:
                pass

    def close(self):
        if not self._closed:
            self._queue = None
            self._future.cancel()
            self._reader_executor.shutdown()
            self._reader_executor = None
            self._future = None
            self._closed = True

    @property
    def on_message(self):
        return self._on_message

    @on_message.setter
    def on_message(self, on_message):
        self._on_message = on_message

    @property
    def on_failure(self):
        return self._on_failure

    @on_failure.setter
    def on_failure(self, on_failure):
        self._on_failure = on_failure

open()ではメッセージ受信を行うスレッドプールを作成し、受信処理ループを実行しています。 受信処理ループでは、メッセージを受信するとコールバック関数self._on_message()を呼び出します。 .on_message, .on_failure は、それぞれメッセージ受信時、エラー発生時のコールバック関数に対応するプロパティとなっています。 close()では open()で起動した受信スレッドを停止しています。

4.3 パッケージング

4.3.1 setup.py, setup.cfgの作成

パッケージングを行う際のコマンドラインインタフェースとなる setup.py とその設定ファイル setup.cfg を作成します。

まず setup.py を作成します。設定については全てsetup.cfgで行うので setup.pyは必要最小限なものとします。

from setuptools import setup
setup()

次に setup.cfg を作成します。

[metadata]
name = sinetstream-queue
version = 1.2.0

[options]
package_dir=
    =src
packages = find_namespace:
zip_safe = False
namespace_packages =
  ssplugin
install_requires =
  sinetstream>=1.2.0
  promise
python_requires = >= 3.6

[options.packages.find]
where = src

[options.entry_points]
sinetstream.reader =
    queue = ssplugin.queue:QueueReader
sinetstream.writer =
    queue = ssplugin.queue:QueueWriter
sinetstream.async_reader =
    queue = ssplugin.queue:QueueAsyncReader
sinetstream.async_writer =
    queue = ssplugin.queue:QueueAsyncWriter

プラグインに直接関わる設定は options.entry_pointsセクションです。 sinetstream.reader, sinetstream.writerがメッセージ受信、メッセージ送信のプラグインに対応するグループになります。 また sinetstream.async_reader, sinetstream.async_writerが非同期APIのメッセージ受信、メッセージ送信のプラグインに対応するグループになります。 各グループに対して (メッセージングシステムのタイプ名)=(パッケージ名:クラス名) を指定しています。

4.3.2 パッケージの作成

$ python setup.py bdist_wheel
running bdist_wheel
running build
running build_py
(中略)
$ ls dist/
dist/sinetstream_queue-1.2.0-py3-none-any.whl

4.4 ソースコード

ここまで記した実装例のファイルへのリンクを以下に示します。