argo workflows源码解析

网友投稿 823 2022-09-22

argo workflows源码解析

via:​​Workflow的核心Feature以及核心执行流程的源码实现进行解析讲解,Feature的实现细节请翻看Argo Workflow源码进行更深入的了解。

一、知识梳理

由于Argo本身的概念和内容较多,我这里先通过思维导图的方式梳理出其中较为关键的知识点,作为前置预备知识:

​​Workflow介绍​​

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JlT6FGDe-1657099905843)(Workflow,这里有几个问题,我们带着问题去探究Argo效果可能会更好一些:

Workflow有哪些核心组件,各自的作用是什么?Workflow的流程数据是如何实现上下文传递的?Workflow的流程管理逻辑是如何实现的?Workflow的模板以及状态数据存储在哪里?

接下来我们先梳理一下Argo Workflow的核心流程以及一些关键逻辑,然后我们再回过头来解答这些问题。

三、工程结构

​​Argo Workflow​​​的整个工程是使用经典的​​kubebuilder​​​搭建的,因此大部分目录结构和​​kubebuilder​​​保持一致。关于​​kubebuilder​​​的介绍可参考:​​class="data-table" data-id="t7a7e9d1-0QDfNbPS" data-width="" style="outline: none; border-collapse: collapse; width: 100%;">

目录名称

职责及说明

​api​

Swagger API 定义Json文件存放目录,主要是供Argo Server UI使用。

​cmd​

入口源码文件

​ - argo​

​argo CLI​

​ - argoexec​

​argoexec container image​​命令

​ - workflow-controller​

​Kubernetes CRD Controller​

​community​

开源社区相关介绍,目前就一个README.MD

​config​

​Argo Workflow Controller​​配置对象以及相关方法

​docs​

​Argo Workflow​​的相关介绍文档,与官网文档一致

​errors​

封装第三方 ​​github.com/pkg/errors​​​ 组件,​​argo Workflow​​内部使用的错误管理组件

​examples​

丰富的使用示例,主要是yaml文件

​hack​

项目使用到的脚本及工具文件

​manifests​

​Argo​​​的安装配置文件,都是些​​yaml​​​文件,使用​​kustomize​​​工具管理,关于kustomize工具的介绍请参考:​​data-id="t31e458f-6FxYX66R" style="height: 30px;">

​persist​

Argo数据库持久化封装组件,支持MySQL/PostgreSQL两种数据库。持久化主要是针对于​​Archived Workflow​​对象的存储,包含Workflow的定义以及状态数据。

​pkg​

​Argo Workflow​​的对外API定义、结构定义、客户端定义,主要提供给外部服务、客户端使用。

​ - apiclient​

​Argo Server​​​对外​​API​​相关定义、客户端组件。

​ - workflow​

​Argo Workflow Controller​​相关结构体定义。

​ - client​

​Argo Workflow Controller​​​与​​Kubernetes​​​交互的​​Client/Informer/Lister​​定义。

​server​

​Argo Server​​模块。

​test​

单元测试文件。

​ui​

​Argo Server​​​的前端​​UI NodeJS​​​源码文件,使用​​Yarn​​包管理。

​util​

项目封装的工具包模块

​workflow​

​Argo Workflow​​的核心功能逻辑封装

四、Workflow Controller

​​Argo​​​中最核心也最复杂的便是​​Workflow Controller​​​的实现。​​Argo Workflow Controller​​​的主要职责是​​CRD​​​的实现,以及​​Pod​​​的创建创建。由于​​Argo​​​采用的是​​Kubernetes CRD​​​设计,因此整体架构以及流程控制采用的是​​Kubernetes Informer​​​实现,相关背景知识可以参考之前的两篇文章:​​Kubernetes Informer及client-go资料​​​、​​Kubernetes CRD, Controller, Operator​​。

1、基本架构

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tMtDozAA-1657099905845)(Workflow Controller​​组件有一些,我个人觉得较为重要的设计给大家分享下。

1)定义与状态分离

这个其实是​​Kubernetes​​​的标准设计,即​​CRD实现​​​对象应当包含​​Spec​​​及​​Status​​​属性对象,其中​​Spec​​​对应​​CR​​​的定义,而​​Status​​​对应​​CR​​​的业务状态信息。​​Spec​​​由业务客户端创建和修改,一般创建后不会更新,在​​Informer Controller​​​处理流程中只能读取。而​​Status​​​是​​Informer Controller​​中根据业务场景的需要不断变化的字段。

2)定义与数据分离

​​Argo Workflow Template​​​应当只包含流程以及变量定义,而变量数据则是由运行时产生的,例如通过Template运行时生成到终端或者​​Artifact​​​,再通过​​Outputs​​​的定义被其他的Template引用。一个​​Node​​​执行成功之后,它的输出数据将会被保存到​​Template.Status​​​字段(​​Kubernetes etcd​​​)或者Artifact中,返回执行不会重复生成。一个​​Node​​​执行失败后,如果重新执行将会重新去拉取依赖的数据。这种定义与数据分离的设计使得​​Workflow Template​​可以预先设计,甚至可以通过UI拖拽的方式生成。

3)全局与局部变量

​​在Argo Workflow Controller​​​内部中的变量分为两种:一种是​​Workflow​​​全局生效的变量(​​globalParams​​​),一种是当前​​Template​​​生效的本地变量(​​localParams​​​)。其中全局变量也包括开发者自定义的输入/输出变量、​​Workflow Annotations&Labels​​​,这些变量也是能被​​Workflow​​全局中访问。两种变量由于访问方式不同,因此不会相互冲突。

4)模板化变量设计

​​Argo Workflow Controller​​​的变量其实主要是使用到模板解析中。在​​Controller​​​处理流程中,会看到多次的​​json.Marshal/json.Unmarshal​​​操作:通过​​json.Marhsal​​​将​​Template​​​对象转为字符串,再通过模板解析将字符串中的变量替换为真正的内容,随后再将字符串​​json.Unmarshal​​​到该对象上覆盖原有属性值。这种设计也使得​​Workflow Template​​中的变量对应的内容必须是一个具体的值(字符串/数字等基本类型),不能是一个复杂对象,否则无法完成模板解析替换。

5)多模板融合设计

在​​Argo Workflow​​​中有三个地方可以设置​​Template​​​运行模板,按照优先级顺序为:​​Default Template、Workflow Template和Node Template​​。

​​**Default Template**​​: 全局Template定义,所有创建的Workflow都会自动使用到该Template定义。

​​**Workflow Template**​​: Workflow流程中所有Node都会使用到的Template定义。

​​**Node Template**​​: 使用Steps/DAG流程调度的各个步骤/任务Node使用到的Template。

优先级高的​​Template​​​在运行时会覆盖优先级低的​​Template​​​,最终融合生成的Template再使用到​​Pod​​的创建中。

6)简化的调度控制

​​Argo Workflow​​​目前仅使用两种调度控制方式:​​Steps​​​和​​DAG​​。

​​**Steps:** ​​通过步骤的先后顺序、并行/串行控制来调度执行任务。

​​**DAG:** ​​通过有向无环图,任务之间的依赖关系来调度执行任务。

并且这两种方式可以混合使用,使得​​Argo Workflow​​基本能满足绝大部分的任务调度业务场景。

3、核心结构

整个​​Controller​​逻辑中涉及到的核心数据结构如下。

4、核心流程

主要节点流程图:​​Workflow Controller​​的细节很多、流程非常长,这里对流程做了精简,只保留了相对比较重要的执行节点,以便有侧重性进行介绍。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-j0eLELmf-1657099905849)(Serever:``6060/healthz​​​,用于​​Controller容器​​​的健康检查。不过,从执行结果来看,​​6060​​​端口的健康检查服务并没有被使用,而是使用的后续开启的​​Metrics Http Server​​作为健康检查的地址。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-i205v2Bd-1657099905849)(ConfigMap​​​的变化,当​​argo​​​的相关​​ConfigMap​​​更新后,会自动更新​​wfc​​​的相关配置,包括数据库连接​​Session​​。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0JNMhU73-1657099905849)(Handler​​​,根据各自设定的​​cache.ListWatch​​​规则对​​Event​​​进行过滤(只会监听​​argo​​创建的相关资源)。例如:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cwlwZBu0-1657099905850)(Http Server:9090​​,用于​​Prometheus​​的指标上报,内部的指标有点多,可以单独创建一个话题来研究,这里就不深究了。经典的​​Kubernetes Client Leader​​选举逻辑,当选出​​Leader​​时,在​​Leader​​节点通过​​OnStartedLeading​​回调进入​​wfc.startLeading​​逻辑。​​wfc.startLeading​​​中开始队列的开启、异步任务的创建,这里使用了​​wait.Until​​方法,该方法会每隔一段时间创建一个异步的协程执行。这里涉及到3个队列的

worker

创建:

wfc.wfQueue/wfc.podQueue/wfc.podCleanupQueue

​​wfc.wfQueue​​ 用于核心的Workflow对象的创建/修改流程控制。​​wfc.podQueue​​​ 用于​​Pod​​​的更新,其实就是当​​Pod​​​有更新时如果​​Pod​​​还存在,那么重新往​​wfc.wfQueue​​​中添加一条数据重新走一遍​​Workflow​​​的流程对​​Pod​​执行修改。​​wfc.podCleanupQueue​​​ 用于​​Pod​​​的标记完成。关闭:先关闭​​main container​​​,再关闭​​wait container​​​(关闭时先发送​​syscall.SIGTERM​​​再发送​​syscall.SIGKILL​​​信号)。删除:直接从​​Kubernetes​​​中​​Delete​​​该​​Pod​​。官方的架构图中也能看得到几个队列之间的关联关系。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pYJ8VRse-1657099905850)(Controller​​​,用于​​CRD​​​的实现,负责与​​Kubernetes Event​​​打交道。​​woc​​​负责内部的业务逻辑、流程、变量管理,因此​​woc​​​是​​Workflow​​处理中的核心业务逻辑封装对象。

6)woc.operate

毫无疑问地,接下来的控制权转交给了​​woc(wfOperationCtx)​​​,通过​​woc.operate​​进入业务逻辑处理流程。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KmW45uFB-1657099905851)(Annotations&Labels​​​,这些变量也是能被​​Workflow​​全局中访问。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MImPOHF7-1657099905852)(​​github.com/antonmedv/expr​​ 组件。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XKq7xPDm-1657099905853)(​​argoexec resource ​​​命令解析参数,容器创建通过调用​​woc.createWorkflowPod​​​创建​​Pod​​​到​​Kubernetes​​中。

​​**Data**​​

​​Data​​​类型的​​Template​​​通过​​woc.executeData​​​方法实现,​​data​​​内容通过创建一个​​argoexec​​​容器,并使用 ​​argoexec data ​​​命令解析参数,容器创建通过调用​​woc.createWorkflowPod​​​创建​​Pod​​​到​​Kubernetes​​中。

​​**ContainerSet**​​

​​ContainerSet​​​类型的​​Template​​​通过​​woc.executeContainerSet​​​方法实现,多个容器的创建通过调用​​woc.createWorkflowPod​​​创建​​Pod​​​到​​Kubernetes​​​中。关于​​ContainerSet​​​类型的​​Template​​​介绍请参考:​​& DAG**​​

​​Steps&DAG​​​类型的​​Template​​​通过​​woc.executeSteps​​​、​​woc.executeDAG​​​方法实现,内部会对多个​​Template​​​的流程进行控制,循环调用​​woc.executeTemplate​​​方法执行每个​​Template​​。

​​**Container**​​

这部分是整个​​Workflow Controller​​​调度的关键,是创建​​Pod​​​的核心逻辑。​​Container​​​类型的​​Template​​​通过​​woc.executeTemplate​​​方法实现。在该方法中,涉及到几点重要的​​Pod​​设置:

a)根据条件创建​​Init/Wait Containers​​​,内部都是通过 ​​woc.newExecContainer ​​​创建容器,容器创建时并设置通用的环境变量以及​​Volume​​挂载。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nEkkf3tR-1657099905854)(根据将开发者自定义的​​Volume​​​,按照名称关联挂载到​​Pod的Init/Wait/Main Containers​​中。

c)​​addSchedulingConstraints​​​ 方法根据​​WorkflowSpec​​​的配置来设置​​Pod​​​调度的一些调度策略,包括:​​NodeSelector/Affinity/Tolerations/SchedulerName/PriorityClassName/Priority/HostAliases/SecurityContext​​。

d)​​woc.addInputArtifactsVolumes​​​ 对于​​artifacts​​​功能特性来说是一个很重要的方法,将​​Artifacts​​​相关的​​Volume​​​挂载到​​Pod​​​中,这些​​Volume​​​包括:​​/argo/inputs/artifacts​​​ 、 ​​/mainctrfs​​​以及开发者在配置中设置的​​Volume​​地址。

如果​​Template​​​类型为​​Script​​​,那么会增加挂载一个 ​​/argo/staging​​​ 的​​emptyDir​​​类型的​​Volume​​​,用于​​Init/Wait/``Main Containers​​​之间共享​​Resource​​​内容。我们来看一个官方的例子​​(scripts-bash.yaml)​​:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8uAWY9sc-1657099905854)(​​inputs-artifacts​​​ 的​​emptyDir​​​类型volume供​​Init/Wait/Main Containers共享artifacts数据。我们来看一个官方的例子(artifacts-passing.yaml):​​

![img](& addSidecars & ``addOutputArtifactsVolumes​​​ 将​​Main Containers​​​中的​​Volume​​​同步挂载到​​Init/Wait Containers​​​中,以便于共享数据。从一个示例可以看到,​​Main Containers​​​中的​​Volume​​​在​​Init/Wait Containers​​中都有。

f)一些固定的环境变量设置,注意其中的​​Template​​​环境变量设置,将整个​​Template​​​对象转换为​​Json​​后塞到环境变量中,以便于后续容器读取:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Umlr9py6-1657099905855)(最后一次变量替换,特别是来源于​​Workflow ConfigMap​​​或者​​Volume​​属性的变量。

h)​​kubeclientset.CoreV1.Pods.Create​​​ 将之前创建的​​Pod​​​提交到​​Kubernetes​​执行创建。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jYVIWbxT-1657099905856)(Container

1、核心结构

整个​​agoexec​​逻辑中涉及到的核心数据结构如下。

2、ArgoExec Init

只有在​​Template​​​类型为​​Script​​​或者带有​​Artifacts​​​功能时,​​Argo Workflow Controller​​​才会为​​Pod​​​创建​​Init Container​​​,该​​Container​​​使用的是​​argoexec​​​镜像,通过 ​​argoexec init​​​ 命令启动运行。​​Init Container​​​主要的职责是将​​Script​​​的​​Resource​​​读取或将依赖的​​Artifacts​​​内容拉取,保存到本地挂载的共享​​Volume​​​上,便于后续启动的​​Main Container​​使用。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-D9jAw1SZ-1657099905859)(Container​​的执行流程比较简单,这里简单介绍一下。

1)iniExecutor & wfExecutor.Init

首先创建​​WorkflowExecutor​​​对象,该对象用于​​Init/Wait Containers​​的核心业务逻辑封装、流程控制执行。

在​​WorkflowExecutor​​​对象创建时会同时创建​​ContainerRuntimeExecutor​​​对象,用于​​Docker Container​​​的交互,包括​​Docker​​​终端输出读取、结果文件获取等重要操作。在默认情况下,​​WorkflowExecutor​​​会创建一个​​DockerExecutor​​对象。

此外,大家可能会对于为何能与​​Pod​​​内部的​​Container​​​交互,并且如何获取到​​Docker​​​的输出内容感觉好奇。那我们​​describe​​​一个​​Pod​​来看大家也许就明白了:

![img](Container​​​不会直接与​​Docker​​​交互,往往只有​​Wait Container​​​才会,所以​​Init Container​​​中并没有挂载该​​docker.sock​​文件。

2)wfExecutor.StageFiles

​​wfExecutor.StageFiles​​​方法用于将​​Script/Resource​​​(如果有)以文件形式存写入到本地挂载的​​Volume​​​位置,这些​​Volume​​​是​​Container​​​之间共享后续操作,后续​​Main Container​​​会通过共享​​Volume​​​访问到这些文件。需要注意的是,不同的​​Template​​类型,内容来源以及写入的磁盘位置会不同:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zh965CY6-1657099905859)(Container​​​访问。为便于扩展,​​Artifacts​​​使用了​​ArtifactDrive​​​接口设计,不同类型的​​Artifact​​可以分开实现,并根据类型进行引入,通过接口进行使用。

3、ArgoExec Wait

所有的​​Argo Workflow Template​​​在执行时都会创建一个​​Wait Container​​,这是一个非常关键的Container。该​​Container​​​负责监控 ​​Main Container​​​的生命周期,在 ​​Main Container​​​ 中的主要逻辑运行结束之后,负责将输出部分读取、持久化,这样 ​​Main Container​​ 就不用操心如何将该步产生的结果传到后面的步骤上的问题。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BTu3aG68-1657099905860)(Container​​的执行流程比较简单,这里简单介绍一下。

1)wfxecutor.Wait

该方法用于等待​​Main Container​​​完成,我们看看默认的​​DockerExecutor​​底层是怎么实现的:

2)wfExecutor.CaptureScriptResult

通过捕获​​Main Container​​的终端输出,并保存输出结果。需要特别注意的是执行结果的大小,如果超过​​256KB​​将会被强行截断。

2)wfExecutor.SaveLogs

保存日志,默认情况下会保存到​​argo​​​自带的​​minio​​​服务(使用​​S3​​​通信协议)中,该日志也可以被​​Argo Server​​中访问展示。

​​Argo​​​默认的​​ArtifactRepository​​:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KHtOhUhX-1657099905861)(​​Template.Outputs.Parameters​​ 中。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-B78xQxRF-1657099905862)(Container​​​中的​​Artifacts​​​保存到 ​​/mainctrfs​​​ 目录,并且解压(​​untar/unzip​​​)后保存临时目录​​/tmp/argo/outputs/artifacts​​​下,随后将临时目录中的​​Artifacts​​​文件将上传到​​Artifact Repository​​中。值得注意的是:

​​/mainctrfs​​​ 目录是​​Wait Container​​​与​​Main Container​​​的共享​​Volume​​​,因此直接文件​​Copy​​​即可。这是内部​​Volume​​​交互,文件都是压缩(​​tgz​​)过后的,无须解压。临时目录​​/tmp/argo/outputs/artifacts​​​下的​​Artifacts​​​文件只是用于后续的​​ArtifactDriver​​​上传到​​Artifact Repository​​​中,并且上传的文件内容需要实现解压(​​untar/unzip​​​),因为压缩的机制只是​​argo​​​内部文件交互使用,并不对外部​​ArtifactDriver​​通用。默认的​​ArtifactRepository​​​是​​minio​​​,因此执行结果也会保存到​​minio​​服务中。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-e5JU67UX-1657099905862)(Container​​​最后这一步操作很有意思。但是可能会使得​​Metadata​​​中的​​Annotation​​​会变得比较大。使用时需要注意,​​Annotation​​​本身是有大小限制的,​​Kubernetes​​​对于该项默认大小限制是​​256KB​​。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0A8k2fYp-1657099905863)(Controller​​​调度时被自动读取出来设置到​​Template​​​的​​Outputs​​​属性中,这样一个​​Template​​​执行的输出便可以被其他关联的​​Template​​引用到:

归根到底,从底层实现来讲,多个​​Template​​​传递流程数据的方式主要依靠​​Annotations、Artifacts​​​及共享​​Volume​​。

4、ArgoExec其他命令

​​ArgoExec​​​的其他命令(​​data/resource/emissary​​)主要用于流程调度过程中的内容解析,比较简单,这里不再做介绍,感兴趣可以看下源码。

六、常见问题

​​Argo Workflow​​的流程和主要逻辑梳理完了,接下来我们回答最开始的那几个问题。

由于篇幅较长,我们将问答内容迁移到了这里:​​Argo Workflow常见问题​​

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Apache DolphinScheduler 系统架构设计
下一篇:为Golang项目编写Makefile
相关文章

 发表评论

暂时没有评论,来抢沙发吧~