Source code for flogin.plugin

from __future__ import annotations

import asyncio
import json
import logging
import os
from collections.abc import Awaitable, Callable, Coroutine, Iterable
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    ClassVar,
    Generic,
    Literal,
    TypeVar,
    TypeVarTuple,
    cast,
    overload,
)

from .default_events import get_default_events
from .errors import EnvNotSet, PluginNotInitialized
from .flow import FlowLauncherAPI, FlowSettings, PluginMetadata
from .jsonrpc import (
    ErrorResponse,
    ExecuteResponse,
    JsonRPCClient,
    QueryResponse,
    Result,
)
from .search_handler import SearchHandler
from .settings import Settings
from .utils import (
    MISSING,
    add_classmethod_alt,
    cached_property,
    coro_or_gen,
    decorator,
    func_with_self,
    setup_logging,
)

if TYPE_CHECKING:
    # Everything in this branch is only seen by static type checkers; it is
    # never executed at runtime.
    import re
    from asyncio.streams import StreamReader, StreamWriter

    # Rebinds ``TypeVar`` (already imported from ``typing`` above) for type
    # checkers only. NOTE(review): presumably needed because the ``default=``
    # keyword used below is not available on ``typing.TypeVar`` for every
    # supported Python version -- confirm.
    from typing_extensions import TypeVar  # noqa: TC004

    from ._types.search_handlers import (
        ConvertableToResult,
        SearchHandlerCallback,
        SearchHandlerCallbackReturns,
        SearchHandlerCallbackWithSelf,
        SearchHandlerCondition,
    )
    from .query import Query

    # Type-checker view of the settings type parameter: bound to ``Settings``
    # and defaulting to ``Settings`` when ``Plugin`` is left unparameterised.
    SettingsT = TypeVar("SettingsT", default=Settings, bound=Settings)
else:
    # Runtime view of the same type parameter: a plain, unconstrained TypeVar.
    SettingsT = TypeVar("SettingsT")

# Variadic type variable declared at module level.
TS = TypeVarTuple("TS")
# Type variable for event callbacks: any callable that returns a coroutine.
EventCallbackT = TypeVar(
    "EventCallbackT", bound=Callable[..., Coroutine[Any, Any, Any]]
)
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# ``Plugin`` is the only name exported by ``from <module> import *``.
__all__ = ("Plugin",)


class Plugin(Generic[SettingsT]):
    r"""This class represents your plugin.

    Parameters
    -----------
    settings_no_update: Optional[:class:`bool`]
        Whether or not to let flow update flogin's version of the settings. This can be useful when using a custom settings menu. Defaults to ``False``
    ignore_cancellation_requests: Optional[:class:`bool`]
        Whether or not to ignore cancellation requests sent from flow. Defaults to ``False``
    disable_log_override_files: Optional[:class:`bool`]
        Whether or not to disable the log override files. Defaults to ``False``. See :doc:`log-override-files` for more information on this.
    """

    # Names of events registered through the classmethod form of ``event``.
    # They are resolved with ``getattr(self, name)`` in ``__init__``.
    __class_events__: ClassVar[list[str]] = []
    # Search handlers registered through the classmethod form of ``search``.
    __class_search_handlers__: ClassVar[list[SearchHandler]] = []

    def __init__(self, **options: Any) -> None:
        self.options = options
        self._metadata: PluginMetadata | None = None
        self._search_handlers: list[SearchHandler] = []
        # Results that have been produced so far, keyed by slug, so later
        # context-menu/action requests can be routed back to them.
        self._results: dict[str, Result] = {}
        self._settings_are_populated: bool = False
        self._last_query: Query | None = None

        self._events: dict[str, Callable[..., Awaitable[Any]]] = get_default_events(
            self
        )
        self.jsonrpc: JsonRPCClient = JsonRPCClient(self)

        # Bind everything that was registered at class level to this instance.
        for event_name in self.__class_events__:
            self.register_event(getattr(self, event_name))

        for handler in self.__class_search_handlers__:
            setattr(handler.callback, "owner", self)
            self.register_search_handler(handler)

        self.check_for_log_override_files()

    def check_for_log_override_files(self) -> bool | None:
        """Apply the ``*.flogin.debug`` / ``*.flogin.prod`` log override files.

        The current working directory is searched for override files. A debug
        file takes precedence over a prod file.

        Returns
        --------
        Optional[:class:`bool`]
            ``None`` if nothing was changed (no override file was found, or the
            ``disable_log_override_files`` option is set), ``True`` if a prod
            file was found, ``False`` if a debug file was found.
        """

        if self.options.get("disable_log_override_files"):
            return None

        # Imported at call time so the *current* value of the module global is
        # read on every call, not a snapshot taken at import time.
        from .utils import _logging_formatter_status

        cwd = Path.cwd()

        if any(cwd.glob("*.flogin.debug")):
            if _logging_formatter_status is None:
                setup_logging()
                log.info("Debug file found, logs are now enabled.")
            return False

        if any(cwd.glob("*.flogin.prod")):
            if _logging_formatter_status is not None:
                log.info("Prod file found. Logs are being disabled")
                logger, handler = _logging_formatter_status
                handler.close()
                logger.removeHandler(handler)
            return True

        return None

    @property
    def last_query(self) -> Query | None:
        """:class:`~flogin.query.Query` | ``None``: The last query request that flow sent. This is ``None`` if no query request has been sent yet."""
        return self._last_query

    @cached_property
    def api(self) -> FlowLauncherAPI:
        """:class:`~flogin.flow.api.FlowLauncherAPI`: An easy way to access Flow Launcher's API"""
        return FlowLauncherAPI(self.jsonrpc)

    def _get_env(self, name: str, alternative: str | None = None) -> str:
        """Return the environment variable ``name``.

        Raises :class:`~flogin.errors.EnvNotSet` (built from ``name`` and
        ``alternative``) when the variable is not set.
        """
        try:
            return os.environ[name]
        except KeyError:
            raise EnvNotSet(name, alternative) from None

    @cached_property
    def flow_version(self) -> str:
        """:class:`str`: the flow version from environment variables.

        .. versionadded:: 1.0.1

        Raises
        ------
        :class:`~flogin.errors.EnvNotSet`
            This is raised when the environment variable for this property is not set by flow or the plugin tester.
        """
        return self._get_env("FLOW_VERSION", "flow_version")

    @cached_property
    def flow_application_dir(self) -> Path:
        """:class:`~pathlib.Path`: flow's application directory from environment variables.

        .. versionadded:: 1.0.1

        Raises
        ------
        :class:`~flogin.errors.EnvNotSet`
            This is raised when the environment variable for this property is not set by flow or the plugin tester.
        """
        return Path(self._get_env("FLOW_APPLICATION_DIRECTORY", "flow_application_dir"))

    @cached_property
    def flow_program_dir(self) -> Path:
        """:class:`~pathlib.Path`: flow's program directory from environment variables.

        .. versionadded:: 1.0.1

        Raises
        ------
        :class:`~flogin.errors.EnvNotSet`
            This is raised when the environment variable for this property is not set by flow or the plugin tester.
        """
        return Path(self._get_env("FLOW_PROGRAM_DIRECTORY", "flow_program_dir"))

    @cached_property
    def settings(self) -> SettingsT:
        """:class:`~flogin.settings.Settings`: The plugin's settings set by the user

        The settings are read once from
        ``../../Settings/Plugins/<plugin name>/Settings.json`` (relative to the
        current working directory) and then cached.

        Raises
        ------
        :class:`~flogin.errors.PluginNotInitialized`
            This is raised if the plugin's metadata is not available yet.
        """

        fp = os.path.join(
            "..", "..", "Settings", "Plugins", self.metadata.name, "Settings.json"
        )
        # JSON is UTF-8 encoded. Without an explicit encoding the locale's
        # code page would be used, which corrupts non-ASCII values on Windows.
        with open(fp, encoding="utf-8") as f:
            data = json.load(f)
        self._settings_are_populated = True
        log.debug("Settings filled from file: %r", data)
        return Settings(data, no_update=self.options.get("settings_no_update", False))  # type: ignore[reportReturnType]

    async def _run_event(
        self,
        coro: Callable[..., Awaitable[Any]],
        event_name: str,
        args: Iterable[Any],
        kwargs: dict[str, Any],
        error_handler: Callable[[Exception], Coroutine[Any, Any, Any]] | str = MISSING,
    ) -> Any:
        """Await ``coro(*args, **kwargs)`` and route failures to an error handler.

        ``error_handler`` is either the name of a registered event, which is
        called with ``(event_name, error, *args, **kwargs)``, or a callable
        taking only the error. It defaults to the ``on_error`` event.
        """
        try:
            return await coro(*args, **kwargs)
        except asyncio.CancelledError:
            # Cancellation is swallowed: the task then completes with ``None``.
            pass
        except Exception as e:
            if error_handler is MISSING:
                error_handler = "on_error"
            if isinstance(error_handler, str):
                return await self._events[error_handler](event_name, e, *args, **kwargs)
            return await error_handler(e)

    def _schedule_event(
        self,
        coro: Callable[..., Awaitable[Any]],
        event_name: str,
        args: Iterable[Any] = MISSING,
        kwargs: dict[str, Any] = MISSING,
        error_handler: Callable[[Exception], Coroutine[Any, Any, Any]] | str = MISSING,
    ) -> asyncio.Task[Any]:
        """Wrap ``coro`` with :meth:`_run_event` and start it as a named task."""
        wrapped = self._run_event(
            coro, event_name, args or [], kwargs or {}, error_handler
        )
        return asyncio.create_task(wrapped, name=f"flogin: {event_name}")

    def dispatch(
        self, event: str, *args: Any, **kwargs: Any
    ) -> None | asyncio.Task[Any]:
        """Schedule the callback registered for ``on_<event>``.

        Returns the created task, or ``None`` when no callback is registered
        for the event.
        """
        method = f"on_{event}"

        # Special Event Cases
        replacements = {
            "on_initialize": "_initialize_wrapper",
        }
        method = replacements.get(method, method)

        log.debug("Dispatching event %s", method)

        event_callback = self._events.get(method)
        if event_callback:
            return self._schedule_event(event_callback, method, args, kwargs)
        return None

    async def _coro_or_gen_to_results(
        self, coro: SearchHandlerCallbackReturns
    ) -> list[Result] | ErrorResponse:
        """Resolve a handler's return value into a list of results.

        Every produced result is also stored in ``self._results`` under its
        slug. An :class:`ErrorResponse` is passed through unchanged.
        """
        results: list[Result] = []
        raw_results = await coro_or_gen(coro)

        if raw_results is None:
            return results
        if isinstance(raw_results, ErrorResponse):
            return raw_results
        if not isinstance(raw_results, list):
            # A single value is treated as a one-element list.
            raw_results = [raw_results]

        for raw_res in cast("list[ConvertableToResult]", raw_results):
            res = Result.from_anything(raw_res)
            self._results[res.slug] = res
            results.append(res)
        return results

    async def _initialize_wrapper(self, arg: dict[str, Any]) -> ExecuteResponse:
        """Store the plugin metadata, then dispatch the ``initialization`` event."""
        log.debug("Initialize: %r", arg)
        self._metadata = PluginMetadata(arg["currentPluginMetadata"], self.api)
        self.dispatch("initialization")
        return ExecuteResponse(hide=False)

    async def process_context_menus(
        self, data: list[Any]
    ) -> QueryResponse | ErrorResponse:
        """Build the context menu for the result whose slug is ``data[0]``.

        An empty ``data`` list or an unknown slug yields an empty response.
        """
        log.debug("Context Menu Handler: data=%r", data)

        results: list[Result] | ErrorResponse = []
        result = self._results.get(data[0]) if data else None

        if result is not None:
            result.plugin = self
            task = self._schedule_event(
                self._coro_or_gen_to_results,
                event_name=f"ContextMenu-{result.slug}",
                args=[result.context_menu()],
                error_handler=lambda e: self._coro_or_gen_to_results(
                    result.on_context_menu_error(e)
                ),
            )
            results = await task

        if isinstance(results, ErrorResponse):
            return results
        return QueryResponse(results, self.settings._get_updates())

    async def process_search_handlers(
        self, query: Query
    ) -> QueryResponse | ErrorResponse:
        """Run the first registered search handler whose condition accepts ``query``."""
        results: list[Result] | ErrorResponse = []

        for handler in self._search_handlers:
            handler.plugin = self
            if not handler.condition(query):
                continue

            # The lambda is only ever called while this iteration is awaiting
            # the task below, so capturing the loop variable is safe here.
            task = self._schedule_event(
                self._coro_or_gen_to_results,
                event_name=f"SearchHandler-{handler.name}",
                args=[handler.callback(query)],
                error_handler=lambda e: self._coro_or_gen_to_results(
                    handler.on_error(query, e)
                ),
            )
            results = await task
            break

        if isinstance(results, ErrorResponse):
            return results
        return QueryResponse(results, self.settings._get_updates())

    async def _action_callback_wrapper(
        self, callback: Callable[[], Awaitable[ExecuteResponse | bool | None]]
    ) -> ExecuteResponse:
        """Await ``callback`` and normalise its return value to an :class:`ExecuteResponse`.

        A :class:`bool` is used as the ``hide`` value and ``None`` becomes a
        default response. Anything else is returned as-is.
        """
        value = await callback()

        if isinstance(value, bool):
            return ExecuteResponse(hide=value)
        if value is None:
            return ExecuteResponse()
        return value

    def process_action(self, method: str) -> asyncio.Task[ExecuteResponse] | None:
        """Schedule the callback of the result addressed by ``method``.

        ``method`` is the result's slug, optionally prefixed with
        ``flogin.action.``. Returns ``None`` when no such result is known.
        """
        slug = method.removeprefix("flogin.action.")
        result = self._results.get(slug)
        if result is None:
            return None

        result.plugin = self
        return self._schedule_event(
            self._action_callback_wrapper,
            method,
            args=[result.callback],
            error_handler=result.on_error,
        )

    @property
    def metadata(self) -> PluginMetadata:
        """
        Returns the plugin's metadata.

        Raises
        --------
        :class:`~flogin.errors.PluginNotInitialized`
            This gets raised if the plugin hasn't been initialized yet
        """
        if self._metadata is not None:
            return self._metadata
        raise PluginNotInitialized()

    async def start(self) -> None:
        r"""|coro|

        The default startup/setup method. This can be overridden for advanced startup behavior, but make sure to run ``await super().start()`` to actually start your plugin.
        """

        # Imported lazily, so aioconsole is only needed once the plugin starts.
        import aioconsole

        get_standard_streams: Awaitable[tuple[StreamReader, StreamWriter]] = (
            aioconsole.get_standard_streams()  # type: ignore
        )
        reader, writer = await get_standard_streams
        await self.jsonrpc.start_listening(reader, writer)

    def run(self, *, setup_default_log_handler: bool = True) -> None:
        r"""The default runner. This runs the :func:`~flogin.plugin.Plugin.start` coroutine, and sets up logging.

        Parameters
        --------
        setup_default_log_handler: :class:`bool`
            Whether to setup the default log handler or not, defaults to `True`.
        """

        # An override file (debug or prod) wins over the default log handler.
        log_status = self.check_for_log_override_files()
        if log_status is None and setup_default_log_handler:
            setup_logging()

        try:
            asyncio.run(self.start())
        except Exception as e:
            log.exception("A fatal error has occurred which crashed flogin", exc_info=e)

    def register_search_handler(self, handler: SearchHandler[Any]) -> None:
        r"""Register a new search handler

        See the :ref:`search handler section <search_handlers>` for more information about using search handlers.

        Parameters
        -----------
        handler: :class:`~flogin.search_handler.SearchHandler`
            The search handler to be registered
        """

        self._search_handlers.append(handler)
        log.debug("Registered search handler: %r", handler)

    def register_search_handlers(self, *handlers: SearchHandler[Any]) -> None:
        r"""Register new search handlers

        See the :ref:`search handler section <search_handlers>` for more information about using search handlers.

        Parameters
        -----------
        *handlers: list[:class:`~flogin.search_handler.SearchHandler`]
            The search handlers to be registered
        """

        for handler in handlers:
            self.register_search_handler(handler)

    def register_event(
        self, callback: Callable[..., Coroutine[Any, Any, Any]], name: str | None = None
    ) -> None:
        """Registers an event to listen for. See the :func:`~flogin.plugin.Plugin.event` decorator for another method of registering events.

        All events must be a :ref:`coroutine <coroutine>`.

        .. NOTE::
            See the :ref:`event reference <events>` to see what valid events there are.

        Parameters
        -----------
        callback: :ref:`coroutine <coroutine>`
            The :ref:`coroutine <coroutine>` to be executed with the event
        name: Optional[:class:`str`]
            The name of the event to be registered. Defaults to the callback's name.
        """

        self._events[name or callback.__name__] = callback

    @classmethod
    def __event_classmethod_deco(cls, callback: EventCallbackT) -> EventCallbackT:
        # Classmethod form of ``event``: only the name is recorded here, the
        # bound method is looked up and registered in ``__init__``.
        cls.__class_events__.append(callback.__name__)
        return callback

    @decorator
    @add_classmethod_alt(__event_classmethod_deco)
    def event(self, callback: EventCallbackT) -> EventCallbackT:
        """A decorator that registers an event to listen for. This decorator can be used with a plugin instance or as a classmethod.

        All events must be a :ref:`coroutine <coroutine>`.

        .. versionchanged:: 1.1.0
            The decorator can now be used as a classmethod

        .. NOTE::
            See the :ref:`event reference <events>` to see what valid events there are.

        Example
        ---------
        With a plugin instance:

        .. code-block:: python3

            from flogin.utils import print

            @plugin.event
            async def on_initialization():
                print('Ready!')

        As a classmethod:

        .. code-block:: python3

            from flogin.utils import print

            class MyPlugin(Plugin):
                @Plugin.event
                async def on_initialization(self):
                    print('Ready!')
        """

        self.register_event(callback)
        return callback

    @overload
    @classmethod
    def __handle_search_deco(
        cls,
        registrate: Callable[[SearchHandler], None],
        *,
        add_self: Literal[True],
        condition: SearchHandlerCondition | None = None,
        **kwargs: Any,
    ) -> Callable[[SearchHandlerCallbackWithSelf], SearchHandler]: ...

    @overload
    @classmethod
    def __handle_search_deco(
        cls,
        registrate: Callable[[SearchHandler], None],
        *,
        add_self: Literal[False],
        condition: SearchHandlerCondition | None = None,
        **kwargs: Any,
    ) -> Callable[[SearchHandlerCallback], SearchHandler]: ...

    @classmethod
    def __handle_search_deco(
        cls,
        registrate: Callable[[SearchHandler], None],
        *,
        add_self: bool,
        condition: SearchHandlerCondition | None = None,
        **kwargs: Any,
    ) -> (
        Callable[[SearchHandlerCallbackWithSelf], SearchHandler]
        | Callable[[SearchHandlerCallback], SearchHandler]
    ):
        # Shared implementation behind both forms of ``search``. ``registrate``
        # receives the finished handler; ``add_self`` wraps the callback with
        # ``func_with_self`` (used by the classmethod form).
        if condition is None:
            # No explicit condition: derive one from the shortcut kwargs.
            condition = SearchHandler._builtin_condition_kwarg_to_obj(**kwargs)

        def inner(
            func: SearchHandlerCallbackWithSelf | SearchHandlerCallback,
        ) -> SearchHandler:
            handler = SearchHandler()
            if condition:
                setattr(handler, "condition", condition)
            if add_self:
                func = func_with_self(func)  # type: ignore[reportArgumentType]
            setattr(handler, "callback", func)
            registrate(handler)
            return handler

        return inner

    @classmethod
    def __search_classmethod_deco(
        cls,
        condition: SearchHandlerCondition | None = None,
        *,
        text: str = MISSING,
        pattern: re.Pattern[str] | str = MISSING,
        keyword: str = MISSING,
        allowed_keywords: Iterable[str] = MISSING,
        disallowed_keywords: Iterable[str] = MISSING,
    ) -> Callable[[SearchHandlerCallbackWithSelf], SearchHandler]:
        # Classmethod form of ``search``: handlers are collected on the class
        # and registered on each instance in ``__init__``.
        return cls.__handle_search_deco(
            cls.__class_search_handlers__.append,
            add_self=True,
            condition=condition,
            text=text,
            pattern=pattern,
            keyword=keyword,
            allowed_keywords=allowed_keywords,
            disallowed_keywords=disallowed_keywords,
        )

    @decorator
    @add_classmethod_alt(__search_classmethod_deco)
    def search(
        self,
        condition: SearchHandlerCondition | None = None,
        *,
        text: str = MISSING,
        pattern: re.Pattern[str] | str = MISSING,
        keyword: str = MISSING,
        allowed_keywords: Iterable[str] = MISSING,
        disallowed_keywords: Iterable[str] = MISSING,
    ) -> Callable[[SearchHandlerCallback], SearchHandler]:
        """A decorator that registers a search handler.

        All search handlers must be a :ref:`coroutine <coroutine>`. See the :ref:`search handler section <search_handlers>` for more information about using search handlers.

        .. versionchanged:: 2.0.0
            The search decorator can now be used as a classmethod

        .. NOTE::
            This decorator can also be used as a classmethod

        Parameters
        ----------
        condition: Optional[:ref:`condition <condition_example>`]
            The condition to determine which queries this handler should run on. If given, this should be the only argument given.
        text: Optional[:class:`str`]
            A kwarg to quickly add a :class:`~flogin.conditions.PlainTextCondition`. If given, this should be the only argument given.
        pattern: Optional[:class:`re.Pattern` | :class:`str`]
            A kwarg to quickly add a :class:`~flogin.conditions.RegexCondition`. If given, this should be the only argument given.
        keyword: Optional[:class:`str`]
            A kwarg to quickly set the condition to a :class:`~flogin.conditions.KeywordCondition` condition with the ``keyword`` kwarg being the only allowed keyword.
        allowed_keywords: Optional[Iterable[:class:`str`]]
            A kwarg to quickly set the condition to a :class:`~flogin.conditions.KeywordCondition` condition with the kwarg being the list of allowed keywords.
        disallowed_keywords: Optional[Iterable[:class:`str`]]
            A kwarg to quickly set the condition to a :class:`~flogin.conditions.KeywordCondition` condition with the kwarg being the list of disallowed keywords.

        Example
        ---------
        With an instance:

        .. code-block:: python3

            @plugin.search()
            async def example_search_handler(data: Query):
                return "This is a result!"

        As a classmethod:

        .. code-block:: python3

            class MyPlugin(Plugin):
                @Plugin.search()
                async def example_search_handler(self, data: Query):
                    return "This is a result!"
        """

        return self.__handle_search_deco(
            self.register_search_handler,
            add_self=False,
            condition=condition,
            text=text,
            pattern=pattern,
            keyword=keyword,
            allowed_keywords=allowed_keywords,
            disallowed_keywords=disallowed_keywords,
        )

    def fetch_flow_settings(self) -> FlowSettings:
        """Fetches flow's settings from flow's config file

        The file is read from ``../../Settings/Settings.json``, relative to the
        current working directory.

        Returns
        --------
        :class:`~flogin.flow.settings.FlowSettings`
            A dataclass containing all of flow's settings
        """

        path = os.path.join("..", "..", "Settings", "Settings.json")
        # Explicit UTF-8 for the same reason as in ``settings``: the locale
        # default would mis-decode non-ASCII values on Windows.
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
        return FlowSettings(data)