请问大数据如何处理以下这个需求

请看下面这个日志

用户id 视频id 操作
402 40086 开始播放
403 37896 开始播放
764 675 开始播放
1037 8696 开始播放
402 40086 暂停回放
764 675 暂停回放
。。。

。。。

。。。

这是一个视频门户用户播放视频产生的日志。这个日志假设有很多,几十个G

需求是,统计出哪个视频是用户最喜欢暂停回放的。。这个太简单了。。spark sql搞定。。


然而。。。上面那个只是一个理想的日志文件。。真实的日志文件是这样的

用户id    视频id    操作

402 40086 开始播放
403 37896 开始播放
764 675 开始播放
1037 8696 开始播放
402 暂停回放
764 暂停回放
。。。

。。。

。。。

会发现。。。当用户暂停回放的时候,那一行记录是没有视频id的。。。你只能往上面的日志去找了。。。而且如果在某些情况下,两条记录之间可能还会间隔几万条或者几十万条数据。。。
请问这种情况下。。。。。究竟是这个日志本身就是设计得不合理呢。。还是说目前有什么办法可以处理。。如果有办法。。。还请各位教教我

最好当然是日志信息比较完备,这样就可以不需要或者尽可能少的数据清洗处理。时间、设备信息、ip之类都可以考虑记录上。

在日志格式不变的情况下,你只能考虑数据清洗了。基于用户ID为KEY,过滤非开始播放和暂停回放事件,value只能把事件类型和视频ID都带上,
reduce操作就是把所有value拼接起来,拼接过程中补齐暂停播放的videoID。然后输出时候在把拼接的内容拆开输出。

有些设计缺陷,不应该去迁就,用太多多余代码来实现,这对系统性能来说也是很不好的。
不合理就要完善调整,否则此类问题堆积越来越多,系统也就像到处补窟窿,是很可怕的。
当前这个问题,应该将没视频ID的数据补全(确认无法补全,就直接删除掉这些残缺数据,以免影响统计结果)。
然后,再使用SQL进行统计即可。

python代码:

#-*- coding: utf-8 -*-

from __future__ import print_function

import sys

from pyspark.sql import SparkSession

def makeLineTuple(line):
    '''
    generate key,value pair, key is the use_id, value is like "VIDEOID__TYPEID"
    :param line: origin log line.
    :return:  key,value pair
    '''
    sects = line.strip().split(' ')
    video_id = ''
    if len(sects) > 2:
        video_id = sects[1]
        action_type = 0 if sects[2] == u'开始播放' else 1
    else:
        action_type = 0 if sects[1] == u'开始播放' else 1

    value = '%s__%d' % (video_id, action_type)
    return (sects[0], value)

def merge(str1, str2):
    '''
    merge value. value is like "VIDEOID__TYPEID|VIDEOID__TYPEID|VIDEOID__TYPEID...", may be only "VIDEOID__TYPEID"
    :param str1: a string
    :param str2: next string
    :return: return string is same format.
    '''
    sects1 = str1.split('|')
    sects2 = str2.split('|')
    sects1.extend(sects2)
    datas = []
    prev_videoid = None
    for item in sects1:
        video_id, action_type = item.split('__')
        action_type = int(action_type)
        if video_id == '' and action_type == 1:
            if prev_videoid is not None:
                video_id = prev_videoid
        elif action_type == 0:
            prev_videoid = video_id

        datas.append('%s__%d' % (video_id, action_type))
    return '|'.join(datas)

def expand(x):
    '''
    expand string value to original log format. x like "USERID@VIDEOID__TYPEID|VIDEOID__TYPEID|VIDEOID__TYPEID..."
    :param x: userid ref logs item.
    :return: list of original log format data.
    '''
    key, value = x.split('@')
    sects = value.split('|')
    ret_data = []
    for item in sects:
        video_id, action_type = item.split('__')
        action_type = int(action_type)
        ret_data.append('%s %s %s' % (key, video_id, u'开始播放' if action_type == 0 else u'暂停回放'))

    return ret_data


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage:  fillvideodata <file>", file=sys.stderr)
        exit(-1)

    import os
    os.system('rm -rf outvideo')

    spark = SparkSession \
        .builder \
        .appName("PythonFillVideoData") \
        .getOrCreate()

    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    lines = lines.filter(lambda line: line.find(u'开始播放') >= 0 or line.find(u'暂停回放') >= 0)

    datas = lines.map(lambda x: makeLineTuple(x)) \
        .reduceByKey(merge)

    datas = datas.map(lambda (key, x):(key, key + '@' + x))
    datas = datas.values()
    datas = datas.flatMap(expand)

    datas.saveAsTextFile('outvideo')

    spark.stop() 

输出:

 403 37896 开始播放
402 40086 开始播放
402 40086 暂停回放
764 675 开始播放
764 675 暂停回放
1037 8696 开始播放