pig-farm-controller/internal/infra/database/postgres.go
// Package database provides PostgreSQL-backed data storage.
// It uses GORM as the ORM library to access the database
// and implements the connection to PostgreSQL along with basic operations.
package database
import (
"context"
"fmt"
"strings"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
// PostgresStorage is the PostgreSQL-backed storage implementation.
// It uses GORM as the ORM library.
type PostgresStorage struct {
ctx context.Context
db *gorm.DB
isTimescaleDB bool
connectionString string
maxOpenConns int
maxIdleConns int
connMaxLifetime int
}
// NewPostgresStorage creates and returns a new PostgreSQL storage instance.
// It does not create its own logger; logging is derived from the supplied contexts.
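// Usage sketch (connection string and pool values are illustrative, not taken from this project's configuration):
//
//	storage := NewPostgresStorage(ctx, "host=localhost user=pig dbname=pig_farm sslmode=disable", true, 50, 10, 3600)
//	if err := storage.Connect(ctx); err != nil {
//		return err
//	}
//	defer storage.Disconnect(ctx)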
func NewPostgresStorage(ctx context.Context, connectionString string, isTimescaleDB bool, maxOpenConns, maxIdleConns, connMaxLifetime int) *PostgresStorage {
return &PostgresStorage{
ctx: ctx,
connectionString: connectionString,
isTimescaleDB: isTimescaleDB,
maxOpenConns: maxOpenConns,
maxIdleConns: maxIdleConns,
connMaxLifetime: connMaxLifetime,
}
}
// Connect establishes the connection to the PostgreSQL database.
// It opens the connection via GORM and routes GORM's logging through our custom logger.
func (ps *PostgresStorage) Connect(ctx context.Context) error {
storageCtx, logger := logs.Trace(ctx, ps.ctx, "Connect")
logger.Info("正在连接PostgreSQL数据库")
// Create the logger adapter for GORM
gormLogger := logs.NewGormLogger(logs.GetLogger(logs.AddCompName(context.Background(), "GORM")))
var err error
// Pass our custom logger to gorm.Open
ps.db, err = gorm.Open(postgres.Open(ps.connectionString), &gorm.Config{
Logger: gormLogger,
})
if err != nil {
logger.Errorw("数据库连接失败", "error", err)
return fmt.Errorf("数据库连接失败: %w", err) // 使用 %w 进行错误包装
}
// Test the connection
sqlDB, err := ps.db.WithContext(storageCtx).DB()
if err != nil {
logger.Errorw("获取数据库实例失败", "error", err)
return fmt.Errorf("获取数据库实例失败: %w", err)
}
if err = sqlDB.Ping(); err != nil {
logger.Errorw("数据库连接测试失败", "error", err)
return fmt.Errorf("数据库连接测试失败: %w", err)
}
// Configure the connection pool
sqlDB.SetMaxOpenConns(ps.maxOpenConns)
sqlDB.SetMaxIdleConns(ps.maxIdleConns)
sqlDB.SetConnMaxLifetime(time.Duration(ps.connMaxLifetime) * time.Second)
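// As a worked example (values illustrative): maxOpenConns=50, maxIdleConns=10 and connMaxLifetime=3600
// allow at most 50 open connections, keep up to 10 idle connections, and recycle each connection
// after one hour, since connMaxLifetime is interpreted in seconds above.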
// GORM automatically creates foreign key constraints based on field names, but the PlanID of a trigger Task does not exist, so this must be disabled; the flag only affects constraint creation during migration.
ps.db.DisableForeignKeyConstraintWhenMigrating = true
logger.Info("PostgreSQL数据库连接成功")
return nil
}
// Disconnect closes the connection to the PostgreSQL database,
// safely shutting down all database connections.
func (ps *PostgresStorage) Disconnect(ctx context.Context) error {
storageCtx, logger := logs.Trace(ctx, ps.ctx, "Disconnect")
if ps.db != nil {
logger.Info("正在断开PostgreSQL数据库连接")
sqlDB, err := ps.db.WithContext(storageCtx).DB()
if err != nil {
logger.Errorw("获取数据库实例失败", "error", err)
return fmt.Errorf("获取数据库实例失败: %w", err)
}
if err := sqlDB.Close(); err != nil {
logger.Errorw("关闭数据库连接失败", "error", err)
return fmt.Errorf("关闭数据库连接失败: %w", err)
}
logger.Info("PostgreSQL数据库连接已断开")
}
return nil
}
// GetDB returns the GORM database instance,
// used to perform concrete database operations.
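// Usage sketch (the query is illustrative and assumes a caller holding the storage instance):
//
//	var rows []models.SensorData
//	storage.GetDB(ctx).Order("time DESC").Limit(10).Find(&rows)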
func (ps *PostgresStorage) GetDB(ctx context.Context) *gorm.DB {
storageCtx := logs.AddFuncName(ctx, ps.ctx, "GetDB")
return ps.db.WithContext(storageCtx)
}
// Migrate performs the database schema migration.
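// Usage sketch (the exact model set is chosen by the caller; these are models referenced elsewhere in this file):
//
//	if err := storage.Migrate(ctx, &models.SensorData{}, &models.User{}, &models.PigBatch{}); err != nil {
//		return err
//	}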
func (ps *PostgresStorage) Migrate(ctx context.Context, models ...interface{}) error {
storageCtx, logger := logs.Trace(ctx, ps.ctx, "Migrate")
if len(models) == 0 {
logger.Info("没有需要迁移的数据库模型,跳过迁移步骤")
return nil
}
logger.Info("正在自动迁移数据库表结构")
if err := ps.db.WithContext(storageCtx).AutoMigrate(models...); err != nil {
logger.Errorw("数据库表结构迁移失败", "error", err)
return fmt.Errorf("数据库表结构迁移失败: %w", err)
}
logger.Info("数据库表结构迁移完成")
// -- Handle initialization logic that GORM cannot perform on its own --
if err := ps.creatingIndex(storageCtx); err != nil {
return err
}
// If running on TimescaleDB, convert selected tables into hypertables
if ps.isTimescaleDB {
logger.Info("检测到 TimescaleDB, 准备进行超表转换和压缩策略配置")
if err := ps.setupTimescaleDB(storageCtx); err != nil {
return err
}
}
return nil
}
// setupTimescaleDB handles all TimescaleDB-related setup in one place
func (ps *PostgresStorage) setupTimescaleDB(ctx context.Context) error {
storageCtx := logs.AddFuncName(ctx, ps.ctx, "setupTimescaleDB")
if err := ps.creatingHyperTable(storageCtx); err != nil {
return err
}
if err := ps.applyCompressionPolicies(storageCtx); err != nil {
return err
}
return nil
}
// creatingHyperTable converts tables into hypertables when the database is TimescaleDB.
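// As an illustration, the first entry below (assuming models.SensorData maps to the sensor_data table)
// results in a statement equivalent to:
//
//	SELECT create_hypertable('sensor_data', 'time', chunk_time_interval => INTERVAL '1 days', if_not_exists => TRUE);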
func (ps *PostgresStorage) creatingHyperTable(ctx context.Context) error {
storageCtx, logger := logs.Trace(ctx, ps.ctx, "creatingHyperTable")
// Helper struct that describes the hypertable conversions
tablesToConvert := []struct {
model interface{ TableName() string }
timeColumn string
}{
{models.SensorData{}, "time"},
{models.DeviceCommandLog{}, "sent_at"},
{models.PlanExecutionLog{}, "created_at"},
{models.TaskExecutionLog{}, "created_at"},
{models.PendingCollection{}, "created_at"},
{models.UserActionLog{}, "time"},
{models.MedicationLog{}, "happened_at"},
{models.PigBatchLog{}, "happened_at"},
{models.WeighingBatch{}, "weighing_time"},
{models.WeighingRecord{}, "weighing_time"},
{models.PigTransferLog{}, "transfer_time"},
{models.PigSickLog{}, "happened_at"},
{models.PigPurchase{}, "purchase_date"},
{models.PigSale{}, "sale_date"},
{models.Notification{}, "alarm_timestamp"},
{models.HistoricalAlarm{}, "trigger_time"},
{models.RawMaterialStockLog{}, "happened_at"},
}
for _, table := range tablesToConvert {
tableName := table.model.TableName()
chunkInterval := "1 days" // 统一设置为1天
logger.Debugw("准备将表转换为超表", "table", tableName, "chunk_interval", chunkInterval)
sql := fmt.Sprintf("SELECT create_hypertable('%s', '%s', chunk_time_interval => INTERVAL '%s', if_not_exists => TRUE);", tableName, table.timeColumn, chunkInterval)
if err := ps.db.WithContext(storageCtx).Exec(sql).Error; err != nil {
logger.Errorw("转换为超表失败", "table", tableName, "error", err)
return fmt.Errorf("将 %s 转换为超表失败: %w", tableName, err)
}
logger.Debugw("成功将表转换为超表 (或已转换)", "table", tableName)
}
return nil
}
// applyCompressionPolicies configures automatic compression policies for the hypertables
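// As an illustration, the first entry below (assuming models.SensorData maps to the sensor_data table)
// results in statements equivalent to:
//
//	ALTER TABLE sensor_data SET (timescaledb.compress, timescaledb.compress_segmentby = 'device_id');
//	SELECT add_compression_policy('sensor_data', INTERVAL '3 days', if_not_exists => TRUE);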
func (ps *PostgresStorage) applyCompressionPolicies(ctx context.Context) error {
storageCtx, logger := logs.Trace(ctx, ps.ctx, "applyCompressionPolicies")
policies := []struct {
model interface{ TableName() string }
segmentColumn string
}{
{models.SensorData{}, "device_id"},
{models.DeviceCommandLog{}, "device_id"},
{models.PlanExecutionLog{}, "plan_id"},
{models.TaskExecutionLog{}, "task_id"},
{models.PendingCollection{}, "device_id"},
{models.UserActionLog{}, "user_id"},
{models.MedicationLog{}, "pig_batch_id"},
{models.PigBatchLog{}, "pig_batch_id"},
{models.WeighingBatch{}, "pig_batch_id"},
{models.WeighingRecord{}, "weighing_batch_id"},
{models.PigTransferLog{}, "pig_batch_id"},
{models.PigSickLog{}, "pig_batch_id"},
{models.PigPurchase{}, "pig_batch_id"},
{models.PigSale{}, "pig_batch_id"},
{models.Notification{}, "user_id"},
{models.HistoricalAlarm{}, "source_id"},
{models.RawMaterialStockLog{}, "raw_material_id"},
}
for _, policy := range policies {
tableName := policy.model.TableName()
compressAfter := "3 days" // 统一设置为2天后即进入第3天开始压缩
// 1. Enable compression on the table and specify the segment-by column
logger.Debugw("为表启用压缩设置", "table", tableName, "segment_by", policy.segmentColumn)
// Concatenate with + instead of Sprintf to avoid a GoLand static-analysis warning
alterSQL := "ALTER TABLE" + " " + tableName + " SET (timescaledb.compress, timescaledb.compress_segmentby = '" + policy.segmentColumn + "');"
if err := ps.db.WithContext(storageCtx).Exec(alterSQL).Error; err != nil {
// Ignore the error: this setting may be immutable, so re-running it can fail
logger.Warnw("启用压缩设置时遇到问题 (可能已设置,可忽略)", "table", tableName, "error", err)
}
logger.Debugw("成功为表启用压缩设置 (或已启用)", "table", tableName)
// 2. Add the compression policy
logger.Debugw("为表添加压缩策略", "table", tableName, "compress_after", compressAfter)
policySQL := fmt.Sprintf("SELECT add_compression_policy('%s', INTERVAL '%s', if_not_exists => TRUE);", tableName, compressAfter)
if err := ps.db.WithContext(storageCtx).Exec(policySQL).Error; err != nil {
logger.Errorw("添加压缩策略失败", "table", tableName, "error", err)
return fmt.Errorf("为 %s 添加压缩策略失败: %w", tableName, err)
}
logger.Debugw("成功为表添加压缩策略 (或已存在)", "table", tableName)
}
return nil
}
// creatingIndex creates indexes that GORM cannot handle itself, such as GIN indexes
func (ps *PostgresStorage) creatingIndex(ctx context.Context) error {
storageCtx := logs.AddFuncName(ctx, ps.ctx, "creatingIndex")
// Use IF NOT EXISTS to keep the statements idempotent:
// if an index already exists, the command does not fail
if err := ps.creatingUniqueIndex(storageCtx); err != nil {
return err
}
if err := ps.createGinIndexes(storageCtx); err != nil {
return err
}
return nil
}
// uniqueIndexDefinition describes the details of a unique index
type uniqueIndexDefinition struct {
tableName string // name of the table the index belongs to
columns []string // columns that make up the unique index
indexName string // name of the unique index
whereClause string // optional WHERE clause, used to create a partial index
description string // description of the index, used for logging
}
// ginIndexDefinition describes the details of a GIN index
type ginIndexDefinition struct {
tableName string // name of the table the index belongs to
columnName string // column the GIN index is created on
indexName string // name of the GIN index
description string // description of the index, used for logging
}
func (ps *PostgresStorage) creatingUniqueIndex(ctx context.Context) error {
storageCtx, logger := logs.Trace(ctx, ps.ctx, "creatingUniqueIndex")
// Define all unique indexes that need to be created
uniqueIndexesToCreate := []uniqueIndexDefinition{
{
tableName: models.RawMaterialNutrient{}.TableName(),
columns: []string{"raw_material_id", "nutrient_id"},
indexName: "idx_raw_material_nutrients_unique_when_not_deleted",
whereClause: "WHERE deleted_at IS NULL",
description: "确保同一原料中的每种营养成分不重复",
},
{
tableName: models.PigBreed{}.TableName(),
columns: []string{"name"},
indexName: "idx_pig_breeds_unique_name_when_not_deleted",
whereClause: "WHERE deleted_at IS NULL",
description: "pig_breeds 表的部分唯一索引 (name 唯一)",
},
{
tableName: models.PigAgeStage{}.TableName(),
columns: []string{"name"},
indexName: "idx_pig_age_stages_unique_name_when_not_deleted",
whereClause: "WHERE deleted_at IS NULL",
description: "pig_age_stages 表的部分唯一索引 (name 唯一)",
},
{
tableName: models.PigType{}.TableName(),
columns: []string{"breed_id", "age_stage_id"},
indexName: "idx_pig_types_unique_breed_age_stage_when_not_deleted",
whereClause: "WHERE deleted_at IS NULL",
description: "pig_types 表的部分唯一索引 (breed_id, age_stage_id 组合唯一)",
},
{
tableName: models.PigNutrientRequirement{}.TableName(),
columns: []string{"pig_type_id", "nutrient_id"},
indexName: "idx_pig_nutrient_requirements_unique_type_nutrient_when_not_deleted",
whereClause: "WHERE deleted_at IS NULL",
description: "pig_nutrient_requirements 表的部分唯一索引 (pig_type_id, nutrient_id 组合唯一)",
},
{
tableName: models.User{}.TableName(),
columns: []string{"username"},
indexName: "idx_users_unique_username_when_not_deleted",
whereClause: "WHERE deleted_at IS NULL",
description: "users 表的部分唯一索引 (username 唯一)",
},
{
tableName: models.AreaController{}.TableName(),
columns: []string{"name"},
indexName: "idx_area_controllers_unique_name_when_not_deleted",
whereClause: "WHERE deleted_at IS NULL",
description: "area_controllers 表的部分唯一索引 (Name 唯一)",
},
{
tableName: models.AreaController{}.TableName(),
columns: []string{"network_id"},
indexName: "idx_area_controllers_unique_network_id_when_not_deleted",
whereClause: "WHERE deleted_at IS NULL",
description: "area_controllers 表的部分唯一索引 (NetworkID 唯一)",
},
{
tableName: models.DeviceTemplate{}.TableName(),
columns: []string{"name"},
indexName: "idx_device_templates_unique_name_when_not_deleted",
whereClause: "WHERE deleted_at IS NULL",
description: "device_templates 表的部分唯一索引 (name 唯一)",
},
{
tableName: models.PigBatch{}.TableName(),
columns: []string{"batch_number"},
indexName: "idx_pig_batches_unique_batch_number_when_not_deleted",
whereClause: "WHERE deleted_at IS NULL",
description: "pig_batches 表的部分唯一索引 (batch_number 唯一)",
},
{
tableName: models.PigHouse{}.TableName(),
columns: []string{"name"},
indexName: "idx_pig_houses_unique_name_when_not_deleted",
whereClause: "WHERE deleted_at IS NULL",
description: "pig_houses 表的部分唯一索引 (name 唯一)",
},
{
tableName: models.RawMaterial{}.TableName(),
columns: []string{"name"},
indexName: "idx_raw_materials_unique_name_when_not_deleted",
whereClause: "WHERE deleted_at IS NULL",
description: "raw_materials 表的部分唯一索引 (name 唯一)",
},
{
tableName: models.Nutrient{}.TableName(),
columns: []string{"name"},
indexName: "idx_nutrients_unique_name_when_not_deleted",
whereClause: "WHERE deleted_at IS NULL",
description: "nutrients 表的部分唯一索引 (name 唯一)",
},
}
for _, indexDef := range uniqueIndexesToCreate {
logger.Debugw("正在为表创建部分唯一索引", "表名", indexDef.tableName, "索引名", indexDef.indexName, "描述", indexDef.description)
// Join the column names into a single string
columnsStr := strings.Join(indexDef.columns, ", ")
// Build the SQL statement
sql := fmt.Sprintf("CREATE UNIQUE INDEX IF NOT EXISTS %s ON %s (%s) %s;",
indexDef.indexName, indexDef.tableName, columnsStr, indexDef.whereClause)
if err := ps.db.WithContext(storageCtx).Exec(sql).Error; err != nil {
logger.Errorw("创建部分唯一索引失败", "表名", indexDef.tableName, "索引名", indexDef.indexName, "错误", err)
return fmt.Errorf("为 %s 表创建部分唯一索引 %s 失败: %w", indexDef.tableName, indexDef.indexName, err)
}
logger.Debugw("成功为表创建部分唯一索引 (或已存在)", "表名", indexDef.tableName, "索引名", indexDef.indexName)
}
return nil
}
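// createGinIndexes creates GIN indexes that GORM's AutoMigrate does not create on its own.
// For the first entry below, the loop builds exactly:
//
//	CREATE INDEX IF NOT EXISTS idx_sensor_data_data_gin ON sensor_data USING GIN (data);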
func (ps *PostgresStorage) createGinIndexes(ctx context.Context) error {
storageCtx, logger := logs.Trace(ctx, ps.ctx, "createGinIndexes")
// Define all GIN indexes that need to be created
ginIndexesToCreate := []ginIndexDefinition{
{
tableName: "sensor_data",
columnName: "data",
indexName: "idx_sensor_data_data_gin",
description: "为 sensor_data 表的 data 字段创建 GIN 索引",
},
{
tableName: "tasks",
columnName: "parameters",
indexName: "idx_tasks_parameters_gin",
description: "为 tasks 表的 parameters 字段创建 GIN 索引",
},
}
for _, indexDef := range ginIndexesToCreate {
logger.Debugw("正在创建 GIN 索引", "表名", indexDef.tableName, "列名", indexDef.columnName, "描述", indexDef.description)
// Build the SQL statement
sql := fmt.Sprintf("CREATE INDEX IF NOT EXISTS %s ON %s USING GIN (%s);",
indexDef.indexName, indexDef.tableName, indexDef.columnName)
if err := ps.db.WithContext(storageCtx).Exec(sql).Error; err != nil {
logger.Errorw("创建 GIN 索引失败", "表名", indexDef.tableName, "索引名", indexDef.indexName, "错误", err)
return fmt.Errorf("为 %s 表的 %s 字段创建 GIN 索引 %s 失败: %w", indexDef.tableName, indexDef.columnName, indexDef.indexName, err)
}
logger.Debugw("成功创建 GIN 索引 (或已存在)", "表名", indexDef.tableName, "索引名", indexDef.indexName)
}
return nil
}