반응형


스파크(Spark) 스트리밍 성능(Processing Time) 개선

실시간 관심사 타겟팅 애플리케이션에서 더 많은 관심사 모수를 뽑아도록 작업한 로직을 배포하게되었다.

실시간 스파크 어플리케이션을 배포할 때 가장 유의해야 하는 것은 추가된 기능으로 인해 마이크로배치형태로 처리되는 작업들의 

수행시간이 현저히 길어지지 않았는가 모니터링하고 체크하는 것이다.

내가 이번에 추가한 로직도 성능에 영향을 미칠 것이라고 판단되어 배포전 실시간 스트리밍 처리 현황과 배포 후를 비교해보았다.


[ 새로운 기능이 추가되기 전 ] 


[ 새로운 기능이 추가된 후 ]


새로운 기능이 추가되기 전과 후의 차이를 보게되면 Processing Time이 더 길어짐에 따라서 Total Delay가 길어지는 것을 확인해 볼 수 있었다.

이 어플리케이션은 1분 단위의 마이크로배치로 돌아가기 때문에 배치간 작업은 1분 이내에 마무리가 되어야 그 다음 배치에 

영향을 미치지 않는다. 따라서 해당 어플리케이션을 튜닝해야하는 이슈가 있었고 어떻게 성능을 개선하면 좋을지 생각을 하다가 

기존 executor가 5개로 동작을 하고 있었는데 클러스터의 가용 vcore개수가 많음에도 불구하고 너무나도 적은 개수의 

executor만을 사용하여 실시간 작업을 처리하고 있었다.

이에 spark-submit 실행시 executor number개수를 5개에서 10개로 늘려주고 확인해보았다.


[ 성능 개선 후 (executor 개수(5->10) 조절) ]


조절 후 이미지를 보면 알겠지만 Processing Time이 반으로 줄어들면서 Total Delay부분도 줄어들었고 Scheduling Delay가 발생하지 

않는 것을 확인할 수 있었다. 어떻게 보면 되게 단순하게 어플리케이션의 성능을 개선했지만 스파크의 동작방식에 대한 이해와 작동되는 

클러스터에 대한 이해없이는 쉽게 나올 수 없는 해결책이었다고 생각이든다. 


그럼 executor 개수를 더 늘려주면 더 좋을까? 라고 누군가 묻는다면 불필요하게 executor의 개수만 무작정 늘린다고 해서 좋다고 

말할 수는 없다. 어플리케이션에 따라 프로세스를 처리하는데 적절한 executor의 개수가 존재한다고 생각이 들고 

스트리밍 어플리케이션은 그 기준은 Scheduling Delay가 발생하지 않는 선에서의 개수를 맞추어주면 된다. 

불필요하게 많은 executor는 너무 적은 데이터를 처리하게 되고 클러스터 전체로 보았을 때는 불필요한 오버헤드가 많이 발생할 수 있다. 

또한 executor들간 broadcast시에도 더 많은 네트워크I/O를 초래할 수 있기 때문에 무작정 executor를 많이 설정해 사용하는 것이 

좋은 방법만은 아니다.


또한 한가지 팁은 executor의 개수를 spark-submit시 명시적으로 주어 활용한다면 

--conf에 spark.yarn.max.executor.failures 해당 옵션을 주어서 사용하도록 하자.

spark.yarn.max.executor.failuresnumExecutors * 2, with minimum of 3The maximum number of executor failures before failing the application.

ex ) executor 개수를 5개로 할당했으면 --conf "spark.yarn.max.executor.failures=10"으로 주어 설정해주도록 하자.


Worker만 무작정 늘린다고 Performance가 향상되는 것은 아니다^^


반응형
반응형

[ 하둡 MR보다 스파크(SPARK)를 사용할 때 장점 ]

하둡MR보다 스파크(SPARK)를 사용했을 때의 대부분이 말하는 이점은 디스크 처리 기반에서 메모리 처리 기반으로 넘어오면서 연산처리 속도가 빨라졌다는 것이다.

뭐 틀린말은 아니지만 데이터 엔지니어 입장에서 뭔가 더 구체적으로 설명할 수 있어야 하지 않을까 하는 생각에 간단히 정리해 포스팅해본다.

1. 스파크(SPARK)의 연산 방식은 lazy evaluation으로 수행된다. 

Lazy evaluation(굳이 번역해 보자면 느긋한 연산 정도 되겠다)을 사용함으로써 action이 시작되는 시점에 트랜스포메이션(transformation)끼리의 연계를 파악해 실행 계획의 최적가 가능해진다. 사용자가 입력한 변환 연산들을 즉시 수행하지 않고 모아뒀다가 가장 최적의 수행 방법을 찾아 처리하는 장점을 가진다.
여기서 말하는 최적화란 대부분 지역성(locality)에 관한 것이다. 예를 들어 물건을 사오는 심부름을 시킬 때 A상점에서 파는 물건과 B상점에서 파는 물건을 따로따로 여러 번사오게 하는 것보다 필요한 물건을 한꺼번에 주문해서 한 번 방문했을 때 필요한 물건을 한 번에 사는 것이 효율적이기 떄문이다.

사실 fist() 액션에서도 스파크는 처음 일치하는 라인이 나올 때까지만 파일을 읽을 뿐 전체 파일을 읽거나 하지 않는다.

실제로 하둡 맵리듀스 같은 시스템에서는 맵리듀스의 데이터 전달 회수를 줄이기 위해 어떤 식으로 연산을 그룹화 할지 고민하느라 개발자들이 시간을 많이 빼앗기게 된다. 맵리듀스에서 연산 개수가 많다는 것은 곧 네트워크로 데이터를 전송하는 단계가 많아짐을 의미하고 그만큼 클러스터에 부하를 가져다 줄 수 있다. 스파크(SPARK)에서는 단순한 연산 들을 많이 연결해서 사용하는 것이나 하나의 복잡한 매핑 코드를 쓰는 것이나 큰 차이가 없는데 기본적으로 스파크에서 효율적인 계획을 세워서 수행하기 때문이다. 그렇다고 해서 rdd재사용 등을 고려하지 않고 아무렇게나 프로그래밍을 해도 된다는 의미는 아니다. 따라서 스파크 사용자들은 프로그램을 더 작게 만들고, 효율적인 연산의 코드를 만들어 내야 한다는 부담에서 좀 더 자유로울 수 있다.


2. RDD 재사용을 위한 캐싱 기능
기본적으로 메모리위에 캐싱을 하여 처리를 하게 되면 디스크 처리 기반의 MR작업보다 최소 10~20배 이상 빠를 수 밖에 없다. 여러 액션에서RDD 하나를 재사용하고 싶으면 RDD.persist()를 사용하여 계속 결과를 유지하도록 할 수 있다. 첫 연산이 이루어진 후 스파크는 RDD의 내용을 메모리에 저장하게 되며(클러스터의 여러 머신들에 나뉘어서) 이후의 액션들에서 재사용할 수 있게 된다.


3. RDD는 유연한 연산 방식을 제공한다.
분산 데이터로서의 RDD(Resilient Distributed Datasets)는 문자 그대로 해석하면 "회복력을 가진 분산 데이터 집합"으로  데이터를 처리하는 과정에서 집합을 이루고 있던 데이터의 일부에 문제가 생겨도 스스로 알아서 복구할 수 있다는 의미이다.  실제로 이것은 스파크(SPARK)가 RDD를 만들어 내는 방법을 기억하고 있기 때문에 가능한 것으로 스파크는 데이터의 일부가 유실되면 어딘가에 백업해둔 데이터를 다시 불러오는 것이 아니고 데이터를 다시 만들어내는 방식으로 복구를 수행하게 됩니다.


4. 코드 간결성 및 Interactive shell
하둡 MR을 해보신 분은 알겠지만 단어들을 aggregate하는 하둡 MR소스코드는 맵과 리듀스를 만들어주어야 하기 때문에 길고 복잡할 수 밖에 없는 반면에 스파크는 람다기반의 함수형 프로그래밍 기법으로코드가 매우 간단하며, interactive shell을 사용하여 실제 쉘에서 실시간으로 데이터 변화를 확인할 수 있다는 장점을 가지고 있습니다.


실제로 하둡 MR 대안으로 SQL을 MapReduce로 변환해주는 Hive 프로젝트가 있어 많은 사람이 잘 사용하고 있지만, 쿼리를 최적화하기가 어렵고 속도가 더 느려지는 경우가 많다는 어려움이 있다. 스파크는 이러한 단점들을 보안하며 위와 같은 장점들로 인해 분산 처리 툴로서 많은 관심과 사랑?을 받고 있다고 볼 수 있습니다.


반응형
반응형

하둡1.0과 하둡2.0의 차이는 YARN으로 인해 많은 부분이 변화되었다.

그 차이에 대해서 알아보도록 하자.

[ 아키텍처의 변화 ]


[ 하둡 1.0과 2.0에서 리소스 관리 차이 ]

하둡 1.0에서 맵리듀스를 실행할 때는 슬롯 단위로 맵/리듀스 태스크 갯수를 관리했다.따라서 맵퍼는 모두 동작하는데 리듀서는 놀고 있거나 반대의 경우로 인해 클러스터 전체 사용률이 낮았다.

하지만 하둡 2.0에서 YARN(얀)이 도입되면서 슬롯이 아닌 컨테이너 단위로 리소스를 할당하게 되었다. 얀의 리소스 매니저는 전체 클러스터의 리소스 정보를 토대로 할당 가능한 컨테이너 개수를 계산하며, 맵리듀스는 필요한 컨테이너들을 할당 받아서 맵리듀스 태스크를 실행하게 된다. 

이 때 컨테이너 개수와 맵과 리듀스 태스크의 관계는 1:1의 관계가 아니며, 맵과 리듀스 태스크는 상황에 따라서 하나 이상의 컨테이너를 실행할 수도 있다. 그래서 관리자는 전체 클러스터의 리소스 상황과 얀에서 실행하는 잡들의 워크로드를 고려하여 리소스 설정을 진행해야 한다.


[ YARN의 도입으로 JobTracker의 역할이 Resource Manager와 Application Master로 분리 ] 

하둡 1.0에서는 JobTracker(잡트래커)가 클러스터 리소스 관리 및 어플리케이션 스케쥴링 등을 모두 담당했었다.

하지만 하둡 2.0에서는 클러스터마다 Application Master(어플리케이션 마스터)가 존재하고 각 서버마다 Node Manager(노드 매니저)가 할당되어 있고 리소스관리는 Resource Manager(리소스 매니저)가 어플리케이션 수행 및 스케쥴링 관리는 Application Master(어플리케이션 마스터)로 역할이 분리되어 운영된다.


[ Spark 등 분산처리 환경 지원 ]

하둡의 맵/리듀스 작업보다 성능이 훨씬 개선된 SPARK 및 분산처리 프레임워크를 사용할 수 있게 되었다. 스파크는 배치 처리 작업에 있어서 맵리듀스보다 10배정도 빠르며 인메모리 분석에서 100배나 빠르다고 알려져 있다.




반응형
반응형

스파크 작업을 하다보면 데이터 처리시 쿼리 기반의 spark sql, hive를 이용하기위해 orc로 데이터를 적재하는 경우가 많다.


이 때 spark-shell로 orc파일을 읽어들여 데이터를 보게되면 컬럼보다 데이터내용이 길게되면 잘려서 노출된다.


따라서 데이터 내용을 보고싶다면 dataframe형태의 데이터를 rdd로 만들어서 first나 rdd.take(n).foreach(println)식으로 보도록 하자



1. orc 파일 읽기

val data = sqlContext.read.format("orc").load("hdfs file directory")          //spark1.5,6 version


2. 읽어들인 데이터(dataframe형태)를 rdd로 변경하기

val rdd = data.rdd


3. rdd로 변경한 데이터 보기

rdd.first

rdd.take(n).foreach(println)         //n은 보고싶은 라인 개수




반응형
반응형

Spark 직렬화 포맷

spark는 네트워크로 데이터를 전송하거나 디스크에 쓸 때 객체들을 직렬화해 바이너리 포맷으로 변환한다.

기본적으로 Java에 내장된 직렬화를 이용하지만 spark는 java 직렬화보다 훨씬 향상된 서드파티 라이브러리인

kryo를 쓰는 것을 지원한다.

반응형
반응형

Spark scala.reflect.api.JavaUniverse.runtimeMirror 에러


Exception in thread "main" java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;

        at com.payco.dmp.orc.converter.ClickerScala$.main(ClickerScala.scala:36)

        at com.payco.dmp.orc.converter.ClickerScala.main(ClickerScala.scala)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

        at java.lang.reflect.Method.invoke(Method.java:498)

        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)

        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)

        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)

        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)

        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



spark-submit으로 spark 작업 실행 다음과 같은 에러 메세지가 떨어졌다면 build.sbt scala버전과 spark library 버전이


서버에서 사용중인 spark 버전과 맞는지 확인하기 바란다.


나의 경우는 spark2.1.1 에서 동작하는 scala spark프로젝트 내부 scalaVersion 2.10.5에서 2.11.5 변경하여 해결하였다.



반응형
반응형

Scala 버전만 작성하였다.


Java, Python, R에 대해서도 정보가 필요하다면 글 하단의 참조 링크를 참고 바란다.


SparkConf( Spark 1.6 / Spark 2.x)

You will continue to use these classes (via the sparkContext accessor) to perform operations that require the Spark Core API, such as working with accumulators, broadcast variables, or low-level RDDs. However, you will not need to manually create it.

// Spark 1.6
val sparkConf = new SparkConf().setMaster("local[*]")
sparkConf.set("spark.files", "file.txt")
 
// Spark 2.x
val spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.files", "file.txt")


SparkContext( Spark 1.6 / Spark 2.x)

The SQLContext is completely superceded by SparkSession. Most Dataset and DataFrame operations are directly available in SparkSession. Operations related to table and database metadata are now encapsulated in a Catalog (via the catalog accessor).

// Spark 1.6
val sparkConf = new SparkConf()
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json("data.json")
val tables = sqlContext.tables()
 
// Spark 2.x
val spark = SparkSession.builder.getOrCreate()
val df = spark.read.json("data.json")
val tables = spark.catalog.listTables()


HiveContext( Spark 1.6 / Spark 2.x)

The HiveContext is completely superceded by SparkSession. You will need enable Hive support when you create your SparkSession and include the necessary Hive library dependencies in your classpath.

// Spark 1.6
val sparkConf = new SparkConf()
val sc = new SparkContext(sparkConf)
val hiveContext = new HiveContext(sc)
val df = hiveContext.sql("SELECT * FROM hiveTable")
 
// Spark 2.x
val spark = SparkSession.builder.enableHiveSupport().getOrCreate()
val df = spark.sql("SELECT * FROM hiveTable")


ref : https://sparkour.urizone.net/recipes/understanding-sparksession/

반응형
반응형

데이터 엔지니어로 살아가기 149일째(spark2.1.1


오늘 문득 spark2.1.1버전에서 java8 람다식을 활용한 데이터 처리 속도와 


spark1.5(현재 클러스터에서 사용버전)에서 java7을 통한 spark 데이터 처리 속도가 얼마나 나는지 궁금해졌다.


덤으로 scala spark이 동일한 데이터 처리 프로세스를 가지고 spark1.5, spark2.1.1버전에서 돌았을 때 성능도 궁금해져


간단히 70만 라인의 텍스트 파일로 파싱하고 필터링 처리를 하고 text파일과 orc파일로 변환해 적재하는 테스트를 진행하였다.


아직 scala를 spark2.1.1에서 테스트하는 부분은 확인하진 못했지만 확실히 spark2.1.1대 버전에서 java8 람다식을 활용해 처리하는 것


이 제일 빨랐다. 이부분은 좀더 테스트 후 포스팅 하도록 하겠다.


그리고 오전에는 딥러닝에 대한 세미나를 들었다. 세미나 발표를 통해 큰 깨달음이나 감명을 받진 못했지만


꾸준히 관심을 가지고 학습해 추후에 지금 처리하고 있는 데이터들에도 적용할 수 있는 방법을 연구해보면 좋을 것 같다.


아 그리고 틈틈히 scala공부도 하도록하자!


반응형
반응형

오늘 스파크 작업중 No TypeTag available for 에러가 발생하였다. 


밑에 소스코드 부분에서 val df = sQLContext.createDataFrmae(test_rdd3) 부분에서 발생하였고 


case class의 위치를 메소드의 바깥 부분으로 빼내어서 처리하였다. 


원인에 대해 참고할 만한 stackoverflow 내용이 있어 참고하였다.

https://stackoverflow.com/questions/29143756/scala-spark-app-with-no-typetag-available-error-in-def-main-style-app





[ 에러 발생 코드 ] 




[ 에러 해결 ]

case class를 main method 바깥으로 빼서 처리





뭔가 컴파일 시점에 case class를 정의하는 시점과 case class를 통한 spark의 action이 일어나는 시점과 연관이 있을 것 같

은데 자세한 원인에 대해서는 파악하지 못했다ㅠ 혹시 조언해주실분 있으면 감사하겠습니다. 

스칼라 공부를 좀 더 심도있게해야 할듯 싶다.


반응형

+ Recent posts