// Package database 提供基于PostgreSQL的数据存储功能
// 使用GORM作为ORM库来操作数据库
// 实现与PostgreSQL数据库的连接和基本操作
package database
import (
	"context"
	"fmt"
	"time"

	"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
	"git.huangwc.com/pig/pig-farm-controller/internal/infra/models"
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)
// PostgresStorage is the PostgreSQL-backed storage implementation.
// It uses GORM as the ORM layer; TimescaleDB-specific setup is applied
// during Migrate when isTimescaleDB is true.
type PostgresStorage struct {
	ctx context.Context // base context; per-call trace contexts are derived from it via logs.Trace / logs.AddFuncName

	db *gorm.DB // underlying GORM handle; set by Connect, closed by Disconnect

	isTimescaleDB bool // when true, Migrate also creates hypertables and compression policies

	connectionString string // PostgreSQL DSN passed to gorm.Open
	maxOpenConns     int    // connection pool: maximum open connections
	maxIdleConns     int    // connection pool: maximum idle connections
	connMaxLifetime  int    // connection pool: maximum connection lifetime, in seconds (see Connect)
}
// NewPostgresStorage 创建并返回一个新的PostgreSQL存储实例
// 它接收一个 logger 实例,而不是自己创建
2025-11-05 22:22:46 +08:00
func NewPostgresStorage ( ctx context . Context , connectionString string , isTimescaleDB bool , maxOpenConns , maxIdleConns , connMaxLifetime int ) * PostgresStorage {
2025-09-11 20:50:51 +08:00
return & PostgresStorage {
2025-11-05 22:22:46 +08:00
ctx : ctx ,
2025-09-11 20:50:51 +08:00
connectionString : connectionString ,
2025-09-24 16:06:05 +08:00
isTimescaleDB : isTimescaleDB ,
2025-09-11 20:50:51 +08:00
maxOpenConns : maxOpenConns ,
maxIdleConns : maxIdleConns ,
connMaxLifetime : connMaxLifetime ,
}
}
// Connect establishes the connection to the PostgreSQL database through GORM,
// routing GORM's own logging through the project's custom logger. It verifies
// the connection with a ping and then configures the connection pool.
func (ps *PostgresStorage) Connect(ctx context.Context) error {
	storageCtx, logger := logs.Trace(ctx, ps.ctx, "Connect")
	logger.Info("正在连接PostgreSQL数据库")

	// Build the adapter that lets our logging package receive GORM's logs.
	// NOTE(review): this uses context.Background() rather than the incoming
	// ctx — presumably intentional so the "GORM" component logger outlives
	// this call, but confirm against the logs package.
	gormLogger := logs.NewGormLogger(logs.GetLogger(logs.AddCompName(context.Background(), "GORM")))

	var err error
	// Hand our custom logger to gorm.Open via the config.
	ps.db, err = gorm.Open(postgres.Open(ps.connectionString), &gorm.Config{
		Logger: gormLogger,
	})
	if err != nil {
		logger.Errorw("数据库连接失败", "error", err)
		return fmt.Errorf("数据库连接失败: %w", err) // wrap with %w so callers can unwrap
	}

	// Verify the connection by pinging the underlying *sql.DB.
	sqlDB, err := ps.db.WithContext(storageCtx).DB()
	if err != nil {
		logger.Errorw("获取数据库实例失败", "error", err)
		return fmt.Errorf("获取数据库实例失败: %w", err)
	}
	if err = sqlDB.Ping(); err != nil {
		logger.Errorw("数据库连接测试失败", "error", err)
		return fmt.Errorf("数据库连接测试失败: %w", err)
	}

	// Configure connection-pool limits; connMaxLifetime is given in seconds.
	sqlDB.SetMaxOpenConns(ps.maxOpenConns)
	sqlDB.SetMaxIdleConns(ps.maxIdleConns)
	sqlDB.SetConnMaxLifetime(time.Duration(ps.connMaxLifetime) * time.Second)

	// GORM auto-creates foreign-key constraints from field names, but the
	// trigger Task's PlanID does not exist, so FK creation during migration
	// is disabled. (NOTE(review): the original comment here was truncated
	// mid-sentence — the full rationale should be confirmed with the author.)
	ps.db.DisableForeignKeyConstraintWhenMigrating = true

	logger.Info("PostgreSQL数据库连接成功")
	return nil
}
// Disconnect 断开与PostgreSQL数据库的连接
// 安全地关闭所有数据库连接
2025-11-05 22:22:46 +08:00
func ( ps * PostgresStorage ) Disconnect ( ctx context . Context ) error {
storageCtx , logger := logs . Trace ( ctx , ps . ctx , "Disconnect" )
2025-09-11 20:50:51 +08:00
if ps . db != nil {
2025-11-05 22:22:46 +08:00
logger . Info ( "正在断开PostgreSQL数据库连接" )
2025-09-11 20:50:51 +08:00
2025-11-05 22:22:46 +08:00
sqlDB , err := ps . db . WithContext ( storageCtx ) . DB ( )
2025-09-11 20:50:51 +08:00
if err != nil {
2025-11-05 22:22:46 +08:00
logger . Errorw ( "获取数据库实例失败" , "error" , err )
2025-09-11 20:50:51 +08:00
return fmt . Errorf ( "获取数据库实例失败: %w" , err )
}
if err := sqlDB . Close ( ) ; err != nil {
2025-11-05 22:22:46 +08:00
logger . Errorw ( "关闭数据库连接失败" , "error" , err )
2025-09-11 20:50:51 +08:00
return fmt . Errorf ( "关闭数据库连接失败: %w" , err )
}
2025-11-05 22:22:46 +08:00
logger . Info ( "PostgreSQL数据库连接已断开" )
2025-09-11 20:50:51 +08:00
}
return nil
}
// GetDB 获取GORM数据库实例
// 用于执行具体的数据库操作
2025-11-05 22:22:46 +08:00
func ( ps * PostgresStorage ) GetDB ( ctx context . Context ) * gorm . DB {
storageCtx := logs . AddFuncName ( ctx , ps . ctx , "GetDB" )
return ps . db . WithContext ( storageCtx )
2025-09-11 20:50:51 +08:00
}
// Migrate 执行数据库迁移
2025-11-05 22:22:46 +08:00
func ( ps * PostgresStorage ) Migrate ( ctx context . Context , models ... interface { } ) error {
storageCtx , logger := logs . Trace ( ctx , ps . ctx , "Migrate" )
2025-09-11 20:50:51 +08:00
if len ( models ) == 0 {
2025-11-05 22:22:46 +08:00
logger . Info ( "没有需要迁移的数据库模型,跳过迁移步骤" )
2025-09-11 20:50:51 +08:00
return nil
}
2025-11-05 22:22:46 +08:00
logger . Info ( "正在自动迁移数据库表结构" )
if err := ps . db . WithContext ( storageCtx ) . AutoMigrate ( models ... ) ; err != nil {
logger . Errorw ( "数据库表结构迁移失败" , "error" , err )
2025-09-11 20:50:51 +08:00
return fmt . Errorf ( "数据库表结构迁移失败: %w" , err )
}
2025-11-05 22:22:46 +08:00
logger . Info ( "数据库表结构迁移完成" )
2025-09-24 16:34:16 +08:00
// -- 处理gorm做不到的初始化逻辑 --
2025-11-05 22:22:46 +08:00
if err := ps . creatingIndex ( storageCtx ) ; err != nil {
2025-09-24 16:48:41 +08:00
return err
2025-09-24 16:34:16 +08:00
}
2025-09-25 10:34:35 +08:00
// 如果是 TimescaleDB, 则将部分表转换为 hypertable
2025-09-24 16:34:16 +08:00
if ps . isTimescaleDB {
2025-11-05 22:22:46 +08:00
logger . Info ( "检测到 TimescaleDB, 准备进行超表转换和压缩策略配置" )
if err := ps . setupTimescaleDB ( storageCtx ) ; err != nil {
2025-09-24 23:08:59 +08:00
return err
2025-09-24 16:34:16 +08:00
}
}
2025-09-11 20:50:51 +08:00
return nil
}
2025-09-24 16:48:41 +08:00
2025-09-27 23:05:48 +08:00
// setupTimescaleDB 统一处理所有 TimescaleDB 相关的设置
2025-11-05 22:22:46 +08:00
func ( ps * PostgresStorage ) setupTimescaleDB ( ctx context . Context ) error {
storageCtx := logs . AddFuncName ( ctx , ps . ctx , "setupTimescaleDB" )
if err := ps . creatingHyperTable ( storageCtx ) ; err != nil {
2025-09-27 23:05:48 +08:00
return err
}
2025-11-05 22:22:46 +08:00
if err := ps . applyCompressionPolicies ( storageCtx ) ; err != nil {
2025-09-27 23:05:48 +08:00
return err
}
return nil
}
2025-09-24 23:08:59 +08:00
// creatingHyperTable 用于在数据库是 TimescaleDB 时创建超表
2025-11-05 22:22:46 +08:00
func ( ps * PostgresStorage ) creatingHyperTable ( ctx context . Context ) error {
storageCtx , logger := logs . Trace ( ctx , ps . ctx , "creatingHyperTable" )
2025-09-27 23:05:48 +08:00
// 定义一个辅助结构体来管理超表转换
tablesToConvert := [ ] struct {
model interface { TableName ( ) string }
timeColumn string
} {
{ models . SensorData { } , "time" } ,
{ models . DeviceCommandLog { } , "sent_at" } ,
2025-09-30 21:44:03 +08:00
{ models . PlanExecutionLog { } , "created_at" } ,
{ models . TaskExecutionLog { } , "created_at" } ,
2025-09-27 23:05:48 +08:00
{ models . PendingCollection { } , "created_at" } ,
2025-09-28 00:13:47 +08:00
{ models . UserActionLog { } , "time" } ,
2025-10-06 15:35:20 +08:00
{ models . MedicationLog { } , "happened_at" } ,
2025-10-03 18:27:53 +08:00
{ models . PigBatchLog { } , "happened_at" } ,
2025-10-03 20:32:34 +08:00
{ models . WeighingBatch { } , "weighing_time" } ,
{ models . WeighingRecord { } , "weighing_time" } ,
2025-10-05 21:20:22 +08:00
{ models . PigTransferLog { } , "transfer_time" } ,
2025-10-06 15:35:20 +08:00
{ models . PigSickLog { } , "happened_at" } ,
2025-10-05 22:09:25 +08:00
{ models . PigPurchase { } , "purchase_date" } ,
{ models . PigSale { } , "sale_date" } ,
2025-10-25 13:28:19 +08:00
{ models . Notification { } , "alarm_timestamp" } ,
2025-11-07 22:19:55 +08:00
{ models . HistoricalAlarm { } , "trigger_time" } ,
2025-11-18 22:22:31 +08:00
{ models . RawMaterialStockLog { } , "happened_at" } ,
2025-09-27 23:05:48 +08:00
}
for _ , table := range tablesToConvert {
tableName := table . model . TableName ( )
2025-10-03 20:32:34 +08:00
chunkInterval := "1 days" // 统一设置为1天
2025-11-05 22:22:46 +08:00
logger . Debugw ( "准备将表转换为超表" , "table" , tableName , "chunk_interval" , chunkInterval )
2025-09-27 23:05:48 +08:00
sql := fmt . Sprintf ( "SELECT create_hypertable('%s', '%s', chunk_time_interval => INTERVAL '%s', if_not_exists => TRUE);" , tableName , table . timeColumn , chunkInterval )
2025-11-05 22:22:46 +08:00
if err := ps . db . WithContext ( storageCtx ) . Exec ( sql ) . Error ; err != nil {
logger . Errorw ( "转换为超表失败" , "table" , tableName , "error" , err )
2025-09-27 23:05:48 +08:00
return fmt . Errorf ( "将 %s 转换为超表失败: %w" , tableName , err )
}
2025-11-05 22:22:46 +08:00
logger . Debugw ( "成功将表转换为超表 (或已转换)" , "table" , tableName )
2025-09-27 23:05:48 +08:00
}
return nil
}
// applyCompressionPolicies 为超表配置自动压缩策略
2025-11-05 22:22:46 +08:00
func ( ps * PostgresStorage ) applyCompressionPolicies ( ctx context . Context ) error {
storageCtx , logger := logs . Trace ( ctx , ps . ctx , "applyCompressionPolicies" )
2025-09-27 23:05:48 +08:00
policies := [ ] struct {
model interface { TableName ( ) string }
segmentColumn string
} {
{ models . SensorData { } , "device_id" } ,
{ models . DeviceCommandLog { } , "device_id" } ,
{ models . PlanExecutionLog { } , "plan_id" } ,
{ models . TaskExecutionLog { } , "task_id" } ,
{ models . PendingCollection { } , "device_id" } ,
2025-09-28 00:13:47 +08:00
{ models . UserActionLog { } , "user_id" } ,
2025-10-06 15:35:20 +08:00
{ models . MedicationLog { } , "pig_batch_id" } ,
2025-10-03 20:32:34 +08:00
{ models . PigBatchLog { } , "pig_batch_id" } ,
{ models . WeighingBatch { } , "pig_batch_id" } ,
{ models . WeighingRecord { } , "weighing_batch_id" } ,
2025-10-05 21:20:22 +08:00
{ models . PigTransferLog { } , "pig_batch_id" } ,
2025-10-06 15:35:20 +08:00
{ models . PigSickLog { } , "pig_batch_id" } ,
2025-10-05 22:09:25 +08:00
{ models . PigPurchase { } , "pig_batch_id" } ,
{ models . PigSale { } , "pig_batch_id" } ,
2025-10-25 13:28:19 +08:00
{ models . Notification { } , "user_id" } ,
2025-11-07 22:19:55 +08:00
{ models . HistoricalAlarm { } , "source_id" } ,
2025-11-18 22:22:31 +08:00
{ models . RawMaterialStockLog { } , "raw_material_id" } ,
2025-09-27 23:05:48 +08:00
}
for _ , policy := range policies {
tableName := policy . model . TableName ( )
2025-10-03 20:32:34 +08:00
compressAfter := "3 days" // 统一设置为2天后( 即进入第3天) 开始压缩
2025-09-27 23:05:48 +08:00
// 1. 开启表的压缩设置,并指定分段列
2025-11-05 22:22:46 +08:00
logger . Debugw ( "为表启用压缩设置" , "table" , tableName , "segment_by" , policy . segmentColumn )
2025-10-19 20:51:30 +08:00
// 使用 + 而非Sprintf以规避goland静态检查报错
alterSQL := "ALTER TABLE" + " " + tableName + " SET (timescaledb.compress, timescaledb.compress_segmentby = '" + policy . segmentColumn + "');"
2025-11-05 22:22:46 +08:00
if err := ps . db . WithContext ( storageCtx ) . Exec ( alterSQL ) . Error ; err != nil {
2025-09-27 23:05:48 +08:00
// 忽略错误,因为这个设置可能是不可变的,重复执行会报错
2025-11-05 22:22:46 +08:00
logger . Warnw ( "启用压缩设置时遇到问题 (可能已设置,可忽略)" , "table" , tableName , "error" , err )
2025-09-27 23:05:48 +08:00
}
2025-11-05 22:22:46 +08:00
logger . Debugw ( "成功为表启用压缩设置 (或已启用)" , "table" , tableName )
2025-09-27 23:05:48 +08:00
// 2. 添加压缩策略
2025-11-05 22:22:46 +08:00
logger . Debugw ( "为表添加压缩策略" , "table" , tableName , "compress_after" , compressAfter )
2025-09-27 23:05:48 +08:00
policySQL := fmt . Sprintf ( "SELECT add_compression_policy('%s', INTERVAL '%s', if_not_exists => TRUE);" , tableName , compressAfter )
2025-11-05 22:22:46 +08:00
if err := ps . db . WithContext ( storageCtx ) . Exec ( policySQL ) . Error ; err != nil {
logger . Errorw ( "添加压缩策略失败" , "table" , tableName , "error" , err )
2025-09-27 23:05:48 +08:00
return fmt . Errorf ( "为 %s 添加压缩策略失败: %w" , tableName , err )
}
2025-11-05 22:22:46 +08:00
logger . Debugw ( "成功为表添加压缩策略 (或已存在)" , "table" , tableName )
2025-09-27 23:05:48 +08:00
}
2025-09-24 23:08:59 +08:00
return nil
}
2025-09-24 16:48:41 +08:00
// creatingIndex 用于创建gorm无法处理的索引, 如gin索引
2025-11-05 22:22:46 +08:00
func ( ps * PostgresStorage ) creatingIndex ( ctx context . Context ) error {
storageCtx , logger := logs . Trace ( ctx , ps . ctx , "creatingIndex" )
2025-09-24 16:48:41 +08:00
// 使用 IF NOT EXISTS 保证幂等性
// 如果索引已存在,此命令不会报错
2025-09-24 23:08:59 +08:00
2025-11-18 22:22:31 +08:00
// 为 raw_material_nutrients 表创建部分唯一索引,以兼容软删除
logger . Debug ( "正在为 raw_material_nutrients 表创建部分唯一索引" )
partialIndexSQL := "CREATE UNIQUE INDEX IF NOT EXISTS idx_raw_material_nutrients_unique_when_not_deleted ON raw_material_nutrients (raw_material_id, nutrient_id) WHERE deleted_at IS NULL;"
if err := ps . db . WithContext ( storageCtx ) . Exec ( partialIndexSQL ) . Error ; err != nil {
logger . Errorw ( "为 raw_material_nutrients 创建部分唯一索引失败" , "error" , err )
return fmt . Errorf ( "为 raw_material_nutrients 创建部分唯一索引失败: %w" , err )
}
logger . Debug ( "成功为 raw_material_nutrients 创建部分唯一索引 (或已存在)" )
2025-09-24 23:08:59 +08:00
// 为 sensor_data 表的 data 字段创建 GIN 索引
2025-11-05 22:22:46 +08:00
logger . Debug ( "正在为 sensor_data 表的 data 字段创建 GIN 索引" )
2025-09-24 23:08:59 +08:00
ginSensorDataIndexSQL := "CREATE INDEX IF NOT EXISTS idx_sensor_data_data_gin ON sensor_data USING GIN (data);"
2025-11-05 22:22:46 +08:00
if err := ps . db . WithContext ( storageCtx ) . Exec ( ginSensorDataIndexSQL ) . Error ; err != nil {
logger . Errorw ( "为 sensor_data 的 data 字段创建 GIN 索引失败" , "error" , err )
2025-09-24 16:48:41 +08:00
return fmt . Errorf ( "为 sensor_data 的 data 字段创建 GIN 索引失败: %w" , err )
}
2025-11-05 22:22:46 +08:00
logger . Debug ( "成功为 sensor_data 的 data 字段创建 GIN 索引 (或已存在)" )
2025-09-24 16:48:41 +08:00
// 为 tasks.parameters 创建 GIN 索引
2025-11-05 22:22:46 +08:00
logger . Debug ( "正在为 tasks 表的 parameters 字段创建 GIN 索引" )
2025-09-24 16:48:41 +08:00
taskGinIndexSQL := "CREATE INDEX IF NOT EXISTS idx_tasks_parameters_gin ON tasks USING GIN (parameters);"
2025-11-05 22:22:46 +08:00
if err := ps . db . WithContext ( storageCtx ) . Exec ( taskGinIndexSQL ) . Error ; err != nil {
logger . Errorw ( "为 tasks 的 parameters 字段创建 GIN 索引失败" , "error" , err )
2025-09-24 16:48:41 +08:00
return fmt . Errorf ( "为 tasks 的 parameters 字段创建 GIN 索引失败: %w" , err )
}
2025-11-05 22:22:46 +08:00
logger . Debug ( "成功为 tasks 的 parameters 字段创建 GIN 索引 (或已存在)" )
2025-09-24 23:08:59 +08:00
2025-09-24 16:48:41 +08:00
return nil
}