使用Easystream 实现网站UV、PV、转化率指标的实时统计
更新时间: 2021-12-10 11:20:44
阅读 1375
使用Easystream 实现网站UV、PV、转化率指标的实时统计
- 网站的独立访客数量 UV
- 网站商品页面的点击量 PV
- 转化率(转化率 = 成交次数 / 点击量)
测试数据
# 浏览记录与购买记录示例
{
"record_type":0, # 0 表示浏览记录
"user_id": 6,
"client_ip": "100.0.0.6",
"product_id": 101,
"create_time": "2021-12-06 16:00:00"
}
{
"record_type":1, # 1 表示购买记录
"user_id": 6,
"client_ip": "100.0.0.8",
"product_id": 101,
"create_time": "2021-12-08 18:00:00"
}
定义source 表
-- Source table: raw web records read as JSON from the Kafka topic `uvpv_log`.
-- `times` is the event-time attribute used by the windowed queries; records may
-- arrive up to 10 minutes out of order before being treated as late.
CREATE TABLE `input_web_record` (
    `record_type` INT,            -- 0 = browse record, 1 = purchase record
    `user_id` INT,
    `client_ip` STRING,           -- STRING rather than bare VARCHAR, which means VARCHAR(1)
    `product_id` INT,
    `create_time` TIMESTAMP(3),   -- a rowtime/watermark column must have precision 3; bare TIMESTAMP is TIMESTAMP(6)
    `times` AS create_time,
    WATERMARK FOR times AS times - INTERVAL '10' MINUTE
) WITH (
    'connector' = 'kafka',
    'topic' = 'uvpv_log',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'xxx:9092',
    'properties.group.id' = 'WebRecordGroup', -- required: a consumer group id must be specified
    'format' = 'json',
    'json.ignore-parse-errors' = 'true', -- skip records whose JSON cannot be parsed instead of failing
    'json.fail-on-missing-field' = 'false' -- true: fail on a missing field; false: set the missing field to NULL
);
定义sink
-- UV sink
为了方便测试,这里使用 print 连接器进行演示。实际开发中,经 Easystream 处理后的结果可以写入支持更新的输出库,通过主键更新达到对同一访客数据去重的目的。请根据实际业务需求选择对应的 sink connector。
-- Sink for the UV metric: one row per window, written through the print connector.
CREATE TABLE `output_uv` (
    `userids` STRING,   -- constant metric label
    `uv_c`    BIGINT    -- number of distinct visitors
)
WITH ('connector' = 'print');
-- PV sink: page-view count per product, written through the print connector.
CREATE TABLE `output_pv` (
    `pagevisits` STRING,   -- constant metric label
    `product_id` STRING,   -- product id rendered as text
    `hour_count` BIGINT    -- page-view value for the window
)
WITH ('connector' = 'print');
-- Conversion-rate sink, written through the print connector.
CREATE TABLE `output_conversion_rate` (
    `conversion_rate` STRING,   -- constant metric label
    `rate`            STRING    -- conversion rate rendered as text
)
WITH ('connector' = 'print');
SQL 实现
-- UV metric: number of distinct users seen in each 24-hour tumbling
-- event-time window.
INSERT INTO output_uv
SELECT
    'uv'                    AS userids,
    COUNT(DISTINCT user_id) AS uv_c
FROM input_web_record
GROUP BY
    TUMBLE(times, INTERVAL '24' HOUR);
-- PV metric: number of browse records (record_type = 0) per product in each
-- 10-minute window; the window slides every 5 minutes, so a fresh result is
-- emitted every 5 minutes.
INSERT INTO output_pv
SELECT
    'pagevisits'               AS `pagevisits`,
    CAST(product_id AS STRING) AS product_id,
    -- PV is a row count; summing product_id would add up the ids themselves.
    COUNT(*)                   AS hour_count
FROM input_web_record
WHERE record_type = 0
GROUP BY
    HOP(times, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
    -- Group by product only: the sink has no user column, so grouping by
    -- user_id would split one product's PV into several partial rows.
    product_id;
-- Conversion rate per product in each 10-minute window (sliding every
-- 5 minutes): purchases (record_type = 1) divided by views (record_type = 0).
-- Both counts come from the same window in a single pass, so the ratio never
-- mixes a windowed count with a global one.
INSERT INTO output_conversion_rate
SELECT
    'conversion_rate' AS `conversion_rate`,
    CAST(
        SUM(CASE WHEN record_type = 1 THEN 1 ELSE 0 END) * 1.0
        -- NULLIF yields a NULL rate instead of a division-by-zero failure when
        -- the window has purchases but no views.
        / NULLIF(SUM(CASE WHEN record_type = 0 THEN 1 ELSE 0 END), 0)
        AS STRING
    ) AS `rate`
FROM input_web_record
WHERE record_type IN (0, 1)
GROUP BY
    HOP(times, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
    product_id;
文档反馈
以上内容对您是否有帮助?