반응형


하둡 디렉토리hadoop directory) 존재 여부나 파일이 있는지 확인을 해야 하는 경우가 있다.


이 때 사용할 수 있는 명령어가 -test 라는 명령어이다.


help명령어를 통해 사용할 수 있는 옵션을 살펴보면


> hadoop fs -help test 



다음과 같은 옵션을 확인할 수 있다.


보통 해당 하둡 디렉토리 확인 여부(-d), 하둡 path 존재여부(-e), path에 파일 존재하는 여부 확인(-f) 옵션을 자주 사용한다.


보통 다음과 같이 쉘스크립트(shell script)에서 사용할 수 있다.


> -e옵션과 -z옵션을 통한 하둡파일체크



해당 path가 하둡에 있는지 확인(-e)하고 -z 옵션을 이용해 해당 path의 파일의 사이즈가 0byte가 아닌지 확인한다.


위와 같이 하둡디렉토리에 정상적인 파일이 있는 경우 작업시행시 조건으로 사용할 수 있다.


-test(하둡 명령어)를 유용하게 활용해보시길~





반응형
반응형

Chapter 2. Introducing Cassandra

Cassandra in 50 Words or Less

"Apache Cassandra is an open source, distributed, decentralized, elastically scalable(탄력적인 확장성), highly available(고가용성), fault-tolerant, tuneably consistent, row-oriented database that bases its distribution design on Amazon's Dynamo and its data model on Google's Bigtable. Created at Facebook, it is now used at some of the most popular sites on the Web."

Distributed and Decentralized

  • The fact that Cassandra is decentralized means that there is no single point of failure.
  • All of the nodes in a Cassandra cluster function exactly the same. This is sometimes referred to as “server symmetry.” (서버 대칭)
  • they are all doing the same thing, by definition there can't be a special host that is coordinating activities(활동을 조정하는),
    as with the master/slave setup that you see in MySQL, Bigtable, and so many others.
  • RDMBS(master/slave) - centerlized, NOSQL(cluster) - distributed
  • Note that while we frequently understand master/slave replication in the RDBMS world, there are NoSQL databases such as MongoDB that follow the master/slave scheme as well.
  • Decentralization has two key advantages
    • it's simpler to use than master/slave, and it helps you avoid outages.(중단을 방지)
    • It can be easier to operate and maintain a decentralized store than a master/slave store because all nodes are the same.
      That means that you don't need any special knowledge to scale; setting up 50 nodes isn't much different from setting up one.
  • Cassandra is distributed and decentralized, there is no single point of failure, which supports high availability.

Elastic Scalability

  • Scalability is an architectural feature of a system that can continue serving a greater number of requests with little degradation in performance(성능저하거의없이)
  • Horizontal scaling means adding more machines that have all or some of the data on them so that no one machine has to bear the entire burden of serving requests.
  • You don't have to restart your process. You don't have to change your application queries. You don't have to manually rebalance the data yourself. Just add another machine
  • you won't need to upset(뒤집다) the entire apple cart to scale back (!!!!important)

High Availability and Fault Tolerance

  • Cassandra is highly available. You can replace failed nodes in the cluster with no downtime, and you can replicate data to multiple data centers
    to offer improved local performance and prevent downtime if one data center experiences a catastrophe(대재앙) such as fire(화재) or flood.

Tuneable Consistency

  • When considering consistency, availability, and partition tolerance, we can achieve only two of these goals in a given distributed system, a trade-off known as the CAP theorem
  • In Cassandra, consistency is not an all-or-nothing proposition.
  • We might more accurately term it “tuneable consistency” because the client can control the number of replicas to block on for all updates.
  • The replication factor lets you decide how much you want to pay in performance to gain more consistency.

Brewer's CAP Theorem

  • Consistency
    • All database clients will read the same value for the same query, even given concurrent updates.
  • Availability
    • All database clients will always be able to read and write data.
  • Partition tolerance
    • The database can be split into multiple machines; it can continue functioning in the face of network segmentation breaks.
  • Brewer's theorem is that in any given system, you can strongly support only two of the three.
  • “You can have it good, you can have it fast, you can have it cheap: pick two.”

  • Figure2-1illustrates visually that there is no overlapping(중첩) segment where all three are obtainable(세 가지 모두 얻을수 있는).
[ Figure 2-2 ]

Although databases its on line two property
According to the Bigtable paper, the average percentage of server hours that "some data" was unavailable is 0.0047% (section 4), so this is relative, as we're talking about very robust systems already.
So what does it mean in practical terms(실제적인 측면) to support only two of the three facets of CAP?
CA (Consistency, Availability)
To primarily support consistency and availability means that you're likely using two-phase commit for distributed transactions. It means that the system will block when a network partition occurs, so it may be that your system is limited to a single data center cluster in an attempt to mitigate(완화) this. If your application needs only this level of scale, this is easy to manage and allows you to rely on familiar, simple structures.
CP (Consistency, Partition tolerance)
To primarily(주로) support consistency and partition tolerance, you may try to advance your architecture by setting up data shards in order to scale. Your data will be consistent, but you still run the risk of some data becoming unavailable if nodes fail.
AP (Availability, Partition tolerance)
To primarily support availability and partition tolerance, your system may return inaccurate data, but the system will always be available, even in the face of network partitioning.

Row-Oriented

  • Cassandra's data model can be described as a partitioned row store, in which data is stored in sparse multidimensional(다차원) hashtables.
  • "Partitioned" means that each row has a unique key which makes its data accessible, and the keys are used to distribute the rows across multiple data stores.

High Performance

  • Cassandra was designed specifically from the ground up to take full advantage of multiprocessor/multi-core machines, and to run across many dozens of these machines housed in multiple data centers.
  • As you add more servers, you can maintain all of Cassandra's desirable properties(바람직한 특성) without sacrificing performance(성능저하).

Is Cassandra a Good Fit for My Project?

  • if you expect that you can reliably serve traffic with an acceptable level of performance with just a few relational databases, it might be a better choice to do so,
    simply because RDBMSs are easier to run on a single machine and are more familiar.
  • the ability to handle application workloads that require high performance at significant write volumes with many concurrent client threads is one of the primary features of Cassandra.

그렇다면 DMP에 Cassandra를 쓰는게 적합한가???

번외

CAP Theorem

  • CAP Theorem은 분산시스템에서 일관성(Consistency), 가용성(Availability), 분할 용인(Partition tolerance)이라는 세 가지 조건을 모두 만족할 수 없다는 정리로, 세 가지 중 두 가지를 택하라는 것으로 많이 알려졌있다.
  • CAP는 분산시스템 설계에서 기술적인 선택을 돕는다. 분산시스템이 모든 것 특성을 만족시킬 수 없음을 인지하게 하고 선택지를 제공해주는 것이다.

CAP Theorem의 오해와 진실

Brewer가 2000년에 이정리를 발표할 때 P에 대한 정의.
The system continues to operate despite arbitrary message loss or failure of part of the system
2002년에 이에 대해 Gilbert와 Lynch가 이 정리에 대해 증명에 사용된 P에 대한 새로운 정의
The network will be allowed to lose arbitrarily many messages sent from one node to another
  • CAP Theorem의 정의를 살펴보면 마치 C,A,P가 동등한 선상에 있는 것처럼 느껴진다. 이런 연유로 C,A,P가 모두 분산시스템의 특성에 대한 것으로 여겨지지만, 실상은 그렇지 않다.
    앞에서 언급한 정의를 생각해본다면, C와 A는 분산시스템의 특성이지만, P는 그 분산시스템이 돌아가는 네트워크에 대한 특성이다.
  • CAP Theorem의 정의가 이해하기 어렵게 되어 있다. 세 가지 특성 중 두 개까지 고를 수 있다는 것으로도 많이 알려졌는데 마치 P를 포기하고 CA를 선택할 수도 있는 것 같은 기분을 들게 한다.
    하지만 실상은 P의 정의는 네트워크가 임의의 메시지 손질을 할 수 있다는 것을 허용하느냐는 의미로 네트워크가 가끔 장애가 날 수 있다는 것을 인정하냐는 것이다.
    P의 선택을 배제하려면 절대로 장애가 나지 않는 네트워크를 구성해야하지만 그런 것은 세상에 존재하지 않는다. 따라서 원하든 원하지 않든 P는 선택되어야 하며 결국 선택권은 C나 A중 하나 밖에 없다.
애초에 P는 네트워크 장애가 있을 수 있다는 것을 인정하느냐의 문제이고 이는 인정 할 수밖에 없는 문제이므로 처음부터 선택되어있는 것이다.
결국, CAP Theorem이 내포하고 있는 의미는 분산시스템에서 네트워크 장애상황일 때 일관성과 가용성 중 많아도 하나만 선택할 수 있다는 것이다.
하지만 이런 식의 해석은 정상 상황일 때의 분산시스템 동작을 서술해주지 못한다. 이에 대해 Daniel Abadi가 PACELC라는 아주 우아한 표기법을 제시하였다.

CAP보다는 PACELC를 사용하자

CAP는 분산시스템 디자이너의 선택에 도움을 주는 정리다. CAP가 장애 상황일 때의 선택에 대해 서술하는 것으로 생각하면, 정상 상황일 때의 선택에 대해 서술하지 못하게 된다.
그래서 정상상황일 때와 장애상황을 때를 나누어 설명하자는 것이 PACELC의 핵심이다. PACELC의 의미를 인용하자면 아래와 같다.
if there is a partition (P) how does the system tradeoff between availability and consistency (A and C);
else (E) when the system is running as normal in the absence of partitions, how does the system tradeoff between latency (L) and consistency (C)?

  • 파티션(네트워크 장애)상황일 때에는 A와 C는 상충하며 둘 중 하나를 선택해야 한다는 것이다.
  • 변경된 값을 모든 노드에서 바라볼 수 있도록 하려면 신뢰성 있는 프로토콜을 이용하여 데이터에 관여하는 모든 노드에 데이터가 반영되어야 한다.
    하지만 네트워크 단절이 일어나 몇 개의 노드에 접근할 수 없을 때 C를 위해 데이터 반영이 아예 실패하던지 C를 포기하고 일단 접근 가능한 노드들이게 만 데이터를 반영하던지 둘 중의 하나만 골라야 한다.
  • 정상상황일 때에는 L과 C는 상충하며 둘 중 하나를 선택해야 한다. 모든 노드들에 새로운 데이터를 반영하는 것은 상대적으로 긴 응답시간이 필요하기 때문이다.
– HBase는 PC/EC이다. 장애 상황일때 C를 위해 A를 희생한다. 그렇지 않은 경우에도 C를 위해 L을 희생한다.
– Cassandra는 PA/EL이 가능하도록 디자인 되었다. 설정에 따라 Eventual Consistency의 특성을 가지게 되는데, 이 경우 PA/EL이 된다.
장애 상황인 경우에는 가능한 노드에만 데이터를 반영하고 정상으로 복구되면 필요한 노드에 데이터를 모두 반영한다. 정상상황일때도 Latency를 위해 모든 노드에 데이터를 반영하지 않는다.

결론

  • 절대로 장애가 없는 네트워크는 존재하지 않으며, 따라서 분산시스템에서 P는 무조건 인정하고 들어가야 한다
  • 네트워크가 단절되었을 때 시스템이 어떻게 동작할지 결정해야 하며, 이 때 C와 A를 모두 만족시키는건 불가능하므로 둘 중에 하나를 골라야 한다.
  • CAP로 표현하는 것보다 PACELC로 표현하는 것이 훨씬 명확하다.

[ Cassandra definitive guide(카산드라 완벽 가이드 정리). Chapter1. Beyond Relational Databases ]

반응형
반응형


Chapter1. Beyond Relational Databases

Preface

  • Apache Cassandra는 2009년 1월 Apache에서 Incubator프로젝트로 처음 시작했다.
  • 카산드라는 페이스북, 트위터, 넥플릭스(Netflix)를 포함한 웹상의 가장 큰 회사들에 사용되고 있다.
  • 빠른 쓰기를 수행하고 수백 테라 바이트의 데이터를 분산화 시켜 저장할 수 있고 단일 실패 지점(SPOF)가 없다.
  • 이 책은 Apache Cassandra 3.0과 DataStax Java Driver 버전 3.0을 사용하여 개발되었습니다.

Chapter1. Beyond Relational Databases

If at first the idea is not absurd, then there is no hope for it.
Albert Einstein

What's Wrong with Relational Databases?

  • We encounter scalability problems(확장성 문제) when our relational applications become successful and usage goes up.
  • Joins are inherent in(내재되어 있다) any relatively normalized relational database(상대적으로 정규화된 데이터베이스) of even modest size, and joins can be slow.
  • The way that databases gain consistency is typically through the use of transactions, which require locking some portion of the database so it's not available to other clients.
  • This can become untenable(견딜수 없는) under very heavy loads, as the locks mean that competing users start queuing up, waiting for their turn to read or write the data
We typically address these problems in one or more of the following ways.
  • Throw hardware at the problem by adding more memory, adding faster processors, and upgrading disks.This is known as vertical scaling. - This can relieve you for a time.

  • When the problems arise again, the answer appears to be similar: now that one box is maxed out(최대치), you add hardware in the form of additional boxes in a database cluster.

  • We try to improve our indexes and optimize the queries. But this also has a limitaion.

  • When we use cache, Now we have a consistency problem between updates in the cache and updates in the database, which is exacerbated(악화시키다) over a cluster.

  • For addressing these problems we try denormalization. But this violates rules of relational data.

RDBMS: The Awesome and the Not-So-Much

  • There are many reasons that the relational database has become so overwhelmingly popular over the last four decades.
    • An important one is the Structured Query Language (SQL), which is feature-rich and uses a simple, declarative syntax.
  • SQL is easy to use. The basic syntax can be learned quickly, and conceptually SQL and RDBMSs offer a low barrier to entry.

TRANSACTIONS, ACID-ITY, AND TWO-PHASE COMMIT

  • RDBMSs and SQL also support transactions
  • ACID (Atomic, Consistent, Isolated, Durable)

특징

 DB기능

 설명

 Atomicity(원자성) 

 회복성의 보장, Commit/Rollback

 트랜잭션은 분해가 불가능한 논리적 최소 단위로 실행 전체가 승인되거나 취소되어야 함

(All or Nothing) 

 Consistency(일관성)

 무결성 제약조건, 동시성 제어

 트랜잭션의 수행 이후 데이터는 무결성이 유지되고 모순되지 않아야 함

 Isolation(고립성)

 분산트랜잭션, Lock

 다수의 트랜잭션이 동시에 수행되더라도 개별 트랜잭션의 결과에 영향을 미쳐서는 안됨

 Durability(지속성)

 회복기법, 회복 컴포넌트관리

 성공적으로 수행된 트랜잭션은 해당 요인에 의해 변경 및 손실되지 않아야 함


  • Transactions become difficult under heavy load. When you first attempt to horizontally scale a relational database, making it distributed, you must now account for(고려하다) distributed transactions,
    where the transaction isn't simply operating inside a single table or a single database, but is spread across multiple systems.
  • In order to continue to honor the ACID properties of transactions, you now need a transaction manager to orchestrate(조정) across the multiple nodes.
  • In order to account for successful completion across multiple hosts(여러 호스트에서 성공적 완료를 설명하기 위해), the idea of a two-phase commit (sometimes referred to as “2PC”) is introduced.
  • But then, because two-phase commit locks all associated resources, it is useful only for operations that can complete very quickly.

Gregor Hohpe, a Google architect, wrote a wonderful and often-cited blog entry called“Starbucks Does Not Use Two-Phase Commit”.

스타벅스의 사례에 접목해보면, "2단계 커밋"은 점원(cashier)이 커피가 완성될 때까지 한 고객만을 바라보며 영수증과 돈을 테이블 위에 올려놓고 기다리는 것과 같습니다. 마지막으로 음료가 준비되면, 영수증과 돈으로 일거에 교환하는 것입니다. 이런 상황에서는 점원뿐만 아니라 고객까지도 "트랜잭션(거래)"이 끝날 때 까지 자리를 떠날 수 없게 됩니다. 이러한 "2단계 커밋" 방식을 적용하게 되면 일정 시간에 서비스 할 수 있는 고객 수는 극적으로 감소할 것이고, 스타벅스의 비즈니스를 확실하게 망하게 만들 것입니다. 이것은 2단계 커밋이 일상을 좀 더 단순하게 만들 수는 있지만, 메시지들의 자유로운 흐름을 방해하게 된다(그리고 확장성을 해친다)는 사실을 일깨워 주는 사례입니다. 왜냐하면, 비동기의 다중 작업 흐름 속에서 트랜잭션의 상태를 관리해야 하기 때문입니다.
출처: http://sunnykwak.tistory.com/100 一生牛步行

Summary

  • Relational databases are very good at solving certain(특정) data storage problems, but because of their focus, they also can create problems of their own when it's time to scale.
  • The recent interest in non-relational databases reflects the growing sense of need in the software development community for web scale data solutions.

번외

  • 2000년 후반으로 넘어오면서 인터넷이 활성화되고, 소셜네트워크 서비스 등이 등장하면서 관계형 데이터 또는 정형데이터가 아닌 데이터, 즉 비정형데이터라는 것을 보다 쉽게 담아서 저장하고 처리할 수 있는 구조를 가진 데이터 베이스들이 관심을 받게 되었다.
    • 해당 기술의 발전과 더불어 NoSQL 데이터베이스가 각광을 받게되었고 어떤 엔지니어들은 NoSQL을 Modern web-scale databases라고 정의하기도 한다고 한다.

NoSQL과 RDBMS 차이점

  • 관계형 모델을 사용하지 않으며 테이블간의 조인 기능이 없음
  • 대부분 여러 대의 데이터베이스 서버를 묶어서(클러스터링) 하나의 데이터베이스를 구성
  • 관계형 데이터베이스에서는 지원하는 Data처리 완결성(Transaction ACID 지원) 미보장
  • 데이터의 스키마와 속성들을 다양하게 수용 및 동적 정의(Schema-less)
  • 데이터베이스의 중단 없는 서비스와 자동 복구 기능지원
  • 다수가 Open Source로 제공
  • 확장성, 가용성이 높음

NoSQL 종류

Key Value DB
  • key와 value의 쌍으로 데이터가 저장되는 단순한 형태의 솔루션
  • Riak, Vodemort, Tokyo etc.
Columnar Store
  • Key Value에서 발전된 형태의 Column Family데이터 모델을 사용
  • HBasem Cassandra, ScyllaDB etc.
Document DB
  • JSON, XML과 같은 Collection 데이터 모델 구조를 채택하고 있다.
  • MongoDB, CoughDB etc.
Graph DB

  • Nodes, Relatsionship, Key-value 데이터 모델을 채용하고 있다.
  • Neo4J, OreientDB etc.


반응형
반응형


[ Spark ] JavaRDD로 saveAsTextFile했는데 데이터가 정상적으로 나오지 않는다???


Spark(스파크) 작업을 하고나서 결과물을 hadoop(하둡)에 쓰고 싶을 경우 보통 saveAsTextFile 메소드를 사용하게 된다.


그런데 적재를 하고 하둡 명령어로 해당 파일을 열어봤는데 파일의 내용이 정상적으로 안써지고 dataframe을 RDD로 변경할 때 사용한


모델 패키지명으로 노출되는 경우가 있다. 다음과 같이...


처음 스파크로 작업을 했을 때 적잖이 당황했던 기억이나서 포스팅한다.


코드를 보게되면


다음과 같은 식으로 integratedDF라는 변수명의 dataframe을 JavaRDD로 변환하는데 매핑 모델로 AdidBidPairingModel을 쓰고 있다.


해당 모델을 보면 다음과 같다.


스파크에서 특정 클래스파일로 데이터를 쓸 때 해당 클래스의 toString메소드를 참고해 데이터를 쓰게되는데?(내추측) 


해당 모델 클래스에는 toString메소드가 없다...


보통 일반적인 스프링프로젝트에서는 lombok을 써서 알아서 생성해주지만 일반 자바 프로젝트에서는 toString메소드를 작성해주어야 한다.




다음과 같이 AdidBidPairingModel 끝부분에 toString메소드를 정의해주고 다시 spark-submit을 해보면 ~



정상적인형태로 데이터가 나오는 것을 확인할 수 있다!!!!


처음 스파크프로젝트로 작업을 하시는 분들은 꽤나 헤맬 수 있다...나또한 나중에 또 헤맬 수 있으므로 오랜만에 spark작업하는 김에 기록해본다!!!


반응형
반응형


스파크(Spark)에서 OutOfMemoryError:Java heap space 다음과 같은 에러 메세지가 발생했다면


스파크 어플리케이션 내부에서 다음과 같은 메서드를 사용하지 않았는지 확인해볼 필요가 있다.

- collect()

- countByKey()

- countByValue()

- collectAsMap()



사실 이전에 collect() 메서드 사용시 주의점을 포스팅하긴했었다...

http://brocess.tistory.com/56  - [Spark collect 연산시 주의사항]



해당 메서드들을 rdd에 사용하게 되면 각각의 executor 노드들에서 돌고있는 rdd element들을 driver로 copy하려고 한다.


따라서 RDD의 사이즈가 크다면 driver에서 해당 RDD들을 저장할 메모리가 부족하게 되고 이에 OOME(OutOfMemory)가 발생할 수 있다.



이번에 나도 무심결에 RDD를 collectAsMap()해버려 위와 같은 에러가 발생하였다...


단순히 RDD를 map형태로 만들어 놓고 다른 RDD연산을 위해 사용하려고 했던 생각해서 저지른 실수였다...


물론 충분한 driver memory를 spark-submit시 할당해 주면 되겠지만 아무래도 클러스터 내에 해당 어플리케이션만 동작하는게 아니기 때문에 


좋지 않은 방법이다.


따라서 사이즈가 큰 데이터를 map형태로 이용하고 싶다면 Storage(RDBMS, NOSQL)을 사용하기를 권한다.



반응형
반응형


스파크(Spark) 스트리밍 성능(Processing Time) 개선

실시간 관심사 타겟팅 애플리케이션에서 더 많은 관심사 모수를 뽑아도록 작업한 로직을 배포하게되었다.

실시간 스파크 어플리케이션을 배포할 때 가장 유의해야 하는 것은 추가된 기능으로 인해 마이크로배치형태로 처리되는 작업들의 

수행시간이 현저히 길어지지 않았는가 모니터링하고 체크하는 것이다.

내가 이번에 추가한 로직도 성능에 영향을 미칠 것이라고 판단되어 배포전 실시간 스트리밍 처리 현황과 배포 후를 비교해보았다.


[ 새로운 기능이 추가되기 전 ] 


[ 새로운 기능이 추가된 후 ]


새로운 기능이 추가되기 전과 후의 차이를 보게되면 Processing Time이 더 길어짐에 따라서 Total Delay가 길어지는 것을 확인해 볼 수 있었다.

이 어플리케이션은 1분 단위의 마이크로배치로 돌아가기 때문에 배치간 작업은 1분 이내에 마무리가 되어야 그 다음 배치에 

영향을 미치지 않는다. 따라서 해당 어플리케이션을 튜닝해야하는 이슈가 있었고 어떻게 성능을 개선하면 좋을지 생각을 하다가 

기존 executor가 5개로 동작을 하고 있었는데 클러스터의 가용 vcore개수가 많음에도 불구하고 너무나도 적은 개수의 

executor만을 사용하여 실시간 작업을 처리하고 있었다.

이에 spark-submit 실행시 executor number개수를 5개에서 10개로 늘려주고 확인해보았다.


[ 성능 개선 후 (executor 개수(5->10) 조절) ]


조절 후 이미지를 보면 알겠지만 Processing Time이 반으로 줄어들면서 Total Delay부분도 줄어들었고 Scheduling Delay가 발생하지 

않는 것을 확인할 수 있었다. 어떻게 보면 되게 단순하게 어플리케이션의 성능을 개선했지만 스파크의 동작방식에 대한 이해와 작동되는 

클러스터에 대한 이해없이는 쉽게 나올 수 없는 해결책이었다고 생각이든다. 


그럼 executor 개수를 더 늘려주면 더 좋을까? 라고 누군가 묻는다면 불필요하게 executor의 개수만 무작정 늘린다고 해서 좋다고 

말할 수는 없다. 어플리케이션에 따라 프로세스를 처리하는데 적절한 executor의 개수가 존재한다고 생각이 들고 

스트리밍 어플리케이션은 그 기준은 Scheduling Delay가 발생하지 않는 선에서의 개수를 맞추어주면 된다. 

불필요하게 많은 executor는 너무 적은 데이터를 처리하게 되고 클러스터 전체로 보았을 때는 불필요한 오버헤드가 많이 발생할 수 있다. 

또한 executor들간 broadcast시에도 더 많은 네트워크I/O를 초래할 수 있기 때문에 무작정 executor를 많이 설정해 사용하는 것이 

좋은 방법만은 아니다.


또한 한가지 팁은 executor의 개수를 spark-submit시 명시적으로 주어 활용한다면 

--conf에 spark.yarn.max.executor.failures 해당 옵션을 주어서 사용하도록 하자.

spark.yarn.max.executor.failuresnumExecutors * 2, with minimum of 3The maximum number of executor failures before failing the application.

ex ) executor 개수를 5개로 할당했으면 --conf "spark.yarn.max.executor.failures=10"으로 주어 설정해주도록 하자.


Worker만 무작정 늘린다고 Performance가 향상되는 것은 아니다^^


반응형
반응형

[ 하둡 MR보다 스파크(SPARK)를 사용할 때 장점 ]

하둡MR보다 스파크(SPARK)를 사용했을 때의 대부분이 말하는 이점은 디스크 처리 기반에서 메모리 처리 기반으로 넘어오면서 연산처리 속도가 빨라졌다는 것이다.

뭐 틀린말은 아니지만 데이터 엔지니어 입장에서 뭔가 더 구체적으로 설명할 수 있어야 하지 않을까 하는 생각에 간단히 정리해 포스팅해본다.

1. 스파크(SPARK)의 연산 방식은 lazy evaluation으로 수행된다. 

Lazy evaluation(굳이 번역해 보자면 느긋한 연산 정도 되겠다)을 사용함으로써 action이 시작되는 시점에 트랜스포메이션(transformation)끼리의 연계를 파악해 실행 계획의 최적가 가능해진다. 사용자가 입력한 변환 연산들을 즉시 수행하지 않고 모아뒀다가 가장 최적의 수행 방법을 찾아 처리하는 장점을 가진다.
여기서 말하는 최적화란 대부분 지역성(locality)에 관한 것이다. 예를 들어 물건을 사오는 심부름을 시킬 때 A상점에서 파는 물건과 B상점에서 파는 물건을 따로따로 여러 번사오게 하는 것보다 필요한 물건을 한꺼번에 주문해서 한 번 방문했을 때 필요한 물건을 한 번에 사는 것이 효율적이기 떄문이다.

사실 fist() 액션에서도 스파크는 처음 일치하는 라인이 나올 때까지만 파일을 읽을 뿐 전체 파일을 읽거나 하지 않는다.

실제로 하둡 맵리듀스 같은 시스템에서는 맵리듀스의 데이터 전달 회수를 줄이기 위해 어떤 식으로 연산을 그룹화 할지 고민하느라 개발자들이 시간을 많이 빼앗기게 된다. 맵리듀스에서 연산 개수가 많다는 것은 곧 네트워크로 데이터를 전송하는 단계가 많아짐을 의미하고 그만큼 클러스터에 부하를 가져다 줄 수 있다. 스파크(SPARK)에서는 단순한 연산 들을 많이 연결해서 사용하는 것이나 하나의 복잡한 매핑 코드를 쓰는 것이나 큰 차이가 없는데 기본적으로 스파크에서 효율적인 계획을 세워서 수행하기 때문이다. 그렇다고 해서 rdd재사용 등을 고려하지 않고 아무렇게나 프로그래밍을 해도 된다는 의미는 아니다. 따라서 스파크 사용자들은 프로그램을 더 작게 만들고, 효율적인 연산의 코드를 만들어 내야 한다는 부담에서 좀 더 자유로울 수 있다.


2. RDD 재사용을 위한 캐싱 기능
기본적으로 메모리위에 캐싱을 하여 처리를 하게 되면 디스크 처리 기반의 MR작업보다 최소 10~20배 이상 빠를 수 밖에 없다. 여러 액션에서RDD 하나를 재사용하고 싶으면 RDD.persist()를 사용하여 계속 결과를 유지하도록 할 수 있다. 첫 연산이 이루어진 후 스파크는 RDD의 내용을 메모리에 저장하게 되며(클러스터의 여러 머신들에 나뉘어서) 이후의 액션들에서 재사용할 수 있게 된다.


3. RDD는 유연한 연산 방식을 제공한다.
분산 데이터로서의 RDD(Resilient Distributed Datasets)는 문자 그대로 해석하면 "회복력을 가진 분산 데이터 집합"으로  데이터를 처리하는 과정에서 집합을 이루고 있던 데이터의 일부에 문제가 생겨도 스스로 알아서 복구할 수 있다는 의미이다.  실제로 이것은 스파크(SPARK)가 RDD를 만들어 내는 방법을 기억하고 있기 때문에 가능한 것으로 스파크는 데이터의 일부가 유실되면 어딘가에 백업해둔 데이터를 다시 불러오는 것이 아니고 데이터를 다시 만들어내는 방식으로 복구를 수행하게 됩니다.


4. 코드 간결성 및 Interactive shell
하둡 MR을 해보신 분은 알겠지만 단어들을 aggregate하는 하둡 MR소스코드는 맵과 리듀스를 만들어주어야 하기 때문에 길고 복잡할 수 밖에 없는 반면에 스파크는 람다기반의 함수형 프로그래밍 기법으로코드가 매우 간단하며, interactive shell을 사용하여 실제 쉘에서 실시간으로 데이터 변화를 확인할 수 있다는 장점을 가지고 있습니다.


실제로 하둡 MR 대안으로 SQL을 MapReduce로 변환해주는 Hive 프로젝트가 있어 많은 사람이 잘 사용하고 있지만, 쿼리를 최적화하기가 어렵고 속도가 더 느려지는 경우가 많다는 어려움이 있다. 스파크는 이러한 단점들을 보안하며 위와 같은 장점들로 인해 분산 처리 툴로서 많은 관심과 사랑?을 받고 있다고 볼 수 있습니다.


반응형
반응형


hadoop(하둡)을 운영하다 보면 특정한 경우 hdfs에 나누어 저장되어 있는 파일들을 합쳐서 로컬로 받고 싶은 경우가 있다.

이 때 -getmerge명령어를 쓰면되는데 -text명령어로도 동일한 기능을 수행할 수 있다.

다만 주의해야할 차이점은 -text명령의 경우는 hdfs에 파일이 gz으로 묶여 쌓여있는 경우 압축을 풀어 라인을 읽어 로컬에 쓸 수 있는 반면에

-getmerge의 경우는 그렇지 않다. 따라서 합치고자 하는 파일이 .gz형태라면 -text를 사용해서 합쳐 로컬로 받는 방법이 훨씬 더 간편하다.


[ hdfs 예시 파일 ] 

!주의 .gz의 경우에는 -getmerge가 정상적으로 먹히지 않는다. (.gz일 때는 -text사용)

-rw-r--r--   3 irteam irteam      36997 2017-09-22 16:50 /log/temp/manual/part-00000

-rw-r--r--   3 irteam irteam    8828447 2017-09-22 16:59 /log/temp/manual/part-00001

-rw-r--r--   3 irteam irteam      38420 2017-09-22 16:49 /log/temp/manual/part-00002


[ -getmerge 사용법 ] 

hadoop fs -getmerge [hdfs경로] [로컬디렉토리]

 hadoop fs -getmerge  /log/temp/manual  /local/directory


[ -text 사용법 ] 

hadoop fs -text [hdfs경로] > [로컬디렉토리] 

hadoop fs -text /log/temp/manual/part-* > /local/direcotory


명령어 실행 후 로컬디렉토리에 저장된 파일의 개수를 세어보면 두 명령어로 실행한 데이터 개수가 동일한 것을 확인할 수 있다.

텍스트파일로 hdfs에 저장된 총 1241517라인(50M)로 테스트 해봤을 떄 로컬로 쓰는데 까지 두 명령어 모두 약 5초정도 걸렸다.

따라서 편한방법으로 사용하시길:)


반응형
반응형

하둡1.0과 하둡2.0의 차이는 YARN으로 인해 많은 부분이 변화되었다.

그 차이에 대해서 알아보도록 하자.

[ 아키텍처의 변화 ]


[ 하둡 1.0과 2.0에서 리소스 관리 차이 ]

하둡 1.0에서 맵리듀스를 실행할 때는 슬롯 단위로 맵/리듀스 태스크 갯수를 관리했다.따라서 맵퍼는 모두 동작하는데 리듀서는 놀고 있거나 반대의 경우로 인해 클러스터 전체 사용률이 낮았다.

하지만 하둡 2.0에서 YARN(얀)이 도입되면서 슬롯이 아닌 컨테이너 단위로 리소스를 할당하게 되었다. 얀의 리소스 매니저는 전체 클러스터의 리소스 정보를 토대로 할당 가능한 컨테이너 개수를 계산하며, 맵리듀스는 필요한 컨테이너들을 할당 받아서 맵리듀스 태스크를 실행하게 된다. 

이 때 컨테이너 개수와 맵과 리듀스 태스크의 관계는 1:1의 관계가 아니며, 맵과 리듀스 태스크는 상황에 따라서 하나 이상의 컨테이너를 실행할 수도 있다. 그래서 관리자는 전체 클러스터의 리소스 상황과 얀에서 실행하는 잡들의 워크로드를 고려하여 리소스 설정을 진행해야 한다.


[ YARN의 도입으로 JobTracker의 역할이 Resource Manager와 Application Master로 분리 ] 

하둡 1.0에서는 JobTracker(잡트래커)가 클러스터 리소스 관리 및 어플리케이션 스케쥴링 등을 모두 담당했었다.

하지만 하둡 2.0에서는 클러스터마다 Application Master(어플리케이션 마스터)가 존재하고 각 서버마다 Node Manager(노드 매니저)가 할당되어 있고 리소스관리는 Resource Manager(리소스 매니저)가 어플리케이션 수행 및 스케쥴링 관리는 Application Master(어플리케이션 마스터)로 역할이 분리되어 운영된다.


[ Spark 등 분산처리 환경 지원 ]

하둡의 맵/리듀스 작업보다 성능이 훨씬 개선된 SPARK 및 분산처리 프레임워크를 사용할 수 있게 되었다. 스파크는 배치 처리 작업에 있어서 맵리듀스보다 10배정도 빠르며 인메모리 분석에서 100배나 빠르다고 알려져 있다.




반응형

+ Recent posts