책&스터디

[하둡 완벽 가이드] - PART1 02장 맵리듀스

jyu_seo_ 2026. 2. 1. 21:27

맵리듀스는 데이터 처리를 위한 프로그래밍 모델이다. 이 모델은 단순하지만 그렇다고 유용한 프로그램을 작성할 수 없을 정도로 단순하기만 한것은 아니다. 하둡은 다양한 언어로 작성된 맵리듀스 프로그램을 구동시킬수 있다.

맵리듀스는 태생 자체가 병행성을 고려하여 설계되었고, 누구든지 충분한 장비만 갖추고 있다면 대규모 데이터 분석을 할 수 있다. 맵리듀스는 대용량 데이터셋에서 그 진가가 드러난다. 예제를 통해서 맵리듀스의 프로그래밍 모델을 살펴보자.

2-1 기상 데이터셋

우리는 기상 데이터를 분석하는 프로그램을 작성할것이다. 지구 전지역에서 매시간 데이터를 수집하는 기상센서들은 대량의 로그 데이터를 모으는데 이러한 데이터는 반구조적이면서 레코드지향적 이기 때문에 맵리듀스를 이용한 데이터 분석에 적합하다.

2-2 데이터 포맷

데이터는 국립기후자료센터(NCDC)에서 가져와서 사용한다. 이 데이터는 한행이 하나의 레코드며 행단위의 아스키 형식으로 되어있다.이 형식은 기상학적 요소들이 데이터 길이가 가변적이고 선택적인 요소가 많음에도 불구하고 유연한 표현을 가능하게 한다. 단순한 처리를 위해 모든 데이터에 존재하고 고정길이를 가진 기온과 같은 기본 요소에 초점을 두도록 하겠다.

 

중요필드에 대한 주석을 포함한 예를 보여준다. 실제파일에는 구분자 없이 모든 필드가 한행으로 붙어있지만 예제에서는 각필드 단위로 보기 쉽게 하기 위해 여러행으로 구분해놓았다.

 

수만개의 기상관측소가 존재하기 때문에 전체 데이터셋은 상대적으로 작은 파일이 매우 많다. 하둡의 특성상 소수의 큰파일이 처리하기 쉽고 효율적이다. 따라서 다수의 파일을 연도를 기준으로 하나의 파일로 병합하기 위해 먼저 전처리 작업을 수행해야 한다.

 

2-3 유닉스 도구로 데이터 분석하기

연도별 전 세계 최고 기온을 얼마일까? 처음에는 하둡을 사용하지 않고 이 문제를 해결할 것이다. 이 정보는 성능 평가의 기준선이 될 것이고 결과를 검증하는 데도 유용할 것이다.

 

awk는 행 기반 데이터를 처리하기 위한 전통적인 유닉스 도구다.

awk 스크립트는 데이터에서 두개의 필드(기온과 특성코드)를 추출한다. 기온(문자열)에 0을 더하면 그 값은 정수형으로 변환한다.

다음에는 기온이 유효한 값을 가지는지,특성 코드가 그 측정값을 신뢰할 수 있다고 보는지 점검한다. 측정된 값에 문제가 없다면 현재 최고 기온과 비교하여 새로운 값이 더 높으면 최고 기온을 변경한다. END 영역은 파일에 있는 모든행이 처리된 후에 실행되는데, 최종 최고 기온을 출력한다.

 

2-4 하둡으로 데이터 분석하기

하둡이 제공하는 병렬처리의 이점을 이용하기 위해서는 우리 요구사항을 맵리듀스 잡으로 다시 표현할 필요가 있다.

로컬이나 소규모 클러스터에서 테스트를 한 후 다수의 머신으로 구성된 클러스터에서 수행해보자.

2-4-1 맵과 리듀스

맵리듀스 작업은 크게 맵단계와 리듀스 단계로 구분된다. 각 단계는 입력과 출력으로 키-값의 쌍을 가지며, 그 타입은 프로그래머가 선택한다. 또한 프로그래머는 맵 함수와 리듀스 함수를 작성해야 한다.

 

맵단계의 입력은 NCDC의 원본 데이터다. 먼저 데이터셋의 각 행의 타입을 텍스트로 인식하는 텍스트 입력 포맷을 선택한다. 값은 각 행(문자열)이고, 키는 파일의 시작부에서 각 행이 시작되는 지점까지의 오프셋이다.여기서 키는 의미가 없으므로 무시해도 된다.

 

맵 함수는 단순하다. 각 행에서 연도와 기온을 추출한다. 이 예제에서 맵 함수는 단지 데이터의 준비 단계로 연도별 최고 기온을 찾는 리듀스 함수를 위해 데이터를 제공하는 역할을 맡는다. 또한 잘못된 레코드를 걸러주는 작업은 맵 함수에서 수행하는 것이 적합하며, 여기서는 기온 필드의 값이 누락되거나 이상하거나 문제가 있는 레코드를 제거하는 작업을 수행한다.

 

맵리듀스의 논리적 데이터 흐름

 

입력 > 맵 > 셔플 > 리듀스 > 출력

 

2-4-2 자바 맵리듀스

맵리듀스 프로그램이 어떻게 작동하는지 빠르게 훓어보기 위해 소스 코드를 한번 살펴보자. 맵 함수, 리듀스 함수, 잡을 구동하기 위한 코드를 볼 필요가 있다. 맵 함수는 추상 map() 메서드를 정의하는 Mapper 클래스로 구현된다.

 

Mapper클래스는 제네릭 타입으로, 네개의 정규타입 매개변수(입력키,입력값, 출력키, 출력값)를 가진다. 예제에서 입력키는 long integer 타입의 오프셋, 입력값은 한 행의 내용,출력키는 연도, 출력값은 기온이다. 하둡은 최적화된 네트워크 직렬화를 위해 자체적으로 기본 타입 셋을 제공한다. 이러한 타입 클래스는 org.apache.hadoop.io 패키지에서 찾아볼수 있다.

 

map()메서드의 입력으로 하나의 키와 하나의 값이 전달된다. 우리는 한 행의 Text 입력값을 자바 String으로 변환하고, 원하는 컬럼을 추출하기 위해 substring() 메서드를 사용한다.

 

또한 map()메서드는 출력을 위해 Context의 인스턴스를 제공한다. 예제에서 연도는 Text객체로 (키로 사용되기 때문에), 기온은 IntWritable 객체로 기록된다. 기온이 존재하고 측정된 값에 대한 특성 코드가 유효하다면 출력 레코드를 기록한다.

리듀스 함수는 Reducer를 사용하여 정의한다. 맵 함수의 정의와 비슷하다.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
	extends Reducer<Text, IntWritable, Text, IntWritable > {

   @Override
   public void reduce(Text Key,Iterable<IntWritable> values, Context context)
   		throws IOException, InterruptedException {
        
       int maxValue = Integer.MIN_VALUE;
       for(IntWritable value : values) {
       	maxValue = Math.max(maxValue, value.get());
       } 
        context.write(key, new intwritable(maxValue));
   }
}

 

리듀스 함수 역시 입력과 출력 타입을 규정하기 위해 네 개의 정규 타입 매개변수를 사용한다. 리듀스 함수의 입력 타입은 맵 함수의 출력타입이었던 Text, IntWriable과 짝을 이룬다. 예제에서 리듀스 함수의 출력 타입은 Text와 intWritable이며, Text에는 연도를, IntWritable에는 해당 연도의 기온값을 모두 비교하여 가장 높은 기온을 기록한다.

 

 

2-5 분산형으로 확장하기

지금까지는 작은 입력 데이터로 맵리듀스가 어떻게 작동하는지 살펴보았다. 이제부터는 시스템 전체를 한눈에 내려다보면서 대용량 데이터를 어떻게 처리하는지 살펴보겠다. 앞의 예제는 단순화를 위해 로컬 파일시스템의 파일을 사용했다. 그러나 확장을 위해서는 전체 데이터를 HDFS(3장에서 다룬다)라는 분산 파일시스템에 저장할 필요가 있다. 하둡은 데이터의 일부분이 저장된 클러스터의 각 머신에서 맵리듀스 프로그램을 실행한다. 이를 위해 하둡은 YARN이라 불리는 하둡 자원 관리 시스템을 이용한다. 이제부터 그 작동방식을 살펴보자.

2-5-1 데이터 흐름

먼저 용어부터 살펴보자. 맵리듀스 잡은 클라이언트가 수행하는 작업의 기본단위다. 잡은 입력데이터, 맵리듀스 프로그램, 설정정보로 구성된다. 하둡은 잡을 맵 태스크리듀스태스크로 나누어 실행한다.각 태스크는 YARN을 이용하여 스케줄링되고 클러스터의 여러 노드에서 실행된다.특정 노드의 태스크 하나가 실패하면 자동으로 다른 노드를 재할당하여 다시 실행된다.

 

하둡은 맵리듀스 잡의 입력을 입력 스플릿 또는 단순히 스플릿 이라고 부르는 고정 크기조각으로 분리한다. 하둡은 각 스플릿마다 하나의 맵 태스크를 생성하고 스플릿의 각 레코드를 사용자 정의 맵 함수로 처리한다.

 

전체 입력을 통째로 처리하는 것보다 스플릿으로 분리된 많은 조각을 각각 처리하는 것이 훨씬 빠르다. 따라서 만일 스플릿을 병렬로 처리한다면 고성능 서버가 저성능 서버보다 작업 과정에서 비교적 더 많은 스플릿을 처리할 수 있기 때문에 스플릿의 크기가 작을수록 부하 분산에 더 좋은 효과를 볼수있다.같은 성능의 서버에서도 프로세스가 실패하거나 여러 잡이 동시에 실행되기 때문에 부하 분산은 충분히 의미가 있다. 또한 부하 분산의 효과는 스플릿이 세분화 될수록 더 커진다.

 

반면 스플릿 크기가 너무 작으면 스플릿 관리와 맵 태스크 생성을 위한 오버헤드 때문에 잡의 실행 시간이 증가하는 단점이 있다.

일반적인 잡의 적절한 스플릿 크기는 HDFS 블록의 기본크기인 128MB가 적당하다고 알려져있다. HDFS 블록의 크기는 클러스터 설정에 따라 다르고 파일을 생성할 때 개별로 지정할 수 있다.

 

하둡은 HDFS 내의 입력 데이터가 있는 노드에서 맵 태스크를 실행할 때 가장 빠르게 작동한다. 이를 데이터 지역성 최적화 라고 하는데, 클러스터의 중요한 공유자원인 네트워크 대역폭을 사용하지 않는 방법이다. 그러나 맵 태스크의 입력 스플릿에 해당하는 HDFS 블록 복제본이 저장된 세 개의 노드 모두 다른 맵 태스크를 실행하여 여유가 없는 상황이 발생할 수도 있다. 이런 상황에서 잡 스케줄러는 블록 복제본이 저장된 동일 렉에 속한 다른 노드에서 가용한 맵 슬롯을 찾는다. 또한 아주 드문 일이지만 데이터 복제본이 저장된 노드가 없는 외부 랙의 노드가 선택될 수도 있는데,이때에는 랙 간 네트워크 전송이 불가피하게 일어난다.

 

최적의 스플릿 크기가 HDFS 블록 크기와 같아야 하는 이유는 그 블록 크기가 단일 노드에 저장된다고 확신할 수 있는 가장 큰 입력 크기이기 때문이다. 하나의 스플릿이 두 블록에 걸쳐 있을 때 두 블록 모두 저장하는 HDFS 노드는 존재할 가능성이 낮기 때문에 스플릿의 일부 데이터를 네트워크를 통해 맵 태스크가 실행되는 다른 노드로 전송해야 한다. 이렇게 되면 맵 태스크 전체가 로컬 데이터만 이용할때보다 더 느려지게 된다.

 

 

맵 태스크의 결과는 HDFS가 아닌 로컬 디스크에 저장된다. 맵의 결과는 리듀스가 최종 결과를 생성하기 위한 중간 결과물이고, 잡이 완료된 후 맵의 결과는 그냥 버려지기 때문이다. 따라서 맵의 결과를 HDFS에 저장하는 것은 내부 복제 과정을 추가하는 것이 되어 적절치 않다. 리듀스 태스크로 모든 결과를 보내기 전에 맵 태스크가 실패한다면 하둡은 자동으로 해당 맵 태스크를 다른 노드에 할당하여 맵의 출력을 다시 생성할 것이다.

 

리듀스 태스크는 일반적으로 모든 매퍼의 출력 결과를 입력으로 받기 때문에 데이터 지역성의 장점이 없다. 예제에서 모든 맵 태스크는 하나의 리듀스 태스크에 연결되어 있다. 따라서 정렬된 맵의 모든 결과는 네트워크를 통해 일단 리듀스 태스크가 실행중인 노드로 전송되고, 맵의 모든 결과를 병합한 후 사용자 정의 리듀스 함수로 전달된다.

 

일반적으로 리듀스의 결과는 안정성을 위해 HDFS에 저장된다. 3장에서 자세히 설명하겠지만, 리듀스 출력에 대한 HDFS 블록의 첫번째 복제본은 로컬노드에 저장되고, 나머지 복제본은 외부 랙에 저장된다. 따라서 리듀스의 결과를 출력하는 것은 네트워크 대역폭을 소모하지만 일반적인 HDFS 쓰기 파이프라인에 소모되는 대역폭과 비슷한 수준이다.

 

리스크 태스크 수는 입력 크기와 상관없이 독립적으로 지정할수 있다.리듀스가 여럿이면 맵 태스크는 리듀스 수만큼 파티션을 생성하고 맵의 결과를 각 파티션에 분배한다.각 파티션에는 여러키가 존재하지만 개별 키에 속한 모든 레코드는 여러 파티션중 한곳에만 배치된다. 파티셔닝 알고리즘은 사용자 정의 파티셔닝 함수를 지원하지만 해시 함수로 키를 분배하는 기본 파티셔너를 주로 사용하며 매우 잘 작동한다. 다이어그램은 맵과 리듀스 태스크 사이의 데이터 흐름을 왜 셔플 이라고 하는지 그 이유를 설명해준다. 셔플의 실제 과정은 이 다이어그램에서 보는 것보다 훨씬 복잡하다.

 

2-5-2 컴바이너 함수

클러스터에서 맵리듀스 잡이 사용하는 네트워크 대역폭은 한계가 있기 때문에 맵과 리듀스 태스크 사이의 데이터 전송을 최소화할 필요가 있다. 하둡은 맵의 결과를 처리하는 컴바이너 함수 를 허용한다. 컴바이너 함수는 최적화와 관련이 있다. 따라서 특정 맵의 결과 레코드에 컴바이너를 몇 번 호출할지 지정하는 기능은 제공하지 않는다. 다시 말해, 하둡은 컴바이너 함수의 호출 빈도와 상관없이 리듀스의 결과가 언제나 같도록 보장한다.

컴바이너 함수 작성하기

자바 맵리듀스 프로그램으로 다시 돌아가서, 컴바이너 함수는 Reducer 클래스를 사용해서 정의한다. 즉, MaxTemperatureReducer에 있는 리듀서 함수와 동일한 구현체를 사용한다. 차이가 있다면 Job 설정에 컴바이너 클래스를 추가로 지정해야 한다는 것이다.

2-5-3 분산 맵리듀스 잡 실행하기

이 프로그램은 코드를 고치지 않아도 전체 데이터셋에서 잘 동작한다. 맵리듀스는 데이터 크기와 하드웨어 성능에 따라 확장할 수 있는데 이것이 바로 맵리듀스의 핵심이다.데이터의 관점에서 보면 프로그램의 전체 수행시간은 EC2 클러스터의 10개의 노드(고성능 CPU XL 인스턴스)에서 6분이였다.

2-6 하둡 스트리밍

하둡은 자바 외에 다른 언어로 맵과 리듀스 함수를 작성할 수 있는 맵리듀스 API를 제공한다. 하둡 스트리밍은 하둡과 사용자 프로그램 사이의 인터페이스로 유닉스 표준 스트림을 사용한다. 따라서 사용자는 표준 입력을 읽고 표준 출력으로 쓸 수 있는 다양한 언어를 이용하여 맵리듀스 프로그램을 작성할 수 있다.

 

스트리밍은 그 특성상 텍스트 처리에 매우 적합하다. 맵의 입력 데이터는 표준 입력으로 맵 함수에 전달되고, 행 단위로 처리되어 표준 출력으로 쓰여진다. 맵의 출력 키-값 쌍은 탭으로 구분된 하나의 행으로 출력된다. 리듀스 함수의 입력은 표준 입력으로 전달받은 탭으로 구분된 키-값 쌍과 같은 포맷이다. 리듀스를 수행하기 전에 프레임워크는 키를 기준으로 데이터를 정렬하며, 리듀스 함수는 표준 입력으로부터 각 행을 읽고, 그 결과를 표준 출력에 쓴다.