[Row Handling]
1) 로우 생성
- 로우의 데이터에 접근할 때는 원하는 위치와 데이터 타입을 지정합니다. 자바나 스칼라를 사용할 때는 명시적으로 데이터 타입을 정의하지만, Python이나 R같은 동적 코드에서는 타입이 자동으로 변환됩니다.
import org.apache.spark.sql.Row
val myRow=Row("Hello",null,1,false)
myRow
2) 로우 필터링
- filter 또는 where 메서드로 로우 필터링
- filter와 where는 똑같은 기능을 하며,
- 메서드를 연달아 쓸 경우 메서드의 조건이 AND로 조건이 적용된다.
- 다만, 스파크는 자동으로 필터의 순서와 상관없이 동시에 모든 필터링 작업을 수행
// 2) 로우 필터링 : filter 또는 where 메서드로 로우 필터링
flight15.where(col("count")<2)
.filter(col("ORIGIN_COUNTRY_NAME")=!="Croatia")
.show(3)
/*
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Singapore| 1|
| Moldova| United States| 1|
| Malta| United States| 1|
+-----------------+-------------------+-----+
only showing top 3 rows
*/
3) 고유한 로우 얻기
- distinct 메서드로 로우의 고유한 값 추출
// 3) 고유한 로우 얻기(distinct) + 고유한 로우 수 세기(count)
flight15.select("ORIGIN_COUNTRY_NAME").distinct().count()
// res35: Long = 125
4) 로우 합치기 + 추가하기
- DataFrame을 변경할 수 없으므로 DataFrame에 레코드를 추가하려면 원래 DataFrame과 새로운 DataFrame을통합 해야 합니다.
- 통합하려는 2개의 DataFrame은 반드시 동일한 스키마와 컬럼수를 가져야 합니다.
import org.apache.spark.sql.Row
// 4) 로우 합치기 + 추가하기
// 원래 DataFrame과 동일한 스키마 만들기
val shcema=flight15.schema
// 추가할 로우 생성
val newRows=Seq(
Row("New Country","Other Country",5L),
Row("New Country 2","Other Country 3",1L)
)
/* parallelize()
: 새로운 로우를 텍스트 리스트를 parallelize() 메서드로 데이터를 가져옴.
데이터세트를 넘겨주고 RDD를 생성시킴. spark.createDataFrame 메서드로 데이터 프레임에 넣기*/
val parallelizeRows=spark.sparkContext.parallelize(newRows)
val newDF=spark.createDataFrame(parallelizeRows,shcema)
// Dataframe 병합
flight15.union(newDF)
.where("count=1")
.where($"ORIGIN_COUNTRY_NAME"=!="United States")
.show(3)
5) 로우 정렬하기
- sort 또는 orderBy 메서드로 정렬. 2개 메서드는 동일한 방식으로 작동.
// 5) 로우 정렬하기
flight15.sort("ORIGIN_COUNTRY_NAME").show(5)
/*
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Angola| 13|
| United States| Anguilla| 38|
| United States|Antigua and Barbuda| 117|
| United States| Argentina| 141|
| United States| Aruba| 342|
+-----------------+-------------------+-----+
only showing top 5 rows
*/
// desc, asc 지정
flight15.orderBy(expr("count desc")).show(3)
/*
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| Moldova| United States| 1|
| United States| Singapore| 1|
| United States| Croatia| 1|
+-----------------+-------------------+-----+
only showing top 3 rows
*/
flight15.sort(asc("count"))
.orderBy(desc("DEST_COUNTRY_NAME"))
.show(3)
/*
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| Zambia| United States| 1|
| Venezuela| United States| 290|
| Uruguay| United States| 43|
+-----------------+-------------------+-----+
only showing top 3 rows
*/
// 파티션 별 정렬 수행 >> 트랜스포메이션 처리 전 성능 최적화를 위해
flight15.sortWithinPartitions("ORIGIN_COUNTRY_NAME").show(4)
/*
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Angola| 13|
| United States| Anguilla| 38|
| United States|Antigua and Barbuda| 117|
| United States| Argentina| 141|
+-----------------+-------------------+-----+
only showing top 4 rows
*/
'Data_Analysis > Spark, Zeppelin' 카테고리의 다른 글
[Spark] 그룹화 (0) | 2021.03.09 |
---|---|
[Spark] 집계 연산 (0) | 2021.03.07 |
[Spark] 구조적 API 기본 연산1 - 컬럼 (0) | 2020.12.26 |
[Spark] 구조적 API 개요 (0) | 2020.12.23 |
[Spark] 아파치 스파크 들어가기 (0) | 2020.12.23 |
댓글