spark ConsoleProgressBar 源码
spark ConsoleProgressBar 代码
文件路径:/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ui
import java.util.{Timer, TimerTask}
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.UI._
import org.apache.spark.status.api.v1.StageData
/**
* ConsoleProgressBar shows the progress of stages in the next line of the console. It poll the
* status of active stages from the app state store periodically, the progress bar will be showed
* up after the stage has ran at least 500ms. If multiple stages run in the same time, the status
* of them will be combined together, showed in one line.
*/
private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
// Carriage return
private val CR = '\r'
// Update period of progress bar, in milliseconds
private val updatePeriodMSec = sc.getConf.get(UI_CONSOLE_PROGRESS_UPDATE_INTERVAL)
// Delay to show up a progress bar, in milliseconds
private val firstDelayMSec = 500L
// The width of terminal
private val TerminalWidth = sys.env.getOrElse("COLUMNS", "80").toInt
private var lastFinishTime = 0L
private var lastUpdateTime = 0L
private var lastProgressBar = ""
// Schedule a refresh thread to run periodically
private val timer = new Timer("refresh progress", true)
timer.schedule(new TimerTask {
override def run(): Unit = {
refresh()
}
}, firstDelayMSec, updatePeriodMSec)
/**
* Try to refresh the progress bar in every cycle
*/
private def refresh(): Unit = synchronized {
val now = System.currentTimeMillis()
if (now - lastFinishTime < firstDelayMSec) {
return
}
val stages = sc.statusStore.activeStages()
.filter { s => now - s.submissionTime.get.getTime() > firstDelayMSec }
if (stages.length > 0) {
show(now, stages.take(3)) // display at most 3 stages in same time
}
}
/**
* Show progress bar in console. The progress bar is displayed in the next line
* after your last output, keeps overwriting itself to hold in one line. The logging will follow
* the progress bar, then progress bar will be showed in next line without overwrite logs.
*/
private def show(now: Long, stages: Seq[StageData]): Unit = {
val width = TerminalWidth / stages.size
val bar = stages.map { s =>
val total = s.numTasks
val header = s"[Stage ${s.stageId}:"
val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]"
val w = width - header.length - tailer.length
val bar = if (w > 0) {
val percent = w * s.numCompleteTasks / total
(0 until w).map { i =>
if (i < percent) "=" else if (i == percent) ">" else " "
}.mkString("")
} else {
""
}
header + bar + tailer
}.mkString("")
// only refresh if it's changed OR after 1 minute (or the ssh connection will be closed
// after idle some time)
if (bar != lastProgressBar || now - lastUpdateTime > 60 * 1000L) {
System.err.print(CR + bar + CR)
lastUpdateTime = now
}
lastProgressBar = bar
}
/**
* Clear the progress bar if showed.
*/
private def clear(): Unit = {
if (!lastProgressBar.isEmpty) {
System.err.printf(CR + " " * TerminalWidth + CR)
lastProgressBar = ""
}
}
/**
* Mark all the stages as finished, clear the progress bar if showed, then the progress will not
* interweave with output of jobs.
*/
def finishAll(): Unit = synchronized {
clear()
lastFinishTime = System.currentTimeMillis()
}
/**
* Tear down the timer thread. The timer thread is a GC root, and it retains the entire
* SparkContext if it's not terminated.
*/
def stop(): Unit = timer.cancel()
}
相关信息
相关文章
0
赞
- 所属分类: 前端技术
- 本文标签:
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦