Some information in PostgreSQL log-events is split over multiple log lines belonging to one (virtual) transaction-id. While inspecting the log-events in a tool like Kibana (ELK stack) it can be handy to have all the information of one transaction in a single log-event.

This solution had been on my mind for quite some time, and now I finally found the time and motivation to implement it.
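The grok patterns below assume that PostgreSQL writes its log with a log_line_prefix roughly like the following. This is a sketch reconstructed from the fields the patterns extract (%m timestamp with milliseconds, %p pid, %l session line number, %h host, %u user, %d database, %x transaction-id, %v virtual transaction-id); adapt it to your own prefix:

# postgresql.conf -- assumed prefix, reconstructed from the grok patterns below
log_line_prefix = '%m [%p]: [%l-1] host=%h,user=%u,db=%d,tx=%x,vtx=%v '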

Complete example Logstash configuration

input {
  file {
    path  => "/tmp/logstash/psql.log"
    type  => "postgres"
    mode  => "read"
    codec => multiline {
      negate  => true
      pattern => "^%{TIMESTAMP_ISO8601} "
      what    => "previous"
    }
  }
}

filter {
  grok {
    # parses the log-events
    match => { "message" => [
                  "%{TIMESTAMP_ISO8601:@timestamp} %{DATA:timezone} \[%{POSINT:pid}\]: \[%{POSINT:sequence}-1\] host=(?:%{DATA:psql_client}),user=(?:%{DATA:psql_user}),db=(?:%{DATA:psql_db}),tx=(?:%{DATA:psql_transactionid}),vtx=(?:%{DATA:psql_virtualtransactionid}) %{WORD:psql_logger}:%{SPACE}duration:%{SPACE}%{NUMBER:psql_duration}%{SPACE}ms%{SPACE}%{GREEDYDATA:msg}",
                  "%{TIMESTAMP_ISO8601:@timestamp} %{DATA:timezone} \[%{POSINT:pid}\]: \[%{POSINT:sequence}-1\] host=(?:%{DATA:psql_client}),user=(?:%{DATA:psql_user}),db=(?:%{DATA:psql_db}),tx=(?:%{DATA:psql_transactionid}),vtx=(?:%{DATA:psql_virtualtransactionid}) %{WORD:psql_logger}:%{SPACE}%{GREEDYDATA:msg}"
                  ]}
    remove_field => [ "message" ]
    tag_on_failure => ["grok_postgres_failed"]
  }

  # parse date into @timestamp
  date {
    match => [ "timestamp", "ISO8601", "YYYY-MM-dd HH:mm:ss" ]
    remove_field => [ "timestamp" ]
  }

  # only apply the aggregate filter to log-events carrying a value in the
  # field `psql_user`; there is no need to run it on _every_ log-event.
  if [psql_user] {
    aggregate {
      task_id => "%{psql_virtualtransactionid}"
      code => "
          (map.merge!(event.to_hash); map['msg'] = ''; map['psql_duration'] = 0.0) if map.empty?
          map['msg'] += event.get('msg') + 10.chr  # 10.chr == newline
          if event.get('psql_duration')
            map['msg'] += 'duration: ' + event.get('psql_duration') + 10.chr
            map['psql_duration'] += event.get('psql_duration').to_f
          end
      "
      timeout_task_id_field => "psql_virtualtransactionid"
      timeout => 10
      timeout_tags => ['_aggregatetimeout']
      timeout_code => "event.tag('psql_aggregated')"
      push_map_as_event_on_timeout => true
    }

  if "psql_aggregated" in [tags] {
    mutate {
      convert => {
        "psql_duration" => "float"
      }
    }

    ruby{
      code=>"
          if event.get('psql_duration')
            if event.get('psql_duration') == 0.0
              event.remove('psql_duration')
            else
              event.set('psql_duration', event.get('psql_duration').round(2))
            end
          end
      "
    }
  } else {
    drop {}
  }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

Input

The input block ensures that every line not starting with an ISO 8601 timestamp (the TIMESTAMP_ISO8601 pattern) is appended to the previous line; an example follows after the configuration.

input {
  file {
    path  => "/tmp/logstash/psql.log"
    type  => "postgres"
    mode  => "read"
    codec => multiline {
      negate  => true
      pattern => "^%{TIMESTAMP_ISO8601} "
      what    => "previous"
    }
  }
}
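
For illustration, here are some made-up log lines in the assumed format: a statement spanning two lines (the second line does not start with a timestamp and is therefore appended to the first event), followed by the matching duration line:

2021-02-14 10:15:01.234 CET [12345]: [3-1] host=10.0.0.1,user=app,db=appdb,tx=0,vtx=3/42 LOG:  statement: SELECT *
        FROM accounts WHERE id = 1;
2021-02-14 10:15:01.236 CET [12345]: [4-1] host=10.0.0.1,user=app,db=appdb,tx=0,vtx=3/42 LOG:  duration: 1.234 ms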

Filter

After the initial grok parsing, the aggregate filter is applied:

aggregate {
  task_id => "%{psql_virtualtransactionid}"
  code => "
      (map.merge!(event.to_hash); map['msg'] = ''; map['psql_duration'] = 0.0) if map.empty?
      map['msg'] += event.get('msg') + 10.chr  # 10.chr == newline
      if event.get('psql_duration')
        map['msg'] += 'duration: ' + event.get('psql_duration') + 10.chr
        map['psql_duration'] += event.get('psql_duration').to_f
      end
  "
  timeout_task_id_field => "psql_virtualtransactionid"
  timeout => 10
  timeout_tags => ['_aggregatetimeout']
  timeout_code => "event.tag('psql_aggregated')"
  push_map_as_event_on_timeout => true
}

The field psql_virtualtransactionid is used as an identifier (task_id) for log-events. All log-events sharing this identifier are aggregated.

The code does the following:

  1. If there’s no map established yet for the current task_id:
    • merge the content of the current log-event into the new, empty map
    • reset the content of map['msg'] to an empty string
    • reset the content of map['psql_duration'] to 0.0
  2. Append the value of the msg field of the current log-event to map['msg'], followed by a linebreak
  3. If the current log-event contains a field psql_duration:
    • append 'duration: ' plus the duration value to map['msg'], followed by a linebreak
    • add the duration value (converted to float) to map['psql_duration']

After the timeout of 10 seconds the map is pushed as a new log-event and the tag psql_aggregated is added to it.
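Such a pushed event might look roughly like this in the rubydebug output (illustrative values based on the sample log lines above; some standard fields omitted):

{
    "psql_virtualtransactionid" => "3/42",
                    "psql_user" => "app",
                      "psql_db" => "appdb",
                          "msg" => "statement: SELECT *\n\tFROM accounts WHERE id = 1;\nduration: 1.234\n",
                "psql_duration" => 1.234,
                         "tags" => [
        [0] "_aggregatetimeout",
        [1] "psql_aggregated"
    ]
}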

Later on in the code some cleanup happens:

if "psql_aggregated" in [tags] {
  mutate {
    convert => {
      "psql_duration" => "float"
    }
  }

  ruby{
    code=>"
        if event.get('psql_duration')
          if event.get('psql_duration') == 0.0
            event.remove('psql_duration')
          else
            event.set('psql_duration', event.get('psql_duration').round(2))
          end
        end
    "
  }
} else {
  drop {}
}

On log-events carrying the tag psql_aggregated the value of psql_duration is explicitly converted to float, and the field is removed if it holds 0.0 (i.e. no duration was recorded). If a psql_duration value is present, it is rounded to two decimal places to improve readability.

Log-events without the tag psql_aggregated, that is the "source" log-events, are finally dropped completely: their information is already contained in the new aggregated log-event.

Hints

The aggregate plugin can only run in a single thread. You should ensure that either Logstash as a whole uses only one worker, or that this pipeline is configured to use only one worker.
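One way to achieve the latter is to give this configuration its own entry in pipelines.yml with pipeline.workers set to 1; a sketch (the pipeline id and config path are examples):

# pipelines.yml -- pin this pipeline to one worker so the aggregate
# filter sees all events of a transaction in order
- pipeline.id: postgres
  path.config: "/etc/logstash/conf.d/postgres.conf"
  pipeline.workers: 1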

The aggregated log-events will appear with a delay of up to the configured timeout (10 seconds here), but this information is usually not very time-critical.

Further resources

This solution is inspired by https://discuss.elastic.co/t/specific-grok-filter-for-multi-line-postgresql-log/56286/9.