Merged in feature/HTTPASTE-3/model/user (pull request #2)

feat(mode/user): implement complex index
This commit is contained in:
Tiara Rodney 2022-04-02 16:06:52 +00:00
commit 83fee6293f
3 changed files with 58 additions and 27 deletions

View file

@ -6,6 +6,12 @@ acting as cells.
from pathlib import Path from pathlib import Path
from ast import literal_eval from ast import literal_eval
COLUMNS = [
'sub',
'key_hash',
'index',
]
def load( def load(
proto: object, proto: object,
@ -22,15 +28,17 @@ def load(
return None return None
cells = {} cells = {}
for column in ['key_hash', 'index']: for column in COLUMNS[1:]:
cell = row.joinpath(column) cell = row.joinpath(column)
if getattr(model_schema, column) == bytes: if not cell.exists():
cells[column] = None
elif getattr(model_schema, column) == bytes:
cells[column] = cell.read_bytes() cells[column] = cell.read_bytes()
elif getattr(model_schema, column) == str:
cells[column] = cell.read_text()
else: else:
cells[column] = literal_eval(cell.read_text()) cells[column] = literal_eval(cell.read_text())
return model_class( return model_class(
@ -46,7 +54,7 @@ def dump(model: object, path: Path, model_schema: object) -> None:
row = path.joinpath(model.sub.hex()) row = path.joinpath(model.sub.hex())
row.mkdir(parents=True, exist_ok=True) row.mkdir(parents=True, exist_ok=True)
for column in ['key_hash', 'index']: for column in COLUMNS[1:]:
cell = row.joinpath(column) cell = row.joinpath(column)

View file

@ -1,6 +1,6 @@
"""Model """Model
""" """
from typing import NamedTuple, Optional, Dict, Union from typing import NamedTuple, Optional, Dict, Union, Any, TypedDict
class PasteDataSchema: class PasteDataSchema:
@ -98,9 +98,11 @@ class Sub(UserDataSchema.sub):
""" """
class Index(Dict[PasteId, PasteKey]): class Index(TypedDict):
"""User Paste Index """User Paste Index
""" """
auth_expires: int
pastes: Dict[str, Dict[str, Any]]
class SerializedIndex(UserDataSchema.index): class SerializedIndex(UserDataSchema.index):

View file

@ -2,6 +2,7 @@
"""user model interface """user model interface
""" """
import json import json
from time import time
from typing import Optional from typing import Optional
from httpaste import Config from httpaste import Config
@ -34,7 +35,7 @@ class IndexError(Exception):
""" """
def load( def _load(
proto: User, proto: User,
master_key: str, master_key: str,
backend: object, backend: object,
@ -54,16 +55,19 @@ def load(
return None return None
try: try:
serialized_data = decrypt(model.index, master_key, salt, hmac_iter)
except DecryptionError as e:
raise IndexError('unable to decrypt user index') from e
else:
data = json.loads(serialized_data)
return User( return User(
*model[:-1], *model[:-1],
Index(**json.loads(decrypt(model.index, master_key, salt, hmac_iter))) Index(**data)
) )
except DecryptionError as e:
raise IndexError('unable to decrypt user index') from e
def dump( def _dump(
model: User, model: User,
key: MasterKey, key: MasterKey,
backend: object, backend: object,
@ -77,7 +81,7 @@ def dump(
:param salt: randomization salt :param salt: randomization salt
""" """
if not isinstance(model.index, Index): if model.index is not None and not isinstance(model.index, dict):
raise BaseException('index serialization pre-processing not allowed.') raise BaseException('index serialization pre-processing not allowed.')
@ -102,11 +106,13 @@ def load_paste_key(
:param salt: randomization salt :param salt: randomization salt
""" """
for k, v in load(User(sub), key, backend, salt, hmac_iter).index.items(): model = _load(User(sub), key, backend, salt, hmac_iter)
for k, v in model.index.get('pastes').items():
if bytes.fromhex(k) == pid: if bytes.fromhex(k) == pid:
return PasteKey(bytes.fromhex(v)) return PasteKey(bytes.fromhex(v.get('key')))
return None return None
@ -128,12 +134,13 @@ def dump_paste_key(
:param backend: user model backend :param backend: user model backend
""" """
model = load(User(sub), key, backend, salt, hmac_iter) model = _load(User(sub), key, backend, salt, hmac_iter)
dump(User(*model[:-1], Index({ model.index.setdefault('pastes', {})[pid.hex()] = {
**model.index, 'key': pkey.hex()
**{pid.hex(): pkey.hex()} }
})), key, backend, salt, hmac_iter)
_dump(model, key, backend, salt, hmac_iter)
def authenticate( def authenticate(
@ -154,22 +161,36 @@ def authenticate(
proto = User(sub) proto = User(sub)
bogus_decline_msg = 'unable to authenticate'
try: try:
model = load(proto, key, backend, salt, hmac_iter) model = _load(proto, key, backend, salt, hmac_iter)
except IndexError as e: except IndexError as e:
raise AuthenticationError('you dun goofed') raise AuthenticationError(bogus_decline_msg) from e
if not model: if not model:
model = User(sub, key_hash, Index({})) data = {
dump(model, key, backend, salt, hmac_iter) 'auth_expires': int(time()) + (1 * 60)
}
model = User(sub, key_hash, Index(data))
_dump(model, key, backend, salt, hmac_iter)
else: else:
if model.key_hash != key_hash: if model.key_hash != key_hash:
raise AuthenticationError('you dun goofed') raise AuthenticationError(bogus_decline_msg)
return { return {
'sub': sub, 'sub': sub,
'master_key': key 'master_key': key
} }
__all__ = [
AuthenticationError,
load_paste_key,
dump_paste_key,
authenticate
]