fix(controller/paste): proxy public and private

This commit is contained in:
Tiara Rodney 2022-04-02 03:03:35 +02:00
parent 0d6c010cce
commit 6b96bf8efb
24 changed files with 679 additions and 584 deletions

View file

@ -1,5 +1,141 @@
#!/usr/bin/env python3
"""httpaste - secure pasting over http
"""PSEUDO-MAN-PAGE
NAME
httpaste - versatile HTTP pastebin
SYNOPSIS
HTTP [POST|PUT|DELETE|GET] {url}paste/[public|private]
{url}ui
DESCRIPTION
This program offers an HTTP interface for storing public and private data
(a.k.a. pastes). Public data can be accessed through an URL, where as
private pastes additionally require HTTP basic authentication. Creation of
authentication credentials happens on the fly, there is no sign-up process.
Public pastes can only be accessed by knowing their paste ids, they are not
listed on any index, since it isn't technically possible (by design).
All pastes are symetrically encrypted with an HMAC derived key using
{hmac_iterations} iterations and SHA-512 hashing, a server-side salt and a
randomly generated password. Public paste's passwords are derived from
their ids. Private paste's passwords are randomly generated and stored
inside a symetrically encrypted personal database, with the encryption key
also being derived through the same HMAC mechanism, where the HTTP basic
authentication credentials act as the master password.
Paste ids, usernames, and any other identifiable attributes are only stored
inside databases as keyed and salted BLAKE2 hashes.
Default Paste Encoding: {paste_default_encoding}
Default Paste Lifetime: {paste_lifetime} minutes
Minimum Paste Lifetime: 1 minute
Maximum Paste Lifetime: {paste_max_lifetime} hours
EXAMPLES
POST Public Paste
$ echo '#My public paste' | curl {url}paste/public \\
-F 'data=<-'
{url}/paste/private/I0ah7fyA
GET Public Paste
$ curl {url}/paste/public/I0ah7fyA
#My public paste
POST Private Paste
$ echo '#My private paste' | curl {url}paste/private \\
-F 'data=<-' \\
-u myusername:mypassword
{url}paste/private/4FtNL75g
GET Private Paste
$ curl {url}paste/private -u myusername:mypassword
#My private paste
POST Paste (with non-default expiration)
$ echo '#My paste expires in 20 minutes' | curl \\
{url}paste/public?lifetime=20 \\
-F 'data=<-' \\
{url}paste/public/xMxEmNi8
POST Paste (with expiration after first read)
$ echo '#My paste expires after first read' | curl \\
{url}paste/public?lifetime=-1 \\
-F 'data=<-' \\
{url}paste/public/4FtNL75g
POST Paste with binary data
$ cat my.pdf | base64 | curl \\
"{url}paste/public?encoding=base64" \\
-F "data=<-"
{url}paste/public/zYWpEzXU
GET Paste (with shell syntax highlighting)
$ curl "{url}paste/public/I0ah7fyA?syntax=json"
#My public paste
GET Paste (with line numbers)
$ curl "{url}paste/public/I0ah7fyA?syntax=shell&linenos=true"
0001: #My public paste
0002:
GET Paste (with formatted output)
$ curl "{url}paste/public/I0ah7fyA?syntax=shell&format=html"
--e.g. HTML OUTPUT WITH INLINE CSS--
GET Paste (and set HTTP response content type)
$ curl "{url}paste/public/zYWpEzXU?mime=application/pdf"
--e.g. BINARY--
SEE ALSO
Documentation <https://victorykit.bitbucket.org/httpaste>
Sources <https://bitbucket.org/victorykit/httpaste>
Host (HTTPS) <https://httpaste.it>
(HTTP) <http://httpaste.it>
NOTES
THIS PROGRAM IS FREE SOFTWARE.
IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS
THE PROGRAM AS PERMITTED, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY
GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE
USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF
DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD
PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS),
EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF
SUCH DAMAGES.
"""
from typing import NamedTuple, Tuple, Any
from string import ascii_uppercase, digits, ascii_letters, punctuation
@ -13,23 +149,28 @@ from connexion.resolver import RestyResolver
from httpaste.model import Backend
from httpaste.backend import get_backend_map
from httpaste.helper.exception import BadRequestHttpException, ForbiddenHttpException, GoneHttpException, NotFoundHttpException, UnauthorizedHttpException
from httpaste.helper.common import generate_random_string
from httpaste.helper.http import (
BadRequestError,
ForbiddenError,
GoneError,
NotFoundError,
UnauthorizedError)
CONFIGPATH_ENVIRON = 'HTTPASTE_CONFIG'
def get_sanitized_config_charset(charset:str):
def get_sanitized_config_charset(charset: str):
for x in [ "$", "%"]:
for x in ["$", "%"]:
charset = charset.replace(x, f'{x}{x}')
return charset
class ConfigException(BaseException):
class ConfigError(Exception):
"""Config Exception
"""
@ -37,13 +178,17 @@ class ConfigException(BaseException):
class Config:
"""httpaste global config
"""
salt: bytes = get_sanitized_config_charset(generate_random_string(32, ascii_letters + digits + punctuation)).encode('utf-8')
salt: bytes = get_sanitized_config_charset(generate_random_string(
32, ascii_letters + digits + punctuation)).encode('utf-8')
paste_id_size: int = 8
paste_id_charset: str = ascii_letters + digits
paste_key_size: int = 32
paste_key_charset: str = get_sanitized_config_charset(ascii_letters + digits + punctuation)
paste_key_charset: str = get_sanitized_config_charset(
ascii_letters + digits + punctuation)
paste_lifetime: int = 5
backend: Backend = None
hmac_iterations: int = 20000
paste_default_encoding: str = 'utf-8'
class ServerConfig:
@ -53,14 +198,17 @@ class ServerConfig:
bind_address = None
def get_config_path(environ:str=CONFIGPATH_ENVIRON):
def get_config_path(environ: str = CONFIGPATH_ENVIRON):
"""
"""
try:
return os.environ[environ]
except KeyError as e:
raise ConfigException('environment variable \'{environ}\' not set.') from e
raise ConfigError(
'environment variable \'{environ}\' not set.') from e
def load_config(path: str) -> Tuple[Config, ServerConfig]:
@ -78,8 +226,10 @@ def load_config(path: str) -> Tuple[Config, ServerConfig]:
bcl, bparamcl = backends[btype]
except KeyError as e:
bids = ', '.join(backends.keys())
msg = f'invalid backend \'{btype}\' in \'{path}\'. must be any of [{bids}]'
raise ConfigException(msg) from e
raise ConfigError(' '.join((
f'invalid backend \'{btype}\' in \'{path}\'. ',
f'must be any of [{bids}]'
))) from e
config = dict(_config.items('general'))
server_config = dict(_config.items('server'))
@ -105,6 +255,8 @@ def load_config(path: str) -> Tuple[Config, ServerConfig]:
def default_config() -> str:
"""
"""
config = ConfigParser()
@ -148,26 +300,35 @@ def get_flask_app(
options = {"swagger_ui": server_config.swagger_ui}
app = FlaskApp(
__name__,
specification_dir='schema/')
application = FlaskApp(__name__, specification_dir='schema/')
app.add_api(
application.add_api(
'httpaste.openapi.json',
options=options,
resolver=RestyResolver('httpaste.controller')
)
app.add_error_handler(
BadRequestHttpException,
BadRequestHttpException.render)
app.add_error_handler(ForbiddenHttpException, ForbiddenHttpException.render)
app.add_error_handler(GoneHttpException, GoneHttpException.render)
app.add_error_handler(NotFoundHttpException, NotFoundHttpException.render)
app.add_error_handler(UnauthorizedHttpException, UnauthorizedHttpException.render)
for err_cls in [
BadRequestError,
ForbiddenError,
GoneError,
NotFoundError,
UnauthorizedError
]:
application.add_error_handler(
err_cls, getattr(err_cls, 'render')
)
with app.app.app_context():
with application.app.app_context():
application.app.httpaste = config
app.app.httpaste = config
return application
return app
__all__ = [
Config,
ServerConfig,
load_config,
default_config,
get_flask_app
]

View file

@ -4,14 +4,14 @@ import argparse
import os
def _this_dir(basename:str)-> str:
def _this_dir(basename: str) -> str:
"""build path with script directory name and provided basename
"""
return os.path.join(os.path.dirname(__file__), basename)
def _path_output(path, echo:bool=False) -> str:
def _path_output(path, echo: bool = False) -> str:
"""print path content or path
"""
@ -30,12 +30,14 @@ def command_standalone(**kwargs):
"""
from httpaste import load_config, get_flask_app
from gevent.pywsgi import WSGIServer
config, server_config = load_config(kwargs.get('config_path'))
application = get_flask_app(config, server_config)
application.run(port=8080)
http_server = WSGIServer(('', kwargs.get('port')), application)
http_server.serve_forever()
def command_wsgi(**kwargs):
@ -109,10 +111,14 @@ def parser():
p_fcgi = sp.add_parser('fcgi', help=command_fcgi.__doc__)
p_fcgi.add_argument('--echo', '-e', action='store_true')
p_default_config = sp.add_parser('default-config', help=command_default_config.__doc__)
p_default_config = sp.add_parser(
'default-config',
help=command_default_config.__doc__)
p_default_config.add_argument('--dump', '-d')
p_init_backend = sp.add_parser('init-backend', help=command_init_backend.__doc__)
p_init_backend = sp.add_parser(
'init-backend',
help=command_init_backend.__doc__)
p_init_backend.add_argument('--config', '-c', required=True)
return p

View file

@ -1,6 +1,6 @@
"""Filesystem backend
"""
import os
from os import path
from pathlib import Path
from typing import NamedTuple, Optional
@ -12,9 +12,12 @@ class Parameters(NamedTuple):
"""Filesystem backend parameters
"""
#: path of base directory
base_dirname: str
user_dirname:str = 'users'
paste_dirname:str = 'pastes'
#: basename of users table directory
user_dirname: Optional[str] = 'users'
#: basename of pastes table directory
paste_dirname: Optional[str] = 'pastes'
class User(object):
@ -24,22 +27,25 @@ class User(object):
dirname: Path
path: Path
def __init__(self, parameters: Parameters, model_class:type, model_schema:type):
def __init__(
self,
parameters: Parameters,
model_class: type,
model_schema: type):
self.model_class = model_class
self.model_schema = model_schema
self.dirname = os.path.join(parameters.base_dirname, parameters.user_dirname)
self.dirname = path.join(parameters.base_dirname,
parameters.user_dirname)
self.path = Path(self.dirname)
def load(self, proto: object):
return user.load(proto, self.path, self.model_class, self.model_schema)
def dump(self, model: object):
return user.dump(model, self.path, self.model_schema)
@ -60,13 +66,18 @@ class Paste(object):
dirname: str
path: Path
def __init__(self, parameters: Parameters, model_class:type, model_schema:type):
def __init__(
self,
parameters: Parameters,
model_class: type,
model_schema: type):
self.model_class = model_class
self.model_schema = model_schema
self.dirname = os.path.join(parameters.base_dirname, parameters.paste_dirname)
self.dirname = path.join(parameters.base_dirname,
parameters.paste_dirname)
self.path = Path(self.dirname)

View file

@ -7,7 +7,11 @@ from pathlib import Path
from ast import literal_eval
def load(proto: object, path: Path, model_class: type, model_schema: type) -> object:
def load(
proto: object,
path: Path,
model_class: type,
model_schema: type) -> object:
"""load a paste
"""
@ -18,17 +22,34 @@ def load(proto: object, path: Path, model_class: type, model_schema: type) -> ob
return None
cells = {}
for column in ['data', 'data_hash', 'sub', 'timestamp', 'lifetime', 'encoding']:
for column in [
'data',
'data_hash',
'sub',
'timestamp',
'lifetime',
'encoding']:
cell = row.joinpath(column)
cell_schema = getattr(model_schema, column)
try:
cell_schema = getattr(model_schema, column)
except AttributeError:
raise RuntimeError(
'Schema {model_schema.__name__} has no attribute {column}'
)
if not cell.exists():
cells[column] = None
elif cell_schema == bytes:
cells[column] = cell.read_bytes()
elif cell_schema == str:
cells[column] = cell.read_text()
else:
cells[column] = literal_eval(cell.read_text())
try:
cells[column] = literal_eval(cell.read_text())
except ValueError as e:
raise ValueError(f'error evaluating column [{column}]') from e
return model_class(
proto.pid,
@ -47,7 +68,13 @@ def dump(model: object, path: Path, model_schema: type) -> None:
row = path.joinpath(model.pid.hex())
row.mkdir(parents=True, exist_ok=True)
for column in ['data', 'data_hash', 'sub', 'timestamp', 'lifetime', 'encoding']:
for column in [
'data',
'data_hash',
'sub',
'timestamp',
'lifetime',
'encoding']:
cell = row.joinpath(column)
cell_schema = getattr(model_schema, column)
@ -63,7 +90,6 @@ def dump(model: object, path: Path, model_schema: type) -> None:
def delete(proto: object, path: Path) -> bool:
row = path.joinpath(proto.pid.hex())
if row.exists():

View file

@ -7,7 +7,11 @@ from pathlib import Path
from ast import literal_eval
def load(proto: object, path: Path, model_class: type, model_schema: type) -> object:
def load(
proto: object,
path: Path,
model_class: type,
model_schema: type) -> object:
"""load a paste
"""

View file

@ -11,7 +11,9 @@ class Parameters(NamedTuple):
"""SQLite backend parameters
"""
#: local path or URI
path: str
#: a sqlite3.Connection object (does not apply to config)
connection: Optional[object] = None
@ -21,7 +23,7 @@ class User(object):
connection: Connection
def __init__(self, parameters: Parameters, model_class:type):
def __init__(self, parameters: Parameters, model_class: type):
self.model_class = model_class
@ -43,13 +45,14 @@ class User(object):
return user.init(self.connection)
class Paste(object):
"""SQLite paste model backend
"""
connection: Connection
def __init__(self, parameters: Parameters, model_class:type):
def __init__(self, parameters: Parameters, model_class: type):
self.model_class = model_class

View file

@ -1,90 +1,16 @@
"""NAME
httpaste - secure, easy-to-use, anonymous production pastebin
SYNOPSIS
HTTP [POST/PUT/DELETE/GET] {url}
DESCRIPTION
TODO
EXAMPLES
POST Public Paste
$ echo '#My public paste' | curl {url}paste/public \\
-F 'data=<-'
{url}/paste/private/I0ah7fyA
GET Public Paste
$ curl {url}/paste/public/I0ah7fyA
#My public paste
POST Private Paste
$ echo '#My private paste' | curl {url}paste/private \\
-F 'data=<-' \\
-u myusername:mypassword
{url}paste/private/4FtNL75g
GET Private Paste
$ curl {url}paste/private -u myusername:mypassword
#My private paste
POST Paste (with non-default expiration)
$ echo '#My paste expires in 20 minutes' | curl \\
{url}paste/public?lifetime=20 \\
-F 'data=<-' \\
{url}paste/public/xMxEmNi8
POST Paste (with expiration after first read)
$ echo '#My paste expires after first read' | curl \\
{url}paste/public?lifetime=-1 \\
-F 'data=<-' \\
{url}paste/public/4FtNL75g
GET Paste (with shell syntax highlighting)
$ curl {url}paste/public/I0ah7fyA?syntax=json
#My public paste
GET Paste (with line numbers)
$ curl {url}paste/public/I0ah7fyA?syntax=shell&linenos=true
0001: #My public paste
0002:
GET Paste (with formatted output)
$ curl {url}paste/public/I0ah7fyA?syntax=shell&format=html
--HTML OUTPUT WITH INLINE CSS--
SEE ALSO
https://bitbucket.org/victorykit/httpaste
https://paste.victoryk.it
"""
import connexion
from flask import current_app
import httpaste
def get(**kwargs):
print(dir(connexion.request))
config = current_app.httpaste
return __doc__.format(url=connexion.request.url), 200
return httpaste.__doc__.format(
url=connexion.request.url,
hmac_iterations=config.hmac_iterations,
paste_lifetime=config.paste_lifetime,
paste_max_lifetime=str(round(config.paste_max_lifetime / 60)),
paste_default_encoding=config.paste_default_encoding
), 200

View file

@ -1,14 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<title>httpaste</title>
</head>
<body>
<form><dl>
<dt>Data</dt>
<dd><input type="text"></dd>
<dt>Lifetime</dt>
<dd><input type="text"></dd>
</dl></form>
</body>
</html>

View file

@ -0,0 +1,153 @@
from connexion import request
from connexion.lifecycle import ConnexionResponse
from flask import current_app
from httpaste.helper.common import decode, DecodeError, join_url
import httpaste.model.paste as paste_model
import httpaste.model.user as user_model
from httpaste.helper.http import BadRequestError, GoneError, NotFoundError
from httpaste.model import (
PasteKey,
PasteData,
PasteLifetime,
PasteEncoding,
MasterKey,
Sub)
def delete(**kwargs):
"""
"""
pid = httpaste.model.PasteKey(request['id'].encode('utf-8'))
key = httpaste.model.MasterKey(request['token_info'].get('master_key'))
sub = httpaste.model.Sub(request['token_info'].get('sub'))
pkey = httpaste.model.user.load_paste_key(
pid,
sub,
key,
current_app.httpaste.backend.user,
current_app.httpaste.salt)
httpaste.model.paste.remove_safe(
pid,
sub,
pkey,
current_app.httpaste.backend.paste,
current_app.httpaste.salt)
return None, 200
def get(**kwargs):
"""
"""
config = current_app.httpaste
pid = PasteKey(kwargs['id'].encode('utf-8'))
syntax = kwargs.get('syntax')
formatter = kwargs.get('format', 'terminal256')
linenos = kwargs.get('linenos', False)
mime = kwargs.get('mime', 'text/plain')
if kwargs.get('user') is not None:
# authenticated
key = MasterKey(kwargs['token_info'].get('master_key'))
sub = Sub(kwargs['token_info'].get('sub'))
pkey = user_model.load_paste_key(pid, sub, key, config.backend.user,
config.salt, config.hmac_iterations)
def call(): return paste_model.get_safe(pid, pkey, sub,
config.backend.paste,
config.salt, config.hmac_iterations)
else:
# unauthenticated
def call(): return paste_model.get(pid, config.backend.paste,
config.salt, config.hmac_iterations)
try:
data, lifetime, encoding = call()
except paste_model.LifetimeError as e:
if kwargs.get('user') is not None:
paste_model.remove_safe(pid, sub, pkey, config.backend.paste,
config.salt, config.hmac_iterations)
else:
paste_model.remove(pid, config.backend.paste)
raise GoneError(str(e)) from e
except paste_model.NotFoundError as e:
raise NotFoundError(str(e))
except paste_model.SubError as e:
raise ForbiddenError(str(e))
# burn after read
if lifetime < 0:
if kwargs.get('user') is not None:
paste_model.remove_safe(pid, sub, pkey, config.backend.paste,
config.salt, config.hmac_iterations)
else:
paste_model.remove(pid, config.backend.paste)
if syntax is not None:
data = highlight(data, str(syntax), formatter, linenos)
if encoding is not None:
data = data.decode(encoding)
return ConnexionResponse(
status_code=200,
content_type=mime,
body=data
)
def post(**kwargs):
"""
"""
config = current_app.httpaste
if kwargs['body'].get('data') is None:
raise BadRequestError('form field \'data\' missing.')
encoding = PasteEncoding(kwargs.get('encoding', 'utf-8'))
lifetime = PasteLifetime(kwargs.get('lifetime', config.paste_lifetime))
if encoding not in ['utf-8', 'utf-16', 'ascii']:
try:
decoded_data = decode(kwargs['body'].get('data'), encoding)
except DecodeError as e:
raise BadRequestError(str(e))
else:
pdata = PasteData(decoded_data)
encoding = None
else:
pdata = PasteData(kwargs['body']['data'].encode(encoding))
if kwargs.get('user') is not None:
# authenticated
key = MasterKey(kwargs['token_info'].get('master_key'))
sub = Sub(kwargs['token_info'].get('sub'))
pid, pkey = paste_model.create_safe(pdata, lifetime, sub, encoding,
config.backend.paste, config.salt, config.hmac_iterations)
user_model.dump_paste_key(pid, pkey, sub, key, config.backend.user,
config.salt, config.hmac_iterations)
else:
# unauthenticated
pid = paste_model.create(pdata, lifetime, encoding, config.backend.paste,
config.salt, config.hmac_iterations)
base_url = join_url(request.root_url, request.path)
url = '/'.join((base_url, pid.decode('utf-8')))
return '\n'.join((url, '')), 200

View file

@ -1,38 +1,4 @@
from typing import TypedDict, Optional
import connexion
from flask import current_app
from httpaste.helper.exception import BadRequestHttpException, GoneHttpException, NotFoundHttpException
import httpaste.model
import httpaste.model.paste
import httpaste.model.user
class Form(TypedDict):
data: str
rsa_public_key: Optional[str]
class TokenInfo(TypedDict):
sub: str
token: str
class Lifetime(int):
"""
"""
class PostRequest(TypedDict):
body: Form
token_info: TokenInfo
lifetime: Lifetime
class GetRequest(TypedDict):
id: str
token_info: TokenInfo
from httpaste.controller import paste
def search(**kwargs):
@ -44,111 +10,15 @@ def search(**kwargs):
return 'Hallo', 200
def put(**kwargs):
"""
"""
print(args)
return 'Hallo', 200
def delete(**kwargs):
"""
"""
pid = httpaste.model.PasteKey(request['id'].encode('utf-8'))
key = httpaste.model.MasterKey(request['token_info'].get('master_key'))
sub = httpaste.model.Sub(request['token_info'].get('sub'))
pkey = httpaste.model.user.load_paste_key(
pid,
sub,
key,
current_app.httpaste.backend.user,
current_app.httpaste.salt)
httpaste.model.paste.remove_safe(pid, sub, pkey, current_app.httpaste.backend.paste, current_app.httpaste.salt)
return None, 200
def get(**kwargs):
"""
"""
request = GetRequest(**kwargs)
pid = httpaste.model.PasteKey(request['id'].encode('utf-8'))
key = httpaste.model.MasterKey(request['token_info'].get('master_key'))
sub = httpaste.model.Sub(request['token_info'].get('sub'))
pkey = httpaste.model.user.load_paste_key(
pid,
sub,
key,
current_app.httpaste.backend.user,
current_app.httpaste.salt)
try:
data, lifetime, encoding = httpaste.model.paste.get_safe(
pid,
pkey,
sub,
current_app.httpaste.backend.paste,
current_app.httpaste.salt)
except httpaste.model.paste.PasteLifetimeException as e:
httpaste.model.paste.remove_safe(pid, sub, pkey, current_app.httpaste.backend.paste, current_app.httpaste.salt)
raise GoneHttpException(str(e))
except httpaste.model.paste.PasteNotFoundException as e:
raise NotFoundHttpException(str(e))
if lifetime < 0:
httpaste.model.paste.remove_safe(pid, sub, pkey, current_app.httpaste.backend.paste, current_app.httpaste.salt)
return data.decode('utf-8'), 200
return paste.get(**kwargs)
def post(**kwargs):
"""
"""
request = PostRequest(**kwargs)
if not request['body'].get('data'):
raise BadRequestHttpException('form field \'data\' missing.')
encoding = request.get('encoding', 'utf-8')
if encoding not in ['utf-8', 'utf-16', 'ascii']:
try:
data = httpaste.model.paste.PasteData(decode(request['body'].get('data'), encoding))
encoding = None
except DecodeException as e:
raise BadRequestException(str(e))
else:
data = httpaste.model.paste.PasteData(
request['body']['data'].encode(encoding))
request.setdefault('lifetime', current_app.httpaste.paste_lifetime)
key = httpaste.model.MasterKey(request['token_info'].get('master_key'))
sub = httpaste.model.Sub(request['token_info'].get('sub'))
lifetime = httpaste.model.PasteLifetime(request['lifetime'])
pid, pkey = httpaste.model.paste.create_safe(data, lifetime, sub, encoding,
current_app.httpaste.backend.paste,
current_app.httpaste.salt)
httpaste.model.user.dump_paste_key(pid, pkey, sub, key,
current_app.httpaste.backend.user,
current_app.httpaste.salt)
return '/'.join((connexion.request.url, pid.decode('utf-8'))) + '\n', 200
return paste.post(**kwargs)

View file

@ -1,34 +1,4 @@
from typing import TypedDict, Optional
import connexion
from connexion.lifecycle import ConnexionResponse
from flask import current_app
from httpaste.helper.exception import BadRequestHttpException, \
GoneHttpException, \
NotFoundHttpException, \
ForbiddenHttpException
import httpaste.model.paste
from httpaste.helper.syntax import highlight
from httpaste.helper.common import decode, DecodeException
class Form(TypedDict):
data: str
class TokenInfo(TypedDict):
sub: str
token: str
class PostRequest(TypedDict):
body: Form
token_info: TokenInfo
class GetRequest(TypedDict):
id: str
token_info: TokenInfo
from httpaste.controller import paste
def search(**kwargs):
@ -40,86 +10,15 @@ def search(**kwargs):
return 'Hallo', 200
def get(**kwargs):
"""
"""
request = GetRequest(**kwargs)
pid = httpaste.model.PasteKey(request['id'].encode('utf-8'))
syntax = request.get('syntax')
formatter = request.get('format', 'terminal256')
linenos = request.get('linenos', False)
mime = request.get('mime', 'text/plain')
try:
data, lifetime, encoding = httpaste.model.paste.get(pid,
current_app.httpaste.backend.paste,
current_app.httpaste.salt)
except httpaste.model.paste.PasteLifetimeException as e:
httpaste.model.paste.remove(pid, current_app.httpaste.backend.paste)
raise GoneHttpException(str(e)) from e
except httpaste.model.paste.PasteNotFoundException as e:
raise NotFoundHttpException(str(e))
except httpaste.model.paste.PasteSubException as e:
raise ForbiddenHttpException(str(e))
if lifetime < 0:
httpaste.model.paste.remove(pid, current_app.httpaste.backend.paste)
if syntax:
data = highlight(data, str(syntax), formatter, linenos)
if encoding:
data = data.decode(encoding)
return ConnexionResponse(
status_code=200,
content_type=mime,
body=data
)
return paste.get(**kwargs)
def post(**kwargs):
"""
"""
request = PostRequest(**kwargs)
request.setdefault('lifetime', current_app.httpaste.paste_lifetime)
if not request['body'].get('data'):
raise BadRequestHttpException('form field \'data\' missing.')
encoding = request.get('encoding', 'utf-8')
if encoding not in ['utf-8', 'utf-16', 'ascii']:
try:
data = httpaste.model.PasteData(decode(request['body'].get('data'), encoding))
encoding = None
except DecodeException as e:
raise BadRequestHttpException(str(e))
else:
data = httpaste.model.PasteData(request['body']['data'].encode(encoding))
lifetime = httpaste.model.PasteLifetime(request['lifetime'])
pid = httpaste.model.paste.create(data, lifetime, encoding,
current_app.httpaste.backend.paste,
current_app.httpaste.salt)
return '/'.join((connexion.request.url, pid.decode('utf-8'))) + '\n', 200
return paste.post(**kwargs)

View file

View file

@ -0,0 +1,23 @@
"""
"""
from flask import current_app
from httpaste.helper.http import ForbiddenError
from httpaste.model.user import authenticate, AuthenticationError
def post(*args, **kwargs):
"""
"""
config = current_app.httpaste
user_id = args[0].encode('utf-8')
password = args[1].encode('utf-8')
try:
return authenticate(user_id, password, config.backend.user, config.salt, config.hmac_iterations)
except AuthenticationError as e:
raise ForbiddenError('You shall not pass!') from e

View file

@ -1,8 +1,8 @@
from random import choice
from base64 import b64decode
from urllib.parse import urljoin
class DecodeException(BaseException):
class DecodeError(Exception):
"""
"""
@ -12,12 +12,21 @@ def generate_random_string(length: int, charset: str) -> str:
return ''.join(choice(charset) for _ in range(length))
def decode(data:str, encoding:str) -> bytes:
def decode(data: str, encoding: str) -> bytes:
if encoding == 'base64':
return b64decode(data.encode('ascii'))
try:
return b64decode(data.encode('ascii'))
except UnicodeEncodeError as e:
raise DecodeError('unable to decode with base64.') from e
else:
raise DecodeException(f'unknown encoding \'{encoding}\'.')
raise DecodeError(f'unknown encoding \'{encoding}\'.')
def join_url(base:str, url: str) -> str:
return urljoin(base, url, True)

View file

@ -11,7 +11,10 @@ from cryptography.fernet import Fernet, InvalidToken
from httpaste import Config
class DecryptionException(BaseException):
DEFAULT_HMAC_ITERATIONS = 20000
class DecryptionError(Exception):
"""
"""
@ -35,7 +38,7 @@ def dhash(data: bytes):
return hashlib.sha512(data).digest()
def derive_key(main_key: str, salt: bytes = Config.salt) -> bytes:
def derive_key(main_key: str, salt: bytes = Config.salt, iterations:int=DEFAULT_HMAC_ITERATIONS) -> bytes:
"""derive a key from a main key
:param main_key: main key to derive from
@ -46,13 +49,13 @@ def derive_key(main_key: str, salt: bytes = Config.salt) -> bytes:
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=390000,
iterations=iterations,
)
return base64.urlsafe_b64encode(kdf.derive(main_key))
def encrypt(data: bytes, key: bytes, salt: bytes) -> bytes:
def encrypt(data: bytes, key: bytes, salt: bytes, hmac_iterations:int=DEFAULT_HMAC_ITERATIONS) -> bytes:
"""encrypt a data block
:param data: data block
@ -60,10 +63,10 @@ def encrypt(data: bytes, key: bytes, salt: bytes) -> bytes:
:param salt: randomization salt
"""
return Fernet(derive_key(key, salt)).encrypt(data)
return Fernet(derive_key(key, salt, hmac_iterations)).encrypt(data)
def decrypt(data: bytes, key: bytes, salt: bytes):
def decrypt(data: bytes, key: bytes, salt: bytes, hmac_iterations:int=DEFAULT_HMAC_ITERATIONS):
"""encrypt a data block
:param data: data block
@ -73,8 +76,8 @@ def decrypt(data: bytes, key: bytes, salt: bytes):
try:
return Fernet(derive_key(key, salt)).decrypt(data)
return Fernet(derive_key(key, salt, hmac_iterations)).decrypt(data)
except InvalidToken as e:
raise DecryptionException('unable to decrypt') from e
raise DecryptionError('unable to decrypt') from e

View file

@ -1,5 +1,5 @@
class BadRequestHttpException(RuntimeError):
class BadRequestError(RuntimeError):
def __init__(self, msg=None):
super().__init__(msg)
@ -12,7 +12,7 @@ class BadRequestHttpException(RuntimeError):
}, 400
class UnauthorizedHttpException(RuntimeError):
class UnauthorizedError(RuntimeError):
def __init__(self, msg=None):
super().__init__(msg)
@ -25,8 +25,7 @@ class UnauthorizedHttpException(RuntimeError):
}, 401
class ForbiddenHttpException(RuntimeError):
class ForbiddenError(RuntimeError):
def __init__(self, msg=None):
super().__init__(msg)
@ -39,7 +38,7 @@ class ForbiddenHttpException(RuntimeError):
}, 403
class GoneHttpException(RuntimeError):
class GoneError(RuntimeError):
def __init__(self, msg=None):
super().__init__(msg)
@ -52,7 +51,7 @@ class GoneHttpException(RuntimeError):
}, 410
class NotFoundHttpException(RuntimeError):
class NotFoundError(RuntimeError):
def __init__(self, msg=None):
super().__init__(msg)

View file

@ -2,7 +2,11 @@ from pygments.lexers import get_lexer_by_name, find_lexer_class_by_name
from pygments.formatters import find_formatter_class, HtmlFormatter
def highlight(data:str, lexer_alias:str, format_alias:str, linenos:bool=False):
def highlight(
data: str,
lexer_alias: str,
format_alias: str,
linenos: bool = False):
from pygments import highlight

View file

@ -18,9 +18,9 @@ class PasteDataSchema:
class UserDataSchema:
"""User Interface Schema between Model and Backend
"""
sub= bytes
key_hash= bytes
index= bytes
sub = bytes
key_hash = bytes
index = bytes
class Backend(object):

View file

@ -7,32 +7,37 @@ import time
from httpaste import Config
from httpaste.helper.crypto import dhash, shash, encrypt, decrypt
from httpaste.model import Paste, PasteId, Sub, MasterKey, PasteKey, Salt, \
PasteData, PasteHash, PasteTimestamp, PasteSub, \
PasteLifetime, PasteEncoding
from httpaste.helper.common import generate_random_string
from httpaste.model import (Paste, PasteId, Sub, MasterKey, PasteKey, Salt,
PasteData, PasteHash, PasteTimestamp, PasteSub,
PasteLifetime, PasteEncoding)
class PasteNotFoundException(BaseException):
class NotFoundError(Exception):
"""Paste Exception
"""
class PasteSubException(BaseException):
class SubError(Exception):
"""Paste Sub Exception
"""
class PasteChecksumException(BaseException):
class ChecksumError(Exception):
"""Paste Checksum Exception
"""
class PasteLifetimeException(BaseException):
class LifetimeError(Exception):
"""Paste Lifetime Exception
"""
class BackendError(Exception):
"""
"""
def generate_paste_id(
length: int = Config.paste_id_size,
charset: str = Config.paste_id_charset) -> bytes:
@ -66,19 +71,26 @@ def load(proto: Paste, backend: object) -> Optional[Paste]:
safe_pid = PasteId(dhash(proto.pid))
model = backend.load(Paste(safe_pid))
try:
model = backend.load(Paste(safe_pid))
except Exception as e:
raise BackendError(f'{e.__class__.__name__}: {e}') from e
if not model:
raise PasteNotFoundException('Paste does not exist')
raise NotFoundError('Paste does not exist')
if proto.sub and model.sub != shash(proto.sub, model.data_hash, proto.pid) or not proto.sub and model.sub:
if proto.sub and model.sub != shash(
proto.sub,
model.data_hash,
proto.pid) or not proto.sub and model.sub:
raise PasteSubException('Paste not owned by user')
raise SubError('Paste not owned by user')
if model.lifetime >= 0 and model.timestamp + (60 * model.lifetime) < int(time.time()):
if model.lifetime >= 0 and model.timestamp + \
(60 * model.lifetime) < int(time.time()):
raise PasteLifetimeException('Paste expired')
raise LifetimeError('Paste expired')
return model
@ -87,7 +99,8 @@ def load_safe(
proto: Paste,
key: PasteKey,
backend: object,
salt: Salt = Config.salt):
salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations):
"""load an encrypted paste model
:param proto: paste model prototype
@ -97,11 +110,11 @@ def load_safe(
model = load(proto, backend)
data = decrypt(model.data, key, salt)
data = decrypt(model.data, key, salt, hmac_iter)
if model.data_hash and dhash(data) != model.data_hash:
raise PasteChecksumException('Paste data scrambled')
raise ChecksumError('Paste data scrambled')
return Paste(
proto.pid,
@ -120,7 +133,10 @@ def dump(model: Paste, backend: object) -> None:
:param backend: model backend object
"""
backend.dump(model)
try:
backend.dump(model)
except Exception as e:
raise BackendError(str(e)) from e
def delete(proto: Paste, backend: object) -> None:
@ -129,21 +145,29 @@ def delete(proto: Paste, backend: object) -> None:
try:
model = load(proto, backend)
except PasteLifetimeException:
except LifetimeError:
pass
safe_pid = PasteId(dhash(proto.pid))
backend.delete(Paste(safe_pid))
try:
backend.delete(Paste(safe_pid))
except Exception as e:
raise BackendError(str(e)) from e
def delete_safe(proto: Paste, key:PasteKey, backend: object,salt: Salt = Config.salt) -> None:
def delete_safe(
proto: Paste,
key: PasteKey,
backend: object,
salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> None:
"""
"""
try:
model = load_safe(proto, key, backend, salt)
except PasteLifetimeException:
model = load_safe(proto, key, backend, salt, hmac_iter)
except LifetimeError:
pass
safe_pid = PasteId(dhash(proto.pid))
@ -156,7 +180,8 @@ def create(
lifetime: PasteLifetime,
encoding: PasteEncoding,
backend: object,
salt: Salt = Config.salt) -> PasteId:
salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> PasteId:
"""create an unencrypted paste
:param data: paste data
@ -170,9 +195,16 @@ def create(
sub = None
timestamp = PasteTimestamp(int(time.time()))
safe_data = PasteData(encrypt(data, pid, salt))
safe_data = PasteData(encrypt(data, pid, salt, hmac_iter))
model = Paste(safe_pid, sub, safe_data, data_hash, timestamp, lifetime, encoding)
model = Paste(
safe_pid,
sub,
safe_data,
data_hash,
timestamp,
lifetime,
encoding)
dump(model, backend)
@ -184,8 +216,8 @@ def create_safe(data: PasteData,
sub: Sub,
encoding: PasteEncoding,
backend: object,
salt: Salt = Config.salt) -> Tuple[PasteId,
PasteKey]:
salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> Tuple[PasteId,PasteKey]:
"""create an encrypted paste
:param data: paste data
@ -202,7 +234,7 @@ def create_safe(data: PasteData,
safe_sub = PasteSub(shash(sub, data_hash, pid))
timestamp = PasteTimestamp(int(time.time()))
safe_data = PasteData(encrypt(data, pkey, salt))
safe_data = PasteData(encrypt(data, pkey, salt, hmac_iter))
dump(Paste(
safe_pid,
@ -226,20 +258,26 @@ def remove(pid: PasteId, backend: object):
delete(proto, backend)
def remove_safe(pid: PasteId, sub:Sub, key:PasteKey, backend:object,salt: Salt = Config.salt):
def remove_safe(
pid: PasteId,
sub: Sub,
key: PasteKey,
backend: object,
salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations):
proto = Paste(pid, sub)
delete_safe(proto, key, backend, salt)
delete_safe(proto, key, backend, salt, hmac_iter)
def get(pid: PasteId, backend: object, salt: Salt = Config.salt) -> PasteData:
def get(pid: PasteId, backend: object, salt: Salt = Config.salt, hmac_iter: int = Config.hmac_iterations) -> PasteData:
"""conveniently load an unencrypted paste
"""
model = load(Paste(pid), backend)
data = decrypt(model.data, pid, salt)
data = decrypt(model.data, pid, salt, hmac_iter)
return PasteData(data), model.lifetime, model.encoding
@ -249,10 +287,11 @@ def get_safe(
pkey: PasteKey,
sub: Sub,
backend: object,
salt: Salt = Config.salt) -> PasteData:
salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> PasteData:
"""conveniently load an encrypted paste
"""
model = load_safe(Paste(pid, sub), pkey, backend, salt)
model = load_safe(Paste(pid, sub), pkey, backend, salt, hmac_iter)
return PasteData(model.data), model.lifetime, model.encoding

View file

@ -1,27 +1,35 @@
#!/usr/bin/env python3
"""user model interface
"""
import hashlib
import sqlite3
import os
import base64
import json
from typing import NamedTuple, Dict, Optional, Union
from flask import g, current_app
from typing import Optional
from httpaste import Config
from httpaste.helper.crypto import dhash, shash, encrypt, decrypt, derive_key, DecryptionException
from httpaste.model import User, KeyHash, Index, SerializedIndex, Salt, \
PasteKey, PasteId, MasterKey, Sub
from httpaste.helper.crypto import (
dhash,
shash,
encrypt,
decrypt,
derive_key,
DecryptionError)
from httpaste.model import (
User,
KeyHash,
Index,
SerializedIndex,
Salt,
PasteKey,
PasteId,
MasterKey,
Sub)
class AuthenticationError(BaseException):
class AuthenticationError(Exception):
"""Authentication Error
"""
class IndexError(BaseException):
class IndexError(Exception):
"""Index Decryption Error
"""
@ -30,7 +38,8 @@ def load(
proto: User,
master_key: str,
backend: object,
salt: Salt = Config.salt) -> Optional[User]:
salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> Optional[User]:
"""load user model
:param model: user model prototype
@ -47,9 +56,9 @@ def load(
try:
return User(
*model[:-1],
Index(**json.loads(decrypt(model.index, master_key, salt)))
Index(**json.loads(decrypt(model.index, master_key, salt, hmac_iter)))
)
except DecryptionException as e:
except DecryptionError as e:
raise IndexError('unable to decrypt user index') from e
@ -58,7 +67,8 @@ def dump(
model: User,
key: MasterKey,
backend: object,
salt: Salt = Config.salt) -> None:
salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> None:
"""dump a user model
:param model: user model
@ -73,7 +83,7 @@ def dump(
serialized_index = json.dumps(model.index).encode('utf-8')
safe_index = SerializedIndex(encrypt(serialized_index, key, salt))
safe_index = SerializedIndex(encrypt(serialized_index, key, salt, hmac_iter))
backend.dump(User(*model[:-1], safe_index))
@ -82,7 +92,7 @@ def load_paste_key(
pid: PasteId,
sub: Sub,
key: MasterKey,
backend: object, salt: Salt = Config.salt) -> Optional[PasteKey]:
backend: object, salt: Salt = Config.salt, hmac_iter: int = Config.hmac_iterations) -> Optional[PasteKey]:
"""load a user paste key
:param pid: paste id
@ -92,7 +102,7 @@ def load_paste_key(
:param salt: randomization salt
"""
for k, v in load(User(sub), key, backend, salt).index.items():
for k, v in load(User(sub), key, backend, salt, hmac_iter).index.items():
if bytes.fromhex(k) == pid:
@ -107,7 +117,8 @@ def dump_paste_key(
sub: Sub,
key: MasterKey,
backend: object,
salt: str = Config.salt) -> None:
salt: str = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> None:
"""dump a user paste key
:param pid: paste id
@ -117,19 +128,20 @@ def dump_paste_key(
:param backend: user model backend
"""
model = load(User(sub), key, backend, salt)
model = load(User(sub), key, backend, salt, hmac_iter)
dump(User(*model[:-1], Index({
**model.index,
**{pid.hex(): pkey.hex()}
})), key, backend, salt)
})), key, backend, salt, hmac_iter)
def authenticate(
user_id: bytes,
password: bytes,
backend: object,
salt: Salt = Config.salt):
salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations):
"""authenticate a user
:param user_id: human-readable user id
@ -137,20 +149,20 @@ def authenticate(
"""
sub = Sub(dhash(user_id))
key = MasterKey(derive_key(password, salt))
key = MasterKey(derive_key(password, salt, hmac_iter))
key_hash = KeyHash(dhash(key))
proto = User(sub)
try:
model = load(proto, key, backend, salt)
model = load(proto, key, backend, salt, hmac_iter)
except IndexError as e:
raise AuthenticationError('you dun goofed')
if not model:
model = User(sub, key_hash, Index({}))
dump(model, key, backend, salt)
dump(model, key, backend, salt, hmac_iter)
else:
if model.key_hash != key_hash:
@ -161,18 +173,3 @@ def authenticate(
'sub': sub,
'master_key': key
}
def _authenticate_connexion(user_id: str, password: str):
from httpaste.helper.exception import ForbiddenHttpException
from flask import current_app
try:
return authenticate(user_id.encode('utf-8'), password.encode('utf-8'),
current_app.httpaste.backend.user,
current_app.httpaste.salt)
except AuthenticationError as e:
raise ForbiddenHttpException('You shall not pass!') from e

View file

@ -32,14 +32,6 @@
},
},
"/paste/public": {
"get": {
"description": "get last 100 public pastes",
"responses": {
"200": {
"description": "",
}
}
},
"post": {
"description": "create a new public paste",
"requestBody": {
@ -58,11 +50,11 @@
],
"responses": {
"200": {
"description": "",
"description": "paste location",
"content": {
"text/plain": {
"schema": {
"type": "string"
"$ref": "#/components/schemas/PasteURL"
}
}
}
@ -72,10 +64,17 @@
},
"/paste/private": {
"get": {
"description": "get private paste ids",
"description": "get private paste locations",
"responses": {
"200": {
"description": "",
"description": "paste locations",
"content": {
"text/plain": {
"schema": {
"$ref": "#/components/schemas/NewLineDelimitedPasteURLs"
}
}
}
}
}
},
@ -92,15 +91,18 @@
"parameters": [
{
"$ref": "#/components/parameters/lifetime"
},
{
"$ref": "#/components/parameters/encoding"
}
],
"responses": {
"200": {
"description": "",
"description": "paste location",
"content": {
"text/plain": {
"schema": {
"type": "string"
"$ref": "#/components/schemas/PasteURL"
}
}
}
@ -108,78 +110,6 @@
}
}
},
"/paste/private/{id}": {
"get": {
"description": "get a private paste",
"security": [
{
"basicAuth": []
}
],
"parameters": [
{
"$ref": "#/components/parameters/id"
},
{
"$ref": "#/components/parameters/syntax"
}
],
"responses": {
"200": {
"description": "",
"content": {
"text/plain": {
"schema": {
"type": "string"
}
}
}
}
}
},
"put": {
"description": "update an existing private paste",
"security": [
{
"basicAuth": []
}
],
"requestBody": {
"$ref": "#/components/requestBodies/pastePost"
},
"parameters": [
{
"$ref": "#/components/parameters/id"
},
{
"$ref": "#/components/parameters/lifetime"
}
],
"responses": {
"200": {
"description": "",
}
}
},
"delete": {
"description": "delete an existing private paste",
"security": [
{
"basicAuth": []
}
],
"parameters": [
{
"$ref": "#/components/parameters/id"
}
],
"responses": {
"200": {
"description": "",
}
}
}
},
"/paste/public/{id}": {
"get": {
"description": "get a public paste",
@ -205,11 +135,41 @@
],
"responses": {
"200": {
"description": "",
"description": "paste data. content type may vary.",
"content": {
"text/plain": {
"schema": {
"type": "string"
"$ref": "#/components/schemas/PasteData"
}
}
}
}
}
}
},
"/paste/private/{id}": {
"get": {
"description": "get a private paste",
"security": [
{
"basicAuth": []
}
],
"parameters": [
{
"$ref": "#/components/parameters/id"
},
{
"$ref": "#/components/parameters/syntax"
}
],
"responses": {
"200": {
"description": "paste data. content type may vary.",
"content": {
"text/plain": {
"schema": {
"$ref": "#/components/schemas/PasteData"
}
}
}
@ -219,6 +179,20 @@
}
},
"components": {
"schemas": {
"PasteData": {
"description": "Paste Data",
"type": "string"
},
"PasteURL": {
"description": "URL pointing to paste",
"type": "string"
},
"NewLineDelimitedPasteURLs": {
"description": "new line delimited paste URLs",
"type": "string"
}
},
"requestBodies": {
"pastePost": {
"description": "Optional description in *Markdown*",
@ -270,6 +244,7 @@
"in": "query",
"required": false,
"schema": {
"description": "https://pygments.org/docs/lexers/",
"type": "string"
}
},
@ -279,6 +254,7 @@
"in": "query",
"required": false,
"schema": {
"description": "https://pygments.org/docs/formatters/",
"type": "string"
}
},
@ -292,13 +268,13 @@
}
},
"encoding": {
"description": "syntax highlighting with line numbers",
"description": "data encoding",
"name": "encoding",
"in": "query",
"required": false,
"schema": {
"type": "string",
"enum": ["base64"]
"enum": ["base64", "utf-8", "utf-16", "ascii"]
}
},
"mime": {
@ -315,7 +291,7 @@
"basicAuth": {
"type": "http",
"scheme": "basic",
"x-basicInfoFunc": "httpaste.model.user._authenticate_connexion"
"x-basicInfoFunc": "httpaste.controller.user.session.post"
}
}
}