Apache Kafka for Real-time Event Streaming
Event-driven architecture has become the backbone of modern distributed systems. At Agoda, we process millions of booking events daily, and at CP Axtra, we handle real-time inventory updates across thousands of retail locations. Here’s how Apache Kafka enabled us to build resilient, scalable event streaming platforms.
The Business Case
Agoda: Booking Platform
- 50M+ bookings per month
- Real-time inventory updates
- Multi-region deployment
- 99.99% availability requirement
CP Axtra: Retail Operations
- 10,000+ stores across Thailand
- Real-time inventory synchronization
- POS system integration
- Supply chain optimization
Architecture Overview
┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│  Producer   │────▶│    Kafka     │────▶│  Consumer   │
│  Services   │     │   Cluster    │     │  Services   │
└─────────────┘     └──────────────┘     └─────────────┘
                           │
                    ┌──────────────┐
                    │    Schema    │
                    │   Registry   │
                    └──────────────┘
Implementation Details
Topic Design Strategy
# Booking Events
booking-events:
  partitions: 12
  replication-factor: 3
  retention: 30 days

# Inventory Events
inventory-events:
  partitions: 24
  replication-factor: 3
  retention: 7 days

# Payment Events
payment-events:
  partitions: 6
  replication-factor: 3
  retention: 90 days
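Provisioning these topics can be scripted with the Kafka AdminClient. The sketch below is a minimal, illustrative version that creates the booking topic with the retention above converted to milliseconds; the bootstrap address is an assumption, and the other two topics follow the same pattern.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class TopicProvisioner {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // booking-events: 12 partitions, replication factor 3, 30 days of retention
            NewTopic bookingEvents = new NewTopic("booking-events", 12, (short) 3)
                    .configs(Map.of(TopicConfig.RETENTION_MS_CONFIG,
                            String.valueOf(30L * 24 * 60 * 60 * 1000)));

            // inventory-events and payment-events are created the same way
            admin.createTopics(List.of(bookingEvents)).all().get();
        }
    }
}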
Producer Configuration
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaBootstrapServers;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Durability: wait for all in-sync replicas, retry transient failures indefinitely
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

        // Throughput: batch up to 16 KB per partition, linger 5 ms, compress with snappy
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        return new DefaultKafkaProducerFactory<>(props);
    }
}
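Producer services publish through a KafkaTemplate built on this factory. The following is a minimal sketch, assuming Spring for Apache Kafka 3.x (where send() returns a CompletableFuture) and keying by booking id so events for the same booking stay in partition order.

@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
}

// Illustrative publish path in a producer service
public void publishBookingEvent(BookingEvent event) {
    kafkaTemplate.send("booking-events", event.getBookingId(), event)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to publish booking event {}", event.getBookingId(), ex);
                }
            });
}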
Consumer Implementation
@KafkaListener(topics = "booking-events", groupId = "booking-processor")
public void handleBookingEvent(
        @Payload BookingEvent event,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) long offset) {
    try {
        log.info("Processing booking event: {} from partition: {}, offset: {}",
                event.getBookingId(), partition, offset);

        // Process the event
        bookingService.processBookingEvent(event);

        // Update metrics
        meterRegistry.counter("booking.events.processed",
                "type", event.getEventType()).increment();
    } catch (Exception e) {
        log.error("Failed to process booking event: {}", event.getBookingId(), e);
        // Send to dead letter queue
        deadLetterService.send(event, e.getMessage());
    }
}
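The listener above runs inside a container that needs its own consumer factory. A minimal sketch of that wiring follows; the property values are illustrative defaults, not our production settings.

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaBootstrapServers;

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        // Only deserialize event classes from our own package
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.agoda.events");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Concurrency of 3 is illustrative; it should not exceed the partition count
        factory.setConcurrency(3);
        return factory;
    }
}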
Schema Evolution with Avro
{
  "type": "record",
  "name": "BookingEvent",
  "namespace": "com.agoda.events",
  "fields": [
    {"name": "bookingId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "hotelId", "type": "string"},
    {"name": "checkIn", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "checkOut", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "totalAmount", "type": "double"},
    {"name": "currency", "type": "string"},
    {"name": "status", "type": {"type": "enum", "name": "BookingStatus",
      "symbols": ["PENDING", "CONFIRMED", "CANCELLED"]}},
    {"name": "metadata", "type": ["null", "string"], "default": null}
  ]
}
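Wiring this schema in means swapping the JSON serializer for Confluent's Avro serializer and pointing it at the Schema Registry from the architecture diagram. A sketch of the producer-side change, with an illustrative registry URL:

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://schema-registry:8081");
// BACKWARD compatibility (the registry default) lets us add optional fields
// such as "metadata" above without breaking existing consumers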
Monitoring and Observability
Key Metrics
@Component
public class KafkaMetrics {

    private final MeterRegistry meterRegistry;

    public KafkaMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @EventListener
    public void handleProducerMetrics(ProducerMetricEvent event) {
        Gauge.builder("kafka.producer.batch.size.avg", event::getBatchSizeAvg)
                .register(meterRegistry);
        Gauge.builder("kafka.producer.record.send.rate", event::getRecordSendRate)
                .register(meterRegistry);
    }

    @EventListener
    public void handleConsumerMetrics(ConsumerMetricEvent event) {
        Gauge.builder("kafka.consumer.lag.max", event::getLagMax)
                .register(meterRegistry);
        Gauge.builder("kafka.consumer.records.consumed.rate", event::getRecordsConsumedRate)
                .register(meterRegistry);
    }
}
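The gauges above assume a component that surfaces client metrics as application events; Spring Kafka can also bind the native client metrics to Micrometer directly through its listener classes. A sketch, where producerProps() stands in for the property map shown earlier:

@Bean
public ProducerFactory<String, Object> producerFactory(MeterRegistry meterRegistry) {
    DefaultKafkaProducerFactory<String, Object> factory =
            new DefaultKafkaProducerFactory<>(producerProps());
    // Registers the native kafka.producer.* client metrics with Micrometer
    factory.addListener(new MicrometerProducerListener<>(meterRegistry));
    return factory;
}

The consumer side has a matching MicrometerConsumerListener that can be added to DefaultKafkaConsumerFactory in the same way.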
Alerting Rules
# High Consumer Lag
- alert: KafkaHighConsumerLag
expr: kafka_consumer_lag_max > 10000
for: 5m
labels:
severity: warning
annotations:
summary: "High consumer lag detected"
# Producer Error Rate
- alert: KafkaHighProducerErrorRate
expr: rate(kafka_producer_record_error_total[5m]) > 0.01
for: 2m
labels:
severity: critical
Performance Results
Before Kafka Implementation
- Message processing: 5,000 msg/sec
- Latency: 500ms average
- Downtime: 2-3 hours/month
- Data loss: Occasional during failures
After Kafka Implementation
- Message processing: 100,000 msg/sec
- Latency: 50ms average
- Downtime: 0 hours/month
- Data loss: Zero
Best Practices Learned
1. Partition Strategy
// Good: Distribute load evenly
String partitionKey = customerId + ":" + region;
// Bad: Creates hot partitions
String partitionKey = "all-events";
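The key matters because Kafka's default partitioner hashes the serialized key (murmur2) and takes it modulo the partition count, so every event sharing a key lands on the same partition and stays ordered. A short sketch of passing such a key through the template (the getters on the event are assumed):

// Same customer and region always hash to the same partition, preserving their order
String partitionKey = event.getCustomerId() + ":" + event.getRegion();
kafkaTemplate.send("booking-events", partitionKey, event);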
2. Error Handling
@RetryableTopic(
    attempts = "3",
    backoff = @Backoff(delay = 1000, multiplier = 2.0),
    dltStrategy = DltStrategy.FAIL_ON_ERROR
)
@KafkaListener(topics = "booking-events")
public void processBooking(BookingEvent event) {
    // Process with automatic retry and DLT
}
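Once the retries are exhausted, the record lands on a dead-letter topic that can be handled in the same listener class. A minimal sketch (the logging body is an assumption):

@DltHandler
public void handleDlt(BookingEvent event,
        @Header(KafkaHeaders.ORIGINAL_TOPIC) String originalTopic) {
    log.error("Booking event {} exhausted retries, originally from topic {}",
            event.getBookingId(), originalTopic);
}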
3. Exactly-Once Semantics
@Transactional
@KafkaListener(topics = "payment-events")
public void processPayment(PaymentEvent event) {
    // Database update and Kafka produce in same transaction
    paymentRepository.updateStatus(event.getPaymentId(), COMPLETED);
    kafkaTemplate.send("payment-completed", event);
}
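For the Kafka side of this to be transactional, the producer needs idempotence and a transactional id, and downstream consumers should only read committed records. A sketch of the settings involved (values are illustrative):

// Producer side: idempotence plus a transactional id enables Kafka transactions
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "payment-processor-tx");
// With DefaultKafkaProducerFactory the Spring idiom is:
// producerFactory.setTransactionIdPrefix("payment-processor-tx-");

// Consumer side: skip records from aborted or in-flight transactions
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");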
What’s Next?
In the next post, I’ll share our payment system optimization journey and how we achieved a 30% performance improvement through strategic caching and database optimization.
Interested in event streaming architecture? Let’s connect on LinkedIn to discuss your use case.