diff --git a/src/httpaste/backend/file/user.py b/src/httpaste/backend/file/user.py index 02255b2..72c9256 100644 --- a/src/httpaste/backend/file/user.py +++ b/src/httpaste/backend/file/user.py @@ -6,6 +6,12 @@ acting as cells. from pathlib import Path from ast import literal_eval +COLUMNS = [ + 'sub', + 'key_hash', + 'index', +] + def load( proto: object, @@ -22,15 +28,17 @@ def load( return None cells = {} - for column in ['key_hash', 'index']: + for column in COLUMNS[1:]: cell = row.joinpath(column) - if getattr(model_schema, column) == bytes: - + if not cell.exists(): + cells[column] = None + elif getattr(model_schema, column) == bytes: cells[column] = cell.read_bytes() + elif getattr(model_schema, column) == str: + cells[column] = cell.read_text() else: - cells[column] = literal_eval(cell.read_text()) return model_class( @@ -46,7 +54,7 @@ def dump(model: object, path: Path, model_schema: object) -> None: row = path.joinpath(model.sub.hex()) row.mkdir(parents=True, exist_ok=True) - for column in ['key_hash', 'index']: + for column in COLUMNS[1:]: cell = row.joinpath(column) diff --git a/src/httpaste/model/__init__.py b/src/httpaste/model/__init__.py index b9d1b4f..7d1280f 100644 --- a/src/httpaste/model/__init__.py +++ b/src/httpaste/model/__init__.py @@ -1,6 +1,6 @@ """Model """ -from typing import NamedTuple, Optional, Dict, Union +from typing import NamedTuple, Optional, Dict, Union, Any, TypedDict class PasteDataSchema: @@ -98,9 +98,11 @@ class Sub(UserDataSchema.sub): """ -class Index(Dict[PasteId, PasteKey]): +class Index(TypedDict): """User Paste Index """ + auth_expires: int + pastes: Dict[str, Dict[str, Any]] class SerializedIndex(UserDataSchema.index): diff --git a/src/httpaste/model/user.py b/src/httpaste/model/user.py index 7639127..b925063 100755 --- a/src/httpaste/model/user.py +++ b/src/httpaste/model/user.py @@ -2,6 +2,7 @@ """user model interface """ import json +from time import time from typing import Optional from httpaste import Config @@ -34,7 +35,7 @@ class IndexError(Exception): """ -def load( +def _load( proto: User, master_key: str, backend: object, @@ -54,16 +55,19 @@ def load( return None try: - return User( - *model[:-1], - Index(**json.loads(decrypt(model.index, master_key, salt, hmac_iter))) - ) + serialized_data = decrypt(model.index, master_key, salt, hmac_iter) except DecryptionError as e: - raise IndexError('unable to decrypt user index') from e + else: + data = json.loads(serialized_data) + + return User( + *model[:-1], + Index(**data) + ) -def dump( +def _dump( model: User, key: MasterKey, backend: object, @@ -77,7 +81,7 @@ def dump( :param salt: randomization salt """ - if not isinstance(model.index, Index): + if model.index is not None and not isinstance(model.index, dict): raise BaseException('index serialization pre-processing not allowed.') @@ -102,11 +106,13 @@ def load_paste_key( :param salt: randomization salt """ - for k, v in load(User(sub), key, backend, salt, hmac_iter).index.items(): + model = _load(User(sub), key, backend, salt, hmac_iter) + + for k, v in model.index.get('pastes').items(): if bytes.fromhex(k) == pid: - return PasteKey(bytes.fromhex(v)) + return PasteKey(bytes.fromhex(v.get('key'))) return None @@ -128,12 +134,13 @@ def dump_paste_key( :param backend: user model backend """ - model = load(User(sub), key, backend, salt, hmac_iter) + model = _load(User(sub), key, backend, salt, hmac_iter) - dump(User(*model[:-1], Index({ - **model.index, - **{pid.hex(): pkey.hex()} - })), key, backend, salt, hmac_iter) + model.index.setdefault('pastes', {})[pid.hex()] = { + 'key': pkey.hex() + } + + _dump(model, key, backend, salt, hmac_iter) def authenticate( @@ -154,22 +161,36 @@ def authenticate( proto = User(sub) + bogus_decline_msg = 'unable to authenticate' + try: - model = load(proto, key, backend, salt, hmac_iter) + model = _load(proto, key, backend, salt, hmac_iter) except IndexError as e: - raise AuthenticationError('you dun goofed') + raise AuthenticationError(bogus_decline_msg) from e if not model: - model = User(sub, key_hash, Index({})) - dump(model, key, backend, salt, hmac_iter) + data = { + 'auth_expires': int(time()) + (1 * 60) + } + + model = User(sub, key_hash, Index(data)) + _dump(model, key, backend, salt, hmac_iter) else: if model.key_hash != key_hash: - raise AuthenticationError('you dun goofed') + raise AuthenticationError(bogus_decline_msg) return { 'sub': sub, 'master_key': key } + + +__all__ = [ + AuthenticationError, + load_paste_key, + dump_paste_key, + authenticate +] \ No newline at end of file