diff options
| author | lolka1333 <xtrafcyz@gmail.com> | 2026-04-19 22:01:00 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-04-19 22:01:00 +0300 |
| commit | fec714a2431c482024a0952982fa36f38935e7ed (patch) | |
| tree | cb2e206b375a725623a0c0b18a22785d44037d67 /web/websocket | |
| parent | e02f78ac68e96066288c5da0c38e293160b23143 (diff) | |
fix: enhance WebSocket stability, resolve XHTTP configurations and fix UI loading shifts (#3997)
* feat: implement real-time traffic monitoring and UI updates using a high-performance WebSocket hub and background job system
* feat: add bulk client management support and improve inbound data handling
* Fix bug
* **Fixes & Changes:**
1. **Fixed XPadding Placement Dropdown**:
- Added the missing `cookie` and `query` options to `xPaddingPlacement` (`stream_xhttp.html`).
- *Why:* Previously, users wanting `cookie` obfuscation were forced to use the `header` placement string. This caused Xray-core to blindly intercept the entire monolithic HTTP Cookie header, failing internal padding-length validations and causing the inbound to silently drop the connection.
2. **Fixed Uplink Data Placement Validation**:
- Replaced the unsupported `query` option with `cookie` in `uplinkDataPlacement`.
- *Why:* Xray-core's `transport_internet.go` explicitly forbids `query` as an uplink placement option. Selecting it from the UI previously sent a payload that would cause Xray-core to instantly throw an `unsupported uplink data placement: query` panic. Adding `cookie` perfectly aligns the UI with Xray-core restrictions.
### Related Issues
- Resolves #3992
* This commit fixes structural payload issues preventing XHTTP from functioning correctly and eliminates WebSocket log spam.
- **[Fix X-Padding UI]** Added missing `cookie` and `query` options to X-Padding Placement. Fixes the issue where using Cookie fallback triggers whole HTTP Cookie header interception and silent drop in Xray-core. (Resolves [#3992](https://github.com/MHSanaei/3x-ui/issues/3992))
- **[Fix Uplink Data Options]** Replaced the invalid `query` option with `cookie` in Uplink Data Placement dropdown to prevent Xray-core backend panic `unsupported uplink data placement: query`.
- **[Fix WebSockets Spam]** Boosted `maxMessageSize` boundary to 100MB and gracefully handled fallback fetch signals via `broadcastInvalidate` to avoid buffer dropping spam. (Resolves [#3984](https://github.com/MHSanaei/3x-ui/issues/3984))
* Fix
* gofmt
* fix(websocket): resolve channel race condition and graceful shutdown deadlock
* Fix: inbounds switch
* Change max quantity from 10000 to 500
* fix
Diffstat (limited to 'web/websocket')
| -rw-r--r-- | web/websocket/hub.go | 96 | ||||
| -rw-r--r-- | web/websocket/notifier.go | 21 |
2 files changed, 80 insertions, 37 deletions
diff --git a/web/websocket/hub.go b/web/websocket/hub.go index 8aa5903c..1455d1fa 100644 --- a/web/websocket/hub.go +++ b/web/websocket/hub.go @@ -21,6 +21,7 @@ const ( MessageTypeNotification MessageType = "notification" // System notification MessageTypeXrayState MessageType = "xray_state" // Xray state change MessageTypeOutbounds MessageType = "outbounds" // Outbounds list update + MessageTypeInvalidate MessageType = "invalidate" // Lightweight signal telling frontend to re-fetch data via REST ) // Message represents a WebSocket message @@ -32,10 +33,11 @@ type Message struct { // Client represents a WebSocket client connection type Client struct { - ID string - Send chan []byte - Hub *Hub - Topics map[MessageType]bool // Subscribed topics + ID string + Send chan []byte + Hub *Hub + Topics map[MessageType]bool // Subscribed topics + closeOnce sync.Once // Ensures Send channel is closed exactly once } // Hub maintains the set of active clients and broadcasts messages to them @@ -61,7 +63,6 @@ type Hub struct { // Worker pool for parallel broadcasting workerPoolSize int - broadcastWg sync.WaitGroup } // NewHub creates a new WebSocket hub @@ -104,20 +105,12 @@ func (h *Hub) Run() { // Graceful shutdown: close all clients h.mu.Lock() for client := range h.clients { - // Safely close channel (avoid double close panic) - select { - case _, stillOpen := <-client.Send: - if stillOpen { - close(client.Send) - } - default: + client.closeOnce.Do(func() { close(client.Send) - } + }) } h.clients = make(map[*Client]bool) h.mu.Unlock() - // Wait for all broadcast workers to finish - h.broadcastWg.Wait() logger.Info("WebSocket hub stopped gracefully") return @@ -138,19 +131,9 @@ func (h *Hub) Run() { h.mu.Lock() if _, ok := h.clients[client]; ok { delete(h.clients, client) - // Safely close channel (avoid double close panic) - // Check if channel is already closed by trying to read from it - select { - case _, stillOpen := <-client.Send: - if stillOpen { - // Channel was open and had data, now it's empty, safe to close - close(client.Send) - } - // If stillOpen is false, channel was already closed, do nothing - default: - // Channel is empty and open, safe to close + client.closeOnce.Do(func() { close(client.Send) - } + }) } count := len(h.clients) h.mu.Unlock() @@ -220,11 +203,12 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) { } close(clientChan) - // Start workers for parallel processing - h.broadcastWg.Add(h.workerPoolSize) + // Use a local WaitGroup to avoid blocking hub shutdown + var wg sync.WaitGroup + wg.Add(h.workerPoolSize) for i := 0; i < h.workerPoolSize; i++ { go func() { - defer h.broadcastWg.Done() + defer wg.Done() for client := range clientChan { func() { defer func() { @@ -246,7 +230,7 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) { } // Wait for all workers to finish - h.broadcastWg.Wait() + wg.Wait() } // Broadcast sends a message to all connected clients @@ -259,6 +243,11 @@ func (h *Hub) Broadcast(messageType MessageType, payload any) { return } + // Skip all work if no clients are connected + if h.GetClientCount() == 0 { + return + } + msg := Message{ Type: messageType, Payload: payload, @@ -271,10 +260,12 @@ func (h *Hub) Broadcast(messageType MessageType, payload any) { return } - // Limit message size to prevent memory issues - const maxMessageSize = 1024 * 1024 // 1MB + // If message exceeds size limit, send a lightweight invalidate notification + // instead of dropping it entirely — the frontend will re-fetch via REST API + const maxMessageSize = 10 * 1024 * 1024 // 10MB if len(data) > maxMessageSize { - logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data)) + logger.Debugf("WebSocket message too large (%d bytes) for type %s, sending invalidate signal", len(data), messageType) + h.broadcastInvalidate(messageType) return } @@ -298,6 +289,11 @@ func (h *Hub) BroadcastToTopic(messageType MessageType, payload any) { return } + // Skip all work if no clients are connected + if h.GetClientCount() == 0 { + return + } + msg := Message{ Type: messageType, Payload: payload, @@ -310,10 +306,11 @@ func (h *Hub) BroadcastToTopic(messageType MessageType, payload any) { return } - // Limit message size to prevent memory issues - const maxMessageSize = 1024 * 1024 // 1MB + // If message exceeds size limit, send a lightweight invalidate notification + const maxMessageSize = 10 * 1024 * 1024 // 10MB if len(data) > maxMessageSize { - logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data)) + logger.Debugf("WebSocket message too large (%d bytes) for type %s, sending invalidate signal", len(data), messageType) + h.broadcastInvalidate(messageType) return } @@ -374,6 +371,31 @@ func (h *Hub) Stop() { } } +// broadcastInvalidate sends a lightweight invalidate message to all clients, +// telling them to re-fetch the specified data type via REST API. +// This is used when the full payload exceeds the WebSocket message size limit. +func (h *Hub) broadcastInvalidate(originalType MessageType) { + msg := Message{ + Type: MessageTypeInvalidate, + Payload: map[string]string{"type": string(originalType)}, + Time: getCurrentTimestamp(), + } + + data, err := json.Marshal(msg) + if err != nil { + logger.Error("Failed to marshal invalidate message:", err) + return + } + + // Non-blocking send with timeout + select { + case h.broadcast <- data: + case <-time.After(100 * time.Millisecond): + logger.Warning("WebSocket broadcast channel is full, dropping invalidate message") + case <-h.ctx.Done(): + } +} + // getCurrentTimestamp returns current Unix timestamp in milliseconds func getCurrentTimestamp() int64 { return time.Now().UnixMilli() diff --git a/web/websocket/notifier.go b/web/websocket/notifier.go index 74cf61b2..2db78578 100644 --- a/web/websocket/notifier.go +++ b/web/websocket/notifier.go @@ -24,6 +24,16 @@ func GetHub() *Hub { return wsHub } +// HasClients returns true if there are any WebSocket clients connected. +// Use this to skip expensive work (DB queries, serialization) when no browser is open. +func HasClients() bool { + hub := GetHub() + if hub == nil { + return false + } + return hub.GetClientCount() > 0 +} + // BroadcastStatus broadcasts server status update to all connected clients func BroadcastStatus(status any) { hub := GetHub() @@ -80,3 +90,14 @@ func BroadcastXrayState(state string, errorMsg string) { hub.Broadcast(MessageTypeXrayState, stateUpdate) } } + +// BroadcastInvalidate sends a lightweight invalidate signal for the given data type, +// telling connected frontends to re-fetch data via REST API. +// Use this instead of BroadcastInbounds/BroadcastOutbounds when you know the payload +// will be too large, to avoid wasting resources on serialization. +func BroadcastInvalidate(dataType MessageType) { + hub := GetHub() + if hub != nil { + hub.broadcastInvalidate(dataType) + } +} |
