More API read coverage

This commit is contained in:
Andrew Godwin 2022-12-11 11:22:06 -07:00
parent 3e062aed36
commit fc8a21fc5c
15 changed files with 536 additions and 147 deletions

View File

@ -185,3 +185,10 @@ class Hashtag(StatorModel):
return f'<a class="hashtag" href="/tags/{hashtag.lower()}/">#{hashtag}</a>'
return mark_safe(Hashtag.hashtag_regex.sub(replacer, content))
def to_mastodon_json(self):
return {
"name": self.hashtag,
"url": self.urls.view.full(),
"history": [],
}

View File

@ -262,7 +262,7 @@ class Post(StatorModel):
r"(^|[^\w\d\-_])@([\w\d\-_]+(?:@[\w\d\-_]+\.[\w\d\-_\.]+)?)"
)
def linkify_mentions(self, content, local=False):
def linkify_mentions(self, content: str, local: bool = False) -> str:
"""
Links mentions _in the context of the post_ - as in, using the mentions
property as the only source (as we might be doing this without other

View File

@ -1,5 +1,7 @@
from django.db import models
from core.ld import format_ld_date
class TimelineEvent(models.Model):
"""
@ -143,3 +145,26 @@ class TimelineEvent(models.Model):
subject_post_id=interaction.post_id,
subject_identity_id=interaction.identity_id,
).delete()
### Mastodon Client API ###
def to_mastodon_notification_json(self):
result = {
"id": self.pk,
"created_at": format_ld_date(self.created),
"account": self.subject_identity.to_mastodon_json(),
}
if self.type == self.Types.liked:
result["type"] = "favourite"
result["status"] = self.subject_post.to_mastodon_json()
elif self.type == self.Types.boosted:
result["type"] = "reblog"
result["status"] = self.subject_post.to_mastodon_json()
elif self.type == self.Types.mentioned:
result["type"] = "mention"
result["status"] = self.subject_post.to_mastodon_json()
elif self.type == self.Types.followed:
result["type"] = "follow"
else:
raise ValueError(f"Cannot convert {self.type} to notification JSON")
return result

136
activities/search.py Normal file
View File

@ -0,0 +1,136 @@
import httpx
from asgiref.sync import async_to_sync
from activities.models import Hashtag, Post
from core.ld import canonicalise
from users.models import Domain, Identity, IdentityStates
from users.models.system_actor import SystemActor
class Searcher:
"""
Captures the logic needed to search - reused in the UI and API
"""
def __init__(self, query: str, identity: Identity | None):
self.query = query.strip().lower()
self.identity = identity
def search_identities_handle(self) -> set[Identity]:
"""
Searches for identities by their handles
"""
# Short circuit if it's obviously not for us
if "://" in self.query:
return set()
# Try to fetch the user by handle
handle = self.query.lstrip("@")
results: set[Identity] = set()
if "@" in handle:
username, domain = handle.split("@", 1)
# Resolve the domain to the display domain
domain_instance = Domain.get_domain(domain)
try:
if domain_instance is None:
raise Identity.DoesNotExist()
identity = Identity.objects.get(
domain=domain_instance, username=username
)
except Identity.DoesNotExist:
if self.identity is not None:
# Allow authenticated users to fetch remote
identity = Identity.by_username_and_domain(
username, domain, fetch=True
)
if identity and identity.state == IdentityStates.outdated:
async_to_sync(identity.fetch_actor)()
else:
identity = None
if identity:
results.add(identity)
else:
for identity in Identity.objects.filter(username=handle)[:20]:
results.add(identity)
for identity in Identity.objects.filter(username__startswith=handle)[:20]:
results.add(identity)
return results
def search_url(self) -> Post | Identity | None:
"""
Searches for an identity or post by URL.
"""
# Short circuit if it's obviously not for us
if "://" not in self.query:
return None
# Fetch the provided URL as the system actor to retrieve the AP JSON
try:
response = async_to_sync(SystemActor().signed_request)(
method="get",
uri=self.query,
)
except (httpx.RequestError, httpx.ConnectError):
return None
if response.status_code >= 400:
return None
document = canonicalise(response.json(), include_security=True)
type = document.get("type", "unknown").lower()
# Is it an identity?
if type == "person":
# Try and retrieve the profile by actor URI
identity = Identity.by_actor_uri(document["id"], create=True)
if identity and identity.state == IdentityStates.outdated:
async_to_sync(identity.fetch_actor)()
return identity
# Is it a post?
elif type == "note":
# Try and retrieve the post by URI
# (we do not trust the JSON we just got - fetch from source!)
try:
return Post.by_object_uri(document["id"], fetch=True)
except Post.DoesNotExist:
return None
# Dunno what it is
else:
return None
def search_hashtags(self) -> set[Hashtag]:
"""
Searches for hashtags by their name
"""
# Short circuit out if it's obviously not a hashtag
if "@" in self.query or "://" in self.query:
return set()
results: set[Hashtag] = set()
name = self.query.lstrip("#")
for hashtag in Hashtag.objects.public().hashtag_or_alias(name)[:10]:
results.add(hashtag)
for hashtag in Hashtag.objects.public().filter(hashtag__startswith=name)[:10]:
results.add(hashtag)
return results
def search_all(self):
"""
Returns all possible results for a search
"""
results = {
"identities": self.search_identities_handle(),
"hashtags": self.search_hashtags(),
"posts": set(),
}
url_result = self.search_url()
if isinstance(url_result, Identity):
results["identities"].add(url_result)
if isinstance(url_result, Post):
results["posts"].add(url_result)
return results

View File

@ -1,12 +1,7 @@
import httpx
from asgiref.sync import async_to_sync
from django import forms
from django.views.generic import FormView
from activities.models import Hashtag, Post
from core.ld import canonicalise
from users.models import Domain, Identity, IdentityStates
from users.models.system_actor import SystemActor
from activities.search import Searcher
class Search(FormView):
@ -19,126 +14,9 @@ class Search(FormView):
widget=forms.TextInput(attrs={"type": "search", "autofocus": "autofocus"}),
)
def search_identities_handle(self, query: str):
"""
Searches for identities by their handles
"""
# Short circuit if it's obviously not for us
if "://" in query:
return set()
# Try to fetch the user by handle
query = query.lstrip("@")
results: set[Identity] = set()
if "@" in query:
username, domain = query.split("@", 1)
# Resolve the domain to the display domain
domain_instance = Domain.get_domain(domain)
try:
if domain_instance is None:
raise Identity.DoesNotExist()
identity = Identity.objects.get(
domain=domain_instance, username=username
)
except Identity.DoesNotExist:
if self.request.identity is not None:
# Allow authenticated users to fetch remote
identity = Identity.by_username_and_domain(
username, domain, fetch=True
)
if identity and identity.state == IdentityStates.outdated:
async_to_sync(identity.fetch_actor)()
else:
identity = None
if identity:
results.add(identity)
else:
for identity in Identity.objects.filter(username=query)[:20]:
results.add(identity)
for identity in Identity.objects.filter(username__startswith=query)[:20]:
results.add(identity)
return results
def search_url(self, query: str) -> Post | Identity | None:
"""
Searches for an identity or post by URL.
"""
# Short circuit if it's obviously not for us
if "://" not in query:
return None
# Clean up query
query = query.strip()
# Fetch the provided URL as the system actor to retrieve the AP JSON
try:
response = async_to_sync(SystemActor().signed_request)(
method="get", uri=query
)
except (httpx.RequestError, httpx.ConnectError):
return None
if response.status_code >= 400:
return None
document = canonicalise(response.json(), include_security=True)
type = document.get("type", "unknown").lower()
# Is it an identity?
if type == "person":
# Try and retrieve the profile by actor URI
identity = Identity.by_actor_uri(document["id"], create=True)
if identity and identity.state == IdentityStates.outdated:
async_to_sync(identity.fetch_actor)()
return identity
# Is it a post?
elif type == "note":
# Try and retrieve the post by URI
# (we do not trust the JSON we just got - fetch from source!)
try:
return Post.by_object_uri(document["id"], fetch=True)
except Post.DoesNotExist:
return None
# Dunno what it is
else:
return None
def search_hashtags(self, query: str):
"""
Searches for hashtags by their name
"""
# Short circuit out if it's obviously not a hashtag
if "@" in query or "://" in query:
return set()
results: set[Hashtag] = set()
query = query.lstrip("#")
for hashtag in Hashtag.objects.public().hashtag_or_alias(query)[:10]:
results.add(hashtag)
for hashtag in Hashtag.objects.public().filter(hashtag__startswith=query)[:10]:
results.add(hashtag)
return results
def form_valid(self, form):
query = form.cleaned_data["query"].lower()
results = {
"identities": self.search_identities_handle(query),
"hashtags": self.search_hashtags(query),
"posts": set(),
}
url_result = self.search_url(query)
if isinstance(url_result, Identity):
results["identities"].add(url_result)
if isinstance(url_result, Post):
results["posts"].add(url_result)
searcher = Searcher(form.cleaned_data["query"], self.request.identity)
# Render results
context = self.get_context_data(form=form)
context["results"] = results
context["results"] = searcher.search_all()
return self.render_to_response(context)

View File

@ -106,3 +106,57 @@ class Status(Schema):
muted: bool | None
bookmarked: bool | None
pinned: bool | None
class Conversation(Schema):
id: str
unread: bool
accounts: list[Account]
last_status: Status | None = Field(...)
class Notification(Schema):
id: str
type: Literal[
"mention",
"status",
"reblog",
"follow",
"follow_request",
"favourite",
"poll",
"update",
"admin.sign_up",
"admin.report",
]
created_at: str
account: Account
status: Status | None
class Tag(Schema):
name: str
url: str
history: dict
class Search(Schema):
accounts: list[Account]
statuses: list[Status]
hashtags: list[Tag]
class Relationship(Schema):
id: str
following: bool
followed_by: bool
showing_reblogs: bool
notifying: bool
blocking: bool
blocked_by: bool
muting: bool
muting_notifications: bool
requested: bool
domain_blocking: bool
endorsed: bool
note: str

View File

@ -1,6 +1,8 @@
from .accounts import * # noqa
from .apps import * # noqa
from .base import api # noqa
from .base import api_router # noqa
from .instance import * # noqa
from .notifications import * # noqa
from .oauth import * # noqa
from .search import * # noqa
from .timelines import * # noqa

View File

@ -1,9 +1,94 @@
from .. import schemas
from django.shortcuts import get_object_or_404
from activities.models import Post
from api import schemas
from api.views.base import api_router
from users.models import Identity
from ..decorators import identity_required
from .base import api
@api.get("/v1/accounts/verify_credentials", response=schemas.Account)
@api_router.get("/v1/accounts/verify_credentials", response=schemas.Account)
@identity_required
def verify_credentials(request):
return request.identity.to_mastodon_json()
@api_router.get("/v1/accounts/relationships", response=list[schemas.Relationship])
@identity_required
def account_relationships(request):
ids = request.GET.getlist("id[]")
result = []
for id in ids:
identity = get_object_or_404(Identity, pk=id)
result.append(
{
"id": identity.pk,
"following": identity.inbound_follows.filter(
source=request.identity
).exists(),
"followed_by": identity.outbound_follows.filter(
target=request.identity
).exists(),
"showing_reblogs": True,
"notifying": False,
"blocking": False,
"blocked_by": False,
"muting": False,
"muting_notifications": False,
"requested": False,
"domain_blocking": False,
"endorsed": False,
"note": "",
}
)
return result
@api_router.get("/v1/accounts/{id}", response=schemas.Account)
@identity_required
def account(request, id: str):
identity = get_object_or_404(Identity, pk=id)
return identity.to_mastodon_json()
@api_router.get("/v1/accounts/{id}/statuses", response=list[schemas.Status])
@identity_required
def account_statuses(
request,
id: str,
exclude_reblogs: bool = False,
exclude_replies: bool = False,
only_media: bool = False,
pinned: bool = False,
tagged: str | None = None,
max_id: str | None = None,
since_id: str | None = None,
min_id: str | None = None,
limit: int = 20,
):
identity = get_object_or_404(Identity, pk=id)
posts = (
identity.posts.public()
.select_related("author")
.prefetch_related("attachments")
.order_by("-created")
)
if pinned:
return []
if only_media:
posts = posts.filter(attachments__pk__isnull=False)
if tagged:
posts = posts.tagged_with(tagged)
if max_id:
anchor_post = Post.objects.get(pk=max_id)
posts = posts.filter(created__lt=anchor_post.created)
if since_id:
anchor_post = Post.objects.get(pk=since_id)
posts = posts.filter(created__gt=anchor_post.created)
if min_id:
# Min ID requires LIMIT posts _immediately_ newer than specified, so we
# invert the ordering to accomodate
anchor_post = Post.objects.get(pk=min_id)
posts = posts.filter(created__gt=anchor_post.created).order_by("created")
return [post.to_mastodon_json() for post in posts[:limit]]

View File

@ -4,7 +4,7 @@ from ninja import Schema
from .. import schemas
from ..models import Application
from .base import api
from .base import api_router
class CreateApplicationSchema(Schema):
@ -14,7 +14,7 @@ class CreateApplicationSchema(Schema):
website: None | str = None
@api.post("/v1/apps", response=schemas.Application)
@api_router.post("/v1/apps", response=schemas.Application)
def add_app(request, details: CreateApplicationSchema):
client_id = "tk-" + secrets.token_urlsafe(16)
client_secret = secrets.token_urlsafe(40)

View File

@ -2,4 +2,4 @@ from ninja import NinjaAPI
from api.parser import FormOrJsonParser
api = NinjaAPI(parser=FormOrJsonParser())
api_router = NinjaAPI(parser=FormOrJsonParser())

View File

@ -5,10 +5,10 @@ from core.models import Config
from takahe import __version__
from users.models import Domain, Identity
from .base import api
from .base import api_router
@api.get("/v1/instance")
@api_router.get("/v1/instance")
def instance_info(request):
return {
"uri": request.headers.get("host", settings.SETUP.MAIN_DOMAIN),
@ -16,7 +16,7 @@ def instance_info(request):
"short_description": "",
"description": "",
"email": "",
"version": __version__,
"version": f"takahe/{__version__}",
"urls": {},
"stats": {
"user_count": Identity.objects.filter(local=True).count(),

View File

@ -0,0 +1,52 @@
from activities.models import TimelineEvent
from .. import schemas
from ..decorators import identity_required
from .base import api_router
@api_router.get("/v1/notifications", response=list[schemas.Notification])
@identity_required
def notifications(
request,
max_id: str | None = None,
since_id: str | None = None,
min_id: str | None = None,
limit: int = 20,
account_id: str | None = None,
):
if limit > 40:
limit = 40
# Types/exclude_types use weird syntax so we have to handle them manually
base_types = {
"favourite": TimelineEvent.Types.liked,
"reblog": TimelineEvent.Types.boosted,
"mention": TimelineEvent.Types.mentioned,
"follow": TimelineEvent.Types.followed,
}
requested_types = set(request.GET.getlist("types[]"))
excluded_types = set(request.GET.getlist("exclude_types[]"))
if not requested_types:
requested_types = set(base_types.keys())
requested_types.difference_update(excluded_types)
# Use that to pull relevant events
events = (
TimelineEvent.objects.filter(
identity=request.identity,
type__in=[base_types[r] for r in requested_types],
)
.order_by("-created")
.select_related("subject_post", "subject_post__author", "subject_identity")
)
if max_id:
anchor_event = TimelineEvent.objects.get(pk=max_id)
events = events.filter(created__lt=anchor_event.created)
if since_id:
anchor_event = TimelineEvent.objects.get(pk=since_id)
events = events.filter(created__gt=anchor_event.created)
if min_id:
# Min ID requires LIMIT events _immediately_ newer than specified, so we
# invert the ordering to accomodate
anchor_event = TimelineEvent.objects.get(pk=min_id)
events = events.filter(created__gt=anchor_event.created).order_by("created")
return [event.to_mastodon_notification_json() for event in events[:limit]]

42
api/views/search.py Normal file
View File

@ -0,0 +1,42 @@
from typing import Literal
from ninja import Field
from activities.search import Searcher
from api import schemas
from api.decorators import identity_required
from api.views.base import api_router
@api_router.get("/v2/search", response=schemas.Search)
@identity_required
def search(
request,
q: str,
type: Literal["accounts", "hashtags", "statuses"] | None = None,
fetch_identities: bool = Field(False, alias="resolve"),
following: bool = False,
exclude_unreviewed: bool = False,
account_id: str | None = None,
max_id: str | None = None,
since_id: str | None = None,
min_id: str | None = None,
limit: int = 20,
offset: int = 0,
):
if limit > 40:
limit = 40
result: dict[str, list] = {"accounts": [], "statuses": [], "hashtags": []}
# We don't support pagination for searches yet
if max_id or since_id or min_id or offset:
return result
# Run search
searcher = Searcher(q, request.identity)
search_result = searcher.search_all()
if type is None or type == "accounts":
result["accounts"] = [i.to_mastodon_json() for i in search_result["identities"]]
if type is None or type == "hashtag":
result["hashtag"] = [h.to_mastodon_json() for h in search_result["hashtags"]]
if type is None or type == "statuses":
result["statuses"] = [p.to_mastodon_json() for p in search_result["posts"]]
return result

View File

@ -1,16 +1,21 @@
from activities.models import TimelineEvent
from activities.models import Post, TimelineEvent
from .. import schemas
from ..decorators import identity_required
from .base import api
from .base import api_router
@api.get("/v1/timelines/home", response=list[schemas.Status])
@api_router.get("/v1/timelines/home", response=list[schemas.Status])
@identity_required
def home(request):
if request.GET.get("max_id"):
return []
limit = int(request.GET.get("limit", "20"))
def home(
request,
max_id: str | None = None,
since_id: str | None = None,
min_id: str | None = None,
limit: int = 20,
):
if limit > 40:
limit = 40
events = (
TimelineEvent.objects.filter(
identity=request.identity,
@ -18,6 +23,109 @@ def home(request):
)
.select_related("subject_post", "subject_post__author")
.prefetch_related("subject_post__attachments")
.order_by("-created")[:limit]
.order_by("-created")
)
return [event.subject_post.to_mastodon_json() for event in events]
if max_id:
anchor_post = Post.objects.get(pk=max_id)
events = events.filter(created__lt=anchor_post.created)
if since_id:
anchor_post = Post.objects.get(pk=since_id)
events = events.filter(created__gt=anchor_post.created)
if min_id:
# Min ID requires LIMIT events _immediately_ newer than specified, so we
# invert the ordering to accomodate
anchor_post = Post.objects.get(pk=min_id)
events = events.filter(created__gt=anchor_post.created).order_by("created")
return [event.subject_post.to_mastodon_json() for event in events[:limit]]
@api_router.get("/v1/timelines/public", response=list[schemas.Status])
@identity_required
def public(
request,
local: bool = False,
remote: bool = False,
only_media: bool = False,
max_id: str | None = None,
since_id: str | None = None,
min_id: str | None = None,
limit: int = 20,
):
if limit > 40:
limit = 40
posts = (
Post.objects.public()
.select_related("author")
.prefetch_related("attachments")
.order_by("-created")
)
if local:
posts = posts.filter(local=True)
elif remote:
posts = posts.filter(local=False)
if only_media:
posts = posts.filter(attachments__id__isnull=True)
if max_id:
anchor_post = Post.objects.get(pk=max_id)
posts = posts.filter(created__lt=anchor_post.created)
if since_id:
anchor_post = Post.objects.get(pk=since_id)
posts = posts.filter(created__gt=anchor_post.created)
if min_id:
# Min ID requires LIMIT posts _immediately_ newer than specified, so we
# invert the ordering to accomodate
anchor_post = Post.objects.get(pk=min_id)
posts = posts.filter(created__gt=anchor_post.created).order_by("created")
return [post.to_mastodon_json() for post in posts[:limit]]
@api_router.get("/v1/timelines/tag/{hashtag}", response=list[schemas.Status])
@identity_required
def hashtag(
request,
hashtag: str,
local: bool = False,
only_media: bool = False,
max_id: str | None = None,
since_id: str | None = None,
min_id: str | None = None,
limit: int = 20,
):
if limit > 40:
limit = 40
posts = (
Post.objects.public()
.tagged_with(hashtag)
.select_related("author")
.prefetch_related("attachments")
.order_by("-created")
)
if local:
posts = posts.filter(local=True)
if only_media:
posts = posts.filter(attachments__id__isnull=True)
if max_id:
anchor_post = Post.objects.get(pk=max_id)
posts = posts.filter(created__lt=anchor_post.created)
if since_id:
anchor_post = Post.objects.get(pk=since_id)
posts = posts.filter(created__gt=anchor_post.created)
if min_id:
# Min ID requires LIMIT posts _immediately_ newer than specified, so we
# invert the ordering to accomodate
anchor_post = Post.objects.get(pk=min_id)
posts = posts.filter(created__gt=anchor_post.created).order_by("created")
return [post.to_mastodon_json() for post in posts[:limit]]
@api_router.get("/v1/conversations", response=list[schemas.Status])
@identity_required
def conversations(
request,
max_id: str | None = None,
since_id: str | None = None,
min_id: str | None = None,
limit: int = 20,
):
# We don't implement this yet
return []

View File

@ -4,7 +4,7 @@ from django.urls import path, re_path
from django.views.static import serve
from activities.views import compose, explore, follows, posts, search, timelines
from api.views import api, oauth
from api.views import api_router, oauth
from core import views as core
from mediaproxy import views as mediaproxy
from stator import views as stator
@ -203,7 +203,7 @@ urlpatterns = [
path("actor/inbox/", activitypub.Inbox.as_view()),
path("inbox/", activitypub.Inbox.as_view(), name="shared_inbox"),
# API/Oauth
path("api/", api.urls),
path("api/", api_router.urls),
path("oauth/authorize", oauth.AuthorizationView.as_view()),
path("oauth/token", oauth.TokenView.as_view()),
path("oauth/revoke_token", oauth.RevokeTokenView.as_view()),