Flink Job Submission Analysis


1. Overview

  A Flink application is submitted by packaging it into a jar and handing it to the flink command. Under the hood, the flink shell script launches a JVM process with the java command, using the CliFrontend class as the entry point to construct and submit the job.

flink run -c <main-class> xxx.jar arg1 arg2

The flink.sh script:

##  flink.sh
target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
    fi
    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \(.*\)$'`
    iteration=$((iteration + 1))
done

# Convert relative path to absolute path
bin=`dirname "$target"`

# get flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
        FLINK_IDENT_STRING="$USER"
fi
CC_CLASSPATH=`constructFlinkClassPath`
log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-client-$HOSTNAME.log
log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)

# Add Client-specific JVM options
FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_CLI}"

# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
# Submit the jar to the cluster for execution
exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

The entry point of application submission: org.apache.flink.client.cli.CliFrontend

2. CliFrontend Submission Analysis

  When a Flink application packaged as a jar is submitted with the flink run ... shell command, CliFrontend handles it under the hood and invokes the user program's main() method via reflection.

2.1 Startup Entry Point (main)

  When a Flink job is submitted, execution starts at org.apache.flink.client.cli.CliFrontend.main(), which mainly does the following:

  • 1. Initialize the CliFrontend object
  • 2. Parse the arguments (and submit the job internally)
/*
* main()
* --> cli.parseParameters(args): parse the command-line arguments
* --> run(params): submit the job
*/
public static void main(final String[] args) {
		// Log some environment information
		EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

		// Locate the conf directory via the FLINK_CONF_DIR variable
		// 1. find the configuration directory
		final String configurationDirectory = getConfigurationDirectoryFromEnv();
		// Parse the flink-conf.yaml configuration file in the conf directory
		// 2. load the global configuration
		final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
		// Load FlinkYarnSessionCli and DefaultCLI
		// 3. load the custom command lines
		final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(configuration, configurationDirectory);

		try {
			/*************************************************
			 *  Initialize the CliFrontend object
			 */
			final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
			SecurityUtils.install(new SecurityConfiguration(cli.configuration));

			/*************************************************
			 *  Parse the command line and start the requested action
			 */
			int retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseParameters(args));
			System.exit(retCode);
		} catch(Throwable t) {
			final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
			LOG.error("Fatal error while running command line interface.", strippedThrowable);
			strippedThrowable.printStackTrace();
			System.exit(31);
		}
	}
//--> cli.parseParameters(args): parse the command-line arguments
/**
	 * Parses the command line arguments and starts the requested action.
	 * @param args command line arguments of the client.
	 * @return The return code of the program
	 */
public int parseParameters(String[] args) {
		// Check the number of arguments
		// check for action
		if(args.length < 1) {
			CliFrontendParser.printHelp(customCommandLines);
			System.out.println("Please specify an action.");
			return 1;
		}
		// Get the first argument: the run action of the flink run command
		// get action
		String action = args[0];
		// Remove the action from the parameter list
		// remove action from parameters
		final String[] params = Arrays.copyOfRange(args, 1, args.length);
		try {
			// do action
			switch(action) {
				// Submit the job
				case ACTION_RUN:
					run(params);
					return 0;
				case ACTION_RUN_APPLICATION:
					runApplication(params);
					return 0;
				case ACTION_LIST:
					list(params);
					return 0;
				case ACTION_INFO:
					info(params);
					return 0;
				case ACTION_CANCEL:
					cancel(params);
					return 0;
				case ACTION_STOP:
					stop(params);
					return 0;
				case ACTION_SAVEPOINT:
					savepoint(params);
					return 0;
				case "-h":
				case "--help":
					CliFrontendParser.printHelp(customCommandLines);
					return 0;
				case "-v":
				case "--version":
		......
	}

Note: ResourceID: when a Flink cluster starts, the master node and every worker node each generate a globally unique ID. (Figure: flow of the main() entry.)
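For illustration, such an ID can be created directly from the flink-runtime class (a small sketch; the cluster normally generates these internally):

import org.apache.flink.runtime.clusterframework.types.ResourceID;

public class ResourceIdSketch {
	public static void main(String[] args) {
		ResourceID generated = ResourceID.generate();        // random, globally unique
		ResourceID named = new ResourceID("taskmanager-1");  // explicit ID, e.g. derived from the environment
		System.out.println(generated + " / " + named);
	}
}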

2.2 run (Entering the User Program, e.g. WordCount)

  • 1. Build the effective configuration for submission (parameters + jar):

    • 1. activeCommandLine --> the active custom command line
    • 2. commandLine --> the parsed command-line options
    • 3. programOptions --> the program options
    • 4. jobJars --> the dependency jars
  • 2. Submit for execution: executeProgram --> ClientUtils.executeProgram --> program.invokeInteractiveModeForExecution() --> callMainMethod(mainClass, args) --> entryClass.getMethod("main", String[].class) [obtain the main method of the entry class] --> mainMethod.invoke(null, (Object) args) [invoke the entry class's main method]

/* run(params): parsed options, PackagedProgram construction, dependency-jar handling
 * --> Step 1: getPackagedProgram(programOptions);
    --> buildProgram(programOptions);
    --> PackagedProgram.newBuilder().setXxx(...) (set the jar, user classpath, entry class, configuration, etc.)
 * --> Step 2: executeProgram(effectiveConfiguration, program);
 	--> ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program, false, false);
 	--> program.invokeInteractiveModeForExecution();
 	--> callMainMethod(mainClass, args);
 	--> entryClass.getMethod("main", String[].class).invoke(null, (Object) args);
 *
 */

//--> run
	/**
	 * Executes the run action.
	 * @param args Command line arguments for the run action.
	 */
	protected void run(String[] args) throws Exception {
		LOG.info("Running 'run' command.");
		final Options commandOptions = CliFrontendParser.getRunCommandOptions();
		// The parsed command line
		final CommandLine commandLine = getCommandLine(commandOptions, args, true);
		// evaluate help flag
		if(commandLine.hasOption(HELP_OPTION.getOpt())) {
			CliFrontendParser.printHelpForRun(customCommandLines);
			return;
		}

		final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(checkNotNull(commandLine));

		final ProgramOptions programOptions = ProgramOptions.create(commandLine);

		/*************************************************
		 *  Build the PackagedProgram object, which wraps the user program jar. getPackagedProgram()
		 *  calls buildProgram() to create it, setting the jar, the fully qualified path of the
		 *  application's main class, and the configuration.
		 *  Entry class: .setEntryPointClassName(entryPointClass)
		 */
		final PackagedProgram program = getPackagedProgram(programOptions);
		// Handle the dependency jars
		final List<URL> jobJars = program.getJobJarAndDependencies();
		/*************************************************
		 *  Effective configuration, built from:
		 *  1. activeCommandLine
		 *  2. commandLine
		 *  3. programOptions	program options
		 *  4. jobJars			dependency jars
		 */
		final Configuration effectiveConfiguration = getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

		LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

		try { // submit for execution
			executeProgram(effectiveConfiguration, program);
		} finally {
			program.deleteExtractedLibraries();
		}
	}
/*****************************************************************************/
//--> callMainMethod(mainClass, args)
private static void callMainMethod(Class<?> entryClass, String[] args) throws ProgramInvocationException {
		Method mainMethod;
		if(!Modifier.isPublic(entryClass.getModifiers())) {
			throw new ProgramInvocationException("The class " + entryClass.getName() + " must be public.");
		}
		try {
			/*************************************************
			 *  Obtain the main method of the entry class
			 */
			mainMethod = entryClass.getMethod("main", String[].class);

		} catch(NoSuchMethodException e) {
			throw new ProgramInvocationException("The class " + entryClass.getName() + " has no main(String[]) method.");
		} catch(Throwable t) {
			throw new ProgramInvocationException(
				"Could not look up the main(String[]) method from the class " + entryClass.getName() + ": " + t.getMessage(), t);
		}
........
		try {
			/*************************************************
			 *  Invoke the entry class's main method via reflection;
			 *  control now jumps into the user's main method (e.g. WordCount)
			 */
			mainMethod.invoke(null, (Object) args);
		} catch(IllegalArgumentException e) {
			throw new ProgramInvocationException("Could not invoke the main method, arguments are not matching.", e);
		} catch(IllegalAccessException e) {
			throw new ProgramInvocationException("Access to the main method was denied: " + e.getMessage(), e);
		} .........
	}

Internally, CliFrontend mainly does the following:

1. Determine the handler method from the action that follows flink (run ==> run(params));

2. Parse the arguments, build the PackagedProgram object, and then execute the PackagedProgram;

3. Obtain the application's main method via reflection and invoke it reflectively (a standalone demo of this pattern follows).
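The snippet below reproduces step 3 with plain JDK reflection APIs. It is a self-contained demo, not Flink code: UserJob is a hypothetical stand-in for the user's entry class, which CliFrontend would load from the jar manifest or the -c option.

import java.lang.reflect.Method;

public class ReflectiveMainDemo {
	// Hypothetical stand-in for a user job class
	public static class UserJob {
		public static void main(String[] args) {
			System.out.println("user main invoked with args: " + String.join(",", args));
		}
	}

	public static void main(String[] args) throws Exception {
		Class<?> entryClass = UserJob.class;
		Method mainMethod = entryClass.getMethod("main", String[].class);
		// static method, so the receiver is null; the Object cast keeps varargs from splitting the array
		mainMethod.invoke(null, (Object) new String[]{"--port", "9000"});
	}
}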


2.3 ExecutionEnvironment Source Analysis

  Executing a Flink application begins with creating the execution environment, StreamExecutionEnvironment. In practice it is almost always obtained via getExecutionEnvironment(): when running locally this returns a LocalStreamEnvironment, and when the job is submitted to a Flink cluster it returns the context environment set up by the client (StreamContextEnvironment).

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

StreamExecutionEnvironment is the execution entry point of a Flink application and provides several important mechanisms:

  • 1. Methods such as readTextFile(), socketTextStream(), createInput(), and addSource() for connecting to data sources.
  • 2. setParallelism() for setting the program's parallelism.
  • 3. StreamExecutionEnvironment manages the ExecutionConfig object, which governs the job's execution behavior: parallelism, restart attempts, serializers, and other settings.
  • 4. StreamExecutionEnvironment manages the List<Transformation<?>> transformations member variable, which stores the Transformation produced by each operator; stitching these Transformations together logically yields the StreamGraph (Transformation -> StreamOperator -> StreamNode).
  • 5. StreamExecutionEnvironment provides the execute() method for submitting the job; the method takes a StreamGraph as its parameter.

2.4 SocketWindowWordCount as an Example

public class SocketWindowWordCount {
	public static void main(String[] args) throws Exception {
		// the host and the port to connect to
		final String hostname;
		final int port;
		try {
			final ParameterTool params = ParameterTool.fromArgs(args);
			hostname = params.has("hostname") ? params.get("hostname") : "localhost";
			port = params.getInt("port");
		} catch (Exception e) {
			System.err.println("No port specified. Please run 'SocketWindowWordCount " +"--hostname <hostname> --port <port>', where hostname (localhost by default) " +"and port is the address of the text server");
			System.err.println("To start a simple text server, run 'netcat -l <port>' and " +"type the input text into the command line");
			return;
		}

		// get the execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// get input data by connecting to the socket (internally this calls addSource())
		DataStream<String> text = env.socketTextStream(hostname, port, "\n");
		// parse the data, group it, window it, and aggregate the counts
		DataStream<WordWithCount> windowCounts = text
				.flatMap(new FlatMapFunction<String, WordWithCount>() {
					@Override
					public void flatMap(String value, Collector<WordWithCount> out) {
						for (String word : value.split("\\s")) {
							out.collect(new WordWithCount(word, 1L));
						}
					}
				})
				.keyBy(value -> value.word)
				.timeWindow(Time.seconds(5))

				.reduce(new ReduceFunction<WordWithCount>() {
					@Override
					public WordWithCount reduce(WordWithCount a, WordWithCount b) {
						return new WordWithCount(a.word, a.count + b.count);
					}
				});

		// print the results with a single thread, rather than in parallel
		windowCounts.print().setParallelism(1);
		env.execute("Socket Window WordCount");
	}

	// (The WordWithCount POJO, with public word and count fields and a two-argument constructor, is omitted here.)
}


Analysis:

  • 1. Obtain the StreamExecutionEnvironment object. Its main roles: provide the methods for connecting to data sources, set the program's parallelism, create the ExecutionConfig object, hold the transformations member variable, and execute the application.
  • 2. Load the data source to obtain the stream abstraction DataStreamSource; every source method ultimately calls addSource().
    • How to understand the three notions:
      • Function -- the user code passed as a parameter
      • Operator -- the abstract node in the graph
      • Transformation -- the logical operation on the stream
      • Function --> Operator --> Transformation
  • 3. Each operator call produces a Transformation that is added to the env's transformations collection; the operator's output type is determined via reflection (see the sketch after this list).
    • flatMap(Function) --> transform(StreamOperator) --> Env.addOperator(Transformation) --> transformations.add(transformation)
    • keyBy(Function) --> KeyedStream(Function) --> KeyedStream(Transformation, KeyGroupStreamPartitioner)
    • reduce(Function) --> input.transform(opName, resultType, operator) --> doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator)) --> getExecutionEnvironment().addOperator(resultTransform) --> this.transformations.add(transformation)
  • 4. Submit for execution: env.execute()
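The Function --> Operator --> Transformation chain can be made visible with the public transform() API: wrapping the user Function in its StreamOperator by hand triggers the same registration that flatMap() performs internally. A minimal, hedged sketch (compiles against the Flink version excerpted here; StreamFlatMap is an internal class, used only for illustration):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.util.Collector;

public class TransformationChainSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<String> lines = env.fromElements("a b", "b c");

		// Function: the user logic
		FlatMapFunction<String, String> fn =
				(String line, Collector<String> out) -> { for (String w : line.split(" ")) out.collect(w); };

		// Operator: the runtime wrapper around the Function
		StreamFlatMap<String, String> operator = new StreamFlatMap<>(fn);

		// Transformation: transform() builds a OneInputTransformation around the operator
		// and registers it via env.addOperator(transformation) -- exactly what flatMap() does
		SingleOutputStreamOperator<String> words = lines.transform("Flat Map", Types.STRING, operator);

		words.print();
		env.execute("transformation chain sketch");
	}
}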

3. Job Submission Flow Source Analysis

The core flow is as follows:

env.execute("Socket Window WordCount");
//--> execute: source entry point
	StreamExecutionEnvironment.execute(getStreamGraph(jobName))

After the job is submitted, two main things happen:

  • 1. getStreamGraph(jobName): build the graph
  • 2. execute(Graph): execute the graph

3.1 getStreamGraph(jobName): Building the StreamGraph

  • The streamGraph object is generated by the generate() method of the StreamGraphGenerator object
  • Inside generate():
    • 1. Construct the StreamGraph object and then set its various member attributes
    • 2. generate() calls transform() for every transformation and finally returns the collection of transformed IDs.
  • Inside transform():
    • Dispatch on the concrete type of the transform; transformOneInputTransform serves as the example below
    • streamGraph.addOperator ultimately records the (vertexID, vertex) mapping for the StreamNode
    • streamGraph.addEdge ultimately adds an out-edge to the upstream StreamNode and an in-edge to the downstream StreamNode

StreamGraph: the initial graph generated from the code the user wrote with the Stream API. Flink turns each operator call into a transformation of the stream (for example SingleOutputStreamOperator, which is a subclass of DataStream) and registers it with the execution environment, which later uses it to generate the StreamGraph.

Abstractions in the StreamGraph:

1. StreamNode: the class representing an operator, carrying all of its relevant attributes such as parallelism, in-edges, and out-edges.

2. StreamEdge: the edge connecting two StreamNodes.

	@Internal
	public StreamGraph getStreamGraph(String jobName) {
		/*************************************************
		 *  Obtain the StreamGraph
		 */
		return getStreamGraph(jobName, true);
	}
/*****************************************************************************/
//--> getStreamGraph(jobName, true)
	@Internal
	public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
		/*************************************************
		 *  1. getStreamGraphGenerator() = StreamGraphGenerator
		 *  2. Call generate() to produce the StreamGraph
		 */
		StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();

		if(clearTransformations) {
			this.transformations.clear();
		}
		return streamGraph;
	}
/*****************************************************************************/
//--> getStreamGraphGenerator().setJobName(jobName).generate();
// returns a StreamGraphGenerator object
private StreamGraphGenerator getStreamGraphGenerator() {
		if(transformations.size() <= 0) {
			throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
		}
		// all data-processing operations live in this transformations list
		return new StreamGraphGenerator(transformations, config, checkpointCfg)
			.setStateBackend(defaultStateBackend)            // StateBackend = null
			.setChaining(isChainingEnabled)                  // isChainingEnabled = true
			.setUserArtifacts(cacheFile)
			.setTimeCharacteristic(timeCharacteristic)       // TimeCharacteristic = ProcessingTime
			.setDefaultBufferTimeout(bufferTimeout);         // default 100
	}
/*****************************************************************************/
// -->generate()
public StreamGraph generate() {
		//  Instantiate the StreamGraph object
		streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
		//  Set the various attributes (member-variable assignment)
		streamGraph.setStateBackend(stateBackend);
		streamGraph.setChaining(chaining);
		streamGraph.setScheduleMode(scheduleMode);
		streamGraph.setUserArtifacts(userArtifacts);
		streamGraph.setTimeCharacteristic(timeCharacteristic);
		streamGraph.setJobName(jobName);
		streamGraph.setGlobalDataExchangeMode(globalDataExchangeMode);
		alreadyTransformed = new HashMap<>();

		/*************************************************
		 *  Transform every transformation, bottom-up: the input
		 *  transformations are transformed first (via recursion)
		 */
		for (Transformation<?> transformation: transformations) {
			transform(transformation);
		}

		final StreamGraph builtStreamGraph = streamGraph;
		alreadyTransformed.clear();
		alreadyTransformed = null;
		streamGraph = null;

		return builtStreamGraph;
	}
/*****************************************************************************/
//--> transform(transformation)
// Transforms one concrete transformation into the StreamNodes and StreamEdges of the StreamGraph.
// Returns the collection of IDs for this transform; it usually has exactly one element (except for FeedbackTransformation).
private Collection<Integer> transform(Transformation<?> transform) {

		//  Transformations that have already been transformed are kept in the alreadyTransformed map
		if (alreadyTransformed.containsKey(transform)) {
			return alreadyTransformed.get(transform);
		}
		LOG.debug("Transforming " + transform);
		if (transform.getMaxParallelism() <= 0) {
			// if the max parallelism hasn't been set, then first use the job wide max parallelism from the ExecutionConfig.
			int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
			if (globalMaxParallelismFromConfig > 0) {
				transform.setMaxParallelism(globalMaxParallelismFromConfig);
			}
		}

		//  Throws if the output type is MissingTypeInfo (i.e. the type could not be determined)
		// call at least once to trigger exceptions about MissingTypeInfo
		transform.getOutputType();

		/*************************************************
		 *  Key step: dispatch on the type of the transform and convert accordingly
		 */
		Collection<Integer> transformedIds;
		if (transform instanceof OneInputTransformation<?, ?>) {
			transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
		} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
			transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
		} else if (transform instanceof AbstractMultipleInputTransformation<?>) {
			transformedIds = transformMultipleInputTransform((AbstractMultipleInputTransformation<?>) transform);
		......
		}
		// need this check because the iterate transformation adds itself before
		// transforming the feedback edges
		if (!alreadyTransformed.containsKey(transform)) {
			alreadyTransformed.put(transform, transformedIds);
		}
		// Record this transform's settings in the StreamGraph
		if (transform.getBufferTimeout() >= 0) {
			streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
		} else {
			streamGraph.setBufferTimeout(transform.getId(), defaultBufferTimeout);
		}
		......
		return transformedIds;
	}
/*****************************************************************************/
//-->以transformOneInputTransform()为例进行说明
	/**
	 * Transforms a {@code OneInputTransformation}: recursively transforms the input,
	 * creates a new {@code StreamNode} in the graph, and connects the input to the new node.
	 */
	private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

		// Recursively transform the input Transformation first; only then can this one be processed
		Collection<Integer> inputIds = transform(transform.getInput());

		// the recursive call might have already transformed this
		if (alreadyTransformed.containsKey(transform)) {
			return alreadyTransformed.get(transform);
		}

		// Determine the slot sharing group
		String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

		// Register the Transformation's operator with the StreamGraph, which in fact adds a StreamNode
		streamGraph.addOperator(transform.getId(),
				slotSharingGroup,
				transform.getCoLocationGroupKey(),
				transform.getOperatorFactory(),
				transform.getInputType(),
				transform.getOutputType(),
				transform.getName());
		if (transform.getStateKeySelector() != null) {
			TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
			streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
		}

		int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
			transform.getParallelism() : executionConfig.getParallelism();
		streamGraph.setParallelism(transform.getId(), parallelism);
		streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
		// Set the in-edges (StreamEdge) of this StreamNode
		for (Integer inputId: inputIds) {
			// For each input id, add the corresponding edge to this node in the graph
			streamGraph.addEdge(inputId, transform.getId(), 0);
		}
		return Collections.singleton(transform.getId());
	}
/*****************************************************************************/
//streamGraph.addOperator()-->addNode
protected StreamNode addNode(
			Integer vertexID,
			@Nullable String slotSharingGroup,
			@Nullable String coLocationGroup,
			Class<? extends AbstractInvokable> vertexClass,
			StreamOperatorFactory<?> operatorFactory,
			String operatorName) {

		if (streamNodes.containsKey(vertexID)) {
			throw new RuntimeException("Duplicate vertexID " + vertexID);
		}

		StreamNode vertex = new StreamNode(
				vertexID,
				slotSharingGroup,
				coLocationGroup,
				operatorFactory,
				operatorName,
				new ArrayList<OutputSelector<?>>(),
				vertexClass);
        // store in the streamNodes map
		streamNodes.put(vertexID, vertex);
		return vertex;
	}
/*****************************************************************************/
//streamGraph.addEdge(inputId, transform.getId(), 0)--> addEdgeInternal()
private void addEdgeInternal(Integer upStreamVertexID,
			Integer downStreamVertexID,
			int typeNumber,
			StreamPartitioner<?> partitioner,
			List<String> outputNames,
			OutputTag outputTag,
			ShuffleMode shuffleMode) {
			.....
              // Build the edge (StreamEdge) between the two StreamNodes
			StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);
             // add an out-edge to the upstream StreamNode
			getStreamNode(edge.getSourceId()).addOutEdge(edge);
             // add an in-edge to the downstream StreamNode
			getStreamNode(edge.getTargetId()).addInEdge(edge);
		}
	}

(Figure: flow of building the StreamGraph.)

3.2 execute(StreamGraph) Analysis

  • Convert the StreamGraph into a JobGraph (this is where the optimization happens)
  • Submit the JobGraph to the Flink cluster for execution via the RestClusterClient
    • When the RestClient object is initialized, it initializes a Netty client that handles the application submission
    • On the Flink cluster side, the submission is handled by the jobSubmitHandler inside WebMonitorEndpoint
    /**
	 * Triggers the program execution. The environment will execute all parts of
	 * the program that have resulted in a "sink" operation.
	 *
	 * Sink operations are for example printing results or forwarding them to a message queue.
	 *
	 * @param streamGraph the stream graph representing the transformations
	 * @return The result of the job execution, containing elapsed time and accumulators.
	 * @throws Exception which occurs during job execution.
	 */
	@Internal
	public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
		/*************************************************
		 *   Entry point for executing the StreamGraph (asynchronously)
		 */
		final JobClient jobClient = executeAsync(streamGraph);

		try {
			final JobExecutionResult jobExecutionResult;
			/*************************************************
			 *  Obtain the execution result
			 */
			if(configuration.getBoolean(DeploymentOptions.ATTACHED)) {
				jobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
			} else {
				jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
			}
			jobListeners.forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
			return jobExecutionResult;
		} catch(Throwable t) {
			jobListeners.forEach(jobListener -> {
				jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t));
			});
			ExceptionUtils.rethrowException(t);
			// never reached, only make javac happy
			return null;
		}
	}
/*******************************************************************************/
//--> executeAsync(streamGraph): asynchronous execution
    @Internal
	public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
		checkNotNull(streamGraph, "StreamGraph cannot be null.");
		checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");

		final PipelineExecutorFactory executorFactory = executorServiceLoader.getExecutorFactory(configuration);

		checkNotNull(executorFactory, "Cannot find compatible factory for specified execution.target (=%s)",
			configuration.get(DeploymentOptions.TARGET));

		/*************************************************
		 *  Submit the StreamGraph for execution asynchronously
		 */
		CompletableFuture<JobClient> jobClientFuture = executorFactory.getExecutor(configuration).execute(streamGraph, configuration);

		/*************************************************
		 *  Block until the JobClient for the submitted StreamGraph is available
		 */
		try {
			JobClient jobClient = jobClientFuture.get();
			jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
			return jobClient;
		} catch(ExecutionException executionException) {
			final Throwable strippedException = ExceptionUtils.stripExecutionException(executionException);
			jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, strippedException));
			throw new FlinkException(String.format("Failed to execute job '%s'.", streamGraph.getJobName()), strippedException);
		}
	}
/*******************************************************************************/
//AbstractSessionClusterExecutor
//-->executorFactory.getExecutor(configuration).execute(streamGraph, configuration)
	@Override
	public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
        // 1. Convert the StreamGraph into a JobGraph (the optimization step)
		final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);

		/*************************************************
		 *  clusterDescriptor = StandaloneClusterDescriptor
		 */
		try(final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
			final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
			checkState(clusterID != null);

			// RestClusterClient
			final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor.retrieve(clusterID);

			// clusterClient = RestClusterClient
			ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();

			/*************************************************
			 *  2. Submit the jobGraph to the Flink cluster for execution via the cluster client:
			 *  1. MiniClusterClient for local execution
			 *  2. RestClusterClient submits to the Flink REST service for processing
			 */
			return clusterClient.submitJob(jobGraph)
				.thenApplyAsync(jobID -> (JobClient) new ClusterClientJobClientAdapter<>(clusterClientProvider, jobID))
				.whenComplete((ignored1, ignored2) -> clusterClient.close());
		}
	}

/*******************************************************************************/
// Submission continues: clusterClient.submitJob(jobGraph)
@Override
	public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
		/*************************************************
		 *  Persist the JobGraph to a disk file, producing jobGraphFile:
		 *  1. file-name prefix of the persisted JobGraph: flink-jobgraph
		 *  2. file-name suffix of the persisted JobGraph: .bin
		 */
		CompletableFuture<java.nio.file.Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> {
			try {
				final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
				try(ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
					objectOut.writeObject(jobGraph);
				}
				return jobGraphFile;
			} catch(IOException e) {
				throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e));
			}
		}, executorService);

		/*************************************************
		 *  Add the persisted file to the set of files awaiting upload.
		 *  Note: thenApply takes a function that processes the result of the previous
		 *  CompletableFuture and returns a future holding the processed result (async programming).
		 */
		CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> {
			List<String> jarFileNames = new ArrayList<>(8);
			List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8);
			Collection<FileUpload> filesToUpload = new ArrayList<>(8);
			// add the file to the upload set
			filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));

			for(Path jar : jobGraph.getUserJars()) {
				jarFileNames.add(jar.getName());
				filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
			}
			for(Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts : jobGraph.getUserArtifacts().entrySet()) {
				final Path artifactFilePath = new Path(artifacts.getValue().filePath);
				try {
					// Only local artifacts need to be uploaded.
					if(!artifactFilePath.getFileSystem().isDistributedFS()) {
						artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), artifactFilePath.getName()));
						filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
					}
				} catch(IOException e) {
					throw new CompletionException(new FlinkException("Failed to get the FileSystem of artifact " + artifactFilePath + ".", e));
				}
			}
			// Build the job-submission request body, referencing the needed resources: mainly the persisted jobGraph file and the dependency jars
			final JobSubmitRequestBody requestBody = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), jarFileNames, artifactFileNames);

			// Return a Tuple2 that carries both results: requestBody and filesToUpload
			return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload));
		});
		/*************************************************
		 *  Send the request.
		 *  The argument of the function passed to requestFuture.thenCompose is requestFuture's result, i.e. the Tuple2.
		 *  Note: thenCompose takes a function that returns a CompletableFuture and whose argument is
		 *  the result of the previous computation step.
		 */
		final CompletableFuture<JobSubmitResponseBody> submissionFuture = requestFuture.thenCompose(
			// submit via sendRetriableRequest
			requestAndFileUploads -> sendRetriableRequest(
				JobSubmitHeaders.getInstance(),
				EmptyMessageParameters.getInstance(),
				requestAndFileUploads.f0,
				requestAndFileUploads.f1,
				isConnectionProblemOrServiceUnavailable())
			);
		/*************************************************
		 *  Note:
		 *  submissionFuture.thenAccept() only performs an action on the result, without returning a new value.
		 *  submissionFuture.thenCombine() merges the results of two tasks and processes them further.
		 */
		submissionFuture
			.thenCombine(jobGraphFileFuture, (ignored, jobGraphFile) -> jobGraphFile)
			.thenAccept(jobGraphFile -> {
				try {
					// Once sendRetriableRequest has completed, delete the generated jobGraphFile
					Files.delete(jobGraphFile);
				} catch(IOException e) {
					LOG.warn("Could not delete temporary file {}.", jobGraphFile, e);
				}
			});
		// thenApply takes a function that processes the result of the previous CompletableFuture
		// and returns a future holding the processed result.
		// exceptionally: runs the given handler when an exception is thrown.
		return submissionFuture
			.thenApply(ignore -> jobGraph.getJobID())
			.exceptionally((Throwable throwable) -> {
			throw new CompletionException(
				new JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", ExceptionUtils.stripCompletionException(throwable)));
		});
	}
/*******************************************************************************/
//--> submission continues: sendRetriableRequest(JobSubmitHeaders.getInstance(), EmptyMessageParameters.getInstance(), requestAndFileUploads.f0, requestAndFileUploads.f1, isConnectionProblemOrServiceUnavailable());
private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRetriableRequest(
		M messageHeaders, U messageParameters, R request, Collection<FileUpload> filesToUpload, Predicate<Throwable> retryPredicate) {

		/*************************************************
		 *  With a retry mechanism.
		 *  The JobManager's web service is obtained via getWebMonitorBaseUrl()
		 */
		return retry(() -> getWebMonitorBaseUrl().thenCompose(webMonitorBaseUrl -> {
			try {
				/*************************************************
				 *  restClient = RestClient. The restClient sends an HTTP RESTful request to the
				 *  WebMonitorEndpoint, where the JobSubmitHandler ultimately handles it.
				 */
				return restClient.sendRequest(webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(),messageHeaders, messageParameters, request, filesToUpload);
			} catch(IOException e) {
				throw new CompletionException(e);
			}
		}), retryPredicate);
	}

/*******************************************************************************/
// Note: when the RestClient object is initialized, it bootstraps a Netty client that handles the application submission.
public RestClient(RestClientConfiguration configuration, Executor executor) {
		Preconditions.checkNotNull(configuration);
		this.executor = Preconditions.checkNotNull(executor);
		this.terminationFuture = new CompletableFuture<>();
		final SSLHandlerFactory sslHandlerFactory = configuration.getSslHandlerFactory();
		ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel socketChannel) {
				try {
					// SSL should be the first handler in the pipeline
					if(sslHandlerFactory != null) {
						socketChannel.pipeline().addLast("ssl", sslHandlerFactory.createNettySSLHandler(socketChannel.alloc()));
					}
					socketChannel.pipeline().addLast(new HttpClientCodec()).addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
						.addLast(new ChunkedWriteHandler()) // required for multipart-requests
						.addLast(new IdleStateHandler(configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(),
							configuration.getIdlenessTimeout(), TimeUnit.MILLISECONDS)).addLast(new ClientHandler());
				} catch(Throwable t) {
					t.printStackTrace();
					ExceptionUtils.rethrow(t);
				}
			}
		};
		NioEventLoopGroup group = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-client-netty"));

		/*************************************************
		 *  Start the bootstrap (the Netty client's bootstrap)
		 */
		bootstrap = new Bootstrap();
		bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(configuration.getConnectionTimeout())).group(group)
			.channel(NioSocketChannel.class).handler(initializer);
		LOG.debug("Rest client endpoint started.");
	}

/*******************************************************************************/
//--> submission continues: restClient.sendRequest(webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(), messageHeaders, messageParameters, request, filesToUpload); --> sendRequest(XXXXX);
public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
		sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request, Collection<FileUpload> fileUploads,
RestAPIVersion apiVersion) throws IOException {
......
		/*************************************************
		 *  Build the HTTP request
		 */
		Request httpRequest = createRequest(targetAddress + ':' + targetPort, targetUrl,messageHeaders.getHttpMethod().getNettyHttpMethod(), payload, fileUploads);
		final JavaType responseType;
		final Collection<Class<?>> typeParameters = messageHeaders.getResponseTypeParameters();

		if(typeParameters.isEmpty()) {
			responseType = objectMapper.constructType(messageHeaders.getResponseClass());
		} else {
			responseType = objectMapper.getTypeFactory()
				.constructParametricType(messageHeaders.getResponseClass(), typeParameters.toArray(new Class<?>[typeParameters.size()]));
		}
		/*************************************************
		 *  Submit the request
		 */
		return submitRequest(targetAddress, targetPort, httpRequest, responseType);
	}
/*******************************************************************************/
//-->submitRequest(targetAddress, targetPort, httpRequest, responseType);
private <P extends ResponseBody> CompletableFuture<P> submitRequest(String targetAddress, int targetPort, Request httpRequest,JavaType responseType) {

		/*************************************************
		 *  Send the request from the Netty client to the Netty server
		 */
		final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort);
		final CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
		connectFuture.addListener((ChannelFuture future) -> {
			if(future.isSuccess()) {
				channelFuture.complete(future.channel());
			} else {
				channelFuture.completeExceptionally(future.cause());
			}
		});
		return channelFuture.thenComposeAsync(channel -> {
			ClientHandler handler = channel.pipeline().get(ClientHandler.class);
			CompletableFuture<JsonResponse> future;
			boolean success = false;
			try {
				if(handler == null) {
					throw new IOException("Netty pipeline was not properly initialized.");
				} else {
					/*************************************************
					 * Send the request to the Netty server of the WebMonitorEndpoint in the JobManager;
					 * it is ultimately handled by JobSubmitHandler
					 */
					httpRequest.writeTo(channel);
					future = handler.getJsonFuture();
					success = true;
				}
			} catch(IOException e) {
				future = FutureUtils.completedExceptionally(new ConnectionException("Could not write request.", e));
			} finally {
				if(!success) {
					channel.close();
				}
			}
			return future;
			/*************************************************
			 *  Parse the response: parseResponse
			 */
		}, executor).thenComposeAsync((JsonResponse rawResponse) -> parseResponse(rawResponse, responseType), executor);
	}

(Figure: flow of the code above.)

3.3 StreamGraph --> JobGraph (Client Side)

  The JobGraph is the data structure that the StreamGraph becomes after optimization, and it is what gets submitted to the JobManager. A JobGraph consists of JobVertex, IntermediateDataSet, and JobEdge objects.

The main abstractions in the JobGraph (a hand-built wiring sketch follows the definitions):

1. JobVertex: after optimization, multiple StreamNodes that satisfy the chaining conditions may be chained together into one JobVertex, i.e. a JobVertex contains one or more operators. A JobVertex's inputs are JobEdges and its outputs are IntermediateDataSets.

2. IntermediateDataSet: represents the output of a JobVertex, i.e. the data set produced by its operators. An IntermediateDataSet's input is a JobVertex and its outputs are JobEdges.

3. JobEdge: a data-transfer channel of the JobGraph. A JobEdge's input is an IntermediateDataSet and its output is a JobVertex: data flows from the IntermediateDataSet through the JobEdge to the consuming JobVertex.
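The wiring can be shown by hand-building a tiny JobGraph with the flink-runtime classes. This is a sketch only: the real graph is produced by StreamingJobGraphGenerator, and the invokable classes a runnable graph needs are omitted here. connectNewDataSetAsInput creates the IntermediateDataSet on the producer and the JobEdge into the consumer in one call.

import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;

public class JobGraphWiringSketch {
	public static void main(String[] args) {
		JobVertex source = new JobVertex("source (chained: source -> flatMap)");
		JobVertex window = new JobVertex("window aggregation");

		// source --IntermediateDataSet--> JobEdge --> window
		window.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

		JobGraph jobGraph = new JobGraph("wiring sketch", source, window);
		System.out.println(jobGraph.getVerticesSortedTopologicallyFromSources());
	}
}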

/*******************************************************************************/
// StreamGraph --> JobGraph: the optimization step (operator chaining)
// StreamNode --> JobVertex + IntermediateDataSet;  StreamEdge --> JobEdge
/* env.execute("Socket Window WordCount")
 -->execute(getStreamGraph(jobName))
 --> final JobClient jobClient = executeAsync(streamGraph)
 --> CompletableFuture<JobClient> jobClientFuture = executorFactory.getExecutor(configuration).execute(streamGraph, configuration);
 --> final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
 -->final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
 --> pipelineTranslator.translateToJobGraph(pipeline,optimizerConfiguration,
defaultParallelism);
--> streamGraph.getJobGraph(null)
--> StreamingJobGraphGenerator.createJobGraph(this, jobID);
--> new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
 */
public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws MalformedURLException {
		checkNotNull(pipeline); // pipeline --> StreamGraph
		checkNotNull(configuration);
        // accessor for the configuration
		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
    	// Obtain the JobGraph (internally a translator performs the StreamGraph-to-JobGraph translation)
		final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
		configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
			.ifPresent(strJobID -> jobGraph.setJobID(JobID.fromHexString(strJobID)));

		jobGraph.addJars(executionConfigAccessor.getJars());
		jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
		jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());

		return jobGraph;
	}
//-->final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
public static JobGraph getJobGraph(
			Pipeline pipeline,
			Configuration optimizerConfiguration,
			int defaultParallelism) {
         // obtain the pipelineTranslator
		FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);
         // translate into the JobGraph via the FlinkPipelineTranslator
         // pipeline = StreamGraph
		return pipelineTranslator.translateToJobGraph(pipeline,
				optimizerConfiguration,
				defaultParallelism);
	}
//--> pipelineTranslator.translateToJobGraph(pipeline,optimizerConfiguration,defaultParallelism);
	@Override
	public JobGraph translateToJobGraph(
			Pipeline pipeline,
			Configuration optimizerConfiguration,
			int defaultParallelism) {
		checkArgument(pipeline instanceof StreamGraph,
				"Given pipeline is not a DataStream StreamGraph.");

		StreamGraph streamGraph = (StreamGraph) pipeline;
        // convert the StreamGraph into the JobGraph
		return streamGraph.getJobGraph(null);
	}
// streamGraph.getJobGraph(null)
public JobGraph getJobGraph(@Nullable JobID jobID) {
         // create the JobGraph from the StreamGraph via StreamingJobGraphGenerator
         // this = StreamGraph
		return StreamingJobGraphGenerator.createJobGraph(this, jobID);
	}
// --> StreamingJobGraphGenerator.createJobGraph(this, jobID);
public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
		return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
	}
// -->new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph()
private JobGraph createJobGraph() { // performs the StreamGraph-to-JobGraph conversion
		preValidate();
     
		// make sure that all vertices start immediately
         // set the ScheduleMode
		jobGraph.setScheduleMode(streamGraph.getScheduleMode());

		// Generate deterministic hashes for the nodes in order to identify them across
		// submission iff they didn't change.
		Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

		// Generate legacy version hashes for backwards compatibility
		List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
		for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
			legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
		}
         /**
		 * Key step: set up chaining. StreamNodes that can be chained are merged here, and the
		 * corresponding JobVertex, JobEdge, and IntermediateDataSet objects are generated; operators
		 * that can be chained are merged into an OperatorChain.
		 * Rough logic, iterating over every node:
		 * 1. If the node is the head of a chain, generate a JobVertex.
		 * 2. If it is not a chain head, merge its configuration into the head node and connect the head
		 *    node to this node's out-edges. A node that cannot be chained is treated as a chain head by itself.
		 * Benefit: fewer thread switches, less message serialization/deserialization, less buffer
		 * exchange, lower latency, and higher overall throughput.
		 */
		setChaining(hashes, legacyHashes);
		setPhysicalEdges();
		setSlotSharingAndCoLocation();
		setManagedMemoryFraction(
			Collections.unmodifiableMap(jobVertices),
			Collections.unmodifiableMap(vertexConfigs),
			Collections.unmodifiableMap(chainedConfigs),
			id -> streamGraph.getStreamNode(id).getMinResources(),
			id -> streamGraph.getStreamNode(id).getManagedMemoryWeight());
		configureCheckpointing();

		jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

		JobGraphUtils.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);

		// set the ExecutionConfig last when it has been finalized
		try {
			jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
		}
		catch (IOException e) {
			throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +"This indicates that non-serializable types (like custom serializers) were registered");
		}
		return jobGraph;
	}

For two StreamNodes to be chained together into an OperatorChain, all nine of the following conditions must hold (they are folded into a single predicate in the sketch after the list):

// 1. The downstream node has an in-degree of 1 (i.e. it has no input from any other node)

downStreamVertex.getInEdges().size() == 1;

// 2. The upstream and downstream nodes are in the same slot sharing group

upStreamVertex.isSameSlotSharingGroup(downStreamVertex);

// 3. Neither operator is null

!(downStreamOperator == null || upStreamOperator == null);

// 4. The upstream node's chaining strategy is ALWAYS or HEAD (HEAD can only chain with downstream, not upstream; Source defaults to HEAD)

upStreamOperator.getChainingStrategy() != ChainingStrategy.NEVER;

// 5. The downstream node's chaining strategy is ALWAYS (can chain with both upstream and downstream; map, flatMap, filter, etc. default to ALWAYS)

downStreamOperator.getChainingStrategy() == ChainingStrategy.ALWAYS;

// 6. The physical partitioner between the two nodes is ForwardPartitioner (no shuffle between the two physical nodes)

(edge.getPartitioner() instanceof ForwardPartitioner);

// 7. The shuffle mode between the two operators (PIPELINED, BATCH, UNDEFINED) is not the batch mode

edge.getShuffleMode() != ShuffleMode.BATCH;

// 8. The upstream and downstream parallelisms are equal

upStreamVertex.getParallelism() == downStreamVertex.getParallelism();

// 9. The user has not disabled chaining

streamGraph.isChainingEnabled();
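Folding the nine checks into one predicate gives something close to StreamingJobGraphGenerator.isChainable() in this Flink version. This is a hedged sketch, not the exact source: accessor names such as getOperatorFactory() vary slightly across versions.

import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.ShuffleMode;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;

public class IsChainableSketch {
	// Sketch only: approximates StreamingJobGraphGenerator.isChainable()
	public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
		StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
		StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
		StreamOperatorFactory<?> upStreamOperator = upStreamVertex.getOperatorFactory();
		StreamOperatorFactory<?> downStreamOperator = downStreamVertex.getOperatorFactory();

		return downStreamVertex.getInEdges().size() == 1                            // 1. in-degree 1
			&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)              // 2. same slot group
			&& !(downStreamOperator == null || upStreamOperator == null)            // 3. operators not null
			&& upStreamOperator.getChainingStrategy() != ChainingStrategy.NEVER     // 4. upstream HEAD/ALWAYS
			&& downStreamOperator.getChainingStrategy() == ChainingStrategy.ALWAYS  // 5. downstream ALWAYS
			&& (edge.getPartitioner() instanceof ForwardPartitioner)                // 6. forward partitioner
			&& edge.getShuffleMode() != ShuffleMode.BATCH                           // 7. not a batch shuffle
			&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism() // 8. equal parallelism
			&& streamGraph.isChainingEnabled();                                     // 9. chaining enabled
	}
}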

(Figure: flow of converting the StreamGraph into the JobGraph, performed on the client.)

4. WebMonitorEndpoint Handles the RestClient JobSubmit Request (Server Side)

  The client sends the request through its Netty client to the master node of the Flink cluster. The cluster receives it through its Netty server, and the request is processed by the handleRequest() method of the JobSubmitHandler inside WebMonitorEndpoint.

  • 1. Recover the files from the request: deserialize the JobGraph file, together with the jar the job needs and its dependency jars, then upload them
  • 2. The Dispatcher continues the submission (scheduling the execution)
    • 2.1 Persist the JobGraph file to ZooKeeper
    • 2.2 Submit the job for execution
      • Create the JobMaster (which performs the JobGraph --> ExecutionGraph conversion)
      • Start the JobMaster
    • 2.3 When the job completes, delete the data related to that JobGraph

The core code entry point:

JobSubmitHandler.handleRequest();

4.1 handleRequest --> Dispatcher

/* env.execute("Socket Window WordCount");
 * --> execute(getStreamGraph(jobName));
 * --> final JobClient jobClient = executeAsync(streamGraph);
 * --> CompletableFuture<JobClient> jobClientFuture = executorFactory.getExecutor(configuration).execute(streamGraph, configuration);
 * --> clusterClient.submitJob(jobGraph).thenApplyAsync(jobID -> (JobClient) new ClusterClientJobClientAdapter<>(clusterClientProvider, jobID))
				.whenComplete((ignored1, ignored2) -> clusterClient.close());
 * -->sendRetriableRequest(JobSubmitHeaders.getInstance(), EmptyMessageParameters.getInstance(),requestAndFileUploads.f0, requestAndFileUploads.f1, isConnectionProblemOrServiceUnavailable()));
 * --> restClient.sendRequest(webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(),messageHeaders, messageParameters, request, filesToUpload);
 * --> sendRequest(targetAddress, targetPort, messageHeaders, messageParameters, request, fileUploads,
 RestAPIVersion.getLatestVersion(messageHeaders.getSupportedAPIVersions()));
 *--> submitRequest(targetAddress, targetPort, httpRequest, responseType);
 * --> httpRequest.writeTo(channel); send the request from the Netty client to the Netty server
 **************************************************************************
 * Server side
 * handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request,@Nonnull DispatcherGateway gateway)
 * --> jobGraph -> gateway.submitJob(jobGraph, timeout)
 * --> internalSubmitJob(jobGraph);
 * --> persistAndRunJob(JobGraph jobGraph)
 * --> runJob(jobGraph);
*/
// The handleRequest method of JobSubmitHandler receives the client's request
@Override
	protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request,
		@Nonnull DispatcherGateway gateway) throws RestHandlerException {
		// Get the uploaded files from the request (including the serialized jobGraph file)
		final Collection<File> uploadedFiles = request.getUploadedFiles();
		final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(File::getName, Path::fromLocalFile));

		if(uploadedFiles.size() != nameToFile.size()) {
			throw new RestHandlerException(String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
				uploadedFiles.size() < nameToFile.size() ? "lower" : "higher", nameToFile.size(), uploadedFiles.size()),
				HttpResponseStatus.BAD_REQUEST);
		}
        /*************************************************
		 *  Recover the JobGraph.
		 *  This shows that what the server receives from the client's submission is, in fact, the JobGraph.
		 *  At this point the client has finally delivered the JobGraph to the JobManager, and
		 *  JobSubmitHandler carries out the processing.
		 */
		CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);

		//  Get the jars the job needs
		Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);

		//  Get the dependency jars the job needs
		Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);

		//  Upload the JobGraph + program jars + dependency jars
		CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);

		//  Submit the job
		CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(

			// submit the job; gateway = Dispatcher
			jobGraph -> gateway.submitJob(jobGraph, timeout)
		);

		return jobSubmissionFuture.thenCombine(jobGraphFuture,
			// wrap and return the response
			(ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID())
		);
	}
/*****************************************************************************/
// -->  gateway.submitJob(jobGraph, timeout)
@Override
	public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
		log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());

		try {
			if(isDuplicateJob(jobGraph.getJobID())) {
				return FutureUtils.completedExceptionally(new DuplicateJobSubmissionException(jobGraph.getJobID()));
			} else if(isPartialResourceConfigured(jobGraph)) {
				return FutureUtils.completedExceptionally(new JobSubmissionException(jobGraph.getJobID(),
					"Currently jobs is not supported if parts of the vertices have " + "resources configured. The limitation will be removed in future versions."));
			} else {
				//  submit
				return internalSubmitJob(jobGraph);
			}
		} catch(FlinkException e) {
			return FutureUtils.completedExceptionally(e);
		}
	}
/*****************************************************************************/
// --> internalSubmitJob(jobGraph);
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
		log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());

		//  Submit for execution
		final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(

			// persist and run
			jobGraph.getJobID(), jobGraph, this::persistAndRunJob).thenApply(ignored -> Acknowledge.get()
		);

		return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
			if(throwable != null) {
				// If the submission fails, delete the data related to this job
				cleanUpJobData(jobGraph.getJobID(), true);

				final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
				log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
				throw new CompletionException(new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", strippedThrowable));
			} else {
				return acknowledge;
			}
		}, getRpcService().getExecutor());
	}
/*****************************************************************************/
// --> persistAndRunJob(JobGraph jobGraph)
private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {

		// Persist the JobGraph to ZooKeeper; jobGraphWriter = ZooKeeperJobGraphStore
		jobGraphWriter.putJobGraph(jobGraph);

		// Run the job
		final CompletableFuture<Void> runJobFuture = runJob(jobGraph);

		//  If running the job fails, remove it from the jobGraphWriter
		return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> {
			if(throwable != null) {
				jobGraphWriter.removeJobGraph(jobGraph.getJobID());
			}
		}));
	}

/*****************************************************************************/
// When a client submits a job normally, the Dispatcher on the cluster's master node ultimately receives it and continues the submission
// --> runJob(jobGraph); ==> create the JobManagerRunner, then start it once created
private CompletableFuture<Void> runJob(JobGraph jobGraph) {
		Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));
		// Create the JobManagerRunner (this creates the JobMaster instance and turns the JobGraph into an
		// ExecutionGraph). Strictly speaking this starts the JobMaster, so createJobMasterRunner would be a
		// better name. Flink's master-worker architecture: resource management --> ResourceManager + TaskExecutor;
		// job execution --> JobMaster + StreamTask
		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);

		jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

		//  Start the JobManagerRunner
		return jobManagerRunnerFuture.thenApply(
			// submit the job == start the JobManagerRunner
			FunctionUtils.uncheckedFunction(this::startJobManagerRunner)

		).thenApply(
			FunctionUtils.nullFn()
		).whenCompleteAsync(
			(ignored, throwable) -> {
				if(throwable != null) {
					jobManagerRunnerFutures.remove(jobGraph.getJobID());
				}
			}, getMainThreadExecutor());
	}

(Figure: flow of WebMonitorEndpoint handling the job submitted by the client.)

4.2 Dispatcher.runJob(jobGraph)

It does the two most important things:

  • Create the JobMaster --> createJobManagerRunner(jobGraph)
  • Start the JobMaster --> jobManagerRunner -> startJobManagerRunner(jobManagerRunner)


4.2.1 Creating the JobMaster Object

  jobManagerRunnerFactory.createJobManagerRunner() returns a JobManagerRunnerImpl object. While the JobManagerRunnerImpl object is being initialized, it internally calls jobMasterFactory.createJobMasterService() to create a JobMaster object, and in the process the JobGraph is converted into an ExecutionGraph.

Note: JobMaster is a subclass of RpcEndpoint; once the RPC endpoint is started, the framework invokes the endpoint's onStart() lifecycle hook (see also the note at the constructor below: this version of JobMaster does not override onStart() itself).

// Code entry point:
/*final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
 * -->jobManagerRunnerFactory.createJobManagerRunner(jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices,jobManagerSharedServices, new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup), fatalErrorHandler)
 * -->  new JobManagerRunnerImpl(jobGraph,jobMasterFactory,highAvailabilityServices,
jobManagerServices.getLibraryCacheManager().registerClassLoaderLease(jobGraph.getJobID()),jobManagerServices.getScheduledExecutorService(),fatalErrorHandler);
 * --> this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader);
 * --> new JobMaster(rpcService, jobMasterConfiguration, ResourceID.generate(), jobGraph, haServices, slotPoolFactory, schedulerFactory,jobManagerSharedServices, heartbeatServices, jobManagerJobMetricGroupFactory, jobCompletionActions, fatalErrorHandler,userCodeClassloader, schedulerNGFactory, shuffleMaster,
lookup -> new JobMasterPartitionTrackerImpl(jobGraph.getJobID(), shuffleMaster, lookup));
*/
private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
		final RpcService rpcService = getRpcService();

		return CompletableFuture.supplyAsync(() -> {
			try {

				//  createJobManagerRunner returns a JobManagerRunnerImpl
				return jobManagerRunnerFactory.createJobManagerRunner(jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices,jobManagerSharedServices, new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup), fatalErrorHandler);
			} catch(Exception e) {
				throw new CompletionException(new JobExecutionException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
			}
		}, rpcService.getExecutor());
	}
/***********************************************************************************/
// --> jobManagerRunnerFactory.createJobManagerRunner(xxxxx)
public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
	INSTANCE;

	@Override
	public JobManagerRunner createJobManagerRunner(
			JobGraph jobGraph,
			Configuration configuration,
			RpcService rpcService,
			HighAvailabilityServices highAvailabilityServices,
			HeartbeatServices heartbeatServices,
			JobManagerSharedServices jobManagerServices,
			JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
			FatalErrorHandler fatalErrorHandler) throws Exception {

		final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);

		final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(configuration);
		final SchedulerFactory schedulerFactory = DefaultSchedulerFactory.fromConfiguration(configuration);
		final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
		final ShuffleMaster<?> shuffleMaster = ShuffleServiceLoader.loadShuffleServiceFactory(configuration).createShuffleMaster(configuration);

		// Create the DefaultJobMasterServiceFactory
		final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory(
			jobMasterConfiguration,
			slotPoolFactory,
			schedulerFactory,
			rpcService,
			highAvailabilityServices,
			jobManagerServices,
			heartbeatServices,
			jobManagerJobMetricGroupFactory,
			fatalErrorHandler,
			schedulerNGFactory,
			shuffleMaster);

		//  Return the JobManagerRunnerImpl, which is responsible for starting the JobMaster
		return new JobManagerRunnerImpl(
			jobGraph,
			jobMasterFactory,
			highAvailabilityServices,
			jobManagerServices.getLibraryCacheManager().registerClassLoaderLease(jobGraph.getJobID()),
			jobManagerServices.getScheduledExecutorService(),
			fatalErrorHandler);
	}
}
/***********************************************************************************/
// -->  new JobManagerRunnerImpl(xxxxx)
public JobManagerRunnerImpl(
			final JobGraph jobGraph,
			final JobMasterServiceFactory jobMasterFactory,
			final HighAvailabilityServices haServices,
			final LibraryCacheManager.ClassLoaderLease classLoaderLease,
			final Executor executor,
			final FatalErrorHandler fatalErrorHandler) throws Exception {

		this.resultFuture = new CompletableFuture<>();
		this.terminationFuture = new CompletableFuture<>();
		this.leadershipOperation = CompletableFuture.completedFuture(null);

		this.jobGraph = checkNotNull(jobGraph);
		this.classLoaderLease = checkNotNull(classLoaderLease);
		this.executor = checkNotNull(executor);
		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);

		checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");

		// libraries and class loader first
		final ClassLoader userCodeLoader;
		try {
			userCodeLoader = classLoaderLease.getOrResolveClassLoader(
				jobGraph.getUserJarBlobKeys(),
				jobGraph.getClasspaths());
		} catch (IOException e) {
			throw new Exception("Cannot set up the user code libraries: " + e.getMessage(), e);
		}

		// high availability services next
		this.runningJobsRegistry = haServices.getRunningJobsRegistry();

		// Note: returns the ZooKeeperLeaderElectionService
		this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());

		this.leaderGatewayFuture = new CompletableFuture<>();

		/*************************************************
		 *  Note: start the JobMaster; jobMasterService = the JobMaster instance
		 */
		// now start the JobManager
		this.jobMasterService = jobMasterFactory.createJobMasterService(jobGraph, this, userCodeLoader);
	}
/***********************************************************************************/
// -->  jobMasterFactory.createJobMasterService(xxxxx)
@Override
	public JobMaster createJobMasterService(JobGraph jobGraph, OnCompletionActions jobCompletionActions,
		ClassLoader userCodeClassloader) throws Exception {

		/*************************************************
		 *  Note: create and start a JobMaster
		 */
		return new JobMaster(rpcService, jobMasterConfiguration, ResourceID.generate(), jobGraph, haServices, slotPoolFactory, schedulerFactory,
			jobManagerSharedServices, heartbeatServices, jobManagerJobMetricGroupFactory, jobCompletionActions, fatalErrorHandler,
			userCodeClassloader, schedulerNGFactory, shuffleMaster,
			lookup -> new JobMasterPartitionTrackerImpl(jobGraph.getJobID(), shuffleMaster, lookup));
	}
}
/***********************************************************************************/
// -->  new JobMaster(xxxxx): once the object is instantiated, its onStart() hook would run, but JobMaster itself does not override onStart()
public JobMaster(RpcService rpcService, JobMasterConfiguration jobMasterConfiguration, ResourceID resourceId, JobGraph jobGraph,
		HighAvailabilityServices highAvailabilityService, SlotPoolFactory slotPoolFactory, SchedulerFactory schedulerFactory,
		JobManagerSharedServices jobManagerSharedServices, HeartbeatServices heartbeatServices, JobManagerJobMetricGroupFactory jobMetricGroupFactory,
		OnCompletionActions jobCompletionActions, FatalErrorHandler fatalErrorHandler, ClassLoader userCodeLoader,
		SchedulerNGFactory schedulerNGFactory, ShuffleMaster<?> shuffleMaster, PartitionTrackerFactory partitionTrackerFactory) throws Exception {

		super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME), null);

		this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration);
		this.resourceId = checkNotNull(resourceId);
		this.jobGraph = checkNotNull(jobGraph);
		this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
		this.highAvailabilityServices = checkNotNull(highAvailabilityService);
		this.blobWriter = jobManagerSharedServices.getBlobWriter();
		this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
		this.jobCompletionActions = checkNotNull(jobCompletionActions);
		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
		this.userCodeLoader = checkNotNull(userCodeLoader);
		this.schedulerNGFactory = checkNotNull(schedulerNGFactory);
		this.heartbeatServices = checkNotNull(heartbeatServices);
		this.jobMetricGroupFactory = checkNotNull(jobMetricGroupFactory);
		final String jobName = jobGraph.getName();
		final JobID jid = jobGraph.getJobID();
		log.info("Initializing job {} ({}).", jobName, jid);
		resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
		this.slotPool = checkNotNull(slotPoolFactory).createSlotPool(jobGraph.getJobID());
		// Note: SchedulerImpl
		this.scheduler = checkNotNull(schedulerFactory).createScheduler(slotPool);
		this.registeredTaskManagers = new HashMap<>(4);
		this.partitionTracker = checkNotNull(partitionTrackerFactory).create(resourceID -> {
			Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerInfo = registeredTaskManagers.get(resourceID);
			if(taskManagerInfo == null) {
				return Optional.empty();
			}
			return Optional.of(taskManagerInfo.f1);
		});
		this.backPressureStatsTracker = checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker());
		this.shuffleMaster = checkNotNull(shuffleMaster);
		this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
		// Note: DefaultScheduler (internally performs the JobGraph --> ExecutionGraph conversion)
		this.schedulerNG = createScheduler(jobManagerJobMetricGroup);
		this.jobStatusListener = null;
		this.resourceManagerConnection = null;
		this.establishedResourceManagerConnection = null;
		this.accumulators = new HashMap<>();
		this.taskManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
		this.resourceManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
	}


JobGraph --> ExecutionGraph

  ExecutionGraph: the JobManager (JobMaster) generates the ExecutionGraph from the JobGraph. The ExecutionGraph is the parallelized version of the JobGraph and the core data structure of the scheduling layer. The core source-code entry point is:

ExecutionGraph executionGraph = SchedulerBase.createAndRestoreExecutionGraph()

  Inside SchedulerBase there are two member fields: a JobGraph and an ExecutionGraph. When an instance of the SchedulerBase subclass DefaultScheduler is created, the ExecutionGraph is built in the SchedulerBase constructor.

It contains the following main abstractions (a short sketch follows the list):

1. ExecutionJobVertex: corresponds one-to-one with a JobVertex in the JobGraph. Each ExecutionJobVertex has as many ExecutionVertex instances as its parallelism.

2. ExecutionVertex: one parallel subtask of an ExecutionJobVertex. Its inputs are ExecutionEdges and its outputs are IntermediateResultPartitions.

3. IntermediateResult: corresponds one-to-one with an IntermediateDataSet in the JobGraph. An IntermediateResult contains as many IntermediateResultPartitions as the parallelism of the producing operator.

4. IntermediateResultPartition: one output partition of an ExecutionVertex. Its input is the ExecutionVertex and its outputs are some number of ExecutionEdges.

5. ExecutionEdge: an input of an ExecutionVertex. Its source is an IntermediateResultPartition and its target is an ExecutionVertex; both source and target are single.

6. Execution: one attempt to execute an ExecutionVertex. When a failure occurs, or when data needs to be recomputed, an ExecutionVertex can have several ExecutionAttemptIDs; an Execution is uniquely identified by its ExecutionAttemptID. Task deployment and task status updates between the JobManager and the TaskManager use the ExecutionAttemptID to identify the message receiver.
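To make the fan-out between these abstractions concrete, here is a small sketch; the three-stage pipeline and its parallelisms are invented for illustration. One JobVertex expands into one ExecutionJobVertex with parallelism-many ExecutionVertex subtasks, and each produced IntermediateResult has the same number of IntermediateResultPartitions:

import java.util.LinkedHashMap;
import java.util.Map;

public class ParallelismFanOut {
    public static void main(String[] args) {
        // Hypothetical pipeline: JobVertex name -> parallelism.
        Map<String, Integer> jobVertices = new LinkedHashMap<>();
        jobVertices.put("Source", 2);
        jobVertices.put("Map", 4);
        jobVertices.put("Sink", 1);

        int totalExecutionVertices = 0;
        for (Map.Entry<String, Integer> v : jobVertices.entrySet()) {
            int p = v.getValue();
            // One ExecutionJobVertex per JobVertex; p ExecutionVertex subtasks;
            // each produced IntermediateResult has p IntermediateResultPartitions.
            System.out.printf("%-6s -> 1 ExecutionJobVertex, %d ExecutionVertex(es)%n", v.getKey(), p);
            totalExecutionVertices += p;
        }
        System.out.println("Total subtasks to schedule: " + totalExecutionVertices); // 7
    }
}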

// While the JobMaster object is being initialized, the JobGraph --> ExecutionGraph conversion happens internally
/***********************************************************************************/
// -->  createAndRestoreExecutionGraph(xxxxx)
private ExecutionGraph createAndRestoreExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup, ShuffleMaster<?> shuffleMaster,
		JobMasterPartitionTracker partitionTracker) throws Exception {
         // create the ExecutionGraph
		ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);

		final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();

		if(checkpointCoordinator != null) {
			// check whether we find a valid checkpoint
			if(!checkpointCoordinator.restoreLatestCheckpointedStateToAll(new HashSet<>(newExecutionGraph.getAllVertices().values()), false)) {

				// check whether we can restore from a savepoint
				tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
			}
		}
		return newExecutionGraph;
	}
/***********************************************************************************/
// -->  createExecutionGraph(xxxxx)
private ExecutionGraph createExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup, ShuffleMaster<?> shuffleMaster,
		final JobMasterPartitionTracker partitionTracker) throws JobExecutionException, JobException {
		final FailoverStrategy.Factory failoverStrategy = legacyScheduling ? FailoverStrategyLoader.loadFailoverStrategy(jobMasterConfiguration, log) : new NoOpFailoverStrategy.Factory();
		return ExecutionGraphBuilder.buildGraph(null, jobGraph, jobMasterConfiguration, futureExecutor, ioExecutor, slotProvider, userCodeLoader, checkpointRecoveryFactory,rpcTimeout, restartStrategy, currentJobManagerJobMetricGroup, blobWriter, slotRequestTimeout, log, shuffleMaster, partitionTracker,failoverStrategy);
	}
/***********************************************************************************/
// -->   ExecutionGraphBuilder.buildGraph(xxxxx)
public static ExecutionGraph buildGraph(
		@Nullable ExecutionGraph prior,
		JobGraph jobGraph,
		Configuration jobManagerConfig,
		ScheduledExecutorService futureExecutor,
		Executor ioExecutor,
		SlotProvider slotProvider,
		ClassLoader classLoader,
		CheckpointRecoveryFactory recoveryFactory,
		Time rpcTimeout,
		RestartStrategy restartStrategy,
		MetricGroup metrics,
		BlobWriter blobWriter,
		Time allocationTimeout,
		Logger log,
		ShuffleMaster<?> shuffleMaster,
		JobMasterPartitionTracker partitionTracker,
		FailoverStrategy.Factory failoverStrategyFactory) throws JobExecutionException, JobException {

		checkNotNull(jobGraph, "job graph cannot be null");
         // fetch the job name and JobID from the JobGraph
		final String jobName = jobGraph.getName();
		final JobID jobId = jobGraph.getJobID();
        // build the JobInformation object that carries the job's metadata
		final JobInformation jobInformation = new JobInformation(
			jobId,
			jobName,
			jobGraph.getSerializedExecutionConfig(),
			jobGraph.getJobConfiguration(),
			jobGraph.getUserJarBlobKeys(),
			jobGraph.getClasspaths());

		final int maxPriorAttemptsHistoryLength = jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);
         // strategy for releasing IntermediateResultPartitions: RegionPartitionReleaseStrategy
		final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory = PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(jobManagerConfig);

		// create a new execution graph, if none exists so far
         // create the ExecutionGraph object and initialize its member fields
		final ExecutionGraph executionGraph;
		try {
			executionGraph = (prior != null) ? prior :
				new ExecutionGraph(
					jobInformation,
					futureExecutor,
					ioExecutor,
					rpcTimeout,
					restartStrategy,
					maxPriorAttemptsHistoryLength,
					failoverStrategyFactory,
					slotProvider,
					classLoader,
					blobWriter,
					allocationTimeout,
					partitionReleaseStrategyFactory,
					shuffleMaster,
					partitionTracker,
					jobGraph.getScheduleMode());
		} catch (IOException e) {
			throw new JobException("Could not create the ExecutionGraph.", e);
		}

		// set the basic properties
        // set the basic properties on the ExecutionGraph:
        // 1. JsonPlanGenerator.generatePlan(jobGraph) generates a JSON plan from the JobGraph
        // 2. executionGraph.setJsonPlan(jsonPlan) attaches that JSON plan to the ExecutionGraph
		try {
			executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
		}
		catch (Throwable t) {
			log.warn("Cannot create JSON plan for job", t);
			// give the graph an empty plan
			executionGraph.setJsonPlan("{}");
		}
		// initialize the vertices that have a master initialization hook
		// file output formats create directories here, input formats create splits

		final long initMasterStart = System.nanoTime();
		log.info("Running initialization on master for job {} ({}).", jobName, jobId);
         // iterate over every JobVertex and run its master-side initialization
		for (JobVertex vertex : jobGraph.getVertices()) {
			String executableClass = vertex.getInvokableClassName();
			if (executableClass == null || executableClass.isEmpty()) {
				throw new JobSubmissionException(jobId,"The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
			}

			try {
				vertex.initializeOnMaster(classLoader);
			}
			catch (Throwable t) {
					throw new JobExecutionException(jobId,"Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
			}
		}

		log.info("Successfully ran initialization on master in {} ms.",
				(System.nanoTime() - initMasterStart) / 1_000_000);
// topologically sort the job vertices and attach the graph to the existing one
		List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
		if (log.isDebugEnabled()) {
			log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
		}
        // Inside this method every JobVertex becomes one ExecutionJobVertex,
        // and the corresponding ExecutionVertex instances are generated according to the parallelism
		executionGraph.attachJobGraph(sortedTopology);

		if (log.isDebugEnabled()) {
			log.debug("Successfully created execution graph from job graph {} ({}).", jobName, jobId);
		}
		.......
		return executionGraph;
	}


4.2.3 Starting the JobMaster (starting the related services)

  The JobMaster is started through jobManagerRunner.start(). During startup, leader election takes place and the heartbeat and registration services are started; after that come resource allocation and the scheduling and execution of the job.

FunctionUtils.uncheckedFunction(this::startJobManagerRunner)
/*
 * --> startJobManagerRunner(JobManagerRunner jobManagerRunner) 
 * --> jobManagerRunner.start();
 * --> start(LeaderContender contender)
 * --> isLeader()
 * --> leaderContender.grantLeadership(issuedLeaderSessionID);
 * --> verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
 * --> startJobMaster(leaderSessionId);
 * --> jobMasterService.start(new JobMasterId(leaderSessionId));
 * --> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT); (schedule and execute)
 *  --> startJobMasterServices();
 *  --> resetAndStartScheduler();   
*/ 
    private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
		final JobID jobId = jobManagerRunner.getJobID();
         ...............
		/*************************************************
		 *  Note: start the jobManagerRunner
		 */
		jobManagerRunner.start();
		return jobManagerRunner;
	}

/**********************************************************************************/
// ==> jobManagerRunner.start();
	 // Note: starting the JobMaster triggers leader election
	@Override
	public void start() throws Exception {
		try {

			/*************************************************
			 *  Note: leaderElectionService = ZooKeeperLeaderElectionService.
			 *  If this JobMaster wins the election, control jumps to ZooKeeperLeaderElectionService's isLeader() method
			 */
			leaderElectionService.start(this);
		} catch (Exception e) {
			log.error("Could not start the JobManager because the leader election service did not start.", e);
			throw new Exception("Could not start the leader election service.", e);
		}
	}
/**********************************************************************************/
// ==> leaderElectionService.start(this);

/*************************************************
	 *  Note: because this class implements LeaderLatchListener, the component automatically calls isLeader()
	 *  when it wins the election and notLeader() otherwise. This is the mechanism of Curator, the ZooKeeper client framework.
	 */
	@Override
	public void start(LeaderContender contender) throws Exception {
		Preconditions.checkNotNull(contender, "Contender must not be null.");
		Preconditions.checkState(leaderContender == null, "Contender was already set.");

		LOG.info("Starting ZooKeeperLeaderElectionService {}.", this);

		synchronized(lock) {

			client.getUnhandledErrorListenable().addListener(this);

			// Note: what this value actually is depends on the caller
			leaderContender = contender;

			/*************************************************
			 *  Note: Flink's leader election is implemented with Curator, the ZooKeeper client framework.
			 *  1. leaderLatch.start() effectively enters the election.
			 *  2. When the election finishes:
			 *  	on success: isLeader()
			 *      on failure: notLeader()
			 */
			leaderLatch.addListener(this);
			leaderLatch.start();
			/*************************************************
			 *  Note: register the listeners; once the election has finished:
			 *  1. if this node became the Leader, isLeader() is called back
			 *  2. if this node became a Follower, notLeader() is called back
			 */
			cache.getListenable().addListener(this);
			cache.start();
			client.getConnectionStateListenable().addListener(listener);
			running = true;
		}
	}
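For reference, this is roughly what the underlying Curator usage looks like in isolation. A minimal sketch, assuming a ZooKeeper ensemble reachable at localhost:2181 and an arbitrary latch path; it is not Flink code, just the LeaderLatch mechanics that ZooKeeperLeaderElectionService builds on:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLatchSketch {
    public static void main(String[] args) throws Exception {
        // Assumes a ZooKeeper at localhost:2181; the latch path is arbitrary.
        CuratorFramework client = CuratorFrameworkFactory
                .newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        LeaderLatch latch = new LeaderLatch(client, "/demo/leader-latch");
        latch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                // Flink's ZooKeeperLeaderElectionService reacts here by
                // granting leadership to the registered LeaderContender.
                System.out.println("Became leader.");
            }

            @Override
            public void notLeader() {
                System.out.println("Lost leadership.");
            }
        });

        latch.start();          // joins the election
        Thread.sleep(10_000);   // keep the process alive to observe callbacks
        latch.close();
        client.close();
    }
}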
/**********************************************************************************/
// ==> isLeader()
@Override
	public void isLeader() {
		synchronized(lock) {
			if(running) {
				issuedLeaderSessionID = UUID.randomUUID();
				clearConfirmedLeaderInformation();
				if(LOG.isDebugEnabled()) {
					LOG.debug("Grant leadership to contender {} with session ID {}.", leaderContender.getDescription(), issuedLeaderSessionID);
				}
				/*************************************************
				 *  Note: grant leadership.
				 *  Depending on who called leaderElectionService.start(this), the
				 *  leaderContender can be a JobManagerRunnerImpl, a ResourceManager,
				 *  or a DefaultDispatcherRunner (leaderContender = this).
				 */
				leaderContender.grantLeadership(issuedLeaderSessionID);
			} else {
				LOG.debug("Ignoring the grant leadership notification since the service has " + "already been stopped.");
			}
		}
	}
/**********************************************************************************/
// ==> leaderContender.grantLeadership(issuedLeaderSessionID);
@Override
	public void grantLeadership(final UUID leaderSessionID) {
		synchronized (lock) {
			if (shutdown) {
				log.debug("JobManagerRunner cannot be granted leadership because it is already shut down.");
				return;
			}

			leadershipOperation = leadershipOperation.thenCompose(
				(ignored) -> {
					synchronized (lock) {

						/*************************************************
						 *  Note: verify the job's scheduling status and start the JobManager
						 */
						return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
					}
				});

			handleException(leadershipOperation, "Could not start the job manager.");
		}
	}
/**********************************************************************************/
// ==> verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
		final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();
		return jobSchedulingStatusFuture.thenCompose(
			jobSchedulingStatus -> {
				if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
					return jobAlreadyDone();
				} else {

					/*************************************************
					 *  Note: start the JobMaster
					 */
					return startJobMaster(leaderSessionId);
				}
			});
	}
/**********************************************************************************/
// ==> startJobMaster(leaderSessionId);

private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
		log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
			jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, jobMasterService.getAddress());

		/*************************************************
		 *  Note: mark the job as RUNNING in the running jobs registry
		 */
		try {
			runningJobsRegistry.setJobRunning(jobGraph.getJobID());
		} catch (IOException e) {
			return FutureUtils.completedExceptionally(
				new FlinkException(
					String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
					e));
		}

		final CompletableFuture<Acknowledge> startFuture;
		try {

			/*************************************************
			 *  Note: start the JobMaster
			 */
			startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));

		} catch (Exception e) {
			return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
		}

		final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
		return startFuture.thenAcceptAsync(
			(Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(
				leaderSessionId,
				jobMasterService.getAddress(),
				currentLeaderGatewayFuture),
			executor);
	}
/***********************************************************************************/
// --> jobMasterService.start(new JobMasterId(leaderSessionId));
public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {

		// Note: start the RPC endpoint first to make sure RPC and async calls are working
		// make sure we receive RPC and async calls
		start();

		// Note: execute the JobGraph
		return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
	}
/***********************************************************************************/
// --> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
    ..............
		setNewFencingToken(newJobMasterId);

		/*************************************************
		 *  Note: initialize the required service components (JobMaster registration and heartbeats)
		 */
		startJobMasterServices();

		log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);

		/*************************************************
		 *  Note: the JobMaster starts scheduling and executing the StreamTasks
		 */
		resetAndStartScheduler();
		return Acknowledge.get();
	}
/***********************************************************************************/
// --> startJobMasterServices();
private void startJobMasterServices() throws Exception {

		/*************************************************
		 *  Note: start the heartbeat services
		 */
		startHeartbeatServices();

		/*************************************************
		 * Start the SlotPool (implementation: SlotPoolImpl) and the Scheduler (implementation: SchedulerImpl) services
		 */

		slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
		scheduler.start(getMainThreadExecutor());

		/*************************************************
		 *  Note: the JobMaster connects to the ResourceManager; after a successful registration the ResourceManager sends heartbeats to the JobMaster.
		 */
		reconnectToResourceManager(new FlinkException("Starting JobMaster component."));

		/*************************************************
		 *  Note: start the listener that watches for ResourceManager leader changes
		 */
		resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
	}


4.2.4 Starting the JobMaster (requesting slots)

The core entry point for scheduling and executing the job:

// start scheduling -- JobMaster.startJobExecution()
resetAndStartScheduler();

  After the JobMaster has started and registered with the ResourceManager, the next step is to request slot resources from the ResourceManager.


The core code entry point for requesting slots:

allocateSlots(executionVertexDeploymentOptions);

Requesting a slot happens in four steps:

  • 1. The JobMaster sends a slot request to the ResourceManager
  • 2. The ResourceManager receives the request and processes it
  • 3. The TaskManager handles the slot request forwarded by the ResourceManager
  • 4. The JobMaster receives the result of the slot request from the TaskManager

1. The ResourceManager manages all TaskManagers (TaskExecutors).
2. Inside a TaskExecutor, resources are managed through the slot abstraction.
3. The JobMaster requests slots through its managing component, the SlotPool (which also implements slot sharing); to run a Task it first tries to take a slot from the SlotPool, and only asks the ResourceManager when that fails (see the sketch after this list).
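The "SlotPool first, ResourceManager second" decision can be distilled into a few lines. A minimal sketch with hypothetical Slot and pool types (not Flink's classes), mirroring the fallback logic of allocateSingleSlot shown later:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class SlotLookupSketch {

    // Hypothetical stand-in for Flink's PhysicalSlot; not a Flink class.
    static final class Slot {
        final String id;
        Slot(String id) { this.id = id; }
        @Override public String toString() { return "Slot(" + id + ")"; }
    }

    private final Deque<Slot> availableSlots = new ArrayDeque<>();

    // 1. Try the pool first; 2. fall back to a (stubbed) ResourceManager round trip.
    CompletableFuture<Slot> allocate() {
        Optional<Slot> pooled = Optional.ofNullable(availableSlots.poll());
        if (pooled.isPresent()) {
            return CompletableFuture.completedFuture(pooled.get());
        }
        return requestSlotFromResourceManager();
    }

    CompletableFuture<Slot> requestSlotFromResourceManager() {
        // In Flink this is an RPC to the ResourceManager; here it just completes asynchronously.
        return CompletableFuture.supplyAsync(() -> new Slot("fresh-from-rm"));
    }

    public static void main(String[] args) {
        SlotLookupSketch pool = new SlotLookupSketch();
        pool.availableSlots.add(new Slot("cached"));
        System.out.println(pool.allocate().join()); // Slot(cached)
        System.out.println(pool.allocate().join()); // Slot(fresh-from-rm)
    }
}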

4.2.4.1 The JobMaster sends the slot request
/* allocateSlots(executionVertexDeploymentOptions);
 * --> DefaultScheduler.allocateSlots();
 * --> DefaultExecutionSlotAllocator.allocateSlotsFor();
 * --> NormalSlotProviderStrategy.allocateSlot();
 * --> SchedulerImpl.allocateSlot();
 * --> SchedulerImpl.allocateSlotInternal();
 * --> SchedulerImpl.internalAllocateSlot();
 * --> SchedulerImpl.allocateSingleSlot();
 * --> SchedulerImpl.requestNewAllocatedSlot();
 * --> SlotPoolImpl.requestNewAllocatedBatchSlot();
 * --> SlotPoolImpl.requestNewAllocatedSlotInternal();
 * --> SlotPoolImpl.requestSlotFromResourceManager();
 */
 /**********************************************************************************/
// --> allocateSlots(executionVertexDeploymentOptions);
@Override
	public void allocateSlotsAndDeploy(final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
		validateDeploymentOptions(executionVertexDeploymentOptions);
		final Map<ExecutionVertexID, ExecutionVertexDeploymentOption> deploymentOptionsByVertex = groupDeploymentOptionsByVertexId(
            executionVertexDeploymentOptions);
		final List<ExecutionVertexID> verticesToDeploy = executionVertexDeploymentOptions.stream().map(ExecutionVertexDeploymentOption::getExecutionVertexId).collect(Collectors.toList());

		final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex = executionVertexVersioner.recordVertexModifications(verticesToDeploy);
		transitionToScheduled(verticesToDeploy);
		// Note: request the slots
		final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments = allocateSlots(executionVertexDeploymentOptions);

		// Note: build the DeploymentHandles
		final List<DeploymentHandle> deploymentHandles = createDeploymentHandles(requiredVersionByVertex, deploymentOptionsByVertex,
			slotExecutionVertexAssignments);

		// Note: deploy and run
		waitForAllSlotsAndDeploy(deploymentHandles);
	}
 /**********************************************************************************/
// --> DefaultScheduler.allocateSlots();
private List<SlotExecutionVertexAssignment> allocateSlots(final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
		/*************************************************
		 *  Note: request slots through the executionSlotAllocator. Inside the argument:
		 *  ExecutionVertexId --> ExecutionVertex --> ExecutionVertexSchedulingRequirements (a three-level mapping)
		 */
		return executionSlotAllocator.allocateSlotsFor(	executionVertexDeploymentOptions.stream().map(ExecutionVertexDeploymentOption::getExecutionVertexId).map(this::getExecutionVertex).map(ExecutionVertexSchedulingRequirementsMapper::from).collect(Collectors.toList()));
	}
 //**********************************************************************************
// --> DefaultExecutionSlotAllocator.allocateSlotsFor();
   /**
	 *
	 * @param executionVertexSchedulingRequirements The requirements for scheduling the executions.
	 * @return the slot assignments
	 * Three steps:
	 * 1. Initialize a container to store the acquired slotExecutionVertexAssignments.
	 * 2. Iterate over the ExecutionVertexSchedulingRequirements of the ExecutionVertices waiting for slots and request a slot for each.
	 * 3. Handle the result: on success, what is eventually obtained is a LogicalSlot.
	 * Two slot abstractions appear during allocation: LogicalSlot and PhysicalSlot (the latter backs the slot-sharing concept).
	 */
	@Override
	public List<SlotExecutionVertexAssignment> allocateSlotsFor(
			List<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {

		validateSchedulingRequirements(executionVertexSchedulingRequirements);

		// size the result list: executionVertexSchedulingRequirements --> slotExecutionVertexAssignment
		List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
				new ArrayList<>(executionVertexSchedulingRequirements.size());
        // collect the AllocationIDs used by all prior allocations of these requests
		Set<AllocationID> allPreviousAllocationIds = computeAllPriorAllocationIds(executionVertexSchedulingRequirements);
		// iterate over the ExecutionVertexSchedulingRequirements and request a slot for every ExecutionVertex
		for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
			final ExecutionVertexID executionVertexId = schedulingRequirements.getExecutionVertexId();
			final SlotRequestId slotRequestId = new SlotRequestId();
			final SlotSharingGroupId slotSharingGroupId = schedulingRequirements.getSlotSharingGroupId();
			LOG.debug("Allocate slot with id {} for execution {}", slotRequestId, executionVertexId);

			// Note: calculatePreferredLocations computes the locality preferences for the slot
			CompletableFuture<LogicalSlot> slotFuture = calculatePreferredLocations(
					executionVertexId,
					schedulingRequirements.getPreferredLocations(),
					inputsLocationsRetriever).thenCompose(
				(Collection<TaskManagerLocation> preferredLocations) ->
					// Note: NormalSlotProviderStrategy
					slotProviderStrategy.allocateSlot(
						slotRequestId,
						new ScheduledUnit(
							executionVertexId,
							slotSharingGroupId,
							schedulingRequirements.getCoLocationConstraint()),
						SlotProfile.priorAllocation(
							schedulingRequirements.getTaskResourceProfile(),
							schedulingRequirements.getPhysicalSlotResourceProfile(),
							preferredLocations,
							Collections.singletonList(schedulingRequirements.getPreviousAllocationId()),
							allPreviousAllocationIds)));
			SlotExecutionVertexAssignment slotExecutionVertexAssignment =
					new SlotExecutionVertexAssignment(executionVertexId, slotFuture);
			// add to map first to avoid the future completed before added.
			pendingSlotAssignments.put(executionVertexId, slotExecutionVertexAssignment);
			slotFuture.whenComplete(
					(ignored, throwable) -> {
						pendingSlotAssignments.remove(executionVertexId);
						if (throwable != null) {
							slotProviderStrategy.cancelSlotRequest(slotRequestId, slotSharingGroupId, throwable);
						}
					});
            // the request has been issued; add it to the collection of slot assignments
			slotExecutionVertexAssignments.add(slotExecutionVertexAssignment);
		}
        // return the collection of slot assignments
		return slotExecutionVertexAssignments;
	}
 /**********************************************************************************/
// --> NormalSlotProviderStrategy.allocateSlot();
		@Override
		public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile) {
			/*************************************************
			 *  Note: delegate to SchedulerImpl to request the slot
			 */
			return slotProvider.allocateSlot(slotRequestId, scheduledUnit, slotProfile, allocationTimeout);
		}
 /**********************************************************************************/
// --> SchedulerImpl.allocateSlot();
	@Override
	public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile,
		Time allocationTimeout) {
		/*************************************************
		 *  Note: call the internal implementation
		 */
		return allocateSlotInternal(slotRequestId, scheduledUnit, slotProfile, allocationTimeout);
	}
 /**********************************************************************************/
// --> SchedulerImpl.allocateSlotInternal();
	@Nonnull
	private CompletableFuture<LogicalSlot> allocateSlotInternal(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile,
		@Nullable Time allocationTimeout) {
		log.debug("Received slot request [{}] for task: {}", slotRequestId, scheduledUnit.getJobVertexId());
		componentMainThreadExecutor.assertRunningInMainThread();
		final CompletableFuture<LogicalSlot> allocationResultFuture = new CompletableFuture<>();

		/*************************************************
		 *  Note: call the internal implementation
		 */
		internalAllocateSlot(allocationResultFuture, slotRequestId, scheduledUnit, slotProfile, allocationTimeout);
		return allocationResultFuture;
	}
 /**********************************************************************************/
// --> SchedulerImpl.internalAllocateSlot();
private void internalAllocateSlot(CompletableFuture<LogicalSlot> allocationResultFuture, SlotRequestId slotRequestId, ScheduledUnit scheduledUnit,
		SlotProfile slotProfile, Time allocationTimeout) {
		// If no SlotSharingGroupId is given, the task does not use slot sharing and must occupy a slot exclusively;
		// if a SlotSharingGroupId is present, allocateSharedSlot is called to request a shared slot
		CompletableFuture<LogicalSlot> allocationFuture = scheduledUnit.getSlotSharingGroupId() == null ?
			/*************************************************
			 *  Note: request a dedicated slot
			 */
			allocateSingleSlot(slotRequestId, slotProfile, allocationTimeout) :

			/*************************************************
			 *  Note: request a shared slot
			 */
			allocateSharedSlot(slotRequestId, scheduledUnit, slotProfile, allocationTimeout);

		allocationFuture.whenComplete((LogicalSlot slot, Throwable failure) -> {
			if(failure != null) {
				cancelSlotRequest(slotRequestId, scheduledUnit.getSlotSharingGroupId(), failure);
				allocationResultFuture.completeExceptionally(failure);
			} else {
				allocationResultFuture.complete(slot);
			}
		});
	}
 /**********************************************************************************/
// --> SchedulerImpl.allocateSingleSlot();
private CompletableFuture<LogicalSlot> allocateSingleSlot(SlotRequestId slotRequestId, SlotProfile slotProfile,
		@Nullable Time allocationTimeout) {
		// first try to serve the request from the available slots (the AllocatedSlots held by the SlotPool)
		Optional<SlotAndLocality> slotAndLocality = tryAllocateFromAvailable(slotRequestId, slotProfile);

		if(slotAndLocality.isPresent()) {  // a slot was available in the SlotPool
			// create a SingleLogicalSlot for it and attach it as the AllocatedSlot's payload
			// already successful from available
			try {
				return CompletableFuture.completedFuture(completeAllocationByAssigningPayload(slotRequestId, slotAndLocality.get()));
			} catch(FlinkException e) {
				return FutureUtils.completedExceptionally(e);
			}
		} else { // nothing is available right now; if queuing is allowed, ask the SlotPool to request a new slot from the ResourceManager
			// we allocate by requesting a new slot
			return requestNewAllocatedSlot(slotRequestId, slotProfile, allocationTimeout).thenApply((PhysicalSlot allocatedSlot) -> {
				try {

					/*************************************************
					 *  Note: SlotAndLocality
					 */
					return completeAllocationByAssigningPayload(slotRequestId, new SlotAndLocality(allocatedSlot, Locality.UNKNOWN));
				} catch(FlinkException e) {
					throw new CompletionException(e);
				}
			});
		}
	}
 /**********************************************************************************/
// --> SchedulerImpl.requestNewAllocatedSlot();
	@Nonnull
	private CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(SlotRequestId slotRequestId, SlotProfile slotProfile,
		@Nullable Time allocationTimeout) {
		if(allocationTimeout == null) {
			// delegate to SlotPoolImpl to request the slot (batch)
			return slotPool.requestNewAllocatedBatchSlot(slotRequestId, slotProfile.getPhysicalSlotResourceProfile());
		} else {
			// delegate to SlotPoolImpl to request the slot (streaming)
			return slotPool.requestNewAllocatedSlot(slotRequestId, slotProfile.getPhysicalSlotResourceProfile(), allocationTimeout);
		}
		}
	}
 /**********************************************************************************/
// --> SlotPoolImpl.requestNewAllocatedSlot();
	@Nonnull
	@Override
	public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile,
		Time timeout) {

		componentMainThreadExecutor.assertRunningInMainThread();
		// create the pending request
		final PendingRequest pendingRequest = PendingRequest.createStreamingRequest(slotRequestId, resourceProfile);

		// register request timeout
		FutureUtils.orTimeout(pendingRequest.getAllocatedSlotFuture(), timeout.toMilliseconds(), TimeUnit.MILLISECONDS, componentMainThreadExecutor)
			.whenComplete((AllocatedSlot ignored, Throwable throwable) -> {
				if(throwable instanceof TimeoutException) {
					timeoutPendingSlotRequest(slotRequestId);
				}
			});
        // request the slot
		return requestNewAllocatedSlotInternal(pendingRequest).thenApply((Function.identity()));
	}
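FutureUtils.orTimeout here simply arms a timer against the pending request's future. The same effect can be had with the JDK's own CompletableFuture.orTimeout (Java 9+); a minimal sketch, where the 500 ms timeout and the log messages are illustrative only:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RequestTimeoutSketch {
    public static void main(String[] args) throws InterruptedException {
        // The pending request's future: completed later by an offered slot.
        CompletableFuture<String> allocatedSlotFuture = new CompletableFuture<>();

        // Java 9+ orTimeout plays the role of Flink's FutureUtils.orTimeout:
        // if nothing completes the future within 500 ms, it completes
        // exceptionally with a TimeoutException.
        allocatedSlotFuture
                .orTimeout(500, TimeUnit.MILLISECONDS)
                .whenComplete((slot, throwable) -> {
                    if (throwable instanceof TimeoutException) {
                        // Mirrors timeoutPendingSlotRequest(slotRequestId):
                        // clean up the bookkeeping for the stale request.
                        System.out.println("Slot request timed out, cancelling.");
                    } else if (throwable == null) {
                        System.out.println("Got slot: " + slot);
                    }
                });

        // The future is never completed here, so the timeout branch fires.
        Thread.sleep(1000);
    }
}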
 /**********************************************************************************/
// --> SlotPoolImpl.requestNewAllocatedSlotInternal();
   @Nonnull
	private CompletableFuture<AllocatedSlot> requestNewAllocatedSlotInternal(PendingRequest pendingRequest) {

		if(resourceManagerGateway == null) {
			// no connection to the ResourceManager yet: stash the request and replay it once the connection is established
			stashRequestWaitingForResourceManager(pendingRequest);
		} else {
			// request the slot from the ResourceManager
			requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
		}
		}

		return pendingRequest.getAllocatedSlotFuture();
	}
 //*******************************************************************************************************************
// --> SlotPoolImpl.requestSlotFromResourceManager();
private void requestSlotFromResourceManager(final ResourceManagerGateway resourceManagerGateway, final PendingRequest pendingRequest) {
		// prepare to send the slot request to the ResourceManager
		checkNotNull(resourceManagerGateway);
		checkNotNull(pendingRequest);

		log.info("Requesting new slot [{}] and profile {} from resource manager.", pendingRequest.getSlotRequestId(),
			pendingRequest.getResourceProfile());
        // construct an AllocationID (to guard against duplicate requests)
		final AllocationID allocationId = new AllocationID();
        // store the pending request
		pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest);

		pendingRequest.getAllocatedSlotFuture().whenComplete((AllocatedSlot allocatedSlot, Throwable throwable) -> {
			if(throwable != null || !allocationId.equals(allocatedSlot.getAllocationId())) {
				// cancel the slot request if there is a failure or if the pending request has
				// been completed with another allocated slot
				resourceManagerGateway.cancelSlotRequest(allocationId);
			}
		});
		// request the slot through the ResourceManager's gateway proxy
		CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway
			.requestSlot(jobMasterId, new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress), rpcTimeout);

		FutureUtils.whenCompleteAsyncIfNotDone(rmResponse, componentMainThreadExecutor, (Acknowledge ignored, Throwable failure) -> {
			// on failure, fail the request future
			if(failure != null) {
				slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure);
			}
		});
	}


4.2.4.2 The ResourceManager receives the request and processes it
/*   ResourceManager.requestSlot();
* --> SlotManagerImpl.registerSlotRequest();
* --> SlotManagerImpl.internalRequestSlot();
* --> SlotManagerImpl.allocateSlot();
* --> TaskExecutorGateway.requestSlot();
*/
// --> ResourceManager.requestSlot();
@Override
	public CompletableFuture<Acknowledge> requestSlot(JobMasterId jobMasterId, SlotRequest slotRequest, final Time timeout) {
		// the SlotRequest carries the JobID; check whether this job has been registered
		JobID jobId = slotRequest.getJobId();
		JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);

		if(null != jobManagerRegistration) {
			// check that the requesting JobMaster matches the JobMaster registered for this job (if not, give up; this prevents a migrated JobMaster from requesting double the slots and wasting resources)
			if(Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
				log.info("Request slot with profile {} for job {} with allocation id {}.", slotRequest.getResourceProfile(), slotRequest.getJobId(),
					slotRequest.getAllocationId());

				try {
					// delegate to SlotManagerImpl to handle the slot request
					slotManager.registerSlotRequest(slotRequest);
				} catch(ResourceManagerException e) {
					return FutureUtils.completedExceptionally(e);
				}

				return CompletableFuture.completedFuture(Acknowledge.get());
			} else {
				return FutureUtils.completedExceptionally(new ResourceManagerException(
					"The job leader's id " + jobManagerRegistration.getJobMasterId() + " does not match the received id " + jobMasterId + '.'));
			}

		} else {
			return FutureUtils.completedExceptionally(new ResourceManagerException("Could not find registered job manager for job " + jobId + '.'));
		}
	}

//****************************************************************************************
// --> SlotManagerImpl.registerSlotRequest();
@Override
	public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
		// check that the SlotManagerImpl has been started
		checkInit();
        // check whether this slot request is a duplicate
		if(checkDuplicateRequest(slotRequest.getAllocationId())) {
			LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());
			return false;
		} else {
			// wrap the request in a PendingSlotRequest
			PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);

			pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);

			try {
				// call the internal implementation
				internalRequestSlot(pendingSlotRequest);
			} catch(ResourceManagerException e) {
				// requesting the slot failed --> remove pending slot request
				pendingSlotRequests.remove(slotRequest.getAllocationId());

				throw new ResourceManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
			}

			return true;
		}
	}
//****************************************************************************************
// --> SlotManagerImpl.internalRequestSlot();
private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
		final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
		// first pick a matching slot among the registered slots in FREE state (a purely logical computation);
		// the result carries the SlotID and the TaskExecutor's InstanceID
		OptionalConsumer.of(findMatchingSlot(resourceProfile))
			// a matching slot was found: allocate it
			.ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
			// reaching this branch means the cluster has no free resources; on YARN the
			// fulfillPendingSlotRequestWithPendingTaskManagerSlot branch can request a new TaskManager
			.ifNotPresent(() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));
	}
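OptionalConsumer is a small Flink utility; on a Java 9+ JDK the same two-way branch can be written with Optional.ifPresentOrElse. A minimal sketch with a hypothetical findMatchingSlot matcher:

import java.util.Optional;

public class MatchSlotSketch {
    // Hypothetical matcher: returns a slot id when a FREE slot fits the profile.
    static Optional<String> findMatchingSlot(int requiredMemoryMb) {
        return requiredMemoryMb <= 1024 ? Optional.of("slot-7") : Optional.empty();
    }

    public static void main(String[] args) {
        // Java 9+ ifPresentOrElse expresses the same branch as Flink's
        // OptionalConsumer.ifPresent(...).ifNotPresent(...):
        findMatchingSlot(512).ifPresentOrElse(
                slotId -> System.out.println("Allocate on " + slotId),
                () -> System.out.println("No free slot, ask for a new TaskManager"));

        findMatchingSlot(4096).ifPresentOrElse(
                slotId -> System.out.println("Allocate on " + slotId),
                () -> System.out.println("No free slot, ask for a new TaskManager"));
    }
}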
//****************************************************************************************
// --> SlotManagerImpl.allocateSlot();
private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
		// state check
		Preconditions.checkState(taskManagerSlot.getState() == TaskManagerSlot.State.FREE);
		// get the connection to the TaskManager
		TaskExecutorConnection taskExecutorConnection = taskManagerSlot.getTaskManagerConnection();
		TaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway();

		final CompletableFuture<Acknowledge> completableFuture = new CompletableFuture<>();
		final AllocationID allocationId = pendingSlotRequest.getAllocationId();
		final SlotID slotId = taskManagerSlot.getSlotId();
		final InstanceID instanceID = taskManagerSlot.getInstanceId();
		// the TaskManagerSlot transitions to PENDING
		taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
		pendingSlotRequest.setRequestFuture(completableFuture);

		returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest);
		// look up the TaskManager's registration
		TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);

		if(taskManagerRegistration == null) {
			throw new IllegalStateException("Could not find a registered task manager for instance id " + instanceID + '.');
		}

		taskManagerRegistration.markUsed();

		/*************************************************
		 *  Note: send an RPC request to the TaskExecutor to allocate the slot.
		 *  gateway = the TaskExecutor; slotId: one slot inside that TaskExecutor;
		 *  pendingSlotRequest.getJobId(): identifies the requesting JobMaster
		 */
		// RPC call to the task manager
		CompletableFuture<Acknowledge> requestFuture = gateway
			.requestSlot(slotId, pendingSlotRequest.getJobId(), allocationId, pendingSlotRequest.getResourceProfile(),
				pendingSlotRequest.getTargetAddress(), resourceManagerId, taskManagerRequestTimeout);

		requestFuture.whenComplete((Acknowledge acknowledge, Throwable throwable) -> {
			if(acknowledge != null) {
				completableFuture.complete(acknowledge);
			} else {
				completableFuture.completeExceptionally(throwable);
			}
		});
		// handle completion of the slot request sent to the TaskExecutor
		completableFuture.whenCompleteAsync((Acknowledge acknowledge, Throwable throwable) -> {
			try {
				// request succeeded
				if(acknowledge != null) {
					// clear the pendingSlotRequest and update the slot state: PENDING --> ALLOCATED
					updateSlot(slotId, allocationId, pendingSlotRequest.getJobId());
				} else {
					// request failed
					if(throwable instanceof SlotOccupiedException) {
						SlotOccupiedException exception = (SlotOccupiedException) throwable;
						// update the slot state
						updateSlot(slotId, exception.getAllocationId(), exception.getJobId());
					} else {
						// the request failed: remove the pendingSlotRequest from the TaskManagerSlot
						removeSlotRequestFromSlot(slotId, allocationId);
					}
					// distinguish an error from a cancellation
					if(!(throwable instanceof CancellationException)) {
						handleFailedSlotRequest(slotId, allocationId, throwable);
					} else {
						LOG.debug("Slot allocation request {} has been cancelled.", allocationId, throwable);
					}
				}
			} catch(Exception e) {
				LOG.error("Error while completing the slot allocation.", e);
			}
		}, mainThreadExecutor);
	}
//****************************************************************************************
// --> TaskExecutorGateway.requestSlot();
//****************************************************************************************


4.2.4.3 The TaskManager handles the slot request from the ResourceManager
/*--> TaskExecutor.requestSlot();
*--> TaskExecutor.offerSlotsToJobManager();
*--> TaskExecutor.internalOfferSlotsToJobManager();
*--> JobMasterGateway.offerSlots();
*/
// --> TaskExecutor.requestSlot();
	@Override
	public CompletableFuture<Acknowledge> requestSlot(
		final SlotID slotId, // slot ID
		final JobID jobId, // job ID
		final AllocationID allocationId,  // allocation ID
		final ResourceProfile resourceProfile, // resource profile
		final String targetAddress,  // address of the requesting JobMaster
		final ResourceManagerId resourceManagerId, // ResourceManager ID
		final Time timeout) {
		// TODO: Filter invalid requests from the resource manager by using the instance/registration Id

		log.info("Receive slot request {} for job {} from resource manager with leader id {}.", allocationId, jobId, resourceManagerId);
		// check that the requesting ResourceManager is the one this TaskExecutor is registered with
		if (!isConnectedToResourceManager(resourceManagerId)) {
			final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId);
			log.debug(message);
			return FutureUtils.completedExceptionally(new TaskManagerException(message));
		}

		try {
			// allocate the slot
			allocateSlot(slotId, jobId, allocationId, resourceProfile);
		} catch (SlotAllocationException sae) {
			return FutureUtils.completedExceptionally(sae);
		}
		// obtain the Job entry: not a real job, but a handle representing whether a connection to the JobMaster exists
		final JobTable.Job job;

		try {
			job = jobTable.getOrCreateJob(jobId, () -> registerNewJobAndCreateServices(jobId, targetAddress));
		} catch (Exception e) {
			// free the allocated slot
			try {
				taskSlotTable.freeSlot(allocationId);
			} catch (SlotNotFoundException slotNotFoundException) {
				// slot no longer existent, this should actually never happen, because we've
				// just allocated the slot. So let's fail hard in this case!
				onFatalError(slotNotFoundException);
			}

			// release local state under the allocation id.
			localStateStoresManager.releaseLocalStateForAllocationId(allocationId);

			// sanity check
			if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
				onFatalError(new Exception("Could not free slot " + slotId));
			}

			return FutureUtils.completedExceptionally(new SlotAllocationException("Could not create new job.", e));
		}

		if (job.isConnected()) {

			/*************************************************
			 *  Note: offer the slot(s) to the JobManager (JobMaster)
			 */
			offerSlotsToJobManager(jobId);
		}

		return CompletableFuture.completedFuture(Acknowledge.get());
	}
//****************************************************************************************
// --> TaskExecutor.offerSlotsToJobManager();
private void offerSlotsToJobManager(final JobID jobId) {
		/*************************************************
		 *  Note: offer the allocated slots to the JobMaster
		 */
		jobTable.getConnection(jobId).ifPresent(this::internalOfferSlotsToJobManager);
	}

//****************************************************************************************
// --> TaskExecutor.internalOfferSlotsToJobManager();
private void internalOfferSlotsToJobManager(JobTable.Connection jobManagerConnection) {
		// JobID
		final JobID jobId = jobManagerConnection.getJobId();

		if (taskSlotTable.hasAllocatedSlots(jobId)) {
			log.info("Offer reserved slots to the leader of job {}.", jobId);
			// get the JobMaster gateway
			final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
			// fetch the slots allocated to this job; only slots in ALLOCATED state are returned
			final Iterator<TaskSlot<Task>> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
			// get the jobMasterId
			final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();

			final Collection<SlotOffer> reservedSlots = new HashSet<>(2);
			// generate a SlotOffer for every reserved slot
			while (reservedSlotsIterator.hasNext()) {
				SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
				reservedSlots.add(offer);
			}

			/*************************************************
			 *  Note: offer these slots to the JobManager (JobMaster)
			 */
			CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway
				.offerSlots(getResourceID(), reservedSlots, taskManagerConfiguration.getTimeout());
			// handle the result
			acceptedSlotsFuture
				.whenCompleteAsync(handleAcceptedSlotOffers(jobId, jobMasterGateway, jobMasterId, reservedSlots), getMainThreadExecutor());
		} else {
			log.debug("There are no unassigned slots for the job {}.", jobId);
		}
	}


4.2.4.4 The JobMaster receives the result of the slot request from the TaskManager
/*-->JobMaster.offerSlots();
* --> SlotPoolImpl.offerSlots
*/
//****************************************************************************************
// --> JobMasterGateway.offerSlots();
	@Override
	public CompletableFuture<Collection<SlotOffer>> offerSlots(final ResourceID taskManagerId, final Collection<SlotOffer> slots,
		final Time timeout) {
		// look up the entry (location + gateway) registered in registeredTaskManagers for this TaskManager's ResourceID
		Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId);

		if(taskManager == null) {
			return FutureUtils.completedExceptionally(new Exception("Unknown TaskManager " + taskManagerId));
		}

		final TaskManagerLocation taskManagerLocation = taskManager.f0;
		final TaskExecutorGateway taskExecutorGateway = taskManager.f1;

		/*************************************************
		 *  Note: rpcTaskManagerGateway = RpcTaskManagerGateway
		 */
		final RpcTaskManagerGateway rpcTaskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway, getFencingToken());

		/*************************************************
		 *  Note: hand the offered slots to the SlotPool for management
		 */
		return CompletableFuture.completedFuture(slotPool.offerSlots(taskManagerLocation, rpcTaskManagerGateway, slots));
	}
//****************************************************************************************
// --> SlotPoolImpl.offerSlots
	@Override
	public Collection<SlotOffer> offerSlots(TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway,
		Collection<SlotOffer> offers) {

		ArrayList<SlotOffer> result = new ArrayList<>(offers.size());

		for(SlotOffer offer : offers) {

			/*************************************************
			 *  Note: add the offered slot to the SlotPool
			 */
			if(offerSlot(taskManagerLocation, taskManagerGateway, offer)) {

				result.add(offer);
			}
		}

		return result;
	}


4.2.5 Starting the JobMaster (deploying and submitting Tasks)

The core code entry point is the waitForAllSlotsAndDeploy method of the DefaultScheduler class:

waitForAllSlotsAndDeploy(deploymentHandles);
private void waitForAllSlotsAndDeploy(final List<DeploymentHandle> deploymentHandles) {

		/*************************************************
		 *  Note: once the slots are assigned, call deployAll to deploy the tasks
		 */
		FutureUtils.assertNoException(assignAllResources(deploymentHandles).handle(deployAll(deploymentHandles)));
	}

The deployment flow:

/*
 * --> DefaultScheduler.waitForAllSlotsAndDeploy(final List<DeploymentHandle> deploymentHandles)
 * --> DefaultScheduler.deployAll(final List<DeploymentHandle> deploymentHandles)
 * --> DefaultScheduler.deployOrHandleError(final DeploymentHandle deploymentHandle)
 * --> DefaultScheduler.deployTaskSafe(final ExecutionVertexID executionVertexId)
 * --> DefaultExecutionVertexOperations.deploy(final ExecutionVertex executionVertex)
 * --> ExecutionVertex.deploy()
 * --> Execution.deploy()
*/
private BiFunction<Void, Throwable, Void> deployAll(final List<DeploymentHandle> deploymentHandles) {
		return (ignored, throwable) -> {
			propagateIfNonNull(throwable);
			// walk through the slot assignment results
			for(final DeploymentHandle deploymentHandle : deploymentHandles) {
				final SlotExecutionVertexAssignment slotExecutionVertexAssignment = deploymentHandle.getSlotExecutionVertexAssignment();
				final CompletableFuture<LogicalSlot> slotAssigned = slotExecutionVertexAssignment.getLogicalSlotFuture();
				checkState(slotAssigned.isDone());
				/*************************************************
				 *  Note: deploy through deployOrHandleError; deployment itself can also fail.
				 *  Information used during deployment: slotAssigned, deploymentHandle, slotExecutionVertexAssignment
				 */
				FutureUtils.assertNoException(slotAssigned.handle(deployOrHandleError(deploymentHandle)));
			}
			return null;
		};
	}
//***************************************************************************************************
// --> deployOrHandleError(final DeploymentHandle deploymentHandle)
private BiFunction<Object, Throwable, Void> deployOrHandleError(final DeploymentHandle deploymentHandle) {
		final ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion();
		// get the ExecutionVertexID (one ExecutionVertexID corresponds to one Task to start)
		final ExecutionVertexID executionVertexId = requiredVertexVersion.getExecutionVertexId();

		return (ignored, throwable) -> {
			if(executionVertexVersioner.isModified(requiredVertexVersion)) {
				log.debug("Refusing to deploy execution vertex {} because this deployment was " + "superseded by another deployment",
					executionVertexId);
				return null;
			}
			if(throwable == null) {
				/*************************************************
				 *  Note: deploy the Task (the Task is identified by its ExecutionVertexID)
				 */
				deployTaskSafe(executionVertexId);
			} else {
				// handle a failed task deployment
				handleTaskDeploymentFailure(executionVertexId, throwable);
			}
			return null;
		};
	}
//***************************************************************************************************
// --> deployTaskSafe(final ExecutionVertexID executionVertexId)
private void deployTaskSafe(final ExecutionVertexID executionVertexId) {
		try {

			/*************************************************
			 *  Note: look up the ExecutionVertex by its ExecutionVertexId
			 */
			final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);

			/*************************************************
			 *  Note: one Task executes one ExecutionVertex
			 *  executionVertexOperations = DefaultExecutionVertexOperations
			 */
			executionVertexOperations.deploy(executionVertex);
		} catch(Throwable e) {
			handleTaskDeploymentFailure(executionVertexId, e);
		}
	}
//***************************************************************************************************
// --> deploy(final ExecutionVertex executionVertex)
@Override
	public void deploy(final ExecutionVertex executionVertex) throws JobException {
		/*************************************************
		 *  Note: deploy one ExecutionVertex
		 */
		executionVertex.deploy();
	}
//***************************************************************************************************
// --> deploy()	
public void deploy() throws JobException {
		/*************************************************
		 *  Note: delegate to Execution.deploy() to deploy the Task
		 */
		currentExecution.deploy();
	}
//***************************************************************************************************
// --> deploy()	
public void deploy() throws JobException {
		assertRunningInJobMasterMainThread();

		final LogicalSlot slot = assignedResource;

		checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");

		// Check if the TaskManager died in the meantime
		// This only speeds up the response to TaskManagers failing concurrently to deployments.
		// The more general check is the rpcTimeout of the deployment call
		if(!slot.isAlive()) {
			throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
		}

		// make sure exactly one deployment call happens from the correct state
		// note: the transition from CREATED to DEPLOYING is for testing purposes only
		// state check: if the previous state was SCHEDULED or CREATED, transition to DEPLOYING
		ExecutionState previous = this.state;
		if(previous == SCHEDULED || previous == CREATED) {
			if(!transitionState(previous, DEPLOYING)) {
				// race condition, someone else beat us to the deploying call.
				// this should actually not happen and indicates a race somewhere else
				throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
			}
		} else {
			// vertex may have been cancelled, or it was already scheduled
			throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + previous);
		}

		if(this != slot.getPayload()) {
			throw new IllegalStateException(String.format("The execution %s has not been assigned to the assigned slot.", this));
		}

		try {

			// race double check, did we fail/cancel and do we need to release the slot?
			// if the state is no longer DEPLOYING, release and return the slot
			if(this.state != DEPLOYING) {
				slot.releaseSlot(
					new FlinkException("Actual state of execution " + this + " (" + state + ") does not match expected state DEPLOYING."));
				return;
			}

			if(LOG.isInfoEnabled()) {
				LOG.info(String
					.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(), attemptNumber, getAssignedResourceLocation()));
			}

			// create the TaskDeploymentDescriptor, which carries everything the Task needs at runtime
			final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory.fromExecutionVertex(vertex, attemptNumber)
				.createDeploymentDescriptor(slot.getAllocationId(), slot.getPhysicalSlotNumber(), taskRestore, producedPartitions.values());

			// null taskRestore to let it be GC'ed
			taskRestore = null;
			/*************************************************
			 *  Note: get a TaskManagerGateway
			 *  slot = SingleLogicalSlot
			 *  taskManagerGateway = RpcTaskManagerGateway
			 */
			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

			final ComponentMainThreadExecutor jobMasterMainThreadExecutor = vertex.getExecutionGraph().getJobMasterMainThreadExecutor();

			/*************************************************
			 *  Note: submit the Task
			 *  taskManagerGateway = RpcTaskManagerGateway
			 *  this is where the Task is actually shipped from the JobMaster to the TaskManager for execution
			 */
			// We run the submission in the future executor so that the serialization of large TDDs does not block
			// the main thread and sync back to the main thread once submission is completed.
			CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor).thenCompose(Function.identity())
				.whenCompleteAsync((ack, failure) -> {
					// only respond to the failure case
					if(failure != null) {
						if(failure instanceof TimeoutException) {
							String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

							markFailed(new Exception(
								"Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation() + ") not responding after a rpcTimeout of " + rpcTimeout,
								failure));
						} else {
							markFailed(failure);
						}
					}
				}, jobMasterMainThreadExecutor);

		} catch(Throwable t) {
			markFailed(t);
			if(isLegacyScheduling()) {
				ExceptionUtils.rethrow(t);
			}
		}
	}
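The submission tail of deploy() is a reusable asynchrony pattern: do the heavy serialization and the RPC on a background executor, flatten the nested future with thenCompose(Function.identity()), and marshal the completion handling back onto the main-thread executor. A minimal standalone sketch with stubbed-in executors and a fake submitTask; all names here are hypothetical, not Flink APIs:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class SubmitOffMainThreadSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(2);       // stands in for `executor`
        ExecutorService mainThread = Executors.newSingleThreadExecutor();   // stands in for jobMasterMainThreadExecutor

        // Hypothetical gateway call: serializing the descriptor and the RPC
        // both happen off the "main thread", returning a future of the ack.
        CompletableFuture.supplyAsync(() -> submitTask("tdd-for-subtask-3"), ioExecutor)
                .thenCompose(Function.identity())      // flatten CompletableFuture<CompletableFuture<String>>
                .whenCompleteAsync((ack, failure) -> {
                    // Completion handling is marshalled back onto the main thread,
                    // mirroring whenCompleteAsync(..., jobMasterMainThreadExecutor).
                    if (failure != null) {
                        System.out.println("markFailed: " + failure.getMessage());
                    } else {
                        System.out.println("deployed: " + ack);
                    }
                }, mainThread);

        Thread.sleep(500);
        ioExecutor.shutdown();
        mainThread.shutdown();
    }

    static CompletableFuture<String> submitTask(String descriptor) {
        return CompletableFuture.completedFuture("ack(" + descriptor + ")");
    }
}

Shipping a TaskDeploymentDescriptor can involve serializing large payloads, so doing it on the future executor keeps the JobMaster's main thread free, while the whenCompleteAsync hop keeps failure handling inside the main thread's single-threaded execution model.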


5. Summary

  • When the user packages the Flink application as a jar and submits it with the flink run ... shell command, the submission is handled underneath by CliFrontend.
  • StreamExecutionEnvironment is the execution entry point of a Flink application. It provides the key machinery: connecting to data sources, setting the parallelism, managing the ExecutionConfig object, managing the job's collection of transformations, and exposing the execute() method.
  • When the client submits a job, it first obtains the StreamGraph and then runs execute(StreamGraph). The StreamGraph is converted into a JobGraph, persisted as a JobGraph file, and then submitted to the server.
  • On the server side, the request is handled by the handleRequest() method of the JobSubmitHandler inside the WebMonitorEndpoint. It mainly deserializes the JobGraph, fetches the jar and its dependency jars, uploads JobGraph + jar + dependency jars, and then forwards the submission to the Dispatcher.
  • Inside the Dispatcher, two things happen:
    • 1. A JobMaster object is created through the JobManagerRunnerFactory, and along the way the JobGraph is converted into an ExecutionGraph.
    • 2. The JobMaster service is started; during startup, leader election takes place, the heartbeat and registration services are started, and then slots are requested and the job is deployed.
  • The slot request flow: the JobMaster sends a slot request to the ResourceManager; the ResourceManager receives and processes it; the TaskManager handles the slot request forwarded by the ResourceManager; finally the JobMaster receives the slot result sent back by the TaskManager.
  • Job deployment: first the state is validated (SCHEDULED/CREATED --> DEPLOYING), then the Task information is wrapped in a TaskDeploymentDescriptor and the Task is deployed to the TaskExecutor via RPC.