ELK
Logstash
Server-side data processing pipeline for collecting, parsing, transforming, and forwarding data
Logstash
Logstash is a server-side data processing pipeline that ingests data from multiple sources, transforms it, and sends it to one or more destinations. It acts as the central processing hub in the ELK Stack.
Pipeline Architecture
Input → Filter → Output
┌──────────┐ ┌──────────────┐ ┌───────────┐
│ Beats │ │ Grok │ │Elasticsearch│
│ Kafka │───▶│ Mutate │───▶│ S3 │
│ HTTP │ │ Date │ │ Kafka │
│ File │ │ GeoIP │ │ Stdout │
└──────────┘ └──────────────┘ └───────────┘
Pipeline Configuration
Basic Pipeline
# /etc/logstash/conf.d/main.conf
# Main pipeline: receive events from Beats on 5044, parse JSON or plaintext
# logs, normalize the timestamp, tag the environment, ship to Elasticsearch.
input {
beats {
port => 5044
}
}
filter {
# Parse JSON logs
# Only messages that start with "{" are treated as JSON.
if [message] =~ /^\{/ {
json {
source => "message"
}
}
# Parse standard log format
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{DATA:service} - %{GREEDYDATA:msg}"
}
tag_on_failure => ["_grokparsefailure"]
}
# Parse timestamp
# On success the parsed value becomes @timestamp and the raw field is
# removed; on failure the event keeps its ingest-time @timestamp.
date {
match => ["timestamp", "ISO8601"]
target => "@timestamp"
remove_field => ["timestamp"]
}
# Add environment tag
mutate {
add_field => { "environment" => "production" }
remove_field => ["agent", "ecs", "input"]
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
# NOTE(review): if grok failed, [service] is unset and the index name
# contains the literal text "%{[service]}" — consider a fallback field.
index => "logs-%{[service]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "${ES_PASSWORD}"
}
}
Multi-Pipeline Configuration
# /etc/logstash/pipelines.yml
# One entry per pipeline; each runs in isolation with its own config file,
# worker threads, and batch sizing.
# (Keys of a sequence item must be indented under the "- " entry —
# column-0 keys after "- pipeline.id" are invalid YAML.)
- pipeline.id: application-logs
  path.config: /etc/logstash/conf.d/app-logs.conf
  pipeline.workers: 4
  pipeline.batch.size: 250
- pipeline.id: access-logs
  path.config: /etc/logstash/conf.d/access-logs.conf
  pipeline.workers: 2
  pipeline.batch.size: 500
- pipeline.id: metrics
  path.config: /etc/logstash/conf.d/metrics.conf
  pipeline.workers: 2
Input Plugins
Beats Input
input {
beats {
port => 5044
# TLS termination for the Beats protocol; shippers must trust this cert.
# ssl_enabled/ssl_certificate/ssl_key are the Logstash 8.x option names —
# older releases used "ssl" instead of "ssl_enabled"; verify your version.
ssl_enabled => true
ssl_certificate => "/etc/logstash/certs/logstash.crt"
ssl_key => "/etc/logstash/certs/logstash.key"
}
}
Kafka Input
input {
kafka {
# All brokers listed; consumers in the same group_id share partitions.
bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"
topics => ["app-logs", "access-logs"]
group_id => "logstash-consumers"
codec => json
# Threads per Logstash instance; total threads across all instances
# should not exceed the topic partition count or some will idle.
consumer_threads => 4
# "basic" attaches Kafka metadata (topic/partition/offset) to each event
decorate_events => "basic"
# Start from the newest offset when no committed offset exists
auto_offset_reset => "latest"
}
}
HTTP Input
input {
http {
# Accept events pushed over HTTP on port 8080; default codec is json.
port => 8080
codec => json
# Requests with Content-Type text/plain are decoded with the plain codec
additional_codecs => { "text/plain" => "plain" }
}
}
Filter Plugins
Grok (Log Parsing)
filter {
# NOTE(review): all three groks run against every event — an event that
# matches only one pattern still collects _grokparsefailure from the other
# two. Guard each grok with a conditional, or list all patterns in one
# grok's match array (grok stops at the first matching pattern).
# Apache access log
grok {
match => {
"message" => '%{IPORHOST:client_ip} - %{DATA:user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:status} %{NUMBER:bytes}'
}
}
# Nginx error log
grok {
match => {
"message" => "%{DATESTAMP:timestamp} \[%{WORD:level}\] %{POSINT:pid}#%{NUMBER}: %{GREEDYDATA:error_message}"
}
}
# Custom application log
grok {
match => {
"message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] %{LOGLEVEL:level} %{NOTSPACE:logger} - %{GREEDYDATA:msg}"
}
}
}
Data Transformation
filter {
# Rename and remove fields
mutate {
# NOTE(review): "host.name" creates a field whose name literally contains
# a dot; use "[host][name]" if a nested object is intended — confirm.
rename => { "hostname" => "host.name" }
remove_field => ["agent", "ecs", "@version"]
convert => { "status" => "integer" }
# Normalize Windows line endings inside the message body
gsub => ["message", "\r\n", "\n"]
lowercase => ["level"]
}
# Conditional processing: flag error/fatal events for alert routing
if [level] == "error" or [level] == "fatal" {
mutate {
add_tag => ["alert"]
add_field => { "severity" => "high" }
}
}
# GeoIP enrichment
if [client_ip] {
geoip {
source => "client_ip"
target => "geo"
fields => ["city_name", "country_name", "location"]
}
}
# User-Agent parsing
if [user_agent] {
useragent {
source => "user_agent"
target => "ua"
}
}
# Calculate response time category
if [duration_ms] {
ruby {
code => '
duration = event.get("duration_ms").to_f
# Exclusive upper bounds (...) so each boundary value falls into
# exactly one bucket; the original inclusive ranges (0..100, 100..500)
# classified exactly 100 as "fast" and exactly 500 as "normal",
# although those values start the next bucket.
category = case duration
when 0...100 then "fast"
when 100...500 then "normal"
when 500..2000 then "slow"
else "critical"
end
event.set("response_category", category)
'
}
}
# Drop unwanted events
if [level] == "debug" and [environment] == "production" {
drop {}
}
}
Multiline Codec (Stack Traces)
input {
# NOTE: the beats input plugin does NOT support the multiline codec —
# Logstash rejects the combination; multiline assembly for Beats data
# must be configured in the shipper (Filebeat's multiline settings).
# Use the codec with inputs that read raw streams, e.g. file or tcp:
file {
path => "/var/log/app/*.log"
# Any line NOT starting with a timestamp is a continuation (e.g. a
# stack-trace line) and is appended to the previous event.
codec => multiline {
pattern => "^%{TIMESTAMP_ISO8601}"
negate => true
what => "previous"
}
}
}
Output Plugins
Elasticsearch Output
output {
elasticsearch {
# Requests are load-balanced across the listed hosts; TLS is verified
# against the given CA bundle.
hosts => ["https://es-node-1:9200", "https://es-node-2:9200"]
index => "logs-%{[service]}-%{+YYYY.MM.dd}"
user => "logstash_writer"
# Resolved from the Logstash keystore or environment at startup
password => "${ES_PASSWORD}"
ssl_enabled => true
ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
# Template management
# Install/overwrite the "logs" index template so each new daily index
# picks up the managed mappings.
manage_template => true
template_name => "logs"
template_overwrite => true
}
}
Conditional Routing
output {
# Errors to dedicated index
if "alert" in [tags] {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "alerts-%{+YYYY.MM.dd}"
}
}
# All logs to main index
# (not an "else": alert-tagged events are intentionally written to BOTH
# the alerts-* and logs-* indices)
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "logs-%{+YYYY.MM.dd}"
}
# High-severity to PagerDuty
if [severity] == "high" {
http {
url => "https://events.pagerduty.com/v2/enqueue"
http_method => "post"
format => "json"
# mapping builds the JSON request body; %{...} references are
# resolved per event. Replace YOUR_ROUTING_KEY with a real
# PagerDuty Events API v2 routing key.
mapping => {
"routing_key" => "YOUR_ROUTING_KEY"
"event_action" => "trigger"
"payload" => {
"summary" => "%{message}"
"severity" => "critical"
"source" => "%{host}"
}
}
}
}
}
Performance Tuning
| Setting | Default | Recommendation | Impact |
|---|---|---|---|
| pipeline.workers | CPU cores | Match CPU cores | Parallelism for filter/output |
| pipeline.batch.size | 125 | 250-500 | Larger batch = higher throughput |
| pipeline.batch.delay | 50ms | 50-250ms | Wait time to fill a batch |
| queue.type | memory | persisted (for durability) | Disk-backed queue survives restarts |
| queue.max_bytes | 1GB | 4-8GB | Persistent queue capacity |
JVM Settings
# /etc/logstash/jvm.options
# Min and max heap are set to the same value to avoid heap-resize pauses.
-Xms2g
-Xmx2g
# Set equal; typically 1/4 of system RAM, max 4-8GB
Monitoring Logstash
Key Metrics
| Metric | Healthy | Warning |
|---|---|---|
| Events in per second | Stable rate | Sudden drop |
| Events out per second | Matches events in | Lag growing |
| Pipeline workers busy | < 80% | 100% sustained |
| Queue backpressure | 0 | Growing |
| JVM heap usage | < 75% | > 85% |
| DLQ size | 0 | Growing |
# Check pipeline stats (monitoring API defaults to port 9600)
curl -s localhost:9600/_node/stats/pipelines | jq .
# Check JVM metrics (heap usage, GC)
curl -s localhost:9600/_node/stats/jvm | jq .
# Check hot threads
curl -s localhost:9600/_node/hot_threads
Dead Letter Queue
When events fail to process, they go to the Dead Letter Queue (DLQ) for later reprocessing.
# Enable DLQ in logstash.yml
# dead_letter_queue.enable: true
# dead_letter_queue.max_bytes: 1gb
# Reprocess DLQ events
input {
dead_letter_queue {
# NOTE(review): the DLQ lives under <path.data>/dead_letter_queue —
# verify this path matches your instance's path.data setting.
path => "/var/logstash/data/dead_letter_queue"
# Record read position so already-replayed events are skipped on restart
commit_offsets => true
# Only replay events that failed in the "main" pipeline
pipeline_id => "main"
}
}
filter {
# Fix or transform failed events
mutate {
remove_field => ["[dead_letter_queue_metadata]"]
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
# Recovered events land in their own daily index for inspection
index => "recovered-%{+YYYY.MM.dd}"
}
}