From b57d87dece08da6f103dd9d88c8faf489a23af7f Mon Sep 17 00:00:00 2001 From: Eyre_S Date: Fri, 3 Nov 2023 20:23:32 +0800 Subject: [PATCH] add basic Scheduler and interval tasks - MedicationTimer refactor using new scheduler - add `/info tasks` for monitoring morny global tasks --- gradle.properties | 2 +- .../cc/sukazyo/cono/morny/MornyCoeur.scala | 7 +- .../morny/bot/command/MornyInformation.scala | 16 ++- .../cono/morny/daemon/MedicationTimer.scala | 61 +++++---- .../cono/morny/daemon/MornyDaemons.scala | 5 +- .../morny/internal/schedule/DelayedTask.scala | 18 +++ .../internal/schedule/IntervalTask.scala | 25 ++++ .../schedule/IntervalWithTimesTask.scala | 26 ++++ .../morny/internal/schedule/RoutineTask.scala | 12 ++ .../morny/internal/schedule/Scheduler.scala | 119 ++++++++++++++++++ .../cono/morny/internal/schedule/Task.scala | 21 ++++ .../morny/test/data/BilibiliFormsTest.scala | 5 +- 12 files changed, 277 insertions(+), 40 deletions(-) create mode 100644 src/main/scala/cc/sukazyo/cono/morny/internal/schedule/DelayedTask.scala create mode 100644 src/main/scala/cc/sukazyo/cono/morny/internal/schedule/IntervalTask.scala create mode 100644 src/main/scala/cc/sukazyo/cono/morny/internal/schedule/IntervalWithTimesTask.scala create mode 100644 src/main/scala/cc/sukazyo/cono/morny/internal/schedule/RoutineTask.scala create mode 100644 src/main/scala/cc/sukazyo/cono/morny/internal/schedule/Scheduler.scala create mode 100644 src/main/scala/cc/sukazyo/cono/morny/internal/schedule/Task.scala diff --git a/gradle.properties b/gradle.properties index d12c207..1266f2e 100644 --- a/gradle.properties +++ b/gradle.properties @@ -5,7 +5,7 @@ MORNY_ARCHIVE_NAME = morny-coeur MORNY_CODE_STORE = https://github.com/Eyre-S/Coeur-Morny-Cono MORNY_COMMIT_PATH = https://github.com/Eyre-S/Coeur-Morny-Cono/commit/%s -VERSION = 1.2.1 +VERSION = 1.3.0-feat/scheduler USE_DELTA = false VERSION_DELTA = diff --git a/src/main/scala/cc/sukazyo/cono/morny/MornyCoeur.scala b/src/main/scala/cc/sukazyo/cono/morny/MornyCoeur.scala index e9337cf..af4bb17 100644 --- a/src/main/scala/cc/sukazyo/cono/morny/MornyCoeur.scala +++ b/src/main/scala/cc/sukazyo/cono/morny/MornyCoeur.scala @@ -7,6 +7,7 @@ import cc.sukazyo.cono.morny.MornyCoeur.THREAD_SERVER_EXIT import cc.sukazyo.cono.morny.bot.api.EventListenerManager import cc.sukazyo.cono.morny.bot.event.{MornyEventListeners, MornyOnInlineQuery, MornyOnTelegramCommand, MornyOnUpdateTimestampOffsetLock} import cc.sukazyo.cono.morny.bot.query.MornyQueries +import cc.sukazyo.cono.morny.internal.schedule.Scheduler import com.pengrad.telegrambot.TelegramBot import com.pengrad.telegrambot.request.GetMe @@ -64,6 +65,8 @@ class MornyCoeur (using val config: MornyConfig) { /** current Morny's [[MornyTrusted]] instance */ val trusted: MornyTrusted = MornyTrusted() + /** Morny's task [[Scheduler]] */ + val tasks: Scheduler = Scheduler() val daemons: MornyDaemons = MornyDaemons() //noinspection ScalaWeakerAccess @@ -101,6 +104,8 @@ class MornyCoeur (using val config: MornyConfig) { account.shutdown() logger info "stopped bot account" daemons.stop() + tasks.waitForStop() + logger info s"morny tasks stopped: remains ${tasks.amount} tasks not be executed" if config.commandLogoutClear then commands.automaticTGListRemove() logger info "done exit cleanup" @@ -160,5 +165,5 @@ class MornyCoeur (using val config: MornyConfig) { } } - + } diff --git a/src/main/scala/cc/sukazyo/cono/morny/bot/command/MornyInformation.scala b/src/main/scala/cc/sukazyo/cono/morny/bot/command/MornyInformation.scala index 36222eb..c877082 100644 --- a/src/main/scala/cc/sukazyo/cono/morny/bot/command/MornyInformation.scala +++ b/src/main/scala/cc/sukazyo/cono/morny/bot/command/MornyInformation.scala @@ -22,11 +22,12 @@ class MornyInformation (using coeur: MornyCoeur) extends ITelegramCommand { val RUNTIME = "runtime" val VERSION = "version" val VERSION_2 = "v" + val TASKS = "tasks" } override val name: String = "info" override val aliases: Array[ICommandAlias]|Null = null - override val paramRule: String = "[(version|runtime|stickers[.IDs])]" + override val paramRule: String = "[(version|runtime|stickers[.IDs]|tasks)]" override val description: String = "输出当前 Morny 的各种信息" override def execute (using command: InputCommand, event: Update): Unit = { @@ -42,6 +43,7 @@ class MornyInformation (using coeur: MornyCoeur) extends ITelegramCommand { case s if s startsWith Subs.STICKERS => echoStickers case Subs.RUNTIME => echoRuntime case Subs.VERSION | Subs.VERSION_2 => echoVersion + case Subs.TASKS => echoTasksStatus case _ => echo404 } @@ -144,6 +146,18 @@ class MornyInformation (using coeur: MornyCoeur) extends ITelegramCommand { ).parseMode(ParseMode HTML).replyToMessageId(event.message.messageId) } + private def echoTasksStatus (using update: Update): Unit = { +// if !coeur.trusted.isTrusted(update.message.from.id) then return; + coeur.account exec SendMessage( + update.message.chat.id, + // language=html + s"""Coeur Task Scheduler: + | - scheduled tasks: ${coeur.tasks.amount} + | - current runner status: ${coeur.tasks.state} + |""".stripMargin + ).parseMode(ParseMode.HTML).replyToMessageId(update.message.messageId) + } + private def echo404 (using event: Update): Unit = coeur.account exec new SendSticker( event.message.chat.id, diff --git a/src/main/scala/cc/sukazyo/cono/morny/daemon/MedicationTimer.scala b/src/main/scala/cc/sukazyo/cono/morny/daemon/MedicationTimer.scala index 4840418..f90a54d 100644 --- a/src/main/scala/cc/sukazyo/cono/morny/daemon/MedicationTimer.scala +++ b/src/main/scala/cc/sukazyo/cono/morny/daemon/MedicationTimer.scala @@ -3,6 +3,7 @@ package cc.sukazyo.cono.morny.daemon import cc.sukazyo.cono.morny.Log.{exceptionLog, logger} import cc.sukazyo.cono.morny.MornyCoeur import cc.sukazyo.cono.morny.daemon.MedicationTimer.calcNextRoutineTimestamp +import cc.sukazyo.cono.morny.internal.schedule.RoutineTask import cc.sukazyo.cono.morny.util.tgapi.TelegramExtensions.Bot.exec import cc.sukazyo.cono.morny.util.CommonFormat import com.pengrad.telegrambot.model.{Message, MessageEntity} @@ -13,7 +14,7 @@ import java.time.{LocalDateTime, ZoneOffset} import scala.collection.mutable.ArrayBuffer import scala.language.implicitConversions -class MedicationTimer (using coeur: MornyCoeur) extends Thread { +class MedicationTimer (using coeur: MornyCoeur) { private val NOTIFY_MESSAGE = "🍥⏲" private val DAEMON_THREAD_NAME_DEF = "MedicationTimer" @@ -23,45 +24,41 @@ class MedicationTimer (using coeur: MornyCoeur) extends Thread { private val notify_atHour: Set[Int] = coeur.config.medicationNotifyAt.asScala.toSet.map(_.intValue) private val notify_toChat = coeur.config.medicationNotifyToChat - this.setName(DAEMON_THREAD_NAME_DEF) - private var lastNotify_messageId: Option[Int] = None - override def run (): Unit = { + private val scheduleTask: RoutineTask = new RoutineTask { - if ((notify_toChat == -1) || (notify_atHour isEmpty)) { - logger notice "Medication Timer disabled : related param is not complete set" - return - } + override def name: String = DAEMON_THREAD_NAME_DEF - logger notice "Medication Timer started." - while (!this.isInterrupted) { - try { - val next_time = calcNextRoutineTimestamp(System.currentTimeMillis, use_timeZone, notify_atHour) - logger info s"medication timer will send next notify at ${CommonFormat.formatDate(next_time, use_timeZone.getTotalSeconds/60/60)} with $use_timeZone [$next_time]" - val sleep_millis = next_time - System.currentTimeMillis - logger debug s"medication timer will sleep ${CommonFormat.formatDuration(sleep_millis)} [$sleep_millis]" - Thread sleep sleep_millis - sendNotification() - logger info "medication notify sent." - } catch - case _: InterruptedException => - interrupt() - logger notice "MedicationTimer was interrupted, will be exit now" - case ill: IllegalArgumentException => - logger warn "MedicationTimer will not work due to: " + ill.getMessage - interrupt() - case e => - logger error - s"""unexpected error occurred on NotificationTimer - |${exceptionLog(e)}""" - .stripMargin - coeur.daemons.reporter.exception(e) + def calcNextSendTime: Long = + val next_time = calcNextRoutineTimestamp(System.currentTimeMillis, use_timeZone, notify_atHour) + logger info s"medication timer will send next notify at ${CommonFormat.formatDate(next_time, use_timeZone.getTotalSeconds / 60 / 60)} with $use_timeZone [$next_time]" + next_time + + override def firstRoutineTimeMillis: Long = + calcNextSendTime + + override def nextRoutineTimeMillis (previousRoutineScheduledTimeMillis: Long): Long | Null = + calcNextSendTime + + override def main: Unit = { + sendNotification() + logger info "medication notify sent." } - logger notice "Medication Timer stopped." } + def start(): Unit = + if ((notify_toChat == -1) || (notify_atHour isEmpty)) + logger notice "Medication Timer disabled : related param is not complete set" + return; + coeur.tasks ++ scheduleTask + logger notice "Medication Timer started." + + def stop(): Unit = + coeur.tasks % scheduleTask + logger notice "Medication Timer stopped." + private def sendNotification(): Unit = { val sendResponse: SendResponse = coeur.account exec SendMessage(notify_toChat, NOTIFY_MESSAGE) if sendResponse isOk then lastNotify_messageId = Some(sendResponse.message.messageId) diff --git a/src/main/scala/cc/sukazyo/cono/morny/daemon/MornyDaemons.scala b/src/main/scala/cc/sukazyo/cono/morny/daemon/MornyDaemons.scala index 44c0edc..0a60e1a 100644 --- a/src/main/scala/cc/sukazyo/cono/morny/daemon/MornyDaemons.scala +++ b/src/main/scala/cc/sukazyo/cono/morny/daemon/MornyDaemons.scala @@ -25,11 +25,8 @@ class MornyDaemons (using val coeur: MornyCoeur) { logger notice "stopping All Morny Daemons..." // TrackerDataManager.DAEMON.interrupt(); - medicationTimer.interrupt() + medicationTimer.stop() // TrackerDataManager.trackingLock.lock(); - try { medicationTimer.join() } - catch case e: InterruptedException => - e.printStackTrace(System.out) logger notice "stopped ALL Morny Daemons." } diff --git a/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/DelayedTask.scala b/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/DelayedTask.scala new file mode 100644 index 0000000..582e4f6 --- /dev/null +++ b/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/DelayedTask.scala @@ -0,0 +1,18 @@ +package cc.sukazyo.cono.morny.internal.schedule + +trait DelayedTask ( + val delayedMillis: Long +) extends Task { + + override val scheduledTimeMillis: Long = System.currentTimeMillis + delayedMillis + +} + +object DelayedTask { + + def apply (_name: String, delayedMillis: Long, task: =>Unit): DelayedTask = + new DelayedTask (delayedMillis): + override val name: String = _name + override def main: Unit = task + +} diff --git a/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/IntervalTask.scala b/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/IntervalTask.scala new file mode 100644 index 0000000..5b255de --- /dev/null +++ b/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/IntervalTask.scala @@ -0,0 +1,25 @@ +package cc.sukazyo.cono.morny.internal.schedule + +trait IntervalTask extends RoutineTask { + + def intervalMillis: Long + + override def firstRoutineTimeMillis: Long = + System.currentTimeMillis() + intervalMillis + + override def nextRoutineTimeMillis ( + previousScheduledRoutineTimeMillis: Long + ): Long|Null = + previousScheduledRoutineTimeMillis + intervalMillis + +} + +object IntervalTask { + + def apply (_name: String, _intervalMillis: Long, task: =>Unit): IntervalTask = + new IntervalTask: + override def intervalMillis: Long = _intervalMillis + override def name: String = _name + override def main: Unit = task + +} diff --git a/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/IntervalWithTimesTask.scala b/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/IntervalWithTimesTask.scala new file mode 100644 index 0000000..249a95e --- /dev/null +++ b/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/IntervalWithTimesTask.scala @@ -0,0 +1,26 @@ +package cc.sukazyo.cono.morny.internal.schedule + +trait IntervalWithTimesTask extends IntervalTask { + + def times: Int + private var currentExecutedTimes = 1 + + override def nextRoutineTimeMillis (previousScheduledRoutineTimeMillis: Long): Long | Null = + if currentExecutedTimes >= times then + null + else + currentExecutedTimes = currentExecutedTimes + 1 + super.nextRoutineTimeMillis(previousScheduledRoutineTimeMillis) + +} + +object IntervalWithTimesTask { + + def apply (_name: String, _intervalMillis: Long, _times: Int, task: =>Unit): IntervalWithTimesTask = + new IntervalWithTimesTask: + override def name: String = _name + override def times: Int = _times + override def intervalMillis: Long = _intervalMillis + override def main: Unit = task + +} diff --git a/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/RoutineTask.scala b/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/RoutineTask.scala new file mode 100644 index 0000000..40f47ef --- /dev/null +++ b/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/RoutineTask.scala @@ -0,0 +1,12 @@ +package cc.sukazyo.cono.morny.internal.schedule + +trait RoutineTask extends Task { + + private[schedule] var currentScheduledTimeMillis: Long = firstRoutineTimeMillis + override def scheduledTimeMillis: Long = currentScheduledTimeMillis + + def firstRoutineTimeMillis: Long + + def nextRoutineTimeMillis (previousRoutineScheduledTimeMillis: Long): Long|Null + +} diff --git a/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/Scheduler.scala b/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/Scheduler.scala new file mode 100644 index 0000000..9b71b29 --- /dev/null +++ b/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/Scheduler.scala @@ -0,0 +1,119 @@ +package cc.sukazyo.cono.morny.internal.schedule + +import scala.collection.mutable + +class Scheduler { + +// val taskList: util.TreeSet[Task] = +// Collections.synchronizedSortedSet(util.TreeSet[Task]()) + private val taskList: mutable.TreeSet[Task] = mutable.TreeSet.empty + private var exitAtNextRoutine = false + private var waitForDone = false + private var currentRunning: Task|Null = _ + private var currentRunning_isScheduledCancel = false + private val runtime: Thread = new Thread { + + override def run (): Unit = { + def willExit: Boolean = + if exitAtNextRoutine then true + else if waitForDone then + taskList.synchronized: + if taskList.isEmpty then true + else false + else false + while (!willExit) { + + val nextMove: Task|Long = taskList.synchronized { + taskList.headOption match + case Some(_readyToRun) if System.currentTimeMillis >= _readyToRun.scheduledTimeMillis => + taskList -= _readyToRun + currentRunning = _readyToRun + _readyToRun + case Some(_notReady) => + _notReady.scheduledTimeMillis - System.currentTimeMillis + case None => + Long.MaxValue + } + + nextMove match + case readyToRun: Task => + + this setName readyToRun.name + + try { + readyToRun.main + } catch case _: (Exception | Error) => {} + + this setName s"${readyToRun.name}#post" + + currentRunning match + case routine: RoutineTask => + routine.nextRoutineTimeMillis(routine.currentScheduledTimeMillis) match + case next: Long => + routine.currentScheduledTimeMillis = next + if (!currentRunning_isScheduledCancel) schedule(routine) + case _ => + case _ => + + this setName runnerName + currentRunning = null + + case needToWaitMillis: Long => + try Thread.sleep(needToWaitMillis) + catch case _: InterruptedException => {} + + } + } + + } + runtime.start() + + //noinspection ScalaWeakerAccess + def runnerName: String = + this.toString + + def ++ (task: Task): this.type = + schedule(task) + this + def schedule (task: Task): Boolean = + try taskList.synchronized: + taskList add task + finally runtime.interrupt() + + def % (task: Task): this.type = + cancel(task) + this + def cancel (task: Task): Boolean = + try { + val succeed = taskList.synchronized { taskList remove task } + if succeed then succeed + else if task == currentRunning then + currentRunning_isScheduledCancel = true + true + else false + } + finally runtime.interrupt() + + def amount: Int = + taskList.size + + def state: Thread.State = + runtime.getState + + def stop (): Unit = + exitAtNextRoutine = true + runtime.interrupt() + + def waitForStop (): Unit = + stop() + runtime.join() + + //noinspection ScalaWeakerAccess + def tagStopAtAllDone (): Unit = + waitForDone = true + + def waitForStopAtAllDone(): Unit = + tagStopAtAllDone() + runtime.join() + +} diff --git a/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/Task.scala b/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/Task.scala new file mode 100644 index 0000000..ec1840f --- /dev/null +++ b/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/Task.scala @@ -0,0 +1,21 @@ +package cc.sukazyo.cono.morny.internal.schedule + +trait Task extends Ordered[Task] { + + def name: String + def scheduledTimeMillis: Long + + //noinspection UnitMethodIsParameterless + def main: Unit + + override def compare (that: Task): Int = + if this.scheduledTimeMillis == that.scheduledTimeMillis then + this.hashCode - that.hashCode + else if this.scheduledTimeMillis > that.scheduledTimeMillis then + 1 + else -1 + + override def toString: String = + s"""${super.toString}{"$name": $scheduledTimeMillis}""" + +} diff --git a/src/test/scala/cc/sukazyo/cono/morny/test/data/BilibiliFormsTest.scala b/src/test/scala/cc/sukazyo/cono/morny/test/data/BilibiliFormsTest.scala index c3dde75..6461e37 100644 --- a/src/test/scala/cc/sukazyo/cono/morny/test/data/BilibiliFormsTest.scala +++ b/src/test/scala/cc/sukazyo/cono/morny/test/data/BilibiliFormsTest.scala @@ -94,7 +94,10 @@ class BilibiliFormsTest extends MornyTests with TableDrivenPropertyChecks { val examples = Table( ("b23_link", "bilibili_video_link"), ("https://b23.tv/iiCldvZ", "https://www.bilibili.com/video/BV1Gh411P7Sh?buvid=XY6F25B69BE9CF469FF5B917D012C93E95E72&is_story_h5=false&mid=wD6DQnYivIG5pfA3sAGL6A%3D%3D&p=1&plat_id=114&share_from=ugc&share_medium=android&share_plat=android&share_session_id=8081015b-1210-4dea-a665-6746b4850fcd&share_source=COPY&share_tag=s_i×tamp=1689605644&unique_k=iiCldvZ&up_id=19977489"), - ("http://b23.tv/3ymowwx", "https://www.bilibili.com/video/BV15Y411n754?p=1&share_medium=android_i&share_plat=android&share_source=COPY&share_tag=s_i×tamp=1650293889&unique_k=3ymowwx") + ("https://b23.tv/xWiWFl9", "https://www.bilibili.com/video/BV1N54y1c7us?buvid=XY705C970C2ADBB710C1801E1F45BDC3B9210&is_story_h5=false&mid=w%2B1u1wpibjYsW4pP%2FIo7Ww%3D%3D&p=1&plat_id=116&share_from=ugc&share_medium=android&share_plat=android&share_session_id=6da09711-d601-4da4-bba1-46a4edbb1c60&share_source=COPY&share_tag=s_i×tamp=1680280016&unique_k=xWiWFl9&up_id=275354674"), + ("http://b23.tv/uJPIvhv", "https://www.bilibili.com/video/BV1E84y1C7in?is_story_h5=false&p=1&share_from=ugc&share_medium=android&share_plat=android&share_session_id=4a077fa1-5ee2-40d4-ac37-bf9a2bf567e3&share_source=COPY&share_tag=s_i×tamp=1669044671&unique_k=uJPIvhv") + // this link have been expired +// ("http://b23.tv/3ymowwx", "https://www.bilibili.com/video/BV15Y411n754?p=1&share_medium=android_i&share_plat=android&share_source=COPY&share_tag=s_i×tamp=1650293889&unique_k=3ymowwx") ) "not b23.tv link is not supported" in: