Skip to content

SDA Commons Server Kafka Testing

javadoc

The module sda-commons-server-kafka-testing is the base module to add unit and integration test for Kafka broker usage.

It includes the dependencies to sda-commons-server-testing module.

The kafka-junit5 library provides means for easily setting up a Kafka broker that can be reconfigured easily by using the following class extension:

1
2
3
@RegisterExtension
@Order(0) // Start the broker before the app
static final SharedKafkaTestResource KAFKA = new SharedKafkaTestResource().withBrokers(2);

Test support with random broker ports

The usage of random ports allows to execute tests in parallel and reduce the probability of port conflicts, e.g. when local-infra is also started.

The example above starts two Kafka brokers within a cluster. To test your application, you have to configure these servers as bootstrap servers. This is normally done via the configuration YAML file within the property kafka -> brokers.

You can override these properties programmatically using config overrides when creating your DropwizardAppExtension:

Example:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
import com.salesforce.kafka.test.junit5.SharedKafkaTestResource;
import io.dropwizard.testing.junit5.DropwizardAppExtension;
// ...

class KafkaJUnit5IT {

  @RegisterExtension
  @Order(0) // Start the broker before the app
  static final SharedKafkaTestResource KAFKA = new SharedKafkaTestResource().withBrokers(2);

  @RegisterExtension
  @Order(1)
  static final DropwizardAppExtension<KafkaTestConfiguration> DW =
      new DropwizardAppExtension<>(
          KafkaTestApplication.class,
          resourceFilePath("test-config.yaml"),
          config("kafka.brokers", KAFKA::getKafkaConnectString) // Override the Kafka brokers
      );

}

Create topics

When setting up your test class you might have problems if producers or consumers want to register with a topic that does not exist. Do ease the creation of topics you can use the following Junit 5 extension. Make sure that this class extension is executed after the Kafka server was started but before your application starts up. Example:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
  @RegisterExtension
  @Order(0)
  static final SharedKafkaTestResource KAFKA = new SharedKafkaTestResource();

  @RegisterExtension
  @Order(1)
  static final CreateKafkaTopicsClassExtension TOPICS =
      new CreateKafkaTopicsClassExtension(KAFKA, Arrays.asList("foo", "bar"));

  @RegisterExtension
  @Order(2)
  static final DropwizardAppExtension<KafkaExampleConfiguration> DW =
      new DropwizardAppExtension<>(
          KafkaExampleProducerApplication.class,
          resourceFilePath("test-config-producer.yml"),
          config("kafka.brokers", KAFKA::getKafkaConnectString));