When your CEO asks why conversion rates dropped 15% overnight and you’re staring at yesterday’s batch reports, you realize real-time analytics isn’t a luxury - it’s survival. After our third “we need this data NOW” emergency, I convinced leadership to let me rebuild our analytics pipeline from scratch.
Why Batch Processing Was Killing Us
Our old system was embarrassingly slow:
- 6+ hours to generate basic reports
- Executives refreshing dashboards that showed yesterday’s problems
- No way to catch issues before they became disasters
- Manual ETL jobs failing at 3 AM (and nobody knowing until morning)
When Black Friday traffic spiked and we couldn’t tell if our site was handling it or melting down, I knew we had to fix this immediately.
Architecture Overview
High-Level Data Flow
Web/Mobile Apps → Kafka → Logstash → Elasticsearch → Kibana
                             ↓
                     Stream Processing
                             ↓
                     Real-time Alerts
Detailed System Architecture
from kafka import KafkaProducer
import json
from typing import Dict, List
import logging
from datetime import datetime

class ClickstreamEvent:
    def __init__(self, user_id: str, session_id: str, event_type: str,
                 page_url: str, timestamp: datetime, metadata: Dict):
        self.user_id = user_id
        self.session_id = session_id
        self.event_type = event_type
        self.page_url = page_url
        self.timestamp = timestamp
        self.metadata = metadata

    def to_dict(self) -> Dict:
        return {
            'user_id': self.user_id,
            'session_id': self.session_id,
            'event_type': self.event_type,
            'page_url': self.page_url,
            '@timestamp': self.timestamp.isoformat(),
            'metadata': self.metadata,
            'date': self.timestamp.date().isoformat(),
            'hour': self.timestamp.hour,
            'day_of_week': self.timestamp.weekday()
        }

class ClickstreamProducer:
    def __init__(self, kafka_servers: List[str], topic: str):
        self.producer = KafkaProducer(
            bootstrap_servers=kafka_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',        # Wait for all replicas
            retries=3,
            batch_size=16384,
            linger_ms=10,      # Small batching for low latency
            compression_type='snappy'
        )
        self.topic = topic

    def send_event(self, event: ClickstreamEvent):
        """Send clickstream event to Kafka"""
        try:
            # Use user_id as key for consistent partitioning
            future = self.producer.send(
                self.topic,
                value=event.to_dict(),
                key=event.user_id
            )
            # Optional: wait for confirmation (blocks)
            # record_metadata = future.get(timeout=10)
        except Exception as e:
            logging.error(f"Failed to send event: {e}")
            # Implement dead letter queue or retry logic
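Here's roughly how the producer gets wired up; the broker list matches the cluster configs below, and the event values are made up for illustration:

from datetime import datetime, timezone

producer = ClickstreamProducer(
    kafka_servers=['kafka1:9092', 'kafka2:9092'],
    topic='clickstream-events'
)

producer.send_event(ClickstreamEvent(
    user_id='u-12345',
    session_id='s-67890',
    event_type='page_view',
    page_url='https://shop.example.com/product/42?ref=email',
    timestamp=datetime.now(timezone.utc),
    metadata={'user_agent': 'Mozilla/5.0', 'ip_address': '203.0.113.7'}
))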
Kafka Configuration for High Throughput
Topic Configuration
# Create topic with optimal settings for clickstream data
kafka-topics.sh --create \
  --topic clickstream-events \
  --bootstrap-server kafka:9092 \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config compression.type=snappy \
  --config max.message.bytes=1048576 \
  --config segment.ms=86400000
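Logstash is the main consumer in this pipeline, but a bare-bones Python consumer is handy for sanity-checking the topic. A minimal sketch, with a group id I chose arbitrarily for debugging:

from kafka import KafkaConsumer
import json

# Debug-only consumer; in production Logstash's kafka input plays this role
consumer = KafkaConsumer(
    'clickstream-events',
    bootstrap_servers=['kafka1:9092', 'kafka2:9092'],
    group_id='debug-clickstream',
    auto_offset_reset='latest',
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

for message in consumer:
    event = message.value
    print(f"partition={message.partition} offset={message.offset} "
          f"type={event.get('event_type')} user={event.get('user_id')}")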
Producer Optimization
class OptimizedKafkaProducer:
    def __init__(self, kafka_servers: List[str]):
        self.producer = KafkaProducer(
            bootstrap_servers=kafka_servers,
            # Serialization
            value_serializer=lambda v: json.dumps(v, separators=(',', ':')).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            # Reliability settings
            acks='all',
            retries=3,
            max_in_flight_requests_per_connection=5,
            # Idempotence needs a recent kafka-python release; older
            # versions reject this option, so drop it if unsupported
            enable_idempotence=True,
            # Performance settings
            batch_size=32768,        # 32KB batches
            linger_ms=5,             # Low latency
            compression_type='snappy',
            buffer_memory=67108864,  # 64MB buffer
            # Timeout settings
            request_timeout_ms=30000,
            delivery_timeout_ms=120000
        )

    def send_batch(self, events: List[ClickstreamEvent]):
        """Send batch of events efficiently"""
        futures = []
        for event in events:
            future = self.producer.send(
                'clickstream-events',
                value=event.to_dict(),
                key=event.user_id
            )
            futures.append(future)

        # Flush to ensure delivery
        self.producer.flush()

        # Check for failures
        failed_events = []
        for i, future in enumerate(futures):
            try:
                future.get(timeout=1)
            except Exception as e:
                logging.error(f"Failed to send event {i}: {e}")
                failed_events.append(events[i])

        return failed_events
Logstash Pipeline Configuration
Pipeline Definition
# /etc/logstash/conf.d/clickstream.conf
input {
  kafka {
    # bootstrap_servers takes a comma-separated string, not an array
    bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
    topics => ["clickstream-events"]
    group_id => "logstash-clickstream"
    consumer_threads => 4
    decorate_events => true
    codec => json
  }
}

filter {
  # Parse timestamp
  date {
    match => [ "@timestamp", "ISO8601" ]
  }

  # Extract URL components
  if [page_url] {
    grok {
      match => {
        "page_url" => "https?://[^/]+(?<url_path>/[^?#]*)?(?:\?(?<query_params>[^#]*))?(?:#(?<fragment>.*))?"
      }
    }
  }

  # Parse user agent
  if [metadata][user_agent] {
    useragent {
      source => "[metadata][user_agent]"
      target => "user_agent_parsed"
    }
  }

  # GeoIP lookup
  if [metadata][ip_address] {
    geoip {
      source => "[metadata][ip_address]"
      target => "geoip"
    }
  }

  # Add derived fields
  ruby {
    code => "
      # @timestamp is already a LogStash::Timestamp, so compare as floats
      event.set('processing_time', Time.now.to_f - event.get('@timestamp').to_f)

      # Session duration would need cross-event state; we track that in the
      # stream processor instead

      # Custom business logic
      if event.get('event_type') == 'purchase'
        event.set('is_conversion', true)
        event.set('revenue', event.get('[metadata][amount]') || 0)
      end
    "
  }

  # Remove sensitive fields
  mutate {
    remove_field => ["[metadata][ip_address]"]
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch1:9200", "elasticsearch2:9200", "elasticsearch3:9200"]
    index => "clickstream-%{+YYYY.MM.dd}"
    template_name => "clickstream"
    template => "/etc/logstash/templates/clickstream-template.json"
    # Note: flush_size, idle_flush_time, and per-output workers were removed
    # in modern Logstash; tune pipeline.batch.size in logstash.yml instead
  }

  # Also send to real-time processing
  http {
    url => "http://stream-processor:8080/events"
    http_method => "post"
    format => "json"
    pool_max => 50
  }
}
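The http output above needs something listening at stream-processor:8080/events. A minimal receiver sketch, using Flask as an illustrative choice (our actual service may differ; StreamProcessor is defined in the stream-processing section below):

from flask import Flask, request, jsonify
import redis

app = Flask(__name__)
processor = StreamProcessor(redis.Redis(host='redis', port=6379))

@app.route('/events', methods=['POST'])
def receive_events():
    payload = request.get_json(force=True)
    # Logstash's http output with format => json sends one event per request
    events = payload if isinstance(payload, list) else [payload]
    for event in events:
        processor.process_event(event)
    return jsonify({'accepted': len(events)}), 202

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)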
Elasticsearch Optimization
Index Template
{
  "index_patterns": ["clickstream-*"],
  "settings": {
    "number_of_shards": 6,
    "number_of_replicas": 1,
    "refresh_interval": "5s",
    "index.codec": "best_compression",
    "index.max_result_window": 50000,
    "index.lifecycle.name": "clickstream-policy",
    "index.lifecycle.rollover_alias": "clickstream"
  },
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date",
        "format": "strict_date_optional_time||epoch_millis"
      },
      "user_id": { "type": "keyword", "store": true },
      "session_id": { "type": "keyword" },
      "event_type": { "type": "keyword", "store": true },
      "page_url": {
        "type": "text",
        "analyzer": "url_analyzer",
        "fields": {
          "keyword": { "type": "keyword", "ignore_above": 2048 }
        }
      },
      "url_path": { "type": "keyword", "store": true },
      "query_params": { "type": "text", "index": false },
      "user_agent_parsed": {
        "properties": {
          "name": { "type": "keyword" },
          "version": { "type": "keyword" },
          "os": { "type": "keyword" },
          "device": { "type": "keyword" }
        }
      },
      "geoip": {
        "properties": {
          "country_name": { "type": "keyword" },
          "city_name": { "type": "keyword" },
          "location": { "type": "geo_point" }
        }
      },
      "is_conversion": { "type": "boolean" },
      "revenue": { "type": "double" },
      "processing_time": { "type": "double" }
    }
  }
}
Custom Analyzers
{
  "settings": {
    "analysis": {
      "analyzer": {
        "url_analyzer": {
          "type": "custom",
          "tokenizer": "uax_url_email",
          "filter": ["lowercase", "stop"]
        }
      }
    }
  }
}
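One gotcha: the url_analyzer referenced by the mappings must live in the same template's settings, or index creation fails. A sketch that merges the two JSON files above and installs the legacy template via the Python client (the analyzer file name is a placeholder; the template name matches the Logstash output):

import json
from elasticsearch import Elasticsearch

es = Elasticsearch(['http://elasticsearch1:9200'])

with open('/etc/logstash/templates/clickstream-template.json') as f:
    template = json.load(f)
with open('clickstream-analyzers.json') as f:
    analyzers = json.load(f)

# Merge the analysis settings into the same template so the
# url_analyzer actually exists when indices are created
template['settings'].update(analyzers['settings'])
es.indices.put_template(name='clickstream', body=template)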
Index Lifecycle Management
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "5GB",
            "max_age": "1d",
            "max_docs": 10000000
          },
          "set_priority": { "priority": 100 }
        }
      },
      "warm": {
        "min_age": "2d",
        "actions": {
          "allocate": { "number_of_replicas": 0 },
          "forcemerge": { "max_num_segments": 1 },
          "set_priority": { "priority": 50 }
        }
      },
      "cold": {
        "min_age": "7d",
        "actions": {
          "allocate": { "include": { "box_type": "cold" } },
          "set_priority": { "priority": 0 }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": { "delete": {} }
      }
    }
  }
}
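To activate the policy, it has to be registered, and because the hot phase uses rollover, a bootstrap index must own the write alias. A sketch with the Python client (the local file name is a placeholder):

import json
from elasticsearch import Elasticsearch

es = Elasticsearch(['http://elasticsearch1:9200'])

# Register the lifecycle policy (assumes the JSON above is saved locally)
with open('clickstream-policy.json') as f:
    policy = json.load(f)
es.ilm.put_lifecycle(policy='clickstream-policy', body=policy)

# Rollover requires a bootstrap index that owns the write alias named
# by index.lifecycle.rollover_alias in the template
es.indices.create(
    index='clickstream-000001',
    body={'aliases': {'clickstream': {'is_write_index': True}}}
)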
Real-Time Stream Processing
Event Aggregation Engine
from collections import defaultdict, deque
import json
import logging
import time
from typing import Dict, List, Callable
import redis

class StreamProcessor:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.aggregators = {}
        self.window_size = 300    # 5 minutes
        self.slide_interval = 60  # 1 minute
        self.event_buffer = deque()
        self.running = False

    def register_aggregator(self, name: str, aggregator_func: Callable):
        """Register custom aggregation function"""
        self.aggregators[name] = aggregator_func

    def process_event(self, event: Dict):
        """Process incoming event and update aggregations"""
        current_time = time.time()

        # Add to event buffer
        self.event_buffer.append({
            **event,
            'processing_timestamp': current_time
        })

        # Remove old events outside window
        cutoff_time = current_time - self.window_size
        while self.event_buffer and self.event_buffer[0]['processing_timestamp'] < cutoff_time:
            self.event_buffer.popleft()

        # Update all aggregators
        for name, aggregator in self.aggregators.items():
            try:
                result = aggregator(list(self.event_buffer), current_time)
                self.store_aggregation(name, result, current_time)
            except Exception as e:
                logging.error(f"Aggregator {name} failed: {e}")

    def store_aggregation(self, aggregator_name: str, result: Dict, timestamp: float):
        """Store aggregation result in Redis for real-time access"""
        key = f"agg:{aggregator_name}:{int(timestamp // self.slide_interval)}"

        # Redis hashes only hold flat values, so JSON-encode nested structures
        flat = {k: json.dumps(v) if isinstance(v, (dict, list)) else v
                for k, v in result.items()}

        # Store with TTL
        self.redis.hset(key, mapping=flat)
        self.redis.expire(key, self.window_size * 2)  # Keep longer than window

        # Also store latest result
        latest_key = f"latest:agg:{aggregator_name}"
        self.redis.hset(latest_key, mapping={**flat, 'timestamp': timestamp})

# Custom aggregation functions
def page_view_aggregator(events: List[Dict], current_time: float) -> Dict:
    """Aggregate page view statistics"""
    page_views = defaultdict(int)
    unique_users = defaultdict(set)
    browsers = defaultdict(int)

    for event in events:
        if event.get('event_type') == 'page_view':
            page = event.get('url_path', 'unknown')
            user_id = event.get('user_id')
            browser = event.get('user_agent_parsed', {}).get('name', 'unknown')

            page_views[page] += 1
            unique_users[page].add(user_id)
            browsers[browser] += 1

    # Convert sets to counts
    unique_user_counts = {page: len(users) for page, users in unique_users.items()}

    return {
        'total_page_views': sum(page_views.values()),
        'unique_pages': len(page_views),
        'top_pages': dict(sorted(page_views.items(), key=lambda x: x[1], reverse=True)[:10]),
        'unique_users_by_page': unique_user_counts,
        'browser_distribution': dict(browsers),
        'window_start': current_time - 300,
        'window_end': current_time
    }

def conversion_funnel_aggregator(events: List[Dict], current_time: float) -> Dict:
    """Track conversion funnel metrics"""
    funnel_steps = {
        'landing': 0,
        'product_view': 0,
        'add_to_cart': 0,
        'checkout_start': 0,
        'purchase': 0
    }
    user_journeys = defaultdict(set)

    for event in events:
        event_type = event.get('event_type')
        user_id = event.get('user_id')

        if event_type == 'page_view' and event.get('url_path', '').startswith('/product'):
            funnel_steps['product_view'] += 1
            user_journeys[user_id].add('product_view')
        elif event_type == 'add_to_cart':
            funnel_steps['add_to_cart'] += 1
            user_journeys[user_id].add('add_to_cart')
        elif event_type == 'checkout_start':
            funnel_steps['checkout_start'] += 1
            user_journeys[user_id].add('checkout_start')
        elif event_type == 'purchase':
            funnel_steps['purchase'] += 1
            user_journeys[user_id].add('purchase')

    # Calculate conversion rates
    total_users = len(user_journeys)
    conversion_rates = {}
    if total_users > 0:
        for step in funnel_steps:
            users_in_step = len([uid for uid, journey in user_journeys.items() if step in journey])
            conversion_rates[f"{step}_rate"] = users_in_step / total_users

    return {
        'funnel_counts': funnel_steps,
        'conversion_rates': conversion_rates,
        'total_users_in_funnel': total_users
    }
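Wiring it together looks roughly like this; the Redis host and the sample event are made up:

import redis

processor = StreamProcessor(redis.Redis(host='redis', port=6379))
processor.register_aggregator('page_views', page_view_aggregator)
processor.register_aggregator('conversion_funnel', conversion_funnel_aggregator)

processor.process_event({
    'event_type': 'page_view',
    'user_id': 'u-12345',
    'url_path': '/product/42',
    'user_agent_parsed': {'name': 'Chrome'}
})

# Dashboards and alerting can then read the latest window straight from Redis
latest = processor.redis.hgetall('latest:agg:page_views')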
Kibana Dashboard Configuration
Real-Time Dashboard
{
  "version": "7.15.0",
  "objects": [
    {
      "id": "realtime-clickstream-dashboard",
      "type": "dashboard",
      "attributes": {
        "title": "Real-time Clickstream Analytics",
        "hits": 0,
        "description": "Live view of user interactions and business metrics",
        "panelsJSON": "[{\"version\":\"7.15.0\",\"panelIndex\":\"1\",\"panelRefName\":\"panel_1\",\"embeddableConfig\":{\"title\":\"Events per Minute\"}},{\"version\":\"7.15.0\",\"panelIndex\":\"2\",\"panelRefName\":\"panel_2\",\"embeddableConfig\":{\"title\":\"Top Pages\"}}]",
        "optionsJSON": "{\"useMargins\":true,\"syncColors\":false,\"hidePanelTitles\":false}",
        "timeRestore": false,
        "refreshInterval": {
          "pause": false,
          "value": 5000
        }
      }
    }
  ]
}
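Kibana 7.x still ships the legacy dashboard import endpoint, which accepts this export format directly. A sketch of pushing it with requests (the host and file name are assumptions; newer Kibana versions use the ndjson saved-objects _import API instead):

import json
import requests

with open('realtime-dashboard.json') as f:
    export = json.load(f)

resp = requests.post(
    'http://kibana:5601/api/kibana/dashboards/import',
    headers={'kbn-xsrf': 'true', 'Content-Type': 'application/json'},
    data=json.dumps(export),
    timeout=10
)
resp.raise_for_status()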
Custom Visualizations
class KibanaVisualizationManager:
    def __init__(self, elasticsearch_client):
        self.es = elasticsearch_client

    def create_real_time_metrics_visualization(self):
        """Create visualization showing real-time metrics"""
        vis_config = {
            "title": "Real-time Event Metrics",
            "type": "line",
            "params": {
                "grid": {"categoryLines": False, "style": {"color": "#eee"}},
                "categoryAxes": [{
                    "id": "CategoryAxis-1",
                    "type": "category",
                    "position": "bottom",
                    "show": True,
                    "style": {},
                    "scale": {"type": "linear"},
                    "labels": {"show": True, "truncate": 100},
                    "title": {}
                }],
                "valueAxes": [{
                    "id": "ValueAxis-1",
                    "name": "LeftAxis-1",
                    "type": "value",
                    "position": "left",
                    "show": True,
                    "style": {},
                    "scale": {"type": "linear", "mode": "normal"},
                    "labels": {"show": True, "rotate": 0, "filter": False, "truncate": 100},
                    "title": {"text": "Events per minute"}
                }],
                "seriesParams": [{
                    "show": "true",
                    "type": "line",
                    "mode": "normal",
                    "data": {"label": "Events", "id": "1"},
                    "valueAxis": "ValueAxis-1",
                    "drawLinesBetweenPoints": True,
                    "showCircles": True
                }],
                "addTooltip": True,
                "addLegend": True,
                "legendPosition": "right",
                "times": [],
                "addTimeMarker": False
            },
            "aggs": [{
                "id": "1",
                "type": "count",
                "schema": "metric",
                "params": {}
            }, {
                "id": "2",
                "type": "date_histogram",
                "schema": "segment",
                "params": {
                    "field": "@timestamp",
                    "interval": "auto",
                    "customInterval": "2h",
                    "min_doc_count": 1,
                    "extended_bounds": {}
                }
            }]
        }
        return vis_config

    def create_geographic_heatmap(self):
        """Create geographic visualization of user activity"""
        return {
            "title": "User Activity Heatmap",
            "type": "tile_map",
            "params": {
                "colorSchema": "Yellow to Red",
                "mapType": "Scaled Circle Markers",
                "isDesaturated": True,
                "addTooltip": True,
                "heatClusterSize": 1.5,
                "legendPosition": "bottomright",
                "mapZoom": 2,
                "mapCenter": [0, 0],
                "wms": {
                    "enabled": False,
                    "url": "",
                    "options": {
                        "format": "image/png",
                        "transparent": True
                    }
                }
            },
            "aggs": [{
                "id": "1",
                "type": "count",
                "schema": "metric",
                "params": {}
            }, {
                "id": "2",
                "type": "geohash_grid",
                "schema": "segment",
                "params": {
                    "field": "geoip.location",
                    "autoPrecision": True,
                    "precision": 2,
                    "useGeocentroid": True
                }
            }]
        }
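Since these configs are plain dicts, they can be pushed through Kibana's saved objects API. A sketch, with a visualization id I made up and a placeholder index-pattern reference:

import json
import requests

manager = KibanaVisualizationManager(None)  # ES client not needed to build configs
vis = manager.create_real_time_metrics_visualization()

requests.post(
    'http://kibana:5601/api/saved_objects/visualization/realtime-event-metrics',
    headers={'kbn-xsrf': 'true', 'Content-Type': 'application/json'},
    json={
        'attributes': {
            'title': vis['title'],
            'visState': json.dumps(vis),
            'kibanaSavedObjectMeta': {
                # 'index' should be the id of a saved index pattern; placeholder here
                'searchSourceJSON': json.dumps({'index': 'clickstream-*', 'query': {'match_all': {}}})
            }
        }
    },
    timeout=10
).raise_for_status()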
Monitoring and Alerting
Performance Monitoring
import logging
import requests
from typing import Dict
from elasticsearch import Elasticsearch
from kafka import KafkaAdminClient, KafkaConsumer
from prometheus_client import Gauge, Counter, Histogram

class ELKStackMonitor:
    def __init__(self, es_client: Elasticsearch,
                 kafka_admin: KafkaAdminClient,
                 kafka_consumer: KafkaConsumer):
        self.es = es_client
        self.kafka_admin = kafka_admin
        self.kafka_consumer = kafka_consumer  # used only to read end offsets

        # Prometheus metrics
        self.events_processed = Counter(
            'clickstream_events_processed_total',
            'Total events processed',
            ['stage', 'status']
        )
        self.processing_latency = Histogram(
            'clickstream_processing_latency_seconds',
            'Event processing latency',
            ['stage']
        )
        self.elasticsearch_query_time = Histogram(
            'elasticsearch_query_duration_seconds',
            'Elasticsearch query duration',
            ['index_pattern', 'query_type']
        )
        self.kafka_lag = Gauge(
            'kafka_consumer_lag',
            'Kafka consumer lag',
            ['topic', 'partition', 'consumer_group']
        )

    def check_elasticsearch_health(self) -> Dict:
        """Monitor Elasticsearch cluster health"""
        health = self.es.cluster.health()
        stats = self.es.cluster.stats()
        return {
            'cluster_status': health['status'],
            'number_of_nodes': health['number_of_nodes'],
            'active_primary_shards': health['active_primary_shards'],
            'active_shards': health['active_shards'],
            'relocating_shards': health['relocating_shards'],
            'initializing_shards': health['initializing_shards'],
            'unassigned_shards': health['unassigned_shards'],
            'disk_usage_percent': self.get_disk_usage_percent(stats),
            'memory_usage_percent': self.get_memory_usage_percent(stats)
        }

    @staticmethod
    def get_disk_usage_percent(stats: Dict) -> float:
        """Derive cluster-wide disk usage from _cluster/stats"""
        fs = stats['nodes']['fs']
        return 100.0 * (fs['total_in_bytes'] - fs['free_in_bytes']) / fs['total_in_bytes']

    @staticmethod
    def get_memory_usage_percent(stats: Dict) -> float:
        """Derive cluster-wide JVM heap usage from _cluster/stats"""
        mem = stats['nodes']['jvm']['mem']
        return 100.0 * mem['heap_used_in_bytes'] / mem['heap_max_in_bytes']

    def check_kafka_health(self) -> Dict:
        """Monitor Kafka consumer lag for the Logstash group"""
        # Committed offsets per partition for the consumer group
        committed = self.kafka_admin.list_consumer_group_offsets('logstash-clickstream')
        # Latest log-end offsets for the same partitions
        end_offsets = self.kafka_consumer.end_offsets(list(committed.keys()))

        lag_info = {}
        for tp, offset_meta in committed.items():
            lag = end_offsets[tp] - offset_meta.offset
            lag_info[f"{tp.topic}_{tp.partition}"] = lag

            # Update Prometheus metric
            self.kafka_lag.labels(
                topic=tp.topic,
                partition=tp.partition,
                consumer_group='logstash-clickstream'
            ).set(lag)

        return {
            'consumer_lag': lag_info,
            'total_lag': sum(lag_info.values())
        }

    def check_logstash_health(self) -> Dict:
        """Monitor Logstash performance via its monitoring API"""
        try:
            stats = requests.get('http://logstash:9600/_node/stats', timeout=5).json()
            pipelines = requests.get('http://logstash:9600/_node/pipelines', timeout=5).json()
            main = pipelines['pipelines']['main']  # assumes the default pipeline id
            return {
                # events.* are cumulative counters; compute rates by diffing samples
                'events_in_total': stats['events']['in'],
                'events_out_total': stats['events']['out'],
                'events_filtered_total': stats['events']['filtered'],
                'pipeline_workers': main['workers'],
                'pipeline_batch_size': main['batch_size'],
                'jvm_heap_used_percent': stats['jvm']['mem']['heap_used_percent']
            }
        except Exception as e:
            logging.error(f"Failed to get Logstash stats: {e}")
            return {}
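A small runner ties this together: expose the metrics over HTTP for Prometheus to scrape and poll the health checks on an interval (the port and sleep interval are arbitrary choices):

import time
from prometheus_client import start_http_server
from elasticsearch import Elasticsearch
from kafka import KafkaAdminClient, KafkaConsumer

if __name__ == '__main__':
    monitor = ELKStackMonitor(
        es_client=Elasticsearch(['http://elasticsearch1:9200']),
        kafka_admin=KafkaAdminClient(bootstrap_servers=['kafka1:9092']),
        kafka_consumer=KafkaConsumer(bootstrap_servers=['kafka1:9092'])
    )

    # Serve /metrics on port 8000
    start_http_server(8000)

    while True:
        monitor.check_elasticsearch_health()
        monitor.check_kafka_health()  # updates the kafka_consumer_lag gauge
        monitor.check_logstash_health()
        time.sleep(30)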
Alerting Rules
# Prometheus alerting rules
groups:
  - name: elk-stack-alerts
    rules:
      # High event processing latency (quantiles come from the _bucket series)
      - alert: HighProcessingLatency
        expr: histogram_quantile(0.95, rate(clickstream_processing_latency_seconds_bucket[5m])) > 5
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "High event processing latency detected"
          description: "95th percentile processing latency is {{ $value }}s"

      # Elasticsearch cluster health (assumes the exporter maps 0=green, 1=yellow, 2=red)
      - alert: ElasticsearchClusterRed
        expr: elasticsearch_cluster_status == 2
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Elasticsearch cluster health is RED"

      # High Kafka consumer lag
      - alert: HighKafkaConsumerLag
        expr: kafka_consumer_lag > 1000000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High Kafka consumer lag detected"
          description: "Consumer lag is {{ $value }} messages"

      # Low event ingestion rate
      - alert: LowEventIngestionRate
        expr: rate(clickstream_events_processed_total[5m]) < 100
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Low event ingestion rate"
          description: "Event ingestion rate is {{ $value }} events/second"
What I Learned Building This
Everything breaks, plan for it: Our first version had single points of failure everywhere. Spent way too many weekends fixing cascading failures before I learned to build redundancy from the start.
Generic configs don’t scale: I initially used default Elasticsearch settings and wondered why performance sucked. Tuning for our specific query patterns made a 10x difference.
Monitor obsessively: I can’t count how many production issues we caught through monitoring before users noticed. If you can’t measure it, you can’t fix it.
Perfect consistency is the enemy of fast insights: Sometimes you need to know that conversion rates are dropping NOW, even if the exact number might be off by 0.1%.
Garbage in, real-time garbage out: Bad data flowing at high velocity is worse than good data that’s delayed. We learned to validate early in the pipeline.
Future Enhancements
- Machine Learning Integration: Automated anomaly detection and predictive analytics
- Multi-datacenter Setup: Geographic distribution for global low-latency access
- Schema Evolution: Dynamic schema management for changing event structures
- Cost Optimization: Intelligent data tiering based on query patterns
Building this system taught me that real-time isn’t just about speed - it’s about making data actionable when it matters most. The technical stack (ELK) was important, but the real win was understanding our business needs and optimizing the entire pipeline for our specific use cases.
Now when something goes wrong, we know about it in seconds instead of hours. And more importantly, we can actually do something about it while it still matters.