Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/MHSanaei/3x-ui.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlolka1333 <xtrafcyz@gmail.com>2026-05-05 18:27:49 +0300
committerGitHub <noreply@github.com>2026-05-05 18:27:49 +0300
commit8177f6dc667f2edca66073e433baf5cff36cda41 (patch)
tree7fddbec4848c3c0ab0fc39e3dc45703c368e9340 /web/service
parent77d94b25d054bd6cf7ace029571db9c58ae87fa9 (diff)
ws/inbounds: realtime fixes + perf for 10k+ client inbounds (#4123)HEADmain
* ws/inbounds: realtime fixes + perf for 10k+ client inbounds - hub: dedup, throttle, panic-restart, deadlock fix, race tests - client: backoff cap + slow-retry instead of giving up - broadcast: delta-only payload, count-based invalidate fallback - filter: fix empty online list (Inbound has no .id, use dbInbound.toInbound) - perf: O(N²)→O(N) traffic merge, bulk delete, /setEnable endpoint - traffic: monotonic all_time + UI clamp + propagate in delta handler - session: persist on update/logout (fixes logout-after-password-change) - ui: protocol tags flex, traffic bar normalize * Remove hub_test.go file * fix: ws hub, inbound service, and frontend correctness - propagate DelInbound error on disable path in SetInboundEnable - skip empty emails in updateClientTraffics to avoid constraint violations - use consistent IN ? clause, drop redundant ErrRecordNotFound guards - Hub.Unregister: direct removeClient fallback when channel is full - applyClientStatsDelta: O(1) email lookup via per-inbound Map cache - WS payload size check: Blob.size instead of .length for real byte count * fix: chunk large IN ? queries and fix IPv6 same-origin check * fix: chunk large IN ? queries and fix IPv6 same-origin check * fix: unify clientStats cache, throttle clarity, hub constants * fix(ui): align traffic/expiry cell columns across all rows * style(ui): redesign outbounds table for visual consistency * style(ui): redesign routing table for visual consistency * fix: * fix: * fix: * fix: * fix: * fix: font * refactor: simplify outbound tone functions for consistency and maintainability --------- Co-authored-by: lolka1333 <test123@gmail.com>
Diffstat (limited to 'web/service')
-rw-r--r--web/service/inbound.go379
1 files changed, 316 insertions, 63 deletions
diff --git a/web/service/inbound.go b/web/service/inbound.go
index f536efb9..d011e093 100644
--- a/web/service/inbound.go
+++ b/web/service/inbound.go
@@ -366,10 +366,22 @@ func (s *InboundService) DelInbound(id int) (bool, error) {
if err != nil {
return false, err
}
- for _, client := range clients {
- err := s.DelClientIPs(db, client.Email)
- if err != nil {
- return false, err
+ // Bulk-delete client IPs for every email in this inbound. The previous
+ // per-client loop fired one DELETE per row — at 7k+ clients that meant
+ // thousands of synchronous SQL roundtrips and a multi-second freeze.
+ // Chunked to stay under SQLite's bind-variable limit on huge inbounds.
+ if len(clients) > 0 {
+ emails := make([]string, 0, len(clients))
+ for i := range clients {
+ if clients[i].Email != "" {
+ emails = append(emails, clients[i].Email)
+ }
+ }
+ for _, batch := range chunkStrings(uniqueNonEmptyStrings(emails), sqliteMaxVars) {
+ if err := db.Where("client_email IN ?", batch).
+ Delete(model.InboundClientIps{}).Error; err != nil {
+ return false, err
+ }
}
}
@@ -386,6 +398,66 @@ func (s *InboundService) GetInbound(id int) (*model.Inbound, error) {
return inbound, nil
}
+// SetInboundEnable toggles only the enable flag of an inbound, without
+// rewriting the (potentially multi-MB) settings JSON. Used by the UI's
+// per-row enable switch — for inbounds with thousands of clients the full
+// UpdateInbound path is an order of magnitude too slow for an interactive
+// toggle (parses + reserialises every client, runs O(N) traffic diff).
+//
+// Returns (needRestart, error). needRestart is true when the xray runtime
+// could not be re-synced from the cached config and a full restart is
+// required to pick up the change.
+func (s *InboundService) SetInboundEnable(id int, enable bool) (bool, error) {
+ inbound, err := s.GetInbound(id)
+ if err != nil {
+ return false, err
+ }
+ if inbound.Enable == enable {
+ return false, nil
+ }
+
+ db := database.GetDB()
+ if err := db.Model(model.Inbound{}).Where("id = ?", id).
+ Update("enable", enable).Error; err != nil {
+ return false, err
+ }
+ inbound.Enable = enable
+
+ // Sync xray runtime: drop the live inbound, add it back if we're enabling.
+ // "User not found"-style errors from DelInbound mean the inbound was
+ // already absent from the live config — that's fine. Any other error
+ // means the live config and DB diverged, so we ask the caller to
+ // schedule a restart.
+ needRestart := false
+ s.xrayApi.Init(p.GetAPIPort())
+ defer s.xrayApi.Close()
+
+ if err := s.xrayApi.DelInbound(inbound.Tag); err != nil &&
+ !strings.Contains(err.Error(), "not found") {
+ logger.Debug("SetInboundEnable: DelInbound via api failed:", err)
+ needRestart = true
+ }
+ if !enable {
+ return needRestart, nil
+ }
+
+ runtimeInbound, err := s.buildRuntimeInboundForAPI(db, inbound)
+ if err != nil {
+ logger.Debug("SetInboundEnable: build runtime config failed:", err)
+ return true, nil
+ }
+ inboundJson, err := json.MarshalIndent(runtimeInbound.GenXrayInboundConfig(), "", " ")
+ if err != nil {
+ logger.Debug("SetInboundEnable: marshal runtime config failed:", err)
+ return true, nil
+ }
+ if err := s.xrayApi.AddInbound(inboundJson); err != nil {
+ logger.Debug("SetInboundEnable: AddInbound via api failed:", err)
+ needRestart = true
+ }
+ return needRestart, nil
+}
+
// UpdateInbound modifies an existing inbound configuration.
// It validates changes, updates the database, and syncs with the running Xray instance.
// Returns the updated inbound, whether Xray needs restart, and any error.
@@ -589,6 +661,11 @@ func (s *InboundService) buildRuntimeInboundForAPI(tx *gorm.DB, inbound *model.I
return &runtimeInbound, nil
}
+// updateClientTraffics syncs the ClientTraffic rows with the inbound's clients
+// list: removes rows for emails that disappeared, inserts rows for newly-added
+// emails. Uses sets for O(N) lookup — the previous nested-loop implementation
+// was O(N²) and degraded into multi-second pauses on inbounds with thousands
+// of clients (toggling, saving, or deleting any such inbound felt frozen).
func (s *InboundService) updateClientTraffics(tx *gorm.DB, oldInbound *model.Inbound, newInbound *model.Inbound) error {
oldClients, err := s.GetClients(oldInbound)
if err != nil {
@@ -599,36 +676,48 @@ func (s *InboundService) updateClientTraffics(tx *gorm.DB, oldInbound *model.Inb
return err
}
- var emailExists bool
+ // Email is the unique key for ClientTraffic rows. Clients without an
+ // email have no stats row to sync — skip them on both sides instead of
+ // risking a unique-constraint hit or accidental delete of an unrelated row.
+ oldEmails := make(map[string]struct{}, len(oldClients))
+ for i := range oldClients {
+ if oldClients[i].Email == "" {
+ continue
+ }
+ oldEmails[oldClients[i].Email] = struct{}{}
+ }
+ newEmails := make(map[string]struct{}, len(newClients))
+ for i := range newClients {
+ if newClients[i].Email == "" {
+ continue
+ }
+ newEmails[newClients[i].Email] = struct{}{}
+ }
- for _, oldClient := range oldClients {
- emailExists = false
- for _, newClient := range newClients {
- if oldClient.Email == newClient.Email {
- emailExists = true
- break
- }
+ // Removed clients — drop their stats rows.
+ for i := range oldClients {
+ email := oldClients[i].Email
+ if email == "" {
+ continue
}
- if !emailExists {
- err = s.DelClientStat(tx, oldClient.Email)
- if err != nil {
- return err
- }
+ if _, kept := newEmails[email]; kept {
+ continue
+ }
+ if err := s.DelClientStat(tx, email); err != nil {
+ return err
}
}
- for _, newClient := range newClients {
- emailExists = false
- for _, oldClient := range oldClients {
- if newClient.Email == oldClient.Email {
- emailExists = true
- break
- }
+ // Added clients — create their stats rows.
+ for i := range newClients {
+ email := newClients[i].Email
+ if email == "" {
+ continue
}
- if !emailExists {
- err = s.AddClientStat(tx, oldInbound.Id, &newClient)
- if err != nil {
- return err
- }
+ if _, existed := oldEmails[email]; existed {
+ continue
+ }
+ if err := s.AddClientStat(tx, oldInbound.Id, &newClients[i]); err != nil {
+ return err
}
}
return nil
@@ -1228,7 +1317,7 @@ func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId strin
return needRestart, tx.Save(oldInbound).Error
}
-func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (error, bool, bool) {
+func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, error) {
var err error
db := database.GetDB()
tx := db.Begin()
@@ -1242,11 +1331,11 @@ func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraff
}()
err = s.addInboundTraffic(tx, inboundTraffics)
if err != nil {
- return err, false, false
+ return false, false, err
}
err = s.addClientTraffic(tx, clientTraffics)
if err != nil {
- return err, false, false
+ return false, false, err
}
needRestart0, count, err := s.autoRenewClients(tx)
@@ -1271,7 +1360,7 @@ func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraff
} else if count > 0 {
logger.Debugf("%v inbounds disabled", count)
}
- return nil, (needRestart0 || needRestart1 || needRestart2), disabledClientsCount > 0
+ return needRestart0 || needRestart1 || needRestart2, disabledClientsCount > 0, nil
}
func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error {
@@ -1328,20 +1417,27 @@ func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTr
return err
}
+ // Index by email for O(N) merge — the previous nested loop was O(N²)
+ // and dominated each cron tick on inbounds with thousands of active
+ // clients (7500 × 7500 = 56M string comparisons every 10 seconds).
+ trafficByEmail := make(map[string]*xray.ClientTraffic, len(traffics))
+ for i := range traffics {
+ if traffics[i] != nil {
+ trafficByEmail[traffics[i].Email] = traffics[i]
+ }
+ }
+ now := time.Now().UnixMilli()
for dbTraffic_index := range dbClientTraffics {
- for traffic_index := range traffics {
- if dbClientTraffics[dbTraffic_index].Email == traffics[traffic_index].Email {
- dbClientTraffics[dbTraffic_index].Up += traffics[traffic_index].Up
- dbClientTraffics[dbTraffic_index].Down += traffics[traffic_index].Down
- dbClientTraffics[dbTraffic_index].AllTime += (traffics[traffic_index].Up + traffics[traffic_index].Down)
-
- // Add user in onlineUsers array on traffic
- if traffics[traffic_index].Up+traffics[traffic_index].Down > 0 {
- onlineClients = append(onlineClients, traffics[traffic_index].Email)
- dbClientTraffics[dbTraffic_index].LastOnline = time.Now().UnixMilli()
- }
- break
- }
+ t, ok := trafficByEmail[dbClientTraffics[dbTraffic_index].Email]
+ if !ok {
+ continue
+ }
+ dbClientTraffics[dbTraffic_index].Up += t.Up
+ dbClientTraffics[dbTraffic_index].Down += t.Down
+ dbClientTraffics[dbTraffic_index].AllTime += t.Up + t.Down
+ if t.Up+t.Down > 0 {
+ onlineClients = append(onlineClients, t.Email)
+ dbClientTraffics[dbTraffic_index].LastOnline = now
}
}
@@ -1441,9 +1537,17 @@ func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {
for _, traffic := range traffics {
inbound_ids = append(inbound_ids, traffic.InboundId)
}
- err = tx.Model(model.Inbound{}).Where("id IN ?", inbound_ids).Find(&inbounds).Error
- if err != nil {
- return false, 0, err
+ // Dedupe so an inbound hosting N expired clients is fetched and saved once
+ // per tick instead of N times across chunk boundaries.
+ inbound_ids = uniqueInts(inbound_ids)
+ // Chunked to stay under SQLite's bind-variable limit when many inbounds
+ // are touched in a single tick.
+ for _, batch := range chunkInts(inbound_ids, sqliteMaxVars) {
+ var page []*model.Inbound
+ if err = tx.Model(model.Inbound{}).Where("id IN ?", batch).Find(&page).Error; err != nil {
+ return false, 0, err
+ }
+ inbounds = append(inbounds, page...)
}
for inbound_index := range inbounds {
settings := map[string]any{}
@@ -2362,15 +2466,24 @@ func (s *InboundService) GetClientTrafficTgBot(tgId int64) ([]*xray.ClientTraffi
}
}
- var traffics []*xray.ClientTraffic
- err = db.Model(xray.ClientTraffic{}).Where("email IN ?", emails).Find(&traffics).Error
- if err != nil {
- if err == gorm.ErrRecordNotFound {
- logger.Warning("No ClientTraffic records found for emails:", emails)
- return nil, nil
+ // Chunked to stay under SQLite's bind-variable limit when a single Telegram
+ // account owns thousands of clients across inbounds.
+ uniqEmails := uniqueNonEmptyStrings(emails)
+ traffics := make([]*xray.ClientTraffic, 0, len(uniqEmails))
+ for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) {
+ var page []*xray.ClientTraffic
+ if err = db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil {
+ if err == gorm.ErrRecordNotFound {
+ continue
+ }
+ logger.Errorf("Error retrieving ClientTraffic for emails %v: %v", batch, err)
+ return nil, err
}
- logger.Errorf("Error retrieving ClientTraffic for emails %v: %v", emails, err)
- return nil, err
+ traffics = append(traffics, page...)
+ }
+ if len(traffics) == 0 {
+ logger.Warning("No ClientTraffic records found for emails:", emails)
+ return nil, nil
}
// Populate UUID and other client data for each traffic record
@@ -2385,6 +2498,133 @@ func (s *InboundService) GetClientTrafficTgBot(tgId int64) ([]*xray.ClientTraffi
return traffics, nil
}
+// sqliteMaxVars is a safe ceiling for the number of bind parameters in a
+// single SQL statement. SQLite's SQLITE_MAX_VARIABLE_NUMBER is 999 on builds
+// before 3.32 and 32766 after; staying under 999 keeps queries portable
+// across forks/old binaries and also bounds per-query memory on truly large
+// installs (>32k clients) where even modern SQLite would refuse a single IN.
+const sqliteMaxVars = 900
+
+// uniqueNonEmptyStrings returns a deduplicated copy of in with empty strings
+// removed, preserving the order of first occurrence.
+func uniqueNonEmptyStrings(in []string) []string {
+ if len(in) == 0 {
+ return nil
+ }
+ seen := make(map[string]struct{}, len(in))
+ out := make([]string, 0, len(in))
+ for _, v := range in {
+ if v == "" {
+ continue
+ }
+ if _, ok := seen[v]; ok {
+ continue
+ }
+ seen[v] = struct{}{}
+ out = append(out, v)
+ }
+ return out
+}
+
+// uniqueInts returns a deduplicated copy of in, preserving order of first occurrence.
+func uniqueInts(in []int) []int {
+ if len(in) == 0 {
+ return nil
+ }
+ seen := make(map[int]struct{}, len(in))
+ out := make([]int, 0, len(in))
+ for _, v := range in {
+ if _, ok := seen[v]; ok {
+ continue
+ }
+ seen[v] = struct{}{}
+ out = append(out, v)
+ }
+ return out
+}
+
+// chunkStrings splits s into consecutive sub-slices of at most size elements.
+// Returns nil for an empty input or non-positive size.
+func chunkStrings(s []string, size int) [][]string {
+ if size <= 0 || len(s) == 0 {
+ return nil
+ }
+ out := make([][]string, 0, (len(s)+size-1)/size)
+ for i := 0; i < len(s); i += size {
+ end := i + size
+ if end > len(s) {
+ end = len(s)
+ }
+ out = append(out, s[i:end])
+ }
+ return out
+}
+
+// chunkInts splits s into consecutive sub-slices of at most size elements.
+// Returns nil for an empty input or non-positive size.
+func chunkInts(s []int, size int) [][]int {
+ if size <= 0 || len(s) == 0 {
+ return nil
+ }
+ out := make([][]int, 0, (len(s)+size-1)/size)
+ for i := 0; i < len(s); i += size {
+ end := i + size
+ if end > len(s) {
+ end = len(s)
+ }
+ out = append(out, s[i:end])
+ }
+ return out
+}
+
+// GetActiveClientTraffics returns the absolute ClientTraffic rows for the given
+// emails. Used by the WebSocket delta path to push per-client absolute
+// counters without re-serializing the full inbound list. The query is chunked
+// to stay under SQLite's bind-variable limit on very large active sets.
+// Empty input returns (nil, nil).
+func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.ClientTraffic, error) {
+ uniq := uniqueNonEmptyStrings(emails)
+ if len(uniq) == 0 {
+ return nil, nil
+ }
+ db := database.GetDB()
+ traffics := make([]*xray.ClientTraffic, 0, len(uniq))
+ for _, batch := range chunkStrings(uniq, sqliteMaxVars) {
+ var page []*xray.ClientTraffic
+ if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil {
+ return nil, err
+ }
+ traffics = append(traffics, page...)
+ }
+ return traffics, nil
+}
+
+// InboundTrafficSummary is the minimal projection of an inbound's traffic
+// counters used by the WebSocket delta path. Excludes Settings/StreamSettings
+// blobs so the broadcast stays compact even with many inbounds.
+type InboundTrafficSummary struct {
+ Id int `json:"id"`
+ Up int64 `json:"up"`
+ Down int64 `json:"down"`
+ Total int64 `json:"total"`
+ AllTime int64 `json:"allTime"`
+ Enable bool `json:"enable"`
+}
+
+// GetInboundsTrafficSummary returns inbound-level absolute traffic counters
+// (no per-client expansion). Companion to GetActiveClientTraffics — together
+// they replace the heavy "full inbound list" broadcast on each cron tick.
+func (s *InboundService) GetInboundsTrafficSummary() ([]InboundTrafficSummary, error) {
+ db := database.GetDB()
+ var summaries []InboundTrafficSummary
+ if err := db.Model(&model.Inbound{}).
+ Select("id, up, down, total, all_time, enable").
+ Find(&summaries).Error; err != nil {
+ return nil, err
+ }
+ return summaries, nil
+}
+
func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.ClientTraffic, err error) {
// Prefer retrieving along with client to reflect actual enabled state from inbound settings
t, client, err := s.GetClientByEmail(email)
@@ -2403,9 +2643,17 @@ func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.Cl
func (s *InboundService) UpdateClientTrafficByEmail(email string, upload int64, download int64) error {
db := database.GetDB()
+ // Keep all_time monotonic: it represents historical cumulative usage and
+ // must never be less than the currently-tracked up+down. Without this,
+ // the UI showed "Общий трафик" (allTime) below the live consumed value
+ // after admins manually edited a client's counters.
result := db.Model(xray.ClientTraffic{}).
Where("email = ?", email).
- Updates(map[string]any{"up": upload, "down": download})
+ Updates(map[string]any{
+ "up": upload,
+ "down": download,
+ "all_time": gorm.Expr("CASE WHEN COALESCE(all_time, 0) < ? THEN ? ELSE all_time END", upload+download, upload+download),
+ })
err := result.Error
if err != nil {
@@ -2746,11 +2994,16 @@ func (s *InboundService) GetClientsLastOnline() (map[string]int64, error) {
func (s *InboundService) FilterAndSortClientEmails(emails []string) ([]string, []string, error) {
db := database.GetDB()
- // Step 1: Get ClientTraffic records for emails in the input list
- var clients []xray.ClientTraffic
- err := db.Where("email IN ?", emails).Find(&clients).Error
- if err != nil && err != gorm.ErrRecordNotFound {
- return nil, nil, err
+ // Step 1: Get ClientTraffic records for emails in the input list.
+ // Chunked to stay under SQLite's bind-variable limit on huge inputs.
+ uniqEmails := uniqueNonEmptyStrings(emails)
+ clients := make([]xray.ClientTraffic, 0, len(uniqEmails))
+ for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) {
+ var page []xray.ClientTraffic
+ if err := db.Where("email IN ?", batch).Find(&page).Error; err != nil && err != gorm.ErrRecordNotFound {
+ return nil, nil, err
+ }
+ clients = append(clients, page...)
}
// Step 2: Sort clients by (Up + Down) descending