diff --git a/src/main/scala/cc/sukazyo/cono/morny/MornyCoeur.scala b/src/main/scala/cc/sukazyo/cono/morny/MornyCoeur.scala index af4bb17..f763451 100644 --- a/src/main/scala/cc/sukazyo/cono/morny/MornyCoeur.scala +++ b/src/main/scala/cc/sukazyo/cono/morny/MornyCoeur.scala @@ -7,7 +7,7 @@ import cc.sukazyo.cono.morny.MornyCoeur.THREAD_SERVER_EXIT import cc.sukazyo.cono.morny.bot.api.EventListenerManager import cc.sukazyo.cono.morny.bot.event.{MornyEventListeners, MornyOnInlineQuery, MornyOnTelegramCommand, MornyOnUpdateTimestampOffsetLock} import cc.sukazyo.cono.morny.bot.query.MornyQueries -import cc.sukazyo.cono.morny.internal.schedule.Scheduler +import cc.sukazyo.cono.morny.util.schedule.Scheduler import com.pengrad.telegrambot.TelegramBot import com.pengrad.telegrambot.request.GetMe diff --git a/src/main/scala/cc/sukazyo/cono/morny/bot/command/MornyInformation.scala b/src/main/scala/cc/sukazyo/cono/morny/bot/command/MornyInformation.scala index c877082..b754e52 100644 --- a/src/main/scala/cc/sukazyo/cono/morny/bot/command/MornyInformation.scala +++ b/src/main/scala/cc/sukazyo/cono/morny/bot/command/MornyInformation.scala @@ -153,7 +153,8 @@ class MornyInformation (using coeur: MornyCoeur) extends ITelegramCommand { // language=html s"""Coeur Task Scheduler: | - scheduled tasks: ${coeur.tasks.amount} - | - current runner status: ${coeur.tasks.state} + | - scheduler status: ${coeur.tasks.state} + | - current runner status: ${coeur.tasks.runnerState} |""".stripMargin ).parseMode(ParseMode.HTML).replyToMessageId(update.message.messageId) } diff --git a/src/main/scala/cc/sukazyo/cono/morny/daemon/MedicationTimer.scala b/src/main/scala/cc/sukazyo/cono/morny/daemon/MedicationTimer.scala index f90a54d..7d81ebd 100644 --- a/src/main/scala/cc/sukazyo/cono/morny/daemon/MedicationTimer.scala +++ b/src/main/scala/cc/sukazyo/cono/morny/daemon/MedicationTimer.scala @@ -1,9 +1,9 @@ package cc.sukazyo.cono.morny.daemon -import cc.sukazyo.cono.morny.Log.{exceptionLog, logger} +import cc.sukazyo.cono.morny.Log.logger import cc.sukazyo.cono.morny.MornyCoeur import cc.sukazyo.cono.morny.daemon.MedicationTimer.calcNextRoutineTimestamp -import cc.sukazyo.cono.morny.internal.schedule.RoutineTask +import cc.sukazyo.cono.morny.util.schedule.RoutineTask import cc.sukazyo.cono.morny.util.tgapi.TelegramExtensions.Bot.exec import cc.sukazyo.cono.morny.util.CommonFormat import com.pengrad.telegrambot.model.{Message, MessageEntity} diff --git a/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/Scheduler.scala b/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/Scheduler.scala deleted file mode 100644 index 9b71b29..0000000 --- a/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/Scheduler.scala +++ /dev/null @@ -1,119 +0,0 @@ -package cc.sukazyo.cono.morny.internal.schedule - -import scala.collection.mutable - -class Scheduler { - -// val taskList: util.TreeSet[Task] = -// Collections.synchronizedSortedSet(util.TreeSet[Task]()) - private val taskList: mutable.TreeSet[Task] = mutable.TreeSet.empty - private var exitAtNextRoutine = false - private var waitForDone = false - private var currentRunning: Task|Null = _ - private var currentRunning_isScheduledCancel = false - private val runtime: Thread = new Thread { - - override def run (): Unit = { - def willExit: Boolean = - if exitAtNextRoutine then true - else if waitForDone then - taskList.synchronized: - if taskList.isEmpty then true - else false - else false - while (!willExit) { - - val nextMove: Task|Long = taskList.synchronized { - taskList.headOption match - case Some(_readyToRun) if System.currentTimeMillis >= _readyToRun.scheduledTimeMillis => - taskList -= _readyToRun - currentRunning = _readyToRun - _readyToRun - case Some(_notReady) => - _notReady.scheduledTimeMillis - System.currentTimeMillis - case None => - Long.MaxValue - } - - nextMove match - case readyToRun: Task => - - this setName readyToRun.name - - try { - readyToRun.main - } catch case _: (Exception | Error) => {} - - this setName s"${readyToRun.name}#post" - - currentRunning match - case routine: RoutineTask => - routine.nextRoutineTimeMillis(routine.currentScheduledTimeMillis) match - case next: Long => - routine.currentScheduledTimeMillis = next - if (!currentRunning_isScheduledCancel) schedule(routine) - case _ => - case _ => - - this setName runnerName - currentRunning = null - - case needToWaitMillis: Long => - try Thread.sleep(needToWaitMillis) - catch case _: InterruptedException => {} - - } - } - - } - runtime.start() - - //noinspection ScalaWeakerAccess - def runnerName: String = - this.toString - - def ++ (task: Task): this.type = - schedule(task) - this - def schedule (task: Task): Boolean = - try taskList.synchronized: - taskList add task - finally runtime.interrupt() - - def % (task: Task): this.type = - cancel(task) - this - def cancel (task: Task): Boolean = - try { - val succeed = taskList.synchronized { taskList remove task } - if succeed then succeed - else if task == currentRunning then - currentRunning_isScheduledCancel = true - true - else false - } - finally runtime.interrupt() - - def amount: Int = - taskList.size - - def state: Thread.State = - runtime.getState - - def stop (): Unit = - exitAtNextRoutine = true - runtime.interrupt() - - def waitForStop (): Unit = - stop() - runtime.join() - - //noinspection ScalaWeakerAccess - def tagStopAtAllDone (): Unit = - waitForDone = true - - def waitForStopAtAllDone(): Unit = - tagStopAtAllDone() - runtime.join() - -} diff --git a/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/Task.scala b/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/Task.scala deleted file mode 100644 index ec1840f..0000000 --- a/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/Task.scala +++ /dev/null @@ -1,21 +0,0 @@ -package cc.sukazyo.cono.morny.internal.schedule - -trait Task extends Ordered[Task] { - - def name: String - def scheduledTimeMillis: Long - - //noinspection UnitMethodIsParameterless - def main: Unit - - override def compare (that: Task): Int = - if this.scheduledTimeMillis == that.scheduledTimeMillis then - this.hashCode - that.hashCode - else if this.scheduledTimeMillis > that.scheduledTimeMillis then - 1 - else -1 - - override def toString: String = - s"""${super.toString}{"$name": $scheduledTimeMillis}""" - -} diff --git a/src/main/scala/cc/sukazyo/cono/morny/util/EpochDateTime.scala b/src/main/scala/cc/sukazyo/cono/morny/util/EpochDateTime.scala index 0bc623f..81613c9 100644 --- a/src/main/scala/cc/sukazyo/cono/morny/util/EpochDateTime.scala +++ b/src/main/scala/cc/sukazyo/cono/morny/util/EpochDateTime.scala @@ -5,7 +5,12 @@ import java.time.format.DateTimeFormatter object EpochDateTime { + /** The UNIX Epoch Time in milliseconds. + * + * aka. Milliseconds since 00:00:00 UTC on Thursday, 1 January 1970. + */ type EpochMillis = Long + /** Time duration/interval in milliseconds. */ type DurationMillis = Long object EpochMillis: diff --git a/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/DelayedTask.scala b/src/main/scala/cc/sukazyo/cono/morny/util/schedule/DelayedTask.scala similarity index 88% rename from src/main/scala/cc/sukazyo/cono/morny/internal/schedule/DelayedTask.scala rename to src/main/scala/cc/sukazyo/cono/morny/util/schedule/DelayedTask.scala index 582e4f6..a8a9499 100644 --- a/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/DelayedTask.scala +++ b/src/main/scala/cc/sukazyo/cono/morny/util/schedule/DelayedTask.scala @@ -1,4 +1,4 @@ -package cc.sukazyo.cono.morny.internal.schedule +package cc.sukazyo.cono.morny.util.schedule trait DelayedTask ( val delayedMillis: Long diff --git a/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/IntervalTask.scala b/src/main/scala/cc/sukazyo/cono/morny/util/schedule/IntervalTask.scala similarity index 92% rename from src/main/scala/cc/sukazyo/cono/morny/internal/schedule/IntervalTask.scala rename to src/main/scala/cc/sukazyo/cono/morny/util/schedule/IntervalTask.scala index 5b255de..cc87240 100644 --- a/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/IntervalTask.scala +++ b/src/main/scala/cc/sukazyo/cono/morny/util/schedule/IntervalTask.scala @@ -1,4 +1,4 @@ -package cc.sukazyo.cono.morny.internal.schedule +package cc.sukazyo.cono.morny.util.schedule trait IntervalTask extends RoutineTask { diff --git a/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/IntervalWithTimesTask.scala b/src/main/scala/cc/sukazyo/cono/morny/util/schedule/IntervalWithTimesTask.scala similarity index 93% rename from src/main/scala/cc/sukazyo/cono/morny/internal/schedule/IntervalWithTimesTask.scala rename to src/main/scala/cc/sukazyo/cono/morny/util/schedule/IntervalWithTimesTask.scala index 249a95e..c2055a9 100644 --- a/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/IntervalWithTimesTask.scala +++ b/src/main/scala/cc/sukazyo/cono/morny/util/schedule/IntervalWithTimesTask.scala @@ -1,4 +1,4 @@ -package cc.sukazyo.cono.morny.internal.schedule +package cc.sukazyo.cono.morny.util.schedule trait IntervalWithTimesTask extends IntervalTask { diff --git a/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/RoutineTask.scala b/src/main/scala/cc/sukazyo/cono/morny/util/schedule/RoutineTask.scala similarity index 86% rename from src/main/scala/cc/sukazyo/cono/morny/internal/schedule/RoutineTask.scala rename to src/main/scala/cc/sukazyo/cono/morny/util/schedule/RoutineTask.scala index 40f47ef..09ade60 100644 --- a/src/main/scala/cc/sukazyo/cono/morny/internal/schedule/RoutineTask.scala +++ b/src/main/scala/cc/sukazyo/cono/morny/util/schedule/RoutineTask.scala @@ -1,4 +1,4 @@ -package cc.sukazyo.cono.morny.internal.schedule +package cc.sukazyo.cono.morny.util.schedule trait RoutineTask extends Task { diff --git a/src/main/scala/cc/sukazyo/cono/morny/util/schedule/Scheduler.scala b/src/main/scala/cc/sukazyo/cono/morny/util/schedule/Scheduler.scala new file mode 100644 index 0000000..190f9f7 --- /dev/null +++ b/src/main/scala/cc/sukazyo/cono/morny/util/schedule/Scheduler.scala @@ -0,0 +1,269 @@ +package cc.sukazyo.cono.morny.util.schedule + +import scala.annotation.targetName +import scala.collection.mutable + +/** Stores some [[Task tasks]] and execute them at time defined in task. + * + * == Usage == + * + * Start a new scheduler instance by create a new Scheduler object, and + * the scheduler runner will automatic start to run. + * + * Using [[Scheduler.++]] or [[Scheduler.schedule]] to add a [[Task]] to + * a Scheduler instance. + * + * If you want to remove a task, use [[Scheduler.%]] or [[Scheduler.cancel]]. + * Removal task should be the same task object, but not just the same name. + * + * The scheduler will not automatic stop when the tasks is all done and the + * main thread is stopped. You can/should use [[stop]], [[waitForStop]], + * [[tagStopAtAllDone]], [[waitForStopAtAllDone]] to async or syncing stop + * the scheduler. + * + * == Implementation details == + * + * Inside the Scheduler, the runner's implementation is very similar to + * java's [[java.util.Timer]]: There's a task queue sorted by [[Task.scheduledTimeMillis]] + * (which is the default order method implemented in [[Task]]), and a + * runner getting the most previous task in the queue, and sleep to that + * task's execution time. + * + * Every time the runner is executing a task, it will firstly set its thread name + * to [[Task.name]]. After running a task, if the task have some post-process + * method (like [[RoutineTask]] will do prepare for next routine), the runner's + * thread name will be set to [[Task.name]]#post. After all of + * that, the task is fully complete, and the runner's thread name will be + * reset to [[runnerName]]. + */ +class Scheduler { + + /** Status tag of this scheduler. */ + //noinspection ScalaWeakerAccess + enum State: + /** The scheduler is on init stage, have not prepared for running tasks. */ + case INIT + /** The scheduler is managing the task queue, processing the exit signal, + * and looking for the next running task. */ + case PREPARE_RUN + /** The scheduler is infinitely waiting due to there's nothing in the task + * queue. */ + case WAITING_EMPTY + /** The scheduler is waiting until the next task's running time. */ + case WAITING + /** The scheduler is running a task in the runner. */ + case RUNNING + /** The scheduler is executing a task's post effect. */ + case RUNNING_POST + /** The scheduler have been stopped, will not process any more tasks. */ + case END + + private val taskList: mutable.TreeSet[Task] = mutable.TreeSet.empty + private var exitAtNextRoutine = false + private var waitForDone = false + private var currentRunning: Task|Null = _ + private var currentRunning_isScheduledCancel = false + private var runtimeStatus = State.INIT + private val runtime: Thread = new Thread { + + override def run (): Unit = { + def willExit: Boolean = + if exitAtNextRoutine then true + else if waitForDone then + taskList.synchronized: + if taskList.isEmpty then true + else false + else false + while (!willExit) { + + runtimeStatus = State.PREPARE_RUN + + val nextMove: Task|Long|"None" = taskList.synchronized { + taskList.headOption match + case Some(_readyToRun) if System.currentTimeMillis >= _readyToRun.scheduledTimeMillis => + taskList -= _readyToRun + currentRunning = _readyToRun + _readyToRun + case Some(_notReady) => + _notReady.scheduledTimeMillis - System.currentTimeMillis + case None => "None" + } + + nextMove match + case readyToRun: Task => + + runtimeStatus = State.RUNNING + this setName readyToRun.name + + try { + readyToRun.main + } catch case _: (Exception | Error) => {} + + runtimeStatus = State.RUNNING_POST + this setName s"${readyToRun.name}#post" + + if currentRunning_isScheduledCancel then {} + else { + currentRunning match + case routine: RoutineTask => + routine.nextRoutineTimeMillis(routine.currentScheduledTimeMillis) match + case next: Long => + routine.currentScheduledTimeMillis = next + if (!currentRunning_isScheduledCancel) schedule(routine) + case _ => + case _ => + } + + currentRunning = null + this setName runnerName + + case needToWaitMillis: Long => + runtimeStatus = State.WAITING + try Thread.sleep(needToWaitMillis) + catch case _: InterruptedException => {} + case _: "None" => + runtimeStatus = State.WAITING_EMPTY + try Thread.sleep(Long.MaxValue) + catch case _: InterruptedException => {} + + } + runtimeStatus = State.END + } + + } + runtime setName runnerName + runtime.start() + + /** Name of the scheduler runner. + * Currently, same with the scheduler [[toString]] + */ + //noinspection ScalaWeakerAccess + def runnerName: String = + this.toString + + /** Add one task to scheduler task queue. + * @return this scheduler for chained call. + */ + @targetName("scheduleIt") + def ++ (task: Task): this.type = + schedule(task) + this + /** Add one task to scheduler task queue. + * @return [[true]] if the task is added. + */ + def schedule (task: Task): Boolean = + try taskList.synchronized: + taskList add task + finally runtime.interrupt() + + /** Remove the task from scheduler task queue. + * + * If the removal task is running, the current run will be done, but will + * not do the post effect of the task (like schedule the next routine + * of [[RoutineTask]]). + * + * @return this scheduler for chained call. + */ + @targetName("cancelIt") + def % (task: Task): this.type = + cancel(task) + this + /** Remove the task from scheduler task queue. + * + * If the removal task is running, the current run will be done, but will + * not do the post effect of the task (like schedule the next routine + * of [[RoutineTask]]). + * + * @return [[true]] if the task is in task queue or is running, and have been + * succeed removed from task queue. + */ + def cancel (task: Task): Boolean = + try { + val succeed = taskList.synchronized { taskList remove task } + if succeed then succeed + else if task == currentRunning then + currentRunning_isScheduledCancel = true + true + else false + } + finally runtime.interrupt() + + /** Count of tasks in the task queue. + * + * Do not contains the running task. + */ + def amount: Int = + taskList.size + + /** Current [[State status]] */ + def state: this.State = + runtimeStatus + + /** This scheduler's runner thread state */ + def runnerState: Thread.State = + runtime.getState + + /** Stop the scheduler's runner, no matter how much task is not run yet. + * + * After call this, it will immediately give a signal to the runner for + * stopping it. If the runner is not running any task, it will stop immediately; + * If there's one task running, the runner will continue executing until + * the current task is done and the current task's post effect is done, then + * stop. + * + * This method is async, means complete this method does not means the + * runner is stopped. If you want a sync version, see [[waitForStop]]. + */ + def stop (): Unit = + exitAtNextRoutine = true + runtime.interrupt() + + /** Stop the scheduler's runner, no matter how much task is not run yet, + * and wait for the runner stopped. + * + * It do the same job with [[stop]], the only different is this method + * will join the runner thread to wait it stopped. + * + * @throws InterruptedException if any thread has interrupted the current + * thread. The interrupted status of the current + * thread is cleared when this exception is thrown. + */ + @throws[InterruptedException] + def waitForStop (): Unit = + stop() + runtime.join() + + /** Tag this scheduler runner stop when all of the scheduler's task in task + * queue have been stopped. + * + * After called this method, the runner will exit when all tasks executed done + * and there's no more task can be found in task queue. + * + * Notice that if there's [[RoutineTask]] in task queue, due to the routine + * task will re-enter the task queue in task's post effect stage after executed, + * it will cause the task queue will never be empty. You may need to remove all + * routine tasks before calling this. + * + * This method is async, means complete this method does not means the + * runner is stopped. If you want a sync version, see [[waitForStopAtAllDone]]. + */ + //noinspection ScalaWeakerAccess + def tagStopAtAllDone (): Unit = + waitForDone = true + runtime.interrupt() + + /** Tag this scheduler runner stop when all of the scheduler's task in task + * queue have been stopped, and wait for the runner stopped. + * + * It do the same job with [[tagStopAtAllDone]], the only different is this method + * will join the runner thread to wait it stopped. + * + * @throws InterruptedException if any thread has interrupted the current + * thread. The interrupted status of the current + * thread is cleared when this exception is thrown. + */ + def waitForStopAtAllDone(): Unit = + tagStopAtAllDone() + runtime.join() + +} diff --git a/src/main/scala/cc/sukazyo/cono/morny/util/schedule/Task.scala b/src/main/scala/cc/sukazyo/cono/morny/util/schedule/Task.scala new file mode 100644 index 0000000..2dc4503 --- /dev/null +++ b/src/main/scala/cc/sukazyo/cono/morny/util/schedule/Task.scala @@ -0,0 +1,57 @@ +package cc.sukazyo.cono.morny.util.schedule + +import cc.sukazyo.cono.morny.util.EpochDateTime.EpochMillis + +/** A schedule task that can be added to [[Scheduler]]. + * + * Contains some basic task information: [[name]], [[scheduledTimeMillis]], + * and [[main]] as the method which will be called. + * + * Tasks are ordered by time, and makes sure that two different task instance + * is NOT THE SAME. + *
+ * When comparing two tasks, it will firstly compare the [[scheduledTimeMillis]]: + * If the result is the not the same, return it; If the result is the same, then + * using [[Object]]'s compare method to compare it. + *
+ */ +trait Task extends Ordered[Task] { + + /** Task name. Also the executor thread name when task is executing. + * + * Will be used in [[Scheduler]] to change the running thread's name. + */ + def name: String + /** Next running time. + * + * If it is smaller than current time, the task should be executed immediately. + */ + def scheduledTimeMillis: EpochMillis + + //noinspection UnitMethodIsParameterless + def main: Unit + + override def compare (that: Task): Int = + scheduledTimeMillis.compareTo(that.scheduledTimeMillis) match + case 0 => this.hashCode - that.hashCode + case n => n + + /** Returns this task's object name and the task name. + * + * for example: + * {{{ + * scala> val task = new Task { + * val name = "example-task" + * val scheduledTimeMillis = 0 + * def main = println("example") + * } + * val task: cc.sukazyo.cono.morny.util.schedule.Task = anon$1@26d8908e{"example-task": 0} + * + * scala> task.toString + * val res0: String = anon$1@26d8908e{"example-task": 0} + * }}} + */ + override def toString: String = + s"""${super.toString}{"$name": $scheduledTimeMillis}""" + +}