feat(mode/user): implement complex index
to accomodate more personalized user data, the index can now hold more than just paste constraints. It was planned to implement an authentication expiration, however, this does not make sense in an HTTP basic auth context.
This commit is contained in:
parent
bcfab0fbad
commit
bd7af05850
3 changed files with 58 additions and 27 deletions
|
|
@ -6,6 +6,12 @@ acting as cells.
|
|||
from pathlib import Path
|
||||
from ast import literal_eval
|
||||
|
||||
COLUMNS = [
|
||||
'sub',
|
||||
'key_hash',
|
||||
'index',
|
||||
]
|
||||
|
||||
|
||||
def load(
|
||||
proto: object,
|
||||
|
|
@ -22,15 +28,17 @@ def load(
|
|||
return None
|
||||
|
||||
cells = {}
|
||||
for column in ['key_hash', 'index']:
|
||||
for column in COLUMNS[1:]:
|
||||
|
||||
cell = row.joinpath(column)
|
||||
|
||||
if getattr(model_schema, column) == bytes:
|
||||
|
||||
if not cell.exists():
|
||||
cells[column] = None
|
||||
elif getattr(model_schema, column) == bytes:
|
||||
cells[column] = cell.read_bytes()
|
||||
elif getattr(model_schema, column) == str:
|
||||
cells[column] = cell.read_text()
|
||||
else:
|
||||
|
||||
cells[column] = literal_eval(cell.read_text())
|
||||
|
||||
return model_class(
|
||||
|
|
@ -46,7 +54,7 @@ def dump(model: object, path: Path, model_schema: object) -> None:
|
|||
row = path.joinpath(model.sub.hex())
|
||||
row.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for column in ['key_hash', 'index']:
|
||||
for column in COLUMNS[1:]:
|
||||
|
||||
cell = row.joinpath(column)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
"""Model
|
||||
"""
|
||||
from typing import NamedTuple, Optional, Dict, Union
|
||||
from typing import NamedTuple, Optional, Dict, Union, Any, TypedDict
|
||||
|
||||
|
||||
class PasteDataSchema:
|
||||
|
|
@ -89,9 +89,11 @@ class Sub(UserDataSchema.sub):
|
|||
"""
|
||||
|
||||
|
||||
class Index(Dict[PasteId, PasteKey]):
|
||||
class Index(TypedDict):
|
||||
"""User Paste Index
|
||||
"""
|
||||
auth_expires: int
|
||||
pastes: Dict[str, Dict[str, Any]]
|
||||
|
||||
|
||||
class SerializedIndex(UserDataSchema.index):
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
"""user model interface
|
||||
"""
|
||||
import json
|
||||
from time import time
|
||||
from typing import Optional
|
||||
|
||||
from httpaste import Config
|
||||
|
|
@ -34,7 +35,7 @@ class IndexError(Exception):
|
|||
"""
|
||||
|
||||
|
||||
def load(
|
||||
def _load(
|
||||
proto: User,
|
||||
master_key: str,
|
||||
backend: object,
|
||||
|
|
@ -54,16 +55,19 @@ def load(
|
|||
return None
|
||||
|
||||
try:
|
||||
serialized_data = decrypt(model.index, master_key, salt, hmac_iter)
|
||||
except DecryptionError as e:
|
||||
raise IndexError('unable to decrypt user index') from e
|
||||
else:
|
||||
data = json.loads(serialized_data)
|
||||
|
||||
return User(
|
||||
*model[:-1],
|
||||
Index(**json.loads(decrypt(model.index, master_key, salt, hmac_iter)))
|
||||
Index(**data)
|
||||
)
|
||||
except DecryptionError as e:
|
||||
|
||||
raise IndexError('unable to decrypt user index') from e
|
||||
|
||||
|
||||
def dump(
|
||||
def _dump(
|
||||
model: User,
|
||||
key: MasterKey,
|
||||
backend: object,
|
||||
|
|
@ -77,7 +81,7 @@ def dump(
|
|||
:param salt: randomization salt
|
||||
"""
|
||||
|
||||
if not isinstance(model.index, Index):
|
||||
if model.index is not None and not isinstance(model.index, dict):
|
||||
|
||||
raise BaseException('index serialization pre-processing not allowed.')
|
||||
|
||||
|
|
@ -102,11 +106,13 @@ def load_paste_key(
|
|||
:param salt: randomization salt
|
||||
"""
|
||||
|
||||
for k, v in load(User(sub), key, backend, salt, hmac_iter).index.items():
|
||||
model = _load(User(sub), key, backend, salt, hmac_iter)
|
||||
|
||||
for k, v in model.index.get('pastes').items():
|
||||
|
||||
if bytes.fromhex(k) == pid:
|
||||
|
||||
return PasteKey(bytes.fromhex(v))
|
||||
return PasteKey(bytes.fromhex(v.get('key')))
|
||||
|
||||
return None
|
||||
|
||||
|
|
@ -128,12 +134,13 @@ def dump_paste_key(
|
|||
:param backend: user model backend
|
||||
"""
|
||||
|
||||
model = load(User(sub), key, backend, salt, hmac_iter)
|
||||
model = _load(User(sub), key, backend, salt, hmac_iter)
|
||||
|
||||
dump(User(*model[:-1], Index({
|
||||
**model.index,
|
||||
**{pid.hex(): pkey.hex()}
|
||||
})), key, backend, salt, hmac_iter)
|
||||
model.index.setdefault('pastes', {})[pid.hex()] = {
|
||||
'key': pkey.hex()
|
||||
}
|
||||
|
||||
_dump(model, key, backend, salt, hmac_iter)
|
||||
|
||||
|
||||
def authenticate(
|
||||
|
|
@ -154,22 +161,36 @@ def authenticate(
|
|||
|
||||
proto = User(sub)
|
||||
|
||||
bogus_decline_msg = 'unable to authenticate'
|
||||
|
||||
try:
|
||||
model = load(proto, key, backend, salt, hmac_iter)
|
||||
model = _load(proto, key, backend, salt, hmac_iter)
|
||||
except IndexError as e:
|
||||
raise AuthenticationError('you dun goofed')
|
||||
raise AuthenticationError(bogus_decline_msg) from e
|
||||
|
||||
if not model:
|
||||
|
||||
model = User(sub, key_hash, Index({}))
|
||||
dump(model, key, backend, salt, hmac_iter)
|
||||
data = {
|
||||
'auth_expires': int(time()) + (1 * 60)
|
||||
}
|
||||
|
||||
model = User(sub, key_hash, Index(data))
|
||||
_dump(model, key, backend, salt, hmac_iter)
|
||||
else:
|
||||
|
||||
if model.key_hash != key_hash:
|
||||
|
||||
raise AuthenticationError('you dun goofed')
|
||||
raise AuthenticationError(bogus_decline_msg)
|
||||
|
||||
return {
|
||||
'sub': sub,
|
||||
'master_key': key
|
||||
}
|
||||
|
||||
|
||||
__all__ = [
|
||||
AuthenticationError,
|
||||
load_paste_key,
|
||||
dump_paste_key,
|
||||
authenticate
|
||||
]
|
||||
Loading…
Add table
Add a link
Reference in a new issue