Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/MHSanaei/3x-ui.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/web/job
diff options
context:
space:
mode:
Diffstat (limited to 'web/job')
-rw-r--r--web/job/xray_traffic_job.go87
1 files changed, 62 insertions, 25 deletions
diff --git a/web/job/xray_traffic_job.go b/web/job/xray_traffic_job.go
index 71e90caa..f3b04868 100644
--- a/web/job/xray_traffic_job.go
+++ b/web/job/xray_traffic_job.go
@@ -24,7 +24,9 @@ func NewXrayTrafficJob() *XrayTrafficJob {
return new(XrayTrafficJob)
}
-// Run collects traffic statistics from Xray and updates the database, triggering restart if needed.
+// Run collects traffic statistics from Xray, updates the database, and pushes
+// real-time updates over WebSocket using compact delta payloads — no REST
+// fallback, scales to 10k–20k+ clients per inbound.
func (j *XrayTrafficJob) Run() {
if !j.xrayService.IsXrayRunning() {
return
@@ -33,7 +35,7 @@ func (j *XrayTrafficJob) Run() {
if err != nil {
return
}
- err, needRestart0, clientsDisabled := j.inboundService.AddTraffic(traffics, clientTraffics)
+ needRestart0, clientsDisabled, err := j.inboundService.AddTraffic(traffics, clientTraffics)
if err != nil {
logger.Warning("add inbound traffic failed:", err)
}
@@ -62,50 +64,85 @@ func (j *XrayTrafficJob) Run() {
j.xrayService.SetToNeedRestart()
}
- // If no frontend client is connected, skip all WebSocket broadcasting routines,
- // including expensive DB queries for online clients and JSON marshaling.
+ // If no frontend client is connected, skip all WebSocket broadcasting
+ // routines — including the active-client DB query and JSON marshaling.
if !websocket.HasClients() {
return
}
- // Update online clients list and map
+ // Online presence + traffic deltas — small payload, always fits in WS.
+ // Force non-nil slice/map so JSON marshalling produces [] / {} instead of
+ // `null` when everyone is offline. The frontend's traffic handler treats
+ // a missing/null onlineClients field as "no update", so without this the
+ // "everyone went offline" transition was silently dropped — stale online
+ // users lingered in the list and the online filter kept showing them.
onlineClients := j.inboundService.GetOnlineClients()
+ if onlineClients == nil {
+ onlineClients = []string{}
+ }
lastOnlineMap, err := j.inboundService.GetClientsLastOnline()
if err != nil {
logger.Warning("get clients last online failed:", err)
+ }
+ if lastOnlineMap == nil {
lastOnlineMap = make(map[string]int64)
}
-
- // Broadcast traffic update (deltas and online stats) via WebSocket
- trafficUpdate := map[string]any{
+ websocket.BroadcastTraffic(map[string]any{
"traffics": traffics,
"clientTraffics": clientTraffics,
"onlineClients": onlineClients,
"lastOnlineMap": lastOnlineMap,
- }
- websocket.BroadcastTraffic(trafficUpdate)
+ })
- // Fetch updated inbounds from database with accumulated traffic values
- // This ensures frontend receives the actual total traffic for real-time UI refresh.
- updatedInbounds, err := j.inboundService.GetAllInbounds()
- if err != nil {
- logger.Warning("get all inbounds for websocket failed:", err)
+ // Compact delta payload: per-client absolute counters for clients active
+ // this cycle, plus inbound-level absolute totals. Frontend applies both
+ // in-place — typical payload ~10–50KB even for 10k+ client deployments.
+ // Replaces the old full-inbound-list broadcast that hit WS size limits
+ // (5–10MB) and forced the frontend into a REST refetch.
+ clientStatsPayload := map[string]any{}
+ if activeEmails := activeEmails(clientTraffics); len(activeEmails) > 0 {
+ if stats, err := j.inboundService.GetActiveClientTraffics(activeEmails); err != nil {
+ logger.Warning("get active client traffics for websocket failed:", err)
+ } else if len(stats) > 0 {
+ clientStatsPayload["clients"] = stats
+ }
}
-
- updatedOutbounds, err := j.outboundService.GetOutboundsTraffic()
- if err != nil {
- logger.Warning("get all outbounds for websocket failed:", err)
+ if inboundSummary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
+ logger.Warning("get inbounds traffic summary for websocket failed:", err)
+ } else if len(inboundSummary) > 0 {
+ clientStatsPayload["inbounds"] = inboundSummary
}
-
- // The WebSocket hub will automatically check the payload size.
- // If it exceeds 100MB, it sends a lightweight 'invalidate' signal instead.
- if updatedInbounds != nil {
- websocket.BroadcastInbounds(updatedInbounds)
+ if len(clientStatsPayload) > 0 {
+ websocket.BroadcastClientStats(clientStatsPayload)
}
- if updatedOutbounds != nil {
+ // Outbounds list is small (one row per outbound, no per-client expansion)
+ // so the full snapshot still fits comfortably in WS.
+ if updatedOutbounds, err := j.outboundService.GetOutboundsTraffic(); err == nil && updatedOutbounds != nil {
websocket.BroadcastOutbounds(updatedOutbounds)
+ } else if err != nil {
+ logger.Warning("get all outbounds for websocket failed:", err)
+ }
+}
+
+// activeEmails returns the set of client emails that had non-zero traffic in
+// the current collection window. Idle clients are skipped — no need to push
+// their (unchanged) counters to the frontend.
+func activeEmails(clientTraffics []*xray.ClientTraffic) []string {
+ if len(clientTraffics) == 0 {
+ return nil
+ }
+ emails := make([]string, 0, len(clientTraffics))
+ for _, ct := range clientTraffics {
+ if ct == nil || ct.Email == "" {
+ continue
+ }
+ if ct.Up == 0 && ct.Down == 0 {
+ continue
+ }
+ emails = append(emails, ct.Email)
}
+ return emails
}
func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) {