Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/MHSanaei/3x-ui.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlolka1333 <xtrafcyz@gmail.com>2026-01-03 07:26:00 +0300
committerGitHub <noreply@github.com>2026-01-03 07:26:00 +0300
commit313a2acbf66125feb4b145a5636351ed03e666da (patch)
tree6be6fac0ced2d0dce60ba55e2feaa83c257ed720 /web/websocket
parentb7477302112b43a2ae037b63994c59e85f9c0687 (diff)
feat: Add WebSocket support for real-time updates and enhance VLESS settings (#3605)
* feat: add support for trusted X-Forwarded-For and testseed parameters in VLESS settings * chore: update Xray Core version to 25.12.8 in release workflow * chore: update Xray Core version to 25.12.8 in Docker initialization script * chore: bump version to 2.8.6 and add watcher for security changes in inbound modal * refactor: remove default and random seed buttons from outbound form * refactor: update VLESS form to rename 'Test Seed' to 'Vision Seed' and change button functionality for seed generation * refactor: enhance TLS settings form layout with improved button styling and spacing * feat: integrate WebSocket support for real-time updates on inbounds and Xray service status * chore: downgrade version to 2.8.5 * refactor: translate comments to English * fix: ensure testseed is initialized correctly for VLESS protocol and improve client handling in inbound modal * refactor: simplify VLESS divider condition by removing unnecessary flow checks * fix: add fallback date formatting for cases when IntlUtil is not available * refactor: simplify WebSocket message handling by removing batching and ensuring individual message delivery * refactor: disable WebSocket notifications in inbound and index HTML files * refactor: enhance VLESS testseed initialization and button functionality in inbound modal * fix: * refactor: ensure proper WebSocket URL construction by normalizing basePath * fix: * fix: * fix: * refactor: update testseed methods for improved reactivity and binding in VLESS form * logger info to debug --------- Co-authored-by: lolka1333 <test123@gmail.com>
Diffstat (limited to 'web/websocket')
-rw-r--r--web/websocket/hub.go379
-rw-r--r--web/websocket/notifier.go74
2 files changed, 453 insertions, 0 deletions
diff --git a/web/websocket/hub.go b/web/websocket/hub.go
new file mode 100644
index 00000000..187ebb6f
--- /dev/null
+++ b/web/websocket/hub.go
@@ -0,0 +1,379 @@
+// Package websocket provides WebSocket hub for real-time updates and notifications.
+package websocket
+
+import (
+ "context"
+ "encoding/json"
+ "runtime"
+ "sync"
+ "time"
+
+ "github.com/mhsanaei/3x-ui/v2/logger"
+)
+
+// MessageType represents the type of WebSocket message
+type MessageType string
+
+const (
+ MessageTypeStatus MessageType = "status" // Server status update
+ MessageTypeTraffic MessageType = "traffic" // Traffic statistics update
+ MessageTypeInbounds MessageType = "inbounds" // Inbounds list update
+ MessageTypeNotification MessageType = "notification" // System notification
+ MessageTypeXrayState MessageType = "xray_state" // Xray state change
+)
+
+// Message represents a WebSocket message
+type Message struct {
+ Type MessageType `json:"type"`
+ Payload interface{} `json:"payload"`
+ Time int64 `json:"time"`
+}
+
+// Client represents a WebSocket client connection
+type Client struct {
+ ID string
+ Send chan []byte
+ Hub *Hub
+ Topics map[MessageType]bool // Subscribed topics
+}
+
+// Hub maintains the set of active clients and broadcasts messages to them
+type Hub struct {
+ // Registered clients
+ clients map[*Client]bool
+
+ // Inbound messages from clients
+ broadcast chan []byte
+
+ // Register requests from clients
+ register chan *Client
+
+ // Unregister requests from clients
+ unregister chan *Client
+
+ // Mutex for thread-safe operations
+ mu sync.RWMutex
+
+ // Context for graceful shutdown
+ ctx context.Context
+ cancel context.CancelFunc
+
+ // Worker pool for parallel broadcasting
+ workerPoolSize int
+ broadcastWg sync.WaitGroup
+}
+
+// NewHub creates a new WebSocket hub
+func NewHub() *Hub {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ // Calculate optimal worker pool size (CPU cores * 2, but max 100)
+ workerPoolSize := runtime.NumCPU() * 2
+ if workerPoolSize > 100 {
+ workerPoolSize = 100
+ }
+ if workerPoolSize < 10 {
+ workerPoolSize = 10
+ }
+
+ return &Hub{
+ clients: make(map[*Client]bool),
+ broadcast: make(chan []byte, 2048), // Increased from 256 to 2048 for high load
+ register: make(chan *Client, 100), // Buffered channel for fast registration
+ unregister: make(chan *Client, 100), // Buffered channel for fast unregistration
+ ctx: ctx,
+ cancel: cancel,
+ workerPoolSize: workerPoolSize,
+ }
+}
+
+// Run starts the hub's main loop
+func (h *Hub) Run() {
+ defer func() {
+ if r := recover(); r != nil {
+ logger.Error("WebSocket hub panic recovered:", r)
+ // Restart the hub loop
+ go h.Run()
+ }
+ }()
+
+ for {
+ select {
+ case <-h.ctx.Done():
+ // Graceful shutdown: close all clients
+ h.mu.Lock()
+ for client := range h.clients {
+ // Safely close channel (avoid double close panic)
+ select {
+ case _, stillOpen := <-client.Send:
+ if stillOpen {
+ close(client.Send)
+ }
+ default:
+ close(client.Send)
+ }
+ }
+ h.clients = make(map[*Client]bool)
+ h.mu.Unlock()
+ // Wait for all broadcast workers to finish
+ h.broadcastWg.Wait()
+ logger.Info("WebSocket hub stopped gracefully")
+ return
+
+ case client := <-h.register:
+ if client == nil {
+ continue
+ }
+ h.mu.Lock()
+ h.clients[client] = true
+ count := len(h.clients)
+ h.mu.Unlock()
+ logger.Debugf("WebSocket client connected: %s (total: %d)", client.ID, count)
+
+ case client := <-h.unregister:
+ if client == nil {
+ continue
+ }
+ h.mu.Lock()
+ if _, ok := h.clients[client]; ok {
+ delete(h.clients, client)
+ // Safely close channel (avoid double close panic)
+ // Check if channel is already closed by trying to read from it
+ select {
+ case _, stillOpen := <-client.Send:
+ if stillOpen {
+ // Channel was open and had data, now it's empty, safe to close
+ close(client.Send)
+ }
+ // If stillOpen is false, channel was already closed, do nothing
+ default:
+ // Channel is empty and open, safe to close
+ close(client.Send)
+ }
+ }
+ count := len(h.clients)
+ h.mu.Unlock()
+ logger.Debugf("WebSocket client disconnected: %s (total: %d)", client.ID, count)
+
+ case message := <-h.broadcast:
+ if message == nil {
+ continue
+ }
+ // Optimization: quickly copy client list and release lock
+ h.mu.RLock()
+ clientCount := len(h.clients)
+ if clientCount == 0 {
+ h.mu.RUnlock()
+ continue
+ }
+
+ // Pre-allocate memory for client list
+ clients := make([]*Client, 0, clientCount)
+ for client := range h.clients {
+ clients = append(clients, client)
+ }
+ h.mu.RUnlock()
+
+ // Parallel broadcast using worker pool
+ h.broadcastParallel(clients, message)
+ }
+ }
+}
+
+// broadcastParallel sends message to all clients in parallel for maximum performance
+func (h *Hub) broadcastParallel(clients []*Client, message []byte) {
+ if len(clients) == 0 {
+ return
+ }
+
+ // For small number of clients, use simple parallel sending
+ if len(clients) < h.workerPoolSize {
+ var wg sync.WaitGroup
+ for _, client := range clients {
+ wg.Add(1)
+ go func(c *Client) {
+ defer wg.Done()
+ defer func() {
+ if r := recover(); r != nil {
+ // Channel may be closed, safely ignore
+ logger.Debugf("WebSocket broadcast panic recovered for client %s: %v", c.ID, r)
+ }
+ }()
+ select {
+ case c.Send <- message:
+ default:
+ // Client's send buffer is full, disconnect
+ logger.Debugf("WebSocket client %s send buffer full, disconnecting", c.ID)
+ h.Unregister(c)
+ }
+ }(client)
+ }
+ wg.Wait()
+ return
+ }
+
+ // For large number of clients, use worker pool for optimal performance
+ clientChan := make(chan *Client, len(clients))
+ for _, client := range clients {
+ clientChan <- client
+ }
+ close(clientChan)
+
+ // Start workers for parallel processing
+ h.broadcastWg.Add(h.workerPoolSize)
+ for i := 0; i < h.workerPoolSize; i++ {
+ go func() {
+ defer h.broadcastWg.Done()
+ for client := range clientChan {
+ func() {
+ defer func() {
+ if r := recover(); r != nil {
+ // Channel may be closed, safely ignore
+ logger.Debugf("WebSocket broadcast panic recovered for client %s: %v", client.ID, r)
+ }
+ }()
+ select {
+ case client.Send <- message:
+ default:
+ // Client's send buffer is full, disconnect
+ logger.Debugf("WebSocket client %s send buffer full, disconnecting", client.ID)
+ h.Unregister(client)
+ }
+ }()
+ }
+ }()
+ }
+
+ // Wait for all workers to finish
+ h.broadcastWg.Wait()
+}
+
+// Broadcast sends a message to all connected clients
+func (h *Hub) Broadcast(messageType MessageType, payload interface{}) {
+ if h == nil {
+ return
+ }
+ if payload == nil {
+ logger.Warning("Attempted to broadcast nil payload")
+ return
+ }
+
+ msg := Message{
+ Type: messageType,
+ Payload: payload,
+ Time: getCurrentTimestamp(),
+ }
+
+ data, err := json.Marshal(msg)
+ if err != nil {
+ logger.Error("Failed to marshal WebSocket message:", err)
+ return
+ }
+
+ // Limit message size to prevent memory issues
+ const maxMessageSize = 1024 * 1024 // 1MB
+ if len(data) > maxMessageSize {
+ logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data))
+ return
+ }
+
+ // Non-blocking send with timeout to prevent delays
+ select {
+ case h.broadcast <- data:
+ case <-time.After(100 * time.Millisecond):
+ logger.Warning("WebSocket broadcast channel is full, dropping message")
+ case <-h.ctx.Done():
+ // Hub is shutting down
+ }
+}
+
+// BroadcastToTopic sends a message only to clients subscribed to the specific topic
+func (h *Hub) BroadcastToTopic(messageType MessageType, payload interface{}) {
+ if h == nil {
+ return
+ }
+ if payload == nil {
+ logger.Warning("Attempted to broadcast nil payload to topic")
+ return
+ }
+
+ msg := Message{
+ Type: messageType,
+ Payload: payload,
+ Time: getCurrentTimestamp(),
+ }
+
+ data, err := json.Marshal(msg)
+ if err != nil {
+ logger.Error("Failed to marshal WebSocket message:", err)
+ return
+ }
+
+ // Limit message size to prevent memory issues
+ const maxMessageSize = 1024 * 1024 // 1MB
+ if len(data) > maxMessageSize {
+ logger.Warningf("WebSocket message too large: %d bytes, dropping", len(data))
+ return
+ }
+
+ h.mu.RLock()
+ // Filter clients by topics and quickly release lock
+ subscribedClients := make([]*Client, 0)
+ for client := range h.clients {
+ if len(client.Topics) == 0 || client.Topics[messageType] {
+ subscribedClients = append(subscribedClients, client)
+ }
+ }
+ h.mu.RUnlock()
+
+ // Parallel send to subscribed clients
+ if len(subscribedClients) > 0 {
+ h.broadcastParallel(subscribedClients, data)
+ }
+}
+
+// GetClientCount returns the number of connected clients
+func (h *Hub) GetClientCount() int {
+ h.mu.RLock()
+ defer h.mu.RUnlock()
+ return len(h.clients)
+}
+
+// Register registers a new client with the hub
+func (h *Hub) Register(client *Client) {
+ if h == nil || client == nil {
+ return
+ }
+ select {
+ case h.register <- client:
+ case <-h.ctx.Done():
+ // Hub is shutting down
+ }
+}
+
+// Unregister unregisters a client from the hub
+func (h *Hub) Unregister(client *Client) {
+ if h == nil || client == nil {
+ return
+ }
+ select {
+ case h.unregister <- client:
+ case <-h.ctx.Done():
+ // Hub is shutting down
+ }
+}
+
+// Stop gracefully stops the hub and closes all connections
+func (h *Hub) Stop() {
+ if h == nil {
+ return
+ }
+ if h.cancel != nil {
+ h.cancel()
+ }
+}
+
+// getCurrentTimestamp returns current Unix timestamp in milliseconds
+func getCurrentTimestamp() int64 {
+ return time.Now().UnixMilli()
+}
diff --git a/web/websocket/notifier.go b/web/websocket/notifier.go
new file mode 100644
index 00000000..cedf56f2
--- /dev/null
+++ b/web/websocket/notifier.go
@@ -0,0 +1,74 @@
+// Package websocket provides WebSocket hub for real-time updates and notifications.
+package websocket
+
+import (
+ "github.com/mhsanaei/3x-ui/v2/logger"
+ "github.com/mhsanaei/3x-ui/v2/web/global"
+)
+
+// GetHub returns the global WebSocket hub instance
+func GetHub() *Hub {
+ webServer := global.GetWebServer()
+ if webServer == nil {
+ return nil
+ }
+ hub := webServer.GetWSHub()
+ if hub == nil {
+ return nil
+ }
+ wsHub, ok := hub.(*Hub)
+ if !ok {
+ logger.Warning("WebSocket hub type assertion failed")
+ return nil
+ }
+ return wsHub
+}
+
+// BroadcastStatus broadcasts server status update to all connected clients
+func BroadcastStatus(status interface{}) {
+ hub := GetHub()
+ if hub != nil {
+ hub.Broadcast(MessageTypeStatus, status)
+ }
+}
+
+// BroadcastTraffic broadcasts traffic statistics update to all connected clients
+func BroadcastTraffic(traffic interface{}) {
+ hub := GetHub()
+ if hub != nil {
+ hub.Broadcast(MessageTypeTraffic, traffic)
+ }
+}
+
+// BroadcastInbounds broadcasts inbounds list update to all connected clients
+func BroadcastInbounds(inbounds interface{}) {
+ hub := GetHub()
+ if hub != nil {
+ hub.Broadcast(MessageTypeInbounds, inbounds)
+ }
+}
+
+// BroadcastNotification broadcasts a system notification to all connected clients
+func BroadcastNotification(title, message, level string) {
+ hub := GetHub()
+ if hub != nil {
+ notification := map[string]string{
+ "title": title,
+ "message": message,
+ "level": level, // info, warning, error, success
+ }
+ hub.Broadcast(MessageTypeNotification, notification)
+ }
+}
+
+// BroadcastXrayState broadcasts Xray state change to all connected clients
+func BroadcastXrayState(state string, errorMsg string) {
+ hub := GetHub()
+ if hub != nil {
+ stateUpdate := map[string]string{
+ "state": state,
+ "errorMsg": errorMsg,
+ }
+ hub.Broadcast(MessageTypeXrayState, stateUpdate)
+ }
+}