본문 바로가기

프로그래밍(TA, AA)/JVM 언어

[SPARK] 스파크 주요 키워드

spark.serializer = org.apache.spark.serializer.KryoSerializer(카이로 써드파티 직렬화 라이브러리)

spark.rdd.compress = true

spark.dynamicAllocation.maxExecutors = 512

spark.debug.maxToStringFields = 1000


HiveMetaStore / hive.metastore


storage.MemoryStore

storage.BlckmanagerInfo


spark.sparkContext(애플리케이션 전체의 실행 관련 정보 집약 객체로 스케줄러 등이 포함 - RDD 생성시)

spark.sparkSession(스파크컨텍스트에 세션정보가 추가로 포함 - DataFrame 생성시)


sparkParquet(스파크로 읽어서 데이터를 볼 수 있음)



[스파크 세션(SparkSession)]

데이터셋을 다루기 위해 가장 먼저 알아야 할것은 SparkSession이다. RDD를 생성하기 위해 SparkContext가 필요했던 것처럼 데이터프레임을 생성하기 위해서는 SparkSession을 이용해야 한다. SparkSession은 인스턴스 생성을 위한 build() 메서드를 제공하고, 이 메서드를 이용하면 기존 인스턴스를 재사용하거나 새로운 인스턴스를 생성할 수 있다. 만약 스파크셸을 사용하다면 스파크 셸이 자동으로 spark라는 이름으로 SparkSession 인스턴스를 생성해주므로 별도의 생성 단계를 거치지 않고 spark라는 변수를 통해 접근할 수 있다. SparkSession 이전에는 Hive 사용 여부에 따라 SQLContext와 HiveContext라는 별도의 API를사용했지만 SparkSession에서는 Hive 지원이 기본 사항으로 바뀌면서 기존 API는 더이상 사용하지 않게 되었다.



[스파크 코어]

스파크 코어(Spark Core)는 작업 스케줄링 / 메모리 관리 / 장애 복구 / 저장 장치와의 연동 등등 기본적인 기능들로 구성된다. 스파크 코어는 RDD(Resilient Distributed Dataset)를 정의하는 API의 기반이 되며, 이것이 주된 스파크 프로그래밍 추상화 구조이다.


RDD는 여러 컴퓨터 노드에 흩어져 있으면서 병렬 처리될 수 있는 아이템들의 모음을 표현한다. 스파크 코어는 이 모음들을 생성하고 조작할 수 있는 수많은 API를 지원한다.



[스파크 SQL]

스파크 SQL은 정형 데이터를 처리하기 위한 스파크의 패키지이다. 스파크 SQL은 SQL뿐만 아니라 하이브 테이블, Parquet, JSON 등 다양한 데이터 소스를 지원한다. 특히 아파치 하이브의 SQL 변형을 써서 데이터에 질의를 보낼 수 있다.


스파크 SQL은 단순히 SQL 인터페이스를 제공하는 것 이상으로 SQL과 복잡한 분석 작업을 서로 연결할 수 있도록 지원한다. 그 덕분에 개발자가 단일 애플리케이션 내에 파이썬, 자바, 스칼라의 RDD에서 지원하는 코드를 데이터 조작을 위해 SQL 쿼리와 함께 사용할 수 있다. 



[스파크 스트리밍]

스파크 스트리밍은 실시간 데이터 스트림(순차적으로 드어오는 데이터 흐름을 데이터 스트림이라고 부른다. 전체 데이터를 기다릴 필요없이 도착하는 순서대로 처리할 수 있음)을 처리 가능하게 해 주는 스파크의 컴포넌트이다. 여기서 데이터스트림이라는 것은 제품 환경의 웹서버가 생성한 로그 파일이나 웹서비스의 사용자들이 만들어내는 상태 업데이트 메시지들이 저장되는 큐(queue)같은 것들도 해당된다. 


스파크 스트리밍은 스파크 코어의 RDD API와 거의 일치하는 형태의 데이터 스트림 조작 API를 지원함으로써 프로그래머들이 프로젝트에 빠르게 익숙해지게 해준다. 그리고 메모리나 디스크에 저장되어 있는 데이터를 다루는 애플리케이션이든 실시간으로 받는 데이터를 다루는 애플리케이션이든 서로 바꿔가면서 다루더라도 혼란없게 해준다.



[스파크의 저장소 계층]

스파크는 HDFS이나 하둡 API가 지원하는 다른 저장시스템(로컬 파일 시스템, 아마존 S3, 카산드라, 하이브, HBase 등등)에 있는 어떤 파일로부터든 분산데이터 모음을 만들 수 있다. 스파크는 단순히 하둡 API를 사용하는 저장 시스템들을 지원하는 것일뿐. 스파크는 텍스트 파일, 시퀀스 파일, 에이브로, Parquet뿐만 아니라 다른 하둡의 InputFormat이 지원하는 파일까지 지원한다.


[스파크 SQL]

스파크 SQL은 데이터프레임으로 대표되는 스키마 기반의 데이터 처리와 더불어 언어에 따른 구현 방법에 대한 중립성, SQL 및 하이브QL 지원을 통한 개발 편의성 등으로 업무 현장에서 널리 사용되고 있다. 




넓게 보면 모든 스파크 애플리케이션은 클러스터에서 다양한 병렬 연산을 수행하는 드라이버 프로그램으로 구성된다. 드라이버 프로그램은 개발자가 만든 애플리케이션의 main 함수를 갖고 있으며 클러스터의 분산 데이터세트를 정의하고 그 데이터세트에 연산 작업을 수행한다. 


드라이버 프로그램들은 연산 클러스터에 대한 연결을 나타내는 SparkContext 객체를 통해 스파크에 접속한다. 셸에서 이 SparkContext 객체는 자동적으로 sc라는 변수에 만들어진다.


scala> sc

res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@4c8d45cf


SparkContext 객체를 하나 만들었다면 그것으로 RDD를 만들어 낼 수 있다. RDD를 만들기 위해 sc.textFile()을 호출하고나면 그 라인에 count() 같은 다양한 연산을 수행해 볼 수 있다.


이런 연산들을 수행하기 위해 드라이버 프로그램들은 보통 executor라 불리는 다수의 노드(대체적으로 '노드'는 클러스터의 머신 하나를 표현하는 의미로 생각하면 된다. 여기서 executor 프로그램이 실행되는 머신.)를 관리한다.



스파크 API의 기능 대부분은 filter같이 사용자 함수 기반으로 이루어지는 연산들이 클러스터 위에서 실행되는데, 병렬 처리까지 동시에 된다는 것이다. 즉, 스파크는 직접 짠 함수를 가져다가 이것을 실제 실행 노드로 보내준다. 그러므로 그냥 하나의 프로그램만 짜서 돌려도 여러 개의 노드에서 돌아갈 수 있다.



[SparkContext 초기화하기]

한 애플리케이션이 스파크에 연동되려면 우선 프로그램 내에서 관련 스파크 패키지들을 import하고 SparkContext 객체를 생성해야한다. 이를 위해서는 먼저 SparkConf 객체를 만들어 애플리케이션에 필요한 설정을 해야 SparkContext를 생성할 수 있다.


import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.Sparkcontext._

val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)


위는 SparkContext를 초기화하는 가장 간단한 형태이며, 이때 다음의 두가지 인자를 전달해 주어야 한다.


클러스터 URL: 위 예제에서 local이라고 쓰인 부분. 스파크에게 어떤 식으로 클러스터에게 접속할지 알려준다. local은 한 개의 쓰레드나 단일의 로컬 머신에서 돌때 따로 접속할 필요가 없음을 알려주는 특수한 값이다.


애플리케이션 이름: 위 예제에서 My App 부분. 클러스터에 접속한다면 클러스터 UI에서 저 이름으로 애플리케이션을 구분할 수 있다.


(물론 애플리케이션 실행과 클러스터에 분배되는 과정에 대한 추가적인 인자들이 더 존재하기는 한다.)


SparkContext를 초기화했다면 이제 앞에 나왔던 모든 메소드들을 써서 RDD를 만들고 다룰 수 있다. 최종적으로 스파크를 shut down하려면 SparkContext에서 stop() 메소드를 호출하거나 그냥 애플리케이션을 끝내면 된다.



[SparkSession과 SparkContext]

RDD를 생성하려면 SparkContext 객체를 먼저 생성하고, DataFrame 또는 DataSet을 생성하려면 SparkSession 객체를 먼저 생성해야 한다. 하지만 정확하게 말하면 스파크세션 객체 안에는 스파크컨텍스트 객체가 포함돼 있기 때문에 RDD를 만들때나 데이터프레임을 만들때나 상관없이 스파크세션 객체를 먼저 생성하면 된다.

// spark는 스파크세션 객체
val input = List("a", "b", "c")
val rdd = spark.sparkContext.parallelize(input)

스파크세션 클래스의 생성자 인자는 모두 4개이며 각각의 타입이 SparkContext, SharedState, SessionState, SparkSessionExetensions이다. 즉, 스파크세션 객체를 만들려면 스파크컨텍스트를 비롯한 SessionState 객체가 먼저 생성돼 있어야 하며 실제로 어딘가에 이 부분을 처리하는 코드가 있다는 것을 의미한다. 이부분은 굳이 찾아보지 않더라도 스파크세션을 만드는 방법을 생각해보면 대충 감을 잡을 수 있다.

// 스파크세션 생성
val spark = SparkSession
	.builder()
	.appName("MyApp")
	.master("local[*]")
	.getOrCreate()

위 코드는 스파크세션을 생성하는 코드로, 필요한 설정값을 지정한 다음 맨 마지막에 getOrCreate를 호출하게 돼 있다. 바로 이 부분이 스파크컨텍스트를 포함한 나머지 객체를 생성하는 부분이며 이를 통해 새로운 스파크세션 객체를 생성할 수 있다.


스파크세션은 스파크컨텍스트에 세션 상태 정보를 추가로 담은 것이라고 할 수 있다.


스파크세션에 세션 상태 정보라는 것을 추가한 이유는 무엇일까? 스파크 컨텍스트는 스파크가 동작하기 위한 각종 백엔드 서비스에 대한 참조를 가지고 있는 객체라고 할 수 있다.


실제로 스파크컨텍스트 클래스는 statusTracker, dagScheduler, taskScheduler 등 백엔드 서비스들에 직접 접근할 수 있는 참조 변수를 가지고 있으며 RDD API를 이용해 필요한 연산을 수행할 경우 내부적으로 이러한 참조 변수를 직접 사용해 요청한 작업을 처리하게 된다.


따라서 스파크에서 동작하는 모든 애플리케이션은 백엔드 서버와 통신하기 위해 스파크컨텍스트 객체를 사용해야 하며 같은 이유로 스파크세션의 경우에도 스파크컨텍스트를 먼저 생성한 후 이에 대한 참조를 내부적으로 유지하고 있는 것이다.


스파크에서 사용자가 RDD와 같은 백엔드 서비스에 직접 접근 가능한 API를 사용하는 대신 RDD보다 한 단계 추상화된 API를 사용해 코드를 작성하게 하고, 이 코드를 내부적인 최적화 과정을 거쳐 실제 동작 가능한 RDD 기반의 코드로 전환하는 방법을 채택하게 됐는데, 이 새로운 API가 바로 데이터프레임과 데이터셋이라고 할 수 있다.

private[sql] class SessionState(
	sharedState: SharedState,
	val conf: SQLConf,
	val experimentalMethod: ExperimentalMethods,
	val functionRegistry: FunctionRegistry,
	val udfRegistration: UDFRegistration,
	catalogBuilder: () => SessionCatalog,
	val sqlParser: ParserInterface,
	analyzerBuilder: () => Analyzer,
	optimizerBuilder: () => Optimizer,
	val planner: SparkPlanner,
	val streamingQueryManager: StreamingQueryManager,
	val listenerManager: ExecutionListenerManager,
	resourceLoaderBuilder: () => SessionResourceLoader,
	createQueryExecution: LogicalPlan => QueryExecution,
	createClone: (SparkSession, SessionState) => SessionState) {
	...
}

데이터프레임이 사용자가 입력한 코드를 최적화해서 실제 동작 가능한 코드로 바꾼다 하였는데, Analyzer/Optimizer/SparkPlanner/QueryExecution 등이 이와 관련된 역할을 수행하는 것들이다. 구체적으로는 아래와 같은 절차를 따르게 된다.


- 데이터프레임 연산을 통해 LogicalPlan 생성(QueryExecution의 logical로 조회되는 쿼리)

- 생성된 LogicalPlan을 SessionState의 Analyzer에 전달해서 미식별 정보에 대한 처리를 진행한 후 그 결과로 수정된 LogicalPlan을 생성(QueryExecution의 analyzed로 조회되는 쿼리)

- Analyzer가 생성한 LogicalPlan을 SessionState의 Optimizer에 전달, 최적화 과정을 수행한 후 새로운 LogicalPlan을 생성(QueryExecution의 optimizedPlan으로 조회되는 쿼리)

- 최적화된 LogicalPlan을 SessionState의 SparkPlanner에 전달해서 SparkPlan 생성(QueryExecution의 sparkPlan으로 조회되는 쿼리)

- 생성된 SparkPlan에 추가적인 최적화 과정을 적용한 후 최종 SparkPlan 생성(QueryExecution의 executedPlan으로 조회되는 쿼리)


쿼리 실행 계획은 내부적으로 이 같은 최적화 단계를 거쳐서 생성된 것이며 이러한 최적화 기능이 데이터프레임 내부에서 이뤄지고 있음을 확인할 수 있다. QueryExecution 객체를 이용하면 런타임에 생성된 코드 정보와 최적화에 사용된 각종 클래스 정보 등도 확인할 수 있으므로 실제 업무를 수행하는 과정에서는 최신 API를 참고해서 이러한 정보들을 잘 활용하것도 내부 구조를 이해하고 성능을 최적화하는데 도움이 될것이다.