반응형

jar로 작업된 스파크 프로젝트를 spark-submit 실행시 아래와 같은 에러가 발생했다.

RejectedExecutionException은 첨 겪어봐서 당황했지만....문제의 원인은 단순했다.

java.util.concurrent.RejectedExecutionException: Task org.apache.spark.scheduler.TaskResultGetter$$anon$2@ab4016a rejected from java.util.concurrent.ThreadPoolExecutor@9c5a328[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 603]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)

at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)

프로젝트 빌드시 spark driver셋팅을 잘못해준것...

trait InitSpark {
  // for local
//  val sparkConf = new SparkConf().setAppName("Spark-1.6.x-sample").setMaster("local[*]").set("spark.driver.host", "localhost");
  // for build
  val sparkConf = new SparkConf().setAppName("Spark-1.6.x-sample")
  val sc = new SparkContext(sparkConf)
  val sqlContext = new HiveContext(sc)
}

로컬모드 (setMaster("local[*]").set("spark.driver.host", "localhost") 로 빌드하고 돌려서 문제가 되었다...

보통은 작업한 스파크 버전과 spark-submit을 실행하는 환경의 스파크 버전이 달라도 발생하는 에러메세지라고 한다.

반응형
반응형

최근 사이즈가 큰 대략 두 데이터 각각 2TB (2000GB) 정도의 데이터의 ID값들을 JOIN시켜 얼마나 일치하는지 확인하는 작업을 진행하였다. 간만에 들어온 Adhoc요청에 요구사항 파악이 먼저였고 "사이즈가 커봤자 스파크가 executor개수와 core개수 excetuor memory설정만 맞춰주면 잘 돌리면 되겠다."라고 생각했었다.

 

하지만 사이즈가 크다보니 실제 운영 클러스터에서 spark-shell에 접속해 command를 날리는건 클러스터의 실제 운영되는 작업에 영향을 줄 수 있기에 일정 부분의 데이터들을 떼와 로컬 Spark 모듈 프로젝트를 통해 원하는대로 파싱하고 조인해서 결과값이 나오는지 먼저 확인하였다. 

 

두 데이터 각각 2TB들을 가공해 뽑아야 하는 조건은 15가지 정도 되었고 나는 해당 코드작업을 하고 jar파일로 말아 특정 리소스풀을 사용하는 조건과 스파크 설정값들을 알맞게 설정해 spark-submit을 실행할 예정이었다.

 

작업에 대해 설명하기 전에 클러스터 규모에 대해 간단히 언급하자면 실제 서비스를 운영하는 클러스터는 아니였고

Data Lake로 사용되는 데이터노드 5대로 이루어진 규모가 그렇게 크지 않은 클러스터였다.

가용 가능 최대 메모리는 300GB, VCore 개수는 130개 정도 규모

 

데이터  추출의 첫 번째 조건

특정 action을 했던 ID값들을 뽑아 distinct하고 counting하는 작업이었다.

물론 데이터사이즈가 크긴했지만 단순한 작업이였기에 그렇게 오래는 걸리지 않을거라고 예상했다 (20분 이내 예상)

 

실제 Spark-submit을 수행하였고 각 파일들을 읽어들여 카운팅 작업이 수행되는 것을 Spark에서 제공하는 Application UI을 통해 어떤 작업 현재 진행중인지 executor들을 잘 할당되어 일을 하고 있는지를 모니터링 하였다.

근데 이게 왠걸??? Couning을 하는 단계에서 실제 action이 발생했고 (Spark는 lazy연산이 기본) 시간이 너무 오래걸리는 걸 발견하였다...한 시간 이상이 지나도 끝나지 않은 것으로....기억한다.

출처 : https://www.dmitory.com/index.php?mid=issue&page=2&document_srl=69970864

 

따라서 이대로는 안되겠다 싶어 일단 요구사항들 중 한번에 같이 처리할 수 있는 그룹 세개로 나누고 기존 데이터를 가지고 처리할게 아니라 특정조건으로 filterling이 된 실제 필요한 데이터들만 가지고 작업을 해야겠다고 생각했다.

 

그래서 일단 원하는 조건으로 filterling을 한 후 해당 데이터들만 hdfs에 다시 적재했다.

 

실제 처리에 사용될 데이터 사이즈가 2~4G로 훅 줄었고 (다른 값들을 다 버리고 조건에 맞는 실제 ID값만 뽑아냈기 때문) 이제 돌리면 되겠다 하고 생각하고 돌렸다.

 

그런데...데이터 사이즈가 2~4G밖에 안되는 데이터들간 Join을 하고 Counting을 하는 작업이 무슨 20~30분이나 걸린단 말인가?....(1~3분 이내를 예상했음) 

 

그래서 이전에 비슷한 상황에서 실제 데이터들의 파티션이 데이터 사이즈에 비해 너무 많이 나누어져 있어서 Spark가 구동되며 execution plan을 세울 때 많은 task들이 발생되어 성능이 떨어졌던 기억이 있어 필터링 한 데이터들의 파티션 개수를 세어 보았더니...

 

사이즈가 2.6G가 밖에 안되는 실제 사용될 데이터의 파일의 개수가....각각 14393개와 14887개였다.....

 

이러니 task들이 몇 만개씩 생기지......😱

수행되어야 할 task 개수가 44878개....

 

 

해결방법

따라서 아 필터링된 아이디값의 데이터들을 hdfs로 쓸 때 개수를 줄여 쓰는 것 부터 다시 해야겠다라고 판단하고 rdd를 save하기전 coalesce(20)을 주어 14393개로 나뉘어진 파티션들을 20개의 파티션으로 나뉘어 쓰이도록 수정해 주었다.

 

그리고 ID값을 기준으로 두 데이터를 Join이후 distinct하고 counting하는 작업도 굉장히 늦었기에 join이후에 repartition으로 파티션의 개수를 줄여주고 counting을 하도록 수행해주었다.(실제 ID값으로 Join을 하게 되면 데이터가 줄어들기 때문에 많은 파티션을 유지할 필요가 없다.) coalesce와 repartition은 둘다 파티션의 개수를 조절해주는 메서드인데 차이가 궁금하신 분들은 이전 포스팅을 참고 바랍니다.

https://brocess.tistory.com/183?category=715036

 

[ Spark ] 스파크 coalesce와 repartition

해당 내용은 '빅데이터 분석을 위한 스파크2 프로그래밍' 책의 내용을 정리한 것입니다. 실제로 실무에서 스파크로 작업된 결과를 hdfs에 남기기전에 coalesce명령어를 써서 저장되는 파일의 개수를 지정해주곤 했..

brocess.tistory.com

 

이렇게 수정을 하고 수행하였더니 task개수도 확 줄어들고 수행시간도 5~7분이내에 다 끝나는 것을 확인할 수 있었다.

task들의 개수가 몇만개에서 몇 십 몇 백개 단위로 확 줄어든 것을 확인 할 수 있다.
작업 시간도 1~2시간 걸려도 안끝나던데 훅~줄었다.

 

Spark 데이터 처리 작업시 고려사항

간만에 데이터 추출작업을 하다보니 미처 늦게 인지한 부분도 없지 않아 있었다. 이렇게 사이즈가 큰 데이터를 처리할 때는 항상 파티션의 개수를 잘 조절해서 처리하길 바란다. 그리고 스파크 성능에 영향을 미치는 부분에는 많은 부분이 있지만 기본적으로 데이터들의 파티션과 spark를 수행할 때 기본적인 executor개수와 core의 개수 executor 메모리 설정만 적합하게 되도(이 적합하게가 힘듬...) 큰 문제 없이 사용할 수 있을 거라 생각한다.

반응형
반응형



이번 포스팅은 저번 포스팅(스파크 설정 Part.1)에 이어 spark-submit 실행시 메모리, 익스큐터, 네트워크, 보안,암호화 관련 설정에 대해 정리해보겠습니다. 해당 내용은 '빅데이터 분석을 위한 스파크2 프로그래밍' 책의 내용을 기반으로 정리하였습니다.


[ 메모리 관련 설정 ]

  • spark.memory.fraction : 전체 힙 영역에서 익스큐터와 RDD 데이터 저장에 사용될 크기를 비율로 설정합니다. 기본값은 0.6이며 스파크 내부에서 사용하는 메타데이터나 객체 직렬화 및 역질렬화 등에 필요한 예비 메모리 공간을 확보해서 OOM을 방지할 목적으로 이 값을 조정할 수 있습니다.
  • spark.memory.storageFraction : 할당된 메모리에서 데이터 저장에 사용할 비율을 지정할 수 있습니다. 기본값은 0.5이며 이 값을 크게 할 경우 익스큐터에서 사용할 메모리 크기를 줄여야 합니다.
  • spark.memory.offHeap.enabled : 기본값은 false이며 true로 설정할 경우 off-heap메모리를 사용합니다. 이 값을 true로 설정했다면 spark.memory.offHeap.size에 오프-힙 메모리 크기를 지정해야 합니다.

[ 익스큐터 관련 설정 ]
  • spark.executor.cores : 익스큐터에 할당된 코어의 수를 지정합니다. 지정하지 않을 경우 얀 모드에서는 1, 스탠드얼론 모드와 메소스 coarse-grained모드에서는 사용 가능한 전체 코어의 개수가 사용됩니다.
  • spark.default.parallelism : 스파크에서 사용할 파티션의 수, 즉 스파크의 기본 병렬 처리 수준을 지정합니다.
  • spark.files.fetchTimeout : sparkContext.addFile() 메서드를 이용했을 때 드라이버로부터 파일을 받아오는 데 걸리는 최대 시간을 설정합니다. 기본값은 60s 입니다.

[ 네트워크 관련 설정 ]
  • spark.driver.host, spark.driver.port : 드라이버 프로세스의 호스트와 포트 정보를 설정합니다.
  • spark.network.timeout : 스파크의 기본 네트워크 타임아웃을 설정합니다. 이 값은 spark.core.connection.ack.wait.timeout 등 다른 설정 값들의 기본값으로 사용됩니다.

[ 보안 관련 설정 ]
  • spark.acls.enable : 스파크 acl을 활성화할지 여부를 설정합니다. 기본값은 false입니다.
  • spark.admin.acls : 스파크 잡에 접근할 수 있는 사용자(user)와 관리자(administrator) 정보를 설정하며, 콤마(,)를 이용해 다수의 사용자를 지정할 수 있습니다. 만약 그룹으로 설정할 경우 spark.admin.acls, groups 속성을 사용할 수 있습니다.
  • spark.authenticate : 스파크에서 사용자 인증 여부를 확인할 것인지를 설정합니다. 기본 값은 false이며, 이 경우 인증 여부와 상관없이 스파크 잡을 실행하고 접근할 수 있습니다.
  • spark.authenticate.secret : 잡을 실행하기 위한 비밀 키 정보를 설정합니다.
  • spark.ui.view.acls,spark.ui.view.acls.groups : 스파크 UI에서 잡 정보를 조회하기 위한 acl 정보를 설정합니다.
  • spark.ui.filters : 스파크 UI에 적용할 자바 서블릿 필터 정보를 지정합니다. 콤마(,)를 이용해 여러 개의 필터를 지정할 수 있으며, 자바 시스템 프로퍼티를 사용해 필터에서 사용할 파라미터 정보를 지정할 수 있습니다. 

[ 암호화 관련 설정 ]
  • spark.ssl.enabled : 기본값은 false이며 SSL 연결을 활성화할 것인지 설정합니다.
  • spark.ssl.keyStore : 키 스토어 파일이 저장된 경로를 지정합니다.
  • spark.ssl.keyStoreType : 키 스토어 파일의 타입을 지정합니다.
  • spark.ssl.keyStorePassword : 키 스토어 파일에 대한 비밀번호를 지정합니다.
  • spark.ssl.enabledAlgorithms : ssl을 위한 알고리즘(cipher) 리스트를 지정합니다. 콤마(,)를 이용해 여러 개 지정할 수 있습니다.

보안, 암호화 관련 설정은 거의 작업해 본적이 없는 것 같네요...보통 사용하는 하둡 클러스터 장비들이 사내 네트워크망에서만 접근 가능하도록 되어있어서ㅎㅎ

이상으로 포스팅을 마치도록 하겠습니다.


반응형
반응형


현상황  : Cloudera(클라우데라) 버전(CDH 5.5.1, Parcel), Spark버전(1.5) - jdk version 1.7

필요상황 : 기존 작업을 Spark1.5(jdk1.7) - jdk 1.8로 돌리기

준비상황 :  클러스터의 각 노드들에 jdk1.8이 설치되어 있어야 함.


spark-submit스크립트에 jdk1.8 path를 명시

--conf "spark.yarn.appMasterEnv.JAVA_HOME=/home1/irteam/jdk/jdk1.8.0_141" \
--conf "spark.driverEnv.JAVA_HOME=/home1/irteam/jdk/jdk1.8.0_141" \
--conf "spark.executorEnv.JAVA_HOME=/home1/irteam/jdk/jdk1.8.0_141" \

이렇게 driver와 executor의 JAVA_HOME은 명시가 되었고

해당 스크립트가 돌아가는 client의 JAVA_HOME은 export로 변경해준다.


기존 단순 spark-submit명령어에서 앞에 다음과 같이 추가

export JAVA_HOME=/home1/irteam/jdk/jdk1.8.0_141 && spark-submit \


이렇게 설정을 해주게 되면 jdk1.8로 빌드된 코드도 기존 spark로 돌릴 수 있게 된다.


이런식의 설정으로 클라우데라 스파크 버전 또한 2점대로 높여 사용할 수 있겠다.


반응형
반응형


스파크(Spark) 스트리밍 성능(Processing Time) 개선

실시간 관심사 타겟팅 애플리케이션에서 더 많은 관심사 모수를 뽑아도록 작업한 로직을 배포하게되었다.

실시간 스파크 어플리케이션을 배포할 때 가장 유의해야 하는 것은 추가된 기능으로 인해 마이크로배치형태로 처리되는 작업들의 

수행시간이 현저히 길어지지 않았는가 모니터링하고 체크하는 것이다.

내가 이번에 추가한 로직도 성능에 영향을 미칠 것이라고 판단되어 배포전 실시간 스트리밍 처리 현황과 배포 후를 비교해보았다.


[ 새로운 기능이 추가되기 전 ] 


[ 새로운 기능이 추가된 후 ]


새로운 기능이 추가되기 전과 후의 차이를 보게되면 Processing Time이 더 길어짐에 따라서 Total Delay가 길어지는 것을 확인해 볼 수 있었다.

이 어플리케이션은 1분 단위의 마이크로배치로 돌아가기 때문에 배치간 작업은 1분 이내에 마무리가 되어야 그 다음 배치에 

영향을 미치지 않는다. 따라서 해당 어플리케이션을 튜닝해야하는 이슈가 있었고 어떻게 성능을 개선하면 좋을지 생각을 하다가 

기존 executor가 5개로 동작을 하고 있었는데 클러스터의 가용 vcore개수가 많음에도 불구하고 너무나도 적은 개수의 

executor만을 사용하여 실시간 작업을 처리하고 있었다.

이에 spark-submit 실행시 executor number개수를 5개에서 10개로 늘려주고 확인해보았다.


[ 성능 개선 후 (executor 개수(5->10) 조절) ]


조절 후 이미지를 보면 알겠지만 Processing Time이 반으로 줄어들면서 Total Delay부분도 줄어들었고 Scheduling Delay가 발생하지 

않는 것을 확인할 수 있었다. 어떻게 보면 되게 단순하게 어플리케이션의 성능을 개선했지만 스파크의 동작방식에 대한 이해와 작동되는 

클러스터에 대한 이해없이는 쉽게 나올 수 없는 해결책이었다고 생각이든다. 


그럼 executor 개수를 더 늘려주면 더 좋을까? 라고 누군가 묻는다면 불필요하게 executor의 개수만 무작정 늘린다고 해서 좋다고 

말할 수는 없다. 어플리케이션에 따라 프로세스를 처리하는데 적절한 executor의 개수가 존재한다고 생각이 들고 

스트리밍 어플리케이션은 그 기준은 Scheduling Delay가 발생하지 않는 선에서의 개수를 맞추어주면 된다. 

불필요하게 많은 executor는 너무 적은 데이터를 처리하게 되고 클러스터 전체로 보았을 때는 불필요한 오버헤드가 많이 발생할 수 있다. 

또한 executor들간 broadcast시에도 더 많은 네트워크I/O를 초래할 수 있기 때문에 무작정 executor를 많이 설정해 사용하는 것이 

좋은 방법만은 아니다.


또한 한가지 팁은 executor의 개수를 spark-submit시 명시적으로 주어 활용한다면 

--conf에 spark.yarn.max.executor.failures 해당 옵션을 주어서 사용하도록 하자.

spark.yarn.max.executor.failuresnumExecutors * 2, with minimum of 3The maximum number of executor failures before failing the application.

ex ) executor 개수를 5개로 할당했으면 --conf "spark.yarn.max.executor.failures=10"으로 주어 설정해주도록 하자.


Worker만 무작정 늘린다고 Performance가 향상되는 것은 아니다^^


반응형
반응형

Spark scala.reflect.api.JavaUniverse.runtimeMirror 에러


Exception in thread "main" java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;

        at com.payco.dmp.orc.converter.ClickerScala$.main(ClickerScala.scala:36)

        at com.payco.dmp.orc.converter.ClickerScala.main(ClickerScala.scala)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

        at java.lang.reflect.Method.invoke(Method.java:498)

        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)

        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)

        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)

        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)

        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



spark-submit으로 spark 작업 실행 다음과 같은 에러 메세지가 떨어졌다면 build.sbt scala버전과 spark library 버전이


서버에서 사용중인 spark 버전과 맞는지 확인하기 바란다.


나의 경우는 spark2.1.1 에서 동작하는 scala spark프로젝트 내부 scalaVersion 2.10.5에서 2.11.5 변경하여 해결하였다.



반응형

+ Recent posts