定义上行事件监听器
This commit is contained in:
@@ -20,6 +20,7 @@ import (
|
|||||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/controller/plan"
|
"git.huangwc.com/pig/pig-farm-controller/internal/app/controller/plan"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/controller/user"
|
"git.huangwc.com/pig/pig-farm-controller/internal/app/controller/user"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/token"
|
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/token"
|
||||||
|
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/transport"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/repository"
|
||||||
@@ -41,11 +42,12 @@ type API struct {
|
|||||||
userController *user.Controller // 用户控制器实例
|
userController *user.Controller // 用户控制器实例
|
||||||
deviceController *device.Controller // 设备控制器实例
|
deviceController *device.Controller // 设备控制器实例
|
||||||
planController *plan.Controller // 计划控制器实例
|
planController *plan.Controller // 计划控制器实例
|
||||||
|
listenHandler transport.ListenHandler // 设备上行事件监听器
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAPI 创建并返回一个新的 API 实例
|
// NewAPI 创建并返回一个新的 API 实例
|
||||||
// 负责初始化 Gin 引擎、设置全局中间件,并注入所有必要的依赖。
|
// 负责初始化 Gin 引擎、设置全局中间件,并注入所有必要的依赖。
|
||||||
func NewAPI(cfg config.ServerConfig, logger *logs.Logger, userRepo repository.UserRepository, deviceRepository repository.DeviceRepository, planRepository repository.PlanRepository, tokenService token.TokenService) *API {
|
func NewAPI(cfg config.ServerConfig, logger *logs.Logger, userRepo repository.UserRepository, deviceRepository repository.DeviceRepository, planRepository repository.PlanRepository, tokenService token.TokenService, listenHandler transport.ListenHandler) *API {
|
||||||
// 设置 Gin 模式,例如 gin.ReleaseMode (生产模式) 或 gin.DebugMode (开发模式)
|
// 设置 Gin 模式,例如 gin.ReleaseMode (生产模式) 或 gin.DebugMode (开发模式)
|
||||||
// 从配置中获取 Gin 模式
|
// 从配置中获取 Gin 模式
|
||||||
gin.SetMode(cfg.Mode)
|
gin.SetMode(cfg.Mode)
|
||||||
@@ -60,11 +62,12 @@ func NewAPI(cfg config.ServerConfig, logger *logs.Logger, userRepo repository.Us
|
|||||||
|
|
||||||
// 初始化 API 结构体
|
// 初始化 API 结构体
|
||||||
api := &API{
|
api := &API{
|
||||||
engine: engine,
|
engine: engine,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
userRepo: userRepo,
|
userRepo: userRepo,
|
||||||
tokenService: tokenService,
|
tokenService: tokenService,
|
||||||
config: cfg,
|
config: cfg,
|
||||||
|
listenHandler: listenHandler,
|
||||||
// 在 NewAPI 中初始化用户控制器,并将其作为 API 结构体的成员
|
// 在 NewAPI 中初始化用户控制器,并将其作为 API 结构体的成员
|
||||||
userController: user.NewController(userRepo, logger, tokenService),
|
userController: user.NewController(userRepo, logger, tokenService),
|
||||||
// 在 NewAPI 中初始化设备控制器,并将其作为 API 结构体的成员
|
// 在 NewAPI 中初始化设备控制器,并将其作为 API 结构体的成员
|
||||||
@@ -113,6 +116,12 @@ func (a *API) setupRoutes() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 上行事件监听路由
|
||||||
|
a.engine.POST("/upstream", func(c *gin.Context) {
|
||||||
|
h := a.listenHandler.Handler()
|
||||||
|
h.ServeHTTP(c.Writer, c.Request)
|
||||||
|
})
|
||||||
|
|
||||||
// 添加 Swagger UI 路由
|
// 添加 Swagger UI 路由
|
||||||
a.engine.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))
|
a.engine.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))
|
||||||
a.logger.Info("Swagger UI is available at /swagger/index.html")
|
a.logger.Info("Swagger UI is available at /swagger/index.html")
|
||||||
|
|||||||
51
internal/app/service/transport/chirp_stack.go
Normal file
51
internal/app/service/transport/chirp_stack.go
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
package transport
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ChirpStackListener 是一个监听器, 用于监听ChirpStack反馈的设备上行事件
|
||||||
|
type ChirpStackListener struct {
|
||||||
|
logger *logs.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewChirpStackListener(logger *logs.Logger) *ChirpStackListener {
|
||||||
|
return &ChirpStackListener{
|
||||||
|
logger: logger,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ChirpStackListener) Handler() http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
b, err := io.ReadAll(r.Body)
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Errorf("读取请求体失败: %v", err)
|
||||||
|
|
||||||
|
// TODO 直接崩溃不太合适
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
event := r.URL.Query().Get("event")
|
||||||
|
|
||||||
|
switch event {
|
||||||
|
case "up": // 链路上行事件
|
||||||
|
err = c.up(b)
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Errorf("处理链路上行事件失败: %v", err)
|
||||||
|
|
||||||
|
// TODO 直接崩溃不太合适
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
c.logger.Errorf("未知的ChirpStack事件: %s", event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// up 处理链路上行事件
|
||||||
|
func (c *ChirpStackListener) up(data []byte) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
8
internal/app/service/transport/transport.go
Normal file
8
internal/app/service/transport/transport.go
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
package transport
|
||||||
|
|
||||||
|
import "net/http"
|
||||||
|
|
||||||
|
// ListenHandler 是一个监听器, 用于监听设备上行事件
|
||||||
|
type ListenHandler interface {
|
||||||
|
Handler() http.HandlerFunc
|
||||||
|
}
|
||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/api"
|
"git.huangwc.com/pig/pig-farm-controller/internal/app/api"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/token"
|
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/token"
|
||||||
|
"git.huangwc.com/pig/pig-farm-controller/internal/app/service/transport"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/config"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/database"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/database"
|
||||||
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
"git.huangwc.com/pig/pig-farm-controller/internal/infra/logs"
|
||||||
@@ -28,40 +29,43 @@ type Application struct {
|
|||||||
// NewApplication 创建并初始化一个新的 Application 实例。
|
// NewApplication 创建并初始化一个新的 Application 实例。
|
||||||
// 这是应用的“组合根”,所有依赖都在这里被创建和注入。
|
// 这是应用的“组合根”,所有依赖都在这里被创建和注入。
|
||||||
func NewApplication(configPath string) (*Application, error) {
|
func NewApplication(configPath string) (*Application, error) {
|
||||||
// 1. 加载配置
|
// 加载配置
|
||||||
cfg := config.NewConfig()
|
cfg := config.NewConfig()
|
||||||
if err := cfg.Load(configPath); err != nil {
|
if err := cfg.Load(configPath); err != nil {
|
||||||
return nil, fmt.Errorf("无法加载配置: %w", err)
|
return nil, fmt.Errorf("无法加载配置: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. 初始化日志记录器
|
// 初始化日志记录器
|
||||||
logger := logs.NewLogger(cfg.Log)
|
logger := logs.NewLogger(cfg.Log)
|
||||||
|
|
||||||
// 3. 初始化数据库存储
|
// 初始化数据库存储
|
||||||
storage, err := initStorage(cfg.Database, logger)
|
storage, err := initStorage(cfg.Database, logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err // 错误已在 initStorage 中被包装
|
return nil, err // 错误已在 initStorage 中被包装
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. 初始化任务执行器
|
// 初始化任务执行器
|
||||||
executor := task.NewExecutor(cfg.Heartbeat.Concurrency, logger)
|
executor := task.NewExecutor(cfg.Heartbeat.Concurrency, logger)
|
||||||
|
|
||||||
// 5. 初始化 Token 服务
|
// 初始化 Token 服务
|
||||||
tokenService := token.NewTokenService([]byte(cfg.App.JWTSecret))
|
tokenService := token.NewTokenService([]byte(cfg.App.JWTSecret))
|
||||||
|
|
||||||
// 6. 初始化用户仓库
|
// 初始化用户仓库
|
||||||
userRepo := repository.NewGormUserRepository(storage.GetDB())
|
userRepo := repository.NewGormUserRepository(storage.GetDB())
|
||||||
|
|
||||||
// 7. 初始化设备仓库
|
// 初始化设备仓库
|
||||||
deviceRepo := repository.NewGormDeviceRepository(storage.GetDB())
|
deviceRepo := repository.NewGormDeviceRepository(storage.GetDB())
|
||||||
|
|
||||||
// 8. 初始化计划仓库
|
// 初始化计划仓库
|
||||||
planRepo := repository.NewGormPlanRepository(storage.GetDB())
|
planRepo := repository.NewGormPlanRepository(storage.GetDB())
|
||||||
|
|
||||||
// 9. 初始化 API 服务器
|
// 初始化设备上行监听器
|
||||||
apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, tokenService)
|
listenHandler := transport.NewChirpStackListener(logger)
|
||||||
|
|
||||||
// 10. 组装 Application 对象
|
// 初始化 API 服务器
|
||||||
|
apiServer := api.NewAPI(cfg.Server, logger, userRepo, deviceRepo, planRepo, tokenService, listenHandler)
|
||||||
|
|
||||||
|
// 组装 Application 对象
|
||||||
app := &Application{
|
app := &Application{
|
||||||
Config: cfg,
|
Config: cfg,
|
||||||
Logger: logger,
|
Logger: logger,
|
||||||
|
|||||||
Reference in New Issue
Block a user