refactor: cleanup
- lazy load backend - proper config evaluation - object-orientation of configuration
This commit is contained in:
parent
38bc403058
commit
fdf45fd114
26 changed files with 783 additions and 677 deletions
|
|
@ -137,21 +137,23 @@ NOTES
|
|||
SUCH DAMAGES.
|
||||
|
||||
"""
|
||||
from typing import NamedTuple, Tuple, Any
|
||||
from string import ascii_uppercase, digits, ascii_letters, punctuation
|
||||
from inspect import isclass
|
||||
from typing import NamedTuple
|
||||
from configparser import ConfigParser
|
||||
from ast import literal_eval
|
||||
from io import StringIO
|
||||
from os import environ
|
||||
from importlib.resources import path as resource_path
|
||||
from pathlib import Path
|
||||
|
||||
from connexion import FlaskApp
|
||||
from connexion.resolver import RestyResolver
|
||||
|
||||
from httpaste.model import Backend
|
||||
from httpaste.backend import get_backend_map
|
||||
from httpaste.helper.common import generate_random_string
|
||||
from httpaste.server import get_server_config
|
||||
from httpaste.server import Config as ServerConfig
|
||||
from httpaste.context import get_context_config
|
||||
from httpaste.context import Config as ContextConfig
|
||||
from httpaste.model import get_model_config
|
||||
from httpaste.model import Config as ModelConfig
|
||||
from httpaste.backend import get_backend_config
|
||||
from httpaste.backend import Config as BackendConfig
|
||||
from httpaste.helper.config import get_configparser, CONFIGPATH_ENVIRON
|
||||
from httpaste.helper.http import (
|
||||
BadRequestError,
|
||||
ForbiddenError,
|
||||
|
|
@ -160,147 +162,48 @@ from httpaste.helper.http import (
|
|||
UnauthorizedError)
|
||||
|
||||
|
||||
CONFIGPATH_ENVIRON = 'HTTPASTE_CONFIGPATH'
|
||||
|
||||
|
||||
def get_sanitized_config_charset(charset: str):
|
||||
|
||||
for x in ["$", "%"]:
|
||||
|
||||
charset = charset.replace(x, f'{x}{x}')
|
||||
|
||||
return charset
|
||||
|
||||
|
||||
class ConfigError(Exception):
|
||||
"""Config Exception
|
||||
class Config(NamedTuple):
|
||||
"""
|
||||
|
||||
|
||||
class Config:
|
||||
"""httpaste global config
|
||||
"""
|
||||
salt: bytes = get_sanitized_config_charset(generate_random_string(
|
||||
32, ascii_letters + digits + punctuation)).encode('utf-8')
|
||||
paste_id_size: int = 8
|
||||
paste_id_charset: str = ascii_letters + digits
|
||||
paste_key_size: int = 32
|
||||
paste_key_charset: str = get_sanitized_config_charset(
|
||||
ascii_letters + digits + punctuation)
|
||||
paste_lifetime: int = 5
|
||||
backend: Backend = None
|
||||
hmac_iterations: int = 20000
|
||||
paste_default_encoding: str = 'utf-8'
|
||||
context: ContextConfig
|
||||
server: ServerConfig
|
||||
model: ModelConfig
|
||||
backend: BackendConfig
|
||||
|
||||
|
||||
class ServerConfig:
|
||||
"""connexion config
|
||||
"""
|
||||
swagger_ui: bool = True
|
||||
bind_address = None
|
||||
|
||||
|
||||
def get_config_path(var_name: str = CONFIGPATH_ENVIRON):
|
||||
def get_config(configIni: ConfigParser, path: Path):
|
||||
"""
|
||||
"""
|
||||
|
||||
try:
|
||||
from httpaste.model import Config as ModelConfig
|
||||
|
||||
return environ[var_name]
|
||||
except KeyError as e:
|
||||
context_config = get_context_config(configIni)
|
||||
server_config = get_server_config(configIni)
|
||||
model_config = get_model_config(configIni, path)
|
||||
backend_config = get_backend_config(configIni, path)
|
||||
|
||||
raise ConfigError(
|
||||
f'environment variable \'{var_name}\' not set.') from e
|
||||
return Config(
|
||||
context=context_config,
|
||||
server=server_config,
|
||||
model=model_config,
|
||||
backend=backend_config
|
||||
)
|
||||
|
||||
|
||||
def load_config(path: str) -> Tuple[Config, ServerConfig]:
|
||||
"""get config objects from file
|
||||
"""
|
||||
|
||||
_config = ConfigParser()
|
||||
_config.read(path)
|
||||
|
||||
backends = get_backend_map()
|
||||
bconf = dict(_config.items('backend'))
|
||||
btype = bconf.pop('type')
|
||||
|
||||
try:
|
||||
bcl, bparamcl = backends[btype]
|
||||
except KeyError as e:
|
||||
bids = ', '.join(backends.keys())
|
||||
raise ConfigError(' '.join((
|
||||
f'invalid backend \'{btype}\' in \'{path}\'. ',
|
||||
f'must be any of [{bids}]'
|
||||
))) from e
|
||||
|
||||
config = dict(_config.items('general'))
|
||||
server_config = dict(_config.items('server'))
|
||||
|
||||
c = Config()
|
||||
sc = ServerConfig()
|
||||
|
||||
# typecast model_backend section items
|
||||
bconf = {k: literal_eval(v) for k, v in bconf.items()}
|
||||
# initialize model backend
|
||||
c.backend = bcl(bparamcl(**bconf))
|
||||
|
||||
# typecast general section items
|
||||
for k, v in config.items():
|
||||
setattr(c, k, literal_eval(v))
|
||||
# typecast server section items
|
||||
for k, v in server_config.items():
|
||||
setattr(sc, k, literal_eval(v))
|
||||
|
||||
c.salt = c.salt.encode('utf-8')
|
||||
|
||||
return c, sc
|
||||
|
||||
|
||||
def default_config() -> str:
|
||||
def load_config(path: str = None, var_name: str = CONFIGPATH_ENVIRON):
|
||||
"""
|
||||
"""
|
||||
|
||||
config = ConfigParser()
|
||||
configIni, _ = get_configparser(path, var_name)
|
||||
|
||||
config['general'] = {
|
||||
'salt': Config.salt.decode('utf-8'),
|
||||
'paste_key_charset': Config.paste_key_charset,
|
||||
'paste_id_charset': Config.paste_id_charset
|
||||
}
|
||||
|
||||
for literal in [
|
||||
'paste_id_size',
|
||||
'paste_key_size',
|
||||
'paste_lifetime'
|
||||
]:
|
||||
config['general'][literal] = str(getattr(Config, literal))
|
||||
|
||||
config['backend'] = {
|
||||
'type': 'sqlite',
|
||||
'path': 'file::memory:?cache=shared'
|
||||
}
|
||||
|
||||
config['server'] = {}
|
||||
for literal in [
|
||||
'swagger_ui',
|
||||
'bind_address'
|
||||
]:
|
||||
config['server'][literal] = str(getattr(ServerConfig, literal))
|
||||
|
||||
stream = StringIO()
|
||||
config.write(stream)
|
||||
stream.seek(0)
|
||||
|
||||
return stream.read()
|
||||
return get_config(configIni, Path(path).resolve().parent)
|
||||
|
||||
|
||||
def get_flask_app(
|
||||
config: Config,
|
||||
server_config: ServerConfig = ServerConfig) -> FlaskApp:
|
||||
def get_flask_app(config: Config) -> FlaskApp:
|
||||
"""get a flask app object
|
||||
"""
|
||||
|
||||
options = {"swagger_ui": server_config.swagger_ui}
|
||||
options = {"swagger_ui": config.server.swagger_ui}
|
||||
|
||||
#context manager returns a pathlib.Path object
|
||||
with resource_path('httpaste.schema', 'httpaste.openapi.json') as path:
|
||||
|
|
@ -340,8 +243,6 @@ def get_flask_app(
|
|||
|
||||
__all__ = [
|
||||
Config,
|
||||
ServerConfig,
|
||||
load_config,
|
||||
default_config,
|
||||
get_flask_app
|
||||
]
|
||||
|
|
|
|||
|
|
@ -40,9 +40,9 @@ def command_standalone(**kwargs):
|
|||
'Please install it by running \'python3 -m pip install gevent\'.'
|
||||
))) from e
|
||||
|
||||
config, server_config = load_config(kwargs.get('config_path'))
|
||||
config = load_config(kwargs.get('config_path'))
|
||||
|
||||
application = get_flask_app(config, server_config)
|
||||
application = get_flask_app(config)
|
||||
|
||||
http_server = WSGIServer(('', kwargs.get('port')), application)
|
||||
http_server.serve_forever()
|
||||
|
|
@ -122,7 +122,7 @@ def parser():
|
|||
|
||||
p_standalone = sp.add_parser('standalone', help=command_standalone.__doc__)
|
||||
p_standalone.add_argument('--config-path', '-c', required=True)
|
||||
p_standalone.add_argument('--port', '-p', default=8080)
|
||||
p_standalone.add_argument('--port', '-p', default=8082)
|
||||
|
||||
p_wsgi = sp.add_parser('wsgi', help=command_wsgi.__doc__)
|
||||
p_wsgi.add_argument('--echo', '-e', action='store_true')
|
||||
|
|
|
|||
|
|
@ -2,83 +2,114 @@
|
|||
|
||||
implements backend of model
|
||||
"""
|
||||
import sys
|
||||
from inspect import isclass
|
||||
from typing import Dict, Tuple
|
||||
from abc import ABC, abstractmethod
|
||||
from importlib import import_module
|
||||
from configparser import ConfigParser
|
||||
from typing import NamedTuple
|
||||
from pathlib import Path
|
||||
|
||||
from httpaste.model import Backend, UserDataSchema, PasteDataSchema, User, Paste
|
||||
from .sqlite import Parameters as SqliteParameters
|
||||
from .sqlite import User as SqliteUser
|
||||
from .sqlite import Paste as SqlitePaste
|
||||
from .sqlite import get_connection as get_sqlite_connection
|
||||
from .mysql import get_connection as get_mysql_connection
|
||||
from .file import Parameters as FileParameters
|
||||
from .file import User as FileUser
|
||||
from .file import Paste as FilePaste
|
||||
from .mysql import Parameters as MySQLParameters
|
||||
from .mysql import User as MySQLUser
|
||||
from .mysql import Paste as MySQLPaste
|
||||
from httpaste.schema import User, Paste, UserDataSchema, PasteDataSchema
|
||||
from httpaste.helper.config import get_config, ConfigError
|
||||
|
||||
|
||||
class SQLite(Backend):
|
||||
"""SQLite backend interface
|
||||
class BackendError(Exception):
|
||||
"""
|
||||
"""
|
||||
|
||||
parameter_class = SqliteParameters
|
||||
user: SqliteUser
|
||||
paste: SqlitePaste
|
||||
|
||||
def __init__(self, parameters: SqliteParameters):
|
||||
|
||||
parameters = SqliteParameters(parameters.path, get_sqlite_connection(parameters))
|
||||
|
||||
self.user = SqliteUser(parameters, User)
|
||||
self.paste = SqlitePaste(parameters, Paste)
|
||||
|
||||
|
||||
class File(Backend):
|
||||
"""File backend interface
|
||||
class ObjectBackend(ABC):
|
||||
"""
|
||||
"""
|
||||
|
||||
parameter_class = FileParameters
|
||||
user: FileUser
|
||||
paste: FilePaste
|
||||
@abstractmethod
|
||||
def load(self, proto: object) -> object:
|
||||
pass
|
||||
|
||||
def __init__(self, parameters: FileParameters):
|
||||
@abstractmethod
|
||||
def dump(self, model: object) -> None:
|
||||
pass
|
||||
|
||||
self.user = FileUser(parameters, User, UserDataSchema)
|
||||
self.paste = FilePaste(parameters, Paste, PasteDataSchema)
|
||||
@abstractmethod
|
||||
def delete(self, proto: object) -> None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def init(self) -> object:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def sanitize(self) -> None:
|
||||
pass
|
||||
|
||||
|
||||
class MySQL(Backend):
|
||||
"""MySQL backend interface
|
||||
class BackendInterface(ABC):
|
||||
"""
|
||||
"""
|
||||
|
||||
parameter_class = MySQLParameters
|
||||
user: MySQLUser
|
||||
paste: MySQLPaste
|
||||
@abstractmethod
|
||||
def __init__(self, params: object,
|
||||
user_model_class: type,
|
||||
paste_model_class: type,
|
||||
user_schema: type,
|
||||
paste_schema: type) -> None:
|
||||
pass
|
||||
|
||||
def __init__(self, parameters: MySQLParameters):
|
||||
@property
|
||||
@abstractmethod
|
||||
def user(self) -> ObjectBackend:
|
||||
pass
|
||||
|
||||
#parameters = MySQLParameters(*parameters[1:], get_mysql_connection(parameters))
|
||||
|
||||
self.user = MySQLUser(parameters, User)
|
||||
self.paste = MySQLPaste(parameters, Paste)
|
||||
@property
|
||||
@abstractmethod
|
||||
def paste(self) -> ObjectBackend:
|
||||
pass
|
||||
|
||||
|
||||
def get_backend_map() -> Dict[str, Tuple[type, type]]:
|
||||
"""get a map of backend ids and their classes
|
||||
class Config(NamedTuple):
|
||||
"""Backend Configuration
|
||||
"""
|
||||
interface: type
|
||||
config: dict
|
||||
|
||||
|
||||
def load_backend(config: Config) -> BackendInterface:
|
||||
"""load a backend
|
||||
"""
|
||||
|
||||
mod = sys.modules[__name__]
|
||||
out = {}
|
||||
backend = config.interface(config.config, Paste, User, PasteDataSchema,
|
||||
UserDataSchema)
|
||||
|
||||
for i in dir(mod):
|
||||
return backend
|
||||
|
||||
obj = getattr(mod, i)
|
||||
|
||||
if isclass(obj) and obj.__module__ == __name__:
|
||||
def get_backend_config(configIni: ConfigParser, path:Path) -> Config:
|
||||
"""retrieve a cascaded backend configuration from an INI config object
|
||||
"""
|
||||
|
||||
out[i.lower()] = (obj, obj.parameter_class)
|
||||
if 'backend' not in configIni:
|
||||
|
||||
return out
|
||||
raise ConfigError('missing [backend] section.')
|
||||
|
||||
if 'type' not in configIni['backend']:
|
||||
|
||||
raise ConfigError('missing [backend] \'type\'.')
|
||||
|
||||
mod_name = configIni['backend']['type']
|
||||
|
||||
section = f'backend.{mod_name}'
|
||||
|
||||
try:
|
||||
mod = import_module(f'.{mod_name}', 'httpaste.backend')
|
||||
except ImportError as e:
|
||||
raise BackendError(f'backend \'{mod_name}\' does not exist: {e}') from e
|
||||
else:
|
||||
interface = mod.Backend
|
||||
config = get_config(configIni, section, mod.Config, path)
|
||||
|
||||
return Config(interface=interface,config=config)
|
||||
|
||||
|
||||
__all__ = [
|
||||
load_backend,
|
||||
get_backend_config
|
||||
]
|
||||
|
|
|
|||
|
|
@ -3,110 +3,117 @@
|
|||
from os import path
|
||||
from pathlib import Path
|
||||
from typing import NamedTuple, Optional
|
||||
|
||||
from . import user
|
||||
from . import paste
|
||||
from httpaste.backend import BackendInterface as BackendAbc
|
||||
from httpaste.backend import ObjectBackend as ObjectBackendAbc
|
||||
|
||||
|
||||
class Parameters(NamedTuple):
|
||||
"""Filesystem backend parameters
|
||||
class Config(NamedTuple):
|
||||
"""Filesystem backend config
|
||||
"""
|
||||
|
||||
#: path of base directory
|
||||
base_dirname: str
|
||||
base_dirname: Path
|
||||
#: basename of users table directory
|
||||
user_dirname: Optional[str] = 'users'
|
||||
user_dirname: str = 'users'
|
||||
#: basename of pastes table directory
|
||||
paste_dirname: Optional[str] = 'pastes'
|
||||
paste_dirname: str = 'pastes'
|
||||
|
||||
|
||||
class User(object):
|
||||
"""Filesystem user model backend
|
||||
"""
|
||||
class ObjectBackendBc(ObjectBackendAbc):
|
||||
|
||||
dirname: Path
|
||||
path: Path
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
parameters: Parameters,
|
||||
interface: object,
|
||||
basename_attr: str,
|
||||
config: Config,
|
||||
model_class: type,
|
||||
model_schema: type):
|
||||
|
||||
self.interface = interface
|
||||
self.model_class = model_class
|
||||
|
||||
self.model_schema = model_schema
|
||||
|
||||
self.dirname = path.join(parameters.base_dirname,
|
||||
parameters.user_dirname)
|
||||
|
||||
self.dirname = path.join(config.base_dirname,
|
||||
getattr(config, basename_attr))
|
||||
self.path = Path(self.dirname)
|
||||
|
||||
def load(self, proto: object):
|
||||
|
||||
return user.load(proto, self.path, self.model_class, self.model_schema)
|
||||
return self.interface.load(proto, self.path, self.model_class, self.model_schema)
|
||||
|
||||
def dump(self, model: object):
|
||||
|
||||
return user.dump(model, self.path, self.model_schema)
|
||||
return self.interface.dump(model, self.path, self.model_schema)
|
||||
|
||||
def delete(self, proto: object):
|
||||
|
||||
return user.delete(proto, self.path)
|
||||
return self.interface.delete(proto, self.path)
|
||||
|
||||
def init(self):
|
||||
|
||||
return user.init(self.path)
|
||||
return self.interface.init(self.path)
|
||||
|
||||
def sanitize(self):
|
||||
|
||||
if self.path.exists():
|
||||
return user.sanitize(self.path, self.model_class, self.model_schema)
|
||||
return self.interface.sanitize(self.path, self.model_class, self.model_schema)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
class Paste(object):
|
||||
class UserBackend(ObjectBackendBc):
|
||||
"""Filesystem user model backend
|
||||
"""
|
||||
|
||||
def __init__(self, *args):
|
||||
|
||||
from . import user
|
||||
|
||||
super().__init__(user, 'user_dirname', *args)
|
||||
|
||||
|
||||
class PasteBackend(ObjectBackendBc):
|
||||
"""Filesystem paste model backend
|
||||
"""
|
||||
|
||||
dirname: str
|
||||
path: Path
|
||||
def __init__(self, *args):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
parameters: Parameters,
|
||||
model_class: type,
|
||||
model_schema: type):
|
||||
from . import paste
|
||||
|
||||
self.model_class = model_class
|
||||
super().__init__(paste, 'paste_dirname', *args)
|
||||
|
||||
self.model_schema = model_schema
|
||||
|
||||
self.dirname = path.join(parameters.base_dirname,
|
||||
parameters.paste_dirname)
|
||||
class Backend(BackendAbc):
|
||||
"""File backend interface
|
||||
"""
|
||||
|
||||
self.path = Path(self.dirname)
|
||||
_user: UserBackend
|
||||
_paste: PasteBackend
|
||||
|
||||
def load(self, proto: object):
|
||||
def __init__(self,
|
||||
config: Config,
|
||||
paste_model_class: type,
|
||||
user_model_class: type,
|
||||
paste_schema: type,
|
||||
user_schema: type):
|
||||
|
||||
return paste.load(proto, self.path, self.model_class, self.model_schema)
|
||||
self._user = UserBackend(config, user_model_class, user_schema)
|
||||
self._paste = PasteBackend(config, paste_model_class, paste_schema)
|
||||
|
||||
def dump(self, model: object):
|
||||
@property
|
||||
def user(self) -> UserBackend:
|
||||
|
||||
return paste.dump(model, self.path, self.model_schema)
|
||||
return self._user
|
||||
|
||||
def delete(self, proto: object):
|
||||
@property
|
||||
def paste(self) -> PasteBackend:
|
||||
|
||||
return paste.delete(proto, self.path)
|
||||
return self._paste
|
||||
|
||||
def init(self):
|
||||
|
||||
return paste.init(self.path)
|
||||
|
||||
def sanitize(self):
|
||||
|
||||
if self.path.exists():
|
||||
return paste.sanitize(self.path, self.model_class, self.model_schema)
|
||||
|
||||
return None
|
||||
__all__ = [
|
||||
Config,
|
||||
Backend
|
||||
]
|
||||
|
|
|
|||
|
|
@ -1,10 +1,12 @@
|
|||
"""MySQL backend
|
||||
"""
|
||||
from typing import NamedTuple, Optional, Callable
|
||||
from typing import NamedTuple, Optional
|
||||
from httpaste.backend import BackendInterface as BackendAbc
|
||||
from httpaste.backend import ObjectBackend as ObjectBackendAbc
|
||||
|
||||
|
||||
class Parameters(NamedTuple):
|
||||
"""MySQL parameters
|
||||
class Config(NamedTuple):
|
||||
"""MySQL config
|
||||
"""
|
||||
|
||||
#: user name
|
||||
|
|
@ -16,26 +18,18 @@ class Parameters(NamedTuple):
|
|||
#: database identifier
|
||||
database: str
|
||||
#: a mysql.connection.MySQLConnection object (does not apply to config)
|
||||
connection: Optional[object] = None
|
||||
connection: object = None
|
||||
|
||||
|
||||
class User(object):
|
||||
"""MySQL user model backend
|
||||
"""
|
||||
class ObjectBackendBc(ObjectBackendAbc):
|
||||
|
||||
connection: object
|
||||
|
||||
def __init__(self, parameters: Parameters, model_class: type) -> None:
|
||||
|
||||
from . import user
|
||||
|
||||
connect = get_mysql_connect_callee()
|
||||
|
||||
self.interface = user
|
||||
def __init__(self, interface: object, config: Config, model_class: type) -> None:
|
||||
|
||||
self.interface = interface
|
||||
self.model_class = model_class
|
||||
|
||||
self.connection = get_connection(parameters, connect)
|
||||
self.connection = get_connection(config)
|
||||
|
||||
def load(self, proto: object) -> object:
|
||||
|
||||
|
|
@ -58,50 +52,54 @@ class User(object):
|
|||
return self.interface.sanitize(self.connection, self.model_class)
|
||||
|
||||
|
||||
class Paste(object):
|
||||
class UserBackend(ObjectBackendBc):
|
||||
"""MySQL user model backend
|
||||
"""
|
||||
|
||||
def __init__(self, *args) -> None:
|
||||
|
||||
from . import user
|
||||
|
||||
super().__init__(paste, *args)
|
||||
|
||||
|
||||
class PasteBackend(ObjectBackendBc):
|
||||
"""MySQL paste model backend
|
||||
"""
|
||||
|
||||
connection: object
|
||||
|
||||
def __init__(self, parameters: Parameters, model_class: type) -> None:
|
||||
def __init__(self, *args) -> None:
|
||||
|
||||
from . import paste
|
||||
|
||||
connect = get_mysql_connect_callee()
|
||||
|
||||
self.interface = paste
|
||||
|
||||
self.model_class = model_class
|
||||
|
||||
self.connection = get_connection(parameters, connect)
|
||||
|
||||
def load(self, proto: object) -> object:
|
||||
|
||||
return self.interface.load(proto, self.connection, self.model_class)
|
||||
|
||||
def dump(self, model: object) -> None:
|
||||
|
||||
return self.interface.dump(model, self.connection)
|
||||
|
||||
def delete(self, proto: object) -> None:
|
||||
|
||||
return self.interface.delete(proto, self.connection)
|
||||
|
||||
def init(self) -> None:
|
||||
|
||||
return paste.init(self.connection)
|
||||
|
||||
def sanitize(self) -> None:
|
||||
|
||||
return self.interface.sanitize(self.connection, self.model_class)
|
||||
super().__init__(paste, *args)
|
||||
|
||||
|
||||
def get_mysql_connect_callee() -> object:
|
||||
class Backend(BackendAbc):
|
||||
"""MySQL backend interface
|
||||
"""
|
||||
|
||||
user: UserBackend
|
||||
paste: PasteBackend
|
||||
|
||||
def __init__(self,
|
||||
config: Config,
|
||||
paste_model_class: type,
|
||||
user_model_class: type,
|
||||
paste_schema: type,
|
||||
user_schema: type):
|
||||
|
||||
self.user = UserBackend(config, user_model_class, user_schema)
|
||||
self.paste = PasteBackend(config, paste_model_class, paste_schema)
|
||||
|
||||
|
||||
def get_connection(config: Config) -> object:
|
||||
"""get a mysql.connection.MySQLConnection object
|
||||
"""
|
||||
|
||||
try:
|
||||
from mysql.connector import connect
|
||||
from mysql.connector.connection import MySQLConnection
|
||||
except ImportError as e:
|
||||
raise ImportError(' '.join((
|
||||
'\'mysql-connector-python\' is not installed.',
|
||||
|
|
@ -109,26 +107,18 @@ def get_mysql_connect_callee() -> object:
|
|||
'\'python3 -m pip install mysql-connector-python\'.'
|
||||
))) from e
|
||||
|
||||
return connect
|
||||
if config.connection is not None:
|
||||
|
||||
return config.connection
|
||||
|
||||
def get_connection(parameters: Parameters, connect: Callable) -> object:
|
||||
"""get a mysql.connection.MySQLConnection object
|
||||
"""
|
||||
|
||||
if parameters.connection is not None:
|
||||
|
||||
return parameters.connection
|
||||
|
||||
connection = connect(user=parameters.user, password=parameters.password,
|
||||
host=parameters.host,
|
||||
database=parameters.database)
|
||||
connection = connect(user=config.user, password=config.password,
|
||||
host=config.host,
|
||||
database=config.database)
|
||||
|
||||
return connection
|
||||
|
||||
|
||||
__all__ = [
|
||||
Parameters,
|
||||
User,
|
||||
Paste
|
||||
]
|
||||
Config,
|
||||
Backend
|
||||
]
|
||||
|
|
|
|||
|
|
@ -1,16 +1,6 @@
|
|||
from os import path
|
||||
from time import time
|
||||
from importlib.resources import open_text
|
||||
|
||||
|
||||
try:
|
||||
from mysql.connector.connection import MySQLConnection
|
||||
except ImportError as e:
|
||||
raise ImportError(' '.join((
|
||||
'\'mysql-connector-python\' is not installed.',
|
||||
'Install it by running',
|
||||
'\'python3 -m pip install mysql-connector-python\'.'
|
||||
))) from e
|
||||
from mysql.connector.connection import MySQLConnection
|
||||
|
||||
|
||||
def load(proto:object, connection: MySQLConnection, model_class: type):
|
||||
|
|
@ -94,9 +84,17 @@ def init(connection: MySQLConnection):
|
|||
|
||||
cursor = connection.cursor()
|
||||
|
||||
with open_text('httpaste.backend.mysql', 'paste.sql') as fh:
|
||||
statement = '''CREATE TABLE `httpaste_pastes` (
|
||||
`pid` blob NOT NULL,
|
||||
`data` longblob NOT NULL,
|
||||
`data_hash` blob NOT NULL,
|
||||
`sub` blob DEFAULT NULL,
|
||||
`expiration` int(16) NOT NULL,
|
||||
`encoding` tinytext DEFAULT NULL,
|
||||
PRIMARY KEY (`pid`(128))
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;'''
|
||||
|
||||
cursor.execute(fh.read())
|
||||
cursor.execute(statement)
|
||||
|
||||
connection.commit()
|
||||
|
||||
|
|
|
|||
|
|
@ -1,9 +0,0 @@
|
|||
CREATE TABLE `httpaste_pastes` (
|
||||
`pid` blob NOT NULL,
|
||||
`data` longblob NOT NULL,
|
||||
`data_hash` blob NOT NULL,
|
||||
`sub` blob DEFAULT NULL,
|
||||
`expiration` int(16) NOT NULL,
|
||||
`encoding` tinytext DEFAULT NULL,
|
||||
PRIMARY KEY (`pid`(128))
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
||||
|
|
@ -1,13 +1,5 @@
|
|||
from os import path
|
||||
from importlib.resources import open_text
|
||||
try:
|
||||
from mysql.connector.connection import MySQLConnection
|
||||
except ImportError as e:
|
||||
raise ImportError(' '.join((
|
||||
'\'mysql-connector-python\' is not installed.',
|
||||
'Install it by running',
|
||||
'\'python3 -m pip install mysql-connector-python\'.'
|
||||
))) from e
|
||||
from mysql.connector.connection import MySQLConnection
|
||||
|
||||
|
||||
def load(proto:object, connection: MySQLConnection, model_class: type):
|
||||
|
|
@ -84,9 +76,14 @@ def init(connection: MySQLConnection):
|
|||
|
||||
cursor = connection.cursor()
|
||||
|
||||
with open_text('httpaste.backend.mysql', 'user.sql') as fh:
|
||||
statement = '''CREATE TABLE `httpaste_users` (
|
||||
`sub` blob NOT NULL,
|
||||
`key_hash` blob NOT NULL,
|
||||
`paste_index` blob NOT NULL,
|
||||
PRIMARY KEY (`sub`(128))
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;'''
|
||||
|
||||
cursor.execute(fh.read())
|
||||
cursor.execute(statement)
|
||||
|
||||
connection.commit()
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +0,0 @@
|
|||
CREATE TABLE `httpaste_users` (
|
||||
`sub` blob NOT NULL,
|
||||
`key_hash` blob NOT NULL,
|
||||
`paste_index` blob NOT NULL,
|
||||
PRIMARY KEY (`sub`(128))
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
||||
|
|
@ -2,96 +2,113 @@
|
|||
"""
|
||||
from sqlite3 import Connection, Row, connect
|
||||
from typing import NamedTuple, Optional
|
||||
from pathlib import Path
|
||||
|
||||
from . import user
|
||||
from . import paste
|
||||
from httpaste.backend import BackendInterface as BackendAbc
|
||||
from httpaste.backend import ObjectBackend as ObjectBackendAbc
|
||||
|
||||
|
||||
class Parameters(NamedTuple):
|
||||
"""SQLite backend parameters
|
||||
class Config(NamedTuple):
|
||||
"""SQLite backend config
|
||||
"""
|
||||
|
||||
#: local path or URI
|
||||
path: str
|
||||
uri: Path
|
||||
user_table_name: str = 'httpaste_users'
|
||||
paste_table_name: str = 'httpaste_pastes'
|
||||
#: a sqlite3.Connection object (does not apply to config)
|
||||
connection: Optional[object] = None
|
||||
connection: Connection = None
|
||||
|
||||
|
||||
class User(object):
|
||||
"""SQLite user model backend
|
||||
class ObjectBackendBc(ObjectBackendAbc):
|
||||
|
||||
connection: object
|
||||
|
||||
def __init__(self, interface: object, table_name_attr: str, config: Config, model_class: type, schema: type) -> None:
|
||||
|
||||
self.interface = interface
|
||||
self.model_class = model_class
|
||||
self.connection = get_connection(config)
|
||||
self.table = getattr(config, table_name_attr)
|
||||
|
||||
def load(self, proto: object) -> object:
|
||||
|
||||
return self.interface.load(proto, self.connection, self.table, self.model_class)
|
||||
|
||||
def dump(self, model: object) -> None:
|
||||
|
||||
return self.interface.dump(model, self.connection, self.table)
|
||||
|
||||
def delete(self, proto: object) -> None:
|
||||
|
||||
return self.interface.delete(proto, self.connection, self.table)
|
||||
|
||||
def init(self) -> None:
|
||||
|
||||
return self.interface.init(self.connection, self.table)
|
||||
|
||||
def sanitize(self) -> None:
|
||||
|
||||
return self.interface.sanitize(self.connection, self.table, self.model_class)
|
||||
|
||||
|
||||
class UserBackend(ObjectBackendBc):
|
||||
"""sqlite user model backend
|
||||
"""
|
||||
|
||||
connection: Connection
|
||||
def __init__(self, *args) -> None:
|
||||
|
||||
def __init__(self, parameters: Parameters, model_class: type):
|
||||
from . import user
|
||||
|
||||
self.model_class = model_class
|
||||
|
||||
self.connection = get_connection(parameters)
|
||||
|
||||
def load(self, proto: object):
|
||||
|
||||
return user.load(proto, self.connection, self.model_class)
|
||||
|
||||
def dump(self, model: object):
|
||||
|
||||
return user.dump(model, self.connection)
|
||||
|
||||
def delete(self, proto: object):
|
||||
|
||||
return user.delete(proto, self.connection)
|
||||
|
||||
def init(self):
|
||||
|
||||
return user.init(self.connection)
|
||||
|
||||
def sanitize(self):
|
||||
|
||||
return user.sanitize(self.connection, self.model_class)
|
||||
super().__init__(paste, 'user_table_name', *args)
|
||||
|
||||
|
||||
class Paste(object):
|
||||
"""SQLite paste model backend
|
||||
class PasteBackend(ObjectBackendBc):
|
||||
"""sqlite paste model backend
|
||||
"""
|
||||
|
||||
connection: Connection
|
||||
connection: object
|
||||
|
||||
def __init__(self, parameters: Parameters, model_class: type):
|
||||
def __init__(self, *args) -> None:
|
||||
|
||||
self.model_class = model_class
|
||||
from . import paste
|
||||
|
||||
self.connection = get_connection(parameters)
|
||||
|
||||
def load(self, proto: object):
|
||||
|
||||
return paste.load(proto, self.connection, self.model_class)
|
||||
|
||||
def dump(self, model: object):
|
||||
|
||||
return paste.dump(model, self.connection)
|
||||
|
||||
def delete(self, proto: object):
|
||||
|
||||
return paste.delete(proto, self.connection)
|
||||
|
||||
def init(self):
|
||||
|
||||
return paste.init(self.connection)
|
||||
|
||||
def sanitize(self):
|
||||
|
||||
return paste.sanitize(self.connection, self.model_class)
|
||||
super().__init__(paste, 'paste_table_name', *args)
|
||||
|
||||
|
||||
def get_connection(parameters: Parameters):
|
||||
class Backend(BackendAbc):
|
||||
"""sqlite backend interface
|
||||
"""
|
||||
|
||||
user: UserBackend
|
||||
paste: PasteBackend
|
||||
|
||||
def __init__(self,
|
||||
config: Config,
|
||||
paste_model_class: type,
|
||||
user_model_class: type,
|
||||
paste_schema: type,
|
||||
user_schema: type):
|
||||
|
||||
self.user = UserBackend(config, user_model_class, user_schema)
|
||||
self.paste = PasteBackend(config, paste_model_class, paste_schema)
|
||||
|
||||
|
||||
def get_connection(config: Config):
|
||||
"""get an sqlite connection object
|
||||
"""
|
||||
|
||||
if parameters.connection:
|
||||
if config.connection:
|
||||
|
||||
return parameters.connection
|
||||
return config.connection
|
||||
|
||||
connection = connect(parameters.path, check_same_thread=False)
|
||||
connection = connect(config.uri, check_same_thread=False)
|
||||
connection.row_factory = Row
|
||||
|
||||
return connection
|
||||
|
||||
|
||||
__all__ = [
|
||||
Config,
|
||||
Backend
|
||||
]
|
||||
|
|
|
|||
|
|
@ -6,14 +6,14 @@ from time import time
|
|||
from importlib.resources import open_text
|
||||
|
||||
|
||||
def load(proto: object, connection: Connection, model_class: type):
|
||||
def load(proto: object, connection: Connection, table: str, model_class: type):
|
||||
"""load a paste
|
||||
"""
|
||||
|
||||
cursor = connection.cursor()
|
||||
|
||||
statement = '''SELECT pid, data, data_hash, sub, expiration, encoding
|
||||
FROM pastes
|
||||
statement = f'''SELECT pid, data, data_hash, sub, expiration, encoding
|
||||
FROM {table}
|
||||
WHERE pid=?'''
|
||||
|
||||
cursor.execute(statement, (proto.pid,))
|
||||
|
|
@ -33,13 +33,13 @@ def load(proto: object, connection: Connection, model_class: type):
|
|||
return None
|
||||
|
||||
|
||||
def dump(model: object, connection: Connection) -> None:
|
||||
def dump(model: object, connection: Connection, table: str) -> None:
|
||||
"""dump a paste
|
||||
"""
|
||||
|
||||
cursor = connection.cursor()
|
||||
|
||||
statement = '''INSERT INTO pastes
|
||||
statement = f'''INSERT INTO "{table}"
|
||||
(pid, data, data_hash, sub, expiration, encoding)
|
||||
VALUES (?,?,?,?,?,?)'''
|
||||
|
||||
|
|
@ -57,35 +57,41 @@ def dump(model: object, connection: Connection) -> None:
|
|||
return None
|
||||
|
||||
|
||||
def delete(proto: object, connection: Connection) -> None:
|
||||
def delete(proto: object, connection: Connection, table: str) -> None:
|
||||
|
||||
cursor = connection.cursor()
|
||||
|
||||
cursor.execute('''DELETE FROM pastes WHERE pid=?''', (proto.pid,))
|
||||
cursor.execute(f'''DELETE FROM {table} WHERE pid=?''', (proto.pid,))
|
||||
|
||||
connection.commit()
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def init(connection: Connection):
|
||||
def init(connection: Connection, table: str):
|
||||
|
||||
cursor = connection.cursor()
|
||||
|
||||
with open_text('httpaste.backend.sqlite', 'paste.sql') as fh:
|
||||
statement = f'''CREATE TABLE IF NOT EXISTS "{table}" (
|
||||
"pid" BLOB NOT NULL UNIQUE,
|
||||
"data" BLOB NOT NULL,
|
||||
"data_hash" BLOB NOT NULL,
|
||||
"sub" BLOB UNIQUE,
|
||||
"expiration" INTEGER NOT NULL,
|
||||
"encoding" TEXT,
|
||||
PRIMARY KEY("pid")
|
||||
);'''
|
||||
|
||||
statement = fh.read()
|
||||
|
||||
cursor.execute(statement)
|
||||
cursor.execute(statement)
|
||||
|
||||
connection.commit()
|
||||
|
||||
|
||||
def sanitize(connection: Connection, model_class: type) -> int:
|
||||
def sanitize(connection: Connection, table: str, model_class: type) -> int:
|
||||
|
||||
cursor = connection.cursor()
|
||||
|
||||
statement = '''SELECT pid FROM pastes
|
||||
statement = f'''SELECT pid FROM {table}
|
||||
WHERE expiration < ? AND expiration > 0'''
|
||||
|
||||
cursor.execute(statement, (int(time()),))
|
||||
|
|
|
|||
|
|
@ -1,9 +0,0 @@
|
|||
CREATE TABLE IF NOT EXISTS "pastes" (
|
||||
"pid" BLOB NOT NULL UNIQUE,
|
||||
"data" BLOB NOT NULL,
|
||||
"data_hash" BLOB NOT NULL,
|
||||
"sub" BLOB UNIQUE,
|
||||
"expiration" INTEGER NOT NULL,
|
||||
"encoding" TEXT,
|
||||
PRIMARY KEY("pid")
|
||||
);
|
||||
|
|
@ -6,14 +6,14 @@ from httpaste.model import User
|
|||
from importlib.resources import open_text
|
||||
|
||||
|
||||
def load(proto: object, connection: Connection, model_class: type):
|
||||
def load(proto: object, connection: Connection, table: str, model_class: type):
|
||||
"""load a user
|
||||
"""
|
||||
|
||||
cursor = connection.cursor()
|
||||
|
||||
statement = '''SELECT sub, key_hash, paste_index
|
||||
FROM users
|
||||
statement = f'''SELECT sub, key_hash, paste_index
|
||||
FROM {table}
|
||||
WHERE sub=?'''
|
||||
|
||||
cursor.execute(statement, (proto.sub,))
|
||||
|
|
@ -27,13 +27,13 @@ def load(proto: object, connection: Connection, model_class: type):
|
|||
return None
|
||||
|
||||
|
||||
def dump(model: object, connection: Connection) -> None:
|
||||
def dump(model: object, connection: Connection, table: str) -> None:
|
||||
"""dump a user
|
||||
"""
|
||||
|
||||
cursor = connection.cursor()
|
||||
|
||||
statement = '''INSERT OR REPLACE INTO users
|
||||
statement = f'''INSERT OR REPLACE INTO {table}
|
||||
(sub, key_hash, paste_index)
|
||||
VALUES (?,?,?)'''
|
||||
|
||||
|
|
@ -44,32 +44,35 @@ def dump(model: object, connection: Connection) -> None:
|
|||
return None
|
||||
|
||||
|
||||
def delete(proto: object, connection: Connection) -> None:
|
||||
def delete(proto: object, connection: Connection, table: str) -> None:
|
||||
|
||||
cursor = connection.cursor()
|
||||
|
||||
cursor.execute('''DELETE FROM users WHERE sub=?''', (proto.sub,))
|
||||
cursor.execute(f'''DELETE FROM {table} WHERE sub=?''', (proto.sub,))
|
||||
|
||||
connection.commit()
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def init(connection: Connection) -> None:
|
||||
def init(connection: Connection, table: str) -> None:
|
||||
|
||||
cursor = connection.cursor()
|
||||
|
||||
with open_text('httpaste.backend.sqlite', 'user.sql') as fh:
|
||||
statement = f'''CREATE TABLE IF NOT EXISTS "{table}" (
|
||||
"sub" BLOB NOT NULL UNIQUE,
|
||||
"key_hash" BLOB NOT NULL,
|
||||
"paste_index" BLOB,
|
||||
PRIMARY KEY("sub")
|
||||
);'''
|
||||
|
||||
statement = fh.read()
|
||||
|
||||
cursor.execute(statement)
|
||||
cursor.execute(statement)
|
||||
|
||||
connection.commit()
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def sanitize(connection: Connection, model_class) -> int:
|
||||
def sanitize(connection: Connection, table: str, model_class) -> int:
|
||||
|
||||
return 0
|
||||
|
|
|
|||
|
|
@ -1,6 +0,0 @@
|
|||
CREATE TABLE IF NOT EXISTS "users" (
|
||||
"sub" BLOB NOT NULL UNIQUE,
|
||||
"key_hash" BLOB NOT NULL,
|
||||
"paste_index" BLOB,
|
||||
PRIMARY KEY("sub")
|
||||
);
|
||||
18
src/httpaste/context.py
Executable file
18
src/httpaste/context.py
Executable file
|
|
@ -0,0 +1,18 @@
|
|||
from typing import NamedTuple
|
||||
from string import ascii_uppercase, digits, ascii_letters, punctuation
|
||||
from configparser import ConfigParser
|
||||
|
||||
from httpaste.helper.common import generate_random_string
|
||||
from httpaste.helper.config import get_sanitized_config_charset, get_config
|
||||
|
||||
class Config(NamedTuple):
|
||||
"""httpaste global config
|
||||
"""
|
||||
salt: bytes = get_sanitized_config_charset(generate_random_string(
|
||||
32, ascii_letters + digits + punctuation)).encode('utf-8')
|
||||
hmac_iter: int = 20000
|
||||
|
||||
|
||||
def get_context_config(configIni: ConfigParser) -> Config:
|
||||
|
||||
return get_config(configIni, 'context', Config)
|
||||
|
|
@ -5,12 +5,14 @@ import httpaste
|
|||
def get(**kwargs):
|
||||
|
||||
config = current_app.httpaste
|
||||
context = config.context
|
||||
model = config.model
|
||||
|
||||
|
||||
return httpaste.__doc__.format(
|
||||
url=connexion.request.url,
|
||||
hmac_iterations=config.hmac_iterations,
|
||||
paste_lifetime=config.paste_lifetime,
|
||||
paste_max_lifetime=str(round(config.paste_max_lifetime / 60)),
|
||||
paste_default_encoding=config.paste_default_encoding
|
||||
hmac_iterations=context.hmac_iter,
|
||||
paste_lifetime=model.paste.default_lifetime,
|
||||
paste_max_lifetime=str(round(model.paste.default_max_lifetime / 60)),
|
||||
paste_default_encoding=model.paste.default_encoding
|
||||
), 200
|
||||
|
|
|
|||
|
|
@ -5,8 +5,10 @@ from flask import current_app
|
|||
from httpaste.helper.common import decode, DecodeError, join_url
|
||||
import httpaste.model.paste as paste_model
|
||||
import httpaste.model.user as user_model
|
||||
from httpaste.backend import load_backend
|
||||
from httpaste.helper.http import BadRequestError, GoneError, NotFoundError
|
||||
from httpaste.model import (
|
||||
from httpaste.helper.syntax import highlight
|
||||
from httpaste.schema import (
|
||||
PasteKey,
|
||||
PasteData,
|
||||
PasteLifetime,
|
||||
|
|
@ -15,6 +17,13 @@ from httpaste.model import (
|
|||
Sub)
|
||||
|
||||
|
||||
class Config:
|
||||
default_mime_type: str = 'text/plain'
|
||||
default_linenos: bool = False
|
||||
default_syntax: bool = False
|
||||
default_formatter: str = 'terminal256'
|
||||
|
||||
|
||||
def delete(**kwargs):
|
||||
"""
|
||||
"""
|
||||
|
|
@ -45,12 +54,15 @@ def get(**kwargs):
|
|||
"""
|
||||
|
||||
config = current_app.httpaste
|
||||
backend = load_backend(config.backend)
|
||||
context = config.context
|
||||
|
||||
syntax = kwargs.get('syntax')
|
||||
formatter = kwargs.get('format', Config.default_formatter)
|
||||
linenos = kwargs.get('linenos', Config.default_linenos)
|
||||
mime = kwargs.get('mime', Config.default_mime_type)
|
||||
|
||||
pid = PasteKey(kwargs['id'].encode('utf-8'))
|
||||
syntax = kwargs.get('syntax')
|
||||
formatter = kwargs.get('format', 'terminal256')
|
||||
linenos = kwargs.get('linenos', False)
|
||||
mime = kwargs.get('mime', 'text/plain')
|
||||
|
||||
if kwargs.get('user') is not None:
|
||||
# authenticated
|
||||
|
|
@ -58,26 +70,23 @@ def get(**kwargs):
|
|||
key = MasterKey(kwargs['token_info'].get('master_key'))
|
||||
sub = Sub(kwargs['token_info'].get('sub'))
|
||||
|
||||
pkey = user_model.load_paste_key(pid, sub, key, config.backend.user,
|
||||
config.salt, config.hmac_iterations)
|
||||
pkey = user_model.load_paste_key(pid, sub, key, backend.user, context)
|
||||
|
||||
def call(): return paste_model.get_safe(pid, pkey, sub,
|
||||
config.backend.paste,
|
||||
config.salt, config.hmac_iterations)
|
||||
config.model.paste,
|
||||
backend.paste, context)
|
||||
else:
|
||||
# unauthenticated
|
||||
|
||||
def call(): return paste_model.get(pid, config.backend.paste,
|
||||
config.salt, config.hmac_iterations)
|
||||
def call(): return paste_model.get(pid, backend.paste, context)
|
||||
|
||||
try:
|
||||
data, expiration, encoding = call()
|
||||
except paste_model.LifetimeError as e:
|
||||
if kwargs.get('user') is not None:
|
||||
paste_model.remove_safe(pid, sub, pkey, config.backend.paste,
|
||||
config.salt, config.hmac_iterations)
|
||||
paste_model.remove_safe(pid, sub, pkey, backend.paste, context)
|
||||
else:
|
||||
paste_model.remove(pid, config.backend.paste)
|
||||
paste_model.remove(pid, backend.paste)
|
||||
raise GoneError(str(e)) from e
|
||||
except paste_model.NotFoundError as e:
|
||||
raise NotFoundError(str(e))
|
||||
|
|
@ -87,10 +96,9 @@ def get(**kwargs):
|
|||
# burn after read
|
||||
if expiration < 0:
|
||||
if kwargs.get('user') is not None:
|
||||
paste_model.remove_safe(pid, sub, pkey, config.backend.paste,
|
||||
config.salt, config.hmac_iterations)
|
||||
paste_model.remove_safe(pid, sub, pkey, backend.paste, context)
|
||||
else:
|
||||
paste_model.remove(pid, config.backend.paste)
|
||||
paste_model.remove(pid, backend.paste)
|
||||
|
||||
if syntax is not None:
|
||||
data = highlight(data, str(syntax), formatter, linenos)
|
||||
|
|
@ -110,12 +118,14 @@ def post(**kwargs):
|
|||
"""
|
||||
|
||||
config = current_app.httpaste
|
||||
backend = load_backend(config.backend)
|
||||
context = config.context
|
||||
|
||||
if kwargs['body'].get('data') is None:
|
||||
raise BadRequestError('form field \'data\' missing.')
|
||||
|
||||
encoding = PasteEncoding(kwargs.get('encoding', 'utf-8'))
|
||||
lifetime = PasteLifetime(kwargs.get('lifetime', config.paste_lifetime))
|
||||
lifetime = PasteLifetime(kwargs.get('lifetime', config.model.paste.default_lifetime))
|
||||
|
||||
if encoding not in ['utf-8', 'utf-16', 'ascii']:
|
||||
try:
|
||||
|
|
@ -135,15 +145,15 @@ def post(**kwargs):
|
|||
sub = Sub(kwargs['token_info'].get('sub'))
|
||||
|
||||
pid, pkey = paste_model.create_safe(pdata, lifetime, sub, encoding,
|
||||
config.backend.paste, config.salt, config.hmac_iterations)
|
||||
config.model.paste, backend.paste,
|
||||
context)
|
||||
|
||||
user_model.dump_paste_key(pid, pkey, sub, key, config.backend.user,
|
||||
config.salt, config.hmac_iterations)
|
||||
user_model.dump_paste_key(pid, pkey, sub, key, backend.user, context)
|
||||
else:
|
||||
# unauthenticated
|
||||
|
||||
pid = paste_model.create(pdata, lifetime, encoding, config.backend.paste,
|
||||
config.salt, config.hmac_iterations)
|
||||
pid = paste_model.create(pdata, lifetime, encoding, config.model.paste,
|
||||
backend.paste, context)
|
||||
|
||||
|
||||
base_url = join_url(request.root_url, request.path)
|
||||
|
|
|
|||
|
|
@ -5,8 +5,6 @@ def search(**kwargs):
|
|||
"""
|
||||
"""
|
||||
|
||||
print(args)
|
||||
|
||||
return 'Hallo', 200
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -4,20 +4,22 @@ from flask import current_app
|
|||
|
||||
from httpaste.helper.http import ForbiddenError
|
||||
from httpaste.model.user import authenticate, AuthenticationError
|
||||
|
||||
from httpaste.backend import load_backend
|
||||
|
||||
def post(*args, **kwargs):
|
||||
"""
|
||||
"""
|
||||
|
||||
config = current_app.httpaste
|
||||
backend = load_backend(config.backend)
|
||||
context = config.context
|
||||
|
||||
user_id = args[0].encode('utf-8')
|
||||
password = args[1].encode('utf-8')
|
||||
|
||||
try:
|
||||
|
||||
return authenticate(user_id, password, config.backend.user, config.salt, config.hmac_iterations)
|
||||
return authenticate(user_id, password, backend.user, context)
|
||||
except AuthenticationError as e:
|
||||
|
||||
raise ForbiddenError('You shall not pass!') from e
|
||||
|
|
|
|||
111
src/httpaste/helper/config.py
Executable file
111
src/httpaste/helper/config.py
Executable file
|
|
@ -0,0 +1,111 @@
|
|||
import os
|
||||
from pathlib import Path
|
||||
from configparser import ConfigParser, NoSectionError
|
||||
from typing import Optional, NamedTuple
|
||||
from os import environ
|
||||
|
||||
|
||||
CONFIGPATH_ENVIRON = 'HTTPASTE_CONFIGPATH'
|
||||
|
||||
|
||||
class ConfigError(Exception):
|
||||
"""
|
||||
"""
|
||||
|
||||
|
||||
def get_sanitized_config_charset(charset: str):
|
||||
|
||||
for x in ["$", "%"]:
|
||||
|
||||
charset = charset.replace(x, f'{x}{x}')
|
||||
|
||||
return charset
|
||||
|
||||
|
||||
def typecast(obj: dict, aclass: type, dirname:Optional[Path] = None) -> dict:
|
||||
"""typecast a dictionary according to class annotations
|
||||
|
||||
:param obj: dictionary to typecast
|
||||
:param aclass: class containing typehint annotations
|
||||
:param basepath: basepath for filesystem path typecasting
|
||||
|
||||
:returns: typecasted dictionary
|
||||
"""
|
||||
|
||||
casted = {}
|
||||
|
||||
for k,v in obj.items():
|
||||
|
||||
v = v.strip('\'"')
|
||||
|
||||
try:
|
||||
bclass = aclass.__annotations__[k]
|
||||
except KeyError as e:
|
||||
raise KeyError(f'{k}: not allowed') from e
|
||||
|
||||
if issubclass(bclass, Path) and not str(v[0]).startswith(os.path.sep):
|
||||
if not isinstance(dirname, Path):
|
||||
raise TypeError('no dirname for Path type specified.')
|
||||
|
||||
casted[k] = casted_val = dirname / v
|
||||
elif issubclass(bclass, bytes):
|
||||
|
||||
casted[k] = bclass(v.encode('utf-8'))
|
||||
else:
|
||||
try:
|
||||
casted_val = bclass(v)
|
||||
except ValueError as e:
|
||||
raise ValueError(f'{k}: {e}') from e
|
||||
else:
|
||||
casted[k] = casted_val
|
||||
|
||||
return casted
|
||||
|
||||
|
||||
def get_config(configIni: ConfigParser, section: str, bclass: type, dirname: Path = None) -> object:
|
||||
"""get an object-oriented configuration from an INI file
|
||||
|
||||
:param configIni: configparser.Configparser object with initialized stream
|
||||
:param section: name of section to get configuration for
|
||||
:param bclass: configuration base class
|
||||
:param dirname: directory name of INI file stream
|
||||
|
||||
:returns: initialized configuration instance
|
||||
"""
|
||||
|
||||
try:
|
||||
raw_config = dict(configIni.items(section))
|
||||
except NoSectionError as e:
|
||||
raw_config = {}
|
||||
|
||||
try:
|
||||
casted_config = typecast(raw_config, bclass, dirname)
|
||||
except KeyError as e:
|
||||
raise ConfigError(f'[{section}] {e}') from e
|
||||
except ValueError as e:
|
||||
raise ConfigError(f'[{section}] {e}') from e
|
||||
|
||||
try:
|
||||
config = bclass(**casted_config)
|
||||
except TypeError as e:
|
||||
raise ConfigError(f'[{section}] {e}') from e
|
||||
|
||||
return config
|
||||
|
||||
|
||||
|
||||
def get_configparser(path: Path = None, var_name: str = CONFIGPATH_ENVIRON):
|
||||
"""
|
||||
"""
|
||||
|
||||
if path is None:
|
||||
try:
|
||||
path = environ[var_name]
|
||||
except KeyError as e:
|
||||
raise ConfigError(
|
||||
f'environment variable \'{var_name}\' not set.') from e
|
||||
|
||||
configIni = ConfigParser()
|
||||
configIni.read(path)
|
||||
|
||||
return configIni, path
|
||||
|
|
@ -8,7 +8,7 @@ from cryptography.hazmat.primitives import hashes
|
|||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
from cryptography.fernet import Fernet, InvalidToken
|
||||
|
||||
from httpaste import Config
|
||||
from httpaste.context import Config
|
||||
|
||||
|
||||
DEFAULT_HMAC_ITERATIONS = 20000
|
||||
|
|
@ -38,7 +38,7 @@ def dhash(data: bytes):
|
|||
return hashlib.sha512(data).digest()
|
||||
|
||||
|
||||
def derive_key(main_key: str, salt: bytes = Config.salt, iterations:int=DEFAULT_HMAC_ITERATIONS) -> bytes:
|
||||
def derive_key(main_key: str, salt: bytes, iterations:int=DEFAULT_HMAC_ITERATIONS) -> bytes:
|
||||
"""derive a key from a main key
|
||||
|
||||
:param main_key: main key to derive from
|
||||
|
|
|
|||
|
|
@ -1,144 +1,19 @@
|
|||
"""Model
|
||||
"""
|
||||
from typing import NamedTuple, Optional, Dict, Union, Any, TypedDict
|
||||
from typing import NamedTuple
|
||||
from configparser import ConfigParser
|
||||
from pathlib import Path
|
||||
|
||||
from httpaste.model.paste import Config as PasteConfig
|
||||
from httpaste.model.paste import get_paste_model_config
|
||||
|
||||
class Config(NamedTuple):
|
||||
"""Model Configuration"""
|
||||
paste: PasteConfig
|
||||
|
||||
|
||||
class PasteDataSchema:
|
||||
"""Paste Interface schema between Model and Backend
|
||||
"""
|
||||
pid = bytes
|
||||
data = bytes
|
||||
data_hash = bytes
|
||||
sub = bytes
|
||||
timestamp = int
|
||||
lifetime = int
|
||||
expiration = int
|
||||
encoding = str
|
||||
def get_model_config(configIni: ConfigParser, path:Path) -> Config:
|
||||
|
||||
paste_config = get_paste_model_config(configIni)
|
||||
|
||||
class UserDataSchema:
|
||||
"""User Interface Schema between Model and Backend
|
||||
"""
|
||||
sub = bytes
|
||||
key_hash = bytes
|
||||
index = bytes
|
||||
|
||||
|
||||
class Backend(object):
|
||||
"""Backend
|
||||
"""
|
||||
parameter_class: str
|
||||
|
||||
|
||||
class Salt(bytes):
|
||||
"""Salt
|
||||
"""
|
||||
|
||||
|
||||
class PasteData(PasteDataSchema.data):
|
||||
"""Paste Data
|
||||
"""
|
||||
|
||||
|
||||
class PasteHash(PasteDataSchema.data_hash):
|
||||
"""Paste Data Hash
|
||||
"""
|
||||
|
||||
|
||||
class PasteTimestamp(PasteDataSchema.timestamp):
|
||||
"""Paste Timestamp
|
||||
"""
|
||||
|
||||
|
||||
class PasteEncoding(PasteDataSchema.encoding):
|
||||
"""
|
||||
"""
|
||||
|
||||
|
||||
class PasteExpiration(PasteDataSchema.expiration):
|
||||
"""Paste Expiration
|
||||
|
||||
< 0: after first acccess
|
||||
0: never
|
||||
"""
|
||||
|
||||
|
||||
class PasteLifetime(PasteDataSchema.lifetime):
|
||||
"""Paste Lifetime
|
||||
"""
|
||||
|
||||
|
||||
class PasteSub(PasteDataSchema.sub):
|
||||
"""Hashed user id
|
||||
"""
|
||||
|
||||
|
||||
class KeyHash(UserDataSchema.key_hash):
|
||||
"""User Master Key Hash
|
||||
"""
|
||||
|
||||
|
||||
class PasteKey(bytes):
|
||||
"""Paste encryption key
|
||||
"""
|
||||
|
||||
|
||||
class PasteId(PasteDataSchema.pid):
|
||||
"""Paste unique identifier
|
||||
"""
|
||||
|
||||
|
||||
class MasterKey(bytes):
|
||||
"""User's master encryption key
|
||||
"""
|
||||
|
||||
|
||||
class Sub(UserDataSchema.sub):
|
||||
"""User id
|
||||
"""
|
||||
|
||||
|
||||
class Index(TypedDict):
|
||||
"""User Paste Index
|
||||
"""
|
||||
auth_expires: int
|
||||
pastes: Dict[str, Dict[str, Any]]
|
||||
|
||||
|
||||
class SerializedIndex(UserDataSchema.index):
|
||||
"""User Paste Index (serialized)
|
||||
"""
|
||||
|
||||
|
||||
class User(NamedTuple):
|
||||
"""Global User Model (and Prototype)
|
||||
|
||||
non-optional values are prototype values
|
||||
"""
|
||||
|
||||
#: user id
|
||||
sub: Sub
|
||||
#: user's master key hash
|
||||
key_hash: Optional[KeyHash] = None
|
||||
#: user's paste index
|
||||
index: Optional[Union[Index, SerializedIndex]] = None
|
||||
|
||||
|
||||
class Paste(NamedTuple):
|
||||
"""Global Paste Model (and Prototype)
|
||||
|
||||
non-optional values are prototype values
|
||||
"""
|
||||
|
||||
#: paste id
|
||||
pid: PasteId
|
||||
#: paste owner
|
||||
sub: Optional[PasteSub] = None
|
||||
#: paste data
|
||||
data: Optional[PasteData] = None
|
||||
#: paste data hash
|
||||
data_hash: Optional[PasteHash] = None
|
||||
#: paste timestamp
|
||||
expiration: Optional[PasteExpiration] = None
|
||||
#: paste encoding
|
||||
encoding: Optional[PasteEncoding] = None
|
||||
return Config(paste=paste_config)
|
||||
|
|
@ -2,15 +2,30 @@
|
|||
"""paste model interface
|
||||
"""
|
||||
import json
|
||||
from typing import Optional, Tuple
|
||||
from typing import Optional, Tuple, NamedTuple
|
||||
import time
|
||||
from configparser import ConfigParser
|
||||
from string import ascii_uppercase, digits, ascii_letters, punctuation
|
||||
|
||||
from httpaste import Config
|
||||
|
||||
from httpaste.context import Config as ContextConfig
|
||||
from httpaste.helper.crypto import dhash, shash, encrypt, decrypt
|
||||
from httpaste.helper.config import get_sanitized_config_charset, get_config
|
||||
from httpaste.helper.common import generate_random_string
|
||||
from httpaste.model import (Paste, PasteId, Sub, MasterKey, PasteKey, Salt,
|
||||
PasteData, PasteHash, PasteTimestamp, PasteSub,
|
||||
PasteLifetime, PasteEncoding, PasteExpiration)
|
||||
from httpaste.schema import (Paste, PasteId, Sub, MasterKey, PasteKey, Salt,
|
||||
PasteData, PasteHash, PasteTimestamp, PasteSub,
|
||||
PasteLifetime, PasteEncoding, PasteExpiration)
|
||||
|
||||
|
||||
class Config(NamedTuple):
|
||||
id_size: int = 8
|
||||
id_charset: str = ascii_letters + digits
|
||||
key_size: int = 32
|
||||
key_charset: str = get_sanitized_config_charset(ascii_letters + digits + punctuation)
|
||||
default_lifetime: int = 5
|
||||
default_max_lifetime: int = 1440
|
||||
default_min_lifetime: int = 1
|
||||
default_encoding: str = 'utf-8'
|
||||
|
||||
|
||||
class NotFoundError(Exception):
|
||||
|
|
@ -38,9 +53,7 @@ class BackendError(Exception):
|
|||
"""
|
||||
|
||||
|
||||
def generate_paste_id(
|
||||
length: int = Config.paste_id_size,
|
||||
charset: str = Config.paste_id_charset) -> bytes:
|
||||
def generate_paste_id(length: int, charset: str) -> bytes:
|
||||
"""generate a paste id
|
||||
|
||||
:param length: length of id
|
||||
|
|
@ -50,9 +63,7 @@ def generate_paste_id(
|
|||
return generate_random_string(length, charset).encode('utf-8')
|
||||
|
||||
|
||||
def generate_paste_key(
|
||||
length: int = Config.paste_key_size,
|
||||
charset: str = Config.paste_key_charset) -> bytes:
|
||||
def generate_paste_key(length: int, charset: str) -> bytes:
|
||||
"""generate a paste encryption key
|
||||
|
||||
:param length: length of key
|
||||
|
|
@ -98,8 +109,7 @@ def load_safe(
|
|||
proto: Paste,
|
||||
key: PasteKey,
|
||||
backend: object,
|
||||
salt: Salt = Config.salt,
|
||||
hmac_iter: int = Config.hmac_iterations):
|
||||
context: ContextConfig):
|
||||
"""load an encrypted paste model
|
||||
|
||||
:param proto: paste model prototype
|
||||
|
|
@ -109,7 +119,7 @@ def load_safe(
|
|||
|
||||
model = load(proto, backend)
|
||||
|
||||
data = decrypt(model.data, key, salt, hmac_iter)
|
||||
data = decrypt(model.data, key, context.salt, context.hmac_iter)
|
||||
|
||||
if model.data_hash and dhash(data) != model.data_hash:
|
||||
|
||||
|
|
@ -131,10 +141,7 @@ def dump(model: Paste, backend: object) -> None:
|
|||
:param backend: model backend object
|
||||
"""
|
||||
|
||||
try:
|
||||
backend.dump(model)
|
||||
except Exception as e:
|
||||
raise BackendError(str(e)) from e
|
||||
backend.dump(model)
|
||||
|
||||
|
||||
def delete(proto: Paste, backend: object) -> None:
|
||||
|
|
@ -158,13 +165,12 @@ def delete_safe(
|
|||
proto: Paste,
|
||||
key: PasteKey,
|
||||
backend: object,
|
||||
salt: Salt = Config.salt,
|
||||
hmac_iter: int = Config.hmac_iterations) -> None:
|
||||
context: ContextConfig) -> None:
|
||||
"""
|
||||
"""
|
||||
|
||||
try:
|
||||
model = load_safe(proto, key, backend, salt, hmac_iter)
|
||||
model = load_safe(proto, key, backend, context)
|
||||
except LifetimeError:
|
||||
pass
|
||||
|
||||
|
|
@ -177,9 +183,9 @@ def create(
|
|||
data: PasteData,
|
||||
lifetime: PasteLifetime,
|
||||
encoding: PasteEncoding,
|
||||
config: Config,
|
||||
backend: object,
|
||||
salt: Salt = Config.salt,
|
||||
hmac_iter: int = Config.hmac_iterations) -> PasteId:
|
||||
context: ContextConfig) -> PasteId:
|
||||
"""create an unencrypted paste
|
||||
|
||||
:param data: paste data
|
||||
|
|
@ -187,18 +193,20 @@ def create(
|
|||
:param backend: model backend object
|
||||
"""
|
||||
|
||||
pid = PasteId(generate_paste_id())
|
||||
pid = PasteId(generate_paste_id(config.id_size, config.id_charset))
|
||||
safe_pid = PasteId(dhash(pid))
|
||||
data_hash = PasteHash(dhash(data))
|
||||
sub = None
|
||||
timestamp = PasteTimestamp(int(time.time()))
|
||||
|
||||
if lifetime < 0:
|
||||
if lifetime is None:
|
||||
lifetime = config.default_lifetime
|
||||
elif lifetime < 0:
|
||||
expiration = -1
|
||||
else:
|
||||
expiration = PasteExpiration(timestamp + (lifetime * 60))
|
||||
|
||||
safe_data = PasteData(encrypt(data, pid, salt, hmac_iter))
|
||||
safe_data = PasteData(encrypt(data, pid, context.salt, context.hmac_iter))
|
||||
|
||||
model = Paste(
|
||||
safe_pid,
|
||||
|
|
@ -217,9 +225,9 @@ def create_safe(data: PasteData,
|
|||
lifetime: PasteLifetime,
|
||||
sub: Sub,
|
||||
encoding: PasteEncoding,
|
||||
config: Config,
|
||||
backend: object,
|
||||
salt: Salt = Config.salt,
|
||||
hmac_iter: int = Config.hmac_iterations) -> Tuple[PasteId,PasteKey]:
|
||||
context: ContextConfig) -> Tuple[PasteId,PasteKey]:
|
||||
"""create an encrypted paste
|
||||
|
||||
:param data: paste data
|
||||
|
|
@ -229,19 +237,21 @@ def create_safe(data: PasteData,
|
|||
:param salt: randomization salt
|
||||
"""
|
||||
|
||||
pid = PasteId(generate_paste_id())
|
||||
pid = PasteId(generate_paste_id(config.id_size, config.id_charset))
|
||||
safe_pid = PasteId(dhash(pid))
|
||||
pkey = PasteKey(generate_paste_key())
|
||||
pkey = PasteKey(generate_paste_key(config.key_size, config.key_charset))
|
||||
data_hash = PasteHash(dhash(data))
|
||||
safe_sub = PasteSub(shash(sub, data_hash, pid))
|
||||
timestamp = PasteTimestamp(int(time.time()))
|
||||
|
||||
if lifetime < 0:
|
||||
if lifetime is None:
|
||||
lifetime = config.default_lifetime
|
||||
elif lifetime < 0:
|
||||
expiration = -1
|
||||
else:
|
||||
expiration = PasteExpiration(timestamp + (lifetime * 60))
|
||||
|
||||
safe_data = PasteData(encrypt(data, pkey, salt, hmac_iter))
|
||||
safe_data = PasteData(encrypt(data, pkey, context.salt, context.hmac_iter))
|
||||
|
||||
dump(Paste(
|
||||
safe_pid,
|
||||
|
|
@ -269,21 +279,20 @@ def remove_safe(
|
|||
sub: Sub,
|
||||
key: PasteKey,
|
||||
backend: object,
|
||||
salt: Salt = Config.salt,
|
||||
hmac_iter: int = Config.hmac_iterations):
|
||||
context: ContextConfig):
|
||||
|
||||
proto = Paste(pid, sub)
|
||||
|
||||
delete_safe(proto, key, backend, salt, hmac_iter)
|
||||
delete_safe(proto, key, backend, context)
|
||||
|
||||
|
||||
def get(pid: PasteId, backend: object, salt: Salt = Config.salt, hmac_iter: int = Config.hmac_iterations) -> PasteData:
|
||||
def get(pid: PasteId, backend: object, context: ContextConfig) -> PasteData:
|
||||
"""conveniently load an unencrypted paste
|
||||
"""
|
||||
|
||||
model = load(Paste(pid), backend)
|
||||
|
||||
data = decrypt(model.data, pid, salt, hmac_iter)
|
||||
data = decrypt(model.data, pid, context.salt, context.hmac_iter)
|
||||
|
||||
return PasteData(data), model.expiration, model.encoding
|
||||
|
||||
|
|
@ -292,12 +301,22 @@ def get_safe(
|
|||
pid: PasteId,
|
||||
pkey: PasteKey,
|
||||
sub: Sub,
|
||||
config: Config,
|
||||
backend: object,
|
||||
salt: Salt = Config.salt,
|
||||
hmac_iter: int = Config.hmac_iterations) -> PasteData:
|
||||
context: ContextConfig) -> PasteData:
|
||||
"""conveniently load an encrypted paste
|
||||
"""
|
||||
|
||||
model = load_safe(Paste(pid, sub), pkey, backend, salt, hmac_iter)
|
||||
model = load_safe(Paste(pid, sub), pkey, backend, context)
|
||||
|
||||
return PasteData(model.data), model.expiration, model.encoding
|
||||
|
||||
|
||||
def get_paste_model_config(configIni: ConfigParser) -> Config:
|
||||
|
||||
return get_config(configIni, 'model.paste', Config)
|
||||
|
||||
|
||||
__all__ = [
|
||||
get_paste_model_config
|
||||
]
|
||||
|
|
@ -5,7 +5,7 @@ import json
|
|||
from time import time
|
||||
from typing import Optional
|
||||
|
||||
from httpaste import Config
|
||||
from httpaste.context import Config as ContextConfig
|
||||
from httpaste.helper.crypto import (
|
||||
dhash,
|
||||
shash,
|
||||
|
|
@ -13,7 +13,7 @@ from httpaste.helper.crypto import (
|
|||
decrypt,
|
||||
derive_key,
|
||||
DecryptionError)
|
||||
from httpaste.model import (
|
||||
from httpaste.schema import (
|
||||
User,
|
||||
KeyHash,
|
||||
Index,
|
||||
|
|
@ -39,8 +39,7 @@ def _load(
|
|||
proto: User,
|
||||
master_key: str,
|
||||
backend: object,
|
||||
salt: Salt = Config.salt,
|
||||
hmac_iter: int = Config.hmac_iterations) -> Optional[User]:
|
||||
context: ContextConfig) -> Optional[User]:
|
||||
"""load user model
|
||||
|
||||
:param model: user model prototype
|
||||
|
|
@ -55,7 +54,7 @@ def _load(
|
|||
return None
|
||||
|
||||
try:
|
||||
serialized_data = decrypt(model.index, master_key, salt, hmac_iter)
|
||||
serialized_data = decrypt(model.index, master_key, context.salt, context.hmac_iter)
|
||||
except DecryptionError as e:
|
||||
raise IndexError('unable to decrypt user index') from e
|
||||
else:
|
||||
|
|
@ -71,8 +70,7 @@ def _dump(
|
|||
model: User,
|
||||
key: MasterKey,
|
||||
backend: object,
|
||||
salt: Salt = Config.salt,
|
||||
hmac_iter: int = Config.hmac_iterations) -> None:
|
||||
context: ContextConfig) -> None:
|
||||
"""dump a user model
|
||||
|
||||
:param model: user model
|
||||
|
|
@ -87,7 +85,7 @@ def _dump(
|
|||
|
||||
serialized_index = json.dumps(model.index).encode('utf-8')
|
||||
|
||||
safe_index = SerializedIndex(encrypt(serialized_index, key, salt, hmac_iter))
|
||||
safe_index = SerializedIndex(encrypt(serialized_index, key, context.salt, context.hmac_iter))
|
||||
|
||||
backend.dump(User(*model[:-1], safe_index))
|
||||
|
||||
|
|
@ -96,7 +94,8 @@ def load_paste_key(
|
|||
pid: PasteId,
|
||||
sub: Sub,
|
||||
key: MasterKey,
|
||||
backend: object, salt: Salt = Config.salt, hmac_iter: int = Config.hmac_iterations) -> Optional[PasteKey]:
|
||||
backend: object,
|
||||
context: ContextConfig) -> Optional[PasteKey]:
|
||||
"""load a user paste key
|
||||
|
||||
:param pid: paste id
|
||||
|
|
@ -106,7 +105,7 @@ def load_paste_key(
|
|||
:param salt: randomization salt
|
||||
"""
|
||||
|
||||
model = _load(User(sub), key, backend, salt, hmac_iter)
|
||||
model = _load(User(sub), key, backend, context)
|
||||
|
||||
for k, v in model.index.get('pastes').items():
|
||||
|
||||
|
|
@ -123,8 +122,7 @@ def dump_paste_key(
|
|||
sub: Sub,
|
||||
key: MasterKey,
|
||||
backend: object,
|
||||
salt: str = Config.salt,
|
||||
hmac_iter: int = Config.hmac_iterations) -> None:
|
||||
context: ContextConfig) -> None:
|
||||
"""dump a user paste key
|
||||
|
||||
:param pid: paste id
|
||||
|
|
@ -134,21 +132,20 @@ def dump_paste_key(
|
|||
:param backend: user model backend
|
||||
"""
|
||||
|
||||
model = _load(User(sub), key, backend, salt, hmac_iter)
|
||||
model = _load(User(sub), key, backend, context)
|
||||
|
||||
model.index.setdefault('pastes', {})[pid.hex()] = {
|
||||
'key': pkey.hex()
|
||||
}
|
||||
|
||||
_dump(model, key, backend, salt, hmac_iter)
|
||||
_dump(model, key, backend, context)
|
||||
|
||||
|
||||
def authenticate(
|
||||
user_id: bytes,
|
||||
password: bytes,
|
||||
backend: object,
|
||||
salt: Salt = Config.salt,
|
||||
hmac_iter: int = Config.hmac_iterations):
|
||||
context: ContextConfig):
|
||||
"""authenticate a user
|
||||
|
||||
:param user_id: human-readable user id
|
||||
|
|
@ -156,7 +153,7 @@ def authenticate(
|
|||
"""
|
||||
|
||||
sub = Sub(dhash(user_id))
|
||||
key = MasterKey(derive_key(password, salt, hmac_iter))
|
||||
key = MasterKey(derive_key(password, context.salt, context.hmac_iter))
|
||||
key_hash = KeyHash(dhash(key))
|
||||
|
||||
proto = User(sub)
|
||||
|
|
@ -164,7 +161,7 @@ def authenticate(
|
|||
bogus_decline_msg = 'unable to authenticate'
|
||||
|
||||
try:
|
||||
model = _load(proto, key, backend, salt, hmac_iter)
|
||||
model = _load(proto, key, backend, context)
|
||||
except IndexError as e:
|
||||
raise AuthenticationError(bogus_decline_msg) from e
|
||||
|
||||
|
|
@ -175,7 +172,7 @@ def authenticate(
|
|||
}
|
||||
|
||||
model = User(sub, key_hash, Index(data))
|
||||
_dump(model, key, backend, salt, hmac_iter)
|
||||
_dump(model, key, backend, context)
|
||||
else:
|
||||
|
||||
if model.key_hash != key_hash:
|
||||
|
|
|
|||
|
|
@ -0,0 +1,137 @@
|
|||
from typing import NamedTuple, Optional, Dict, Union, Any, TypedDict
|
||||
|
||||
|
||||
class PasteDataSchema:
|
||||
"""Paste Interface schema between Model and Backend
|
||||
"""
|
||||
pid = bytes
|
||||
data = bytes
|
||||
data_hash = bytes
|
||||
sub = bytes
|
||||
timestamp = int
|
||||
lifetime = int
|
||||
expiration = int
|
||||
encoding = str
|
||||
|
||||
|
||||
class UserDataSchema:
|
||||
"""User Interface Schema between Model and Backend
|
||||
"""
|
||||
sub = bytes
|
||||
key_hash = bytes
|
||||
index = bytes
|
||||
|
||||
|
||||
|
||||
class Salt(bytes):
|
||||
"""Salt
|
||||
"""
|
||||
|
||||
|
||||
class PasteData(PasteDataSchema.data):
|
||||
"""Paste Data
|
||||
"""
|
||||
|
||||
|
||||
class PasteHash(PasteDataSchema.data_hash):
|
||||
"""Paste Data Hash
|
||||
"""
|
||||
|
||||
|
||||
class PasteTimestamp(PasteDataSchema.timestamp):
|
||||
"""Paste Timestamp
|
||||
"""
|
||||
|
||||
|
||||
class PasteEncoding(PasteDataSchema.encoding):
|
||||
"""
|
||||
"""
|
||||
|
||||
|
||||
class PasteExpiration(PasteDataSchema.expiration):
|
||||
"""Paste Expiration
|
||||
|
||||
< 0: after first acccess
|
||||
0: never
|
||||
"""
|
||||
|
||||
|
||||
class PasteLifetime(PasteDataSchema.lifetime):
|
||||
"""Paste Lifetime
|
||||
"""
|
||||
|
||||
|
||||
class PasteSub(PasteDataSchema.sub):
|
||||
"""Hashed user id
|
||||
"""
|
||||
|
||||
|
||||
class KeyHash(UserDataSchema.key_hash):
|
||||
"""User Master Key Hash
|
||||
"""
|
||||
|
||||
|
||||
class PasteKey(bytes):
|
||||
"""Paste encryption key
|
||||
"""
|
||||
|
||||
|
||||
class PasteId(PasteDataSchema.pid):
|
||||
"""Paste unique identifier
|
||||
"""
|
||||
|
||||
|
||||
class MasterKey(bytes):
|
||||
"""User's master encryption key
|
||||
"""
|
||||
|
||||
|
||||
class Sub(UserDataSchema.sub):
|
||||
"""User id
|
||||
"""
|
||||
|
||||
|
||||
class Index(TypedDict):
|
||||
"""User Paste Index
|
||||
"""
|
||||
auth_expires: int
|
||||
pastes: Dict[str, Dict[str, Any]]
|
||||
|
||||
|
||||
class SerializedIndex(UserDataSchema.index):
|
||||
"""User Paste Index (serialized)
|
||||
"""
|
||||
|
||||
|
||||
class User(NamedTuple):
|
||||
"""Global User Model (and Prototype)
|
||||
|
||||
non-optional values are prototype values
|
||||
"""
|
||||
|
||||
#: user id
|
||||
sub: Sub
|
||||
#: user's master key hash
|
||||
key_hash: Optional[KeyHash] = None
|
||||
#: user's paste index
|
||||
index: Optional[Union[Index, SerializedIndex]] = None
|
||||
|
||||
|
||||
class Paste(NamedTuple):
|
||||
"""Global Paste Model (and Prototype)
|
||||
|
||||
non-optional values are prototype values
|
||||
"""
|
||||
|
||||
#: paste id
|
||||
pid: PasteId
|
||||
#: paste owner
|
||||
sub: Optional[PasteSub] = None
|
||||
#: paste data
|
||||
data: Optional[PasteData] = None
|
||||
#: paste data hash
|
||||
data_hash: Optional[PasteHash] = None
|
||||
#: paste timestamp
|
||||
expiration: Optional[PasteExpiration] = None
|
||||
#: paste encoding
|
||||
encoding: Optional[PasteEncoding] = None
|
||||
17
src/httpaste/server.py
Executable file
17
src/httpaste/server.py
Executable file
|
|
@ -0,0 +1,17 @@
|
|||
from typing import NamedTuple
|
||||
from configparser import ConfigParser
|
||||
|
||||
|
||||
from httpaste.helper.config import get_config
|
||||
|
||||
|
||||
class Config(NamedTuple):
|
||||
"""connexion config
|
||||
"""
|
||||
swagger_ui: bool = True
|
||||
bind_address: str = None
|
||||
|
||||
|
||||
def get_server_config(configIni: ConfigParser) -> Config:
|
||||
|
||||
return get_config(configIni, 'server', Config)
|
||||
Loading…
Add table
Add a link
Reference in a new issue