Starter Kafka

The module sda-commons-starter-kafka provides autoconfigured Kafka producer and consumer configuration.

Based on:

  • org.springframework.boot:spring-boot-starter
  • org.springframework.boot:spring-boot-starter-validation
  • org.springframework.kafka:spring-kafka

Configuration

| Property | Description | Default | Example | Env |
|----------|-------------|---------|---------|-----|
| `sda.kafka.consumer.retry.initialBackoffInterval` _int_ | The initial backoff of the retry in milliseconds. | `1000` | `1500` | `SDA_KAFKA_CONSUMER_RETRY_INITIAL_BACKOFF_INTERVAL` |
| `sda.kafka.consumer.retry.maxBackoffInterval` _int_ | The maximum backoff interval in milliseconds. | `4000` | `5000` | `SDA_KAFKA_CONSUMER_RETRY_MAX_BACKOFF_INTERVAL` |
| `sda.kafka.consumer.retry.backoffMultiplier` _double_ | The multiplier applied to the backoff interval, starting from the initial backoff. | `2` | `1.5` | `SDA_KAFKA_CONSUMER_RETRY_BACKOFF_MULTIPLIER` |
| `sda.kafka.consumer.retry.maxRetries` _int_ | Maximum number of retries when consuming an offset. | `4` | `10` | `SDA_KAFKA_CONSUMER_RETRY_MAX_RETRIES` |
| `sda.kafka.consumer.dlt.pattern` _string_ | Pattern of the consumer dead letter topic. `<topic>` is replaced by the topic name. If not set, `dlt-` is prefixed to the topic name. If set to empty, the Spring Boot fallback `<topic>.DLT` is used. | `"dlt-<topic>"` | `"prefix-<topic>"` | `SDA_KAFKA_CONSUMER_DLT_PATTERN` |
| `management.health.kafka.enabled` _boolean_ | Flag to enable the Kafka health check. | `true` | `false` | `MANAGEMENT_HEALTH_KAFKA_ENABLED` |
| `management.health.kafka.timeout` _duration_ | Allowed duration for the health check to finish. | `4s` | `"5000ms"` | `MANAGEMENT_HEALTH_KAFKA_TIMEOUT` |
| `spring.kafka.bootstrap-servers` _string_ | Comma-delimited list of `host:port` pairs used to establish the initial connections to the Kafka cluster. | `"localhost:9092"` | `"kafka-broker:9092"` | `SPRING_KAFKA_BOOTSTRAP_SERVERS` |
| `spring.kafka.security.protocol` _string_ | The security protocol used by Kafka. Please note that SASL mechanisms require some manual configuration. | `"PLAINTEXT"` | `"SSL"` | `SPRING_KAFKA_SECURITY_PROTOCOL` |
| `spring.kafka.ssl.key-store-location` _url_ | Location of the SSL keystore file. | | `"file:///kafka/kafka.client.keystore.jks"` | `SPRING_KAFKA_SSL_KEYSTORELOCATION` |
| `spring.kafka.ssl.key-store-password` _string_ | Password for the SSL keystore file. | | `"s3cr3t"` | `SPRING_KAFKA_SSL_KEYSTOREPASSWORD` |
| `spring.kafka.ssl.trust-store-location` _string_ | Location of the SSL truststore file. | | `"file:/kafka-certs/kafka.client.truststore.jks"` | `SPRING_KAFKA_SSL_TRUSTSTORELOCATION` |
| `spring.kafka.ssl.trust-store-password` _string_ | Password for the SSL truststore file. | | `"s3cret"` | `SPRING_KAFKA_SSL_TRUSTSTOREPASSWORD` |
| `spring.kafka.consumer.group-id` _string_ | Consumer group id of the Kafka consumer. | `"default"` | `"my-service-name"` | `SPRING_KAFKA_CONSUMER_GROUPID` |

Make sure to overwrite spring.kafka.consumer.group-id in your application.properties; otherwise you could have conflicts with other services that also use the default group default.
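
For example, set a service-specific group id in application.properties (the value below is a placeholder):

```properties
spring.kafka.consumer.group-id=my-service-name
```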

For further information, have a look at the Spring Kafka reference documentation.

Default configuration set by this library
```properties
spring.kafka.consumer.group-id=default
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.apache.kafka.common.serialization.ByteArrayDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

### SDA SPECIFIC CONFIGURATION
sda.kafka.consumer.retry.initialBackoffInterval=1000
sda.kafka.consumer.retry.maxBackoffInterval=4000
sda.kafka.consumer.retry.backoffMultiplier=2
sda.kafka.consumer.retry.maxRetries=4
sda.kafka.consumer.dlt.pattern=dlt-<topic>

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
```

Consumer configuration

The autoconfigured consumer configuration provides several ConcurrentKafkaListenerContainerFactory<String, ?> beans that can be referenced in @KafkaListener annotated methods:

  • SdaKafkaListenerContainerFactory.LOG_ON_FAILURE
    • Simply logs the exception; with a record listener, the remaining records from the previous poll are passed to the listener.
  • SdaKafkaListenerContainerFactory.RETRY_AND_LOG
    • Skips records that keep failing after sda.kafka.consumer.retry.maxRetries (default: 4) and logs the exception.
  • SdaKafkaListenerContainerFactory.RETRY_AND_DLT
    • Skips records that keep failing after sda.kafka.consumer.retry.maxRetries (default: 4) and publishes the failed record to the configured dead letter topic.
    • The Spring DLT naming convention can be configured using the sda.kafka.consumer.dlt.pattern property. The pattern must contain <topic>, which is replaced by the actual topic name. If not configured, it defaults to dlt-<topic>. As mentioned in the migration guide, the sda.kafka.consumer.dlt.pattern property can also be configured as empty to force the Spring Boot default of <topic>.DLT as a fallback, so that older implementations do not break.

      So there are three possible configurations (assuming a topic named example; see the properties sketch below):

        • sda.kafka.consumer.dlt.pattern not configured: fallback to the SDA default, resulting DLT topic dlt-example
        • sda.kafka.consumer.dlt.pattern configured as empty: fallback to the Spring default, resulting DLT topic example.DLT
        • sda.kafka.consumer.dlt.pattern configured with <topic>-customSuffix: resulting DLT topic example-customSuffix
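
A minimal sketch of the three variants in application.properties (the topic name example is hypothetical; configure at most one variant):

```properties
# 1. Not set: SDA default, failed records go to dlt-example
#sda.kafka.consumer.dlt.pattern=

# 2. Set to empty: Spring Boot fallback, failed records go to example.DLT
sda.kafka.consumer.dlt.pattern=

# 3. Custom pattern: failed records go to example-customSuffix
#sda.kafka.consumer.dlt.pattern=<topic>-customSuffix
```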

To skip retry for specific business errors, you can throw the custom NotRetryableKafkaException.

Each containerFactory expects the message key as String and accepts a message payload of any type. The payload is deserialized as a byte array and converted with the ByteArrayJsonMessageConverter. The default ack mode for offsets is RECORD: the offset of each record is committed after the listener has processed it.

```java
@KafkaListener(
    topics = "TestTopic",
    containerFactory = SdaKafkaListenerContainerFactory.RETRY_AND_LOG)
public void retryAndLog(@Payload @Valid Message message) {
  // doSomething
  if (businessError) {
    // business error that should not be retried
    throw new NotRetryableKafkaException();
  }
}
```

DLT Error Handling

To allow handling of serialization exceptions, the DLT KafkaTemplate uses a ByteArraySerializer. For records that were deserialized successfully, you can add additional templates used for publishing messages to the dead letter topic by overriding the dltTemplates bean; see the Spring Kafka documentation.

```java
@Configuration
@Import({SdaKafkaConsumerConfiguration.class})
public class DltConfiguration {

  // template provided by this library for records that failed deserialization
  @Autowired
  @Qualifier("kafkaByteArrayDltTemplate")
  private KafkaTemplate<String, ?> recoverTemplate;

  // custom template for successfully deserialized records
  @Autowired
  private KafkaTemplate<String, ?> myCustomTemplate;

  @Bean
  public Map<Class<?>, KafkaOperations<?, ?>> dltTemplates() {
    // LinkedHashMap: templates are checked in insertion order,
    // so the byte[] fallback goes last
    Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
    templates.put(MyCustomClass.class, myCustomTemplate);
    templates.put(byte[].class, recoverTemplate);
    return templates;
  }
}
```

Producer configuration

The autoconfigured producer configuration provides a preconfigured KafkaTemplate for producing messages with a String key and the payload serialized as JSON.
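
A minimal usage sketch of the autoconfigured template (the topic name TestTopic, the class MessageProducer, and the payload type Message are hypothetical):

```java
@Component
public class MessageProducer {

  private final KafkaTemplate<String, Object> kafkaTemplate;

  public MessageProducer(KafkaTemplate<String, Object> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  public void send(String key, Message message) {
    // the key is serialized as String, the payload is serialized as JSON
    kafkaTemplate.send("TestTopic", key, message);
  }
}
```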

To configure different serializers, use the spring.kafka.producer.key-serializer and spring.kafka.producer.value-serializer properties.
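
For example, to produce raw byte array payloads instead of JSON:

```properties
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
```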

ProducerFactory

Do not hard-code ((DefaultKafkaProducerFactory<?, ?>) producerFactory).setValueSerializer(new JsonSerializer<>(objectMapper)); on the default Spring producer factory. It affects all other configurations that share the factory. Instead, we recommend using a copy of the producer factory, as in the example below.

```java
@Bean("kafkaByteArrayDltTemplate")  
public KafkaTemplate<String, ?> kafkaByteArrayDltTemplate(ProducerFactory<String, ?> producerFactory) {

  Map<String, Object> props = new HashMap<>(commonProperties);
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
  ProducerFactory<String, ?> producerFactoryByte =
      producerFactory.copyWithConfigurationOverride(props);


  return new KafkaTemplate<>(producerFactoryByte, props);
}

You need to autowire the KafkaTemplate using a @Qualifier:

```java
@Qualifier("kafkaByteArrayDltTemplate") KafkaTemplate<String, ?> recoverTemplate,