Thursday 12 February 2015

Program to write the result of a WordCount operation to HBase using Crunch
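This post walks through a small Apache Crunch pipeline that reads pipe-delimited text, tokenizes it into words, counts each word, writes the counts to a text file, and then persists the same counts to an HBase table named "table" under the column family "a".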

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.hbase.HBaseTarget;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A program to persist the output of a word count program to HBase using Apache Crunch.
 */
public class WordCount extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new WordCount(), args);
    }

    public int run(String[] args) throws Exception {

        if (args.length != 2) {
            System.err.println("USAGE: <Input Path> <Output Path>");
            GenericOptionsParser.printGenericCommandUsage(System.err);
            return 1;
        }

        String inputPath = args[0];
        String outputPath = args[1];

        // Create an object to coordinate pipeline creation and execution.
        // HBaseConfiguration.create() returns a Configuration that already
        // has the HBase resources (hbase-site.xml) loaded.
        Pipeline pipeline = new MRPipeline(WordCount.class,
                HBaseConfiguration.create());

        PCollection<String> lines = pipeline.readTextFile(inputPath);

        // Define a function that splits each line in a PCollection of Strings
        // into a PCollection made up of the individual words in the file.
        // The second argument sets the serialization format.
        PCollection<String> words = lines.parallelDo(new Tokenizer(),
                Writables.strings());

        // The count method applies a series of Crunch primitives and returns
        // a map of the unique words in the input PCollection to their counts.
        PTable<String, Long> counts = words.count();

        // Instruct the pipeline to write the resulting counts to a text file.
        pipeline.writeTextFile(counts, outputPath);

        // Convert the counts into a PCollection of Put objects for HBase.
        PCollection<Put> puts = WordCount.splitLines(counts);

        // Write the Puts to the HBase table named "table".
        pipeline.write(puts, new HBaseTarget("table"));

        // Execute the pipeline as a MapReduce job.
        pipeline.done();
        return 0;
    }


splitLines() to create a list of Put objects


    public static PCollection<Put> splitLines(PTable<String, Long> lines) {
        // The anonymous DoFn is safe to serialize because it is defined
        // inside a static method, so it captures no enclosing instance.
        return lines.parallelDo("Convert to puts",
                new DoFn<Pair<String, Long>, Put>() {

                    @Override
                    public void process(Pair<String, Long> input, Emitter<Put> emitter) {
                        // Use the word as the row key and store its count in
                        // column family "a", qualifier "Count".
                        Put put = new Put(Bytes.toBytes(input.first()));
                        put.add(Bytes.toBytes("a"), Bytes.toBytes("Count"),
                                Bytes.toBytes(input.second()));
                        emitter.emit(put);
                    }

                }, Writables.writables(Put.class));
    }
}
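Note that HBaseTarget writes into an existing table; it does not create one. The table "table" with column family "a" (the names used in the code above) therefore has to exist before the pipeline runs. Below is a minimal, illustrative sketch that creates it with the pre-1.0 HBase admin API; the class name CreateWordCountTable is just a placeholder.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateWordCountTable {

    public static void main(String[] args) throws Exception {
        // Connect using the same HBaseConfiguration the pipeline uses.
        HBaseAdmin admin = new HBaseAdmin(HBaseConfiguration.create());
        try {
            if (!admin.tableExists("table")) {
                // Column family "a" matches the family used in splitLines().
                HTableDescriptor descriptor =
                        new HTableDescriptor(TableName.valueOf("table"));
                descriptor.addFamily(new HColumnDescriptor("a"));
                admin.createTable(descriptor);
            }
        } finally {
            admin.close();
        }
    }
}

Equivalently, the table can be created once from the HBase shell before submitting the job.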


Tokenizer Class


import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

public class Tokenizer extends DoFn<String, String> {

    // Tokenize the line using the pipe character as the delimiter
    // and emit each token individually.
    @Override
    public void process(String line, Emitter<String> emitter) {
        String[] words = line.split("\\|");
        for (String word : words) {
            emitter.emit(word);
        }
    }
}
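Once the pipeline has finished, the loaded counts can be checked with a plain Get. The sketch below is illustrative: it assumes the word "hello" occurs in the input, and the row key and the ReadCount class name are placeholders. Since the counts were written with Bytes.toBytes(long), they are read back with Bytes.toLong().

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadCount {

    public static void main(String[] args) throws Exception {
        HTable table = new HTable(HBaseConfiguration.create(), "table");
        try {
            // "hello" is a hypothetical word assumed to occur in the input.
            Result result = table.get(new Get(Bytes.toBytes("hello")));
            byte[] value = result.getValue(Bytes.toBytes("a"), Bytes.toBytes("Count"));
            if (value != null) {
                // Counts were stored as 8-byte longs, so decode accordingly.
                System.out.println("hello -> " + Bytes.toLong(value));
            }
        } finally {
            table.close();
        }
    }
}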