반응형

jar로 작업된 스파크 프로젝트를 spark-submit 실행시 아래와 같은 에러가 발생했다.

RejectedExecutionException은 첨 겪어봐서 당황했지만....문제의 원인은 단순했다.

java.util.concurrent.RejectedExecutionException: Task org.apache.spark.scheduler.TaskResultGetter$$anon$2@ab4016a rejected from java.util.concurrent.ThreadPoolExecutor@9c5a328[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 603]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)

at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)

프로젝트 빌드시 spark driver셋팅을 잘못해준것...

trait InitSpark {
  // for local
//  val sparkConf = new SparkConf().setAppName("Spark-1.6.x-sample").setMaster("local[*]").set("spark.driver.host", "localhost");
  // for build
  val sparkConf = new SparkConf().setAppName("Spark-1.6.x-sample")
  val sc = new SparkContext(sparkConf)
  val sqlContext = new HiveContext(sc)
}

로컬모드 (setMaster("local[*]").set("spark.driver.host", "localhost") 로 빌드하고 돌려서 문제가 되었다...

보통은 작업한 스파크 버전과 spark-submit을 실행하는 환경의 스파크 버전이 달라도 발생하는 에러메세지라고 한다.

반응형
반응형


스파크(Spark) 작업 중 평소에 잘돌던 잡이 갑자기


특정 노드에서의 문제로 작업이 중단됬거나 제대로 돌지 않았을 경우는 해당 노드의 디스크폴트(disk fault)를 의심해보아야 한다.


최근 잡이 실패해 해당 잡의 로그를 보았더니 다음과 같은 에러가 찍혔다.


Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 1.0 failed 4 times, most recent failure: Lost task 8.3 in stage 1.0 (TID 374, datanode-10.svr.maker.net):

com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream is corrupted

        at com.esotericsoftware.kryo.io.Input.fill(Input.java:142)

        at com.esotericsoftware.kryo.io.Input.require(Input.java:155)

        at com.esotericsoftware.kryo.io.Input.readInt(Input.java:337)

        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)

        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)

        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)

        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:192)

        at org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169)

        at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:201)

        at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:198)

        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)

        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)

        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)

        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)

        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)

        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)

        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)

        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)

        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)

        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)

        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

        at org.apache.spark.scheduler.Task.run(Task.scala:88)

        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)

        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

        at java.lang.Thread.run(Thread.java:748)

Caused by: java.io.IOException: Stream is corrupted

        at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:153)

        at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:117)

        at com.esotericsoftware.kryo.io.Input.fill(Input.java:140)

        ... 27 more


데이터노드에서 com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream is corrupted exception이 발생하며


잡이 정상적으로 돌지 않았다. 그래서 해당 서버를 모니터링 하는 사이트에 들어가서 확인해봤더니 딱히 별다른 이슈가 없었지만 이전에도


상태는 정상이나 disk fault로 비슷한 상황이 발생한 적이 있었기에 시스템팀에 문의해보았더니 역시나 디스크폴트(disk fault)문제였다.


정확히는 하드웨어 폴트는 확인 되지 않고 OS에서만 컨트롤러 및 디스크 장애가 있다고 전달받았다.


따라서 평소에 잘 돌던 작업이 특정노드를 원인으로 돌지않았다면 장비를 의심해보시길!!!

반응형
반응형

스파크(SPARK)가 설치된 서버에서 스파크 버전이 확인 하고 싶을 떄


spark-submit --version


으로 확인할 수 있다.


감사합니당 :)


반응형
반응형


2018년 개발자 라이프 회고 (데이터엔지니어)

앞으로 조금 귀찮고 힘들더라도 개발자로서 한 해를 마무리하는 글과 새해 목표에 대해서 남겨보려고 한다.

크게 전공관련 목표는 네 가지 정도로 세웠던 것 같다.


1. 블로그 꾸준히 운영하기

일년동안 총 56개의 기술포스팅을 진행했다. 목표치에는 부족했지만 꾸준히 쓰려고 노력했다. 예전 포스팅을 너무 잘작성하려는 욕심 때문인지 어느 순간부터 글쓰는데 대한 부담감을 느끼고 한동안 글을 쓰지 않았던 적이 있다. 그 이후로 네이버 블로그에서 티스토리로 넘어오면서는 너무 포스팅을 잘하려고?심도있게 잘 작성해야한다는 압박으로부터 벗어나 간단하게라도 포스팅을 하자라고 생각이 바뀌었다. 포스팅에 대한 부담감을 느끼지 않고 꾸준히 하는것이 중요하다고 생각했기 때문이다. 주로 실무에서 삽질한 경험, 새롭게 알게된 지식, 책 학습을 통한 내용을 포스팅했다. 내년에는 IT기술 및 개발자의 삶 전반에 대한 고찰과 생각들도 글로 써보고 싶다. 

일년동안 3만 명이 넘는 분들이 블로그에 방문해 주셨고 총 4만5천 페이지 뷰가 발생하였다. 아무래도 심도있는 포스팅이 많지 않고 다른 연관관계에 있는 글들이 많지 않아 방문자수에 비해 페이지수가 낮게 집계된 듯 하다. 앞으로는 연관 포스팅에는 링크도 걸고 포스팅의 질도 높여 세션시간과 방문자수 대비 페이지뷰가 더 늘어날 수 있도록 실행해 보아야겠다. 내가 다른 분들의 블로그들을 통해 도움을 받고 지식을 얻듯 다른 분들도 내 블로그를 통해 도움을 받을 수 있다면 좋겠다.


2. 토이프로젝트 운영하고 광고수익 창출하기

실제로 토이프로젝트를 운영해보고 싶다는 생각은 일을 시작하고 2년차쯤부터 계속해서 가지고 있었다. 그 생각이후 2년 후에 실행하게 된데 대해 반성해본다. 지금은 개발자로 일한지 5년차이다. 토이프로젝트로 무엇을 만들어볼까 하다가 2018년 초기 당시 열풍이 불었던 코인정보들을 한데 모아 보여주는 사이트를 운영해보면 재미있을 것 같다는 생각이 들었다. 그렇게 2018년 1월 중순 회사퇴근하고 새벽 2~3시까지 개발을 했고 약 2주 정도에 걸쳐 사이트를 완성하고 오픈하게 되었다. 최대한 페이지 정보를 가리지 않는선에서 광고도 달아보았다. 그렇게 구글 애드센스를 통해 벌어들인 수익은 약 700달러 정도 되었고 중간에 페이지에 배너광고를 달고 싶다는 요청에 30만원을 받고 게재를 해주었다. 

돈의 액수를 떠나 토이프로젝트를 통한 광고 수익이 발생했다는 것에 가장큰 기쁨을 느꼈다. 그리고 사이트를 운영하는 것은 생각보다 더 힘들다는 것과 홍보 및 마케팅 분야에 대한 중요성에 대해서도 느끼게 된 경험이였다. 2018년에는 또 다른 토이프로젝트를 진행해 볼 생각이다.


3. 개발자들을 위한 컨텐츠 제작

외국에는 개발자들을 위한 유머? 컨텐츠들이 많은 것 같은데 국내에서는 많이 보지 못한것 같아 운영해보고 싶다는 생각이 들었다. 그리고 나 자체가 좀 엉뚱한 생각을 많이하기도 하고 내가 괜찮다고 생각이드는 아이디어가 남들에게는 어떻게 반응할지에 대해 궁금하기도 하였다. 인스타 계정 @happydeveloper 을 새로 하나 만들고 현재 계속해서 운영중이다. 욕심 부리지 않고 내 머릿속에 있는 생각들을 조금씩 컨텐츠로 만들어나가도록 해야겠다.


4. Scala, Spark에 대한 심도 있는 학습

사실 제일 아쉬운 부분이 이부분이다ㅎㅎ생각만큼 스칼라공부를 심도 있게 하지 못했고 기존 운영하던 Spark프로젝트를 계속해서 유지보수하고 기능을 추가하였지만 애초 목표였던 java spark -> scala spark으로 프로젝트를 변경해보지 못했다. 일단 이 부분은 업무의 영역과도 관련있기 때문에 내 마음대로 진행하지 못한 점이 크지만 많이 아쉬움으로 남는다. 공부는 끝이 없다....내년에는 scala도 좋지만 원초적인 프로그래밍에 필요한 기본적인 지식들을 좀 더 심도있게 쌓는데 중점을 두고 싶다.


이렇게 2018년도 가고 내일이면 2019년의 시작이다. 2018년 개인적으로 굉장히 다사다난한 일들이 많이 발생했었다. 그럼에도 불구하고 이정도의 실행을 할 수 있었던 건 연초에 목표를 세우고 눈에 보이는 곳에 항상 붙여놓았던 부분이 크다고 생각한다. 2019년 목표도 정리해서 포스팅 할 수 있도록 해야겠다. 

'어디로 가고 있는지 모르고 있다면, 결국 가고 싶지 않은 곳으로 간다.'

긴 글 읽어주셔서 감사합니다.







반응형
반응형


오늘은 스파크의 넓은 종속성(narrow dependency)와 좁은 종속성(wide dependency)에 대해서 포스팅 하도록 하겠습니다.


해당 내용은 '하이 퍼포먼스 스파크'와 '빅데이터 분석을 위한 스파크2 프로그래밍'의 내용을 정리한 것입니다.


[ 좁은 종속성(narrow dependency) ]

개념적으로 좁은 종속성은 자식 RDD의 각 파티션이 부모 RDD의 파티션들에 대해 단순하고 한정적인 종속성을 가지는 것이다. 디자인 시점에 종속성을 결정할 수 있고, 부모 파티션의 값과 상관이 없으며, 각각의 부모가 최대 하나의 자식 파티션을 가진다면 이 종속성은 좁다고 할 수 있다. 특히 좁은 트랜스포메이션의 파티션들은 하나의 부모 파티션에만 종속되거나(map 연산) 디자인 시점에 알게 된 부모 파티션들 중 알려진 일부에만 종속된다(coalesce). 그러므로 좁은 트랜스포메이션은 다른 파티션의 정보를 필요로 하지 않고 데이터의 임의의 부분에 대해 실행이 가능하다.

// 좁은 종속성. rdd에 map 연산으로 (x, 1) 의 튜플로 만든다.

val rdd2 = rdd1.map(x => (x, 1))


[ 넓은 종속성(wide dependency) ]

넓은 종속성을 가지는 트랜스포메이션은 임의의 데이터만으로 실행할 수는 없으며, 특별한 방법, 예를 들면 키의 값에 따라 파티셔닝된 데이터를 요구한다. (결국 키의 재분포, 즉 셔플이 필요하다는 의미). 일례로 sort같은 경우 같은 범위의 키들이 같은 파티션 안에 존재하도록 레코드들을 파티셔닝해야 한다. 넓은 종속성의 트랜스포메이션은 sort, reduceByKey, groupByKey, join 그리고 rePartition 함수를 호출하는 모든 것을 아우른다.

// 넓은 종속성, groupKey

val rdd3 = rdd2.groupByKey()


특별한 경우로는 스파크가 이미 데이터가 어떤 특정한 방법으로 파티션되어 있다는 것을 갈고 있다면 넓은 종속성의 연산이라도 셔플링을 하지 않는다. 대개 셔플링은 비싼 비용을 치러야 하고 셔플 과정에서 새로운 파티션으로 옮겨야 하는 데이터의 비율이 높을수록 이 비용은 더 비싸지게 마련이다. 


정리하자면, 하나의 RDD가 새로운 RDD로 변환될 때 기존 RDD를 부모 RDD, 새로운 RDD를 자식 RDD라고 하겠습니다. 이때 부모 RDD를 구성하는 파티션이 여러 개의 자식 RDD 파티션과 관계를 맺고 있으면 넓은 의존성을 갖고 있다고 말하고, 그 반대의 경우는 좁은 의존성을 갖고 있다고 표현할 수 있겠습니다.





포스팅을 마치도록 하겠습니다.


도움이 되셨다면 광고 한 번 클릭해주시는 센스 감사합니다:)

반응형
반응형



이번 포스팅은 저번 포스팅(스파크 설정 Part.1)에 이어 spark-submit 실행시 메모리, 익스큐터, 네트워크, 보안,암호화 관련 설정에 대해 정리해보겠습니다. 해당 내용은 '빅데이터 분석을 위한 스파크2 프로그래밍' 책의 내용을 기반으로 정리하였습니다.


[ 메모리 관련 설정 ]

  • spark.memory.fraction : 전체 힙 영역에서 익스큐터와 RDD 데이터 저장에 사용될 크기를 비율로 설정합니다. 기본값은 0.6이며 스파크 내부에서 사용하는 메타데이터나 객체 직렬화 및 역질렬화 등에 필요한 예비 메모리 공간을 확보해서 OOM을 방지할 목적으로 이 값을 조정할 수 있습니다.
  • spark.memory.storageFraction : 할당된 메모리에서 데이터 저장에 사용할 비율을 지정할 수 있습니다. 기본값은 0.5이며 이 값을 크게 할 경우 익스큐터에서 사용할 메모리 크기를 줄여야 합니다.
  • spark.memory.offHeap.enabled : 기본값은 false이며 true로 설정할 경우 off-heap메모리를 사용합니다. 이 값을 true로 설정했다면 spark.memory.offHeap.size에 오프-힙 메모리 크기를 지정해야 합니다.

[ 익스큐터 관련 설정 ]
  • spark.executor.cores : 익스큐터에 할당된 코어의 수를 지정합니다. 지정하지 않을 경우 얀 모드에서는 1, 스탠드얼론 모드와 메소스 coarse-grained모드에서는 사용 가능한 전체 코어의 개수가 사용됩니다.
  • spark.default.parallelism : 스파크에서 사용할 파티션의 수, 즉 스파크의 기본 병렬 처리 수준을 지정합니다.
  • spark.files.fetchTimeout : sparkContext.addFile() 메서드를 이용했을 때 드라이버로부터 파일을 받아오는 데 걸리는 최대 시간을 설정합니다. 기본값은 60s 입니다.

[ 네트워크 관련 설정 ]
  • spark.driver.host, spark.driver.port : 드라이버 프로세스의 호스트와 포트 정보를 설정합니다.
  • spark.network.timeout : 스파크의 기본 네트워크 타임아웃을 설정합니다. 이 값은 spark.core.connection.ack.wait.timeout 등 다른 설정 값들의 기본값으로 사용됩니다.

[ 보안 관련 설정 ]
  • spark.acls.enable : 스파크 acl을 활성화할지 여부를 설정합니다. 기본값은 false입니다.
  • spark.admin.acls : 스파크 잡에 접근할 수 있는 사용자(user)와 관리자(administrator) 정보를 설정하며, 콤마(,)를 이용해 다수의 사용자를 지정할 수 있습니다. 만약 그룹으로 설정할 경우 spark.admin.acls, groups 속성을 사용할 수 있습니다.
  • spark.authenticate : 스파크에서 사용자 인증 여부를 확인할 것인지를 설정합니다. 기본 값은 false이며, 이 경우 인증 여부와 상관없이 스파크 잡을 실행하고 접근할 수 있습니다.
  • spark.authenticate.secret : 잡을 실행하기 위한 비밀 키 정보를 설정합니다.
  • spark.ui.view.acls,spark.ui.view.acls.groups : 스파크 UI에서 잡 정보를 조회하기 위한 acl 정보를 설정합니다.
  • spark.ui.filters : 스파크 UI에 적용할 자바 서블릿 필터 정보를 지정합니다. 콤마(,)를 이용해 여러 개의 필터를 지정할 수 있으며, 자바 시스템 프로퍼티를 사용해 필터에서 사용할 파라미터 정보를 지정할 수 있습니다. 

[ 암호화 관련 설정 ]
  • spark.ssl.enabled : 기본값은 false이며 SSL 연결을 활성화할 것인지 설정합니다.
  • spark.ssl.keyStore : 키 스토어 파일이 저장된 경로를 지정합니다.
  • spark.ssl.keyStoreType : 키 스토어 파일의 타입을 지정합니다.
  • spark.ssl.keyStorePassword : 키 스토어 파일에 대한 비밀번호를 지정합니다.
  • spark.ssl.enabledAlgorithms : ssl을 위한 알고리즘(cipher) 리스트를 지정합니다. 콤마(,)를 이용해 여러 개 지정할 수 있습니다.

보안, 암호화 관련 설정은 거의 작업해 본적이 없는 것 같네요...보통 사용하는 하둡 클러스터 장비들이 사내 네트워크망에서만 접근 가능하도록 되어있어서ㅎㅎ

이상으로 포스팅을 마치도록 하겠습니다.


반응형
반응형

이번 포스팅에서는 spark-submit 실행시 스크립트상에서 설정할 수 있는 방법에 대해 정리하도록 하겠습니다.


해당 내용은 '빅데이터 분석을 위한 스파크2 프로그래밍' 책의 내용을 기반으로 정리하였습니다.


[ 애플리케이션 관련 설정 ]

  • spark.app.name : 애플리케이션 이름. SparkConf의 appName으로 설정하는 것과 같은 속성
  • spark.driver.cores : 드라이버가 사용할 코어 수. 클러스터 모드에서만 사용 가능하며 기본값은 1입니다.
  • spark.driver.maxResultSize : collect() 메서드 등의 호출 결과로 생성된 결과 값의 최대 크기입니다. 최소 1M 이상으로 지정해야 하며, 이 값을 초과할 경우 전체 잡은 실패로 종료됩니다. 기본값은 1g입니다.
  • spark.driver.memory : 드라이버가 사용할 메모리 크기이며, 클라이언트 모드에서 사용할 경우 반드시 SparkConf가 아닌 --driver-memory 실행 옵션이나 프로퍼티 파일을 사용해서 지정해야 합니다. 기본값은 1g입니다.
  • spark.executor.memory : 익스큐터 하나의 메모리 크기를 지정합니다. 기본값은 1g입니다.
  • spark.local.dir : RDD 데이터를 디스크에 저장하거나 셔플 시 매퍼의 결과를 저장하는 디렉터리를 지정합니다. 콤마(,)를 이용해 여러 위치를 지정할 수 있으며, 성능에 큰 영향을 주므로 반드시 빠른 로컬 디스크를 사용해야 합니다. 기본값은 /tmp 입니다.
  • spark.master : 클러스터 매니저 정보를 지정합니다.
  • spark.submit.deployMode : 디플로이 모드를 지정합니다. client 또는 cluster 모드를 사용할 수 있습니다.

[ 실행환경(Runtime Enviroment) 관련 설정 ]
  • spark.driver.extraClassPath : 드라이버 클래스패스에 추가할 항목을 지정합니다. 이 속성은 SparkConf가 아닌 --driver-memory 실행 옵션이나 프로퍼티 파일을 사용해서 지정해야 합니다. 유사한 속성으로 spark.driver.extraJavaOptions, spark.driver.extraLibraryPath가 있으며 각각 드라이버 실행 시 필요한 자바 옵션과 라이브러리 정보를 지정하는 용도로 사용됩니다.
  • spark.executor.extraClassPath : 익스큐터의 클래스패스에 추가할 항목을 지정합니다. 유사한 속성으로 spark.executor.extraJavaOptions와 spark.executor.extraLibraryPath가 있습니다.
  • spark.files, spark.jars : 각 익스큐터의 실행 디렉터리에 위치할 파일들 또는 jar 파일들을 지정하며, 콤마(,)를 이용해 여러 파일을 지정할 수 있습니다.
  • spark.submit.pyFiles : PYTHONPATH에 추가될 .zip, .egg, .py 파일을 지정하며, 콤마(,)를 이용해 여러 파일을 지정할 수 있습니다.
  • spark.jars.packages : 익스큐터와 드라이버의 클래스패스에 추가될 의존성 jar정보를 메이븐 코디네이트 형식으로 지정 할 수 있습니다.

[ 셔플 관련 설정 ] 
  • spark.reducer.maxSizeInFlight : 셔플 수행 시 각 리듀서가 매퍼의 실행 결과를 읽어갈 때 사용할 버퍼의 크기를 지정합니다. 기본값은 48m입니다.
  • spark.reducer.maxReqslnFlight : 리듀서에서 매퍼의 결과를 가져갈 때 동시에 수행 가능한 최대 요청 수를 지정합니다. 기본값은 int.MaxValue입니다.
  • spark.shuffle.compress : 맵의 결과를 압축할 것인지에 대한 설정입니다. true로 설정할 경우 spark.io.compress.codec에 지정한 압축 코덱을 사용해 압축합니다.
  • spark.shuffle.service.enabled : 외부 셔플 서비스를 사용할 것인지 여부를 지정합니다. 이와 관련된 내용은 이후의 동적 자원 할당 부분에서 다시 확인해 보겠습니다. 기본값은 false이며 true로 설정할 경우 외부 셔플 서비스를 사용하게 됩니다.

[ 스파크 UI 관련 설정 ] 
  • spark.eventLog.enabled : 스파크 이벤트 관련 로깅을 수행할 것인지를 설정합니다. 기본 값은 false이며 true로 설정할 경우 spark.eventLog.dir에 로깅을 수행할 경로를 지정해야 합니다. 이벤트 로깅을 활성화할 경우 종료된 애플리케이션에 대한 상세 실행 히스토리 정보를 스파크 UI에서 확인할 수 있습니다. 
  • spark.ui.port : 스파크 UI 포트를 지정합니다. 기본값은 4040입니다.
  • spark.ui.killEnabled : 스파크 UI를 통해 잡을 중지(kill)시킬 수 있도록 할 것인지 설정합니다. 기본값은 true입니다.
  • spark.ui.retainedJob : 종료된 잡에 대한 정보를 몇 개까지 유지할 것인지 설정합니다. 유사한 옵션으로 spark.ui.retainedStages, spark.ui.retainedTasks, spark.ui.retainedExecutors, spark.ui.retainedDrivers, spark.ui.retainedBatches 등이 있습니다.

[ 압축 및 직렬화(Serialization) 관련 설정 ]
  • spark.broadcast.compress : 브로드캐스트 변수의 값을 압축할 것인지 설정합니다. 기본값은 true입니다.
  • spark.io.compression.codec : 브로드캐스트 변수나 셔플을 위한 중간 결과물 등 스파크 내부에서 사용하는 데이터를 압축할 때 사용할 압축 코덱을 지정합니다. l4z, lzf, snappy를 사용할 수 있으며 기본값은 lz4입니다.
  • spark.kyro.classesToRegister : Kyro 직렬화를 위해 등록할 커스텀 클래스 정보를 지정합니다. 만약 클래스 등록 방식을 좀 더 커스텀하게 진행하고자 한다면 spark.kyro.registrator를 사용할 수 있습니다.
  • spark.serializer : 스파크에서 사용할 객체 직렬화 방식을 설정합니다. org.apache.spark.Serializer의 하위 클래스를 지정할 수 있으며, 현재 스파크에서는 JavaSerializer와 KyroSerializer라는 두 클래스를 제공하고 있습니다. 


다음 메모리 관련 설정, 익스큐터 관련 설정, 네트워크 관련 설정, 보안 관련 설정, 암호화 관련 설정은 다음 포스팅에서 하도록 하겠습니다.


도움이 되셨다면 광고도 한 번 클릭해주시는 센스^_^

반응형
반응형

해당 내용은 '빅데이터 분석을 위한 스파크2 프로그래밍' 책의 내용을 정리한 것입니다.


실제로 실무에서 스파크로 작업된 결과를 hdfs에 남기기전에 coalesce명령어를 써서 저장되는 파일의 개수를 지정해주곤 했다.


업무에서 사용하긴 했지만 실제 repartition연산과 어떤점이 다른지 모르고 사용했었는데 책을 보며 알게되어 기록.


핵심은 셔플을 하느냐 안하느냐!!!


coalesce와 repartition

RDD를 생성한 뒤 filter()연산을 비롯한 다양한 트랜스포메이션 연산을 수행하다 보면 최초에 설정된 파티션 개수가 적합하지 않은 경우가 발생할 수 있다.

이 경우 coalesce()나 repartition()연산을 사용해 현재의 RDD의 파티션 개수를 조정할 수 있다.


두 메서드는 모두 파티션의 크기를 나타내는 정수를 인자로 받아서 파티션의 수를 조정한다는 점에서 공통점이 있지만 repartition()이 파티션 수를 늘리거나 줄이는 것을 모두 할 수 있는 반면 coalesce()는 줄이는 것만 가능하다!!!


이렇게 모든 것이 가능한 repartition()메서드가 있음에도 coalesce()메서드를 따로 두는 이유는 바로 처리 방식에 따른 성능 차이 때문이다. 즉, repartition()은 셔플을 기반으로 동작을 수행하는 데 반해 coalesce()는 강제로 셔플을 수행하라는 옵션을 지정하지 않는 한 셔플을 사용하지 않기 때문이다. 따라서 데이터 필터링 등의 작업으로 데이터 수가 줄어들어 파티션의 수를 줄이고자 할 때는 상대적으로 성능이 좋은 coalesce()를 사용하고, 파티션 수를 늘여야 하는 경우에만 repartition() 메서드를 사용하는 것이 좋다.


오우.....이런 중요한 차이점이 있었다니....그렇다면 coalesce를 사용하면 셔플을 발생시키지 않기때문에 파티션마다 데이터의 사이즈가 다를꺼고 hdfs write했을때 repartition으로 개수를 조정한것과는 다르게 사이즈가 뒤죽박죽이겠네?!!! (나중에 시간되면 테스트해보자)


[ 업데이트 내용 ] 

댓글에서 관련내용에 대해 적어주신분이 있어 확인할겸 관련 내용 업데이트 합니다.


실제 repartition내부는 coalesce메소드를 호출하는 형태로 되어있습니다.


coalesce내부 소스코드도 올려봅니다.

소스코드의 주석을 보면 'This results in a narrow dependency' 좁은 의존성을 초래한다고 적혀 있는데 관련해서는 따로 포스팅하도록 하겠습니다.

그리고 위에서는 coalesce는 파티션 수를 줄이는 것만 가능하다고 적어놨지만 'true'옵션을 주면 늘리는 것 또한 가능하네요.

하지만 기존 처리하던 partitions의 개수보다 많은 파티션수로 처리할 경우에는 반드시 shuffle옵션을 true로 주셔야합니다(매개변수로 넘겨주면됨)


이상으로 포스팅을 마치도록 하겠습니다.


도움이 되셨다면 광고도 한 번 클릭해주시는 센스^_^

반응형
반응형

'빅데이터 분석을 위한 스파크2 프로그래밍'책의 내용을 정리한 포스팅입니다.



RDD란?

스파크가 사용하는 핵심 데이터 모델로서 다수의 서버에 걸쳐 분산 방식으로 저장된 데이터 요소들의 집합을 의미하며, 병렬처리가 가능하고 장애가 발생할 경우에도 스스로 복구될 수 있는 내성을 가지고 있다. 즉, RDD란 스파크에서 정의한 분산 데이터 모델인데 내부에는 단위 데이터를 포함하고 있고 저장할 때는 여러 서버에 나누어 저장되며, 처리할 때는 각 서버에 저장된 데이터를 동시에 병렬로 처리할 수 있는 모델이다. 


RDD장점

데이터를 여러 서버에 나누어 저장하고, 처리하는 과정에서 일부 서버 혹은 데이터에 문제가 발생하더라도 스스로 에러를 복구할 수 있는 능력을 가지고 있는 데이터 모델이다.


RDD처리 방식

RDD에 속한 요소들은 파티션이라고 하는 더 작은 단위로 나눠질 수 있는데, 스파크는 작업을 수행할 때 바로 이 파티션 단위로 나눠서 병렬로 처리를 수행한다. 이렇게 만들어진 파티션은 작업이 진행되는 과정에서 재구성되거나 네트워크를 통해 다른 서버로 이동하는, 이른바 셔플링이 발생할 수 있다. 

이런 셔플링은 전체 작업 성능에 큰 영향을 주기 때문에 주의해서 다뤄야 하며, 스파크에서는 셔플링이 발생할 수 있는 주요 연산마다 파티션의 개수를 직접 지정할 수 있는 옵션을 제공한다.  (파티션의 수는 곧 데이터 처리에 참여하는 병렬 프로세스의 수이다. 즉, 하나의 데이터를 잘게 쪼개어 여러 개의 파티션을 만들면 여러 프로세스에서 동시에 작업을 처리해서 처리 속도가 증가할 수 있지만 이 정도가 지나치면 오히려 전체 성능을 떨어뜨리는 요인이 된다.


스파크의 장애시 RDD복구

하나의 RDD가 여러 파티션으로 나눠져 다수의 서버에서 처리되다 보니 작업 도중 일부 파티션에 장애가 발생해서 데이터가 유실될 수 있는데, 스파크는 손상된 RDD를 원래 상태로 다시 복원하기 위해 RDD의 생성 과정을 기록해 뒀다가 다시 복구해 주는 기능을 가지고 있다. RDD의  resilient라는 단어가 복구 능력을 의미하는데, 좀 더 정확하게 말하면 RDD에 포함된 데이터를 저장해 두는 것이 아니고 RDD를 생성하는 데 사용했던 작업 내용을 기억하고 있는 것이다. 그래서 문제가 발생하면 전체 작업을 처음부터 다시 실행하는 대신 문제가 발생한 RDD를 생성했던 작업만 다시 수행해서 복구를 수행한다.

정리하면, 스파크는 RDD가 생성되어 변경되는 모든 과정을 일일이 기억하는 대신 RDD를 한번 생성하면 변경되지 않는 읽기 전용 모델로 만든 후 RDD 생성과 관련된 내용만 기억하고 있다가 장애가 발생하면 이전에 RDD를 만들 때 수행했던 작업을 똑같이 실행해 데이터를 복구하는 방식을 사용한다. 이처럼 스파크에서 RDD 생성 작업을 기록해 두는 것을 리니지(linege)라고 한다.


도움이 되셨다면 광고도 한 번 클릭해주시는 센스^_^


반응형

+ Recent posts