본문 바로가기
Data_Analysis/Spark, Zeppelin

[Spark] 구조적 API 기본 연산1 - 컬럼

by Classic! 2020. 12. 26.

 

Spark 포스트는 책 「스파크 완벽 가이드 : 스파크를 활용한 빅데이터 처리와 분석의 모든 것」를 바탕으로 쓴 것임을 알려드립니다.

또한, Scala 코드 위주로 작성한 점 참고 바랍니다. 

 

스파크 완벽 가이드

스파크 창시자가 알려주는 스파크 활용과 배포, 유지 보수의 모든 것. 오픈소스 클러스터 컴퓨팅 프레임워크인 스파크의 창시자가 쓴 스파크에 대한 종합 안내서다. 스파크 사용법부터 배포, 유

www.aladin.co.kr


 

지난 포스팅에서 구조적 API의 예로 DataFrame을 언급했습니다. DataFrame은 컬럼과 로우로 구성된 로우와 컬럼을 가지는 분산 테이블 형태의 컬렉션입니다. 그러므로 DataFrame 핸들링도 로우와 컬럼에 대한 트랜스포메이션을 위주로 살펴보고자 합니다.

 

[DataFrame 트랜스포메이션]

1) DataFrame 생성하기

- 외부 파일 불러와 생성 :  Spark 폴더 아래 data폴더에 사용할 파일을 저장한 다음, 해당 파일을 불러와 사용했습니다.

// DataFrame 생성
// 파일 불러오기
val flight15=spark.read.format("json")
	.load("C:/HadoopEco/spark-2.4.7-bin-hadoop2.7/data/flight-data/json/2015-summary.json")

flight15

// 생성된 데이터 프레임을 SQL쿼리로 실행하고 트랜스포메이션하기 위해 임시 뷰로 등로
df.createOrReplaceTempView("dfTable")

 

- 직접 DataFrame 생성하기

import org.apache.spark.sql.types.{StructField,StructType,StringType,LongType}
import org.apache.spark.sql.types.Metadata

val mySchema=StructType(Array(
    StructField("DEST_COUNTRY_NAME",StringType,true),
    StructField("ORIGIN_COUNTRY_NAME",StringType,true),
    StructField("count",LongType,false,
        Metadata.fromJson("{\"hello\":\"world\"}"))
    ))

또는 Row 객체를 가진 Seq타입을 변환하여 생성합니다.

// Row객체를 가진 Seq타입을 변환하여 생성
import org.apache.spark.sql.Row
val myDf=Seq(("Hello",2,1L)).toDF("col1","col2","col3")

2) repartitions & coalesce

- 두 메서드 모두 컬럼을 기준으로 데이터를 분할하여 최적화하는 기법
- repartitions : 전체 데이터를 셔플. 향후 사용할 파티션 수가 현재 파티션 수보다 많거나 컬럼을 기준으로 파티션을 만드는 경우에만 사용
- 특정 컬럼을 기준으로 자주 필터링 한다면 자주 필터링하는 컬럼을 기준으로 파티션을 repartitions 
- coalesce : 전체 데이터를 셔플하지 않고 파티션을 병합하는 경우에 사용

// repartitions & coalesce 
//repartitions 
flight15.rdd.getNumPartitions // 파티션 수 확인 
flight15.repartition(4) // 파티션 수 지정 
flight15.repartition(4, col("DEST_COUNTRY_NAME")) // 파티션 수, 기준이 되는 컬럼 지정 

// coalesce flight15.repartition(4, col("DEST_COUNTRY_NAME")).coalesce(2)

 

3) 임의 분할하기 

val splitFlight15=flight15.randomSplit(Array(0.25,0.75),100)
splitFlight15(0).count() > splitFlight15(1).count()

/*
splitFlight15: 
	Array[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] 
	= Array([DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field], 
    [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field])
res50: Boolean = false
*/

 

** Select, SelectExpr

Select, SelectExpr메서드를 가지고 SQL쿼리로 DataFrame을 다룰 수 있다.

// Select, SelectExpr
flight15.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME").show(3)

/*
+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
|    United States|            Ireland|
+-----------------+-------------------+
only showing top 3 rows
*/
// Select, SelectExpr
flight15.selectExpr("avg(count)","count(distinct(DEST_COUNTRY_NAME))").show(3)

/*
+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+
*/

 

 

 

[Column Handling]

1) 컬럼 생성

- 컬럼을 생성하는 방법으로 col, columns 함수가 있습니다. 컬럼명을 인수로 받아 컬럼을 생성합니다. 

// 컬럼 생성
import org.apache.spark.sql.functions.{col,column}

col("someColumnName")
column("someColumnName")


// 다양한 컬럼 참조 방법
// 아래는 모두 컬럼을 참조하는 방법
flight15.select(
    expr("DEST_COUNTRY_NAME AS dest"),
    col("DEST_COUNTRY_NAME").alias("DEST"),
    'DEST_COUNTRY_NAME,
    $"DEST_COUNTRY_NAME",
    column("DEST_COUNTRY_NAME")
    ).show(2)
    
/*
+-------------+-------------+-----------------+-----------------+-----------------+
|         dest|         DEST|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-------------+-------------+-----------------+-----------------+-----------------+
|United States|United States|    United States|    United States|    United States|
|United States|United States|    United States|    United States|    United States|
+-------------+-------------+-----------------+-----------------+-----------------+
only showing top 2 rows
*/

 

2) 컬럼 추가

- withColumn으로 컬럼 추가

- withColumn는 컬럼명값을 생성하는 표현식, 2개의 인수를 사용한다.

// 컬럼 추가 : withColumn
flight15.withColumn("withinCountry",expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))
        .show(3)

/*
+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
|    United States|            Ireland|  344|        false|
+-----------------+-------------------+-----+-------------+
only showing top 3 rows
*/

 

3) 컬럼명 변경

- withColumnRenamed로 컬럼명 변경

 

//컬럼명 변경
flight15.withColumnRenamed("DEST_COUNTRY_NAME","dest").columns     

// res12: Array[String] = Array(dest, ORIGIN_COUNTRY_NAME, count)

 

4) 예약 문자와 키워드

- 공백이나 하이픈 같은 예약 문자는 컬럼명에 사용할 수 없음
- 예약 문자를 컬럼명에 사용하려면 (`)문자를 이용해 이스케이핑 해야함.

// 예약문자와 키워드


import org.apache.spark.sql.functions.expr

val dfWithLongColName=flight15.withColumn(
    "This Long Column-Name",
    expr("ORIGIN_COUNTRY_NAME")
    )

dfWithLongColName.selectExpr(
    "`This Long Column-Name`",
    "`This Long Column-Name` as `new col`")
    .show(2)

 

5) 컬럼 제거

- select외에 drop으로도 컬럼 제거 가능

// 5) 컬럼 제거 : drop
flight15.drop("ORIGIN_COUNTRY_NAME").columns
// res28: Array[String] = Array(DEST_COUNTRY_NAME, count)

 

6) 데이터 타입 변경

- cast 메서드로 데이터 타입을 변환

// 6) 데이터 타입 변경 : integer타입을 string타입으로 변환
flight15.withColumn("count2",col("count").cast("string")) 

//res32: org.apache.spark.sql.DataFrame 
// 		= [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 2 more fields]

댓글