"이 장에서는 카운터,정렬,데이터셋 조인 등 맵리듀스의 고급기능을 살펴본다."
9-1 카운터
우리는 가끔 분석 작업과는 별도로 분석하려는 데이터 자체가 궁금해질때가 있다. 예를들어 부적절한 레코드 수를 세는 중 전체 데이터셋에서 이러한 레코드가 차지하는 비율이 굉장히 높다는 사실을 알게 되면 왜 그렇게 많은 레코드가 잘못되었는지 즉시 확인하고 싶을 것이다. 아마 프로그램 일부의 버그 때문에 부적절한 레코드가 많을 수도 있고, 실제로 데이터의 품질에 문제가 있어서 이러한 레코드가 많을 수도 있다. 확인을 한 후에는 의미 있는 분석을 하기에 충분한 정상 레코드를 확보하기 위해 데이터셋의 크기를 늘리고 싶을 것이다.
카운터는 잡에 대한 통계 정보를 수집하는 유용한 채널로, 데이터 품질의 통제나 애플리케이션 수준의 통계를 제공한다.
또한 카운터는 문제 진단에 매우 유용하다. 맵이나 리듀스 태스크에 로그 메세지를 넣어서 특정조건의 발생을 기록하는 것보다는 카운터를 사용하는 것이 더 좋다. 게다가 큰 분산 잡의 로그 출력을 일일이 확인하는 것보다는 그냥 카운터 값을 확인하는 것이 훨씬 수월하다. 카운터 대신 로그를 사용하면 수많은 로그파일로부터 특정조건을 맞는 레코드를 모두 추출하는 프로그램을 별도로 작성해야 하는 어려움이 있다.
9-1-1 내장 카운터
하둡은 모든 잡에 대해 내장 카운터를 제공하며 이들은 다양한 메트릭(지표)을 알려준다.
예를 들어 처리한 데이터의 바이트와 레코드 수를 세는 카운터가 있으면 입력과 출력 데이터의 크기가 예상과 맞는지 확인할 수 있다. 카운터는 여러 그룹으로 나누어지며, 아래 표에서 내장 카운터 그룹을 확인할 수 있다.
-내장 카운터 그룹
| 그룹 | 이름/열거자 | 참고 |
| 맵리듀스 태스크 카운터 | org.apache.mapreduce.TaskCounter | [표 9-2] |
| 파일시스템 카운터 | org.apache.mapreduce.FileSystemCounter | [표 9-3] |
| FileInputFormat 카운터 | org.apache.mapreduce.lib.input.FileInputFormatCounter | [표 9-4] |
| FileOutputFormat 카운터 | org.apache.mapreduce.lib.output.FileOutputFormatCounter | [표 9-5] |
| 잡 카운터 | org.apache.mapreduce.JobCounter | [표 9-6] |
내장 카운터 그룹은 태스크가 진행되면서 갱신되는 태스크 카운터 또는 잡이 진행되면서 갱신되는 잡 카운터를 포함하고 있다.
태스크 카운터
태스크 카운터는 각 태스크가 실행될 때 해당 태스크에 대한 정보를 수집한 후 잡의 모든 태스크에 대한 값을 취합하여 최종 결과를 알려준다. 예를 들어 MAP_INPUT_RECORDS 카운터는 먼저 각 맵 태스크가 읽어 들이는 입력 레코드 수를 센 후 잡에 속한 모든 맵 태스크의 값을 집계하므로 최종 값은 잡의 총 입력 레코드 수가 된다.
태스크 카운터는 각 태스크 시행마다 관리되고 주기적으로 애플리케이션 마스터에 전송되므로 결국 전역적으로 수집된다.
태스크 카운터는 마지막 전송 이후에 변경된 수치만 보내는 것이 아니라 보낼 때마다 누적된 전체 수치를 전송하므로 메세지 유실로 인한 오류를 방지할 수 있다. 또한 잡 실행 도중 태스크가 실패하면 카운터는 중단된다.
카운터의 값은 잡이 성공적으로 끝나야 의미가 있다. 하지만 어떤 카운터는 태스크 진행 도중에도 의미 있는 진단 정보를 제공하므로 웹 UI를 통해 이를 모니터링 하는 것도 유용하다. 예를 들어 PHYSICAL_MEMORY_BYTES, VIRTUAL_MEMORAY_BYTES, COMMITTED_HEAP_BYTES는 특정 태스크의 메모리 사용량 변화 정보를 실시간으로 보여준다.
내장 태스크 카운터는 맵리듀스 태스크 카운터 그룹과 관련 카운터 그룹에 속한 카운터를 포함한다.
-내장 맵리듀스 태스크 카운터
| 카운터 | 설명 |
| 맵 입력 레코드 (MAP_INPUT_RECORDS) |
잡의 모든 맵이 처리한 입력 레코드 수,RecordReader로 한 레코드를 읽은 후 프레임워크를 통해 맵의 map() 메서드로 전달할 때마다 증가한다. |
| 스플릿 원시 바이트 (SPLIT_RAW_BYTES) |
맵이 읽은 입력 스플릿 객체의 바이트 수, 이 객체는 스플릿 데이터 자체가 아닌 스플릿의 메타데이터(즉, 파일의 오프셋과 길이)이므로 전체 크기는 작다. |
| 맵 출력 레코드 (MAP_OUTPUT_RECORDS) |
잡의 모든 맵이 생성한 출력 레코드 수, 맵의 OutputCollector의 collect() 메서드를 호출할 때마다 증가한다. |
| 맵 출력 바이트 (MAP_OUTPUT_BYTES) |
잡의 모든 맵이 생성한 압축되지 않은 바이트 수, 맵의 OutputCollector의 collect() 메서드를 호출할 때마다 증가한다. |
| 맵 출력 실질 바이트 (MAP_OUTPUT_MATERIALIZED_BYTES) |
실제로 디스크에 쓰여진 맵 출력 바이트수, 맵의 출력을 압축할 때만 카운터 값에 반영된다. |
| 컴바인 입력 레코드 (COMBINE_INPUT_RECORDS) |
잡의 모든 컴바이너(존재할 때만)가 처리한 입력 레코드 수,컴바이너의 반복자가 값을 읽을 때마다 증가한다. 이 수치는 컴바이너가 처리한 값의 개수다. 유일한 키 그룹의 개수를 제공하지 않는 이유는 컴바이너에 키별로 하나의 그룹만 존재하는 것은 아니기 때문이다. 따라서 유용한 지표가 될 수 없다. |
| 컴바인 출력 레코드 (COMBINE_OUTPUT_RECORDS) |
잡의 모든 컴바이너(존재할 때만)가 생성하는 출력 레코드 수, 컴바이너의 OutputCollector의 collect() 메서드를 호출할 때마다 증가한다. |
| 리듀스 입력 그룹 (REDUCE_INPUT_GROUPS) |
잡의 모든 리듀서가 처리하는 유일한 키 그룹 수, 프레임워크가 리듀서의 reduce() 메서드를 호출할 때마다 증가한다. |
| 리듀스 입력 레코드 (REDUCE_INPUT_RECORDS) |
잡의 모든 리듀서가 처리하는 입력 레코드수, 리듀서의 반복자가 값을 읽을때마다 증가한다. 리듀서가 입력을 모두 처리했다면 이 수치는 맵의 출력 레코드 수와 일치해야 한다. |
| 리듀스 출력 레코드 (REDUCE_OUTPUT_RECORDS) |
잡의 모든 맵이 생성하는 리듀스 출력 레코드 수, 리듀서의 OutputCollector의 collect() 메서드를 호출할 때마다 증가한다 |
| 리듀스 셔플 바이트 (REDUCE_SHUFFLE_BYTES) |
셔플에 의해서 리듀서로 복사된 맵 출력 바이트 수 |
| 디스크에 쓰여진 레코드 (SPILLED_RECORDS) |
잡의 모든 맵과 리듀스 태스크에서 디스크로 쓴 레코드 수 |
| CPU 밀리초(CPU_MILLISECONDS) | /proc/cpuinfo에 기록된 태스크의 밀리초 단위 누적 CPU 시간 |
| 물리 메모리 바이트 (PHYSICAL_MEMORY_BYTES) |
/proc/meminfo에 기록된 태스크가 사용된 물리 메모리 바이트 |
| 가상 메모리 바이트 (VIRTUAL_MEMORY_BYTES) |
/proc/meminfo에 기록된 태스크가 사용한 가상 메모리 바이트 |
| 커밋된 힙 바이트 (COMMITTED_HEAP_BYTES) |
Runtime.getRuntime().totalMemory()의 결과인 JVM에서 사용가능한 전체 메모리 바이트 크기 |
| GC 밀리초 시간 (GC_TIME_MILLIS) |
GarbegeCollectorMXBean.getCollectionTime()의 결과인 태스크의 밀리초 단위 가비지 컬렉션 경과 시간 |
| 셔플된 맵(SHUFFLED_MAPS) | 셔플에 의해 리듀서로 전송된 맵 출력 파일 수 |
| 실패한 셔플(FAILED_SHUFFLE) | 셔플 진행중에 복사에 실패한 맵 출력 수 |
| 병합된 맵 출력(MERGED_MAP_OUTPUTS) | 셔플의 리듀서 측에서 병합된 맵 출력 수 |
-내장 파일 시스템 태스크 카운터
| 카운터 | 설명 |
| 파일시스템에서 읽은 바이트 (BYTES_READ) |
맵과 리듀스 태스크가 파일시스템에서 읽은 바이트 수, 각 파일시스템 마다 카운터가 존재하며 로컬,HDFS,S3 등의 파일시스템이 있다. |
| 파일시스템에 쓴 바이트(BYTES_WRITTEN) | 맵과 리듀스 태스크가 파일시스템에 쓴 바이트 수 |
| 파일시스템 읽기 동작(READ_OPS) | 맵과 리듀스 태스크가 파일시스템에 수행한 읽기 동작(예를 들면 열기, 파일 상태 보기)수 |
| 파일시스템 대량 읽기 동작(LARGE_READ_OPS) | 맵과 리듀스 태스크가 파일시스템에 수행한 대량 읽기 동작(예를 들면 큰 디렉터리를 나열)수 |
| 파일시스템 쓰기 동작(WRITE_OPS) | 맵과 리듀스 태스크가 파일시스템에 수행한 쓰기 동작(예를 들면 생성, 추가)수 |
-내장 FileInputFormat 태스크 카운터
| 카운터 | 설명 |
| 읽은 바이트(BYTES_READ) | 맵 태스크가 FileInputFormat을 통해 읽은 바이트 수 |
-내장 FileOutputFormat 태스크 카운터
| 카운터 | 설명 |
| 쓴 바이트(BYTES_READ) | 맵 태스크나 리듀스 태스크가 FileOutputFormat을 통해 쓴 바이트 수 |
잡 카운터
잡 카운터는 애플리케이션 마스터에 의해 유지되므로 사용자 정의 카운터를 비롯한 다른 모든 카운터와 달리 네트워크를 통해 카운터를 전달할 필요는 없다.잡 카운터는 태스크 수행 중에 변경되는 값이 아닌 잡 수준의 통계 값을 측정한다. 예를들어 TOTAL_LAUNCHED_MAPS 카운터는 잡 과정에서 실행된 맵 태스크 수(실패한 태스크 포함)를 센다.
-내장 잡 카운터
| 카운터 | 설명 |
| 실행된 맵 태스크 (TOTAL_LAUNCHED_MAPS) |
실행된 맵 태스크 수. 투기적으로 시작된 태스크도 포함한다. |
| 실행된 리듀스 태스크 (TOTAL_LAUNCHED_REDUCES) |
실행된 리듀스 태스크 수,투기적으로 시작된 태스크도 포함한다. |
| 실행된 우버 태스크 (TOTAL_LAUNCHED_UBERTASKS) |
실행된 우버 태스크 수 |
| 우버 태스크 내의 맵(NUM_UBER_SUBMAPS) | 우버 태스크 내의 맵 수 |
| 우버 태스크 내의 리듀스(NUM_UBER_SUBREDUCES) | 우버 태스크 내의 리듀스 수 |
| 실패한 맵 태스크(NUM_FAILED_MAPS) | 실패한 맵 태스크 수 |
| 실패한 리듀스 태스크(NUM_FAILED_REDUCES) | 실패한 리듀스 태스크 수 |
| 실패한 우버 태스크(NUM_FAILED_REDUCES) | 실패한 우버 태스크 수 |
| 강제 종료된 맵 태스크(NUM_KILLED_MAPS) | 강제 종료된 맵 태스크 수. |
| 강제 종료된 리듀스 태스크(NUM_KILLED_REDUCES) | 강제 종료된 리듀스 태스크 수 |
| 데이터 로컬 맵 태스크(DATA_LOCAL_MAPS) | 입력 데이터와 동일한 노드에서 실행된 맵 태스크 수 |
| 랙 로컬 맵 태스크(RACK_LOCAL_MAPS) | 입력 데이터와 동일한 랙이지만 데이터-로컬이 아닌 노드에서 실행된 맵 태스크 수 |
| 그 외 로컬 맵 태스크(OTHER_LOCAL_MAPS) | 입력 데이터와 다른 랙의 노드에서 실행된 맵 태스크 수. 보통 랙 간의 대역폭은 크지 않고, 하둡은 맵 태스크를 입력 데이터와 최대한 가까이 두려고 노력하므로 이 수치는 최대한 낮은 것이 좋다. |
| 맵 태스크 전체 시간(MILLIS_MAPS) | 맵 태스크 수행에 걸린 밀리초 단위의 전체 시간. 투기적으로 시작된 태스크를 포함한다. 코어와 메모리 사용량(VCORES_MILLIS_MAPS와 MB_MILLIS_MAPS)을 측정한 카운터를 참조하라. |
| 리듀스 태스크 전체 시간(MILLIS_REDUCES) | 리듀스 태스크 수행에 걸린 밀리초 단위의 전체 시간.투기적으로 시작된 태스크를 포함한다. 코어와 메모리 사용량(VCORES_MILLIS_REDUCES와 MB_MILLIS_REDUCES)을 측정한 카운터를 참조하라. |
9-1-2 사용자 정의 자바 카운터
맵리듀스는 사용자 코드 수준에서 카운터 집합을 정의하게 해주며, 매퍼와 리듀서에서 원하는 방식으로 증가시킬 수 있다. 카운터는 연관된 카운터를 묶을 수 있도록 자바 enum으로 정의한다. 잡은 임의 개수의 enum을 정의할 수 있으며, 각 enum은 임의 개수의 필드를 갖는다. enum의 이름은 그룹명이고 enum의 필드는 카운터 명이다. 카운터는 전역이며, 맵리듀스 프레임워크는 잡이 끝나는 시점에 총계를 구하기 위해 모든 맵과 리듀스로부터 카운터를 수집한다.
6장에서 기상 데이터 셋에 있는 잘못된 레코드를 세는 몇개의 카운터를 만들어봤다 밑에 예제는 누락된 레코드와 기온 특성 코드의 분포를 계산하는 예제를 확장한 것이다.
-최고 기온을 구하는 애플리케이션, 누락되거나 잘못된 필드, 특성 코드를 계산하는 카운터를 포함하고 있다.
public class MaxtemperatureWithCounters extends Configured implements Tool{
enum Temperature {
MISSING,
MALFORMED
}
static class MaxTemperatureMapperWithCounters
extends Mapper<LongWritable, Text, Text, IntWritable> {
private NcdcRecordParser parser = new NcdcRecordParser();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException{
parser.parse(value);
if (parser.isValidTemperature()) {
int airTemperature = parser.getAirTemperature();
context.write(new Text(parser.getYear()),
new IntWritable(airTemperature));
} else if (parser.isMalformedTemperature()) {
System.err.println("Ignoring possibly corrupt input: " + value);
context.getCounter(Temperature.MALFORMED).increment(1);
} else if (parser.isMissingTemperature()) {
context.getCounter(Temperature.MISSING).increment(1);
}
context.getCounter("TemperatureQuality", parser.getQuality()).increment(1);
}
}
@Override
public int run(String[] args) throws Exception{
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (job == null){
return -1;
}
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(MaxTemperatureMapperWithCounters.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducer.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MaxTemperatureWithCounters(), args);
System.exit(exitCode);
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MaxTemperatureWithCounters(), args);
System.exit(exitCode);
}
}
이 프로그램이 무엇을 하는지 궁금하다면 일단 전체 데이터셋으로 실행해보자.
% hadoop jar hadoop-examples.jar MaxTemperatureWithCounters \
input/ncdc/all output-counters
잡이 성공적으로 끝나면 잡 클라이언트는 마지막에 카운터를 출력한다. 우리가 궁금한 카운터의 값은 다음과 같다.
Air Temperature Records
Malformed=3
Missing=66136856
TemperatureQuality
0=1
1=973422173
2=1246032
4=10764500
5=158291879
6=40066
9=66136858
기온 카운터의 가독성을 높이기 위해 enum 뒤에 리소스 번들(내부 클래스를 위해 '_'를 구분자로 사용)을 사용했다. 여기서 사용된 리소스 번들은 MaxTemperatureWithCounters_Temperature.properties며, 화면에 표시할 이름을 지정할 수 있다.
동적 카운터
이번에는 동적카운터를 사용한 코드를 작성해보자. 동작 카운터는 자바 enum으로 정의하지 않는다. 자바 enum의 필드는 컴파일 시점에 정의되기 때문에 enum을 이용해서 동적으로 새로운 카운터를 생성할 수 없다. 여기서 우리는 기온 특성 코드의 분포를 계산할 것이다. 기온 특성 코드가 취할 수 있는 값을 정의한 포맷 명세가 있지만, 실제로 취하는 값을 출력하기 위해서는 동적 카운터를 사용하는 것이 더 편리하다. Context 객체의 메서드는 그룹명과 카운트 명을 String으로 받는다.
public Counter getCounter(String groupName, String counterName)
카운터를 생성하고(enum 사용) 접근하는(문자열 사용) 두 가지 방법은 실제로는 동일하다. 하둡은 enum을 문자열로 변환해서 RPC로 카운터를 전송하기 때문이다. enum은 작업하기 조금 수월하고 타입 안정성도 제공하므로 대부분의 잡에 적합하다. 동적으로 카운터를 생성하는 것이 예외적인 상황이라면 String 인터페이스를 사용할 수 있다.
카운터 반환
웹 UI와 명령행(mapred job -count)을 사용하는 방법도 있지만, 자바 API로도 카운터 값을 반환할 수 있다. 잡이 끝나는 시점에 카운터를 가져오는 것이 일반적이지만 잡이 안정적으로 동작되고 있을 때는 실행 중에도 카운터 값을 얻을 수 있다. 아래 코드에서는 누락된 기온 필드를 가진 레코드의 비율을 계산하는 프로그램이다.
-누락된 기온 필드를 가진 레코드의 비율을 계산하는 애플리케이션
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
public class MissingTemperatureFields extend Configured implements Tool{
@Override
public int run(String[] args) throws Exception {
if (args.length != 1) {
JobBuilder.printUsage(this, "<job ID>");
return -1;
}
String jobID = args[0];
Cluster cluster = new Cluster(getConf());
Job job = cluster.getJob(JobID.forName(jobID));
if (job == null){
System.err.printf("No job with ID %s found. \n", jobID);
return -1;
}
if (!job.isComplete()) {
System.err.printf("Job %s is not complete.\n", jobID);
return -1
}
Counters counters = job.getCounters();
long missing = counters.findCounter(
MaxTemperatureWithCounters.Temperature.MISSING).getValue();
long total = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
System.out.printf("Records with missing temperature fields: %.2f%%\n",
100.0 & missing / total);
return 0;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MissingTempeatureFields(), args);
System.exit(exitCode);
}
}
먼저 잡 ID를 인자로 getJob() 메서드를 호출하여 Cluster의 Job 객체를 반환하자. null 여부를 확인하여 주어진 ID를 가진 잡이 실제 존재하는지 확인해야 한다. ID를 잘못 지정하거나 잡이 더 이상 잡 히스토리에 없으면 잡이 존재하지 않을 수도 있다.
잡이 완료된 다음에 Job의 getCounters() 메서드를 호출하면 해당 잡의 모든 카운터를 담은 Counters 객체를 얻을 수 있다. Counters 클래스는 카운터의 이름과 값을 찾을 수 있는 다양한 메서드를 제공한다. enum을 인자로 받는 findCounter() 메서드를 호출하면 누락된 기온 필드를 가진 레코드 수와 처리한 총 레코드 수를 내장 카운터에서 확인할 수 있다.
마지막으로, 누락된 기온 필드를 가진 레코드의 비율을 출력한다. 다음은 전체 기상 데이터셋으로 얻은 결과다.
% hadoop jar hadoop-exampels.jar MissingTemperatureFields job_1410450250506_0007
Records with missing temperature fields: 5.47%
9-1-3 사용자 정의 스트리밍 카운터
스트리밍 맵리듀스 프로그램은 특별하게 포맷된 행을 표준 에러 스트림에 전달하여 카운터를 증가시킨다. 여기서 스트림은 제어 채널로 활용된다. 행은 다음과 같은 포맷을 따라야 한다.
reporter:counter:group,counter,amount
다음은 파이썬 코드의 일부로 'Temperature' 그룹의 'Missing'카운터를 1씩 증가시키는 방법을 보여준다.
sys.stderr.write("reporter:counter:Temperature,Missing, 1\n")
이와 유사하게 상태 메세지는 다음과 같은 포맷의 행으로 전달된다.
reporter:status:message
9-2 정렬
데이터를 정렬하는 기능은 맵리듀스의 핵심이다. 애플리케이션 자체가 정렬과 굳이 관련이 없더라도 데이터를 정리하기 위해 맵리듀스가 제공하는 정렬 과정을 사용할 수 있다. 이절에서는 데이터 셋을 정렬하는 다양한 방법과 맵리듀스가 정렬 순서를 제어하는 방법을 자세하게 살펴볼 것이다.
9-2-1 준비
우리는 기온을 기준으로 기상 데이터셋을 정렬할 것이다. 기온을 Text 객체로 저장하면 제대로 정렬할 수 없는데, 이는 부호를 가진 정수는 사전 순서대로 정렬할 수 없기 때문이다. 대신 데이터를 시퀸스 파일로 저장했는데, IntWritable 키는 기온(이제 제대로 정렬된다)이고, 데이터의 각 행은 Text 값을 가지도록 했다.
아래 코드에서 맵리듀스 잡은 잘못된 기온값을 가진 레코드를 제거하기 위해 입력 데이터를 필터링 하는 맵-단독 잡이다. 각 맵은 단일 블록 단위로 압축된 시퀸스 파일을 출력으로 생성하며, 다음에 나오는 명령으로 호출할 수 있다.
% hadoop jar hadoop-examples.jar SortDataPreprocessor input/ncdc/all \
input/ncdc/all-seq
- 기상 데이터를 시퀸스 파일 형태로 변환하는 맵리듀스 프로그램
public class SortDataPreprocessor extends Configured implements Tool{
static class CleanerMapper
extends Mapper <LongWritable, Text, IntWritable, Text> {
private NcdcRecordParser parser = new NcdcRecordParser();
@Override
protected void map(LongWritable key, Text value, Context conetext)
throws IOException, InterruptedException {
parser.parse(value);
if (parser.isValidTemperature()) {
context.write(new IntWritable(parser.getAirTemperature()), value);
}
}
}
@Override
public int run(String[] args) throws Exception {
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (job == null) {
return -1;
}
job.setMapperClass(CleanerMapper.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.Class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setCompressOutput(job, ture);
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job,
CompressionType.BLOCK);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new SortDataPreprocessor(), args);
System.exit(exitCode);
}
}
9-2-2 부분 정렬
이전에 '기본 맵 리듀스 잡'에서 맵리듀스는 기본적으로 키를 기준으로 입력 레코드를 정렬한다는 것을 배웠다.
아래 코드는 IntWritable 키로 키쉰스 파일을 정렬하는 병현된 맵리듀스 잡이다.
- 기본 HashPartitioner를 사용하여 IntWritable 키로 SequenceFile을 정렬하는 맵리듀스 프로그램
public class SortByTemperatureUsingHashPartitioner extends Configured
implements Tool {
@Override
public int run(String[] args) throws Exception{
Job job = JobBuilder.parseInputAndOutput()
if (job == null) {
return -1;
}
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job,
CompressionType.BLOCK);
return job.waitForCompletion(true) ? 0: 1;
}
public static void main(String[] args) throws Exception{
int exitCode = ToolRunner.run(new SortByTemperatureUsingHashPartitioner(),
args);
System.exit(exitCode);
}
}
정렬 순서 제어
키에 대한 정렬 순서는 RawComparator로 제어하며, 다음과 같이 설정한다.
1. mapreduce.job.output.key.comparator.class 속성을 명시적으로 설정하거나 Job의 setSortComparatorClass()를 호출하여 설정하면 해당 클래스의 객체가 사용된다. 이전 버전의 API 에서는 JobConf의 setOutputKeyComparatorClass()를 사용했다.
2. 그렇지 않으면 키는 반드시 WritableComparable의 서브클래스여야 하고, 해당 키 클래스를 이용한 등록된 비교자가 사용된다.
3. 등록된 비교자가 없으면 RawComparator가 사용된다. RawComparator는 비교할 바이트 스트림을 객체로 역직렬화 해서 WritableComparable의 compareTo() 메서드에 위임한다.
이규칙은 Writable 클래스를 위한 최적화된 버전의 RawComparator를 등록하는 것이 왜 중요한지 잘 설명해준다. 또한 자신만의 비교자를 등록하면 정렬 순서를 쉽게 변경할 수 있다.
30개의 리듀서로 이 프로그램을 수행해보자.
% hadoop jar hadoop-examples.jar SortByTemperatureUsingHashPartitioner \
-D mapreduce.job.reduces=30 input/ncdc/all-seq output-hashsort
이 명령어는 30개의 출력 파일을 생성하며 각 파일은 정렬되어 있다. 하지만 전체적으로 정렬된 하나의 파일을 생성하기 위해 각 파일을 병합하는 것은 쉽지 않다(예를 들어 일반 텍스트 파일에서 합치기).
많은 애플리케이션에서 전체 정렬은 중요한 이슈가 아니다. 예를 들어 키로 검색할 때는 부분 정렬된 파일셋만으로도 충분하다. 이 책의 예제 코드에 있는 SortByTemperatureToMapFile과 LookupRecordsByTemperature 클래스를 보면 쉽게 이해할 수 있을 것이다. 시퀸스 파일 대신 맵 파일을 사용하면 키가 속한 연관 파티션을 먼저 찾을 수 있고(파티셔너를 사용), 맵 파일 파티션에서 원하는 레코드를 효율적으로 검색할 수 있다.
9-2-3 전체 정렬
하둡을 사용하여 전체적으로 정렬된 파일을 생성하는 방법은 무엇일까? 가장 간단한 해법은 단일 파티션을 사용하는 것이다. 하지만 이 방법은 단일 머신에서 모든 출력을 처리해야 하므로 파일이 굉장히 클 때는 매우 비효율적이며, 맵리듀스가 제공하는 병렬 처리 아키텍처의 장점을 포기하는 것이기도 하다. 대신 정렬된 파일 집합을 생성하고 이를 모두 합치면 전체적으로 정렬된 하나의 파일을 얻을 수 있다. 비법은 출력의 전체 순서를 고려한 특정 파티셔너를 사용하는 것이다. 예를들어 4개의 파티션이 있다면 첫 번째 파티션에는 -10도 이하의 작은 기온 키를, 두번째 파티션에는 -10도와 0도 사이의 키를, 세번째에는 0도와 10도 사이의 키를, 네번째에는 10도 이상의 키를 두는것이다.
이 방법은 잘 동작하지만 각 파티션의 크기가 균등하도록 주의해야 한다. 한 파티션에 데이터가 몰리면 그 리듀서 때문에 잡의 전체 실행 시간이 늦어지기 때문이다. 방금 설명한 파티션 정책에서 각 파티션의 상대적인 크기는 다음과 같다.

결과를 보면 각 파티션의 크기가 균등하지 않다는 것을 알수가 있다. 균등한 파티션을 만들기 위해서는 전체 데이터셋의 기본 분포를 이해할 필요가 있다., 이를 위해 기온 버킷 집합에 속할 레코드 수를 세는 간단한 맵리듀스 잡을 작성해보자 아래 그림은 1도 단위의 버킷에 대한 분포를 보여주며, 그림에서 각 점은 하나의 버킷에 해당한다.
이러한 정보를 이용하여 매우 균등한 파티션 집합을 만들 수 있지만 이를 위해서는 전체 데이터 셋을 입력으로 잡을 실행해야 하므로 비효율적이다. 그대신 샘플링을 통해 키의 범위를 파악하는 방법으로 균등한 파티션 집합을 얻을 수 있다. 샘플링의 기본적인 아이디어는 키 분포를 짐작하기 위해 작은 크기의 키 집합을 살펴본 후 파티션 생성에 활용하는 것이다.다행히도 하둡은 다양한 샘플링 기능을 제공하므로 이를 위해 별도의 코드를 직접 작성할 필요는 없다.
InputSampler 클래스는 내부의 Sampler 인터페이스를 정의하며, Sampler의 구현체는 InputFormat과 Job을 입력으로 받아 키의 샘플을 반환한다.
public interface Sampler<K, V> {
K[] getSample(InputFormat<K, V> inf, Job job)
throws IOException, InterruptedException;
}

클라이언트는 Sampler 인터페이스를 직접 호출하지 않는다. 그 대신 InputSampler의 writePartitionFile() 정적 메서드를 사용한다. 이 메서드는 파티션의 키 범위를 저장하기 위해 시퀸스 파일을 생성한다.
public static<K, V> void writePartitionFile(Job job, Sampler<K, V> sampler)
throws IOException, ClassNotFoundException, InterruptedException
TotalOrderPatitioner는 정렬 잡에 필요한 파티션을 생성할 때 이 시퀸스 파일을 사용한다. 앞서 설명한 모든 내용이 아래 코드에 포함되어 있다.
-전체 데이터를 정렬하기 위해 TotalOrderPartitioner를 사용하여 IntWritable 키를 기준으로 시퀸스 파일을 정렬하는 맵리듀스 프로그램
public class SortByTemperatureUsingTotalOrderPartitioner extends Configured
implements Tool {
@Override
Public int run(String[] args) throws Exception{
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
return -1;
}
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job,
CompressionType.BLOCK);
job.setPartitionerClass(TotalOrderPartitioner.class);
InputSampler.Sampler<IntWritable, Text> sampler =
new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);
InputSampler.writePartitionFile(job, sampler);
// DistributedCache에 추가
Configuration conf = job.getConfiguration()
String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
URI partitionUri = new URI(partitionFile);
job.addCacheFile(partitionUri);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception{
int exitCode = ToolRunner.run(
new SortByTemperatureUsingTotalOrderPartitioner(), args);
System.exit(exitCode);
}
}
RandomSampler를 사용하여 균등 확률(예제에서 0.1로 지정)을 갖는 키를 추출했다. 또한 샘플의 최대 개수와 샘플의 최대 스플릿 수를 매개 변수로 지정했다. 애플리케이션에서 InputSampler가 수행될 때 두 매개변수의 기본값은 각각 10.000과 10이다. 샘플러는 이러한 제약조건 중에서 첫번 째 조건인 샘플의 최대수를 만족하면 멈춘다. 샘플러는 클라이언트에서 동작하므로 샘플러가 빠르게 수행되도록 다운로드하는 스플릿 수를 제한하는 것이 중요하다. 전체 잡의 실행 시간에서 샘플러의 비중은 상당히 작아야 한다.
InputSampler는 파티션 파일을 하나 생성한다. 클러스터에서 실행되는 모든 태스크가 공유할수 있도록 이 파일을 분산캐시에 반드시 추가해야 한다.
실행 결과를 보면 샘플러는 4개의 파티션에 대해 -5.6도 13.9도 22.0도를 파티션 경계로 결정했고, 파티션 크기는 전보다 더 균등하게 되었다.

입력 데이터의 특성에 적합한 최고의 샘플러를 선택하는 것이 중요하다. 예를 들어 SplitSampler는 스플릿 전체에서 키를 선택하지 않고 처음 n개의 레코드만 추출하므로 정렬된 데이터에는 적합하지 않다.
반면 IntervalSampler는 스플릿에서 일정한 간격으로 키를 추출하므로 정렬된 데이터에도 적용할 수 있다. RandomSampler가 가장 범용적인 샘플러다. 하지만 특정 애플리케이션에 적합한 샘플러가 없다면(대략적으로 균등한 크기의 파티션을 생성하는것이 샘플링의 핵심임을 기억해라) Sampler 인터페이스 구현체를 직접 작성할 수도 있다.
InputSampler와 TotalOrderPartitioner의 멋진 특성 중 하나는 파티션 수(즉, 리듀서 수)를 원하는 대로 지정할 수 있다는 것이다. 하지만 TotalorderPartitioner는 파티션의 경계가 명확할 때 제대로 동작한다는 점을 주의해야 한다. 만약 키의 범위가 매우 작은데 파티션 수가 크다면 충돌이 발생할 가능성이 있다.
다음과 같이 실행해보자.
% hadoop jar hadoop-examples.jar SortByTemperatureUsingTotalOrderPartitioner \
-D mapreduce.job.reduces=30 input/ncdc/all-seq output-totalsort
이 프로그램은 30개의 출력 파티션을 생성하며 각 파티션은 내부적으로 정렬되어 있다. 또한 모든 파티션에 대해 파티션 i의 모든 키는 파티션 i+1의 모든 키보다 작은 조건을 만족한다.
9-2-4 2차 정렬
맵리듀스 프레임워크는 리듀서로 보내기 전에 키를 기준으로 레코드를 정렬한다. 하지만 특정 키에 대한 값은 정렬되지 않는다. 여러 맵 태스크에서 값이 전달되고 각 태스크의 종료 시점은 실행할 때마다 다르기 때문에 값이 나타나는 순서는 일정하지 않다. 일반적으로 대부분의 맵리듀스 프로그램은 리듀스 함수에 전달되는 값의 순서와 상관없이 구현된다고 할 수 있다. 그러나 키를 정렬하고 그룹화할때 그 값도 정렬할 수 있는 특별한 방법이 있다.
이 방식을 설명하기 위해 연도별 최고 기온을 계산하는 맵리듀스 프로그램을 고려해보자. 만약 특정 값(기온)을 내림차순으로 정렬할 수 있다면 최곳값을 찾기 위해 전체 레코드를 조회하지 않아도 된다. 대신 각 연도의 첫 번째 값만 취하고 나머지는 무시한다(여기서 설명한 방법은 최고 기온 문제를 해결하는 효율적인 방법은 아니지만, 2차 정렬이 일반적으로 작동하는 방식을 잘 설명해준다).
이를 위해 키를 조합 키(연도와 기온)로 대체했다. 우리는 다음과 같이 키가 연도(오름차순)와 기온(내림차순)순으로 정렬되길 원한다.
1900 35º C
1900 34º C
1900 34º C
...
1900 36º C
1900 35º C
단지 키를 변경했을 뿐이라 별다른 도움이 되지 않는다. 그 이유는 동일한 연도의 레코드라도 키가 서로 다르므로 같은 리듀서로 전달되지 않기 때문이다. 예를 들어(1900,35ºC)와 (1900,34ºC)는 각기 다른 리듀서로 갈 수 있다.하지만 키의 연도 부분을 기준으로 분할되도록 파티셔너를 설정하면 동일한 연도의 레코드는 동일한 리듀서로 가도록 보장할 수 있다. 하지만 여전히 우리 목적을 달성하기엔 부족하다. 파티셔너는 한 연도의 모든 레코드가 하나의 리듀서로 전달되는 것을 보장할 뿐 리듀서가 파티션 내에서 키를 기준으로 레코드를 그룹화한다는 사실은 변경하지 않는다.

퍼즐의 마지막 부분은 그룹화를 제어하는 것이다. 리듀서에서 키의 연도 부분으로 값을 그룹화 하면 동일한 연도의 모든 레코드를 단일 리듀스 그룹에서 볼 수 있다. 그리고 레코드는 기온의 내림차순으로 정렬되므로 첫번째 값이 바로 최고 기온이다.

요약하면 값으로 정렬한 효과를 얻는 방법은 다음과 같다.
- 원래 키와 원래 값의 조합으로 새로운 키를 만든다
- 정렬 비교자는 조합 키(즉,원래 키와 원래 값)를 기준으로 정렬한다.
- 조합 키에 대한 파티셔너와 그룹화 비교자는 파티셔닝과 그룹화를 수행할 때 원래 키만 고려한다.
자바 코드
밑에 코드에서 앞서 설명한 모든 코드를 볼 수 있다. 이 프로그램은 일반 텍스트 입력을 다시 사용했다.
-키에서 기온을 정렬해서 최고 기온을 알아내는 애플리케이션
public class MaxTemperatureUsingSecondarySort
extends Configured implements Tool{
static class MaxTemperatureMapper
extend Mapper<LongWritable, Text, IntPair, NullWritable> {
private NcdcrecordParser parser = new NcdcRecordParser();
@Override
protected void map(LongWritable key, Text value,
Context context) throws IOException, InterruptedException {
parser.parse(value);
if (parser.isValidTemperature()) {
context.write(new IntPair(parser.getYearInt(),
parser.getAirTemperature()), NullWritable.get());
}
}
}
static class MaxTemperatureReducer
extends Reducer<IntPair, NullWritable, IntPair, NullWritable> {
@Override
protected void reduce(IntPair key, Iterable<NullWritalbe> values,
Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
public static class FirstPartitioner
extends Partitioner<IntPair, NullWritable>{
@Override
public int getPartition(IntPair key, NullWritable value, int numPartitions) {
// 섞기 위해 127을 곱한다.
return Math.abs(key.getFirst() * 127) % numPartitions;
}
}
public static class KeyComparator extends WritableComparator {
protected KeyComparator() {
super(IntPair.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst());
if (cmp != 0){
return cmp;
}
return -IntPari.compare(ip1.getSecond(), ip2.getSecond()); //reverse
}
}
public static class GroupComparator extends WritableComparator {
protected GroupComparator() {
super(IntPair.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
return IntPair.compare(ip1.getFirst(), ip2.getFirst());
}
}
@Override
public int run(String[] args) throws Exception{
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (job == null){
return -1;
}
job.setMapperClass(MaxTemperatureMapper.class);
job.setPartitionerClass(FirstPartitioner.class);
job.setSortComparatorClass(KeyComparator.class);
job.setGroupingComparator(GroupComparator.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(IntPair.class);
job.setOutputValueClass(NullWritable.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception{
int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args);
System.exit(exitCode);
}
}
매퍼에서 IntPairWritable 구현체를 사용하여 연도와 기온을 조합한 키를 생성했다. 리듀서는 키에서 첫 번째(최고) 기온을 얻을 수 있고 따라서 어떤 값도 필요 없기 때문에 NullWritable을 사용하여 값을 생성했다. 리듀서는 첫 번째 키를 내보내며,2차 정렬로 인해 이 값은 연도와 최고 기온을 담은 IntPair가 된다. IntPair의 toString() 메서드는 탭으로 구분된 문자열을 생성하므로 출력 결과는 탭으로 구분된 연도-기온 쌍의 집합이다.
FirstPartitioner라는 파티셔너를 직접 작성하여 조합 키의 첫 번째 필드인 연도를 기준으로 분할하도록 파티셔너를 설정했다. 키의 연도(오름차순)와 기온(내림차순)을 기준으로 정렬하려면 setSortComparatorClass()에 커스텀 정렬 비교자를 지정하여 필드를 추출하고 적절한 비교를 수행하면 된다. 이와 비슷하게 연도를 기준으로 키를 그룹화하려면 setGroupingComparatorClass()에 커스텀 비교자를 설정하여 키의 첫 번째 필드를 추출하고 비교를 수행하면 된다.
이 프로그램을 실행하면 연도별 최고 기온을 얻을 수 있다.
% hadoop jar hadoop-examples.jar MaxTemperatureUsingSecondarySort \
input/ncdc/all output-secondarysort
% hadoop fs -cat output-secondarysort/part-* | sort | head
1901 317
1902 244
1903 289
1904 256
1905 283
1906 294
1907 283
1908 289
1909 278
1910 294
스트리밍
하둡이 제공하는 몇 가지 라이브러리를 활용하면 스트리밍에서 2차 정렬을 수행할 수 있다. 다음은 2차 정렬을 수행하기 위해 사용할 수 있는 드라이버를 보여준다.
% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streming-*.jar \
-D stream.num.map.output.key.fields=2 \
-D mapreduce.partition.keypartitioner.options=-k1,1 \
-D mapreduce.job.output.key.comparator.class=\
org.apache.hadoop.mapred.lib.KeyFieldBasedCoparator \
-D mapreduce.partition.keycomparator.options="-k1n -2nr" \
-files secondary_sort_map.py, secondary_sort_reduce.py \
-input input/ncdc/all \
-output output-secondarysort-streaming \
-mapper ch09-mr-features/src/main/python/secondary_sort_map.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-reducer ch09-mr-features/src/main/python/secondary_sort_reduce.py
아래 파이쏜 코드의 맵 함수는 연도와 기온 필드를 가진 레코드를 내보낸다. 이 두개의 필드 조합을 키로 처리해야 하므로 stream.num.map.output.key.fields를 2로 설정한다. 이렇게 하면 자바 코드에서와 같이 키-값 중에서 값은 존재하지 않게 된다.
-파이썬으로 작성한 2차 정렬 맵 함수
#!/usr/bin/env python
import re
import sys
for line in sys.stdin:
val = line.strip()
(year, temp, q) = (val[15:19], int(val[87:92]), val[92:93])
if temp == 9999;
sys.stderr.write("reporter:counter:Temperature,Missing,1\n")
elif re.match("[01459]", q):
print "%s\t%s" % (year, temp)
partition.keypartitioner.options를 이용했으며, 파티셔너가 키의 첫 번째 필드만 사용하도록 지시하기 위해 그 값을 -k1,1로 지정했다. 여기서 키의 필드는 mapreduce.map.output.key.field.separator 속성에 정의된 문자열(기본 탭 문자)로 구분되었다고 가정한다.
다음으로 연도 필드는 오름차순으로 기온 필드는 내림차순으로 정렬하는 비교자를 지정했고, 따라서 리듀스 함수는 각 그룹의 첫 번째 레코드만 반환하면 된다.이를 위해 하둡은 KeyFieldBasedComparator라는 비교자를 제공한다. 비교 순서를 정의한 명세는 GNU 정렬에 사용되는 것과 유사하다. mapreduce.partition.keycomparator.options 속성으로 이를 설정할 수 있다. 여기서 사용된 -k1n -k2nr 값은 '첫번째 필드는 숫자 오름차순으로 정렬하고 두 번째 필드는 숫자 내림차순으로 정렬하라'는 의미다. 이 비교자의 사촌격인 KeyFieldBasedPartitioner파티셔너는 키를 필드로 분리할 때 맵 출력키 구분자를 이용한다.
자바 버전에서는 그룹화 비교자를 설정해야 했다. 하지만 스트리밍에서 그룹은 어떤 방식을 쓰더라도 그 경계를 알려주지 않으므로 리듀스 함수에서 연도가 변경되는 시점을 찾아서 그룹의 경계를 스스로 알아내야 한다.
-파이썬으로 작성한 2차 정렬을 위한 리듀스 함수
#!/usr/bin/env python
import sys
last_group = None
for line in sys.stdin:
val = line.strip()
(year, temp) = val.split("\t")
group = year
if last_group != group:
print val
last_group = group
스트리밍 프로그램을 실행하면 자바 버전과 동일한 결과를 얻을 수 있다.
마지막으로, KeyFieldBasedPartitioner와 KeyFieldBasedComparator는 스트리밍 프로그램에서만 제한적으로 사용되는 것은 아니며 자바 맵리듀스 프로그램에서도 사용할 수 있다.
9-3 조인
맵리듀스는 대용량 데이터셋 간의 조인을 수행할 수 있지만, 조인 코드를 밑바닥부터 작성하는 것은 매우 어려운 일이다. 맵리듀스 프로그램을 직접 작성하는 대신 피그, 하이브, 캐스케이딩,크럭, 스파크와 같은 고차원 프레임워크를 활용하는 것이 더 좋으며, 이들은 조인 연산을 구현체의 핵심 부분으로 제공하고 있다.
해결할 문제를 간단히 살펴보자. 예를 들어 기상관측소 데이터베이스와 기상 레코드 등 두 개의 데이터셋이 있고, 이 둘을 결합하고 싶다. 그리고 각 출력 행에 기상 레코드 기록과 함께 기상 관측소의 메타데이터를 포함하여 함께 보고 싶을것이다. 아래 테이블로 표현하였다.

조인을 구현하는 방법은 데이터셋이 얼마나 큰지 그리고 어떻게 분할되어 있는지에 따라 달라진다. 만약 하나의 데이터셋이 크고(기상 레코드) 다른 것은 클러스터 내에 각 노드로 분산시킬 수 있을 정도로 작다면(기상관측소 메타데이터), 조인 연산은 기상관측소별로 레코드를 모으는 맵리듀스 잡에 큰 영향을 받는다(예를 들면 기상관측소 아이디로 부분 정렬), 매퍼나 리듀서는 작은 데이터셋을 이용해서 기상관측소 아이디에 해당하는 기상관측소 메타데이터를 찾을 수 있으므로 각 기상 레코드와 함께 출력할 수 있다.
여기서는 클러스터 내의 노드로 데이터를 분산하는 방법에 대해서만 초점을 두었다.
매퍼에 의해 조인이 수행되면 맵-사이드 조인 이라 부르고, 리듀서에 의해 수행되면 리듀스-사이드 조인 이라고 부른다.
두 데이터셋 모두 클러스터의 각 노드에 복사하기에 너무 크더라도, 맵리듀스는 데이터가 어떻게 구조화되어 있느냐에 따라 맵-사이드 또는 리듀스-사이드 조인을 이용해서 조인을 수행할 수 있다. 흔한 예로 사용자 데이터베이스와 사용자 행동 로그(예를 들면 접근 로그)를 들 수 있다. 유명한 서비스의 경우 사용자 데이터베이스(또는 로그)가 매우 커서 모든 맵리듀스 노드에 분산시키는 것은 거의 불가능하다.
9-3-1 맵-사이드 조인
대용량 입력에 대한 맵-사이드 조인은 데이터가 맵 함수에 도달하기 전에 조인이 수행된다.이와 같이 작동하려면 각 맵의 입력이 특별한 방식으로 분할되고 정렬되어야 한다. 각 입력 데이터셋은 반드시 동일한 개수의 파티션으로 분할되어야 하며, 각 원본은 동일한 조인키로 정렬되어 있어야 한다. 다시 말해, 특정 키에 대한 모든 레코드는 동일한 파티션에 존재해야 한다. 이는 꽤 까다로운 요구조건으로 보이지만(실제로도 그렇다). 사실 일반적인 맵리듀스 잡의 출력 명세와 잘 맞아 떨어진다.
따라서 맵-사이드 조인은 동일한 개수의 리듀서, 동일한 키, 분리되지 않는 출력 파일(예를 들면 HDFS 블록보다 작거나 gzip으로 압축된 파일)을 가진 여러 잡의 출력을 조인하는 데 사용할 수 있다.날씨 예제에서 기상관측소 ID로 기상관측소 파일에 대해 부분 정렬을 수행하고 동일한 리듀서 수로 기상관측소 ID를 기준으로 같은 정렬을 수행하면 이 두 개의 출력에 대해 맵-사이드 조인을 실행할 수 있는 조건이 만족된다.
맵-사이드 조인을 수행하기 위해 org.apache.hadoop.mapreduce.join 패키지의 CompositeInputFormat을 사용할 수 있다. CompositeInputFormat을 위한 입력 데이터와 조인의 유형(내부 혹은 외부)은 간단한 문법에 따라 작성된 조인 표현식으로 설정된다. 이와 관련된 패키지 문서에서 자세한 설명과 예제를 볼 수 있다.
org.apache.hadoop.examples.Join 예제는 맵-사이드 조인을 수행하는 범용 명령행 프로그램으로, 조인 연산을 수행할 여러 입력에 대해 임의의 매퍼와 리듀서를 지정하고 맵리듀스 잡을 수행할 수 있다.
9-3-2 리듀스-사이드 조인
리듀스-사이드 조인은 맵-사이드 조인보다 더 일반적이다. 입력 데이터셋을 일부러 특별한 방식으로 구조화할 필요가 없기 때문이다. 하지만 두 데이터셋 모두 맵리듀스의 셔플 단계를 거쳐야 한다는 비효율적인 면이 있다. 기본적인 아이디어는 매퍼와 소스에 따라 각 레코드에 태그를 붙이고 조인키를 맵 출력키로 사용함으로써 동일한 키를 가진 레코드는 같은 리듀서에 함께 모이게 된다는 것이다. 실무에서 리듀스-사이드 조인을 수행하려면 다음에 나오는 몇 개의 요소를 알아야 한다.
- 다중 입력
- 일반적으로 데이터셋의 입력 원본은 서로 다른 포맷이므로 MultipleInputs 클래스를 사용하여 각 입력 원본을 분석하고 태깅하는 코드를 별도로 작성하는 것이 더 편하다.
- 2차 정렬
- 앞에서 설명한 대로 리듀서는 동일한 키를 가진 두 원본의 레코드를 볼 수는 있지만 원본의 순서를 보장하지는 않는다. 하지만 조인을 수행하려면 한 원본의 데이터를 다른 원본보다 먼저 처리하는 것이 중요하다. 기상 데이터 조인에서 기상관측소 레코드는 각 키의 값 중에서 앞부분에 있어야 한다. 그러면 리듀서는 뒤에 나오는 기상 레코드에 기상 관측소의 이름을 넣어 함께 출력할 수 있다.물론 메모리에 버퍼링한다면 어떤 순서로 받더라도 괜찮겠지만, 어떤 그룹의 레코드 수가 매우 커서 리듀서의 메모리 한도를 넘어가면 오류가 발생하므로 이를 피하는것이 좋다.
각 레코드에 태그를 붙이기 위해 키(기상관측소 ID를 저장하기 위한)와 태그에 TextPair를 사용했다. 태그 값을 사용할 때 유일한 제약사항은 기상관측소 레코드가 기상 레코드 보다 먼저 나타나도록 순서를 보장해야 한다는 것이다. 이를 위해 기상관측소 레코드는 0으로 기상 레코드는 1로 태그 값을 지정했다 아래 두 코드에서 이를 수행하는 매퍼 클래스를 볼 수 있다.
-리듀스-사이드 조인을 위해 기상관측소 레코드를 태깅하는 매퍼
public class JoinStationMapper
extends Mapper<LongWritable, Text, TextPair, Text> {
private NcdcStationMetadataParser parser = new NcdcStationMetadataParser();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
if (parser.parse(value)) {
context.write(new TextPair(parser.getStationId(), "0"),
new Text(parser.getStationName()));
}
}
}
-리듀스-사이드 조인을 위해 기상 레코드를 태깅하는 매퍼
public class JoinRecordMapper
extends Mapper<LongWritable, Text, TextPair, Text> {
private NcdcRecordParser parser = new NcdcRecordParser();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
parser.parse(value);
context.write(new TextPair(parser.getStationId(), "1"), value);
}
}
리듀서는 기상관측소 레코드를 맨 처음 받는다는 것을 알고 있다. 따라서 그 값에서 기상관측소의 이름을 추출하고, 모든 출력 레코드에 그 값을 포함하여 기록한다.
-태깅된 기상관측소 레코드와 기상 레코드를 조인하는 리듀서
public class JoinReducer extends Reducer<TextPair, Text, Text, Text> {
@Override
protected void reduce(TextPair key, Iterable<Text> value, Context context)
throws IOException, InterruptedException {
Iterator<Text> iter = values.iterator();
Text stationName = new Text(iter.next());
while (iter.hasNext()) {
Text record = iter.next();
Text outValue = new Text(stationName.toString() + "\t" + record.toString());
context.write(key.getFrist(), outValue);
}
}
}
이 코드는 기상 레코드에 있는 모든 기상관측소 아이디와 일치하는 레코드가 기상관측소 데이터셋에 정확히 하나만 있다고 가정한다.만약 그렇지 않다면 다른 TextPair를 사용해서 값 객체에 태그를 넣어 코드를 일반화할 필요가 있다. reduce() 메서드는 기상 레코드를 처리하기 전에 어떤 항목이 기상관측소 이름인지 알아야 하며 누락되거나 중복된 항목을 발견하고 처리할 수 있어야 한다.
아래 코드는 잡을 엮어주는 드라이버 클래스다. 여기서 핵심은 키의 첫번째 부분인 기상관측소 아이디로 분할하고 그룹화하는 것이다. 이를 위해 직접 작성한 파티셔너인 KeyPartitioner와 그룹 비교자인 TextPair의 FirstComparator를 사용했다.
-기상 레코드와 기상관측소 이름을 조인하는 애플리케이션
public class JoinRecordWithStationName extends Configured implements Tool{
public static class KeyPartitioner extends Partitioner<TextPair, Text> {
@Override
public int getPartition(TextPair key, Text value, int numPartitions)
return (key,getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
@Override
public int run(String[] args) throws Exception {
if (args.length != 3) {
JobBuilder.printUsage(this, "<ncdc input> <station input> <output>");
return -1;
}
Job job = new Job(getConf(), "Join weather records with station names");
job.setJarByClass(getClass());
Path ncdcInputPath = new Path(args[0]);
Path stationInputPath = new Path(args[1]);
Path outputPath = new Path(args[2]);
MultipleInputs.addInputPath(job, ncdcInputPath,
TextInputFormat.class, JoinRecordMapper.class);
MultipleInputs.addInputPath(job, stationInputPath,
TextInputFormat.class, JoinStationMapper.class);
FileOutputFormat.setOutputPath(job, outputPath);
job.setPartitionerClass(KeyPartitioner.class);
job.setGroupingComparatorClass(TextPair.FirstComparator.class);
job.setMapOutputKeyClass(TextPair.Class);
job.setReducerClass(JoinReducer.class);
job.setOutputKeyClass(Text.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new JoinRecordWithStationName(), args);
System.exit(exitCode);
}
}
샘플 데이터로 프로그램을 수행하면 다음과 같은 결과가 나온다.
011990-99999 SIHCCAJAVRI 006778937298357923795793729857912...
011990-99999 SIHCCAJAVRI 002578937298357923795793729857912...
011990-99999 SIHCCAJAVRI 002578937298357923795793729857912...
011990-99999 TYNSET-AHANSMOEN 002578937298357923795793729857912...
012650-99999 TYNSET-AHANSMOEN 002578937298357923795793729857912...
9-4 사이드 데이터 분배
사이드 데이터는 잡이 주요 데이터셋을 처리하는 데 필요한 별도의 읽기 전용 데이터다.
모든 맵과 리듀 태스크에서 이 데이터를 쉽고 효율적으로 사용하는 방법은 간단하지 않다.
9-4-1 잡 환경 설정 사용
Configuration(이전 맵리듀스 API에서는 JobConf)의 다양한 setter 메서드를 사용하여 잡 환경 설정에 임의의 키-값 쌍을 설정할 수 있다. 이 방법은 작은 크기의 메타데이터를 각 태스크에 전달할 때 매우 유용하다.
각 태스크는 Context의 getConfiguration() 메서드가 반환하는 환경 설정에서 원하는 데이터를 얻을 수 있다. 이전 API에서는 Mapper나 Reducer의 configure() 메서드를 변경한 후 JobConf 객체의 getter 메서드로 데이터를 얻을 수 있었다. 일반적으로 데이터는 객체 필드에 저장되기 때문에 map() 또는 reduce() 메서드에서 사용할 수 있다.
보통 메타데이터를 인코딩할 때는 기본 타입만으로도 충분하지만, 임의의 객체라면 객체를 문자열로 변환하고 복원하는 방식으로 직렬화를 직접 구현하거나 하둡의 Stringifier 클래스를 활용해야 한다.
이러한 환경 설정 방식은 수 킬로바이트가 넘는 데이터를 전송할 때는 적합하지 않다. 맵리듀스 컴포넌트의 메모리 사용량에 부하를 주기 때문이다. 클라이언트, 애플리케이션 마스터, 태스크 JVM은 매번 잡 환경 설정을 읽으며 전혀 사용하지 않는 요소를 포함한 모든 요소를 메모리에 로드한다.
9-4-2 분산 캐시
잡 환경 설정에서 사이드 데이터를 직렬화하는 방법보다 하둡의 분산 캐시 기법으로 데이터셋을 분산하는 편이 더 낫다. 분산 캐시는 실행 시점에 파일과 아카이브의 사본을 태스크 노드에 복사하여 이를 이용하도록 해주는 서비스다. 네트워크 대역폭을 줄이기 위해 파일은 잡 단위로 특정 노드에 복사된다.
사용법
이 책의 많은 예제에서 사용한 GenericOptionsParser를 이용하는 도구를 활용하여 -files 옵션에 콤마로 분리된 URI 목록으로 배포할 파일을 지정하면 분산 캐시 기능을 활용할 수 있다.로컬 파일시스템,HDFS,S3와 같은 하둡이 읽을 수 있는 모든 파일시스템을 지정할 수 있다. 이를 구체적으로 지정하지 않으면(기본 파일시스템이 로컬 파일시스템이 아니더라도)로컬에 해당 파일이 존재한다고 간주한다.
-archives 옵션을 사용하면 아카이브 파일(JAR 파일, ZIP 파일, 타르 파일)을 태스크 노드에 복사할 수 있으며, 이 파일은 태스크 노드에 풀린다. 또한 -libjars 옵션을 사용하면 JAR파일을 매퍼와 리듀서 태스크의 클래스패스에 추가할 수 있다. 이 옵션은 특정 잡의 JAR 파일에 라이브러리 JAR 파일이 포함되어 있지 않을 때 유용하다.
분산 캐시를 사용하여 기상관측소 이름을 포함한 메타데이터 파일을 공유하는 방법을 알아보자.
% haddop jar hadoop-exampels.jar \
MaxTemperatureByStationNameUsingDistributedCacheFile \
-files input/ncdc/metadata/stations-fixed-width.txt input/ncdc/all output
이 명령은 로컬에 있는 stations-fiexd-width.txt 파일을 태스크 노드에 복사한다. 이렇게 하면 기상관측소의 이름을 찾을수 있다.
아래에 있는 MaxTemperatureByStationNameUsingDistributedCacheFile을 참조하자
-분산 캐시 파일로 전달된 테이블에서 기상관측소 이름을 조회하여 기상관측소별 최고 기온을 찾는 애플리케이션
public class MaxTemperatureByStationNameUsingDistributedCacheFile
extends Configured implements Tool {
static class StationTemperatureMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private NcdcRecordParser parser = new NcdcRecordParser();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
parser.parse(value);
if (parser.isvalidTemperature()) {
context.write(new Text(parser.getStationId()),
new IntWritable(parser.getAirTemperature()));
}
}
}
static class MaxTemperatureReducerWithStationLookup
extends Reducer<Text, IntWritable, Text, IntWritable>{
private NcdcStationMetadata metadata;
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
metadata = new NcdcStationMetadata();
metadata.initialize(new File("stations-fixed-width.txt"));
}
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
String stationName = metadata.getStationName(key.toString());
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(new Text(stationName), new IntWritable(maxValue));
}
}
@Override
public int run(String[] args) throws Exception {
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (job == null) {
return -1;
}
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(StationTemperatureMapper.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducerWithStationLookup.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(
new MaxTemperatureByStationNameUsingDistributedCacheFile(), args);
System.exit(exitCode);
}
}
이 프로그램은 기상관측소별 최고 기온을 찾기 때문에 매퍼인 StationTemperatureMapper는 단순히 기상관측소 아이디와 기온만을 내보낸다. 컴바이너는 2장과 6장에 나온 MaxTemperatureReducer를 재사용해서 맵의 출력 그룹별 최고 기온을 맵에서 구한다. 리듀서인 MaxTemperatureReducerWithStationLookup은 최고 기온을 찾는 것은 같지만 캐시 파일에서 기상관측소의 이름을 검색하기 때문에 컴바이너와는 조금 다르다.
리듀서의 setup() 메서드를 사용하면 태스크 작업 디렉터리의 상대 경로에서 원본 파일명으로 된 캐시 파일을 가져올 수 있다.
다음은 일부 기상관측소의 최고 기온을 출력한 결과다.
PEATS RIDGE WARATAH 372
STRATHALBYN RACECOU 410
SHEOAKS AWS 399
WANGARATTA AERO 409
MOOGARA 334
MACKAY AERO 331
작동 방식
하둡은 특정 잡을 구동할때 -files, -archives, -libjars 옵션으로 지정된 파일을 분산 파일 시스템(보통 HDFS)에 복사한다. 그리고 태스크를 실행하기 전에 태스크가 해당 파일에 접근 할 수 있도록 노드 매니저가 분산 파일시스템에 있는 파일을 로컬 디스크(캐시)에 복사한다. 이 시점에 파일이 로컬화 되었다고 말할 수 있으며, 태스크의 입장에서는 파일의 위치는 그대로이고 특정 태스크의 작업 디렉터리에 심벌릭 링크가 존재하는 것으로 볼 수 있다. 추가로 -libjars로 지정된 파일은 태스크가 실행되기 전에 태스크의 클래스패스에 포함된다.
노드 매니저는 캐시에 있는 각 파일을 이용하는 태스크 수를 파악하기 위해 참조 빈도수를 관리한다.태스크를 실행하기 전에 해당 파일의 참조 빈도수는 1씩 증가하고 태스크의 실행이 완료되면 1씩 감소한다. 캐시에 있는 특정 파일이 더 이상 필요 없으면 참조 빈도수는 0이 되고, 이 때 파일을 삭제할 수 있다. 그리고 노드의 전체 캐시 크기가 10GB를 넘어서면 새로운 파일의 공간을 확보하고자 최근-최소 사용 정책에 따라 파일을 삭제한다. 캐시의 크기는 yarn.nodemanager.localizer.cache.target-size-mb 속성에서 지정할 수 있다.
이러한 설계는 동일한 노드에서 실행되는 동일한 잡의 연속적인 태스크가 캐시에서 필요한 파일을 찾는 것을 보장하지는 않는다. 하지만 일반적으로 한 잡의 태스크는 거의 동일한 시간대에 실행되므로 다른 잡 때문에 기존 태스크의 파일이 캐시에서 삭제 될 가능성은 매우 적다.
분산 캐시 API
대부분의 애플리케이션은 분산 캐시 API를 사용할 필요가 없다. 왜냐면 GenericOptionsParser로 캐시를 사용할수 있기 때문이다. 그러나 GenericOptionsParser를 사용할 수 없을 때는 Job의 API를 이용하여 분산 캐시에 객체를 넣을수 있다. Job의 관련 메서드는 다음과 같다.
public void addCacheFile(URI uri)
public void addCacheArchive(URI uri)
public void setCacheFiles(URI[] files)
public void setCacheArchives(URI[] archives)
public void addFileToClassPath(Path file)
public void addArchiveToClassPath(Path archive)
캐시에 넣을 수 있는 파일과 아카이브 두 객체를 다시 고려해보자. 파일은 태스크 노드에 그대로 있지만 아카이브는 풀린 상태로 존재한다. 객체의 유형에 따라 세 개의 메서드가 존재한다. 파일이나 아카이브를 분산 캐시에 추가하는 addCacheXXXX() 메서드, 파일이나 아카이브의 전체 목록을 한번의 호출로 캐시에 추가하는(이전에 호출한 설정을 대체함) setCacheXXXXs() Path() 메서드가 있다. 아래 표에서는 이러한 API 메서드와 예전에 설명한 GenericOptionsParser 옵션을 비교했다.
| 잡 API 메서드 | 상응하는 GenericOptionsParser |
설명 |
| addCacheFile(URI uri) setCacheFiles(URI[] files) |
-files file1, file2, ... |
태스크 노드에 복사하기 위해 파일을 분산 캐시에 추가한다. |
| addCacheArchive(URI uri) setCacheArchives(URI[] files) |
-archives archive1,archive2,... |
아카이브를 태스크 노드에 복사하고 거기에 풀 수 있도록 분산 캐시에 추가한다. |
| addFileToClassPath(Path file) | -libjars jar1,jar2,... |
파일을 맵리듀스 태스크의 클래스패스에 추가하기 위해 분산 캐시에 추가한다.파일의 압축을 풀지 않는 방식이므로 JAR 파일을 클래스 패스에 추가할 때 유용하다. |
| addArchiveToClassPath(Path archive) | 없음 | 아카이브를 풀고 맵리듀스 태스크의 클래스패스에 추가하기 위해 분산 캐시에 추가한다. 다수의 파일이 있는 디렉터리를 클래스패스에 추가할 때 이를 포함한 아카이브를 생성하므로 매우 유용하다. 대안으로 JAR 파일을 생성하고 addFileToClassPath()를 사용하면 동일한 결과를 얻을수 있다. |
태스크에서 분산 캐시 파일을 얻어오는 방법은 앞에서 설명한 방법과 동일하게 파일명으로 직접 로컬화된 파일에 접근하는 것이다. 맵리듀스는 분산 캐시에 추가된 모든 파일과 아카이브를 가리키는 심벌릭 링크를 태스크의 작업 디렉터리에 항상 생성한다.
9-5 맵리듀스 라이브러리 클래스
하둡은 자주 사용하는 함수를 매퍼와 리듀서의 라이브러리로 제공한다 다음 표는 이 라이브러리를 간단한 설명과 함께 나열했다.
-맵리듀스 라이브러리 클래스
| 클래스 | 설명 |
| ChainMapper, ChainReducer | 단일 매퍼에서 매퍼 체인을 실행하고, 하나의 리듀서와 리듀서 내의 매퍼 체인을 각각 실행한다(M+Rm*로 표현할 수 있으며, 여기서 M은 매퍼, R은 리듀서). 여러 맵리듀스 잡을 실행하는 것과 비교해서 디스크 I/O 부하가 굉장히 줄어든다. |
| FieldSelectionMapReduce(이전 API), FieldSelectionMapper와 FieldSelectionReducer(새로운 API) |
입력키와 값으로부터 필드(유닉스 cut 명령어와 유사)를 선택하여 출력키와 값으로 내보낼 수 있는 매퍼와 리듀서 |
| IntSumReducer,LongSumReducer | 모든 키에 대한 정숫값을 합하여 총합을 구하는 리듀서 |
| InverseMapper | 키와 값을 맞바꾸는 매퍼 |
| MultithreadedMapRunner(이전 API), MultithreadedMapper(새로운 API) |
매퍼를 동시에 여러 스레드에서 병렬적으로 수행하는 매퍼(또는 이전 API에서는 맵 실행자). CPU 부하가 높지 않은 매퍼에서 유용하다. |
| TokenCounterMapper | 입력값을 단어로 쪼개어(자바의 StringTokenizer 사용)각 단어를 개수 1과 함께 내보내는 매퍼 |
| RegexMapper | 입력값에서 정규표현식에 부합하는 것을 찾고 개수 1과 함께 내보내는 매퍼 |
'책&스터디' 카테고리의 다른 글
| [하둡 완벽 가이드] - PART01 2장 정리 (0) | 2026.02.11 |
|---|---|
| [하둡 완벽 가이드] - PART01 1장 정리 (0) | 2026.02.10 |
| [하둡 완벽 가이드] - PART2 08장 맵리듀스 타입과 포맷 (0) | 2026.02.08 |
| [하둡 완벽 가이드] - PART2 07장 맵리듀스 작동 방법 (0) | 2026.02.07 |
| [하둡 완벽 가이드] - PART2 06장 맵리듀스 프로그래밍 (1) | 2026.02.06 |