Starter Kafka¶
The module sda-commons-starter-kafka provides autoconfigured Kafka producer and consumer
configuration.
Based on:
org.springframework.boot:spring-boot-starterorg.springframework.boot:spring-boot-starter-validationorg.springframework.kafka:spring-kafka
Configuration¶
| Property | Description | Default | Example | Env |
|---|---|---|---|---|
sda.kafka.consumer.retry.initialBackOffInterval int |
The initial backoff of the retry in milli seconds. | 1000 |
1500 |
SDA_KAFKA_CONSUMER_RETRY_INITIAL_BACKOFF_INTERVALL |
sda.kafka.consumer.retry.maxBackOffInterval int |
The max backoff interval in milli seconds. | 4000 |
5000 |
SDA_KAFKA_CONSUMER_RETRY_MAX_BACKOFF_INTERVALL |
sda.kafka.consumer.retry.backOffMultiplier double |
The multiplier beginning with the initial backoff. | 2 |
1.5 |
SDA_KAFKA_CONSUMER_RETRY_INITIAL_BACKOFF_INTERVALL |
sda.kafka.consumer.retry.maxRetries int |
Max retries consuming the offset. | 4 |
10 |
SDA_KAFKA_CONSUMER_RETRY_INITIAL_MAXRETRIES |
sda.kafka.consumer.dlt.pattern string |
Pattern of consumer dead letter topic. <topic> will be replaced by topic name. If not set, "dlt-" is added to the topic name as prefix. If set to empty the spring-boot fallback of <topic>.DLT is used. |
"dlt-<topic>" |
"prefix-<topic>" |
SDA_KAFKA_CONSUMER_DLT_PATTERN |
management.health.kafka.enabled boolean |
Flag to enable kafka health check. | true |
false |
MANAGEMENT_HEALTH_KAFKA_ENABLED |
management.health.kafka.timeout duration |
Allowed duration for health check to finish. | 4s |
"5000ms" |
MANAGEMENT_HEALTH_KAFKA_TIMEOUT |
spring.kafka.bootstrap.servers string |
Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. |
"localhost:9092" |
"kafka-broker:9092" |
SPRING_KAFKA_BOOTSTRAP_SERVERS |
spring.kafka.security.protocol string |
The security protocol used by Kafka. Please note that SASL mechanism requires some manual configuration. | "PLAINTEXT" |
"SSL" |
SPRING_KAFKA_SECURITY_PROTOCOL |
spring.kafka.ssl.keystore-location url |
Location of the SSL keystore file. | "file:///kafka/kafka.client.keystore.jks" |
SPRING_KAFKA_SSL_KEYSTORELOCATION |
|
spring.kafka.ssl.key-store-password string |
Password for the SSL keystore file. | "s3cr3t" |
SPRING_KAFKA_SSL_KEYSTOREPASSWORD |
|
spring.kafka.ssl.trust-store-location string |
Location of the SSL truststore file. | "file:/kafka-certs/kafka.client.keystore.jks" |
SPRING_KAFKA_SSL_TRUSTSTORELOCATION |
|
spring.kafka.ssl.trust-store-password string |
Password for the SSL truststore file. | "s3cret" |
SPRING_KAFKA_SSL_TRUSTSTOREPASSWORD |
|
spring.kafka.consumer.group-id string |
Consumer group name of Kafka Consumer. | "default" |
"my-service-name" |
SPRING_KAFKA_CONSUMER_GROUPID |
Make sure to overwrite spring.kafka.consumer.group-id in your application.properties otherwise
you could have conflicts with other services using default.
For further information have a look to the Spring Kafka reference documentation.
Default configuration set by this library
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | |
Consumer configuration¶
The autoconfigured consumer configuration provides
several ConcurrentKafkaListenerContainerFactory<String, ?>
which can be referenced in @KafkaListener annotated methods.
SdaKafkaListenerContainerFactory.LOG_ON_FAILURE- Simply logs the exception; with a record listener, the remaining records from the previous poll are passed to the listener.
SdaKafkaListenerContainerFactory.RETRY_AND_LOG- Skips record that keeps failing after
sda.kafka.consumer.retry.maxRetries(default:4) and log exception.
- Skips record that keeps failing after
SdaKafkaListenerContainerFactory.RETRY_AND_DLT- Skips record that keeps failing after
sda.kafka.consumer.retry.maxRetries(default: 4) and produces failed record to the configured dlt topic. -
The spring DLT naming convention can be configured using the
sda.kafka.consumer.dlt.patternproperty. The pattern must contain<topic>, which will be replaced by the actual topic name. If not configured, it defaults todtl-<topic>. As mentioned in the migration guide, thesda.kafka.consumer.dlt.patternproperty can also be configured as empty to force the spring boot default implementation of<topic>.DLTas a fallback to not break older implementations.So there are three possible configurations: -
sda.kafka.consumer.dlt.patternnot configured, fallback to sda default and resulting dlt topic:dlt-example-sda.kafka.consumer.dlt.patternconfigured as empty, fallback to spring default and resulting dlt topic:example.DLT-sda.kafka.consumer.dlt.patternconfigured with<topic>-customSuffix, resulting dlt topic:example-customSuffix
- Skips record that keeps failing after
To skip retry for specific business errors, you can throw the custom NotRetryableKafkaException.
Each containerFactory expects a message key as String and the message payload of any type.
The payload is deserialized as byte array and converted with the ByteArrayJsonMessageConverter.
The ack mode for offsets is per default RECORD where the offset after each record is
processed by the listener.
1 2 3 4 5 6 7 8 9 | |
DLT Error Handling¶
To allow handling of serialization exceptions, the DLT KafkaTemplate is using a ByteArraySerializer.
You can add additional templates used for uploading messages to the dead-letter-topic for records
that were deserialized successfully, by overwriting the dltTemplates bean.
see spring-kafka documentation.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | |
Producer configuration¶
The autoconfigured producer configuration provides a preconfigured KafkaTemplate for producing
messages with String key and payload as json.
To configure different serializers, use spring.kafka.producer.key-serializer and
spring.kafka.producer.value-serializer properties
ProducerFactory
Do not hard code ((DefaultKafkaProducerFactory<?, ?>) producerFactory)
.setValueSerializer(new JsonSerializer<>(objectMapper)); of the default spring producer factory.
It will affect other configurations, we recommend to use a copy of the producer factory as in the example below.
1 2 3 4 5 6 7 8 9 10 11 12 | |
You need to autowire the KafkaTemplate using a Qualifier.
1 | |