pipeline rule
// Enriches Zimbra/Postfix "SASL LOGIN authentication failed" warnings with
// GeoIP data for the value captured between the square brackets after "unknown".
rule "GeoIP lookup: zimbra_auth_failure"
when
  // Only handle messages that consist of exactly this SASL failure line.
  regex("^warning\\:\\sunknown\\[(.+?)\\]\\:\\sSASL\\sLOGIN\\sauthentication\\sfailed\\:\\sauthentication\\sfailure$", to_string($message.message)).matches == true
then
  // The match cannot be carried over from the condition, so the same
  // expression is evaluated again; group "0" is the bracketed value.
  let sasl_match = regex("^warning\\:\\sunknown\\[(.+?)\\]\\:\\sSASL\\sLOGIN\\sauthentication\\sfailed\\:\\sauthentication\\sfailure$", to_string($message.message));

  // Query the lookup table named "geoip" with the captured value.
  let geo_info = lookup("geoip", sasl_match["0"]);

  // Copy the coordinates, ISO country code and English city name onto the message.
  set_field("src_ip_geo_location", geo_info["coordinates"]);
  set_field("src_ip_geo_country", geo_info["country"].iso_code);
  set_field("src_ip_geo_city", geo_info["city"].names.en);
end
# 替换timestamp值,貌似不好使,改用Extractor
// Replaces the message's "timestamp" field with the time found at the start
// of the log line. Expected prefix: "yyyy-MM-dd HH:mm:ss.SSS",
// e.g. "2019-03-01 12:34:56.789".
rule "replace timestamp"
when
  // Same pattern as in the action below, so the rule only fires when the
  // capture is guaranteed to succeed (milliseconds included, literal dot).
  regex("^([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\\.[0-9]{3})", to_string($message.message)).matches == true
then
  let result = regex("^([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\\.[0-9]{3})", to_string($message.message));
  // The date pattern must describe the captured text exactly: a space between
  // date and time and no trailing 'Z'. The previous ISO-style pattern
  // ("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") could never parse this value.
  // NOTE(review): no timezone argument is passed, so parse_date's default
  // applies (UTC per the Graylog docs) - confirm the log's time zone.
  let new_time = parse_date(to_string(result["0"]), "yyyy-MM-dd HH:mm:ss.SSS");
  set_field("timestamp", new_time);
end
利用Extractor把filebeat的@timestamp时间替换为日志时间
必须转换为时间格式,否则es存不进去
System -> Inputs -> Manage extractors -> Add extractor