Skip to content

fediverse_pasture.runner

ActivityRunner dataclass

Coordinates sending an activity to many applications through an ActivitySender instances

Parameters:

Name Type Description Default
activity_sender ActivitySender

an activity sender

required
applications List[ApplicationAdapterForLastActivity]

list of applications to run against

required
wait_time float

Time in seconds between trying to fetch the activity from remote servers

0.5
tries int

Number of tries to fetch activity from remote servers

20
skip_fetch bool

Skips fetching the result

False
Source code in fediverse_pasture/runner/__init__.py
@dataclass
class ActivityRunner:
    """Coordinates sending an activity to many applications through an ActivitySender
    instances"""

    activity_sender: ActivitySender = field(
        metadata={"description": " an activity sender"}
    )
    applications: List[ApplicationAdapterForLastActivity] = field(
        metadata={"description": "list of applications to run against"}
    )

    wait_time: float = field(
        default=0.5,
        metadata={
            "description": """Time in seconds between trying to fetch the activity from remote servers"""
        },
    )

    tries: int = field(
        default=20,
        metadata={
            "description": """Number of tries to fetch activity from remote servers"""
        },
    )

    skip_fetch: bool = field(
        default=False, metadata={"description": """Skips fetching the result"""}
    )

    async def fetch_activity(self, application, object_id):
        for _ in range(self.tries):
            result = await application.fetch_activity(object_id)
            if result:
                return result
            await asyncio.sleep(self.wait_time)

    async def run_for_modifier(self, modifier: Callable[[dict], dict]):
        """modifier has the same format as for ActivitySender

        :param modifier: modifies the base object being send"""
        self.activity_sender.init_create_note(modifier)

        async with asyncio.TaskGroup() as tg:
            for application in self.applications:
                tg.create_task(self.activity_sender.send(application.actor_uri))

        if self.skip_fetch:
            return

        await asyncio.sleep(self.wait_time)

        result = {"activity": self.activity_sender.activity}

        for application in self.applications:
            result[application.application_name] = await self.fetch_activity(
                application, self.activity_sender.object_id
            )

        return result

run_for_modifier(modifier) async

modifier has the same format as for ActivitySender

Parameters:

Name Type Description Default
modifier Callable[[dict], dict]

modifies the base object being send

required
Source code in fediverse_pasture/runner/__init__.py
async def run_for_modifier(self, modifier: Callable[[dict], dict]):
    """modifier has the same format as for ActivitySender

    :param modifier: modifies the base object being send"""
    self.activity_sender.init_create_note(modifier)

    async with asyncio.TaskGroup() as tg:
        for application in self.applications:
            tg.create_task(self.activity_sender.send(application.actor_uri))

    if self.skip_fetch:
        return

    await asyncio.sleep(self.wait_time)

    result = {"activity": self.activity_sender.activity}

    for application in self.applications:
        result[application.application_name] = await self.fetch_activity(
            application, self.activity_sender.object_id
        )

    return result

ActivitySender dataclass

The ActivitySender class serves as a way to coordinate the process of sending the same activity to multiple Fediverse applications.

The basic process is

  • Create an Activity with a published timestamp stored in published
  • Send this activity to applications using send
  • Retrieve the result from said applications

The usual way to create an ActivitySender is the for_actor method, i.e.

activity_sender = ActivitySender.for_actor(bovine_actor, actor_object)

Parameters:

Name Type Description Default
activity_factory ActivityFactory
required
object_factory ObjectFactory
required
bovine_actor BovineActor
required
make_id Callable
required
published datetime | None
None
note dict | None
None
activity dict | None
None
object_id str | None
None
sleep_after_getting_inbox bool
True
include_mention bool

Set to True if the created Note should mention the target

False
replace_https_with_http bool

Replaces ‘https://’ with ‘http://’ in the actor URI and the resolved inbox. Useful to handle applications not supporting a configuration value dedicated to handling the http based Fediverse.

False
public_value str

The default value to use for public, see https://codeberg.org/funfedidev/python_fediverse_pasture/issues/41

'https://www.w3.org/ns/activitystreams#Public'
Source code in fediverse_pasture/runner/__init__.py
@dataclass
class ActivitySender:
    """The ActivitySender class serves as a way to coordinate the process
    of sending the same activity to multiple Fediverse applications.

    The basic process is

    * Create an Activity with a published timestamp stored in published
    * Send this activity to applications using `send`
    * Retrieve the result from said applications

    The usual way to create an ActivitySender is the `for_actor` method,
    i.e.

    ```python
    activity_sender = ActivitySender.for_actor(bovine_actor, actor_object)
    ```
    """

    activity_factory: ActivityFactory
    object_factory: ObjectFactory
    bovine_actor: BovineActor
    make_id: Callable

    published: datetime | None = field(default=None)
    note: dict | None = field(default=None)
    activity: dict | None = field(default=None)
    object_id: str | None = field(default=None)

    sleep_after_getting_inbox: bool = field(default=True)

    include_mention: bool = field(
        default=False,
        metadata={
            "description": "Set to True if the created Note should mention the target"
        },
    )

    replace_https_with_http: bool = field(
        default=False,
        metadata={
            "description": "Replaces 'https://' with 'http://' in the actor URI and the resolved inbox. Useful to handle applications not supporting a configuration value dedicated to handling the http based Fediverse."
        },
    )

    public_value: str = field(
        default="https://www.w3.org/ns/activitystreams#Public",
        metadata={
            "description": "The default value to use for public, see https://codeberg.org/funfedidev/python_fediverse_pasture/issues/41"
        },
    )

    _inbox_cache: dict[str, str] = field(
        default_factory=dict,
        metadata={"description": "cacched values for the inbox lookup"},
    )

    def init_create_note(self, modifier: MessageModifier):
        """Sets activity to a Create for a Note. Here the Note is
        constructed from a skeleton by applying `modifier` to it.
        To successfully send the note to most applications, modifier
        should set the Note's content, i.e.

        ```python
        >>> from bovine.testing import actor
        >>> from unittest.mock import AsyncMock, Mock
        >>> actor_mock = Mock(id=actor["id"])
        >>> actor_mock.build.return_value = actor
        >>> sender = ActivitySender.for_actor(AsyncMock(), actor_mock)
        >>> sender.init_create_note(lambda x: {**x, "content": "text"})
        >>> sender.note
        {'type': 'Note',
            'attributedTo': 'http://actor.example',
            'to': ['https://www.w3.org/ns/activitystreams#Public'],
            'id': 'http://actor.example/object/...
            'published': '...
            'content': 'text'}

        ```

        This method can be used to create objects of other types
        by overriding "type".
        """
        self.object_id = self.make_id()

        note = self.object_factory.note(
            id=self.object_id,
            to={self.public_value},
        ).build()
        del note["@context"]
        self.note = modifier(note)
        self.published = datetime.fromisoformat(note["published"].removesuffix("Z"))

    async def send(self, remote: str):
        """Sends the activity to the remote user

        :param remote: Actor URI of the remote user"""

        if self.note is None:
            return

        note = {
            **self.note,
        }
        note["to"] = note["to"] + [remote]

        if self.include_mention:
            mention = {"type": "Mention", "href": remote}
            note["tag"] = combine_as_list(note.get("tag", []), [mention])

        create = self.activity_factory.create(
            note, id=self.make_id(activity=True)
        ).build()

        if "@context" in note:
            create["@context"] = note["@context"]
        else:
            create["@context"] = [
                create["@context"],
                {"Hashtag": "as:Hashtag", "sensitive": "as:sensitive"},
            ]
        self.activity = create

        try:
            remote_inbox = await self._resolve_inbox(remote)
            return await self.bovine_actor.post(remote_inbox, create)
        except ClientResponseError as e:
            logger.warning("Posting to inbox of %s failed with %s", remote, e.status)
            logger.exception(e)
            return None

    async def _resolve_inbox(self, remote: str) -> str:
        if remote in self._inbox_cache:
            return self._inbox_cache[remote]

        result = await self.bovine_actor.get(self.adjust_uri(remote))
        if not isinstance(result, dict) or "inbox" not in result:
            raise Exception(
                "Remote %s does not have an inbox, cannot post activity", remote
            )

        remote_inbox = self.adjust_uri(result["inbox"])

        self._inbox_cache[remote] = remote_inbox

        if self.sleep_after_getting_inbox:
            #
            # See https://codeberg.org/helge/funfedidev/issues/138#issuecomment-1640700
            #
            await asyncio.sleep(0.1)

        return remote_inbox

    def adjust_uri(self, uri):
        if not self.replace_https_with_http:
            return uri

        return uri.replace("https://", "http://")

    @staticmethod
    def for_actor(bovine_actor: BovineActor, actor_object: Actor):
        """Initializes the Activity Sender object for a given BovineActor
        and the corresponding actor object"""
        activity_factory, object_factory = factories_for_actor_object(
            actor_object.build()
        )

        if actor_object.id is None:
            raise ValueError("Actor id should be part of actor object")

        return ActivitySender(
            activity_factory=activity_factory,
            object_factory=object_factory,
            bovine_actor=bovine_actor,
            make_id=create_make_id_for_actor_id(actor_object.id),
        )

for_actor(bovine_actor, actor_object) staticmethod

Initializes the Activity Sender object for a given BovineActor and the corresponding actor object

Source code in fediverse_pasture/runner/__init__.py
@staticmethod
def for_actor(bovine_actor: BovineActor, actor_object: Actor):
    """Initializes the Activity Sender object for a given BovineActor
    and the corresponding actor object"""
    activity_factory, object_factory = factories_for_actor_object(
        actor_object.build()
    )

    if actor_object.id is None:
        raise ValueError("Actor id should be part of actor object")

    return ActivitySender(
        activity_factory=activity_factory,
        object_factory=object_factory,
        bovine_actor=bovine_actor,
        make_id=create_make_id_for_actor_id(actor_object.id),
    )

init_create_note(modifier)

Sets activity to a Create for a Note. Here the Note is constructed from a skeleton by applying modifier to it. To successfully send the note to most applications, modifier should set the Note’s content, i.e.

>>> from bovine.testing import actor
>>> from unittest.mock import AsyncMock, Mock
>>> actor_mock = Mock(id=actor["id"])
>>> actor_mock.build.return_value = actor
>>> sender = ActivitySender.for_actor(AsyncMock(), actor_mock)
>>> sender.init_create_note(lambda x: {**x, "content": "text"})
>>> sender.note
{'type': 'Note',
    'attributedTo': 'http://actor.example',
    'to': ['https://www.w3.org/ns/activitystreams#Public'],
    'id': 'http://actor.example/object/...
    'published': '...
    'content': 'text'}

This method can be used to create objects of other types by overriding “type”.

Source code in fediverse_pasture/runner/__init__.py
def init_create_note(self, modifier: MessageModifier):
    """Sets activity to a Create for a Note. Here the Note is
    constructed from a skeleton by applying `modifier` to it.
    To successfully send the note to most applications, modifier
    should set the Note's content, i.e.

    ```python
    >>> from bovine.testing import actor
    >>> from unittest.mock import AsyncMock, Mock
    >>> actor_mock = Mock(id=actor["id"])
    >>> actor_mock.build.return_value = actor
    >>> sender = ActivitySender.for_actor(AsyncMock(), actor_mock)
    >>> sender.init_create_note(lambda x: {**x, "content": "text"})
    >>> sender.note
    {'type': 'Note',
        'attributedTo': 'http://actor.example',
        'to': ['https://www.w3.org/ns/activitystreams#Public'],
        'id': 'http://actor.example/object/...
        'published': '...
        'content': 'text'}

    ```

    This method can be used to create objects of other types
    by overriding "type".
    """
    self.object_id = self.make_id()

    note = self.object_factory.note(
        id=self.object_id,
        to={self.public_value},
    ).build()
    del note["@context"]
    self.note = modifier(note)
    self.published = datetime.fromisoformat(note["published"].removesuffix("Z"))

send(remote) async

Sends the activity to the remote user

Parameters:

Name Type Description Default
remote str

Actor URI of the remote user

required
Source code in fediverse_pasture/runner/__init__.py
async def send(self, remote: str):
    """Sends the activity to the remote user

    :param remote: Actor URI of the remote user"""

    if self.note is None:
        return

    note = {
        **self.note,
    }
    note["to"] = note["to"] + [remote]

    if self.include_mention:
        mention = {"type": "Mention", "href": remote}
        note["tag"] = combine_as_list(note.get("tag", []), [mention])

    create = self.activity_factory.create(
        note, id=self.make_id(activity=True)
    ).build()

    if "@context" in note:
        create["@context"] = note["@context"]
    else:
        create["@context"] = [
            create["@context"],
            {"Hashtag": "as:Hashtag", "sensitive": "as:sensitive"},
        ]
    self.activity = create

    try:
        remote_inbox = await self._resolve_inbox(remote)
        return await self.bovine_actor.post(remote_inbox, create)
    except ClientResponseError as e:
        logger.warning("Posting to inbox of %s failed with %s", remote, e.status)
        logger.exception(e)
        return None

application

app_name_to_coroutine = {None: mastodon_likes, 'bovine': activity_for_bovine_provider('bovine', 'vanilla', 'z3u2Yxcowsarethebestcowsarethebestcowsarethebest'), 'misskey': activity_for_misskey_provider('misskey', 'kitty'), 'mitra': mitra_app, 'firefish': firefish_app, 'hubzilla': hubzilla_app, 'sharkey': activity_for_mastodon_provider('sharkey', 'willy', 'token', determine_actor_uri=True, application_name='sharkey'), 'custom': custom_app} module-attribute

Used to lookup the coroutine to get the application object

activity_for_firefish(domain, username, session) async

Creates a ApplicationAdapterForLastActivity object for connecting to firefish. Example usage:

firefish = await activity_for_firefish("firefish_web", "admin", session)
Source code in fediverse_pasture/runner/application/__init__.py
async def activity_for_firefish(
    domain: str, username: str, session: aiohttp.ClientSession
) -> ApplicationAdapterForLastActivity:
    """Creates a ApplicationAdapterForLastActivity object for connecting to
    firefish. Example usage:

    ```python
    firefish = await activity_for_firefish("firefish_web", "admin", session)
    ```
    """
    firefish = FirefishApplication(domain=domain, username=username)

    return await firefish.last_activity(session)

activity_for_mastodon(domain, username, access_token, session, application_name='mastodon', determine_actor_uri=False) async

Creates a ApplicationAdapterForLastActivity object for connecting to mastodon. Example usage:

mastodon = await activity_for_mastodon("mastodon_web", "bob", "xxx", session)
Source code in fediverse_pasture/runner/application/__init__.py
async def activity_for_mastodon(
    domain: str,
    username: str,
    access_token: str,
    session: aiohttp.ClientSession,
    application_name: str = "mastodon",
    determine_actor_uri: bool = False,
) -> ApplicationAdapterForLastActivity:
    """Creates a ApplicationAdapterForLastActivity object for connecting to
    mastodon. Example usage:

    ```python
    mastodon = await activity_for_mastodon("mastodon_web", "bob", "xxx", session)
    ```
    """

    actor_uri = None

    if determine_actor_uri:
        actor_uri, _ = await lookup_uri_with_webfinger(
            session, f"acct:{username}@{domain}", f"http://{domain}"
        )

    mastodon = MastodonApplication(
        domain=domain, access_token=access_token, username=username, actor_uri=actor_uri
    )

    return mastodon.last_activity(session, application_name=application_name)

actor_for_application(account_uri, application_name, session) async

Creates a ApplicationAdapterForActor

Parameters:

Name Type Description Default
account_uri str

The acct uri, e.g. acct:user@domain

required
application_name str

The name of the application

required
session ClientSession

the aiohttp ClientSession

required
Source code in fediverse_pasture/runner/application/__init__.py
async def actor_for_application(
    account_uri: str, application_name: str, session: aiohttp.ClientSession
) -> ApplicationAdapterForActor:
    """Creates a ApplicationAdapterForActor

    :param account_uri: The acct uri, e.g. `acct:user@domain`
    :param application_name: The name of the application
    :param session: the aiohttp ClientSession
    """

    domain = account_uri.split("@")[1]

    actor_uri, _ = await lookup_uri_with_webfinger(
        session, account_uri, f"http://{domain}"
    )

    if not actor_uri:
        raise ValueError(f"Actor not found with URI {account_uri}")

    return ApplicationAdapterForActor(
        actor_uri=actor_uri, application_name=application_name
    )

bovine

BovineApplication dataclass

BovineApplication(domain: str, username: str, secret: str, actor_uri: str | None = None, client: bovine.BovineClient | None = None)

Parameters:

Name Type Description Default
domain str
required
username str
required
secret str
required
actor_uri str | None
None
client BovineClient | None
None
Source code in fediverse_pasture/runner/application/bovine.py
@dataclass
class BovineApplication:
    domain: str
    username: str
    secret: str
    actor_uri: str | None = None
    client: BovineClient | None = None

    async def fetch_activity(self, published):
        try:
            inbox = self.client.inbox()

            result = await inbox.__anext__()
            parsed_published = datetime.fromisoformat(
                result["object"]["published"].removesuffix("Z")
            )
            if parsed_published == published:
                return result.get("object")
        except Exception as e:
            print("Something went wrong with", repr(e))
        return None

    async def last_activity(
        self, session: aiohttp.ClientSession
    ) -> ApplicationAdapterForLastActivity:
        self.client = BovineClient(
            domain=f"http://{self.domain}",
            secret=self.secret,
        )
        await self.client.init(session=session)

        return ApplicationAdapterForLastActivity(
            actor_uri=self.client.actor_id,
            fetch_activity=self.fetch_activity,
            application_name=self.domain,
        )

firefish

FirefishApplication dataclass

Used to query a firefish application

Parameters:

Name Type Description Default
domain str
required
username str
required
session ClientSession | None
None
token str
'VvGrKxhpIzJ1PomJFTBObYWOELgGniVi'
firefish_id str | None
None
Source code in fediverse_pasture/runner/application/firefish.py
@dataclass
class FirefishApplication:
    """Used to query a firefish application"""

    domain: str
    username: str
    session: aiohttp.ClientSession | None = None
    token: str = "VvGrKxhpIzJ1PomJFTBObYWOELgGniVi"

    firefish_id: str | None = None

    async def determine_actor_uri(self):
        actor_uri, _ = await bovine.clients.lookup_uri_with_webfinger(
            self.session, f"acct:{self.username}@{self.domain}", f"http://{self.domain}"
        )
        return actor_uri

    async def determine_actor_firefish_id(self):
        """Determines the firefish id for the user on the
        one_actor server, i.e. the one with hostname
        `pasture_one_actor"""
        response = await self.session.post(
            f"http://{self.domain}/api/users/search", data={"query": "actor"}
        )
        users = await response.json()

        for user in users:
            if user.get("host") == "pasture_one_actor":
                return user.get("id")

    async def user_post_with_published(self, published: datetime):
        if not self.firefish_id:
            self.firefish_id = await self.determine_actor_firefish_id()

        response = await self.session.post(
            f"http://{self.domain}/api/users/notes",
            data={"userId": self.firefish_id},
            headers={"Authorization": f"Bearer {self.token}"},
        )
        notes = await response.json()

        for data in notes:
            created_at = data.get("createdAt")
            created_at = datetime.fromisoformat(created_at.removesuffix("Z"))
            if created_at == published:
                return data

        return None

    async def top_public(self):
        response = await self.session.post(
            f"http://{self.domain}/api/notes/global-timeline"
        )
        public_timeline = await response.json()
        return public_timeline[0]

    async def top_public_with_published(self, published: datetime) -> dict | None:
        data = await self.top_public()
        created_at = data.get("createdAt")
        created_at = datetime.fromisoformat(created_at.removesuffix("Z"))
        if created_at == published:
            return data
        return None

    async def last_activity(
        self, session: aiohttp.ClientSession
    ) -> ApplicationAdapterForLastActivity:
        self.session = session

        actor_uri = await self.determine_actor_uri()

        return ApplicationAdapterForLastActivity(
            actor_uri=actor_uri,
            fetch_activity=self.user_post_with_published,
            application_name="firefish",
        )
determine_actor_firefish_id() async

Determines the firefish id for the user on the one_actor server, i.e. the one with hostname `pasture_one_actor

Source code in fediverse_pasture/runner/application/firefish.py
async def determine_actor_firefish_id(self):
    """Determines the firefish id for the user on the
    one_actor server, i.e. the one with hostname
    `pasture_one_actor"""
    response = await self.session.post(
        f"http://{self.domain}/api/users/search", data={"query": "actor"}
    )
    users = await response.json()

    for user in users:
        if user.get("host") == "pasture_one_actor":
            return user.get("id")

mastodon

MastodonApplication dataclass

MastodonApplication(domain: str, access_token: str, username: str, client: bovine.clients.bearer.BearerAuthClient | None = None, actor_uri: str | None = None)

Parameters:

Name Type Description Default
domain str
required
access_token str
required
username str
required
client BearerAuthClient | None
None
actor_uri str | None
None
Source code in fediverse_pasture/runner/application/mastodon.py
@dataclass
class MastodonApplication:
    domain: str
    access_token: str
    username: str
    client: BearerAuthClient | None = None
    actor_uri: str | None = None

    def determine_actor_uri(self):
        if self.actor_uri:
            return self.actor_uri
        return f"http://{self.domain}/users/{self.username}"

    async def public_items(self):
        response = await self.client.get(
            f"http://{self.domain}/api/v1/timelines/public"
        )
        public_timeline = await response.json()
        logger.debug(json.dumps(public_timeline, indent=2))

        return public_timeline

    async def top_public_with_object_id(self, object_id: str) -> dict | None:
        for item in await self.public_items():
            if object_id in json.dumps(item):
                return item

        return None

    def last_activity(
        self, session: aiohttp.ClientSession, application_name: str = "mastodon"
    ) -> ApplicationAdapterForLastActivity:
        self.client = BearerAuthClient(session, self.access_token)

        return ApplicationAdapterForLastActivity(
            actor_uri=self.determine_actor_uri(),
            fetch_activity=self.top_public_with_object_id,
            application_name=application_name,
        )

misskey

MisskeyApplication dataclass

Used to query a misskey application

Parameters:

Name Type Description Default
domain str
required
username str
required
session ClientSession | None
None
misskey_id str | None
None
Source code in fediverse_pasture/runner/application/misskey.py
@dataclass
class MisskeyApplication:
    """Used to query a misskey application"""

    domain: str
    username: str
    session: aiohttp.ClientSession | None = None

    misskey_id: str | None = None

    async def determine_actor_uri(self):
        actor_uri, _ = await bovine.clients.lookup_uri_with_webfinger(
            self.session, f"acct:{self.username}@{self.domain}", f"http://{self.domain}"
        )
        return actor_uri

    async def determine_misskey_id(self):
        response = await self.session.post(
            f"http://{self.domain}/api/users/search",
            json={"query": "actor"},
            headers={"content-type": "application/json"},
        )
        users = await response.json()

        for user in users:
            if user.get("host") == "pasture-one-actor":
                return user.get("id")

    async def public_posts(self):
        if not self.misskey_id:
            self.misskey_id = await self.determine_misskey_id()

        response = await self.session.post(
            "http://misskey/api/users/notes",
            json={"userId": self.misskey_id},
            headers={"Authorization": "Bearer token"},
        )
        return await response.json()

    async def user_post_with_object_id(self, object_id: str):
        notes = await self.public_posts()

        for data in notes:
            if object_id in json.dumps(data):
                return data

    async def last_activity(self) -> ApplicationAdapterForLastActivity:
        actor_uri = await self.determine_actor_uri()

        return ApplicationAdapterForLastActivity(
            actor_uri=actor_uri,
            fetch_activity=self.user_post_with_object_id,
            application_name="misskey",
        )

entry

Entry dataclass

Represents an entry to generate support tables from.

Parameters:

Name Type Description Default
entry Dict[str, Dict]

Dictionary indexed by application with values the result

required
Source code in fediverse_pasture/runner/entry.py
@dataclass
class Entry:
    """Represents an entry to generate support tables from."""

    entry: Dict[str, Dict] = field(
        metadata={
            "description": "Dictionary indexed by application with values the result"
        }
    )

    @property
    def applications(self) -> Set[str]:
        """Returns the set of application names"""
        return set(self.entry.keys())

    @property
    def activity(self) -> Dict:
        result = self.entry.get("activity")
        if result is None:
            raise ValueError("No activity data found in entry")
        return result

    @property
    def object(self):
        activity = self.activity
        obj = activity.get("object", {})
        return {"@context": activity["@context"], **obj}

    def present_for(self, application):
        return application in self.entry

    def as_tabs(self, apps: List[str]) -> List[str]:
        """Renders the data for each application as tabbed markdown.

        If no data is present "no result" is shown.

        :param apps: List of applications to display"""
        lines = []
        for application_name in apps:
            lines.append(f"""=== "{application_name}"\n""")

            if application_name in self.entry:
                text = f"""```json title="{application_name}"\n"""
                text += json.dumps(self.entry[application_name], indent=2)
                text += "\n```\n"
                lines += textwrap.indent(text, " " * 4).split("\n")
            else:
                lines.append("    no result")

            lines += [""]

        return [x + "\n" for x in lines]

    def as_grid(self, apps: List[str]) -> List[str]:
        """Renders the data for each application as a grid.

        If no data is present "no result" is shown.

        :param apps: List of applications to display"""
        lines = ["""<div class="grid" markdown>\n"""]
        for application_name in apps:
            if application_name in self.entry:
                text = f"""```json title="{application_name}"\n"""
                text += json.dumps(self.entry[application_name], indent=2)
                text += "\n```\n"
                lines += textwrap.indent(text, " " * 0).split("\n")
            else:
                lines.append("no result")

            lines += [""]

        lines.append("</div>")

        return [x + "\n" for x in lines]

    def apply_to(
        self, application: str, function: Callable[[Dict], List[str]]
    ) -> List[str]:
        """Applies the function to the entry for the given application.

        :param application: The application name
        :param function: extractor for the desired data"""
        try:
            data = self.entry.get(application)
            return function(data)
        except Exception as e:
            logger.exception(e)
            return ["-"]

    @staticmethod
    def from_result_list(result):
        val = {x["application_name"]: x for x in result}
        for x in val:
            del val[x]["application_name"]
        return Entry(val)

applications property

Returns the set of application names

apply_to(application, function)

Applies the function to the entry for the given application.

Parameters:

Name Type Description Default
application str

The application name

required
function Callable[[Dict], List[str]]

extractor for the desired data

required
Source code in fediverse_pasture/runner/entry.py
def apply_to(
    self, application: str, function: Callable[[Dict], List[str]]
) -> List[str]:
    """Applies the function to the entry for the given application.

    :param application: The application name
    :param function: extractor for the desired data"""
    try:
        data = self.entry.get(application)
        return function(data)
    except Exception as e:
        logger.exception(e)
        return ["-"]

as_grid(apps)

Renders the data for each application as a grid.

If no data is present “no result” is shown.

Parameters:

Name Type Description Default
apps List[str]

List of applications to display

required
Source code in fediverse_pasture/runner/entry.py
def as_grid(self, apps: List[str]) -> List[str]:
    """Renders the data for each application as a grid.

    If no data is present "no result" is shown.

    :param apps: List of applications to display"""
    lines = ["""<div class="grid" markdown>\n"""]
    for application_name in apps:
        if application_name in self.entry:
            text = f"""```json title="{application_name}"\n"""
            text += json.dumps(self.entry[application_name], indent=2)
            text += "\n```\n"
            lines += textwrap.indent(text, " " * 0).split("\n")
        else:
            lines.append("no result")

        lines += [""]

    lines.append("</div>")

    return [x + "\n" for x in lines]

as_tabs(apps)

Renders the data for each application as tabbed markdown.

If no data is present “no result” is shown.

Parameters:

Name Type Description Default
apps List[str]

List of applications to display

required
Source code in fediverse_pasture/runner/entry.py
def as_tabs(self, apps: List[str]) -> List[str]:
    """Renders the data for each application as tabbed markdown.

    If no data is present "no result" is shown.

    :param apps: List of applications to display"""
    lines = []
    for application_name in apps:
        lines.append(f"""=== "{application_name}"\n""")

        if application_name in self.entry:
            text = f"""```json title="{application_name}"\n"""
            text += json.dumps(self.entry[application_name], indent=2)
            text += "\n```\n"
            lines += textwrap.indent(text, " " * 4).split("\n")
        else:
            lines.append("    no result")

        lines += [""]

    return [x + "\n" for x in lines]

result_store

ResultStore

Source code in fediverse_pasture/runner/result_store.py
class ResultStore:
    async def add_result(self, test_name: str, application_name: str, data: dict):
        """Adds a result to the database. The pairs (test_name, application_name)
        are assumed to be unique. If data already exists, it is overwritten with
        the new data.

        :param test_name: Name of the test
        :param application_name: Name of the application
        :param data: Data to add
        """
        await TestRecord.update_or_create(
            test_name=test_name,
            application_name=application_name,
            defaults={"data": data},
        )

    async def delete_record(self, test_name: str, application_name: str):
        """Deletes database record if exists

        :param test_name: Name of the test
        :param application_name: Application to delete the result for
        """
        record = await TestRecord.get_or_none(
            test_name=test_name,
            application_name=application_name,
        )
        if record:
            await record.delete()

    async def results_for_test(self, test_name: str) -> list:
        """Retrieves the results for a given test_name"""
        result = await TestRecord.filter(test_name=test_name).all()

        return [{"application_name": x.application_name, **x.data} for x in result]

    async def entry_for_test(self, test_name: str) -> Entry:
        """Returns an Entry for test_name. We note here that an
        Entry contains the results for each application the test result
        is available for.

        :param test_name: name of the test entry
        :return: Entry from record
        """
        result = await TestRecord.filter(test_name=test_name).all()

        return Entry({x.application_name: x.data for x in result})

    async def as_list(self) -> list:
        records = await TestRecord.filter().order_by("test_name", "application_name")

        return [
            {
                "test_name": record.test_name,
                "application_name": record.application_name,
                "data": json.dumps(record.data, indent=2),
            }
            for record in records
        ]

    async def load(self, filename: str = "test_results.toml"):
        """Deletes the current content from the database, then loads
        the content from filename to it.

        :param filename: filename to load from"""

        await TestRecord.filter().delete()

        with open(filename, "rb") as fp:
            data = tomllib.load(fp)

        for x in data["entries"]:
            await self.add_result(
                test_name=x["test_name"],
                application_name=x["application_name"],
                data=json.loads(x["data"]),
            )

    async def save(self, filename: str = "test_results.toml"):
        """Saves the content of the database to the file given by filename

        :param filename: filename to save to
        """

        with open(filename, "wb") as fp:
            tomli_w.dump({"entries": await self.as_list()}, fp, multiline_strings=True)

add_result(test_name, application_name, data) async

Adds a result to the database. The pairs (test_name, application_name) are assumed to be unique. If data already exists, it is overwritten with the new data.

Parameters:

Name Type Description Default
test_name str

Name of the test

required
application_name str

Name of the application

required
data dict

Data to add

required
Source code in fediverse_pasture/runner/result_store.py
async def add_result(self, test_name: str, application_name: str, data: dict):
    """Adds a result to the database. The pairs (test_name, application_name)
    are assumed to be unique. If data already exists, it is overwritten with
    the new data.

    :param test_name: Name of the test
    :param application_name: Name of the application
    :param data: Data to add
    """
    await TestRecord.update_or_create(
        test_name=test_name,
        application_name=application_name,
        defaults={"data": data},
    )

delete_record(test_name, application_name) async

Deletes database record if exists

Parameters:

Name Type Description Default
test_name str

Name of the test

required
application_name str

Application to delete the result for

required
Source code in fediverse_pasture/runner/result_store.py
async def delete_record(self, test_name: str, application_name: str):
    """Deletes database record if exists

    :param test_name: Name of the test
    :param application_name: Application to delete the result for
    """
    record = await TestRecord.get_or_none(
        test_name=test_name,
        application_name=application_name,
    )
    if record:
        await record.delete()

entry_for_test(test_name) async

Returns an Entry for test_name. We note here that an Entry contains the results for each application the test result is available for.

Parameters:

Name Type Description Default
test_name str

name of the test entry

required

Returns:

Type Description
Entry

Entry from record

Source code in fediverse_pasture/runner/result_store.py
async def entry_for_test(self, test_name: str) -> Entry:
    """Returns an Entry for test_name. We note here that an
    Entry contains the results for each application the test result
    is available for.

    :param test_name: name of the test entry
    :return: Entry from record
    """
    result = await TestRecord.filter(test_name=test_name).all()

    return Entry({x.application_name: x.data for x in result})

load(filename='test_results.toml') async

Deletes the current content from the database, then loads the content from filename to it.

Parameters:

Name Type Description Default
filename str

filename to load from

'test_results.toml'
Source code in fediverse_pasture/runner/result_store.py
async def load(self, filename: str = "test_results.toml"):
    """Deletes the current content from the database, then loads
    the content from filename to it.

    :param filename: filename to load from"""

    await TestRecord.filter().delete()

    with open(filename, "rb") as fp:
        data = tomllib.load(fp)

    for x in data["entries"]:
        await self.add_result(
            test_name=x["test_name"],
            application_name=x["application_name"],
            data=json.loads(x["data"]),
        )

results_for_test(test_name) async

Retrieves the results for a given test_name

Source code in fediverse_pasture/runner/result_store.py
async def results_for_test(self, test_name: str) -> list:
    """Retrieves the results for a given test_name"""
    result = await TestRecord.filter(test_name=test_name).all()

    return [{"application_name": x.application_name, **x.data} for x in result]

save(filename='test_results.toml') async

Saves the content of the database to the file given by filename

Parameters:

Name Type Description Default
filename str

filename to save to

'test_results.toml'
Source code in fediverse_pasture/runner/result_store.py
async def save(self, filename: str = "test_results.toml"):
    """Saves the content of the database to the file given by filename

    :param filename: filename to save to
    """

    with open(filename, "wb") as fp:
        tomli_w.dump({"entries": await self.as_list()}, fp, multiline_strings=True)

with_store(db_url='sqlite://test_results.sqlite') async

Initializes the database and returns a ResultStore. Usage:

async with with_store() as store:
    await store.add_result(...)
    ...
    await store.results_for_test(...)

Parameters:

Name Type Description Default
db_url str

Database url string

'sqlite://test_results.sqlite'
Source code in fediverse_pasture/runner/result_store.py
@asynccontextmanager
async def with_store(db_url: str = "sqlite://test_results.sqlite") -> ResultStore:
    """Initializes the database and returns a ResultStore. Usage:

    ```python
    async with with_store() as store:
        await store.add_result(...)
        ...
        await store.results_for_test(...)
    ```

    :param db_url: Database url string
    """
    await Tortoise.init(
        config={
            "connections": {"default": db_url},
            "apps": {
                "models": {
                    "models": [
                        "fediverse_pasture.runner.models",
                    ],
                    "default_connection": "default",
                },
            },
        },
    )
    await Tortoise.generate_schemas()

    yield ResultStore()

    await Tortoise.close_connections()

verify_actor

VerifyActorRunner dataclass

Class to run query the verify runner application for various applications

Parameters:

Name Type Description Default
application ApplicationAdapterForActor
required
verify_actor_url str
'http://pasture_verify_actor/'
session ClientSession | None
None
Source code in fediverse_pasture/runner/verify_actor.py
@dataclass
class VerifyActorRunner:
    """Class to run query the verify runner application for
    various applications"""

    application: ApplicationAdapterForActor
    verify_actor_url: str = "http://pasture_verify_actor/"
    session: aiohttp.ClientSession | None = None

    async def run(self) -> list:
        """Runs the query. Returns a list containing the results
        for each application"""
        if not self.session:
            async with aiohttp.ClientSession() as session:
                return await self.run_for_session(session)

        return await self.run_for_session(self.session)

    async def run_for_session(self, session) -> dict:
        async with session.post(
            self.verify_actor_url,
            data=aiohttp.FormData({"actor_uri": self.application.actor_uri}),
            headers={
                "content_type": "application/x-www-form-urlencoded",
                "accept": "application/json",
            },
        ) as response:
            data = await response.json()
            return {
                "application_name": self.application.application_name,
                "verify_actor_table": data["result"],
                "messages": data["messages"],
            }

run() async

Runs the query. Returns a list containing the results for each application

Source code in fediverse_pasture/runner/verify_actor.py
async def run(self) -> list:
    """Runs the query. Returns a list containing the results
    for each application"""
    if not self.session:
        async with aiohttp.ClientSession() as session:
            return await self.run_for_session(session)

    return await self.run_for_session(self.session)

format_verify_actor_result(result, prefix='###')

Routine to convert the obtained result into a markdown string

Source code in fediverse_pasture/runner/verify_actor.py
def format_verify_actor_result(result, prefix="###"):
    """Routine to convert the obtained result into a markdown string"""
    lines = []
    lines.append(f"{prefix} Verify Actor")
    lines.append("")
    for entry in result:
        lines.append(f'''=== "{entry["application_name"]}"''')
        lines.append("")
        table = [
            ("| Name   | GET Actor | POST Inbox |"),
            ("| ------ | --------- | ---------- |"),
        ]
        for name, value in entry["verify_actor_table"].items():
            if name != "webfinger":
                x_get = x_or_not(value["get_actor"])
                x_post = x_or_not(value["post_inbox"])
                table.append(f"""| {name} | {x_get} | {x_post} |""")

        lines.append(textwrap.indent("\n".join(table), " " * 4))
        lines.append("")

    lines.append("")
    lines.append(f"{prefix} Messages")
    lines.append("")
    for entry in result:
        lines.append(f'''=== "{entry["application_name"]}"''')
        lines.append("")
        lines.append("    ```json")
        lines.append(textwrap.indent(entry["messages"], " " * 4))
        lines.append("    ```")
        lines.append("")

    return "\n".join(lines)