From bd7af0585011ccbce7d7a105d6c913de0ed3579a Mon Sep 17 00:00:00 2001 From: Tiara Rodney Date: Sat, 2 Apr 2022 17:59:04 +0200 Subject: [PATCH] feat(mode/user): implement complex index to accomodate more personalized user data, the index can now hold more than just paste constraints. It was planned to implement an authentication expiration, however, this does not make sense in an HTTP basic auth context. --- src/httpaste/backend/file/user.py | 18 ++++++--- src/httpaste/model/__init__.py | 6 ++- src/httpaste/model/user.py | 61 +++++++++++++++++++++---------- 3 files changed, 58 insertions(+), 27 deletions(-) diff --git a/src/httpaste/backend/file/user.py b/src/httpaste/backend/file/user.py index 02255b2..72c9256 100644 --- a/src/httpaste/backend/file/user.py +++ b/src/httpaste/backend/file/user.py @@ -6,6 +6,12 @@ acting as cells. from pathlib import Path from ast import literal_eval +COLUMNS = [ + 'sub', + 'key_hash', + 'index', +] + def load( proto: object, @@ -22,15 +28,17 @@ def load( return None cells = {} - for column in ['key_hash', 'index']: + for column in COLUMNS[1:]: cell = row.joinpath(column) - if getattr(model_schema, column) == bytes: - + if not cell.exists(): + cells[column] = None + elif getattr(model_schema, column) == bytes: cells[column] = cell.read_bytes() + elif getattr(model_schema, column) == str: + cells[column] = cell.read_text() else: - cells[column] = literal_eval(cell.read_text()) return model_class( @@ -46,7 +54,7 @@ def dump(model: object, path: Path, model_schema: object) -> None: row = path.joinpath(model.sub.hex()) row.mkdir(parents=True, exist_ok=True) - for column in ['key_hash', 'index']: + for column in COLUMNS[1:]: cell = row.joinpath(column) diff --git a/src/httpaste/model/__init__.py b/src/httpaste/model/__init__.py index 955ded4..e41e4c1 100644 --- a/src/httpaste/model/__init__.py +++ b/src/httpaste/model/__init__.py @@ -1,6 +1,6 @@ """Model """ -from typing import NamedTuple, Optional, Dict, Union +from typing import NamedTuple, Optional, Dict, Union, Any, TypedDict class PasteDataSchema: @@ -89,9 +89,11 @@ class Sub(UserDataSchema.sub): """ -class Index(Dict[PasteId, PasteKey]): +class Index(TypedDict): """User Paste Index """ + auth_expires: int + pastes: Dict[str, Dict[str, Any]] class SerializedIndex(UserDataSchema.index): diff --git a/src/httpaste/model/user.py b/src/httpaste/model/user.py index 7639127..b925063 100755 --- a/src/httpaste/model/user.py +++ b/src/httpaste/model/user.py @@ -2,6 +2,7 @@ """user model interface """ import json +from time import time from typing import Optional from httpaste import Config @@ -34,7 +35,7 @@ class IndexError(Exception): """ -def load( +def _load( proto: User, master_key: str, backend: object, @@ -54,16 +55,19 @@ def load( return None try: - return User( - *model[:-1], - Index(**json.loads(decrypt(model.index, master_key, salt, hmac_iter))) - ) + serialized_data = decrypt(model.index, master_key, salt, hmac_iter) except DecryptionError as e: - raise IndexError('unable to decrypt user index') from e + else: + data = json.loads(serialized_data) + + return User( + *model[:-1], + Index(**data) + ) -def dump( +def _dump( model: User, key: MasterKey, backend: object, @@ -77,7 +81,7 @@ def dump( :param salt: randomization salt """ - if not isinstance(model.index, Index): + if model.index is not None and not isinstance(model.index, dict): raise BaseException('index serialization pre-processing not allowed.') @@ -102,11 +106,13 @@ def load_paste_key( :param salt: randomization salt """ - for k, v in load(User(sub), key, backend, salt, hmac_iter).index.items(): + model = _load(User(sub), key, backend, salt, hmac_iter) + + for k, v in model.index.get('pastes').items(): if bytes.fromhex(k) == pid: - return PasteKey(bytes.fromhex(v)) + return PasteKey(bytes.fromhex(v.get('key'))) return None @@ -128,12 +134,13 @@ def dump_paste_key( :param backend: user model backend """ - model = load(User(sub), key, backend, salt, hmac_iter) + model = _load(User(sub), key, backend, salt, hmac_iter) - dump(User(*model[:-1], Index({ - **model.index, - **{pid.hex(): pkey.hex()} - })), key, backend, salt, hmac_iter) + model.index.setdefault('pastes', {})[pid.hex()] = { + 'key': pkey.hex() + } + + _dump(model, key, backend, salt, hmac_iter) def authenticate( @@ -154,22 +161,36 @@ def authenticate( proto = User(sub) + bogus_decline_msg = 'unable to authenticate' + try: - model = load(proto, key, backend, salt, hmac_iter) + model = _load(proto, key, backend, salt, hmac_iter) except IndexError as e: - raise AuthenticationError('you dun goofed') + raise AuthenticationError(bogus_decline_msg) from e if not model: - model = User(sub, key_hash, Index({})) - dump(model, key, backend, salt, hmac_iter) + data = { + 'auth_expires': int(time()) + (1 * 60) + } + + model = User(sub, key_hash, Index(data)) + _dump(model, key, backend, salt, hmac_iter) else: if model.key_hash != key_hash: - raise AuthenticationError('you dun goofed') + raise AuthenticationError(bogus_decline_msg) return { 'sub': sub, 'master_key': key } + + +__all__ = [ + AuthenticationError, + load_paste_key, + dump_paste_key, + authenticate +] \ No newline at end of file