반응형


[ Spark ] JavaRDD로 saveAsTextFile했는데 데이터가 정상적으로 나오지 않는다???


Spark(스파크) 작업을 하고나서 결과물을 hadoop(하둡)에 쓰고 싶을 경우 보통 saveAsTextFile 메소드를 사용하게 된다.


그런데 적재를 하고 하둡 명령어로 해당 파일을 열어봤는데 파일의 내용이 정상적으로 안써지고 dataframe을 RDD로 변경할 때 사용한


모델 패키지명으로 노출되는 경우가 있다. 다음과 같이...


처음 스파크로 작업을 했을 때 적잖이 당황했던 기억이나서 포스팅한다.


코드를 보게되면


다음과 같은 식으로 integratedDF라는 변수명의 dataframe을 JavaRDD로 변환하는데 매핑 모델로 AdidBidPairingModel을 쓰고 있다.


해당 모델을 보면 다음과 같다.


스파크에서 특정 클래스파일로 데이터를 쓸 때 해당 클래스의 toString메소드를 참고해 데이터를 쓰게되는데?(내추측) 


해당 모델 클래스에는 toString메소드가 없다...


보통 일반적인 스프링프로젝트에서는 lombok을 써서 알아서 생성해주지만 일반 자바 프로젝트에서는 toString메소드를 작성해주어야 한다.




다음과 같이 AdidBidPairingModel 끝부분에 toString메소드를 정의해주고 다시 spark-submit을 해보면 ~



정상적인형태로 데이터가 나오는 것을 확인할 수 있다!!!!


처음 스파크프로젝트로 작업을 하시는 분들은 꽤나 헤맬 수 있다...나또한 나중에 또 헤맬 수 있으므로 오랜만에 spark작업하는 김에 기록해본다!!!


반응형
반응형


스파크(Spark)에서 OutOfMemoryError:Java heap space 다음과 같은 에러 메세지가 발생했다면


스파크 어플리케이션 내부에서 다음과 같은 메서드를 사용하지 않았는지 확인해볼 필요가 있다.

- collect()

- countByKey()

- countByValue()

- collectAsMap()



사실 이전에 collect() 메서드 사용시 주의점을 포스팅하긴했었다...

http://brocess.tistory.com/56  - [Spark collect 연산시 주의사항]



해당 메서드들을 rdd에 사용하게 되면 각각의 executor 노드들에서 돌고있는 rdd element들을 driver로 copy하려고 한다.


따라서 RDD의 사이즈가 크다면 driver에서 해당 RDD들을 저장할 메모리가 부족하게 되고 이에 OOME(OutOfMemory)가 발생할 수 있다.



이번에 나도 무심결에 RDD를 collectAsMap()해버려 위와 같은 에러가 발생하였다...


단순히 RDD를 map형태로 만들어 놓고 다른 RDD연산을 위해 사용하려고 했던 생각해서 저지른 실수였다...


물론 충분한 driver memory를 spark-submit시 할당해 주면 되겠지만 아무래도 클러스터 내에 해당 어플리케이션만 동작하는게 아니기 때문에 


좋지 않은 방법이다.


따라서 사이즈가 큰 데이터를 map형태로 이용하고 싶다면 Storage(RDBMS, NOSQL)을 사용하기를 권한다.



반응형
반응형


스파크(Spark) 스트리밍 성능(Processing Time) 개선

실시간 관심사 타겟팅 애플리케이션에서 더 많은 관심사 모수를 뽑아도록 작업한 로직을 배포하게되었다.

실시간 스파크 어플리케이션을 배포할 때 가장 유의해야 하는 것은 추가된 기능으로 인해 마이크로배치형태로 처리되는 작업들의 

수행시간이 현저히 길어지지 않았는가 모니터링하고 체크하는 것이다.

내가 이번에 추가한 로직도 성능에 영향을 미칠 것이라고 판단되어 배포전 실시간 스트리밍 처리 현황과 배포 후를 비교해보았다.


[ 새로운 기능이 추가되기 전 ] 


[ 새로운 기능이 추가된 후 ]


새로운 기능이 추가되기 전과 후의 차이를 보게되면 Processing Time이 더 길어짐에 따라서 Total Delay가 길어지는 것을 확인해 볼 수 있었다.

이 어플리케이션은 1분 단위의 마이크로배치로 돌아가기 때문에 배치간 작업은 1분 이내에 마무리가 되어야 그 다음 배치에 

영향을 미치지 않는다. 따라서 해당 어플리케이션을 튜닝해야하는 이슈가 있었고 어떻게 성능을 개선하면 좋을지 생각을 하다가 

기존 executor가 5개로 동작을 하고 있었는데 클러스터의 가용 vcore개수가 많음에도 불구하고 너무나도 적은 개수의 

executor만을 사용하여 실시간 작업을 처리하고 있었다.

이에 spark-submit 실행시 executor number개수를 5개에서 10개로 늘려주고 확인해보았다.


[ 성능 개선 후 (executor 개수(5->10) 조절) ]


조절 후 이미지를 보면 알겠지만 Processing Time이 반으로 줄어들면서 Total Delay부분도 줄어들었고 Scheduling Delay가 발생하지 

않는 것을 확인할 수 있었다. 어떻게 보면 되게 단순하게 어플리케이션의 성능을 개선했지만 스파크의 동작방식에 대한 이해와 작동되는 

클러스터에 대한 이해없이는 쉽게 나올 수 없는 해결책이었다고 생각이든다. 


그럼 executor 개수를 더 늘려주면 더 좋을까? 라고 누군가 묻는다면 불필요하게 executor의 개수만 무작정 늘린다고 해서 좋다고 

말할 수는 없다. 어플리케이션에 따라 프로세스를 처리하는데 적절한 executor의 개수가 존재한다고 생각이 들고 

스트리밍 어플리케이션은 그 기준은 Scheduling Delay가 발생하지 않는 선에서의 개수를 맞추어주면 된다. 

불필요하게 많은 executor는 너무 적은 데이터를 처리하게 되고 클러스터 전체로 보았을 때는 불필요한 오버헤드가 많이 발생할 수 있다. 

또한 executor들간 broadcast시에도 더 많은 네트워크I/O를 초래할 수 있기 때문에 무작정 executor를 많이 설정해 사용하는 것이 

좋은 방법만은 아니다.


또한 한가지 팁은 executor의 개수를 spark-submit시 명시적으로 주어 활용한다면 

--conf에 spark.yarn.max.executor.failures 해당 옵션을 주어서 사용하도록 하자.

spark.yarn.max.executor.failuresnumExecutors * 2, with minimum of 3The maximum number of executor failures before failing the application.

ex ) executor 개수를 5개로 할당했으면 --conf "spark.yarn.max.executor.failures=10"으로 주어 설정해주도록 하자.


Worker만 무작정 늘린다고 Performance가 향상되는 것은 아니다^^


반응형
반응형

[ 하둡 MR보다 스파크(SPARK)를 사용할 때 장점 ]

하둡MR보다 스파크(SPARK)를 사용했을 때의 대부분이 말하는 이점은 디스크 처리 기반에서 메모리 처리 기반으로 넘어오면서 연산처리 속도가 빨라졌다는 것이다.

뭐 틀린말은 아니지만 데이터 엔지니어 입장에서 뭔가 더 구체적으로 설명할 수 있어야 하지 않을까 하는 생각에 간단히 정리해 포스팅해본다.

1. 스파크(SPARK)의 연산 방식은 lazy evaluation으로 수행된다. 

Lazy evaluation(굳이 번역해 보자면 느긋한 연산 정도 되겠다)을 사용함으로써 action이 시작되는 시점에 트랜스포메이션(transformation)끼리의 연계를 파악해 실행 계획의 최적가 가능해진다. 사용자가 입력한 변환 연산들을 즉시 수행하지 않고 모아뒀다가 가장 최적의 수행 방법을 찾아 처리하는 장점을 가진다.
여기서 말하는 최적화란 대부분 지역성(locality)에 관한 것이다. 예를 들어 물건을 사오는 심부름을 시킬 때 A상점에서 파는 물건과 B상점에서 파는 물건을 따로따로 여러 번사오게 하는 것보다 필요한 물건을 한꺼번에 주문해서 한 번 방문했을 때 필요한 물건을 한 번에 사는 것이 효율적이기 떄문이다.

사실 fist() 액션에서도 스파크는 처음 일치하는 라인이 나올 때까지만 파일을 읽을 뿐 전체 파일을 읽거나 하지 않는다.

실제로 하둡 맵리듀스 같은 시스템에서는 맵리듀스의 데이터 전달 회수를 줄이기 위해 어떤 식으로 연산을 그룹화 할지 고민하느라 개발자들이 시간을 많이 빼앗기게 된다. 맵리듀스에서 연산 개수가 많다는 것은 곧 네트워크로 데이터를 전송하는 단계가 많아짐을 의미하고 그만큼 클러스터에 부하를 가져다 줄 수 있다. 스파크(SPARK)에서는 단순한 연산 들을 많이 연결해서 사용하는 것이나 하나의 복잡한 매핑 코드를 쓰는 것이나 큰 차이가 없는데 기본적으로 스파크에서 효율적인 계획을 세워서 수행하기 때문이다. 그렇다고 해서 rdd재사용 등을 고려하지 않고 아무렇게나 프로그래밍을 해도 된다는 의미는 아니다. 따라서 스파크 사용자들은 프로그램을 더 작게 만들고, 효율적인 연산의 코드를 만들어 내야 한다는 부담에서 좀 더 자유로울 수 있다.


2. RDD 재사용을 위한 캐싱 기능
기본적으로 메모리위에 캐싱을 하여 처리를 하게 되면 디스크 처리 기반의 MR작업보다 최소 10~20배 이상 빠를 수 밖에 없다. 여러 액션에서RDD 하나를 재사용하고 싶으면 RDD.persist()를 사용하여 계속 결과를 유지하도록 할 수 있다. 첫 연산이 이루어진 후 스파크는 RDD의 내용을 메모리에 저장하게 되며(클러스터의 여러 머신들에 나뉘어서) 이후의 액션들에서 재사용할 수 있게 된다.


3. RDD는 유연한 연산 방식을 제공한다.
분산 데이터로서의 RDD(Resilient Distributed Datasets)는 문자 그대로 해석하면 "회복력을 가진 분산 데이터 집합"으로  데이터를 처리하는 과정에서 집합을 이루고 있던 데이터의 일부에 문제가 생겨도 스스로 알아서 복구할 수 있다는 의미이다.  실제로 이것은 스파크(SPARK)가 RDD를 만들어 내는 방법을 기억하고 있기 때문에 가능한 것으로 스파크는 데이터의 일부가 유실되면 어딘가에 백업해둔 데이터를 다시 불러오는 것이 아니고 데이터를 다시 만들어내는 방식으로 복구를 수행하게 됩니다.


4. 코드 간결성 및 Interactive shell
하둡 MR을 해보신 분은 알겠지만 단어들을 aggregate하는 하둡 MR소스코드는 맵과 리듀스를 만들어주어야 하기 때문에 길고 복잡할 수 밖에 없는 반면에 스파크는 람다기반의 함수형 프로그래밍 기법으로코드가 매우 간단하며, interactive shell을 사용하여 실제 쉘에서 실시간으로 데이터 변화를 확인할 수 있다는 장점을 가지고 있습니다.


실제로 하둡 MR 대안으로 SQL을 MapReduce로 변환해주는 Hive 프로젝트가 있어 많은 사람이 잘 사용하고 있지만, 쿼리를 최적화하기가 어렵고 속도가 더 느려지는 경우가 많다는 어려움이 있다. 스파크는 이러한 단점들을 보안하며 위와 같은 장점들로 인해 분산 처리 툴로서 많은 관심과 사랑?을 받고 있다고 볼 수 있습니다.


반응형
반응형

스파크 작업을 하다보면 데이터 처리시 쿼리 기반의 spark sql, hive를 이용하기위해 orc로 데이터를 적재하는 경우가 많다.


이 때 spark-shell로 orc파일을 읽어들여 데이터를 보게되면 컬럼보다 데이터내용이 길게되면 잘려서 노출된다.


따라서 데이터 내용을 보고싶다면 dataframe형태의 데이터를 rdd로 만들어서 first나 rdd.take(n).foreach(println)식으로 보도록 하자



1. orc 파일 읽기

val data = sqlContext.read.format("orc").load("hdfs file directory")          //spark1.5,6 version


2. 읽어들인 데이터(dataframe형태)를 rdd로 변경하기

val rdd = data.rdd


3. rdd로 변경한 데이터 보기

rdd.first

rdd.take(n).foreach(println)         //n은 보고싶은 라인 개수




반응형
반응형

Spark 직렬화 포맷

spark는 네트워크로 데이터를 전송하거나 디스크에 쓸 때 객체들을 직렬화해 바이너리 포맷으로 변환한다.

기본적으로 Java에 내장된 직렬화를 이용하지만 spark는 java 직렬화보다 훨씬 향상된 서드파티 라이브러리인

kryo를 쓰는 것을 지원한다.

반응형
반응형

Spark scala.reflect.api.JavaUniverse.runtimeMirror 에러


Exception in thread "main" java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;

        at com.payco.dmp.orc.converter.ClickerScala$.main(ClickerScala.scala:36)

        at com.payco.dmp.orc.converter.ClickerScala.main(ClickerScala.scala)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

        at java.lang.reflect.Method.invoke(Method.java:498)

        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)

        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)

        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)

        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)

        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



spark-submit으로 spark 작업 실행 다음과 같은 에러 메세지가 떨어졌다면 build.sbt scala버전과 spark library 버전이


서버에서 사용중인 spark 버전과 맞는지 확인하기 바란다.


나의 경우는 spark2.1.1 에서 동작하는 scala spark프로젝트 내부 scalaVersion 2.10.5에서 2.11.5 변경하여 해결하였다.



반응형
반응형

Scala 버전만 작성하였다.


Java, Python, R에 대해서도 정보가 필요하다면 글 하단의 참조 링크를 참고 바란다.


SparkConf( Spark 1.6 / Spark 2.x)

You will continue to use these classes (via the sparkContext accessor) to perform operations that require the Spark Core API, such as working with accumulators, broadcast variables, or low-level RDDs. However, you will not need to manually create it.

// Spark 1.6
val sparkConf = new SparkConf().setMaster("local[*]")
sparkConf.set("spark.files", "file.txt")
 
// Spark 2.x
val spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.files", "file.txt")


SparkContext( Spark 1.6 / Spark 2.x)

The SQLContext is completely superceded by SparkSession. Most Dataset and DataFrame operations are directly available in SparkSession. Operations related to table and database metadata are now encapsulated in a Catalog (via the catalog accessor).

// Spark 1.6
val sparkConf = new SparkConf()
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json("data.json")
val tables = sqlContext.tables()
 
// Spark 2.x
val spark = SparkSession.builder.getOrCreate()
val df = spark.read.json("data.json")
val tables = spark.catalog.listTables()


HiveContext( Spark 1.6 / Spark 2.x)

The HiveContext is completely superceded by SparkSession. You will need enable Hive support when you create your SparkSession and include the necessary Hive library dependencies in your classpath.

// Spark 1.6
val sparkConf = new SparkConf()
val sc = new SparkContext(sparkConf)
val hiveContext = new HiveContext(sc)
val df = hiveContext.sql("SELECT * FROM hiveTable")
 
// Spark 2.x
val spark = SparkSession.builder.enableHiveSupport().getOrCreate()
val df = spark.sql("SELECT * FROM hiveTable")


ref : https://sparkour.urizone.net/recipes/understanding-sparksession/

반응형
반응형

오늘 스파크 작업중 No TypeTag available for 에러가 발생하였다. 


밑에 소스코드 부분에서 val df = sQLContext.createDataFrmae(test_rdd3) 부분에서 발생하였고 


case class의 위치를 메소드의 바깥 부분으로 빼내어서 처리하였다. 


원인에 대해 참고할 만한 stackoverflow 내용이 있어 참고하였다.

https://stackoverflow.com/questions/29143756/scala-spark-app-with-no-typetag-available-error-in-def-main-style-app





[ 에러 발생 코드 ] 




[ 에러 해결 ]

case class를 main method 바깥으로 빼서 처리





뭔가 컴파일 시점에 case class를 정의하는 시점과 case class를 통한 spark의 action이 일어나는 시점과 연관이 있을 것 같

은데 자세한 원인에 대해서는 파악하지 못했다ㅠ 혹시 조언해주실분 있으면 감사하겠습니다. 

스칼라 공부를 좀 더 심도있게해야 할듯 싶다.


반응형

+ Recent posts