Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/MHSanaei/3x-ui.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/web
diff options
context:
space:
mode:
authorVlad Yaroslavlev <vladon@vladon.dev>2026-01-02 18:13:32 +0300
committerGitHub <noreply@github.com>2026-01-02 18:13:32 +0300
commit278aa1c85c6af86a64326c924a746eeec5865389 (patch)
tree43adbd6ae1ae4999e331f90ade07a48a6093091b /web
parent8fe297ef9d519995557add08c1e439381272dbed (diff)
Fix telegram bot issue (#3608)
* fix: improve Telegram bot handling for concurrent starts and graceful shutdown - Added logic to stop any existing long-polling loop when Start is called again. - Introduced a mutex to manage access to shared state variables, ensuring thread safety. - Updated the OnReceive method to prevent multiple concurrent executions. - Enhanced Stop method to ensure proper cleanup of resources and state management. * fix: enhance Telegram bot's long-polling management - Improved handling of concurrent starts by stopping existing long-polling loops. - Implemented mutex for thread-safe access to shared state variables. - Updated OnReceive method to prevent multiple executions. - Enhanced Stop method for better resource cleanup and state management. * .
Diffstat (limited to 'web')
-rw-r--r--web/service/tgbot.go96
1 files changed, 55 insertions, 41 deletions
diff --git a/web/service/tgbot.go b/web/service/tgbot.go
index 06c51faa..3a98dcb4 100644
--- a/web/service/tgbot.go
+++ b/web/service/tgbot.go
@@ -174,6 +174,10 @@ func (t *Tgbot) Start(i18nFS embed.FS) error {
return err
}
+ // If Start is called again (e.g. during reload), ensure any previous long-polling
+ // loop is stopped before creating a new bot / receiver.
+ StopBot()
+
// Initialize hash storage to store callback queries
hashStorage = global.NewHashStorage(20 * time.Minute)
@@ -207,6 +211,7 @@ func (t *Tgbot) Start(i18nFS embed.FS) error {
return err
}
+ parsedAdminIds := make([]int64, 0)
// Parse admin IDs from comma-separated string
if tgBotID != "" {
for _, adminID := range strings.Split(tgBotID, ",") {
@@ -215,9 +220,12 @@ func (t *Tgbot) Start(i18nFS embed.FS) error {
logger.Warning("Failed to parse admin ID from Telegram bot chat ID:", err)
return err
}
- adminIds = append(adminIds, int64(id))
+ parsedAdminIds = append(parsedAdminIds, int64(id))
}
}
+ tgBotMutex.Lock()
+ adminIds = parsedAdminIds
+ tgBotMutex.Unlock()
// Get Telegram bot proxy URL
tgBotProxy, err := t.settingService.GetTgBotProxy()
@@ -252,10 +260,12 @@ func (t *Tgbot) Start(i18nFS embed.FS) error {
}
// Start receiving Telegram bot messages
- if !isRunning {
+ tgBotMutex.Lock()
+ alreadyRunning := isRunning || botCancel != nil
+ tgBotMutex.Unlock()
+ if !alreadyRunning {
logger.Info("Telegram bot receiver started")
go t.OnReceive()
- isRunning = true
}
return nil
@@ -300,6 +310,8 @@ func (t *Tgbot) NewBot(token string, proxyUrl string, apiServerUrl string) (*tel
// IsRunning checks if the Telegram bot is currently running.
func (t *Tgbot) IsRunning() bool {
+ tgBotMutex.Lock()
+ defer tgBotMutex.Unlock()
return isRunning
}
@@ -317,34 +329,34 @@ func (t *Tgbot) SetHostname() {
// Stop safely stops the Telegram bot's Long Polling operation.
// This method now calls the global StopBot function and cleans up other resources.
func (t *Tgbot) Stop() {
- // Call the global StopBot function to gracefully shut down Long Polling
StopBot()
-
- // Stop the bot handler (in case the goroutine hasn't exited yet)
- if botHandler != nil {
- botHandler.Stop()
- }
logger.Info("Stop Telegram receiver ...")
- isRunning = false
+ tgBotMutex.Lock()
adminIds = nil
+ tgBotMutex.Unlock()
}
// StopBot safely stops the Telegram bot's Long Polling operation by cancelling its context.
// This is the global function called from main.go's signal handler and t.Stop().
func StopBot() {
+ // Don't hold the mutex while cancelling/waiting.
tgBotMutex.Lock()
- defer tgBotMutex.Unlock()
-
- if botCancel != nil {
- logger.Info("Sending cancellation signal to Telegram bot...")
+ cancel := botCancel
+ botCancel = nil
+ handler := botHandler
+ botHandler = nil
+ isRunning = false
+ tgBotMutex.Unlock()
- // Calling botCancel() cancels the context passed to UpdatesViaLongPolling,
- // which stops the Long Polling operation and closes the updates channel,
- // allowing the th.Start() goroutine to exit cleanly.
- botCancel()
+ if handler != nil {
+ handler.Stop()
+ }
- botCancel = nil
- // Giving the goroutine a small delay to exit cleanly.
+ if cancel != nil {
+ logger.Info("Sending cancellation signal to Telegram bot...")
+ // Cancels the context passed to UpdatesViaLongPolling; this closes updates channel
+ // and lets botHandler.Start() exit cleanly.
+ cancel()
botWG.Wait()
logger.Info("Telegram bot successfully stopped.")
}
@@ -379,36 +391,38 @@ func (t *Tgbot) OnReceive() {
params := telego.GetUpdatesParams{
Timeout: 30, // Increased timeout to reduce API calls
}
- // --- GRACEFUL SHUTDOWN FIX: Context creation ---
+ // Strict singleton: never start a second long-polling loop.
tgBotMutex.Lock()
-
- // Create a context with cancellation and store the cancel function.
- var ctx context.Context
-
- // Check if botCancel is already set (to prevent race condition overwrite and goroutine leak)
- if botCancel == nil {
- ctx, botCancel = context.WithCancel(context.Background())
- } else {
- // If botCancel is already set, use a non-cancellable context for this redundant call.
- // This prevents overwriting the active botCancel and causing a goroutine leak from the previous call.
- logger.Warning("TgBot OnReceive called concurrently. Using background context for redundant call.")
- ctx = context.Background() // <<< ИЗМЕНЕНИЕ
+ if botCancel != nil || isRunning {
+ tgBotMutex.Unlock()
+ logger.Warning("TgBot OnReceive called while already running; ignoring.")
+ return
}
+ ctx, cancel := context.WithCancel(context.Background())
+ botCancel = cancel
+ isRunning = true
+ // Add to WaitGroup before releasing the lock so StopBot() can't return
+ // before this receiver goroutine is accounted for.
+ botWG.Add(1)
tgBotMutex.Unlock()
// Get updates channel using the context.
updates, _ := bot.UpdatesViaLongPolling(ctx, &params)
- botWG.Go(func() {
+ go func() {
+ defer botWG.Done()
+ h, _ := th.NewBotHandler(bot, updates)
+ tgBotMutex.Lock()
+ botHandler = h
+ tgBotMutex.Unlock()
- botHandler, _ = th.NewBotHandler(bot, updates)
- botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error {
+ h.HandleMessage(func(ctx *th.Context, message telego.Message) error {
delete(userStates, message.Chat.ID)
t.SendMsgToTgbot(message.Chat.ID, t.I18nBot("tgbot.keyboardClosed"), tu.ReplyKeyboardRemove())
return nil
}, th.TextEqual(t.I18nBot("tgbot.buttons.closeKeyboard")))
- botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error {
+ h.HandleMessage(func(ctx *th.Context, message telego.Message) error {
// Use goroutine with worker pool for concurrent command processing
go func() {
messageWorkerPool <- struct{}{} // Acquire worker
@@ -420,7 +434,7 @@ func (t *Tgbot) OnReceive() {
return nil
}, th.AnyCommand())
- botHandler.HandleCallbackQuery(func(ctx *th.Context, query telego.CallbackQuery) error {
+ h.HandleCallbackQuery(func(ctx *th.Context, query telego.CallbackQuery) error {
// Use goroutine with worker pool for concurrent callback processing
go func() {
messageWorkerPool <- struct{}{} // Acquire worker
@@ -432,7 +446,7 @@ func (t *Tgbot) OnReceive() {
return nil
}, th.AnyCallbackQueryWithMessage())
- botHandler.HandleMessage(func(ctx *th.Context, message telego.Message) error {
+ h.HandleMessage(func(ctx *th.Context, message telego.Message) error {
if userState, exists := userStates[message.Chat.ID]; exists {
switch userState {
case "awaiting_id":
@@ -578,8 +592,8 @@ func (t *Tgbot) OnReceive() {
return nil
}, th.AnyMessage())
- botHandler.Start()
- })
+ h.Start()
+ }()
}
// answerCommand processes incoming command messages from Telegram users.