## 5、SPARK RDD开发实战

需求：通过电商后台产生的订单等信息，为用户构建简单用户画像

![s8](pics/s8.png)

- 提取性别、年龄、星座等信息
- 用户身份证号码的编码规则
  - 第7至第14位是出生日期码，表示出生的年、月、日，年、月、日分别用4位、2位（不足两位加0）、2（同上）位数字表示。
  - 第15至17位是顺序码：表示在同一地址码表示的范围内，对同年、同月、同日出生的人遍定的顺序号，顺序码的奇数分配给男性，偶数分配给女性。

代码：

```python
import sys
from pyspark.sql import SparkSession
import datetime

#将生日转换为星座
def birth_to_zodiac(birth):
    month = birth.month
    day = birth.day
    s = u"魔羯水瓶双鱼牡羊金牛双子巨蟹狮子处女天秤天蝎射手魔羯"
    # s = "AABBCCDDEEFFGGHHIIGGKKLLAA"
    # s = u"010203040506070809101112"
    arr = [20, 19, 21, 21, 21, 22, 23, 23, 23, 23, 22, 22]
    idx = month * 2 - (2 if day < arr[month - 1] else 0)
    return s[idx:idx + 2]

#通过身份证和订单日期计算用户的年龄和性别，星座
def user_profile(idcard, order_time):
    try:
        #xxxxxx19800101111xxxx
        birth = datetime.datetime.strptime(str(idcard)[6:14], '%Y%m%d')
        sex = u'男' if int(str(idcard)[14:17]) % 2 == 1 else u'女'
        order_time = datetime.datetime.strptime(order_time, '%Y-%m-%d')
        age = order_time.year - birth.year - ((order_time.month, order_time.day) < (birth.month, birth.day))
        zodiac = birth_to_zodiac(birth)
        return sex, age, zodiac
    except:
        return idcard, 0, order_time

#将数据中的内容进行拆分，取出身份证和订单日期
def idcardSplit(x):
    segs = x.split(',')
    idcard, order_date = segs[0], segs[1]
    #将身份证中的有效信息取出，进行脱敏
    idcard = 'xxxxxx' + idcard[6:17] + 'xxxx'
    return idcard, order_date


if __name__ == '__main__':

   	#data_path:数据文件路径
    #save_path：结果的存储路径
    help_str = 'Usage : <data_path> <save_path>'
    if len(sys.argv) != 3:
        print(help_str)
        exit(0)

    spark = SparkSession.builder.appName('user_profile').getOrCreate()
    sc = spark.sparkContext
    userRDD = sc.textFile(sys.argv[1])

    # 将数据进行脱敏
    userMaskRDD = userRDD.map(lambda x: idcardSplit(x)).cache()
    userProfileRDD = userMaskRDD.filter(lambda x: len(x[0]) > 18).map(lambda x: user_profile(x[0], x[1]))

    # 脏数据
    dirty_data = userProfileRDD.filter(lambda x: x[1] == 0)
    dirty_data.take(0)

    cleanRDD = userProfileRDD.filter(lambda x: x[1] > 0)
    cleanRDD.saveAsTextFile(sys.argv[2])

    sc.stop()
```

