[window 함수]
- 특정 윈도우를 대상으로 고유의 집계 연산을 수행합니다.
- 윈도우는 현재 데이터에 대한 참조를 사용해 정의하고,윈도우 명세는 함수에 전달될 로우를 결정합니다.
- groupBy 함수를 사용하면 모든 로우 레코드가 단일 그룹으로만 이동하는 반면,
윈도우 함수는 프레임(로우 그룹 기반의 테이블)에 입력되는 모든 로우에 대해 결괏값을 계산합니다.
코드 예제를 통해 window 함수를 살펴보겠습니다.
- 데이터 셋팅
// 데이터 셋팅
val retail=spark.read.format("csv")
.option("header","true")
.option("inferSchema","true")
.load("C:/data/retail-data/by-day/2011-01-30.csv")
retail.cache()
retail.createOrReplaceTempView("retail")
retail.show(4)
위 데이터에 window 함수를 적용하겠습니다.
[window 함수 사용 절차]
1) window 함수 명세 작성
-partitionBy로 그룹을 어떻게 나눌지 결정.
- orderBy로 파티션 정렬 방식 정의.
- 프레임 명세는 입력된 로우의 참조를 기반으로 프레임에 로우가 포함될 수 있는지 결정.
- 1행부터 현재 행까지 확인.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
val windowSpec=Window
.partitionBy("CustomerId","Date")
.orderBy(col("Quantity").desc)
.rowsBetween(Window.unboundedPreceding,Window.currentRow)
2. window 함수 사용을 위한 정보 전달
- 집계 함수에 컬럼명이나 표현식을 전달
- 함수를 적용할 데이터 프레임이 정의된 윈도우 명세도 함께 사용
import org.apache.spark.sql.functions.{dense_rank,rank}
// 수량 컬럼의 최대값 구하기
val maxPurchaseQuantity=max(col("Quantity")).over(windowSpec)
// 순위를 구할 함수
val purchaseDenseRank=dense_rank().over(windowSpec)
[window 함수 종류]
1) ranking
- 위의 예제를 select구문에서 사용할 수 있는 컬럼을 반환.
- slelect메서드를 사용하여 계산된 윈도우값을 확인.
retailWithDate.where("customerId IS NOT NULL").orderBy("customerId")
.select(
col("CustomerId"),
col("Date"),
col("Quantity"),
purchaseDenseRank.alias("quantityRank"),
maxPurchaseQuantity.alias("maxPurchaseQuantity")
).show(5)
2) Roll-up
- 아래 코드에서 롤업의 결과로 생성된 DataFrame은 모든 날짜의 총합, 날짜별 총합, 날짜별&국가별 총합을 포함합니다.
- 전체 날짜의 합계는 롤업 된 2개의 컬럼 값이 모두 null 값인 로우에서 확인할 수 있습니다.
// 롤업
val retailNotNull=retail.where("CustomerId IS NOT NULL").orderBy("CustomerId")
retailNotNull.createOrReplaceTempView("retailNotNull")
// InvoiceDate와 country 컬럼을 축으로 하는 롤업 생성
val rollUpRetail=retailNotNull.rollup("InvoiceDate","Country").agg(sum("Quantity"))
.selectExpr("InvoiceDate","Country","`sum(Quantity)` as total_quantity")
.orderBy("InvoiceDate")
rollUpRetail.show(5)
3) Cube
- 큐브는 모든 차원에 대해 동일한 작업을 수행합니다.
- 아래 코드에서는 전체 기간에 대해 다음과 같은 결과를 얻을 수 있습니다.
(1) 전체 날짜와 모든 국가에 대한 합계
(2) 모든 국가의 날짜별 합계
(3) 날짜별 국가별 합계
(4) 전체 날짜의 국가별 합계
// 전체 기간에 대해 날짜와 국가별 결과를 산출
retailNotNull.cube("InvoiceDate","Country")
.agg(sum(col("Quantity")))
.select("InvoiceDate","Country","sum(Quantity)").orderBy("Country")
.show(10)
※ 그룹화 메타데이터
- 큐브와 롤업을 사용하다 보면 집계 수준에 따라 쉽게 필터링하기 위해 집계 수준 조회가 필요한 경우가 있습니다.
이때, grouping_id를 사용하여 결과 데이터셋의 집계 수준을 명시하는 컬럼을 제공합니다.
- grouping_id 값의 의미
3 : 가장 높은 계층의 집계 결과에서 나타남. customerId나 stockCode에 관계없이 총수량을 제공
2 : 개별 재고 코드의 모든 집계 결과에서 나타난다. customer Id에 관계없이 재고 코드별 총 수량을 제공
1 : 구매한 물품에 관계없이 customerId를 기반으로 총 수량을 제공
0 : customerId와 stockCode별 조합에 따라 총 수량을 제공
import org.apache.spark.sql.functions.{grouping_id,sum,expr}
retailNotNull.cube("customerId","stockCode")
.agg(grouping_id(),sum("Quantity"))
.orderBy(col("grouping_id()").desc)
.show(5)
4) Pivot
- 피벗을 사용하면 로우를 컬럼으로 바꿀 수 있습니다.
- 피벗으로 컬럼별로 집계 함수를 적용할 수 있으며 쿼리를 이용하여 쉽게 결과를 확인할 수 있습니다.
val pivoted=retailNotNull.groupBy("stockCode").pivot("Country").sum()
pivoted.select("stockCode","`United Kingdom_sum(Quantity)`").show(10)
'Data_Analysis > Spark, Zeppelin' 카테고리의 다른 글
[Spark] 그룹화 (0) | 2021.03.09 |
---|---|
[Spark] 집계 연산 (0) | 2021.03.07 |
[Spark] 구조적 API 기본 연산2 - 로우 (0) | 2020.12.26 |
[Spark] 구조적 API 기본 연산1 - 컬럼 (0) | 2020.12.26 |
[Spark] 구조적 API 개요 (0) | 2020.12.23 |
댓글