diff options
| author | lolka1333 <xtrafcyz@gmail.com> | 2026-05-05 18:27:49 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-05-05 18:27:49 +0300 |
| commit | 8177f6dc667f2edca66073e433baf5cff36cda41 (patch) | |
| tree | 7fddbec4848c3c0ab0fc39e3dc45703c368e9340 /web/job | |
| parent | 77d94b25d054bd6cf7ace029571db9c58ae87fa9 (diff) | |
* ws/inbounds: realtime fixes + perf for 10k+ client inbounds
- hub: dedup, throttle, panic-restart, deadlock fix, race tests
- client: backoff cap + slow-retry instead of giving up
- broadcast: delta-only payload, count-based invalidate fallback
- filter: fix empty online list (Inbound has no .id, use dbInbound.toInbound)
- perf: O(N²)→O(N) traffic merge, bulk delete, /setEnable endpoint
- traffic: monotonic all_time + UI clamp + propagate in delta handler
- session: persist on update/logout (fixes logout-after-password-change)
- ui: protocol tags flex, traffic bar normalize
* Remove hub_test.go file
* fix: ws hub, inbound service, and frontend correctness
- propagate DelInbound error on disable path in SetInboundEnable
- skip empty emails in updateClientTraffics to avoid constraint violations
- use consistent IN ? clause, drop redundant ErrRecordNotFound guards
- Hub.Unregister: direct removeClient fallback when channel is full
- applyClientStatsDelta: O(1) email lookup via per-inbound Map cache
- WS payload size check: Blob.size instead of .length for real byte count
* fix: chunk large IN ? queries and fix IPv6 same-origin check
* fix: chunk large IN ? queries and fix IPv6 same-origin check
* fix: unify clientStats cache, throttle clarity, hub constants
* fix(ui): align traffic/expiry cell columns across all rows
* style(ui): redesign outbounds table for visual consistency
* style(ui): redesign routing table for visual consistency
* fix:
* fix:
* fix:
* fix:
* fix:
* fix: font
* refactor: simplify outbound tone functions for consistency and maintainability
---------
Co-authored-by: lolka1333 <test123@gmail.com>
Diffstat (limited to 'web/job')
| -rw-r--r-- | web/job/xray_traffic_job.go | 87 |
1 files changed, 62 insertions, 25 deletions
diff --git a/web/job/xray_traffic_job.go b/web/job/xray_traffic_job.go index 71e90caa..f3b04868 100644 --- a/web/job/xray_traffic_job.go +++ b/web/job/xray_traffic_job.go @@ -24,7 +24,9 @@ func NewXrayTrafficJob() *XrayTrafficJob { return new(XrayTrafficJob) } -// Run collects traffic statistics from Xray and updates the database, triggering restart if needed. +// Run collects traffic statistics from Xray, updates the database, and pushes +// real-time updates over WebSocket using compact delta payloads — no REST +// fallback, scales to 10k–20k+ clients per inbound. func (j *XrayTrafficJob) Run() { if !j.xrayService.IsXrayRunning() { return @@ -33,7 +35,7 @@ func (j *XrayTrafficJob) Run() { if err != nil { return } - err, needRestart0, clientsDisabled := j.inboundService.AddTraffic(traffics, clientTraffics) + needRestart0, clientsDisabled, err := j.inboundService.AddTraffic(traffics, clientTraffics) if err != nil { logger.Warning("add inbound traffic failed:", err) } @@ -62,50 +64,85 @@ func (j *XrayTrafficJob) Run() { j.xrayService.SetToNeedRestart() } - // If no frontend client is connected, skip all WebSocket broadcasting routines, - // including expensive DB queries for online clients and JSON marshaling. + // If no frontend client is connected, skip all WebSocket broadcasting + // routines — including the active-client DB query and JSON marshaling. if !websocket.HasClients() { return } - // Update online clients list and map + // Online presence + traffic deltas — small payload, always fits in WS. + // Force non-nil slice/map so JSON marshalling produces [] / {} instead of + // `null` when everyone is offline. The frontend's traffic handler treats + // a missing/null onlineClients field as "no update", so without this the + // "everyone went offline" transition was silently dropped — stale online + // users lingered in the list and the online filter kept showing them. onlineClients := j.inboundService.GetOnlineClients() + if onlineClients == nil { + onlineClients = []string{} + } lastOnlineMap, err := j.inboundService.GetClientsLastOnline() if err != nil { logger.Warning("get clients last online failed:", err) + } + if lastOnlineMap == nil { lastOnlineMap = make(map[string]int64) } - - // Broadcast traffic update (deltas and online stats) via WebSocket - trafficUpdate := map[string]any{ + websocket.BroadcastTraffic(map[string]any{ "traffics": traffics, "clientTraffics": clientTraffics, "onlineClients": onlineClients, "lastOnlineMap": lastOnlineMap, - } - websocket.BroadcastTraffic(trafficUpdate) + }) - // Fetch updated inbounds from database with accumulated traffic values - // This ensures frontend receives the actual total traffic for real-time UI refresh. - updatedInbounds, err := j.inboundService.GetAllInbounds() - if err != nil { - logger.Warning("get all inbounds for websocket failed:", err) + // Compact delta payload: per-client absolute counters for clients active + // this cycle, plus inbound-level absolute totals. Frontend applies both + // in-place — typical payload ~10–50KB even for 10k+ client deployments. + // Replaces the old full-inbound-list broadcast that hit WS size limits + // (5–10MB) and forced the frontend into a REST refetch. + clientStatsPayload := map[string]any{} + if activeEmails := activeEmails(clientTraffics); len(activeEmails) > 0 { + if stats, err := j.inboundService.GetActiveClientTraffics(activeEmails); err != nil { + logger.Warning("get active client traffics for websocket failed:", err) + } else if len(stats) > 0 { + clientStatsPayload["clients"] = stats + } } - - updatedOutbounds, err := j.outboundService.GetOutboundsTraffic() - if err != nil { - logger.Warning("get all outbounds for websocket failed:", err) + if inboundSummary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil { + logger.Warning("get inbounds traffic summary for websocket failed:", err) + } else if len(inboundSummary) > 0 { + clientStatsPayload["inbounds"] = inboundSummary } - - // The WebSocket hub will automatically check the payload size. - // If it exceeds 100MB, it sends a lightweight 'invalidate' signal instead. - if updatedInbounds != nil { - websocket.BroadcastInbounds(updatedInbounds) + if len(clientStatsPayload) > 0 { + websocket.BroadcastClientStats(clientStatsPayload) } - if updatedOutbounds != nil { + // Outbounds list is small (one row per outbound, no per-client expansion) + // so the full snapshot still fits comfortably in WS. + if updatedOutbounds, err := j.outboundService.GetOutboundsTraffic(); err == nil && updatedOutbounds != nil { websocket.BroadcastOutbounds(updatedOutbounds) + } else if err != nil { + logger.Warning("get all outbounds for websocket failed:", err) + } +} + +// activeEmails returns the set of client emails that had non-zero traffic in +// the current collection window. Idle clients are skipped — no need to push +// their (unchanged) counters to the frontend. +func activeEmails(clientTraffics []*xray.ClientTraffic) []string { + if len(clientTraffics) == 0 { + return nil + } + emails := make([]string, 0, len(clientTraffics)) + for _, ct := range clientTraffics { + if ct == nil || ct.Email == "" { + continue + } + if ct.Up == 0 && ct.Down == 0 { + continue + } + emails = append(emails, ct.Email) } + return emails } func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) { |
