Some information in PostgreSQL log-events is split over multiple log lines belonging to one (virtual) transaction-id. When inspecting the log-events in a tool like Kibana (ELK stack), it is handy to have all the information of one transaction in a single log-event.
This solution had been on my mind for quite some time; now I found the time and motivation to implement it.
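The grok patterns used below expect a log_line_prefix that emits host, user, database, transaction-id and virtual-transaction-id. As an assumption (adjust this to your actual setup), a prefix along these lines in postgresql.conf produces matching log lines:
# postgresql.conf -- assumed log_line_prefix matching the grok patterns below
# %m = timestamp, %p = pid, %l = per-session log line number, %h = client host,
# %u = user, %d = database, %x = transaction-id, %v = virtual-transaction-id
log_line_prefix = '%m [%p]: [%l-1] host=%h,user=%u,db=%d,tx=%x,vtx=%v '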
Complete example Logstash configuration
input {
  file {
    path => "/tmp/logstash/psql.log"
    type => "postgres"
    mode => "read"
    codec => multiline {
      negate => true
      pattern => "^%{TIMESTAMP_ISO8601} "
      what => "previous"
    }
  }
}
filter {
  grok {
    # parse the log-events into fields
    match => { "message" => [
      "%{TIMESTAMP_ISO8601:timestamp} %{DATA:timezone} \[%{POSINT:pid}\]: \[%{POSINT:sequence}-1\] host=(?:%{DATA:psql_client}),user=(?:%{DATA:psql_user}),db=(?:%{DATA:psql_db}),tx=(?:%{DATA:psql_transactionid}),vtx=(?:%{DATA:psql_virtualtransactionid}) %{WORD:psql_logger}:%{SPACE}duration:%{SPACE}%{NUMBER:psql_duration}%{SPACE}ms%{SPACE}%{GREEDYDATA:msg}",
      "%{TIMESTAMP_ISO8601:timestamp} %{DATA:timezone} \[%{POSINT:pid}\]: \[%{POSINT:sequence}-1\] host=(?:%{DATA:psql_client}),user=(?:%{DATA:psql_user}),db=(?:%{DATA:psql_db}),tx=(?:%{DATA:psql_transactionid}),vtx=(?:%{DATA:psql_virtualtransactionid}) %{WORD:psql_logger}:%{SPACE}%{GREEDYDATA:msg}"
    ]}
    remove_field => [ "message" ]
    tag_on_failure => [ "grok_postgres_failed" ]
  }
  # parse the extracted timestamp into @timestamp
  date {
    match => [ "timestamp", "ISO8601", "YYYY-MM-dd HH:mm:ss.SSS", "YYYY-MM-dd HH:mm:ss" ]
    remove_field => [ "timestamp" ]
  }
  # only apply the aggregate filter to log-events carrying a value in the
  # field `psql_user`, no need to apply the filter to _every_ log-event.
  if [psql_user] {
    aggregate {
      task_id => "%{psql_virtualtransactionid}"
      code => "
        # first event of a transaction: seed the map with the event's fields
        # and initialize the aggregated fields
        (map.merge!(event.to_hash); map['msg'] = ''; map['psql_duration'] = 0.0) if map.empty?
        # append each event's message; 10.chr is a newline
        map['msg'] += event.get('msg') + 10.chr
        if event.get('psql_duration')
          map['msg'] += 'duration: ' + event.get('psql_duration') + 10.chr
          map['psql_duration'] += event.get('psql_duration').to_f
        end
      "
      timeout_task_id_field => "psql_virtualtransactionid"
      timeout => 10
      timeout_tags => ['_aggregatetimeout']
      timeout_code => "event.tag('psql_aggregated')"
      push_map_as_event_on_timeout => true
    }
    if "psql_aggregated" in [tags] {
      mutate {
        convert => {
          "psql_duration" => "float"
        }
      }
      ruby {
        code => "
          # drop a zero duration, otherwise round it to two decimal places
          if event.get('psql_duration')
            if event.get('psql_duration') == 0.0
              event.remove('psql_duration')
            else
              event.set('psql_duration', event.get('psql_duration').round(2))
            end
          end
        "
      }
    } else {
      drop {}
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
Inputs
The input block ensures that every line not starting with a TIMESTAMP_ISO8601 is concatenated onto the previous line, so statements spanning multiple physical lines arrive as a single log-event.
input {
  file {
    path => "/tmp/logstash/psql.log"
    type => "postgres"
    mode => "read"
    codec => multiline {
      negate => true
      pattern => "^%{TIMESTAMP_ISO8601} "
      what => "previous"
    }
  }
}
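As an illustration (hypothetical values, assuming the log_line_prefix sketched above): a statement spanning two physical lines is joined into one log-event, because the second line does not start with a timestamp:
2023-11-21 10:15:32.123 CET [12345]: [3-1] host=10.0.0.5,user=app,db=appdb,tx=0,vtx=4/711 LOG:  statement: SELECT *
	FROM orders WHERE id = 42;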
Filter
After the initial grok parsing, the aggregate filter is applied:
aggregate {
  task_id => "%{psql_virtualtransactionid}"
  code => "
    # first event of a transaction: seed the map with the event's fields
    # and initialize the aggregated fields
    (map.merge!(event.to_hash); map['msg'] = ''; map['psql_duration'] = 0.0) if map.empty?
    # append each event's message; 10.chr is a newline
    map['msg'] += event.get('msg') + 10.chr
    if event.get('psql_duration')
      map['msg'] += 'duration: ' + event.get('psql_duration') + 10.chr
      map['psql_duration'] += event.get('psql_duration').to_f
    end
  "
  timeout_task_id_field => "psql_virtualtransactionid"
  timeout => 10
  timeout_tags => ['_aggregatetimeout']
  timeout_code => "event.tag('psql_aggregated')"
  push_map_as_event_on_timeout => true
}
The field psql_virtualtransactionid is used as the identifier for log-events:
all log-events sharing this identifier are aggregated.
The code does the following:
- If there's no map established yet for the current task_id:
  - create a new map and merge the content of the current log-event into it
  - initialize map['msg'] with an empty string
  - initialize map['psql_duration'] with 0.0
- Append the msg field of the current log-event to map['msg'], followed by a linebreak.
- If the current log-event contains a field psql_duration:
  - append 'duration: ' plus its value to map['msg'], followed by a linebreak
  - add its value to map['psql_duration']
The map is pushed as a new log-event after the timeout of 10 seconds, and the
tag psql_aggregated is added via timeout_code.
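To make the map handling more tangible, here is a minimal standalone Ruby sketch that mimics the aggregate code with plain hashes instead of Logstash events (the field values are hypothetical):
# Minimal standalone simulation of the aggregate code using plain Ruby
# hashes instead of Logstash events; field values are hypothetical.
map = {}

events = [
  { 'msg' => 'statement: SELECT 1', 'psql_user' => 'app' },
  { 'msg' => 'execute <unnamed>', 'psql_user' => 'app', 'psql_duration' => '1.234' }
]

events.each do |event|
  # first event: seed the map and initialize the aggregated fields
  (map.merge!(event); map['msg'] = ''; map['psql_duration'] = 0.0) if map.empty?
  map['msg'] += event['msg'] + 10.chr   # 10.chr is a newline
  if event['psql_duration']
    map['msg'] += 'duration: ' + event['psql_duration'] + 10.chr
    map['psql_duration'] += event['psql_duration'].to_f
  end
end

puts map['msg']
# => statement: SELECT 1
#    execute <unnamed>
#    duration: 1.234
puts map['psql_duration']  # => 1.234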
Later in the configuration, some cleanup happens:
if "psql_aggregated" in [tags] {
  mutate {
    convert => {
      "psql_duration" => "float"
    }
  }
  ruby {
    code => "
      # drop a zero duration, otherwise round it to two decimal places
      if event.get('psql_duration')
        if event.get('psql_duration') == 0.0
          event.remove('psql_duration')
        else
          event.set('psql_duration', event.get('psql_duration').round(2))
        end
      end
    "
  }
} else {
  drop {}
}
On log-events carrying the tag psql_aggregated the value of psql_duration is
explicitly converted to a float. If the value is 0.0, the field is removed;
otherwise the value is rounded to two decimal places to improve readability.
Log-events not carrying the tag psql_aggregated, that is the "source"
log-events, are finally dropped completely: their information is already
contained in the new aggregated log-event.
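With the rubydebug stdout output from the configuration above, an aggregated log-event might look roughly like this (a sketch with hypothetical, abridged values):
{
    "psql_virtualtransactionid" => "4/711",
                    "psql_user" => "app",
                      "psql_db" => "appdb",
                "psql_duration" => 1.23,
                          "msg" => "statement: SELECT 1\nexecute <unnamed>\nduration: 1.234\n",
                         "tags" => [ "_aggregatetimeout", "psql_aggregated" ]
}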
Hints
The aggregate plugin can only run correctly in a single worker thread. You should ensure that either Logstash as a whole uses only one worker, or that this specific pipeline is configured to use only one worker.
Some of your log-events will arrive with a delay of up to the configured timeout (10 seconds here), but usually this information is not very time-critical.
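A minimal pipelines.yml sketch pinning only this pipeline to a single worker (pipeline id and path are assumptions, adjust to your installation):
# pipelines.yml -- run only the postgres pipeline single-threaded
- pipeline.id: postgres-aggregate
  path.config: "/etc/logstash/conf.d/postgres.conf"
  pipeline.workers: 1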
Further resources
- https://github.com/logstash-plugins/logstash-filter-aggregate
- https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html
This solution is inspired by https://discuss.elastic.co/t/specific-grok-filter-for-multi-line-postgresql-log/56286/9.