Recently I had some issues with a Kafka client in Python. I wanted to write
a Kafka event consumer which would be able to stop gracefully.
The other requirement was to be able to run multiple instances of this consumer.
I decided to use the newest python-kafka library, currently version 1.3.3. But maybe let’s start with what this Kafka actually is.
Some info on Kafka and python-kafka
Before we start, here is a simple overview of Kafka and the python-kafka library. If you already know the basics, just skip this part.
Very short overview of Apache Kafka
Kafka is a distributed streaming platform, originally developed at LinkedIn and now an Apache Software Foundation project. It uses its own binary protocol over TCP. It’s quite similar to the publish-subscribe model used by MQTT, but it also uses queue concepts like those in RabbitMQ (AMQP protocol). We can say that it’s a generalization of the publish-subscribe and queue models.
In a regular queue, an event can go to only one subscriber, and then it’s gone; in Kafka it goes to all subscribers (more precisely, to every subscribing consumer group). On the other hand, in the publish-subscribe model scaling is much harder. Kafka resolves this scaling issue somehow (I don’t know yet how!). As this is supposed to be short, I’ll write more about Kafka in the future.
Very short overview of python-kafka
As Kafka uses a publish-subscribe model, a client for it needs an event consumer and an event producer. According to its documentation, the python-kafka library (a Python client for Kafka) consists of:
kafka.KafkaConsumer - consumes Kafka events (so it’s a subscriber)
kafka.KafkaProducer - produces Kafka events
There is also kafka.KafkaClient, which is used by both of them; it shouldn’t be used explicitly.
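To make the two public classes concrete, here is a minimal sketch of typical kafka-python usage. The topic name my-topic, the broker address localhost:9092 and the group id my-group are placeholders, and the imports sit inside the functions only so the snippet can be loaded without the library installed.

```python
def produce(topic="my-topic", servers="localhost:9092"):
    """Send one event; topic and broker address are placeholders."""
    from kafka import KafkaProducer  # kafka-python

    producer = KafkaProducer(bootstrap_servers=servers)
    producer.send(topic, b"hello")  # values are raw bytes unless a value_serializer is set
    producer.flush()  # block until buffered records have been sent
    producer.close()


def consume(topic="my-topic", servers="localhost:9092", group="my-group"):
    """Print events as they arrive; KafkaConsumer is iterated like any iterator."""
    from kafka import KafkaConsumer  # kafka-python

    consumer = KafkaConsumer(topic, bootstrap_servers=servers, group_id=group)
    for message in consumer:  # blocks until the next record arrives
        print(message.topic, message.partition, message.offset, message.value)
```

Note that consume() never returns on its own: the for loop waits for new messages indefinitely, which is exactly why stopping a consumer cleanly takes some thought.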
If you look at the documentation page or at old versions of this library, there are also older classes such as kafka.SimpleProducer. They are to be deprecated soon, so I’m not going to use them.
I wanted to write a Kafka event consumer which is able to stop gracefully
when it receives a SIGTERM or SIGINT signal. By stop gracefully I mean here
that I’ll use kafka.KafkaConsumer.close(), which is the proper method for closing the consumer.
I had two approaches. To be able to close my consumer, I decided to start by
running kafka.KafkaConsumer in a thread.
I created a Consumer class, which was my thread class. It was supposed to run
kafka.KafkaConsumer inside it. On SIGTERM, all running threads were to be marked
with a should_stop flag. With this flag set, the loop in run of each thread was
supposed to stop. Everything was fine, except that the threads stopped only after
receiving the next message. This is because kafka.KafkaConsumer is actually an
iterator: the loop blocks while waiting for the next message, so the flag is
checked only once a message arrives.
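A minimal sketch of the thread-based idea, to show why the flag is noticed so late. It is an illustrative reconstruction, not the original code: the class layout, the stop_all_on_sigterm helper and the make_consumer factory parameter are hypothetical, and the factory lets the thread wrap either a real kafka.KafkaConsumer or any other iterable with close().

```python
import signal
import threading


class Consumer(threading.Thread):
    """One message loop per thread (illustrative reconstruction)."""

    def __init__(self, make_consumer, handle):
        super().__init__(daemon=True)
        # Hypothetical factory, e.g. lambda: KafkaConsumer("my-topic", ...);
        # the consumer is created inside run() so each thread owns its own.
        self.make_consumer = make_consumer
        self.handle = handle
        self.should_stop = threading.Event()

    def stop(self):
        self.should_stop.set()

    def run(self):
        consumer = self.make_consumer()
        for message in consumer:
            # The flag is checked only after the iterator yields a message.
            # While it is blocked waiting for the next one, stop() has no
            # visible effect, which is the delay described above.
            if self.should_stop.is_set():
                break
            self.handle(message)
        consumer.close()


def stop_all_on_sigterm(threads):
    """Register a SIGTERM handler (main thread only) that flags every thread."""

    def handler(signum, frame):
        for thread in threads:
            thread.stop()

    signal.signal(signal.SIGTERM, handler)
```

Since the flag is only consulted between messages, a quiet topic can keep a "stopped" thread alive indefinitely.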
Anyway, while debugging I realized it’s not a good approach at all, because in the
kafka.KafkaClient class, in the file client_async.py, there is a small comment:
This class is not thread-safe!
I had to study almost all of the consumer-related code of this library to find it. I could also have read the whole documentation to find it; it was quite hidden. I’m not sure why they didn’t stress it more.
So, I had to change my approach.
Using kafka.KafkaConsumer explicitly
Still, I needed some mechanism to stop the loop that iterates over the consumer’s messages.
kafka.KafkaConsumer derives from six.Iterator (an Iterator base compatible
with both Python 2 and Python 3), and it implements __next__.
It’s clearly written that iteration (the mechanism waiting for the next messages)
stops when a StopIteration exception is raised. So I changed my
previous code a bit.
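The behavior relied on here is the standard Python iterator protocol: a for loop keeps calling __next__ and ends quietly, without an error, as soon as __next__ raises StopIteration. A tiny stand-in iterator (Countdown is a hypothetical name, unrelated to kafka-python) shows this:

```python
class Countdown:
    """Minimal iterator: yields n, n-1, ..., 1, then signals the end."""

    def __init__(self, n):
        self.n = n

    def __iter__(self):
        return self

    def __next__(self):
        if self.n == 0:
            # This tells a for loop to stop; it is not treated as an error.
            raise StopIteration
        self.n -= 1
        return self.n + 1


for value in Countdown(3):
    print(value)  # prints 3, 2, 1, and then the loop simply ends
```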
Consumer now derives from kafka.KafkaConsumer (so it’s not a thread anymore!)
and has two methods: run, which runs a message iterator loop, and
exit_gracefully, which raises a StopIteration exception to finish that loop.
When a signal is received, exit_gracefully is run. It raises
StopIteration, which stops the iterator loop; after the loop there is a close()
call, which closes the Kafka consumer properly.
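Here is a minimal sketch of that design. The run/exit_gracefully logic sits in a small mixin (GracefulStopMixin is a hypothetical name) so it can be combined with kafka.KafkaConsumer, giving a Consumer that derives from it as described. The topic, broker address and group id in main() are placeholders, and kafka-python is imported inside main() only so the sketch loads without it.

```python
import signal


class GracefulStopMixin:
    """run()/exit_gracefully() for any iterator that also has close()."""

    def handle(self, message):
        print(message)  # placeholder for real message processing

    def exit_gracefully(self, signum, frame):
        # Python runs signal handlers in the main thread, between bytecode
        # instructions of whatever is executing. If that is the consumer's
        # __next__ (e.g. waiting for data), the for loop in run() treats
        # this exception as the normal end of iteration.
        raise StopIteration

    def run(self):
        for message in self:
            self.handle(message)
        self.close()  # reached once the loop has been stopped


def main():
    from kafka import KafkaConsumer  # kafka-python

    class Consumer(GracefulStopMixin, KafkaConsumer):
        pass

    consumer = Consumer("my-topic", bootstrap_servers="localhost:9092",
                        group_id="my-group")
    signal.signal(signal.SIGTERM, consumer.exit_gracefully)
    signal.signal(signal.SIGINT, consumer.exit_gracefully)
    consumer.run()
```

One caveat worth checking: since Python 3.7 (PEP 479), a StopIteration that escapes from inside a generator is converted into RuntimeError. KafkaConsumer appears to delegate __next__ to an internal generator, so on newer Pythons this trick may surface as RuntimeError instead of ending the loop.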
What about multiple consumers?
The solution above doesn’t work as expected with more than one consumer.
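Let’s say we want to close all consumers when a signal is received, by
calling exit_gracefully on each of them in a loop.
It’s impossible, because the StopIteration raised for the first consumer
propagates out of that loop, so the remaining consumers are never reached.
I need to send the interrupt signal as many times as I have consumers to stop
this program. I’ve no idea how to solve this… Any ideas?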
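The propagation problem can be reproduced without Kafka. In this sketch (StubConsumer and stop_all are hypothetical names) a single handler loops over several consumers and calls exit_gracefully on each; the first StopIteration leaves the loop immediately, so only one consumer is asked to stop per signal.

```python
class StubConsumer:
    """Hypothetical stand-in that records whether it was asked to stop."""

    def __init__(self, name):
        self.name = name
        self.asked_to_stop = False

    def exit_gracefully(self, signum, frame):
        self.asked_to_stop = True
        raise StopIteration  # same trick as in the single-consumer version


consumers = [StubConsumer("a"), StubConsumer("b"), StubConsumer("c")]


def stop_all(signum, frame):
    # Intended to stop every consumer, but the first exit_gracefully()
    # raises, which aborts this loop before the others are reached.
    for consumer in consumers:
        consumer.exit_gracefully(signum, frame)


# Called directly instead of via signal.signal to keep the demo deterministic.
try:
    stop_all(None, None)
except StopIteration:
    pass

print([c.asked_to_stop for c in consumers])  # [True, False, False]
```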
They say we should always close everything gracefully:
close sockets, close connections, close files, etc. But sometimes it’s not so
easy to do this properly. I made such a big effort and spent so much time just to
call the close() method, and I’m still not fully successful… I have a partial success, because
I’m able to close all consumers, but I still have to send several interrupt signals
instead of one.