Kafka Producer/Consumer with Message key and offset
Objectives
- Use message keys to keep messages that share a key in their original publication order
- Use consumer offsets to control and track a consumer's position within topic partitions
Start ZooKeeper
ZooKeeper is required for Kafka to work. Start the ZooKeeper server.
#For linux/MacOS
cd kafka_2.13-3.1.0
bin/zookeeper-server-start.sh config/zookeeper.properties
#For windows system
D:\software\kafka_2.13-3.1.0\bin\windows\zookeeper-server-start.bat kafka_2.13-3.1.0\config\zookeeper.properties
You can be sure ZooKeeper has started when you see an output like this:
ZooKeeper, as of this version, is required for Kafka to work. ZooKeeper is responsible for the overall management of a Kafka cluster. It monitors the Kafka brokers and notifies Kafka if any broker or partition goes down, or if a new broker or partition comes up.
Start Apache Kafka Server
Start a new terminal.
Run the following command to start the Kafka server
#For linux/MacOS
cd kafka_2.13-3.1.0
bin/kafka-server-start.sh config/server.properties
#For windows system
D:\software\kafka_2.13-3.1.0\bin\windows\kafka-server-start.bat kafka_2.13-3.1.0\config\server.properties
You can be sure Kafka has started when you see an output like this:
Create a topic and producer for processing bank ATM transactions
Next, we will create a bankbranch topic to process the messages that come from the ATM machines of bank branches.
Suppose the messages come from the ATM in the form of a simple JSON object, including an ATM id and a transaction id like the following example:
{"atmid": 1, "transid": 100}
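For illustration, a message like this can be handled in a few lines of Python. The field names atmid and transid come from the example above; everything else is standard-library JSON handling:

```python
import json

# An ATM transaction message as produced by a branch ATM
raw = '{"atmid": 1, "transid": 100}'

# Deserialize the JSON payload into a dict
msg = json.loads(raw)

# The ATM id will later serve as the message key;
# the transaction id identifies the individual transaction
atm_id = msg["atmid"]
trans_id = msg["transid"]
print(atm_id, trans_id)  # prints: 1 100
```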
To process the ATM messages, let’s first create a new topic called bankbranch.
- Start a new terminal and go to the extracted Kafka folder:
cd kafka_2.13-3.1.0
- Create a new topic named bankbranch using the --topic argument. To simplify the topic configuration and better explain how message keys and consumer offsets work, we specify the --partitions 2 argument to create two partitions for this topic. You may try other partitions settings for this topic if you are interested in comparing the difference.
#For linux/MacOS
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic bankbranch --partitions 2
#For windows system
D:\software\kafka_2.13-3.1.0\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --topic bankbranch --partitions 2
Now let’s list all the topics to see if bankbranch has been created successfully.
#For linux/MacOS
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
#For windows system
D:\software\kafka_2.13-3.1.0\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list
We can also use the --describe command to check the details of the topic bankbranch:
#For linux/MacOS
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic bankbranch
#For windows system
D:\software\kafka_2.13-3.1.0\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic bankbranch
You can see that bankbranch has two partitions, Partition 0 and Partition 1. If no message keys are specified, messages will be published to these two partitions in an alternating sequence, like this:
Partition 0 -> Partition 1 -> Partition 0 -> Partition 1 -> …
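The alternating behaviour can be sketched in Python. This is a simplified model of round-robin assignment (newer Kafka producers may batch unkeyed records with a "sticky" partitioner, so treat this purely as an illustration of the rotation described above):

```python
# Simplified round-robin partitioner: with no message key,
# successive messages alternate between the two partitions.
NUM_PARTITIONS = 2

messages = [
    {"atmid": 1, "transid": 100},
    {"atmid": 1, "transid": 101},
    {"atmid": 2, "transid": 200},
    {"atmid": 1, "transid": 102},
    {"atmid": 2, "transid": 201},
]

partitions = {0: [], 1: []}
for i, msg in enumerate(messages):
    partitions[i % NUM_PARTITIONS].append(msg)

# Messages from the same ATM end up scattered across both partitions
print(partitions[0])  # transids 100, 200, 201
print(partitions[1])  # transids 101, 102
```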
Next, we can create a producer to publish some ATM transaction messages.
- Stay in the same terminal window with the topic details, then create a producer for the topic bankbranch:
#For linux/MacOS
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic bankbranch
#For windows system
D:\software\kafka_2.13-3.1.0\bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic bankbranch
To produce the messages, look for the > prompt, then copy and paste the following ATM messages after it, one per line:
{"atmid": 1, "transid": 100}
{"atmid": 1, "transid": 101}
{"atmid": 2, "transid": 200}
{"atmid": 1, "transid": 102}
{"atmid": 2, "transid": 201}
Then, let’s create a consumer in a new terminal window to consume these 5 new messages.
- Start a new terminal and go to the extracted Kafka folder:
cd kafka_2.13-3.1.0
- Then start a new consumer to subscribe to the bankbranch topic:
#For linux/MacOS
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bankbranch --from-beginning
#For windows system
D:\software\kafka_2.13-3.1.0\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic bankbranch --from-beginning
Then, you should see the 5 new messages we just published, but very likely, they are not consumed in the same order as they were published. Normally, you need to keep the consumed messages sorted in their original published order, especially for critical use cases such as financial transactions.
Produce and consume with message keys
In this step, you will be using message keys to ensure that messages with the same key will be consumed in the same order as they were published. In the backend, messages with the same key will be published into the same partition and will always be consumed by the same consumer. As such, the original publication order is kept on the consumer side.
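The key-to-partition mapping can be sketched as follows. Real Kafka hashes the serialized key bytes with murmur2; the plain Python hash() used here is a hypothetical stand-in, meant only to show the principle that an identical key always yields an identical partition:

```python
NUM_PARTITIONS = 2

def partition_for(key):
    # Simplified stand-in for Kafka's murmur2-based default partitioner:
    # deterministic, so the same key always maps to the same partition.
    # (Distinct keys may still collide onto one partition in general.)
    return hash(key) % NUM_PARTITIONS

keyed_messages = [
    (1, {"atmid": 1, "transid": 102}),
    (1, {"atmid": 1, "transid": 103}),
    (2, {"atmid": 2, "transid": 202}),
    (2, {"atmid": 2, "transid": 203}),
    (1, {"atmid": 1, "transid": 104}),
]

partitions = {0: [], 1: []}
for key, msg in keyed_messages:
    partitions[partition_for(key)].append(msg)

# Within the partition that holds key 1, the atm1 transactions
# appear in their original publication order: 102, 103, 104.
```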
At this point, you should have the following four terminals open in Cloud IDE:
- Zookeeper terminal
- Kafka Server terminal
- Producer terminal
- Consumer terminal
In the next steps, you will be frequently switching among these terminals.
- First, go to the consumer terminal and stop the consumer using Ctrl + C (Windows) or Command + . (Mac).
- Then, switch to the Producer terminal and stop the previous producer.
OK, we can now start a new producer and consumer, this time using message keys. The new producer takes two additional arguments:
- --property parse.key=true to make the producer parse message keys
- --property key.separator=: to define the key separator to be the : character
A keyed message then looks like the following key-value pair example: 1:{"atmid": 1, "transid": 102}. Here the message key is 1, which also corresponds to the ATM id, and the value is the transaction JSON object, {"atmid": 1, "transid": 102}.
- Start a new producer with message key enabled:
#For linux/MacOS
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic bankbranch --property parse.key=true --property key.separator=:
#For windows system
D:\software\kafka_2.13-3.1.0\bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic bankbranch --property parse.key=true --property key.separator=:
- Once you see the > prompt, you can start to produce the following messages, where each key matches the ATM id of the message:
1:{"atmid": 1, "transid": 102}
1:{"atmid": 1, "transid": 103}
2:{"atmid": 2, "transid": 202}
2:{"atmid": 2, "transid": 203}
1:{"atmid": 1, "transid": 104}
- Next, switch to the consumer terminal again, and start a new consumer with the --property print.key=true --property key.separator=: arguments to print the keys:
#For linux/MacOS
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bankbranch --from-beginning --property print.key=true --property key.separator=:
#For windows system
D:\software\kafka_2.13-3.1.0\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic bankbranch --from-beginning --property print.key=true --property key.separator=:
Now, you should see that messages with the same key are consumed in the same order as they were published (e.g., trans102 -> trans103 -> trans104).
This is because each topic partition maintains its own message log, and new messages are appended to the end of the log as they get published to the partition. Consuming a message advances the consumer's offset past it, so it will not be delivered to the same consumer group again; the message itself remains in the partition until the retention period expires.
Recall that with two partitions and no message keys specified, the transaction messages were published to the two partitions in rotation:
- Partition 0:
[{"atmid": 1, "transid": 102}, {"atmid": 2, "transid": 202}, {"atmid": 1, "transid": 104}]
- Partition 1:
[{"atmid": 1, "transid": 103}, {"atmid": 2, "transid": 203}]
As you can see, the transaction messages from atm1 and atm2 got scattered across both partitions. It would be difficult to unravel this and consume the messages from one ATM in the same order as they were published.
However, with the message key specified as the atmid value, the messages from the two ATMs will look like the following:
- Partition 0:
[{"atmid": 1, "transid": 102}, {"atmid": 1, "transid": 103}, {"atmid": 1, "transid": 104}]
- Partition 1:
[{"atmid": 2, "transid": 202}, {"atmid": 2, "transid": 203}]
Messages with the same key will always be published to the same partition so that their published order will be preserved within the message queue of each partition.
As such, we can keep the states or orders of the transactions for each ATM.
Consumer Offset
Topic partitions keep published messages in a sequence, like a list. The message offset indicates a message’s position in that sequence. For example, the first message published to an empty Partition 0 of bankbranch is assigned offset 0, the second message offset 1, and so on.
By using offsets in the consumer, you can specify the starting position for message consumption, such as from the beginning to retrieve all messages, or from some later point to retrieve only the latest messages.
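The offset mechanics can be modelled with a plain list; this is a sketch of the concept, not of how brokers actually store data:

```python
# Model a partition as an append-only list: the offset of a record
# is simply its index in the list.
partition = []

def publish(partition, msg):
    partition.append(msg)
    return len(partition) - 1  # the offset assigned to this record

assert publish(partition, {"atmid": 1, "transid": 102}) == 0  # first record: offset 0
assert publish(partition, {"atmid": 1, "transid": 103}) == 1  # second record: offset 1

# A consumer tracks its own position. Starting from offset 0
# means reading everything from the beginning; the log-end offset
# (the next offset to be written) is len(partition).
consumer_offset = 0
consumed = partition[consumer_offset:]
consumer_offset = len(partition)  # committed position after consuming
```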
Consumer Group
In addition, we normally group related consumers together as a consumer group. For example, we may want to create a consumer for each ATM in the bank and manage all ATM-related consumers together in a group.
So let’s see how to create a consumer group, which is actually very easy with the --group argument.
- In the consumer terminal, stop the previous consumer if it is still running.
- Run the following command to create a new consumer within a consumer group called atm-app:
#For linux/MacOS
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bankbranch --group atm-app
#For windows system
D:\software\kafka_2.13-3.1.0\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic bankbranch --group atm-app
After the consumer within the atm-app consumer group is started, you should not expect any messages to be consumed. This is because the committed offsets for both partitions have already reached the end of their logs. In other words, all of the messages have already been consumed by previous consumers.
You can verify that by checking consumer group details.
- Stop the consumer.
- Show the details of the consumer group atm-app:
#For linux/MacOS
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group atm-app
#For windows system
D:\software\kafka_2.13-3.1.0\bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group atm-app
Now you should see the offset information for the topic bankbranch:
Recall that we have published 10 messages in total. The CURRENT-OFFSET column shows 6 for partition 1 and 4 for partition 0, and they add up to 10 messages.
The LOG-END-OFFSET column indicates the last offset, or the end of the sequence, which is 6 for partition 1 and 4 for partition 0. Thus, both partitions have reached the end of their logs and no more messages are available for consumption.
Meanwhile, you can check the LAG column, which represents the count of unconsumed messages in each partition. Currently, it is 0 for both partitions, as expected.
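The LAG column is simply the difference between the log-end offset and the group's current offset. A sketch of that arithmetic with the numbers above:

```python
# Offsets as reported by kafka-consumer-groups.sh for group atm-app
partitions = {
    0: {"current_offset": 4, "log_end_offset": 4},
    1: {"current_offset": 6, "log_end_offset": 6},
}

# LAG = LOG-END-OFFSET - CURRENT-OFFSET (count of unconsumed messages)
for p, o in partitions.items():
    lag = o["log_end_offset"] - o["current_offset"]
    print(f"partition {p}: lag {lag}")  # lag 0 for both, as expected
```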
Now, let’s produce more messages and see how the offsets change.
- Switch to the previous producer terminal, and publish two more messages:
1:{"atmid": 1, "transid": 105}
2:{"atmid": 2, "transid": 204}
Then let’s switch back to the consumer terminal and check the consumer group details again:
#For linux/MacOS
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group atm-app
#For windows system
D:\software\kafka_2.13-3.1.0\bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group atm-app
You should see that both LOG-END-OFFSET values have increased by 1, and the LAG column for both partitions has become 1. This means there is 1 new message in each partition waiting to be consumed.
- Let’s start the consumer again and see whether the two new messages will be consumed.
#For linux/MacOS
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bankbranch --group atm-app
#For windows system
D:\software\kafka_2.13-3.1.0\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic bankbranch --group atm-app
OK, now both partitions have reached the end once again. But what if you want to consume the messages again from the beginning?
We can do that by resetting the offsets in the next step.
Reset offset
We can reset the offsets with the --reset-offsets argument.
First, let’s try resetting the offsets to the earliest position (the beginning) using --reset-offsets --to-earliest.
- Stop the previous consumer if it is still running, and run the following command to reset the offset:
#For linux/MacOS
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --topic bankbranch --group atm-app --reset-offsets --to-earliest --execute
#For windows system
D:\software\kafka_2.13-3.1.0\bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --topic bankbranch --group atm-app --reset-offsets --to-earliest --execute
Now the offsets have been set to 0 (the beginning).
- Start the consumer again:
#For linux/MacOS
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bankbranch --group atm-app
#For windows system
D:\software\kafka_2.13-3.1.0\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic bankbranch --group atm-app
You should see that all 12 messages are consumed and that all offsets have reached the partition ends again.
In fact, you can reset the offset to any position. For example, let’s reset the offset so that we only consume the last two messages.
- Stop the previous consumer
- Shift the offsets to the left by 2 using --reset-offsets --shift-by -2:
#For linux/MacOS
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --topic bankbranch --group atm-app --reset-offsets --shift-by -2 --execute
#For windows system
D:\software\kafka_2.13-3.1.0\bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --topic bankbranch --group atm-app --reset-offsets --shift-by -2 --execute
- If you run the consumer again, you should see that we consumed 4 messages, 2 for each partition:
#For linux/MacOS
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bankbranch --group atm-app
#For windows system
D:\software\kafka_2.13-3.1.0\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic bankbranch --group atm-app
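The effect of --shift-by -2 can be sketched with this lab's partition numbers (log-end offsets 5 and 7 after all 12 messages). The clamping to the earliest offset is a simplification; Kafka similarly adjusts out-of-range results:

```python
def shift_offset(current, shift, earliest=0):
    # Shift the committed offset, but never move before the earliest
    # offset still available in the partition.
    return max(earliest, current + shift)

# After consuming everything, current offset == log-end offset per partition
log_end = {0: 5, 1: 7}
current = dict(log_end)

# --reset-offsets --shift-by -2 moves each partition's offset back by two
new_offsets = {p: shift_offset(off, -2) for p, off in current.items()}

# Restarting the consumer re-reads everything after the new offsets
to_consume = sum(log_end[p] - new_offsets[p] for p in log_end)
print(to_consume)  # prints: 4  (2 messages per partition)
```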
Summary
You have learned how to include message keys in publication to preserve the order of messages that share a key. You have also learned how to reset consumer offsets to control the starting point of message consumption.