INFO-实时计算日志采集功能对接

使用场景
配置日志采集功能,实时运维页面查看任务运行日志以及异常日志
INFO-实时计算日志采集功能对接 - 图1
使用示例
1. easyops找到logstash服务找到server 配置组,按照如下示例添加
INFO-实时计算日志采集功能对接 - 图2
pipeline_filter
filter {
  mutate {
    rename => {
      "_body" => "message"
      "_hostname" => "[host][name]"
      "_position" => "[log][offset]"
      "_file_dir" => "[log][file][path]"
    }
  }

  date {
    match => [ "_timestamp", "UNIX_MS" ]
  }

  if [message] =~ "^[0-9]{4}-[0-9]{2}-[0-9]{2}[[:space:]][0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}[[:space:]]ERROR[[:space:]]+" or [message] =~ "^[a-zA-Z][\s\S]+:[\s\S]*\n\s+at[[:space:]]" {
    mutate {
      add_field => {"isError" => true}
      add_field => {"logLevel" => "log.error"}
    }
  } else {
    mutate {
      add_field => {"isError" => false}
      add_field => {"logLevel" => "log"}
    }
  }

  if [message] =~ "Starting.*?\(Version:" {
    mutate { add_field => { "[@metadata][side_output]" => "true" }}
  }

  grok {
    match => { "[log][file][path]" => "/(?<applicationId>application_[^\/]*?)/(?<containerId>container_[^\/]*?)/(?<logType>[^\/]*?)\." }
  }

  grok {
    match => {"_task_name" => "^sloth_log_cluster_(?<clusterId>[0-9]+)"}
  }

  ruby {
    code => "event.set('index_day', event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
  }

  mutate {
    rename => {
      "clusterId" => "[fields][clusterId]"
    }
  }

  mutate {
    convert => {
      "[fields][clusterId]" => "integer"
      "isError" => "boolean"
    }
  }

  mutate {
    remove_field => [ "_timestamp", "_task_name" ]
  }
}

pipeline_output,ES配置信息从ES导出值获取

output {
  if [@metadata][side_output] == "true" {
    elasticsearch {
      hosts => ["xxx:9601","xxx:9601","xxx:9601"]
      index => subtask_container_log
      user => "elastic"
      password => "xxx"
    }
  }
  elasticsearch {
    hosts => ["xxx:9601","xxx:9601","xxx:9601"]
    index => "sloth_yarn_log_%{index_day}"
    user => "elastic"
    password => "xxx"
  }
}

作者:林帅