반응형

스파크 작업을 하다보면 데이터 처리시 쿼리 기반의 spark sql, hive를 이용하기위해 orc로 데이터를 적재하는 경우가 많다.


이 때 spark-shell로 orc파일을 읽어들여 데이터를 보게되면 컬럼보다 데이터내용이 길게되면 잘려서 노출된다.


따라서 데이터 내용을 보고싶다면 dataframe형태의 데이터를 rdd로 만들어서 first나 rdd.take(n).foreach(println)식으로 보도록 하자



1. orc 파일 읽기

val data = sqlContext.read.format("orc").load("hdfs file directory")          //spark1.5,6 version


2. 읽어들인 데이터(dataframe형태)를 rdd로 변경하기

val rdd = data.rdd


3. rdd로 변경한 데이터 보기

rdd.first

rdd.take(n).foreach(println)         //n은 보고싶은 라인 개수




반응형
반응형

카프카(KAFKA) 데이터 처리방식의 특화된 기능


1. KAFKA는 기존 메시징 시스템과는 달리 메시지를 메모리대신 파일 시스템에 쌓아두고 관리한다.


2. 디스크에 기반한 영속적인 저장 방식을 사용하지만 페이지 캐시를 활용하여 높은 처리량을 제공하는 인메모리 방식에 가깝다


3. 메모리에 별도의 캐시를 구현하지 않고 OS의 페이지 캐시에 위임하고 OS가 알아서 서버의 유휴 메모리를 페이지 캐시로 사용하여 앞으로 필요한 것으로 예상되는 메시지들을 미리 읽어들여(readahead)디스크 읽기 성능을 향상 시킨다.


4. Kafka 프로세스가 직접 캐시를 관리하지 않고 OS에 위임하기 때문에 프로세스를 재시작 하더라도 OS의 페이지 캐시는 그대로 남아있기 때문에 프로세스 재시작 후 캐시를 워밍업할 필요가 없다는 장점이 있다. 


5. 여러 consumer가 한 topic으로부터 여러 번에 걸쳐 메시지를 가져올 수 있다. 이러한 방식이 가능한 이유는 클라이언트가 해당 queue에서 어느 부분까지 데이터를 받아갔는지 위치를 알려주는 'offset'을 관리하기 때문이다.


6. 메시지를 메모리에 저장하지 않기 때문에 메시지가 JVM 객체로 변환되면서 크기가 커지는 것을 방지할 수 있고 JVM의 GC로 인한 성능 저하를 피할 수 있다.

반응형

'Bigdata > Kafka' 카테고리의 다른 글

[Kafka] 큐잉 시스템과 카프카의 차이  (0) 2017.06.10
반응형

Spark 직렬화 포맷

spark는 네트워크로 데이터를 전송하거나 디스크에 쓸 때 객체들을 직렬화해 바이너리 포맷으로 변환한다.

기본적으로 Java에 내장된 직렬화를 이용하지만 spark는 java 직렬화보다 훨씬 향상된 서드파티 라이브러리인

kryo를 쓰는 것을 지원한다.

반응형
반응형

Spark scala.reflect.api.JavaUniverse.runtimeMirror 에러


Exception in thread "main" java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaMirrors$JavaMirror;

        at com.payco.dmp.orc.converter.ClickerScala$.main(ClickerScala.scala:36)

        at com.payco.dmp.orc.converter.ClickerScala.main(ClickerScala.scala)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

        at java.lang.reflect.Method.invoke(Method.java:498)

        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)

        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)

        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)

        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)

        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



spark-submit으로 spark 작업 실행 다음과 같은 에러 메세지가 떨어졌다면 build.sbt scala버전과 spark library 버전이


서버에서 사용중인 spark 버전과 맞는지 확인하기 바란다.


나의 경우는 spark2.1.1 에서 동작하는 scala spark프로젝트 내부 scalaVersion 2.10.5에서 2.11.5 변경하여 해결하였다.



반응형
반응형

Scala 버전만 작성하였다.


Java, Python, R에 대해서도 정보가 필요하다면 글 하단의 참조 링크를 참고 바란다.


SparkConf( Spark 1.6 / Spark 2.x)

You will continue to use these classes (via the sparkContext accessor) to perform operations that require the Spark Core API, such as working with accumulators, broadcast variables, or low-level RDDs. However, you will not need to manually create it.

// Spark 1.6
val sparkConf = new SparkConf().setMaster("local[*]")
sparkConf.set("spark.files", "file.txt")
 
// Spark 2.x
val spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.files", "file.txt")


SparkContext( Spark 1.6 / Spark 2.x)

The SQLContext is completely superceded by SparkSession. Most Dataset and DataFrame operations are directly available in SparkSession. Operations related to table and database metadata are now encapsulated in a Catalog (via the catalog accessor).

// Spark 1.6
val sparkConf = new SparkConf()
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json("data.json")
val tables = sqlContext.tables()
 
// Spark 2.x
val spark = SparkSession.builder.getOrCreate()
val df = spark.read.json("data.json")
val tables = spark.catalog.listTables()


HiveContext( Spark 1.6 / Spark 2.x)

The HiveContext is completely superceded by SparkSession. You will need enable Hive support when you create your SparkSession and include the necessary Hive library dependencies in your classpath.

// Spark 1.6
val sparkConf = new SparkConf()
val sc = new SparkContext(sparkConf)
val hiveContext = new HiveContext(sc)
val df = hiveContext.sql("SELECT * FROM hiveTable")
 
// Spark 2.x
val spark = SparkSession.builder.enableHiveSupport().getOrCreate()
val df = spark.sql("SELECT * FROM hiveTable")


ref : https://sparkour.urizone.net/recipes/understanding-sparksession/

반응형
반응형

오늘 스파크 작업중 No TypeTag available for 에러가 발생하였다. 


밑에 소스코드 부분에서 val df = sQLContext.createDataFrmae(test_rdd3) 부분에서 발생하였고 


case class의 위치를 메소드의 바깥 부분으로 빼내어서 처리하였다. 


원인에 대해 참고할 만한 stackoverflow 내용이 있어 참고하였다.

https://stackoverflow.com/questions/29143756/scala-spark-app-with-no-typetag-available-error-in-def-main-style-app





[ 에러 발생 코드 ] 




[ 에러 해결 ]

case class를 main method 바깥으로 빼서 처리





뭔가 컴파일 시점에 case class를 정의하는 시점과 case class를 통한 spark의 action이 일어나는 시점과 연관이 있을 것 같

은데 자세한 원인에 대해서는 파악하지 못했다ㅠ 혹시 조언해주실분 있으면 감사하겠습니다. 

스칼라 공부를 좀 더 심도있게해야 할듯 싶다.


반응형
반응형

Collect연산?

Spark에서 Collect연산은 RDD의 모든 원소를 모아서 배열로 돌려줍니다.

반환 타입이 RDD가 아닌 배열이므로 이 연산은 액션에 속하는 연산입니다.


[ Collect연산을 사용하실 때 주의사항 ]

Collect 연산을 수행하면 RDD에 있는 모든 요소들이 collect 연산을 호출한 서버의 메모리에 수집되기 때문에

전체 데이터를 모두 담을 수 있을 정도의 충분한 메모리 공간이 확보되어 있는 상태에서만 사용해야 합니다.

그렇지 않을 경우에는 out of memory exception이 발생할 수 있습니다.


따라서 작은 크기으 데이터를 디버깅하거나 처리할 때 제한적으로 사용하시길 바랍니다.

반응형
반응형

큐잉 시스템과 카프카가 다른점

분명히 카프카는 메시지들이 수신된 순서대로 처리되도록 보장하기 위해 많은 문제를 겪는 ActiveMQ나 RabbitMQ 같은 큐잉 시스템이 아니다.

카프카의 파티셔닝 시스템은 이런 구조를 유지하지 않는다. 특정한 토픽의 파티션에 대한 쓰기와 읽기 순서에 대한 정의가 없으므로 클라이언트는 메시지가 쓰여진 순서와 다르게 파티션에서 읽을 수도 있다. 게다가 생산자를 비동기로 구현하는 일이 흔해서 한 파티션으로 보내진 메시지는 (비록 응답대기시간이나 비결정적 이벤트의 차이로 인해 먼저 발생하더라도) 또 다른 파티션으로 보내진 메시지 이후에 쓰여질 수도 있다.


카프카는 또한 메시지 소비자를 다루는 방법에서도 많은 큐잉 시스템과는 다르다. 대부분의 큐잉 시스템에서 메시지는 소비되었을 때 시스템에서 제거된다. 카프카는 메시지를 제거하는 메커니즘이 없는 대신, 소비한 마지막 메시지의 오프셋을 지속적으로 파악하기 위해 소비자에 의존한다. 로그는 카프카 설정의 log.retention.hours 설정에 의해 삭제된다.


[ 참고 ] 실시간 분석의 모든 것


반응형

'Bigdata > Kafka' 카테고리의 다른 글

카프카(KAFKA) 데이터 처리방식의 특화된 기능  (1) 2017.08.30
반응형

[ 작업 배경 ] 


현재 로그 시스템에서 여러 여러 파일 포맷(gz, parquet 등)사용하여 데이터 처리 작업이 이루어지고 있고


앞으로 추가적인 작업 및 dmp 운영에 통합된 데이터 포맷을 사용하여 관리가 필요함.


그 타겟으로 ORC, PARQUET 데이터 포맷을 검토해보게되었다. gz은 가급적 지양하도록 하자...



[ 테스트 시나리오 ]


1 case : text파일을 읽어 dataframe으로 변형 후 orc, parquet 파일포맷으로 저장하는 속도 비교


2 case : orc, parquet 파일포맷으로 저장되어 있는 파일을 읽어들여 row를 count하는 속도 비교


3 case : orc, parquet 파일포맷을 읽어들여 데이터 프레임을 table로 생성 후 spark sql로 adid컬럼만 추출하여 orc, parquet 파일포


맷으로 다시 저장하는 속도 비교



[ 테스트 환경 ]

- MacBook Pro (Retina, Mid 2014)

- Intel Core i5 2.6GHz 듀얼코어

- 8GB RAM 

- 256GB SSD

- Spark Version : 2.0.1

- Scala Version : 2.11.8



[ 테스트 데이터 사이즈 및 포맷 ]


- 총 row : 17685383

- DataFormat

0000040B-D236-4AF8-B95A-80B5FBA306A1 0 2 0

0000040F-91BF-4726-A3E9-B53836D3848D 1 6 2

0000067C-8719-4E41-928C-417965803D61 1 9 1

000006C6-2E07-4F84-AB8A-13A0D412C745 0 6 2

.

.

.



[ 테스트 코드 ]


scala code

  // 1 case : text 파일을 읽어 dataframe으로 변형 후 parquet 파일포맷으로 저장

  def textFileSaveToParquet(sc : SparkContext, sqlContext : HiveContext) : Unit = {

    import sqlContext.implicits._


    val demo_adids = sc.textFile("/Users/nhnent/Desktop/demo20170209.out")

    val demo_adids_df = demo_adids.map(_.split("\t") match { case Array(v1, v2, v3, v4) => (v1, v2, v3, v4)}).toDF("adid", "gender", "age", "married")


    val output = "/Users/nhnent/Desktop/spark_sample_data/testResult/parquet_result.out"

    demo_adids_df.toDF.write.format("parquet").save(output)


  }



  // text 파일을 읽어 dataframe으로 변형 후 orc 파일포맷으로 저장

  def textFileSaveToOrc(sc : SparkContext, sqlContext : HiveContext) : Unit = {

    import sqlContext.implicits._

    val demo_adids = sc.textFile("/Users/nhnent/Desktop/demo20170209.out")

    val demo_adids_df = demo_adids.map(_.split("\t") match { case Array(v1, v2, v3, v4) => (v1, v2, v3, v4)}).toDF("adid", "gender", "age", "married")



    val output = "/Users/nhnent/Desktop/spark_sample_data/testResult/orc_result.out"

    demo_adids_df.toDF.write.format("orc").save(output)

  }



  // parquet 파일을 읽어들여 해당 row를 count

  def readParquetFile(sc : SparkContext, sqlContext: SQLContext) : Unit = {

    val file_path = "/Users/nhnent/Desktop/spark_sample_data/testResult/parquet_result.out/part-*"

    val parquet_file = sqlContext.read.parquet(file_path)

    val count_parquet_file = parquet_file.count()

    log.warn(s"parquet count : $count_parquet_file")

  }

  


  // orc 파일을 읽어들여 해당 row를 count

  def readOrcFile(sc :  SparkContext, sqlContext: SQLContext) : Unit = {

    val file_path = "/Users/nhnent/Desktop/spark_sample_data/testResult/orc_result.out/part-*"

    val orc_file = sqlContext.read.orc(file_path)

    val count_orc_file = orc_file.count()

    log.warn(s"orc count : ${count_orc_file}")

  }



  // parquet 파일을 읽어들여 dataframe을 테이블로 생성 후 spark sql로 adid만 뽑아 낸 후 parquet 포맷으로 저장

  def readParquetAndSaveParquet(sc : SparkContext, sqlContext : SQLContext) : Unit = {

    val file_path = "/Users/nhnent/Desktop/spark_sample_data/testResult/parquet_result.out/part-*"

    val parquet_file = sqlContext.read.parquet(file_path)

    parquet_file.createOrReplaceTempView("info")


    val parquest_adid_file=sqlContext.sql("select adid from info")


    val output = "/Users/nhnent/Desktop/spark_sample_data/testResult/parquet_adid_result.out"

    parquest_adid_file.toDF.write.format("parquet").save(output)


  }



  // orc 파일을 읽어들여 dataframe을 테이블로 생성 후 spark sql로 adid만 뽑아 낸 후 orc 포맷으로 저장

  def readOrcAndSaveOrc(sc : SparkContext, sqlContext : SQLContext) : Unit = {

    val file_path = "/Users/nhnent/Desktop/spark_sample_data/testResult/orc_result.out/part-*"

    val orc_file = sqlContext.read.orc(file_path)


    orc_file.createOrReplaceTempView("info")


    val orc_adid_file=sqlContext.sql("select adid from info")


    val output = "/Users/nhnent/Desktop/spark_sample_data/testResult/orc_adid_result.out"

    orc_adid_file.toDF.write.format("orc").save(output)

  }




[ 테스트 결과 ]


1 case(parquet) : Execute textFileSaveToParquet : 34211

1 case(orc)     : Execute textFileSaveToOrc : 17173

2 case(parquet) : Execute readParquetFile : 1484

2 case(orc)     : Execute readOrcFile : 7161

3 case(parquet) : Execute readParquetAndSaveParquet : 30632

3 case(orc)     : Execute readOrcAndSaveOrc : 26512


1 case(parquet) : Execute textFileSaveToParquet : 31381

1 case(orc)     : Execute textFileSaveToOrc : 27624

2 case(parquet) : Execute readParquetFile : 924

2 case(orc)     : Execute readOrcFile : 5452

3 case(parquet) : Execute readParquetAndSaveParquet : 21010

3 case(orc)     : Execute readOrcAndSaveOrc : 18438



[ 결론 ]


< read&write 성능 >

파일 WRITE 성능 : *ORC > PARQUET*

파일 READ 성능  : *PARQUET > ORC*

파일 READ & SQL & 파일 WRITE 성능 : *ORC > PARQUET*


<파일 사이즈>

725M    원본파일(demo20170209.out)

560M orc_adid_result.out

582M orc_result.out

557M parquet_adid_result.out

572M parquet_result.out

*parquet > orc*


좀 더 큰 데이터로 알파 환경에서 확인이 필요하긴 하지만 로컬환경에서 테스트 결과 파일을 읽어들일 때의 속도는 parquet가 빨랐지


만 전체적인면으로 볼 때 ORC를 사용하는 것이 좋다고 판단된다.



반응형

+ Recent posts