Merged in bugfix/HTTPASTE-43/config (pull request #37)

Bugfix/HTTPASTE-43/config
This commit is contained in:
Tiara Rodney 2022-04-15 00:05:19 +00:00
commit c506cec36a
32 changed files with 1174 additions and 684 deletions

View file

@ -1,16 +1,38 @@
[general]
[context]
salt = '&)UxB-_$Lk$m=CB}dw[d85{-ZWR?uUNx'
paste_id_size = 8
paste_key_size = 32
paste_lifetime = 5
paste_max_lifetime = 1440
hmac_iterations = 20000
paste_default_encoding = 'utf-8'
hmac_iter = 20000
[model.paste]
default_encoding = 'utf-8'
id_size = 8
key_size = 32
default_lifetime = 5
default_max_lifetime = 1440
[controller.paste]
default_mime_type = 'text/plain'
default_linenos = False
default_syntax = False
default_formatter = 'terminal256'
[backend]
type = file
[backend.file]
base_dirname = 'sample_data'
[backend.sqlite]
path = 'devel/sample.db'
[backend.mysql]
user = 'example-user'
password = 'my_cool_secret'
database = 'httpaste'
host = '127.0.0.1'
[server]
swagger_ui = False
bind_address = 'sample.sock'

View file

@ -137,21 +137,23 @@ NOTES
SUCH DAMAGES.
"""
from typing import NamedTuple, Tuple, Any
from string import ascii_uppercase, digits, ascii_letters, punctuation
from inspect import isclass
from typing import NamedTuple
from configparser import ConfigParser
from ast import literal_eval
from io import StringIO
from os import environ
from importlib.resources import path as resource_path
from pathlib import Path
from connexion import FlaskApp
from connexion.resolver import RestyResolver
from httpaste.model import Backend
from httpaste.backend import get_backend_map
from httpaste.helper.common import generate_random_string
from httpaste.server import get_server_config
from httpaste.server import Config as ServerConfig
from httpaste.context import get_context_config
from httpaste.context import Config as ContextConfig
from httpaste.model import get_model_config
from httpaste.model import Config as ModelConfig
from httpaste.backend import get_backend_config
from httpaste.backend import Config as BackendConfig
from httpaste.helper.config import get_configparser, CONFIGPATH_ENVIRON
from httpaste.helper.http import (
BadRequestError,
ForbiddenError,
@ -160,147 +162,48 @@ from httpaste.helper.http import (
UnauthorizedError)
CONFIGPATH_ENVIRON = 'HTTPASTE_CONFIGPATH'
def get_sanitized_config_charset(charset: str):
for x in ["$", "%"]:
charset = charset.replace(x, f'{x}{x}')
return charset
class ConfigError(Exception):
"""Config Exception
class Config(NamedTuple):
"""
class Config:
"""httpaste global config
"""
salt: bytes = get_sanitized_config_charset(generate_random_string(
32, ascii_letters + digits + punctuation)).encode('utf-8')
paste_id_size: int = 8
paste_id_charset: str = ascii_letters + digits
paste_key_size: int = 32
paste_key_charset: str = get_sanitized_config_charset(
ascii_letters + digits + punctuation)
paste_lifetime: int = 5
backend: Backend = None
hmac_iterations: int = 20000
paste_default_encoding: str = 'utf-8'
context: ContextConfig
server: ServerConfig
model: ModelConfig
backend: BackendConfig
class ServerConfig:
"""connexion config
"""
swagger_ui: bool = True
bind_address = None
def get_config_path(var_name: str = CONFIGPATH_ENVIRON):
def get_config(configIni: ConfigParser, path: Path):
"""
"""
try:
from httpaste.model import Config as ModelConfig
return environ[var_name]
except KeyError as e:
context_config = get_context_config(configIni)
server_config = get_server_config(configIni)
model_config = get_model_config(configIni, path)
backend_config = get_backend_config(configIni, path)
raise ConfigError(
f'environment variable \'{var_name}\' not set.') from e
return Config(
context=context_config,
server=server_config,
model=model_config,
backend=backend_config
)
def load_config(path: str) -> Tuple[Config, ServerConfig]:
"""get config objects from file
"""
_config = ConfigParser()
_config.read(path)
backends = get_backend_map()
bconf = dict(_config.items('backend'))
btype = bconf.pop('type')
try:
bcl, bparamcl = backends[btype]
except KeyError as e:
bids = ', '.join(backends.keys())
raise ConfigError(' '.join((
f'invalid backend \'{btype}\' in \'{path}\'. ',
f'must be any of [{bids}]'
))) from e
config = dict(_config.items('general'))
server_config = dict(_config.items('server'))
c = Config()
sc = ServerConfig()
# typecast model_backend section items
bconf = {k: literal_eval(v) for k, v in bconf.items()}
# initialize model backend
c.backend = bcl(bparamcl(**bconf))
# typecast general section items
for k, v in config.items():
setattr(c, k, literal_eval(v))
# typecast server section items
for k, v in server_config.items():
setattr(sc, k, literal_eval(v))
c.salt = c.salt.encode('utf-8')
return c, sc
def default_config() -> str:
def load_config(path: str = None, var_name: str = CONFIGPATH_ENVIRON):
"""
"""
config = ConfigParser()
configIni, _ = get_configparser(path, var_name)
config['general'] = {
'salt': Config.salt.decode('utf-8'),
'paste_key_charset': Config.paste_key_charset,
'paste_id_charset': Config.paste_id_charset
}
for literal in [
'paste_id_size',
'paste_key_size',
'paste_lifetime'
]:
config['general'][literal] = str(getattr(Config, literal))
config['backend'] = {
'type': 'sqlite',
'path': 'file::memory:?cache=shared'
}
config['server'] = {}
for literal in [
'swagger_ui',
'bind_address'
]:
config['server'][literal] = str(getattr(ServerConfig, literal))
stream = StringIO()
config.write(stream)
stream.seek(0)
return stream.read()
return get_config(configIni, Path(path).resolve().parent)
def get_flask_app(
config: Config,
server_config: ServerConfig = ServerConfig) -> FlaskApp:
def get_flask_app(config: Config) -> FlaskApp:
"""get a flask app object
"""
options = {"swagger_ui": server_config.swagger_ui}
options = {"swagger_ui": config.server.swagger_ui}
#context manager returns a pathlib.Path object
with resource_path('httpaste.schema', 'httpaste.openapi.json') as path:
@ -340,8 +243,6 @@ def get_flask_app(
__all__ = [
Config,
ServerConfig,
load_config,
default_config,
get_flask_app
]

View file

@ -40,9 +40,9 @@ def command_standalone(**kwargs):
'Please install it by running \'python3 -m pip install gevent\'.'
))) from e
config, server_config = load_config(kwargs.get('config_path'))
config = load_config(kwargs.get('config_path'))
application = get_flask_app(config, server_config)
application = get_flask_app(config)
http_server = WSGIServer(('', kwargs.get('port')), application)
http_server.serve_forever()
@ -122,7 +122,7 @@ def parser():
p_standalone = sp.add_parser('standalone', help=command_standalone.__doc__)
p_standalone.add_argument('--config-path', '-c', required=True)
p_standalone.add_argument('--port', '-p', default=8080)
p_standalone.add_argument('--port', '-p', default=8082)
p_wsgi = sp.add_parser('wsgi', help=command_wsgi.__doc__)
p_wsgi.add_argument('--echo', '-e', action='store_true')

View file

@ -2,83 +2,114 @@
implements backend of model
"""
import sys
from inspect import isclass
from typing import Dict, Tuple
from abc import ABC, abstractmethod
from importlib import import_module
from configparser import ConfigParser
from typing import NamedTuple
from pathlib import Path
from httpaste.model import Backend, UserDataSchema, PasteDataSchema, User, Paste
from .sqlite import Parameters as SqliteParameters
from .sqlite import User as SqliteUser
from .sqlite import Paste as SqlitePaste
from .sqlite import get_connection as get_sqlite_connection
from .mysql import get_connection as get_mysql_connection
from .file import Parameters as FileParameters
from .file import User as FileUser
from .file import Paste as FilePaste
from .mysql import Parameters as MySQLParameters
from .mysql import User as MySQLUser
from .mysql import Paste as MySQLPaste
from httpaste.schema import User, Paste, UserDataSchema, PasteDataSchema
from httpaste.helper.config import get_config, ConfigError
class SQLite(Backend):
"""SQLite backend interface
class BackendError(Exception):
"""
"""
parameter_class = SqliteParameters
user: SqliteUser
paste: SqlitePaste
def __init__(self, parameters: SqliteParameters):
parameters = SqliteParameters(parameters.path, get_sqlite_connection(parameters))
self.user = SqliteUser(parameters, User)
self.paste = SqlitePaste(parameters, Paste)
class File(Backend):
"""File backend interface
class ObjectBackend(ABC):
"""
"""
parameter_class = FileParameters
user: FileUser
paste: FilePaste
@abstractmethod
def load(self, proto: object) -> object:
pass
def __init__(self, parameters: FileParameters):
@abstractmethod
def dump(self, model: object) -> None:
pass
self.user = FileUser(parameters, User, UserDataSchema)
self.paste = FilePaste(parameters, Paste, PasteDataSchema)
@abstractmethod
def delete(self, proto: object) -> None:
pass
@abstractmethod
def init(self) -> object:
pass
@abstractmethod
def sanitize(self) -> None:
pass
class MySQL(Backend):
"""MySQL backend interface
class BackendInterface(ABC):
"""
"""
parameter_class = MySQLParameters
user: MySQLUser
paste: MySQLPaste
@abstractmethod
def __init__(self, params: object,
user_model_class: type,
paste_model_class: type,
user_schema: type,
paste_schema: type) -> None:
pass
def __init__(self, parameters: MySQLParameters):
@property
@abstractmethod
def user(self) -> ObjectBackend:
pass
#parameters = MySQLParameters(*parameters[1:], get_mysql_connection(parameters))
self.user = MySQLUser(parameters, User)
self.paste = MySQLPaste(parameters, Paste)
@property
@abstractmethod
def paste(self) -> ObjectBackend:
pass
def get_backend_map() -> Dict[str, Tuple[type, type]]:
"""get a map of backend ids and their classes
class Config(NamedTuple):
"""Backend Configuration
"""
interface: type
config: dict
def load_backend(config: Config) -> BackendInterface:
"""load a backend
"""
mod = sys.modules[__name__]
out = {}
backend = config.interface(config.config, Paste, User, PasteDataSchema,
UserDataSchema)
for i in dir(mod):
return backend
obj = getattr(mod, i)
if isclass(obj) and obj.__module__ == __name__:
def get_backend_config(configIni: ConfigParser, path:Path) -> Config:
"""retrieve a cascaded backend configuration from an INI config object
"""
out[i.lower()] = (obj, obj.parameter_class)
if 'backend' not in configIni:
return out
raise ConfigError('missing [backend] section.')
if 'type' not in configIni['backend']:
raise ConfigError('missing [backend] \'type\'.')
mod_name = configIni['backend']['type']
section = f'backend.{mod_name}'
try:
mod = import_module(f'.{mod_name}', 'httpaste.backend')
except ImportError as e:
raise BackendError(f'backend \'{mod_name}\' does not exist: {e}') from e
else:
interface = mod.Backend
config = get_config(configIni, section, mod.Config, path)
return Config(interface=interface,config=config)
__all__ = [
load_backend,
get_backend_config
]

View file

@ -3,110 +3,117 @@
from os import path
from pathlib import Path
from typing import NamedTuple, Optional
from . import user
from . import paste
from httpaste.backend import BackendInterface as BackendAbc
from httpaste.backend import ObjectBackend as ObjectBackendAbc
class Parameters(NamedTuple):
"""Filesystem backend parameters
class Config(NamedTuple):
"""Filesystem backend config
"""
#: path of base directory
base_dirname: str
base_dirname: Path
#: basename of users table directory
user_dirname: Optional[str] = 'users'
user_dirname: str = 'users'
#: basename of pastes table directory
paste_dirname: Optional[str] = 'pastes'
paste_dirname: str = 'pastes'
class User(object):
"""Filesystem user model backend
"""
class ObjectBackendBc(ObjectBackendAbc):
dirname: Path
path: Path
def __init__(
self,
parameters: Parameters,
interface: object,
basename_attr: str,
config: Config,
model_class: type,
model_schema: type):
self.interface = interface
self.model_class = model_class
self.model_schema = model_schema
self.dirname = path.join(parameters.base_dirname,
parameters.user_dirname)
self.dirname = path.join(config.base_dirname,
getattr(config, basename_attr))
self.path = Path(self.dirname)
def load(self, proto: object):
return user.load(proto, self.path, self.model_class, self.model_schema)
return self.interface.load(proto, self.path, self.model_class, self.model_schema)
def dump(self, model: object):
return user.dump(model, self.path, self.model_schema)
return self.interface.dump(model, self.path, self.model_schema)
def delete(self, proto: object):
return user.delete(proto, self.path)
return self.interface.delete(proto, self.path)
def init(self):
return user.init(self.path)
return self.interface.init(self.path)
def sanitize(self):
if self.path.exists():
return user.sanitize(self.path, self.model_class, self.model_schema)
return self.interface.sanitize(self.path, self.model_class, self.model_schema)
return None
class Paste(object):
class UserBackend(ObjectBackendBc):
"""Filesystem user model backend
"""
def __init__(self, *args):
from . import user
super().__init__(user, 'user_dirname', *args)
class PasteBackend(ObjectBackendBc):
"""Filesystem paste model backend
"""
dirname: str
path: Path
def __init__(self, *args):
def __init__(
self,
parameters: Parameters,
model_class: type,
model_schema: type):
from . import paste
self.model_class = model_class
super().__init__(paste, 'paste_dirname', *args)
self.model_schema = model_schema
self.dirname = path.join(parameters.base_dirname,
parameters.paste_dirname)
class Backend(BackendAbc):
"""File backend interface
"""
self.path = Path(self.dirname)
_user: UserBackend
_paste: PasteBackend
def load(self, proto: object):
def __init__(self,
config: Config,
paste_model_class: type,
user_model_class: type,
paste_schema: type,
user_schema: type):
return paste.load(proto, self.path, self.model_class, self.model_schema)
self._user = UserBackend(config, user_model_class, user_schema)
self._paste = PasteBackend(config, paste_model_class, paste_schema)
def dump(self, model: object):
@property
def user(self) -> UserBackend:
return paste.dump(model, self.path, self.model_schema)
return self._user
def delete(self, proto: object):
@property
def paste(self) -> PasteBackend:
return paste.delete(proto, self.path)
return self._paste
def init(self):
return paste.init(self.path)
def sanitize(self):
if self.path.exists():
return paste.sanitize(self.path, self.model_class, self.model_schema)
return None
__all__ = [
Config,
Backend
]

View file

@ -1,10 +1,12 @@
"""MySQL backend
"""
from typing import NamedTuple, Optional, Callable
from typing import NamedTuple, Optional
from httpaste.backend import BackendInterface as BackendAbc
from httpaste.backend import ObjectBackend as ObjectBackendAbc
class Parameters(NamedTuple):
"""MySQL parameters
class Config(NamedTuple):
"""MySQL config
"""
#: user name
@ -16,26 +18,18 @@ class Parameters(NamedTuple):
#: database identifier
database: str
#: a mysql.connection.MySQLConnection object (does not apply to config)
connection: Optional[object] = None
connection: object = None
class User(object):
"""MySQL user model backend
"""
class ObjectBackendBc(ObjectBackendAbc):
connection: object
def __init__(self, parameters: Parameters, model_class: type) -> None:
from . import user
connect = get_mysql_connect_callee()
self.interface = user
def __init__(self, interface: object, config: Config, model_class: type) -> None:
self.interface = interface
self.model_class = model_class
self.connection = get_connection(parameters, connect)
self.connection = get_connection(config)
def load(self, proto: object) -> object:
@ -58,50 +52,54 @@ class User(object):
return self.interface.sanitize(self.connection, self.model_class)
class Paste(object):
class UserBackend(ObjectBackendBc):
"""MySQL user model backend
"""
def __init__(self, *args) -> None:
from . import user
super().__init__(paste, *args)
class PasteBackend(ObjectBackendBc):
"""MySQL paste model backend
"""
connection: object
def __init__(self, parameters: Parameters, model_class: type) -> None:
def __init__(self, *args) -> None:
from . import paste
connect = get_mysql_connect_callee()
self.interface = paste
self.model_class = model_class
self.connection = get_connection(parameters, connect)
def load(self, proto: object) -> object:
return self.interface.load(proto, self.connection, self.model_class)
def dump(self, model: object) -> None:
return self.interface.dump(model, self.connection)
def delete(self, proto: object) -> None:
return self.interface.delete(proto, self.connection)
def init(self) -> None:
return paste.init(self.connection)
def sanitize(self) -> None:
return self.interface.sanitize(self.connection, self.model_class)
super().__init__(paste, *args)
def get_mysql_connect_callee() -> object:
class Backend(BackendAbc):
"""MySQL backend interface
"""
user: UserBackend
paste: PasteBackend
def __init__(self,
config: Config,
paste_model_class: type,
user_model_class: type,
paste_schema: type,
user_schema: type):
self.user = UserBackend(config, user_model_class, user_schema)
self.paste = PasteBackend(config, paste_model_class, paste_schema)
def get_connection(config: Config) -> object:
"""get a mysql.connection.MySQLConnection object
"""
try:
from mysql.connector import connect
from mysql.connector.connection import MySQLConnection
except ImportError as e:
raise ImportError(' '.join((
'\'mysql-connector-python\' is not installed.',
@ -109,26 +107,18 @@ def get_mysql_connect_callee() -> object:
'\'python3 -m pip install mysql-connector-python\'.'
))) from e
return connect
if config.connection is not None:
return config.connection
def get_connection(parameters: Parameters, connect: Callable) -> object:
"""get a mysql.connection.MySQLConnection object
"""
if parameters.connection is not None:
return parameters.connection
connection = connect(user=parameters.user, password=parameters.password,
host=parameters.host,
database=parameters.database)
connection = connect(user=config.user, password=config.password,
host=config.host,
database=config.database)
return connection
__all__ = [
Parameters,
User,
Paste
Config,
Backend
]

View file

@ -1,16 +1,6 @@
from os import path
from time import time
from importlib.resources import open_text
try:
from mysql.connector.connection import MySQLConnection
except ImportError as e:
raise ImportError(' '.join((
'\'mysql-connector-python\' is not installed.',
'Install it by running',
'\'python3 -m pip install mysql-connector-python\'.'
))) from e
from mysql.connector.connection import MySQLConnection
def load(proto:object, connection: MySQLConnection, model_class: type):
@ -94,9 +84,17 @@ def init(connection: MySQLConnection):
cursor = connection.cursor()
with open_text('httpaste.backend.mysql', 'paste.sql') as fh:
statement = '''CREATE TABLE `httpaste_pastes` (
`pid` blob NOT NULL,
`data` longblob NOT NULL,
`data_hash` blob NOT NULL,
`sub` blob DEFAULT NULL,
`expiration` int(16) NOT NULL,
`encoding` tinytext DEFAULT NULL,
PRIMARY KEY (`pid`(128))
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;'''
cursor.execute(fh.read())
cursor.execute(statement)
connection.commit()

View file

@ -1,9 +0,0 @@
CREATE TABLE `httpaste_pastes` (
`pid` blob NOT NULL,
`data` longblob NOT NULL,
`data_hash` blob NOT NULL,
`sub` blob DEFAULT NULL,
`expiration` int(16) NOT NULL,
`encoding` tinytext DEFAULT NULL,
PRIMARY KEY (`pid`(128))
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

View file

@ -1,13 +1,5 @@
from os import path
from importlib.resources import open_text
try:
from mysql.connector.connection import MySQLConnection
except ImportError as e:
raise ImportError(' '.join((
'\'mysql-connector-python\' is not installed.',
'Install it by running',
'\'python3 -m pip install mysql-connector-python\'.'
))) from e
from mysql.connector.connection import MySQLConnection
def load(proto:object, connection: MySQLConnection, model_class: type):
@ -84,9 +76,14 @@ def init(connection: MySQLConnection):
cursor = connection.cursor()
with open_text('httpaste.backend.mysql', 'user.sql') as fh:
statement = '''CREATE TABLE `httpaste_users` (
`sub` blob NOT NULL,
`key_hash` blob NOT NULL,
`paste_index` blob NOT NULL,
PRIMARY KEY (`sub`(128))
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;'''
cursor.execute(fh.read())
cursor.execute(statement)
connection.commit()

View file

@ -1,6 +0,0 @@
CREATE TABLE `httpaste_users` (
`sub` blob NOT NULL,
`key_hash` blob NOT NULL,
`paste_index` blob NOT NULL,
PRIMARY KEY (`sub`(128))
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

View file

@ -2,96 +2,113 @@
"""
from sqlite3 import Connection, Row, connect
from typing import NamedTuple, Optional
from pathlib import Path
from . import user
from . import paste
from httpaste.backend import BackendInterface as BackendAbc
from httpaste.backend import ObjectBackend as ObjectBackendAbc
class Parameters(NamedTuple):
"""SQLite backend parameters
class Config(NamedTuple):
"""SQLite backend config
"""
#: local path or URI
path: str
uri: Path
user_table_name: str = 'httpaste_users'
paste_table_name: str = 'httpaste_pastes'
#: a sqlite3.Connection object (does not apply to config)
connection: Optional[object] = None
connection: Connection = None
class User(object):
"""SQLite user model backend
class ObjectBackendBc(ObjectBackendAbc):
connection: object
def __init__(self, interface: object, table_name_attr: str, config: Config, model_class: type, schema: type) -> None:
self.interface = interface
self.model_class = model_class
self.connection = get_connection(config)
self.table = getattr(config, table_name_attr)
def load(self, proto: object) -> object:
return self.interface.load(proto, self.connection, self.table, self.model_class)
def dump(self, model: object) -> None:
return self.interface.dump(model, self.connection, self.table)
def delete(self, proto: object) -> None:
return self.interface.delete(proto, self.connection, self.table)
def init(self) -> None:
return self.interface.init(self.connection, self.table)
def sanitize(self) -> None:
return self.interface.sanitize(self.connection, self.table, self.model_class)
class UserBackend(ObjectBackendBc):
"""sqlite user model backend
"""
connection: Connection
def __init__(self, *args) -> None:
def __init__(self, parameters: Parameters, model_class: type):
from . import user
self.model_class = model_class
self.connection = get_connection(parameters)
def load(self, proto: object):
return user.load(proto, self.connection, self.model_class)
def dump(self, model: object):
return user.dump(model, self.connection)
def delete(self, proto: object):
return user.delete(proto, self.connection)
def init(self):
return user.init(self.connection)
def sanitize(self):
return user.sanitize(self.connection, self.model_class)
super().__init__(paste, 'user_table_name', *args)
class Paste(object):
"""SQLite paste model backend
class PasteBackend(ObjectBackendBc):
"""sqlite paste model backend
"""
connection: Connection
connection: object
def __init__(self, parameters: Parameters, model_class: type):
def __init__(self, *args) -> None:
self.model_class = model_class
from . import paste
self.connection = get_connection(parameters)
def load(self, proto: object):
return paste.load(proto, self.connection, self.model_class)
def dump(self, model: object):
return paste.dump(model, self.connection)
def delete(self, proto: object):
return paste.delete(proto, self.connection)
def init(self):
return paste.init(self.connection)
def sanitize(self):
return paste.sanitize(self.connection, self.model_class)
super().__init__(paste, 'paste_table_name', *args)
def get_connection(parameters: Parameters):
class Backend(BackendAbc):
"""sqlite backend interface
"""
user: UserBackend
paste: PasteBackend
def __init__(self,
config: Config,
paste_model_class: type,
user_model_class: type,
paste_schema: type,
user_schema: type):
self.user = UserBackend(config, user_model_class, user_schema)
self.paste = PasteBackend(config, paste_model_class, paste_schema)
def get_connection(config: Config):
"""get an sqlite connection object
"""
if parameters.connection:
if config.connection:
return parameters.connection
return config.connection
connection = connect(parameters.path, check_same_thread=False)
connection = connect(config.uri, check_same_thread=False)
connection.row_factory = Row
return connection
__all__ = [
Config,
Backend
]

View file

@ -6,14 +6,14 @@ from time import time
from importlib.resources import open_text
def load(proto: object, connection: Connection, model_class: type):
def load(proto: object, connection: Connection, table: str, model_class: type):
"""load a paste
"""
cursor = connection.cursor()
statement = '''SELECT pid, data, data_hash, sub, expiration, encoding
FROM pastes
statement = f'''SELECT pid, data, data_hash, sub, expiration, encoding
FROM {table}
WHERE pid=?'''
cursor.execute(statement, (proto.pid,))
@ -33,13 +33,13 @@ def load(proto: object, connection: Connection, model_class: type):
return None
def dump(model: object, connection: Connection) -> None:
def dump(model: object, connection: Connection, table: str) -> None:
"""dump a paste
"""
cursor = connection.cursor()
statement = '''INSERT INTO pastes
statement = f'''INSERT INTO "{table}"
(pid, data, data_hash, sub, expiration, encoding)
VALUES (?,?,?,?,?,?)'''
@ -57,35 +57,41 @@ def dump(model: object, connection: Connection) -> None:
return None
def delete(proto: object, connection: Connection) -> None:
def delete(proto: object, connection: Connection, table: str) -> None:
cursor = connection.cursor()
cursor.execute('''DELETE FROM pastes WHERE pid=?''', (proto.pid,))
cursor.execute(f'''DELETE FROM {table} WHERE pid=?''', (proto.pid,))
connection.commit()
return None
def init(connection: Connection):
def init(connection: Connection, table: str):
cursor = connection.cursor()
with open_text('httpaste.backend.sqlite', 'paste.sql') as fh:
statement = fh.read()
statement = f'''CREATE TABLE IF NOT EXISTS "{table}" (
"pid" BLOB NOT NULL UNIQUE,
"data" BLOB NOT NULL,
"data_hash" BLOB NOT NULL,
"sub" BLOB UNIQUE,
"expiration" INTEGER NOT NULL,
"encoding" TEXT,
PRIMARY KEY("pid")
);'''
cursor.execute(statement)
connection.commit()
def sanitize(connection: Connection, model_class: type) -> int:
def sanitize(connection: Connection, table: str, model_class: type) -> int:
cursor = connection.cursor()
statement = '''SELECT pid FROM pastes
statement = f'''SELECT pid FROM {table}
WHERE expiration < ? AND expiration > 0'''
cursor.execute(statement, (int(time()),))

View file

@ -1,9 +0,0 @@
CREATE TABLE IF NOT EXISTS "pastes" (
"pid" BLOB NOT NULL UNIQUE,
"data" BLOB NOT NULL,
"data_hash" BLOB NOT NULL,
"sub" BLOB UNIQUE,
"expiration" INTEGER NOT NULL,
"encoding" TEXT,
PRIMARY KEY("pid")
);

View file

@ -6,14 +6,14 @@ from httpaste.model import User
from importlib.resources import open_text
def load(proto: object, connection: Connection, model_class: type):
def load(proto: object, connection: Connection, table: str, model_class: type):
"""load a user
"""
cursor = connection.cursor()
statement = '''SELECT sub, key_hash, paste_index
FROM users
statement = f'''SELECT sub, key_hash, paste_index
FROM {table}
WHERE sub=?'''
cursor.execute(statement, (proto.sub,))
@ -27,13 +27,13 @@ def load(proto: object, connection: Connection, model_class: type):
return None
def dump(model: object, connection: Connection) -> None:
def dump(model: object, connection: Connection, table: str) -> None:
"""dump a user
"""
cursor = connection.cursor()
statement = '''INSERT OR REPLACE INTO users
statement = f'''INSERT OR REPLACE INTO {table}
(sub, key_hash, paste_index)
VALUES (?,?,?)'''
@ -44,24 +44,27 @@ def dump(model: object, connection: Connection) -> None:
return None
def delete(proto: object, connection: Connection) -> None:
def delete(proto: object, connection: Connection, table: str) -> None:
cursor = connection.cursor()
cursor.execute('''DELETE FROM users WHERE sub=?''', (proto.sub,))
cursor.execute(f'''DELETE FROM {table} WHERE sub=?''', (proto.sub,))
connection.commit()
return None
def init(connection: Connection) -> None:
def init(connection: Connection, table: str) -> None:
cursor = connection.cursor()
with open_text('httpaste.backend.sqlite', 'user.sql') as fh:
statement = fh.read()
statement = f'''CREATE TABLE IF NOT EXISTS "{table}" (
"sub" BLOB NOT NULL UNIQUE,
"key_hash" BLOB NOT NULL,
"paste_index" BLOB,
PRIMARY KEY("sub")
);'''
cursor.execute(statement)
@ -70,6 +73,6 @@ def init(connection: Connection) -> None:
return None
def sanitize(connection: Connection, model_class) -> int:
def sanitize(connection: Connection, table: str, model_class) -> int:
return 0

View file

@ -1,6 +0,0 @@
CREATE TABLE IF NOT EXISTS "users" (
"sub" BLOB NOT NULL UNIQUE,
"key_hash" BLOB NOT NULL,
"paste_index" BLOB,
PRIMARY KEY("sub")
);

18
src/httpaste/context.py Executable file
View file

@ -0,0 +1,18 @@
from typing import NamedTuple
from string import ascii_uppercase, digits, ascii_letters, punctuation
from configparser import ConfigParser
from httpaste.helper.common import generate_random_string
from httpaste.helper.config import get_sanitized_config_charset, get_config
class Config(NamedTuple):
"""httpaste global config
"""
salt: bytes = get_sanitized_config_charset(generate_random_string(
32, ascii_letters + digits + punctuation)).encode('utf-8')
hmac_iter: int = 20000
def get_context_config(configIni: ConfigParser) -> Config:
return get_config(configIni, 'context', Config)

View file

@ -5,12 +5,14 @@ import httpaste
def get(**kwargs):
config = current_app.httpaste
context = config.context
model = config.model
return httpaste.__doc__.format(
url=connexion.request.url,
hmac_iterations=config.hmac_iterations,
paste_lifetime=config.paste_lifetime,
paste_max_lifetime=str(round(config.paste_max_lifetime / 60)),
paste_default_encoding=config.paste_default_encoding
hmac_iterations=context.hmac_iter,
paste_lifetime=model.paste.default_lifetime,
paste_max_lifetime=str(round(model.paste.default_max_lifetime / 60)),
paste_default_encoding=model.paste.default_encoding
), 200

View file

@ -5,8 +5,10 @@ from flask import current_app
from httpaste.helper.common import decode, DecodeError, join_url
import httpaste.model.paste as paste_model
import httpaste.model.user as user_model
from httpaste.backend import load_backend
from httpaste.helper.http import BadRequestError, GoneError, NotFoundError
from httpaste.model import (
from httpaste.helper.syntax import highlight
from httpaste.schema import (
PasteKey,
PasteData,
PasteLifetime,
@ -15,6 +17,13 @@ from httpaste.model import (
Sub)
class Config:
default_mime_type: str = 'text/plain'
default_linenos: bool = False
default_syntax: bool = False
default_formatter: str = 'terminal256'
def delete(**kwargs):
"""
"""
@ -45,12 +54,15 @@ def get(**kwargs):
"""
config = current_app.httpaste
backend = load_backend(config.backend)
context = config.context
syntax = kwargs.get('syntax')
formatter = kwargs.get('format', Config.default_formatter)
linenos = kwargs.get('linenos', Config.default_linenos)
mime = kwargs.get('mime', Config.default_mime_type)
pid = PasteKey(kwargs['id'].encode('utf-8'))
syntax = kwargs.get('syntax')
formatter = kwargs.get('format', 'terminal256')
linenos = kwargs.get('linenos', False)
mime = kwargs.get('mime', 'text/plain')
if kwargs.get('user') is not None:
# authenticated
@ -58,26 +70,23 @@ def get(**kwargs):
key = MasterKey(kwargs['token_info'].get('master_key'))
sub = Sub(kwargs['token_info'].get('sub'))
pkey = user_model.load_paste_key(pid, sub, key, config.backend.user,
config.salt, config.hmac_iterations)
pkey = user_model.load_paste_key(pid, sub, key, backend.user, context)
def call(): return paste_model.get_safe(pid, pkey, sub,
config.backend.paste,
config.salt, config.hmac_iterations)
config.model.paste,
backend.paste, context)
else:
# unauthenticated
def call(): return paste_model.get(pid, config.backend.paste,
config.salt, config.hmac_iterations)
def call(): return paste_model.get(pid, backend.paste, context)
try:
data, expiration, encoding = call()
except paste_model.LifetimeError as e:
if kwargs.get('user') is not None:
paste_model.remove_safe(pid, sub, pkey, config.backend.paste,
config.salt, config.hmac_iterations)
paste_model.remove_safe(pid, sub, pkey, backend.paste, context)
else:
paste_model.remove(pid, config.backend.paste)
paste_model.remove(pid, backend.paste)
raise GoneError(str(e)) from e
except paste_model.NotFoundError as e:
raise NotFoundError(str(e))
@ -87,10 +96,9 @@ def get(**kwargs):
# burn after read
if expiration < 0:
if kwargs.get('user') is not None:
paste_model.remove_safe(pid, sub, pkey, config.backend.paste,
config.salt, config.hmac_iterations)
paste_model.remove_safe(pid, sub, pkey, backend.paste, context)
else:
paste_model.remove(pid, config.backend.paste)
paste_model.remove(pid, backend.paste)
if syntax is not None:
data = highlight(data, str(syntax), formatter, linenos)
@ -110,12 +118,14 @@ def post(**kwargs):
"""
config = current_app.httpaste
backend = load_backend(config.backend)
context = config.context
if kwargs['body'].get('data') is None:
raise BadRequestError('form field \'data\' missing.')
encoding = PasteEncoding(kwargs.get('encoding', 'utf-8'))
lifetime = PasteLifetime(kwargs.get('lifetime', config.paste_lifetime))
lifetime = PasteLifetime(kwargs.get('lifetime', config.model.paste.default_lifetime))
if encoding not in ['utf-8', 'utf-16', 'ascii']:
try:
@ -135,15 +145,15 @@ def post(**kwargs):
sub = Sub(kwargs['token_info'].get('sub'))
pid, pkey = paste_model.create_safe(pdata, lifetime, sub, encoding,
config.backend.paste, config.salt, config.hmac_iterations)
config.model.paste, backend.paste,
context)
user_model.dump_paste_key(pid, pkey, sub, key, config.backend.user,
config.salt, config.hmac_iterations)
user_model.dump_paste_key(pid, pkey, sub, key, backend.user, context)
else:
# unauthenticated
pid = paste_model.create(pdata, lifetime, encoding, config.backend.paste,
config.salt, config.hmac_iterations)
pid = paste_model.create(pdata, lifetime, encoding, config.model.paste,
backend.paste, context)
base_url = join_url(request.root_url, request.path)

View file

@ -5,8 +5,6 @@ def search(**kwargs):
"""
"""
print(args)
return 'Hallo', 200

View file

@ -4,20 +4,22 @@ from flask import current_app
from httpaste.helper.http import ForbiddenError
from httpaste.model.user import authenticate, AuthenticationError
from httpaste.backend import load_backend
def post(*args, **kwargs):
"""
"""
config = current_app.httpaste
backend = load_backend(config.backend)
context = config.context
user_id = args[0].encode('utf-8')
password = args[1].encode('utf-8')
try:
return authenticate(user_id, password, config.backend.user, config.salt, config.hmac_iterations)
return authenticate(user_id, password, backend.user, context)
except AuthenticationError as e:
raise ForbiddenError('You shall not pass!') from e

111
src/httpaste/helper/config.py Executable file
View file

@ -0,0 +1,111 @@
import os
from pathlib import Path
from configparser import ConfigParser, NoSectionError
from typing import Optional, NamedTuple
from os import environ
CONFIGPATH_ENVIRON = 'HTTPASTE_CONFIGPATH'
class ConfigError(Exception):
"""
"""
def get_sanitized_config_charset(charset: str):
for x in ["$", "%"]:
charset = charset.replace(x, f'{x}{x}')
return charset
def typecast(obj: dict, aclass: type, dirname:Optional[Path] = None) -> dict:
"""typecast a dictionary according to class annotations
:param obj: dictionary to typecast
:param aclass: class containing typehint annotations
:param basepath: basepath for filesystem path typecasting
:returns: typecasted dictionary
"""
casted = {}
for k,v in obj.items():
v = v.strip('\'"')
try:
bclass = aclass.__annotations__[k]
except KeyError as e:
raise KeyError(f'{k}: not allowed') from e
if issubclass(bclass, Path) and not str(v[0]).startswith(os.path.sep):
if not isinstance(dirname, Path):
raise TypeError('no dirname for Path type specified.')
casted[k] = casted_val = dirname / v
elif issubclass(bclass, bytes):
casted[k] = bclass(v.encode('utf-8'))
else:
try:
casted_val = bclass(v)
except ValueError as e:
raise ValueError(f'{k}: {e}') from e
else:
casted[k] = casted_val
return casted
def get_config(configIni: ConfigParser, section: str, bclass: type, dirname: Path = None) -> object:
"""get an object-oriented configuration from an INI file
:param configIni: configparser.Configparser object with initialized stream
:param section: name of section to get configuration for
:param bclass: configuration base class
:param dirname: directory name of INI file stream
:returns: initialized configuration instance
"""
try:
raw_config = dict(configIni.items(section))
except NoSectionError as e:
raw_config = {}
try:
casted_config = typecast(raw_config, bclass, dirname)
except KeyError as e:
raise ConfigError(f'[{section}] {e}') from e
except ValueError as e:
raise ConfigError(f'[{section}] {e}') from e
try:
config = bclass(**casted_config)
except TypeError as e:
raise ConfigError(f'[{section}] {e}') from e
return config
def get_configparser(path: Path = None, var_name: str = CONFIGPATH_ENVIRON):
"""
"""
if path is None:
try:
path = environ[var_name]
except KeyError as e:
raise ConfigError(
f'environment variable \'{var_name}\' not set.') from e
configIni = ConfigParser()
configIni.read(path)
return configIni, path

View file

@ -8,7 +8,7 @@ from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.fernet import Fernet, InvalidToken
from httpaste import Config
from httpaste.context import Config
DEFAULT_HMAC_ITERATIONS = 20000
@ -38,7 +38,7 @@ def dhash(data: bytes):
return hashlib.sha512(data).digest()
def derive_key(main_key: str, salt: bytes = Config.salt, iterations:int=DEFAULT_HMAC_ITERATIONS) -> bytes:
def derive_key(main_key: str, salt: bytes, iterations:int=DEFAULT_HMAC_ITERATIONS) -> bytes:
"""derive a key from a main key
:param main_key: main key to derive from

View file

@ -1,144 +1,19 @@
"""Model
"""
from typing import NamedTuple, Optional, Dict, Union, Any, TypedDict
from typing import NamedTuple
from configparser import ConfigParser
from pathlib import Path
from httpaste.model.paste import Config as PasteConfig
from httpaste.model.paste import get_paste_model_config
class Config(NamedTuple):
"""Model Configuration"""
paste: PasteConfig
class PasteDataSchema:
"""Paste Interface schema between Model and Backend
"""
pid = bytes
data = bytes
data_hash = bytes
sub = bytes
timestamp = int
lifetime = int
expiration = int
encoding = str
def get_model_config(configIni: ConfigParser, path:Path) -> Config:
paste_config = get_paste_model_config(configIni)
class UserDataSchema:
"""User Interface Schema between Model and Backend
"""
sub = bytes
key_hash = bytes
index = bytes
class Backend(object):
"""Backend
"""
parameter_class: str
class Salt(bytes):
"""Salt
"""
class PasteData(PasteDataSchema.data):
"""Paste Data
"""
class PasteHash(PasteDataSchema.data_hash):
"""Paste Data Hash
"""
class PasteTimestamp(PasteDataSchema.timestamp):
"""Paste Timestamp
"""
class PasteEncoding(PasteDataSchema.encoding):
"""
"""
class PasteExpiration(PasteDataSchema.expiration):
"""Paste Expiration
< 0: after first acccess
0: never
"""
class PasteLifetime(PasteDataSchema.lifetime):
"""Paste Lifetime
"""
class PasteSub(PasteDataSchema.sub):
"""Hashed user id
"""
class KeyHash(UserDataSchema.key_hash):
"""User Master Key Hash
"""
class PasteKey(bytes):
"""Paste encryption key
"""
class PasteId(PasteDataSchema.pid):
"""Paste unique identifier
"""
class MasterKey(bytes):
"""User's master encryption key
"""
class Sub(UserDataSchema.sub):
"""User id
"""
class Index(TypedDict):
"""User Paste Index
"""
auth_expires: int
pastes: Dict[str, Dict[str, Any]]
class SerializedIndex(UserDataSchema.index):
"""User Paste Index (serialized)
"""
class User(NamedTuple):
"""Global User Model (and Prototype)
non-optional values are prototype values
"""
#: user id
sub: Sub
#: user's master key hash
key_hash: Optional[KeyHash] = None
#: user's paste index
index: Optional[Union[Index, SerializedIndex]] = None
class Paste(NamedTuple):
"""Global Paste Model (and Prototype)
non-optional values are prototype values
"""
#: paste id
pid: PasteId
#: paste owner
sub: Optional[PasteSub] = None
#: paste data
data: Optional[PasteData] = None
#: paste data hash
data_hash: Optional[PasteHash] = None
#: paste timestamp
expiration: Optional[PasteExpiration] = None
#: paste encoding
encoding: Optional[PasteEncoding] = None
return Config(paste=paste_config)

View file

@ -2,17 +2,32 @@
"""paste model interface
"""
import json
from typing import Optional, Tuple
from typing import Optional, Tuple, NamedTuple
import time
from configparser import ConfigParser
from string import ascii_uppercase, digits, ascii_letters, punctuation
from httpaste import Config
from httpaste.context import Config as ContextConfig
from httpaste.helper.crypto import dhash, shash, encrypt, decrypt
from httpaste.helper.config import get_sanitized_config_charset, get_config
from httpaste.helper.common import generate_random_string
from httpaste.model import (Paste, PasteId, Sub, MasterKey, PasteKey, Salt,
from httpaste.schema import (Paste, PasteId, Sub, MasterKey, PasteKey, Salt,
PasteData, PasteHash, PasteTimestamp, PasteSub,
PasteLifetime, PasteEncoding, PasteExpiration)
class Config(NamedTuple):
id_size: int = 8
id_charset: str = ascii_letters + digits
key_size: int = 32
key_charset: str = get_sanitized_config_charset(ascii_letters + digits + punctuation)
default_lifetime: int = 5
default_max_lifetime: int = 1440
default_min_lifetime: int = 1
default_encoding: str = 'utf-8'
class NotFoundError(Exception):
"""Paste Exception
"""
@ -38,9 +53,7 @@ class BackendError(Exception):
"""
def generate_paste_id(
length: int = Config.paste_id_size,
charset: str = Config.paste_id_charset) -> bytes:
def generate_paste_id(length: int, charset: str) -> bytes:
"""generate a paste id
:param length: length of id
@ -50,9 +63,7 @@ def generate_paste_id(
return generate_random_string(length, charset).encode('utf-8')
def generate_paste_key(
length: int = Config.paste_key_size,
charset: str = Config.paste_key_charset) -> bytes:
def generate_paste_key(length: int, charset: str) -> bytes:
"""generate a paste encryption key
:param length: length of key
@ -98,8 +109,7 @@ def load_safe(
proto: Paste,
key: PasteKey,
backend: object,
salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations):
context: ContextConfig):
"""load an encrypted paste model
:param proto: paste model prototype
@ -109,7 +119,7 @@ def load_safe(
model = load(proto, backend)
data = decrypt(model.data, key, salt, hmac_iter)
data = decrypt(model.data, key, context.salt, context.hmac_iter)
if model.data_hash and dhash(data) != model.data_hash:
@ -131,10 +141,7 @@ def dump(model: Paste, backend: object) -> None:
:param backend: model backend object
"""
try:
backend.dump(model)
except Exception as e:
raise BackendError(str(e)) from e
def delete(proto: Paste, backend: object) -> None:
@ -158,13 +165,12 @@ def delete_safe(
proto: Paste,
key: PasteKey,
backend: object,
salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> None:
context: ContextConfig) -> None:
"""
"""
try:
model = load_safe(proto, key, backend, salt, hmac_iter)
model = load_safe(proto, key, backend, context)
except LifetimeError:
pass
@ -177,9 +183,9 @@ def create(
data: PasteData,
lifetime: PasteLifetime,
encoding: PasteEncoding,
config: Config,
backend: object,
salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> PasteId:
context: ContextConfig) -> PasteId:
"""create an unencrypted paste
:param data: paste data
@ -187,18 +193,20 @@ def create(
:param backend: model backend object
"""
pid = PasteId(generate_paste_id())
pid = PasteId(generate_paste_id(config.id_size, config.id_charset))
safe_pid = PasteId(dhash(pid))
data_hash = PasteHash(dhash(data))
sub = None
timestamp = PasteTimestamp(int(time.time()))
if lifetime < 0:
if lifetime is None:
lifetime = config.default_lifetime
elif lifetime < 0:
expiration = -1
else:
expiration = PasteExpiration(timestamp + (lifetime * 60))
safe_data = PasteData(encrypt(data, pid, salt, hmac_iter))
safe_data = PasteData(encrypt(data, pid, context.salt, context.hmac_iter))
model = Paste(
safe_pid,
@ -217,9 +225,9 @@ def create_safe(data: PasteData,
lifetime: PasteLifetime,
sub: Sub,
encoding: PasteEncoding,
config: Config,
backend: object,
salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> Tuple[PasteId,PasteKey]:
context: ContextConfig) -> Tuple[PasteId,PasteKey]:
"""create an encrypted paste
:param data: paste data
@ -229,19 +237,21 @@ def create_safe(data: PasteData,
:param salt: randomization salt
"""
pid = PasteId(generate_paste_id())
pid = PasteId(generate_paste_id(config.id_size, config.id_charset))
safe_pid = PasteId(dhash(pid))
pkey = PasteKey(generate_paste_key())
pkey = PasteKey(generate_paste_key(config.key_size, config.key_charset))
data_hash = PasteHash(dhash(data))
safe_sub = PasteSub(shash(sub, data_hash, pid))
timestamp = PasteTimestamp(int(time.time()))
if lifetime < 0:
if lifetime is None:
lifetime = config.default_lifetime
elif lifetime < 0:
expiration = -1
else:
expiration = PasteExpiration(timestamp + (lifetime * 60))
safe_data = PasteData(encrypt(data, pkey, salt, hmac_iter))
safe_data = PasteData(encrypt(data, pkey, context.salt, context.hmac_iter))
dump(Paste(
safe_pid,
@ -269,21 +279,20 @@ def remove_safe(
sub: Sub,
key: PasteKey,
backend: object,
salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations):
context: ContextConfig):
proto = Paste(pid, sub)
delete_safe(proto, key, backend, salt, hmac_iter)
delete_safe(proto, key, backend, context)
def get(pid: PasteId, backend: object, salt: Salt = Config.salt, hmac_iter: int = Config.hmac_iterations) -> PasteData:
def get(pid: PasteId, backend: object, context: ContextConfig) -> PasteData:
"""conveniently load an unencrypted paste
"""
model = load(Paste(pid), backend)
data = decrypt(model.data, pid, salt, hmac_iter)
data = decrypt(model.data, pid, context.salt, context.hmac_iter)
return PasteData(data), model.expiration, model.encoding
@ -292,12 +301,22 @@ def get_safe(
pid: PasteId,
pkey: PasteKey,
sub: Sub,
config: Config,
backend: object,
salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> PasteData:
context: ContextConfig) -> PasteData:
"""conveniently load an encrypted paste
"""
model = load_safe(Paste(pid, sub), pkey, backend, salt, hmac_iter)
model = load_safe(Paste(pid, sub), pkey, backend, context)
return PasteData(model.data), model.expiration, model.encoding
def get_paste_model_config(configIni: ConfigParser) -> Config:
return get_config(configIni, 'model.paste', Config)
__all__ = [
get_paste_model_config
]

View file

@ -5,7 +5,7 @@ import json
from time import time
from typing import Optional
from httpaste import Config
from httpaste.context import Config as ContextConfig
from httpaste.helper.crypto import (
dhash,
shash,
@ -13,7 +13,7 @@ from httpaste.helper.crypto import (
decrypt,
derive_key,
DecryptionError)
from httpaste.model import (
from httpaste.schema import (
User,
KeyHash,
Index,
@ -39,8 +39,7 @@ def _load(
proto: User,
master_key: str,
backend: object,
salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> Optional[User]:
context: ContextConfig) -> Optional[User]:
"""load user model
:param model: user model prototype
@ -55,7 +54,7 @@ def _load(
return None
try:
serialized_data = decrypt(model.index, master_key, salt, hmac_iter)
serialized_data = decrypt(model.index, master_key, context.salt, context.hmac_iter)
except DecryptionError as e:
raise IndexError('unable to decrypt user index') from e
else:
@ -71,8 +70,7 @@ def _dump(
model: User,
key: MasterKey,
backend: object,
salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> None:
context: ContextConfig) -> None:
"""dump a user model
:param model: user model
@ -87,7 +85,7 @@ def _dump(
serialized_index = json.dumps(model.index).encode('utf-8')
safe_index = SerializedIndex(encrypt(serialized_index, key, salt, hmac_iter))
safe_index = SerializedIndex(encrypt(serialized_index, key, context.salt, context.hmac_iter))
backend.dump(User(*model[:-1], safe_index))
@ -96,7 +94,8 @@ def load_paste_key(
pid: PasteId,
sub: Sub,
key: MasterKey,
backend: object, salt: Salt = Config.salt, hmac_iter: int = Config.hmac_iterations) -> Optional[PasteKey]:
backend: object,
context: ContextConfig) -> Optional[PasteKey]:
"""load a user paste key
:param pid: paste id
@ -106,7 +105,7 @@ def load_paste_key(
:param salt: randomization salt
"""
model = _load(User(sub), key, backend, salt, hmac_iter)
model = _load(User(sub), key, backend, context)
for k, v in model.index.get('pastes').items():
@ -123,8 +122,7 @@ def dump_paste_key(
sub: Sub,
key: MasterKey,
backend: object,
salt: str = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> None:
context: ContextConfig) -> None:
"""dump a user paste key
:param pid: paste id
@ -134,21 +132,20 @@ def dump_paste_key(
:param backend: user model backend
"""
model = _load(User(sub), key, backend, salt, hmac_iter)
model = _load(User(sub), key, backend, context)
model.index.setdefault('pastes', {})[pid.hex()] = {
'key': pkey.hex()
}
_dump(model, key, backend, salt, hmac_iter)
_dump(model, key, backend, context)
def authenticate(
user_id: bytes,
password: bytes,
backend: object,
salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations):
context: ContextConfig):
"""authenticate a user
:param user_id: human-readable user id
@ -156,7 +153,7 @@ def authenticate(
"""
sub = Sub(dhash(user_id))
key = MasterKey(derive_key(password, salt, hmac_iter))
key = MasterKey(derive_key(password, context.salt, context.hmac_iter))
key_hash = KeyHash(dhash(key))
proto = User(sub)
@ -164,7 +161,7 @@ def authenticate(
bogus_decline_msg = 'unable to authenticate'
try:
model = _load(proto, key, backend, salt, hmac_iter)
model = _load(proto, key, backend, context)
except IndexError as e:
raise AuthenticationError(bogus_decline_msg) from e
@ -175,7 +172,7 @@ def authenticate(
}
model = User(sub, key_hash, Index(data))
_dump(model, key, backend, salt, hmac_iter)
_dump(model, key, backend, context)
else:
if model.key_hash != key_hash:

View file

@ -0,0 +1,137 @@
from typing import NamedTuple, Optional, Dict, Union, Any, TypedDict
class PasteDataSchema:
"""Paste Interface schema between Model and Backend
"""
pid = bytes
data = bytes
data_hash = bytes
sub = bytes
timestamp = int
lifetime = int
expiration = int
encoding = str
class UserDataSchema:
"""User Interface Schema between Model and Backend
"""
sub = bytes
key_hash = bytes
index = bytes
class Salt(bytes):
"""Salt
"""
class PasteData(PasteDataSchema.data):
"""Paste Data
"""
class PasteHash(PasteDataSchema.data_hash):
"""Paste Data Hash
"""
class PasteTimestamp(PasteDataSchema.timestamp):
"""Paste Timestamp
"""
class PasteEncoding(PasteDataSchema.encoding):
"""
"""
class PasteExpiration(PasteDataSchema.expiration):
"""Paste Expiration
< 0: after first acccess
0: never
"""
class PasteLifetime(PasteDataSchema.lifetime):
"""Paste Lifetime
"""
class PasteSub(PasteDataSchema.sub):
"""Hashed user id
"""
class KeyHash(UserDataSchema.key_hash):
"""User Master Key Hash
"""
class PasteKey(bytes):
"""Paste encryption key
"""
class PasteId(PasteDataSchema.pid):
"""Paste unique identifier
"""
class MasterKey(bytes):
"""User's master encryption key
"""
class Sub(UserDataSchema.sub):
"""User id
"""
class Index(TypedDict):
"""User Paste Index
"""
auth_expires: int
pastes: Dict[str, Dict[str, Any]]
class SerializedIndex(UserDataSchema.index):
"""User Paste Index (serialized)
"""
class User(NamedTuple):
"""Global User Model (and Prototype)
non-optional values are prototype values
"""
#: user id
sub: Sub
#: user's master key hash
key_hash: Optional[KeyHash] = None
#: user's paste index
index: Optional[Union[Index, SerializedIndex]] = None
class Paste(NamedTuple):
"""Global Paste Model (and Prototype)
non-optional values are prototype values
"""
#: paste id
pid: PasteId
#: paste owner
sub: Optional[PasteSub] = None
#: paste data
data: Optional[PasteData] = None
#: paste data hash
data_hash: Optional[PasteHash] = None
#: paste timestamp
expiration: Optional[PasteExpiration] = None
#: paste encoding
encoding: Optional[PasteEncoding] = None

17
src/httpaste/server.py Executable file
View file

@ -0,0 +1,17 @@
from typing import NamedTuple
from configparser import ConfigParser
from httpaste.helper.config import get_config
class Config(NamedTuple):
"""connexion config
"""
swagger_ui: bool = True
bind_address: str = None
def get_server_config(configIni: ConfigParser) -> Config:
return get_config(configIni, 'server', Config)

View file

View file

@ -0,0 +1,134 @@
#!/usr/bin/env python3
import pytest
from textwrap import dedent
from unittest.mock import mock_open, patch
from configparser import ConfigParser
from pathlib import Path
@pytest.fixture
def module():
from httpaste import backend
return backend
class Test_get_backend_config():
@pytest.fixture(autouse=True)
def setup(self, module):
self.func = module.get_backend_config
def test_default_file(self, module):
data = dedent("""
[backend]
type = file
[backend.file]
base_dirname = 'sample_data'
""")
path = Path('/foo')
configIni = ConfigParser()
with patch('builtins.open', mock_open(read_data=data)):
configIni.read(str(path))
config = self.func(configIni, path)
assert isinstance(config, module.Config)
assert issubclass(config.interface, module.BackendInterface)
assert str(config.config.base_dirname) == '/foo/sample_data'
def test_sqlite(self, module):
data = dedent("""
[backend]
type = sqlite
[backend.sqlite]
uri = 'foobar.db'
""")
configIni = ConfigParser()
with patch('builtins.open', mock_open(read_data=data)):
configIni.read('void')
config = self.func(configIni, Path('/foo'))
assert str(config.config.uri) == '/foo/foobar.db'
def test_mysql(self, module):
data = dedent("""
[backend]
type = mysql
[backend.mysql]
user = 'foo'
password = bar
host = manana
database = test
""")
configIni = ConfigParser()
with patch('builtins.open', mock_open(read_data=data)):
configIni.read('void')
config = self.func(configIni, Path('/foo'))
assert config.config.user == 'foo'
assert config.config.password == 'bar'
assert config.config.host == 'manana'
assert config.config.database == 'test'
#class Test_load():
#
# @pytest.fixture(autouse=True)
# def setup(self, module):
#
# self.func = module.load
#
# def test_missing_parameter(self, module):
#
# config = module.Config()
# config.name = 'file'
# config.parameters = {}
#
# with pytest.raises(module.BackendError):
# self.func(config)
#
# def test_unknown_parameter(self, module):
#
# config = module.Config()
# config.name = 'file'
# config.parameters = {
# 'base_dirname': 'foofoo',
# 'foo': 'bar'
# }
#
# with pytest.raises(module.BackendError):
# self.func(config)
#
# def test_file(self, module):
#
# config = module.Config()
# config.name = 'file'
# config.parameters = {
# 'base_dirname': 'foofoo',
# 'user_dirnamea': 'test'
# }
#
# backend = self.func(config)
#
# assert isinstance(backend, module.BackendInterface)

View file

View file

@ -0,0 +1,149 @@
import pytest
from typing import NamedTuple
from unittest.mock import mock_open, patch
from textwrap import dedent
from pathlib import Path
from configparser import ConfigParser
@pytest.fixture
def module():
from httpaste.helper import config
return config
@pytest.fixture
def mock_aclass():
class Foobar(NamedTuple):
foo: int
bar: str = 'test'
return Foobar
@pytest.fixture
def mock_aclass_special():
class Foobar(NamedTuple):
foobar: Path
return Foobar
class Test_typecast():
@pytest.fixture(autouse=True)
def setup(self, module, mock_aclass):
self.func = module.typecast
self.mock_aclass = mock_aclass
def test_default(self, module):
foobar = {
'foo': '45'
}
result = self.func(foobar, self.mock_aclass)
assert isinstance(result, dict)
assert result['foo'] == 45
assert result.get('bar') is None
def test_type_mismatch(self, module):
foobar = {
'foo': 'foobar'
}
with pytest.raises(ValueError):
self.func(foobar, self.mock_aclass)
def test_unknown_key(self, module):
foobar = {
'foo': '45',
'foobar': 'foobar'
}
with pytest.raises(KeyError):
self.func(foobar, self.mock_aclass)
class Test_get_config():
@pytest.fixture(autouse=True)
def setup(self, module, mock_aclass):
self.func = module.get_config
self.mock_aclass = mock_aclass
def test_default(self):
data = dedent("""
[foobar]
foo = 45
""")
configIni = ConfigParser()
with patch('builtins.open', mock_open(read_data=data)):
configIni.read(str('void'))
result = self.func(configIni, 'foobar', self.mock_aclass)
assert isinstance(result, self.mock_aclass)
assert result.foo == 45
def test_relative_path(self, mock_aclass_special):
data = dedent("""
[foobar]
foobar = 'bar/foo'
""")
dirname = Path('/foo/bar')
configIni = ConfigParser()
with patch('builtins.open', mock_open(read_data=data)):
configIni.read(str('void'))
result = self.func(configIni, 'foobar', mock_aclass_special, dirname)
assert isinstance(result, mock_aclass_special)
assert isinstance(result.foobar, Path)
assert str(result.foobar) == '/foo/bar/bar/foo'
def test_absolute_path(self, mock_aclass_special):
data = dedent("""
[foobar]
foobar = '/bar/foo'
""")
configIni = ConfigParser()
with patch('builtins.open', mock_open(read_data=data)):
configIni.read(str('void'))
result = self.func(configIni, 'foobar', mock_aclass_special)
assert isinstance(result, mock_aclass_special)
assert isinstance(result.foobar, Path)
assert str(result.foobar) == '/bar/foo'

View file

@ -0,0 +1,79 @@
#!/usr/bin/env python3
import pytest
from textwrap import dedent
from unittest.mock import mock_open, patch
from configparser import ConfigParser
from pathlib import Path
@pytest.fixture
def module():
from httpaste.model import paste
return paste
class Test_get_paste_model_config():
@pytest.fixture(autouse=True)
def setup(self, module):
self.func = module.get_paste_model_config
def test_default(self, module):
data = ''
configIni = ConfigParser()
with patch('builtins.open', mock_open(read_data=data)):
configIni.read('void')
result = self.func(configIni)
assert isinstance(result, module._Config)
assert isinstance(result.id_size, int), result.id_size
assert isinstance(result.key_size, int), result.key_size
#class Test_load():
#
# @pytest.fixture(autouse=True)
# def setup(self, module):
#
# self.func = module.load
#
# def test_missing_parameter(self, module):
#
# config = module.Config()
# config.name = 'file'
# config.parameters = {}
#
# with pytest.raises(module.BackendError):
# self.func(config)
#
# def test_unknown_parameter(self, module):
#
# config = module.Config()
# config.name = 'file'
# config.parameters = {
# 'base_dirname': 'foofoo',
# 'foo': 'bar'
# }
#
# with pytest.raises(module.BackendError):
# self.func(config)
#
# def test_file(self, module):
#
# config = module.Config()
# config.name = 'file'
# config.parameters = {
# 'base_dirname': 'foofoo',
# 'user_dirnamea': 'test'
# }
#
# backend = self.func(config)
#
# assert isinstance(backend, module.BackendInterface)