I am new to MapReduce and Hadoop (Hadoop 3.2.3, Java 8). I am trying to separate records based on a symbol inside each record. For example, "q1,a,q0," should be returned as the (key, value) pair ('a', "q1,a,q0,"). My dataset contains ten (10) records: five (5) for key 'a' and five for key 'b'.
I expect to get five records for each key, but I always get five for 'a' and ten for 'b'.
Data
A,q0,a,q1;A,q0,b,q0;A,q1,a,q1;A,q1,b,q2;A,q2,a,q1;A,q2,b,q0;B,s0,a,s0;B,s0,b,s1;B,s1,a,s1;B,s1,b,s0
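For reference, each record's key is its third comma-separated field. A minimal stand-alone sketch of that extraction (plain Java, no Hadoop types; the class name is just for illustration), applied to the first two records above:

import java.util.Arrays;

public class KeyExtractionDemo {
    public static void main(String[] args) {
        String input = "A,q0,a,q1;A,q0,b,q0";
        for (String record : input.split(";")) {
            // The symbol ('a' or 'b') is the third comma-separated field.
            char symbol = record.split(",")[2].charAt(0);
            System.out.println("(" + symbol + ", \"" + record + "\")");
        }
    }
}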
Mapper class:
import java.io.IOException;

import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, ByteWritable, Text> {

    private final ByteWritable key1 = new ByteWritable();
    private final Text wordObject = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String ftext = value.toString();
        // Records are joined with ';' on a single input line.
        for (String line : ftext.split(";")) {
            // The third comma-separated field is the symbol ('a' or 'b').
            if (line.split(",")[2].equals("b")) {
                key1.set((byte) 'b');
            } else {
                key1.set((byte) 'a');
            }
            wordObject.set(line);
            context.write(key1, wordObject);
        }
    }
}
Reducer class:
import java.io.IOException;

import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<ByteWritable, Text, ByteWritable, Text> {

    private Integer count = 0;

    @Override
    public void reduce(ByteWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text val : values) {
            count++;
        }
        Text symb = new Text(count.toString());
        context.write(key, symb);
    }
}
Driver class:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.printf("Usage: %s [generic options] <inputdir> <outputdir>%n",
                    getClass().getSimpleName());
            return -1;
        }

        // Job.getInstance replaces the deprecated new Job(Configuration) constructor.
        Job job = Job.getInstance(getConf(), "separation");
        job.setJarByClass(MyDriver.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setMapOutputKeyClass(ByteWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(ByteWritable.class);
        job.setOutputValueClass(Text.class);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new MyDriver(), args);
        System.exit(exitCode);
    }
}
Answer
The problem was solved by declaring the variable count inside the reduce() method. Hadoop creates one MyReducer instance per reduce task and calls reduce() on that same instance once per key, so an instance field keeps its value across keys: after the five values for 'a' are counted, count is still 5 when reduce() runs for 'b', and the five 'b' values push it to 10.
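A minimal sketch of the corrected reducer; only the placement of count changes:

import java.io.IOException;

import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<ByteWritable, Text, ByteWritable, Text> {

    @Override
    public void reduce(ByteWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Declared locally, so the count starts at zero for every key.
        int count = 0;
        for (Text val : values) {
            count++;
        }
        context.write(key, new Text(Integer.toString(count)));
    }
}

With this change each key is counted independently, giving five values for 'a' and five for 'b'.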