Apache Kafka: How to create a Kafka Consumer and Producer using Python

cem akpolat
Dec 21, 2022


Data streaming is one of the hot topics in IoT and in the analysis of data from IoT devices and services. Apart from MQTT, Apache Kafka is also commonly preferred by developers in settings where the connection between the data provider and the consumer is more stable. The purpose of this article is to build an environment on macOS in which we can benefit from Kafka's features in a short amount of time.

Before doing all the steps myself, I looked at the available tutorials and articles and came across the Medium article "Apache Kafka Installation on Mac using Homebrew". After following the steps in that tutorial, I found that some of the information is unnecessary or outdated. This is a common problem for all of us writers, especially when the topic is technical. Because of this, the plan in this tutorial is:

  1. Install Apache Kafka on macOS (the OS version used here is Monterey)
  2. Perform Kafka-Producer and Kafka-Consumer Test on Terminal
  3. Write a Python Script Replicating Previous Operation Programmatically

1. Install Apache Kafka using Homebrew

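If you do not have Java and Kafka yet, you can install them as below. Java is required because Kafka runs on the JVM. Recent Homebrew versions no longer support the brew cask command, so Java is installed here through the openjdk formula. As the dependency list below shows, installing kafka fetches openjdk anyway, so the first command is optional.

brew install openjdk
brew install kafka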

During the installation, the following dependencies are fetched and installed:

==> Fetching dependencies for kafka: giflib, libpng, freetype, 
fontconfig, pcre2, glib, pkg-config, libpthread-stubs, xorgproto,
libxau, libxdmcp, libxcb, libx11, libxext, libxrender,
lzo, pixman, cairo, graphite2, icu4c, harfbuzz, jpeg-turbo, lz4, xz,
zstd, libtiff, little-cms2, openjdk, openssl@1.1 and zookeeper

As soon as Kafka is installed, the installation ends with the following message:

To restart kafka after an upgrade:
brew services restart kafka
Or, if you don't want/need a background service you can just run:
/opt/homebrew/opt/kafka/bin/kafka-server-start /opt/homebrew/etc/kafka/server.properties

In my case, running Kafka as a background service made sense, so I simply restarted it. ZooKeeper also runs as a background service. Since Kafka connects to ZooKeeper on startup, it is safer to restart ZooKeeper first.

brew services restart zookeeper
brew services restart kafka

Kafka listens on the default port 9092. If you plan to change it, then you need to edit the configuration file under

/opt/homebrew/etc/kafka/server.properties

The tutorial mentioned above explains this step as well, but notice that the path given there is different.
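
To see which port a given configuration would use, the file can be inspected with a few lines of Python. This is a rough sketch that assumes the port is set through the listeners property (for example listeners=PLAINTEXT://:9092) and falls back to 9092 when that line is absent or commented out. The function names are made up for illustration.

```python
from pathlib import Path

DEFAULT_PORT = 9092
# Path taken from the Homebrew message shown earlier in this article.
SERVER_PROPERTIES = Path("/opt/homebrew/etc/kafka/server.properties")


def parse_properties(text: str) -> dict:
    """Parse key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def broker_port(props: dict) -> int:
    """Return the port of the first listener, or the default if none is set."""
    listeners = props.get("listeners")
    if not listeners:
        return DEFAULT_PORT
    first = listeners.split(",")[0]  # e.g. PLAINTEXT://:9092
    return int(first.rsplit(":", 1)[1])


if __name__ == "__main__":
    if SERVER_PROPERTIES.exists():
        print(broker_port(parse_properties(SERVER_PROPERTIES.read_text())))
    else:
        print(f"{SERVER_PROPERTIES} not found")
```

After changing the port, the services have to be restarted, and every localhost:9092 in the commands of this article has to be adjusted accordingly.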

To check whether Kafka and ZooKeeper are running, we can list the Homebrew services:

$ brew services list

Both kafka and zookeeper should appear with the status started.
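
If you prefer a check that does not depend on Homebrew, a small Python sketch can test whether something is listening on the expected ports. It assumes the default ports, 9092 for Kafka and 2181 for ZooKeeper, and the helper name is_port_open is only illustrative. An open port shows that a process accepts connections there, not that the broker is healthy.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Assumed defaults: Kafka on 9092, ZooKeeper on 2181.
    for name, port in (("kafka", 9092), ("zookeeper", 2181)):
        state = "reachable" if is_port_open("localhost", port) else "not reachable"
        print(f"{name} on port {port}: {state}")
```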

2. Perform Kafka-Producer and Kafka-Consumer Test on Terminal

To be able to use the Kafka producer and consumer, Kafka topics should be created beforehand. The reason lies in the publish/subscribe concept. The idea is simple: if a data producer wants to share its data, e.g. a monthly report, a monthly-report topic is created first. Afterwards, a consumer who is interested in this data listens to this topic to stay updated. The producer publishes the newest data to that topic, the Kafka broker receives and stores it, and the clients subscribed to the topic read it from the broker. For this reason, the steps are:

1. Create Kafka Topic

$ kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic monthly-report
Created topic monthly-report.

2. Send Data via Kafka Producer

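$ kafka-console-producer --bootstrap-server localhost:9092 --topic monthly-report

At the prompt I typed two reports, report1 and report2, each on its own line. Older Kafka releases expect --broker-list here instead of --bootstrap-server.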

3. Listen to the Topic via Kafka Consumer

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic monthly-report --from-beginning

After executing the command above, the two reports should be delivered to the consumer, which prints report1 and report2.
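
As an aside, the publish/subscribe idea behind these three commands can be illustrated without Kafka at all. The toy broker below is only a mental model, not how Kafka is implemented, and the classes ToyBroker and ToyConsumer are invented for this sketch. It keeps an append-only list per topic, and each consumer remembers its own read position, which is why a consumer that starts from the beginning sees every earlier message.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for a broker: one append-only log per topic."""

    def __init__(self):
        self._logs = defaultdict(list)

    def publish(self, topic: str, message: str) -> int:
        """Append a message and return its offset within the topic."""
        self._logs[topic].append(message)
        return len(self._logs[topic]) - 1

    def read(self, topic: str, offset: int) -> list:
        """Return all messages of a topic starting at the given offset."""
        return self._logs[topic][offset:]


class ToyConsumer:
    """Tracks its own position, similar in spirit to a consumer offset."""

    def __init__(self, broker: ToyBroker, topic: str):
        self.broker = broker
        self.topic = topic
        self.offset = 0  # start from the beginning

    def poll(self) -> list:
        """Return the messages published since the previous poll."""
        messages = self.broker.read(self.topic, self.offset)
        self.offset += len(messages)
        return messages


broker = ToyBroker()
broker.publish("monthly-report", "report1")
broker.publish("monthly-report", "report2")

consumer = ToyConsumer(broker, "monthly-report")
print(consumer.poll())  # ['report1', 'report2']
print(consumer.poll())  # [] (nothing new since the last poll)
```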

3. Write a Python Script Replicating Previous Operation Programmatically

The kafka-python library is mature enough to implement a Kafka consumer and producer, and we will use the example given on the library's website. First we install the kafka-python library, then implement the producer and consumer scripts, and finally execute them on the terminal.

1. Install Kafka-Python Library

pip3 install kafka-python

2. Implement Kafka-Producer

from kafka import KafkaProducer
from kafka.errors import KafkaError
import logging as log

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
future = producer.send('monthly-report', b'raw_bytes')

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if the produce request failed...
    log.exception("Failed to send message to monthly-report")
else:
    # Successful result returns assigned partition and offset
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)
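
Real reports are rarely raw byte strings. One possible variation, assuming the reports are plain dictionaries, is to let the producer serialize them to JSON through kafka-python's value_serializer parameter. The report fields and the helper names serialize_report and send_report are made up for this sketch, and a broker on localhost:9092 is assumed when send_report is called.

```python
import json


def serialize_report(report: dict) -> bytes:
    """Encode a report dictionary as UTF-8 JSON bytes."""
    return json.dumps(report, sort_keys=True).encode("utf-8")


def send_report(report: dict, topic: str = "monthly-report"):
    """Send one report and wait for the broker's acknowledgement."""
    # Imported here so the serializer above can be used without kafka-python.
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=["localhost:9092"],
        value_serializer=serialize_report,
    )
    try:
        # get() blocks until the send succeeds or raises after the timeout.
        return producer.send(topic, report).get(timeout=10)
    finally:
        producer.close()


# Example (needs a running broker; the report content is hypothetical):
# metadata = send_report({"month": "2022-12", "total": 42})
# print(metadata.topic, metadata.partition, metadata.offset)
```

Keeping the serializer as a separate function makes it easy to check the byte format without a broker.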

3. Implement Kafka-Consumer

from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('monthly-report',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
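
By default this consumer starts at the latest offset when its group has no committed position, so messages produced before it first started are not shown. A possible variant, sketched below under the assumption that the values are UTF-8 JSON, passes auto_offset_reset='earliest' to mirror the --from-beginning flag of the console consumer and stops after a few seconds of silence via consumer_timeout_ms. The helper names and the group name are illustrative.

```python
import json


def deserialize_report(raw: bytes) -> dict:
    """Decode UTF-8 JSON bytes into a dictionary."""
    return json.loads(raw.decode("utf-8"))


def read_reports(topic: str = "monthly-report", idle_ms: int = 5000) -> list:
    """Collect reports until no message arrives for idle_ms milliseconds."""
    # Imported here so the deserializer above can be used without kafka-python.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        group_id="my-json-group",              # hypothetical group name
        bootstrap_servers=["localhost:9092"],
        auto_offset_reset="earliest",          # oldest message if no offset is committed
        value_deserializer=deserialize_report,
        consumer_timeout_ms=idle_ms,           # ends the iteration when the topic is idle
    )
    try:
        return [message.value for message in consumer]
    finally:
        consumer.close()


# Example (needs a running broker and JSON messages on the topic):
# for report in read_reports():
#     print(report)
```

Note that plain-text messages such as report1 or raw_bytes are not valid JSON, so this variant is best tried on a fresh topic.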

4. Test Kafka-Producer and -Consumer

Save the two scripts as kafka_consumer.py and kafka_producer.py. Start the consumer first; it prints the following once the producer has been called:

$ python3 kafka_consumer.py
monthly-report:0:3: key=None value=b'raw_bytes'
monthly-report:0:4: key=None value=b'raw_bytes'

Kafka producer and the console output:

$ python3 kafka_producer.py
monthly-report
0
5

In the meantime, the console consumer that we tested in the 2nd section also received the same messages, since it is subscribed to the same monthly-report topic and does not share a consumer group with the Python consumer. Here are the console outputs of that client:

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic monthly-report --from-beginning
report1
report2
raw_bytes
raw_bytes
raw_bytes
raw_bytes

Summary

The purpose of this tutorial was to install Kafka and test it with one of the Python libraries. First, the Kafka producer and consumer were tested on the terminal, and then a similar setup was built with a Python consumer and producer based on kafka-python.

For more detailed information and mind-blowing examples about Kafka, I would recommend the website of Kai Waehner and the official Apache Kafka website.

References

  1. https://kafka.apache.org/
  2. https://www.kai-waehner.de/
  3. https://kafka-python.readthedocs.io/en/2.0.1/usage.html
  4. https://mail-narayank.medium.com/kafka-architecture-internal-d0b3334d1df
  5. https://medium.com/analytics-vidhya/apache-kafka-architecture-getting-started-with-apache-kafka-771d69ac6cef
