Demo - Flink写入HDFS
更新时间: 2024-03-11 02:50:25
阅读 2507
DEMO-Flink写入HDFS
适用模块
实时模块
具体说明
使用 Java 编写 Flink 作业,消费 Kafka 中的数据并写入 HDFS。
使用示例
##### 依赖配置
对应 pom.xml 文件依赖:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
```
##### 代码示例
Flink消费Kafka写HDFS的简单demo代码
```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.time.ZoneId;
import java.util.Properties;
/**
 * Demo Flink streaming job: reads string records from a Kafka topic and writes
 * them, one record per line, into day-partitioned files under a file-system path.
 *
 * <p>The output path carries no scheme, so it resolves against the cluster's
 * default file system (presumably HDFS — confirm against the deployment config).
 */
public class Kafka2Hdfs {

    /**
     * Builds the Kafka source and the row-format file sink, wires them together
     * and submits the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        // Kafka connection settings. Brokers are separated by commas; each entry is host:port.
        String bootstrapServers = "kafka1:9092,kafka2:9092,kafka3:9092";
        String groupId = "test_group";
        String offset = "latest";
        String topic = "test-topic";

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("group.id", groupId);
        properties.setProperty("auto.offset.reset", offset);

        // Create the Flink execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setMaxParallelism(256);
        // Keep externalized checkpoints when the job is cancelled.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointInterval(10 * 60 * 1000); // checkpoint every 10 minutes
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Create the Kafka source.
        FlinkKafkaConsumer011<String> kafkaSource = new FlinkKafkaConsumer011<>(
                topic,
                new SimpleStringSchema(),
                properties);

        // Create the HDFS sink.
        String filePath = "/user/bdms";

        // Rolling policy 1: explicit size/time based rolling rules.
        DefaultRollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy
                .builder()
                .withMaxPartSize(256L * 1024 * 1024)    // max size of a part file; default is 128 MB, set to 256 MB here
                .withRolloverInterval(Long.MAX_VALUE)   // max age of a part file; default is 60s, effectively unlimited here
                .withInactivityInterval(60 * 1000)      // roll to a new file after 60s without writes
                .build();

        // Rolling policy 2 (alternative, not wired into the sink below): roll on every checkpoint.
        // Pass this to withRollingPolicy(...) instead of rollingPolicy to use it.
        OnCheckpointRollingPolicy<String, String> rollingPolicy2 = OnCheckpointRollingPolicy.build();

        StreamingFileSink<String> hdfsSink = StreamingFileSink
                // Row format: each record is encoded as one line of text.
                .forRowFormat(new Path(filePath), new SimpleStringEncoder<String>("UTF-8"))
                // Alternative: .forBulkFormat(new Path(filePath), ...) to write Parquet or similar bulk formats.
                // Bucketing: the default pattern is "yyyy-MM-dd--HH"; bucket by day here.
                .withBucketAssigner(new DateTimeBucketAssigner<String>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai")))
                // Alternative: .withBucketAssigner(new BasePathBucketAssigner<>()) writes every file to the base path.
                .withRollingPolicy(rollingPolicy)
                .withBucketCheckInterval(1000L) // bucket check interval, set to 1s
                .build();

        // Wire source to sink.
        DataStreamSource<String> sourceStream = env.addSource(kafkaSource);
        sourceStream.addSink(hdfsSink);

        // Submit the job.
        env.execute("Kafka2Hdfs");
    }
}
```
作者:wangsong
文档反馈
以上内容对您是否有帮助?