INFO-实时计算日志采集功能对接
更新时间: 2025-06-04 16:23:10
阅读 11
INFO-实时计算日志采集功能对接
使用场景
配置日志采集功能,实时运维页面查看任务运行日志以及异常日志
使用示例
1. easyops找到logstash服务找到server 配置组,按照如下示例添加
pipeline_filter
filter {
mutate {
rename => {
"_body" => "message"
"_hostname" => "[host][name]"
"_position" => "[log][offset]"
"_file_dir" => "[log][file][path]"
}
}
date {
match => [ "_timestamp", "UNIX_MS" ]
}
if [message] =~ "^[0-9]{4}-[0-9]{2}-[0-9]{2}[[:space:]][0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}[[:space:]]ERROR[[:space:]]+" or [message] =~ "^[a-zA-Z][\s\S]+:[\s\S]*\n\s+at[[:space:]]" {
mutate {
add_field => {"isError" => true}
add_field => {"logLevel" => "log.error"}
}
} else {
mutate {
add_field => {"isError" => false}
add_field => {"logLevel" => "log"}
}
}
if [message] =~ "Starting.*?\(Version:" {
mutate { add_field => { "[@metadata][side_output]" => "true" }}
}
grok {
match => { "[log][file][path]" => "/(?<applicationId>application_[^\/]*?)/(?<containerId>container_[^\/]*?)/(?<logType>[^\/]*?)\." }
}
grok {
match => {"_task_name" => "^sloth_log_cluster_(?<clusterId>[0-9]+)"}
}
ruby {
code => "event.set('index_day', event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
}
mutate {
rename => {
"clusterId" => "[fields][clusterId]"
}
}
mutate {
convert => {
"[fields][clusterId]" => "integer"
"isError" => "boolean"
}
}
mutate {
remove_field => [ "_timestamp", "_task_name" ]
}
}
pipeline_output,ES配置信息从ES导出值获取
output {
if [@metadata][side_output] == "true" {
elasticsearch {
hosts => ["xxx:9601","xxx:9601","xxx:9601"]
index => subtask_container_log
user => "elastic"
password => "xxx"
}
}
elasticsearch {
hosts => ["xxx:9601","xxx:9601","xxx:9601"]
index => "sloth_yarn_log_%{index_day}"
user => "elastic"
password => "xxx"
}
}
作者:林帅
文档反馈
以上内容对您是否有帮助?