Spark内核源码深度剖析:sparkContext初始化的源码核心

释放双眼,带上耳机,听听看~!

TaskSchedulerImpl


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
1sparkContext中的创建createTaskScheduler(2000多行)
2  private def createTaskScheduler(
3      sc: SparkContext,
4      master: String): (SchedulerBackend, TaskScheduler) = {
5    import SparkMasterRegex._
6
7    // When running locally, don't try to re-execute tasks on failure.
8    val MAX_LOCAL_TASK_FAILURES = 1
9
10    master match {
11      case "local" =>
12        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
13        val backend = new LocalBackend(sc.getConf, scheduler, 1)
14        scheduler.initialize(backend)
15        (backend, scheduler)
16
17      case LOCAL_N_REGEX(threads) =>
18        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
19        // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
20        val threadCount = if (threads == "*") localCpuCount else threads.toInt
21        if (threadCount <= 0) {
22          throw new SparkException(s"Asked to run locally with $threadCount threads")
23        }
24        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
25        val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
26        scheduler.initialize(backend)
27        (backend, scheduler)
28  * 1。底层通过调度一个ScheduleBackend,针对不同种类的cluster(standalone,local,mesos),调度task。
29  * 2。也可通过一个localbackend,并将一个islocal参数设为true,并在本地模式在运行。
30  * 3。他负责处理一些通用的逻辑,比如说启动多个job的执行顺序,启动推测任务执行。
31  * 4。客户端首先调用他们的init(),start()方法,然后通过run task()方法提交task sets;
32TaskSchedulerImpl初始化的代码内容
33def initialize(backend: SchedulerBackend) {
34    this.backend = backend
35    // temporarily set rootPool name to empty
36    rootPool = new Pool("", schedulingMode, 0, 0)
37    schedulableBuilder = {
38      schedulingMode match {
39        case SchedulingMode.FIFO =>
40          new FIFOSchedulableBuilder(rootPool)
41        case SchedulingMode.FAIR =>
42          new FairSchedulableBuilder(rootPool, conf)
43      }
44    }
45    schedulableBuilder.buildPools()
46  }
47

DAGScheduler


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1sparkContext中的创建DAGScheduler
2
3实现了面向stage的调度机制的最高层的调度层,它会为每个job计算一个stage的DAG(有向无环图),追踪
4  * RDD和stage的输出是否被物化了(写入磁盘或内存),并且寻找一个最小消耗调度机制来运行job。
5  * 它会将stage作为tasksets提交到底层的TaskSchedulerImpl,并将集群上运行他们。
6  *除了处理stage的DAG,他们还要负责运行每个task的最佳位置,基于当前的缓存状态,将这些位置交给底层的TaskScheduleImpl
7  * 除此之外,它还会处理shaffule处理文件导致的失败,在这种情况下,旧的stage可能被重新提交,一个stage内部的失败,如果不是
8  * shuffle文件丢失出现的问题,会被taskScheduler处理,他会多次重试每一个task,知道最后,实在不行才会去取消整个stage。
9
10DAGScheduler会构造一个ApplicationDescription
11  *  //这个ApplicationDescription十分重要,它包括了当前执行的这个application的一些情况
12    //cpu,内存。
13    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
14      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
15    client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
16    client.start()
17    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
18    waitForRegistration()
19    launcherBackend.setState(SparkAppHandle.State.RUNNING)
20

AppClient


1
2
3
4
5
1  *这是一个接口,它负责application与spark集群进行通信。
2  *它会接收一个sparkmaster的URL,以及一个applicationDescription,和一个集群事件的监听器,
3  * 以及各种事件发生时监听器的回调函数。
4
5

SparkUI


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
1//提供一个spark界面
2private def create(
3      sc: Option[SparkContext],
4      conf: SparkConf,
5      listenerBus: SparkListenerBus,
6      securityManager: SecurityManager,
7      appName: String,
8      basePath: String = "",
9      jobProgressListener: Option[JobProgressListener] = None,
10      startTime: Long): SparkUI = {
11
12    val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
13      val listener = new JobProgressListener(conf)
14      listenerBus.addListener(listener)
15      listener
16    }
17
18    val environmentListener = new EnvironmentListener
19    val storageStatusListener = new StorageStatusListener
20    val executorsListener = new ExecutorsListener(storageStatusListener)
21    val storageListener = new StorageListener(storageStatusListener)
22    val operationGraphListener = new RDDOperationGraphListener(conf)
23
24    listenerBus.addListener(environmentListener)
25    listenerBus.addListener(storageStatusListener)
26    listenerBus.addListener(executorsListener)
27    listenerBus.addListener(storageListener)
28    listenerBus.addListener(operationGraphListener)
29
30    new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
31      executorsListener, _jobProgressListener, storageListener, operationGraphListener,
32      appName, basePath, startTime)
33  }
34

给TA打赏
共{{data.count}}人
人已打赏
安全运维

OpenSSH-8.7p1离线升级修复安全漏洞

2021-10-23 10:13:25

安全运维

设计模式的设计原则

2021-12-12 17:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索