Wednesday 10 June 2020

Automating OnCall Summary Page Generation


This is the final part of a 3-part series discussing automating the generation of an on-call summary of PagerDuty incidents into a Notion page. In this post we will combine everything we did in the last 2 parts and schedule it using a Kubernetes (K8s) CronJob.

PagerDuty is designed to alert clients to disruptions and outages through machine learning and automation. Notion is a collaboration platform with markdown support that integrates kanban boards, tasks, wikis, and databases.

Let’s first create a Docker image. Assume the Python file for collecting incidents from PagerDuty (Part 1) is named get_pd_incidents.py and the Python file for creating the Notion page (Part 2) is named create_notion_page.py. A simple Dockerfile looks like this:

FROM python:3.7.1

# Install required dependencies
RUN pip3 install --upgrade pip
RUN pip3 install requests
RUN pip3 install pdpyras
RUN pip3 install notion

# Bundle app source
COPY get_pd_incidents.py /src/get_pd_incidents.py
COPY create_notion_page.py /src/create_notion_page.py

CMD ["python3", "/src/get_pd_incidents.py"]
 
Make sure both Python files are present in the same directory as the Dockerfile. Build the Docker image using:

docker build . -t <docker_hub>/on-call-summary:0.0.0
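If your cluster pulls images from a registry rather than a local Docker daemon, also push the tag (here <docker_hub> stands for your Docker Hub user or registry namespace):

docker push <docker_hub>/on-call-summary:0.0.0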

Now let’s schedule the application using a Kubernetes CronJob.

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: on-call-summary
spec:
  schedule: "0 0 * * SUN"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: on-call-summary
            image: <docker_hub>/on-call-summary:0.0.0
          restartPolicy: OnFailure

Save it to a file named on-call-summary-cron.yml and apply it using:

kubectl apply -f on-call-summary-cron.yml
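To confirm the CronJob was created and see its schedule, you can run:

kubectl get cronjob on-call-summary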


The complete application code can be found on my GitHub.

Tuesday 9 June 2020

Creating Notion Table/Database 


This is part 2 of a 3-part series discussing automating the generation of an on-call summary of PagerDuty incidents into a Notion page. In this post we will discuss creating a Notion page with a database that shows the alert summary. Part 1 of this series can be found here.

PagerDuty is designed to alert clients to disruptions and outages through machine learning and automation. Notion is a collaboration platform with markdown support that integrates kanban boards, tasks, wikis, and databases.

At the time of writing this post, Notion does not yet offer official API support. More details here.

But don’t worry, there is an unofficial Python API - https://pypi.org/project/notion/. It provides all the APIs we need. One caveat: since there is no official API support yet, there is no standard way of obtaining an API token. However, if you are logged into Notion, you can find the token (token_v2) by inspecting your browser.

Once you have obtained the token, we are ready to create a new Notion page.

from datetime import datetime
from notion.client import NotionClient
import notion.block

client = NotionClient(token_v2="XXXXXXXXXXXXXXXXX")
page = client.get_block("https://www.notion.so/yourorg/OnCallSummary-wfhwjfwcnccwlrhfow2486r9wn")
page2 = page.children.add_new(notion.block.PageBlock, icon="\U0001F9EF")

This will create a new page. Let’s now set the page title to today’s date.

page2.title = str(datetime.today().strftime('%Y-%m-%d'))

It’s time to create a table block inside the new page.

new_table = page2.children.add_new(notion.block.CollectionViewBlock)
new_table.collection = client.get_collection(
    client.create_record("collection", parent=new_table, schema=get_collection_schema())
)
new_table.views.add_new(view_type="table")

where get_collection_schema() corresponds to: 

def get_collection_schema():
    return {
        "count": {"name": "Count", "type": "number"},
        "action": {"name": "Remedies taken", "type": "text"},
        "title": {"name": "Alert Name", "type": "title"},
        "mttr": {"name": "MTTR", "type": "text"},
        "notes": {"name": "Other Notes", "type": "text"},
        "runbook": {"name": "Runbook", "type": "url"},
        "=d{|": {
            "name": "Service",
            "type": "select",
            "options": [
                {
                    "color": "green",
                    "id": "695667ab-c776-43d1-3420-27f5611fcaty",
                    "value": "Service 1",
                },
                {
                    "color": "yellow",
                    "id": "452c7016-ef57-445a-90a6-64afadfb042d",
                    "value": "Service 2",
                },
            ],
        },
    }

We now have a blank table. The next step is to populate it with data. (The incidentSummary dict is generated while pulling incidents from PagerDuty in Part 1.)

    total_alerts = 0
    for alert in incidentSummary:
        row = new_table.collection.add_row()
        row.alert = str(alert)
        row.count = incidentSummary.get(alert).get('count')
        total_alerts = total_alerts + incidentSummary.get(alert).get('count')
        # Mean time to resolve, in minutes, averaged over all occurrences of this alert
        mttr_min = round((incidentSummary.get(alert).get('time')/incidentSummary.get(alert).get('count'))/60)
        if mttr_min > 59:
            mttr_hrs = mttr_min/60
            if mttr_hrs >= 24:
                row.mttr = str(round(mttr_hrs/24)) + " days"
            else:
                row.mttr = str(round(mttr_hrs)) + " hrs"
        else:
            row.mttr = str(mttr_min) + " min"
        row.service = str(incidentSummary.get(alert).get('service'))
    new_table.title = "Total Alerts - " + str(total_alerts)


And voila!

The complete application code can be found on my GitHub. In the next part, we will discuss scheduling this application as a Kubernetes CronJob.

Monday 8 June 2020

Generating PagerDuty Incidents Summary


This is part 1 of a 3-part series discussing automating the generation of an on-call summary of PagerDuty incidents into a Notion page. In this post we will discuss a simple Python application that generates a PagerDuty incident summary.

PagerDuty is designed to alert clients to disruptions and outages through machine learning and automation. Notion is a collaboration platform with markdown support that integrates kanban boards, tasks, wikis, and databases.

We will make use of pdpyras, a low-level PagerDuty REST API client for Python.

The alert summary will include the list of incidents reported through PagerDuty for the last X days. The number of occurrences of each alert, the mean time to resolve (MTTR), and the service name are also included in the summary.

We need to generate a PagerDuty API token if we do not have one already; details can be found here. Also find the service IDs of the services for which you plan to generate the summary. An easy way to find a service ID is to look at the URL of the service you are interested in: for example, if the service URL ends in /PY1B8TY, then PY1B8TY is the service ID.


response = requests.get('https://api.pagerduty.com/incidents?since=' + since_time +
                        '&service_ids[]=' + PD_SERVICE_ID_1 + '&service_ids[]=' + PD_SERVICE_ID_2 +
                        '&limit=100', headers=headers, params=payload)

where since_time is calculated as

since_time = (datetime.now() - timedelta(days = X_DAYS)).strftime('%Y-%m-%dT%H:%M:%SZ')

where X_DAYS is the number of days for which we need to collect the summary.

headers contains the authorization token:

    headers = {
        'Authorization': 'Token token='+ PAGERDUTY_API_ACCESS_KEY,
        'Content-type': 'application/json',
    }

and payload contains the filter for resolved incidents:

    payload = {
        'status':'Resolved',
    }
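As an aside, since pdpyras is already a dependency, the same incident listing can also be done through its APISession, which adds the authorization header and handles pagination for you. A minimal sketch, using the same placeholder variables as above (note that the REST API's status filter parameter is statuses[]):

from pdpyras import APISession

session = APISession(PAGERDUTY_API_ACCESS_KEY)
incidents = session.list_all(
    'incidents',
    params={
        'since': since_time,
        'service_ids[]': [PD_SERVICE_ID_1, PD_SERVICE_ID_2],
        'statuses[]': ['resolved'],
    },
)

The rest of this post continues with the raw requests response.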

The response is then parsed and processed to gather alerts, counts, MTTR, etc.

    incidentSummary = {}
    for incident in responseJson['incidents']:
        # Strip the alerting prefix so repeats of the same alert group together
        alert = incident['title'].replace("[FIRING:1] ","")
        starttime = datetime.strptime(incident['created_at'], '%Y-%m-%dT%H:%M:%SZ')
        endtime = datetime.strptime(incident['last_status_change_at'], '%Y-%m-%dT%H:%M:%SZ')
        # Time to resolve = time between creation and the last status change (resolution)
        mttr = (endtime - starttime).total_seconds()
        if alert in incidentSummary:
            incidentSummary[alert]['count'] = incidentSummary[alert]['count'] + 1
            incidentSummary[alert]['time'] = incidentSummary[alert]['time'] + mttr
        else:
            service = "Service 1"
            id = incident['service']['id']
            if id == PD_SERVICE_ID_2:
                service = "Service 2"
            incidentSummary[alert] = {"count":1,"time": mttr,"service": service}
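Before handing this off to the Notion page builder in Part 2, a quick way to sanity-check the summary is to print it; a minimal sketch over the incidentSummary dict built above:

for alert, details in incidentSummary.items():
    # occurrence count, average MTTR in minutes, and owning service per alert
    avg_mttr_min = round((details['time'] / details['count']) / 60)
    print(alert, '-> count:', details['count'], 'avg MTTR (min):', avg_mttr_min, 'service:', details['service'])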

The complete application code can be found on my GitHub. In the next part, we will discuss generating the summary page in Notion.




Tuesday 14 May 2019


Spark Logging + Changing log level at runtime


This blog explains a way to change the logging level of a Spark application on the fly, without restarting the application. I will be using the log4j2 logger in the sample code.

Sample Scala application:

package com.jaal
import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.SparkSession

object ReadTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ReadTest").master("yarn").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    val sc = spark.sparkContext;
    import scala.io.Source
    val linesRead = spark.sparkContext.longAccumulator("LinesRead")

    var i = 0
    while(i < 25) {
      val loglevelFile = "/tmp/loglevel.out"
      var lev = ""
      for (line <- Source.fromFile(loglevelFile).getLines) {
        lev = line
      }
      val loglevelSeverityFile = "/tmp/loglevelSeverity.out"
      var levSer = 400
      for (line <- Source.fromFile(loglevelSeverityFile).getLines) {
        levSer = line.toInt
      }

      sc.parallelize(Seq(""), 100).foreachPartition(x => {
        import org.apache.logging.log4j.{LogManager, Level}
        import org.apache.logging.log4j.core.config.Configurator

        Configurator.setRootLevel(Level.forName(lev, levSer))
        Configurator.setLevel("streamingtest", Level.forName(lev, levSer))

        val log = LogManager.getRootLogger()
        log.info("STARTING EXECUTOR INFO LOGGING")
        log.warn("STARTING EXECUTOR WARN LOGGING")
        log.error("STARTING EXECUTOR ERROR LOGGING")
        log.fatal("STARTING EXECUTOR FATAL LOGGING")
        log.debug("STARTING EXECUTOR DEBUG LOGGING")
      })

      val in = spark.read.text("/tmp/test.txt")

      val inp = in.repartition(100)
      inp.show(false)
      linesRead.add(inp.count())
      val logger = LogManager.getLogger("streamingtest")

      import org.apache.spark.sql.functions.udf
      val uppUdf = udf { s: String => {
        val logger = LogManager.getLogger("streamingtest")
        logger.debug("Inside upp")
        logger.info("Inside upp")
        logger.warn("Inside upp")
        logger.error("Inside upp")
        logger.fatal("Inside upp")
        s.toUpperCase
      }
      }

      val upC = inp.withColumn("upper",uppUdf($"value"))
      upC.show(10)

      logger.info("AlwinInfo")
      logger.debug("AlwinDebug")
      logger.error("AlwinError")
      logger.fatal("AlwinFatal")
      Thread.sleep(10000)
      i = i+1
    }

    println(linesRead.value)
    println(linesRead.avg)
  }
}


The above application is enough to explain and test the concept. All it does is read a file and convert its contents to upper case. This logic runs inside a loop that terminates after 25 iterations, which gives us some breathing time to change the log level and verify that it has taken effect.

The magic happens inside the following code snippet:

      sc.parallelize(Seq(""), 100).foreachPartition(x => {
        import org.apache.logging.log4j.{LogManager, Level}
        import org.apache.logging.log4j.core.config.Configurator

        Configurator.setRootLevel(Level.forName(lev, levSer))
        Configurator.setLevel("streamingtest", Level.forName(lev, levSer))

        val log = LogManager.getRootLogger()
        log.info("STARTING EXECUTOR INFO LOGGING")
        log.warn("STARTING EXECUTOR WARN LOGGING")
        log.error("STARTING EXECUTOR ERROR LOGGING")
        log.fatal("STARTING EXECUTOR FATAL LOGGING")
        log.debug("STARTING EXECUTOR DEBUG LOGGING")
      })

The above code snippet sets the log level to ‘lev’ (with severity ‘levSer’) on every executor where this action is executed. You might have also noticed that the number of partitions for the action is 100. You will need to adjust this number based on the number of executors running in the cluster; a safe bet is to set it to 2 times the number of executors you have so that the action runs at least once on each of them. ‘lev’ and ‘levSer’ are read from local files as shown below:


      val loglevelFile = "/tmp/loglevel.out"
      var lev = ""
      for (line <- Source.fromFile(loglevelFile).getLines) {
        lev = line
      }
      val loglevelSeverityFile = "/tmp/loglevelSeverity.out"
      var levSer = 400
      for (line <- Source.fromFile(loglevelSeverityFile).getLines) {
        levSer = line.toInt
      }

So, in order to change the log level of the application, you just need to change the level inside these files. More details about log4j2 log levels can be found here: https://logging.apache.org/log4j/2.x/manual/customloglevels.html
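For example, to switch the whole application to DEBUG (whose standard log4j2 intLevel is 500), update both files on the node where the driver runs; the new level is picked up on the next loop iteration:

echo "DEBUG" > /tmp/loglevel.out
echo "500" > /tmp/loglevelSeverity.out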

Profiling Spark Applications


My daily work involves many Spark streaming/batch applications. As part of improving the stability of our existing system, we decided to profile our applications. Uber Engineering has open-sourced a useful Java profiler, which we used to profile Spark streaming/batch applications.

Details about the JVM profiler can be found in Uber's blog - https://eng.uber.com/jvm-profiler/.
In this post, I will focus on how I used it and a few useful insights we gathered from it.

Setup steps:

1. Clone and build JVM-Profiler locally (one time activity)

2. Restart the Spark application with the necessary profiler arguments added. For our profiling, I was interested in the following metrics:
    1. CPU use
    2. Memory use
    3. Method call duration
    4. Flamegraph



Hence, I added the following arguments:
--jars /root/jvm-profiler-1.0.0.jar --conf "spark.driver.extraJavaOptions=-javaagent:/root/jvm-profiler-1.0.0.jar=reporter=com.uber.profiling.reporters.FileOutputReporter,outputDir=/tmp/ProfilerOut,sampleInterval=2000,durationProfiling=com.jaal.method1" --conf "spark.executor.extraJavaOptions=-javaagent:/root/jvm-profiler-1.0.0.jar=reporter=com.uber.profiling.reporters.FileOutputReporter,outputDir=/tmp/ProfilerOut2,sampleInterval=2000,durationProfiling=com.jaal.method1"


3. In our case, we wanted the profiler to collect enough information and then terminate it. Allow the profiler to collect enough information, then turn off profiling by removing the extra arguments and redeploying the application.
The profiling information is collected every minute and written to the local directory of each executor/driver.

4. Copy the profiling information into S3. (You can copy it to HDFS or any other distributed file system like MapRFS; if the profiling information is small enough, you can also leave it on local disk.)
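As an illustration, the output directory on each node can be copied up with the AWS CLI (the bucket name and prefix are placeholders):

aws s3 cp /tmp/ProfilerOut s3://<your-bucket>/profiler-output/$(hostname)/ --recursive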

5. Use any framework/tool that can query JSON files. I used ‘Apache Drill’. 
For more details about Apache Drill:- https://drill.apache.org/

NOTE: To execute queries from the command line on your local machine:

1. Download and extract Apache Drill.
2. Go into the 'bin' directory and execute the following command:

./sqlline -u jdbc:drill:drillbit=<Drill Bit Hostname>:31010
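Once connected, Drill can query the JSON output directly through its dfs storage plugin; for example (the path below is illustrative and plays the role of <Path to CpuAndMemory.json> in the queries that follow):

select processUuid, processCpuLoad, heapMemoryTotalUsed from dfs.`/tmp/ProfilerOut/CpuAndMemory.json` limit 5;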


Information collected:

The following information was collected:

* CPU and Memory use of executors and driver (CpuAndMemory.json)
* Method duration and number of method calls (MethodDuration.json)
* Stacktrace information - can be converted to flamegraphs (Stacktrace.json)
* General process information (ProcessInfo.json)

A. CPU and Memory

‘CpuAndMemory.json’ provides some useful information such as:
* vmHWM memory used
* Heap memory used
* Non heap memory used
* Process CPU use
* System CPU use
* Code cache used
* Compressed class space used
* Garbage collection information:
    * Young GC / Full GC count
    * Time taken for Young GC and Full GC

B. Method Call Details

‘MethodDuration.json’ provides the following insights:
* Number of times the specified method was called
* Time taken for the method execution
* Maximum time taken by a method execution
* Average time for method execution

C. Stacktrace Information

Stack traces are collected at every specified interval (sampleInterval=2000, in milliseconds).
This output can then be converted into a flame graph, which gives useful insight into how CPU time is spent.
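Once the sampled stacks are collapsed into the folded "frame1;frame2;frame3 count" format, Brendan Gregg's FlameGraph tooling can render the SVG; a sketch, assuming the folded output has been saved as stacktrace.folded:

git clone https://github.com/brendangregg/FlameGraph
./FlameGraph/flamegraph.pl stacktrace.folded > flamegraph.svg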

D. Process Information
‘ProcessInfo.json’ provides very general process information like command used, Xmx value, etc.

Some useful conclusions from the profiling output obtained:

* We saw that the maximum process CPU use was around 20% and the maximum system CPU use was around 27%. This indicated that the CPU was never utilized to its maximum, or even half of it. This helped in reducing the number of cores given to each executor without impacting performance.

* The GC details showed that none of the executors were frequently going into aggressive full GC. Interestingly, vmHWM kept increasing while the Java heap stayed constant at ~2.2 GB. This indicated that there might be a memory leak and led to heap dump analysis of the process, which pointed to a native memory leak.

* The flame graph showed strange behavior where 'sun.nio.ch.EPollArrayWrapper.epollWait' was consuming an unusually high percentage of CPU time. This led to stack analysis of the process, which in turn showed more than 20K threads running in the process, 99% of them in 'sun.nio.ch.EPollArrayWrapper.epollWait'. Native memory analysis using NMT showed that the thread pool memory was ever increasing. This again pointed to a memory leak.


Few SQL queries that helped in output analysis:

Query 1: Get the maximum process CPU utilization per executor
select processUuid as executor, round(max(processCpuLoad) * 100,2) as max_process_CPU_load_percent from <Path to CpuAndMemory.json> group by processUuid;

Query 2: Get the maximum system CPU utilization per executor
select processUuid as executor, round(max(systemCpuLoad) * 100,2) as max_system_CPU_load_percent from <Path to CpuAndMemory.json> group by processUuid;

Query 3: Get the maximum vmHWM per executor
select processUuid as executor,round(max(cast(vmHWM as double)/1024/1024/1024), 3) as max_vmHWM_GB from <Path to CpuAndMemory.json> group by processUuid;

Query 4: Get the maximum heap memory per executor
select processUuid as executor, round(max(cast(heapMemoryTotalUsed as double)/1024/1024/1024),3) as max_HeapMemoryUsed_GB from <Path to CpuAndMemory.json> group by processUuid;

Query 5: Collect GC details
select processUuid as executor, max(gc[0].collectionCount) as youngGC_count, max(cast(gc[0].collectionTime as double)/1000) as youngGC_collection_time_sec, max(gc[1].collectionCount) as oldGC_count, max(cast(gc[1].collectionTime as double)/1000) as oldGC_collection_time_sec from <Path to CpuAndMemory.json> group by processUuid;

And many more!

Friday 12 October 2018

System Design - 'Here Now' service


Aim: 


Design a location service, where we can search for a user’s current location and also search for all users in a given location. Gaining insights about user location helps for better recommendations and predictive analysis.

Requirements:


1. The user should be able to check in their location.
2. Given a location (latitude/longitude), we should be able to find all users in the location.
3. The system should offer a real-time search experience with very low latency.
4. The service should support heavy search load.
5. The system should be durable and highly available.

Scaling Estimation:


1. 200 Million check-in places
2. 100 Million active users
3. 20 Million daily check-ins
4. 30% growth every year

Database Schema:


A. Details for every check-in point – Location Table (Stored in MySQL):
1. LUID – Location Unique ID
2. Name - Location Name
3. Longitude
4. Latitude
5. Category – Breakfast/Lunch/Dinner/Coffee & Tea/Nightlife/Things to do
6. Description - Small description of the place


Location Table

Column        Type           Key
LUID          Int            PK
Name          Varchar(32)
Longitude     Varchar(8)
Latitude      Varchar(8)
Category      Varchar(20)
Description   Varchar(300)

B. Details for every user – User Table (Stored in MySQL):
1. UUID – User Unique ID
2. Name – Name of the user
3. Email – email address of the user
4. DateOfBirth – Birth date of the user
5. CreationDate – Date when the user joined 
6. LastLogin – Last login information of the user

User Table

Column         Type          Key
UUID           Int           PK
Name           Varchar(20)
Email          Varchar(32)
DateOfBirth    datetime
CreationDate   datetime
LastLogin      datetime

System APIs:


1. Search users in a given location:
search(api_dev_key, location, radius_filter, max_number_of_results_to_return)

api_dev_key: Mandatory. Developer API key, used among other things for throttling access based on the allowed quota.
location: Mandatory. Location where the search is performed.
radius_filter: Optional. Specifies the search radius. By default the radius is 0 meters (only the exact location is searched).
max_number_of_results_to_return: Optional. Specifies the number of results to return. By default, all users are returned.


2. Check in to a location: 
checkin(user_id, location_uid, user_location_lattitude, user_location_longitude)


System Design:



1. MySQL solution:
The user-location information will be tracked in a ‘user_location_table’ in MySQL. We insert a new row with UUID, LUID, and the current timestamp when the user checks in for the first time. If the user updates their check-in location, we update the existing row.


user_location_table

Column            Type       Key
UUID              Int        PK
LUID              Int
RecordTimestamp   datetime

A. High-level query to get the location of a particular user:
select name from location_table where LUID in (select LUID from user_location_table where UUID = <UUID1> and RecordTimestamp > NOW() - INTERVAL 3 HOUR)

B. High-level query to get all users in a particular location:
select name from user_table where UUID in (select UUID from user_location_table where LUID = <LUID1> and RecordTimestamp > NOW() - INTERVAL 3 HOUR)

NOTE: If the data is huge, this will not return results in near real time.


2. Solution based on HBase (preferred solution):
We can use HBase to store the check-in location of a user and use the Time To Live (TTL) feature of HBase to delete a user's check-in 3 hours after it is recorded. The TTL can be set to 3 hours (10800 seconds).

HBase table creation:
create 'user_check_in_info', {NAME => 'c', TTL => 10800}

Table structure:
Row key – <UUID>
Column qualifier – check_in_location (constant string)
Column value – <LUID/Location name>

If the same user checks in to a different location within 3 hours of the previous check-in, we will update the same row with the new location. E.g.:

put 'user_check_in_info', 'UUID1', 'c:check_in_location', 'LUID1/Location name 1'

Now the user is tagged to location ‘LUID1’. If the user checks in to a different location, the row will be updated:

put 'user_check_in_info', 'UUID1', 'c:check_in_location', 'LUID2/Location name 2'

A. To query the location of a particular user: 
This is a simple ‘get’ operation in HBase:

get 'user_check_in_info', 'UUID1'

B. To query all users in a given location: 
Since the HBase row key is the UUID, getting all users at a given location would require scanning the entire HBase table and filtering for rows where check_in_location matches the required location. This is not optimal, and query latency will grow as the data grows. The solution can be improved by caching the data in memory; the next section discusses one such approach.

Faster retrieval using GRIDs:
Divide the whole map into small grids. Each grid is responsible for a group of locations that fall within a specified range of latitude and longitude. As a result, we can restrict our query to a few grids, which reduces the amount of data queried. With the given location and the optional radius, we can also find the neighboring grids of interest. Hence we can leverage this system not only to get the list of users at a particular location, but also the users near a given location.

The grid will be used to track the location of each user.


Deciding what the size of each grid should be:
We could pre-divide the map and assign a fixed range of latitude and longitude to each grid. But this solution has problems when certain areas are crowded (e.g., places like San Francisco or New York) while other areas are sparsely populated (e.g., somewhere in Antarctica!).

We can modify the solution to use dynamic grids, where a grid is decided based on the number of check-in places. For example, we create a new grid when the number of check-in places reaches, say, 500. In this way each grid holds a more uniform amount of data. This can be implemented using the quadtree data structure.

Implementation:
We will build a tree structure in which each node has four children - a QuadTree. Each grid, or node, holds details of the users present at the locations it covers. Once the number of check-in places in a node reaches the specified limit (in our case, 500), the node is split into four new children and the load is distributed; hence only the leaf nodes hold information about users. To find neighboring grids, we can connect all the leaf nodes using a doubly linked list. A minimal sketch of this structure follows.
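A minimal single-machine sketch of this dynamic QuadTree in Python (the class and method names are illustrative, the 500-place split threshold is the one assumed above, and the doubly linked list connecting leaf nodes for neighbor lookups is omitted):

MAX_PLACES_PER_NODE = 500  # split threshold assumed above

class QuadTreeNode:
    def __init__(self, min_lat, max_lat, min_lon, max_lon):
        self.bounds = (min_lat, max_lat, min_lon, max_lon)
        self.places = {}      # location_id -> (lat, lon)
        self.users = {}       # user_id -> (location_id, check_in_ts); latest check-in only
        self.children = None  # four QuadTreeNode children once this node splits

    def _contains(self, lat, lon):
        min_lat, max_lat, min_lon, max_lon = self.bounds
        return min_lat <= lat <= max_lat and min_lon <= lon <= max_lon

    def _child_for(self, lat, lon):
        return next(c for c in self.children if c._contains(lat, lon))

    def _split(self):
        min_lat, max_lat, min_lon, max_lon = self.bounds
        mid_lat, mid_lon = (min_lat + max_lat) / 2, (min_lon + max_lon) / 2
        self.children = [
            QuadTreeNode(min_lat, mid_lat, min_lon, mid_lon),
            QuadTreeNode(min_lat, mid_lat, mid_lon, max_lon),
            QuadTreeNode(mid_lat, max_lat, min_lon, mid_lon),
            QuadTreeNode(mid_lat, max_lat, mid_lon, max_lon),
        ]
        # Push the existing places and check-ins down to the new leaves.
        for luid, (lat, lon) in self.places.items():
            self._child_for(lat, lon).places[luid] = (lat, lon)
        for uuid, (luid, ts) in self.users.items():
            lat, lon = self.places[luid]
            self._child_for(lat, lon).users[uuid] = (luid, ts)
        self.places, self.users = {}, {}

    def add_place(self, luid, lat, lon):
        if self.children:
            return self._child_for(lat, lon).add_place(luid, lat, lon)
        self.places[luid] = (lat, lon)
        if len(self.places) > MAX_PLACES_PER_NODE:
            self._split()

    def check_in(self, uuid, luid, lat, lon, ts):
        # Assumes add_place(luid, ...) was called for this location beforehand.
        if self.children:
            return self._child_for(lat, lon).check_in(uuid, luid, lat, lon, ts)
        self.users[uuid] = (luid, ts)

    def users_at(self, luid, lat, lon):
        # Only the leaf covering (lat, lon) needs to be searched.
        if self.children:
            return self._child_for(lat, lon).users_at(luid, lat, lon)
        return [uuid for uuid, (l, _) in self.users.items() if l == luid]

Moving a user from one location to another would, as described below, first remove them from the leaf holding their previous check-in and then add them to the leaf covering the new one.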





How do we rebuild the QuadTree if the system crashes?
We scan through all the records in the HBase ‘user_check_in_info’ table. Based on the location obtained for each user, we decide in which grid the user information should be kept.

Memory requirement: We will store the location ID (8 bytes), latitude (8 bytes), longitude (8 bytes), user ID (8 bytes), username (20 bytes), and check-in timestamp (8 bytes) in the grid. Since we assume 20 million check-ins every day, the total memory requirement will be:

(8 + 8 + 8 + 8 + 20 + 8) * 20M bytes = 60 * 20M bytes = ~1145 MB

We can add a few MB of memory to hold the pointers (parent to children) required to build the QuadTree. In total, let's assume we need roughly 1.5 GB of memory to hold the entire tree. This can easily fit into the memory of a single machine. However, for better performance and scalability, we can shard the data across multiple servers based on a hash function of the location ID (discussed in the data sharding section).

What happens when a user creates a new check-in?

Whenever a new check-in is added by a user, we need to insert it into HBase as well as into the QuadTree. Since the QuadTree is distributed among different servers, we first need to find the grid/server for the new check-in and then add it there.

What happens when a user checks in to location X and then checks in to location Y?

We should track only the user's latest check-in. Our HBase table is designed so that it keeps only the latest check-in; however, we need to do the same in the QuadTree. Hence, when we update HBase, we first get the user's previous location from HBase (in this case X) and then record the new location (in this case Y). We can do a ‘get’ operation on HBase to retrieve the previous location of the user:

get 'user_check_in_info', 'UUID1'

With the help of the previous location obtained from the HBase table, we modify the QuadTree to delete the user from location X and add the user to location Y.

How do we filter out users whose check-in is older than 3 hours?

Since we have set a TTL, data is purged from the HBase table 3 hours after check-in. However, the data still resides in the QuadTree data structure. Hence, when we retrieve the list of users for a particular location, we need to filter out users whose check-in is older than 3 hours. This can be implemented using the check-in timestamp stored in the QuadTree and the current timestamp (rule: current timestamp - check-in timestamp < 3 hours).

Housekeeping: We can run a background chore that periodically checks for records that have expired (> 3 hours) and cleans them up. This is similar to the compaction activity performed by HBase and is required for disk space cleanup and better overall performance.

Data Sharding:


user_check_in_info HBase table: We can shard data based on the hash of the UUID. Ideally the UUID is a monotonically increasing integer generated by MySQL, so we need to hash it to get a uniform distribution of data across servers. Based on the hash function, we can also create a pre-split HBase table, which helps with performance and load balancing.

users_in_location QuadTree: One solution is to use a hash function to map each location ID to a server that stores the location and the users present there. However, this can cause hot-spotting if a location is crowded with users; for example, if there is an event at Levi's Stadium in Santa Clara, there might be a large number of check-ins to the stadium. Hence, partitioning based on the hash of the UUID is a better solution. This comes with the overhead of extra computation when we need to fetch the users at a particular location: since the data is sharded by UUID and can reside on any of the QuadTree servers, we need to send the query to all servers. Once each server responds with its set of users, an aggregation service combines the results from all the individual servers and presents the complete output.

Fault Tolerance & Replication:

We will need replicas of these servers so that if the primary dies, a secondary can take over. We will store this data in a distributed file system like HDFS/MapRFS; by default, these file systems have a replication factor of 3. We can also use SSDs for faster I/O.

Disk space required to store data:


For user details:
8 (UUID) + 20 (Name) + 32 (Email) + 8 (DateOfBirth) + 8 (CreationDate) + 8 (LastLogin) = 84 bytes per user

For 100 million users:
84 * 100M = ~8 GB

For location details:
8 (LUID) + 32 (Name) + 8 (Longitude) + 8 (Latitude) + 20 (Category) + 300 (Description) = 374 bytes per location

For 200 million locations:
374 * 200M = ~70 GB

For the user-location tracking table:
8 bytes (UUID) + 8 bytes (LUID) + 4 bytes (HBase metadata) = 20 bytes per check-in

For 20 million check-ins:
20 * 20M = ~380 MB

Hence, we need ~80 GB of storage space for the first year. If we design the system for 10 years with year-over-year growth of 30%, we will need ~1110 GB of storage. With a replication factor of 3, the total storage required will be 1110 * 3 GB = 3330 GB.

High Level System Design: