跳到主要内容

Cloudflare 工作流体验

cloudflare 又又又又有新玩意可以玩了,这不得马上试试。

Cloudflare Workers 的持久执行引擎

先看官方文档介绍:workflows

工作流是一个基于 Cloudflare Workers 的持久执行引擎。工作流允许你构建多步骤应用程序,这些应用程序可以自动重试、 持久化状态并运行数分钟、数小时、数天或数周。Workflows 引入了一种编程模型,可以更轻松地构建可靠、长期运行的任务,观察任务的进展情况,并根据服务中的事件以编程方式触发实例。

那跟着他的指南试试

新手入门

工作流允许你使用 Workers 平台构建持久的多步骤应用程序。工作流可以自动重试、持久状态、运行数小时或数天,并在第三方 API 之间进行协调。

你可以建立工作流程,对上传到 R2 对象存储空间的文件进行后处理,自动生成 Workers AI 嵌入到 Vectorize 矢量数据库,或使用你最喜欢的电子邮件 API 触发用户生命周期电子邮件。

本指南将指导你完成以下操作:

  • 定义你的第一个工作流程并发布它
  • 将工作流程部署到你的 Cloudflare 帐户
  • 运行(触发)你的工作流程并观察其输出

在本指南结束时,你应该能够编写、部署和调试自己的工作流应用程序。

前提条件

  • 有cloudflare账户
  • 安装了Node.js

1. 定义你的工作流

要创建第一个工作流,请使用create cloudflare CLI 工具,指定工作流启动模板:

npm create cloudflare@latest workflows-starter -- --template "cloudflare/workflows-starter"

这将创建一个名为 workflows-starter 的新文件夹。在文本编辑器中打开 src/index.ts 文件。该文件包含以下代码,是工作流定义的最基本实例:

提示

如果创建失败,很可能是克隆模板仓库失败,直接去 https://github.com/cloudflare/workflows-starter 下载,然后删除./git 目录就可以了

src/index.ts
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';

type Env = {
// 在这里添加你的绑定,例如Workers KV、D1、Workers AI 等
MY_WORKFLOW: Workflow;
};

// 传递到你的工作流的用户定义参数
type Params = {
email: string;
metadata: Record<string, string>;
};

export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
// 可以访问 `this.env` 上的绑定
// 可以访问`event.params`上的参数

const files = await step.do('my first step', async () => {
// 从 $SOME_SERVICE 获取文件列表
return {
inputParams: event,
files: [
'doc_7392_rev3.pdf',
'report_x29_final.pdf',
'memo_2024_05_12.pdf',
'file_089_update.pdf',
'proj_alpha_v2.pdf',
'data_analysis_q2.pdf',
'notes_meeting_52.pdf',
'summary_fy24_draft.pdf',
],
};
});

const apiResponse = await step.do('some other step', async () => {
let resp = await fetch('https://api.cloudflare.com/client/v4/ips');
return await resp.json<any>();
});

await step.sleep('wait on something', '1 minute');

await step.do(
'make a call to write that could maybe, just might, fail',
// 定义重试策略
{
retries: {
limit: 5,
delay: '5 second',
backoff: 'exponential',
},
timeout: '15 minutes',
},
async () => {
// 在这里做一些事情,可以访问我们之前步骤中的状态
if (Math.random() > 0.5) {
throw new Error('API call to $STORAGE_SYSTEM failed');
}
},
);
}
}

工作流程定义:

  • 1.定义一个run方法,该方法包含工作流的主要逻辑。
  • 2.至少有一个或多个对 step.do 的调用,该调用封装了工作流的逻辑。
  • 3.(可选)允许step返回状态,这样即使后续step失败,工作流也能继续执行,而无需重新运行之前的所有step

单个 Worker 应用程序可以包含多个工作流定义,只要每个工作流都有唯一的类名。这对于代码重用或定义概念上彼此相关的工作流程非常有用。 每个工作流在其他方面都是完全独立的:一个定义多个工作流的 Worker 与一组各自定义一个工作流的 Worker 没有什么不同。

2.创建你的工作流step

工作流中的每个step都是一个独立的可重试函数。

step是工作流的强大之处,因为你可以在工作流从一个step到另一个step的过程中封装错误并持久化状态,从而避免应用程序在失败时从头开始,并最终构建出更可靠的应用程序。

  • step可以执行代码 (step.do) 或休眠工作流 (step.sleep)。
  • 如果step失败(引发异常),将根据你的重试逻辑自动重试。
  • 如果step成功,任何它返回的状态将保留在工作流程中。

最基本的step如下所示:

src/index.ts
// Import the Workflow definition
import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from "cloudflare:workers"

type Params = {}

// 创建你自己的实现工作流的类
export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
// 定义一个 run() 方法
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
// 定义一个或多个可选择返回状态的`step`。
let state = step.do("my first step", async () => {

})

step.do("my second step", async () => {

})
}
}

每次调用 step.do 都会接受三个参数:

  • 1.(必填)一个step名称,用于在日志和遥测中标识该step
  • 2.(必填)一个回调函数,其中包含要为step运行的代码,以及希望工作流保持的任何状态
  • 3.(可选)一个 StepConfig,用于定义step的重试配置(最大重试次数、延迟和回退算法)

在决定是否将代码分解为多个step时,一个好的经验法则是问 "如果其中一个部分失败,我是否希望所有代码都重新运行?

在很多情况下,如果接下来的数据处理阶段失败,或者在尝试发送完成或欢迎电子邮件时出错,你就不希望重复调用 API。

例如,以下每个任务都理想地封装在其自己的step中,因此任何失败(例如文件不存在、第三方 API 关闭或速率受限)都不会导致整个程序失败。

  • 从 R2 读取或写入文件
  • 使用 Workers AI 运行 AI 任务
  • 查询 D1 数据库或通过 Hyperdrive 的数据库
  • 调用第三方 API

如果后续步骤失败,你的工作流可以使用前一步骤返回的任何状态从该步骤重试。这还可以帮助你避免不必要地查询数据库或重复调用付费 API 来获取已获取的数据。

3.配置你的工作流

在部署工作流之前,你需要对其进行配置。打开工作流启动器文件夹根部的 wrangler.toml 文件,其中包含以下 [[workflows]] 配置:

wrangler.toml
#:schema node_modules/wrangler/config-schema.json
name = "workflows-starter"
main = "src/index.ts"
compatibility_date = "2024-10-22"

[[workflows]]
# name of your workflow
name = "workflows-starter"
# binding name env.MY_WORKFLOW
binding = "MY_WORKFLOW"
# this is class that extends the Workflow class in src/index.ts
class_name = "MyWorkflow"

如果更改了 Wrangler 命令中工作流的名称、JavaScript 类名称或创建的项目名称,请确保更新上述值以匹配更改。

该配置会告诉 Workers 平台哪个 JavaScript 类代表你的工作流,并设置一个绑定名称,使你可以从其他处理程序运行工作流,或从其他 Workers 脚本调用工作流。

4. 绑定你的工作流

我们有一个非常基本的工作流定义,但现在需要提供一种从代码中调用它的方法。Workers 流程可以通过以下方式触发:

  • 1.通过 fetch() 处理程序处理外部 HTTP 请求
  • 2.来自队列的消息
  • 3.通过 Cron 触发器的计划
  • 4.通过 Workflows REST APIwrangler CLI

返回到我们在上一步中创建的 src/index.ts 文件,并添加一个绑定到我们的工作流的获取处理程序。此绑定允许我们创建新的工作流实例、获取现有工作流的状态、暂停和/或终止工作流。

src/index.ts

export default {
async fetch(req: Request, env: Env): Promise<Response> {
let url = new URL(req.url);

if (url.pathname.startsWith('/favicon')) {
return Response.json({}, { status: 404 });
}

// 获取现有实例的状态(如果提供)
let id = url.searchParams.get('instanceId');
if (id) {
let instance = await env.MY_WORKFLOW.get(id);
return Response.json({
status: await instance.status(),
});
}

// 生成一个新实例并返回 ID 和状态
let instance = await env.MY_WORKFLOW.create();
return Response.json({
id: instance.id,
details: await instance.status(),
});
},
};
提示

高亮的代码位置需要你替换为自己的工作流名称

此处的代码公开了一个 HTTP 接口,该接口生成随机 ID 并运行工作流,返回 ID 和工作流状态。它还接受可选的 instanceId 查询参数,该参数通过 ID 检索工作流实例的状态。

在生产应用程序中,你可以选择将身份验证放在接口前面,以便只有授权用户才能运行工作流。或者,你可以将消息从队列使用者传递到工作流,以允许长时间运行的任务。

检查你的工作流程代码

这是你在本指南开头使用 cloudflare/workflows-starter 模板时下拉取的 src/index.ts 文件的完整内容。

在部署之前,你可以查看完整的工作流代码和fetch handler允许你通过 HTTP 触发工作流:

src/index.ts
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers';

type Env = {
// Add your bindings here, e.g. Workers KV, D1, Workers AI, etc.
MY_WORKFLOW: Workflow;
};

// User-defined params passed to your workflow
type Params = {
email: string;
metadata: Record<string, string>;
};

export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
// Can access bindings on `this.env`
// Can access params on `event.params`

const files = await step.do('my first step', async () => {
// Fetch a list of files from $SOME_SERVICE
return {
inputParams: event,
files: [
'doc_7392_rev3.pdf',
'report_x29_final.pdf',
'memo_2024_05_12.pdf',
'file_089_update.pdf',
'proj_alpha_v2.pdf',
'data_analysis_q2.pdf',
'notes_meeting_52.pdf',
'summary_fy24_draft.pdf',
],
};
});

const apiResponse = await step.do('some other step', async () => {
let resp = await fetch('https://api.cloudflare.com/client/v4/ips');
return await resp.json<any>();
});

await step.sleep('wait on something', '1 minute');

await step.do(
'make a call to write that could maybe, just might, fail',
// Define a retry strategy
{
retries: {
limit: 5,
delay: '5 second',
backoff: 'exponential',
},
timeout: '15 minutes',
},
async () => {
// Do stuff here, with access to the state from our previous steps
if (Math.random() > 0.5) {
throw new Error('API call to $STORAGE_SYSTEM failed');
}
},
);
}
}

export default {
async fetch(req: Request, env: Env): Promise<Response> {
let url = new URL(req.url);

if (url.pathname.startsWith('/favicon')) {
return Response.json({}, { status: 404 });
}

// Get the status of an existing instance, if provided
let id = url.searchParams.get('instanceId');
if (id) {
let instance = await env.MY_WORKFLOW.get(id);
return Response.json({
status: await instance.status(),
});
}

// Spawn a new instance and return the ID and status
let instance = await env.MY_WORKFLOW.create();
return Response.json({
id: instance.id,
details: await instance.status(),
});
},
};

5.部署你的工作流

部署工作流与部署 Worker 相同。

npx wrangler deploy

具有有效工作流定义的 Worker 将自动被工作流注册。你可以使用 Wrangler 列出当前的工作流:

npx wrangler workflows list

输出:

asher@192 workflows-starter % npx wrangler workflows list



⛅️ wrangler 3.88.0 (update available 3.90.0)
-------------------------------------------------------

▲ [WARNING] 🚧 `wrangler workflows list` is an open-beta command. Please report any issues to https://github.com/cloudflare/workers-sdk/issues/new/choose


Showing last 1 workflow:
┌───────────────────┬───────────────────┬────────────┬────────────────────────┬────────────────────────┐
│ Name │ Script name │ Class name │ Created │ Modified │
├───────────────────┼───────────────────┼────────────┼────────────────────────┼────────────────────────┤
│ workflows-starter │ workflows-starter │ MyWorkflow │ 11/24/2024, 9:19:03 PM │ 11/24/2024, 9:19:03 PM │
└───────────────────┴───────────────────┴────────────┴────────────────────────┴────────────────────────┘

6.运行并观测你的工作流

部署工作流程后,你现在可以运行它。

  • 1.工作流可以并行运行:工作流的每个唯一调用都是该工作流的一个实例。
  • 2.实例将运行至完成(成功或失败)。
  • 3.部署较新版本的工作流将导致该点之后的所有实例运行最新的工作流代码。

由于工作流可以长期运行,因此有可能出现代表不同版本工作流代码的运行实例。

要触发我们的工作流,我们将使用 wrangler CLI 并传递一个可选的--payload。payload 将作为一个事件传递给工作流的运行方法处理程序。

npx wrangler workflows trigger workflows-starter '{"hello":"world"}'

输出:

asher@192 workflows-starter % npx wrangler workflows trigger workflows-starter '{"hello":"world"}'



⛅️ wrangler 3.88.0 (update available 3.90.0)
-------------------------------------------------------

▲ [WARNING] 🚧 `wrangler workflows trigger` is an open-beta command. Please report any issues to https://github.com/cloudflare/workers-sdk/issues/new/choose


🚀 Workflow instance "xxx" has been queued successfully

要查看我们刚刚触发的工作流实例的当前状态,我们可以通过 ID 或使用关键字 latest 来引用它:

npx wrangler@latest workflows instances describe workflows-starter latest

输出:

asher@192 workflows-starter % npx wrangler@latest workflows instances describe workflows-starter latest


Need to install the following packages:
[email protected]
Ok to proceed? (y) y


⛅️ wrangler 3.90.0
-------------------

▲ [WARNING] 🚧 `wrangler workflows instances describe` is an open-beta command. Please report any issues to https://github.com/cloudflare/workers-sdk/issues/new/choose


Workflow Name: workflows-starter
Instance Id: xxx
Version Id: xxx
Status: ▶ Running
Trigger: 🌎 API
Queued: 11/24/2024, 9:20:12 PM
Start: 11/24/2024, 9:20:12 PM
Duration: 8 hours
Last Successful Step: wait on something-1
Steps:

Name: my first step-1
Type: 🎯 Step
Start: 11/24/2024, 9:20:12 PM
End: 11/24/2024, 9:20:12 PM
Duration: 0 seconds
Success: ✅ Yes
Output: "{\"inputParams\":{\"timestamp\":\"2024-11-24T13:20:10.658Z\",\"payload\":{\"hello\":\"world\"}},\"files\":[\"doc_7392_rev3.pdf\",\"report_x29_final.pdf\",\"memo_2024_05_12.pdf\",\"file_089_update.pdf\",\"proj_alpha_v2.pdf\",\"data_analysis_q2.pdf\",\"notes_meeting_52.pdf\",\"summary_fy24_draft.pdf\"]}"
┌────────────────────────┬────────────────────────┬───────────┬────────────┐
│ Start │ End │ Duration │ State │
├────────────────────────┼────────────────────────┼───────────┼────────────┤
│ 11/24/2024, 9:20:12 PM │ 11/24/2024, 9:20:12 PM │ 0 seconds │ ✅ Success │
└────────────────────────┴────────────────────────┴───────────┴────────────┘

Name: some other step-1
Type: 🎯 Step
Start: 11/24/2024, 9:20:12 PM
End: 11/24/2024, 9:20:12 PM
Duration: 0 seconds
Success: ✅ Yes
Output: "{\"result\":{\"ipv4_cidrs\":[\"173.245.48.0/20\",\"103.21.244.0/22\",\"103.22.200.0/22\",\"103.31.4.0/22\",\"141.101.64.0/18\",\"108.162.192.0/18\",\"190.93.240.0/20\",\"188.114.96.0/20\",\"197.234.240.0/22\",\"198.41.128.0/17\",\"162.158.0.0/15\",\"104.16.0.0/13\",\"104.24.0.0/14\",\"172.64.0.0/13\",\"131.0.72.0/22\"],\"ipv6_cidrs\":[\"2400:cb00::/32\",\"2606:4700::/32\",\"2803:f800::/32\",\"2405:b500::/32\",\"2405:8100::/32\",\"2a06:98c0::/29\",\"2c0f:f248::/32\"],\"etag\":\"38f79d050aa027e3be3865e495dcc9bc\"},\"success\":true,\"errors\":[],\"messages\":[]}"
┌────────────────────────┬────────────────────────┬───────────┬────────────┐
│ Start │ End │ Duration │ State │
├────────────────────────┼────────────────────────┼───────────┼────────────┤
│ 11/24/2024, 9:20:12 PM │ 11/24/2024, 9:20:12 PM │ 0 seconds │ ✅ Success │
└────────────────────────┴────────────────────────┴───────────┴────────────┘

Name: wait on something-1
Type: 💤 Sleeping
Start: 11/24/2024, 9:20:12 PM
End: 11/24/2024, 9:21:12 PM
Duration: 1 minute

从上面的输出中,我们可以看到

  • 每个步骤的状态(成功、失败、运行
  • 步骤发出的任何状态
  • 任何睡眠状态,包括工作流何时唤醒
  • 与每个步骤相关的重试
  • 错误,包括异常信息

你不必等到工作流实例执行完毕才能查看其当前状态。wrangler 工作流实例描述子命令将显示正在执行的实例的状态,包括任何持久化状态、是否处于休眠状态以及任何错误或重试。这在开发过程中调试工作流时特别有用。

在上一步中,我们还将 Workers 脚本绑定到工作流中。你可以通过浏览器或任何 HTTP 客户端访问(已部署的)Workers 脚本来触发工作流。

# 这必须与步骤 6 中提供的 URL 匹配
curl -s https://workflows-starter.YOUR_WORKERS_SUBDOMAIN.workers.dev/

总结

体验下来,倒是不难,类似jenkins file中写不同的stage,就是感觉好像不能找到合适的场景去使用这个工作流。它既不像是定时任务,又不像是jenkins那样的自动化部署。 不过我对这个工作流比较感兴趣的一点是文档中说到的:工作流可以自动重试、持久状态、运行数小时或数天,并在第三方 API 之间进行协调。

只不过不知道如果持续运行数天,估计收费也会很高吧?因为工作流还是在beta状态,再过段时间来看估计会有更多的特性和使用场景。