This repository has been archived on 2023-09-24. You can view files and clone it, but cannot push or open issues or pull requests.
takahe/miniq/tasks.py
Andrew Godwin fb6c409a9a Rework task system and fetching.
I can taste how close follow is to working.
2022-11-06 21:30:07 -07:00

35 lines
1.0 KiB
Python

import traceback
from users.tasks.follow import handle_follow_request
from users.tasks.identity import handle_identity_fetch
from users.tasks.inbox import handle_inbox_item
class TaskHandler:
    """Run a single queued task by dispatching on its ``type``.

    The handler wraps a task object and exposes ``task``, ``subject`` and
    ``payload`` as attributes. The handler instance itself is what gets
    passed to the per-type coroutine looked up in ``handlers``.
    """

    # Maps a task's ``type`` string to the coroutine function that processes
    # it. Each function is awaited with this TaskHandler as its only argument.
    handlers = {
        "identity_fetch": handle_identity_fetch,
        "inbox_item": handle_inbox_item,
        "follow_request": handle_follow_request,
    }

    def __init__(self, task):
        """Wrap ``task`` and copy its ``subject`` and ``payload`` attributes."""
        self.task = task
        self.subject = self.task.subject
        self.payload = self.task.payload

    async def handle(self):
        """Run the handler registered for this task's type and record the result.

        On success ``task.complete()`` is awaited. On any failure (including
        an unknown task type) the error is printed and ``task.fail()`` is
        awaited with the message and formatted traceback.

        Raises:
            BaseException: Non-``Exception`` errors (for example
                ``asyncio.CancelledError``, ``KeyboardInterrupt`` or
                ``SystemExit``) are re-raised after the failure has been
                recorded, so cancellation and shutdown are not swallowed.
                Ordinary ``Exception`` subclasses are absorbed.
        """
        try:
            print(f"Task {self.task}: Starting")
            if self.task.type not in self.handlers:
                raise ValueError(f"Cannot handle type {self.task.type}")
            await self.handlers[self.task.type](self)
            await self.task.complete()
            print(f"Task {self.task}: Complete")
        except BaseException as e:
            # Catch BaseException so that even a cancelled or interrupted run
            # is marked as failed rather than left in limbo.
            print(f"Task {self.task}: Error {e}")
            traceback.print_exc()
            await self.task.fail(f"{e}\n\n" + traceback.format_exc())
            if not isinstance(e, Exception):
                # Cancellation and interpreter-exit signals must keep
                # propagating; swallowing them would make a cancelled worker
                # appear to have finished normally.
                raise