2022-11-09 07:06:29 +01:00
|
|
|
import asyncio
|
|
|
|
import datetime
|
|
|
|
import time
|
2022-11-10 07:48:31 +01:00
|
|
|
import traceback
|
2022-11-09 07:06:29 +01:00
|
|
|
import uuid
|
2022-11-19 18:20:13 +01:00
|
|
|
from typing import List, Optional, Type
|
2022-11-09 07:06:29 +01:00
|
|
|
|
|
|
|
from django.utils import timezone
|
|
|
|
|
2022-11-20 22:20:28 +01:00
|
|
|
from core import exceptions
|
2022-11-23 05:06:21 +01:00
|
|
|
from core.models import Config
|
2022-11-10 06:29:33 +01:00
|
|
|
from stator.models import StatorModel
|
2022-11-09 07:06:29 +01:00
|
|
|
|
|
|
|
|
|
|
|
class StatorRunner:
    """
    Runs tasks on models that are looking for state changes.
    Designed to run either indefinitely, or just for a few seconds.

    Each pass of the main loop drops finished transition tasks, asks every
    model for more instances to work on (bounded by the concurrency limits)
    and starts one asyncio task per instance. Every ``schedule_interval``
    seconds it also reloads the system config and kicks off per-model lock
    cleaning and scheduling.
    """

    def __init__(
        self,
        models: List[Type["StatorModel"]],
        concurrency: int = 50,
        concurrency_per_model: int = 10,
        liveness_file: Optional[str] = None,
        schedule_interval: int = 30,
        lock_expiry: int = 300,
        run_for: int = 0,
    ):
        """
        Store the runner configuration.

        models: model classes to poll for transitions.
        concurrency: upper bound on tracked in-flight transition tasks.
        concurrency_per_model: most instances requested from one model in a
            single fetch.
        liveness_file: stored on the runner as-is.
        schedule_interval: minimum seconds between cleaning/scheduling passes.
        lock_expiry: seconds added to the current time to build the lock
            expiry handed to ``atransition_get_with_lock``.
        run_for: if non-zero, the main loop stops once more than this many
            seconds have elapsed; zero means run until interrupted.
        """
        self.models = models
        self.runner_id = uuid.uuid4().hex
        self.concurrency = concurrency
        self.concurrency_per_model = concurrency_per_model
        # NOTE(review): liveness_file is not read anywhere in this class as
        # visible here - confirm whether writing it was meant to happen in run().
        self.liveness_file = liveness_file
        self.schedule_interval = schedule_interval
        self.lock_expiry = lock_expiry
        self.run_for = run_for
        # Bookkeeping lives on the instance from the start so the helper
        # methods are usable before run() has been called; run() resets it.
        self.handled = 0
        self.tasks = []
        # Strong references to fire-and-forget cleanup/scheduling tasks.
        self.background_tasks = set()

    async def run(self) -> int:
        """
        Run the main loop, then wait for in-flight tasks to finish.

        Returns the number of transition tasks that were started.
        """
        self.handled = 0
        self.started = time.monotonic()
        # Backdate the last clean so the first loop iteration cleans at once.
        self.last_clean = time.monotonic() - self.schedule_interval
        self.tasks = []
        # For the first time period, launch tasks
        print("Running main task loop")
        try:
            while True:
                # Do we need to do cleaning?
                if (time.monotonic() - self.last_clean) >= self.schedule_interval:
                    # Refresh the config
                    Config.system = await Config.aload_system()
                    print(f"{self.handled} tasks processed so far")
                    print("Running cleaning and scheduling")
                    await self.run_cleanup()

                self.remove_completed_tasks()
                await self.fetch_and_process_tasks()

                # Are we in limited run mode?
                if self.run_for and (time.monotonic() - self.started) > self.run_for:
                    break
                # Prevent busylooping
                await asyncio.sleep(0.5)
        except KeyboardInterrupt:
            # Fall through to the drain loop below instead of aborting.
            pass
        # Wait for tasks to finish
        print("Waiting for tasks to complete")
        while True:
            self.remove_completed_tasks()
            if not self.tasks:
                break
            # Prevent busylooping
            await asyncio.sleep(0.1)
        print("Complete")
        return self.handled

    def _spawn_background(self, coro):
        """
        Start ``coro`` as a task and hold a reference until it completes.
        """
        if not hasattr(self, "background_tasks"):
            # Instance was built without __init__ having run.
            self.background_tasks = set()
        task = asyncio.create_task(coro)
        # The event loop only keeps weak references to tasks; without our own
        # reference an unfinished task may be garbage-collected mid-flight.
        self.background_tasks.add(task)
        task.add_done_callback(self.background_tasks.discard)
        return task

    async def run_cleanup(self) -> None:
        """
        Do any transition cleanup tasks

        Starts ``atransition_clean_locks`` and ``atransition_schedule_due``
        for every model as background tasks (they are not awaited here) and
        records the time of this pass in ``last_clean``.
        """
        for model in self.models:
            self._spawn_background(model.atransition_clean_locks())
            self._spawn_background(model.atransition_schedule_due())
        self.last_clean = time.monotonic()

    async def fetch_and_process_tasks(self) -> None:
        """
        Fetch instances to work on and start a transition task for each.

        Models are visited in order; once the overall concurrency budget is
        used up the remaining models are skipped for this pass.
        """
        # Calculate space left for tasks
        space_remaining = self.concurrency - len(self.tasks)
        # Fetch new tasks
        for model in self.models:
            if space_remaining > 0:
                for instance in await model.atransition_get_with_lock(
                    number=min(space_remaining, self.concurrency_per_model),
                    lock_expiry=(
                        timezone.now() + datetime.timedelta(seconds=self.lock_expiry)
                    ),
                ):
                    self.tasks.append(
                        asyncio.create_task(self.run_transition(instance))
                    )
                    self.handled += 1
                    space_remaining -= 1

    async def run_transition(self, instance: "StatorModel") -> None:
        """
        Wrapper for atransition_attempt with fallback error handling
        """
        try:
            print(
                f"Attempting transition on {instance._meta.label_lower}#{instance.pk} from state {instance.state}"
            )
            await instance.atransition_attempt()
        # Deliberately broad: whatever a transition raises is reported and
        # printed here rather than propagating out of its task.
        except BaseException as e:
            await exceptions.acapture_exception(e)
            traceback.print_exc()

    def remove_completed_tasks(self) -> None:
        """
        Removes all completed asyncio.Tasks from our local in-progress list
        """
        self.tasks = [t for t in self.tasks if not t.done()]