Connecting Python to a Hadoop Cluster [Kerberos]

Applicable module

Client

Description

Operating HDFS, Hive, and Impala from Python scripts

System environment preparation

Installing OS dependency packages
# Install the Kerberos client
yum install -y krb5-libs krb5-workstation

# Install the system packages required by the Python modules
yum install -y libffi-devel python-devel openssl-devel cyrus-sasl cyrus-sasl-devel cyrus-sasl-lib gcc-c++
Configuring system environment variables
# Configure environment variables (replace xxx with the actual JDK installation directory on the node)
echo "export JAVA_HOME=xxx" >> ~/.bash_profile
source ~/.bash_profile

Installing a Python virtual environment

To avoid polluting the host's Python environment, it is recommended to create a virtual Python environment via Miniconda.

wget https://mirrors.tuna.tsinghua.edu.cn/anaconda/miniconda/Miniconda3-latest-Linux-x86_64.sh

chmod +x  ./Miniconda3-latest-Linux-x86_64.sh

# During installation, follow the prompts and choose a suitable installation directory
./Miniconda3-latest-Linux-x86_64.sh


# Check the conda version
conda --version

# Check the base Python version
python -V
Installing Python dependency packages
Preparing the Python 3 environment
# Install Python 3 via conda
conda create -n python3 python=3.12

# Activate the installed Python 3 virtual environment
conda activate python3

# Install the third-party Python modules for connecting to the cluster

## Kerberos modules
export CFLAGS="-std=c99" # set this environment variable if the gssapi module fails to build
pip3 install gssapi==1.8.3
pip3 install kerberos==1.3.0 krbcontext==0.10

## Hive/Impala modules
pip3 install impyla==0.19.0 thrift==0.16.0 thrift-sasl==0.4.3 six==1.16.0 bitarray==2.9.2

## HDFS modules
pip3 install hdfs==2.7.3 requests-kerberos==0.15.0
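
A quick way to confirm that the modules were installed correctly (illustrative check; all imports should succeed inside the activated environment):
python3 -c "import gssapi, krbcontext, impala.dbapi, thrift_sasl, hdfs, requests_kerberos; print('modules OK')"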
Preparing the Python 2 environment
# Install Python 2 via conda

# List the currently available Python versions
conda search python
conda create -n python2 python=2.7.18

# Activate the installed Python 2 virtual environment
conda activate python2

# Install the third-party Python modules for connecting to the cluster

## Kerberos modules
pip install gssapi==1.6.2
pip install kerberos==1.3.0 krbcontext==0.10

## Hive/Impala modules
pip install impyla==0.19.0 thrift==0.16.0 thrift-sasl==0.4.3 six==1.16.0 bitarray==2.9.2

## HDFS modules
pip install hdfs==2.7.0 requests-kerberos==0.12.0

Python sample code

Hive sample code
# -*- coding: utf-8 -*-

from krbcontext import krbcontext
from impala.dbapi import connect

def conn_hive_with_kerberos(host, port, kerberos_service_name, principal, keytab):
    """
    使用 krbcontext 进行上下文管理票据的生成与销毁,进入with时生成票据,退出时自动销毁票据。
    """
    with krbcontext(using_keytab=True, principal=principal, keytab_file=keytab):
        # 连接到 Impala
        conn = connect(
            host=host,
            port=port,
            auth_mechanism='GSSAPI',
            kerberos_service_name=kerberos_service_name,
            use_ssl=False,  # set to True if SSL is enabled
        )

        # Create a cursor
        cursor = conn.cursor()

        # Execute the queries
        ## Specify the queue
        cursor.execute('set mapreduce.job.queuename=root.dsc_support')
        cursor.execute('show databases')

        # Fetch the results
        results = cursor.fetchall()
        for row in results:
            print(row)

        # Close the connection
        cursor.close()
        conn.close()

if __name__ == '__main__':
    # Configuration parameters
    # Replace with the hostname of the HiveServer2 service; it can be found in easyops under
    # Basic Components--Hive--default_hive--Hiveserver
    host = 'dsc-demo08.jdlt.163.org'
    # The default HiveServer2 port is 9999
    port = 9999
    # kerberos_service_name defaults to 'impala'; set it to 'hive' when connecting to Hive
    kerberos_service_name = 'hive'
    # Absolute path of the keytab file used for authentication
    keytab = '/home/xxxx/xxxx.keytab'
    # Principal of the keytab; it can be found in the platform's User Center or via `klist -kt <keytab file>`
    principal = 'bdms_xxxx/dev@BDMS_DEMO.COM'

    # Connect to Hive and run the queries
    conn_hive_with_kerberos(host, port, kerberos_service_name, principal, keytab)
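
To run the example, save it as a script and execute it inside the Python 3 environment created above (the file name is illustrative):
conda activate python3
python hive_kerberos_demo.py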
Impala sample code

The impyla module only supports connecting to a single Impala daemon; for production use you need to poll multiple impalad nodes yourself in Python to ensure availability (see the failover sketch after the example below).

# -*- coding: utf-8 -*-

from krbcontext import krbcontext
from impala.dbapi import connect

def conn_impala_with_kerberos(host, port, kerberos_service_name, principal, keytab):
    """
    使用 krbcontext 进行上下文管理票据的生成与销毁,进入with时生成票据,退出时自动销毁票据。
    """
    with krbcontext(using_keytab=True, principal=principal, keytab_file=keytab):
        # 连接到 Impala
        conn = connect(
            host=host,
            port=port,
            auth_mechanism='GSSAPI',
            kerberos_service_name=kerberos_service_name,
            use_ssl=False  # set to True if SSL is enabled
        )

        # Create a cursor
        cursor = conn.cursor()

        # Execute the query
        cursor.execute('show databases')

        # Fetch the results
        results = cursor.fetchall()
        for row in results:
            print(row)

        # Close the connection
        cursor.close()
        conn.close()

if __name__ == '__main__':
    # Configuration parameters
    # Replace with the hostname of an impalad node; it can be found in easyops under
    # Basic Components--impala--default_impala--impalad
    host = 'dsc-demo02.jdlt.163.org'
    # The default Impala port is 21050
    port = 21050
    # kerberos_service_name is 'impala' by default when connecting to Impala
    kerberos_service_name = 'impala'
    # Absolute path of the keytab file used for authentication
    keytab = '/home/xxxx/xxxx.keytab'
    # Principal of the keytab; it can be found in the platform's User Center or via `klist -kt <keytab file>`
    principal = 'bdms_xxxx/dev@BDMS_DEMO.COM'

    # Connect to Impala and run the query
    conn_impala_with_kerberos(host, port, kerberos_service_name, principal, keytab)
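
For production use, a minimal failover sketch along the lines of the note above could look like the following; the host list is a placeholder for the actual impalad nodes, and the function should be called inside the same krbcontext block as in the example:

# -*- coding: utf-8 -*-

from impala.dbapi import connect

# Placeholder impalad hosts; replace with the actual nodes of the cluster
IMPALAD_HOSTS = ['impalad-1.example.org', 'impalad-2.example.org', 'impalad-3.example.org']

def connect_any_impalad(hosts, port=21050, kerberos_service_name='impala'):
    """Try each impalad in turn and return the first connection that succeeds."""
    last_error = None
    for host in hosts:
        try:
            return connect(
                host=host,
                port=port,
                auth_mechanism='GSSAPI',
                kerberos_service_name=kerberos_service_name,
            )
        except Exception as exc:  # node down, connection refused, etc.
            last_error = exc
    raise RuntimeError('no impalad node reachable; last error: %s' % last_error)

The returned connection object is then used exactly like conn in the example above.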
HDFS sample code
# -*- coding: utf-8 -*-

from krbcontext import krbcontext
from hdfs.ext.kerberos import KerberosClient


def conn_hdfs_with_kerberos(host, port, principal, keytab, local_path, hdfs_path):
    """
    Use krbcontext to obtain a Kerberos ticket and connect to HDFS over WebHDFS.
    """
    with krbcontext(using_keytab=True, principal=principal, keytab_file=keytab):

        # Build the WebHDFS URL and create a Kerberos-authenticated client
        hdfs_url = 'http://' + host + ':' + port
        client = KerberosClient(hdfs_url)
        files = client.list(hdfs_path)
        print(files)


if __name__ == '__main__':
    # Configuration parameters
    # Replace with the hostname of the NameNode; it can be found in easyops under
    # Basic Components--hdfs--default_hdfs--namenode
    host = 'dsc-demo07.jdlt.163.org'
    # Default WebHDFS port
    port = '50070'
    # Absolute path of the keytab file used for authentication
    keytab = '/home/xxxx/xxxx.keytab'
    # Principal of the keytab; it can be found in the platform's User Center or via `klist -kt <keytab file>`
    principal = 'bdms_xxxx/dev@BDMS_DEMO.COM'
    # Local path (used for upload/download operations)
    local_path = '/home/xxxxx'
    # HDFS path
    hdfs_path = '/user/dsc_support'

    # Connect to HDFS and perform the operations
    conn_hdfs_with_kerberos(host, port, principal, keytab, local_path, hdfs_path)

# For more APIs, see https://hdfscli.readthedocs.io/en/latest/api.html#api-reference
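
Beyond listing a directory, the same KerberosClient can also upload and download files; as a sketch, the following lines could be added inside conn_hdfs_with_kerberos above (the file names are illustrative):

        # Upload a local file to HDFS, then download it back (file names are illustrative)
        client.upload(hdfs_path + '/demo.txt', local_path + '/demo.txt')
        client.download(hdfs_path + '/demo.txt', local_path + '/demo_copy.txt')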

@Deprecated

The code below was last verified before 2023. Installing the Python modules at their latest default versions causes compatibility issues with this code, and it is no longer maintained.

Installing the Python modules
pip install --upgrade setuptools
pip install sasl
pip install thrift
pip install thrift-sasl
pip install impyla
pip install krbcontext

## Dependencies for ibis
pip install ibis-framework
pip install future
pip install PyHive
pip install thriftpy
pip install --ignore-installed requests hdfs[kerberos]
Network port authorization

KDC: 88, 750

Namenode: 8020

ResourceManager: 8030, 8031, 8032

Hiveserver2: 9999

Metastore: 9083

MySQL: 3306
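
Before running the scripts, reachability of these ports can be verified from the client host, for example with netcat; the host names below are placeholders:
# Check port reachability from the client host (host names are placeholders)
nc -zv kdc.example.org 88
nc -zv namenode.example.org 8020
nc -zv hiveserver2.example.org 9999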

Client configuration
  1. Copy the krb5.conf configuration file from a cluster node to the /etc/ directory on the client host;

  2. Copy the spark and hdfs directories from a cluster execution node to the client host (see the command sketch after this list);

  3. Copy the hive-site.xml configuration file into the spark/conf directory;

  4. Confirm that the HDFS configuration files are located under $HADOOP_HOME/etc/hadoop; otherwise copy them there manually (only hdfs-site.xml, core-site.xml, yarn-site.xml and hadoop-env.sh need to be kept);

  5. Remove all configuration parameters that reference cluster-internal paths from the configuration files, to avoid misleading messages in the logs;

  6. To customize the paths of krb5.conf and the credential cache file, add the following environment variables to hadoop-env.sh:

    export KRB5_CONFIG="$HADOOP_CONF_DIR"/krb5.conf
    export KRB5CCNAME="$HADOOP_CONF_DIR"/krb5cc_$UID
    export HADOOP_OPTS="-Djava.security.krb5.conf=$KRB5_CONFIG"
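
A sketch of steps 1 through 3 as shell commands; "cluster-node" and the source paths below are placeholders and must be adapted to the actual deployment:
# Placeholders: replace "cluster-node" and the source paths with the real values
scp cluster-node:/etc/krb5.conf /etc/
scp -r cluster-node:/path/to/spark ~/spark
scp -r cluster-node:/path/to/hadoop ~/hadoop
scp cluster-node:/path/to/hive-site.xml ~/spark/conf/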
Python script connecting to Hive
Option 1: submit to the default queue
#!/usr/bin/python

from impala.dbapi import connect
from krbcontext import krbcontext

config = {
    "kerberos_principal": "hive/bigdata-demo1.jdlt.163.org@BDMS.163.COM",
    "keytab_file": '/home/wangsong03/hive.service.keytab',
    "kerberos_ccache_file": '/home/wangsong03/hive_ccache_uid',
    "AUTH_MECHANISM": "GSSAPI"
}
with krbcontext(using_keytab=True,
                               principal=config['kerberos_principal'],
                               keytab_file=config['keytab_file'],
                               ccache_file=config['kerberos_ccache_file']):
    conn = connect(host='bigdata-demo1.jdlt.163.org', port=9999, auth_mechanism='GSSAPI',kerberos_service_name='hive')
    cur = conn.cursor()
    cur.execute('SHOW databases')
    print(cur.fetchall())
    cur.close()
    conn.close()
Option 2: submit to a specified queue
#!/usr/bin/python
from pyhive import hive
from krbcontext import krbcontext
config = {
    "kerberos_principal": "jzt_dmp/dev@BDMS.163.COM",
    "keytab_file": '/root/jzt_dmp.keytab',
    "kerberos_ccache_file": './hive_ccache_uid',
    "AUTH_MECHANISM": "GSSAPI"
}
with krbcontext(using_keytab=True,
                               principal=config['kerberos_principal'],
                               keytab_file=config['keytab_file'],
                               ccache_file=config['kerberos_ccache_file']):
    conn = hive.Connection(host='bigdata004.dmp.jztweb.com', port=9999, auth='KERBEROS', kerberos_service_name='hive', configuration={"mapreduce.job.queuename": "root.schedule_queue"})
    cur = conn.cursor()
    cur.execute('select count(1) from b2b_ods.dim_plat')

    print(cur.fetchone())
    cur.close()
    conn.close()
Python script connecting to Impala
#!/usr/bin/python

from impala.dbapi import connect
from krbcontext import krbcontext

config = {
    "kerberos_principal": "bdms_wangsong03/dev@BDMS.163.COM",
    "keytab_file": '/home/wangsong03/bdms_wangsong03.keytab',
    "kerberos_ccache_file": '/home/wangsong03/wangsong03_ccache_uid',
    "AUTH_MECHANISM": "GSSAPI"
}
with krbcontext(using_keytab=True,
                               principal=config['kerberos_principal'],
                               keytab_file=config['keytab_file'],
                               ccache_file=config['kerberos_ccache_file']):
    conn = connect(host='bigdata-demo5.jdlt.163.org', port=21050, auth_mechanism='GSSAPI',kerberos_service_name='impala')
    cur = conn.cursor()
    cur.execute('SHOW databases')
    print(cur.fetchall())
    cur.close()
    conn.close()
Python script connecting to HDFS
#!/usr/bin/python

import ibis

from krbcontext import krbcontext



conf = {
    "impala_host": "bigdata-demo5.jdlt.163.org",
    "impala_port": 21050,
    "kerberos_service_name": "impala",
    "auth_mechanism": "GSSAPI",
    "webhdfs_host1": "bigdata-demo1.jdlt.163.org",
    "webhdfs_host2": "bigdata-demo2.jdlt.163.org",
    "webhdfs_port": 50070
}


config = {
    "kerberos_principal": "bdms_wangsong03/dev@BDMS.163.COM",
    "keytab_file": '/home/wangsong03/bdms_wangsong03.keytab',
    "kerberos_ccache_file": '/home/wangsong03/wangsong03_ccache_uid',
    "AUTH_MECHANISM": "GSSAPI"
}
with krbcontext(using_keytab=True,
                               principal=config['kerberos_principal'],
                               keytab_file=config['keytab_file'],
                               ccache_file=config['kerberos_ccache_file']):

  # get hdfs_connect
  try:
    hdfs_client=ibis.hdfs_connect(host=conf["webhdfs_host1"],port=conf["webhdfs_port"],auth_mechanism=conf["auth_mechanism"],use_https=False,verify=True)
    hdfs_client.ls("/")
  except Exception:
    hdfs_client=ibis.hdfs_connect(host=conf["webhdfs_host2"],port=conf["webhdfs_port"],auth_mechanism=conf["auth_mechanism"],use_https=False,verify=False)
    hdfs_client.ls("/")

  print(hdfs_client.ls('/user'))

  # connect impala method2
  impala_client=ibis.impala.connect(host=conf["impala_host"],port=conf["impala_port"],hdfs_client = hdfs_client, auth_mechanism=conf["auth_mechanism"], timeout = 300)
  res=impala_client.sql("""select * from poc.demo limit 10""")
  print(res.execute())
Submitting PySpark jobs to a Kerberos-enabled cluster
# _*_ coding: utf-8 _*_
import findspark
findspark.init()

import os
os.environ['JAVA_HOME']='/usr/lib64/jdk8'
os.environ['SPARK_HOME']='~/spark2'
os.environ['HADOOP_HOME']='~/hadoop'
os.environ['HADOOP_CONF_DIR']='~/hadoop/etc/hadoop'


# Increase driver memory for client mode
memory = '10g'
pyspark_submit_args = ' --driver-memory ' + memory + '   pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args


from krbcontext import krbcontext
from pyspark import SparkConf
from pyspark.sql import SparkSession


class CreateSparksession():

    def createSpark(self):
        conf = {"appname": "demo", "driver_memory": "4g", "executor_memory": "4g", "executor_cores": 2, "executor_num": 2, "master": "yarn", "deploy_mode": "client"}
        sc = SparkConf()
        sc.setMaster(conf['master']) \
            .setAppName(conf['appname']) \
            .set('spark.driver.memory', conf['driver_memory']) \
            .set('spark.executor.memory', conf['executor_memory']) \
            .set('spark.executor.cores', conf['executor_cores']) \
            .set('spark.submit.deployMode', conf["deploy_mode"])\
            .set('spark.yarn.queue', 'root.poc')\
            .set('spark.executor.memoryOverhead', '2g')\
            .set('spark.driver.memoryOverhead', '2g')
        spark= SparkSession.builder.config(conf=sc).enableHiveSupport().getOrCreate()
        sctx = spark.sparkContext
        return spark, sctx


config = {
    "kerberos_principal": "bdms_wangsong03/dev@BDMS.163.COM",
    "keytab_file": '/home/wangsong03/bdms_wangsong03.keytab',
    "kerberos_ccache_file": '/home/wangsong03/wangsong03_ccache_uid',
    "AUTH_MECHANISM": "GSSAPI"
}
with krbcontext(using_keytab=True,
                principal=config['kerberos_principal'],
                keytab_file=config['keytab_file'],
                ccache_file=config['kerberos_ccache_file']):
    spark, sctx = CreateSparksession().createSpark()
    rdd = sctx.textFile("hdfs://easyops-cluster/user/poc/pysparkdemo/demo.txt")
    print(rdd.collect())
    spark.sql("select count(*) from poc.demo").show()
    sctx.stop()
    spark.stop()
Implementing Kerberos authentication with basic Python modules
Creating a context manager
import os,sys
import subprocess
from contextlib import contextmanager


class KRB5KinitError(Exception):
    pass

def kinit_with_keytab(keytab_file, principal, ccache_file):
    '''
    Initialize Kerberos using a keytab file and
    return the credential cache filename.
    '''
    cmd = 'kinit -kt %(keytab_file)s -c %(ccache_file)s %(principal)s'
    args = {}

    args['keytab_file'] = keytab_file
    args['principal'] = principal
    args['ccache_file'] = ccache_file

    kinit_proc = subprocess.Popen(
        (cmd % args).split(),
        stderr=subprocess.PIPE)
    stdout_data, stderr_data = kinit_proc.communicate()

    if kinit_proc.returncode > 0:
        raise KRB5KinitError(stderr_data)

    return ccache_file


@contextmanager
def krbcontext(using_keytab=False, **kwargs):
    '''
    A context manager for Kerberos-related actions.
    using_keytab: if True, authenticate with a keytab file inside the context; otherwise act as a regular user.
    kwargs: the arguments used in the Kerberos context; it can contain principal, keytab_file and ccache_file.
    '''
    env_name = 'KRB5CCNAME'
    old_ccache = os.getenv(env_name)
    ccache_file = kinit_with_keytab(**kwargs)
    os.environ[env_name] = ccache_file
    try:
        yield
    finally:
        # Restore the previous credential cache setting on exit
        if old_ccache is not None:
            os.environ[env_name] = old_ccache
        else:
            os.environ.pop(env_name, None)

When running tasks with a non-default Python, the target interpreter must be specified in the code via the PYSPARK_PYTHON environment variable.
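
For example, a minimal sketch (the interpreter path below is illustrative and should point at the target environment's python):

import os
# Must be set before the SparkSession / SparkContext is created (path is illustrative)
os.environ['PYSPARK_PYTHON'] = '/opt/miniconda3/envs/python3/bin/python'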

FAQ

Q: pip install of the gssapi module fails with: pycore_frame.h:134:5: error: 'for' loop initial declarations are only allowed in C99 mode

A: Set the environment variable export CFLAGS="-std=c99" before installing.

Q: pip fails with a syntax error at sys.stderr.write(f"ERROR: {exc}")

A: Python 2 is end-of-life, so recent pip releases fail to install or run under it. Download the Python 2.7 version of get-pip.py from the official site and install it:

wget https://bootstrap.pypa.io/pip/2.7/get-pip.py
python get-pip.py

Q: ImportError: cannot import name TFrozenDict

A: Install pyhive with the [hive] extra (pip install 'pyhive[hive]'); otherwise some related packages are not installed and this error occurs.

Q: gcc: error trying to exec 'cc1plus': execvp: No such file or directory

A: The operating system is missing the basic gcc dependency packages; install them via yum:

yum install gcc-c++

Q: AttributeError: 'SparkConf' object has no attribute '_get_object_id'

A: In SparkSession.builder.config(conf=sc), the SparkConf object must be passed via the conf keyword argument (conf=sc).

Q: Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: bigdata11/10.4.9.68:39005

A: The symptom is that the Spark AM container fails to connect back to the client. If the network port between the nodes is indeed unreachable, request the necessary network access; if the port is reachable, the cause is usually multiple network interfaces on the client host.

Option 1: make the IP-to-hostname mappings in /etc/hosts consistent between the client and the cluster nodes, and make sure the mapped IP can communicate with the cluster.
Option 2: add the following parameter to the hdfs-site.xml configuration file on both the cluster and the client:

<property>
    <name>dfs.client.use.datanode.hostname</name>
    <value>true</value>
</property>