From 8177f6dc667f2edca66073e433baf5cff36cda41 Mon Sep 17 00:00:00 2001 From: lolka1333 Date: Tue, 5 May 2026 18:27:49 +0300 Subject: ws/inbounds: realtime fixes + perf for 10k+ client inbounds (#4123) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ws/inbounds: realtime fixes + perf for 10k+ client inbounds - hub: dedup, throttle, panic-restart, deadlock fix, race tests - client: backoff cap + slow-retry instead of giving up - broadcast: delta-only payload, count-based invalidate fallback - filter: fix empty online list (Inbound has no .id, use dbInbound.toInbound) - perf: O(N²)→O(N) traffic merge, bulk delete, /setEnable endpoint - traffic: monotonic all_time + UI clamp + propagate in delta handler - session: persist on update/logout (fixes logout-after-password-change) - ui: protocol tags flex, traffic bar normalize * Remove hub_test.go file * fix: ws hub, inbound service, and frontend correctness - propagate DelInbound error on disable path in SetInboundEnable - skip empty emails in updateClientTraffics to avoid constraint violations - use consistent IN ? clause, drop redundant ErrRecordNotFound guards - Hub.Unregister: direct removeClient fallback when channel is full - applyClientStatsDelta: O(1) email lookup via per-inbound Map cache - WS payload size check: Blob.size instead of .length for real byte count * fix: chunk large IN ? queries and fix IPv6 same-origin check * fix: chunk large IN ? queries and fix IPv6 same-origin check * fix: unify clientStats cache, throttle clarity, hub constants * fix(ui): align traffic/expiry cell columns across all rows * style(ui): redesign outbounds table for visual consistency * style(ui): redesign routing table for visual consistency * fix: * fix: * fix: * fix: * fix: * fix: font * refactor: simplify outbound tone functions for consistency and maintainability --------- Co-authored-by: lolka1333 --- web/job/xray_traffic_job.go | 87 ++++++++++++++++++++++++++++++++------------- 1 file changed, 62 insertions(+), 25 deletions(-) (limited to 'web/job') diff --git a/web/job/xray_traffic_job.go b/web/job/xray_traffic_job.go index 71e90caa..f3b04868 100644 --- a/web/job/xray_traffic_job.go +++ b/web/job/xray_traffic_job.go @@ -24,7 +24,9 @@ func NewXrayTrafficJob() *XrayTrafficJob { return new(XrayTrafficJob) } -// Run collects traffic statistics from Xray and updates the database, triggering restart if needed. +// Run collects traffic statistics from Xray, updates the database, and pushes +// real-time updates over WebSocket using compact delta payloads — no REST +// fallback, scales to 10k–20k+ clients per inbound. func (j *XrayTrafficJob) Run() { if !j.xrayService.IsXrayRunning() { return @@ -33,7 +35,7 @@ func (j *XrayTrafficJob) Run() { if err != nil { return } - err, needRestart0, clientsDisabled := j.inboundService.AddTraffic(traffics, clientTraffics) + needRestart0, clientsDisabled, err := j.inboundService.AddTraffic(traffics, clientTraffics) if err != nil { logger.Warning("add inbound traffic failed:", err) } @@ -62,50 +64,85 @@ func (j *XrayTrafficJob) Run() { j.xrayService.SetToNeedRestart() } - // If no frontend client is connected, skip all WebSocket broadcasting routines, - // including expensive DB queries for online clients and JSON marshaling. + // If no frontend client is connected, skip all WebSocket broadcasting + // routines — including the active-client DB query and JSON marshaling. if !websocket.HasClients() { return } - // Update online clients list and map + // Online presence + traffic deltas — small payload, always fits in WS. + // Force non-nil slice/map so JSON marshalling produces [] / {} instead of + // `null` when everyone is offline. The frontend's traffic handler treats + // a missing/null onlineClients field as "no update", so without this the + // "everyone went offline" transition was silently dropped — stale online + // users lingered in the list and the online filter kept showing them. onlineClients := j.inboundService.GetOnlineClients() + if onlineClients == nil { + onlineClients = []string{} + } lastOnlineMap, err := j.inboundService.GetClientsLastOnline() if err != nil { logger.Warning("get clients last online failed:", err) + } + if lastOnlineMap == nil { lastOnlineMap = make(map[string]int64) } - - // Broadcast traffic update (deltas and online stats) via WebSocket - trafficUpdate := map[string]any{ + websocket.BroadcastTraffic(map[string]any{ "traffics": traffics, "clientTraffics": clientTraffics, "onlineClients": onlineClients, "lastOnlineMap": lastOnlineMap, - } - websocket.BroadcastTraffic(trafficUpdate) + }) - // Fetch updated inbounds from database with accumulated traffic values - // This ensures frontend receives the actual total traffic for real-time UI refresh. - updatedInbounds, err := j.inboundService.GetAllInbounds() - if err != nil { - logger.Warning("get all inbounds for websocket failed:", err) + // Compact delta payload: per-client absolute counters for clients active + // this cycle, plus inbound-level absolute totals. Frontend applies both + // in-place — typical payload ~10–50KB even for 10k+ client deployments. + // Replaces the old full-inbound-list broadcast that hit WS size limits + // (5–10MB) and forced the frontend into a REST refetch. + clientStatsPayload := map[string]any{} + if activeEmails := activeEmails(clientTraffics); len(activeEmails) > 0 { + if stats, err := j.inboundService.GetActiveClientTraffics(activeEmails); err != nil { + logger.Warning("get active client traffics for websocket failed:", err) + } else if len(stats) > 0 { + clientStatsPayload["clients"] = stats + } } - - updatedOutbounds, err := j.outboundService.GetOutboundsTraffic() - if err != nil { - logger.Warning("get all outbounds for websocket failed:", err) + if inboundSummary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil { + logger.Warning("get inbounds traffic summary for websocket failed:", err) + } else if len(inboundSummary) > 0 { + clientStatsPayload["inbounds"] = inboundSummary } - - // The WebSocket hub will automatically check the payload size. - // If it exceeds 100MB, it sends a lightweight 'invalidate' signal instead. - if updatedInbounds != nil { - websocket.BroadcastInbounds(updatedInbounds) + if len(clientStatsPayload) > 0 { + websocket.BroadcastClientStats(clientStatsPayload) } - if updatedOutbounds != nil { + // Outbounds list is small (one row per outbound, no per-client expansion) + // so the full snapshot still fits comfortably in WS. + if updatedOutbounds, err := j.outboundService.GetOutboundsTraffic(); err == nil && updatedOutbounds != nil { websocket.BroadcastOutbounds(updatedOutbounds) + } else if err != nil { + logger.Warning("get all outbounds for websocket failed:", err) + } +} + +// activeEmails returns the set of client emails that had non-zero traffic in +// the current collection window. Idle clients are skipped — no need to push +// their (unchanged) counters to the frontend. +func activeEmails(clientTraffics []*xray.ClientTraffic) []string { + if len(clientTraffics) == 0 { + return nil + } + emails := make([]string, 0, len(clientTraffics)) + for _, ct := range clientTraffics { + if ct == nil || ct.Email == "" { + continue + } + if ct.Up == 0 && ct.Down == 0 { + continue + } + emails = append(emails, ct.Email) } + return emails } func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) { -- cgit v1.2.3