本文主要对Argo Workflow的核心Feature以及核心执行流程的源码实现进行解析讲解,Feature的实现细节请翻看Argo Workflow源码进行更深入的了解。

一、知识梳理

由于Argo本身的概念和内容较多,我这里先通过思维导图的方式梳理出其中较为关键的知识点,作为前置预备知识:

https://whimsical.com/kubernetes-argo-framework-UZXKpyqfjqMzRx6mxuEdNt@2Ux7TurymN5ZuzLYocBL

一些基本的概念和功能介绍这里不再赘述,可以参考之前的一篇Argo介绍文章:Argo Workflow介绍

二、充满好奇

为了更好地学习Argo Workflow,这里有几个问题,我们带着问题去探究Argo效果可能会更好一些:

  1. Workflow有哪些核心组件,各自的作用是什么?
  2. Workflow的流程数据是如何实现上下文传递的?
  3. Workflow的流程管理逻辑是如何实现的?
  4. Workflow的模板以及状态数据存储在哪里?

接下来我们先梳理一下Argo Workflow的核心流程以及一些关键逻辑,然后我们再回过头来解答这些问题。

三、工程结构

Argo Workflow的整个工程是使用经典的kubebuilder搭建的,因此大部分目录结构和kubebuilder保持一致。关于kubebuilder的介绍可参考:https://cloudnative.to/kubebuilder/

目录名称职责及说明
apiSwagger API 定义Json文件存放目录,主要是供Argo Server UI使用。
cmd入口源码文件
 - argoargo CLI
 - argoexecargoexec container image命令
 - workflow-controllerKubernetes CRD Controller
community开源社区相关介绍,目前就一个README.MD
configArgo Workflow Controller配置对象以及相关方法
docsArgo Workflow的相关介绍文档,与官网文档一致
errors封装第三方 github.com/pkg/errors 组件,argo Workflow内部使用的错误管理组件
examples丰富的使用示例,主要是yaml文件
hack项目使用到的脚本及工具文件
manifestsArgo的安装配置文件,都是些yaml文件,使用kustomize工具管理,关于kustomize工具的介绍请参考:https://kubernetes.io/zh/docs/tasks/manage-kubernetes-objects/kustomization/
persistArgo数据库持久化封装组件,支持MySQL/PostgreSQL两种数据库。持久化主要是针对于Archived Workflow对象的存储,包含Workflow的定义以及状态数据。
pkgArgo Workflow的对外API定义、结构定义、客户端定义,主要提供给外部服务、客户端使用。
 - apiclientArgo Server对外API相关定义、客户端组件。
 - workflowArgo Workflow Controller相关结构体定义。
 - clientArgo Workflow ControllerKubernetes交互的Client/Informer/Lister定义。
serverArgo Server模块。
test单元测试文件。
uiArgo Server的前端UI NodeJS源码文件,使用Yarn包管理。
util项目封装的工具包模块
workflowArgo Workflow的核心功能逻辑封装

四、Workflow Controller

Argo中最核心也最复杂的便是Workflow Controller的实现。Argo Workflow Controller的主要职责是CRD的实现,以及Pod的创建创建。由于Argo采用的是Kubernetes CRD设计,因此整体架构以及流程控制采用的是Kubernetes Informer实现,相关背景知识可以参考之前的两篇文章:Kubernetes Informer及client-go资料Kubernetes CRD, Controller, Operator

1、基本架构

2、重要设计

Argo Workflow Controller组件有一些,我个人觉得较为重要的设计给大家分享下。

1)定义与状态分离

这个其实是Kubernetes的标准设计,即CRD实现对象应当包含SpecStatus属性对象,其中Spec对应CR的定义,而Status对应CR的业务状态信息。Spec由业务客户端创建和修改,一般创建后不会更新,在Informer Controller处理流程中只能读取。而StatusInformer Controller中根据业务场景的需要不断变化的字段。

2)定义与数据分离

Argo Workflow Template应当只包含流程以及变量定义,而变量数据则是由运行时产生的,例如通过Template运行时生成到终端或者Artifact,再通过Outputs的定义被其他的Template引用。一个Node执行成功之后,它的输出数据将会被保存到Template.Status字段(Kubernetes etcd)或者Artifact中,返回执行不会重复生成。一个Node执行失败后,如果重新执行将会重新去拉取依赖的数据。这种定义与数据分离的设计使得Workflow Template可以预先设计,甚至可以通过UI拖拽的方式生成。

3)全局与局部变量

在Argo Workflow Controller内部中的变量分为两种:一种是Workflow全局生效的变量(globalParams),一种是当前Template生效的本地变量(localParams)。其中全局变量也包括开发者自定义的输入/输出变量、Workflow Annotations&Labels,这些变量也是能被Workflow全局中访问。两种变量由于访问方式不同,因此不会相互冲突。

4)模板化变量设计

Argo Workflow Controller的变量其实主要是使用到模板解析中。在Controller处理流程中,会看到多次的json.Marshal/json.Unmarshal操作:通过json.MarhsalTemplate对象转为字符串,再通过模板解析将字符串中的变量替换为真正的内容,随后再将字符串json.Unmarshal到该对象上覆盖原有属性值。这种设计也使得Workflow Template中的变量对应的内容必须是一个具体的值(字符串/数字等基本类型),不能是一个复杂对象,否则无法完成模板解析替换。

5)多模板融合设计

Argo Workflow中有三个地方可以设置Template运行模板,按照优先级顺序为:Default Template、Workflow Template和Node Template

Default Template: 全局Template定义,所有创建的Workflow都会自动使用到该Template定义。

Workflow Template: Workflow流程中所有Node都会使用到的Template定义。

Node Template: 使用Steps/DAG流程调度的各个步骤/任务Node使用到的Template。

优先级高的Template在运行时会覆盖优先级低的Template,最终融合生成的Template再使用到Pod的创建中。

6)简化的调度控制

Argo Workflow目前仅使用两种调度控制方式:StepsDAG

Steps: 通过步骤的先后顺序、并行/串行控制来调度执行任务。

DAG: 通过有向无环图,任务之间的依赖关系来调度执行任务。

并且这两种方式可以混合使用,使得Argo Workflow基本能满足绝大部分的任务调度业务场景。

3、核心结构

整个Controller逻辑中涉及到的核心数据结构如下。

数据结构结构介绍

WorkflowController

用于Workflow Controller流程控制的核心数据结构对象,封装了主要的Controller处理逻辑、维护着核心的相关业务逻辑对象、数据队列、KubeClient对象、Informer对象等等。该结构只有一个对象实例,由主流程创建。

Workflow

Workflow的内容管理对象,用于Workflow的逻辑处理。

WorkflowSpec

Workflow的内容定义映射对象,与开发者使用的yaml文件结构一一对应。需要注意与WorkflowStatus的区别:

  • WorkflowSpecWorkflow的定义,来源于Workflow Yaml配置以及对象初始化。初始化完成后再运行时不会执行修改操作,运行时操作中只对Spec对象执行读取操作。
  • WorkflowStatusWorkflow运行时的状态信息管理对象,因为状态信息会不断变化,因此内部的属性也会不停地被修改。

WorkflowStatus

Workflow逻辑处理流程中的运行时状态信息管理对象。该结构是与Kubernetes Pod操作相关的资源结构。几点重要的说明:

1、StoredTemplates

该属性是一个Map类型,存放了当前Workflow所有的Template对象,以便于全局访问。键名为生成的TemplateID,生成规则为:

Scope/MetaName/TemplateName


WorkflowStep

是的,你没猜错,这个是用来管理执行流程控制的每一个操作步骤对象。该步骤对象必然会绑定一个Template对象。

Workflow的初始化执行步骤是通过woc.execWf.Spec.Entrypoint 作为入口Template

wfOperationCtx

Workflow业务逻辑封装对象。

几点重要的说明:

1、wf/orig/execWf

1)wf

该对象是开发者通过yaml创建的Workflow对象的深度拷贝对象。官方注释建议运行时逻辑处理中应当使用execWf而不是wf对象,wf对象未来可能会被废弃掉。

2)orig

该对象是开发者通过yaml创建的Workflow对象,任何时候开发者都不应当去修改它,该对象主要用于后续可以对Workflowpatch更新判断。

3)execWf

该对象是运行时逻辑处理中修改的Workflow对象,因为Workflow对象会在逻辑处理中不断被修改更新,特别是execWf是多个模板(Wf/WfDefault/WfTemplate)的合并结构。

2、globalParams

全局变量,类型为map[string]string,该Workflow中的所有template共享该变量,该变量的名称也可被用于template中的模板变量。

3、update

该属性用于标识当前Workflow对象是否已更新,以便判断是否同步到Kubernetes中。

4、node

woc处理流程的源码中会出现node的概念,这里的nodeSteps/DAG中的执行节点,每一个节点都会运行一个Pod来执行。注意它和Template不是一个概念。

templateresolution.

Context

如注释所示,用于Workflow中的template检索。

4、核心流程

主要节点流程图:https://whimsical.com/kubernetes-argo-controller-4BkPmeF1ZNP548D3JmaHhS@2Ux7TurymME7dMV1vz75

由于Argo Workflow Controller的细节很多、流程非常长,这里对流程做了精简,只保留了相对比较重要的执行节点,以便有侧重性进行介绍。

1)WorkflowController

Controller启动是由Cobra命令行组件管理,通过workflow-controller命令执行启动。启动后创建WorkflowController对象,并执行该对象的Run方法将流程的控制交给了该对象维护。这里同时会创建一个HTTP Serever:6060/healthz,用于Controller容器的健康检查。不过,从执行结果来看,6060端口的健康检查服务并没有被使用,而是使用的后续开启的Metrics Http Server作为健康检查的地址。

  • 在初始化WorkflowController时会自动创建内部的一个Informer对象Watch ConfigMap的变化,argo的相关ConfigMap更新后,会自动更新wfc的相关配置,包括数据库连接Session

2)wfController.Run

WorkflowCotroller首先会进行大量的初始化操作,主要如下:

  • 创建wfc.wfInformer/wfc.wftmplInformer/wfc.podInformer/wfc.cwftmplInformer并绑定相关的Event Handler,根据各自设定的cache.ListWatch规则对Event进行过滤(只会监听argo创建的相关资源)。例如:

  • 创建Metrics Http Server:9090,用于Prometheus的指标上报,内部的指标有点多,可以单独创建一个话题来研究,这里就不深究了。
  • 经典的Kubernetes Client Leader选举逻辑,当选出Leader时,在Leader节点通过OnStartedLeading回调进入wfc.startLeading逻辑。
  • wfc.startLeading中开始队列的开启、异步任务的创建,这里使用了wait.Until方法,该方法会每隔一段时间创建一个异步的协程执行。
  • 这里涉及到3个队列的worker创建:wfc.wfQueue/wfc.podQueue/wfc.podCleanupQueue
    • wfc.wfQueue 用于核心的Workflow对象的创建/修改流程控制。
    • wfc.podQueue 用于Pod的更新,其实就是当Pod有更新时如果Pod还存在,那么重新往wfc.wfQueue中添加一条数据重新走一遍Workflow的流程对Pod执行修改。
    • wfc.podCleanupQueue 用于Pod的标记完成。关闭:先关闭main container,再关闭wait container(关闭时先发送syscall.SIGTERM再发送syscall.SIGKILL信号)。删除:直接从KubernetesDeletePod
    • 官方的架构图中也能看得到几个队列之间的关联关系。

 

3)wrc.wfQueue

wfc.wfQueue是最核心的一个消息队列,接下来我们主要学习对于该队列的业务逻辑处理。

4)util.FromUnstructured

由于我们的wfc.wfInformer使用的是dynamicInterface过滤类型,因此所有的事件对象都是unstructured.Unstructured对象(其实是一个map[string]interface{}),无法直接通过断言转换为Workflow对象。因此这里使用了util.FromUnstructured方法将unstructured.Unstructured对象转换为Workflow对象。

5)newWorkflowOperationCtx

该方法会创建核心的wfOperationCtx对象,该对象是在Workflow处理中核心的上下文流程和变量管理对象,接下来wfc(WorkflowController)会将业务逻辑的流程控制转交给woc(wfOperationCtx)来管理。我们可以这么来理解,wfc是一个Kubernetes Controller,用于CRD的实现,负责与Kubernetes Event打交道。woc负责内部的业务逻辑、流程、变量管理,因此wocWorkflow处理中的核心业务逻辑封装对象。

6)woc.operate

毫无疑问地,接下来的控制权转交给了woc(wfOperationCtx),通过woc.operate进入业务逻辑处理流程。

7)woc.setExecWorkflow

  • 通过woc.execWf属性对象设置wocvolumes磁盘挂载。
  • 通过woc.setGlobalParameters设置wocglobalParams全局变量。
  • 通过woc.substituteGlobalVariables解析woc.execWf.Spec中的模板变量。

8)woc.createTemplateContext

通过woc.CreateTemplateContext创建templateresolution.Context,该对象用于Workflow中的template检索。

9)woc.substituteParamsInVolumes

通过woc.substituteParamsInVolumes方法解析替换Volume配置中的变量内容。

10)woc.createPVCs

通过woc.createPVCs方法根据woc.execWf.Spec.VolumeClaimTemplates配置创建PVC

11)woc.executeTemplate

  • 通过woc.executeTemplate方法开始执行Workflow中的Template,入口为woc.execWf.Spec.Entrypoint

  • 内部会根据给定的Entrypoint先去StoredTemplates检索对应的Template对象,找到之后对该Template对象做深度拷贝并返回该拷贝对象。如果找不到则去Workflow对象中查找,并缓存、返回查找到的Template对象。

12)woc.mergedTemplateDefaultsInto

关于什么是TemplateDefaults请参考章节介绍:https://argoproj.github.io/argo-workflows/template-defaults/

通过woc.mergedTemplateDefaultsInto方法将用户配置的TemplateDefaults合并到当前操作的Template对象上。

13)common.ProcessArgs

common.ProcessArgs方法主要用于Template的模板变量解析。

注意:argo内部中的变量分为两种,一种是Workflow全局生效的变量(globalParams),一种是当前Template生效的本地变量(localParams)。其中全局变量也包括开发者自定义的输入/输出变量、Workflow Annotations&Labels,这些变量也是能被Workflow全局中访问。

在模板变量解析中,还有一个关键的点。Argo的模板变量是支持表达式的,表达式解析是使用 github.com/antonmedv/expr 组件。

14)processedTmpl.Memoize

processdTmpl.Memoize配置用于开发者自定义是否缓存当前Template执行结果,具体介绍请参考章节:https://argoproj.github.io/argo-workflows/memoization/#using-memoization

15)processedTmpl.GetType

接下来是Template执行的关键地方,根据不同的Template类型,执行不同的操作逻辑。从流程图中可以看到,最关键的是Container类型,以及Steps&DAG类型。其中Container类型是所有Template执行的终点,也就是说Template执行最终是需要一个容器来实现。而Steps&DAG类型用于控制用户编排的Template流程,通过循环执行的方式,最终也会落到Container类型中去执行。

  • Suspend

Suspend类型的Template通过woc.executeSuspend方法实现,内部只是将当前的Template标记一下更新时间和Suspend的时间并重新丢回队列以便下一次判断。


  • Script

Script类型的Template通过woc.executeScript方法实现,内部判断当前的Script是否有其他Template在使用,随后调用woc.createWorkflowPod创建PodKubernetes中。


  • Resource

Resource类型的Template通过woc.executeResource方法实现,Resource内容通过创建一个argoexec容器,并使用 argoexec resource 命令解析参数,容器创建通过调用woc.createWorkflowPod创建PodKubernetes中。


  • Data

Data类型的Template通过woc.executeData方法实现,data内容通过创建一个argoexec容器,并使用 argoexec data 命令解析参数,容器创建通过调用woc.createWorkflowPod创建PodKubernetes中。


  • ContainerSet

ContainerSet类型的Template通过woc.executeContainerSet方法实现,多个容器的创建通过调用woc.createWorkflowPod创建PodKubernetes中。关于ContainerSet类型的Template介绍请参考:https://argoproj.github.io/argo-workflows/container-set-template/


  • Steps & DAG

Steps&DAG类型的Template通过woc.executeStepswoc.executeDAG方法实现,内部会对多个Template的流程进行控制,循环调用woc.executeTemplate方法执行每个Template


  • Container

这部分是整个Workflow Controller调度的关键,是创建Pod的核心逻辑。Container类型的Template通过woc.executeTemplate方法实现。在该方法中,涉及到几点重要的Pod设置:

a)根据条件创建Init/Wait Containers,内部都是通过 woc.newExecContainer 创建容器,容器创建时并设置通用的环境变量以及Volume挂载。

b)addVolumeReferences 根据将开发者自定义的Volume,按照名称关联挂载到PodInit/Wait/Main Containers中。

c)addSchedulingConstraints 方法根据WorkflowSpec的配置来设置Pod调度的一些调度策略,包括:NodeSelector/Affinity/Tolerations/SchedulerName/PriorityClassName/Priority/HostAliases/SecurityContext

d)woc.addInputArtifactsVolumes 对于artifacts功能特性来说是一个很重要的方法,将Artifacts相关的Volume挂载到Pod中,这些Volume包括:/argo/inputs/artifacts 、 /mainctrfs以及开发者在配置中设置的Volume地址。

如果Template类型为Script,那么会增加挂载一个 /argo/stagingemptyDir类型的Volume,用于Init/Wait/Main Containers之间共享Resource内容。我们来看一个官方的例子(scripts-bash.yaml)

在使用artifacts配置的时候,它会创建一个名称为 inputs-artifactsemptyDir类型volume供Init/Wait/Main Containers共享artifacts数据。我们来看一个官方的例子(artifacts-passing.yaml):

e)addInitContainers & addSidecars & addOutputArtifactsVolumesMain Containers中的Volume同步挂载到Init/Wait Containers中,以便于共享数据。从一个示例可以看到,Main Containers中的VolumeInit/Wait Containers中都有。

f)一些固定的环境变量设置,注意其中的Template环境变量设置,将整个Template对象转换为Json后塞到环境变量中,以便于后续容器读取:

g)substituePodParams 最后一次变量替换,特别是来源于Workflow ConfigMap或者Volume属性的变量。

h)kubeclientset.CoreV1.Pods.Create 将之前创建的Pod提交到Kubernetes执行创建。

五、ArgoExec Container

1、核心结构

整个agoexec逻辑中涉及到的核心数据结构如下。

数据结构简要介绍

WorkflowExecutor

用于Init/Wait Containers的运行管理核心对象。

ContainerRuntimeExecutor

如注释所示,用于与Docker Container进行交互的API接口。

Artifact

Artifact资源管理对象。

ArtifactDriver

用于Artifacts的驱动管理。Argo默认支持多种Artifacts驱动。

ArchiveStrategy

ArchiveStrategy用以标识该Artifact的压缩策略。

2、ArgoExec Init

只有在Template类型为Script或者带有Artifacts功能时,Argo Workflow Controller才会为Pod创建Init Container,该Container使用的是argoexec镜像,通过 argoexec init 命令启动运行。Init Container主要的职责是将ScriptResource读取或将依赖的Artifacts内容拉取,保存到本地挂载的共享Volume上,便于后续启动的Main Container使用。

由于Init Container的执行流程比较简单,这里简单介绍一下。

1)iniExecutor & wfExecutor.Init

首先创建WorkflowExecutor对象,该对象用于Init/Wait Containers的核心业务逻辑封装、流程控制执行。

WorkflowExecutor对象创建时会同时创建ContainerRuntimeExecutor对象,用于Docker Container的交互,包括Docker终端输出读取、结果文件获取等重要操作。在默认情况下,WorkflowExecutor会创建一个DockerExecutor对象。

此外,大家可能会对于为何能与Pod内部的Container交互,并且如何获取到Docker的输出内容感觉好奇。那我们describe一个Pod来看大家也许就明白了:

可以看到,容器中挂载了docker.dock文件到本地,以便本地可以通过docker命令与docker进行交互。当然Init Container不会直接与Docker交互,往往只有Wait Container才会,所以Init Container中并没有挂载该docker.sock文件。

2)wfExecutor.StageFiles

wfExecutor.StageFiles方法用于将Script/Resource(如果有)以文件形式存写入到本地挂载的Volume位置,这些VolumeContainer之间共享后续操作,后续Main Container会通过共享Volume访问到这些文件。需要注意的是,不同的Template类型,内容来源以及写入的磁盘位置会不同:

3)wfExecutor.LoadArtifacts

该方法仅在使用了Artifacts功能的场景下有效。负责将配置的Artifact拉取到本地,并根据压缩策略进行解压,修改权限,以便下一步Main Container访问。为便于扩展,Artifacts使用了ArtifactDrive接口设计,不同类型的Artifact可以分开实现,并根据类型进行引入,通过接口进行使用。

3、ArgoExec Wait

所有的Argo Workflow Template在执行时都会创建一个Wait Container这是一个非常关键的Container。该Container负责监控 Main Container的生命周期,在 Main Container 中的主要逻辑运行结束之后,负责将输出部分读取、持久化,这样 Main Container 就不用操心如何将该步产生的结果传到后面的步骤上的问题。

由于Wait Container的执行流程比较简单,这里简单介绍一下。

1)wfxecutor.Wait

该方法用于等待Main Container完成,我们看看默认的DockerExecutor底层是怎么实现的:

2)wfExecutor.CaptureScriptResult

通过捕获Main Container的终端输出,并保存输出结果。需要特别注意的是执行结果的大小,如果超过256KB将会被强行截断。

2)wfExecutor.SaveLogs

保存日志,默认情况下会保存到argo自带的minio服务(使用S3通信协议)中,该日志也可以被Argo Server中访问展示。

Argo默认的ArtifactRepository

3)wfxecutor.SaveParameters

只有在Template中存在Outputs配置时才会执行该逻辑,该方法将容器执行的结果保存到当前 Template.Outputs.Parameters 中。

3)wfxecutor.SaveArtifacts

如果Template存在Artifacts操作时,该方法用于读取Main Container中的Artifacts保存到 /mainctrfs 目录,并且解压(untar/unzip)后保存临时目录/tmp/argo/outputs/artifacts下,随后将临时目录中的Artifacts文件将上传到Artifact Repository中。值得注意的是:

  • /mainctrfs 目录是Wait ContainerMain Container的共享Volume,因此直接文件Copy即可。这是内部Volume交互,文件都是压缩(tgz)过后的,无须解压。
  • 临时目录 /tmp/argo/outputs/artifacts下的Artifacts文件只是用于后续的ArtifactDriver上传到Artifact Repository中,并且上传的文件内容需要实现解压(untar/unzip),因为压缩的机制只是argo内部文件交互使用,并不对外部ArtifactDriver通用。
  • 默认的ArtifactRepositoryminio,因此执行结果也会保存到minio服务中。

4)wfExecutor.AnnotateOutputs

Wait Container最后这一步操作很有意思。但是可能会使得Metadata中的Annotation会变得比较大。使用时需要注意,Annotation本身是有大小限制的,Kubernetes对于该项默认大小限制是256KB

这个Annotations会在Workflow Controller调度时被自动读取出来设置到TemplateOutputs属性中,这样一个Template执行的输出便可以被其他关联的Template引用到:

归根到底,从底层实现来讲,多个Template传递流程数据的方式主要依靠Annotations、Artifacts及共享Volume

4、ArgoExec其他命令

ArgoExec的其他命令(data/resource/emissary)主要用于流程调度过程中的内容解析,比较简单,这里不再做介绍,感兴趣可以看下源码。

六、常见问题

Argo Workflow的流程和主要逻辑梳理完了,接下来我们回答最开始的那几个问题。

由于篇幅较长,我们将问答内容迁移到了这里:Argo Workflow常见问题







Content Menu

  • No labels

1 Comment

  1. 非常感谢,在公司已使用argo workflow处理数据集半年了,正好想学习下它的源码,为社区做下贡献。