Distributed Log Aggregation & Analysis Pipeline

Production-grade distributed log processing system implementing a real-world observability architecture with Go collectors, Kafka streaming, Python processors, and comprehensive monitoring.


Overview

This project implements a scalable and fault-tolerant pipeline for aggregating, processing, and analyzing logs from distributed application services. The goal is to build a system that mimics real-world logging stacks like ELK/EFK at scale, providing robust and observable infrastructure for handling high-volume log data.

The core architecture follows a decoupled, streaming model where logs are collected, pushed into a durable message queue, processed asynchronously, and finally stored for analysis and long-term archival.


Architecture

┌──────────────────────────────────────────────────────────────────────────────────────┐
│                              MONITORING & VISUALIZATION                              │
│                                                                                      │
│  ┌─────────────────────┐                    ┌──────────────────────┐                 │
│  │     Grafana         │◄───────────────────│    Prometheus        │                 │
│  │   Dashboards        │                    │   Metrics Store      │                 │
│  │    :3000            │                    │      :9090           │                 │
│  └─────────────────────┘                    └──────────▲───────────┘                 │
│                                                        │                             │
└────────────────────────────────────────────────────────┼─────────────────────────────┘
                                                         │
                                                         │─────────────────────────┐
                                                         │                         │
                                                         │                         │
┌────────────────────────────────────────────────────────┼─────────────────────────┼───┐
│                                                        │                         │   │
│                        DATA PROCESSING PIPELINE        │                         │   │
│                                                        │                         │   │
│  ┌──────────────┐       ┌──────────────────┐           │                         │   │
│  │              │       │                  │           │                         │   │
│  │ App Services │──────►│   Collector      │           │                         │   │
│  │              │       │      (Go)        │           │                         │   │
│  └──────────────┘       │     :8081        │           │                         │   │
│                         └────────┬─────────┘           │                         │   │
│                                  │                     │                         │   │
│                                  │ metrics             │                         │   │
│                                  ├─────────────────────┘                         │   │
│                                  │                                               │   │
│                                  ▼                                               │   │
│                         ┌─────────────────┐                                      │   │
│                         │                 │                                      │   │
│                         │     Kafka       │                                      │   │
│                         │   Message Bus   │                                      │   │
│                         │     :9092       │                                      │   │
│                         └────────┬────────┘                                      │   │
│                                  │                                               │   │
│                  ┌───────────────┴────────────────┐                              │   │
│                  │                                │                              │   │
│                  ▼                                ▼                              │   │
│         ┌──────────────────┐              ┌─────────────────┐                    │   │
│         │   Processor      │              │    Archiver     │                    │   │
│         │     (Python)     │              │    (Python)     │                    │   │
│         │     :8001        │              │     :8002       │                    │   │
│         │                  │              │                 │                    │   │
│         │  ┌───────────┐   │              │  ┌───────────┐  │                    │   │
│         │  │ Replica 1 │   │              │  │ Replica 1 │  │                    │   │
│         │  └─────┬─────┘   │              │  └─────┬─────┘  │                    │   │
│         │  ┌─────┴─────┐   │              │  ┌─────┴─────┐  │                    │   │
│         │  │ Replica 2 │   │              │  │ Replica 2 │  │                    │   │
│         │  └─────┬─────┘   │              │  └─────┬─────┘  │                    │   │
│         │  ┌─────┴─────┐   │              │  ┌─────┴─────┐  │                    │   │
│         │  │ Replica n │   │              │  │ Replica n │  │                    │   │
│         │  └───────────┘   │              │  └───────────┘  │                    │   │
│         └────────┬─────────┘              └────────┬────────┘                    │   │
│                  │                                 │                             │   │
│                  │ metrics                         │ metrics                     │   │
│                  ├─────────────────────────────────┼─────────────────────────────┘   │
│                  │                                 │                                 │
│                  ▼                                 ▼                                 │
│         ┌──────────────────┐            ┌──────────────────┐                         │
│         │                  │            │                  │                         │
│         │ Elasticsearch    │            │      MinIO       │                         │
│         │  Search/Index    │            │   (S3 Storage)   │                         │
│         │     :9200        │            │      :9000       │                         │
│         └──────────────────┘            └──────────────────┘                         │
│                                                                                      │
└──────────────────────────────────────────────────────────────────────────────────────┘

Legend:
  ──►  Data Flow
  ───  Metrics Export (to Prometheus)
  
Flow Description:
  1. App Services send data to Collector (Go service on port 8081)
  2. Collector forwards messages to Kafka broker (port 9092)
  3. Kafka distributes messages to two consumer groups:
     a. Processor (Python, port 8001) → indexes data into Elasticsearch (port 9200)
     b. Archiver (Python, port 8002) → stores data in MinIO S3 (port 9000)
  4. Collector, Processor, and Archiver export metrics to Prometheus (port 9090)
  5. Grafana (port 3000) visualizes metrics from Prometheus
  
Scalability:
  - Processor and Archiver run with n replicas for high throughput
  - Kafka enables parallel processing across consumer groups

(This diagram illustrates the flow of logs and metrics through the various components of the pipeline.)


Data Flow

App Services → Collector → Kafka → [Processor → Elasticsearch] + [Archiver → MinIO]

  1. Applications send structured logs to the Go collector
  2. Collector batches and publishes logs to Kafka topics
  3. Processor consumers parse, enrich, and index logs to Elasticsearch
  4. Archiver consumers batch and store raw logs to MinIO for long-term retention
  5. Prometheus scrapes metrics from all components
  6. Grafana visualizes logs, metrics, and system health

Key Components

Collector (Go)

A high-performance log ingestion agent that:

  • Accepts structured JSON logs via HTTP (port 8081)
  • Batches and compresses payloads for efficiency
  • Publishes to Kafka with retry logic and backpressure handling
  • Exposes Prometheus metrics (port 8080)
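
For reference, a client can push a structured event to the collector over plain HTTP. The sketch below is a minimal Python client, assuming the /logs endpoint shown in the Quick Start section and the event shape from the Data Schema section:

# Minimal client sketch: POST one structured log event to the collector.
# Endpoint and event shape follow this README; nothing else is implied.
import json
import urllib.request

event = {
    "timestamp": "2025-10-30T12:34:56.789Z",
    "service": "webapp-frontend",
    "level": "INFO",
    "message": "User logged in successfully",
    "labels": {"env": "production", "region": "ap-south-1"},
}

req = urllib.request.Request(
    "http://localhost:8081/logs",
    data=json.dumps(event).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(req, timeout=5) as resp:
    print(resp.status)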

Kafka Cluster

Central streaming backbone providing:

  • Durable, ordered log streams via app-logs topic
  • Decoupling between producers and consumers
  • 7-day retention policy for streaming data
  • Horizontal scalability through partitioning
  • Consumer group coordination for parallel processing
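
The app-logs topic and its retention policy can be provisioned up front. A minimal sketch using confluent_kafka.admin is shown below; the partition and replication counts are assumptions, while the 7-day retention matches the policy above:

# Sketch: create the app-logs topic with 7-day retention (retention.ms).
# Partition and replication counts here are illustrative assumptions.
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

topic = NewTopic(
    "app-logs",
    num_partitions=6,              # assumed; sized for consumer parallelism
    replication_factor=1,          # single-broker development setup
    config={"retention.ms": str(7 * 24 * 60 * 60 * 1000)},  # 7 days
)

for name, future in admin.create_topics([topic]).items():
    future.result()                # raises if topic creation failed
    print(f"created topic {name}")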

Processor (Python)

Scalable stream processor (3 replicas) that:

  • Validates and parses log events
  • Enriches with GeoIP location data
  • Writes processed logs to Elasticsearch with ILM policies
  • Routes failed events to dead-letter queue (DLQ)
  • Exposes metrics on port 8001
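
A minimal sketch of the consume-index-commit loop, assuming the dlq topic and log-write index alias named elsewhere in this README; the consumer group id is an assumption and GeoIP enrichment is omitted:

# Sketch of the processor loop: consume from Kafka, index into Elasticsearch,
# and route failures to the dead-letter topic. Group id is an assumption;
# GeoIP enrichment is omitted for brevity.
import json
from confluent_kafka import Consumer, Producer
from elasticsearch import Elasticsearch

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "log-processor",       # assumed consumer group id
    "enable.auto.commit": False,       # commit only after successful handling
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["app-logs"])

dlq_producer = Producer({"bootstrap.servers": "localhost:9092"})
es = Elasticsearch("http://localhost:9200")

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    try:
        event = json.loads(msg.value())
        es.index(index="log-write", document=event)  # write via the index alias
    except Exception:
        dlq_producer.produce("dlq", msg.value())     # route bad events to the DLQ
        dlq_producer.flush()
    consumer.commit(message=msg)                     # at-least-once semantics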

Archiver (Python)

Dedicated archival service (3 replicas) that:

  • Consumes logs from Kafka in configurable batches
  • Compresses and uploads to MinIO/S3
  • Implements partitioned storage: s3://logs-archive/YYYY/MM/DD/
  • Provides cost-effective long-term log retention
  • Exposes metrics on port 8002
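
A minimal sketch of the batching path, using boto3 against the MinIO S3 API; the bucket name and key layout follow the partitioning scheme above, while the credentials are placeholder assumptions:

# Sketch: gzip one batch of raw log lines and upload it to MinIO under a
# YYYY/MM/DD prefix. Credentials below are placeholder assumptions.
import gzip
from datetime import datetime, timezone
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",    # MinIO S3 endpoint
    aws_access_key_id="minioadmin",          # assumed development credentials
    aws_secret_access_key="minioadmin",
)

def archive_batch(raw_messages: list[bytes]) -> None:
    """Compress one batch and store it under the time-partitioned prefix."""
    now = datetime.now(timezone.utc)
    key = f"{now:%Y/%m/%d}/logs-{now:%H%M%S}.json.gz"
    body = gzip.compress(b"\n".join(raw_messages))
    s3.put_object(Bucket="logs-archive", Key=key, Body=body)

archive_batch([b'{"service": "test-app", "level": "INFO", "message": "hello"}'])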

Storage Layer

Elasticsearch (Hot Storage)

  • Full-text search and log analytics
  • Index Lifecycle Management (ILM): hot → warm → cold
  • Optimized mappings for log metadata
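
A sketch of what such an ILM policy could look like via the Python client; the policy name and phase timings are illustrative assumptions within the 30-90 day window described under Storage Strategy:

# Sketch: an ILM policy with hot -> warm -> cold phases and eventual delete.
# Policy name and phase timings are assumptions, not the project's actual values.
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

es.ilm.put_lifecycle(
    name="logs-policy",
    policy={
        "phases": {
            "hot": {"actions": {"rollover": {"max_age": "1d"}}},
            "warm": {"min_age": "7d", "actions": {"set_priority": {"priority": 50}}},
            "cold": {"min_age": "30d", "actions": {"set_priority": {"priority": 0}}},
            "delete": {"min_age": "90d", "actions": {"delete": {}}},
        }
    },
)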

MinIO (Cold Storage)

  • S3-compatible object storage
  • Compressed raw log archives
  • Time-partitioned prefixes for efficient retrieval
  • Cost-effective long-term retention

Observability Stack

Prometheus

  • Scrapes metrics from all pipeline components
  • Tracks ingestion rates, latencies, errors, and consumer lag
  • 15-second scrape interval

Grafana

  • Unified dashboards for logs and metrics
  • Pre-configured datasources and dashboards
  • Real-time monitoring and alerting
  • Default credentials: admin/admin

Tech Stack

Layer           Technology                   Purpose
─────────────────────────────────────────────────────────────────────────────
Collector       Go                           High-performance concurrent log ingestion
Streaming       Apache Kafka 7.9.4           Durable message queue with ordering guarantees
Processing      Python + confluent-kafka     Stream processing with rich ecosystem
Search          Elasticsearch 8.19.5         Full-text indexing and log analytics
Archive         MinIO                        S3-compatible object storage
Metrics         Prometheus                   Time-series metrics collection
Visualization   Grafana                      Dashboards and alerting
Orchestration   Docker Compose               Container orchestration

Data Schema

Example structured log event:

{
  "timestamp": "2025-10-30T12:34:56.789Z",
  "service": "webapp-frontend",
  "level": "INFO",
  "message": "User logged in successfully",
  "trace_id": "trace-abc123",
  "request_id": "req-xyz789",
  "host": "app-node-1",
  "payload": {
    "user_id": "user-9876",
    "ip_address": "8.8.8.8",
    "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/108.0.0.0"
  },
  "labels": {
    "env": "production",
    "region": "ap-south-1"
  }
}

Getting Started

Prerequisites

  • Docker and Docker Compose
  • Ports available: 2181, 3000, 8001, 8002, 8080, 8081, 9000, 9001, 9090, 9092, 9200

Quick Start

  1. Clone the repository

     git clone https://github.com/GauthamKrM/distributed-log-pipeline
     cd distributed-log-pipeline

  2. Start the pipeline

     docker-compose up -d

  3. Verify that all services are running

     docker-compose ps

  4. Send a test log to the collector

     curl -X POST http://localhost:8081/logs \
       -H "Content-Type: application/json" \
       -d '{
         "service": "test-app",
         "level": "INFO",
         "message": "Test log message",
         "timestamp": "2025-10-30T12:00:00Z"
       }'

  5. Access the dashboards

     • Grafana: http://localhost:3000 (default credentials: admin/admin)
     • Prometheus: http://localhost:9090

Metrics & Monitoring

Exposed Metrics

Collector (Go)

  • logs_produced_total - Total logs sent to Kafka
  • producer_errors_total - Failed publish attempts
  • produce_latency_seconds - Kafka publish latency

Processor (Python)

  • ingested_events_total - Logs consumed from Kafka
  • processing_latency_seconds - Processing time per log
  • es_write_success_total - Successful ES writes
  • es_write_errors_total - Failed ES writes
  • dlq_events_total - Events routed to DLQ

Archiver (Python)

  • archiver_messages_consumed_total - Messages consumed
  • archiver_messages_archived_total - Messages archived
  • archiver_batches_uploaded_total - Batches uploaded to S3
  • archiver_archive_latency_seconds - Archive operation latency
  • archiver_batch_size_bytes - Batch size in bytes
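
These metrics are exported with prometheus_client; the sketch below registers a subset of the archiver's metrics under the names listed above and serves them on the default metrics port (the instrumentation points are illustrative):

# Sketch: registering archiver metrics and serving them for Prometheus to
# scrape on METRICS_PORT (8002 by default). Instrumentation is illustrative.
import time
from prometheus_client import Counter, Histogram, start_http_server

MESSAGES_CONSUMED = Counter("archiver_messages_consumed_total", "Messages consumed from Kafka")
BATCHES_UPLOADED = Counter("archiver_batches_uploaded_total", "Batches uploaded to S3")
ARCHIVE_LATENCY = Histogram("archiver_archive_latency_seconds", "Archive operation latency")

start_http_server(8002)              # exposes /metrics for the Prometheus scraper

# Inside the consume/upload loop:
MESSAGES_CONSUMED.inc()
with ARCHIVE_LATENCY.time():
    time.sleep(0.01)                 # placeholder for the actual upload work
BATCHES_UPLOADED.inc()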

Design Decisions

Reliability & Fault Tolerance

  • At-least-once delivery: Kafka consumer offsets committed after successful processing
  • Dead Letter Queue: Failed messages routed to dlq topic for inspection
  • Retry logic: Configurable retry attempts (max 5) with exponential backoff
  • Health checks: All services include readiness and liveness probes
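
The retry behaviour can be expressed as a small helper; the sketch below keeps the 5-attempt cap from the list above, while the base delay and the wrapped operation are illustrative assumptions:

# Sketch: bounded retries with exponential backoff (max 5 attempts).
# Base delay and the retried operation are illustrative assumptions.
import time

def with_retries(operation, max_attempts: int = 5, base_delay: float = 0.5):
    """Run `operation`, retrying with exponential backoff on failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts:
                raise                                    # give up after the last attempt
            time.sleep(base_delay * 2 ** (attempt - 1))  # 0.5s, 1s, 2s, 4s

# Example: with_retries(lambda: producer.produce("app-logs", payload))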

Scalability

  • Horizontal scaling: Processor and Archiver run with 3 replicas each
  • Partitioning strategy: Kafka topics partitioned by service
  • Consumer groups: Enable parallel processing across replicas
  • Resource limits: 512MB memory per processor/archiver instance

Storage Strategy

  • Hot path: Elasticsearch with ILM for recent, searchable logs
  • Cold path: MinIO for compressed, long-term archives
  • Retention: 7 days in Kafka, 30-90 days in Elasticsearch, unlimited in S3
  • Compression: Gzip for archived logs to reduce storage costs

Configuration

Environment Variables

The default .env values are pre-configured to make the setup work out of the box.
However, for custom deployments or production environments, the following key environment variables can be overridden:

Collector

  • KAFKA_BROKERS: Kafka bootstrap servers (default: kafka:9092)

Processor

  • KAFKA_BOOTSTRAP_SERVERS: Kafka connection
  • ES_HOSTS: Elasticsearch endpoint
  • ES_INDEX_ALIAS: Index alias for writes (default: log-write)
  • GEOIP_DB_PATH: GeoIP database location
  • METRICS_PORT: Prometheus metrics port (default: 8001)

Archiver

  • KAFKA_BOOTSTRAP_SERVERS: Kafka connection
  • KAFKA_INPUT_TOPIC: Source topic (default: app-logs)
  • S3_ENDPOINT: MinIO/S3 endpoint
  • S3_BUCKET: Archive bucket name
  • BATCH_SIZE: Messages per batch (default: 100)
  • BATCH_TIMEOUT_SECONDS: Max wait time for batch (default: 60)
  • METRICS_PORT: Prometheus metrics port (default: 8002)
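
Each service reads its settings at startup and falls back to the documented defaults. A minimal sketch for the archiver side (variable names and defaults follow the list above; the dataclass and the MinIO hostname are assumptions):

# Sketch: loading archiver configuration from the environment with the
# documented defaults. The "minio" hostname default is an assumption.
import os
from dataclasses import dataclass

@dataclass
class ArchiverConfig:
    bootstrap_servers: str = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092")
    input_topic: str = os.getenv("KAFKA_INPUT_TOPIC", "app-logs")
    s3_endpoint: str = os.getenv("S3_ENDPOINT", "http://minio:9000")
    s3_bucket: str = os.getenv("S3_BUCKET", "logs-archive")
    batch_size: int = int(os.getenv("BATCH_SIZE", "100"))
    batch_timeout_seconds: int = int(os.getenv("BATCH_TIMEOUT_SECONDS", "60"))
    metrics_port: int = int(os.getenv("METRICS_PORT", "8002"))

config = ArchiverConfig()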

Monitoring & Alerts

Grafana dashboards track:

  • Log ingestion rate and volume
  • Processing latency (p50, p95, p99)
  • Error rates and DLQ growth
  • Kafka consumer lag
  • Elasticsearch indexing performance
  • Archive upload success rate
  • System resource utilization

TODO

  • Schema registry (Avro/Protobuf) for versioned log schemas
  • Kibana integration for advanced log exploration
  • Advanced alerting rules and runbooks
  • API gateway with rate limiting
  • TLS encryption for Kafka, Elasticsearch, and HTTP endpoints
  • SASL authentication for Kafka brokers
  • Basic auth for Elasticsearch and Grafana
  • Log sanitization to prevent PII leaks
  • RBAC for Grafana dashboards

License

This project is licensed under the Apache-2.0 License. See the LICENSE file for details.

