# Logstash pipeline for Cowrie honeypot logs: reads Cowrie's JSON log,
# enriches events that carry a source IP with reverse DNS and GeoIP data,
# and writes the result to Elasticsearch, a local file and stdout.
#
# NOTE: comments in Logstash configs run to the end of the line, so every
# comment below must stay on its own line; collapsing this file onto one
# line would comment out everything after the first '#'.

input {
  # this is the actual live log file to monitor
  file {
    path => ["/home/cowrie/cowrie-git/log/cowrie.json"]
    codec => json
    type => "cowrie"
  }

  # this is to send old logs to for reprocessing
  #tcp {
  #  port => 3333
  #  type => "cowrie"
  #}
}

filter {
  if [type] == "cowrie" {
    # Decode JSON carried in the "message" field.
    # NOTE(review): the file input above already applies the json codec, so
    # this presumably matters only for events that arrive with raw JSON in
    # "message" (e.g. via the optional tcp input) - confirm before removing.
    json {
      source => "message"
    }

    # Use the event's own "timestamp" field (ISO8601) as the event time.
    date {
      match => [ "timestamp", "ISO8601" ]
    }

    # Enrichment is only possible for events that carry a source address.
    if [src_ip] {
      # Seed src_host with the IP; the dns filter below replaces it in place.
      mutate {
        add_field => { "src_host" => "%{src_ip}" }
      }

      # Reverse-resolve src_host, overwriting the IP with the hostname.
      # Successful and failed lookups are both cached for 900 seconds.
      dns {
        reverse => [ "src_host" ]
        nameserver => [ "8.8.8.8", "8.8.4.4" ]
        action => "replace"
        hit_cache_size => 4096
        hit_cache_ttl => 900
        failed_cache_size => 512
        failed_cache_ttl => 900
      }

      # Add geographic information for src_ip under the "geoip" field.
      # NOTE(review): GeoLite2 databases are normally distributed with an
      # .mmdb extension - confirm this path matches the file on disk.
      geoip {
        source => "src_ip"
        target => "geoip"
        database => "/opt/logstash/vendor/geoip/GeoLite2-City.dat"
      }
    }

    mutate {
      remove_tag => [ "beats_input_codec_plain_applied" ]

      # cut useless fields added by filebeat, if you use it of course
      remove_field => [ "source", "offset", "input_type" ]
    }
  }
}

output {
  if [type] == "cowrie" {
    # Index events into the local Elasticsearch node.
    elasticsearch {
      hosts => ["localhost:9200"]
    }

    # Keep a JSON copy of every processed event on disk.
    file {
      path => "/tmp/cowrie-logstash.log"
      codec => json
    }

    # Print every processed event to stdout in rubydebug format.
    stdout {
      codec => rubydebug
    }
  }
}