Docs For AI
ELK

Logstash

Server-side data processing pipeline for collecting, parsing, transforming, and forwarding data

Logstash

Logstash is a server-side data processing pipeline that ingests data from multiple sources, transforms it, and sends it to one or more destinations. It acts as the central processing hub in the ELK Stack.

Pipeline Architecture

Input → Filter → Output

┌──────────┐    ┌──────────────┐    ┌─────────────┐
│  Beats   │    │    Grok      │    │Elasticsearch│
│  Kafka   │───▶│    Mutate    │───▶│   S3        │
│  HTTP    │    │    Date      │    │   Kafka     │
│  File    │    │    GeoIP     │    │   Stdout    │
└──────────┘    └──────────────┘    └─────────────┘

Pipeline Configuration

Basic Pipeline

# /etc/logstash/conf.d/main.conf

input {
  beats {
    port => 5044
  }
}

filter {
  # Parse JSON logs
  if [message] =~ /^\{/ {
    json {
      source => "message"
    }
  }

  # Parse standard log format
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{DATA:service} - %{GREEDYDATA:msg}"
    }
    tag_on_failure => ["_grokparsefailure"]
  }

  # Parse timestamp
  date {
    match => ["timestamp", "ISO8601"]
    target => "@timestamp"
    remove_field => ["timestamp"]
  }

  # Add environment tag
  mutate {
    add_field => { "environment" => "production" }
    remove_field => ["agent", "ecs", "input"]
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "logs-%{[service]}-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "${ES_PASSWORD}"
  }
}

Multi-Pipeline Configuration

# /etc/logstash/pipelines.yml
- pipeline.id: application-logs
  path.config: /etc/logstash/conf.d/app-logs.conf
  pipeline.workers: 4
  pipeline.batch.size: 250

- pipeline.id: access-logs
  path.config: /etc/logstash/conf.d/access-logs.conf
  pipeline.workers: 2
  pipeline.batch.size: 500

- pipeline.id: metrics
  path.config: /etc/logstash/conf.d/metrics.conf
  pipeline.workers: 2

Input Plugins

Beats Input

input {
  beats {
    port => 5044
    ssl_enabled => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
  }
}

Kafka Input

input {
  kafka {
    bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"
    topics => ["app-logs", "access-logs"]
    group_id => "logstash-consumers"
    codec => json
    consumer_threads => 4
    decorate_events => "basic"
    auto_offset_reset => "latest"
  }
}

HTTP Input

input {
  http {
    port => 8080
    codec => json
    additional_codecs => { "text/plain" => "plain" }
  }
}

Filter Plugins

Grok (Log Parsing)

filter {
  # Apache access log
  grok {
    match => {
      "message" => '%{IPORHOST:client_ip} - %{DATA:user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:status} %{NUMBER:bytes}'
    }
  }

  # Nginx error log
  grok {
    match => {
      "message" => "%{DATESTAMP:timestamp} \[%{WORD:level}\] %{POSINT:pid}#%{NUMBER}: %{GREEDYDATA:error_message}"
    }
  }

  # Custom application log
  grok {
    match => {
      "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] %{LOGLEVEL:level} %{NOTSPACE:logger} - %{GREEDYDATA:msg}"
    }
  }
}

Data Transformation

filter {
  # Rename and remove fields
  mutate {
    rename => { "hostname" => "host.name" }
    remove_field => ["agent", "ecs", "@version"]
    convert => { "status" => "integer" }
    gsub => ["message", "\r\n", "\n"]
    lowercase => ["level"]
  }

  # Conditional processing
  if [level] == "error" or [level] == "fatal" {
    mutate {
      add_tag => ["alert"]
      add_field => { "severity" => "high" }
    }
  }

  # GeoIP enrichment
  if [client_ip] {
    geoip {
      source => "client_ip"
      target => "geo"
      fields => ["city_name", "country_name", "location"]
    }
  }

  # User-Agent parsing
  if [user_agent] {
    useragent {
      source => "user_agent"
      target => "ua"
    }
  }

  # Calculate response time category
  if [duration_ms] {
    ruby {
      code => '
        duration = event.get("duration_ms").to_f
        category = case duration
                   when 0...100 then "fast"
                   when 100...500 then "normal"
                   when 500...2000 then "slow"
                   else "critical"
                   end
        event.set("response_category", category)
      '
    }
  }

  # Drop unwanted events
  if [level] == "debug" and [environment] == "production" {
    drop {}
  }
}

Multiline Codec (Stack Traces)

input {
  beats {
    port => 5044
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate => true
      what => "previous"
    }
  }
}

Output Plugins

Elasticsearch Output

output {
  elasticsearch {
    hosts => ["https://es-node-1:9200", "https://es-node-2:9200"]
    index => "logs-%{[service]}-%{+YYYY.MM.dd}"
    user => "logstash_writer"
    password => "${ES_PASSWORD}"
    ssl_enabled => true
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]

    # Template management
    manage_template => true
    template_name => "logs"
    template_overwrite => true
  }
}

Conditional Routing

output {
  # Errors to dedicated index
  if "alert" in [tags] {
    elasticsearch {
      hosts => ["http://elasticsearch:9200"]
      index => "alerts-%{+YYYY.MM.dd}"
    }
  }

  # All logs to main index
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }

  # High-severity to PagerDuty
  if [severity] == "high" {
    http {
      url => "https://events.pagerduty.com/v2/enqueue"
      http_method => "post"
      format => "json"
      mapping => {
        "routing_key" => "YOUR_ROUTING_KEY"
        "event_action" => "trigger"
        "payload" => {
          "summary" => "%{message}"
          "severity" => "critical"
          "source" => "%{host}"
        }
      }
    }
  }
}

Performance Tuning

| Setting | Default | Recommendation | Impact |
|---------|---------|----------------|--------|
| pipeline.workers | CPU cores | Match CPU cores | Parallelism for filter/output |
| pipeline.batch.size | 125 | 250-500 | Larger batch = higher throughput |
| pipeline.batch.delay | 50ms | 50-250ms | Wait time to fill a batch |
| queue.type | memory | persisted (for durability) | Disk-backed queue survives restarts |
| queue.max_bytes | 1GB | 4-8GB | Persistent queue capacity |

JVM Settings

# /etc/logstash/jvm.options
-Xms2g
-Xmx2g
# Set equal; typically 1/4 of system RAM, max 4-8GB

Monitoring Logstash

Key Metrics

| Metric | Healthy | Warning |
|--------|---------|---------|
| Events in per second | Stable rate | Sudden drop |
| Events out per second | Matches events in | Lag growing |
| Pipeline workers busy | < 80% | 100% sustained |
| Queue backpressure | 0 | Growing |
| JVM heap usage | < 75% | > 85% |
| DLQ size | 0 | Growing |
# Check pipeline stats
curl -s localhost:9600/_node/stats/pipelines | jq .

# Check JVM metrics
curl -s localhost:9600/_node/stats/jvm | jq .

# Check hot threads
curl -s localhost:9600/_node/hot_threads

Dead Letter Queue

When events fail to process, they go to the Dead Letter Queue (DLQ) for later reprocessing.

# Enable DLQ in logstash.yml
# dead_letter_queue.enable: true
# dead_letter_queue.max_bytes: 1gb

# Reprocess DLQ events
input {
  dead_letter_queue {
    path => "/var/logstash/data/dead_letter_queue"
    commit_offsets => true
    pipeline_id => "main"
  }
}

filter {
  # Fix or transform failed events
  mutate {
    remove_field => ["[dead_letter_queue_metadata]"]
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "recovered-%{+YYYY.MM.dd}"
  }
}

On this page