diff options
| author | MHSanaei <ho3ein.sanaei@gmail.com> | 2023-06-05 00:02:19 +0300 |
|---|---|---|
| committer | MHSanaei <ho3ein.sanaei@gmail.com> | 2023-06-05 00:02:19 +0300 |
| commit | 70f250dfe1e24249a4cc8102e3fef65959dfb15a (patch) | |
| tree | 9fdc990d656267abf6facafd851ab7b4835be977 /web/service | |
| parent | 1030bcf321f15ada665ca3c55436d7c2449b5faf (diff) | |
[feature] using xray api and more
Improve DB performance
[api] backward compatibility: add client by update
Co-Authored-By: Alireza Ahmadi <alireza7@gmail.com>
Diffstat (limited to 'web/service')
| -rw-r--r-- | web/service/inbound.go | 351 | ||||
| -rw-r--r-- | web/service/xray.go | 13 |
2 files changed, 301 insertions, 63 deletions
diff --git a/web/service/inbound.go b/web/service/inbound.go index 11522ad2..84ce9bd8 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -15,6 +15,7 @@ import ( ) type InboundService struct { + xrayApi xray.XrayAPI } func (s *InboundService) GetInbounds(userId int) ([]*model.Inbound, error) { @@ -156,11 +157,19 @@ func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, err } db := database.GetDB() + tx := db.Begin() + defer func() { + if err == nil { + tx.Commit() + } else { + tx.Rollback() + } + }() - err = db.Save(inbound).Error + err = tx.Save(inbound).Error if err == nil { for _, client := range clients { - s.AddClientStat(inbound.Id, &client) + s.AddClientStat(tx, inbound.Id, &client) } } return inbound, err @@ -244,6 +253,12 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound, if err != nil { return inbound, err } + + err = s.updateClientTraffics(oldInbound, inbound) + if err != nil { + return inbound, err + } + oldInbound.Up = inbound.Up oldInbound.Down = inbound.Down oldInbound.Total = inbound.Total @@ -262,36 +277,92 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound, return inbound, db.Save(oldInbound).Error } -func (s *InboundService) AddInboundClient(data *model.Inbound) error { - clients, err := s.GetClients(data) +func (s *InboundService) updateClientTraffics(oldInbound *model.Inbound, newInbound *model.Inbound) error { + oldClients, err := s.GetClients(oldInbound) + if err != nil { + return err + } + newClients, err := s.GetClients(newInbound) if err != nil { return err } + db := database.GetDB() + tx := db.Begin() + + defer func() { + if err != nil { + tx.Rollback() + } else { + tx.Commit() + } + }() + + var emailExists bool + + for _, oldClient := range oldClients { + emailExists = false + for _, newClient := range newClients { + if oldClient.Email == newClient.Email { + emailExists = true + break + } + } + if !emailExists { + err = s.DelClientStat(tx, oldClient.Email) + if err != nil { + return err + } + } + } + for _, newClient := range newClients { + emailExists = false + for _, oldClient := range oldClients { + if newClient.Email == oldClient.Email { + emailExists = true + break + } + } + if !emailExists { + err = s.AddClientStat(tx, oldInbound.Id, &newClient) + if err != nil { + return err + } + } + } + return nil +} + +func (s *InboundService) AddInboundClient(data *model.Inbound) (bool, error) { + clients, err := s.GetClients(data) + if err != nil { + return false, err + } + var settings map[string]interface{} err = json.Unmarshal([]byte(data.Settings), &settings) if err != nil { - return err + return false, err } interfaceClients := settings["clients"].([]interface{}) existEmail, err := s.checkEmailsExistForClients(clients) if err != nil { - return err + return false, err } if existEmail != "" { - return common.NewError("Duplicate email:", existEmail) + return false, common.NewError("Duplicate email:", existEmail) } oldInbound, err := s.GetInbound(data.Id) if err != nil { - return err + return false, err } var oldSettings map[string]interface{} err = json.Unmarshal([]byte(oldInbound.Settings), &oldSettings) if err != nil { - return err + return false, err } oldClients := oldSettings["clients"].([]interface{}) @@ -301,30 +372,58 @@ func (s *InboundService) AddInboundClient(data *model.Inbound) error { newSettings, err := json.MarshalIndent(oldSettings, "", " ") if err != nil { - return err + return false, err } oldInbound.Settings = string(newSettings) + db := database.GetDB() + tx := db.Begin() + + defer func() { + if err != nil { + tx.Rollback() + } else { + tx.Commit() + } + }() + + needRestart := false + s.xrayApi.Init(p.GetAPIPort()) for _, client := range clients { if len(client.Email) > 0 { - s.AddClientStat(data.Id, &client) + s.AddClientStat(tx, data.Id, &client) + err1 := s.xrayApi.AddUser(string(oldInbound.Protocol), oldInbound.Tag, map[string]interface{}{ + "email": client.Email, + "id": client.ID, + "alterId": client.AlterIds, + "flow": client.Flow, + "password": client.Password, + }) + if err1 == nil { + logger.Debug("Client added by api:", client.Email) + } else { + needRestart = true + } + } else { + needRestart = true } } - db := database.GetDB() - return db.Save(oldInbound).Error + s.xrayApi.Close() + + return needRestart, tx.Save(oldInbound).Error } -func (s *InboundService) DelInboundClient(inboundId int, clientId string) error { +func (s *InboundService) DelInboundClient(inboundId int, clientId string) (bool, error) { oldInbound, err := s.GetInbound(inboundId) if err != nil { logger.Error("Load Old Data Error") - return err + return false, err } var settings map[string]interface{} err = json.Unmarshal([]byte(oldInbound.Settings), &settings) if err != nil { - return err + return false, err } email := "" @@ -351,7 +450,7 @@ func (s *InboundService) DelInboundClient(inboundId int, clientId string) error settings["clients"] = newClients newSettings, err := json.MarshalIndent(settings, "", " ") if err != nil { - return err + return false, err } oldInbound.Settings = string(newSettings) @@ -360,39 +459,49 @@ func (s *InboundService) DelInboundClient(inboundId int, clientId string) error err = s.DelClientStat(db, email) if err != nil { logger.Error("Delete stats Data Error") - return err + return false, err } err = s.DelClientIPs(db, email) if err != nil { logger.Error("Error in delete client IPs") - return err + return false, err + } + needRestart := true + s.xrayApi.Init(p.GetAPIPort()) + if len(email) > 0 { + err = s.xrayApi.RemoveUser(oldInbound.Tag, email) + if err == nil { + logger.Debug("Client deleted by api:", email) + needRestart = false + } } - return db.Save(oldInbound).Error + s.xrayApi.Close() + return needRestart, db.Save(oldInbound).Error } -func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId string) error { +func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId string) (bool, error) { clients, err := s.GetClients(data) if err != nil { - return err + return false, err } var settings map[string]interface{} err = json.Unmarshal([]byte(data.Settings), &settings) if err != nil { - return err + return false, err } inerfaceClients := settings["clients"].([]interface{}) oldInbound, err := s.GetInbound(data.Id) if err != nil { - return err + return false, err } oldClients, err := s.GetClients(oldInbound) if err != nil { - return err + return false, err } oldEmail := "" @@ -416,17 +525,17 @@ func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId strin if len(clients[0].Email) > 0 && clients[0].Email != oldEmail { existEmail, err := s.checkEmailsExistForClients(clients) if err != nil { - return err + return false, err } if existEmail != "" { - return common.NewError("Duplicate email:", existEmail) + return false, common.NewError("Duplicate email:", existEmail) } } var oldSettings map[string]interface{} err = json.Unmarshal([]byte(oldInbound.Settings), &oldSettings) if err != nil { - return err + return false, err } settingsClients := oldSettings["clients"].([]interface{}) settingsClients[clientIndex] = inerfaceClients[0] @@ -434,36 +543,67 @@ func (s *InboundService) UpdateInboundClient(data *model.Inbound, clientId strin newSettings, err := json.MarshalIndent(oldSettings, "", " ") if err != nil { - return err + return false, err } oldInbound.Settings = string(newSettings) db := database.GetDB() + tx := db.Begin() + + defer func() { + if err != nil { + tx.Rollback() + } else { + tx.Commit() + } + }() if len(clients[0].Email) > 0 { if len(oldEmail) > 0 { err = s.UpdateClientStat(oldEmail, &clients[0]) if err != nil { - return err + return false, err } err = s.UpdateClientIPs(db, oldEmail, clients[0].Email) if err != nil { - return err + return false, err } } else { - s.AddClientStat(data.Id, &clients[0]) + s.AddClientStat(tx, data.Id, &clients[0]) } } else { - err = s.DelClientStat(db, oldEmail) + err = s.DelClientStat(tx, oldEmail) if err != nil { - return err + return false, err } err = s.DelClientIPs(db, oldEmail) if err != nil { - return err + return false, err } } - return db.Save(oldInbound).Error + needRestart := true + s.xrayApi.Init(p.GetAPIPort()) + if len(oldEmail) > 0 { + s.xrayApi.RemoveUser(oldInbound.Tag, oldEmail) + if clients[0].Enable { + err1 := s.xrayApi.AddUser(string(oldInbound.Protocol), oldInbound.Tag, map[string]interface{}{ + "email": clients[0].Email, + "id": clients[0].ID, + "alterId": clients[0].AlterIds, + "flow": clients[0].Flow, + "password": clients[0].Password, + }) + if err1 == nil { + logger.Debug("Client edited by api:", clients[0].Email) + needRestart = false + } + } else { + logger.Debug("Client disabled by api:", clients[0].Email) + needRestart = false + } + } + s.xrayApi.Close() + return needRestart, tx.Save(oldInbound).Error } func (s *InboundService) AddTraffic(traffics []*xray.Traffic) error { @@ -489,6 +629,7 @@ func (s *InboundService) AddTraffic(traffics []*xray.Traffic) error { return err } + func (s *InboundService) AddClientTraffic(traffics []*xray.ClientTraffic) (err error) { if len(traffics) == 0 { return nil @@ -601,15 +742,42 @@ func (s *InboundService) DisableInvalidInbounds() (int64, error) { return count, err } -func (s *InboundService) DisableInvalidClients() (int64, error) { +func (s *InboundService) DisableInvalidClients() (bool, int64, error) { db := database.GetDB() now := time.Now().Unix() * 1000 + needRestart := false + + if p != nil { + var results []struct { + Tag string + Email string + } + + err := db.Table("inbounds"). + Select("inbounds.tag, client_traffics.email"). + Joins("JOIN client_traffics ON inbounds.id = client_traffics.inbound_id"). + Where("((client_traffics.total > 0 AND client_traffics.up + client_traffics.down >= client_traffics.total) OR (client_traffics.expiry_time > 0 AND client_traffics.expiry_time <= ?)) AND client_traffics.enable = ?", now, true). + Scan(&results).Error + if err != nil { + return false, 0, err + } + s.xrayApi.Init(p.GetAPIPort()) + for _, result := range results { + err = s.xrayApi.RemoveUser(result.Tag, result.Email) + if err == nil { + logger.Debug("Client deleted by api:", result.Email) + } else { + needRestart = true + } + } + s.xrayApi.Close() + } result := db.Model(xray.ClientTraffic{}). Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ?", now, true). Update("enable", false) err := result.Error count := result.RowsAffected - return count, err + return needRestart, count, err } func (s *InboundService) MigrationRemoveOrphanedTraffics() { @@ -624,9 +792,7 @@ func (s *InboundService) MigrationRemoveOrphanedTraffics() { `) } -func (s *InboundService) AddClientStat(inboundId int, client *model.Client) error { - db := database.GetDB() - +func (s *InboundService) AddClientStat(tx *gorm.DB, inboundId int, client *model.Client) error { clientTraffic := xray.ClientTraffic{} clientTraffic.InboundId = inboundId clientTraffic.Email = client.Email @@ -635,7 +801,7 @@ func (s *InboundService) AddClientStat(inboundId int, client *model.Client) erro clientTraffic.Enable = true clientTraffic.Up = 0 clientTraffic.Down = 0 - result := db.Create(&clientTraffic) + result := tx.Create(&clientTraffic) err := result.Error if err != nil { return err @@ -779,7 +945,11 @@ func (s *InboundService) SetClientTelegramUserID(trafficId int, tgId string) err return err } inbound.Settings = string(modifiedSettings) - return s.UpdateInboundClient(inbound, clientId) + _, err = s.UpdateInboundClient(inbound, clientId) + if err != nil { + return err + } + return nil } func (s *InboundService) ToggleClientEnableByEmail(clientEmail string) (bool, error) { @@ -835,7 +1005,13 @@ func (s *InboundService) ToggleClientEnableByEmail(clientEmail string) (bool, er return false, err } inbound.Settings = string(modifiedSettings) - return !clientOldEnabled, s.UpdateInboundClient(inbound, clientId) + + _, err = s.UpdateInboundClient(inbound, clientId) + if err != nil { + return false, err + } + + return !clientOldEnabled, nil } func (s *InboundService) ResetClientIpLimitByEmail(clientEmail string, count int) error { @@ -889,9 +1065,13 @@ func (s *InboundService) ResetClientIpLimitByEmail(clientEmail string, count int return err } inbound.Settings = string(modifiedSettings) - return s.UpdateInboundClient(inbound, clientId) -} + _, err = s.UpdateInboundClient(inbound, clientId) + if err != nil { + return err + } + return nil +} func (s *InboundService) ResetClientExpiryTimeByEmail(clientEmail string, expiry_time int64) error { _, inbound, err := s.GetClientInboundByEmail(clientEmail) if err != nil { @@ -943,7 +1123,12 @@ func (s *InboundService) ResetClientExpiryTimeByEmail(clientEmail string, expiry return err } inbound.Settings = string(modifiedSettings) - return s.UpdateInboundClient(inbound, clientId) + _, err = s.UpdateInboundClient(inbound, clientId) + if err != nil { + return err + } + return nil + } func (s *InboundService) ResetClientTrafficByEmail(clientEmail string) error { @@ -961,19 +1146,55 @@ func (s *InboundService) ResetClientTrafficByEmail(clientEmail string) error { return nil } -func (s *InboundService) ResetClientTraffic(id int, clientEmail string) error { - db := database.GetDB() +func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (bool, error) { + needRestart := false - result := db.Model(xray.ClientTraffic{}). - Where("inbound_id = ? and email = ?", id, clientEmail). - Updates(map[string]interface{}{"enable": true, "up": 0, "down": 0}) + traffic, err := s.GetClientTrafficByEmail(clientEmail) + if err != nil { + return false, err + } - err := result.Error + if !traffic.Enable { + inbound, err := s.GetInbound(id) + if err != nil { + return false, err + } + clients, err := s.GetClients(inbound) + if err != nil { + return false, err + } + for _, client := range clients { + if client.Email == clientEmail { + s.xrayApi.Init(p.GetAPIPort()) + err1 := s.xrayApi.AddUser(string(inbound.Protocol), inbound.Tag, map[string]interface{}{ + "email": client.Email, + "id": client.ID, + "alterId": client.AlterIds, + "flow": client.Flow, + "password": client.Password, + }) + if err1 == nil { + logger.Debug("Client enabled due to reset traffic:", clientEmail) + } else { + needRestart = true + } + s.xrayApi.Close() + break + } + } + } + + traffic.Up = 0 + traffic.Down = 0 + traffic.Enable = true + db := database.GetDB() + err = db.Save(traffic).Error if err != nil { - return err + return false, err } - return nil + + return needRestart, nil } func (s *InboundService) ResetAllClientTraffics(id int) error { @@ -1212,10 +1433,19 @@ func (s *InboundService) SearchInbounds(query string) ([]*model.Inbound, error) func (s *InboundService) MigrationRequirements() { db := database.GetDB() + tx := db.Begin() + var err error + defer func() { + if err == nil { + tx.Commit() + } else { + tx.Rollback() + } + }() // Fix inbounds based problems var inbounds []*model.Inbound - err := db.Model(model.Inbound{}).Where("protocol IN (?)", []string{"vmess", "vless", "trojan"}).Find(&inbounds).Error + err = tx.Model(model.Inbound{}).Where("protocol IN (?)", []string{"vmess", "vless", "trojan"}).Find(&inbounds).Error if err != nil && err != gorm.ErrRecordNotFound { return } @@ -1250,6 +1480,7 @@ func (s *InboundService) MigrationRequirements() { inbounds[inbound_index].Settings = string(modifiedSettings) } + // Add client traffic row for all clients which has email modelClients, err := s.GetClients(inbounds[inbound_index]) if err != nil { @@ -1258,17 +1489,17 @@ func (s *InboundService) MigrationRequirements() { for _, modelClient := range modelClients { if len(modelClient.Email) > 0 { var count int64 - db.Model(xray.ClientTraffic{}).Where("email = ?", modelClient.Email).Count(&count) + tx.Model(xray.ClientTraffic{}).Where("email = ?", modelClient.Email).Count(&count) if count == 0 { - s.AddClientStat(inbounds[inbound_index].Id, &modelClient) + s.AddClientStat(tx, inbounds[inbound_index].Id, &modelClient) } } } } - db.Save(inbounds) + tx.Save(inbounds) // Remove orphaned traffics - db.Where("inbound_id = 0").Delete(xray.ClientTraffic{}) + tx.Where("inbound_id = 0").Delete(xray.ClientTraffic{}) } func (s *InboundService) MigrateDB() { diff --git a/web/service/xray.go b/web/service/xray.go index bcc886fe..5475891f 100644 --- a/web/service/xray.go +++ b/web/service/xray.go @@ -18,6 +18,7 @@ var result string type XrayService struct { inboundService InboundService settingService SettingService + xrayAPI xray.XrayAPI } func (s *XrayService) IsXrayRunning() bool { @@ -143,7 +144,9 @@ func (s *XrayService) GetXrayTraffic() ([]*xray.Traffic, []*xray.ClientTraffic, if !s.IsXrayRunning() { return nil, nil, errors.New("xray is not running") } - return p.GetTraffic(true) + s.xrayAPI.Init(p.GetAPIPort()) + defer s.xrayAPI.Close() + return s.xrayAPI.GetTraffic(true) } func (s *XrayService) RestartXray(isForce bool) error { @@ -158,7 +161,7 @@ func (s *XrayService) RestartXray(isForce bool) error { if p != nil && p.IsRunning() { if !isForce && p.GetConfig().Equals(xrayConfig) { - logger.Debug("not need to restart xray") + logger.Debug("It does not need to restart xray") return nil } p.Stop() @@ -166,7 +169,11 @@ func (s *XrayService) RestartXray(isForce bool) error { p = xray.NewProcess(xrayConfig) result = "" - return p.Start() + err = p.Start() + if err != nil { + return err + } + return nil } func (s *XrayService) StopXray() error { |
