Tuesday 14 May 2019


Spark Logging + Changing the log level at runtime


This blog explains a way to change the logging level of a Spark application on the fly, without restarting it. The sample code uses the log4j2 logger.

Sample scala application:

package com.jaal
import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.SparkSession

object ReadTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ReadTest").master("yarn").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    val sc = spark.sparkContext
    import scala.io.Source
    val linesRead = spark.sparkContext.longAccumulator("LinesRead")

    var i = 0
    while(i < 25) {
      val loglevelFile = "/tmp/loglevel.out"
      var lev = ""
      for (line <- Source.fromFile(loglevelFile).getLines) {
        lev = line
      }
      val loglevelSeverityFile = "/tmp/loglevelSeverity.out"
      var levSer = 400
      for (line <- Source.fromFile(loglevelSeverityFile).getLines) {
        levSer = line.toInt
      }

      sc.parallelize(Seq(""), 100).foreachPartition(x => {
        import org.apache.logging.log4j.{LogManager, Level}
        import org.apache.logging.log4j.core.config.Configurator

        Configurator.setRootLevel(Level.forName(lev, levSer))
        Configurator.setLevel("streamingtest", Level.forName(lev, levSer))

        val log = LogManager.getRootLogger()
        log.info("STARTING EXECUTOR INFO LOGGING")
        log.warn("STARTING EXECUTOR WARN LOGGING")
        log.error("STARTING EXECUTOR ERROR LOGGING")
        log.fatal("STARTING EXECUTOR FATAL LOGGING")
        log.debug("STARTING EXECUTOR DEBUG LOGGING")
      })

      val in = spark.read.text("/tmp/test.txt")

      val inp = in.repartition(100)
      inp.show(false)
      linesRead.add(inp.count())
      val logger = LogManager.getLogger("streamingtest")

      import org.apache.spark.sql.functions.udf
      val uppUdf = udf { s: String => {
        val logger = LogManager.getLogger("streamingtest")
        logger.debug("Inside upp")
        logger.info("Inside upp")
        logger.warn("Inside upp")
        logger.error("Inside upp")
        logger.fatal("Inside upp")
        s.toUpperCase
      }
      }

      val upC = inp.withColumn("upper",uppUdf($"value"))
      upC.show(10)

      logger.info("AlwinInfo")
      logger.debug("AlwinDebug")
      logger.error("AlwinError")
      logger.fatal("AlwinFatal")
      Thread.sleep(10000)
      i = i+1
    }

    println(linesRead.value)
    println(linesRead.avg)
  }
}


The above application is good enough to explain and test the concept. All it does is read a file and convert its contents to upper case. This logic runs inside a loop that terminates after 25 iterations, which gives us some breathing time to change the log level and verify that the change has taken effect.
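
Since the sample calls the log4j2 API directly, the log4j2 jars need to be on the classpath when the application is built and submitted. A minimal build.sbt sketch (the versions below are illustrative assumptions; match them to your cluster):

// Hypothetical build.sbt sketch -- versions are illustrative, adjust to your environment.
// The sample uses the log4j2 API directly, so both log4j-api and log4j-core are needed.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.0" % "provided",
  "org.apache.logging.log4j" % "log4j-api" % "2.11.2",
  "org.apache.logging.log4j" % "log4j-core" % "2.11.2"
)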

The magic happens inside the following code snippet:

      sc.parallelize(Seq(""), 100).foreachPartition(x => {
        import org.apache.logging.log4j.{LogManager, Level}
        import org.apache.logging.log4j.core.config.Configurator

        Configurator.setRootLevel(Level.forName(lev, levSer))
        Configurator.setLevel("streamingtest", Level.forName(lev, levSer))

        val log = LogManager.getRootLogger()
        log.info("STARTING EXECUTOR INFO LOGGING")
        log.warn("STARTING EXECUTOR WARN LOGGING")
        log.error("STARTING EXECUTOR ERROR LOGGING")
        log.fatal("STARTING EXECUTOR FATAL LOGGING")
        log.debug("STARTING EXECUTOR DEBUG LOGGING")
      })

The above code snippet sets the log level to ‘lev’ with severity ‘levSer’ on every executor where this action runs. You might have also noticed that the number of partitions for this action is 100. You will need to adjust this number based on the number of executors running in the cluster; a safe bet is to set it to 2 times the number of executors you have, so that the action runs at least once on every executor (see the sketch after the next snippet). ‘lev’ and ‘levSer’ are read from local files as shown below:


      val loglevelFile = "/tmp/loglevel.out"
      var lev = ""
      for (line <- Source.fromFile(loglevelFile).getLines) {
        lev = line
      }
      val loglevelSeverityFile = "/tmp/loglevelSeverity.out"
      var levSer = 400
      for (line <- Source.fromFile(loglevelSeverityFile).getLines) {
        levSer = line.toInt
      }
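
As mentioned above, the partition count of 100 is just a stand-in for "2x the number of executors". A rough sketch of deriving it from the executors currently registered with the driver (my own addition, not part of the original application; getExecutorMemoryStatus also counts the driver, hence the "- 1"):

// Sketch: size the dummy RDD so the level-setting action runs on every executor at least once.
val executorCount = math.max(spark.sparkContext.getExecutorMemoryStatus.size - 1, 1)
val partitions = executorCount * 2
sc.parallelize(Seq(""), partitions).foreachPartition { _ =>
  // same Configurator.setRootLevel / Configurator.setLevel calls as in the snippet above
}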

So in this case, to change the log level of the running application you just need to change the level inside these two files (/tmp/loglevel.out and /tmp/loglevelSeverity.out) on the driver node. More details about log4j2 log levels can be found here: https://logging.apache.org/log4j/2.x/manual/customloglevels.html
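
For example, to switch the running application to DEBUG, overwrite the two files on the driver node; the standard log4j2 intLevel for DEBUG is 500. A small sketch using plain Java NIO (you could just as easily edit the files by hand):

import java.nio.file.{Files, Paths}

// Hypothetical helper, not part of the original application:
// the driver picks these values up on its next loop iteration.
Files.write(Paths.get("/tmp/loglevel.out"), "DEBUG".getBytes)
Files.write(Paths.get("/tmp/loglevelSeverity.out"), "500".getBytes)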

Profiling Spark application


My day-to-day work involves many Spark streaming/batch applications. As part of improving the stability of our existing system, we decided to profile our applications. Uber Engineering has open sourced a useful Java profiler, which we used to profile our Spark streaming/batch applications.

Details about the JVM profiler can be found in UBER's blog - https://eng.uber.com/jvm-profiler/
In this blog, I will focus on how I used it and a few useful insights we gathered from it.

Setup steps:

1. Clone and build JVM-Profiler locally (one-time activity)

2. Restart the Spark application with the necessary profiler arguments added. For our profiling, I was interested in the following metrics:
    1. CPU use
    2. Memory use
    3. Method call duration
    4. Flamegraph



Hence, I added the following arguments:
--jars /root/jvm-profiler-1.0.0.jar --conf "spark.driver.extraJavaOptions=-javaagent:/root/jvm-profiler-1.0.0.jar=reporter=com.uber.profiling.reporters.FileOutputReporter,outputDir=/tmp/ProfilerOut,sampleInterval=2000,durationProfiling=com.jaal.method1" --conf "spark.executor.extraJavaOptions=-javaagent:/root/jvm-profiler-1.0.0.jar=reporter=com.uber.profiling.reporters.FileOutputReporter,outputDir=/tmp/ProfilerOut2,sampleInterval=2000,durationProfiling=com.jaal.method1"


3. In our case, we wanted the profiler to collect enough information and then be turned off.
Allow the profiler to run long enough to collect the information you need, then turn profiling off by removing the extra arguments and re-deploying the application.
The profiling information is collected every minute and written to the local output directory of each executor/driver.

4. Copy the profiling information to S3. (You can copy it to HDFS or any other distributed file system such as MapR-FS. If the profiling output is small enough, you can also leave it on the local disk.)

5. Use any framework/tool that can query JSON files. I used ‘Apache Drill’.
For more details about Apache Drill: https://drill.apache.org/

NOTE: To execute queries from the command line on your local machine, go into Drill's 'bin' directory and then execute the following command:

./sqlline -u jdbc:drill:drillbit=<Drill Bit Hostname>:31010


Information collected:

The following information was collected:

* CPU and Memory use of executors and driver (CpuAndMemory.json)
* Method duration and number of method calls (MethodDuration.json)
* Stacktrace information - can be converted to flamegraphs (Stacktrace.json)
* General process information (ProcessInfo.json)

A. CPU and Memory

‘CpuAndMemory.json’ provides some useful information such as:
* vmHWM memory used
* Heap memory used
* Non heap memory used
* Process CPU use
* System CPU use
* Code cache used
* Compressed class space used
* Garbage collection information:
    * Young GC / Full GC count
    * Time taken for Young GC and Full GC

B. Method Call Details

‘MethodDuration.json’ provides the following insights:
* Number of times the specified method was called
* Time taken for the method execution
* Maximum time taken by a method execution
* Average time for method execution

C. Stacktrace Information

Stacktraces are collected at every specified interval (sampleInterval=2000, i.e. every 2000 ms).
This output can then be converted into a flamegraph, which gives useful insight into how CPU time is being spent.

D. Process Information
‘ProcessInfo.json’ provides very general process information like command used, Xmx value, etc.

Some useful conclusions from the profiling output:

* We saw that the maximum process CPU utilization was around 20% and the maximum system CPU utilization around 27%. This indicated that the CPU was never utilized to its maximum, or even to half of it. This helped us reduce the number of cores given to each executor without impacting performance.

* The GC details showed that none of the executors were going into aggressive full GC frequently. Interestingly, the vmHWM kept increasing while the Java heap stayed constant at ~2.2 GB. This indicated that there might be a memory leak, and it led to a heap dump analysis of the process, which showed that we had a native memory leak.

* The flame graph showed strange behavior where 'sun.nio.ch.EPollArrayWrapper.epollWait' was consuming an unusually high percentage of CPU time. This led to a stack analysis of the process, which in turn showed more than 20K threads running in the process, 99% of them sitting in 'sun.nio.ch.EPollArrayWrapper.epollWait'. Native memory analysis using NMT (Native Memory Tracking) showed that the thread pool memory was ever increasing. This again pointed to a memory leak issue.


A few SQL queries that helped in analyzing the output:

Query 1: Get the maximum process CPU utilization per executor
select processUuid as executor, round(max(processCpuLoad) * 100,2) as max_process_CPU_load_percent from <Path to CpuAndMemory.json> group by processUuid;

Query 2: Get the maximum system CPU utilization per executor
select processUuid as executor, round(max(systemCpuLoad) * 100,2) as max_system_CPU_load_percent from <Path to CpuAndMemory.json> group by processUuid;

Query 3: Get the maximum vmHWM per executor
select processUuid as executor,round(max(cast(vmHWM as double)/1024/1024/1024), 3) as max_vmHWM_GB from <Path to CpuAndMemory.json> group by processUuid;

Query 4: Get the maximum heap memory per executor
select processUuid as executor, round(max(cast(heapMemoryTotalUsed as double)/1024/1024/1024),3) as max_HeapMemoryUsed_GB from <Path to CpuAndMemory.json> group by processUuid;

Query 5: Collect GC details
select processUuid as executor, max(gc[0].collectionCount) as youngGC_count, max(cast(gc[0].collectionTime as double)/1000) as youngGC_collection_time_sec, max(gc[1].collectionCount) as oldGC_count, max(cast(gc[1].collectionTime as double)/1000) as oldGC_collection_time_sec from <Path to CpuAndMemory.json> group by processUuid;

And many more!
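
If Drill is not handy, the same JSON output can be analyzed with Spark itself. This is a hedged sketch, not from the original analysis; the path is illustrative, so point it at wherever the profiler output was copied, and add .option("multiLine", "true") if the files contain JSON arrays rather than one JSON object per line:

// Read the profiler output and run the same kind of aggregation as Query 1 and Query 2.
val cpuAndMemory = spark.read.json("s3a://my-bucket/ProfilerOut/CpuAndMemory.json")
cpuAndMemory.createOrReplaceTempView("cpu_and_memory")

spark.sql("""
  SELECT processUuid AS executor,
         ROUND(MAX(processCpuLoad) * 100, 2) AS max_process_cpu_load_percent,
         ROUND(MAX(systemCpuLoad) * 100, 2) AS max_system_cpu_load_percent
  FROM cpu_and_memory
  GROUP BY processUuid
""").show(false)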