适用模块

实时模块

具体说明

Java Flink消费KafkaHDFS

使用示例

##### 依赖配置
对应pom.xml文件依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
##### 代码示例
Flink消费KafkaHDFS的简单demo代码

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.time.ZoneId;
import java.util.Properties;

public class Kafka2Hdfs {
    public static void main(String[] args) throws Exception {

        //kafka 连接信息
        String bootstrap_servers = "kafka1:9092,kafka2:9092:kafka3:9092";
        String groupId = "test_group";
        String offset = "latest";
        String topic = "test-topic";

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrap_servers);
        properties.setProperty("group.id", groupId);
        properties.setProperty("auto.offset.reset", offset);

        // 创建Flink执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setMaxParallelism(256);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointInterval(10 * 60 * 1000);    //checkpoint间隔10min
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // 创建 Kafka Source
        FlinkKafkaConsumer011<String> kafkaSource = new FlinkKafkaConsumer011(
                topic,
                new SimpleStringSchema(),
                properties);

        // 创建 HDFS Sink
        String filePath = "/user/bdms";

        // 文件滚动策略1 :  可指定文件滚动规则
        DefaultRollingPolicy rollingPolicy = DefaultRollingPolicy
                .builder()
                .withMaxPartSize(1024 * 1024 * 256)      // 设置每个文件的最大大小 ,默认是128M。这里设置为256M
                .withRolloverInterval(Long.MAX_VALUE)   // 滚动写入新文件的时间,默认60s。这里设置为无限大
                .withInactivityInterval(60 * 1000)      // 60s空闲,就滚动写入新的文件
                .build();

        // 文件滚动策略2 : 当checkpoint时,文件滚动
        OnCheckpointRollingPolicy rollingPolicy2 = OnCheckpointRollingPolicy.build();


        StreamingFileSink hdfsSink = StreamingFileSink
                .forRowFormat(new Path(filePath), new SimpleStringEncoder<String>("UTF-8"))  // 输出的文件是按行存储的
//                .forBulkFormat(new Path(filePath))     // 也可以将输出结果用 Parquet 等格式进行压缩存储
                .withBucketAssigner(new DateTimeBucketAssigner("yyyy-MM-dd", ZoneId.of("Asia/Shanghai")))   //分桶策略 默认"yyyy-MM-dd--HH"  这里设置按天分
//                .withBucketAssigner(new BasePathBucketAssigner())   //分桶策略: 不分桶,所有文件写到根目录;
                .withRollingPolicy(rollingPolicy)
                .withBucketCheckInterval(1000L) // 桶检查间隔,这里设置为1s
                .build();

        // 添加Source、Sink
        DataStreamSource<String> sourceStream = env.addSource(kafkaSource);
        sourceStream.addSink(hdfsSink);

        // 执行任务
        env.execute("Kafka2Hdfs");

    }
}

作者:wangsong