하둡은 데이터 I/O를 위한 프리미티브(내장된 기본 기능을)를 제공한다. 데이터 무결성과 압축 같은 일부 프리미티브는 하둡보다 일반적인 기술이지만 멀티테라바이트의 데이터셋을 처리할 때는 이 프리미티브를 특별히 고려할 만한 가치가 있다. 다른 프리미티브로는 직렬화 프레임워크나 디스크 기반 데이터 구조와 같은 분산 시스템을 개발하기 위한 구성요소를 제공해주는 하둡 도구나 API가 있다.
5-1 데이터 무결성
하둡 사용자는 당연히 저장 또는 처리 과정에서 어떠한 데이터도 잃어버리거나 손상되지 않기를 기대한다. 그러나 디스크나 네트워크상의 모든 I/O 조작은 데이터를 읽거나 쓸 때 가능성은 적지만 에러가 발생할 수 있기 때문에 하둡이 처리할 수 있는 수준의 커다란 데이터가 시스템에 유입되면 데이터가 손상될 가능성도 그만큼 커질 수 있다.
손상된 데이터를 검출하는 일반적인 방법은 데이터가 시스템에 처음 유입되었을 때와 데이터를 손상시킬지도 모르는 신뢰할 수 없는 통신 채널로 데이터가 전송되었을 때마다 체크섬 을 계산하는 것이 있다. 만일 새롭게 생성된 체크섬이 원본과 정확히 일치하지 않는다면 그 데이터는 손상된 것으로 간주한다.이러한 기술은 데이터를 원상 복구하는 방법을 제공하지 않고 단순히 검출만 수행한다. 따라서 저가의 하드웨어를 권장하지 않으며, 특히 ECC 메모리를 사용해야 한다. 주목할 점은 데이터가 아니라 체크섬이 손상될 수도 있다는 것이다. 그러나 체크섬은 데이터보다 훨씬 작기 때문에 손상될 가능성은 매우 낮다.
일반적으로 에러 검출 코드는 모든 크기의 입력에 대해 32비트 정수 체크섬을 계산하는 CRC-32(32비트 순환 중복 검사)를 사용한다.CRC-32는 하둡의 ChecksumFileSystem에서 체크섬 계산을 하기 위해서도 사용되지만 HDFS에서는 CRC-32C라고 불리는 매우 효율적인 변형을 사용한다.
- 대규모 시스템에서는 데이터가 언젠가 반드시 깨진다
- 하둡은 체크섬으로 “깨졌는지”를 검출한다
- 복구는 복제본으로 한다, 체크섬은 탐지용이다
*체크섬 : 데이터의 지문이다.
- 데이터가 들어올 때
→ 지문 하나 생성 - 나중에 읽을 때
→ 지문 다시 생성 - 두 지문 비교
*CRC-32 = 대표적인 체크섬 알고리즘
- 어떤 크기의 데이터든 32비트 숫자 하나로 요약
5-1-1 HDFS의 데이터 무결성
HDFS는 모든 데이터를 쓰는 과정에서 내부적으로 체크섬을 계산하고, 데이터를 읽는 과정에서 체크섬을 기본적으로 검증한다. dfs.bytes-per-checksum에 설정된 크기만큼의 모든 바이트 데이터에 대해 별도의 체크섬이 생성된다. 기본적으로는 512바이트고, CRC-32C 체크섬이 4바이트의 long 타입이기 때문에 스토리지 오버헤드는 1%도 되지 않는다.
데이터노드는 데이터와 체크섬을 저장하기 전에 수신한 데이터를 검증할 책임이 있다. 이 같은 검정은 클라이언트로부터 수신한 데이터 또는 복제 과정에서 다른 데이터노드로부터 수신한 데이터에 대해 수행된다.데이터를 쓰는 클라이언트가 데이터노드 파이프라인으로 데이터를 보내면 파이프라인의 마지막 데이터노드는 해당 데이터의 체크섬을 검증한다. 만일 데이터노드가 에러를 검출하면 클라이언트는 IOException의 서브클래스를 예외롤 받고 애플리케이션 특성에 맞게 처리한다(예를 들어 재연산을 시도할 수 있다).
클라이언트가 데이터노드로부터 데이터를 읽을 때 클라이언트 역시 데이터노드에 저장된 체크섬과 수신된 데이터로부터 계산된 체크섬을 검증한다. 각 데이터노드는 체크섬 검증 로그를 영구 저장하기 때문에 각각의 블록이 검증되었던 마지막 시간을 알고 있다.
클라이언트가 성공적으로 하나의 블록을 검증하고 데이터노드에 알리면 데이터노드는 체크섬 검증에 대한 로그를 갱신한다. 이러한 통계치의 저장은 오류 디스크 검출에 유용하다.
클라이언트의 읽기 과정에서 블록을 검증하는 것 외에도 각 데이터노드는 저장된 모든 블록을 주기적으로 검증하는 DataBlockScanner를 백그라운드 스레드로 수행한다. 이것은 물리적 저장 매체에서 발생할 수도 있는 '비트로트'에 의한 데이터 손실을 피하기 위한 방법이다. HDFS는 블록의 복제본을 저장하기 때문에 손상되지 않은 새로운 복제본 생성을 위해 정상 복제본 중 하나를 복사하는 방식으로 손상된 블록을 치료할 수 있다. 만일 클라이언트가 블록을 읽는 과정에서 에러를 검출하면 훼손된 블록과 데이터노드에 대한 정보를 네임노드에 보고하고 ChecksumException을 발생시킨다. 네임노드는 그 블록 복제본이 손상되었다고 표시하고 다른 클라이언트에 제공하거나 또 다른 데이터노드로 복사하지 못하도록 한다. 그리고 네임노드는 해당 블록을 다른 데이 터노드에 복제되도록 스케줄링 해서 그 블록의 복제계수를 원래 수준으로 복구한다. 이러한 일이 발생하면 손상된 복제본은 삭제된다. 파일을 읽으려고 open() 메서드를 호출하기 전에 FileSystem의 setVerifyChecksum() 메서드 false를 전달해서 체크섬 검증을 비활성화 할 수 있다. 쉘에서 -get 또는 -copyToLocal 명령어에 -ignoreCrc 옵션을 사용해도 동일하게 동작한다. 이 기능은 손상된 파일을 조사하려 할 때 이 파일로 무엇을 할지 결정할 수 있어서 매우 유용하다. 예를 들어 해당 파일을 지우기 전에 그것을 복구할 수 있는지 여부를 파악하고자 할 수 있다.
Hadoop fs -checksum을 이용해서 파일의 체크섬을 확인할 수 있다. 이는 HDFS 안의 두 파일이 동일한 내용인지 확인할 때 유용하다. 예를 들면 distcp가 이러한 일을 수행한다.
정리하자면 다음과 같다
기본값
- 512바이트 데이터마다 체크섬 1개 생성
- 체크섬 크기 = 4바이트 (CRC-32C)
쓰기 과정에서 무슨 일이 벌어지냐
- 클라이언트
→ 데이터 전송
→ 데이터노드 파이프라인
→ 마지막 데이터노드가 체크섬 검증
→ HDFS 저장
데이터노드의 책임
데이터노드는:
“내가 받은 데이터가 중간에 깨지지 않았는지”
무조건 검사해야 함
검사 대상
- 클라이언트에서 받은 데이터
- 다른 데이터노드에서 복제된 데이터
에러 발생하면
- 클라이언트에게 IOException 발생
- 애플리케이션은:
- 재전송
- 재계산
- 실패 처리
중에서 선택
읽기 과정에서도 검사한다
- 클라이언트
→ 데이터노드에서 블록 읽음
→ 체크섬 다시 계산
→ 저장된 체크섬과 비교
개념의미
| 512바이트 단위 | 매우 촘촘한 무결성 검사 |
| 쓰기 검증 | 저장 전부터 깨짐 방지 |
| 읽기 검증 | 사용자에게 깨진 데이터 안 줌 |
| 백그라운드 검사 | 디스크 노화 감시 |
| 자동 복제 복구 | 깨지면 시스템이 스스로 치료 |
“HDFS는 512바이트 단위로 CRC-32C 체크섬을 생성하고, 쓰기·읽기·백그라운드 스캐너를 통해 무결성을 검증하며, 손상된 블록은 네임노드가 복제본을 재생성해 자동으로 복구한다.”
5-1-2 LocalFileSystem
하둡 LocalFileSystem은 클라이언트 측 체크섬을 수행한다. 다시 말해, filename이라는 파일을 쓸때 파일 시스템 클라이언트는 파일과 같은 위치의 디렉터리에 그 파일의 각 청크별 체크섬이 담긴, filename.crc라는 숨겨진 파일을 내부적으로 생성한다. 청크 크기는 기본적으로 512바이트며 file.bytes-per-checksum 속성으로 변경할 수 있다. 청크 크기는 .crc 파일에 메타데이터로 저장되기 때문에 청크 크기의 설정이 변경된다 하더라도 파일을 다시 정확하게 읽을 수 있다. 파일을 읽을 때 체크섬이 검증되고 에러가 검출되면 LocalFileSystem이 ChecksumException을 발생한다.
체크섬은 일반적으로 파일을 읽고/쓰는 시간에 몇 퍼센트의 오버헤드를 추가하는 정도이므로 전체 계산 성능에 미치는 영향이 미미하다.대부분의 애플리케이션에서 이는 데이터 무결성을 위해 납득할 만한 비용이라고 할 수 있다. 그러나 기존 파일 시스템이 자체적으로 체크섬을 지원한다면 LocalFileSystem의 체크섬을 비활성화 할 수 도 있다. 이를 위해 LocalFileSystem 대신 RawLocalFileSystem을 사용하면 된다. 애플리케이션에 전역적으로 체크섬을 비활성화하려면 fs.file.impl 속성을 org.apache.hadoop.fs.RawLocalFileSystem 값으로 설정해서 파일 URI 구현을 변경하면 된다. 대안으로 RawLocalFileSystem 인스턴스를 직접 생성할 수도 있는데, 이는 다음 예처럼 일부 읽기에 대해서만 체크섬 검증을 비활성화 할 때 유용하다.
Configuration conf = ...
FileSystem fs = new RawLocalFileSystem();
fs.initialize(null, conf);
5-1-3 ChecksumFileSystem
LocalFileSystem이 동작할 때 ChecksumFileSystem을 사용하며, 이 클래스는 단순한 FileSystem의 래퍼로 구현되어 있기 때문에 다른 체크섬이 없는 파일시스템에 체크섬 기능을 쉽게 추가할 수 있다. 일반적인 사용법은 다음과 같다.
FileSystem rawFs = ...
FileSystem checksummedFs = new ChecksumFileSystem(rawFs);
내부의 파일 시스템은 원시 파일 시스템이라 불리며, ChecksumFilesystem의 getRawFileSystem() 메서드를 사용해서 얻을 수 있다. ChecksumFileSystem은 체크섬과 관련된 몇개의 유용한 메서드를 가지고 있는데, 예를 들면 어떤 파일에 대한 체크섬 경로를 얻을 수 있는 getChecksumFile()이 있다.다른 메서드에 대한 자세한 사항은 문서를 확인하라
ChecksumFileSystem이 파일을 읽을 때 에러를 검출하면 reportChecksumFailure() 메서드를 호출할 것이다. 기본적인 구현에는 아무것도 하지 않는 것으로 되어 있지만 LocalFileSystem은 문제가 되는 파일과 체크섬을 같은 디바이스의 bad_files 라는 별도의 디렉터리로 이동시킨다. 관리자는 주기적으로 이러한 훼손된 파일을 체크하고 그에 대해 적절한 처리를 해야한다.
5-2 압축
파일 압축은 파일 저장 공간을 줄이고, 네트워크 또는 디스크로부터 데이터 전송을 고속화 할 수 있는 두가지 커다란 이점이 있다. 이러한 두가지 이점은 대용량 데이터를 처리할 때 매우 중요하기 때문에 하둡에서 압축이 사용되는 방법을 주의 깊게 살펴봐야 한다.
다양한 특성을 가지고 있는 여러 종류의 압축 포맷, 도구, 알고리즘이 존재한다 다음 표는 하둡에서 사용할수 있는 일반적인 알고리즘이다.
| 압축 포맷 | 도구 | 알고리즘 | 파일 확장명 | 분할 가능 |
| DEFLATE | N/A | DEFLATE | .deflate | No |
| gzip | gzip | DEFLATE | .gz | No |
| bzip2 | bzip2 | bzip2 | .bz2 | Yes |
| LZO | lzop | LZO | .lzo | No |
| LZ4 | N/A | LZ4 | .lz4 | No |
| Snappy | N/A | Snappy | .snappy | No |
모든 압축 알고리즘은 압축과 해제가 빨라질수록 공간이 늘어나는 희생을 감수해야 하기 때문에 공간과 시간은 트레이드오프 관계에 있다. 표의 도구는 보통 9개의 옵션(-1은 스피드 최적화, -9는 공간 최적화를 의미)을 제공함으로써 어느 정도 이러한 트레이드오프에 대한 제어를 가능하게 한다. 예를 들어 다음 명령은 가장 빠른 압축 메서드를 사용해서 file.gz라는 압축 파일을 생성한다.
% gzip -1 file
각 도구는 각기 다른 압축 특성이 있다. gzip은 일반적인 목적의 압축 도구고, 공간/시간 트레이드오프의 중앙에 위치한다. bzip2는 gzip보다 더 효율적으로 압축하지만 대신 더 느리다. bzip2의 압축 해제 속도는 압축 속도보다 더 빠르지만 여전히 다른 포맷에 비해 더 느리다. 한편 LZO, LZ4, Snappy는 모두 속도에 최적화되었고 gzip보다 어느정도 빠르지만 압축 효율은 떨어진다. Snappy와 LZ4는 압축 해제 속도 측면에서 LZO보다 매우 빠르다. 표의 '분할 가능' 열은 압축 포맷이 분할을 지원하는지 여부를 알려준다. 예를들면 스트림의 특정 지점까지 탐색한 후 이후의 일부 지점으로부터 읽기를 시작할 수 있는지 알려준다. 분할 가능한 압축 포맷은 특히 맵리듀스에 적합하다.
5-2-1 코덱
코덱은 압축-해제 알고리즘을 구현한것이다. 하둡에서 코덱은 CompressionCodeC 인터페이스로 구현된다 예를들어 GzipCodec은 gzip을 위한 압축과 해제 알고리즘을 담고있다. 아래 표는 하둡에서 이용할 수 있는 코덱을 보여준다.
| 압축 포멧 | 하둡 압축 코덱 |
| DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
| Gzip | org.apache.hadoop.io.compress.GzipCodec |
| bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
| LZO | com.hadoop.compression.lzo.LzopCodec |
| LZ4 | org.apache.hadoop.io.compress.Lz4Codec |
| Snappy | org.apache.hadoop.io.compress.SnappyCodec |
LZO 라이브러리는 GPL 라이선스고 아파치 배포판에 포함되지 않을 수 있으므로 하둡 코덱은 구글,깃허브로 내려받아야 된다.
LzopCodec은 lzop 도구와 호환되고 핵심적인 LZO 포맷에 부가적인 헤더가 포함된 것으로 일반적으로 사용자가 원하는 형태다. 또한 순수한 LZO 포맷을 위한 LzoCodec이 있는데, .lzo_deflate 파일 확장명을 사용한다.
CompressionCodec를 통한 압축 및 해제 스트림
CompressionCodec은 데이터를 쉽게 압축하거나 해제해주는 두개의 메서드를 제공한다. 출력 스트림에 쓸 데이터를 압축하려면 createOutputStream(OutputStreamout out) 메서드를 사용하는데, 이 메서드는 압축되지 않은 데이터를 압축된 형태로 내부 스트림에 쓰는 CompressionOutputStream을 생성한다. 반대로 입력 스트림으로부터 읽어 들인 데이터를 압축 해제하려면 createInputStream(InputStream in) 메서드를 호출하는데, 이 메서드는 기존 스트림으로부터 비압축 데이터를 읽을 수 있는 CompressionInputStream을 반환한다.
CompressionOutputStream과 CompressionInputStream은 기존 압축기 또는 압축 해제기를 재설정할수 있는 기능을 제공하는 것을 제외하면 각각 java.util.zip.DeflaterOutputStream,java.util.zip.DeflaterInputStream과 비슷하다. 이는 5.4.1절 'SequenceFile'에서 다루는 SequenceFile과 같이 데이터 스트림의 구간을 별개의 블록으로 압축하는 애플리케이션에서 사용되는 중요한 메서드다. 아래 예제는 표준 입력으로부터 읽어 들인 데이터와 표준 출력으로 쓰이는 데이터를 압축하는 API의 사용법을 보여준다.
public class StreamCompressor {
public static void main(String[] args) throws Exception {
String codecClassname = args[0];
Class<?> codecClass = Class.forName(codecClassname);
configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec)
ReflectionUtils.newInstance(codecClass, conf)
CompressionOutputStream out = codec.createOutputStream(System.out);
IOUtils.copyBytes(System.in, out, 4096, false);
out.finish();
}
}
애플리케이션의 첫번째 명령행 인자는 CompressionCodec 패키지의 정규화된 전체 이름이다. 그 코덱의 새로운 인스턴스를 생성하기 위해 ReflectionUtils를 사용하고 System.out의 압축 래퍼를 얻는다. 그리고 입력을 출력으로 복사하기 위해 IOutils의 copyBytes() 유틸리티 메서드를 호출하면 CompressionOutputStream으로 압축한다. 마지막으로 CompressionOutputStream의 finish()를 호출하고, 압축 도구에 압축된 스트림 쓰기 중단을 요청하지만 스트림 자체를 닫지 않는다. 이를 다음과 같이 명령행에서 시험해 볼수 있는데, 이 예제는 GzipCodec으로 StreamCompressor를 사용해서 'Text' 문자열을 압축하고 gunzip을 사용하여 표준 입력으로 부터 압축 해제한다.
% echo "Text" | hadoop StreamCompressor org.apache.hadoop.io.compress.GzipCodec \ | gunzip - Text
CompressionCodecFactory를 사용하여 CompressionCodec 유추하기
압축된 파일을 읽을 때 일반적으로 해당 파일 확장명을 보면 사용된 코덱을 유추할 수 있다. 예를 들어 .gz로 끝나는 파일은 GzipCodec으로 읽을 수 있다. 각 압축 포맷에 대한 확장자는 위에 java코드에서 확인할수 있다.
CompressionCodecFactory의 getCodec() 메서드는 지정된 파일에 대한 Path 객체를 인자로 받아 파일 확장명에 맞는 CompressionCodec를 찾아준다. 아래는 파일을 압축 해제하기 위해 이러한 기능을 사용하는 애플리케이션을 보여준다.
public class FileDecompressor {
public static void main(String[] args) throws Exception {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path inputPath = new Path(uri);
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodec(inputPath);
if (codec == null) {
System.err.println("No codec found for " + uri);
System.exit(1);
String outputUri =
CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension())
InputStream in = null;
OutputStream out = null;
try {
in = codec.createInputStream(fs.open(inputPath));
out = fs.create(new Path(outputUri));
IOUtils.copyByte(in, out, conf);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
}
}
일단 코덱을 찾았으면 출력 파일의 이름을 생성하기 위해 CompressionCodecFactory의 removeSuffix() 정적 메서드로 파일 접미사를 제거한다. 따라서 다음처럼 프로그램을 호출하면 file.gz로 명명된 파일은 file로 압축 해제된다.
% hadoop FileDecompressor file.gz
CompressionCodecFactory는 코덱표에 나열된 코덱(LZO는 제외)과 io.compression.codecs 속성에 나열된 코덱을 불러온다.(아래 표 참고) 기본적으로 io.compression.codecs 속성은 비어 있다. 따라서 새롭게 등록하는 커스텀 코덱(외부 로딩이 필요한 LZO 코덱과 같은)을 얻고자 할 때만 환경 속성을 변경할 필요가 있다. 각 코덱은 기본 파일 확장명을 알고 있으므로 CompressionCodecFactory가 주어진 확장과 일치하는 코덱을 찾기 위해 등록된 코덱 전체를 검색한다.
| 속성명 | 타입 | 기본값 | 설명 |
| io.compression.codecs | 콤마로 구분된 클래스 이름 | 압축 및 해제를 위해 추가하고자 하는 CompressionCodec 클래스 목록 |
원시 라이브러리
성능 관점에서 압축과 해제를 위해 원시 라이브러리를 사용하는 것이 바람직하다. 예를 들어 원시 gzip 라이브러리를 사용하여 테스트 해본 결과 압축 해제 성능은 최대 50%, 압축 성능은 거의 최대 10% 정도 더 좋아졌다. 아래 표는 각 압축 포맷에 대한 자바와 원시 구현체의 사용 가능 여부를 보여준다. 모든 포맷은 원시 구현체를 가지고 있지만 모든 포맷이 자바 구현체를 가지고 있는 것은 아니다.(예를 들면 LZO)
| 압축 포맷 | 자바 구현체 | 원시 구현체 |
| DEFLATE | Yes | Yes |
| gzip | Yes | Yes |
| bzip2 | Yes | Yes |
| LZO | No | Yes |
| LZ4 | No | Yes |
| Snappy | No | Yes |
하둡은 lib/native 디렉터리에 미리 빌드된 64비트 리눅스용 원시 압축 라이브러리인 libhadoop.so를 제공한다. 다른 플랫폼은 소스 최상단의 BUILDING.txt 문서의 지시에 따라 직접 라이브러리를 컴파일해야 한다.
원시 라이브러리는 java.library.path라는 자바 시스템 속성에 따라 선택된다. etc/hadoop 디렉터리에 있는 hadoop 스크립트가 이 속성을 직접 설정해주지만,이 스크립트를 사용하지 않는다면 애플리케이션에서 설정할 수도 있다.
기본적으로 하둡은 자신이 수행되는 플랫폼에 맞는 원시 라이브러리를 먼저 찾고, 있으면 자동으로 해당 라이브러리를 로드한다. 이것은 원시 라이브러리를 사용하기 위해 어떠한 환경 설정도 변경할 필요가 없음을 의미한다. 그러나 압축 관련 문제를 디버깅하는 것과 같은 상황에서는 원시 라이브러리의 사용을 비활성화하고 싶을 수도 있다. 이를 위해 io.native.lib.available 속성을 false로 설정하면 되는데, 이때 내장되어 있는 자바와 동등한 코덱 라이브러리가 사용된다(만일 이용할 수 있다면)
코덱풀 만일 원시 라이브러리를 사용하고 애플리케이션에서 상당히 많은 압축 또는 해제 작업을 수행해야 한다면 압축기와 해제기를 재사용해서 객체 생성 비용을 절감할 수 있는 CodecPool의 사용을 고려하는 것이 좋다.
아래의 코드에서 API를 사용하는 예제를 볼수 있다. 다만 단일 압축기(Compressor)만 생성하기 때문에 사실 코덱 풀을 사용할 필요는 없다.
public class PooledStreamCompressor {
public static void main(String[] args) throws Exception {
String codecClassname = args[0];
Class<?> codecClass = Class.forName(codecClassname);
configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec)
RefletionUtils.newInstance(codecClass, conf);
Compressor compressor = null;
try {
compressor = CodePool.getCompressor(codec);
CompressionOutputStream out =
codec.createOutputStream(System.out, compressor);
IOUtils.copyBytes(System.in, out, 4096, false)l
out.finish();
} finally {
CodecPool.returnCompressor(compressor);
}
}
}
주어진 CompressionCodec으로 풀에서 Compressor 인스턴스를 얻고, 그 코덱을 재정의한 createOutputStream() 메서드에서 이 인스턴스를 사용한다. Finally 블록을 사용하여 스트림 간에 바이트를 복사하는 과정에서 IOException이 발생해도 압축기 인스턴스가 풀로 반환되도록 보장한다.
5-2-2 압축과 입력 스플릿
맵리듀스로 처리되는 데이터를 어떻게 압축할지 고민하는 시점에 압축 포맷이 분할을 지원하는지 여부를 알고 있는것은 중요하다.
HDFS에 1GB 크기로 저장된 비압축 파일을 고려해보자. 128MB 크기의 HDFS 블록 8개가 저장되어 있을 때 이 파일을 입력으로 사용하는 맵리듀스 잡은 개별적인 맵 태스크에서 독립적으로 처리되는 8개의 입력 스플릿을 생성할 것이다.
이번에는 그 파일이 압축된 크기가 1GB인 단일 gzip 압축 파일이라고 상상해보라. 이전처럼 HDFS는 그 파일을 8개의 블록으로 저장할 것이다. 그러나 gzip 스트림이 특정 위치에서 읽기를 지원하지 않기 때문에 각 블록별로 스플릿을 생성할 수 없다. 그러므로 맵 태스크가 각 블록 스플릿을 개별적으로 읽는 것은 불가능하다. gzip 포맷은 압축된 데이터를 저장하기 위해 DEFLATE를 사용하고, DEFLATE는 데이터를 일련의 압축된 블록으로 저장한다. 리더가 다음 블록의 시작으로 이동하려면 스트림과 동기화되어 그 스트림의 특정 지점에 있을 수 있는 어떤 방법을 지원해야 하는데, DEPLATE 압축 방식은 각 블록의 시작점을 구별할 수 없다는 문제가 있다. 이러한 이유로 gzip은 분할을 지원하지 않는다.
이때 맵리듀스는 입력이 gzip 압축(파일 확장명을 통해)이고, gzip은 분할을 지원하지 않는다는 것을 인식하기 때문에 파일을 분할하려 하지 않으면서 최적의 방식으로 작동할 것이다. 이것은 제대로 동작하긴 하지만 지역성 비용이 발생한다. 즉, 단일 맵이 8개의 HDFS 블록을 모두 처리해야 하는데, 블록 대부분은 맵의 로컬에 있지 않을것이다. 또한 소수의 맵으로 잡이 덜 세분화되기 때문에 결국 시간이 더 많이 걸릴 것이다.
이 상황에서 압축 파일이 LZO라면 내부의 압축 포맷은 스트림과 동기화되는 방법을 리더에 제공하지 않기 때문에 같은 문제가 생길것이다. 하지만 하둡 LZO 라이브러리에 포함된 색인 도구를 사용해서 LZO 파일을 전처리 할수 있다. 색인 도구는 스플릿 지점의 색인을 구축하고 적당한 맵리듀스 입력 포맷으로 사용되어 파일을 효율적으로 분할할 것이다.
반면 bzip2 파일은 블록사이에서 동기화 표시자를 제공하므로 분할을 지원한다
어떤 압축 포맷을 사용해야 하는가?
하둡 애플리케이션은 거대한 데이터셋을 처리하기때문에 압축의 장점을 충분히 활용할 필요가 있다.
어떤 압축 포맷을 사용할지는 파일크기, 포맷, 처리에 사용되는 도구같은 고려사항에 의해 결정된다.
5-2-3 맵리듀스에서 압축 사용하기
5.2.1 절의 'CompressionCodecFactory를 사용하여 CompressionCodecs 유추하기'에서 설명했듯이 입력 파일이 압축되면 맵리듀스는 파일 확장명을 통해 사용할 코덱을 결정하고 파일을 읽을 때 자동으로 압축을 해제할 것이다.
맵리듀스 잡의 출력을 압축하려면 잡 환경 설정에서 mapreduce.output.fileoutputformat.compress 속성을 true로 설정하고, 사용할 압축 코덱의 클래스 이름을 mapreduce.output.fileoutputformat.compress.codec 속성에 지정하라.
또는 아래처럼 FileOutputFormat의 정적 편의 메서드로 그 속성을 설정할 수 있다.
public class MaxTemperatureWithCompression {
public static void main(String[] args) throws Exception {
if (args. length != 2){
System.err.println("Usage: MaxtemperatureWithCompression <input path> " + "<output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
job.setMapperClass(MaxTemperatureMapper.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducer.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
이 프로그램은 다음과 같이 압축된 파일을 입력으로 받아 실행된다. 이 예제에서는 출력과 동일한 압축 포맷을 사용했지만 꼭같을 필요는 없다.
% hadoop MaxTemperatureWithCompression input/ncdc/sample.txt.gz output
최종 출력의 각 부분은 압축되어 있다. 아래는 한 부분의 내용이다.
% gunzip -c output/part-r-00000.gz
1949 111
1950 22
만일 시퀸스 파일로 출력을 내보내고 있다면 mapreduce.output.fileoutputformat.compress.type 속성을 설정해서 사용할 압축 유형을 제어할 수 있다. 기본값은 RECORD고, 개별 레코드를 압축하는 방식이다.압축의 성능을 높이려면 레코드 그룹 방식으로 압축하는 BLOCK방식을 추천한다.
또한 SequenceFileOutputFormat에도 이 속성을 설정할수 있는 setOutputCompressionType()이라는 정적 편의 메서드가 있다.
맵리듀스 잡 출력의 압축을 설정하는 속성은 다음 표에 요약해두었다. 만약 맵리듀스 드라이버가 Tool 인터페이스를 사용한다면 명령행에서 이러한 속성을 프로그램에 넘겨 줄 수 있다. 그렇게 하면 프로그램의 코드에 압축 속성을 포함하는 방법보다 쉽게 속성을 변경할 수 있다.
| 속성명 | 타입 | 기본값 | 설명 |
| mapreduce.output.fileoutputformat.compress | 불린 | False | 출력 압축 여부 |
| mapreduce.output.fileoutputformat.compress.codec | 클래스명 | org.apache.hadoop.io.compress.DefaultCodec | 출력 압축에 사용할 코덱 |
| mapreduce.output.fileoutputformat.compress.type | 문자열 | RECORD | 순차 파일 출력에 사용할 압축 유형(NONE, RECORD, BLOCK) |
맵 출력 압축
맵리듀스 애플리케이션이 압축되지 않은 데이터를 읽고 쓰더라도 맵 단계에서 임시 출력을 압축하면 이익이 있다. 맵 출력은 디스크에 기록되고 네트워크를 통해 리듀서 노드로 전송되는데, 이때 단순히 LZO나 LZ4, Snappy와 같은 빠른 압축기를 사용하여 전송할 데이터양을 줄이면 성능을 향상 시킬수 있기 때문이다 아래 표는 맵 출력의 압축을 활성화하고 압축 포맷을 설정할수 있는 환경 설정 속성을 보여준다.
| 속성명 | 타입 | 기본값 | 설명 |
| mapreduce.map.output.compress | 불린 | false | 맵 출력의 압축 여부 |
| mapreduce.map.output.compress.codec | 클래스명 | org.apache.hadoop.io.compress.DefaultCodec | 맵 출력에 사용할 압축 코덱 |
다음과 같은 코드를 추가하면 잡에 gzip 맵 출력 압축을 활성화 할 수 있다.
Configuration conf = new Configuration();
conf. setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class,CompressionCodec.class);
Job job = new Job(conf);
5-3 직렬화
직렬화는 네트워크 전송을 위해 구조화된 객체를 바이트 스트림으로 전환하는 과정이다. 역직렬화 는 바이트 스트림을 일련의 구조화된 객체로 역전환하는 과정이다.
직렬화는 프로세스 간 통신과 영속적인 저장소와 같은 분산 데이터 처리의 독특한 두 영역에서 나타난다.
하둡 시스템에서 노드 사이의 프로세스 간 통신은 원격 프로시저 호출 을 사용하여 구현되었다.
RPC 프로토콜은 원격 노드로 보내기 위한 메세지를 하나의 바이너리 스트림으로 구성하기 위해 직렬화를 사용하고, 그 후 원격 노드에서 바이너리 스트림을 원본 메세지로 재구성하기 위해 역직렬화를 사용한다. 일반적으로 RPC 직렬화 포맷이 유익한 이유는 다음과 같다.
- 간결성
- 간결한 포맷을 사용하면 데이터 센서에서 가장 희소성이 높은 자원인 네트워크 대역폭을 절약할 수 있다.
- 고속화
- 프로세스 간 통신은 분산 시스템을 위한 백본을 형성하기 때문에 직렬화와 역직렬화는 가능하면 오버헤드가 작아야 한다.
- 확장성
- 프로토콜은 새로운 요구사항을 만족시키기 위해 점차 변경되므로 클라이언트와 서버 사이의 통제 방식과 관련된 프로토콜의 발전도 직관적이어야 한다. 예를들어 새로운 인자를 메서드 호출에 추가할 수 있어야 하고, 새로운 서버는 기존 클라이언트에서(새로운 인자 없이) 예전 포맷의 메세지도 수용할 수 있어야 한다.
- 상호운용성
- 일부 시스템을 위해 다양한 언어로 작성된 클라이언트를 지원하는 편이 좋으며, 이를 가능하도록 포맷을 설계할 필요가 있다.
표면적으로 영속적인 저장소를 위해 선택된 데이터 포맷은 직렬화 프레임워크와는 다른 요구사항을 가진다. 즉, RPC의 수명은 1초보다 작은 반면 영속적 데이터는 저장 후에도 수년동안 읽히기 때문이다. 그럼에도 RPC 직렬화 포맷의 네 가지 유익한 속성은 영속적인 저장소 포맷을 위해서도 중요하다. 저장소 포맷은 간결하고 빠르고 상호운용 할수 있어야 한다.
하둡은 Writable이라는 매우 간결하고 빠른 자체 직렬화 포맷을 사용한다. 그러나 확장하거나 자바 외에 다른 언어를 사용하는 것은 어렵다 Writable이 하둡의 중심에 있기 때문에 일반적인 직렬화 프레임워크를 살펴보기 전에 다음 세 절을 통해 Writable을 어느정도 깊이 있게 다루겠다.
5-3-1 Writable 인터페이스
Writable 인터페이스는 자신의 상태 정보를 DataOutput 바이너리 스트림으로 쓰기 위한 메서드와 DataInput 바이너리 스트림으로 부터 상태 정보를 읽기 위한 메서드를 정의한다.
package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
public interface Writable{
void write(DataOutput out) throws IOException;
void readFields(DataInput in) throws IOException;
}
이것을 활용하는 방법을 알아보기 위해 특정 Writable을 살펴보자. 여기서 우리는 자바 int의 래퍼에 해당하는 IntWritable을 사용해볼것이다. 인스턴스를 하나 생성하고 set() 메서드로 값을 설정할 수 있다.
5-3-2 직렬화 인터페이스
비록 대부분 맵리듀스 프로그램이 Writable 키와 값 타입을 사용하지만 맵리듀스 API의 의무사항은 아니다. 사실 모든타입이 사용될 수 있으며, 유일한 요구사항은 각 타입에 대한 바이너리 표현의 변환 매커니즘이다.
이것을 지원하기 위해 하둡은 플러그인 직렬화 프레임워크 API를 제공한다. 직렬화 프레임워크는 Serialization(org.apache.hadoop.io.serializer 패키지에 있는)의 구현체로 표현된다. 예를들어 WritableSerialization은 Writable 타입을 위한 Serialization의 구현체다.
Serialization은 타입을 Serializer 인스턴스(하나의 객체를 하나의 바이트 스트림으로 변환하기 위한)와 Deserializer 인스턴스(하나의 바이트 스트림을 하나의 객체로 변환하기 위한)로 매핑하는 방법을 정의한다.
Serialization 구현체는 io.serializations 속성에 클래스 이름의 목록을 콤마로 분리하여 설정하면 등록할 수 있다. io.serializations 속성의 기본값은 org.apache.hadoop.io.serializar.WritableSerialization과 에이브로의 구체적 매핑과 리플랙트 매핑 직렬화다(12.1절 '에이브로 자료형과 스키마' 참조) 이것은 Writable 또는 에이브로 객체만 직렬화하거나 역직렬화할수 있다는것을 의미한다. 하둡은 자바 객체 직렬화를 사용하는 JavaSerialization이라는 클래스를 포함한다. 맵리듀스 프로그램에서 Integer 또는 String 같은 표준 자바 자료형을 쉽게 사용할 수 있도록 지원하지만, 자바 객체 직렬화는 Writable만큼 효율적이지는 않다. 따라서 어떤 것이 좋은지 따져볼 필요는 없다.
직렬화 IDL
이 문제를 다른 방식으로 접근한 많은 직렬화 프레임워크가 있다. 코드를 통해 타입을 정의하기 보다는 IDL을 사용하여 언어 중립적이고 선언적 방식으로 타입을 정의하는 방식이 있는데, 이러한 시스템은 다양한 언어로 타입을 생성할 수 있기 때문에 상호운용성이 매우 좋다. 또한 타입을 쉽게 확장하고 발전시킬 수 있는 버전화 스키마도 일반적으로 정의되어 있다.
아파치 쓰리프트와 구글 프로토콜 버퍼는 둘다 인기 있는 직렬화 프레임워크며, 영속적인 바이너리 데이터를 위한 포맷으로 널리 활용되고 있다. 맵리듀스 포맷은 제한적으로 지원하고 있지만, RPC와 데이터 교환용으로 하둡 내부에서 일부 활용되고 있다.
에이브로는 하둡에 저장된 대규모 데이터 처리에 적합한 IDL 기반의 직렬화 프레임워크다.
5-4 파일 기반 데이터구조
어떤 애플리케이션에서 데이터를 얻기 위해서는 특별한 데이터 구조가 필요하다. 맵리듀스 기반의 데이터 처리를 위해 바이너리 데이터의 각 블랍을 한 파일에 몽땅 담아두는 것은 확장성에 좋지 않다. 따라서 하둡은 이러한 상황을 위해 다양한 고수준 컨테이너를 개발했다.
'책&스터디' 카테고리의 다른 글
| [하둡 완벽 가이드] - PART2 07장 맵리듀스 작동 방법 (0) | 2026.02.07 |
|---|---|
| [하둡 완벽 가이드] - PART2 06장 맵리듀스 프로그래밍 (1) | 2026.02.06 |
| [하둡 완벽 가이드] - PART1 04장 YARN (0) | 2026.02.03 |
| [하둡 완벽 가이드] - PART1 03장 하둡 분산 파일시스템 HDFS (1) | 2026.02.02 |
| [하둡 완벽 가이드] - PART1 02장 맵리듀스 (0) | 2026.02.01 |