TaskSchedulerImpl
createTaskScheduler, created inside SparkContext (a class of more than 2,000 lines):
private def createTaskScheduler(
    sc: SparkContext,
    master: String): (SchedulerBackend, TaskScheduler) = {
  import SparkMasterRegex._

  // When running locally, don't try to re-execute tasks on failure.
  val MAX_LOCAL_TASK_FAILURES = 1

  master match {
    case "local" =>
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalBackend(sc.getConf, scheduler, 1)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_N_REGEX(threads) =>
      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
      // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      if (threadCount <= 0) {
        throw new SparkException(s"Asked to run locally with $threadCount threads")
      }
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
      scheduler.initialize(backend)
      (backend, scheduler)
 * 1. Underneath, it schedules tasks on different kinds of clusters (standalone, local, Mesos) by driving a SchedulerBackend.
 * 2. It can also run in local mode, using a LocalBackend with the isLocal parameter set to true.
 * 3. It handles common logic, such as the execution order across multiple jobs and launching speculative task execution.
 * 4. Clients first call its initialize() and start() methods, then submit task sets through the runTasks() method.
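The `local` / `local[N]` / `local[*]` matching in createTaskScheduler can be sketched in isolation. The following is a minimal, self-contained re-creation of that parsing logic; `LocalMasterParser` and `threadCount` are invented names for illustration, not Spark internals:

```scala
// Hypothetical re-creation of the master-string matching shown above.
object LocalMasterParser {
  // Mirrors the shape of Spark's SparkMasterRegex.LOCAL_N_REGEX: "local[N]" or "local[*]".
  private val LocalN = """local\[([0-9]+|\*)\]""".r

  // Returns the thread count a local backend would be given, or None
  // when the master URL is not a local one.
  def threadCount(master: String, cpus: Int): Option[Int] = master match {
    case "local"     => Some(1)       // plain "local": a single thread
    case LocalN("*") => Some(cpus)    // local[*]: one thread per available core
    case LocalN(n)   => Some(n.toInt) // local[N]: exactly N threads
    case _           => None          // cluster master URL, handled elsewhere
  }
}
```

The real method additionally rejects non-positive thread counts with a SparkException, as shown in the source above.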
The initialization code of TaskSchedulerImpl:
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}
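When `schedulingMode` is FIFO, entities in the root pool are ordered by job priority first and stage ID second. A minimal sketch of that ordering rule, loosely modeled on Spark's FIFOSchedulingAlgorithm (`Entity` and `FifoOrdering` are invented names for illustration):

```scala
// Toy stand-in for a schedulable entity: in Spark the priority is the job ID.
case class Entity(priority: Int, stageId: Int)

// FIFO comparison: lower job priority runs first; ties broken by stage ID,
// so earlier stages of the same job are scheduled before later ones.
object FifoOrdering extends Ordering[Entity] {
  def compare(a: Entity, b: Entity): Int = {
    val byPriority = a.priority.compareTo(b.priority)
    if (byPriority != 0) byPriority else a.stageId.compareTo(b.stageId)
  }
}
```

FAIR mode instead weighs pools by their minShare and running-task count, which is why FairSchedulableBuilder also needs the conf (to read the pool configuration file).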
DAGScheduler
DAGScheduler, created in SparkContext:

 * It implements the highest-level, stage-oriented layer of the scheduling mechanism: for every job it computes a DAG (directed acyclic graph) of stages,
 * tracks whether RDD and stage outputs have been materialized (written to disk or memory), and finds a minimum-cost schedule for running the job.
 * It submits stages as TaskSets to the underlying TaskSchedulerImpl, which runs them on the cluster.
 * Besides managing the stage DAG, it also determines the preferred locations for running each task, based on the current cache state, and hands those locations to the underlying TaskSchedulerImpl.
 * It also handles failures caused by lost shuffle output files, in which case old stages may be resubmitted. Failures inside a stage that are not
 * caused by shuffle-file loss are handled by the TaskScheduler, which retries each task several times and only cancels the whole stage as a last resort.
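The stage-cutting idea described above, walking the lineage backwards and starting a new stage at every shuffle (wide) dependency, can be sketched with a toy DAG. All names here (`Node`, `Narrow`, `Shuffle`, `stageRoots`) are invented for illustration and are not Spark's actual classes:

```scala
// A toy lineage graph: nodes stand in for RDDs, deps for their dependencies.
sealed trait Dep { def parent: Node }
case class Narrow(parent: Node) extends Dep   // e.g. map, filter: same stage
case class Shuffle(parent: Node) extends Dep  // e.g. reduceByKey: stage boundary

case class Node(id: Int, deps: List[Dep])

// Walks backwards from the final node. Narrow deps stay inside the current
// stage; each Shuffle dep makes its parent the last node of a separate stage.
def stageRoots(finalNode: Node): Set[Int] = {
  def visit(n: Node, acc: Set[Int]): Set[Int] =
    n.deps.foldLeft(acc) {
      case (s, Narrow(p))  => visit(p, s)
      case (s, Shuffle(p)) => visit(p, s + p.id)
    }
  visit(finalNode, Set(finalNode.id))
}
```

For a lineage `a -> b -(shuffle)-> c -> d`, this yields two stages: one ending at `b` (producing shuffle output) and one ending at `d` (the result stage).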
When the application registers with the master, an ApplicationDescription is constructed (in standalone mode this code lives in SparkDeploySchedulerBackend, which SparkContext creates alongside the DAGScheduler):
 // This ApplicationDescription is very important: it describes the application
 // being executed, including its CPU and memory requirements.
 val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
   command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
 client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
 client.start()
 launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
 waitForRegistration()
 launcherBackend.setState(SparkAppHandle.State.RUNNING)
AppClient
 * This is an interface through which the application communicates with the Spark cluster.
 * It takes a Spark master URL, an ApplicationDescription, and a listener for cluster events,
 * and invokes the listener's callbacks when those events occur.
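The listener-with-callbacks arrangement described above can be sketched as a plain trait plus an implementation. `ClusterEvents` and `RecordingListener` are hypothetical names used for illustration, not the real AppClient listener API:

```scala
// Hypothetical callback interface: the client invokes these as cluster
// events arrive (connection established, executor granted, and so on).
trait ClusterEvents {
  def connected(appId: String): Unit
  def executorAdded(id: String, cores: Int): Unit
}

// A listener that records each callback invocation, in order.
class RecordingListener extends ClusterEvents {
  val log = scala.collection.mutable.ListBuffer.empty[String]
  def connected(appId: String): Unit = log += s"connected:$appId"
  def executorAdded(id: String, cores: Int): Unit = log += s"executor:$id:$cores"
}
```

The design lets the scheduler backend react to cluster state changes without polling: the client owns the communication loop and simply calls back into whatever listener it was handed.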
SparkUI
// Creates the Spark web UI
private def create(
    sc: Option[SparkContext],
    conf: SparkConf,
    listenerBus: SparkListenerBus,
    securityManager: SecurityManager,
    appName: String,
    basePath: String = "",
    jobProgressListener: Option[JobProgressListener] = None,
    startTime: Long): SparkUI = {

  val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
    val listener = new JobProgressListener(conf)
    listenerBus.addListener(listener)
    listener
  }

  val environmentListener = new EnvironmentListener
  val storageStatusListener = new StorageStatusListener
  val executorsListener = new ExecutorsListener(storageStatusListener)
  val storageListener = new StorageListener(storageStatusListener)
  val operationGraphListener = new RDDOperationGraphListener(conf)

  listenerBus.addListener(environmentListener)
  listenerBus.addListener(storageStatusListener)
  listenerBus.addListener(executorsListener)
  listenerBus.addListener(storageListener)
  listenerBus.addListener(operationGraphListener)

  new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
    executorsListener, _jobProgressListener, storageListener, operationGraphListener,
    appName, basePath, startTime)
}
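The `create` method above relies on a listener-bus pattern: listeners register once, and every event posted afterwards fans out to all of them. A minimal, self-contained sketch of that pattern, with invented `Listener`/`ListenerBus` names rather than Spark's actual SparkListenerBus:

```scala
// A listener reacts to events delivered by the bus.
trait Listener { def onEvent(event: String): Unit }

// Registers listeners and fans each posted event out to all of them.
class ListenerBus {
  private var listeners = List.empty[Listener]
  def addListener(l: Listener): Unit = listeners = listeners :+ l
  def post(event: String): Unit = listeners.foreach(_.onEvent(event))
}
```

One difference worth noting: this sketch delivers events synchronously on the caller's thread, whereas Spark's bus queues events and dispatches them from a dedicated thread so that posting never blocks the scheduler.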