책&스터디

[하둡 완벽 가이드] - PART2 08장 맵리듀스 타입과 포맷

jyu_seo_ 2026. 2. 8. 22:08

맵리듀스의 데이터 처리 모델은 단순하다. 맵과 리듀스 함수의 입력과 출력은 키-값 쌍으로 되어 있다.이 장에서는 맵리듀스 모델을 자세히 살펴볼 것이다. 특히 단순한 텍스트부터 구조화된 바이너리 객체까지 다양한 포맷의 데이터를 맵리듀스 모델에서 어떻게 처리하는지 살펴보겠다.

8-1 맵리듀스 타입

하둡 맵리듀스의 맵과 리듀스 함수의 형식은 다음과 같다.

map: (K1, V1) -> list(K2, V2)
reduce: (K2, list(V2)) -> list(K3, V3)

 

일반적으로 맵의 입력키와 값의 타입 (K1과 V1)은 맵의 출력 타입(K2와 V2)과 다르다. 하지만 리듀스의 입력은 맵의 출력과 반드시 같은 타입이어야 하며, 리듀스의 출력 타입(K3과 V3)과는 다를 수 있다. 맵리듀스 자바 API의 일반적인 형태는 다음과 같다.

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
	public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    // ...
    
    protected void map(KEYIN key, VALUEIN value, 
    	Context context) throws IOException, InterruptedException {
   // ...      
 }
}

public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
	public class Context extends ReducerContext<KEYIN, VALUEIN, KEYOUT<VALUEOUT> {
      // ...
  }
  protected void reduce(KEYIN key, Iterable<VALUEIN> values
  		Context context) throws IOException, InterruptedException {
   	  // ...       
   }
}

 

context 객체는 키-값 쌍을 내보낼 때 사용되며, 출력 타입으로 인자화된다. write() 함수의 원형은 다음과 같다.

public void write(KEYOUT key, VALUEOUT value)
	throws IOException, InterruptedException

Mapper와 Reducer는 별개의 클래스이므로 각각의 타입 매개 변수는 서로 다른 유효 범위를 갖는다. 따라서 동일한 이름의 KEYIN 타입 매개변수라도 Mapper와 Reducer의 실제 타입 인자는 서로 다를 수 있다. 예를 들어 이전 장에서 살펴본 최고 기온 예제에서 Mapper는 LongWritable을 Reducer는 Text를 각각 KEYIN으로 사용했다.

 

마찬가지로 맵의 출력 타입과 리듀스의 입력 타입은 반드시 일치해야 하지만 자바 컴파일러가 이를 강제할 수는 없다.

 

타입 매개변수와 추상타입의 이름은 다를수 있지만 (KEYIN과 K1), 그 형태는 동일해야 한다.

 

컴바인 함수는 Reducer의 구현체로, 리듀스 함수와 동일한 형태를 가진다. 컴바인 함수의 출력 타입은 중간 키-값 타입(K2와 V2) 이며, 컴바인의 출력은 리듀스 함수의 입력으로 전달된다.

map: (K1, V1) -> list(K2, V2)
combiner: (K2, list(V2)) -> list(K2, V2)
reduce: (K2, list(V2)) -> list(K3, V3)

 

컴바인 함수와 리듀스 함수는 같은 경우가 많다. 이 경우는 K3는 K2와 같고 V3는 V2와 같다.

 

파티션 함수는 컴바인의 중간 키-값 타입(K2와 V2)을 처리한 후 파티션 인덱스를 반환한다. 실제로 파티션은 순전히 키에 의해 결정된다(그 값은 무시한다).

partition: (K2, V2) -> integer

 

자바에서는 다음과 같이 구현한다.

public abstract class Partitioner<KEY, VALUE> {
	public abstract int getPartition(Key key, Value value, int numPartitions);
}

 

이전 API에서 맵리듀스 함수의 원형

이전 API를 보면 맵리듀스 원형은 거의 비슷하고 이전과 새로운 API의 제약 조건도 정확히 같지만 K1, V1 등 타입 매개변수의 이름을 실제로 지정했다.

public interface Mapper<k1, V1, K2, V2> extends JobConfigurable, Closeable {
	void map(K1 key, V1 Value,
    	OutputCollector<K2, V2> output, Reporter reporter) throws IOException;
}
public interface Reducer<k2, V2, K3, V3> extends JobConfigurable, Closeable {
	void reduce(K2 key, Iterator<V2> Values,
    	OutputCollector<K3, V3> output, Reporter reporter) throws IOException;
}

public interface Partitioner<k2, V2> extends JobConfigurable{
	int getPartition(K2 key, V2 Value, int numPartitions);
}

 

이론은 이쯤에서 접고, 맵리듀스 잡을 설정하는데 도움이 되는 방법을 살펴보자. 아래 표는 새로운 API의 환경 설정 옵션을 요약한 것이다. 표는 타입을 결정하는 속성과 설정된 타입의 상호 호환성으로 크게 나뉘어 있다.

 

입력 타입은 InputFormat을 통해 설정된다.예를 들어 TextInputFormat은 LongWritable 타입의 키와 Text 타입의 값을 생성한다. 다른 입력 타입은 Job의 메서드(이전 API의 JobConf)를 명시적으로 호출하여 설정한다. 중간 타입을 명시적으로 설정하지 않으면 자동으로 출력 타입과 같게 되어있다. 출력의 기본 타입은 LongWritable과 Text다. 따라서 만약 K2와 K3가 같다면 setMapOutputKeyClass()를 호출할 필요가 없다. setOutputKeyClass()에 정의된 타입으로 다시 설정되기 때문이다. 마찬가지로 V2와 V3가 같다면 setOutputValueClass() 만 사용하면 된다.

 

중간과 최종 출력 타입을 설정하는 함수가 각각 존재하는 것이 이상하게 보일 수 있다. 왜 매퍼와 리듀서의 조합으로 이러한 타입을 결정할 수 없는것일까? 해답은 자바 제네릭의 한계와 관련이 있다. 즉, 타입 삭제 때문에 런타임에 타입 정보가 항상 존재하지 않을 수 있으므로 타입 정보를 명시적으로 하둡에 제공해야 한다. 또한 컴파일 타임에 환경설정을 점검하지 않기 때문에 맵리듀스 잡과 호환되지 않는 타입을 설정할 가능성도 있다.  아래 하단에 맵리듀스 타입과 반드시 호환되어야 하는 설정이 있다. 타입 충돌은 잡을 실행하는 런타임에 발견된다. 이러한 이유 때문에 먼저 작은 데이터로 테스트 잡을 실행하여 타입 호환성에 문제가 없는지 점검하는것이 좋다.

 

-새로운 API에서 맵리듀스 타입 설정

속성 Job설정 메서드 입력K1 타입V1 중간K2 타입V2 출력K3 타입V3
타입설정 속성:
mapreduce.job.inputformat.class
setInputFormatClass() O O        
mapreduce.map.output.key.class setMapOutputKeyClass()     O      
mapreduce.map.output.value.class setMapOutputValueClass()       O    
mapreduce.job.output.key.class setOutputKeyClass()         O  
mapreduce.job.output.value.class setOutputValueClass()           O
타입과 반드시 일치해야 하는 속성:
mapreduce.job.map.class
setMapperClass() O O O O    
mapreduce.job.combine.class setCombinerClass()     O O    
mapreduce.job.partitioner.class setPartitionerClass()     O O    
mapreduce.job.output.key.comparator.class setSortComparatorClass()     O      
mapreduce.job.output.group.comparator.class setGroupingComparatorClass()     O      
mapreduce.job.reduce.class setReducerClass()     O O O O
mapreduce.job.outputformat.class setOutputFormatClass()         O O

 

 

-이전 API에서 맵리듀스 타입 설정

속성 Job설정 메서드 입력K1 타입V1 중간K2 타입V2 출력K3 타입V3
타입설정 속성:
mapred.input.format.class
setInputFormat() O O        
mapred.mapoutput.key.class setMapOutputKeyClass()     O      
mapred.mapoutput.value.class setMapOutputValueClass()       O    
mapred.output.key.class setOutputKeyClass()         O  
mapred.output.value.class setOutputValueClass()           O
타입과 반드시 일치해야 하는 속성:
mapred.mapper.class
setMapperClass() O O O O    
mapred.map.runner.class setMapRunnerClass() O O O O    
mapred.combiner.class setCombinerClass()     O O    
mapred.partitioner.class setPartitionerClass()     O O    
mapred.output.key.comparator.class setOutputKeyComparatorClass()     O      
mapred.output.value.groupfn.class setOutputValueGroupingComparator()     O      
mapred.reducer.class setReducerClass()     O O O O
mapred.output.format.class setOutputFormat()         O O

8-1-1 기본 맵리듀스 잡

매퍼와 리듀서를 설정하지 않고 맵리듀스를 실행하면 어떻게 될까? 다음과 같이 최소한의 맵리듀스 프로그램을 실행해서 알아보자.

public class MinimalMapReduce extends Configured implements Tool {

	@Override
    public int run(String[] args) throws Exception {
    	if (args.length != 2) {
        	System.err.printf("Usage: %s [generic options] <input> <output> \n",
            getClass().getSimpleName());
        ToolRunner.printGenericCommandUsage(System.err);
        return -1;
     }
     
     Job job = new Job(getConf());
     job.setJarByClass(getClass());
     FileInputFormat.addInputPath(job, new Path(args[0]));
     FileOutputFormat.setOutputPath(job, new Path(args[1]));
     return job.waitForCompletion(true) ? 0 : 1;
    }
    
    public static void main(String[] args) throws Exception {
    	int exitCode = ToolRunner.run(new MinimalMapReduce(), args);
        System.exit(exitCode);
    }
}

 

예제에서는 입출력 경로만 설정했다. 다음과 같이 기상 데이터의 일부를 가지고 실행해보자

% hadoop MinimalMapreduce "input/ncdc/all/190{1,2}.gz" output

 

실행 결과로 출력 디렉터리에 part-r-00000으로 명명된 하나의 파일이 생성된다.

 

각 행은 정수, 탭 문자, 원본 기상 데이터 레코드 순이다. 그다지 유용한 프로그램은 아니지만 결과를 생성하는 방법을 이해하면 맵리듀스 잡을 실행할때 하둡이 사용하는 기본적인 설정에 대한 통찰력을 얻을 수 있다. 아래 코드는 MinimalMapReduce와 같은 동작을 하는 프로그램이지만, 기본값을 명시적으로 설정했다는 점은 다르다.

 

-기본값을 명시적으로 설정한 최소한의 맵리듀스 드라이버

public class MinimalMapReduceWithDefaults extends Configured implements Tool{

	@Override
    public int run(String[] args) throws Exception{
    	Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (job == null){
        	return -1;
        }
    	
        job.setInputFormatClass(TextInputFormat.class);
        
        job.setMapperClass(Mapper.Class);
        
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setPartitionerClass(HashPartitioner.class);
        
        job.setNumReduceTasks(1);
        job.setReducerClass(Reducer.class);
        
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        
        job.setOutputFormatClass(TextOutputFormat.class);
        
        return job.waitForCompletion(true) ? 0 : 1;
    }
      public static void main(String[] args) throws Exception {
      int exitCode = ToolRunner.run(new MinimalMapreduceWithDefaults(), args);
      System.exit(exitCode);
      }
}

 

사용 방법을 출력하고 입력과 출력 경로를 설정하는 로직을 헬퍼 메서드로 분리해서 run() 메서드의 앞부분을 몇 줄로 줄였다. 대부분의 맵리듀스 드라이버는 두 개의 인자(입력과 출력 경로)를 취하므로 반복적인 코드를 줄이는 것은 좋은 습관이다. 참고로 별도로 분리한 JobBuilder 클래스의 메서드는 다음과 같다.

public static Job parseInputAndOutput(Tool tool, Configuration conf,
	String[] args) throws IOException {
    
    if(args.length != 2) {
    	printUsage(tool, "<input> <output>");
        return null;
    }
    Job job = new Job(conf);
    job.setJarByClass(tool.getClass());
    
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job;
  }
  
  
  public static void printUsage(Tool tool, String extraArgsUsage) {
  	System.err.printf("Usage: %s [genericOptions] %s\n\n",
    	tool.getClass().getSimpleName(), extraArgsUsage);
    GenericOptionsParser.printGenericCommandUsage(System.err);
  }

 

기본값을 명시적으로 설정한 최소한의 맵리듀스 드라이버 여기로 돌아가보자. 기본 잡 설정은 매우 많치만 예제에서 굵은 문자로 표시한 것은 잡을 실행하는데 가장 중요한 설정이다. 이를 하나씩 살펴보자.

 

기본 입력 포맷은 TextInputFormat이며, LongWritable 타입의 키(파일에서 행 시작 지점의 오프셋)와 Text 타입의 값(텍스트 행)을 생성한다. 이것은 최종 출력에 포함된 정수가 어디에서 왔는지 설명해준다. 이 정수는 행의 오프셋이다. 기본 매퍼는 Mapper 클래스며 입력키와 값을 변경 없이 그대로 출력한다.

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

	protected void map(KEYIN key, VALUEIN value,
    	Context context) throws IOException, InterruptedException {
      context.write((KEYOUT) key, (VALUEOUT) value);      
     }
}

 

Mapper는 제네릭 타입으로, 임의의 키 또는 값 타입을 지원한다. 여기서 맵의 입력키와 출력키는 LongWritable 타입이며, 맵의 입력값과 출력값은 Text 타입이다.

 

기본 파티셔너는 HashPartitioner로, 레코드 키의 해시값으로 레코드가 속할 파티션을 결정한다. 각 파티션은 단일 리듀스 태스크에서 처리되므로 파티션 수는 잡의 리듀스 태스크 수와 같다.

public class HashPartitioner<K,V> extends Partitioner<K,Y> {

	public int getPartition(K key, V value,
    	int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) & numReduceTasks;    
   }
}

 

키의 해시 코드는 최대 정숫값과 비트 단위 AND 연산을 하여 양의 정수 (0 포함)로 변환된다.

그리고 레코드가 속할 파티션을 결정하기 위해 전체 파티션 수로 나눈 나머지를 구한다.

 

리듀서 수는 기본적으로 하나이므로, 결국 단일 파티션만 존재하게 된다. 이 경우 모든 레코드가 하나의 파티션으로 모이기 때문에 파티셔너의 동작은 사실 의미가 없다. 하지만 두 개 이상의 리듀스 태스크가 존재한다면 HashPartitioner의 동작을 이해하는것은 매~우 중요하다! 키의 해시 함수가 잘 동작한다면 모든 레코드는 전체 리듀스 태스크에 걸쳐 균등하게 분산되며, 같은 키를 가진 모든 레코드는 동일한 리듀스 태스크에서 처리된다.

 

독자 중 누군가는 맵 태스크 수를 설정하지 않았다는 것을 알아챘을 것이다. 맵 태스크 수는 입력으로 부터 변환된 스플릿 수와 일치하는데, 이 수는 입력의 크기와 파일의 블록 크기(파일이 HDFS에 존재한다면)에 의해 결정되기 때문이다.


리듀서 수 선택

리듀서를 하나만 두는 것(기본값)은 하둡 초보자가 자주 범하는 실수다. 실제로 대부분의 잡은 리듀서 수를 기본값인 1보다 크게 설정하는 것이 좋다. 그렇지 않으면 모든 중간 데이터가 하나의 리듀스 태스크로 모여들기 때문에 잡이 굉장히 느려진다.

 

사실 잡의 리듀서 수를 결정하는 것은 과학보다는 예술에 가깝다. 보통 리듀서 수를 늘리면 병렬 처리 개수도 늘어나서 리듀스 단계에서 걸리는 시간을 줄일 수 있다. 그러나 너무 많이 늘리면 작은 파일이 너무 많이 생성되는 준최적화에 빠지게 된다.경험적으로 리듀서의 실행 시간은 5분 내외, 출력 파일의 HDFS 블록 수는 최소 1개로 잡는 것이 좋다.


기본 리듀서는 Reducer 클래스로, 역시 제네릭 타입니다. 이는 값을 변경하지 않고 모든 입력을 그대로 출력한다.

public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

	protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context conetext
    	Context context) throws IOEXception, InterruptedException {
    for (VALUEIN value: values) {
    	context.write((KEYOUT) key, (VALUEOUT) value);
    }     
   } 
}

 

잡의 출력키는 LongWritable이고, 출력값은 Text다. 사실 이 맵리듀스 프로그램의 모든 키는 LongWritable이고 값은 Text인데, 그 이유는 이들이 맵의 입력키와 값 타입이며 맵과 리듀스 함수 모두 타입을 보존하는 항등 함수이기 때문이다. 그러나 대부분의 맵리듀스 프로그램은 동일한 키나 값의 타입을 처음부터 끝까지 사용하지 않기 때문에 앞에서 설명한대로 사용할 타입을 직접 선언하여 잡을 설정할 필요가 있다.

 

리듀서로 레코드를 전달하기 전에 맵리듀스 시스템은 이들을 정렬한다. 이 예제에서는 숫자순으로 키를 정렬하므로 여러 입력 파일의 모든 행을 하나의 출력 파일로 병합 배치하는 효과를 갖는다.

 

기본 출력 포맷은 TextOutputFormat이며, 키와 값을 각각 문자열로 변환하고 탭 문자로 두 문자열을 구분하여 한 줄에 하나씩 레코드를 쓴다.

기본 스트리밍 잡

스트리밍의 기본 잡은 자바와 유사하지만 완전히 똑같지는 않다. 기본 형태는 다음과 같다.

% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
	-input input/ncdc/sample.txt \
    -output output \
    -mapper /bin/cat

논-자바(자바가 아닌) 매퍼를 지정하고 기본 텍스트 모드로 스트리밍 잡을 실행하면(-io text) 앞에서 본 자바 잡과는 조금 다르게 동작한다. 결과를 보면 매퍼 프로세스에 키를 전달하지 않고 단지 값만 전달한다(다른 입력 포맷에서 stream.map.input.ignoreKey 속성을 true로 지정하면 동일한 결과를 얻을 수 있다.). 이 기능은 굉장히 유용한데, 여기서 키는 사실 의미 없는 파일의 행 오프셋일 뿐이고 대부분의 애플리케이션이 관심 있어 하는 것은 바로 값이기 때문이다. 이 잡을 실행하면 입력 행을 정렬하는 효과를 얻을 수 있다.

 

다음 명령어를 보면 추가적인 기본 설정을 알 수 있다.(이전 맵리듀스 API 클래스를 사용한 스트리밍 예제란 것을 명심해야 한다.)

% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
	-input input/ncdc/sample.txt \
    -output output \
    -inputformat org.apache.hadoop.mapred.TextInputFormat \
    -mapper /bin/cat \
    -partitioner org.apache.hadoop.mapred.lib.HashPartitioner \
    -numReduceTasks 1 \
    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
    -outputformat org.apache.hadoop.mapred.TextOutputFormat
    -io text

 

-mapper와 -reducer 옵션은 명령어나 자바 클래스를 인자로 받는다. 컴바이너는 -combiner 인자를 사용해서 정의할 수 있다.

스트리밍의 키와 값

스트리밍 애플리케이션은 키-값 쌍을 연속된 바이트로 변환하여 맵과 리듀스 프로세스에 표준입력으로 보낼 때 사용할 구분자를 제어할 수 있다. 기본 구분자는 탭 문자지만 키나 값에 탭 문자가 포함되어 있으면 구분자를 다른 것으로 변경해야 한다.

 

마찬가지로 맵이나 리듀스가 키-값 쌍을 출력할 때도 설정 구분자로 분리할 수 있다. 게다가 출력키는 첫 번째 필드만이 아니라 다른 필드를 추가할 수도 있다. 키와 값의 분리는 stream.num.map.output.key.fields와 stream.num.reduce.output.key.fields 속성에 정의하며, 처음 n개의 필드로 키를 구성하고 나머지 필드는 값이 된다. 예를 들어 스트리밍 프로세스의 출력 필드가 a,b,c고 n이 2라면 출력키는 a,b 가되고 출력값은 c가 된다.

 

맵과 리듀스에 개별적으로 구분자를 설정할 수 있다. 아래 표에 관련 속성을 볼수 있고 그림을 통해 데이터의 전체 흐름을 다이어그램으로 보여준다.

 

이러한 설정은 입력과 출력 포맷과는 아무런 관련이 없다. 예를들어 stream.reduce.output.field.separator가 콜론으로 설정되었고 리듀스 스트림 프로세스가 표준 출력으로 a:b 행을 썻다면 스트리밍 리듀서는 키는 a를 값은 b를 추출할 것이다. 여기서 TextOutputFormat을 사용하면 탭으로 구분된 a와 b를 출력 파일에 쓸 것이다. TextOutputFormat을 사용할 때 mapreduce.output.textoutputformat.separator를 설정하면 구분자를 변경할 수 있다.

 

-스트리밍 구분자 속성

속성명 타입 기본값 설명
stream.map.input.field.separator String \t 입력키와 값 문자열을 스트림 맵 프로세스에 바이트 스트림으로 보낼때 사용하는 구분자
stream.map.output.field.separator String \t 맵 출력을 위해 스트림 맵 프로세스의 출력을 키와 값 문자열로 분리 할때 사용하는 구분자
stream.num.map.output.key.fields int 1 stream.map.output.field.separator 속성 값으로 분리된 필드 중에서 맵 출력키로 처리할 필드 수
stream.reduce.input.field.separator String \t 입력키와 값 문자열을 스트림 리듀스 프로세스에 바이트 스트림으로 보낼때 사용하는 구분자
stream.reduce.output.field.separator String \t 최종 리듀스 출력을 위해 스트림 리듀스 프로세스의 출력을 키와 값 문자열로 분리할때 사용하는 구분자
stream.num.reduce.output.key.fields int 1 stream.reduce.output.field.separator 속성 값으로 분리된 필드 중에서 리듀스 출력키로 처리할 필드수

 

8-2 입력 포맷

하둡은 일반 텍스트 파일에서 데이터베이스에 이르기까지 다양한 유형의 데이터 포맷을 처리할 수 있다. 이 절에서는 다양한 입력 포맷을 살펴보겠다.

8-2-1 입력 스플릿과 레코드

2장에서 보았듯이 입력 스플릿은 하나의 맵에서 처리하는 입력 청크다. 각 맵은 하나의 스플릿을 처리하고 각 스플릿은 여러 개의 레코드로 나누어지며 맵은 각 레코드의 키-값 쌍을 차례로 처리한다. 여기서 스플릿과 레코드는 논리적인 개념이다. 즉, 물리적인 파일과 반드시 일치하는 것은 아니지만 대부분의 사례에서는 거의 일치한다. 데이터베이스에서 스플릿은 테이블의 특정 범위에 있는 행의 집합이며, 레코드는 해당 범위에 속한 행을 의미한다(관계형 데이터베이스에서 데이터를 읽는 입력 포맷인 DBInputFormat이 정확히 여기에 해당된다).

 

입력 스플릿은 자바 클래스 InputSplit으로 표현된다. 이 절에서 언급된 모든 클래스는 org.apache.hadoop.mapreduce 패키지에 존재한다.

public abstract class InputSplit {
	public abstract long getLength() throws IOException, InterruptedException;
    public abstract String[] getLocations() throws IOException,
    	InterruptedException;
}

 

InputSplit은 바이트 길이와 호스트네임 문자열로 된 저장소 위치의 집합이다. 여기서 스플릿은 입력 데이터 자체를 포함하지 않는다는 점을 주의하자. 이는 단지 데이터에 대한 참조 객체일 뿐이다. 저장소 위치 정보는 맵리듀스 시스템에서 사용되며 맵 태스크와 스플릿 데이터를 가능한 한 가장 가깝게 배치하기 위해 사용된다. 스플릿을 큰 것부터 먼저 처리하는 것이 잡의 실행 시간을 최소화 할 수 있기 때문에 바이트 길이를 사용하여 스플릿을 정렬하는데, 이 방법이 바로 탐욕적 근사 알고리즘 (greed approximation algorithm) 이다.

 

맵리듀스 애플리케이션을 작성할 때 InputSplits를 직접 다룰 필요는 없다. InputFormat이 입력 스플릿을 생성하고 이를 레코드로 분리하는 역할을 모두 맡고 있기 때문이다. InputFormat의 구체적인 예제를 보기 전에 맵리듀스에서 어떻게 사용되는지 간략히 살펴보자. 다음은 인터페이스다.

public abstract class InputFormat<K, V> {
	public abstract List<InputSplit> getSplits(JobContext context)
    	throws IOException, InterruptedException;
    
    
    public abstract RecordReader<K, V>
    	createRecordReader(InputSplit split, TaskAttemptContext context)
        	throws IOException, InterruptedException;
}

 

잡을 실행하는 클라이언트는 getSplits()를 호출하여 잡의 스플릿을 계산한 다음 애플리케이션 마스터에 보낸다. 애플리케이션 마스터는 스플릿의 저장 위치를 이용해서 클러스터에서 각 스플릿을 처리할 맵 태스크를 스케줄링 한다. 맵 태스크는 InputFormat의 createRecordReader() 메서드로 스플릿을 전달하여 해당 스플릿을 위한 RecordReader를 얻는다. RecordReader는 단순히 레코드에 대한 반복자며, 맵 태스크는 이를 사용해서 맵 함수로 전달할 레코드의 키-값 쌍을 생성한다. Mapper의 run() 메서드를 살펴보면 쉽게 이해할 수 있다.

public void run(Context context) throws IOException, InterruptedException {
	setup(context);
    while (context.nextKeyValue()) {
    	map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
	cleanup(context);
}

 

먼저 setup() 을 실행한 후 Context의 nextKeyValue()를 반복적으로 호출하여(내부적으로는 RecordReader의 동일 이름의 메서드로 위임함) 매퍼에서 사용할 키-값 객체를 가져온다. RecordReader에서 얻은 키-값은 Context를 거쳐 map() 메서드로 전달되는 방식으로 작업이 수행된다. RecordReader가 스트림의 끝에 도달하면 nextKeyValue() 메서드는 false를 반환하며 맵 태스크는 cleanup() 메서드를 수행한 후 종료된다. 마지막으로, Mapper의 run() 메서드는 public이며 사용자가 변경할 수 있다. MultithreadedMapper는 설정된 개수(mapreduce.mapper.multithreadedmapper.threads 속성으로 설정) 만큼의 스레드로 매퍼를 동시에 실행하는 구현체다. 대부분 데이터 처리 태스크는 병렬로 실행을 하더라도 기본 구현체에 비해 얻는 이득은 크지 않다. 그러나 각 레코드를 처리하는 시간이 매우 긴(예를 들면 외부 서버와 통신으로 인해) 매퍼라면 경쟁이 적이므로 단일 JVM에서 다수의 매퍼를 실행하는 것이 유리하다.

FileInputFormat

FileInputFormat은 파일을 원본 데이터로 사용하는 모든 InputFormat 구현체의 기본 클래스다. 이 클래스는 두가지를 제공한다. 하나는 잡의 입력 파일을 포함하는 위치정보고, 다른 하나는 입력 파일의 스플릿을 생성하는 구현체다. 여기서 스플릿을 레코드로 나누는 일은 서브클래스에 의해 이루어진다.

 

FileInputFormat 입력 경로

잡의 입력은 경로의 집합으로 지정되며, 잡의 입력을 제어하는 데 많은 유연성을 제공한다. FileInputFormat은 Job의 입력 경로를 설정하는 편리한 네 개의 정적 메서드를 제공한다.

public static void addInputPath(Job job, Path path)
public static void addInputPaths(Job job, String commaSeparatedPaths)
public static void setInputPaths(Job job, Path... inputPaths)
public static void setInputPaths(Job job, String commaSeparatedPaths)

 

addInputPath()와 addInputPaths() 메서드는 단일 경로 혹은 여러 경로를 입력 목록에 추가한다. 그리고 경로 목록을 만들기 위해 반복적으로 이 메서드를 호출할 수도 있다. 경로의 전체 목록을 단번에 설정하려면 setInputPaths () 메서드를 사용하면 된다.(이전에 호출하여 Job에 설정된 경로는 모두 대체된다).

 

경로는 파일, 디렉터리 또는 glob를 사용하여 파일과 디렉터리 집합을 나타낼 수 있다. 디렉터리를 경로로 사용하면 해당 디렉터리에 속한 모든 파일을 잡의 입력에 포함시킨다. add와 set 메서드를 사용해서 특정 파일만 지정할 수 있다. 만약 입력에서 특정 파일을 제외하려면 FileInputFormat의 setInputPathFilter()를 호출해서 필터를 설정하면 기본 필터에 추가되는 방식으로 동작한다. 즉, 숨겨지지 않은 파일 중에서 필터가 수용하는 것만 통과된다.

 

경로와 필터는 환경 설정 속성을 통해서도 아래 표처럼 지정할수 있다. 스트리밍 잡에서 이를 활용하면 쉽게 작업할 수 있다. 스트리밍 인터페이스에서 경로 설정은 -input 옵션으로 가능하므로 코드에서 직접 경로를 설정하지 않아도 된다.

- 입력 경로와 필터 속성

속성명 타입 기본값 설명
mapreduce.input.fileinputformat.inputdir 콤마로 구분된 경로 없음 잡에 대한 입력 파일. 콤마를 포함하는 경로는 콤마 앞에 역슬래시 문자로 처리해야 한다. 예를 들면 glob{a,b}는 {a\,b}로 처리해야 한다.
mapreduce.input.pathFilter.class PathFilter 클래스명 없음 잡 입력 파일에 적용할 필터

 

FileInputFormat 입력 스플릿

여러 파일이 있을 때 FileInputFormat은 스플릿을 어떻게 생성할까? FileInputFormat은 오직 큰 파일만 분리한다. 여기서 '크다'는 HDFS 블록보다 크다는 의미다. 스플릿의 크기는 보통 HDFS 블록의 크기와 같으며, 이는 대부분의 애플리케이션에 적합한 방식이다. 하지만 아래 표처럼 다양한 하둡 속성 설정으로 스플릿 크기를 직접 제어할 수 있다.

-스플릿 크기를 제어하는 속성

속성명 타입 기본값 설명
mapreduce.input.fileinputformat.split.minsize int 1 파일 스플릿의 최소 바이트 크기
mapreduce.input.fileinputformat.split.maxsize long Long.MAX_VALUE 파일 스플릿의 최대 바이트 크기
dfs.blocksize long 128MB HDFS 블록 바이트 크기

 

스플릿의 최소 크기는 1바이트지만, 일부 포맷은 스플릿 크기의 하한선을 가지고 있다. 예를 들어 시퀸스 파일은 스트림에 동기화 항목을 가끔씩 추가한다. 따라서 레코드 리더가 레코드의 경계를 알기 위해서는 모든 스플릿이 동기화 지점을 하나 이상 반드시 포함할 만큼 스플릿의 최소 크기가 커야 한다.

 

애플리케이션에서 최소 스플릿 크기를 지정할 수도 있다. 만약 블록 크기보다 큰 값을 설정하면 블록보다 큰 스플릿을 강제적으로 만들 수 있다. 이렇게 하면 맵 태스크와 동일한 로컬에 존재하는 블록 수가 줄어들기 때문에 HDFS를 사용하는 장점이 사라질 수도있다.

 

최대 스플릿 크기는 자바 long 타입으로 표현할 수 있는 최댓값이다. 최대 크기는 블록 크기보다 작을 때만 효과가 있다. 이때에는 하나의 블록을 작은 여러개의 스플릿으로 분리한다. 스플릿 크기는 다음 공식으로 계산할 수 있다(FileInputFormat의 computeSplitSize() 메서드를 참조하라)

max(minimumSize, min(maximumSize, blockSize))

기본적으로 크기는다음과 같다.

minimumSize < blockSize < maximumSize

 

일반적인 스플릿의 크기는 블록의 크기(blockSize)와 같다. 아래 표는 이러한 매개변수를 다양한 조합으로 설정했을 때 결정되는 최종 스플릿 크기를 잘 보여주고 있다.

최소 스플릿 크기 최대 스플릿 크기 블록 크기 스플릿 크기 비고
1(기본) Long.MAX_VALUE 128MB 128MB 기본적으로 스플릿 크기는 기본 블록 크기와 같다
1(기본) Long.MAX_VALUE 256MB 256MB 스플릿 크기를 증가시키는 가장 자연스러운 방법은 HDFS 블록을 크게 만드는 것인데, dfs.blocksize를 설정하거나 파일 생성 시점에 파일 단위로 설정할 수 있다.
256MB Long.MAX_VALUE 128MB 256MB 최소 스플릿 크기를 블록 크기보다 크게 하면 스플릿 크기를 증가시키지만 데이터 지역성에서 손해가 발생한다.
1(기본) 64MB 128MB 64MB 최대 스플릿 크기를 블록 크기보다 작게 하면 스플릿 크기가 줄어든다.

 

작은 파일과 CombineFileInputFormat

하둡은 작은 파일이 많을 때 보다 큰 파일이 적을 때 더 좋은 성능을 보인다. 그 이유는 FileInputFormat은 각 스플릿이 한 파일의 전체나 일부가 되도록 스플릿을 생성하기 때문이다. 파일의 크기가 작고('작다'는 HDFS 블록보다 훨씬 더 작음을 의미) 이러한 파일이 매우 많다면 작은 입력을 처리하는 맵 태스크가 굉장히 많이 필요하기 때문에 추가적인 북키핑 오버해드가 발생한다. 1GB의 파일을 8개의 128MB의 블록으로 분리한 것과 대략 10,000개 정도의 100KB 블록으로 나눈 것을 비교해보자. 10.000개의 파일에 대해 각각 맵 태스크를 사용하면 하나의 입력 파일에 8개의 맵 태스크를 사용한 것보다 잡의 전체 실행 시간이 10배 또는 100배 더 느려질 것이다.

 

작은 파일을 위해 설계된 CombineFileInputFormat을 사용하면 이러한 문제가 어느정도 해결된다. FileInputFormat은 파일당 하나의 스플릿을 생성하지만 CombineFileInputFormat은 여러 개의 파일을 하나로 묶어서 스플릿을 생성하므로 각 매퍼는 더 많은 데이터를 처리할 수 있다. 중요한 사실은 CombineFileInputFormat은 노드와 랙의 지역성을 고려하여 동일한 스플릿에 배치할 블록을 결정한다는 것이다. 따라서 전형적인 맵리듀스 잡에서 입력을 처리할 수 있는 속도를 보장한다.

 

물론 가능하다면 많은 수의 작은 파일과 같은 상황은 피하는 것이 좋다. 맵리듀스는 클러스터 내의 디스크 전송 속도 수준으로 동작하는 것이 가장 좋은데, 많은 수의 작은 파일 처리는 잡을 실행하는 데 필요한 탐색 횟수를 증가시키기 때문이다. 또한 HDFS에 작은 크기의 많은 파일을 저장하는것은 네임노드 메모리의 낭비를 초래한다. 많은 작은 파일사태를 피하는 방법중 하나는 [표:입력 경로와 필터 속성] 의 접근방법처럼 순차 파일을 이용해서 작은 파일들을 하나의 큰 파일로 병합하는 것이다. 순차 파일의 키는 파일이름(필요 없으면 NullWritable과 같은 상수), 값은 파일 내용으로 지정한다. 하지만 이미 HDFS에 많은 작은 파일이 있으면 CombineFileInputFormat도 괜찮은 방법이다.

스플릿 방지

일부 애플리케이션은 파일이 스플릿(분리)되지 않고 단일 매퍼가 각 입력파일 전체를 처리하길 원한다.

예를들어 파일의 모든 레코드가 정렬되어 있는지 확인하는 가장 간단한 방법은 현재 레코드가 이전 레코드보다 큰지 하나씩 순차적으로 살펴보는 것이다.이를 맵 태스크로 구현할 때 단일 맵으로 전체 파일을 처리해야 이 알고리즘이 제대로 작동한다.

 

기존의 파일을 스플릿 하지 않는 두가지 방법이 있다. 빠르고 간단한 첫번째 방법은 최소 스플릿 크기를 시스템에서 가장 큰파일보다 크게 설정하는 것이다. 최소 스플릿 크기를 가장 큰 값인 Long.MAX_VALUE로 설정하면 그 효과를 볼 수 있다. 두번째 방법은 사용하길 원하는 FileInputFormat의 구체화된 서브클래스의 서브클래스를 만들어 isSplitable() 메서드가 무조건 false를 반환하도록 하는것이다. 다음은 스플릿되지 않는 TextInputFormat의 예제다.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class NonSplittableTextinputFormat extends TextInputFormat {
	@Override
    protected boolean isSplitable(JobContext context, Path file){
    	return false;
    }
}

 

매퍼의 파일 정보

파일 입력 스플릿을 처리하는 매퍼는 Mapper의 Context 객채의 getInputSplit() 메서드를 호출하여 스플릿에 대한 정보를 얻을 수 있다. FileInputFormat을 상속받은 입력 포맷은 getInputSplit() 메서드가 반환하는 InputSplit을 FileSplit으로 변환하여 아래 표에 있는 파일 정보에 접근할 수 있다.

 

이전 맵리듀스 API와 스트리밍 인터페이스에서는 매퍼의 환경 설정으로부터 읽은 속성을 통해 동일한 파일 스플릿 정보에 접근할 수 있다. 이전 맵리듀스 API에서는 Mapper 구현체에 있는 configure()를 구현하여 JobConf 객체에 접근했다.

FileSplit 메서드 속성명 타입 설명
getPath() mapreduce.map.input.file Path/String 처리할 입력 파일의 경로
getStart() mapreduce.map.input.start long 파일의 시작으로부터 스플릿 시작 지점
getLength() mapreduce.map.inputlength long 스플릿의 바이트 길이

 

다음 절에서는 스플릿의 파일명을 접근하기 위해 필요한 FileSplit 사용법을 알아본다.

파일의 전체 내용을 하나의 레코드로 처리하기

가끔은 파일의 전체 내용을 매퍼가 접근해야 하는 요구사항이 발생할 수도 있다. 이때에는 파일을 스플릿하지 않고 파일의 전체 내용을 하나의 레코드 값으로 전달하는 RecordReader를 사용하면 된다. 이러한 방법은 아래 코드의 WholeFileInputFormat에서 확인할 수 있다.

- 파일 전체 내용을 하나의 레코드로 읽는 InputFormat

public class WholeFileInputFormat
	extends FileInputFormat<NullWritable, BytesWritable> {
 
   @Override
   protected boolean isSplitable(JobContext context, Path file){
   	return false;
   }
   
   @Override
   public RecordReader<NullWritable, BytesWritable> createRecordReader(
   	InputSplit split, TaskAttempthContext context) throws IOException,
    InterruptedException {
   WholeFileRecordReader reader = new WholeFileRecordReader();
   reader.initialize(split, context);
   return reader;
  }  
 }

 

WholeFileInputFormat은 키는 필요하지 않으므로 NullWritable 객체로, 값(파일 내용)은 BytesWritable 객체로 선언했다. 두 개의 메서드를 정의했으며, 첫 번째 메서드는 isSplitable() 이 false를 반환하게 해서 입력 파일이 스플릿되지 않도록 코드를 작성했다.두번째 메서드는 아래코드에 나타낸 RecordReader의 구현체를 반환하도록 createRecordReader()를 구현했다.

 

- 파일 전체를 하나의 레코드로 읽기 위해 WholeFileInputFormat에서 사용한 RecordReader

class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

	private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
	private boolean processed = false;
    
    @Override
    public void initilize(InputSplit split, TaskAttemptContext context)
    	throws IOException, InterruptedException{
     this.fileSplit = (FileSplit) split;
     this.conf = context.getConfiguration();        
}
	 @Override
     public boolean nextKeyValue() throws IOException, InterruptedException{
     	if(!processed){
        	byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
            	in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } fanally {
              IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
     	return false;
     }

	 @Override
     public NullWritable getCurrentKey() throws IOException, InterruptedException {
     	return NullWritable.get();
     }
	
     @Override
     public BytesWritable getCurrentValue() throws IOException,
     	InterruptedException {
     return false   
     }
     
     @Override
     public float getProgress() throws IOException {
     	return processed ? 1.0f : 0.0f;
     }
     @Override
     public void close() throws IOException {
     	//아무것도 하지 않는다.
   }
}

 

WholeFileRecordReader는 FileSplit을 가져와서 null 키와 파일 바이트 값으로 이루어진 단일 레코드로 변환하는 역할을 맡고 있다. 단일 레코드만 존재하기 때문에 WholeFileRecordReader는 처리 또는 실패 중 하나고 Processed라는 불린 필드를 가진다. 만약 nextKeyValue() 메서드를 호출했을 때 파일이 처리된 상태가 아니라면 해당 파일을 열어 파일의 길이만큼 바이트 배열을 생성하고 하둡의 IOUtils 클래스를 사용해서 파일을 바이트 배열로 한번에 변환한다. 그러고 나서 next() 메서드에 전달된 BytesWritable 객체에 이 배열을 넣고 레코드 읽기를 완료했다는 신호로 true를 반환한다.

 

현재 키와 값 타임에 접근하여 레코드 리더의 진행 상황을 가져오는 간단한 북키핑 메서드와 레코드 리더가 종료될 때 맵리듀스 프레임워크에서 호출하는 close() 메서드가 추가로 있다.

 

WholeFileInputFormat 사용 방법을 보여주기 위해 여러 개의 작은 파일을 원본 파일명의 키와 파일 내용의 값을 가진 시퀸스 파일로 묶는 맵리듀스 잡을 하나 만들어보자 아래 코드에서 볼수 있다.

 

-여러개의 작은 파일을 하나의 시퀸스 파일로 묶는 맵리듀스 프로그램

public class SmallFailesToSequenceFileConverter extends Configured
	implements Tools{

static class SequenceFileMapper
	extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    
    private Text filenameKey;
    
    @Override
    protected void setup(Context context) throws IOException,
    	InterruptedException {
       InputSplit split = conetxt.getInputSplit();
       Path path = ((FileSplit) split).getPath();
       filenameKey = new Text(path.toString());
    }
    @Override
    protected void map(NullWritable key, ByteWritable value, Context context)
    	throws IOException, InterruptedException {
     context.write(filenameKey, value);   
    }
   }
    @Override
    public int run(String[] args) throws Exception {
      Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
      if (job == null) {
      	return -1;
      }
    
    job.setInputFormatClass(WholeFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(BytesWritable.class);
    
    job.setMapperClass(SequenceFileMapper.class);
    
    return job.waitForCompletion(true) ? 0 : 1; 
   }
   
   	public static void main(String[] args) throws Exception {
    	int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args);
        System.exit(exitCode);
    }
}

 

입력 포맷은 WholeFileInputFormat이므로 매퍼는 입력 파일 스플릿에 대한 파일명만 찾으면 된다. context의 InputSplit을 FileSplit으로 변환하면 파일 경로를 얻을 수 있는 메서드를 사용할 수 있다. 파일 경로는 키를 위한 Text 객체에 저장된다. 리듀서는 항등 함수(명시적으로 설정하지 않음)며 출력 포맷은 SequenceFileOutputFormat이다.

 

다음과 같이 몇 개의 작은 파일로 실행해보자. 두 개의 리듀서를 지정했고, 따라서 두 개의 시퀸스 파일을 결과로 얻을 수 있다.

% hadoop jar hadoop-examples.jar SmallFilesToSequenceFileConverter \
	-conf conf/hadoop-localhost.xml -D mapreduce.job.reduce=2 \
    input/smallfiles output

파일 시스템 쉘에서 -text 옵션으로 두개의 시퀸스 파일의 내용을 볼수 있다.

% hadoop fs -conf conf/hadoop-localhost.xml -text output/part-r-00000
hdfs://localhost/user/tom/input/smallfiles/a 61 61 61 61 61 61 61 61
hdfs://localhost/user/tom/input/smallfiles/c 63 63 63 63 63 63 63 63
hdfs://localhost/user/tom/input/smallfiles/e
% hadoop fs -conf conf/hadoop-localhost.xml -text output/part-r-00001
hdfs://localhost/user/tom/input/smallfiles/b 62 62 62 62 62 62 62 62
hdfs://localhost/user/tom/input/smallfiles/d 64 64 64 64 64 64 64 64
hdfs://localhost/user/tom/input/smallfiles/f 66 66 66 66 66 66 66 66

 

입력 파일의 이름은 a,b,c,d,e,f며 비어 있는 e 파일을 제외한 나머지 파일은 각각 파일 이름에 해당하는 10개의 문자를 가지고 있다(예를 들어 파일 a는 'a'문자 10개를 포함), 시퀸스 파일을 텍스트 형식으로 나타내면 파일명과 16진수의 파일 내용이 출력된다.

 

8-2-2 텍스트 입력

하둡은 비정형 텍스트 처리하는데 탁월하다. 이절에서는 텍스트를 처리하기 위해 하둡이 제공하는 InputFormat을 살펴보겠다.

TextInputFormat

TextInputFormat은 하둡의 기본 InputFormat이다. 각 레코드는 입력 데이터의 한 행이다. 키는 LongWritable로 파일에서 행이 시작되는 지점의 바이트 오프셋이고, 그 값은 행의 내용으로 행 종료 문자(개행 문자)를 제외한 후 Text 객체로 패키징된다. 다음과 같은 텍스트를 포함하고 있는 파일이 있다고 가정하자.

On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

 

이 파일은 네 개의 레코드를 가진 하나의 스플릿으로 분할된다. 각 레코드는 다음과 같이 키-값 쌍으로 해석된다.

(0, On the top of the Crumpetty Tree)
(33, The Quangle Wangle sat,)
(57, But his face you could not see,)
(89, On account of his Beaver Hat.)

분명히 레코드의 키는 행 번호가 아니다. 만약 행이 아닌 바이트를 경계로 파일을 여러 개의 스플릿으로 분할했다면 이런 방식의 구현은 불가능 했을 것이다. 스플릿은 독립적으로 수행되며, 행 번호는 순차적인 개념이다.사용자는 스플릿에서 행 번호를 아는 것은 가능하지만 파일 전체에서 행 번호를 아는것은 불가능하다.

 

하지만 각 스플릿은 각 행의 파일 내에서의 오프셋을 다른 스플릿과 상관없이 알 수 있으며 각 스플릿은 이전 스플릿의 크기를 알 수 있으므로 그 값을 현재 스플릿의 오프셋에 더하면 전체 파일에서의 오프셋 값을 계산할 수 있다. 파일 오프셋을 파일 이름과 결합하면 파일시스템의 유일한 값이 된다. 물론 모든 행이 고정된 길이를 가진다면 단순히 오프셋을 행의 길이로 나누기만 하면 행 번호를 얻을 수 있다.

 

행의 최대 길이 제어. 지금 설명하고 있는 텍스트 입력 포맷중 하나를 사용하면 손상된 파일에 대한 보호 장치를 마련하기 위해 행의 최대 길이 한계를 설정할 수 있다. 파일에서 손상된 부분은 매우 긴 행으로 나타날 수 있고, 따라서 메모리 부족 에러를 발생시켜 태스크 실패를 유발할 수 있다. 메모리 크기에 적당한(그리고 입력 데이터의 모든 행 길이를 충분히 만족시키는) 바이트 값을 mapreduce.input.linerecordreader.line.maxlength 속성에 설정하면 레코드 리더가(매우 긴) 손상된 행을 무시하여 태스크 실패가 일어나지 않도록 보장한다.

KeyValueTextInputFormat

TextInputFormat의 키는 단순히 파일 내에서의 오프셋이므로 실제로 활용되는 일은 거의 없다.일반적으로 파일의 각 행은 탭 문자로 구분된 키-값 쌍이다. 예를 들어 하둡의 기본 OutputFormat인 TextOutputFormat이 생성하는 출력이 대표적이다. 이와 같은 파일을 제대로 해석하려면 KeyValueTextInputFormat이 가장 적합하다.

 

mapreduce.input.Keyvaluelinerecordreader.key.value.separator 속성에 구분자를 명시 할 수 있다. 기본값은 탭 문자다. 다음 입력 파일을 살펴보자. 여기서 -> 는 탭 문자를 나타낸다.

line1 -> On the top of the Crumpetty Tree
line2 -> The Quangle Wangle sat,
line3 -> his face you could not see,
line4 -> On account of his Beaver Hat.

입력은 TextInputFormat 예제와 같이 4개의 레코드로 이루어진 단일 스플릿이지만, KeyValueTextInputFormat의 키는 각 행에서 탭 문자 앞에 있는 텍스트 문자열이다.

(line1, On the top of the Crumpetty Tree)
(line2, The Quangle Wangle sat,)
(line3, But his face you could not see,)
(line4, On account of his Beaver Hat.)

 

NLineInputFormat

TextInputFormat과 KeyValueTextInputFormat을 통해 각 매퍼는 가변 개수의 입력 행을 받게 된다.이 개수는 스플릿 크기와 행의 길이에 의해 좌우된다. 만약 매퍼가 고정된 개수의 입력 행을 받고자 한다면 NLineInputFormat을 InputFormat으로 사용하면 된다. TextInputFormat과 마찬가지로 키는 파일의 바이트 오프셋이며 값은 행의 내용이다.

 

N은 각 매퍼가 받게 될 입력 행의 개수다. N을 1로 설정하면(기본값) 각 매퍼는 정확히 하나의 입력 행을 받게 된다. N값은 mapreduce.input.lineinputformat.linespermap 속성으로 제어할 수 있다. 다시한번 다음과 같이 네 개의 행이 있는 상황을 고려해보자.

On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

만약 N 값이 2라면 각 스플릿은 두 개의 행을 갖는다. 첫 번째는 매퍼는 처음 두개의 키-값 쌍을 받을 것이다.

(0, On the top of the Crumpetty Tree)
(33, The Quangle Wangle sat,)

그리고 다른 매퍼는 다음 두 개의 키-값 쌍을 받을 것이다.

(57, But his face you could not see,)
(89, On account of his Beaver Hat.)

 

키와 값은 TextInputFormat이 생성하는 것과 동일하다. 다만 스플릿을 구성하는 방식은 차이가 있다.

일반적으로 매우 작은 입력 행을 맵 태스크에서 처리하는 것은 비효율적이지만(태스크 초기설정의 오버헤드 때문에) 소량의 입력데이터를 받아서 오랜 연산 작업(CPU 집중 작업)을 수행하여 결과를 얻는 애플리케이션이 있을 수 있다. 가장 대표적인 사례가 바로 시뮬레이션이다. 또 한 행마다 하나의 입력 매개변수를 명시한 입력 파일을 생성하여 매개변수 스윕을 수행할 수도 있다. 매개변수 스윕은 동시 다발적으로 시뮬레이션을 수행하여 어떤 매개변수를 변경할 때 그 모델이 어떻게 변화되는지 알 수 있는 기법이다.

 

다른 예로는 하둡을 사용하여 데이터베이스와 같은 다수의 데이터 출저로부터 데이터를 불러오는 것이다. 이를 위해 행당 하나의 데이터 출저를 나열한 '씨앗'입력 파일을 생성한다. 그 다음에 각 매퍼는 데이터 출처를 하나씩 할당받고 해당 출처로부터 데이터를 읽어서 HDFS에 로드한다. 이 잡은 리듀스 단계가 필요하지 않으므로 리듀서 수는 0으로 설정해야 한다(Job의 setNumReduceTasks()를 호출해서). 또한 HDFS에 로드된 데이터를 처리하는 맵리듀스 잡을 추가로 실행할 수도 있다.

XML

대부분 XML 분석기는 XML 문서 전체를 다루기 때문에 커다란 XMl 문서 하나가 여러 개의 입력 스플릿으로 분리되어 있다면 이를 개별적으로 파싱하는 것은 매우 어려운 일이다. 물론 XML 문서 전체가 매우 크지 않다면 '파일의 전체 내용을 하나의 레코드로 처리하기' 기법을 활용하여 하나의 매퍼로 처리할 수 있다.

 

일련의 '레코드' 로 이루어진 매우 큰 XMl 문서는 단순 문자열이나 정규표현식을 활용하여 레코드의 시작과 종료 태그를 찾아서 레코드를 분리할 수 있다. TextInputFormat으로 행의 경계를 찾는 것과 마찬가지로 스플릿의 시작 지점부터 단순히 스캐닝하면 레코드의 다음 시작 태그를 쉽게 찾을 수 있기 때문에 이와 같은 방식으로 처리하면 프레임워크에 의해 문서가 분리될 때 발생하는 문제를 방지할 수 있다. 이를 위해 하둡은 StreamXmlRecordReader 클래스를 제공하고 있다. 이 클래스는 org.apache.hadoop.streaming.mapreduce 패키지에 있지만 스트리밍 외부에서도 사용할 수 있다. 입력 포맷을 StreamInputFormat으로 설정하고 stream.recordreader.class 속성을 org.apache.hadoop.streaming.mapreduce.StreamXmlRecordReader로 설정하는 방식으로 사용하면 된다. 리더의 시작과 종료 태그 패턴은 잡의 환경 설정 속정으로 설정한다.

 

예제 데이터로는 위키피디아를 추천한다. 위키피디아는 XML 형태로 내용을 내려받을 수 있으며, 앞에서 설명한 방식으로 맵리듀스를 이용하여 병렬로 데이터를 처리할 수 있다. 데이터는 하나의 XML로 둘러싸인 큰 문서에 들어있으며, 페이지의 내용과 연관 메타데이터를 포함한 일련의 page 항목으로 구성되어있다. StreamXmlRecordReader를 사용하면 매퍼는 page항목을 레코드로 처리할 수 있다.

8-2-3 바이너리 입력

하둡 맵리듀스는 텍스트 데이터뿐만 아니라 바이너리 포맷도 지원한다.

SequenceFileInputFormat

하둡의 시퀸스 파일 포맷은 일련의 바이너리 키-값 쌍을 저장한다. 시퀸스 파일은 스플릿이 가능하기 때문에 맵리듀스에 적합한 데이터 포맷이다. 시퀸스 파일은 파일의 임의 지점에서 리더가 레코드의 경계를 동기화 할 수 있는 싱크 지점을 제공한다. 또한 포맷의 일부로 압축을 지원하고 다양한 직렬화 프레임워크를 이용하면 임의의 타입을 저장할 수 있다.

 

SequenceFileInputFormat을 사용하면 시퀸스 파일의 데이터를 맵리듀스의 입력으로 전달할 수 있다. 키-값의 타입은 시퀸스 파일에 의해 결정되므로 맵의 입력 타입과 일치하는지 반드시 확인해야 한다. 예를 들어 5장에서 언급한 시퀸스 파일은 IntWritable 키와 Text 값을 가지므로 맵 함수의 원형은 Mapper<IntWritable, Text, K, V>가 되어야 한다. 여기서 K와 V는 맵의 출력키와 값의 타입이다.

SequenceFileAsTextInputFormat

SequenceFileAsTextInputFormat은 시퀸스 파일의 키와 값을 Text 객체로 변환하도록 SequenceFileInputFormat을 변형한 것이다. 변환은 키와 값에 toString() 메서드를 호출해서 수행한다. 이 포맷을 이용하면 시퀸스 파일을 스트리밍에 적합한 입력으로 만들 수 있다.

SequenceFileAsBinaryInputFormat

SequenceFileAsBinaryInputFormat은 시퀸스 파일의 키와 값을 직접 해석이 불가능한 바이너리 객체로 반환하도록 SequenceFileInputFormat을 변형한 것이다. 키와 값을 BytesWritable 객체로 캡슐화하면 애플리케이션은 내부의 바이트 배열을 원하는 대로 자유롭게 해석할 수 있다. SequenceFile.Writer의 appendRaw() 메서드 혹은 SequenceFileAsBinaryOutputFormat을 이용해서 시퀸스 파일을 생성하는 프로세스와 결합하면 맵리듀스를 통해 시퀸스 파일로 저장된 어떠한 바이너리 데이터 타입도 사용할 수 있게 된다. 하지만 일반적으로 하둡의 직렬화 메커니즘을 활용하는 편이 더 좋다.

FixedLengthInputFormat

FixedLengthInputFormat은 구분자로 분리된 레코드가 아닌 고정된 길이의 바이너리 레코드를 읽을 수 있다. 레코드 길이는 fixedlengthinputformat.record.length 속성으로 설정한다.

8-2-4 다중 입력

파일 글로브, 필터, 단순 경로를 조합하면 맵리듀스 잡에 다수의 입력 파일을 지정할 수 있다. 하지만 모든 입력은 하나의 InputFormat과 하나의 Mapper로 해석되어야 한다. 그러나 시간이 지나면서 데이터 포맷은 점차 발전하게 되므로 과거의 모든 포맷을 처리할 수 있는 매퍼를 직접 작성해야 한다. 또는 동일한 타입이지만 포맷이 다른 데이터 원본을 처리해야 할 때도 있다. 이러한 상황은 서로 다른 데이터셋에 대해 조인을 수행할 때 주로 발생한다. 예를 들어 하나는 탭으로 구분된 일반 텍스트고 다른 하나는 바이너리 시퀸스 파일일 수 있다. 두 파일이 동일한 형태일지라도 다른 표현 형태를 가지므로 결국 다른 방식으로 해석해야 한다.

 

MultipleInputs 클래스를 이용하여 각각의 경로에 사용할 InputFormat과 Mapper를 지정하면 이러한 문제를 깔끔하게 처리할 수 있다. 예를 들어 최고 기온 분석을 위해 NCDC 데이터와 영국의 Met 오피스의 기상 데이터를 결합하고 싶으면 다음과 같이 입력을 설정하면 된다.

	MultipleInputs.addInputPath(job, ncdcInputPath,
    	TextInputFormat.class, MaxTemperatureMapper.class);
    MultipleInputs.addInputPath(job, metOfficeInputPath,
    	TextInputFormat.class, MetOfficeMaxTemperatureMapper.class);

 

FileInputFormat.addInputPath()와 job.setMapperClass()를 호출하는 대신 이 함수를 호출했다. Met 오피스와 NCDC 데이터는 모두 텍스트 기반이므로 TextInputFormat을 사용한다.  그러나 두 데이터의 행 포맷은 서로 다르므로 두 개의 다른 매퍼를 사용해야 한다. 먼저 MaxTemperatureMapper는 NCDC 입력 데이터를 읽어서 연도와 기온 필드를 추출한다. 그리고 MetOfficeMaxTemperatureMapper는 맷 오피스의 입력 데이터를 읽어서 연도와 기온 필드를 추출한다. 중요한 점은 맵의 출력은 동일한 타입이며, 타입이 모두 동일한 리듀서는 집계된 맵의 출력만 보게 되므로 이를 생성한 매퍼가 다르다는 점을 알지 못한다는 것이다.

 

MultipleInputs 클래스는 addInputPath()를 수정한 버전으로, 매퍼를 인자로 갖지 않는다.

public static void addInputPath(Job job, Path path, 
					Class<? extends InputFormat> inputFormatClass)

이 클래스는 Job의 setMapperClass() 메서드로 설정할 수 있으며, 다수의 입력 포맷을 가진 단일 매퍼를 사용할 때 매우 유용하다.

8-2-5 데이터베이스의 입력과 출력

DBInputFormat은 JDBC를 사용해서 관계형 데이터베이스로부터 데이터를 읽는 입력 포맷이다. 이 포맷은 샤딩 기능을 지원하지 않는다. 따라서 많은 매퍼로 데이터를 읽어서 데이터베이스에 과부하를 주는 일이 발생하지 않도록 주의해야 한다. 이러한 이유로 HDFS에 존재하는 아주 큰 데이터 셋과 조인할 데이터베이스의 작은 데이터셋을 MultipleInputs로 읽을때 적합하다. 데이터베이스의 출력 포맷은 DBOutputFormat이며 적당한 크기의 잡 출력을 데이터베이스로 내보내는 데 사용한다.

 

관계형 데이터베이스와 HDFS 사이에 데이터를 이동하는 대표적인 도구로는 스쿱이 있다.

 

HBase의 TableInputFormat은 HBase 테이블에 저장된 데이터를 맵리듀스로 처리하기 위해 만들어 졌다. 맵리듀스의 출력을 HBase 테이블에 TableOutputFormat을 사용하면 된다.

8-3 출력 포맷

하둡은 이전 절에서 설명한 입력 포맷에 대응하는 출력 데이터 포맷을 제공한다.

8-3-1 텍스트 출력

하둡의 기본 출력 포맷인 TextOutputFormat은 레코드를 한 줄의 텍스트로 기록한다. TextOutputFormat은 toString()을 호출하여 문자열로 변환하기 때ㅑ문에 모든 타입의 키와 값을 사용할 수 있다. 키-값 쌍의 기본 구분자는 탭 문자며, mapreduce.output.textoutputformat.separator 속성으로 이를 변경할 수 있다. TextOutputFormat에 대응하는 입력 포맷은 KeyValueTextInputFormat이며 지정된 구분자로 각 행을 키-값 쌍으로 분리한다.

 

NullWritable 타입을 사용하면 키 또는 값을 출력하지 않는다(만약 키와 값 모두에 nullWritalbe을 사용하면 NullOutputFormat처럼 아무것도 출력하지 않는다). 이 출력 포맷은 구분자가 없기 때문에 이 출력 파일을 읽을 때 적합한 입력 포맷은 TextInputFormat이다.

 

8-3-2 바이너리 출력

SequenceFileOutputFormat

이름에서 알수 있듯이 SequenceFileOutputFormat은 시퀸스 파일에 출력 결과를 기록한다. 이 출력 포맷은 단순하고 쉽게 압축할 수 있기 때문에 다음에 실행할 맵리듀스의 잡의 입력으로 좋은 선택이다.SequenceFileOutputFormat의 정적 메서드로 압축을 제어할 수 있다.

SequenceFileAsBinaryOutputFormat

SequenceFileAsBinaryInputFormat에 대응하는 SequenceFileAsBinaryOutputFormat은 원시 바이너리 포맷의 키와 값을 시퀸스 파일 컨테이너에 기록한다.

MapFileOutputFormat

MapFileOutputFormat의 출력물은 맵 파일이다. MapFile의 키는 순서대로 추가되어야 하므로 리듀서는 반드시 순차적으로 키를 출력해야 한다.

8-3-3 다중 출력

FileOutputFormat과 서브클래스는 출력 디렉터리에 다수의 파일을 생성한다. 리듀서당 하나의 파일이 존재하며 part-r-00000, part-r-00001 등과 같이 파티션 번호로 파일의 이름을 짓는다. 파일의 이름을 다르게 지정하거나 리듀서당 하나 이상의 파일을 생성할 때도 있다. 이를 위해 맵리듀스는 MultipleOutputs 클래스를 제공한다.

 

8-3-4 느린 출력

FileOutputFormat의 서브클래스는 내용이 없더라도 출력파일을 생성한다. 빈파일을 만들지 않는 애플리케이션을 작성하고 싶을 때는 LazyOutputFormat을 사용하면 된다. 이 클래스는 래퍼 출력 포맷으로, 첫번째 레코드를 특정 파티션에 보내는 시점에 출력 파일을 생성한다.이를 사용하려면 JobConf와 내부 출력 포맷을 인자로 LazyOutputFormat의 setOutpuutFormatClass() 메서드를 호출하면 된다. 스트리밍에서는 LazyOutputFormat을 활성화하도록 -lazyOutput 옵션을 지원한다.