Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/MHSanaei/3x-ui.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOleksandrParshyn <43094723+OleksandrParshyn@users.noreply.github.com>2025-11-01 14:56:55 +0300
committermhsanaei <ho3ein.sanaei@gmail.com>2025-11-01 15:01:44 +0300
commit020cd63e227c0015ef1b984ad909000e6fdd52e5 (patch)
tree0c31371f6d61a064abdefa90bad6773d4ca06e68 /web/service
parent6e46e9b16e636ad399a52452d9ffecd6ce469d9c (diff)
Fix: Graceful Telegram bot shutdown to prevent 409 Conflict (#3580)
* Fix: Graceful Telegram bot shutdown to prevent 409 Conflict Introduces a `botCancel` context and a global `StopBot()` function to ensure the Telegram bot's Long Polling operation is safely terminated (via context cancellation) before the service restarts. This prevents the "Conflict: another update consumer is running" (409) error upon panel restart. Changes: - Added `botCancel context.CancelFunc` to manage context cancellation. - Implemented global `StopBot()` function. - Updated `Tgbot.Stop()` to call `StopBot()`. - Modified `Tgbot.OnReceive()` to use the new cancellable context for `UpdatesViaLongPolling`. * Fix: Prevent race condition and goroutine leak in TgBot Addresses a critical race condition on the global `botCancel` variable, which could occur if `Tgbot.OnReceive()` was called concurrently (e.g., during rapid panel restarts or unexpected behavior). Changes in tgbot.go: - Added `tgBotMutex sync.Mutex` to ensure thread safety. - Protected `botCancel` creation and assignment in `OnReceive()` using the mutex, and added a check to prevent overwriting an active context, which avoids goroutine leaks. - Protected the cancellation and cleanup logic in `StopBot()` with the mutex. * Refactor: Replace time.Sleep with sync.WaitGroup for reliable TgBot shutdown Replaced the unreliable `time.Sleep(1 * time.Second)` in `service.StopBot()` with `sync.WaitGroup`. This ensures the Long Polling goroutine is explicitly waited for and reliably exits before the panel continues, preventing potential resource leaks and incomplete shutdowns during restarts. Changes: - Added `botWG sync.WaitGroup` variable. - Updated `service.StopBot()` to call `botWG.Wait()` instead of `time.Sleep()`. - Modified `Tgbot.OnReceive()` to correctly use `botWG.Add(1)` and `defer botWG.Done()` within the Long Polling goroutine. - Corrected the goroutine structure in `OnReceive()` to properly encapsulate all message handling logic.
Diffstat (limited to 'web/service')
-rw-r--r--web/service/tgbot.go363
1 files changed, 208 insertions, 155 deletions
diff --git a/web/service/tgbot.go b/web/service/tgbot.go
index 0c9d820c..1573b2bf 100644
--- a/web/service/tgbot.go
+++ b/web/service/tgbot.go
@@ -38,7 +38,15 @@ import (
)
var (
- bot *telego.Bot
+ bot *telego.Bot
+
+ // botCancel stores the function to cancel the context, stopping Long Polling gracefully.
+ botCancel context.CancelFunc
+ // tgBotMutex protects concurrent access to botCancel variable
+ tgBotMutex sync.Mutex
+ // botWG waits for the OnReceive Long Polling goroutine to finish.
+ botWG sync.WaitGroup
+
botHandler *th.BotHandler
adminIds []int64
isRunning bool
@@ -306,8 +314,13 @@ func (t *Tgbot) SetHostname() {
hostname = host
}
-// Stop stops the Telegram bot and cleans up resources.
+// Stop safely stops the Telegram bot's Long Polling operation.
+// This method now calls the global StopBot function and cleans up other resources.
func (t *Tgbot) Stop() {
+ // Call the global StopBot function to gracefully shut down Long Polling
+ StopBot()
+
+ // Stop the bot handler (in case the goroutine hasn't exited yet)
if botHandler != nil {
botHandler.Stop()
}
@@ -316,6 +329,27 @@ func (t *Tgbot) Stop() {
adminIds = nil
}
+// StopBot safely stops the Telegram bot's Long Polling operation by cancelling its context.
+// This is the global function called from main.go's signal handler and t.Stop().
+func StopBot() {
+ tgBotMutex.Lock()
+ defer tgBotMutex.Unlock()
+
+ if botCancel != nil {
+ logger.Info("Sending cancellation signal to Telegram bot...")
+
+ // Calling botCancel() cancels the context passed to UpdatesViaLongPolling,
+ // which stops the Long Polling operation and closes the updates channel,
+ // allowing the th.Start() goroutine to exit cleanly.
+ botCancel()
+
+ botCancel = nil
+ // Giving the goroutine a small delay to exit cleanly.
+ botWG.Wait()
+ logger.Info("Telegram bot successfully stopped.")
+ }
+}
+
// encodeQuery encodes the query string if it's longer than 64 characters.
func (t *Tgbot) encodeQuery(query string) string {
// NOTE: we only need to hash for more than 64 chars
@@ -345,188 +379,207 @@ func (t *Tgbot) OnReceive() {
params := telego.GetUpdatesParams{
Timeout: 30, // Increased timeout to reduce API calls
}
+ // --- GRACEFUL SHUTDOWN FIX: Context creation ---
+ tgBotMutex.Lock()
- updates, _ := bot.UpdatesViaLongPolling(context.Background(), &params)
+ // Create a context with cancellation and store the cancel function.
+ var ctx context.Context
- botHandler, _ = th.NewBotHandler(bot, updates)
+ // Check if botCancel is already set (to prevent race condition overwrite and goroutine leak)
+ if botCancel == nil {
+ ctx, botCancel = context.WithCancel(context.Background())
+ } else {
+ // If botCancel is already set, use a non-cancellable context for this redundant call.
+ // This prevents overwriting the active botCancel and causing a goroutine leak from the previous call.
+ logger.Warning("TgBot OnReceive called concurrently. Using background context for redundant call.")
+ ctx = context.Background() // <<< ИЗМЕНЕНИЕ
+ }
- botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error {
- delete(userStates, message.Chat.ID)
- t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.keyboardClosed"), tu.ReplyKeyboardRemove())
- return nil
- }, th.TextEqual(t.I18nBot("tgbot.buttons.closeKeyboard")))
+ tgBotMutex.Unlock()
- botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error {
- // Use goroutine with worker pool for concurrent command processing
- go func() {
- messageWorkerPool <- struct{}{} // Acquire worker
- defer func() { <-messageWorkerPool }() // Release worker
+ // Get updates channel using the context.
+ updates, _ := bot.UpdatesViaLongPolling(ctx, &params)
+ botWG.Go(func() {
+ botHandler, _ = th.NewBotHandler(bot, updates)
+ botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error {
delete(userStates, message.Chat.ID)
- t.answerCommand(&message, message.Chat.ID, checkAdmin(message.From.ID))
- }()
- return nil
- }, th.AnyCommand())
-
- botHandler.HandleCallbackQuery(func(ctx *th.Context, query telego.CallbackQuery) error {
- // Use goroutine with worker pool for concurrent callback processing
- go func() {
- messageWorkerPool <- struct{}{} // Acquire worker
- defer func() { <-messageWorkerPool }() // Release worker
-
- delete(userStates, query.Message.GetChat().ID)
- t.answerCallback(&query, checkAdmin(query.From.ID))
- }()
- return nil
- }, th.AnyCallbackQueryWithMessage())
-
- botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error {
- if userState, exists := userStates[message.Chat.ID]; exists {
- switch userState {
- case "awaiting_id":
- if client_Id == strings.TrimSpace(message.Text) {
- t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove())
- delete(userStates, message.Chat.ID)
- inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID)
- message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol)
- t.addClient(message.Chat.ID, message_text)
- return nil
- }
+ t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.keyboardClosed"), tu.ReplyKeyboardRemove())
+ return nil
+ }, th.TextEqual(t.I18nBot("tgbot.buttons.closeKeyboard")))
- client_Id = strings.TrimSpace(message.Text)
- if t.isSingleWord(client_Id) {
- userStates[message.Chat.ID] = "awaiting_id"
+ botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error {
+ // Use goroutine with worker pool for concurrent command processing
+ go func() {
+ messageWorkerPool <- struct{}{} // Acquire worker
+ defer func() { <-messageWorkerPool }() // Release worker
- cancel_btn_markup := tu.InlineKeyboard(
- tu.InlineKeyboardRow(
- tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"),
- ),
- )
+ delete(userStates, message.Chat.ID)
+ t.answerCommand(&message, message.Chat.ID, checkAdmin(message.From.ID))
+ }()
+ return nil
+ }, th.AnyCommand())
+
+ botHandler.HandleCallbackQuery(func(ctx *th.Context, query telego.CallbackQuery) error {
+ // Use goroutine with worker pool for concurrent callback processing
+ go func() {
+ messageWorkerPool <- struct{}{} // Acquire worker
+ defer func() { <-messageWorkerPool }() // Release worker
+
+ delete(userStates, query.Message.GetChat().ID)
+ t.answerCallback(&query, checkAdmin(query.From.ID))
+ }()
+ return nil
+ }, th.AnyCallbackQueryWithMessage())
+
+ botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error {
+ if userState, exists := userStates[message.Chat.ID]; exists {
+ switch userState {
+ case "awaiting_id":
+ if client_Id == strings.TrimSpace(message.Text) {
+ t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove())
+ delete(userStates, message.Chat.ID)
+ inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID)
+ message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol)
+ t.addClient(message.Chat.ID, message_text)
+ return nil
+ }
- t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.messages.incorrect_input"), cancel_btn_markup)
- } else {
- t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_id"), 3, tu.ReplyKeyboardRemove())
- delete(userStates, message.Chat.ID)
- inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID)
- message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol)
- t.addClient(message.Chat.ID, message_text)
- }
- case "awaiting_password_tr":
- if client_TrPassword == strings.TrimSpace(message.Text) {
- t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove())
- delete(userStates, message.Chat.ID)
- return nil
- }
+ client_Id = strings.TrimSpace(message.Text)
+ if t.isSingleWord(client_Id) {
+ userStates[message.Chat.ID] = "awaiting_id"
- client_TrPassword = strings.TrimSpace(message.Text)
- if t.isSingleWord(client_TrPassword) {
- userStates[message.Chat.ID] = "awaiting_password_tr"
+ cancel_btn_markup := tu.InlineKeyboard(
+ tu.InlineKeyboardRow(
+ tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"),
+ ),
+ )
- cancel_btn_markup := tu.InlineKeyboard(
- tu.InlineKeyboardRow(
- tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"),
- ),
- )
+ t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.messages.incorrect_input"), cancel_btn_markup)
+ } else {
+ t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_id"), 3, tu.ReplyKeyboardRemove())
+ delete(userStates, message.Chat.ID)
+ inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID)
+ message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol)
+ t.addClient(message.Chat.ID, message_text)
+ }
+ case "awaiting_password_tr":
+ if client_TrPassword == strings.TrimSpace(message.Text) {
+ t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove())
+ delete(userStates, message.Chat.ID)
+ return nil
+ }
- t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.messages.incorrect_input"), cancel_btn_markup)
- } else {
- t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_password"), 3, tu.ReplyKeyboardRemove())
- delete(userStates, message.Chat.ID)
- inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID)
- message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol)
- t.addClient(message.Chat.ID, message_text)
- }
- case "awaiting_password_sh":
- if client_ShPassword == strings.TrimSpace(message.Text) {
- t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove())
- delete(userStates, message.Chat.ID)
- return nil
- }
+ client_TrPassword = strings.TrimSpace(message.Text)
+ if t.isSingleWord(client_TrPassword) {
+ userStates[message.Chat.ID] = "awaiting_password_tr"
+
+ cancel_btn_markup := tu.InlineKeyboard(
+ tu.InlineKeyboardRow(
+ tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"),
+ ),
+ )
- client_ShPassword = strings.TrimSpace(message.Text)
- if t.isSingleWord(client_ShPassword) {
- userStates[message.Chat.ID] = "awaiting_password_sh"
+ t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.messages.incorrect_input"), cancel_btn_markup)
+ } else {
+ t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_password"), 3, tu.ReplyKeyboardRemove())
+ delete(userStates, message.Chat.ID)
+ inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID)
+ message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol)
+ t.addClient(message.Chat.ID, message_text)
+ }
+ case "awaiting_password_sh":
+ if client_ShPassword == strings.TrimSpace(message.Text) {
+ t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove())
+ delete(userStates, message.Chat.ID)
+ return nil
+ }
- cancel_btn_markup := tu.InlineKeyboard(
- tu.InlineKeyboardRow(
- tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"),
- ),
- )
+ client_ShPassword = strings.TrimSpace(message.Text)
+ if t.isSingleWord(client_ShPassword) {
+ userStates[message.Chat.ID] = "awaiting_password_sh"
- t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.messages.incorrect_input"), cancel_btn_markup)
- } else {
- t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_password"), 3, tu.ReplyKeyboardRemove())
- delete(userStates, message.Chat.ID)
- inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID)
- message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol)
- t.addClient(message.Chat.ID, message_text)
- }
- case "awaiting_email":
- if client_Email == strings.TrimSpace(message.Text) {
- t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove())
- delete(userStates, message.Chat.ID)
- return nil
- }
+ cancel_btn_markup := tu.InlineKeyboard(
+ tu.InlineKeyboardRow(
+ tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"),
+ ),
+ )
- client_Email = strings.TrimSpace(message.Text)
- if t.isSingleWord(client_Email) {
- userStates[message.Chat.ID] = "awaiting_email"
+ t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.messages.incorrect_input"), cancel_btn_markup)
+ } else {
+ t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_password"), 3, tu.ReplyKeyboardRemove())
+ delete(userStates, message.Chat.ID)
+ inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID)
+ message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol)
+ t.addClient(message.Chat.ID, message_text)
+ }
+ case "awaiting_email":
+ if client_Email == strings.TrimSpace(message.Text) {
+ t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove())
+ delete(userStates, message.Chat.ID)
+ return nil
+ }
- cancel_btn_markup := tu.InlineKeyboard(
- tu.InlineKeyboardRow(
- tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"),
- ),
- )
+ client_Email = strings.TrimSpace(message.Text)
+ if t.isSingleWord(client_Email) {
+ userStates[message.Chat.ID] = "awaiting_email"
- t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.messages.incorrect_input"), cancel_btn_markup)
- } else {
- t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_email"), 3, tu.ReplyKeyboardRemove())
+ cancel_btn_markup := tu.InlineKeyboard(
+ tu.InlineKeyboardRow(
+ tu.InlineKeyboardButton(t.I18nBot("tgbot.buttons.use_default")).WithCallbackData("add_client_default_info"),
+ ),
+ )
+
+ t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.messages.incorrect_input"), cancel_btn_markup)
+ } else {
+ t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_email"), 3, tu.ReplyKeyboardRemove())
+ delete(userStates, message.Chat.ID)
+ inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID)
+ message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol)
+ t.addClient(message.Chat.ID, message_text)
+ }
+ case "awaiting_comment":
+ if client_Comment == strings.TrimSpace(message.Text) {
+ t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove())
+ delete(userStates, message.Chat.ID)
+ return nil
+ }
+
+ client_Comment = strings.TrimSpace(message.Text)
+ t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_comment"), 3, tu.ReplyKeyboardRemove())
delete(userStates, message.Chat.ID)
inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID)
message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol)
t.addClient(message.Chat.ID, message_text)
}
- case "awaiting_comment":
- if client_Comment == strings.TrimSpace(message.Text) {
- t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.using_default_value"), 3, tu.ReplyKeyboardRemove())
- delete(userStates, message.Chat.ID)
- return nil
- }
- client_Comment = strings.TrimSpace(message.Text)
- t.SendMsgToTgbotDeleteAfter(message.Chat.ID, t.I18nBot("tgbot.messages.received_comment"), 3, tu.ReplyKeyboardRemove())
- delete(userStates, message.Chat.ID)
- inbound, _ := t.inboundService.GetInbound(receiver_inbound_ID)
- message_text, _ := t.BuildInboundClientDataMessage(inbound.Remark, inbound.Protocol)
- t.addClient(message.Chat.ID, message_text)
- }
-
- } else {
- if message.UsersShared != nil {
- if checkAdmin(message.From.ID) {
- for _, sharedUser := range message.UsersShared.Users {
- userID := sharedUser.UserID
- needRestart, err := t.inboundService.SetClientTelegramUserID(message.UsersShared.RequestID, userID)
- if needRestart {
- t.xrayService.SetToNeedRestart()
- }
- output := ""
- if err != nil {
- output += t.I18nBot("tgbot.messages.selectUserFailed")
- } else {
- output += t.I18nBot("tgbot.messages.userSaved")
+ } else {
+ if message.UsersShared != nil {
+ if checkAdmin(message.From.ID) {
+ for _, sharedUser := range message.UsersShared.Users {
+ userID := sharedUser.UserID
+ needRestart, err := t.inboundService.SetClientTelegramUserID(message.UsersShared.RequestID, userID)
+ if needRestart {
+ t.xrayService.SetToNeedRestart()
+ }
+ output := ""
+ if err != nil {
+ output += t.I18nBot("tgbot.messages.selectUserFailed")
+ } else {
+ output += t.I18nBot("tgbot.messages.userSaved")
+ }
+ t.SendMsgToTgbot(message.Chat.ID, output, tu.ReplyKeyboardRemove())
}
- t.SendMsgToTgbot(message.Chat.ID, output, tu.ReplyKeyboardRemove())
+ } else {
+ t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.noResult"), tu.ReplyKeyboardRemove())
}
- } else {
- t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.noResult"), tu.ReplyKeyboardRemove())
}
}
- }
- return nil
- }, th.AnyMessage())
+ return nil
+ }, th.AnyMessage())
- botHandler.Start()
+ botHandler.Start()
+ })
}
// answerCommand processes incoming command messages from Telegram users.