Search

MapReduce Example Jobs

Maven 프로젝트를 통해 간단한 MapReduce 작업을 개발한다.
기본적인 dependency는 hadoop-commonhadoop-mapreduce-client-core가 필요하며, java compiler의 version은 1.8로 정해둔다.
<!--pom.xml--> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.fastcampus.hadoop</groupId> <artifactId>mapreduce-job</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.source>1.8</maven.compiler.source> <hadoop.version>3.3.4</hadoop.version> </properties> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> </dependency> </dependencies> </project>
XML
복사

WordCount

MapReduce job을 작성 할 WordCount.java는 다음과 같다.
package com.fastcampus.hadoop; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; import java.util.StringTokenizer; //job을 더욱 간편하게 작성할 수 있게 상속받는 Configure, Tool public class WordCount extends Configured implements Tool { // 맵퍼 클래스는 static으로 선언 // 제네릭은 순서대로 입력 key, 입력 value, 출력 key, 출력 value public static class TokenizeMapper extends Mapper<Object, Text, Text, IntWritable> { private Text word = new Text(); private IntWritable one = new IntWritable(1); @Override protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { StringTokenizer st = new StringTokenizer(value.toString()); while(st.hasMoreTokens()) { word.set(st.nextToken().toLowerCase()); context.write(word, one); } } } // 리듀서 클래스는 static으로 선언 // 제네릭은 순서대로 입력 key, 입력 value, 출력 key, 출력 value public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int count = 0; for(IntWritable val : values) { count += val.get(); } result.set(count); context.write(key, result); } } @Override public int run(String[] strings) throws Exception { // job 인스턴스 생성 Job job = Job.getInstance(getConf(), "wordcount"); // job에 필요한 설정 job.setJarByClass(WordCount.class); // entry 클래스 job.setMapperClass(TokenizeMapper.class); // 맵퍼 클래스 job.setCombinerClass(IntSumReducer.class); // 컴바이너 클래스 job.setReducerClass(IntSumReducer.class); // 리듀서 클래스 //job.setMapOutputKeyClass(Text.class); // 맵퍼 출력 키 타입 - 맵퍼와 리듀서의 출력 키 타입이 같다면 설정 하지 않아도 된다. //job.setMapOutputValueClass(IntWritable.class); // 맵퍼 출력 값 타입 - 맵퍼와 리듀서의 출력 값 타입이 같다면 설정 하지 않아도 된다. job.setOutputKeyClass(Text.class); // 출력 키 타입 job.setOutputValueClass(IntWritable.class); // 출력 값 타입 //job.setInputFormatClass(TextInputFormat.class); // InputFormat 타입 - 설정하지 않으면 기본값(TextInputFormat)으로 지정된다. //job.setOutputFormatClass(TextOutputFormat.class); // OutputFormat 타입 - 설정하지 않으면 기본값(TextOutputFormat)으로 지정된다. // job 실행 FileInputFormat.addInputPath(job, new Path(strings[0])); // 입력 파일 경로 FileOutputFormat.setOutputPath(job, new Path(strings[1])); // 출력 파일 경로 return job.waitForCompletion(true) ? 0 : 1; } /** Hadoop에선 GenericOptionsParser란 도구를 제공한다. GenericOptionsParser는 Hadoop CLI를 통해 Job을 실행할 때 실행인자를 통해 Hadoop의 Configuration을 설정할 수 있도록 지원하는 도구이다. Hadoop은 이러한 GenericOptionsParser를 편리하게 사용할 수 있도록 Tool이란 인터페이스를 제공한다. 이 Tool의 run 함수를 구현함으로써 MapReduce job을 생성할 수 있다. */ public static void main(String[] args) throws Exception { // main 함수를 통해 실행할 수 있도록 생성 int exitCode = ToolRunner.run(new WordCount(), args); System.out.println(exitCode); } }
Java
복사
모든 자료형에 Primitive 타입이 아닌 Writable타입을 사용하는 이유는, 네트워크로 객체를 전달받기 위해 직렬화와 역직렬화가 필요하기 때문이다.
다음과 같은 명령어로 WordCount 예제를 실행할 수 있다.
# hadoop jar <archive file path> <entry class> <input file path in HDFS> <output directory in HDFS> > hadoop jar share/hadoop/mapreduce/mapreduce-job-1.0-SNAPSHOT.jar com.fastcampus.hadoop.WordCount /user/malachai/input/LICENSE.txt /user/malachai/output/wc 2022-11-30 08:19:52,866 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at /0.0.0.0:8032 2022-11-30 08:19:53,278 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1669699513691_0004 2022-11-30 08:19:53,520 INFO input.FileInputFormat: Total input files to process : 1 2022-11-30 08:19:53,582 INFO mapreduce.JobSubmitter: number of splits:1 2022-11-30 08:19:53,734 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1669699513691_0004 2022-11-30 08:19:53,734 INFO mapreduce.JobSubmitter: Executing with tokens: [] 2022-11-30 08:19:53,924 INFO conf.Configuration: resource-types.xml not found 2022-11-30 08:19:53,924 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'. 2022-11-30 08:19:53,990 INFO impl.YarnClientImpl: Submitted application application_1669699513691_0004 2022-11-30 08:19:54,052 INFO mapreduce.Job: The url to track the job: http://8aeca48e0e41:8088/proxy/application_1669699513691_0004/ 2022-11-30 08:19:54,054 INFO mapreduce.Job: Running job: job_1669699513691_0004 2022-11-30 08:20:00,236 INFO mapreduce.Job: Job job_1669699513691_0004 running in uber mode : false 2022-11-30 08:20:00,238 INFO mapreduce.Job: map 0% reduce 0% 2022-11-30 08:20:05,299 INFO mapreduce.Job: map 100% reduce 0% ...
Bash
복사
결과를 확인해보면 다음과 같다.
> hadoop fs -cat /user/malachai/output/wc/part-r-00000 "[]" 1 "as 2 "contribution" 1 "contributor" 1 "control" 1 "derivative 1 "legal 1 "license" 1 "license"); 1 "licensor" 1 "not 1 "notice" 1 "object" 1 "printed 1 "source" 1 ...
Bash
복사

Simple GenericOptionsParser Job

MapReduce의 옵션 파싱 기능을 확인하기 위해 작성 할 GenericOptionsParserApp.java는 다음과 같다.
package com.fastcampus.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.util.Arrays; public class GenericOptionsParserApp { public static void main(String[] args) throws Exception { // 1. 모든 argument 확인 System.out.println("1. "+Arrays.toString(args)); Configuration conf = new Configuration(); GenericOptionsParser optionsParser = new GenericOptionsParser(conf, args); String val1 = conf.get("mapreduce.map.memory.mb"); Boolean val2 = conf.getBoolean("job.test", false); // 2. 옵션으로 파싱 된 인자 확인 System.out.println("2. "+"mapreduce.map.memory.mb : "+val1+", "+"job.test : "+val2); String[] remainingArgs = optionsParser.getRemainingArgs(); // 3. 옵션 파싱 후 나머지 argument 확인 System.out.println("3. "+Arrays.toString(remainingArgs)); } }
Java
복사
다음과 같은 명령어로 GenericOptionsParser 예제를 실행할 수 있다.
> hadoop jar share/hadoop/mapreduce/mapreduce-job-1.0-SNAPSHOT.jar com.fastcampus.hadoop.GenericOptionsParserApp -Dmapreduce.map.memory.mb=4g -Djob.test=true other1 other2 1. [-Dmapreduce.map.memory.mb=4g, -Djob.test=true, other1, other2] 2. mapreduce.map.memory.mb : 4g, job.test : true 3. [other1, other2]
Bash
복사

Simple ToolRunner Job

MapReduce의 옵션 파싱 기능을 확인하기 위해 작성 할 ToolRunnerApp.java는 다음과 같다.
package com.fastcampus.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.util.Arrays; public class ToolRunnerApp extends Configured implements Tool { @Override public int run(String[] strings) throws Exception { Configuration conf = getConf(); String val1 = conf.get("mapreduce.map.memory.mb"); Boolean val2 = conf.getBoolean("job.test", false); // 2. 옵션으로 파싱 된 인자 확인 System.out.println("2. "+"mapreduce.map.memory.mb : "+val1+", "+"job.test : "+val2); // 3. 옵션 파싱 후 나머지 argument 확인 System.out.println("3. "+Arrays.toString(strings)); return 0; } public static void main(String[] args) throws Exception { // 1. 모든 argument 확인 System.out.println("1. "+Arrays.toString(args)); int exitCode = ToolRunner.run(new ToolRunnerApp(), args); System.out.println(exitCode); } }
Java
복사
다음과 같은 명령어로 ToolRunner 예제를 실행할 수 있다.
> hadoop jar share/hadoop/mapreduce/mapreduce-job-1.0-SNAPSHOT.jar com.fastcampus.hadoop.ToolRunnerApp -Dmapreduce.map.memory.mb=4g -Djob.test=true other1 other2 1. [-Dmapreduce.map.memory.mb=4g, -Djob.test=true, other1, other2] 2. mapreduce.map.memory.mb : 4g, job.test : true 3. [other1, other2] 0
Bash
복사

Counter

MapReduce의 Counter기능을 확인하기 위해 작성 할 WordCountWithCounter.java는 다음과 같다. WordCount 로직은 앞선 WordCount.java 예제를 사용하였다.
package com.fastcampus.hadoop; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; import java.util.StringTokenizer; import java.util.regex.Matcher; import java.util.regex.Pattern; public class WordCountWithCounter extends Configured implements Tool { static enum Word { WITHOUT_SPECIAL_CHARACTER, WITH_SPECIAL_CHARACTER } public static class TokenizeMapper extends Mapper<Object, Text, Text, IntWritable> { private Text word = new Text(); private IntWritable one = new IntWritable(1); // Pattern matching 을 통해 특수문자를 포함하지 않는 단어의 개수를 counter로 집계 private Pattern pattern = Pattern.compile("[^a-z0-9]", Pattern.CASE_INSENSITIVE); @Override protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { StringTokenizer st = new StringTokenizer(value.toString()); while(st.hasMoreTokens()) { String str = st.nextToken().toLowerCase(); // Matcher를 통한 pattern matching Matcher matcher = pattern.matcher(str); // Counter는 Enum 또는 String을 통해 불러온다. // 특수문자가 포함된 단어의 Counter if (matcher.find()) context.getCounter(Word.WITH_SPECIAL_CHARACTER).increment(1); // 특수문자가 포함되지 않은 단어의 Counter else context.getCounter(Word.WITHOUT_SPECIAL_CHARACTER).increment(1); word.set(str); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int count = 0; for(IntWritable val : values) { count += val.get(); } result.set(count); context.write(key, result); } } @Override public int run(String[] strings) throws Exception { // job 인스턴스 생성 Job job = Job.getInstance(getConf(), "wordcount"); // job에 필요한 설정 job.setJarByClass(WordCountWithCounter.class); // entry 클래스 job.setMapperClass(TokenizeMapper.class); // 맵퍼 클래스 job.setCombinerClass(IntSumReducer.class); // 컴바이너 클래스 job.setReducerClass(IntSumReducer.class); // 리듀서 클래스 job.setOutputKeyClass(Text.class); // 출력 키 타입 job.setOutputValueClass(IntWritable.class); // 출력 값 타입 FileInputFormat.addInputPath(job, new Path(strings[0])); // 입력 파일 경로 FileOutputFormat.setOutputPath(job, new Path(strings[1])); // 출력 파일 경로 return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new WordCountWithCounter(), args); System.out.println(exitCode); } }
Java
복사
다음과 같은 명령어로 WordCountWithCounter 예제를 실행할 수 있다.
> hadoop jar share/hadoop/mapreduce/mapreduce-job-1.0-SNAPSHOT.jar com.fastcampus.hadoop.WordCountWithCounter /user/malachai/input/LICENSE.txt /user/malachai/output/wc ... com.fastcampus.hadoop.WordCountWithCounter$Word WITHOUT_SPECIAL_CHARACTER=1350 WITH_SPECIAL_CHARACTER=322 ...
Bash
복사

Sort

MapReduce의 정렬기능을 확인하기 위해 작성 할 SortWordCount.java는 다음과 같다. 이 예제는 WordCount 작업을 통해 출력된tab separated key-value 데이터를 단어의 개수 기준으로 오름차순 정렬한다.
package com.fastcampus.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; public class SortWordCount extends Configured implements Tool { // 정렬에 사용될 mapper를 정의한다. public static class SortMapper extends Mapper<Text, Text, LongWritable, Text> { @Override protected void map(Text key, Text value, Mapper<Text, Text, LongWritable, Text>.Context context) throws IOException, InterruptedException { // MapReduce는 key를 기준으로 정렬하기 때문에, 단어 개수를 key로 반환하도록 한다. context.write(new LongWritable(Long.parseLong(value.toString())), key); } } @Override public int run(String[] strings) throws Exception { // WordCount의 출력은 tab separated key-value 형태이다. // separator는 configuration을 통해 설정해준다. Configuration conf = getConf(); conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t"); Job job = Job.getInstance(conf, "sort-wordcount"); job.setJarByClass(SortWordCount.class); job.setInputFormatClass(KeyValueTextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setMapperClass(SortMapper.class); // Reducer 클래스는 따로 정의하지 않고 default Reducer 를 사용한다. // 대신, 하나의 Reducer 에 데이터를 전달함으로써 하나의 파일에 전체정렬이 일어나도록 한다. // 만약 여러개의 Reducer 를 사용하게 된다면, 여러개의 파일에 나뉘어 각각 정렬되는 부분정령이 일어나게 된다. job.setNumReduceTasks(1); FileInputFormat.addInputPath(job, new Path(strings[0])); FileOutputFormat.setOutputPath(job, new Path(strings[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new SortWordCount(), args); System.out.println(exitCode); } }
Java
복사
다음과 같은 명령어로 SortWordCount 예제를 실행할 수 있다.
> hadoop jar share/hadoop/mapreduce/mapreduce-job-1.0-SNAPSHOT.jar com.fastcampus.hadoop.SortWordCount /user/malachai/output/wc/part-r-00000 /user/malachai/output/sortwc > hadoop fs -cat /user/malachai/output/sortwc/part-r-00000 ... 25 work 25 you 30 any 40 to 45 and 64 or 65 of 99 the
Bash
복사

Distrubuted Cache

Distributed Cache 기능을 적극적으로 사용할 수 있는 작업인 join 작업을 구현한다. join 작업은 Mapper에서 연산을 진행하는 Map-side Join, Reducer에서 연산을 진행하는 Reducer-side Join의 두가지로 나뉜다.

Map-side Join

Map-side Join 작업을 위해 작성할 DistCacheMapSideJoin.java는 다음과 같다. 이 예제는 HDFS의 dataset에 존재하는 departments, employees 데이터를 department_no 기준으로 join한다.
package com.fastcampus.hadoop; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.util.Arrays; import java.util.HashMap; import java.util.Map; public class DistCacheMapSideJoin extends Configured implements Tool { public static class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, Text> { Text outKey = new Text(); Text outValue = new Text(); // Mapper 내에선 캐싱되어있는 파일을 불러오는 코드가 필요하다. // 캐싱된 파일을 메모리에 담아두기 위해 객체를 생성한다. Map<String, String> departmentsMap = new HashMap<>(); // setup 함수의 오버라이딩을 통해 캐싱된 파일을 불러올 수 있다. @Override protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { URI[] uris = context.getCacheFiles(); for (URI uri : uris) { Path path = new Path(uri.getPath()); // 파일명을 사용하여 정의한 메모리 로드함수를 호출한다. loadDepartmentMap(path.getName()); } } @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { // employees 데이터는 TextInputFormat 의 형태로 입력받는 다. /* employees 데이터의 형태는 다음과 같다. (emp_no, birth_date, first_name, last_name, sex, hire_date, dept_no) 10012,1960-10-04,Patricio,Bridgland,M,1992-12-18,d005 10013,1963-06-07,Eberhardt,Terkki,M,1985-10-20,d003 10014,1956-02-12,Berni,Genin,M,1987-03-11,d005 10015,1959-08-19,Guoxiang,Nooteboom,M,1987-07-02,d008 */ String[] split = value.toString().split(","); outKey.set(split[0]); String deptName = departmentsMap.get(split[6]); deptName = deptName==null ? "Not Found" : deptName; outValue.set(split[2]+"\t"+split[4]+"\t"+deptName); context.write(outKey, outValue); } // departmentMap 객체에 데이터를 담아줄 함수를 정의한다. /* department 데이터의 형태는 다음과 같다. (dept_no, dept_name) d009,Customer Service d005,Development d002,Finance d003,Human Resources */ private void loadDepartmentMap(String fileName) throws IOException { String line = ""; try (BufferedReader br = new BufferedReader(new FileReader(fileName))) { while ((line = br.readLine()) != null) { String[] split = line.split(","); departmentsMap.put(split[0], split[1]); } } } } @Override public int run(String[] strings) throws Exception { Job job = Job.getInstance(getConf(), "DistributedCacheMapSideJoin"); // 분산캐시를 사용하기 위해 HDFS에 업로드 되어있는 데이터셋을 로드한다. job.addCacheFile(new URI("/user/malachai/dataset/departments")); job.setJarByClass(DistCacheMapSideJoin.class); job.setMapperClass(MapSideJoinMapper.class); // Reducer 를 사용하지 않음으로써 Mapper 작업만 수행하도록 한다. job.setNumReduceTasks(0); FileInputFormat.addInputPath(job, new Path(strings[0])); FileOutputFormat.setOutputPath(job, new Path(strings[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new DistCacheMapSideJoin(), args); System.out.println(exitCode); } }
Java
복사
다음과 같은 명령어로 DistCacheMapSideJoin 예제를 실행할 수 있다. API단에서 departments 파일을 읽어들이고 있으니 GenericOptionsParser의 -files 옵션으로 따로 설정할 필요는 없다.
> hadoop jar share/hadoop/mapreduce/mapreduce-job-1.0-SNAPSHOT.jar com.fastcampus.hadoop.DistCacheMapSideJoin /user/malachai/dataset/employees /user/malachai/output/mapjoin > hadoop fs -cat /user/malachai/output/mapjoin/part-m-00000 ... 499995 Dekang F Production 499996 Zito M Production 499997 Berhard M Development 499998 Patricia M Finance 499999 Sachin M Production
Bash
복사
앞선 예제들과 출력 파일명이 다른 모습을 볼 수 있다. 여기서 map 작업의 결과물을 m, reduce 작업의 결과물을 r로 명명함을 알 수 있다.

Reduce-side Join

Reduce-side Join은 join 연산의 좀 더 일반적인 경우이다. 동일한 key를 가진 레코드를 같은 Reducer로 모이도록 하는 Shuffling 과정의 특징을 이용하여 join key를 Mapper의 출력 key로 지정하는 방식이다.
Reduce-side Join 작업을 위해 작성할 DistCacheReduceSideJoin.java는 다음과 같다. 이 예제는 HDFS의 dataset에 존재하는 departments, employees 데이터를 department_no 기준으로 join한다.
package com.fastcampus.hadoop; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; import java.util.HashMap; import java.util.Map; public class DistCacheReduceSideJoin extends Configured implements Tool { public static class EmployeeMapper extends Mapper<LongWritable, Text, Text, Text> { Text outKey = new Text(); Text outValue = new Text(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { String[] split = value.toString().split(","); // mapper 의 출력 key 를 dept_no 로 설정한다. outKey.set(split[6]); // 어느 mapper 에서 출력한 레코드인지 알 수 있도록 prefix 를 추가한다. outValue.set("e"+"\t"+split[0]+"\t"+split[2]+"\t"+split[4]+"\t"); context.write(outKey, outValue); } } public static class DepartmentMapper extends Mapper<LongWritable, Text, Text, Text> { Text outKey = new Text(); Text outValue = new Text(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { String[] split = value.toString().split(","); // mapper 의 출력 key 를 dept_no 로 설정한다. outKey.set(split[0]); // 어느 mapper 에서 출력한 레코드인지 알 수 있도록 prefix 를 추가한다. outValue.set("d"+"\t"+split[1]); context.write(outKey, outValue); } } public static class ReduceSideJoinReducer extends Reducer<Text, Text, Text, Text> { Text outKey = new Text(); Text outValue = new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { Map<String, String> employeeMap = new HashMap<>(); String deptName = "Not Found"; for (Text t : values) { String[] split = t.toString().split("\t"); // EmployeeMapper 에서 입력받은 데이터 if(split[0].equals("e")) { employeeMap.put(split[1], split[2]+"\t"+split[3]); } // DepartmentMapper 에서 입력받은 데이터 else if (split[0].equals("d")) { deptName = split[1]; } } // employeeMap 을 순회하며 deptName 을 join 한다. for (Map.Entry<String, String> e : employeeMap.entrySet()) { outKey.set(e.getKey()); outValue.set(e.getValue()+"\t"+deptName); context.write(outKey, outValue); } } } @Override public int run(String[] strings) throws Exception { Job job = Job.getInstance(getConf(), "DistributedCacheReduceSideJoin"); job.setJarByClass(DistCacheReduceSideJoin.class); job.setReducerClass(ReduceSideJoinReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // Mapper나 InputFormat이 서로 다른 하나 이상의 파일 경로로 입력을 받기 위해 MultipleInputs api를 사용한다. MultipleInputs.addInputPath(job, new Path(strings[0]), TextInputFormat.class, EmployeeMapper.class); MultipleInputs.addInputPath(job, new Path(strings[1]), TextInputFormat.class, DepartmentMapper.class); FileOutputFormat.setOutputPath(job, new Path(strings[2])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new DistCacheReduceSideJoin(), args); System.out.println(exitCode); } }
Java
복사
다음과 같은 명령어로 DistCacheReduceSideJoin 예제를 실행할 수 있다.
> hadoop jar share/hadoop/mapreduce/mapreduce-job-1.0-SNAPSHOT.jar com.fastcampus.hadoop.DistCacheReduceSideJoin /user/malachai/dataset/employees /user/malachai/dataset/departments /user/malachai/output/reducejoin > hadoop fs -cat /user/malachai/output/reducejoin/part-r-00000 ... 51876 Mamdouh M Customer Service 458661 Stepehn M Customer Service 230803 Dannz M Customer Service 216515 Zongyan M Customer Service 433386 Ayakannu M Customer Service
Bash
복사

Reduce-side Join

Reduce-side Join에서 Secondary Sort의 경우는 조금 더 복잡하다.
TextTupleWritable - 정렬하고자 하는 값을 포함하는 복합 키 클래스
KeyPartitioner - 정의한 복합키를 통해 어느 Reducer에게 전달될 지 결정하는 Partitioner 클래스
GroupComparator - Reduce에 입력될 값을 그룹핑 해주는 클래스 정의
KeyComparator - Reducer의 입력을 key를 기준으로 정렬해주기 위한 정렬 클래스
Reduce-side Join 후 사용자 정의 키를 기준으로 Secondary Sort를 수행하는 작업을 위해 작성할 코드는 다음과 같다. 이 예제는 HDFS의 dataset에 존재하는 departments, employees 데이터를 department_no 기준으로 join한 이후 department_no를 기준으로 정렬한다.
// TextTupleWritable.java package com.fastcampus.hadoop.key; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Map; import java.util.Objects; import java.util.function.Supplier; public class TextTupleWritable implements WritableComparable<TextTupleWritable> { private Text key; private Text value; public TextTupleWritable() { this.key = new Text(); this.value = new Text(); } public void set(Text key, Text value) { this.key = key; this.value = value; } public Text getKey() { return key; } public Text getValue() { return value; } @Override public int compareTo(TextTupleWritable o) { int cmp = key.compareTo(o.key); if (cmp!=0) return cmp; return value.compareTo(o.value); } @Override public void write(DataOutput dataOutput) throws IOException { key.write(dataOutput); value.write(dataOutput); } @Override public void readFields(DataInput dataInput) throws IOException { key.readFields(dataInput); value.readFields(dataInput); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; TextTupleWritable that = (TextTupleWritable) o; return Objects.equals(key, that.key) && Objects.equals(value, that.value); } @Override public int hashCode() { return Objects.hash(key, value); } @Override public String toString() { return "EntryWritable{" + "key=" + key + ", value=" + value + '}'; } }
Java
복사
// DistCacheReduceSideJoinCustomKey.java package com.fastcampus.hadoop; import com.fastcampus.hadoop.key.TextTupleWritable; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; import java.util.Iterator; public class DistCacheReduceSideJoinCustomKey extends Configured implements Tool { enum DataType { DEPARTMENT("ENUM_DATATYPE_DEPARTMENT"), EMPLOYEE("ENUM_DATATYPE_EMPLOYEE"); private final String value; DataType(String value){ this.value = value; } public String value() { return value; } } // Paritioner 클래스 // Key 를 기준으로 전달할 Reducer 를 결정한다. public static class KeyPartitioner extends Partitioner<TextTupleWritable, Text> { // 파라미터는 순서대로 키, 값, 파티션 개수이다. // 반환값은 Partition의 번호이다. // EntryWritable의 key값이 같은 경우엔 같은 partition 번호를 반환한다. // Bitwise 연산을 한 이유는, partition 번호는 항상 양수여야 하기 떄문이다. @Override public int getPartition(TextTupleWritable key, Text value, int i) { return (key.getKey().hashCode() & Integer.MAX_VALUE) % i; } } // Grouping 클래스 // Reducer 에 전달 된 값들을 집계한다. // Modular 연산으로 인해 같은 Partitioner 에 다른 해시 값을 가진 key 가 들어올 수 있으므로, 실제 값과 비교하여 집계하는 클래스가 필요하다. // Key 가 같은 레코드들은 같은 reduce 함수에 입력된다. public static class GroupComparator extends WritableComparator { protected GroupComparator() { super(TextTupleWritable.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { TextTupleWritable t1 = (TextTupleWritable) a; TextTupleWritable t2 = (TextTupleWritable) b; return t1.getKey().compareTo(t2.getKey()); } } // Key 정렬 클래스 // 앞서 Grouping 된 레코드들을 정렬 해준다. public static class KeyComparator extends WritableComparator { protected KeyComparator() { super(TextTupleWritable.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { TextTupleWritable t1 = (TextTupleWritable) a; TextTupleWritable t2 = (TextTupleWritable) b; int cmp = t1.getKey().compareTo(t2.getKey()); if (cmp!=0) return cmp; return t1.getValue().compareTo(t2.getValue()); } } // Employee Mapper 클래스 public static class EmployeeMapper extends Mapper<LongWritable, Text, TextTupleWritable, Text> { TextTupleWritable outKey = new TextTupleWritable(); Text outValue = new Text(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, TextTupleWritable, Text>.Context context) throws IOException, InterruptedException { String[] split = value.toString().split(","); outKey.set(new Text(split[6]), new Text(DataType.EMPLOYEE.value())); outValue.set(split[0]+"\t"+split[2]+"\t"+split[4]+"\t"); context.write(outKey, outValue); } } // Department Mapper 클래스 public static class DepartmentMapper extends Mapper<LongWritable, Text, TextTupleWritable, Text> { TextTupleWritable outKey = new TextTupleWritable(); Text outValue = new Text(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, TextTupleWritable, Text>.Context context) throws IOException, InterruptedException { String[] split = value.toString().split(","); outKey.set(new Text(split[0]), new Text(DataType.DEPARTMENT.value())); outValue.set(split[1]); context.write(outKey, outValue); } } public static class ReduceSideJoinReducer extends Reducer<TextTupleWritable, Text, Text, Text> { Text outKey = new Text(); Text outValue = new Text(); @Override protected void reduce(TextTupleWritable key, Iterable<Text> values, Reducer<TextTupleWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { Iterator<Text> iter = values.iterator(); // values는 Enum의 value 값을 기준으로 정렬되어 입력되기 때문에 첫 레코드의 값은 무조건 ENUM_DATATYPE_DEPARTMENT의 값이다. String departmentText = iter.next().toString(); // 이후의 값은 모두 ENUM_DATATYPE_EMPLOYEE의 값이다. while(iter.hasNext()) { Text employeeText = iter.next(); String[] employeeSplit = employeeText.toString().split("\t"); outKey.set(employeeSplit[0]); outValue.set(employeeSplit[1]+"\t"+employeeSplit[1]+"\t"+departmentText); context.write(outKey, outValue); } } } @Override public int run(String[] strings) throws Exception { Job job = Job.getInstance(getConf(), "ReduceSideJoinCustomKey"); job.setJarByClass(DistCacheReduceSideJoinCustomKey.class); job.setReducerClass(ReduceSideJoinReducer.class); job.setMapOutputKeyClass(TextTupleWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setPartitionerClass(KeyPartitioner.class); // Partitioner 클래스 job.setSortComparatorClass(KeyComparator.class); // 2차 정렬 클래스 job.setGroupingComparatorClass(GroupComparator.class); // Grouping 클래스 MultipleInputs.addInputPath(job, new Path(strings[0]), TextInputFormat.class, EmployeeMapper.class); MultipleInputs.addInputPath(job, new Path(strings[1]), TextInputFormat.class, DepartmentMapper.class); FileOutputFormat.setOutputPath(job, new Path(strings[2])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new DistCacheReduceSideJoinCustomKey(), args); System.out.println(exitCode); } }
Java
복사
다음과 같은 명령어로 DistCacheReduceSideJoin 예제를 실행할 수 있다.
> hadoop jar share/hadoop/mapreduce/mapreduce-job-1.0-SNAPSHOT.jar com.fastcampus.hadoop.DistCacheReduceSideJoinCustomKey /user/malachai/dataset/employees /user/malachai/dataset/departments /user/malachai/output/reducesortjoin > hadoop fs -head /user/malachai/output/reducesortjoin/part-r-00000 225998 Nakhoon Nakhoon Marketing 62864 Guozhong Guozhong Marketing 220036 Chriss Chriss Marketing 494457 Ashish Ashish Marketing 294165 Michaela Michaela Marketing ...
Bash
복사