Source code for flask_paseto_extended.issuer

import json
import typing as t

from pyseto import Key, Paseto

from .config import PASETO_VERSIONS_ACCEPTABLE
from .exceptions import EncodeError

# Fallback values used by PasetoIssuer.init_app() when the corresponding
# key is missing from app.config.

# Default for PASETO_USE_ISS: when true, issue() adds an "iss" claim.
PASETO_DEFAULT_USE_ISS: bool = True
# Default for PASETO_USE_IAT: forwarded to Paseto.new() as include_iat.
PASETO_DEFAULT_USE_IAT: bool = False
# Default for PASETO_EXP: forwarded to Paseto.new() as exp
# (presumably a lifetime in seconds -- confirm against pyseto).
PASETO_DEFAULT_EXP: int = 3600
# Default for PASETO_USE_KID: when true, issue() puts the key id in the
# token footer under "kid".
PASETO_DEFAULT_USE_KID: bool = False
# Default for PASETO_SERIALIZER: any object exposing a callable dumps().
PASETO_DEFAULT_SERIALIZER: t.Any = json


class PasetoIssuer(object):
    """
    Flask extension object that issues PASETO tokens.

    The issuer is configured from ``app.config`` by :meth:`init_app` and
    then produces tokens with :meth:`issue`, using the keys listed in
    ``PASETO_PRIVATE_KEYS``.
    """

    def __init__(self, app=None, add_context_processor=True):
        """
        Creates the issuer and, if ``app`` is given, configures it at once.

        :param app: Flask application to bind to, or ``None`` to defer
            configuration to a later :meth:`init_app` call.
        :param add_context_processor: Forwarded to :meth:`init_app`.
        """
        if app is not None:
            self.init_app(app, add_context_processor)

    def init_app(self, app, add_context_processor=True):
        """
        Configures an Flask application to use this PasetoIssuer.

        Reads and validates the ``PASETO_*`` settings from ``app.config``,
        loads every entry of ``PASETO_PRIVATE_KEYS`` and registers this
        object as ``app.paseto_issuer``.

        :param app: Flask application whose config is read.
        :param add_context_processor: Accepted for API compatibility; it is
            not used by this method.
        :raises ValueError: If a setting is missing, has the wrong type, or
            a key entry cannot be loaded.
        """
        app.paseto_issuer = self

        # _iss
        self._iss = app.config.get("PASETO_ISS", "")
        if not self._iss:
            raise ValueError("PASETO_ISS must be set.")
        if not isinstance(self._iss, str):
            raise ValueError("PASETO_ISS must be str.")

        # _use_iss
        self._use_iss = app.config.get("PASETO_USE_ISS", PASETO_DEFAULT_USE_ISS)
        if not isinstance(self._use_iss, bool):
            raise ValueError("PASETO_USE_ISS must be bool.")

        # _use_iat
        self._use_iat = app.config.get("PASETO_USE_IAT", PASETO_DEFAULT_USE_IAT)
        if not isinstance(self._use_iat, bool):
            raise ValueError("PASETO_USE_IAT must be bool.")

        # _exp
        self._exp = app.config.get("PASETO_EXP", PASETO_DEFAULT_EXP)
        if not isinstance(self._exp, int) or self._exp < 0:
            raise ValueError("PASETO_EXP must be int (>= 0).")

        # _use_kid
        self._use_kid = app.config.get("PASETO_USE_KID", PASETO_DEFAULT_USE_KID)
        if not isinstance(self._use_kid, bool):
            raise ValueError("PASETO_USE_KID must be bool.")

        # _serializer
        self._serializer = app.config.get("PASETO_SERIALIZER", PASETO_DEFAULT_SERIALIZER)
        if not hasattr(self._serializer, "dumps") or not callable(self._serializer.dumps):
            raise ValueError("PASETO_SERIALIZER must have a callable 'dumps'.")

        # _keys: maps each key's PASERK id to {"key": <key object>}.
        keys = app.config.get("PASETO_PRIVATE_KEYS", [])
        if not keys:
            raise ValueError("PASETO_PRIVATE_KEYS must be set.")
        self._keys = {}
        for k in keys:
            key = self._load_key(k)
            self._keys[key.to_paserk_id()] = {"key": key}

        self._paseto = Paseto.new(exp=self._exp, include_iat=self._use_iat)

    @staticmethod
    def _load_key(k):
        """
        Builds a key object from one ``PASETO_PRIVATE_KEYS`` entry.

        An entry is either ``{"paserk": ...}`` or a pair of ``"version"``
        and ``"key"`` (PEM).

        :raises ValueError: If the entry is malformed or is a local key.
        """
        if "paserk" in k:
            try:
                key = Key.from_paserk(k["paserk"])
            except Exception as err:
                raise ValueError("Invalid PASERK data.") from err
            if key.purpose == "local":
                raise ValueError("A local key is not allowed.")
            # NOTE(review): a verification-only (public) PASERK is not
            # rejected here; presumably it fails later at encode time --
            # confirm whether it should be refused up front.
            return key

        if "version" not in k:
            raise ValueError("A key object must have a 'paserk' or a pair of 'version' and 'key'.")
        if not isinstance(k["version"], int):
            raise ValueError("A 'version' in PASETO_PRIVATE_KEYS must be int.")
        if k["version"] not in PASETO_VERSIONS_ACCEPTABLE:
            raise ValueError(f"Invalid PASETO version: {k['version']}.")
        if "key" not in k:
            raise ValueError("A key object must have a 'paserk' or a pair of 'version' and 'key'.")
        try:
            return Key.new(k["version"], "public", k["key"])
        except Exception as err:
            raise ValueError("A 'key' must be a PEM formatted key.") from err

    def issue(self, payload: dict, kid: str = "") -> str:
        """
        Issues a token carrying ``payload``.

        When exactly one key is configured it is always used and ``kid`` is
        ignored; otherwise ``kid`` selects the key. The caller's ``payload``
        is never modified: the ``iss`` claim, when enabled, is added to a
        copy.

        :param payload: Claims to put in the token.
        :param kid: Id of the key to sign with (needed when several keys
            are configured).
        :returns: Whatever ``Paseto.encode`` returns for the token.
        :raises ValueError: If no matching signing key exists.
        :raises EncodeError: If encoding the token fails.
        """
        if len(self._keys) == 1:
            # A single configured key is used regardless of the given kid.
            kid, entry = next(iter(self._keys.items()))
        else:
            entry = self._keys.get(kid, None)
        if not entry:
            raise ValueError("A signing key is not found.")

        # Build a new dict instead of writing "iss" into the caller's object.
        claims = {**payload, "iss": self._iss} if self._use_iss else payload
        footer = {"kid": kid} if self._use_kid else {}

        try:
            if not footer:
                return self._paseto.encode(entry["key"], claims, serializer=self._serializer)
            return self._paseto.encode(entry["key"], claims, footer, serializer=self._serializer)
        except Exception as err:
            raise EncodeError("Failed to encode a token.") from err