diff --git a/internal/api/api.go b/internal/api/api.go index e9aa30d..8939ea4 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -53,9 +53,6 @@ type API struct { // websocketManager WebSocket管理器 websocketManager *websocket.Manager - // websocketService WebSocket服务 - websocketService *service.WebSocketService - // heartbeatService 心跳服务 heartbeatService *service.HeartbeatService @@ -68,7 +65,7 @@ type API struct { // NewAPI 创建并返回一个新的API实例 // 初始化Gin引擎和相关配置 -func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRepo repository.OperationHistoryRepo, deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketService *service.WebSocketService, heartbeatService *service.HeartbeatService, deviceStatusPool *service.DeviceStatusPool) *API { +func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRepo repository.OperationHistoryRepo, deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketManager *websocket.Manager, heartbeatService *service.HeartbeatService, deviceStatusPool *service.DeviceStatusPool) *API { // 设置Gin为发布模式 gin.SetMode(gin.DebugMode) @@ -99,13 +96,10 @@ func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRe operationController := operation.NewController(operationHistoryRepo) // 创建设备控制控制器 - deviceController := device.NewController(deviceControlRepo, deviceRepo, websocketService, heartbeatService, deviceStatusPool) - - // 创建WebSocket管理器 - websocketManager := websocket.NewManager(websocketService, deviceRepo) + deviceController := device.NewController(deviceControlRepo, deviceRepo, websocketManager, heartbeatService, deviceStatusPool) // 创建远程控制控制器 - remoteController := remote.NewController(websocketService) + remoteController := remote.NewController(websocketManager) // 创建鉴权中间件 authMiddleware := middleware.NewAuthMiddleware(userRepo) @@ -119,7 +113,6 @@ func NewAPI(cfg *config.Config, userRepo repository.UserRepo, operationHistoryRe remoteController: remoteController, authMiddleware: authMiddleware, websocketManager: websocketManager, - websocketService: websocketService, heartbeatService: heartbeatService, deviceStatusPool: deviceStatusPool, logger: logs.NewLogger(), diff --git a/internal/controller/device/device.go b/internal/controller/device/device.go index 9b30e5f..3f7f3eb 100644 --- a/internal/controller/device/device.go +++ b/internal/controller/device/device.go @@ -12,6 +12,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/model" "git.huangwc.com/pig/pig-farm-controller/internal/service" "git.huangwc.com/pig/pig-farm-controller/internal/storage/repository" + "git.huangwc.com/pig/pig-farm-controller/internal/websocket" "github.com/gin-gonic/gin" ) @@ -121,18 +122,18 @@ func (req *DeviceRequest) BindAndValidate(data []byte) error { type Controller struct { deviceControlRepo repository.DeviceControlRepo deviceRepo repository.DeviceRepo - websocketService *service.WebSocketService + websocketManager *websocket.Manager heartbeatService *service.HeartbeatService deviceStatusPool *service.DeviceStatusPool logger *logs.Logger } // NewController 创建设备控制控制器实例 -func NewController(deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketService *service.WebSocketService, heartbeatService *service.HeartbeatService, deviceStatusPool *service.DeviceStatusPool) *Controller { +func NewController(deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketManager *websocket.Manager, heartbeatService *service.HeartbeatService, deviceStatusPool *service.DeviceStatusPool) *Controller { return &Controller{ deviceControlRepo: deviceControlRepo, deviceRepo: deviceRepo, - websocketService: websocketService, + websocketManager: websocketManager, heartbeatService: heartbeatService, deviceStatusPool: deviceStatusPool, logger: logs.NewLogger(), @@ -367,7 +368,7 @@ func (c *Controller) Switch(ctx *gin.Context) { } // 发送指令并等待响应 - response, err := c.websocketService.SendCommandAndWait("relay-001", "control_device", controlData, 0) + response, err := c.websocketManager.SendCommandAndWait("relay-001", "control_device", controlData, 0) if err != nil { c.logger.Error("通过WebSocket发送设备控制指令失败: " + err.Error()) controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "设备控制失败: "+err.Error()) diff --git a/internal/controller/remote/remote.go b/internal/controller/remote/remote.go index 2c305b4..7c90e0e 100644 --- a/internal/controller/remote/remote.go +++ b/internal/controller/remote/remote.go @@ -5,20 +5,20 @@ package remote import ( "git.huangwc.com/pig/pig-farm-controller/internal/controller" "git.huangwc.com/pig/pig-farm-controller/internal/logs" - "git.huangwc.com/pig/pig-farm-controller/internal/service" + "git.huangwc.com/pig/pig-farm-controller/internal/websocket" "github.com/gin-gonic/gin" ) // Controller 远程控制控制器 type Controller struct { - websocketService *service.WebSocketService + websocketManager *websocket.Manager logger *logs.Logger } // NewController 创建远程控制控制器实例 -func NewController(websocketService *service.WebSocketService) *Controller { +func NewController(websocketManager *websocket.Manager) *Controller { return &Controller{ - websocketService: websocketService, + websocketManager: websocketManager, logger: logs.NewLogger(), } } @@ -69,7 +69,7 @@ func (c *Controller) SendCommand(ctx *gin.Context) { } // 发送指令并等待响应 - response, err := c.websocketService.SendCommandAndWait(req.DeviceID, req.Command, commandData, 0) + response, err := c.websocketManager.SendCommandAndWait(req.DeviceID, req.Command, commandData, 0) if err != nil { c.logger.Error("发送指令失败: " + err.Error()) controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "发送指令失败: "+err.Error()) @@ -100,7 +100,7 @@ type ListConnectedDevicesResponseData struct { // @Router /api/v1/remote/devices [get] func (c *Controller) ListConnectedDevices(ctx *gin.Context) { // 获取已连接的设备列表 - devices := c.websocketService.GetConnectedDevices() + devices := c.websocketManager.GetConnectedDevices() data := ListConnectedDevicesResponseData{ Devices: devices, diff --git a/internal/core/application.go b/internal/core/application.go index 3da5336..ce71b9a 100644 --- a/internal/core/application.go +++ b/internal/core/application.go @@ -13,6 +13,7 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/storage/db" "git.huangwc.com/pig/pig-farm-controller/internal/storage/repository" "git.huangwc.com/pig/pig-farm-controller/internal/task" + "git.huangwc.com/pig/pig-farm-controller/internal/websocket" ) // Application 代表核心应用结构 @@ -39,8 +40,8 @@ type Application struct { // DeviceRepo 设备仓库实例 DeviceRepo repository.DeviceRepo - // WebSocketService WebSocket服务实例 - WebSocketService *service.WebSocketService + // WebsocketManager WebSocket管理器 + WebsocketManager *websocket.Manager // DeviceStatusPool 设备状态池实例 DeviceStatusPool *service.DeviceStatusPool @@ -100,17 +101,15 @@ func (app *Application) Start() error { app.DeviceStatusPool = service.NewDeviceStatusPool() // 初始化WebSocket服务 - app.WebSocketService = service.NewWebSocketService(app.DeviceRepo) - // 设置设备状态池 - app.WebSocketService.SetDeviceStatusPool(app.DeviceStatusPool) + app.WebsocketManager = websocket.NewManager(app.DeviceRepo) // 设置WebSocket超时时间 - app.WebSocketService.SetDefaultTimeout(app.Config.GetWebSocketTimeout()) + app.WebsocketManager.SetDefaultTimeout(app.Config.GetWebSocketConfig().Timeout) // 初始化心跳服务 - app.HeartbeatService = service.NewHeartbeatService(app.WebSocketService, app.DeviceStatusPool, app.DeviceRepo, app.Config) + app.HeartbeatService = service.NewHeartbeatService(app.WebsocketManager, app.DeviceStatusPool, app.DeviceRepo, app.Config) // 初始化API组件 - app.API = api.NewAPI(app.Config, app.UserRepo, app.OperationHistoryRepo, app.DeviceControlRepo, app.DeviceRepo, app.WebSocketService, app.HeartbeatService, app.DeviceStatusPool) + app.API = api.NewAPI(app.Config, app.UserRepo, app.OperationHistoryRepo, app.DeviceControlRepo, app.DeviceRepo, app.WebsocketManager, app.HeartbeatService, app.DeviceStatusPool) // 初始化任务执行器组件(使用5个工作协程) app.TaskExecutor = task.NewExecutor(5) diff --git a/internal/service/heartbeat.go b/internal/service/heartbeat.go index 885139a..adf3ee7 100644 --- a/internal/service/heartbeat.go +++ b/internal/service/heartbeat.go @@ -12,13 +12,14 @@ import ( "git.huangwc.com/pig/pig-farm-controller/internal/logs" "git.huangwc.com/pig/pig-farm-controller/internal/model" "git.huangwc.com/pig/pig-farm-controller/internal/storage/repository" + "git.huangwc.com/pig/pig-farm-controller/internal/websocket" "github.com/panjf2000/ants/v2" ) // HeartbeatService 心跳服务,负责管理设备的心跳检测 type HeartbeatService struct { - // websocketService WebSocket服务 - websocketService *WebSocketService + // websocketManager WebSocket管理器 + websocketManager *websocket.Manager // deviceStatusPool 设备状态池 deviceStatusPool *DeviceStatusPool @@ -52,7 +53,7 @@ type HeartbeatService struct { } // NewHeartbeatService 创建心跳服务实例 -func NewHeartbeatService(websocketService *WebSocketService, deviceStatusPool *DeviceStatusPool, deviceRepo repository.DeviceRepo, config *config.Config) *HeartbeatService { +func NewHeartbeatService(websocketManager *websocket.Manager, deviceStatusPool *DeviceStatusPool, deviceRepo repository.DeviceRepo, config *config.Config) *HeartbeatService { interval := config.GetHeartbeatConfig().Interval if interval <= 0 { @@ -65,7 +66,7 @@ func NewHeartbeatService(websocketService *WebSocketService, deviceStatusPool *D } return &HeartbeatService{ - websocketService: websocketService, + websocketManager: websocketManager, deviceStatusPool: deviceStatusPool, deviceRepo: deviceRepo, logger: logs.NewLogger(), @@ -241,7 +242,7 @@ func (hs *HeartbeatService) handleHeartbeatWithStatus(deviceID string, tempStatu } // 发送心跳包到设备 - response, err := hs.websocketService.SendCommandAndWait(deviceID, "heartbeat", heartbeatData, 0) + response, err := hs.websocketManager.SendCommandAndWait(deviceID, "heartbeat", heartbeatData, 0) if err != nil { hs.logger.Error(fmt.Sprintf("向设备 %s 发送心跳包失败: %v", deviceID, err)) // 更新设备状态为离线