使用python写自定义函数Demo

适用模块

自助分析
离线开发

具体说明

计算玩家的连胜&连败纪录

使用示例

源数据的表结构如下:

CREATE TABLE `t1`(
 `id` bigint, 
 `player_id` int, 
 `game_result` int, 
 `season` int, 
 `game_type` int)
  ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY '\t';
-----------------------------------------------
数据文件如下:

 id:表示比赛id

 player_id:用户id

 game_result:比赛结果,0代表输,1代表赢
id player_id game_result
1 123 1
2 123 1
3 432 0
使用python编写UDF如下:

win_streak_reduce.py

代码如下:
python
#! /usr/bin/python

import sys

for line in sys.stdin:
        line = line.split('\t')
        player_id = line[0]
        games = [int(i) for i in line[1].split(',')]
        win_streak_max = 0
        lost_streak_max = 0
        win_tmp = 0
        lost_tmp = 0
        for i in games:
                if i == 1:
                        lost_tmp = 0
                        win_tmp += 1
                else:
                        lost_tmp += 1
                        win_tmp = 0
                if win_tmp > win_streak_max:
                        win_streak_max = win_tmp
                if lost_tmp > lost_streak_max:
                        lost_streak_max = lost_tmp
        print "{player_id}\t{win_streak_max}\t{lost_streak_max}".format(player_id = player_id, win_streak_max = win_streak_max, lost_streak_max = lost_streak_max)

最后查询:

add file /win_streak_reduce.py;
select transform(player_id,games) using 'python win_streak_reduce.py' as player_id,win_streak,lost_streak from (select player_id,concat_ws(',',collect_list(cast(game_result as string))) games from (select player_id,id, game_result from t1 cluster by player_id,id) t  group by player_id) t;

整体思路是:
1. 将比赛记录按照player_id,id排序,HQL中的cluster by player_id,id,会使同一个player_id的数据被分发到一起处理。
2. concat_ws(',',collect_list(cast(game_result as string)))会把玩家的比赛结果按照id从小打到排成数组,最后转成字符串,用,分隔
3. 使用udf对比赛结果进行计算,找出连胜和连败最大值

作者:焦巍