Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/MHSanaei/3x-ui.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/web/job
diff options
context:
space:
mode:
authorAli Rahimi <alirahimi818@gmail.com>2025-01-21 05:01:54 +0300
committerGitHub <noreply@github.com>2025-01-21 05:01:54 +0300
commit6e9180a6659479ea0b872d3c1bc4727fdd8571bd (patch)
treea69ae46d922ae6982e3067debcd23d899e7fa09c /web/job
parent66fe84181b9c4e2f6c6be943a7f486b4308c32ff (diff)
Group Management of Subscription Clients (#2644)
* add group user with the same subscription id to all inbounds * code format compare * add await for reset client traffic * en language changed * added client traffic syncer job * handle exist email duplicate in sub group * multi reset and delete request for clients group * add client traffic syncer setting option * vi translate file updated * auto open qr-modal bug fixed
Diffstat (limited to 'web/job')
-rw-r--r--web/job/client_traffic_sync_job.go178
1 files changed, 178 insertions, 0 deletions
diff --git a/web/job/client_traffic_sync_job.go b/web/job/client_traffic_sync_job.go
new file mode 100644
index 00000000..587c76e1
--- /dev/null
+++ b/web/job/client_traffic_sync_job.go
@@ -0,0 +1,178 @@
+package job
+
+import (
+ "encoding/json"
+ "fmt"
+ "time"
+
+ "x-ui/database"
+ "x-ui/database/model"
+ "x-ui/logger"
+ "x-ui/xray"
+
+ "gorm.io/gorm"
+)
+
+type SyncClientTrafficJob struct {
+ subClientsCollection map[string][]string
+}
+
+func NewClientTrafficSyncJob() *SyncClientTrafficJob {
+ return new(SyncClientTrafficJob)
+}
+func (j *SyncClientTrafficJob) Run() {
+ // Step 1: Group clients by SubID
+ subClientsCollection := j.collectClientsGroupedBySubId()
+
+ // Step 2: Sync client traffics for each SubID group
+ for subId, emails := range subClientsCollection {
+ err := j.syncClientTraffics(map[string][]string{subId: emails})
+ if err != nil {
+ logger.Error("Failed to sync traffics for SubID ", subId, ": ", err)
+ }
+ }
+}
+
+// collectClientsGroupedBySubId groups clients by their SubIDs
+func (j *SyncClientTrafficJob) collectClientsGroupedBySubId() map[string][]string {
+ db := database.GetDB()
+ result := make(map[string][]string)
+
+ // Fetch all inbounds
+ var inbounds []*model.Inbound
+ if err := db.Model(&model.Inbound{}).Find(&inbounds).Error; err != nil {
+ logger.Error("Error fetching inbounds: ", err)
+ return result // Return empty map on error
+ }
+
+ // Process each inbound
+ for _, inbound := range inbounds {
+ if inbound.Settings == "" {
+ continue
+ }
+
+ settingsMap, err := parseSettings(inbound.Settings, uint(inbound.Id))
+ if err != nil {
+ logger.Error(err)
+ continue
+ }
+
+ clients, ok := settingsMap["clients"].([]interface{})
+ if !ok {
+ continue
+ }
+
+ processClients(clients, result)
+ }
+
+ // Remove SubIDs with one or fewer emails
+ filterSingleEmailSubIDs(result)
+
+ return result
+}
+
+// parseSettings unmarshals the JSON settings and returns it as a map
+func parseSettings(settings string, inboundID uint) (map[string]interface{}, error) {
+ if !json.Valid([]byte(settings)) {
+ return nil, fmt.Errorf("Invalid JSON format in Settings for inbound ID %d", inboundID)
+ }
+
+ var tempData map[string]interface{}
+ if err := json.Unmarshal([]byte(settings), &tempData); err != nil {
+ return nil, fmt.Errorf("Error unmarshalling settings for inbound ID %d: %v", inboundID, err)
+ }
+
+ return tempData, nil
+}
+
+// processClients extracts SubID and email from the clients and populates the result map
+func processClients(clients []interface{}, result map[string][]string) {
+ for _, client := range clients {
+ clientMap, ok := client.(map[string]interface{})
+ if !ok {
+ continue
+ }
+
+ subId, ok := clientMap["subId"].(string)
+ if !ok || subId == "" {
+ continue
+ }
+
+ email, ok := clientMap["email"].(string)
+ if !ok || email == "" {
+ continue
+ }
+
+ result[subId] = append(result[subId], email)
+ }
+}
+
+// filterSingleEmailSubIDs removes SubIDs with one or fewer emails from the result map
+func filterSingleEmailSubIDs(result map[string][]string) {
+ for subId, emails := range result {
+ if len(emails) <= 1 {
+ delete(result, subId)
+ }
+ }
+}
+
+// syncClientTraffics synchronizes traffic data for each SubID group
+func (j *SyncClientTrafficJob) syncClientTraffics(result map[string][]string) error {
+ for subId, emails := range result {
+ db := database.GetDB()
+
+ // Step 1: Calculate maxUp and maxDown (outside transaction)
+ var maxUp, maxDown int64
+ err := calculateMaxTraffic(db, emails, &maxUp, &maxDown)
+ if err != nil {
+ logger.Error("Failed to calculate max traffic for SubID ", subId, ": ", err)
+ continue
+ }
+
+ // Step 2: Update traffic data with retry mechanism
+ err = retryOperation(func() error {
+ return updateTraffic(db, emails, maxUp, maxDown)
+ }, 5, 100*time.Millisecond)
+
+ if err != nil {
+ logger.Error("Failed to update client traffics for SubID ", subId, ": ", err)
+ }
+ }
+ return nil
+}
+
+// calculateMaxTraffic calculates max up and down traffic for a group of emails
+func calculateMaxTraffic(db *gorm.DB, emails []string, maxUp, maxDown *int64) error {
+ return db.Model(&xray.ClientTraffic{}).
+ Where("email IN ?", emails).
+ Select("MAX(up) AS max_up, MAX(down) AS max_down").
+ Row().
+ Scan(maxUp, maxDown)
+}
+
+// updateTraffic updates the traffic data in the database within a transaction
+func updateTraffic(db *gorm.DB, emails []string, maxUp, maxDown int64) error {
+ return db.Transaction(func(tx *gorm.DB) error {
+ return tx.Model(&xray.ClientTraffic{}).
+ Where("email IN ?", emails).
+ Updates(map[string]interface{}{
+ "up": maxUp,
+ "down": maxDown,
+ }).Error
+ })
+}
+
+// retryOperation retries an operation multiple times with a delay
+func retryOperation(operation func() error, maxRetries int, delay time.Duration) error {
+ var err error
+ for i := 0; i < maxRetries; i++ {
+ err = operation()
+ if err == nil {
+ return nil
+ }
+ logger.Info(fmt.Sprintf("Retry %d/%d failed: %v", i+1, maxRetries, err))
+ time.Sleep(delay)
+ }
+ return err
+}
+