본문 바로가기

프로그래밍(TA, AA)/JVM 언어

[SPARK] 스파크의 처리 모델 RDD

스파크의 데이터처리는 RDD라는 자료구조를 이용하다. 스파크의 프로그래밍 모델은 RDD를 가공해 새로운 RDD를 만들고, 이런 처리를 반복하여 원하는 결과를 얻는 형태다. 이런 동작원리를 자세히 알아보고 RDD의 구조와 특징, RDD 중심의 분산처리가 클러스터 환경에서는 어떤식으로 이루어질까?


[RDD 구조와 특징]

RDD는 대량의 데이터를 요소로 가지는 분산 컬렉션(거대한 배열과 리스트 등의 자료구조?)이다. 

RDD는 여러 머신으로 구성된 클러스터 환경에서의 분산처리를 전제로 설계되었고, 내부는 파티션이라는 단위로 나뉜다. 스파크에서는 이 파티션이 분산처리 단위다. RDD를 파티션 단위로 여러 머신에서 처리하므로 한 대의 머신으로 처리할 수 있는 것보다 더 큰 데이터를 다룰 수 있다.


RDD는 배열/리스트 등의 요소를 저장하며, 내부는 여러 개의 파티션(분산 처리 단위)으로 구분된다.


사용자는 HDFS 등의 분산 파일시스템에 있는 파일 내용을 RDD로 로드하고, RDD를 가공하는 형식으로 대량의 데이터를 분산처리할 수 있다. 스파크에서는 이런 가공을 변환(transformation)이라고 한다. 그리고 RDD의 내용에 따라 액션(action)이라는 처리를 적용하여 원하는 결과를 얻는다.






[RDD 다루기]

RDD에는 변환/액션 두 종류의 처리를 적용할 수 있다.



변환(transformation) : (데이터) 요소 위주(element-wise)


트랜스포메이션은 존재하는 RDD에서 새로운 RDD를 만들어낸다. 


변환이란 RDD를 가공하고 그 결과 새로운 RDD를 얻는 처리다. 새로운 RDD가 가지는 요소는 변환처리 전의 RDD에 들어있던 요소를 가공하거나 필터링해 생성된다.


변환은 다시 두 종류로 구분할 수 있다. 변환처리 전의 RDD가 가지는 요소를 같은 RDD의 다른 요소들과 관계없이 처리할 수 있는 첫번째 방식이 있다.


filter: 요소를 필터링한다.

map: 각 요소에 동일한 처리를 적용한다.

flatmap: 각 요소에 동일한 처리를 적용하고 여러 개의 요소를 생성한다. (문장요소를 단어로 분해)

zip: 파티션 수가 같고, 파티션에 있는 요소의 수도 같은 두 개의 RDD를 조합해 한쪽의 요소 값을 키로, 다른 한쪽의 요소 값을 value로 같은 key-value pair를 만든다.


두번째 종류는, 변환 전의 RDD가 가지는 요소를 같은 RDD의 다른 요소와 함께 처리하는 변환이다. 그 변환 대상은 키와 밸류의 쌍을 요소로 가지는 RDD다. 같은 키를 가지는 요소를 한데 모아 처리하는데, 이때 같은 키를 가지는 요소가 전부 같은 파티션에 있어야 한다. 스파크는 파티션 단위로 독립해 분산처리하기 때문이다. 이때 서로 다른 파티션에 있는 같은 키를 가지는 요소의 자리를 바꾸는 것을 셔플이라고 한다.


셔플은 변환하기 전의 RDD 요소를, 변환 후에 키를 기준으로 각 파티션에 배분한다. 따라서 셔플에 의해 같은 키를 가지는 요소가 같은 파티션에 있도록 보증할 수 있다. 이러한 종류의 변환에는 다음과 같은 것이 있다.


reduceByKey: 같은 키를 가지는 요소를 aggregation한다.

join: 두 개의 RDD에서 같은 키를 가지는 요소끼리 join한다.


셔플할때 같은 키를 가지는 요소를 같은 파티션에 모으는 것이 '파티셔너'의 역할이다. 파티셔너는 변환 후 RDD의 파티션 수와 파티션 이동의 대상이 되는 요소의 키를 기준으로, 요소를 모을 곳(파티션)을 결정한다. 스파크는 키의 해시값을 변환한 후, RDD의 파티션 수로 나눈 나머지를 디폴트 기준으로 삼아 모을 곳(파티션)을 결정한다.



액션(action)


RDD를 기초로 결과 값을 계산하며 그 값을 드라이버 프로그램에 되돌려 주거나 외부 스토리지에 저장하기도 한다.


변환이 RDD로부터 다른 RDD를 얻는 '데이터 가공'에 해당하는 조작이라면, 액션은 RDD 내용을 바탕으로 데이터를 가공하지 않고 원하는 결과를 얻는 조작이다.


스파크는 트랜스포메이션과 액션을 매우 다르게 취급하므로 자신이 실행하는 연산의 타입에 대한 이해는 매우 중요한다. 사용하는 함수가 때대로 트랜스포메이션인지 액션인지 헷갈린다면, 반환 타입을 보면 된다. 트랜스포메이션은 RDD를 되돌려 주지만, 액션은 그 외의 다른 데이터 타입을 반환한다.



saveAsTextFile: RDD의 내용을 파일로 출력한다. (같은 디렉터리에, 파티션 단위로 각 파일을 따로 출력)

count: RDD의 요소 수를 센다


이를 위해 두 가지 액션, 개수를 숫자로 되돌려 주는 count()와 RDD에서 여러 개의 데이터를 가져오는 take()를 사용한 샘플 코드를 작성해보면 아래와 같다.

println("Input had " + badLinesRDD.count() + " concerning lines")
println("Here are 10 examples:")
badLinesRDD.take(10).foreach(println)

위 예제에서 드라이버 프로그램에서 RDD의 데이터 일부를 가져오기 위해 take()를 사용했다. 그리고 드라이버 프로그램에서 정보를 출력하기 위해 로컬에서 반복 작업을 수행했다. RDD들은 전체 RDD 데이터를 가져올수 있게 해주는 collect()라는 함수도 지원한다. 이 함수는 RDD를 filter()에 의해 작은 크기의 데이터세트의 RDD로 만든 후 분산이 아닌 로컬에서 데이터를 처리하고 싶을 때 유용하다. collect()를 사용할때는 전체 데이터세트가 사용하는 단일 컴퓨터의 메모리에 올라올수 있을 정도의 크기여야 하고 데이터세트가 너무 크면 collect()를 사용할 수 없다는 점을 꼭 기억하자.


RDD가 드라이버 프로그램에 의해 collect()가 불가능한 대부분의 경우는 데이터가 너무 크기 때문이다. 이런 경우에는 HDFS나 아마존 S3 같은 분산 파일 시스템에 데이터를 써버리는 것이 일반적이 해결책이다. RDD의 내용들은 saveAsTextFile()이나 saveAsSequenceFile(), 여타 내장된 파일 포맷용 액션들을 써서 저장할 수 있다.





[스파크가 클러스터 환경에서 RDD를 처리하는 방식]

스파크는 여러 머신으로 구성된 클러스터 환경에서의 동작을 전제로 한 분산처리 플랫폼이다. 스파크를 포함해 클러스터 환경에서 처리를 시행하는 일반적인 시스템은 클러스터 내의 계산 리소스를 관리하는 기능이 필요하다. 스파크는 다음 세 종류의 클러스터 관리 시스템을 지원한다.


YARN: 하둡의 클러스터 관리 시스템이다. 분산처리를 위한 범용 클러스터 관리 시스템으로 설계되어, 스파크 이외의 분산처리 프레임워크를 돌릴 수 있다. 분산처리의 Datasource로 하둡의 분산 파일시스템인 HDFS를 이용할 경우, 클러스터 관리 시스템으로 YARN을 같이 사용하면 데이터 지역성이 고려(처리에 필요한 데이터가 보존된 머신에 처리가 할당)되어 효율적인 I/O 처리가 이루어진다.


Mesos: YARN과 마찬가지로 범용 클러스터 관리 시스템이다. 처리에 할당하는 CPU 코어 수의 분배를 동적으로 바꿀 수 있는 등의 세세한 제어가 가능하다.


Spark Standalone: 스파크에 번들되는 전용 클러스터 관리 시스템이다. 별도의 클러스터 관리 시스템 없이 편리하게 이용할 수 있다.


클러스터 관리 시스템 하에서 각 머신은 마스터 노드 또는 워커 노드로 동작한다. 마스터 노드는 클러스터 내의 계산 리소스를 집중하며 관리 역할을 담당한다. 한편 워커 노드는 CPU 코어와 메모리 등의 계산 리소스를 제공하고 할당된 처리를 실행한다. 마스터 노드와 복수의 워커 노드로 구성된 클러스터 환경에서는 다음과 같은 프로세스로 분산처리가 이루어진다.



애플리케이션 배포와 계산 리소스 요구


스파크로 분산처리할 때는 RDD 생성과 일련의 변환으로 구성된 스파크 애플리케이션을 클라이언트가 클러스터에 배포한다.


클라이언트는 애플리케이션을 배포함과 동시에 애플리케이션 실행에 필요한 executor의 스펙을 지정한다. executor란 워커 노드에서 구동하여 스파크 애플리케이션을 분산처리하는 프로세스를 말한다. executor 스펙으로는 CPU 코어 수의 할당량과 메모리 할당량, 클러스터 내에서 구동할 executor 수를 지정할 수 있다.



클러스터 내 계산 리소스 확보


클러스터에 애플리케이션이 배포되면, 마스터 노드는 각 워커 노드의 이용 가능한 리소스양과 클라이언트가 요청하는 executor 스펙을 고려하여 하나 이상의 워커 노드에 executor의 구동을 요구한다.



드라이버 프로그램 구동


클러스터 내에 리소스과 확보됨과 동시에 '드라이버 프로그램'이 구동된다. 드라이버 프로그램이란 사용자에 의해 정의되는 스파크 애플리케이션의 entry point(C나 자바의 함수와 같은 실행 포인트)가 되는 프로그램이다. 사용자가 기술한 RDD의 생성/변환 로직을 이용해 애플리케이션을 제어하는 역할을 담당한다.


드라이버 프로그램은 배포 시점의 옵션에 따라 클라이언트에서 구동할지, 워커 노드에서 구동할지를 결정할 수 있다.



태스크 스케줄링과 실행


스파크 애플리케이션에서는 RDD 생성부터 액션 적용까지(잡이라는 단위)로 처리한다. job은 드라이버 프로그램에 포함되는 scheduler에 의해 executor가 처리 가능한 task라는 단위로 분할되며, 그 실행이 스케줄링된다. task는 파티션 단위로 데이터를 로드하고 변환/액션을 적용하는 처리 단위이기도 하다. 따라서 executor가 각 태스크를 처리함으로써 RDD 전체가 분산처리되는 것이다.



[드라이버 프로그램]

스파크 애플리케이션의 동작을 제어하는 것은 '드라이버 프로그램'이다. 스파크 애플리케이션을 개발할 때, 사용자는 드라이버 프로그램에 RDD의 생성/변환/액션의 로직 등을 기술한다. 스파크에서는 다음과 같이 collection을 다루듯 RDD를 생성/변환을 기술할 수 있다.


// HDFS등의 파일시스템에 저장된 거대한 텍스트파일을 가지고 RDD를 생성
val textRDD = sc.textFile("/path/to/huge-text")
// RDD의 각 요소에 변환을 적용
val mappedRDD = textRDD.map(text => someFunction(text))
val filteredRDD = textRDD.filter(processedText => filterFunction(processedText))
...
// RDD에 액션을 적용하고, 원래의 텍스트파일을 가공한 결과를 저장한다.
processedRDD.saveAsTextFile("/path/to/result")


RDD는 한대의 머신에서 전부 다룰 수 없을 만큼 크고 많은 데이터를 가질 수 있다. 드라이버 프로그램에서 RDD를 생성/변환할 수 있는 요인은 무엇일까? 위 샘플코드에서 마치 드라이버 프로그램에서 RDD에서 데이터를 로드하거나 변환/액션을 실행하는 듯 보인다. 그러나 이는 데이터 로드/변환/액션의 적용을 선언한 것에 불과하다. 즉, 실제 처리는 드라이버 프로그램에서 이루어지지 않으며 잡 형태로 클러스터에서 실행된다. 그리고 실행 시점은 드라이버 프로그램에서 액션의 적용이 선언될 때까지 지연된다.


RDD라는 분산 컬렉션과 지연 평가에 의해, 사용자는 대량 데이터처리를 평소 다루던 컬렉션처럼 기술할 수 있다.



[태스크 스케줄링]

RDD의 생성/로드는 lazy evaluation되므로, RDD가 데이터를 가진 상태가 되는 것(인스턴스화)은 클러스터를 처리할 때다. 드라이버 프로그램에서 생성된 RDD에는 태스크 작성과 스케줄링에 필요한 정보가 포함된다.


 - RDD의 본래 데이터 정보, 또는 변환 하기 전의 RDD

 - RDD 생성에 필요한 데이터를 어떤 식으로 로드해야 하는가? 또는, RDD에 어떤 변환을 실행해야 하는가?

 - 변환 후 RDD 요소의 타입

 - 변환 후 RDD 파티션의 수

 - 파티셔너(셔플을 발생시키는 변환 후의 RDD의 경우)

 - RDD가 영속화(persistence)되었는가?


드라이버 프로그램에서 RDD에 대한 액션 적용이 선언되면, 해당 드라이버 프로그램에서 동작하는 스케줄러가 이 정보를 바탕으로 변환의 전체적인 흐름을 해석한다. 그리고 executor간의 네트워크 통신과 I/O 부하가 가능한 한 작아지도록 태스크를 구성하는 등 스케줄링에 도움을 준다.


변환의 전체적인 흐름을 해석한 결과 로드할 데이터의 위치를 알게 되면, 데이터 지역성을 고려해 데이터에 물리적으로 가까운 곳에서 실행되는 executor를 할당할 수 있다. RDD가 클러스터 환경에서 persistence되면 해당 RDD를 인스턴스화하는 과정을 생략하도록 스케줄링할 수 있다.


또한, 변환에 의해 셔플이 필요해질지를 파악하여 네트워크 통신 부하를 최소한으로 억제할 수도 있다. 이미 설명했듯이 셔플은 여러 파티션에 걸쳐 요소들을 재배치한다. 태스크는 데이터 로드와 일련의 변환/액션의 적용을 파티션 단위로 실행하는 처리 단위인 만큼, 셔플 전후의 RDD를 구성하는 파티션은 각각 다른 태스크로 처리해야 한다.


태스크는 executor에서 처리되므로, 셔플을 실행한다는 것은 곧 executor 간의 다대다 네트워크 통신이 필요해져, 애플리케이션 전체를 통틀어 비교적 부하가 높은 처리가 된다는 의미다. 변환이 셔플을 발행시키지 않을 경우에는 변환 전후의 파티션끼리 동일한 태스크에 포함시킬 수 있으므로 네트워크 통신을 억제할 수 있다. 


파티션에 데이터를 로드하는 태스크부터 차례로 실행되도록 스케줄링함으로써, RDD 생성을 시작으로 일련의 변환/액션 적용까지 클러스터 환경에서 실행할 수 있는 것이다. 태스크가 할당된 executor는 해당 태스크에 포함된 첫 파티션부터 마지막 파티션까지 순서대로 인스턴스화해 처리한다.


스파크에서는 executor에 장애가 발생하는 등의 이유로 태스크 실행에 필패하면 다른 executor로 재실행된다. 이 경우에도 태스크 내용을 바탕으로 파티션을 순서대로 인스턴스화해 처리한다. 스파크는 이것을 recomput라고 한다.


RDD는 immutable한 특징을 가지므로, recompute에 의해 동일한 변환/액션이 적용되더라도 같은 결과를 얻을 수 있다.


immutable: 한번 생성된 인스턴스는 임의로 값을 바꿀 수도 바뀌지도 않기 때문에 참조 전용 객체로서의 특징을 갖게 된다. 한 처리(스테이지)에서 장애가 발생했을 때 해당 스테이지 상에서 사용된 RDD를 다 버리고, 다시 처음부터 동일한 처리를 해도 원래 기대했던 값을 다시 얻을 수 있다. Resilient Distributed Dataset의 Resilent = Fault Tolerant(장애 대응이 뛰어남)를 실현하는 중요한 특성이다. 이 외에도 캐시를 할 때 원래 데이터와의 정합성 문제가 전혀 발생하지 않으므로(원래 데이터는 바뀌지 않으므로 원래 데이터와 캐시는 항상 동일함_ 효율적인 캐시 기능을 실현할 수 있는 특징을 제공하기도 한다. -> 다만, 이를 위해서는 데이터소스 역시 불변이여야 한다.



[RDD 영속화]

executor는 태스크 처리를 끝낼때, 처리 과정에서 생성된 파티션 인스턴스를 영속화하는 경우가 있다. 스파크에서는 RDD가 다음 조건 중 어느 하나라도 만족하면 영속화된다.


셔플이 발생하는 변환을 실행하기 직전의 RDD

셔플을 실행할 때, 변환하기 전의 파티션을 처리하는 executor는 파티션 내용을 로컬 디스크에 저장함으로써 파티션을 영속화한다.


사용자에 의해서 명시적으로 영속화가 선언된 RDD

사용자가 드라이버 프로그램에서 '특정한 RDD를 영속화한다'라고 선언할 수 있는데, 이 선언에 대해 executor는 해당 RDD에 포함된 파티션의 영속화를 시도한다.


만약 RDD의 파티션이 하나라도 영속화 되지 않으면, 해당 RDD 전체가 영속화되지 않은 것으로 간주된다. 애플리케이션에서 RDD를 재사용할 경우에는 해당 태스크를 다시 한번 스케줄링해 클러스터 환경에 다시 데이터를 로드하고 변환해야 하므로 비효율적이다.


한편, RDD의 파티션이 전부 영속화되면 RDD 자체가 영속화된 것으로 간주한다. RDD가 영속화되면, 해당 RDD의 인스터를 다시 만들기 위해 변환 대상 RDD를 인스턴스화 하거나 데이터를 로드하는 등의 처리를 생략할 수 있다.


예를 들면 executor가 고장나서 셔플처리할 때 영속화한 파티션의 일부가 부족해지거나, 명시적인 영속화를 선언했을 때 일부 executor가 영속화에 실패했을 경우, RDD는 부분적으로 영속화한 상태가 된다. 이 경우에는 일부 파티션만 다시 인스턴스화 하도록 태스크가 스케줄링된다. 그리고 해당 파티션의 처리를 포함하는 태스크가 할당된 EXECUTOR는 다시 영속화를 시도한다.


org.apache.spark.storage.StorageLevel의 영속화 레벨: 데이터를 두 개의 머신에 복제하고 싶다면 각 레벨명 뒤에 _2를 붙이면 된다.


MEMORY_ONLY / MEMORY_ONLY_SER / MEMORY_AND_DISK / MEMORY_AND_DISK_SER / DISK_ONLY


처음 액션을 수행하기전에 persist()를 호출하면 된다. persist() 호출은 연산을 강제로 수행하지는 않는다. 만약 메모리에 많은 데이터를 올리려고 시도하면 스파크는 LRU(Least Recently Used) 캐시 정책에 따라 오래된 파티션들을 자동으로 버린다. 디스크와 메모리를 같이 쓰는 영속화 수준에서는 이것들을 디스크에 쓰지만, 메모리만 쓰는 정책에서는 다음에 파티션에 접근할 때 스파크는 그 파티션들을 다시 계산한다. 어느 쪽이든 스파크에 너무 많은 데이터의 캐싱을 요청했다고 해서 무언가 작업에 문제가 생길지를 고민하지 않아도 된다. 하지만 자주 필요하지 않은 데이터를 캐싱한다면 막상 필요한 데이터의 파티션이 메모리에서 내려가 버리고 자주 재계산되는 경우가 발생할 수 있다.


마지막으로, 직접 캐시에서 데이터를 삭제할 수 있도록 RDD들은 unpersist()라는 메소드도 갖고 있다.




RDD는 단순하게는 분산되어 존재하는 데이터 요소들의 모임이다. 스파크에서의 모든 작업은 새로운 RDD를 만들거나, 존재하는 RDD를 변형하거나, 결과 계산을 위해 RDD에서 연산을 호출하는 것 중의 하나로 표현된다. 그리고 내부적으로 스파크는 자동으로 RDD에 있는 데이터들을 클러스터에 분배하며 클러스터 위에서 수행하는 연산들을 병렬화한다.


[RDD 기초]

스파크의 RDD는 분산되어 있는 변경 불가능한 객체 모음이다. 각 RDD는 클러스터의 서로 다른 노드들에서 연산 가능하도록 여러개의 파티션(partition)으로 나뉜다. RDD는 외부 데이터세트를 로드하거나 드라이버 프로그램에서 객체 컬렉션을 분산시키는 두 가지 방법 중의 하나로 만들 수 있다.


[lazy evaluation]

아무때나 새로운 RDD를 정의할 수는 있지만, 스파크는 그것들을 늘 lazy evaluation으로 처음 액션을 사용하는 시점에 처리한다. 이런 방식은 처음 접하면 익숙해지기 어렵겠지만, 빅데이터로 작업하다 보면 이것이 매우 합리적임을 깨닫게 된다.


예를 들어, 텍스트 파일을 정의하고 Python 단어가 들어간 라인을 필터링하는 코드가 있다고 생각하보자. lines = sc.textFile(...)라는 코드를 입력하자마자 스파크가 파일의 모든 라인을 로드해서 저장해 놓았더라면 상당한 스토리지 공간의 낭비일 수 밖에 없으며, 당장 그 수많은 라인을 필터링해야 할 처지가 될 것이다. 대신에 스파크는 한번에 모든 트랜스포메이션끼리의 연계를 파악한다면 결과 도출에 필요한 데이터만을 연산하는 것이 가능하다. 사실 first() 액션에서도 스파크는 처음 일치하는 라인이 나올때까지만 파일을 읽을 뿐 전체 파일을 읽거나 하지는 않는다.


여유로운 수행이란 RDD에 대한 트랜스포메이션을 호출할때 그 연산이 즉시 수행되는 것이 아니라는 뜻이다. 대신 내부적으로 스파크는 메타데이터(metadata)에 이런 연산이 요청되었다는 사실만을 기록한다. 그러므로 RDD가 실제로는 어떤 특정 데이터를 갖고 있는게 아니라 트랜스포메이션들이 생성한 데이터를 어떻게 계산할지에 대한 명령어들을 갖고 있다고 생각하는 것이 구조를 이해하기에 더 쉽다. RDD에 데이터를 로드하는 것도 트랜스포메이션과 마찬가지로 여유롭게 수행된다. 그러므로 sc.textFile()을 호출했을때 실제로 필요한 시점이 되기 전까지는 로딩되지 않는다. 트랜스포메이션처럼 그런 연산들도 여러번 호출될 수있다.


스파크는 ㅇ녀산들을 그룹지어서 데이터를 전달해야 하는 횟수를 줄이기 위해 여유로운 수행 방식을 사용한다. 하둡 맵리듀스 같은 시스템에서는 맵리듀스 데이터 전달 횟수를 줄이기 위해 어떤식으로 연산을 그룹화할지 고민하느라 개발자들이 시간을 많이 뺏긴다.(맵리듀스에서는 연산 개수가 많다는 것은 곧 네트워크로 데이터를 전송하는 단계가 많아짐을 의미한다) 스파크에서는 단순한 연산들을 많이 연결해서 사용하는 것이나 하나의 복잡한 매핑 코드를 쓰는 것이나 큰 차이가 없다. 그러므로 스파크 사용자들은 프로그램을 더 작게 만들고, 효율적인 연산의 코드를 만들어 내야 한다는 부담에서 자유롭다.



[RDD 영속화]

스파크의 RDD들은 기본적으로 액션이 실행될때마다 매번 새로 연산을 한다. 만약 여러 액션에서 RDD 하나를 재사용하고 싶으면 스파크에게 RDD.persist()를 사용하여 계속 결과를 유지하도록 요청할 수 있다. 또한, 메모리/디스크 등에 데이터를 보존해 주도록 스파크에 요청할 수도 있다. 


첫 연산이 이루어진 후 스파크는 RDD의 내용을 메모리에 저장하게 되며(클러스터의 여러 머신들에 나뉘어서), 이후의 액션들에서 재사용한다. 메모리 대신 디스크에 RDD를 저장하는 것도 가능하다. 그렇다면 저장을 아예 하지 않는 기본 동작 방식은 좀 이상해 보일 수도 있겠지만, 대량의 데이터에서는 충분히 의미가 있다. 


RDD를 재사용하지 않기로 한다면 스파크가 일회성 데이터를 가져와 결과만 계산하고 데이터를 저장할 필요가 없는 경우에 굳이 스토리지 공간을 낭비할 이유가 없다.




모든 스파크 프로그램과 셸의 세션은 다음과 같이 동작한다.


1. 외부 데이터에서 입력 RDD를 만든다.

2. filter() 같은 트랜스포메이션을 써서 새로운 RDD를 정의한다.

3. 재사용을 위한 중간 단계의 RDD들을 보존하기 위해 스파크에 persist()를 요청한다.

4. 스파크가 최적화한 병렬 연산 수행을 위해 count()나 first() 같은 액션을 시작한다.



[RDD 생성하기]

RDD를 만드는 가장 간단한 방법은 프로그램에 있는 데이터세트를 가져다가 SparkContext의 parallelize() 메소드를 넘겨주는 것이다. 이 방식은 셸에서 바로 자신만의 RDD를 만들 수 있고 연산을 수행할 수 있으므로 스파크를 배우는 단계에서 특히 유용하다. RDD를 만드는 더욱 일반적인 방법은 외부 스토리지에서 데이터를 불러오는 것이다.

val lines = sc.parallelized(List("pandas", "i like pandas"))
val lines = sc.textFile("/path/to/README.md")




[스파크에 함수 전달하기]

대부분의 트랜스포메이션과 액션 일부는 스파크가 실제로 연산할때 쓰일 함수들을 전달해야 하는 구조를 가진다. 스파크에 함수를 전달하는 구조는 언어마다 약간씩 다르지만, scala를 예제로 살펴보겠다.


스칼라에서는 스칼라의 다른 함수형 API처럼 인라인으로 정의된 함수나 메소드에 대한 참조, 정적 함수를 전달할 수 있다. 몇가지 주의사항이 필요한데 전달하는 함수나 참조하는 데이터들이 직렬화가 가능해야 한다는 점이다. 또한 파이썬에서처럼 self를 필수적으로 쓰지 않으므로 덜 명시적으로 보이긴하지만, 객체의 메소드나 필드를 전달하면 전체 객체에 대한 참조 또한 포함된다.


class SearchFunctions(val query: String) {
	def isMatch(s: String): Boolean = {
		s.contains(query)
	}
	def getMatchesFunctionReference(rdd: RDD[String]): RDD[Boolean] = {
		// 문제: "isMatch"는 "this.isMatch"이므로 this의 모든 것이 전달된다.
		rdd.map(isMatch)
	}
	def getMatchesFieldReference(rdd: RDD[String]): RDD[Array[String]] = {
		// 문제: "query"는 "this.query"이므로 this의 모든것이 전달된다.
		rdd.map(x => x.split(query))
	}
	def getMatchesNoReference(rdd: RDD[String]): RDD[Array[String]] = {
		// 안전함: 필요한 필드만 추출하여 지역 변수에 저장해 전달한다.
		val query_ = this.query
		rdd.map(x => x.split(query_))
	}
}


스칼라에서 NotSerializableException이 발생한다면 직렬화 불가능한 클래스의 메소드나 필드를 참조하는 문제일 가능성이 많다. 최상위 객체의 멤버인 지역 변수나 함수 내에서 전달하는 것은 항상 안전하다는 점을 기억하자.