Source code for tests.server.manager

"""The middleman between Plover and the server."""

import os
from operator import itemgetter

from aiohttp.web import WebSocketResponse

from tests.server.config import ServerConfig
from tests.server.errors import ERROR_NO_SERVER, ERROR_SERVER_RUNNING
from tests.server.logger import log
from tests.server.server import EngineServer, ServerStatus
from tests.server.websocket.server import WebSocketServer

SERVER_CONFIG_FILE = "config.json"
CONFIG_DIR = "./"


[docs] class EngineServerManager: """Manages a server that exposes the Plover engine.""" _server: EngineServer def __init__(self) -> None: self._server = None self._config_path: str = os.path.join(CONFIG_DIR, SERVER_CONFIG_FILE) if self.get_server_status() != ServerStatus.Stopped: raise AssertionError(ERROR_SERVER_RUNNING) self._config = ServerConfig( self._config_path ) # reload the configuration when the server is restarted self._server = WebSocketServer( self._config.host, self._config.port, self._config.ssl, self._config.remotes, self._config.private_key, ) self._server.register_message_callback(self._on_message)
[docs] def start(self) -> None: """Starts the server. Raises: AssertionError: The server failed to start. IOError: The server failed to start. """ self._server.start()
[docs] def stop(self) -> None: """Stops the server. Raises: AssertionError: The server failed to stop. """ if self.get_server_status() != ServerStatus.Running: raise AssertionError(ERROR_NO_SERVER) self._server.data.stop_listening() self.stop_listening() self._server.queue_stop()
[docs] def get_server_status(self) -> ServerStatus: """Gets the status of the server. Returns: The status of the server. """ return self._server.listened.status if self._server else ServerStatus.Stopped
[docs] def join(self) -> None: """Waits for the server to finish processing all requests and stops the server.""" self._server.join()
[docs] def add_listener(self, listener: callable) -> None: """Adds a listener to the server. Args: listener (callable): The listener function to be added. """ self._server.listened.add_listener(listener)
[docs] def stop_listening(self) -> None: """Stops the server from listening for status changes.""" self._server.listened.stop_listening()
async def _on_message(self, data: dict) -> None: """ Process the received message. Args: data (dict): The received message data. Returns: None """ decrypted: dict = itemgetter("decrypted")(data) log.info(f"Received data {decrypted}") socket: WebSocketResponse = itemgetter("socket")(data) await socket.send_json(decrypted)