日本語

How to use a Kafka broker with SASL/SCRAM authentication

Overview

This page describes how to connect from SINETStream to a Kafka broker that requires SASL/SCRAM authentication.

The description will be made in the following order.

  1. Prerequisites
  2. Configurations on the Kafka broker (server side)
  3. Configurations on SINETStream (client side)
  4. Behavior on authentication errors

Prerequisites

Though the configuration and setting of a Kafka broker may vary, the following conditions are assumed for simplicity in this document.

(*1) Refer to How to create a certificate with a private certificate authority for details.

The following values are used in the examples.

In practice, use appropriate values for your environment. If you use SCRAM-SHA-512, replace SCRAM-SHA-256 to SCRAM-SHA-512.

Configurations on the Kafka broker (server side)

The following procedure is needed for a Kafka broker to perform SASL/SCRAM-SHA-256 authentication with SSL/TLS connection.

  1. Edit the Kafka broker’s properties file for SASL authentication
  2. Register SCRAM credentials in ZooKeeper
  3. Convert the file format
  4. Edit the Kafka broker’s properties file for SSL/TLS connection

Edit the Kafka broker’s properties file for SASL authentication

Add the following lines to the Kafka broker’s properties file /srv/kafka/config/server.properties

listeners=SASL_SSL://:9094
advertised.listeners=SASL_SSL://broker.example.org:9094
sasl.enabled.mechanisms=SCRAM-SHA-256
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required;

The meanings of the above settings are:

Register SCRAM credentials in ZooKeeper

Register SCRAM credentials in ZooKeeper using the following command. Specify the user name after --entity-name and the password within the --add-config option.

$ /srv/kafka/bin/bin/kafka-configs.sh --zookeeper broker.example.org:2181 --alter --entity-type users \
      --entity-name user01 --add-config 'SCRAM-SHA-256=[iterations=8192,password=user01-pass]'
$ /srv/kafka/bin/bin/kafka-configs.sh --zookeeper broker.example.org:2181 --alter --entity-type users \
      --entity-name user02 --add-config 'SCRAM-SHA-256=[iterations=8192,password=user02-pass]'
$ /srv/kafka/bin/bin/kafka-configs.sh --zookeeper broker.example.org:2181 --alter --entity-type users \
      --entity-name user03 --add-config 'SCRAM-SHA-256=[iterations=8192,password=user03-pass]'

Convert the file format

Convert the certificate and the private key files from PEM format to PKCS#12 format so that the Kafka broker can read them. The CA certificate and its private key are converted and stored in a trust store, while the server certificate and its private key are converted and stored in a key store.

First, create a trust store using the follwing command. Specify the CA certificate filename after -in, its private key filename after -inkey, the output filename of the trust store after -out, and the password to be set for the trust store after -passout.

$ sudo mkdir -p /srv/kafka/config/cert
$ sudo openssl pkcs12 -export -in /etc/pki/CA/cacert.pem \
         -inkey /etc/pki/CA/private/cakey.pem -name private-ca \
         -CAfile /etc/pki/CA/cacert.pem -caname private-ca \
         -out /srv/kafka/config/cert/truststore.p12 \
         -passout pass:trust-pass-00

Next, create a key store using the following command. Specify the server certificate filename after -in, its private key filename after -inkey, the output filename of the trust store after -out, and the password to be set for the key store after -passout.

$ sudo openssl pkcs12 -export -in /etc/pki/CA/certs/broker.crt \
         -inkey /etc/pki/CA/private/broker.key -name broker \
         -CAfile /etc/pki/CA/cacert.pem -caname private-ca \
         -out /srv/kafka/config/cert/keystore.p12 \
         -passout pass:key-pass-00

Edit the Kafka broker’s properties file for SSL/TLS connection

Add the following lines to the Kafka broker’s properties file /srv/kafka/config/server.properties.

ssl.truststore.location=/srv/kafka/config/cert/truststore.p12
ssl.truststore.password=trust-pass-00
ssl.truststore.type=pkcs12
ssl.keystore.location=/srv/kafka/config/cert/keystore.p12
ssl.keystore.password=key-pass-00
ssl.keystore.type=pkcs12

The meanings of the above settings are:

Restart the Kafka broker

Restart the Kafka broker to apply the changes in the properties file.

$ sudo /srv/kafka/bin/kafka-server-stop.sh
$ sudo /srv/kafka/bin/kafka-server-start.sh /srv/kafka/config/server.properties

In order to change the settings without interrupting the service, configure multiple Kafka brokers and reflect the changes by rolling restart.

Configurations on SINETStream (client side)

The following procedure is needed for SINETStream to connect to the Kafka broker with SASL/SCRAM-SHA-256 authentication.

  1. Prepare certificate
  2. Edit the SINETStream’s configuration file
  3. Create a program that uses SINETStream

Prepare certificate

The following certificate is required on the client side to use SSL/TLS connection.

Deploy the certificate created by a private CA etc. to your convenient location. SINETStream reads the certificate from the path specified in the configuration file.

Edit the SINETStream’s configuration file

An example of SINETStream’s configuration file is shown below.

service-kafka-sasl-scram:
  brokers: broker.example.org:9094
  type: kafka
  topic: topic-001
  tls:
    ca_certs: /opt/certs/cacert.pem
  security_protocol: SASL_SSL
  sasl_mechanism: SCRAM-SHA-256
  sasl_plain_username: user01
  sasl_plain_pasword: user01-pass

The settings for brokers, type, topic, consistency, tls are identical to those without authentication. Settings related to SASL authentication are:

Create a program that uses SINETStream

Your program will be identical with or without SASL/SCRAM authentication. For example, a program that uses MessageWriter of the SINETStream’s Python API is shown below.

with sinetstream.MessageWriter(service='service-kafka-sasl-scram') as writer:
    writer.publish(b'message 001')

As you see, no code is written for authentication.

If you want to configure the authentication within your program, add parameters to the constructor arguments.

user_passwd = {
    'sasl_plain_username': 'user01',
    'sasl_plain_password': 'user01-pass',
}
with sinetstream.MessageWriter(service='service-kafka-sasl-scram', **user_passwd) as writer:
    writer.publish(b'message 001')

Behavior on authentication errors

Python API

The methods listed below raises the sinetstream.error.ConnectionError exception when an authentication error occurs.

Java API

The methods listed below throws the jp.ad.sinet.stream.api.AuthenticationException exception when an authentication error occurs.