删掉所有文件

This commit is contained in:
2025-09-11 19:30:04 +08:00
parent 3743b5ddcd
commit 4cd05c1b9b
2302 changed files with 4 additions and 1210523 deletions

View File

@@ -1,276 +0,0 @@
// Package api 提供统一的API接口层
// 负责处理所有外部请求包括HTTP和WebSocket接口
// 将请求路由到相应的服务层进行处理
package api
import (
"context"
"fmt"
"net/http"
"strings"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/api/middleware"
"git.huangwc.com/pig/pig-farm-controller/internal/config"
"git.huangwc.com/pig/pig-farm-controller/internal/controller/device"
"git.huangwc.com/pig/pig-farm-controller/internal/controller/feed"
"git.huangwc.com/pig/pig-farm-controller/internal/controller/operation"
"git.huangwc.com/pig/pig-farm-controller/internal/controller/remote"
"git.huangwc.com/pig/pig-farm-controller/internal/controller/user"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/service"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/websocket"
"github.com/gin-gonic/gin"
)
// API 代表API接口层的结构
// 包含Gin引擎和HTTP服务器实例
type API struct {
// engine Gin引擎实例
engine *gin.Engine
// server HTTP服务器实例
server *http.Server
// config 应用配置
config *config.Config
// userController 用户控制器
userController *user.Controller
// operationController 操作历史控制器
operationController *operation.Controller
// deviceController 设备控制控制器
deviceController *device.Controller
// feedController 饲喂管理控制器
feedController *feed.Controller
// remoteController 远程控制控制器
remoteController *remote.Controller
// authMiddleware 鉴权中间件
authMiddleware *middleware.AuthMiddleware
// websocketManager WebSocket管理器
websocketManager *websocket.Manager
// heartbeatService 心跳服务
heartbeatService *service.HeartbeatService
// deviceStatusPool 设备状态池
deviceStatusPool *service.DeviceStatusPool
// logger 日志记录器
logger *logs.Logger
}
// NewAPI 创建并返回一个新的API实例
// 初始化Gin引擎和相关配置
func NewAPI(cfg *config.Config,
userRepo repository.UserRepo,
operationHistoryRepo repository.OperationHistoryRepo,
deviceControlRepo repository.DeviceControlRepo,
deviceRepo repository.DeviceRepo,
feedRepo repository.FeedPlanRepo,
websocketManager *websocket.Manager,
heartbeatService *service.HeartbeatService,
deviceStatusPool *service.DeviceStatusPool,
) *API {
// 设置Gin为发布模式
gin.SetMode(gin.DebugMode)
// 创建Gin引擎实例
engine := gin.New()
// 添加日志和恢复中间件
engine.Use(gin.LoggerWithFormatter(func(param gin.LogFormatterParams) string {
return fmt.Sprintf("[API] %s - [%s] \"%s %s %s %d %s \"%s\" %s\"\n",
param.ClientIP,
time.Now().Format(time.RFC3339),
param.Method,
param.Path,
param.Request.Proto,
param.StatusCode,
param.Latency,
param.Request.UserAgent(),
param.ErrorMessage,
)
}))
engine.Use(gin.Recovery())
// 创建用户控制器
userController := user.NewController(userRepo)
// 创建操作历史控制器
operationController := operation.NewController(operationHistoryRepo)
// 创建设备控制控制器
deviceController := device.NewController(deviceControlRepo, deviceRepo, websocketManager, heartbeatService, deviceStatusPool)
// 创建饲喂管理控制器
feedController := feed.NewController(feedRepo)
// 创建远程控制控制器
remoteController := remote.NewController(websocketManager)
// 创建鉴权中间件
authMiddleware := middleware.NewAuthMiddleware(userRepo)
return &API{
engine: engine,
config: cfg,
userController: userController,
operationController: operationController,
deviceController: deviceController,
feedController: feedController,
remoteController: remoteController,
authMiddleware: authMiddleware,
websocketManager: websocketManager,
heartbeatService: heartbeatService,
deviceStatusPool: deviceStatusPool,
logger: logs.NewLogger(),
}
}
// Start 启动HTTP服务器
func (a *API) Start() error {
// 配置路由
a.setupRoutes()
// 创建HTTP服务器
a.server = &http.Server{
Addr: fmt.Sprintf("%s:%d", a.config.Server.Host, a.config.Server.Port),
Handler: a.engine,
// 添加服务器配置
ReadTimeout: time.Duration(a.config.Server.ReadTimeout) * time.Second,
WriteTimeout: time.Duration(a.config.Server.WriteTimeout) * time.Second,
IdleTimeout: time.Duration(a.config.Server.IdleTimeout) * time.Second,
}
// 启动HTTP服务器
a.logger.Info(fmt.Sprintf("正在启动HTTP服务器 %s:%d", a.config.Server.Host, a.config.Server.Port))
go func() {
if err := a.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
a.logger.Error(fmt.Sprintf("HTTP服务器启动失败: %v", err))
}
}()
return nil
}
// Stop 停止HTTP服务器
func (a *API) Stop() error {
a.logger.Info("正在停止HTTP服务器")
// 创建一个5秒的超时上下文
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 优雅地关闭服务器
if err := a.server.Shutdown(ctx); err != nil {
a.logger.Error(fmt.Sprintf("HTTP服务器关闭错误: %v", err))
return err
}
a.logger.Info("HTTP服务器已停止")
return nil
}
// setupRoutes 配置路由
func (a *API) setupRoutes() {
// 基础路由示例
a.engine.GET("/health", a.healthHandler)
// WebSocket路由
a.engine.GET("/ws/device", a.websocketManager.HandleConnection)
// 用户相关路由
userGroup := a.engine.Group("/api/v1/user")
{
userGroup.POST("/register", a.userController.Register)
userGroup.POST("/login", a.userController.Login)
}
// 配置静态文件服务
a.engine.Static("/assets/", "./frontend/dist/assets/")
// 使用NoRoute处理器处理前端路由
a.engine.NoRoute(func(c *gin.Context) {
path := c.Request.URL.Path
// 判断是否为API路径
if strings.HasPrefix(path, "/api/") || strings.HasPrefix(path, "/ws/") || path == "/health" {
// API路径返回404
c.JSON(http.StatusNotFound, gin.H{
"error": "API路径未找到",
})
return
}
// 对于前端路由提供index.html支持Vue Router的history模式
c.File("./frontend/dist/index.html")
})
// 需要鉴权的路由组
protectedGroup := a.engine.Group("/api/v1")
protectedGroup.Use(a.authMiddleware.Handle())
{
// 操作历史相关路由
operationGroup := protectedGroup.Group("/operation")
{
operationGroup.POST("/", a.operationController.Create)
operationGroup.GET("/list", a.operationController.ListByUser)
operationGroup.GET("/:id", a.operationController.Get)
}
// 设备控制相关路由
deviceGroup := protectedGroup.Group("/device")
{
deviceGroup.POST("/switch", a.deviceController.Switch)
deviceGroup.GET("/list", a.deviceController.List)
deviceGroup.POST("/create", a.deviceController.Create)
deviceGroup.POST("/update", a.deviceController.Update)
deviceGroup.POST("/delete", a.deviceController.Delete)
deviceGroup.GET("/status", a.deviceController.GetDeviceStatus)
}
// 饲喂相关路由
feedGroup := protectedGroup.Group("/feed")
{
feedGroup.GET("/plan/list", a.feedController.ListPlans)
feedGroup.GET("/plan/detail", a.feedController.Detail)
feedGroup.POST("/plan/create", a.feedController.Create)
feedGroup.POST("/plan/update", a.feedController.Update)
feedGroup.POST("/plan/delete", a.feedController.Delete)
}
// 远程控制相关路由
remoteGroup := protectedGroup.Group("/remote")
{
remoteGroup.POST("/command", a.remoteController.SendCommand)
remoteGroup.GET("/devices", a.remoteController.ListConnectedDevices)
}
}
// TODO: 添加更多路由
}
// healthHandler 健康检查处理函数
// @Summary 健康检查
// @Description 检查API服务是否正常运行
// @Tags health
// @Accept json
// @Produce json
// @Success 200 {object} map[string]interface{}
// @Router /health [get]
func (a *API) healthHandler(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"status": "ok",
"message": "猪场控制器API正在运行",
})
}

View File

@@ -1,148 +0,0 @@
// Package middleware 提供HTTP中间件功能
// 包含鉴权、日志、恢复等中间件实现
package middleware
import (
"net/http"
"os"
"strings"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
"github.com/gin-gonic/gin"
"github.com/golang-jwt/jwt/v5"
"gorm.io/gorm"
)
// AuthMiddleware 鉴权中间件结构
type AuthMiddleware struct {
userRepo repository.UserRepo
logger *logs.Logger
}
// AuthUser 用于在上下文中存储的用户信息
type AuthUser struct {
ID uint `json:"id"`
Username string `json:"username"`
}
// JWTClaims 自定义JWT声明
type JWTClaims struct {
UserID uint `json:"user_id"`
Username string `json:"username"`
jwt.RegisteredClaims
}
// NewAuthMiddleware 创建鉴权中间件实例
func NewAuthMiddleware(userRepo repository.UserRepo) *AuthMiddleware {
return &AuthMiddleware{
userRepo: userRepo,
logger: logs.NewLogger(),
}
}
// getJWTSecret 获取JWT密钥
func (m *AuthMiddleware) getJWTSecret() []byte {
// 在实际项目中,应该从配置文件或环境变量中读取
secret := os.Getenv("JWT_SECRET")
if secret == "" {
secret = "pig-farm-controller-secret-key" // 默认密钥
}
return []byte(secret)
}
// GenerateToken 为用户生成JWT token
func (m *AuthMiddleware) GenerateToken(userID uint, username string) (string, error) {
claims := JWTClaims{
UserID: userID,
Username: username,
RegisteredClaims: jwt.RegisteredClaims{
ExpiresAt: jwt.NewNumericDate(time.Now().Add(24 * time.Hour)), // 24小时过期
IssuedAt: jwt.NewNumericDate(time.Now()),
NotBefore: jwt.NewNumericDate(time.Now()),
Issuer: "pig-farm-controller",
},
}
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
return token.SignedString(m.getJWTSecret())
}
// Handle 鉴权中间件处理函数
func (m *AuthMiddleware) Handle() gin.HandlerFunc {
return func(c *gin.Context) {
// 从请求头中获取认证信息
authHeader := c.GetHeader("Authorization")
if authHeader == "" {
c.JSON(http.StatusUnauthorized, gin.H{"error": "缺少认证信息"})
c.Abort()
return
}
// 检查Bearer token格式
if !strings.HasPrefix(authHeader, "Bearer ") {
c.JSON(http.StatusUnauthorized, gin.H{"error": "认证信息格式错误"})
c.Abort()
return
}
// 解析token
tokenString := strings.TrimPrefix(authHeader, "Bearer ")
// 验证token并获取用户信息
user, err := m.getUserFromJWT(tokenString)
if err != nil {
if err == gorm.ErrRecordNotFound {
c.JSON(http.StatusUnauthorized, gin.H{"error": "用户不存在"})
} else {
m.logger.Error("Token验证失败: " + err.Error())
c.JSON(http.StatusUnauthorized, gin.H{"error": "无效的认证令牌"})
}
c.Abort()
return
}
// 将用户信息保存到上下文中,供后续处理函数使用
c.Set("user", user)
// 继续处理请求
c.Next()
}
}
// getUserFromJWT 从JWT token中获取用户信息
func (m *AuthMiddleware) getUserFromJWT(tokenString string) (*AuthUser, error) {
// 解析token
token, err := jwt.ParseWithClaims(tokenString, &JWTClaims{}, func(token *jwt.Token) (interface{}, error) {
return m.getJWTSecret(), nil
})
if err != nil {
return nil, err
}
// 验证token
if !token.Valid {
return nil, gorm.ErrRecordNotFound
}
// 获取声明
claims, ok := token.Claims.(*JWTClaims)
if !ok {
return nil, gorm.ErrRecordNotFound
}
// 根据用户ID查找用户
userModel, err := m.userRepo.FindByID(claims.UserID)
if err != nil {
return nil, err
}
user := &AuthUser{
ID: userModel.ID,
Username: userModel.Username,
}
return user, nil
}

View File

@@ -1,144 +0,0 @@
// Package config 提供配置文件读取和解析功能
// 支持YAML格式的配置文件解析
// 包含服务器和数据库相关配置
package config
import (
"fmt"
"os"
"gopkg.in/yaml.v2"
)
// Config 代表应用的完整配置结构
type Config struct {
// Server 服务器配置
Server ServerConfig `yaml:"server"`
// Database 数据库配置
Database DatabaseConfig `yaml:"database"`
// WebSocket WebSocket配置
WebSocket WebSocketConfig `yaml:"websocket"`
// Heartbeat 心跳配置
Heartbeat HeartbeatConfig `yaml:"heartbeat"`
}
// ServerConfig 代表服务器配置
type ServerConfig struct {
// Host 服务器监听IP
Host string `yaml:"host"`
// Port 服务器监听端口
Port int `yaml:"port"`
// ReadTimeout 读取超时(秒)
ReadTimeout int `yaml:"read_timeout"`
// WriteTimeout 写入超时(秒)
WriteTimeout int `yaml:"write_timeout"`
// IdleTimeout 空闲超时(秒)
IdleTimeout int `yaml:"idle_timeout"`
}
// DatabaseConfig 代表数据库配置
type DatabaseConfig struct {
// Host 数据库主机地址
Host string `yaml:"host"`
// Port 数据库端口
Port int `yaml:"port"`
// Username 数据库用户名
Username string `yaml:"username"`
// Password 数据库密码
Password string `yaml:"password"`
// DBName 数据库名称
DBName string `yaml:"dbname"`
// SSLMode SSL模式
SSLMode string `yaml:"sslmode"`
// MaxOpenConns 最大开放连接数
MaxOpenConns int `yaml:"max_open_conns"`
// MaxIdleConns 最大空闲连接数
MaxIdleConns int `yaml:"max_idle_conns"`
// ConnMaxLifetime 连接最大生命周期(秒)
ConnMaxLifetime int `yaml:"conn_max_lifetime"`
}
// WebSocketConfig 代表WebSocket配置
type WebSocketConfig struct {
// Timeout WebSocket请求超时时间(秒)
Timeout int `yaml:"timeout"`
// HeartbeatInterval 心跳检测间隔(秒), 如果超过这个时间没有消息往来系统会自动发送一个心跳包维持长链接
HeartbeatInterval int `yaml:"heartbeat_interval"`
}
// HeartbeatConfig 代表心跳配置
type HeartbeatConfig struct {
// Interval 心跳间隔(秒)
Interval int `yaml:"interval"`
// Concurrency 请求并发数
Concurrency int `yaml:"concurrency"`
}
// NewConfig 创建并返回一个新的配置实例
func NewConfig() *Config {
return &Config{
WebSocket: WebSocketConfig{
Timeout: 5, // 默认5秒超时
},
Heartbeat: HeartbeatConfig{
Interval: 30, // 默认30秒心跳间隔
},
}
}
// Load 从指定路径加载配置文件
func (c *Config) Load(path string) error {
// 读取配置文件
data, err := os.ReadFile(path)
if err != nil {
return fmt.Errorf("配置文件读取失败: %v", err)
}
// 解析YAML配置
if err := yaml.Unmarshal(data, c); err != nil {
return fmt.Errorf("配置文件解析失败: %v", err)
}
return nil
}
// GetDatabaseConnectionString 获取数据库连接字符串
func (c *Config) GetDatabaseConnectionString() string {
// 构建PostgreSQL连接字符串
return fmt.Sprintf(
"user=%s password=%s dbname=%s host=%s port=%d sslmode=%s",
c.Database.Username,
c.Database.Password,
c.Database.DBName,
c.Database.Host,
c.Database.Port,
c.Database.SSLMode,
)
}
// GetWebSocketConfig 获取WebSocket配置
func (c *Config) GetWebSocketConfig() WebSocketConfig {
return c.WebSocket
}
// GetHeartbeatConfig 获取心跳配置
func (c *Config) GetHeartbeatConfig() HeartbeatConfig {
return c.Heartbeat
}

View File

@@ -1,16 +0,0 @@
// Package biosecurity 提供生物安全控制功能
// 实现人员/车辆进出管理、消毒流程控制等
// 通过任务执行器执行具体控制任务
package biosecurity
// BiosecurityController 生物安全控制器
// 管理生物安全相关控制逻辑
type BiosecurityController struct {
// TODO: 定义生物安全控制器结构
}
// NewBiosecurityController 创建并返回一个新的生物安全控制器实例
func NewBiosecurityController() *BiosecurityController {
// TODO: 实现生物安全控制器初始化
return nil
}

View File

@@ -1,504 +0,0 @@
// Package device 提供设备控制相关功能的控制器
// 实现设备控制、查询等操作
package device
import (
"encoding/json"
"strconv"
"git.huangwc.com/pig/pig-farm-controller/internal/api/middleware"
"git.huangwc.com/pig/pig-farm-controller/internal/controller"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/model"
"git.huangwc.com/pig/pig-farm-controller/internal/service"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/websocket"
"github.com/gin-gonic/gin"
)
// ListResponse 设备列表响应结构体
type ListResponse struct {
Devices []DeviceListItem `json:"devices"`
}
// DeviceListItem 设备列表项结构体
type DeviceListItem struct {
model.Device
Active bool `json:"active"`
}
// DeviceRequest 设备创建/更新请求结构体
type DeviceRequest struct {
Name string `json:"name" binding:"required"` // 设备名称,必填
Type model.DeviceType `json:"type" binding:"required"` // 设备类型,必填
ParentID *uint `json:"parent_id,omitempty"` // 父设备ID可选
Address *string `json:"address,omitempty"` // 设备地址,可选
// 485总线设备的额外字段
BusNumber *int `json:"bus_number,omitempty"` // 485总线号
DeviceAddress *string `json:"device_address,omitempty"` // 485设备地址
}
// BindAndValidate 绑定并验证请求数据
func (req *DeviceRequest) BindAndValidate(data []byte) error {
// 创建一个map来解析原始JSON
raw := make(map[string]interface{})
if err := json.Unmarshal(data, &raw); err != nil {
return err
}
// 解析已知字段
if name, ok := raw["name"].(string); ok {
req.Name = name
}
if typ, ok := raw["type"].(string); ok {
req.Type = model.DeviceType(typ)
}
// 特殊处理parent_id字段
if parentIDVal, exists := raw["parent_id"]; exists && parentIDVal != nil {
switch v := parentIDVal.(type) {
case float64:
// JSON数字默认是float64类型
if v >= 0 {
parentID := uint(v)
req.ParentID = &parentID
}
case string:
// 如果是字符串尝试转换为uint
if v != "" && v != "null" {
if parentID, err := strconv.ParseUint(v, 10, 32); err == nil {
parentIDUint := uint(parentID)
req.ParentID = &parentIDUint
}
}
}
}
// 特殊处理address字段
if addressVal, exists := raw["address"]; exists && addressVal != nil {
switch v := addressVal.(type) {
case string:
// 如果是字符串,直接赋值
if v != "" {
req.Address = &v
}
}
}
// 特殊处理bus_number字段
if busNumberVal, exists := raw["bus_number"]; exists && busNumberVal != nil {
switch v := busNumberVal.(type) {
case float64:
// JSON数字默认是float64类型
busNumber := int(v)
req.BusNumber = &busNumber
case string:
// 如果是字符串尝试转换为int
if v != "" && v != "null" {
if busNumber, err := strconv.Atoi(v); err == nil {
req.BusNumber = &busNumber
}
}
}
}
// 特殊处理device_address字段
if deviceAddressVal, exists := raw["device_address"]; exists && deviceAddressVal != nil {
switch v := deviceAddressVal.(type) {
case string:
// 如果是字符串,直接赋值
if v != "" {
req.DeviceAddress = &v
}
}
}
return nil
}
// Controller 设备控制控制器
type Controller struct {
deviceControlRepo repository.DeviceControlRepo
deviceRepo repository.DeviceRepo
websocketManager *websocket.Manager
heartbeatService *service.HeartbeatService
deviceStatusPool *service.DeviceStatusPool
logger *logs.Logger
}
// NewController 创建设备控制控制器实例
func NewController(deviceControlRepo repository.DeviceControlRepo, deviceRepo repository.DeviceRepo, websocketManager *websocket.Manager, heartbeatService *service.HeartbeatService, deviceStatusPool *service.DeviceStatusPool) *Controller {
return &Controller{
deviceControlRepo: deviceControlRepo,
deviceRepo: deviceRepo,
websocketManager: websocketManager,
heartbeatService: heartbeatService,
deviceStatusPool: deviceStatusPool,
logger: logs.NewLogger(),
}
}
// List 获取设备列表
func (c *Controller) List(ctx *gin.Context) {
devices, err := c.deviceRepo.ListAll()
if err != nil {
c.logger.Error("获取设备列表失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "获取设备列表失败")
return
}
// 构建设备列表项,包含设备状态信息
deviceList := make([]DeviceListItem, len(devices))
for i, device := range devices {
// 从设备状态池获取设备状态,默认为非激活状态
active := false
if status, exists := c.deviceStatusPool.GetStatus(strconv.FormatUint(uint64(device.ID), 10)); exists {
active = status.Active
}
deviceList[i] = DeviceListItem{
Device: device,
Active: active,
}
}
controller.SendSuccessResponse(ctx, "获取设备列表成功", ListResponse{Devices: deviceList})
}
// Create 创建设备
func (c *Controller) Create(ctx *gin.Context) {
var req DeviceRequest
// 直接使用绑定和验证方法处理JSON数据
rawData, err := ctx.GetRawData()
if err != nil {
controller.SendErrorResponse(ctx, controller.InvalidParameterCode, "无法读取请求数据: "+err.Error())
return
}
if err := req.BindAndValidate(rawData); err != nil {
controller.SendErrorResponse(ctx, controller.InvalidParameterCode, "请求参数错误: "+err.Error())
return
}
device := &model.Device{
Name: req.Name,
Type: req.Type,
ParentID: req.ParentID,
Address: req.Address,
}
// 如果是485总线设备且提供了总线号和设备地址则合并为一个地址
if (req.Type == model.DeviceTypeFan || req.Type == model.DeviceTypeWaterCurtain) &&
req.BusNumber != nil && req.DeviceAddress != nil {
device.Set485Address(*req.BusNumber, *req.DeviceAddress)
}
if err := c.deviceRepo.Create(device); err != nil {
c.logger.Error("创建设备失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "创建设备失败")
return
}
// 刷新设备状态
c.heartbeatService.TriggerManualHeartbeatAsync()
controller.SendSuccessResponse(ctx, "创建设备成功", device)
}
// Update 更新设备
func (c *Controller) Update(ctx *gin.Context) {
var req struct {
ID uint `json:"id" binding:"required"`
DeviceRequest
}
// 直接使用绑定和验证方法处理JSON数据
rawData, err := ctx.GetRawData()
if err != nil {
controller.SendErrorResponse(ctx, controller.InvalidParameterCode, "无法读取请求数据: "+err.Error())
return
}
// 先解析ID
var raw map[string]interface{}
if json.Unmarshal(rawData, &raw) == nil {
if idVal, ok := raw["id"]; ok {
switch id := idVal.(type) {
case float64:
req.ID = uint(id)
case string:
if idUint, err := strconv.ParseUint(id, 10, 32); err == nil {
req.ID = uint(idUint)
}
}
}
}
// 再解析DeviceRequest部分
if err := req.DeviceRequest.BindAndValidate(rawData); err != nil {
controller.SendErrorResponse(ctx, controller.InvalidParameterCode, "请求参数错误: "+err.Error())
return
}
device, err := c.deviceRepo.FindByID(req.ID)
if err != nil {
c.logger.Error("查找设备失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.NotFoundCode, "设备不存在")
return
}
device.Name = req.Name
device.Type = req.Type
device.ParentID = req.ParentID
device.Address = req.Address
// 如果是485总线设备且提供了总线号和设备地址则合并为一个地址
if (req.Type == model.DeviceTypeFan || req.Type == model.DeviceTypeWaterCurtain) &&
req.BusNumber != nil && req.DeviceAddress != nil {
device.Set485Address(*req.BusNumber, *req.DeviceAddress)
}
if err := c.deviceRepo.Update(device); err != nil {
c.logger.Error("更新设备失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "更新设备失败")
return
}
// 刷新设备状态
c.heartbeatService.TriggerManualHeartbeatAsync()
controller.SendSuccessResponse(ctx, "更新设备成功", device)
}
// Delete 删除设备
func (c *Controller) Delete(ctx *gin.Context) {
var req struct {
ID uint `json:"id" binding:"required"`
}
if err := ctx.ShouldBindJSON(&req); err != nil {
controller.SendErrorResponse(ctx, controller.InvalidParameterCode, "请求参数错误: "+err.Error())
return
}
// 先检查设备是否存在
_, err := c.deviceRepo.FindByID(req.ID)
if err != nil {
c.logger.Error("查找设备失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.NotFoundCode, "设备不存在")
return
}
if err := c.deviceRepo.Delete(req.ID); err != nil {
c.logger.Error("删除设备失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "删除设备失败")
return
}
// 刷新设备状态
c.heartbeatService.TriggerManualHeartbeatAsync()
controller.SendSuccessResponse(ctx, "删除设备成功", nil)
}
// SwitchRequest 设备控制请求结构体
type SwitchRequest struct {
ParentID *uint `json:"parent_id"` // 区域主控ID
DeviceType string `json:"device_type" binding:"required,oneof=fan water_curtain"`
DeviceID string `json:"device_id" binding:"required"`
Action string `json:"action" binding:"required,oneof=on off"`
}
// SwitchResponseData 设备控制响应数据结构体
type SwitchResponseData struct {
DeviceType string `json:"device_type"`
DeviceID string `json:"device_id"`
Action string `json:"action"`
Status string `json:"status"` // 添加状态字段
Message string `json:"message"` // 添加消息字段
}
// DeviceStatusResponse 设备状态响应结构体
type DeviceStatusResponse struct {
DeviceID string `json:"device_id"`
Active bool `json:"active"`
}
// RelayControlData 发送给中继设备的控制数据结构体
type RelayControlData struct {
DeviceType string `json:"device_type"`
DeviceID string `json:"device_id"`
Action string `json:"action"`
}
// RelayControlResponseData 中继设备控制响应数据结构体
type RelayControlResponseData struct {
Status string `json:"status"`
Message string `json:"message"`
}
// Switch 设备控制接口
func (c *Controller) Switch(ctx *gin.Context) {
// 从上下文中获取用户信息
userValue, exists := ctx.Get("user")
if !exists {
controller.SendErrorResponse(ctx, controller.UnauthorizedCode, "无法获取用户信息")
return
}
user, ok := userValue.(*middleware.AuthUser)
if !ok {
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "用户信息格式错误")
return
}
var req SwitchRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
controller.SendErrorResponse(ctx, controller.InvalidParameterCode, "请求参数错误")
return
}
// 通过WebSocket向中继设备发送控制指令
controlData := RelayControlData{
DeviceType: req.DeviceType,
DeviceID: req.DeviceID,
Action: req.Action,
}
// 发送指令并等待响应
response, err := c.websocketManager.SendCommandAndWait("relay-001", "control_device", controlData, 0)
if err != nil {
c.logger.Error("通过WebSocket发送设备控制指令失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "设备控制失败: "+err.Error())
return
}
// 解析响应数据
status := "解析失败"
message := "消息解析失败"
var responseData RelayControlResponseData
if err := response.ParseData(&responseData); err == nil {
status = responseData.Status
message = responseData.Message
}
// 创建设备控制记录
if err := c.createDeviceControlRecord(
user.ID,
req.DeviceID,
req.DeviceType,
req.Action,
status,
message,
); err != nil {
c.logger.Error("创建设备控制记录失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "记录控制历史失败")
return
}
data := SwitchResponseData{
DeviceType: req.DeviceType,
DeviceID: req.DeviceID,
Action: req.Action,
Status: status,
Message: message,
}
// 刷新设备状态
c.heartbeatService.TriggerManualHeartbeatAsync()
controller.SendSuccessResponse(ctx, "设备控制成功", data)
}
// createDeviceControlRecord 创建设备控制记录
func (c *Controller) createDeviceControlRecord(userID uint, deviceID, deviceType, action, status, result string) error {
// 获取设备信息
device, err := c.deviceRepo.FindByIDString(deviceID)
if err != nil {
return err
}
// 构建位置信息
var location string
switch device.Type {
case model.DeviceTypeRelay:
// 如果设备本身就是中继设备
location = device.Name
case model.DeviceTypePigPenController, model.DeviceTypeFeedMillController:
// 如果设备本身就是区域主控设备
if device.ParentID != nil {
// 获取中继设备
relayDevice, err := c.deviceRepo.FindByID(*device.ParentID)
if err != nil {
location = device.Name
} else {
location = relayDevice.Name + " -> " + device.Name
}
} else {
location = device.Name
}
default:
// 如果是普通设备(风机、水帘等)
if device.ParentID != nil {
// 获取区域主控设备
parentDevice, err := c.deviceRepo.FindByID(*device.ParentID)
if err != nil {
location = "未知区域"
} else {
// 检查区域主控设备是否有上级设备(中继设备)
if parentDevice.ParentID != nil {
// 获取中继设备
relayDevice, err := c.deviceRepo.FindByID(*parentDevice.ParentID)
if err != nil {
location = parentDevice.Name
} else {
location = relayDevice.Name + " -> " + parentDevice.Name
}
} else {
location = parentDevice.Name
}
}
} else {
location = "未知区域"
}
}
control := &model.DeviceControl{
UserID: userID,
Location: location,
DeviceType: model.DeviceType(deviceType),
DeviceID: deviceID,
Action: action,
Status: status,
Result: result,
}
return c.deviceControlRepo.Create(control)
}
// GetDeviceStatus 获取设备当前状态
func (c *Controller) GetDeviceStatus(ctx *gin.Context) {
deviceID := ctx.Query("device_id")
if deviceID == "" {
controller.SendErrorResponse(ctx, controller.InvalidParameterCode, "设备ID不能为空")
return
}
// TODO 需要刷新设备状态吗? 刷新的话这个接口可能会很慢
// 从设备状态池中获取设备状态
status, exists := c.deviceStatusPool.GetStatus(deviceID)
if !exists {
controller.SendErrorResponse(ctx, controller.NotFoundCode, "设备状态不存在")
return
}
response := DeviceStatusResponse{
DeviceID: deviceID,
Active: status.Active,
}
controller.SendSuccessResponse(ctx, "获取设备状态成功", response)
}

View File

@@ -1,473 +0,0 @@
// Package feed 提供饲料控制功能
// 实现饲料制备和分配的控制逻辑
// 通过任务执行器执行具体控制任务
package feed
import (
"fmt"
"strconv"
"git.huangwc.com/pig/pig-farm-controller/internal/controller"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/model"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
"github.com/gin-gonic/gin"
)
// Controller 饲料控制器
// 管理饲料制备和分配设备的控制逻辑
type Controller struct {
feedPlanRepo repository.FeedPlanRepo
logger *logs.Logger
}
// NewController 创建并返回一个新的饲料控制器实例
func NewController(feedPlanRepo repository.FeedPlanRepo) *Controller {
// TODO: 实现饲料控制器初始化
return &Controller{
feedPlanRepo: feedPlanRepo,
logger: logs.NewLogger(),
}
}
// CreateRequest 创建计划请求结构体
type CreateRequest struct {
// Name 计划名称
Name string `json:"name"`
// Description 计划描述
Description string `json:"description"`
// Type 计划类型(手动触发/自动触发)
Type model.FeedingPlanType `json:"type"`
// Enabled 是否启用
Enabled bool `json:"enabled"`
// ScheduleCron 定时任务表达式(仅当Type为auto时有效)
ScheduleCron *string `json:"schedule_cron,omitempty"`
// ExecutionLimit 执行次数限制(0表示无限制仅当Type为auto时有效)
ExecutionLimit int `json:"execution_limit"`
// ParentID 父计划ID用于支持子计划结构
ParentID *uint `json:"parent_id,omitempty"`
// OrderInParent 在父计划中的执行顺序
OrderInParent *int `json:"order_in_parent,omitempty"`
// IsMaster 是否为主计划(主计划可以包含子计划)
IsMaster bool `json:"is_master"`
// Steps 计划步骤列表
Steps []FeedingPlanStep `json:"steps"`
// SubPlans 子计划列表
SubPlans []CreateRequest `json:"sub_plans"`
}
// Create 创建饲料计划
func (c *Controller) Create(ctx *gin.Context) {
var req CreateRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
controller.SendErrorResponse(ctx, controller.InvalidParameterCode, "请求参数错误: "+err.Error())
return
}
// 校验计划结构
if err := c.validatePlanStructure(&req); err != nil {
controller.SendErrorResponse(ctx, controller.InvalidParameterCode, "计划结构错误: "+err.Error())
return
}
// 转换请求结构体为模型
plan := c.convertToCreateModel(&req)
// 调用仓库创建计划
if err := c.feedPlanRepo.CreateFeedingPlan(plan); err != nil {
c.logger.Error("创建计划失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "创建计划失败")
return
}
controller.SendSuccessResponse(ctx, "创建计划成功", nil)
}
// validatePlanStructure 校验计划结构,不允许计划同时包含步骤和子计划
func (c *Controller) validatePlanStructure(req *CreateRequest) error {
// 检查当前计划是否同时包含步骤和子计划
if len(req.Steps) > 0 && len(req.SubPlans) > 0 {
return fmt.Errorf("计划不能同时包含步骤和子计划")
}
// 递归检查子计划
for _, subPlan := range req.SubPlans {
if err := c.validatePlanStructure(&subPlan); err != nil {
return err
}
}
return nil
}
// convertToCreateModel 将创建请求结构体转换为数据库模型
func (c *Controller) convertToCreateModel(req *CreateRequest) *model.FeedingPlan {
plan := &model.FeedingPlan{
Name: req.Name,
Description: req.Description,
Type: req.Type,
Enabled: req.Enabled,
ScheduleCron: req.ScheduleCron,
ExecutionLimit: req.ExecutionLimit,
ParentID: req.ParentID,
OrderInParent: req.OrderInParent,
// 不需要显式设置ID字段仓库层会处理
}
// 转换步骤
plan.Steps = make([]model.FeedingPlanStep, len(req.Steps))
for i, step := range req.Steps {
plan.Steps[i] = model.FeedingPlanStep{
// ID在创建时不需要设置
// PlanID会在创建过程中自动设置
StepOrder: step.StepOrder,
DeviceID: step.DeviceID,
TargetValue: step.TargetValue,
Action: step.Action,
ScheduleCron: step.ScheduleCron,
ExecutionLimit: step.ExecutionLimit,
}
}
// 转换子计划
plan.SubPlans = make([]model.FeedingPlan, len(req.SubPlans))
for i, subReq := range req.SubPlans {
plan.SubPlans[i] = *c.convertToCreateModel(&subReq)
}
return plan
}
// Delete 删除饲料计划
func (c *Controller) Delete(ctx *gin.Context) {
// 获取路径参数中的计划ID
var req struct {
ID uint `json:"id" binding:"required"`
}
if err := ctx.ShouldBindJSON(&req); err != nil {
controller.SendErrorResponse(ctx, controller.InvalidParameterCode, "请求参数错误: "+err.Error())
return
}
// 调用仓库删除计划
if err := c.feedPlanRepo.DeleteFeedingPlan(uint(req.ID)); err != nil {
c.logger.Error("删除计划失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "删除计划失败")
return
}
controller.SendSuccessResponse(ctx, "删除计划成功", nil)
}
type ListPlansResponse struct {
Plans []ListPlanResponseItem `json:"plans"`
}
type ListPlanResponseItem struct {
// ID 计划ID
ID uint `json:"id"`
// Name 计划名称
Name string `json:"name"`
// Description 计划描述
Description string `json:"description"`
// Type 计划类型
Type model.FeedingPlanType `json:"type"`
// Enabled 是否启用
Enabled bool `json:"enabled"`
// ScheduleCron 定时任务表达式
ScheduleCron *string `json:"schedule_cron,omitempty"`
}
// ListPlans 获取饲料计划列表
func (c *Controller) ListPlans(ctx *gin.Context) {
introductions, err := c.feedPlanRepo.ListAllPlanIntroduction()
if err != nil {
c.logger.Error("获取设备列表失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "获取计划列表失败")
}
resp := ListPlansResponse{
Plans: []ListPlanResponseItem{},
}
for _, introduction := range introductions {
resp.Plans = append(resp.Plans, ListPlanResponseItem{
ID: introduction.ID,
Name: introduction.Name,
Description: introduction.Description,
Enabled: introduction.Enabled,
Type: introduction.Type,
ScheduleCron: introduction.ScheduleCron,
})
}
controller.SendSuccessResponse(ctx, "success", resp)
}
// UpdateRequest 更新计划请求结构体
type UpdateRequest struct {
// ID 计划ID
ID uint `json:"id"`
// Name 计划名称
Name string `json:"name"`
// Description 计划描述
Description string `json:"description"`
// Type 计划类型(手动触发/自动触发)
Type model.FeedingPlanType `json:"type"`
// Enabled 是否启用
Enabled bool `json:"enabled"`
// ScheduleCron 定时任务表达式(仅当Type为auto时有效)
ScheduleCron *string `json:"schedule_cron,omitempty"`
// ExecutionLimit 执行次数限制(0表示无限制仅当Type为auto时有效)
ExecutionLimit int `json:"execution_limit"`
// ParentID 父计划ID用于支持子计划结构
ParentID *uint `json:"parent_id,omitempty"`
// OrderInParent 在父计划中的执行顺序
OrderInParent *int `json:"order_in_parent,omitempty"`
// IsMaster 是否为主计划(主计划可以包含子计划)
IsMaster bool `json:"is_master"`
// Steps 计划步骤列表
Steps []FeedingPlanStep `json:"steps"`
// SubPlans 子计划列表
SubPlans []UpdateRequest `json:"sub_plans"`
}
// DetailResponse 喂料计划主表
type DetailResponse struct {
// ID 计划ID
ID uint `json:"id"`
// Name 计划名称
Name string `json:"name"`
// Description 计划描述
Description string `json:"description"`
// Type 计划类型(手动触发/自动触发)
Type model.FeedingPlanType `json:"type"`
// Enabled 是否启用
Enabled bool `json:"enabled"`
// ScheduleCron 定时任务表达式(仅当Type为auto时有效)
ScheduleCron *string `json:"schedule_cron,omitempty"`
// ExecutionLimit 执行次数限制(0表示无限制仅当Type为auto时有效)
ExecutionLimit int `json:"execution_limit"`
// ParentID 父计划ID用于支持子计划结构
ParentID *uint `json:"parent_id,omitempty"`
// OrderInParent 在父计划中的执行顺序
OrderInParent *int `json:"order_in_parent,omitempty"`
// Steps 计划步骤列表
Steps []FeedingPlanStep `json:"steps"`
// SubPlans 子计划列表
SubPlans []DetailResponse `json:"sub_plans"`
}
// FeedingPlanStep 喂料计划步骤表,表示计划中的每个设备动作
type FeedingPlanStep struct {
// ID 步骤ID
ID uint `json:"id"`
// PlanID 关联的计划ID
PlanID uint `json:"plan_id"`
// StepOrder 步骤顺序
StepOrder int `json:"step_order"`
// DeviceID 关联的设备ID
DeviceID uint `json:"device_id"`
// TargetValue 目标值(达到该值后停止工作切换到下一个设备)
TargetValue float64 `json:"target_value"`
// Action 动作(如:打开设备)
Action string `json:"action"`
// ScheduleCron 步骤定时任务表达式(可选)
ScheduleCron *string `json:"schedule_cron,omitempty"`
// ExecutionLimit 步骤执行次数限制(0表示无限制)
ExecutionLimit int `json:"execution_limit"`
}
// Detail 获取饲料计划列细节
func (c *Controller) Detail(ctx *gin.Context) {
// 获取查询参数中的计划ID
planIDStr := ctx.Query("id")
if planIDStr == "" {
controller.SendErrorResponse(ctx, controller.InvalidParameterCode, "缺少计划ID参数")
return
}
planID, err := strconv.ParseUint(planIDStr, 10, 32)
if err != nil {
controller.SendErrorResponse(ctx, controller.InvalidParameterCode, "无效的计划ID")
return
}
// 从仓库中获取计划详情
plan, err := c.feedPlanRepo.FindFeedingPlanByID(uint(planID))
if err != nil {
c.logger.Error("获取计划详情失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "获取计划详情失败")
return
}
// 转换为响应结构体
resp := c.convertToDetailResponse(plan)
controller.SendSuccessResponse(ctx, "success", resp)
}
// Update 更新饲料计划
func (c *Controller) Update(ctx *gin.Context) {
var req UpdateRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
controller.SendErrorResponse(ctx, controller.InvalidParameterCode, "请求参数错误: "+err.Error())
return
}
// 校验计划结构
if err := c.validateUpdatePlanStructure(&req); err != nil {
controller.SendErrorResponse(ctx, controller.InvalidParameterCode, "计划结构错误: "+err.Error())
return
}
// 转换请求结构体为模型
plan := c.convertToUpdateModel(&req)
// 调用仓库更新计划
if err := c.feedPlanRepo.UpdateFeedingPlan(plan); err != nil {
c.logger.Error("更新计划失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "更新计划失败")
return
}
controller.SendSuccessResponse(ctx, "更新计划成功", nil)
}
// validateUpdatePlanStructure 校验更新计划结构,不允许计划同时包含步骤和子计划
func (c *Controller) validateUpdatePlanStructure(req *UpdateRequest) error {
// 检查当前计划是否同时包含步骤和子计划
if len(req.Steps) > 0 && len(req.SubPlans) > 0 {
return fmt.Errorf("计划不能同时包含步骤和子计划")
}
// 递归检查子计划
for _, subPlan := range req.SubPlans {
if err := c.validateUpdatePlanStructure(&subPlan); err != nil {
return err
}
}
return nil
}
// convertToUpdateModel 将更新请求结构体转换为数据库模型
func (c *Controller) convertToUpdateModel(req *UpdateRequest) *model.FeedingPlan {
plan := &model.FeedingPlan{
ID: req.ID,
Name: req.Name,
Description: req.Description,
Type: req.Type,
Enabled: req.Enabled,
ScheduleCron: req.ScheduleCron,
ExecutionLimit: req.ExecutionLimit,
ParentID: req.ParentID,
OrderInParent: req.OrderInParent,
Steps: make([]model.FeedingPlanStep, len(req.Steps)),
SubPlans: make([]model.FeedingPlan, len(req.SubPlans)),
}
// 转换步骤
for i, step := range req.Steps {
plan.Steps[i] = model.FeedingPlanStep{
ID: step.ID,
PlanID: step.PlanID,
StepOrder: step.StepOrder,
DeviceID: step.DeviceID,
TargetValue: step.TargetValue,
Action: step.Action,
ScheduleCron: step.ScheduleCron,
ExecutionLimit: step.ExecutionLimit,
}
}
// 转换子计划
for i, subReq := range req.SubPlans {
plan.SubPlans[i] = *c.convertToUpdateModel(&subReq)
}
return plan
}
// convertToDetailResponse 将数据库模型转换为响应结构体
func (c *Controller) convertToDetailResponse(plan *model.FeedingPlan) *DetailResponse {
resp := &DetailResponse{
ID: plan.ID,
Name: plan.Name,
Description: plan.Description,
Type: plan.Type,
Enabled: plan.Enabled,
ScheduleCron: plan.ScheduleCron,
ExecutionLimit: plan.ExecutionLimit,
ParentID: plan.ParentID,
OrderInParent: plan.OrderInParent,
Steps: make([]FeedingPlanStep, len(plan.Steps)),
SubPlans: make([]DetailResponse, len(plan.SubPlans)),
}
// 转换步骤
for i, step := range plan.Steps {
resp.Steps[i] = FeedingPlanStep{
ID: step.ID,
PlanID: step.PlanID,
StepOrder: step.StepOrder,
DeviceID: step.DeviceID,
TargetValue: step.TargetValue,
Action: step.Action,
ScheduleCron: step.ScheduleCron,
ExecutionLimit: step.ExecutionLimit,
}
}
// 转换子计划
for i, subPlan := range plan.SubPlans {
// 递归转换子计划
resp.SubPlans[i] = *c.convertToDetailResponse(&subPlan)
}
return resp
}

View File

@@ -1,16 +0,0 @@
// Package house 提供猪舍控制功能
// 实现对猪舍内设备的控制和环境监测
// 通过任务执行器执行具体控制任务
package house
// HouseController 猪舍控制器
// 管理猪舍内所有设备的控制逻辑
type HouseController struct {
// TODO: 定义猪舍控制器结构
}
// NewHouseController 创建并返回一个新的猪舍控制器实例
func NewHouseController() *HouseController {
// TODO: 实现猪舍控制器初始化
return nil
}

View File

@@ -1,157 +0,0 @@
// Package operation 提供操作历史相关功能的控制器
// 实现操作历史记录、查询等操作
package operation
import (
"strconv"
"git.huangwc.com/pig/pig-farm-controller/internal/api/middleware"
"git.huangwc.com/pig/pig-farm-controller/internal/controller"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/model"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
"github.com/gin-gonic/gin"
)
// Controller 操作历史控制器
type Controller struct {
operationHistoryRepo repository.OperationHistoryRepo
logger *logs.Logger
}
// NewController 创建操作历史控制器实例
func NewController(operationHistoryRepo repository.OperationHistoryRepo) *Controller {
return &Controller{
operationHistoryRepo: operationHistoryRepo,
logger: logs.NewLogger(),
}
}
// CreateRequest 创建操作历史请求结构体
type CreateRequest struct {
Action string `json:"action" binding:"required"`
Target string `json:"target"`
Parameters string `json:"parameters"`
Status string `json:"status" binding:"required"`
Result string `json:"result"`
}
// Create 创建操作历史记录
func (c *Controller) Create(ctx *gin.Context) {
// 从上下文中获取用户信息
userValue, exists := ctx.Get("user")
if !exists {
controller.SendErrorResponse(ctx, controller.UnauthorizedCode, "无法获取用户信息")
return
}
user, ok := userValue.(*middleware.AuthUser)
if !ok {
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "用户信息格式错误")
return
}
var req CreateRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
controller.SendErrorResponse(ctx, controller.InvalidParameterCode, "请求参数错误")
return
}
// 创建操作历史记录
operation := &model.OperationHistory{
UserID: user.ID,
Action: req.Action,
Target: req.Target,
Parameters: req.Parameters,
Status: req.Status,
Result: req.Result,
}
if err := c.operationHistoryRepo.Create(operation); err != nil {
c.logger.Error("创建操作历史记录失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "创建操作历史记录失败")
return
}
controller.SendSuccessResponse(ctx, "操作历史记录创建成功", nil)
}
// ListByUser 获取当前用户的所有操作历史记录
func (c *Controller) ListByUser(ctx *gin.Context) {
// 从上下文中获取用户信息
userValue, exists := ctx.Get("user")
if !exists {
controller.SendErrorResponse(ctx, controller.UnauthorizedCode, "无法获取用户信息")
return
}
user, ok := userValue.(*middleware.AuthUser)
if !ok {
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "用户信息格式错误")
return
}
// 获取分页参数
page, _ := strconv.Atoi(ctx.DefaultQuery("page", "1"))
limit, _ := strconv.Atoi(ctx.DefaultQuery("limit", "10"))
// 确保分页参数有效
if page < 1 {
page = 1
}
if limit < 1 || limit > 100 {
limit = 10
}
// 计算偏移量
offset := (page - 1) * limit
// 查询操作历史记录
operations, err := c.operationHistoryRepo.ListByUserID(user.ID, offset, limit)
if err != nil {
c.logger.Error("获取操作历史记录失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "获取操作历史记录失败")
return
}
controller.SendSuccessResponse(ctx, "获取操作历史记录成功", operations)
}
// Get 获取单个操作历史记录
func (c *Controller) Get(ctx *gin.Context) {
// 获取操作历史记录ID
id, err := strconv.ParseUint(ctx.Param("id"), 10, 64)
if err != nil {
controller.SendErrorResponse(ctx, controller.InvalidParameterCode, "无效的操作历史记录ID")
return
}
// 查询操作历史记录
operation, err := c.operationHistoryRepo.FindByID(uint(id))
if err != nil {
c.logger.Error("获取操作历史记录失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "获取操作历史记录失败")
return
}
// 从上下文中获取用户信息
userValue, exists := ctx.Get("user")
if !exists {
controller.SendErrorResponse(ctx, controller.UnauthorizedCode, "无法获取用户信息")
return
}
user, ok := userValue.(*middleware.AuthUser)
if !ok {
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "用户信息格式错误")
return
}
// 检查操作历史记录是否属于当前用户
if operation.UserID != user.ID {
controller.SendErrorResponse(ctx, controller.ForbiddenCode, "无权访问该操作历史记录")
return
}
controller.SendSuccessResponse(ctx, "获取操作历史记录成功", operation)
}

View File

@@ -1,110 +0,0 @@
// Package remote 提供远程设备控制相关功能的控制器
// 实现平台向中继设备发送指令等操作
package remote
import (
"git.huangwc.com/pig/pig-farm-controller/internal/controller"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/websocket"
"github.com/gin-gonic/gin"
)
// Controller 远程控制控制器
type Controller struct {
websocketManager *websocket.Manager
logger *logs.Logger
}
// NewController 创建远程控制控制器实例
func NewController(websocketManager *websocket.Manager) *Controller {
return &Controller{
websocketManager: websocketManager,
logger: logs.NewLogger(),
}
}
// SendCommandRequest 发送指令请求结构体
type SendCommandRequest struct {
DeviceID string `json:"device_id" binding:"required"`
Command string `json:"command" binding:"required"`
Data interface{} `json:"data,omitempty"`
}
// SendCommandResponseData 发送指令响应数据结构体
type SendCommandResponseData struct {
DeviceID string `json:"device_id"`
Command string `json:"command"`
Status string `json:"status"`
Data interface{} `json:"data,omitempty"`
}
// RelayCommandData 发送到中继设备的命令数据结构体
type RelayCommandData struct {
DeviceID string `json:"device_id"`
Command string `json:"command"`
Data interface{} `json:"data,omitempty"`
}
// SendCommand 向设备发送指令接口
// @Summary 向设备发送指令
// @Description 平台向指定设备发送控制指令
// @Tags remote
// @Accept json
// @Produce json
// @Param request body SendCommandRequest true "指令请求"
// @Success 200 {object} controller.APIResponse{data=SendCommandResponseData}
// @Router /api/v1/remote/command [post]
func (c *Controller) SendCommand(ctx *gin.Context) {
var req SendCommandRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
controller.SendErrorResponse(ctx, controller.InvalidParameterCode, "请求参数错误")
return
}
// 通过WebSocket服务向设备发送指令
commandData := RelayCommandData{
DeviceID: req.DeviceID,
Command: req.Command,
Data: req.Data,
}
// 发送指令并等待响应
response, err := c.websocketManager.SendCommandAndWait(req.DeviceID, req.Command, commandData, 0)
if err != nil {
c.logger.Error("发送指令失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "发送指令失败: "+err.Error())
return
}
data := SendCommandResponseData{
DeviceID: req.DeviceID,
Command: req.Command,
Status: "sent",
Data: response.Data,
}
controller.SendSuccessResponse(ctx, "指令发送成功", data)
}
// ListConnectedDevicesResponseData 获取已连接设备列表响应数据结构体
type ListConnectedDevicesResponseData struct {
Devices []string `json:"devices"`
}
// ListConnectedDevices 获取已连接设备列表接口
// @Summary 获取已连接设备列表
// @Description 获取当前通过WebSocket连接到平台的设备列表
// @Tags remote
// @Produce json
// @Success 200 {object} controller.APIResponse{data=ListConnectedDevicesResponseData}
// @Router /api/v1/remote/devices [get]
func (c *Controller) ListConnectedDevices(ctx *gin.Context) {
// 获取已连接的设备列表
devices := c.websocketManager.GetConnectedDevices()
data := ListConnectedDevicesResponseData{
Devices: devices,
}
controller.SendSuccessResponse(ctx, "获取设备列表成功", data)
}

View File

@@ -1,60 +0,0 @@
// Package controller 提供控制器层的公共功能
// 包含公共响应结构体和处理函数等
package controller
import (
"net/http"
"github.com/gin-gonic/gin"
)
// APIResponse 通用API响应结构体
type APIResponse struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}
// 自定义状态码常量
const (
SuccessCode = 0 // 成功
InvalidParameterCode = 1001 // 参数无效
UnauthorizedCode = 1002 // 未授权
ForbiddenCode = 1003 // 禁止访问
NotFoundCode = 1004 // 资源未找到
InternalServerErrorCode = 1005 // 内部服务器错误
)
// SuccessResponseData 成功响应数据结构体
type SuccessResponseData struct {
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}
// SendSuccessResponse 发送成功响应的公共函数
func SendSuccessResponse(ctx *gin.Context, message string, data interface{}) {
response := APIResponse{
Code: SuccessCode,
Message: message,
Data: data,
}
ctx.JSON(http.StatusOK, response)
}
// SendErrorResponse 发送错误响应的公共函数
func SendErrorResponse(ctx *gin.Context, code int, message string) {
response := APIResponse{
Code: code,
Message: message,
}
ctx.JSON(http.StatusOK, response)
}
// SendHTTPErrorResponse 发送HTTP错误响应的公共函数
func SendHTTPErrorResponse(ctx *gin.Context, httpCode int, code int, message string) {
response := APIResponse{
Code: code,
Message: message,
}
ctx.JSON(httpCode, response)
}

View File

@@ -1,120 +0,0 @@
// Package user 提供用户相关功能的控制器
// 实现用户注册、登录等操作
package user
import (
"git.huangwc.com/pig/pig-farm-controller/internal/api/middleware"
"git.huangwc.com/pig/pig-farm-controller/internal/controller"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
"github.com/gin-gonic/gin"
"golang.org/x/crypto/bcrypt"
)
// Controller 用户控制器
type Controller struct {
userRepo repository.UserRepo
logger *logs.Logger
}
// NewController 创建用户控制器实例
func NewController(userRepo repository.UserRepo) *Controller {
return &Controller{
userRepo: userRepo,
logger: logs.NewLogger(),
}
}
// RegisterRequest 用户注册请求结构体
type RegisterRequest struct {
Username string `json:"username" binding:"required"`
Password string `json:"password" binding:"required"`
}
// LoginRequest 用户登录请求结构体
type LoginRequest struct {
Username string `json:"username" binding:"required"`
Password string `json:"password" binding:"required"`
}
// RegisterResponseData 用户注册响应数据
type RegisterResponseData struct {
ID uint `json:"id"`
Username string `json:"username"`
}
// LoginResponseData 用户登录响应数据
type LoginResponseData struct {
ID uint `json:"id"`
Username string `json:"username"`
Token string `json:"token"`
}
// Register 用户注册接口
func (c *Controller) Register(ctx *gin.Context) {
var req RegisterRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
controller.SendErrorResponse(ctx, controller.InvalidParameterCode, "请求参数错误")
return
}
// 检查用户名是否已存在
_, err := c.userRepo.FindByUsername(req.Username)
if err == nil {
controller.SendErrorResponse(ctx, controller.InvalidParameterCode, "用户名已存在")
return
}
// 创建用户
user, err := c.userRepo.CreateUser(req.Username, req.Password)
if err != nil {
c.logger.Error("创建用户失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "用户注册失败")
return
}
data := RegisterResponseData{
ID: user.ID,
Username: user.Username,
}
controller.SendSuccessResponse(ctx, "用户注册成功", data)
}
// Login 用户登录接口
func (c *Controller) Login(ctx *gin.Context) {
var req LoginRequest
if err := ctx.ShouldBindJSON(&req); err != nil {
controller.SendErrorResponse(ctx, controller.InvalidParameterCode, "请求参数错误")
return
}
// 查找用户
user, err := c.userRepo.FindByUsername(req.Username)
if err != nil {
controller.SendErrorResponse(ctx, controller.UnauthorizedCode, "用户名或密码错误")
return
}
// 验证密码
if err := bcrypt.CompareHashAndPassword([]byte(user.PasswordHash), []byte(req.Password)); err != nil {
controller.SendErrorResponse(ctx, controller.UnauthorizedCode, "用户名或密码错误")
return
}
// 生成JWT token
token, err := middleware.NewAuthMiddleware(c.userRepo).GenerateToken(user.ID, user.Username)
if err != nil {
c.logger.Error("生成JWT token失败: " + err.Error())
controller.SendErrorResponse(ctx, controller.InternalServerErrorCode, "登录失败")
return
}
data := LoginResponseData{
ID: user.ID,
Username: user.Username,
Token: token,
}
controller.SendSuccessResponse(ctx, "登录成功", data)
}

View File

@@ -1,170 +0,0 @@
// Package core 提供核心应用协调功能
// 负责初始化和协调API、任务执行器和存储等核心组件
// 管理整个应用的生命周期
package core
import (
"fmt"
"git.huangwc.com/pig/pig-farm-controller/internal/api"
"git.huangwc.com/pig/pig-farm-controller/internal/config"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/service"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/db"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/task"
"git.huangwc.com/pig/pig-farm-controller/internal/websocket"
)
// Application 代表核心应用结构
// 协调API、任务执行器和存储组件的工作
type Application struct {
// Storage 存储组件实例
Storage db.Storage
// API API组件实例
API *api.API
// TaskExecutor 任务执行器组件实例
TaskExecutor *task.Executor
// UserRepo 用户仓库实例
UserRepo repository.UserRepo
// OperationHistoryRepo 操作历史仓库实例
OperationHistoryRepo repository.OperationHistoryRepo
// DeviceControlRepo 设备控制仓库实例
DeviceControlRepo repository.DeviceControlRepo
// DeviceRepo 设备仓库实例
DeviceRepo repository.DeviceRepo
// FeedPlanRepo 投喂计划仓库实例
FeedPlanRepo repository.FeedPlanRepo
// WebsocketManager WebSocket管理器
WebsocketManager *websocket.Manager
// DeviceStatusPool 设备状态池实例
DeviceStatusPool *service.DeviceStatusPool
// HeartbeatService 心跳服务实例
HeartbeatService *service.HeartbeatService
// Config 应用配置
Config *config.Config
// logger 日志组件实例
logger *logs.Logger
}
// NewApplication 创建并返回一个新的核心应用实例
// 初始化所有核心组件
func NewApplication(cfg *config.Config) *Application {
return &Application{
Config: cfg,
logger: logs.NewLogger(),
}
}
// Start 启动核心应用
// 按正确顺序启动所有核心组件
func (app *Application) Start() error {
// 从配置中获取数据库连接字符串
connectionString := app.Config.GetDatabaseConnectionString()
// 从配置中获取连接池参数
maxOpenConns := app.Config.Database.MaxOpenConns
maxIdleConns := app.Config.Database.MaxIdleConns
connMaxLifetime := app.Config.Database.ConnMaxLifetime
// 初始化存储组件
app.Storage = db.NewStorage(connectionString, maxOpenConns, maxIdleConns, connMaxLifetime)
// 启动存储组件
if err := app.Storage.Connect(); err != nil {
return fmt.Errorf("存储连接失败: %v", err)
}
app.logger.Info("存储连接成功")
// 初始化用户仓库
app.UserRepo = repository.NewUserRepo(app.Storage.GetDB())
// 初始化操作历史仓库
app.OperationHistoryRepo = repository.NewOperationHistoryRepo(app.Storage.GetDB())
// 初始化设备控制仓库
app.DeviceControlRepo = repository.NewDeviceControlRepo(app.Storage.GetDB())
// 初始化设备仓库
app.DeviceRepo = repository.NewDeviceRepo(app.Storage.GetDB())
app.FeedPlanRepo = repository.NewFeedPlanRepo(app.Storage.GetDB())
// 初始化设备状态池
app.DeviceStatusPool = service.NewDeviceStatusPool()
// 初始化WebSocket服务
app.WebsocketManager = websocket.NewManager(app.DeviceRepo)
// 设置WebSocket超时时间
app.WebsocketManager.SetDefaultTimeout(app.Config.GetWebSocketConfig().Timeout)
// 初始化心跳服务
app.HeartbeatService = service.NewHeartbeatService(app.WebsocketManager, app.DeviceStatusPool, app.DeviceRepo, app.Config)
// 初始化API组件
app.API = api.NewAPI(app.Config,
app.UserRepo,
app.OperationHistoryRepo,
app.DeviceControlRepo,
app.DeviceRepo,
app.FeedPlanRepo,
app.WebsocketManager,
app.HeartbeatService,
app.DeviceStatusPool,
)
// 初始化任务执行器组件(使用5个工作协程)
app.TaskExecutor = task.NewExecutor(5)
// 启动API组件
if err := app.API.Start(); err != nil {
return fmt.Errorf("API启动失败: %v", err)
}
app.logger.Info("API启动成功")
// 启动任务执行器组件
app.logger.Info("启动任务执行器")
app.TaskExecutor.Start()
// 启动心跳服务
app.logger.Info("启动心跳服务")
app.HeartbeatService.Start()
return nil
}
// Stop 停止核心应用
// 按正确顺序停止所有核心组件
func (app *Application) Stop() error {
// 停止API组件
if err := app.API.Stop(); err != nil {
app.logger.Error(fmt.Sprintf("API停止失败: %v", err))
}
// 停止任务执行器组件
app.logger.Info("停止任务执行器")
app.TaskExecutor.Stop()
// 停止心跳服务
app.logger.Info("停止心跳服务")
app.HeartbeatService.Stop()
// 停止存储组件
if err := app.Storage.Disconnect(); err != nil {
return fmt.Errorf("存储断开连接失败: %v", err)
}
app.logger.Info("存储断开连接成功")
return nil
}

View File

@@ -1,138 +0,0 @@
// Package model 提供数据模型定义
// 包含设备控制等相关数据结构
package model
import (
"fmt"
"strconv"
"strings"
"time"
"gorm.io/gorm"
)
// DeviceType 设备类型枚举
type DeviceType string
const (
// DeviceTypeFan 风机
DeviceTypeFan DeviceType = "fan"
// DeviceTypeWaterCurtain 水帘
DeviceTypeWaterCurtain DeviceType = "water_curtain"
// DeviceTypePigPenController 猪舍主控
DeviceTypePigPenController DeviceType = "pig_pen_controller"
// DeviceTypeFeedMillController 做料车间主控
DeviceTypeFeedMillController DeviceType = "feed_mill_controller"
// DeviceTypeRelay 中继设备
DeviceTypeRelay DeviceType = "relay"
)
// Device 代表设备信息
type Device struct {
// ID 设备ID
ID uint `gorm:"primaryKey;column:id" json:"id"`
// Name 设备名称
Name string `gorm:"not null;column:name" json:"name"`
// Type 设备类型
Type DeviceType `gorm:"not null;column:type" json:"type"`
// ParentID 上级设备ID(用于设备层级关系,指向区域主控设备)
ParentID *uint `gorm:"column:parent_id;index" json:"parent_id"`
// Address 设备地址普通设备的485总线地址或区域主控的Lora地址中继设备不需要
// 格式:对于普通设备,可以是"bus_number:device_address"或"device_address"
// 对于区域主控是Lora地址
Address *string `gorm:"column:address" json:"address,omitempty"`
// CreatedAt 创建时间
CreatedAt time.Time `gorm:"column:created_at" json:"created_at"`
// UpdatedAt 更新时间
UpdatedAt time.Time `gorm:"column:updated_at" json:"updated_at"`
// DeletedAt 删除时间(用于软删除)
DeletedAt gorm.DeletedAt `gorm:"index;column:deleted_at" json:"-"`
}
// TableName 指定Device模型对应的数据库表名
func (Device) TableName() string {
return "devices"
}
// Set485Address 设置普通设备的485总线地址和设备地址
func (d *Device) Set485Address(busNumber int, deviceAddress string) {
if d.Type != DeviceTypeFan && d.Type != DeviceTypeWaterCurtain {
return
}
address := fmt.Sprintf("%d:%s", busNumber, deviceAddress)
d.Address = &address
}
// Get485Address 获取普通设备的总线号和设备地址
func (d *Device) Get485Address() (busNumber int, deviceAddress string, err error) {
if d.Address == nil {
return 0, "", fmt.Errorf("address is nil")
}
parts := strings.Split(*d.Address, ":")
if len(parts) != 2 {
// 如果没有总线号默认为总线0
return 0, *d.Address, nil
}
busNumber, err = strconv.Atoi(parts[0])
if err != nil {
return 0, "", fmt.Errorf("invalid bus number: %v", err)
}
deviceAddress = parts[1]
return busNumber, deviceAddress, nil
}
// DeviceControl 代表设备控制记录
type DeviceControl struct {
// ID 记录ID
ID uint `gorm:"primaryKey;column:id" json:"id"`
// UserID 用户ID
UserID uint `gorm:"not null;column:user_id;index" json:"user_id"`
// Location 设备安装位置描述
Location string `gorm:"not null;column:location" json:"location"`
// DeviceType 设备类型
DeviceType DeviceType `gorm:"not null;column:device_type" json:"device_type"`
// DeviceID 设备编号
DeviceID string `gorm:"not null;column:device_id" json:"device_id"`
// Action 控制动作(开/关)
Action string `gorm:"not null;column:action" json:"action"`
// Status 控制状态(成功/失败)
Status string `gorm:"not null;column:status" json:"status"`
// Result 控制结果详情(可选)
Result string `gorm:"column:result" json:"result"`
// CreatedAt 创建时间
CreatedAt time.Time `gorm:"column:created_at" json:"created_at"`
// UpdatedAt 更新时间
UpdatedAt time.Time `gorm:"column:updated_at" json:"updated_at"`
// DeletedAt 删除时间(用于软删除)
DeletedAt gorm.DeletedAt `gorm:"index;column:deleted_at" json:"-"`
}
// TableName 指定DeviceControl模型对应的数据库表名
func (DeviceControl) TableName() string {
return "device_controls"
}

View File

@@ -1,193 +0,0 @@
package model
import (
"time"
"gorm.io/gorm"
)
// FeedingPlanType 喂料计划类型枚举
type FeedingPlanType string
const (
// FeedingPlanTypeManual 手动触发
FeedingPlanTypeManual FeedingPlanType = "manual"
// FeedingPlanTypeAuto 自动触发
FeedingPlanTypeAuto FeedingPlanType = "auto"
)
// FeedingPlan 喂料计划主表
type FeedingPlan struct {
// ID 计划ID
ID uint `gorm:"primaryKey;column:id" json:"id"`
// Name 计划名称
Name string `gorm:"not null;column:name" json:"name"`
// Description 计划描述
Description string `gorm:"column:description" json:"description"`
// Type 计划类型(手动触发/自动触发)
Type FeedingPlanType `gorm:"not null;column:type" json:"type"`
// Enabled 是否启用
Enabled bool `gorm:"not null;default:true;column:enabled" json:"enabled"`
// ScheduleCron 定时任务表达式(仅当Type为auto时有效)
ScheduleCron *string `gorm:"column:schedule_cron" json:"schedule_cron,omitempty"`
// ExecutionLimit 执行次数限制(0表示无限制仅当Type为auto时有效)
ExecutionLimit int `gorm:"not null;default:0;column:execution_limit" json:"execution_limit"`
// ParentID 父计划ID用于支持子计划结构
ParentID *uint `gorm:"column:parent_id;index" json:"parent_id,omitempty"`
// OrderInParent 在父计划中的执行顺序
OrderInParent *int `gorm:"column:order_in_parent" json:"order_in_parent,omitempty"`
// CreatedAt 创建时间
CreatedAt time.Time `gorm:"column:created_at" json:"created_at"`
// UpdatedAt 更新时间
UpdatedAt time.Time `gorm:"column:updated_at" json:"updated_at"`
// DeletedAt 删除时间(用于软删除)
DeletedAt gorm.DeletedAt `gorm:"index;column:deleted_at" json:"-"`
// Steps 计划步骤列表
Steps []FeedingPlanStep `gorm:"foreignKey:PlanID" json:"-"`
// SubPlans 子计划列表
SubPlans []FeedingPlan `gorm:"foreignKey:ParentID" json:"-"`
}
// TableName 指定FeedingPlan模型对应的数据库表名
func (FeedingPlan) TableName() string {
return "feeding_plans"
}
// FeedingPlanStep 喂料计划步骤表,表示计划中的每个设备动作
type FeedingPlanStep struct {
// ID 步骤ID
ID uint `gorm:"primaryKey;column:id" json:"id"`
// PlanID 关联的计划ID
PlanID uint `gorm:"not null;column:plan_id;index" json:"plan_id"`
// StepOrder 步骤顺序
StepOrder int `gorm:"not null;column:step_order" json:"step_order"`
// DeviceID 关联的设备ID
DeviceID uint `gorm:"not null;column:device_id;index" json:"device_id"`
// TargetValue 目标值(达到该值后停止工作切换到下一个设备)
TargetValue float64 `gorm:"not null;column:target_value" json:"target_value"`
// Action 动作(如:打开设备)
Action string `gorm:"not null;column:action" json:"action"`
// ScheduleCron 步骤定时任务表达式(可选)
ScheduleCron *string `gorm:"column:schedule_cron" json:"schedule_cron,omitempty"`
// ExecutionLimit 步骤执行次数限制(0表示无限制)
ExecutionLimit int `gorm:"not null;default:0;column:execution_limit" json:"execution_limit"`
// CreatedAt 创建时间
CreatedAt time.Time `gorm:"column:created_at" json:"created_at"`
// UpdatedAt 更新时间
UpdatedAt time.Time `gorm:"column:updated_at" json:"updated_at"`
// DeletedAt 删除时间(用于软删除)
DeletedAt gorm.DeletedAt `gorm:"index;column:deleted_at" json:"-"`
}
// TableName 指定FeedingPlanStep模型对应的数据库表名
func (FeedingPlanStep) TableName() string {
return "feeding_plan_steps"
}
// FeedingExecution 喂料执行记录表
type FeedingExecution struct {
// ID 执行记录ID
ID uint `gorm:"primaryKey;column:id" json:"id"`
// PlanID 关联的计划ID
PlanID uint `gorm:"not null;column:plan_id;index" json:"plan_id"`
// MasterPlanID 主计划ID如果是子计划执行记录主计划ID
MasterPlanID *uint `gorm:"column:master_plan_id;index" json:"master_plan_id,omitempty"`
// TriggerType 触发类型(手动/自动)
TriggerType FeedingPlanType `gorm:"not null;column:trigger_type" json:"trigger_type"`
// Status 执行状态(进行中/已完成/已取消/失败)
Status string `gorm:"not null;column:status" json:"status"`
// StartedAt 开始执行时间
StartedAt *time.Time `gorm:"column:started_at" json:"started_at,omitempty"`
// FinishedAt 完成时间
FinishedAt *time.Time `gorm:"column:finished_at" json:"finished_at,omitempty"`
// CreatedAt 创建时间
CreatedAt time.Time `gorm:"column:created_at" json:"created_at"`
// UpdatedAt 更新时间
UpdatedAt time.Time `gorm:"column:updated_at" json:"updated_at"`
// DeletedAt 删除时间(用于软删除)
DeletedAt gorm.DeletedAt `gorm:"index;column:deleted_at" json:"-"`
// Steps 执行步骤详情
Steps []FeedingExecutionStep `gorm:"foreignKey:ExecutionID" json:"-"`
}
// TableName 指定FeedingExecution模型对应的数据库表名
func (FeedingExecution) TableName() string {
return "feeding_executions"
}
// FeedingExecutionStep 喂料执行步骤详情表
type FeedingExecutionStep struct {
// ID 执行步骤ID
ID uint `gorm:"primaryKey;column:id" json:"id"`
// ExecutionID 关联的执行记录ID
ExecutionID uint `gorm:"not null;column:execution_id;index" json:"execution_id"`
// StepID 关联的计划步骤ID
StepID uint `gorm:"not null;column:step_id;index" json:"step_id"`
// DeviceID 关联的设备ID
DeviceID uint `gorm:"not null;column:device_id;index" json:"device_id"`
// TargetValue 目标值
TargetValue float64 `gorm:"not null;column:target_value" json:"target_value"`
// ActualValue 实际值
ActualValue *float64 `gorm:"column:actual_value" json:"actual_value,omitempty"`
// Status 步骤状态(待执行/执行中/已完成/失败)
Status string `gorm:"not null;column:status" json:"status"`
// StartedAt 开始执行时间
StartedAt *time.Time `gorm:"column:started_at" json:"started_at,omitempty"`
// FinishedAt 完成时间
FinishedAt *time.Time `gorm:"column:finished_at" json:"finished_at,omitempty"`
// CreatedAt 创建时间
CreatedAt time.Time `gorm:"column:created_at" json:"created_at"`
// UpdatedAt 更新时间
UpdatedAt time.Time `gorm:"column:updated_at" json:"updated_at"`
// DeletedAt 删除时间(用于软删除)
DeletedAt gorm.DeletedAt `gorm:"index;column:deleted_at" json:"-"`
}
// TableName 指定FeedingExecutionStep模型对应的数据库表名
func (FeedingExecutionStep) TableName() string {
return "feeding_execution_steps"
}

View File

@@ -1,47 +0,0 @@
// Package model 提供数据模型定义
// 包含用户、操作历史等相关数据结构
package model
import (
"time"
"gorm.io/gorm"
)
// OperationHistory 代表用户操作历史记录
type OperationHistory struct {
// ID 记录ID
ID uint `gorm:"primaryKey;column:id" json:"id"`
// UserID 用户ID
UserID uint `gorm:"not null;column:user_id;index" json:"user_id"`
// Action 操作类型/指令
Action string `gorm:"not null;column:action" json:"action"`
// Target 操作目标(可选)
Target string `gorm:"column:target" json:"target"`
// Parameters 操作参数(可选)
Parameters string `gorm:"column:parameters" json:"parameters"`
// Status 操作状态(成功/失败)
Status string `gorm:"not null;column:status" json:"status"`
// Result 操作结果详情(可选)
Result string `gorm:"column:result" json:"result"`
// CreatedAt 创建时间
CreatedAt time.Time `gorm:"column:created_at" json:"created_at"`
// UpdatedAt 更新时间
UpdatedAt time.Time `gorm:"column:updated_at" json:"updated_at"`
// DeletedAt 删除时间(用于软删除)
DeletedAt gorm.DeletedAt `gorm:"index;column:deleted_at" json:"-"`
}
// TableName 指定OperationHistory模型对应的数据库表名
func (OperationHistory) TableName() string {
return "operation_histories"
}

View File

@@ -1,38 +0,0 @@
// Package model 提供数据模型定义
// 包含用户、猪舍、饲料等相关数据结构
package model
import (
"time"
"gorm.io/gorm"
)
// User 代表系统用户
type User struct {
// ID 用户ID
ID uint `gorm:"primaryKey;column:id" json:"id"`
// Username 用户名
Username string `gorm:"uniqueIndex;not null;column:username" json:"username"`
// PasswordHash 密码哈希值
PasswordHash string `gorm:"not null;column:password_hash" json:"-"`
// CreatedAt 创建时间
CreatedAt time.Time `gorm:"column:created_at" json:"created_at"`
// UpdatedAt 更新时间
UpdatedAt time.Time `gorm:"column:updated_at" json:"updated_at"`
// DeletedAt 删除时间(用于软删除)
DeletedAt gorm.DeletedAt `gorm:"index;column:deleted_at" json:"-"`
// OperationHistories 用户的操作历史记录
OperationHistories []OperationHistory `gorm:"foreignKey:UserID" json:"-"`
}
// TableName 指定User模型对应的数据库表名
func (User) TableName() string {
return "users"
}

View File

@@ -1,83 +0,0 @@
// Package service 提供各种业务服务功能
package service
import (
"sync"
)
// DeviceStatus 设备状态信息
type DeviceStatus struct {
// Active 设备是否启动
Active bool
}
// DeviceStatusPool 设备状态池,用于管理所有设备的当前状态
type DeviceStatusPool struct {
// statuses 设备状态映射 设备ID:状态
statuses map[string]*DeviceStatus
// mutex 读写锁,保证并发安全
mutex sync.RWMutex
}
// NewDeviceStatusPool 创建设备状态池实例
func NewDeviceStatusPool() *DeviceStatusPool {
return &DeviceStatusPool{
statuses: make(map[string]*DeviceStatus),
}
}
// SetStatus 设置设备状态
func (dsp *DeviceStatusPool) SetStatus(deviceID string, status *DeviceStatus) {
dsp.mutex.Lock()
defer dsp.mutex.Unlock()
dsp.statuses[deviceID] = status
}
// GetStatus 获取设备状态
func (dsp *DeviceStatusPool) GetStatus(deviceID string) (*DeviceStatus, bool) {
dsp.mutex.RLock()
defer dsp.mutex.RUnlock()
status, exists := dsp.statuses[deviceID]
return status, exists
}
// DeleteStatus 删除设备状态
func (dsp *DeviceStatusPool) DeleteStatus(deviceID string) {
dsp.mutex.Lock()
defer dsp.mutex.Unlock()
delete(dsp.statuses, deviceID)
}
// GetAllStatuses 获取所有设备状态
func (dsp *DeviceStatusPool) GetAllStatuses() map[string]*DeviceStatus {
dsp.mutex.RLock()
defer dsp.mutex.RUnlock()
// 创建副本以避免外部修改
result := make(map[string]*DeviceStatus)
for id, status := range dsp.statuses {
result[id] = status
}
return result
}
// SetAllStatuses 全量更新设备状态池
func (dsp *DeviceStatusPool) SetAllStatuses(statuses map[string]*DeviceStatus) {
dsp.mutex.Lock()
defer dsp.mutex.Unlock()
// 清空现有状态
for id := range dsp.statuses {
delete(dsp.statuses, id)
}
// 添加新状态
for id, status := range statuses {
dsp.statuses[id] = status
}
}

View File

@@ -1,297 +0,0 @@
// Package service 提供各种业务服务功能
package service
import (
"context"
"errors"
"fmt"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/config"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/model"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
"git.huangwc.com/pig/pig-farm-controller/internal/websocket"
"github.com/panjf2000/ants/v2"
)
// HeartbeatService 心跳服务,负责管理设备的心跳检测
type HeartbeatService struct {
// websocketManager WebSocket管理器
websocketManager *websocket.Manager
// deviceStatusPool 设备状态池
deviceStatusPool *DeviceStatusPool
// deviceRepo 设备仓库
deviceRepo repository.DeviceRepo
// logger 日志记录器
logger *logs.Logger
// 心跳间隔
heartbeatInterval time.Duration
// 手动心跳触发器
triggerChan chan struct{}
// ticker 心跳定时器
ticker *time.Ticker
// poolSize 线程池大小
poolSize int
// pool 线程池
pool *ants.Pool
// ctx 上下文
ctx context.Context
// cancel 取消函数
cancel context.CancelFunc
}
// NewHeartbeatService 创建心跳服务实例
func NewHeartbeatService(websocketManager *websocket.Manager, deviceStatusPool *DeviceStatusPool, deviceRepo repository.DeviceRepo, config *config.Config) *HeartbeatService {
interval := config.GetHeartbeatConfig().Interval
if interval <= 0 {
interval = 30 // 默认30秒心跳间隔
}
concurrency := config.GetHeartbeatConfig().Concurrency
if concurrency <= 0 {
concurrency = 10 // 默认10个并发
}
return &HeartbeatService{
websocketManager: websocketManager,
deviceStatusPool: deviceStatusPool,
deviceRepo: deviceRepo,
logger: logs.NewLogger(),
heartbeatInterval: time.Duration(interval) * time.Second,
poolSize: concurrency,
triggerChan: make(chan struct{}),
}
}
// Start 启动心跳服务
func (hs *HeartbeatService) Start() {
// 创建上下文
ctx, cancel := context.WithCancel(context.Background())
hs.cancel = cancel
// 创建定时器
hs.logger.Info(fmt.Sprintf("设置心跳间隔为 %d 秒", int(hs.heartbeatInterval.Seconds())))
hs.ticker = time.NewTicker(hs.heartbeatInterval)
// 创建线程池
hs.pool, _ = ants.NewPool(hs.poolSize)
// 启动心跳goroutine
go func() {
for {
select {
case <-hs.ticker.C:
hs.handleHeartbeatAll()
case <-hs.triggerChan:
hs.handleHeartbeatAll()
case <-ctx.Done():
hs.logger.Info("心跳服务已停止")
return
}
}
}()
hs.logger.Info("心跳服务已启动")
}
// Stop 停止心跳服务
func (hs *HeartbeatService) Stop() {
if hs == nil {
return
}
if hs.ticker != nil {
hs.ticker.Stop()
}
if hs.cancel != nil {
hs.cancel()
}
if hs.pool != nil {
hs.pool.Release()
}
hs.logger.Info("[Heartbeat] 心跳任务停止指令已发送")
}
// TriggerManualHeartbeat 手动触发心跳检测
func (hs *HeartbeatService) TriggerManualHeartbeat() {
hs.logger.Info("收到手动触发心跳检测请求")
hs.triggerChan <- struct{}{}
hs.logger.Info("手动心跳检测完成")
}
// TriggerManualHeartbeatAsync 手动触发心跳检测且不等待检测结果
func (hs *HeartbeatService) TriggerManualHeartbeatAsync() {
hs.logger.Info("收到手动触发异步心跳检测请求")
go func() {
hs.triggerChan <- struct{}{}
hs.logger.Info("手动心跳检测完成")
}()
}
// sendHeartbeatAll 发送心跳包到所有中继设备
func (hs *HeartbeatService) handleHeartbeatAll() {
// 记录心跳开始日志
hs.logger.Debug("开始发送心跳包")
// 获取所有中继设备
relays, err := hs.deviceRepo.ListAll()
if err != nil {
hs.logger.Error("获取设备列表失败: " + err.Error())
return
}
// 创建线程安全的临时map用于保存所有设备状态
tempStatusMap := &TempStatusMap{
data: make(map[string]*DeviceStatus),
mu: sync.RWMutex{},
}
// 遍历所有连接的设备并发送心跳包
wg := sync.WaitGroup{}
for _, relay := range relays {
// 心跳包之发送给中继设备
if relay.Type != model.DeviceTypeRelay {
continue
}
id := fmt.Sprintf("%v", relay.ID)
name := relay.Name
wg.Add(1)
err := hs.pool.Submit(func() {
defer wg.Done()
err := hs.handleHeartbeatWithStatus(id, tempStatusMap)
if err != nil {
hs.logger.Error("[Heartbeat] 向设备 " + name + "(id:" + id + ") 发送心跳包失败: " + err.Error())
}
})
if err != nil {
hs.logger.Error("向设备 " + name + "(id:" + id + ") 发送心跳包失败(线程池异常): " + err.Error())
}
}
wg.Wait()
// 获取所有设备列表
allDevices, err := hs.deviceRepo.ListAll()
if err != nil {
hs.logger.Error("获取所有设备列表失败: " + err.Error())
return
}
// 补齐临时map中缺失的设备缺失的设备全部设为离线状态
tempStatusMap.mu.Lock()
for _, device := range allDevices {
id := fmt.Sprintf("%v", device.ID)
if _, exists := tempStatusMap.data[id]; !exists {
tempStatusMap.data[id] = &DeviceStatus{
Active: false,
}
}
}
tempStatusMap.mu.Unlock()
// 将临时状态更新到全局状态池
hs.deviceStatusPool.SetAllStatuses(tempStatusMap.data)
hs.logger.Debug("心跳包发送完成")
}
// TempStatusMap 线程安全的临时状态映射
type TempStatusMap struct {
data map[string]*DeviceStatus
mu sync.RWMutex
}
// SetStatus 设置设备状态
func (tsm *TempStatusMap) SetStatus(deviceID string, status *DeviceStatus) {
tsm.mu.Lock()
defer tsm.mu.Unlock()
tsm.data[deviceID] = status
}
// GetStatus 获取设备状态
func (tsm *TempStatusMap) GetStatus(deviceID string) (*DeviceStatus, bool) {
tsm.mu.RLock()
defer tsm.mu.RUnlock()
status, exists := tsm.data[deviceID]
return status, exists
}
// sendHeartbeat 发送心跳包到所有中继设备
func (hs *HeartbeatService) handleHeartbeatWithStatus(deviceID string, tempStatusMap *TempStatusMap) error {
// 构造带时间戳的心跳包数据
heartbeatData := map[string]interface{}{
"timestamp": time.Now().Unix(),
}
// 发送心跳包到设备
response, err := hs.websocketManager.SendCommandAndWait(deviceID, "heartbeat", heartbeatData, 0)
if err != nil {
hs.logger.Error(fmt.Sprintf("向设备 %s 发送心跳包失败: %v", deviceID, err))
// 更新设备状态为离线
tempStatusMap.SetStatus(deviceID, &DeviceStatus{
Active: false,
})
return err
}
// 记录收到心跳响应
hs.logger.Debug(fmt.Sprintf("收到来自设备 %s 的心跳响应: %+v", deviceID, response))
// 有响应中继设备就是在线
tempStatusMap.SetStatus(deviceID, &DeviceStatus{
Active: true,
})
// 时间戳校验
if response.Timestamp.Unix() != heartbeatData["timestamp"] {
hs.logger.Error(fmt.Sprintf("心跳响应时间戳校验失败: %v , 响应时间戳应当与发送的时间戳一致", response))
return errors.New("心跳响应时间戳校验失败")
}
// 解析响应中的下级设备状态
type DeviceStatusInfo struct {
DeviceID string `json:"device_id"`
DeviceType string `json:"device_type"`
Status string `json:"status"`
}
type HeartbeatResponseData struct {
Devices []DeviceStatusInfo `json:"devices"`
}
var responseData HeartbeatResponseData
if err := response.ParseData(&responseData); err != nil {
hs.logger.Error(fmt.Sprintf("解析设备 %s 的心跳响应数据失败: %v", deviceID, err))
return err
}
// 更新所有下级设备的状态
for _, device := range responseData.Devices {
// 根据设备状态确定Active值
isActive := device.Status == "running" || device.Status == "online" || device.Status == "active"
tempStatusMap.SetStatus(device.DeviceID, &DeviceStatus{
Active: isActive,
})
}
return nil
}

View File

@@ -1,128 +0,0 @@
// Package db 提供基于PostgreSQL的数据存储功能
// 使用GORM作为ORM库来操作数据库
// 实现与PostgreSQL数据库的连接和基本操作
package db
import (
"fmt"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/model"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
// migrateModels 需要自动迁移的数据库模型列表
var migrateModels = []interface{}{
&model.User{},
&model.OperationHistory{},
&model.Device{},
&model.DeviceControl{},
&model.FeedingPlan{},
&model.FeedingPlanStep{},
&model.FeedingExecution{},
&model.FeedingExecutionStep{},
}
// PostgresStorage 代表基于PostgreSQL的存储实现
// 使用GORM作为ORM库
type PostgresStorage struct {
// db GORM数据库实例
db *gorm.DB
// connectionString 数据库连接字符串
connectionString string
// maxOpenConns 最大开放连接数
maxOpenConns int
// maxIdleConns 最大空闲连接数
maxIdleConns int
// connMaxLifetime 连接最大生命周期(秒)
connMaxLifetime int
// logger 日志记录器
logger *logs.Logger
}
// NewPostgresStorage 创建并返回一个新的PostgreSQL存储实例
// 初始化数据库连接和相关配置
func NewPostgresStorage(connectionString string, maxOpenConns, maxIdleConns, connMaxLifetime int) *PostgresStorage {
return &PostgresStorage{
connectionString: connectionString,
maxOpenConns: maxOpenConns,
maxIdleConns: maxIdleConns,
connMaxLifetime: connMaxLifetime,
logger: logs.NewLogger(),
}
}
// Connect 建立与PostgreSQL数据库的连接
// 使用GORM建立数据库连接
func (ps *PostgresStorage) Connect() error {
ps.logger.Info("正在连接PostgreSQL数据库")
var err error
ps.db, err = gorm.Open(postgres.Open(ps.connectionString), &gorm.Config{})
if err != nil {
ps.logger.Error(fmt.Sprintf("数据库连接失败: %v", err))
return fmt.Errorf("数据库连接失败: %v", err)
}
// 测试连接
sqlDB, err := ps.db.DB()
if err != nil {
ps.logger.Error(fmt.Sprintf("获取数据库实例失败: %v", err))
return fmt.Errorf("获取数据库实例失败: %v", err)
}
if err = sqlDB.Ping(); err != nil {
ps.logger.Error(fmt.Sprintf("数据库连接测试失败: %v", err))
return fmt.Errorf("数据库连接测试失败: %v", err)
}
// 设置连接池参数
sqlDB.SetMaxOpenConns(ps.maxOpenConns)
sqlDB.SetMaxIdleConns(ps.maxIdleConns)
sqlDB.SetConnMaxLifetime(time.Duration(ps.connMaxLifetime) * time.Second)
// 自动迁移数据库表结构
ps.logger.Info("正在自动迁移数据库表结构")
if err = ps.db.AutoMigrate(migrateModels...); err != nil {
ps.logger.Error(fmt.Sprintf("数据库表结构迁移失败: %v", err))
return fmt.Errorf("数据库表结构迁移失败: %v", err)
}
ps.logger.Info("数据库表结构迁移完成")
ps.logger.Info("PostgreSQL数据库连接成功")
return nil
}
// Disconnect 断开与PostgreSQL数据库的连接
// 安全地关闭所有数据库连接
func (ps *PostgresStorage) Disconnect() error {
if ps.db != nil {
ps.logger.Info("正在断开PostgreSQL数据库连接")
sqlDB, err := ps.db.DB()
if err != nil {
ps.logger.Error(fmt.Sprintf("获取数据库实例失败: %v", err))
return fmt.Errorf("获取数据库实例失败: %v", err)
}
if err := sqlDB.Close(); err != nil {
ps.logger.Error(fmt.Sprintf("关闭数据库连接失败: %v", err))
return fmt.Errorf("关闭数据库连接失败: %v", err)
}
ps.logger.Info("PostgreSQL数据库连接已断开")
}
return nil
}
// GetDB 获取GORM数据库实例
// 用于执行具体的数据库操作
func (ps *PostgresStorage) GetDB() *gorm.DB {
return ps.db
}

View File

@@ -1,29 +0,0 @@
// Package db 提供统一的数据存储接口
// 定义存储接口规范,支持多种存储后端实现
// 当前支持PostgreSQL实现
package db
import (
"gorm.io/gorm"
)
// Storage 代表统一的存储接口
// 所有存储实现都需要实现此接口定义的方法
type Storage interface {
// Connect 建立与存储后端的连接
Connect() error
// Disconnect 断开与存储后端的连接
Disconnect() error
// GetDB 获取数据库实例
GetDB() *gorm.DB
}
// NewStorage 创建并返回一个存储实例
// 根据配置返回相应的存储实现
func NewStorage(connectionString string, maxOpenConns, maxIdleConns, connMaxLifetime int) Storage {
// 当前默认返回PostgreSQL存储实现
s := NewPostgresStorage(connectionString, maxOpenConns, maxIdleConns, connMaxLifetime)
return s
}

View File

@@ -1,197 +0,0 @@
// Package repository 提供数据访问层实现
// 包含设备控制等数据实体的仓库接口和实现
package repository
import (
"strconv"
"git.huangwc.com/pig/pig-farm-controller/internal/model"
"gorm.io/gorm"
)
// DeviceRepo 设备仓库接口
type DeviceRepo interface {
// Create 创建设备
Create(device *model.Device) error
// FindByID 根据ID查找设备
FindByID(id uint) (*model.Device, error)
// FindByIDString 根据ID字符串查找设备
FindByIDString(id string) (*model.Device, error)
// FindByParentID 根据上级设备ID查找设备
FindByParentID(parentID uint) ([]*model.Device, error)
// FindByType 根据设备类型查找设备
FindByType(deviceType model.DeviceType) ([]*model.Device, error)
// Update 更新设备信息
Update(device *model.Device) error
// Delete 删除设备
Delete(id uint) error
// ListAll 获取所有设备列表
ListAll() ([]model.Device, error)
// FindRelayDevices 获取所有中继设备
FindRelayDevices() ([]*model.Device, error)
}
// DeviceControlRepo 设备控制仓库接口
type DeviceControlRepo interface {
// Create 创建设备控制记录
Create(control *model.DeviceControl) error
// FindByUserID 根据用户ID查找设备控制记录
FindByUserID(userID uint) ([]*model.DeviceControl, error)
// FindByID 根据ID查找设备控制记录
FindByID(id uint) (*model.DeviceControl, error)
// List 获取设备控制记录列表(分页)
List(offset, limit int) ([]*model.DeviceControl, error)
}
// deviceRepo 设备仓库实现
type deviceRepo struct {
db *gorm.DB
}
// deviceControlRepo 设备控制仓库实现
type deviceControlRepo struct {
db *gorm.DB
}
// NewDeviceRepo 创建设备仓库实例
func NewDeviceRepo(db *gorm.DB) DeviceRepo {
return &deviceRepo{
db: db,
}
}
// NewDeviceControlRepo 创建设备控制仓库实例
func NewDeviceControlRepo(db *gorm.DB) DeviceControlRepo {
return &deviceControlRepo{
db: db,
}
}
// Create 创建设备
func (r *deviceRepo) Create(device *model.Device) error {
result := r.db.Create(device)
return result.Error
}
// FindByID 根据ID查找设备
func (r *deviceRepo) FindByID(id uint) (*model.Device, error) {
var device model.Device
result := r.db.First(&device, id)
if result.Error != nil {
return nil, result.Error
}
return &device, nil
}
// FindByIDString 根据ID字符串查找设备
func (r *deviceRepo) FindByIDString(id string) (*model.Device, error) {
deviceID, err := strconv.ParseUint(id, 10, 64)
if err != nil {
return nil, err
}
var device model.Device
result := r.db.First(&device, deviceID)
if result.Error != nil {
return nil, result.Error
}
return &device, nil
}
// ListAll 获取所有设备列表
func (r *deviceRepo) ListAll() ([]model.Device, error) {
var devices []model.Device
if err := r.db.Find(&devices).Error; err != nil {
return nil, err
}
return devices, nil
}
// FindByParentID 根据上级设备ID查找设备
func (r *deviceRepo) FindByParentID(parentID uint) ([]*model.Device, error) {
var devices []*model.Device
result := r.db.Where("parent_id = ?", parentID).Find(&devices)
if result.Error != nil {
return nil, result.Error
}
return devices, nil
}
// FindByType 根据设备类型查找设备
func (r *deviceRepo) FindByType(deviceType model.DeviceType) ([]*model.Device, error) {
var devices []*model.Device
result := r.db.Where("type = ?", deviceType).Find(&devices)
if result.Error != nil {
return nil, result.Error
}
return devices, nil
}
// FindRelayDevices 获取所有中继设备
func (r *deviceRepo) FindRelayDevices() ([]*model.Device, error) {
var devices []*model.Device
result := r.db.Where("type = ?", model.DeviceTypeRelay).Find(&devices)
if result.Error != nil {
return nil, result.Error
}
return devices, nil
}
// Update 更新设备信息
func (r *deviceRepo) Update(device *model.Device) error {
result := r.db.Save(device)
return result.Error
}
// Delete 删除设备
func (r *deviceRepo) Delete(id uint) error {
result := r.db.Delete(&model.Device{}, id)
return result.Error
}
// Create 创建设备控制记录
func (r *deviceControlRepo) Create(control *model.DeviceControl) error {
result := r.db.Create(control)
return result.Error
}
// FindByUserID 根据用户ID查找设备控制记录
func (r *deviceControlRepo) FindByUserID(userID uint) ([]*model.DeviceControl, error) {
var controls []*model.DeviceControl
result := r.db.Where("user_id = ?", userID).Order("created_at DESC").Find(&controls)
if result.Error != nil {
return nil, result.Error
}
return controls, nil
}
// FindByID 根据ID查找设备控制记录
func (r *deviceControlRepo) FindByID(id uint) (*model.DeviceControl, error) {
var control model.DeviceControl
result := r.db.First(&control, id)
if result.Error != nil {
return nil, result.Error
}
return &control, nil
}
// List 获取设备控制记录列表(分页)
func (r *deviceControlRepo) List(offset, limit int) ([]*model.DeviceControl, error) {
var controls []*model.DeviceControl
result := r.db.Offset(offset).Limit(limit).Order("created_at DESC").Find(&controls)
if result.Error != nil {
return nil, result.Error
}
return controls, nil
}

View File

@@ -1,205 +0,0 @@
package repository
import (
"sort"
"git.huangwc.com/pig/pig-farm-controller/internal/model"
"gorm.io/gorm"
)
// FeedPlanRepo 饲喂管理接口
type FeedPlanRepo interface {
// ListAllPlanIntroduction 获取所有计划简介
ListAllPlanIntroduction() ([]*model.FeedingPlan, error)
// FindFeedingPlanByID 根据ID获取计划详情
FindFeedingPlanByID(id uint) (*model.FeedingPlan, error)
// CreateFeedingPlan 创建饲料计划
CreateFeedingPlan(feedingPlan *model.FeedingPlan) error
// DeleteFeedingPlan 删除饲料计划及其所有子计划和步骤
DeleteFeedingPlan(id uint) error
// UpdateFeedingPlan 更新饲料计划,采用先删除再重新创建的方式
UpdateFeedingPlan(feedingPlan *model.FeedingPlan) error
}
type feedPlanRepo struct {
db *gorm.DB
}
func NewFeedPlanRepo(db *gorm.DB) FeedPlanRepo {
return &feedPlanRepo{
db: db,
}
}
// ListAllPlanIntroduction 获取所有计划简介
func (f *feedPlanRepo) ListAllPlanIntroduction() ([]*model.FeedingPlan, error) {
var plans []*model.FeedingPlan
err := f.db.Model(&model.FeedingPlan{}).
Select("id, name, description, type, enabled, schedule_cron").
Find(&plans).Error
return plans, err
}
// FindFeedingPlanByID 根据ID获取计划详情
func (f *feedPlanRepo) FindFeedingPlanByID(feedingPlanID uint) (*model.FeedingPlan, error) {
var plan model.FeedingPlan
err := f.db.Where("id = ?", feedingPlanID).
Preload("Steps").
Preload("SubPlans").
First(&plan).Error
if err != nil {
return nil, err
}
return &plan, nil
}
// CreateFeedingPlan 创建饲料计划,包括步骤和子计划
func (f *feedPlanRepo) CreateFeedingPlan(feedingPlan *model.FeedingPlan) error {
// 清空所有ID确保创建新记录
f.clearAllIDs(feedingPlan)
return f.db.Transaction(func(tx *gorm.DB) error {
return f.createFeedingPlanWithTx(tx, feedingPlan)
})
}
// UpdateFeedingPlan 更新饲料计划,采用先删除再重新创建的方式
func (f *feedPlanRepo) UpdateFeedingPlan(feedingPlan *model.FeedingPlan) error {
// 检查计划是否存在
_, err := f.FindFeedingPlanByID(feedingPlan.ID)
if err != nil {
return err
}
return f.db.Transaction(func(tx *gorm.DB) error {
// 先删除原有的计划
if err := f.deleteFeedingPlanWithTx(tx, feedingPlan.ID); err != nil {
return err
}
// 清空所有ID包括子计划和步骤的ID
f.clearAllIDs(feedingPlan)
// 再重新创建更新后的计划
if err := f.createFeedingPlanWithTx(tx, feedingPlan); err != nil {
return err
}
return nil
})
}
// DeleteFeedingPlan 删除饲料计划及其所有子计划和步骤
func (f *feedPlanRepo) DeleteFeedingPlan(id uint) error {
return f.db.Transaction(func(tx *gorm.DB) error {
// 递归删除计划及其所有子计划
if err := f.deleteFeedingPlanWithTx(tx, id); err != nil {
return err
}
return nil
})
}
// deleteFeedingPlanWithTx 在事务中递归删除饲料计划
func (f *feedPlanRepo) deleteFeedingPlanWithTx(tx *gorm.DB, id uint) error {
// 先查找计划及其子计划
var plan model.FeedingPlan
if err := tx.Where("id = ?", id).Preload("SubPlans").First(&plan).Error; err != nil {
return err
}
// 递归删除所有子计划
for _, subPlan := range plan.SubPlans {
if err := f.deleteFeedingPlanWithTx(tx, subPlan.ID); err != nil {
return err
}
}
// 删除该计划的所有步骤
if err := tx.Where("plan_id = ?", id).Delete(&model.FeedingPlanStep{}).Error; err != nil {
return err
}
// 删除计划本身
if err := tx.Delete(&model.FeedingPlan{}, id).Error; err != nil {
return err
}
return nil
}
// createFeedingPlanWithTx 在事务中递归创建饲料计划
func (f *feedPlanRepo) createFeedingPlanWithTx(tx *gorm.DB, feedingPlan *model.FeedingPlan) error {
// 先创建计划主体
if err := tx.Create(feedingPlan).Error; err != nil {
return err
}
// 处理步骤 - 先按现有顺序排序再重新分配从0开始的连续编号
sort.Slice(feedingPlan.Steps, func(i, j int) bool {
return feedingPlan.Steps[i].StepOrder < feedingPlan.Steps[j].StepOrder
})
// 重新填充步骤编号
for i := range feedingPlan.Steps {
feedingPlan.Steps[i].StepOrder = i
feedingPlan.Steps[i].PlanID = feedingPlan.ID
}
// 如果有步骤,批量创建步骤
if len(feedingPlan.Steps) > 0 {
if err := tx.Create(&feedingPlan.Steps).Error; err != nil {
return err
}
}
// 处理子计划 - 重新填充子计划编号和父ID
sort.Slice(feedingPlan.SubPlans, func(i, j int) bool {
// 如果OrderInParent为nil放在最后
if feedingPlan.SubPlans[i].OrderInParent == nil {
return false
}
if feedingPlan.SubPlans[j].OrderInParent == nil {
return true
}
return *feedingPlan.SubPlans[i].OrderInParent < *feedingPlan.SubPlans[j].OrderInParent
})
// 重新填充子计划编号和父ID
for i := range feedingPlan.SubPlans {
order := i
feedingPlan.SubPlans[i].OrderInParent = &order
feedingPlan.SubPlans[i].ParentID = &feedingPlan.ID
// 递归创建子计划
if err := f.createFeedingPlanWithTx(tx, &feedingPlan.SubPlans[i]); err != nil {
return err
}
}
return nil
}
// clearAllIDs 清空计划及其子计划和步骤的所有ID
func (f *feedPlanRepo) clearAllIDs(plan *model.FeedingPlan) {
// 清空计划ID
plan.ID = 0
// 清空所有步骤的ID和关联的计划ID
for i := range plan.Steps {
plan.Steps[i].ID = 0
plan.Steps[i].PlanID = 0
}
// 清空所有子计划的ID和关联的父计划ID并递归清空子计划的ID
for i := range plan.SubPlans {
plan.SubPlans[i].ID = 0
plan.SubPlans[i].ParentID = nil
f.clearAllIDs(&plan.SubPlans[i])
}
}

View File

@@ -1,84 +0,0 @@
// Package repository 提供数据访问层实现
// 包含各种数据实体的仓库接口和实现
package repository
import (
"git.huangwc.com/pig/pig-farm-controller/internal/model"
"gorm.io/gorm"
)
// OperationHistoryRepo 操作历史仓库接口
type OperationHistoryRepo interface {
// Create 创建操作历史记录
Create(history *model.OperationHistory) error
// FindByUserID 根据用户ID查找操作历史记录
FindByUserID(userID uint) ([]*model.OperationHistory, error)
// FindByID 根据ID查找操作历史记录
FindByID(id uint) (*model.OperationHistory, error)
// List 获取操作历史记录列表(分页)
List(offset, limit int) ([]*model.OperationHistory, error)
// ListByUserID 根据用户ID获取操作历史记录列表(分页)
ListByUserID(userID uint, offset, limit int) ([]*model.OperationHistory, error)
}
// operationHistoryRepo 操作历史仓库实现
type operationHistoryRepo struct {
db *gorm.DB
}
// NewOperationHistoryRepo 创建操作历史仓库实例
func NewOperationHistoryRepo(db *gorm.DB) OperationHistoryRepo {
return &operationHistoryRepo{
db: db,
}
}
// Create 创建操作历史记录
func (r *operationHistoryRepo) Create(history *model.OperationHistory) error {
result := r.db.Create(history)
return result.Error
}
// FindByUserID 根据用户ID查找操作历史记录
func (r *operationHistoryRepo) FindByUserID(userID uint) ([]*model.OperationHistory, error) {
var histories []*model.OperationHistory
result := r.db.Where("user_id = ?", userID).Order("created_at DESC").Find(&histories)
if result.Error != nil {
return nil, result.Error
}
return histories, nil
}
// FindByID 根据ID查找操作历史记录
func (r *operationHistoryRepo) FindByID(id uint) (*model.OperationHistory, error) {
var history model.OperationHistory
result := r.db.First(&history, id)
if result.Error != nil {
return nil, result.Error
}
return &history, nil
}
// List 获取操作历史记录列表(分页)
func (r *operationHistoryRepo) List(offset, limit int) ([]*model.OperationHistory, error) {
var histories []*model.OperationHistory
result := r.db.Offset(offset).Limit(limit).Order("created_at DESC").Find(&histories)
if result.Error != nil {
return nil, result.Error
}
return histories, nil
}
// ListByUserID 根据用户ID获取操作历史记录列表(分页)
func (r *operationHistoryRepo) ListByUserID(userID uint, offset, limit int) ([]*model.OperationHistory, error) {
var histories []*model.OperationHistory
result := r.db.Where("user_id = ?", userID).Offset(offset).Limit(limit).Order("created_at DESC").Find(&histories)
if result.Error != nil {
return nil, result.Error
}
return histories, nil
}

View File

@@ -1,84 +0,0 @@
// Package repository 提供数据访问层实现
// 包含各种数据实体的仓库接口和实现
package repository
import (
"fmt"
"git.huangwc.com/pig/pig-farm-controller/internal/model"
"golang.org/x/crypto/bcrypt"
"gorm.io/gorm"
)
// UserRepo 用户仓库接口
type UserRepo interface {
// CreateUser 创建新用户
CreateUser(username, password string) (*model.User, error)
// FindByUsername 根据用户名查找用户
FindByUsername(username string) (*model.User, error)
// FindByID 根据ID查找用户
FindByID(id uint) (*model.User, error)
}
// userRepo 用户仓库实现
type userRepo struct {
db *gorm.DB
}
// NewUserRepo 创建用户仓库实例
func NewUserRepo(db *gorm.DB) UserRepo {
return &userRepo{
db: db,
}
}
// CreateUser 创建新用户
func (r *userRepo) CreateUser(username, password string) (*model.User, error) {
// 检查用户是否已存在
var existingUser model.User
result := r.db.Where("username = ?", username).First(&existingUser)
if result.Error == nil {
return nil, fmt.Errorf("用户已存在")
}
// 对密码进行哈希处理
hashedPassword, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost)
if err != nil {
return nil, fmt.Errorf("密码加密失败: %v", err)
}
// 创建新用户
user := &model.User{
Username: username,
PasswordHash: string(hashedPassword),
}
result = r.db.Create(user)
if result.Error != nil {
return nil, fmt.Errorf("用户创建失败: %v", result.Error)
}
return user, nil
}
// FindByUsername 根据用户名查找用户
func (r *userRepo) FindByUsername(username string) (*model.User, error) {
var user model.User
result := r.db.Where("username = ?", username).First(&user)
if result.Error != nil {
return nil, result.Error
}
return &user, nil
}
// FindByID 根据ID查找用户
func (r *userRepo) FindByID(id uint) (*model.User, error) {
var user model.User
result := r.db.First(&user, id)
if result.Error != nil {
return nil, result.Error
}
return &user, nil
}

View File

@@ -1,129 +0,0 @@
# Task包使用说明
## 概述
Task包提供了一个基本的任务队列和执行框架用于管理、调度和执行各种控制任务。它支持并发执行、任务优先级和优雅的任务管理。
## 核心组件
### 1. Task接口
所有任务都需要实现Task接口包含以下方法
- `Execute() error` - 执行任务
- `GetID() string` - 获取任务ID
- `GetPriority() int` - 获取任务优先级
### 2. Executor执行器
负责管理任务队列和执行任务,支持并发执行。
### 3. TaskQueue任务队列
用于存储和管理待执行的任务。
## 使用方法
### 1. 实现任务
首先需要实现Task接口来创建自定义任务
```go
type MyTask struct {
id string
priority int
// 其他任务特定字段
}
func (t *MyTask) Execute() error {
// 实现任务逻辑
return nil
}
func (t *MyTask) GetID() string {
return t.id
}
func (t *MyTask) GetPriority() int {
return t.priority
}
```
### 2. 创建和启动执行器
```go
// 创建执行器,指定工作协程数量
executor := task.NewExecutor(5) // 5个工作协程
// 启动执行器
executor.Start()
```
### 3. 提交任务
```go
// 创建任务实例
myTask := NewMyTask("task-1", 1)
// 提交任务到执行器
executor.SubmitTask(myTask)
```
### 4. 停止执行器
```go
// 停止执行器(会等待所有正在执行的任务完成)
executor.Stop()
```
## 处理定时循环任务
对于定时循环任务,建议采用以下方式:
1. 使用`time.Ticker`定期创建任务
2. 将任务提交到执行器
```go
func RunScheduledTasks(executor *task.Executor) {
// 启动一个协程来定期提交定时任务
go func() {
ticker := time.NewTicker(30 * time.Second) // 每30秒执行一次
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 创建定时任务并提交
scheduledTask := NewScheduledTask("scheduled-task", 1)
executor.SubmitTask(scheduledTask)
case <-executor.ctx.Done():
return // 执行器已停止
}
}
}()
}
```
## 处理互不相关的独立任务
对于互不相关的独立任务,可以直接创建并提交:
```go
// 创建多个独立任务
task1 := NewIndependentTask("task-1", "data1", 1)
task2 := NewIndependentTask("task-2", "data2", 2)
task3 := NewIndependentTask("task-3", "data3", 1)
// 提交所有任务
executor.SubmitTask(task1)
executor.SubmitTask(task2)
executor.SubmitTask(task3)
```
## 最佳实践
1. **合理设置工作协程数量**:根据系统资源和任务特性设置适当的工作协程数量
2. **正确处理任务错误**在任务的Execute方法中正确处理和返回错误
3. **合理设置任务优先级**:重要的任务可以设置更高的优先级
4. **优雅关闭**使用Stop方法确保所有任务都能正确完成
5. **避免任务阻塞**:任务执行时间过长会阻塞工作协程
## 示例
请参考 [example_task.go](./example_task.go) 和 [usage_example.go](./usage_example.go) 文件获取完整的使用示例。

View File

@@ -24,9 +24,6 @@ type Task interface {
// GetPriority 获取任务优先级
GetPriority() int
// Done 返回一个channel当任务执行完毕时该channel会被关闭
Done() <-chan struct{}
// IsDone 检查任务是否已完成
IsDone() bool
}

View File

@@ -1,164 +0,0 @@
package mocks
// Package mocks 模拟测试包
import (
"git.huangwc.com/pig/pig-farm-controller/internal/model"
"github.com/stretchr/testify/mock"
)
// MockDeviceRepo 模拟设备仓库实现DeviceRepo接口
type MockDeviceRepo struct {
mock.Mock
}
// Create 模拟创建设备方法
func (m *MockDeviceRepo) Create(device *model.Device) error {
args := m.Called(device)
return args.Error(0)
}
// FindByID 模拟根据ID查找设备方法
func (m *MockDeviceRepo) FindByID(id uint) (*model.Device, error) {
args := m.Called(id)
// 返回第一个参数作为设备,第二个参数作为错误
device, ok := args.Get(0).(*model.Device)
if !ok {
return nil, args.Error(1)
}
return device, args.Error(1)
}
// FindByIDString 模拟根据ID字符串查找设备方法
func (m *MockDeviceRepo) FindByIDString(id string) (*model.Device, error) {
args := m.Called(id)
// 返回第一个参数作为设备,第二个参数作为错误
device, ok := args.Get(0).(*model.Device)
if !ok {
return nil, args.Error(1)
}
return device, args.Error(1)
}
// FindByParentID 模拟根据上级设备ID查找设备方法
func (m *MockDeviceRepo) FindByParentID(parentID uint) ([]*model.Device, error) {
args := m.Called(parentID)
// 返回第一个参数作为设备列表,第二个参数作为错误
devices, ok := args.Get(0).([]*model.Device)
if !ok {
return nil, args.Error(1)
}
return devices, args.Error(1)
}
// FindByType 模拟根据设备类型查找设备方法
func (m *MockDeviceRepo) FindByType(deviceType model.DeviceType) ([]*model.Device, error) {
args := m.Called(deviceType)
// 返回第一个参数作为设备列表,第二个参数作为错误
devices, ok := args.Get(0).([]*model.Device)
if !ok {
return nil, args.Error(1)
}
return devices, args.Error(1)
}
// Update 模拟更新设备信息方法
func (m *MockDeviceRepo) Update(device *model.Device) error {
args := m.Called(device)
return args.Error(0)
}
// Delete 模拟删除设备方法
func (m *MockDeviceRepo) Delete(id uint) error {
args := m.Called(id)
return args.Error(0)
}
// ListAll 模拟获取所有设备列表方法
func (m *MockDeviceRepo) ListAll() ([]model.Device, error) {
args := m.Called()
// 返回第一个参数作为设备列表,第二个参数作为错误
devices, ok := args.Get(0).([]model.Device)
if !ok {
return nil, args.Error(1)
}
return devices, args.Error(1)
}
// FindRelayDevices 模拟获取所有中继设备方法
func (m *MockDeviceRepo) FindRelayDevices() ([]*model.Device, error) {
args := m.Called()
// 返回第一个参数作为设备列表,第二个参数作为错误
devices, ok := args.Get(0).([]*model.Device)
if !ok {
return nil, args.Error(1)
}
return devices, args.Error(1)
}
// FindByDeviceID 模拟根据设备ID查找设备方法额外方法
func (m *MockDeviceRepo) FindByDeviceID(deviceID string) (*model.Device, error) {
args := m.Called(deviceID)
// 返回第一个参数作为设备,第二个参数作为错误
device, ok := args.Get(0).(*model.Device)
if !ok {
return nil, args.Error(1)
}
return device, args.Error(1)
}
// FindControllers 模拟查找控制器方法(额外方法)
func (m *MockDeviceRepo) FindControllers() ([]*model.Device, error) {
args := m.Called()
// 返回第一个参数作为设备列表,第二个参数作为错误
devices, ok := args.Get(0).([]*model.Device)
if !ok {
return nil, args.Error(1)
}
return devices, args.Error(1)
}
// FindRelays 模拟查找中继设备方法(额外方法)
func (m *MockDeviceRepo) FindRelays() ([]*model.Device, error) {
args := m.Called()
// 返回第一个参数作为设备列表,第二个参数作为错误
devices, ok := args.Get(0).([]*model.Device)
if !ok {
return nil, args.Error(1)
}
return devices, args.Error(1)
}
// FindDevicesByType 模拟根据类型查找设备方法(额外方法)
func (m *MockDeviceRepo) FindDevicesByType(deviceType string) ([]*model.Device, error) {
args := m.Called(deviceType)
// 返回第一个参数作为设备列表,第二个参数作为错误
devices, ok := args.Get(0).([]*model.Device)
if !ok {
return nil, args.Error(1)
}
return devices, args.Error(1)
}
// FindRelayDevices 模拟根据中继ID查找设备方法额外方法
func (m *MockDeviceRepo) FindRelayDevicesByID(relayID uint) ([]*model.Device, error) {
args := m.Called(relayID)
// 返回第一个参数作为设备列表,第二个参数作为错误
devices, ok := args.Get(0).([]*model.Device)
if !ok {
return nil, args.Error(1)
}
return devices, args.Error(1)
}
// UpdateDeviceStatus 模拟更新设备状态方法(额外方法)
func (m *MockDeviceRepo) UpdateDeviceStatus(id uint, active bool) error {
args := m.Called(id, active)
return args.Error(0)
}
// GetDeviceStatus 模拟获取设备状态方法(额外方法)
func (m *MockDeviceRepo) GetDeviceStatus(id uint) (bool, error) {
args := m.Called(id)
return args.Bool(0), args.Error(1)
}

View File

@@ -1,209 +0,0 @@
// Package websocket 提供WebSocket通信功能
// 实现中继设备与平台之间的实时通信
package websocket
import (
"fmt"
"net/http"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
"github.com/gorilla/websocket"
)
// Message WebSocket消息结构
type Message struct {
DeviceID string `json:"device_id"`
Type string `json:"type"`
Data interface{} `json:"data"`
Timestamp time.Time `json:"timestamp"`
}
// Hub WebSocket中心管理所有客户端连接
type Hub struct {
// 注册客户端的通道
register chan *Client
// 注销客户端的通道
unregister chan *Client
// 当前活跃的客户端映射
clients map[*Client]bool
// 广播消息通道
broadcast chan Message
// 设备ID到客户端的映射
deviceClients map[string]*Client
// 日志记录器
logger *logs.Logger
// 互斥锁保护映射
mutex sync.RWMutex
// deviceRepo 设备仓库
deviceRepo repository.DeviceRepo
// 关闭消息
close chan struct{}
}
// Client WebSocket客户端结构
type Client struct {
hub *Hub
// WebSocket连接
conn *websocket.Conn
// 发送缓冲区
send chan Message
// 设备ID
DeviceID string
// HTTP请求
Request *http.Request
// 日志记录器
logger *logs.Logger
}
// NewHub 创建新的WebSocket中心实例
func NewHub(deviceRepo repository.DeviceRepo) *Hub {
return &Hub{
register: make(chan *Client),
unregister: make(chan *Client),
clients: make(map[*Client]bool),
broadcast: make(chan Message),
deviceClients: make(map[string]*Client),
logger: logs.NewLogger(),
deviceRepo: deviceRepo,
close: make(chan struct{}),
}
}
// getDeviceDisplayName 获取设备显示名称
func (h *Hub) getDeviceDisplayName(deviceID string) string {
if h.deviceRepo != nil {
if device, err := h.deviceRepo.FindByIDString(deviceID); err == nil && device != nil {
return fmt.Sprintf("%s(id:%s)", device.Name, deviceID)
}
}
return fmt.Sprintf("未知设备(id:%s)", deviceID)
}
// Run 启动WebSocket中心
func (h *Hub) Run() {
for {
select {
case client := <-h.register:
h.registerClient(client)
case client := <-h.unregister:
h.unregisterClient(client)
case message := <-h.broadcast:
h.broadcastMessage(message)
case <-h.close:
return
}
}
}
func (h *Hub) Close() {
// 关闭时清理所有资源
for client := range h.clients {
h.unregisterClient(client)
}
close(h.close)
}
// registerClient 注册客户端
func (h *Hub) registerClient(client *Client) {
h.mutex.Lock()
defer h.mutex.Unlock()
h.clients[client] = true
if client.DeviceID != "" {
h.deviceClients[client.DeviceID] = client
}
deviceName := h.getDeviceDisplayName(client.DeviceID)
h.logger.Info("[WebSocket] 客户端 " + deviceName + " 已注册,当前客户端数: " + fmt.Sprintf("%d", len(h.clients)))
}
// unregisterClient 注销客户端
func (h *Hub) unregisterClient(client *Client) {
h.mutex.Lock()
defer h.mutex.Unlock()
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
if client.DeviceID != "" {
delete(h.deviceClients, client.DeviceID)
}
close(client.send)
}
deviceName := h.getDeviceDisplayName(client.DeviceID)
h.logger.Info("[WebSocket] 客户端 " + deviceName + " 已注销,当前客户端数: " + fmt.Sprintf("%d", len(h.clients)))
}
// broadcastMessage 广播消息
func (h *Hub) broadcastMessage(message Message) {
h.mutex.RLock()
defer h.mutex.RUnlock()
if client, exists := h.deviceClients[message.DeviceID]; exists {
select {
case client.send <- message:
default:
close(client.send)
delete(h.clients, client)
delete(h.deviceClients, message.DeviceID)
}
}
}
// SendToDevice 向指定设备发送消息
func (h *Hub) SendToDevice(deviceID string, msgType string, data interface{}) error {
h.mutex.RLock()
defer h.mutex.RUnlock()
deviceName := h.getDeviceDisplayName(deviceID)
if client, exists := h.deviceClients[deviceID]; exists {
message := Message{
DeviceID: deviceID,
Type: msgType,
Data: data,
Timestamp: time.Now(),
}
select {
case client.send <- message:
h.logger.Info(fmt.Sprintf("[WebSocket] 向设备 %s 发送消息: %s", deviceName, msgType))
return nil
default:
h.logger.Error(fmt.Sprintf("[WebSocket] 设备 %s 消息通道已满", deviceName))
return fmt.Errorf("设备 %s 消息通道已满", deviceName)
}
}
h.logger.Warn(fmt.Sprintf("[WebSocket] 设备 %s 未连接", deviceName))
return fmt.Errorf("设备 %s 未连接", deviceName)
}
// GetConnectedDevices 获取已连接的设备列表
func (h *Hub) GetConnectedDevices() []string {
h.mutex.RLock()
defer h.mutex.RUnlock()
devices := make([]string, 0, len(h.deviceClients))
for deviceID := range h.deviceClients {
devices = append(devices, deviceID)
}
return devices
}

View File

@@ -1,394 +0,0 @@
// Package websocket 提供WebSocket通信功能
// 实现中继设备和平台之间的双向通信
package websocket
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
// WebSocket消息类型常量
const (
// MessageTypeCommand 平台向设备发送的指令
MessageTypeCommand = "command"
// MessageTypeResponse 设备向平台发送的响应
MessageTypeResponse = "response"
// MessageTypeHeartbeat 心跳消息
MessageTypeHeartbeat = "heartbeat"
)
// WebSocketMessage WebSocket消息结构
type WebSocketMessage struct {
// Type 消息类型
Type string `json:"type"`
// DeviceID 设备ID
DeviceID string `json:"device_id,omitempty"`
// Command 指令内容
Command string `json:"command,omitempty"`
// Data 消息数据
Data interface{} `json:"data,omitempty"`
// Timestamp 时间戳
Timestamp time.Time `json:"timestamp"`
}
// DeviceConnection 设备连接信息
type DeviceConnection struct {
// DeviceID 设备ID
DeviceID string
// Connection WebSocket连接
Connection *websocket.Conn
// LastHeartbeat 最后心跳时间
LastHeartbeat time.Time
// ResponseChan 响应通道
ResponseChan chan *WebSocketMessage
}
// CommandResponse WebSocket命令响应结构体
type CommandResponse struct {
// DeviceID 设备ID
DeviceID string `json:"device_id,omitempty"`
// Command 命令名称
Command string `json:"command,omitempty"`
// Data 响应数据
Data interface{} `json:"data,omitempty"`
// Status 响应状态
Status string `json:"status,omitempty"`
// Message 响应消息
Message string `json:"message,omitempty"`
// Timestamp 时间戳
Timestamp time.Time `json:"timestamp"`
}
// ParseData 将响应数据解析到目标结构体
func (cr *CommandResponse) ParseData(target interface{}) error {
dataBytes, err := json.Marshal(cr.Data)
if err != nil {
return err
}
return json.Unmarshal(dataBytes, target)
}
// CommandResult WebSocket命令执行结果
type CommandResult struct {
// Response 响应消息
Response *CommandResponse
// Error 错误信息
Error error
}
// Manager WebSocket管理器
type Manager struct {
// connections 设备连接映射
connections map[string]*DeviceConnection
// mutex 互斥锁
mutex sync.RWMutex
// logger 日志记录器
logger *logs.Logger
// upgrader WebSocket升级器
upgrader websocket.Upgrader
// defaultTimeout 默认超时时间(秒)
defaultTimeout int
// deviceRepo 设备仓库
deviceRepo repository.DeviceRepo
}
// NewManager 创建WebSocket管理器实例
func NewManager(deviceRepo repository.DeviceRepo) *Manager {
return &Manager{
connections: make(map[string]*DeviceConnection),
logger: logs.NewLogger(),
defaultTimeout: 5, // 默认5秒超时
deviceRepo: deviceRepo,
upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
// 允许所有跨域请求
return true
},
},
}
}
// SetDefaultTimeout 设置默认超时时间
func (wm *Manager) SetDefaultTimeout(timeout int) {
wm.defaultTimeout = timeout
}
// getDeviceDisplayName 获取设备显示名称
func (wm *Manager) getDeviceDisplayName(deviceID string) string {
if wm.deviceRepo != nil {
if device, err := wm.deviceRepo.FindByIDString(deviceID); err == nil && device != nil {
return fmt.Sprintf("%s(id:%s)", device.Name, deviceID)
}
}
return fmt.Sprintf("未知设备(id:%s)", deviceID)
}
// AddConnection 添加设备连接
func (wm *Manager) AddConnection(deviceID string, conn *websocket.Conn) {
wm.mutex.Lock()
defer wm.mutex.Unlock()
wm.connections[deviceID] = &DeviceConnection{
DeviceID: deviceID,
Connection: conn,
LastHeartbeat: time.Now(),
}
deviceName := wm.getDeviceDisplayName(deviceID)
wm.logger.Info(fmt.Sprintf("设备 %s 已连接", deviceName))
}
// RemoveConnection 移除设备连接
func (wm *Manager) RemoveConnection(deviceID string) {
wm.mutex.Lock()
defer wm.mutex.Unlock()
deviceName := wm.getDeviceDisplayName(deviceID)
delete(wm.connections, deviceID)
wm.logger.Info(fmt.Sprintf("设备 %s 已断开连接", deviceName))
}
// SetResponseHandler 设置响应处理器
func (wm *Manager) SetResponseHandler(deviceID string, responseChan chan *WebSocketMessage) {
wm.mutex.Lock()
defer wm.mutex.Unlock()
if deviceConn, exists := wm.connections[deviceID]; exists {
deviceConn.ResponseChan = responseChan
}
}
// SendCommand 向指定设备发送指令
func (wm *Manager) SendCommand(deviceID, command string, data interface{}) error {
wm.mutex.RLock()
deviceConn, exists := wm.connections[deviceID]
wm.mutex.RUnlock()
deviceName := wm.getDeviceDisplayName(deviceID)
if !exists {
return fmt.Errorf("设备 %s 未连接", deviceName)
}
// 构造消息
msg := WebSocketMessage{
Type: MessageTypeCommand,
Command: command,
Data: data,
Timestamp: time.Now(),
}
// 发送消息
if err := deviceConn.Connection.WriteJSON(msg); err != nil {
return fmt.Errorf("向设备 %s 发送指令失败: %v", deviceName, err)
}
return nil
}
// SendCommandAndWait 发送指令并等待响应
func (wm *Manager) SendCommandAndWait(deviceID, command string, data interface{}, timeout int) (*CommandResponse, error) {
deviceName := wm.getDeviceDisplayName(deviceID)
// 如果未指定超时时间,使用默认超时时间
if timeout <= 0 {
timeout = wm.defaultTimeout
}
// 创建用于接收响应的通道
responseChan := make(chan *WebSocketMessage, 1)
wm.SetResponseHandler(deviceID, responseChan)
// 发送指令
if err := wm.SendCommand(deviceID, command, data); err != nil {
return nil, fmt.Errorf("发送指令失败: %v", err)
}
// 等待设备响应,设置超时
var response *WebSocketMessage
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
defer cancel()
select {
case response = <-responseChan:
// 成功接收到响应
// 转换为CommandResponse结构体
commandResponse := &CommandResponse{
DeviceID: response.DeviceID,
Command: response.Command,
Data: response.Data,
Timestamp: response.Timestamp,
}
// 尝试提取状态和消息字段
if responseData, ok := response.Data.(map[string]interface{}); ok {
if status, exists := responseData["status"]; exists {
if statusStr, ok := status.(string); ok {
commandResponse.Status = statusStr
}
}
if message, exists := responseData["message"]; exists {
if messageStr, ok := message.(string); ok {
commandResponse.Message = messageStr
}
}
}
return commandResponse, nil
case <-ctx.Done():
// 超时处理
return nil, fmt.Errorf("等待设备 %s 响应超时", deviceName)
}
}
// GetConnectedDevices 获取已连接的设备列表
func (wm *Manager) GetConnectedDevices() []string {
wm.mutex.RLock()
defer wm.mutex.RUnlock()
devices := make([]string, 0, len(wm.connections))
for deviceID := range wm.connections {
devices = append(devices, deviceID)
}
return devices
}
// HandleMessage 处理来自设备的消息
func (wm *Manager) HandleMessage(deviceID string, message []byte) error {
// 解析消息
var msg WebSocketMessage
if err := json.Unmarshal(message, &msg); err != nil {
return fmt.Errorf("解析设备 %s 消息失败: %v", wm.getDeviceDisplayName(deviceID), err)
}
// 更新心跳时间
if msg.Type == MessageTypeHeartbeat {
wm.mutex.Lock()
if deviceConn, exists := wm.connections[deviceID]; exists {
deviceConn.LastHeartbeat = time.Now()
}
wm.mutex.Unlock()
}
// 处理响应消息
if msg.Type == MessageTypeResponse {
wm.mutex.RLock()
if deviceConn, exists := wm.connections[deviceID]; exists && deviceConn.ResponseChan != nil {
// 发送响应到通道
select {
case deviceConn.ResponseChan <- &msg:
// 成功发送
default:
// 通道已满,丢弃消息
wm.logger.Warn(fmt.Sprintf("设备 %s 的响应通道已满,丢弃响应消息", wm.getDeviceDisplayName(deviceID)))
}
}
wm.mutex.RUnlock()
}
// 记录消息日志
wm.logger.Info(fmt.Sprintf("收到来自设备 %s 的消息: %v", wm.getDeviceDisplayName(deviceID), msg))
return nil
}
// GetDeviceConnection 获取设备连接信息
func (wm *Manager) GetDeviceConnection(deviceID string) (*DeviceConnection, bool) {
wm.mutex.RLock()
defer wm.mutex.RUnlock()
deviceConn, exists := wm.connections[deviceID]
return deviceConn, exists
}
// HandleConnection 处理WebSocket连接
func (wm *Manager) HandleConnection(c *gin.Context) {
// 升级HTTP连接到WebSocket
conn, err := wm.upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
wm.logger.Error(fmt.Sprintf("WebSocket连接升级失败: %v", err))
return
}
// 获取设备ID
deviceID := c.Query("device_id")
if deviceID == "" {
wm.logger.Error("缺少设备ID参数")
conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "缺少设备ID参数"))
conn.Close()
return
}
// 添加连接
wm.AddConnection(deviceID, conn)
deviceName := wm.getDeviceDisplayName(deviceID)
wm.logger.Info("设备 " + deviceName + " 已连接")
// 发送连接成功消息
successMsg := WebSocketMessage{
Type: "system",
Command: "connected",
Timestamp: time.Now(),
}
conn.WriteJSON(successMsg)
// 处理消息循环
for {
// 读取消息
messageType, message, err := conn.ReadMessage()
if err != nil {
wm.logger.Error(fmt.Sprintf("读取设备 %s 消息失败: %v", deviceName, err))
break
}
// 只处理文本消息
if messageType != websocket.TextMessage {
continue
}
// 处理设备消息
if err := wm.HandleMessage(deviceID, message); err != nil {
wm.logger.Error(fmt.Sprintf("处理设备 %s 消息失败: %v", deviceName, err))
continue
}
}
// 连接断开时清理
wm.RemoveConnection(deviceID)
conn.Close()
wm.logger.Info("设备 " + deviceName + " 已断开连接")
}

View File

@@ -1,214 +0,0 @@
// Package websocket 提供WebSocket通信功能
// 实现中继设备与平台之间的实时通信
package websocket
import (
"encoding/json"
"fmt"
"net/http"
"time"
"git.huangwc.com/pig/pig-farm-controller/internal/logs"
"git.huangwc.com/pig/pig-farm-controller/internal/storage/repository"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
// Server WebSocket服务器结构
type Server struct {
hub *Hub
logger *logs.Logger
deviceRepo repository.DeviceRepo
}
const (
// 允许写入的最长时间
writeWait = 10 * time.Second
// 允许读取的最长时间
pongWait = 60 * time.Second
// 发送ping消息的周期
pingPeriod = (pongWait * 9) / 10
// 发送队列的最大容量
maxMessageSize = 512
)
var (
newline = []byte{'\n'}
space = []byte{' '}
)
// Upgrader WebSocket升级器
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
// 允许所有来源的连接(在生产环境中应该更严格)
return true
},
}
// NewServer 创建新的WebSocket服务器实例
func NewServer(deviceRepo repository.DeviceRepo) *Server {
return &Server{
hub: NewHub(deviceRepo),
logger: logs.NewLogger(),
deviceRepo: deviceRepo,
}
}
// getDeviceDisplayName 获取设备显示名称
func (s *Server) getDeviceDisplayName(deviceID string) string {
if s.deviceRepo != nil {
if device, err := s.deviceRepo.FindByIDString(deviceID); err == nil && device != nil {
return fmt.Sprintf("%s(id:%s)", device.Name, deviceID)
}
}
return fmt.Sprintf("未知设备(id:%s)", deviceID)
}
// Start 启动WebSocket服务器
func (s *Server) Start() {
// 启动hub
go s.hub.Run()
}
func (s *Server) Stop() {
s.hub.Close()
}
// readPump 从WebSocket连接读取消息
func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
var msg Message
err := c.conn.ReadJSON(&msg)
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
c.logger.Error("[WebSocket] 读取错误: " + err.Error())
}
break
}
// 处理收到的消息
c.hub.broadcast <- msg
}
}
// writePump 向WebSocket连接写入消息
func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// hub关闭了send通道
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
// 将消息序列化为JSON
data, err := json.Marshal(message)
if err != nil {
c.logger.Error("[WebSocket] 消息序列化失败: " + err.Error())
continue
}
w.Write(data)
// 添加队列中的其他消息
n := len(c.send)
for i := 0; i < n; i++ {
msg := <-c.send
data, err := json.Marshal(msg)
if err != nil {
c.logger.Error("[WebSocket] 消息序列化失败: " + err.Error())
continue
}
w.Write(newline)
w.Write(data)
}
if err := w.Close(); err != nil {
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
// HandleConnection 处理WebSocket连接请求
func (s *Server) HandleConnection(c *gin.Context) {
// 升级HTTP连接为WebSocket连接
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
s.logger.Error("[WebSocket] 连接升级失败: " + err.Error())
return
}
// 从查询参数获取设备ID
deviceID := c.Query("device_id")
if deviceID == "" {
s.logger.Warn("[WebSocket] 缺少设备ID参数")
conn.Close()
return
}
// 创建客户端
client := &Client{
hub: s.hub,
conn: conn,
send: make(chan Message, 256),
DeviceID: deviceID,
Request: c.Request,
logger: s.logger,
}
// 注册客户端
client.hub.register <- client
// 启动读写goroutine
go client.writePump()
go client.readPump()
deviceName := s.getDeviceDisplayName(deviceID)
s.logger.Info("[WebSocket] 设备 " + deviceName + " 连接成功")
}
// SendToDevice 向指定设备发送消息
func (s *Server) SendToDevice(deviceID string, msgType string, data interface{}) error {
return s.hub.SendToDevice(deviceID, msgType, data)
}
// GetConnectedDevices 获取已连接的设备列表
func (s *Server) GetConnectedDevices() []string {
return s.hub.GetConnectedDevices()
}