Word Count Program using Crunch
WordCount Class
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* A word count example for Apache Crunch.
*/
public class WordCount extends Configured implements Tool {
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new WordCount(), args);
}
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.out.println("USAGE:- <Input Path> <Output Path>")
GenericOptionsParser.printGenericCommandUsage(System.err);
return 1;
}
String inputPath = args[0];
String outputPath = args[1];
// Create an object to coordinate pipeline creation and execution.
Pipeline pipeline = new MRPipeline(WordCount.class);
PCollection<String> lines = pipeline.readTextFile(inputPath);
// Define a function that splits each line in a PCollection of Strings
// into a PCollection made up of the individual words in the file.
// The second argument sets the serialization format.
PCollection<String> words = lines.parallelDo(new Tokenizer(),
Writables.strings());
// The count method applies a series of Crunch primitives and returns
// a map of the unique words in the input PCollection to their counts.
PTable<String, Long> counts = words.count();
// Instruct the pipeline to write the resulting counts to a text file.
pipeline.writeTextFile(counts, outputPath);
// Execute the pipeline as a MapReduce.
pipeline.done();
return 0;
// return result.succeeded() ? 0 : 1;
}
}
Tokenizer Class
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
/**
 * Splits a line of text into its pipe-separated fields.
 *
 * <p>Each input line is split on the {@code |} character and every resulting
 * piece is emitted unchanged, in the order it appears in the line. Splitting
 * follows the semantics of {@link String#split(String)}.
 */
public class Tokenizer extends DoFn<String, String> {

  /**
   * Emits one output string per pipe-separated field of {@code line}.
   *
   * @param line the input line to split
   * @param emitter receives each field of the line
   */
  @Override
  public void process(String line, Emitter<String> emitter) {
    // The pipe is a regex metacharacter, so it is escaped to match literally.
    final String[] fields = line.split("\\|");
    for (int i = 0; i < fields.length; i++) {
      emitter.emit(fields[i]);
    }
  }
}