博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于案例贯通Spark Streaming流计算框架运行源码6
阅读量:6249 次
发布时间:2019-06-22

本文共 9618 字,大约阅读时间需要 32 分钟。

hot3.png

先贴下案例源码

import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Durations, StreamingContext}object StreamingWordCountSelfScala {  def main(args: Array[String]) {    val sparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("StreamingWordCountSelfScala")    val ssc = new StreamingContext(sparkConf, Durations.seconds(5)) // 每5秒收割一次数据    val lines = ssc.socketTextStream("localhost", 9999) // 监听 本地9999 socket 端口    val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // flat map 后 reduce    words.print() // 打印结果    ssc.start() // 启动    ssc.awaitTermination()    ssc.stop(true)  }}

已经从源码分析到将Receiver作为RDD提交给Spark,本文将聚焦于Receiver在Spark 集群中执行的运行时。

  1. 提交给DAGScheduler,返回异步执行等待器JobWaiter。

  2. 返回SimpleFutureAction对象。

// SparkContext.scala line 1980def submitJob[T, U, R](    rdd: RDD[T], // 基于Receiver的RDD,见 ReceiverTracker.scala line 583    processPartition: Iterator[T] => U, // 此参数为 startReceiverFunc ,见 ReceiverTracker.scala line 564    partitions: Seq[Int], // 此处传入的是 Seq(0),见 ReceiverTracker.scala line 595    resultHandler: (Int, U) => Unit, // 调用时传入的是 (_, _) => Unit,因此 resultHandler = (_, _) => Unit,啥也没干。见 ReceiverTracker.scala line 595    resultFunc: => R): SimpleFutureAction[R] = // 调用时是 (_, _) => Unit,因此 resultHandler = (),啥也没干。见 ReceiverTracker.scala line 595{  assertNotStopped()  val cleanF = clean(processPartition) // 加工  val callSite = getCallSite  val waiter = dagScheduler.submitJob( // 提交给DAGScheduler,返回JobWaiter类型对象,执行结果等待器    rdd,     (context: TaskContext, iter: Iterator[T]) => cleanF(iter), // 构建同构方法签名    partitions,    callSite,    resultHandler,    localProperties.get)   new SimpleFutureAction(waiter, resultFunc)}

看下DAGScheduler.submitJob的内容

// DAGScheduler.scala line 543/** * Submit an action job to the scheduler. * * @param rdd target RDD to run tasks on * @param func a function to run on each partition of the RDD * @param partitions set of partitions to run on; some jobs may not want to compute on all *   partitions of the target RDD, e.g. for operations like first() * @param callSite where in the user program this job was called * @param resultHandler callback to pass each result to * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name * * @return a JobWaiter object that can be used to block until the job finishes executing *         or can be used to cancel the job. * * @throws IllegalArgumentException when partitions ids are illegal */def submitJob[T, U](    rdd: RDD[T], // 基于Receiver的RDD,见 ReceiverTracker.scala line 583    func: (TaskContext, Iterator[T]) => U,    partitions: Seq[Int], // 此处传入的是 Seq(0),见 ReceiverTracker.scala line 595    callSite: CallSite,    resultHandler: (Int, U) => Unit, // 调用时传入的是 (_, _) => Unit,因此 resultHandler = (_, _) => Unit,啥也没干。见 ReceiverTracker.scala line 595    properties: Properties): JobWaiter[U] = {  // Check to make sure we are not launching a task on a partition that does not exist.  val maxPartitions = rdd.partitions.length  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>    throw new IllegalArgumentException(      "Attempting to access a non-existent partition: " + p + ". " +        "Total number of partitions: " + maxPartitions)  }  val jobId = nextJobId.getAndIncrement() // 获取JobID  if (partitions.size == 0) { // partitions = Seq(0) 因此,size==0 => false    // Return immediately if the job is running 0 tasks    return new JobWaiter[U](this, jobId, 0, resultHandler)  }  assert(partitions.size > 0)  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]  // 此时 方法签名已经将输出弱化了,输出什么已经不重要了。  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) // 实例化JobWaiter,因为基于不同的调度模式,可能会排队。异步等待来接收结果。  eventProcessLoop.post(JobSubmitted( // 提交JobSubmitted类型的case class给eventProcessLoop    jobId, rdd, func2, partitions.toArray, callSite, waiter,    SerializationUtils.clone(properties)))  waiter // 返回结果}

了解下这个事件的定义

看下JobSubmitted这个case class。继承自

// DAGSchedulerEvent.scala line 39/** A result-yielding job was submitted on a target RDD */private[scheduler] case class JobSubmitted(    jobId: Int,    finalRDD: RDD[_],    func: (TaskContext, Iterator[_]) => _,    partitions: Array[Int],    callSite: CallSite,    listener: JobListener,    properties: Properties = null)  extends DAGSchedulerEvent
// DAGSchedulerEvent.scala line 30/** * Types of events that can be handled by the DAGScheduler. The DAGScheduler uses an event queue * architecture where any thread can post an event (e.g. a task finishing or a new job being * submitted) but there is a single "logic" thread that reads these events and takes decisions. * This greatly simplifies synchronization. */private[scheduler] sealed trait DAGSchedulerEvent

此时将event post到DAGSchedulerEventLoop中的eventQueue。这个对象是在SparkContext创建的时候实例化的。这个属于Spark Core的内容了。后续给出Spark Core的流程源码解析,请关注。为了延续上下文,简单介绍下。

实例化SparkContext时,再构造中也同时构造了DAGScheduler

// SparkContext.scala line 525_dagScheduler = new DAGScheduler(this)

DAGScheduler在实例化时,实例化了DAGSchedulerEventProcessLoop

// DAGScheduler.scala line 184private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

看下DAGSchedulerEventProcessLoop的实例化过程。DAGSchedulerEventProcessLoop继承自EventLoop

// DAGScheduler.scala line 1588private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging

父类EventLoop是一个抽象类,构造很关键。

  1. 实例化一个LinkedBlockingDeque的阻塞队列

  2. 有一个,只有一个线程,不停的串行的从队列中的取出一个事件,具体如何处理。有兴趣的读者可以看DAGScheduler.scala line 1605 doOnReceive

  3. 定义了onStart方法,供子类重写,以便在start前被调用

  4. 定义了往队列中加入内容的post方法

DAGSchedulerEventProcessLoop实例化完成后,EventLoop中的线程并没有启动。

// EventLoop.scala line 34/** * An event loop to receive events from the caller and process all events in the event thread. It * will start an exclusive event thread to process all events. * * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can * handle events in time to avoid the potential OOM. */private[spark] abstract class EventLoop[E](name: String) extends Logging {  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]() // 有一个阻塞的队列,实现是LinkedBlockingDeque  private val stopped = new AtomicBoolean(false)  private val eventThread = new Thread(name) {    setDaemon(true)    override def run(): Unit = {      try {        while (!stopped.get) {          val event = eventQueue.take()          try {            onReceive(event)          } catch {            case NonFatal(e) => {              try {                onError(e)              } catch {                case NonFatal(e) => logError("Unexpected error in " + name, e)              }            }          }        }      } catch {        case ie: InterruptedException => // exit even if eventQueue is not empty        case NonFatal(e) => logError("Unexpected error in " + name, e)      }    }  }  def start(): Unit = {    if (stopped.get) {      throw new IllegalStateException(name + " has already been stopped")    }    // Call onStart before starting the event thread to make sure it happens before onReceive    onStart()    eventThread.start()  }  def stop(): Unit = {    if (stopped.compareAndSet(false, true)) {      eventThread.interrupt()      var onStopCalled = false      try {        eventThread.join()        // Call onStop after the event thread exits to make sure onReceive happens before onStop        onStopCalled = true        onStop()      } catch {        case ie: InterruptedException =>          Thread.currentThread().interrupt()          if (!onStopCalled) {            // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since            // it's already called.            onStop()          }      }    } else {      // Keep quiet to allow calling `stop` multiple times.    }  }  /**   * Put the event into the event queue. The event thread will process it later.   */  def post(event: E): Unit = {    eventQueue.put(event)  }  /**   * Return if the event thread has already been started but not yet stopped.   */  def isActive: Boolean = eventThread.isAlive  ...  // 其他方法  /** * Invoked when `start()` is called but before the event thread starts. */  protected def onStart(): Unit = {}    // 其他方法}

真正启动线程是在DAGScheduler的构造中。隐藏的很深。

// DAGScheduler.scala line 1585eventProcessLoop.start()

至此,DAGScheduler中的事件消费线程启动了。

 

让我们再回到提交任务至Spark的场景。任务提交时,已经将JobWaiter的句柄也传递过去了。此时,直接返回JobWaiter的对象。

 

当处理到JobSubmitted类型的任务时,

// EventLoop.scala line 43override def run(): Unit = {  try {    while (!stopped.get) {      val event = eventQueue.take() // 取到的是JobSubmitted case class      try {        onReceive(event)      } catch {        case NonFatal(e) => {          try {            onError(e)          } catch {            case NonFatal(e) => logError("Unexpected error in " + name, e)          }        }      }    }  } catch {    case ie: InterruptedException => // exit even if eventQueue is not empty    case NonFatal(e) => logError("Unexpected error in " + name, e)  }}

追踪

// DAGScheduler.scala line 1596/** * The main event loop of the DAG scheduler. */override def onReceive(event: DAGSchedulerEvent): Unit = {  val timerContext = timer.time()  try {    doOnReceive(event)  } finally {    timerContext.stop()  }}

深入

// DAGScheduler.scala line 1605private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)  ...  // 其他 case class 的处理}

按照常理,此处应该有广告!

且听分解。

 

转载于:https://my.oschina.net/corleone/blog/672100

你可能感兴趣的文章
接口测试-python
查看>>
python使用hbase
查看>>
我太水了~
查看>>
Mysql-proxy中的lua脚本编程(一)
查看>>
SY-SUBRC 的含义【转】
查看>>
仓库管理系统用例建模
查看>>
转换数字为人民币大写金额
查看>>
Python爬虫之爬取西刺免费IP并保存到MySQL
查看>>
PostgreSQL的进程结构
查看>>
[HBase_2] HBase数据模型
查看>>
Android之Sqlite数据库
查看>>
高并发编程-CountDownLatch深入解析
查看>>
Sublime 中文标题乱码
查看>>
世界上最幸福的职业-鉴黄师
查看>>
asp.net 10 Cookie & Session
查看>>
[置顶]C# 邮件发送方法【NetMail方式】
查看>>
一个数据库系统的笔试题
查看>>
使用Form个性化修改标准Form的LOV
查看>>
第二阶段冲刺06
查看>>
六、input框中的数字(金额)只能输入正整数
查看>>