반응형

'빅데이터 분석을 위한 스파크2 프로그래밍'책의 내용을 정리한 포스팅입니다.



RDD란?

스파크가 사용하는 핵심 데이터 모델로서 다수의 서버에 걸쳐 분산 방식으로 저장된 데이터 요소들의 집합을 의미하며, 병렬처리가 가능하고 장애가 발생할 경우에도 스스로 복구될 수 있는 내성을 가지고 있다. 즉, RDD란 스파크에서 정의한 분산 데이터 모델인데 내부에는 단위 데이터를 포함하고 있고 저장할 때는 여러 서버에 나누어 저장되며, 처리할 때는 각 서버에 저장된 데이터를 동시에 병렬로 처리할 수 있는 모델이다. 


RDD장점

데이터를 여러 서버에 나누어 저장하고, 처리하는 과정에서 일부 서버 혹은 데이터에 문제가 발생하더라도 스스로 에러를 복구할 수 있는 능력을 가지고 있는 데이터 모델이다.


RDD처리 방식

RDD에 속한 요소들은 파티션이라고 하는 더 작은 단위로 나눠질 수 있는데, 스파크는 작업을 수행할 때 바로 이 파티션 단위로 나눠서 병렬로 처리를 수행한다. 이렇게 만들어진 파티션은 작업이 진행되는 과정에서 재구성되거나 네트워크를 통해 다른 서버로 이동하는, 이른바 셔플링이 발생할 수 있다. 

이런 셔플링은 전체 작업 성능에 큰 영향을 주기 때문에 주의해서 다뤄야 하며, 스파크에서는 셔플링이 발생할 수 있는 주요 연산마다 파티션의 개수를 직접 지정할 수 있는 옵션을 제공한다.  (파티션의 수는 곧 데이터 처리에 참여하는 병렬 프로세스의 수이다. 즉, 하나의 데이터를 잘게 쪼개어 여러 개의 파티션을 만들면 여러 프로세스에서 동시에 작업을 처리해서 처리 속도가 증가할 수 있지만 이 정도가 지나치면 오히려 전체 성능을 떨어뜨리는 요인이 된다.


스파크의 장애시 RDD복구

하나의 RDD가 여러 파티션으로 나눠져 다수의 서버에서 처리되다 보니 작업 도중 일부 파티션에 장애가 발생해서 데이터가 유실될 수 있는데, 스파크는 손상된 RDD를 원래 상태로 다시 복원하기 위해 RDD의 생성 과정을 기록해 뒀다가 다시 복구해 주는 기능을 가지고 있다. RDD의  resilient라는 단어가 복구 능력을 의미하는데, 좀 더 정확하게 말하면 RDD에 포함된 데이터를 저장해 두는 것이 아니고 RDD를 생성하는 데 사용했던 작업 내용을 기억하고 있는 것이다. 그래서 문제가 발생하면 전체 작업을 처음부터 다시 실행하는 대신 문제가 발생한 RDD를 생성했던 작업만 다시 수행해서 복구를 수행한다.

정리하면, 스파크는 RDD가 생성되어 변경되는 모든 과정을 일일이 기억하는 대신 RDD를 한번 생성하면 변경되지 않는 읽기 전용 모델로 만든 후 RDD 생성과 관련된 내용만 기억하고 있다가 장애가 발생하면 이전에 RDD를 만들 때 수행했던 작업을 똑같이 실행해 데이터를 복구하는 방식을 사용한다. 이처럼 스파크에서 RDD 생성 작업을 기록해 두는 것을 리니지(linege)라고 한다.


도움이 되셨다면 광고도 한 번 클릭해주시는 센스^_^


반응형
반응형


Spark dataframe(스파크 데이터프레임)으로 작업 중 dataframe의 null값을 특정값으로 바꾸고 싶은 경우가 있다.


이 때 주의해야할 점은 dataframe의 컬럼의 자료형 타입에 맞게끔 변환해줘야 정상적으로 replace된다.



다음과 같은 데이터프레임(dataframe)이 있을 때 "bid_i"의 값을 0으로 변경하려고 다음을 실행


val result_df_q_new = result_df_q.na.fill(0, Seq("bid_i"))


위와 같은 명령을 수행하고 확인을 해도 정상적으로 null값이 0으로 변경되지 않은 걸 확인할 수 있었다.


원인은 bid_i의 자료형 타입에 맞지않게 변경하려했기 때문이다.



위에서 보듯이 "bid_i"의 자료형 타입은 string인데 0으로 변경(na.fill메서드를 하려고 하니 정상적으로 변환되지 않았던 것이다. 

(처리 도중 딱히 에러메세지가 없었다...)


val result_df_q_new = result_df_q.na.fill("0", Seq("bid_i"))


0을 string형(큰따옴표)를 씌워서 명령어를 주니 정상적으로 변경되는 것을 확인할 수 있었다.


데이터프레임(dataframe) 값을 na.fill을 통해 변경할 때는 자료형타입을 잘 확인하도록 하자!


반응형
반응형


Dataframe count중 scala.MatchError 발생 ( show 명령어는 정상적으로 먹는데??? )


간만에 스파크 작업을 진행하다가 다음과 같은 에러문구를 만났다.


Caused by: scala.MatchError: [Ljava.lang.String;@17f58fdb (of class [Ljava.lang.String;)


Text 파일을 sc.textFile("path") 로 읽어와 해당 RDD를 Dataframe으로 변환하고 dataframe.count시 발생하였다.



문제의 원인은  실제 텍스트파일내의 내용을 RDD로 읽어와 split으로 나눈 후 Dataframe으로 변형하는 과정에서 


컬럼개수가 일치하지 않는 데이터가 있었다.


text파일의 포맷은 다음과 같았다.


id    advid    itemid  itemName 

1       123      456      [아임닭] 닭가슴살~~~

.

.

.


이러한 RDD를 다음과 같이 Dataframe으로 만들려고 하였다.

val rawDataDF = rawData.map(_.split("\t") match { case Array(v1,v2,v3,v4) => (v1,v2,v3,v4)}).toDF(“id”, “advid”, “item_id”, “item_name”)


전혀 문제가 없을 것 같지만 이렇게 변경 후 count명령을 내려보면 MatchError가 발생하는 것이다.


알고보니 itemName에 상품명중 탭 구분자("\t")가 들어가 있었던 상품명이 있었던 것이다.



그래서 실제로는 4개의 컬럼으로 나누어져서 Dataframe이 만들어져야 하지만 상품명에 탭구분자가 있는 경우


Array length가 4이상이되면서 macth case 조건에 정상적으로 매칭되지 않았던 것이다.



이럴 경우 Dataframe을 show로 보았을 때는 전혀 문제가 없어보일 수 있지만 count로 모든 데이터의 수를 세게 될 떄


특정 데이터에 문제가 있을 경우 matchError가 발생하게 되는 것이다.




해결방법은 map처리에서 flatMap처리로 변경 후 조건에 맞지 않는 데이터는 None처리를 진행하였다.

val df = rawData.flatMap(_.split("\t") match { case Array(v1,v2,v3,v4) => Some((v1,v2,v3,v4)) case d => None}).toDF("id", "advid", "item_id", "item_name")


또 다시 같은 삽질을 하지 않기 위해 포스팅 남겨본다.

반응형
반응형


[ Spark ] JavaRDD로 saveAsTextFile했는데 데이터가 정상적으로 나오지 않는다???


Spark(스파크) 작업을 하고나서 결과물을 hadoop(하둡)에 쓰고 싶을 경우 보통 saveAsTextFile 메소드를 사용하게 된다.


그런데 적재를 하고 하둡 명령어로 해당 파일을 열어봤는데 파일의 내용이 정상적으로 안써지고 dataframe을 RDD로 변경할 때 사용한


모델 패키지명으로 노출되는 경우가 있다. 다음과 같이...


처음 스파크로 작업을 했을 때 적잖이 당황했던 기억이나서 포스팅한다.


코드를 보게되면


다음과 같은 식으로 integratedDF라는 변수명의 dataframe을 JavaRDD로 변환하는데 매핑 모델로 AdidBidPairingModel을 쓰고 있다.


해당 모델을 보면 다음과 같다.


스파크에서 특정 클래스파일로 데이터를 쓸 때 해당 클래스의 toString메소드를 참고해 데이터를 쓰게되는데?(내추측) 


해당 모델 클래스에는 toString메소드가 없다...


보통 일반적인 스프링프로젝트에서는 lombok을 써서 알아서 생성해주지만 일반 자바 프로젝트에서는 toString메소드를 작성해주어야 한다.




다음과 같이 AdidBidPairingModel 끝부분에 toString메소드를 정의해주고 다시 spark-submit을 해보면 ~



정상적인형태로 데이터가 나오는 것을 확인할 수 있다!!!!


처음 스파크프로젝트로 작업을 하시는 분들은 꽤나 헤맬 수 있다...나또한 나중에 또 헤맬 수 있으므로 오랜만에 spark작업하는 김에 기록해본다!!!


반응형
반응형

[ 하둡 MR보다 스파크(SPARK)를 사용할 때 장점 ]

하둡MR보다 스파크(SPARK)를 사용했을 때의 대부분이 말하는 이점은 디스크 처리 기반에서 메모리 처리 기반으로 넘어오면서 연산처리 속도가 빨라졌다는 것이다.

뭐 틀린말은 아니지만 데이터 엔지니어 입장에서 뭔가 더 구체적으로 설명할 수 있어야 하지 않을까 하는 생각에 간단히 정리해 포스팅해본다.

1. 스파크(SPARK)의 연산 방식은 lazy evaluation으로 수행된다. 

Lazy evaluation(굳이 번역해 보자면 느긋한 연산 정도 되겠다)을 사용함으로써 action이 시작되는 시점에 트랜스포메이션(transformation)끼리의 연계를 파악해 실행 계획의 최적가 가능해진다. 사용자가 입력한 변환 연산들을 즉시 수행하지 않고 모아뒀다가 가장 최적의 수행 방법을 찾아 처리하는 장점을 가진다.
여기서 말하는 최적화란 대부분 지역성(locality)에 관한 것이다. 예를 들어 물건을 사오는 심부름을 시킬 때 A상점에서 파는 물건과 B상점에서 파는 물건을 따로따로 여러 번사오게 하는 것보다 필요한 물건을 한꺼번에 주문해서 한 번 방문했을 때 필요한 물건을 한 번에 사는 것이 효율적이기 떄문이다.

사실 fist() 액션에서도 스파크는 처음 일치하는 라인이 나올 때까지만 파일을 읽을 뿐 전체 파일을 읽거나 하지 않는다.

실제로 하둡 맵리듀스 같은 시스템에서는 맵리듀스의 데이터 전달 회수를 줄이기 위해 어떤 식으로 연산을 그룹화 할지 고민하느라 개발자들이 시간을 많이 빼앗기게 된다. 맵리듀스에서 연산 개수가 많다는 것은 곧 네트워크로 데이터를 전송하는 단계가 많아짐을 의미하고 그만큼 클러스터에 부하를 가져다 줄 수 있다. 스파크(SPARK)에서는 단순한 연산 들을 많이 연결해서 사용하는 것이나 하나의 복잡한 매핑 코드를 쓰는 것이나 큰 차이가 없는데 기본적으로 스파크에서 효율적인 계획을 세워서 수행하기 때문이다. 그렇다고 해서 rdd재사용 등을 고려하지 않고 아무렇게나 프로그래밍을 해도 된다는 의미는 아니다. 따라서 스파크 사용자들은 프로그램을 더 작게 만들고, 효율적인 연산의 코드를 만들어 내야 한다는 부담에서 좀 더 자유로울 수 있다.


2. RDD 재사용을 위한 캐싱 기능
기본적으로 메모리위에 캐싱을 하여 처리를 하게 되면 디스크 처리 기반의 MR작업보다 최소 10~20배 이상 빠를 수 밖에 없다. 여러 액션에서RDD 하나를 재사용하고 싶으면 RDD.persist()를 사용하여 계속 결과를 유지하도록 할 수 있다. 첫 연산이 이루어진 후 스파크는 RDD의 내용을 메모리에 저장하게 되며(클러스터의 여러 머신들에 나뉘어서) 이후의 액션들에서 재사용할 수 있게 된다.


3. RDD는 유연한 연산 방식을 제공한다.
분산 데이터로서의 RDD(Resilient Distributed Datasets)는 문자 그대로 해석하면 "회복력을 가진 분산 데이터 집합"으로  데이터를 처리하는 과정에서 집합을 이루고 있던 데이터의 일부에 문제가 생겨도 스스로 알아서 복구할 수 있다는 의미이다.  실제로 이것은 스파크(SPARK)가 RDD를 만들어 내는 방법을 기억하고 있기 때문에 가능한 것으로 스파크는 데이터의 일부가 유실되면 어딘가에 백업해둔 데이터를 다시 불러오는 것이 아니고 데이터를 다시 만들어내는 방식으로 복구를 수행하게 됩니다.


4. 코드 간결성 및 Interactive shell
하둡 MR을 해보신 분은 알겠지만 단어들을 aggregate하는 하둡 MR소스코드는 맵과 리듀스를 만들어주어야 하기 때문에 길고 복잡할 수 밖에 없는 반면에 스파크는 람다기반의 함수형 프로그래밍 기법으로코드가 매우 간단하며, interactive shell을 사용하여 실제 쉘에서 실시간으로 데이터 변화를 확인할 수 있다는 장점을 가지고 있습니다.


실제로 하둡 MR 대안으로 SQL을 MapReduce로 변환해주는 Hive 프로젝트가 있어 많은 사람이 잘 사용하고 있지만, 쿼리를 최적화하기가 어렵고 속도가 더 느려지는 경우가 많다는 어려움이 있다. 스파크는 이러한 단점들을 보안하며 위와 같은 장점들로 인해 분산 처리 툴로서 많은 관심과 사랑?을 받고 있다고 볼 수 있습니다.


반응형
반응형

스파크 작업을 하다보면 데이터 처리시 쿼리 기반의 spark sql, hive를 이용하기위해 orc로 데이터를 적재하는 경우가 많다.


이 때 spark-shell로 orc파일을 읽어들여 데이터를 보게되면 컬럼보다 데이터내용이 길게되면 잘려서 노출된다.


따라서 데이터 내용을 보고싶다면 dataframe형태의 데이터를 rdd로 만들어서 first나 rdd.take(n).foreach(println)식으로 보도록 하자



1. orc 파일 읽기

val data = sqlContext.read.format("orc").load("hdfs file directory")          //spark1.5,6 version


2. 읽어들인 데이터(dataframe형태)를 rdd로 변경하기

val rdd = data.rdd


3. rdd로 변경한 데이터 보기

rdd.first

rdd.take(n).foreach(println)         //n은 보고싶은 라인 개수




반응형
반응형

Collect연산?

Spark에서 Collect연산은 RDD의 모든 원소를 모아서 배열로 돌려줍니다.

반환 타입이 RDD가 아닌 배열이므로 이 연산은 액션에 속하는 연산입니다.


[ Collect연산을 사용하실 때 주의사항 ]

Collect 연산을 수행하면 RDD에 있는 모든 요소들이 collect 연산을 호출한 서버의 메모리에 수집되기 때문에

전체 데이터를 모두 담을 수 있을 정도의 충분한 메모리 공간이 확보되어 있는 상태에서만 사용해야 합니다.

그렇지 않을 경우에는 out of memory exception이 발생할 수 있습니다.


따라서 작은 크기으 데이터를 디버깅하거나 처리할 때 제한적으로 사용하시길 바랍니다.

반응형

+ Recent posts