Having set up a basic ELK stack, we should probably get on to ingesting some data.
Wait, what about schema?
Theoretically, we could play it strict and define our expected documents much the way we define SQL tables. But that's no fun, and besides, Elastic comes with sensible defaults (dynamic mapping): it has done a decent enough job for us that we've never needed to correct it. Not bad at all.
Setting up the pipeline
Logstash operates in terms of pipelines. Reduced to the bare basics, each pipeline lets us define where data comes from (input), how Logstash should transform the events it picks up (filter) and, finally, where to send them (output). The last bit is fairly obvious and easy: we want to send processed events to Elastic, and by virtue of running Docker we happen to know it's hosted at http://elasticsearch:9200. Too easy.
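If it helps to visualise the shape of a pipeline before we fill it in, a bare-bones sketch (just for local experimentation, not part of our actual setup) could look something like this:

  input {
    stdin { }                 # read events typed into the console
  }
  filter {
    # optionally transform or drop events here
  }
  output {
    stdout {
      codec => rubydebug      # pretty-print each event so we can see what Logstash made of it
    }
  }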
Input
Being quite modular, Logstash operates in terms of plugins, and there are HEAPS to choose from! Apart from the obvious bits you see in the documentation, one thing that might not be immediately apparent: you can have multiple plugins running for one input stream!
Our telemetry agent of choice, telegraf, supports sending metrics via the Influx Line Protocol, but Logstash does not have a plugin for that, so we will need to craft something a bit more complex. Suppose we want both basic metrics and sql-stats:
  input {
    tcp {
      port => 8098
      type => "telegraf"
    } # opens up a TCP listener for events sent by telegraf (which happens to support Influx Line Protocol)
    tcp {
      port => 8099
      type => "sql-stats"
    } # opens up a TCP listener for events coded in Influx Line Protocol
  }
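The two tcp listeners above are the same plugin twice, but nothing stops us from mixing plugin types in the same input block. Purely as an illustration (the beats listener is not part of our setup), something like this is perfectly legal:

  input {
    tcp {
      port => 8098
      type => "telegraf"
    }
    beats {
      port => 5044          # e.g. a Filebeat instance shipping logs alongside the metrics
      type => "logs"
    }
  }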
Filter
This bit is optional, but sometimes very helpful. Again, we've got quite a variety of plugins to suit. Here we get a chance to inspect incoming events and either transform or outright reject some. For example, here we parse the Influx Line Protocol sent by telegraf and do some basic enhancements on data coming from the sql-stats poller:
  filter {
    if [type] == "telegraf" {
      dissect {
        # split "measurement.metric;tag1=a;tag2=b value timestamp" into separate fields
        mapping => {
          "message" => "%{measurement}.%{metric};%{tags} %{value} %{ts}"
        }
      }
      kv {
        # explode the "tags" string into individual key=value fields
        source => "tags"
        field_split => ";"
      }
      date {
        # use the UNIX epoch timestamp from the event as @timestamp
        match => ["ts", "UNIX"]
      }
      mutate {
        convert => {
          "value" => "float"
        }
        # drop the working fields we no longer need
        remove_field => ["tags", "message", "ts", "port"]
      }
    }
    if [type] == "sql-stats" {
      grok {
        # same general shape as above, but the metric part is optional
        match => {
          "message" => "%{WORD:measurement}(\.%{WORD:metric})?;%{GREEDYDATA:tags} (%{BASE10NUM:value}) %{NUMBER:ts}"
        }
      }
      kv {
        source => "tags"
        field_split => ";"
      }
      date {
        match => ["ts", "UNIX"]
      }
      mutate {
        convert => {
          "value" => "float"
        }
        # normalise whichever of these fields came in into a single "metric" field
        rename => {
          "counter" => "metric"
          "clerk_type" => "metric"
        }
        remove_field => ["tags", "message", "ts", "port", "host"]
      }
    }
  }
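We mentioned that filters can also reject events outright. For the sake of illustration (this is not part of our pipeline, and the field check is made up), dropping noisy events could look something like this:

  filter {
    if [measurement] == "heartbeat" {
      drop { }   # discard the event entirely; it never reaches the output
    }
  }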
All that’s left
To store the events, we define the output like so:
  output {
    if [type] == "telegraf" {
      elasticsearch {
        hosts => "elasticsearch:9200"
        index => "telegraf-%{+YYYY.MM}"   # roll over to a fresh index every month
      }
    }
    if [type] == "sql-stats" {
      elasticsearch {
        hosts => "elasticsearch:9200"
        index => "sql-stats-%{+YYYY.MM}"
      }
    }
  }
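While tinkering with grok patterns it can help to temporarily mirror events to the console as well; a quick aside, not part of the final config:

  output {
    stdout {
      codec => rubydebug   # dump the fully processed event so mapping issues are easy to spot
    }
  }

Logstash also accepts a --config.test_and_exit flag, which validates the pipeline file without starting it up for real.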