使用 asynq 处理任务的基本流程可以分为以下几个步骤,简单来说就是:
- 定义任务类型:确定你要做什么任务(例如检查一个网站的状态码)。
- 创建任务:将任务数据封装为一个任务对象(
asynq.Task
)。 - 推送任务到队列:通过客户端将任务推送到 Redis 队列中。
- 定义任务处理函数:编写处理任务的函数,处理任务时做相应的操作。
- 启动任务队列服务器:启动服务器,监听任务队列,等待任务到来并处理。
详细流程步骤
1. 定义任务类型
任务类型是一个字符串,用于标识不同类型的任务。在任务执行时,服务器会根据任务类型来分配不同的处理函数。定义任务类型常量非常重要,因为任务类型会在客户端和服务器中使用,确保两者一致。
const (
CheckStatusCodeTaskType = "check:status_code"
CheckBuyStatusCodeTaskType = "check:buy_status_code"
)
2. 创建任务
使用 asynq.NewTask
创建任务。任务通常需要指定任务类型和数据(payload),数据可以是任何你需要传递的信息,比如 URL、任务参数等。
task1 := asynq.NewTask(CheckStatusCodeTaskType, nil)
task2 := asynq.NewTask(CheckBuyStatusCodeTaskType, nil)
3. 推送任务到队列
通过客户端将任务推送到 Redis 队列中。这个步骤将任务放入队列等待服务器处理。
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
defer client.Close()
_, err := client.Enqueue(task1)
if err != nil {
log.Fatalf("Failed to enqueue task1: %v", err)
}
_, err = client.Enqueue(task2)
if err != nil {
log.Fatalf("Failed to enqueue task2: %v", err)
}
4. 定义任务处理函数
任务处理函数是你在队列中执行任务时会调用的函数,它接受任务并根据任务数据来执行任务。你可以根据任务类型,定义不同的处理函数。
// 任务处理函数:获取 https://free.iosapp.icu/ 的状态码
func handleCheckStatusCodeTask(ctx context.Context, t *asynq.Task) error {
url := "https://free.iosapp.icu/"
resp, err := http.Get(url)
if err != nil {
return fmt.Errorf("failed to get status code for %s: %v", url, err)
}
defer resp.Body.Close()
fmt.Printf("Status code for %s: %d\n", url, resp.StatusCode)
return nil
}
5. 启动任务队列服务器
服务器需要与 Redis 连接,监听任务队列中的任务,一旦队列中有任务,就会调用对应的任务处理函数来处理任务。
mux := asynq.NewServeMux()
mux.HandleFunc(CheckStatusCodeTaskType, handleCheckStatusCodeTask)
mux.HandleFunc(CheckBuyStatusCodeTaskType, handleCheckBuyStatusCodeTask)
server := asynq.NewServer(asynq.RedisClientOpt{Addr: "localhost:6379"}, asynq.Config{Concurrency: 10})
if err := server.Run(mux); err != nil {
log.Fatalf("Error starting server: %v", err)
}
简单总结:
- 客户端创建任务,推送到 Redis 队列。
- 服务器连接到 Redis 队列,等待任务并处理它们。
- 任务处理函数根据任务类型执行实际任务。
核心概念:
- 客户端(Producer):负责创建任务并将其推送到队列中。
- 服务器(Consumer):负责从队列中获取任务并执行任务。
- Redis:作为队列管理的中介,存储待处理的任务。
这种设计模式类似于 生产者消费者模型,生产者负责生产任务并放入队列,消费者负责从队列中取任务并处理。
为什么要使用 Asynq?
- 任务异步执行:可以将需要较长时间处理的任务放入队列,由后台处理,避免阻塞主流程。
- 任务重试机制:如果任务执行失败,Asynq 可以自动重试任务。
- 任务并发:支持并发处理任务,通过设置
Concurrency
参数,控制同时处理任务的数量。 - Redis 支持:通过 Redis 实现任务队列管理,可靠性高且支持分布式架构。
这样一来,使用 Asynq 使得任务处理变得非常灵活且高效,适用于需要异步执行和高并发的任务场景。
package main
import (
"fmt"
"log"
"net/http"
"time"
"github.com/hibiken/asynq"
)
// 定义任务类型常量
const (
CheckStatusCodeTaskType = "check:status_code"
CheckBuyStatusCodeTaskType = "check:buy_status_code"
)
// 任务处理函数:获取 https://free.iosapp.icu/ 的状态码
func handleCheckStatusCodeTask(ctx context.Context, t *asynq.Task) error {
// 获取任务的 URL 参数
url := "https://free.iosapp.icu/"
// 发送 HTTP 请求获取状态码
resp, err := http.Get(url)
if err != nil {
return fmt.Errorf("failed to get status code for %s: %v", url, err)
}
defer resp.Body.Close()
// 打印状态码
fmt.Printf("Status code for %s: %d\n", url, resp.StatusCode)
return nil
}
// 任务处理函数:获取 https://buy.iosapp.icu/ 的状态码
func handleCheckBuyStatusCodeTask(ctx context.Context, t *asynq.Task) error {
// 获取任务的 URL 参数
url := "https://buy.iosapp.icu/"
// 发送 HTTP 请求获取状态码
resp, err := http.Get(url)
if err != nil {
return fmt.Errorf("failed to get status code for %s: %v", url, err)
}
defer resp.Body.Close()
// 打印状态码
fmt.Printf("Status code for %s: %d\n", url, resp.StatusCode)
return nil
}
func main() {
// 初始化 Asynq 客户端和队列
client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
defer client.Close()
// 定义任务:检查 https://free.iosapp.icu/ 状态码
task1 := asynq.NewTask(CheckStatusCodeTaskType, nil)
// 定义任务:检查 https://buy.iosapp.icu/ 状态码
task2 := asynq.NewTask(CheckBuyStatusCodeTaskType, nil)
// 将任务添加到队列
_, err := client.Enqueue(task1)
if err != nil {
log.Fatalf("Failed to enqueue task1: %v", err)
}
_, err = client.Enqueue(task2)
if err != nil {
log.Fatalf("Failed to enqueue task2: %v", err)
}
// 设置任务处理函数和启动任务队列
mux := asynq.NewServeMux()
mux.HandleFunc(CheckStatusCodeTaskType, handleCheckStatusCodeTask)
mux.HandleFunc(CheckBuyStatusCodeTaskType, handleCheckBuyStatusCodeTask)
// 启动工作队列,等待处理任务
server := asynq.NewServer(asynq.RedisClientOpt{Addr: "localhost:6379"}, asynq.Config{Concurrency: 10})
if err := server.Run(mux); err != nil {
log.Fatalf("Error starting server: %v", err)
}
}
asynq
中 MaxRetry
和其他参数选项的笔记
在使用 asynq
库时,Enqueue()
方法用于将任务放入队列,第二个参数是一个可选的配置项,它是一个或多个 asynq.Option
,用于指定任务的执行行为。以下是一些常见的配置项及其作用:
1. asynq.MaxRetry(n)
- 功能:设置任务的最大重试次数。
- 参数:
n
是重试的最大次数,超过最大重试次数后任务会被标记为失败。 - 示例:
asynq.MaxRetry(5)
— 任务最多重试 5 次。
2. asynq.Delay(t)
- 功能:设置任务的延迟时间,任务将在延迟时间后开始执行。
- 参数:
t
是延迟的时间,可以是time.Duration
类型。 - 示例:
asynq.Delay(time.Second * 10)
— 任务延迟 10 秒执行。
3. asynq.Schedule(t)
- 功能:设置任务的开始执行时间点,而不是立即执行。
- 参数:
t
是一个指定时间,表示任务应在该时间点开始执行。 - 示例:
asynq.Schedule(time.Now().Add(time.Hour * 1))
— 任务将在 1 小时后执行。
4. asynq.TaskTimeout(t)
- 功能:设置任务的最大超时时间,超过这个时间任务会被自动取消。
- 参数:
t
是任务的超时时间,time.Duration
类型。 - 示例:
asynq.TaskTimeout(time.Second * 30)
— 任务超时 30 秒。
5. asynq.Retention(t)
- 功能:设置任务在队列中的保留时间,超过此时间任务将被删除。
- 参数:
t
是任务保留的时间,time.Duration
类型。 - 示例:
asynq.Retention(time.Hour * 24)
— 任务在队列中最多保留 24 小时。
6. asynq.Priority(priority)
- 功能:设置任务的优先级,优先级高的任务会先被执行。
- 参数:
priority
可以是asynq.PriorityLow
,asynq.PriorityNormal
, 或asynq.PriorityHigh
。 - 示例:
asynq.Priority(asynq.PriorityHigh)
— 设置任务为高优先级。
示例:多种选项组合
你可以将多个选项组合使用,控制任务的执行行为:
mq.MClient.Enqueue(orderCallbackQueue,
asynq.MaxRetry(5),
asynq.Delay(time.Second * 10),
asynq.TaskTimeout(time.Second * 30),
asynq.Priority(asynq.PriorityHigh),
)
说明:
- 任务最多重试 5 次。
- 任务延迟 10 秒后执行。
- 如果任务在 30 秒内未完成,将会超时取消。
- 设置任务为高优先级。
总结
asynq
提供了许多选项来控制任务的行为。常见的配置项包括:
MaxRetry
:最大重试次数。Delay
:任务延迟执行。Schedule
:设置任务的开始执行时间。TaskTimeout
:设置任务的超时时间。Retention
:任务在队列中的保留时间。Priority
:设置任务的优先级。
通过合理配置这些选项,你可以精细控制任务的执行方式,从而提高任务处理的可靠性和效率。