Maven 프로젝트를 통해 간단한 MapReduce 작업을 개발한다.
기본적인 dependency는 hadoop-common과 hadoop-mapreduce-client-core가 필요하며, java compiler의 version은 1.8로 정해둔다.
<!--pom.xml-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.fastcampus.hadoop</groupId>
<artifactId>mapreduce-job</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<hadoop.version>3.3.4</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
</project>
XML
복사
WordCount
MapReduce job을 작성 할 WordCount.java는 다음과 같다.
package com.fastcampus.hadoop;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.StringTokenizer;
//job을 더욱 간편하게 작성할 수 있게 상속받는 Configure, Tool
public class WordCount extends Configured implements Tool {
// 맵퍼 클래스는 static으로 선언
// 제네릭은 순서대로 입력 key, 입력 value, 출력 key, 출력 value
public static class TokenizeMapper extends Mapper<Object, Text, Text, IntWritable> {
private Text word = new Text();
private IntWritable one = new IntWritable(1);
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
StringTokenizer st = new StringTokenizer(value.toString());
while(st.hasMoreTokens()) {
word.set(st.nextToken().toLowerCase());
context.write(word, one);
}
}
}
// 리듀서 클래스는 static으로 선언
// 제네릭은 순서대로 입력 key, 입력 value, 출력 key, 출력 value
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int count = 0;
for(IntWritable val : values) {
count += val.get();
}
result.set(count);
context.write(key, result);
}
}
@Override
public int run(String[] strings) throws Exception {
// job 인스턴스 생성
Job job = Job.getInstance(getConf(), "wordcount");
// job에 필요한 설정
job.setJarByClass(WordCount.class); // entry 클래스
job.setMapperClass(TokenizeMapper.class); // 맵퍼 클래스
job.setCombinerClass(IntSumReducer.class); // 컴바이너 클래스
job.setReducerClass(IntSumReducer.class); // 리듀서 클래스
//job.setMapOutputKeyClass(Text.class); // 맵퍼 출력 키 타입 - 맵퍼와 리듀서의 출력 키 타입이 같다면 설정 하지 않아도 된다.
//job.setMapOutputValueClass(IntWritable.class); // 맵퍼 출력 값 타입 - 맵퍼와 리듀서의 출력 값 타입이 같다면 설정 하지 않아도 된다.
job.setOutputKeyClass(Text.class); // 출력 키 타입
job.setOutputValueClass(IntWritable.class); // 출력 값 타입
//job.setInputFormatClass(TextInputFormat.class); // InputFormat 타입 - 설정하지 않으면 기본값(TextInputFormat)으로 지정된다.
//job.setOutputFormatClass(TextOutputFormat.class); // OutputFormat 타입 - 설정하지 않으면 기본값(TextOutputFormat)으로 지정된다.
// job 실행
FileInputFormat.addInputPath(job, new Path(strings[0])); // 입력 파일 경로
FileOutputFormat.setOutputPath(job, new Path(strings[1])); // 출력 파일 경로
return job.waitForCompletion(true) ? 0 : 1;
}
/**
Hadoop에선 GenericOptionsParser란 도구를 제공한다.
GenericOptionsParser는 Hadoop CLI를 통해 Job을 실행할 때 실행인자를 통해 Hadoop의 Configuration을 설정할 수 있도록 지원하는 도구이다.
Hadoop은 이러한 GenericOptionsParser를 편리하게 사용할 수 있도록 Tool이란 인터페이스를 제공한다.
이 Tool의 run 함수를 구현함으로써 MapReduce job을 생성할 수 있다.
*/
public static void main(String[] args) throws Exception {
// main 함수를 통해 실행할 수 있도록 생성
int exitCode = ToolRunner.run(new WordCount(), args);
System.out.println(exitCode);
}
}
Java
복사
모든 자료형에 Primitive 타입이 아닌 Writable타입을 사용하는 이유는, 네트워크로 객체를 전달받기 위해 직렬화와 역직렬화가 필요하기 때문이다.
다음과 같은 명령어로 WordCount 예제를 실행할 수 있다.
# hadoop jar <archive file path> <entry class> <input file path in HDFS> <output directory in HDFS>
> hadoop jar share/hadoop/mapreduce/mapreduce-job-1.0-SNAPSHOT.jar com.fastcampus.hadoop.WordCount /user/malachai/input/LICENSE.txt /user/malachai/output/wc
2022-11-30 08:19:52,866 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at /0.0.0.0:8032
2022-11-30 08:19:53,278 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1669699513691_0004
2022-11-30 08:19:53,520 INFO input.FileInputFormat: Total input files to process : 1
2022-11-30 08:19:53,582 INFO mapreduce.JobSubmitter: number of splits:1
2022-11-30 08:19:53,734 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1669699513691_0004
2022-11-30 08:19:53,734 INFO mapreduce.JobSubmitter: Executing with tokens: []
2022-11-30 08:19:53,924 INFO conf.Configuration: resource-types.xml not found
2022-11-30 08:19:53,924 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2022-11-30 08:19:53,990 INFO impl.YarnClientImpl: Submitted application application_1669699513691_0004
2022-11-30 08:19:54,052 INFO mapreduce.Job: The url to track the job: http://8aeca48e0e41:8088/proxy/application_1669699513691_0004/
2022-11-30 08:19:54,054 INFO mapreduce.Job: Running job: job_1669699513691_0004
2022-11-30 08:20:00,236 INFO mapreduce.Job: Job job_1669699513691_0004 running in uber mode : false
2022-11-30 08:20:00,238 INFO mapreduce.Job: map 0% reduce 0%
2022-11-30 08:20:05,299 INFO mapreduce.Job: map 100% reduce 0%
...
Bash
복사
결과를 확인해보면 다음과 같다.
> hadoop fs -cat /user/malachai/output/wc/part-r-00000
"[]" 1
"as 2
"contribution" 1
"contributor" 1
"control" 1
"derivative 1
"legal 1
"license" 1
"license"); 1
"licensor" 1
"not 1
"notice" 1
"object" 1
"printed 1
"source" 1
...
Bash
복사
Simple GenericOptionsParser Job
MapReduce의 옵션 파싱 기능을 확인하기 위해 작성 할 GenericOptionsParserApp.java는 다음과 같다.
package com.fastcampus.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.util.Arrays;
public class GenericOptionsParserApp {
public static void main(String[] args) throws Exception {
// 1. 모든 argument 확인
System.out.println("1. "+Arrays.toString(args));
Configuration conf = new Configuration();
GenericOptionsParser optionsParser = new GenericOptionsParser(conf, args);
String val1 = conf.get("mapreduce.map.memory.mb");
Boolean val2 = conf.getBoolean("job.test", false);
// 2. 옵션으로 파싱 된 인자 확인
System.out.println("2. "+"mapreduce.map.memory.mb : "+val1+", "+"job.test : "+val2);
String[] remainingArgs = optionsParser.getRemainingArgs();
// 3. 옵션 파싱 후 나머지 argument 확인
System.out.println("3. "+Arrays.toString(remainingArgs));
}
}
Java
복사
다음과 같은 명령어로 GenericOptionsParser 예제를 실행할 수 있다.
> hadoop jar share/hadoop/mapreduce/mapreduce-job-1.0-SNAPSHOT.jar com.fastcampus.hadoop.GenericOptionsParserApp -Dmapreduce.map.memory.mb=4g -Djob.test=true other1 other2
1. [-Dmapreduce.map.memory.mb=4g, -Djob.test=true, other1, other2]
2. mapreduce.map.memory.mb : 4g, job.test : true
3. [other1, other2]
Bash
복사
Simple ToolRunner Job
MapReduce의 옵션 파싱 기능을 확인하기 위해 작성 할 ToolRunnerApp.java는 다음과 같다.
package com.fastcampus.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.util.Arrays;
public class ToolRunnerApp extends Configured implements Tool {
@Override
public int run(String[] strings) throws Exception {
Configuration conf = getConf();
String val1 = conf.get("mapreduce.map.memory.mb");
Boolean val2 = conf.getBoolean("job.test", false);
// 2. 옵션으로 파싱 된 인자 확인
System.out.println("2. "+"mapreduce.map.memory.mb : "+val1+", "+"job.test : "+val2);
// 3. 옵션 파싱 후 나머지 argument 확인
System.out.println("3. "+Arrays.toString(strings));
return 0;
}
public static void main(String[] args) throws Exception {
// 1. 모든 argument 확인
System.out.println("1. "+Arrays.toString(args));
int exitCode = ToolRunner.run(new ToolRunnerApp(), args);
System.out.println(exitCode);
}
}
Java
복사
다음과 같은 명령어로 ToolRunner 예제를 실행할 수 있다.
> hadoop jar share/hadoop/mapreduce/mapreduce-job-1.0-SNAPSHOT.jar com.fastcampus.hadoop.ToolRunnerApp -Dmapreduce.map.memory.mb=4g -Djob.test=true other1 other2
1. [-Dmapreduce.map.memory.mb=4g, -Djob.test=true, other1, other2]
2. mapreduce.map.memory.mb : 4g, job.test : true
3. [other1, other2]
0
Bash
복사
Counter
MapReduce의 Counter기능을 확인하기 위해 작성 할 WordCountWithCounter.java는 다음과 같다. WordCount 로직은 앞선 WordCount.java 예제를 사용하였다.
package com.fastcampus.hadoop;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class WordCountWithCounter extends Configured implements Tool {
static enum Word {
WITHOUT_SPECIAL_CHARACTER,
WITH_SPECIAL_CHARACTER
}
public static class TokenizeMapper extends Mapper<Object, Text, Text, IntWritable> {
private Text word = new Text();
private IntWritable one = new IntWritable(1);
// Pattern matching 을 통해 특수문자를 포함하지 않는 단어의 개수를 counter로 집계
private Pattern pattern = Pattern.compile("[^a-z0-9]", Pattern.CASE_INSENSITIVE);
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
StringTokenizer st = new StringTokenizer(value.toString());
while(st.hasMoreTokens()) {
String str = st.nextToken().toLowerCase();
// Matcher를 통한 pattern matching
Matcher matcher = pattern.matcher(str);
// Counter는 Enum 또는 String을 통해 불러온다.
// 특수문자가 포함된 단어의 Counter
if (matcher.find()) context.getCounter(Word.WITH_SPECIAL_CHARACTER).increment(1);
// 특수문자가 포함되지 않은 단어의 Counter
else context.getCounter(Word.WITHOUT_SPECIAL_CHARACTER).increment(1);
word.set(str);
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int count = 0;
for(IntWritable val : values) {
count += val.get();
}
result.set(count);
context.write(key, result);
}
}
@Override
public int run(String[] strings) throws Exception {
// job 인스턴스 생성
Job job = Job.getInstance(getConf(), "wordcount");
// job에 필요한 설정
job.setJarByClass(WordCountWithCounter.class); // entry 클래스
job.setMapperClass(TokenizeMapper.class); // 맵퍼 클래스
job.setCombinerClass(IntSumReducer.class); // 컴바이너 클래스
job.setReducerClass(IntSumReducer.class); // 리듀서 클래스
job.setOutputKeyClass(Text.class); // 출력 키 타입
job.setOutputValueClass(IntWritable.class); // 출력 값 타입
FileInputFormat.addInputPath(job, new Path(strings[0])); // 입력 파일 경로
FileOutputFormat.setOutputPath(job, new Path(strings[1])); // 출력 파일 경로
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new WordCountWithCounter(), args);
System.out.println(exitCode);
}
}
Java
복사
다음과 같은 명령어로 WordCountWithCounter 예제를 실행할 수 있다.
> hadoop jar share/hadoop/mapreduce/mapreduce-job-1.0-SNAPSHOT.jar com.fastcampus.hadoop.WordCountWithCounter /user/malachai/input/LICENSE.txt /user/malachai/output/wc
...
com.fastcampus.hadoop.WordCountWithCounter$Word
WITHOUT_SPECIAL_CHARACTER=1350
WITH_SPECIAL_CHARACTER=322
...
Bash
복사
Sort
MapReduce의 정렬기능을 확인하기 위해 작성 할 SortWordCount.java는 다음과 같다. 이 예제는 WordCount 작업을 통해 출력된tab separated key-value 데이터를 단어의 개수 기준으로 오름차순 정렬한다.
package com.fastcampus.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
public class SortWordCount extends Configured implements Tool {
// 정렬에 사용될 mapper를 정의한다.
public static class SortMapper extends Mapper<Text, Text, LongWritable, Text> {
@Override
protected void map(Text key, Text value, Mapper<Text, Text, LongWritable, Text>.Context context) throws IOException, InterruptedException {
// MapReduce는 key를 기준으로 정렬하기 때문에, 단어 개수를 key로 반환하도록 한다.
context.write(new LongWritable(Long.parseLong(value.toString())), key);
}
}
@Override
public int run(String[] strings) throws Exception {
// WordCount의 출력은 tab separated key-value 형태이다.
// separator는 configuration을 통해 설정해준다.
Configuration conf = getConf();
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
Job job = Job.getInstance(conf, "sort-wordcount");
job.setJarByClass(SortWordCount.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapperClass(SortMapper.class);
// Reducer 클래스는 따로 정의하지 않고 default Reducer 를 사용한다.
// 대신, 하나의 Reducer 에 데이터를 전달함으로써 하나의 파일에 전체정렬이 일어나도록 한다.
// 만약 여러개의 Reducer 를 사용하게 된다면, 여러개의 파일에 나뉘어 각각 정렬되는 부분정령이 일어나게 된다.
job.setNumReduceTasks(1);
FileInputFormat.addInputPath(job, new Path(strings[0]));
FileOutputFormat.setOutputPath(job, new Path(strings[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new SortWordCount(), args);
System.out.println(exitCode);
}
}
Java
복사
다음과 같은 명령어로 SortWordCount 예제를 실행할 수 있다.
> hadoop jar share/hadoop/mapreduce/mapreduce-job-1.0-SNAPSHOT.jar com.fastcampus.hadoop.SortWordCount /user/malachai/output/wc/part-r-00000 /user/malachai/output/sortwc
> hadoop fs -cat /user/malachai/output/sortwc/part-r-00000
...
25 work
25 you
30 any
40 to
45 and
64 or
65 of
99 the
Bash
복사
Distrubuted Cache
Distributed Cache 기능을 적극적으로 사용할 수 있는 작업인 join 작업을 구현한다. join 작업은 Mapper에서 연산을 진행하는 Map-side Join, Reducer에서 연산을 진행하는 Reducer-side Join의 두가지로 나뉜다.
Map-side Join
Map-side Join 작업을 위해 작성할 DistCacheMapSideJoin.java는 다음과 같다. 이 예제는 HDFS의 dataset에 존재하는 departments, employees 데이터를 department_no 기준으로 join한다.
package com.fastcampus.hadoop;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
public class DistCacheMapSideJoin extends Configured implements Tool {
public static class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
Text outKey = new Text();
Text outValue = new Text();
// Mapper 내에선 캐싱되어있는 파일을 불러오는 코드가 필요하다.
// 캐싱된 파일을 메모리에 담아두기 위해 객체를 생성한다.
Map<String, String> departmentsMap = new HashMap<>();
// setup 함수의 오버라이딩을 통해 캐싱된 파일을 불러올 수 있다.
@Override
protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
URI[] uris = context.getCacheFiles();
for (URI uri : uris) {
Path path = new Path(uri.getPath());
// 파일명을 사용하여 정의한 메모리 로드함수를 호출한다.
loadDepartmentMap(path.getName());
}
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
// employees 데이터는 TextInputFormat 의 형태로 입력받는 다.
/* employees 데이터의 형태는 다음과 같다.
(emp_no, birth_date, first_name, last_name, sex, hire_date, dept_no)
10012,1960-10-04,Patricio,Bridgland,M,1992-12-18,d005
10013,1963-06-07,Eberhardt,Terkki,M,1985-10-20,d003
10014,1956-02-12,Berni,Genin,M,1987-03-11,d005
10015,1959-08-19,Guoxiang,Nooteboom,M,1987-07-02,d008
*/
String[] split = value.toString().split(",");
outKey.set(split[0]);
String deptName = departmentsMap.get(split[6]);
deptName = deptName==null ? "Not Found" : deptName;
outValue.set(split[2]+"\t"+split[4]+"\t"+deptName);
context.write(outKey, outValue);
}
// departmentMap 객체에 데이터를 담아줄 함수를 정의한다.
/* department 데이터의 형태는 다음과 같다.
(dept_no, dept_name)
d009,Customer Service
d005,Development
d002,Finance
d003,Human Resources
*/
private void loadDepartmentMap(String fileName) throws IOException {
String line = "";
try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
while ((line = br.readLine()) != null) {
String[] split = line.split(",");
departmentsMap.put(split[0], split[1]);
}
}
}
}
@Override
public int run(String[] strings) throws Exception {
Job job = Job.getInstance(getConf(), "DistributedCacheMapSideJoin");
// 분산캐시를 사용하기 위해 HDFS에 업로드 되어있는 데이터셋을 로드한다.
job.addCacheFile(new URI("/user/malachai/dataset/departments"));
job.setJarByClass(DistCacheMapSideJoin.class);
job.setMapperClass(MapSideJoinMapper.class);
// Reducer 를 사용하지 않음으로써 Mapper 작업만 수행하도록 한다.
job.setNumReduceTasks(0);
FileInputFormat.addInputPath(job, new Path(strings[0]));
FileOutputFormat.setOutputPath(job, new Path(strings[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new DistCacheMapSideJoin(), args);
System.out.println(exitCode);
}
}
Java
복사
다음과 같은 명령어로 DistCacheMapSideJoin 예제를 실행할 수 있다. API단에서 departments 파일을 읽어들이고 있으니 GenericOptionsParser의 -files 옵션으로 따로 설정할 필요는 없다.
> hadoop jar share/hadoop/mapreduce/mapreduce-job-1.0-SNAPSHOT.jar com.fastcampus.hadoop.DistCacheMapSideJoin /user/malachai/dataset/employees /user/malachai/output/mapjoin
> hadoop fs -cat /user/malachai/output/mapjoin/part-m-00000
...
499995 Dekang F Production
499996 Zito M Production
499997 Berhard M Development
499998 Patricia M Finance
499999 Sachin M Production
Bash
복사
앞선 예제들과 출력 파일명이 다른 모습을 볼 수 있다. 여기서 map 작업의 결과물을 m, reduce 작업의 결과물을 r로 명명함을 알 수 있다.
Reduce-side Join
Reduce-side Join은 join 연산의 좀 더 일반적인 경우이다. 동일한 key를 가진 레코드를 같은 Reducer로 모이도록 하는 Shuffling 과정의 특징을 이용하여 join key를 Mapper의 출력 key로 지정하는 방식이다.
Reduce-side Join 작업을 위해 작성할 DistCacheReduceSideJoin.java는 다음과 같다. 이 예제는 HDFS의 dataset에 존재하는 departments, employees 데이터를 department_no 기준으로 join한다.
package com.fastcampus.hadoop;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class DistCacheReduceSideJoin extends Configured implements Tool {
public static class EmployeeMapper extends Mapper<LongWritable, Text, Text, Text> {
Text outKey = new Text();
Text outValue = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
String[] split = value.toString().split(",");
// mapper 의 출력 key 를 dept_no 로 설정한다.
outKey.set(split[6]);
// 어느 mapper 에서 출력한 레코드인지 알 수 있도록 prefix 를 추가한다.
outValue.set("e"+"\t"+split[0]+"\t"+split[2]+"\t"+split[4]+"\t");
context.write(outKey, outValue);
}
}
public static class DepartmentMapper extends Mapper<LongWritable, Text, Text, Text> {
Text outKey = new Text();
Text outValue = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
String[] split = value.toString().split(",");
// mapper 의 출력 key 를 dept_no 로 설정한다.
outKey.set(split[0]);
// 어느 mapper 에서 출력한 레코드인지 알 수 있도록 prefix 를 추가한다.
outValue.set("d"+"\t"+split[1]);
context.write(outKey, outValue);
}
}
public static class ReduceSideJoinReducer extends Reducer<Text, Text, Text, Text> {
Text outKey = new Text();
Text outValue = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
Map<String, String> employeeMap = new HashMap<>();
String deptName = "Not Found";
for (Text t : values) {
String[] split = t.toString().split("\t");
// EmployeeMapper 에서 입력받은 데이터
if(split[0].equals("e")) {
employeeMap.put(split[1], split[2]+"\t"+split[3]);
}
// DepartmentMapper 에서 입력받은 데이터
else if (split[0].equals("d")) {
deptName = split[1];
}
}
// employeeMap 을 순회하며 deptName 을 join 한다.
for (Map.Entry<String, String> e : employeeMap.entrySet()) {
outKey.set(e.getKey());
outValue.set(e.getValue()+"\t"+deptName);
context.write(outKey, outValue);
}
}
}
@Override
public int run(String[] strings) throws Exception {
Job job = Job.getInstance(getConf(), "DistributedCacheReduceSideJoin");
job.setJarByClass(DistCacheReduceSideJoin.class);
job.setReducerClass(ReduceSideJoinReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// Mapper나 InputFormat이 서로 다른 하나 이상의 파일 경로로 입력을 받기 위해 MultipleInputs api를 사용한다.
MultipleInputs.addInputPath(job, new Path(strings[0]), TextInputFormat.class, EmployeeMapper.class);
MultipleInputs.addInputPath(job, new Path(strings[1]), TextInputFormat.class, DepartmentMapper.class);
FileOutputFormat.setOutputPath(job, new Path(strings[2]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new DistCacheReduceSideJoin(), args);
System.out.println(exitCode);
}
}
Java
복사
다음과 같은 명령어로 DistCacheReduceSideJoin 예제를 실행할 수 있다.
> hadoop jar share/hadoop/mapreduce/mapreduce-job-1.0-SNAPSHOT.jar com.fastcampus.hadoop.DistCacheReduceSideJoin /user/malachai/dataset/employees /user/malachai/dataset/departments /user/malachai/output/reducejoin
> hadoop fs -cat /user/malachai/output/reducejoin/part-r-00000
...
51876 Mamdouh M Customer Service
458661 Stepehn M Customer Service
230803 Dannz M Customer Service
216515 Zongyan M Customer Service
433386 Ayakannu M Customer Service
Bash
복사
Reduce-side Join
Reduce-side Join에서 Secondary Sort의 경우는 조금 더 복잡하다.
•
TextTupleWritable - 정렬하고자 하는 값을 포함하는 복합 키 클래스
•
KeyPartitioner - 정의한 복합키를 통해 어느 Reducer에게 전달될 지 결정하는 Partitioner 클래스
•
GroupComparator - Reduce에 입력될 값을 그룹핑 해주는 클래스 정의
•
KeyComparator - Reducer의 입력을 key를 기준으로 정렬해주기 위한 정렬 클래스
Reduce-side Join 후 사용자 정의 키를 기준으로 Secondary Sort를 수행하는 작업을 위해 작성할 코드는 다음과 같다. 이 예제는 HDFS의 dataset에 존재하는 departments, employees 데이터를 department_no 기준으로 join한 이후 department_no를 기준으로 정렬한다.
// TextTupleWritable.java
package com.fastcampus.hadoop.key;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
public class TextTupleWritable implements WritableComparable<TextTupleWritable> {
private Text key;
private Text value;
public TextTupleWritable() {
this.key = new Text();
this.value = new Text();
}
public void set(Text key, Text value) {
this.key = key;
this.value = value;
}
public Text getKey() {
return key;
}
public Text getValue() {
return value;
}
@Override
public int compareTo(TextTupleWritable o) {
int cmp = key.compareTo(o.key);
if (cmp!=0) return cmp;
return value.compareTo(o.value);
}
@Override
public void write(DataOutput dataOutput) throws IOException {
key.write(dataOutput);
value.write(dataOutput);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
key.readFields(dataInput);
value.readFields(dataInput);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TextTupleWritable that = (TextTupleWritable) o;
return Objects.equals(key, that.key) && Objects.equals(value, that.value);
}
@Override
public int hashCode() {
return Objects.hash(key, value);
}
@Override
public String toString() {
return "EntryWritable{" +
"key=" + key +
", value=" + value +
'}';
}
}
Java
복사
// DistCacheReduceSideJoinCustomKey.java
package com.fastcampus.hadoop;
import com.fastcampus.hadoop.key.TextTupleWritable;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.Iterator;
public class DistCacheReduceSideJoinCustomKey extends Configured implements Tool {
enum DataType {
DEPARTMENT("ENUM_DATATYPE_DEPARTMENT"),
EMPLOYEE("ENUM_DATATYPE_EMPLOYEE");
private final String value;
DataType(String value){
this.value = value;
}
public String value() {
return value;
}
}
// Paritioner 클래스
// Key 를 기준으로 전달할 Reducer 를 결정한다.
public static class KeyPartitioner extends Partitioner<TextTupleWritable, Text> {
// 파라미터는 순서대로 키, 값, 파티션 개수이다.
// 반환값은 Partition의 번호이다.
// EntryWritable의 key값이 같은 경우엔 같은 partition 번호를 반환한다.
// Bitwise 연산을 한 이유는, partition 번호는 항상 양수여야 하기 떄문이다.
@Override
public int getPartition(TextTupleWritable key, Text value, int i) {
return (key.getKey().hashCode() & Integer.MAX_VALUE) % i;
}
}
// Grouping 클래스
// Reducer 에 전달 된 값들을 집계한다.
// Modular 연산으로 인해 같은 Partitioner 에 다른 해시 값을 가진 key 가 들어올 수 있으므로, 실제 값과 비교하여 집계하는 클래스가 필요하다.
// Key 가 같은 레코드들은 같은 reduce 함수에 입력된다.
public static class GroupComparator extends WritableComparator {
protected GroupComparator() {
super(TextTupleWritable.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
TextTupleWritable t1 = (TextTupleWritable) a;
TextTupleWritable t2 = (TextTupleWritable) b;
return t1.getKey().compareTo(t2.getKey());
}
}
// Key 정렬 클래스
// 앞서 Grouping 된 레코드들을 정렬 해준다.
public static class KeyComparator extends WritableComparator {
protected KeyComparator() {
super(TextTupleWritable.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
TextTupleWritable t1 = (TextTupleWritable) a;
TextTupleWritable t2 = (TextTupleWritable) b;
int cmp = t1.getKey().compareTo(t2.getKey());
if (cmp!=0) return cmp;
return t1.getValue().compareTo(t2.getValue());
}
}
// Employee Mapper 클래스
public static class EmployeeMapper extends Mapper<LongWritable, Text, TextTupleWritable, Text> {
TextTupleWritable outKey = new TextTupleWritable();
Text outValue = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, TextTupleWritable, Text>.Context context) throws IOException, InterruptedException {
String[] split = value.toString().split(",");
outKey.set(new Text(split[6]), new Text(DataType.EMPLOYEE.value()));
outValue.set(split[0]+"\t"+split[2]+"\t"+split[4]+"\t");
context.write(outKey, outValue);
}
}
// Department Mapper 클래스
public static class DepartmentMapper extends Mapper<LongWritable, Text, TextTupleWritable, Text> {
TextTupleWritable outKey = new TextTupleWritable();
Text outValue = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, TextTupleWritable, Text>.Context context) throws IOException, InterruptedException {
String[] split = value.toString().split(",");
outKey.set(new Text(split[0]), new Text(DataType.DEPARTMENT.value()));
outValue.set(split[1]);
context.write(outKey, outValue);
}
}
public static class ReduceSideJoinReducer extends Reducer<TextTupleWritable, Text, Text, Text> {
Text outKey = new Text();
Text outValue = new Text();
@Override
protected void reduce(TextTupleWritable key, Iterable<Text> values, Reducer<TextTupleWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
Iterator<Text> iter = values.iterator();
// values는 Enum의 value 값을 기준으로 정렬되어 입력되기 때문에 첫 레코드의 값은 무조건 ENUM_DATATYPE_DEPARTMENT의 값이다.
String departmentText = iter.next().toString();
// 이후의 값은 모두 ENUM_DATATYPE_EMPLOYEE의 값이다.
while(iter.hasNext()) {
Text employeeText = iter.next();
String[] employeeSplit = employeeText.toString().split("\t");
outKey.set(employeeSplit[0]);
outValue.set(employeeSplit[1]+"\t"+employeeSplit[1]+"\t"+departmentText);
context.write(outKey, outValue);
}
}
}
@Override
public int run(String[] strings) throws Exception {
Job job = Job.getInstance(getConf(), "ReduceSideJoinCustomKey");
job.setJarByClass(DistCacheReduceSideJoinCustomKey.class);
job.setReducerClass(ReduceSideJoinReducer.class);
job.setMapOutputKeyClass(TextTupleWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setPartitionerClass(KeyPartitioner.class); // Partitioner 클래스
job.setSortComparatorClass(KeyComparator.class); // 2차 정렬 클래스
job.setGroupingComparatorClass(GroupComparator.class); // Grouping 클래스
MultipleInputs.addInputPath(job, new Path(strings[0]), TextInputFormat.class, EmployeeMapper.class);
MultipleInputs.addInputPath(job, new Path(strings[1]), TextInputFormat.class, DepartmentMapper.class);
FileOutputFormat.setOutputPath(job, new Path(strings[2]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new DistCacheReduceSideJoinCustomKey(), args);
System.out.println(exitCode);
}
}
Java
복사
다음과 같은 명령어로 DistCacheReduceSideJoin 예제를 실행할 수 있다.
> hadoop jar share/hadoop/mapreduce/mapreduce-job-1.0-SNAPSHOT.jar com.fastcampus.hadoop.DistCacheReduceSideJoinCustomKey /user/malachai/dataset/employees /user/malachai/dataset/departments /user/malachai/output/reducesortjoin
> hadoop fs -head /user/malachai/output/reducesortjoin/part-r-00000
225998 Nakhoon Nakhoon Marketing
62864 Guozhong Guozhong Marketing
220036 Chriss Chriss Marketing
494457 Ashish Ashish Marketing
294165 Michaela Michaela Marketing
...
Bash
복사