From 966c4dfa926b3abb1f3b111c11f5097f6113d2f1 Mon Sep 17 00:00:00 2001 From: Eyre_S Date: Sun, 18 Feb 2024 18:01:23 +0800 Subject: [PATCH] add the message thread framework. --- project/MornyConfiguration.scala | 2 +- .../sukazyo/cono/morny/core/MornyCoeur.scala | 6 + .../core/bot/api/messages/MessageThread.scala | 59 +++++++++ .../bot/api/messages/MessagingContext.scala | 48 ++++++++ .../bot/api/messages/ThreadingManager.scala | 62 ++++++++++ .../bot/internal/ThreadingManagerImpl.scala | 114 ++++++++++++++++++ .../cono/morny/morny_misc/Testing.scala | 20 ++- .../cono/morny/util/schedule/Scheduler.scala | 20 ++- .../cono/morny/util/tgapi/Standardize.scala | 8 +- 9 files changed, 328 insertions(+), 11 deletions(-) create mode 100644 src/main/scala/cc/sukazyo/cono/morny/core/bot/api/messages/MessageThread.scala create mode 100644 src/main/scala/cc/sukazyo/cono/morny/core/bot/api/messages/MessagingContext.scala create mode 100644 src/main/scala/cc/sukazyo/cono/morny/core/bot/api/messages/ThreadingManager.scala create mode 100644 src/main/scala/cc/sukazyo/cono/morny/core/bot/internal/ThreadingManagerImpl.scala diff --git a/project/MornyConfiguration.scala b/project/MornyConfiguration.scala index 8b35ed8..1fb6b3e 100644 --- a/project/MornyConfiguration.scala +++ b/project/MornyConfiguration.scala @@ -8,7 +8,7 @@ object MornyConfiguration { val MORNY_CODE_STORE = "https://github.com/Eyre-S/Coeur-Morny-Cono" val MORNY_COMMIT_PATH = "https://github.com/Eyre-S/Coeur-Morny-Cono/commit/%s" - val VERSION = "2.0.0-alpha14" + val VERSION = "2.0.0-alpha15" val VERSION_DELTA: Option[String] = None val CODENAME = "guanggu" diff --git a/src/main/scala/cc/sukazyo/cono/morny/core/MornyCoeur.scala b/src/main/scala/cc/sukazyo/cono/morny/core/MornyCoeur.scala index f58c148..cb6b6c5 100644 --- a/src/main/scala/cc/sukazyo/cono/morny/core/MornyCoeur.scala +++ b/src/main/scala/cc/sukazyo/cono/morny/core/MornyCoeur.scala @@ -3,7 +3,9 @@ package cc.sukazyo.cono.morny.core import cc.sukazyo.cono.morny.core.Log.logger import cc.sukazyo.cono.morny.core.MornyCoeur.* import cc.sukazyo.cono.morny.core.bot.api.{EventListenerManager, MornyCommandManager, MornyQueryManager} +import cc.sukazyo.cono.morny.core.bot.api.messages.ThreadingManager import cc.sukazyo.cono.morny.core.bot.event.{MornyOnInlineQuery, MornyOnTelegramCommand, MornyOnUpdateTimestampOffsetLock} +import cc.sukazyo.cono.morny.core.bot.internal.ThreadingManagerImpl import cc.sukazyo.cono.morny.core.http.api.{HttpServer, MornyHttpServerContext} import cc.sukazyo.cono.morny.core.http.internal.MornyHttpServerContextImpl import cc.sukazyo.cono.morny.reporter.MornyReport @@ -167,6 +169,8 @@ class MornyCoeur (modules: List[MornyModule])(using val config: MornyConfig)(tes val tasks: Scheduler = Scheduler() /** current Morny's [[MornyTrusted]] instance */ val trusted: MornyTrusted = MornyTrusted() + private val _messageThreading: ThreadingManagerImpl = ThreadingManagerImpl(using account) + val messageThreading: ThreadingManager = _messageThreading val eventManager: EventListenerManager = EventListenerManager() val commands: MornyCommandManager = MornyCommandManager() @@ -186,6 +190,7 @@ class MornyCoeur (modules: List[MornyModule])(using val config: MornyConfig)(tes eventManager register MornyOnUpdateTimestampOffsetLock() eventManager register MornyOnTelegramCommand(using commands) eventManager register MornyOnInlineQuery(using queries) + eventManager register _messageThreading.NextMessageCatcher { // register core commands import bot.command.* val $MornyHellos = MornyHellos() @@ -205,6 +210,7 @@ class MornyCoeur (modules: List[MornyModule])(using val config: MornyConfig)(tes $MornyManagers.Exit, DirectMsgClear(), + _messageThreading.CancelCommand, ) } diff --git a/src/main/scala/cc/sukazyo/cono/morny/core/bot/api/messages/MessageThread.scala b/src/main/scala/cc/sukazyo/cono/morny/core/bot/api/messages/MessageThread.scala new file mode 100644 index 0000000..8eaf5fb --- /dev/null +++ b/src/main/scala/cc/sukazyo/cono/morny/core/bot/api/messages/MessageThread.scala @@ -0,0 +1,59 @@ +package cc.sukazyo.cono.morny.core.bot.api.messages + +import cc.sukazyo.cono.morny.core.bot.api.messages.MessageThread.{CallbackParameterized, ThreadKey} +import cc.sukazyo.cono.morny.util.tgapi.Standardize.{ChatID, UserID} +import cc.sukazyo.cono.morny.util.EpochDateTime.DurationMillis +import com.pengrad.telegrambot.model.Message + +trait MessageThread [P] { + + val starterContext: MessagingContext.WithUserAndMessage + lazy val threadKey: ThreadKey = ThreadKey `fromContext` starterContext + val passingData: P + val callback: CallbackParameterized[P] + val timeout: DurationMillis = 5 * 60 * 1000 + + def continueThread (continuingMessage: Message): Unit = { + callback.callback(continuingMessage, starterContext, passingData) + } + +} + +object MessageThread { + + @FunctionalInterface + trait Callback: + def callback(message: Message, previousContext: MessagingContext.WithUserAndMessage): Any + @FunctionalInterface + trait CallbackParameterized [P]: + def callback(message: Message, previousContext: MessagingContext.WithUserAndMessage, passingContext: P): Any + given Conversion[Callback, CallbackParameterized[Unit]] = + (callback: Callback) => (message, previousContext, _) => callback.callback(message, previousContext) + + def apply [P] + (using _cxt: MessagingContext.WithUserAndMessage) + (_data: P) + (_callback: CallbackParameterized[P]) + : MessageThread[P] = new MessageThread[P] { + override val starterContext: MessagingContext.WithUserAndMessage = _cxt + override val passingData: P = _data + override val callback: CallbackParameterized[P] = _callback + } + + def apply + (using _cxt: MessagingContext.WithUserAndMessage) + (_callback: Callback) + : MessageThread[Unit] = new MessageThread[Unit] { + override val starterContext: MessagingContext.WithUserAndMessage = _cxt + override val passingData: Unit = () + override val callback: CallbackParameterized[Unit] = _callback + } + + case class ThreadKey (chatid: ChatID, userid: UserID) + object ThreadKey: + infix def fromMessage (message: Message): ThreadKey = + ThreadKey(message.chat().id(), message.from().id()) + infix def fromContext (cxt: MessagingContext.WithUser): ThreadKey = + ThreadKey(cxt.bind_chat.id(), cxt.bind_user.id()) + +} diff --git a/src/main/scala/cc/sukazyo/cono/morny/core/bot/api/messages/MessagingContext.scala b/src/main/scala/cc/sukazyo/cono/morny/core/bot/api/messages/MessagingContext.scala new file mode 100644 index 0000000..601ed56 --- /dev/null +++ b/src/main/scala/cc/sukazyo/cono/morny/core/bot/api/messages/MessagingContext.scala @@ -0,0 +1,48 @@ +package cc.sukazyo.cono.morny.core.bot.api.messages + +import com.pengrad.telegrambot.model.{Chat, Message, User} + +/** + * @since 2.0.0 + */ +trait MessagingContext: + val bind_chat: Chat + +/** + * @since 2.0.0 + */ +object MessagingContext { + + given String = "aaa" + + def apply (_chat: Chat): MessagingContext = + new MessagingContext: + override val bind_chat: Chat = _chat + trait WithUser extends MessagingContext: + val bind_user: User + def apply (_chat: Chat, _user: User): WithUser = + new WithUser: + override val bind_chat: Chat = _chat + override val bind_user: User = _user + trait WithMessage extends MessagingContext: + val bind_message: Message + def apply (_chat: Chat, _message: Message): WithMessage = + new WithMessage: + override val bind_chat: Chat = _chat + override val bind_message: Message = _message + trait WithUserAndMessage extends MessagingContext with WithMessage with WithUser + def apply (_chat: Chat, _user: User, _message: Message): WithUserAndMessage = + new WithUserAndMessage: + override val bind_chat: Chat = _chat + override val bind_user: User = _user + override val bind_message: Message = _message + + /** Extract a message context from a message (or message event). + * + * @param message The message. + * @return The message context, contains the message's belongs chat, sender user and message itself. + */ + def extract (using message: Message): WithUserAndMessage = + apply(message.chat, message.from, message) + +} diff --git a/src/main/scala/cc/sukazyo/cono/morny/core/bot/api/messages/ThreadingManager.scala b/src/main/scala/cc/sukazyo/cono/morny/core/bot/api/messages/ThreadingManager.scala new file mode 100644 index 0000000..b4437c4 --- /dev/null +++ b/src/main/scala/cc/sukazyo/cono/morny/core/bot/api/messages/ThreadingManager.scala @@ -0,0 +1,62 @@ +package cc.sukazyo.cono.morny.core.bot.api.messages + +import cc.sukazyo.cono.morny.core.bot.api.messages.MessageThread.{Callback, CallbackParameterized, ThreadKey} +import com.pengrad.telegrambot.model.Message + +/** Message threads controller. + */ +trait ThreadingManager { + + /** Do the `_callback` when the next message is arrived. + * + * @since 2.0.0 + * + * @param _cxt Current message event context. + * @param _callback Function that will be executed in the next message. + */ + def doAfter + (using _cxt: MessagingContext.WithUserAndMessage) + (_callback: Callback) + : Unit + + /** Do the `_callback` when the next message is arrived. + * + * @since 2.0.0 + * + * @param _cxt Current message event context. + * @param _data Data that will passing to the `_callback` from current context. + * @param _callback The callback function that will be executed in the next message. + * @tparam P Type of the passing `_data`. + */ + def doAfter[P] + (using _cxt: MessagingContext.WithUserAndMessage) + (_data: P) + (_callback: CallbackParameterized[P]) + : Unit + + /** + * @since 2.0.0 + */ + def doAfter[P] (thread: MessageThread[P]): Unit + + /** Try to continue run a message thread using the given message. + * + * @since 2.0.0 + * + * @param message The message that will be used to continue the thread. + * @return `true` if any one message thread is successfully continued, `false` if this given + * message cannot continue any one message thread. + */ + def tryUpdate (message: Message): Boolean + + /** Cancel a message thread. + * + * @since 2.0.0 + * + * @param threadKey The key of the message thread. + * @return `true` if there's any one message thread is canceled, `false` if there's no message + * thread associated with the given key. + */ + def cancelThread (threadKey: ThreadKey): Boolean + +} diff --git a/src/main/scala/cc/sukazyo/cono/morny/core/bot/internal/ThreadingManagerImpl.scala b/src/main/scala/cc/sukazyo/cono/morny/core/bot/internal/ThreadingManagerImpl.scala new file mode 100644 index 0000000..b74b2e2 --- /dev/null +++ b/src/main/scala/cc/sukazyo/cono/morny/core/bot/internal/ThreadingManagerImpl.scala @@ -0,0 +1,114 @@ +package cc.sukazyo.cono.morny.core.bot.internal + +import cc.sukazyo.cono.morny.core.bot.api.{EventEnv, EventListener, ICommandAlias, ISimpleCommand} +import cc.sukazyo.cono.morny.core.bot.api.messages.{MessageThread, MessagingContext, ThreadingManager} +import cc.sukazyo.cono.morny.core.bot.api.messages.MessageThread.{Callback, CallbackParameterized, ThreadKey} +import cc.sukazyo.cono.morny.util.schedule.{DelayedTask, Scheduler, Task} +import cc.sukazyo.cono.morny.util.tgapi.InputCommand +import cc.sukazyo.cono.morny.util.tgapi.TelegramExtensions.Requests.unsafeExecute +import com.pengrad.telegrambot.model.{Message, Update} +import com.pengrad.telegrambot.TelegramBot +import com.pengrad.telegrambot.request.SendMessage + +class ThreadingManagerImpl (using bot: TelegramBot) extends ThreadingManager { + + private val threadMap = collection.mutable.Map[ThreadKey, InternalMessageThread[?]]() + private val threadMapCleaner = Scheduler(isDaemon = true) + + private class InternalMessageThread [P] ( + val thread: MessageThread[P] + ) { + + val timeoutCleanerTask: Task = ThreadingCleanerTask(this) + + def onExecuteIt (message: Message): Boolean = + val succeed = threadMap.synchronized: + this.onCancelIt() + if succeed then + this.thread.continueThread(message) + succeed + + def onCancelIt (): Boolean = + threadMap.synchronized: + threadMapCleaner % timeoutCleanerTask + threadMap.remove(thread.threadKey).nonEmpty + + } + private def ThreadingCleanerTask [P] (iThread: InternalMessageThread[P]): Task = + DelayedTask(s"", iThread.thread.timeout, { + threadMap.synchronized: + SendMessage( + iThread.thread.threadKey.chatid, + s"Timeout for future messages." + ).unsafeExecute + threadMap -= iThread.thread.threadKey + }) + + private def registerThread [P] (thread: MessageThread[P]): Unit = { + threadMap.synchronized: + if this.cancelThread(thread.threadKey) then + SendMessage( + thread.threadKey.chatid, + """There seems another message thread is waiting for future messages. + |That thread has been canceled automatically. + |""".stripMargin + ).unsafeExecute + val iThread = InternalMessageThread[P](thread) + threadMapCleaner ++ iThread.timeoutCleanerTask + threadMap += (thread.threadKey -> iThread) + } + + override def doAfter + (using _cxt: MessagingContext.WithUserAndMessage) + (_callback: Callback) + : Unit = + registerThread(MessageThread(_callback)) + + override def doAfter[P] + (using _cxt: MessagingContext.WithUserAndMessage) + (_data: P) + (_callback: CallbackParameterized[P]) + : Unit = + registerThread(MessageThread(_data)(_callback)) + + override def doAfter[P] (thread: MessageThread[P]): Unit = + registerThread(thread) + + override def tryUpdate (message: Message): Boolean = + threadMap.get(ThreadKey fromMessage message) + .exists(_.onExecuteIt(message)) + + override def cancelThread (threadKey: ThreadKey): Boolean = + threadMap.get(threadKey) + .exists(_.onCancelIt()) + + object NextMessageCatcher extends EventListener { + + override def onMessage (using event: EventEnv): Unit = { + if tryUpdate(event.update.message) then + event.setEventOk + } + + } + + object CancelCommand extends ISimpleCommand { + + override val name: String = "cancel" + override val aliases: List[ICommandAlias] = Nil + + override def execute (using command: InputCommand, event: Update): Unit = { + if cancelThread(ThreadKey fromMessage event.message) then + SendMessage( + event.message.chat.id, + "Canceled." + ).replyToMessageId(event.message.messageId).unsafeExecute + else + SendMessage( + event.message.chat.id, + "No active message thread to cancel." + ).replyToMessageId(event.message.messageId).unsafeExecute + } + + } + +} diff --git a/src/main/scala/cc/sukazyo/cono/morny/morny_misc/Testing.scala b/src/main/scala/cc/sukazyo/cono/morny/morny_misc/Testing.scala index 66ad141..5a25fc7 100644 --- a/src/main/scala/cc/sukazyo/cono/morny/morny_misc/Testing.scala +++ b/src/main/scala/cc/sukazyo/cono/morny/morny_misc/Testing.scala @@ -2,15 +2,14 @@ package cc.sukazyo.cono.morny.morny_misc import cc.sukazyo.cono.morny.core.MornyCoeur import cc.sukazyo.cono.morny.core.bot.api.{ICommandAlias, ISimpleCommand} +import cc.sukazyo.cono.morny.core.bot.api.messages.MessagingContext import cc.sukazyo.cono.morny.util.tgapi.InputCommand import cc.sukazyo.cono.morny.util.tgapi.TelegramExtensions.Requests.unsafeExecute -import com.pengrad.telegrambot.model.Update +import com.pengrad.telegrambot.model.{Message, Update} import com.pengrad.telegrambot.model.request.ParseMode import com.pengrad.telegrambot.request.SendMessage import com.pengrad.telegrambot.TelegramBot -import scala.language.postfixOps - class Testing (using coeur: MornyCoeur) extends ISimpleCommand { private given TelegramBot = coeur.account @@ -18,14 +17,27 @@ class Testing (using coeur: MornyCoeur) extends ISimpleCommand { override val aliases: List[ICommandAlias] = Nil override def execute (using command: InputCommand, event: Update): Unit = { + given context: MessagingContext.WithUserAndMessage = MessagingContext.extract(using event.message) SendMessage( event.message.chat.id, // language=html - "Just a TEST command." + "Just a TEST command.\n" + + "Please input something to test the command." ).replyToMessageId(event.message.messageId).parseMode(ParseMode HTML) .unsafeExecute + coeur.messageThreading.doAfter(execute2) + + } + + private def execute2 (message: Message, previousContext: MessagingContext.WithUserAndMessage): Unit = { + SendMessage( + message.chat.id, + // language=html + "Test command with following input:\n" + message.text + ).replyToMessageId(message.messageId).parseMode(ParseMode HTML) + .unsafeExecute } } diff --git a/src/main/scala/cc/sukazyo/cono/morny/util/schedule/Scheduler.scala b/src/main/scala/cc/sukazyo/cono/morny/util/schedule/Scheduler.scala index a51e832..2ef0ecb 100644 --- a/src/main/scala/cc/sukazyo/cono/morny/util/schedule/Scheduler.scala +++ b/src/main/scala/cc/sukazyo/cono/morny/util/schedule/Scheduler.scala @@ -18,10 +18,16 @@ import scala.collection.mutable * If you want to remove a task, use [[Scheduler.%]] or [[Scheduler.cancel]]. * Removal task should be the same task object, but not just the same name. * - * The scheduler will not automatic stop when the tasks is all done and the - * main thread is stopped. You can/should use [[stop]], [[waitForStop]], - * [[tagStopAtAllDone]], [[waitForStopAtAllDone]] to async or syncing stop - * the scheduler. + * As defaults behavior, the scheduler will not automatic stop when the tasks + * is all done and the main thread is stopped. You can/should use [[stop]], + * [[waitForStop]], [[tagStopAtAllDone]], [[waitForStopAtAllDone]] to async + * or syncing stop the scheduler. + * + * If you want to let it stop automatically when the main thread is stopped, + * set the `isDaemon` parameter to `true` when creating the scheduler. Therefore + * this scheduler's runner thread will be tagged as a daemon thread, and will + * automatically stop when the main thread is stopped. For more details about daemon + * thread, see [[Thread.setDaemon]]. * * == Implementation details == * @@ -37,8 +43,11 @@ import scala.collection.mutable * thread name will be set to [[Task.name]]#post. After all of * that, the task is fully complete, and the runner's thread name will be * reset to [[runnerName]]. + * + * @param isDaemon if the runner thread should be a daemon thread. See [[Thread.setDaemon]] + * for more information. Defaults are false. */ -class Scheduler { +class Scheduler (isDaemon: Boolean = false) { /** Status tag of this scheduler. */ //noinspection ScalaWeakerAccess @@ -134,6 +143,7 @@ class Scheduler { } runtime `setName` runnerName + runtime.setDaemon(isDaemon) runtime.start() /** Name of the scheduler runner. diff --git a/src/main/scala/cc/sukazyo/cono/morny/util/tgapi/Standardize.scala b/src/main/scala/cc/sukazyo/cono/morny/util/tgapi/Standardize.scala index 2796c02..e3547e8 100644 --- a/src/main/scala/cc/sukazyo/cono/morny/util/tgapi/Standardize.scala +++ b/src/main/scala/cc/sukazyo/cono/morny/util/tgapi/Standardize.scala @@ -1,9 +1,15 @@ package cc.sukazyo.cono.morny.util.tgapi +import com.pengrad.telegrambot.model.{Chat, Message, User} + object Standardize { + type UserID = Long + type ChatID = Long + type MessageID = Int + val CHANNEL_SPEAKER_MAGIC_ID = 136817688 val MASK_BOTAPI_ID: Long = -1000000000000 -} \ No newline at end of file +}