当前位置: 首页 > 编程日记 > 正文

Spark源码分析之七:Task运行(一)

在Task调度相关的两篇文章《Spark源码分析之五:Task调度(一)》与《Spark源码分析之六:Task调度(二)》中,我们大致了解了Task调度相关的主要逻辑,并且在Task调度逻辑的最后,CoarseGrainedSchedulerBackend的内部类DriverEndpoint中的makeOffers()方法的最后,我们通过调用TaskSchedulerImpl的resourceOffers()方法,得到了TaskDescription序列的序列Seq[Seq[TaskDescription]],相关代码如下:

[java] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. // 调用scheduler的resourceOffers()方法,分配资源,并在得到资源后,调用launchTasks()方法,启动tasks  
  2. // 这个scheduler就是TaskSchedulerImpl  
  3. launchTasks(scheduler.resourceOffers(workOffers))
[java] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. /** 
  2.    * Called by cluster manager to offer resources on slaves. We respond by asking our active task 
  3.    * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so 
  4.    * that tasks are balanced across the cluster. 
  5.    * 
  6.    * 被集群manager调用以提供slaves上的资源。我们通过按照优先顺序询问活动task集中的task来回应。 
  7.    * 我们通过循环的方式将task调度到每个节点上以便tasks在集群中可以保持大致的均衡。 
  8.    */  
  9. def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {  

这个TaskDescription很简单,是传递到executor上即将被执行的Task的描述,通常由TaskSetManager的resourceOffer()方法生成。代码如下:

[java] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. /** 
  2.  * Description of a task that gets passed onto executors to be executed, usually created by 
  3.  * [[TaskSetManager.resourceOffer]]. 
  4.  */  
  5. private[spark] class TaskDescription(  
  6. val taskId: Long,
  7. val attemptNumber: Int,
  8. val executorId: String,
  9. val name: String,
  10. val index: Int,    // Index within this task's TaskSet  
  11. _serializedTask: ByteBuffer)
  12. extends Serializable {  
  13. // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer  
  14. // 由于ByteBuffers不可以被序列化,所以将task包装在SerializableBuffer中,_serializedTask为ByteBuffer类型的Task  
  15. private val buffer = new SerializableBuffer(_serializedTask)  
  16. // 序列化后的Task, 取buffer的value  
  17. def serializedTask: ByteBuffer = buffer.value
  18. override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)  
  19. }

此时,得到Seq[Seq[TaskDescription]],即Task被调度到相应executor上后(仅是逻辑调度,实际上并未分配到executor上执行),接下来要做的,便是真正的将Task分配到指定的executor上去执行,也就是本篇我们将要讲的Task的运行。而这部分的开端,源于上述提到的CoarseGrainedSchedulerBackend的内部类DriverEndpoint中的launchTasks()方法,代码如下:

[java] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. // Launch tasks returned by a set of resource offers  
  2. private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {  
  3. // 循环每个task  
  4. for (task <- tasks.flatten) {  
  5. // 序列化Task  
  6. val serializedTask = ser.serialize(task)
  7. // 序列化后的task的大小超出规定的上限  
  8. // 即如果序列化后task的大小大于等于框架配置的Akka消息最大大小减去除序列化task或task结果外,一个Akka消息需要保留的额外大小的值  
  9. if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {  
  10. // 根据task的taskId,在TaskSchedulerImpl的taskIdToTaskSetManager中获取对应的TaskSetManager  
  11. scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
  12. try {  
  13. var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +  
  14. "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +  
  15. "spark.akka.frameSize or using broadcast variables for large values."  
  16. msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
  17. AkkaUtils.reservedSizeBytes)
  18. // 调用TaskSetManager的abort()方法,标记对应TaskSetManager为失败  
  19. taskSetMgr.abort(msg)
  20. catch {  
  21. case e: Exception => logError("Exception in error callback", e)  
  22. }
  23. }
  24. }
  25. else {// 序列化后task的大小在规定的大小内  
  26. // 从executorDataMap中,根据task.executorId获取executor描述信息executorData  
  27. val executorData = executorDataMap(task.executorId)
  28. // executorData中,freeCores做相应减少  
  29. executorData.freeCores -= scheduler.CPUS_PER_TASK
  30. // 利用executorData中的executorEndpoint,发送LaunchTask事件,LaunchTask事件中包含序列化后的task  
  31. executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))  
  32. }
  33. }
  34. }

launchTasks的执行逻辑很简单,针对传入的TaskDescription序列,循环每个Task,做以下处理:

1、首先对Task进行序列化,得到serializedTask;

2、针对序列化后的Task:serializedTask,判断其大小:

2.1、序列化后的task的大小达到或超出规定的上限,即框架配置的Akka消息最大大小,减去除序列化task或task结果外,一个Akka消息需要保留的额外大小的值,则根据task的taskId,在TaskSchedulerImpl的taskIdToTaskSetManager中获取对应的TaskSetManager,并调用其abort()方法,标记对应TaskSetManager为失败;

2.2、序列化后的task的大小未达到上限,在规定的大小范围内,则:

2.2.1、从executorDataMap中,根据task.executorId获取executor描述信息executorData;

2.2.2、在executorData中,freeCores做相应减少;

2.2.3、利用executorData中的executorEndpoint,即Driver端executor通讯端点的引用,发送LaunchTask事件,LaunchTask事件中包含序列化后的task,将Task传递到executor中去执行。

接下来,我们重点分析下上述流程。

先说下异常流程,即序列化后Task的大小超过上限时,对TaskSet标记为失败的处理。入口方法为TaskSetManager的abort()方法,代码如下:

[java] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. def abort(message: String, exception: Option[Throwable] = None): Unit = sched.synchronized {  
  2. // TODO: Kill running tasks if we were not terminated due to a Mesos error  
  3. // 调用DAGScheduler的taskSetFailed()方法,标记TaskSet运行失败  
  4. sched.dagScheduler.taskSetFailed(taskSet, message, exception)
  5. // 标志位isZombie设置为true  
  6. isZombie = true  
  7. // 满足一定条件的情况下,将TaskSet标记为Finished  
  8. maybeFinishTaskSet()
  9. }

abort()方法处理逻辑共分三步:

第一,调用DAGScheduler的taskSetFailed()方法,标记TaskSet运行失败;

第二,标志位isZombie设置为true;

第三,满足一定条件的情况下,将TaskSet标记为Finished。

首先看下DAGScheduler的taskSetFailed()方法,代码如下:

[java] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. /** 
  2.    * Called by the TaskSetManager to cancel an entire TaskSet due to either repeated failures or 
  3.    * cancellation of the job itself. 
  4.    */  
  5. def taskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable]): Unit = {
  6. eventProcessLoop.post(TaskSetFailed(taskSet, reason, exception))
  7. }

和第二篇文章《Spark源码分析之二:Job的调度模型与运行反馈》中Job的调度模型一致,都是依靠事件队列eventProcessLoop来完成事件的调度执行的,这里,我们在事件队列eventProcessLoop中放入了一个TaskSetFailed事件。在DAGScheduler的事件处理调度函数doOnReceive()方法中,明确规定了事件的处理方法,代码如下:

[java] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. // 如果是TaskSetFailed事件,调用dagScheduler.handleTaskSetFailed()方法处理  
  2. case TaskSetFailed(taskSet, reason, exception) =>  
  3. dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

下面,我们看下handleTaskSetFailed()这个方法。

[java] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. private[scheduler] def handleTaskSetFailed(  
  2. taskSet: TaskSet,
  3. reason: String,
  4. exception: Option[Throwable]): Unit = {
  5. // 根据taskSet的stageId获取到对应的Stage,循环调用abortStage,终止该Stage  
  6. stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, exception) }
  7. // 提交等待的Stages  
  8. submitWaitingStages()
  9. }

很简单,首先通过taskSet的stageId获取到对应的Stage,针对Stage,循环调用abortStage()方法,终止该Stage,然后调用submitWaitingStages()方法提交等待的Stages。我们先看下abortStage()方法,代码如下:

[java] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. /** 
  2.    * Aborts all jobs depending on a particular Stage. This is called in response to a task set 
  3.    * being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside. 
  4.    * 终止给定Stage上的所有Job。 
  5.    */  
  6. private[scheduler] def abortStage(  
  7. failedStage: Stage,
  8. reason: String,
  9. exception: Option[Throwable]): Unit = {
  10. // 如果stageIdToStage中不存在对应的stage,说明stage已经被移除,直接返回  
  11. if (!stageIdToStage.contains(failedStage.id)) {  
  12. // Skip all the actions if the stage has been removed.  
  13. return  
  14. }
  15. // 遍历activeJobs中的ActiveJob,逐个调用stageDependsOn()方法,找出存在failedStage的祖先stage的activeJob,即dependentJobs  
  16. val dependentJobs: Seq[ActiveJob] =
  17. activeJobs.filter(job => stageDependsOn(job.finalStage, failedStage)).toSeq
  18. // 标记failedStage的完成时间completionTime  
  19. failedStage.latestInfo.completionTime = Some(clock.getTimeMillis())
  20. // 遍历dependentJobs,调用failJobAndIndependentStages()  
  21. for (job <- dependentJobs) {  
  22. failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason", exception)  
  23. }
  24. if (dependentJobs.isEmpty) {  
  25. logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")  
  26. }
  27. }

这个方法的处理逻辑主要分为四步:

1、如果stageIdToStage中不存在对应的stage,说明stage已经被移除,直接返回,这是对异常情况下的一种特殊处理;

2、遍历activeJobs中的ActiveJob,逐个调用stageDependsOn()方法,找出存在failedStage的祖先stage的activeJob,即dependentJobs;

3、标记failedStage的完成时间completionTime;

4、遍历dependentJobs,调用failJobAndIndependentStages()。

其它都好说,我们主要看下stageDependsOn()和failJobAndIndependentStages()这两个方法。首先看下stageDependsOn()方法,代码如下:

[java] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. /** Return true if one of stage's ancestors is target. */  
  2. // 如果参数stage的祖先是target,返回true  
  3. private def stageDependsOn(stage: Stage, target: Stage): Boolean = {  
  4. // 如果stage即为target,返回true  
  5. if (stage == target) {  
  6. return true  
  7. }
  8. // 存储处理过的RDD  
  9. val visitedRdds = new HashSet[RDD[_]]  
  10. // We are manually maintaining a stack here to prevent StackOverflowError  
  11. // caused by recursively visiting  
  12. // 存储待处理的RDD  
  13. val waitingForVisit = new Stack[RDD[_]]  
  14. // 定义一个visit()方法  
  15. def visit(rdd: RDD[_]) {
  16. // 如果该RDD未被处理过的话,继续处理  
  17. if (!visitedRdds(rdd)) {  
  18. // 将RDD添加到visitedRdds中  
  19. visitedRdds += rdd
  20. // 遍历RDD的依赖  
  21. for (dep <- rdd.dependencies) {  
  22. dep match {
  23. // 如果是ShuffleDependency  
  24. case shufDep: ShuffleDependency[_, _, _] =>  
  25. // 获得mapStage,并且如果stage的isAvailable为false的话,将其压入waitingForVisit  
  26. val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
  27. if (!mapStage.isAvailable) {  
  28. waitingForVisit.push(mapStage.rdd)
  29. }  // Otherwise there's no need to follow the dependency back  
  30. // 如果是NarrowDependency,直接将其压入waitingForVisit  
  31. case narrowDep: NarrowDependency[_] =>  
  32. waitingForVisit.push(narrowDep.rdd)
  33. }
  34. }
  35. }
  36. }
  37. // 从stage的rdd开始处理,将其入栈waitingForVisit  
  38. waitingForVisit.push(stage.rdd)
  39. // 当waitingForVisit中存在数据,就调用visit()方法进行处理  
  40. while (waitingForVisit.nonEmpty) {  
  41. visit(waitingForVisit.pop())
  42. }
  43. // 根据visitedRdds中是否存在target的rdd判断参数stage的祖先是否为target  
  44. visitedRdds.contains(target.rdd)
  45. }

这个方法主要是判断参数stage是否为参数target的祖先stage,其代码风格与stage划分和提交中的部分代码一样,这在前面的两篇文章中也提到过,在此不再赘述。而它主要是通过stage的rdd,并遍历其上层依赖的rdd链,将每个stage的rdd加入到visitedRdds中,最后根据visitedRdds中是否存在target的rdd判断参数stage的祖先是否为target。值得一提的是,如果RDD的依赖是NarrowDependency,直接将其压入waitingForVisit,如果为ShuffleDependency,则需要判断stage的isAvailable,如果为false,则将对应RDD压入waitingForVisit。关于isAvailable,我在《Spark源码分析之四:Stage提交》一文中具体阐述过,这里不再赘述。

接下来,我们再看下failJobAndIndependentStages()方法,这个方法的主要作用就是使得一个Job和仅被该Job使用的所有stages失败,并清空有关状态。代码如下:

[java] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. /** Fails a job and all stages that are only used by that job, and cleans up relevant state. */  
  2. // 使得一个Job和仅被该Job使用的所有stages失败,并清空有关状态  
  3. private def failJobAndIndependentStages(  
  4. job: ActiveJob,
  5. failureReason: String,
  6. exception: Option[Throwable] = None): Unit = {
  7. // 构造一个异常,内容为failureReason  
  8. val error = new SparkException(failureReason, exception.getOrElse(null))  
  9. // 标志位,是否能取消Stages  
  10. var ableToCancelStages = true  
  11. // 标志位,是否应该中断线程  
  12. val shouldInterruptThread =
  13. if (job.properties == null) false  
  14. else job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean  
  15. // Cancel all independent, running stages.  
  16. // 取消所有独立的,正在运行的stages  
  17. // 根据Job的jobId,获取其stages  
  18. val stages = jobIdToStageIds(job.jobId)
  19. // 如果stages为空,记录错误日志  
  20. if (stages.isEmpty) {  
  21. logError("No stages registered for job " + job.jobId)  
  22. }
  23. // 遍历stages,循环处理  
  24. stages.foreach { stageId =>
  25. // 根据stageId,获取jobsForStage,即每个Job所包含的Stage信息  
  26. val jobsForStage: Option[HashSet[Int]] = stageIdToStage.get(stageId).map(_.jobIds)
  27. // 首先处理异常情况,即jobsForStage为空,或者jobsForStage中不包含当前Job  
  28. if (jobsForStage.isEmpty || !jobsForStage.get.contains(job.jobId)) {  
  29. logError(
  30. "Job %d not registered for stage %d even though that stage was registered for the job"  
  31. .format(job.jobId, stageId))
  32. else if (jobsForStage.get.size == 1) {  
  33. // 如果stageId对应的stage不存在  
  34. if (!stageIdToStage.contains(stageId)) {  
  35. logError(s"Missing Stage for stage with id $stageId")  
  36. else {  
  37. // This is the only job that uses this stage, so fail the stage if it is running.  
  38. //   
  39. val stage = stageIdToStage(stageId)
  40. if (runningStages.contains(stage)) {  
  41. try { // cancelTasks will fail if a SchedulerBackend does not implement killTask  
  42. // 调用taskScheduler的cancelTasks()方法,取消stage内的tasks  
  43. taskScheduler.cancelTasks(stageId, shouldInterruptThread)
  44. // 标记Stage为完成  
  45. markStageAsFinished(stage, Some(failureReason))
  46. catch {  
  47. case e: UnsupportedOperationException =>  
  48. logInfo(s"Could not cancel tasks for stage $stageId", e)  
  49. ableToCancelStages = false  
  50. }
  51. }
  52. }
  53. }
  54. }
  55. if (ableToCancelStages) {// 如果能取消Stages  
  56. // 调用job监听器的jobFailed()方法  
  57. job.listener.jobFailed(error)
  58. // 为Job和独立Stages清空状态,独立Stages的意思为该stage仅为该Job使用  
  59. cleanupStateForJobAndIndependentStages(job)
  60. // 发送一个SparkListenerJobEnd事件  
  61. listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
  62. }
  63. }

处理过程还是很简单的,读者可以通过上述源码和注释自行补脑,这里就先略过了。

下面,再说下正常情况下,即序列化后Task大小未超过上限时,LaunchTask事件的发送及executor端的响应。代码再跳转到CoarseGrainedSchedulerBackend的内部类DriverEndpoint中的launchTasks()方法。正常情况下处理流程主要分为三大部分:

1、从executorDataMap中,根据task.executorId获取executor描述信息executorData;

2、在executorData中,freeCores做相应减少;

3、利用executorData中的executorEndpoint,即Driver端executor通讯端点的引用,发送LaunchTask事件,LaunchTask事件中包含序列化后的task,将Task传递到executor中去执行。

我们重点看下第3步,利用Driver端持有的executor描述信息executorData中的executorEndpoint,即Driver端executor通讯端点的引用,发送LaunchTask事件给executor,将Task传递到executor中去执行。那么executor中是如何接收LaunchTask事件的呢?答案就在CoarseGrainedExecutorBackend中。

我们先说下这个CoarseGrainedExecutorBackend,类的定义如下所示:

[java] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. private[spark] class CoarseGrainedExecutorBackend(  
  2. override val rpcEnv: RpcEnv,
  3. driverUrl: String,
  4. executorId: String,
  5. hostPort: String,
  6. cores: Int,
  7. userClassPath: Seq[URL],
  8. env: SparkEnv)
  9. extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {  

由上面的代码我们可以知道,它实现了ThreadSafeRpcEndpoint和ExecutorBackend两个trait,而ExecutorBackend的定义如下:

[java] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. /** 
  2.  * A pluggable interface used by the Executor to send updates to the cluster scheduler. 
  3.  * 一个被Executor用来发送更新到集群调度器的可插拔接口。 
  4.  */  
  5. private[spark] trait ExecutorBackend {  
  6. // 唯一的一个statusUpdate()方法  
  7. // 需要Long类型的taskId、TaskState类型的state、ByteBuffer类型的data三个参数  
  8. def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
  9. }

那么它自然就有两种主要的任务,第一,作为endpoint提供driver与executor间的通讯功能;第二,提供了executor任务执行时状态汇报的功能。

CoarseGrainedExecutorBackend到底是什么呢?这里我们先不深究,留到以后分析,你只要知道它是Executor的一个后台辅助进程,和Executor是一对一的关系,向Executor提供了与Driver通讯、任务执行时状态汇报两个基本功能即可。

接下来,我们看下CoarseGrainedExecutorBackend是如何处理LaunchTask事件的。做为RpcEndpoint,在其处理各类事件或消息的receive()方法中,定义如下:

[java] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. case LaunchTask(data) =>  
  2. if (executor == null) {  
  3. logError("Received LaunchTask command but executor was null")  
  4. System.exit(1)  
  5. else {  
  6. // 反序列话task,得到taskDesc  
  7. val taskDesc = ser.deserialize[TaskDescription](data.value)
  8. logInfo("Got assigned task " + taskDesc.taskId)  
  9. // 调用executor的launchTask()方法加载task  
  10. executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,  
  11. taskDesc.name, taskDesc.serializedTask)
  12. }

首先,会判断对应的executor是否为空,为空的话,记录错误日志并退出,不为空的话,则按照如下流程处理:

1、反序列话task,得到taskDesc;

2、调用executor的launchTask()方法加载task。

那么,重点就落在了Executor的launchTask()方法中,代码如下:

[java] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. def launchTask(
  2. context: ExecutorBackend,
  3. taskId: Long,
  4. attemptNumber: Int,
  5. taskName: String,
  6. serializedTask: ByteBuffer): Unit = {
  7. // 新建一个TaskRunner  
  8. val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,  
  9. serializedTask)
  10. // 将taskId与TaskRunner的对应关系存入runningTasks  
  11. runningTasks.put(taskId, tr)
  12. // 线程池执行TaskRunner  
  13. threadPool.execute(tr)
  14. }

非常简单,创建一个TaskRunner对象,然后将taskId与TaskRunner的对应关系存入runningTasks,将TaskRunner扔到线程池中去执行即可。

我们先看下这个TaskRunner类。我们先看下Class及其成员变量的定义,如下:

[java] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. class TaskRunner(  
  2. execBackend: ExecutorBackend,
  3. val taskId: Long,
  4. val attemptNumber: Int,
  5. taskName: String,
  6. serializedTask: ByteBuffer)
  7. extends Runnable {  
  8. // TaskRunner继承了Runnable  
  9. /** Whether this task has been killed. */  
  10. // 标志位,task是否被杀掉  
  11. @volatile private var killed = false  
  12. /** How much the JVM process has spent in GC when the task starts to run. */  
  13. @volatile var startGCTime: Long = _  
  14. /** 
  15.      * The task to run. This will be set in run() by deserializing the task binary coming 
  16.      * from the driver. Once it is set, it will never be changed. 
  17.      *  
  18.      * 需要运行的task。它将在反序列化来自driver的task二进制数据时在run()方法被设置,一旦被设置,它将不会再发生改变。 
  19.      */  
  20. @volatile var task: Task[Any] = _  
  21. }

由类的定义我们可以看出,TaskRunner继承了Runnable,所以它本质上是一个线程,故其可以被放到线程池中去运行。它所包含的成员变量,主要有以下几个:

1、execBackend:Executor后台辅助进程,提供了与Driver通讯、状态汇报等两大基本功能,实际上传入的是CoarseGrainedExecutorBackend实例;

2、taskId:Task的唯一标识;

3、attemptNumber:Task运行的序列号,Spark与MapReduce一样,可以为拖后腿任务启动备份任务,即推测执行原理,如此,就需要通过taskId加attemptNumber来唯一标识一个Task运行实例;

4、serializedTask:ByteBuffer类型,序列化后的Task,包含的是Task的内容,通过发序列化它来得到Task,并运行其中的run()方法来执行Task;

5、killed:Task是否被杀死的标志位;

6、task:Task[Any]类型,需要运行的Task,它将在反序列化来自driver的task二进制数据时在run()方法被设置,一旦被设置,它将不会再发生改变;

7、startGCTime:JVM在task开始运行后,进行垃圾回收的时间。

另外,既然是一个线程,TaskRunner必须得提供run()方法,该run()方法就是TaskRunner线程在线程池中被调度时,需要执行的方法,我们来看下它的定义:

[java] view plain copy
 在CODE上查看代码片派生到我的代码片
  1. override def run(): Unit = {
  2. // Step1:Task及其运行时需要的辅助对象构造  
  3. // 获取任务内存管理器  
  4. val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)  
  5. // 反序列化开始时间  
  6. val deserializeStartTime = System.currentTimeMillis()
  7. // 当前线程设置上下文类加载器  
  8. Thread.currentThread.setContextClassLoader(replClassLoader)
  9. // 从SparkEnv中获取序列化器  
  10. val ser = env.closureSerializer.newInstance()
  11. logInfo(s"Running $taskName (TID $taskId)")  
  12. // execBackend更新状态TaskState.RUNNING  
  13. execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  14. var taskStart: Long = 0  
  15. // 计算垃圾回收的时间  
  16. startGCTime = computeTotalGcTime()
  17. try {  
  18. // 调用Task的deserializeWithDependencies()方法,反序列化Task,得到Task运行需要的文件taskFiles、jar包taskFiles和Task二进制数据taskBytes  
  19. val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
  20. updateDependencies(taskFiles, taskJars)
  21. // 反序列化Task二进制数据taskBytes,得到task实例  
  22. task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
  23. // 设置Task的任务内存管理器  
  24. task.setTaskMemoryManager(taskMemoryManager)
  25. // If this task has been killed before we deserialized it, let's quit now. Otherwise,  
  26. // continue executing the task.  
  27. // 如果此时Task被kill,抛出异常,快速退出  
  28. if (killed) {  
  29. // Throw an exception rather than returning, because returning within a try{} block  
  30. // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl  
  31. // exception will be caught by the catch block, leading to an incorrect ExceptionFailure  
  32. // for the task.  
  33. throw new TaskKilledException  
  34. }
  35. logDebug("Task " + taskId + "'s epoch is " + task.epoch)  
  36. // mapOutputTracker更新Epoch  
  37. env.mapOutputTracker.updateEpoch(task.epoch)
  38. // Run the actual task and measure its runtime.  
  39. // 运行真正的task,并度量它的运行时间  
  40. // Step2:Task运行  
  41. // task开始时间  
  42. taskStart = System.currentTimeMillis()
  43. // 标志位threwException设置为true,标识Task真正执行过程中是否抛出异常  
  44. var threwException = true  
  45. // 调用Task的run()方法,真正执行Task,并获得运行结果value  
  46. val (value, accumUpdates) = try {  
  47. // 调用Task的run()方法,真正执行Task  
  48. val res = task.run(
  49. taskAttemptId = taskId,
  50. attemptNumber = attemptNumber,
  51. metricsSystem = env.metricsSystem)
  52. // 标志位threwException设置为false  
  53. threwException = false  
  54. // 返回res,Task的run()方法中,res的定义为(T, AccumulatorUpdates)  
  55. // 这里,前者为任务运行结果,后者为累加器更新  
  56. res
  57. finally {  
  58. // 通过任务内存管理器清理所有的分配的内存  
  59. val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
  60. if (freedMemory > 0) {  
  61. val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"  
  62. if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {  
  63. throw new SparkException(errMsg)  
  64. else {  
  65. logError(errMsg)
  66. }
  67. }
  68. }
  69. // task完成时间  
  70. val taskFinish = System.currentTimeMillis()
  71. // If the task has been killed, let's fail it.  
  72. // 如果task被杀死,抛出TaskKilledException异常  
  73. if (task.killed) {  
  74. throw new TaskKilledException  
  75. }
  76. // Step3:Task运行结果处理  
  77. // 通过Spark获取Task运行结果序列化器  
  78. val resultSer = env.serializer.newInstance()
  79. // 结果序列化前的时间点  
  80. val beforeSerialization = System.currentTimeMillis()
  81. // 利用Task运行结果序列化器序列化Task运行结果,得到valueBytes  
  82. val valueBytes = resultSer.serialize(value)
  83. // 结果序列化后的时间点  
  84. val afterSerialization = System.currentTimeMillis()
  85. // 度量指标体系相关,暂不介绍  
  86. for (m <- task.metrics) {  
  87. // Deserialization happens in two parts: first, we deserialize a Task object, which  
  88. // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.  
  89. m.setExecutorDeserializeTime(
  90. (taskStart - deserializeStartTime) + task.executorDeserializeTime)
  91. // We need to subtract Task.run()'s deserialization time to avoid double-counting  
  92. m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
  93. m.setJvmGCTime(computeTotalGcTime() - startGCTime)
  94. m.setResultSerializationTime(afterSerialization - beforeSerialization)
  95. m.updateAccumulators()
  96. }
  97. // 构造DirectTaskResult,同时包含Task运行结果valueBytes和累加器更新值accumulator updates  
  98. val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)  
  99. // 序列化DirectTaskResult,得到serializedDirectResult  
  100. val serializedDirectResult = ser.serialize(directResult)
  101. // 获取Task运行结果大小  
  102. val resultSize = serializedDirectResult.limit
  103. // directSend = sending directly back to the driver  
  104. // directSend的意思就是直接发送结果至Driver端  
  105. val serializedResult: ByteBuffer = {
  106. // 如果Task运行结果大小大于所有Task运行结果的最大大小,序列化IndirectTaskResult  
  107. // IndirectTaskResult为存储在Worker上BlockManager中DirectTaskResult的一个引用  
  108. if (maxResultSize > 0 && resultSize > maxResultSize) {  
  109. logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +  
  110. s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +  
  111. s"dropping it.")  
  112. ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))  
  113. }
  114. // 如果 Task运行结果大小超过Akka除去需要保留的字节外最大大小,则将结果写入BlockManager  
  115. // 即运行结果无法通过消息传递  
  116. else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {  
  117. val blockId = TaskResultBlockId(taskId)
  118. env.blockManager.putBytes(
  119. blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
  120. logInfo(
  121. s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")  
  122. ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))  
  123. }
  124. // Task运行结果比较小的话,直接返回,通过消息传递  
  125. else {  
  126. logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")  
  127. serializedDirectResult
  128. }
  129. }
  130. // execBackend更新状态TaskState.FINISHED  
  131. execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
  132. catch {// 处理各种异常信息  
  133. case ffe: FetchFailedException =>  
  134. val reason = ffe.toTaskEndReason
  135. execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
  136. case _: TaskKilledException | _: InterruptedException if task.killed =>  
  137. logInfo(s"Executor killed $taskName (TID $taskId)")  
  138. execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
  139. case cDE: CommitDeniedException =>  
  140. val reason = cDE.toTaskEndReason
  141. execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
  142. case t: Throwable =>  
  143. // Attempt to exit cleanly by informing the driver of our failure.  
  144. // If anything goes wrong (or this was a fatal exception), we will delegate to  
  145. // the default uncaught exception handler, which will terminate the Executor.  
  146. logError(s"Exception in $taskName (TID $taskId)", t)  
  147. val metrics: Option[TaskMetrics] = Option(task).flatMap { task =>
  148. task.metrics.map { m =>
  149. m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
  150. m.setJvmGCTime(computeTotalGcTime() - startGCTime)
  151. m.updateAccumulators()
  152. m
  153. }
  154. }
  155. val serializedTaskEndReason = {
  156. try {  
  157. ser.serialize(new ExceptionFailure(t, metrics))  
  158. catch {  
  159. case _: NotSerializableException =>  
  160. // t is not serializable so just send the stacktrace  
  161. ser.serialize(new ExceptionFailure(t, metrics, false))  
  162. }
  163. }
  164. // execBackend更新状态TaskState.FAILED  
  165. execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
  166. // Don't forcibly exit unless the exception was inherently fatal, to avoid  
  167. // stopping other tasks unnecessarily.  
  168. if (Utils.isFatalError(t)) {  
  169. SparkUncaughtExceptionHandler.uncaughtException(t)
  170. }
  171. finally {  
  172. // 最后,无论运行成功还是失败,将task从runningTasks中移除  
  173. runningTasks.remove(taskId)
  174. }
  175. }

如此长的一个方法,好长好大,哈哈!不过,纵观全篇,无非三个Step就可搞定:

1、Step1:Task及其运行时需要的辅助对象构造;

2、Step2:Task运行;

3、Step3:Task运行结果处理。

对, 就这么简单!鉴于时间与篇幅问题,我们这里先讲下主要流程,细节方面的东西留待下节继续。

下面,我们一个个Step来看,首先看下Step1:Task及其运行时需要的辅助对象构造,主要包括以下步骤:

1.1、构造TaskMemoryManager任务内存管理器,即taskMemoryManager;

1.2、记录反序列化开始时间;

1.3、当前线程设置上下文类加载器;

1.4、从SparkEnv中获取序列化器ser;

1.5、execBackend更新状态TaskState.RUNNING;

1.6、计算垃圾回收时间;

1.7、调用Task的deserializeWithDependencies()方法,反序列化Task,得到Task运行需要的文件taskFiles、jar包taskFiles和Task二进制数据taskBytes;

1.8、反序列化Task二进制数据taskBytes,得到task实例;

1.9、设置Task的任务内存管理器;

1.10、如果此时Task被kill,抛出异常,快速退出;

接下来,是Step2:Task运行,主要流程如下:

2.1、获取task开始时间;

2.2、标志位threwException设置为true,标识Task真正执行过程中是否抛出异常;

2.3、调用Task的run()方法,真正执行Task,并获得运行结果value,和累加器更新accumUpdates;

2.4、标志位threwException设置为false;

2.5、通过任务内存管理器taskMemoryManager清理所有的分配的内存;

2.6、获取task完成时间;

2.7、如果task被杀死,抛出TaskKilledException异常。

最后一步,Step3:Task运行结果处理,大体流程如下:

3.1、通过SparkEnv获取Task运行结果序列化器;

3.2、获取结果序列化前的时间点;

3.3、利用Task运行结果序列化器序列化Task运行结果value,得到valueBytes;

3.4、获取结果序列化后的时间点;

3.5、度量指标体系相关,暂不介绍;

3.6、构造DirectTaskResult,同时包含Task运行结果valueBytes和累加器更新值accumulator updates;

3.7、序列化DirectTaskResult,得到serializedDirectResult;

3.8、获取Task运行结果大小;

3.9、处理Task运行结果:

3.9.1、如果Task运行结果大小大于所有Task运行结果的最大大小,序列化IndirectTaskResult,IndirectTaskResult为存储在Worker上BlockManager中DirectTaskResult的一个引用;

3.9.2、如果 Task运行结果大小超过Akka除去需要保留的字节外最大大小,则将结果写入BlockManager,Task运行结果比较小的话,直接返回,通过消息传递;

3.9.3、Task运行结果比较小的话,直接返回,通过消息传递

3.10、execBackend更新状态TaskState.FINISHED;

最后,无论运行成功还是失败,将task从runningTasks中移除。

至此,Task的运行主体流程已经介绍完毕,剩余的部分细节,包括Task内run()方法的具体执行,还有任务内存管理器、序列化器、累加更新,还有部分异常情况处理,状态汇报等等其他更为详细的内容留到下篇再讲吧!

明天还要工作,洗洗睡了!

博客原地址:http://blog.csdn.net/lipeng_bigdata/article/details/50726216

转载于:https://www.cnblogs.com/jirimutu01/p/5274461.html

相关文章:

solidworks画白色金属光泽_美人的共通点就是卧蚕,卧蚕真的太重要了,没有也要画出来...

在美学上&#xff0c;卧蚕加上双眼皮最能呈现「电眼」风采。美人们的共通点就是「卧蚕」&#xff0c;它主宰着眼神的命运&#xff0c;不仅能提升下眼睑魅力&#xff0c;让眼神看起来灵动温和&#xff0c;也能让眼睛放大&#xff0c;还提升你的好人缘呢。有木有发现&#xff0c;…

ssh: connect to host github.com port 22: Connection timed out fatal: Could not read from remote...

ssh报错&#xff1a; ssh: connect to host github.com port 22: Connection timed out fatal: Could not read from remote repository. 在终端输入&#xff1a;$ ssh -T gitgithub.com 打开下图路径下查看是否有该文件&#xff1a; 没有的话就添加一个文件命名&#xff1a…

虚幻引擎4:打造街机经典游戏学习教程 Unreal Engine 4: Create an Arcade Classic

比如“堡垒之夜&#xff1f;”掌握用于构建它的工具&#xff0c;因为我们使用虚幻引擎4重新创建了一个街机经典 描述 在这个游戏设计课程中&#xff0c;我将一步一步地指导你使用虚幻引擎4重新创建一个街机经典:首要免费下载游戏引擎&#xff01;(和制造轰动一时的《堡垒之夜》…

MySQL基本了解与使用

MySQL的相关概念介绍 MySQL 为关系型数据库(Relational Database Management System), 这种所谓的"关系型"可以理解为"表格"的概念, 一个关系型数据库由一个或数个表格组成, 如图所示的一个表格: 表头(header): 每一列的名称;列(row): 具有相同数据类型的数…

proxmoxve打造云桌面_AIO攻略 | 桌面云是如何解决外设问题的?

如何有效牢固地保护敏感数据&#xff1f;如何实现通过多元化的终端设备接入企业办公桌面&#xff0c;且高效安全、不限地点&#xff1f;如何实现桌面集中维护、统一管理&#xff0c;同时打造云上轻办公空间呢&#xff1f;这恐怕是很多企业都在思考的问题。答案非桌面云莫属。桌…

8位无符号和8位有符号转化为更高字节类型的问题

main() {unsigned char uch0xff; //数值为255 char ch0xff; //数值为-1int i0; iiuch;printf("%x\n",i);i0;iich;printf("%x\n",i);} 运行结果为&#xff1a; ff ffffffff 解释&#xff…

C/C++基础知识

C/C知识积累 知识积累 1.string是C的标准库&#xff0c;是不能作为变量名的&#xff1b; 2.strcpy(str1,str2)是将str2拷贝给str1&#xff0c;并返回str1&#xff1b; 注&#xff1a;若str2比str1大&#xff0c;将导致数据溢出&#xff0c;导致错误&#xff1b; char str “…

面向完全初学者的Unity和C#游戏开发学习教程

了解如何通过使用Unity游戏引擎和C#制作BomberMan风格的3D游戏来制作您的第一款视频游戏 你会学到: 使用Unity 2021学习3D游戏开发 通过制作你的第一个3D游戏来学习C#编程语言的基础知识 从头到尾制作你的第一个电子游戏 超过40个视频作业&#xff0c;测试你的学习技能&#x…

java transient关键字_小伙子,你真的搞懂 transient 关键字了吗?

先解释下什么是序列化我们的对象并不只是存在内存中&#xff0c;还需要传输网络&#xff0c;或者保存起来下次再加载出来用&#xff0c;所以需要Java序列化技术。Java序列化技术正是将对象转变成一串由二进制字节组成的数组&#xff0c;可以通过将二进制数据保存到磁盘或者传输…

DEDECMS后台上传banner图控制图片轮播

将图片轮播做到后台控制&#xff0c;无论是dedecms还是其他的程序都是一样的重要&#xff0c;方便客户自己调试&#xff0c;不然动不动就拿FTP开刷&#xff0c;一个是操作不方便&#xff0c;增加了使用上的难度&#xff0c;另外也有一定的风险&#xff0c;很可能由于操作生疏&a…

C/C++刷题知识点总结

纠正一些知识性偏见、欠缺&#xff1a; 变量占用内存大小&#xff0c;是用sizeof&#xff08;&#xff09;计算出来那个&#xff0c;定义字符型数据时后面不添加’\0‘的&#xff0c;但是在字符串数组后面是添加的&#xff1b;例如char a[]{a,b,c};char b[]{"abc"};两…

虚幻引擎5:从零开始的蓝图近战AI学习教程

Unreal Engine 5: Blueprint Melee Combat AI from Scratch 创建一个动作游戏&#xff0c;以打击2个独特的人工智能&#xff0c;包括一个火蔓延龙&#xff01; 你会学到什么 学习使用虚幻引擎5的技巧 如何通过蓝图设计和创建人工智能 学会创造两种独特的人工智能 学会在战斗中…

git add 文件夹_Git的下载安装以及基本操作

二&#xff0c;配置git&#xff1a;用户名和邮箱在桌面右键-【Git Bash Here】输入命令&#xff1a;git config --global user.name "lijiang"git config --global user.email "2906718132qq.com"查看用户名和邮箱是否配置成功&#xff0c;去这个路径&…

scapy windows install

最近有点扫描网络的需求&#xff0c;都说scapy好&#xff0c;但是安装是个事&#xff08;当然指的是windows安装&#xff09;有个scapy3k&#xff0c;支持python3&#xff0c;可惜需要powershell&#xff0c;也就是说windows xp是没有戏了。 网上说最好python2.6&#xff0c;但…

Enda 的 2015 下半年读书计划

2015下半年的读书计划 php 方面 《PHP设计模式》 《PHP核心技术与最佳实践》 《learning php设计模式》 《深入php》 Mysql 方面 《高性能MySQL》 《深入理解mysql核心技术》 其他 《天才在左疯子在右》 大家有什么好书吗&#xff1f;推荐推荐嘛&#xff5e;&#xff5e;

Qt嵌入外部EXE程序,并显示在主界面中!

一、获取程序句柄以及类 打开Visual Studio 进行查询 二、QT调用程序 H文件 #ifndef VISUAL_H #define VISUAL_H#include <QWidget> #include <QProcess> #include<Windows.h> #include<QWindow>QT_BEGIN_NAMESPACE namespace Ui { class Vi…

UE虚幻引擎:建筑可视化学习教程 Unreal Engine : Architectural Visualization

使用这套4合1虚幻引擎课程包提高您的建筑可视化技能 你会学到什么 准备要导出的Revit模型 使用虚幻数据史密斯产品查看器工具 创造UE的材料 交换材料 点亮你的模型 添加交互 打包内容以便在虚拟现实和移动设备上分发 迁移材料包 批量更换材料 静态和动态照明 光照图分析、校正…

NOIP2013普及组 T2 表达式求值

OJ地址&#xff1a;洛谷P1981 CODEVS 3292 正常写法是用栈 1 #include<iostream>2 #include<algorithm>3 #include<cmath>4 #include<stack>5 #include<cstring>6 #include<cstdio>7 using namespace std;8 char c[20000000];9 10 stack&…

清除图片下默认的小间隙_密封间隙对迷宫密封性能影响的三维数值分析

建立迷宫密封的三维模型&#xff1b;采用GAMBIT对迷宫间隙进行非结构化网格划分&#xff0c;模拟密封间隙对迷宫密封性能的影响&#xff0c;并与二维截面模型模拟结果进行对比。结果表明&#xff1a;在考虑周向湍流的影响下&#xff0c;泄漏量相对于仅考虑横向及纵向湍流f的影响…

Thrift抛直接内存OOM一点解决思路

2019独角兽企业重金招聘Python工程师标准>>> 最近使用Thrift TThreadedSelectorServer服务方式&#xff0c;运行一段时间就会抛OutOfMemoryError: Direct buffer memory异常&#xff1b; java.lang.OutOfMemoryError: Direct buffer memoryat java.nio.Bits.reserve…

C/C++、嵌入式开发岗笔试集锦

图标链接 C运算符检索&#xff08;点击查看 - 转&#xff09; C构造函数、拷贝函数等理解&#xff08;点击查看 - 转&#xff09; 题目 ⭐⭐⭐1.&#xff08;STL库&#xff09;请问C的STL库中map的底层数据结构为&#xff08;&#xff09;&#xff0c;multimap的底层数据结…

Blender制作3D模型导出到UE5完整学习教程

学习如何在Blender中创建AAA游戏资产&#xff0c;然后导出到虚幻引擎5的完整指南 你会学到什么 遵循关于创建一套AAA游戏就绪的优质中世纪市场摊位的完整指南 最大化您的推荐人的潜力&#xff0c;以开发强大的游戏资产概念 使用Blender并了解其界面和快捷方式 从初级水平到世界…

[纪录]仿IOS滚轮效果(竖直滑动选择器)

今天想做一个类似这样的一个效果,可是UI的模板是参考IOS做的,于是就各种百度各种搜,最后让我找到了一个仿IOS滚轮的一个Demo,稍微研究了一下,发上来,大家一起学习,以后也方便我查看,就不用再去百度了,嘿嘿! 仿IOS滚轮效果 首先是一个类,继承了ScrollView public class WheelVie…

win7如何设置wifi热点_xp系统怎么设置无线网络热点(电脑设置wifi热点步骤)

现在手机基本都是可以开启wifi热点功能&#xff0c;但有时候因为各种原因&#xff0c;比如没有无线或者路由器有问题等情况&#xff0c;手机又需要使用网络&#xff0c;这个时候需要用电脑设置wifi热点。那么&#xff0c;怎么用电脑设置wifi热点呢&#xff1f;下面分别用win10系…

汇编语言第二课作业-实验1

Debug命令-摘自百度百科A 汇编命令 功能&#xff1a; 将指令直接汇编成机器码输入到内存中。说明&#xff1a; 用于小段程序的汇编及修改目标程序&#xff0c;所有输入的数字均采用十六进制&#xff0c; 用户装入内存的汇编语句是连续存放的&#xff0c;若没有指定地址&#xf…

C++、嵌入式软开之数据结构

总结&#xff1a; 1.二叉树搜索树是需要进行比较大小&#xff0c;满足传递性才可&#xff1b; 刷题总结&#xff1a; 1.&#xff08;二叉树遍历&#xff09;已知某二叉树的后序遍历序列是dabec&#xff0c;中序遍历序列是debac&#xff0c;它的前序遍历是&#xff08;&#xf…

3dsMax插件V-Ray渲染与合成学习课程 3ds Max: Rendering for Compositing in V-Ray Next

使用渲染元素&#xff0c;3ds Max的V-Ray Next对创建高质量合成所需的参数(如反射、阴影、遮罩等)提供了精细的控制。在本课程中&#xff0c;布莱恩布拉德利展示了如何在Photoshop和After Effects等应用程序中使用V-Ray Next为后期制作工作流专门创建渲染。探索渲染元素用户界面…

对找数程序的理解

经过几个小时的思考&#xff0c;总算是对老师出的这个程序题有了一定的了解。该C#程序是一个对数字进行查找的程序。程序清单如下&#xff1a; using System; using System.Collections.Generic; using System.Text; namespace FindTheNumber {  class Program  {    s…

win10鼠标灵敏度怎么调_和平精英最稳压枪灵敏度怎么调教程,适合所有段位以及适合国际版PUBG手游压枪...

和平精英(原刺激战场)主播最稳压枪灵敏度怎么调&#xff1f;不妨看看花了五个小时调试的最稳和平精英压枪灵敏度吧。废话不多&#xff0c;上图按照调。保证你满意&#xff0c;你离主播只差点意识此和平精英压枪灵敏度适合所有段位 也适合PUBG(和平精英国际版)-------分割线---…

C++基本知识点集锦(2022秋招)

&#xff08;1&#xff09;构造函数是一种特殊的类成员函数&#xff0c;是当创建一个类的对象时&#xff0c;它被调用来对类的数据成员进行初始化和分配内存。&#xff08;构造函数的命名必须和类名完全相同&#xff09; &#xff08;2&#xff09;C的空类&#xff0c;编译器会…