Production-grade distributed log processing system implementing a real-world observability architecture with Go collectors, Kafka streaming, Python processors, and comprehensive monitoring.
This project implements a scalable and fault-tolerant pipeline for aggregating, processing, and analyzing logs from distributed application services. The goal is to build a system that mimics real-world logging stacks like ELK/EFK at scale, providing robust and observable infrastructure for handling high-volume log data.
The core architecture follows a decoupled, streaming model where logs are collected, pushed into a durable message queue, processed asynchronously, and finally stored for analysis and long-term archival.
┌──────────────────────────────────────────────────────────────────────────────────────┐
│ MONITORING & VISUALIZATION │
│ │
│ ┌─────────────────────┐ ┌──────────────────────┐ │
│ │ Grafana │◄───────────────────│ Prometheus │ │
│ │ Dashboards │ │ Metrics Store │ │
│ │ :3000 │ │ :9090 │ │
│ └─────────────────────┘ └──────────▲───────────┘ │
│ │ │
└────────────────────────────────────────────────────────┼─────────────────────────────┘
│
│─────────────────────────┐
│ │
│ │
┌────────────────────────────────────────────────────────┼─────────────────────────┼───┐
│ │ │ │
│ DATA PROCESSING PIPELINE │ │ │
│ │ │ │
│ ┌──────────────┐ ┌──────────────────┐ │ │ │
│ │ │ │ │ │ │ │
│ │ App Services │──────►│ Collector │ │ │ │
│ │ │ │ (Go) │ │ │ │
│ └──────────────┘ │ :8081 │ │ │ │
│ └────────┬─────────┘ │ │ │
│ │ │ │ │
│ │ metrics │ │ │
│ ├─────────────────────┘ │ │
│ │ │ │
│ ▼ │ │
│ ┌─────────────────┐ │ │
│ │ │ │ │
│ │ Kafka │ │ │
│ │ Message Bus │ │ │
│ │ :9092 │ │ │
│ └────────┬────────┘ │ │
│ │ │ │
│ ┌───────────────┴────────────────┐ │ │
│ │ │ │ │
│ ▼ ▼ │ │
│ ┌──────────────────┐ ┌─────────────────┐ │ │
│ │ Processor │ │ Archiver │ │ │
│ │ (Python) │ │ (Python) │ │ │
│ │ :8001 │ │ :8002 │ │ │
│ │ │ │ │ │ │
│ │ ┌───────────┐ │ │ ┌───────────┐ │ │ │
│ │ │ Replica 1 │ │ │ │ Replica 1 │ │ │ │
│ │ └─────┬─────┘ │ │ └─────┬─────┘ │ │ │
│ │ ┌─────┴─────┐ │ │ ┌─────┴─────┐ │ │ │
│ │ │ Replica 2 │ │ │ │ Replica 2 │ │ │ │
│ │ └─────┬─────┘ │ │ └─────┬─────┘ │ │ │
│ │ ┌─────┴─────┐ │ │ ┌─────┴─────┐ │ │ │
│ │ │ Replica n │ │ │ │ Replica n │ │ │ │
│ │ └───────────┘ │ │ └───────────┘ │ │ │
│ └────────┬─────────┘ └────────┬────────┘ │ │
│ │ │ │ │
│ │ metrics │ metrics │ │
│ ├─────────────────────────────────┼─────────────────────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ │ │ │ │
│ │ Elasticsearch │ │ MinIO │ │
│ │ Search/Index │ │ (S3 Storage) │ │
│ │ :9200 │ │ :9000 │ │
│ └──────────────────┘ └──────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────────────────────────┘
Legend:
──► Data Flow
─── Metrics Export (to Prometheus)
Flow Description:
1. App Services send data to Collector (Go service on port 8081)
2. Collector forwards messages to Kafka broker (port 9092)
3. Kafka distributes messages to two consumer groups:
a. Processor (Python, port 8001) → indexes data into Elasticsearch (port 9200)
b. Archiver (Python, port 8002) → stores data in MinIO S3 (port 9000)
4. Collector, Processor, and Archiver export metrics to Prometheus (port 9090)
5. Grafana (port 3000) visualizes metrics from Prometheus
Scalability:
- Processor and Archiver run with n replicas for high throughput
- Kafka enables parallel processing across consumer groups
(This diagram illustrates the flow of logs and metrics through the various components of the pipeline.)
App Services → Collector → Kafka → [Processor → Elasticsearch] + [Archiver → MinIO]
- Applications send structured logs to the Go collector
- Collector batches and publishes logs to Kafka topics
- Processor consumers parse, enrich, and index logs to Elasticsearch
- Archiver consumers batch and store raw logs to MinIO for long-term retention
- Prometheus scrapes metrics from all components
- Grafana visualizes logs, metrics, and system health
Collector (Go)
A high-performance log ingestion agent that:
- Accepts structured JSON logs via HTTP (port 8081)
- Batches and compresses payloads for efficiency
- Publishes to Kafka with retry logic and backpressure handling
- Exposes Prometheus metrics (port 8080)
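The collector itself is written in Go, but the batching idea behind "batches and compresses payloads" can be sketched in a few lines of Python. `max_size` and `max_wait_s` are illustrative parameters, not the collector's actual configuration, and `flushed` stands in for publishing to Kafka:

```python
import json
import time

class LogBatcher:
    """Illustrative size/time-based batcher (the real collector is Go).

    max_size and max_wait_s are assumed parameters for illustration only.
    """

    def __init__(self, max_size=100, max_wait_s=5.0):
        self.max_size = max_size
        self.max_wait_s = max_wait_s
        self.buffer = []
        self.first_event_at = None
        self.flushed = []  # stands in for "publish batch to Kafka"

    def add(self, event: dict):
        if self.first_event_at is None:
            self.first_event_at = time.monotonic()
        self.buffer.append(event)
        if len(self.buffer) >= self.max_size:
            self.flush()

    def tick(self):
        # Called periodically: flush a partial batch that has waited too long.
        if self.buffer and time.monotonic() - self.first_event_at >= self.max_wait_s:
            self.flush()

    def flush(self):
        if not self.buffer:
            return
        # One serialized payload per batch; the real agent would also compress it.
        self.flushed.append(json.dumps(self.buffer).encode())
        self.buffer = []
        self.first_event_at = None

batcher = LogBatcher(max_size=3)
for i in range(7):
    batcher.add({"service": "test-app", "seq": i})
batcher.flush()  # drain the remaining partial batch
```

Batching by size with a time-based flush bounds both payload overhead and worst-case delivery delay.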
Kafka
Central streaming backbone providing:
- Durable, ordered log streams via the `app-logs` topic
- Decoupling between producers and consumers
- 7-day retention policy for streaming data
- Horizontal scalability through partitioning
- Consumer group coordination for parallel processing
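Partitioning by service keeps each service's logs ordered within one partition. The real Kafka client hashes the record key with murmur2; this stdlib sketch (md5, and an assumed partition count of 6) only demonstrates the property that matters — equal keys always map to the same partition:

```python
import hashlib

NUM_PARTITIONS = 6  # assumption for illustration; actual topic config may differ

def partition_for(service: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Stable partition choice keyed by service name.

    Kafka's default partitioner uses murmur2 on the record key; md5 here
    just shows that a fixed key always lands on the same partition,
    which preserves per-service ordering.
    """
    digest = hashlib.md5(service.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# All events from one service map to one partition:
assert partition_for("webapp-frontend") == partition_for("webapp-frontend")
```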
Processor (Python)
Scalable stream processor (3 replicas) that:
- Validates and parses log events
- Enriches with GeoIP location data
- Writes processed logs to Elasticsearch with ILM policies
- Routes failed events to dead-letter queue (DLQ)
- Exposes metrics on port 8001
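The validate-then-route step can be sketched as two small functions. The required field set and the list-based `index`/`dlq` sinks are assumptions for illustration; the real processor writes to Elasticsearch and a Kafka DLQ topic, and enriches with GeoIP before indexing:

```python
import json

# Assumed minimal schema for illustration; the real validator may check more.
REQUIRED_FIELDS = ("timestamp", "service", "level", "message")

def validate(raw: bytes) -> dict:
    """Parse one log event and raise ValueError if required fields are missing."""
    event = json.loads(raw)
    missing = [f for f in REQUIRED_FIELDS if f not in event]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    return event

def process(raw: bytes, index: list, dlq: list) -> None:
    """Route a record: valid events go to the index sink, failures to the DLQ."""
    try:
        event = validate(raw)
        # The real processor enriches with GeoIP here before indexing.
        index.append(event)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        dlq.append(raw)

indexed, dlq = [], []
process(b'{"timestamp":"2025-10-30T12:00:00Z","service":"a","level":"INFO","message":"ok"}',
        indexed, dlq)
process(b'not json', indexed, dlq)
```

Keeping the failure path non-fatal is what lets the consumer commit offsets and keep moving while bad events wait in the DLQ for inspection.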
Archiver (Python)
Dedicated archival service (3 replicas) that:
- Consumes logs from Kafka in configurable batches
- Compresses and uploads to MinIO/S3
- Implements partitioned storage: `s3://logs-archive/YYYY/MM/DD/`
- Provides cost-effective long-term log retention
- Exposes metrics on port 8002
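The key layout and compression step can be sketched with the stdlib. The `batch-<n>.json.gz` object naming is an assumption; only the `logs-archive/YYYY/MM/DD/` prefix comes from the layout above:

```python
import gzip
import json
from datetime import datetime, timezone

def archive_key(ts: datetime, batch_id: int, bucket: str = "logs-archive") -> str:
    """Build a time-partitioned object key under the bucket prefix above.

    The batch-<n>.json.gz naming is hypothetical, for illustration.
    """
    return f"{bucket}/{ts:%Y/%m/%d}/batch-{batch_id}.json.gz"

def compress_batch(events: list) -> bytes:
    """Gzip a batch of events into a single upload-ready blob."""
    return gzip.compress(json.dumps(events).encode())

ts = datetime(2025, 10, 30, tzinfo=timezone.utc)
key = archive_key(ts, 42)   # "logs-archive/2025/10/30/batch-42.json.gz"
blob = compress_batch([{"msg": "hello"}] * 100)
```

Date-based prefixes make later retrieval cheap: a restore for one day lists a single prefix instead of scanning the whole bucket.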
Elasticsearch (Hot Storage)
- Full-text search and log analytics
- Index Lifecycle Management (ILM): hot → warm → cold
- Optimized mappings for log metadata
MinIO (Cold Storage)
- S3-compatible object storage
- Compressed raw log archives
- Time-partitioned prefixes for efficient retrieval
- Cost-effective long-term retention
Prometheus
- Scrapes metrics from all pipeline components
- Tracks ingestion rates, latencies, errors, and consumer lag
- 15-second scrape interval
Grafana
- Unified dashboards for logs and metrics
- Pre-configured datasources and dashboards
- Real-time monitoring and alerting
- Default credentials: `admin/admin`
| Layer | Technology | Purpose |
|---|---|---|
| Collector | Go | High-performance concurrent log ingestion |
| Streaming | Apache Kafka (Confluent Platform 7.9.4) | Durable message queue with ordering guarantees |
| Processing | Python + confluent-kafka | Stream processing with rich ecosystem |
| Search | Elasticsearch 8.19.5 | Full-text indexing and log analytics |
| Archive | MinIO | S3-compatible object storage |
| Metrics | Prometheus | Time-series metrics collection |
| Visualization | Grafana | Dashboards and alerting |
| Orchestration | Docker Compose | Container orchestration |
Example structured log event:
```json
{
  "timestamp": "2025-10-30T12:34:56.789Z",
  "service": "webapp-frontend",
  "level": "INFO",
  "message": "User logged in successfully",
  "trace_id": "trace-abc123",
  "request_id": "req-xyz789",
  "host": "app-node-1",
  "payload": {
    "user_id": "user-9876",
    "ip_address": "8.8.8.8",
    "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/108.0.0.0"
  },
  "labels": {
    "env": "production",
    "region": "ap-south-1"
  }
}
```
Prerequisites:
- Docker and Docker Compose
- Ports available: 2181, 3000, 8001, 8002, 8080, 8081, 9000, 9001, 9090, 9092, 9200
- Clone the repository
  ```bash
  git clone https://github.com/GauthamKrM/distributed-log-pipeline
  cd distributed-log-pipeline
  ```
- Start the pipeline
  ```bash
  docker-compose up -d
  ```
- Verify services are running
  ```bash
  docker-compose ps
  ```
- Send test logs to the collector
  ```bash
  curl -X POST http://localhost:8081/logs \
    -H "Content-Type: application/json" \
    -d '{
      "service": "test-app",
      "level": "INFO",
      "message": "Test log message",
      "timestamp": "2025-10-30T12:00:00Z"
    }'
  ```
- Access dashboards
  - Grafana: http://localhost:3000 (admin/admin)
  - Prometheus: http://localhost:9090
  - Elasticsearch: http://localhost:9200
  - MinIO Console: http://localhost:9001 (minioadmin/minioadmin)
Collector (Go)
- `logs_produced_total` - Total logs sent to Kafka
- `producer_errors_total` - Failed publish attempts
- `produce_latency_seconds` - Kafka publish latency
Processor (Python)
- `ingested_events_total` - Logs consumed from Kafka
- `processing_latency_seconds` - Processing time per log
- `es_write_success_total` - Successful ES writes
- `es_write_errors_total` - Failed ES writes
- `dlq_events_total` - Events routed to DLQ
Archiver (Python)
- `archiver_messages_consumed_total` - Messages consumed
- `archiver_messages_archived_total` - Messages archived
- `archiver_batches_uploaded_total` - Batches uploaded to S3
- `archiver_archive_latency_seconds` - Archive operation latency
- `archiver_batch_size_bytes` - Batch size in bytes
- At-least-once delivery: Kafka consumer offsets committed after successful processing
- Dead Letter Queue: Failed messages routed to the `dlq` topic for inspection
- Retry logic: Configurable retry attempts (max 5) with exponential backoff
- Health checks: All services include readiness and liveness probes
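The retry policy above (max 5 attempts, exponential backoff) can be sketched as a small wrapper. The base delay of 0.5s is an assumption; only the attempt cap comes from the policy stated here:

```python
import time

def with_retries(op, max_attempts=5, base_delay=0.5, sleep=time.sleep):
    """Retry op with exponential backoff (0.5s, 1s, 2s, 4s) before giving up.

    max_attempts=5 matches the policy above; base_delay is an assumed value.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return op()
        except Exception:
            if attempt == max_attempts:
                raise  # exhausted: surface the error to the caller
            sleep(base_delay * 2 ** (attempt - 1))

# Example: an operation that fails twice, then succeeds.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("broker unavailable")
    return "ok"

result = with_retries(flaky, sleep=lambda s: None)  # no real sleeping in the demo
```

Exponential backoff spaces retries out so a briefly unavailable broker or index is not hammered while it recovers.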
- Horizontal scaling: Processor and Archiver run with 3 replicas each
- Partitioning strategy: Kafka topics partitioned by service
- Consumer groups: Enable parallel processing across replicas
- Resource limits: 512MB memory per processor/archiver instance
- Hot path: Elasticsearch with ILM for recent, searchable logs
- Cold path: MinIO for compressed, long-term archives
- Retention: 7 days in Kafka, 30-90 days in Elasticsearch, unlimited in S3
- Compression: Gzip for archived logs to reduce storage costs
The default `.env` values are pre-configured so the setup works out of the box.
For custom deployments or production environments, the following key environment variables can be overridden:
Collector
- `KAFKA_BROKERS`: Kafka bootstrap servers (default: `kafka:9092`)
Processor
- `KAFKA_BOOTSTRAP_SERVERS`: Kafka connection
- `ES_HOSTS`: Elasticsearch endpoint
- `ES_INDEX_ALIAS`: Index alias for writes (default: `log-write`)
- `GEOIP_DB_PATH`: GeoIP database location
- `METRICS_PORT`: Prometheus metrics port (default: `8001`)
Archiver
- `KAFKA_BOOTSTRAP_SERVERS`: Kafka connection
- `KAFKA_INPUT_TOPIC`: Source topic (default: `app-logs`)
- `S3_ENDPOINT`: MinIO/S3 endpoint
- `S3_BUCKET`: Archive bucket name
- `BATCH_SIZE`: Messages per batch (default: `100`)
- `BATCH_TIMEOUT_SECONDS`: Max wait time for batch (default: `60`)
- `METRICS_PORT`: Prometheus metrics port (default: `8002`)
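A minimal sketch of how a service like the archiver might read these variables with fallbacks. Only the defaults stated in this README are used; the `kafka:9092` fallback for `KAFKA_BOOTSTRAP_SERVERS` is an assumption borrowed from the collector's default:

```python
import os

def load_archiver_config(env=None):
    """Read archiver settings from environment variables with README defaults.

    The kafka:9092 fallback is an assumption (the README only states this
    default for the collector's KAFKA_BROKERS variable).
    """
    env = os.environ if env is None else env
    return {
        "bootstrap_servers": env.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092"),
        "input_topic": env.get("KAFKA_INPUT_TOPIC", "app-logs"),
        "s3_endpoint": env.get("S3_ENDPOINT", ""),      # no documented default
        "s3_bucket": env.get("S3_BUCKET", ""),          # no documented default
        "batch_size": int(env.get("BATCH_SIZE", "100")),
        "batch_timeout_s": int(env.get("BATCH_TIMEOUT_SECONDS", "60")),
        "metrics_port": int(env.get("METRICS_PORT", "8002")),
    }

cfg = load_archiver_config({})                       # all defaults
cfg_custom = load_archiver_config({"BATCH_SIZE": "500"})
```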
Grafana dashboards track:
- Log ingestion rate and volume
- Processing latency (p50, p95, p99)
- Error rates and DLQ growth
- Kafka consumer lag
- Elasticsearch indexing performance
- Archive upload success rate
- System resource utilization
- Schema registry (Avro/Protobuf) for versioned log schemas
- Kibana integration for advanced log exploration
- Advanced alerting rules and runbooks
- API gateway with rate limiting
- TLS encryption for Kafka, Elasticsearch, and HTTP endpoints
- SASL authentication for Kafka brokers
- Basic auth for Elasticsearch and Grafana
- Log sanitization to prevent PII leaks
- RBAC for Grafana dashboards
This project is licensed under the Apache-2.0 License. See the LICENSE file for details.