Flink源码剖析:Jar包任务提交流程

【Azure Developer】解决Azure Key Vault管理Storage的示例代码在中国区Azure遇见的各种认证/授权问题 – C# Example Code,【Azure Developer】使用Postman获取Azure AD中注册应用程序的授权Token,及为Azure REST API设置Authorization,【Azure Developer】Python代码通过AAD认证访问微软Azure密钥保管库(Azure Key Vault)中机密信息(Secret)

 

Flink基于用户程序生成JobGraph,提交到集群进行分布式部署运行。本篇从源码角度讲解一下Flink Jar包是如何被提交到集群的。(本文源码基于Flink 1.11.3)

1 Flink run 提交Jar包流程分析

首先分析run脚本可以找到入口类CliFrontend,这个类在main方法中解析参数,基于第二个参数定位到run方法:

try {
    // do action
    switch (action) {
        case ACTION_RUN:
            run(params);
            return 0;
        case ACTION_RUN_APPLICATION:
            runApplication(params);
            return 0;
        case ACTION_LIST:
            list(params);
            return 0;
        case ACTION_INFO:
            info(params);
            return 0;
        case ACTION_CANCEL:
            cancel(params);
            return 0;
        case ACTION_STOP:
            stop(params);
            return 0;
        case ACTION_SAVEPOINT:
            savepoint(params);
            return 0;
        case "-h":
        case "--help":
            ...
            return 0;
        case "-v":
        case "--version":
            ...
        default:
            ...
            return 1;
    }
}

在run方法中,根据classpath、用户指定的jar、main函数等信息创建PackagedProgram。在Flink中通过Jar方式提交的任务都封装成了PackagedProgram对象。

protected void run(String[] args) throws Exception {
    ...
    final ProgramOptions programOptions = ProgramOptions.create(commandLine);
    final PackagedProgram program = getPackagedProgram(programOptions);

    // 把用户的jar配置到config里面
    final List<URL> jobJars = program.getJobJarAndDependencies();
    final Configuration effectiveConfiguration = getEffectiveConfiguration(
            activeCommandLine, commandLine, programOptions, jobJars);

    try {
        executeProgram(effectiveConfiguration, program);
    } finally {
        program.deleteExtractedLibraries();
    }
}

创建PackagedProgram后,有个非常关键的步骤就是这个effectiveConfig,这里面会把相关的Jar都放入pipeline.jars这个属性里,后面pipeline提交作业时,这些jar也会一起提交到集群。

其中比较关键的是Flink的类加载机制,为了避免用户自己的jar内与其他用户冲突,采用了逆转类加载顺序的机制。

private PackagedProgram(
        @Nullable File jarFile,
        List<URL> classpaths,
        @Nullable String entryPointClassName,
        Configuration configuration,
        SavepointRestoreSettings savepointRestoreSettings,
        String... args) throws ProgramInvocationException {

    // 依赖的资源
    this.classpaths = checkNotNull(classpaths);

    // 保存点配置
    this.savepointSettings = checkNotNull(savepointRestoreSettings);

    // 参数配置
    this.args = checkNotNull(args);

    // 用户jar
    this.jarFile = loadJarFile(jarFile);

    // 自定义类加载
    this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(
        getJobJarAndDependencies(),
        classpaths,
        getClass().getClassLoader(),
        configuration);

    // 加载main函数
    this.mainClass = loadMainClass(
        entryPointClassName != null ? entryPointClassName : getEntryPointClassNameFromJar(this.jarFile),
        userCodeClassLoader);
}

在类加载器工具类中根据参数classloader.resolve-order决定是父类优先还是子类优先,默认是使用子类优先模式。

executeProgram方法内部是启动任务的核心,在完成一系列的环境初始化后(主要是类加载以及一些输出信息),会调用packagedProgram的invokeInteractiveModeForExecution的,在这个方法里通过反射调用用户的main方法。

private static void callMainMethod(Class<?> entryClass, String[] args) 
    throws ProgramInvocationException {
    ...
    Method mainMethod = entryClass.getMethod("main", String[].class);
    mainMethod.invoke(null, (Object) args);
    ...
}

执行用户的main方法后,就是flink的标准流程了。创建env、构建StreamDAG、生成Pipeline、提交到集群、阻塞运行。当main程序执行完毕,整个run脚本程序也就退出了。

总结来说,Flink提交Jar任务的流程是:
1 脚本入口程序根据参数决定做什么操作
2 创建PackagedProgram,准备相关jar和类加载器
3 通过反射调用用户Main方法
4 构建Pipeline,提交到集群

Jenkins自动部署spring boot

2 通过PackagedProgram获取Pipeline

有的时候不想通过阻塞的方式卡任务执行状态,需要通过类似JobClient的客户端异步查询程序状态,并提供停止退出的能力。

要了解这个流程,首先要了解Pipeline是什么。用户编写的Flink程序,无论是DataStream API还是SQL,最终编译出的都是Pipeline。只是DataStream API编译出的是StreamGraph,而SQL编译出的Plan。Pipeline会在env.execute()中进行编译并提交到集群。

既然这样,此时可以思考一个问题:Jar包任务是独立的Main方法,如何能抽取其中的用户程序获得Pipeline呢?

通过浏览源码的单元测试,发现了一个很好用的工具类:PackagedProgramUtils。

public static Pipeline getPipelineFromProgram(
        PackagedProgram program,
        Configuration configuration,
        int parallelism,
        boolean suppressOutput) throws CompilerException, ProgramInvocationException {

    // 切换classloader
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(program.getUserCodeClassLoader());

    // 创建env
    OptimizerPlanEnvironment benv = new OptimizerPlanEnvironment(
        configuration,
        program.getUserCodeClassLoader(),
        parallelism);
    benv.setAsContext();
    StreamPlanEnvironment senv = new StreamPlanEnvironment(
        configuration,
        program.getUserCodeClassLoader(),
        parallelism);
    senv.setAsContext();

    try {
        // 执行用户main方法
        program.invokeInteractiveModeForExecution();
    } catch (Throwable t) {
        if (benv.getPipeline() != null) {
            return benv.getPipeline();
        }

        if (senv.getPipeline() != null) {
            return senv.getPipeline();
        }

        ...
    } finally {
        // 重置classloader
    }
}

这个工具类首先在线程内创建了一个env,这个env通过threadload保存到当前线程中。当通过反射调用用户代码main方法时,内部的getEnv函数直接从threadlocal中获取到这个env。

ThreadLocal<StreamExecutionEnvironmentFactory> factory = new ThreadLocal<>();

public static StreamExecutionEnvironment getExecutionEnvironment() {
        return Utils.resolveFactory(factory , contextEnvironmentFactory)
            .map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
            .orElseGet(StreamExecutionEnvironment::createLocalEnvironment);
    }

再回头看看env有什么特殊的。

public class StreamPlanEnvironment extends StreamExecutionEnvironment {

    private Pipeline pipeline;

    public Pipeline getPipeline() {
        return pipeline;
    }

    @Override
    public JobClient executeAsync(StreamGraph streamGraph) {
        pipeline = streamGraph;

        // do not go on with anything now!
        throw new ProgramAbortException();
    }
}

原来是重写了executeAysnc方法,当用户执行env.execute时,触发异常,从而在PackagedProgramUtils里面拦截异常,获取到用户到pipeline。

总结起来流程如下:

3 编程实战

通过阅读上述源码,可以学习到:

1 classloader类加载的父类优先和子类优先问题
2 threadlocal线程级本地变量的使用
3 PackagedProgramUtils 利用枚举作为工具类
4 PackagedProgramUtils 利用重写env,拦截异常获取pipeline。

关于pipeline如何提交到集群、如何运行,就后文再谈了。

 

【老孟Flutter】源码分析系列之InheritedWidget

给TA买糖
共{{data.count}}人
人已赞赏
经验教程

YYDS: Webpack Plugin开发

2021-1-19 20:50:00

经验教程

【Azure Developer】解决Azure Key Vault管理Storage的示例代码在中国区Azure遇见的各种认证/授权问题 - C# Example Code,【Azure Developer】使用Postman获取Azure AD中注册应用程序的授权Token,及为Azure REST API设置Authorization,【Azure Developer】Python代码通过AAD认证访问微软Azure密钥保管库(Azure Key Vault)中机密信息(Secret)

2021-1-19 21:22:00

⚠️
免责声明:根据《计算机软件保护条例》第十七条规定“为了学习和研究软件内含的设计思想和原理,通过安装、显示、传输或者存储软件等方式使用软件的,可以不经软件著作权人许可,不向其支付报酬。”您需知晓本站所有内容资源均来源于网络,仅供用户交流学习与研究使用,版权归属原版权方所有,版权争议与本站无关,用户本人下载后不能用作商业或非法用途,需在24个小时之内从您的电脑中彻底删除上述内容,否则后果均由用户承担责任;如果您访问和下载此文件,表示您同意只将此文件用于参考、学习而非其他用途,否则一切后果请您自行承担,如果您喜欢该程序,请支持正版软件,购买注册,得到更好的正版服务。 本站为个人博客非盈利性站点,所有软件信息均来自网络,所有资源仅供学习参考研究目的,并不贩卖软件,不存在任何商业目的及用途,网站会员捐赠是您喜欢本站而产生的赞助支持行为,仅为维持服务器的开支与维护,全凭自愿无任何强求。本站部份代码及教程来源于互联网,仅供网友学习交流,若您喜欢本文可附上原文链接随意转载。
无意侵害您的权益,请发送邮件至 momeis6@qq.com 或点击右侧 私信:momeis 反馈,我们将尽快处理。
0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
今日签到
有新私信 私信列表
搜索