Kafka Interview Questions (Free Preview)
Free sample of 15 from 60 questions available
Producers
What is a Kafka producer and how does it work?
The 30-Second Answer: A Kafka producer is a client application that publishes records to Kafka topics. It serializes data, determines the partition (using key hashing or custom logic), buffers records in memory for batching, and sends them to the appropriate broker leader, handling retries and acknowledgments automatically.
The 2-Minute Answer (If They Want More): A Kafka producer is responsible for writing data to Kafka topics through a multi-step process. When you create a producer record, it first goes through serialization, where both the key and value are converted to byte arrays. The producer then determines which partition to send the record to: by hashing the key (if provided), by calling a custom partitioner, or, if no key exists, by spreading records across partitions (round-robin in older clients; since Kafka 2.4 the default partitioner instead fills a batch for one partition before moving to the next).
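The key-hashing step can be pictured with a small sketch. This is an illustration only: the class and method names are hypothetical, and `Arrays.hashCode` stands in for the murmur2 hash that Kafka's default partitioner applies to the serialized key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative sketch only: Kafka's default partitioner hashes the serialized key
// with murmur2; Arrays.hashCode is a stand-in used here to keep the example short.
class KeyPartitionSketch {

    // Maps a serialized key to a partition in [0, numPartitions).
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        // floorMod keeps the result non-negative even when the hash is negative
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 6;
        for (String key : new String[] {"user-1", "user-2", "user-1"}) {
            byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
            System.out.printf("%s -> partition %d%n", key, partitionFor(bytes, partitions));
        }
    }
}
```

The property worth remembering for an interview is that the same key always lands on the same partition as long as the partition count does not change, which is what preserves per-key ordering.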
Once the partition is determined, the record is added to an internal buffer organized by topic-partition. The producer doesn't immediately send each record; instead, it batches multiple records together to improve throughput. A background I/O thread manages the actual network communication, sending batches to the appropriate broker leaders.
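The batching behaviour described above could be sketched roughly as follows. The class is hypothetical and much simpler than the producer's real accumulator: a batch is handed over once its byte count reaches a threshold (the role played by `batch.size`), and `drain` models what happens when the linger timer (`linger.ms`) expires.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of per-partition batching; not the producer's real accumulator.
class BatchBufferSketch {
    private final int batchSizeBytes;                                 // role of batch.size
    private final Map<Integer, List<byte[]>> open = new HashMap<>();  // open batch per partition
    private final Map<Integer, Integer> openBytes = new HashMap<>();  // bytes in each open batch

    BatchBufferSketch(int batchSizeBytes) {
        this.batchSizeBytes = batchSizeBytes;
    }

    // Appends a record; returns a full batch ready to send, or null if still filling.
    List<byte[]> append(int partition, byte[] record) {
        open.computeIfAbsent(partition, p -> new ArrayList<>()).add(record);
        int bytes = openBytes.merge(partition, record.length, Integer::sum);
        if (bytes >= batchSizeBytes) {
            return drain(partition);
        }
        return null;
    }

    // Called when the linger timer expires: hands over whatever has accumulated.
    List<byte[]> drain(int partition) {
        openBytes.remove(partition);
        List<byte[]> batch = open.remove(partition);
        return batch == null ? new ArrayList<>() : batch;
    }
}
```

Larger batches and a longer linger time trade a little latency for fewer, bigger requests, which is usually where producer throughput gains come from.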
The producer maintains metadata about the cluster topology, including which brokers are leaders for which partitions. This metadata is refreshed periodically or when failures occur. When sending data, the producer communicates directly with the partition leader, bypassing unnecessary hops. After sending, the producer waits for acknowledgments based on the configured acks setting, and implements automatic retry logic for failed requests.
Modern producers also support idempotent writes and transactions, ensuring exactly-once semantics when properly configured. The producer handles compression, metrics collection, and provides callbacks for asynchronous send operations.
Code Example:
// Basic Kafka producer configuration and usage
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class BasicProducerExample {
public static void main(String[] args) {
// Configure producer properties
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Create producer instance
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
// Create a producer record
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "key-1", "Hello Kafka!");
// Synchronous send
RecordMetadata metadata = producer.send(record).get();
System.out.printf("Sent to partition %d at offset %d%n",
metadata.partition(), metadata.offset());
// Asynchronous send with callback
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.printf("Async sent to partition %d%n", metadata.partition());
}
}
});
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
Mermaid Diagram:
flowchart TD
A[Producer Application] --> B[Serialize Key & Value]
B --> C{Partition Determined?}
C -->|Key Provided| D[Hash Key to Partition]
C -->|No Key| E[Round Robin or Sticky Partition]
C -->|Custom Partitioner| F[Custom Logic]
D --> G[Add to Record Batch Buffer]
E --> G
F --> G
G --> H{Batch Ready?}
H -->|Size/Time Threshold Met| I[I/O Thread Sends Batch]
H -->|Not Ready| G
I --> J[Broker Leader Receives]
J --> K[Write to Log]
K --> L{Acks Setting?}
L -->|acks=0| M[No Wait]
L -->|acks=1| N[Wait for Leader]
L -->|acks=all| O[Wait for All ISR]
N --> P[Send Acknowledgment]
O --> P
P --> Q{Success?}
Q -->|Yes| R[Callback Success]
Q -->|No| S{Retries Left?}
S -->|Yes| I
S -->|No| T[Callback Error]
Consumers
What is consumer rebalancing and how does it work?
The 30-Second Answer: Consumer rebalancing is the process where Kafka redistributes partition assignments among consumers in a group when membership changes (consumer joins, leaves, or fails) or when topic partitions are added. During rebalancing, consumers temporarily stop consuming messages while partitions are reassigned, which can cause brief processing delays.
The 2-Minute Answer (If They Want More): Rebalancing is triggered by several events: a new consumer joining the group, an existing consumer leaving gracefully, a consumer crashing (detected via heartbeat timeout), or partition count changes in subscribed topics. When a rebalance occurs, Kafka's group coordinator works with consumers to redistribute partitions fairly across all active members.
The rebalancing process follows these steps: First, consumers stop fetching data and commit their current offsets. Then, they rejoin the consumer group and receive new partition assignments from the group coordinator based on the configured partition assignment strategy. Finally, consumers begin fetching from their newly assigned partitions, starting from the last committed offset.
There are two types of rebalancing protocols. The older "eager rebalancing" stops all consumers and reassigns all partitions, causing a complete pause in consumption. The newer "cooperative rebalancing" (incremental rebalancing) only revokes and reassigns the partitions that need to move, allowing unaffected consumers to continue processing, which significantly reduces downtime.
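The difference between the two protocols comes down to how many partitions are taken away from their current owners. The sketch below is a simplified model with hypothetical names, not the actual assignor logic: it takes a current and a target assignment and reports which partitions each protocol would revoke.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Illustrative sketch: compares how many partitions each protocol revokes.
class RebalanceSketch {

    // Eager: every member gives up everything it owns before reassignment.
    static Set<Integer> revokedEager(Map<String, Set<Integer>> current) {
        Set<Integer> revoked = new TreeSet<>();
        current.values().forEach(revoked::addAll);
        return revoked;
    }

    // Cooperative: only partitions whose owner changes are revoked.
    static Set<Integer> revokedCooperative(Map<String, Set<Integer>> current,
                                           Map<String, Set<Integer>> target) {
        Set<Integer> revoked = new TreeSet<>();
        current.forEach((member, owned) -> {
            Set<Integer> keep = target.getOrDefault(member, Set.of());
            for (int partition : owned) {
                if (!keep.contains(partition)) {
                    revoked.add(partition);
                }
            }
        });
        return revoked;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> current = new HashMap<>();
        current.put("c1", Set.of(0, 1, 2));
        current.put("c2", Set.of(3, 4, 5));
        Map<String, Set<Integer>> target = new HashMap<>();
        target.put("c1", Set.of(0, 1));
        target.put("c2", Set.of(3, 4));
        target.put("c3", Set.of(2, 5));   // new member takes one partition from each
        System.out.println("eager revokes:       " + revokedEager(current));
        System.out.println("cooperative revokes: " + revokedCooperative(current, target));
    }
}
```

In this scenario the eager protocol revokes all six partitions, while the cooperative one revokes only partitions 2 and 5, so the other four keep being consumed throughout.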
Rebalancing overhead can be minimized by tuning session timeout, heartbeat interval, and max poll interval configurations. Using cooperative rebalancing and static membership (where consumers have fixed IDs that survive restarts) can further reduce rebalancing frequency and duration in production systems.
Code Example:
// Consumer configuration for optimized rebalancing
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Rebalancing tuning parameters
props.put("session.timeout.ms", "45000"); // How long before considering consumer dead
props.put("heartbeat.interval.ms", "3000"); // Heartbeat frequency
props.put("max.poll.interval.ms", "300000"); // Max time between poll() calls
// Use cooperative sticky assignor for incremental rebalancing
props.put("partition.assignment.strategy",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
// Static membership (optional) - survives short restarts without rebalancing
props.put("group.instance.id", "consumer-1"); // Unique static ID per consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Listen to rebalance events
consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Partitions revoked: " + partitions);
// Commit offsets before rebalancing
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Partitions assigned: " + partitions);
// Initialize state for new partitions
}
});
Mermaid Diagram:
sequenceDiagram
participant C1 as Consumer 1
participant C2 as Consumer 2
participant C3 as Consumer 3 (new)
participant GC as Group Coordinator
participant K as Kafka Broker
Note over C1,C2: Normal consumption with partitions 0-5 split
C3->>GC: JoinGroup Request
GC->>C1: Stop fetching & prepare to rebalance
GC->>C2: Stop fetching & prepare to rebalance
C1->>K: Commit current offsets
C2->>K: Commit current offsets
C1->>GC: Rejoin group
C2->>GC: Rejoin group
Note over GC: Calculate new partition assignments<br/>for 3 consumers
GC->>C1: Assign partitions 0,1
GC->>C2: Assign partitions 2,3
GC->>C3: Assign partitions 4,5
C1->>K: Resume consuming 0,1
C2->>K: Resume consuming 2,3
C3->>K: Start consuming 4,5
Note over C1,C3: Rebalancing complete - all consumers active
What is the difference between at-least-once, at-most-once, and exactly-once semantics?
The 30-Second Answer: At-least-once guarantees every message is processed but may result in duplicates, at-most-once ensures messages are never duplicated but may be lost, and exactly-once guarantees each message is processed once and only once. The choice depends on your application's tolerance for duplicates versus data loss, with exactly-once providing the strongest guarantee at the cost of complexity and performance.
The 2-Minute Answer (If They Want More): These delivery semantics define how Kafka handles message processing in the face of failures. At-least-once is achieved by committing offsets after processing messages—if a consumer crashes after processing but before committing, it will reprocess those messages on restart. This is the default and most common approach, as most applications prefer duplicates over data loss and can implement idempotent processing.
At-most-once commits offsets before processing messages. If the consumer crashes during processing, those messages are lost because Kafka believes they were already handled. This is rare in practice but acceptable for metrics, logs, or other non-critical data where occasional loss is tolerable and duplicates would cause incorrect aggregations.
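The effect of commit timing can be seen without a broker. The following is a small simulation under stated assumptions (a single partition, one batch covering the whole log, exactly one crash); the class and its parameters are hypothetical and no Kafka client is involved.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative simulation: a consumer crashes once mid-batch and restarts
// from the last committed offset. No Kafka client is involved.
class DeliverySemanticsSketch {

    // Returns every offset that was actually processed, in order, across restarts.
    static List<Integer> run(int messages, int crashAt, boolean commitFirst) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0;            // offset the consumer resumes from after a restart
        boolean crashedOnce = false;
        while (committed < messages) {
            int start = committed;
            if (commitFirst) {
                committed = messages;         // at-most-once: commit the batch up front
            }
            boolean crashed = false;
            for (int offset = start; offset < messages; offset++) {
                if (!crashedOnce && offset == crashAt) {
                    crashedOnce = true;       // simulated crash before this record is handled
                    crashed = true;
                    break;
                }
                processed.add(offset);
            }
            if (!crashed && !commitFirst) {
                committed = messages;         // at-least-once: commit only after processing
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("at-least-once: " + run(5, 3, false));
        System.out.println("at-most-once:  " + run(5, 3, true));
    }
}
```

With five messages and a crash just before offset 3, the at-least-once run handles offsets 0 to 2 twice and loses nothing, while the at-most-once run never handles offsets 3 and 4.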
Exactly-once semantics (EOS) is the most challenging to implement but provides the strongest guarantee. Kafka achieves this through idempotent producers, transactional APIs, and isolation levels. For stream processing with Kafka Streams, exactly-once support is built in and is enabled through the processing.guarantee setting. For consumer applications, you need to either use transactional commits (committing offsets in the same transaction as your output) or implement idempotent processing with external systems.
The performance impact varies significantly. At-least-once has minimal overhead, at-most-once is slightly faster but risky, and exactly-once can reduce throughput by 20-30% due to additional coordination overhead. Most production systems use at-least-once with idempotent processing logic (like upserts with unique keys) to achieve effectively-once semantics without the full EOS overhead.
Code Example:
// 1. AT-MOST-ONCE: Commit before processing (messages may be lost)
Properties atMostOnceProps = new Properties();
atMostOnceProps.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(atMostOnceProps);
consumer1.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer1.poll(Duration.ofMillis(100));
// Commit BEFORE processing (risky!)
consumer1.commitSync();
for (ConsumerRecord<String, String> record : records) {
processRecord(record); // If this fails, message is lost
}
}
// 2. AT-LEAST-ONCE: Commit after processing (messages may duplicate)
Properties atLeastOnceProps = new Properties();
atLeastOnceProps.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(atLeastOnceProps);
consumer2.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer2.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record); // If crash happens here, will reprocess
}
// Commit AFTER processing
consumer2.commitSync(); // If crash before this, messages reprocessed
}
// 3. EXACTLY-ONCE: Using transactional consumer-producer pattern
Properties eosProps = new Properties();
eosProps.put("enable.auto.commit", "false");
eosProps.put("isolation.level", "read_committed"); // Read only committed messages
KafkaConsumer<String, String> consumer3 = new KafkaConsumer<>(eosProps);
consumer3.subscribe(Arrays.asList("input-topic"));
Properties producerProps = new Properties();
producerProps.put("transactional.id", "my-transaction-id"); // Required for EOS
producerProps.put("enable.idempotence", "true");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
producer.initTransactions();
try {
while (true) {
ConsumerRecords<String, String> records = consumer3.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
producer.beginTransaction();
try {
// Process records and produce results
for (ConsumerRecord<String, String> record : records) {
String result = processRecord(record);
producer.send(new ProducerRecord<>("output-topic", record.key(), result));
}
// Commit consumer offsets as part of the transaction
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
}
producer.sendOffsetsToTransaction(offsets, consumer3.groupMetadata());
producer.commitTransaction();
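} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// Fatal for this producer: do not abort, just propagate so the finally block closes it
throw e;
} catch (Exception e) {
// Any other failure: abort so none of this batch's output or offsets become visible
producer.abortTransaction();
throw e;
}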
}
}
} finally {
producer.close();
consumer3.close();
}
// 4. EFFECTIVELY-ONCE: At-least-once with idempotent processing
while (true) {
ConsumerRecords<String, String> records = consumer2.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Use idempotent operations (upsert with unique key)
String uniqueKey = record.key();
String value = record.value();
// Database upsert - safe to execute multiple times
database.upsert(uniqueKey, value); // Same key = same result
}
consumer2.commitSync();
}
Comparison Table:
| Semantic | Commit Timing | Guarantees | Risk | Performance | Use Case |
|---|---|---|---|---|---|
| At-most-once | Before processing | No duplicates | Message loss | Fastest | Logs, metrics, non-critical data |
| At-least-once | After processing | No loss | Duplicates possible | Fast | Most applications with idempotent logic |
| Exactly-once | Transactional | No loss or duplicates | Complex setup | Slower (20-30% overhead) | Financial transactions, critical state |
Mermaid Diagram:
flowchart TD
Start[Message Received] --> Decision{Which Semantic?}
Decision -->|At-Most-Once| Commit1[Commit Offset]
Commit1 --> Process1[Process Message]
Process1 --> Crash1{Crash?}
Crash1 -->|Yes| Lost[Message Lost ❌]
Crash1 -->|No| Success1[Success ✓]
Decision -->|At-Least-Once| Process2[Process Message]
Process2 --> Commit2[Commit Offset]
Commit2 --> Crash2{Crash?}
Crash2 -->|Before Commit| Reprocess[Reprocess = Duplicate ⚠️]
Crash2 -->|After Commit| Success2[Success ✓]
Decision -->|Exactly-Once| BeginTx[Begin Transaction]
BeginTx --> Process3[Process Message]
Process3 --> Produce[Produce Results]
Produce --> CommitTx[Commit Offsets + Transaction]
CommitTx --> Crash3{Crash?}
Crash3 -->|Before Tx Commit| Rollback[Rollback - Retry ↻]
Crash3 -->|After Tx Commit| Success3[Success ✓<br/>Exactly Once]
style Lost fill:#faa
style Reprocess fill:#ffa
style Success1 fill:#afa
style Success2 fill:#afa
style Success3 fill:#afa
Kafka Connect
What is the difference between source and sink connectors?
The 30-Second Answer: Source connectors read data from external systems and produce records to Kafka topics, while sink connectors consume records from Kafka topics and write them to external systems. Source connectors are data importers (e.g., database → Kafka), and sink connectors are data exporters (e.g., Kafka → database).
The 2-Minute Answer (If They Want More): Source and sink connectors represent opposite directions of data flow in the Kafka Connect framework. Source connectors implement the SourceConnector and SourceTask interfaces, continuously polling external systems for new or updated data and producing it to Kafka topics. They must handle offset management to track what data has already been ingested, enabling resumption after failures without data loss (records produced after the last committed offset may be re-sent, so downstream consumers should tolerate duplicates).
Common source connector patterns include: incrementing mode (using auto-incrementing IDs), timestamp mode (using updated_at columns), bulk mode (full table snapshots), and log-based change data capture (CDC) using database transaction logs. For example, a JDBC source connector might poll a database table every few seconds, reading new rows based on incrementing IDs or timestamps, then producing each row as a Kafka message.
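Incrementing mode can be pictured as "return every row whose ID is greater than the stored offset, then advance the offset". The sketch below assumes a plain list of IDs standing in for the database table; the class is hypothetical and is not the Connect `SourceTask` API.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of incrementing mode: each poll returns only rows whose id
// is greater than the stored offset. A list of ids stands in for the database table.
class IncrementingSourceSketch {
    private long lastSeenId;   // plays the role of the persisted source offset

    IncrementingSourceSketch(long startAfterId) {
        this.lastSeenId = startAfterId;
    }

    // Returns ids of rows not yet ingested and advances the offset past them.
    List<Long> poll(List<Long> tableIds) {
        List<Long> fresh = new ArrayList<>();
        for (long id : tableIds) {
            if (id > lastSeenId) {
                fresh.add(id);
            }
        }
        for (long id : fresh) {
            lastSeenId = Math.max(lastSeenId, id);
        }
        return fresh;
    }

    long offset() {
        return lastSeenId;
    }
}
```

Restarting with the last stored offset (for example `new IncrementingSourceSketch(4)`) resumes after row 4, which is the behaviour the offset storage exists to provide. Note that this mode only sees inserts; updates to existing rows need timestamp mode or CDC.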
Sink connectors implement the SinkConnector and SinkTask interfaces, consuming records from one or more Kafka topics and writing them to external systems. They receive batches of records from Kafka and must handle failures gracefully, often implementing retry logic, idempotent writes, and transactional semantics. Sink connectors also manage offsets, committing them only after successfully writing data to the destination system.
Common sink connector patterns include: upsert (insert or update based on primary key), append-only (insert new records), and delete (handle tombstone records). For example, an Elasticsearch sink connector consumes JSON messages from Kafka and indexes them in Elasticsearch, using the message key as the document ID for idempotent indexing. Both connector types benefit from parallel execution through task distribution, with the number of tasks configured via the tasks.max property to balance load across the Connect cluster.
Code Example:
// Source Connector - PostgreSQL to Kafka
{
"name": "postgres-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "mydb",
"database.server.name": "dbserver1",
"table.include.list": "public.customers,public.orders",
"tasks.max": 1
}
}
// Sink Connector - Kafka to MongoDB
{
"name": "mongodb-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"connection.uri": "mongodb://mongo:27017",
"database": "analytics",
"collection": "events",
"topics": "user-events,system-events",
"tasks.max": 2,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false
}
}
Mermaid Diagram:
flowchart TB
subgraph Source Connector Flow
ExtSource[(External Source<br/>Database, File, API)]
SourceTask[Source Task<br/>Polls & Reads Data]
SourceOffset[Offset Storage<br/>Tracks Position]
KafkaTopic1[Kafka Topic]
ExtSource -->|Poll/Query| SourceTask
SourceTask -->|Update| SourceOffset
SourceTask -->|Produce Records| KafkaTopic1
end
subgraph Sink Connector Flow
KafkaTopic2[Kafka Topic]
SinkTask[Sink Task<br/>Consumes & Writes]
SinkOffset[Offset Storage<br/>Tracks Committed]
ExtSink[(External Sink<br/>Database, Storage, Index)]
KafkaTopic2 -->|Consume Records| SinkTask
SinkTask -->|Write Data| ExtSink
SinkTask -->|Commit| SinkOffset
end
style SourceTask fill:#90EE90
style SinkTask fill:#87CEEB
Kafka Streams
What is Kafka Streams and how does it differ from other stream processing frameworks?
The 30-Second Answer: Kafka Streams is a lightweight Java library for building real-time stream processing applications that read from and write to Kafka topics. Unlike frameworks like Apache Flink or Spark Streaming that require separate cluster infrastructure, Kafka Streams runs as part of your application with no external dependencies beyond Kafka itself.
The 2-Minute Answer (If They Want More): Kafka Streams is a client library that enables you to build mission-critical real-time applications and microservices where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java applications with the benefits of Kafka's server-side cluster technology to make these applications highly scalable, elastic, fault-tolerant, and distributed.
The key differentiators from other stream processing frameworks are its deployment model and operational simplicity. With Kafka Streams, there's no need to set up and manage a separate processing cluster like you would with Apache Flink, Spark Streaming, or Apache Storm. You simply include the library in your application, and it leverages Kafka's built-in parallelism through partitions and consumer groups. This makes it ideal for microservices architectures where each service can independently process its streams.
Kafka Streams provides both a high-level DSL (Domain Specific Language) for common operations like filtering, mapping, and aggregations, and a low-level Processor API for more complex stateful processing. It includes built-in support for exactly-once processing semantics, fault-tolerant local state stores, windowing operations, and interactive queries. The library automatically handles load balancing, failure recovery, and state management, making it significantly easier to operate than traditional stream processing frameworks.
Another major advantage is that Kafka Streams applications scale by simply starting more instances - Kafka's consumer group protocol automatically distributes the workload. This elastic scalability, combined with its ability to maintain local state that's continuously backed up to Kafka topics, makes it particularly well-suited for event-driven microservices and real-time data pipelines.
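The scaling model can be sketched as "one task per input partition, spread across running instances". The code below is a simplified round-robin model with hypothetical names; the real Kafka Streams assignor also considers state-store locality and standby replicas.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: spreads one task per input partition across app instances.
class StreamsScalingSketch {

    // Returns instance name -> task ids, assigning tasks in round-robin order.
    static Map<String, List<Integer>> assign(int partitions, List<String> instances) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String instance : instances) {
            assignment.put(instance, new ArrayList<>());
        }
        for (int task = 0; task < partitions; task++) {
            String owner = instances.get(task % instances.size());
            assignment.get(owner).add(task);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(6, List.of("app-1", "app-2")));
        System.out.println(assign(6, List.of("app-1", "app-2", "app-3")));
    }
}
```

Going from two to three instances drops each instance from three tasks to two. The same model also shows the limit: with two partitions and three instances, the third instance receives no tasks, so the partition count caps the useful parallelism.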
Code Example:
// Simple Kafka Streams application using the DSL
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Arrays;
import java.util.Properties;
public class WordCountExample {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Default serdes for the String keys and values read from the input topic
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        StreamsBuilder builder = new StreamsBuilder();
        // Read from input topic
        KStream<String, String> textLines = builder.stream("input-topic");
        // Process: split, group, count
        textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count()
            .toStream()
            // Counts are Long values, so the output needs an explicit value serde
            .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
        // Start the streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.start();
        // Graceful shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
Comparison with Other Frameworks:
| Feature | Kafka Streams | Apache Flink | Spark Streaming |
|---|---|---|---|
| Deployment | Library (runs in your app) | Separate cluster required | Separate cluster required |
| Language Support | Java, Scala | Java, Scala, Python | Java, Scala, Python, R |
| Data Source | Kafka only | Multiple sources | Multiple sources |
| Processing Model | Event-at-a-time | Event-at-a-time | Micro-batching (or continuous) |
| State Management | Local state stores | Managed state backend | RDDs/DataFrames |
| Scaling | Start more app instances | Cluster resource management | Cluster resource management |
| Operational Complexity | Low | Medium-High | Medium-High |
Schema Registry
What is the difference between backward, forward, and full compatibility?
The 30-Second Answer: BACKWARD compatibility means new code reads old data (add optional fields, delete fields), FORWARD means old code reads new data (add fields, delete optional fields), and FULL means both work simultaneously (add or remove only optional fields with defaults). The choice determines your deployment order and allowed schema changes.
The 2-Minute Answer (If They Want More): These three compatibility modes represent different guarantees about schema evolution. BACKWARD compatibility ensures that consumers using the new schema can deserialize messages written with the old schema. This is the most common mode because it supports the typical deployment pattern: upgrade all consumers first, then upgrade producers. With BACKWARD, you can add new optional fields (with defaults) or delete existing fields, since a new reader simply skips data it no longer declares.
FORWARD compatibility works in the opposite direction: old consumers can read messages from new producers. This matters when you must update producers before consumers, perhaps because you're adding new data sources or cannot control all consumers. FORWARD allows you to add new fields (which old consumers will ignore) or delete optional fields (old consumers fall back to the default).
FULL compatibility provides both guarantees simultaneously, meaning consumers and producers can be upgraded in any order. However, this flexibility comes at a cost: the allowed schema changes are more restrictive. You can only add or remove optional fields that have default values. FULL compatibility is ideal for environments where you cannot coordinate deployment order or need maximum flexibility.
The practical impact is significant. Choosing the wrong compatibility mode can lead to deserialization failures in production. For example, if you use FORWARD compatibility but upgrade consumers first, old producers might send data that new consumers can't properly handle. Understanding your deployment patterns and data evolution requirements is crucial for selecting the appropriate mode.
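All three modes reduce to one field-level question: can a given reader schema decode data written with a given writer schema? The sketch below models a schema as a map from field name to "has a default" and ignores types entirely; it is a simplified illustration with hypothetical names, not Avro's or Schema Registry's actual checker.

```java
import java.util.Map;

// Illustrative sketch of the field-level rule behind the three modes.
// A schema is modeled as field name -> "has a default value"; types are ignored.
class CompatibilitySketch {

    // A reader can decode a writer's data if every reader field is either
    // present in the writer schema or has a default to fall back on.
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        for (Map.Entry<String, Boolean> field : reader.entrySet()) {
            boolean missingInWriter = !writer.containsKey(field.getKey());
            boolean hasDefault = field.getValue();
            if (missingInWriter && !hasDefault) {
                return false;
            }
        }
        return true;
    }

    // BACKWARD: the new schema reads data written with the old schema.
    static boolean backward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(newSchema, oldSchema);
    }

    // FORWARD: the old schema reads data written with the new schema.
    static boolean forward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(oldSchema, newSchema);
    }

    // FULL: both directions hold.
    static boolean full(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return backward(oldSchema, newSchema) && forward(oldSchema, newSchema);
    }
}
```

Under this model, adding a field with a default passes all three checks, adding a required field passes only FORWARD, and removing a required field passes only BACKWARD.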
Detailed Comparison:
| Aspect | BACKWARD | FORWARD | FULL |
|---|---|---|---|
| Direction | New reads old | Old reads new | Both directions |
| Deployment Order | Consumers first, then producers | Producers first, then consumers | Any order |
| Add Optional Field | ✅ Yes (with default) | ✅ Yes | ✅ Yes (with default) |
| Remove Optional Field | ✅ Yes | ✅ Yes | ✅ Yes (if has default) |
| Add Required Field | ❌ No | ✅ Yes | ❌ No |
| Remove Required Field | ✅ Yes | ❌ No | ❌ No |
| Change Field Type | ❌ No | ❌ No | ❌ No |
| Rename Field | ❌ No (use aliases) | ❌ No (use aliases) | ❌ No (use aliases) |
| Use Case | Standard deployments | External producers | Microservices, flexible deploys |
Code Example:
import org.apache.avro.Schema;
// BACKWARD Compatibility Example
// Old Schema (V1)
String backwardV1 = "{\"type\":\"record\"," +
"\"name\":\"Order\"," +
"\"fields\":[" +
"{\"name\":\"orderId\",\"type\":\"int\"}," +
"{\"name\":\"customerId\",\"type\":\"int\"}," +
"{\"name\":\"total\",\"type\":\"double\"}" +
"]}";
// New Schema (V2) - BACKWARD compatible
// New consumer can read old data
String backwardV2 = "{\"type\":\"record\"," +
"\"name\":\"Order\"," +
"\"fields\":[" +
"{\"name\":\"orderId\",\"type\":\"int\"}," +
"{\"name\":\"customerId\",\"type\":\"int\"}," +
"{\"name\":\"total\",\"type\":\"double\"}," +
"{\"name\":\"discountCode\",\"type\":\"string\",\"default\":\"\"}" + // Added optional
"]}";
// FORWARD Compatibility Example
// Old Schema (V1)
String forwardV1 = "{\"type\":\"record\"," +
"\"name\":\"Product\"," +
"\"fields\":[" +
"{\"name\":\"productId\",\"type\":\"int\"}," +
"{\"name\":\"name\",\"type\":\"string\"}," +
"{\"name\":\"description\",\"type\":\"string\",\"default\":\"\"}," + // Optional
"{\"name\":\"price\",\"type\":\"double\"}" +
"]}";
// New Schema (V2) - FORWARD compatible
// Old consumer can read new data
String forwardV2 = "{\"type\":\"record\"," +
"\"name\":\"Product\"," +
"\"fields\":[" +
"{\"name\":\"productId\",\"type\":\"int\"}," +
"{\"name\":\"name\",\"type\":\"string\"}," +
// Removed 'description' field - old consumer will use default
"{\"name\":\"price\",\"type\":\"double\"}" +
"]}";
// FULL Compatibility Example
// Old Schema (V1)
String fullV1 = "{\"type\":\"record\"," +
"\"name\":\"Customer\"," +
"\"fields\":[" +
"{\"name\":\"customerId\",\"type\":\"int\"}," +
"{\"name\":\"name\",\"type\":\"string\"}," +
"{\"name\":\"email\",\"type\":\"string\",\"default\":\"\"}" + // Optional
"]}";
// New Schema (V2) - FULL compatible
// Both old and new can read each other's data
String fullV2 = "{\"type\":\"record\"," +
"\"name\":\"Customer\"," +
"\"fields\":[" +
"{\"name\":\"customerId\",\"type\":\"int\"}," +
"{\"name\":\"name\",\"type\":\"string\"}," +
"{\"name\":\"email\",\"type\":\"string\",\"default\":\"\"}," +
"{\"name\":\"phoneNumber\",\"type\":\"string\",\"default\":\"\"}" + // Added optional with default
"]}";
// Testing compatibility programmatically
Schema.Parser parser = new Schema.Parser();
Schema oldSchema = parser.parse(fullV1);
Schema newSchema = parser.parse(fullV2);
// Check if schemas are compatible
// (in a compilable source file, these two imports belong at the top, next to the Schema import)
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
// Check BACKWARD compatibility (new reads old)
SchemaCompatibilityType backwardResult =
SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema).getType();
System.out.println("BACKWARD compatible: " +
(backwardResult == SchemaCompatibilityType.COMPATIBLE));
// Check FORWARD compatibility (old reads new)
SchemaCompatibilityType forwardResult =
SchemaCompatibility.checkReaderWriterCompatibility(oldSchema, newSchema).getType();
System.out.println("FORWARD compatible: " +
(forwardResult == SchemaCompatibilityType.COMPATIBLE));
// Both must be compatible for FULL compatibility
boolean isFullCompatible =
(backwardResult == SchemaCompatibilityType.COMPATIBLE) &&
(forwardResult == SchemaCompatibilityType.COMPATIBLE);
System.out.println("FULL compatible: " + isFullCompatible);
Mermaid Diagram:
flowchart TD
subgraph BACKWARD["BACKWARD Compatibility"]
B1[Old Producer<br/>Schema V1] -->|Writes| BK[Kafka Topic]
BK -->|Reads| B2[New Consumer<br/>Schema V2]
B2 -.->|Can deserialize<br/>old data| B1
BN[Example: Add optional<br/>field with default]
end
subgraph FORWARD["FORWARD Compatibility"]
F1[New Producer<br/>Schema V2] -->|Writes| FK[Kafka Topic]
FK -->|Reads| F2[Old Consumer<br/>Schema V1]
F2 -.->|Can deserialize<br/>new data| F1
FN[Example: Remove<br/>optional field]
end
subgraph FULL["FULL Compatibility"]
FL1[Any Producer<br/>V1 or V2] -->|Writes| FLK[Kafka Topic]
FLK -->|Reads| FL2[Any Consumer<br/>V1 or V2]
FL2 -.->|Both directions<br/>work| FL1
FLN[Example: Add/remove<br/>optional with defaults]
end
style BACKWARD fill:#e1f5ff,stroke:#333,stroke-width:2px
style FORWARD fill:#fff5e1,stroke:#333,stroke-width:2px
style FULL fill:#e1ffe1,stroke:#333,stroke-width:2px
Real-World Deployment Scenarios:
// Scenario 1: BACKWARD (Most Common)
// Day 1: Deploy new consumers that can handle new field
// - Consumers with schema V2 deployed
// - Can read messages from old producers (V1)
// Day 2: Deploy new producers with new field
// - Producers with schema V2 deployed
// - New consumers already handle the new field
// Scenario 2: FORWARD (External Data Sources)
// Day 1: New data source starts sending enhanced data
// - External producer with schema V2 starts
// - Old consumers (V1) ignore new fields
// Day 2: Update internal consumers to use new data
// - Internal consumers upgraded to V2
// - Can now use the enhanced data
// Scenario 3: FULL (Microservices)
// Any Day: Services update independently
// - Service A updates to V2
// - Service B still on V1
// - Both can communicate without coordination
// - Maximum deployment flexibility
Performance and Optimization
What is the impact of replication on performance?
The 30-Second Answer: Replication impacts write performance by requiring each message to be written to multiple brokers (based on replication factor). With acks=all, producers wait for all in-sync replicas before completing, adding latency. However, replication enables read scalability through follower fetching and provides fault tolerance with minimal impact on read performance.
The 2-Minute Answer (If They Want More): Replication in Kafka creates multiple copies of each partition across different brokers, providing fault tolerance at the cost of write performance. The replication.factor setting determines how many copies exist: a factor of 3 means each message is written to three brokers. This directly impacts write throughput and latency since the leader must wait for followers to acknowledge writes before considering them committed.
The producer's acks setting determines replication's performance impact. With acks=0, the producer doesn't wait for any acknowledgment, so replication doesn't affect producer latency. With acks=1, the producer waits only for the leader to write, minimizing latency but risking data loss if the leader fails immediately. With acks=all, the producer waits for all in-sync replicas (ISRs) to acknowledge, providing maximum durability but highest latency (typically 2-5x higher than acks=1).
The min.insync.replicas setting works with acks=all to define the minimum number of in-sync replicas that must be available for a write to be accepted; if the ISR shrinks below that number, the broker rejects the write with a NotEnoughReplicas error instead of acknowledging it. Setting this to 2 with a replication factor of 3 allows one replica to lag or fail without blocking writes, balancing durability and availability. Network latency between brokers directly affects replication performance: brokers in the same datacenter add little overhead, while cross-datacenter replication adds significant latency.
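The interaction between acks and min.insync.replicas can be sketched as a small model. This is a simplified illustration, not broker code: the function names are hypothetical, and it assumes the leader is alive and counts as one ISR member.

```python
def write_accepted(acks: str, isr_size: int, min_insync_replicas: int) -> bool:
    """Return True if the leader would accept the produce request.

    Simplified model: only acks=all is gated by min.insync.replicas.
    """
    if acks == "all":
        return isr_size >= min_insync_replicas
    # acks=0 and acks=1 do not check the ISR size
    return True


def replica_writes_awaited(acks: str, isr_size: int) -> int:
    """Number of replica writes completed before the producer gets a response."""
    if acks == "0":
        return 0
    if acks == "1":
        return 1  # leader only
    return isr_size  # acks=all waits for every current ISR member


# RF=3, min.insync.replicas=2: one lagging replica does not block writes
one_replica_down = write_accepted("all", isr_size=2, min_insync_replicas=2)
# Two replicas out of sync: acks=all writes are rejected, acks=1 still succeeds
two_replicas_down = write_accepted("all", isr_size=1, min_insync_replicas=2)
```

Note that acks=all always waits for the full current ISR; min.insync.replicas only sets the floor below which writes are refused.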
Replication actually improves read performance in some scenarios. Recent Kafka versions support follower fetching, allowing consumers to read from nearby follower replicas rather than always hitting the leader. This reduces leader load and network traffic for geographically distributed consumers. However, there's a slight increase in broker disk I/O and network usage due to maintaining replicas. Memory usage also increases as each broker must track additional metadata for follower partitions.
Configuration Example:
# Topic creation with replication
bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic my-replicated-topic \
--partitions 3 \
--replication-factor 3 \
--config min.insync.replicas=2
# Broker replication configuration (server.properties)
# Note: .properties files do not support trailing comments, so comments sit on their own lines
default.replication.factor=3
min.insync.replicas=2
# Max time a follower may lag before it is removed from the ISR
replica.lag.time.max.ms=30000
replica.socket.timeout.ms=30000
replica.fetch.max.bytes=1048576
# Producer with replication awareness
# Wait for all in-sync replicas
acks=all
# Retry on replication failures
retries=3
max.in.flight.requests.per.connection=5
# Prevents duplicates on retries
enable.idempotence=true
# Follower fetching (Kafka 2.4+)
# Broker side: rack-aware replica selection (each broker also needs broker.rack set)
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
# Consumer side: the consumer reads from a replica in its own rack
client.rack=rack1
Replication Performance Impact:
| Configuration | Write Latency | Write Throughput | Durability | Availability |
|---|---|---|---|---|
| RF=1, acks=1 | Baseline (1x) | Baseline (1x) | None | Low |
| RF=3, acks=1 | +5-10% | -5-10% | Low | Medium |
| RF=3, acks=all, ISR=2 | +100-150% | -30-40% | High | High |
| RF=3, acks=all, ISR=3 | +150-200% | -40-50% | Highest | Medium |
Replication Latency Breakdown:
Producer send (acks=all, RF=3, ISR=2):
├─ Network: Producer → Leader Broker (1-2ms)
├─ Leader disk write (1-3ms)
├─ Leader → Follower 1 replication (1-3ms)
├─ Leader → Follower 2 replication (1-3ms)
├─ Wait for ISR acknowledgments (0-5ms)
└─ Network: Leader → Producer ACK (1-2ms)
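Total: ~5-18ms when the steps are summed (vs 3-7ms with acks=1); the two followers fetch in parallel, so their replication steps overlap in practice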
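The arithmetic behind the breakdown can be checked with a short sketch. The per-step ranges are copied from the breakdown above; the dictionary keys and function name are hypothetical labels, and the "parallel" variant assumes both followers fetch concurrently, so only one follower step is counted.

```python
from typing import Dict, Iterable, Tuple

# (min_ms, max_ms) per step, as listed in the breakdown
STEPS_MS: Dict[str, Tuple[int, int]] = {
    "producer_to_leader": (1, 2),
    "leader_disk_write": (1, 3),
    "follower_1_replication": (1, 3),
    "follower_2_replication": (1, 3),
    "isr_ack_wait": (0, 5),
    "leader_to_producer_ack": (1, 2),
}


def total_ms(names: Iterable[str]) -> Tuple[int, int]:
    """Sum the lower and upper bounds of the named steps."""
    names = list(names)
    low = sum(STEPS_MS[n][0] for n in names)
    high = sum(STEPS_MS[n][1] for n in names)
    return low, high


# acks=all, every step counted one after another
acks_all_sequential = total_ms(STEPS_MS)

# acks=all, followers fetching in parallel (count one follower step only)
acks_all_parallel = total_ms(n for n in STEPS_MS if n != "follower_2_replication")

# acks=1 skips replication and the ISR wait
acks_1 = total_ms(["producer_to_leader", "leader_disk_write", "leader_to_producer_ack"])
```

These are illustrative ranges, not measurements; real latency depends on hardware, batch size, and network distance between brokers.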
Monitoring Replication:
# Key JMX metrics
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
kafka.server:type=ReplicaManager,name=PartitionCount
kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec
kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec
# Check replication status
bin/kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic my-topic
# Output shows leader and ISR
Topic: my-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Optimization Strategies:
# Strategy 1: Balance durability and performance
# (these sketches mix topic, broker, and producer settings; comments sit on
# their own lines because .properties files do not support trailing comments)
replication.factor=3
# Tolerate 1 replica failure
min.insync.replicas=2
acks=all
# Reduce replication bytes
compression.type=lz4
# Strategy 2: Critical data, max durability
replication.factor=5
# Tolerate 2 failures
min.insync.replicas=3
acks=all
# Strategy 3: High throughput, acceptable data loss
replication.factor=2
min.insync.replicas=1
acks=1
# Strategy 4: Geo-replication optimization
# Larger fetch batches
replica.fetch.max.bytes=2097152
replica.socket.receive.buffer.bytes=131072
Kafka Fundamentals
What is Apache Kafka and what problems does it solve?
The 30-Second Answer: Apache Kafka is a distributed event streaming platform that enables high-throughput, fault-tolerant, real-time data pipelines and streaming applications. It solves problems related to handling massive volumes of real-time data streams, decoupling data producers from consumers, and providing durable, scalable message storage with replay capabilities.
The 2-Minute Answer (If They Want More): Apache Kafka was originally developed at LinkedIn to solve their data pipeline challenges. It was open-sourced in 2011 and graduated from the Apache Incubator in 2012. It addresses several critical problems in modern distributed systems.
Traditional messaging systems struggle with high throughput, data durability, and scalability. Kafka solves these by treating messages as an append-only distributed commit log. This design allows Kafka to handle millions of messages per second while maintaining message ordering within partitions and providing fault tolerance through replication.
Kafka excels at decoupling data producers from consumers, enabling multiple independent consumers to read the same data stream at their own pace. Unlike traditional message queues where messages are deleted after consumption, Kafka retains messages for a configurable period, allowing data replay and new consumers to process historical data.
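A toy in-memory model can make the retained-log idea concrete. This is only a sketch of the concept: the TopicLog class and its methods are hypothetical stand-ins for a single partition, not part of any Kafka client library.

```python
from typing import Dict, List


class TopicLog:
    """In-memory stand-in for one partition: an append-only list of records."""

    def __init__(self) -> None:
        self.records: List[str] = []
        self.committed: Dict[str, int] = {}  # consumer group -> next offset

    def append(self, value: str) -> int:
        """Append a record and return its offset."""
        self.records.append(value)
        return len(self.records) - 1

    def poll(self, group: str, max_records: int = 10) -> List[str]:
        """Return the next records for a group and advance only that group's offset."""
        start = self.committed.get(group, 0)
        batch = self.records[start:start + max_records]
        self.committed[group] = start + len(batch)
        return batch

    def seek(self, group: str, offset: int) -> None:
        """Rewind or fast-forward a group; the records themselves are untouched."""
        self.committed[group] = offset


log = TopicLog()
for event in ["signup", "login", "purchase"]:
    log.append(event)

analytics_batch = log.poll("analytics")              # reads everything
billing_batch = log.poll("billing", max_records=2)   # reads at its own pace
log.seek("analytics", 0)                             # replay from the start
replayed = log.poll("analytics")
```

Reading never removes a record, which is why each group can progress independently and why a replay returns the same data.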
Common use cases include real-time analytics, log aggregation, event sourcing, microservices communication, IoT data processing, and building data pipelines between systems. Companies like LinkedIn, Netflix, Uber, and Airbnb rely on Kafka to process trillions of messages daily, powering everything from activity tracking to fraud detection systems.
Architecture Diagram:
flowchart LR
subgraph Producers
P1[Producer 1]
P2[Producer 2]
P3[Producer 3]
end
subgraph Kafka Cluster
B1[Broker 1]
B2[Broker 2]
B3[Broker 3]
ZK[ZooKeeper/KRaft]
end
subgraph Consumers
C1[Consumer Group 1]
C2[Consumer Group 2]
C3[Consumer Group 3]
end
P1 --> B1
P2 --> B2
P3 --> B3
B1 -.replication.-> B2
B2 -.replication.-> B3
B3 -.replication.-> B1
ZK -.coordination.-> B1
ZK -.coordination.-> B2
ZK -.coordination.-> B3
B1 --> C1
B2 --> C2
B3 --> C3
Topics and Partitions
What is log compaction and when would you use it?
The 30-Second Answer: Log compaction is a retention policy that keeps only the most recent value for each message key, rather than deleting old messages after a time/size limit. It's ideal for changelog topics, stateful applications, and snapshots where you need the latest state for each key but don't care about historical updates.
The 2-Minute Answer (If They Want More):
Kafka offers two retention policies: time/size-based deletion and log compaction. With log compaction (cleanup.policy=compact), Kafka ensures that the log contains at least the last known value for each key within each partition.
How It Works: The compaction process runs in the background, creating a new segment that retains only the latest value for each key. Messages with null values are tombstones that mark keys for deletion. After a configurable time (delete.retention.ms), even tombstones are removed.
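The effect of compaction on a closed segment can be sketched in a few lines. This is a conceptual model only: the record tuples and function names are hypothetical, and real compaction works on segment files in a background cleaner thread.

```python
from typing import Dict, List, Optional, Tuple

Record = Tuple[int, str, Optional[str]]  # (offset, key, value); value None = tombstone


def compact(log: List[Record]) -> List[Record]:
    """Keep only the latest record per key, preserving original offsets and order."""
    latest: Dict[str, Record] = {}
    for record in log:
        latest[record[1]] = record  # later records overwrite earlier ones
    return sorted(latest.values(), key=lambda r: r[0])


def drop_tombstones(log: List[Record]) -> List[Record]:
    """Models what happens once delete.retention.ms has elapsed."""
    return [r for r in log if r[2] is not None]


segment: List[Record] = [
    (0, "user1", "v1"),
    (1, "user2", "v1"),
    (2, "user1", "v2"),
    (3, "user2", None),   # tombstone for user2
    (4, "user1", "v3"),
]

compacted = compact(segment)
after_retention = drop_tombstones(compacted)
```

Offsets are never renumbered, so the compacted log has gaps: here it holds offsets 3 and 4 only, and later just offset 4.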
Use Cases:
- Database Changelogs: Stream database changes to Kafka where downstream consumers need current state, not full history (e.g., Kafka Connect, CDC patterns)
- Application State: Maintain application configuration or user profile snapshots
- Cache Rebuilding: Consumers can replay the compacted log to rebuild an in-memory cache
- Kafka Streams State Stores: Internal topics used by Kafka Streams for state management
Characteristics:
- Guarantees you can replay to get current state for all keys
- Active segment (currently being written) is never compacted
- Ordering is preserved per key
- No guarantee on when compaction runs (controlled by min.compaction.lag.ms and max.compaction.lag.ms)
- Old segments are compacted when log segments are closed
Code Example:
# Create a compacted topic
kafka-topics --create \
--bootstrap-server localhost:9092 \
--topic user-profiles \
--partitions 3 \
--replication-factor 3 \
--config cleanup.policy=compact \
--config min.compaction.lag.ms=60000 \
--config delete.retention.ms=86400000
# Or configure existing topic
kafka-configs --alter \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name user-profiles \
--add-config cleanup.policy=compact
# Produce messages with same key
kafka-console-producer --bootstrap-server localhost:9092 \
--topic user-profiles \
--property parse.key=true \
--property key.separator=:
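# Enter messages (key:value format):
user123:{"name":"John","email":"john@example.com","version":1}
user456:{"name":"Jane","email":"jane@example.com","version":1}
user123:{"name":"John","email":"john.doe@example.com","version":2}
# A tombstone is a record with a null value, here for key user123.
# Typing user123:null at the console producer sends the literal string "null", not a tombstone;
# newer releases accept --property null.marker=<marker> for this, otherwise send the null value from a client API.
# After compaction (assuming a tombstone for user123 was produced), the log contains:
# user456 -> {"name":"Jane","email":"jane@example.com","version":1}
# user123 -> null (tombstone, eventually deleted after delete.retention.ms)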
Mermaid Diagram:
flowchart LR
subgraph Before["Before Compaction"]
B1[user1:v1<br/>offset 0]
B2[user2:v1<br/>offset 1]
B3[user1:v2<br/>offset 2]
B4[user3:v1<br/>offset 3]
B5[user2:v2<br/>offset 4]
B6[user1:v3<br/>offset 5]
B1 --> B2 --> B3 --> B4 --> B5 --> B6
end
Before ==>|Compaction| After
subgraph After["After Compaction (Keeps Latest per Key)"]
A1[user1:v3<br/>offset 5]
A2[user2:v2<br/>offset 4]
A3[user3:v1<br/>offset 3]
end
Note1[Only latest value per key retained<br/>Tombstones user1:null delete the key<br/>Active segment never compacted]
style Before fill:#fbb,stroke:#333,stroke-width:2px
style After fill:#bfb,stroke:#333,stroke-width:2px
style Note1 fill:#ffe,stroke:#333,stroke-width:1px
References:
- Apache Kafka Documentation - Log Compaction
- Confluent: Log Compaction: Highlights in the Apache Kafka and Stream Processing Community
Security
What is Kafka authorization and how do ACLs work?
The 30-Second Answer: Kafka authorization controls what authenticated users can do with Kafka resources using Access Control Lists (ACLs). ACLs define permissions for operations (READ, WRITE, CREATE, DELETE, etc.) on resources (topics, consumer groups, clusters) for specific principals (users/groups). The kafka-acls command-line tool manages ACLs. No authorizer is configured out of the box; once one such as AclAuthorizer (formerly SimpleAclAuthorizer) is enabled, Kafka follows a deny-by-default policy.
The 2-Minute Answer (If They Want More): Authorization in Kafka determines whether an authenticated principal (user or service) has permission to perform specific operations on Kafka resources. Unlike authentication which verifies "who you are," authorization answers "what you're allowed to do." This is critical in multi-tenant environments where different applications and teams share the same Kafka cluster.
Kafka uses an ACL-based authorization model implemented through the AclAuthorizer (formerly SimpleAclAuthorizer) on ZooKeeper-based clusters and the StandardAuthorizer on KRaft clusters. ACLs are rules that grant or deny permissions, consisting of four components: Principal (who), Permission (allow/deny), Operation (what action), and Resource (which topic/group/cluster). ACLs are stored in ZooKeeper (legacy mode) or in the cluster metadata log (KRaft mode).
The authorization model supports various resource types including Topics, Groups (consumer groups), Cluster (cluster-wide operations), TransactionalId (for exactly-once semantics), and DelegationToken. Operations include READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, and IDEMPOTENT_WRITE.
Once an authorizer is enabled, Kafka operates in a deny-by-default mode where no operations are permitted unless explicitly allowed by an ACL. This provides strong security but requires careful ACL management. Kafka also supports a permissive mode for development (allow.everyone.if.no.acl.found=true, not recommended for production). ACLs support wildcards for flexible permission management, can be scoped to specific hosts for network-level restrictions, and can use prefixed resource patterns to cover every topic or consumer group that shares a naming prefix.
Super users can be configured to bypass ACL checks entirely, which is useful for administrative tools and monitoring systems. The authorization system integrates seamlessly with all authentication mechanisms, and permissions can be managed dynamically without broker restarts.
Authorization Configuration:
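# Server Configuration (server.properties)
# Enable authorization (ZooKeeper-based clusters; KRaft clusters use
# org.apache.kafka.metadata.authorizer.StandardAuthorizer instead)
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# Super users who bypass ACL checks. Define this property once: a second
# super.users line would override the first. Include the broker principal
# so inter-broker requests are authorized.
super.users=User:admin;User:kafka-broker;User:monitoring-system
# Allow everyone if no ACL is found (NOT recommended for production)
allow.everyone.if.no.acl.found=false
# Avoid adding User:ANONYMOUS to super.users: unauthenticated (PLAINTEXT)
# clients map to that principal and would bypass every ACL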
ACL Management Examples:
# Grant WRITE permission on a topic to a user
kafka-acls --bootstrap-server localhost:9093 \
--add \
--allow-principal User:alice \
--operation Write \
--topic orders \
--command-config admin.properties
# Grant READ permission on a topic and consumer group
kafka-acls --bootstrap-server localhost:9093 \
--add \
--allow-principal User:bob \
--operation Read \
--topic orders \
--resource-pattern-type literal \
--command-config admin.properties
kafka-acls --bootstrap-server localhost:9093 \
--add \
--allow-principal User:bob \
--operation Read \
--group order-consumer-group \
--command-config admin.properties
# Grant CREATE permission on all topics with prefix
kafka-acls --bootstrap-server localhost:9093 \
--add \
--allow-principal User:app-service \
--operation Create \
--topic dev- \
--resource-pattern-type prefixed \
--command-config admin.properties
# Grant all operations on a topic
kafka-acls --bootstrap-server localhost:9093 \
--add \
--allow-principal User:admin \
--operation All \
--topic test-topic \
--command-config admin.properties
# Grant producer permissions (WRITE + DESCRIBE on topic, CREATE on cluster)
kafka-acls --bootstrap-server localhost:9093 \
--add \
--allow-principal User:producer-app \
--operation Write \
--operation Describe \
--topic orders \
--command-config admin.properties
kafka-acls --bootstrap-server localhost:9093 \
--add \
--allow-principal User:producer-app \
--operation Create \
--cluster \
--command-config admin.properties
# Grant consumer permissions (READ on topic and group, DESCRIBE on topic)
kafka-acls --bootstrap-server localhost:9093 \
--add \
--allow-principal User:consumer-app \
--operation Read \
--operation Describe \
--topic orders \
--command-config admin.properties
kafka-acls --bootstrap-server localhost:9093 \
--add \
--allow-principal User:consumer-app \
--operation Read \
--group consumer-group-1 \
--command-config admin.properties
# Grant permissions for idempotent producer
kafka-acls --bootstrap-server localhost:9093 \
--add \
--allow-principal User:idempotent-producer \
--operation IdempotentWrite \
--cluster \
--command-config admin.properties
# Grant permissions for transactional producer
kafka-acls --bootstrap-server localhost:9093 \
--add \
--allow-principal User:transactional-producer \
--operation Write \
--operation Describe \
--transactional-id txn-id-1 \
--command-config admin.properties
# Deny operation (takes precedence over allow)
kafka-acls --bootstrap-server localhost:9093 \
--add \
--deny-principal User:alice \
--operation Delete \
--topic critical-topic \
--command-config admin.properties
# List ACLs for a topic
kafka-acls --bootstrap-server localhost:9093 \
--list \
--topic orders \
--command-config admin.properties
# List all ACLs for a principal
kafka-acls --bootstrap-server localhost:9093 \
--list \
--principal User:alice \
--command-config admin.properties
# Remove ACL
kafka-acls --bootstrap-server localhost:9093 \
--remove \
--allow-principal User:alice \
--operation Write \
--topic orders \
--command-config admin.properties
# Remove all ACLs for a topic
kafka-acls --bootstrap-server localhost:9093 \
--remove \
--topic test-topic \
--force \
--command-config admin.properties
ACL Architecture:
flowchart TD
A[Client Request] --> B{Authentication}
B -->|Success| C{Authorization Check}
B -->|Failure| D[Reject: Not Authenticated]
C --> E{Is Super User?}
E -->|Yes| F[Allow]
E -->|No| G{Check ACLs}
G --> H{Deny ACL Exists?}
H -->|Yes| I[Reject: Denied by ACL]
H -->|No| J{Allow ACL Exists?}
J -->|Yes| F
J -->|No| K{allow.everyone.if.no.acl.found?}
K -->|true| F
K -->|false| L[Reject: No ACL Found]
F --> M[Execute Operation]
style F fill:#90EE90
style D fill:#FFB6C6
style I fill:#FFB6C6
style L fill:#FFB6C6
style M fill:#87CEEB
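The decision order in the flowchart above can be sketched as a function. This is a simplified model that follows the diagram, not the authorizer's actual code: the Acl dataclass and authorize function are hypothetical, resources are matched literally, and host restrictions and implied operations are ignored.

```python
from dataclasses import dataclass
from typing import FrozenSet, Iterable


@dataclass(frozen=True)
class Acl:
    principal: str   # e.g. "User:alice", or "User:*" for everyone
    permission: str  # "ALLOW" or "DENY"
    operation: str   # e.g. "READ", "WRITE", or "ALL"
    resource: str    # literal resource name


def authorize(principal: str, operation: str, resource: str,
              acls: Iterable[Acl],
              super_users: FrozenSet[str] = frozenset(),
              allow_if_no_acl: bool = False) -> bool:
    # 1. Super users bypass all ACL checks
    if principal in super_users:
        return True
    matching = [
        a for a in acls
        if a.resource == resource
        and a.principal in (principal, "User:*")
        and a.operation in (operation, "ALL")
    ]
    # 2. Any matching DENY wins over ALLOW
    if any(a.permission == "DENY" for a in matching):
        return False
    # 3. Otherwise a matching ALLOW grants access
    if any(a.permission == "ALLOW" for a in matching):
        return True
    # 4. Nothing matched: fall back to allow.everyone.if.no.acl.found
    return allow_if_no_acl


acls = [
    Acl("User:alice", "ALLOW", "WRITE", "orders"),
    Acl("User:alice", "ALLOW", "ALL", "critical-topic"),
    Acl("User:alice", "DENY", "DELETE", "critical-topic"),
]
alice_can_write = authorize("User:alice", "WRITE", "orders", acls)
alice_can_delete = authorize("User:alice", "DELETE", "critical-topic", acls)
```

The second call returns False even though alice holds ALL on the topic, because the DENY entry is evaluated first.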
Common ACL Patterns:
| Use Case | Required ACLs | Notes |
|---|---|---|
| Producer | WRITE, DESCRIBE on topic; CREATE on cluster | CREATE needed for auto-topic creation |
| Consumer | READ on topic; READ on consumer group | DESCRIBE on topic for metadata |
| Idempotent Producer | IdempotentWrite on cluster | In addition to regular producer ACLs |
| Transactional Producer | WRITE, DESCRIBE on transactional-id | Plus regular producer ACLs |
| Topic Admin | CREATE, DELETE, ALTER, DESCRIBE on topics | Often combined with cluster ACLs |
| Monitoring | DESCRIBE, DESCRIBE_CONFIGS on cluster/topics | Read-only cluster access |
Resource Pattern Types:
# LITERAL - Exact match only
--resource-pattern-type literal --topic orders
# Matches: orders
# Does not match: orders-2023, dev-orders
# PREFIXED - Matches prefix
--resource-pattern-type prefixed --topic dev-
# Matches: dev-orders, dev-users, dev-anything
# Does not match: orders, production-orders
# ANY - Matches both literal and prefixed (used for listing)
--resource-pattern-type any
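The difference between literal and prefixed matching can be sketched as follows. The function name is hypothetical and this is an illustration of the matching rules described above, not the authorizer's implementation; it assumes "*" as a literal name acts as the wildcard.

```python
def pattern_matches(pattern_type: str, pattern_name: str, resource_name: str) -> bool:
    """Return True if an ACL's resource pattern covers the given resource name."""
    if pattern_type == "literal":
        # "*" is the wildcard; anything else must match exactly
        return pattern_name == "*" or pattern_name == resource_name
    if pattern_type == "prefixed":
        return resource_name.startswith(pattern_name)
    raise ValueError(f"unsupported pattern type: {pattern_type}")


literal_hit = pattern_matches("literal", "orders", "orders")
literal_miss = pattern_matches("literal", "orders", "orders-2023")
prefix_hit = pattern_matches("prefixed", "dev-", "dev-orders")
prefix_miss = pattern_matches("prefixed", "dev-", "production-orders")
```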
Replication and Fault Tolerance
What is the replication factor and how do you choose it?
The 30-Second Answer: The replication factor defines how many copies of each partition exist across the Kafka cluster. A replication factor of 3 is the industry standard, providing a good balance between fault tolerance (the data survives up to 2 broker failures) and resource usage. The minimum should be 2, and you should never use 1 in production environments.
The 2-Minute Answer (If They Want More): The replication factor (RF) is a critical configuration that determines your cluster's resilience and resource requirements. With RF=N, your cluster can tolerate N-1 broker failures without data loss. However, higher replication factors come with trade-offs: increased storage requirements (RF=3 uses 3x disk space), more network bandwidth for replication traffic, and potentially higher write latency.
For production environments, RF=3 has become the de facto standard across the industry. This configuration strikes a practical balance: it provides strong durability (two simultaneous broker failures are rare, and even then one copy of the data remains), reasonable resource overhead, and a good fit with common deployment patterns (3+ broker clusters). Tech giants like LinkedIn, Netflix, and Uber typically use RF=3 for critical production workloads.
When choosing your replication factor, consider these factors: the criticality of your data (financial transactions might warrant RF=4 or 5), your cluster size (RF should not exceed the number of brokers), operational requirements (more replicas mean more complex maintenance), and cost constraints (storage and bandwidth costs scale linearly with RF). For less critical data like logs or metrics, RF=2 might be acceptable, but RF=1 should be avoided in production as it provides no fault tolerance.
It's also important to note that the replication factor is set per topic and cannot be easily changed after creation (though it can be modified using partition reassignment tools). Plan your replication strategy during the design phase, considering both current and future needs.
Code Example:
# Standard production topic (RF=3)
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic critical-transactions \
--partitions 12 \
--replication-factor 3 \
--config min.insync.replicas=2
# High-criticality topic (RF=4)
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic financial-ledger \
--partitions 6 \
--replication-factor 4 \
--config min.insync.replicas=3
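# Modify existing topic's replication factor (complex operation)
# 1. Inspect the current replica assignment (plain-text output, not JSON)
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic orders > current-assignment.txt
# 2. Write new-assignment.json by hand, listing the desired replicas per partition, e.g.
# {"version":1,"partitions":[{"topic":"orders","partition":0,"replicas":[1,2,3]}]}
# 3. Execute reassignment
kafka-reassign-partitions.sh \
--bootstrap-server localhost:9092 \
--reassignment-json-file new-assignment.json \
--execute
# 4. Re-run the same command with --verify instead of --execute to check completion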
# Storage calculation (multiply by the number of days retained)
# RF=3, 100GB data per day = 300GB storage needed per retained day
# RF=4, 100GB data per day = 400GB storage needed per retained day
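The storage arithmetic above generalizes to a one-line formula. The sketch below uses hypothetical function names and ignores compression, index files, and headroom, so treat the result as a lower bound.

```python
def storage_needed_gb(daily_ingest_gb: float, replication_factor: int,
                      retention_days: int = 1) -> float:
    """Raw disk needed across the cluster: every byte is stored once per replica."""
    return daily_ingest_gb * replication_factor * retention_days


def tolerated_broker_failures(replication_factor: int) -> int:
    """With RF=N, the data survives the loss of N-1 brokers."""
    return replication_factor - 1


rf3_one_day = storage_needed_gb(100, 3)
rf4_one_day = storage_needed_gb(100, 4)
rf3_one_week = storage_needed_gb(100, 3, retention_days=7)
```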
Decision Matrix:
flowchart TD
Start[Choose Replication Factor] --> Critical{Data Criticality?}
Critical -->|Critical| HighRF[RF = 3 or higher]
Critical -->|Non-Critical| LowRF{Production?}
LowRF -->|Yes| RF2[RF = 2 minimum]
LowRF -->|No/Dev| RF1[RF = 1 acceptable]
HighRF --> BrokerCount{Broker Count?}
BrokerCount -->|5+ brokers| RF4[Consider RF = 4-5]
BrokerCount -->|3-4 brokers| RF3[Use RF = 3]
BrokerCount -->|< 3 brokers| Scale[Scale cluster first]
RF2 --> MinISR2[Set min.insync.replicas = 1]
RF3 --> MinISR3[Set min.insync.replicas = 2]
RF4 --> MinISR4[Set min.insync.replicas = 3]
style RF3 fill:#90EE90
style RF4 fill:#87CEEB
style RF1 fill:#FFB6C1
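The decision matrix above can be written down as a small function. The function name and return shape are hypothetical; for the "RF = 4-5" branch the sketch picks 4, and for the dev branch it assumes min.insync.replicas=1 since the diagram leaves that unspecified.

```python
from typing import Tuple


def choose_replication(critical: bool, production: bool, broker_count: int) -> Tuple[int, int]:
    """Return (replication_factor, min_insync_replicas) following the flowchart."""
    if critical:
        if broker_count < 3:
            raise ValueError("scale the cluster to at least 3 brokers first")
        if broker_count >= 5:
            return 4, 3   # diagram says "consider RF = 4-5"; 4 chosen here
        return 3, 2
    rf, min_isr = (2, 1) if production else (1, 1)
    if rf > broker_count:
        # RF can never exceed the number of brokers
        raise ValueError("replication factor exceeds broker count")
    return rf, min_isr


critical_small_cluster = choose_replication(critical=True, production=True, broker_count=3)
critical_large_cluster = choose_replication(critical=True, production=True, broker_count=6)
metrics_topic = choose_replication(critical=False, production=True, broker_count=3)
dev_topic = choose_replication(critical=False, production=False, broker_count=1)
```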
Monitoring and Operations
What is MirrorMaker and how do you use it for replication?
The 30-Second Answer: MirrorMaker is Kafka's cross-cluster replication tool that copies data between Kafka clusters. MirrorMaker 2 (MM2), built on Kafka Connect, is the current version offering bidirectional replication, automatic topic creation, offset translation, and configurable replication flows. It's essential for disaster recovery, data aggregation, cloud migration, and geographic distribution of Kafka clusters.
The 2-Minute Answer (If They Want More):
MirrorMaker enables replication between geographically distributed or logically separated Kafka clusters. The original MirrorMaker (now legacy) was a simple consumer-producer pair that consumed from source topics and produced to destination topics. MirrorMaker 2 (introduced in Kafka 2.4) is a complete rewrite based on the Kafka Connect framework, offering significantly more features and reliability.
MM2 provides several key capabilities: bidirectional replication (active-active scenarios), automatic topic and configuration sync, consumer group offset translation (enabling failover), ACL replication, and prefix-based topic naming to prevent replication loops. It supports multiple replication topologies including hub-and-spoke, active-passive, active-active, and aggregation patterns.
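How prefix-based naming prevents replication loops can be sketched in a few lines. This is a simplified model, not MM2's code: the function names are hypothetical, and unlike the real policy it checks prefixes against a known set of cluster aliases so that a topic such as "orders.v1" is not mistaken for a replicated one.

```python
from typing import Optional, Set

SEPARATOR = "."


def remote_topic(source_alias: str, topic: str) -> str:
    """Name given to a topic once it is replicated from source_alias."""
    return f"{source_alias}{SEPARATOR}{topic}"


def topic_source(topic: str, aliases: Set[str]) -> Optional[str]:
    """Return the cluster alias a replicated topic came from, or None if it is local."""
    prefix, sep, _ = topic.partition(SEPARATOR)
    return prefix if sep and prefix in aliases else None


def is_cycle(topic: str, target_alias: str, aliases: Set[str]) -> bool:
    """True if replicating this topic to target_alias would send data back home."""
    source = topic_source(topic, aliases)
    if source is None:
        return False
    if source == target_alias:
        return True
    # Strip one prefix and check the upstream name too (multi-hop replication)
    return is_cycle(topic[len(source) + len(SEPARATOR):], target_alias, aliases)


clusters = {"us-west", "us-east"}
replicated_name = remote_topic("us-west", "orders")
would_loop = is_cycle("us-east.orders", "us-east", clusters)
local_topic_ok = is_cycle("orders", "us-east", clusters)
```

With an identity-style policy that keeps topic names unchanged, this prefix check has nothing to inspect, so loop prevention has to come from topic filters instead.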
Common use cases include disaster recovery (replicate to standby datacenter), cloud migration (gradually move from on-premise to cloud), compliance and data locality (keep data in specific regions), and edge-to-cloud aggregation (collect data from edge clusters to central processing). MM2 can filter topics using regex patterns and, because it runs on Kafka Connect, apply transformations during replication. By default it provides at-least-once delivery, so consumers on the target cluster should tolerate occasional duplicates; exactly-once replication depends on Kafka Connect's exactly-once source support in newer releases.
The architecture consists of three main connectors: MirrorSourceConnector (replicates topics and records), MirrorCheckpointConnector (replicates consumer offsets), and MirrorHeartbeatConnector (emits heartbeats used to monitor replication lag and cluster connectivity). Each runs as a Kafka Connect connector with its own tasks, providing scalability and fault tolerance. Configuration is straightforward using standard Connect properties, specifying source and target clusters, replication policies, and filtering rules.
MirrorMaker 2 Architecture:
flowchart LR
subgraph Source["Source Cluster (us-west)"]
ST1[orders]
ST2[payments]
SCG[Consumer Groups]
end
subgraph MM2["MirrorMaker 2"]
MSC[MirrorSource<br/>Connector]
MCC[MirrorCheckpoint<br/>Connector]
MHC[MirrorHeartbeat<br/>Connector]
end
subgraph Target["Target Cluster (us-east)"]
TT1[us-west.orders]
TT2[us-west.payments]
TCG[Replicated Offsets]
THB[Heartbeat Topic]
end
ST1 -->|Replicate Data| MSC
ST2 -->|Replicate Data| MSC
SCG -->|Replicate Offsets| MCC
MSC -->|Write| TT1
MSC -->|Write| TT2
MCC -->|Write| TCG
MHC -->|Monitor| THB
style MM2 fill:#e1f5ff
style Source fill:#fff4e1
style Target fill:#e8f5e9
Basic Configuration:
# mm2.properties - MirrorMaker 2 configuration
# Define clusters
clusters = source, target
source.bootstrap.servers = source-kafka:9092
target.bootstrap.servers = target-kafka:9092
# Enable replication from source to target
source->target.enabled = true
source->target.topics = orders.*, payments.*, inventory.*
# Consumer and producer settings
source.consumer.group.id = mm2-source-consumer
source.consumer.auto.offset.reset = earliest
source.consumer.max.poll.records = 500
target.producer.compression.type = lz4
target.producer.batch.size = 32768
target.producer.linger.ms = 10
# Replication factor for internal topics
replication.factor = 3
checkpoints.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
offset-syncs.topic.replication.factor = 3
# Sync topic configurations
sync.topic.configs.enabled = true
sync.topic.acls.enabled = false
# Offset sync frequency
emit.checkpoints.interval.seconds = 60
emit.heartbeats.interval.seconds = 5
# Topic naming (default: adds source cluster name as prefix)
replication.policy.class = org.apache.kafka.connect.mirror.DefaultReplicationPolicy
# Or use IdentityReplicationPolicy to keep same topic names
# replication.policy.class = org.apache.kafka.connect.mirror.IdentityReplicationPolicy
# Exclude internal topics
topics.exclude = __consumer_offsets, __transaction_state, mm2-.*
# Idempotent producer: avoids duplicates caused by producer retries (this is not end-to-end exactly-once)
source->target.producer.enable.idempotence = true
Advanced Configuration with Bidirectional Replication:
# mm2-bidirectional.properties
# Define both clusters
clusters = us-west, us-east
us-west.bootstrap.servers = west-kafka1:9092,west-kafka2:9092
us-east.bootstrap.servers = east-kafka1:9092,east-kafka2:9092
# Enable bidirectional replication
us-west->us-east.enabled = true
us-east->us-west.enabled = true
# Replicate specific topics (avoid circular replication)
us-west->us-east.topics = local-orders.*, shared-events.*
us-east->us-west.topics = local-inventory.*, shared-events.*
# Exclude already replicated topics to prevent loops
# (a backslash must be doubled in a .properties file so the regex receives \.)
us-west->us-east.topics.exclude = us-east\\..*
us-east->us-west.topics.exclude = us-west\\..*
# Checkpoint and heartbeat configuration
emit.checkpoints.interval.seconds = 30
emit.heartbeats.interval.seconds = 5
sync.group.offsets.enabled = true
sync.group.offsets.interval.seconds = 60
# Performance tuning
tasks.max = 4
replication.factor = 3
# Security configuration (if using SSL/SASL)
us-west.security.protocol = SASL_SSL
us-west.sasl.mechanism = PLAIN
us-west.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="mm2" password="secret";
us-east.security.protocol = SASL_SSL
us-east.sasl.mechanism = PLAIN
us-east.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="mm2" password="secret";
Running MirrorMaker 2:
# Start MirrorMaker 2 in dedicated mode (recommended for production)
connect-mirror-maker.sh mm2.properties
# Start MirrorMaker 2 in distributed mode (for scalability)
# Step 1: Start multiple Connect workers
connect-distributed.sh connect-distributed.properties
# Step 2: Submit MM2 configuration via REST API
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "mm2-source-connector",
"config": {
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"source.cluster.alias": "source",
"target.cluster.alias": "target",
"source.cluster.bootstrap.servers": "source-kafka:9092",
"target.cluster.bootstrap.servers": "target-kafka:9092",
"topics": "orders.*,payments.*",
"replication.factor": "3",
"tasks.max": "4"
}
}'
# Check connector status
curl http://localhost:8083/connectors/mm2-source-connector/status
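# Check the latest offsets of the replicated topic (compare with the source topic to estimate lag)
# Newer Kafka releases use --bootstrap-server here; --broker-list is the older flag
kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list target-kafka:9092 \
--topic source.orders --time -1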
# Verify replicated topics
kafka-topics.sh --bootstrap-server target-kafka:9092 --list | grep "^source\."
Offset Translation for Failover:
# When failing over consumers to target cluster, translate offsets
# Step 1: Get current offsets from source cluster consumer group
kafka-consumer-groups.sh --bootstrap-server source-kafka:9092 \
--group order-processor --describe
# Step 2: Translate offsets to target cluster
# MM2 writes checkpoints with offset mappings to a topic on the target cluster
# named after the source cluster alias: source.checkpoints.internal
# Step 3: Configure consumer to read from replicated topic
# If using DefaultReplicationPolicy:
# Original topic: orders
# Replicated topic: source.orders
# Step 4: Consumer resumes from translated offsets
# With sync.group.offsets.enabled = true, the checkpoint connector writes
# translated offsets for the same group ID to the target cluster
# (only while that group has no active consumers there)
# Resume consumption on the target cluster with the same group ID
kafka-console-consumer.sh \
--bootstrap-server target-kafka:9092 \
--topic source.orders \
--group order-processor \
--from-beginning
# --from-beginning only applies if the group has no committed offsets on the target;
# otherwise consumption starts at the offsets synchronized by MM2
Monitoring MirrorMaker 2:
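# MirrorMaker 2 JMX metrics (read them with any JMX client; jmx_query below is a placeholder, not a Kafka tool)
# Per-partition source metrics; attributes include record-count, byte-rate,
# record-age-ms, and replication-latency-ms
kafka.connect.mirror:type=MirrorSourceConnector,target=*,topic=*,partition=*
# Record count (attribute record-count)
jmx_query "kafka.connect.mirror:type=MirrorSourceConnector,target=*,topic=*,partition=*" record-count
# Byte rate (attribute byte-rate)
jmx_query "kafka.connect.mirror:type=MirrorSourceConnector,target=*,topic=*,partition=*" byte-rate
# Checkpoint latency (attribute checkpoint-latency-ms)
jmx_query "kafka.connect.mirror:type=MirrorCheckpointConnector,*"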
# Heartbeat metrics
kafka-console-consumer.sh --bootstrap-server target-kafka:9092 \
--topic heartbeats \
--from-beginning --max-messages 10
# Check Connect worker status
curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/mm2-source-connector/tasks
# View logs for errors
tail -f logs/connect.log | grep -i "mirror\|error"
Disaster Recovery Scenario:
# Scenario: Primary cluster (us-west) fails, failover to DR (us-east)
# Step 1: Verify replication is current (before failure)
# Check latest offset on source
kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list us-west:9092 --topic orders --time -1
# Returns: orders:0:1000000
# Check replicated offset on target
kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list us-east:9092 --topic us-west.orders --time -1
# Returns: us-west.orders:0:999950 (lag of 50 messages)
# Step 2: Primary cluster fails
# Redirect applications to DR cluster
# Step 3: Update consumer configuration
# Change bootstrap.servers to us-east:9092
# Change topic name from "orders" to "us-west.orders"
# Keep same group.id - offsets are synchronized by MM2 when sync.group.offsets.enabled = true
# Step 4: Consumers resume from the last synchronized offset
# Checkpoints are emitted periodically, so expect some records to be reprocessed
# Step 5: Monitor consumption
kafka-consumer-groups.sh --bootstrap-server us-east:9092 \
--group order-processor --describe
# Step 6: When primary recovers, reverse replication
# Configure us-east->us-west replication
# Catch up primary cluster with DR data
# Eventually fail back to primary
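The lag check in Step 1 can be automated by parsing the `topic:partition:offset` lines shown above. The sketch below uses hypothetical function names and assumes DefaultReplicationPolicy naming (source alias plus a dot as prefix). Offsets on the two clusters are not guaranteed to line up exactly, so the result is an estimate.

```python
from typing import Dict, Tuple

TopicPartition = Tuple[str, int]


def parse_offsets(output: str, strip_prefix: str = "") -> Dict[TopicPartition, int]:
    """Parse lines of the form topic:partition:offset into a dictionary."""
    offsets: Dict[TopicPartition, int] = {}
    for line in output.strip().splitlines():
        topic, partition, offset = line.strip().rsplit(":", 2)
        if strip_prefix and topic.startswith(strip_prefix):
            topic = topic[len(strip_prefix):]  # map us-west.orders back to orders
        offsets[(topic, int(partition))] = int(offset)
    return offsets


def replication_lag(source_output: str, target_output: str,
                    source_alias: str) -> Dict[TopicPartition, int]:
    """Per-partition difference between source and replicated end offsets."""
    source = parse_offsets(source_output)
    target = parse_offsets(target_output, strip_prefix=source_alias + ".")
    return {tp: end - target.get(tp, 0) for tp, end in source.items()}


lag = replication_lag(
    source_output="orders:0:1000000",
    target_output="us-west.orders:0:999950",
    source_alias="us-west",
)
```

A partition missing from the target output is treated as fully lagging, which makes unreplicated partitions stand out.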
Common Topologies:
# Topology 1: Active-Passive DR
# Primary: us-west, DR: us-east
clusters = primary, dr
primary->dr.enabled = true
primary->dr.topics = .*
# One-way replication for disaster recovery
# Topology 2: Active-Active (Multi-Region)
# Both regions serve traffic
clusters = us, eu
us->eu.enabled = true
eu->us.enabled = true
us->eu.topics = shared.*, us-originated.*
eu->us.topics = shared.*, eu-originated.*
# Bidirectional with namespace separation
# Topology 3: Hub-and-Spoke (Edge Aggregation)
# Multiple edge clusters -> central
clusters = edge1, edge2, central
edge1->central.enabled = true
edge2->central.enabled = true
edge1->central.topics = sensor-data.*
edge2->central.topics = sensor-data.*
# Aggregate data from edges to central cluster
# Topology 4: Fan-out (Distribution)
# Central -> Multiple regions
clusters = central, us-west, us-east, eu
central->us-west.enabled = true
central->us-east.enabled = true
central->eu.enabled = true
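central->us-west.topics = global-config.*, reference-data.*
central->us-east.topics = global-config.*, reference-data.*
central->eu.topics = global-config.*, reference-data.*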
# Distribute reference data to all regions
Use Cases and Patterns
What is event-driven architecture and how does Kafka enable it?
The 30-Second Answer: Event-driven architecture (EDA) is a design pattern where services communicate by producing and consuming events rather than direct API calls. Kafka enables EDA by providing a durable, scalable event backbone that decouples producers from consumers, supports event replay, and guarantees ordered delivery within partitions, allowing systems to react to state changes asynchronously.
The 2-Minute Answer (If They Want More): Event-driven architecture is a software design paradigm where components communicate through events - significant state changes or occurrences in the system. Unlike request-response architectures where services directly call each other, EDA uses asynchronous message passing through an event broker. This creates loosely coupled systems where producers don't know or care who consumes their events.
Kafka is particularly well-suited for EDA because it provides several critical capabilities. First, its distributed, replicated architecture ensures events are durable and highly available. Second, its log-based storage allows consumers to replay historical events, enabling new services to build state from scratch or recover from failures. Third, partitioning with ordered delivery ensures that related events (same key) are processed in sequence while allowing parallel processing across partitions.
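To make the per-key ordering guarantee concrete, here is a minimal sketch of key-based partition selection. It is an illustration under stated assumptions: Kafka's default partitioner hashes the serialized key with murmur2, whereas this sketch substitutes `String.hashCode()` to stay dependency-free, and `partitionFor` is a hypothetical helper. The property it demonstrates is the same: a given key always maps to the same partition, so all events for one order land in one ordered log.

```java
// Illustrative stand-in for key-based partitioning; not Kafka's actual partitioner.
final class KeyPartitioningSketch {
    // Hash the key, clear the sign bit, and take the modulus of the partition count.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 6;
        String[] keys = {"order-1", "order-2", "order-1", "order-3", "order-1"};
        for (String key : keys) {
            // Every occurrence of "order-1" prints the same partition number.
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

The mapping depends on the partition count, so adding partitions to a topic can move existing keys to different partitions and break ordering across the change.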
In an event-driven system built on Kafka, services emit domain events (OrderPlaced, PaymentProcessed, ShipmentDispatched) to topics. Multiple consumers can independently subscribe and react to these events without coupling. This enables powerful patterns like event choreography (services react to events autonomously) versus orchestration (central coordinator), CQRS (separate read/write models), and saga patterns for distributed transactions.
Key benefits include: improved scalability (services scale independently), resilience (failures don't cascade), flexibility (add new consumers without changing producers), and auditability (complete event history). Challenges include eventual consistency, event schema evolution, and increased operational complexity.
Kafka's features like consumer groups, stream processing with Kafka Streams, and schema management with Schema Registry make it a comprehensive platform for implementing production-grade event-driven architectures.
Code Example:
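// Domain events (Lombok: @Data alone does not generate the all-args constructor used below)
@Data
@NoArgsConstructor
@AllArgsConstructor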
public class OrderPlacedEvent {
private String orderId;
private String customerId;
private List<OrderItem> items;
private BigDecimal totalAmount;
private Instant timestamp;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PaymentProcessedEvent {
private String paymentId;
private String orderId;
private BigDecimal amount;
private PaymentStatus status;
private Instant timestamp;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderShippedEvent {
private String shipmentId;
private String orderId;
private String trackingNumber;
private Instant shippedAt;
}
// Event producer - Order Service
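@Slf4j
@RequiredArgsConstructor
public class OrderService {
private final KafkaProducer<String, OrderPlacedEvent> eventProducer;
private final OrderRepository orderRepository; // assumed persistence interface, not shown
private static final String TOPIC = "order-events";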
public void placeOrder(Order order) {
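// Save order to database. This save and the send below are not atomic:
// a crash in between persists the order without publishing the event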
orderRepository.save(order);
// Emit event
OrderPlacedEvent event = new OrderPlacedEvent(
order.getId(),
order.getCustomerId(),
order.getItems(),
order.getTotalAmount(),
Instant.now()
);
ProducerRecord<String, OrderPlacedEvent> record =
new ProducerRecord<>(TOPIC, order.getId(), event);
eventProducer.send(record, (metadata, exception) -> {
if (exception != null) {
log.error("Failed to publish OrderPlacedEvent", exception);
// Implement outbox pattern or retry logic
} else {
log.info("OrderPlacedEvent published: offset={}, partition={}",
metadata.offset(), metadata.partition());
}
});
}
}
// Event consumer - Payment Service
@Slf4j
@Service
public class PaymentEventConsumer {
private final PaymentProcessor paymentProcessor;
private final KafkaProducer<String, PaymentProcessedEvent> eventProducer;
@KafkaListener(topics = "order-events", groupId = "payment-service")
public void handleOrderPlaced(OrderPlacedEvent event) {
try {
// Process payment
Payment payment = paymentProcessor.processPayment(
event.getOrderId(),
event.getTotalAmount()
);
// Emit payment processed event
PaymentProcessedEvent paymentEvent = new PaymentProcessedEvent(
payment.getId(),
event.getOrderId(),
payment.getAmount(),
payment.getStatus(),
Instant.now()
);
ProducerRecord<String, PaymentProcessedEvent> record =
new ProducerRecord<>("payment-events", event.getOrderId(), paymentEvent);
eventProducer.send(record);
} catch (PaymentException e) {
log.error("Payment failed for order: {}", event.getOrderId(), e);
// Publish payment failed event or retry
}
}
}
// Event consumer - Inventory Service
@Slf4j
@Service
public class InventoryEventConsumer {
private final InventoryService inventoryService;
@KafkaListener(topics = "order-events", groupId = "inventory-service")
public void handleOrderPlaced(OrderPlacedEvent event) {
// Reserve inventory for order items
for (OrderItem item : event.getItems()) {
inventoryService.reserveStock(item.getProductId(), item.getQuantity());
}
log.info("Inventory reserved for order: {}", event.getOrderId());
}
}
// Event consumer - Notification Service
@Service
public class NotificationEventConsumer {
private final EmailService emailService;
@KafkaListener(topics = "order-events", groupId = "notification-service")
public void handleOrderPlaced(OrderPlacedEvent event) {
emailService.sendOrderConfirmation(event.getCustomerId(), event.getOrderId());
}
@KafkaListener(topics = "payment-events", groupId = "notification-service")
public void handlePaymentProcessed(PaymentProcessedEvent event) {
if (event.getStatus() == PaymentStatus.SUCCESS) {
emailService.sendPaymentConfirmation(event.getOrderId());
} else {
emailService.sendPaymentFailureNotification(event.getOrderId());
}
}
@KafkaListener(topics = "shipment-events", groupId = "notification-service")
public void handleOrderShipped(OrderShippedEvent event) {
emailService.sendShipmentNotification(event.getOrderId(), event.getTrackingNumber());
}
}
// Using Kafka Streams for event processing and correlation
public class OrderSagaProcessor {
public void buildOrderSagaTopology(StreamsBuilder builder) {
// Stream of order events
KStream<String, OrderPlacedEvent> orderEvents = builder.stream("order-events");
// Stream of payment events
KStream<String, PaymentProcessedEvent> paymentEvents = builder.stream("payment-events");
// Stream of shipment events
KStream<String, OrderShippedEvent> shipmentEvents = builder.stream("shipment-events");
// Join streams to track order saga state.
// leftJoin keeps the order side non-null (an outerJoin could pass a null saga and throw);
// the payment or shipment is null if none arrives within the window,
// so withPayment/withShipment must accept null.
KStream<String, OrderSagaState> orderSaga = orderEvents
.selectKey((k, v) -> v.getOrderId())
.mapValues(event -> OrderSagaState.fromOrderPlaced(event))
.leftJoin(
paymentEvents.selectKey((k, v) -> v.getOrderId()),
(saga, payment) -> saga.withPayment(payment),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(30))
)
.leftJoin(
shipmentEvents.selectKey((k, v) -> v.getOrderId()),
(saga, shipment) -> saga.withShipment(shipment),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofDays(7))
);
// Track completed orders
orderSaga
.filter((orderId, saga) -> saga.isComplete())
.to("completed-orders");
}
}
Mermaid Diagram:
flowchart TD
A[Order Service] -->|OrderPlacedEvent| B[Kafka: order-events]
B --> C[Payment Service]
B --> D[Inventory Service]
B --> E[Notification Service]
C -->|PaymentProcessedEvent| F[Kafka: payment-events]
F --> G[Fulfillment Service]
F --> E
D -->|InventoryReservedEvent| H[Kafka: inventory-events]
G -->|OrderShippedEvent| I[Kafka: shipment-events]
I --> E
I --> J[Analytics Service]
H --> K[Warehouse Service]
style B fill:#f9f,stroke:#333,stroke-width:2px
style F fill:#f9f,stroke:#333,stroke-width:2px
style H fill:#f9f,stroke:#333,stroke-width:2px
style I fill:#f9f,stroke:#333,stroke-width:2px
L[Event-Driven Architecture Flow]
L -.->|Async Communication| B
L -.->|Loose Coupling| C
L -.->|Independent Scaling| D
L -.->|Event Replay| J
Mermaid Diagram (Choreography vs Orchestration):
flowchart LR
subgraph Choreography["Event Choreography (Kafka)"]
A1[Order Created] --> B1[Order Topic]
B1 --> C1[Payment Service]
B1 --> D1[Inventory Service]
B1 --> E1[Notification Service]
C1 --> F1[Payment Topic]
F1 --> G1[Fulfillment Service]
D1 --> H1[Inventory Topic]
H1 --> I1[Warehouse Service]
end
subgraph Orchestration["Orchestration Pattern"]
A2[Order Created] --> B2[Order Orchestrator]
B2 -->|Call| C2[Payment Service]
B2 -->|Call| D2[Inventory Service]
B2 -->|Call| E2[Notification Service]
C2 -->|Response| B2
D2 -->|Response| B2
B2 -->|Call| F2[Fulfillment Service]
end
style B1 fill:#f9f,stroke:#333,stroke-width:2px
style F1 fill:#f9f,stroke:#333,stroke-width:2px
style H1 fill:#f9f,stroke:#333,stroke-width:2px
style B2 fill:#fbb,stroke:#333,stroke-width:2px
Transactions
What is read_committed vs read_uncommitted isolation level?
The 30-Second Answer:
read_uncommitted allows consumers to see all messages including those in open transactions, while read_committed only shows messages from committed transactions. Use read_committed when consuming from transactional producers to maintain exactly-once semantics; use read_uncommitted for non-transactional workloads or when you need maximum performance and can tolerate seeing messages that might be aborted.
The 2-Minute Answer (If They Want More):
Kafka's isolation levels control the visibility of messages to consumers, similar to database isolation levels. The isolation.level consumer configuration determines which messages are readable based on their transactional state.
With read_uncommitted (the default for backward compatibility), consumers see all messages in the order they're written to the log, regardless of transaction state. This includes messages in ongoing transactions and even messages from transactions that will later be aborted. It provides the lowest latency but exposes consumers to "dirty reads": the application may process messages that belong to a transaction that is later aborted.
With read_committed, consumers only see messages from transactions that have been successfully committed. The broker returns data to these consumers only up to the last stable offset (LSO), which is the offset of the first message that still belongs to an open transaction, so nothing past an ongoing transaction is delivered until it commits or aborts. If a transaction aborts, the consumer filters out its messages using the aborted-transaction information in the fetch response, and they are never exposed to the application. This prevents dirty reads and is essential for maintaining exactly-once semantics in stream processing pipelines.
The tradeoff is latency: a read_committed consumer can be delayed by up to the duration of the oldest open transaction in a partition, because every message after the LSO is held back, including messages from transactions that have already committed. The holding back happens on the broker, so open transactions do not accumulate in consumer memory. The delay is bounded by the producer's transaction.timeout.ms, which is a good reason to keep transactions short.
Non-transactional messages (from non-transactional producers) are treated as committed and are visible to both isolation levels, although a read_committed consumer receives them only once the LSO has moved past them. This allows mixing transactional and non-transactional producers on the same topic, though it's generally not recommended for applications requiring strong consistency guarantees.
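The visibility rules above can be modeled with a small, dependency-free simulation. This is a sketch under simplifying assumptions, not Kafka's implementation: `LogEntry`, `TxnState`, and the helper methods are hypothetical, offsets are assumed to run from 0 without gaps, and the control markers that real Kafka writes to the log are replaced by a lookup map of transaction states.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class IsolationLevelSketch {
    enum TxnState { OPEN, COMMITTED, ABORTED }

    // One data record; txnId is null when the producer was not transactional.
    static final class LogEntry {
        final long offset;
        final String txnId;
        final String value;
        LogEntry(long offset, String txnId, String value) {
            this.offset = offset; this.txnId = txnId; this.value = value;
        }
    }

    // Last stable offset: offset of the first record in a still-open transaction, else log end.
    static long lastStableOffset(List<LogEntry> log, Map<String, TxnState> txns) {
        for (LogEntry e : log) {
            if (e.txnId != null && txns.get(e.txnId) == TxnState.OPEN) return e.offset;
        }
        return log.size();
    }

    // read_uncommitted: every record is returned, whatever its transaction state.
    static List<String> readUncommitted(List<LogEntry> log) {
        List<String> out = new ArrayList<>();
        for (LogEntry e : log) out.add(e.value);
        return out;
    }

    // read_committed: stop at the LSO and drop records from aborted transactions.
    static List<String> readCommitted(List<LogEntry> log, Map<String, TxnState> txns) {
        long lso = lastStableOffset(log, txns);
        List<String> out = new ArrayList<>();
        for (LogEntry e : log) {
            if (e.offset >= lso) break; // held back behind the open transaction
            boolean aborted = e.txnId != null && txns.get(e.txnId) == TxnState.ABORTED;
            if (!aborted) out.add(e.value);
        }
        return out;
    }

    static List<LogEntry> sampleLog() {
        return List.of(
            new LogEntry(0, "t1", "a"),  // t1 committed
            new LogEntry(1, "t2", "b"),  // t2 aborted
            new LogEntry(2, null, "c"),  // non-transactional
            new LogEntry(3, "t3", "d"),  // t3 still open
            new LogEntry(4, null, "e")); // non-transactional, but after the open transaction
    }

    static Map<String, TxnState> sampleTxns() {
        return Map.of("t1", TxnState.COMMITTED, "t2", TxnState.ABORTED, "t3", TxnState.OPEN);
    }

    public static void main(String[] args) {
        System.out.println(readUncommitted(sampleLog()));             // [a, b, c, d, e]
        System.out.println(readCommitted(sampleLog(), sampleTxns())); // [a, c]
    }
}
```

Note that record "e" is non-transactional yet still hidden from the read_committed reader, because it sits behind the open transaction t3.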
Isolation Level Comparison:
| Aspect | read_uncommitted | read_committed |
|---|---|---|
| Visibility | All messages including in-flight transactions | Only committed messages |
| Dirty Reads | Possible - may see messages later aborted | Not possible - guaranteed committed data |
| Latency | Lowest - immediate message visibility | Higher - waits for transaction commit |
| Use Case | Non-transactional workloads, maximum throughput | Exactly-once semantics, consistency requirements |
| Read Limit | High watermark | Last stable offset (LSO), the start of the oldest open transaction |
| Default | Yes (for backward compatibility) | No - must explicitly configure |
| Required For EOS | No | Yes - essential for exactly-once |
Code Example:
// Consumer with read_committed isolation
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("isolation.level", "read_committed"); // Only see committed messages
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
// Consumer with read_uncommitted isolation (default)
Properties unCommittedProps = new Properties();
unCommittedProps.put("bootstrap.servers", "localhost:9092");
unCommittedProps.put("group.id", "my-consumer-group-2");
unCommittedProps.put("isolation.level", "read_uncommitted"); // See all messages
unCommittedProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
unCommittedProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> uncommittedConsumer = new KafkaConsumer<>(unCommittedProps);
uncommittedConsumer.subscribe(Collections.singletonList("my-topic"));
Behavior Demonstration:
// Scenario: Transactional producer writes then aborts
KafkaProducer<String, String> producer = createTransactionalProducer();
producer.initTransactions();
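// Two consumers with different isolation levels (hypothetical helpers, assumed to be already
// subscribed to test-topic with partitions assigned; otherwise the first poll may return 0)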
KafkaConsumer<String, String> committedConsumer = createCommittedConsumer();
KafkaConsumer<String, String> uncommittedConsumer = createUncommittedConsumer();
// Producer starts transaction
producer.beginTransaction();
producer.send(new ProducerRecord<>("test-topic", "key1", "This message will be aborted"));
producer.flush();
// read_uncommitted consumer sees the message immediately
ConsumerRecords<String, String> uncommittedRecords =
uncommittedConsumer.poll(Duration.ofMillis(1000));
System.out.println("Uncommitted consumer sees: " + uncommittedRecords.count() + " messages");
// Output: Uncommitted consumer sees: 1 messages
// read_committed consumer sees nothing (transaction not committed)
ConsumerRecords<String, String> committedRecords =
committedConsumer.poll(Duration.ofMillis(1000));
System.out.println("Committed consumer sees: " + committedRecords.count() + " messages");
// Output: Committed consumer sees: 0 messages
// Producer aborts transaction
producer.abortTransaction();
// After abort, read_committed consumer still sees nothing
committedRecords = committedConsumer.poll(Duration.ofMillis(1000));
System.out.println("Committed consumer after abort: " + committedRecords.count() + " messages");
// Output: Committed consumer after abort: 0 messages
// The read_uncommitted consumer has already received the message. Kafka does not
// retract it: an abort marker is written to the log, and only read_committed
// consumers skip the aborted records.
Mermaid Diagram:
flowchart TD
P[Transactional Producer] --> T[Transaction Start]
T --> M1[Write Message 1]
T --> M2[Write Message 2]
T --> M3[Write Message 3]
M1 --> D{Transaction Decision}
M2 --> D
M3 --> D
D -->|Commit| CM[Write COMMIT Marker]
D -->|Abort| AM[Write ABORT Marker]
CM --> RC[read_committed Consumer]
CM --> RU[read_uncommitted Consumer]
AM --> RU2[read_uncommitted Consumer]
M1 -.->|Visible Immediately| RU
M2 -.->|Visible Immediately| RU
M3 -.->|Visible Immediately| RU
RC -->|Only After Commit| V1[Messages Visible]
RU --> V2[All Messages Visible]
RU2 --> V3[Aborted Messages Visible<br/>but marked as aborted]
style CM fill:#9f9
style AM fill:#f99
style RC fill:#9cf
style RU fill:#fcf
Troubleshooting and Common Issues
What are the common causes of data loss in Kafka and how do you prevent them?
The 30-Second Answer:
Data loss occurs from insufficient replication, premature message deletion, improper producer acknowledgments, or broker failures before replication completes. Prevent it by using acks=all for producers, min.insync.replicas=2, replication factor of 3+, disabling unclean leader election, and ensuring proper log retention configuration.
The 2-Minute Answer (If They Want More):
Kafka is designed for durability, but misconfigurations can lead to data loss. Common scenarios include: producer using acks=0 or acks=1 (message lost if broker fails before replication), insufficient replication factor allowing data loss when brokers fail, unclean leader election where out-of-sync replicas become leaders, log retention deleting data too early, consumer offset commits before processing (losing messages on failure), and disk failures without proper RAID configuration.
Producer-level prevention requires setting acks=all (wait for all in-sync replicas to acknowledge), enabling enable.idempotence=true so that retries do not create duplicates, and setting appropriate retries, retry.backoff.ms, and delivery.timeout.ms for transient failures. Broker-level prevention includes setting min.insync.replicas=2 (with acks=all, a write is rejected unless at least 2 in-sync replicas, the leader included, can store it), using replication factor of 3 or higher, disabling unclean leader election with unclean.leader.election.enable=false, and configuring appropriate log retention policies (log.retention.hours, log.retention.bytes).
Infrastructure considerations include using RAID for broker disks to prevent data loss from disk failures, monitoring broker health and replacing failed brokers quickly, provisioning enough disk space that brokers do not fail on full disks or force retention to be shortened, and using multiple availability zones for true fault tolerance. Consumer-level prevention involves committing offsets only after successful processing and using transactions for exactly-once processing when results are written back to Kafka. Writes to external systems need idempotent or transactional sinks instead, because Kafka transactions do not span them.
Trade-offs exist between durability and performance. acks=all with min.insync.replicas=2 provides strong durability but increases latency. For truly critical data, use replication factor of 3, disable unclean leader election, and implement end-to-end monitoring to detect data loss early. Regular testing of disaster recovery procedures ensures your configurations work as expected.
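The interaction between acks=all, min.insync.replicas, and replication factor reduces to simple arithmetic, sketched below. The class and method names are hypothetical; the rule they encode is that, with acks=all, a write is accepted only while the in-sync replica set (leader included) has at least min.insync.replicas members.

```java
// Illustrative arithmetic only; these helpers are not part of the Kafka API.
final class DurabilitySketch {
    // With acks=all, a write is accepted only if the ISR (leader included)
    // has at least min.insync.replicas members.
    static boolean acceptsWrite(int inSyncReplicas, int minInsyncReplicas) {
        return inSyncReplicas >= minInsyncReplicas;
    }

    // Number of replicas that can be lost while acks=all writes are still accepted.
    static int tolerableFailures(int replicationFactor, int minInsyncReplicas) {
        return Math.max(0, replicationFactor - minInsyncReplicas);
    }

    public static void main(String[] args) {
        int rf = 3, minIsr = 2;
        for (int isr = rf; isr >= 1; isr--) {
            System.out.println("ISR=" + isr + " acceptsWrite=" + acceptsWrite(isr, minIsr));
        }
        System.out.println("tolerable failures: " + tolerableFailures(rf, minIsr)); // 1
    }
}
```

With replication factor 3 and min.insync.replicas=2, one broker can fail without blocking producers; setting min.insync.replicas equal to the replication factor would make every single broker failure a write outage.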
Code Example:
# Producer configuration for maximum durability
# Wait for all in-sync replicas to acknowledge (required for no data loss)
acks=all
# Enable idempotence for automatic safe retries
enable.idempotence=true
# Retry on transient failures (safe with idempotence)
retries=2147483647
retry.backoff.ms=100
# Wait for brokers to respond (prevent silent failures)
request.timeout.ms=30000
delivery.timeout.ms=120000
# Maximum in-flight requests (at most 5 when idempotence is enabled; ordering is still preserved)
max.in.flight.requests.per.connection=5
# Broker/Topic configuration for durability
# Default replication factor for auto-created topics (3 recommended for production)
default.replication.factor=3
# Minimum in-sync replicas (2 prevents data loss with acks=all)
min.insync.replicas=2
# Disable unclean leader election (prevent out-of-sync replicas from becoming leaders)
unclean.leader.election.enable=false
# Log retention (prevent premature deletion)
log.retention.hours=168
log.retention.bytes=-1
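# Flush settings (optional: Kafka's documentation recommends the defaults and relying on
# replication for durability, since forced fsync reduces throughput)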
log.flush.interval.messages=10000
log.flush.interval.ms=1000
# Create topic with proper durability settings
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic critical-data \
--partitions 6 \
--replication-factor 3 \
--config min.insync.replicas=2 \
--config unclean.leader.election.enable=false \
--config retention.ms=604800000
# Verify topic configuration
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic critical-data
# Monitor under-replicated partitions (indicates potential data loss risk)
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --under-replicated-partitions
# Check broker logs for replica sync issues
grep "Replica.*lag" /var/log/kafka/server.log
// Producer with maximum durability
Properties props = new Properties();
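props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");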
props.put("acks", "all"); // Wait for all in-sync replicas
props.put("enable.idempotence", "true"); // Prevent duplicates during retries
props.put("retries", Integer.MAX_VALUE); // Retry until success
props.put("max.in.flight.requests.per.connection", 5);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Handle send errors properly
producer.send(new ProducerRecord<>("critical-data", "key", "value"),
(metadata, exception) -> {
if (exception != null) {
log.error("Failed to send message", exception);
// Implement fallback: save to database, alert, etc.
} else {
log.info("Message sent to partition {} at offset {}",
metadata.partition(), metadata.offset());
}
});
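// Consumer: Commit only after successful processing
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "critical-data-processor"); // hypothetical group name
consumerProps.put("enable.auto.commit", "false");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("critical-data"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
try {
for (ConsumerRecord<String, String> record : records) {
processMessage(record); // Process first
}
} catch (Exception e) {
log.error("Processing failed, rewinding to retry this batch", e);
// Skipping the commit is not enough: the in-memory position has already advanced,
// so seek back to the start of this batch to reprocess it on the next poll
for (TopicPartition tp : records.partitions()) {
consumer.seek(tp, records.records(tp).get(0).offset());
}
continue; // Do not commit
}
consumer.commitSync(); // Commit only after success
}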
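Why the commit order matters can be shown with a small simulation that needs no broker. This is a sketch under stated assumptions: `CommitOrderSketch.run` is a hypothetical model of a single consumer that crashes once while handling the record at `crashAt` and then restarts from its committed offset.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of offset commits; not Kafka client code.
final class CommitOrderSketch {
    // Returns the values that were fully processed across the crash and restart.
    static List<String> run(List<String> log, int crashAt, boolean commitBeforeProcessing) {
        List<String> processed = new ArrayList<>();
        int committed = 0;       // offset the consumer resumes from after a restart
        int position = 0;        // offset currently being handled
        boolean crashed = false; // the crash happens only once
        while (position < log.size()) {
            if (commitBeforeProcessing) committed = position + 1;
            if (position == crashAt && !crashed) {
                crashed = true;
                position = committed; // restart from the last committed offset
                continue;
            }
            processed.add(log.get(position));
            if (!commitBeforeProcessing) committed = position + 1;
            position++;
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> log = List.of("m0", "m1", "m2");
        System.out.println(run(log, 1, true));  // [m0, m2]      m1 is lost
        System.out.println(run(log, 1, false)); // [m0, m1, m2]  m1 is retried
    }
}
```

Committing after processing gives at-least-once delivery: nothing is lost, but a crash between processing and commit causes a record to be processed twice, so handlers should be idempotent.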
| Risk Factor | Impact | Prevention |
|---|---|---|
| acks=0 or acks=1 | Messages lost on broker failure | Use acks=all |
| Single replica | Total data loss if broker fails | Replication factor ≥ 3 |
| min.insync.replicas=1 | Accepts writes with single replica | Set to 2 or more |
| Unclean leader election | Out-of-sync replica becomes leader | Disable unclean election |
| Short retention | Data deleted before consumption | Configure adequate retention |
| Auto-commit before processing | Lost messages on consumer crash | Manual commit after processing |
| No disk redundancy | Data loss on disk failure | Use RAID, monitor disk health |
| Single availability zone | Total loss in zone failure | Multi-zone deployment |