# /etc/vector/vector.toml
# Tail Elasticsearch search-slowlog files from both data directories.
[sources.elasticsearch_search_slowlog]
type = "file"
include = ["/var/log/elasticsearch/picapica_es_index_search_slowlog*.log", "/var/log/elasticsearch_2nd/picapica_es_index_search_slowlog*.log", ]
#read_from = "end"
read_from = "beginning" # also ingest pre-existing log content, not just newly appended lines
# Parse each slow-log line into structured fields with a regex, then normalize types.
[transforms.elasticsearch_search_slowlog_parse]
type = "remap"
inputs = ["elasticsearch_search_slowlog"]
source = """
. |= parse_regex!(.message, r'^\\[(?P<timestamp>\\S+)\\]\\[(?P<log_level>\\S+)\\s+\\]\\[(?P<query_type>\\S+)\\] \\[(?P<instance>\\S+)\\] \\[(?P<query_index>\\S+)\\]\\[(?P<query_shard>\\d+)\\] took\\[(?P<delay_time>\\d+\\.{0,1}\\d*)(?P<delay_time_unit>\\S+)\\], took_millis\\[(?P<delay_time_millis>\\d+)\\], total_hits\\[(?P<hit>\\d+)\\+{0,1} hits\\], \\S+, \\S+, search_type\\[(?P<search_type>\\S+)\\], total_shards\\[(?P<search_shard_num>\\S+)\\], source\\[(?P<source>\\S+( \\S+)*)\\], id\\[\\]')
# Older regex variants kept for reference:
#. |= parse_regex!(.message, r'^\\[(?P<timestamp>\\S+)\\]\\[(?P<log_level>\\S+) \\]\\[(?P<query_type>\\S+)\\] \\[(?P<instance>\\S+)\\] \\[(?P<query_index>\\S+)\\]\\[(?P<query_shard>\\S+)\\] took\\[(?P<delay_time>\\d+\\.{0,1}\\d*)(?P<delay_time_unit>\\S+)\\], took_millis\\[(?P<delay_time_millis>\\S+)\\], total_hits\\[(?P<hit>\\S+) hits\\], types\\[\\], stats\\[\\], search_type\\[(?P<search_type>\\S+)\\], total_shards\\[(?P<search_shard_num>\\S+)\\], id\\[\\]')
#. |= parse_regex!(.message, r'^\\[(?P<timestamp>\\S+)\\]\\[(?P<level>\\S+) \\]\\[(?P<query_type>\\S+)\\] \\[(?P<instance>\\S+)\\] \\[(?P<index>\\S+)\\]\\[(?P<shard>\\S+)\\] took\\[(?P<delay_time>\\S+)\\], took_millis\\[(?P<delay_time_millis>\\S+)\\], total_hits\\[(?P<hit>\\S+) hits\\], types\\[\\], stats\\[\\], search_type\\[(?P<search_type>\\S+)\\], total_shards\\[(?P<search_shard_num>\\S+)\\], source\\[(?P<source>\\S+)\\], id\\[\\]')
# Slowlog timestamps look like "2022-09-20T16:52:43,569": a 'T' separator and
# comma-separated milliseconds. The previous format "%Y-%m-%d %H:%M:%S,%z"
# (space separator, %z UTC-offset) could never match, so every event silently
# fell back to now() and the slowlog timeline was lost. %3f consumes the
# 3-digit millisecond field; ?? now() remains as a safety net.
.timestamp = parse_timestamp(.timestamp, "%Y-%m-%dT%H:%M:%S,%3f") ?? now()
.input_ip = "10.x.6.x"
.cluster = "picapica-es"
.delay_time = to_float!(.delay_time)
del(.message)
del(.file)
"""
# Uncomment to silently drop events that fail the regex instead of erroring.
# NOTE: this is a transform-level TOML option, so it must sit outside the VRL
# source string above (where it previously was, it would be invalid VRL).
#drop_on_error = true
# Ship the parsed slow-log events to the central "bigdata" Elasticsearch cluster.
[sinks.bigdata_es]
inputs = ["elasticsearch_search_slowlog_parse"]
type = "elasticsearch"
endpoint = "http://elasticsearch.com:80"
[sinks.bigdata_es.auth]
# NOTE(review): plaintext credentials in the config file — consider loading
# them from an environment variable or a secrets store instead.
user = "elastic"
password = "123pws"
strategy = "basic"
[sinks.bigdata_es.bulk]
# %Y.%m expands per event timestamp to a monthly index,
# e.g. bigdata-elasticsearch-index-slowlog-2022.09
index = "bigdata-elasticsearch-index-slowlog-%Y.%m"
## The section below enables printing events to the console, for debugging
## whether the emitted JSON is correct.
#[sinks.my_sink_id]
#type = "console"
#inputs = [ "elasticsearch_search_slowlog_parse" ]
#target = "stdout"
#
#[sinks.my_sink_id.encoding]
#codec = "json"
# Tags: parse,search,记录,slowlog,初步,vector,elasticsearch,type,es
# From: https://www.cnblogs.com/x602/p/16712611.html