반응형


Spark dataframe(스파크 데이터프레임)으로 작업 중 dataframe의 null값을 특정값으로 바꾸고 싶은 경우가 있다.


이 때 주의해야할 점은 dataframe의 컬럼의 자료형 타입에 맞게끔 변환해줘야 정상적으로 replace된다.



다음과 같은 데이터프레임(dataframe)이 있을 때 "bid_i"의 값을 0으로 변경하려고 다음을 실행


val result_df_q_new = result_df_q.na.fill(0, Seq("bid_i"))


위와 같은 명령을 수행하고 확인을 해도 정상적으로 null값이 0으로 변경되지 않은 걸 확인할 수 있었다.


원인은 bid_i의 자료형 타입에 맞지않게 변경하려했기 때문이다.



위에서 보듯이 "bid_i"의 자료형 타입은 string인데 0으로 변경(na.fill메서드를 하려고 하니 정상적으로 변환되지 않았던 것이다. 

(처리 도중 딱히 에러메세지가 없었다...)


val result_df_q_new = result_df_q.na.fill("0", Seq("bid_i"))


0을 string형(큰따옴표)를 씌워서 명령어를 주니 정상적으로 변경되는 것을 확인할 수 있었다.


데이터프레임(dataframe) 값을 na.fill을 통해 변경할 때는 자료형타입을 잘 확인하도록 하자!


반응형
반응형


Dataframe count중 scala.MatchError 발생 ( show 명령어는 정상적으로 먹는데??? )


간만에 스파크 작업을 진행하다가 다음과 같은 에러문구를 만났다.


Caused by: scala.MatchError: [Ljava.lang.String;@17f58fdb (of class [Ljava.lang.String;)


Text 파일을 sc.textFile("path") 로 읽어와 해당 RDD를 Dataframe으로 변환하고 dataframe.count시 발생하였다.



문제의 원인은  실제 텍스트파일내의 내용을 RDD로 읽어와 split으로 나눈 후 Dataframe으로 변형하는 과정에서 


컬럼개수가 일치하지 않는 데이터가 있었다.


text파일의 포맷은 다음과 같았다.


id    advid    itemid  itemName 

1       123      456      [아임닭] 닭가슴살~~~

.

.

.


이러한 RDD를 다음과 같이 Dataframe으로 만들려고 하였다.

val rawDataDF = rawData.map(_.split("\t") match { case Array(v1,v2,v3,v4) => (v1,v2,v3,v4)}).toDF(“id”, “advid”, “item_id”, “item_name”)


전혀 문제가 없을 것 같지만 이렇게 변경 후 count명령을 내려보면 MatchError가 발생하는 것이다.


알고보니 itemName에 상품명중 탭 구분자("\t")가 들어가 있었던 상품명이 있었던 것이다.



그래서 실제로는 4개의 컬럼으로 나누어져서 Dataframe이 만들어져야 하지만 상품명에 탭구분자가 있는 경우


Array length가 4이상이되면서 macth case 조건에 정상적으로 매칭되지 않았던 것이다.



이럴 경우 Dataframe을 show로 보았을 때는 전혀 문제가 없어보일 수 있지만 count로 모든 데이터의 수를 세게 될 떄


특정 데이터에 문제가 있을 경우 matchError가 발생하게 되는 것이다.




해결방법은 map처리에서 flatMap처리로 변경 후 조건에 맞지 않는 데이터는 None처리를 진행하였다.

val df = rawData.flatMap(_.split("\t") match { case Array(v1,v2,v3,v4) => Some((v1,v2,v3,v4)) case d => None}).toDF("id", "advid", "item_id", "item_name")


또 다시 같은 삽질을 하지 않기 위해 포스팅 남겨본다.

반응형
반응형

스파크 데이터프레임(Dataframe) partitionBy를 사용해 원하는대로 손쉽게 저장하자!


스파크(Spark) 데이터프레임(Dataframe) 혹은 데이터셋(Dataset)을 통해 작업하게 되면 


sql기반의 명령을 통해서 데이터를 손쉽게 활용할 수 있다는 점과 더불어 특정 컬럼 기반으로 


데이터를 저장할 수가 있다.


잠깐 데이터프레임(Dataframe)과 데이터셋(Dataset)에 대해 언급하자면 데이터셋(Dataset)은 데이터프레임과 RDD의 단점들을


보완한 모델로 Spark 1.6이상 버전부터 사용할 수 있다.


이번에 데이터프레임(Dataframe)을 partitionBy를 통해 저장해보았는데 이런 기능이 있다라고만 알았지


막상 써보니 너무 편해서 정리하게 되었다.


다음과 같은 데이터프레임(Dataframe)이 있을 때


partitionBy를 이용하면 특정 컬럼을 기반으로 디렉토리를 나누어 저장할 수 있다.


예를들어 advid를 기준으로 데이터를 나누어 저장하고 싶을 떄


df.write.partitionBy("advid").save("/저장될경로")


라고 저장해주면 다음과 같이 파일들이 advid를 기준으로 저장되게 된다.


이 얼마나 간편한가!!!!


다들 partitionBy를 통해 데이터를 원하는대로 손쉽게 저장해서 사용하시길!!!

반응형
반응형

스파크 작업을 하다보면 데이터 처리시 쿼리 기반의 spark sql, hive를 이용하기위해 orc로 데이터를 적재하는 경우가 많다.


이 때 spark-shell로 orc파일을 읽어들여 데이터를 보게되면 컬럼보다 데이터내용이 길게되면 잘려서 노출된다.


따라서 데이터 내용을 보고싶다면 dataframe형태의 데이터를 rdd로 만들어서 first나 rdd.take(n).foreach(println)식으로 보도록 하자



1. orc 파일 읽기

val data = sqlContext.read.format("orc").load("hdfs file directory")          //spark1.5,6 version


2. 읽어들인 데이터(dataframe형태)를 rdd로 변경하기

val rdd = data.rdd


3. rdd로 변경한 데이터 보기

rdd.first

rdd.take(n).foreach(println)         //n은 보고싶은 라인 개수




반응형

+ Recent posts