2025-09-16 23:11:07 +08:00
|
|
|
|
package repository
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
2025-11-05 23:00:07 +08:00
|
|
|
|
"context"
|
2025-09-20 21:14:58 +08:00
|
|
|
|
"errors"
|
2025-09-20 21:45:38 +08:00
|
|
|
|
"fmt"
|
2025-11-16 20:18:42 +08:00
|
|
|
|
"sync"
|
2025-09-16 23:11:07 +08:00
|
|
|
|
"time"
|
|
|
|
|
|
|
2025-11-05 23:00:07 +08:00
|
|
|
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
2025-09-16 23:11:07 +08:00
|
|
|
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
|
2025-11-05 23:00:07 +08:00
|
|
|
|
|
2025-09-16 23:11:07 +08:00
|
|
|
|
"gorm.io/gorm"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
// PendingTaskRepository 定义了与待执行任务队列交互的接口。
|
|
|
|
|
|
type PendingTaskRepository interface {
|
2025-11-05 23:00:07 +08:00
|
|
|
|
FindAllPendingTasks(ctx context.Context) ([]models.PendingTask, error)
|
2025-11-10 22:23:31 +08:00
|
|
|
|
FindPendingTriggerByPlanID(ctx context.Context, planID uint32) (*models.PendingTask, error)
|
|
|
|
|
|
DeletePendingTasksByIDs(ctx context.Context, ids []uint32) error
|
2025-11-05 23:00:07 +08:00
|
|
|
|
CreatePendingTask(ctx context.Context, task *models.PendingTask) error
|
|
|
|
|
|
CreatePendingTasksInBatch(ctx context.Context, tasks []*models.PendingTask) error
|
2025-09-20 21:45:38 +08:00
|
|
|
|
|
|
|
|
|
|
// UpdatePendingTaskExecuteAt 更新指定待执行任务的执行时间
|
2025-11-10 22:23:31 +08:00
|
|
|
|
UpdatePendingTaskExecuteAt(ctx context.Context, id uint32, executeAt time.Time) error
|
2025-09-20 21:45:38 +08:00
|
|
|
|
|
2025-09-20 23:50:27 +08:00
|
|
|
|
// ClearAllPendingTasks 清空所有待执行任务
|
2025-11-05 23:00:07 +08:00
|
|
|
|
ClearAllPendingTasks(ctx context.Context) error
|
2025-09-20 23:50:27 +08:00
|
|
|
|
|
2025-09-17 15:45:40 +08:00
|
|
|
|
// ClaimNextAvailableTask 原子地认领下一个可用的任务。
|
|
|
|
|
|
// 它会同时返回被认领任务对应的日志对象,以及被删除的待办任务对象的内存副本。
|
2025-11-10 22:23:31 +08:00
|
|
|
|
ClaimNextAvailableTask(ctx context.Context, excludePlanIDs []uint32) (*models.TaskExecutionLog, *models.PendingTask, error)
|
2025-09-17 15:45:40 +08:00
|
|
|
|
// RequeueTask 安全地将一个任务重新放回队列。
|
2025-11-05 23:00:07 +08:00
|
|
|
|
RequeueTask(ctx context.Context, originalPendingTask *models.PendingTask) error
|
2025-09-22 15:17:54 +08:00
|
|
|
|
// FindPendingTasksByTaskLogIDs 根据 TaskExecutionLogID 列表查找对应的待执行任务
|
2025-11-10 22:23:31 +08:00
|
|
|
|
FindPendingTasksByTaskLogIDs(ctx context.Context, taskLogIDs []uint32) ([]models.PendingTask, error)
|
2025-09-23 17:11:31 +08:00
|
|
|
|
|
|
|
|
|
|
// DeletePendingTasksByPlanLogID 删除与指定计划执行日志ID相关的所有待执行任务
|
2025-11-10 22:23:31 +08:00
|
|
|
|
DeletePendingTasksByPlanLogID(ctx context.Context, planLogID uint32) error
|
2025-09-16 23:11:07 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-09-17 23:01:15 +08:00
|
|
|
|
// gormPendingTaskRepository 是使用 GORM 的具体实现。
|
|
|
|
|
|
type gormPendingTaskRepository struct {
|
2025-11-05 23:00:07 +08:00
|
|
|
|
ctx context.Context
|
|
|
|
|
|
db *gorm.DB
|
2025-11-16 20:18:42 +08:00
|
|
|
|
|
|
|
|
|
|
mutex sync.Mutex
|
2025-09-16 23:11:07 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-09-17 23:01:15 +08:00
|
|
|
|
// NewGormPendingTaskRepository 创建一个新的待执行任务队列仓库。
|
2025-11-05 23:00:07 +08:00
|
|
|
|
func NewGormPendingTaskRepository(ctx context.Context, db *gorm.DB) PendingTaskRepository {
|
2025-11-16 20:18:42 +08:00
|
|
|
|
return &gormPendingTaskRepository{ctx: ctx, db: db, mutex: sync.Mutex{}}
|
2025-09-16 23:11:07 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-11-05 23:00:07 +08:00
|
|
|
|
func (r *gormPendingTaskRepository) FindAllPendingTasks(ctx context.Context) ([]models.PendingTask, error) {
|
|
|
|
|
|
repoCtx := logs.AddFuncName(ctx, r.ctx, "FindAllPendingTasks")
|
2025-09-20 21:14:58 +08:00
|
|
|
|
var tasks []models.PendingTask
|
|
|
|
|
|
// 预加载 Task 以便后续访问 Task.PlanID
|
2025-09-20 23:50:27 +08:00
|
|
|
|
// 预加载 TaskExecutionLog 以便后续访问 PlanExecutionLogID
|
2025-11-05 23:00:07 +08:00
|
|
|
|
err := r.db.WithContext(repoCtx).Preload("Task").Preload("TaskExecutionLog").Find(&tasks).Error
|
2025-09-20 21:14:58 +08:00
|
|
|
|
return tasks, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-11-10 22:23:31 +08:00
|
|
|
|
func (r *gormPendingTaskRepository) FindPendingTriggerByPlanID(ctx context.Context, planID uint32) (*models.PendingTask, error) {
|
2025-11-05 23:00:07 +08:00
|
|
|
|
repoCtx := logs.AddFuncName(ctx, r.ctx, "FindPendingTriggerByPlanID")
|
2025-09-20 21:14:58 +08:00
|
|
|
|
var pendingTask models.PendingTask
|
2025-09-20 21:45:38 +08:00
|
|
|
|
// 关键修改:通过 JOIN tasks 表并查询 parameters JSON 字段来查找触发器,而不是依赖 task.plan_id
|
2025-11-05 23:00:07 +08:00
|
|
|
|
err := r.db.WithContext(repoCtx).
|
2025-09-20 21:14:58 +08:00
|
|
|
|
Joins("JOIN tasks ON tasks.id = pending_tasks.task_id").
|
2025-09-20 21:45:38 +08:00
|
|
|
|
Where("tasks.type = ? AND tasks.parameters->>'plan_id' = ?", models.TaskPlanAnalysis, fmt.Sprintf("%d", planID)).
|
2025-09-20 21:14:58 +08:00
|
|
|
|
First(&pendingTask).Error
|
|
|
|
|
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
|
|
|
|
|
return nil, nil // 未找到不是错误
|
|
|
|
|
|
}
|
|
|
|
|
|
return &pendingTask, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-11-10 22:23:31 +08:00
|
|
|
|
func (r *gormPendingTaskRepository) DeletePendingTasksByIDs(ctx context.Context, ids []uint32) error {
|
2025-11-05 23:00:07 +08:00
|
|
|
|
repoCtx := logs.AddFuncName(ctx, r.ctx, "DeletePendingTasksByIDs")
|
2025-09-20 21:14:58 +08:00
|
|
|
|
if len(ids) == 0 {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
2025-11-05 23:00:07 +08:00
|
|
|
|
return r.db.WithContext(repoCtx).Where("id IN ?", ids).Delete(&models.PendingTask{}).Error
|
2025-09-20 21:14:58 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-11-05 23:00:07 +08:00
|
|
|
|
func (r *gormPendingTaskRepository) CreatePendingTask(ctx context.Context, task *models.PendingTask) error {
|
|
|
|
|
|
repoCtx := logs.AddFuncName(ctx, r.ctx, "CreatePendingTask")
|
|
|
|
|
|
return r.db.WithContext(repoCtx).Create(task).Error
|
2025-09-20 21:14:58 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-09-16 23:11:07 +08:00
|
|
|
|
// CreatePendingTasksInBatch 在一次数据库调用中创建多个待执行任务条目。
|
2025-11-05 23:00:07 +08:00
|
|
|
|
func (r *gormPendingTaskRepository) CreatePendingTasksInBatch(ctx context.Context, tasks []*models.PendingTask) error {
|
|
|
|
|
|
repoCtx := logs.AddFuncName(ctx, r.ctx, "CreatePendingTasksInBatch")
|
2025-09-20 21:14:58 +08:00
|
|
|
|
if len(tasks) == 0 {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
2025-11-05 23:00:07 +08:00
|
|
|
|
return r.db.WithContext(repoCtx).Create(&tasks).Error
|
2025-09-16 23:11:07 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-09-20 21:45:38 +08:00
|
|
|
|
// UpdatePendingTaskExecuteAt 更新指定待执行任务的执行时间
|
2025-11-10 22:23:31 +08:00
|
|
|
|
func (r *gormPendingTaskRepository) UpdatePendingTaskExecuteAt(ctx context.Context, id uint32, executeAt time.Time) error {
|
2025-11-05 23:00:07 +08:00
|
|
|
|
repoCtx := logs.AddFuncName(ctx, r.ctx, "UpdatePendingTaskExecuteAt")
|
|
|
|
|
|
return r.db.WithContext(repoCtx).Model(&models.PendingTask{}).Where("id = ?", id).Update("execute_at", executeAt).Error
|
2025-09-20 21:45:38 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-09-20 23:50:27 +08:00
|
|
|
|
// ClearAllPendingTasks 清空所有待执行任务
|
2025-11-05 23:00:07 +08:00
|
|
|
|
func (r *gormPendingTaskRepository) ClearAllPendingTasks(ctx context.Context) error {
|
|
|
|
|
|
repoCtx := logs.AddFuncName(ctx, r.ctx, "ClearAllPendingTasks")
|
|
|
|
|
|
return r.db.WithContext(repoCtx).Where("1 = 1").Delete(&models.PendingTask{}).Error
|
2025-09-20 23:50:27 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-09-17 14:58:41 +08:00
|
|
|
|
// ClaimNextAvailableTask 以原子方式认领下一个可用的任务。
|
2025-11-10 22:23:31 +08:00
|
|
|
|
func (r *gormPendingTaskRepository) ClaimNextAvailableTask(ctx context.Context, excludePlanIDs []uint32) (*models.TaskExecutionLog, *models.PendingTask, error) {
|
2025-11-05 23:00:07 +08:00
|
|
|
|
repoCtx := logs.AddFuncName(ctx, r.ctx, "ClaimNextAvailableTask")
|
2025-09-16 23:11:07 +08:00
|
|
|
|
var log models.TaskExecutionLog
|
2025-09-17 15:45:40 +08:00
|
|
|
|
var pendingTask models.PendingTask
|
2025-09-16 23:11:07 +08:00
|
|
|
|
|
2025-11-05 23:00:07 +08:00
|
|
|
|
err := r.db.WithContext(repoCtx).Transaction(func(tx *gorm.DB) error {
|
2025-11-16 20:18:42 +08:00
|
|
|
|
// TODO task_execution_logs 开启了压缩, 所以无法使用行锁控制资源, 暂时使用程序锁控制并发问题, 这是基于目前只会启动一个实例的前提
|
|
|
|
|
|
r.mutex.Lock()
|
|
|
|
|
|
defer r.mutex.Unlock()
|
|
|
|
|
|
|
2025-11-16 14:55:40 +08:00
|
|
|
|
// 从 pending_tasks 表开始构建查询
|
|
|
|
|
|
query := tx.WithContext(repoCtx).Model(&models.PendingTask{})
|
2025-09-17 14:58:41 +08:00
|
|
|
|
|
2025-11-16 14:55:40 +08:00
|
|
|
|
// JOIN task_execution_logs 表
|
|
|
|
|
|
query = query.Joins("JOIN task_execution_logs ON task_execution_logs.id = pending_tasks.task_execution_log_id")
|
|
|
|
|
|
|
|
|
|
|
|
// 添加基础查询条件,并明确指定表名
|
|
|
|
|
|
query = query.Where("pending_tasks.execute_at <= ?", time.Now())
|
|
|
|
|
|
|
|
|
|
|
|
// 如果需要排除,则基于 JOIN 后的表进行过滤
|
2025-09-17 14:58:41 +08:00
|
|
|
|
if len(excludePlanIDs) > 0 {
|
2025-11-16 14:55:40 +08:00
|
|
|
|
query = query.Where("task_execution_logs.plan_execution_log_id NOT IN ?", excludePlanIDs)
|
2025-09-17 14:58:41 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-11-16 14:55:40 +08:00
|
|
|
|
// 按执行时间排序,并明确指定表名
|
|
|
|
|
|
query = query.Order("pending_tasks.execute_at ASC")
|
|
|
|
|
|
|
|
|
|
|
|
// 明确 SELECT pending_tasks 的所有列,并执行查询
|
|
|
|
|
|
if err := query.Select("pending_tasks.*").First(&pendingTask).Error; err != nil {
|
2025-09-17 14:58:41 +08:00
|
|
|
|
return err
|
2025-09-16 23:11:07 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-11-05 23:00:07 +08:00
|
|
|
|
if err := tx.WithContext(repoCtx).Unscoped().Delete(&pendingTask).Error; err != nil {
|
2025-09-16 23:11:07 +08:00
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
updates := map[string]interface{}{
|
|
|
|
|
|
"status": models.ExecutionStatusStarted,
|
|
|
|
|
|
"started_at": time.Now(),
|
|
|
|
|
|
}
|
2025-11-05 23:00:07 +08:00
|
|
|
|
if err := tx.WithContext(repoCtx).Model(&models.TaskExecutionLog{}).Where("id = ?", pendingTask.TaskExecutionLogID).Updates(updates).Error; err != nil {
|
2025-09-16 23:11:07 +08:00
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-11-16 14:55:40 +08:00
|
|
|
|
// 在 Preload("Task") 时,使用 Unscoped() 来忽略 Task 的软删除状态
|
2025-11-05 23:00:07 +08:00
|
|
|
|
if err := tx.WithContext(repoCtx).Preload("Task", func(db *gorm.DB) *gorm.DB {
|
2025-09-23 22:08:18 +08:00
|
|
|
|
return db.Unscoped()
|
|
|
|
|
|
}).First(&log, pendingTask.TaskExecutionLogID).Error; err != nil {
|
2025-09-16 23:11:07 +08:00
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
2025-09-17 15:45:40 +08:00
|
|
|
|
return nil, nil, err
|
2025-09-16 23:11:07 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
2025-09-17 15:45:40 +08:00
|
|
|
|
return &log, &pendingTask, nil
|
2025-09-16 23:11:07 +08:00
|
|
|
|
}
|
2025-09-17 14:58:41 +08:00
|
|
|
|
|
2025-09-17 15:45:40 +08:00
|
|
|
|
// RequeueTask 安全地将一个任务重新放回队列。
|
|
|
|
|
|
// 它通过将原始 PendingTask 的 ID 重置为 0,并重新创建它来实现。
|
2025-11-05 23:00:07 +08:00
|
|
|
|
func (r *gormPendingTaskRepository) RequeueTask(ctx context.Context, originalPendingTask *models.PendingTask) error {
|
|
|
|
|
|
repoCtx := logs.AddFuncName(ctx, r.ctx, "RequeueTask")
|
|
|
|
|
|
return r.db.WithContext(repoCtx).Transaction(func(tx *gorm.DB) error {
|
2025-09-17 14:58:41 +08:00
|
|
|
|
// 1. 将日志状态恢复为 waiting
|
2025-11-05 23:00:07 +08:00
|
|
|
|
if err := tx.WithContext(repoCtx).Model(&models.TaskExecutionLog{}).Where("id = ?", originalPendingTask.TaskExecutionLogID).Update("status", models.ExecutionStatusWaiting).Error; err != nil {
|
2025-09-17 14:58:41 +08:00
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2025-09-17 15:45:40 +08:00
|
|
|
|
// 2. 关键:将传入的 PendingTask 的 ID 重置为 0。
|
|
|
|
|
|
// 这会告诉 GORM,这是一个需要创建(INSERT)的新记录,而不是更新。
|
|
|
|
|
|
originalPendingTask.ID = 0
|
|
|
|
|
|
|
|
|
|
|
|
// 3. 重新创建待办任务。GORM 会忽略掉已被重置的 ID,并让数据库生成一个新的主键。
|
2025-11-05 23:00:07 +08:00
|
|
|
|
return tx.WithContext(repoCtx).Create(originalPendingTask).Error
|
2025-09-17 14:58:41 +08:00
|
|
|
|
})
|
|
|
|
|
|
}
|
2025-09-22 15:17:54 +08:00
|
|
|
|
|
|
|
|
|
|
// FindPendingTasksByTaskLogIDs 根据 TaskExecutionLogID 列表查找对应的待执行任务
|
2025-11-10 22:23:31 +08:00
|
|
|
|
func (r *gormPendingTaskRepository) FindPendingTasksByTaskLogIDs(ctx context.Context, taskLogIDs []uint32) ([]models.PendingTask, error) {
|
2025-11-05 23:00:07 +08:00
|
|
|
|
repoCtx := logs.AddFuncName(ctx, r.ctx, "FindPendingTasksByTaskLogIDs")
|
2025-09-22 15:17:54 +08:00
|
|
|
|
if len(taskLogIDs) == 0 {
|
|
|
|
|
|
return []models.PendingTask{}, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
var pendingTasks []models.PendingTask
|
2025-11-05 23:00:07 +08:00
|
|
|
|
err := r.db.WithContext(repoCtx).Where("task_execution_log_id IN ?", taskLogIDs).Find(&pendingTasks).Error
|
2025-09-22 15:17:54 +08:00
|
|
|
|
return pendingTasks, err
|
|
|
|
|
|
}
|
2025-09-23 17:11:31 +08:00
|
|
|
|
|
|
|
|
|
|
// DeletePendingTasksByPlanLogID 删除与指定计划执行日志ID相关的所有待执行任务
|
2025-11-10 22:23:31 +08:00
|
|
|
|
func (r *gormPendingTaskRepository) DeletePendingTasksByPlanLogID(ctx context.Context, planLogID uint32) error {
|
2025-11-05 23:00:07 +08:00
|
|
|
|
repoCtx := logs.AddFuncName(ctx, r.ctx, "DeletePendingTasksByPlanLogID")
|
2025-09-23 17:11:31 +08:00
|
|
|
|
// 使用子查询找到所有与 planLogID 相关的 task_execution_log_id
|
2025-11-05 23:00:07 +08:00
|
|
|
|
subQuery := r.db.WithContext(repoCtx).Model(&models.TaskExecutionLog{}).Select("id").Where("plan_execution_log_id = ?", planLogID)
|
2025-09-23 17:11:31 +08:00
|
|
|
|
|
|
|
|
|
|
// 使用子查询的结果来删除待执行任务
|
2025-11-05 23:00:07 +08:00
|
|
|
|
return r.db.WithContext(repoCtx).Where("task_execution_log_id IN (?)", subQuery).Delete(&models.PendingTask{}).Error
|
2025-09-23 17:11:31 +08:00
|
|
|
|
}
|