Kafka is a distributed streaming platform used to build real-time data pipelines and streaming applications. It is a good replacement for a traditional message broker and a common choice for applications with large-scale message processing. It is used by very large applications such as Twitter, LinkedIn and Uber. ZooKeeper is a centralized service that maintains configuration information and naming, and provides distributed synchronization and group services. The intention of this blog is to show how to set up a multi-node Kafka-ZooKeeper cluster for message streaming.
To get high availability in a production environment, the Apache Kafka cluster must consist of multiple servers.
A ZooKeeper cluster stays available only while a majority of its nodes are up, so it is recommended to run ZooKeeper on an odd number of servers. For example, a 3-node cluster tolerates one failed node, and adding a fourth node does not raise that number.
In this blog, I’ll set up a Kafka-ZooKeeper cluster with 3 nodes.
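The majority rule above can be checked with a little arithmetic. The sketch below is only an illustration; the function names quorum_size and tolerated_failures are my own and are not part of Kafka or ZooKeeper.

```shell
#!/bin/sh
# Smallest number of nodes that forms a majority of an n-node cluster.
quorum_size() {
    echo $(( $1 / 2 + 1 ))
}

# Number of nodes that can fail while a majority is still up.
tolerated_failures() {
    echo $(( $1 - ($1 / 2 + 1) ))
}

# Compare a few cluster sizes.
for n in 3 4 5; do
    echo "nodes=$n quorum=$(quorum_size "$n") tolerated_failures=$(tolerated_failures "$n")"
done
```

With 3 and 4 nodes the number of tolerated failures is the same (one), which is why an even-sized cluster brings no extra fault tolerance.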
What is Kafka?
Kafka is used for building real-time data pipelines and streaming apps.
What is Zookeeper?
ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications.
Prerequisites:
- Java (minimum version 1.7) on all instances.
- Kafka and ZooKeeper binary files.
Install Java on all instances:
On Ubuntu, add the PPA using the command below:
$ sudo add-apt-repository ppa:webupd8team/java
Update the system package index and run the Java installer script:
$ sudo apt-get update
$ sudo apt-get install oracle-java8-installer
On CentOS or RHEL, install OpenJDK with yum instead:
$ sudo yum install java-1.8.0-openjdk
Check the installed version:
$ java -version
Download the ZooKeeper binaries on all your instances and extract them:
$ wget http://mirror.cc.columbia.edu/pub/software/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
$ tar -xvf zookeeper-3.4.10.tar.gz
$ ln -sfn "$PWD/zookeeper-3.4.10" /opt/zookeeper  # absolute target, so the link resolves from any directory
$ rm zookeeper-3.4.10.tar.gz
Download the Kafka binaries on all your instances and extract them:
$ wget http://mirror.cc.columbia.edu/pub/software/apache/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz
$ tar -xvf kafka_2.11-0.11.0.0.tgz
$ ln -sfn "$PWD/kafka_2.11-0.11.0.0" /opt/kafka  # absolute target, so the link resolves from any directory
$ rm kafka_2.11-0.11.0.0.tgz
Update zookeeper properties:
Create the zookeeper.properties file in /opt/zookeeper/conf on all instances (this is the path used when starting the service) and update it with the following settings:
- dataDir is the directory where ZooKeeper saves its data and logs.
- clientPort, as the name suggests, is the port that clients use to connect to the ZooKeeper service.
- x in server.x denotes the ID of the node. Each server.x row must have a unique ID. Each server gets its ID from a file named myid, one per server, which resides in that server’s data directory, as specified by the dataDir parameter.
Create myid file:
$ mkdir -p /tmp/zookeeper
$ echo '1' > /tmp/zookeeper/myid  # use the server ID of the respective instance: 1 for server.1, 2 for server.2, 3 for server.3
- The ports :2888:3888 at the end of each server.x line should not be changed. Follower nodes use the first port (2888) to connect to the leader node, and the second port (3888) is used for leader election.
- x.x.x.x is the IP address of each node; it is better to use the private IP here. If you use public IPs, replace the current node's own IP with 0.0.0.0 on each node.
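Putting the settings above together, a zookeeper.properties file could look like the sketch below. This is not the exact file from my setup. The dataDir and clientPort values match the ones used elsewhere in this blog, while the tickTime, initLimit and syncLimit values are assumptions taken from ZooKeeper's sample configuration. CONF_DIR is a hypothetical variable that defaults to a temporary directory, so running the snippet does not touch a real installation.

```shell
#!/bin/sh
# Hypothetical output directory; set CONF_DIR=/opt/zookeeper/conf on a real node.
CONF_DIR="${CONF_DIR:-$(mktemp -d)}"
CONF_FILE="$CONF_DIR/zookeeper.properties"

cat > "$CONF_FILE" <<'EOF'
# Timing settings (assumed values, as in ZooKeeper's sample config)
tickTime=2000
initLimit=10
syncLimit=5
# Directory holding ZooKeeper data and the myid file
dataDir=/tmp/zookeeper
# Port that clients connect to
clientPort=2181
# One line per node: server.<id>=<ip>:<follower port>:<election port>
server.1=x.x.x.x:2888:3888
server.2=x.x.x.x:2888:3888
server.3=x.x.x.x:2888:3888
EOF

echo "Wrote $CONF_FILE"
```

Replace each x.x.x.x with the IP address of the node that has the matching ID in its myid file.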
Update Kafka server properties:
Update the Kafka server.properties file on all instances with the content below. The file is located at /opt/kafka/config/server.properties.
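# Unique ID of this broker; use a different integer on each node
broker.id=1
# Current node public IP or hostname, so clients outside the instance can connect
advertised.host.name=x.x.x.x
# Enter the zookeeper quorum details as below
zookeeper.connect=x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181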
- broker.id is the unique and permanent ID of each node in the cluster; it must be a different integer on every broker.
- advertised.host.name is the hostname published to ZooKeeper for clients to use.
- zookeeper.connect specifies the ZooKeeper connection string as a comma-separated list of host:port pairs, where host (better to use the private IP of each node) and port are the host and port of a ZooKeeper server.
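Since only broker.id and advertised.host.name differ between nodes, the three properties can be rendered per broker as in the sketch below. render_server_properties is a hypothetical helper of mine, and the 10.0.0.x addresses are placeholder assumptions. A real server.properties contains many more settings, so treat the output as lines to merge into the existing file rather than a full replacement.

```shell
#!/bin/sh
# Print the three cluster-related properties for one broker.
# $1 = broker id, $2 = this node's advertised IP or hostname,
# $3 = ZooKeeper connection string (host:port,host:port,...)
render_server_properties() {
    cat <<EOF
broker.id=$1
advertised.host.name=$2
zookeeper.connect=$3
EOF
}

# Placeholder addresses (assumptions); replace with the real node IPs.
ZK_CONNECT="10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181"

# Properties for the first broker.
render_server_properties 1 10.0.0.1 "$ZK_CONNECT"
```

Calling the function with 2 and 3 (and the matching addresses) gives the values for the other two nodes.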
Start the services:
Before starting the Kafka service, start the ZooKeeper service on all instances using the command below:
$ /opt/zookeeper/bin/zkServer.sh start /opt/zookeeper/conf/zookeeper.properties
Check the ZooKeeper status using the command below. One of the nodes will act as the leader and the remaining nodes will be followers.
$ /opt/zookeeper/bin/zkServer.sh status /opt/zookeeper/conf/zookeeper.properties
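The status command prints a "Mode:" line that tells whether the node is a leader or a follower. If you want to pick that value out in a script, something like the sketch below could work. zk_mode is a hypothetical helper, and the sample input is written by hand for illustration, not captured from a real cluster.

```shell
#!/bin/sh
# Read "zkServer.sh status" output on stdin and print the node's mode
# (for example "leader" or "follower"); print "unknown" if no Mode line is found.
zk_mode() {
    mode="$(sed -n 's/^Mode: *//p' | head -n 1)"
    echo "${mode:-unknown}"
}

# Hand-written sample input for illustration.
printf 'Using config: /opt/zookeeper/conf/zookeeper.properties\nMode: follower\n' | zk_mode
```

On a real node the input would come from the status command itself, for example: /opt/zookeeper/bin/zkServer.sh status /opt/zookeeper/conf/zookeeper.properties 2>&1 | zk_mode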
Start the Kafka service on all instances using the command below:
$ /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
Test the services from terminal:
Create a topic using the command below:
$ /opt/kafka/bin/kafka-topics.sh --create --zookeeper x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181 --replication-factor 1 --partitions 1 --topic sample_test
Get the list of topics using the command below:
$ /opt/kafka/bin/kafka-topics.sh --list --zookeeper x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181
Consume messages using the command below:
$ /opt/kafka/bin/kafka-console-consumer.sh --zookeeper x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181 --topic sample_test --from-beginning
The consumer keeps listening. At this point there are no messages in the topic.
Note: use the --zookeeper option with any one, or a list, of the clustered nodes' IP addresses or domain names when creating topics, listing topics and consuming messages.
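The node list passed to --zookeeper is just the node addresses joined with commas, each followed by the port. A small helper like the one below can build it once and reuse it in every command. join_hosts is a hypothetical function name and the 10.0.0.x addresses are placeholder assumptions. The same helper also works for the port 9092 list that the producer command takes with --broker-list.

```shell
#!/bin/sh
# Join node addresses into "host:port,host:port,..." using the given port.
# $1 = port, remaining arguments = node IPs or hostnames.
join_hosts() {
    port="$1"
    shift
    result=""
    for host in "$@"; do
        # Add a comma separator only when result is already non-empty.
        result="${result:+$result,}$host:$port"
    done
    echo "$result"
}

# Placeholder node addresses (assumptions); replace with the real node IPs.
ZK_LIST="$(join_hosts 2181 10.0.0.1 10.0.0.2 10.0.0.3)"
BROKER_LIST="$(join_hosts 9092 10.0.0.1 10.0.0.2 10.0.0.3)"

echo "--zookeeper $ZK_LIST"
echo "--broker-list $BROKER_LIST"
```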
Produce messages using the command below (open a new terminal and try it):
$ /opt/kafka/bin/kafka-console-producer.sh --broker-list x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092 --topic sample_test
The producer waits for input. Type a message, hit Enter, and then check the consumer terminal opened earlier.
It’s possible to run these operations from other systems (outside the cluster nodes) if the advertised.host.name property is set to the public IP in the Kafka server properties.