ERROR: Memory limit exceeded Instance 5240c598fffb12fd:813807e3a953de95 of plan fragment F00 of query 5240c598fffb12fd:813807e3a953de91 could not start because the backend Impala daemon is over its memory limit
문제해결
클라우데라 IMPALA -> 구성 -> 단일 풀 메모리 제한(default_pool_mem_limit) 변경
Impala 쿼리를 통해 정확하진 않더라도 대략적인 distinct한 모수를 추정하고 싶을 때 사용할 수 있는 함수가 있어 간단히 확인한겸 옮겨 적어본다. 보통 추정에서 많이 사용되는 하이퍼로그로그와 비슷한 기능을 IMPALA에서도 제공하고 있다. 물론 적은 리소스를 가지고 효율적으로 추정만 하기에 정확한 모수를 알고 싶은경우는 패스하도록 한다. 차이는 대략 1.9%정도 났다. (일반적으로 3%정도 차이가 난다고 생각하면 된다.)
아래와 같은 데이터에서 선생님별 학생리스트가 아닌 1-1 matching된 multi row로 만들 때 보통 lateral view explode()를 사용한다. 하지만 주의할게 explode() udf는 아래와 같이 array형태나 mpa as a parameter형태만 지원한다.
teacher
student_id_list
teacher1
[1,2,3]
table : class
$ SELECT teacher, id from class LATERAL VIEW explode(student_id_list) ids AS id where adid='teacher1';
위처럼 lateral view explode 을 실행하면 아래처럼 muliti row 형태로 변환된다.
teacher
student_id_list
teacher1
1
teacher1
2
teacher1
3
하지만 student_id_list가 String 형태로 되어있다면?
teacher
student_id_list
teacher1
1,2,3
위에서 언급한 것 처럼 explode()는 array형태나 map as a parameter형태의 데이터에만 지원되기 때문에 다음과 같이 처리해준다.
$ SELECT teacher, id from class LATERAL VIEW explode(student_id_list, ',') ids AS id where adid='teacher1';
보통 impala(임팔라)에서 임시 테이블을 생성 후 해당 테이블의 데이터를 기존 Imala의 데이터와 Join(조인)해서 사용해야 할 경우 다음과 같이 테이블을 생성 후 해당 경로에 데이터를 밀어넣어 준다.
create table DBNAME.TABLENAME (uid STRING) row format delimited fields terminated by ' ' LOCATION "hdfs:///user/hive/warehouse/특정원하는디렉토리명칭/“;
문제는 해당 명령어로 TABLE을 만들고 Imapa shell상에서 drop table [테이블명] 을 하였을 때 LOCATION으로 지정된 HDFS 파일은 남아 있게 되고 hadoop 명령어로 따로 지우려고 하면 보통 Hadoop 명령어 실행 권한과 해당 경로에 파일을 쓴 권한 [Impala] 가 달라서 Permission denied되며 삭제 되지 않는다.
따라서 해당 테이블을 Impala 테이블에서 Drop시키면서 연결괸 HDFS File도 함께 삭제 하고 싶다면 'PURGE' 옵션을 함께 주자.
jar로 작업된 스파크 프로젝트를 spark-submit 실행시 아래와 같은 에러가 발생했다.
RejectedExecutionException은 첨 겪어봐서 당황했지만....문제의 원인은 단순했다.
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.scheduler.TaskResultGetter$$anon$2@ab4016a rejected from java.util.concurrent.ThreadPoolExecutor@9c5a328[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 603] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
프로젝트 빌드시 spark driver셋팅을 잘못해준것...
trait InitSpark {
// for local
// val sparkConf = new SparkConf().setAppName("Spark-1.6.x-sample").setMaster("local[*]").set("spark.driver.host", "localhost");
// for build
val sparkConf = new SparkConf().setAppName("Spark-1.6.x-sample")
val sc = new SparkContext(sparkConf)
val sqlContext = new HiveContext(sc)
}
로컬모드 (setMaster("local[*]").set("spark.driver.host", "localhost") 로 빌드하고 돌려서 문제가 되었다...
보통은 작업한 스파크 버전과 spark-submit을 실행하는 환경의 스파크 버전이 달라도 발생하는 에러메세지라고 한다.
최근 운영하고 있는 하둡 클러스터의 노후 장비 교체건으로 데이터노드 한대를 제거 하는 작업이 진행되었다.
해당 로그는 클러스터에 붙어 hdfs을 쓰고 있던 외부 서버에서 발생한 로그이다.
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /user/example....log._COPYING_ (inode 428475201): File does not exist. Holder DFSClient_NONMAPREDUCE_-1052637306_1 does not have any open files.
로그가 발생한 이유는 해당 서버가 hdfs 해당 데이터노드 특정 파일에 write작업을 하고 있었는데 중간에 데이터노드가 내려가면서 write하고 있던 파일을 찾지 못해 발생한 것이다.
HDFS에서 Lease는 다음과 같이 정의된다.
In HDFS these locks are called Leases. Leases are granted to a client which request to open a file for a write operation (e.g. create / append / truncate a file.) Every lease belongs to a single HDFS Client but could be for several HDFS files. Often enough a lease has several thousand files open for write by a single HDFS client. As the client opens and closes files, the appropriate lease must be identified and updated. The exact datastructures have been changed quite frequently over the years to provide better lookups, better reverse lookups, speed and space efficiency etc. However all this accounting obviously is done on the NameNode. This is in stark contrast to GFS, where a lease is tracked by the Namenode (master server in their parlance) and Datanodes (chunk servers in their parlance) (Section 3.1 in the Google File System paper). For HDFS this means the Namenode has a higher overhead of now maintaining these leases (something GFS expressly wanted to avoid). However this also allows HDFS to allow renames of files being written (which in my experience is not too uncommon an operation.)
이 경우 외에도 스파크(Spark)나 hive의 병렬 작업시에도 작업이 꼬여 발생할 수 있다는 걸 검색을 하다가 알게되었다.
최근 사이즈가 큰 대략 두 데이터 각각 2TB (2000GB) 정도의 데이터의 ID값들을 JOIN시켜 얼마나 일치하는지 확인하는 작업을 진행하였다. 간만에 들어온 Adhoc요청에 요구사항 파악이 먼저였고 "사이즈가 커봤자 스파크가 executor개수와 core개수 excetuor memory설정만 맞춰주면 잘 돌리면 되겠다."라고 생각했었다.
하지만 사이즈가 크다보니 실제 운영 클러스터에서 spark-shell에 접속해 command를 날리는건 클러스터의 실제 운영되는 작업에 영향을 줄 수 있기에 일정 부분의 데이터들을 떼와 로컬 Spark 모듈 프로젝트를 통해 원하는대로 파싱하고 조인해서 결과값이 나오는지 먼저 확인하였다.
두 데이터 각각 2TB들을 가공해 뽑아야 하는 조건은 15가지 정도 되었고 나는 해당 코드작업을 하고 jar파일로 말아 특정 리소스풀을 사용하는 조건과 스파크 설정값들을 알맞게 설정해 spark-submit을 실행할 예정이었다.
작업에 대해 설명하기 전에 클러스터 규모에 대해 간단히 언급하자면 실제 서비스를 운영하는 클러스터는 아니였고
Data Lake로 사용되는 데이터노드 5대로 이루어진 규모가 그렇게 크지 않은 클러스터였다.
데이터 추출의 첫 번째 조건
특정 action을 했던 ID값들을 뽑아 distinct하고 counting하는 작업이었다.
물론 데이터사이즈가 크긴했지만 단순한 작업이였기에 그렇게 오래는 걸리지 않을거라고 예상했다 (20분 이내 예상)
실제 Spark-submit을 수행하였고 각 파일들을 읽어들여 카운팅 작업이 수행되는 것을 Spark에서 제공하는 Application UI을 통해 어떤 작업 현재 진행중인지 executor들을 잘 할당되어 일을 하고 있는지를 모니터링 하였다.
근데 이게 왠걸??? Couning을 하는 단계에서 실제 action이 발생했고 (Spark는 lazy연산이 기본) 시간이 너무 오래걸리는 걸 발견하였다...한 시간 이상이 지나도 끝나지 않은 것으로....기억한다.
따라서 이대로는 안되겠다 싶어 일단 요구사항들 중 한번에 같이 처리할 수 있는 그룹 세개로 나누고 기존 데이터를 가지고 처리할게 아니라 특정조건으로 filterling이 된 실제 필요한 데이터들만 가지고 작업을 해야겠다고 생각했다.
그래서 일단 원하는 조건으로 filterling을 한 후 해당 데이터들만 hdfs에 다시 적재했다.
실제 처리에 사용될 데이터 사이즈가 2~4G로 훅 줄었고 (다른 값들을 다 버리고 조건에 맞는 실제 ID값만 뽑아냈기 때문) 이제 돌리면 되겠다 하고 생각하고 돌렸다.
그런데...데이터 사이즈가 2~4G밖에 안되는 데이터들간 Join을 하고 Counting을 하는 작업이 무슨 20~30분이나 걸린단 말인가?....(1~3분 이내를 예상했음)
그래서 이전에 비슷한 상황에서 실제 데이터들의 파티션이 데이터 사이즈에 비해 너무 많이 나누어져 있어서 Spark가 구동되며 execution plan을 세울 때 많은 task들이 발생되어 성능이 떨어졌던 기억이 있어 필터링 한 데이터들의 파티션 개수를 세어 보았더니...
사이즈가 2.6G가 밖에 안되는 실제 사용될 데이터의 파일의 개수가....각각 14393개와 14887개였다.....
이러니 task들이 몇 만개씩 생기지......😱
해결방법
따라서 아 필터링된 아이디값의 데이터들을 hdfs로 쓸 때 개수를 줄여 쓰는 것 부터 다시 해야겠다라고 판단하고 rdd를 save하기전 coalesce(20)을 주어 14393개로 나뉘어진 파티션들을 20개의 파티션으로 나뉘어 쓰이도록 수정해 주었다.
그리고 ID값을 기준으로 두 데이터를 Join이후 distinct하고 counting하는 작업도 굉장히 늦었기에 join이후에 repartition으로 파티션의 개수를 줄여주고 counting을 하도록 수행해주었다.(실제 ID값으로 Join을 하게 되면 데이터가 줄어들기 때문에 많은 파티션을 유지할 필요가 없다.) coalesce와 repartition은 둘다 파티션의 개수를 조절해주는 메서드인데 차이가 궁금하신 분들은 이전 포스팅을 참고 바랍니다.
이렇게 수정을 하고 수행하였더니 task개수도 확 줄어들고수행시간도 5~7분이내에 다 끝나는 것을 확인할 수 있었다.
Spark 데이터 처리 작업시 고려사항
간만에 데이터 추출작업을 하다보니 미처 늦게 인지한 부분도 없지 않아 있었다. 이렇게 사이즈가 큰 데이터를 처리할 때는 항상 파티션의 개수를 잘 조절해서 처리하길 바란다. 그리고 스파크 성능에 영향을 미치는 부분에는 많은 부분이 있지만 기본적으로 데이터들의 파티션과 spark를 수행할 때 기본적인 executor개수와 core의 개수 executor 메모리 설정만 적합하게 되도(이 적합하게가 힘듬...) 큰 문제 없이 사용할 수 있을 거라 생각한다.