在SQL 任务中使用kerberos 认证 的Kafka
更新时间: 2022-10-13 11:47:45
阅读 780
在SQL 任务中使用kerberos 认证 的Kafka
本文基于flink 1.12 引擎测试
由于有数开发平台集成了kerberos 做集群的安全认证,在任务提交运行过程中,都需要去认证hdfs和yarn. 平台已在flink conf 配置中添加作业运行过程中需要的认证配置,可以在任意任务的flink 运行页面, Job Manager -> Configuration 参数中查看。相关参数请查阅社区参数配置https://nightlies.apache.org/flink/flink-docs-release-1.12/deployment/config.html#auth-with-external-systems 由于实时计算产品目前不支持多用户的方式进行集群认证,因此kafka 的认证用户 需要和平台配置的认证用户保持一致。目前平台使用sloth 用户登录yarn 和hdfs ,如果需要在平台使用kerberos 认证的kafka ,则需要kafka 管理人员通过ACLs 对作业涉及到的topic、消费者组为sloth用户进行相关授权配置 |
ACLs授权 相关操作,请参阅kafka 官方文档 https://kafka.apache.org/documentation/#security_authz
授权示例
查看授权
./kafka-acls.sh --authorizer-properties zookeeper.connect=bigdata1:2182/kafka -list
生产者授权
注意 用户必须指定为sloth
./kafka-acls.sh --authorizer-properties zookeeper.connect=bigdata1:2182/kafka -add --allow-principal User:sloth --producer --topic test_demo63
消费者组授权
注意 用户必须指定为sloth
/kafka-acls.sh --authorizer-properties zookeeper.connect=bigdata1:2182/kafka-add --allow-principal User:sloth --consumer --group=* --topic test_demo63
作业配置
flink sql kafka DDL 作业配置示例如下
CREATE TABLE sink_table (
order_id int,
order_date VARCHAR,
customer_name VARCHAR,
price decimal(10,3),
product_id int,
order_status boolean
) WITH (
'connector' = 'kafka',
'topic' = 'test_demo63',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'broker1:9092',
'format' = 'json',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'GSSAPI',
'properties.sasl.kerberos.service.name'='kafka'
);
相较于不带kerberos 认证的kafka ,需要增加如下3个参数,无论kafka 作为source 或者sink。 作为sink 时需要注意的是,指定的消费者组必须为sloth 用户做授权配置
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'GSSAPI',
'properties.sasl.kerberos.service.name'='kafka'
同时需要为作业添加一个高级参数配置,以将平台配置的kerberos 认证同时也作为kafka 的认证 参数如下:
security.kerberos.login.contexts KafkaClient
Sloth 版本为384 时,在任务页面右侧的高级参数处添加即可
Sloth 版本为大于3901 时,需要先将任务提交发布,然后在作业配置的,Flink 高级配置处添加
作业完整sql 示例
create table datagen_source(
order_id int,
order_date varchar,
customer_name varchar,
price decimal(10,3),
product_id int,
order_status boolean
)with(
'connector' = 'datagen',
'rows-per-second' = '1'
);
CREATE TABLE sink_table (
order_id int,
order_date VARCHAR,
customer_name VARCHAR,
price decimal(10,3),
product_id int,
order_status boolean
) WITH (
'connector' = 'kafka',
'topic' = 'test_demo63',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'broker1:9092',
'format' = 'json',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'GSSAPI',
'properties.sasl.kerberos.service.name'='kafka'
);
insert into sink_table select * from datagen_source;
文档反馈
以上内容对您是否有帮助?