Apache Kafka Raft (KRaft, pronounced craft) is the consensus protocol that was introduced in KIP-500 to remove Apache Kafka’s dependency on ZooKeeper for metadata management.

KRaft mode uses a new quorum controller service in Kafka that replaces the previous controller and relies on an event-based variant of the Raft consensus protocol. [1]

KRaft mode is production ready for new clusters as of Apache Kafka 3.3. The development progress for additional features like migration from ZooKeeper is tracked in KIP-833.

KRaft running in Isolated Mode

The KRaft controller nodes comprise a Raft quorum which manages the Kafka metadata log. This log contains information about each change to the cluster metadata. Everything that is currently stored in ZooKeeper, such as topics, partitions, ISRs, configurations, and so on, is stored in this log. [2]
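
Once a KRaft server is running (see the quickstart below), you can inspect this metadata log directly with the kafka-dump-log tool from the Kafka installation directory. A minimal sketch, assuming the default /tmp/kraft-combined-logs directory from the combined-mode quickstart and a typical first segment file name:

$ bin/kafka-dump-log.sh --cluster-metadata-decoder --files /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log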

1. Control plane and Data plane

A Kafka cluster can be broken down into two components: a control plane and a data plane, each with its own responsibilities that work together to transfer data where it needs to go. [3]

Control plane responsibilities include:

  • Knowing which servers are alive.

  • Making appropriate changes when a server is detected as down.

  • Storing and exchanging metadata.

Data plane responsibilities include:

  • Handling requests to produce and fetch records and other application requests.

  • Reacting to metadata changes from the control plane.

Historically, Kafka used an Apache ZooKeeper cluster to provide most of its control plane functionality. ZooKeeper tracks each broker and provides replicated and consistent storage for the cluster metadata. ZooKeeper also elects one Kafka broker to be the controller. The controller has extra, non-data-plane duties to manage the state of the cluster, such as responding to brokers that crash or restart.

ZooKeeper architecture for Kafka

The new architecture removes the ZooKeeper dependency and replaces it with a flavor of the Raft consensus protocol, allowing each server in the Kafka cluster to take the role of broker, controller, or both. The controller cluster will perform the same roles as the cluster of ZooKeeper nodes did previously, but the Kafka controller will now be elected from the controllers instead of the brokers.

KRaft architecture for Kafka

For a Kafka cluster to be highly available, you need to make certain that both the data plane and the control plane (whichever kind is in use) are highly available.
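
In KRaft mode, the role a given server takes is controlled by the process.roles setting in its server properties file. A minimal sketch, reusing the controller quorum addresses from the cluster setup later in this article:

# Run this server as both broker and controller (combined mode).
process.roles=broker,controller
# For isolated mode, set process.roles=broker or process.roles=controller instead.

# Every server must know the controller quorum voters.
controller.quorum.voters=3001@node-1:9093,3002@node-2:9093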

2. Install using TAR Archives

Your local environment must have Java 8+ installed.
  1. Go to Eclipse Temurin, and download JDK 17-LTS.

  2. Extract the tar to /usr/local/jdk:

    $ sudo mkdir /usr/local/jdk
    $ sudo tar xf OpenJDK17U-jdk_x64_linux_hotspot_17.0.9_9.tar.gz -C /usr/local/jdk  --strip-components=1
  3. Set JAVA_HOME in /etc/profile.d/java.sh with the following content:

    export JAVA_HOME=/usr/local/jdk
    export PATH=$JAVA_HOME/bin:$PATH
  4. Load the environment variables into the current shell and verify the installation:

    $ source /etc/profile
    $ java -version

2.1. Set up a standalone server in KRaft combined mode as a proof of concept

  1. Go to https://kafka.apache.org/ and download the latest Kafka release:

    $ curl -LO https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
  2. Create a kafka user and extract the tar into that user's home directory:

    $ sudo useradd -m kafka # [-s /bin/bash] Specify the login shell of the new account.
    $ sudo tar xf kafka_2.13-3.6.1.tgz -C /home/kafka/ --strip-components=1
    $ sudo chown -R kafka:kafka /home/kafka
    $ sudo su - kafka
    Running Kafka as root is not a recommended configuration.
  3. Generate a Cluster UUID:

    $ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
  4. Format Log Directories:

    $ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
    Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.
  5. Start the Kafka Server:

    $ bin/kafka-server-start.sh config/kraft/server.properties
    ...
    [2024-01-12 23:22:34,872] INFO [SocketServer listenerType=CONTROLLER, nodeId=1] Enabling request processing. (kafka.network.SocketServer)
    [2024-01-12 23:22:34,881] INFO [MetadataLoader id=1] InitializeNewPublishers: initializing ScramPublisher controller id=1 with a snapshot at offset 4 (org.apache.kafka.image.loader.MetadataLoader)
    [2024-01-12 23:22:34,911] INFO Awaiting socket connections on 0.0.0.0:9093. (kafka.network.DataPlaneAcceptor)
    ...
    [2024-01-12 23:22:36,629] INFO [SocketServer listenerType=BROKER, nodeId=1] Enabling request processing. (kafka.network.SocketServer)
    [2024-01-12 23:22:36,629] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.DataPlaneAcceptor)
    ...
    The application logs (not to be confused with the commit log) are written to the logs/ directory, as configured in config/log4j.properties.
  6. Once the Kafka server has successfully launched:

    • Open another terminal session and create a topic:

      $ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
      Created topic quickstart-events.
      $ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
      Topic: quickstart-events	TopicId: wx6vplZjRHaJubPnPP3_QQ	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
      	Topic: quickstart-events	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
    • Run the console producer client to write a few events into your topic:

      $ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
      This is my first event
      This is my second event
    • Open another terminal session and run the console consumer client to read the events you just created:

      $ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
      This is my first event
      This is my second event
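  7. Optional: when you are done experimenting, stop the server and, for a clean slate, remove the local data. A possible teardown, assuming the default log directory from config/kraft/server.properties:

    $ bin/kafka-server-stop.sh
    $ rm -rf /tmp/kraft-combined-logs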

2.2. Set up a Kafka cluster in KRaft mode

  1. Make sure the nodes in the cluster can reach each other:

    You can use hostnames, DNS names, or IP addresses to connect the nodes.
    Run ip a s to show the addresses assigned to all network interfaces.

    The following steps are demonstrated with the following two nodes (/etc/hosts):

    192.168.46.131	node-1
    192.168.46.132	node-2
  2. Create a kafka user and extract the tar into that user's home directory on each node:

    $ sudo useradd -m kafka # [-s /bin/bash] Specify the login shell of the new account.
    $ sudo tar xf kafka_2.13-3.6.1.tgz -C /home/kafka/ --strip-components=1
    $ sudo chown -R kafka:kafka /home/kafka
    $ sudo su - kafka
    Running Kafka as root is not a recommended configuration.
  3. Generate a Cluster UUID:

    $ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
    $ echo $KAFKA_CLUSTER_ID
    MkU3OEVBNTcwNTJENDM2Qg

    Note down the value of KAFKA_CLUSTER_ID and create /etc/profile.d/kafka.sh on each node with the following content:

    KAFKA_CLUSTER_ID=MkU3OEVBNTcwNTJENDM2Qg

    Load the environment variables into the current shell with the following command:

    $ source /etc/profile
  4. Back up the original config directory on each node:

    $ cp -a config config.org
  5. Create the base directory for log.dirs with the following commands on each node:

    $ sudo mkdir -p /var/lib/kafka
    $ sudo chown kafka:kafka /var/lib/kafka
  6. Update the config/kraft/controller.properties:

    # The node id associated with this instance's roles
    # !!! on the second node, set the node.id to be 3002.
    node.id=3001
    
    # The connect string for the controller quorum
    controller.quorum.voters=3001@node-1:9093,3002@node-2:9093
    
    # Used to specify where the metadata log for clusters in KRaft mode is placed.
    log.dirs=/var/lib/kafka/controller
    Each node ID (node.id) must be unique across all the servers in a particular cluster.
  7. Update the config/kraft/broker.properties:

    # The node id associated with this instance's roles
    # !!! on the second node, set the node.id to be 1002.
    node.id=1001
    
    # The connect string for the controller quorum
    controller.quorum.voters=3001@node-1:9093,3002@node-2:9093
    
    # The address the socket server listens on.
    listeners=PLAINTEXT://:9092
    
    # Listener name, hostname and port the broker will advertise to clients.
    # !!! on the second node, set it to be `PLAINTEXT://node-2:9092`.
    advertised.listeners=PLAINTEXT://node-1:9092
    
    # The directory in which the log data is kept.
    log.dirs=/var/lib/kafka/data
    Each node ID (node.id) must be unique across all the servers in a particular cluster.
    The advertised.listeners address must be reachable by clients outside the cluster. Set it to a reachable hostname, DNS name, or external IP address. [7]
  8. Format Log Directories:

    $ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/controller.properties
    Formatting /var/lib/kafka/controller with metadata.version 3.6-IV2.
    $ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/broker.properties
    Formatting /var/lib/kafka/data with metadata.version 3.6-IV2.
  9. Start the Kafka Controller and Broker on each node:

    $ bin/kafka-server-start.sh -daemon config/kraft/controller.properties
    $ bin/kafka-server-start.sh -daemon config/kraft/broker.properties

    Note that authentication is disabled for JMX by default in Kafka; for production deployments, the security configs must be overridden by setting the KAFKA_JMX_OPTS environment variable for processes started using the CLI, or by setting the appropriate Java system properties. [5][6]

    For example, to expose JMX on port 9101 when starting the broker:

    $ JMX_PORT=9101 bin/kafka-server-start.sh -daemon config/kraft/broker.properties
  10. Use the kafka-metadata-quorum tool to query the metadata quorum status.

    The following example displays a summary of the metadata quorum:

    $ bin/kafka-metadata-quorum.sh --bootstrap-server node-1:9092 describe --status
    ClusterId:              MkU3OEVBNTcwNTJENDM2Qg
    LeaderId:               3002
    LeaderEpoch:            83
    HighWatermark:          779
    MaxFollowerLag:         0
    MaxFollowerLagTimeMs:   408
    CurrentVoters:          [3001,3002]
    CurrentObservers:       [1001,1002]
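  11. Optional: as a further sanity check, create a replicated topic against the two brokers and confirm that both of them appear in the replica and ISR lists. The topic name cluster-check is just an example:

    $ bin/kafka-topics.sh --create --topic cluster-check --partitions 2 --replication-factor 2 --bootstrap-server node-1:9092,node-2:9092
    $ bin/kafka-topics.sh --describe --topic cluster-check --bootstrap-server node-1:9092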

2.2.1. Set up Schema Registry

Schema Registry provides a centralized repository for managing and validating schemas for topic message data, and for serialization and deserialization of the data over the network. [8] [9]

Schema Registry is not part of Apache Kafka, but there are several open-source options to choose from. This example uses the Confluent Schema Registry. [10]

Confluent Schema Registry for storing and retrieving schemas

Schema Registry lives outside of and separately from your Kafka brokers. Your producers and consumers still talk to Kafka to publish and read data (messages) to topics. Concurrently, they can also talk to Schema Registry to send and retrieve schemas that describe the data models for the messages. [14]

Schema Registry is a distributed storage layer for schemas which uses Kafka as its underlying storage mechanism. Some key design decisions:

  • Assigns a globally unique ID to each registered schema. Allocated IDs are guaranteed to be monotonically increasing and unique, but not necessarily consecutive.

  • Kafka provides the durable backend, and functions as a write-ahead changelog for the state of Schema Registry and the schemas it contains.

  • Schema Registry is designed to be distributed, with single-primary architecture, and ZooKeeper/Kafka coordinates primary election (based on the configuration).

2.2.1.1. Installation
  1. Download Confluent Platform (Confluent Community components only) using curl:

    $ curl -O https://packages.confluent.io/archive/7.5/confluent-community-7.5.3.tar.gz
  2. Extract the contents of the archive to /home/kafka/confluent:

    $ mkdir /home/kafka/confluent
    $ tar xf confluent-community-7.5.3.tar.gz -C /home/kafka/confluent/ --strip-components=1
    $ cd /home/kafka/confluent
    $ cp -a etc/ etc.org
  3. Navigate to the Schema Registry properties file (etc/schema-registry/schema-registry.properties) and specify or update the following properties:

    # The address the HTTP server listens on, e.g. listeners=http://0.0.0.0:8081
    listeners=http://0.0.0.0:8081
    
    # The advertised host name. Make sure to set this if running Schema Registry with multiple nodes.
    host.name=node-1
    
    # List of Kafka brokers to connect to, e.g. PLAINTEXT://hostname:9092,SSL://hostname2:9092
    kafkastore.bootstrap.servers=PLAINTEXT://node-1:9092,PLAINTEXT://node-2:9092

Schema Registry on Confluent Platform can be deployed using a single primary source, with either Kafka or ZooKeeper leader election. You can also set up multiple Schema Registry servers for high availability deployments, where you switch to a secondary Schema Registry cluster if the primary goes down, and for data migration, one time or as a continuous feed. [13]

  1. Start Schema Registry as a daemon:

    $ bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties
  2. View the runtime logs of Schema Registry:

    $ tail -f logs/schema-registry.log
    [2024-01-13 01:58:05,916] INFO DefaultSessionIdManager workerName=node0 (org.eclipse.jetty.server.session)
    [2024-01-13 01:58:05,916] INFO No SessionScavenger set, using defaults (org.eclipse.jetty.server.session)
    [2024-01-13 01:58:05,918] INFO node0 Scavenging every 600000ms (org.eclipse.jetty.server.session)
    [2024-01-13 01:58:06,798] INFO HV000001: Hibernate Validator 6.1.7.Final (org.hibernate.validator.internal.util.Version)
    [2024-01-13 01:58:07,291] INFO Started o.e.j.s.ServletContextHandler@53a84ff4{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler)
    [2024-01-13 01:58:07,319] INFO Started o.e.j.s.ServletContextHandler@5807efad{/ws,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler)
    [2024-01-13 01:58:07,349] INFO Started NetworkTrafficServerConnector@65a15628{HTTP/1.1, (http/1.1, h2c)}{0.0.0.0:8081} (org.eclipse.jetty.server.AbstractConnector)
    [2024-01-13 01:58:07,354] INFO Started @9485ms (org.eclipse.jetty.server.Server)
    [2024-01-13 01:58:07,355] INFO Schema Registry version: 7.5.3 commitId: 03b675da443c5687684ecae6736d873560f7c441 (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain)
    [2024-01-13 01:58:07,356] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain)
  3. Show the _schemas topic information (run from the Kafka installation directory, /home/kafka):

$ bin/kafka-topics.sh --describe --topic _schemas --bootstrap-server node-1:9092
Topic: _schemas	TopicId: 9A_-36hMRYuTfUyhQwMm6Q	PartitionCount: 1	ReplicationFactor: 2	Configs: cleanup.policy=compact,segment.bytes=1073741824
	Topic: _schemas	Partition: 0	Leader: 1001	Replicas: 1001,1002	Isr: 1001,1002
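  4. Optional: exercise the Schema Registry REST API with curl to confirm that it is serving requests. The subject name quickstart-value is only an example; the response to the POST contains the globally unique ID assigned to the schema, and the last command assumes that ID is 1, as on a fresh installation:

    $ curl http://node-1:8081/subjects
    $ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\": \"string\"}"}' http://node-1:8081/subjects/quickstart-value/versions
    $ curl http://node-1:8081/schemas/ids/1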

2.2.2. Set up UI for Apache Kafka

UI for Apache Kafka is a free, open-source web UI to monitor and manage Apache Kafka clusters. [15]

$ docker run -it -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true provectuslabs/kafka-ui
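
The UI can also be configured non-interactively through environment variables, the same ones used in the Docker section below. A sketch that points the UI at the two-node cluster from section 2.2 (the cluster name kraft-cluster is just an example):

$ docker run -it -p 8080:8080 -e KAFKA_CLUSTERS_0_NAME=kraft-cluster -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=node-1:9092,node-2:9092 --add-host node-1:192.168.46.131 --add-host node-2:192.168.46.132 provectuslabs/kafka-ui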

3. Install using Docker

Make sure the nodes in the cluster can reach each other.
You can use hostnames, DNS names, or external IP addresses to connect the nodes.
Run ip a s to show the addresses assigned to all network interfaces.
The following steps are demonstrated with the following two nodes (/etc/hosts):
192.168.56.131 node-1
192.168.56.132 node-2
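
If the names are not resolvable through DNS, add them to /etc/hosts on every node and then verify reachability, for example:

$ getent hosts node-1 node-2
$ ping -c 1 node-2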
  1. Optional: Install Docker Engine

    See https://docs.docker.com/engine/install/ to install Docker Engine.

    You might need to configure the Docker daemon to use a different data directory (by default: /var/lib/docker on Linux) and the log driver options.
    1. Create the configuration file at /etc/docker/daemon.json with the following content:

      {
        "data-root": "/mnt/docker-data",
        "log-opts": {
          "max-file": "5",
          "max-size": "10m"
         }
      }
    2. Restart Docker:

      sudo systemctl restart docker
    3. Show Docker version:

      $ sudo docker --version
      Docker version 24.0.7, build afdd53b
  2. Optional: Stop and disable the firewalld.service.

    • View the current status:

      sudo firewall-cmd --state
    • Stop the FirewallD service:

      sudo systemctl stop firewalld.service
    • List the rules:

      $ sudo iptables -L
      Chain INPUT (policy ACCEPT)
      target     prot opt source               destination
      
      Chain FORWARD (policy ACCEPT)
      target     prot opt source               destination
      
      Chain OUTPUT (policy ACCEPT)
      target     prot opt source               destination
    • Disable the FirewallD service

      sudo systemctl disable firewalld.service
  3. Optional: Generate a Cluster UUID:

    $ KAFKA_CLUSTER_ID="$(docker run --rm confluentinc/cp-kafka:7.5.3 kafka-storage random-uuid)"
    $ echo $KAFKA_CLUSTER_ID
    MkU3OEVBNTcwNTJENDM2Qg
  4. Copy the docker/ directory to all the nodes in the Kafka cluster.

  5. Start the controllers:

    On node-1:

    • Update the compose.override.yml in docker/controller/compose.override.yml:

      version: "2.4"
      services:
        controller:
          environment:
            KAFKA_NODE_ID: 3001
            KAFKA_CONTROLLER_QUORUM_VOTERS: '3001@node-1:9093,3002@node-2:9093'
            CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qg'
          extra_hosts:
            - "node-1:192.168.56.131"
            - "node-2:192.168.56.132"
      Update the CLUSTER_ID with the KAFKA_CLUSTER_ID that was generated in the step above.
      Each node ID (KAFKA_NODE_ID) must be unique across all the nodes in a particular cluster.
    • Start the KRaft controller:

      cd docker/controller
      docker compose up -d

      On node-2:

    • Repeat the above steps and update the KAFKA_NODE_ID with 3002.

  6. Start the brokers:

    On node-1:

    • Update the compose.override.yml in docker/broker/compose.override.yml:

      version: "2.4"
      services:
        broker:
          environment:
            KAFKA_NODE_ID: 1001
            KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://node-1:9092'
            KAFKA_CONTROLLER_QUORUM_VOTERS: '3001@node-1:9093,3002@node-2:9093'
            CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qg'
          extra_hosts:
            - "node-1:192.168.56.131"
            - "node-2:192.168.56.132"
      Update the CLUSTER_ID with the KAFKA_CLUSTER_ID that was generated in step 3.
      Each node ID (KAFKA_NODE_ID) must be unique across all the nodes in a particular cluster.
      The KAFKA_ADVERTISED_LISTENERS address must be reachable by clients outside the cluster. Set it to a reachable hostname, DNS name, or external IP address.
    • Start the broker:

      cd docker/broker
      docker compose up -d
    • Use kcat to display the current state of the Kafka cluster and its topics, partitions, replicas and in-sync replicas (ISR).

      $ docker run --rm --add-host node-1:192.168.56.131 confluentinc/cp-kcat:7.5.3 -b node-1:9092 -L
      Metadata for all topics (from broker -1: node-1:9092/bootstrap):
       1 brokers:
        broker 1001 at node-1:9092 (controller)
       0 topics:
    • Use the kafka-metadata-quorum tool to view the metadata quorum status.

      $ docker run --rm --add-host node-1:192.168.56.131 confluentinc/cp-kafka:7.5.3 kafka-metadata-quorum --bootstrap-server node-1:9092 describe --status
      ClusterId:              MkU3OEVBNTcwNTJENDM2Qg
      LeaderId:               3002
      LeaderEpoch:            28
      HighWatermark:          47816
      MaxFollowerLag:         0
      MaxFollowerLagTimeMs:   32
      CurrentVoters:          [3001,3002]
      CurrentObservers:       [1001]

      On node-2:

    • Repeat the above steps and update the KAFKA_NODE_ID with 1002, and KAFKA_ADVERTISED_LISTENERS with 'PLAINTEXT://node-2:9092'.

    • Use kcat to display the current state of the Kafka cluster and its topics, partitions, replicas and in-sync replicas (ISR).

      $ docker run --rm --add-host node-2:192.168.56.132 confluentinc/cp-kcat:7.5.3 -b node-2:9092 -L
      Metadata for all topics (from broker 1002: node-2:9092/1002):
       2 brokers:
        broker 1001 at node-1:9092
        broker 1002 at node-2:9092 (controller)
       0 topics:
    • Use the kafka-metadata-quorum tool to view the metadata quorum status.

      $ docker run --rm --add-host node-2:192.168.56.132 confluentinc/cp-kafka:7.5.3 kafka-metadata-quorum --bootstrap-server node-2:9092 describe --status
      ClusterId:              MkU3OEVBNTcwNTJENDM2Qg
      LeaderId:               3002
      LeaderEpoch:            28
      HighWatermark:          47816
      MaxFollowerLag:         0
      MaxFollowerLagTimeMs:   32
      CurrentVoters:          [3001,3002]
      CurrentObservers:       [1001,1002]
  7. Start the Schema Registry:

    On node-1:

    • Update the compose.override.yml in docker/schema-registry/compose.override.yml:

      version: "2.4"
      services:
        schema-registry:
          environment:
            SCHEMA_REGISTRY_HOST_NAME: node-1
            SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: node-1:9092,node-2:9092
            SCHEMA_REGISTRY_DEBUG: true
          extra_hosts:
            - "node-1:192.168.56.131"
            - "node-2:192.168.56.132"
    • Start the Schema Registry:

      cd docker/schema-registry
      docker compose up -d

      On node-2:

    • Repeat the above steps and set SCHEMA_REGISTRY_HOST_NAME to node-2 to run a second Schema Registry instance if you need a highly available service.

  8. Start the UI for Apache Kafka:

    On node-1:

    • Update the compose.override.yml in docker/kafka-ui/compose.override.yml:

      version: "2.4"
      services:
        kafka-ui:
          environment:
            KAFKA_CLUSTERS_0_NAME: iot
            KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: node-1:9092,node-2:9092
          extra_hosts:
            - "node-1:192.168.56.131"
            - "node-2:192.168.56.132"
    • Start the kafka-ui:

      cd docker/kafka-ui
      docker compose up -d
    • Go to http://node-1:8080 with your browser to view the cluster status.

      On node-2:

    • Repeat the above steps to run a second kafka-ui instance if you need a highly available service.

    • Go to http://node-2:8080 with your browser to view the cluster status.
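  9. Optional: verify end-to-end produce and consume through the Docker-based cluster with kcat. The topic name docker-check is just an example, and the commands assume topic auto-creation is enabled (the Kafka default):

    $ echo "hello from docker" | docker run --rm -i --add-host node-1:192.168.56.131 --add-host node-2:192.168.56.132 confluentinc/cp-kcat:7.5.3 -b node-1:9092,node-2:9092 -t docker-check -P
    $ docker run --rm --add-host node-1:192.168.56.131 --add-host node-2:192.168.56.132 confluentinc/cp-kcat:7.5.3 -b node-1:9092,node-2:9092 -t docker-check -C -o beginning -e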