diff options
Diffstat (limited to 'web/controller')
| -rw-r--r-- | web/controller/inbound.go | 12 | ||||
| -rw-r--r-- | web/controller/server.go | 17 | ||||
| -rw-r--r-- | web/controller/websocket.go | 189 |
3 files changed, 218 insertions, 0 deletions
diff --git a/web/controller/inbound.go b/web/controller/inbound.go index eeb160d6..8317de31 100644 --- a/web/controller/inbound.go +++ b/web/controller/inbound.go @@ -8,6 +8,7 @@ import ( "github.com/mhsanaei/3x-ui/v2/database/model" "github.com/mhsanaei/3x-ui/v2/web/service" "github.com/mhsanaei/3x-ui/v2/web/session" + "github.com/mhsanaei/3x-ui/v2/web/websocket" "github.com/gin-gonic/gin" ) @@ -125,6 +126,9 @@ func (a *InboundController) addInbound(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } + // Broadcast inbounds update via WebSocket + inbounds, _ := a.inboundService.GetInbounds(user.Id) + websocket.BroadcastInbounds(inbounds) } // delInbound deletes an inbound configuration by its ID. @@ -143,6 +147,10 @@ func (a *InboundController) delInbound(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } + // Broadcast inbounds update via WebSocket + user := session.GetLoginUser(c) + inbounds, _ := a.inboundService.GetInbounds(user.Id) + websocket.BroadcastInbounds(inbounds) } // updateInbound updates an existing inbound configuration. @@ -169,6 +177,10 @@ func (a *InboundController) updateInbound(c *gin.Context) { if needRestart { a.xrayService.SetToNeedRestart() } + // Broadcast inbounds update via WebSocket + user := session.GetLoginUser(c) + inbounds, _ := a.inboundService.GetInbounds(user.Id) + websocket.BroadcastInbounds(inbounds) } // getClientIps retrieves the IP addresses associated with a client by email. diff --git a/web/controller/server.go b/web/controller/server.go index 292ef338..5b39700e 100644 --- a/web/controller/server.go +++ b/web/controller/server.go @@ -9,6 +9,7 @@ import ( "github.com/mhsanaei/3x-ui/v2/web/global" "github.com/mhsanaei/3x-ui/v2/web/service" + "github.com/mhsanaei/3x-ui/v2/web/websocket" "github.com/gin-gonic/gin" ) @@ -67,6 +68,8 @@ func (a *ServerController) refreshStatus() { // collect cpu history when status is fresh if a.lastStatus != nil { a.serverService.AppendCpuSample(time.Now(), a.lastStatus.Cpu) + // Broadcast status update via WebSocket + websocket.BroadcastStatus(a.lastStatus) } } @@ -155,9 +158,16 @@ func (a *ServerController) stopXrayService(c *gin.Context) { err := a.serverService.StopXrayService() if err != nil { jsonMsg(c, I18nWeb(c, "pages.xray.stopError"), err) + websocket.BroadcastXrayState("error", err.Error()) return } jsonMsg(c, I18nWeb(c, "pages.xray.stopSuccess"), err) + websocket.BroadcastXrayState("stop", "") + websocket.BroadcastNotification( + I18nWeb(c, "pages.xray.stopSuccess"), + "Xray service has been stopped", + "warning", + ) } // restartXrayService restarts the Xray service. @@ -165,9 +175,16 @@ func (a *ServerController) restartXrayService(c *gin.Context) { err := a.serverService.RestartXrayService() if err != nil { jsonMsg(c, I18nWeb(c, "pages.xray.restartError"), err) + websocket.BroadcastXrayState("error", err.Error()) return } jsonMsg(c, I18nWeb(c, "pages.xray.restartSuccess"), err) + websocket.BroadcastXrayState("running", "") + websocket.BroadcastNotification( + I18nWeb(c, "pages.xray.restartSuccess"), + "Xray service has been restarted successfully", + "success", + ) } // getLogs retrieves the application logs based on count, level, and syslog filters. diff --git a/web/controller/websocket.go b/web/controller/websocket.go new file mode 100644 index 00000000..0ad5c845 --- /dev/null +++ b/web/controller/websocket.go @@ -0,0 +1,189 @@ +package controller + +import ( + "net/http" + "strings" + "time" + + "github.com/google/uuid" + "github.com/mhsanaei/3x-ui/v2/logger" + "github.com/mhsanaei/3x-ui/v2/util/common" + "github.com/mhsanaei/3x-ui/v2/web/session" + "github.com/mhsanaei/3x-ui/v2/web/websocket" + + "github.com/gin-gonic/gin" + ws "github.com/gorilla/websocket" +) + +const ( + // Time allowed to write a message to the peer + writeWait = 10 * time.Second + + // Time allowed to read the next pong message from the peer + pongWait = 60 * time.Second + + // Send pings to peer with this period (must be less than pongWait) + pingPeriod = (pongWait * 9) / 10 + + // Maximum message size allowed from peer + maxMessageSize = 512 +) + +var upgrader = ws.Upgrader{ + ReadBufferSize: 4096, // Increased from 1024 for better performance + WriteBufferSize: 4096, // Increased from 1024 for better performance + CheckOrigin: func(r *http.Request) bool { + // Check origin for security + origin := r.Header.Get("Origin") + if origin == "" { + // Allow connections without Origin header (same-origin requests) + return true + } + // Get the host from the request + host := r.Host + // Extract scheme and host from origin + originURL := origin + // Simple check: origin should match the request host + // This prevents cross-origin WebSocket hijacking + if strings.HasPrefix(originURL, "http://") || strings.HasPrefix(originURL, "https://") { + // Extract host from origin + originHost := strings.TrimPrefix(strings.TrimPrefix(originURL, "http://"), "https://") + if idx := strings.Index(originHost, "/"); idx != -1 { + originHost = originHost[:idx] + } + if idx := strings.Index(originHost, ":"); idx != -1 { + originHost = originHost[:idx] + } + // Compare hosts (without port) + requestHost := host + if idx := strings.Index(requestHost, ":"); idx != -1 { + requestHost = requestHost[:idx] + } + return originHost == requestHost || originHost == "" || requestHost == "" + } + return false + }, +} + +// WebSocketController handles WebSocket connections for real-time updates +type WebSocketController struct { + BaseController + hub *websocket.Hub +} + +// NewWebSocketController creates a new WebSocket controller +func NewWebSocketController(hub *websocket.Hub) *WebSocketController { + return &WebSocketController{ + hub: hub, + } +} + +// HandleWebSocket handles WebSocket connections +func (w *WebSocketController) HandleWebSocket(c *gin.Context) { + // Check authentication + if !session.IsLogin(c) { + logger.Warningf("Unauthorized WebSocket connection attempt from %s", getRemoteIp(c)) + c.AbortWithStatus(http.StatusUnauthorized) + return + } + + // Upgrade connection to WebSocket + conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + logger.Error("Failed to upgrade WebSocket connection:", err) + return + } + + // Create client + clientID := uuid.New().String() + client := &websocket.Client{ + ID: clientID, + Hub: w.hub, + Send: make(chan []byte, 512), // Increased from 256 to 512 to prevent overflow + Topics: make(map[websocket.MessageType]bool), + } + + // Register client + w.hub.Register(client) + logger.Debugf("WebSocket client %s registered from %s", clientID, getRemoteIp(c)) + + // Start goroutines for reading and writing + go w.writePump(client, conn) + go w.readPump(client, conn) +} + +// readPump pumps messages from the WebSocket connection to the hub +func (w *WebSocketController) readPump(client *websocket.Client, conn *ws.Conn) { + defer func() { + if r := common.Recover("WebSocket readPump panic"); r != nil { + logger.Error("WebSocket readPump panic recovered:", r) + } + w.hub.Unregister(client) + conn.Close() + }() + + conn.SetReadDeadline(time.Now().Add(pongWait)) + conn.SetPongHandler(func(string) error { + conn.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) + conn.SetReadLimit(maxMessageSize) + + for { + _, message, err := conn.ReadMessage() + if err != nil { + if ws.IsUnexpectedCloseError(err, ws.CloseGoingAway, ws.CloseAbnormalClosure) { + logger.Debugf("WebSocket read error for client %s: %v", client.ID, err) + } + break + } + + // Validate message size + if len(message) > maxMessageSize { + logger.Warningf("WebSocket message from client %s exceeds max size: %d bytes", client.ID, len(message)) + continue + } + + // Handle incoming messages (e.g., subscription requests) + // For now, we'll just log them + logger.Debugf("Received WebSocket message from client %s: %s", client.ID, string(message)) + } +} + +// writePump pumps messages from the hub to the WebSocket connection +func (w *WebSocketController) writePump(client *websocket.Client, conn *ws.Conn) { + ticker := time.NewTicker(pingPeriod) + defer func() { + if r := common.Recover("WebSocket writePump panic"); r != nil { + logger.Error("WebSocket writePump panic recovered:", r) + } + ticker.Stop() + conn.Close() + }() + + for { + select { + case message, ok := <-client.Send: + conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + // Hub closed the channel + conn.WriteMessage(ws.CloseMessage, []byte{}) + return + } + + // Send each message individually (no batching) + // This ensures each JSON message is sent separately and can be parsed correctly + if err := conn.WriteMessage(ws.TextMessage, message); err != nil { + logger.Debugf("WebSocket write error for client %s: %v", client.ID, err) + return + } + + case <-ticker.C: + conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := conn.WriteMessage(ws.PingMessage, nil); err != nil { + logger.Debugf("WebSocket ping error for client %s: %v", client.ID, err) + return + } + } + } +} |
