Skip to content

fediverse_pasture.server

BlueprintForActors dataclass

Creates a blueprint to expose actors. This blueprint handles exposing the Actor objects and their Inboxes

If application_actor is not set, it is set to the actor with name actor from the list actors.

Parameters:

Name Type Description Default
actors List[ActorData]

The actors to expose through the blueprint

required
application_actor ActorData | None

Can be used to specify the application actor. Otherwise, the actor from actors with name ‘actor’ is used.

None
on_inbox Callable[list, Awaitable] | None

Awaited when a message is received in the inbox. The json content of the message is passed as a parameter.

None
Source code in fediverse_pasture/server/__init__.py
@dataclass
class BlueprintForActors:
    """Creates a blueprint to expose actors. This blueprint handles
    exposing the Actor objects and their Inboxes

    If `application_actor` is not set, it is set to the actor
    with name `actor` from the list `actors`."""

    actors: List[ActorData] = field(
        metadata={"description": """The actors to expose through the blueprint"""},
    )
    application_actor: ActorData | None = field(
        default=None,
        metadata={
            "description": """Can be used to specify the application
        actor. Otherwise, the actor from `actors` with name 'actor' is used."""
        },
    )
    on_inbox: Callable[[dict], Awaitable] | None = field(
        default=None,
        metadata={
            "description": """Awaited when a message is received in the inbox. The
        json content of the message is passed as a parameter."""
        },
    )

    def __post_init__(self):
        if self.application_actor is None:
            self.application_actor = self.actor_for_name("actor")

    def actor_for_name(self, name: str) -> ActorData | None:
        for actor in self.actors:
            if actor.actor_name == name:
                return actor
        return None

    def actor_for_user(self, user: str) -> ActorData | None:
        for actor in self.actors:
            if actor.user_part == user:
                return actor
        return None

    async def validate_request(self, request):
        logger.info("Validating request")
        application_actor_id = f"{scheme_for_request(request)}://{request.host}/actor"
        application, _ = bovine_actor_for_actor_data(
            application_actor_id, self.application_actor
        )
        async with aiohttp.ClientSession() as session:
            await application.init(session=session)

            async def key_retriever(key_id):
                logger.debug("Retrieving key for %s", key_id)
                result = await application.get(key_id)
                public_key, owner = actor_object_to_public_key(result, key_id)
                logger.debug("Got key %s for owner %s", public_key, owner)
                return CryptographicIdentifier.from_pem(public_key, owner)

            result = await build_validate_http_signature(key_retriever)(request)

            logger.debug("Got validation result %s", str(result))
            logger.debug(request.headers)
            logger.debug(request.full_path)
            return result

    @property
    def blueprint(self) -> Blueprint:
        """[Quart Blueprint][quart.blueprints.Blueprint] providing the endpoints

        ```plaintext
            - GET /.well-known/webfinger
            - GET /<actor_name
            - POST /<actor_name>/inbox
        ```
        """
        actor_blueprint = Blueprint("actors", __name__)

        @actor_blueprint.get("/.well-known/webfinger")
        async def webfinger():
            resource = request.args.get("resource")
            if not resource or not resource.startswith("acct:") or "@" not in resource:
                return "", 404

            acct = resource.removeprefix("acct:")
            user, domain = acct.split("@", 1)

            if domain != request.host:
                return "", 404

            actor = self.actor_for_user(user)
            if not actor:
                return "", 404
            actor_id = (
                f"{scheme_for_request(request)}://{request.host}/{actor.actor_name}"
            )
            return webfinger_response_json(resource, actor_id)

        @actor_blueprint.get("/<actor_name>")
        async def actor_get(actor_name):
            actor = self.actor_for_name(actor_name)
            if not actor:
                return "not found", 404
            if actor.requires_signed_get_for_actor:
                if not await self.validate_request(request):
                    return "unauthorized", 401
            actor_id = (
                f"{scheme_for_request(request)}://{request.host}/{actor.actor_name}"
            )
            _, data = bovine_actor_for_actor_data(actor_id, actor)
            return data.build(), 200, {"content-type": "application/activity+json"}

        @actor_blueprint.post("/<actor_name>/inbox")
        async def inbox_post(actor_name):
            actor = self.actor_for_name(actor_name)
            if not actor:
                return "not found", 404
            if actor.requires_signed_post_for_inbox:
                if not await self.validate_request(request):
                    return "unauthorized", 401

            if self.on_inbox:
                try:
                    data = await request.get_json()
                    await self.on_inbox(data)
                except Exception as e:
                    logger.warning("Something went wrong in inbox parsing")
                    logger.warning(repr(e))

            return "", 202

        @actor_blueprint.get("/<actor_name>/outbox")
        async def outbox(actor_name):
            return {
                "@context": "https://www.w3.org/ns/activitystreams",
                "id": f"http://pasture-one-actor/{actor_name}/outbox",
                "type": "OrderedCollection",
                "totalItems": 0,
            }, {"content-type": "application/activity"}

        return actor_blueprint

blueprint property

Quart Blueprint providing the endpoints

    - GET /.well-known/webfinger
    - GET /<actor_name
    - POST /<actor_name>/inbox

assets_blueprint_for_directory(directory)

Returns a Blueprint that serves the directory as static files. Files ending in .jsonap will be served with content-type application/activity+json.

Parameters:

Name Type Description Default
directory str

Directory to serve static files from

required
Source code in fediverse_pasture/server/__init__.py
def assets_blueprint_for_directory(directory: str) -> Blueprint:
    """Returns a [Blueprint][quart.blueprints.Blueprint] that serves the directory
    as static files. Files ending in `.jsonap` will be served with
    content-type `application/activity+json`.

    :param directory: Directory to serve static files from
    """

    blueprint = Blueprint("assets", __name__)

    @blueprint.get("/assets/<filename>")
    async def assets(filename):
        if filename.endswith(".jsonap"):
            return await send_from_directory(
                directory, filename, mimetype="application/activity+json"
            )
        return await send_from_directory(directory, filename)

    return blueprint

image_provider_blueprint

image_provider_blueprint = Blueprint('image_provider', __name__) module-attribute

Provides the capability to generate randomly colored 40 by 40 pixel images in various formats

get_filename(filename) async

Returns a random image. If filename ends with jpg, eps, gif, tiff, or webp an image of that type is returned otherwise png

Source code in fediverse_pasture/server/image_provider_blueprint.py
@image_provider_blueprint.get("/<filename>")
async def get_filename(filename):
    """Returns a random image. If filename ends with jpg, eps, gif, tiff, or webp an image of that type is returned otherwise png"""
    image_format, content_type = determine_format(filename)

    b = BytesIO()
    image = Image.new(
        "RGB", (40, 40), (randint(0, 255), randint(0, 255), randint(0, 255))
    )
    image.save(b, format=image_format)

    return b.getvalue(), 200, {"content-type": content_type}

nodeinfo_blueprint

nodeinfo() async

Returns the JRD corresponding to /.well-known/nodeinfo

Source code in fediverse_pasture/server/nodeinfo_blueprint.py
@nodeinfo_blueprint.get("/.well-known/nodeinfo")
async def nodeinfo() -> tuple[dict, int]:
    """Returns the JRD corresponding to `/.well-known/nodeinfo`"""
    path = urlparse(request.url)
    nodeinfo = JrdLink(
        rel="http://nodeinfo.diaspora.software/ns/schema/2.0",
        href=f"{path.scheme}://{path.netloc}/.well-known/nodeinfo2_0",
    )

    return (
        pydantic_to_json(JrdData(links=[nodeinfo])),
        200,
        {"content-type": "application/jrd+json"},
    )

utils

actor_object_to_public_key(actor, key_id)

As public_key_owner_from_dict, but applies context normalization first, then checks without out.

Source code in fediverse_pasture/server/utils.py
def actor_object_to_public_key(
    actor: dict, key_id: str
) -> Tuple[str | None, str | None]:
    """As [public_key_owner_from_dict][fediverse_pasture.server.utils.public_key_owner_from_dict], but applies context normalization first, then checks without out."""
    public_key, owner = public_key_owner_from_dict(with_external_context(actor), key_id)

    if public_key and owner:
        return public_key, owner

    return public_key_owner_from_dict(actor, key_id)

find_with_item(dict_list, key_id)

Given a list of dictionaries, finds the dictionary with id = key_id

Source code in fediverse_pasture/server/utils.py
def find_with_item(dict_list, key_id):
    """Given a list of dictionaries, finds the dictionary with
    id = key_id"""
    for key in dict_list:
        if key.get("id") == key_id:
            return key
    return None

public_key_owner_from_dict(actor, key_id)

Given an actor and key_id returns the public_key and the owner. This method directly checks the key publicKey

Source code in fediverse_pasture/server/utils.py
def public_key_owner_from_dict(
    actor: dict, key_id: str
) -> Tuple[str | None, str | None]:
    """Given an actor and key_id returns the public_key and the owner. This method directly checks the key `publicKey`"""

    public_key_data = actor.get("publicKey", {})

    if isinstance(public_key_data, list):
        if len(public_key_data) == 1:
            public_key_data = public_key_data[0]
        else:
            public_key_data = find_with_item(public_key_data, key_id)

    if not public_key_data:
        return None, None

    public_key = public_key_data.get("publicKeyPem")
    owner = public_key_data.get("owner")

    return public_key, owner

verify_actor

ActorVerifier dataclass

Class implementing the logic to verify a remote actor

Parameters:

Name Type Description Default
actor_list List[ActorData]

List of actors to run verification with

required
remote_uri str

URI of the remote actor. If acct-uri, it is resolved using webfinger

required
domain str

The domain the verification is done from

required
message Message

Message object containing an event log

required
remote_actor_uri str | None
None
scheme str
'http'
timeout int

The timeout for requests (in seconds)

20
Source code in fediverse_pasture/server/verify_actor.py
@dataclass
class ActorVerifier:
    """Class implementing the logic to verify a remote actor"""

    actor_list: List[ActorData] = field(
        metadata={"description": "List of actors to run verification with"}
    )
    remote_uri: str = field(
        metadata={
            "description": "URI of the remote actor. If acct-uri, it is resolved using webfinger"
        }
    )
    domain: str = field(
        metadata={"description": "The domain the verification is done from"}
    )
    message: Message = field(
        metadata={"description": "Message object containing an event log"}
    )

    remote_actor_uri: str | None = None
    scheme: str = "http"

    timeout: int = field(
        default=20, metadata={"description": "The timeout for requests (in seconds)"}
    )

    async def verify(self, only: dict = {}):
        """Public interface, this method loops over the actors in `actor_list`
        and verifies using them if the remote_uri is accessible.

        Returns a dictionary with actor.actor_name as key and the
        result as value.

        This function creates it's own aiohttp.ClientSession"""
        async with aiohttp.ClientSession() as session:
            await self.determine_remote_actor_uri(session)
            result = {
                actor.actor_name: await self.verify_for_actor(session, actor)
                for actor in self.actor_list
                if actor.actor_name != "actor" and only.get(actor.actor_name, True)
            }
            result["webfinger"] = await self.check_webfinger_result(session)

            return result

    @property
    def main_actor(self) -> ActorData | None:
        for actor in self.actor_list:
            if actor.actor_name == "actor":
                return actor
        return None

    async def init_bovine_actor(self, actor, session):
        actor_id = f"{self.scheme}://{self.domain}/{actor.actor_name}"
        bovine_actor, _ = bovine_actor_for_actor_data(actor_id, actor)
        await bovine_actor.init(session=session)

        return bovine_actor

    async def check_webfinger_result(self, session):
        bovine_actor = await self.init_bovine_actor(self.main_actor, session)

        try:
            async with asyncio.timeout(self.timeout):
                actor = await bovine_actor.get(self.remote_actor_uri)
        except Exception as e:
            logger.exception(e)
            return False

        if actor is None:
            self.message.error("Failed to fetch actor")
            return False

        remote_actor_uri = actor.get("id", self.remote_actor_uri)

        preferred_username = actor.get("preferredUsername")

        self.message.add(f"Got preferredUsername {preferred_username}")
        if preferred_username is None:
            return False

        domain = urlparse(self.remote_actor_uri).netloc
        acct_uri = f"acct:{preferred_username}@{domain}"

        self.message.add(f"computed acct uri {acct_uri}")

        try:
            object_id, _ = await lookup_uri_with_webfinger(
                session, acct_uri, f"{self.scheme}://{domain}"
            )
        except Exception as e:
            logger.exception(e)
            return False
        self.message.add(f"Retrieved id {object_id} using webfinger")

        if object_id == remote_actor_uri:
            self.message.add("webfinger result matches expectations")

            return True

        return False

    async def verify_for_actor(self, session, actor):
        bovine_actor = await self.init_bovine_actor(actor, session)
        self.message.add(f"Running verification for {actor.actor_name}")

        if actor.requires_signed_post_for_inbox:
            return await self.fetch_remote_and_post_using_actor(bovine_actor)
        else:
            return await self.fetch_remote_and_post_using_session(session)

    async def fetch_remote_and_post_using_actor(self, bovine_actor):
        result = {"get_actor": False, "post_inbox": False}

        try:
            async with asyncio.timeout(self.timeout):
                actor = await bovine_actor.get(self.remote_actor_uri)
                if actor is None:
                    self.message.add(
                        f"Failed to retrieve actor {self.remote_actor_uri}"
                    )
                    return result

                inbox = actor.get("inbox")
                self.message.add(f"Got inbox {inbox}")

                if inbox:
                    result["get_actor"] = True
                    try:
                        try:
                            response = await bovine_actor.post(
                                inbox,
                                data={
                                    "@context": "https://www.w3.org/ns/activitystreams",
                                    "type": "Like",
                                    "actor": bovine_actor.actor_id,
                                    "id": bovine_actor.actor_id
                                    + secrets.token_urlsafe(8),
                                    "object": bovine_actor.actor_id,
                                },
                            )
                            self.message.add("Successfully posted to inbox with result")
                            self.message.add(response.status)
                            self.message.add((await response.text()))

                        except aiohttp.ClientResponseError as e:
                            if e.status == 400:
                                self.message.add(
                                    "Successfully posted to inbox but remote server\
    indicated a bad request"
                                )
                            else:
                                raise e

                        result["post_inbox"] = True
                    except Exception as e:
                        self.message.add("Failed to post to inbox")
                        self.message.add(repr(e))

        except Exception as e:
            self.message.add("Something went wrong")
            self.message.add(repr(e))

        return result

    async def resolve_inbox_using_session(self, session) -> str:
        async with asyncio.timeout(self.timeout):
            async with session.get(
                self.remote_actor_uri,
                headers={"accept": "application/activity+json"},
            ) as response:
                actor = json.loads(await response.text())
            inbox = actor.get("inbox")
            self.message.add(f"Got inbox {inbox}")
            return inbox

    async def fetch_remote_and_post_using_session(self, session):
        result = {"get_actor": False, "post_inbox": False}

        try:
            inbox = await self.resolve_inbox_using_session(session)
        except Exception as e:
            self.message.add("Something went wrong when fetching actor")
            self.message.add(repr(e))
            return result

        if inbox:
            result["get_actor"] = True
            try:
                async with asyncio.timeout(self.timeout):
                    async with session.post(
                        inbox,
                        data=json.dumps(
                            {
                                "@context": "https://www.w3.org/ns/activitystreams",
                                "type": "EchoRequest",
                            }
                        ),
                        headers={"content-type": "application/activity+json"},
                    ) as response:
                        self.message.add(f"Got {response.status} for unsigned POST")

                        if response.status < 400 and response.status > 100:
                            result["post_inbox"] = True
            except Exception as e:
                self.message.add("Something went wrong when posting to inbox")
                self.message.add(repr(e))
                return result

        return result

    async def determine_remote_actor_uri(self, session) -> None:
        """Used to resolve an acct-URI into the corresponding http/https-URI
        using webfinger"""
        if can_be_resolved(self.remote_uri):
            self.remote_actor_uri = self.remote_uri
            self.message.add(f"Can fetch actor from {self.remote_uri}")
            return

        self.message.add(f"Need to resolve {self.remote_uri} to actor object id")

        if self.remote_uri.startswith("acct:"):
            acct_uri = self.remote_uri
        else:
            self.message.add("Not in account uri format")

            if self.remote_uri[0] == "@":
                acct_uri = "acct:" + self.remote_uri[1:]
            else:
                acct_uri = "acct:" + self.remote_uri

        if "@" not in acct_uri:
            self.message.add(f"Computed invalid account URI {acct_uri}")
            return
        domain = acct_uri.split("@")[1]

        self.message.add(f"Resolving {acct_uri} using webfinger")

        try:
            object_id, _ = await lookup_uri_with_webfinger(
                session, acct_uri, f"{self.scheme}://{domain}"
            )
        except Exception as e:
            logger.exception(e)
            return

        self.message.add(f"Resolved to {object_id}")
        self.remote_actor_uri = object_id

determine_remote_actor_uri(session) async

Used to resolve an acct-URI into the corresponding http/https-URI using webfinger

Source code in fediverse_pasture/server/verify_actor.py
async def determine_remote_actor_uri(self, session) -> None:
    """Used to resolve an acct-URI into the corresponding http/https-URI
    using webfinger"""
    if can_be_resolved(self.remote_uri):
        self.remote_actor_uri = self.remote_uri
        self.message.add(f"Can fetch actor from {self.remote_uri}")
        return

    self.message.add(f"Need to resolve {self.remote_uri} to actor object id")

    if self.remote_uri.startswith("acct:"):
        acct_uri = self.remote_uri
    else:
        self.message.add("Not in account uri format")

        if self.remote_uri[0] == "@":
            acct_uri = "acct:" + self.remote_uri[1:]
        else:
            acct_uri = "acct:" + self.remote_uri

    if "@" not in acct_uri:
        self.message.add(f"Computed invalid account URI {acct_uri}")
        return
    domain = acct_uri.split("@")[1]

    self.message.add(f"Resolving {acct_uri} using webfinger")

    try:
        object_id, _ = await lookup_uri_with_webfinger(
            session, acct_uri, f"{self.scheme}://{domain}"
        )
    except Exception as e:
        logger.exception(e)
        return

    self.message.add(f"Resolved to {object_id}")
    self.remote_actor_uri = object_id

verify(only={}) async

Public interface, this method loops over the actors in actor_list and verifies using them if the remote_uri is accessible.

Returns a dictionary with actor.actor_name as key and the result as value.

This function creates it’s own aiohttp.ClientSession

Source code in fediverse_pasture/server/verify_actor.py
async def verify(self, only: dict = {}):
    """Public interface, this method loops over the actors in `actor_list`
    and verifies using them if the remote_uri is accessible.

    Returns a dictionary with actor.actor_name as key and the
    result as value.

    This function creates it's own aiohttp.ClientSession"""
    async with aiohttp.ClientSession() as session:
        await self.determine_remote_actor_uri(session)
        result = {
            actor.actor_name: await self.verify_for_actor(session, actor)
            for actor in self.actor_list
            if actor.actor_name != "actor" and only.get(actor.actor_name, True)
        }
        result["webfinger"] = await self.check_webfinger_result(session)

        return result