Flink自身具有一套完整的监控系统,用于监控Flink任务的运行状况,具有如Failover、输入输出QPS、CheckPoint成功率、延迟等一系列指标,但是对于业务而言,在一些情况下,仅有任务本身的监控是不够的,Flink任务运行正常也不能意味着业务场景没有问题,用户需要将Flink任务计算的某些业务指标(如统计金额异常的订单数)进行上报,并且监控异常,发送报警。基于此需求,我们在提供了上报业务指标并进行监控报警的方案。

本文将从三个方面:1、业务指标的上报;2、监控展示;3、异常报警三方面指导用户完成业务监控报警需求。

1、业务指标的上报

Flink提供了4种类型的Metrics:Counters, Gauges, Histograms, Meters,每种metrics的各自特点可参考 官方文档

在绝大部分的场景中,Counters和Gauges就可以满足需求,Counters主要用于累加的Metircs,比如计算1分钟内的金额,Gauges主要用于设置当前值,比如计算当前这条数据的延迟,为了方便用户使用,我们将这两种类型的Metrics进行了封装,用户只需要引入pom依赖,然后调用相应的接口即可。

<repositories>
    <repository>
    <id>libs-releases</id>
    <url>http://mvn.hz.netease.com/artifactory/libs-releases</url>
    </repository>
    <repository>
    <id>libs-snapshots</id>
    <url>http://mvn.hz.netease.com/artifactory/libs-snapshots</url>
    </repository>
</repositories>

1.2 添加依赖

<dependency>
    <groupId>com.netease.sloth</groupId>
    <artifactId>sloth-udmetric-base</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

1.3 选择指标类型

1.3.1 MetricGauge

​ MetricGauge主要用于上报当前值,其中setValue(long value)方法会将设置当前值,每次getValue时会将value上报到指标系统中并将value赋为0

/** the current value. */
private AtomicLong value = new AtomicLong(0);

@Override
public Long getValue() {
    return value.getAndSet(0);
}

public void setValue(Long value) {
    this.value.set(value);
}

1.3.1 MetricCounter

MetricGauge主要用于上报累加值,其中包含一个累加器counter,inc()方法会将counter加1,inc(long n)方法会将counter加n,每次getValue时会将value上报到指标系统中并将value赋为0。

/** the current value. */
private AtomicLong count = new AtomicLong(0);

@Override
public void inc() {
    count.incrementAndGet();
}

@Override
public void inc(long n) {
    count.addAndGet(n);
}

@Override
public void dec() {
    count.decrementAndGet();
}

@Override
public void dec(long n) {
    count.set(count.get() - n);
}

@Override
public long getCount() {
    return count.getAndSet(0);
}

1.4 指标上报

Metric指标的上报包含3个过程:

1.4.1 在静态域中创建Metric对象

private static MetricCounter count = new MetricCounter();
private static MetricGague gauge = new MetricGague();

1.4.2 在一个RichFunction中的open()函数中注册Metric

注册Metric需要调用getRuntimeContext方法,该方法需要在RichFunction的open()方法中才能获得。

stream.map(new RichMapFunction<String, String>() {
    @Override
    public void open(Configuration parameters){
        getRuntimeContext().getMetricGroup().addGroup("MetricName", "IpCount").counter("UDMetric", count);
        getRuntimeContext().getMetricGroup().addGroup("MetricName", "IpGauge").gauge("UDMetric", gauge);
    }
}

这里需要注意的是,getRuntimeContext().getMetricGroup().addGroup(“MetricName“, “IpCount”).counter(“UDMetric“, count)中,加粗部分MetricName及UDMetric是固定的,是用于上报指标时监控系统进行过滤识别业务指标的,因此这两个字段不能改动,而IpCount是用户定义的指标名称,可以根据需要设置,count是静态域中定义的Metric对象。

1.4.3 上报指标

在完成好Metric的创建和注册后,用户就可以在代码中上报自己的指标了

@Override
public String map(String s) throws Exception {
 gauge.setValue(1L);
 count.inc(1);
 return s;
}

1.4.4 添加Falcon Reporter类

在完成指标上报后,需要在任务依赖中添加Open falcon Reporter类,因此,需要将下面的jar包添加到任务的依赖中: ic-falcon-1.10.jar

注意:依赖Jar包根据不同引擎版本有所区别,可咨询对应技术支持进行获取与详细确认。

2 监控指标查看

指标上报完成后,可以登陆Grafana监控页面,查看业务监控指标栏查看指标上报情况,在panel的下方,可以看到对应的指标名。其中,对于Gauge类型指标,展示max值,代表指定周期内的最大值;对于Counter类型指标,展示sum值,代表指定周期内sum的求和值,sum的周期可以在界面上方进行选择,目前可供选择的周期有10s/1min/5min/1h。 业务指标开发 - 图1