반응형

Collect연산?

Spark에서 Collect연산은 RDD의 모든 원소를 모아서 배열로 돌려줍니다.

반환 타입이 RDD가 아닌 배열이므로 이 연산은 액션에 속하는 연산입니다.


[ Collect연산을 사용하실 때 주의사항 ]

Collect 연산을 수행하면 RDD에 있는 모든 요소들이 collect 연산을 호출한 서버의 메모리에 수집되기 때문에

전체 데이터를 모두 담을 수 있을 정도의 충분한 메모리 공간이 확보되어 있는 상태에서만 사용해야 합니다.

그렇지 않을 경우에는 out of memory exception이 발생할 수 있습니다.


따라서 작은 크기으 데이터를 디버깅하거나 처리할 때 제한적으로 사용하시길 바랍니다.

반응형
반응형

[ 작업 배경 ] 


현재 로그 시스템에서 여러 여러 파일 포맷(gz, parquet 등)사용하여 데이터 처리 작업이 이루어지고 있고


앞으로 추가적인 작업 및 dmp 운영에 통합된 데이터 포맷을 사용하여 관리가 필요함.


그 타겟으로 ORC, PARQUET 데이터 포맷을 검토해보게되었다. gz은 가급적 지양하도록 하자...



[ 테스트 시나리오 ]


1 case : text파일을 읽어 dataframe으로 변형 후 orc, parquet 파일포맷으로 저장하는 속도 비교


2 case : orc, parquet 파일포맷으로 저장되어 있는 파일을 읽어들여 row를 count하는 속도 비교


3 case : orc, parquet 파일포맷을 읽어들여 데이터 프레임을 table로 생성 후 spark sql로 adid컬럼만 추출하여 orc, parquet 파일포


맷으로 다시 저장하는 속도 비교



[ 테스트 환경 ]

- MacBook Pro (Retina, Mid 2014)

- Intel Core i5 2.6GHz 듀얼코어

- 8GB RAM 

- 256GB SSD

- Spark Version : 2.0.1

- Scala Version : 2.11.8



[ 테스트 데이터 사이즈 및 포맷 ]


- 총 row : 17685383

- DataFormat

0000040B-D236-4AF8-B95A-80B5FBA306A1 0 2 0

0000040F-91BF-4726-A3E9-B53836D3848D 1 6 2

0000067C-8719-4E41-928C-417965803D61 1 9 1

000006C6-2E07-4F84-AB8A-13A0D412C745 0 6 2

.

.

.



[ 테스트 코드 ]


scala code

  // 1 case : text 파일을 읽어 dataframe으로 변형 후 parquet 파일포맷으로 저장

  def textFileSaveToParquet(sc : SparkContext, sqlContext : HiveContext) : Unit = {

    import sqlContext.implicits._


    val demo_adids = sc.textFile("/Users/nhnent/Desktop/demo20170209.out")

    val demo_adids_df = demo_adids.map(_.split("\t") match { case Array(v1, v2, v3, v4) => (v1, v2, v3, v4)}).toDF("adid", "gender", "age", "married")


    val output = "/Users/nhnent/Desktop/spark_sample_data/testResult/parquet_result.out"

    demo_adids_df.toDF.write.format("parquet").save(output)


  }



  // text 파일을 읽어 dataframe으로 변형 후 orc 파일포맷으로 저장

  def textFileSaveToOrc(sc : SparkContext, sqlContext : HiveContext) : Unit = {

    import sqlContext.implicits._

    val demo_adids = sc.textFile("/Users/nhnent/Desktop/demo20170209.out")

    val demo_adids_df = demo_adids.map(_.split("\t") match { case Array(v1, v2, v3, v4) => (v1, v2, v3, v4)}).toDF("adid", "gender", "age", "married")



    val output = "/Users/nhnent/Desktop/spark_sample_data/testResult/orc_result.out"

    demo_adids_df.toDF.write.format("orc").save(output)

  }



  // parquet 파일을 읽어들여 해당 row를 count

  def readParquetFile(sc : SparkContext, sqlContext: SQLContext) : Unit = {

    val file_path = "/Users/nhnent/Desktop/spark_sample_data/testResult/parquet_result.out/part-*"

    val parquet_file = sqlContext.read.parquet(file_path)

    val count_parquet_file = parquet_file.count()

    log.warn(s"parquet count : $count_parquet_file")

  }

  


  // orc 파일을 읽어들여 해당 row를 count

  def readOrcFile(sc :  SparkContext, sqlContext: SQLContext) : Unit = {

    val file_path = "/Users/nhnent/Desktop/spark_sample_data/testResult/orc_result.out/part-*"

    val orc_file = sqlContext.read.orc(file_path)

    val count_orc_file = orc_file.count()

    log.warn(s"orc count : ${count_orc_file}")

  }



  // parquet 파일을 읽어들여 dataframe을 테이블로 생성 후 spark sql로 adid만 뽑아 낸 후 parquet 포맷으로 저장

  def readParquetAndSaveParquet(sc : SparkContext, sqlContext : SQLContext) : Unit = {

    val file_path = "/Users/nhnent/Desktop/spark_sample_data/testResult/parquet_result.out/part-*"

    val parquet_file = sqlContext.read.parquet(file_path)

    parquet_file.createOrReplaceTempView("info")


    val parquest_adid_file=sqlContext.sql("select adid from info")


    val output = "/Users/nhnent/Desktop/spark_sample_data/testResult/parquet_adid_result.out"

    parquest_adid_file.toDF.write.format("parquet").save(output)


  }



  // orc 파일을 읽어들여 dataframe을 테이블로 생성 후 spark sql로 adid만 뽑아 낸 후 orc 포맷으로 저장

  def readOrcAndSaveOrc(sc : SparkContext, sqlContext : SQLContext) : Unit = {

    val file_path = "/Users/nhnent/Desktop/spark_sample_data/testResult/orc_result.out/part-*"

    val orc_file = sqlContext.read.orc(file_path)


    orc_file.createOrReplaceTempView("info")


    val orc_adid_file=sqlContext.sql("select adid from info")


    val output = "/Users/nhnent/Desktop/spark_sample_data/testResult/orc_adid_result.out"

    orc_adid_file.toDF.write.format("orc").save(output)

  }




[ 테스트 결과 ]


1 case(parquet) : Execute textFileSaveToParquet : 34211

1 case(orc)     : Execute textFileSaveToOrc : 17173

2 case(parquet) : Execute readParquetFile : 1484

2 case(orc)     : Execute readOrcFile : 7161

3 case(parquet) : Execute readParquetAndSaveParquet : 30632

3 case(orc)     : Execute readOrcAndSaveOrc : 26512


1 case(parquet) : Execute textFileSaveToParquet : 31381

1 case(orc)     : Execute textFileSaveToOrc : 27624

2 case(parquet) : Execute readParquetFile : 924

2 case(orc)     : Execute readOrcFile : 5452

3 case(parquet) : Execute readParquetAndSaveParquet : 21010

3 case(orc)     : Execute readOrcAndSaveOrc : 18438



[ 결론 ]


< read&write 성능 >

파일 WRITE 성능 : *ORC > PARQUET*

파일 READ 성능  : *PARQUET > ORC*

파일 READ & SQL & 파일 WRITE 성능 : *ORC > PARQUET*


<파일 사이즈>

725M    원본파일(demo20170209.out)

560M orc_adid_result.out

582M orc_result.out

557M parquet_adid_result.out

572M parquet_result.out

*parquet > orc*


좀 더 큰 데이터로 알파 환경에서 확인이 필요하긴 하지만 로컬환경에서 테스트 결과 파일을 읽어들일 때의 속도는 parquet가 빨랐지


만 전체적인면으로 볼 때 ORC를 사용하는 것이 좋다고 판단된다.



반응형

+ Recent posts