First, the example source code:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

object StreamingWordCountSelfScala {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("StreamingWordCountSelfScala")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5)) // harvest data every 5 seconds
    val lines = ssc.socketTextStream("localhost", 9999) // listen on local socket port 9999
    val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // flatMap, then reduce by key
    words.print() // print the results
    ssc.start() // start the streaming context
    ssc.awaitTermination()
    ssc.stop(true)
  }
}
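For the stream to have input, something must be writing lines to port 9999 before the job starts; `nc -lk 9999` is the usual choice. As a purely illustrative alternative (this helper is not part of the original example), a minimal Scala stand-in could look like this:

// Hypothetical test helper, not part of the example above: serves one line per second
// on port 9999 so that socketTextStream has something to read.
import java.io.PrintWriter
import java.net.ServerSocket

object ToySocketSource {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(9999)
    val client = server.accept() // block until socketTextStream connects
    val out = new PrintWriter(client.getOutputStream, true) // autoFlush = true
    while (true) {
      out.println("hello spark hello streaming") // words for the running count
      Thread.sleep(1000)
    }
  }
}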
The previous posts analyzed the source code up to the point where the Receiver is wrapped in an RDD and submitted to Spark. This post focuses on the runtime that executes that Receiver on the Spark cluster. The submission happens in two steps:
- Submit the job to the DAGScheduler, which returns a JobWaiter, an asynchronous waiter for the execution result.
- Wrap that waiter in a SimpleFutureAction and return it.
// SparkContext.scala line 1980
def submitJob[T, U, R](
    rdd: RDD[T], // the Receiver-based RDD, see ReceiverTracker.scala line 583
    processPartition: Iterator[T] => U, // this is startReceiverFunc, see ReceiverTracker.scala line 564
    partitions: Seq[Int], // Seq(0) is passed in, see ReceiverTracker.scala line 595
    resultHandler: (Int, U) => Unit, // the call site passes (_, _) => Unit, so resultHandler does nothing, see ReceiverTracker.scala line 595
    resultFunc: => R): SimpleFutureAction[R] = // the call site passes (), so resultFunc does nothing either, see ReceiverTracker.scala line 595
{
  assertNotStopped()
  val cleanF = clean(processPartition) // clean the closure for serialization
  val callSite = getCallSite
  val waiter = dagScheduler.submitJob( // submit to the DAGScheduler; returns a JobWaiter, the waiter for the execution result
    rdd,
    (context: TaskContext, iter: Iterator[T]) => cleanF(iter), // adapt cleanF to the signature DAGScheduler expects
    partitions,
    callSite,
    resultHandler,
    localProperties.get)
  new SimpleFutureAction(waiter, resultFunc)
}
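For context, piecing together the parameter annotations above, the call site in ReceiverTracker.scala line 595 passes its arguments roughly as follows (a paraphrased sketch, not a verbatim excerpt):

// Sketch of the ReceiverTracker call site, reconstructed from the annotations above:
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
  receiverRDD,       // the RDD wrapping the single Receiver (line 583)
  startReceiverFunc, // runs the receiver inside the task (line 564)
  Seq(0),            // one partition => one task => one receiver
  (_, _) => Unit,    // resultHandler: does nothing
  ()                 // resultFunc: does nothing
)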
Now look inside DAGScheduler.submitJob:
// DAGScheduler.scala line 543
/**
 * Submit an action job to the scheduler.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 *   partitions of the target RDD, e.g. for operations like first()
 * @param callSite where in the user program this job was called
 * @param resultHandler callback to pass each result to
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 *
 * @return a JobWaiter object that can be used to block until the job finishes executing
 *         or can be used to cancel the job.
 *
 * @throws IllegalArgumentException when partitions ids are illegal
 */
def submitJob[T, U](
    rdd: RDD[T], // the Receiver-based RDD, see ReceiverTracker.scala line 583
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int], // Seq(0) is passed in, see ReceiverTracker.scala line 595
    callSite: CallSite,
    resultHandler: (Int, U) => Unit, // (_, _) => Unit is passed in, so resultHandler does nothing, see ReceiverTracker.scala line 595
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement() // allocate the job ID
  if (partitions.size == 0) { // partitions = Seq(0), so size == 0 is false
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] // the signature is weakened here; what the function outputs no longer matters
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) // create the JobWaiter; depending on the scheduling mode the job may queue, so the result is awaited asynchronously
  eventProcessLoop.post(JobSubmitted( // post a JobSubmitted case class to eventProcessLoop
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter // return the waiter
}
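Worked through with the actual arguments: the Receiver RDD has exactly one partition, so maxPartitions = 1; partitions = Seq(0) contains no p with p >= 1 || p < 0, so find returns None and no exception is thrown. And since partitions.size is 1, the zero-task early return is skipped: the job proceeds with exactly one task, which is exactly one receiver.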
Let's look at how this event is defined. JobSubmitted is a case class extending DAGSchedulerEvent:
// DAGSchedulerEvent.scala line 39
/** A result-yielding job was submitted on a target RDD */
private[scheduler] case class JobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null) extends DAGSchedulerEvent
// DAGSchedulerEvent.scala line 30
/**
 * Types of events that can be handled by the DAGScheduler. The DAGScheduler uses an event queue
 * architecture where any thread can post an event (e.g. a task finishing or a new job being
 * submitted) but there is a single "logic" thread that reads these events and takes decisions.
 * This greatly simplifies synchronization.
 */
private[scheduler] sealed trait DAGSchedulerEvent
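The sealed trait is a deliberate design choice: all subtypes must live in the same source file, so the single consumer thread can pattern-match over the full event hierarchy and the compiler can check the match for exhaustiveness. A minimal standalone sketch of the same pattern (hypothetical names, not Spark code):

// Toy illustration of a sealed event hierarchy with a single dispatch point:
sealed trait SchedulerEvent
case class JobArrived(jobId: Int) extends SchedulerEvent
case object Shutdown extends SchedulerEvent

object ToyDispatcher {
  def handle(event: SchedulerEvent): Unit = event match {
    case JobArrived(id) => println(s"handling job $id")
    case Shutdown       => println("shutting down")
    // no default case needed: the compiler warns if a sealed subtype is unhandled
  }
}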
At this point the event has been posted to the eventQueue inside the DAGSchedulerEventProcessLoop. That object is instantiated when the SparkContext is created, which is really Spark Core territory; a dedicated Spark Core source walkthrough will follow, so stay tuned. To keep the thread of this post intact, here is a brief tour.
When SparkContext is instantiated, its constructor also builds the DAGScheduler:
// SparkContext.scala line 525
_dagScheduler = new DAGScheduler(this)
When the DAGScheduler is instantiated, it in turn instantiates a DAGSchedulerEventProcessLoop:
// DAGScheduler.scala line 184
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
Now look at how DAGSchedulerEventProcessLoop is constructed; it extends EventLoop:
// DAGScheduler.scala line 1588
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging
The parent class EventLoop is abstract, and its construction is the crucial part. As the listing below shows, it:
- creates a blocking queue, implemented as a LinkedBlockingDeque;
- runs one thread, and only one, that keeps taking events off the queue and handling them serially; for what that handling actually does, interested readers can look at doOnReceive at DAGScheduler.scala line 1605;
- defines an onStart hook for subclasses to override, invoked when start() is called but before the event thread begins running;
- defines a post method that puts events onto the queue.
Note that once DAGSchedulerEventProcessLoop has been constructed, the thread inside EventLoop has not yet been started.
// EventLoop.scala line 34
/**
 * An event loop to receive events from the caller and process all events in the event thread. It
 * will start an exclusive event thread to process all events.
 *
 * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can
 * handle events in time to avoid the potential OOM.
 */
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]() // a blocking queue, implemented as a LinkedBlockingDeque

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) => {
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
            }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      var onStopCalled = false
      try {
        eventThread.join()
        // Call onStop after the event thread exits to make sure onReceive happens before onStop
        onStopCalled = true
        onStop()
      } catch {
        case ie: InterruptedException =>
          Thread.currentThread().interrupt()
          if (!onStopCalled) {
            // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
            // it's already called.
            onStop()
          }
      }
    } else {
      // Keep quiet to allow calling `stop` multiple times.
    }
  }

  /**
   * Put the event into the event queue. The event thread will process it later.
   */
  def post(event: E): Unit = {
    eventQueue.put(event)
  }

  /**
   * Return if the event thread has already been started but not yet stopped.
   */
  def isActive: Boolean = eventThread.isAlive

  ... // other methods

  /**
   * Invoked when `start()` is called but before the event thread starts.
   */
  protected def onStart(): Unit = {}

  // other methods
}
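To make the concurrency contract concrete, here is a self-contained toy version of the same pattern (illustrative only, with hypothetical names): any thread may post, a single daemon thread consumes serially, so the handler itself needs no synchronization.

// Toy single-consumer event loop (not Spark code), same shape as EventLoop above:
import java.util.concurrent.LinkedBlockingDeque

class ToyEventLoop[E](name: String)(handler: E => Unit) {
  private val queue = new LinkedBlockingDeque[E]()
  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try { while (true) handler(queue.take()) } // take() blocks until an event arrives
      catch { case _: InterruptedException => }  // interrupt() is the stop signal
  }
  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event) // safe to call from any thread
  def stop(): Unit = thread.interrupt()
}

// Usage:
//   val loop = new ToyEventLoop[String]("toy-loop")(e => println(s"got $e"))
//   loop.start(); loop.post("hello"); loop.stop()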
The thread is actually started inside the DAGScheduler constructor, and it is buried deep:
// DAGScheduler.scala line 1585
eventProcessLoop.start()
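Note the placement: line 1585 sits at the very end of the DAGScheduler class body (the DAGSchedulerEventProcessLoop class at line 1588 follows right after it), so this runs as one of the last statements of the constructor; every field the event thread might touch is therefore initialized before the thread starts consuming events.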
At this point the event-consuming thread inside the DAGScheduler is up and running.
Let's return to the job-submission scenario. When the job was submitted, a handle to the JobWaiter was passed along inside the JobSubmitted event, and the JobWaiter object itself is returned to the caller right away.
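On the caller's side, the SimpleFutureAction wrapping this waiter extends scala.concurrent.Future, so the standard callbacks apply. Reusing the `future` from the earlier ReceiverTracker sketch, completion could be observed roughly like this (a sketch, not the actual ReceiverTracker code):

// Sketch only: observing the receiver job's termination via the returned future.
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

future.onComplete {
  case Success(_) => println("receiver job completed") // the receiver has stopped
  case Failure(e) => println(s"receiver job failed: $e")
}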
When the event thread gets around to the JobSubmitted event:
// EventLoop.scala line 43
override def run(): Unit = {
  try {
    while (!stopped.get) {
      val event = eventQueue.take() // what is taken here is the JobSubmitted case class
      try {
        onReceive(event)
      } catch {
        case NonFatal(e) => {
          try {
            onError(e)
          } catch {
            case NonFatal(e) => logError("Unexpected error in " + name, e)
          }
        }
      }
    }
  } catch {
    case ie: InterruptedException => // exit even if eventQueue is not empty
    case NonFatal(e) => logError("Unexpected error in " + name, e)
  }
}
Trace into onReceive:
// DAGScheduler.scala line 1596
/**
 * The main event loop of the DAG scheduler.
 */
override def onReceive(event: DAGSchedulerEvent): Unit = {
  val timerContext = timer.time() // metrics timer: records how long each event takes to handle
  try {
    doOnReceive(event)
  } finally {
    timerContext.stop()
  }
}
And one level deeper, into doOnReceive:
// DAGScheduler.scala line 1605
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  ... // handling of the other event case classes
}
By blog convention, this is where an ad should go!
Stay tuned for the next installment.