diff options
| author | lolka1333 <xtrafcyz@gmail.com> | 2026-05-05 18:27:49 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-05-05 18:27:49 +0300 |
| commit | 8177f6dc667f2edca66073e433baf5cff36cda41 (patch) | |
| tree | 7fddbec4848c3c0ab0fc39e3dc45703c368e9340 /web/service | |
| parent | 77d94b25d054bd6cf7ace029571db9c58ae87fa9 (diff) | |
* ws/inbounds: realtime fixes + perf for 10k+ client inbounds
- hub: dedup, throttle, panic-restart, deadlock fix, race tests
- client: backoff cap + slow-retry instead of giving up
- broadcast: delta-only payload, count-based invalidate fallback
- filter: fix empty online list (Inbound has no .id, use dbInbound.toInbound)
- perf: O(N²)→O(N) traffic merge, bulk delete, /setEnable endpoint
- traffic: monotonic all_time + UI clamp + propagate in delta handler
- session: persist on update/logout (fixes logout-after-password-change)
- ui: protocol tags flex, traffic bar normalize
* Remove hub_test.go file
* fix: ws hub, inbound service, and frontend correctness
- propagate DelInbound error on disable path in SetInboundEnable
- skip empty emails in updateClientTraffics to avoid constraint violations
- use consistent IN ? clause, drop redundant ErrRecordNotFound guards
- Hub.Unregister: direct removeClient fallback when channel is full
- applyClientStatsDelta: O(1) email lookup via per-inbound Map cache
- WS payload size check: Blob.size instead of .length for real byte count
* fix: chunk large IN ? queries and fix IPv6 same-origin check
* fix: chunk large IN ? queries and fix IPv6 same-origin check
* fix: unify clientStats cache, throttle clarity, hub constants
* fix(ui): align traffic/expiry cell columns across all rows
* style(ui): redesign outbounds table for visual consistency
* style(ui): redesign routing table for visual consistency
* fix:
* fix:
* fix:
* fix:
* fix:
* fix: font
* refactor: simplify outbound tone functions for consistency and maintainability
---------
Co-authored-by: lolka1333 <test123@gmail.com>
Diffstat (limited to 'web/service')
| -rw-r--r-- | web/service/inbound.go | 379 |
1 files changed, 316 insertions, 63 deletions
diff --git a/web/service/inbound.go b/web/service/inbound.go index f536efb9..d011e093 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -366,10 +366,22 @@ func (s *InboundService) DelInbound(id int) (bool, error) { if err != nil { return false, err } - for _, client := range clients { - err := s.DelClientIPs(db, client.Email) - if err != nil { - return false, err + // Bulk-delete client IPs for every email in this inbound. The previous + // per-client loop fired one DELETE per row — at 7k+ clients that meant + // thousands of synchronous SQL roundtrips and a multi-second freeze. + // Chunked to stay under SQLite's bind-variable limit on huge inbounds. + if len(clients) > 0 { + emails := make([]string, 0, len(clients)) + for i := range clients { + if clients[i].Email != "" { + emails = append(emails, clients[i].Email) + } + } + for _, batch := range chunkStrings(uniqueNonEmptyStrings(emails), sqliteMaxVars) { + if err := db.Where("client_email IN ?", batch). + Delete(model.InboundClientIps{}).Error; err != nil { + return false, err + } } } @@ -386,6 +398,66 @@ func (s *InboundService) GetInbound(id int) (*model.Inbound, error) { return inbound, nil } +// SetInboundEnable toggles only the enable flag of an inbound, without +// rewriting the (potentially multi-MB) settings JSON. Used by the UI's +// per-row enable switch — for inbounds with thousands of clients the full +// UpdateInbound path is an order of magnitude too slow for an interactive +// toggle (parses + reserialises every client, runs O(N) traffic diff). +// +// Returns (needRestart, error). needRestart is true when the xray runtime +// could not be re-synced from the cached config and a full restart is +// required to pick up the change. +func (s *InboundService) SetInboundEnable(id int, enable bool) (bool, error) { + inbound, err := s.GetInbound(id) + if err != nil { + return false, err + } + if inbound.Enable == enable { + return false, nil + } + + db := database.GetDB() + if err := db.Model(model.Inbound{}).Where("id = ?", id). + Update("enable", enable).Error; err != nil { + return false, err + } + inbound.Enable = enable + + // Sync xray runtime: drop the live inbound, add it back if we're enabling. + // "User not found"-style errors from DelInbound mean the inbound was + // already absent from the live config — that's fine. Any other error + // means the live config and DB diverged, so we ask the caller to + // schedule a restart. + needRestart := false + s.xrayApi.Init(p.GetAPIPort()) + defer s.xrayApi.Close() + + if err := s.xrayApi.DelInbound(inbound.Tag); err != nil && + !strings.Contains(err.Error(), "not found") { + logger.Debug("SetInboundEnable: DelInbound via api failed:", err) + needRestart = true + } + if !enable { + return needRestart, nil + } + + runtimeInbound, err := s.buildRuntimeInboundForAPI(db, inbound) + if err != nil { + logger.Debug("SetInboundEnable: build runtime config failed:", err) + return true, nil + } + inboundJson, err := json.MarshalIndent(runtimeInbound.GenXrayInboundConfig(), "", " ") + if err != nil { + logger.Debug("SetInboundEnable: marshal runtime config failed:", err) + return true, nil + } + if err := s.xrayApi.AddInbound(inboundJson); err != nil { + logger.Debug("SetInboundEnable: AddInbound via api failed:", err) + needRestart = true + } + return needRestart, nil +} + // UpdateInbound modifies an existing inbound configuration. // It validates changes, updates the database, and syncs with the running Xray instance. // Returns the updated inbound, whether Xray needs restart, and any error. @@ -589,6 +661,11 @@ func (s *InboundService) buildRuntimeInboundForAPI(tx *gorm.DB, inbound *model.I return &runtimeInbound, nil } +// updateClientTraffics syncs the ClientTraffic rows with the inbound's clients +// list: removes rows for emails that disappeared, inserts rows for newly-added +// emails. Uses sets for O(N) lookup — the previous nested-loop implementation +// was O(N²) and degraded into multi-second pauses on inbounds with thousands +// of clients (toggling, saving, or deleting any such inbound felt frozen). func (s *InboundService) updateClientTraffics(tx *gorm.DB, oldInbound *model.Inbound, newInbound *model.Inbound) error { oldClients, err := s.GetClients(oldInbound) if err != nil { @@ -599,36 +676,48 @@ func (s *InboundService) updateClientTraffics(tx *gorm.DB, oldInbound *model.Inb return err } - var emailExists bool + // Email is the unique key for ClientTraffic rows. Clients without an + // email have no stats row to sync — skip them on both sides instead of + // risking a unique-constraint hit or accidental delete of an unrelated row. + oldEmails := make(map[string]struct{}, len(oldClients)) + for i := range oldClients { + if oldClients[i].Email == "" { + continue + } + oldEmails[oldClients[i].Email] = struct{}{} + } + newEmails := make(map[string]struct{}, len(newClients)) + for i := range newClients { + if newClients[i].Email == "" { + continue + } + newEmails[newClients[i].Email] = struct{}{} + } - for _, oldClient := range oldClients { - emailExists = false - for _, newClient := range newClients { - if oldClient.Email == newClient.Email { - emailExists = true - break - } + // Removed clients — drop their stats rows. + for i := range oldClients { + email := oldClients[i].Email + if email == "" { + continue } - if !emailExists { - err = s.DelClientStat(tx, oldClient.Email) - if err != nil { - return err - } + if _, kept := newEmails[email]; kept { + continue + } + if err := s.DelClientStat(tx, email); err != nil { + return err } } - for _, newClient := range newClients { - emailExists = false - for _, oldClient := range oldClients { - if newClient.Email == oldClient.Email { - emailExists = true - break - } + // Added clients — create their stats rows. + for i := range newClients { + email := newClients[i].Email + if email == "" { + continue } - if !emailExists { - err = s.AddClientStat(tx, oldInbound.Id, &newClient) - if err != nil { - return err - } + if _, existed := oldEmails[email]; existed { + continue + } + if err := s.AddClientStat(tx, oldInbound.Id, &newClients[i]); err != nil { + return err } } return nil @@ -1228,7 +1317,7 @@ func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId strin return needRestart, tx.Save(oldInbound).Error } -func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (error, bool, bool) { +func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, error) { var err error db := database.GetDB() tx := db.Begin() @@ -1242,11 +1331,11 @@ func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraff }() err = s.addInboundTraffic(tx, inboundTraffics) if err != nil { - return err, false, false + return false, false, err } err = s.addClientTraffic(tx, clientTraffics) if err != nil { - return err, false, false + return false, false, err } needRestart0, count, err := s.autoRenewClients(tx) @@ -1271,7 +1360,7 @@ func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraff } else if count > 0 { logger.Debugf("%v inbounds disabled", count) } - return nil, (needRestart0 || needRestart1 || needRestart2), disabledClientsCount > 0 + return needRestart0 || needRestart1 || needRestart2, disabledClientsCount > 0, nil } func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error { @@ -1328,20 +1417,27 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr return err } + // Index by email for O(N) merge — the previous nested loop was O(N²) + // and dominated each cron tick on inbounds with thousands of active + // clients (7500 × 7500 = 56M string comparisons every 10 seconds). + trafficByEmail := make(map[string]*xray.ClientTraffic, len(traffics)) + for i := range traffics { + if traffics[i] != nil { + trafficByEmail[traffics[i].Email] = traffics[i] + } + } + now := time.Now().UnixMilli() for dbTraffic_index := range dbClientTraffics { - for traffic_index := range traffics { - if dbClientTraffics[dbTraffic_index].Email == traffics[traffic_index].Email { - dbClientTraffics[dbTraffic_index].Up += traffics[traffic_index].Up - dbClientTraffics[dbTraffic_index].Down += traffics[traffic_index].Down - dbClientTraffics[dbTraffic_index].AllTime += (traffics[traffic_index].Up + traffics[traffic_index].Down) - - // Add user in onlineUsers array on traffic - if traffics[traffic_index].Up+traffics[traffic_index].Down > 0 { - onlineClients = append(onlineClients, traffics[traffic_index].Email) - dbClientTraffics[dbTraffic_index].LastOnline = time.Now().UnixMilli() - } - break - } + t, ok := trafficByEmail[dbClientTraffics[dbTraffic_index].Email] + if !ok { + continue + } + dbClientTraffics[dbTraffic_index].Up += t.Up + dbClientTraffics[dbTraffic_index].Down += t.Down + dbClientTraffics[dbTraffic_index].AllTime += t.Up + t.Down + if t.Up+t.Down > 0 { + onlineClients = append(onlineClients, t.Email) + dbClientTraffics[dbTraffic_index].LastOnline = now } } @@ -1441,9 +1537,17 @@ func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) { for _, traffic := range traffics { inbound_ids = append(inbound_ids, traffic.InboundId) } - err = tx.Model(model.Inbound{}).Where("id IN ?", inbound_ids).Find(&inbounds).Error - if err != nil { - return false, 0, err + // Dedupe so an inbound hosting N expired clients is fetched and saved once + // per tick instead of N times across chunk boundaries. + inbound_ids = uniqueInts(inbound_ids) + // Chunked to stay under SQLite's bind-variable limit when many inbounds + // are touched in a single tick. + for _, batch := range chunkInts(inbound_ids, sqliteMaxVars) { + var page []*model.Inbound + if err = tx.Model(model.Inbound{}).Where("id IN ?", batch).Find(&page).Error; err != nil { + return false, 0, err + } + inbounds = append(inbounds, page...) } for inbound_index := range inbounds { settings := map[string]any{} @@ -2362,15 +2466,24 @@ func (s *InboundService) GetClientTrafficTgBot(tgId int64) ([]*xray.ClientTraffi } } - var traffics []*xray.ClientTraffic - err = db.Model(xray.ClientTraffic{}).Where("email IN ?", emails).Find(&traffics).Error - if err != nil { - if err == gorm.ErrRecordNotFound { - logger.Warning("No ClientTraffic records found for emails:", emails) - return nil, nil + // Chunked to stay under SQLite's bind-variable limit when a single Telegram + // account owns thousands of clients across inbounds. + uniqEmails := uniqueNonEmptyStrings(emails) + traffics := make([]*xray.ClientTraffic, 0, len(uniqEmails)) + for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) { + var page []*xray.ClientTraffic + if err = db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil { + if err == gorm.ErrRecordNotFound { + continue + } + logger.Errorf("Error retrieving ClientTraffic for emails %v: %v", batch, err) + return nil, err } - logger.Errorf("Error retrieving ClientTraffic for emails %v: %v", emails, err) - return nil, err + traffics = append(traffics, page...) + } + if len(traffics) == 0 { + logger.Warning("No ClientTraffic records found for emails:", emails) + return nil, nil } // Populate UUID and other client data for each traffic record @@ -2385,6 +2498,133 @@ func (s *InboundService) GetClientTrafficTgBot(tgId int64) ([]*xray.ClientTraffi return traffics, nil } +// sqliteMaxVars is a safe ceiling for the number of bind parameters in a +// single SQL statement. SQLite's SQLITE_MAX_VARIABLE_NUMBER is 999 on builds +// before 3.32 and 32766 after; staying under 999 keeps queries portable +// across forks/old binaries and also bounds per-query memory on truly large +// installs (>32k clients) where even modern SQLite would refuse a single IN. +const sqliteMaxVars = 900 + +// uniqueNonEmptyStrings returns a deduplicated copy of in with empty strings +// removed, preserving the order of first occurrence. +func uniqueNonEmptyStrings(in []string) []string { + if len(in) == 0 { + return nil + } + seen := make(map[string]struct{}, len(in)) + out := make([]string, 0, len(in)) + for _, v := range in { + if v == "" { + continue + } + if _, ok := seen[v]; ok { + continue + } + seen[v] = struct{}{} + out = append(out, v) + } + return out +} + +// uniqueInts returns a deduplicated copy of in, preserving order of first occurrence. +func uniqueInts(in []int) []int { + if len(in) == 0 { + return nil + } + seen := make(map[int]struct{}, len(in)) + out := make([]int, 0, len(in)) + for _, v := range in { + if _, ok := seen[v]; ok { + continue + } + seen[v] = struct{}{} + out = append(out, v) + } + return out +} + +// chunkStrings splits s into consecutive sub-slices of at most size elements. +// Returns nil for an empty input or non-positive size. +func chunkStrings(s []string, size int) [][]string { + if size <= 0 || len(s) == 0 { + return nil + } + out := make([][]string, 0, (len(s)+size-1)/size) + for i := 0; i < len(s); i += size { + end := i + size + if end > len(s) { + end = len(s) + } + out = append(out, s[i:end]) + } + return out +} + +// chunkInts splits s into consecutive sub-slices of at most size elements. +// Returns nil for an empty input or non-positive size. +func chunkInts(s []int, size int) [][]int { + if size <= 0 || len(s) == 0 { + return nil + } + out := make([][]int, 0, (len(s)+size-1)/size) + for i := 0; i < len(s); i += size { + end := i + size + if end > len(s) { + end = len(s) + } + out = append(out, s[i:end]) + } + return out +} + +// GetActiveClientTraffics returns the absolute ClientTraffic rows for the given +// emails. Used by the WebSocket delta path to push per-client absolute +// counters without re-serializing the full inbound list. The query is chunked +// to stay under SQLite's bind-variable limit on very large active sets. +// Empty input returns (nil, nil). +func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.ClientTraffic, error) { + uniq := uniqueNonEmptyStrings(emails) + if len(uniq) == 0 { + return nil, nil + } + db := database.GetDB() + traffics := make([]*xray.ClientTraffic, 0, len(uniq)) + for _, batch := range chunkStrings(uniq, sqliteMaxVars) { + var page []*xray.ClientTraffic + if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil { + return nil, err + } + traffics = append(traffics, page...) + } + return traffics, nil +} + +// InboundTrafficSummary is the minimal projection of an inbound's traffic +// counters used by the WebSocket delta path. Excludes Settings/StreamSettings +// blobs so the broadcast stays compact even with many inbounds. +type InboundTrafficSummary struct { + Id int `json:"id"` + Up int64 `json:"up"` + Down int64 `json:"down"` + Total int64 `json:"total"` + AllTime int64 `json:"allTime"` + Enable bool `json:"enable"` +} + +// GetInboundsTrafficSummary returns inbound-level absolute traffic counters +// (no per-client expansion). Companion to GetActiveClientTraffics — together +// they replace the heavy "full inbound list" broadcast on each cron tick. +func (s *InboundService) GetInboundsTrafficSummary() ([]InboundTrafficSummary, error) { + db := database.GetDB() + var summaries []InboundTrafficSummary + if err := db.Model(&model.Inbound{}). + Select("id, up, down, total, all_time, enable"). + Find(&summaries).Error; err != nil { + return nil, err + } + return summaries, nil +} + func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.ClientTraffic, err error) { // Prefer retrieving along with client to reflect actual enabled state from inbound settings t, client, err := s.GetClientByEmail(email) @@ -2403,9 +2643,17 @@ func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.Cl func (s *InboundService) UpdateClientTrafficByEmail(email string, upload int64, download int64) error { db := database.GetDB() + // Keep all_time monotonic: it represents historical cumulative usage and + // must never be less than the currently-tracked up+down. Without this, + // the UI showed "Общий трафик" (allTime) below the live consumed value + // after admins manually edited a client's counters. result := db.Model(xray.ClientTraffic{}). Where("email = ?", email). - Updates(map[string]any{"up": upload, "down": download}) + Updates(map[string]any{ + "up": upload, + "down": download, + "all_time": gorm.Expr("CASE WHEN COALESCE(all_time, 0) < ? THEN ? ELSE all_time END", upload+download, upload+download), + }) err := result.Error if err != nil { @@ -2746,11 +2994,16 @@ func (s *InboundService) GetClientsLastOnline() (map[string]int64, error) { func (s *InboundService) FilterAndSortClientEmails(emails []string) ([]string, []string, error) { db := database.GetDB() - // Step 1: Get ClientTraffic records for emails in the input list - var clients []xray.ClientTraffic - err := db.Where("email IN ?", emails).Find(&clients).Error - if err != nil && err != gorm.ErrRecordNotFound { - return nil, nil, err + // Step 1: Get ClientTraffic records for emails in the input list. + // Chunked to stay under SQLite's bind-variable limit on huge inputs. + uniqEmails := uniqueNonEmptyStrings(emails) + clients := make([]xray.ClientTraffic, 0, len(uniqEmails)) + for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) { + var page []xray.ClientTraffic + if err := db.Where("email IN ?", batch).Find(&page).Error; err != nil && err != gorm.ErrRecordNotFound { + return nil, nil, err + } + clients = append(clients, page...) } // Step 2: Sort clients by (Up + Down) descending |
