본문 바로가기
Data_Analysis/Spark, Zeppelin

[Spark] Window 함수

by Classic! 2021. 3. 11.

[window 함수]

- 특정 윈도우를 대상으로 고유의 집계 연산을 수행합니다.

- 윈도우는 현재 데이터에 대한 참조를 사용해 정의하고,윈도우 명세는 함수에 전달될 로우를 결정합니다.

- groupBy 함수를 사용하면 모든 로우 레코드가 단일 그룹으로만 이동하는 반면,

윈도우 함수는 프레임(로우 그룹 기반의 테이블)에 입력되는 모든 로우에 대해 결괏값을 계산합니다.

 

코드 예제를 통해 window 함수를 살펴보겠습니다. 


- 데이터 셋팅

// 데이터 셋팅
val retail=spark.read.format("csv")
                .option("header","true")
                .option("inferSchema","true")
                .load("C:/data/retail-data/by-day/2011-01-30.csv")
retail.cache()
retail.createOrReplaceTempView("retail")
retail.show(4)

위 데이터에 window 함수를 적용하겠습니다.

 

 

 

[window 함수 사용 절차]

 

1) window 함수 명세 작성

-partitionBy로 그룹을 어떻게 나눌지 결정.

- orderBy로 파티션 정렬 방식 정의.
- 프레임 명세는 입력된 로우의 참조를 기반으로 프레임에 로우가 포함될 수 있는지 결정.

- 1행부터 현재 행까지 확인.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

val windowSpec=Window
                .partitionBy("CustomerId","Date")
                .orderBy(col("Quantity").desc)
                .rowsBetween(Window.unboundedPreceding,Window.currentRow)

 

2. window 함수 사용을 위한 정보 전달

- 집계 함수에 컬럼명이나 표현식을 전달
- 함수를 적용할 데이터 프레임이 정의된 윈도우 명세도 함께 사용

import org.apache.spark.sql.functions.{dense_rank,rank}

// 수량 컬럼의 최대값 구하기
val maxPurchaseQuantity=max(col("Quantity")).over(windowSpec)

// 순위를 구할 함수
val purchaseDenseRank=dense_rank().over(windowSpec)

 

 

 

[window 함수 종류]


1) ranking

- 위의 예제를 select구문에서 사용할 수 있는 컬럼을 반환. 

- slelect메서드를 사용하여 계산된 윈도우값을 확인.

retailWithDate.where("customerId IS NOT NULL").orderBy("customerId")
            .select(
                col("CustomerId"),
                col("Date"),
                col("Quantity"),
                purchaseDenseRank.alias("quantityRank"),
                maxPurchaseQuantity.alias("maxPurchaseQuantity")
                ).show(5)

 

2) Roll-up

- 아래 코드에서 롤업의 결과로 생성된 DataFrame은 모든 날짜의 총합, 날짜별 총합, 날짜별&국가별 총합을 포함합니다.

- 전체 날짜의 합계는 롤업 된 2개의 컬럼 값이 모두 null 값인 로우에서 확인할 수 있습니다.

// 롤업
val retailNotNull=retail.where("CustomerId IS NOT NULL").orderBy("CustomerId")
retailNotNull.createOrReplaceTempView("retailNotNull")

// InvoiceDate와 country 컬럼을 축으로 하는 롤업 생성
val rollUpRetail=retailNotNull.rollup("InvoiceDate","Country").agg(sum("Quantity"))
                             .selectExpr("InvoiceDate","Country","`sum(Quantity)` as total_quantity")
                             .orderBy("InvoiceDate")
                            
rollUpRetail.show(5)

 

 

3) Cube

- 큐브는 모든 차원에 대해 동일한 작업을 수행합니다.

- 아래 코드에서는 전체 기간에 대해 다음과 같은 결과를 얻을 수 있습니다.

    (1) 전체 날짜와 모든 국가에 대한 합계

    (2) 모든 국가의 날짜별 합계

    (3) 날짜별 국가별 합계

    (4) 전체 날짜의 국가별 합계

// 전체 기간에 대해 날짜와 국가별 결과를 산출
retailNotNull.cube("InvoiceDate","Country")
             .agg(sum(col("Quantity")))
             .select("InvoiceDate","Country","sum(Quantity)").orderBy("Country")
             .show(10)

 

 

 

※ 그룹화 메타데이터

- 큐브와 롤업을 사용하다 보면 집계 수준에 따라 쉽게 필터링하기 위해 집계 수준 조회가 필요한 경우가 있습니다.

이때, grouping_id를 사용하여 결과 데이터셋의 집계 수준을 명시하는 컬럼을 제공합니다.

- grouping_id 값의 의미
  3 : 가장 높은 계층의 집계 결과에서 나타남. customerId나 stockCode에 관계없이 총수량을 제공
  2 : 개별 재고 코드의 모든 집계 결과에서 나타난다. customer Id에 관계없이 재고 코드별 총 수량을 제공
  1 : 구매한 물품에 관계없이 customerId를 기반으로 총 수량을 제공
  0 : customerId와 stockCode별 조합에 따라 총 수량을 제공

import org.apache.spark.sql.functions.{grouping_id,sum,expr}

retailNotNull.cube("customerId","stockCode")
             .agg(grouping_id(),sum("Quantity"))
             .orderBy(col("grouping_id()").desc)
             .show(5)

 

 

4) Pivot

- 피벗을 사용하면 로우를 컬럼으로 바꿀 수 있습니다.

- 피벗으로 컬럼별로 집계 함수를 적용할 수 있으며 쿼리를 이용하여 쉽게 결과를 확인할 수 있습니다.

val pivoted=retailNotNull.groupBy("stockCode").pivot("Country").sum()
pivoted.select("stockCode","`United Kingdom_sum(Quantity)`").show(10)

 

 

 

'Data_Analysis > Spark, Zeppelin' 카테고리의 다른 글

[Spark] 그룹화  (0) 2021.03.09
[Spark] 집계 연산  (0) 2021.03.07
[Spark] 구조적 API 기본 연산2 - 로우  (0) 2020.12.26
[Spark] 구조적 API 기본 연산1 - 컬럼  (0) 2020.12.26
[Spark] 구조적 API 개요  (0) 2020.12.23

댓글