diff options
| author | lolka1333 <xtrafcyz@gmail.com> | 2026-04-19 22:01:00 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-04-19 22:01:00 +0300 |
| commit | fec714a2431c482024a0952982fa36f38935e7ed (patch) | |
| tree | cb2e206b375a725623a0c0b18a22785d44037d67 /web | |
| parent | e02f78ac68e96066288c5da0c38e293160b23143 (diff) | |
fix: enhance WebSocket stability, resolve XHTTP configurations and fix UI loading shifts (#3997)
* feat: implement real-time traffic monitoring and UI updates using a high-performance WebSocket hub and background job system
* feat: add bulk client management support and improve inbound data handling
* Fix bug
* **Fixes & Changes:**
1. **Fixed XPadding Placement Dropdown**:
- Added the missing `cookie` and `query` options to `xPaddingPlacement` (`stream_xhttp.html`).
- *Why:* Previously, users wanting `cookie` obfuscation were forced to use the `header` placement string. This caused Xray-core to blindly intercept the entire monolithic HTTP Cookie header, failing internal padding-length validations and causing the inbound to silently drop the connection.
2. **Fixed Uplink Data Placement Validation**:
- Replaced the unsupported `query` option with `cookie` in `uplinkDataPlacement`.
- *Why:* Xray-core's `transport_internet.go` explicitly forbids `query` as an uplink placement option. Selecting it from the UI previously sent a payload that would cause Xray-core to instantly throw an `unsupported uplink data placement: query` panic. Adding `cookie` perfectly aligns the UI with Xray-core restrictions.
### Related Issues
- Resolves #3992
* This commit fixes structural payload issues preventing XHTTP from functioning correctly and eliminates WebSocket log spam.
- **[Fix X-Padding UI]** Added missing `cookie` and `query` options to X-Padding Placement. Fixes the issue where using Cookie fallback triggers whole HTTP Cookie header interception and silent drop in Xray-core. (Resolves [#3992](https://github.com/MHSanaei/3x-ui/issues/3992))
- **[Fix Uplink Data Options]** Replaced the invalid `query` option with `cookie` in Uplink Data Placement dropdown to prevent Xray-core backend panic `unsupported uplink data placement: query`.
- **[Fix WebSockets Spam]** Boosted `maxMessageSize` boundary to 100MB and gracefully handled fallback fetch signals via `broadcastInvalidate` to avoid buffer dropping spam. (Resolves [#3984](https://github.com/MHSanaei/3x-ui/issues/3984))
* Fix
* gofmt
* fix(websocket): resolve channel race condition and graceful shutdown deadlock
* Fix: inbounds switch
* Change max quantity from 10000 to 500
* fix
Diffstat (limited to 'web')
| -rw-r--r-- | web/assets/js/model/dbinbound.js | 25 | ||||
| -rw-r--r-- | web/controller/websocket.go | 6 | ||||
| -rw-r--r-- | web/html/form/stream/stream_xhttp.html | 3 | ||||
| -rw-r--r-- | web/html/inbounds.html | 152 | ||||
| -rw-r--r-- | web/html/index.html | 6 | ||||
| -rw-r--r-- | web/html/modals/client_bulk_modal.html | 6 | ||||
| -rw-r--r-- | web/html/modals/client_modal.html | 8 | ||||
| -rw-r--r-- | web/html/settings.html | 7 | ||||
| -rw-r--r-- | web/html/xray.html | 15 | ||||
| -rw-r--r-- | web/job/xray_traffic_job.go | 32 | ||||
| -rw-r--r-- | web/service/xray.go | 40 | ||||
| -rw-r--r-- | web/websocket/hub.go | 96 | ||||
| -rw-r--r-- | web/websocket/notifier.go | 21 |
13 files changed, 288 insertions, 129 deletions
diff --git a/web/assets/js/model/dbinbound.js b/web/assets/js/model/dbinbound.js index befc618e..c347a7eb 100644 --- a/web/assets/js/model/dbinbound.js +++ b/web/assets/js/model/dbinbound.js @@ -90,7 +90,16 @@ class DBInbound { return this.expiryTime < new Date().getTime(); } + invalidateCache() { + this._cachedInbound = null; + this._clientStatsMap = null; + } + toInbound() { + if (this._cachedInbound) { + return this._cachedInbound; + } + let settings = {}; if (!ObjectUtil.isEmpty(this.settings)) { settings = JSON.parse(this.settings); @@ -116,7 +125,21 @@ class DBInbound { sniffing: sniffing, clientStats: this.clientStats, }; - return Inbound.fromJson(config); + + this._cachedInbound = Inbound.fromJson(config); + return this._cachedInbound; + } + + getClientStats(email) { + if (!this._clientStatsMap) { + this._clientStatsMap = new Map(); + if (this.clientStats && Array.isArray(this.clientStats)) { + for (const stats of this.clientStats) { + this._clientStatsMap.set(stats.email, stats); + } + } + } + return this._clientStatsMap.get(email); } isMultiUser() { diff --git a/web/controller/websocket.go b/web/controller/websocket.go index 0ad5c845..dfb59709 100644 --- a/web/controller/websocket.go +++ b/web/controller/websocket.go @@ -30,8 +30,10 @@ const ( ) var upgrader = ws.Upgrader{ - ReadBufferSize: 4096, // Increased from 1024 for better performance - WriteBufferSize: 4096, // Increased from 1024 for better performance + ReadBufferSize: 32768, + WriteBufferSize: 32768, + EnableCompression: true, // Negotiate permessage-deflate compression if the client supports it + CheckOrigin: func(r *http.Request) bool { // Check origin for security origin := r.Header.Get("Origin") diff --git a/web/html/form/stream/stream_xhttp.html b/web/html/form/stream/stream_xhttp.html index 447612c9..8fe836d0 100644 --- a/web/html/form/stream/stream_xhttp.html +++ b/web/html/form/stream/stream_xhttp.html @@ -70,6 +70,8 @@ <a-select-option value="queryInHeader">queryInHeader</a-select-option> <a-select-option value="header">header</a-select-option> + <a-select-option value="cookie">cookie</a-select-option> + <a-select-option value="query">query</a-select-option> </a-select> </a-form-item> <a-form-item label="Padding Method"> @@ -127,6 +129,7 @@ <a-select-option value>Default (body)</a-select-option> <a-select-option value="body">body</a-select-option> <a-select-option value="header">header</a-select-option> + <a-select-option value="cookie">cookie</a-select-option> <a-select-option value="query">query</a-select-option> </a-select> </a-form-item> diff --git a/web/html/inbounds.html b/web/html/inbounds.html index b945da90..231fc0c0 100644 --- a/web/html/inbounds.html +++ b/web/html/inbounds.html @@ -6,7 +6,7 @@ <a-sidebar></a-sidebar> <a-layout id="content-layout"> <a-layout-content> - <a-spin :spinning="loadingStates.spinning" :delay="500" tip='{{ i18n "loading"}}'> + <a-spin :spinning="loadingStates.spinning" :delay="500" tip='{{ i18n "loading"}}' size="large"> <transition name="list" appear> <a-alert type="error" v-if="showAlert && loadingStates.fetched" :style="{ marginBottom: '10px' }" message='{{ i18n "secAlertTitle" }}' color="red" description='{{ i18n "secAlertSsl" }}' show-icon closable> @@ -14,10 +14,7 @@ </transition> <transition name="list" appear> <a-row v-if="!loadingStates.fetched"> - <a-card - :style="{ textAlign: 'center', padding: '30px 0', marginTop: '10px', background: 'transparent', border: 'none' }"> - <a-spin tip='{{ i18n "loading" }}'></a-spin> - </a-card> + <div :style="{ minHeight: 'calc(100vh - 120px)' }"></div> </a-row> <a-row :gutter="[isMobile ? 8 : 16, isMobile ? 0 : 12]" v-else> <a-col> @@ -1101,7 +1098,10 @@ } data.sniffing = inbound.sniffing.toString(); - await this.submit(`/panel/api/inbounds/update/${dbInbound.id}`, data, inModal); + const formData = new FormData(); + Object.keys(data).forEach(key => formData.append(key, data[key])); + + await this.submit(`/panel/api/inbounds/update/${dbInbound.id}`, formData, inModal); }, openAddClient(dbInboundId) { dbInbound = this.dbInbounds.find(row => row.id === dbInboundId); @@ -1291,9 +1291,36 @@ infoModal.show(newDbInbound, index); }, switchEnable(dbInboundId, state) { - dbInbound = this.dbInbounds.find(row => row.id === dbInboundId); + let dbInbound = this.dbInbounds.find(row => row.id === dbInboundId); + if (!dbInbound) return; dbInbound.enable = state; - this.submit(`/panel/api/inbounds/update/${dbInboundId}`, dbInbound); + let inbound = dbInbound.toInbound(); + const data = { + up: dbInbound.up, + down: dbInbound.down, + total: dbInbound.total, + remark: dbInbound.remark, + enable: dbInbound.enable, + expiryTime: dbInbound.expiryTime, + trafficReset: dbInbound.trafficReset, + lastTrafficResetTime: dbInbound.lastTrafficResetTime, + + listen: inbound.listen, + port: inbound.port, + protocol: inbound.protocol, + settings: inbound.settings.toString(), + }; + if (inbound.canEnableStream()) { + data.streamSettings = inbound.stream.toString(); + } else if (inbound.stream?.sockopt) { + data.streamSettings = JSON.stringify({ sockopt: inbound.stream.sockopt.toJson() }, null, 2); + } + data.sniffing = inbound.sniffing.toString(); + + const formData = new FormData(); + Object.keys(data).forEach(key => formData.append(key, data[key])); + + this.submit(`/panel/api/inbounds/update/${dbInboundId}`, formData); }, async switchEnableClient(dbInboundId, client) { this.loading() @@ -1367,42 +1394,54 @@ isExpiry(dbInbound, index) { return dbInbound.toInbound().isExpiry(index); }, + getClientStats(dbInbound, email) { + if (!dbInbound) return null; + if (!dbInbound._clientStatsMap) { + dbInbound._clientStatsMap = new Map(); + if (dbInbound.clientStats && Array.isArray(dbInbound.clientStats)) { + for (const stats of dbInbound.clientStats) { + dbInbound._clientStatsMap.set(stats.email, stats); + } + } + } + return dbInbound._clientStatsMap.get(email); + }, getUpStats(dbInbound, email) { - if (email.length == 0) return 0; - clientStats = dbInbound.clientStats.find(stats => stats.email === email); + if (!email || email.length == 0) return 0; + let clientStats = this.getClientStats(dbInbound, email); return clientStats ? clientStats.up : 0; }, getDownStats(dbInbound, email) { - if (email.length == 0) return 0; - clientStats = dbInbound.clientStats.find(stats => stats.email === email); + if (!email || email.length == 0) return 0; + let clientStats = this.getClientStats(dbInbound, email); return clientStats ? clientStats.down : 0; }, getSumStats(dbInbound, email) { - if (email.length == 0) return 0; - clientStats = dbInbound.clientStats.find(stats => stats.email === email); + if (!email || email.length == 0) return 0; + let clientStats = this.getClientStats(dbInbound, email); return clientStats ? clientStats.up + clientStats.down : 0; }, getAllTimeClient(dbInbound, email) { - if (email.length == 0) return 0; - clientStats = dbInbound.clientStats.find(stats => stats.email === email); + if (!email || email.length == 0) return 0; + let clientStats = this.getClientStats(dbInbound, email); if (!clientStats) return 0; return clientStats.allTime || (clientStats.up + clientStats.down); }, getRemStats(dbInbound, email) { - if (email.length == 0) return 0; - clientStats = dbInbound.clientStats.find(stats => stats.email === email); + if (!email || email.length == 0) return 0; + let clientStats = this.getClientStats(dbInbound, email); if (!clientStats) return 0; - remained = clientStats.total - (clientStats.up + clientStats.down); + let remained = clientStats.total - (clientStats.up + clientStats.down); return remained > 0 ? remained : 0; }, clientStatsColor(dbInbound, email) { - if (email.length == 0) return ColorUtils.clientUsageColor(); - clientStats = dbInbound.clientStats.find(stats => stats.email === email); + if (!email || email.length == 0) return ColorUtils.clientUsageColor(); + let clientStats = this.getClientStats(dbInbound, email); return ColorUtils.clientUsageColor(clientStats, app.trafficDiff) }, statsProgress(dbInbound, email) { - if (email.length == 0) return 100; - clientStats = dbInbound.clientStats.find(stats => stats.email === email); + if (!email || email.length == 0) return 100; + let clientStats = this.getClientStats(dbInbound, email); if (!clientStats) return 0; if (clientStats.total == 0) return 100; return 100 * (clientStats.down + clientStats.up) / clientStats.total; @@ -1415,11 +1454,11 @@ return 100 * (1 - (remainedSeconds / resetSeconds)); }, statsExpColor(dbInbound, email) { - if (email.length == 0) return '#7a316f'; - clientStats = dbInbound.clientStats.find(stats => stats.email === email); + if (!email || email.length == 0) return '#7a316f'; + let clientStats = this.getClientStats(dbInbound, email); if (!clientStats) return '#7a316f'; - statsColor = ColorUtils.usageColor(clientStats.down + clientStats.up, this.trafficDiff, clientStats.total); - expColor = ColorUtils.usageColor(new Date().getTime(), this.expireDiff, clientStats.expiryTime); + let statsColor = ColorUtils.usageColor(clientStats.down + clientStats.up, this.trafficDiff, clientStats.total); + let expColor = ColorUtils.usageColor(new Date().getTime(), this.expireDiff, clientStats.expiryTime); switch (true) { case statsColor == "red" || expColor == "red": return "#cf3c3c"; // Red @@ -1432,12 +1471,12 @@ } }, isClientEnabled(dbInbound, email) { - clientStats = dbInbound.clientStats ? dbInbound.clientStats.find(stats => stats.email === email) : null; + let clientStats = dbInbound ? this.getClientStats(dbInbound, email) : null; return clientStats ? clientStats['enable'] : true; }, isClientDepleted(dbInbound, email) { - if (!email || !dbInbound || !dbInbound.clientStats) return false; - const stats = dbInbound.clientStats.find(s => s.email === email); + if (!email || !dbInbound) return false; + const stats = this.getClientStats(dbInbound, email); if (!stats) return false; const total = stats.total ?? 0; const used = (stats.up ?? 0) + (stats.down ?? 0); @@ -1557,12 +1596,18 @@ pagination(obj) { if (this.pageSize > 0 && obj.length > this.pageSize) { // Set page options based on object size - sizeOptions = []; - for (i = this.pageSize; i <= obj.length; i = i + this.pageSize) { - sizeOptions.push(i.toString()); + let sizeOptions = [this.pageSize.toString()]; + const increments = [2, 5, 10, 20]; + for (const m of increments) { + const val = this.pageSize * m; + if (val < obj.length && val <= 1000) { + sizeOptions.push(val.toString()); + } } // Add option to see all in one page - sizeOptions.push(i.toString()); + if (!sizeOptions.includes(obj.length.toString())) { + sizeOptions.push(obj.length.toString()); + } p = { showSizeChanger: true, @@ -1605,11 +1650,25 @@ } }); + // Listen for invalidate signals (sent when payload is too large for WebSocket) + // The server sends a lightweight notification and we re-fetch via REST API + let invalidateTimer = null; + window.wsClient.on('invalidate', (payload) => { + if (payload && (payload.type === 'inbounds' || payload.type === 'traffic')) { + // Debounce to avoid flooding the REST API with multiple invalidate signals + if (invalidateTimer) clearTimeout(invalidateTimer); + invalidateTimer = setTimeout(() => { + invalidateTimer = null; + this.getDBInbounds(); + }, 1000); + } + }); + // Listen for traffic updates window.wsClient.on('traffic', (payload) => { // Note: Do NOT update total consumed traffic (stats.up, stats.down) from this event // because clientTraffics contains delta/incremental values, not total accumulated values. - // Total traffic is updated via the 'inbounds' event which contains accumulated values from database. + // Total traffic is updated via the 'inbounds' WebSocket event (or 'invalidate' fallback for large panels). // Update online clients list in real-time if (payload && Array.isArray(payload.onlineClients)) { @@ -1627,22 +1686,27 @@ this.onlineClients = nextOnlineClients; if (onlineChanged) { // Recalculate client counts to update online status + // Use $set for Vue 2 reactivity — direct array index assignment is not reactive this.dbInbounds.forEach(dbInbound => { const inbound = this.inbounds.find(ib => ib.id === dbInbound.id); if (inbound && this.clientCount[dbInbound.id]) { - this.clientCount[dbInbound.id] = this.getClientCounts(dbInbound, inbound); + this.$set(this.clientCount, dbInbound.id, this.getClientCounts(dbInbound, inbound)); } }); + // Always trigger UI refresh — not just when filter is enabled if (this.enableFilter) { this.filterInbounds(); + } else { + this.searchInbounds(this.searchKey); } } } // Update last online map in real-time + // Replace entirely (server sends the full map) to avoid unbounded growth from deleted clients if (payload && payload.lastOnlineMap && typeof payload.lastOnlineMap === 'object') { - this.lastOnlineMap = { ...this.lastOnlineMap, ...payload.lastOnlineMap }; + this.lastOnlineMap = payload.lastOnlineMap; } }); @@ -1697,4 +1761,18 @@ }, }); </script> +<style> + #content-layout > .ant-layout-content > .ant-spin-nested-loading > div > .ant-spin { + position: fixed !important; + top: 50vh !important; + left: calc(50vw + 100px) !important; + transform: translate(-50%, -50%); + z-index: 99999 !important; + } + @media (max-width: 768px) { + #content-layout > .ant-layout-content > .ant-spin-nested-loading > div > .ant-spin { + left: 50vw !important; + } + } +</style> {{ template "page/body_end" .}}
\ No newline at end of file diff --git a/web/html/index.html b/web/html/index.html index bbbbb708..47645f7d 100644 --- a/web/html/index.html +++ b/web/html/index.html @@ -6,7 +6,7 @@ <a-sidebar></a-sidebar> <a-layout id="content-layout"> <a-layout-content> - <a-spin :spinning="loadingStates.spinning" :delay="200" :tip="loadingTip"> + <a-spin :spinning="loadingStates.spinning" :delay="200" :tip="loadingTip" size="large"> <transition name="list" appear> <a-alert type="error" v-if="showAlert && loadingStates.fetched" class="mb-10" message='{{ i18n "secAlertTitle" }}' color="red" description='{{ i18n "secAlertSsl" }}' show-icon closable> @@ -15,9 +15,7 @@ <transition name="list" appear> <template> <a-row v-if="!loadingStates.fetched"> - <a-card class="card-placeholder text-center"> - <a-spin tip='{{ i18n "loading" }}'></a-spin> - </a-card> + <div :style="{ minHeight: 'calc(100vh - 120px)' }"></div> </a-row> <a-row :gutter="[isMobile ? 8 : 16, isMobile ? 0 : 12]" v-else> <a-col> diff --git a/web/html/modals/client_bulk_modal.html b/web/html/modals/client_bulk_modal.html index ac0fa011..6e61feae 100644 --- a/web/html/modals/client_bulk_modal.html +++ b/web/html/modals/client_bulk_modal.html @@ -26,7 +26,7 @@ <a-input v-model.trim="clientsBulkModal.emailPostfix"></a-input> </a-form-item> <a-form-item label='{{ i18n "pages.client.clientCount" }}' v-if="clientsBulkModal.emailMethod < 2"> - <a-input-number v-model.number="clientsBulkModal.quantity" :min="1" :max="100"></a-input-number> + <a-input-number v-model.number="clientsBulkModal.quantity" :min="1" :max="500"></a-input-number> </a-form-item> <a-form-item label='{{ i18n "security" }}' v-if="inbound.protocol === Protocols.VMESS"> <a-select v-model="clientsBulkModal.security" :dropdown-class-name="themeSwitcher.currentTheme"> @@ -204,7 +204,7 @@ this.security = "auto"; this.flow = ""; this.dbInbound = new DBInbound(dbInbound); - this.inbound = dbInbound.toInbound(); + this.inbound = Inbound.fromJson(dbInbound.toInbound().toJson()); this.delayedStart = false; this.reset = 0; }, @@ -247,4 +247,4 @@ }); </script> -{{end}}
\ No newline at end of file +{{end}} diff --git a/web/html/modals/client_modal.html b/web/html/modals/client_modal.html index 8b57b8b2..a5d3e7ac 100644 --- a/web/html/modals/client_modal.html +++ b/web/html/modals/client_modal.html @@ -37,7 +37,7 @@ this.okText = okText; this.isEdit = isEdit; this.dbInbound = new DBInbound(dbInbound); - this.inbound = dbInbound.toInbound(); + this.inbound = Inbound.fromJson(dbInbound.toInbound().toJson()); this.clients = this.inbound.clients; this.index = index === null ? this.clients.length : index; this.delayedStart = false; @@ -98,9 +98,9 @@ return app.datepicker; }, get isTrafficExhausted() { - if (!clientStats) return false - if (clientStats.total <= 0) return false - if (clientStats.up + clientStats.down < clientStats.total) return false + if (!this.clientStats) return false + if (this.clientStats.total <= 0) return false + if (this.clientStats.up + this.clientStats.down < this.clientStats.total) return false return true }, get isExpiry() { diff --git a/web/html/settings.html b/web/html/settings.html index 21294da7..48aad524 100644 --- a/web/html/settings.html +++ b/web/html/settings.html @@ -6,7 +6,7 @@ <a-sidebar></a-sidebar> <a-layout id="content-layout"> <a-layout-content> - <a-spin :spinning="loadingStates.spinning" :delay="500" tip='{{ i18n "loading"}}'> + <a-spin :spinning="loadingStates.spinning" :delay="500" tip='{{ i18n "loading"}}' size="large"> <transition name="list" appear> <a-alert type="error" v-if="confAlerts.length>0 && loadingStates.fetched" :style="{ marginBottom: '10px' }" message='{{ i18n "secAlertTitle" }}' color="red" show-icon closable> @@ -21,10 +21,7 @@ <transition name="list" appear> <template> <a-row v-if="!loadingStates.fetched"> - <a-card - :style="{ textAlign: 'center', padding: '30px 0', marginTop: '10px', background: 'transparent', border: 'none' }"> - <a-spin tip='{{ i18n "loading" }}'></a-spin> - </a-card> + <div :style="{ minHeight: 'calc(100vh - 120px)' }"></div> </a-row> <a-row :gutter="[isMobile ? 8 : 16, isMobile ? 0 : 12]" v-else> <a-col> diff --git a/web/html/xray.html b/web/html/xray.html index ebe31f48..02243277 100644 --- a/web/html/xray.html +++ b/web/html/xray.html @@ -14,7 +14,7 @@ <a-layout id="content-layout"> <a-layout-content> <a-spin :spinning="loadingStates.spinning" :delay="500" - tip='{{ i18n "loading"}}'> + tip='{{ i18n "loading"}}' size="large"> <transition name="list" appear> <a-alert type="error" v-if="showAlert && loadingStates.fetched" :style="{ marginBottom: '10px' }" @@ -24,10 +24,7 @@ </transition> <transition name="list" appear> <a-row v-if="!loadingStates.fetched"> - <a-card - :style="{ textAlign: 'center', padding: '30px 0', marginTop: '10px', background: 'transparent', border: 'none' }"> - <a-spin tip='{{ i18n "loading" }}'></a-spin> - </a-card> + <div :style="{ minHeight: 'calc(100vh - 120px)' }"></div> </a-row> <a-row :gutter="[isMobile ? 8 : 16, isMobile ? 0 : 12]" v-else> <a-col> @@ -1075,6 +1072,14 @@ this.$forceUpdate(); } }); + + // Handle invalidate signals (sent when payload is too large for WebSocket, + // or when traffic job notifies about data changes) + window.wsClient.on('invalidate', (payload) => { + if (payload && payload.type === 'outbounds') { + this.refreshOutboundTraffic(); + } + }); } while (true) { diff --git a/web/job/xray_traffic_job.go b/web/job/xray_traffic_job.go index 8d2bfbd6..f443aa77 100644 --- a/web/job/xray_traffic_job.go +++ b/web/job/xray_traffic_job.go @@ -50,7 +50,13 @@ func (j *XrayTrafficJob) Run() { j.xrayService.SetToNeedRestart() } - // Get online clients and last online map for real-time status updates + // If no frontend client is connected, skip all WebSocket broadcasting routines, + // including expensive DB queries for online clients and JSON marshaling. + if !websocket.HasClients() { + return + } + + // Update online clients list and map onlineClients := j.inboundService.GetOnlineClients() lastOnlineMap, err := j.inboundService.GetClientsLastOnline() if err != nil { @@ -58,8 +64,17 @@ func (j *XrayTrafficJob) Run() { lastOnlineMap = make(map[string]int64) } + // Broadcast traffic update (deltas and online stats) via WebSocket + trafficUpdate := map[string]any{ + "traffics": traffics, + "clientTraffics": clientTraffics, + "onlineClients": onlineClients, + "lastOnlineMap": lastOnlineMap, + } + websocket.BroadcastTraffic(trafficUpdate) + // Fetch updated inbounds from database with accumulated traffic values - // This ensures frontend receives the actual total traffic, not just delta values + // This ensures frontend receives the actual total traffic for real-time UI refresh. updatedInbounds, err := j.inboundService.GetAllInbounds() if err != nil { logger.Warning("get all inbounds for websocket failed:", err) @@ -70,16 +85,8 @@ func (j *XrayTrafficJob) Run() { logger.Warning("get all outbounds for websocket failed:", err) } - // Broadcast traffic update via WebSocket with accumulated values from database - trafficUpdate := map[string]any{ - "traffics": traffics, - "clientTraffics": clientTraffics, - "onlineClients": onlineClients, - "lastOnlineMap": lastOnlineMap, - } - websocket.BroadcastTraffic(trafficUpdate) - - // Broadcast full inbounds update for real-time UI refresh + // The WebSocket hub will automatically check the payload size. + // If it exceeds 100MB, it sends a lightweight 'invalidate' signal instead. if updatedInbounds != nil { websocket.BroadcastInbounds(updatedInbounds) } @@ -87,7 +94,6 @@ func (j *XrayTrafficJob) Run() { if updatedOutbounds != nil { websocket.BroadcastOutbounds(updatedOutbounds) } - } func (j *XrayTrafficJob) informTrafficToExternalAPI(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) { diff --git a/web/service/xray.go b/web/service/xray.go index 511ffdda..be140ce6 100644 --- a/web/service/xray.go +++ b/web/service/xray.go @@ -118,31 +118,35 @@ func (s *XrayService) GetXrayConfig() (*xray.Config, error) { json.Unmarshal([]byte(inbound.Settings), &settings) clients, ok := settings["clients"].([]any) if ok { - // check users active or not + // Fast O(N) lookup map for client traffic enablement clientStats := inbound.ClientStats + enableMap := make(map[string]bool, len(clientStats)) for _, clientTraffic := range clientStats { - indexDecrease := 0 - for index, client := range clients { - c := client.(map[string]any) - if c["email"] == clientTraffic.Email { - if !clientTraffic.Enable { - clients = RemoveIndex(clients, index-indexDecrease) - indexDecrease++ - logger.Infof("Remove Inbound User %s due to expiration or traffic limit", c["email"]) - } - } - } + enableMap[clientTraffic.Email] = clientTraffic.Enable } - // clear client config for additional parameters + // filter and clean clients var final_clients []any for _, client := range clients { - c := client.(map[string]any) - if c["enable"] != nil { - if enable, ok := c["enable"].(bool); ok && !enable { - continue - } + c, ok := client.(map[string]any) + if !ok { + continue } + + email, _ := c["email"].(string) + + // check users active or not via stats + if enable, exists := enableMap[email]; exists && !enable { + logger.Infof("Remove Inbound User %s due to expiration or traffic limit", email) + continue + } + + // check manual disabled flag + if manualEnable, ok := c["enable"].(bool); ok && !manualEnable { + continue + } + + // clear client config for additional parameters for key := range c { if key != "email" && key != "id" && key != "password" && key != "flow" && key != "method" { delete(c, key) diff --git a/web/websocket/hub.go b/web/websocket/hub.go index 8aa5903c..1455d1fa 100644 --- a/web/websocket/hub.go +++ b/web/websocket/hub.go @@ -21,6 +21,7 @@ const ( MessageTypeNotification MessageType = "notification" // System notification MessageTypeXrayState MessageType = "xray_state" // Xray state change MessageTypeOutbounds MessageType = "outbounds" // Outbounds list update + MessageTypeInvalidate MessageType = "invalidate" // Lightweight signal telling frontend to re-fetch data via REST ) // Message represents a WebSocket message @@ -32,10 +33,11 @@ type Message struct { // Client represents a WebSocket client connection type Client struct { - ID string - Send chan []byte - Hub *Hub - Topics map[MessageType]bool // Subscribed topics + ID string + Send chan []byte + Hub *Hub + Topics map[MessageType]bool // Subscribed topics + closeOnce sync.Once // Ensures Send channel is closed exactly once } // Hub maintains the set of active clients and broadcasts messages to them @@ -61,7 +63,6 @@ type Hub struct { // Worker pool for parallel broadcasting workerPoolSize int - broadcastWg sync.WaitGroup } // NewHub creates a new WebSocket hub @@ -104,20 +105,12 @@ func (h *Hub) Run() { // Graceful shutdown: close all clients h.mu.Lock() for client := range h.clients { - // Safely close channel (avoid double close panic) - select { - case _, stillOpen := <-client.Send: - if stillOpen { - close(client.Send) - } - default: + client.closeOnce.Do(func() { close(client.Send) - } + }) } h.clients = make(map[*Client]bool) h.mu.Unlock() - // Wait for all broadcast workers to finish - h.broadcastWg.Wait() logger.Info("WebSocket hub stopped gracefully") return @@ -138,19 +131,9 @@ func (h *Hub) Run() { h.mu.Lock() if _, ok := h.clients[client]; ok { delete(h.clients, client) - // Safely close channel (avoid double close panic) - // Check if channel is already closed by trying to read from it - select { - case _, stillOpen := <-client.Send: - if stillOpen { - // Channel was open and had data, now it's empty, safe to close - close(client.Send) - } - // If stillOpen is false, channel was already closed, do nothing - default: - // Channel is empty and open, safe to close + client.closeOnce.Do(func() { close(client.Send) - } + }) } count := len(h.clients) h.mu.Unlock() @@ -220,11 +203,12 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) { } close(clientChan) - // Start workers for parallel processing - h.broadcastWg.Add(h.workerPoolSize) + // Use a local WaitGroup to avoid blocking hub shutdown + var wg sync.WaitGroup + wg.Add(h.workerPoolSize) for i := 0; i < h.workerPoolSize; i++ { go func() { - defer h.broadcastWg.Done() + defer wg.Done() for client := range clientChan { func() { defer func() { @@ -246,7 +230,7 @@ func (h *Hub) broadcastParallel(clients []*Client, message []byte) { } // Wait for all workers to finish - h.broadcastWg.Wait() + wg.Wait() } // Broadcast sends a message to all connected clients @@ -259,6 +243,11 @@ func (h *Hub) Broadcast(messageType MessageType, payload any) { return } + // Skip all work if no clients are connected + if h.GetClientCount() == 0 { + return + } + msg := Message{ Type: messageType, Payload: payload, @@ -271,10 +260,12 @@ func (h *Hub) Broadcast(messageType MessageType, payload any) { return } - // Limit message size to prevent memory issues - const maxMessageSize = 1024 * 1024 // 1MB + // If message exceeds size limit, send a lightweight invalidate notification
contacts: admin@thfree.ru |
