Configuring Kafka for persisted streams

Kafka streams may be persisted to Striim's internal Kafka server or to an external Kafka server.

Kafka PropertySet properties for persisted streams

The following table describes the properties that can be configured in a Kafka PropertySet for persisted streams.

kafkaversion (optional; default: 3.9)

  Possible values: 3.9.

bootstrap.brokers (required)

  Comma-separated list of <host>:<port> for the brokers.

jmx.broker (optional)

  <host>:<port> for the JMX server.

partitions (optional; default: 20)

  Positive integer. The maximum number of Kafka partitions to use if the stream is partitioned. If the stream is not partitioned, only one partition is used and this value is ignored.

replication.factor (optional; default: 1)

  Positive integer up to the number of brokers. The number of replicas to keep for each event. If this value is greater than the number of brokers, topic creation will fail.

securityconfig (optional)

  Security properties for Kafka. See Enabling security.
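Putting these properties together, a minimal property set for a local broker might look like the following (the property set name and port numbers are illustrative; the values shown are the defaults):

```tql
CREATE PROPERTYSET LocalKafkaProps (
  kafkaversion: '3.9',
  bootstrap.brokers: 'localhost:9092',
  jmx.broker: 'localhost:9998',
  partitions: '20',
  replication.factor: '1'
);
```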

Using Striim's internal Kafka server

The default property set for the internal Kafka server that is installed with Striim at Striim/Kafka is Global.DefaultKafkaProperties:

kafkaversion: '3.9',
bootstrap.brokers: 'localhost:9092',
jmx.broker: 'localhost:9998',
partitions: '20',
replication.factor: '1'

To change properties in an existing property set, see ALTER PROPERTYSET.

If you installed Striim with the JAR installer as discussed in Install Striim Platform for evaluation purposes and enabled Kafka in the setup wizard, Kafka will start automatically. If you did not enable Kafka during installation, you may do so by re-running the setup wizard in the Striim/bin directory (WebConfig.exe for Windows, WebConfig for Mac, or WebConfig.sh for Linux).

If you installed Striim from a DEB, RPM, TGZ, or ZIP package as discussed in Running Striim in Ubuntu, Running Striim in Red Hat Enterprise Linux, or Running Striim as a process, start Kafka as follows:

  1. Open a terminal.

  2. Change to Striim/Kafka, and enter bin/zookeeper-server-start.sh config/zookeeper.properties (this will start Zookeeper).

  3. Open another terminal.

  4. Change to Striim/Kafka and enter JMX_PORT=9998 bin/kafka-server-start.sh config/server.properties (this will start Kafka).

You can then persist Kafka streams using the default property set.
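For example, with the internal Kafka server running, a stream can be persisted with the default property set as follows (the stream name and type here are illustrative):

```tql
CREATE STREAM sensor_data OF Global.WAEvent
  PERSIST USING Global.DefaultKafkaProperties;
```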

Using an external Kafka server

When using an external Kafka server, the following entries in config/server.properties must be set to at least the following minimum values to accommodate Striim's maximum batch size:

message.max.bytes=43264200
replica.fetch.max.bytes=43264200
socket.request.max.bytes=104857600

To support persisting streams to an external server, use the Tungsten console to create a custom Striim property set using the following syntax:

CREATE PROPERTYSET <name> (
  bootstrap.brokers:'<bootstrap IP address>:<port>',
  jmx.broker:'<jmx IP address>:<port>',
  partitions:'<number of partitions to use>',
  replication.factor:'<number of replicas>'
);

If not specified, partitions defaults to 20.

Example: Create a persisted stream using the Kafka 3.9 client, with a Kafka topic having 1 partition and 1 replica, and no security:

create propertyset KPS_PropSet (
  bootstrap.brokers: '<broker_address>',
  partitions: '1',
  replication.factor: '1'
);
create stream db_data of Global.WAEvent
  persist using KPS_PropSet;

To change properties in an existing property set, see ALTER PROPERTYSET.

Enabling security

To connect to a secure Kafka broker, security properties must be defined under the securityconfig property. The main security property is security.protocol, which determines what kind of security is used. Depending on its value, additional properties are required.

SASL

SASL (Simple Authentication and Security Layer) is a standardized framework that provides authentication. To use SASL authentication, set security.protocol to SASL_PLAINTEXT. The sasl.mechanism property defines the type of authentication used: PLAIN for username/password-based authentication or GSSAPI for Kerberos-based authentication.

SASL/PLAIN

SASL/PLAIN uses username and password credentials for authentication.

Example: SASL/PLAIN

securityConfig: '
  security.protocol: SASL_PLAINTEXT,
  sasl.mechanism: PLAIN,
  sasl.jaas.config:
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="<user_name>"
    password="<password>" ; '

SASL/GSSAPI (Kerberos)

SASL/GSSAPI uses Kerberos for authentication.

Example: SASL/GSSAPI

securityConfig: '
  security.protocol: SASL_PLAINTEXT,
  sasl.mechanism: GSSAPI,
  sasl.kerberos.service.name: kafka,
  sasl.jaas.config:
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="<path_to_kafka_client_kerberos_keytab_file>"
    principal="<identity_of_kafka_client_kerberos_principal>" ; '

TLS/mTLS

TLS (Transport Layer Security) provides privacy and authentication. With regular TLS, only the server is authenticated by the clients; with mTLS (Mutual Transport Layer Security), the clients and the server authenticate each other. To use TLS/mTLS, the value for security.protocol is SSL.

mTLS

Mutual TLS provides bidirectional authentication.

Example: mTLS

securityConfig: '
  security.protocol: SSL,
  ssl.keystore.location: <path_to_jks_keystore_file>,
  ssl.keystore.password: <keystore_password>,
  ssl.key.password: <private_key_password>,
  ssl.truststore.location: <path_to_jks_truststore_file>,
  ssl.truststore.password: <truststore_password> '

TLS (one-way)

One-way TLS authenticates only the server.

Example: TLS

securityConfig: '
  security.protocol: SSL,
  ssl.truststore.location: <path_to_jks_truststore_file>,
  ssl.truststore.password: <truststore_password> '

Note

The default truststore and keystore type is JKS. A PKCS12 or PEM store can also be used with the additional properties:

ssl.truststore.type=<store_type>
ssl.keystore.type=<store_type>
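For example, a one-way TLS configuration using a PKCS12 truststore might look like the following (the path and password are placeholders):

```
securityConfig: '
  security.protocol: SSL,
  ssl.truststore.location: <path_to_p12_truststore_file>,
  ssl.truststore.password: <truststore_password>,
  ssl.truststore.type: PKCS12 '
```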

SASL with TLS

To use both SASL and TLS, the value for security.protocol is SASL_SSL.

Example: SASL/PLAIN with mTLS

securityConfig: '
  security.protocol: SASL_SSL,
  ssl.keystore.location: <path_to_jks_keystore_file>,
  ssl.keystore.password: <keystore_password>,
  ssl.key.password: <private_key_password>,
  ssl.truststore.location: <path_to_jks_truststore_file>,
  ssl.truststore.password: <truststore_password>,
  sasl.mechanism: PLAIN,
  sasl.jaas.config:
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="<user_name>"
    password="<password>" ; '

Using Confluent Cloud

Confluent Cloud provides fully managed Kafka services. This section describes how to configure Striim to persist streams to Confluent Cloud Kafka.

Confluent Cloud Kafka

Confluent Cloud Kafka is a secure Kafka service that uses SASL with TLS: authentication is username/password based (SASL/PLAIN) and the TLS is one-way. The Kafka brokers use certificates signed by public CAs that are trusted by Java's default truststore, so no additional keystore or truststore configuration is necessary.

Example: Persisted Stream PropertySet for Confluent Cloud Kafka

create PropertySet Confluent_Kafka_PropSet (
  bootstrap.brokers: '<confluent_cloud_kafka_broker_address>',
  partitions: '1',
  replication.factor: '3',
  securityConfig: '
    security.protocol: SASL_SSL,
    sasl.mechanism: PLAIN,
    sasl.jaas.config:
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="<cluster_api_key>"
      password="<cluster_api_secret>" ;'
);

create stream connect_data of Global.WAEvent
  persist using Confluent_Kafka_PropSet;

Using Kafka without authentication or encryption

To use neither SASL authentication nor SSL encryption, omit securityconfig from your Kafka PropertySet.