DevelopHyun

Data Science & Algorith with Computer Science

Spark[2] Resilient Distributed Datasets: A Fault Tolerant Abstraction for In Memory Cluster Computing(2012) - Review

15 May 2018 » distributed-computing, spark, rdd, paperreview

1. Abstract

  • 기존 framework의 비효율성을 해결하기 위하여 등장
  • RDD의 장점
    • in-memory computation
    • fault-tolerant
  1. in-memory computation
    • memory에서 iterative algorithm과 interactive data mining tool을 효율적으로 빠르게 처리
  2. fault-tolerant
    • coarse-grained transformation기반의 restricted form(read-only) 제공
    • coarse-grained : 어떠한 프로세스의 각 단계의 실행여부를 세세하게 파악하지 않고, ‘그냥’ 작업을 진행시키는 것
    • fine-grained : 프로세스의 각 단계마다 실행여부와 다음 단계를 지시하는 것
    • ex) fine-grained : 한 개의 작업을 시키고, 끝났는지 확인하고, 끝났다면 다음 작업을 지시

2. Introduction

  • 기존 분산처리의 문제점
    • 많은 framework들이 cluster의 자원을 사용하게 해주지만, memory를 사용하지는 않음
    • 중간 과정에 놓여있는 data를 ‘reuse’할 때 비효율적
    • 왜냐하면, 모든 실행결과를 external stable storage에 저장하므로 data replication, disk I/O, 직렬화 등 작업으로 많은 CPU시간을 소모
    • 이 문제를 해결하기 위한 다양한 framework가 있었지만, 특정 작업(loop of MapReduce)에 대해서만 memory를 사용
    • ex) iterative algorithm(machine learning), interactive data mining(같은 data의 subset을 계속 불러내는 것)
  • RDD
    • 보다 많은 작업에서 data를 재사용(중간 결과들을 memory에 저장)
    • 병렬 연산
    • fault-tolerant
    • 작업을 control하여 data placement(RDD를 통한 작업)을 최적화
    • Spark에 RDD를 구현

2.1 fault tolerant

  • 기존의 fault-tolerant 방식은 저장소에 replicate하거나, 다른 컴퓨터에 data update를 logging하는 것
    • cluster network를 통해 많은 데이터가 이동해야하므로 많은 자원을 소모
  • lineage(계보)를 작성하여 해결
    • coarse-grained transformation
    • 실제 data의 변화를 logging하는 것이 아니라, dataset에 적용되는 연산들을 logging하는 것
    • 만약 RDD에 문제가 생겨도, 이 데이터가 어떻게 연산되어 생성됐는지 알 수 있다면 금방 다시 만들 수 있음

3. RDD

3.1 RDD Abstraction

  • read-only
    • 수정이 불가능
    • 다른 RDD부터 만들거나, stable storage에서 불러와야 함
  • RDD는 항상 data를 materialize(구체화)할 필요 없음
    • ‘lineage’를 통하여 어떻게 RDD가 생성되는지에 대한 정보만 충분히 가지고 있으면 됨
    • 문제가 생기면, lineage에 정보를 가져와 다시 RDD를 만들면 되기 때문
  • transformation
    • 다른 RDD를 변형하여, 새로운 RDD를 만드는 것
    • 실제로 data를 만드는 것이 아니라 lineage에만 logging함
    • lineage에 logging하는 것은 pipeline을 만드는 과정
    • map(), filter(), join() 연산 등
  • user는 RDD를 통하여 persistence와 partitioning 가능
    • persistence : 재사용할 RDD를 지정하여 memory에 보관가능, persist() 사용
    • partitioning : 각 recode(data)의 key를 기준으로 partition하여 cluster로 분배할 수 있음

3.2 Spark Programming Interface

  • Spark에서 RDD는 하나의 객체의 형태로 dataset을 표현
    • transformation 연산은 이 객체의 method
  • 처음에는 stable storage에 있는 data에 transformation연산을 통해 RDD를 정의
    • 이후부터는 RDD로부터 새로운 RDD를 만들어낼 수 있음
  • action
    • lineage에 logging된 transformation연산들을 통하여 만들어진 RDD의 값을 반환하는 연산
    • 즉, lineage에 쓰여있는 transformation 연산들을 실제로 실행하여 실제 data를 만들어내는 연산
    • action연산이 발생하기 전까지의 RDD는 그저 lineage에 logging만 되어 있으므로 실제 데이터라고 할 수 없음
    • 따라서 action연산을 통하여 RDD를 실제로 사용할 수 있음
    • count(), collect(), save() 연산 등
  • lazy execution
    • Spark는 transformation연산들을 통해 만들어진 RDD를 마지막 action을 통하여서 실제 값을 계산
    • 즉, RDD를 연산하는 것을 미루다가 마지막에 한번에 연산하는 것이므로 lazy하다고 할 수 있음
    • 하지만, 어떻게 만들어지는지 lineage를 통하여 계보가 그려졌으므로 ‘Execution plan’을 만들 수 있음
    • 따라서 연산의 순서나 자원의 상황을 고려하여 효율적으로 작업을 재구성하거나 배치할 수 있음
  • 어떤 RDD를 재사용할지 설정해둬 효율적으로 데이터를 사용할 수 있음
  • 기본적으로 RDD는 RAM에 저장되지만, 용량이 부족하면 Disk로 보내짐
    • user가 자율적으로 RAM에 저장하는 우선순위 등 strategy를 짤 수 있음

3.2.1 Example: Console Log Mining

example

  • 만약 hdfs에서 발생한 error를 spark로 찾는 작업이라면, 위와 같은 과정을 통해서 찾을 수 있다.
  • persist()를 통해서 RDD를 memory에 계속 올려두면, hdfs의 error를 찾는 작업뿐만 아니라, 다른 작업도 빠르게 실행할 수 있다.
  • collect()를 실행하기 전까지는 transformation연산이므로 lineage에 logging만 하고, collect()를 통한 action연산을 실행해야 실제로 RAM으로 data를 불러와서 작업을 수행함

3.3 Advantages of the RDD Model

advantage

  • coarse-grained transformation
  • read-only
    • fault-tolerant
    • data가 수정되지 않으므로 어떻게 만들어졌는지 알면 복구가 가능
    • lineage를 통해 데이터를 복구
    • data가 update되지 않아서 data의 checkpoint를 일일히 확인하지 않아도되며, 따라서 overhead가 발생하지 않음
    • 문제가 생겼을 경우, 해당부분만 다른 nodes들에서 병렬연산하여 다시 연산할 수 있으므로 전체적인 프로그램을 다시 실행할 필요가 없음
    • slow node(straggler)의 task를 copy하여 다른 node에서 처리가 가능하므로 어느정도 완화 가능. 즉, read-only이기 때문에 처리해야하는 RDD는 변하지 않아서 task를 그냥 copy해와도 다른 node에서 그대로 처리가 가능
  • data locality에 따라서 runtime이 schedule됨
    • lineage를 통하여 planning이 가능
  • RAM이 부족하더라도 Disk에 저장되어 처리가 되므로 최소한 기존의 시스템들과 비슷한 성능을 가질 수 있음

3.4 Applications Not Suitable for RDDs

  • RDD는 dataset의 모든 element들에 같은 연산이 적용될 때 효율적
    • lineage에 transformation 연산들을 저장해두면, 문제가 생겼을 때 모든 data를 logging하지 않아도 되므로 복구하기 쉬움
  • shared state에 asynchronous fine-grained update하는 작업에는 부적합
    • 이러한 작업은 logging을 update하거나, data checkpointing하는 system에 더 적합
    • 비동기적이므로 따로 처리할 수는 있지만, 계속 작업상태를 공유해야하므로 네트워크를 통해 정보가 이동하는 양이 많아지면 비효율적
    • ex) web application, web crawler

4. Spark Programming Interface

spark

  • Scala, Java, Python을 지원하는 Spark를 통해 RDD구현
    • 함수형 언어가 필요
  • Driver & Worker
    • Driver에서는 transformation을 통한 RDD 생성, action을 통한 RDD 실현을 지시 by coding
    • 해당 code는 lineage에 tracked됨
    • Worker는 이 명령을 받아서 data를 RAM으로 불러온 후, 작업을 수행

4.1 RDD Operations in Spark

operation

  • transformation
    • define new RDD
    • lazy operation
  • action
    • launch computation
    • return value
    • write data to external storage
  • persist
    • RDD를 RAM에 저장하여 계속 사용

4.2 Example Applications

  • 많은 Machine Learning Algorithm이 iterative한 특징을 가지고 있으므로, RDD가 적합
    • ex) gradient descent

4.2.1 Logistic Regression

lr

4.2.2 PageRank

pr


5. Representing RDDs

5.1 graph-based representation for RDD

interface

  • 많은 transformation 중에서 lineage(계보)를 tracking(추적)하는 것을 해결하기 위하여 제시
  • RDD의 graph를 그려서 해결
  • scheduler에 추가되는 다른 logic이 필요 없음
  • 5개의 정보를 노출하는 공통 interface를 통해 각 RDD를 표현
    • partitions, dependencies of parent RDD, function, metadata about its partitioning scheme and data placement

5.2 dependency

dependency

  • RDD간의 dependency를 어떻게 표현할지를 해결

  • narrow dependency
    • 하나의 parent RDD의 partition이 딱 하나의 child RDD의 partition에 의해 사용됨
    • 즉, 하나의 partition of child RDD에는 하나의 partition of parent RDD
    • pipeline이 하나의 node내에서만 그려짐
    • recover하기 위해서 오직 하나의 parent partition이 필요하므로 더 효율적, 따라서 복구하기 위해서는 하나의 node만 작동
    • ex) map()
  • wide dependency
    • 많은 parent RDD의 partition이 하나의 childe RDD의 partition을 만들기 위해 사용됨
    • 즉, 하나의 partition of child RDD에 많은 partition of parent RDDs
    • 여러 partition의 data가 필요하므로 shuffle이 발생
    • recover하기 위해서 많은 parent partition이 필요, 따라서 복구하기 위해서 많은 node들이 작동
    • ex) join()
  • 만약 input file이 HDFS에서 불러진다면, 각 block마다 하나의 partition을 return

6. Implementation

  • Mesos cluster manager 위에서 system 작동
    • Hadoop, MPI 등 다른 application들과 resource와 공유
    • Spark는 Hadoop의 input plugin API를 통해 Hadoop의 input source를 모두 사용 가능

6.1 Job Scheduling

schedule

  • graph 형태의 RDD representation 사용
  • memory에 RDD가 계속 있을 수 있다는 점이 기존의 scheduler에 추가됨
  • action이 실행되면 scheduler가 lineage의 graph를 검사하여 실행단계를 나눔
    • narrow dependency로 구성된 pipeline을 최대한 많이 만듦
    • shuffle이 발생하는 부분 or 이미 계산된 partition가 합쳐지는 부분을 기준으로 stage를 구분
    • scheduler는 target RDD가 만들어질 때까지 missing partition이 있는지 확인
  • lazy execution 사용
  • data의 locality(위치)를 고려하여 task 할당
    • 만약 어떤 task가 특정 node의 memory에 있는 partition을 사용해야한다면, 그 node로 task를 보냄
    • RDD가 특정 node의 memory가 아니라 HDFS에 있는 file처럼 기본위치에서 제공되는 경우, 해당 partition으로 작업을 보냄
  • fault recovery를 더 쉽게 하기위하여 wide dependency의 intermediate record(중간결과)를 구체화하여 실제 값을 node에 저장
  • 한 node에 할당된 task가 실패할 경우, 다른 node로 보내짐
  • 만약 특정 stage가 실패할 경우, 없어진 partition을 다시 연산하여 생성

6.2 Interpreter Integration

og

  • Scala는 compile하여 JVM에서 실행하는 형식
  • Class shipping
    • HTTP를 통하여 Worker node가 JVM에서 생성된 bytecode를 가져옴
  • Modified code generation
    • object graph를 따라서 해당 값을 찾아내는 것이 아니라, 해당 값이 tracked된 채로 Worker에게 제공

6.3 Memory Management

  • Spark가 제공하는 in-memory persistent RDD의 장점
    • 빠르게 접근 가능
    • memory공간이 한정되어 있을 때 더 효율적으로 사용가능
    • RAM에 저장하기에는 크기가 크지만, 재계산 비용이 많이 드는 RDD에 유리
  • LRU eviction policy
    • 제한된 memory자원을 효율적으로 사용하기 위하여 RDD에 적용
    • 만약 새로운 RDD가 만들어졌는데 momory가 부족하다면, 가장 조금 접근된 RDD를 축출
    • user가 직접 RDD에 persistence priority를 부여 가능

6.4 Support for Checkpointing

  • lineage가 fault recovery에 좋은 성능을 보이지만, lineage가 길어지면 많은 시간을 소모하게 됨
  • 따라서, 어떤 RDD에 대해서는 stable storage에 저장하여 checkpoint를 만드는 것이 더 효율적
    • 특히, wide dependency를 많이 가지는 lineage에 유리
    • scheduler가 알아서 설정을 해주지만, user가 replicate()로 checkpoint user가 직접 생성가능
    • read-only이기 때문에 data가 변경되지 않으므로 checkpoint를 형성하는 동안 작업이 멈추지 않아도 됨
    • ex) PageRank Algorithm
  • narrow dependency가 있는 pipeline의 경우는 stable storage에 저장하지 않는 것이 더 효율적
    • 병렬연산이 가능하므로 RDD를 replicate하는 것보다 새롭게 연산하는 것이 더 빠름

7. Evaluation

  • Spark가 Hadoop보다 훨씬 빠른 속도로 작업을 처리한다.(20~40배)

7.1 Iterative Machine Learning Applications

result1

7.2 PageRank

result2

7.3 Fault Recovery

result3

7.4 Behavior with Insufficient Memory

result4

7.5 User Applications Built with Spark

result5

7.6 Interactive Data Mining

result6


8. Discussion

8.1 Expressing Existing Programming Models

  • 다른 cluster program보다 성능이 월등히 좋음
  • RDD에 restriction을 주는 것은 application들에게 영향을 별로 주지 않음
    • 비록 read-only와 coarse-grained하게 RDD가 생성되지만, 본질적으로 많은 data에 같은 연산을 적용한다는 것은 변하지 않음
    • immutable하지만, 덕분에 하나의 RDD로부터 여러 RDD를 만들 수 있다는 장점이 있음

8.2 Leveraging RDDs for Debugging

  • 처음에 lineage는 RDD가 fault-tolerant하기 위하여 design되었지만 debugging하는 것에도 기여
  • lineage를 tracking하여 RDD를 reconstruct하거나, 부분적으로 재계산하여 debugging할 수 있음
    • 이전에는 계속 전체 Distributed system을 실행하며 debugging
    • graph가 logging되는 것이므로 overhead의 우려가 없음

  • Cluster Programming Models
  • Caching Systems
  • Lineage
  • Relational Databases

10. Conclusion

  • RDD
    • efficient, general purpose
    • lineage를 통한 fault-tolerant
    • iterative한 작업에 적절

11. Reference