mirror of
https://github.com/Eyre-S/Coeur-Morny-Cono.git
synced 2025-01-18 23:12:23 +08:00
add some scaladoc for scheduler
This commit is contained in:
parent
b57d87dece
commit
f0d4471646
@ -7,7 +7,7 @@ import cc.sukazyo.cono.morny.MornyCoeur.THREAD_SERVER_EXIT
|
||||
import cc.sukazyo.cono.morny.bot.api.EventListenerManager
|
||||
import cc.sukazyo.cono.morny.bot.event.{MornyEventListeners, MornyOnInlineQuery, MornyOnTelegramCommand, MornyOnUpdateTimestampOffsetLock}
|
||||
import cc.sukazyo.cono.morny.bot.query.MornyQueries
|
||||
import cc.sukazyo.cono.morny.internal.schedule.Scheduler
|
||||
import cc.sukazyo.cono.morny.util.schedule.Scheduler
|
||||
import com.pengrad.telegrambot.TelegramBot
|
||||
import com.pengrad.telegrambot.request.GetMe
|
||||
|
||||
|
@ -153,7 +153,8 @@ class MornyInformation (using coeur: MornyCoeur) extends ITelegramCommand {
|
||||
// language=html
|
||||
s"""<b>Coeur Task Scheduler:</b>
|
||||
| - <i>scheduled tasks</i>: <code>${coeur.tasks.amount}</code>
|
||||
| - <i>current runner status</i>: <code>${coeur.tasks.state}</code>
|
||||
| - <i>scheduler status</i>: <code>${coeur.tasks.state}</code>
|
||||
| - <i>current runner status</i>: <code>${coeur.tasks.runnerState}</code>
|
||||
|""".stripMargin
|
||||
).parseMode(ParseMode.HTML).replyToMessageId(update.message.messageId)
|
||||
}
|
||||
|
@ -1,9 +1,9 @@
|
||||
package cc.sukazyo.cono.morny.daemon
|
||||
|
||||
import cc.sukazyo.cono.morny.Log.{exceptionLog, logger}
|
||||
import cc.sukazyo.cono.morny.Log.logger
|
||||
import cc.sukazyo.cono.morny.MornyCoeur
|
||||
import cc.sukazyo.cono.morny.daemon.MedicationTimer.calcNextRoutineTimestamp
|
||||
import cc.sukazyo.cono.morny.internal.schedule.RoutineTask
|
||||
import cc.sukazyo.cono.morny.util.schedule.RoutineTask
|
||||
import cc.sukazyo.cono.morny.util.tgapi.TelegramExtensions.Bot.exec
|
||||
import cc.sukazyo.cono.morny.util.CommonFormat
|
||||
import com.pengrad.telegrambot.model.{Message, MessageEntity}
|
||||
|
@ -1,119 +0,0 @@
|
||||
package cc.sukazyo.cono.morny.internal.schedule
|
||||
|
||||
import scala.collection.mutable
|
||||
|
||||
class Scheduler {
|
||||
|
||||
// val taskList: util.TreeSet[Task] =
|
||||
// Collections.synchronizedSortedSet(util.TreeSet[Task]())
|
||||
private val taskList: mutable.TreeSet[Task] = mutable.TreeSet.empty
|
||||
private var exitAtNextRoutine = false
|
||||
private var waitForDone = false
|
||||
private var currentRunning: Task|Null = _
|
||||
private var currentRunning_isScheduledCancel = false
|
||||
private val runtime: Thread = new Thread {
|
||||
|
||||
override def run (): Unit = {
|
||||
def willExit: Boolean =
|
||||
if exitAtNextRoutine then true
|
||||
else if waitForDone then
|
||||
taskList.synchronized:
|
||||
if taskList.isEmpty then true
|
||||
else false
|
||||
else false
|
||||
while (!willExit) {
|
||||
|
||||
val nextMove: Task|Long = taskList.synchronized {
|
||||
taskList.headOption match
|
||||
case Some(_readyToRun) if System.currentTimeMillis >= _readyToRun.scheduledTimeMillis =>
|
||||
taskList -= _readyToRun
|
||||
currentRunning = _readyToRun
|
||||
_readyToRun
|
||||
case Some(_notReady) =>
|
||||
_notReady.scheduledTimeMillis - System.currentTimeMillis
|
||||
case None =>
|
||||
Long.MaxValue
|
||||
}
|
||||
|
||||
nextMove match
|
||||
case readyToRun: Task =>
|
||||
|
||||
this setName readyToRun.name
|
||||
|
||||
try {
|
||||
readyToRun.main
|
||||
} catch case _: (Exception | Error) => {}
|
||||
|
||||
this setName s"${readyToRun.name}#post"
|
||||
|
||||
currentRunning match
|
||||
case routine: RoutineTask =>
|
||||
routine.nextRoutineTimeMillis(routine.currentScheduledTimeMillis) match
|
||||
case next: Long =>
|
||||
routine.currentScheduledTimeMillis = next
|
||||
if (!currentRunning_isScheduledCancel) schedule(routine)
|
||||
case _ =>
|
||||
case _ =>
|
||||
|
||||
this setName runnerName
|
||||
currentRunning = null
|
||||
|
||||
case needToWaitMillis: Long =>
|
||||
try Thread.sleep(needToWaitMillis)
|
||||
catch case _: InterruptedException => {}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
runtime.start()
|
||||
|
||||
//noinspection ScalaWeakerAccess
|
||||
def runnerName: String =
|
||||
this.toString
|
||||
|
||||
def ++ (task: Task): this.type =
|
||||
schedule(task)
|
||||
this
|
||||
def schedule (task: Task): Boolean =
|
||||
try taskList.synchronized:
|
||||
taskList add task
|
||||
finally runtime.interrupt()
|
||||
|
||||
def % (task: Task): this.type =
|
||||
cancel(task)
|
||||
this
|
||||
def cancel (task: Task): Boolean =
|
||||
try {
|
||||
val succeed = taskList.synchronized { taskList remove task }
|
||||
if succeed then succeed
|
||||
else if task == currentRunning then
|
||||
currentRunning_isScheduledCancel = true
|
||||
true
|
||||
else false
|
||||
}
|
||||
finally runtime.interrupt()
|
||||
|
||||
def amount: Int =
|
||||
taskList.size
|
||||
|
||||
def state: Thread.State =
|
||||
runtime.getState
|
||||
|
||||
def stop (): Unit =
|
||||
exitAtNextRoutine = true
|
||||
runtime.interrupt()
|
||||
|
||||
def waitForStop (): Unit =
|
||||
stop()
|
||||
runtime.join()
|
||||
|
||||
//noinspection ScalaWeakerAccess
|
||||
def tagStopAtAllDone (): Unit =
|
||||
waitForDone = true
|
||||
|
||||
def waitForStopAtAllDone(): Unit =
|
||||
tagStopAtAllDone()
|
||||
runtime.join()
|
||||
|
||||
}
|
@ -1,21 +0,0 @@
|
||||
package cc.sukazyo.cono.morny.internal.schedule
|
||||
|
||||
trait Task extends Ordered[Task] {
|
||||
|
||||
def name: String
|
||||
def scheduledTimeMillis: Long
|
||||
|
||||
//noinspection UnitMethodIsParameterless
|
||||
def main: Unit
|
||||
|
||||
override def compare (that: Task): Int =
|
||||
if this.scheduledTimeMillis == that.scheduledTimeMillis then
|
||||
this.hashCode - that.hashCode
|
||||
else if this.scheduledTimeMillis > that.scheduledTimeMillis then
|
||||
1
|
||||
else -1
|
||||
|
||||
override def toString: String =
|
||||
s"""${super.toString}{"$name": $scheduledTimeMillis}"""
|
||||
|
||||
}
|
@ -5,7 +5,12 @@ import java.time.format.DateTimeFormatter
|
||||
|
||||
object EpochDateTime {
|
||||
|
||||
/** The UNIX Epoch Time in milliseconds.
|
||||
*
|
||||
* aka. Milliseconds since 00:00:00 UTC on Thursday, 1 January 1970.
|
||||
*/
|
||||
type EpochMillis = Long
|
||||
/** Time duration/interval in milliseconds. */
|
||||
type DurationMillis = Long
|
||||
|
||||
object EpochMillis:
|
||||
|
@ -1,4 +1,4 @@
|
||||
package cc.sukazyo.cono.morny.internal.schedule
|
||||
package cc.sukazyo.cono.morny.util.schedule
|
||||
|
||||
trait DelayedTask (
|
||||
val delayedMillis: Long
|
@ -1,4 +1,4 @@
|
||||
package cc.sukazyo.cono.morny.internal.schedule
|
||||
package cc.sukazyo.cono.morny.util.schedule
|
||||
|
||||
trait IntervalTask extends RoutineTask {
|
||||
|
@ -1,4 +1,4 @@
|
||||
package cc.sukazyo.cono.morny.internal.schedule
|
||||
package cc.sukazyo.cono.morny.util.schedule
|
||||
|
||||
trait IntervalWithTimesTask extends IntervalTask {
|
||||
|
@ -1,4 +1,4 @@
|
||||
package cc.sukazyo.cono.morny.internal.schedule
|
||||
package cc.sukazyo.cono.morny.util.schedule
|
||||
|
||||
trait RoutineTask extends Task {
|
||||
|
@ -0,0 +1,269 @@
|
||||
package cc.sukazyo.cono.morny.util.schedule
|
||||
|
||||
import scala.annotation.targetName
|
||||
import scala.collection.mutable
|
||||
|
||||
/** Stores some [[Task tasks]] and execute them at time defined in task.
|
||||
*
|
||||
* == Usage ==
|
||||
*
|
||||
* Start a new scheduler instance by create a new Scheduler object, and
|
||||
* the scheduler runner will automatic start to run.
|
||||
*
|
||||
* Using [[Scheduler.++]] or [[Scheduler.schedule]] to add a [[Task]] to
|
||||
* a Scheduler instance.
|
||||
*
|
||||
* If you want to remove a task, use [[Scheduler.%]] or [[Scheduler.cancel]].
|
||||
* Removal task should be the same task object, but not just the same name.
|
||||
*
|
||||
* The scheduler will not automatic stop when the tasks is all done and the
|
||||
* main thread is stopped. You can/should use [[stop]], [[waitForStop]],
|
||||
* [[tagStopAtAllDone]], [[waitForStopAtAllDone]] to async or syncing stop
|
||||
* the scheduler.
|
||||
*
|
||||
* == Implementation details ==
|
||||
*
|
||||
* Inside the Scheduler, the runner's implementation is very similar to
|
||||
* java's [[java.util.Timer]]: There's a task queue sorted by [[Task.scheduledTimeMillis]]
|
||||
* (which is the default order method implemented in [[Task]]), and a
|
||||
* runner getting the most previous task in the queue, and sleep to that
|
||||
* task's execution time.
|
||||
*
|
||||
* Every time the runner is executing a task, it will firstly set its thread name
|
||||
* to [[Task.name]]. After running a task, if the task have some post-process
|
||||
* method (like [[RoutineTask]] will do prepare for next routine), the runner's
|
||||
* thread name will be set to <code>[[Task.name]]#post</code>. After all of
|
||||
* that, the task is fully complete, and the runner's thread name will be
|
||||
* reset to [[runnerName]].
|
||||
*/
|
||||
class Scheduler {
|
||||
|
||||
/** Status tag of this scheduler. */
|
||||
//noinspection ScalaWeakerAccess
|
||||
enum State:
|
||||
/** The scheduler is on init stage, have not prepared for running tasks. */
|
||||
case INIT
|
||||
/** The scheduler is managing the task queue, processing the exit signal,
|
||||
* and looking for the next running task. */
|
||||
case PREPARE_RUN
|
||||
/** The scheduler is infinitely waiting due to there's nothing in the task
|
||||
* queue. */
|
||||
case WAITING_EMPTY
|
||||
/** The scheduler is waiting until the next task's running time. */
|
||||
case WAITING
|
||||
/** The scheduler is running a task in the runner. */
|
||||
case RUNNING
|
||||
/** The scheduler is executing a task's post effect. */
|
||||
case RUNNING_POST
|
||||
/** The scheduler have been stopped, will not process any more tasks. */
|
||||
case END
|
||||
|
||||
private val taskList: mutable.TreeSet[Task] = mutable.TreeSet.empty
|
||||
private var exitAtNextRoutine = false
|
||||
private var waitForDone = false
|
||||
private var currentRunning: Task|Null = _
|
||||
private var currentRunning_isScheduledCancel = false
|
||||
private var runtimeStatus = State.INIT
|
||||
private val runtime: Thread = new Thread {
|
||||
|
||||
override def run (): Unit = {
|
||||
def willExit: Boolean =
|
||||
if exitAtNextRoutine then true
|
||||
else if waitForDone then
|
||||
taskList.synchronized:
|
||||
if taskList.isEmpty then true
|
||||
else false
|
||||
else false
|
||||
while (!willExit) {
|
||||
|
||||
runtimeStatus = State.PREPARE_RUN
|
||||
|
||||
val nextMove: Task|Long|"None" = taskList.synchronized {
|
||||
taskList.headOption match
|
||||
case Some(_readyToRun) if System.currentTimeMillis >= _readyToRun.scheduledTimeMillis =>
|
||||
taskList -= _readyToRun
|
||||
currentRunning = _readyToRun
|
||||
_readyToRun
|
||||
case Some(_notReady) =>
|
||||
_notReady.scheduledTimeMillis - System.currentTimeMillis
|
||||
case None => "None"
|
||||
}
|
||||
|
||||
nextMove match
|
||||
case readyToRun: Task =>
|
||||
|
||||
runtimeStatus = State.RUNNING
|
||||
this setName readyToRun.name
|
||||
|
||||
try {
|
||||
readyToRun.main
|
||||
} catch case _: (Exception | Error) => {}
|
||||
|
||||
runtimeStatus = State.RUNNING_POST
|
||||
this setName s"${readyToRun.name}#post"
|
||||
|
||||
if currentRunning_isScheduledCancel then {}
|
||||
else {
|
||||
currentRunning match
|
||||
case routine: RoutineTask =>
|
||||
routine.nextRoutineTimeMillis(routine.currentScheduledTimeMillis) match
|
||||
case next: Long =>
|
||||
routine.currentScheduledTimeMillis = next
|
||||
if (!currentRunning_isScheduledCancel) schedule(routine)
|
||||
case _ =>
|
||||
case _ =>
|
||||
}
|
||||
|
||||
currentRunning = null
|
||||
this setName runnerName
|
||||
|
||||
case needToWaitMillis: Long =>
|
||||
runtimeStatus = State.WAITING
|
||||
try Thread.sleep(needToWaitMillis)
|
||||
catch case _: InterruptedException => {}
|
||||
case _: "None" =>
|
||||
runtimeStatus = State.WAITING_EMPTY
|
||||
try Thread.sleep(Long.MaxValue)
|
||||
catch case _: InterruptedException => {}
|
||||
|
||||
}
|
||||
runtimeStatus = State.END
|
||||
}
|
||||
|
||||
}
|
||||
runtime setName runnerName
|
||||
runtime.start()
|
||||
|
||||
/** Name of the scheduler runner.
|
||||
* Currently, same with the scheduler [[toString]]
|
||||
*/
|
||||
//noinspection ScalaWeakerAccess
|
||||
def runnerName: String =
|
||||
this.toString
|
||||
|
||||
/** Add one task to scheduler task queue.
|
||||
* @return this scheduler for chained call.
|
||||
*/
|
||||
@targetName("scheduleIt")
|
||||
def ++ (task: Task): this.type =
|
||||
schedule(task)
|
||||
this
|
||||
/** Add one task to scheduler task queue.
|
||||
* @return [[true]] if the task is added.
|
||||
*/
|
||||
def schedule (task: Task): Boolean =
|
||||
try taskList.synchronized:
|
||||
taskList add task
|
||||
finally runtime.interrupt()
|
||||
|
||||
/** Remove the task from scheduler task queue.
|
||||
*
|
||||
* If the removal task is running, the current run will be done, but will
|
||||
* not do the post effect of the task (like schedule the next routine
|
||||
* of [[RoutineTask]]).
|
||||
*
|
||||
* @return this scheduler for chained call.
|
||||
*/
|
||||
@targetName("cancelIt")
|
||||
def % (task: Task): this.type =
|
||||
cancel(task)
|
||||
this
|
||||
/** Remove the task from scheduler task queue.
|
||||
*
|
||||
* If the removal task is running, the current run will be done, but will
|
||||
* not do the post effect of the task (like schedule the next routine
|
||||
* of [[RoutineTask]]).
|
||||
*
|
||||
* @return [[true]] if the task is in task queue or is running, and have been
|
||||
* succeed removed from task queue.
|
||||
*/
|
||||
def cancel (task: Task): Boolean =
|
||||
try {
|
||||
val succeed = taskList.synchronized { taskList remove task }
|
||||
if succeed then succeed
|
||||
else if task == currentRunning then
|
||||
currentRunning_isScheduledCancel = true
|
||||
true
|
||||
else false
|
||||
}
|
||||
finally runtime.interrupt()
|
||||
|
||||
/** Count of tasks in the task queue.
|
||||
*
|
||||
* Do not contains the running task.
|
||||
*/
|
||||
def amount: Int =
|
||||
taskList.size
|
||||
|
||||
/** Current [[State status]] */
|
||||
def state: this.State =
|
||||
runtimeStatus
|
||||
|
||||
/** This scheduler's runner thread state */
|
||||
def runnerState: Thread.State =
|
||||
runtime.getState
|
||||
|
||||
/** Stop the scheduler's runner, no matter how much task is not run yet.
|
||||
*
|
||||
* After call this, it will immediately give a signal to the runner for
|
||||
* stopping it. If the runner is not running any task, it will stop immediately;
|
||||
* If there's one task running, the runner will continue executing until
|
||||
* the current task is done and the current task's post effect is done, then
|
||||
* stop.
|
||||
*
|
||||
* This method is async, means complete this method does not means the
|
||||
* runner is stopped. If you want a sync version, see [[waitForStop]].
|
||||
*/
|
||||
def stop (): Unit =
|
||||
exitAtNextRoutine = true
|
||||
runtime.interrupt()
|
||||
|
||||
/** Stop the scheduler's runner, no matter how much task is not run yet,
|
||||
* and wait for the runner stopped.
|
||||
*
|
||||
* It do the same job with [[stop]], the only different is this method
|
||||
* will join the runner thread to wait it stopped.
|
||||
*
|
||||
* @throws InterruptedException if any thread has interrupted the current
|
||||
* thread. The interrupted status of the current
|
||||
* thread is cleared when this exception is thrown.
|
||||
*/
|
||||
@throws[InterruptedException]
|
||||
def waitForStop (): Unit =
|
||||
stop()
|
||||
runtime.join()
|
||||
|
||||
/** Tag this scheduler runner stop when all of the scheduler's task in task
|
||||
* queue have been stopped.
|
||||
*
|
||||
* After called this method, the runner will exit when all tasks executed done
|
||||
* and there's no more task can be found in task queue.
|
||||
*
|
||||
* Notice that if there's [[RoutineTask]] in task queue, due to the routine
|
||||
* task will re-enter the task queue in task's post effect stage after executed,
|
||||
* it will cause the task queue will never be empty. You may need to remove all
|
||||
* routine tasks before calling this.
|
||||
*
|
||||
* This method is async, means complete this method does not means the
|
||||
* runner is stopped. If you want a sync version, see [[waitForStopAtAllDone]].
|
||||
*/
|
||||
//noinspection ScalaWeakerAccess
|
||||
def tagStopAtAllDone (): Unit =
|
||||
waitForDone = true
|
||||
runtime.interrupt()
|
||||
|
||||
/** Tag this scheduler runner stop when all of the scheduler's task in task
|
||||
* queue have been stopped, and wait for the runner stopped.
|
||||
*
|
||||
* It do the same job with [[tagStopAtAllDone]], the only different is this method
|
||||
* will join the runner thread to wait it stopped.
|
||||
*
|
||||
* @throws InterruptedException if any thread has interrupted the current
|
||||
* thread. The interrupted status of the current
|
||||
* thread is cleared when this exception is thrown.
|
||||
*/
|
||||
def waitForStopAtAllDone(): Unit =
|
||||
tagStopAtAllDone()
|
||||
runtime.join()
|
||||
|
||||
}
|
@ -0,0 +1,57 @@
|
||||
package cc.sukazyo.cono.morny.util.schedule
|
||||
|
||||
import cc.sukazyo.cono.morny.util.EpochDateTime.EpochMillis
|
||||
|
||||
/** A schedule task that can be added to [[Scheduler]].
|
||||
*
|
||||
* Contains some basic task information: [[name]], [[scheduledTimeMillis]],
|
||||
* and [[main]] as the method which will be called.
|
||||
*
|
||||
* Tasks are ordered by time, and makes sure that two different task instance
|
||||
* is NOT THE SAME.
|
||||
* <blockquote>
|
||||
* When comparing two tasks, it will firstly compare the [[scheduledTimeMillis]]:
|
||||
* If the result is the not the same, return it; If the result is the same, then
|
||||
* using [[Object]]'s compare method to compare it.
|
||||
* </blockquote>
|
||||
*/
|
||||
trait Task extends Ordered[Task] {
|
||||
|
||||
/** Task name. Also the executor thread name when task is executing.
|
||||
*
|
||||
* Will be used in [[Scheduler]] to change the running thread's name.
|
||||
*/
|
||||
def name: String
|
||||
/** Next running time.
|
||||
*
|
||||
* If it is smaller than current time, the task should be executed immediately.
|
||||
*/
|
||||
def scheduledTimeMillis: EpochMillis
|
||||
|
||||
//noinspection UnitMethodIsParameterless
|
||||
def main: Unit
|
||||
|
||||
override def compare (that: Task): Int =
|
||||
scheduledTimeMillis.compareTo(that.scheduledTimeMillis) match
|
||||
case 0 => this.hashCode - that.hashCode
|
||||
case n => n
|
||||
|
||||
/** Returns this task's object name and the task name.
|
||||
*
|
||||
* for example:
|
||||
* {{{
|
||||
* scala> val task = new Task {
|
||||
* val name = "example-task"
|
||||
* val scheduledTimeMillis = 0
|
||||
* def main = println("example")
|
||||
* }
|
||||
* val task: cc.sukazyo.cono.morny.util.schedule.Task = anon$1@26d8908e{"example-task": 0}
|
||||
*
|
||||
* scala> task.toString
|
||||
* val res0: String = anon$1@26d8908e{"example-task": 0}
|
||||
* }}}
|
||||
*/
|
||||
override def toString: String =
|
||||
s"""${super.toString}{"$name": $scheduledTimeMillis}"""
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user