add basic Scheduler and interval tasks

- MedicationTimer refactor using new scheduler
- add `/info tasks` for monitoring morny global tasks
This commit is contained in:
A.C.Sukazyo Eyre 2023-11-03 20:23:32 +08:00
parent adb91a06d5
commit b57d87dece
Signed by: Eyre_S
GPG Key ID: C17CE40291207874
12 changed files with 277 additions and 40 deletions

View File

@ -5,7 +5,7 @@ MORNY_ARCHIVE_NAME = morny-coeur
MORNY_CODE_STORE = https://github.com/Eyre-S/Coeur-Morny-Cono MORNY_CODE_STORE = https://github.com/Eyre-S/Coeur-Morny-Cono
MORNY_COMMIT_PATH = https://github.com/Eyre-S/Coeur-Morny-Cono/commit/%s MORNY_COMMIT_PATH = https://github.com/Eyre-S/Coeur-Morny-Cono/commit/%s
VERSION = 1.2.1 VERSION = 1.3.0-feat/scheduler
USE_DELTA = false USE_DELTA = false
VERSION_DELTA = VERSION_DELTA =

View File

@ -7,6 +7,7 @@ import cc.sukazyo.cono.morny.MornyCoeur.THREAD_SERVER_EXIT
import cc.sukazyo.cono.morny.bot.api.EventListenerManager import cc.sukazyo.cono.morny.bot.api.EventListenerManager
import cc.sukazyo.cono.morny.bot.event.{MornyEventListeners, MornyOnInlineQuery, MornyOnTelegramCommand, MornyOnUpdateTimestampOffsetLock} import cc.sukazyo.cono.morny.bot.event.{MornyEventListeners, MornyOnInlineQuery, MornyOnTelegramCommand, MornyOnUpdateTimestampOffsetLock}
import cc.sukazyo.cono.morny.bot.query.MornyQueries import cc.sukazyo.cono.morny.bot.query.MornyQueries
import cc.sukazyo.cono.morny.internal.schedule.Scheduler
import com.pengrad.telegrambot.TelegramBot import com.pengrad.telegrambot.TelegramBot
import com.pengrad.telegrambot.request.GetMe import com.pengrad.telegrambot.request.GetMe
@ -64,6 +65,8 @@ class MornyCoeur (using val config: MornyConfig) {
/** current Morny's [[MornyTrusted]] instance */ /** current Morny's [[MornyTrusted]] instance */
val trusted: MornyTrusted = MornyTrusted() val trusted: MornyTrusted = MornyTrusted()
/** Morny's task [[Scheduler]] */
val tasks: Scheduler = Scheduler()
val daemons: MornyDaemons = MornyDaemons() val daemons: MornyDaemons = MornyDaemons()
//noinspection ScalaWeakerAccess //noinspection ScalaWeakerAccess
@ -101,6 +104,8 @@ class MornyCoeur (using val config: MornyConfig) {
account.shutdown() account.shutdown()
logger info "stopped bot account" logger info "stopped bot account"
daemons.stop() daemons.stop()
tasks.waitForStop()
logger info s"morny tasks stopped: remains ${tasks.amount} tasks not be executed"
if config.commandLogoutClear then if config.commandLogoutClear then
commands.automaticTGListRemove() commands.automaticTGListRemove()
logger info "done exit cleanup" logger info "done exit cleanup"
@ -160,5 +165,5 @@ class MornyCoeur (using val config: MornyConfig) {
} }
} }
} }

View File

@ -22,11 +22,12 @@ class MornyInformation (using coeur: MornyCoeur) extends ITelegramCommand {
val RUNTIME = "runtime" val RUNTIME = "runtime"
val VERSION = "version" val VERSION = "version"
val VERSION_2 = "v" val VERSION_2 = "v"
val TASKS = "tasks"
} }
override val name: String = "info" override val name: String = "info"
override val aliases: Array[ICommandAlias]|Null = null override val aliases: Array[ICommandAlias]|Null = null
override val paramRule: String = "[(version|runtime|stickers[.IDs])]" override val paramRule: String = "[(version|runtime|stickers[.IDs]|tasks)]"
override val description: String = "输出当前 Morny 的各种信息" override val description: String = "输出当前 Morny 的各种信息"
override def execute (using command: InputCommand, event: Update): Unit = { override def execute (using command: InputCommand, event: Update): Unit = {
@ -42,6 +43,7 @@ class MornyInformation (using coeur: MornyCoeur) extends ITelegramCommand {
case s if s startsWith Subs.STICKERS => echoStickers case s if s startsWith Subs.STICKERS => echoStickers
case Subs.RUNTIME => echoRuntime case Subs.RUNTIME => echoRuntime
case Subs.VERSION | Subs.VERSION_2 => echoVersion case Subs.VERSION | Subs.VERSION_2 => echoVersion
case Subs.TASKS => echoTasksStatus
case _ => echo404 case _ => echo404
} }
@ -144,6 +146,18 @@ class MornyInformation (using coeur: MornyCoeur) extends ITelegramCommand {
).parseMode(ParseMode HTML).replyToMessageId(event.message.messageId) ).parseMode(ParseMode HTML).replyToMessageId(event.message.messageId)
} }
private def echoTasksStatus (using update: Update): Unit = {
// if !coeur.trusted.isTrusted(update.message.from.id) then return;
coeur.account exec SendMessage(
update.message.chat.id,
// language=html
s"""<b>Coeur Task Scheduler:</b>
| - <i>scheduled tasks</i>: <code>${coeur.tasks.amount}</code>
| - <i>current runner status</i>: <code>${coeur.tasks.state}</code>
|""".stripMargin
).parseMode(ParseMode.HTML).replyToMessageId(update.message.messageId)
}
private def echo404 (using event: Update): Unit = private def echo404 (using event: Update): Unit =
coeur.account exec new SendSticker( coeur.account exec new SendSticker(
event.message.chat.id, event.message.chat.id,

View File

@ -3,6 +3,7 @@ package cc.sukazyo.cono.morny.daemon
import cc.sukazyo.cono.morny.Log.{exceptionLog, logger} import cc.sukazyo.cono.morny.Log.{exceptionLog, logger}
import cc.sukazyo.cono.morny.MornyCoeur import cc.sukazyo.cono.morny.MornyCoeur
import cc.sukazyo.cono.morny.daemon.MedicationTimer.calcNextRoutineTimestamp import cc.sukazyo.cono.morny.daemon.MedicationTimer.calcNextRoutineTimestamp
import cc.sukazyo.cono.morny.internal.schedule.RoutineTask
import cc.sukazyo.cono.morny.util.tgapi.TelegramExtensions.Bot.exec import cc.sukazyo.cono.morny.util.tgapi.TelegramExtensions.Bot.exec
import cc.sukazyo.cono.morny.util.CommonFormat import cc.sukazyo.cono.morny.util.CommonFormat
import com.pengrad.telegrambot.model.{Message, MessageEntity} import com.pengrad.telegrambot.model.{Message, MessageEntity}
@ -13,7 +14,7 @@ import java.time.{LocalDateTime, ZoneOffset}
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions import scala.language.implicitConversions
class MedicationTimer (using coeur: MornyCoeur) extends Thread { class MedicationTimer (using coeur: MornyCoeur) {
private val NOTIFY_MESSAGE = "🍥⏲" private val NOTIFY_MESSAGE = "🍥⏲"
private val DAEMON_THREAD_NAME_DEF = "MedicationTimer" private val DAEMON_THREAD_NAME_DEF = "MedicationTimer"
@ -23,45 +24,41 @@ class MedicationTimer (using coeur: MornyCoeur) extends Thread {
private val notify_atHour: Set[Int] = coeur.config.medicationNotifyAt.asScala.toSet.map(_.intValue) private val notify_atHour: Set[Int] = coeur.config.medicationNotifyAt.asScala.toSet.map(_.intValue)
private val notify_toChat = coeur.config.medicationNotifyToChat private val notify_toChat = coeur.config.medicationNotifyToChat
this.setName(DAEMON_THREAD_NAME_DEF)
private var lastNotify_messageId: Option[Int] = None private var lastNotify_messageId: Option[Int] = None
override def run (): Unit = { private val scheduleTask: RoutineTask = new RoutineTask {
if ((notify_toChat == -1) || (notify_atHour isEmpty)) { override def name: String = DAEMON_THREAD_NAME_DEF
logger notice "Medication Timer disabled : related param is not complete set"
return
}
logger notice "Medication Timer started." def calcNextSendTime: Long =
while (!this.isInterrupted) { val next_time = calcNextRoutineTimestamp(System.currentTimeMillis, use_timeZone, notify_atHour)
try { logger info s"medication timer will send next notify at ${CommonFormat.formatDate(next_time, use_timeZone.getTotalSeconds / 60 / 60)} with $use_timeZone [$next_time]"
val next_time = calcNextRoutineTimestamp(System.currentTimeMillis, use_timeZone, notify_atHour) next_time
logger info s"medication timer will send next notify at ${CommonFormat.formatDate(next_time, use_timeZone.getTotalSeconds/60/60)} with $use_timeZone [$next_time]"
val sleep_millis = next_time - System.currentTimeMillis override def firstRoutineTimeMillis: Long =
logger debug s"medication timer will sleep ${CommonFormat.formatDuration(sleep_millis)} [$sleep_millis]" calcNextSendTime
Thread sleep sleep_millis
sendNotification() override def nextRoutineTimeMillis (previousRoutineScheduledTimeMillis: Long): Long | Null =
logger info "medication notify sent." calcNextSendTime
} catch
case _: InterruptedException => override def main: Unit = {
interrupt() sendNotification()
logger notice "MedicationTimer was interrupted, will be exit now" logger info "medication notify sent."
case ill: IllegalArgumentException =>
logger warn "MedicationTimer will not work due to: " + ill.getMessage
interrupt()
case e =>
logger error
s"""unexpected error occurred on NotificationTimer
|${exceptionLog(e)}"""
.stripMargin
coeur.daemons.reporter.exception(e)
} }
logger notice "Medication Timer stopped."
} }
def start(): Unit =
if ((notify_toChat == -1) || (notify_atHour isEmpty))
logger notice "Medication Timer disabled : related param is not complete set"
return;
coeur.tasks ++ scheduleTask
logger notice "Medication Timer started."
def stop(): Unit =
coeur.tasks % scheduleTask
logger notice "Medication Timer stopped."
private def sendNotification(): Unit = { private def sendNotification(): Unit = {
val sendResponse: SendResponse = coeur.account exec SendMessage(notify_toChat, NOTIFY_MESSAGE) val sendResponse: SendResponse = coeur.account exec SendMessage(notify_toChat, NOTIFY_MESSAGE)
if sendResponse isOk then lastNotify_messageId = Some(sendResponse.message.messageId) if sendResponse isOk then lastNotify_messageId = Some(sendResponse.message.messageId)

View File

@ -25,11 +25,8 @@ class MornyDaemons (using val coeur: MornyCoeur) {
logger notice "stopping All Morny Daemons..." logger notice "stopping All Morny Daemons..."
// TrackerDataManager.DAEMON.interrupt(); // TrackerDataManager.DAEMON.interrupt();
medicationTimer.interrupt() medicationTimer.stop()
// TrackerDataManager.trackingLock.lock(); // TrackerDataManager.trackingLock.lock();
try { medicationTimer.join() }
catch case e: InterruptedException =>
e.printStackTrace(System.out)
logger notice "stopped ALL Morny Daemons." logger notice "stopped ALL Morny Daemons."
} }

View File

@ -0,0 +1,18 @@
package cc.sukazyo.cono.morny.internal.schedule
trait DelayedTask (
val delayedMillis: Long
) extends Task {
override val scheduledTimeMillis: Long = System.currentTimeMillis + delayedMillis
}
object DelayedTask {
def apply (_name: String, delayedMillis: Long, task: =>Unit): DelayedTask =
new DelayedTask (delayedMillis):
override val name: String = _name
override def main: Unit = task
}

View File

@ -0,0 +1,25 @@
package cc.sukazyo.cono.morny.internal.schedule
trait IntervalTask extends RoutineTask {
def intervalMillis: Long
override def firstRoutineTimeMillis: Long =
System.currentTimeMillis() + intervalMillis
override def nextRoutineTimeMillis (
previousScheduledRoutineTimeMillis: Long
): Long|Null =
previousScheduledRoutineTimeMillis + intervalMillis
}
object IntervalTask {
def apply (_name: String, _intervalMillis: Long, task: =>Unit): IntervalTask =
new IntervalTask:
override def intervalMillis: Long = _intervalMillis
override def name: String = _name
override def main: Unit = task
}

View File

@ -0,0 +1,26 @@
package cc.sukazyo.cono.morny.internal.schedule
trait IntervalWithTimesTask extends IntervalTask {
def times: Int
private var currentExecutedTimes = 1
override def nextRoutineTimeMillis (previousScheduledRoutineTimeMillis: Long): Long | Null =
if currentExecutedTimes >= times then
null
else
currentExecutedTimes = currentExecutedTimes + 1
super.nextRoutineTimeMillis(previousScheduledRoutineTimeMillis)
}
object IntervalWithTimesTask {
def apply (_name: String, _intervalMillis: Long, _times: Int, task: =>Unit): IntervalWithTimesTask =
new IntervalWithTimesTask:
override def name: String = _name
override def times: Int = _times
override def intervalMillis: Long = _intervalMillis
override def main: Unit = task
}

View File

@ -0,0 +1,12 @@
package cc.sukazyo.cono.morny.internal.schedule
trait RoutineTask extends Task {
private[schedule] var currentScheduledTimeMillis: Long = firstRoutineTimeMillis
override def scheduledTimeMillis: Long = currentScheduledTimeMillis
def firstRoutineTimeMillis: Long
def nextRoutineTimeMillis (previousRoutineScheduledTimeMillis: Long): Long|Null
}

View File

@ -0,0 +1,119 @@
package cc.sukazyo.cono.morny.internal.schedule
import scala.collection.mutable
class Scheduler {
// val taskList: util.TreeSet[Task] =
// Collections.synchronizedSortedSet(util.TreeSet[Task]())
private val taskList: mutable.TreeSet[Task] = mutable.TreeSet.empty
private var exitAtNextRoutine = false
private var waitForDone = false
private var currentRunning: Task|Null = _
private var currentRunning_isScheduledCancel = false
private val runtime: Thread = new Thread {
override def run (): Unit = {
def willExit: Boolean =
if exitAtNextRoutine then true
else if waitForDone then
taskList.synchronized:
if taskList.isEmpty then true
else false
else false
while (!willExit) {
val nextMove: Task|Long = taskList.synchronized {
taskList.headOption match
case Some(_readyToRun) if System.currentTimeMillis >= _readyToRun.scheduledTimeMillis =>
taskList -= _readyToRun
currentRunning = _readyToRun
_readyToRun
case Some(_notReady) =>
_notReady.scheduledTimeMillis - System.currentTimeMillis
case None =>
Long.MaxValue
}
nextMove match
case readyToRun: Task =>
this setName readyToRun.name
try {
readyToRun.main
} catch case _: (Exception | Error) => {}
this setName s"${readyToRun.name}#post"
currentRunning match
case routine: RoutineTask =>
routine.nextRoutineTimeMillis(routine.currentScheduledTimeMillis) match
case next: Long =>
routine.currentScheduledTimeMillis = next
if (!currentRunning_isScheduledCancel) schedule(routine)
case _ =>
case _ =>
this setName runnerName
currentRunning = null
case needToWaitMillis: Long =>
try Thread.sleep(needToWaitMillis)
catch case _: InterruptedException => {}
}
}
}
runtime.start()
//noinspection ScalaWeakerAccess
def runnerName: String =
this.toString
def ++ (task: Task): this.type =
schedule(task)
this
def schedule (task: Task): Boolean =
try taskList.synchronized:
taskList add task
finally runtime.interrupt()
def % (task: Task): this.type =
cancel(task)
this
def cancel (task: Task): Boolean =
try {
val succeed = taskList.synchronized { taskList remove task }
if succeed then succeed
else if task == currentRunning then
currentRunning_isScheduledCancel = true
true
else false
}
finally runtime.interrupt()
def amount: Int =
taskList.size
def state: Thread.State =
runtime.getState
def stop (): Unit =
exitAtNextRoutine = true
runtime.interrupt()
def waitForStop (): Unit =
stop()
runtime.join()
//noinspection ScalaWeakerAccess
def tagStopAtAllDone (): Unit =
waitForDone = true
def waitForStopAtAllDone(): Unit =
tagStopAtAllDone()
runtime.join()
}

View File

@ -0,0 +1,21 @@
package cc.sukazyo.cono.morny.internal.schedule
trait Task extends Ordered[Task] {
def name: String
def scheduledTimeMillis: Long
//noinspection UnitMethodIsParameterless
def main: Unit
override def compare (that: Task): Int =
if this.scheduledTimeMillis == that.scheduledTimeMillis then
this.hashCode - that.hashCode
else if this.scheduledTimeMillis > that.scheduledTimeMillis then
1
else -1
override def toString: String =
s"""${super.toString}{"$name": $scheduledTimeMillis}"""
}

View File

@ -94,7 +94,10 @@ class BilibiliFormsTest extends MornyTests with TableDrivenPropertyChecks {
val examples = Table( val examples = Table(
("b23_link", "bilibili_video_link"), ("b23_link", "bilibili_video_link"),
("https://b23.tv/iiCldvZ", "https://www.bilibili.com/video/BV1Gh411P7Sh?buvid=XY6F25B69BE9CF469FF5B917D012C93E95E72&is_story_h5=false&mid=wD6DQnYivIG5pfA3sAGL6A%3D%3D&p=1&plat_id=114&share_from=ugc&share_medium=android&share_plat=android&share_session_id=8081015b-1210-4dea-a665-6746b4850fcd&share_source=COPY&share_tag=s_i&timestamp=1689605644&unique_k=iiCldvZ&up_id=19977489"), ("https://b23.tv/iiCldvZ", "https://www.bilibili.com/video/BV1Gh411P7Sh?buvid=XY6F25B69BE9CF469FF5B917D012C93E95E72&is_story_h5=false&mid=wD6DQnYivIG5pfA3sAGL6A%3D%3D&p=1&plat_id=114&share_from=ugc&share_medium=android&share_plat=android&share_session_id=8081015b-1210-4dea-a665-6746b4850fcd&share_source=COPY&share_tag=s_i&timestamp=1689605644&unique_k=iiCldvZ&up_id=19977489"),
("http://b23.tv/3ymowwx", "https://www.bilibili.com/video/BV15Y411n754?p=1&share_medium=android_i&share_plat=android&share_source=COPY&share_tag=s_i&timestamp=1650293889&unique_k=3ymowwx") ("https://b23.tv/xWiWFl9", "https://www.bilibili.com/video/BV1N54y1c7us?buvid=XY705C970C2ADBB710C1801E1F45BDC3B9210&is_story_h5=false&mid=w%2B1u1wpibjYsW4pP%2FIo7Ww%3D%3D&p=1&plat_id=116&share_from=ugc&share_medium=android&share_plat=android&share_session_id=6da09711-d601-4da4-bba1-46a4edbb1c60&share_source=COPY&share_tag=s_i&timestamp=1680280016&unique_k=xWiWFl9&up_id=275354674"),
("http://b23.tv/uJPIvhv", "https://www.bilibili.com/video/BV1E84y1C7in?is_story_h5=false&p=1&share_from=ugc&share_medium=android&share_plat=android&share_session_id=4a077fa1-5ee2-40d4-ac37-bf9a2bf567e3&share_source=COPY&share_tag=s_i&timestamp=1669044671&unique_k=uJPIvhv")
// this link have been expired
// ("http://b23.tv/3ymowwx", "https://www.bilibili.com/video/BV15Y411n754?p=1&share_medium=android_i&share_plat=android&share_source=COPY&share_tag=s_i&timestamp=1650293889&unique_k=3ymowwx")
) )
"not b23.tv link is not supported" in: "not b23.tv link is not supported" in: