From 148fa1f2bbad7f983c9554fb004f2a866ee0a6a0 Mon Sep 17 00:00:00 2001 From: huang <1724659546@qq.com> Date: Sun, 16 Nov 2025 20:18:42 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AEbug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../infra/repository/pending_task_repository.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/internal/infra/repository/pending_task_repository.go b/internal/infra/repository/pending_task_repository.go index 0f8cc3d..053a546 100644 --- a/internal/infra/repository/pending_task_repository.go +++ b/internal/infra/repository/pending_task_repository.go @@ -4,13 +4,13 @@ import ( "context" "errors" "fmt" + "sync" "time" "git.huangwc.com/pig/pig-farm-controller/internal/infra/logs" "git.huangwc.com/pig/pig-farm-controller/internal/infra/models" "gorm.io/gorm" - "gorm.io/gorm/clause" ) // PendingTaskRepository 定义了与待执行任务队列交互的接口。 @@ -43,11 +43,13 @@ type PendingTaskRepository interface { type gormPendingTaskRepository struct { ctx context.Context db *gorm.DB + + mutex sync.Mutex } // NewGormPendingTaskRepository 创建一个新的待执行任务队列仓库。 func NewGormPendingTaskRepository(ctx context.Context, db *gorm.DB) PendingTaskRepository { - return &gormPendingTaskRepository{ctx: ctx, db: db} + return &gormPendingTaskRepository{ctx: ctx, db: db, mutex: sync.Mutex{}} } func (r *gormPendingTaskRepository) FindAllPendingTasks(ctx context.Context) ([]models.PendingTask, error) { @@ -114,15 +116,16 @@ func (r *gormPendingTaskRepository) ClaimNextAvailableTask(ctx context.Context, var pendingTask models.PendingTask err := r.db.WithContext(repoCtx).Transaction(func(tx *gorm.DB) error { + // TODO task_execution_logs 开启了压缩, 所以无法使用行锁控制资源, 暂时使用程序锁控制并发问题, 这是基于目前只会启动一个实例的前提 + r.mutex.Lock() + defer r.mutex.Unlock() + // 从 pending_tasks 表开始构建查询 query := tx.WithContext(repoCtx).Model(&models.PendingTask{}) // JOIN task_execution_logs 表 query = query.Joins("JOIN task_execution_logs ON task_execution_logs.id = pending_tasks.task_execution_log_id") - // 添加行锁 - query = query.Clauses(clause.Locking{Strength: "UPDATE"}) - // 添加基础查询条件,并明确指定表名 query = query.Where("pending_tasks.execute_at <= ?", time.Now())