DEMO - Connecting to a Hadoop Cluster from Python
Connecting to a Hadoop Cluster from Python [Kerberos]
Applicable module
Client
Description
Python scripts for operating on HDFS, Hive, and Impala
System environment preparation
Installing OS dependency packages
# Install the Kerberos client
yum install -y krb5-libs krb5-workstation
# Install the system packages required by the Python modules
yum install libffi-devel python-devel openssl-devel cyrus-sasl cyrus-sasl-devel cyrus-sasl-lib gcc-c++ -y
Configuring system environment variables
# Configure environment variables (replace xxx with the actual JDK installation directory on the node)
echo "export JAVA_HOME=xxx" >> ~/.bash_profile
source ~/.bash_profile
Installing a Python virtual environment
To avoid polluting the host's Python environment, it is recommended to install an isolated Python environment via Miniconda:
wget https://mirrors.tuna.tsinghua.edu.cn/anaconda/miniconda/Miniconda3-latest-Linux-x86_64.sh
chmod +x ./Miniconda3-latest-Linux-x86_64.sh
# During installation, follow the prompts and choose a suitable installation directory
./Miniconda3-latest-Linux-x86_64.sh
# Check the conda version
conda --version
# Check the base environment's Python version
python -V
Installing Python dependency packages
Preparing a Python 3 environment
# Create a Python 3 environment with conda
conda create -n python3 python=3.12
# Activate the python3 virtual environment
conda activate python3
# Install the third-party Python modules used to connect to the cluster
## Kerberos modules
export CFLAGS="-std=c99" # set this environment variable if the gssapi module fails to build (see FAQ)
pip3 install gssapi==1.8.3
pip3 install kerberos==1.3.0 krbcontext==0.10
## Hive/Impala modules
pip3 install impyla==0.19.0 thrift==0.16.0 thrift-sasl==0.4.3 six==1.16.0 bitarray==2.9.2
## HDFS module
pip3 install hdfs==2.7.3 requests-kerberos==0.15.0
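After installation, a quick import check can confirm that the modules were built correctly. A minimal sketch, to be run with the python3 environment activated:
# sanity check: all cluster-connection modules import cleanly
import gssapi
import kerberos
import krbcontext
import impala.dbapi
import hdfs
print("all modules imported successfully")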
Preparing a Python 2 environment
# Create a Python 2 environment with conda
# List the Python versions currently available
conda search python
conda create -n python2 python=2.7.18
# Activate the python2 virtual environment
conda activate python2
# Install the third-party Python modules used to connect to the cluster
## Kerberos modules
pip install gssapi==1.6.2
pip install kerberos==1.3.0 krbcontext==0.10
## Hive/Impala modules
pip install impyla==0.19.0 thrift==0.16.0 thrift-sasl==0.4.3 six==1.16.0 bitarray==2.9.2
## HDFS module
pip install hdfs==2.7.0 requests-kerberos==0.12.0
Python sample code
Hive sample code
# -*- coding: utf-8 -*-
from krbcontext import krbcontext
from impala.dbapi import connect


def conn_hive_with_kerberos(host, port, kerberos_service_name, principal, keytab):
    """
    Use krbcontext to manage the Kerberos ticket lifecycle: the ticket is obtained
    when entering the with block and destroyed automatically on exit.
    """
    with krbcontext(using_keytab=True, principal=principal, keytab_file=keytab):
        # Connect to HiveServer2
        conn = connect(
            host=host,
            port=port,
            auth_mechanism='GSSAPI',
            kerberos_service_name=kerberos_service_name,
            use_ssl=False,  # set to True if SSL is enabled
        )
        # Create a cursor
        cursor = conn.cursor()
        # Run queries
        ## Specify the YARN queue
        cursor.execute('set mapreduce.job.queuename=root.dsc_support')
        cursor.execute('show databases')
        # Fetch the results
        results = cursor.fetchall()
        for row in results:
            print(row)
        # Close the connection
        cursor.close()
        conn.close()


if __name__ == '__main__':
    # Configuration parameters
    # Replace with the host running HiveServer2; check it in EasyOps under base components -- Hive -- default_hive -- Hiveserver
    host = 'dsc-demo08.jdlt.163.org'
    # HiveServer2 port (9999 by default on this platform)
    port = 9999
    # kerberos_service_name defaults to 'impala'; set it to 'hive' when connecting to Hive
    kerberos_service_name = 'hive'
    # Absolute path of the keytab file used for authentication
    keytab = '/home/xxxx/xxxx.keytab'
    # Principal of the keytab; check it in the platform's User Center or via `klist -kt <keytab file>`
    principal = 'bdms_xxxx/dev@BDMS_DEMO.COM'
    # Connect to Hive and run the query
    conn_hive_with_kerberos(host, port, kerberos_service_name, principal, keytab)
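impyla follows the Python DB-API, so query results can also be loaded into a pandas DataFrame via impala.util.as_pandas. A minimal sketch, assuming pandas is installed in the same environment and cursor is the open cursor from the sample above:
# optional: load the result set into a pandas DataFrame (requires `pip install pandas`)
from impala.util import as_pandas
cursor.execute('show databases')
df = as_pandas(cursor)  # consumes the remaining rows into a DataFrame
print(df.head())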
Impala sample code
The impyla module only connects to a single impalad node; for production use, you need to fail over across multiple impalad nodes yourself in Python to ensure availability (see the sketch after the sample code below).
# -*- coding: utf-8 -*-
from krbcontext import krbcontext
from impala.dbapi import connect


def conn_impala_with_kerberos(host, port, kerberos_service_name, principal, keytab):
    """
    Use krbcontext to manage the Kerberos ticket lifecycle: the ticket is obtained
    when entering the with block and destroyed automatically on exit.
    """
    with krbcontext(using_keytab=True, principal=principal, keytab_file=keytab):
        # Connect to Impala
        conn = connect(
            host=host,
            port=port,
            auth_mechanism='GSSAPI',
            kerberos_service_name=kerberos_service_name,
            use_ssl=False  # set to True if SSL is enabled
        )
        # Create a cursor
        cursor = conn.cursor()
        # Run a query
        cursor.execute('show databases')
        # Fetch the results
        results = cursor.fetchall()
        for row in results:
            print(row)
        # Close the connection
        cursor.close()
        conn.close()


if __name__ == '__main__':
    # Configuration parameters
    # Replace with a host running an impalad service; check it in EasyOps under base components -- impala -- default_impala -- impalad
    host = 'dsc-demo02.jdlt.163.org'
    # Impala's default port is 21050
    port = 21050
    # kerberos_service_name defaults to 'impala', which is correct when connecting to Impala
    kerberos_service_name = 'impala'
    # Absolute path of the keytab file used for authentication
    keytab = '/home/xxxx/xxxx.keytab'
    # Principal of the keytab; check it in the platform's User Center or via `klist -kt <keytab file>`
    principal = 'bdms_xxxx/dev@BDMS_DEMO.COM'
    # Connect to Impala and run the query
    conn_impala_with_kerberos(host, port, kerberos_service_name, principal, keytab)
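As noted above, impyla connects to a single impalad only; a simple way to improve availability is to try several impalad nodes in turn. A minimal sketch (the second host name is a placeholder; call it inside a krbcontext block as in the sample above):
# client-side failover across several impalad nodes
from impala.dbapi import connect

def connect_any_impalad(hosts, port=21050, kerberos_service_name='impala'):
    last_error = None
    for host in hosts:
        try:
            # return the first connection that succeeds
            return connect(host=host, port=port,
                           auth_mechanism='GSSAPI',
                           kerberos_service_name=kerberos_service_name)
        except Exception as e:  # node down, connection refused, etc.
            last_error = e
    raise RuntimeError('no impalad node reachable: %s' % last_error)

# usage: conn = connect_any_impalad(['dsc-demo02.jdlt.163.org', 'dsc-demo03.jdlt.163.org'])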
HDFS sample code
# -*- coding: utf-8 -*-
from krbcontext import krbcontext
from hdfs.ext.kerberos import KerberosClient


def conn_hdfs_with_kerberos(host, port, principal, keytab, local_path, hdfs_path):
    """
    Use krbcontext to obtain a Kerberos ticket and connect to HDFS over WebHDFS.
    """
    with krbcontext(using_keytab=True, principal=principal, keytab_file=keytab):
        # Build the WebHDFS URL and create a Kerberos-authenticated client
        hdfs_url = 'http://' + host + ':' + port
        client = KerberosClient(hdfs_url)
        files = client.list(hdfs_path)
        print(files)


if __name__ == '__main__':
    # Configuration parameters
    # Replace with the host running the NameNode; check it in EasyOps under base components -- hdfs -- default_hdfs -- namenode
    host = 'dsc-demo07.jdlt.163.org'
    # Default WebHDFS port
    port = '50070'
    # Absolute path of the keytab file used for authentication
    keytab = '/home/xxxx/xxxx.keytab'
    # Principal of the keytab; check it in the platform's User Center or via `klist -kt <keytab file>`
    principal = 'bdms_xxxx/dev@BDMS_DEMO.COM'
    # Local path
    local_path = '/home/xxxxx'
    # HDFS path
    hdfs_path = '/user/dsc_support'
    # Connect to HDFS and perform operations
    conn_hdfs_with_kerberos(host, port, principal, keytab, local_path, hdfs_path)
    # More APIs: https://hdfscli.readthedocs.io/en/latest/api.html#api-reference
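Besides listing directories, the same client supports uploads, downloads, and reads. A minimal sketch, to be run inside the krbcontext block above (the file name demo.txt is a placeholder):
# upload a local path to HDFS, download it back, and read a file's content
client.upload(hdfs_path, local_path, overwrite=True)               # local -> HDFS
client.download(hdfs_path, local_path + '_copy', overwrite=True)   # HDFS -> local
with client.read(hdfs_path + '/demo.txt', encoding='utf-8') as reader:
    print(reader.read())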
@Deprecated
The code below was verified before 2023. It has compatibility issues when the Python modules are installed at their latest versions (the default), and it is no longer maintained.
Installing Python modules
pip install --upgrade setuptools
pip install sasl
pip install thrift
pip install thrift-sasl
pip install impyla
pip install krbcontext
## Dependencies for ibis
pip install ibis-framework
pip install future
pip install PyHive
pip install thriftpy
pip install --ignore-installed requests hdfs[kerberos]
Network port authorization
KDC:750 88
Namenode:8020
ResourceManager:8030 8031 8032
Hiveserver2:9999
Metastore:9083
MySQL:3306
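Before troubleshooting authentication, it is worth confirming that these ports are reachable from the client host. A minimal sketch (host names are placeholders; replace them with the actual service hosts):
# check that the required service ports are reachable from the client host
import socket

targets = {
    'KDC':         ('kdc-host.example.com', 88),
    'Namenode':    ('namenode-host.example.com', 8020),
    'HiveServer2': ('hiveserver2-host.example.com', 9999),
    'Metastore':   ('metastore-host.example.com', 9083),
}
for name, (host, port) in targets.items():
    try:
        socket.create_connection((host, port), timeout=3).close()
        print('%-12s %s:%d reachable' % (name, host, port))
    except socket.error as e:
        print('%-12s %s:%d NOT reachable (%s)' % (name, host, port, e))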
Client configuration
Copy the krb5.conf configuration file from a cluster node into the /etc/ directory on the client host. Copy the spark and hdfs directories from a cluster execution node to the client host, and copy the hive-site.xml configuration file into the spark/conf directory. Confirm that the HDFS configuration files are located under $HADOOP_HOME/etc/hadoop; otherwise copy them there manually (only hdfs-site.xml, core-site.xml, yarn-site.xml, and hadoop-env.sh need to be kept). Remove all configuration parameters that reference cluster-local paths from the configuration files, to avoid confusing exception messages in the logs.
To use a custom krb5.conf and credential cache location, add the following environment variables to hadoop-env.sh:
export KRB5_CONFIG="$HADOOP_CONF_DIR"/krb5.conf
export KRB5CCNAME="$HADOOP_CONF_DIR"/krb5cc_$UID
export HADOOP_OPTS="-Djava.security.krb5.conf=$KRB5_CONFIG"
Python script for connecting to Hive
Option 1: submit to the default queue
#!/usr/bin/python
from impala.dbapi import connect
from krbcontext import krbcontext

config = {
    "kerberos_principal": "hive/bigdata-demo1.jdlt.163.org@BDMS.163.COM",
    "keytab_file": '/home/wangsong03/hive.service.keytab',
    "kerberos_ccache_file": '/home/wangsong03/hive_ccache_uid',
    "AUTH_MECHANISM": "GSSAPI"
}

with krbcontext(using_keytab=True,
                principal=config['kerberos_principal'],
                keytab_file=config['keytab_file'],
                ccache_file=config['kerberos_ccache_file']):
    conn = connect(host='bigdata-demo1.jdlt.163.org', port=9999,
                   auth_mechanism='GSSAPI', kerberos_service_name='hive')
    cur = conn.cursor()
    cur.execute('SHOW databases')
    print(cur.fetchall())
    cur.close()
    conn.close()
Option 2: submit to a specified queue
#!/usr/bin/python
from pyhive import hive
from krbcontext import krbcontext

config = {
    "kerberos_principal": "jzt_dmp/dev@BDMS.163.COM",
    "keytab_file": '/root/jzt_dmp.keytab',
    "kerberos_ccache_file": './hive_ccache_uid',
    "AUTH_MECHANISM": "GSSAPI"
}

with krbcontext(using_keytab=True,
                principal=config['kerberos_principal'],
                keytab_file=config['keytab_file'],
                ccache_file=config['kerberos_ccache_file']):
    # PyHive uses auth='KERBEROS' (rather than impyla's auth_mechanism='GSSAPI')
    conn = hive.Connection(host='bigdata004.dmp.jztweb.com', port=9999,
                           auth='KERBEROS', kerberos_service_name='hive',
                           configuration={"mapreduce.job.queuename": "root.schedule_queue"})
    cur = conn.cursor()
    cur.execute('select count(1) from b2b_ods.dim_plat')
    print(cur.fetchone())
    cur.close()
    conn.close()
Python script for connecting to Impala
#!/usr/bin/python
from impala.dbapi import connect
from krbcontext import krbcontext

config = {
    "kerberos_principal": "bdms_wangsong03/dev@BDMS.163.COM",
    "keytab_file": '/home/wangsong03/bdms_wangsong03.keytab',
    "kerberos_ccache_file": '/home/wangsong03/wangsong03_ccache_uid',
    "AUTH_MECHANISM": "GSSAPI"
}

with krbcontext(using_keytab=True,
                principal=config['kerberos_principal'],
                keytab_file=config['keytab_file'],
                ccache_file=config['kerberos_ccache_file']):
    conn = connect(host='bigdata-demo5.jdlt.163.org', port=21050,
                   auth_mechanism='GSSAPI', kerberos_service_name='impala')
    cur = conn.cursor()
    cur.execute('SHOW databases')
    print(cur.fetchall())
    cur.close()
    conn.close()
Python script for connecting to HDFS
#!/usr/bin/python
import ibis
from krbcontext import krbcontext

conf = {
    "impala_host": "bigdata-demo5.jdlt.163.org",
    "impala_port": 21050,
    "kerberos_service_name": "impala",
    "auth_mechanism": "GSSAPI",
    "webhdfs_host1": "bigdata-demo1.jdlt.163.org",
    "webhdfs_host2": "bigdata-demo2.jdlt.163.org",
    "webhdfs_port": 50070
}
config = {
    "kerberos_principal": "bdms_wangsong03/dev@BDMS.163.COM",
    "keytab_file": '/home/wangsong03/bdms_wangsong03.keytab',
    "kerberos_ccache_file": '/home/wangsong03/wangsong03_ccache_uid',
    "AUTH_MECHANISM": "GSSAPI"
}

with krbcontext(using_keytab=True,
                principal=config['kerberos_principal'],
                keytab_file=config['keytab_file'],
                ccache_file=config['kerberos_ccache_file']):
    # Get an HDFS connection, falling back to the second WebHDFS host on failure
    try:
        hdfs_client = ibis.hdfs_connect(host=conf["webhdfs_host1"], port=conf["webhdfs_port"],
                                        auth_mechanism=conf["auth_mechanism"], use_https=False, verify=True)
        hdfs_client.ls("/")
    except Exception:
        hdfs_client = ibis.hdfs_connect(host=conf["webhdfs_host2"], port=conf["webhdfs_port"],
                                        auth_mechanism=conf["auth_mechanism"], use_https=False, verify=False)
        hdfs_client.ls("/")
    print(hdfs_client.ls('/user'))
    # Connect to Impala, passing the HDFS client
    impala_client = ibis.impala.connect(host=conf["impala_host"], port=conf["impala_port"],
                                        hdfs_client=hdfs_client, auth_mechanism=conf["auth_mechanism"], timeout=300)
    res = impala_client.sql("""select * from poc.demo limit 10""")
    print(res.execute())
Submitting PySpark jobs to a Kerberos-enabled cluster
# _*_ coding: utf-8 _*_
import findspark
findspark.init()
import os
os.environ['JAVA_HOME'] = '/usr/lib64/jdk8'
os.environ['SPARK_HOME'] = '~/spark2'
os.environ['HADOOP_HOME'] = '~/hadoop'
os.environ['HADOOP_CONF_DIR'] = '~/hadoop/etc/hadoop'
# Increase the driver memory in client mode
memory = '10g'
pyspark_submit_args = ' --driver-memory ' + memory + ' pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

from krbcontext import krbcontext
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession


class CreateSparksession():
    def createSpark(self):
        conf = {"appname": "demo", "driver_memory": "4g", "executor_memory": "4g",
                "executor_cores": 2, "executor_num": 2, "master": "yarn", "deploy_mode": "client"}
        sc = SparkConf()
        sc.setMaster(conf['master']) \
            .setAppName(conf['appname']) \
            .set('spark.driver.memory', conf['driver_memory']) \
            .set('spark.executor.memory', conf['executor_memory']) \
            .set('spark.executor.cores', conf['executor_cores']) \
            .set('spark.deploy_mode', conf["deploy_mode"]) \
            .set('spark.yarn.queue', 'root.poc') \
            .set('spark.executor.memoryOverhead', '2g') \
            .set('spark.driver.memoryOverhead', '2g')
        spark = SparkSession.builder.config(conf=sc).enableHiveSupport().getOrCreate()
        sctx = spark.sparkContext
        return spark, sctx


config = {
    "kerberos_principal": "bdms_wangsong03/dev@BDMS.163.COM",
    "keytab_file": '/home/wangsong03/bdms_wangsong03.keytab',
    "kerberos_ccache_file": '/home/wangsong03/wangsong03_ccache_uid',
    "AUTH_MECHANISM": "GSSAPI"
}

with krbcontext(using_keytab=True,
                principal=config['kerberos_principal'],
                keytab_file=config['keytab_file'],
                ccache_file=config['kerberos_ccache_file']):
    spark, sctx = CreateSparksession().createSpark()
    rdd = sctx.textFile("hdfs://easyops-cluster/user/poc/pysparkdemo/demo.txt")
    print(rdd.collect())
    spark.sql("select count(*) from poc.demo").show()
    sctx.stop()
    spark.stop()
Implementing Kerberos authentication with basic Python modules
Creating a context manager
import os, sys
import subprocess
from contextlib import contextmanager


class KRB5KinitError(Exception):
    pass


def kinit_with_keytab(keytab_file, principal, ccache_file):
    '''
    Initialize Kerberos credentials using a keytab file.
    Returns the credential cache file name.
    '''
    cmd = 'kinit -kt %(keytab_file)s -c %(ccache_file)s %(principal)s'
    args = {}
    args['keytab_file'] = keytab_file
    args['principal'] = principal
    args['ccache_file'] = ccache_file
    kinit_proc = subprocess.Popen(
        (cmd % args).split(),
        stderr=subprocess.PIPE)
    stdout_data, stderr_data = kinit_proc.communicate()
    if kinit_proc.returncode > 0:
        raise KRB5KinitError(stderr_data)
    return ccache_file


@contextmanager
def krbcontext(using_keytab=False, **kwargs):
    '''
    A context manager for Kerberos-related actions.
    using_keytab: if True, authenticate with a keytab file inside the context; otherwise act as a regular user.
    kwargs: arguments used in the Kerberos context; may contain principal, keytab_file, ccache_file.
    '''
    env_name = 'KRB5CCNAME'
    old_ccache = os.getenv(env_name)
    ccache_file = kinit_with_keytab(**kwargs)
    os.environ[env_name] = ccache_file
    try:
        yield
    finally:
        # restore the previous credential cache setting on exit
        if old_ccache is not None:
            os.environ[env_name] = old_ccache
        else:
            os.environ.pop(env_name, None)
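A minimal usage sketch of the hand-rolled context manager above (the keytab path and principal are placeholders):
# usage of the krbcontext defined above
with krbcontext(using_keytab=True,
                principal='bdms_xxxx/dev@BDMS_DEMO.COM',
                keytab_file='/home/xxxx/xxxx.keytab',
                ccache_file='/tmp/krb5cc_demo'):
    # code in this block runs with KRB5CCNAME pointing at the freshly initialized ticket cache
    print(os.environ['KRB5CCNAME'])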
When running jobs with a non-default Python interpreter, set the PYSPARK_PYTHON environment variable in your code to point at the target Python (see the sketch below).
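A minimal sketch of setting it before the SparkSession is created (the interpreter path is a placeholder for the actual conda environment's python):
# point PySpark at the target interpreter before creating the SparkSession
import os
os.environ['PYSPARK_PYTHON'] = '/opt/miniconda3/envs/python3/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/miniconda3/envs/python3/bin/python'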
FAQ
Q: pip install of the gssapi module fails with: pycore_frame.h:134:5: error: 'for' loop initial declarations are only allowed in C99 mode
A: Set the environment variable export CFLAGS="-std=c99" before installing.
Q: pip fails under Python 2 with a SyntaxError at sys.stderr.write(f"ERROR: {exc}")
A: Python 2 is no longer supported by recent pip versions, which breaks pip installs. Download the Python 2.7 version of get-pip.py from the official site and install it:
wget https://bootstrap.pypa.io/pip/2.7/get-pip.py
python get-pip.py
Q:ImportError: cannot import name TFrozenDict
A: Install pyhive with the [hive] extra (e.g. pip install "pyhive[hive]"); otherwise some dependent packages are not installed, which causes this error.
Q: gcc: error trying to exec 'cc1plus' : execvp: No such file or directory
A: The operating system is missing the base gcc toolchain; install it via yum:
yum install gcc-c++
Q:AttributeError: 'SparkConf' object has no attribute '_get_object_id'
A: SparkSession.builder.config(conf=sc) must be called with the keyword argument conf=sc.
Q:Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: bigdata11/10.4.9.68:39005
A: The symptom is that the Spark AM container fails to connect back to the client. If the network port between the nodes is indeed unreachable, apply for the required network access; if the port tests fine, the cause is usually multiple network interfaces on the client host.
Option 1: make the IP-to-hostname mappings in /etc/hosts consistent between the client and the cluster nodes, and make sure the mapped IP can communicate with the cluster.
Option 2: add the following parameter to hdfs-site.xml on both the cluster and the client:
<property>
<name>dfs.client.use.datanode.hostname</name>
<value>true</value>
</property>