특정서비스의 로그를 spark으로 분석하기 위해 하둡커맨드 서버로 데이터를 가져와 hdfs에 put하는 과정 중 발생한 내용이다.
특정서비스(10대 서버)에서 한 달치의 로그(약 4.4g - 각 서버당)를 커맨드서버에서 wget으로 가져와 작업을 진행하였다.
wget으로 network bandwidth 옵션을 줘서 네트워크 대역폭을 모두 사용하지 않도록 했어야했지만 별다른 생각없이 wget을 하게 되었다.
문제는 해당 하둡 클러스터가 카프카(KAFKA)와 연결되어 있고 실시간으로 consuming하여 streaming하는 서비스에서 발생하였다.
네트쿼으 대역폭을 모두 사용하게 되어 카프카로부터 정상적인 컨슈밍이 되지 못했던 것이다.
앞으로는 데이터를 하둡클러스터로 가져올 때 혹은 다른 서버로 데이터를 옮길 때 항상 wget에 network bandwidth옵션을 주도록 하자.
ex) wget으로 데이터가져올 때 limit으로 50kbyte를 주는 예시
wget --limit-rate=50k {데이터 떙겨올 서버주소}
--limit-rate=amount Limit the download speed to amount bytes per second. Amount may be expressed in bytes, kilobytes with the k suffix, or megabytes with the m suffix. For example, --limit-rate=20k will limit the retrieval rate to 20KB/s. This is useful when, for whatever reason, you don't want Wget to consume the entire available bandwidth.
19/09/24 14:02:47 ERROR tools.DistCp: Invalid arguments: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:88) . . (생략) . Invalid arguments: Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error . (생략) .
문제는 신규클러스터의 standby의 네임노드 주소로 distcp를 하려고 하였던게 원인이였다.
OrderBy는 단순히 sort function의 alias라는점!!!...결국 동일하다는 얘기
[ Spark documentation ] /** * Returns a new Dataset sorted by the given expressions. * This is an alias of the `sort` function. * * @group typedrel * @since 2.0.0 */ @scala.annotation.varargs def orderBy(sortCol: String, sortCols: String*): Dataset[T] = sort(sortCol, sortCols : _*)
19/06/27 18:27:47 INFO mapreduce.Job: Running job: job_1559692026802_4689
19/06/27 18:27:56 INFO mapreduce.Job: Job job_1559692026802_4689 running in uber mode : false
19/06/27 18:27:56 INFO mapreduce.Job:map 0% reduce 0%
19/06/27 18:28:14 INFO mapreduce.Job:map 19% reduce 0%
19/06/27 18:28:15 INFO mapreduce.Job:map 39% reduce 0%
(생 략)
19/06/27 18:28:42 INFO mapreduce.Job:map 100% reduce 99%
19/06/27 18:28:43 INFO mapreduce.Job:map 100% reduce 100%
19/06/27 18:28:46 INFO mapreduce.Job: Job job_1559692026802_4689 completed successfully
결 론
Reducer를 많이 늘린다고해서 처리 속도가 비례해서 향상되는 것은 아니다. 해당 작업의 유형을 고려하고 Reducer개수를 조절해가며 최적의 개수를 찾는게 중요하다. Reducer개수에 따라 결과 데이터가 hdfs에 쓰여진다.(Reducer10개면 10개의 파티션으로, 5개이면 5개의 파티션으로)
마지막으로 하둡 완벽가이드(4판)의 내용을 첨부한다.
리듀서를 하나만 두는 것(기본값)은 하둡 초보자가 자주 범하는 실수다. 실제로 대부분의 잡은 리듀서 수를 기본값인 1보다 크게 설정하는 것이 좋다. 그렇지 않으면 모든 중간 데이터가 하나의 리듀스 태스크로 모여들기 때문에 잡이 굉장히 느려진다. 사실 잡의 리듀서 수를 결정하는 것은 과학보다는 예술에 가깝다. 보통 리듀서 수를 늘리면 병렬 처리 개수도 늘어나서 리듀스 단계에서 걸리는 시간을 줄일 수 있다. 그러나 너무 많이 늘리면 작은 파일이 너무 많이 생성되는 준최적화(suboptimal)에 빠지게 된다. 경험적으로 리듀서의 실행 시간은 5분 내외, 출력 파일의 HDFS 블록 수는 최소 1개로 잡는 것이 좋다.
19/06/19 11:16:49 INFO mapreduce.Job: Task Id : attempt_1559692026802_2824_m_000031_2, Status : FAILED Error: Found interface org.apache.hadoop.mapreduce.Counter, but class was expected 19/06/19 11:16:49 INFO mapreduce.Job: Task Id : attempt_1559692026802_2824_m_000047_2, Status : FAILED Error: Found interface org.apache.hadoop.mapreduce.Counter, but class was expected 19/06/19 11:16:49 INFO mapreduce.Job: Task Id : attempt_1559692026802_2824_m_000011_2, Status : FAILED Error: Found interface org.apache.hadoop.mapreduce.Counter, but class was expected 19/06/19 11:16:49 INFO mapreduce.Job: Task Id : attempt_1559692026802_2824_m_000034_2, Status : FAILED Error: Found interface org.apache.hadoop.mapreduce.Counter, but class was expected
위와 같은 에러로 인해 map작업이 계속해서 실패하며 다음과 같은 에러를 내며 죽어버린다.
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.mapreduce.Counters.getGroup(Ljava/lang/String;)Lorg/apache/hadoop/mapreduce/CounterGroup;
문제의 원인은 하둡 MR작업 중 Counter를 사용하는데 해당 라이브러리를 잘못 가져다 써서 문제가 발생한 것이였다.
위와 같은 hadoop-core, hadoop-common 버전의 라이브러리를 사용해 작업을 했었다. 하지만 실제 하둡 MR을 구동하는 환경은 CDH 5.11.1에 설치된 하둡 패키지를 사용하고 있었기에 Counter메소드를 찾지 못해 NoSuchMethodError를 뱉는 것이었다.
따라서 pom.xml에 repository와 depency 수정으로 해결하였다.
cdh5.11.1에서 동작하도록 수정해주었다.
끝~~~~신기한건 똑같은 기능을 spark, mr둘다 만들어서 테스트해보았는데 단순히 데이터 읽어가면서 filterling하고 간단한 통계자료 뽑고하는 로직만 있어서 그런지 하둡MR이 훨씬 빨랐다는거....
When performing a shuffle my Spark job fails and says "no space left on device", but when I rundf -hit says I have free space left! Why does this happen, and how can I fix it?
Answers 1:
By defaultSparkuses the/tmpdirectory to store intermediate data. If you actually do have space left onsome device-- you can alter this by creating the fileSPARK_HOME/conf/spark-defaults.confand adding the line. HereSPARK_HOMEis wherever you root directory for the spark install is.
This is because Spark create some temp shuffle files under /tmp directory of you local system.You can avoid this issue by setting below properties in your spark conf files.
뭐 스파크 작업중 중간 과정에서 셔플링 데이터가 쌓이는데 해당 공간이 부족해서 그렇다며 스파크 디렉토리 데이터를 수정해주면 된다는~!
하지만 난 그냥 뭔가 spark-submit시 설정만 변경하기로 하고 설정을 다시 봤더니 executor-cores와 num-executor를 설정해주지 않은 걸 보고 default로 잡혀서 너무 작게 잡혀서 그런가 하고 추가로 넣어서 다시 spark-submit결과 정상동작하였다.