Starter Kafka¶
The module sda-commons-starter-kafka
provides autoconfigured Kafka producer and consumer
configuration.
Based on:
org.springframework.boot:spring-boot-starter
org.springframework.boot:spring-boot-starter-validation
org.springframework.kafka:spring-kafka
Configuration¶
Property | Description | Default | Example | Env |
---|---|---|---|---|
sda.kafka.consumer.retry.initialBackOffInterval int |
The initial backoff of the retry in milli seconds. | 1000 |
1500 |
SDA_KAFKA_CONSUMER_RETRY_INITIAL_BACKOFF_INTERVALL |
sda.kafka.consumer.retry.maxBackOffInterval int |
The max backoff interval in milli seconds. | 4000 |
5000 |
SDA_KAFKA_CONSUMER_RETRY_MAX_BACKOFF_INTERVALL |
sda.kafka.consumer.retry.backOffMultiplier double |
The multiplier beginning with the initial backoff. | 2 |
1.5 |
SDA_KAFKA_CONSUMER_RETRY_INITIAL_BACKOFF_INTERVALL |
sda.kafka.consumer.retry.maxRetries int |
Max retries consuming the offset. | 4 |
10 |
SDA_KAFKA_CONSUMER_RETRY_INITIAL_MAXRETRIES |
sda.kafka.consumer.dlt.pattern string |
Pattern of consumer dead letter topic. <topic> will be replaced by topic name. If not set, "dlt-" is added to the topic name as prefix. If set to empty the spring-boot fallback of <topic>.DLT is used. |
"dlt-<topic>" |
"prefix-<topic>" |
SDA_KAFKA_CONSUMER_DLT_PATTERN |
management.health.kafka.enabled boolean |
Flag to enable kafka health check. | true |
false |
MANAGEMENT_HEALTH_KAFKA_ENABLED |
management.health.kafka.timeout duration |
Allowed duration for health check to finish. | 4s |
"5000ms" |
MANAGEMENT_HEALTH_KAFKA_TIMEOUT |
spring.kafka.bootstrap.servers string |
Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. |
"localhost:9092" |
"kafka-broker:9092" |
SPRING_KAFKA_BOOTSTRAP_SERVERS |
spring.kafka.security.protocol string |
The security protocol used by Kafka. Please note that SASL mechanism requires some manual configuration. | "PLAINTEXT" |
"SSL" |
SPRING_KAFKA_SECURITY_PROTOCOL |
spring.kafka.ssl.keystore-location url |
Location of the SSL keystore file. | "file:///kafka/kafka.client.keystore.jks" |
SPRING_KAFKA_SSL_KEYSTORELOCATION |
|
spring.kafka.ssl.key-store-password string |
Password for the SSL keystore file. | "s3cr3t" |
SPRING_KAFKA_SSL_KEYSTOREPASSWORD |
|
spring.kafka.ssl.trust-store-location string |
Location of the SSL truststore file. | "file:/kafka-certs/kafka.client.keystore.jks" |
SPRING_KAFKA_SSL_TRUSTSTORELOCATION |
|
spring.kafka.ssl.trust-store-password string |
Password for the SSL truststore file. | "s3cret" |
SPRING_KAFKA_SSL_TRUSTSTOREPASSWORD |
|
spring.kafka.consumer.group-id string |
Consumer group name of Kafka Consumer. | "default" |
"my-service-name" |
SPRING_KAFKA_CONSUMER_GROUPID |
Make sure to overwrite spring.kafka.consumer.group-id
in your application.properties
otherwise
you could have conflicts with other services using default
.
For further information have a look to the Spring Kafka reference documentation.
Default configuration set by this library
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
|
Consumer configuration¶
The autoconfigured consumer configuration provides
several ConcurrentKafkaListenerContainerFactory<String, ?>
which can be referenced in @KafkaListener
annotated methods.
SdaKafkaListenerContainerFactory.LOG_ON_FAILURE
- Simply logs the exception; with a record listener, the remaining records from the previous poll are passed to the listener.
SdaKafkaListenerContainerFactory.RETRY_AND_LOG
- Skips record that keeps failing after
sda.kafka.consumer.retry.maxRetries
(default:4
) and log exception.
- Skips record that keeps failing after
SdaKafkaListenerContainerFactory.RETRY_AND_DLT
- Skips record that keeps failing after
sda.kafka.consumer.retry.maxRetries
(default: 4) and produces failed record to the configured dlt topic. -
The spring DLT naming convention can be configured using the
sda.kafka.consumer.dlt.pattern
property. The pattern must contain<topic>
, which will be replaced by the actual topic name. If not configured, it defaults todtl-<topic>
. As mentioned in the migration guide, thesda.kafka.consumer.dlt.pattern
property can also be configured as empty to force the spring boot default implementation of<topic>.DLT
as a fallback to not break older implementations.So there are three possible configurations: -
sda.kafka.consumer.dlt.pattern
not configured, fallback to sda default and resulting dlt topic:dlt-example
-sda.kafka.consumer.dlt.pattern
configured as empty, fallback to spring default and resulting dlt topic:example.DLT
-sda.kafka.consumer.dlt.pattern
configured with<topic>-customSuffix
, resulting dlt topic:example-customSuffix
- Skips record that keeps failing after
To skip retry for specific business errors, you can throw the custom NotRetryableKafkaException
.
Each containerFactory
expects a message key as String
and the message payload of any type.
The payload is deserialized as byte array and converted with the ByteArrayJsonMessageConverter
.
The ack mode for offsets is per default RECORD
where the offset after each record is
processed by the listener.
1 2 3 4 5 6 7 8 9 |
|
DLT Error Handling¶
To allow handling of serialization exceptions, the DLT KafkaTemplate is using a ByteArraySerializer.
You can add additional templates used for uploading messages to the dead-letter-topic for records
that were deserialized successfully, by overwriting the dltTemplates
bean.
see spring-kafka documentation.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
|
Producer configuration¶
The autoconfigured producer configuration provides a preconfigured KafkaTemplate
for producing
messages with String
key and payload as json
.
To configure different serializers, use spring.kafka.producer.key-serializer
and
spring.kafka.producer.value-serializer
properties
ProducerFactory
Do not hard code ((DefaultKafkaProducerFactory<?, ?>) producerFactory)
.setValueSerializer(new JsonSerializer<>(objectMapper));
of the default spring producer factory.
It will affect other configurations, we recommend to use a copy of the producer factory as in the example below.
1 2 3 4 5 6 7 8 9 10 11 12 |
|
You need to autowire the KafkaTemplate using a Qualifier.
1 |
|