import os
import sys

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Make the project root importable before the project-local imports below.
sys.path.insert(0, os.path.join(BASE_DIR))

from online import stream_sc, SIMILAR_DS, pool, NEW_ARTICLE_DS, HOT_DS
from datetime import datetime
from setting.default import DefaultConfig
import redis
import json
import time
import setting.logging as lg
import logging

logger = logging.getLogger('online')

# User actions treated as positive feedback; only these trigger online recall.
_FEEDBACK_ACTIONS = ("click", "collect", "share")


def _top_k_similar_ids(similar_row, k):
    """Return up to ``k`` article ids from an ``article_similar`` row, most similar first.

    :param similar_row: dict returned by ``Table.row(..., columns=[b"similar"])``.
        Keys are parsed as ``b"<family>:<article_id>"`` and values as numeric
        similarity scores encoded as bytes (e.g. ``b"0.85"``) -- this matches how
        the original code parsed them; confirm against the offline writer.
    :param k: maximum number of ids to return.
    :return: list of int article ids ordered by descending similarity
        (empty if the row is empty).
    :raises ValueError: if a key or score does not have the expected format.
    """
    # Scores must be compared numerically: a bytewise comparison misorders
    # values such as b"9e-05" vs b"0.5", and negative scores.
    ranked = sorted(similar_row.items(), key=lambda item: float(item[1]), reverse=True)
    return [int(column.split(b":")[1]) for column, _score in ranked[:k]]


def _parse_id_list(raw):
    """Decode one HBase cell holding a list of article ids.

    This module writes such cells as ``str(list_of_ints).encode()`` (e.g.
    ``b"[1, 2, 3]"``), which is valid JSON.

    :param raw: the cell value as bytes.
    :return: list of int ids; an empty list (with a warning) if the cell is unparsable.
    """
    try:
        value = json.loads(raw.decode())
        if not isinstance(value, list):
            raise ValueError("expected a JSON list")
        return [int(v) for v in value]
    except (UnicodeDecodeError, ValueError, TypeError):
        logger.warning("skip unparsable history cell: %r", raw)
        return []


class OnlineRecall(object):
    """Online recall, used to address the user cold-start problem.

    Planned strategies:
      1. content-based recall (``_online_content_recall``)
      2. new articles (no method in this class handles it yet)
      3. hot-article recall (``_update_hot_recall``)
    """

    def __init__(self):
        # Redis database 10 holds the per-channel hot-article sorted sets.
        self.client = redis.StrictRedis(host=DefaultConfig.REDIS_HOST,
                                        port=DefaultConfig.REDIS_PORT,
                                        db=10)

    def _online_content_recall(self, topk=10):
        """Register content-based online recall on the ``SIMILAR_DS`` stream.

        Each stream element is ``(key, json_string)``. Example payload::

            {"actionTime":"2019-04-10 21:04:39","readTime":"","channelId":18,
             "param":{"action": "click", "userId": "2", "articleId": "116644",
                      "algorithmCombine": "C2"}}

        For every click/collect/share action:
          1. the ``topk`` articles most similar to the acted-on article are read
             from HBase ``article_similar``;
          2. ids already stored in ``history_recall`` for that user/channel are
             removed, keeping similarity order;
          3. the remaining ids are written to ``cb_recall`` (column
             ``online:<channelId>``) and to ``history_recall``.

        :param topk: number of most-similar articles considered per action (default 10).
        :return: None; the handler runs once the streaming context is started.
        """

        def process_action(data):
            """Compute and store the online content recall for one log record."""
            if data['param']['action'] not in _FEEDBACK_ACTIONS:
                return

            logger.info(
                "{} INFO: get user_id:{} action:{} log".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                                                             data['param']['userId'], data['param']['action']))

            # Normalize ids so int-typed JSON fields work as well as strings.
            user_key = str(data["param"]["userId"]).encode()
            article_key = str(data["param"]["articleId"]).encode()
            channel_id = int(data["channelId"])

            # The pool's context manager returns the connection on exit (and
            # refreshes it if an HBase error escapes), so it is not closed here.
            with pool.connection() as conn:
                sim_table = conn.table("article_similar")
                similar_row = sim_table.row(article_key, columns=[b"similar"])
                top_k_ids = _top_k_similar_ids(similar_row, topk)
                if not top_k_ids:
                    return

                # Articles already recalled for this user/channel: every stored
                # version of the cell is a serialized id list.
                history_table = conn.table("history_recall")
                history_cells = history_table.cells(
                    b"reco:his:%s" % user_key,
                    b"channel:%d" % channel_id
                )
                history = set()
                for cell in history_cells:
                    history.update(_parse_id_list(cell))

                # Filter out history while preserving the similarity ranking.
                recall_list = [aid for aid in top_k_ids if aid not in history]

                logger.info(
                    "{} INFO: store online recall data:{}".format(
                        datetime.now().strftime('%Y-%m-%d %H:%M:%S'), str(recall_list)))

                if recall_list:
                    recall_table = conn.table("cb_recall")
                    recall_table.put(
                        b"recall:user:%s" % user_key,
                        {b"online:%d" % channel_id: str(recall_list).encode()}
                    )
                    history_table.put(
                        b"reco:his:%s" % user_key,
                        {b"channel:%d" % channel_id: str(recall_list).encode()}
                    )

        def get_similar_article(rdd):
            """Handle every record of one micro-batch.

            :param rdd: RDD of parsed log dicts for the current batch.
            """
            for data in rdd.collect():
                # Isolate failures so one malformed record or transient HBase
                # error does not abort the remainder of the batch.
                try:
                    process_action(data)
                except Exception:
                    logger.exception("online content recall failed for record: %s", data)

        # Stream elements are (key, json_string); parse the JSON payload.
        SIMILAR_DS.map(lambda x: json.loads(x[1])).foreachRDD(get_similar_article)
        return None

    def _update_hot_recall(self):
        """Register hot-article scoring on the ``HOT_DS`` stream.

        Each click/collect/share increments the article's score by 1 in the
        Redis sorted set ``ch:<channelId>:hot``.
        """
        client = self.client

        def updateHotArt(rdd):
            """Increment hot scores for every qualifying action in one micro-batch.

            :param rdd: RDD of parsed log dicts for the current batch.
            """
            for data in rdd.collect():
                try:
                    if data['param']['action'] in _FEEDBACK_ACTIONS:
                        # NOTE(review): (name, amount, value) is the redis-py >= 3.0
                        # argument order for zincrby -- confirm the installed version.
                        client.zincrby('ch:{}:hot'.format(data['channelId']), 1, data['param']['articleId'])
                except Exception:
                    logger.exception("hot recall update failed for record: %s", data)

        HOT_DS.map(lambda x: json.loads(x[1])).foreachRDD(updateHotArt)


if __name__ == '__main__':
    # Initialise logging first.
    lg.create_logger()
    ol = OnlineRecall()
    # Content-based online recall is currently disabled; uncomment to enable it.
    # ol._online_content_recall()
    ol._update_hot_recall()
    # Start the streaming context so the handlers registered above begin consuming.
    stream_sc.start()
    # Keep the driver alive; press Ctrl+C to exit.
    _ONE_DAY_IN_SECONDS = 60 * 60 * 24
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        pass