重构lora初始化部分
This commit is contained in:
@@ -8,8 +8,11 @@ import (
|
|||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/api"
|
"git.huangwc.com/pig/pig-farm-controller/internal/app/api"
|
||||||
|
"git.huangwc.com/pig/pig-farm-controller/internal/app/listener"
|
||||||
|
"git.huangwc.com/pig/pig-farm-controller/internal/app/listener/chirp_stack"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||||
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/transport/lora"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Application 是整个应用的核心,封装了所有组件和生命周期。
|
// Application 是整个应用的核心,封装了所有组件和生命周期。
|
||||||
@@ -32,14 +35,11 @@ func NewApplication(configPath string) (*Application, error) {
|
|||||||
if err := cfg.Load(configPath); err != nil {
|
if err := cfg.Load(configPath); err != nil {
|
||||||
return nil, fmt.Errorf("无法加载配置: %w", err)
|
return nil, fmt.Errorf("无法加载配置: %w", err)
|
||||||
}
|
}
|
||||||
// 初始化全局日志记录器
|
|
||||||
logs.InitDefaultLogger(cfg.Log)
|
logs.InitDefaultLogger(cfg.Log)
|
||||||
|
|
||||||
// 为 Application 本身创建 Ctx
|
|
||||||
selfCtx := logs.AddCompName(context.Background(), "Application")
|
selfCtx := logs.AddCompName(context.Background(), "Application")
|
||||||
ctx := logs.AddFuncName(selfCtx, selfCtx, "NewApplication")
|
ctx := logs.AddFuncName(selfCtx, selfCtx, "NewApplication")
|
||||||
|
|
||||||
// 2. 初始化所有分层服务
|
// 2. 初始化基础设施和领域服务 (此时它们是解耦的)
|
||||||
infra, err := initInfrastructure(ctx, cfg)
|
infra, err := initInfrastructure(ctx, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("初始化基础设施失败: %w", err)
|
return nil, fmt.Errorf("初始化基础设施失败: %w", err)
|
||||||
@@ -50,7 +50,31 @@ func NewApplication(configPath string) (*Application, error) {
|
|||||||
}
|
}
|
||||||
appServices := initAppServices(ctx, infra, domain)
|
appServices := initAppServices(ctx, infra, domain)
|
||||||
|
|
||||||
// 3. 初始化 API 入口点
|
// 3. 【核心组装逻辑】创建应用层监听器并注入到基础设施层
|
||||||
|
// 此时,所有依赖项(repos, domain services, comm)都已准备就绪
|
||||||
|
upstreamHandler := listener.NewLoRaListener(
|
||||||
|
selfCtx,
|
||||||
|
infra.repos.areaControllerRepo,
|
||||||
|
infra.repos.pendingCollectionRepo,
|
||||||
|
infra.repos.deviceRepo,
|
||||||
|
infra.repos.sensorDataRepo,
|
||||||
|
infra.repos.deviceCommandLogRepo,
|
||||||
|
infra.repos.otaRepo,
|
||||||
|
infra.lora.comm,
|
||||||
|
)
|
||||||
|
|
||||||
|
// 根据 LoRa 模式完成最终的绑定
|
||||||
|
if cfg.Lora.Mode == config.LoraMode_LoRaWAN {
|
||||||
|
// 对于 LoRaWAN,创建真正的 Webhook 处理器并替换掉占位符
|
||||||
|
infra.lora.listenHandler = chirp_stack.NewChirpStackListener(selfCtx, upstreamHandler)
|
||||||
|
} else {
|
||||||
|
// 对于 LoRa Mesh,将处理器注入到已创建的 transport 实例中
|
||||||
|
if tp, ok := infra.lora.loraListener.(*lora.LoRaMeshUartPassthroughTransport); ok {
|
||||||
|
tp.SetHandler(upstreamHandler)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. 初始化 API 入口点 (现在可以安全地传入 listenHandler)
|
||||||
apiServer := api.NewAPI(
|
apiServer := api.NewAPI(
|
||||||
cfg.Server,
|
cfg.Server,
|
||||||
logs.AddCompName(context.Background(), "API"),
|
logs.AddCompName(context.Background(), "API"),
|
||||||
@@ -73,10 +97,10 @@ func NewApplication(configPath string) (*Application, error) {
|
|||||||
appServices.recipeService,
|
appServices.recipeService,
|
||||||
appServices.inventoryService,
|
appServices.inventoryService,
|
||||||
infra.tokenGenerator,
|
infra.tokenGenerator,
|
||||||
infra.lora.listenHandler,
|
infra.lora.listenHandler, // 此处传入的是最终组装好的 handler
|
||||||
)
|
)
|
||||||
|
|
||||||
// 4. 组装 Application 对象
|
// 5. 组装 Application 对象
|
||||||
app := &Application{
|
app := &Application{
|
||||||
cfgPath: configPath,
|
cfgPath: configPath,
|
||||||
Config: cfg,
|
Config: cfg,
|
||||||
|
|||||||
@@ -52,7 +52,7 @@ func initInfrastructure(ctx context.Context, cfg *config.Config) (*Infrastructur
|
|||||||
|
|
||||||
repos := initRepositories(ctx, storage.GetDB(ctx))
|
repos := initRepositories(ctx, storage.GetDB(ctx))
|
||||||
|
|
||||||
lora, err := initLora(ctx, cfg, repos)
|
lora, err := initLora(ctx, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -399,10 +399,10 @@ type LoraComponents struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// initLora 根据配置初始化 LoRa 相关组件。
|
// initLora 根据配置初始化 LoRa 相关组件。
|
||||||
|
// 此函数只负责创建和返回底层的传输和监听组件,不关心业务处理器的创建。
|
||||||
func initLora(
|
func initLora(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
cfg *config.Config,
|
cfg *config.Config,
|
||||||
repos *Repositories,
|
|
||||||
) (*LoraComponents, error) {
|
) (*LoraComponents, error) {
|
||||||
var listenHandler listener.ListenHandler
|
var listenHandler listener.ListenHandler
|
||||||
var comm transport.Communicator
|
var comm transport.Communicator
|
||||||
@@ -410,38 +410,28 @@ func initLora(
|
|||||||
baseCtx := context.Background()
|
baseCtx := context.Background()
|
||||||
logger := logs.GetLogger(ctx)
|
logger := logs.GetLogger(ctx)
|
||||||
|
|
||||||
// 1. 创建统一的业务处理器 (App层适配器)
|
// 1. 根据配置初始化具体的传输层和监听器
|
||||||
// 它实现了 infra 层的 transport.UpstreamHandler 接口
|
|
||||||
upstreamHandler := listener.NewLoRaListener(
|
|
||||||
baseCtx,
|
|
||||||
repos.areaControllerRepo,
|
|
||||||
repos.pendingCollectionRepo,
|
|
||||||
repos.deviceRepo,
|
|
||||||
repos.sensorDataRepo,
|
|
||||||
repos.deviceCommandLogRepo,
|
|
||||||
)
|
|
||||||
|
|
||||||
// 2. 根据配置初始化具体的传输层和监听器
|
|
||||||
if cfg.Lora.Mode == config.LoraMode_LoRaWAN {
|
if cfg.Lora.Mode == config.LoraMode_LoRaWAN {
|
||||||
logger.Info("当前运行模式: lora_wan。初始化 ChirpStack 监听器和传输层。")
|
logger.Info("当前运行模式: lora_wan。初始化 ChirpStack 传输层和占位符监听器。")
|
||||||
|
|
||||||
// 2a. 创建 ChirpStack 的 Webhook 监听器 (infra),并注入 App 层的业务处理器
|
// 1a. 创建 ChirpStack 的发送器 (infra)
|
||||||
listenHandler = chirp_stack.NewChirpStackListener(baseCtx, upstreamHandler)
|
|
||||||
|
|
||||||
// 2b. 创建 ChirpStack 的发送器 (infra)
|
|
||||||
comm = lora.NewChirpStackTransport(logs.AddCompName(baseCtx, "ChirpStackTransport"), cfg.ChirpStack)
|
comm = lora.NewChirpStackTransport(logs.AddCompName(baseCtx, "ChirpStackTransport"), cfg.ChirpStack)
|
||||||
|
|
||||||
// 2c. LoRaWAN 模式下没有主动监听的 Listener,使用占位符
|
// 1b. LoRaWAN 模式下没有主动监听的 Listener,使用占位符
|
||||||
loraListener = lora.NewPlaceholderTransport(logs.AddCompName(baseCtx, "PlaceholderTransport"))
|
loraListener = lora.NewPlaceholderTransport(logs.AddCompName(baseCtx, "PlaceholderTransport"))
|
||||||
|
|
||||||
|
// 1c. Webhook 监听器将在 Application 层组装时创建,此处使用占位符
|
||||||
|
listenHandler = chirp_stack.NewPlaceholderListener(logs.AddCompName(baseCtx, "PlaceholderListener"))
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
logger.Info("当前运行模式: lora_mesh。初始化 LoRa Mesh 传输层和占位符监听器。")
|
logger.Info("当前运行模式: lora_mesh。初始化 LoRa Mesh 传输层和占位符监听器。")
|
||||||
|
|
||||||
// 2a. LoRa Mesh 模式下没有 Webhook 监听器,使用占位符
|
// 1a. LoRa Mesh 模式下没有 Webhook 监听器,使用占位符
|
||||||
listenHandler = chirp_stack.NewPlaceholderListener(logs.AddCompName(baseCtx, "PlaceholderListener"))
|
listenHandler = chirp_stack.NewPlaceholderListener(logs.AddCompName(baseCtx, "PlaceholderListener"))
|
||||||
|
|
||||||
// 2b. 创建串口的传输工具 (infra),它同时实现了发送和监听,并注入 App 层的业务处理器
|
// 1b. 创建串口的传输工具 (infra),它同时实现了发送和监听
|
||||||
tp, err := lora.NewLoRaMeshUartPassthroughTransport(baseCtx, cfg.LoraMesh, upstreamHandler)
|
// 注意:此处传入 nil,业务处理器将在 Application 层组装时被注入
|
||||||
|
tp, err := lora.NewLoRaMeshUartPassthroughTransport(baseCtx, cfg.LoraMesh, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("无法初始化 LoRa Mesh 模块: %w", err)
|
return nil, fmt.Errorf("无法初始化 LoRa Mesh 模块: %w", err)
|
||||||
}
|
}
|
||||||
@@ -449,6 +439,7 @@ func initLora(
|
|||||||
comm = tp
|
comm = tp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 2. 返回只包含基础设施组件的 LoraComponents
|
||||||
return &LoraComponents{
|
return &LoraComponents{
|
||||||
listenHandler: listenHandler,
|
listenHandler: listenHandler,
|
||||||
comm: comm,
|
comm: comm,
|
||||||
|
|||||||
@@ -109,6 +109,12 @@ func NewLoRaMeshUartPassthroughTransport(
|
|||||||
return t, nil
|
return t, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetHandler 允许在创建实例后设置或更新上行处理器。
|
||||||
|
// 这对于解决循环依赖至关重要。
|
||||||
|
func (t *LoRaMeshUartPassthroughTransport) SetHandler(handler transport.UpstreamHandler) {
|
||||||
|
t.handler = handler
|
||||||
|
}
|
||||||
|
|
||||||
// Listen 启动后台监听协程(非阻塞)
|
// Listen 启动后台监听协程(非阻塞)
|
||||||
func (t *LoRaMeshUartPassthroughTransport) Listen(ctx context.Context) error {
|
func (t *LoRaMeshUartPassthroughTransport) Listen(ctx context.Context) error {
|
||||||
// 注意:这里的 loraCtx 是从 selfCtx 派生的,因为它代表了这个组件自身的生命周期
|
// 注意:这里的 loraCtx 是从 selfCtx 派生的,因为它代表了这个组件自身的生命周期
|
||||||
|
|||||||
Reference in New Issue
Block a user