본문 바로가기
Data_Analysis/Spark, Zeppelin

[Spark] 집계 연산

by Classic! 2021. 3. 7.

[집계 함수]

스파크는 모든 데이터 타입을 다루는 것 외에도 그룹화 데이터 타입 생성도 가능합니다. 그룹화된 결과는 지정된 집계 함수에 따라 Relational Grouped Dataset을 반환합니다.

1. count / countDistinct / approx_count_distinct

2. first / last / min / max

3. sum / sumDistinct

4. 평균 / 분산 / 표준편차 / 왜도 / 첨도 / 공분산 / 상관관계

5. 복합 데이터 타입 (collect_list, collect_set)


먼저 예시로 사용할 데이터 셋을 불러옵니다.

// 데이터 셋팅
val retail_all=spark.read.format("csv")
                .option("header","true")
                .option("inferSchema","true")
                .load("C:/my_spark/retail-data/all/*.csv")

retail_all.createOrReplaceTempView("retail_all")
retail_all.show()

 

 

 

1. count / countDistinct / approx_count_distinct

1) count : 전체 레코드 수

- count(*) : null값을 가진 로우를 포함하여 카운트.
- count("칼럼명") : null 포함한 로우는 세지 않음. 

// 대부분의 집계 함수는 import org.apache.spark.sql.functions 패키지에서 제공됩니다.
import org.apache.spark.sql.functions.{count,countDistinct,approx_count_distinct}

retail_all.select(count("StockCode")).show(4)
retail_all.select(count("*")).show(4)  

2) countDistinct : 고유 레코드 수

retail_all.select(
    countDistinct("StockCode")
    ).show(4)
// 결과 = 4070

 

 3) approx_count_distinct : 어느 정도 정확도를 가지는 근사치로 산출
- 최대 추정 오류율을 설정하여 기대치에서 크게 벗어나는 결과를 산출

- countDistinct보다 더 빠르게 결과 반환

retail_all.select(approx_count_distinct("StockCode",0.1)).show()
// 결과 =  3364

 

 

 

2. first / last / min / max

1) first, last : 첫 번째/마지막 값을 얻을 때 사용.

- DataFrame이 아니라 로우 기반으로 동작

import org.apache.spark.sql.functions.{first,last}

retail_all.select(first("StockCode"),last("StockCode")).show()

 

2) max, min : 최댓값/최솟값 산출

import org.apache.spark.sql.functions.{min,max}

retail_all.select(min("UnitPrice"),max("UnitPrice")).show()

 

 

 

3. sum / sumDistinct

import org.apache.spark.sql.functions.{sum,sumDistinct}

// 1) sum : 컬럼 내 모든 값 합산
retail_all.select(sum("Quantity")).show()
// 2) sumDistinct : 고유값 합산
retail_all.select(sumDistinct("Quantity")).show()

 

 

4. 평균 / 분산 / 표준편차 / 왜도 / 첨도 / 공분산 / 상관관계

1) 평균

import org.apache.spark.sql.functions.{avg,round}

retail_all.select(
    round(count("Quantity").alias("Total_transaction"),3),
    round(sum(col("Quantity")*col("UnitPrice")).alias("Total_Return"),3),
    round(avg("UnitPrice").alias("Average_UnitPrice"),3)
    ).show()

 

2) 분산과 표준 편차
- 스파크는 표본 표준편차와 모 표준편차 방식 2개 모두 지원.

import org.apache.spark.sql.functions.{variance,stddev,stddev_samp,var_samp,var_pop,stddev_pop}

// - 표본분산/표준편차 : variance/stddev 또는 stddev_samp/var_samp
retail_all.select(variance("Quantity"),stddev("Quantity")).show()
retail_all.select(var_samp("Quantity"),stddev_samp("Quantity")).show()

// - 모분산/표준편차 : var_pop/stddev_pop
retail_all.select(var_pop("Quantity"),stddev_pop("Quantity")).show()                

 

3) 왜도와 첨도

- 왜도 : 분포의 치우침의 정도. 대체로 |3|미만일 때, 정규 분포라고 판단.

- 첨도 : 평균으로 집중된 정도 = 분포의 뾰족한 정도. 대체로 |8| 미만일 때 정규분포라고 판단.

** 왜도 |2| 미만, 첨도 |4| 미만으로 판단하는 경우도 있음

import org.apache.spark.sql.functions.{skewness,kurtosis}
retail_all.select(
    skewness("Quantity"),kurtosis("Quantity")
    ).show()

 

4) 공분산과 상관관계

- 공분산 : 2개의 확률변수의 선형 관계를 나타내는 값.

- 상관관계 : 두 변수 간의 관계(변수 독립이거나 서로 상관성을 가질 수 있음).

import org.apache.spark.sql.functions.{corr,covar_pop,covar_samp}
retail_all.select(
    corr("UnitPrice","Quantity")
    ,covar_pop("UnitPrice","Quantity")
    ,covar_samp("UnitPrice","Quantity")).show()

 

 

5. 복합 데이터 타입 (collect_list, collect_set)

1) collect_set :  셋 데이터 타입
2) collect_list : 리스트 데이터 타입

아래 결과 이미지는 [United Kingdom, ... 결과가 잘려서 나오는데

SQL로 collect_list, collect_set으로 나타내면 보기 좋게 결과를 출력할 수 있습니다.

import org.apache.spark.sql.functions.{collect_list,collect_set}
retail_all.agg(collect_list("Country"),collect_set("Country")).show()

- SQL문 

%sql
select collect_set(country) from retail_all

'Data_Analysis > Spark, Zeppelin' 카테고리의 다른 글

[Spark] Window 함수  (0) 2021.03.11
[Spark] 그룹화  (0) 2021.03.09
[Spark] 구조적 API 기본 연산2 - 로우  (0) 2020.12.26
[Spark] 구조적 API 기본 연산1 - 컬럼  (0) 2020.12.26
[Spark] 구조적 API 개요  (0) 2020.12.23

댓글