In the previous article, we talked about what Kafka is and its ecosystem. We also talked about how Kafka works and how to get started with it. In that article, we also discussed several important concepts like Message, Producer, Topic, Broker, and Consumer. These are the basic concepts of Kafka and will be needed for all our future sessions.
Moving on, in today’s article, we’re going to discuss the administration of Kafka. The goals of this session are configuration files, scalability, an introduction to performance, and topic administration. Like the previous lesson, we will discuss another set of essential keywords: Consumer group, Partition, Segment, Compaction, Retention, Deletion, ISR, and Offset.
As you may remember, we discussed the Consumer and the Producer in our previous session, where I mentioned that Producers and Consumers work together to achieve a common goal. To understand how the Consumer and Producer work and scale together, we should first understand the Consumer group.
A consumer group is composed of multiple consumers: a set of workers that cooperate toward the same goal, which is to consume data (read messages) from topics. Usually, your application handles the consumer groups, and the consumers are smart enough to coordinate with each other so that you can scale them. They read the messages from one or more partitions.
For example, let’s assume that we want to consume from the bank account topic, which has ten partitions. Suppose we create a consumer group with a set of consumers that will read from the topic. If we have multiple consumers inside the same group, they can all read from the same topic, but the partitions are divided among the consumers in that group.
How Partitions are tied with Producers and Consumers
As shown in the figure above, multiple producers (left side) can write into one partition or the same topic (Topic partitions). The consumer groups (right side) can read from the topic partitions. The topic partitions are divided among a consumer group.
This means that every partition is read by only one consumer in a group, although a single consumer can be assigned more than one partition.
You can see in the figure above that each arrow points straight to one partition. Two consumers from the same group reading from the same partition is not possible according to the architecture.
Hence, when choosing the number of partitions, developers have to keep in mind how far they want to scale their consumers.
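To make the division of partitions concrete, here is a minimal sketch in Python. It is not how Kafka's own assignors are implemented; it simply hands out partitions round-robin, and the consumer names are made up for illustration.

```python
def assign_partitions(partitions, consumers):
    """Divide partitions among consumers so each partition has exactly one owner.

    Simplified round-robin; real Kafka assignors are more elaborate.
    """
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


partitions = list(range(10))  # the ten partitions of the bank account topic

# Three consumers share the ten partitions between them.
print(assign_partitions(partitions, ["c1", "c2", "c3"]))

# Twelve consumers: two of them end up with nothing to read.
print(assign_partitions(partitions, [f"c{i}" for i in range(12)]))
```

The second call shows why the number of partitions caps how far a group can scale: once there are more consumers than partitions, the extra consumers sit idle.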
The most important concept to know is that whenever you write messages to a partition, they are stored in the order in which they arrive. If you write to multiple partitions, that ordering holds inside each partition, not across them.
Note that when there are multiple partitions, messages are ordered only within the same partition. Hence, when you read the topic as a whole, there is no guarantee that the messages are read in the order they were written.
How a message ends up in a partition
The producer is in charge of sending a message to the partition. The basic function of a producer is to grab the content, put it in a message, and send it to the topic partition. There are different ways to find the corresponding partition for a message, among them delegating to the Kafka libraries and using a “producer partitioner.” A producer partitioner maps each message to the corresponding topic partition, so the developer does not have to worry about where the message is sent as long as a producer partitioner is used.
Partitioners shipped with Kafka
The partitioners shipped with the Kafka libraries guarantee that the same kind of message (sharing the same non-empty key) is always mapped to the same partition. The function used to achieve this works as follows:
There is a key inside the message (for example, the key is 123). Once the message is handed to the producer, the partitioner hashes the key using the “murmur2” algorithm and takes the result modulo the number of partitions. In this example, the message with key 123 ends up in partition number 5.
Likewise, every message that shares the same key will end up in the same partition. Therefore, it is important to use a key whose values change over time, so that the messages spread across the partitions. For instance, you can use the timestamp as the key.
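The hash-and-modulo step can be sketched in Python as follows. The `murmur2` function below is modelled on the 32-bit murmur2 variant used by the Java client; treat the seed and constants as assumptions to verify against your client version. `partition_for` is a hypothetical helper name, not a Kafka API.

```python
def murmur2(data: bytes) -> int:
    """32-bit murmur2 hash of the key bytes, returned as an unsigned integer."""
    mask = 0xFFFFFFFF
    m, r = 0x5BD1E995, 24
    length = len(data)
    h = (0x9747B28C ^ length) & mask
    full = length - (length % 4)
    # Mix the key four bytes at a time (little-endian).
    for i in range(0, full, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k
    # Mix in the remaining one to three bytes, if any.
    tail = data[full:]
    if len(tail) == 3:
        h ^= tail[2] << 16
    if len(tail) >= 2:
        h ^= tail[1] << 8
    if len(tail) >= 1:
        h ^= tail[0]
        h = (h * m) & mask
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for(key: str, num_partitions: int) -> int:
    """Map a non-empty key to a partition: positive hash modulo partition count."""
    return (murmur2(key.encode("utf-8")) & 0x7FFFFFFF) % num_partitions


# The same key always lands in the same partition.
print(partition_for("123", 10), partition_for("123", 10))
```

Whatever the exact hash, the property that matters is the one shown in the last line: the mapping is deterministic, so equal keys always share a partition.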
Note that, over time, the messages will spread among the partitions. It is important to understand that partitions are logical parts of a topic, so we should expect the size of the topic as a whole to be divided by the number of partitions. In other words, if our topic is 100 GB with ten partitions, over time each partition will be around 10 GB in size. This way, the load is split among the brokers.
Later we will see a bit more about what happens when brokers don’t hold the same amount of data; that might lead to disasters. It is our responsibility to keep an eye on each topic’s overall size, the partition sizes per topic, and the amount of data on each broker.
What is a Kafka Segment
Each partition is stored as a sequence of segments, of which exactly one is active at a time. So basically, we will have as many active segments as there are partitions across our topics.
Now let’s consider a Kafka installation with only one topic, which has ten partitions. When Kafka starts, it will basically create ten segments, so there will be ten open files. Every time we write to a partition, Kafka appends the data to that partition’s active segment. The segments grow in size over time.
At some point, Kafka closes (archives) a segment and creates a new one. This is very important because Kafka only ever writes to the active segment; the old segment is considered closed. If you want to read old information from the topic and partition, Kafka will still retrieve it from the old segment. But Kafka doesn’t consider it handy to keep a segment open forever, for both performance and logical reasons: every time Kafka has to clean up data, it looks in the old segments.
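A toy model of this rolling behaviour, assuming a size-based limit per segment (in the spirit of Kafka's `log.segment.bytes` setting, although real segments can also roll for other reasons). `PartitionLog` is a made-up class for illustration only.

```python
class PartitionLog:
    """Toy model of one partition: a list of segments, only the last is active."""

    def __init__(self, max_segment_bytes):
        self.max_segment_bytes = max_segment_bytes
        self.segments = [[]]  # start with one empty active segment

    def append(self, record: bytes):
        active = self.segments[-1]
        used = sum(len(r) for r in active)
        if active and used + len(record) > self.max_segment_bytes:
            # Close the current segment and roll a new active one.
            active = []
            self.segments.append(active)
        active.append(record)

    @property
    def closed_segments(self):
        """Everything except the active segment; only these are cleaned up."""
        return self.segments[:-1]


log = PartitionLog(max_segment_bytes=10)
for _ in range(5):
    log.append(b"data")  # 4 bytes each
print(len(log.segments), len(log.closed_segments))
```

Writes only ever touch `segments[-1]`; the closed segments are read-only from then on, which is what makes cleaning them up safe.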
Why you might want compaction
The concept of segments is important to understand because it is tied to the concepts of compaction and deletion. If you are familiar with databases, you can relate compaction to an update of a record. Sometimes we want our applications or our users to see only the last value of a record. We don’t want to keep the whole history, only the latest state.
For example, if we have an appointment with our doctor, we call and set the appointment for tomorrow. When we want to change the appointment, we just call again and move it to the day after tomorrow. So, two messages are created in Kafka: the first is “Appointment_no1” and the second is “Appointment_no1_Update”. Now the user is not interested in knowing when the first appointment was. He is just interested in when he has to go to the doctor (the day after tomorrow).
This is where compaction comes in: when the second message is generated, Kafka checks whether it already has a message with the same key. If the answer is “Yes”, the older message is marked as deleted, and the new message is kept.
Why you might not want compaction
What we have to consider is that compaction is not for everybody and not for all situations. For example, if we want to record the temperature of a sensor, we don’t want compaction, because we need the historical data of the sensors. Hence, if your topic is recording any historical data, never use compaction.
If you want only the latest status, use compaction.
Note that compaction is triggered at a configurable interval, and the records it deletes are never in the active segment. As mentioned earlier, Kafka only writes to the active segment, and once a segment is archived it is no longer written to. Compaction works only on those closed files, so Kafka does not delete records inside the segment that is still being written.
So when you consume your messages from scratch, you will always see the last record for a key, possibly preceded by older ones that have not been compacted yet. The application should therefore always be written so that only the last message is kept.
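As a rough illustration, the sketch below compacts only the closed part of a log and leaves the active segment alone. The keys and values are invented, and records are modelled as plain `(key, value)` pairs rather than real Kafka records.

```python
def compact(closed_records):
    """Keep only the newest value per key; records are (key, value) in write order."""
    latest = {}
    for position, (key, value) in enumerate(closed_records):
        latest[key] = (position, value)
    # Preserve the original write order of the surviving records.
    survivors = sorted(latest.items(), key=lambda item: item[1][0])
    return [(key, value) for key, (_, value) in survivors]


closed = [
    ("appointment_1", "tomorrow"),
    ("appointment_2", "monday"),
    ("appointment_1", "day after tomorrow"),
]
active = [("appointment_2", "tuesday")]  # never touched by compaction

log_after_compaction = compact(closed) + active
print(log_after_compaction)

# A consumer reading from scratch keeps only the last value it sees per key.
state = dict(log_after_compaction)
print(state)
```

Note that `appointment_2` still appears twice in the log after compaction, once in the closed part and once in the active segment, which is why the consuming application has to keep only the last value itself.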
Deletion is similar to compaction in that it applies to non-active segments, but the concept is different: deletion is the expiration of your messages. Going back to the sensor example, suppose we only need to keep the temperature data of the last month. Kafka deletion would then be configured to trigger once data is one month old. So today, Kafka removes the messages from one month and one day ago, and so on.
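A minimal sketch of time-based deletion, assuming each closed segment is judged by the time of its newest record. The segment names and dates are made up, and the function is an illustration rather than Kafka's actual cleaner.

```python
from datetime import datetime, timedelta


def expired_segments(segments, now, retention):
    """Return closed segments whose newest record is older than the retention.

    `segments` is a list of (name, newest_record_time) pairs, oldest first.
    The last entry is treated as the active segment and is never deleted.
    """
    closed = segments[:-1]
    return [name for name, newest in closed if now - newest > retention]


now = datetime(2024, 3, 1)
segments = [
    ("segment-0", datetime(2024, 1, 1)),
    ("segment-1", datetime(2024, 2, 15)),
    ("segment-2", datetime(2024, 2, 29)),  # active segment
]
print(expired_segments(segments, now, timedelta(days=30)))
```

Only `segment-0` is old enough to expire here; the active segment is skipped regardless of its age.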
More on compaction and deletion will be discussed in the near future.
ISR stands for In Sync Replica.
What is a replica in the first place?
A replica is a copy of your data on another broker. For example, if we have 3 brokers and 1 topic with 1 partition, we can tell Kafka to use a replication factor of 3. Then every bit of the topic will be replicated over the 3 brokers. This is very important because if 1 broker goes down, we still have 2 copies of the data.
Every time we write to this topic, the data will be physically replicated to the other two brokers. So at every point in time there will be 3 copies of our data, provided that the brokers are up and running and can keep up with the writes.
When we query the status of our data, Kafka will tell us that there are three in-sync replicas. The first is the leader itself, which counts as an in-sync replica, and then there are two more copies. If one broker goes down, there will only be two in-sync replicas.
It is also possible that one broker cannot keep up with the writes, because it has a slower disk or because the network between the main broker and the replica is failing or insufficient, so that replica is no longer in sync. This is an important concept to understand because if a replica is not in sync, we have fewer than 3 copies of our data.
ISR – From the official documentation
The official documentation describes In Sync Replicas as follows:
“As with most distributed systems, automatically handling failures requires having a precise definition of what it means for a node to be “alive”. For Kafka, node liveness has two conditions.
- A node must be able to maintain its session with ZooKeeper (via ZooKeeper’s heartbeat mechanism)
- If it is a follower, it must replicate the writes happening on the leader and not fall “too far” behind.”
According to these points, Kafka needs a way to know when a node is alive and when it is not. That is why every broker keeps a session with ZooKeeper via its heartbeat: at a regular interval (let’s say a few seconds by default), the broker lets ZooKeeper know that it is alive by sending a message.
If one of these messages is lost, ZooKeeper starts to question whether the broker is alive. If a number of heartbeats are lost, ZooKeeper marks that broker as not alive, and it is no longer in sync.
The other condition under which a broker is marked “not in sync” is when it falls too far behind the leader.
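The two conditions can be combined in a small sketch. The function below is a simplification for illustration: `last_caught_up_ms` stands for the last time each replica was fully caught up with the leader, `alive` for the brokers that still hold a ZooKeeper session, and the numbers are invented.

```python
def in_sync_replicas(last_caught_up_ms, now_ms, max_lag_ms, alive):
    """Return broker ids that are alive and have not fallen too far behind."""
    return sorted(
        broker
        for broker, caught_up in last_caught_up_ms.items()
        if broker in alive and now_ms - caught_up <= max_lag_ms
    )


last_caught_up_ms = {0: 10_000, 1: 9_500, 2: 1_000}

# Broker 2 is alive but lags 9 seconds behind, more than the allowed 5 seconds.
print(in_sync_replicas(last_caught_up_ms, 10_000, 5_000, alive={0, 1, 2}))

# Broker 1 lost its session, broker 2 is still too far behind.
print(in_sync_replicas(last_caught_up_ms, 10_000, 5_000, alive={0, 2}))
```

A replica has to pass both checks to stay in the ISR: being reachable is not enough if it is lagging, and being up-to-date is not enough if its session is gone.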
More replicas can also help performance, because producers and consumers are spread over more brokers and we can fetch data from more of them. If one broker goes down, another one can take over, so we reach high availability simply by setting the replication factor to more than one. If we have 3 brokers and one of them goes down, another broker takes over for it.
Every partition is replicated a number of times. One of the replicas (three in our example) is the leader, which we can call the “main replica”. When you write, you write to the leader, and the writes are then copied to the followers, the other replicas.
For example, suppose we have Kafka 0, 1, and 2, and the leader is 0. Every time we write to 0, the write is also sent to 1 and 2. Now assume that Kafka 0 goes down, so the leader is unreachable. Any remaining replica that is still in sync, that is, not marked as dead by ZooKeeper and up-to-date with the latest data, can become the new leader, and one of them will be elected.
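A simplified sketch of that choice, assuming the new leader is the first broker in the replica list that is both alive and in sync. `elect_leader` is a hypothetical helper, not Kafka's controller code.

```python
def elect_leader(replicas, isr, alive):
    """Pick the first replica, in list order, that is alive and in sync.

    Returns None when no in-sync replica is left, which is the situation
    where an unclean leader election would be the only way to keep serving.
    """
    for broker in replicas:
        if broker in isr and broker in alive:
            return broker
    return None


replicas = [0, 1, 2]

# Kafka 0 (the old leader) is down; 1 and 2 are both in sync.
print(elect_leader(replicas, isr={1, 2}, alive={1, 2}))

# Only Kafka 2 kept up with the writes.
print(elect_leader(replicas, isr={2}, alive={1, 2}))

# Nobody is in sync any more.
print(elect_leader(replicas, isr=set(), alive={1, 2}))
```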
If we have the same number of replicas and brokers and one broker goes down, we are simply left with 2 brokers and 2 replicas. Things are different when there are more brokers than replicas. For example, let’s assume that we have 5 brokers and 3 replicas. When our leader or one In Sync Replica dies, there will be a rebalancing: the operation of spreading the data again among the survivors.
So, if our partition is replicated three times on Kafka 0, 1, and 2, and one of the three dies, then Kafka 3 will take over. When it takes over, it has to fetch all the data it is missing. This means that this Kafka instance will write very intensively to its disk, and the instance it is reading from will have intensive reads. The disks will be busy, which can effectively result in a Denial of Service.
Every time a message is written to Kafka, a bit of metadata is stored together with it. Metadata is data that describes the real data. In this case, the offset is metadata, and so is the timestamp. The timestamp is the exact time when the message was received by Kafka.
The timestamp is important because one feature of Kafka is the ability to easily serve the messages since a specific timestamp. This feature is built into Kafka’s architecture, although it is also possible to achieve with a database. Every time a consumer consumes messages, it writes the offset of the last message consumed into an internal topic called “__consumer_offsets”.
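To illustrate both ideas, here is a small sketch in Python. It models a partition as a list of timestamps indexed by offset and uses an in-memory dictionary as a stand-in for the `__consumer_offsets` topic; `OffsetStore` and the group and topic names are invented for the example.

```python
import bisect


def first_offset_at_or_after(timestamps, target):
    """timestamps[i] is the timestamp of the message at offset i (non-decreasing).

    Returns the first offset whose timestamp is >= target, or len(timestamps)
    when every message is older than the target.
    """
    return bisect.bisect_left(timestamps, target)


class OffsetStore:
    """In-memory stand-in for the internal __consumer_offsets topic."""

    def __init__(self):
        self._committed = {}

    def commit(self, group, topic, partition, offset):
        self._committed[(group, topic, partition)] = offset

    def committed(self, group, topic, partition):
        return self._committed.get((group, topic, partition))


timestamps = [100, 200, 200, 350, 500]
start = first_offset_at_or_after(timestamps, 200)

store = OffsetStore()
store.commit("billing", "bank-account", 0, start)
print(store.committed("billing", "bank-account", 0))
```

Because the offset is stored per group, topic, and partition, every consumer group keeps its own position and can restart where it left off.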
Configuration files: Best Practices
- Always refer to the official documents.
- Keeping all environments equal in resources is a big advantage.
- Make sure your changes are applied (hey, k8s?!): write or do a Functional Test (FT)
- Follow DTAP – Development Testing Acceptance and Production (Different environments used in software development in the IT industry)
Configuration files – Kafka
- The configuration file is called server.properties by default.
- The syntax of the configuration file is key=value.
- There is a full list of possible settings in the official Kafka documentation – https://kafka.apache.org/documentation.
- The topic defaults in the configuration file apply only to automatically created topics. For manually created topics, the settings are chosen by the developers.
The configuration file consists of the following configurations.
- broker.id = unique id per Broker
- log.dir = where your data files are located (if you move a disk, be aware that the disk holding log.dir has to move with it to keep your files)
- zookeeper.connect = IP:Port (for each zookeeper instance)
- min.insync.replicas = the minimum number of ISRs you can have. When a producer writes with acks=all, the write has to be synced to at least this many replicas (a common choice is 2). If this number is not fulfilled, such writes are rejected.
- default.replication.factor = only for automatically created topics
- num.partitions = only for automatically created topics (if it is not specified, the built-in default is used)
- offsets.topic.replication.factor = the topic containing the offsets is what lets consumers keep track of their position. It is replicated and handled by Kafka internally. The suggestion is to set this number to 3.
- transaction.state.log.replication.factor = used by the producer to guarantee exactly-once production. Whenever there’s a network issue or a problem in the broker, the one who is producing the data can still be sure that the data is written exactly once.
- auto.create.topics.enable = allows topics to be created automatically.
- unclean.leader.election.enable = the leader is in charge of hosting the data. When the leader dies, Kafka needs to tell the clients that there’s a new leader, so there has to be an election to decide who that will be. The question is whether we want to serve the data anyway, even if it may be old, because there might be no In Sync Replicas left. (The suggestion is not to.)
- delete.topic.enable = should be disabled on Production. Enables you to manually delete topics.
- replica.lag.time.max.ms = how far behind can an ISR be? Don’t set this too high, because a replica that lags behind for a long time is not very helpful.
Note that settings about compaction and deletions will be explained in the next session.
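As a hedged illustration of the list above, the Python sketch below parses a sample `server.properties` fragment and runs two sanity checks that follow from the suggestions. The host names, paths, and values in `SAMPLE` are placeholders, and `check` is a made-up helper, not part of Kafka.

```python
# Sample fragment; host names, paths, and values are placeholders.
SAMPLE = """\
broker.id=0
log.dir=/var/lib/kafka/data
zookeeper.connect=zk-1:2181,zk-2:2181,zk-3:2181
default.replication.factor=3
min.insync.replicas=2
num.partitions=10
offsets.topic.replication.factor=3
auto.create.topics.enable=false
unclean.leader.election.enable=false
"""


def parse_properties(text):
    """Parse key=value lines, skipping blank lines and # comments."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        settings[key.strip()] = value.strip()
    return settings


def check(settings):
    """Return a list of human-readable warnings for risky combinations."""
    problems = []
    replication = int(settings.get("default.replication.factor", 1))
    min_isr = int(settings.get("min.insync.replicas", 1))
    if min_isr > replication:
        problems.append("min.insync.replicas exceeds default.replication.factor")
    if settings.get("unclean.leader.election.enable") == "true":
        problems.append("unclean leader election is enabled")
    return problems


print(check(parse_properties(SAMPLE)))
```

A check like this fits naturally into the functional test mentioned in the best practices: it verifies that the file that actually got deployed says what you think it says.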
Be careful with some settings
Some things to know about the retention settings:
- log.retention.ms sets how long you want to keep the data before it is deleted. There are three variants of this setting: milliseconds (log.retention.ms), minutes (log.retention.minutes), and hours (log.retention.hours). Pick the one that fits how long you want to keep the data. Very long periods, such as years, are unwieldy in milliseconds, so use hours for those.
If log.retention.ms is set, it takes precedence over the minutes and hours settings.
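The precedence between the three settings can be sketched as a small function. It takes a dictionary of settings as read from the configuration file and assumes the default of 168 hours (7 days) when nothing is set; `effective_retention_ms` is a hypothetical helper name.

```python
def effective_retention_ms(settings):
    """Resolve retention with the precedence ms > minutes > hours."""
    if "log.retention.ms" in settings:
        return int(settings["log.retention.ms"])
    if "log.retention.minutes" in settings:
        return int(settings["log.retention.minutes"]) * 60 * 1000
    # Fall back to hours; 168 hours is the default when nothing is configured.
    return int(settings.get("log.retention.hours", 168)) * 60 * 60 * 1000


# Someone set both hours and milliseconds: milliseconds win.
print(effective_retention_ms({"log.retention.hours": "720", "log.retention.ms": "1000"}))
```

This is the trap the warning above is about: a leftover log.retention.ms silently overrides a carefully chosen hours value.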
- No stackoverflow.com, please.
- Time invested in reading docs is not time wasted
- The best place to refer to is the Official Documentation:
- Watch out for the Kafka version you are using and the docs you are reading.
ZooKeeper config file
The ZooKeeper configuration file is separate from Kafka’s.
- The configuration file is called zookeeper.properties by default.
- Simpler configuration than Kafka
- The default configuration file is not for production.
- All the configuration settings are available on https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_configuration
- dataDir = where your persistent files will be
- dataLogDir = journaling files
- clientPort = 2181 is the default and is OK
Every machine in the ensemble must be listed in the following form.
- server.1 = 220.127.116.11:2888:3888
- server.2 = 18.104.22.168:2888:3888
- tickTime = the basic time unit, measured in milliseconds. It is used to regulate other settings.
- The default value is 2000.
- maxSessionTimeout = default is 20 (ticks).
- initLimit = large ZooKeeper installations require larger values.
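Because several ZooKeeper settings are expressed in ticks, it helps to convert them to milliseconds. The sketch below does that arithmetic; the tick time of 2000 and the 20-tick session timeout come from the list above, while the `initLimit` of 10 ticks is only an assumed example value.

```python
def zookeeper_timeouts(tick_time_ms=2000, init_limit_ticks=10, max_session_ticks=20):
    """Convert tick-based ZooKeeper settings into milliseconds."""
    return {
        "init_limit_ms": init_limit_ticks * tick_time_ms,
        "max_session_timeout_ms": max_session_ticks * tick_time_ms,
    }


print(zookeeper_timeouts())

# Doubling tickTime doubles every tick-based limit along with it.
print(zookeeper_timeouts(tick_time_ms=4000))
```

This is what "used to regulate other settings" means in practice: changing tickTime shifts every limit that is counted in ticks.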
Kafka and ZooKeeper use Log4j for logging. It is a Java-based logging utility that comes with the default install of both, and it is usually not the most intuitive logging facility for operations. Don’t use INFO for production. It is too verbose, and you will lose sight of what’s happening.
A bit of hands on.. – Kafka-topics.sh
kafka-topics.sh is the main tool we have to understand what is going on. It comes with the default installation. You need ‘--zookeeper’ or ‘--bootstrap-server’ to tell it where your ZooKeeper or Kafka installation is. You can run it from inside Kubernetes or from a Kafka client installation, which has all the tools. As you can damage the cluster with it, you should handle it with care. If in doubt, run it in dev first.
You can try kafka-topics.sh yourself on your cluster. If you are using Kubernetes, it comes as kafka-topics (without the .sh). Run the command and try to connect to the cluster.
kafka-topics.sh --zookeeper 22.214.171.124:2181 --list
Those are the main parameters. The first thing that might be handy to know is how many topics you have, which is what the list shows. To look at a single topic, describe it:
kafka-topics.sh --zookeeper 126.96.36.199:2181 --describe --topic topic1
Topic: topic1 PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=2,max.message.bytes=10485880,unclean.leader.election.enable=false
Topic: topic1 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic – Topic name. You’ll see one line for each topic.
PartitionCount – How many partitions this topic is divided into. The partition count equals the number of partitions set in the configuration file unless the topic overrides it.
ReplicationFactor – How many times your topic is replicated over your cluster. If you have 3 brokers and the replication factor in the settings is 3, you will see a replication factor of 3 unless somebody has overridden it.
Configs – the configuration settings that apply to this topic.
For each topic, you will see one line per partition. Here, the topic has only one partition, and the first partition is always zero (they count from zero). So, partition zero of topic1 has a leader, which is broker 2. If you are running three brokers on Kubernetes, they are numbered from 0 to 2.
The number of replicas is 3, and we have the replicas displayed as 2, 0, and 1. The order here is important because if leader 2 dies, the next in line to take over will be 0, and the next in line after that will be 1.
These configurations can be changed by the administrator by running a command with kafka-topics.sh.
After the replicas (2, 0, and 1), you also have the In Sync Replicas, which are the most important piece of information to get from your cluster. Under normal operations, the number of In Sync Replicas must equal the replication factor. So basically, Replicas and Isr must list the same brokers.
If we have 3 replicas planned but the In Sync Replicas list has only 2 items (let’s say 2 and 0), that means broker number 1 is not up-to-date for that particular topic. We have to monitor the In Sync Replicas and check for replicas that are out of sync. It’s up to you to find out what’s going on. It might be a slow disk, a bottleneck in the network, etc.
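For homebrew monitoring, the comparison between Replicas and Isr is easy to script. The sketch below parses partition lines in the format shown in this article; other Kafka versions may print additional fields, so treat the parsing as an assumption. The function names are made up.

```python
def parse_partition_line(line):
    """Parse one partition line of `kafka-topics.sh --describe` output."""
    tokens = line.split()
    # The line alternates "Label:" and value, e.g. "Leader: 2".
    fields = {
        label.rstrip(":"): value
        for label, value in zip(tokens[::2], tokens[1::2])
    }
    return {
        "topic": fields["Topic"],
        "partition": int(fields["Partition"]),
        "leader": int(fields["Leader"]),
        "replicas": [int(b) for b in fields["Replicas"].split(",")],
        "isr": [int(b) for b in fields["Isr"].split(",")],
    }


def missing_replicas(info):
    """Brokers that should hold a copy but are not in sync."""
    return sorted(set(info["replicas"]) - set(info["isr"]))


healthy = "Topic: topic1 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1"
degraded = "Topic: topic1 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0"

print(missing_replicas(parse_partition_line(healthy)))
print(missing_replicas(parse_partition_line(degraded)))
```

An empty list means the partition is healthy; anything else names the brokers you need to investigate.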
Example of a different topic
Topic: __consumer_offsets PartitionCount: 50 ReplicationFactor: 3
Topic: __consumer_offsets Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 2,0,1
Topic: __consumer_offsets Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: __consumer_offsets Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 2,0,1
This topic has 50 partitions, which is the default number of partitions for this internal topic, where the consumer offsets are stored. In the full output we see 50 lines, numbered from 0 to 49. The leader changes from partition to partition, and the partitions are spread evenly among all the brokers. It is important that partitions have fairly similar amounts of data in each of them.
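To check how evenly leadership is spread, you can count leaders per broker from the describe output. This is a minimal sketch that only looks for the `Leader:` field in each line, using the three lines shown above as input.

```python
import re
from collections import Counter


def leaders_per_broker(describe_lines):
    """Count how many partitions each broker leads, based on 'Leader: N' fields."""
    counts = Counter()
    for line in describe_lines:
        match = re.search(r"Leader:\s*(\d+)", line)
        if match:
            counts[int(match.group(1))] += 1
    return dict(counts)


lines = [
    "Topic: __consumer_offsets PartitionCount: 50 ReplicationFactor: 3",
    "Topic: __consumer_offsets Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 2,0,1",
    "Topic: __consumer_offsets Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1",
    "Topic: __consumer_offsets Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 2,0,1",
]
print(leaders_per_broker(lines))
```

With all 50 lines as input, a healthy three-broker cluster should show roughly 16 or 17 leaders per broker; a broker with far fewer or far more is worth a closer look.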
kafka-topics.sh can be used to:
- Create and describe topics (on creation, you specify how many partitions you want)
- Show topics configured in a particular way (--topics-with-overrides)
- Show what is not properly configured currently.
Tips and tricks
- Delete can return OK, but nothing happens if delete is not allowed…
- Delete does not actually delete, not immediately…
- Homebrew monitoring? Kafka-topics.sh is a good ally.