Transactions Aren’t Just for Databases — Kafka Does Them Too
Suppose a consumer reads a message, processes it, and sends the result to another topic. If anything goes wrong along the way, the whole operation needs to roll back.
Kafka transactions cover exactly this case: all the writes are committed as a single atomic unit, or not at all.
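As a rough sketch of that read-process-write loop with KafkaJS: the handler below assumes `producer` is a connected transactional producer and that the consumer runs with auto-commit turned off. The function name `processAndForward`, the `transform` callback, the group id, and the output topic are placeholders for illustration, not part of any real system. `transaction.sendOffsets` makes the consumed offset part of the same transaction, so the input message counts as processed only if the output record is committed.

```javascript
// Hypothetical handler for one consumed message (all names are illustrative).
async function processAndForward(producer, { topic, partition, message }, options) {
  const { consumerGroupId, outputTopic, transform } = options
  const transaction = await producer.transaction()
  try {
    // Write the processed result inside the transaction.
    await transaction.send({
      topic: outputTopic,
      messages: [{ key: message.key, value: transform(message.value) }]
    })
    // Record the *next* offset to read as part of the same transaction.
    await transaction.sendOffsets({
      consumerGroupId,
      topics: [{
        topic,
        partitions: [{ partition, offset: (Number(message.offset) + 1).toString() }]
      }]
    })
    await transaction.commit()
  } catch (err) {
    // Roll back both the output record and the offset update.
    await transaction.abort()
    throw err
  }
}
```

Such a handler would typically be called from the `eachMessage` callback of `consumer.run`, passing the callback's payload straight through.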
Real-world scenario
When a ride ends, the system must:
- Generate a payment charge
- Update trip status to “completed”
- Push notification to the user
Using Kafka transactions, all related events (on different topics) can be committed atomically. If a broker crashes or the app fails midway, consumers reading with read_committed isolation see either all of these records or none of them, which gives exactly-once, consistent updates across the topics.
1. Create a producer
const producer = client.producer({
  transactionalId: `ride-transaction`,
  maxInFlightRequests: 1, // KafkaJS applies no limit by default; the Java client's default is 5
  idempotent: true
})
What is idempotent, and why these settings?
- enable.idempotence (idempotent in KafkaJS): when set to true, the producer ensures that each message is written to the log only once, even if network issues or broker failures force it to retry. This is the foundation for exactly-once semantics.
- transactional.id (transactionalId in KafkaJS): a unique identifier for the producer instance. It allows Kafka to track the producer's transactions, so it should stay the same when that instance restarts.
- max.in.flight.requests.per.connection (maxInFlightRequests in KafkaJS): as the name suggests, the maximum number of requests the producer can send over a single connection without waiting for acknowledgement. The Java client defaults to 5; if it is set to 2, for example, the producer can have at most 2 unacknowledged requests on a connection at a time.
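To make the idempotence guarantee more concrete, here is a toy model of the idea behind it: the producer tags each write with a producer id and a sequence number, and the broker discards a retry it has already appended. The `PartitionLog` class is purely illustrative and heavily simplified (real brokers track sequence numbers per producer and partition, at batch level); it is not a KafkaJS or Kafka API.

```javascript
// Toy model of broker-side de-duplication for an idempotent producer.
class PartitionLog {
  constructor() {
    this.records = []
    this.lastSequence = new Map() // producerId -> last sequence number appended
  }

  append(producerId, sequence, value) {
    const last = this.lastSequence.has(producerId) ? this.lastSequence.get(producerId) : -1
    if (sequence <= last) return 'duplicate'        // a retry of something already written
    if (sequence !== last + 1) return 'out-of-order' // a gap: an earlier write is missing
    this.records.push(value)
    this.lastSequence.set(producerId, sequence)
    return 'appended'
  }
}

const log = new PartitionLog()
log.append(7, 0, 'charge ride 42') // first attempt: appended
log.append(7, 0, 'charge ride 42') // retry after a lost acknowledgement: ignored
console.log(log.records.length)    // 1
```

Without the sequence check, the retry would have produced a second payment charge for the same ride.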
Create a transaction
const transaction = await producer.transaction()
Emit respective events
await transaction.send({
  topic: "trip-status-topic",
  messages: [{ value: "<STATUS>" }]
})
await transaction.send({
  topic: "payment-processing-topic",
  messages: [{ value: "<AMOUNT>" }]
})
await transaction.send({
  topic: "alert-notification-topic",
  messages: [{ value: "<ALERT-MESSAGE>" }]
})
Commit the created transaction
await transaction.commit()
All events are atomically published. Consumers using read_committed will receive only fully committed transactions.
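On the consuming side, KafkaJS exposes the isolation level through the `readUncommitted` consumer option. It is `false` by default, which corresponds to read_committed. The small helper below is only a sketch: the function name and the group id are made up for illustration, and `client` is assumed to be a KafkaJS `Kafka` instance.

```javascript
// Hypothetical helper: creates a consumer that only sees committed transactions.
function createCommittedOnlyConsumer(client, groupId) {
  return client.consumer({
    groupId,
    readUncommitted: false // read_committed: skip records from open or aborted transactions
  })
}

// Example (requires a real KafkaJS client):
// const consumer = createCommittedOnlyConsumer(client, 'trip-status-readers')
```

Spelling the option out is not strictly needed, but it documents that the consumer relies on transactional visibility.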
If something goes wrong before the commit, abort the transaction so that none of its events become visible to read_committed consumers
await transaction.abort()
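Putting the pieces together, commit and abort usually live in one try/catch. The function below is a minimal sketch, assuming `producer` is a connected transactional KafkaJS producer; the name `publishRideCompleted` and the shape of the `ride` object (`id`, `amount`) are invented for illustration.

```javascript
// Publishes the three ride-completion events atomically (illustrative sketch).
async function publishRideCompleted(producer, ride) {
  const transaction = await producer.transaction()
  try {
    await transaction.send({
      topic: 'trip-status-topic',
      messages: [{ key: ride.id, value: 'completed' }]
    })
    await transaction.send({
      topic: 'payment-processing-topic',
      messages: [{ key: ride.id, value: String(ride.amount) }]
    })
    await transaction.send({
      topic: 'alert-notification-topic',
      messages: [{ key: ride.id, value: `Ride ${ride.id} completed` }]
    })
    // All three events become visible together.
    await transaction.commit()
  } catch (err) {
    // None of the events become visible to read_committed consumers.
    await transaction.abort()
    throw err // let the caller decide whether to retry
  }
}
```

If the abort call itself cannot reach the broker, the transaction coordinator eventually aborts the open transaction once its timeout expires, so the events still never become visible.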