Demo - Extracting Flow-Type (流水型) Data
Last updated: 2024-03-11 02:51:19
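Applicable modules
Details
#Background
Tables in a source relational database usually fall into two categories: flow-type (流水型) and log-type (日志型).
Log-type data is generally not changed after it is written to the database. An app's user login table is an example: once a login record is written, it is normally never updated.
Flow-type data can still be updated after it is written. An order table is an example: it usually has an `order_status` column for the order state, and that column is updated frequently.
Usage example
##Prerequisites
Extracting a flow-type table into a big data environment raises the following problems:
###The source table needs a column that identifies incremental changes
Taking MySQL as an example, the table needs a timestamp column that is maintained with `CURRENT_TIMESTAMP`.
`DEFAULT CURRENT_TIMESTAMP` sets the column to the current time when a row is inserted.
`ON UPDATE CURRENT_TIMESTAMP` resets the column to the current time whenever a value in the row is changed.
A MySQL table can be created as follows: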
```sql
CREATE TABLE `mytest` (
  `text` varchar(255) DEFAULT '' COMMENT 'content',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```
MySQL maintains both behaviors itself, so the creation-time and update-time columns can be produced this way without any application code.
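With an `update_time` column like this, an incremental run only needs the rows whose timestamp falls inside the window since the previous run. The sketch below prints such a query. The helper name `build_incremental_query` and the window values are hypothetical, and the script only illustrates the idea; it is not what the data transfer task generates internally.

```shell
# Hypothetical helper: print a query selecting rows changed in [start, end).
# Arguments: table name, timestamp column, window start, window end.
build_incremental_query() {
  local table="$1" column="$2" start="$3" end="$4"
  printf "SELECT * FROM %s WHERE %s >= '%s' AND %s < '%s';\n" \
    "$table" "$column" "$start" "$column" "$end"
}

# Example: rows of `mytest` inserted or updated during 2024-03-10.
build_incremental_query mytest update_time '2024-03-10 00:00:00' '2024-03-11 00:00:00'
```

A half-open window (start inclusive, end exclusive) keeps consecutive runs from extracting the same row twice at the boundary.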
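###The merge problem on the target side
To make merging efficient after the data lands in the big data environment, HBase can be used as the target of the data synchronization: a write to an existing row key supersedes the earlier value, so an updated source row replaces its previous copy.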
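Why a key-value target makes the merge cheap can be sketched with plain text: when every row is keyed by its primary key, applying an increment amounts to writing each changed row over its older copy. The sample rows and the `merge_by_key` helper below are hypothetical. The script only imitates overwrite-by-row-key behavior and does not talk to HBase.

```shell
# Hypothetical helper: merge "key,value" lines read from stdin.
# Later lines win, which imitates a put overwriting the same row key.
merge_by_key() {
  awk -F, '{ latest[$1] = $0 } END { for (k in latest) print latest[k] }' | sort
}

# Initial load followed by one increment: order 1002 changes status,
# order 1003 is new.
printf '%s\n' \
  '1001,paid' \
  '1002,created' \
  '1002,shipped' \
  '1003,created' | merge_by_key
```

No separate deduplication pass is needed afterwards, because each key holds only its most recent row.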
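###Reading HBase tables from Hive
Use Hive over HBase, so that Hive reads the data stored in the HBase table.
#Procedure
##Step1: Create the HBase table
Log in to the server where the HBase RegionServer runs.
```shell
10.4.14.61
bigdata009.dmp.jztweb.com
```
Authenticate with Kerberos.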
```shell
[root@bigdata009 ~]# cd /usr/easyops/hbase/hbase_dev_hbase/client/20210329172526784486e7893/keytab/
[root@bigdata009 keytab]# ll
total 8
-r--r-----. 1 hbase hadoop 722 Mar 29 17:28 hbase.keytab
-r---w----. 1 hbase hadoop 714 Mar 29 17:28 spengo.keytab
[root@bigdata009 keytab]# klist -kt hbase.keytab
Keytab name: FILE:hbase.keytab
KVNO Timestamp Principal
---- ----------------- --------------------------------------------------------
1 03/29/21 17:28:48 hbase/bigdata009.dmp.jztweb.com@BDMS.163.COM
1 03/29/21 17:28:48 hbase/bigdata009.dmp.jztweb.com@BDMS.163.COM
1 03/29/21 17:28:48 hbase/bigdata009.dmp.jztweb.com@BDMS.163.COM
1 03/29/21 17:28:48 hbase/bigdata009.dmp.jztweb.com@BDMS.163.COM
1 03/29/21 17:28:48 hbase/bigdata009.dmp.jztweb.com@BDMS.163.COM
1 03/29/21 17:28:48 hbase/bigdata009.dmp.jztweb.com@BDMS.163.COM
1 03/29/21 17:28:48 hbase/bigdata009.dmp.jztweb.com@BDMS.163.COM
1 03/29/21 17:28:48 hbase/bigdata009.dmp.jztweb.com@BDMS.163.COM
[root@bigdata009 keytab]# kinit -kt hbase.keytab hbase/bigdata009.dmp.jztweb.com@BDMS.163.COM
[root@bigdata009 keytab]#
```
Enter the HBase shell.
```shell
[root@bigdata009 keytab]# cd ../current/bin
[root@bigdata009 bin]# ./hbase --config /usr/easyops/hbase/hbase_dev_hbase/client/20210329172526784486e7893/config shell
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/easyops/hbase/hbase_dev_hbase/client/20210329172526784486e7893/package/hbase-1.2.6/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/easyops/hdfs/default_hdfs/client/2021020220051118040fc92d3/package/hadoop-2.9.2-1.1.1.2/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.2.6, rUnknown, Mon May 29 02:25:32 CDT 2017
hbase(main):001:0> list
TABLE
dwmid:tb_gos_sale_salestockoutdet
ndi_Test1_partition2
ndi_test1
ndi_test1_partition
ns1:t1
poc:create_table_demo
t1
7 row(s) in 0.2540 seconds
=> ["dwmid:tb_gos_sale_salestockoutdet", "ndi_Test1_partition2", "ndi_test1", "ndi_test1_partition", "ns1:t1", "poc:create_table_demo", "t1"]
hbase(main):002:0>
```
Create the HBase table.
```shell
# Create an HBase namespace; a namespace is comparable to a MySQL database
create_namespace 'dwmid'
# Create the table with pre-defined region split points
create 'dwmid:tb_gos_sale_salestockoutdet','cf1',SPLITS => ['20000000000','40000000000','60000000000','80000000000','100000000000','120000000000','140000000000','160000000000','180000000000','200000000000','220000000000','240000000000','260000000000','280000000000','300000000000','320000000000','340000000000','360000000000','380000000000','400000000000','420000000000','440000000000','460000000000','480000000000','500000000000','520000000000','540000000000','560000000000','580000000000','600000000000','620000000000','640000000000','660000000000','680000000000','700000000000','720000000000','740000000000','760000000000','780000000000','800000000000','820000000000','840000000000','860000000000','880000000000','900000000000','920000000000','940000000000','960000000000','980000000000','1000000000000','1020000000000','1040000000000','1060000000000','1080000000000','1100000000000','1120000000000','1140000000000','1160000000000','1180000000000','1200000000000','1220000000000','1240000000000','1260000000000','1280000000000','1300000000000','1320000000000','1340000000000','1360000000000','1380000000000','1400000000000','1420000000000','1440000000000','1460000000000','1480000000000','1500000000000','1520000000000','1540000000000','1560000000000','1580000000000','1600000000000','1620000000000','1640000000000','1660000000000','1680000000000','1700000000000','1720000000000','1740000000000','1760000000000','1780000000000','1800000000000','1820000000000','1840000000000','1860000000000','1880000000000','1900000000000','1920000000000','1940000000000','1960000000000','1980000000000','2000000000000']
```
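A split list this long is easier to generate than to type. The sketch below rebuilds it from a step and a count. The helper name `gen_splits` is hypothetical, and the step of 20000000000 with 100 keys is simply read off the list above. Note that HBase orders row keys as bytes, so '100000000000' sorts before '20000000000'. Whether the keys should be zero-padded to a fixed width depends on how the row keys are written, which the text does not specify.

```shell
# Hypothetical helper: print COUNT split keys STEP, 2*STEP, ... as a
# comma-separated list of quoted strings for the SPLITS => [...] clause.
gen_splits() {
  local step="$1" count="$2" i=1 out=""
  while [ "$i" -le "$count" ]; do
    out="${out}${out:+,}'$((i * step))'"
    i=$((i + 1))
  done
  printf '%s\n' "$out"
}

# The list used above: 100 keys from 20000000000 to 2000000000000.
printf "create 'dwmid:tb_gos_sale_salestockoutdet','cf1',SPLITS => [%s]\n" \
  "$(gen_splits 20000000000 100)"
```

The printed line can be pasted into the HBase shell. Changing the step or count adjusts the number of regions to the expected data volume.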
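Grant permissions
HBase has its own permission management module. For convenience, permissions can be granted to the platform user.
```shell
grant 'bdms_liuheng1','RWXCA'
```
##Step2: Initialize the source table into HBase
Configure a data transfer task that writes to HBase. The initial data volume is usually large, so `bulkload` is recommended to avoid the load that a large number of put writes would place on the RegionServers.
Example task: `orace2hbase_dwmid_tb_gos_sale_salestockoutdet_ini`
The leading part of the task name indicates that data flows from Oracle to HBase, and the trailing part indicates that this is a data initialization task. Such tasks are usually not scheduled; run them manually when needed.
Task screenshot:
![](/documents/uploads/projects/service_support/1679903c2ca74234.png)
Note:
Some advanced parameters must be set on the data destination: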
```java
conf.hadoop.hbase.security.authentication=kerberos
conf.hadoop.hbase.regionserver.kerberos.principal=hbase/_HOST@BDMS.163.COM
conf.hadoop.hbase.master.kerberos.principal=hbase/_HOST@BDMS.163.COM
conf.hadoop.hbase.rpc.protection=privacy
conf.hadoop.hbase.fs.tmp.dir=/tmp
```
`splitSize` is the scan step size (default 1000000). When the primary key values have large gaps, it can be set larger.
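To see why a larger step helps with sparse keys, assume the scan walks the primary key range in consecutive windows of `splitSize` key values. That model is an assumption made for illustration, since the text does not describe the platform's exact scanning logic. The hypothetical helper `count_scan_ranges` counts the windows needed to cover a key range.

```shell
# Hypothetical helper: print the number of scan ranges needed to cover
# primary keys MIN..MAX when each range spans STEP key values.
count_scan_ranges() {
  local min="$1" max="$2" step="$3"
  echo $(( (max - min) / step + 1 ))
}

# Sparse keys spanning 1..2000000000000:
count_scan_ranges 1 2000000000000 1000000     # default step
count_scan_ranges 1 2000000000000 100000000   # larger step, fewer ranges
```

Under this model, when most windows contain no rows, a larger step reduces the number of mostly empty scans.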
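Task configuration tuning:
Because an initialization task moves a large amount of data, the HBase table should be pre-split by rowKey value for better write performance, and the number of regions should be tuned to the data volume. The memory used by the task is adjusted through node properties. Add the following to the node properties in offline development:
```java
ndi.spark.spark-argument.executor-memory=50g
ndi.spark.spark-conf.spark.executor.extraJavaOptions=-XX:PermSize=1024m -XX:MaxPermSize=1024m -XX:MaxDirectMemorySize=10240m -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps
ndi.spark.spark-conf.spark.reducer.maxBlocksInFlightPerAddress=1
```
##Step3: Configure the incremental task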
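An incremental task periodically synchronizes incremental data to HBase. The incremental volume is usually small, so writing with put is sufficient. The increment can be selected in either of two ways:
1. Choose the normal type and filter the T+1 increment by calendar date;
2. Choose the flow type (流水型) and select the `LASTMODIFYTIME` column as the flow key.
Example task: `oracle2hbase_dwmid_tb_gos_sale_salestockoutdet_di`
Task screenshot:
![](/documents/uploads/projects/service_support/1679905e78a7ee3f.png)
Note:
The daily increment is small and one concurrent task is usually enough. To keep the data transfer from splitting on the primary key, set `partitionSize` to a large value by adding an advanced property to the data source:
```java
partitionSize=1000000g
```
Some advanced parameters must also be set on the data destination: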
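```java
conf.hadoop.hbase.security.authentication=kerberos
conf.hadoop.hbase.rpc.protection=privacy
```
When testing the task on the data transfer page, add the following to the advanced properties on the run settings page:
`ndi.spark.spark-argument.files=/tmp/hbase/conf/hbase-site.xml`
This value is not saved and is used only for the test. When the task is scheduled in offline development, the same setting must be added to the node properties:
```java
ndi.spark.spark-argument.files=/tmp/hbase/conf/hbase-site.xml
```
Tuning extraction speed:
Data transfer automatically enables splitting when the parallelism is greater than 1 or the table is larger than 1G (the default value of the advanced setting `partitionSize`). When little data needs to be extracted but the whole table is large, the splitting phase can take a long time, possibly longer than the extraction itself. In that case, raise `partitionSize` so that splitting is not triggered, for example by setting `partitionSize` to 100000000g.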
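The rule above can be written down as a small check. The helper name `splitting_decision` and the 500 GB table size are hypothetical; the condition itself (parallelism greater than 1, or table size above `partitionSize`) is taken from the text, with both sizes expressed in whole GB.

```shell
# Sketch of the rule described above: splitting starts when parallelism is
# greater than 1 or the table is larger than partitionSize (both in GB).
# Hypothetical helper; prints "split" or "no-split".
splitting_decision() {
  local parallelism="$1" table_gb="$2" partition_gb="$3"
  if [ "$parallelism" -gt 1 ] || [ "$table_gb" -gt "$partition_gb" ]; then
    echo "split"
  else
    echo "no-split"
  fi
}

splitting_decision 1 500 1           # 500 GB table, default partitionSize of 1g
splitting_decision 1 500 100000000   # partitionSize raised to 100000000g
splitting_decision 4 500 100000000   # parallelism above 1 still splits
```

Raising `partitionSize` avoids splitting only while the task runs with a single concurrent reader, which matches the earlier advice that one concurrent task is usually enough for daily increments.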
##Step4: Map a Hive table to the HBase table
Create a table in Hive that maps to the HBase table.
````sql
create external table jzt_dmp.tb_gos_sale_salestockoutdet (
PK string,
FK string,
LINEID string,
CREATETIME string,
LASTMODIFYTIME string,
BRANCHID string,
DELETEFLAG string,
NOTE string,
BILLID string,
BILLINGDATE string,
PRODID string,
LOTNO string,
STERILELOTNO string,
PRODUCTDATE string,
LOTEXPIREDATE string,
QUANTITY string,
PRICE string,
AMOUNT string,
GROSSPROFIT string,
WHSEID string,
STOCKOUTTASKPK string,
LOOSEQUANTITY string,
DIRSALEQUANTITY string,
SALEORDERDETPK string,
QUALITYSTATE string,
PATCHUPBOX string,
SETTLEMENTPRICE string,
COSTACCOUNTING string,
APPROVALNO string,
PROSELLGUIDECOEFFICIENT string,
CANCELQTY string,
QUANTITYSUM string,
RETURNQUANTITY string,
PAYMENTS string,
ISSETTLEMENT string,
ENTIREQUANTITY string,
TAXRATE string,
LOTINVENPK string,
IS_INVOICE string,
IS_FINISHINVOICE string,
INVOICEDQTY string,
INVOICEDAMOUNT string,
VERSION string,
COSTPRICE string,
REALGROSSPROFIT string,
RETAILPRICE string,
PRODNO string,
PRODNAME string,
MANUFACTURE string,
BIGPACKAGEQUANTITY string,
PACKGEUNIT string,
PRODSPECIFICATION string,
WHSENAME string,
BUSITYPETEXT string,
QUALITYSTATENAME string,
CHINESEDRUGYIELDLY string,
PRODDOSAGEFORMNOTEXT string,
ORDERREALGROSSPROFIT string,
IS_PRINTREPORT string,
OUTCHECKSTAFFNAME string,
CHECKSTAFFNAME string,
CUSTOMERID string,
TWOCUSTID string,
STOREID string,
SIGNINDATE string,
ARRIVEDATE string,
SIGNINREMARK string,
SIGNINSTAFFID string,
SIGNINSTAFFNAME string,
SIGNINDEPTID string,
SIGNINSTATE string,
PICKSTAFFID string,
PICKSTAFFNAME string,
CHECKPICKSTAFFID string,
CHECKPICKSTAFFNAME string,
CONFIRMSTATE string,
AREACODE string,
ZJWHSENAME string,
EXECPRICE string,
QUOTATIONBILLID string,
OLDCOSTACCOUNTING string,
CONFRIMQUANTITY string,
CONFRIMTIME string,
CONFRIMSTAFFID string,
CONFRIMSTAFFNAME string,
IS_FINISHCONFRIM string,
ORDERSOURCE string,
ORDERSOURCEID string,
ORDERSOURCEDETID string,
TRADEPRICE string,
YLQXXKZ string,
YLQXSCBA string,
ISENDSCAN string,
QRCODELIST string,
AREANAME string,
SELLTYPE string,
PRODBARCODE string,
KEYWORDABBR string,
IS_SPECIALDRUGS string,
STORAGECONDITION string,
PROSCOPENO string,
ISELECTRONICMONITORING string,
SCQYXKZ string,
STORAGENOTE string,
MANUFACTUREPRODNO string,
ISPICKTWOBILL string,
YOUX_PRINT string,
GROUPZGBM string,
GROUPZGBMTEXT string,
VRPURCHASEDEPID string,
VRPURCHASEDEPNAME string,
VIRTUALSALEPRICE string,
VIRTUALAMOUNT string,
VRSALEDEPID string,
VIRTUALRULEPK string,
VRSALEDEPNAME string,
PROMOTEPERCENT string,
ISPRINT string,
ISCUSTGROUPPURCHASE string,
ISPRODGROUPPURCHASE string,
CURRENCYNAME string,
EXCHANGERATE string,
FOREIGNCURRENCYPRICE string,
FOREIGNCURRENCYAMOUNT string,
REGISTERPRICE string,
SHORTPRODUCTDATE string,
SHORTLOTEXPIREDATE string,
STOCKCLASSIFICATIONID string,
STOCKCLASSIFICATIONNAME string,
NEWSALEORDERDETPK string,
ACC_BILLSTATE string,
CEN_GROUPZGBM string,
CEN_GROUPZGBMTEXT string,
CEN_STOCKCLASSIFICATIONID string,
CEN_STOCKCLASSIFICATIONNAME string,
CEN_GROSSPROFIT string,
CEN_COSTPRICE string
) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties ("hbase.columns.mapping" = ":key,cf1:FK,cf1:LINEID,cf1:CREATETIME,cf1:LASTMODIFYTIME,cf1:BRANCHID,cf1:DELETEFLAG,cf1:NOTE,cf1:BILLID,cf1:BILLINGDATE,cf1:PRODID,cf1:LOTNO,cf1:STERILELOTNO,cf1:PRODUCTDATE,cf1:LOTEXPIREDATE,cf1:QUANTITY,cf1:PRICE,cf1:AMOUNT,cf1:GROSSPROFIT,cf1:WHSEID,cf1:STOCKOUTTASKPK,cf1:LOOSEQUANTITY,cf1:DIRSALEQUANTITY,cf1:SALEORDERDETPK,cf1:QUALITYSTATE,cf1:PATCHUPBOX,cf1:SETTLEMENTPRICE,cf1:COSTACCOUNTING,cf1:APPROVALNO,cf1:PROSELLGUIDECOEFFICIENT,cf1:CANCELQTY,cf1:QUANTITYSUM,cf1:RETURNQUANTITY,cf1:PAYMENTS,cf1:ISSETTLEMENT,cf1:ENTIREQUANTITY,cf1:TAXRATE,cf1:LOTINVENPK,cf1:IS_INVOICE,cf1:IS_FINISHINVOICE,cf1:INVOICEDQTY,cf1:INVOICEDAMOUNT,cf1:VERSION,cf1:COSTPRICE,cf1:REALGROSSPROFIT,cf1:RETAILPRICE,cf1:PRODNO,cf1:PRODNAME,cf1:MANUFACTURE,cf1:BIGPACKAGEQUANTITY,cf1:PACKGEUNIT,cf1:PRODSPECIFICATION,cf1:WHSENAME,cf1:BUSITYPETEXT,cf1:QUALITYSTATENAME,cf1:CHINESEDRUGYIELDLY,cf1:PRODDOSAGEFORMNOTEXT,cf1:ORDERREALGROSSPROFIT,cf1:IS_PRINTREPORT,cf1:OUTCHECKSTAFFNAME,cf1:CHECKSTAFFNAME,cf1:CUSTOMERID,cf1:TWOCUSTID,cf1:STOREID,cf1:SIGNINDATE,cf1:ARRIVEDATE,cf1:SIGNINREMARK,cf1:SIGNINSTAFFID,cf1:SIGNINSTAFFNAME,cf1:SIGNINDEPTID,cf1:SIGNINSTATE,cf1:PICKSTAFFID,cf1:PICKSTAFFNAME,cf1:CHECKPICKSTAFFID,cf1:CHECKPICKSTAFFNAME,cf1:CONFIRMSTATE,cf1:AREACODE,cf1:ZJWHSENAME,cf1:EXECPRICE,cf1:QUOTATIONBILLID,cf1:OLDCOSTACCOUNTING,cf1:CONFRIMQUANTITY,cf1:CONFRIMTIME,cf1:CONFRIMSTAFFID,cf1:CONFRIMSTAFFNAME,cf1:IS_FINISHCONFRIM,cf1:ORDERSOURCE,cf1:ORDERSOURCEID,cf1:ORDERSOURCEDETID,cf1:TRADEPRICE,cf1:YLQXXKZ,cf1:YLQXSCBA,cf1:ISENDSCAN,cf1:QRCODELIST,cf1:AREANAME,cf1:SELLTYPE,cf1:PRODBARCODE,cf1:KEYWORDABBR,cf1:IS_SPECIALDRUGS,cf1:STORAGECONDITION,cf1:PROSCOPENO,cf1:ISELECTRONICMONITORING,cf1:SCQYXKZ,cf1:STORAGENOTE,cf1:MANUFACTUREPRODNO,cf1:ISPICKTWOBILL,cf1:YOUX_PRINT,cf1:GROUPZGBM,cf1:GROUPZGBMTEXT,cf1:VRPURCHASEDEPID,cf1:VRPURCHASEDEPNAME,cf1:VIRTUALSALEPRICE,cf1:VIRTUALAMOUNT,cf1:VRSALEDEPID,cf1:VIRTUALRULEPK,cf1:VRSALEDEPNAME,cf1:PROMOTEPERCENT,cf1:ISPRINT,cf1:ISCUSTGROUPPURCHASE,cf1:ISPRODGROUPPURCHASE,cf1:CURRENCYNAME,cf1:EXCHANGERATE,cf1:FOREIGNCURRENCYPRICE,cf1:FOREIGNCURRENCYAMOUNT,cf1:REGISTERPRICE,cf1:SHORTPRODUCTDATE,cf1:SHORTLOTEXPIREDATE,cf1:STOCKCLASSIFICATIONID,cf1:STOCKCLASSIFICATIONNAME,cf1:NEWSALEORDERDETPK,cf1:ACC_BILLSTATE,cf1:CEN_GROUPZGBM,cf1:CEN_GROUPZGBMTEXT,cf1:CEN_STOCKCLASSIFICATIONID,cf1:CEN_STOCKCLASSIFICATIONNAME,cf1:CEN_GROSSPROFIT,cf1:CEN_COSTPRICE")
tblproperties("hbase.table.name" = "dwmid:tb_gos_sale_salestockoutdet");
````
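With well over a hundred columns, the `hbase.columns.mapping` value is error-prone to write by hand, because its entries must line up one-to-one with the Hive column list. The sketch below derives it from the column names. The helper name `build_columns_mapping` is hypothetical, and it assumes, as in the statement above, that the first column is the row key and that every other column lives in family `cf1` under its own name.

```shell
# Hypothetical helper: read column names (one per line) from stdin and print
# the hbase.columns.mapping value. The first column maps to the row key
# (:key); every other column maps to family cf1 under the same name.
build_columns_mapping() {
  awk -v cf="cf1" '
    NR == 1 { out = ":key"; next }
    { out = out "," cf ":" $1 }
    END { print out }
  '
}

# First five columns of the table above.
printf '%s\n' PK FK LINEID CREATETIME LASTMODIFYTIME | build_columns_mapping
```

Generating the value on a single line also avoids stray line breaks inside the mapping string.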
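#Verifying the result
In self-service analysis, querying the table with Hive returns results.
Note: Spark and Impala currently cannot query this kind of table.
```sql
select count(1) from jzt_dmp.tb_gos_sale_salestockoutdet limit 10;
select * from jzt_dmp.tb_gos_sale_salestockoutdet limit 10;
```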