This repository has been archived on 2023-09-24. You can view files and clone it, but cannot push or open issues or pull requests.
takahe/users/models/report.py

130 lines
3.5 KiB
Python
Raw Normal View History

2022-12-17 22:45:31 +01:00
import httpx
import urlman
from django.db import models
from core.ld import canonicalise
from stator.models import State, StateField, StateGraph, StatorModel
from users.models import SystemActor
class ReportStates(StateGraph):
new = State(try_interval=600)
sent = State()
new.transitions_to(sent)
@classmethod
async def handle_new(cls, instance: "Report"):
"""
Sends the report to the remote server if we need to
"""
report = await instance.afetch_full()
if report.forward and not report.subject_identity.domain.local:
system_actor = SystemActor()
try:
await system_actor.signed_request(
method="post",
uri=report.subject_identity.inbox_uri,
body=canonicalise(report.to_ap()),
)
except httpx.RequestError:
return
return cls.sent
class Report(StatorModel):
"""
A complaint about a user or post.
"""
class Types(models.TextChoices):
spam = "spam"
hateful = "hateful"
illegal = "illegal"
remote = "remote"
other = "other"
state = StateField(ReportStates)
subject_identity = models.ForeignKey(
"users.Identity",
on_delete=models.CASCADE,
blank=True,
null=True,
related_name="reports",
)
subject_post = models.ForeignKey(
"activities.Post",
on_delete=models.SET_NULL,
blank=True,
null=True,
related_name="reports",
)
source_identity = models.ForeignKey(
"users.Identity",
on_delete=models.CASCADE,
blank=True,
null=True,
related_name="filed_reports",
)
source_domain = models.ForeignKey(
"users.Domain",
on_delete=models.CASCADE,
blank=True,
null=True,
related_name="filed_reports",
)
moderator = models.ForeignKey(
"users.Identity",
on_delete=models.SET_NULL,
blank=True,
null=True,
related_name="moderated_reports",
)
type = models.CharField(max_length=100, choices=Types.choices)
complaint = models.TextField()
forward = models.BooleanField(default=False)
valid = models.BooleanField(null=True)
seen = models.DateTimeField(blank=True, null=True)
resolved = models.DateTimeField(blank=True, null=True)
notes = models.TextField(blank=True, null=True)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
class urls(urlman.Urls):
admin = "/admin/reports/"
admin_view = "{admin}{self.pk}/"
### ActivityPub ###
async def afetch_full(self) -> "Report":
return await Report.objects.select_related(
"source_identity",
"source_domain",
"subject_identity__domain",
"subject_identity",
"subject_post",
).aget(pk=self.pk)
def to_ap(self):
system_actor = SystemActor()
if self.subject_post:
objects = [
self.subject_post.object_uri,
self.subject_identity.actor_uri,
]
else:
objects = self.subject_identity.actor_uri
return {
"id": f"https://{self.source_domain.uri_domain}/reports/{self.id}/",
"type": "Flag",
"actor": system_actor.actor_uri,
"object": objects,
"content": self.complaint,
}