from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from setting.default import DefaultConfig

# happybase connection pool
import happybase

# used to read cached results from HBase
pool = happybase.ConnectionPool(size=10, host='hadoop-master', port=9090)

# - 1. Create the Spark Streaming configuration
conf = SparkConf()
conf.setAll(DefaultConfig.SPARK_ONLINE_CONFIG)
sc = SparkContext(conf=conf)
stream_sc = StreamingContext(sc, 5)  # 5-second batch interval

# - 2. Kafka configuration; initialize KafkaUtils.createDirectStream
kafka_param = {"metadata.broker.list": DefaultConfig.KAFKA_SEVER, "group.id": "similar"}
SIMILAR_DS = KafkaUtils.createDirectStream(stream_sc, ['click-trace'], kafka_param)

# Kafka configuration for reading the hot-article stream and the new-article stream
click_kafkaParams = {"metadata.broker.list": DefaultConfig.KAFKA_SEVER}
HOT_DS = KafkaUtils.createDirectStream(stream_sc, ['click-trace'], click_kafkaParams)
NEW_ARTICLE_DS = KafkaUtils.createDirectStream(stream_sc, ['new-article'], click_kafkaParams)
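
The code above only builds the StreamingContext and the three input DStreams; nothing is consumed until a handler is attached and stream_sc.start() is called. Below is a minimal sketch of how SIMILAR_DS could be wired up. The handler name process_similar, the assumed JSON layout of the click-trace messages, and the HBase table name in the comment are illustrative assumptions, not part of the original code.

import json

def process_similar(rdd):
    # Hypothetical handler: each Kafka record arrives as a (key, value) pair,
    # and the value is assumed here to be a JSON-encoded click log.
    for _, message in rdd.collect():
        log = json.loads(message)
        # The HBase lookup is only indicated; the table and row layout are assumptions:
        # with pool.connection() as conn:
        #     similar = conn.table('article_similar').row(str(log.get('articleId')).encode())
        print(log)

SIMILAR_DS.foreachRDD(process_similar)

# Start the streaming job; Kafka messages are only pulled after start() is called.
stream_sc.start()
stream_sc.awaitTermination()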