Configuring Kafka for persisted streams
Kafka streams may be persisted to Striim's internal Kafka server or to an external Kafka server.
Kafka PropertySet properties for persisted streams
The following table describes the properties that can be configured in a Kafka PropertySet for persisted streams.
| Property | Required? | Default value | Notes |
|---|---|---|---|
| kafkaversion | No | 3.9 | 3.9 |
| bootstrap.brokers | Yes | | Comma-separated list of <host>:<port> values for the brokers. |
| jmx.broker | No | | <host>:<port> for the JMX server. |
| partitions | No | 20 | Positive integer. The maximum number of Kafka partitions to use if the stream is partitioned. If the stream is not partitioned, only one partition is used and this value is ignored. |
| replication.factor | No | 1 | Positive integer up to the number of brokers. The number of replicas to keep for each event. If this is greater than the number of brokers, creation of a topic will fail. |
| securityconfig | No | | Security properties for Kafka. See Enabling security. |
Using Striim's internal Kafka server
The default property set for the internal Kafka server that is installed with Striim at Striim/Kafka is Global.DefaultKafkaProperties:
kafkaversion: '3.9', bootstrap.brokers: 'localhost:9092', jmx.broker: 'localhost:9998', partitions: '20', replication.factor: '1'
To change properties in an existing property set, see ALTER PROPERTYSET.
If you installed Striim with the JAR installer as discussed in Install Striim Platform for evaluation purposes and enabled Kafka in the setup wizard, it will start automatically. If you did not enable Kafka during installation, you may do so by re-running the setup wizard in the Striim/bin directory (WebConfig.exe for Windows, WebConfig for Mac, or WebConfig.sh for Linux).
If you installed Striim from a DEB, RPM, TGZ, or ZIP package as discussed in Running Striim in Ubuntu, Running Striim in Red Hat Enterprise Linux, or Running Striim as a process, start Kafka as follows:
1. Open a terminal.
2. Change to Striim/Kafka and enter bin/zookeeper-server-start.sh config/zookeeper.properties (this will start ZooKeeper).
3. Open another terminal.
4. Change to Striim/Kafka and enter JMX_PORT=9998 bin/kafka-server-start.sh config/server.properties (this will start Kafka).
You can then persist Kafka streams using the default property set.
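For example, following the create stream syntax shown later in this page (the stream name and event type here are illustrative), a stream can be persisted with the default property set:

```
create stream SensorData of Global.WAEvent
persist using Global.DefaultKafkaProperties;
```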
Using an external Kafka server
When using an external Kafka server, the following entries in config/server.properties must be set to at least these minimum values to handle Striim's maximum batch size:
message.max.bytes=43264200
replica.fetch.max.bytes=43264200
socket.request.max.bytes=104857600
To support persisting streams to an external server, use the Tungsten console to create a custom Striim property set using the following syntax:
CREATE PROPERTYSET <name> (
bootstrap.brokers:'<bootstrap IP address>:<port>',
jmx.broker:'<jmx IP address>:<port>',
partitions:'<number of partitions to use>',
replication.factor:'<number of replicas>'
);
If not specified, partitions defaults to 20.
Example: Create a Persisted Stream using Kafka 3.9 client with a Kafka topic having 1 partition and 1 replica, without any security:
create propertyset KPS_PropSet (
bootstrap.brokers: '<broker_address>',
partitions: '1',
replication.factor: '1'
);
create stream db_data of Global.WAEvent persist using KPS_PropSet;
To change properties in an existing property set, see ALTER PROPERTYSET.
Enabling security
To connect to a secure Kafka broker, security properties must be defined under the securityconfig property. The main security property is security.protocol, which determines what kind of security is used; its value determines which other properties are required.
SASL
SASL (Simple Authentication and Security Layer) is a standardized framework that provides authentication. To use SASL authentication, set security.protocol to SASL_PLAINTEXT. The sasl.mechanism property defines the type of authentication used: PLAIN for username/password-based authentication or GSSAPI for Kerberos-based authentication.
SASL/PLAIN
SASL/PLAIN uses username and password credentials for authentication.
Example: SASL/PLAIN
securityConfig: '
security.protocol: SASL_PLAINTEXT,
sasl.mechanism: PLAIN,
sasl.jaas.config:
org.apache.kafka.common.security.plain.PlainLoginModule required
username="<user_name>"
password="<password>" ; '

SASL/GSSAPI (Kerberos)
SASL/GSSAPI uses Kerberos for authentication.
Example: SASL/GSSAPI
securityConfig: '
security.protocol: SASL_PLAINTEXT,
sasl.mechanism: GSSAPI,
sasl.kerberos.service.name: kafka,
sasl.jaas.config:
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="<path_to_kafka_client_kerberos_keytab_file>"
principal="<identity_of_kafka_client_kerberos_principal>" ; '

TLS/mTLS
TLS (Transport Layer Security) provides privacy and authentication. While with regular TLS only the server is authenticated by the clients, with mTLS (Mutual Transport Layer Security) both the clients and the server authenticate each other. To use TLS/mTLS, the value for security.protocol is SSL.
mTLS
Mutual TLS provides bidirectional authentication.
Example: mTLS
securityConfig: '
security.protocol: SSL,
ssl.keystore.location: <path_to_jks_keystore_file>,
ssl.keystore.password: <keystore_password>,
ssl.key.password: <private_key_password>,
ssl.truststore.location: <path_to_jks_truststore_file>,
ssl.truststore.password: <truststore_password> '
TLS (one-way)
One-way TLS authenticates only the server.
Example: TLS
securityConfig: '
security.protocol: SSL,
ssl.truststore.location: <path_to_jks_truststore_file>,
ssl.truststore.password: <truststore_password> '
Note
The default truststore and keystore type is JKS. A PKCS12 or PEM store can also be used with the additional properties:
ssl.truststore.type=<store_type>
ssl.keystore.type=<store_type>
SASL with TLS
To use both SASL and TLS, the value for security.protocol is SASL_SSL.
Example: SASL/PLAIN with mTLS
securityConfig: '
security.protocol: SASL_SSL,
ssl.keystore.location: <path_to_jks_keystore_file>,
ssl.keystore.password: <keystore_password>,
ssl.key.password: <private_key_password>,
ssl.truststore.location: <path_to_jks_truststore_file>,
ssl.truststore.password: <truststore_password>,
sasl.mechanism: PLAIN,
sasl.jaas.config:
org.apache.kafka.common.security.plain.PlainLoginModule required
username="<user_name>"
password="<password>" ; '

Using Confluent Cloud
Confluent Cloud provides fully managed Kafka services. This section describes how to configure Striim to persist streams to Confluent Cloud Kafka.
Confluent Cloud Kafka
Confluent Cloud Kafka is a secure Kafka service that uses SASL with TLS. SASL uses username/password authentication and the TLS is one-way. The Kafka brokers use certificates signed by public CAs that are trusted by Java's default truststore, so no additional keystore or truststore configuration is necessary.
Example: Persisted Stream PropertySet for Confluent Cloud Kafka
create PropertySet Confluent_Kafka_PropSet (
bootstrap.brokers: '<confluent_cloud_kafka_broker_address>',
partitions: '1',
replication.factor: '3',
securityConfig: '
security.protocol: SASL_SSL,
sasl.mechanism: PLAIN,
sasl.jaas.config:
org.apache.kafka.common.security.plain.PlainLoginModule required
username="<cluster_api_key>"
password="<cluster_api_secret>" ;'
);
create stream connect_data of Global.WAEvent
persist using Confluent_Kafka_PropSet;

Using Kafka without authentication or encryption
To use neither SASL authentication nor SSL encryption, do not specify securityconfig in your Kafka PropertySet.