Compare commits

..

No commits in common. "dev" and "master" have entirely different histories.
dev ... master

49 changed files with 637 additions and 1861 deletions

View file

@ -1 +0,0 @@
.gitignore

View file

@ -1,30 +0,0 @@
FROM python:3.10-slim as base
LABEL org.label-schema.schema-version="1.0"
LABEL org.label-schema.vendor="Tiara Rodney (victoryk.it)"
LABEL org.label-schema.name="victorykit/httpaste"
LABEL org.label-schema.description="a versatile HTTP pastebin"
LABEL org.label-schema.vcs-url="https://bitbucket.org/victorykit/httpaste"
LABEL org.label-schema.docker.cmd="docker run {image-id} {httpaste-args}"
LABEL org.label-schema.version=$BUILD_VERSION
LABEL org.label-schema.build-date=$BUILD_DATE
WORKDIR /usr/local/src/httpaste
COPY . .
RUN apt-get update && \
apt-get install -y libffi-dev gcc && \
python3 -m pip install pipenv && \
python3 -m pipenv install --deploy --system --verbose && \
python3 setup.py install && \
apt-get remove -y libffi-dev gcc && apt-get autoremove -y && apt-get clean -y
ENTRYPOINT ["httpaste"]
FROM base as uwsgi
ENTRYPOINT ["uwsgi", "--master", "--enable-threads", "--manage-script-name", "-w", "httpaste.wsgi:application"]
CMD ["-s", "/tmp/yourapplication.sock"]

View file

@ -7,10 +7,7 @@ name = 'pypi'
python_version = '3' python_version = '3'
[packages] [packages]
httpaste-victorykit = {editable = true, path = "."} httpaste = {editable = true, path = "."}
flup = '==1.0.3'
mysql-connector-python = '==8.0.28'
uWSGI = '==2.0.20'
[dev-packages] [dev-packages]
tox = '==3.23.0' tox = '==3.23.0'

104
Pipfile.lock generated
View file

@ -1,7 +1,7 @@
{ {
"_meta": { "_meta": {
"hash": { "hash": {
"sha256": "e8725ecbf33a0d4931d941bfa72dcb15bbcdbdcff1048ea65a4025146018a498" "sha256": "6fc8f1480cab514207ed13c95c3533fd240e04aa466d8fe781b969aa42b6313d"
}, },
"pipfile-spec": 6, "pipfile-spec": 6,
"requires": { "requires": {
@ -96,11 +96,11 @@
}, },
"click": { "click": {
"hashes": [ "hashes": [
"sha256:24e1a4a9ec5bf6299411369b208c1df2188d9eb8d916302fe6bf03faed227f1e", "sha256:5e0d195c2067da3136efb897449ec1e9e6c98282fbf30d7f9e164af9be901a6b",
"sha256:479707fe14d9ec9a0757618b7a100a0ae4c4e236fac5b7f80ca68028141a1a72" "sha256:7ab900e38149c9872376e8f9b5986ddcaf68c0f413cf73678a0bca5547e6f976"
], ],
"markers": "python_version >= '3.7'", "markers": "python_version >= '3.7'",
"version": "==8.1.2" "version": "==8.1.1"
}, },
"clickclick": { "clickclick": {
"hashes": [ "hashes": [
@ -110,6 +110,9 @@
"version": "==20.10.2" "version": "==20.10.2"
}, },
"connexion": { "connexion": {
"extras": [
"swagger-ui"
],
"hashes": [ "hashes": [
"sha256:0ba5c163d34cb3cb3bf597d5b95fc14bad5d3596bf10ec86e32cdb63f68d0c8a", "sha256:0ba5c163d34cb3cb3bf597d5b95fc14bad5d3596bf10ec86e32cdb63f68d0c8a",
"sha256:26a570a0283bbe4cdaf5d90dfb3441aaf8e18cb9de10f3f96bbc128a8a3d8b47" "sha256:26a570a0283bbe4cdaf5d90dfb3441aaf8e18cb9de10f3f96bbc128a8a3d8b47"
@ -151,13 +154,9 @@
"markers": "python_version >= '3.7'", "markers": "python_version >= '3.7'",
"version": "==2.1.1" "version": "==2.1.1"
}, },
"flup": { "httpaste": {
"hashes": [ "editable": true,
"sha256:5eb09f26eb0751f8380d8ac43d1dfb20e1d42eca0fa45ea9289fa532a79cd159", "path": "."
"sha256:ca9fd78e1cc0431da1236f73fafd1c01db684675b4d369460d5f5c62e6f0b8d6"
],
"index": "pypi",
"version": "==1.0.3"
}, },
"httpaste-victorykit": { "httpaste-victorykit": {
"editable": true, "editable": true,
@ -257,33 +256,6 @@
"markers": "python_version >= '3.7'", "markers": "python_version >= '3.7'",
"version": "==2.1.1" "version": "==2.1.1"
}, },
"mysql-connector-python": {
"hashes": [
"sha256:04d75ec7c181e7907df3d40c2a573063f25ecfc5a95a7a90374861c02ce738a9",
"sha256:47f059bc2a7378acd56ac7a60b0526a2ba95d96b696a875b5b233a0feae30980",
"sha256:4d126ce5e03675d926a9e49ce1638d06af43ca7bac2b502d93373dc9425d386f",
"sha256:50c87ff50762f4a0cc0816365dde0e7de763949e125488b8e872de6471e0e427",
"sha256:687071dc9e51892d0861bbcbcbd48e0f3579e3155f2a0ec310198704137c775a",
"sha256:73c5149b33401610e28589d1fc669cba11d3b16215a8f6a75f63ece1f3af5f88",
"sha256:77ec293e265d01db1896a8e63a16b3d5c848a885cf76c77148adfed8453846e8",
"sha256:78bb1abb57bbb85263d65a240a901195e3de0e0992f25e42c48af0869079bb74",
"sha256:7d518491d6d51b186b3182b3698b1560d9bd80675c055163359d0aeea0001de1",
"sha256:8d8dd02e0e6bb7262156a836c3e83582d1a1a1ebb9d72e777a46813709404601",
"sha256:91be638d1b084835edf7aa426d85228174611a1cd6f016ca0f6d4339ac3d9d7b",
"sha256:aaec9d13fc0177e421a3c4392f0eaf86347b825949d5dfc202d535cdb1e07f04",
"sha256:b3a747c5efd6de7b76686ab93834186e2276a62684600dbede615537040436ca",
"sha256:b4c5ce835078555b6640921cae036daad46884dd21027f43c742fb505221e4e6",
"sha256:bb317b179bfbb3e86c771bb2b34794188a2d2b010cdaa1b4d1b5ea0961d0812c",
"sha256:bd89598b173aa0fc525b59fff6e3598ff3cabad4260a3bb49cf420eac10d3b3b",
"sha256:bdb4f187f737316d1c403085b2fb7c91717268d052ecbfc86066cef59f6d72a4",
"sha256:c76d771fdce1314b07619efff184ec03f56abef6b4ccdc686d3a995f5b225fec",
"sha256:d559f69e8b58ac248e37d30e5676718adf69eeff56ed8a7c03f064d74af68f99",
"sha256:e008127430c8dc66bb1b6d6c7a17498ec57ffa81188fc1f8c9f764363c01d12e",
"sha256:f5da43c77d409c8135132f5b5aee9ac91c2e97c3f87352e1b3017438a9cb9b82"
],
"index": "pypi",
"version": "==8.0.28"
},
"packaging": { "packaging": {
"hashes": [ "hashes": [
"sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb", "sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb",
@ -292,36 +264,6 @@
"markers": "python_version >= '3.6'", "markers": "python_version >= '3.6'",
"version": "==21.3" "version": "==21.3"
}, },
"protobuf": {
"hashes": [
"sha256:001c2160c03b6349c04de39cf1a58e342750da3632f6978a1634a3dcca1ec10e",
"sha256:0b250c60256c8824219352dc2a228a6b49987e5bf94d3ffcf4c46585efcbd499",
"sha256:1d24c81c2310f0063b8fc1c20c8ed01f3331be9374b4b5c2de846f69e11e21fb",
"sha256:1eb13f5a5a59ca4973bcfa2fc8fff644bd39f2109c3f7a60bd5860cb6a49b679",
"sha256:25d2fcd6eef340082718ec9ad2c58d734429f2b1f7335d989523852f2bba220b",
"sha256:32bf4a90c207a0b4e70ca6dd09d43de3cb9898f7d5b69c2e9e3b966a7f342820",
"sha256:38fd9eb74b852e4ee14b16e9670cd401d147ee3f3ec0d4f7652e0c921d6227f8",
"sha256:47257d932de14a7b6c4ae1b7dbf592388153ee35ec7cae216b87ae6490ed39a3",
"sha256:4eda68bd9e2a4879385e6b1ea528c976f59cd9728382005cc54c28bcce8db983",
"sha256:52bae32a147c375522ce09bd6af4d2949aca32a0415bc62df1456b3ad17c6001",
"sha256:542f25a4adf3691a306dcc00bf9a73176554938ec9b98f20f929a044f80acf1b",
"sha256:5b5860b790498f233cdc8d635a17fc08de62e59d4dcd8cdb6c6c0d38a31edf2b",
"sha256:6efe066a7135233f97ce51a1aa007d4fb0be28ef093b4f88dac4ad1b3a2b7b6f",
"sha256:71b2c3d1cd26ed1ec7c8196834143258b2ad7f444efff26fdc366c6f5e752702",
"sha256:7a53d4035427b9dbfbb397f46642754d294f131e93c661d056366f2a31438263",
"sha256:7dcd84dc31ebb35ade755e06d1561d1bd3b85e85dbdbf6278011fc97b22810db",
"sha256:88c8be0558bdfc35e68c42ae5bf785eb9390d25915d4863bbc7583d23da77074",
"sha256:8be43a91ab66fe995e85ccdbdd1046d9f0443d59e060c0840319290de25b7d33",
"sha256:8d84453422312f8275455d1cb52d850d6a4d7d714b784e41b573c6f5bfc2a029",
"sha256:9d0f3aca8ca51c8b5e204ab92bd8afdb2a8e3df46bd0ce0bd39065d79aabcaa4",
"sha256:a1eebb6eb0653e594cb86cd8e536b9b083373fca9aba761ade6cd412d46fb2ab",
"sha256:bc14037281db66aa60856cd4ce4541a942040686d290e3f3224dd3978f88f554",
"sha256:fbcbb068ebe67c4ff6483d2e2aa87079c325f8470b24b098d6bf7d4d21d57a69",
"sha256:fd7133b885e356fa4920ead8289bb45dc6f185a164e99e10279f33732ed5ce15"
],
"markers": "python_version >= '3.7'",
"version": "==3.20.0"
},
"pycparser": { "pycparser": {
"hashes": [ "hashes": [
"sha256:8ee45429555515e1f6b185e78100aea234072576aa43ab53aefcae078162fca9", "sha256:8ee45429555515e1f6b185e78100aea234072576aa43ab53aefcae078162fca9",
@ -419,6 +361,13 @@
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5'", "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5'",
"version": "==2.27.1" "version": "==2.27.1"
}, },
"swagger-ui-bundle": {
"hashes": [
"sha256:b462aa1460261796ab78fd4663961a7f6f347ce01760f1303bbbdf630f11f516",
"sha256:cea116ed81147c345001027325c1ddc9ca78c1ee7319935c3c75d3669279d575"
],
"version": "==0.0.9"
},
"urllib3": { "urllib3": {
"hashes": [ "hashes": [
"sha256:44ece4d53fb1706f667c9bd1c648f5469a2ec925fcf3a776667042d645472c14", "sha256:44ece4d53fb1706f667c9bd1c648f5469a2ec925fcf3a776667042d645472c14",
@ -427,28 +376,21 @@
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' and python_version < '4'", "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' and python_version < '4'",
"version": "==1.26.9" "version": "==1.26.9"
}, },
"uwsgi": {
"hashes": [
"sha256:88ab9867d8973d8ae84719cf233b7dafc54326fcaec89683c3f9f77c002cdff9"
],
"index": "pypi",
"version": "==2.0.20"
},
"werkzeug": { "werkzeug": {
"hashes": [ "hashes": [
"sha256:3c5493ece8268fecdcdc9c0b112211acd006354723b280d643ec732b6d4063d6", "sha256:094ecfc981948f228b30ee09dbfe250e474823b69b9b1292658301b5894bbf08",
"sha256:f8e89a20aeabbe8a893c24a461d3ee5dad2123b05cc6abd73ceed01d39c3ae74" "sha256:9b55466a3e99e13b1f0686a66117d39bda85a992166e0a79aedfcf3586328f7a"
], ],
"markers": "python_version >= '3.7'", "markers": "python_version >= '3.7'",
"version": "==2.1.1" "version": "==2.1.0"
}, },
"zipp": { "zipp": {
"hashes": [ "hashes": [
"sha256:56bf8aadb83c24db6c4b577e13de374ccfb67da2078beba1d037c17980bf43ad", "sha256:9f50f446828eb9d45b267433fd3e9da8d801f614129124863f9c51ebceafb87d",
"sha256:c4f6e5bbf48e74f7a38e7cc5b0480ff42b0ae5178957d564d18932525d5cf099" "sha256:b47250dd24f92b7dd6a0a8fc5244da14608f3ca90a5efcd37a3b1642fac9a375"
], ],
"markers": "python_version >= '3.7'", "markers": "python_version >= '3.7'",
"version": "==3.8.0" "version": "==3.7.0"
} }
}, },
"develop": { "develop": {

View file

@ -2,7 +2,7 @@
![](docs/_assets/images/favpng_parrot-royalty-free-cartoon.png) ![](docs/_assets/images/favpng_parrot-royalty-free-cartoon.png)
**NOTE**: httpaste is publicly hosted at [httpaste.it](http://httpaste.it) and as a [Tor Onion Service](https://community.torproject.org/onion-services/overview/) ([http://paste77ubkwxy4fqezffsmthxdh3xerwi72tlsw2mch7ecjhw2xn7iyd.onion](http://paste77ubkwxy4fqezffsmthxdh3xerwi72tlsw2mch7ecjhw2xn7iyd.onion)). **NOTE**: httpaste is publicly hosted at [httpaste.it](http://httpaste.it) and as a hidden Tor service ([https://paste77ubkwxy4fqezffsmthxdh3xerwi72tlsw2mch7ecjhw2xn7iyd.onion](https://paste77ubkwxy4fqezffsmthxdh3xerwi72tlsw2mch7ecjhw2xn7iyd.onion)).
Both services are to be considered evaluatory, as long as the source code Both services are to be considered evaluatory, as long as the source code
is in pre-release. Regarding voidance of pre-release status, see [Open Issues](https://victorykit.atlassian.net/issues/?jql=project%20%3D%20HTTPASTE%20AND%20fixVersion%20in%20(1.1.0-beta%2C%201.2.0-beta%2C%201.3.0)), for more information. is in pre-release. Regarding voidance of pre-release status, see [Open Issues](https://victorykit.atlassian.net/issues/?jql=project%20%3D%20HTTPASTE%20AND%20fixVersion%20in%20(1.1.0-beta%2C%201.2.0-beta%2C%201.3.0)), for more information.

View file

@ -10,7 +10,7 @@ httpaste - versatile HTTP pastebin
.. image:: _assets/images/favpng_parrot-royalty-free-cartoon.png .. image:: _assets/images/favpng_parrot-royalty-free-cartoon.png
.. note:: .. note::
httpaste is publicly hosted at `httpaste.it`_ and as a `Tor Onion Service`_ (`<http://paste77ubkwxy4fqezffsmthxdh3xerwi72tlsw2mch7ecjhw2xn7iyd.onion>`_). httpaste is publicly hosted at `httpaste.it`_ and as a hidden Tor service (`<https://paste77ubkwxy4fqezffsmthxdh3xerwi72tlsw2mch7ecjhw2xn7iyd.onion>`_).
Both services are to be considered evaluatory, as long as the source code Both services are to be considered evaluatory, as long as the source code
is in pre-release. Regarding voidance of pre-release status, see `Open Issues`_, for more information. is in pre-release. Regarding voidance of pre-release status, see `Open Issues`_, for more information.
@ -79,8 +79,6 @@ This program uses licensed third-party software.
ARCHITECTURE ARCHITECTURE
CONTRIBUTING CONTRIBUTING
.. _Tor Onion Service: https://community.torproject.org/onion-services/overview/
.. _ix.io: http://ix.io/ .. _ix.io: http://ix.io/
.. _sprunge.us: http://sprunge.us .. _sprunge.us: http://sprunge.us
.. _pygments: https://pygments.org/ .. _pygments: https://pygments.org/

View file

@ -6,17 +6,11 @@ The backend can be configured within the `[backend]` section of the configuratio
SQLite SQLite
------ ------
.. autoclass:: httpaste.backend.sqlite.Config .. autoclass:: httpaste.backend.sqlite.Parameters
:members: :members:
Filesystem Filesystem
---------- ----------
.. autoclass:: httpaste.backend.file.Config .. autoclass:: httpaste.backend.file.Parameters
:members:
MySQL
-----
.. autoclass:: httpaste.backend.mysql.Config
:members: :members:

View file

@ -1,40 +0,0 @@
version: "3.4"
services:
httpaste:
build:
context: ../..
dockerfile: Dockerfile
target: uwsgi
environment:
HTTPASTE_CONFIGPATH: /usr/local/httpaste/config.ini
volumes:
-
type: volume
source: system-shared
target: /shared
volume:
nocopy: true
- ./httpaste/usr/local/httpaste/config.ini:/usr/local/httpaste/config.ini
command: -s /shared/uwsgi.sock --chmod-socket=666
httpd:
build:
context: ./httpd
dockerfile: Dockerfile
ports:
- "80:80"
volumes:
-
type: volume
source: system-shared
target: /shared
volume:
nocopy: true
- ./httpd/usr/local/apache2/conf/httpd.conf:/usr/local/apache2/conf/httpd.conf
tor:
build:
context: ./tor
dockerfile: Dockerfile
volumes:
- ./tor/etc/tor/torrc:/etc/tor/torrc
volumes:
system-shared:

View file

@ -1,17 +0,0 @@
[Unit]
Description=httpaste (via Docker Compose)
Requires=docker.service
After=docker.service
[Service]
WorkingDirectory=/usr/local/src/httpaste/samples/httpaste.it
ExecStart=docker-compose up
ExecStop=docker-compose down
TimeoutStartSec=0
Restart=on-failure
StartLimitIntervalSec=60
StartLimitBurst=3
[Install]
WantedBy=multi-user.target

View file

@ -1,38 +0,0 @@
[context]
salt = '&)UxB-_$Lk$m=CB}dw[d85{-ZWR?uUNx'
hmac_iter = 20000
[model.paste]
default_encoding = 'utf-8'
id_size = 8
key_size = 32
default_lifetime = 5
default_max_lifetime = 1440
[controller.paste]
default_mime_type = 'text/plain'
default_linenos = False
default_syntax = False
default_formatter = 'terminal256'
[backend]
type = file
[backend.file]
base_dirname = 'sample_data'
[backend.sqlite]
path = 'devel/sample.db'
[backend.mysql]
user = 'example-user'
password = 'my_cool_secret'
database = 'httpaste'
host = '127.0.0.1'
[server]
swagger_ui = False
bind_address = 'sample.sock'

View file

@ -1,14 +0,0 @@
FROM httpd:2.4
RUN apt-get update -y && apt-get install -y \
libapache2-mod-proxy-uwsgi \
libapache2-mod-evasive \
libapache2-mod-security2
RUN mkdir -p /usr/local/apache2/crs-tecmint
ADD https://github.com/SpiderLabs/owasp-modsecurity-crs/archive/refs/tags/v3.2.0.tar.gz /usr/local/apache2/crs/master
RUN cd /usr/local/apache2/crs && \
tar -xzf master && \
cp owasp-modsecurity-crs-3.2.0/crs-setup.conf.example owasp-modsecurity-crs-3.2.0/crs-setup.conf

View file

@ -1,90 +0,0 @@
ServerRoot "/usr/local/apache2"
Listen 0.0.0.0:80
LoadModule mpm_event_module modules/mod_mpm_event.so
LoadModule authn_core_module modules/mod_authn_core.so
LoadModule authz_core_module modules/mod_authz_core.so
#LoadModule brotli_module modules/mod_brotli.so
LoadModule mime_module modules/mod_mime.so
LoadModule log_config_module modules/mod_log_config.so
#LoadModule log_debug_module modules/mod_log_debug.so
#LoadModule log_forensic_module modules/mod_log_forensic.so
LoadModule env_module modules/mod_env.so
LoadModule proxy_module modules/mod_proxy.so
LoadModule proxy_uwsgi_module modules/mod_proxy_uwsgi.so
LoadModule unixd_module modules/mod_unixd.so
LoadModule access_compat_module modules/mod_access_compat.so
LoadModule security2_module /usr/lib/apache2/modules/mod_security2.so
LoadModule evasive20_module /usr/lib/apache2/modules/mod_evasive20.so
<IfModule unixd_module>
User www-data
Group www-data
</IfModule>
ServerAdmin you@example.com
ServerSignature Off
ServerTokens Prod
<IfModule security2_module>
Include crs/owasp-modsecurity-crs-3.2.0/crs-setup.conf
Include crs/owasp-modsecurity-crs-3.2.0/rules/*.conf
</IfModule>
<IfModule mod_evasive24.c>
DOSHashTableSize 3097
DOSPageCount 3
DOSSiteCount 10
DOSPageInterval 1
DOSSiteInterval 1
DOSBlockingPeriod 10
DOSCloseSocket On
</IfModule>
ErrorLog /proc/self/fd/2
LogLevel warn
<IfModule log_config_module>
LogFormat "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\"" combined
LogFormat "%h %l %u %t \"%r\" %>s %b" common
<IfModule logio_module>
# You need to enable mod_logio.c to use %I and %O
LogFormat "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\" %I %O" combinedio
</IfModule>
CustomLog /proc/self/fd/1 common
</IfModule>
<IfModule ssl_module>
SSLRandomSeed startup builtin
SSLRandomSeed connect builtin
</IfModule>
ServerName 127.0.0.1
<VirtualHost *:80>
<Location />
Deny from all
Allow from none
</Location>
</VirtualHost>
<VirtualHost 0.0.0.0:80>
#ProxyPreserveHost On
ServerName httpaste.it
ServerAlias localhost
SetEnv proxy-sendchunks
ProxyPass "/" "unix:/shared/uwsgi.sock|uwsgi://localhost/"
</VirtualHost>
<VirtualHost 0.0.0.0:80>
#ProxyPreserveHost On
ServerAlias *.onion
SetEnv proxy-sendchunks
ProxyPass "/" "unix:/shared/uwsgi.sock|uwsgi://localhost/"
</VirtualHost>

View file

@ -1,10 +0,0 @@
FROM debian:bullseye-slim
RUN apt-get update -y && apt-get install -y tor
COPY ./usr/local/sbin/hostname.sh /usr/local/sbin/hostname
RUN chmod +x /usr/local/sbin/hostname
USER debian-tor
ENTRYPOINT ["tor"]

View file

@ -1,3 +0,0 @@
DataDirectory /var/lib/tor
HiddenServiceDir /var/lib/tor/hidden_service/
HiddenServicePort 80 httpd:80

View file

@ -1,3 +0,0 @@
#!/usr/bin/env sh
prop=HiddenServiceDir
cat $(grep $prop /etc/tor/torrc | sed "s/$prop //g")/hostname

View file

@ -9,6 +9,8 @@ SYNOPSIS
HTTP [POST|PUT|DELETE|GET] {url}paste/[public|private] HTTP [POST|PUT|DELETE|GET] {url}paste/[public|private]
{url}ui
DESCRIPTION DESCRIPTION
This program offers an HTTP interface for storing public and private data This program offers an HTTP interface for storing public and private data
@ -19,7 +21,7 @@ DESCRIPTION
listed on any index, since it isn't technically possible (by design). listed on any index, since it isn't technically possible (by design).
All pastes are symetrically encrypted with an HMAC derived key using All pastes are symetrically encrypted with an HMAC derived key using
{hmac_iterations} iterations and SHA-256 hashing, a server-side salt and a {hmac_iterations} iterations and SHA-512 hashing, a server-side salt and a
randomly generated password. Public paste's passwords are derived from randomly generated password. Public paste's passwords are derived from
their ids. Private paste's passwords are randomly generated and stored their ids. Private paste's passwords are randomly generated and stored
inside a symetrically encrypted personal database, with the encryption key inside a symetrically encrypted personal database, with the encryption key
@ -113,12 +115,12 @@ EXAMPLES
SEE ALSO SEE ALSO
Documentation <https://victorykit.bitbucket.io/httpaste> Documentation <https://victorykit.bitbucket.org/httpaste>
Sources <https://bitbucket.org/victorykit/httpaste> Sources <https://bitbucket.org/victorykit/httpaste>
Host (HTTP) <http://httpaste.it> Host (HTTPS) <https://httpaste.it>
(Onion) <http://paste77ubkwxy4fqezffsmthxdh3xerwi72tlsw2mch7ecjhw2xn7iyd.onion> (HTTP) <http://httpaste.it>
NOTES NOTES
@ -135,23 +137,21 @@ NOTES
SUCH DAMAGES. SUCH DAMAGES.
""" """
from typing import NamedTuple from typing import NamedTuple, Tuple, Any
from string import ascii_uppercase, digits, ascii_letters, punctuation
from inspect import isclass
from configparser import ConfigParser from configparser import ConfigParser
from ast import literal_eval
from io import StringIO
from os import environ
from importlib.resources import path as resource_path from importlib.resources import path as resource_path
from pathlib import Path
from connexion import FlaskApp from connexion import FlaskApp
from connexion.resolver import RestyResolver from connexion.resolver import RestyResolver
from httpaste.server import get_server_config from httpaste.model import Backend
from httpaste.server import Config as ServerConfig from httpaste.backend import get_backend_map
from httpaste.context import get_context_config from httpaste.helper.common import generate_random_string
from httpaste.context import Config as ContextConfig
from httpaste.model import get_model_config
from httpaste.model import Config as ModelConfig
from httpaste.backend import get_backend_config
from httpaste.backend import Config as BackendConfig
from httpaste.helper.config import get_configparser, CONFIGPATH_ENVIRON
from httpaste.helper.http import ( from httpaste.helper.http import (
BadRequestError, BadRequestError,
ForbiddenError, ForbiddenError,
@ -160,50 +160,147 @@ from httpaste.helper.http import (
UnauthorizedError) UnauthorizedError)
class Config(NamedTuple): CONFIGPATH_ENVIRON = 'HTTPASTE_CONFIGPATH'
"""
"""
context: ContextConfig
server: ServerConfig
model: ModelConfig
backend: BackendConfig
def get_config(configIni: ConfigParser, path: Path): def get_sanitized_config_charset(charset: str):
for x in ["$", "%"]:
charset = charset.replace(x, f'{x}{x}')
return charset
class ConfigError(Exception):
"""Config Exception
"""
class Config:
"""httpaste global config
"""
salt: bytes = get_sanitized_config_charset(generate_random_string(
32, ascii_letters + digits + punctuation)).encode('utf-8')
paste_id_size: int = 8
paste_id_charset: str = ascii_letters + digits
paste_key_size: int = 32
paste_key_charset: str = get_sanitized_config_charset(
ascii_letters + digits + punctuation)
paste_lifetime: int = 5
backend: Backend = None
hmac_iterations: int = 20000
paste_default_encoding: str = 'utf-8'
class ServerConfig:
"""connexion config
"""
swagger_ui: bool = True
bind_address = None
def get_config_path(var_name: str = CONFIGPATH_ENVIRON):
""" """
""" """
from httpaste.model import Config as ModelConfig try:
context_config = get_context_config(configIni) return environ[var_name]
server_config = get_server_config(configIni) except KeyError as e:
model_config = get_model_config(configIni, path)
backend_config = get_backend_config(configIni, path)
return Config( raise ConfigError(
context=context_config, f'environment variable \'{var_name}\' not set.') from e
server=server_config,
model=model_config,
backend=backend_config
)
def load_config(path: str = None, var_name: str = CONFIGPATH_ENVIRON): def load_config(path: str) -> Tuple[Config, ServerConfig]:
"""get config objects from file
"""
_config = ConfigParser()
_config.read(path)
backends = get_backend_map()
bconf = dict(_config.items('backend'))
btype = bconf.pop('type')
try:
bcl, bparamcl = backends[btype]
except KeyError as e:
bids = ', '.join(backends.keys())
raise ConfigError(' '.join((
f'invalid backend \'{btype}\' in \'{path}\'. ',
f'must be any of [{bids}]'
))) from e
config = dict(_config.items('general'))
server_config = dict(_config.items('server'))
c = Config()
sc = ServerConfig()
# typecast model_backend section items
bconf = {k: literal_eval(v) for k, v in bconf.items()}
# initialize model backend
c.backend = bcl(bparamcl(**bconf))
# typecast general section items
for k, v in config.items():
setattr(c, k, literal_eval(v))
# typecast server section items
for k, v in server_config.items():
setattr(sc, k, literal_eval(v))
c.salt = c.salt.encode('utf-8')
return c, sc
def default_config() -> str:
""" """
""" """
configIni, path = get_configparser(path, var_name) config = ConfigParser()
return get_config(configIni, Path(path).resolve().parent) config['general'] = {
'salt': Config.salt.decode('utf-8'),
'paste_key_charset': Config.paste_key_charset,
'paste_id_charset': Config.paste_id_charset
}
for literal in [
'paste_id_size',
'paste_key_size',
'paste_lifetime'
]:
config['general'][literal] = str(getattr(Config, literal))
config['backend'] = {
'type': 'sqlite',
'path': 'file::memory:?cache=shared'
}
config['server'] = {}
for literal in [
'swagger_ui',
'bind_address'
]:
config['server'][literal] = str(getattr(ServerConfig, literal))
stream = StringIO()
config.write(stream)
stream.seek(0)
return stream.read()
def get_flask_app(config: Config) -> FlaskApp: def get_flask_app(
config: Config,
server_config: ServerConfig = ServerConfig) -> FlaskApp:
"""get a flask app object """get a flask app object
""" """
print(config.server.swagger_ui) options = {"swagger_ui": server_config.swagger_ui}
options = {"swagger_ui": config.server.swagger_ui}
#context manager returns a pathlib.Path object #context manager returns a pathlib.Path object
with resource_path('httpaste.schema', 'httpaste.openapi.json') as path: with resource_path('httpaste.schema', 'httpaste.openapi.json') as path:
@ -243,6 +340,8 @@ def get_flask_app(config: Config) -> FlaskApp:
__all__ = [ __all__ = [
Config, Config,
ServerConfig,
load_config, load_config,
default_config,
get_flask_app get_flask_app
] ]

View file

@ -40,9 +40,9 @@ def command_standalone(**kwargs):
'Please install it by running \'python3 -m pip install gevent\'.' 'Please install it by running \'python3 -m pip install gevent\'.'
))) from e ))) from e
config = load_config(kwargs.get('config_path')) config, server_config = load_config(kwargs.get('config_path'))
application = get_flask_app(config) application = get_flask_app(config, server_config)
http_server = WSGIServer(('', kwargs.get('port')), application) http_server = WSGIServer(('', kwargs.get('port')), application)
http_server.serve_forever() http_server.serve_forever()
@ -122,7 +122,7 @@ def parser():
p_standalone = sp.add_parser('standalone', help=command_standalone.__doc__) p_standalone = sp.add_parser('standalone', help=command_standalone.__doc__)
p_standalone.add_argument('--config-path', '-c', required=True) p_standalone.add_argument('--config-path', '-c', required=True)
p_standalone.add_argument('--port', '-p', default=8082) p_standalone.add_argument('--port', '-p', default=8080)
p_wsgi = sp.add_parser('wsgi', help=command_wsgi.__doc__) p_wsgi = sp.add_parser('wsgi', help=command_wsgi.__doc__)
p_wsgi.add_argument('--echo', '-e', action='store_true') p_wsgi.add_argument('--echo', '-e', action='store_true')

View file

@ -2,114 +2,63 @@
implements backend of model implements backend of model
""" """
from abc import ABC, abstractmethod import sys
from importlib import import_module from inspect import isclass
from configparser import ConfigParser from typing import Dict, Tuple
from typing import NamedTuple
from pathlib import Path
from httpaste.schema import User, Paste, UserDataSchema, PasteDataSchema from httpaste.model import Backend, UserDataSchema, PasteDataSchema, User, Paste
from httpaste.helper.config import get_config, ConfigError from .sqlite import Parameters as SqliteParameters
from .sqlite import User as SqliteUser
from .sqlite import Paste as SqlitePaste
from .sqlite import get_connection as get_sqlite_connection
from .file import Parameters as FileParameters
from .file import User as FileUser
from .file import Paste as FilePaste
class BackendError(Exception): class SQLite(Backend):
""" """SQLite backend interface
""" """
parameter_class = SqliteParameters
user: SqliteUser
paste: SqlitePaste
class ObjectBackend(ABC): def __init__(self, parameters: SqliteParameters):
"""
parameters = SqliteParameters(parameters.path, get_sqlite_connection(parameters))
self.user = SqliteUser(parameters, User)
self.paste = SqlitePaste(parameters, Paste)
class File(Backend):
"""File backend interface
""" """
@abstractmethod parameter_class = FileParameters
def load(self, proto: object) -> object: user: FileUser
pass paste: FilePaste
@abstractmethod def __init__(self, parameters: FileParameters):
def dump(self, model: object) -> None:
pass
@abstractmethod self.user = FileUser(parameters, User, UserDataSchema)
def delete(self, proto: object) -> None: self.paste = FilePaste(parameters, Paste, PasteDataSchema)
pass
@abstractmethod
def init(self) -> object:
pass
@abstractmethod
def sanitize(self) -> None:
pass
class BackendInterface(ABC): def get_backend_map() -> Dict[str, Tuple[type, type]]:
""" """get a map of backend ids and their classes
""" """
@abstractmethod mod = sys.modules[__name__]
def __init__(self, params: object, out = {}
user_model_class: type,
paste_model_class: type,
user_schema: type,
paste_schema: type) -> None:
pass
@property for i in dir(mod):
@abstractmethod
def user(self) -> ObjectBackend:
pass
@property obj = getattr(mod, i)
@abstractmethod
def paste(self) -> ObjectBackend:
pass
if isclass(obj) and obj.__module__ == __name__:
class Config(NamedTuple): out[i.lower()] = (obj, obj.parameter_class)
"""Backend Configuration
"""
interface: type
config: dict
return out
def load_backend(config: Config) -> BackendInterface:
"""load a backend
"""
backend = config.interface(config.config, Paste, User, PasteDataSchema,
UserDataSchema)
return backend
def get_backend_config(configIni: ConfigParser, path:Path) -> Config:
"""retrieve a cascaded backend configuration from an INI config object
"""
if 'backend' not in configIni:
raise ConfigError('missing [backend] section.')
if 'type' not in configIni['backend']:
raise ConfigError('missing [backend] \'type\'.')
mod_name = configIni['backend']['type']
section = f'backend.{mod_name}'
try:
mod = import_module(f'.{mod_name}', 'httpaste.backend')
except ImportError as e:
raise BackendError(f'backend \'{mod_name}\' does not exist: {e}') from e
else:
interface = mod.Backend
config = get_config(configIni, section, mod.Config, path)
return Config(interface=interface,config=config)
__all__ = [
load_backend,
get_backend_config
]

View file

@ -3,117 +3,110 @@
from os import path from os import path
from pathlib import Path from pathlib import Path
from typing import NamedTuple, Optional from typing import NamedTuple, Optional
from httpaste.backend import BackendInterface as BackendAbc
from httpaste.backend import ObjectBackend as ObjectBackendAbc from . import user
from . import paste
class Config(NamedTuple): class Parameters(NamedTuple):
"""Filesystem backend config """Filesystem backend parameters
""" """
#: path of base directory #: path of base directory
base_dirname: Path base_dirname: str
#: basename of users table directory #: basename of users table directory
user_dirname: str = 'users' user_dirname: Optional[str] = 'users'
#: basename of pastes table directory #: basename of pastes table directory
paste_dirname: str = 'pastes' paste_dirname: Optional[str] = 'pastes'
class ObjectBackendBc(ObjectBackendAbc): class User(object):
"""Filesystem user model backend
"""
dirname: Path dirname: Path
path: Path path: Path
def __init__( def __init__(
self, self,
interface: object, parameters: Parameters,
basename_attr: str,
config: Config,
model_class: type, model_class: type,
model_schema: type): model_schema: type):
self.interface = interface
self.model_class = model_class self.model_class = model_class
self.model_schema = model_schema self.model_schema = model_schema
self.dirname = path.join(config.base_dirname,
getattr(config, basename_attr)) self.dirname = path.join(parameters.base_dirname,
parameters.user_dirname)
self.path = Path(self.dirname) self.path = Path(self.dirname)
def load(self, proto: object): def load(self, proto: object):
return self.interface.load(proto, self.path, self.model_class, self.model_schema) return user.load(proto, self.path, self.model_class, self.model_schema)
def dump(self, model: object): def dump(self, model: object):
return self.interface.dump(model, self.path, self.model_schema) return user.dump(model, self.path, self.model_schema)
def delete(self, proto: object): def delete(self, proto: object):
return self.interface.delete(proto, self.path) return user.delete(proto, self.path)
def init(self): def init(self):
return self.interface.init(self.path) return user.init(self.path)
def sanitize(self): def sanitize(self):
if self.path.exists(): if self.path.exists():
return self.interface.sanitize(self.path, self.model_class, self.model_schema) return user.sanitize(self.path, self.model_class, self.model_schema)
return None return None
class UserBackend(ObjectBackendBc): class Paste(object):
"""Filesystem user model backend
"""
def __init__(self, *args):
from . import user
super().__init__(user, 'user_dirname', *args)
class PasteBackend(ObjectBackendBc):
"""Filesystem paste model backend """Filesystem paste model backend
""" """
def __init__(self, *args): dirname: str
path: Path
from . import paste def __init__(
self,
parameters: Parameters,
model_class: type,
model_schema: type):
super().__init__(paste, 'paste_dirname', *args) self.model_class = model_class
self.model_schema = model_schema
class Backend(BackendAbc): self.dirname = path.join(parameters.base_dirname,
"""File backend interface parameters.paste_dirname)
"""
_user: UserBackend self.path = Path(self.dirname)
_paste: PasteBackend
def __init__(self, def load(self, proto: object):
config: Config,
paste_model_class: type,
user_model_class: type,
paste_schema: type,
user_schema: type):
self._user = UserBackend(config, user_model_class, user_schema) return paste.load(proto, self.path, self.model_class, self.model_schema)
self._paste = PasteBackend(config, paste_model_class, paste_schema)
@property def dump(self, model: object):
def user(self) -> UserBackend:
return self._user return paste.dump(model, self.path, self.model_schema)
@property def delete(self, proto: object):
def paste(self) -> PasteBackend:
return self._paste return paste.delete(proto, self.path)
def init(self):
__all__ = [ return paste.init(self.path)
Config,
Backend def sanitize(self):
]
if self.path.exists():
return paste.sanitize(self.path, self.model_class, self.model_schema)
return None

View file

@ -1,124 +0,0 @@
"""MySQL backend
"""
from typing import NamedTuple, Optional
from httpaste.backend import BackendInterface as BackendAbc
from httpaste.backend import ObjectBackend as ObjectBackendAbc
class Config(NamedTuple):
"""MySQL config
"""
#: user name
user: str
#: user password
password: str
#: hostname or IP address
host: str
#: database identifier
database: str
#: a mysql.connection.MySQLConnection object (does not apply to config)
connection: object = None
class ObjectBackendBc(ObjectBackendAbc):
connection: object
def __init__(self, interface: object, config: Config, model_class: type) -> None:
self.interface = interface
self.model_class = model_class
self.connection = get_connection(config)
def load(self, proto: object) -> object:
return self.interface.load(proto, self.connection, self.model_class)
def dump(self, model: object) -> None:
return self.interface.dump(model, self.connection)
def delete(self, proto: object) -> None:
return self.interface.delete(proto, self.connection)
def init(self) -> None:
return self.interface.init(self.connection)
def sanitize(self) -> None:
return self.interface.sanitize(self.connection, self.model_class)
class UserBackend(ObjectBackendBc):
"""MySQL user model backend
"""
def __init__(self, *args) -> None:
from . import user
super().__init__(paste, *args)
class PasteBackend(ObjectBackendBc):
"""MySQL paste model backend
"""
connection: object
def __init__(self, *args) -> None:
from . import paste
super().__init__(paste, *args)
class Backend(BackendAbc):
"""MySQL backend interface
"""
user: UserBackend
paste: PasteBackend
def __init__(self,
config: Config,
paste_model_class: type,
user_model_class: type,
paste_schema: type,
user_schema: type):
self.user = UserBackend(config, user_model_class, user_schema)
self.paste = PasteBackend(config, paste_model_class, paste_schema)
def get_connection(config: Config) -> object:
"""get a mysql.connection.MySQLConnection object
"""
try:
from mysql.connector import connect
except ImportError as e:
raise ImportError(' '.join((
'\'mysql-connector-python\' is not installed.',
'Install it by running',
'\'python3 -m pip install mysql-connector-python\'.'
))) from e
if config.connection is not None:
return config.connection
connection = connect(user=config.user, password=config.password,
host=config.host,
database=config.database)
return connection
__all__ = [
Config,
Backend
]

View file

@ -1,122 +0,0 @@
from os import path
from time import time
from mysql.connector.connection import MySQLConnection
def load(proto:object, connection: MySQLConnection, model_class: type):
"""load a paste model
:param model: model prototype
:param connection: mysql connector connection object
:param model_class: model class
"""
cursor = connection.cursor(dictionary=True)
statement = '''SELECT pid, data, data_hash, sub, expiration, encoding
FROM httpaste_pastes
WHERE pid=%s'''
cursor.execute(statement, (proto.pid,))
row = cursor.fetchone()
if row is not None:
return model_class(
row['pid'],
row['sub'],
row['data'],
row['data_hash'],
row['expiration'],
row['encoding'])
return None
def dump(model:object, connection: MySQLConnection):
"""dump a paste model
:param model: model object
:param connection: mysql connector connection object
:param model_class: model class
"""
cursor = connection.cursor()
statement = '''REPLACE INTO httpaste_pastes
(pid, data, data_hash, sub, expiration, encoding)
VALUES (%s, %s, %s, %s, %s, %s)'''
cursor.execute(statement, (model.pid, model.data, model.data_hash,
model.sub, model.expiration, model.encoding))
connection.commit()
return None
def delete(proto: object, connection: MySQLConnection):
"""delete a paste model
:param model: model prototype
:param connection: mysql connector connection object
:param model_class: model class
"""
cursor = connection.cursor()
statement = '''DELETE FROM httpaste_pastes
WHERE pid=%s'''
cursor.execute(statement, (proto.pid,))
connection.commit()
return None
def init(connection: MySQLConnection):
"""initialize paste model table
:param connection: mysql connector connection object
"""
cursor = connection.cursor()
statement = '''CREATE TABLE `httpaste_pastes` (
`pid` blob NOT NULL,
`data` longblob NOT NULL,
`data_hash` blob NOT NULL,
`sub` blob DEFAULT NULL,
`expiration` int(16) NOT NULL,
`encoding` tinytext DEFAULT NULL,
PRIMARY KEY (`pid`(128))
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;'''
cursor.execute(statement)
connection.commit()
return None
def sanitize(connection: MySQLConnection, model_class:type):
"""sanitize paste model table
:param connection: mysql connector connection object
"""
cursor = connection.cursor(dictionary=True)
statement = '''SELECT pid
FROM httpaste_pastes
WHERE expiration < %s AND expiration > 0'''
cursor.execute(statement, (time(),))
for row in cursor.fetchall():
delete(model_class(row['pid']))
return None

View file

@ -1,99 +0,0 @@
from os import path
from mysql.connector.connection import MySQLConnection
def load(proto:object, connection: MySQLConnection, model_class: type):
"""load a user model
:param model: model prototype
:param connection: mysql connector connection object
:param model_class: model class
"""
cursor = connection.cursor(dictionary=True)
statement = '''SELECT sub, key_hash, paste_index
FROM httpaste_users
WHERE sub=%s'''
cursor.execute(statement, (proto.sub,))
row = cursor.fetchone()
if row is not None:
return model_class(row['sub'], row['key_hash'], row['paste_index'])
return None
def dump(model:object, connection: MySQLConnection):
"""dump a user model
:param model: model object
:param connection: mysql connector connection object
:param model_class: model class
"""
cursor = connection.cursor()
statement = '''REPLACE INTO httpaste_users
(sub, key_hash, paste_index)
VALUES (%s, %s, %s)'''
cursor.execute(statement, (model.sub, model.key_hash, model.index))
connection.commit()
return None
def delete(proto: object, connection: MySQLConnection):
"""delete a user model
:param model: model prototype
:param connection: mysql connector connection object
:param model_class: model class
"""
cursor = connection.cursor()
statement = '''DELETE FROM httpaste_users
WHERE sub=%s'''
cursor.execute(statement, (proto.sub,))
connection.commit()
return None
def init(connection: MySQLConnection):
"""initialize user model table
:param connection: mysql connector connection object
"""
cursor = connection.cursor()
statement = '''CREATE TABLE `httpaste_users` (
`sub` blob NOT NULL,
`key_hash` blob NOT NULL,
`paste_index` blob NOT NULL,
PRIMARY KEY (`sub`(128))
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;'''
cursor.execute(statement)
connection.commit()
return None
def sanitize(connection: MySQLConnection, model_class: type):
"""sanitize user model table
:param connection: mysql connector connection object
"""
return None

View file

@ -2,113 +2,96 @@
""" """
from sqlite3 import Connection, Row, connect from sqlite3 import Connection, Row, connect
from typing import NamedTuple, Optional from typing import NamedTuple, Optional
from pathlib import Path
from httpaste.backend import BackendInterface as BackendAbc from . import user
from httpaste.backend import ObjectBackend as ObjectBackendAbc from . import paste
class Config(NamedTuple): class Parameters(NamedTuple):
"""SQLite backend config """SQLite backend parameters
""" """
#: local path or URI #: local path or URI
uri: Path path: str
user_table_name: str = 'httpaste_users'
paste_table_name: str = 'httpaste_pastes'
#: a sqlite3.Connection object (does not apply to config) #: a sqlite3.Connection object (does not apply to config)
connection: Connection = None connection: Optional[object] = None
class ObjectBackendBc(ObjectBackendAbc): class User(object):
"""SQLite user model backend
"""
connection: object connection: Connection
def __init__(self, interface: object, table_name_attr: str, config: Config, model_class: type, schema: type) -> None: def __init__(self, parameters: Parameters, model_class: type):
self.interface = interface
self.model_class = model_class self.model_class = model_class
self.connection = get_connection(config)
self.table = getattr(config, table_name_attr)
def load(self, proto: object) -> object: self.connection = get_connection(parameters)
return self.interface.load(proto, self.connection, self.table, self.model_class) def load(self, proto: object):
def dump(self, model: object) -> None: return user.load(proto, self.connection, self.model_class)
return self.interface.dump(model, self.connection, self.table) def dump(self, model: object):
def delete(self, proto: object) -> None: return user.dump(model, self.connection)
return self.interface.delete(proto, self.connection, self.table) def delete(self, proto: object):
def init(self) -> None: return user.delete(proto, self.connection)
return self.interface.init(self.connection, self.table) def init(self):
def sanitize(self) -> None: return user.init(self.connection)
return self.interface.sanitize(self.connection, self.table, self.model_class) def sanitize(self):
return user.sanitize(self.connection, self.model_class)
class UserBackend(ObjectBackendBc): class Paste(object):
"""sqlite user model backend """SQLite paste model backend
""" """
def __init__(self, *args) -> None: connection: Connection
from . import user def __init__(self, parameters: Parameters, model_class: type):
super().__init__(paste, 'user_table_name', *args) self.model_class = model_class
self.connection = get_connection(parameters)
def load(self, proto: object):
return paste.load(proto, self.connection, self.model_class)
def dump(self, model: object):
return paste.dump(model, self.connection)
def delete(self, proto: object):
return paste.delete(proto, self.connection)
def init(self):
return paste.init(self.connection)
def sanitize(self):
return paste.sanitize(self.connection, self.model_class)
class PasteBackend(ObjectBackendBc): def get_connection(parameters: Parameters):
"""sqlite paste model backend
"""
connection: object
def __init__(self, *args) -> None:
from . import paste
super().__init__(paste, 'paste_table_name', *args)
class Backend(BackendAbc):
"""sqlite backend interface
"""
user: UserBackend
paste: PasteBackend
def __init__(self,
config: Config,
paste_model_class: type,
user_model_class: type,
paste_schema: type,
user_schema: type):
self.user = UserBackend(config, user_model_class, user_schema)
self.paste = PasteBackend(config, paste_model_class, paste_schema)
def get_connection(config: Config):
"""get an sqlite connection object """get an sqlite connection object
""" """
if config.connection: if parameters.connection:
return config.connection return parameters.connection
connection = connect(config.uri, check_same_thread=False) connection = connect(parameters.path, check_same_thread=False)
connection.row_factory = Row connection.row_factory = Row
return connection return connection
__all__ = [
Config,
Backend
]

View file

@ -6,102 +6,77 @@ from time import time
from importlib.resources import open_text from importlib.resources import open_text
def load(proto: object, connection: Connection, table: str, model_class: type): def load(proto: object, connection: Connection, model_class: type):
"""load a paste """load a paste
""" """
cursor = connection.cursor() cur = connection.cursor()
statement = f'''SELECT pid, data, data_hash, sub, expiration, encoding cur.execute(
FROM {table} 'SELECT pid, data, data_hash, sub, expiration, encoding FROM pastes WHERE pid=?',
WHERE pid=?''' (proto.pid,
))
cursor.execute(statement, (proto.pid,)) result = cur.fetchone()
row = cursor.fetchone() if result:
if row is not None:
return model_class( return model_class(
row['pid'], result['pid'],
row['sub'], result['sub'],
row['data'], result['data'],
row['data_hash'], result['data_hash'],
row['expiration'], result['expiration'],
row['encoding']) result['encoding'])
return None return None
def dump(model: object, connection: Connection, table: str) -> None: def dump(model: object, connection: Connection):
"""dump a paste """dump a paste
""" """
cursor = connection.cursor() cur = connection.cursor()
statement = f'''INSERT INTO "{table}" cur.execute(
(pid, data, data_hash, sub, expiration, encoding) '''INSERT INTO pastes (pid, data, data_hash, sub, expiration, encoding)
VALUES (?,?,?,?,?,?)''' VALUES (?,?,?,?,?,?)''',
(model.pid,
values = (model.pid,
model.data, model.data,
model.data_hash, model.data_hash,
model.sub, model.sub,
model.expiration, model.expiration,
model.encoding) model.encoding))
cursor.execute(statement, values)
connection.commit()
return None
def delete(proto: object, connection: Connection, table: str) -> None:
cursor = connection.cursor()
cursor.execute(f'''DELETE FROM {table} WHERE pid=?''', (proto.pid,))
connection.commit()
return None
def init(connection: Connection, table: str):
cursor = connection.cursor()
statement = f'''CREATE TABLE IF NOT EXISTS "{table}" (
"pid" BLOB NOT NULL UNIQUE,
"data" BLOB NOT NULL,
"data_hash" BLOB NOT NULL,
"sub" BLOB UNIQUE,
"expiration" INTEGER NOT NULL,
"encoding" TEXT,
PRIMARY KEY("pid")
);'''
cursor.execute(statement)
connection.commit() connection.commit()
def sanitize(connection: Connection, table: str, model_class: type) -> int: def delete(proto: object, connection: Connection) -> bool:
cursor = connection.cursor() cur = connection.cursor()
statement = f'''SELECT pid FROM {table} cur.execute('''DELETE FROM pastes WHERE pid=?''', (proto.pid,))
WHERE expiration < ? AND expiration > 0'''
cursor.execute(statement, (int(time()),)) connection.commit()
srow_count = 0
def init(connection: Connection):
cur = connection.cursor()
with open_text('httpaste.backend.sqlite', 'paste.sql') as fh:
cur.execute(fh.read())
connection.commit()
def sanitize(connection: Connection, model_class: type) -> bool:
cur = connection.cursor()
cur.execute('''SELECT pid FROM pastes WHERE expiration < ? AND expiration > 0''', (int(time()),))
for row in cur.fetchall(): for row in cur.fetchall():
delete(model_class(row['pid'])) delete(model_class(row['pid']))
srow_count += 1
return srow_count

View file

@ -0,0 +1,9 @@
CREATE TABLE IF NOT EXISTS "pastes" (
"pid" BLOB NOT NULL UNIQUE,
"data" BLOB NOT NULL,
"data_hash" BLOB NOT NULL,
"sub" BLOB UNIQUE,
"expiration" INTEGER NOT NULL,
"encoding" TEXT,
PRIMARY KEY("pid")
);

View file

@ -6,73 +6,56 @@ from httpaste.model import User
from importlib.resources import open_text from importlib.resources import open_text
def load(proto: object, connection: Connection, table: str, model_class: type): def load(proto: User, connection: Connection):
"""load a user """load a user
""" """
cursor = connection.cursor() cur = connection.cursor()
statement = f'''SELECT sub, key_hash, paste_index cur.execute(
FROM {table} 'SELECT sub, key_hash, paste_index FROM users WHERE sub=?', (proto.sub,))
WHERE sub=?'''
cursor.execute(statement, (proto.sub,)) result = cur.fetchone()
row = cursor.fetchone() if result:
if row is not None: return User(result['sub'], result['key_hash'], result['paste_index'])
return model_class(row['sub'], row['key_hash'], row['paste_index'])
return None return None
def dump(model: object, connection: Connection, table: str) -> None: def dump(model: User, connection: Connection):
"""dump a user """dump a user
""" """
cursor = connection.cursor() cur = connection.cursor()
statement = f'''INSERT OR REPLACE INTO {table} cur.execute('''INSERT OR REPLACE INTO users (sub, key_hash, paste_index)
(sub, key_hash, paste_index) VALUES (?,?,?)''', (model.sub, model.key_hash, model.index))
VALUES (?,?,?)'''
cursor.execute(statement, (model.sub, model.key_hash, model.index))
connection.commit() connection.commit()
return None
def delete(proto: object, connection: Connection) -> bool:
def delete(proto: object, connection: Connection, table: str) -> None: cur = connection.cursor()
cursor = connection.cursor() cur.execute('''DELETE FROM users WHERE sub=?''', (proto.sub,))
cursor.execute(f'''DELETE FROM {table} WHERE sub=?''', (proto.sub,))
connection.commit() connection.commit()
return None
def init(connection: Connection):
def init(connection: Connection, table: str) -> None: cur = connection.cursor()
cursor = connection.cursor() with open_text('httpaste.backend.sqlite', 'user.sql') as fh:
statement = f'''CREATE TABLE IF NOT EXISTS "{table}" ( cur.execute(fh.read())
"sub" BLOB NOT NULL UNIQUE,
"key_hash" BLOB NOT NULL,
"paste_index" BLOB,
PRIMARY KEY("sub")
);'''
cursor.execute(statement)
connection.commit() connection.commit()
def sanitize(connection: Connection, model_class) -> bool:
return None return None
def sanitize(connection: Connection, table: str, model_class) -> int:
return 0

View file

@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS "users" (
"sub" BLOB NOT NULL UNIQUE,
"key_hash" BLOB NOT NULL,
"paste_index" BLOB,
PRIMARY KEY("sub")
);

View file

@ -2,10 +2,10 @@
"""httpaste CGI entrypoint """httpaste CGI entrypoint
""" """
from wsgiref.handlers import CGIHandler from wsgiref.handlers import CGIHandler
from httpaste import load_config, get_flask_app from httpaste import load_config, get_flask_app, get_config_path
config = load_config() config, server_config = load_config(get_config_path())
application = get_flask_app(config) application = get_flask_app(config, server_config)
CGIHandler().run(application) CGIHandler().run(application)

View file

@ -1,18 +0,0 @@
from typing import NamedTuple
from string import ascii_uppercase, digits, ascii_letters, punctuation
from configparser import ConfigParser
from httpaste.helper.common import generate_random_string
from httpaste.helper.config import get_sanitized_config_charset, get_config
class Config(NamedTuple):
"""httpaste global config
"""
salt: bytes = get_sanitized_config_charset(generate_random_string(
32, ascii_letters + digits + punctuation)).encode('utf-8')
hmac_iter: int = 20000
def get_context_config(configIni: ConfigParser) -> Config:
return get_config(configIni, 'context', Config)

View file

@ -5,14 +5,12 @@ import httpaste
def get(**kwargs): def get(**kwargs):
config = current_app.httpaste config = current_app.httpaste
context = config.context
model = config.model
return httpaste.__doc__.format( return httpaste.__doc__.format(
url=connexion.request.url, url=connexion.request.url,
hmac_iterations=context.hmac_iter, hmac_iterations=config.hmac_iterations,
paste_lifetime=model.paste.default_lifetime, paste_lifetime=config.paste_lifetime,
paste_max_lifetime=str(round(model.paste.default_max_lifetime / 60)), paste_max_lifetime=str(round(config.paste_max_lifetime / 60)),
paste_default_encoding=model.paste.default_encoding paste_default_encoding=config.paste_default_encoding
), 200 ), 200

View file

@ -5,10 +5,8 @@ from flask import current_app
from httpaste.helper.common import decode, DecodeError, join_url from httpaste.helper.common import decode, DecodeError, join_url
import httpaste.model.paste as paste_model import httpaste.model.paste as paste_model
import httpaste.model.user as user_model import httpaste.model.user as user_model
from httpaste.backend import load_backend
from httpaste.helper.http import BadRequestError, GoneError, NotFoundError from httpaste.helper.http import BadRequestError, GoneError, NotFoundError
from httpaste.helper.syntax import highlight from httpaste.model import (
from httpaste.schema import (
PasteKey, PasteKey,
PasteData, PasteData,
PasteLifetime, PasteLifetime,
@ -17,13 +15,6 @@ from httpaste.schema import (
Sub) Sub)
class Config:
default_mime_type: str = 'text/plain'
default_linenos: bool = False
default_syntax: bool = False
default_formatter: str = 'terminal256'
def delete(**kwargs): def delete(**kwargs):
""" """
""" """
@ -54,15 +45,12 @@ def get(**kwargs):
""" """
config = current_app.httpaste config = current_app.httpaste
backend = load_backend(config.backend)
context = config.context
syntax = kwargs.get('syntax')
formatter = kwargs.get('format', Config.default_formatter)
linenos = kwargs.get('linenos', Config.default_linenos)
mime = kwargs.get('mime', Config.default_mime_type)
pid = PasteKey(kwargs['id'].encode('utf-8')) pid = PasteKey(kwargs['id'].encode('utf-8'))
syntax = kwargs.get('syntax')
formatter = kwargs.get('format', 'terminal256')
linenos = kwargs.get('linenos', False)
mime = kwargs.get('mime', 'text/plain')
if kwargs.get('user') is not None: if kwargs.get('user') is not None:
# authenticated # authenticated
@ -70,23 +58,26 @@ def get(**kwargs):
key = MasterKey(kwargs['token_info'].get('master_key')) key = MasterKey(kwargs['token_info'].get('master_key'))
sub = Sub(kwargs['token_info'].get('sub')) sub = Sub(kwargs['token_info'].get('sub'))
pkey = user_model.load_paste_key(pid, sub, key, backend.user, context) pkey = user_model.load_paste_key(pid, sub, key, config.backend.user,
config.salt, config.hmac_iterations)
def call(): return paste_model.get_safe(pid, pkey, sub, def call(): return paste_model.get_safe(pid, pkey, sub,
config.model.paste, config.backend.paste,
backend.paste, context) config.salt, config.hmac_iterations)
else: else:
# unauthenticated # unauthenticated
def call(): return paste_model.get(pid, backend.paste, context) def call(): return paste_model.get(pid, config.backend.paste,
config.salt, config.hmac_iterations)
try: try:
data, expiration, encoding = call() data, expiration, encoding = call()
except paste_model.LifetimeError as e: except paste_model.LifetimeError as e:
if kwargs.get('user') is not None: if kwargs.get('user') is not None:
paste_model.remove_safe(pid, sub, pkey, backend.paste, context) paste_model.remove_safe(pid, sub, pkey, config.backend.paste,
config.salt, config.hmac_iterations)
else: else:
paste_model.remove(pid, backend.paste) paste_model.remove(pid, config.backend.paste)
raise GoneError(str(e)) from e raise GoneError(str(e)) from e
except paste_model.NotFoundError as e: except paste_model.NotFoundError as e:
raise NotFoundError(str(e)) raise NotFoundError(str(e))
@ -96,17 +87,16 @@ def get(**kwargs):
# burn after read # burn after read
if expiration < 0: if expiration < 0:
if kwargs.get('user') is not None: if kwargs.get('user') is not None:
paste_model.remove_safe(pid, sub, pkey, backend.paste, context) paste_model.remove_safe(pid, sub, pkey, config.backend.paste,
config.salt, config.hmac_iterations)
else: else:
paste_model.remove(pid, backend.paste) paste_model.remove(pid, config.backend.paste)
if encoding is not None:
data = data.decode(encoding)
if syntax is not None: if syntax is not None:
data = highlight(data, str(syntax), formatter, linenos) data = highlight(data, str(syntax), formatter, linenos)
if encoding is not None:
data = data.decode(encoding)
return ConnexionResponse( return ConnexionResponse(
status_code=200, status_code=200,
@ -120,14 +110,12 @@ def post(**kwargs):
""" """
config = current_app.httpaste config = current_app.httpaste
backend = load_backend(config.backend)
context = config.context
if kwargs['body'].get('data') is None: if kwargs['body'].get('data') is None:
raise BadRequestError('form field \'data\' missing.') raise BadRequestError('form field \'data\' missing.')
encoding = PasteEncoding(kwargs.get('encoding', 'utf-8')) encoding = PasteEncoding(kwargs.get('encoding', 'utf-8'))
lifetime = PasteLifetime(kwargs.get('lifetime', config.model.paste.default_lifetime)) lifetime = PasteLifetime(kwargs.get('lifetime', config.paste_lifetime))
if encoding not in ['utf-8', 'utf-16', 'ascii']: if encoding not in ['utf-8', 'utf-16', 'ascii']:
try: try:
@ -147,15 +135,15 @@ def post(**kwargs):
sub = Sub(kwargs['token_info'].get('sub')) sub = Sub(kwargs['token_info'].get('sub'))
pid, pkey = paste_model.create_safe(pdata, lifetime, sub, encoding, pid, pkey = paste_model.create_safe(pdata, lifetime, sub, encoding,
config.model.paste, backend.paste, config.backend.paste, config.salt, config.hmac_iterations)
context)
user_model.dump_paste_key(pid, pkey, sub, key, backend.user, context) user_model.dump_paste_key(pid, pkey, sub, key, config.backend.user,
config.salt, config.hmac_iterations)
else: else:
# unauthenticated # unauthenticated
pid = paste_model.create(pdata, lifetime, encoding, config.model.paste, pid = paste_model.create(pdata, lifetime, encoding, config.backend.paste,
backend.paste, context) config.salt, config.hmac_iterations)
base_url = join_url(request.root_url, request.path) base_url = join_url(request.root_url, request.path)

View file

@ -5,6 +5,8 @@ def search(**kwargs):
""" """
""" """
print(args)
return 'Hallo', 200 return 'Hallo', 200

View file

@ -4,22 +4,20 @@ from flask import current_app
from httpaste.helper.http import ForbiddenError from httpaste.helper.http import ForbiddenError
from httpaste.model.user import authenticate, AuthenticationError from httpaste.model.user import authenticate, AuthenticationError
from httpaste.backend import load_backend
def post(*args, **kwargs): def post(*args, **kwargs):
""" """
""" """
config = current_app.httpaste config = current_app.httpaste
backend = load_backend(config.backend)
context = config.context
user_id = args[0].encode('utf-8') user_id = args[0].encode('utf-8')
password = args[1].encode('utf-8') password = args[1].encode('utf-8')
try: try:
return authenticate(user_id, password, backend.user, context) return authenticate(user_id, password, config.backend.user, config.salt, config.hmac_iterations)
except AuthenticationError as e: except AuthenticationError as e:
raise ForbiddenError('You shall not pass!') from e raise ForbiddenError('You shall not pass!') from e

View file

@ -2,11 +2,11 @@
"""httpaste FastCGI entrypoint """httpaste FastCGI entrypoint
""" """
from flup.server.fcgi import WSGIServer from flup.server.fcgi import WSGIServer
from httpaste import load_config, get_flask_app from httpaste import load_config, get_flask_app, get_config_path
config = load_config() config, server_config = load_config(get_config_path())
application = get_flask_app(config) application = get_flask_app(config, server_config)
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -1,113 +0,0 @@
import os
from pathlib import Path
from configparser import ConfigParser, NoSectionError
from typing import Optional, NamedTuple
from os import environ
from ast import literal_eval
CONFIGPATH_ENVIRON = 'HTTPASTE_CONFIGPATH'
class ConfigError(Exception):
"""
"""
def get_sanitized_config_charset(charset: str):
for x in ["$", "%"]:
charset = charset.replace(x, f'{x}{x}')
return charset
def typecast(obj: dict, aclass: type, dirname:Optional[Path] = None) -> dict:
"""typecast a dictionary according to class annotations
:param obj: dictionary to typecast
:param aclass: class containing typehint annotations
:param basepath: basepath for filesystem path typecasting
:returns: typecasted dictionary
"""
casted = {}
for k,v in obj.items():
v = v.strip('\'"')
try:
bclass = aclass.__annotations__[k]
except KeyError as e:
raise KeyError(f'{k}: not allowed') from e
if issubclass(bclass, Path) and not str(v[0]).startswith(os.path.sep):
if not isinstance(dirname, Path):
raise TypeError('no dirname for Path type specified.')
casted[k] = casted_val = dirname / v
elif issubclass(bclass, bytes):
casted[k] = bclass(v.encode('utf-8'))
elif issubclass(bclass, bool):
casted[k] = literal_eval(v)
else:
try:
casted_val = bclass(v)
except ValueError as e:
raise ValueError(f'{k}: {e}') from e
else:
casted[k] = casted_val
return casted
def get_config(configIni: ConfigParser, section: str, bclass: type, dirname: Path = None) -> object:
"""get an object-oriented configuration from an INI file
:param configIni: configparser.Configparser object with initialized stream
:param section: name of section to get configuration for
:param bclass: configuration base class
:param dirname: directory name of INI file stream
:returns: initialized configuration instance
"""
try:
raw_config = dict(configIni.items(section))
except NoSectionError as e:
raw_config = {}
try:
casted_config = typecast(raw_config, bclass, dirname)
except KeyError as e:
raise ConfigError(f'[{section}] {e}') from e
except ValueError as e:
raise ConfigError(f'[{section}] {e}') from e
try:
config = bclass(**casted_config)
except TypeError as e:
raise ConfigError(f'[{section}] {e}') from e
return config
def get_configparser(path: Path = None, var_name: str = CONFIGPATH_ENVIRON):
"""
"""
if path is None:
try:
path = environ[var_name]
except KeyError as e:
raise ConfigError(
f'environment variable \'{var_name}\' not set.') from e
configIni = ConfigParser()
configIni.read(path)
return configIni, path

View file

@ -8,7 +8,7 @@ from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.fernet import Fernet, InvalidToken from cryptography.fernet import Fernet, InvalidToken
from httpaste.context import Config from httpaste import Config
DEFAULT_HMAC_ITERATIONS = 20000 DEFAULT_HMAC_ITERATIONS = 20000
@ -38,7 +38,7 @@ def dhash(data: bytes):
return hashlib.sha512(data).digest() return hashlib.sha512(data).digest()
def derive_key(main_key: str, salt: bytes, iterations:int=DEFAULT_HMAC_ITERATIONS) -> bytes: def derive_key(main_key: str, salt: bytes = Config.salt, iterations:int=DEFAULT_HMAC_ITERATIONS) -> bytes:
"""derive a key from a main key """derive a key from a main key
:param main_key: main key to derive from :param main_key: main key to derive from

View file

@ -1,19 +1,144 @@
"""Model """Model
""" """
from typing import NamedTuple from typing import NamedTuple, Optional, Dict, Union, Any, TypedDict
from configparser import ConfigParser
from pathlib import Path
from httpaste.model.paste import Config as PasteConfig
from httpaste.model.paste import get_paste_model_config
class Config(NamedTuple):
"""Model Configuration"""
paste: PasteConfig
def get_model_config(configIni: ConfigParser, path:Path) -> Config: class PasteDataSchema:
"""Paste Interface schema between Model and Backend
"""
pid = bytes
data = bytes
data_hash = bytes
sub = bytes
timestamp = int
lifetime = int
expiration = int
encoding = str
paste_config = get_paste_model_config(configIni)
return Config(paste=paste_config) class UserDataSchema:
"""User Interface Schema between Model and Backend
"""
sub = bytes
key_hash = bytes
index = bytes
class Backend(object):
"""Backend
"""
parameter_class: str
class Salt(bytes):
"""Salt
"""
class PasteData(PasteDataSchema.data):
"""Paste Data
"""
class PasteHash(PasteDataSchema.data_hash):
"""Paste Data Hash
"""
class PasteTimestamp(PasteDataSchema.timestamp):
"""Paste Timestamp
"""
class PasteEncoding(PasteDataSchema.encoding):
"""
"""
class PasteExpiration(PasteDataSchema.expiration):
"""Paste Expiration
< 0: after first acccess
0: never
"""
class PasteLifetime(PasteDataSchema.lifetime):
"""Paste Lifetime
"""
class PasteSub(PasteDataSchema.sub):
"""Hashed user id
"""
class KeyHash(UserDataSchema.key_hash):
"""User Master Key Hash
"""
class PasteKey(bytes):
"""Paste encryption key
"""
class PasteId(PasteDataSchema.pid):
"""Paste unique identifier
"""
class MasterKey(bytes):
"""User's master encryption key
"""
class Sub(UserDataSchema.sub):
"""User id
"""
class Index(TypedDict):
"""User Paste Index
"""
auth_expires: int
pastes: Dict[str, Dict[str, Any]]
class SerializedIndex(UserDataSchema.index):
"""User Paste Index (serialized)
"""
class User(NamedTuple):
"""Global User Model (and Prototype)
non-optional values are prototype values
"""
#: user id
sub: Sub
#: user's master key hash
key_hash: Optional[KeyHash] = None
#: user's paste index
index: Optional[Union[Index, SerializedIndex]] = None
class Paste(NamedTuple):
"""Global Paste Model (and Prototype)
non-optional values are prototype values
"""
#: paste id
pid: PasteId
#: paste owner
sub: Optional[PasteSub] = None
#: paste data
data: Optional[PasteData] = None
#: paste data hash
data_hash: Optional[PasteHash] = None
#: paste timestamp
expiration: Optional[PasteExpiration] = None
#: paste encoding
encoding: Optional[PasteEncoding] = None

View file

@ -2,32 +2,17 @@
"""paste model interface """paste model interface
""" """
import json import json
from typing import Optional, Tuple, NamedTuple from typing import Optional, Tuple
import time import time
from configparser import ConfigParser
from string import ascii_uppercase, digits, ascii_letters, punctuation
from httpaste import Config
from httpaste.context import Config as ContextConfig
from httpaste.helper.crypto import dhash, shash, encrypt, decrypt from httpaste.helper.crypto import dhash, shash, encrypt, decrypt
from httpaste.helper.config import get_sanitized_config_charset, get_config
from httpaste.helper.common import generate_random_string from httpaste.helper.common import generate_random_string
from httpaste.schema import (Paste, PasteId, Sub, MasterKey, PasteKey, Salt, from httpaste.model import (Paste, PasteId, Sub, MasterKey, PasteKey, Salt,
PasteData, PasteHash, PasteTimestamp, PasteSub, PasteData, PasteHash, PasteTimestamp, PasteSub,
PasteLifetime, PasteEncoding, PasteExpiration) PasteLifetime, PasteEncoding, PasteExpiration)
class Config(NamedTuple):
id_size: int = 8
id_charset: str = ascii_letters + digits
key_size: int = 32
key_charset: str = get_sanitized_config_charset(ascii_letters + digits + punctuation)
default_lifetime: int = 5
default_max_lifetime: int = 1440
default_min_lifetime: int = 1
default_encoding: str = 'utf-8'
class NotFoundError(Exception): class NotFoundError(Exception):
"""Paste Exception """Paste Exception
""" """
@ -53,7 +38,9 @@ class BackendError(Exception):
""" """
def generate_paste_id(length: int, charset: str) -> bytes: def generate_paste_id(
length: int = Config.paste_id_size,
charset: str = Config.paste_id_charset) -> bytes:
"""generate a paste id """generate a paste id
:param length: length of id :param length: length of id
@ -63,7 +50,9 @@ def generate_paste_id(length: int, charset: str) -> bytes:
return generate_random_string(length, charset).encode('utf-8') return generate_random_string(length, charset).encode('utf-8')
def generate_paste_key(length: int, charset: str) -> bytes: def generate_paste_key(
length: int = Config.paste_key_size,
charset: str = Config.paste_key_charset) -> bytes:
"""generate a paste encryption key """generate a paste encryption key
:param length: length of key :param length: length of key
@ -109,7 +98,8 @@ def load_safe(
proto: Paste, proto: Paste,
key: PasteKey, key: PasteKey,
backend: object, backend: object,
context: ContextConfig): salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations):
"""load an encrypted paste model """load an encrypted paste model
:param proto: paste model prototype :param proto: paste model prototype
@ -119,7 +109,7 @@ def load_safe(
model = load(proto, backend) model = load(proto, backend)
data = decrypt(model.data, key, context.salt, context.hmac_iter) data = decrypt(model.data, key, salt, hmac_iter)
if model.data_hash and dhash(data) != model.data_hash: if model.data_hash and dhash(data) != model.data_hash:
@ -141,7 +131,10 @@ def dump(model: Paste, backend: object) -> None:
:param backend: model backend object :param backend: model backend object
""" """
try:
backend.dump(model) backend.dump(model)
except Exception as e:
raise BackendError(str(e)) from e
def delete(proto: Paste, backend: object) -> None: def delete(proto: Paste, backend: object) -> None:
@ -165,12 +158,13 @@ def delete_safe(
proto: Paste, proto: Paste,
key: PasteKey, key: PasteKey,
backend: object, backend: object,
context: ContextConfig) -> None: salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> None:
""" """
""" """
try: try:
model = load_safe(proto, key, backend, context) model = load_safe(proto, key, backend, salt, hmac_iter)
except LifetimeError: except LifetimeError:
pass pass
@ -183,9 +177,9 @@ def create(
data: PasteData, data: PasteData,
lifetime: PasteLifetime, lifetime: PasteLifetime,
encoding: PasteEncoding, encoding: PasteEncoding,
config: Config,
backend: object, backend: object,
context: ContextConfig) -> PasteId: salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> PasteId:
"""create an unencrypted paste """create an unencrypted paste
:param data: paste data :param data: paste data
@ -193,20 +187,18 @@ def create(
:param backend: model backend object :param backend: model backend object
""" """
pid = PasteId(generate_paste_id(config.id_size, config.id_charset)) pid = PasteId(generate_paste_id())
safe_pid = PasteId(dhash(pid)) safe_pid = PasteId(dhash(pid))
data_hash = PasteHash(dhash(data)) data_hash = PasteHash(dhash(data))
sub = None sub = None
timestamp = PasteTimestamp(int(time.time())) timestamp = PasteTimestamp(int(time.time()))
if lifetime is None: if lifetime < 0:
lifetime = config.default_lifetime
elif lifetime < 0:
expiration = -1 expiration = -1
else: else:
expiration = PasteExpiration(timestamp + (lifetime * 60)) expiration = PasteExpiration(timestamp + (lifetime * 60))
safe_data = PasteData(encrypt(data, pid, context.salt, context.hmac_iter)) safe_data = PasteData(encrypt(data, pid, salt, hmac_iter))
model = Paste( model = Paste(
safe_pid, safe_pid,
@ -225,9 +217,9 @@ def create_safe(data: PasteData,
lifetime: PasteLifetime, lifetime: PasteLifetime,
sub: Sub, sub: Sub,
encoding: PasteEncoding, encoding: PasteEncoding,
config: Config,
backend: object, backend: object,
context: ContextConfig) -> Tuple[PasteId,PasteKey]: salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> Tuple[PasteId,PasteKey]:
"""create an encrypted paste """create an encrypted paste
:param data: paste data :param data: paste data
@ -237,21 +229,19 @@ def create_safe(data: PasteData,
:param salt: randomization salt :param salt: randomization salt
""" """
pid = PasteId(generate_paste_id(config.id_size, config.id_charset)) pid = PasteId(generate_paste_id())
safe_pid = PasteId(dhash(pid)) safe_pid = PasteId(dhash(pid))
pkey = PasteKey(generate_paste_key(config.key_size, config.key_charset)) pkey = PasteKey(generate_paste_key())
data_hash = PasteHash(dhash(data)) data_hash = PasteHash(dhash(data))
safe_sub = PasteSub(shash(sub, data_hash, pid)) safe_sub = PasteSub(shash(sub, data_hash, pid))
timestamp = PasteTimestamp(int(time.time())) timestamp = PasteTimestamp(int(time.time()))
if lifetime is None: if lifetime < 0:
lifetime = config.default_lifetime
elif lifetime < 0:
expiration = -1 expiration = -1
else: else:
expiration = PasteExpiration(timestamp + (lifetime * 60)) expiration = PasteExpiration(timestamp + (lifetime * 60))
safe_data = PasteData(encrypt(data, pkey, context.salt, context.hmac_iter)) safe_data = PasteData(encrypt(data, pkey, salt, hmac_iter))
dump(Paste( dump(Paste(
safe_pid, safe_pid,
@ -279,20 +269,21 @@ def remove_safe(
sub: Sub, sub: Sub,
key: PasteKey, key: PasteKey,
backend: object, backend: object,
context: ContextConfig): salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations):
proto = Paste(pid, sub) proto = Paste(pid, sub)
delete_safe(proto, key, backend, context) delete_safe(proto, key, backend, salt, hmac_iter)
def get(pid: PasteId, backend: object, context: ContextConfig) -> PasteData: def get(pid: PasteId, backend: object, salt: Salt = Config.salt, hmac_iter: int = Config.hmac_iterations) -> PasteData:
"""conveniently load an unencrypted paste """conveniently load an unencrypted paste
""" """
model = load(Paste(pid), backend) model = load(Paste(pid), backend)
data = decrypt(model.data, pid, context.salt, context.hmac_iter) data = decrypt(model.data, pid, salt, hmac_iter)
return PasteData(data), model.expiration, model.encoding return PasteData(data), model.expiration, model.encoding
@ -301,22 +292,12 @@ def get_safe(
pid: PasteId, pid: PasteId,
pkey: PasteKey, pkey: PasteKey,
sub: Sub, sub: Sub,
config: Config,
backend: object, backend: object,
context: ContextConfig) -> PasteData: salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> PasteData:
"""conveniently load an encrypted paste """conveniently load an encrypted paste
""" """
model = load_safe(Paste(pid, sub), pkey, backend, context) model = load_safe(Paste(pid, sub), pkey, backend, salt, hmac_iter)
return PasteData(model.data), model.expiration, model.encoding return PasteData(model.data), model.expiration, model.encoding
def get_paste_model_config(configIni: ConfigParser) -> Config:
return get_config(configIni, 'model.paste', Config)
__all__ = [
get_paste_model_config
]

View file

@ -5,7 +5,7 @@ import json
from time import time from time import time
from typing import Optional from typing import Optional
from httpaste.context import Config as ContextConfig from httpaste import Config
from httpaste.helper.crypto import ( from httpaste.helper.crypto import (
dhash, dhash,
shash, shash,
@ -13,7 +13,7 @@ from httpaste.helper.crypto import (
decrypt, decrypt,
derive_key, derive_key,
DecryptionError) DecryptionError)
from httpaste.schema import ( from httpaste.model import (
User, User,
KeyHash, KeyHash,
Index, Index,
@ -39,7 +39,8 @@ def _load(
proto: User, proto: User,
master_key: str, master_key: str,
backend: object, backend: object,
context: ContextConfig) -> Optional[User]: salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> Optional[User]:
"""load user model """load user model
:param model: user model prototype :param model: user model prototype
@ -54,7 +55,7 @@ def _load(
return None return None
try: try:
serialized_data = decrypt(model.index, master_key, context.salt, context.hmac_iter) serialized_data = decrypt(model.index, master_key, salt, hmac_iter)
except DecryptionError as e: except DecryptionError as e:
raise IndexError('unable to decrypt user index') from e raise IndexError('unable to decrypt user index') from e
else: else:
@ -70,7 +71,8 @@ def _dump(
model: User, model: User,
key: MasterKey, key: MasterKey,
backend: object, backend: object,
context: ContextConfig) -> None: salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> None:
"""dump a user model """dump a user model
:param model: user model :param model: user model
@ -85,7 +87,7 @@ def _dump(
serialized_index = json.dumps(model.index).encode('utf-8') serialized_index = json.dumps(model.index).encode('utf-8')
safe_index = SerializedIndex(encrypt(serialized_index, key, context.salt, context.hmac_iter)) safe_index = SerializedIndex(encrypt(serialized_index, key, salt, hmac_iter))
backend.dump(User(*model[:-1], safe_index)) backend.dump(User(*model[:-1], safe_index))
@ -94,8 +96,7 @@ def load_paste_key(
pid: PasteId, pid: PasteId,
sub: Sub, sub: Sub,
key: MasterKey, key: MasterKey,
backend: object, backend: object, salt: Salt = Config.salt, hmac_iter: int = Config.hmac_iterations) -> Optional[PasteKey]:
context: ContextConfig) -> Optional[PasteKey]:
"""load a user paste key """load a user paste key
:param pid: paste id :param pid: paste id
@ -105,7 +106,7 @@ def load_paste_key(
:param salt: randomization salt :param salt: randomization salt
""" """
model = _load(User(sub), key, backend, context) model = _load(User(sub), key, backend, salt, hmac_iter)
for k, v in model.index.get('pastes').items(): for k, v in model.index.get('pastes').items():
@ -122,7 +123,8 @@ def dump_paste_key(
sub: Sub, sub: Sub,
key: MasterKey, key: MasterKey,
backend: object, backend: object,
context: ContextConfig) -> None: salt: str = Config.salt,
hmac_iter: int = Config.hmac_iterations) -> None:
"""dump a user paste key """dump a user paste key
:param pid: paste id :param pid: paste id
@ -132,20 +134,21 @@ def dump_paste_key(
:param backend: user model backend :param backend: user model backend
""" """
model = _load(User(sub), key, backend, context) model = _load(User(sub), key, backend, salt, hmac_iter)
model.index.setdefault('pastes', {})[pid.hex()] = { model.index.setdefault('pastes', {})[pid.hex()] = {
'key': pkey.hex() 'key': pkey.hex()
} }
_dump(model, key, backend, context) _dump(model, key, backend, salt, hmac_iter)
def authenticate( def authenticate(
user_id: bytes, user_id: bytes,
password: bytes, password: bytes,
backend: object, backend: object,
context: ContextConfig): salt: Salt = Config.salt,
hmac_iter: int = Config.hmac_iterations):
"""authenticate a user """authenticate a user
:param user_id: human-readable user id :param user_id: human-readable user id
@ -153,7 +156,7 @@ def authenticate(
""" """
sub = Sub(dhash(user_id)) sub = Sub(dhash(user_id))
key = MasterKey(derive_key(password, context.salt, context.hmac_iter)) key = MasterKey(derive_key(password, salt, hmac_iter))
key_hash = KeyHash(dhash(key)) key_hash = KeyHash(dhash(key))
proto = User(sub) proto = User(sub)
@ -161,7 +164,7 @@ def authenticate(
bogus_decline_msg = 'unable to authenticate' bogus_decline_msg = 'unable to authenticate'
try: try:
model = _load(proto, key, backend, context) model = _load(proto, key, backend, salt, hmac_iter)
except IndexError as e: except IndexError as e:
raise AuthenticationError(bogus_decline_msg) from e raise AuthenticationError(bogus_decline_msg) from e
@ -172,7 +175,7 @@ def authenticate(
} }
model = User(sub, key_hash, Index(data)) model = User(sub, key_hash, Index(data))
_dump(model, key, backend, context) _dump(model, key, backend, salt, hmac_iter)
else: else:
if model.key_hash != key_hash: if model.key_hash != key_hash:

View file

@ -1,137 +0,0 @@
from typing import NamedTuple, Optional, Dict, Union, Any, TypedDict
class PasteDataSchema:
"""Paste Interface schema between Model and Backend
"""
pid = bytes
data = bytes
data_hash = bytes
sub = bytes
timestamp = int
lifetime = int
expiration = int
encoding = str
class UserDataSchema:
"""User Interface Schema between Model and Backend
"""
sub = bytes
key_hash = bytes
index = bytes
class Salt(bytes):
"""Salt
"""
class PasteData(PasteDataSchema.data):
"""Paste Data
"""
class PasteHash(PasteDataSchema.data_hash):
"""Paste Data Hash
"""
class PasteTimestamp(PasteDataSchema.timestamp):
"""Paste Timestamp
"""
class PasteEncoding(PasteDataSchema.encoding):
"""
"""
class PasteExpiration(PasteDataSchema.expiration):
"""Paste Expiration
< 0: after first acccess
0: never
"""
class PasteLifetime(PasteDataSchema.lifetime):
"""Paste Lifetime
"""
class PasteSub(PasteDataSchema.sub):
"""Hashed user id
"""
class KeyHash(UserDataSchema.key_hash):
"""User Master Key Hash
"""
class PasteKey(bytes):
"""Paste encryption key
"""
class PasteId(PasteDataSchema.pid):
"""Paste unique identifier
"""
class MasterKey(bytes):
"""User's master encryption key
"""
class Sub(UserDataSchema.sub):
"""User id
"""
class Index(TypedDict):
"""User Paste Index
"""
auth_expires: int
pastes: Dict[str, Dict[str, Any]]
class SerializedIndex(UserDataSchema.index):
"""User Paste Index (serialized)
"""
class User(NamedTuple):
"""Global User Model (and Prototype)
non-optional values are prototype values
"""
#: user id
sub: Sub
#: user's master key hash
key_hash: Optional[KeyHash] = None
#: user's paste index
index: Optional[Union[Index, SerializedIndex]] = None
class Paste(NamedTuple):
"""Global Paste Model (and Prototype)
non-optional values are prototype values
"""
#: paste id
pid: PasteId
#: paste owner
sub: Optional[PasteSub] = None
#: paste data
data: Optional[PasteData] = None
#: paste data hash
data_hash: Optional[PasteHash] = None
#: paste timestamp
expiration: Optional[PasteExpiration] = None
#: paste encoding
encoding: Optional[PasteEncoding] = None

View file

@ -1,17 +0,0 @@
from typing import NamedTuple
from configparser import ConfigParser
from httpaste.helper.config import get_config
class Config(NamedTuple):
"""connexion config
"""
swagger_ui: bool = True
bind_address: str = None
def get_server_config(configIni: ConfigParser) -> Config:
return get_config(configIni, 'server', Config)

View file

@ -1,8 +1,8 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""httpaste WSGI entrypoint """httpaste WSGI entrypoint
""" """
from httpaste import load_config, get_flask_app from httpaste import load_config, get_flask_app, get_config_path
config = load_config() config, server_config = load_config(get_config_path())
application = get_flask_app(config) application = get_flask_app(config, server_config)

View file

@ -1,134 +0,0 @@
#!/usr/bin/env python3
import pytest
from textwrap import dedent
from unittest.mock import mock_open, patch
from configparser import ConfigParser
from pathlib import Path
@pytest.fixture
def module():
from httpaste import backend
return backend
class Test_get_backend_config():
@pytest.fixture(autouse=True)
def setup(self, module):
self.func = module.get_backend_config
def test_default_file(self, module):
data = dedent("""
[backend]
type = file
[backend.file]
base_dirname = 'sample_data'
""")
path = Path('/foo')
configIni = ConfigParser()
with patch('builtins.open', mock_open(read_data=data)):
configIni.read(str(path))
config = self.func(configIni, path)
assert isinstance(config, module.Config)
assert issubclass(config.interface, module.BackendInterface)
assert str(config.config.base_dirname) == '/foo/sample_data'
def test_sqlite(self, module):
data = dedent("""
[backend]
type = sqlite
[backend.sqlite]
uri = 'foobar.db'
""")
configIni = ConfigParser()
with patch('builtins.open', mock_open(read_data=data)):
configIni.read('void')
config = self.func(configIni, Path('/foo'))
assert str(config.config.uri) == '/foo/foobar.db'
def test_mysql(self, module):
data = dedent("""
[backend]
type = mysql
[backend.mysql]
user = 'foo'
password = bar
host = manana
database = test
""")
configIni = ConfigParser()
with patch('builtins.open', mock_open(read_data=data)):
configIni.read('void')
config = self.func(configIni, Path('/foo'))
assert config.config.user == 'foo'
assert config.config.password == 'bar'
assert config.config.host == 'manana'
assert config.config.database == 'test'
#class Test_load():
#
# @pytest.fixture(autouse=True)
# def setup(self, module):
#
# self.func = module.load
#
# def test_missing_parameter(self, module):
#
# config = module.Config()
# config.name = 'file'
# config.parameters = {}
#
# with pytest.raises(module.BackendError):
# self.func(config)
#
# def test_unknown_parameter(self, module):
#
# config = module.Config()
# config.name = 'file'
# config.parameters = {
# 'base_dirname': 'foofoo',
# 'foo': 'bar'
# }
#
# with pytest.raises(module.BackendError):
# self.func(config)
#
# def test_file(self, module):
#
# config = module.Config()
# config.name = 'file'
# config.parameters = {
# 'base_dirname': 'foofoo',
# 'user_dirnamea': 'test'
# }
#
# backend = self.func(config)
#
# assert isinstance(backend, module.BackendInterface)

View file

@ -1,149 +0,0 @@
import pytest
from typing import NamedTuple
from unittest.mock import mock_open, patch
from textwrap import dedent
from pathlib import Path
from configparser import ConfigParser
@pytest.fixture
def module():
from httpaste.helper import config
return config
@pytest.fixture
def mock_aclass():
class Foobar(NamedTuple):
foo: int
bar: str = 'test'
return Foobar
@pytest.fixture
def mock_aclass_special():
class Foobar(NamedTuple):
foobar: Path
return Foobar
class Test_typecast():
@pytest.fixture(autouse=True)
def setup(self, module, mock_aclass):
self.func = module.typecast
self.mock_aclass = mock_aclass
def test_default(self, module):
foobar = {
'foo': '45'
}
result = self.func(foobar, self.mock_aclass)
assert isinstance(result, dict)
assert result['foo'] == 45
assert result.get('bar') is None
def test_type_mismatch(self, module):
foobar = {
'foo': 'foobar'
}
with pytest.raises(ValueError):
self.func(foobar, self.mock_aclass)
def test_unknown_key(self, module):
foobar = {
'foo': '45',
'foobar': 'foobar'
}
with pytest.raises(KeyError):
self.func(foobar, self.mock_aclass)
class Test_get_config():
@pytest.fixture(autouse=True)
def setup(self, module, mock_aclass):
self.func = module.get_config
self.mock_aclass = mock_aclass
def test_default(self):
data = dedent("""
[foobar]
foo = 45
""")
configIni = ConfigParser()
with patch('builtins.open', mock_open(read_data=data)):
configIni.read(str('void'))
result = self.func(configIni, 'foobar', self.mock_aclass)
assert isinstance(result, self.mock_aclass)
assert result.foo == 45
def test_relative_path(self, mock_aclass_special):
data = dedent("""
[foobar]
foobar = 'bar/foo'
""")
dirname = Path('/foo/bar')
configIni = ConfigParser()
with patch('builtins.open', mock_open(read_data=data)):
configIni.read(str('void'))
result = self.func(configIni, 'foobar', mock_aclass_special, dirname)
assert isinstance(result, mock_aclass_special)
assert isinstance(result.foobar, Path)
assert str(result.foobar) == '/foo/bar/bar/foo'
def test_absolute_path(self, mock_aclass_special):
data = dedent("""
[foobar]
foobar = '/bar/foo'
""")
configIni = ConfigParser()
with patch('builtins.open', mock_open(read_data=data)):
configIni.read(str('void'))
result = self.func(configIni, 'foobar', mock_aclass_special)
assert isinstance(result, mock_aclass_special)
assert isinstance(result.foobar, Path)
assert str(result.foobar) == '/bar/foo'

View file

@ -1,79 +0,0 @@
#!/usr/bin/env python3
import pytest
from textwrap import dedent
from unittest.mock import mock_open, patch
from configparser import ConfigParser
from pathlib import Path
@pytest.fixture
def module():
from httpaste.model import paste
return paste
class Test_get_paste_model_config():
@pytest.fixture(autouse=True)
def setup(self, module):
self.func = module.get_paste_model_config
def test_default(self, module):
data = ''
configIni = ConfigParser()
with patch('builtins.open', mock_open(read_data=data)):
configIni.read('void')
result = self.func(configIni)
assert isinstance(result, module._Config)
assert isinstance(result.id_size, int), result.id_size
assert isinstance(result.key_size, int), result.key_size
#class Test_load():
#
# @pytest.fixture(autouse=True)
# def setup(self, module):
#
# self.func = module.load
#
# def test_missing_parameter(self, module):
#
# config = module.Config()
# config.name = 'file'
# config.parameters = {}
#
# with pytest.raises(module.BackendError):
# self.func(config)
#
# def test_unknown_parameter(self, module):
#
# config = module.Config()
# config.name = 'file'
# config.parameters = {
# 'base_dirname': 'foofoo',
# 'foo': 'bar'
# }
#
# with pytest.raises(module.BackendError):
# self.func(config)
#
# def test_file(self, module):
#
# config = module.Config()
# config.name = 'file'
# config.parameters = {
# 'base_dirname': 'foofoo',
# 'user_dirnamea': 'test'
# }
#
# backend = self.func(config)
#
# assert isinstance(backend, module.BackendInterface)

View file

@ -35,15 +35,6 @@ deps =
commands = commands =
python3 -m build {posargs} python3 -m build {posargs}
[testenv:build-docker]
description = build docker image
passenv =
DOCKER_*
allowlist_externals =
docker
sh
commands =
docker image build -t victorykit/httpaste .
[testenv:docs] [testenv:docs]
description = build documentation description = build documentation