Apache Kafka for Real-time Event Streaming

Event-driven architecture has become the backbone of modern distributed systems. At Agoda, we process millions of booking events daily, and at CP Axtra, we handle real-time inventory updates across thousands of retail locations. Here’s how Apache Kafka enabled us to build resilient, scalable event streaming platforms.

The Business Case

Agoda: Booking Platform

  • 50M+ bookings per month
  • Real-time inventory updates
  • Multi-region deployment
  • 99.99% availability requirement

CP Axtra: Retail Operations

  • 10,000+ stores across Thailand
  • Real-time inventory synchronization
  • POS system integration
  • Supply chain optimization

Architecture Overview

┌─────────────┐    ┌──────────────┐    ┌─────────────┐
│   Producer  │───▶│    Kafka     │───▶│  Consumer   │
│  Services   │    │   Cluster    │    │  Services   │
└─────────────┘    └──────────────┘    └─────────────┘
                           │
                   ┌──────────────┐
                   │   Schema     │
                   │   Registry   │
                   └──────────────┘

Implementation Details

Topic Design Strategy

# Booking Events
booking-events:
  partitions: 12
  replication-factor: 3
  retention: 30 days
  
# Inventory Events  
inventory-events:
  partitions: 24
  replication-factor: 3
  retention: 7 days
  
# Payment Events
payment-events:
  partitions: 6
  replication-factor: 3
  retention: 90 days
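Partition counts like the ones above come from throughput math: divide the target throughput by what a single partition can sustain, and round up. A minimal sketch of that arithmetic — the throughput figures in it are hypothetical placeholders, not measured numbers from our clusters:

```java
public class PartitionSizing {

    // partitions ≈ ceil(target throughput / per-partition throughput), floor of 1
    static int partitionsFor(long targetMsgPerSec, long perPartitionMsgPerSec) {
        return (int) Math.max(1, (targetMsgPerSec + perPartitionMsgPerSec - 1)
                                 / perPartitionMsgPerSec);
    }

    public static void main(String[] args) {
        // e.g. a 100k msg/sec target with ~10k msg/sec per partition
        System.out.println(partitionsFor(100_000, 10_000)); // prints 10
        System.out.println(partitionsFor(115_000, 10_000)); // prints 12
    }
}
```

In practice you would also leave headroom for consumer parallelism, since a consumer group can run at most one consumer per partition.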

Producer Configuration

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaBootstrapServers;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Durability: wait for all in-sync replicas and retry indefinitely;
        // idempotence prevents duplicates when those retries kick in
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Throughput: batch up to 16 KB per partition, wait up to 5 ms
        // for a batch to fill, and compress batches with Snappy
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        return new DefaultKafkaProducerFactory<>(props);
    }
}

Consumer Implementation

@KafkaListener(topics = "booking-events", groupId = "booking-processor")
public void handleBookingEvent(
    @Payload BookingEvent event,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
    @Header(KafkaHeaders.OFFSET) long offset) {
    
    try {
        log.info("Processing booking event: {} from partition: {}, offset: {}", 
                 event.getBookingId(), partition, offset);
                 
        // Process the event
        bookingService.processBookingEvent(event);
        
        // Update metrics
        meterRegistry.counter("booking.events.processed",
                             "type", event.getEventType()).increment();
                             
    } catch (Exception e) {
        log.error("Failed to process booking event: {}", event.getBookingId(), e);
        
        // Send to dead letter queue
        deadLetterService.send(event, e.getMessage());
    }
}

Schema Evolution with Avro

{
  "type": "record",
  "name": "BookingEvent",
  "namespace": "com.agoda.events",
  "fields": [
    {"name": "bookingId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "hotelId", "type": "string"},
    {"name": "checkIn", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "checkOut", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "totalAmount", "type": "double"},
    {"name": "currency", "type": "string"},
    {"name": "status", "type": {"type": "enum", "name": "BookingStatus", 
                                "symbols": ["PENDING", "CONFIRMED", "CANCELLED"]}},
    {"name": "metadata", "type": ["null", "string"], "default": null}
  ]
}
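The reason the nullable metadata field carries "default": null is backward compatibility: a reader using the new schema can still decode records written before the field existed, filling in the default. This toy sketch illustrates the rule with plain maps — it is not the Avro API, just the behavior the Schema Registry's compatibility check guarantees:

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only — not Avro itself. A reader on the new schema fills
// "metadata" with its declared default (null) for old records that lack it.
public class DefaultFillDemo {

    static Map<String, Object> readWithNewSchema(Map<String, Object> oldRecord) {
        Map<String, Object> decoded = new HashMap<>(oldRecord);
        // New optional field: absent in old records, so apply its default
        decoded.putIfAbsent("metadata", null);
        return decoded;
    }

    public static void main(String[] args) {
        Map<String, Object> oldRecord = new HashMap<>();
        oldRecord.put("bookingId", "B-123");

        Map<String, Object> decoded = readWithNewSchema(oldRecord);
        System.out.println(decoded.containsKey("metadata")); // prints true
        System.out.println(decoded.get("metadata"));         // prints null
    }
}
```

Adding a required field (no default) would break this, which is why the registry rejects such changes under backward compatibility.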

Monitoring and Observability

Key Metrics

@Component
public class KafkaMetrics {

    private final MeterRegistry meterRegistry;

    public KafkaMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @EventListener
    public void handleProducerMetrics(ProducerMetricEvent event) {
        Gauge.builder("kafka.producer.batch.size.avg", event::getBatchSizeAvg)
             .register(meterRegistry);

        Gauge.builder("kafka.producer.record.send.rate", event::getRecordSendRate)
             .register(meterRegistry);
    }

    @EventListener
    public void handleConsumerMetrics(ConsumerMetricEvent event) {
        Gauge.builder("kafka.consumer.lag.max", event::getLagMax)
             .register(meterRegistry);

        Gauge.builder("kafka.consumer.records.consumed.rate", event::getRecordsConsumedRate)
             .register(meterRegistry);
    }
}
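The lag metric above is worth spelling out: per partition, lag is the broker's log-end offset minus the group's committed offset, and the figure worth alerting on is the maximum across partitions. A small self-contained sketch with made-up offsets:

```java
import java.util.Map;

public class ConsumerLag {

    // Lag per partition = log-end offset (next offset to be written)
    //                   - committed offset (next offset the group will read)
    static long maxLag(Map<Integer, Long> logEndOffsets, Map<Integer, Long> committed) {
        long max = 0;
        for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
            long lag = e.getValue() - committed.getOrDefault(e.getKey(), 0L);
            max = Math.max(max, lag);
        }
        return max;
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = Map.of(0, 5_000L, 1, 7_500L, 2, 6_200L);
        Map<Integer, Long> committed = Map.of(0, 4_990L, 1, 7_450L, 2, 3_000L);
        System.out.println(ConsumerLag.maxLag(end, committed)); // prints 3200
    }
}
```

A single slow partition (partition 2 here) can dominate the max while the average looks healthy, which is why we alert on the max rather than the mean.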

Alerting Rules

# High Consumer Lag
- alert: KafkaHighConsumerLag
  expr: kafka_consumer_lag_max > 10000
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "High consumer lag detected"
    
# Producer Error Rate
- alert: KafkaHighProducerErrorRate
  expr: rate(kafka_producer_record_error_total[5m]) > 0.01
  for: 2m
  labels:
    severity: critical
  annotations:
    summary: "Elevated producer record error rate"

Performance Results

Before Kafka Implementation

  • Message processing: 5,000 msg/sec
  • Latency: 500ms average
  • Downtime: 2-3 hours/month
  • Data loss: Occasional during failures

After Kafka Implementation

  • Message processing: 100,000 msg/sec
  • Latency: 50ms average
  • Downtime: 0 hours/month
  • Data loss: Zero

Best Practices Learned

1. Partition Strategy

// Good: Distribute load evenly
String partitionKey = customerId + ":" + region;

// Bad: Creates hot partitions  
String partitionKey = "all-events";
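The effect of those two keying strategies can be demonstrated without a broker. This sketch uses String.hashCode() modulo the partition count as a stand-in for Kafka's default partitioner (the real one murmur2-hashes the serialized key, but the distribution effect is the same):

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionSpread {

    // Simplified stand-in for Kafka's default partitioner
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 12;
        Map<Integer, Integer> good = new HashMap<>();
        Map<Integer, Integer> bad = new HashMap<>();

        for (int customer = 0; customer < 1_000; customer++) {
            // Good: per-customer key spreads events across partitions
            good.merge(partitionFor("customer-" + customer + ":SEA", partitions), 1, Integer::sum);
            // Bad: constant key routes every event to one partition
            bad.merge(partitionFor("all-events", partitions), 1, Integer::sum);
        }

        System.out.println("good key uses " + good.size() + " partitions");
        System.out.println("bad key uses " + bad.size() + " partition");
    }
}
```

With the constant key, one partition (and therefore one consumer) absorbs the entire topic's load while the others sit idle.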

2. Error Handling

@RetryableTopic(
    attempts = "3",
    backoff = @Backoff(delay = 1000, multiplier = 2.0),
    dltStrategy = DltStrategy.FAIL_ON_ERROR
)
@KafkaListener(topics = "booking-events")
public void processBooking(BookingEvent event) {
    // Process with automatic retry and DLT
}
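With attempts = "3" and @Backoff(delay = 1000, multiplier = 2.0), the waits between deliveries grow geometrically. A quick sketch of that schedule — our own arithmetic, not Spring Kafka's internals, assuming attempts counts the first delivery plus the retries:

```java
import java.util.Arrays;

public class BackoffSchedule {

    // Wait before retry i (1-based): initialDelay * multiplier^(i-1)
    static long[] retryDelaysMs(int attempts, long initialDelayMs, double multiplier) {
        int retries = Math.max(0, attempts - 1); // first delivery is attempt 1
        long[] delays = new long[retries];
        double d = initialDelayMs;
        for (int i = 0; i < retries; i++) {
            delays[i] = (long) d;
            d *= multiplier;
        }
        return delays;
    }

    public static void main(String[] args) {
        // Matches the annotation above: waits of 1000 ms, then 2000 ms
        System.out.println(Arrays.toString(retryDelaysMs(3, 1000, 2.0)));
    }
}
```

After the final attempt fails, the record moves to the dead-letter topic instead of blocking the partition.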

3. Exactly-Once Semantics

// Note: @Transactional alone is not enough — the producer factory needs a
// transactional.id and a KafkaTransactionManager so the send below joins
// the same transaction as the database update
@Transactional
@KafkaListener(topics = "payment-events")
public void processPayment(PaymentEvent event) {
    // Database update and Kafka produce commit or roll back together
    paymentRepository.updateStatus(event.getPaymentId(), COMPLETED);
    kafkaTemplate.send("payment-completed", event);
}

What’s Next?

In the next post, I’ll share our payment system optimization journey and how we achieved 30% performance improvement through strategic caching and database optimization.


Interested in event streaming architecture? Let’s connect on LinkedIn to discuss your use case.