Source code for flogin.plugin

from __future__ import annotations

import asyncio
import json
import logging
import os
import re
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    AsyncIterable,
    Awaitable,
    Callable,
    Coroutine,
    Generic,
    Iterable,
    TypeVar,
    TypeVarTuple,
    overload,
)

from .default_events import get_default_events
from .errors import EnvNotSet, PluginNotInitialized
from .flow import FlowLauncherAPI, FlowSettings, PluginMetadata
from .jsonrpc import (
    ErrorResponse,
    ExecuteResponse,
    JsonRPCClient,
    QueryResponse,
    Result,
)
from .jsonrpc.responses import BaseResponse
from .query import Query
from .search_handler import SearchHandler
from .settings import Settings
from .utils import MISSING, cached_property, coro_or_gen, decorator, setup_logging

if TYPE_CHECKING:
    from typing_extensions import TypeVar

    from ._types import RawSettings, SearchHandlerCallback, SearchHandlerCondition

    SettingsT = TypeVar("SettingsT", default=Settings, bound=Settings)
else:
    SettingsT = TypeVar("SettingsT")

TS = TypeVarTuple("TS")
EventCallbackT = TypeVar(
    "EventCallbackT", bound=Callable[..., Coroutine[Any, Any, Any]]
)
LOG = logging.getLogger(__name__)

__all__ = ("Plugin",)


[docs] class Plugin(Generic[SettingsT]): r"""This class represents your plugin. This class implements a generic for a custom :class:`~flogin.settings.Settings` class for typechecking purposes. Parameters ----------- settings_no_update: Optional[:class:`bool`] Whether or not to let flow update flogin's version of the settings. This can be useful when using a custom settings menu. Defaults to ``False`` ignore_cancellation_requests: Optional[:class:`bool`] Whether or not to ignore cancellation requests sent from flow. Defaults to ``False`` """ __class_events__: list[str] = [] def __init__(self, **options: Any) -> None: self.options = options self._metadata: PluginMetadata | None = None self._search_handlers: list[SearchHandler] = [] self._results: dict[str, Result] = {} self._settings_are_populated: bool = False self._last_query: Query | None = None self._events: dict[str, Callable[..., Awaitable[Any]]] = get_default_events( self ) self.jsonrpc: JsonRPCClient = JsonRPCClient(self) # for event_name, event_callback in inspect.getmembers(self, lambda x: getattr(x, "__flogin_add_as_event__", False)): # self.register_event(event_callback, event_name) for event_name in self.__class_events__: self.register_event(getattr(self, event_name)) @property def last_query(self) -> Query | None: """:class:`~flogin.query.Query` | ``None``: The last query request that flow sent. This is ``None`` if no query request has been sent yet.""" return self._last_query @cached_property def api(self) -> FlowLauncherAPI: """:class:`~flogin.flow.api.FlowLauncherAPI`: An easy way to acess Flow Launcher's API""" return FlowLauncherAPI(self.jsonrpc) def _get_env(self, name: str, alternative: str | None = None) -> str: try: return os.environ[name] except KeyError: raise EnvNotSet(name, alternative) from None @cached_property def flow_version(self) -> str: """:class:`str`: the flow version from environment variables. .. versionadded:: 1.0.1 Raises ------ :class:`~flogin.errors.EnvNotSet` This is raised when the environment variable for this property is not set by flow or the plugin tester. """ return self._get_env("FLOW_VERSION", "flow_version") @cached_property def flow_application_dir(self) -> Path: """:class:`~pathlib.Path`: flow's application directory from environment variables. .. versionadded:: 1.0.1 Raises ------ :class:`~flogin.errors.EnvNotSet` This is raised when the environment variable for this property is not set by flow or the plugin tester. """ return Path(self._get_env("FLOW_APPLICATION_DIRECTORY", "flow_application_dir")) @cached_property def flow_program_dir(self) -> Path: """:class:`~pathlib.Path`: flow's application program from environment variables. .. versionadded:: 1.0.1 Raises ------ :class:`~flogin.errors.EnvNotSet` This is raised when the environment variable for this property is not set by flow or the plugin tester. """ return Path(self._get_env("FLOW_PROGRAM_DIRECTORY", "flow_program_dir")) @cached_property def settings(self) -> SettingsT: """:class:`~flogin.settings.Settings`: The plugin's settings set by the user""" fp = os.path.join( "..", "..", "Settings", "Plugins", self.metadata.name, "Settings.json" ) with open(fp, "r") as f: data = json.load(f) self._settings_are_populated = True LOG.debug(f"Settings filled from file: {data!r}") sets = Settings(data, no_update=self.options.get("settings_no_update", False)) return sets # type: ignore async def _run_event( self, coro: Callable[..., Awaitable[Any]], event_name: str, args: Iterable[Any], kwargs: dict[str, Any], error_handler: Callable[[Exception], Coroutine[Any, Any, Any]] | str = MISSING, ) -> Any: try: return await coro(*args, **kwargs) except asyncio.CancelledError: pass except Exception as e: if error_handler is MISSING: error_handler = "on_error" if isinstance(error_handler, str): return await self._events[error_handler](event_name, e, *args, **kwargs) else: return await error_handler(e) def _schedule_event( self, coro: Callable[..., Awaitable[Any]], event_name: str, args: Iterable[Any] = MISSING, kwargs: dict[str, Any] = MISSING, error_handler: Callable[[Exception], Coroutine[Any, Any, Any]] | str = MISSING, ) -> asyncio.Task: wrapped = self._run_event( coro, event_name, args or [], kwargs or {}, error_handler ) return asyncio.create_task(wrapped, name=f"flogin: {event_name}") def dispatch( self, event: str, *args: Any, **kwargs: Any ) -> None | asyncio.Task[None | BaseResponse]: method = f"on_{event}" # Special Event Cases replacements = { "on_initialize": "_initialize_wrapper", } method = replacements.get(method, method) LOG.debug("Dispatching event %s", method) event_callback = self._events.get(method) if event_callback: return self._schedule_event(event_callback, method, args, kwargs) async def _coro_or_gen_to_results( self, coro: Awaitable | AsyncIterable ) -> list[Result] | ErrorResponse: results = [] raw_results = await coro_or_gen(coro) if raw_results is None: return results if isinstance(raw_results, ErrorResponse): return raw_results if isinstance(raw_results, dict): res = Result.from_dict(raw_results) self._results[res.slug] = res results.append(res) else: if not isinstance(raw_results, list): raw_results = [raw_results] for raw_res in raw_results: res = Result.from_anything(raw_res) self._results[res.slug] = res results.append(res) return results async def _initialize_wrapper(self, arg: dict[str, Any]) -> ExecuteResponse: LOG.info(f"Initialize: {json.dumps(arg)}") self._metadata = PluginMetadata(arg["currentPluginMetadata"], self.api) self.dispatch("initialization") return ExecuteResponse(hide=False) async def process_context_menus( self, data: list[Any] ) -> QueryResponse | ErrorResponse: LOG.debug(f"Context Menu Handler: {data=}") if not data: results = [] else: result = self._results.get(data[0]) if result is not None: result.plugin = self task = self._schedule_event( self._coro_or_gen_to_results, event_name=f"ContextMenu-{result.slug}", args=[result.context_menu()], error_handler=lambda e: self._coro_or_gen_to_results( result.on_context_menu_error(e) ), ) results = await task else: results = [] if isinstance(results, ErrorResponse): return results return QueryResponse(results, self.settings._get_updates()) async def process_search_handlers( self, query: Query ) -> QueryResponse | ErrorResponse: results = [] for handler in self._search_handlers: handler.plugin = self if handler.condition(query): task = self._schedule_event( self._coro_or_gen_to_results, event_name=f"SearchHandler-{handler.name}", args=[handler.callback(query)], error_handler=lambda e: self._coro_or_gen_to_results( handler.on_error(query, e) ), ) results = await task break if isinstance(results, ErrorResponse): return results return QueryResponse(results, self.settings._get_updates()) @property def metadata(self) -> PluginMetadata: """ Returns the plugin's metadata. Raises -------- :class:`~flogin.errors.PluginNotInitialized` This gets raised if the plugin hasn't been initialized yet """ if self._metadata: return self._metadata raise PluginNotInitialized()
[docs] async def start(self): r"""|coro| The default startup/setup method. This can be overriden for advanced startup behavior, but make sure to run ``await super().start()`` to actually start your plugin. """ import aioconsole reader, writer = await aioconsole.get_standard_streams() await self.jsonrpc.start_listening(reader, writer)
[docs] def run(self, *, setup_default_log_handler: bool = True) -> None: r"""The default runner. This runs the :func:`~flogin.plugin.Plugin.start` coroutine, and setups up logging. Parameters -------- setup_default_log_handler: :class:`bool` Whether to setup the default log handler or not, defaults to `True`. """ if setup_default_log_handler: setup_logging() try: asyncio.run(self.start()) except Exception as e: LOG.exception( f"A fatal error has occured which crashed flogin: {e}", exc_info=e )
[docs] def register_search_handler(self, handler: SearchHandler[Any]) -> None: r"""Register a new search handler See the :ref:`search handler section <search_handlers>` for more information about using search handlers. Parameters ----------- handler: :class:`~flogin.search_handler.SearchHandler` The search handler to be registered """ self._search_handlers.append(handler) LOG.info(f"Registered search handler: {handler}")
[docs] def register_search_handlers(self, *handlers: SearchHandler[Any]) -> None: r"""Register new search handlers See the :ref:`search handler section <search_handlers>` for more information about using search handlers. Parameters ----------- *handlers: list[:class:`~flogin.search_handler.SearchHandler`] The search handlers to be registered """ for handler in handlers: self.register_search_handler(handler)
[docs] def register_event( self, callback: Callable[..., Coroutine[Any, Any, Any]], name: str | None = None ) -> None: """Registers an event to listen for. See the :func:`~flogin.plugin.Plugin.event` decorator for another method of registering events. All events must be a :ref:`coroutine <coroutine>`. .. NOTE:: See the :ref:`event reference <events>` to see what valid events there are. Parameters ----------- callback: :ref:`coroutine <coroutine>` The :ref:`coroutine <coroutine>` to be executed with the event name: Optional[:class:`str`] The name of the event to be registered. Defaults to the callback's name. """ self._events[name or callback.__name__] = callback
[docs] @decorator(is_factory=False) def event(self, callback: EventCallbackT) -> EventCallbackT: """A decorator that registers an event to listen for. This decorator can be used with a plugin instance or as a classmethod. All events must be a :ref:`coroutine <coroutine>`. .. versionchanged:: 1.1.0 The decorator can now be used as a classmethod .. NOTE:: See the :ref:`event reference <events>` to see what valid events there are. Example --------- With a plugin instance: .. code-block:: python3 @plugin.event async def on_initialization(): print('Ready!') As a classmethod: .. code-block:: python3 class MyPlugin(Plugin): @Plugin.event async def on_initialization(self): print('Ready!') """ self.register_event(callback) return callback
@event.classmethod @classmethod def __event_classmethod_deco(cls, callback: EventCallbackT) -> EventCallbackT: # setattr(callback, "__flogin_add_as_event__", True) cls.__class_events__.append(callback.__name__) return callback @overload def search( self, condition: SearchHandlerCondition ) -> Callable[[SearchHandlerCallback], SearchHandler]: ... @overload def search( self, *, text: str ) -> Callable[[SearchHandlerCallback], SearchHandler]: ... @overload def search( self, *, pattern: re.Pattern | str = MISSING, ) -> Callable[[SearchHandlerCallback], SearchHandler]: ... @overload def search( self, *, keyword: str = MISSING, ) -> Callable[[SearchHandlerCallback], SearchHandler]: ... @overload def search( self, *, allowed_keywords: Iterable[str] = MISSING, ) -> Callable[[SearchHandlerCallback], SearchHandler]: ... @overload def search( self, *, disallowed_keywords: Iterable[str] = MISSING, ) -> Callable[[SearchHandlerCallback], SearchHandler]: ... @overload def search( self, ) -> Callable[[SearchHandlerCallback], SearchHandler]: ...
[docs] def search( self, condition: SearchHandlerCondition | None = None, *, text: str = MISSING, pattern: re.Pattern | str = MISSING, keyword: str = MISSING, allowed_keywords: Iterable[str] = MISSING, disallowed_keywords: Iterable[str] = MISSING, ) -> Callable[[SearchHandlerCallback], SearchHandler]: """A decorator that registers a search handler. All search handlers must be a :ref:`coroutine <coroutine>`. See the :ref:`search handler section <search_handlers>` for more information about using search handlers. Parameters ---------- condition: Optional[:ref:`condition <condition_example>`] The condition to determine which queries this handler should run on. If given, this should be the only argument given. text: Optional[:class:`str`] A kwarg to quickly add a :class:`~flogin.conditions.PlainTextCondition`. If given, this should be the only argument given. pattern: Optional[:class:`re.Pattern` | :class:`str`] A kwarg to quickly add a :class:`~flogin.conditions.RegexCondition`. If given, this should be the only argument given. keyword: Optional[:class:`str`] A kwarg to quickly set the condition to a :class:`~flogin.conditions.KeywordCondition` condition with the ``keyword`` kwarg being the only allowed keyword. allowed_keywords: Optional[Iterable[:class:`str`]] A kwarg to quickly set the condition to a :class:`~flogin.conditions.KeywordCondition` condition with the kwarg being the list of allowed keywords. disallowed_keywords: Optional[Iterable[:class:`str`]] A kwarg to quickly set the condition to a :class:`~flogin.conditions.KeywordCondition` condition with the kwarg being the list of disallowed keywords. Example --------- .. code-block:: python3 @plugin.on_search() async def example_search_handler(data: Query): return "This is a result!" """ if condition is None: condition = SearchHandler._builtin_condition_kwarg_to_obj( text=text, pattern=pattern, keyword=keyword, allowed_keywords=allowed_keywords, disallowed_keywords=disallowed_keywords, ) def inner(func: SearchHandlerCallback) -> SearchHandler: handler = SearchHandler() if condition: handler.condition = condition # type: ignore handler.callback = func # type: ignore # type is the same self.register_search_handler(handler) return handler return inner
[docs] def fetch_flow_settings(self) -> FlowSettings: """Fetches flow's settings from flow's config file Returns -------- :class:`~flogin.flow.settings.FlowSettings` A dataclass containing all of flow's settings """ path = os.path.join("..", "..", "Settings", "Settings.json") with open(path, "r") as f: data = json.load(f) return FlowSettings(data)