English

プラグイン開発ガイド(Messaging system / Java)

新たなメッセージングシステムをSINETStream (Java)で扱えるようにするためのプラグインを開発する手順について説明します。

1. はじめに

SINETStream v1.2では以下に示すメッセージングシステムに対応しています。

上記のメッセージングシステムに対応するための処理はプラグインとして実装されています。 さらに、新たなプラグインを実装することで、 上記に示したメッセージングシステム以外のものをSINETStreamで扱えるようになります。

このドキュメントでは新たなメッセージングシステムをサポートするためのプラグインを開発する手順について説明します。

1.1 対象者

このドキュメントが対象としている読者を以下に示します。

1.2 前提知識

このドキュメントの説明は、以下の知識を有していることを前提としています。

2. SINETStream の内部構造について

プラグインを開発する手順を説明する前に、開発の際に必要となるSINETStreamの内部構造について説明します。

2.1 モジュール構成

SINETStreamのモジュール構成を以下の図に示します。

モジュール構成

青枠で示した部分がSINETStream本体です。 この部分はメッセージングシステムによらない機能を提供しています。 緑枠で示した部分がSINETStreamのプラグインです。 メッセージングシステムとのインターフェイスはこの部分に閉じた実装となります。 各メッセージングシステムに対する操作は、SPIを実装したプラグインを介して行われます。

モジュールの各部分についての簡単な説明を以下に記します。

2.2 処理シーケンス

SINETStreamでメッセージの送受信を行うためのクラスMessageReader, MesageWriter, AsyncMessageReader, AsyncMessageWriter の処理シーケンスについて説明します。

2.2.1 MessageReader

以下に示すメッセージ受信処理を SINETStreamで行った場合のシーケンス図を示します。 ここではKafkaブローカーからメッセージを受信することを想定しています。

MessageReaderFactory<String> factory =
    MessageReaderFactory.<String>builder()
            .service("kafka-service")
            .build();

try (MessageReader<String> reader = factory.getReader()) {
    Message<String> msg;
    while (Objects.nonNull(msg = reader.read())) {
        System.out.print(msg.getValue());
    }
}

MessageReader

図中のクラスについての簡単な説明を以下に記します。

2.2.2 MessageWriter

以下に示すメッセージ送信処理を SINETStreamで行った場合のシーケンス図を示します。 ここではKafkaブローカーにメッセージを送信することを想定しています。

MessageWriterFactory<String> factory =
    MessageWriterFactory.<String>builder()
            .service("kafka-service")
            .build();

try (MessageWriter<String> writer = factory.getWriter()) {
    for (String msg : messages) {
        writer.write(msg);
    }
}

MessageWriter

図中のクラスについての簡単な説明を以下に記します。

2.2.3 AsyncMessageReader

以下に示す非同期APIのメッセージ受信処理を SINETStreamで行った場合のシーケンス図を示します。 ここではKafkaブローカーからメッセージを受信することを想定しています。

MessageReaderFactory<String> factory =
    MessageReaderFactory.<String>builder()
            .service("kafka-service")
            .build();

try (AsyncMessageReader<String> reader = factory.getAsyncReader()) {
    reader.addOnMessageCallback((message) -> {
        System.out.print(msg.getValue());
    });

    // 他の処理
    otherTask();
}

AsyncMessageReader

図中のクラスについての簡単な説明を以下に記します。

2.2.4 AsyncMessageWriter

以下に示す非同期APIのメッセージ送信処理を SINETStreamで行った場合のシーケンス図を示します。 ここではKafkaブローカーにメッセージを送信することを想定しています。

MessageWriterFactory<String> factory =
    MessageWriterFactory.<String>builder()
            .service("kafka-service")
            .build();

try (AsyncMessageWriter<String> writer = factory.getAsyncWriter()) {
    for (String msg : messages) {
        writer.write(msg)
            .then(r -> System.err.println("success"));
    }
}

MessageWriter

図中のクラスについての簡単な説明を以下に記します。

3. プラグインの実装方法

3.1 概要

SINETStreamでは ServiceLoader を用いてプラグインを実現しています。 プラグインを作成するためには以下の作業が必要となります。

それぞれの作業項目の詳細について以下に記します。

3.2 プロバイダ構成ファイルの作成

プロバイダ構成ファイルにサービスプロバイダを登録することで、 ServiceLoaderがプラグインを見つけることができるようになります。

構成ファイルはリソースディレクトリのMETA-INF/services/に配置します。 ファイル名はサービスプロバイダの完全修飾クラス名にする必要があります。 SINETStreamのメッセージ受信、送信に対応するサービスプロバイダの場合、以下のファイル名となります。

構成ファイルには、サービスプロバイダの実装クラスを完全修飾名で1クラス1行で記述します。

例えば Kafkaブローカーにメッセージ送信を行うクラスjp.ad.sinet.stream.plugins.kafka.KafkaMessageProviderを登録する場合、 以下の内容を構成ファイルMETA-INF/services/jp.ad.sinet.stream.spi.MessageWriterProviderに記します。

jp.ad.sinet.stream.plugins.kafka.KafkaMessageProvider

SINETStreamには4つのサービスプロバイダがありますが、1つのプラグインで全てのサービスプロバイダに対応する必要はありません。 サポートするものに対応する構成ファイルのみを作成してください。

3.3 サービスプロバイダの実装

3.3.1 メッセージ送信(同期API)のためのクラス

メッセージ送信(同期API)を行うサービスプロバイダを実装するには、 以下に示すインターフェースの実装クラスを作成する必要があります。

MessageWriterProviderのメソッドを以下に示します。

PluginMessageWriterの主なメソッドを以下に示します。

3.3.2 メッセージ受信(同期API)のためのクラス

メッセージ受信(同期API)を行うサービスプロバイダを実装するには、 以下に示すインターフェースの実装クラスを作成する必要がある。

MessageReaderProviderのメソッドを以下に示します。

PluginMessageReaderの主なメソッドを以下に示します。

3.3.3 メッセージ送信(非同期API)のためのクラス

メッセージ送信(非同期API)を行うサービスプロバイダを実装するには、 以下に示すインターフェースの実装クラスを作成する必要があります。

AsyncMessageWriterProviderのメソッドを以下に示します。

PluginAsyncMessageWriterの主なメソッドを以下に示します。

3.3.4 メッセージ受信(非同期API)のためのクラス

メッセージ受信(非同期API)を行うサービスプロバイダを実装するには、 以下に示すインターフェースの実装クラスを作成する必要がある。

AsyncMessageReaderProviderのメソッドを以下に示します。

PluginAsyncMessageReaderの主なメソッドを以下に示します。

4. プラグインの実装例

プラグイン実装の具体的な手順を示すために実装例を示します。

ここで示す実装例では実際のブローカーにアクセスするのではなく、 プロセス内でjava.util.Queueオブジェクトを利用したデータの受け渡しを行う処理をSINETStreamのプラグインとして実現します。

4.1 ファイル構成

以下のファイルを作成します。

4.2 実装クラス

プラグインとして実装するクラスについて説明します。

ここでは主な処理についてのみの説明となります。サンプルコード全体を確認する場合は「ソースコード」のリンク先を参照してください。

4.2.1 QueueMessageProvider.java

プラグインのプロバイダインタフェースMessageReaderProvider, MessageWriterProviderを実装したクラスになります。

public class QueueMessageProvider implements MessageReaderProvider, MessageWriterProvider,
        AsyncMessageReaderProvider, AsyncMessageWriterProvider {

    private static final ConcurrentMap<String, BlockingQueue<QueueMessage>> queues = new ConcurrentHashMap<>();

    @Override
    public String getType() {
        return "queue";
    }

    @Override
    public PluginMessageReader getReader(ReaderParameters params) {
        String topic = params.getTopics().get(0);
        BlockingQueue<QueueMessage> queue = queues.computeIfAbsent(topic, key -> new LinkedBlockingQueue<>());
        return new QueueMessageReader(params, queue);
    }

    @Override
    public PluginMessageWriter getWriter(WriterParameters params) {
        String topic = params.getTopic();
        BlockingQueue<QueueMessage> queue = queues.computeIfAbsent(topic, key -> new LinkedBlockingQueue<>());
        return new QueueMessageWriter(params, queue);
    }

    @Override
    public PluginAsyncMessageReader getAsyncReader(ReaderParameters params) {
        String topic = params.getTopics().get(0);
        BlockingQueue<QueueMessage> queue = queues.computeIfAbsent(topic, key -> new LinkedBlockingQueue<>());
        return new QueueAsyncMessageReader(params, queue);
    }

    @Override
    public PluginAsyncMessageWriter getAsyncWriter(WriterParameters params) {
        String topic = params.getTopic();
        BlockingQueue<QueueMessage> queue = queues.computeIfAbsent(topic, key -> new LinkedBlockingQueue<>());
        return new QueueAsyncMessageWriter(params, queue);
    }
}

getType()でメッセージングシステムのタイプ名を返します。 getReader()でプラグインのPluginMessageReader実装となるQueueMessageReaderオブジェクトを返します。 getWriter()でプラグインのPluginMessageWriter実装となるQueueMessageWriterオブジェクトを返します。 getAsyncReader()でプラグインのPluginAsyncMessageReader実装となるQueueAsyncMessageReaderオブジェクトを返します。 getAsyncWriter()でプラグインのPluginAsyncMessageWriter実装となるQueueAsyncMessageWriterオブジェクトを返します。

QueueMessageReader, QueueMessageWriter, QueueAsyncMessageReader, QueueAsyncMessageWriterのコンストラクタには BlockingQueueのオブジェクト queueを引数で渡します。queueを通してメッセージが受け渡されことになります。

4.2.2 QueueMessageReader.java

PluginMessageReaderを実装したクラスになります。

public class QueueMessageReader implements PluginMessageReader {
(中略)
    @Override
    public PluginMessageWrapper read() {
        try {
            return queue.poll(receiveTimeout.getSeconds(), TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new SinetStreamIOException(e);
        }
    }
(中略)
}

read()はメッセージングシステムからメッセージの取得を行い、その値を返すメソッドになります。 ここでは queue からメッセージを取得して、その値を返しています。

4.2.3 QueueMessageWriter.java

PluginMessageWriterを実装したクラスになります。

public class QueueMessageWriter implements PluginMessageWriter {
(中略)
    @Override
    public void write(byte[] aByte) {
        QueueMessage msg = new QueueMessage(topic, aByte);
        try {
            queue.put(msg);
        } catch (InterruptedException e) {
            throw new SinetStreamIOException(e);
        }
    }
(中略)
}

write()は引数で渡されたバイト列をメッセージングシステムに送信するメソッドになります。 ここでは、引数で受け取ったバイト列をQueueMessageクラスでラップして queue に送っています。

4.2.4 QueueAsyncMessageReader.java

PluginAsyncMessageReaderを実装したクラスになります。

public class QueueAsyncMessageReader implements PluginAsyncMessageReader {
(中略)
    public QueueAsyncMessageReader(ReaderParameters params, BlockingQueue<QueueMessage> queue) {
(中略)
        executor = Executors.newSingleThreadExecutor();
        future = executor.submit(this::pollingTask);
    }

    private void pollingTask() {
        try {
            while (!closed.get()) {
                onMessage(queue.take());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void onMessage(PluginMessageWrapper message) {
        for (Consumer<PluginMessageWrapper> callback : onMessageCallbacks) {
            try {
                callback.accept(message);
            } catch (Throwable ex) {
                onFailure(ex);
            }
        }
    }
(中略)
    @Override
    public void addOnMessageCallback(Consumer<PluginMessageWrapper> onMessage, Consumer<Throwable> onFailure) {
        if (Objects.nonNull(onMessage)) {
            onMessageCallbacks.add(onMessage);
        }
        if (Objects.nonNull(onFailure)) {
            onFailureCallbacks.add(onFailure);
        }
    }
(中略)
}

addOnMessageCallback()は、メッセージングシステムからメッセージの取得した際に呼び出すコールバック関数を登録します。 また、コンストラクタで起動したexecutorのスレッドでpollingTask() を実行し queue からのメッセージの取得を行います。 メッセージの取得に成功すると登録されているコールバック関数をメッセージを引数にして呼び出します。

4.2.5 QueueAsyncMessageWriter.java

PluginAsyncMessageWriterを実装したクラスになります。

public class QueueAsyncMessageWriter implements PluginAsyncMessageWriter {
(中略)
    private final DefaultDeferredManager manager =
            new DefaultDeferredManager(Executors.newFixedThreadPool(4));
(中略)
    @Override
    public Promise<?, ? extends Throwable, ?> write(byte[] bytes) {
        if (closed.get()) {
            throw new SinetStreamIOException();
        }
        return manager.when(() -> enqueue(bytes));
    }

    private void enqueue(byte[] bytes) {
        QueueMessage msg = new QueueMessage(topic, bytes);
        try {
            queue.put(msg);
        } catch (InterruptedException e) {
            throw new SinetStreamIOException(e);
        }
    }
(中略)
}

PluginAsyncMessageWriterは非同期APIを想定しているので、メッセージ送信処理のwrite()ではqueueへの追加を直接は行っていません。 manager.when()を呼び出すことでmanagerが管理するスレッドプールにqueueへのメッセージ追加のタスクを依頼しています。 そのためwrite()はブロックせずに直ぐに返ります。

4.3 プロバイダ構成ファイルの作成

リソースディレクトリのMETA-INF/services/に4つの構成ファイルを以下の内容で作成します。

4.4 jarファイルの作成

プラグインのjarファイルを作成する手順を以下に示します。

  1. Gradleをインストールする
  2. gradle を実行して jar ファイルを作成する
    $ gradle jar
    
  3. build/libs/にjarファイルが作成されたことを確認する
    $ ls build/libs/
    SINETStream-queue-1.2.0.jar
    

4.5 ソースコード

プラグインの実装例となるファイルへのリンクを以下に示します。