Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/MHSanaei/3x-ui.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'web/controller')
-rw-r--r--web/controller/inbound.go12
-rw-r--r--web/controller/server.go17
-rw-r--r--web/controller/websocket.go189
3 files changed, 218 insertions, 0 deletions
diff --git a/web/controller/inbound.go b/web/controller/inbound.go
index eeb160d6..8317de31 100644
--- a/web/controller/inbound.go
+++ b/web/controller/inbound.go
@@ -8,6 +8,7 @@ import (
"github.com/mhsanaei/3x-ui/v2/database/model"
"github.com/mhsanaei/3x-ui/v2/web/service"
"github.com/mhsanaei/3x-ui/v2/web/session"
+ "github.com/mhsanaei/3x-ui/v2/web/websocket"
"github.com/gin-gonic/gin"
)
@@ -125,6 +126,9 @@ func (a *InboundController) addInbound(c *gin.Context) {
if needRestart {
a.xrayService.SetToNeedRestart()
}
+ // Broadcast inbounds update via WebSocket
+ inbounds, _ := a.inboundService.GetInbounds(user.Id)
+ websocket.BroadcastInbounds(inbounds)
}
// delInbound deletes an inbound configuration by its ID.
@@ -143,6 +147,10 @@ func (a *InboundController) delInbound(c *gin.Context) {
if needRestart {
a.xrayService.SetToNeedRestart()
}
+ // Broadcast inbounds update via WebSocket
+ user := session.GetLoginUser(c)
+ inbounds, _ := a.inboundService.GetInbounds(user.Id)
+ websocket.BroadcastInbounds(inbounds)
}
// updateInbound updates an existing inbound configuration.
@@ -169,6 +177,10 @@ func (a *InboundController) updateInbound(c *gin.Context) {
if needRestart {
a.xrayService.SetToNeedRestart()
}
+ // Broadcast inbounds update via WebSocket
+ user := session.GetLoginUser(c)
+ inbounds, _ := a.inboundService.GetInbounds(user.Id)
+ websocket.BroadcastInbounds(inbounds)
}
// getClientIps retrieves the IP addresses associated with a client by email.
diff --git a/web/controller/server.go b/web/controller/server.go
index 292ef338..5b39700e 100644
--- a/web/controller/server.go
+++ b/web/controller/server.go
@@ -9,6 +9,7 @@ import (
"github.com/mhsanaei/3x-ui/v2/web/global"
"github.com/mhsanaei/3x-ui/v2/web/service"
+ "github.com/mhsanaei/3x-ui/v2/web/websocket"
"github.com/gin-gonic/gin"
)
@@ -67,6 +68,8 @@ func (a *ServerController) refreshStatus() {
// collect cpu history when status is fresh
if a.lastStatus != nil {
a.serverService.AppendCpuSample(time.Now(), a.lastStatus.Cpu)
+ // Broadcast status update via WebSocket
+ websocket.BroadcastStatus(a.lastStatus)
}
}
@@ -155,9 +158,16 @@ func (a *ServerController) stopXrayService(c *gin.Context) {
err := a.serverService.StopXrayService()
if err != nil {
jsonMsg(c, I18nWeb(c, "pages.xray.stopError"), err)
+ websocket.BroadcastXrayState("error", err.Error())
return
}
jsonMsg(c, I18nWeb(c, "pages.xray.stopSuccess"), err)
+ websocket.BroadcastXrayState("stop", "")
+ websocket.BroadcastNotification(
+ I18nWeb(c, "pages.xray.stopSuccess"),
+ "Xray service has been stopped",
+ "warning",
+ )
}
// restartXrayService restarts the Xray service.
@@ -165,9 +175,16 @@ func (a *ServerController) restartXrayService(c *gin.Context) {
err := a.serverService.RestartXrayService()
if err != nil {
jsonMsg(c, I18nWeb(c, "pages.xray.restartError"), err)
+ websocket.BroadcastXrayState("error", err.Error())
return
}
jsonMsg(c, I18nWeb(c, "pages.xray.restartSuccess"), err)
+ websocket.BroadcastXrayState("running", "")
+ websocket.BroadcastNotification(
+ I18nWeb(c, "pages.xray.restartSuccess"),
+ "Xray service has been restarted successfully",
+ "success",
+ )
}
// getLogs retrieves the application logs based on count, level, and syslog filters.
diff --git a/web/controller/websocket.go b/web/controller/websocket.go
new file mode 100644
index 00000000..0ad5c845
--- /dev/null
+++ b/web/controller/websocket.go
@@ -0,0 +1,189 @@
+package controller
+
+import (
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/mhsanaei/3x-ui/v2/logger"
+ "github.com/mhsanaei/3x-ui/v2/util/common"
+ "github.com/mhsanaei/3x-ui/v2/web/session"
+ "github.com/mhsanaei/3x-ui/v2/web/websocket"
+
+ "github.com/gin-gonic/gin"
+ ws "github.com/gorilla/websocket"
+)
+
+const (
+ // Time allowed to write a message to the peer
+ writeWait = 10 * time.Second
+
+ // Time allowed to read the next pong message from the peer
+ pongWait = 60 * time.Second
+
+ // Send pings to peer with this period (must be less than pongWait)
+ pingPeriod = (pongWait * 9) / 10
+
+ // Maximum message size allowed from peer
+ maxMessageSize = 512
+)
+
+var upgrader = ws.Upgrader{
+ ReadBufferSize: 4096, // Increased from 1024 for better performance
+ WriteBufferSize: 4096, // Increased from 1024 for better performance
+ CheckOrigin: func(r *http.Request) bool {
+ // Check origin for security
+ origin := r.Header.Get("Origin")
+ if origin == "" {
+ // Allow connections without Origin header (same-origin requests)
+ return true
+ }
+ // Get the host from the request
+ host := r.Host
+ // Extract scheme and host from origin
+ originURL := origin
+ // Simple check: origin should match the request host
+ // This prevents cross-origin WebSocket hijacking
+ if strings.HasPrefix(originURL, "http://") || strings.HasPrefix(originURL, "https://") {
+ // Extract host from origin
+ originHost := strings.TrimPrefix(strings.TrimPrefix(originURL, "http://"), "https://")
+ if idx := strings.Index(originHost, "/"); idx != -1 {
+ originHost = originHost[:idx]
+ }
+ if idx := strings.Index(originHost, ":"); idx != -1 {
+ originHost = originHost[:idx]
+ }
+ // Compare hosts (without port)
+ requestHost := host
+ if idx := strings.Index(requestHost, ":"); idx != -1 {
+ requestHost = requestHost[:idx]
+ }
+ return originHost == requestHost || originHost == "" || requestHost == ""
+ }
+ return false
+ },
+}
+
+// WebSocketController handles WebSocket connections for real-time updates
+type WebSocketController struct {
+ BaseController
+ hub *websocket.Hub
+}
+
+// NewWebSocketController creates a new WebSocket controller
+func NewWebSocketController(hub *websocket.Hub) *WebSocketController {
+ return &WebSocketController{
+ hub: hub,
+ }
+}
+
+// HandleWebSocket handles WebSocket connections
+func (w *WebSocketController) HandleWebSocket(c *gin.Context) {
+ // Check authentication
+ if !session.IsLogin(c) {
+ logger.Warningf("Unauthorized WebSocket connection attempt from %s", getRemoteIp(c))
+ c.AbortWithStatus(http.StatusUnauthorized)
+ return
+ }
+
+ // Upgrade connection to WebSocket
+ conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
+ if err != nil {
+ logger.Error("Failed to upgrade WebSocket connection:", err)
+ return
+ }
+
+ // Create client
+ clientID := uuid.New().String()
+ client := &websocket.Client{
+ ID: clientID,
+ Hub: w.hub,
+ Send: make(chan []byte, 512), // Increased from 256 to 512 to prevent overflow
+ Topics: make(map[websocket.MessageType]bool),
+ }
+
+ // Register client
+ w.hub.Register(client)
+ logger.Debugf("WebSocket client %s registered from %s", clientID, getRemoteIp(c))
+
+ // Start goroutines for reading and writing
+ go w.writePump(client, conn)
+ go w.readPump(client, conn)
+}
+
+// readPump pumps messages from the WebSocket connection to the hub
+func (w *WebSocketController) readPump(client *websocket.Client, conn *ws.Conn) {
+ defer func() {
+ if r := common.Recover("WebSocket readPump panic"); r != nil {
+ logger.Error("WebSocket readPump panic recovered:", r)
+ }
+ w.hub.Unregister(client)
+ conn.Close()
+ }()
+
+ conn.SetReadDeadline(time.Now().Add(pongWait))
+ conn.SetPongHandler(func(string) error {
+ conn.SetReadDeadline(time.Now().Add(pongWait))
+ return nil
+ })
+ conn.SetReadLimit(maxMessageSize)
+
+ for {
+ _, message, err := conn.ReadMessage()
+ if err != nil {
+ if ws.IsUnexpectedCloseError(err, ws.CloseGoingAway, ws.CloseAbnormalClosure) {
+ logger.Debugf("WebSocket read error for client %s: %v", client.ID, err)
+ }
+ break
+ }
+
+ // Validate message size
+ if len(message) > maxMessageSize {
+ logger.Warningf("WebSocket message from client %s exceeds max size: %d bytes", client.ID, len(message))
+ continue
+ }
+
+ // Handle incoming messages (e.g., subscription requests)
+ // For now, we'll just log them
+ logger.Debugf("Received WebSocket message from client %s: %s", client.ID, string(message))
+ }
+}
+
+// writePump pumps messages from the hub to the WebSocket connection
+func (w *WebSocketController) writePump(client *websocket.Client, conn *ws.Conn) {
+ ticker := time.NewTicker(pingPeriod)
+ defer func() {
+ if r := common.Recover("WebSocket writePump panic"); r != nil {
+ logger.Error("WebSocket writePump panic recovered:", r)
+ }
+ ticker.Stop()
+ conn.Close()
+ }()
+
+ for {
+ select {
+ case message, ok := <-client.Send:
+ conn.SetWriteDeadline(time.Now().Add(writeWait))
+ if !ok {
+ // Hub closed the channel
+ conn.WriteMessage(ws.CloseMessage, []byte{})
+ return
+ }
+
+ // Send each message individually (no batching)
+ // This ensures each JSON message is sent separately and can be parsed correctly
+ if err := conn.WriteMessage(ws.TextMessage, message); err != nil {
+ logger.Debugf("WebSocket write error for client %s: %v", client.ID, err)
+ return
+ }
+
+ case <-ticker.C:
+ conn.SetWriteDeadline(time.Now().Add(writeWait))
+ if err := conn.WriteMessage(ws.PingMessage, nil); err != nil {
+ logger.Debugf("WebSocket ping error for client %s: %v", client.ID, err)
+ return
+ }
+ }
+ }
+}