Compare commits

..

2 commits

Author SHA1 Message Date
Rodney, Tiara
b9a67eb0a5
feat(string): add circular buffer support for KMP search 2025-05-04 03:03:14 +02:00
Rodney, Tiara
6f267c29e6
feat(string): init kmp string search 2025-05-04 02:34:49 +02:00
43 changed files with 2945 additions and 2458 deletions

2
.gitignore vendored
View file

@ -11,5 +11,3 @@
/configure~ /configure~
*.swo *.swo
*.swp *.swp
/test-reports/
/.tox/

View file

@ -1,122 +0,0 @@
# Development
> All changes MUST follow the vendor/tiara-gitflow-spec.git and no work MUST be
> started without a TODO issue.
## Prerequisites
- Python 3.9+
- [Pipenv](https://pipenv.pypa.io/)
- [tox](https://tox.wiki/) (installed via Pipenv dev dependencies)
- Node.js (for the `@byteb4rb1e/mime-todo` issue tracker CLI)
## Setup
Iniitialize Git submodules:
```bash
git submodule update --init --remote --recursive
```
Install dependencies (includes the package in editable mode):
```bash
pipenv install --dev
```
## Tooling
### Package
The project is packaged as `byteb4rb1e.utils` under a namespace package
layout (`src/byteb4rb1e/utils/`). It is installed in editable mode via
Pipenv.
Build a distribution:
```bash
pipenv run dist
```
### Testing
Tests are managed by tox. Test environments are defined in `tox.ini`:
```bash
# run all test suites
tox
# run specific environments
tox -e unit-py313
tox -e lint
tox -e format
```
| Environment | Purpose |
|---|---|
| `unit-py3{9-13}` | Unit tests |
| `smoke-py3{9-13}` | Smoke tests |
| `integration-py3{9-13}` | Integration tests |
| `lint` | Type checking (mypy) |
| `format` | Code style (autopep8) |
| `audit` | Dependency audit (pip-audit) |
### Issue tracker
Issues are tracked in the `TODO` file using the
[MIME TODO](https://specs.code.tiararodney.com/mime-todo/) format. Use the
`@byteb4rb1e/mime-todo` CLI to interact with it:
```bash
# list issues
npx @byteb4rb1e/mime-todo list
# show a specific issue
npx @byteb4rb1e/mime-todo show 3
# create an issue
npx @byteb4rb1e/mime-todo create --type feature --title "Title" --plan "Description" --module homeostat
```
See [CONTRIBUTING.md](CONTRIBUTING.md) for the full issue lifecycle.
### Publishing
Build wheel and source distributions:
```sh
pipenv run sdist
```
Configure publishing options:
`~/.pypirc`
```
[distutils]
index-servers =
tiararodney
[tiararodney]
repository: https://pypi.code.tiararodney.com/root/byteb4rb1e/
username: <username>
password: <password>
```
Publish to pypi.code.tiararodney.com:
```sh
pipenv run sdist:publish:tiarardoney
```
## Project layout
```
src/byteb4rb1e/utils/ # package source
tests/ # test suites (unit/, smoke/, integration/)
vendor/ # vendored specs
dist/ # sdist and wheel build output
DEVELOPMENT.md # this file
TODO # issue tracker (MIME TODO format)
```

View file

24
Makefile Normal file
View file

@ -0,0 +1,24 @@
.PHONY: chore configure
chore: configure Pipfile.lock requirements-dev.txt
Pipfile.lock: .venv Pipfile
.venv/bin/pipenv lock
requirements-dev.txt: .venv Pipfile.lock
.venv/bin/pipenv requirements --dev-only > requirements-dev.txt
configure: configure.ac
autoconf
.venv: requirements-dev.txt
python3 -m venv .venv
.venv/bin/python3 -m pip install --upgrade pip
.venv/bin/pip install -r requirements-dev.txt
test-reports:
.venv/bin/python3 -m unittest discover -v
build: .venv/bin/pipenv
.venv/bin/pipenv run build

20
Pipfile
View file

@ -4,25 +4,17 @@ verify_ssl = true
name = "pypi" name = "pypi"
[dev-packages] [dev-packages]
mypy = "~=1.15.0"
autopep8 = "~=2.3.2"
setuptools-scm = "~=8.2.0" setuptools-scm = "~=8.2.0"
pylint = "~=3.3.6"
build = "*" build = "*"
pipenv = "*" pipenv = "*"
tox = "*" byteb4rb1e-utils = { editable = true, path = '.'}
twine = "*"
pypi-attestations = "*"
autopep8 = "*"
[requires] [requires]
python_version = "3" python_version = "3.11"
[scripts] [scripts]
"dist" = "python3 -m build" "build" = "python3 -m build"
"dist:attestations" = "python3 -m pypi_attestations sign dist/*"
"dist:publish:tiararodney" = "python3 -m twine upload --sign --repository tiararodney dist/*"
"test" = "tox"
"test:static" = "tox run -m static"
"test:unit" = "tox run -m unit"
"test:integration" = "tox run -m integration"
[packages]
"byteb4rb1e.utils" = {file = ".", editable = true}

948
Pipfile.lock generated

File diff suppressed because it is too large Load diff

166
TODO
View file

@ -30,7 +30,7 @@ preserve Git diffing.
ID: [ISSUE-NUMBER] ID: [ISSUE-NUMBER]
Type: [feature/bugfix/hotfix] Type: [feature/bugfix/hotfix]
Title: [Short title] Title: [Short title]
Status: [open/in-progress/done/hold/cancelled] Status: [open/in-progress/done]
Priority: [low/medium/high] Priority: [low/medium/high]
Created: [YYYY-MM-DD] Created: [YYYY-MM-DD]
Description: [Detailed explanation] Description: [Detailed explanation]
@ -50,7 +50,7 @@ Description: [Detailed explanation]
ID: 1 ID: 1
Type: feature Type: feature
Title: implement KMP algorithm for string searching Title: implement KMP algorithm for string searching
Status: hold Status: in-progress
Priority: high Priority: high
Created: 2025-05-03 Created: 2025-05-03
Description: Implement the Knuth-Morris-Pratt algorithm for string searching. Description: Implement the Knuth-Morris-Pratt algorithm for string searching.
@ -79,165 +79,3 @@ Description: move the unit test suites to a unit/ subdirectory so that
integration tests and benchmarks can be cleanly separated integration tests and benchmarks can be cleanly separated
--- ---
ID: 4
Type: feature
Title: implement Rabin-Karp rolling hash algorithm
Status: done
Priority: high
Created: 2025-05-05
Description: After testing a couple of string search algorithms, I've ditched
the idea of using KMP as my use-case gives no advantage compared to
naive searching. In addition I've came upon the challenge that many
string search algorithms are optimized for search on a linear
buffer, which in my case is not applicable as the implementation
the search algorithm is for uses a circular buffer. Rabin-Karp
seemed promising. I've come up with a different approach though,
which is still based on rolling hashes, therefore, as a base, I
need an implementation of the original Rabin-Karp rolling hash
algorithm
---
ID: 5
Type: feature
Title: implement chunked rolling hash algorithm
Status: in-progress
Priority: high
Created: 2025-05-05
Description: Implement my custom algorithm for doing rolling hash string search
against a fixed length ring buffer
---
ID: 6
Type: feature
Title: implement importlib.resources handler for urllib
Status: done
Priority: high
Created: 2025-06-20
Description: A handler that can be registered with an urllib.request
OpenerDirector to open importlib.resources package files.
---
ID: 7
Type: feature
Title: setup advanced testing environment
Status: done
Priority: high
Created: 2025-06-20
Description: copy the testing environment setup from
byteb4rb1e.sphinxcontrib.ext
---
ID: 8
Type: bugfix
Title: rename package
Status: done
Priority: high
Created: 2025-06-20
Description: use dot namespaces to make the package a little more elegant
---
ID: 9
Type: bugfix
Title: fix LICENSE reference
Status: done
Priority: high
Created: 2025-06-20
Description: license specification is no longer a trove classifier in
pyproject.toml, hence the reference to LICENSE must be changed
---
ID: 10
Type: feature
Title: pytest current test context fixtures
Status: done
Priority: high
Created: 2025-06-20
Description: add fixtures for doing things in relation to the active testing
context
---
ID: 11
Type: bugfix
Title: move testing utils out of utils
Status: done
Priority: high
Created: 2025-06-20
Description: to shorten the namespace and also indicate that testing utilities
are different from regular utilities
---
ID: 12
Type: feature
Title: simplify testing.fixtures.mock_pkg
Status: done
Priority: high
Created: 2025-06-21
Description: Only bootstrap a package mock with the minimum requirements for a
Python module and let the consumer handle the directory layout.
---
ID: 13
Type: bugfix
Title: fix unit tests for urllib PkgHandler
Status: done
Priority: high
Created: 2025-06-21
Description: change of issue 12 wasn't properly reflected in urllib PkgHandler
unit tests
---
ID: 14
Type: feature
Title: add compression support for urllib PkgHandler
Status: done
Priority: high
Created: 2025-06-21
Description: with a proper content-type of the PkgHandler addinfourl object, a
consumer can determine whether the file is compressed or not.
---
ID: 15
Type: bugfix
Title: modularize module containers
Status: open
Priority: high
Created: 2025-06-28
Description: Even though importlib can find submodules through traversing paths
instead of relying on __init__.py for every ancestor module, this
is not supported by some modules like sphinx.ext.autosummary
---
ID: 16
Type: feature
Title: SQL-aware dataclass
Status: in-progress
Priority: low
Created: 2025-12-31
Description: A dataclass that transparently maps onto an SQL datastore, with
command generation for syncing data between data class and store
---
ID: 17
Type: feature
Title: recursive-descent HTML (DOM) parser
Status: in-progress
Priority: high
Created: 2025-12-31
Description: Extend the built-in event-driven parser to be modeled after DOM
recursive-descent HTML parser
---

2663
configure vendored Normal file

File diff suppressed because it is too large Load diff

27
configure.ac Normal file
View file

@ -0,0 +1,27 @@
AC_INIT
AC_CHECK_PROGS([MAKE], [make], [no])
AS_IF([test "$MAKE" == "no"],
[AC_MSG_NOTICE([without GNU Make, you have to inspect 'Makefile' and deduce build targets yourself.])])
AC_CHECK_PROGS([GIT], [git], [no])
AS_IF([test "$GIT" == "no"],
[AC_MSG_ERROR([install Git, before continuing.])])
AC_CHECK_PROGS([PYTHON3], [python3], [no])
AS_IF([test "$PYTHON3" == "no"],
[AC_MSG_ERROR([install Python 3, before continuing.])])
# required in Makefile to ensure proper path resolution during preprocessing
# realpath is not available on macOS
AC_CHECK_PROGS([REALPATH], [realpath], [no])
AS_IF([test "$REALPATH" == "no"],
[AC_MSG_ERROR([set a persistent alias for 'realpath', before continuing, e.g.
alias='python3 -c "import pathlib,sys;print(pathlib.Path(sys.argv[[1]]).resolve())"'"
])])
AC_MSG_NOTICE([initializing python3 venv...])
make .venv
AC_OUTPUT

View file

@ -7,12 +7,12 @@ requires = [
build-backend = "setuptools.build_meta" build-backend = "setuptools.build_meta"
[project] [project]
name = "byteb4rb1e.utils" name = "byteb4rb1e-utils"
description = "personal utilities and helpers" description = "personal utilities and helpers"
authors = [ authors = [
{ name = "Tiara Rodney", email = "tiara.rodney@byteb4rb1e.me" } { name = "Tiara Rodney", email = "tiara.rodney@administratrix.de" }
] ]
license-files = ["LICENSE"] license = { file = "LICENSE" }
readme = "README.md" readme = "README.md"
classifiers = [ classifiers = [
"Development Status :: 1 - Planning", "Development Status :: 1 - Planning",
@ -48,6 +48,7 @@ strict = true
max_line_length = 80 max_line_length = 80
aggressive = 3 aggressive = 3
recursive = true recursive = true
in-place = true
[tool.setuptools_scm] [tool.setuptools_scm]

25
requirements-dev.txt Normal file
View file

@ -0,0 +1,25 @@
-i https://pypi.org/simple
astroid==3.3.9; python_full_version >= '3.9.0'
autopep8==2.3.2; python_version >= '3.9'
build==1.2.2.post1; python_version >= '3.8'
-e .
certifi==2025.4.26; python_version >= '3.6'
colorama==0.4.6; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5, 3.6'
dill==0.4.0; python_version >= '3.8'
distlib==0.3.9
filelock==3.18.0; python_version >= '3.9'
isort==6.0.1; python_full_version >= '3.9.0'
mccabe==0.7.0; python_version >= '3.6'
mypy==1.15.0; python_version >= '3.9'
mypy-extensions==1.1.0; python_version >= '3.8'
packaging==25.0; python_version >= '3.8'
pipenv==2025.0.2; python_version >= '3.9'
platformdirs==4.3.7; python_version >= '3.9'
pycodestyle==2.13.0; python_version >= '3.9'
pylint==3.3.6; python_full_version >= '3.9.0'
pyproject-hooks==1.2.0; python_version >= '3.7'
setuptools==80.3.0; python_version >= '3.9'
setuptools-scm==8.2.0; python_version >= '3.8'
tomlkit==0.13.2; python_version >= '3.8'
typing-extensions==4.13.2; python_version >= '3.8'
virtualenv==20.30.0; python_version >= '3.8'

View file

@ -1,14 +0,0 @@
import os
from pathlib import Path
from typing import Tuple
def get_current_test() -> Tuple[Path, str]:
current_test_env = os.getenv("PYTEST_CURRENT_TEST")
if current_test_env is None:
raise RuntimeError("PYTEST_CURRENT_TEST not set. Must be run under pytest.")
suite_path, case_name = current_test_env.split('::', 1)
case_name = case_name.split(' ', 1)[0]
return Path(suite_path).resolve(), case_name

View file

@ -1,47 +0,0 @@
from functools import wraps
from pathlib import Path
import os
import subprocess
import sys
from byteb4rb1e.testing.pytest import get_current_test
def run_in_subprocess_once():
"""
A decorator that reruns th test in a subprocess if not already inside one.
Requires pytest to be installed and test to be run by pytest.
For what? Anything that can't be done in a thread-safe manner, e.g. modifying PYTHON_PATH
"""
def decorator(test_func):
@wraps(test_func)
def wrapper(*args, **kwargs):
if os.environ.get("XPYTEST_INSIDE_SUBPROCESS") == "1":
return test_func(*args, **kwargs)
suite_path, case_name = get_current_test()
cmd = [
sys.executable,
"-m", "pytest",
f"{suite_path}::{case_name}",
]
result = subprocess.run(
cmd,
env={**os.environ, "XPYTEST_INSIDE_SUBPROCESS": "1"},
capture_output=True,
text=True,
)
if result.returncode != 0:
print(' '.join(cmd))
print("==== Subprocess stdout ====")
print(result.stdout)
print("==== Subprocess stderr ====")
print(result.stderr)
raise AssertionError(f"Subprocess test failed with exit code {result.returncode}")
return wrapper
return decorator

View file

@ -1,44 +0,0 @@
import os
from pathlib import Path
import sys
from typing import Dict, Tuple, Union
import pytest
from byteb4rb1e.testing.pytest import get_current_test
_SITE_PACKAGE_COUNTER: Dict[str, int] = {}
@pytest.fixture
def current_test() -> Tuple[Path, str]:
"""
"""
return get_current_test()
@pytest.fixture
def mock_system_site_package_dir(tmp_path):
global _SITE_PACKAGE_COUNTER
package_id = _SITE_PACKAGE_COUNTER.setdefault(tmp_path, 0)
_SITE_PACKAGE_COUNTER[tmp_path] += 1
sys_path = tmp_path / str(package_id)
def _create(name: str) -> Path:
pkg_path = sys_path / name.replace('.', os.path.sep)
pkg_path.mkdir(parents=True)
(pkg_path / "__init__.py").touch()
sys.path.insert(0, str(sys_path))
return pkg_path
yield _create
# cleanup sys.path after test
if str(sys_path) in sys.path:
sys.path.remove(str(sys_path))

View file

@ -1,6 +0,0 @@
"""Utilities for building composable CLIs from command dataclasses."""
from byteb4rb1e.utils.argparse.command import CLICommand
from byteb4rb1e.utils.argparse.dispatcher import CLI
__all__ = ["CLI", "CLICommand"]

View file

@ -1,54 +0,0 @@
"""Base command dataclass for composable CLI trees."""
from __future__ import annotations
from argparse import ArgumentParser
from dataclasses import dataclass, fields
from typing import Any, ClassVar, Dict, List, Optional, Type
@dataclass
class CLICommand:
"""Base class for CLI commands.
Subclasses define their identity (name, help, description) as
dataclass fields. These are passed as kwargs to
``subparsers.add_parser()``.
Override ``add_arguments`` to register flags and positionals.
Override ``execute`` to implement the command's logic.
Nest subcommands by setting ``_subcommands`` as a class variable.
"""
name: str = ""
help: str = ""
description: str = ""
_subcommands: ClassVar[List[Type[Command]]] = []
def add_arguments(self, parser: ArgumentParser) -> None:
"""Add arguments to the parser. Override in subclasses."""
def execute(self, args: Any) -> int:
"""Run the command. Override in subclasses.
Returns an exit code (0 = success).
"""
return 0
def parser_kwargs(self) -> Dict[str, Any]:
"""Return the dataclass fields as kwargs for add_parser.
Excludes ``name`` (used as the positional parser name) and
any empty-string fields so argparse defaults apply.
"""
skip = {"name"}
kwargs = {}
for f in fields(self):
if f.name in skip or f.name.startswith("_"):
continue
val = getattr(self, f.name)
if val != "":
kwargs[f.name] = val
return kwargs

View file

@ -1,122 +0,0 @@
"""CLI dispatcher — builds parser trees from command dataclasses."""
from __future__ import annotations
import logging
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from typing import Any, Dict, List, Optional, Type
from byteb4rb1e.utils.argparse.command import CLICommand
class CLI:
"""Composable CLI built from a tree of Command dataclasses.
Recursively bootstraps an argparse parser hierarchy and tracks
dest names so ``run()`` can dispatch to the correct leaf command
without dest chaining in the caller.
Usage::
cli = CLI(prog="repository", description="...")
cli.bootstrap([MirrorCommand, IndexCommand])
cli.run()
"""
def __init__(
self,
prog: Optional[str] = None,
description: str = "",
) -> None:
kwargs = {} # type: Dict[str, Any]
if prog:
kwargs["prog"] = prog
if description:
kwargs["description"] = description
kwargs.setdefault(
"formatter_class", ArgumentDefaultsHelpFormatter,
)
self.parser = ArgumentParser(**kwargs)
self._dests = [] # type: List[str]
self._commands = {} # type: Dict[str, Command]
def add_arguments(self, parser: ArgumentParser) -> None:
"""Add global arguments to the root parser."""
parser.add_argument(
"-v", "--verbose", action="count", default=0,
help="Increase verbosity (-v for INFO, -vv for DEBUG)",
)
def bootstrap(
self,
commands: List[Type[Command]],
) -> None:
"""Build the parser tree from a list of top-level commands."""
self.add_arguments(self.parser)
dest = "command"
self._dests.append(dest)
sub = self.parser.add_subparsers(dest=dest)
for cmd_cls in commands:
self._add(sub, cmd_cls, prefix="")
def _add(
self,
subparsers: Any,
cmd_cls: Type[Command],
prefix: str,
) -> None:
"""Recursively add a command and its subcommands."""
cmd = cmd_cls()
parser = subparsers.add_parser(
cmd.name,
formatter_class=ArgumentDefaultsHelpFormatter,
**cmd.parser_kwargs(),
)
cmd.add_arguments(parser)
key = "%s.%s" % (prefix, cmd.name) if prefix else cmd.name
self._commands[key] = cmd
if cmd._subcommands:
dest = "%s_command" % cmd.name
self._dests.append(dest)
child_sub = parser.add_subparsers(dest=dest)
for sc_cls in cmd._subcommands:
self._add(child_sub, sc_cls, prefix=key)
def _resolve(self, args: Any) -> Optional[Command]:
"""Walk dest chain to find the leaf command."""
parts = [] # type: List[str]
for dest in self._dests:
val = getattr(args, dest, None)
if val is None:
continue
parts.append(val)
if not parts:
return None
key = ".".join(parts)
return self._commands.get(key)
@staticmethod
def _setup_logging(verbosity: int) -> None:
if verbosity >= 2:
level = logging.DEBUG
elif verbosity >= 1:
level = logging.INFO
else:
level = logging.WARNING
logging.basicConfig(
level=level,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler()],
)
def run(self) -> None:
"""Parse args and dispatch to the leaf command."""
args = self.parser.parse_args()
self._setup_logging(getattr(args, "verbose", 0))
cmd = self._resolve(args)
if cmd is None:
self.parser.print_help()
raise SystemExit(1)
raise SystemExit(cmd.execute(args))

View file

@ -1,109 +0,0 @@
#!/usr/bin/env python3
"""Generic HTTP client.
Thin urllib wrapper with retry-on-rate-limit. No domain knowledge
GitHub, Bitbucket, etc. are handled by higher-level modules.
"""
import json
import time
from typing import Any, Dict, Optional
import urllib.request
import urllib.parse
from warnings import warn
class HttpResponse:
def __init__(self, status: int, headers: dict, data: bytes, reason: str):
self.status_code = status
self.headers = headers
self.data = data
self.reason = reason
self.text = data.decode("utf-8", errors="replace")
def json(self):
return json.loads(self.data.decode("utf-8"))
def _request(
url: str,
method: str = "GET",
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
data: Optional[bytes] = None,
) -> HttpResponse:
# TODO: do proper exponential backoff
backoff = [1, 2, 4]
if params:
query = urllib.parse.urlencode(params)
url = f"{url}?{query}"
req = urllib.request.Request(
url,
headers=headers or {},
method=method,
data=data,
)
for delay in backoff:
try:
with urllib.request.urlopen(req, timeout=30) as resp:
status = resp.getcode()
resp_data = resp.read()
resp_headers = dict(resp.getheaders())
if status == 429:
warn(f"Rate-limited on {url} (HTTP {status})."
f" Backing off {delay}s...")
time.sleep(delay)
continue
return HttpResponse(
status, resp_headers, resp_data, resp.reason,
)
except urllib.error.HTTPError as e:
status = e.code
err_data = e.read()
err_headers = dict(e.headers.items())
if status == 429:
warn(f"Rate-limited on {url} (HTTP {status})."
f" Backing off {delay}s...")
time.sleep(delay)
continue
return HttpResponse(
status, err_headers, err_data, e.reason,
)
except urllib.error.URLError as e:
raise Exception(
"Network error on %s: %s", url, e,
) from e
# If all retries exhausted, return last error-like response
return HttpResponse(503, {}, b"", "Service unavailable")
def get(
url: str,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
) -> HttpResponse:
return _request(url, method="GET", params=params, headers=headers)
def post(
url: str,
data: Optional[bytes] = None,
headers: Optional[Dict[str, str]] = None,
) -> HttpResponse:
return _request(url, method="POST", headers=headers, data=data)
def put(
url: str,
data: Optional[bytes] = None,
headers: Optional[Dict[str, str]] = None,
) -> HttpResponse:
return _request(url, method="PUT", headers=headers, data=data)

View file

@ -1,78 +0,0 @@
#!/usr/bin/env python3
"""Bitbucket Cloud REST API v2.0 wrapper.
Thin layer over http.py for Bitbucket-specific operations:
- Bearer token authentication
- Repository existence checks
- Repository creation within a workspace/project
"""
import json
from typing import Any, Dict, Optional
from byteb4rb1e.utils.http import client as http_client
BITBUCKET_API = "https://api.bitbucket.org/2.0"
def http_headers(token: str) -> Dict[str, str]:
"""Construct Bitbucket API headers with Bearer token auth."""
return {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
"Content-Type": "application/json",
}
def repository_exists(
workspace: str,
repo_slug: str,
token: str,
) -> bool:
"""Check whether a repository exists in the workspace."""
url = f"{BITBUCKET_API}/repositories/{workspace}/{repo_slug}"
resp = http_client.get(url, headers=http_headers(token))
return resp.status_code == 200
def create_repository(
workspace: str,
repo_slug: str,
token: str,
project: Optional[str] = None,
description: str = "",
is_private: bool = True,
) -> http_client.HttpResponse:
"""Create a new repository in the workspace.
When *project* is given the repository is assigned to that
Bitbucket project (by key). This is required for workspaces
that scope access keys at the project level.
Returns the API response. Caller should check status_code == 200
for success.
"""
url = f"{BITBUCKET_API}/repositories/{workspace}/{repo_slug}"
body: Dict[str, Any] = {
"scm": "git",
"is_private": is_private,
"description": description,
"fork_policy": "no_forks",
}
if project:
body["project"] = {"key": project}
return http_client.put(
url,
data=json.dumps(body).encode("utf-8"),
headers=http_headers(token),
)
def clone_url(
workspace: str,
repo_slug: str,
) -> str:
"""Return the SSH clone URL for a Bitbucket repository."""
return f"git@bitbucket.org:{workspace}/{repo_slug}.git"

View file

@ -1,65 +0,0 @@
#!/usr/bin/env python3
import hashlib
from pathlib import Path
from typing import Any, Dict, List, Optional
from byteb4rb1e.utils.http import client as http_client
GITHUB_API = "https://api.github.com"
def http_headers(token: Optional[str]) -> Dict[str, str]:
headers = {
"Accept": "application/vnd.github+json",
"User-Agent": "sphinx-h5p-worker1"
}
if token:
# Use standard PAT header; token not logged anywhere.
headers["Authorization"] = f"Bearer {token}"
return headers
def blob_sha(path: Path) -> str:
"""Calculate Git blob SHA-1 for a file, matching GitHub API 'sha'."""
data = path.read_bytes()
header = f"blob {len(data)}\0".encode("utf-8")
store = header + data
return hashlib.sha1(store).hexdigest()
def list_org_repos(org: str, token: Optional[str]) -> List[Dict[str, Any]]:
repos: List[Dict[str, Any]] = []
page = 1
per_page = 100
while True:
url = f"{GITHUB_API}/orgs/{org}/repos"
resp = http_client.get(
url,
params={"page": page, "per_page": per_page, "type": "public"},
headers=http_headers(token),
)
if resp.status_code != 200:
raise RuntimeError(f"Failed to list repos for org {org}: {resp.status_code} {resp.text}")
batch = resp.json()
if not batch:
break
repos.extend(batch)
page += 1
return repos
def fetch_file(
org: str,
repo: str,
path: str,
token: str
) -> http_client.HttpResponse:
"""
"""
url = f"{GITHUB_API}/repos/{org}/{repo}/{path}"
return http_client.get(
url,
headers=http_headers(token),
)

View file

@ -1,91 +0,0 @@
from typing import Optional
class RollingHash:
"""implementation of Rabin-Karp rolling hash
"""
#: default base
base: int = 31
#: default modulus
mod: int = 10**9 + 7
#: current computed hash
_hash: int
#: prime number base (e.g., 31)
_base: int
#: large prime modulus (to prevent overflow)
_mod: int
# Precomputation of ``base^(length-1) % mod`` for removing the old byte when
# rolling over
_hbase_factor: int
def __init__(
self,
data: bytes,
base: Optional[int] = None,
mod: Optional[int] = None
):
"""Initialize the rolling hash with a given base and modulus.
base: Prime number base (e.g., 31)
mod: Large prime modulus to prevent overflow
length: Length of the pattern to match
"""
self._base = base if base else RollingHash.base
self._mod = mod if mod else RollingHash.mod
self._hash = RollingHash.compute_initial_hash(
data,
self._base,
self._mod
)
self._hbase_factor = pow(self._base, len(data) - 1, self._mod)
@staticmethod
def compute_initial_hash(
data: bytes,
base: int,
mod: int,
) -> int:
"""Compute the hash for the initial window (first `length` bytes).
rather use this standalone for computing the hash of the search pattern,
to avoid the overhead of instantiating an object.
:param data: data to build hash for
:param base:
:param: mod:
:returns: hash of data
"""
hash_ = 0
for i in range(len(data)):
# computing the modulus at each iteration, as to avoid the summed
# integer to be chunky, as in HUUUUGEE...
hash_ = (hash_ * base + data[i]) % mod
return hash_
def roll(self, old_byte: int, new_byte: int) -> int:
"""Efficiently update hash by removing ``old_byte`` and adding
``new_byte``
The old_byte removal uses a pre-computed value of the highest base used
in the polynomial calculation. This speeds things up a bit.
I was thinking about a way on how to store the old_byte efficiently
within the class object, but that would require storing the entire data,
basically doubling the memory consumption as the data must definetly
also live outside of the class object. A memoryview could solve this
problem, but at the cost of making the implementation more complex, so
this will have to do.
:param old_byte: The ordinal of the first byte in buffer to roll over
:param new_byte: The ordinal of the byte newly appended to the buffer
"""
# Remove old
self._hash = (self._hash - old_byte * self._hbase_factor) % self.mod
# Add new
self._hash = (self._hash * self.base + new_byte) % self.mod
return self._hash

View file

@ -1,41 +0,0 @@
import email
import importlib.resources
import mimetypes
from urllib.request import URLError
import urllib.request
class PkgHandler(urllib.request.BaseHandler):
"""
"""
def pkg_open(self, req) -> urllib.request.addinfourl:
pkg_files = importlib.resources.files(req.host)
try:
fh = next(
pkg_files.glob(req.selector.lstrip('//'))
).open('rb')
except Exception as e:
raise URLError(f'{e.__class__.__name__}: {e}') from e
fh.seek(0, 2);
size = fh.tell();
fh.seek(0);
mtype, compression = mimetypes.guess_type(req.selector)
if compression and mtype:
mtype = f"{mtype}+{compression}"
headers = email.message_from_string(
'Content-Type: %s\nContent-Length: %d\n' %
(mtype or 'text/plain', size)
)
if not mtype or mtype.startswith('text/'):
fh.close()
fh = next(
pkg_files.glob(req.selector.lstrip('//'))
).open('r')
return urllib.request.addinfourl(fh, headers, None)

View file

@ -1,345 +0,0 @@
#!/usr/bin/env python3
"""Git subprocess wrapper for repository operations.
Provides primitives for mirror cloning, syncing, remote management,
file extraction from bare repos, and submodule management.
No pygit2 or gitpython, uses subprocess only.
"""
import logging
import subprocess
from pathlib import Path
from typing import List, Optional
logger = logging.getLogger(__name__)
class GitError(Exception):
"""A git subprocess returned a non-zero exit code."""
def __init__(self, args: List[str], returncode: int, stderr: str):
self.args_list = args
self.returncode = returncode
self.stderr = stderr
super().__init__(
f"git exited {returncode}: {' '.join(args)}\n{stderr}"
)
def parse_base_url(base_url: str) -> str:
"""Extract workspace from an SCP-style Bitbucket base URL.
The host part must be exactly ``bitbucket.org`` bootstrapping
requires the Bitbucket API, so other hosts are rejected.
>>> _parse_base_url("git@bitbucket.org:byteb4rb1e")
'byteb4rb1e'
"""
# SCP-style: git@bitbucket.org:workspace
if ":" not in base_url or "//" in base_url:
raise ValueError(
f"Expected SCP-style URL (git@bitbucket.org:workspace), "
f"got: {base_url}"
)
host_part, workspace = base_url.split(":", 1)
# host_part is e.g. "git@bitbucket.org"
host = host_part.split("@", 1)[-1]
if host != "bitbucket.org":
raise ValueError(
f"Mirror base URL must target bitbucket.org, "
f"got host: {host}"
)
return Path(workspace).parent
def parse_repo_name(base_url: str) -> str:
"""Extract workspace from an SCP-style Bitbucket base URL.
The host part must be exactly ``bitbucket.org`` bootstrapping
requires the Bitbucket API, so other hosts are rejected.
>>> _parse_base_url("git@bitbucket.org:byteb4rb1e")
'byteb4rb1e'
"""
# SCP-style: git@bitbucket.org:workspace
if ":" not in base_url or "//" in base_url:
raise ValueError(
f"Expected SCP-style URL (git@bitbucket.org:workspace), "
f"got: {base_url}"
)
host_part, workspace = base_url.split(":", 1)
# host_part is e.g. "git@bitbucket.org"
host = host_part.split("@", 1)[-1]
if host != "bitbucket.org":
raise ValueError(
f"Mirror base URL must target bitbucket.org, "
f"got host: {host}"
)
return Path(workspace).name.split('.')[0]
def _run(
args: List[str],
cwd: Optional[Path] = None,
capture_stdout: bool = False,
) -> subprocess.CompletedProcess: # type: ignore[type-arg]
"""Run a git command, raising GitError on failure."""
cmd = ["git"] + args
logger.debug("$ %s", " ".join(cmd))
result = subprocess.run(
cmd,
cwd=cwd,
capture_output=True,
text=True,
)
if result.returncode != 0:
raise GitError(cmd, result.returncode, result.stderr.strip())
return result
def mirror_clone(source_url: str, dest: Path) -> None:
"""Clone a repository as a bare mirror.
Equivalent to ``git clone --mirror <source_url> <dest>``.
The destination directory must not already exist.
"""
_run(["clone", "--mirror", source_url, str(dest)])
logger.info("Cloned mirror %s%s", source_url, dest)
def add_remote(repo: Path, name: str, url: str) -> None:
"""Add a named remote to a bare repository."""
_run(["remote", "add", name, url], cwd=repo)
logger.debug("Added remote %s%s in %s", name, url, repo)
def has_remote(repo: Path, name: str) -> bool:
"""Check whether a named remote exists."""
result = _run(["remote"], cwd=repo)
return name in result.stdout.splitlines()
def mirror_update(repo: Path) -> None:
"""Fetch all remotes in a bare mirror repository.
Equivalent to ``git remote update`` inside the bare repo.
"""
_run(["remote", "update"], cwd=repo)
logger.debug("Updated remotes in %s", repo)
def fetch(repo: Path, remote: str = "origin") -> None:
"""Fetch from a single remote."""
_run(["fetch", remote], cwd=repo)
logger.debug("fetched %s in %s", remote, repo)
def show_ref(repo: Path) -> str:
"""Return the raw output of ``git show-ref`` (all refs + SHAs).
Returns an empty string if the repo has no refs.
"""
try:
result = _run(["show-ref"], cwd=repo)
return result.stdout
except GitError:
return ""
def ls_remote(repo: Path, remote: str) -> str:
"""Return the raw output of ``git ls-remote <remote>``.
Returns an empty string if the remote has no refs or on error.
"""
try:
result = _run(["ls-remote", remote], cwd=repo)
return result.stdout
except GitError:
return ""
def mirror_push(repo: Path, remote: str) -> None:
"""Push the full mirror to a remote.
Equivalent to ``git push --mirror <remote>``.
"""
_run(["push", "--mirror", remote], cwd=repo)
logger.info("Pushed mirror to %s from %s", remote, repo)
def read_file(
repo: Path,
filepath: str,
ref: str = "HEAD",
) -> Optional[str]:
"""Extract a file's contents from a bare repo without checkout.
Returns the file content as a string, or None if the file does
not exist at the given ref.
"""
try:
result = _run(
["show", f"{ref}:{filepath}"],
cwd=repo,
capture_stdout=True,
)
return result.stdout
except GitError:
return None
# -------------------------------------------------------------------
# Ref / tag primitives
# -------------------------------------------------------------------
def list_tags(repo: Path) -> List[str]:
"""List all tags in a repository."""
result = _run(["tag", "-l"], cwd=repo)
return [t for t in result.stdout.splitlines() if t]
def resolve_ref(repo: Path, ref: str) -> str:
"""Resolve a ref to a full SHA.
Raises GitError if the ref cannot be resolved.
"""
result = _run(
["rev-parse", ref], cwd=repo, capture_stdout=True,
)
return result.stdout.strip()
def head_ref(repo: Path) -> str:
"""Return the full SHA of HEAD."""
return resolve_ref(repo, "HEAD")
# -------------------------------------------------------------------
# Pull-through bare clone cache
# -------------------------------------------------------------------
def bare_path_for_url(url: str, cache_dir: Path) -> Path:
"""Derive a cache path from a clone URL.
Strips scheme/host, keeps the path component, appends ``.git``.
Examples::
https://github.com/h5p/h5p-multi-choice
cache_dir / h5p / h5p-multi-choice.git
git@github.com:h5p/h5p-multi-choice.git
cache_dir / h5p / h5p-multi-choice.git
"""
# Handle SCP-style URLs (git@host:path)
if ":" in url and "//" not in url:
path_part = url.split(":", 1)[1]
else:
# Strip scheme + host
from urllib.parse import urlparse
parsed = urlparse(url)
path_part = parsed.path.lstrip("/")
# Strip trailing .git if present, then re-add it
if path_part.endswith(".git"):
path_part = path_part[:-4]
return cache_dir / (path_part + ".git")
def ensure_bare_clone(url: str, cache_dir: Path) -> Path:
"""Ensure a bare mirror clone exists in *cache_dir*.
If the bare repo already exists, fetches updates via
``mirror_update``. Otherwise, creates a new mirror clone.
Returns the path to the bare repo.
"""
bare_path = bare_path_for_url(url, cache_dir)
if bare_path.exists():
mirror_update(bare_path)
logger.debug("Updated existing cache %s", bare_path)
else:
bare_path.parent.mkdir(parents=True, exist_ok=True)
mirror_clone(url, bare_path)
logger.info("Cached new bare clone %s", bare_path)
return bare_path
# -------------------------------------------------------------------
# Submodule operations
# -------------------------------------------------------------------
def has_submodule(repo: Path, path: str) -> bool:
"""Check whether a submodule is registered at *path*.
Reads ``.gitmodules`` to determine whether the submodule exists.
*path* is resolved relative to *repo*, then compared against
the repository root so the check works when *repo* is a
subdirectory of the actual git working tree.
Returns False if ``.gitmodules`` does not exist.
"""
try:
toplevel = Path(
_run(
["rev-parse", "--show-toplevel"], cwd=repo,
).stdout.strip()
)
except GitError:
return False
gitmodules = toplevel / ".gitmodules"
if not gitmodules.is_file():
return False
# Resolve the full path relative to the repo root
full_path = (repo / path).resolve()
try:
rel_path = str(full_path.relative_to(toplevel.resolve()))
except ValueError:
return False
try:
result = _run(
["config", "--file", str(gitmodules),
"--get-regexp", r"submodule\..*\.path"],
cwd=toplevel,
)
except GitError:
return False
for line in result.stdout.splitlines():
parts = line.split(None, 1)
if len(parts) == 2 and parts[1] == rel_path:
return True
return False
def submodule_add(repo: Path, url: str, path: str) -> None:
"""Add a git submodule at *path* pointing to *url*.
Equivalent to ``git submodule add <url> <path>`` inside *repo*.
"""
_run(["submodule", "add", url, path], cwd=repo)
logger.info("Added submodule %s%s", url, path)
def submodule_update(repo: Path, path: str) -> None:
"""Fetch and update a submodule to the latest remote HEAD.
Enters the submodule directory, fetches origin, and checks out
the latest commit on the remote default branch.
"""
sub_path = repo / path
_run(["fetch", "origin"], cwd=sub_path)
# Determine default branch from remote HEAD
result = _run(
["symbolic-ref", "refs/remotes/origin/HEAD",
"--short"],
cwd=sub_path,
)
default_branch = result.stdout.strip()
_run(["checkout", default_branch], cwd=sub_path)
logger.info("Updated submodule %s to %s", path, default_branch)
def submodule_checkout(repo: Path, path: str, ref: str) -> None:
"""Fetch and checkout a specific ref in a submodule."""
sub_path = repo / path
_run(["fetch", "origin"], cwd=sub_path)
_run(["checkout", ref], cwd=sub_path)
logger.info("Checked out submodule %s at %s", path, ref)

View file

@ -1,7 +1,7 @@
from dataclasses import dataclass from dataclasses import dataclass
from http.server import SimpleHTTPRequestHandler from http.server import SimpleHTTPRequestHandler
from byteb4rb1e.utils.io import ChunksIO from byteb4rb1e_utils.io import ChunksIO
@dataclass @dataclass

View file

@ -8,12 +8,12 @@ from http.server import HTTPServer
from io import BytesIO, IOBase from io import BytesIO, IOBase
from typing import Optional, Tuple, List from typing import Optional, Tuple, List
from byteb4rb1e.utils.http.server import ( from byteb4rb1e_utils.http.server import (
HandlerOptions, HandlerOptions,
MultipartUploadHandler, MultipartUploadHandler,
ServerOptions, ServerOptions,
) )
from byteb4rb1e.utils.io import ChunksIO from byteb4rb1e_utils.io import ChunksIO
__doc__ = """tsmuds - Tiara's Simple Multipart Upload Debugging Server __doc__ = """tsmuds - Tiara's Simple Multipart Upload Debugging Server

View file

@ -0,0 +1,68 @@
from typing import List, Union
from byteb4rb1e_utils.collections import CircularBuffer
class KnuthMorrisPratt:
"""Knuth-Morris-Pratt string searching algorithm implemented as a class
https://gwern.net/doc/cs/algorithm/1977-knuth.pdf
"""
def __init__(self, pattern: bytes):
"""
"""
self._table = KnuthMorrisPratt.build_table(pattern)
self._pattern = pattern
@staticmethod
def build_table(pattern: bytes) -> List[int]:
"""builds the failure table
"""
table = [0] * len(pattern)
j = 0
for i in range(1, len(pattern)):
while j > 0 and pattern[i] != pattern[j]:
j = table[j - 1]
if pattern[i] == pattern[j]:
j += 1
table[i] = j
return table
def match_linear(
self,
data: Union[bytes, bytearray, memoryview],
start: int = 0
) -> bool:
"""match against a linear fixed-size buffer
:returns: index of the match or -1 if not found
"""
m, j = len(self.pattern), 0
for i in range(start, len(data)):
while j > 0 and data[i] != self.pattern[j]:
j = self._table[j - 1]
if data[i] == self.pattern[j]:
j += 1
if j == m:
return i - m + 1
return -1
def match_circular(self, data: CircularBuffer):
"""Finds the boundary using KMP, handling circular wraparound."""
i, j = data.start, 0 # Start checking from the oldest data in the buffer
while j < len(boundary) and (data.filled or i != data.end):
if data.buf[i] == self.pattern[j]:
i = (i + 1) % data.size
j += 1
if j == len(self.pattern): # Full match found
return True
elif j > 0:
j = table[j - 1]
else:
i = (i + 1) % data.size
return False

View file

@ -1,33 +0,0 @@
import os
from pathlib import Path
import pytest
pytestmark = pytest.mark.pytest
from byteb4rb1e.testing.pytest import get_current_test
from byteb4rb1e.testing.pytest.decorators import run_in_subprocess_once
class Test_get_current_test:
"""
"""
def test_default(self):
"""
"""
os.environ['PYTEST_CURRENT_TEST'] = 'foo::bar (something)'
result = get_current_test()
assert isinstance(result[0], Path)
assert str(result[0].name) == 'foo'
assert result[1] == 'bar'
def test_invalid(self):
"""
"""
del os.environ['PYTEST_CURRENT_TEST']
with pytest.raises(RuntimeError):
get_current_test()

View file

@ -1,21 +0,0 @@
from pathlib import Path
import pytest
pytestmark = pytest.mark.pytest
from byteb4rb1e.testing.pytest.decorators import run_in_subprocess_once
@run_in_subprocess_once()
def test_run_in_subprocess_once(tmp_path):
marker = tmp_path / "executed_in_subprocess.txt"
if marker.exists():
raise AssertionError("Marker file exists before test logic ran (shouldn't happen in parent process)")
# Create proof of execution
marker.write_text("Subprocess was here.")
# Now assert it
assert marker.exists()

View file

@ -1,38 +0,0 @@
from pathlib import Path
import importlib.resources
import pytest
pytestmark = pytest.mark.pytest
from byteb4rb1e.testing.pytest.decorators import run_in_subprocess_once
from byteb4rb1e.testing.pytest.fixtures import (
current_test,
mock_system_site_package_dir
)
def test_current_test(current_test):
"""
"""
suite_path, case_name = current_test
assert str(Path(__file__)) == str(suite_path)
assert case_name == "test_current_test"
@run_in_subprocess_once()
def test_mock_system_site_package_dir(mock_system_site_package_dir):
"""
"""
dummy_data = 'Hello'
pkgdir = mock_system_site_package_dir('foobarpkg')
(pkgdir / 'data.txt').write_text(dummy_data)
assert (pkgdir / '__init__.py').exists()
result = next(importlib.resources.files('foobarpkg').glob('data.txt')).read_text()
assert result == dummy_data

View file

@ -1,5 +0,0 @@
def pytest_configure(config):
# register an additional marker
config.addinivalue_line(
"markers", "pytest: test pytest integration"
)

View file

@ -1,61 +0,0 @@
import unittest
from byteb4rb1e.utils.string import RollingHash
class test_compute_initial_hash(unittest.TestCase):
"""RollingHash.compute_initial_hash()
i calculated the hashes by hand, as a basis for this test case. Hopefully
there's no logical flaw...
"""
def test_default(self):
"""computation of hash"""
result = RollingHash.compute_initial_hash(
b'abcdefg',
base = 31,
mod = 10**9 + 7
)
self.assertEqual(result, 988021244)
class test___init__(unittest.TestCase):
"""RollingHash.__init__()
Make sure the class instance is initialized correctly
I calculated the hashes by hand, as a basis for this test case. Hopefully
there's no logical flaw...
"""
def test_default(self):
"""computation of initial hash and highest base factor"""
instance = RollingHash(b'abcdefg')
self.assertEqual(instance._hash, 988021244)
self.assertEqual(instance._hbase_factor, 887503681)
def test_defaults_override(self):
"""override of defaults"""
instance = RollingHash(
b'abcdefg',
base = 9,
mod = 4
)
self.assertEqual(instance._mod, 4)
self.assertEqual(instance._base, 9)
class test_roll(unittest.TestCase):
"""RollingHash.roll()"""
def test_rolling_hash(self):
base=31
mod=10**9 + 7
rh = RollingHash(b"foobar", base=base, mod=mod)
rolled_hash = rh.roll(ord("f"), ord("n"))
control_hash = RollingHash.compute_initial_hash(b"oobarn", base, mod)
self.assertEqual(rolled_hash, control_hash)

View file

@ -1,93 +0,0 @@
import os.path
import sys
import urllib.request
import pytest
from byteb4rb1e.testing.pytest.decorators import run_in_subprocess_once
from byteb4rb1e.testing.pytest.fixtures import mock_system_site_package_dir
from byteb4rb1e.utils.urllib.request import PkgHandler
class TestPkgHandler:
"""
"""
@run_in_subprocess_once()
def test_text(self, mock_system_site_package_dir):
"""
"""
_opener: urllib.request.OpenerDirector = urllib.request.build_opener(
PkgHandler()
)
dummy_data = 'Hello'
pkg_dir = mock_system_site_package_dir('foobarpkg')
(pkg_dir / 'data.txt').write_text(dummy_data)
result = _opener.open('pkg://foobarpkg/data.txt').readline()
assert isinstance(result, str)
assert result == dummy_data
@run_in_subprocess_once()
def test_bytes(self, mock_system_site_package_dir):
"""
"""
_opener: urllib.request.OpenerDirector = urllib.request.build_opener(
PkgHandler()
)
dummy_data = b'foobar123'
pkg_dir = mock_system_site_package_dir('foobarpkg')
(pkg_dir / 'data.bin').write_bytes(dummy_data)
result = _opener.open('pkg://foobarpkg/data.bin').readline()
assert isinstance(result, bytes)
assert result == dummy_data
@run_in_subprocess_once()
def test_subdir(self, mock_system_site_package_dir):
"""
"""
_opener: urllib.request.OpenerDirector = urllib.request.build_opener(
PkgHandler()
)
dummy_data = 'foobar123'
pkg_dir = mock_system_site_package_dir('foobarpkg')
dummy_file = (pkg_dir / 'foo' / 'bar' / 'data.txt')
dummy_file.parent.mkdir(parents=True)
dummy_file.write_text(dummy_data)
result = _opener.open('pkg://foobarpkg/foo/bar/data.txt').readline()
assert result == dummy_data
@run_in_subprocess_once()
def test_nested_module(self, mock_system_site_package_dir):
"""
"""
_opener: urllib.request.OpenerDirector = urllib.request.build_opener(
PkgHandler()
)
dummy_data = 'foobar123'
pkg_dir = mock_system_site_package_dir('foo.bar.pkg')
dummy_file = (pkg_dir / 'dummy' / 'data.txt')
dummy_file.parent.mkdir(parents=True)
dummy_file.write_text(dummy_data)
result = _opener.open('pkg://foo.bar.pkg/dummy/data.txt').readline()
assert result == dummy_data

View file

@ -1,6 +1,6 @@
import unittest import unittest
from byteb4rb1e.utils.collections import CircularBuffer from byteb4rb1e_utils.collections import CircularBuffer
class test_init(unittest.TestCase): class test_init(unittest.TestCase):
"""CircularBuffer.__init__()""" """CircularBuffer.__init__()"""

View file

@ -1,7 +1,7 @@
from io import BytesIO, IOBase from io import BytesIO, IOBase
import unittest import unittest
from byteb4rb1e.utils.io import ChunksIO from byteb4rb1e_utils.io import ChunksIO
class TestGetChunkSize(unittest.TestCase): class TestGetChunkSize(unittest.TestCase):

54
tox.ini
View file

@ -1,54 +0,0 @@
[tox]
requires =
tox>=4.19
env_list =
unit-py3{9-13}
integration-py3{9-13}-pytest8
lint
format
[testenv]
deps =
.
[testenv:lint]
description = run type check on code base
labels = static
deps =
mypy
commands =
mypy src tests --junit-xml test-reports/{env_name}.xml
[testenv:audit]
description = run type check on code base
labels = audit
deps =
pip-audit
commands =
pip-audit .
[testenv:format]
description = run type check on code base
labels = static
deps =
autopep8
commands =
autopep8 --diff --exit-code src tests
[testenv:unit-py3{9-13}]
description = run type check on code base
labels = unit
deps =
{[testenv]deps}
pytest
commands =
pytest tests/unit --junitxml=test-reports/{env_name}.xml
[testenv:integration-py3{9-13}-pytest8]
description = run pytest integration tests
labels = integration
deps =
{[testenv]deps}
pytest8: pytest>=8.0,<=9.0
commands =
pytest tests/integration -m pytest --junitxml=test-reports/{env_name}.xml