diff options
| author | lolka1333 <xtrafcyz@gmail.com> | 2026-01-03 07:26:00 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-01-03 07:26:00 +0300 |
| commit | 313a2acbf66125feb4b145a5636351ed03e666da (patch) | |
| tree | 6be6fac0ced2d0dce60ba55e2feaa83c257ed720 /web/websocket/hub.go | |
| parent | b7477302112b43a2ae037b63994c59e85f9c0687 (diff) | |
feat: Add WebSocket support for real-time updates and enhance VLESS settings (#3605)
* feat: add support for trusted X-Forwarded-For and testseed parameters in VLESS settings
* chore: update Xray Core version to 25.12.8 in release workflow
* chore: update Xray Core version to 25.12.8 in Docker initialization script
* chore: bump version to 2.8.6 and add watcher for security changes in inbound modal
* refactor: remove default and random seed buttons from outbound form
* refactor: update VLESS form to rename 'Test Seed' to 'Vision Seed' and change button functionality for seed generation
* refactor: enhance TLS settings form layout with improved button styling and spacing
* feat: integrate WebSocket support for real-time updates on inbounds and Xray service status
* chore: downgrade version to 2.8.5
* refactor: translate comments to English
* fix: ensure testseed is initialized correctly for VLESS protocol and improve client handling in inbound modal
* refactor: simplify VLESS divider condition by removing unnecessary flow checks
* fix: add fallback date formatting for cases when IntlUtil is not available
* refactor: simplify WebSocket message handling by removing batching and ensuring individual message delivery
* refactor: disable WebSocket notifications in inbound and index HTML files
* refactor: enhance VLESS testseed initialization and button functionality in inbound modal
* fix:
* refactor: ensure proper WebSocket URL construction by normalizing basePath
* fix:
* fix:
* fix:
* refactor: update testseed methods for improved reactivity and binding in VLESS form
* logger info to debug
---------
Co-authored-by: lolka1333 <test123@gmail.com>
Diffstat (limited to 'web/websocket/hub.go')
| -rw-r--r-- | web/websocket/hub.go | 379 |
1 files changed, 379 insertions, 0 deletions
diff --git a/web/websocket/hub.go b/web/websocket/hub.go new file mode 100644 index 00000000..187ebb6f --- /dev/null +++ b/web/websocket/hub.go @@ -0,0 +1,379 @@ +// Package websocket provides WebSocket hub for real-time updates and notifications. +package websocket + +import ( + "context" + "encoding/json" + "runtime" + "sync" + "time" + + "github.com/mhsanaei/3x-ui/v2/logger" +) + +// MessageType represents the type of WebSocket message +type MessageType string + +const ( + MessageTypeStatus MessageType = "status" // Server status update + MessageTypeTraffic MessageType = "traffic" // Traffic statistics update + MessageTypeInbounds MessageType = "inbounds" // Inbounds list update + MessageTypeNotification MessageType = "notification" // System notification + MessageTypeXrayState MessageType = "xray_state" // Xray state change +) + +// Message represents a WebSocket message +type Message struct { + Type MessageType `json:"type"` + Payload interface{} `json:"payload"` + Time int64 `json:"time"` +} + +// Client represents a WebSocket client connection +type Client struct { + ID string + Send chan []byte + Hub *Hub + Topics map[MessageType]bool // Subscribed topics +} + +// Hub maintains the set of active clients and broadcasts messages to them +type Hub struct { + // Registered clients + clients map[*Client]bool + + // Inbound messages from clients + broadcast chan []byte + + // Register requests from clients + register chan *Client + + // Unregister requests from clients + unregister chan *Client + + // Mutex for thread-safe operations + mu sync.RWMutex + + // Context for graceful shutdown + ctx context.Context + cancel context.CancelFunc + + // Worker pool for parallel broadcasting + workerPoolSize int + broadcastWg sync.WaitGroup +} + +// NewHub creates a new WebSocket hub +func NewHub() *Hub { + ctx, cancel := context.WithCancel(context.Background()) + + // Calculate optimal worker pool size (CPU cores * 2, but max 100) + workerPoolSize := runtime.NumCPU() * 2 + if workerPoolSize > 100 { + workerPoolSize = 100 + } + if workerPoolSize < 10 { + workerPoolSize = 10 + } + + return &Hub{ + clients: make(map[*Client]bool), + broadcast: make(chan []byte, 2048), // Increased from 256 to 2048 for high load + register: make(chan *Client, 100), // Buffered channel for fast registration + unregister: make(chan *Client, 100), // Buffered channel for fast unregistration + ctx: ctx, + cancel: cancel, + workerPoolSize: workerPoolSize, + } +} + +// Run starts the hub's main loop +func (h *Hub) Run() { + defer func() { + if r := recover(); r != nil { + logger.Error("WebSocket hub panic recovered:", r) + // Restart the hub loop + go h.Run() + } + }() + + for { + select { + case <-h.ctx.Done(): + // Graceful shutdown: close all clients + h.mu.Lock() + for client := range h.clients { + // Safely close channel (avoid double close panic) + select { + case _, stillOpen := <-client.Send: + if stillOpen { + close(client.Send) + } + default: + close(client.Send) + } + } + h.clients = make(map[*Client]bool) + h.mu.Unlock() + // Wait for all broadcast workers to finish + h.broadcastWg.Wait() + logger.Info("WebSocket hub stopped gracefully") + return + + case client := <-h.register: + if client == nil { + continue + } + h.mu.Lock() + h.clients[client] = true + count := len(h.clients) + h.mu.Unlock() + logger.Debugf("WebSocket client connected: %s (total: %d)", client.ID, count) + + case client := <-h.unregister: + if client == nil { + continue + } + h.mu.Lock() + if _, ok := h.clients[client]; ok { + delete(h.clients, client) + // Safely close channel (avoid double close panic) + // Check if channel is already closed by trying to read from it + select { + case _, stillOpen := <-client.Send: + if stillOpen { + // Channel was open and had data, now it's empty, safe to close + close(client.Send) + } + // If stillOpen is false, channel was already closed, do nothing + default: + // Channel is empty and open, safe to close + close(client.Send) + } + } + count := len(h.clients) + h.mu.Unlock() + logger.Debugf("WebSocket client disconnected: %s (total: %d)", client.ID, count) + + case message := <-h.broadcast: + if message == nil { + continue + } + // Optimization: quickly copy client list and release lock + h.mu.RLock() + clientCount := len(h.clients) + if clientCount == 0 { + h.mu.RUnlock() + continue + } + + // Pre-allocate memory for client list + clients := make([]*Client, 0, clientCount) + for client := range h.clients { + clients = append(clients, client) + } + h.mu.RUnlock() + + // Parallel broadcast using worker pool + h.broadcastParallel(clients, message) + } + } +} + +// broadcastParallel sends message to all clients in parallel for maximum performance +func (h *Hub) broadcastParallel(clients []*Client, message []byte) { + if len(clients) == 0 { + return + } + + // For small number of clients, use simple parallel sending + if len(clients) < h.workerPoolSize { + var wg sync.WaitGroup + for _, client := range clients { + wg.Add(1) + go func(c *Client) { + defer wg.Done() + defer func() { + if r := recover(); r != nil { + // Channel may be closed, safely ignore + logger.Debugf("WebSocket broadcast panic recovered for client %s: %v", c.ID, r) + } + }() + select { + case c.Send <- message: + default: + // Client's send buffer is full, disconnect + logger.Debugf("WebSocket client %s send buffer full, disconnecting", c.ID) + h.Unregister(c) + } + }(client) + } + wg.Wait() + return + } + + // For large number of clients, use worker pool for optimal performance + clientChan := make(chan *Client, len(clients)) + for _, client := range clients { + clientChan <- client + } + close(clientChan) + + // Start workers for parallel processing + h.broadcastWg.Add(h.workerPoolSize) + for i := 0; i < h.workerPoolSize; i++ { + go func() { + defer h.broadcastWg.Done() + for client := range clientChan { + func() { + defer func() { + if r := recover(); r != nil { + // Channel may be closed, safely ignore + logger.Debugf("WebSocket broadcast panic recovered for client %s: %v", client.ID, r) + } + }() + select { + case client.Send <- message: + default: + // Client's send buffer is full, disconnect + logger.Debugf("WebSocket client %s send buffer full, disconnecting", client.ID) + h.Unregister(client) + } + }() + } + }() + } + + // Wait for all workers to finish + h.broadcastWg.Wait() +} + +// Broadcast sends a message to all connected clients +func (h *Hub) Broadcast(messageType MessageType, payload interface{}) { + if h == nil { + return + } + if payload == nil { + logger.Warning("Attempted to broadcast nil payload") + return + } + + msg := Message{ + Type: messageType, + Payload: payload, + Time: getCurrentTimestamp(), + } + + data, err := json.Marshal(msg) + if err != nil { + logger.Error("Failed to marshal WebSocket message:", err) + return + } + + // Limit message size to prevent memory issues + const maxMessageSize = 1024 * 1024 // 1MB + if len(data) > maxMessageSize { + logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data)) + return + } + + // Non-blocking send with timeout to prevent delays + select { + case h.broadcast <- data: + case <-time.After(100 * time.Millisecond): + logger.Warning("WebSocket broadcast channel is full, dropping message") + case <-h.ctx.Done(): + // Hub is shutting down + } +} + +// BroadcastToTopic sends a message only to clients subscribed to the specific topic +func (h *Hub) BroadcastToTopic(messageType MessageType, payload interface{}) { + if h == nil { + return + } + if payload == nil { + logger.Warning("Attempted to broadcast nil payload to topic") + return + } + + msg := Message{ + Type: messageType, + Payload: payload, + Time: getCurrentTimestamp(), + } + + data, err := json.Marshal(msg) + if err != nil { + logger.Error("Failed to marshal WebSocket message:", err) + return + } + + // Limit message size to prevent memory issues + const maxMessageSize = 1024 * 1024 // 1MB + if len(data) > maxMessageSize { + logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data)) + return + } + + h.mu.RLock() + // Filter clients by topics and quickly release lock + subscribedClients := make([]*Client, 0) + for client := range h.clients { + if len(client.Topics) == 0 || client.Topics[messageType] { + subscribedClients = append(subscribedClients, client) + } + } + h.mu.RUnlock() + + // Parallel send to subscribed clients + if len(subscribedClients) > 0 { + h.broadcastParallel(subscribedClients, data) + } +} + +// GetClientCount returns the number of connected clients +func (h *Hub) GetClientCount() int { + h.mu.RLock() + defer h.mu.RUnlock() + return len(h.clients) +} + +// Register registers a new client with the hub +func (h *Hub) Register(client *Client) { + if h == nil || client == nil { + return + } + select { + case h.register <- client: + case <-h.ctx.Done(): + // Hub is shutting down + } +} + +// Unregister unregisters a client from the hub +func (h *Hub) Unregister(client *Client) { + if h == nil || client == nil { + return + } + select { + case h.unregister <- client: + case <-h.ctx.Done(): + // Hub is shutting down + } +} + +// Stop gracefully stops the hub and closes all connections +func (h *Hub) Stop() { + if h == nil { + return + } + if h.cancel != nil { + h.cancel() + } +} + +// getCurrentTimestamp returns current Unix timestamp in milliseconds +func getCurrentTimestamp() int64 { + return time.Now().UnixMilli() +} |
