Source code for jsonrpyc

# coding: utf-8

from __future__ import annotations

__all__: list[str] = []

import os
import sys
import json
import io
import time
import threading
from typing import Any, Callable, Type, Protocol, Optional

from typing_extensions import TypeAlias

# package infos
from jsonrpyc.__meta__ import (  # noqa

Callback: TypeAlias = Callable[[Optional[Exception], Optional[Any]], None]

class InputStream(Protocol):

    def fileno(self) -> int:

    def closed(self) -> bool:

    def isatty(self) -> bool:

    def tell(self) -> int:

    def seek(self, offset: int, whence: int = os.SEEK_SET, /) -> int:

    def readline(self) -> str:

    def readlines(self) -> list[str]:

class OutputStream(Protocol):

    def fileno(self) -> int:

    def closed(self) -> bool:

    def write(self, b: str) -> int:

    def flush(self) -> None:

[docs]class Spec(object): """ This class wraps methods that create JSON-RPC 2.0 compatible string representations of request, response and error objects. All methods are class members, so you might never want to create an instance of this class, but rather use the methods directly: .. code-block:: python Spec.request("my_method", 18) # the id is optional # => '{"jsonrpc":"2.0","method":"my_method","id": 18}' Spec.response(18, "some_result") # => '{"jsonrpc":"2.0","id":18,"result":"some_result"}' Spec.error(18, -32603) # => '{"jsonrpc":"2.0","id":18,"error":{"code":-32603,"message":"Internal error"}}' """
[docs] @classmethod def check_id(cls, id: str | int | None, *, allow_empty: bool = False) -> None: """ Value check for *id* entries. When *allow_empty* is *True*, *id* is allowed to be *None*. Raises a *TypeError* when *id* is neither an integer nor a string. :param id: The id to check. :param allow_empty: Whether *id* is allowed to be *None*. :return: None. :raises TypeError: When *id* is invalid. """ if (id is not None or not allow_empty) and not isinstance(id, (int, str)): raise TypeError(f"id must be an integer or string, got {id} ({type(id)})")
[docs] @classmethod def check_method(cls, method: str, /) -> None: """ Value check for *method* entries. Raises a *TypeError* when *method* is not a string. :param method: The method to check. :return: None. :raises TypeError: When *method* is invalid. """ if not isinstance(method, str): raise TypeError(f"method must be a string, got {method} ({type(method)})")
[docs] @classmethod def check_code(cls, code: int, /) -> None: """ Value check for *code* entries. Raises a *TypeError* when *code* is not an integer, or a *KeyError* when there is no :py:class:`RPCError` subclass registered for that *code*. :param code: The error code to check. :return: None. :raises TypeError: When *code* is invalid. """ try: get_error(code) except: raise TypeError(f"invalid error code, got {code} ({type(code)})")
[docs] @classmethod def request( cls, method: str, /, id: str | int | None = None, *, params: dict[str, Any] | None = None, ) -> str: """ Creates the string representation of a request that calls *method* with optional *params* which are encoded by ``json.dumps``. When *id* is *None*, the request is considered a notification. :param method: The method to call. :param id: The id of the request. :param params: The parameters of the request. :return: The request string. :raises RPCInvalidRequest: When *method* or *id* are invalid. :raises RPCParseError: When *params* could not be encoded. """ try: cls.check_method(method) cls.check_id(id, allow_empty=True) except Exception as e: raise RPCInvalidRequest(str(e)) # start building the request string req = f"{{\"jsonrpc\":\"2.0\",\"method\":\"{method}\"" # add the id when given if id is not None: # encode string ids if isinstance(id, str): id = json.dumps(id) req += f",\"id\":{id}" # add parameters when given if params is not None: try: req += f",\"params\":{json.dumps(params)}" except Exception as e: raise RPCParseError(str(e)) # end the request string req += "}" return req
[docs] @classmethod def response(cls, id: str | int | None, result: Any, /) -> str: """ Creates the string representation of a respone that was triggered by a request with *id*. A *result* is required, even if it is *None*. :param id: The id of the request that triggered this response. :param result: The result of the request. :return: The response string. :raises RPCInvalidRequest: When *id* is invalid. :raises RPCParseError: When *result* could not be encoded. """ try: cls.check_id(id) except Exception as e: raise RPCInvalidRequest(str(e)) # encode string ids if isinstance(id, str): id = json.dumps(id) # build the response string try: res = f"{{\"jsonrpc\":\"2.0\",\"id\":{id},\"result\":{json.dumps(result)}}}" except Exception as e: raise RPCParseError(str(e)) return res
[docs] @classmethod def error( cls, id: str | int | None, code: int, *, data: Any | None = None, ) -> str: """ Creates the string representation of an error that occured while processing a request with *id*. *code* must lead to a registered :py:class:`RPCError`. *data* might contain additional, detailed error information and is encoded by ``json.dumps`` when set. :param id: The id of the request that triggered this error. :param code: The error code. :param data: Additional error data. :return: The error string. :raises RPCInvalidRequest: When *id* or *code* are invalid. :raises RPCParseError: When *data* could not be encoded. """ try: cls.check_id(id) cls.check_code(code) except Exception as e: raise RPCInvalidRequest(str(e)) # build the inner error data message = get_error(code).title # type: ignore[union-attr] err_data = f"{{\"code\":{code},\"message\":\"{message}\"" # insert data when given if data is not None: try: err_data += f",\"data\":{json.dumps(data)}}}" except Exception as e: raise RPCParseError(str(e)) else: err_data += "}" # encode string ids if isinstance(id, str): id = json.dumps(id) # start building the error string err = f"{{\"jsonrpc\":\"2.0\",\"id\":{id},\"error\":{err_data}}}" return err
[docs]class RPC(object): """ The main class of *jsonrpyc*. Instances of this class wrap an input stream *stdin* and an output stream *stdout* in order to communicate with other services. A service is not even forced to be written in Python as long as it strictly implements the JSON-RPC 2.0 specification. RPC instances may wrap a *target* object. By means of a :py:class:`Watchdog` instance, incoming requests are routed to methods of this object whose result might be sent back as a response. The watchdog instance is created but not started yet, when *watch* is not *True*. :param target: The target object to wrap. :param stdin: The input stream. :param stdout: The output stream. :param watch: Whether to start the watchdog. :param watch_kwargs: Additional keyword arguments for the watchdog. Example implementation: ** .. code-block:: python import jsonrpyc class MyTarget(object): def greet(self, name): return f"Hi, {name}!" jsonrpc.RPC(MyTarget()) ** .. code-block:: python import jsonrpyc from subprocess import Popen, PIPE p = Popen(["python", ""], stdin=PIPE, stdout=PIPE) rpc = jsonrpyc.RPC(stdout=p.stdin, stdin=p.stdout) # non-blocking remote procedure call with callback and js-like signature def cb(err, res=None): if err: throw err print(f"callback got: {res}") rpc("greet", args=("John",), callback=cb) # cb is called asynchronously which prints # => "callback got: Hi, John!" # blocking remote procedure call with 0.1s polling print(rpc("greet", args=("John",), block=0.1)) # => "Hi, John!" # shutdown the process p.stdin.close() p.stdout.close() p.terminate() p.wait() .. py:attribute:: target The wrapped target object. Might be *None* when no object is wrapped, e.g. for the *client* RPC instance. .. py:attribute:: stdin The input stream, re-opened with ``"rb"``. .. py:attribute:: stdout The output stream, re-opened with ``"wb"``. .. py:attribute:: watch The :py:class:`Watchdog` instance that optionally watches *stdin* and dispatches incoming requests. """ EMPTY_RESULT = object() def __init__( self, target: Any | None = None, stdin: InputStream | None = None, stdout: OutputStream | None = None, *, watch: bool = True, watch_kwargs: dict[str, Any] | None = None, ) -> None: super().__init__() # the wrapped target object = target # open input stream if stdin is None: stdin = sys.stdin self.original_stdin = stdin self.stdin =, "rb") # open output stream if stdout is None: stdout = sys.stdout self.original_stdout = stdout self.stdout =, "wb") # other attributes self._i = -1 self._callbacks: dict[int, Callback] = {} self._results: dict[int, Any] = {} # create and optionall start the watchdog watch_kwargs = watch_kwargs or {} watch_kwargs["start"] = watch watch_kwargs.setdefault("daemon", target is None) self.watchdog = Watchdog(self, **watch_kwargs) def __del__(self) -> None: watchdog = getattr(self, "watchdog", None) if watchdog: watchdog.stop() def __call__(self, *args, **kwargs) -> None: """ Shorthand for :py:meth:`call`. """ return*args, **kwargs)
[docs] def call( self, method: str, args: tuple[Any, ...] = (), kwargs: dict | None = None, *, callback: Callback | None = None, block: int = 0, timeout: float | int = 0, ) -> None: """ Performs an actual remote procedure call by writing a request representation (a string) to the output stream. The remote RPC instance uses *method* to route to the actual method to call with *args* and *kwargs*. When *callback* is set, it will be called with the result of the remote call. When *block* is larger than *0*, the calling thread is blocked until the result is received. In this case, *block* will be the poll interval, emulating synchronuous return value behavior. When both *callback* is *None* and *block* is *0* or smaller, the request is considered a notification and the remote RPC instance will not send a response. If *timeout* is not zero, raise TimeoutError after *timeout* seconds with no response. :param method: The method to call. :param args: The positional arguments of the method. :param kwargs: The keyword arguments of the method. :param callback: The callback function. :param block: The poll interval in seconds. :param timeout: The timeout in seconds. :return: None. :raises TimeoutError: When the request times out. """ starting_time = time.monotonic() # default kwargs if kwargs is None: kwargs = {} # check if the call is a notification is_notification = callback is None and block <= 0 # create a new id for requests expecting a response id = -1 if not is_notification: self._i += 1 id = self._i # register the callback if callback is not None: self._callbacks[id] = callback # store an empty result for the meantime if block > 0: self._results[id] = self.EMPTY_RESULT # create the request params = {"args": args, "kwargs": kwargs} req = Spec.request(method, id=id, params=params) self._write(req) # blocking return value behavior if block > 0: while True: if self._results[id] != self.EMPTY_RESULT: result = self._results[id] del self._results[id] if isinstance(result, Exception): raise result return result if timeout: elapsed = time.monotonic() - starting_time if elapsed > timeout: raise TimeoutError("RPC Request timed out") time.sleep(block)
def _handle(self, line: str) -> None: """ Handles an incoming *line* and dispatches the parsed object to the request, response, or error handlers. :param line: The incoming line. :return: None. """ obj = json.loads(line) # dispatch to the correct handler if "method" in obj: # request self._handle_request(obj) elif "error" not in obj: # response self._handle_response(obj) else: # error self._handle_error(obj) def _handle_request(self, req: dict[str, Any]) -> None: """ Handles an incoming request *req*. When it containes an id, a response or error is sent back. :param req: The incoming request. :return: None. """ try: method = self._route(req["method"]) result = method(*req["params"]["args"], **req["params"]["kwargs"]) if "id" in req: res = Spec.response(req["id"], result) self._write(res) except Exception as e: if "id" in req: if isinstance(e, RPCError): err = Spec.error(req["id"], e.code, else: err = Spec.error(req["id"], -32603, data=str(e)) self._write(err) def _handle_response(self, res: dict[str, Any]) -> None: """ Handles an incoming successful response *res*. Blocking calls are resolved and registered callbacks are invoked with the first error argument being set to *None*. :param res: The incoming response. :return: None. """ # set the result if res["id"] in self._results: self._results[res["id"]] = res["result"] # lookup and invoke the callback if res["id"] in self._callbacks: callback = self._callbacks[res["id"]] del self._callbacks[res["id"]] callback(None, res["result"]) def _handle_error(self, res: dict[str, Any]) -> None: """ Handles an incoming failed response *res*. Blocking calls throw an exception and registered callbacks are invoked with an exception and the second result argument set to *None*. :param res: The incoming error response. :return: None. """ # extract the error and create an actual error instance to raise err = res["error"] error = get_error(err["code"])(err.get("data", err["message"])) # set the error if res["id"] in self._results: self._results[res["id"]] = error # lookup and invoke the callback if res["id"] in self._callbacks: callback = self._callbacks[res["id"]] del self._callbacks[res["id"]] callback(error, None) def _route(self, method: str) -> Any: """ Returnes the method of the wrapped target object to be called when *method* is requested. :param method: The method to route to. :return: The method to call. :raises RPCMethodNotFound: When the method is not found. Example: .. code-block:: python MyClassB(object): def foo(self): return 123 MyClassA(object): def __init__(self): self.b = MyClassB() def bar(self): return "test" rpc = RPC(MyClassA()) rpc._route("bar") # => <bound method ...> rpc._route("") # => <bound method ...> """ # recursively traverse target attributes obj = for part in method.split("."): if not hasattr(obj, part): break obj = getattr(obj, part) else: return obj raise RPCMethodNotFound(data=method) def _write(self, s: str) -> None: """ Writes a string *s* to the output stream. :param s: The string to write. :return: None. """ self.stdout.write(bytearray(f"{s}\n", "utf-8")) self.stdout.flush()
[docs]class Watchdog(threading.Thread): """ This class represents a thread that watches the input stream of an :py:class:`RPC` instance for incoming content and dispatches requests to it. :param rpc: The :py:class:`RPC` instance to watch. :param name: The thread's name. :param interval: The polling interval of the run loop. :param daemon: The thread's daemon flag. :param start: Whether to start the thread immediately. .. py:attribute:: rpc The :py:class:`RPC` instance. .. py:attribute:: name The thread's name. .. py:attribute:: interval The polling interval of the run loop. .. py:attribute:: daemon The thread's daemon flag. """ def __init__( self, rpc: RPC, name: str = "watchdog", interval: float | int = 0.1, daemon: bool = False, start: bool = True, ) -> None: super().__init__() # store attributes self.rpc = rpc = name self.interval = interval self.daemon = daemon # register a stop event self._stop = threading.Event() if start: self.start()
[docs] def start(self) -> None: """ Starts the thread's activity. :return: None. """ super().start()
[docs] def stop(self) -> None: """ Stops the thread's activity. :return: None. """ self._stop.set()
[docs] def run(self) -> None: """ The main run loop of the watchdog thread. Reads the input stream of the :py:class:`RPC` instance and dispatches incoming content to it. :return: None. """ # reset the stop event self._stop.clear() # stop here when stdin is not set or closed if self.rpc.stdin is None or self.rpc.stdin.closed: return # read new incoming lines last_pos = 0 while not self._stop.is_set(): lines = None # stop when stdin is closed if self.rpc.stdin.closed: break # Keep linter happy if self.rpc.original_stdin and self.rpc.original_stdin.closed: # type: ignore[attr-defined] # noqa break # read from stdin depending on whether it is a tty or not if self.rpc.stdin.isatty(): cur_pos = self.rpc.stdin.tell() if cur_pos != last_pos: lines = self.rpc.stdin.readlines() last_pos = self.rpc.stdin.tell() else: try: lines = [self.rpc.stdin.readline()] except IOError: # prevent residual race conditions occurring when stdin is closed externally pass # handle new lines if any if lines: for b_line in lines: line = b_line.decode("utf-8").strip() if line: self.rpc._handle(line) else: self._stop.wait(self.interval)
[docs]class RPCError(Exception): """ Base class for RPC errors. :param data: Additional error data. .. py:attribute:: code_range The range of error codes that this error class is responsible for. .. py:attribute:: code The error code of this error. .. py:attribute:: title The title of this error. .. py:attribute:: message The message of this error, i.e., ``"<title> (<code>)[, data: <data>]"``. .. py:attribute:: data Additional data of this error. Setting the data attribute will also change the message attribute. """ code_range: tuple[int, int] code: int title: str
[docs] @classmethod def is_code_range(cls, code: Any) -> bool: """ Returns *True* when *code* is a valid error code range, i.e., a tuple of two integers where the first integer is less or equal to the second integer. :param code: The code to check. :return: Whether *code* is a valid error code range. """ return ( isinstance(code, tuple) and len(code) == 2 and all(isinstance(i, int) for i in code) and code[0] <= code[1] )
def __init__(self, data: str | None = None) -> None: # build the error message message = f"{self.title} ({self.code})" if data is not None: message += f", data: {data}" self.message = message super().__init__(message) = data def __str__(self) -> str: return self.message
error_map_code: dict[int, Type[RPCError]] = {} error_map_code_range: dict[tuple[int, int], Type[RPCError]] = {}
[docs]def register_error(cls: Type[RPCError]) -> Type[RPCError]: """ Decorator that registers a new RPC error derived from :py:class:`RPCError`. The purpose of error registration is to have a mapping of error codes/code ranges to error classes for faster lookups during error creation. .. code-block:: python @register_error class MyCustomRPCError(RPCError): code_range = (lower_bound, upper_bound) # both inclusive code = code_range[0] # default code when used as is title = "My custom error" """ if not issubclass(cls, RPCError): raise TypeError(f"'{cls}' is not a subclass of RPCError") # check duplicates if cls.code in error_map_code: raise AttributeError(f"duplicate RPC error code {cls.code}") if cls.code_range in error_map_code_range: raise AttributeError(f"duplicate RPC error code range {cls.code_range}") # register error_map_code[cls.code] = cls error_map_code_range[cls.code_range] = cls return cls
[docs]def get_error(code: int) -> Type[RPCError]: """ Returns the RPC error class that was previously registered to *code*. A ``ValueError`` is raised if no error class was found for *code*. :param code: The error code to look up. :return: The error class. :raises ValueError: When no error class was found for *code*. """ if code in error_map_code: return error_map_code[code] for (lower, upper), cls in error_map_code_range.items(): if lower <= code <= upper: return cls raise ValueError(f"unknown error code '{code}' ({type(code)})")
[docs]@register_error class RPCParseError(RPCError): code_range = (-32700, -32700) code = code_range[0] title = "Parse error"
[docs]@register_error class RPCInvalidRequest(RPCError): code_range = (-32600, -32600) code = code_range[0] title = "Invalid Request"
[docs]@register_error class RPCMethodNotFound(RPCError): code_range = (-32601, -32601) code = code_range[0] title = "Method not found"
[docs]@register_error class RPCInvalidParams(RPCError): code_range = (-32602, -32602) code = code_range[0] title = "Invalid params"
[docs]@register_error class RPCInternalError(RPCError): code_range = (-32603, -32603) code = code_range[0] title = "Internal error"
[docs]@register_error class RPCServerError(RPCError): code_range = (-32099, -32000) code = code_range[0] # default code when used as is title = "Server error"