diff options
Diffstat (limited to 'web/websocket')
| -rw-r--r-- | web/websocket/hub.go | 96 | ||||
| -rw-r--r-- | web/websocket/notifier.go | 21 |
2 files changed, 80 insertions, 37 deletions
diff --git a/web/websocket/hub.go b/web/websocket/hub.go index 8aa5903c..1455d1fa 100644 --- a/web/websocket/hub.go +++ b/web/websocket/hub.go @@ -21,6 +21,7 @@ const ( MessageTypeNotification MessageType = "notification" // System notification MessageTypeXrayState MessageType = "xray_state" // Xray state change MessageTypeOutbounds MessageType = "outbounds" // Outbounds list update + MessageTypeInvalidate MessageType = "invalidate" // Lightweight signal telling frontend to re-fetch data via REST ) // Message represents a WebSocket message @@ -32,10 +33,11 @@ type Message struct { // Client represents a WebSocket client connection type Client struct { - ID string - Send chan []byte - Hub *Hub - Topics map[MessageType]bool // Subscribed topics + ID string + Send chan []byte + Hub *Hub + Topics map[MessageType]bool // Subscribed topics + closeOnce sync.Once // Ensures Send channel is closed exactly once } // Hub maintains the set of active clients and broadcasts messages to them @@ -61,7 +63,6 @@ type Hub struct { // Worker pool for parallel broadcasting workerPoolSize int - broadcastWg sync.WaitGroup } // NewHub creates a new WebSocket hub @@ -104,20 +105,12 @@ func (h *Hub) Run() { // Graceful shutdown: close all clients h.mu.Lock() for client := range h.clients { - // Safely close channel (avoid double close panic) - select { - case _, stillOpen := <-client.Send: - if stillOpen { - close(client.Send) - } - default: + client.closeOnce.Do(func() { close(client.Send) - } + }) } h.clients = make(map[*Client]bool) h.mu.Unlock() - // Wait for all broadcast workers to finish - h.broadcastWg.Wait() logger.Info("WebSocket hub stopped gracefully") return @@ -138,19 +131,9 @@ func (h *Hub) Run() { h.mu.Lock() if _, ok := h.clients[client]; ok { delete(h.clients, client) - // Safely close channel (avoid double close panic) - // Check if channel is already closed by trying to read from it - select { - case _, stillOpen := <-client.Send: - if stillOpen { - // Channel was open and had data, now it's empty, safe to close - close(client.Send) - } - // If stillOpen is false, channel was already closed, do nothing - default: - // Channel is empty and open, safe to close + client.closeOnce.Do(func() { close(client.Send) - } + }) } count := len(h.clients) h.mu.Unlock() @@ -220,11 +203,12 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) { } close(clientChan) - // Start workers for parallel processing - h.broadcastWg.Add(h.workerPoolSize) + // Use a local WaitGroup to avoid blocking hub shutdown + var wg sync.WaitGroup + wg.Add(h.workerPoolSize) for i := 0; i < h.workerPoolSize; i++ { go func() { - defer h.broadcastWg.Done() + defer wg.Done() for client := range clientChan { func() { defer func() { @@ -246,7 +230,7 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) { } // Wait for all workers to finish - h.broadcastWg.Wait() + wg.Wait() } // Broadcast sends a message to all connected clients @@ -259,6 +243,11 @@ func (h *Hub) Broadcast(messageType MessageType, payload any) { return } + // Skip all work if no clients are connected + if h.GetClientCount() == 0 { + return + } + msg := Message{ Type: messageType, Payload: payload, @@ -271,10 +260,12 @@ func (h *Hub) Broadcast(messageType MessageType, payload any) { return } - // Limit message size to prevent memory issues - const maxMessageSize = 1024 * 1024 // 1MB + // If message exceeds size limit, send a lightweight invalidate notification + // instead of dropping it entirely — the frontend will re-fetch via REST API + const maxMessageSize = 10 * 1024 * 1024 // 10MB if len(data) > maxMessageSize { - logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data)) + logger.Debugf("WebSocket message too large (%d bytes) for type %s, sending invalidate signal", len(data), messageType) + h.broadcastInvalidate(messageType) return } @@ -298,6 +289,11 @@ func (h *Hub) BroadcastToTopic(messageType MessageType, payload any) { return } + // Skip all work if no clients are connected + if h.GetClientCount() == 0 { + return + } + msg := Message{ Type: messageType, Payload: payload, @@ -310,10 +306,11 @@ func (h *Hub) BroadcastToTopic(messageType MessageType, payload any) { return } - // Limit message size to prevent memory issues - const maxMessageSize = 1024 * 1024 // 1MB + // If message exceeds size limit, send a lightweight invalidate notification + const maxMessageSize = 10 * 1024 * 1024 // 10MB if len(data) > maxMessageSize { - logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data)) + logger.Debugf("WebSocket message too large (%d bytes) for type %s, sending invalidate signal", len(data), messageType) + h.broadcastInvalidate(messageType) return } @@ -374,6 +371,31 @@ func (h *Hub) Stop() { } } +// broadcastInvalidate sends a lightweight invalidate message to all clients, +// telling them to re-fetch the specified data type via REST API. +// This is used when the full payload exceeds the WebSocket message size limit. +func (h *Hub) broadcastInvalidate(originalType MessageType) { + msg := Message{ + Type: MessageTypeInvalidate, + Payload: map[string]string{"type": string(originalType)}, + Time: getCurrentTimestamp(), + } + + data, err := json.Marshal(msg) + if err != nil { + logger.Error("Failed to marshal invalidate message:", err) + return + } + + // Non-blocking send with timeout + select { + case h.broadcast <- data: + case <-time.After(100 * time.Millisecond): + logger.Warning("WebSocket broadcast channel is full, dropping invalidate message") + case <-h.ctx.Done(): + } +} + // getCurrentTimestamp returns current Unix timestamp in milliseconds func getCurrentTimestamp() int64 { return time.Now().UnixMilli() diff --git a/web/websocket/notifier.go b/web/websocket/notifier.go index 74cf61b2..2db78578 100644 --- a/web/websocket/notifier.go +++ b/web/websocket/notifier.go @@ -24,6 +24,16 @@ func GetHub() *Hub { return wsHub } +// HasClients returns true if there are any WebSocket clients connected. +// Use this to skip expensive work (DB queries, serialization) when no browser is open. +func HasClients() bool { + hub := GetHub() + if hub == nil { + return false + } + return hub.GetClientCount() > 0 +} + // BroadcastStatus broadcasts server status update to all connected clients func BroadcastStatus(status any) { hub := GetHub() @@ -80,3 +90,14 @@ func BroadcastXrayState(state string, errorMsg string) { hub.Broadcast(MessageTypeXrayState, stateUpdate) } } + +// BroadcastInvalidate sends a lightweight invalidate signal for the given data type, +// telling connected frontends to re-fetch data via REST API. +// Use this instead of BroadcastInbounds/BroadcastOutbounds when you know the payload +// will be too large, to avoid wasting resources on serialization. +func BroadcastInvalidate(dataType MessageType) { + hub := GetHub() + if hub != nil { + hub.broadcastInvalidate(dataType) + } +} |
