使用 asynq 处理任务的基本流程可以分为以下几个步骤,简单来说就是:

  1. 定义任务类型:确定你要做什么任务(例如检查一个网站的状态码)。
  2. 创建任务:将任务数据封装为一个任务对象(asynq.Task)。
  3. 推送任务到队列:通过客户端将任务推送到 Redis 队列中。
  4. 定义任务处理函数:编写处理任务的函数,处理任务时做相应的操作。
  5. 启动任务队列服务器:启动服务器,监听任务队列,等待任务到来并处理。

详细流程步骤

1. 定义任务类型

任务类型是一个字符串,用于标识不同类型的任务。在任务执行时,服务器会根据任务类型来分配不同的处理函数。定义任务类型常量非常重要,因为任务类型会在客户端和服务器中使用,确保两者一致。

const (
    CheckStatusCodeTaskType   = "check:status_code"
    CheckBuyStatusCodeTaskType = "check:buy_status_code"
)

2. 创建任务

使用 asynq.NewTask 创建任务。任务通常需要指定任务类型和数据(payload),数据可以是任何你需要传递的信息,比如 URL、任务参数等。

task1 := asynq.NewTask(CheckStatusCodeTaskType, nil)
task2 := asynq.NewTask(CheckBuyStatusCodeTaskType, nil)

3. 推送任务到队列

通过客户端将任务推送到 Redis 队列中。这个步骤将任务放入队列等待服务器处理。

client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
defer client.Close()

_, err := client.Enqueue(task1)
if err != nil {
    log.Fatalf("Failed to enqueue task1: %v", err)
}

_, err = client.Enqueue(task2)
if err != nil {
    log.Fatalf("Failed to enqueue task2: %v", err)
}

4. 定义任务处理函数

任务处理函数是你在队列中执行任务时会调用的函数,它接受任务并根据任务数据来执行任务。你可以根据任务类型,定义不同的处理函数。

// 任务处理函数:获取 https://free.iosapp.icu/ 的状态码
func handleCheckStatusCodeTask(ctx context.Context, t *asynq.Task) error {
    url := "https://free.iosapp.icu/"
    resp, err := http.Get(url)
    if err != nil {
        return fmt.Errorf("failed to get status code for %s: %v", url, err)
    }
    defer resp.Body.Close()
    fmt.Printf("Status code for %s: %d\n", url, resp.StatusCode)
    return nil
}

5. 启动任务队列服务器

服务器需要与 Redis 连接,监听任务队列中的任务,一旦队列中有任务,就会调用对应的任务处理函数来处理任务。

mux := asynq.NewServeMux()
mux.HandleFunc(CheckStatusCodeTaskType, handleCheckStatusCodeTask)
mux.HandleFunc(CheckBuyStatusCodeTaskType, handleCheckBuyStatusCodeTask)

server := asynq.NewServer(asynq.RedisClientOpt{Addr: "localhost:6379"}, asynq.Config{Concurrency: 10})
if err := server.Run(mux); err != nil {
    log.Fatalf("Error starting server: %v", err)
}

简单总结:

  1. 客户端创建任务,推送到 Redis 队列
  2. 服务器连接到 Redis 队列,等待任务并处理它们。
  3. 任务处理函数根据任务类型执行实际任务。

核心概念:

  • 客户端(Producer):负责创建任务并将其推送到队列中。
  • 服务器(Consumer):负责从队列中获取任务并执行任务。
  • Redis:作为队列管理的中介,存储待处理的任务。

这种设计模式类似于 生产者消费者模型,生产者负责生产任务并放入队列,消费者负责从队列中取任务并处理。

为什么要使用 Asynq?

  • 任务异步执行:可以将需要较长时间处理的任务放入队列,由后台处理,避免阻塞主流程。
  • 任务重试机制:如果任务执行失败,Asynq 可以自动重试任务。
  • 任务并发:支持并发处理任务,通过设置 Concurrency 参数,控制同时处理任务的数量。
  • Redis 支持:通过 Redis 实现任务队列管理,可靠性高且支持分布式架构。

这样一来,使用 Asynq 使得任务处理变得非常灵活且高效,适用于需要异步执行和高并发的任务场景。

    package main

import (
    "fmt"
    "log"
    "net/http"
    "time"

    "github.com/hibiken/asynq"
)

// 定义任务类型常量
const (
    CheckStatusCodeTaskType   = "check:status_code"
    CheckBuyStatusCodeTaskType = "check:buy_status_code"
)

// 任务处理函数:获取 https://free.iosapp.icu/ 的状态码
func handleCheckStatusCodeTask(ctx context.Context, t *asynq.Task) error {
    // 获取任务的 URL 参数
    url := "https://free.iosapp.icu/"

    // 发送 HTTP 请求获取状态码
    resp, err := http.Get(url)
    if err != nil {
        return fmt.Errorf("failed to get status code for %s: %v", url, err)
    }
    defer resp.Body.Close()

    // 打印状态码
    fmt.Printf("Status code for %s: %d\n", url, resp.StatusCode)
    return nil
}

// 任务处理函数:获取 https://buy.iosapp.icu/ 的状态码
func handleCheckBuyStatusCodeTask(ctx context.Context, t *asynq.Task) error {
    // 获取任务的 URL 参数
    url := "https://buy.iosapp.icu/"

    // 发送 HTTP 请求获取状态码
    resp, err := http.Get(url)
    if err != nil {
        return fmt.Errorf("failed to get status code for %s: %v", url, err)
    }
    defer resp.Body.Close()

    // 打印状态码
    fmt.Printf("Status code for %s: %d\n", url, resp.StatusCode)
    return nil
}

func main() {
    // 初始化 Asynq 客户端和队列
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
    defer client.Close()

    // 定义任务:检查 https://free.iosapp.icu/ 状态码
    task1 := asynq.NewTask(CheckStatusCodeTaskType, nil)

    // 定义任务:检查 https://buy.iosapp.icu/ 状态码
    task2 := asynq.NewTask(CheckBuyStatusCodeTaskType, nil)

    // 将任务添加到队列
    _, err := client.Enqueue(task1)
    if err != nil {
        log.Fatalf("Failed to enqueue task1: %v", err)
    }
    _, err = client.Enqueue(task2)
    if err != nil {
        log.Fatalf("Failed to enqueue task2: %v", err)
    }

    // 设置任务处理函数和启动任务队列
    mux := asynq.NewServeMux()
    mux.HandleFunc(CheckStatusCodeTaskType, handleCheckStatusCodeTask)
    mux.HandleFunc(CheckBuyStatusCodeTaskType, handleCheckBuyStatusCodeTask)

    // 启动工作队列,等待处理任务
    server := asynq.NewServer(asynq.RedisClientOpt{Addr: "localhost:6379"}, asynq.Config{Concurrency: 10})
    if err := server.Run(mux); err != nil {
        log.Fatalf("Error starting server: %v", err)
    }
}


asynqMaxRetry 和其他参数选项的笔记

在使用 asynq 库时,Enqueue() 方法用于将任务放入队列,第二个参数是一个可选的配置项,它是一个或多个 asynq.Option,用于指定任务的执行行为。以下是一些常见的配置项及其作用:


1. asynq.MaxRetry(n)

  • 功能:设置任务的最大重试次数。
  • 参数n 是重试的最大次数,超过最大重试次数后任务会被标记为失败。
  • 示例asynq.MaxRetry(5) — 任务最多重试 5 次。

2. asynq.Delay(t)

  • 功能:设置任务的延迟时间,任务将在延迟时间后开始执行。
  • 参数t 是延迟的时间,可以是 time.Duration 类型。
  • 示例asynq.Delay(time.Second * 10) — 任务延迟 10 秒执行。

3. asynq.Schedule(t)

  • 功能:设置任务的开始执行时间点,而不是立即执行。
  • 参数t 是一个指定时间,表示任务应在该时间点开始执行。
  • 示例asynq.Schedule(time.Now().Add(time.Hour * 1)) — 任务将在 1 小时后执行。

4. asynq.TaskTimeout(t)

  • 功能:设置任务的最大超时时间,超过这个时间任务会被自动取消。
  • 参数t 是任务的超时时间,time.Duration 类型。
  • 示例asynq.TaskTimeout(time.Second * 30) — 任务超时 30 秒。

5. asynq.Retention(t)

  • 功能:设置任务在队列中的保留时间,超过此时间任务将被删除。
  • 参数t 是任务保留的时间,time.Duration 类型。
  • 示例asynq.Retention(time.Hour * 24) — 任务在队列中最多保留 24 小时。

6. asynq.Priority(priority)

  • 功能:设置任务的优先级,优先级高的任务会先被执行。
  • 参数priority 可以是 asynq.PriorityLow, asynq.PriorityNormal, 或 asynq.PriorityHigh
  • 示例asynq.Priority(asynq.PriorityHigh) — 设置任务为高优先级。

示例:多种选项组合

你可以将多个选项组合使用,控制任务的执行行为:

mq.MClient.Enqueue(orderCallbackQueue,
    asynq.MaxRetry(5),
    asynq.Delay(time.Second * 10),
    asynq.TaskTimeout(time.Second * 30),
    asynq.Priority(asynq.PriorityHigh),
)

说明

  • 任务最多重试 5 次。
  • 任务延迟 10 秒后执行。
  • 如果任务在 30 秒内未完成,将会超时取消。
  • 设置任务为高优先级。

总结

asynq 提供了许多选项来控制任务的行为。常见的配置项包括:

  • MaxRetry:最大重试次数。
  • Delay:任务延迟执行。
  • Schedule:设置任务的开始执行时间。
  • TaskTimeout:设置任务的超时时间。
  • Retention:任务在队列中的保留时间。
  • Priority:设置任务的优先级。

通过合理配置这些选项,你可以精细控制任务的执行方式,从而提高任务处理的可靠性和效率。

每日更新-免费小火箭账号
不要错过任何机会,探索最新的应用和游戏,就在我们的平台。
立即访问
最后修改:2024 年 12 月 31 日
如果觉得我的文章对你有用,请随意赞赏