asynq 中,你可以使用 asynq.ProcessInasynq.ProcessAt 来设置任务的延迟执行时间。以下是几种设置过期时间的方式:

方式 1:使用 ProcessIn 设置延迟时间

// 创建延迟任务
func EnqueueExpirationTask(orderID string, expirationMinutes int) error {
    client := asynq.NewClient(asynq.RedisClientOpt{
        Addr: "localhost:6379",
    })
    defer client.Close()

    // 创建任务
    task := asynq.NewTask("order:expire", []byte(orderID))

    // 设置在 expirationMinutes 分钟后执行
    _, err := client.Enqueue(
        task,
        asynq.Queue(QueueOrderExpiration),
        asynq.ProcessIn(time.Duration(expirationMinutes) * time.Minute),  // 设置延迟时间
    )
    
    return err
}

方式 2:使用 ProcessAt 设置具体执行时间

// 创建定时任务
func EnqueueExpirationTask(orderID string, expirationTime time.Time) error {
    client := asynq.NewClient(asynq.RedisClientOpt{
        Addr: "localhost:6379",
    })
    defer client.Close()

    task := asynq.NewTask("order:expire", []byte(orderID))

    // 设置在指定时间执行
    _, err := client.Enqueue(
        task,
        asynq.Queue(QueueOrderExpiration),
        asynq.ProcessAt(expirationTime),  // 设置具体执行时间
    )
    
    return err
}

完整示例(包含订单创建和过期处理):

// 创建订单并设置过期任务
func CreateOrder(c *gin.Context) {
    // 获取配置的过期时间
    orderExpirationMinutes := viper.GetInt("order_expiration_time")
    if orderExpirationMinutes <= 0 {
        orderExpirationMinutes = 10 // 默认10分钟
    }

    // 生成订单ID
    orderID, err := GenerateUniqueTradeID()
    if err != nil {
        c.JSON(http.StatusInternalServerError, gin.H{"error": "生成订单ID失败"})
        return
    }

    // 创建过期任务
    client := asynq.NewClient(asynq.RedisClientOpt{
        Addr: "localhost:6379",
    })
    defer client.Close()

    // 创建任务并设置过期时间
    task := asynq.NewTask("order:expire", []byte(orderID))
    _, err = client.Enqueue(
        task,
        asynq.Queue(QueueOrderExpiration),
        asynq.ProcessIn(time.Duration(orderExpirationMinutes) * time.Minute),
        asynq.MaxRetry(3),  // 设置最大重试次数
    )

    if err != nil {
        c.JSON(http.StatusInternalServerError, gin.H{"error": "设置订单过期任务失败"})
        return
    }

    // 返回成功响应
    c.JSON(http.StatusOK, gin.H{
        "order_id": orderID,
        "expires_in": fmt.Sprintf("%d分钟", orderExpirationMinutes),
    })
}

// 处理订单过期的任务
func HandleOrderExpiration(ctx context.Context, t *asynq.Task) error {
    orderID := string(t.Payload())
    
    // 处理订单过期逻辑
    err := expireOrder(orderID)
    if err != nil {
        return fmt.Errorf("处理订单过期失败: %v", err)
    }
    
    return nil
}

注意事项:

  1. 任务重试

    • 可以使用 asynq.MaxRetry(n) 设置任务失败后的重试次数
    • 可以设置重试间隔和策略
  2. 任务优先级

    • 可以为不同队列设置不同的优先级
    • 高优先级的队列会优先处理
  3. 错误处理

    • 需要妥善处理任务执行过程中的错误
    • 可以设置错误通知和监控
  4. 监控和管理

    • asynq 提供了 Web UI 用于监控任务执行情况
    • 可以查看任务状态、重试情况等
每日更新-免费小火箭账号
不要错过任何机会,探索最新的应用和游戏,就在我们的平台。
立即访问
最后修改:2024 年 12 月 30 日
如果觉得我的文章对你有用,请随意赞赏