Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/MHSanaei/3x-ui.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlolka1333 <xtrafcyz@gmail.com>2026-04-19 22:01:00 +0300
committerGitHub <noreply@github.com>2026-04-19 22:01:00 +0300
commitfec714a2431c482024a0952982fa36f38935e7ed (patch)
treecb2e206b375a725623a0c0b18a22785d44037d67 /web/websocket
parente02f78ac68e96066288c5da0c38e293160b23143 (diff)
fix: enhance WebSocket stability, resolve XHTTP configurations and fix UI loading shifts (#3997)
* feat: implement real-time traffic monitoring and UI updates using a high-performance WebSocket hub and background job system * feat: add bulk client management support and improve inbound data handling * Fix bug * **Fixes & Changes:** 1. **Fixed XPadding Placement Dropdown**: - Added the missing `cookie` and `query` options to `xPaddingPlacement` (`stream_xhttp.html`). - *Why:* Previously, users wanting `cookie` obfuscation were forced to use the `header` placement string. This caused Xray-core to blindly intercept the entire monolithic HTTP Cookie header, failing internal padding-length validations and causing the inbound to silently drop the connection. 2. **Fixed Uplink Data Placement Validation**: - Replaced the unsupported `query` option with `cookie` in `uplinkDataPlacement`. - *Why:* Xray-core's `transport_internet.go` explicitly forbids `query` as an uplink placement option. Selecting it from the UI previously sent a payload that would cause Xray-core to instantly throw an `unsupported uplink data placement: query` panic. Adding `cookie` perfectly aligns the UI with Xray-core restrictions. ### Related Issues - Resolves #3992 * This commit fixes structural payload issues preventing XHTTP from functioning correctly and eliminates WebSocket log spam. - **[Fix X-Padding UI]** Added missing `cookie` and `query` options to X-Padding Placement. Fixes the issue where using Cookie fallback triggers whole HTTP Cookie header interception and silent drop in Xray-core. (Resolves [#3992](https://github.com/MHSanaei/3x-ui/issues/3992)) - **[Fix Uplink Data Options]** Replaced the invalid `query` option with `cookie` in Uplink Data Placement dropdown to prevent Xray-core backend panic `unsupported uplink data placement: query`. - **[Fix WebSockets Spam]** Boosted `maxMessageSize` boundary to 100MB and gracefully handled fallback fetch signals via `broadcastInvalidate` to avoid buffer dropping spam. (Resolves [#3984](https://github.com/MHSanaei/3x-ui/issues/3984)) * Fix * gofmt * fix(websocket): resolve channel race condition and graceful shutdown deadlock * Fix: inbounds switch * Change max quantity from 10000 to 500 * fix
Diffstat (limited to 'web/websocket')
-rw-r--r--web/websocket/hub.go96
-rw-r--r--web/websocket/notifier.go21
2 files changed, 80 insertions, 37 deletions
diff --git a/web/websocket/hub.go b/web/websocket/hub.go
index 8aa5903c..1455d1fa 100644
--- a/web/websocket/hub.go
+++ b/web/websocket/hub.go
@@ -21,6 +21,7 @@ const (
MessageTypeNotification MessageType = "notification" // System notification
MessageTypeXrayState MessageType = "xray_state" // Xray state change
MessageTypeOutbounds MessageType = "outbounds" // Outbounds list update
+ MessageTypeInvalidate MessageType = "invalidate" // Lightweight signal telling frontend to re-fetch data via REST
)
// Message represents a WebSocket message
@@ -32,10 +33,11 @@ type Message struct {
// Client represents a WebSocket client connection
type Client struct {
- ID string
- Send chan []byte
- Hub *Hub
- Topics map[MessageType]bool // Subscribed topics
+ ID string
+ Send chan []byte
+ Hub *Hub
+ Topics map[MessageType]bool // Subscribed topics
+ closeOnce sync.Once // Ensures Send channel is closed exactly once
}
// Hub maintains the set of active clients and broadcasts messages to them
@@ -61,7 +63,6 @@ type Hub struct {
// Worker pool for parallel broadcasting
workerPoolSize int
- broadcastWg sync.WaitGroup
}
// NewHub creates a new WebSocket hub
@@ -104,20 +105,12 @@ func (h *Hub) Run() {
// Graceful shutdown: close all clients
h.mu.Lock()
for client := range h.clients {
- // Safely close channel (avoid double close panic)
- select {
- case _, stillOpen := <-client.Send:
- if stillOpen {
- close(client.Send)
- }
- default:
+ client.closeOnce.Do(func() {
close(client.Send)
- }
+ })
}
h.clients = make(map[*Client]bool)
h.mu.Unlock()
- // Wait for all broadcast workers to finish
- h.broadcastWg.Wait()
logger.Info("WebSocket hub stopped gracefully")
return
@@ -138,19 +131,9 @@ func (h *Hub) Run() {
h.mu.Lock()
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
- // Safely close channel (avoid double close panic)
- // Check if channel is already closed by trying to read from it
- select {
- case _, stillOpen := <-client.Send:
- if stillOpen {
- // Channel was open and had data, now it's empty, safe to close
- close(client.Send)
- }
- // If stillOpen is false, channel was already closed, do nothing
- default:
- // Channel is empty and open, safe to close
+ client.closeOnce.Do(func() {
close(client.Send)
- }
+ })
}
count := len(h.clients)
h.mu.Unlock()
@@ -220,11 +203,12 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) {
}
close(clientChan)
- // Start workers for parallel processing
- h.broadcastWg.Add(h.workerPoolSize)
+ // Use a local WaitGroup to avoid blocking hub shutdown
+ var wg sync.WaitGroup
+ wg.Add(h.workerPoolSize)
for i := 0; i < h.workerPoolSize; i++ {
go func() {
- defer h.broadcastWg.Done()
+ defer wg.Done()
for client := range clientChan {
func() {
defer func() {
@@ -246,7 +230,7 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) {
}
// Wait for all workers to finish
- h.broadcastWg.Wait()
+ wg.Wait()
}
// Broadcast sends a message to all connected clients
@@ -259,6 +243,11 @@ func (h *Hub) Broadcast(messageType MessageType, payload any) {
return
}
+ // Skip all work if no clients are connected
+ if h.GetClientCount() == 0 {
+ return
+ }
+
msg := Message{
Type: messageType,
Payload: payload,
@@ -271,10 +260,12 @@ func (h *Hub) Broadcast(messageType MessageType, payload any) {
return
}
- // Limit message size to prevent memory issues
- const maxMessageSize = 1024 * 1024 // 1MB
+ // If message exceeds size limit, send a lightweight invalidate notification
+ // instead of dropping it entirely — the frontend will re-fetch via REST API
+ const maxMessageSize = 10 * 1024 * 1024 // 10MB
if len(data) > maxMessageSize {
- logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data))
+ logger.Debugf("WebSocket message too large (%d bytes) for type %s, sending invalidate signal", len(data), messageType)
+ h.broadcastInvalidate(messageType)
return
}
@@ -298,6 +289,11 @@ func (h *Hub) BroadcastToTopic(messageType MessageType, payload any) {
return
}
+ // Skip all work if no clients are connected
+ if h.GetClientCount() == 0 {
+ return
+ }
+
msg := Message{
Type: messageType,
Payload: payload,
@@ -310,10 +306,11 @@ func (h *Hub) BroadcastToTopic(messageType MessageType, payload any) {
return
}
- // Limit message size to prevent memory issues
- const maxMessageSize = 1024 * 1024 // 1MB
+ // If message exceeds size limit, send a lightweight invalidate notification
+ const maxMessageSize = 10 * 1024 * 1024 // 10MB
if len(data) > maxMessageSize {
- logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data))
+ logger.Debugf("WebSocket message too large (%d bytes) for type %s, sending invalidate signal", len(data), messageType)
+ h.broadcastInvalidate(messageType)
return
}
@@ -374,6 +371,31 @@ func (h *Hub) Stop() {
}
}
+// broadcastInvalidate sends a lightweight invalidate message to all clients,
+// telling them to re-fetch the specified data type via REST API.
+// This is used when the full payload exceeds the WebSocket message size limit.
+func (h *Hub) broadcastInvalidate(originalType MessageType) {
+ msg := Message{
+ Type: MessageTypeInvalidate,
+ Payload: map[string]string{"type": string(originalType)},
+ Time: getCurrentTimestamp(),
+ }
+
+ data, err := json.Marshal(msg)
+ if err != nil {
+ logger.Error("Failed to marshal invalidate message:", err)
+ return
+ }
+
+ // Non-blocking send with timeout
+ select {
+ case h.broadcast <- data:
+ case <-time.After(100 * time.Millisecond):
+ logger.Warning("WebSocket broadcast channel is full, dropping invalidate message")
+ case <-h.ctx.Done():
+ }
+}
+
// getCurrentTimestamp returns current Unix timestamp in milliseconds
func getCurrentTimestamp() int64 {
return time.Now().UnixMilli()
diff --git a/web/websocket/notifier.go b/web/websocket/notifier.go
index 74cf61b2..2db78578 100644
--- a/web/websocket/notifier.go
+++ b/web/websocket/notifier.go
@@ -24,6 +24,16 @@ func GetHub() *Hub {
return wsHub
}
+// HasClients returns true if there are any WebSocket clients connected.
+// Use this to skip expensive work (DB queries, serialization) when no browser is open.
+func HasClients() bool {
+ hub := GetHub()
+ if hub == nil {
+ return false
+ }
+ return hub.GetClientCount() > 0
+}
+
// BroadcastStatus broadcasts server status update to all connected clients
func BroadcastStatus(status any) {
hub := GetHub()
@@ -80,3 +90,14 @@ func BroadcastXrayState(state string, errorMsg string) {
hub.Broadcast(MessageTypeXrayState, stateUpdate)
}
}
+
+// BroadcastInvalidate sends a lightweight invalidate signal for the given data type,
+// telling connected frontends to re-fetch data via REST API.
+// Use this instead of BroadcastInbounds/BroadcastOutbounds when you know the payload
+// will be too large, to avoid wasting resources on serialization.
+func BroadcastInvalidate(dataType MessageType) {
+ hub := GetHub()
+ if hub != nil {
+ hub.broadcastInvalidate(dataType)
+ }
+}