
Revealed: How to Accurately Count Users with Three or More Consecutive Login Days
[TOC]
### 1. Requirement
> Count the users who have logged in on three or more consecutive days.
+ The same pattern shows up in many similar problems: consecutive months of paying for a membership, consecutive days with at least one sale, consecutive days of taking a Didi ride, consecutive overdue periods, and so on.
Sample of the expected result:
| uid | times | start_date | end_date |
| ------ | ----- | ---------- | ---------- |
| guid01 | 4 | 2018-03-04 | 2018-03-07 |
| guid02 | 3 | 2018-03-01 | 2018-03-03 |
### 2. Data Preparation
+ `v_user_login.csv`
```
uid,datatime
guid01,2018-02-28
guid01,2018-03-01
guid01,2018-03-02
guid01,2018-03-04
guid01,2018-03-05
guid01,2018-03-06
guid01,2018-03-07
guid02,2018-03-01
guid02,2018-03-02
guid02,2018-03-03
guid02,2018-03-06
```
Field descriptions
+ uid: ID of the user who logged in
+ datatime: login date
### 3. SQL Implementation
#### 3.1 Steps
1. Query the login records and, within each user, number the logins in ascending order of login date.
2. Identify the consecutive login ranges.
- Question: how can we tell whether logins fall on consecutive days?
- Pattern: subtract the running login number from the login date; rows that yield the same result belong to one run of consecutive days (see the worked example below).
3. Group by uid and dif, and count the rows in each group, keeping groups with 3 or more.
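Taking guid01's logins from the sample data as an example, the intermediate values produced by steps 1 and 2 look like this:

| uid | datatime | rn | dif = DATE_SUB(datatime, rn) |
| ------ | ---------- | --- | ---------- |
| guid01 | 2018-02-28 | 1 | 2018-02-27 |
| guid01 | 2018-03-01 | 2 | 2018-02-27 |
| guid01 | 2018-03-02 | 3 | 2018-02-27 |
| guid01 | 2018-03-04 | 4 | 2018-02-28 |
| guid01 | 2018-03-05 | 5 | 2018-02-28 |
| guid01 | 2018-03-06 | 6 | 2018-02-28 |
| guid01 | 2018-03-07 | 7 | 2018-02-28 |

Every run of consecutive days collapses to a single dif value (here 2018-02-27 and 2018-02-28), so grouping by uid and dif in step 3 isolates each run together with its length and endpoints.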
#### 3.2 Code
```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object UserContinueLoginSQL {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("UserContinueLoginSQL").master("local[*]").getOrCreate()
    import spark.implicits._

    val df: DataFrame = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("data_sql/v_user_login.csv")
    df.createOrReplaceTempView("v_access_log")

    // 1. Query the login records and number each user's logins in ascending order of login date.
    //    ROW_NUMBER() assigns consecutive numbers, starting from 1, to the rows returned by the SELECT.
    spark.sql(
      """
        | select
        | uid,
        | datatime,
        | ROW_NUMBER() OVER(PARTITION BY uid ORDER BY datatime ASC) rn
        | from v_access_log
      """.stripMargin).createOrReplaceTempView("t1")

    // 2. Identify the consecutive login ranges.
    //    Question: how can we tell whether logins fall on consecutive days?
    //    Pattern: subtract the running number rn from the login date; equal results mean consecutive days.
    //    DATE_SUB(date, n) subtracts n days from a date.
    spark.sql(
      """
        | select
        | uid,
        | datatime,
        | DATE_SUB(datatime,rn) dif
        | from t1
      """.stripMargin).createOrReplaceTempView("t2")

    // 3. Group by uid and dif, and count the rows in each group.
    spark.sql(
      """
        | select
        | uid,
        | MIN(datatime) start_date,
        | MAX(datatime) end_date,
        | count(1) counts
        | from t2
        | group by uid,dif HAVING counts >= 3
      """.stripMargin).show()
  }
}
```
Output
```
+------+----------+----------+------+
|   uid|start_date|  end_date|counts|
+------+----------+----------+------+
|guid02|2018-03-01|2018-03-03|     3|
|guid01|2018-02-28|2018-03-02|     3|
|guid01|2018-03-04|2018-03-07|     4|
+------+----------+----------+------+
```
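The three temporary views are kept separate above for clarity. For reference, they can also be collapsed into a single nested query; the snippet below is only a sketch of that consolidation (it assumes the same SparkSession and the `v_access_log` view registered above) and returns the same three rows.

```scala
// Sketch only: the same logic as t1/t2 and the final aggregation, expressed as one nested query.
spark.sql(
  """
    | SELECT uid,
    |        MIN(datatime) AS start_date,
    |        MAX(datatime) AS end_date,
    |        COUNT(1)      AS counts
    | FROM (
    |   SELECT uid, datatime, DATE_SUB(datatime, rn) AS dif
    |   FROM (
    |     SELECT uid, datatime,
    |            ROW_NUMBER() OVER (PARTITION BY uid ORDER BY datatime) AS rn
    |     FROM v_access_log
    |   ) t1
    | ) t2
    | GROUP BY uid, dif
    | HAVING COUNT(1) >= 3
  """.stripMargin).show()
```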
### 4. RDD Implementation
```scala
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object UserContinueLoginRDD {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UserContinueLoginRDD").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // Read the data
    val rdd: RDD[String] = sc.textFile("data_sql/v_user_login.csv")
    // Drop the header line
    val header = rdd.first()
    val rdd1 = rdd.filter(row => row != header)
    // Parse each line into (uid, date)
    val uidAndDate: RDD[(String, String)] = rdd1.map(x => {
      val fis = x.split(",")
      val uid = fis(0)
      val date = fis(1)
      (uid, date)
    })
    // Group by uid so that all login records of one user end up in the same group
    val grouped: RDD[(String, Iterable[String])] = uidAndDate.groupByKey()
    // Sort within each group and compute the "date minus index" key
    val uidAndDateDif = grouped.flatMapValues(it => {
      // Converting the iterator with toSet/toList may cause an OOM for very large groups
      val sorted = it.toSet.toList.sorted
      // Date utilities
      val calendar = Calendar.getInstance()
      val sdf = new SimpleDateFormat("yyyy-MM-dd")
      var index = 0
      sorted.map(desStr => {
        val date: Date = sdf.parse(desStr)
        calendar.setTime(date)
        calendar.add(Calendar.DATE, -index)
        index += 1
        (desStr, sdf.format(calendar.getTime))
      })
    })
    val result = uidAndDateDif.map(x => {
      // ((uid, dif), date)
      ((x._1, x._2._2), x._2._1)
    }).groupByKey().mapValues(it => {
      // Sort so that head/last really are the earliest and latest dates of the run
      val list = it.toList.sorted
      val times = list.size
      val startTime = list.head
      val endTime = list.last
      (times, startTime, endTime)
    }).map(t => {
      // (uid, times, start_date, end_date)
      (t._1._1, t._2._1, t._2._2, t._2._3)
    }).filter(x => {
      x._2 >= 3
    })
    val buffer = result.collect().toBuffer
    println(buffer)
  }
}
```
Output
```
ArrayBuffer((guid02,3,2018-03-01,2018-03-03), (guid01,4,2018-03-04,2018-03-07), (guid01,3,2018-02-28,2018-03-02))
```
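As the comment in the code points out, materializing a whole group with toList can run out of memory when a single user has a very large number of login records. The second groupByKey has the same concern: it builds a list only to read its size and its first and last elements. One possible variant, sketched below under the assumption that `uidAndDateDif` from the program above is in scope, aggregates with reduceByKey instead and relies on yyyy-MM-dd dates comparing correctly as strings.

```scala
// Sketch only: replace the second groupByKey with reduceByKey so that each
// (uid, dif) group is folded incrementally instead of being materialized as a list.
// Assumes uidAndDateDif: RDD[(String, (String, String))] = (uid, (date, dif)) from above.
val resultNoGroup = uidAndDateDif
  .map { case (uid, (date, dif)) => ((uid, dif), (1, date, date)) } // (count, minDate, maxDate)
  .reduceByKey { case ((c1, min1, max1), (c2, min2, max2)) =>
    (c1 + c2,
      if (min1 < min2) min1 else min2, // yyyy-MM-dd strings sort chronologically
      if (max1 > max2) max1 else max2)
  }
  .filter { case (_, (times, _, _)) => times >= 3 }
  .map { case ((uid, _), (times, start, end)) => (uid, times, start, end) }

println(resultNoGroup.collect().toBuffer)
```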