Saturday 27 June 2015

Enhancements in Java 8 – Java Programming


A.      Lambda Expression
Also known as an anonymous function: a function that is not bound to an identifier. Lambdas are typically:
1.       Used as arguments to higher-order functions
2.       Used to construct the result of a higher-order function that needs to return a function
They suit functionality that is needed only for a short time, e.g.: ‘closures’ and ‘currying’. The idea, shown here in Python 2 syntax:
a = ['house', 'car', 'bike']
a.sort (lambda x, y: cmp (len(x), len(y)))
print (a)
['car', 'bike', 'house']
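
The snippet above is Python rather than Java. A Java 8 version of the same sort could look like the sketch below; the class and method names (LambdaSortDemo, sortByLength) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class LambdaSortDemo {
    // Returns a copy of the input sorted by string length.
    // The comparator is passed as a lambda instead of an anonymous class.
    static List<String> sortByLength(List<String> words) {
        List<String> copy = new ArrayList<>(words);
        copy.sort((x, y) -> Integer.compare(x.length(), y.length()));
        return copy;
    }

    public static void main(String[] args) {
        // prints [car, bike, house]
        System.out.println(sortByLength(Arrays.asList("house", "car", "bike")));
    }
}
```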


B.      Default Methods

Also known as defender methods. Default methods allow new methods to be added to an interface without breaking the existing implementations of that interface.
The interface supplies an implementation that is used by default when a concrete class does not provide its own implementation of that method.

public interface OldInterface {
    void existingMethod();

    // Classes that already implement OldInterface inherit this method unchanged.
    default void newDefaultMethod() {
        System.out.println("New default method"
                + " is added in interface");
    }
}


How can we use default methods? A class or sub-interface can:
·         Not override the default method, and so inherit the default implementation.
·         Override the default method, just like any other method we override in a subclass.
·         Redeclare the default method as abstract, which forces subclasses to override it.
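
The three options can be sketched as follows. All the type names here (Greeter, PlainGreeter, LoudGreeter, StrictGreeter, FormalGreeter) are hypothetical and exist only for this illustration.

```java
interface Greeter {
    default String greet() {
        return "hello";
    }
}

// Option 1: do not override, so the default implementation is inherited.
final class PlainGreeter implements Greeter {
}

// Option 2: override the default like any other method.
final class LoudGreeter implements Greeter {
    @Override
    public String greet() {
        return "HELLO";
    }
}

// Option 3: redeclare the method as abstract in a sub-interface,
// which forces implementors to supply their own version.
interface StrictGreeter extends Greeter {
    @Override
    String greet();
}

final class FormalGreeter implements StrictGreeter {
    @Override
    public String greet() {
        return "good day";
    }
}

final class DefaultMethodDemo {
    public static void main(String[] args) {
        System.out.println(new PlainGreeter().greet());  // prints hello
        System.out.println(new LoudGreeter().greet());   // prints HELLO
        System.out.println(new FormalGreeter().greet()); // prints good day
    }
}
```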


C.      Repeating Annotations
Allows the same annotation to be applied more than once to the same declaration or type use. Prior to Java 8, repeated annotations had to be grouped explicitly inside a container annotation.
        @Manufacturers({
            @Manufacturer(name = "BMW"),
            @Manufacturer(name = "Range Rover")
        })
        public class Car {
        //code goes in here
        }

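With Java 8, the container no longer has to be written where the annotations are used. The annotation type itself must still be declared with @Repeatable, naming its container annotation; the compiler then wraps the repeated annotations in the container implicitly.
        @Manufacturer(name = "BMW")
        @Manufacturer(name = "Range Rover")
        public class Car {
        //code goes in here
        }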
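
For completeness, here is a minimal sketch of how the two annotation types might be declared and read back through reflection. The declarations are an assumption (the post does not show them); the helper class RepeatingAnnotationDemo is made up for this example.

```java
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.ArrayList;
import java.util.List;

// Container annotation: holds the repeated annotations.
@Retention(RetentionPolicy.RUNTIME)
@interface Manufacturers {
    Manufacturer[] value();
}

// @Repeatable names the container the compiler should use.
@Retention(RetentionPolicy.RUNTIME)
@Repeatable(Manufacturers.class)
@interface Manufacturer {
    String name();
}

@Manufacturer(name = "BMW")
@Manufacturer(name = "Range Rover")
final class Car {
}

final class RepeatingAnnotationDemo {
    // getAnnotationsByType looks through the implicit container for us.
    static List<String> manufacturerNames(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Manufacturer m : type.getAnnotationsByType(Manufacturer.class)) {
            names.add(m.name());
        }
        return names;
    }

    public static void main(String[] args) {
        // prints [BMW, Range Rover]
        System.out.println(manufacturerNames(Car.class));
    }
}
```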


D.      Type Annotation
Java 7 allowed annotations only on declarations: packages, classes, methods, fields, local variables and method formal parameters. Java 8 adds type annotations, which can be written anywhere a type is used.


Examples:
@NotNull String str1 = ...
@Email String str2 = ...
@NotNull @NotBlank String str3 = ...

Java annotations provide metadata for your Java code. They are used for compiler instructions, build-time instructions and runtime instructions.

Type annotations exist to support stronger type checking. Java 8 does not ship a type checker of its own; the annotations are meant to be consumed by pluggable tools such as the Checker Framework.

Java 8 allows annotations on any use of a type, for example:
·         Class instance creation expression:
new @Interned MyObject();

·         Type cast
mystring = (@NonNull String) str;

·         implements clause
class UnmodifiableList<T> implements
        @Readonly List<@Readonly T> { ... }

·         Thrown exception declaration
void monitorTemperature() throws
        @Critical TemperatureException { ... }
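
A small sketch of declaring a type annotation and confirming that it is attached to a type use rather than to a declaration. The annotation @NonEmpty and the class TypeAnnotationDemo are invented for this example; they do not come from any library.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.AnnotatedParameterizedType;
import java.lang.reflect.AnnotatedType;
import java.util.ArrayList;
import java.util.List;

// TYPE_USE (new in Java 8) lets the annotation appear wherever a type is written.
@Target(ElementType.TYPE_USE)
@Retention(RetentionPolicy.RUNTIME)
@interface NonEmpty {
}

final class TypeAnnotationDemo {
    // The annotation sits on the type argument String, not on the field itself.
    List<@NonEmpty String> names = new ArrayList<>();

    // Reads the annotation back from the type argument of the field's type.
    static boolean elementTypeIsAnnotated() {
        try {
            AnnotatedType fieldType =
                    TypeAnnotationDemo.class.getDeclaredField("names").getAnnotatedType();
            AnnotatedType elementType =
                    ((AnnotatedParameterizedType) fieldType).getAnnotatedActualTypeArguments()[0];
            return elementType.isAnnotationPresent(NonEmpty.class);
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(elementTypeIsAnnotated());
    }
}
```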




E.       Method References
Method references are shortcuts that can be used anywhere we use lambdas. They are a compact and more readable form of a lambda expression whose body only calls an already written method. The “::” operator is used for a method reference.

Example:
interface IsReferable {
    public void referenceDemo();
}

class ReferenceDemo {
    public static void commonMethod() {
        System.out.println("This method is already defined.");
    }

    public void implement() {
        // Anonymous class.
        IsReferable demoOne = new IsReferable() {
            @Override
            public void referenceDemo() {
                ReferenceDemo.commonMethod();
            }
        };
        demoOne.referenceDemo();

        // Lambda implementation.
        IsReferable demo = () -> ReferenceDemo.commonMethod();
        demo.referenceDemo();

        // Method reference.
        IsReferable demoTwo = ReferenceDemo::commonMethod;
        demoTwo.referenceDemo();
    }
}
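
The example above uses a reference to a static method. As a rough sketch, the other forms of method reference look like this; the class name MethodReferenceKinds and the sample values are only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

final class MethodReferenceKinds {
    static List<String> demo() {
        // Static method: same as s -> Integer.parseInt(s)
        Function<String, Integer> parse = Integer::parseInt;

        // Instance method of a particular object: same as s -> prefix.concat(s)
        String prefix = "id-";
        Function<String, String> addPrefix = prefix::concat;

        // Instance method of an arbitrary object of a type: same as s -> s.toUpperCase()
        Function<String, String> upper = String::toUpperCase;

        // Constructor: same as () -> new ArrayList<String>()
        Supplier<List<String>> newList = ArrayList::new;

        List<String> out = newList.get();
        out.add(String.valueOf(parse.apply("41") + 1));
        out.add(addPrefix.apply("7"));
        out.add(upper.apply("java"));
        return out;
    }

    public static void main(String[] args) {
        // prints [42, id-7, JAVA]
        System.out.println(demo());
    }
}
```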


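F.       Type Inference - Generics
The compiler can infer generic type arguments from a method signature, for example when a diamond constructor call such as new HashMap<>() is passed as a method argument. Java 7 applied ‘target typing’ only in limited contexts such as assignments; Java 8 extends it to method arguments. Target typing is a language feature wherein the type expected by the context of an expression (for example, the variable in which the result is to be stored) influences the type inferred for that expression.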


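A case where target typing does not apply:
long MICRO_SECONDS_IN_DAY = 24 * 60 * 60 * 1000 * 1000; -> This does not yield the expected value, in Java 8 just as in Java 7. All operands are ints, so the multiplication overflows before the result is widened to long; arithmetic is not target-typed. Writing 24L * 60 * 60 * 1000 * 1000 gives the expected 86400000000.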
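
A quick way to see the overflow in the line above on any Java version; the class name OverflowDemo is made up for this sketch.

```java
final class OverflowDemo {
    // All operands are int, so the product wraps around before it is widened to long.
    static long intArithmetic() {
        return 24 * 60 * 60 * 1000 * 1000;
    }

    // One long operand makes the whole multiplication use 64-bit arithmetic.
    static long longArithmetic() {
        return 24L * 60 * 60 * 1000 * 1000;
    }

    public static void main(String[] args) {
        System.out.println(intArithmetic());  // prints 500654080
        System.out.println(longArithmetic()); // prints 86400000000
    }
}
```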

Type inference example:
public class TypeInference {
  public int getDictionarySize(Map<String, String> theDict) {
    return theDict.size();
  }
}


@Test
public void emptyDictionarySizeShouldBeZero() {
  TypeInference testObj = new TypeInference();

  Map<String, String> dict = new HashMap<>();

  int expected = 0;
  int actual = testObj.getDictionarySize(dict);

  assertEquals("Size is incorrect!", expected, actual);
}


@Test
public void emptyDictionarySizeShouldBeZero2() {
  TypeInference testObj = new TypeInference();

  int expected = 0;
  int actual = testObj.getDictionarySize(new HashMap<>());  //Java 7: Compile Error.
                                                            //Java 8: Better Type Inference

  assertEquals("Size is incorrect!", expected, actual);
}
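
The same improvement applies to generic methods, not only to the diamond operator. A minimal sketch, with hypothetical names (ArgumentInferenceDemo, size, sizeOfEmpty):

```java
import java.util.Collections;
import java.util.List;

final class ArgumentInferenceDemo {
    static int size(List<String> items) {
        return items.size();
    }

    static int sizeOfEmpty() {
        // Java 8 infers Collections.<String>emptyList() from the parameter type.
        // Java 7 inferred List<Object> here and rejected the call.
        return size(Collections.emptyList());
    }

    public static void main(String[] args) {
        System.out.println(sizeOfEmpty()); // prints 0
    }
}
```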



G.     Method parameter reflection
We can obtain the names of the formal parameters of any method or constructor with the method java.lang.reflect.Executable.getParameters. However, .class files do not store formal parameter names by default. To store formal parameter names in a particular .class file, and thus enable the Reflection API to retrieve them, compile the source file with the -parameters option of the javac compiler.
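
A small sketch of reading parameter names. The class ParameterNamesDemo and its greet method are invented for this example. What it prints depends on how it was compiled: with -parameters the real names appear, otherwise the JVM synthesizes names such as arg0 and arg1.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.ArrayList;
import java.util.List;

final class ParameterNamesDemo {
    // Sample method whose parameter names we want to inspect.
    static String greet(String greeting, int times) {
        return greeting + " x" + times;
    }

    static List<String> parameterNames() {
        List<String> names = new ArrayList<>();
        try {
            Method m = ParameterNamesDemo.class.getDeclaredMethod("greet", String.class, int.class);
            for (Parameter p : m.getParameters()) {
                // isNamePresent() is true only if the class file stores the real name.
                names.add(p.getName() + (p.isNamePresent() ? "" : " (synthesized)"));
            }
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(parameterNames());
    }
}
```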



H.     Collections – The new ‘java.util.stream’ package
Classes in the new java.util.stream package provide a Stream API to support functional-style operations on streams of elements. The Stream API is integrated into the Collections API, which enables bulk operations on collections, such as sequential or parallel map-reduce transformations.

Example:
int sum = widgets.stream()
                 .filter(b -> b.getColor() == RED)
                 .mapToInt(b -> b.getWeight())
                 .sum();

Here we use widgets, a Collection<Widget>, as a source for a stream, and then perform a filter-map-reduce on the stream to obtain the sum of the weights of the red widgets.
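
To make the snippet runnable we need a Widget type. The one below is a guess at its shape (a color and a weight); the names WidgetSumDemo, Color and redWeight are assumptions for this sketch.

```java
import java.util.Arrays;
import java.util.List;

final class WidgetSumDemo {
    enum Color { RED, BLUE }

    // Hypothetical Widget: just a color and a weight.
    static final class Widget {
        private final Color color;
        private final int weight;

        Widget(Color color, int weight) {
            this.color = color;
            this.weight = weight;
        }

        Color getColor() { return color; }
        int getWeight() { return weight; }
    }

    // filter -> map -> reduce: sum of the weights of the red widgets.
    static int redWeight(List<Widget> widgets) {
        return widgets.stream()
                .filter(w -> w.getColor() == Color.RED)
                .mapToInt(Widget::getWeight)
                .sum();
    }

    public static void main(String[] args) {
        List<Widget> widgets = Arrays.asList(
                new Widget(Color.RED, 3), new Widget(Color.BLUE, 5), new Widget(Color.RED, 4));
        System.out.println(redWeight(widgets)); // prints 7
    }
}
```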

The key abstraction introduced in this package is stream. The classes Stream, IntStream, LongStream, and DoubleStream are streams over objects and the primitive int, long and double types. Streams differ from collections in several ways:

No storage. A stream is not a data structure that stores elements; instead, it conveys elements from a source such as a data structure, an array, a generator function, or an I/O channel, through a pipeline of computational operations.

Functional in nature. An operation on a stream produces a result, but does not modify its source.

Laziness-seeking. Many stream operations, such as filtering, mapping, or duplicate removal, can be implemented lazily, exposing opportunities for optimization.

Possibly unbounded. While collections have a finite size, streams need not.

Consumable. The elements of a stream are only visited once during the life of a stream. Like an Iterator, a new stream must be generated to revisit the same elements of the source.

Stream operations are divided into intermediate and terminal operations, and are combined to form stream pipelines. A stream pipeline consists of a source (such as a Collection, an array, a generator function, or an I/O channel), followed by zero or more intermediate operations such as Stream.filter or Stream.map, and a terminal operation such as Stream.forEach or Stream.reduce.
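
Two of the properties above, sketched in code: an unbounded source that is only evaluated as far as the terminal operation needs, and a stream that cannot be traversed twice. The class and method names are made up for the example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class StreamTraitsDemo {
    // Source: an infinite stream 1, 2, 3, ...
    // Intermediate operations: filter, map, limit (all lazy).
    // Terminal operation: collect, which pulls only as many elements as limit allows.
    static List<Integer> firstEvenSquares(int n) {
        return Stream.iterate(1, i -> i + 1)
                .filter(i -> i % 2 == 0)
                .map(i -> i * i)
                .limit(n)
                .collect(Collectors.toList());
    }

    // A stream is consumable: a second terminal operation throws IllegalStateException.
    static boolean secondTraversalFails() {
        Stream<String> letters = Stream.of("a", "b");
        letters.forEach(x -> { });
        try {
            letters.forEach(x -> { });
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(firstEvenSquares(3));    // prints [4, 16, 36]
        System.out.println(secondTraversalFails()); // prints true
    }
}
```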




I.        Collections - Performance Improvement for HashMaps with Key Collisions

Hash bins containing a large number of colliding keys improve performance by storing their entries in a balanced tree instead of a linked list. This JDK 8 change applies only to HashMap, LinkedHashMap, and ConcurrentHashMap.

In rare situations, this change could introduce a change to the iteration order of HashMap and HashSet. A particular iteration order is not specified for HashMap objects - any code that depends on iteration order should be fixed.

Java 8 is reported to be about 20% faster on average than Java 7 for a simple HashMap.get(). When a bucket becomes too big (currently: TREEIFY_THRESHOLD = 8 entries), HashMap dynamically converts it from a linked list into a balanced (red-black) tree.
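
To experiment with the worst case, we can use a key type that deliberately sends every key to the same bin. The sketch below only checks that lookups stay correct; it does not measure timing. BadKey and CollidingKeysDemo are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

final class CollidingKeysDemo {
    // Every key has the same hash code, so all entries land in one bin.
    // Implementing Comparable lets a treeified bin order the keys.
    static final class BadKey implements Comparable<BadKey> {
        final int id;

        BadKey(int id) {
            this.id = id;
        }

        @Override
        public int hashCode() {
            return 42;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof BadKey && ((BadKey) o).id == id;
        }

        @Override
        public int compareTo(BadKey other) {
            return Integer.compare(id, other.id);
        }
    }

    static Map<BadKey, Integer> build(int n) {
        Map<BadKey, Integer> map = new HashMap<>();
        for (int i = 0; i < n; i++) {
            map.put(new BadKey(i), i);
        }
        return map;
    }

    public static void main(String[] args) {
        Map<BadKey, Integer> map = build(1000);
        System.out.println(map.get(new BadKey(777))); // prints 777
    }
}
```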





Saturday 20 June 2015

Apache HBase – Support for Medium Objects (MOBs)



Use case:

It is often useful to store binary data such as images and documents in HBase. Traditional databases, for example Oracle Database, already have the ability to store such objects.
Apache HBase can technically handle binary objects up to 10MB in size. However, it is optimized for low-latency reads and writes of values smaller than 10K per cell, and performance can degrade when we use moderately sized objects (medium objects, or MOBs) of 100K – 10MB. This is because of the increasing I/O pressure created by compactions.


After Effects:

The increase in I/O pressure leads to slower compactions, which eventually block memstore flushing and therefore block updates. It also increases the frequency of region splits, reducing the availability of the affected regions.


Characteristics of MOBs:

  •        Write intensive
  •        Data size is quite big
  •        Seldom deletes and updates
  •        Infrequent read (MOB data are accessed much less than the corresponding meta data)
  •        Stored along with metadata


MOB Design:


The key is to treat MOBs as a separate region. This keeps the MOBs out of normal region splits and compactions, thus decreasing the I/O pressure.

The idea is to combine HBase and HDFS in managing the data. The memstore caches the MOB data before it is flushed to disk. The MOBs are written to an HFile called a ‘MOB file’, which may contain multiple MOB objects. The metadata is stored in HBase, along with a reference column that links to the MOB file. The metadata and the MOBs are stored in different column families.
To take advantage of HBase's consistency guarantees, MOBs go through the normal memstore flush. If we saved the MOBs directly into sequence files, compaction would become difficult and updating the pointers would add load to HBase. The MOB data does not take part in splits and compactions in HBase.

The actions that take place while writing the MOB data

The MOB data is written into a KeyValue of the MOB column. When the memstore is full, the MOB data is flushed to MOB files in HFile format and the metadata is flushed to StoreFiles. The values of the MOB KeyValues are replaced by the paths of the MOB files. Each MOB KeyValue in a StoreFile carries a tag that marks it as a reference linking to a MOB file.

The file path is /rootPath/tableName/.mob/columnFamilyName/${filename}


The chain of the random read against the MOB data
       1.       Find the metadata, and the path of the HFile in the metadata
       2.       Find the HFile by the path
       3.       Seek the KeyValue with the key (row key, columnFamily:column, timestamp) in this HFile, and retrieve it
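
The write and read paths described above can be pictured with a toy in-memory model. This is not HBase code and uses no HBase API: the class MobStoreSketch, its maps and the "larger than the threshold" rule are all assumptions made purely to illustrate the indirection through a reference.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: plain maps stand in for StoreFiles and MOB files.
final class MobStoreSketch {
    private final int mobThreshold;
    // "StoreFile": small values stored inline, keyed by row key.
    private final Map<String, String> inlineValues = new HashMap<>();
    // "StoreFile": for MOB cells, the row key maps to the path of a MOB file.
    private final Map<String, String> mobReferences = new HashMap<>();
    // "MOB files": path -> (row key -> value); one file may hold many MOBs.
    private final Map<String, Map<String, String>> mobFiles = new HashMap<>();

    MobStoreSketch(int mobThreshold) {
        this.mobThreshold = mobThreshold;
    }

    void put(String rowKey, String value, String mobFilePath) {
        if (value.length() > mobThreshold) {
            // Large value: store it in the MOB file and keep only a reference.
            mobFiles.computeIfAbsent(mobFilePath, p -> new HashMap<>()).put(rowKey, value);
            mobReferences.put(rowKey, mobFilePath);
            inlineValues.remove(rowKey);
        } else {
            inlineValues.put(rowKey, value);
            mobReferences.remove(rowKey);
        }
    }

    String get(String rowKey) {
        String path = mobReferences.get(rowKey);   // 1. find the reference in the metadata
        if (path == null) {
            return inlineValues.get(rowKey);       // not a MOB: value is stored inline
        }
        Map<String, String> mobFile = mobFiles.get(path); // 2. find the file by its path
        return mobFile.get(rowKey);                // 3. seek the value by key
    }

    boolean isStoredAsMob(String rowKey) {
        return mobReferences.containsKey(rowKey);
    }
}
```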




MOB Files Cleaner and Sweep Tool
·         The MOB file cleaner cleans the expired MOB files.
·         The sweep tool uses a MapReduce job to clean up unused MOB data. This tool also merges small MOB files into larger ones.


How to take advantage of MOBs?

To take advantage of the MOB feature, we need to use HFile version 3. Edit ‘hbase-site.xml’ as shown and restart the RegionServers. The change takes effect after a major compaction.
<property>
  <name>hfile.format.version</name>
  <value>3</value>
</property>


Configuring Columns to Store MOBs
The properties to be set for handling MOB data are ‘IS_MOB’ and ‘MOB_THRESHOLD’. ‘IS_MOB’ specifies whether a column family can store MOB data. ‘MOB_THRESHOLD’ specifies, in bytes, the size above which a value is treated as a MOB. The default is 100 KB (102400 bytes).
HBase Shell commands, to create a new table or alter an existing one:
hbase> create 'sample', {NAME => 'm', IS_MOB => true, MOB_THRESHOLD => 10500}
hbase> alter 'sample', {NAME => 'm', IS_MOB => true, MOB_THRESHOLD => 10500}






Thursday 12 February 2015

Program to write the result of a WordCount operation to HBase using Crunch

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.hbase.HBaseTarget;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A program to persist the output of a word count program to HBase using Apache Crunch.
 */

public class WordCount extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new WordCount(), args);
    }

    public int run(String[] args) throws Exception {

        if (args.length != 2) {
            System.out.println("USAGE:- <Input Path> <Output Path>");
            GenericOptionsParser.printGenericCommandUsage(System.err);
            return 1;
        }

        String inputPath = args[0];
        String outputPath = args[1];

        // Create an object to coordinate pipeline creation and execution. Here we make use of
        // HBaseConfiguration.create() to create the conf object.
        Pipeline pipeline = new MRPipeline(WordCount.class,
                HBaseConfiguration.create());

        PCollection<String> lines = pipeline.readTextFile(inputPath);

        // Define a function that splits each line in a PCollection of Strings
        // into a PCollection made up of the individual words in the file.
        // The second argument sets the serialization format.
        PCollection<String> words = lines.parallelDo(new Tokenizer(),
                Writables.strings());

        // The count method applies a series of Crunch primitives and returns
        // a map of the unique words in the input PCollection to their counts.
        PTable<String, Long> counts = words.count();

        // Instruct the pipeline to write the resulting counts to a text file.
        pipeline.writeTextFile(counts, outputPath);

        // Create a PCollection of Put objects to store the output in HBase.
        PCollection<Put> aPut = WordCount.splitLines(counts);

        // Write the data to the HBase table.
        pipeline.write(aPut, new HBaseTarget("table"));

        // Execute the pipeline as a MapReduce.
        pipeline.done();
        return 0;
    }


    // splitLines() converts each (word, count) pair into an HBase Put object.
    public static PCollection<Put> splitLines(PTable<String, Long> lines) {
        // This will work fine because the DoFn is defined inside of a static
        // method.
        return lines.parallelDo("Convert to puts",
                new DoFn<Pair<String, Long>, Put>() {

                    @Override
                    public void process(Pair<String, Long> input,
                            Emitter<Put> emitter) {
                        // Row key is the word; column a:Count holds its count.
                        Put put = new Put(Bytes.toBytes(input.first()));
                        put.add(Bytes.toBytes("a"), Bytes.toBytes("Count"),
                                Bytes.toBytes(input.second()));
                        emitter.emit(put);
                    }

                }, Writables.writables(Put.class));
    }
}


Tokenizer Class


import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

public class Tokenizer extends DoFn<String, String> {
    // Tokenize the line using pipe as the delimiter.
    @Override
    public void process(String line, Emitter<String> emitter) {
        String[] splitted = line.split("\\|");
        for (String word : splitted) {
            emitter.emit(word);
        }
    }
}

Wednesday 28 January 2015

Word Count Program using Crunch

WordCount Class

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A word count example for Apache Crunch.
 */
public class WordCount extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new WordCount(), args);
    }

    public int run(String[] args) throws Exception {

        if (args.length != 2) {
            System.out.println("USAGE:- <Input Path> <Output Path>");
            GenericOptionsParser.printGenericCommandUsage(System.err);
            return 1;
        }

        String inputPath = args[0];
        String outputPath = args[1];

        // Create an object to coordinate pipeline creation and execution.
        Pipeline pipeline = new MRPipeline(WordCount.class);

        PCollection<String> lines = pipeline.readTextFile(inputPath);

        // Define a function that splits each line in a PCollection of Strings
        // into a PCollection made up of the individual words in the file.
        // The second argument sets the serialization format.
        PCollection<String> words = lines.parallelDo(new Tokenizer(),
                Writables.strings());

        // The count method applies a series of Crunch primitives and returns
        // a map of the unique words in the input PCollection to their counts.
        PTable<String, Long> counts = words.count();

        // Instruct the pipeline to write the resulting counts to a text file.
        pipeline.writeTextFile(counts, outputPath);

        // Execute the pipeline as a MapReduce and report failure through the exit code.
        return pipeline.done().succeeded() ? 0 : 1;
    }
}



Tokenizer Class


import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

/**
 * Splits a pipe-delimited line of text into tokens.
 */
public class Tokenizer extends DoFn<String, String> {
    // Tokenize the line using pipe as the delimiter.
    @Override
    public void process(String line, Emitter<String> emitter) {
        String[] splitted = line.split("\\|");
        for (String word : splitted) {
            emitter.emit(word);
        }
    }
}
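
The split call inside the Tokenizer can be tried on its own with plain Java, without a Crunch pipeline. The class name PipeSplitDemo is made up for this sketch. Note that empty tokens in the middle of a line are kept, while trailing empty tokens are dropped.

```java
import java.util.Arrays;
import java.util.List;

final class PipeSplitDemo {
    static List<String> tokens(String line) {
        // "|" is the regex alternation operator, so it must be escaped
        // to match a literal pipe character.
        return Arrays.asList(line.split("\\|"));
    }

    public static void main(String[] args) {
        System.out.println(tokens("big|data|crunch")); // prints [big, data, crunch]
        System.out.println(tokens("a|b||c").size());   // prints 4 (the empty middle token is kept)
        System.out.println(tokens("a|b|").size());     // prints 2 (the trailing empty token is dropped)
    }
}
```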

Monday 19 January 2015

Working with Apache Crunch - Overview

Apache Crunch is a thin layer on top of MapReduce that makes it easy to develop efficient MapReduce pipelines. It gives developers a high-level API for writing and testing complex MapReduce programs that require multiple processing stages. Crunch is modeled after 'FlumeJava' by Google.

What is advantageous about Crunch?

  • Common functionality such as JOIN, AGGREGATION and SORTING is available in the Crunch API.
  • Crunch can process data from various input sources that conform to different input types, like serialized object formats, time series data, HBase rows and columns, etc. It does not impose a single data type.
  • Crunch provides type safety.
  • The 'MemPipeline' feature, which runs a pipeline in memory, makes testing much easier.
  • Crunch manages pipeline execution easily and efficiently.


3 most important Crunch APIs

Crunch APIs are centered around 3 immutable distributed data sets.
  1. PCollection
  2. PTable
  3. PGroupedTable 


PCollection - Lazily evaluated parallel collection

PCollection<T> provides an immutable, distributed and unsorted collection of elements of type T.

          e.g.: PCollection<String>


PCollection provides a method called parallelDo, which applies the logic in a DoFn in parallel to all the elements of the source PCollection and returns a transformed PCollection as the output. parallelDo allows element-wise computation over an input PCollection.


PTable - Sub interface of PCollection<Pair<K,V>>

PTable<K,V> provides an immutable, distributed and unsorted multimap of key type K and value type V.

          e.g.: PTable<String, String>

PTable provides parallelDo, groupByKey, join, cogroup operations.


PGroupedTable - Result of groupByKey function

PGroupedTable<K,V> is a distributed, sorted map of keys of type K to an iterable of values that may be iterated only once. PGroupedTable<K,V> has parallelDo and combineValues operations.
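
As a rough analogy only, the shapes of the three data sets can be pictured with ordinary in-memory Java collections. The sketch below is plain Java, not the Crunch API; the class name CrunchShapesAnalogy is invented, and real Crunch collections are distributed and lazily evaluated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CrunchShapesAnalogy {
    static Map<String, Long> countWords(List<String> words) {
        // "PCollection<String>": the plain list of words passed in.
        // "PTable<String, Long>" then "PGroupedTable": each word paired with 1,
        // grouped into a sorted map from key to all of its values.
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (String word : words) {
            grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1L);
        }

        // "combineValues": reduce the values of each key to a single value.
        Map<String, Long> counts = new TreeMap<>();
        for (Map.Entry<String, List<Long>> entry : grouped.entrySet()) {
            long sum = 0;
            for (long value : entry.getValue()) {
                sum += value;
            }
            counts.put(entry.getKey(), sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {big=2, data=1}
        System.out.println(countWords(Arrays.asList("big", "data", "big")));
    }
}
```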



Input Data - Source
Output Data - Target
Data Container Abstraction  - PCollection
Data Format and Serialization - POJOs and PTypes
Data Transformation - DoFn

Friday 16 January 2015

'Bigdata In Our Palm' is a technology blog where I share the latest trends in the big data industry. It is also a forum where I discuss the various challenges I have faced while working with the Hadoop ecosystem.