Kafka Streams Static Membership

June 17, 2020

Kafka Streams (KStreams)

For Kafka applications in Java, Kotlin, or Scala, the Kafka Streams client library is extremely powerful. It allows developers to build scalable, fault-tolerant, high-throughput applications and microservices more easily. It can handle both stateless and stateful use cases.

Until recently, deploying stateful KStreams applications with minimal downtime was relatively difficult. Kafka Streams uses a rebalancing protocol [1] that redistributes the workload among the workers whenever a worker joins or leaves, such as during a deployment. For a stateless application, this rebalance is relatively quick: the topics’ partitions get distributed across the workers’ threads and each thread picks up at the last committed offset.

However, for a stateful application this rebalance can take a lot more time. Each worker can have a local state store [2] that is optionally backed by an internal changelog topic [3]. This state store can hold a lot of data, all of which must be streamed from the changelog topic to recreate the store. The worker does not process records until the rebalance and restore are complete.

Static Membership

Due to the high time cost of restoring state, stateful Kafka Streams applications benefit greatly from re-using the existing state stores instead of recreating them each time they are deployed. This is the idea behind Static Membership [4].

With Static Membership enabled, workers will be re-assigned the topics’ partitions that they were already assigned. When that happens, the state stores can be re-used and processing can begin again almost immediately.
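The behavior described above can be pictured with a deliberately simplified toy model. This is not the broker's actual assignment logic; the class and method names are hypothetical. It only illustrates that an assignment is remembered per identifier, so a worker that returns with the same identifier gets the same partitions back, while an unknown identifier would require a rebalance.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: NOT the broker's real assignment logic. */
final class StaticAssignmentSketch {
    // Assignment remembered per worker identifier (hypothetical bookkeeping).
    private final Map<String, List<Integer>> assignmentById = new LinkedHashMap<>();

    /** Records an assignment, as would happen after a full rebalance. */
    void assign(String instanceId, List<Integer> partitions) {
        assignmentById.put(instanceId, List.copyOf(partitions));
    }

    /**
     * A worker (re)joins. A known identifier gets its previous partitions back;
     * an unknown identifier returns null, standing in for "a rebalance is needed".
     */
    List<Integer> rejoin(String instanceId) {
        return assignmentById.get(instanceId);
    }

    public static void main(String[] args) {
        StaticAssignmentSketch group = new StaticAssignmentSketch();
        group.assign("worker-a", List.of(0, 1));
        group.assign("worker-b", List.of(2, 3));
        // worker-a restarts with the same identifier and keeps its partitions.
        System.out.println(group.rejoin("worker-a"));
        // worker-c was never seen, so the toy model reports no assignment.
        System.out.println(group.rejoin("worker-c"));
    }
}
```

Because the partitions are unchanged after the restart, the state stores already on the worker's disk still match its assignment, which is why they can be re-used.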

Making It Work

To use Static Membership, Kafka Brokers need to be on at least version 2.3 with inter.broker.protocol.version at 2.3 or higher. The Kafka Streams client library also needs to be on 2.3 or higher [5].

There are also a few minor caveats to the above assertions.

First, the number of workers and threads needs to stay the same from one deployment to the next. If a new worker joins, or an existing worker leaves, a normal rebalance occurs.

Workers need to have consistent identifiers, which are set using the consumer configuration group.instance.id. If a worker has a different identifier, the Kafka brokers will see it as a new worker and will trigger a rebalance.

Most often the identifier is the worker’s hostname, which works so long as that name remains constant between deployments.

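        // Imports: java.net.InetAddress, java.net.UnknownHostException,
        // java.util.Properties, org.apache.kafka.clients.consumer.ConsumerConfig
        Properties streamsSettings = new Properties();
        try {
            // Use the hostname as an identifier that is stable across deployments.
            String hostname = InetAddress.getLocalHost().getHostName();
            streamsSettings.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, hostname);
        } catch (UnknownHostException ignored) {
            // Hostname lookup failed: group.instance.id stays unset, so
            // Static Membership is not enabled for this worker.
        }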

Additionally, a longer session timeout is necessary. Set this value high enough to give workers time to start, restart, deploy, etc. Otherwise, the application may get into a restart cycle if it misses too many heartbeats during normal operations. Setting it too high, however, may cause long periods of partial unavailability if a worker dies, because its workload is not rebalanced until the timeout expires. Each application will set this value differently based on its own availability needs.

This configuration is the session.timeout.ms [6] setting for consumers.

        streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
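Putting the two settings together, a minimal sketch might look like the following. The helper class and its name are hypothetical; the string keys are the same consumer configuration names used above (group.instance.id and session.timeout.ms), written as literals so the example compiles without the Kafka client on the classpath.

```java
import java.time.Duration;
import java.util.Properties;

/** Hypothetical helper that collects the two Static Membership settings. */
final class StaticMembershipSettings {

    static Properties build(String instanceId, Duration sessionTimeout) {
        // An empty identifier would defeat the purpose, so fail fast.
        if (instanceId == null || instanceId.isBlank()) {
            throw new IllegalArgumentException("instanceId must be a non-empty, stable name");
        }
        Properties props = new Properties();
        // Must be identical across restarts of the same worker.
        props.put("group.instance.id", instanceId);
        // The timeout is stored as an integer number of milliseconds.
        props.put("session.timeout.ms", Math.toIntExact(sessionTimeout.toMillis()));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("worker-1", Duration.ofSeconds(60));
        System.out.println(props.get("group.instance.id"));
        System.out.println(props.get("session.timeout.ms"));
    }
}
```

In a real application these entries would be merged into the same Properties object that holds the rest of the Kafka Streams configuration, and the 60-second timeout is only the example value from this article, not a recommendation.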

Summary

Enabling Static Membership in Kafka Streams adds to the already very resilient and fault-tolerant nature of a KStreams app. No longer will a simple deployment put the application out of commission while streaming and recreating the entirety of the local state stores. Additionally, the application better handles normal operations that might cause a hiccup in the heartbeats to the Kafka brokers.


  1. For a great overview of the rebalance protocol, check out this article on Incremental Cooperative Rebalancing.
  2. Local State Stores
  3. Local state in Kafka Streams is out of scope for this article. More information is available in the Kafka Developer Guide.
  4. Introduced in Kafka 2.3, you can see more detail here and in the official KIP-345.
  5. Opting into Static Membership
  6. Example setting session.timeout.ms