95 lines
3.2 KiB
Python
95 lines
3.2 KiB
Python
import asyncio
|
|
import datetime
|
|
import time
|
|
import traceback
|
|
import uuid
|
|
from typing import List, Type
|
|
|
|
from django.utils import timezone
|
|
|
|
from stator.models import StatorModel
|
|
|
|
|
|
class StatorRunner:
|
|
"""
|
|
Runs tasks on models that are looking for state changes.
|
|
Designed to run in a one-shot mode, living inside a request.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
models: List[Type[StatorModel]],
|
|
concurrency: int = 50,
|
|
concurrency_per_model: int = 10,
|
|
run_period: int = 60,
|
|
wait_period: int = 30,
|
|
):
|
|
self.models = models
|
|
self.runner_id = uuid.uuid4().hex
|
|
self.concurrency = concurrency
|
|
self.concurrency_per_model = concurrency_per_model
|
|
self.run_period = run_period
|
|
self.total_period = run_period + wait_period
|
|
|
|
async def run(self):
|
|
start_time = time.monotonic()
|
|
self.handled = 0
|
|
self.tasks = []
|
|
# Clean up old locks
|
|
print("Running initial cleaning and scheduling")
|
|
initial_tasks = []
|
|
for model in self.models:
|
|
initial_tasks.append(model.atransition_clean_locks())
|
|
initial_tasks.append(model.atransition_schedule_due())
|
|
await asyncio.gather(*initial_tasks)
|
|
# For the first time period, launch tasks
|
|
print("Running main task loop")
|
|
while (time.monotonic() - start_time) < self.run_period:
|
|
self.remove_completed_tasks()
|
|
space_remaining = self.concurrency - len(self.tasks)
|
|
# Fetch new tasks
|
|
for model in self.models:
|
|
if space_remaining > 0:
|
|
for instance in await model.atransition_get_with_lock(
|
|
number=min(space_remaining, self.concurrency_per_model),
|
|
lock_expiry=(
|
|
timezone.now()
|
|
+ datetime.timedelta(seconds=(self.total_period * 2) + 60)
|
|
),
|
|
):
|
|
self.tasks.append(
|
|
asyncio.create_task(self.run_transition(instance))
|
|
)
|
|
self.handled += 1
|
|
space_remaining -= 1
|
|
# Prevent busylooping
|
|
await asyncio.sleep(0.1)
|
|
# Then wait for tasks to finish
|
|
print("Waiting for tasks to complete")
|
|
while (time.monotonic() - start_time) < self.total_period:
|
|
self.remove_completed_tasks()
|
|
if not self.tasks:
|
|
break
|
|
# Prevent busylooping
|
|
await asyncio.sleep(1)
|
|
print("Complete")
|
|
return self.handled
|
|
|
|
async def run_transition(self, instance: StatorModel):
|
|
"""
|
|
Wrapper for atransition_attempt with fallback error handling
|
|
"""
|
|
try:
|
|
print(
|
|
f"Attempting transition on {instance._meta.label_lower}#{instance.pk} from state {instance.state}"
|
|
)
|
|
await instance.atransition_attempt()
|
|
except BaseException:
|
|
traceback.print_exc()
|
|
|
|
def remove_completed_tasks(self):
|
|
"""
|
|
Removes all completed asyncio.Tasks from our local in-progress list
|
|
"""
|
|
self.tasks = [t for t in self.tasks if not t.done()]
|