Compare commits
1 commit
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e5f0d1df58 |
45 changed files with 3248 additions and 2366 deletions
2
.gitignore
vendored
|
|
@ -11,5 +11,3 @@
|
|||
/configure~
|
||||
*.swo
|
||||
*.swp
|
||||
/test-reports/
|
||||
/.tox/
|
||||
|
|
|
|||
122
DEVELOPMENT.md
|
|
@ -1,122 +0,0 @@
# Development

> All changes MUST follow the spec in vendor/tiara-gitflow-spec.git, and work
> MUST NOT be started without a TODO issue.

## Prerequisites

- Python 3.9+
- [Pipenv](https://pipenv.pypa.io/)
- [tox](https://tox.wiki/) (installed via Pipenv dev dependencies)
- Node.js (for the `@byteb4rb1e/mime-todo` issue tracker CLI)

## Setup

Initialize Git submodules:

```bash
git submodule update --init --remote --recursive
```

Install dependencies (includes the package in editable mode):

```bash
pipenv install --dev
```

## Tooling

### Package

The project is packaged as `byteb4rb1e.utils` under a namespace package
layout (`src/byteb4rb1e/utils/`). It is installed in editable mode via
Pipenv.

Build a distribution:

```bash
pipenv run dist
```

### Testing

Tests are managed by tox. Test environments are defined in `tox.ini`:

```bash
# run all test suites
tox

# run specific environments
tox -e unit-py313
tox -e lint
tox -e format
```

| Environment | Purpose |
|---|---|
| `unit-py3{9-13}` | Unit tests |
| `smoke-py3{9-13}` | Smoke tests |
| `integration-py3{9-13}` | Integration tests |
| `lint` | Type checking (mypy) |
| `format` | Code style (autopep8) |
| `audit` | Dependency audit (pip-audit) |

### Issue tracker

Issues are tracked in the `TODO` file using the
[MIME TODO](https://specs.code.tiararodney.com/mime-todo/) format. Use the
`@byteb4rb1e/mime-todo` CLI to interact with it:

```bash
# list issues
npx @byteb4rb1e/mime-todo list

# show a specific issue
npx @byteb4rb1e/mime-todo show 3

# create an issue
npx @byteb4rb1e/mime-todo create --type feature --title "Title" --plan "Description" --module homeostat
```

See [CONTRIBUTING.md](CONTRIBUTING.md) for the full issue lifecycle.

### Publishing

Build wheel and source distributions:

```sh
pipenv run dist
```

Configure publishing options:

`~/.pypirc`
```
[distutils]
index-servers =
    tiararodney

[tiararodney]
repository: https://pypi.code.tiararodney.com/root/byteb4rb1e/
username: <username>
password: <password>
```

Publish to pypi.code.tiararodney.com:

```sh
pipenv run dist:publish:tiararodney
```

## Project layout

```
src/byteb4rb1e/utils/    # package source
tests/                   # test suites (unit/, smoke/, integration/)
vendor/                  # vendored specs
dist/                    # sdist and wheel build output
DEVELOPMENT.md           # this file
TODO                     # issue tracker (MIME TODO format)
```
0
LICENSE
24
Makefile
Normal file
|
|
@ -0,0 +1,24 @@
.PHONY: chore configure

chore: configure Pipfile.lock requirements-dev.txt

Pipfile.lock: .venv Pipfile
	.venv/bin/pipenv lock

requirements-dev.txt: .venv Pipfile.lock
	.venv/bin/pipenv requirements --dev-only > requirements-dev.txt

configure: configure.ac
	autoconf

.venv: requirements-dev.txt
	python3 -m venv .venv
	.venv/bin/python3 -m pip install --upgrade pip
	.venv/bin/pip install -r requirements-dev.txt

test-reports:
	.venv/bin/python3 -m unittest discover -v

build: .venv/bin/pipenv
	.venv/bin/pipenv run build

88
NOTES
Normal file
|
|
@ -0,0 +1,88 @@
These are just a couple of brain farts that came up and that I'd rather note
down. There's no clear structure.

RFC 1341 Boundary Matching in a Circular Buffer
1. Algorithm Considerations

Knuth-Morris-Pratt (KMP) Limitations:

Useful when patterns have prefix-suffix overlaps for efficient skipping.

If the failure table consists only of zeros, KMP provides no speed advantage
over naive searching.

Boundary pattern is arbitrary, meaning KMP’s preprocessing may not be
beneficial.
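
To make the failure-table point concrete, here is a minimal sketch of how that table is computed. The function name and the sample patterns are made up for illustration; they are not taken from this repository.

```python
from typing import List


def failure_table(pattern: bytes) -> List[int]:
    """Return the KMP failure table (longest proper prefix that is also a suffix)."""
    table = [0] * len(pattern)
    k = 0  # length of the prefix matched so far
    for i in range(1, len(pattern)):
        # fall back to shorter prefixes until one can be extended
        while k > 0 and pattern[i] != pattern[k]:
            k = table[k - 1]
        if pattern[i] == pattern[k]:
            k += 1
        table[i] = k
    return table


# hypothetical patterns: one without any prefix-suffix overlap, one with
print(failure_table(b"xyz123"))
print(failure_table(b"abcabcab"))
```

A pattern without internal repetition yields an all-zero table, so a mismatch always restarts the comparison at the first pattern byte and the table offers no extra shortcut.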

Alternatives to KMP:

Rabin-Karp rolling hash → Uses fast hash comparisons instead of
character-by-character matching.

Boyer-Moore-Horspool → Precomputes skip distances to avoid redundant
comparisons, works well for longer patterns.

Crochemore-Perrin two-way search → used by CPython's str.find() for longer
inputs, flexible but assumes a linear memory layout, so not really applicable
to my circular buffer approach.
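
As a rough illustration of how a skip-table search can still run over a wrapped buffer, the sketch below applies Boyer-Moore-Horspool with modular indexing. The names `make_ring` and `horspool_find_ring`, and the representation of the ring as backing bytes plus a start index and a length, are assumptions for this example only.

```python
def make_ring(size: int, start: int, data: bytes) -> bytes:
    """Hypothetical helper: lay data out in a ring of `size` bytes from `start`."""
    ring = bytearray(size)
    for i, b in enumerate(data):
        ring[(start + i) % size] = b
    return bytes(ring)


def horspool_find_ring(ring: bytes, start: int, length: int, pattern: bytes) -> int:
    """Return the logical offset of pattern within the ring's valid data, or -1."""
    m = len(pattern)
    if m == 0:
        return 0
    if m > length:
        return -1
    size = len(ring)
    # skip distance per byte value, based on its last occurrence in
    # pattern[:-1]; bytes that do not occur skip the whole pattern length
    skip = [m] * 256
    for i in range(m - 1):
        skip[pattern[i]] = m - 1 - i
    pos = 0
    while pos <= length - m:
        j = m - 1
        # compare right to left, mapping logical offsets to ring indices
        while j >= 0 and ring[(start + pos + j) % size] == pattern[j]:
            j -= 1
        if j < 0:
            return pos
        pos += skip[ring[(start + pos + m - 1) % size]]
    return -1


data = b"xx--bnd--yy"
ring = make_ring(16, 10, data)  # the data wraps around the end of the ring
print(horspool_find_ring(ring, 10, len(data), b"--bnd"))
```

The only change compared to the linear variant is the `% size` on every read, which costs a little per byte but leaves the skip logic untouched.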

2. Boundary Characteristics

Max length: 70 bytes. Character set: ASCII only. No structure guarantees: The
boundary is client-defined, so I must be able to handle arbitrary sequences.

3. Algorithm Selection

Rolling Hash → Best for arbitrary short-to-medium patterns in a circular buffer.
Boyer-Moore → Ideal if the boundary has distinct character distributions to
optimize skipping.


# Optimized Chunk-Based Rolling Hash Matching

We need to efficiently detect an RFC 1341 multipart boundary inside a circular
buffer, ensuring minimal overhead while avoiding unnecessary comparisons.

Traditional approaches like Knuth-Morris-Pratt (KMP) don’t provide an advantage
when the boundary lacks repeated subpatterns. Meanwhile, full rolling hash
matching scans every byte, which can be wasteful.

Thus, we introduce a chunk-wise hash-based skipping strategy, allowing us to
skip large sections of the buffer when an early non-match is detected.

## Core Idea

- Precompute hashes for evenly sized chunks of the boundary.
- First, match only the hash of the first chunk → immediately skip unnecessary
  buffer sections if there is no match.
- If the first chunk matches, progressively verify subsequent chunks until the
  full boundary is confirmed.

## Benefits Over Full Matching

- Reduces comparisons significantly → eliminates large sections early when
  non-matches occur.
- Balances preprocessing cost vs runtime → faster
  elimination means fewer wasted cycles.
- Integrates seamlessly into circular buffers → allows skipping intelligently.

### Precompute Chunk Hashes

- Divide the pattern into `N` equal-sized chunks (e.g., 7 chunks of 10 bytes
  for a 70-byte boundary).
- Compute a rolling hash for each chunk in addition to the full pattern, storing
  them for quick lookup.

### Sliding Window Search in the Buffer

- Compute the rolling hash for each window of size `chunk_size`.
- Compare the first chunk’s hash with the buffer window.
- If no match, skip `boundary_length - chunk_size` bytes.

### Progressive Chunk Verification

- If the first chunk matches, verify the next chunk sequentially.
- Continue matching chunks until the full boundary is confirmed.
- Perform final character-by-character validation to rule out hash collisions.
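
The three steps above can be sketched roughly as follows. This is an illustration under stated assumptions, not the implementation planned for this repository: the names `poly_hash`, `find_boundary` and `make_ring` are invented here, the base and modulus are arbitrary choices, and the window deliberately advances one byte at a time instead of skipping, so the sketch cannot miss a match.

```python
BASE = 31          # assumed hash base
MOD = 10**9 + 7    # assumed prime modulus


def poly_hash(data: bytes) -> int:
    """Polynomial hash of data, most significant byte first."""
    h = 0
    for b in data:
        h = (h * BASE + b) % MOD
    return h


def make_ring(size: int, start: int, data: bytes) -> bytes:
    """Hypothetical helper: lay data out in a ring of `size` bytes from `start`."""
    ring = bytearray(size)
    for i, b in enumerate(data):
        ring[(start + i) % size] = b
    return bytes(ring)


def find_boundary(ring: bytes, start: int, length: int,
                  boundary: bytes, chunk_size: int = 10) -> int:
    """Return the logical offset of boundary in the ring's valid data, or -1."""
    m, size = len(boundary), len(ring)
    if m == 0 or m > length:
        return -1
    chunk_size = min(chunk_size, m)
    # step 1: precompute one hash per chunk (the last chunk may be shorter)
    chunks = [boundary[i:i + chunk_size] for i in range(0, m, chunk_size)]
    chunk_hashes = [poly_hash(c) for c in chunks]

    def at(i: int) -> int:
        return ring[(start + i) % size]  # logical offset -> ring index

    def window(off: int, n: int) -> bytes:
        return bytes(at(off + k) for k in range(n))

    # factor for removing the outgoing byte: BASE^(chunk_size-1) mod MOD
    high = pow(BASE, chunk_size - 1, MOD)
    h = poly_hash(window(0, chunk_size))
    for pos in range(length - m + 1):
        if pos > 0:
            # step 2: roll the window, drop at(pos-1), append the next byte
            h = (h - at(pos - 1) * high) % MOD
            h = (h * BASE + at(pos + chunk_size - 1)) % MOD
        if h != chunk_hashes[0]:
            continue
        # step 3: progressively verify the remaining chunks
        rest_ok = all(
            poly_hash(window(pos + n * chunk_size, len(chunks[n]))) == chunk_hashes[n]
            for n in range(1, len(chunks))
        )
        # the final byte-wise comparison rules out hash collisions
        if rest_ok and window(pos, m) == boundary:
            return pos
    return -1


data = b"abc\r\n--frontier1234567\r\nxyz"
ring = make_ring(32, 20, data)  # the data wraps around the end of the ring
print(find_boundary(ring, 20, len(data), b"--frontier1234567", chunk_size=5))
```

Whether a larger step than one byte is safe depends on guarantees these notes do not establish, so the sketch keeps the conservative step and only saves work through the early first-chunk rejection.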
|
||||
20
Pipfile
|
|
@ -4,25 +4,17 @@ verify_ssl = true
|
|||
name = "pypi"
|
||||
|
||||
[dev-packages]
|
||||
mypy = "~=1.15.0"
|
||||
autopep8 = "~=2.3.2"
|
||||
setuptools-scm = "~=8.2.0"
|
||||
pylint = "~=3.3.6"
|
||||
build = "*"
|
||||
pipenv = "*"
|
||||
tox = "*"
|
||||
twine = "*"
|
||||
pypi-attestations = "*"
|
||||
autopep8 = "*"
|
||||
byteb4rb1e-utils = { editable = true, path = '.'}
|
||||
|
||||
[requires]
|
||||
python_version = "3"
|
||||
python_version = "3.11"
|
||||
|
||||
[scripts]
|
||||
"dist" = "python3 -m build"
|
||||
"dist:attestations" = "python3 -m pypi_attestations sign dist/*"
|
||||
"dist:publish:tiararodney" = "python3 -m twine upload --sign --repository tiararodney dist/*"
|
||||
"test" = "tox"
|
||||
"test:static" = "tox run -m static"
|
||||
"test:unit" = "tox run -m unit"
|
||||
"test:integration" = "tox run -m integration"
|
||||
"build" = "python3 -m build"
|
||||
|
||||
[packages]
|
||||
"byteb4rb1e.utils" = {file = ".", editable = true}
|
||||
|
|
|
|||
948
Pipfile.lock
generated
File diff suppressed because it is too large
132
TODO
|
|
@ -109,135 +109,3 @@ Description: Implement my custom algorithm for doing rolling hash string search
|
|||
against a fixed length ring buffer
|
||||
|
||||
---
|
||||
|
||||
ID: 6
|
||||
Type: feature
|
||||
Title: implement importlib.resources handler for urllib
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-06-20
|
||||
Description: A handler that can be registered with an urllib.request
|
||||
OpenerDirector to open importlib.resources package files.
|
||||
|
||||
---
|
||||
|
||||
ID: 7
|
||||
Type: feature
|
||||
Title: setup advanced testing environment
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-06-20
|
||||
Description: copy the testing environment setup from
|
||||
byteb4rb1e.sphinxcontrib.ext
|
||||
|
||||
---
|
||||
|
||||
ID: 8
|
||||
Type: bugfix
|
||||
Title: rename package
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-06-20
|
||||
Description: use dot namespaces to make the package a little more elegant
|
||||
|
||||
---
|
||||
|
||||
ID: 9
|
||||
Type: bugfix
|
||||
Title: fix LICENSE reference
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-06-20
|
||||
Description: license specification is no longer a trove classifier in
|
||||
pyproject.toml, hence the reference to LICENSE must be changed
|
||||
|
||||
---
|
||||
|
||||
ID: 10
|
||||
Type: feature
|
||||
Title: pytest current test context fixtures
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-06-20
|
||||
Description: add fixtures for doing things in relation to the active testing
|
||||
context
|
||||
|
||||
---
|
||||
|
||||
ID: 11
|
||||
Type: bugfix
|
||||
Title: move testing utils out of utils
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-06-20
|
||||
Description: to shorten the namespace and also indicate that testing utilities
|
||||
are different from regular utilities
|
||||
|
||||
---
|
||||
|
||||
ID: 12
|
||||
Type: feature
|
||||
Title: simplify testing.fixtures.mock_pkg
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-06-21
|
||||
Description: Only bootstrap a package mock with the minimum requirements for a
|
||||
Python module and let the consumer handle the directory layout.
|
||||
|
||||
---
|
||||
|
||||
ID: 13
|
||||
Type: bugfix
|
||||
Title: fix unit tests for urllib PkgHandler
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-06-21
|
||||
Description: change of issue 12 wasn't properly reflected in urllib PkgHandler
|
||||
unit tests
|
||||
|
||||
---
|
||||
|
||||
ID: 14
|
||||
Type: feature
|
||||
Title: add compression support for urllib PkgHandler
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-06-21
|
||||
Description: with a proper content-type of the PkgHandler addinfourl object, a
|
||||
consumer can determine whether the file is compressed or not.
|
||||
|
||||
---
|
||||
|
||||
ID: 15
|
||||
Type: bugfix
|
||||
Title: modularize module containers
|
||||
Status: open
|
||||
Priority: high
|
||||
Created: 2025-06-28
|
||||
Description: Even though importlib can find submodules through traversing paths
|
||||
instead of relying on __init__.py for every ancestor module, this
|
||||
is not supported by some modules like sphinx.ext.autosummary
|
||||
|
||||
---
|
||||
|
||||
ID: 16
|
||||
Type: feature
|
||||
Title: SQL-aware dataclass
|
||||
Status: in-progress
|
||||
Priority: low
|
||||
Created: 2025-12-31
|
||||
Description: A dataclass that transparently maps onto an SQL datastore, with
|
||||
command generation for syncing data between data class and store
|
||||
|
||||
---
|
||||
|
||||
ID: 17
|
||||
Type: feature
|
||||
Title: recursive-descent HTML (DOM) parser
|
||||
Status: in-progress
|
||||
Priority: high
|
||||
Created: 2025-12-31
|
||||
Description: Extend the built-in event-driven parser to be modeled after DOM
|
||||
recursive-descent HTML parser
|
||||
|
||||
---
|
||||
|
|
|
|||
27
configure.ac
Normal file
|
|
@ -0,0 +1,27 @@
AC_INIT

AC_CHECK_PROGS([MAKE], [make], [no])
AS_IF([test "$MAKE" = "no"],
      [AC_MSG_NOTICE([without GNU Make, you have to inspect 'Makefile' and deduce build targets yourself.])])

AC_CHECK_PROGS([GIT], [git], [no])
AS_IF([test "$GIT" = "no"],
      [AC_MSG_ERROR([install Git, before continuing.])])

AC_CHECK_PROGS([PYTHON3], [python3], [no])
AS_IF([test "$PYTHON3" = "no"],
      [AC_MSG_ERROR([install Python 3, before continuing.])])

# required in Makefile to ensure proper path resolution during preprocessing
# realpath is not available on macOS
AC_CHECK_PROGS([REALPATH], [realpath], [no])
AS_IF([test "$REALPATH" = "no"],
      [AC_MSG_ERROR([set a persistent alias for 'realpath', before continuing, e.g.

    alias realpath='python3 -c "import pathlib,sys;print(pathlib.Path(sys.argv[[1]]).resolve())"'
])])

AC_MSG_NOTICE([initializing python3 venv...])
make .venv

AC_OUTPUT
|
|
@ -7,12 +7,12 @@ requires = [
|
|||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "byteb4rb1e.utils"
|
||||
name = "byteb4rb1e-utils"
|
||||
description = "personal utilities and helpers"
|
||||
authors = [
|
||||
{ name = "Tiara Rodney", email = "tiara.rodney@byteb4rb1e.me" }
|
||||
{ name = "Tiara Rodney", email = "tiara.rodney@administratrix.de" }
|
||||
]
|
||||
license-files = ["LICENSE"]
|
||||
license = { file = "LICENSE" }
|
||||
readme = "README.md"
|
||||
classifiers = [
|
||||
"Development Status :: 1 - Planning",
|
||||
|
|
@ -48,6 +48,7 @@ strict = true
|
|||
max_line_length = 80
|
||||
aggressive = 3
|
||||
recursive = true
|
||||
in-place = true
|
||||
|
||||
[tool.setuptools_scm]
|
||||
|
||||
|
|
|
|||
25
requirements-dev.txt
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
-i https://pypi.org/simple
|
||||
astroid==3.3.9; python_full_version >= '3.9.0'
|
||||
autopep8==2.3.2; python_version >= '3.9'
|
||||
build==1.2.2.post1; python_version >= '3.8'
|
||||
-e .
|
||||
certifi==2025.4.26; python_version >= '3.6'
|
||||
colorama==0.4.6; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5, 3.6'
|
||||
dill==0.4.0; python_version >= '3.8'
|
||||
distlib==0.3.9
|
||||
filelock==3.18.0; python_version >= '3.9'
|
||||
isort==6.0.1; python_full_version >= '3.9.0'
|
||||
mccabe==0.7.0; python_version >= '3.6'
|
||||
mypy==1.15.0; python_version >= '3.9'
|
||||
mypy-extensions==1.1.0; python_version >= '3.8'
|
||||
packaging==25.0; python_version >= '3.8'
|
||||
pipenv==2025.0.2; python_version >= '3.9'
|
||||
platformdirs==4.3.7; python_version >= '3.9'
|
||||
pycodestyle==2.13.0; python_version >= '3.9'
|
||||
pylint==3.3.6; python_full_version >= '3.9.0'
|
||||
pyproject-hooks==1.2.0; python_version >= '3.7'
|
||||
setuptools==80.3.0; python_version >= '3.9'
|
||||
setuptools-scm==8.2.0; python_version >= '3.8'
|
||||
tomlkit==0.13.2; python_version >= '3.8'
|
||||
typing-extensions==4.13.2; python_version >= '3.8'
|
||||
virtualenv==20.30.0; python_version >= '3.8'
|
||||
|
|
@ -1,14 +0,0 @@
import os
from pathlib import Path
from typing import Tuple


def get_current_test() -> Tuple[Path, str]:
    current_test_env = os.getenv("PYTEST_CURRENT_TEST")
    if current_test_env is None:
        raise RuntimeError("PYTEST_CURRENT_TEST not set. Must be run under pytest.")

    suite_path, case_name = current_test_env.split('::', 1)
    case_name = case_name.split(' ', 1)[0]
    return Path(suite_path).resolve(), case_name

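
For orientation, pytest sets `PYTEST_CURRENT_TEST` to the test's node ID followed by the current stage in parentheses. The sketch below applies the same splitting to a sample value; `parse_current_test` is a hypothetical stand-in that takes the string directly and skips the `resolve()` call so the result does not depend on the working directory.

```python
from pathlib import Path
from typing import Tuple


def parse_current_test(value: str) -> Tuple[Path, str]:
    """Split a PYTEST_CURRENT_TEST value into (suite path, case name)."""
    suite_path, case_name = value.split("::", 1)
    # drop the trailing stage marker such as " (setup)", " (call)", " (teardown)"
    case_name = case_name.split(" ", 1)[0]
    return Path(suite_path), case_name


# sample value made up for illustration
path, case = parse_current_test("tests/unit/test_http.py::test_get (call)")
print(path.name, case)
```

Splitting on the first space means a parametrized ID that itself contains a space would be cut short, which may matter for callers that re-run the case by name.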
|
|
@ -1,47 +0,0 @@
from functools import wraps
from pathlib import Path
import os
import subprocess
import sys

from byteb4rb1e.testing.pytest import get_current_test


def run_in_subprocess_once():
    """
    A decorator that reruns the test in a subprocess if not already inside one.
    Requires pytest to be installed and test to be run by pytest.

    For what? Anything that can't be done in a thread-safe manner, e.g. modifying PYTHONPATH
    """
    def decorator(test_func):
        @wraps(test_func)
        def wrapper(*args, **kwargs):
            if os.environ.get("XPYTEST_INSIDE_SUBPROCESS") == "1":
                return test_func(*args, **kwargs)

            suite_path, case_name = get_current_test()

            cmd = [
                sys.executable,
                "-m", "pytest",
                f"{suite_path}::{case_name}",
            ]

            result = subprocess.run(
                cmd,
                env={**os.environ, "XPYTEST_INSIDE_SUBPROCESS": "1"},
                capture_output=True,
                text=True,
            )

            if result.returncode != 0:
                print(' '.join(cmd))
                print("==== Subprocess stdout ====")
                print(result.stdout)
                print("==== Subprocess stderr ====")
                print(result.stderr)
                raise AssertionError(f"Subprocess test failed with exit code {result.returncode}")
        return wrapper
    return decorator

|
|
@ -1,44 +0,0 @@
import os
from pathlib import Path
import sys
from typing import Dict, Tuple, Union

import pytest

from byteb4rb1e.testing.pytest import get_current_test

# keyed by the tmp_path of the requesting test
_SITE_PACKAGE_COUNTER: Dict[Path, int] = {}


@pytest.fixture
def current_test() -> Tuple[Path, str]:
    """Return the suite path and case name of the running test.
    """
    return get_current_test()


@pytest.fixture
def mock_system_site_package_dir(tmp_path):
    global _SITE_PACKAGE_COUNTER

    package_id = _SITE_PACKAGE_COUNTER.setdefault(tmp_path, 0)
    _SITE_PACKAGE_COUNTER[tmp_path] += 1

    sys_path = tmp_path / str(package_id)

    def _create(name: str) -> Path:
        pkg_path = sys_path / name.replace('.', os.path.sep)

        pkg_path.mkdir(parents=True)

        (pkg_path / "__init__.py").touch()

        sys.path.insert(0, str(sys_path))

        return pkg_path

    yield _create

    # cleanup sys.path after test
    if str(sys_path) in sys.path:
        sys.path.remove(str(sys_path))
|
|
@ -1,6 +0,0 @@
"""Utilities for building composable CLIs from command dataclasses."""

from byteb4rb1e.utils.argparse.command import CLICommand
from byteb4rb1e.utils.argparse.dispatcher import CLI

__all__ = ["CLI", "CLICommand"]
|
|
@ -1,54 +0,0 @@
"""Base command dataclass for composable CLI trees."""

from __future__ import annotations

from argparse import ArgumentParser
from dataclasses import dataclass, fields
from typing import Any, ClassVar, Dict, List, Optional, Type


@dataclass
class CLICommand:
    """Base class for CLI commands.

    Subclasses define their identity (name, help, description) as
    dataclass fields. These are passed as kwargs to
    ``subparsers.add_parser()``.

    Override ``add_arguments`` to register flags and positionals.
    Override ``execute`` to implement the command's logic.

    Nest subcommands by setting ``_subcommands`` as a class variable.
    """

    name: str = ""
    help: str = ""
    description: str = ""

    _subcommands: ClassVar[List[Type[CLICommand]]] = []

    def add_arguments(self, parser: ArgumentParser) -> None:
        """Add arguments to the parser. Override in subclasses."""

    def execute(self, args: Any) -> int:
        """Run the command. Override in subclasses.

        Returns an exit code (0 = success).
        """
        return 0

    def parser_kwargs(self) -> Dict[str, Any]:
        """Return the dataclass fields as kwargs for add_parser.

        Excludes ``name`` (used as the positional parser name) and
        any empty-string fields so argparse defaults apply.
        """
        skip = {"name"}
        kwargs = {}
        for f in fields(self):
            if f.name in skip or f.name.startswith("_"):
                continue
            val = getattr(self, f.name)
            if val != "":
                kwargs[f.name] = val
        return kwargs
|
|
@ -1,122 +0,0 @@
"""CLI dispatcher: builds parser trees from command dataclasses."""

from __future__ import annotations

import logging
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from typing import Any, Dict, List, Optional, Type

from byteb4rb1e.utils.argparse.command import CLICommand


class CLI:
    """Composable CLI built from a tree of CLICommand dataclasses.

    Recursively bootstraps an argparse parser hierarchy and tracks
    dest names so ``run()`` can dispatch to the correct leaf command
    without dest chaining in the caller.

    Usage::

        cli = CLI(prog="repository", description="...")
        cli.bootstrap([MirrorCommand, IndexCommand])
        cli.run()
    """

    def __init__(
        self,
        prog: Optional[str] = None,
        description: str = "",
    ) -> None:
        kwargs = {}  # type: Dict[str, Any]
        if prog:
            kwargs["prog"] = prog
        if description:
            kwargs["description"] = description
        kwargs.setdefault(
            "formatter_class", ArgumentDefaultsHelpFormatter,
        )
        self.parser = ArgumentParser(**kwargs)
        self._dests = []  # type: List[str]
        self._commands = {}  # type: Dict[str, CLICommand]

    def add_arguments(self, parser: ArgumentParser) -> None:
        """Add global arguments to the root parser."""
        parser.add_argument(
            "-v", "--verbose", action="count", default=0,
            help="Increase verbosity (-v for INFO, -vv for DEBUG)",
        )

    def bootstrap(
        self,
        commands: List[Type[CLICommand]],
    ) -> None:
        """Build the parser tree from a list of top-level commands."""
        self.add_arguments(self.parser)
        dest = "command"
        self._dests.append(dest)
        sub = self.parser.add_subparsers(dest=dest)
        for cmd_cls in commands:
            self._add(sub, cmd_cls, prefix="")

    def _add(
        self,
        subparsers: Any,
        cmd_cls: Type[CLICommand],
        prefix: str,
    ) -> None:
        """Recursively add a command and its subcommands."""
        cmd = cmd_cls()
        parser = subparsers.add_parser(
            cmd.name,
            formatter_class=ArgumentDefaultsHelpFormatter,
            **cmd.parser_kwargs(),
        )
        cmd.add_arguments(parser)

        key = "%s.%s" % (prefix, cmd.name) if prefix else cmd.name
        self._commands[key] = cmd

        if cmd._subcommands:
            dest = "%s_command" % cmd.name
            self._dests.append(dest)
            child_sub = parser.add_subparsers(dest=dest)
            for sc_cls in cmd._subcommands:
                self._add(child_sub, sc_cls, prefix=key)

    def _resolve(self, args: Any) -> Optional[CLICommand]:
        """Walk dest chain to find the leaf command."""
        parts = []  # type: List[str]
        for dest in self._dests:
            val = getattr(args, dest, None)
            if val is None:
                continue
            parts.append(val)
        if not parts:
            return None
        key = ".".join(parts)
        return self._commands.get(key)

    @staticmethod
    def _setup_logging(verbosity: int) -> None:
        if verbosity >= 2:
            level = logging.DEBUG
        elif verbosity >= 1:
            level = logging.INFO
        else:
            level = logging.WARNING
        logging.basicConfig(
            level=level,
            format="%(asctime)s [%(levelname)s] %(message)s",
            handlers=[logging.StreamHandler()],
        )

    def run(self) -> None:
        """Parse args and dispatch to the leaf command."""
        args = self.parser.parse_args()
        self._setup_logging(getattr(args, "verbose", 0))
        cmd = self._resolve(args)
        if cmd is None:
            self.parser.print_help()
            raise SystemExit(1)
        raise SystemExit(cmd.execute(args))
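
The dest-chain lookup can be tried out with plain `argparse`, independent of the classes in this hunk. The sketch below is only an illustration: the command names `mirror`, `push` and `index` and the helper `resolve_key` are made up, and the dest names follow the `"<name>_command"` convention used above.

```python
import argparse

parser = argparse.ArgumentParser(prog="repository")
top = parser.add_subparsers(dest="command")

# "mirror" carries a nested subcommand, "index" is a leaf
mirror = top.add_parser("mirror")
mirror_sub = mirror.add_subparsers(dest="mirror_command")
mirror_sub.add_parser("push")
top.add_parser("index")

# dest names in registration order, outermost first
DESTS = ["command", "mirror_command"]


def resolve_key(args: argparse.Namespace) -> str:
    """Join the selected subcommand names into a dotted lookup key."""
    parts = [getattr(args, dest, None) for dest in DESTS]
    return ".".join(p for p in parts if p is not None)


print(resolve_key(parser.parse_args(["mirror", "push"])))
print(resolve_key(parser.parse_args(["index"])))
```

Dests that belong to a branch that was not selected are simply absent from the namespace, which is why the lookup uses `getattr` with a default instead of attribute access.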
|
|
@ -1,109 +0,0 @@
#!/usr/bin/env python3
"""Generic HTTP client.

Thin urllib wrapper with retry-on-rate-limit. No domain knowledge;
GitHub, Bitbucket, etc. are handled by higher-level modules.
"""

import json
import time
from typing import Any, Dict, Optional
import urllib.error
import urllib.request
import urllib.parse
from warnings import warn


class HttpResponse:
    def __init__(self, status: int, headers: dict, data: bytes, reason: str):
        self.status_code = status
        self.headers = headers
        self.data = data
        self.reason = reason
        self.text = data.decode("utf-8", errors="replace")

    def json(self):
        return json.loads(self.data.decode("utf-8"))


def _request(
    url: str,
    method: str = "GET",
    params: Optional[Dict[str, Any]] = None,
    headers: Optional[Dict[str, str]] = None,
    data: Optional[bytes] = None,
) -> HttpResponse:
    # TODO: do proper exponential backoff
    backoff = [1, 2, 4]

    if params:
        query = urllib.parse.urlencode(params)
        url = f"{url}?{query}"

    req = urllib.request.Request(
        url,
        headers=headers or {},
        method=method,
        data=data,
    )

    for delay in backoff:
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                status = resp.getcode()
                resp_data = resp.read()
                resp_headers = dict(resp.getheaders())

                if status == 429:
                    warn(f"Rate-limited on {url} (HTTP {status})."
                         f" Backing off {delay}s...")
                    time.sleep(delay)
                    continue

                return HttpResponse(
                    status, resp_headers, resp_data, resp.reason,
                )

        except urllib.error.HTTPError as e:
            status = e.code
            err_data = e.read()
            err_headers = dict(e.headers.items())
            if status == 429:
                warn(f"Rate-limited on {url} (HTTP {status})."
                     f" Backing off {delay}s...")
                time.sleep(delay)
                continue
            return HttpResponse(
                status, err_headers, err_data, e.reason,
            )

        except urllib.error.URLError as e:
            # format the message eagerly; Exception does not interpolate args
            raise Exception(
                "Network error on %s: %s" % (url, e)
            ) from e

    # If all retries exhausted, return last error-like response
    return HttpResponse(503, {}, b"", "Service unavailable")


def get(
    url: str,
    params: Optional[Dict[str, Any]] = None,
    headers: Optional[Dict[str, str]] = None,
) -> HttpResponse:
    return _request(url, method="GET", params=params, headers=headers)


def post(
    url: str,
    data: Optional[bytes] = None,
    headers: Optional[Dict[str, str]] = None,
) -> HttpResponse:
    return _request(url, method="POST", headers=headers, data=data)


def put(
    url: str,
    data: Optional[bytes] = None,
    headers: Optional[Dict[str, str]] = None,
) -> HttpResponse:
    return _request(url, method="PUT", headers=headers, data=data)
|
|
@ -1,78 +0,0 @@
#!/usr/bin/env python3
"""Bitbucket Cloud REST API v2.0 wrapper.

Thin layer over http.py for Bitbucket-specific operations:

- Bearer token authentication
- Repository existence checks
- Repository creation within a workspace/project
"""

import json
from typing import Any, Dict, Optional

from byteb4rb1e.utils.http import client as http_client


BITBUCKET_API = "https://api.bitbucket.org/2.0"


def http_headers(token: str) -> Dict[str, str]:
    """Construct Bitbucket API headers with Bearer token auth."""
    return {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json",
        "Content-Type": "application/json",
    }


def repository_exists(
    workspace: str,
    repo_slug: str,
    token: str,
) -> bool:
    """Check whether a repository exists in the workspace."""
    url = f"{BITBUCKET_API}/repositories/{workspace}/{repo_slug}"
    resp = http_client.get(url, headers=http_headers(token))
    return resp.status_code == 200


def create_repository(
    workspace: str,
    repo_slug: str,
    token: str,
    project: Optional[str] = None,
    description: str = "",
    is_private: bool = True,
) -> http_client.HttpResponse:
    """Create a new repository in the workspace.

    When *project* is given the repository is assigned to that
    Bitbucket project (by key). This is required for workspaces
    that scope access keys at the project level.

    Returns the API response. Caller should check status_code == 200
    for success.
    """
    url = f"{BITBUCKET_API}/repositories/{workspace}/{repo_slug}"
    body: Dict[str, Any] = {
        "scm": "git",
        "is_private": is_private,
        "description": description,
        "fork_policy": "no_forks",
    }
    if project:
        body["project"] = {"key": project}
    return http_client.put(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers=http_headers(token),
    )


def clone_url(
    workspace: str,
    repo_slug: str,
) -> str:
    """Return the SSH clone URL for a Bitbucket repository."""
    return f"git@bitbucket.org:{workspace}/{repo_slug}.git"
|
|
@ -1,65 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
import hashlib
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from byteb4rb1e.utils.http import client as http_client
|
||||
|
||||
|
||||
GITHUB_API = "https://api.github.com"
|
||||
|
||||
|
||||
def http_headers(token: Optional[str]) -> Dict[str, str]:
|
||||
headers = {
|
||||
"Accept": "application/vnd.github+json",
|
||||
"User-Agent": "sphinx-h5p-worker1"
|
||||
}
|
||||
if token:
|
||||
# Use standard PAT header; token not logged anywhere.
|
||||
headers["Authorization"] = f"Bearer {token}"
|
||||
return headers
|
||||
|
||||
|
||||
def blob_sha(path: Path) -> str:
|
||||
"""Calculate Git blob SHA-1 for a file, matching GitHub API 'sha'."""
|
||||
data = path.read_bytes()
|
||||
header = f"blob {len(data)}\0".encode("utf-8")
|
||||
store = header + data
|
||||
return hashlib.sha1(store).hexdigest()
|
||||
|
||||
|
||||
def list_org_repos(org: str, token: Optional[str]) -> List[Dict[str, Any]]:
|
||||
repos: List[Dict[str, Any]] = []
|
||||
page = 1
|
||||
per_page = 100
|
||||
while True:
|
||||
url = f"{GITHUB_API}/orgs/{org}/repos"
|
||||
resp = http_client.get(
|
||||
url,
|
||||
params={"page": page, "per_page": per_page, "type": "public"},
|
||||
headers=http_headers(token),
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
raise RuntimeError(f"Failed to list repos for org {org}: {resp.status_code} {resp.text}")
|
||||
batch = resp.json()
|
||||
if not batch:
|
||||
break
|
||||
repos.extend(batch)
|
||||
page += 1
|
||||
return repos
|
||||
|
||||
|
||||
def fetch_file(
|
||||
org: str,
|
||||
repo: str,
|
||||
path: str,
|
||||
token: str
|
||||
) -> http_client.HttpResponse:
|
||||
"""Request *path* relative to the repository's API root and return the raw response.
|
||||
"""
|
||||
url = f"{GITHUB_API}/repos/{org}/{repo}/{path}"
|
||||
|
||||
return http_client.get(
|
||||
url,
|
||||
headers=http_headers(token),
|
||||
)
|
||||
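The `blob_sha` helper above hashes a file the way Git does for blobs. As a minimal sketch of that technique on in-memory bytes (the name `git_blob_sha1` is hypothetical and not part of the module), the header `blob <size>\0` is prepended before hashing:

```python
import hashlib


def git_blob_sha1(data: bytes) -> str:
    """Return the SHA-1 that Git assigns to a blob holding ``data``.

    Hypothetical helper for illustration; it mirrors ``blob_sha`` but
    takes bytes instead of a path.
    """
    # Git hashes the header "blob <size>\0" followed by the raw content.
    header = f"blob {len(data)}\0".encode("ascii")
    return hashlib.sha1(header + data).hexdigest()


# The empty blob has the same well-known ID in every Git repository.
EMPTY_BLOB = "e69de29bb2d1d6434b8b29ae775ad8c2e48c5391"
assert git_blob_sha1(b"") == EMPTY_BLOB
```

Comparing this value with the `sha` field reported for a file is one way to decide whether a local copy is already up to date, assuming the API reports the blob ID.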
|
|
@ -1,91 +0,0 @@
|
|||
from typing import Optional
|
||||
|
||||
|
||||
class RollingHash:
|
||||
"""implementation of Rabin-Karp rolling hash
|
||||
"""
|
||||
#: default base
|
||||
base: int = 31
|
||||
#: default modulus
|
||||
mod: int = 10**9 + 7
|
||||
#: current computed hash
|
||||
_hash: int
|
||||
#: prime number base (e.g., 31)
|
||||
_base: int
|
||||
#: large prime modulus (to prevent overflow)
|
||||
_mod: int
|
||||
# Precomputation of ``base^(length-1) % mod`` for removing the old byte when
|
||||
# rolling over
|
||||
_hbase_factor: int
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
data: bytes,
|
||||
base: Optional[int] = None,
|
||||
mod: Optional[int] = None
|
||||
):
|
||||
"""Initialize the rolling hash with a given base and modulus.
|
||||
|
||||
base: Prime number base (e.g., 31)
|
||||
mod: Large prime modulus to prevent overflow
|
||||
data: Initial window of bytes to hash; its length fixes the window size
|
||||
"""
|
||||
self._base = base if base else RollingHash.base
|
||||
|
||||
self._mod = mod if mod else RollingHash.mod
|
||||
|
||||
self._hash = RollingHash.compute_initial_hash(
|
||||
data,
|
||||
self._base,
|
||||
self._mod
|
||||
)
|
||||
|
||||
self._hbase_factor = pow(self._base, len(data) - 1, self._mod)
|
||||
|
||||
@staticmethod
|
||||
def compute_initial_hash(
|
||||
data: bytes,
|
||||
base: int,
|
||||
mod: int,
|
||||
) -> int:
|
||||
"""Compute the hash for the initial window (first `length` bytes).
|
||||
|
||||
Prefer calling this directly when computing the hash of the search pattern,
|
||||
to avoid the overhead of instantiating an object.
|
||||
|
||||
:param data: data to build hash for
|
||||
:param base: polynomial base
|
||||
:param mod: modulus applied after every step
|
||||
|
||||
:returns: hash of data
|
||||
"""
|
||||
hash_ = 0
|
||||
for i in range(len(data)):
|
||||
# reduce modulo ``mod`` on every iteration so the running value never
|
||||
# grows into an arbitrarily large integer
|
||||
hash_ = (hash_ * base + data[i]) % mod
|
||||
return hash_
|
||||
|
||||
def roll(self, old_byte: int, new_byte: int) -> int:
|
||||
"""Efficiently update hash by removing ``old_byte`` and adding
|
||||
``new_byte``
|
||||
|
||||
The old_byte removal uses a pre-computed value of the highest base used
|
||||
in the polynomial calculation. This speeds things up a bit.
|
||||
|
||||
I considered a way to store the old_byte efficiently
|
||||
within the class object, but that would require storing the entire data,
|
||||
basically doubling the memory consumption as the data must definitely
|
||||
also live outside of the class object. A memoryview could solve this
|
||||
problem, but at the cost of making the implementation more complex, so
|
||||
this will have to do.
|
||||
|
||||
:param old_byte: The ordinal of the first byte in buffer to roll over
|
||||
:param new_byte: The ordinal of the byte newly appended to the buffer
|
||||
"""
|
||||
# Remove old
|
||||
self._hash = (self._hash - old_byte * self._hbase_factor) % self._mod
|
||||
# Add new
|
||||
self._hash = (self._hash * self._base + new_byte) % self._mod
|
||||
|
||||
return self._hash
|
||||
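To show how the initial hash and the roll step fit together, here is a minimal standalone sketch of a Rabin-Karp search. The function names `poly_hash` and `find_all` are assumptions made for this illustration; the base and modulus are the defaults used by the class above.

```python
from typing import List

BASE = 31
MOD = 10**9 + 7


def poly_hash(data: bytes, base: int = BASE, mod: int = MOD) -> int:
    """Polynomial hash, reduced modulo ``mod`` at every step."""
    h = 0
    for byte in data:
        h = (h * base + byte) % mod
    return h


def find_all(haystack: bytes, needle: bytes) -> List[int]:
    """Return every offset at which ``needle`` occurs in ``haystack``."""
    n, m = len(haystack), len(needle)
    if m == 0 or m > n:
        return []
    target = poly_hash(needle)
    # base^(m-1) % mod, used to remove the byte leaving the window
    high = pow(BASE, m - 1, MOD)
    window = poly_hash(haystack[:m])
    hits = []
    for start in range(n - m + 1):
        # hashes can collide, so confirm with a direct comparison
        if window == target and haystack[start:start + m] == needle:
            hits.append(start)
        if start + m < n:
            # remove the old byte, then append the new one
            window = (window - haystack[start] * high) % MOD
            window = (window * BASE + haystack[start + m]) % MOD
    return hits


print(find_all(b"abracadabra", b"abra"))
```

The roll step uses the same modulus and base as the initial hash; mixing instance values with class defaults would make rolled hashes diverge from freshly computed ones.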
|
|
@ -1,41 +0,0 @@
|
|||
import email
|
||||
import importlib.resources
|
||||
import mimetypes
|
||||
from urllib.error import URLError
|
||||
import urllib.request
|
||||
|
||||
|
||||
class PkgHandler(urllib.request.BaseHandler):
|
||||
"""Handler for ``pkg://<package>/<path>`` URLs backed by ``importlib.resources``.
|
||||
"""
|
||||
def pkg_open(self, req) -> urllib.request.addinfourl:
|
||||
pkg_files = importlib.resources.files(req.host)
|
||||
|
||||
try:
|
||||
fh = next(
|
||||
pkg_files.glob(req.selector.lstrip('/'))
|
||||
).open('rb')
|
||||
except Exception as e:
|
||||
raise URLError(f'{e.__class__.__name__}: {e}') from e
|
||||
|
||||
fh.seek(0, 2)
|
||||
size = fh.tell()
|
||||
fh.seek(0)
|
||||
|
||||
mtype, compression = mimetypes.guess_type(req.selector)
|
||||
|
||||
if compression and mtype:
|
||||
mtype = f"{mtype}+{compression}"
|
||||
|
||||
headers = email.message_from_string(
|
||||
'Content-Type: %s\nContent-Length: %d\n' %
|
||||
(mtype or 'text/plain', size)
|
||||
)
|
||||
|
||||
if not mtype or mtype.startswith('text/'):
|
||||
fh.close()
|
||||
fh = next(
|
||||
pkg_files.glob(req.selector.lstrip('/'))
|
||||
).open('r')
|
||||
|
||||
return urllib.request.addinfourl(fh, headers, None)
|
||||
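`PkgHandler` relies on the `urllib.request` convention that a handler method named `<scheme>_open` is called for URLs of that scheme. The following is a minimal sketch of the same mechanism with a hypothetical `mem://` scheme served from a dictionary; `RESOURCES` and `MemHandler` are made up for this example and are not part of the package.

```python
import email.message
import io
import mimetypes
import urllib.error
import urllib.request
import urllib.response

# Hypothetical in-memory "files", keyed by host and then by path.
RESOURCES = {"greeting": {"hello.txt": b"Hello"}}


class MemHandler(urllib.request.BaseHandler):
    """Serve ``mem://<host>/<path>`` URLs from the RESOURCES dict."""

    def mem_open(self, req: urllib.request.Request):
        # OpenerDirector dispatches here because the method is
        # named after the URL scheme ("mem" + "_open").
        try:
            data = RESOURCES[req.host][req.selector.lstrip("/")]
        except KeyError as e:
            raise urllib.error.URLError(
                f"no such resource: {req.full_url}"
            ) from e

        mtype, _ = mimetypes.guess_type(req.selector)
        headers = email.message.Message()
        headers["Content-Type"] = mtype or "application/octet-stream"
        headers["Content-Length"] = str(len(data))
        return urllib.response.addinfourl(
            io.BytesIO(data), headers, req.full_url
        )


opener = urllib.request.build_opener(MemHandler())
with opener.open("mem://greeting/hello.txt") as resp:
    body = resp.read()
    content_type = resp.headers["Content-Type"]
```

Unlike `PkgHandler`, this sketch always returns bytes; reopening the resource in text mode for `text/*` types is a design choice of the handler above.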
|
|
@ -1,345 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Git subprocess wrapper for repository operations.
|
||||
|
||||
Provides primitives for mirror cloning, syncing, remote management,
|
||||
file extraction from bare repos, and submodule management.
|
||||
No pygit2 or gitpython, uses subprocess only.
|
||||
"""
|
||||
import logging
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class GitError(Exception):
|
||||
"""A git subprocess returned a non-zero exit code."""
|
||||
|
||||
def __init__(self, args: List[str], returncode: int, stderr: str):
|
||||
self.args_list = args
|
||||
self.returncode = returncode
|
||||
self.stderr = stderr
|
||||
super().__init__(
|
||||
f"git exited {returncode}: {' '.join(args)}\n{stderr}"
|
||||
)
|
||||
|
||||
|
||||
def parse_base_url(base_url: str) -> str:
|
||||
"""Extract workspace from an SCP-style Bitbucket base URL.
|
||||
|
||||
The host part must be exactly ``bitbucket.org`` — bootstrapping
|
||||
requires the Bitbucket API, so other hosts are rejected.
|
||||
|
||||
>>> parse_base_url("git@bitbucket.org:byteb4rb1e/repo.git")
|
||||
'byteb4rb1e'
|
||||
"""
|
||||
# SCP-style: git@bitbucket.org:workspace
|
||||
if ":" not in base_url or "//" in base_url:
|
||||
raise ValueError(
|
||||
f"Expected SCP-style URL (git@bitbucket.org:workspace), "
|
||||
f"got: {base_url}"
|
||||
)
|
||||
host_part, workspace = base_url.split(":", 1)
|
||||
# host_part is e.g. "git@bitbucket.org"
|
||||
host = host_part.split("@", 1)[-1]
|
||||
if host != "bitbucket.org":
|
||||
raise ValueError(
|
||||
f"Mirror base URL must target bitbucket.org, "
|
||||
f"got host: {host}"
|
||||
)
|
||||
return str(Path(workspace).parent)
|
||||
|
||||
|
||||
def parse_repo_name(base_url: str) -> str:
|
||||
"""Extract the repository name from an SCP-style Bitbucket URL.
|
||||
|
||||
The host part must be exactly ``bitbucket.org`` — bootstrapping
|
||||
requires the Bitbucket API, so other hosts are rejected.
|
||||
|
||||
>>> parse_repo_name("git@bitbucket.org:byteb4rb1e")
|
||||
'byteb4rb1e'
|
||||
"""
|
||||
# SCP-style: git@bitbucket.org:workspace
|
||||
if ":" not in base_url or "//" in base_url:
|
||||
raise ValueError(
|
||||
f"Expected SCP-style URL (git@bitbucket.org:workspace), "
|
||||
f"got: {base_url}"
|
||||
)
|
||||
host_part, workspace = base_url.split(":", 1)
|
||||
# host_part is e.g. "git@bitbucket.org"
|
||||
host = host_part.split("@", 1)[-1]
|
||||
if host != "bitbucket.org":
|
||||
raise ValueError(
|
||||
f"Mirror base URL must target bitbucket.org, "
|
||||
f"got host: {host}"
|
||||
)
|
||||
return Path(workspace).name.split('.')[0]
|
||||
|
||||
|
||||
|
||||
def _run(
|
||||
args: List[str],
|
||||
cwd: Optional[Path] = None,
|
||||
capture_stdout: bool = False,
|
||||
) -> subprocess.CompletedProcess: # type: ignore[type-arg]
|
||||
"""Run a git command, raising GitError on failure."""
|
||||
cmd = ["git"] + args
|
||||
logger.debug("$ %s", " ".join(cmd))
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
cwd=cwd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
raise GitError(cmd, result.returncode, result.stderr.strip())
|
||||
return result
|
||||
|
||||
|
||||
def mirror_clone(source_url: str, dest: Path) -> None:
|
||||
"""Clone a repository as a bare mirror.
|
||||
|
||||
Equivalent to ``git clone --mirror <source_url> <dest>``.
|
||||
The destination directory must not already exist.
|
||||
"""
|
||||
_run(["clone", "--mirror", source_url, str(dest)])
|
||||
logger.info("Cloned mirror %s → %s", source_url, dest)
|
||||
|
||||
|
||||
def add_remote(repo: Path, name: str, url: str) -> None:
|
||||
"""Add a named remote to a bare repository."""
|
||||
_run(["remote", "add", name, url], cwd=repo)
|
||||
logger.debug("Added remote %s → %s in %s", name, url, repo)
|
||||
|
||||
|
||||
def has_remote(repo: Path, name: str) -> bool:
|
||||
"""Check whether a named remote exists."""
|
||||
result = _run(["remote"], cwd=repo)
|
||||
return name in result.stdout.splitlines()
|
||||
|
||||
|
||||
def mirror_update(repo: Path) -> None:
|
||||
"""Fetch all remotes in a bare mirror repository.
|
||||
|
||||
Equivalent to ``git remote update`` inside the bare repo.
|
||||
"""
|
||||
_run(["remote", "update"], cwd=repo)
|
||||
logger.debug("Updated remotes in %s", repo)
|
||||
|
||||
|
||||
def fetch(repo: Path, remote: str = "origin") -> None:
|
||||
"""Fetch from a single remote."""
|
||||
_run(["fetch", remote], cwd=repo)
|
||||
logger.debug("Fetched %s in %s", remote, repo)
|
||||
|
||||
|
||||
def show_ref(repo: Path) -> str:
|
||||
"""Return the raw output of ``git show-ref`` (all refs + SHAs).
|
||||
|
||||
Returns an empty string if the repo has no refs.
|
||||
"""
|
||||
try:
|
||||
result = _run(["show-ref"], cwd=repo)
|
||||
return result.stdout
|
||||
except GitError:
|
||||
return ""
|
||||
|
||||
|
||||
def ls_remote(repo: Path, remote: str) -> str:
|
||||
"""Return the raw output of ``git ls-remote <remote>``.
|
||||
|
||||
Returns an empty string if the remote has no refs or on error.
|
||||
"""
|
||||
try:
|
||||
result = _run(["ls-remote", remote], cwd=repo)
|
||||
return result.stdout
|
||||
except GitError:
|
||||
return ""
|
||||
|
||||
|
||||
def mirror_push(repo: Path, remote: str) -> None:
|
||||
"""Push the full mirror to a remote.
|
||||
|
||||
Equivalent to ``git push --mirror <remote>``.
|
||||
"""
|
||||
_run(["push", "--mirror", remote], cwd=repo)
|
||||
logger.info("Pushed mirror to %s from %s", remote, repo)
|
||||
|
||||
|
||||
def read_file(
|
||||
repo: Path,
|
||||
filepath: str,
|
||||
ref: str = "HEAD",
|
||||
) -> Optional[str]:
|
||||
"""Extract a file's contents from a bare repo without checkout.
|
||||
|
||||
Returns the file content as a string, or None if the file does
|
||||
not exist at the given ref.
|
||||
"""
|
||||
try:
|
||||
result = _run(
|
||||
["show", f"{ref}:{filepath}"],
|
||||
cwd=repo,
|
||||
capture_stdout=True,
|
||||
)
|
||||
return result.stdout
|
||||
except GitError:
|
||||
return None
|
||||
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# Ref / tag primitives
|
||||
# -------------------------------------------------------------------
|
||||
|
||||
def list_tags(repo: Path) -> List[str]:
|
||||
"""List all tags in a repository."""
|
||||
result = _run(["tag", "-l"], cwd=repo)
|
||||
return [t for t in result.stdout.splitlines() if t]
|
||||
|
||||
|
||||
def resolve_ref(repo: Path, ref: str) -> str:
|
||||
"""Resolve a ref to a full SHA.
|
||||
|
||||
Raises GitError if the ref cannot be resolved.
|
||||
"""
|
||||
result = _run(
|
||||
["rev-parse", ref], cwd=repo, capture_stdout=True,
|
||||
)
|
||||
return result.stdout.strip()
|
||||
|
||||
|
||||
def head_ref(repo: Path) -> str:
|
||||
"""Return the full SHA of HEAD."""
|
||||
return resolve_ref(repo, "HEAD")
|
||||
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# Pull-through bare clone cache
|
||||
# -------------------------------------------------------------------
|
||||
|
||||
def bare_path_for_url(url: str, cache_dir: Path) -> Path:
|
||||
"""Derive a cache path from a clone URL.
|
||||
|
||||
Strips scheme/host, keeps the path component, appends ``.git``.
|
||||
|
||||
Examples::
|
||||
|
||||
https://github.com/h5p/h5p-multi-choice
|
||||
→ cache_dir / h5p / h5p-multi-choice.git
|
||||
git@github.com:h5p/h5p-multi-choice.git
|
||||
→ cache_dir / h5p / h5p-multi-choice.git
|
||||
"""
|
||||
# Handle SCP-style URLs (git@host:path)
|
||||
if ":" in url and "//" not in url:
|
||||
path_part = url.split(":", 1)[1]
|
||||
else:
|
||||
# Strip scheme + host
|
||||
from urllib.parse import urlparse
|
||||
parsed = urlparse(url)
|
||||
path_part = parsed.path.lstrip("/")
|
||||
|
||||
# Strip trailing .git if present, then re-add it
|
||||
if path_part.endswith(".git"):
|
||||
path_part = path_part[:-4]
|
||||
|
||||
return cache_dir / (path_part + ".git")
|
||||
|
||||
|
||||
def ensure_bare_clone(url: str, cache_dir: Path) -> Path:
|
||||
"""Ensure a bare mirror clone exists in *cache_dir*.
|
||||
|
||||
If the bare repo already exists, fetches updates via
|
||||
``mirror_update``. Otherwise, creates a new mirror clone.
|
||||
Returns the path to the bare repo.
|
||||
"""
|
||||
bare_path = bare_path_for_url(url, cache_dir)
|
||||
if bare_path.exists():
|
||||
mirror_update(bare_path)
|
||||
logger.debug("Updated existing cache %s", bare_path)
|
||||
else:
|
||||
bare_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
mirror_clone(url, bare_path)
|
||||
logger.info("Cached new bare clone %s", bare_path)
|
||||
return bare_path
|
||||
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# Submodule operations
|
||||
# -------------------------------------------------------------------
|
||||
|
||||
def has_submodule(repo: Path, path: str) -> bool:
|
||||
"""Check whether a submodule is registered at *path*.
|
||||
|
||||
Reads ``.gitmodules`` to determine whether the submodule exists.
|
||||
*path* is resolved relative to *repo*, then compared against
|
||||
the repository root so the check works when *repo* is a
|
||||
subdirectory of the actual git working tree.
|
||||
Returns False if ``.gitmodules`` does not exist.
|
||||
"""
|
||||
try:
|
||||
toplevel = Path(
|
||||
_run(
|
||||
["rev-parse", "--show-toplevel"], cwd=repo,
|
||||
).stdout.strip()
|
||||
)
|
||||
except GitError:
|
||||
return False
|
||||
gitmodules = toplevel / ".gitmodules"
|
||||
if not gitmodules.is_file():
|
||||
return False
|
||||
# Resolve the full path relative to the repo root
|
||||
full_path = (repo / path).resolve()
|
||||
try:
|
||||
rel_path = str(full_path.relative_to(toplevel.resolve()))
|
||||
except ValueError:
|
||||
return False
|
||||
try:
|
||||
result = _run(
|
||||
["config", "--file", str(gitmodules),
|
||||
"--get-regexp", r"submodule\..*\.path"],
|
||||
cwd=toplevel,
|
||||
)
|
||||
except GitError:
|
||||
return False
|
||||
for line in result.stdout.splitlines():
|
||||
parts = line.split(None, 1)
|
||||
if len(parts) == 2 and parts[1] == rel_path:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def submodule_add(repo: Path, url: str, path: str) -> None:
|
||||
"""Add a git submodule at *path* pointing to *url*.
|
||||
|
||||
Equivalent to ``git submodule add <url> <path>`` inside *repo*.
|
||||
"""
|
||||
_run(["submodule", "add", url, path], cwd=repo)
|
||||
logger.info("Added submodule %s → %s", url, path)
|
||||
|
||||
|
||||
def submodule_update(repo: Path, path: str) -> None:
|
||||
"""Fetch and update a submodule to the latest remote HEAD.
|
||||
|
||||
Enters the submodule directory, fetches origin, and checks out
|
||||
the latest commit on the remote default branch.
|
||||
"""
|
||||
sub_path = repo / path
|
||||
_run(["fetch", "origin"], cwd=sub_path)
|
||||
# Determine default branch from remote HEAD
|
||||
result = _run(
|
||||
["symbolic-ref", "refs/remotes/origin/HEAD",
|
||||
"--short"],
|
||||
cwd=sub_path,
|
||||
)
|
||||
default_branch = result.stdout.strip()
|
||||
_run(["checkout", default_branch], cwd=sub_path)
|
||||
logger.info("Updated submodule %s to %s", path, default_branch)
|
||||
|
||||
|
||||
def submodule_checkout(repo: Path, path: str, ref: str) -> None:
|
||||
"""Fetch and checkout a specific ref in a submodule."""
|
||||
sub_path = repo / path
|
||||
_run(["fetch", "origin"], cwd=sub_path)
|
||||
_run(["checkout", ref], cwd=sub_path)
|
||||
logger.info("Checked out submodule %s at %s", path, ref)
|
||||
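`show_ref` and `ls_remote` return raw text. A caller that wants to decide whether a mirror push is needed has to parse that output first. The sketch below is one possible way to do so; `parse_refs` and `in_sync` are hypothetical helpers, and the SHAs are placeholders. It assumes each line has the form `<sha> <refname>`, separated by whitespace, and skips the `HEAD` entry and peeled tag entries (`^{}`) that `git ls-remote` may list.

```python
from typing import Dict


def parse_refs(raw: str) -> Dict[str, str]:
    """Map ref names to SHAs from ``git show-ref``-style output."""
    refs: Dict[str, str] = {}
    for line in raw.splitlines():
        # show-ref separates columns with a space, ls-remote with a tab
        parts = line.split(None, 1)
        if len(parts) != 2:
            continue
        sha, name = parts
        # HEAD and peeled tags are not comparable between both commands
        if name == "HEAD" or name.endswith("^{}"):
            continue
        refs[name] = sha
    return refs


def in_sync(local_raw: str, remote_raw: str) -> bool:
    """True when both sides list the same refs at the same SHAs."""
    return parse_refs(local_raw) == parse_refs(remote_raw)


# Placeholder SHAs, for illustration only.
A, B = "a" * 40, "b" * 40
local = f"{A} refs/heads/main\n{B} refs/tags/v1\n"
remote = f"{A}\tHEAD\n{A}\trefs/heads/main\n{B}\trefs/tags/v1\n"
print(in_sync(local, remote))
```

Because both wrappers return an empty string on error, an empty result on either side should probably be treated as "unknown" rather than "in sync"; that policy is left to the caller.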
|
|
@ -1,7 +1,7 @@
|
|||
from dataclasses import dataclass
|
||||
from http.server import SimpleHTTPRequestHandler
|
||||
|
||||
from byteb4rb1e.utils.io import ChunksIO
|
||||
from byteb4rb1e_utils.io import ChunksIO
|
||||
|
||||
|
||||
@dataclass
|
||||
|
|
@ -8,12 +8,12 @@ from http.server import HTTPServer
|
|||
from io import BytesIO, IOBase
|
||||
from typing import Optional, Tuple, List
|
||||
|
||||
from byteb4rb1e.utils.http.server import (
|
||||
from byteb4rb1e_utils.http.server import (
|
||||
HandlerOptions,
|
||||
MultipartUploadHandler,
|
||||
ServerOptions,
|
||||
)
|
||||
from byteb4rb1e.utils.io import ChunksIO
|
||||
from byteb4rb1e_utils.io import ChunksIO
|
||||
|
||||
|
||||
__doc__ = """tsmuds - Tiara's Simple Multipart Upload Debugging Server
|
||||
228
src/byteb4rb1e_utils/string.py
Normal file
|
|
@ -0,0 +1,228 @@
|
|||
from dataclasses import dataclass
|
||||
import math
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
|
||||
class RollingHash:
|
||||
"""implementation of Rabin-Karp rolling hash
|
||||
"""
|
||||
#: default base
|
||||
base: int = 31
|
||||
#: default modulus
|
||||
mod: int = 10**9 + 7
|
||||
#: current computed hash
|
||||
_hash: int
|
||||
#: prime number base (e.g., 31)
|
||||
_base: int
|
||||
#: large prime modulus (to prevent overflow)
|
||||
_mod: int
|
||||
# Precomputation of ``base^(length-1) % mod`` for removing the old byte when
|
||||
# rolling over
|
||||
_hbase_factor: int
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
data: bytes,
|
||||
base: Optional[int] = None,
|
||||
mod: Optional[int] = None
|
||||
):
|
||||
"""Initialize the rolling hash with a given base and modulus.
|
||||
|
||||
base: Prime number base (e.g., 31)
|
||||
mod: Large prime modulus to prevent overflow
|
||||
data: Initial window of bytes to hash; its length fixes the window size
|
||||
"""
|
||||
self._base = base if base else RollingHash.base
|
||||
|
||||
self._mod = mod if mod else RollingHash.mod
|
||||
|
||||
self._hash = RollingHash.compute_initial_hash(
|
||||
data,
|
||||
self._base,
|
||||
self._mod
|
||||
)
|
||||
|
||||
self._hbase_factor = pow(self._base, len(data) - 1, self._mod)
|
||||
|
||||
@staticmethod
|
||||
def compute_initial_hash(
|
||||
data: bytes,
|
||||
base: int,
|
||||
mod: int,
|
||||
) -> int:
|
||||
"""Compute the hash for the initial window (first `length` bytes).
|
||||
|
||||
Prefer calling this directly when computing the hash of the search pattern,
|
||||
to avoid the overhead of instantiating an object.
|
||||
|
||||
:param data: data to build hash for
|
||||
:param base: polynomial base
|
||||
:param mod: modulus applied after every step
|
||||
|
||||
:returns: hash of data
|
||||
"""
|
||||
hash_ = 0
|
||||
for i in range(len(data)):
|
||||
# reduce modulo ``mod`` on every iteration so the running value never
|
||||
# grows into an arbitrarily large integer
|
||||
hash_ = (hash_ * base + data[i]) % mod
|
||||
return hash_
|
||||
|
||||
def roll(self, old_byte: int, new_byte: int) -> int:
|
||||
"""Efficiently update hash by removing ``old_byte`` and adding
|
||||
``new_byte``
|
||||
|
||||
The old_byte removal uses a pre-computed value of the highest base used
|
||||
in the polynomial calculation. This speeds things up a bit.
|
||||
|
||||
I considered a way to store the old_byte efficiently
|
||||
within the class object, but that would require storing the entire data,
|
||||
basically doubling the memory consumption as the data must definitely
|
||||
also live outside of the class object. A memoryview could solve this
|
||||
problem, but at the cost of making the implementation more complex, so
|
||||
this will have to do.
|
||||
|
||||
:param old_byte: The ordinal of the first byte in buffer to roll over
|
||||
:param new_byte: The ordinal of the byte newly appended to the buffer
|
||||
"""
|
||||
# Remove old
|
||||
self._hash = (self._hash - old_byte * self._hbase_factor) % self._mod
|
||||
# Add new
|
||||
self._hash = (self._hash * self._base + new_byte) % self._mod
|
||||
|
||||
return self._hash
|
||||
|
||||
|
||||
@dataclass
|
||||
class ChunkedRollingHashOptions:
|
||||
"""Tunables for :class:`ChunkedRollingHash`.
|
||||
"""
|
||||
max_chunk_size: int = 10
|
||||
base: int = RollingHash.base
|
||||
mod: int = RollingHash.mod
|
||||
|
||||
|
||||
class ChunkedRollingHash:
|
||||
"""Chunked Rolling hash for linear and circular buffers
|
||||
|
||||
This implementation was inspired by the Rabin-Karp rolling hash
|
||||
algorithm.
|
||||
|
||||
A search pattern is chunked and for each chunk its hash is calculated.
|
||||
|
||||
I came up with this approach to meet the requirement for efficient RFC 1341 HTTP
|
||||
multipart entity boundary matching for stream data in a circular/ring
|
||||
buffer. I've tested a couple of algorithms, but none gave me any real
|
||||
performance improvements over a naive/bruteforce search.
|
||||
|
||||
That's how this algorithm came to be. Big O? I don't know (yet)...
|
||||
|
||||
Why is this more performant for my specific use cases?
|
||||
------------------------------------------------------
|
||||
|
||||
#. Precompute hashes for evenly sized chunks of a search pattern, in
|
||||
addition to a hash of the full search-pattern.
|
||||
#. First, match only the hash of the first chunk → immediately skip
|
||||
unnecessary buffer sections if no match.
|
||||
#. If the first chunk matches, progressively verify subsequent chunks,
|
||||
until the full search pattern is confirmed.
|
||||
|
||||
Benefits Over Full Matching
|
||||
---------------------------
|
||||
|
||||
- Reduces comparisons significantly → eliminates large sections early when
|
||||
non-matches occur.
|
||||
- Balances preprocessing cost vs runtime → faster elimination means fewer
|
||||
wasted cycles.
|
||||
- Integrates seamlessly into circular buffers → allows skipping
|
||||
intelligently.
|
||||
"""
|
||||
_chunk_count: int
|
||||
#: hashes of chunks of search string
|
||||
_chunks_hash: List[int]
|
||||
#: hash of the full search string
|
||||
_hash: int
|
||||
#: length of search string
|
||||
_length: int
|
||||
#: remainder for calculating the actual size of the last chunk
|
||||
_remainder: int
|
||||
|
||||
_base: int
|
||||
|
||||
_mod: int
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
data: bytes,
|
||||
options: ChunkedRollingHashOptions = ChunkedRollingHashOptions()
|
||||
):
|
||||
"""
|
||||
"""
|
||||
self._base = options.base
|
||||
self._mod = options.mod
|
||||
self._max_chunk_size = options.max_chunk_size
|
||||
self._chunks_hash = []
|
||||
self._hash = RollingHash.compute_initial_hash(
|
||||
data,
|
||||
base = self._base,
|
||||
mod = self._mod
|
||||
)
|
||||
self._length = len(data)
|
||||
|
||||
# only the last chunk differs in size; store its remainder separately
|
||||
# for optimized handling
|
||||
self._remainder = self._length % self._max_chunk_size
|
||||
|
||||
self._chunk_count = math.ceil(self._length / self._max_chunk_size)
|
||||
# tracks chunk progression during matching
|
||||
self._current = 0
|
||||
|
||||
# precompute hashes for all chunks to enable rapid comparison
|
||||
for i in range(0, self._chunk_count):
|
||||
chunk = data[i*self._max_chunk_size:(i+1)*self._max_chunk_size]
|
||||
|
||||
self._chunks_hash.append(
|
||||
RollingHash.compute_initial_hash(chunk, base=self._base, mod=self._mod)
|
||||
)
|
||||
|
||||
def match(
|
||||
self,
|
||||
data: bytes
|
||||
):
|
||||
"""match a buffer against a search string through chunked hashing
|
||||
"""
|
||||
# progressively match each chunk
|
||||
for i in range(self._current, self._chunk_count - 1):
|
||||
chunk = data[i*self._max_chunk_size:(i+1)*self._max_chunk_size]
|
||||
|
||||
# no more data left to process
|
||||
if chunk == b'': break
|
||||
|
||||
chunk_hash = RollingHash.compute_initial_hash(
|
||||
chunk,
|
||||
base = self._base,
|
||||
mod = self._mod
|
||||
)
|
||||
|
||||
if chunk_hash != self._chunks_hash[i]:
|
||||
self._current = 0
|
||||
return False
|
||||
|
||||
self._current += 1
|
||||
|
||||
# not every leading chunk has matched yet, so the last chunk cannot be checked
|
||||
if self._current != self._chunk_count - 1:
|
||||
return
|
||||
|
||||
last_chunk = data[-(self._remainder or self._max_chunk_size):]
|
||||
last_chunk_hash = RollingHash.compute_initial_hash(
|
||||
last_chunk,
|
||||
base = self._base,
|
||||
mod = self._mod
|
||||
)
|
||||
|
||||
if self._chunks_hash[self._current] == last_chunk_hash:
|
||||
return True
|
||||
|
||||
self._current = 0
|
||||
return False
|
||||
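The chunked matching idea described in the class docstring can be sketched independently of the class. The code below is a simplified illustration, not the implementation above: `chunk_hashes` and `matches_at` are hypothetical names, the matcher is stateless, and it confirms a full hash match with a byte comparison because hashes can collide.

```python
from typing import List

BASE = 31
MOD = 10**9 + 7


def poly_hash(data: bytes) -> int:
    """Polynomial hash, reduced modulo MOD at every step."""
    h = 0
    for byte in data:
        h = (h * BASE + byte) % MOD
    return h


def chunk_hashes(pattern: bytes, chunk_size: int) -> List[int]:
    """Precompute one hash per chunk; only the last chunk may be shorter."""
    return [
        poly_hash(pattern[i:i + chunk_size])
        for i in range(0, len(pattern), chunk_size)
    ]


def matches_at(
    buffer: bytes,
    offset: int,
    pattern: bytes,
    hashes: List[int],
    chunk_size: int,
) -> bool:
    """Check whether ``pattern`` starts at ``offset``, chunk by chunk."""
    end_of_pattern = offset + len(pattern)
    if end_of_pattern > len(buffer):
        return False
    for index, expected in enumerate(hashes):
        start = offset + index * chunk_size
        end = min(start + chunk_size, end_of_pattern)
        if poly_hash(buffer[start:end]) != expected:
            # early exit: later chunks are never hashed
            return False
    # all chunk hashes agree; rule out collisions
    return buffer[offset:end_of_pattern] == pattern


pattern = b"--boundary42"
hashes = chunk_hashes(pattern, 5)
buffer = b"data\r\n--boundary42\r\nmore"
offsets = [
    i for i in range(len(buffer))
    if matches_at(buffer, i, pattern, hashes, 5)
]
print(offsets)
```

Most offsets are rejected after hashing only the first chunk, which is where the saving over hashing the full window comes from.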
|
|
@ -1,33 +0,0 @@
|
|||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
pytestmark = pytest.mark.pytest
|
||||
|
||||
from byteb4rb1e.testing.pytest import get_current_test
|
||||
from byteb4rb1e.testing.pytest.decorators import run_in_subprocess_once
|
||||
|
||||
|
||||
class Test_get_current_test:
|
||||
"""
|
||||
"""
|
||||
|
||||
def test_default(self):
|
||||
"""
|
||||
"""
|
||||
os.environ['PYTEST_CURRENT_TEST'] = 'foo::bar (something)'
|
||||
|
||||
result = get_current_test()
|
||||
|
||||
assert isinstance(result[0], Path)
|
||||
assert str(result[0].name) == 'foo'
|
||||
|
||||
assert result[1] == 'bar'
|
||||
|
||||
def test_invalid(self):
|
||||
"""
|
||||
"""
|
||||
del os.environ['PYTEST_CURRENT_TEST']
|
||||
with pytest.raises(RuntimeError):
|
||||
get_current_test()
|
||||
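The tests above exercise `get_current_test`, which reads the `PYTEST_CURRENT_TEST` environment variable that pytest sets to the node ID followed by the current stage in parentheses. The implementation is not shown in this diff; the following is only a guess at how such a parser could look, with `parse_current_test` as a hypothetical name.

```python
from pathlib import Path
from typing import Tuple


def parse_current_test(value: str) -> Tuple[Path, str]:
    """Split a ``PYTEST_CURRENT_TEST`` value into (file path, test name).

    Assumes the form ``<path>::[<class>::]<name> (<stage>)``.
    """
    # drop the trailing " (<stage>)" part, if present
    node_id = value.rsplit(" ", 1)[0]
    parts = node_id.split("::")
    if len(parts) < 2 or not parts[0]:
        raise RuntimeError(f"unexpected PYTEST_CURRENT_TEST value: {value!r}")
    # first component is the file, last component is the test function
    return Path(parts[0]), parts[-1]


suite, case = parse_current_test("tests/test_x.py::TestA::test_b (call)")
print(suite, case)
```

A real implementation would read the value from `os.environ` and raise when the variable is missing, which is the behavior `test_invalid` expects.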
|
|
@ -1,21 +0,0 @@
|
|||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
pytestmark = pytest.mark.pytest
|
||||
|
||||
from byteb4rb1e.testing.pytest.decorators import run_in_subprocess_once
|
||||
|
||||
|
||||
@run_in_subprocess_once()
|
||||
def test_run_in_subprocess_once(tmp_path):
|
||||
marker = tmp_path / "executed_in_subprocess.txt"
|
||||
|
||||
if marker.exists():
|
||||
raise AssertionError("Marker file exists before test logic ran (shouldn't happen in parent process)")
|
||||
|
||||
# Create proof of execution
|
||||
marker.write_text("Subprocess was here.")
|
||||
|
||||
# Now assert it
|
||||
assert marker.exists()
|
||||
|
|
@ -1,38 +0,0 @@
|
|||
from pathlib import Path
|
||||
import importlib.resources
|
||||
|
||||
import pytest
|
||||
|
||||
pytestmark = pytest.mark.pytest
|
||||
|
||||
from byteb4rb1e.testing.pytest.decorators import run_in_subprocess_once
|
||||
from byteb4rb1e.testing.pytest.fixtures import (
|
||||
current_test,
|
||||
mock_system_site_package_dir
|
||||
)
|
||||
|
||||
|
||||
def test_current_test(current_test):
|
||||
"""
|
||||
"""
|
||||
suite_path, case_name = current_test
|
||||
|
||||
assert str(Path(__file__)) == str(suite_path)
|
||||
assert case_name == "test_current_test"
|
||||
|
||||
|
||||
@run_in_subprocess_once()
|
||||
def test_mock_system_site_package_dir(mock_system_site_package_dir):
|
||||
"""
|
||||
"""
|
||||
dummy_data = 'Hello'
|
||||
|
||||
pkgdir = mock_system_site_package_dir('foobarpkg')
|
||||
|
||||
(pkgdir / 'data.txt').write_text(dummy_data)
|
||||
|
||||
assert (pkgdir / '__init__.py').exists()
|
||||
|
||||
result = next(importlib.resources.files('foobarpkg').glob('data.txt')).read_text()
|
||||
|
||||
assert result == dummy_data
|
||||
|
|
@ -1,5 +0,0 @@
|
|||
def pytest_configure(config):
|
||||
# register an additional marker
|
||||
config.addinivalue_line(
|
||||
"markers", "pytest: test pytest integration"
|
||||
)
|
||||
|
|
@ -1,93 +0,0 @@
|
|||
import os.path
|
||||
import sys
|
||||
import urllib.request
|
||||
|
||||
import pytest
|
||||
|
||||
from byteb4rb1e.testing.pytest.decorators import run_in_subprocess_once
|
||||
from byteb4rb1e.testing.pytest.fixtures import mock_system_site_package_dir
|
||||
from byteb4rb1e.utils.urllib.request import PkgHandler
|
||||
|
||||
|
||||
class TestPkgHandler:
|
||||
"""
|
||||
"""
|
||||
@run_in_subprocess_once()
|
||||
def test_text(self, mock_system_site_package_dir):
|
||||
"""
|
||||
"""
|
||||
_opener: urllib.request.OpenerDirector = urllib.request.build_opener(
|
||||
PkgHandler()
|
||||
)
|
||||
|
||||
dummy_data = 'Hello'
|
||||
|
||||
pkg_dir = mock_system_site_package_dir('foobarpkg')
|
||||
(pkg_dir / 'data.txt').write_text(dummy_data)
|
||||
|
||||
result = _opener.open('pkg://foobarpkg/data.txt').readline()
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert result == dummy_data
|
||||
|
||||
|
||||
@run_in_subprocess_once()
|
||||
def test_bytes(self, mock_system_site_package_dir):
|
||||
"""
|
||||
"""
|
||||
_opener: urllib.request.OpenerDirector = urllib.request.build_opener(
|
||||
PkgHandler()
|
||||
)
|
||||
|
||||
dummy_data = b'foobar123'
|
||||
|
||||
pkg_dir = mock_system_site_package_dir('foobarpkg')
|
||||
(pkg_dir / 'data.bin').write_bytes(dummy_data)
|
||||
|
||||
result = _opener.open('pkg://foobarpkg/data.bin').readline()
|
||||
|
||||
assert isinstance(result, bytes)
|
||||
assert result == dummy_data
|
||||
|
||||
|
||||
@run_in_subprocess_once()
|
||||
def test_subdir(self, mock_system_site_package_dir):
|
||||
"""
|
||||
"""
|
||||
_opener: urllib.request.OpenerDirector = urllib.request.build_opener(
|
||||
PkgHandler()
|
||||
)
|
||||
|
||||
dummy_data = 'foobar123'
|
||||
|
||||
pkg_dir = mock_system_site_package_dir('foobarpkg')
|
||||
|
||||
dummy_file = (pkg_dir / 'foo' / 'bar' / 'data.txt')
|
||||
|
||||
dummy_file.parent.mkdir(parents=True)
|
||||
dummy_file.write_text(dummy_data)
|
||||
|
||||
result = _opener.open('pkg://foobarpkg/foo/bar/data.txt').readline()
|
||||
|
||||
assert result == dummy_data
|
||||
|
||||
|
||||
@run_in_subprocess_once()
|
||||
def test_nested_module(self, mock_system_site_package_dir):
|
||||
"""
|
||||
"""
|
||||
_opener: urllib.request.OpenerDirector = urllib.request.build_opener(
|
||||
PkgHandler()
|
||||
)
|
||||
|
||||
dummy_data = 'foobar123'
|
||||
|
||||
pkg_dir = mock_system_site_package_dir('foo.bar.pkg')
|
||||
dummy_file = (pkg_dir / 'dummy' / 'data.txt')
|
||||
|
||||
dummy_file.parent.mkdir(parents=True)
|
||||
dummy_file.write_text(dummy_data)
|
||||
|
||||
result = _opener.open('pkg://foo.bar.pkg/dummy/data.txt').readline()
|
||||
|
||||
assert result == dummy_data
|
||||
|
|
@ -1,6 +1,6 @@
|
|||
import unittest
|
||||
|
||||
from byteb4rb1e.utils.collections import CircularBuffer
|
||||
from byteb4rb1e_utils.collections import CircularBuffer
|
||||
|
||||
class test_init(unittest.TestCase):
|
||||
"""CircularBuffer.__init__()"""
|
||||
|
|
@ -1,7 +1,7 @@
|
|||
from io import BytesIO, IOBase
|
||||
import unittest
|
||||
|
||||
from byteb4rb1e.utils.io import ChunksIO
|
||||
from byteb4rb1e_utils.io import ChunksIO
|
||||
|
||||
|
||||
class TestGetChunkSize(unittest.TestCase):
|
||||
56
tests/unit/byteb4rb1e_utils/string/test_chunked_hash.py
Normal file
|
|
@@ -0,0 +1,56 @@
import unittest

from byteb4rb1e_utils.string import (
    ChunkedRollingHash,
    ChunkedRollingHashOptions,
    RollingHash,
)

class test___init__(unittest.TestCase):
    """ChunkedRollingHash.__init__()"""
    def test_default(self):
        """default options"""
        result = ChunkedRollingHash(b'abcdefgh')

        self.assertEqual(result._mod, ChunkedRollingHashOptions.mod)
        self.assertEqual(result._base, ChunkedRollingHashOptions.base)
        self.assertEqual(result._max_chunk_size, ChunkedRollingHashOptions.max_chunk_size)

        control_hash = RollingHash.compute_initial_hash(
            b'abcdefgh',
            base = result._base,
            mod = result._mod
        )

        self.assertEqual(result._length, 8)
        self.assertEqual(result._chunk_count, 1)
        self.assertEqual(len(result._chunks_hash), result._chunk_count)
        self.assertEqual(result._hash, control_hash)
        self.assertEqual(result._chunks_hash[0], control_hash)

    def test_override(self):
        """override of options"""
        options = ChunkedRollingHashOptions(
            mod = 4,
            base = 10,
            max_chunk_size = 5,
        )
        result = ChunkedRollingHash(b'abcdefgh', options)

        self.assertEqual(result._mod, options.mod)
        self.assertEqual(result._base, options.base)
        self.assertEqual(result._max_chunk_size, options.max_chunk_size)

        control_hash1 = RollingHash.compute_initial_hash(
            b'abcde',
            base = result._base,
            mod = result._mod
        )
        control_hash2 = RollingHash.compute_initial_hash(
            b'fgh',
            base = result._base,
            mod = result._mod
        )

        self.assertEqual(result._chunks_hash[0], control_hash1)
        self.assertEqual(result._chunks_hash[1], control_hash2)
@@ -1,6 +1,6 @@
import unittest

-from byteb4rb1e.utils.string import RollingHash
+from byteb4rb1e_utils.string import RollingHash

class test_compute_initial_hash(unittest.TestCase):
    """RollingHash.compute_initial_hash()
54
tox.ini
@@ -1,54 +0,0 @@
[tox]
requires =
    tox>=4.19
env_list =
    unit-py3{9-13}
    integration-py3{9-13}-pytest8
    lint
    format

[testenv]
deps =
    .

[testenv:lint]
description = run type check on code base
labels = static
deps =
    mypy
commands =
    mypy src tests --junit-xml test-reports/{env_name}.xml

[testenv:audit]
description = run type check on code base
labels = audit
deps =
    pip-audit
commands =
    pip-audit .

[testenv:format]
description = run type check on code base
labels = static
deps =
    autopep8
commands =
    autopep8 --diff --exit-code src tests

[testenv:unit-py3{9-13}]
description = run type check on code base
labels = unit
deps =
    {[testenv]deps}
    pytest
commands =
    pytest tests/unit --junitxml=test-reports/{env_name}.xml

[testenv:integration-py3{9-13}-pytest8]
description = run pytest integration tests
labels = integration
deps =
    {[testenv]deps}
    pytest8: pytest>=8.0,<=9.0
commands =
    pytest tests/integration -m pytest --junitxml=test-reports/{env_name}.xml