Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/MHSanaei/3x-ui.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlolka1333 <xtrafcyz@gmail.com>2026-05-05 18:27:49 +0300
committerGitHub <noreply@github.com>2026-05-05 18:27:49 +0300
commit8177f6dc667f2edca66073e433baf5cff36cda41 (patch)
tree7fddbec4848c3c0ab0fc39e3dc45703c368e9340 /web/job/xray_traffic_job.go
parent77d94b25d054bd6cf7ace029571db9c58ae87fa9 (diff)
ws/inbounds: realtime fixes + perf for 10k+ client inbounds (#4123)HEADmain
* ws/inbounds: realtime fixes + perf for 10k+ client inbounds - hub: dedup, throttle, panic-restart, deadlock fix, race tests - client: backoff cap + slow-retry instead of giving up - broadcast: delta-only payload, count-based invalidate fallback - filter: fix empty online list (Inbound has no .id, use dbInbound.toInbound) - perf: O(N²)→O(N) traffic merge, bulk delete, /setEnable endpoint - traffic: monotonic all_time + UI clamp + propagate in delta handler - session: persist on update/logout (fixes logout-after-password-change) - ui: protocol tags flex, traffic bar normalize * Remove hub_test.go file * fix: ws hub, inbound service, and frontend correctness - propagate DelInbound error on disable path in SetInboundEnable - skip empty emails in updateClientTraffics to avoid constraint violations - use consistent IN ? clause, drop redundant ErrRecordNotFound guards - Hub.Unregister: direct removeClient fallback when channel is full - applyClientStatsDelta: O(1) email lookup via per-inbound Map cache - WS payload size check: Blob.size instead of .length for real byte count * fix: chunk large IN ? queries and fix IPv6 same-origin check * fix: chunk large IN ? queries and fix IPv6 same-origin check * fix: unify clientStats cache, throttle clarity, hub constants * fix(ui): align traffic/expiry cell columns across all rows * style(ui): redesign outbounds table for visual consistency * style(ui): redesign routing table for visual consistency * fix: * fix: * fix: * fix: * fix: * fix: font * refactor: simplify outbound tone functions for consistency and maintainability --------- Co-authored-by: lolka1333 <test123@gmail.com>
Diffstat (limited to 'web/job/xray_traffic_job.go')
-rw-r--r--web/job/xray_traffic_job.go87
1 files changed, 62 insertions, 25 deletions
diff --git a/web/job/xray_traffic_job.go b/web/job/xray_traffic_job.go
index 71e90caa..f3b04868 100644
--- a/web/job/xray_traffic_job.go
+++ b/web/job/xray_traffic_job.go
@@ -24,7 +24,9 @@ func NewXrayTrafficJob() *XrayTrafficJob {
return new(XrayTrafficJob)
}
-// Run collects traffic statistics from Xray and updates the database, triggering restart if needed.
+// Run collects traffic statistics from Xray, updates the database, and pushes
+// real-time updates over WebSocket using compact delta payloads — no REST
+// fallback, scales to 10k–20k+ clients per inbound.
func (j *XrayTrafficJob) Run() {
if !j.xrayService.IsXrayRunning() {
return
@@ -33,7 +35,7 @@ func (j *XrayTrafficJob) Run() {
if err != nil {
return
}
- err, needRestart0, clientsDisabled := j.inboundService.AddTraffic(traffics, clientTraffics)
+ needRestart0, clientsDisabled, err := j.inboundService.AddTraffic(traffics, clientTraffics)
if err != nil {
logger.Warning("add inbound traffic failed:", err)
}
@@ -62,50 +64,85 @@ func (j *XrayTrafficJob) Run() {
j.xrayService.SetToNeedRestart()
}
- // If no frontend client is connected, skip all WebSocket broadcasting routines,
- // including expensive DB queries for online clients and JSON marshaling.
+ // If no frontend client is connected, skip all WebSocket broadcasting
+ // routines — including the active-client DB query and JSON marshaling.
if !websocket.HasClients() {
return
}
- // Update online clients list and map
+ // Online presence + traffic deltas — small payload, always fits in WS.
+ // Force non-nil slice/map so JSON marshalling produces [] / {} instead of
+ // `null` when everyone is offline. The frontend's traffic handler treats
+ // a missing/null onlineClients field as "no update", so without this the
+ // "everyone went offline" transition was silently dropped — stale online
+ // users lingered in the list and the online filter kept showing them.
onlineClients := j.inboundService.GetOnlineClients()
+ if onlineClients == nil {
+ onlineClients = []string{}
+ }
lastOnlineMap, err := j.inboundService.GetClientsLastOnline()
if err != nil {
logger.Warning("get clients last online failed:", err)
+ }
+ if lastOnlineMap == nil {
lastOnlineMap = make(map[string]int64)
}
-
- // Broadcast traffic update (deltas and online stats) via WebSocket
- trafficUpdate := map[string]any{
+ websocket.BroadcastTraffic(map[string]any{
"traffics": traffics,
"clientTraffics": clientTraffics,
"onlineClients": onlineClients,
"lastOnlineMap": lastOnlineMap,
- }
- websocket.BroadcastTraffic(trafficUpdate)
+ })
- // Fetch updated inbounds from database with accumulated traffic values
- // This ensures frontend receives the actual total traffic for real-time UI refresh.
- updatedInbounds, err := j.inboundService.GetAllInbounds()
- if err != nil {
- logger.Warning("get all inbounds for websocket failed:", err)
+ // Compact delta payload: per-client absolute counters for clients active
+ // this cycle, plus inbound-level absolute totals. Frontend applies both
+ // in-place — typical payload ~10–50KB even for 10k+ client deployments.
+ // Replaces the old full-inbound-list broadcast that hit WS size limits
+ // (5–10MB) and forced the frontend into a REST refetch.
+ clientStatsPayload := map[string]any{}
+ if activeEmails := activeEmails(clientTraffics); len(activeEmails) > 0 {
+ if stats, err := j.inboundService.GetActiveClientTraffics(activeEmails); err != nil {
+ logger.Warning("get active client traffics for websocket failed:", err)
+ } else if len(stats) > 0 {
+ clientStatsPayload["clients"] = stats
+ }
}
-
- updatedOutbounds, err := j.outboundService.GetOutboundsTraffic()
- if err != nil {
- logger.Warning("get all outbounds for websocket failed:", err)
+ if inboundSummary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
+ logger.Warning("get inbounds traffic summary for websocket failed:", err)
+ } else if len(inboundSummary) > 0 {
+ clientStatsPayload["inbounds"] = inboundSummary
}
-
- // The WebSocket hub will automatically check the payload size.
- // If it exceeds 100MB, it sends a lightweight 'invalidate' signal instead.
- if updatedInbounds != nil {
- websocket.BroadcastInbounds(updatedInbounds)
+ if len(clientStatsPayload) > 0 {
+ websocket.BroadcastClientStats(clientStatsPayload)
}
- if updatedOutbounds != nil {
+ // Outbounds list is small (one row per outbound, no per-client expansion)
+ // so the full snapshot still fits comfortably in WS.
+ if updatedOutbounds, err := j.outboundService.GetOutboundsTraffic(); err == nil && updatedOutbounds != nil {
websocket.BroadcastOutbounds(updatedOutbounds)
+ } else if err != nil {
+ logger.Warning("get all outbounds for websocket failed:", err)
+ }
+}
+
+// activeEmails returns the set of client emails that had non-zero traffic in
+// the current collection window. Idle clients are skipped — no need to push
+// their (unchanged) counters to the frontend.
+func activeEmails(clientTraffics []*xray.ClientTraffic) []string {
+ if len(clientTraffics) == 0 {
+ return nil
+ }
+ emails := make([]string, 0, len(clientTraffics))
+ for _, ct := range clientTraffics {
+ if ct == nil || ct.Email == "" {
+ continue
+ }
+ if ct.Up == 0 && ct.Down == 0 {
+ continue
+ }
+ emails = append(emails, ct.Email)
}
+ return emails
}
func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) {