본문 바로가기
Data_Analysis/Spark, Zeppelin

[Spark] 구조적 API 기본 연산2 - 로우

by Classic! 2020. 12. 26.

[Row Handling]

1) 로우 생성

- 로우의 데이터에 접근할 때는 원하는 위치와 데이터 타입을 지정합니다. 자바나 스칼라를 사용할 때는 명시적으로 데이터 타입을 정의하지만, Python이나 R같은 동적 코드에서는 타입이 자동으로 변환됩니다.

import org.apache.spark.sql.Row

val myRow=Row("Hello",null,1,false)
myRow

 

2) 로우 필터링

- filter 또는 where 메서드로 로우 필터링

- filter와 where는 똑같은 기능을 하며,
- 메서드를 연달아 쓸 경우 메서드의 조건이 AND로 조건이 적용된다.
- 다만, 스파크는 자동으로 필터의 순서와 상관없이 동시에 모든 필터링 작업을 수행

// 2) 로우 필터링 : filter 또는 where 메서드로 로우 필터링

flight15.where(col("count")<2)
		.filter(col("ORIGIN_COUNTRY_NAME")=!="Croatia")
        .show(3)
/*
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
|            Malta|      United States|    1|
+-----------------+-------------------+-----+
only showing top 3 rows
*/        

 

3) 고유한 로우 얻기

- distinct 메서드로 로우의 고유한 값 추출

// 3) 고유한 로우 얻기(distinct) + 고유한 로우 수 세기(count)
flight15.select("ORIGIN_COUNTRY_NAME").distinct().count()

// res35: Long = 125

 

4) 로우 합치기 + 추가하기

- DataFrame을 변경할 수 없으므로 DataFrame에 레코드를 추가하려면 원래 DataFrame과 새로운 DataFrame을통합 해야 합니다.

- 통합하려는 2개의 DataFrame은 반드시 동일한 스키마와 컬럼수를 가져야 합니다.

import org.apache.spark.sql.Row
// 4) 로우 합치기 + 추가하기

// 원래 DataFrame과 동일한 스키마 만들기
val shcema=flight15.schema

// 추가할 로우 생성
val newRows=Seq(
    Row("New Country","Other Country",5L),
    Row("New Country 2","Other Country 3",1L)
    )
    
/* parallelize() 
: 새로운 로우를 텍스트 리스트를 parallelize() 메서드로 데이터를 가져옴.
데이터세트를 넘겨주고 RDD를 생성시킴. spark.createDataFrame 메서드로 데이터 프레임에 넣기*/
val parallelizeRows=spark.sparkContext.parallelize(newRows)
val newDF=spark.createDataFrame(parallelizeRows,shcema)

// Dataframe 병합
flight15.union(newDF)
    .where("count=1")
    .where($"ORIGIN_COUNTRY_NAME"=!="United States")
    .show(3)

 

5) 로우 정렬하기

- sort 또는 orderBy 메서드로 정렬. 2개 메서드는 동일한 방식으로 작동.

// 5) 로우 정렬하기
flight15.sort("ORIGIN_COUNTRY_NAME").show(5)

/*
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|             Angola|   13|
|    United States|           Anguilla|   38|
|    United States|Antigua and Barbuda|  117|
|    United States|          Argentina|  141|
|    United States|              Aruba|  342|
+-----------------+-------------------+-----+
only showing top 5 rows
*/


// desc, asc 지정
flight15.orderBy(expr("count desc")).show(3)
/*
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|    United States|          Singapore|    1|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 3 rows
*/
flight15.sort(asc("count"))
        .orderBy(desc("DEST_COUNTRY_NAME"))
        .show(3)

/*
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|           Zambia|      United States|    1|
|        Venezuela|      United States|  290|
|          Uruguay|      United States|   43|
+-----------------+-------------------+-----+
only showing top 3 rows
*/

// 파티션 별 정렬 수행 >> 트랜스포메이션 처리 전 성능 최적화를 위해
flight15.sortWithinPartitions("ORIGIN_COUNTRY_NAME").show(4)
/*
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|             Angola|   13|
|    United States|           Anguilla|   38|
|    United States|Antigua and Barbuda|  117|
|    United States|          Argentina|  141|
+-----------------+-------------------+-----+
only showing top 4 rows
*/

 

'Data_Analysis > Spark, Zeppelin' 카테고리의 다른 글

[Spark] 그룹화  (0) 2021.03.09
[Spark] 집계 연산  (0) 2021.03.07
[Spark] 구조적 API 기본 연산1 - 컬럼  (0) 2020.12.26
[Spark] 구조적 API 개요  (0) 2020.12.23
[Spark] 아파치 스파크 들어가기  (0) 2020.12.23

댓글