Kafka and ZooKeeper Multi-Node Cluster Setup

Kafka is a distributed streaming platform used to build real-time data pipelines and streaming apps. It is a good replacement for a traditional message broker and a popular choice for applications that process messages at large scale; very large applications such as Twitter, LinkedIn and Uber use it. ZooKeeper is a centralized service that maintains configuration information and naming and provides distributed synchronization and group services. The intention of this blog is to show how to set up a Kafka-ZooKeeper multi-node cluster for message streaming.

If you want to implement high availability in a production environment, the Apache Kafka server cluster must consist of multiple servers.

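A ZooKeeper ensemble stays available only while a majority (a quorum) of its nodes is up. That is why it is recommended to run ZooKeeper on an odd number of servers: a 4-node ensemble needs 3 nodes for a majority, so it survives only one failure, exactly like a 3-node ensemble.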
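The majority rule can be made concrete with a little shell arithmetic. This is only an illustration, assuming bash; `quorum_size` and `tolerated_failures` are hypothetical helper names, not ZooKeeper tools.

```shell
#!/usr/bin/env bash
# Majority (quorum) an ensemble of n servers needs: floor(n/2) + 1.
quorum_size() {
  local n=$1
  echo $(( n / 2 + 1 ))
}

# How many servers can fail while a majority is still up.
tolerated_failures() {
  local n=$1
  echo $(( n - (n / 2 + 1) ))
}

for n in 3 4 5 6; do
  echo "servers=$n quorum=$(quorum_size "$n") tolerated_failures=$(tolerated_failures "$n")"
done
```

For 3, 4, 5 and 6 servers it reports 1, 1, 2 and 2 tolerated failures, so a fourth server adds no fault tolerance over three.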

In this blog, I'll set up a Kafka and ZooKeeper cluster with 3 nodes.

[Figure: Kafka cluster diagram]

What is Kafka?

Kafka is used for building real-time data pipelines and streaming apps.

What is Zookeeper?

ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications.

Prerequisites:

  1. Install Java (minimum 1.7).
  2. Download the Kafka and ZooKeeper binaries.

Install Java:

Install Java on all instances.

On Ubuntu, add the PPA using the command below:

$ sudo add-apt-repository ppa:webupd8team/java

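Run the following commands to update the system package index and install the Java installer script:

$ sudo apt-get update

$ sudo apt-get install oracle-java8-installer

Note: the webupd8team Oracle Java PPA has since been discontinued. If it is unavailable, install OpenJDK 8 from the Ubuntu repositories instead:

$ sudo apt-get install openjdk-8-jdk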

On CentOS:

$ sudo yum install java-1.8.0-openjdk

Check the installed version:

$ java -version

Install Zookeeper:

Download the ZooKeeper binaries on all of your instances and extract them:

$ wget http://mirror.cc.columbia.edu/pub/software/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz

$ tar -xvf zookeeper-3.4.10.tar.gz

$ sudo ln -sfn "$(pwd)/zookeeper-3.4.10" /opt/zookeeper #Absolute target, so the link resolves from /opt

$ rm zookeeper-3.4.10.tar.gz

Install Kafka:

Download the Kafka binaries on all of your instances and extract them:

$ wget http://mirror.cc.columbia.edu/pub/software/apache/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz

$ tar -xvf kafka_2.11-0.11.0.0.tgz

$ sudo ln -sfn "$(pwd)/kafka_2.11-0.11.0.0" /opt/kafka #Absolute target, so the link resolves from /opt

$ rm kafka_2.11-0.11.0.0.tgz


Update zookeeper properties:

Create the zookeeper.properties file on all instances using the command below:

$ touch /opt/zookeeper/conf/zookeeper.properties

Then update zookeeper.properties on all instances with the content below:

dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=200
tickTime=2000
server.1=x.x.x.x:2888:3888
server.2=x.x.x.x:2888:3888
server.3=x.x.x.x:2888:3888
initLimit=20
syncLimit=10

  • dataDir is the directory where ZooKeeper saves its data: snapshots and, unless dataLogDir is set, transaction logs. /tmp is used here only as an example; avoid it in production, because /tmp is often cleared on reboot, which would also delete the myid file described below.
  • The clientPort property, as the name suggests, is the port clients use to connect to the ZooKeeper service.
  • The x in server.x is the id of the node, and each server.x line must have a unique id. Each server learns its own id from a file named myid, one per server, stored in that server's data directory (the dataDir set in the configuration file).
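Because zookeeper.properties is identical on every node, it can also be generated rather than typed by hand. A minimal sketch, assuming bash; `write_zk_properties` is a hypothetical helper that writes the same settings as above and numbers server.x in the order the IPs are given.

```shell
#!/usr/bin/env bash
# Hypothetical helper: write a zookeeper.properties file for an ensemble.
# Usage: write_zk_properties <output-file> <data-dir> <ip-of-server.1> <ip-of-server.2> ...
# server.N is numbered in argument order, so every node must pass the IPs in the same order.
write_zk_properties() {
  local out=$1 data_dir=$2
  shift 2
  {
    echo "dataDir=$data_dir"
    echo "clientPort=2181"
    echo "maxClientCnxns=200"
    echo "tickTime=2000"
    local id=1 ip
    for ip in "$@"; do
      # 2888: follower-to-leader connections, 3888: leader election
      echo "server.$id=$ip:2888:3888"
      id=$((id + 1))
    done
    echo "initLimit=20"
    echo "syncLimit=10"
  } > "$out"
}
```

For example, `write_zk_properties /opt/zookeeper/conf/zookeeper.properties /tmp/zookeeper 10.0.0.1 10.0.0.2 10.0.0.3`, where the three addresses are placeholders for your nodes' private IPs. Pass the IPs in the same order on every node, otherwise the server.x ids will not agree.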

Create the myid file:

$ mkdir -p /tmp/zookeeper

$ echo '1' > /tmp/zookeeper/myid #Use this instance's own server ID, i.e. 1, 2 or 3 to match server.1, server.2 or server.3

  • The two ports at the end of each server.x entry, 2888 and 3888, should be kept at these defaults. Followers use the first port (2888) to connect to the leader, and the second port (3888) is used for leader election. Both ports must be reachable between the nodes.
  • Replace x.x.x.x with each node's IP address; private IPs are the better choice here. If you use public IPs instead, replace the current node's own entry with 0.0.0.0 on each node, since a node may not be able to bind to its own public IP (common on cloud hosts).
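Each node's myid must match the position of that node in the server.x list. One way to avoid writing the wrong id is to look the node's IP up in the same ordered list. A minimal sketch, assuming bash; `write_myid` is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Hypothetical helper: write this node's myid by finding its IP in the ensemble list.
# Usage: write_myid <data-dir> <this-node-ip> <ip-of-server.1> <ip-of-server.2> ...
# The IP list must be in the same order as the server.N lines in zookeeper.properties.
write_myid() {
  local data_dir=$1 self=$2
  shift 2
  local id=1 ip
  for ip in "$@"; do
    if [ "$ip" = "$self" ]; then
      mkdir -p "$data_dir"
      # '>' (not '>>') so re-running never leaves two ids in the file
      echo "$id" > "$data_dir/myid"
      return 0
    fi
    id=$((id + 1))
  done
  echo "write_myid: $self is not in the server list" >&2
  return 1
}
```

For example, `write_myid /tmp/zookeeper 10.0.0.2 10.0.0.1 10.0.0.2 10.0.0.3` writes 2 to /tmp/zookeeper/myid (placeholder IPs). On many Linux systems `hostname -I` prints the node's addresses, which can help find the second argument; check that it is the same private IP you used in zookeeper.properties.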

Update Kafka server properties:

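Update the Kafka server.properties file on all instances with the content below. The file is located at /opt/kafka/config/server.properties. Set broker.id to a different value on each instance (for example 1, 2 and 3), and keep comments on their own lines: in a properties file, a trailing # comment becomes part of the value.

broker.id=1

# Lets clients outside this instance connect, e.g. to consume messages
# Current node's public IP or hostname
advertised.host.name=x.x.x.x

# Enter the ZooKeeper quorum details as below
zookeeper.connect=x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181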

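  • The broker.id property is the unique and permanent id of each node in the cluster, so it must differ on every instance.
  • advertised.host.name is the hostname published to ZooKeeper for clients to use. In Kafka 0.11 it is deprecated in favor of advertised.listeners, but it is still used when listeners and advertised.listeners are not set.
  • zookeeper.connect specifies the ZooKeeper connection string as a comma-separated list of host:port pairs, one per ZooKeeper server (the private IP of each node is the better choice for host).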
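The per-node lines of server.properties differ only in broker.id and advertised.host.name, while zookeeper.connect is the same ZooKeeper list everywhere. A minimal sketch, assuming bash, that prints these three lines; `zk_connect_string` and `kafka_overrides` are hypothetical helpers, and port 2181 is the clientPort configured earlier.

```shell
#!/usr/bin/env bash
# Hypothetical helper: join ZooKeeper node IPs into a comma-separated host:port string.
zk_connect_string() {
  local port=2181 joined="" ip
  for ip in "$@"; do
    # ${joined:+...} adds the comma only after the first entry
    joined="${joined:+$joined,}$ip:$port"
  done
  echo "$joined"
}

# Hypothetical helper: print the per-node settings for server.properties.
# Usage: kafka_overrides <broker-id> <advertised-host> <zk-ip-1> <zk-ip-2> ...
kafka_overrides() {
  local broker_id=$1 advertised=$2
  shift 2
  # Only key=value lines are printed; in a .properties file a trailing '# ...'
  # would become part of the value.
  echo "broker.id=$broker_id"
  echo "advertised.host.name=$advertised"
  echo "zookeeper.connect=$(zk_connect_string "$@")"
}
```

For example, `kafka_overrides 2 x.x.x.x 10.0.0.1 10.0.0.2 10.0.0.3` prints the lines for the second broker (placeholder IPs); use them to set broker.id, advertised.host.name and zookeeper.connect in /opt/kafka/config/server.properties. The same `zk_connect_string` output can be reused as the --zookeeper argument in the test commands.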

Start the services:

Before starting Kafka, start the ZooKeeper service on all instances using the command below:

$ /opt/zookeeper/bin/zkServer.sh start /opt/zookeeper/conf/zookeeper.properties


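Check the ZooKeeper status using the command below, passing the same config file as for start (without it, zkServer.sh looks for the default conf/zoo.cfg). One of the nodes will report Mode: leader and the others Mode: follower.

$ /opt/zookeeper/bin/zkServer.sh status /opt/zookeeper/conf/zookeeper.properties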
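With three nodes, checking each status by eye is easy to get wrong. A minimal sketch, assuming bash, that pulls the role out of the Mode: line of the status output and checks for exactly one leader; `zk_mode` and `check_single_leader` are hypothetical helpers. A node reporting Mode: standalone counts as a failure, because it usually means the server.x lines were not picked up.

```shell
#!/usr/bin/env bash
# Extract the role ("leader", "follower" or "standalone") from zkServer.sh status output on stdin.
zk_mode() {
  sed -n 's/^Mode: *//p'
}

# Given one role per argument (one per node), succeed only if exactly one node
# is the leader and every other node is a follower.
check_single_leader() {
  local leaders=0 followers=0 mode
  for mode in "$@"; do
    case $mode in
      leader) leaders=$((leaders + 1)) ;;
      follower) followers=$((followers + 1)) ;;
    esac
  done
  [ "$leaders" -eq 1 ] && [ $((leaders + followers)) -eq "$#" ]
}
```

For example, `ssh node1 /opt/zookeeper/bin/zkServer.sh status /opt/zookeeper/conf/zookeeper.properties 2>&1 | zk_mode` prints that node's role (node1 is a hypothetical host reachable over SSH; 2>&1 is there in case some status lines go to stderr). Collect the three roles and pass them to `check_single_leader`.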

Then start the Kafka service on all instances using the command below:

$ /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
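Once the brokers are running, one way to confirm that all three joined the cluster is to list the broker ids registered in ZooKeeper: each running broker creates an entry under the /brokers/ids path, and Kafka ships a bin/zookeeper-shell.sh client. A minimal sketch, assuming bash and assuming the ids are printed as a bracketed list such as [1, 2, 3]; `registered_broker_count` is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Count broker ids in the output of: zookeeper-shell.sh <zk-host>:2181 ls /brokers/ids
# Assumes the ids appear on a line of their own as a bracketed list, e.g. "[1, 2, 3]".
registered_broker_count() {
  local list
  # Keep the last line that looks like a bracketed list; ignore connection/log lines.
  list=$(grep -E '^\[.*\]$' | tail -n 1)
  list=${list#\[}
  list=${list%\]}
  if [ -z "$list" ]; then
    echo 0
    return
  fi
  # Split on commas and count the entries.
  echo "$list" | tr ',' '\n' | grep -c .
}
```

For example, `/opt/kafka/bin/zookeeper-shell.sh x.x.x.x:2181 ls /brokers/ids | registered_broker_count` should print 3 when all brokers are up. A lower number means a broker failed to start, for example because another node already registered the same broker.id.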


Test the services from the terminal:

Create a topic using the command below:

$ /opt/kafka/bin/kafka-topics.sh --create --zookeeper x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181 --replication-factor 1 --partitions 1 --topic sample_test

Get the list of topics using the command below:

$ /opt/kafka/bin/kafka-topics.sh --list --zookeeper x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181

Consume messages using the command below:

$ /opt/kafka/bin/kafka-console-consumer.sh --zookeeper x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181 --topic sample_test --from-beginning

The consumer keeps listening; at this point there are no messages in the topic yet.

Note: when creating topics, listing topics and consuming messages, pass --zookeeper either one cluster node or a comma-separated list of the nodes' IP addresses or domain names.

Produce messages using the command below (open a new terminal to try it):

$ /opt/kafka/bin/kafka-console-producer.sh --broker-list x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092 --topic sample_test

The producer also keeps listening: type a message, press Enter, and check that it appears in the consumer terminal.
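The manual producer/consumer check can also be scripted so it runs without two terminals. A minimal sketch, assuming bash and the 0.11 console tools used above; `kafka_smoke_test` and the KAFKA_HOME variable are hypothetical, and --timeout-ms is the console consumer's option for exiting after a period with no new messages instead of listening forever.

```shell
#!/usr/bin/env bash
# Hypothetical non-interactive version of the manual producer/consumer test.
# Usage: kafka_smoke_test <zookeeper-list> <broker-list> <topic>
# KAFKA_HOME (hypothetical) defaults to /opt/kafka, the path used throughout this post.
kafka_smoke_test() {
  local zk=$1 brokers=$2 topic=$3
  local bin="${KAFKA_HOME:-/opt/kafka}/bin"
  local msg="smoke-test-$$-$RANDOM"

  # The console producer sends each line it reads from stdin as one message.
  echo "$msg" | "$bin/kafka-console-producer.sh" --broker-list "$brokers" --topic "$topic" || return 1

  # --timeout-ms makes the consumer exit after 10 s without new messages.
  local received
  received=$("$bin/kafka-console-consumer.sh" --zookeeper "$zk" --topic "$topic" \
    --from-beginning --timeout-ms 10000 2>/dev/null)

  # Succeed only if our exact message came back.
  grep -qxF -- "$msg" <<<"$received"
}
```

For example, `kafka_smoke_test x.x.x.x:2181 x.x.x.x:9092 sample_test && echo OK`, using the same ZooKeeper and broker lists as in the commands above.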

It is also possible to run these tests from other systems (outside the cluster nodes) if advertised.host.name is set to the node's public IP in the Kafka server properties.
