Spark 포스트는 책 「스파크 완벽 가이드 : 스파크를 활용한 빅데이터 처리와 분석의 모든 것」를 바탕으로 쓴 것임을 알려드립니다.
또한, Scala 코드 위주로 작성한 점 참고 바랍니다.
지난 포스팅에서 구조적 API의 예로 DataFrame을 언급했습니다. DataFrame은 컬럼과 로우로 구성된 로우와 컬럼을 가지는 분산 테이블 형태의 컬렉션입니다. 그러므로 DataFrame 핸들링도 로우와 컬럼에 대한 트랜스포메이션을 위주로 살펴보고자 합니다.
[DataFrame 트랜스포메이션]
1) DataFrame 생성하기
- 외부 파일 불러와 생성 : Spark 폴더 아래 data폴더에 사용할 파일을 저장한 다음, 해당 파일을 불러와 사용했습니다.
// DataFrame 생성
// 파일 불러오기
val flight15=spark.read.format("json")
.load("C:/HadoopEco/spark-2.4.7-bin-hadoop2.7/data/flight-data/json/2015-summary.json")
flight15
// 생성된 데이터 프레임을 SQL쿼리로 실행하고 트랜스포메이션하기 위해 임시 뷰로 등로
df.createOrReplaceTempView("dfTable")
- 직접 DataFrame 생성하기
import org.apache.spark.sql.types.{StructField,StructType,StringType,LongType}
import org.apache.spark.sql.types.Metadata
val mySchema=StructType(Array(
StructField("DEST_COUNTRY_NAME",StringType,true),
StructField("ORIGIN_COUNTRY_NAME",StringType,true),
StructField("count",LongType,false,
Metadata.fromJson("{\"hello\":\"world\"}"))
))
또는 Row 객체를 가진 Seq타입을 변환하여 생성합니다.
// Row객체를 가진 Seq타입을 변환하여 생성
import org.apache.spark.sql.Row
val myDf=Seq(("Hello",2,1L)).toDF("col1","col2","col3")
2) repartitions & coalesce
- 두 메서드 모두 컬럼을 기준으로 데이터를 분할하여 최적화하는 기법
- repartitions : 전체 데이터를 셔플. 향후 사용할 파티션 수가 현재 파티션 수보다 많거나 컬럼을 기준으로 파티션을 만드는 경우에만 사용
- 특정 컬럼을 기준으로 자주 필터링 한다면 자주 필터링하는 컬럼을 기준으로 파티션을 repartitions
- coalesce : 전체 데이터를 셔플하지 않고 파티션을 병합하는 경우에 사용
// repartitions & coalesce
//repartitions
flight15.rdd.getNumPartitions // 파티션 수 확인
flight15.repartition(4) // 파티션 수 지정
flight15.repartition(4, col("DEST_COUNTRY_NAME")) // 파티션 수, 기준이 되는 컬럼 지정
// coalesce flight15.repartition(4, col("DEST_COUNTRY_NAME")).coalesce(2)
3) 임의 분할하기
val splitFlight15=flight15.randomSplit(Array(0.25,0.75),100)
splitFlight15(0).count() > splitFlight15(1).count()
/*
splitFlight15:
Array[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]]
= Array([DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field],
[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field])
res50: Boolean = false
*/
** Select, SelectExpr
Select, SelectExpr메서드를 가지고 SQL쿼리로 DataFrame을 다룰 수 있다.
// Select, SelectExpr
flight15.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME").show(3)
/*
+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
| United States| Romania|
| United States| Croatia|
| United States| Ireland|
+-----------------+-------------------+
only showing top 3 rows
*/
// Select, SelectExpr
flight15.selectExpr("avg(count)","count(distinct(DEST_COUNTRY_NAME))").show(3)
/*
+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625| 132|
+-----------+---------------------------------+
*/
[Column Handling]
1) 컬럼 생성
- 컬럼을 생성하는 방법으로 col, columns 함수가 있습니다. 컬럼명을 인수로 받아 컬럼을 생성합니다.
// 컬럼 생성
import org.apache.spark.sql.functions.{col,column}
col("someColumnName")
column("someColumnName")
// 다양한 컬럼 참조 방법
// 아래는 모두 컬럼을 참조하는 방법
flight15.select(
expr("DEST_COUNTRY_NAME AS dest"),
col("DEST_COUNTRY_NAME").alias("DEST"),
'DEST_COUNTRY_NAME,
$"DEST_COUNTRY_NAME",
column("DEST_COUNTRY_NAME")
).show(2)
/*
+-------------+-------------+-----------------+-----------------+-----------------+
| dest| DEST|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-------------+-------------+-----------------+-----------------+-----------------+
|United States|United States| United States| United States| United States|
|United States|United States| United States| United States| United States|
+-------------+-------------+-----------------+-----------------+-----------------+
only showing top 2 rows
*/
2) 컬럼 추가
- withColumn으로 컬럼 추가
- withColumn는 컬럼명과 값을 생성하는 표현식, 2개의 인수를 사용한다.
// 컬럼 추가 : withColumn
flight15.withColumn("withinCountry",expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))
.show(3)
/*
+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
| United States| Romania| 15| false|
| United States| Croatia| 1| false|
| United States| Ireland| 344| false|
+-----------------+-------------------+-----+-------------+
only showing top 3 rows
*/
3) 컬럼명 변경
- withColumnRenamed로 컬럼명 변경
//컬럼명 변경
flight15.withColumnRenamed("DEST_COUNTRY_NAME","dest").columns
// res12: Array[String] = Array(dest, ORIGIN_COUNTRY_NAME, count)
4) 예약 문자와 키워드
- 공백이나 하이픈 같은 예약 문자는 컬럼명에 사용할 수 없음
- 예약 문자를 컬럼명에 사용하려면 (`)문자를 이용해 이스케이핑 해야함.
// 예약문자와 키워드
import org.apache.spark.sql.functions.expr
val dfWithLongColName=flight15.withColumn(
"This Long Column-Name",
expr("ORIGIN_COUNTRY_NAME")
)
dfWithLongColName.selectExpr(
"`This Long Column-Name`",
"`This Long Column-Name` as `new col`")
.show(2)
5) 컬럼 제거
- select외에 drop으로도 컬럼 제거 가능
// 5) 컬럼 제거 : drop
flight15.drop("ORIGIN_COUNTRY_NAME").columns
// res28: Array[String] = Array(DEST_COUNTRY_NAME, count)
6) 데이터 타입 변경
- cast 메서드로 데이터 타입을 변환
// 6) 데이터 타입 변경 : integer타입을 string타입으로 변환
flight15.withColumn("count2",col("count").cast("string"))
//res32: org.apache.spark.sql.DataFrame
// = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 2 more fields]
'Data_Analysis > Spark, Zeppelin' 카테고리의 다른 글
[Spark] 집계 연산 (0) | 2021.03.07 |
---|---|
[Spark] 구조적 API 기본 연산2 - 로우 (0) | 2020.12.26 |
[Spark] 구조적 API 개요 (0) | 2020.12.23 |
[Spark] 아파치 스파크 들어가기 (0) | 2020.12.23 |
[Zeppelin] Zepplelin 설치 및 실행 (0) | 2020.12.20 |
댓글