ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spark] RDD의 Action 연산
    ComputerScience/스파크 2020. 7. 20. 18:04

    이 장은 Java 8 버전 이상의 언어로 작성되었으며 가끔 Scala 언어를 사용합니다.

    Action 연산

     

    RDD의 연산은 RDD가 RDD를 반환하는 트랜스포메이션 연산과 그 결괏값이 정수나 리스트, 맵 등의 RDD가 아닌 다른 타입을 반환하는 액션 연산으로 나누어져 있다.

     

    이 장에서는 액션 연산을 알아보자.


     

    first

    first 연산은 RDD 요소 가운데 첫 번째 요소 하나를 돌려줍니다. 스파크 셸에서 작업할 때 트랜스포메이션의 수행 결과 등을 빠르게 확인하는 용도로 확인할 수 있습니다. 다음은 스칼라 언어의 예제이고 다른 언어의 경우도 동일한 방법을 사용합니다.

    val rdd = sc.parallelize(List(5,4,1))
    val result = rdd.first
    println(result)
    
    결과 : 5

    take

    take()는 RDD의 첫 번째 요소로부터 순서대로 n개를 추출해서 되돌려주는 메소드입니다. 원하는 데이터를 찾기 위해 RDD의 전체 파티션을 다 뒤지지는 않지만 최종 결과 데이터는 배열 혹은 리스트와 같은 컬렉션 타입으로 반환하기 때문에 지나치게 큰 n값을 지정하면 메모리 부족 오류가 발생할 수 있습니다.

    val rdd = sc.parallelize(1 to 20, 5)
    val result = rdd.take(5)
    println(result.mkString(", ")
    
    결과 : 1,2,3,4,5

    takeSample

    takeSample()메소드는 RDD 요소 가운데 지정된 크기의 샘플을 추출하는 메소드이다. sample() 메소드와 유사하지만 샘플의 크기를 지정할 수 있다는 점에서 결과 타입이 RDD가 아닌 배열이나 리스트 같은 컬렉션 타입이라는 차이점이 있습니다. 결과값이 컬렉션 타입이기에 샘플의 크기가 너무 크면 메모리 오류가 발생 할 수 있습니다.

    val rdd= sc.parallelize(1 to 100)
    val result = rdd.takeSample(false, 20)
    println(result.length)
    
    결과 : 20

    collect, count

    collect()는 RDD의 모든 요소를 배열 혹은 리스트 같은 하나의 컬렉션에 담아서 돌려주는 메소드이다.

    count()는 이름과 같이 RDD에 있는 모든 요소의 개수를 돌려주는 메소드이다.

     

    collect() 메소드는 RDD의 모든 원소를 드라이버의 메모리에 불러오게 되므로 적당한 크기의 RDD를 대상으로 사용해야 합니다.


    countByValue

    countByValue()는 RDD에 속하는 각 값들이 나타내는 횟수를 구하여 맵 형태로 되돌려 주는 메소드입니다. 예를 들어 "1, 1, 2, 2, 3"이라는 다섯 개의 숫자로 이루어진 RDD가 있다면 이 RDD에는 1과 2가 각 두 개, 그리고 3이 한 개 포함되어 있다고 말 할 수 있습니다.

     

    reduce()나 fold() 함수를 떠올리기 전에 countByValue()함수를 적용할 수 있는지 검토해 보는 것이 좋습니다.

    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,1,2,2,2,3));
    Map<Integer, Long> result = rdd.countByValue();
    System.out.println(result);
    
    결과 : {1=2, 2=3, 3=1}

    reduce

    reduce() 메소드는 RDD에 포함된 임의의 값 두개를 하나로 합치는 함수를 이용해 RDD에 포함된 모든 요소를 하나의 값으로 병합하고 그 결괏값을 반환하는 메소드입니다. 예를 들면, 1,3,5,7,9를 '+' 연산으로 처리한다면 1과 3을 더해서 4, 다음 4와 5를 더해서 9... 이런식으로 마지막까지 연산을 마친 뒤 결과값을 내는 방식입니다.

    def reduce(f: (T,T) => T) : T

     

    이 때, 주의할 점은 스파크 애플리케이션이 클러스터 환경에서 동작하는 분산 프로그래밍이기 때문에 실제 병합이 첫 번째 요소부터 마지막 요소까지 순서대로 처리되는 것이 아니고 각 서버에 흩어져 있는 파티션 단위로 나누어져 처리된다는 것이다. 따라서 병합연산은 모든 요소에 대해 결합법칙과 교환법칙이 성립되는 경우에만 사용이 가능합니다.

    JavaRDD<Integer> rdd3 = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10),3);
    int result3 = rdd3.reduce((Integer v1, Integer v2)->v1+v2);
    System.out.println(result3);
            
            결과 : 55

    fold

    fold()는 reduce와 같이 RDD 내 모든 요소를 대상으로 교환법칙과 결합법칙이 성립되는 바이너리 함수를 순차 적용하여 최종 결과를 구하는 메소드이다. 다만 차이라면 연산의 초기값이 존재한다. 더하기 연산이라면 초기값으로 0, 곱하기 연산이라면 초기값으로 1을 넣어준다.

    다른 값을 넣어주면 값이 이상하게 나옴... 이유는 모르겠다.

     

    def fold(zeroValue : T)(op : (T, T) => T)

            JavaRDD<Integer> rdd4 = sc.parallelize(Arrays.asList(1,2,3));
            int result4 = rdd4.fold(0,(Integer v1, Integer v2)->v1+v2);
            System.out.println(result4);
            
            결과 : 6

    aggregate

    aggregate는 인자로 총 3개를 사용한다. 첫 번째는 fold()와 유사하게 초깃값을 지정하는 것, 두 번째는 각 파티션 단위 부분합을 구하기 위한 병합함수, 그리고 이렇게 파티션 단위로 생성된 부분합을 최종적으로 하나로 합치기 위한 또 다른 병합함수로 구성됩니다.

     

    def aggregate[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U)(implicit arg0: ClassTag[U]): U

     

    seqOp는 U와 T타입값을 입력값으로 전달받아 U 타입의 값으로 돌려주고 있는데, T는 RDD의 요소들이 갖는 타입을 의미,  U는 zeroValue로 전달했던 초기값과 같은 타입을 의미한다. 예를 들어, RDD[String] 타입의 RDDㅇ에서 zeroValue로 비어있는 Set을 사용했다면 seqOp는 (Set ,String) -> Set 과 같은 형태가 되는 것이다.

     

    aggregate()는 두 단계에 걸쳐 병합을 처리하는데 첫 번째 단계에서는 파티션 단위로 병합을 수행하고 두 번째 단계에서는 파티션단위 병합 결과끼리 다시 병합을 수행해서 최종 결과를 생성합니다. seqOp와 compOp 함수 모두 첫 번째 인자로 이전 단계의 병합 결과로 생성된 객체를 재사용하기 때문에 매번 새로운 객체가 생성되는 부담을 덜 수 있다는 점이 있습니다.

     

    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(100,80,75,90,95),3);
    Record zeroValue = new Record(0,0);
    
    
    Function2<Record, Integer, Record> seqOp = (Record r, Integer v) -> r.add(v);
    Function2<Record, Record, Record> combOp = (Record r1, Record r2) -> r1.add(r2);
    
    Record result = rdd.aggregate(zeroValue,seqOp,combOp);
    
    System.out.println(result);
    
    결과 : 88

    sum

    스파크에서는 RDD를 구성하는 요소의 타입에 따라 좀 더 특화된 편리한 연산을 제공하기 위해 특정 타입의 요소로 구성된 RDD에서만 사용 가능한 메소드를 정의하고 있습니다. 대표적인 경우로 키와 값 형태의 쌍을 이루는 요소를 위한 reduceByKey()나 combineByKey() 등을 들 수 있습니다. sum() 메소드도 그 중 하나로서, RDD를 구성하는 요소가 double, Long 등 숫자 타입일 경우에만 사용 가능하며 전체 요소의 합을 구해줍니다.

     

    List<Double> data = Arrays.asList(1d,2d,3d,4d,5d,6d,7d,8d,9d,10d);
    javaDoubleRdd rdd = sc.parallelizeDoubles(data);
    double result = rdd.sum();
    System.out.println(result);
    
    결과 : 55.0

    foreach,foreachPartition

    RDD의 모든 요소에 특정 함수를 적용하는 메소드입니다. 이 메소드는 인자로 한 개의 입력값을 가지는 함수를 전달 받는데, 이렇게 전달받은 함수에 각 RDD 요소를 하나씩 입력값으로 사용해 해당 함수를 실행합니다. foreachPartitions()도 같이 실행할 함수를 인자로 받아 사용하는데, foreach()와 다른 점은 해당 함수를 개별 요소가 아닌 파티션 단위로 적용한다는 점입니다. 

     

    이 메소드를 사용할 때 유의할 점은 인자로 전달받은 함수가 드라이버 프로그램(메인 함수를 포함하고 있는 프로그램)이 동작하고 있는 서버가 아닌 클러스터의 각 개별 노드(서버)에서 실행되다는 점입니다. 예를 들어, 분산 클러스터 환경에서 RDD의 foreach() 연산에 콘솔 출력 함수를 전달한다는 것은 실제로는 각 노드의 콘솔에 출력하라는 의미가 됩니다.

     

    List<Integer> data = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
    JavaRDD<Integer> rdd = sc.parallelize(data,3);
    
    rdd.foreach((Integer t) -> System.out.println("Value Side Effect: " + t));
    
    

    foreach결과값

    이유는 모르겠는데 순서가 첫번째 파티션먼저 나오지가 않는다.

     

    List<Integer> data = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
    JavaRDD<Integer> rdd = sc.parallelize(data,3);
    
    rdd.foreach((Integer t) -> System.out.println("Value Side Effect: " + t));
    
    

    foreachPartition결과값


    toDebugString

    이름 그대로 디버깅을 위한 메소드입니다. RDD의 파티션 개수나 의존성 정보 등 세부 정보를 알고 싶을 때 사용할 수 있습니다.


    cashe, persist, unpersist

    RDD는 액션 연산이 수행될 때마다 관련 트랜스포메이션 연산을 반복합니다. 이때 기존에 사용했던 데이터가 메모리에 남아 있다면 그 데이터를 사용하지만 다른 이유로 인해 데이터가 남아 있지 않다면 RDD 생성 히스토리(리니지)를 이용해 복구(재생성)하는 단계를 수행합니다.

     

    cashe()와 persist()는 첫 액션을 실행한 후 RDD 정보를 메모리 또는 디스크 등에 저장해서 다음 액션을 수행할 때 불필요한 재생성 단계를 거치지 않고 원하는 작업을 즉시 실행할 수 있게 해 주는 메소드입니다. 이 중에서 cashe()는 RDD의 데이터를 메모리에 저장하라는 의미로, 만약 저장해야 할 메모리 공간이 충분치 않다면 부족한 용량만큼 저장을 수행하지 않게 됩니다.

     

    이에 반해 persist()는 StorageLevel이라는 옵션을 이용해 저장 위치(메모리, 디스크) 와 저장 방식(직렬화 여부) 등을 상세히 지정할 수 있는 기능을 제공합니다. 예를 들어, StorageLevel의 MEMORY_ONLY는 메모리에만 저장하라는 의미이며, MEMORY_AND_DISK_SER는 메모리에 저장하다가 공간이 부족할 경우 DISK를 사용하되 직렬화된 포맷을 이용하라는 의미입니다.

     

    rdd.cashe

    rdd.persist(StorageLevel.MEMORY_ONLY)

    와 같이 사용됩니다.

     

    무조건 캐시하고 보자는 생각보다는 처리하고자 하는 데이터의 특성과 용량, 사용 가능한 클러스터 자원을 신중히 고려해서 캐시 정책을 세우는 것이 중요합니다.

     


    partitions

    partitions()는 RDD의 파티션 정보가 담긴 배열을 돌려줍니다. 이때 배열에 담긴 요소는 Partition 타입 객체이며, 파티션의 인덱스 정보를 알려주는 index() 메소드를 포함하고 있습니다. 단순히 크기 정보를 알아볼 목적이라면 getNumPartitions() 메소드를 사용할 수 도 있습니다.

     

    val rdd =sc.parallelize(1 to 1000, 10)
    println(rdd.partitions.size)
    println(rdd.getNumPartitions)
    
    결과 : 
    10
    10

     

    댓글

Designed by Tistory.