
In Spark, accumulating history (running totals) mainly relies on window functions, while computing overall totals requires the rollup function.

1 Application scenarios:

  1. We need to compute each user's total usage time (a cumulative, running history).

  2. The front-end page needs to query along several dimensions, e.g. product, region, and so on.

  3. The table to display has headers such as: product, 2015-04, 2015-05, 2015-06.

2 Source data:

    product_code |event_date |duration |
    -------------|-----------|---------|
    1438         |2016-05-13 |165      |
    1438         |2016-05-14 |595      |
    1438         |2016-05-15 |105      |
    1629         |2016-05-13 |12340    |
    1629         |2016-05-14 |13850    |
    1629         |2016-05-15 |227      |
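The examples below query a temp table called userlogs_date and a DataFrame called df_userlogs_date without showing how they were built. Here is a minimal setup sketch, assuming a Spark 1.x shell (sc and sqlContext available, registerTempTable as used later) and assuming product_code is exposed under the shorter name pcode that the queries use:

import sqlContext.implicits._

case class UserLog(pcode: String, event_date: String, duration: Long)

val df_userlogs_date = sc.parallelize(Seq(
  UserLog("1438", "2016-05-13", 165L),
  UserLog("1438", "2016-05-14", 595L),
  UserLog("1438", "2016-05-15", 105L),
  UserLog("1629", "2016-05-13", 12340L),
  UserLog("1629", "2016-05-14", 13850L),
  UserLog("1629", "2016-05-15", 227L)
)).toDF()

// register as a temp table so the spark-sql examples can query it
df_userlogs_date.registerTempTable("userlogs_date")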

3 Implementing the business scenarios

3.1 Scenario 1: cumulative history

As the source data shows, we already have each user's usage time per day. When aggregating, we want the 14th to also include the 13th, the 15th to include the 14th and the 13th, and so on.

3.1.1 spark-sql implementation

// Spark SQL: use a window function to accumulate history
    sqlContext.sql(
    """
      select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration
      from userlogs_date
    """).show
    +-----+----------+------------+                                                 
    |pcode|event_date|sum_duration|
    +-----+----------+------------+
    | 1438|2016-05-13|         165|
    | 1438|2016-05-14|         760|
    | 1438|2016-05-15|         865|
    | 1629|2016-05-13|       12340|
    | 1629|2016-05-14|       26190|
    | 1629|2016-05-15|       26417|
    +-----+----------+------------+

3.1.2 DataFrame implementation

     

// Use the over function provided by Column, passing in the window spec
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._ // for sum
import sqlContext.implicits._           // for the $"col" syntax
    val first_2_now_window = Window.partitionBy("pcode").orderBy("event_date")
    df_userlogs_date.select(
        $"pcode",
        $"event_date",
        sum($"duration").over(first_2_now_window).as("sum_duration")
    ).show
    
    +-----+----------+------------+                                                 
    |pcode|event_date|sum_duration|
    +-----+----------+------------+
    | 1438|2016-05-13|         165|
    | 1438|2016-05-14|         760|
    | 1438|2016-05-15|         865|
    | 1629|2016-05-13|       12340|
    | 1629|2016-05-14|       26190|
    | 1629|2016-05-15|       26417|
    +-----+----------+------------+
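The same running total can also be added with withColumn instead of select; a small equivalent sketch, reusing the first_2_now_window spec defined above:

// equivalent: append the running total as a new column
df_userlogs_date
  .withColumn("sum_duration", sum($"duration").over(first_2_now_window))
  .show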

3.1.3 Extension: accumulating over a bounded range

Real-world accumulation logic is more complex than the above, for example accumulating the previous N days, or from N days before to M days after. Let's implement these:

3.1.3.1 Accumulate the entire history:

    select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration from userlogs_date
    select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and current row) as sum_duration from userlogs_date
    Window.partitionBy("pcode").orderBy("event_date").rowsBetween(Long.MinValue,0)
    Window.partitionBy("pcode").orderBy("event_date")
The four statements above are completely equivalent.
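As a side note (an assumption about newer releases, not used in the rest of this post): from Spark 2.1 on, the frame boundaries also have named constants on Window, which read better than Long.MinValue and 0:

// assumed Spark 2.1+ alternative to rowsBetween(Long.MinValue, 0)
Window.partitionBy("pcode").orderBy("event_date")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)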

3.1.3.2 Accumulate the previous N days, assuming N = 3
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 3 preceding and current row) as sum_duration from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,0)

3.1.3.3 Accumulate the previous N days and the following M days, assuming N = 3, M = 5

    select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 3 preceding and 5 following ) as sum_duration from userlogs_date
    Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,5)
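Putting the one-liners above into a complete, runnable form, here is a sketch for the previous-3 / following-5 case, assuming the df_userlogs_date DataFrame and sqlContext from the setup sketch in section 2:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
import sqlContext.implicits._

// frame: 3 rows before the current row through 5 rows after it, per pcode
val w_minus3_plus5 = Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3, 5)

df_userlogs_date.select(
  $"pcode",
  $"event_date",
  sum($"duration").over(w_minus3_plus5).as("sum_duration")
).show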
3.1.3.4 Accumulate all rows in the partition
    select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and unbounded following ) as sum_duration from userlogs_date
    Window.partitionBy("pcode").orderBy("event_date").rowsBetween(Long.MinValue,Long.MaxValue)

To summarize:

preceding: accumulates the previous N rows (within the partition). To start from the first row of the partition, use unbounded. N is the offset backwards from the current row.
following: the opposite of preceding; accumulates the following N rows (within the partition). To go all the way to the end of the partition, use unbounded. N is the offset forwards from the current row.
current row: as the name implies, the current row, i.e. offset 0.
Note: the preceding-N, following-M and current row boundaries are all inclusive, so the row at each boundary offset is included in the sum.

3.1.3.5 Measured results
Cumulative history (the current day and everything before it, within the partition), form 1: select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration from userlogs_date
    
    
    +-----+----------+------------+                                                 
    |pcode|event_date|sum_duration|
    +-----+----------+------------+
    | 1438|2016-05-13|         165|
    | 1438|2016-05-14|         760|
    | 1438|2016-05-15|         865|
    | 1629|2016-05-13|       12340|
    | 1629|2016-05-14|       26190|
    | 1629|2016-05-15|       26417|
    +-----+----------+------------+
Cumulative history (the current day and everything before it, within the partition), form 2: select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and current row) as sum_duration from userlogs_date
    +-----+----------+------------+                                                 
    |pcode|event_date|sum_duration|
    +-----+----------+------------+
    | 1438|2016-05-13|         165|
    | 1438|2016-05-14|         760|
    | 1438|2016-05-15|         865|
    | 1629|2016-05-13|       12340|
    | 1629|2016-05-14|       26190|
    | 1629|2016-05-15|       26417|
    +-----+----------+------------+
Accumulate today and yesterday: select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 1 preceding and current row) as sum_duration from userlogs_date
(e.g. for pcode 1438 on 2016-05-15 the frame covers 2016-05-14 and 2016-05-15, so the sum is 595 + 105 = 700)
    +-----+----------+------------+                                                 
    |pcode|event_date|sum_duration|
    +-----+----------+------------+
    | 1438|2016-05-13|         165|
    | 1438|2016-05-14|         760|
    | 1438|2016-05-15|         700|
    | 1629|2016-05-13|       12340|
    | 1629|2016-05-14|       26190|
    | 1629|2016-05-15|       14077|
    +-----+----------+------------+
Accumulate yesterday, today and tomorrow: select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 1 preceding and 1 following ) as sum_duration from userlogs_date
    +-----+----------+------------+                                                 
    |pcode|event_date|sum_duration|
    +-----+----------+------------+
    | 1438|2016-05-13|         760|
    | 1438|2016-05-14|         865|
    | 1438|2016-05-15|         700|
    | 1629|2016-05-13|       26190|
    | 1629|2016-05-14|       26417|
    | 1629|2016-05-15|       14077|
    +-----+----------+------------+
Accumulate everything in the partition (the current day plus everything before and after it): select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and unbounded following ) as sum_duration from userlogs_date
    +-----+----------+------------+                                                 
    |pcode|event_date|sum_duration|
    +-----+----------+------------+
    | 1438|2016-05-13|         865|
    | 1438|2016-05-14|         865|
    | 1438|2016-05-15|         865|
    | 1629|2016-05-13|       26417|
    | 1629|2016-05-14|       26417|
    | 1629|2016-05-15|       26417|
    +-----+----------+------------+
3.2 Scenario 2: overall totals

3.2.1 spark-sql implementation

// Spark SQL: use rollup to add the ALL (subtotal / grand-total) rows
    sqlContext.sql(
    """
      select pcode,event_date,sum(duration) as sum_duration
      from userlogs_date_1
      group by pcode,event_date with rollup
      order by pcode,event_date
    """).show()
    
    +-----+----------+------------+                                                 
    |pcode|event_date|sum_duration|
    +-----+----------+------------+
    | null|      null|       27282|
    | 1438|      null|         865|
    | 1438|2016-05-13|         165|
    | 1438|2016-05-14|         595|
    | 1438|2016-05-15|         105|
    | 1629|      null|       26417|
    | 1629|2016-05-13|       12340|
    | 1629|2016-05-14|       13850|
    | 1629|2016-05-15|         227|
    +-----+----------+------------+
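The NULLs produced by rollup mark the subtotal and grand-total rows. If that reads poorly on the front end, one option is to relabel them in the query; a hedged sketch, assuming the grouping columns never contain genuine NULLs of their own:

// label the rollup subtotal rows as 'ALL' instead of null
sqlContext.sql(
"""
  select coalesce(pcode, 'ALL') as pcode,
         coalesce(event_date, 'ALL') as event_date,
         sum(duration) as sum_duration
  from userlogs_date_1
  group by pcode, event_date with rollup
  order by pcode, event_date
""").show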

3.2.2 DataFrame implementation

// Use the rollup function provided by DataFrame for multi-dimensional ALL totals
df_userlogs_date.rollup($"pcode", $"event_date").agg(sum($"duration")).orderBy($"pcode", $"event_date").show
    
    +-----+----------+-------------+                                                
    |pcode|event_date|sum(duration)|
    +-----+----------+-------------+
    | null|      null|        27282|
    | 1438|      null|          865|
    | 1438|2016-05-13|          165|
    | 1438|2016-05-14|          595|
    | 1438|2016-05-15|          105|
    | 1629|      null|        26417|
    | 1629|2016-05-13|        12340|
    | 1629|2016-05-14|        13850|
    | 1629|2016-05-15|          227|
    +-----+----------+-------------+

3.3 Rows to columns -> pivot

     

     

pivot has no SQL syntax yet, so use the DataFrame API for now
val userlogs_date_all = sqlContext.sql("select dognum as dcode, pcode, event_date, sum(duration) as duration from userlogs group by dognum, pcode, event_date")
    userlogs_date_all.registerTempTable("userlogs_date_all")
    val dates = userlogs_date_all.select($"event_date").map(row => row.getAs[String]("event_date")).distinct().collect().toList
    userlogs_date_all.groupBy($"dcode", $"pcode").pivot("event_date", dates).sum("duration").na.fill(0).show
    
    +-----------------+-----+----------+----------+----------+----------+
    |            dcode|pcode|2016-05-26|2016-05-13|2016-05-14|2016-05-15|
    +-----------------+-----+----------+----------+----------+----------+
    |         F2429186| 1438|         0|         0|       227|         0|
    |        AI2342441| 1438|         0|         0|         0|       345|
    |       A320018711| 1438|         0|       939|         0|         0|
    |         H2635817| 1438|         0|       522|         0|         0|
    |         D0288196| 1438|         0|       101|         0|         0|
    |         Y0242218| 1438|         0|      1036|         0|         0|
    |         H2392574| 1438|         0|         0|       689|         0|
    |         D2245588| 1438|         0|         0|         1|         0|
    |         Y2514906| 1438|         0|         0|       118|         4|
    |         H2540419| 1438|         0|       465|       242|         5|
    |         R2231926| 1438|         0|         0|       305|         0|
    |         H2684591| 1438|         0|       136|         0|         0|
    |         A2548470| 1438|         0|       412|         0|         0|
    |         GH000309| 1438|         0|         0|         0|         4|
    |         H2293216| 1438|         0|         0|         0|       534|
    |         R2170601| 1438|         0|         0|         0|         0|
    |B2365238;B2559538| 1438|         0|         0|         0|         0|
    |         BQ005465| 1438|         0|         0|       642|        78|
    |        AH2180324| 1438|         0|       608|       146|        36|
    |         H0279306| 1438|         0|       490|         0|         0|
    +-----------------+-----+----------+----------+----------+----------+
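A note on the explicit dates list: if the values are omitted, Spark (1.6+) first runs an extra pass to compute the distinct values of the pivot column itself; passing the list, as above, avoids that and also fixes the column order. A minimal sketch of the shorter form:

// shorter form: let Spark infer the pivot values (costs an extra pass over the data)
userlogs_date_all
  .groupBy($"dcode", $"pcode")
  .pivot("event_date")
  .sum("duration")
  .na.fill(0)
  .show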

     

     

Appendix

Below is the official API documentation for the two functions:

org.apache.spark.sql.DataFrame.scala
    def rollup(col1: String, cols: String*): GroupedData
    Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.
    This is a variant of rollup that can only group by existing columns using column names (i.e. cannot construct expressions).
    
    // Compute the average for all numeric columns rolluped by department and group.
    df.rollup("department", "group").avg()
    
    // Compute the max age and average salary, rolluped by department and gender.
    df.rollup($"department", $"gender").agg(Map(
      "salary" -> "avg",
      "age" -> "max"
    ))
    def rollup(cols: Column*): GroupedData
    Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.
    df.rollup($"department", $"group").avg()
    
    // Compute the max age and average salary, rolluped by department and gender.
    df.rollup($"department", $"gender").agg(Map(
      "salary" -> "avg",
      "age" -> "max"
    ))
    org.apache.spark.sql.Column.scala
    def over(window: WindowSpec): Column
    Define a windowing column.
    
    val w = Window.partitionBy("name").orderBy("id")
    df.select(
      sum("price").over(w.rangeBetween(Long.MinValue, 2)),
      avg("price").over(w.rowsBetween(0, 4))
    )