Compare commits

1 commit

| Author | SHA1 | Message | Date |
|---|---|---|---|
| Rodney, Tiara | e5f0d1df58 | feat(string): init ChunkedRollingHash | 2025-05-06 16:39:44 +02:00 |
53 changed files with 3305 additions and 3867 deletions

2
.gitignore vendored

@ -11,5 +11,3 @@
/configure~
*.swo
*.swp
/test-reports/
/.tox/


@ -1,122 +0,0 @@
# Development
> All changes MUST follow the vendor/tiara-gitflow-spec.git, and work MUST NOT be
> started without a TODO issue.
## Prerequisites
- Python 3.9+
- [Pipenv](https://pipenv.pypa.io/)
- [tox](https://tox.wiki/) (installed via Pipenv dev dependencies)
- Node.js (for the `@byteb4rb1e/mime-todo` issue tracker CLI)
## Setup
Initialize Git submodules:
```bash
git submodule update --init --remote --recursive
```
Install dependencies (includes the package in editable mode):
```bash
pipenv install --dev
```
## Tooling
### Package
The project is packaged as `byteb4rb1e.utils` under a namespace package
layout (`src/byteb4rb1e/utils/`). It is installed in editable mode via
Pipenv.
Build a distribution:
```bash
pipenv run dist
```
### Testing
Tests are managed by tox. Test environments are defined in `tox.ini`:
```bash
# run all test suites
tox
# run specific environments
tox -e unit-py313
tox -e lint
tox -e format
```
| Environment | Purpose |
|---|---|
| `unit-py3{9-13}` | Unit tests |
| `smoke-py3{9-13}` | Smoke tests |
| `integration-py3{9-13}` | Integration tests |
| `lint` | Type checking (mypy) |
| `format` | Code style (autopep8) |
| `audit` | Dependency audit (pip-audit) |
### Issue tracker
Issues are tracked in the `TODO` file using the
[MIME TODO](https://specs.code.tiararodney.com/mime-todo/) format. Use the
`@byteb4rb1e/mime-todo` CLI to interact with it:
```bash
# list issues
npx @byteb4rb1e/mime-todo list
# show a specific issue
npx @byteb4rb1e/mime-todo show 3
# create an issue
npx @byteb4rb1e/mime-todo create --type feature --title "Title" --plan "Description" --module homeostat
```
See [CONTRIBUTING.md](CONTRIBUTING.md) for the full issue lifecycle.
### Publishing
Build wheel and source distributions:
```sh
pipenv run sdist
```
Configure publishing options:
`~/.pypirc`
```
[distutils]
index-servers =
tiararodney
[tiararodney]
repository: https://pypi.code.tiararodney.com/root/byteb4rb1e/
username: <username>
password: <password>
```
Publish to pypi.code.tiararodney.com:
```sh
pipenv run sdist:publish:tiararodney
```
## Project layout
```
src/byteb4rb1e/utils/ # package source
tests/ # test suites (unit/, smoke/, integration/)
vendor/ # vendored specs
dist/ # sdist and wheel build output
DEVELOPMENT.md # this file
TODO # issue tracker (MIME TODO format)
```


24
Makefile Normal file

@ -0,0 +1,24 @@
.PHONY: chore configure

chore: configure Pipfile.lock requirements-dev.txt

Pipfile.lock: .venv Pipfile
	.venv/bin/pipenv lock

requirements-dev.txt: .venv Pipfile.lock
	.venv/bin/pipenv requirements --dev-only > requirements-dev.txt

configure: configure.ac
	autoconf

.venv: requirements-dev.txt
	python3 -m venv .venv
	.venv/bin/python3 -m pip install --upgrade pip
	.venv/bin/pip install -r requirements-dev.txt

test-reports:
	.venv/bin/python3 -m unittest discover -v

build: .venv/bin/pipenv
	.venv/bin/pipenv run build

88
NOTES Normal file

@ -0,0 +1,88 @@
These are just a couple of brain farts that came up and I'd rather note down.
There's no clear structure.
RFC 1341 Boundary Matching in a Circular Buffer
1. Algorithm Considerations
Knuth-Morris-Pratt (KMP) Limitations:
Useful when patterns have prefix-suffix overlaps for efficient skipping.
If the failure table consists only of zeros, KMP provides no speed advantage
over naive searching.
Boundary pattern is arbitrary, meaning KMP's preprocessing may not be
beneficial.
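
To make the failure-table point concrete, here is a minimal sketch of the standard KMP prefix-function computation. The helper name `failure_table` and the sample patterns are illustrative assumptions, not code from this repository.

```python
def failure_table(pattern: bytes) -> list[int]:
    """Return the KMP failure table: for each index, the length of the
    longest proper prefix of the pattern that is also a suffix ending there."""
    table = [0] * len(pattern)
    k = 0  # length of the prefix matched so far
    for i in range(1, len(pattern)):
        # fall back through shorter prefixes until one can be extended
        while k > 0 and pattern[i] != pattern[k]:
            k = table[k - 1]
        if pattern[i] == pattern[k]:
            k += 1
        table[i] = k
    return table


# No prefix-suffix overlap: the table is all zeros, so KMP cannot skip.
print(failure_table(b"abcdefg"))  # [0, 0, 0, 0, 0, 0, 0]
# Repeated prefix: non-zero entries are what allow KMP to skip ahead.
print(failure_table(b"abab"))     # [0, 0, 1, 2]
```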
Alternatives to KMP:
Rabin-Karp rolling hash → Uses fast hash comparisons instead of
character-by-character matching.
Boyer-Moore-Horspool → Precomputes skip distances to avoid redundant
comparisons, works well for longer patterns.
Crochemore-Perrin two-way search → used by str.find(), flexible
but assumes a linear memory layout so not really applicable for my circular
buffer approach
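
As a rough illustration of the Rabin-Karp idea, the sketch below hashes a window once and then updates the hash in constant time as the window slides by one byte. The constants `BASE` and `MOD` and the helper names `poly_hash` and `roll` are assumptions chosen for the example, not values taken from this project.

```python
BASE = 256             # assumed radix: one digit per byte
MOD = 1_000_000_007    # assumed large prime modulus


def poly_hash(data: bytes) -> int:
    """Polynomial hash of a byte string."""
    h = 0
    for byte in data:
        h = (h * BASE + byte) % MOD
    return h


def roll(h: int, out_byte: int, in_byte: int, high: int) -> int:
    """Slide the window one byte: drop out_byte on the left, append in_byte.

    high must equal BASE ** (window_length - 1) % MOD.
    """
    h = (h - out_byte * high) % MOD
    return (h * BASE + in_byte) % MOD


data = b"hello world"
n = 5
high = pow(BASE, n - 1, MOD)
h = poly_hash(data[:n])
for i in range(1, len(data) - n + 1):
    h = roll(h, data[i - 1], data[i + n - 1], high)
    # the rolled hash always equals a hash computed from scratch
    assert h == poly_hash(data[i:i + n])
```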
2. Boundary Characteristics
Max length: 70 bytes. Character set: ASCII only. No structure guarantees: The
boundary is client-defined, so I must be able to handle arbitrary sequences.
3. Algorithm Selection
Rolling Hash → Best for arbitrary short-to-medium patterns in a circular buffer.
Boyer-Moore → Ideal if the boundary has distinct character distributions to
optimize skipping.
# Optimized Chunk-Based Rolling Hash Matching
We need to efficiently detect an RFC 1341 multipart boundary inside a circular
buffer, ensuring minimal overhead while avoiding unnecessary comparisons.
Traditional approaches like Knuth-Morris-Pratt (KMP) don't provide an advantage
when the boundary lacks repeated subpatterns. Meanwhile, full rolling hash
matching scans every byte, which can be wasteful.
Thus, we introduce a chunk-wise hash-based skipping strategy, allowing us to
skip large sections of the buffer when an early non-match is detected.
## Core Idea
- Precompute hashes for evenly sized chunks of the boundary.
- First, match only the hash of the first chunk → immediately skip unnecessary
buffer sections if no match.
- If the first chunk matches, progressively verify subsequent chunks until the
full boundary is confirmed.
## Benefits Over Full Matching
- Reduces comparisons significantly → eliminates large sections early when
non-matches occur.
- Balances preprocessing cost vs runtime → faster
elimination means fewer wasted cycles.
- Integrates seamlessly into circular buffers → allows skipping intelligently.
### Precompute Chunk Hashes
- Divide the pattern into `N` equal-sized chunks (e.g., 7 chunks of 10 bytes
for a 70-byte boundary).
- Compute a rolling hash for each chunk in addition to the full pattern, storing
them for quick lookup.
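
The precomputation step could look roughly like the sketch below. The hash parameters and the names `poly_hash` and `chunk_hashes` are assumptions for illustration. It also assumes the last chunk may be shorter when the boundary length is not a multiple of the chunk size, which the notes above do not specify.

```python
BASE = 256             # assumed radix
MOD = 1_000_000_007    # assumed prime modulus


def poly_hash(data: bytes) -> int:
    """Polynomial hash of a byte string."""
    h = 0
    for byte in data:
        h = (h * BASE + byte) % MOD
    return h


def chunk_hashes(pattern: bytes, chunk_size: int) -> list[int]:
    """Hash every chunk_size-byte slice of the pattern, in order.

    Assumption: a trailing remainder becomes a shorter final chunk.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [
        poly_hash(pattern[i:i + chunk_size])
        for i in range(0, len(pattern), chunk_size)
    ]


# A made-up 70-byte ASCII boundary, split into 7 chunks of 10 bytes.
boundary = bytes(range(48, 118))
hashes = chunk_hashes(boundary, 10)
full_hash = poly_hash(boundary)
print(len(hashes))  # 7
```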
### Sliding Window Search in the Buffer
- Compute the rolling hash for each window of size `chunk_size`.
- Compare the first chunk's hash with the hash of the buffer window.
- If no match, skip `boundary_length - chunk_size` bytes.
### Progressive Chunk Verification
- If the first chunk matches, verify the next chunk sequentially.
- Continue matching chunks until the full boundary is confirmed.
- Perform final character-by-character validation to rule out hash collisions.
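
Put together, the search and verification steps might be sketched as follows. This is a simplified illustration under stated assumptions, not the `ChunkedRollingHash` implementation. The ring is modelled as a `bytes` object plus a `head` offset. The names `find_boundary` and `ring_slice` are hypothetical. The window advances one byte at a time and does not implement the skip step, which keeps the sketch easy to verify.

```python
BASE = 256             # assumed radix
MOD = 1_000_000_007    # assumed prime modulus


def poly_hash(data: bytes) -> int:
    h = 0
    for byte in data:
        h = (h * BASE + byte) % MOD
    return h


def ring_slice(ring: bytes, head: int, start: int, length: int) -> bytes:
    """Copy `length` bytes from logical offset `start`, wrapping around."""
    cap = len(ring)
    return bytes(ring[(head + start + k) % cap] for k in range(length))


def find_boundary(ring: bytes, head: int, size: int,
                  pattern: bytes, chunk_size: int) -> int:
    """Return the logical offset of `pattern` in the ring, or -1."""
    n = len(pattern)
    if n == 0 or chunk_size <= 0 or n > size:
        return -1
    chunk_size = min(chunk_size, n)
    chunks = [pattern[i:i + chunk_size] for i in range(0, n, chunk_size)]
    hashes = [poly_hash(c) for c in chunks]
    high = pow(BASE, chunk_size - 1, MOD)
    cap = len(ring)

    window = poly_hash(ring_slice(ring, head, 0, chunk_size))
    for pos in range(size - n + 1):
        if pos > 0:
            # roll the first-chunk window forward by one byte
            out_byte = ring[(head + pos - 1) % cap]
            in_byte = ring[(head + pos + chunk_size - 1) % cap]
            window = ((window - out_byte * high) * BASE + in_byte) % MOD
        if window != hashes[0]:
            continue
        # first chunk matched: verify the remaining chunks in order
        offset, ok = chunk_size, True
        for chunk, expected in zip(chunks[1:], hashes[1:]):
            if poly_hash(ring_slice(ring, head, pos + offset, len(chunk))) != expected:
                ok = False
                break
            offset += len(chunk)
        # final byte-wise comparison rules out hash collisions
        if ok and ring_slice(ring, head, pos, n) == pattern:
            return pos
    return -1


# Logical content rotated so that the match wraps around the ring's end.
logical = b"....--boundary--tail"
head = 13
ring = bytes(logical[(j - head) % len(logical)] for j in range(len(logical)))
print(find_boundary(ring, head, len(ring), b"--boundary", 4))  # 4
```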

20
Pipfile

@ -4,25 +4,17 @@ verify_ssl = true
name = "pypi"
[dev-packages]
mypy = "~=1.15.0"
autopep8 = "~=2.3.2"
setuptools-scm = "~=8.2.0"
pylint = "~=3.3.6"
build = "*"
pipenv = "*"
tox = "*"
twine = "*"
pypi-attestations = "*"
autopep8 = "*"
byteb4rb1e-utils = { editable = true, path = '.'}
[requires]
python_version = "3"
python_version = "3.11"
[scripts]
"dist" = "python3 -m build"
"dist:attestations" = "python3 -m pypi_attestations sign dist/*"
"dist:publish:tiararodney" = "python3 -m twine upload --sign --repository tiararodney dist/*"
"test" = "tox"
"test:static" = "tox run -m static"
"test:unit" = "tox run -m unit"
"test:integration" = "tox run -m integration"
"build" = "python3 -m build"
[packages]
"byteb4rb1e.utils" = {file = ".", editable = true}

948
Pipfile.lock generated

File diff suppressed because it is too large

302
TODO

@ -1,52 +1,91 @@
--ISSUE
Content-Type: application/sprints
Sprints:
# TODO List for esm-logging
This is a poor-man's issue tracker. I am not primarily a GitHub user so don't
want to commit to their issue tracking feature, but my primary VCS service
provider (Bitbucket) only offers paid integration into their issue tracker
(Jira). I don't have the time (and patience) at the moment to analyze the best
approach, so this file will have to suffice.
It's a very simple concept: Track any issues (features, bugfixes, hotfixes) in
here, assign a sequential number to it and use that number when branching.
I will try to develop a format so that I can parse the file later on, should I
decide to migrate to a real issue tracker. It's probably going to be Bugzilla,
but for that my html-theme-ref project needs to stabilize first.
## Format Specification
The file uses Markdown conventions for formatting headers and other text block
entities, but SHOULD NOT be considered a Markdown file. That's why it has no
definitive file extension.
Each issue entry follows a structured format for easier parsing and future
migration. Issues MUST be **appended** to this file and never moved, to
preserve Git diffing.
### Issue Format
```
ID: [ISSUE-NUMBER]
Type: [feature/bugfix/hotfix]
Title: [Short title]
Status: [open/in-progress/done/hold/cancelled]
Priority: [low/medium/high]
Created: [YYYY-MM-DD]
Description: [Detailed explanation]
---
```
- ISSUE-NUMBERs must be sequential
- wrapped description lines must be indented so that every line starts at the
same column
- issues must be started with two LF
- issues must be terminated with two LF, then `---`
- issues may have a free-text field (epilog), which must be started with two LF.
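
A parser for a single entry in this format might look like the sketch below. It is an illustration only: `parse_issue` is a hypothetical helper, and it assumes that indented lines continue the previous field and that the first blank line after the fields ends them (so the epilog and the `---` terminator are not parsed).

```python
def parse_issue(entry: str) -> dict[str, str]:
    """Parse the 'Key: value' fields of one issue entry into a dict."""
    fields: dict[str, str] = {}
    current = None
    for line in entry.splitlines():
        stripped = line.strip()
        if stripped == "---":
            break
        if not stripped:
            if fields:
                break      # blank line after the fields: stop here
            continue       # leading blank lines before the entry
        if line[0].isspace() and current is not None:
            # indented continuation of the previous field
            fields[current] = (fields[current] + " " + stripped).strip()
            continue
        name, sep, value = line.partition(":")
        if not sep:
            raise ValueError(f"not a 'Key: value' line: {line!r}")
        current = name.strip()
        fields[current] = value.strip()
    return fields


entry = """

ID: 2
Type: feature
Title: implement circular buffer
Status: done
Priority: high
Created: 2025-05-04
Description: implement a simple
             circular buffer

---
"""
issue = parse_issue(entry)
print(issue["ID"], issue["Status"])  # 2 done
```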
## Issues
--ISSUE
Content-Type: application/issue
ID: 1
Type: feature
Title: implement KMP algorithm for string searching
Status: hold
Priority: high
Created: 2025-05-03
Relationships:
Description: Implement the Knuth-Morris-Pratt algorithm for string searching.
I require this for matching RFC 9112 boundaries of entities against
a circular buffer.
--ISSUE
Content-Type: application/issue
---
ID: 2
Type: feature
Title: implement circular buffer
Status: done
Priority: high
Created: 2025-05-04
Relationships:
Description: implement a simple circular buffer
--ISSUE
Content-Type: application/issue
---
ID: 3
Type: bugfix
Title: move unit tests to subdirectory
Status: done
Priority: high
Created: 2025-05-04
Relationships:
Description: move the unit test suites to a unit/ subdirectory so that
integration tests and benchmarks can be cleanly separated
--ISSUE
Content-Type: application/issue
---
ID: 4
Type: feature
Title: implement Rabin-Karp rolling hash algorithm
Status: done
Priority: high
Created: 2025-05-05
Relationships:
Description: After testing a couple of string search algorithms, I've ditched
the idea of using KMP as my use-case gives no advantage compared to
naive searching. In addition I've come upon the challenge that many
@ -58,242 +97,15 @@ Description: After testing a couple of string search algorithms, I've ditched
need an implementation of the original Rabin-Karp rolling hash
algorithm
--ISSUE
Content-Type: application/issue
---
ID: 5
Type: feature
Title: implement chunked rolling hash algorithm
Status: in-progress
Priority: high
Created: 2025-05-05
Relationships:
Description: Implement my custom algorithm for doing rolling hash string search
against a fixed length ring buffer
--ISSUE
Content-Type: application/issue
ID: 6
Type: feature
Title: implement importlib.resources handler for urllib
Status: done
Priority: high
Created: 2025-06-20
Relationships:
Description: A handler that can be registered with an urllib.request
OpenerDirector to open importlib.resources package files.
--ISSUE
Content-Type: application/issue
ID: 7
Type: feature
Title: setup advanced testing environment
Status: done
Priority: high
Created: 2025-06-20
Relationships:
Description: copy the testing environment setup from
byteb4rb1e.sphinxcontrib.ext
--ISSUE
Content-Type: application/issue
ID: 8
Type: bugfix
Title: rename package
Status: done
Priority: high
Created: 2025-06-20
Relationships:
Description: use dot namespaces to make the package a little more elegant
--ISSUE
Content-Type: application/issue
ID: 9
Type: bugfix
Title: fix LICENSE reference
Status: done
Priority: high
Created: 2025-06-20
Relationships:
Description: license specification is no longer a trove classifier in
pyproject.toml, hence the reference to LICENSE must be changed
--ISSUE
Content-Type: application/issue
ID: 10
Type: feature
Title: pytest current test context fixtures
Status: done
Priority: high
Created: 2025-06-20
Relationships:
Description: add fixtures for doing things in relation to the active testing
context
--ISSUE
Content-Type: application/issue
ID: 11
Type: bugfix
Title: move testing utils out of utils
Status: done
Priority: high
Created: 2025-06-20
Relationships:
Description: to shorten the namespace and also indicate that testing utilities
are different from regular utilities
--ISSUE
Content-Type: application/issue
ID: 12
Type: feature
Title: simplify testing.fixtures.mock_pkg
Status: done
Priority: high
Created: 2025-06-21
Relationships:
Description: Only bootstrap a package mock with the minimum requirements for a
Python module and let the consumer handle the directory layout.
--ISSUE
Content-Type: application/issue
ID: 13
Type: bugfix
Title: fix unit tests for urllib PkgHandler
Status: done
Priority: high
Created: 2025-06-21
Relationships:
Description: change of issue 12 wasn't properly reflected in urllib PkgHandler
unit tests
--ISSUE
Content-Type: application/issue
ID: 14
Type: feature
Title: add compression support for urllib PkgHandler
Status: done
Priority: high
Created: 2025-06-21
Relationships:
Description: with a proper content-type of the PkgHandler addinfourl object, a
consumer can determine whether the file is compressed or not.
--ISSUE
Content-Type: application/issue
ID: 15
Type: bugfix
Title: modularize module containers
Status: open
Priority: high
Created: 2025-06-28
Relationships:
Description: Even though importlib can find submodules through traversing paths
instead of relying on __init__.py for every ancestor module, this
is not supported by some modules like sphinx.ext.autosummary
--ISSUE
Content-Type: application/issue
ID: 16
Type: feature
Title: SQL-aware dataclass
Status: in-progress
Priority: low
Created: 2025-12-31
Relationships:
Description: A dataclass that transparently maps onto an SQL datastore, with
command generation for syncing data between data class and store
--ISSUE
Content-Type: application/issue
ID: 17
Type: feature
Title: recursive-descent HTML (DOM) parser
Status: in-progress
Priority: high
Created: 2025-12-31
Relationships:
Description: Extend the built-in event-driven parser to be modeled after DOM
recursive-descent HTML parser
--ISSUE
Content-Type: application/issue
ID: 18
Type: feature
Title: implement saas wrapper for Forgejo
Status: done
Priority: medium
Created: 2026-06-06
Relationships:
Description: Add a new sub-package byteb4rb1e.utils.saas.forgejo, supporting the
same/similar operations as the Bitbucket wrapper
(byteb4rb1e.utils.saas.bitbucket) against the Forgejo REST API:
token-based authentication headers, repository existence checks,
repository creation within an owner/organization, and clone URL
construction. Implement as a thin layer over
byteb4rb1e.utils.http.client, consistent with the existing
Bitbucket and GitHub modules.
Unlike Bitbucket (one global SaaS instance, hence the hardcoded
api.bitbucket.org), Forgejo is self-hosted (e.g.
git.code.tiararodney.com). The wrapper MUST take a host/instance
URL parameter (or read one from config) rather than baking any
specific instance in. This is the biggest API-surface difference
from the bitbucket module.
Bitbucket's clone_url constructs SSH only. Forgejo's repository
API returns both clone_url (HTTPS) and ssh_url, and HTTPS is
needed in CI (no SSH host keys on the Woodpecker runner). The
wrapper SHOULD expose both, either as ssh_clone_url and
https_clone_url, or a single clone_url(..., scheme="ssh"|"https").
--ISSUE
Content-Type: application/issue
ID: 19
Type: feature
Title: config framework with CLI integration
Status: done
Priority: medium
Created: 2026-06-06
Relationships:
Description: Add byteb4rb1e.utils.config: INI-backed config dataclasses where a
dataclass is the single source of truth for settings, with three
layers (field defaults, INI file sections, CLI overrides). Includes
INI loading/writing (load_ini, ensure_ini, ensure_ini_multi,
format_section), per-flag CLI integration (add_config_arguments,
apply_cli_overrides), dotted-path overrides via a unified --config
KEY=VALUE flag (apply_overrides, format_help), and the companion
argparse KeyValueAction (byteb4rb1e.utils.argparse.actions) that
accumulates KEY=VALUE pairs into a dict.
--ISSUE
Content-Type: application/issue
ID: 20
Type: feature
Title: cookie-persisting HTTP session client
Status: done
Priority: medium
Created: 2026-06-06
Relationships:
Description: Extend byteb4rb1e.utils.http.client with an HttpSession class that
persists cookies across requests via http.cookiejar (suitable for
login followed by cookie-authenticated page fetches), supporting
GET with query params, form-encoded POST, default/per-request
header merging, and HTTPError-to-response conversion. Also refactor
HttpResponse into a frozen dataclass with text as a derived
property.
--ISSUE
Content-Type: application/issue
ID: 21
Type: feature
Title: relax host restriction in vcs.git parse_base_url and parse_repo_name
Status: done
Priority: high
Created: 2026-06-06
Relationships:
Description: Both byteb4rb1e.utils.vcs.git.parse_base_url and parse_repo_name
currently hard-reject any URL whose host is not exactly
'bitbucket.org' with a ValueError. The check predates the
multi-SaaS world (it dates back to when bootstrapping required the
Bitbucket API). With the new forgejo saas wrapper (#18) in place,
downstream consumers (specifically sphinxcontrib.h5p.utils.pkg
#105) now feed Forgejo-shaped URLs like
'git@git.code.tiararodney.com:h5p-mirror/foo.git' through these
helpers and hit the restriction.
---

2663
configure vendored Normal file

File diff suppressed because it is too large

27
configure.ac Normal file

@ -0,0 +1,27 @@
AC_INIT
AC_CHECK_PROGS([MAKE], [make], [no])
AS_IF([test "$MAKE" = "no"],
[AC_MSG_NOTICE([without GNU Make, you have to inspect 'Makefile' and deduce build targets yourself.])])
AC_CHECK_PROGS([GIT], [git], [no])
AS_IF([test "$GIT" = "no"],
[AC_MSG_ERROR([install Git before continuing.])])
AC_CHECK_PROGS([PYTHON3], [python3], [no])
AS_IF([test "$PYTHON3" = "no"],
[AC_MSG_ERROR([install Python 3 before continuing.])])
# required in Makefile to ensure proper path resolution during preprocessing
# realpath is not available on macOS
AC_CHECK_PROGS([REALPATH], [realpath], [no])
AS_IF([test "$REALPATH" = "no"],
[AC_MSG_ERROR([set a persistent alias for 'realpath' before continuing, e.g.
alias realpath='python3 -c "import pathlib,sys;print(pathlib.Path(sys.argv[[1]]).resolve())"'
])])
AC_MSG_NOTICE([initializing python3 venv...])
make .venv
AC_OUTPUT


@ -7,12 +7,12 @@ requires = [
build-backend = "setuptools.build_meta"
[project]
name = "byteb4rb1e.utils"
name = "byteb4rb1e-utils"
description = "personal utilities and helpers"
authors = [
{ name = "Tiara Rodney", email = "tiara.rodney@byteb4rb1e.me" }
{ name = "Tiara Rodney", email = "tiara.rodney@administratrix.de" }
]
license-files = ["LICENSE"]
license = { file = "LICENSE" }
readme = "README.md"
classifiers = [
"Development Status :: 1 - Planning",
@ -48,6 +48,7 @@ strict = true
max_line_length = 80
aggressive = 3
recursive = true
in-place = true
[tool.setuptools_scm]

25
requirements-dev.txt Normal file

@ -0,0 +1,25 @@
-i https://pypi.org/simple
astroid==3.3.9; python_full_version >= '3.9.0'
autopep8==2.3.2; python_version >= '3.9'
build==1.2.2.post1; python_version >= '3.8'
-e .
certifi==2025.4.26; python_version >= '3.6'
colorama==0.4.6; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5, 3.6'
dill==0.4.0; python_version >= '3.8'
distlib==0.3.9
filelock==3.18.0; python_version >= '3.9'
isort==6.0.1; python_full_version >= '3.9.0'
mccabe==0.7.0; python_version >= '3.6'
mypy==1.15.0; python_version >= '3.9'
mypy-extensions==1.1.0; python_version >= '3.8'
packaging==25.0; python_version >= '3.8'
pipenv==2025.0.2; python_version >= '3.9'
platformdirs==4.3.7; python_version >= '3.9'
pycodestyle==2.13.0; python_version >= '3.9'
pylint==3.3.6; python_full_version >= '3.9.0'
pyproject-hooks==1.2.0; python_version >= '3.7'
setuptools==80.3.0; python_version >= '3.9'
setuptools-scm==8.2.0; python_version >= '3.8'
tomlkit==0.13.2; python_version >= '3.8'
typing-extensions==4.13.2; python_version >= '3.8'
virtualenv==20.30.0; python_version >= '3.8'


@ -1,14 +0,0 @@
import os
from pathlib import Path
from typing import Tuple


def get_current_test() -> Tuple[Path, str]:
    current_test_env = os.getenv("PYTEST_CURRENT_TEST")
    if current_test_env is None:
        raise RuntimeError("PYTEST_CURRENT_TEST not set. Must be run under pytest.")

    suite_path, case_name = current_test_env.split('::', 1)
    case_name = case_name.split(' ', 1)[0]
    return Path(suite_path).resolve(), case_name


@ -1,47 +0,0 @@
from functools import wraps
from pathlib import Path
import os
import subprocess
import sys

from byteb4rb1e.testing.pytest import get_current_test


def run_in_subprocess_once():
    """
    A decorator that reruns the test in a subprocess if not already inside one.

    Requires pytest to be installed and the test to be run by pytest.

    For what? Anything that can't be done in a thread-safe manner, e.g. modifying PYTHON_PATH
    """
    def decorator(test_func):
        @wraps(test_func)
        def wrapper(*args, **kwargs):
            if os.environ.get("XPYTEST_INSIDE_SUBPROCESS") == "1":
                return test_func(*args, **kwargs)

            suite_path, case_name = get_current_test()
            cmd = [
                sys.executable,
                "-m", "pytest",
                f"{suite_path}::{case_name}",
            ]
            result = subprocess.run(
                cmd,
                env={**os.environ, "XPYTEST_INSIDE_SUBPROCESS": "1"},
                capture_output=True,
                text=True,
            )

            if result.returncode != 0:
                print(' '.join(cmd))
                print("==== Subprocess stdout ====")
                print(result.stdout)
                print("==== Subprocess stderr ====")
                print(result.stderr)
                raise AssertionError(f"Subprocess test failed with exit code {result.returncode}")
        return wrapper
    return decorator


@ -1,44 +0,0 @@
import os
from pathlib import Path
import sys
from typing import Dict, Tuple, Union
import pytest
from byteb4rb1e.testing.pytest import get_current_test
_SITE_PACKAGE_COUNTER: Dict[str, int] = {}
@pytest.fixture
def current_test() -> Tuple[Path, str]:
"""
"""
return get_current_test()
@pytest.fixture
def mock_system_site_package_dir(tmp_path):
global _SITE_PACKAGE_COUNTER
package_id = _SITE_PACKAGE_COUNTER.setdefault(tmp_path, 0)
_SITE_PACKAGE_COUNTER[tmp_path] += 1
sys_path = tmp_path / str(package_id)
def _create(name: str) -> Path:
pkg_path = sys_path / name.replace('.', os.path.sep)
pkg_path.mkdir(parents=True)
(pkg_path / "__init__.py").touch()
sys.path.insert(0, str(sys_path))
return pkg_path
yield _create
# cleanup sys.path after test
if str(sys_path) in sys.path:
sys.path.remove(str(sys_path))


@ -1,7 +0,0 @@
"""Utilities for building composable CLIs from command dataclasses."""
from byteb4rb1e.utils.argparse.actions import KeyValueAction
from byteb4rb1e.utils.argparse.command import CLICommand
from byteb4rb1e.utils.argparse.dispatcher import CLI
__all__ = ["CLI", "CLICommand", "KeyValueAction"]


@ -1,33 +0,0 @@
"""Custom argparse actions."""

from __future__ import annotations

import argparse
from typing import Any


class KeyValueAction(argparse.Action):
    """Argparse action that accumulates ``KEY=VALUE`` pairs into a dict.

    Usage::

        parser.add_argument("--config", action=KeyValueAction,
                            default={}, metavar="KEY=VALUE",
                            help="Set a config option (can be repeated)")

    Then ``args.config`` is a ``dict[str, str]``.
    """

    def __call__(
        self,
        parser: argparse.ArgumentParser,
        namespace: argparse.Namespace,
        values: Any,
        option_string: str | None = None,
    ) -> None:
        d = getattr(namespace, self.dest, None) or {}
        if "=" not in values:
            parser.error(f"Invalid format: {values!r} (expected KEY=VALUE)")
        key, _, value = values.partition("=")
        d[key.strip()] = value.strip()
        setattr(namespace, self.dest, d)


@ -1,54 +0,0 @@
"""Base command dataclass for composable CLI trees."""

from __future__ import annotations

from argparse import ArgumentParser
from dataclasses import dataclass, fields
from typing import Any, ClassVar, Dict, List, Optional, Type


@dataclass
class CLICommand:
    """Base class for CLI commands.

    Subclasses define their identity (name, help, description) as
    dataclass fields. These are passed as kwargs to
    ``subparsers.add_parser()``.

    Override ``add_arguments`` to register flags and positionals.
    Override ``execute`` to implement the command's logic.
    Nest subcommands by setting ``_subcommands`` as a class variable.
    """

    name: str = ""
    help: str = ""
    description: str = ""

    _subcommands: ClassVar[List[Type[CLICommand]]] = []

    def add_arguments(self, parser: ArgumentParser) -> None:
        """Add arguments to the parser. Override in subclasses."""

    def execute(self, args: Any) -> int:
        """Run the command. Override in subclasses.

        Returns an exit code (0 = success).
        """
        return 0

    def parser_kwargs(self) -> Dict[str, Any]:
        """Return the dataclass fields as kwargs for add_parser.

        Excludes ``name`` (used as the positional parser name) and
        any empty-string fields so argparse defaults apply.
        """
        skip = {"name"}
        kwargs = {}
        for f in fields(self):
            if f.name in skip or f.name.startswith("_"):
                continue
            val = getattr(self, f.name)
            if val != "":
                kwargs[f.name] = val
        return kwargs


@ -1,122 +0,0 @@
"""CLI dispatcher — builds parser trees from command dataclasses."""
from __future__ import annotations
import logging
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from typing import Any, Dict, List, Optional, Type
from byteb4rb1e.utils.argparse.command import CLICommand
class CLI:
"""Composable CLI built from a tree of Command dataclasses.
Recursively bootstraps an argparse parser hierarchy and tracks
dest names so ``run()`` can dispatch to the correct leaf command
without dest chaining in the caller.
Usage::
cli = CLI(prog="repository", description="...")
cli.bootstrap([MirrorCommand, IndexCommand])
cli.run()
"""
def __init__(
self,
prog: Optional[str] = None,
description: str = "",
) -> None:
kwargs = {} # type: Dict[str, Any]
if prog:
kwargs["prog"] = prog
if description:
kwargs["description"] = description
kwargs.setdefault(
"formatter_class", ArgumentDefaultsHelpFormatter,
)
self.parser = ArgumentParser(**kwargs)
self._dests = [] # type: List[str]
self._commands = {} # type: Dict[str, CLICommand]
def add_arguments(self, parser: ArgumentParser) -> None:
"""Add global arguments to the root parser."""
parser.add_argument(
"-v", "--verbose", action="count", default=0,
help="Increase verbosity (-v for INFO, -vv for DEBUG)",
)
def bootstrap(
self,
commands: List[Type[CLICommand]],
) -> None:
"""Build the parser tree from a list of top-level commands."""
self.add_arguments(self.parser)
dest = "command"
self._dests.append(dest)
sub = self.parser.add_subparsers(dest=dest)
for cmd_cls in commands:
self._add(sub, cmd_cls, prefix="")
def _add(
self,
subparsers: Any,
cmd_cls: Type[CLICommand],
prefix: str,
) -> None:
"""Recursively add a command and its subcommands."""
cmd = cmd_cls()
parser = subparsers.add_parser(
cmd.name,
formatter_class=ArgumentDefaultsHelpFormatter,
**cmd.parser_kwargs(),
)
cmd.add_arguments(parser)
key = "%s.%s" % (prefix, cmd.name) if prefix else cmd.name
self._commands[key] = cmd
if cmd._subcommands:
dest = "%s_command" % cmd.name
self._dests.append(dest)
child_sub = parser.add_subparsers(dest=dest)
for sc_cls in cmd._subcommands:
self._add(child_sub, sc_cls, prefix=key)
def _resolve(self, args: Any) -> Optional[CLICommand]:
"""Walk dest chain to find the leaf command."""
parts = [] # type: List[str]
for dest in self._dests:
val = getattr(args, dest, None)
if val is None:
continue
parts.append(val)
if not parts:
return None
key = ".".join(parts)
return self._commands.get(key)
@staticmethod
def _setup_logging(verbosity: int) -> None:
if verbosity >= 2:
level = logging.DEBUG
elif verbosity >= 1:
level = logging.INFO
else:
level = logging.WARNING
logging.basicConfig(
level=level,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler()],
)
def run(self) -> None:
"""Parse args and dispatch to the leaf command."""
args = self.parser.parse_args()
self._setup_logging(getattr(args, "verbose", 0))
cmd = self._resolve(args)
if cmd is None:
self.parser.print_help()
raise SystemExit(1)
raise SystemExit(cmd.execute(args))


@ -1,369 +0,0 @@
"""Config framework — INI-backed dataclasses with CLI integration.
A config dataclass is the single source of truth for settings. Values
come from three layers (later wins):
1. Dataclass field defaults
2. INI file sections
3. CLI overrides (via argparse flags or ``--config KEY=VALUE``)
Two CLI integration styles:
- ``add_config_arguments`` generates one ``--flag`` per field.
- ``apply_overrides`` accepts a ``dict[str, str]`` of dotted-path
overrides from a unified ``--config KEY=VALUE`` flag.
"""
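
The three-layer precedence described in the docstring above can be illustrated with a small standalone sketch. It deliberately does not use this module's API: `ServerConfig`, `layered`, and the `server` section name are hypothetical, and only the standard library is involved.

```python
import configparser
from dataclasses import dataclass, fields


@dataclass
class ServerConfig:  # hypothetical config dataclass
    host: str = "localhost"   # layer 1: field defaults
    port: int = 8080


def layered(ini_text: str, overrides: dict[str, str]) -> ServerConfig:
    """Build a config where INI values beat defaults and overrides beat INI."""
    parser = configparser.ConfigParser()
    parser.read_string(ini_text)
    values: dict[str, str] = {}
    if parser.has_section("server"):
        values.update(parser.items("server"))   # layer 2: INI section
    values.update(overrides)                    # layer 3: CLI overrides
    # coerce each raw string with the annotated field type (str or int here)
    types = {f.name: f.type for f in fields(ServerConfig)}
    return ServerConfig(**{key: types[key](raw) for key, raw in values.items()})


ini = "[server]\nport = 9000\n"
print(layered(ini, {}))                  # port comes from the INI file
print(layered(ini, {"port": "9443"}))    # the override wins over the INI file
```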
import configparser
from argparse import ArgumentParser, Namespace
from dataclasses import MISSING, fields
from pathlib import Path
from typing import Any, Type, TypeVar, get_type_hints
T = TypeVar("T")
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _parse_bool(value: str) -> bool:
"""Parse a boolean from INI/CLI string."""
return value.lower() in ("true", "yes", "1", "on")
_TYPE_MAP = {
int: int,
float: float,
str: str,
bool: _parse_bool,
}
def resolve_hints(cls: Type) -> dict[str, type]:
"""Resolve type hints for a dataclass, handling both evaluated
and string annotations.
:param cls: a dataclass class.
:returns: dict mapping field names to resolved types.
"""
try:
return get_type_hints(cls)
except Exception:
return {
f.name: f.type if isinstance(f.type, type)
else str
for f in fields(cls)
}
def _section_name(cls: Type, section: str | None = None) -> str:
"""Derive INI section name from class name if not provided."""
if section is not None:
return section
name = cls.__name__
if name.endswith("Config"):
name = name[: -len("Config")]
return name.lower()
# ---------------------------------------------------------------------------
# INI loading
# ---------------------------------------------------------------------------
def load_ini(
cls: Type[T],
path: Path,
section: str | None = None,
) -> T:
"""Load a config dataclass from an INI file.
If *section* is not given, the dataclass name (lowercased,
without trailing "Config") is used.
Unknown keys in the INI file raise ValueError. Missing keys
use the dataclass default.
"""
section = _section_name(cls, section)
parser = configparser.ConfigParser(
comment_prefixes=("#", ";"),
inline_comment_prefixes=("#", ";"),
)
parser.read(path)
if not parser.has_section(section):
return cls() # type: ignore[call-arg]
hints = resolve_hints(cls)
field_names = {f.name for f in fields(cls) if f.init}
kwargs: dict[str, Any] = {}
for key, raw_value in parser.items(section):
if key not in field_names:
raise ValueError(
f"Unknown config key '{key}' in"
f" [{section}]. Valid keys:"
f" {sorted(field_names)}"
)
field_type = hints.get(key, str)
coerce = _TYPE_MAP.get(field_type, field_type)
kwargs[key] = coerce(raw_value)
return cls(**kwargs) # type: ignore[call-arg]
# ---------------------------------------------------------------------------
# INI writing
# ---------------------------------------------------------------------------
def format_section(cls: Type, section: str | None = None) -> str:
"""Format a config dataclass as an INI section string.
Returns the section header and all fields with their defaults
as commented key-value pairs.
:param cls: a dataclass class.
:param section: section name (derived from class name if None).
:returns: INI section string.
"""
section = _section_name(cls, section)
hints = resolve_hints(cls)
lines = [f"[{section}]"]
for f in fields(cls):
if not f.init:
continue
field_type = hints.get(f.name, str)
type_name = getattr(field_type, "__name__", str(field_type))
if f.default is not MISSING:
default = f.default
elif f.default_factory is not MISSING: # type: ignore[arg-type]
default = f.default_factory() # type: ignore[misc]
else:
continue
lines.append(f"# {f.name} ({type_name})")
lines.append(f"{f.name} = {default}")
lines.append("")
return "\n".join(lines)
def ensure_ini(
cls: Type[T],
path: Path,
section: str | None = None,
) -> T:
"""Load config from INI, creating the file with defaults if
it does not exist.
On first run, writes a commented INI file with all fields and
their default values. On subsequent runs, reads the existing
file. Never writes back CLI overrides.
"""
section = _section_name(cls, section)
if not path.exists():
_write_default_ini(cls, path, section)
return load_ini(cls, path, section)
def ensure_ini_multi(
configs: list[tuple[Type, str | None]],
path: Path,
) -> None:
"""Create an INI file with multiple sections if it does not exist.
Each entry is a (dataclass_cls, section_name) tuple. If
section_name is None, it is derived from the class name.
Does not overwrite an existing file.
:param configs: list of (cls, section) tuples.
:param path: path to the INI file.
"""
if path.exists():
return
sections = [format_section(cls, section) for cls, section in configs]
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("\n".join(sections) + "\n")
def _write_default_ini(
cls: Type,
path: Path,
section: str,
) -> None:
"""Write an INI file with all fields as commented defaults."""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(format_section(cls, section) + "\n")
# ---------------------------------------------------------------------------
# CLI: per-flag style (add_config_arguments / apply_cli_overrides)
# ---------------------------------------------------------------------------
def add_config_arguments(
cls: Type[T],
parser: ArgumentParser,
prefix: str = "",
) -> None:
"""Add CLI arguments for each field in a config dataclass.
Field names are converted to CLI flags: ``heart_rate_resolution``
becomes ``--heart-rate-resolution`` (or ``--<prefix>-heart-rate-resolution``
if a prefix is given).
"""
hints = resolve_hints(cls)
for f in fields(cls):
if not f.init:
continue
flag_name = f.name.replace("_", "-")
if prefix:
flag_name = f"{prefix}-{flag_name}"
field_type = hints.get(f.name, str)
kwargs: dict[str, Any] = {
"dest": f.name,
}
if field_type is bool:
kwargs["action"] = (
"store_false"
if f.default is True
else "store_true"
)
kwargs["default"] = None
else:
kwargs["type"] = _TYPE_MAP.get(
field_type, field_type
)
kwargs["default"] = None
kwargs["metavar"] = getattr(
field_type, "__name__", str(field_type)
).upper()
parser.add_argument(f"--{flag_name}", **kwargs)
def apply_cli_overrides(
config: T,
args: Namespace,
) -> T:
"""Apply CLI argument values to a config instance.
Only overrides fields that were explicitly set on the command
line (not None). Returns a new instance.
"""
overrides = {}
for f in fields(config): # type: ignore[arg-type]
if not f.init:
continue
cli_value = getattr(args, f.name, None)
if cli_value is not None:
overrides[f.name] = cli_value
if not overrides:
return config
# dataclasses.replace keeps nested dataclass fields intact, unlike
# asdict(), which would recursively turn them into plain dicts
from dataclasses import replace
return replace(config, **overrides) # type: ignore[type-var]
# ---------------------------------------------------------------------------
# CLI: dotted-path style (apply_overrides)
# ---------------------------------------------------------------------------
def apply_overrides(
config: T,
overrides: dict[str, str],
prefix: str = "",
) -> T:
"""Apply dotted-path string overrides to a config dataclass.
Used with a unified ``--config KEY=VALUE`` CLI flag. Each key
is a dotted path relative to the prefix.
Example::
overrides = {
"provider.base_url": "http://localhost:4000",
"provider.model": "qwen2.5:7b",
}
config = apply_overrides(config, overrides, prefix="provider")
# config.base_url == "http://localhost:4000"
# config.model == "qwen2.5:7b"
:param config: a dataclass instance.
:param overrides: dict of dotted keys to string values.
:param prefix: only apply keys starting with this prefix.
:returns: new config instance with overrides applied.
"""
hints = resolve_hints(type(config))
kwargs: dict[str, Any] = {}
changed = False
for f in fields(config):
if not f.init:
continue
full_key = f"{prefix}.{f.name}" if prefix else f.name
if full_key in overrides:
raw = overrides[full_key]
field_type = hints.get(f.name, str)
coerce = _TYPE_MAP.get(field_type, field_type)
kwargs[f.name] = coerce(raw)
changed = True
else:
kwargs[f.name] = getattr(config, f.name)
if not changed:
return config
return type(config)(**kwargs) # type: ignore[return-value]
def format_help(cls: Type, prefix: str = "") -> list[str]:
"""Generate help lines for a config dataclass.
Each line shows the dotted key path, type, and default value.
Suitable for CLI epilog text.
:param cls: a dataclass class.
:param prefix: prepended to each key path.
:returns: list of formatted help strings.
"""
hints = resolve_hints(cls)
lines = []
for f in fields(cls):
if not f.init:
continue
field_type = hints.get(f.name, str)
type_name = getattr(field_type, "__name__", str(field_type))
key = f"{prefix}.{f.name}" if prefix else f.name
if f.default is not MISSING:
default = f.default
elif f.default_factory is not MISSING: # type: ignore[arg-type]
default = repr(f.default_factory()) # type: ignore[misc]
else:
default = "(required)"
lines.append(f" {key} ({type_name}, default: {default})")
return lines
# ---------------------------------------------------------------------------
# Backwards compat
# ---------------------------------------------------------------------------
# keep the old private name working for existing callers
_resolve_hints = resolve_hints
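
The loading path of `load_ini` can be sketched in a self-contained form. This is only an illustration under assumptions: `ProviderConfig`, `parse_bool`, `TYPE_MAP` and `load_from_string` are hypothetical names, and `parse_bool` merely stands in for whatever the module's `_TYPE_MAP` registers for `bool`, which is not shown here.

```python
import configparser
from dataclasses import dataclass, fields
from typing import Any, get_type_hints


def parse_bool(raw: str) -> bool:
    """Hypothetical stand-in for the module's bool coercion."""
    value = raw.strip().lower()
    if value in {"1", "true", "yes", "on"}:
        return True
    if value in {"0", "false", "no", "off"}:
        return False
    raise ValueError(f"not a boolean: {raw!r}")


# assumption: only bool needs special handling; int/str/float coerce directly
TYPE_MAP = {bool: parse_bool}


@dataclass
class ProviderConfig:  # hypothetical example dataclass
    base_url: str = "http://localhost:4000"
    retries: int = 3
    verbose: bool = False


def load_from_string(cls, text: str, section: str):
    """Parse INI text and build *cls*, rejecting unknown keys."""
    parser = configparser.ConfigParser(inline_comment_prefixes=("#", ";"))
    parser.read_string(text)
    if not parser.has_section(section):
        return cls()
    hints = get_type_hints(cls)
    names = {f.name for f in fields(cls) if f.init}
    kwargs: dict[str, Any] = {}
    for key, raw in parser.items(section):
        if key not in names:
            raise ValueError(f"Unknown config key {key!r} in [{section}]")
        field_type = hints.get(key, str)
        kwargs[key] = TYPE_MAP.get(field_type, field_type)(raw)
    return cls(**kwargs)


INI_TEXT = """
[provider]
retries = 5  # inline comment is stripped
verbose = yes
"""

config = load_from_string(ProviderConfig, INI_TEXT, "provider")
```

Keys missing from the section (`base_url` here) fall back to the dataclass default, which mirrors the behaviour described in the `load_ini` docstring.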

View file

@ -1,201 +0,0 @@
#!/usr/bin/env python3
"""Generic HTTP client.
Thin urllib wrapper with retry-on-rate-limit. No domain knowledge:
GitHub, Bitbucket, etc. are handled by higher-level modules.
"""
from dataclasses import dataclass
import http.cookiejar
import json
import time
from typing import Any, Dict, Optional
import urllib.error
import urllib.parse
import urllib.request
from warnings import warn
@dataclass(frozen=True)
class HttpResponse:
status_code: int
headers: dict[str, str]
data: bytes
reason: Optional[str] = None
def json(self):
return json.loads(self.data.decode("utf-8"))
@property
def text(self) -> str:
return self.data.decode("utf-8", errors="replace")
class HttpSession:
"""HTTP client that persists cookies across requests.
Suitable for sites that require login followed by
cookie-authenticated page fetches.
"""
def __init__(
self,
default_headers: dict[str, str] | None = None,
timeout: int = 30,
) -> None:
self._timeout = timeout
self._default_headers = default_headers or {}
self._jar = http.cookiejar.CookieJar()
self._opener = urllib.request.build_opener(
urllib.request.HTTPCookieProcessor(self._jar),
)
def get(
self,
url: str,
params: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> HttpResponse:
if params:
query = urllib.parse.urlencode(params)
url = f"{url}?{query}"
req = urllib.request.Request(
url,
headers=self._merged_headers(headers),
method="GET",
)
return self._send(req)
def post(
self,
url: str,
data: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
) -> HttpResponse:
body = (
urllib.parse.urlencode(data).encode()
if data else None
)
merged = self._merged_headers(headers)
if data and "Content-Type" not in merged:
merged["Content-Type"] = (
"application/x-www-form-urlencoded"
)
req = urllib.request.Request(
url,
data=body,
headers=merged,
method="POST",
)
return self._send(req)
def _send(self, req: urllib.request.Request) -> HttpResponse:
try:
with self._opener.open(
req, timeout=self._timeout
) as resp:
return HttpResponse(
status_code=resp.getcode(),
headers=dict(resp.getheaders()),
data=resp.read(),
)
except urllib.error.HTTPError as e:
return HttpResponse(
status_code=e.code,
headers=dict(e.headers.items()),
data=e.read(),
)
def _merged_headers(
self, extra: dict[str, str] | None
) -> dict[str, str]:
merged = dict(self._default_headers)
if extra:
merged.update(extra)
return merged
def _request(
url: str,
method: str = "GET",
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
data: Optional[bytes] = None,
) -> HttpResponse:
# TODO: do proper exponential backoff
backoff = [1, 2, 4]
if params:
query = urllib.parse.urlencode(params)
url = f"{url}?{query}"
req = urllib.request.Request(
url,
headers=headers or {},
method=method,
data=data,
)
for delay in backoff:
try:
with urllib.request.urlopen(req, timeout=30) as resp:
status = resp.getcode()
resp_data = resp.read()
resp_headers = dict(resp.getheaders())
if status == 429:
warn(f"Rate-limited on {url} (HTTP {status})."
f" Backing off {delay}s...")
time.sleep(delay)
continue
return HttpResponse(
status, resp_headers, resp_data, resp.reason,
)
except urllib.error.HTTPError as e:
status = e.code
err_data = e.read()
err_headers = dict(e.headers.items())
if status == 429:
warn(f"Rate-limited on {url} (HTTP {status})."
f" Backing off {delay}s...")
time.sleep(delay)
continue
return HttpResponse(
status, err_headers, err_data, e.reason,
)
except urllib.error.URLError as e:
raise Exception(
f"Network error on {url}: {e}"
) from e
# All retries exhausted: return a synthetic 503 response
return HttpResponse(503, {}, b"", "Service unavailable")
def get(
url: str,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
) -> HttpResponse:
return _request(url, method="GET", params=params, headers=headers)
def post(
url: str,
data: Optional[bytes] = None,
headers: Optional[Dict[str, str]] = None,
) -> HttpResponse:
return _request(url, method="POST", headers=headers, data=data)
def put(
url: str,
data: Optional[bytes] = None,
headers: Optional[Dict[str, str]] = None,
) -> HttpResponse:
return _request(url, method="PUT", headers=headers, data=data)
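
The `TODO` in `_request` asks for proper exponential backoff. One possible shape is sketched below; it is not the module's implementation. `backoff_delays` and `retry_while` are hypothetical helpers, the `sleep` parameter is injectable so the logic can be exercised without waiting, and jitter is left out to keep the example deterministic.

```python
import time
from typing import Callable, Iterator, TypeVar

R = TypeVar("R")


def backoff_delays(
    retries: int,
    base: float = 1.0,
    factor: float = 2.0,
    cap: float = 30.0,
) -> Iterator[float]:
    """Yield base * factor**attempt for each retry, limited to *cap*."""
    for attempt in range(retries):
        yield min(cap, base * factor ** attempt)


def retry_while(
    send: Callable[[], R],
    should_retry: Callable[[R], bool],
    retries: int = 3,
    sleep: Callable[[float], None] = time.sleep,
) -> R:
    """Call *send* once, then up to *retries* more times while
    *should_retry* holds. Returns the last result either way."""
    result = send()
    for delay in backoff_delays(retries):
        if not should_retry(result):
            return result
        sleep(delay)
        result = send()
    return result


# simulated server: rate-limited twice, then OK
statuses = iter([429, 429, 200])
slept: list[float] = []
final_status = retry_while(
    send=lambda: next(statuses),
    should_retry=lambda status: status == 429,
    sleep=slept.append,
)
```

Returning the last real result when retries run out lets the caller see the final 429 instead of a synthetic status.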

View file

@ -1,78 +0,0 @@
#!/usr/bin/env python3
"""Bitbucket Cloud REST API v2.0 wrapper.
Thin layer over http.py for Bitbucket-specific operations:
- Bearer token authentication
- Repository existence checks
- Repository creation within a workspace/project
"""
import json
from typing import Any, Dict, Optional
from byteb4rb1e.utils.http import client as http_client
BITBUCKET_API = "https://api.bitbucket.org/2.0"
def http_headers(token: str) -> Dict[str, str]:
"""Construct Bitbucket API headers with Bearer token auth."""
return {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
"Content-Type": "application/json",
}
def repository_exists(
workspace: str,
repo_slug: str,
token: str,
) -> bool:
"""Check whether a repository exists in the workspace."""
url = f"{BITBUCKET_API}/repositories/{workspace}/{repo_slug}"
resp = http_client.get(url, headers=http_headers(token))
return resp.status_code == 200
def create_repository(
workspace: str,
repo_slug: str,
token: str,
project: Optional[str] = None,
description: str = "",
is_private: bool = True,
) -> http_client.HttpResponse:
"""Create a new repository in the workspace.
When *project* is given the repository is assigned to that
Bitbucket project (by key). This is required for workspaces
that scope access keys at the project level.
Returns the API response. Caller should check for a 2xx status_code
(200 or 201) for success.
"""
url = f"{BITBUCKET_API}/repositories/{workspace}/{repo_slug}"
body: Dict[str, Any] = {
"scm": "git",
"is_private": is_private,
"description": description,
"fork_policy": "no_forks",
}
if project:
body["project"] = {"key": project}
return http_client.put(
url,
data=json.dumps(body).encode("utf-8"),
headers=http_headers(token),
)
def clone_url(
workspace: str,
repo_slug: str,
) -> str:
"""Return the SSH clone URL for a Bitbucket repository."""
return f"git@bitbucket.org:{workspace}/{repo_slug}.git"

View file

@ -1,98 +0,0 @@
#!/usr/bin/env python3
"""Forgejo REST API v1 wrapper.
Thin layer over http.py for Forgejo-specific operations:
- Token authentication
- Repository existence checks
- Repository creation under the authenticated user or an organization
- SSH and HTTPS clone URL construction
Unlike Bitbucket (one global SaaS instance), Forgejo is self-hosted,
so every operation takes a *host* parameter instead of baking any
specific instance in.
"""
import json
from typing import Any, Dict, Optional
from byteb4rb1e.utils.http import client as http_client
def api_url(host: str) -> str:
"""Return the API base URL for a Forgejo instance."""
return f"https://{host}/api/v1"
def http_headers(token: str) -> Dict[str, str]:
"""Construct Forgejo API headers with token auth."""
return {
"Authorization": f"token {token}",
"Accept": "application/json",
"Content-Type": "application/json",
}
def repository_exists(
host: str,
owner: str,
repo_slug: str,
token: str,
) -> bool:
"""Check whether a repository exists under the owner."""
url = f"{api_url(host)}/repos/{owner}/{repo_slug}"
resp = http_client.get(url, headers=http_headers(token))
return bool(resp.status_code == 200)
def create_repository(
host: str,
repo_slug: str,
token: str,
org: Optional[str] = None,
description: str = "",
is_private: bool = True,
) -> http_client.HttpResponse:
"""Create a new repository on the Forgejo instance.
When *org* is given the repository is created in that
organization, otherwise under the authenticated user.
Returns the API response. Caller should check status_code == 201
for success.
"""
if org:
url = f"{api_url(host)}/orgs/{org}/repos"
else:
url = f"{api_url(host)}/user/repos"
body: Dict[str, Any] = {
"name": repo_slug,
"private": is_private,
"description": description,
}
return http_client.post(
url,
data=json.dumps(body).encode("utf-8"),
headers=http_headers(token),
)
def ssh_clone_url(
host: str,
owner: str,
repo_slug: str,
) -> str:
"""Return the SSH clone URL for a Forgejo repository."""
return f"git@{host}:{owner}/{repo_slug}.git"
def https_clone_url(
host: str,
owner: str,
repo_slug: str,
) -> str:
"""Return the HTTPS clone URL for a Forgejo repository.
Preferred in CI environments without SSH host keys.
"""
return f"https://{host}/{owner}/{repo_slug}.git"

View file

@ -1,65 +0,0 @@
#!/usr/bin/env python3
import hashlib
from pathlib import Path
from typing import Any, Dict, List, Optional
from byteb4rb1e.utils.http import client as http_client
GITHUB_API = "https://api.github.com"
def http_headers(token: Optional[str]) -> Dict[str, str]:
headers = {
"Accept": "application/vnd.github+json",
"User-Agent": "sphinx-h5p-worker1"
}
if token:
# Use standard PAT header; token not logged anywhere.
headers["Authorization"] = f"Bearer {token}"
return headers
def blob_sha(path: Path) -> str:
"""Calculate Git blob SHA-1 for a file, matching GitHub API 'sha'."""
data = path.read_bytes()
header = f"blob {len(data)}\0".encode("utf-8")
store = header + data
return hashlib.sha1(store).hexdigest()
def list_org_repos(org: str, token: Optional[str]) -> List[Dict[str, Any]]:
repos: List[Dict[str, Any]] = []
page = 1
per_page = 100
while True:
url = f"{GITHUB_API}/orgs/{org}/repos"
resp = http_client.get(
url,
params={"page": page, "per_page": per_page, "type": "public"},
headers=http_headers(token),
)
if resp.status_code != 200:
raise RuntimeError(f"Failed to list repos for org {org}: {resp.status_code} {resp.text}")
batch = resp.json()
if not batch:
break
repos.extend(batch)
page += 1
return repos
def fetch_file(
org: str,
repo: str,
path: str,
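token: str
) -> http_client.HttpResponse:
"""Fetch *path* below a repository's API endpoint.
*path* is appended verbatim to ``/repos/{org}/{repo}/``.
"""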
url = f"{GITHUB_API}/repos/{org}/{repo}/{path}"
return http_client.get(
url,
headers=http_headers(token),
)
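
The hashing done by `blob_sha` can be checked against known Git object IDs. A small sketch operating on bytes instead of a `Path` (the name `git_blob_sha` is hypothetical):

```python
import hashlib


def git_blob_sha(data: bytes) -> str:
    """Hash *data* the way Git hashes a blob: a 'blob <size>' header,
    a NUL byte, then the content, all through SHA-1."""
    header = f"blob {len(data)}\0".encode("utf-8")
    return hashlib.sha1(header + data).hexdigest()


# same value as `echo hello | git hash-object --stdin`
hello_sha = git_blob_sha(b"hello\n")
# the well-known ID of the empty blob
empty_sha = git_blob_sha(b"")
```

Comparing this value with the `sha` field returned by the GitHub API is a cheap way to tell whether a local file differs from the remote one without downloading its content.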

View file

@ -1,91 +0,0 @@
from typing import Optional
class RollingHash:
"""implementation of Rabin-Karp rolling hash
"""
#: default base
base: int = 31
#: default modulus
mod: int = 10**9 + 7
#: current computed hash
_hash: int
#: prime number base (e.g., 31)
_base: int
#: large prime modulus (to prevent overflow)
_mod: int
# Precomputation of ``base^(length-1) % mod`` for removing the old byte when
# rolling over
_hbase_factor: int
def __init__(
self,
data: bytes,
base: Optional[int] = None,
mod: Optional[int] = None
):
"""Initialize the rolling hash with a given base and modulus.
:param data: initial window to hash; its length fixes the window size
:param base: prime number base (defaults to ``RollingHash.base``)
:param mod: large prime modulus (defaults to ``RollingHash.mod``)
"""
self._base = base if base else RollingHash.base
self._mod = mod if mod else RollingHash.mod
self._hash = RollingHash.compute_initial_hash(
data,
self._base,
self._mod
)
self._hbase_factor = pow(self._base, len(data) - 1, self._mod)
@staticmethod
def compute_initial_hash(
data: bytes,
base: int,
mod: int,
) -> int:
"""Compute the hash for the initial window.
Prefer calling this standalone to compute the hash of the search pattern,
which avoids the overhead of instantiating an object.
:param data: data to build hash for
:param base: polynomial base
:param mod: modulus applied at every step
:returns: hash of data
"""
hash_ = 0
for i in range(len(data)):
# computing the modulus at each iteration, as to avoid the summed
# integer to be chunky, as in HUUUUGEE...
hash_ = (hash_ * base + data[i]) % mod
return hash_
def roll(self, old_byte: int, new_byte: int) -> int:
"""Efficiently update hash by removing ``old_byte`` and adding
``new_byte``
The old_byte removal uses a pre-computed value of the highest base used
in the polynomial calculation. This speeds things up a bit.
I was thinking about a way on how to store the old_byte efficiently
within the class object, but that would require storing the entire data,
basically doubling the memory consumption as the data must definitely
also live outside of the class object. A memoryview could solve this
problem, but at the cost of making the implementation more complex, so
this will have to do.
:param old_byte: The ordinal of the first byte in buffer to roll over
:param new_byte: The ordinal of the byte newly appended to the buffer
"""
# Remove old
self._hash = (self._hash - old_byte * self._hbase_factor) % self._mod
# Add new
self._hash = (self._hash * self._base + new_byte) % self._mod
return self._hash
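
How `roll` is meant to be driven by a caller can be sketched as a standalone Rabin-Karp search. This does not use the class above: `initial_hash` and `find_all` are hypothetical functions that repeat the same polynomial formula with the default base and modulus.

```python
BASE = 31
MOD = 10**9 + 7


def initial_hash(data: bytes) -> int:
    """Polynomial hash of *data*, reduced at every step."""
    h = 0
    for byte in data:
        h = (h * BASE + byte) % MOD
    return h


def find_all(haystack: bytes, needle: bytes) -> list[int]:
    """Return every offset at which *needle* occurs in *haystack*."""
    n = len(needle)
    if n == 0 or n > len(haystack):
        return []
    target = initial_hash(needle)
    # BASE^(n-1) % MOD, the weight of the byte leaving the window
    high = pow(BASE, n - 1, MOD)
    window = initial_hash(haystack[:n])
    hits = []
    for start in range(len(haystack) - n + 1):
        # a hash match is only a candidate; compare bytes to rule out
        # collisions
        if window == target and haystack[start:start + n] == needle:
            hits.append(start)
        if start + n < len(haystack):
            # remove the oldest byte, then shift and append the new one
            window = (window - haystack[start] * high) % MOD
            window = (window * BASE + haystack[start + n]) % MOD
    return hits


offsets = find_all(b"abracadabra", b"abra")
```

The byte-wise comparison after a hash hit is what keeps the search correct: two different windows can share a hash, so the hash only filters out positions that cannot match.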

View file

@ -1,41 +0,0 @@
import email
import importlib.resources
import mimetypes
from urllib.error import URLError
import urllib.request
class PkgHandler(urllib.request.BaseHandler):
"""Open ``pkg://<package>/<resource>`` URLs from installed package data.
"""
def pkg_open(self, req) -> urllib.request.addinfourl:
pkg_files = importlib.resources.files(req.host)
try:
fh = next(
pkg_files.glob(req.selector.lstrip('//'))
).open('rb')
except Exception as e:
raise URLError(f'{e.__class__.__name__}: {e}') from e
fh.seek(0, 2)
size = fh.tell()
fh.seek(0)
mtype, compression = mimetypes.guess_type(req.selector)
if compression and mtype:
mtype = f"{mtype}+{compression}"
headers = email.message_from_string(
'Content-Type: %s\nContent-Length: %d\n' %
(mtype or 'text/plain', size)
)
if not mtype or mtype.startswith('text/'):
fh.close()
fh = next(
pkg_files.glob(req.selector.lstrip('//'))
).open('r')
return urllib.request.addinfourl(fh, headers, None)
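
`PkgHandler` relies on urllib's naming convention: an opener dispatches a URL with scheme `X` to a handler method called `X_open`. A self-contained sketch of that mechanism follows, with a hypothetical `mem` scheme serving bytes from a dict instead of package data; `RESOURCES` and `MemHandler` are illustrative names only.

```python
import email
import io
import urllib.request
import urllib.response
from urllib.error import URLError

# hypothetical in-memory "files", keyed by (host, selector)
RESOURCES = {("greeting", "/hello.txt"): b"hello from memory\n"}


class MemHandler(urllib.request.BaseHandler):
    """Serve mem://<host>/<path> URLs from RESOURCES."""

    def mem_open(self, req):
        try:
            body = RESOURCES[(req.host, req.selector)]
        except KeyError as e:
            raise URLError(f"no such resource: {req.full_url}") from e
        headers = email.message_from_string(
            f"Content-Type: text/plain\nContent-Length: {len(body)}\n"
        )
        return urllib.response.addinfourl(
            io.BytesIO(body), headers, req.full_url
        )


opener = urllib.request.build_opener(MemHandler())
with opener.open("mem://greeting/hello.txt") as resp:
    payload = resp.read()
    content_length = resp.headers["Content-Length"]
```

Passing the request URL as the third argument of `addinfourl` lets callers read it back from the response, which is not possible when `None` is passed.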

View file

@ -1,331 +0,0 @@
#!/usr/bin/env python3
"""Git subprocess wrapper for repository operations.
Provides primitives for mirror cloning, syncing, remote management,
file extraction from bare repos, and submodule management.
No pygit2 or gitpython, uses subprocess only.
"""
import logging
import subprocess
from pathlib import Path
from typing import List, Optional
logger = logging.getLogger(__name__)
class GitError(Exception):
"""A git subprocess returned a non-zero exit code."""
def __init__(self, args: List[str], returncode: int, stderr: str):
self.args_list = args
self.returncode = returncode
self.stderr = stderr
super().__init__(
f"git exited {returncode}: {' '.join(args)}\n{stderr}"
)
def parse_base_url(base_url: str) -> str:
"""Extract the workspace from an SCP-style base URL.
Accepts any host (Bitbucket, Forgejo, GitHub, ...) as long as
the URL is SCP-style::
git@bitbucket.org:byteb4rb1e/foo.git -> byteb4rb1e
git@git.code.tiararodney.com:h5p-mirror/foo.git -> h5p-mirror
"""
# SCP-style: git@host:workspace/repo
if ":" not in base_url or "//" in base_url:
raise ValueError(
f"Expected SCP-style URL (git@host:workspace), "
f"got: {base_url}"
)
_, workspace = base_url.split(":", 1)
return str(Path(workspace).parent)
def parse_repo_name(base_url: str) -> str:
"""Extract the repository name from an SCP-style base URL.
Accepts any host (Bitbucket, Forgejo, GitHub, ...) as long as
the URL is SCP-style::
git@bitbucket.org:byteb4rb1e/foo.git -> foo
git@git.code.tiararodney.com:h5p-mirror/foo.git -> foo
"""
# SCP-style: git@host:workspace/repo
if ":" not in base_url or "//" in base_url:
raise ValueError(
f"Expected SCP-style URL (git@host:workspace), "
f"got: {base_url}"
)
_, workspace = base_url.split(":", 1)
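# strip only a trailing ".git" so names containing dots stay intact
name = Path(workspace).name
return name[:-4] if name.endswith(".git") else name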
def _run(
args: List[str],
cwd: Optional[Path] = None,
capture_stdout: bool = False,
) -> subprocess.CompletedProcess: # type: ignore[type-arg]
"""Run a git command, raising GitError on failure."""
cmd = ["git"] + args
logger.debug("$ %s", " ".join(cmd))
result = subprocess.run(
cmd,
cwd=cwd,
capture_output=True,
text=True,
)
if result.returncode != 0:
raise GitError(cmd, result.returncode, result.stderr.strip())
return result
def mirror_clone(source_url: str, dest: Path) -> None:
"""Clone a repository as a bare mirror.
Equivalent to ``git clone --mirror <source_url> <dest>``.
The destination directory must not already exist.
"""
_run(["clone", "--mirror", source_url, str(dest)])
logger.info("Cloned mirror %s -> %s", source_url, dest)
def add_remote(repo: Path, name: str, url: str) -> None:
"""Add a named remote to a bare repository."""
_run(["remote", "add", name, url], cwd=repo)
logger.debug("Added remote %s -> %s in %s", name, url, repo)
def has_remote(repo: Path, name: str) -> bool:
"""Check whether a named remote exists."""
result = _run(["remote"], cwd=repo)
return name in result.stdout.splitlines()
def mirror_update(repo: Path) -> None:
"""Fetch all remotes in a bare mirror repository.
Equivalent to ``git remote update`` inside the bare repo.
"""
_run(["remote", "update"], cwd=repo)
logger.debug("Updated remotes in %s", repo)
def fetch(repo: Path, remote: str = "origin") -> None:
"""Fetch from a single remote."""
_run(["fetch", remote], cwd=repo)
logger.debug("Fetched %s in %s", remote, repo)
def show_ref(repo: Path) -> str:
"""Return the raw output of ``git show-ref`` (all refs + SHAs).
Returns an empty string if the repo has no refs.
"""
try:
result = _run(["show-ref"], cwd=repo)
return result.stdout
except GitError:
return ""
def ls_remote(repo: Path, remote: str) -> str:
"""Return the raw output of ``git ls-remote <remote>``.
Returns an empty string if the remote has no refs or on error.
"""
try:
result = _run(["ls-remote", remote], cwd=repo)
return result.stdout
except GitError:
return ""
def mirror_push(repo: Path, remote: str) -> None:
"""Push the full mirror to a remote.
Equivalent to ``git push --mirror <remote>``.
"""
_run(["push", "--mirror", remote], cwd=repo)
logger.info("Pushed mirror to %s from %s", remote, repo)
def read_file(
repo: Path,
filepath: str,
ref: str = "HEAD",
) -> Optional[str]:
"""Extract a file's contents from a bare repo without checkout.
Returns the file content as a string, or None if the file does
not exist at the given ref.
"""
try:
result = _run(
["show", f"{ref}:{filepath}"],
cwd=repo,
capture_stdout=True,
)
return result.stdout
except GitError:
return None
# -------------------------------------------------------------------
# Ref / tag primitives
# -------------------------------------------------------------------
def list_tags(repo: Path) -> List[str]:
"""List all tags in a repository."""
result = _run(["tag", "-l"], cwd=repo)
return [t for t in result.stdout.splitlines() if t]
def resolve_ref(repo: Path, ref: str) -> str:
"""Resolve a ref to a full SHA.
Raises GitError if the ref cannot be resolved.
"""
result = _run(
["rev-parse", ref], cwd=repo, capture_stdout=True,
)
return result.stdout.strip()
def head_ref(repo: Path) -> str:
"""Return the full SHA of HEAD."""
return resolve_ref(repo, "HEAD")
# -------------------------------------------------------------------
# Pull-through bare clone cache
# -------------------------------------------------------------------
def bare_path_for_url(url: str, cache_dir: Path) -> Path:
"""Derive a cache path from a clone URL.
Strips scheme/host, keeps the path component, appends ``.git``.
Examples::
https://github.com/h5p/h5p-multi-choice
-> cache_dir / h5p / h5p-multi-choice.git
git@github.com:h5p/h5p-multi-choice.git
-> cache_dir / h5p / h5p-multi-choice.git
"""
# Handle SCP-style URLs (git@host:path)
if ":" in url and "//" not in url:
path_part = url.split(":", 1)[1]
else:
# Strip scheme + host
from urllib.parse import urlparse
parsed = urlparse(url)
path_part = parsed.path.lstrip("/")
# Strip trailing .git if present, then re-add it
if path_part.endswith(".git"):
path_part = path_part[:-4]
return cache_dir / (path_part + ".git")
def ensure_bare_clone(url: str, cache_dir: Path) -> Path:
"""Ensure a bare mirror clone exists in *cache_dir*.
If the bare repo already exists, fetches updates via
``mirror_update``. Otherwise, creates a new mirror clone.
Returns the path to the bare repo.
"""
bare_path = bare_path_for_url(url, cache_dir)
if bare_path.exists():
mirror_update(bare_path)
logger.debug("Updated existing cache %s", bare_path)
else:
bare_path.parent.mkdir(parents=True, exist_ok=True)
mirror_clone(url, bare_path)
logger.info("Cached new bare clone %s", bare_path)
return bare_path
# -------------------------------------------------------------------
# Submodule operations
# -------------------------------------------------------------------
def has_submodule(repo: Path, path: str) -> bool:
"""Check whether a submodule is registered at *path*.
Reads ``.gitmodules`` to determine whether the submodule exists.
*path* is resolved relative to *repo*, then compared against
the repository root so the check works when *repo* is a
subdirectory of the actual git working tree.
Returns False if ``.gitmodules`` does not exist.
"""
try:
toplevel = Path(
_run(
["rev-parse", "--show-toplevel"], cwd=repo,
).stdout.strip()
)
except GitError:
return False
gitmodules = toplevel / ".gitmodules"
if not gitmodules.is_file():
return False
# Resolve the full path relative to the repo root
full_path = (repo / path).resolve()
try:
rel_path = str(full_path.relative_to(toplevel.resolve()))
except ValueError:
return False
try:
result = _run(
["config", "--file", str(gitmodules),
"--get-regexp", r"submodule\..*\.path"],
cwd=toplevel,
)
except GitError:
return False
for line in result.stdout.splitlines():
parts = line.split(None, 1)
if len(parts) == 2 and parts[1] == rel_path:
return True
return False
def submodule_add(repo: Path, url: str, path: str) -> None:
"""Add a git submodule at *path* pointing to *url*.
Equivalent to ``git submodule add <url> <path>`` inside *repo*.
"""
_run(["submodule", "add", url, path], cwd=repo)
logger.info("Added submodule %s -> %s", url, path)
def submodule_update(repo: Path, path: str) -> None:
"""Fetch and update a submodule to the latest remote HEAD.
Enters the submodule directory, fetches origin, and checks out
the latest commit on the remote default branch.
"""
sub_path = repo / path
_run(["fetch", "origin"], cwd=sub_path)
# Determine default branch from remote HEAD
result = _run(
["symbolic-ref", "refs/remotes/origin/HEAD",
"--short"],
cwd=sub_path,
)
default_branch = result.stdout.strip()
_run(["checkout", default_branch], cwd=sub_path)
logger.info("Updated submodule %s to %s", path, default_branch)
def submodule_checkout(repo: Path, path: str, ref: str) -> None:
"""Fetch and checkout a specific ref in a submodule."""
sub_path = repo / path
_run(["fetch", "origin"], cwd=sub_path)
_run(["checkout", ref], cwd=sub_path)
logger.info("Checked out submodule %s at %s", path, ref)
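
The SCP-style parsing done by `parse_base_url` and `parse_repo_name` can be illustrated in one standalone function. `split_scp_url` is a hypothetical helper, not part of this module, and it assumes that only a trailing `.git` should be stripped from the repository name.

```python
from pathlib import PurePosixPath


def split_scp_url(url: str) -> tuple[str, str, str]:
    """Split 'user@host:workspace/repo.git' into (host, workspace, repo)."""
    if ":" not in url or "//" in url:
        raise ValueError(f"Expected SCP-style URL, got: {url}")
    user_host, path = url.split(":", 1)
    # drop the optional 'user@' prefix
    host = user_host.rsplit("@", 1)[-1]
    repo_path = PurePosixPath(path)
    name = repo_path.name
    if name.endswith(".git"):
        name = name[:-4]
    return host, str(repo_path.parent), name


bitbucket = split_scp_url("git@bitbucket.org:byteb4rb1e/foo.git")
forgejo = split_scp_url("git@git.code.tiararodney.com:h5p-mirror/foo.bar.git")
```

`PurePosixPath` is used here so that the workspace part keeps forward slashes regardless of the platform the code runs on.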

View file

@ -1,7 +1,7 @@
from dataclasses import dataclass
from http.server import SimpleHTTPRequestHandler
from byteb4rb1e.utils.io import ChunksIO
from byteb4rb1e_utils.io import ChunksIO
@dataclass

View file

@ -8,12 +8,12 @@ from http.server import HTTPServer
from io import BytesIO, IOBase
from typing import Optional, Tuple, List
from byteb4rb1e.utils.http.server import (
from byteb4rb1e_utils.http.server import (
HandlerOptions,
MultipartUploadHandler,
ServerOptions,
)
from byteb4rb1e.utils.io import ChunksIO
from byteb4rb1e_utils.io import ChunksIO
__doc__ = """tsmuds - Tiara's Simple Multipart Upload Debugging Server

View file

@ -0,0 +1,228 @@
from dataclasses import dataclass
import math
from typing import List, Optional, Tuple
class RollingHash:
"""implementation of Rabin-Karp rolling hash
"""
#: default base
base: int = 31
#: default modulus
mod: int = 10**9 + 7
#: current computed hash
_hash: int
#: prime number base (e.g., 31)
_base: int
#: large prime modulus (to prevent overflow)
_mod: int
# Precomputation of ``base^(length-1) % mod`` for removing the old byte when
# rolling over
_hbase_factor: int
def __init__(
self,
data: bytes,
base: Optional[int] = None,
mod: Optional[int] = None
):
"""Initialize the rolling hash with a given base and modulus.
:param data: initial window to hash; its length fixes the window size
:param base: prime number base (defaults to ``RollingHash.base``)
:param mod: large prime modulus (defaults to ``RollingHash.mod``)
"""
self._base = base if base else RollingHash.base
self._mod = mod if mod else RollingHash.mod
self._hash = RollingHash.compute_initial_hash(
data,
self._base,
self._mod
)
self._hbase_factor = pow(self._base, len(data) - 1, self._mod)
@staticmethod
def compute_initial_hash(
data: bytes,
base: int,
mod: int,
) -> int:
"""Compute the hash for the initial window.
Prefer calling this standalone to compute the hash of the search pattern,
which avoids the overhead of instantiating an object.
:param data: data to build hash for
:param base: polynomial base
:param mod: modulus applied at every step
:returns: hash of data
"""
hash_ = 0
for i in range(len(data)):
# computing the modulus at each iteration, as to avoid the summed
# integer to be chunky, as in HUUUUGEE...
hash_ = (hash_ * base + data[i]) % mod
return hash_
def roll(self, old_byte: int, new_byte: int) -> int:
"""Efficiently update hash by removing ``old_byte`` and adding
``new_byte``
The old_byte removal uses a pre-computed value of the highest base used
in the polynomial calculation. This speeds things up a bit.
I was thinking about a way on how to store the old_byte efficiently
within the class object, but that would require storing the entire data,
basically doubling the memory consumption as the data must definitely
also live outside of the class object. A memoryview could solve this
problem, but at the cost of making the implementation more complex, so
this will have to do.
:param old_byte: The ordinal of the first byte in buffer to roll over
:param new_byte: The ordinal of the byte newly appended to the buffer
"""
# Remove old
self._hash = (self._hash - old_byte * self._hbase_factor) % self._mod
# Add new
self._hash = (self._hash * self._base + new_byte) % self._mod
return self._hash
@dataclass
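class ChunkedRollingHashOptions:
"""Options for :class:`ChunkedRollingHash`.
``max_chunk_size`` is the size in bytes of every chunk except possibly
the last; ``base`` and ``mod`` are passed to the underlying hash.
"""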
max_chunk_size: int = 10
base: int = RollingHash.base
mod: int = RollingHash.mod
class ChunkedRollingHash:
"""Chunked Rolling hash for linear and circular buffers
This implementation was inspired by the Rabin-Karp rolling hash
algorithm.
A search pattern is chunked and for each chunk its hash is calculated.
I came up with this approach out of the requirement for efficient RFC 1341
HTTP multipart entity boundary matching on stream data in a circular/ring
buffer. I tested a couple of algorithms, but none gave me any real
performance improvement over a naive/brute-force search.
That's how this algorithm came to be. Big O? I don't know (yet)...
Why is this more performant for my specific use cases?
------------------------------------------------------
#. Precompute hashes for evenly sized chunks of a search pattern, in
addition to a hash of the full search pattern.
#. First, match only the hash of the first chunk → immediately skip
unnecessary buffer sections if no match.
#. If the first chunk matches, progressively verify subsequent chunks,
until the full search pattern is confirmed.
Benefits Over Full Matching
---------------------------
- Reduces comparisons significantly: eliminates large sections early when
non-matches occur.
- Balances preprocessing cost vs runtime: faster elimination means fewer
wasted cycles.
- Integrates seamlessly into circular buffers: allows skipping
intelligently.
"""
_chunk_count: int
#: hashes of chunks of search string
_chunks_hash: List[int]
#: hash of the full search string
_hash: int
#: length of search string
_length: int
#: remainder for calculating the actual size of the last chunk
_remainder: int
_base: int
_mod: int
def __init__(
self,
data: bytes,
options: ChunkedRollingHashOptions = ChunkedRollingHashOptions()
):
"""
"""
self._base = options.base
self._mod = options.mod
self._max_chunk_size = options.max_chunk_size
self._chunks_hash = []
self._hash = RollingHash.compute_initial_hash(
data,
base = self._base,
mod = self._mod
)
self._length = len(data)
# only the last chunk differs in size; store its remainder separately
# for optimized handling
self._remainder = self._length % self._max_chunk_size
self._chunk_count = math.ceil(self._length / self._max_chunk_size)
# tracks chunk progression during matching
self._current = 0
# precompute hashes for all chunks to enable rapid comparison
for i in range(0, self._chunk_count):
chunk = data[i*self._max_chunk_size:(i+1)*self._max_chunk_size]
self._chunks_hash.append(
RollingHash.compute_initial_hash(chunk, base=self._base, mod=self._mod)
)
def match(
self,
data: bytes
) -> bool:
"""Match a buffer against the search string through chunked hashing
:param data: The buffer to match against the search string
:return: True if every chunk hash matches, otherwise False
"""
# progressively match each chunk except the last one
for i in range(self._current, self._chunk_count - 1):
chunk = data[i*self._max_chunk_size:(i+1)*self._max_chunk_size]
# no more data left to process
if chunk == b'': break
chunk_hash = RollingHash.compute_initial_hash(
chunk,
base = self._base,
mod = self._mod
)
if chunk_hash != self._chunks_hash[i]:
self._current = 0
return False
self._current += 1
# the last chunk cannot be processed yet, so there is no match so far
if self._current != self._chunk_count - 1:
return False
# a remainder of 0 means the last chunk is full-sized; data[-0:] would
# select the whole buffer instead of the last chunk
last_chunk_size = self._remainder or self._max_chunk_size
last_chunk = data[-last_chunk_size:]
last_chunk_hash = RollingHash.compute_initial_hash(
last_chunk,
base = self._base,
mod = self._mod
)
if self._chunks_hash[self._current] == last_chunk_hash:
return True
self._current = 0
return False
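
The chunk-first comparison described in the `ChunkedRollingHash` docstring can be sketched in a stateless form. This is a minimal illustration, not the module's implementation: `BASE` and `MOD` are assumed values (the real `RollingHash.base` and `RollingHash.mod` are not shown in this diff), and `poly_hash`, `roll`, `chunk_hashes` and `chunked_match` are hypothetical helper names.

```python
from typing import List

BASE = 256            # assumed value, RollingHash.base is not shown in this diff
MOD = 1_000_000_007   # assumed value, RollingHash.mod is not shown in this diff


def poly_hash(data: bytes, base: int = BASE, mod: int = MOD) -> int:
    """Polynomial hash of data, evaluated with Horner's rule."""
    value = 0
    for byte in data:
        value = (value * base + byte) % mod
    return value


def roll(value: int, old_byte: int, new_byte: int, window: int,
         base: int = BASE, mod: int = MOD) -> int:
    """Slide a window of fixed size one byte to the right."""
    high = pow(base, window - 1, mod)        # weight of the outgoing byte
    value = (value - old_byte * high) % mod  # remove old
    return (value * base + new_byte) % mod   # add new


def chunk_hashes(pattern: bytes, size: int) -> List[int]:
    """Hashes of consecutive chunks; only the last chunk may be shorter."""
    return [poly_hash(pattern[i:i + size]) for i in range(0, len(pattern), size)]


def chunked_match(window: bytes, pattern: bytes,
                  expected: List[int], size: int) -> bool:
    """Compare chunk by chunk and stop at the first differing hash."""
    if len(window) != len(pattern):
        return False
    for index, want in enumerate(expected):
        start = index * size
        if poly_hash(window[start:start + size]) != want:
            return False
    # equal hashes can still collide, so confirm with a byte comparison
    return window == pattern
```

A mismatch in the first chunk ends the comparison after hashing at most `size` bytes, which is the early elimination the docstring refers to. The final byte comparison is an addition of this sketch to guard against hash collisions.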


@ -1,33 +0,0 @@
import os
from pathlib import Path
import pytest
pytestmark = pytest.mark.pytest
from byteb4rb1e.testing.pytest import get_current_test
from byteb4rb1e.testing.pytest.decorators import run_in_subprocess_once
class Test_get_current_test:
"""
"""
def test_default(self):
"""
"""
os.environ['PYTEST_CURRENT_TEST'] = 'foo::bar (something)'
result = get_current_test()
assert isinstance(result[0], Path)
assert str(result[0].name) == 'foo'
assert result[1] == 'bar'
def test_invalid(self):
"""
"""
del os.environ['PYTEST_CURRENT_TEST']
with pytest.raises(RuntimeError):
get_current_test()
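
The behaviour these two tests assert can be illustrated with a small parser for the `PYTEST_CURRENT_TEST` environment variable. This is a sketch inferred from the assertions only; `current_test_sketch` is a hypothetical name and the real `get_current_test` may be implemented differently.

```python
import os
from pathlib import Path
from typing import Mapping, Tuple


def current_test_sketch(environ: Mapping[str, str] = os.environ) -> Tuple[Path, str]:
    """Split 'path::name (phase)' into the suite path and the case name."""
    raw = environ.get("PYTEST_CURRENT_TEST")
    if raw is None:
        raise RuntimeError("PYTEST_CURRENT_TEST is not set")
    # drop the trailing ' (phase)' part, e.g. ' (call)'
    node_id = raw.rsplit(" ", 1)[0]
    path, _, rest = node_id.partition("::")
    # for 'file::Class::test' keep only the last component as the case name
    return Path(path), rest.rsplit("::", 1)[-1]
```

Passing the environment as a parameter keeps the sketch testable without mutating `os.environ`, which the tests above do directly.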


@ -1,21 +0,0 @@
from pathlib import Path
import pytest
pytestmark = pytest.mark.pytest
from byteb4rb1e.testing.pytest.decorators import run_in_subprocess_once
@run_in_subprocess_once()
def test_run_in_subprocess_once(tmp_path):
marker = tmp_path / "executed_in_subprocess.txt"
if marker.exists():
raise AssertionError("Marker file exists before test logic ran (shouldn't happen in parent process)")
# Create proof of execution
marker.write_text("Subprocess was here.")
# Now assert it
assert marker.exists()


@ -1,38 +0,0 @@
from pathlib import Path
import importlib.resources
import pytest
pytestmark = pytest.mark.pytest
from byteb4rb1e.testing.pytest.decorators import run_in_subprocess_once
from byteb4rb1e.testing.pytest.fixtures import (
current_test,
mock_system_site_package_dir
)
def test_current_test(current_test):
"""
"""
suite_path, case_name = current_test
assert str(Path(__file__)) == str(suite_path)
assert case_name == "test_current_test"
@run_in_subprocess_once()
def test_mock_system_site_package_dir(mock_system_site_package_dir):
"""
"""
dummy_data = 'Hello'
pkgdir = mock_system_site_package_dir('foobarpkg')
(pkgdir / 'data.txt').write_text(dummy_data)
assert (pkgdir / '__init__.py').exists()
result = next(importlib.resources.files('foobarpkg').glob('data.txt')).read_text()
assert result == dummy_data


@ -1,5 +0,0 @@
def pytest_configure(config):
# register an additional marker
config.addinivalue_line(
"markers", "pytest: test pytest integration"
)


@ -1,52 +0,0 @@
"""Tests for custom argparse actions."""
from argparse import ArgumentParser
import pytest
from byteb4rb1e.utils.argparse.actions import KeyValueAction
def _parse(*args):
parser = ArgumentParser()
parser.add_argument("--config", action=KeyValueAction, default={}, metavar="KEY=VALUE")
return parser.parse_args(list(args))
class TestKeyValueAction:
def test_single_pair(self):
args = _parse("--config", "key=value")
assert args.config == {"key": "value"}
def test_multiple_pairs(self):
args = _parse("--config", "a=1", "--config", "b=2")
assert args.config == {"a": "1", "b": "2"}
def test_dotted_key(self):
args = _parse("--config", "provider.base_url=http://localhost")
assert args.config == {"provider.base_url": "http://localhost"}
def test_value_with_equals(self):
args = _parse("--config", "url=http://host?a=1&b=2")
assert args.config == {"url": "http://host?a=1&b=2"}
def test_empty_value(self):
args = _parse("--config", "key=")
assert args.config == {"key": ""}
def test_strips_whitespace(self):
args = _parse("--config", " key = value ")
assert args.config == {"key": "value"}
def test_overwrites_duplicate_key(self):
args = _parse("--config", "key=first", "--config", "key=second")
assert args.config == {"key": "second"}
def test_default_empty_dict(self):
args = _parse()
assert args.config == {}
def test_no_equals_raises(self):
with pytest.raises(SystemExit):
_parse("--config", "no_equals_here")


@ -1,347 +0,0 @@
"""Unit tests for the config framework."""
from argparse import ArgumentParser, Namespace
from dataclasses import dataclass
from pathlib import Path
import pytest
from byteb4rb1e.utils.config import (
add_config_arguments,
apply_cli_overrides,
apply_overrides,
ensure_ini,
ensure_ini_multi,
format_help,
format_section,
load_ini,
resolve_hints,
)
@dataclass
class SampleConfig:
name: str = "default"
count: int = 10
ratio: float = 0.5
enabled: bool = True
class TestLoadIni:
def test_loads_values(self, tmp_path):
ini = tmp_path / "test.ini"
ini.write_text(
"[sample]\n"
"name = custom\n"
"count = 42\n"
"ratio = 0.75\n"
)
config = load_ini(SampleConfig, ini)
assert config.name == "custom"
assert config.count == 42
assert config.ratio == 0.75
assert config.enabled is True # default
def test_missing_section_uses_defaults(self, tmp_path):
ini = tmp_path / "test.ini"
ini.write_text("[other]\nfoo = bar\n")
config = load_ini(SampleConfig, ini)
assert config.name == "default"
assert config.count == 10
def test_missing_file_uses_defaults(self, tmp_path):
config = load_ini(
SampleConfig, tmp_path / "missing.ini"
)
assert config.name == "default"
def test_unknown_key_raises(self, tmp_path):
ini = tmp_path / "test.ini"
ini.write_text("[sample]\nunknown_key = bad\n")
with pytest.raises(ValueError, match="unknown_key"):
load_ini(SampleConfig, ini)
def test_custom_section_name(self, tmp_path):
ini = tmp_path / "test.ini"
ini.write_text("[mysection]\nname = custom\n")
config = load_ini(
SampleConfig, ini, section="mysection"
)
assert config.name == "custom"
def test_comments_ignored(self, tmp_path):
ini = tmp_path / "test.ini"
ini.write_text(
"[sample]\n"
"# this is a comment\n"
"name = works # inline comment\n"
)
config = load_ini(SampleConfig, ini)
assert config.name == "works"
class TestAddConfigArguments:
def test_generates_flags(self):
parser = ArgumentParser()
add_config_arguments(SampleConfig, parser)
args = parser.parse_args(
["--name", "cli", "--count", "99"]
)
assert args.name == "cli"
assert args.count == 99
def test_defaults_are_none(self):
parser = ArgumentParser()
add_config_arguments(SampleConfig, parser)
args = parser.parse_args([])
assert args.name is None
assert args.count is None
def test_underscores_become_dashes(self):
@dataclass
class DashConfig:
my_long_name: str = "x"
parser = ArgumentParser()
add_config_arguments(DashConfig, parser)
args = parser.parse_args(
["--my-long-name", "val"]
)
assert args.my_long_name == "val"
class TestApplyCliOverrides:
def test_overrides_set_values(self):
config = SampleConfig()
args = Namespace(name="override", count=None,
ratio=None, enabled=None)
result = apply_cli_overrides(config, args)
assert result.name == "override"
assert result.count == 10 # unchanged
def test_no_overrides_returns_same(self):
config = SampleConfig()
args = Namespace(name=None, count=None,
ratio=None, enabled=None)
result = apply_cli_overrides(config, args)
assert result.name == "default"
assert result is config
class TestEnsureIni:
def test_creates_file_if_missing(self, tmp_path):
ini = tmp_path / "new.ini"
assert not ini.exists()
config = ensure_ini(SampleConfig, ini)
assert ini.exists()
assert config.name == "default"
assert config.count == 10
def test_created_file_has_all_fields(self, tmp_path):
ini = tmp_path / "new.ini"
ensure_ini(SampleConfig, ini)
content = ini.read_text()
assert "name" in content
assert "count" in content
assert "ratio" in content
assert "enabled" in content
def test_created_file_has_comments(self, tmp_path):
ini = tmp_path / "new.ini"
ensure_ini(SampleConfig, ini)
content = ini.read_text()
assert "# name (str)" in content
assert "# count (int)" in content
def test_reads_existing_file(self, tmp_path):
ini = tmp_path / "existing.ini"
ini.write_text("[sample]\ncount = 42\n")
config = ensure_ini(SampleConfig, ini)
assert config.count == 42
def test_does_not_overwrite_existing(self, tmp_path):
ini = tmp_path / "existing.ini"
ini.write_text("[sample]\ncount = 42\n")
ensure_ini(SampleConfig, ini)
content = ini.read_text()
assert content == "[sample]\ncount = 42\n"
def test_created_file_is_loadable(self, tmp_path):
ini = tmp_path / "new.ini"
ensure_ini(SampleConfig, ini)
config = load_ini(SampleConfig, ini)
assert config.name == "default"
assert config.count == 10
assert config.ratio == 0.5
class TestIntegration:
def test_ini_then_cli_override(self, tmp_path):
ini = tmp_path / "test.ini"
ini.write_text("[sample]\ncount = 42\n")
config = load_ini(SampleConfig, ini)
assert config.count == 42
args = Namespace(name=None, count=99,
ratio=None, enabled=None)
config = apply_cli_overrides(config, args)
assert config.count == 99
assert config.name == "default"
def test_ensure_then_cli_override(self, tmp_path):
ini = tmp_path / "new.ini"
config = ensure_ini(SampleConfig, ini)
assert config.count == 10
args = Namespace(name=None, count=99,
ratio=None, enabled=None)
config = apply_cli_overrides(config, args)
assert config.count == 99
assert config.name == "default"
# Config file unchanged
reloaded = load_ini(SampleConfig, ini)
assert reloaded.count == 10
class TestResolveHints:
def test_returns_type_dict(self):
hints = resolve_hints(SampleConfig)
assert hints["name"] is str
assert hints["count"] is int
assert hints["ratio"] is float
assert hints["enabled"] is bool
class TestFormatSection:
def test_includes_section_header(self):
text = format_section(SampleConfig)
assert "[sample]" in text
def test_custom_section_name(self):
text = format_section(SampleConfig, "custom")
assert "[custom]" in text
def test_includes_all_fields(self):
text = format_section(SampleConfig)
assert "name = default" in text
assert "count = 10" in text
assert "ratio = 0.5" in text
assert "enabled = True" in text
def test_includes_type_comments(self):
text = format_section(SampleConfig)
assert "# name (str)" in text
assert "# count (int)" in text
def test_is_loadable(self, tmp_path):
ini = tmp_path / "test.ini"
ini.write_text(format_section(SampleConfig) + "\n")
config = load_ini(SampleConfig, ini)
assert config.name == "default"
assert config.count == 10
class TestEnsureIniMulti:
def test_creates_file_with_multiple_sections(self, tmp_path):
@dataclass
class OtherConfig:
host: str = "localhost"
port: int = 8080
ini = tmp_path / "multi.ini"
ensure_ini_multi([
(SampleConfig, None),
(OtherConfig, "server"),
], ini)
content = ini.read_text()
assert "[sample]" in content
assert "[server]" in content
assert "name = default" in content
assert "host = localhost" in content
def test_does_not_overwrite_existing(self, tmp_path):
ini = tmp_path / "multi.ini"
ini.write_text("[existing]\nfoo = bar\n")
ensure_ini_multi([(SampleConfig, None)], ini)
content = ini.read_text()
assert content == "[existing]\nfoo = bar\n"
def test_sections_are_loadable(self, tmp_path):
@dataclass
class DbConfig:
url: str = "sqlite:///test.db"
ini = tmp_path / "multi.ini"
ensure_ini_multi([
(SampleConfig, None),
(DbConfig, "database"),
], ini)
sample = load_ini(SampleConfig, ini)
db = load_ini(DbConfig, ini, section="database")
assert sample.name == "default"
assert db.url == "sqlite:///test.db"
class TestApplyOverrides:
def test_applies_dotted_path(self):
config = SampleConfig()
result = apply_overrides(config, {
"provider.name": "custom",
"provider.count": "99",
}, prefix="provider")
assert result.name == "custom"
assert result.count == 99
def test_without_prefix(self):
config = SampleConfig()
result = apply_overrides(config, {
"name": "direct",
"count": "42",
})
assert result.name == "direct"
assert result.count == 42
def test_no_matching_keys_returns_same(self):
config = SampleConfig()
result = apply_overrides(config, {"other.key": "val"}, prefix="provider")
assert result is config
def test_bool_coercion(self):
config = SampleConfig()
result = apply_overrides(config, {"enabled": "false"})
assert result.enabled is False
def test_preserves_unset_fields(self):
config = SampleConfig()
result = apply_overrides(config, {"name": "changed"})
assert result.name == "changed"
assert result.count == 10 # unchanged
assert result.ratio == 0.5 # unchanged
class TestFormatHelp:
def test_lists_all_fields(self):
lines = format_help(SampleConfig)
assert len(lines) == 4
assert any("name" in l for l in lines)
assert any("count" in l for l in lines)
def test_includes_types(self):
lines = format_help(SampleConfig)
text = "\n".join(lines)
assert "str" in text
assert "int" in text
def test_includes_defaults(self):
lines = format_help(SampleConfig)
text = "\n".join(lines)
assert "default" in text
assert "10" in text
def test_with_prefix(self):
lines = format_help(SampleConfig, prefix="provider")
assert any("provider.name" in l for l in lines)
assert any("provider.count" in l for l in lines)


@ -1,217 +0,0 @@
"""Tests for the generic HTTP client."""
import email.message
import io
import urllib.error
import urllib.parse
import urllib.request
from types import TracebackType
from typing import Dict, List, Optional, Tuple, Type, Union
import pytest
from byteb4rb1e.utils.http.client import HttpResponse, HttpSession
class _FakeRawResponse:
"""Stands in for the object returned by OpenerDirector.open()."""
def __init__(
self,
status: int = 200,
headers: Optional[Dict[str, str]] = None,
data: bytes = b"",
) -> None:
self._status = status
self._headers = headers or {}
self._data = data
def getcode(self) -> int:
return self._status
def getheaders(self) -> List[Tuple[str, str]]:
return list(self._headers.items())
def read(self) -> bytes:
return self._data
def __enter__(self) -> "_FakeRawResponse":
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
) -> None:
return None
class _FakeOpener:
"""Records requests and replays canned responses."""
def __init__(
self,
responses: Optional[
List[Union[_FakeRawResponse, Exception]]
] = None,
) -> None:
self.requests: List[urllib.request.Request] = []
self._responses = list(responses or [_FakeRawResponse()])
def open(
self,
req: urllib.request.Request,
timeout: Optional[int] = None,
) -> _FakeRawResponse:
self.requests.append(req)
response = self._responses.pop(0)
if isinstance(response, Exception):
raise response
return response
def _http_error(
code: int = 404,
data: bytes = b"",
headers: Optional[Dict[str, str]] = None,
) -> urllib.error.HTTPError:
hdrs = email.message.Message()
for key, value in (headers or {}).items():
hdrs[key] = value
return urllib.error.HTTPError(
"http://testserver/", code, "error", hdrs, io.BytesIO(data),
)
class TestHttpResponse:
def test_json(self) -> None:
resp = HttpResponse(200, {}, b'{"a": 1}')
assert resp.json() == {"a": 1}
def test_text(self) -> None:
resp = HttpResponse(200, {}, b"hello")
assert resp.text == "hello"
def test_text_replaces_invalid_utf8(self) -> None:
resp = HttpResponse(200, {}, b"\xff\xfe")
assert "\ufffd" in resp.text
def test_reason_defaults_to_none(self) -> None:
resp = HttpResponse(200, {}, b"")
assert resp.reason is None
def test_frozen(self) -> None:
resp = HttpResponse(200, {}, b"")
with pytest.raises(Exception):
resp.status_code = 500
class TestHttpSession:
def test_opener_has_cookie_processor(self) -> None:
session = HttpSession()
processors = [
h for h in session._opener.handlers
if isinstance(h, urllib.request.HTTPCookieProcessor)
]
assert len(processors) == 1
assert processors[0].cookiejar is session._jar
def test_get(self) -> None:
opener = _FakeOpener([
_FakeRawResponse(200, {"X-Foo": "bar"}, b"body"),
])
session = HttpSession()
session._opener = opener
resp = session.get("http://testserver/page")
assert resp.status_code == 200
assert resp.data == b"body"
assert resp.headers == {"X-Foo": "bar"}
assert opener.requests[0].get_method() == "GET"
assert opener.requests[0].full_url == "http://testserver/page"
def test_get_with_params(self) -> None:
opener = _FakeOpener()
session = HttpSession()
session._opener = opener
session.get("http://testserver/page", params={"a": "1", "b": "x y"})
assert opener.requests[0].full_url == (
"http://testserver/page?a=1&b=x+y"
)
def test_default_headers_sent(self) -> None:
opener = _FakeOpener()
session = HttpSession(default_headers={"User-Agent": "test"})
session._opener = opener
session.get("http://testserver/")
assert opener.requests[0].get_header("User-agent") == "test"
def test_request_headers_override_defaults(self) -> None:
opener = _FakeOpener()
session = HttpSession(default_headers={"X-Token": "default"})
session._opener = opener
session.get("http://testserver/", headers={"X-Token": "override"})
assert opener.requests[0].get_header("X-token") == "override"
def test_post_form_encodes_data(self) -> None:
opener = _FakeOpener()
session = HttpSession()
session._opener = opener
session.post("http://testserver/login", data={"user": "u", "pass": "p"})
req = opener.requests[0]
assert req.get_method() == "POST"
assert isinstance(req.data, bytes)
assert dict(urllib.parse.parse_qsl(req.data.decode())) == {
"user": "u",
"pass": "p",
}
assert req.get_header("Content-type") == (
"application/x-www-form-urlencoded"
)
def test_post_keeps_explicit_content_type(self) -> None:
opener = _FakeOpener()
session = HttpSession()
session._opener = opener
session.post(
"http://testserver/",
data={"a": "1"},
headers={"Content-Type": "text/plain"},
)
assert opener.requests[0].get_header("Content-type") == "text/plain"
def test_post_without_data(self) -> None:
opener = _FakeOpener()
session = HttpSession()
session._opener = opener
session.post("http://testserver/")
assert opener.requests[0].data is None
def test_http_error_returned_as_response(self) -> None:
opener = _FakeOpener([
_http_error(404, b"missing", {"X-Err": "yes"}),
])
session = HttpSession()
session._opener = opener
resp = session.get("http://testserver/nope")
assert resp.status_code == 404
assert resp.data == b"missing"
assert resp.headers["X-Err"] == "yes"


@ -1,133 +0,0 @@
"""Tests for the Forgejo API wrapper."""
import json
from typing import Any, Dict, List, Optional, Tuple
import pytest
from byteb4rb1e.utils.http.client import HttpResponse
from byteb4rb1e.utils.saas import forgejo
HOST = "git.example.com"
class _Recorder:
"""Records http_client calls and replays a canned response."""
def __init__(self, response: HttpResponse) -> None:
self.calls: List[Tuple[str, Dict[str, Any]]] = []
self._response = response
def __call__(self, url: str, **kwargs: Any) -> HttpResponse:
self.calls.append((url, kwargs))
return self._response
class TestApiUrl:
def test_host_only(self) -> None:
assert forgejo.api_url(HOST) == "https://git.example.com/api/v1"
class TestHttpHeaders:
def test_token_header(self) -> None:
headers = forgejo.http_headers("s3cret")
assert headers["Authorization"] == "token s3cret"
assert headers["Accept"] == "application/json"
assert headers["Content-Type"] == "application/json"
class TestRepositoryExists:
def test_exists(self, monkeypatch: pytest.MonkeyPatch) -> None:
recorder = _Recorder(HttpResponse(200, {}, b"{}"))
monkeypatch.setattr(forgejo.http_client, "get", recorder)
assert forgejo.repository_exists(HOST, "tiara", "repo", "t") is True
url, kwargs = recorder.calls[0]
assert url == "https://git.example.com/api/v1/repos/tiara/repo"
assert kwargs["headers"]["Authorization"] == "token t"
def test_missing(self, monkeypatch: pytest.MonkeyPatch) -> None:
recorder = _Recorder(HttpResponse(404, {}, b""))
monkeypatch.setattr(forgejo.http_client, "get", recorder)
assert forgejo.repository_exists(HOST, "tiara", "repo", "t") is False
class TestCreateRepository:
def _create(
self,
monkeypatch: pytest.MonkeyPatch,
org: Optional[str] = None,
**kwargs: Any,
) -> _Recorder:
recorder = _Recorder(HttpResponse(201, {}, b"{}"))
monkeypatch.setattr(forgejo.http_client, "post", recorder)
forgejo.create_repository(HOST, "repo", "t", org=org, **kwargs)
return recorder
def test_user_repo_endpoint(
self, monkeypatch: pytest.MonkeyPatch,
) -> None:
recorder = self._create(monkeypatch)
url, _ = recorder.calls[0]
assert url == "https://git.example.com/api/v1/user/repos"
def test_org_repo_endpoint(
self, monkeypatch: pytest.MonkeyPatch,
) -> None:
recorder = self._create(monkeypatch, org="byteb4rb1e")
url, _ = recorder.calls[0]
assert url == "https://git.example.com/api/v1/orgs/byteb4rb1e/repos"
def test_body(self, monkeypatch: pytest.MonkeyPatch) -> None:
recorder = self._create(
monkeypatch, description="demo", is_private=False,
)
_, kwargs = recorder.calls[0]
body = json.loads(kwargs["data"].decode("utf-8"))
assert body == {
"name": "repo",
"private": False,
"description": "demo",
}
def test_defaults_to_private(
self, monkeypatch: pytest.MonkeyPatch,
) -> None:
recorder = self._create(monkeypatch)
_, kwargs = recorder.calls[0]
body = json.loads(kwargs["data"].decode("utf-8"))
assert body["private"] is True
def test_auth_header(self, monkeypatch: pytest.MonkeyPatch) -> None:
recorder = self._create(monkeypatch)
_, kwargs = recorder.calls[0]
assert kwargs["headers"]["Authorization"] == "token t"
def test_returns_response(
self, monkeypatch: pytest.MonkeyPatch,
) -> None:
response = HttpResponse(201, {}, b'{"id": 1}')
recorder = _Recorder(response)
monkeypatch.setattr(forgejo.http_client, "post", recorder)
resp = forgejo.create_repository(HOST, "repo", "t")
assert resp is response
class TestCloneUrls:
def test_ssh(self) -> None:
assert forgejo.ssh_clone_url(HOST, "tiara", "repo") == (
"git@git.example.com:tiara/repo.git"
)
def test_https(self) -> None:
assert forgejo.https_clone_url(HOST, "tiara", "repo") == (
"https://git.example.com/tiara/repo.git"
)
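
The URL and header shapes asserted in this file can be summarised by a few pure functions. These are standalone stand-ins inferred from the test assertions, not the code of the `forgejo` module; `create_repository_endpoint` is a hypothetical helper name.

```python
from typing import Dict, Optional


def api_url(host: str) -> str:
    """Base URL of the v1 REST API on the given host."""
    return f"https://{host}/api/v1"


def http_headers(token: str) -> Dict[str, str]:
    """Headers for token-authenticated JSON requests."""
    return {
        "Authorization": f"token {token}",
        "Accept": "application/json",
        "Content-Type": "application/json",
    }


def create_repository_endpoint(host: str, org: Optional[str] = None) -> str:
    """Organisation repositories and user repositories use different paths."""
    base = api_url(host)
    return f"{base}/orgs/{org}/repos" if org else f"{base}/user/repos"


def ssh_clone_url(host: str, owner: str, repo: str) -> str:
    return f"git@{host}:{owner}/{repo}.git"


def https_clone_url(host: str, owner: str, repo: str) -> str:
    return f"https://{host}/{owner}/{repo}.git"
```

Keeping URL construction free of network calls is what allows the tests to check endpoints by recording the arguments passed to a patched `http_client`.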


@ -1,93 +0,0 @@
import os.path
import sys
import urllib.request
import pytest
from byteb4rb1e.testing.pytest.decorators import run_in_subprocess_once
from byteb4rb1e.testing.pytest.fixtures import mock_system_site_package_dir
from byteb4rb1e.utils.urllib.request import PkgHandler
class TestPkgHandler:
"""
"""
@run_in_subprocess_once()
def test_text(self, mock_system_site_package_dir):
"""
"""
_opener: urllib.request.OpenerDirector = urllib.request.build_opener(
PkgHandler()
)
dummy_data = 'Hello'
pkg_dir = mock_system_site_package_dir('foobarpkg')
(pkg_dir / 'data.txt').write_text(dummy_data)
result = _opener.open('pkg://foobarpkg/data.txt').readline()
assert isinstance(result, str)
assert result == dummy_data
@run_in_subprocess_once()
def test_bytes(self, mock_system_site_package_dir):
"""
"""
_opener: urllib.request.OpenerDirector = urllib.request.build_opener(
PkgHandler()
)
dummy_data = b'foobar123'
pkg_dir = mock_system_site_package_dir('foobarpkg')
(pkg_dir / 'data.bin').write_bytes(dummy_data)
result = _opener.open('pkg://foobarpkg/data.bin').readline()
assert isinstance(result, bytes)
assert result == dummy_data
@run_in_subprocess_once()
def test_subdir(self, mock_system_site_package_dir):
"""
"""
_opener: urllib.request.OpenerDirector = urllib.request.build_opener(
PkgHandler()
)
dummy_data = 'foobar123'
pkg_dir = mock_system_site_package_dir('foobarpkg')
dummy_file = (pkg_dir / 'foo' / 'bar' / 'data.txt')
dummy_file.parent.mkdir(parents=True)
dummy_file.write_text(dummy_data)
result = _opener.open('pkg://foobarpkg/foo/bar/data.txt').readline()
assert result == dummy_data
@run_in_subprocess_once()
def test_nested_module(self, mock_system_site_package_dir):
"""
"""
_opener: urllib.request.OpenerDirector = urllib.request.build_opener(
PkgHandler()
)
dummy_data = 'foobar123'
pkg_dir = mock_system_site_package_dir('foo.bar.pkg')
dummy_file = (pkg_dir / 'dummy' / 'data.txt')
dummy_file.parent.mkdir(parents=True)
dummy_file.write_text(dummy_data)
result = _opener.open('pkg://foo.bar.pkg/dummy/data.txt').readline()
assert result == dummy_data


@ -1,60 +0,0 @@
"""Tests for the git subprocess wrapper's URL parsing helpers."""
import pytest
from byteb4rb1e.utils.vcs.git import parse_base_url, parse_repo_name
class TestParseBaseUrl:
def test_bitbucket(self) -> None:
result = parse_base_url("git@bitbucket.org:byteb4rb1e/foo.git")
assert result == "byteb4rb1e"
def test_forgejo_host(self) -> None:
result = parse_base_url(
"git@git.code.tiararodney.com:h5p-mirror/foo.git"
)
assert result == "h5p-mirror"
def test_github_host(self) -> None:
result = parse_base_url("git@github.com:h5p/h5p-multi-choice.git")
assert result == "h5p"
def test_returns_str(self) -> None:
result = parse_base_url("git@bitbucket.org:byteb4rb1e/foo.git")
assert isinstance(result, str)
def test_rejects_https_url(self) -> None:
with pytest.raises(ValueError):
parse_base_url("https://bitbucket.org/byteb4rb1e/foo.git")
def test_rejects_url_without_colon(self) -> None:
with pytest.raises(ValueError):
parse_base_url("bitbucket.org/byteb4rb1e/foo.git")
class TestParseRepoName:
def test_bitbucket(self) -> None:
assert parse_repo_name(
"git@bitbucket.org:byteb4rb1e/foo.git"
) == "foo"
def test_forgejo_host(self) -> None:
assert parse_repo_name(
"git@git.code.tiararodney.com:h5p-mirror/foo.git"
) == "foo"
def test_without_git_suffix(self) -> None:
assert parse_repo_name(
"git@git.code.tiararodney.com:h5p-mirror/foo"
) == "foo"
def test_rejects_https_url(self) -> None:
with pytest.raises(ValueError):
parse_repo_name("https://git.code.tiararodney.com/x/foo.git")
def test_rejects_url_without_colon(self) -> None:
with pytest.raises(ValueError):
parse_repo_name("git.code.tiararodney.com/x/foo.git")


@ -1,6 +1,6 @@
import unittest
from byteb4rb1e.utils.collections import CircularBuffer
from byteb4rb1e_utils.collections import CircularBuffer
class test_init(unittest.TestCase):
"""CircularBuffer.__init__()"""


@ -1,7 +1,7 @@
from io import BytesIO, IOBase
import unittest
from byteb4rb1e.utils.io import ChunksIO
from byteb4rb1e_utils.io import ChunksIO
class TestGetChunkSize(unittest.TestCase):


@ -0,0 +1,56 @@
import unittest
from byteb4rb1e_utils.string import (
ChunkedRollingHash,
ChunkedRollingHashOptions,
RollingHash,
)
class test___init__(unittest.TestCase):
"""ChunkedRollingHash.__init__()"""
def test_default(self):
"""default options"""
result = ChunkedRollingHash(b'abcdefgh')
self.assertEqual(result._mod, ChunkedRollingHashOptions.mod)
self.assertEqual(result._base, ChunkedRollingHashOptions.base)
self.assertEqual(result._max_chunk_size, ChunkedRollingHashOptions.max_chunk_size)
control_hash = RollingHash.compute_initial_hash(
b'abcdefgh',
base = result._base,
mod = result._mod
)
self.assertEqual(result._length, 8)
self.assertEqual(result._chunk_count, 1)
self.assertEqual(len(result._chunks_hash), result._chunk_count)
self.assertEqual(result._hash, control_hash)
self.assertEqual(result._chunks_hash[0], control_hash)
def test_override(self):
"""override of options"""
options = ChunkedRollingHashOptions(
mod = 4,
base = 10,
max_chunk_size = 5,
)
result = ChunkedRollingHash(b'abcdefgh', options)
self.assertEqual(result._mod, options.mod)
self.assertEqual(result._base, options.base)
self.assertEqual(result._max_chunk_size, options.max_chunk_size)
control_hash1 = RollingHash.compute_initial_hash(
b'abcde',
base = result._base,
mod = result._mod
)
control_hash2 = RollingHash.compute_initial_hash(
b'fgh',
base = result._base,
mod = result._mod
)
self.assertEqual(result._chunks_hash[0], control_hash1)
self.assertEqual(result._chunks_hash[1], control_hash2)
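
The chunk arithmetic these tests rely on can be checked in isolation. The sketch below mirrors the `math.ceil` and modulo expressions from `ChunkedRollingHash.__init__`; `chunk_layout` and `split_chunks` are hypothetical helper names.

```python
import math
from typing import List, Tuple


def chunk_layout(length: int, max_chunk_size: int) -> Tuple[int, int]:
    """Return (chunk count, remainder) for a pattern of the given length."""
    return math.ceil(length / max_chunk_size), length % max_chunk_size


def split_chunks(data: bytes, max_chunk_size: int) -> List[bytes]:
    """Consecutive slices of data; only the last one may be shorter."""
    return [
        data[i:i + max_chunk_size]
        for i in range(0, len(data), max_chunk_size)
    ]
```

For the 8-byte pattern `b'abcdefgh'`, a chunk size of 10 gives one chunk with remainder 8, and a chunk size of 5 gives the two chunks `b'abcde'` and `b'fgh'` with remainder 3. A length that is an exact multiple of the chunk size yields remainder 0, a case that needs care when the remainder is used as a negative slice index.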


@ -1,6 +1,6 @@
import unittest
from byteb4rb1e.utils.string import RollingHash
from byteb4rb1e_utils.string import RollingHash
class test_compute_initial_hash(unittest.TestCase):
"""RollingHash.compute_initial_hash()

54
tox.ini

@ -1,54 +0,0 @@
[tox]
requires =
tox>=4.19
env_list =
unit-py3{9-13}
integration-py3{9-13}-pytest8
lint
format
[testenv]
deps =
.
[testenv:lint]
description = run type check on code base
labels = static
deps =
mypy
commands =
mypy src tests --junit-xml test-reports/{env_name}.xml
[testenv:audit]
description = audit dependencies for known vulnerabilities
labels = audit
deps =
pip-audit
commands =
pip-audit .
[testenv:format]
description = check code style
labels = static
deps =
autopep8
commands =
autopep8 --diff --exit-code src tests
[testenv:unit-py3{9-13}]
description = run unit tests
labels = unit
deps =
{[testenv]deps}
pytest
commands =
pytest tests/unit --junitxml=test-reports/{env_name}.xml
[testenv:integration-py3{9-13}-pytest8]
description = run pytest integration tests
labels = integration
deps =
{[testenv]deps}
pytest8: pytest>=8.0,<=9.0
commands =
pytest tests/integration -m pytest --junitxml=test-reports/{env_name}.xml