Spark HiveThriftServer2Listener Source Code

  • 2022-10-20

Spark HiveThriftServer2Listener code

File path: /sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.thriftserver.ui

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.hive.service.server.HiveServer2

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
import org.apache.spark.scheduler._
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity}

/**
 * A SparkListener that tracks Thrift server sessions and statements for the UI
 * and stops the HiveThriftServer2 when the application ends (i.e. in sc.stop).
 */
private[thriftserver] class HiveThriftServer2Listener(
    kvstore: ElementTrackingStore,
    sparkConf: SparkConf,
    server: Option[HiveServer2],
    live: Boolean = true) extends SparkListener with Logging {

  private val sessionList = new ConcurrentHashMap[String, LiveSessionData]()
  private val executionList = new ConcurrentHashMap[String, LiveExecutionData]()

  private val (retainedStatements: Int, retainedSessions: Int) = {
    (sparkConf.get(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT),
      sparkConf.get(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT))
  }

  // How often to update live entities. -1 means "never update" when replaying applications,
  // meaning only the last write will happen. For live applications, this avoids a few
  // operations that we can live without when rapidly processing incoming events.
  private val liveUpdatePeriodNs = if (live) sparkConf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L

  // Returns true if this listener has no live data. Exposed for tests only.
  private[thriftserver] def noLiveData(): Boolean = {
    sessionList.isEmpty && executionList.isEmpty
  }

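  // Retention triggers: when the store holds more SessionInfo or ExecutionInfo
  // entries than the configured limits, evict the oldest finished entries
  // (see cleanupSession and cleanupExecutions below).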
  kvstore.addTrigger(classOf[SessionInfo], retainedSessions) { count =>
    cleanupSession(count)
  }

  kvstore.addTrigger(classOf[ExecutionInfo], retainedStatements) { count =>
    cleanupExecutions(count)
  }

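  // When replaying an event log (live == false), the periodic live updates are
  // skipped, so write out any still-tracked entities when the store is flushed
  // at the end of the replay.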
  kvstore.onFlush {
    if (!live) {
      flush((entity: LiveEntity) => updateStoreWithTriggerEnabled(entity))
    }
  }

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    if (live) {
      server.foreach(_.stop())
    }
  }

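  // Correlate Spark jobs with Thrift server operations: each LiveExecutionData
  // records the job group id under which its statement runs, so the group id
  // carried by a starting job identifies the operation that launched it.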
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    val properties = jobStart.properties
    if (properties != null) {
      val groupId = properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
      if (groupId != null) {
        updateJobDetails(jobStart.jobId.toString, groupId)
      }
    }
  }

  private def updateJobDetails(jobId: String, groupId: String): Unit = {
    val execList = executionList.values().asScala.filter(_.groupId == groupId).toSeq
    if (execList.nonEmpty) {
      execList.foreach { exec =>
        exec.jobId += jobId
        updateLiveStore(exec)
      }
    } else {
      // Event reordering may cause a JobStart event to arrive after the
      // corresponding execution's end event (see SPARK-27019). In that case the
      // execution may no longer be in executionList, so recover it from the
      // store, attach the job id, and write it back.
      val storeExecInfo = KVUtils.viewToSeq(
        kvstore.view(classOf[ExecutionInfo]), Int.MaxValue)(_.groupId == groupId)
      storeExecInfo.foreach { exec =>
        val liveExec = getOrCreateExecution(exec.execId, exec.statement, exec.sessionId,
          exec.startTimestamp, exec.userName)
        liveExec.jobId += jobId
        updateStoreWithTriggerEnabled(liveExec)
        executionList.remove(liveExec.execId)
      }
    }
  }

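  // Dispatch the Thrift-server-specific events posted on the listener bus to
  // the dedicated handlers below.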
  override def onOtherEvent(event: SparkListenerEvent): Unit = {
    event match {
      case e: SparkListenerThriftServerSessionCreated => onSessionCreated(e)
      case e: SparkListenerThriftServerSessionClosed => onSessionClosed(e)
      case e: SparkListenerThriftServerOperationStart => onOperationStart(e)
      case e: SparkListenerThriftServerOperationParsed => onOperationParsed(e)
      case e: SparkListenerThriftServerOperationCanceled => onOperationCanceled(e)
      case e: SparkListenerThriftServerOperationTimeout => onOperationTimeout(e)
      case e: SparkListenerThriftServerOperationError => onOperationError(e)
      case e: SparkListenerThriftServerOperationFinish => onOperationFinished(e)
      case e: SparkListenerThriftServerOperationClosed => onOperationClosed(e)
      case _ => // Ignore
    }
  }

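  // Session lifecycle: live sessions are tracked in sessionList; on close, the
  // final state is flushed to the store and the in-memory entry is dropped.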
  private def onSessionCreated(e: SparkListenerThriftServerSessionCreated): Unit = {
    val session = getOrCreateSession(e.sessionId, e.startTime, e.ip, e.userName)
    sessionList.put(e.sessionId, session)
    updateLiveStore(session)
  }

  private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit =
    Option(sessionList.get(e.sessionId)) match {
      case Some(sessionData) =>
        sessionData.finishTimestamp = e.finishTime
        updateStoreWithTriggerEnabled(sessionData)
        sessionList.remove(e.sessionId)
      case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}")
    }

  private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit = {
    val executionData = getOrCreateExecution(
      e.id,
      e.statement,
      e.sessionId,
      e.startTime,
      e.userName)

    executionData.state = ExecutionState.STARTED
    executionList.put(e.id, executionData)
    executionData.groupId = e.groupId
    updateLiveStore(executionData)

    Option(sessionList.get(e.sessionId)) match {
      case Some(sessionData) =>
        sessionData.totalExecution += 1
        updateLiveStore(sessionData)
      case None => logWarning(s"onOperationStart called with unknown session id: ${e.sessionId}." +
        s"Regardless, the operation has been registered.")
    }
  }

  private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit =
    Option(executionList.get(e.id)) match {
      case Some(executionData) =>
        executionData.executePlan = e.executionPlan
        executionData.state = ExecutionState.COMPILED
        updateLiveStore(executionData)
      case None => logWarning(s"onOperationParsed called with unknown operation id: ${e.id}")
    }

  private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit =
    Option(executionList.get(e.id)) match {
      case Some(executionData) =>
        executionData.finishTimestamp = e.finishTime
        executionData.state = ExecutionState.CANCELED
        updateLiveStore(executionData)
      case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}")
    }

  private def onOperationTimeout(e: SparkListenerThriftServerOperationTimeout): Unit =
    Option(executionList.get(e.id)) match {
      case Some(executionData) =>
        executionData.finishTimestamp = e.finishTime
        executionData.state = ExecutionState.TIMEDOUT
        updateLiveStore(executionData)
      case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}")
    }

  private def onOperationError(e: SparkListenerThriftServerOperationError): Unit =
    Option(executionList.get(e.id)) match {
      case Some(executionData) =>
        executionData.finishTimestamp = e.finishTime
        executionData.detail = e.errorMsg
        executionData.state = ExecutionState.FAILED
        updateLiveStore(executionData)
      case None => logWarning(s"onOperationError called with unknown operation id: ${e.id}")
    }

  private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit =
    Option(executionList.get(e.id)) match {
      case Some(executionData) =>
        executionData.finishTimestamp = e.finishTime
        executionData.state = ExecutionState.FINISHED
        updateLiveStore(executionData)
      case None => logWarning(s"onOperationFinished called with unknown operation id: ${e.id}")
    }

  private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit =
    Option(executionList.get(e.id)) match {
      case Some(executionData) =>
        executionData.closeTimestamp = e.closeTime
        executionData.state = ExecutionState.CLOSED
        updateStoreWithTriggerEnabled(executionData)
        executionList.remove(e.id)
      case None => logWarning(s"onOperationClosed called with unknown operation id: ${e.id}")
    }

  // Update the store in both live and replay modes. Triggers are always enabled
  // here, so entities exceeding the retention threshold are cleaned up on write.
  def updateStoreWithTriggerEnabled(entity: LiveEntity): Unit = {
    entity.write(kvstore, System.nanoTime(), checkTriggers = true)
  }

  // Update the store only for live applications, rate-limited by
  // liveUpdatePeriodNs. If the trigger flag is set, entities exceeding the
  // retention threshold are cleaned up on write.
  def updateLiveStore(entity: LiveEntity, trigger: Boolean = false): Unit = {
    val now = System.nanoTime()
    if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) {
      entity.write(kvstore, now, checkTriggers = trigger)
    }
  }

  /** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. */
  private def flush(entityFlushFunc: LiveEntity => Unit): Unit = {
    sessionList.values.asScala.foreach(entityFlushFunc)
    executionList.values.asScala.foreach(entityFlushFunc)
  }

  private def getOrCreateSession(
      sessionId: String,
      startTime: Long,
      ip: String,
      username: String): LiveSessionData = {
    sessionList.computeIfAbsent(sessionId,
      (_: String) => new LiveSessionData(sessionId, startTime, ip, username))
  }

  private def getOrCreateExecution(
      execId: String, statement: String,
      sessionId: String, startTimestamp: Long,
      userName: String): LiveExecutionData = {
    executionList.computeIfAbsent(execId,
      (_: String) => new LiveExecutionData(execId, statement, sessionId, startTimestamp, userName))
  }

  private def cleanupExecutions(count: Long): Unit = {
    val countToDelete = calculateNumberToRemove(count, retainedStatements)
    if (countToDelete <= 0L) {
      return
    }
    val view = kvstore.view(classOf[ExecutionInfo]).index("finishTime").first(0L)
    val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j =>
      j.finishTimestamp != 0
    }
    toDelete.foreach { j => kvstore.delete(j.getClass, j.execId) }
  }

  private def cleanupSession(count: Long): Unit = {
    val countToDelete = calculateNumberToRemove(count, retainedSessions)
    if (countToDelete <= 0L) {
      return
    }
    val view = kvstore.view(classOf[SessionInfo]).index("finishTime").first(0L)
    val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j =>
      j.finishTimestamp != 0L
    }

    toDelete.foreach { j => kvstore.delete(j.getClass, j.sessionId) }
  }

  /**
   * Remove at least (retainedSize / 10) items to reduce friction. Because tracking may be done
   * asynchronously, this method may return 0 in case enough items have been deleted already.
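   * For example, with retainedSize = 200 and dataSize = 230 this removes
   * math.max(200 / 10, 230 - 200) = 30 entries.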
   */
  private def calculateNumberToRemove(dataSize: Long, retainedSize: Long): Long = {
    if (dataSize > retainedSize) {
      math.max(retainedSize / 10L, dataSize - retainedSize)
    } else {
      0L
    }
  }
}

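// Mutable, in-memory state of a single statement execution. doUpdate() snapshots
// it into an immutable ExecutionInfo, which is what gets persisted to the store.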
private[thriftserver] class LiveExecutionData(
    val execId: String,
    val statement: String,
    val sessionId: String,
    val startTimestamp: Long,
    val userName: String) extends LiveEntity {

  var finishTimestamp: Long = 0L
  var closeTimestamp: Long = 0L
  var executePlan: String = ""
  var detail: String = ""
  var state: ExecutionState.Value = ExecutionState.STARTED
  val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
  var groupId: String = ""

  override protected def doUpdate(): Any = {
    new ExecutionInfo(
      execId,
      statement,
      sessionId,
      startTimestamp,
      userName,
      finishTimestamp,
      closeTimestamp,
      executePlan,
      detail,
      state,
      jobId,
      groupId)
  }
}

private[thriftserver] class LiveSessionData(
    val sessionId: String,
    val startTimeStamp: Long,
    val ip: String,
    val username: String) extends LiveEntity {

  var finishTimestamp: Long = 0L
  var totalExecution: Int = 0

  override protected def doUpdate(): Any = {
    new SessionInfo(
      sessionId,
      startTimeStamp,
      ip,
      username,
      finishTimestamp,
      totalExecution)
  }
}
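
For context on how such a listener plugs in: an application can attach a SparkListener to the listener bus via SparkContext.addSparkListener (a developer API). The sketch below is a minimal, hypothetical analogue of the onJobStart path above, not part of Spark itself: it reads the job group id from a starting job's properties using the literal key "spark.jobGroup.id" (the value behind the private SparkContext.SPARK_JOB_GROUP_ID), the same mechanism HiveThriftServer2Listener uses to map jobs back to statements. The names JobGroupLogger and JobGroupLoggerDemo are illustrative.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Hypothetical listener: logs which job group each job belongs to, mirroring
// the groupId-based correlation in HiveThriftServer2Listener.onJobStart.
class JobGroupLogger extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    val properties = jobStart.properties
    if (properties != null) {
      // "spark.jobGroup.id" is the key behind SparkContext.SPARK_JOB_GROUP_ID,
      // which is private[spark] and therefore spelled out here.
      val groupId = properties.getProperty("spark.jobGroup.id")
      if (groupId != null) {
        println(s"Job ${jobStart.jobId} started in group $groupId")
      }
    }
  }
}

object JobGroupLoggerDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("job-group-demo").setMaster("local[2]"))
    sc.addSparkListener(new JobGroupLogger)
    // setJobGroup is effectively what HiveThriftServer2 does per statement.
    sc.setJobGroup("demo-group", "illustrate job group propagation")
    sc.parallelize(1 to 10).count()
    sc.stop()
  }
}

Run locally, this should print one line per job started inside the group, e.g. "Job 0 started in group demo-group".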
