KafkaConnector Extension

The KafkaConnector Extension provides tools to integrate with Apache Kafka.

Extension Configuration

The KafkaConnector extension allows configuring multiple Kafka brokers.

The following configuration property (System Configuration > [Extension] > Edit Configuration in FNZ Studio Composition) can be used to configure a broker called my-dev-broker: kafka.my-dev-broker.bootstrap.servers — Comma-separated list of cluster servers (e.g., localhost:9092).

An example configuration for a broker called main-broker is:

kafka.main-broker.bootstrap.servers = localhost:9092

To configure another broker, use a similar syntax, but change the broker name, e.g., kafka.another-broker.bootstrap.servers = localhost:9092,localhost:9093

Note: The broker name must not contain the character `.` (dot).

Dynamic Configuration

The KafkaConnector extension allows configuring Kafka brokers dynamically.

The configuration properties to configure a broker have the following format:

  • kafka.<broker-name>.common.<property-name> = <property-value>
  • kafka.<broker-name>.producer.<property-name> = <property-value>
  • kafka.<broker-name>.consumer.<property-name> = <property-value>

The configuration properties to configure Kafka producers start with:

  • kafka.<broker-name>.producer. or kafka.<broker-name>.common.

The configuration properties to configure Kafka consumers start with:

  • kafka.<broker-name>.consumer. or kafka.<broker-name>.common.
Note: If you use the same property name with both the kafka.<broker-name>.common. prefix and the kafka.<broker-name>.producer. or kafka.<broker-name>.consumer. prefix, the value set with the kafka.<broker-name>.producer. or kafka.<broker-name>.consumer. prefix takes precedence, as shown in the example below.
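
For example, assuming the following properties are set for a broker called main-broker, producers use the client.id value fnz-studio-producer (from the producer-specific prefix), while consumers fall back to fnz-studio (from the common prefix):

kafka.main-broker.common.client.id = fnz-studio
kafka.main-broker.producer.client.id = fnz-studio-producer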

Note that the following Kafka properties cannot be configured dynamically: bootstrap.servers, key.serializer, value.serializer, key.deserializer, value.deserializer, group.id. The bootstrap.servers property must be configured using the kafka.<broker-name>.bootstrap.servers property, while the remaining properties are configured automatically based on the configuration of the Kafka producer or consumer Integration Links.

Example:

kafka.main-broker.common.client.id = fnz-studio
kafka.main-broker.producer.linger.ms = 100
kafka.main-broker.consumer.auto.offset.reset = earliest

After changing the configuration, the extension must be restarted.

Security

The KafkaConnector extension supports the security mechanisms provided by Kafka. These mechanisms can be set using the dynamic configuration explained above.

Check the Kafka documentation for more details on the security mechanisms available.

An example configuration for the SASL_PLAINTEXT security protocol for a broker called main-broker is illustrated below.

Note that this configuration is used for both producers and consumers that use the main-broker broker.

kafka.main-broker.common.security.protocol = SASL_PLAINTEXT
kafka.main-broker.common.sasl.mechanism = PLAIN
kafka.main-broker.common.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";

Note that, since kafka.<broker-name>.common.sasl.jaas.config contains sensitive information, it is stored in the FNZ Studio key service and is therefore masked in the FNZ Studio UI.

Transport Layer Security (TLS)

The KafkaConnector extension supports mutual Transport Layer Security (mTLS) for the communication between Kafka clients and Kafka brokers. The mTLS configuration is set using the dynamic configuration explained above.

An example configuration for the mTLS security protocol for a broker called main-broker is illustrated below. This configuration is used for both producers and consumers connecting to the main-broker.

kafka.main-broker.common.security.protocol = SSL
kafka.main-broker.common.ssl.truststore.location = /path/to/truststore.jks
kafka.main-broker.common.ssl.truststore.password = truststore-password
kafka.main-broker.common.ssl.keystore.location = /path/to/keystore.jks
kafka.main-broker.common.ssl.keystore.password = keystore-password
kafka.main-broker.common.ssl.key.password = key-password

FNZ Studio Truststore and Keystore

FNZ Studio provides functionality to store certificates and private keys in a truststore and keystore. The AddPrivateKey and AddTrustedCertificate Script Functions can be used to add certificates and private keys to the truststore and keystore.

If you need to use the information stored in the FNZ Studio truststore and keystore, use the following configuration properties:

  • kafka.<broker-name>.common.certificate.alias — The alias of the certificate that was previously stored in the FNZ Studio truststore. When this property is specified, the extension automatically configures the following properties: kafka.<broker-name>.common.ssl.truststore.location; kafka.<broker-name>.common.ssl.truststore.password.
  • kafka.<broker-name>.common.private.key.alias — (optional) The alias of the private key that was previously stored in the FNZ Studio keystore. When this property is specified, the extension automatically configures the following properties: kafka.<broker-name>.common.ssl.keystore.location; kafka.<broker-name>.common.ssl.keystore.password; kafka.<broker-name>.common.ssl.key.password.

When setting one of the above properties, you must also set a security protocol using the property: kafka.<broker-name>.common.security.protocol.

Example:

kafka.main-broker.common.security.protocol = SSL
kafka.main-broker.common.certificate.alias = kafka
kafka.main-broker.common.private.key.alias = kafka
Note: If you add a certificate or a private key to the FNZ Studio truststore or keystore, you must restart the KafkaConnector extension for the changes to take effect.

Integration Links

The KafkaConnector extension provides two components that can be used in Integration Links: Kafka Consumer and Kafka Producer.

Kafka Consumer

A Start Element called Kafka Consumer can be used to receive messages from Kafka. When FNZ Studio is running on multiple cluster nodes, only one cluster node processes a message received from Kafka.

The Kafka Consumer component has the following properties:

  • Broker — (mandatory) Name of the Kafka broker to use, as configured in the extension.
  • Topic — (mandatory) Name of the topic to receive messages from. The topic must have been previously created in Kafka.
  • MessageType — (mandatory) Type of the message payload (e.g., string, byte array).
  • Group Id — (mandatory) Label that uniquely identifies the group of consumer processes to which a consumer belongs.

The following configuration can be used to configure the Kafka Consumer component:

  • kafka.<broker-name>.consumer.poll.timeout — Maximum time the consumer blocks while fetching data, in milliseconds. Default: 500 ms (see the example below).
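
For example, the following property raises the poll timeout for consumers of the main-broker broker to one second (the value is illustrative):

kafka.main-broker.consumer.poll.timeout = 1000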

Once the Integration Link is started, a new thread responsible for consuming messages is started as well. The Kafka consumer subscribes to the given topic and keeps polling for records to consume until the Integration Link is stopped.

Consumer Commit Strategy

Auto-commit is disabled by default and cannot be enabled through dynamic properties. Every time records are consumed and processed, the consumer performs an asynchronous commit to the broker. When the Integration Link is stopped, a synchronous commit to the broker is performed to avoid potential data loss and the Kafka consumer is then closed.

Consumer Exception Handling

While consuming records, the following exceptions might occur, with the corresponding behavior:

  • A RetriableException (e.g., TimeoutException) is thrown while polling for new records. The consumer logs the error and retries consuming.
  • An exception is thrown while the Integration Link tries to process the records. The consumer logs the error and retries consuming.
  • A fatal exception (e.g., SerializationException, AuthorizationException) is thrown while polling for new records. The Kafka consumer and the Integration Link are stopped without any further commits. The Integration Link must be manually restarted after the exceptional situation has been fixed.

Headers

During record processing, the Kafka consumer adds relevant information about the records that can be used by further components in the Integration Link pipeline.

The information is added as Camel headers (see the example after this list):

  • KAFKA_KEY — (optional) Key of the consumed message.
  • KAFKA_PARTITION — (mandatory) Partition of the topic this record is received from.
  • KAFKA_OFFSET — (mandatory) Offset of the record in the corresponding Kafka partition.
  • KAFKA_TIMESTAMP — (mandatory) Timestamp of the record.
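
As an illustration of how a later component might read these headers, a Scripted Processor script could access them as sketched below. The $in.getHeader call is an assumption that mirrors the $in.setHeader call shown in the Kafka Keys Support section; treat this as a sketch rather than a confirmed API:

// Read the partition and offset of the consumed record (hypothetical getter, assumed to mirror $in.setHeader)
$partition := $in.getHeader('KAFKA_PARTITION');
$offset := $in.getHeader('KAFKA_OFFSET');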

Kafka Producer

An End Component called Kafka Producer can be used to send messages to Kafka. The Kafka Producer component has the following properties:

  • Broker — (mandatory) Name of the Kafka broker to use, as configured in the extension.
  • Topic — (mandatory) Name of the topic to send messages to. The topic must have been previously created in Kafka.
  • MessageType — (mandatory) Type of the message payload, e.g., string, byte array.
Note: The Kafka Producer component does not wait for the message to be acknowledged by Kafka; instead, it sends the message to Kafka and continues the execution of the Integration Link.

Kafka Keys Support

Having a key is not mandatory. If no key is specified, the message is sent to a random partition.

Otherwise, it is possible to assign a string key to the message by adding a KAFKA_KEY header, whose value is the key to be used. For example, you can assign a key with the following script using the Scripted Processor component: $in.setHeader('KAFKA_KEY', 'the key value');

Troubleshooting

To troubleshoot the KafkaConnector extension, enable debug logging for the org.apache.kafka logger. You can do this by adding the following line to the {nm.data.home}/conf/log4j2-additional.properties file:

log4j.logger.org.apache.kafka=DEBUG

Further Information

Note the following further information:

  • When the extension is started, it does not try to connect to the configured Kafka brokers. Therefore, no feedback is provided if the brokers are not correctly configured.
  • When starting a consumer/producer Integration Link against an invalid broker, it keeps trying to connect to the broker until it responds (enable info logging for the org.apache.kafka logger to see the related logs). The same situation occurs when the Integration Link has already been started but the broker is temporarily down. This can cause some delay between when the stop command is sent to the Integration Link and when it reaches the inactive state.
  • When starting a producer Integration Link against an invalid topic, no feedback is provided and the Integration Link is started. On the first sending attempt, if the topic does not yet exist, an infinite series of attempts will begin, unless the Integration Link is stopped.
  • When starting a consumer Integration Link against an invalid topic, no feedback is provided and the Integration Link is started. An infinite series of connection attempts begins immediately, unless the Integration Link is stopped.