Tuesday 14 May 2019


Spark Logging: Changing the Log Level at Runtime


This post explains one way to change the log level of a running Spark application on the fly, without restarting it. The sample code uses the log4j2 logger.

Sample Scala application:

package com.jaal
import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.SparkSession

object ReadTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ReadTest").master("yarn").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    val sc = spark.sparkContext
    import scala.io.Source
    val linesRead = sc.longAccumulator("LinesRead")

    var i = 0
    while(i < 25) {
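      // Level name (for example DEBUG); the last line of the file wins.
      val loglevelFile = "/tmp/loglevel.out"
      var lev = ""
      val levSource = Source.fromFile(loglevelFile)
      try {
        for (line <- levSource.getLines) lev = line.trim
      } finally levSource.close()

      // Integer severity of the level (for example 500 for DEBUG).
      val loglevelSeverityFile = "/tmp/loglevelSeverity.out"
      var levSer = 400
      val levSerSource = Source.fromFile(loglevelSeverityFile)
      try {
        for (line <- levSerSource.getLines) levSer = line.trim.toInt
      } finally levSerSource.close()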

      sc.parallelize(Seq(""), 100).foreachPartition(x => {
        import org.apache.logging.log4j.{LogManager, Level}
        import org.apache.logging.log4j.core.config.Configurator

        Configurator.setRootLevel(Level.forName(lev, levSer))
        Configurator.setLevel("streamingtest", Level.forName(lev, levSer))

        val log = LogManager.getRootLogger()
        log.info("STARTING EXECUTOR INFO LOGGING")
        log.warn("STARTING EXECUTOR WARN LOGGING")
        log.error("STARTING EXECUTOR ERROR LOGGING")
        log.fatal("STARTING EXECUTOR FATAL LOGGING")
        log.debug("STARTING EXECUTOR DEBUG LOGGING")
      })

      val in = spark.read.text("/tmp/test.txt")

      val inp = in.repartition(100)
      inp.show(false)
      linesRead.add(inp.count())
      val logger = LogManager.getLogger("streamingtest")

      import org.apache.spark.sql.functions.udf
      // The UDF runs on the executors, so it looks up its own logger there
      // instead of capturing the driver-side one.
      val uppUdf = udf { s: String =>
        val logger = LogManager.getLogger("streamingtest")
        logger.debug("Inside upp")
        logger.info("Inside upp")
        logger.warn("Inside upp")
        logger.error("Inside upp")
        logger.fatal("Inside upp")
        s.toUpperCase
      }

      val upC = inp.withColumn("upper", uppUdf($"value"))
      upC.show(10)

      logger.info("AlwinInfo")
      logger.debug("AlwinDebug")
      logger.error("AlwinError")
      logger.fatal("AlwinFatal")
      Thread.sleep(10000)
      i = i+1
    }

    println(linesRead.value)
    println(linesRead.avg)
  }
}


The application above is enough to explain and test the concept. All it does is read a file and convert its contents to upper case. This logic runs inside a loop that terminates after 25 iterations, which gives us some breathing room to change the log level and verify that the change has taken effect.

The magic happens inside the following code snippet:

      sc.parallelize(Seq(""), 100).foreachPartition(x => {
        import org.apache.logging.log4j.{LogManager, Level}
        import org.apache.logging.log4j.core.config.Configurator

        Configurator.setRootLevel(Level.forName(lev, levSer))
        Configurator.setLevel("streamingtest", Level.forName(lev, levSer))

        val log = LogManager.getRootLogger()
        log.info("STARTING EXECUTOR INFO LOGGING")
        log.warn("STARTING EXECUTOR WARN LOGGING")
        log.error("STARTING EXECUTOR ERROR LOGGING")
        log.fatal("STARTING EXECUTOR FATAL LOGGING")
        log.debug("STARTING EXECUTOR DEBUG LOGGING")
      })

The snippet above sets the log level on every executor that runs one of its tasks, using ‘lev’ as the level name and ‘levSer’ as its integer severity. You might have noticed that the action uses 100 partitions. Adjust this number to the number of executors running in your cluster. A reasonable starting point is twice the number of executors, which makes it likely that at least one task runs on each of them, although Spark does not guarantee this. ‘lev’ and ‘levSer’ are read from local files on the driver, as shown below:


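      // Level name (for example DEBUG); the last line of the file wins.
      val loglevelFile = "/tmp/loglevel.out"
      var lev = ""
      val levSource = Source.fromFile(loglevelFile)
      try {
        for (line <- levSource.getLines) lev = line.trim
      } finally levSource.close()

      // Integer severity of the level (for example 500 for DEBUG).
      val loglevelSeverityFile = "/tmp/loglevelSeverity.out"
      var levSer = 400
      val levSerSource = Source.fromFile(loglevelSeverityFile)
      try {
        for (line <- levSerSource.getLines) levSer = line.trim.toInt
      } finally levSerSource.close()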
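
The two read loops above could be folded into one small helper. The following is only a sketch using the Scala standard library: the names LevelFiles, lastLine and readLevel are hypothetical, and falling back to INFO / 400 when a file is missing, blank or not a number is an assumption on my part, not something the original application does.

```scala
import scala.io.Source
import scala.util.Try

object LevelFiles {
  // Returns the last non-blank line of the file, trimmed, or None if the
  // file cannot be read or contains only blank lines.
  def lastLine(path: String): Option[String] =
    Try {
      val source = Source.fromFile(path)
      try {
        // toList forces the whole file to be read before the source is closed.
        source.getLines().map(_.trim).filter(_.nonEmpty).toList.lastOption
      } finally {
        source.close()
      }
    }.getOrElse(None)

  // Assumed defaults: level name INFO and severity 400 when a value is
  // missing or the severity is not a valid integer.
  def readLevel(nameFile: String, severityFile: String): (String, Int) = {
    val name = lastLine(nameFile).getOrElse("INFO")
    val severity = lastLine(severityFile)
      .flatMap(s => Try(s.toInt).toOption)
      .getOrElse(400)
    (name, severity)
  }
}
```

Inside the loop this could be called as val (lev, levSer) = LevelFiles.readLevel("/tmp/loglevel.out", "/tmp/loglevelSeverity.out"), which also gives the closure immutable values to capture.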

To change the log level of the running application, you just need to edit the values in these two files; the new level is picked up on the next iteration of the loop. More details about log4j2 log levels can be found here: https://logging.apache.org/log4j/2.x/manual/customloglevels.html
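
For reference, the standard log4j2 levels carry the integer values OFF 0, FATAL 100, ERROR 200, WARN 300, INFO 400, DEBUG 500, TRACE 600 and ALL Int.MaxValue. As a rough sketch, the two files could be kept consistent by a small writer like the one below. The object SetLogLevel and its writeLevel method are hypothetical names, and the file names simply mirror the ones used in the application.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

object SetLogLevel {
  // Integer values of the standard log4j2 levels (lower means more severe).
  val standardLevels: Map[String, Int] = Map(
    "OFF" -> 0, "FATAL" -> 100, "ERROR" -> 200, "WARN" -> 300,
    "INFO" -> 400, "DEBUG" -> 500, "TRACE" -> 600, "ALL" -> Int.MaxValue)

  // Writes the level name and its matching integer value into
  // loglevel.out and loglevelSeverity.out under the given directory.
  def writeLevel(name: String, dir: String = "/tmp"): Unit = {
    val upper = name.trim.toUpperCase
    val severity = standardLevels.getOrElse(upper,
      throw new IllegalArgumentException(s"Not a standard log4j2 level: $name"))
    Files.write(Paths.get(dir, "loglevel.out"),
      upper.getBytes(StandardCharsets.UTF_8))
    Files.write(Paths.get(dir, "loglevelSeverity.out"),
      severity.toString.getBytes(StandardCharsets.UTF_8))
  }
}
```

Calling SetLogLevel.writeLevel("DEBUG") on the machine where the driver runs would write DEBUG and 500, so the executors switch to DEBUG the next time the loop reads the files.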
