Wednesday 20 September 2017

HBase Memstore Flush - Part 2


Aim:


The aim of this blog is to discuss the scenarios that lead to memstore flushes in HBase.

Any put operation to HBase goes to the memstore (in memory). It is also written to the WAL by default. There is one memstore for each column family of each region hosted on a regionserver. When a certain threshold is reached, the memstore is flushed.

The thresholds fall into two main categories:
[A] Size based
[B] Time based

This blog focuses on time based memstore flushes. My previous blog (HBase Memstore Flush - Part 1) discusses size based memstore flushes.

Time based memstore flushes:


The memstore is also flushed periodically. The interval for time based memstore flushes is controlled by 'hbase.regionserver.optionalcacheflushinterval', set in hbase-site.xml. If it is not set, the default value of 3600000ms (1 hour) is used. Periodic memstore flushes help free up regionserver memory. However, frequent flushes of small memstores produce many small HFiles, which in turn trigger more minor compactions. Hence the parameter should be tuned to suit the application running on HBase. Setting 'hbase.regionserver.optionalcacheflushinterval' to zero or a negative value disables periodic memstore flushes.

Periodic memstore flushes were introduced as part of https://issues.apache.org/jira/browse/HBASE-5930

The following part of the HBase source code implements the periodic flush:

static class PeriodicMemstoreFlusher extends ScheduledChore {
    final HRegionServer server;
    final static int RANGE_OF_DELAY = 20000; //millisec
    final static int MIN_DELAY_TIME = 3000; //millisec
    public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
      super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
      this.server = server;
    }

    @Override
    protected void chore() {
      for (Region r : this.server.onlineRegions.values()) {
        if (r == null)
          continue;
        if (((HRegion)r).shouldFlush()) {
          FlushRequester requester = server.getFlushRequester();
          if (requester != null) {
            long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
            LOG.info(getName() + " requesting flush for region " +
              r.getRegionInfo().getRegionNameAsString() + " after a delay of " + randomDelay);
            //Throttle the flushes by putting a delay. If we don't throttle, and there
            //is a balanced write-load on the regions in a table, we might end up
            //overwhelming the filesystem with too many flushes at once.
            requester.requestDelayedFlush(r, randomDelay, false);
          }
        }
      }
    }
  }
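To make the throttling delay concrete, here is a minimal standalone sketch of the delay calculation used by the chore above. It is not HBase code: the class name FlushDelaySketch is hypothetical, and java.util.Random.nextInt(bound) is used as a stand-in on the assumption that RandomUtils.nextInt(n) likewise returns a value from 0 (inclusive) to n (exclusive). Under that assumption every requested flush is delayed by 3000ms to 22999ms.

```java
import java.util.Random;

// Hypothetical standalone sketch; it only mirrors the two constants
// and the delay formula from PeriodicMemstoreFlusher.
class FlushDelaySketch {
    static final int RANGE_OF_DELAY = 20000; // millisec, same value as in the chore
    static final int MIN_DELAY_TIME = 3000;  // millisec, same value as in the chore

    // Assumption: Random.nextInt(bound) stands in for RandomUtils.nextInt(bound);
    // it returns a value in [0, bound).
    static long randomDelay(Random random) {
        return random.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
    }

    public static void main(String[] args) {
        Random random = new Random();
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        // Sample many delays to observe the spread across regions.
        for (int i = 0; i < 100000; i++) {
            long delay = randomDelay(random);
            min = Math.min(min, delay);
            max = Math.max(max, delay);
        }
        System.out.println("smallest delay seen (ms): " + min);
        System.out.println("largest delay seen (ms): " + max);
    }
}
```

Because each region gets its own random delay, regions that become eligible in the same chore run do not all hit the filesystem at the same moment.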

The following is the definition of shouldFlush():

boolean shouldFlush() {
    // This is a rough measure.
    if (this.maxFlushedSeqId > 0
          && (this.maxFlushedSeqId + this.flushPerChanges < this.sequenceId.get())) {
      return true;
    }
    long modifiedFlushCheckInterval = flushCheckInterval;
    if (getRegionInfo().isMetaRegion() &&
        getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
      modifiedFlushCheckInterval = META_CACHE_FLUSH_INTERVAL;
    }
    if (modifiedFlushCheckInterval <= 0) { //disabled
      return false;
    }
    long now = EnvironmentEdgeManager.currentTime();
    //if we flushed in the recent past, we don't need to do again now
    if ((now - getEarliestFlushTimeForAllStores() < modifiedFlushCheckInterval)) {
      return false;
    }
    //since we didn't flush in the recent past, flush now if certain conditions
    //are met. Return true on first such memstore hit.
    for (Store s : getStores()) {
      if (s.timeOfOldestEdit() < now - modifiedFlushCheckInterval) {
        // we have an old enough edit in the memstore, flush
        return true;
      }
    }
    return false;
  }
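The time based part of this check can be tried out in isolation with a simplified model. The sketch below is not HBase code: the class PeriodicFlushCheck and its parameters are hypothetical stand-ins for the region state, it leaves out the sequence id check and the meta region special case, and it uses Long.MAX_VALUE to model a store with no unflushed edits.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the time based branch of shouldFlush().
class PeriodicFlushCheck {

    // flushCheckIntervalMs : value of hbase.regionserver.optionalcacheflushinterval
    // nowMs                : current time in milliseconds
    // earliestFlushTimeMs  : oldest "last flush" time across the region's stores
    // oldestEditTimesMs    : per store, time of its oldest unflushed edit
    //                        (Long.MAX_VALUE models an empty memstore in this sketch)
    static boolean shouldFlush(long flushCheckIntervalMs, long nowMs,
                               long earliestFlushTimeMs, List<Long> oldestEditTimesMs) {
        if (flushCheckIntervalMs <= 0) {
            return false; // zero or negative interval: periodic flush disabled
        }
        if (nowMs - earliestFlushTimeMs < flushCheckIntervalMs) {
            return false; // flushed in the recent past, nothing to do
        }
        for (long oldestEdit : oldestEditTimesMs) {
            if (oldestEdit < nowMs - flushCheckIntervalMs) {
                return true; // at least one store holds an old enough edit
            }
        }
        return false;
    }

    public static void main(String[] args) {
        long interval = 3600000L; // 1 hour, the default
        long now = 10000000L;
        // Edit older than one hour, no recent flush.
        System.out.println(shouldFlush(interval, now, 0L, Arrays.asList(1000000L)));
        // Only a recent edit in the memstore.
        System.out.println(shouldFlush(interval, now, 0L, Arrays.asList(9000000L)));
        // Periodic flush disabled.
        System.out.println(shouldFlush(-1L, now, 0L, Arrays.asList(1000000L)));
    }
}
```

Note that one old edit in any store is enough to request a flush for the region.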

'flushCheckInterval' is set from the following property:

this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,DEFAULT_CACHE_FLUSH_INTERVAL);

where 

public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL = "hbase.regionserver.optionalcacheflushinterval";
public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;

The periodic flush chore runs at the interval given by 'hbase.server.thread.wakefrequency'. The default value is 10000ms, so by default each regionserver checks its online regions for old edits every 10 seconds.
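Putting the three timings together gives a rough idea of how long an edit can stay in the memstore before a periodic flush is requested for it. The sketch below is only a back-of-the-envelope estimate: the class FlushLatencyEstimate is hypothetical, it assumes the largest random delay is 22999ms (RANGE_OF_DELAY - 1 + MIN_DELAY_TIME), and it ignores time spent in the flush queue and the duration of the flush itself.

```java
// Hypothetical helper for a rough upper bound; not part of HBase.
class FlushLatencyEstimate {

    // An edit becomes eligible once it is older than the flush interval,
    // is noticed on the next chore run (at most one wake period later),
    // and is then flushed after the random throttling delay.
    static long upperBoundMs(long flushIntervalMs, long wakeFrequencyMs, long maxDelayMs) {
        return flushIntervalMs + wakeFrequencyMs + maxDelayMs;
    }

    public static void main(String[] args) {
        long flushIntervalMs = 3600000L; // default optionalcacheflushinterval
        long wakeFrequencyMs = 10000L;   // default thread wake frequency
        long maxDelayMs = 22999L;        // assumed largest throttling delay
        System.out.println("approximate upper bound (ms): "
            + upperBoundMs(flushIntervalMs, wakeFrequencyMs, maxDelayMs));
    }
}
```

With the default values this comes to 3632999ms, which is about 33 seconds more than the configured one hour interval.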
