Compare commits

74 commits

Author SHA1 Message Date
Tiara Rodney
9221fdcfe2
chore: cleanup 2026-03-27 19:00:09 +01:00
Tiara Rodney
d8d32e1662
docs: add development guidelines 2026-03-21 18:35:12 +01:00
Tiara Rodney
3ee3f15326
chore: update build 2026-03-21 17:13:02 +01:00
Tiara Rodney
bd3d0814c9
chore: reapply editable 2026-03-16 00:24:35 +01:00
Tiara Rodney
4cd79cc6a4
chore: reapply editable 2026-03-16 00:21:30 +01:00
Tiara Rodney
4cdf357022
dirty 2026-03-16 00:09:57 +01:00
Tiara Rodney
c4fb29f694
feat(git): submodule and remote handling 2026-03-04 18:10:18 +01:00
Tiara Rodney
5bf4a7eee4
migrate sphinxcontrib.h5p.utils 2026-03-04 13:11:07 +01:00
Tiara Rodney
cc4b567181
chore: ignore tox working directory 2025-12-31 14:40:39 +01:00
Tiara Rodney
22aeecd630
todo(17): in-progress 2025-12-31 14:28:27 +01:00
Tiara Rodney
90567899e0
todo(17): open 2025-12-31 14:27:59 +01:00
Tiara Rodney
0a23243b76
todo(16): change priority
project that uses it is currently on hold, hence priority is low
2025-12-31 14:24:38 +01:00
Tiara Rodney
b257fd3d88
todo(16): in-progress 2025-12-31 14:24:09 +01:00
Tiara Rodney
6b29a8d525
todo(16): open 2025-12-31 14:22:38 +01:00
Tiara Rodney
c96568f42f
Merge branch 'bugfix/15' into dev
ID: 15
Type: bugfix
Title: modularize module containers
Status: open
Priority: high
Created: 2025-06-28
Description: Even though importlib can find submodules through traversing paths
             instead of relying on __init__.py for every ancestor module, this
             is not supported by some modules like sphinx.ext.autosummary
2025-06-28 01:42:33 +02:00
Tiara Rodney
74dac5249b
todo(15): done 2025-06-28 01:42:13 +02:00
Tiara Rodney
89f7420fae
refactor: modularize submodules 2025-06-28 01:41:32 +02:00
Tiara Rodney
9d09a4abaa
todo(15): in-progress 2025-06-28 01:40:17 +02:00
Tiara Rodney
b9e9e13630
todo(15): open 2025-06-28 01:39:37 +02:00
Tiara Rodney
32ae99c5fa
Merge branch 'feature/14' into dev
ID: 14
Type: feature
Title: add compression support for urllib PkgHandler
Status: done
Priority: high
Created: 2025-06-21
Description: with a proper content-type of the PkgHandler addinfourl object, a
             consumer can determine whether the file is compressed or not.
2025-06-21 20:27:58 +02:00
Tiara Rodney
7e18e4795d
todo(14): done 2025-06-21 20:27:09 +02:00
Tiara Rodney
2d9fe8b625
feat(urllib.request): add compression support for PkgHandler 2025-06-21 20:26:36 +02:00
Tiara Rodney
981985e51a
todo(14): in-progress 2025-06-21 20:26:15 +02:00
Tiara Rodney
b6a24de08c
Merge branch 'bugfix/13' into dev
ID: 13
Type: bugfix
Title: fix unit tests for urllib PkgHandler
Status: done
Priority: high
Created: 2025-06-21
Description: change of issue 12 wasn't properly reflected in urllib PkgHandler
             unit tests
2025-06-21 20:25:43 +02:00
Tiara Rodney
5beb699dda
todo(13): done 2025-06-21 20:25:28 +02:00
Tiara Rodney
f9897a9aa5
fix(tests): adapt name of new package mock fixture 2025-06-21 20:25:11 +02:00
Tiara Rodney
69d498a203
todo(13): in-progress 2025-06-21 20:24:51 +02:00
Tiara Rodney
3b5097a99c
todo(14): open 2025-06-21 20:23:41 +02:00
Tiara Rodney
81dc18c506
todo(13): open 2025-06-21 20:22:19 +02:00
Tiara Rodney
aad29bc76a
Merge branch 'feature/12' into dev
ID: 12
Type: feature
Title: simplify testing.fixtures.mock_pkg
Status: done
Priority: high
Created: 2025-06-21
Description: Only bootstrap a package mock with the minimum requirements for a
             Python module and let the consumer handle the directory layout.
2025-06-21 18:16:34 +02:00
Tiara Rodney
16c12b5576
todo(12): done 2025-06-21 18:16:18 +02:00
Tiara Rodney
aa2540cf3f
refactor(testing.pytest): change mock_pkg interface
being more explicit about what the fixture provides as an output, instead of
solely describing the side effects.

Also the consumer is now responsible for the module directory layout.
2025-06-21 18:14:52 +02:00
Tiara Rodney
e6bf657919
todo(12): in-progress 2025-06-21 18:14:22 +02:00
Tiara Rodney
6b6fb02f41
todo(12): open 2025-06-21 18:13:59 +02:00
Tiara Rodney
b9bea79c60
Merge branch 'bugfix/11' into dev
ID: 11
Type: bugfix
Title: move testing utils out of utils
Status: in-progress
Priority: high
Created: 2025-06-20
Description: to shorten the namespace and also indicate that testing utilities
             are different from regular utilities
2025-06-21 01:43:07 +02:00
Tiara Rodney
53dc780a04
todo(11): done 2025-06-21 01:42:16 +02:00
Tiara Rodney
9abfabde00
chore: move testing utils out of utils 2025-06-21 01:41:47 +02:00
Tiara Rodney
55ec6323bb
todo(11): in-progress 2025-06-21 01:34:30 +02:00
Tiara Rodney
0eee6c5771
todo(11): open 2025-06-21 01:34:14 +02:00
Tiara Rodney
d0dfa1cb12
Merge branch 'feature/6' into dev
ID: 6
Type: feature
Title: implement importlib.resources handler for urllib
Status: done
Priority: high
Created: 2025-06-20
Description: A handler that can be registered with an urllib.request
             OpenerDirector to open importlib.resources package files.
2025-06-21 00:39:28 +02:00
Tiara Rodney
a4fa083980
todo(6): done 2025-06-21 00:39:13 +02:00
Tiara Rodney
3795ff3e38
chore: add integration test to default tests 2025-06-21 00:38:20 +02:00
Tiara Rodney
59713aefb8
feat(urllib.request): add importlib resource handler 2025-06-21 00:34:29 +02:00
Tiara Rodney
24806959bb
feat(testing.pytest): add pkg mock fixture 2025-06-21 00:32:07 +02:00
Tiara Rodney
43cdf21d4b
feat(testing.pytest): add subprocess decorator 2025-06-20 23:36:56 +02:00
Tiara Rodney
1ea3b3a24d
feat(urllib): init PkgHandler 2025-06-20 21:49:28 +02:00
Tiara Rodney
59dcc2c7c0
Merge branch 'feature/10' into dev
ID: 10
Type: feature
Title: pytest current test context fixtures
Status: done
Priority: high
Created: 2025-06-20
Description: add fixtures for doing things in relation to the active testing
             context
2025-06-20 21:48:41 +02:00
Tiara Rodney
03561be791
todo(10): done 2025-06-20 21:48:21 +02:00
Tiara Rodney
644beb8696
feat(testing): init pytest fixtures
the current_test fixture retrieves the current test context, which pytest
exposes through the shell environment
2025-06-20 21:47:17 +02:00
Tiara Rodney
c579ddd022
todo(10): in-progress 2025-06-20 21:00:53 +02:00
Tiara Rodney
44e35846a5
todo(10): open 2025-06-20 21:00:31 +02:00
Tiara Rodney
18d4958658
Merge branch 'bugfix/9' into dev
ID: 9
Type: bugfix
Title: fix LICENSE reference
Status: done
Priority: high
Created: 2025-06-20
Description: license specification is no longer a trove classifier in
             pyproject.toml, hence the reference to LICENSE must be changed
2025-06-20 20:40:08 +02:00
Tiara Rodney
324df0e6d2
todo(9): done 2025-06-20 20:39:53 +02:00
Tiara Rodney
d799c62c78
chore: change license reference 2025-06-20 20:39:34 +02:00
Tiara Rodney
7e8082bae2
feat(license): add unlicense license
Don't know what to license this under yet
2025-06-20 20:39:19 +02:00
Tiara Rodney
e0e99480e3
todo(9): in-progress 2025-06-20 20:38:14 +02:00
Tiara Rodney
576aad9d4c
todo(9): open 2025-06-20 20:37:49 +02:00
Tiara Rodney
e14886dc3f
Merge branch 'bugfix/8' into dev
ID: 8
Type: bugfix
Title: rename package
Status: done
Priority: high
Created: 2025-06-20
Description: use dot namespaces to make the package a little more elegant
2025-06-20 20:35:29 +02:00
Tiara Rodney
fb0c65c6af
todo(8): done 2025-06-20 20:35:11 +02:00
Tiara Rodney
1fb1e0d0bf
chore: rename package 2025-06-20 20:33:37 +02:00
Tiara Rodney
dd57ecabb9
todo(8): in-progress 2025-06-20 20:25:51 +02:00
Tiara Rodney
dfc28e2240
Merge branch 'feature/7' into dev
ID: 7
Type: feature
Title: setup advanced testing environment
Status: done
Priority: high
Created: 2025-06-20
Description: copy the testing environment setup from
             byteb4rb1e.sphinxcontrib.ext
2025-06-20 20:21:30 +02:00
Tiara Rodney
f0f36542f4
todo(7): done 2025-06-20 20:21:06 +02:00
Tiara Rodney
6bf67f4a88
chore(test): update Makefile 2025-06-20 20:19:32 +02:00
Tiara Rodney
b6a99d4b2d
chore(test): ignore test-reports 2025-06-20 20:19:32 +02:00
Tiara Rodney
dc69eea88a
chore(test): remove redundant module entrypoints
Unlike the built-in unittest, pytest does not discover test suites based on a
directory being marked as a module. Instead it matches against the basename of a
file to determine whether it is a test suite or not.
2025-06-20 20:19:32 +02:00
Tiara Rodney
ab626d5c8e
chore(test): remove redundant dependencies
dependencies for test environments are handled by tox and defined in tox.ini
2025-06-20 20:19:31 +02:00
Tiara Rodney
6955b5e330
feat(test): add entrypoints and runtime dependency 2025-06-20 20:19:31 +02:00
Tiara Rodney
b1a469a351
feat(test): init tox config 2025-06-20 20:19:31 +02:00
Tiara Rodney
c0adb4cdfb
todo(8): open 2025-06-20 20:19:05 +02:00
Tiara Rodney
c6c6d806ac
todo(7): in-progress 2025-06-20 19:48:22 +02:00
Tiara Rodney
d4068f464b
todo(7): open 2025-06-20 19:48:05 +02:00
Tiara Rodney
32d6a7a0df
todo(6): in-progress 2025-06-20 19:44:25 +02:00
Tiara Rodney
a384efbe05
todo(6): open 2025-06-20 19:43:43 +02:00
45 changed files with 2367 additions and 3249 deletions

2
.gitignore vendored

@ -11,3 +11,5 @@
/configure~
*.swo
*.swp
/test-reports/
/.tox/

122
DEVELOPMENT.md Normal file

@ -0,0 +1,122 @@
# Development
> All changes MUST follow the spec in vendor/tiara-gitflow-spec.git, and work
> MUST NOT be started without a TODO issue.
## Prerequisites
- Python 3.9+
- [Pipenv](https://pipenv.pypa.io/)
- [tox](https://tox.wiki/) (installed via Pipenv dev dependencies)
- Node.js (for the `@byteb4rb1e/mime-todo` issue tracker CLI)
## Setup
Initialize Git submodules:
```bash
git submodule update --init --remote --recursive
```
Install dependencies (includes the package in editable mode):
```bash
pipenv install --dev
```
## Tooling
### Package
The project is packaged as `byteb4rb1e.utils` under a namespace package
layout (`src/byteb4rb1e/utils/`). It is installed in editable mode via
Pipenv.
Build a distribution:
```bash
pipenv run dist
```
### Testing
Tests are managed by tox. Test environments are defined in `tox.ini`:
```bash
# run all test suites
tox
# run specific environments
tox -e unit-py313
tox -e lint
tox -e format
```
| Environment | Purpose |
|---|---|
| `unit-py3{9-13}` | Unit tests |
| `smoke-py3{9-13}` | Smoke tests |
| `integration-py3{9-13}` | Integration tests |
| `lint` | Type checking (mypy) |
| `format` | Code style (autopep8) |
| `audit` | Dependency audit (pip-audit) |
### Issue tracker
Issues are tracked in the `TODO` file using the
[MIME TODO](https://specs.code.tiararodney.com/mime-todo/) format. Use the
`@byteb4rb1e/mime-todo` CLI to interact with it:
```bash
# list issues
npx @byteb4rb1e/mime-todo list
# show a specific issue
npx @byteb4rb1e/mime-todo show 3
# create an issue
npx @byteb4rb1e/mime-todo create --type feature --title "Title" --plan "Description" --module homeostat
```
See [CONTRIBUTING.md](CONTRIBUTING.md) for the full issue lifecycle.
### Publishing
Build wheel and source distributions:
```sh
pipenv run dist
```
Configure publishing options:
`~/.pypirc`
```
[distutils]
index-servers =
    tiararodney
[tiararodney]
repository: https://pypi.code.tiararodney.com/root/byteb4rb1e/
username: <username>
password: <password>
```
Publish to pypi.code.tiararodney.com:
```sh
pipenv run dist:publish:tiararodney
```
## Project layout
```
src/byteb4rb1e/utils/ # package source
tests/ # test suites (unit/, smoke/, integration/)
vendor/ # vendored specs
dist/ # sdist and wheel build output
DEVELOPMENT.md # this file
TODO # issue tracker (MIME TODO format)
```

View file

@ -1,24 +0,0 @@
.PHONY: chore configure
chore: configure Pipfile.lock requirements-dev.txt
Pipfile.lock: .venv Pipfile
.venv/bin/pipenv lock
requirements-dev.txt: .venv Pipfile.lock
.venv/bin/pipenv requirements --dev-only > requirements-dev.txt
configure: configure.ac
autoconf
.venv: requirements-dev.txt
python3 -m venv .venv
.venv/bin/python3 -m pip install --upgrade pip
.venv/bin/pip install -r requirements-dev.txt
test-reports:
.venv/bin/python3 -m unittest discover -v
build: .venv/bin/pipenv
.venv/bin/pipenv run build

88
NOTES

@ -1,88 +0,0 @@
These are just a couple of brain farts that came up and I'd rather note down.
There's no clear structure.
RFC 1341 Boundary Matching in a Circular Buffer
1. Algorithm Considerations
Knuth-Morris-Pratt (KMP) Limitations:
Useful when patterns have prefix-suffix overlaps for efficient skipping.
If the failure table consists only of zeros, KMP provides no speed advantage
over naive searching.
Boundary pattern is arbitrary, meaning KMP's preprocessing may not be
beneficial.
Alternatives to KMP:
Rabin-Karp rolling hash → Uses fast hash comparisons instead of
character-by-character matching.
Boyer-Moore-Horspool → Precomputes skip distances to avoid redundant
comparisons, works well for longer patterns.
Crochemore-Perrin two-way search → used by str.find(), flexible
but assumes a linear memory layout so not really applicable for my circular
buffer approach
2. Boundary Characteristics
Max length: 70 bytes. Character set: ASCII only. No structure guarantees: The
boundary is client-defined, so I must be able to handle arbitrary sequences.
3. Algorithm Selection
Rolling Hash → Best for arbitrary short-to-medium patterns in a circular buffer.
Boyer-Moore → Ideal if the boundary has distinct character distributions to
optimize skipping.
# Optimized Chunk-Based Rolling Hash Matching
We need to efficiently detect an RFC 1341 multipart boundary inside a circular
buffer, ensuring minimal overhead while avoiding unnecessary comparisons.
Traditional approaches like Knuth-Morris-Pratt (KMP) don't provide an advantage
when the boundary lacks repeated subpatterns. Meanwhile, full rolling hash
matching scans every byte, which can be wasteful.
Thus, we introduce a chunk-wise hash-based skipping strategy, allowing us to
skip large sections of the buffer when an early non-match is detected.
## Core Idea
1. Precompute hashes for evenly sized chunks of the boundary.
2. First, match only the hash of the first chunk → immediately skip unnecessary
   buffer sections if no match.
3. If the first chunk matches, progressively verify subsequent chunks until the
   full boundary is confirmed.
## Benefits Over Full Matching
- Reduces comparisons significantly → eliminates large sections early when
non-matches occur.
- Balances preprocessing cost vs runtime → faster
elimination means fewer wasted cycles.
- Integrates seamlessly into circular buffers → allows skipping intelligently.
### Precompute Chunk Hashes
- Divide the pattern into `N` equal-sized chunks (e.g., 7 chunks of 10 bytes
for a 70-byte boundary).
- Compute a rolling hash for each chunk in addition to the full pattern, storing
them for quick lookup.
### Sliding Window Search in the Buffer
- Compute the rolling hash for each window of size chunk_size.
- Compare the first chunk's hash with the buffer window.
- If no match, skip boundary_length - chunk_size bytes.
### Progressive Chunk Verification
- If the first chunk matches, verify the next chunk sequentially.
- Continue matching chunks until the full boundary is confirmed.
- Perform final character-by-character validation to rule out hash collisions.
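
The notes above can be illustrated with a minimal sketch. It is an assumption-laden simplification, not the author's algorithm: it rolls a hash over the first chunk only, verifies byte by byte on a hash hit, and deliberately leaves out the skip heuristic. The function name `find_boundary`, its parameters, and the hash constants are hypothetical.

```python
def find_boundary(ring: bytes, start: int, length: int,
                  boundary: bytes, chunk_size: int = 10) -> int:
    """Return the logical offset of `boundary` in a ring buffer, or -1.

    `start` is the ring index of the oldest byte, `length` the number of
    valid bytes. Only the first chunk of the boundary is hashed.
    """
    base, mod = 257, (1 << 61) - 1  # arbitrary polynomial hash parameters
    n, m = len(ring), len(boundary)
    k = min(chunk_size, m)
    if m == 0 or length < m:
        return -1

    def at(i: int) -> int:
        # translate a logical offset into a wrapped ring index
        return ring[(start + i) % n]

    target = window = 0
    for i in range(k):
        target = (target * base + boundary[i]) % mod
        window = (window * base + at(i)) % mod
    high = pow(base, k - 1, mod)  # weight of the byte leaving the window

    for off in range(length - m + 1):
        # a hash hit is only a candidate; compare bytes to rule out collisions
        if window == target and all(
                at(off + j) == boundary[j] for j in range(m)):
            return off
        if off + k < length:
            window = ((window - at(off) * high) * base + at(off + k)) % mod
    return -1


# write 20 bytes into a 32-byte ring starting at index 20, so the data wraps
ring = bytearray(32)
data = b"xxxx--boundary42yyyy"
for i, byte in enumerate(data):
    ring[(20 + i) % 32] = byte
offset = find_boundary(bytes(ring), 20, len(data), b"--boundary42", 4)
```

Here the boundary straddles the physical end of the buffer, which is the case a linear search such as `bytes.find()` cannot handle without copying.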

20
Pipfile

@ -4,17 +4,25 @@ verify_ssl = true
name = "pypi"
[dev-packages]
mypy = "~=1.15.0"
autopep8 = "~=2.3.2"
setuptools-scm = "~=8.2.0"
pylint = "~=3.3.6"
build = "*"
pipenv = "*"
byteb4rb1e-utils = { editable = true, path = '.'}
tox = "*"
twine = "*"
pypi-attestations = "*"
autopep8 = "*"
[requires]
python_version = "3.11"
python_version = "3"
[scripts]
"build" = "python3 -m build"
"dist" = "python3 -m build"
"dist:attestations" = "python3 -m pypi_attestations sign dist/*"
"dist:publish:tiararodney" = "python3 -m twine upload --sign --repository tiararodney dist/*"
"test" = "tox"
"test:static" = "tox run -m static"
"test:unit" = "tox run -m unit"
"test:integration" = "tox run -m integration"
[packages]
"byteb4rb1e.utils" = {file = ".", editable = true}

950
Pipfile.lock generated

File diff suppressed because it is too large

132
TODO

@ -109,3 +109,135 @@ Description: Implement my custom algorithm for doing rolling hash string search
against a fixed length ring buffer
---
ID: 6
Type: feature
Title: implement importlib.resources handler for urllib
Status: done
Priority: high
Created: 2025-06-20
Description: A handler that can be registered with an urllib.request
OpenerDirector to open importlib.resources package files.
---
ID: 7
Type: feature
Title: setup advanced testing environment
Status: done
Priority: high
Created: 2025-06-20
Description: copy the testing environment setup from
byteb4rb1e.sphinxcontrib.ext
---
ID: 8
Type: bugfix
Title: rename package
Status: done
Priority: high
Created: 2025-06-20
Description: use dot namespaces to make the package a little more elegant
---
ID: 9
Type: bugfix
Title: fix LICENSE reference
Status: done
Priority: high
Created: 2025-06-20
Description: license specification is no longer a trove classifier in
pyproject.toml, hence the reference to LICENSE must be changed
---
ID: 10
Type: feature
Title: pytest current test context fixtures
Status: done
Priority: high
Created: 2025-06-20
Description: add fixtures for doing things in relation to the active testing
context
---
ID: 11
Type: bugfix
Title: move testing utils out of utils
Status: done
Priority: high
Created: 2025-06-20
Description: to shorten the namespace and also indicate that testing utilities
are different from regular utilities
---
ID: 12
Type: feature
Title: simplify testing.fixtures.mock_pkg
Status: done
Priority: high
Created: 2025-06-21
Description: Only bootstrap a package mock with the minimum requirements for a
Python module and let the consumer handle the directory layout.
---
ID: 13
Type: bugfix
Title: fix unit tests for urllib PkgHandler
Status: done
Priority: high
Created: 2025-06-21
Description: change of issue 12 wasn't properly reflected in urllib PkgHandler
unit tests
---
ID: 14
Type: feature
Title: add compression support for urllib PkgHandler
Status: done
Priority: high
Created: 2025-06-21
Description: with a proper content-type of the PkgHandler addinfourl object, a
consumer can determine whether the file is compressed or not.
---
ID: 15
Type: bugfix
Title: modularize module containers
Status: open
Priority: high
Created: 2025-06-28
Description: Even though importlib can find submodules through traversing paths
instead of relying on __init__.py for every ancestor module, this
is not supported by some modules like sphinx.ext.autosummary
---
ID: 16
Type: feature
Title: SQL-aware dataclass
Status: in-progress
Priority: low
Created: 2025-12-31
Description: A dataclass that transparently maps onto an SQL datastore, with
command generation for syncing data between data class and store
---
ID: 17
Type: feature
Title: recursive-descent HTML (DOM) parser
Status: in-progress
Priority: high
Created: 2025-12-31
Description: Extend the built-in event-driven parser to be modeled after DOM
recursive-descent HTML parser
---

2663
configure vendored

File diff suppressed because it is too large

View file

@ -1,27 +0,0 @@
AC_INIT
AC_CHECK_PROGS([MAKE], [make], [no])
AS_IF([test "$MAKE" == "no"],
[AC_MSG_NOTICE([without GNU Make, you have to inspect 'Makefile' and deduce build targets yourself.])])
AC_CHECK_PROGS([GIT], [git], [no])
AS_IF([test "$GIT" == "no"],
[AC_MSG_ERROR([install Git, before continuing.])])
AC_CHECK_PROGS([PYTHON3], [python3], [no])
AS_IF([test "$PYTHON3" == "no"],
[AC_MSG_ERROR([install Python 3, before continuing.])])
# required in Makefile to ensure proper path resolution during preprocessing
# realpath is not available on macOS
AC_CHECK_PROGS([REALPATH], [realpath], [no])
AS_IF([test "$REALPATH" == "no"],
[AC_MSG_ERROR([set a persistent alias for 'realpath', before continuing, e.g.
alias='python3 -c "import pathlib,sys;print(pathlib.Path(sys.argv[[1]]).resolve())"'"
])])
AC_MSG_NOTICE([initializing python3 venv...])
make .venv
AC_OUTPUT

View file

@ -7,12 +7,12 @@ requires = [
build-backend = "setuptools.build_meta"
[project]
name = "byteb4rb1e-utils"
name = "byteb4rb1e.utils"
description = "personal utilities and helpers"
authors = [
{ name = "Tiara Rodney", email = "tiara.rodney@administratrix.de" }
{ name = "Tiara Rodney", email = "tiara.rodney@byteb4rb1e.me" }
]
license = { file = "LICENSE" }
license-files = ["LICENSE"]
readme = "README.md"
classifiers = [
"Development Status :: 1 - Planning",
@ -48,7 +48,6 @@ strict = true
max_line_length = 80
aggressive = 3
recursive = true
in-place = true
[tool.setuptools_scm]

View file

@ -1,25 +0,0 @@
-i https://pypi.org/simple
astroid==3.3.9; python_full_version >= '3.9.0'
autopep8==2.3.2; python_version >= '3.9'
build==1.2.2.post1; python_version >= '3.8'
-e .
certifi==2025.4.26; python_version >= '3.6'
colorama==0.4.6; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5, 3.6'
dill==0.4.0; python_version >= '3.8'
distlib==0.3.9
filelock==3.18.0; python_version >= '3.9'
isort==6.0.1; python_full_version >= '3.9.0'
mccabe==0.7.0; python_version >= '3.6'
mypy==1.15.0; python_version >= '3.9'
mypy-extensions==1.1.0; python_version >= '3.8'
packaging==25.0; python_version >= '3.8'
pipenv==2025.0.2; python_version >= '3.9'
platformdirs==4.3.7; python_version >= '3.9'
pycodestyle==2.13.0; python_version >= '3.9'
pylint==3.3.6; python_full_version >= '3.9.0'
pyproject-hooks==1.2.0; python_version >= '3.7'
setuptools==80.3.0; python_version >= '3.9'
setuptools-scm==8.2.0; python_version >= '3.8'
tomlkit==0.13.2; python_version >= '3.8'
typing-extensions==4.13.2; python_version >= '3.8'
virtualenv==20.30.0; python_version >= '3.8'

View file

@ -0,0 +1,14 @@
import os
from pathlib import Path
from typing import Tuple


def get_current_test() -> Tuple[Path, str]:
    current_test_env = os.getenv("PYTEST_CURRENT_TEST")
    if current_test_env is None:
        raise RuntimeError("PYTEST_CURRENT_TEST not set. Must be run under pytest.")

    suite_path, case_name = current_test_env.split('::', 1)
    case_name = case_name.split(' ', 1)[0]

    return Path(suite_path).resolve(), case_name
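
As a rough illustration of the parsing step, the sketch below applies the same splitting to a sample value. pytest sets `PYTEST_CURRENT_TEST` to a node ID followed by the phase in parentheses; the helper name `parse_current_test` and the sample path are hypothetical, and the path is not resolved here so the result stays independent of the working directory.

```python
from pathlib import Path
from typing import Tuple


def parse_current_test(value: str) -> Tuple[Path, str]:
    """Split a PYTEST_CURRENT_TEST value into suite path and case name."""
    suite_path, case_name = value.split("::", 1)
    # drop the trailing phase marker, e.g. " (call)"
    return Path(suite_path), case_name.split(" ", 1)[0]


suite, case = parse_current_test("tests/unit/test_io.py::test_read (call)")
```

Note that a parametrized test whose ID contains a space would be cut short by the `split(" ", 1)` step, so this approach assumes IDs without spaces.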

View file

@ -0,0 +1,47 @@
from functools import wraps
from pathlib import Path
import os
import subprocess
import sys

from byteb4rb1e.testing.pytest import get_current_test


def run_in_subprocess_once():
    """
    A decorator that reruns the test in a subprocess if not already inside one.

    Requires pytest to be installed and the test to be run by pytest.

    For what? Anything that can't be done in a thread-safe manner, e.g.
    modifying PYTHONPATH
    """
    def decorator(test_func):
        @wraps(test_func)
        def wrapper(*args, **kwargs):
            # already inside the child process: run the actual test body
            if os.environ.get("XPYTEST_INSIDE_SUBPROCESS") == "1":
                return test_func(*args, **kwargs)

            suite_path, case_name = get_current_test()

            cmd = [
                sys.executable,
                "-m", "pytest",
                f"{suite_path}::{case_name}",
            ]

            result = subprocess.run(
                cmd,
                env={**os.environ, "XPYTEST_INSIDE_SUBPROCESS": "1"},
                capture_output=True,
                text=True,
            )

            if result.returncode != 0:
                print(' '.join(cmd))
                print("==== Subprocess stdout ====")
                print(result.stdout)
                print("==== Subprocess stderr ====")
                print(result.stderr)
                raise AssertionError(f"Subprocess test failed with exit code {result.returncode}")

        return wrapper
    return decorator
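
The re-entry guard is the part that keeps the decorator from recursing forever. A minimal sketch of that mechanism, without pytest: the child process inherits the parent environment plus a marker variable and can check it on startup. The helper name `run_isolated` is hypothetical; the variable name is taken from the code above.

```python
import os
import subprocess
import sys


def run_isolated(code: str) -> str:
    """Run a Python snippet in a child process that carries the guard flag."""
    result = subprocess.run(
        [sys.executable, "-c", code],
        env={**os.environ, "XPYTEST_INSIDE_SUBPROCESS": "1"},
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError on a non-zero exit code
    )
    return result.stdout.strip()


# the child sees the flag; the parent's own environment is left untouched
flag = run_isolated(
    "import os; print(os.environ.get('XPYTEST_INSIDE_SUBPROCESS'))")
```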

View file

@ -0,0 +1,44 @@
import os
from pathlib import Path
import sys
from typing import Dict, Tuple, Union

import pytest

from byteb4rb1e.testing.pytest import get_current_test

# keyed by the tmp_path of the requesting test
_SITE_PACKAGE_COUNTER: Dict[Path, int] = {}


@pytest.fixture
def current_test() -> Tuple[Path, str]:
    """Return the resolved suite path and case name of the running test."""
    return get_current_test()


@pytest.fixture
def mock_system_site_package_dir(tmp_path):
    global _SITE_PACKAGE_COUNTER

    package_id = _SITE_PACKAGE_COUNTER.setdefault(tmp_path, 0)
    _SITE_PACKAGE_COUNTER[tmp_path] += 1

    sys_path = tmp_path / str(package_id)

    def _create(name: str) -> Path:
        pkg_path = sys_path / name.replace('.', os.path.sep)
        pkg_path.mkdir(parents=True)
        (pkg_path / "__init__.py").touch()

        # register the directory only once, so the cleanup below removes
        # every entry this fixture added
        if str(sys_path) not in sys.path:
            sys.path.insert(0, str(sys_path))

        return pkg_path

    yield _create

    # cleanup sys.path after test
    if str(sys_path) in sys.path:
        sys.path.remove(str(sys_path))
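
To show what the fixture sets up, here is a self-contained sketch of the same idea outside pytest: create a package directory with an `__init__.py`, put its parent on `sys.path`, import it, and clean up afterwards. The helper `create_pkg` and the package name `demo_pkg_example` are hypothetical.

```python
import importlib
import os
import sys
import tempfile
from pathlib import Path


def create_pkg(root: Path, name: str) -> Path:
    """Create the directory for a dotted package name below `root`."""
    pkg_path = root / name.replace(".", os.path.sep)
    pkg_path.mkdir(parents=True)
    (pkg_path / "__init__.py").touch()
    return pkg_path


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    pkg_path = create_pkg(root, "demo_pkg_example")
    # the consumer decides what goes into the package
    (pkg_path / "__init__.py").write_text("VALUE = 42\n")

    sys.path.insert(0, str(root))
    try:
        importlib.invalidate_caches()
        value = importlib.import_module("demo_pkg_example").VALUE
    finally:
        # undo the global side effects, as the fixture teardown does
        sys.path.remove(str(root))
        sys.modules.pop("demo_pkg_example", None)
```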

View file

@ -0,0 +1,6 @@
"""Utilities for building composable CLIs from command dataclasses."""
from byteb4rb1e.utils.argparse.command import CLICommand
from byteb4rb1e.utils.argparse.dispatcher import CLI
__all__ = ["CLI", "CLICommand"]

View file

@ -0,0 +1,54 @@
"""Base command dataclass for composable CLI trees."""
from __future__ import annotations

from argparse import ArgumentParser
from dataclasses import dataclass, fields
from typing import Any, ClassVar, Dict, List, Optional, Type


@dataclass
class CLICommand:
    """Base class for CLI commands.

    Subclasses define their identity (name, help, description) as
    dataclass fields. These are passed as kwargs to
    ``subparsers.add_parser()``.

    Override ``add_arguments`` to register flags and positionals.
    Override ``execute`` to implement the command's logic.
    Nest subcommands by setting ``_subcommands`` as a class variable.
    """

    name: str = ""
    help: str = ""
    description: str = ""

    # refers to this class; the previous name ``Command`` was undefined
    _subcommands: ClassVar[List[Type[CLICommand]]] = []

    def add_arguments(self, parser: ArgumentParser) -> None:
        """Add arguments to the parser. Override in subclasses."""

    def execute(self, args: Any) -> int:
        """Run the command. Override in subclasses.

        Returns an exit code (0 = success).
        """
        return 0

    def parser_kwargs(self) -> Dict[str, Any]:
        """Return the dataclass fields as kwargs for add_parser.

        Excludes ``name`` (used as the positional parser name) and
        any empty-string fields so argparse defaults apply.
        """
        skip = {"name"}
        kwargs = {}
        for f in fields(self):
            if f.name in skip or f.name.startswith("_"):
                continue
            val = getattr(self, f.name)
            if val != "":
                kwargs[f.name] = val
        return kwargs
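
A small standalone sketch of the field filtering may help. It reimplements the filter as a free function over a stand-in dataclass, so it runs without the package installed; `GreetCommand` and the free function `parser_kwargs` are hypothetical names for illustration.

```python
from dataclasses import dataclass, fields
from typing import Any, Dict


@dataclass
class GreetCommand:
    """Stand-in for a CLICommand subclass."""
    name: str = "greet"
    help: str = "print a greeting"
    description: str = ""  # empty, so it should be filtered out


def parser_kwargs(cmd: Any) -> Dict[str, Any]:
    """Collect non-empty public fields, except `name`, as add_parser kwargs."""
    return {
        f.name: getattr(cmd, f.name)
        for f in fields(cmd)
        if f.name != "name"
        and not f.name.startswith("_")
        and getattr(cmd, f.name) != ""
    }


kwargs = parser_kwargs(GreetCommand())
```

Leaving out the empty `description` means argparse falls back to its own default instead of receiving an empty string.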

View file

@ -0,0 +1,122 @@
"""CLI dispatcher: builds parser trees from command dataclasses."""
from __future__ import annotations

import logging
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from typing import Any, Dict, List, Optional, Type

from byteb4rb1e.utils.argparse.command import CLICommand


class CLI:
    """Composable CLI built from a tree of CLICommand dataclasses.

    Recursively bootstraps an argparse parser hierarchy and tracks
    dest names so ``run()`` can dispatch to the correct leaf command
    without dest chaining in the caller.

    Usage::

        cli = CLI(prog="repository", description="...")
        cli.bootstrap([MirrorCommand, IndexCommand])
        cli.run()
    """

    def __init__(
        self,
        prog: Optional[str] = None,
        description: str = "",
    ) -> None:
        kwargs = {}  # type: Dict[str, Any]
        if prog:
            kwargs["prog"] = prog
        if description:
            kwargs["description"] = description
        kwargs.setdefault(
            "formatter_class", ArgumentDefaultsHelpFormatter,
        )
        self.parser = ArgumentParser(**kwargs)
        self._dests = []  # type: List[str]
        self._commands = {}  # type: Dict[str, CLICommand]

    def add_arguments(self, parser: ArgumentParser) -> None:
        """Add global arguments to the root parser."""
        parser.add_argument(
            "-v", "--verbose", action="count", default=0,
            help="Increase verbosity (-v for INFO, -vv for DEBUG)",
        )

    def bootstrap(
        self,
        commands: List[Type[CLICommand]],
    ) -> None:
        """Build the parser tree from a list of top-level commands."""
        self.add_arguments(self.parser)
        dest = "command"
        self._dests.append(dest)
        sub = self.parser.add_subparsers(dest=dest)
        for cmd_cls in commands:
            self._add(sub, cmd_cls, prefix="")

    def _add(
        self,
        subparsers: Any,
        cmd_cls: Type[CLICommand],
        prefix: str,
    ) -> None:
        """Recursively add a command and its subcommands."""
        cmd = cmd_cls()
        parser = subparsers.add_parser(
            cmd.name,
            formatter_class=ArgumentDefaultsHelpFormatter,
            **cmd.parser_kwargs(),
        )
        cmd.add_arguments(parser)

        # commands are keyed by their dotted path, e.g. "mirror.sync"
        key = "%s.%s" % (prefix, cmd.name) if prefix else cmd.name
        self._commands[key] = cmd

        if cmd._subcommands:
            dest = "%s_command" % cmd.name
            self._dests.append(dest)
            child_sub = parser.add_subparsers(dest=dest)
            for sc_cls in cmd._subcommands:
                self._add(child_sub, sc_cls, prefix=key)

    def _resolve(self, args: Any) -> Optional[CLICommand]:
        """Walk dest chain to find the leaf command."""
        parts = []  # type: List[str]
        for dest in self._dests:
            val = getattr(args, dest, None)
            if val is None:
                continue
            parts.append(val)
        if not parts:
            return None
        key = ".".join(parts)
        return self._commands.get(key)

    @staticmethod
    def _setup_logging(verbosity: int) -> None:
        if verbosity >= 2:
            level = logging.DEBUG
        elif verbosity >= 1:
            level = logging.INFO
        else:
            level = logging.WARNING
        logging.basicConfig(
            level=level,
            format="%(asctime)s [%(levelname)s] %(message)s",
            handlers=[logging.StreamHandler()],
        )

    def run(self) -> None:
        """Parse args and dispatch to the leaf command."""
        args = self.parser.parse_args()
        self._setup_logging(getattr(args, "verbose", 0))

        cmd = self._resolve(args)
        if cmd is None:
            self.parser.print_help()
            raise SystemExit(1)

        raise SystemExit(cmd.execute(args))
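
The dest-chain lookup can be tried with plain argparse. The sketch below builds a two-level parser by hand and joins whichever dest values are set into a dotted key, which appears to be what `_resolve` does; the command names and the helper `resolve_key` are hypothetical.

```python
import argparse
from typing import List

parser = argparse.ArgumentParser(prog="repository")
sub = parser.add_subparsers(dest="command")

# "mirror" has a nested subcommand level with its own dest
mirror = sub.add_parser("mirror")
mirror_sub = mirror.add_subparsers(dest="mirror_command")
mirror_sub.add_parser("sync")

# "index" is a leaf command
sub.add_parser("index")


def resolve_key(args: argparse.Namespace, dests: List[str]) -> str:
    """Join every dest value that is set into a dotted command key."""
    parts = [getattr(args, dest, None) for dest in dests]
    return ".".join(part for part in parts if part is not None)


dests = ["command", "mirror_command"]
nested_key = resolve_key(parser.parse_args(["mirror", "sync"]), dests)
leaf_key = resolve_key(parser.parse_args(["index"]), dests)
```

A dest that belongs to a branch the user did not take is simply absent from the namespace, which is why the lookup can iterate over all known dests without knowing the tree shape.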

View file

@ -0,0 +1,109 @@
#!/usr/bin/env python3
"""Generic HTTP client.

Thin urllib wrapper with retry-on-rate-limit. No domain knowledge:
GitHub, Bitbucket, etc. are handled by higher-level modules.
"""
import json
import time
from typing import Any, Dict, Optional
import urllib.error
import urllib.parse
import urllib.request
from warnings import warn


class HttpResponse:
    def __init__(self, status: int, headers: dict, data: bytes, reason: str):
        self.status_code = status
        self.headers = headers
        self.data = data
        self.reason = reason
        self.text = data.decode("utf-8", errors="replace")

    def json(self):
        return json.loads(self.data.decode("utf-8"))


def _request(
    url: str,
    method: str = "GET",
    params: Optional[Dict[str, Any]] = None,
    headers: Optional[Dict[str, str]] = None,
    data: Optional[bytes] = None,
) -> HttpResponse:
    # TODO: do proper exponential backoff
    backoff = [1, 2, 4]

    if params:
        query = urllib.parse.urlencode(params)
        url = f"{url}?{query}"

    req = urllib.request.Request(
        url,
        headers=headers or {},
        method=method,
        data=data,
    )

    for delay in backoff:
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                status = resp.getcode()
                resp_data = resp.read()
                resp_headers = dict(resp.getheaders())

                if status == 429:
                    warn(f"Rate-limited on {url} (HTTP {status})."
                         f" Backing off {delay}s...")
                    time.sleep(delay)
                    continue

                return HttpResponse(
                    status, resp_headers, resp_data, resp.reason,
                )

        except urllib.error.HTTPError as e:
            status = e.code
            err_data = e.read()
            err_headers = dict(e.headers.items())

            if status == 429:
                warn(f"Rate-limited on {url} (HTTP {status})."
                     f" Backing off {delay}s...")
                time.sleep(delay)
                continue

            return HttpResponse(
                status, err_headers, err_data, e.reason,
            )

        except urllib.error.URLError as e:
            # format the message here; Exception does not interpolate
            # logging-style arguments
            raise Exception(
                "Network error on %s: %s" % (url, e),
            ) from e

    # If all retries exhausted, return last error-like response
    return HttpResponse(503, {}, b"", "Service unavailable")


def get(
    url: str,
    params: Optional[Dict[str, Any]] = None,
    headers: Optional[Dict[str, str]] = None,
) -> HttpResponse:
    return _request(url, method="GET", params=params, headers=headers)


def post(
    url: str,
    data: Optional[bytes] = None,
    headers: Optional[Dict[str, str]] = None,
) -> HttpResponse:
    return _request(url, method="POST", headers=headers, data=data)


def put(
    url: str,
    data: Optional[bytes] = None,
    headers: Optional[Dict[str, str]] = None,
) -> HttpResponse:
    return _request(url, method="PUT", headers=headers, data=data)
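
The client's TODO mentions proper exponential backoff. One possible shape for it, offered only as a sketch: a generator that doubles the delay per attempt, caps it, and optionally applies full jitter. The name `backoff_delays` and its defaults are assumptions, not part of this change set.

```python
import random
from typing import Iterator


def backoff_delays(retries: int = 5, base: float = 1.0,
                   cap: float = 30.0, jitter: bool = True) -> Iterator[float]:
    """Yield one delay in seconds per retry attempt."""
    for attempt in range(retries):
        # double per attempt, but never exceed the cap
        delay = min(cap, base * (2 ** attempt))
        # full jitter spreads concurrent clients across the interval
        yield random.uniform(0, delay) if jitter else delay


# deterministic schedule, with jitter disabled for inspection
delays = list(backoff_delays(retries=6, jitter=False))
```

Such a generator could replace the fixed `backoff = [1, 2, 4]` list in the retry loop, since the loop only iterates over the delays.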

View file

@ -1,7 +1,7 @@
from dataclasses import dataclass
from http.server import SimpleHTTPRequestHandler
from byteb4rb1e_utils.io import ChunksIO
from byteb4rb1e.utils.io import ChunksIO
@dataclass

View file

@ -8,12 +8,12 @@ from http.server import HTTPServer
from io import BytesIO, IOBase
from typing import Optional, Tuple, List
from byteb4rb1e_utils.http.server import (
from byteb4rb1e.utils.http.server import (
HandlerOptions,
MultipartUploadHandler,
ServerOptions,
)
from byteb4rb1e_utils.io import ChunksIO
from byteb4rb1e.utils.io import ChunksIO
__doc__ = """tsmuds - Tiara's Simple Multipart Upload Debugging Server

View file

@ -0,0 +1,78 @@
#!/usr/bin/env python3
"""Bitbucket Cloud REST API v2.0 wrapper.
Thin layer over http.py for Bitbucket-specific operations:
- Bearer token authentication
- Repository existence checks
- Repository creation within a workspace/project
"""
import json
from typing import Any, Dict, Optional
from byteb4rb1e.utils.http import client as http_client
BITBUCKET_API = "https://api.bitbucket.org/2.0"
def http_headers(token: str) -> Dict[str, str]:
"""Construct Bitbucket API headers with Bearer token auth."""
return {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
"Content-Type": "application/json",
}
def repository_exists(
workspace: str,
repo_slug: str,
token: str,
) -> bool:
"""Check whether a repository exists in the workspace."""
url = f"{BITBUCKET_API}/repositories/{workspace}/{repo_slug}"
resp = http_client.get(url, headers=http_headers(token))
return resp.status_code == 200
def create_repository(
workspace: str,
repo_slug: str,
token: str,
project: Optional[str] = None,
description: str = "",
is_private: bool = True,
) -> http_client.HttpResponse:
"""Create a new repository in the workspace.
When *project* is given the repository is assigned to that
Bitbucket project (by key). This is required for workspaces
that scope access keys at the project level.
Returns the API response. Callers should check ``status_code``:
Bitbucket answers 200 when an existing repository was updated and
201 when a new one was created.
"""
url = f"{BITBUCKET_API}/repositories/{workspace}/{repo_slug}"
body: Dict[str, Any] = {
"scm": "git",
"is_private": is_private,
"description": description,
"fork_policy": "no_forks",
}
if project:
body["project"] = {"key": project}
return http_client.put(
url,
data=json.dumps(body).encode("utf-8"),
headers=http_headers(token),
)
def clone_url(
workspace: str,
repo_slug: str,
) -> str:
"""Return the SSH clone URL for a Bitbucket repository."""
return f"git@bitbucket.org:{workspace}/{repo_slug}.git"
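
Callers would typically combine `repository_exists` and `create_repository` into a create-if-missing step. A minimal sketch of that flow follows, with both operations injected as callables so it runs without network access. `ensure_repository` and `FakeResponse` are hypothetical names, and treating both 200 and 201 as success is an assumption.

```python
from typing import Callable, NamedTuple


class FakeResponse(NamedTuple):
    """Stand-in for HttpResponse; only the status code matters here."""
    status_code: int


def ensure_repository(
    exists: Callable[[], bool],
    create: Callable[[], FakeResponse],
) -> bool:
    """Return True if the repository was created, False if it already existed."""
    if exists():
        return False
    resp = create()
    # Assumption: both 200 and 201 count as success
    if resp.status_code not in (200, 201):
        raise RuntimeError(
            f"repository creation failed: HTTP {resp.status_code}"
        )
    return True


created = ensure_repository(lambda: False, lambda: FakeResponse(201))
skipped = ensure_repository(lambda: True, lambda: FakeResponse(500))
```

In real use the two callables would wrap `repository_exists(workspace, repo_slug, token)` and `create_repository(workspace, repo_slug, token)`.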


@ -0,0 +1,65 @@
#!/usr/bin/env python3
import hashlib
from pathlib import Path
from typing import Any, Dict, List, Optional
from byteb4rb1e.utils.http import client as http_client
GITHUB_API = "https://api.github.com"
def http_headers(token: Optional[str]) -> Dict[str, str]:
headers = {
"Accept": "application/vnd.github+json",
"User-Agent": "sphinx-h5p-worker1"
}
if token:
# Use standard PAT header; token not logged anywhere.
headers["Authorization"] = f"Bearer {token}"
return headers
def blob_sha(path: Path) -> str:
"""Calculate Git blob SHA-1 for a file, matching GitHub API 'sha'."""
data = path.read_bytes()
header = f"blob {len(data)}\0".encode("utf-8")
store = header + data
return hashlib.sha1(store).hexdigest()
def list_org_repos(org: str, token: Optional[str]) -> List[Dict[str, Any]]:
repos: List[Dict[str, Any]] = []
page = 1
per_page = 100
while True:
url = f"{GITHUB_API}/orgs/{org}/repos"
resp = http_client.get(
url,
params={"page": page, "per_page": per_page, "type": "public"},
headers=http_headers(token),
)
if resp.status_code != 200:
raise RuntimeError(f"Failed to list repos for org {org}: {resp.status_code} {resp.text}")
batch = resp.json()
if not batch:
break
repos.extend(batch)
page += 1
return repos
def fetch_file(
org: str,
repo: str,
path: str,
token: str
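) -> http_client.HttpResponse:
"""Fetch a resource below ``/repos/{org}/{repo}/``.
*path* is appended verbatim, so callers supply the full sub-path
(for example ``contents/README.md``).
"""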
url = f"{GITHUB_API}/repos/{org}/{repo}/{path}"
return http_client.get(
url,
headers=http_headers(token),
)


@ -0,0 +1,91 @@
from typing import Optional
class RollingHash:
"""implementation of Rabin-Karp rolling hash
"""
#: default base
base: int = 31
#: default modulus
mod: int = 10**9 + 7
#: current computed hash
_hash: int
#: prime number base (e.g., 31)
_base: int
#: large prime modulus (to prevent overflow)
_mod: int
# Precomputation of ``base^(length-1) % mod`` for removing the old byte when
# rolling over
_hbase_factor: int
def __init__(
self,
data: bytes,
base: Optional[int] = None,
mod: Optional[int] = None
):
"""Initialize the rolling hash over the initial window *data*.
data: Initial window; its length fixes the window size
base: Prime number base (e.g., 31); defaults to ``RollingHash.base``
mod: Large prime modulus to prevent overflow; defaults to
``RollingHash.mod``
"""
self._base = base if base else RollingHash.base
self._mod = mod if mod else RollingHash.mod
self._hash = RollingHash.compute_initial_hash(
data,
self._base,
self._mod
)
self._hbase_factor = pow(self._base, len(data) - 1, self._mod)
@staticmethod
def compute_initial_hash(
data: bytes,
base: int,
mod: int,
) -> int:
"""Compute the hash for the initial window.
Use this standalone to compute the hash of the search pattern; it
avoids the overhead of instantiating an object.
:param data: data to build hash for
:param base: polynomial base
:param mod: modulus applied at every step
:returns: hash of data
"""
hash_ = 0
for i in range(len(data)):
# reduce modulo ``mod`` at each iteration so the running value stays
# small instead of growing HUUUUGEE...
hash_ = (hash_ * base + data[i]) % mod
return hash_
def roll(self, old_byte: int, new_byte: int) -> int:
"""Efficiently update hash by removing ``old_byte`` and adding
``new_byte``
The old_byte removal uses a pre-computed value of the highest base used
in the polynomial calculation. This speeds things up a bit.
I considered storing the old_byte efficiently within the class object,
but that would require storing the entire data, basically doubling the
memory consumption, as the data must definitely also live outside of
the class object. A memoryview could solve this problem, but at the
cost of making the implementation more complex, so this will have to
do.
:param old_byte: The ordinal of the first byte in buffer to roll over
:param new_byte: The ordinal of the byte newly appended to the buffer
"""
# Remove old (use the instance's base/modulus, not the class defaults)
self._hash = (self._hash - old_byte * self._hbase_factor) % self._mod
# Add new
self._hash = (self._hash * self._base + new_byte) % self._mod
return self._hash
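
To illustrate what `roll` is expected to guarantee, the following standalone sketch slides a fixed-size window over a byte string and compares every rolled hash with a hash recomputed from scratch. The free functions `initial_hash` and `roll` are simplified stand-ins written for this example, not the class above.

```python
def initial_hash(data: bytes, base: int = 31, mod: int = 10**9 + 7) -> int:
    """Polynomial hash of ``data``, reduced modulo ``mod`` at every step."""
    h = 0
    for byte in data:
        h = (h * base + byte) % mod
    return h


def roll(h: int, old_byte: int, new_byte: int, window: int,
         base: int = 31, mod: int = 10**9 + 7) -> int:
    """Drop ``old_byte`` from the front of the window, append ``new_byte``."""
    factor = pow(base, window - 1, mod)  # weight of the oldest byte
    h = (h - old_byte * factor) % mod
    return (h * base + new_byte) % mod


data = b"abracadabra"
window = 4

h = initial_hash(data[:window])
rolled = [h]
for i in range(window, len(data)):
    h = roll(h, data[i - window], data[i], window)
    rolled.append(h)

# Reference values: hash every window from scratch
recomputed = [
    initial_hash(data[i:i + window])
    for i in range(len(data) - window + 1)
]
```

The two lists agree only if removal and insertion use the same base and modulus as the initial hash, which is why `roll` must read the instance values.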


@ -0,0 +1,41 @@
import email
import importlib.resources
import mimetypes
from urllib.error import URLError
import urllib.request
class PkgHandler(urllib.request.BaseHandler):
"""Open ``pkg://<package>/<resource>`` URLs via ``importlib.resources``.
"""
def pkg_open(self, req) -> urllib.request.addinfourl:
pkg_files = importlib.resources.files(req.host)
try:
fh = next(
pkg_files.glob(req.selector.lstrip('/'))
).open('rb')
except Exception as e:
raise URLError(f'{e.__class__.__name__}: {e}') from e
# seek to the end to determine the resource size, then rewind
fh.seek(0, 2)
size = fh.tell()
fh.seek(0)
mtype, compression = mimetypes.guess_type(req.selector)
if compression and mtype:
mtype = f"{mtype}+{compression}"
headers = email.message_from_string(
'Content-Type: %s\nContent-Length: %d\n' %
(mtype or 'text/plain', size)
)
if not mtype or mtype.startswith('text/'):
fh.close()
fh = next(
pkg_files.glob(req.selector.lstrip('/'))
).open('r')
return urllib.request.addinfourl(fh, headers, None)
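
The Content-Type logic in `pkg_open` depends on `mimetypes.guess_type` returning a `(type, encoding)` pair. A minimal sketch of just that step follows. `content_type_for` is a hypothetical helper that mirrors the handler's `+<encoding>` suffix and its `text/plain` fallback.

```python
import mimetypes


def content_type_for(selector: str) -> str:
    """Derive a Content-Type value the way the handler above does."""
    mtype, encoding = mimetypes.guess_type(selector)
    if mtype and encoding:
        # Compressed payloads get the encoding appended to the media type
        mtype = f"{mtype}+{encoding}"
    # Unknown extensions fall back to text/plain
    return mtype or "text/plain"


plain = content_type_for("/data.txt")
packed = content_type_for("/archive.tar.gz")
unknown = content_type_for("/data.zzzunknown")
```

The fallback explains why the handler reopens unknown resources in text mode.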


@ -0,0 +1,345 @@
#!/usr/bin/env python3
"""Git subprocess wrapper for repository operations.
Provides primitives for mirror cloning, syncing, remote management,
file extraction from bare repos, and submodule management.
No pygit2 or gitpython, uses subprocess only.
"""
import logging
import subprocess
from pathlib import Path
from typing import List, Optional
logger = logging.getLogger(__name__)
class GitError(Exception):
"""A git subprocess returned a non-zero exit code."""
def __init__(self, args: List[str], returncode: int, stderr: str):
self.args_list = args
self.returncode = returncode
self.stderr = stderr
super().__init__(
f"git exited {returncode}: {' '.join(args)}\n{stderr}"
)
def parse_base_url(base_url: str) -> str:
"""Extract the workspace from an SCP-style Bitbucket repository URL.
The host part must be exactly ``bitbucket.org``; bootstrapping
requires the Bitbucket API, so other hosts are rejected.
>>> parse_base_url("git@bitbucket.org:byteb4rb1e/repo.git")
'byteb4rb1e'
"""
# SCP-style: git@bitbucket.org:workspace/repo.git
if ":" not in base_url or "//" in base_url:
raise ValueError(
f"Expected SCP-style URL (git@bitbucket.org:workspace/repo.git), "
f"got: {base_url}"
)
host_part, workspace = base_url.split(":", 1)
# host_part is e.g. "git@bitbucket.org"
host = host_part.split("@", 1)[-1]
if host != "bitbucket.org":
raise ValueError(
f"Mirror base URL must target bitbucket.org, "
f"got host: {host}"
)
# Path.parent is a Path; convert it to match the declared return type
return str(Path(workspace).parent)
def parse_repo_name(base_url: str) -> str:
"""Extract the repository name from an SCP-style Bitbucket URL.
The host part must be exactly ``bitbucket.org``; bootstrapping
requires the Bitbucket API, so other hosts are rejected.
>>> parse_repo_name("git@bitbucket.org:byteb4rb1e/repo.git")
'repo'
"""
# SCP-style: git@bitbucket.org:workspace/repo.git
if ":" not in base_url or "//" in base_url:
raise ValueError(
f"Expected SCP-style URL (git@bitbucket.org:workspace), "
f"got: {base_url}"
)
host_part, workspace = base_url.split(":", 1)
# host_part is e.g. "git@bitbucket.org"
host = host_part.split("@", 1)[-1]
if host != "bitbucket.org":
raise ValueError(
f"Mirror base URL must target bitbucket.org, "
f"got host: {host}"
)
return Path(workspace).name.split('.')[0]
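
The two parsers above share the same validation and differ only in which path component they return. A minimal sketch of a single splitter that yields host, workspace and repository name in one pass follows. `split_scp_url` is a hypothetical helper, and it does not enforce the `bitbucket.org` host check.

```python
from pathlib import PurePosixPath
from typing import Tuple


def split_scp_url(url: str) -> Tuple[str, str, str]:
    """Split ``user@host:workspace/repo.git`` into (host, workspace, repo)."""
    if ":" not in url or "//" in url:
        raise ValueError(f"expected SCP-style URL, got: {url}")
    host_part, path = url.split(":", 1)
    host = host_part.split("@", 1)[-1]
    repo_path = PurePosixPath(path)
    name = repo_path.name
    # Strip only a trailing ".git" so dotted repository names survive
    if name.endswith(".git"):
        name = name[:-4]
    return host, str(repo_path.parent), name


parts = split_scp_url("git@bitbucket.org:byteb4rb1e/repo.git")
```

Unlike `split('.')[0]`, stripping only the `.git` suffix keeps a name such as `my.repo` intact; whether dotted names occur in practice is not stated in the source.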
def _run(
args: List[str],
cwd: Optional[Path] = None,
capture_stdout: bool = False,
) -> subprocess.CompletedProcess: # type: ignore[type-arg]
"""Run a git command, raising GitError on failure."""
cmd = ["git"] + args
logger.debug("$ %s", " ".join(cmd))
result = subprocess.run(
cmd,
cwd=cwd,
capture_output=True,
text=True,
)
if result.returncode != 0:
raise GitError(cmd, result.returncode, result.stderr.strip())
return result
def mirror_clone(source_url: str, dest: Path) -> None:
"""Clone a repository as a bare mirror.
Equivalent to ``git clone --mirror <source_url> <dest>``.
The destination directory must not already exist.
"""
_run(["clone", "--mirror", source_url, str(dest)])
logger.info("Cloned mirror %s -> %s", source_url, dest)
def add_remote(repo: Path, name: str, url: str) -> None:
"""Add a named remote to a bare repository."""
_run(["remote", "add", name, url], cwd=repo)
logger.debug("Added remote %s -> %s in %s", name, url, repo)
def has_remote(repo: Path, name: str) -> bool:
"""Check whether a named remote exists."""
result = _run(["remote"], cwd=repo)
return name in result.stdout.splitlines()
def mirror_update(repo: Path) -> None:
"""Fetch all remotes in a bare mirror repository.
Equivalent to ``git remote update`` inside the bare repo.
"""
_run(["remote", "update"], cwd=repo)
logger.debug("Updated remotes in %s", repo)
def fetch(repo: Path, remote: str = "origin") -> None:
"""Fetch from a single remote."""
_run(["fetch", remote], cwd=repo)
logger.debug("fetched %s in %s", remote, repo)
def show_ref(repo: Path) -> str:
"""Return the raw output of ``git show-ref`` (all refs + SHAs).
Returns an empty string if the repo has no refs.
"""
try:
result = _run(["show-ref"], cwd=repo)
return result.stdout
except GitError:
return ""
def ls_remote(repo: Path, remote: str) -> str:
"""Return the raw output of ``git ls-remote <remote>``.
Returns an empty string if the remote has no refs or on error.
"""
try:
result = _run(["ls-remote", remote], cwd=repo)
return result.stdout
except GitError:
return ""
def mirror_push(repo: Path, remote: str) -> None:
"""Push the full mirror to a remote.
Equivalent to ``git push --mirror <remote>``.
"""
_run(["push", "--mirror", remote], cwd=repo)
logger.info("Pushed mirror to %s from %s", remote, repo)
def read_file(
repo: Path,
filepath: str,
ref: str = "HEAD",
) -> Optional[str]:
"""Extract a file's contents from a bare repo without checkout.
Returns the file content as a string, or None if the file does
not exist at the given ref.
"""
try:
result = _run(
["show", f"{ref}:{filepath}"],
cwd=repo,
capture_stdout=True,
)
return result.stdout
except GitError:
return None
# -------------------------------------------------------------------
# Ref / tag primitives
# -------------------------------------------------------------------
def list_tags(repo: Path) -> List[str]:
"""List all tags in a repository."""
result = _run(["tag", "-l"], cwd=repo)
return [t for t in result.stdout.splitlines() if t]
def resolve_ref(repo: Path, ref: str) -> str:
"""Resolve a ref to a full SHA.
Raises GitError if the ref cannot be resolved.
"""
result = _run(
["rev-parse", ref], cwd=repo, capture_stdout=True,
)
return result.stdout.strip()
def head_ref(repo: Path) -> str:
"""Return the full SHA of HEAD."""
return resolve_ref(repo, "HEAD")
# -------------------------------------------------------------------
# Pull-through bare clone cache
# -------------------------------------------------------------------
def bare_path_for_url(url: str, cache_dir: Path) -> Path:
"""Derive a cache path from a clone URL.
Strips scheme/host, keeps the path component, appends ``.git``.
Examples::
https://github.com/h5p/h5p-multi-choice
-> cache_dir / h5p / h5p-multi-choice.git
git@github.com:h5p/h5p-multi-choice.git
-> cache_dir / h5p / h5p-multi-choice.git
"""
# Handle SCP-style URLs (git@host:path)
if ":" in url and "//" not in url:
path_part = url.split(":", 1)[1]
else:
# Strip scheme + host
from urllib.parse import urlparse
parsed = urlparse(url)
path_part = parsed.path.lstrip("/")
# Strip trailing .git if present, then re-add it
if path_part.endswith(".git"):
path_part = path_part[:-4]
return cache_dir / (path_part + ".git")
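
The point of the normalization in `bare_path_for_url` is that HTTPS and SCP-style URLs for the same repository land on the same cache entry. A standalone sketch that checks this property follows. `cache_path` is a hypothetical copy of the logic so the example runs without the package installed.

```python
from pathlib import Path
from urllib.parse import urlparse


def cache_path(url: str, cache_dir: Path) -> Path:
    """Map a clone URL to ``cache_dir/<path>.git`` (same rules as above)."""
    if ":" in url and "//" not in url:
        # SCP-style: everything after the first colon is the path
        path_part = url.split(":", 1)[1]
    else:
        path_part = urlparse(url).path.lstrip("/")
    # Normalize: drop a trailing ".git", then always append it
    if path_part.endswith(".git"):
        path_part = path_part[:-4]
    return cache_dir / (path_part + ".git")


cache = Path("cache")
from_https = cache_path("https://github.com/h5p/h5p-multi-choice", cache)
from_ssh = cache_path("git@github.com:h5p/h5p-multi-choice.git", cache)
```

Because the host is dropped, repositories with the same path on different hosts would share a cache entry; whether that matters depends on the caller.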
def ensure_bare_clone(url: str, cache_dir: Path) -> Path:
"""Ensure a bare mirror clone exists in *cache_dir*.
If the bare repo already exists, fetches updates via
``mirror_update``. Otherwise, creates a new mirror clone.
Returns the path to the bare repo.
"""
bare_path = bare_path_for_url(url, cache_dir)
if bare_path.exists():
mirror_update(bare_path)
logger.debug("Updated existing cache %s", bare_path)
else:
bare_path.parent.mkdir(parents=True, exist_ok=True)
mirror_clone(url, bare_path)
logger.info("Cached new bare clone %s", bare_path)
return bare_path
# -------------------------------------------------------------------
# Submodule operations
# -------------------------------------------------------------------
def has_submodule(repo: Path, path: str) -> bool:
"""Check whether a submodule is registered at *path*.
Reads ``.gitmodules`` to determine whether the submodule exists.
*path* is resolved relative to *repo*, then compared against
the repository root so the check works when *repo* is a
subdirectory of the actual git working tree.
Returns False if ``.gitmodules`` does not exist.
"""
try:
toplevel = Path(
_run(
["rev-parse", "--show-toplevel"], cwd=repo,
).stdout.strip()
)
except GitError:
return False
gitmodules = toplevel / ".gitmodules"
if not gitmodules.is_file():
return False
# Resolve the full path relative to the repo root
full_path = (repo / path).resolve()
try:
rel_path = str(full_path.relative_to(toplevel.resolve()))
except ValueError:
return False
try:
result = _run(
["config", "--file", str(gitmodules),
"--get-regexp", r"submodule\..*\.path"],
cwd=toplevel,
)
except GitError:
return False
for line in result.stdout.splitlines():
parts = line.split(None, 1)
if len(parts) == 2 and parts[1] == rel_path:
return True
return False
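
`has_submodule` scans the `key value` lines printed by `git config --get-regexp`. A minimal sketch of that parsing step follows, run against a hand-written sample instead of a real repository. `parse_submodule_paths` and the sample submodule names are made up for illustration.

```python
from typing import Dict


def parse_submodule_paths(output: str) -> Dict[str, str]:
    """Map submodule name to path from ``--get-regexp`` output lines."""
    paths: Dict[str, str] = {}
    for line in output.splitlines():
        # Each line is "<key> <value>"; split on the first whitespace run
        parts = line.split(None, 1)
        if len(parts) != 2:
            continue
        key, value = parts
        # The key looks like "submodule.<name>.path"; the name may hold dots
        name = key[len("submodule."):-len(".path")]
        paths[name] = value
    return paths


sample = (
    "submodule.vendor/h5p.path vendor/h5p\n"
    "submodule.docs.theme.path docs/theme\n"
)
registered = parse_submodule_paths(sample)
```

Checking `rel_path in registered.values()` then answers the same question as the loop in `has_submodule`.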
def submodule_add(repo: Path, url: str, path: str) -> None:
"""Add a git submodule at *path* pointing to *url*.
Equivalent to ``git submodule add <url> <path>`` inside *repo*.
"""
_run(["submodule", "add", url, path], cwd=repo)
logger.info("Added submodule %s -> %s", url, path)
def submodule_update(repo: Path, path: str) -> None:
"""Fetch and update a submodule to the latest remote HEAD.
Enters the submodule directory, fetches origin, and checks out
the latest commit on the remote default branch.
"""
sub_path = repo / path
_run(["fetch", "origin"], cwd=sub_path)
# Determine default branch from remote HEAD
result = _run(
["symbolic-ref", "refs/remotes/origin/HEAD",
"--short"],
cwd=sub_path,
)
default_branch = result.stdout.strip()
_run(["checkout", default_branch], cwd=sub_path)
logger.info("Updated submodule %s to %s", path, default_branch)
def submodule_checkout(repo: Path, path: str, ref: str) -> None:
"""Fetch and checkout a specific ref in a submodule."""
sub_path = repo / path
_run(["fetch", "origin"], cwd=sub_path)
_run(["checkout", ref], cwd=sub_path)
logger.info("Checked out submodule %s at %s", path, ref)


@ -1,228 +0,0 @@
from dataclasses import dataclass
import math
from typing import List, Optional, Tuple
class RollingHash:
"""implementation of Rabin-Karp rolling hash
"""
#: default base
base: int = 31
#: default modulus
mod: int = 10**9 + 7
#: current computed hash
_hash: int
#: prime number base (e.g., 31)
_base: int
#: large prime modulus (to prevent overflow)
_mod: int
# Precomputation of ``base^(length-1) % mod`` for removing the old byte when
# rolling over
_hbase_factor: int
def __init__(
self,
data: bytes,
base: Optional[int] = None,
mod: Optional[int] = None
):
"""Initialize the rolling hash with a given base and modulus.
base: Prime number base (e.g., 31)
mod: Large prime modulus to prevent overflow
length: Length of the pattern to match
"""
self._base = base if base else RollingHash.base
self._mod = mod if mod else RollingHash.mod
self._hash = RollingHash.compute_initial_hash(
data,
self._base,
self._mod
)
self._hbase_factor = pow(self._base, len(data) - 1, self._mod)
@staticmethod
def compute_initial_hash(
data: bytes,
base: int,
mod: int,
) -> int:
"""Compute the hash for the initial window (first `length` bytes).
rather use this standalone for computing the hash of the search pattern,
to avoid the overhead of instantiating an object.
:param data: data to build hash for
:param base:
:param: mod:
:returns: hash of data
"""
hash_ = 0
for i in range(len(data)):
# computing the modulus at each iteration, as to avoid the summed
# integer to be chunky, as in HUUUUGEE...
hash_ = (hash_ * base + data[i]) % mod
return hash_
def roll(self, old_byte: int, new_byte: int) -> int:
"""Efficiently update hash by removing ``old_byte`` and adding
``new_byte``
The old_byte removal uses a pre-computed value of the highest base used
in the polynomial calculation. This speeds things up a bit.
I was thinking about a way on how to store the old_byte efficiently
within the class object, but that would require storing the entire data,
basically doubling the memory consumption as the data must definetly
also live outside of the class object. A memoryview could solve this
problem, but at the cost of making the implementation more complex, so
this will have to do.
:param old_byte: The ordinal of the first byte in buffer to roll over
:param new_byte: The ordinal of the byte newly appended to the buffer
"""
# Remove old
self._hash = (self._hash - old_byte * self._hbase_factor) % self.mod
# Add new
self._hash = (self._hash * self.base + new_byte) % self.mod
return self._hash
@dataclass
class ChunkedRollingHashOptions:
"""
"""
max_chunk_size: int = 10
base: int = RollingHash.base
mod: int = RollingHash.mod
class ChunkedRollingHash:
"""Chunked Rolling hash for linear and circular buffers
This implementation was inspired by the Rabin-Karp rolling hash
algorithm.
A search pattern is chunked and for each chunk its hash is calculated.
I came up with this approach as the requirement for efficient RFC1341 HTTP
multipart entity boundary matching for stream data in a circular/ring
buffer. I've tested a couple of algorithms, but none gave me any real
performance improvements over a naive/bruteforce search.
That's how this algorithm came to be. Big O? I don't know (yet)...
Why this is more performant for my specific use-cases?
------------------------------------------------------
#. Precompute hashes for evenly sized chunks of a search pattern, in
addition of a hash of the full search-pattern.
#. First, match only the hash of the first chunk → immediately skip
unnecessary buffer sections if no match.
#. If the first chunk matches, progressively verify subsequent chunks,
until the full search pattern is confirmed.
Benefits Over Full Matching
---------------------------
- Reduces comparisons significantly: eliminates large sections early when
non-matches occur.
- Balances preprocessing cost vs runtime: faster elimination means fewer
wasted cycles.
- Integrates seamlessly into circular buffers: allows skipping
intelligently.
"""
_chunk_count: int
#: hashes of chunks of search string
_chunks_hash: List[int]
#: hash of the full search string
_hash: int
#: length of search string
_length: int
#: remainder for calculating the actual size of the last chunk
_remainder: int
_base: int
_mod: int
def __init__(
self,
data: bytes,
options: ChunkedRollingHashOptions = ChunkedRollingHashOptions()
):
"""
"""
self._base = options.base
self._mod = options.mod
self._max_chunk_size = options.max_chunk_size
self._chunks_hash = []
self._hash = RollingHash.compute_initial_hash(
data,
base = self._base,
mod = self._mod
)
self._length = len(data)
# only the last chunk differs in size; store its remainder separately
# for optimized handling
self._remainder = self._length % self._max_chunk_size
self._chunk_count = math.ceil(self._length / self._max_chunk_size)
# tracks chunk progression during matching
self._current = 0
# precompute hashes for all chunks to enable rapid comparison
for i in range(0, self._chunk_count):
chunk = data[i*self._max_chunk_size:(i+1)*self._max_chunk_size]
self._chunks_hash.append(
RollingHash.compute_initial_hash(chunk, base=self._base, mod=self._mod)
)
def match(
self,
data: bytes
):
"""match a buffer against a search string through chunked hashing
"""
# progressively match each chunk
for i in range(self._current, self._chunk_count - 1):
chunk = data[i*self._max_chunk_size:(i+1)*self._max_chunk_size]
# no more data left to process
if chunk == b'': break
chunk_hash = RollingHash.compute_initial_hash(
chunk,
base = self._base,
mod = self._mod
)
if chunk_hash != self._chunks_hash[i]:
self._current = 0
return False
self._current += 1
# processing hasn't completed for last chunk to be processed yet
if self._current != self._chunk_count - 1:
return
last_chunk = data[-self._remainder:]
last_chunk_hash = RollingHash.compute_initial_hash(
last_chunk,
base = self._base,
mod = self._mod
)
if self._chunks_hash[self._current] == last_chunk_hash:
return True
self._current = 0
return False


@ -0,0 +1,33 @@
import os
from pathlib import Path
import pytest
pytestmark = pytest.mark.pytest
from byteb4rb1e.testing.pytest import get_current_test
from byteb4rb1e.testing.pytest.decorators import run_in_subprocess_once
class Test_get_current_test:
"""
"""
def test_default(self):
"""
"""
os.environ['PYTEST_CURRENT_TEST'] = 'foo::bar (something)'
result = get_current_test()
assert isinstance(result[0], Path)
assert str(result[0].name) == 'foo'
assert result[1] == 'bar'
def test_invalid(self):
"""
"""
del os.environ['PYTEST_CURRENT_TEST']
with pytest.raises(RuntimeError):
get_current_test()
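
The tests above pin down the observable behavior of `get_current_test`: it reads `PYTEST_CURRENT_TEST`, which pytest sets to `<path>::<name> (<stage>)`, and returns a path plus a case name. A minimal sketch of one way such parsing could work follows. `parse_current_test` is hypothetical, and the package's actual implementation is not shown in this diff.

```python
from pathlib import Path
from typing import Tuple


def parse_current_test(value: str) -> Tuple[Path, str]:
    """Split ``'<path>::<name> (<stage>)'`` into (path, name)."""
    # Drop the trailing "(setup)", "(call)" or "(teardown)" marker
    node_id = value.rsplit(" ", 1)[0]
    path, sep, case = node_id.partition("::")
    if not sep or not case:
        raise RuntimeError(f"unexpected PYTEST_CURRENT_TEST value: {value!r}")
    return Path(path), case


suite, case = parse_current_test("foo::bar (something)")
```

For tests inside classes the case part keeps the class prefix, for example `TestFoo::test_bar`.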


@ -0,0 +1,21 @@
from pathlib import Path
import pytest
pytestmark = pytest.mark.pytest
from byteb4rb1e.testing.pytest.decorators import run_in_subprocess_once
@run_in_subprocess_once()
def test_run_in_subprocess_once(tmp_path):
marker = tmp_path / "executed_in_subprocess.txt"
if marker.exists():
raise AssertionError("Marker file exists before test logic ran (shouldn't happen in parent process)")
# Create proof of execution
marker.write_text("Subprocess was here.")
# Now assert it
assert marker.exists()


@ -0,0 +1,38 @@
from pathlib import Path
import importlib.resources
import pytest
pytestmark = pytest.mark.pytest
from byteb4rb1e.testing.pytest.decorators import run_in_subprocess_once
from byteb4rb1e.testing.pytest.fixtures import (
current_test,
mock_system_site_package_dir
)
def test_current_test(current_test):
"""
"""
suite_path, case_name = current_test
assert str(Path(__file__)) == str(suite_path)
assert case_name == "test_current_test"
@run_in_subprocess_once()
def test_mock_system_site_package_dir(mock_system_site_package_dir):
"""
"""
dummy_data = 'Hello'
pkgdir = mock_system_site_package_dir('foobarpkg')
(pkgdir / 'data.txt').write_text(dummy_data)
assert (pkgdir / '__init__.py').exists()
result = next(importlib.resources.files('foobarpkg').glob('data.txt')).read_text()
assert result == dummy_data


@ -0,0 +1,5 @@
def pytest_configure(config):
# register an additional marker
config.addinivalue_line(
"markers", "pytest: test pytest integration"
)


@ -1,6 +1,6 @@
import unittest
-from byteb4rb1e_utils.collections import CircularBuffer
+from byteb4rb1e.utils.collections import CircularBuffer
class test_init(unittest.TestCase):
"""CircularBuffer.__init__()"""

View file

@ -1,7 +1,7 @@
from io import BytesIO, IOBase
import unittest
-from byteb4rb1e_utils.io import ChunksIO
+from byteb4rb1e.utils.io import ChunksIO
class TestGetChunkSize(unittest.TestCase):

View file

@ -1,6 +1,6 @@
import unittest
-from byteb4rb1e_utils.string import RollingHash
+from byteb4rb1e.utils.string import RollingHash
class test_compute_initial_hash(unittest.TestCase):
"""RollingHash.compute_initial_hash()

View file

@ -0,0 +1,93 @@
import os.path
import sys
import urllib.request
import pytest
from byteb4rb1e.testing.pytest.decorators import run_in_subprocess_once
from byteb4rb1e.testing.pytest.fixtures import mock_system_site_package_dir
from byteb4rb1e.utils.urllib.request import PkgHandler
class TestPkgHandler:
"""
"""
@run_in_subprocess_once()
def test_text(self, mock_system_site_package_dir):
"""
"""
_opener: urllib.request.OpenerDirector = urllib.request.build_opener(
PkgHandler()
)
dummy_data = 'Hello'
pkg_dir = mock_system_site_package_dir('foobarpkg')
(pkg_dir / 'data.txt').write_text(dummy_data)
result = _opener.open('pkg://foobarpkg/data.txt').readline()
assert isinstance(result, str)
assert result == dummy_data
@run_in_subprocess_once()
def test_bytes(self, mock_system_site_package_dir):
"""
"""
_opener: urllib.request.OpenerDirector = urllib.request.build_opener(
PkgHandler()
)
dummy_data = b'foobar123'
pkg_dir = mock_system_site_package_dir('foobarpkg')
(pkg_dir / 'data.bin').write_bytes(dummy_data)
result = _opener.open('pkg://foobarpkg/data.bin').readline()
assert isinstance(result, bytes)
assert result == dummy_data
@run_in_subprocess_once()
def test_subdir(self, mock_system_site_package_dir):
"""
"""
_opener: urllib.request.OpenerDirector = urllib.request.build_opener(
PkgHandler()
)
dummy_data = 'foobar123'
pkg_dir = mock_system_site_package_dir('foobarpkg')
dummy_file = (pkg_dir / 'foo' / 'bar' / 'data.txt')
dummy_file.parent.mkdir(parents=True)
dummy_file.write_text(dummy_data)
result = _opener.open('pkg://foobarpkg/foo/bar/data.txt').readline()
assert result == dummy_data
@run_in_subprocess_once()
def test_nested_module(self, mock_system_site_package_dir):
"""
"""
_opener: urllib.request.OpenerDirector = urllib.request.build_opener(
PkgHandler()
)
dummy_data = 'foobar123'
pkg_dir = mock_system_site_package_dir('foo.bar.pkg')
dummy_file = (pkg_dir / 'dummy' / 'data.txt')
dummy_file.parent.mkdir(parents=True)
dummy_file.write_text(dummy_data)
result = _opener.open('pkg://foo.bar.pkg/dummy/data.txt').readline()
assert result == dummy_data


@ -1,56 +0,0 @@
import unittest
from byteb4rb1e_utils.string import (
ChunkedRollingHash,
ChunkedRollingHashOptions,
RollingHash,
)
class test___init__(unittest.TestCase):
"""ChunkedRollingHash.__init__()"""
def test_default(self):
"""default options"""
result = ChunkedRollingHash(b'abcdefgh')
self.assertEqual(result._mod, ChunkedRollingHashOptions.mod)
self.assertEqual(result._base, ChunkedRollingHashOptions.base)
self.assertEqual(result._max_chunk_size, ChunkedRollingHashOptions.max_chunk_size)
control_hash = RollingHash.compute_initial_hash(
b'abcdefgh',
base = result._base,
mod = result._mod
)
self.assertEqual(result._length, 8)
self.assertEqual(result._chunk_count, 1)
self.assertEqual(len(result._chunks_hash), result._chunk_count)
self.assertEqual(result._hash, control_hash)
self.assertEqual(result._chunks_hash[0], control_hash)
def test_override(self):
"""override of options"""
options = ChunkedRollingHashOptions(
mod = 4,
base = 10,
max_chunk_size = 5,
)
result = ChunkedRollingHash(b'abcdefgh', options)
self.assertEqual(result._mod, options.mod)
self.assertEqual(result._base, options.base)
self.assertEqual(result._max_chunk_size, options.max_chunk_size)
control_hash1 = RollingHash.compute_initial_hash(
b'abcde',
base = result._base,
mod = result._mod
)
control_hash2 = RollingHash.compute_initial_hash(
b'fgh',
base = result._base,
mod = result._mod
)
self.assertEqual(result._chunks_hash[0], control_hash1)
self.assertEqual(result._chunks_hash[1], control_hash2)

tox.ini Normal file

@ -0,0 +1,54 @@
[tox]
requires =
tox>=4.19
env_list =
unit-py3{9-13}
integration-py3{9-13}-pytest8
lint
format
[testenv]
deps =
.
[testenv:lint]
description = run type check on code base
labels = static
deps =
mypy
commands =
mypy src tests --junit-xml test-reports/{env_name}.xml
[testenv:audit]
description = audit dependencies for known vulnerabilities
labels = audit
deps =
pip-audit
commands =
pip-audit .
[testenv:format]
description = check code formatting
labels = static
deps =
autopep8
commands =
autopep8 --diff --exit-code src tests
[testenv:unit-py3{9-13}]
description = run unit tests
labels = unit
deps =
{[testenv]deps}
pytest
commands =
pytest tests/unit --junitxml=test-reports/{env_name}.xml
[testenv:integration-py3{9-13}-pytest8]
description = run pytest integration tests
labels = integration
deps =
{[testenv]deps}
pytest8: pytest>=8.0,<9.0
commands =
pytest tests/integration -m pytest --junitxml=test-reports/{env_name}.xml