Compare commits
2 commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b9a67eb0a5 | ||
|
|
6f267c29e6 |
51 changed files with 2997 additions and 3954 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
|
@ -11,5 +11,3 @@
|
|||
/configure~
|
||||
*.swo
|
||||
*.swp
|
||||
/test-reports/
|
||||
/.tox/
|
||||
|
|
|
|||
122
DEVELOPMENT.md
122
DEVELOPMENT.md
|
|
@ -1,122 +0,0 @@
|
|||
# Development
|
||||
|
||||
> All changes MUST follow the vendor/tiara-gitflow-spec.git and no work MUST be
|
||||
> started without a TODO issue.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
- Python 3.9+
|
||||
- [Pipenv](https://pipenv.pypa.io/)
|
||||
- [tox](https://tox.wiki/) (installed via Pipenv dev dependencies)
|
||||
- Node.js (for the `@byteb4rb1e/mime-todo` issue tracker CLI)
|
||||
|
||||
## Setup
|
||||
|
||||
Iniitialize Git submodules:
|
||||
|
||||
```bash
|
||||
git submodule update --init --remote --recursive
|
||||
```
|
||||
|
||||
Install dependencies (includes the package in editable mode):
|
||||
|
||||
```bash
|
||||
pipenv install --dev
|
||||
```
|
||||
|
||||
|
||||
## Tooling
|
||||
|
||||
### Package
|
||||
|
||||
The project is packaged as `byteb4rb1e.utils` under a namespace package
|
||||
layout (`src/byteb4rb1e/utils/`). It is installed in editable mode via
|
||||
Pipenv.
|
||||
|
||||
Build a distribution:
|
||||
|
||||
```bash
|
||||
pipenv run dist
|
||||
```
|
||||
|
||||
### Testing
|
||||
|
||||
Tests are managed by tox. Test environments are defined in `tox.ini`:
|
||||
|
||||
```bash
|
||||
# run all test suites
|
||||
tox
|
||||
|
||||
# run specific environments
|
||||
tox -e unit-py313
|
||||
tox -e lint
|
||||
tox -e format
|
||||
```
|
||||
|
||||
| Environment | Purpose |
|
||||
|---|---|
|
||||
| `unit-py3{9-13}` | Unit tests |
|
||||
| `smoke-py3{9-13}` | Smoke tests |
|
||||
| `integration-py3{9-13}` | Integration tests |
|
||||
| `lint` | Type checking (mypy) |
|
||||
| `format` | Code style (autopep8) |
|
||||
| `audit` | Dependency audit (pip-audit) |
|
||||
|
||||
### Issue tracker
|
||||
|
||||
Issues are tracked in the `TODO` file using the
|
||||
[MIME TODO](https://specs.code.tiararodney.com/mime-todo/) format. Use the
|
||||
`@byteb4rb1e/mime-todo` CLI to interact with it:
|
||||
|
||||
```bash
|
||||
# list issues
|
||||
npx @byteb4rb1e/mime-todo list
|
||||
|
||||
# show a specific issue
|
||||
npx @byteb4rb1e/mime-todo show 3
|
||||
|
||||
# create an issue
|
||||
npx @byteb4rb1e/mime-todo create --type feature --title "Title" --plan "Description" --module homeostat
|
||||
```
|
||||
|
||||
See [CONTRIBUTING.md](CONTRIBUTING.md) for the full issue lifecycle.
|
||||
|
||||
### Publishing
|
||||
|
||||
Build wheel and source distributions:
|
||||
|
||||
```sh
|
||||
pipenv run sdist
|
||||
```
|
||||
|
||||
Configure publishing options:
|
||||
|
||||
`~/.pypirc`
|
||||
```
|
||||
[distutils]
|
||||
index-servers =
|
||||
tiararodney
|
||||
|
||||
[tiararodney]
|
||||
repository: https://pypi.code.tiararodney.com/root/byteb4rb1e/
|
||||
username: <username>
|
||||
password: <password>
|
||||
```
|
||||
|
||||
Publish to pypi.code.tiararodney.com:
|
||||
|
||||
```sh
|
||||
pipenv run sdist:publish:tiarardoney
|
||||
```
|
||||
|
||||
|
||||
## Project layout
|
||||
|
||||
```
|
||||
src/byteb4rb1e/utils/ # package source
|
||||
tests/ # test suites (unit/, smoke/, integration/)
|
||||
vendor/ # vendored specs
|
||||
dist/ # sdist and wheel build output
|
||||
DEVELOPMENT.md # this file
|
||||
TODO # issue tracker (MIME TODO format)
|
||||
```
|
||||
0
LICENSE
0
LICENSE
24
Makefile
Normal file
24
Makefile
Normal file
|
|
@ -0,0 +1,24 @@
|
|||
.PHONY: chore configure
|
||||
|
||||
chore: configure Pipfile.lock requirements-dev.txt
|
||||
|
||||
Pipfile.lock: .venv Pipfile
|
||||
.venv/bin/pipenv lock
|
||||
|
||||
requirements-dev.txt: .venv Pipfile.lock
|
||||
.venv/bin/pipenv requirements --dev-only > requirements-dev.txt
|
||||
|
||||
configure: configure.ac
|
||||
autoconf
|
||||
|
||||
.venv: requirements-dev.txt
|
||||
python3 -m venv .venv
|
||||
.venv/bin/python3 -m pip install --upgrade pip
|
||||
.venv/bin/pip install -r requirements-dev.txt
|
||||
|
||||
test-reports:
|
||||
.venv/bin/python3 -m unittest discover -v
|
||||
|
||||
build: .venv/bin/pipenv
|
||||
.venv/bin/pipenv run build
|
||||
|
||||
20
Pipfile
20
Pipfile
|
|
@ -4,25 +4,17 @@ verify_ssl = true
|
|||
name = "pypi"
|
||||
|
||||
[dev-packages]
|
||||
mypy = "~=1.15.0"
|
||||
autopep8 = "~=2.3.2"
|
||||
setuptools-scm = "~=8.2.0"
|
||||
pylint = "~=3.3.6"
|
||||
build = "*"
|
||||
pipenv = "*"
|
||||
tox = "*"
|
||||
twine = "*"
|
||||
pypi-attestations = "*"
|
||||
autopep8 = "*"
|
||||
byteb4rb1e-utils = { editable = true, path = '.'}
|
||||
|
||||
[requires]
|
||||
python_version = "3"
|
||||
python_version = "3.11"
|
||||
|
||||
[scripts]
|
||||
"dist" = "python3 -m build"
|
||||
"dist:attestations" = "python3 -m pypi_attestations sign dist/*"
|
||||
"dist:publish:tiararodney" = "python3 -m twine upload --sign --repository tiararodney dist/*"
|
||||
"test" = "tox"
|
||||
"test:static" = "tox run -m static"
|
||||
"test:unit" = "tox run -m unit"
|
||||
"test:integration" = "tox run -m integration"
|
||||
"build" = "python3 -m build"
|
||||
|
||||
[packages]
|
||||
"byteb4rb1e.utils" = {file = ".", editable = true}
|
||||
|
|
|
|||
948
Pipfile.lock
generated
948
Pipfile.lock
generated
File diff suppressed because it is too large
Load diff
326
TODO
326
TODO
|
|
@ -1,299 +1,81 @@
|
|||
--ISSUE
|
||||
Content-Type: application/sprints
|
||||
Sprints:
|
||||
# TODO List for esm-logging
|
||||
|
||||
This is a poor-man's issue tracker. I am not primarily a GitHub user so don't
|
||||
want to commit to their issue tracking feature, but my primary SVC service
|
||||
provider (Bitbucket) only offers paid integration into their issue tracker
|
||||
(Jira). I don't have the time (and patience) at the moment to analyze the best
|
||||
approach, so this file will have to suffice.
|
||||
|
||||
It's a very simple concept: Track any issues (features, bugfixes, hotfixes) in
|
||||
here, assign a sequential number to it and use that number when branching.
|
||||
|
||||
I will try to develop a format so that I can parse the file later on, should I
|
||||
decide to migrate to a real issue tracker. It's probably going to be Bugzilla,
|
||||
but for that my html-theme-ref project needs to stabilize first.
|
||||
|
||||
## Format Specification
|
||||
|
||||
The file uses Markdown conventions for formatting headers and other text block
|
||||
entitities, but SHOULD NOT be considered a Markdown file. That's why it has no
|
||||
definitive file extension.
|
||||
|
||||
Each issue entry follows a structured format for easier parsing and future
|
||||
migration. Issues MUST be **appended** to this file and never moved, to
|
||||
preserve Git diffing.
|
||||
|
||||
### Issue Format
|
||||
|
||||
```
|
||||
|
||||
ID: [ISSUE-NUMBER]
|
||||
Type: [feature/bugfix/hotfix]
|
||||
Title: [Short title]
|
||||
Status: [open/in-progress/done]
|
||||
Priority: [low/medium/high]
|
||||
Created: [YYYY-MM-DD]
|
||||
Description: [Detailed explanation]
|
||||
|
||||
---
|
||||
```
|
||||
|
||||
- ISSUE-NUMBERs must be sequential
|
||||
- truncation of description must be indentended so that every line starts at the
|
||||
same column
|
||||
- issues must be started with two LF
|
||||
- issues must be terminated with two LF, then `---`
|
||||
- issues may have a free-text field (epilog), which must be started with two LF.
|
||||
|
||||
## Issues
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
ID: 1
|
||||
Type: feature
|
||||
Title: implement KMP algorithm for string searching
|
||||
Status: hold
|
||||
Status: in-progress
|
||||
Priority: high
|
||||
Created: 2025-05-03
|
||||
Relationships:
|
||||
Description: Implement the Knuth-Morris-Pratt algorithm for string searching.
|
||||
I require this for matching RFC 9112 boundaries of entities against
|
||||
a circular buffer.
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
---
|
||||
|
||||
ID: 2
|
||||
Type: feature
|
||||
Title: implement circular buffer
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-05-04
|
||||
Relationships:
|
||||
Description: implement a simple circular buffer
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
---
|
||||
|
||||
ID: 3
|
||||
Type: bugfix
|
||||
Title: move unit tests to subdirectory
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-05-04
|
||||
Relationships:
|
||||
Description: move the unit test suites to a unit/ subdirectory so that
|
||||
integration tests and benchmarks can be cleanly separated
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
ID: 4
|
||||
Type: feature
|
||||
Title: implement Rabin-Karp rolling hash algorithm
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-05-05
|
||||
Relationships:
|
||||
Description: After testing a couple of string search algorithms, I've ditched
|
||||
the idea of using KMP as my use-case gives no advantage compared to
|
||||
naive searching. In addition I've came upon the challenge that many
|
||||
string search algorithms are optimized for search on a linear
|
||||
buffer, which in my case is not applicable as the implementation
|
||||
the search algorithm is for uses a circular buffer. Rabin-Karp
|
||||
seemed promising. I've come up with a different approach though,
|
||||
which is still based on rolling hashes, therefore, as a base, I
|
||||
need an implementation of the original Rabin-Karp rolling hash
|
||||
algorithm
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
ID: 5
|
||||
Type: feature
|
||||
Title: implement chunked rolling hash algorithm
|
||||
Status: in-progress
|
||||
Priority: high
|
||||
Created: 2025-05-05
|
||||
Relationships:
|
||||
Description: Implement my custom algorithm for doing rolling hash string search
|
||||
against a fixed length ring buffer
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
ID: 6
|
||||
Type: feature
|
||||
Title: implement importlib.resources handler for urllib
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-06-20
|
||||
Relationships:
|
||||
Description: A handler that can be registered with an urllib.request
|
||||
OpenerDirector to open importlib.resources package files.
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
ID: 7
|
||||
Type: feature
|
||||
Title: setup advanced testing environment
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-06-20
|
||||
Relationships:
|
||||
Description: copy the testing environment setup from
|
||||
byteb4rb1e.sphinxcontrib.ext
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
ID: 8
|
||||
Type: bugfix
|
||||
Title: rename package
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-06-20
|
||||
Relationships:
|
||||
Description: use dot namespaces to make the package a little more elegant
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
ID: 9
|
||||
Type: bugfix
|
||||
Title: fix LICENSE reference
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-06-20
|
||||
Relationships:
|
||||
Description: license specification is no longer a trove classifier in
|
||||
pyproject.toml, hence the reference to LICENSE must be changed
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
ID: 10
|
||||
Type: feature
|
||||
Title: pytest current test context fixtures
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-06-20
|
||||
Relationships:
|
||||
Description: add fixtures for doing things in relation to the active testing
|
||||
context
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
ID: 11
|
||||
Type: bugfix
|
||||
Title: move testing utils out of utils
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-06-20
|
||||
Relationships:
|
||||
Description: to shorten the namespace and also indicate that testing utilities
|
||||
are different from regular utilities
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
ID: 12
|
||||
Type: feature
|
||||
Title: simplify testing.fixtures.mock_pkg
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-06-21
|
||||
Relationships:
|
||||
Description: Only bootstrap a package mock with the minimum requirements for a
|
||||
Python module and let the consumer handle the directory layout.
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
ID: 13
|
||||
Type: bugfix
|
||||
Title: fix unit tests for urllib PkgHandler
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-06-21
|
||||
Relationships:
|
||||
Description: change of issue 12 wasn't properly reflected in urllib PkgHandler
|
||||
unit tests
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
ID: 14
|
||||
Type: feature
|
||||
Title: add compression support for urllib PkgHandler
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2025-06-21
|
||||
Relationships:
|
||||
Description: with a proper content-type of the PkgHandler addinfourl object, a
|
||||
consumer can determine whether the file is compressed or not.
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
ID: 15
|
||||
Type: bugfix
|
||||
Title: modularize module containers
|
||||
Status: open
|
||||
Priority: high
|
||||
Created: 2025-06-28
|
||||
Relationships:
|
||||
Description: Even though importlib can find submodules through traversing paths
|
||||
instead of relying on __init__.py for every ancestor module, this
|
||||
is not supported by some modules like sphinx.ext.autosummary
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
ID: 16
|
||||
Type: feature
|
||||
Title: SQL-aware dataclass
|
||||
Status: in-progress
|
||||
Priority: low
|
||||
Created: 2025-12-31
|
||||
Relationships:
|
||||
Description: A dataclass that transparently maps onto an SQL datastore, with
|
||||
command generation for syncing data between data class and store
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
ID: 17
|
||||
Type: feature
|
||||
Title: recursive-descent HTML (DOM) parser
|
||||
Status: in-progress
|
||||
Priority: high
|
||||
Created: 2025-12-31
|
||||
Relationships:
|
||||
Description: Extend the built-in event-driven parser to be modeled after DOM
|
||||
recursive-descent HTML parser
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
ID: 18
|
||||
Type: feature
|
||||
Title: implement saas wrapper for Forgejo
|
||||
Status: done
|
||||
Priority: medium
|
||||
Created: 2026-06-06
|
||||
Relationships:
|
||||
Description: Add a new sub-package byteb4rb1e.utils.saas.forgejo, supporting the
|
||||
same/similar operations as the Bitbucket wrapper
|
||||
(byteb4rb1e.utils.saas.bitbucket) against the Forgejo REST API:
|
||||
token-based authentication headers, repository existence checks,
|
||||
repository creation within an owner/organization, and clone URL
|
||||
construction. Implement as a thin layer over
|
||||
byteb4rb1e.utils.http.client, consistent with the existing
|
||||
Bitbucket and GitHub modules.
|
||||
Unlike Bitbucket (one global SaaS instance, hence the hardcoded
|
||||
api.bitbucket.org), Forgejo is self-hosted (e.g.
|
||||
git.code.tiararodney.com). The wrapper MUST take a host/instance
|
||||
URL parameter (or read one from config) rather than baking any
|
||||
specific instance in. This is the biggest API-surface difference
|
||||
from the bitbucket module.
|
||||
Bitbucket's clone_url constructs SSH only. Forgejo's repository
|
||||
API returns both clone_url (HTTPS) and ssh_url, and HTTPS is
|
||||
needed in CI (no SSH host keys on the Woodpecker runner). The
|
||||
wrapper SHOULD expose both, either as ssh_clone_url and
|
||||
https_clone_url, or a single clone_url(..., scheme="ssh"|"https").
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
ID: 19
|
||||
Type: feature
|
||||
Title: config framework with CLI integration
|
||||
Status: done
|
||||
Priority: medium
|
||||
Created: 2026-06-06
|
||||
Relationships:
|
||||
Description: Add byteb4rb1e.utils.config: INI-backed config dataclasses where a
|
||||
dataclass is the single source of truth for settings, with three
|
||||
layers (field defaults, INI file sections, CLI overrides). Includes
|
||||
INI loading/writing (load_ini, ensure_ini, ensure_ini_multi,
|
||||
format_section), per-flag CLI integration (add_config_arguments,
|
||||
apply_cli_overrides), dotted-path overrides via a unified --config
|
||||
KEY=VALUE flag (apply_overrides, format_help), and the companion
|
||||
argparse KeyValueAction (byteb4rb1e.utils.argparse.actions) that
|
||||
accumulates KEY=VALUE pairs into a dict.
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
ID: 20
|
||||
Type: feature
|
||||
Title: cookie-persisting HTTP session client
|
||||
Status: done
|
||||
Priority: medium
|
||||
Created: 2026-06-06
|
||||
Relationships:
|
||||
Description: Extend byteb4rb1e.utils.http.client with an HttpSession class that
|
||||
persists cookies across requests via http.cookiejar (suitable for
|
||||
login followed by cookie-authenticated page fetches), supporting
|
||||
GET with query params, form-encoded POST, default/per-request
|
||||
header merging, and HTTPError-to-response conversion. Also refactor
|
||||
HttpResponse into a frozen dataclass with text as a derived
|
||||
property.
|
||||
|
||||
--ISSUE
|
||||
Content-Type: application/issue
|
||||
ID: 21
|
||||
Type: feature
|
||||
Title: relax host restriction in vcs.git parse_base_url and parse_repo_name
|
||||
Status: done
|
||||
Priority: high
|
||||
Created: 2026-06-06
|
||||
Relationships:
|
||||
Description: Both byteb4rb1e.utils.vcs.git.parse_base_url and parse_repo_name
|
||||
currently hard-reject any URL whose host is not exactly
|
||||
'bitbucket.org' with a ValueError. The check predates the
|
||||
multi-SaaS world (it dates back to when bootstrapping required the
|
||||
Bitbucket API). With the new forgejo saas wrapper (#18) in place,
|
||||
downstream consumers (specifically sphinxcontrib.h5p.utils.pkg
|
||||
#105) now feed Forgejo-shaped URLs like
|
||||
'git@git.code.tiararodney.com:h5p-mirror/foo.git' through these
|
||||
helpers and hit the restriction.
|
||||
---
|
||||
|
|
|
|||
27
configure.ac
Normal file
27
configure.ac
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
AC_INIT
|
||||
|
||||
AC_CHECK_PROGS([MAKE], [make], [no])
|
||||
AS_IF([test "$MAKE" == "no"],
|
||||
[AC_MSG_NOTICE([without GNU Make, you have to inspect 'Makefile' and deduce build targets yourself.])])
|
||||
|
||||
AC_CHECK_PROGS([GIT], [git], [no])
|
||||
AS_IF([test "$GIT" == "no"],
|
||||
[AC_MSG_ERROR([install Git, before continuing.])])
|
||||
|
||||
AC_CHECK_PROGS([PYTHON3], [python3], [no])
|
||||
AS_IF([test "$PYTHON3" == "no"],
|
||||
[AC_MSG_ERROR([install Python 3, before continuing.])])
|
||||
|
||||
# required in Makefile to ensure proper path resolution during preprocessing
|
||||
# realpath is not available on macOS
|
||||
AC_CHECK_PROGS([REALPATH], [realpath], [no])
|
||||
AS_IF([test "$REALPATH" == "no"],
|
||||
[AC_MSG_ERROR([set a persistent alias for 'realpath', before continuing, e.g.
|
||||
|
||||
alias='python3 -c "import pathlib,sys;print(pathlib.Path(sys.argv[[1]]).resolve())"'"
|
||||
])])
|
||||
|
||||
AC_MSG_NOTICE([initializing python3 venv...])
|
||||
make .venv
|
||||
|
||||
AC_OUTPUT
|
||||
|
|
@ -7,12 +7,12 @@ requires = [
|
|||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "byteb4rb1e.utils"
|
||||
name = "byteb4rb1e-utils"
|
||||
description = "personal utilities and helpers"
|
||||
authors = [
|
||||
{ name = "Tiara Rodney", email = "tiara.rodney@byteb4rb1e.me" }
|
||||
{ name = "Tiara Rodney", email = "tiara.rodney@administratrix.de" }
|
||||
]
|
||||
license-files = ["LICENSE"]
|
||||
license = { file = "LICENSE" }
|
||||
readme = "README.md"
|
||||
classifiers = [
|
||||
"Development Status :: 1 - Planning",
|
||||
|
|
@ -48,6 +48,7 @@ strict = true
|
|||
max_line_length = 80
|
||||
aggressive = 3
|
||||
recursive = true
|
||||
in-place = true
|
||||
|
||||
[tool.setuptools_scm]
|
||||
|
||||
|
|
|
|||
25
requirements-dev.txt
Normal file
25
requirements-dev.txt
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
-i https://pypi.org/simple
|
||||
astroid==3.3.9; python_full_version >= '3.9.0'
|
||||
autopep8==2.3.2; python_version >= '3.9'
|
||||
build==1.2.2.post1; python_version >= '3.8'
|
||||
-e .
|
||||
certifi==2025.4.26; python_version >= '3.6'
|
||||
colorama==0.4.6; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5, 3.6'
|
||||
dill==0.4.0; python_version >= '3.8'
|
||||
distlib==0.3.9
|
||||
filelock==3.18.0; python_version >= '3.9'
|
||||
isort==6.0.1; python_full_version >= '3.9.0'
|
||||
mccabe==0.7.0; python_version >= '3.6'
|
||||
mypy==1.15.0; python_version >= '3.9'
|
||||
mypy-extensions==1.1.0; python_version >= '3.8'
|
||||
packaging==25.0; python_version >= '3.8'
|
||||
pipenv==2025.0.2; python_version >= '3.9'
|
||||
platformdirs==4.3.7; python_version >= '3.9'
|
||||
pycodestyle==2.13.0; python_version >= '3.9'
|
||||
pylint==3.3.6; python_full_version >= '3.9.0'
|
||||
pyproject-hooks==1.2.0; python_version >= '3.7'
|
||||
setuptools==80.3.0; python_version >= '3.9'
|
||||
setuptools-scm==8.2.0; python_version >= '3.8'
|
||||
tomlkit==0.13.2; python_version >= '3.8'
|
||||
typing-extensions==4.13.2; python_version >= '3.8'
|
||||
virtualenv==20.30.0; python_version >= '3.8'
|
||||
|
|
@ -1,14 +0,0 @@
|
|||
import os
|
||||
from pathlib import Path
|
||||
from typing import Tuple
|
||||
|
||||
|
||||
def get_current_test() -> Tuple[Path, str]:
|
||||
current_test_env = os.getenv("PYTEST_CURRENT_TEST")
|
||||
if current_test_env is None:
|
||||
raise RuntimeError("PYTEST_CURRENT_TEST not set. Must be run under pytest.")
|
||||
|
||||
suite_path, case_name = current_test_env.split('::', 1)
|
||||
case_name = case_name.split(' ', 1)[0]
|
||||
return Path(suite_path).resolve(), case_name
|
||||
|
||||
|
|
@ -1,47 +0,0 @@
|
|||
from functools import wraps
|
||||
from pathlib import Path
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from byteb4rb1e.testing.pytest import get_current_test
|
||||
|
||||
|
||||
def run_in_subprocess_once():
|
||||
"""
|
||||
A decorator that reruns th test in a subprocess if not already inside one.
|
||||
Requires pytest to be installed and test to be run by pytest.
|
||||
|
||||
For what? Anything that can't be done in a thread-safe manner, e.g. modifying PYTHON_PATH
|
||||
"""
|
||||
def decorator(test_func):
|
||||
@wraps(test_func)
|
||||
def wrapper(*args, **kwargs):
|
||||
if os.environ.get("XPYTEST_INSIDE_SUBPROCESS") == "1":
|
||||
return test_func(*args, **kwargs)
|
||||
|
||||
suite_path, case_name = get_current_test()
|
||||
|
||||
cmd = [
|
||||
sys.executable,
|
||||
"-m", "pytest",
|
||||
f"{suite_path}::{case_name}",
|
||||
]
|
||||
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
env={**os.environ, "XPYTEST_INSIDE_SUBPROCESS": "1"},
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
print(' '.join(cmd))
|
||||
print("==== Subprocess stdout ====")
|
||||
print(result.stdout)
|
||||
print("==== Subprocess stderr ====")
|
||||
print(result.stderr)
|
||||
raise AssertionError(f"Subprocess test failed with exit code {result.returncode}")
|
||||
return wrapper
|
||||
return decorator
|
||||
|
||||
|
|
@ -1,44 +0,0 @@
|
|||
import os
|
||||
from pathlib import Path
|
||||
import sys
|
||||
from typing import Dict, Tuple, Union
|
||||
|
||||
import pytest
|
||||
|
||||
from byteb4rb1e.testing.pytest import get_current_test
|
||||
|
||||
_SITE_PACKAGE_COUNTER: Dict[str, int] = {}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def current_test() -> Tuple[Path, str]:
|
||||
"""
|
||||
"""
|
||||
return get_current_test()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_system_site_package_dir(tmp_path):
|
||||
global _SITE_PACKAGE_COUNTER
|
||||
|
||||
package_id = _SITE_PACKAGE_COUNTER.setdefault(tmp_path, 0)
|
||||
_SITE_PACKAGE_COUNTER[tmp_path] += 1
|
||||
|
||||
sys_path = tmp_path / str(package_id)
|
||||
|
||||
def _create(name: str) -> Path:
|
||||
pkg_path = sys_path / name.replace('.', os.path.sep)
|
||||
|
||||
pkg_path.mkdir(parents=True)
|
||||
|
||||
(pkg_path / "__init__.py").touch()
|
||||
|
||||
sys.path.insert(0, str(sys_path))
|
||||
|
||||
return pkg_path
|
||||
|
||||
yield _create
|
||||
|
||||
# cleanup sys.path after test
|
||||
if str(sys_path) in sys.path:
|
||||
sys.path.remove(str(sys_path))
|
||||
|
|
@ -1,7 +0,0 @@
|
|||
"""Utilities for building composable CLIs from command dataclasses."""
|
||||
|
||||
from byteb4rb1e.utils.argparse.actions import KeyValueAction
|
||||
from byteb4rb1e.utils.argparse.command import CLICommand
|
||||
from byteb4rb1e.utils.argparse.dispatcher import CLI
|
||||
|
||||
__all__ = ["CLI", "CLICommand", "KeyValueAction"]
|
||||
|
|
@ -1,33 +0,0 @@
|
|||
"""Custom argparse actions."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
from typing import Any
|
||||
|
||||
|
||||
class KeyValueAction(argparse.Action):
|
||||
"""Argparse action that accumulates ``KEY=VALUE`` pairs into a dict.
|
||||
|
||||
Usage::
|
||||
|
||||
parser.add_argument("--config", action=KeyValueAction,
|
||||
default={}, metavar="KEY=VALUE",
|
||||
help="Set a config option (can be repeated)")
|
||||
|
||||
Then ``args.config`` is a ``dict[str, str]``.
|
||||
"""
|
||||
|
||||
def __call__(
|
||||
self,
|
||||
parser: argparse.ArgumentParser,
|
||||
namespace: argparse.Namespace,
|
||||
values: Any,
|
||||
option_string: str | None = None,
|
||||
) -> None:
|
||||
d = getattr(namespace, self.dest, None) or {}
|
||||
if "=" not in values:
|
||||
parser.error(f"Invalid format: {values!r} (expected KEY=VALUE)")
|
||||
key, _, value = values.partition("=")
|
||||
d[key.strip()] = value.strip()
|
||||
setattr(namespace, self.dest, d)
|
||||
|
|
@ -1,54 +0,0 @@
|
|||
"""Base command dataclass for composable CLI trees."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from argparse import ArgumentParser
|
||||
from dataclasses import dataclass, fields
|
||||
from typing import Any, ClassVar, Dict, List, Optional, Type
|
||||
|
||||
|
||||
@dataclass
|
||||
class CLICommand:
|
||||
"""Base class for CLI commands.
|
||||
|
||||
Subclasses define their identity (name, help, description) as
|
||||
dataclass fields. These are passed as kwargs to
|
||||
``subparsers.add_parser()``.
|
||||
|
||||
Override ``add_arguments`` to register flags and positionals.
|
||||
Override ``execute`` to implement the command's logic.
|
||||
|
||||
Nest subcommands by setting ``_subcommands`` as a class variable.
|
||||
"""
|
||||
|
||||
name: str = ""
|
||||
help: str = ""
|
||||
description: str = ""
|
||||
|
||||
_subcommands: ClassVar[List[Type[Command]]] = []
|
||||
|
||||
def add_arguments(self, parser: ArgumentParser) -> None:
|
||||
"""Add arguments to the parser. Override in subclasses."""
|
||||
|
||||
def execute(self, args: Any) -> int:
|
||||
"""Run the command. Override in subclasses.
|
||||
|
||||
Returns an exit code (0 = success).
|
||||
"""
|
||||
return 0
|
||||
|
||||
def parser_kwargs(self) -> Dict[str, Any]:
|
||||
"""Return the dataclass fields as kwargs for add_parser.
|
||||
|
||||
Excludes ``name`` (used as the positional parser name) and
|
||||
any empty-string fields so argparse defaults apply.
|
||||
"""
|
||||
skip = {"name"}
|
||||
kwargs = {}
|
||||
for f in fields(self):
|
||||
if f.name in skip or f.name.startswith("_"):
|
||||
continue
|
||||
val = getattr(self, f.name)
|
||||
if val != "":
|
||||
kwargs[f.name] = val
|
||||
return kwargs
|
||||
|
|
@ -1,122 +0,0 @@
|
|||
"""CLI dispatcher — builds parser trees from command dataclasses."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
|
||||
from typing import Any, Dict, List, Optional, Type
|
||||
|
||||
from byteb4rb1e.utils.argparse.command import CLICommand
|
||||
|
||||
|
||||
class CLI:
|
||||
"""Composable CLI built from a tree of Command dataclasses.
|
||||
|
||||
Recursively bootstraps an argparse parser hierarchy and tracks
|
||||
dest names so ``run()`` can dispatch to the correct leaf command
|
||||
without dest chaining in the caller.
|
||||
|
||||
Usage::
|
||||
|
||||
cli = CLI(prog="repository", description="...")
|
||||
cli.bootstrap([MirrorCommand, IndexCommand])
|
||||
cli.run()
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
prog: Optional[str] = None,
|
||||
description: str = "",
|
||||
) -> None:
|
||||
kwargs = {} # type: Dict[str, Any]
|
||||
if prog:
|
||||
kwargs["prog"] = prog
|
||||
if description:
|
||||
kwargs["description"] = description
|
||||
kwargs.setdefault(
|
||||
"formatter_class", ArgumentDefaultsHelpFormatter,
|
||||
)
|
||||
self.parser = ArgumentParser(**kwargs)
|
||||
self._dests = [] # type: List[str]
|
||||
self._commands = {} # type: Dict[str, Command]
|
||||
|
||||
def add_arguments(self, parser: ArgumentParser) -> None:
|
||||
"""Add global arguments to the root parser."""
|
||||
parser.add_argument(
|
||||
"-v", "--verbose", action="count", default=0,
|
||||
help="Increase verbosity (-v for INFO, -vv for DEBUG)",
|
||||
)
|
||||
|
||||
def bootstrap(
|
||||
self,
|
||||
commands: List[Type[Command]],
|
||||
) -> None:
|
||||
"""Build the parser tree from a list of top-level commands."""
|
||||
self.add_arguments(self.parser)
|
||||
dest = "command"
|
||||
self._dests.append(dest)
|
||||
sub = self.parser.add_subparsers(dest=dest)
|
||||
for cmd_cls in commands:
|
||||
self._add(sub, cmd_cls, prefix="")
|
||||
|
||||
def _add(
|
||||
self,
|
||||
subparsers: Any,
|
||||
cmd_cls: Type[Command],
|
||||
prefix: str,
|
||||
) -> None:
|
||||
"""Recursively add a command and its subcommands."""
|
||||
cmd = cmd_cls()
|
||||
parser = subparsers.add_parser(
|
||||
cmd.name,
|
||||
formatter_class=ArgumentDefaultsHelpFormatter,
|
||||
**cmd.parser_kwargs(),
|
||||
)
|
||||
cmd.add_arguments(parser)
|
||||
|
||||
key = "%s.%s" % (prefix, cmd.name) if prefix else cmd.name
|
||||
self._commands[key] = cmd
|
||||
|
||||
if cmd._subcommands:
|
||||
dest = "%s_command" % cmd.name
|
||||
self._dests.append(dest)
|
||||
child_sub = parser.add_subparsers(dest=dest)
|
||||
for sc_cls in cmd._subcommands:
|
||||
self._add(child_sub, sc_cls, prefix=key)
|
||||
|
||||
def _resolve(self, args: Any) -> Optional[Command]:
|
||||
"""Walk dest chain to find the leaf command."""
|
||||
parts = [] # type: List[str]
|
||||
for dest in self._dests:
|
||||
val = getattr(args, dest, None)
|
||||
if val is None:
|
||||
continue
|
||||
parts.append(val)
|
||||
if not parts:
|
||||
return None
|
||||
key = ".".join(parts)
|
||||
return self._commands.get(key)
|
||||
|
||||
@staticmethod
|
||||
def _setup_logging(verbosity: int) -> None:
|
||||
if verbosity >= 2:
|
||||
level = logging.DEBUG
|
||||
elif verbosity >= 1:
|
||||
level = logging.INFO
|
||||
else:
|
||||
level = logging.WARNING
|
||||
logging.basicConfig(
|
||||
level=level,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||
handlers=[logging.StreamHandler()],
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
"""Parse args and dispatch to the leaf command."""
|
||||
args = self.parser.parse_args()
|
||||
self._setup_logging(getattr(args, "verbose", 0))
|
||||
cmd = self._resolve(args)
|
||||
if cmd is None:
|
||||
self.parser.print_help()
|
||||
raise SystemExit(1)
|
||||
raise SystemExit(cmd.execute(args))
|
||||
|
|
@ -1,369 +0,0 @@
|
|||
"""Config framework — INI-backed dataclasses with CLI integration.
|
||||
|
||||
A config dataclass is the single source of truth for settings. Values
|
||||
come from three layers (later wins):
|
||||
|
||||
1. Dataclass field defaults
|
||||
2. INI file sections
|
||||
3. CLI overrides (via argparse flags or ``--config KEY=VALUE``)
|
||||
|
||||
Two CLI integration styles:
|
||||
|
||||
- ``add_config_arguments`` — generates one ``--flag`` per field.
|
||||
- ``apply_overrides`` — accepts a ``dict[str, str]`` of dotted-path
|
||||
overrides from a unified ``--config KEY=VALUE`` flag.
|
||||
"""
|
||||
|
||||
import configparser
|
||||
from argparse import ArgumentParser, Namespace
|
||||
from dataclasses import MISSING, fields
|
||||
from pathlib import Path
|
||||
from typing import Any, Type, TypeVar, get_type_hints
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _parse_bool(value: str) -> bool:
|
||||
"""Parse a boolean from INI/CLI string."""
|
||||
return value.lower() in ("true", "yes", "1", "on")
|
||||
|
||||
|
||||
_TYPE_MAP = {
|
||||
int: int,
|
||||
float: float,
|
||||
str: str,
|
||||
bool: _parse_bool,
|
||||
}
|
||||
|
||||
|
||||
def resolve_hints(cls: Type) -> dict[str, type]:
|
||||
"""Resolve type hints for a dataclass, handling both evaluated
|
||||
and string annotations.
|
||||
|
||||
:param cls: a dataclass class.
|
||||
:returns: dict mapping field names to resolved types.
|
||||
"""
|
||||
try:
|
||||
return get_type_hints(cls)
|
||||
except Exception:
|
||||
return {
|
||||
f.name: f.type if isinstance(f.type, type)
|
||||
else str
|
||||
for f in fields(cls)
|
||||
}
|
||||
|
||||
|
||||
def _section_name(cls: Type, section: str | None = None) -> str:
|
||||
"""Derive INI section name from class name if not provided."""
|
||||
if section is not None:
|
||||
return section
|
||||
name = cls.__name__
|
||||
if name.endswith("Config"):
|
||||
name = name[: -len("Config")]
|
||||
return name.lower()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# INI loading
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def load_ini(
|
||||
cls: Type[T],
|
||||
path: Path,
|
||||
section: str | None = None,
|
||||
) -> T:
|
||||
"""Load a config dataclass from an INI file.
|
||||
|
||||
If *section* is not given, the dataclass name (lowercased,
|
||||
without trailing "Config") is used.
|
||||
|
||||
Unknown keys in the INI file raise ValueError. Missing keys
|
||||
use the dataclass default.
|
||||
"""
|
||||
section = _section_name(cls, section)
|
||||
|
||||
parser = configparser.ConfigParser(
|
||||
comment_prefixes=("#", ";"),
|
||||
inline_comment_prefixes=("#", ";"),
|
||||
)
|
||||
parser.read(path)
|
||||
|
||||
if not parser.has_section(section):
|
||||
return cls() # type: ignore[call-arg]
|
||||
|
||||
hints = resolve_hints(cls)
|
||||
field_names = {f.name for f in fields(cls) if f.init}
|
||||
kwargs: dict[str, Any] = {}
|
||||
|
||||
for key, raw_value in parser.items(section):
|
||||
if key not in field_names:
|
||||
raise ValueError(
|
||||
f"Unknown config key '{key}' in"
|
||||
f" [{section}]. Valid keys:"
|
||||
f" {sorted(field_names)}"
|
||||
)
|
||||
|
||||
field_type = hints.get(key, str)
|
||||
coerce = _TYPE_MAP.get(field_type, field_type)
|
||||
kwargs[key] = coerce(raw_value)
|
||||
|
||||
return cls(**kwargs) # type: ignore[call-arg]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# INI writing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def format_section(cls: Type, section: str | None = None) -> str:
|
||||
"""Format a config dataclass as an INI section string.
|
||||
|
||||
Returns the section header and all fields with their defaults
|
||||
as commented key-value pairs.
|
||||
|
||||
:param cls: a dataclass class.
|
||||
:param section: section name (derived from class name if None).
|
||||
:returns: INI section string.
|
||||
"""
|
||||
section = _section_name(cls, section)
|
||||
hints = resolve_hints(cls)
|
||||
lines = [f"[{section}]"]
|
||||
|
||||
for f in fields(cls):
|
||||
if not f.init:
|
||||
continue
|
||||
field_type = hints.get(f.name, str)
|
||||
type_name = getattr(field_type, "__name__", str(field_type))
|
||||
|
||||
if f.default is not MISSING:
|
||||
default = f.default
|
||||
elif f.default_factory is not MISSING: # type: ignore[arg-type]
|
||||
default = f.default_factory() # type: ignore[misc]
|
||||
else:
|
||||
continue
|
||||
|
||||
lines.append(f"# {f.name} ({type_name})")
|
||||
lines.append(f"{f.name} = {default}")
|
||||
lines.append("")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def ensure_ini(
|
||||
cls: Type[T],
|
||||
path: Path,
|
||||
section: str | None = None,
|
||||
) -> T:
|
||||
"""Load config from INI, creating the file with defaults if
|
||||
it does not exist.
|
||||
|
||||
On first run, writes a commented INI file with all fields and
|
||||
their default values. On subsequent runs, reads the existing
|
||||
file. Never writes back CLI overrides.
|
||||
"""
|
||||
section = _section_name(cls, section)
|
||||
|
||||
if not path.exists():
|
||||
_write_default_ini(cls, path, section)
|
||||
|
||||
return load_ini(cls, path, section)
|
||||
|
||||
|
||||
def ensure_ini_multi(
|
||||
configs: list[tuple[Type, str | None]],
|
||||
path: Path,
|
||||
) -> None:
|
||||
"""Create an INI file with multiple sections if it does not exist.
|
||||
|
||||
Each entry is a (dataclass_cls, section_name) tuple. If
|
||||
section_name is None, it is derived from the class name.
|
||||
|
||||
Does not overwrite an existing file.
|
||||
|
||||
:param configs: list of (cls, section) tuples.
|
||||
:param path: path to the INI file.
|
||||
"""
|
||||
if path.exists():
|
||||
return
|
||||
|
||||
sections = [format_section(cls, section) for cls, section in configs]
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text("\n".join(sections) + "\n")
|
||||
|
||||
|
||||
def _write_default_ini(
|
||||
cls: Type,
|
||||
path: Path,
|
||||
section: str,
|
||||
) -> None:
|
||||
"""Write an INI file with all fields as commented defaults."""
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(format_section(cls, section) + "\n")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI: per-flag style (add_config_arguments / apply_cli_overrides)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def add_config_arguments(
|
||||
cls: Type[T],
|
||||
parser: ArgumentParser,
|
||||
prefix: str = "",
|
||||
) -> None:
|
||||
"""Add CLI arguments for each field in a config dataclass.
|
||||
|
||||
Field names are converted to CLI flags: ``heart_rate_resolution``
|
||||
becomes ``--heart-rate-resolution`` (or ``--<prefix>-heart-rate-resolution``
|
||||
if a prefix is given).
|
||||
"""
|
||||
hints = resolve_hints(cls)
|
||||
|
||||
for f in fields(cls):
|
||||
if not f.init:
|
||||
continue
|
||||
flag_name = f.name.replace("_", "-")
|
||||
if prefix:
|
||||
flag_name = f"{prefix}-{flag_name}"
|
||||
|
||||
field_type = hints.get(f.name, str)
|
||||
|
||||
kwargs: dict[str, Any] = {
|
||||
"dest": f.name,
|
||||
}
|
||||
|
||||
if field_type is bool:
|
||||
kwargs["action"] = (
|
||||
"store_false"
|
||||
if f.default is True
|
||||
else "store_true"
|
||||
)
|
||||
kwargs["default"] = None
|
||||
else:
|
||||
kwargs["type"] = _TYPE_MAP.get(
|
||||
field_type, field_type
|
||||
)
|
||||
kwargs["default"] = None
|
||||
kwargs["metavar"] = field_type.__name__.upper()
|
||||
|
||||
parser.add_argument(f"--{flag_name}", **kwargs)
|
||||
|
||||
|
||||
def apply_cli_overrides(
|
||||
config: T,
|
||||
args: Namespace,
|
||||
) -> T:
|
||||
"""Apply CLI argument values to a config instance.
|
||||
|
||||
Only overrides fields that were explicitly set on the command
|
||||
line (not None). Returns a new instance.
|
||||
"""
|
||||
overrides = {}
|
||||
for f in fields(config): # type: ignore[arg-type]
|
||||
if not f.init:
|
||||
continue
|
||||
cli_value = getattr(args, f.name, None)
|
||||
if cli_value is not None:
|
||||
overrides[f.name] = cli_value
|
||||
|
||||
if not overrides:
|
||||
return config
|
||||
|
||||
from dataclasses import asdict
|
||||
merged = asdict(config) # type: ignore[arg-type]
|
||||
merged.update(overrides)
|
||||
return type(config)(**merged) # type: ignore[return-value]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI: dotted-path style (apply_overrides)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def apply_overrides(
|
||||
config: T,
|
||||
overrides: dict[str, str],
|
||||
prefix: str = "",
|
||||
) -> T:
|
||||
"""Apply dotted-path string overrides to a config dataclass.
|
||||
|
||||
Used with a unified ``--config KEY=VALUE`` CLI flag. Each key
|
||||
is a dotted path relative to the prefix.
|
||||
|
||||
Example::
|
||||
|
||||
overrides = {
|
||||
"provider.base_url": "http://localhost:4000",
|
||||
"provider.model": "qwen2.5:7b",
|
||||
}
|
||||
config = apply_overrides(config, overrides, prefix="provider")
|
||||
# config.base_url == "http://localhost:4000"
|
||||
# config.model == "qwen2.5:7b"
|
||||
|
||||
:param config: a dataclass instance.
|
||||
:param overrides: dict of dotted keys to string values.
|
||||
:param prefix: only apply keys starting with this prefix.
|
||||
:returns: new config instance with overrides applied.
|
||||
"""
|
||||
hints = resolve_hints(type(config))
|
||||
kwargs: dict[str, Any] = {}
|
||||
changed = False
|
||||
|
||||
for f in fields(config):
|
||||
if not f.init:
|
||||
continue
|
||||
full_key = f"{prefix}.{f.name}" if prefix else f.name
|
||||
if full_key in overrides:
|
||||
raw = overrides[full_key]
|
||||
field_type = hints.get(f.name, str)
|
||||
coerce = _TYPE_MAP.get(field_type, field_type)
|
||||
kwargs[f.name] = coerce(raw)
|
||||
changed = True
|
||||
else:
|
||||
kwargs[f.name] = getattr(config, f.name)
|
||||
|
||||
if not changed:
|
||||
return config
|
||||
|
||||
return type(config)(**kwargs) # type: ignore[return-value]
|
||||
|
||||
|
||||
def format_help(cls: Type, prefix: str = "") -> list[str]:
|
||||
"""Generate help lines for a config dataclass.
|
||||
|
||||
Each line shows the dotted key path, type, and default value.
|
||||
Suitable for CLI epilog text.
|
||||
|
||||
:param cls: a dataclass class.
|
||||
:param prefix: prepended to each key path.
|
||||
:returns: list of formatted help strings.
|
||||
"""
|
||||
hints = resolve_hints(cls)
|
||||
lines = []
|
||||
|
||||
for f in fields(cls):
|
||||
if not f.init:
|
||||
continue
|
||||
field_type = hints.get(f.name, str)
|
||||
type_name = getattr(field_type, "__name__", str(field_type))
|
||||
key = f"{prefix}.{f.name}" if prefix else f.name
|
||||
|
||||
if f.default is not MISSING:
|
||||
default = f.default
|
||||
elif f.default_factory is not MISSING: # type: ignore[arg-type]
|
||||
default = repr(f.default_factory()) # type: ignore[misc]
|
||||
else:
|
||||
default = "(required)"
|
||||
|
||||
lines.append(f" {key} ({type_name}, default: {default})")
|
||||
|
||||
return lines
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Backwards compat
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# keep the old private name working for existing callers
|
||||
_resolve_hints = resolve_hints
|
||||
|
|
@ -1,201 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Generic HTTP client.
|
||||
|
||||
Thin urllib wrapper with retry-on-rate-limit. No domain knowledge —
|
||||
GitHub, Bitbucket, etc. are handled by higher-level modules.
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
import http.cookiejar
|
||||
import json
|
||||
import time
|
||||
from typing import Any, Dict, Optional
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
from warnings import warn
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class HttpResponse:
|
||||
status_code: int
|
||||
headers: dict[str, str]
|
||||
data: bytes
|
||||
reason: Optional[str] = None
|
||||
|
||||
def json(self):
|
||||
return json.loads(self.data.decode("utf-8"))
|
||||
|
||||
@property
|
||||
def text(self) -> str:
|
||||
return self.data.decode("utf-8", errors="replace")
|
||||
|
||||
|
||||
class HttpSession:
|
||||
"""HTTP client that persists cookies across requests.
|
||||
|
||||
Suitable for sites that require login followed by
|
||||
cookie-authenticated page fetches.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
default_headers: dict[str, str] | None = None,
|
||||
timeout: int = 30,
|
||||
) -> None:
|
||||
self._timeout = timeout
|
||||
self._default_headers = default_headers or {}
|
||||
self._jar = http.cookiejar.CookieJar()
|
||||
self._opener = urllib.request.build_opener(
|
||||
urllib.request.HTTPCookieProcessor(self._jar),
|
||||
)
|
||||
|
||||
def get(
|
||||
self,
|
||||
url: str,
|
||||
params: dict[str, str] | None = None,
|
||||
headers: dict[str, str] | None = None,
|
||||
) -> HttpResponse:
|
||||
if params:
|
||||
query = urllib.parse.urlencode(params)
|
||||
url = f"{url}?{query}"
|
||||
|
||||
req = urllib.request.Request(
|
||||
url,
|
||||
headers=self._merged_headers(headers),
|
||||
method="GET",
|
||||
)
|
||||
return self._send(req)
|
||||
|
||||
def post(
|
||||
self,
|
||||
url: str,
|
||||
data: dict[str, str] | None = None,
|
||||
headers: dict[str, str] | None = None,
|
||||
) -> HttpResponse:
|
||||
body = (
|
||||
urllib.parse.urlencode(data).encode()
|
||||
if data else None
|
||||
)
|
||||
|
||||
merged = self._merged_headers(headers)
|
||||
if data and "Content-Type" not in merged:
|
||||
merged["Content-Type"] = (
|
||||
"application/x-www-form-urlencoded"
|
||||
)
|
||||
|
||||
req = urllib.request.Request(
|
||||
url,
|
||||
data=body,
|
||||
headers=merged,
|
||||
method="POST",
|
||||
)
|
||||
return self._send(req)
|
||||
|
||||
def _send(self, req: urllib.request.Request) -> HttpResponse:
|
||||
try:
|
||||
with self._opener.open(
|
||||
req, timeout=self._timeout
|
||||
) as resp:
|
||||
return HttpResponse(
|
||||
status_code=resp.getcode(),
|
||||
headers=dict(resp.getheaders()),
|
||||
data=resp.read(),
|
||||
)
|
||||
except urllib.error.HTTPError as e:
|
||||
return HttpResponse(
|
||||
status_code=e.code,
|
||||
headers=dict(e.headers.items()),
|
||||
data=e.read(),
|
||||
)
|
||||
|
||||
def _merged_headers(
|
||||
self, extra: dict[str, str] | None
|
||||
) -> dict[str, str]:
|
||||
merged = dict(self._default_headers)
|
||||
if extra:
|
||||
merged.update(extra)
|
||||
return merged
|
||||
|
||||
|
||||
def _request(
|
||||
url: str,
|
||||
method: str = "GET",
|
||||
params: Optional[Dict[str, Any]] = None,
|
||||
headers: Optional[Dict[str, str]] = None,
|
||||
data: Optional[bytes] = None,
|
||||
) -> HttpResponse:
|
||||
# TODO: do proper exponential backoff
|
||||
backoff = [1, 2, 4]
|
||||
|
||||
if params:
|
||||
query = urllib.parse.urlencode(params)
|
||||
url = f"{url}?{query}"
|
||||
|
||||
req = urllib.request.Request(
|
||||
url,
|
||||
headers=headers or {},
|
||||
method=method,
|
||||
data=data,
|
||||
)
|
||||
|
||||
for delay in backoff:
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
status = resp.getcode()
|
||||
resp_data = resp.read()
|
||||
resp_headers = dict(resp.getheaders())
|
||||
|
||||
if status == 429:
|
||||
warn(f"Rate-limited on {url} (HTTP {status})."
|
||||
f" Backing off {delay}s...")
|
||||
time.sleep(delay)
|
||||
continue
|
||||
|
||||
return HttpResponse(
|
||||
status, resp_headers, resp_data, resp.reason,
|
||||
)
|
||||
|
||||
except urllib.error.HTTPError as e:
|
||||
status = e.code
|
||||
err_data = e.read()
|
||||
err_headers = dict(e.headers.items())
|
||||
if status == 429:
|
||||
warn(f"Rate-limited on {url} (HTTP {status})."
|
||||
f" Backing off {delay}s...")
|
||||
time.sleep(delay)
|
||||
continue
|
||||
return HttpResponse(
|
||||
status, err_headers, err_data, e.reason,
|
||||
)
|
||||
|
||||
except urllib.error.URLError as e:
|
||||
raise Exception(
|
||||
"Network error on %s: %s", url, e,
|
||||
) from e
|
||||
|
||||
# If all retries exhausted, return last error-like response
|
||||
return HttpResponse(503, {}, b"", "Service unavailable")
|
||||
|
||||
|
||||
def get(
|
||||
url: str,
|
||||
params: Optional[Dict[str, Any]] = None,
|
||||
headers: Optional[Dict[str, str]] = None,
|
||||
) -> HttpResponse:
|
||||
return _request(url, method="GET", params=params, headers=headers)
|
||||
|
||||
|
||||
def post(
|
||||
url: str,
|
||||
data: Optional[bytes] = None,
|
||||
headers: Optional[Dict[str, str]] = None,
|
||||
) -> HttpResponse:
|
||||
return _request(url, method="POST", headers=headers, data=data)
|
||||
|
||||
|
||||
def put(
|
||||
url: str,
|
||||
data: Optional[bytes] = None,
|
||||
headers: Optional[Dict[str, str]] = None,
|
||||
) -> HttpResponse:
|
||||
return _request(url, method="PUT", headers=headers, data=data)
|
||||
|
|
@ -1,78 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Bitbucket Cloud REST API v2.0 wrapper.
|
||||
|
||||
Thin layer over http.py for Bitbucket-specific operations:
|
||||
|
||||
- Bearer token authentication
|
||||
- Repository existence checks
|
||||
- Repository creation within a workspace/project
|
||||
"""
|
||||
|
||||
import json
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from byteb4rb1e.utils.http import client as http_client
|
||||
|
||||
|
||||
BITBUCKET_API = "https://api.bitbucket.org/2.0"
|
||||
|
||||
|
||||
def http_headers(token: str) -> Dict[str, str]:
|
||||
"""Construct Bitbucket API headers with Bearer token auth."""
|
||||
return {
|
||||
"Authorization": f"Bearer {token}",
|
||||
"Accept": "application/json",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
|
||||
def repository_exists(
|
||||
workspace: str,
|
||||
repo_slug: str,
|
||||
token: str,
|
||||
) -> bool:
|
||||
"""Check whether a repository exists in the workspace."""
|
||||
url = f"{BITBUCKET_API}/repositories/{workspace}/{repo_slug}"
|
||||
resp = http_client.get(url, headers=http_headers(token))
|
||||
return resp.status_code == 200
|
||||
|
||||
|
||||
def create_repository(
|
||||
workspace: str,
|
||||
repo_slug: str,
|
||||
token: str,
|
||||
project: Optional[str] = None,
|
||||
description: str = "",
|
||||
is_private: bool = True,
|
||||
) -> http_client.HttpResponse:
|
||||
"""Create a new repository in the workspace.
|
||||
|
||||
When *project* is given the repository is assigned to that
|
||||
Bitbucket project (by key). This is required for workspaces
|
||||
that scope access keys at the project level.
|
||||
|
||||
Returns the API response. Caller should check status_code == 200
|
||||
for success.
|
||||
"""
|
||||
url = f"{BITBUCKET_API}/repositories/{workspace}/{repo_slug}"
|
||||
body: Dict[str, Any] = {
|
||||
"scm": "git",
|
||||
"is_private": is_private,
|
||||
"description": description,
|
||||
"fork_policy": "no_forks",
|
||||
}
|
||||
if project:
|
||||
body["project"] = {"key": project}
|
||||
return http_client.put(
|
||||
url,
|
||||
data=json.dumps(body).encode("utf-8"),
|
||||
headers=http_headers(token),
|
||||
)
|
||||
|
||||
|
||||
def clone_url(
|
||||
workspace: str,
|
||||
repo_slug: str,
|
||||
) -> str:
|
||||
"""Return the SSH clone URL for a Bitbucket repository."""
|
||||
return f"git@bitbucket.org:{workspace}/{repo_slug}.git"
|
||||
|
|
@ -1,98 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Forgejo REST API v1 wrapper.
|
||||
|
||||
Thin layer over http.py for Forgejo-specific operations:
|
||||
|
||||
- Token authentication
|
||||
- Repository existence checks
|
||||
- Repository creation under the authenticated user or an organization
|
||||
- SSH and HTTPS clone URL construction
|
||||
|
||||
Unlike Bitbucket (one global SaaS instance), Forgejo is self-hosted,
|
||||
so every operation takes a *host* parameter instead of baking any
|
||||
specific instance in.
|
||||
"""
|
||||
|
||||
import json
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from byteb4rb1e.utils.http import client as http_client
|
||||
|
||||
|
||||
def api_url(host: str) -> str:
|
||||
"""Return the API base URL for a Forgejo instance."""
|
||||
return f"https://{host}/api/v1"
|
||||
|
||||
|
||||
def http_headers(token: str) -> Dict[str, str]:
|
||||
"""Construct Forgejo API headers with token auth."""
|
||||
return {
|
||||
"Authorization": f"token {token}",
|
||||
"Accept": "application/json",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
|
||||
def repository_exists(
|
||||
host: str,
|
||||
owner: str,
|
||||
repo_slug: str,
|
||||
token: str,
|
||||
) -> bool:
|
||||
"""Check whether a repository exists under the owner."""
|
||||
url = f"{api_url(host)}/repos/{owner}/{repo_slug}"
|
||||
resp = http_client.get(url, headers=http_headers(token))
|
||||
return bool(resp.status_code == 200)
|
||||
|
||||
|
||||
def create_repository(
|
||||
host: str,
|
||||
repo_slug: str,
|
||||
token: str,
|
||||
org: Optional[str] = None,
|
||||
description: str = "",
|
||||
is_private: bool = True,
|
||||
) -> http_client.HttpResponse:
|
||||
"""Create a new repository on the Forgejo instance.
|
||||
|
||||
When *org* is given the repository is created in that
|
||||
organization, otherwise under the authenticated user.
|
||||
|
||||
Returns the API response. Caller should check status_code == 201
|
||||
for success.
|
||||
"""
|
||||
if org:
|
||||
url = f"{api_url(host)}/orgs/{org}/repos"
|
||||
else:
|
||||
url = f"{api_url(host)}/user/repos"
|
||||
body: Dict[str, Any] = {
|
||||
"name": repo_slug,
|
||||
"private": is_private,
|
||||
"description": description,
|
||||
}
|
||||
return http_client.post(
|
||||
url,
|
||||
data=json.dumps(body).encode("utf-8"),
|
||||
headers=http_headers(token),
|
||||
)
|
||||
|
||||
|
||||
def ssh_clone_url(
|
||||
host: str,
|
||||
owner: str,
|
||||
repo_slug: str,
|
||||
) -> str:
|
||||
"""Return the SSH clone URL for a Forgejo repository."""
|
||||
return f"git@{host}:{owner}/{repo_slug}.git"
|
||||
|
||||
|
||||
def https_clone_url(
|
||||
host: str,
|
||||
owner: str,
|
||||
repo_slug: str,
|
||||
) -> str:
|
||||
"""Return the HTTPS clone URL for a Forgejo repository.
|
||||
|
||||
Preferred in CI environments without SSH host keys.
|
||||
"""
|
||||
return f"https://{host}/{owner}/{repo_slug}.git"
|
||||
|
|
@ -1,65 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
import hashlib
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from byteb4rb1e.utils.http import client as http_client
|
||||
|
||||
|
||||
GITHUB_API = "https://api.github.com"
|
||||
|
||||
|
||||
def http_headers(token: Optional[str]) -> Dict[str, str]:
|
||||
headers = {
|
||||
"Accept": "application/vnd.github+json",
|
||||
"User-Agent": "sphinx-h5p-worker1"
|
||||
}
|
||||
if token:
|
||||
# Use standard PAT header; token not logged anywhere.
|
||||
headers["Authorization"] = f"Bearer {token}"
|
||||
return headers
|
||||
|
||||
|
||||
def blob_sha(path: Path) -> str:
|
||||
"""Calculate Git blob SHA-1 for a file, matching GitHub API 'sha'."""
|
||||
data = path.read_bytes()
|
||||
header = f"blob {len(data)}\0".encode("utf-8")
|
||||
store = header + data
|
||||
return hashlib.sha1(store).hexdigest()
|
||||
|
||||
|
||||
def list_org_repos(org: str, token: Optional[str]) -> List[Dict[str, Any]]:
|
||||
repos: List[Dict[str, Any]] = []
|
||||
page = 1
|
||||
per_page = 100
|
||||
while True:
|
||||
url = f"{GITHUB_API}/orgs/{org}/repos"
|
||||
resp = http_client.get(
|
||||
url,
|
||||
params={"page": page, "per_page": per_page, "type": "public"},
|
||||
headers=http_headers(token),
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
raise RuntimeError(f"Failed to list repos for org {org}: {resp.status_code} {resp.text}")
|
||||
batch = resp.json()
|
||||
if not batch:
|
||||
break
|
||||
repos.extend(batch)
|
||||
page += 1
|
||||
return repos
|
||||
|
||||
|
||||
def fetch_file(
|
||||
org: str,
|
||||
repo: str,
|
||||
path: str,
|
||||
token: str
|
||||
) -> http_client.HttpResponse:
|
||||
"""
|
||||
"""
|
||||
url = f"{GITHUB_API}/repos/{org}/{repo}/{path}"
|
||||
|
||||
return http_client.get(
|
||||
url,
|
||||
headers=http_headers(token),
|
||||
)
|
||||
|
|
@ -1,91 +0,0 @@
|
|||
from typing import Optional
|
||||
|
||||
|
||||
class RollingHash:
|
||||
"""implementation of Rabin-Karp rolling hash
|
||||
"""
|
||||
#: default base
|
||||
base: int = 31
|
||||
#: default modulus
|
||||
mod: int = 10**9 + 7
|
||||
#: current computed hash
|
||||
_hash: int
|
||||
#: prime number base (e.g., 31)
|
||||
_base: int
|
||||
#: large prime modulus (to prevent overflow)
|
||||
_mod: int
|
||||
# Precomputation of ``base^(length-1) % mod`` for removing the old byte when
|
||||
# rolling over
|
||||
_hbase_factor: int
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
data: bytes,
|
||||
base: Optional[int] = None,
|
||||
mod: Optional[int] = None
|
||||
):
|
||||
"""Initialize the rolling hash with a given base and modulus.
|
||||
|
||||
base: Prime number base (e.g., 31)
|
||||
mod: Large prime modulus to prevent overflow
|
||||
length: Length of the pattern to match
|
||||
"""
|
||||
self._base = base if base else RollingHash.base
|
||||
|
||||
self._mod = mod if mod else RollingHash.mod
|
||||
|
||||
self._hash = RollingHash.compute_initial_hash(
|
||||
data,
|
||||
self._base,
|
||||
self._mod
|
||||
)
|
||||
|
||||
self._hbase_factor = pow(self._base, len(data) - 1, self._mod)
|
||||
|
||||
@staticmethod
|
||||
def compute_initial_hash(
|
||||
data: bytes,
|
||||
base: int,
|
||||
mod: int,
|
||||
) -> int:
|
||||
"""Compute the hash for the initial window (first `length` bytes).
|
||||
|
||||
rather use this standalone for computing the hash of the search pattern,
|
||||
to avoid the overhead of instantiating an object.
|
||||
|
||||
:param data: data to build hash for
|
||||
:param base:
|
||||
:param: mod:
|
||||
|
||||
:returns: hash of data
|
||||
"""
|
||||
hash_ = 0
|
||||
for i in range(len(data)):
|
||||
# computing the modulus at each iteration, as to avoid the summed
|
||||
# integer to be chunky, as in HUUUUGEE...
|
||||
hash_ = (hash_ * base + data[i]) % mod
|
||||
return hash_
|
||||
|
||||
def roll(self, old_byte: int, new_byte: int) -> int:
|
||||
"""Efficiently update hash by removing ``old_byte`` and adding
|
||||
``new_byte``
|
||||
|
||||
The old_byte removal uses a pre-computed value of the highest base used
|
||||
in the polynomial calculation. This speeds things up a bit.
|
||||
|
||||
I was thinking about a way on how to store the old_byte efficiently
|
||||
within the class object, but that would require storing the entire data,
|
||||
basically doubling the memory consumption as the data must definetly
|
||||
also live outside of the class object. A memoryview could solve this
|
||||
problem, but at the cost of making the implementation more complex, so
|
||||
this will have to do.
|
||||
|
||||
:param old_byte: The ordinal of the first byte in buffer to roll over
|
||||
:param new_byte: The ordinal of the byte newly appended to the buffer
|
||||
"""
|
||||
# Remove old
|
||||
self._hash = (self._hash - old_byte * self._hbase_factor) % self.mod
|
||||
# Add new
|
||||
self._hash = (self._hash * self.base + new_byte) % self.mod
|
||||
|
||||
return self._hash
|
||||
|
|
@ -1,41 +0,0 @@
|
|||
import email
|
||||
import importlib.resources
|
||||
import mimetypes
|
||||
from urllib.request import URLError
|
||||
import urllib.request
|
||||
|
||||
|
||||
class PkgHandler(urllib.request.BaseHandler):
|
||||
"""
|
||||
"""
|
||||
def pkg_open(self, req) -> urllib.request.addinfourl:
|
||||
pkg_files = importlib.resources.files(req.host)
|
||||
|
||||
try:
|
||||
fh = next(
|
||||
pkg_files.glob(req.selector.lstrip('//'))
|
||||
).open('rb')
|
||||
except Exception as e:
|
||||
raise URLError(f'{e.__class__.__name__}: {e}') from e
|
||||
|
||||
fh.seek(0, 2);
|
||||
size = fh.tell();
|
||||
fh.seek(0);
|
||||
|
||||
mtype, compression = mimetypes.guess_type(req.selector)
|
||||
|
||||
if compression and mtype:
|
||||
mtype = f"{mtype}+{compression}"
|
||||
|
||||
headers = email.message_from_string(
|
||||
'Content-Type: %s\nContent-Length: %d\n' %
|
||||
(mtype or 'text/plain', size)
|
||||
)
|
||||
|
||||
if not mtype or mtype.startswith('text/'):
|
||||
fh.close()
|
||||
fh = next(
|
||||
pkg_files.glob(req.selector.lstrip('//'))
|
||||
).open('r')
|
||||
|
||||
return urllib.request.addinfourl(fh, headers, None)
|
||||
|
|
@ -1,331 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Git subprocess wrapper for repository operations.
|
||||
|
||||
Provides primitives for mirror cloning, syncing, remote management,
|
||||
file extraction from bare repos, and submodule management.
|
||||
No pygit2 or gitpython, uses subprocess only.
|
||||
"""
|
||||
import logging
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class GitError(Exception):
|
||||
"""A git subprocess returned a non-zero exit code."""
|
||||
|
||||
def __init__(self, args: List[str], returncode: int, stderr: str):
|
||||
self.args_list = args
|
||||
self.returncode = returncode
|
||||
self.stderr = stderr
|
||||
super().__init__(
|
||||
f"git exited {returncode}: {' '.join(args)}\n{stderr}"
|
||||
)
|
||||
|
||||
|
||||
def parse_base_url(base_url: str) -> str:
|
||||
"""Extract the workspace from an SCP-style base URL.
|
||||
|
||||
Accepts any host (Bitbucket, Forgejo, GitHub, ...) as long as
|
||||
the URL is SCP-style::
|
||||
|
||||
git@bitbucket.org:byteb4rb1e/foo.git → byteb4rb1e
|
||||
git@git.code.tiararodney.com:h5p-mirror/foo.git → h5p-mirror
|
||||
"""
|
||||
# SCP-style: git@host:workspace/repo
|
||||
if ":" not in base_url or "//" in base_url:
|
||||
raise ValueError(
|
||||
f"Expected SCP-style URL (git@host:workspace), "
|
||||
f"got: {base_url}"
|
||||
)
|
||||
_, workspace = base_url.split(":", 1)
|
||||
return str(Path(workspace).parent)
|
||||
|
||||
|
||||
def parse_repo_name(base_url: str) -> str:
|
||||
"""Extract the repository name from an SCP-style base URL.
|
||||
|
||||
Accepts any host (Bitbucket, Forgejo, GitHub, ...) as long as
|
||||
the URL is SCP-style::
|
||||
|
||||
git@bitbucket.org:byteb4rb1e/foo.git → foo
|
||||
git@git.code.tiararodney.com:h5p-mirror/foo.git → foo
|
||||
"""
|
||||
# SCP-style: git@host:workspace/repo
|
||||
if ":" not in base_url or "//" in base_url:
|
||||
raise ValueError(
|
||||
f"Expected SCP-style URL (git@host:workspace), "
|
||||
f"got: {base_url}"
|
||||
)
|
||||
_, workspace = base_url.split(":", 1)
|
||||
return Path(workspace).name.split('.')[0]
|
||||
|
||||
|
||||
|
||||
def _run(
|
||||
args: List[str],
|
||||
cwd: Optional[Path] = None,
|
||||
capture_stdout: bool = False,
|
||||
) -> subprocess.CompletedProcess: # type: ignore[type-arg]
|
||||
"""Run a git command, raising GitError on failure."""
|
||||
cmd = ["git"] + args
|
||||
logger.debug("$ %s", " ".join(cmd))
|
||||
result = subprocess.run(
|
||||
cmd,
|
||||
cwd=cwd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
raise GitError(cmd, result.returncode, result.stderr.strip())
|
||||
return result
|
||||
|
||||
|
||||
def mirror_clone(source_url: str, dest: Path) -> None:
|
||||
"""Clone a repository as a bare mirror.
|
||||
|
||||
Equivalent to ``git clone --mirror <source_url> <dest>``.
|
||||
The destination directory must not already exist.
|
||||
"""
|
||||
_run(["clone", "--mirror", source_url, str(dest)])
|
||||
logger.info("Cloned mirror %s → %s", source_url, dest)
|
||||
|
||||
|
||||
def add_remote(repo: Path, name: str, url: str) -> None:
|
||||
"""Add a named remote to a bare repository."""
|
||||
_run(["remote", "add", name, url], cwd=repo)
|
||||
logger.debug("Added remote %s → %s in %s", name, url, repo)
|
||||
|
||||
|
||||
def has_remote(repo: Path, name: str) -> bool:
|
||||
"""Check whether a named remote exists."""
|
||||
result = _run(["remote"], cwd=repo)
|
||||
return name in result.stdout.splitlines()
|
||||
|
||||
|
||||
def mirror_update(repo: Path) -> None:
|
||||
"""Fetch all remotes in a bare mirror repository.
|
||||
|
||||
Equivalent to ``git remote update`` inside the bare repo.
|
||||
"""
|
||||
_run(["remote", "update"], cwd=repo)
|
||||
logger.debug("Updated remotes in %s", repo)
|
||||
|
||||
|
||||
def fetch(repo: Path, remote: str = "origin") -> None:
|
||||
"""Fetch from a single remote."""
|
||||
_run(["fetch", remote], cwd=repo)
|
||||
logger.debug("fetched %s in %s", remote, repo)
|
||||
|
||||
|
||||
def show_ref(repo: Path) -> str:
|
||||
"""Return the raw output of ``git show-ref`` (all refs + SHAs).
|
||||
|
||||
Returns an empty string if the repo has no refs.
|
||||
"""
|
||||
try:
|
||||
result = _run(["show-ref"], cwd=repo)
|
||||
return result.stdout
|
||||
except GitError:
|
||||
return ""
|
||||
|
||||
|
||||
def ls_remote(repo: Path, remote: str) -> str:
|
||||
"""Return the raw output of ``git ls-remote <remote>``.
|
||||
|
||||
Returns an empty string if the remote has no refs or on error.
|
||||
"""
|
||||
try:
|
||||
result = _run(["ls-remote", remote], cwd=repo)
|
||||
return result.stdout
|
||||
except GitError:
|
||||
return ""
|
||||
|
||||
|
||||
def mirror_push(repo: Path, remote: str) -> None:
|
||||
"""Push the full mirror to a remote.
|
||||
|
||||
Equivalent to ``git push --mirror <remote>``.
|
||||
"""
|
||||
_run(["push", "--mirror", remote], cwd=repo)
|
||||
logger.info("Pushed mirror to %s from %s", remote, repo)
|
||||
|
||||
|
||||
def read_file(
|
||||
repo: Path,
|
||||
filepath: str,
|
||||
ref: str = "HEAD",
|
||||
) -> Optional[str]:
|
||||
"""Extract a file's contents from a bare repo without checkout.
|
||||
|
||||
Returns the file content as a string, or None if the file does
|
||||
not exist at the given ref.
|
||||
"""
|
||||
try:
|
||||
result = _run(
|
||||
["show", f"{ref}:{filepath}"],
|
||||
cwd=repo,
|
||||
capture_stdout=True,
|
||||
)
|
||||
return result.stdout
|
||||
except GitError:
|
||||
return None
|
||||
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# Ref / tag primitives
|
||||
# -------------------------------------------------------------------
|
||||
|
||||
def list_tags(repo: Path) -> List[str]:
|
||||
"""List all tags in a repository."""
|
||||
result = _run(["tag", "-l"], cwd=repo)
|
||||
return [t for t in result.stdout.splitlines() if t]
|
||||
|
||||
|
||||
def resolve_ref(repo: Path, ref: str) -> str:
|
||||
"""Resolve a ref to a full SHA.
|
||||
|
||||
Raises GitError if the ref cannot be resolved.
|
||||
"""
|
||||
result = _run(
|
||||
["rev-parse", ref], cwd=repo, capture_stdout=True,
|
||||
)
|
||||
return result.stdout.strip()
|
||||
|
||||
|
||||
def head_ref(repo: Path) -> str:
|
||||
"""Return the full SHA of HEAD."""
|
||||
return resolve_ref(repo, "HEAD")
|
||||
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# Pull-through bare clone cache
|
||||
# -------------------------------------------------------------------
|
||||
|
||||
def bare_path_for_url(url: str, cache_dir: Path) -> Path:
|
||||
"""Derive a cache path from a clone URL.
|
||||
|
||||
Strips scheme/host, keeps the path component, appends ``.git``.
|
||||
|
||||
Examples::
|
||||
|
||||
https://github.com/h5p/h5p-multi-choice
|
||||
→ cache_dir / h5p / h5p-multi-choice.git
|
||||
git@github.com:h5p/h5p-multi-choice.git
|
||||
→ cache_dir / h5p / h5p-multi-choice.git
|
||||
"""
|
||||
# Handle SCP-style URLs (git@host:path)
|
||||
if ":" in url and "//" not in url:
|
||||
path_part = url.split(":", 1)[1]
|
||||
else:
|
||||
# Strip scheme + host
|
||||
from urllib.parse import urlparse
|
||||
parsed = urlparse(url)
|
||||
path_part = parsed.path.lstrip("/")
|
||||
|
||||
# Strip trailing .git if present, then re-add it
|
||||
if path_part.endswith(".git"):
|
||||
path_part = path_part[:-4]
|
||||
|
||||
return cache_dir / (path_part + ".git")
|
||||
|
||||
|
||||
def ensure_bare_clone(url: str, cache_dir: Path) -> Path:
|
||||
"""Ensure a bare mirror clone exists in *cache_dir*.
|
||||
|
||||
If the bare repo already exists, fetches updates via
|
||||
``mirror_update``. Otherwise, creates a new mirror clone.
|
||||
Returns the path to the bare repo.
|
||||
"""
|
||||
bare_path = bare_path_for_url(url, cache_dir)
|
||||
if bare_path.exists():
|
||||
mirror_update(bare_path)
|
||||
logger.debug("Updated existing cache %s", bare_path)
|
||||
else:
|
||||
bare_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
mirror_clone(url, bare_path)
|
||||
logger.info("Cached new bare clone %s", bare_path)
|
||||
return bare_path
|
||||
|
||||
|
||||
# -------------------------------------------------------------------
|
||||
# Submodule operations
|
||||
# -------------------------------------------------------------------
|
||||
|
||||
def has_submodule(repo: Path, path: str) -> bool:
|
||||
"""Check whether a submodule is registered at *path*.
|
||||
|
||||
Reads ``.gitmodules`` to determine whether the submodule exists.
|
||||
*path* is resolved relative to *repo*, then compared against
|
||||
the repository root so the check works when *repo* is a
|
||||
subdirectory of the actual git working tree.
|
||||
Returns False if ``.gitmodules`` does not exist.
|
||||
"""
|
||||
try:
|
||||
toplevel = Path(
|
||||
_run(
|
||||
["rev-parse", "--show-toplevel"], cwd=repo,
|
||||
).stdout.strip()
|
||||
)
|
||||
except GitError:
|
||||
return False
|
||||
gitmodules = toplevel / ".gitmodules"
|
||||
if not gitmodules.is_file():
|
||||
return False
|
||||
# Resolve the full path relative to the repo root
|
||||
full_path = (repo / path).resolve()
|
||||
try:
|
||||
rel_path = str(full_path.relative_to(toplevel.resolve()))
|
||||
except ValueError:
|
||||
return False
|
||||
try:
|
||||
result = _run(
|
||||
["config", "--file", str(gitmodules),
|
||||
"--get-regexp", r"submodule\..*\.path"],
|
||||
cwd=toplevel,
|
||||
)
|
||||
except GitError:
|
||||
return False
|
||||
for line in result.stdout.splitlines():
|
||||
parts = line.split(None, 1)
|
||||
if len(parts) == 2 and parts[1] == rel_path:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def submodule_add(repo: Path, url: str, path: str) -> None:
|
||||
"""Add a git submodule at *path* pointing to *url*.
|
||||
|
||||
Equivalent to ``git submodule add <url> <path>`` inside *repo*.
|
||||
"""
|
||||
_run(["submodule", "add", url, path], cwd=repo)
|
||||
logger.info("Added submodule %s → %s", url, path)
|
||||
|
||||
|
||||
def submodule_update(repo: Path, path: str) -> None:
|
||||
"""Fetch and update a submodule to the latest remote HEAD.
|
||||
|
||||
Enters the submodule directory, fetches origin, and checks out
|
||||
the latest commit on the remote default branch.
|
||||
"""
|
||||
sub_path = repo / path
|
||||
_run(["fetch", "origin"], cwd=sub_path)
|
||||
# Determine default branch from remote HEAD
|
||||
result = _run(
|
||||
["symbolic-ref", "refs/remotes/origin/HEAD",
|
||||
"--short"],
|
||||
cwd=sub_path,
|
||||
)
|
||||
default_branch = result.stdout.strip()
|
||||
_run(["checkout", default_branch], cwd=sub_path)
|
||||
logger.info("Updated submodule %s to %s", path, default_branch)
|
||||
|
||||
|
||||
def submodule_checkout(repo: Path, path: str, ref: str) -> None:
|
||||
"""Fetch and checkout a specific ref in a submodule."""
|
||||
sub_path = repo / path
|
||||
_run(["fetch", "origin"], cwd=sub_path)
|
||||
_run(["checkout", ref], cwd=sub_path)
|
||||
logger.info("Checked out submodule %s at %s", path, ref)
|
||||
|
|
@ -1,7 +1,7 @@
|
|||
from dataclasses import dataclass
|
||||
from http.server import SimpleHTTPRequestHandler
|
||||
|
||||
from byteb4rb1e.utils.io import ChunksIO
|
||||
from byteb4rb1e_utils.io import ChunksIO
|
||||
|
||||
|
||||
@dataclass
|
||||
|
|
@ -8,12 +8,12 @@ from http.server import HTTPServer
|
|||
from io import BytesIO, IOBase
|
||||
from typing import Optional, Tuple, List
|
||||
|
||||
from byteb4rb1e.utils.http.server import (
|
||||
from byteb4rb1e_utils.http.server import (
|
||||
HandlerOptions,
|
||||
MultipartUploadHandler,
|
||||
ServerOptions,
|
||||
)
|
||||
from byteb4rb1e.utils.io import ChunksIO
|
||||
from byteb4rb1e_utils.io import ChunksIO
|
||||
|
||||
|
||||
__doc__ = """tsmuds - Tiara's Simple Multipart Upload Debugging Server
|
||||
68
src/byteb4rb1e_utils/string.py
Normal file
68
src/byteb4rb1e_utils/string.py
Normal file
|
|
@ -0,0 +1,68 @@
|
|||
from typing import List, Union
|
||||
|
||||
from byteb4rb1e_utils.collections import CircularBuffer
|
||||
|
||||
|
||||
class KnuthMorrisPratt:
|
||||
"""Knuth-Morris-Pratt string searching algorithm implemented as a class
|
||||
|
||||
https://gwern.net/doc/cs/algorithm/1977-knuth.pdf
|
||||
"""
|
||||
def __init__(self, pattern: bytes):
|
||||
"""
|
||||
"""
|
||||
self._table = KnuthMorrisPratt.build_table(pattern)
|
||||
self._pattern = pattern
|
||||
|
||||
@staticmethod
|
||||
def build_table(pattern: bytes) -> List[int]:
|
||||
"""builds the failure table
|
||||
"""
|
||||
table = [0] * len(pattern)
|
||||
j = 0
|
||||
for i in range(1, len(pattern)):
|
||||
while j > 0 and pattern[i] != pattern[j]:
|
||||
j = table[j - 1]
|
||||
if pattern[i] == pattern[j]:
|
||||
j += 1
|
||||
table[i] = j
|
||||
return table
|
||||
|
||||
def match_linear(
|
||||
self,
|
||||
data: Union[bytes, bytearray, memoryview],
|
||||
start: int = 0
|
||||
) -> bool:
|
||||
"""match against a linear fixed-size buffer
|
||||
|
||||
:returns: index of the match or -1 if not found
|
||||
"""
|
||||
m, j = len(self.pattern), 0
|
||||
|
||||
for i in range(start, len(data)):
|
||||
while j > 0 and data[i] != self.pattern[j]:
|
||||
j = self._table[j - 1]
|
||||
|
||||
if data[i] == self.pattern[j]:
|
||||
j += 1
|
||||
|
||||
if j == m:
|
||||
return i - m + 1
|
||||
return -1
|
||||
|
||||
def match_circular(self, data: CircularBuffer):
|
||||
"""Finds the boundary using KMP, handling circular wraparound."""
|
||||
i, j = data.start, 0 # Start checking from the oldest data in the buffer
|
||||
|
||||
while j < len(boundary) and (data.filled or i != data.end):
|
||||
if data.buf[i] == self.pattern[j]:
|
||||
i = (i + 1) % data.size
|
||||
j += 1
|
||||
if j == len(self.pattern): # Full match found
|
||||
return True
|
||||
elif j > 0:
|
||||
j = table[j - 1]
|
||||
else:
|
||||
i = (i + 1) % data.size
|
||||
|
||||
return False
|
||||
|
|
@ -1,33 +0,0 @@
|
|||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
pytestmark = pytest.mark.pytest
|
||||
|
||||
from byteb4rb1e.testing.pytest import get_current_test
|
||||
from byteb4rb1e.testing.pytest.decorators import run_in_subprocess_once
|
||||
|
||||
|
||||
class Test_get_current_test:
|
||||
"""
|
||||
"""
|
||||
|
||||
def test_default(self):
|
||||
"""
|
||||
"""
|
||||
os.environ['PYTEST_CURRENT_TEST'] = 'foo::bar (something)'
|
||||
|
||||
result = get_current_test()
|
||||
|
||||
assert isinstance(result[0], Path)
|
||||
assert str(result[0].name) == 'foo'
|
||||
|
||||
assert result[1] == 'bar'
|
||||
|
||||
def test_invalid(self):
|
||||
"""
|
||||
"""
|
||||
del os.environ['PYTEST_CURRENT_TEST']
|
||||
with pytest.raises(RuntimeError):
|
||||
get_current_test()
|
||||
|
|
@ -1,21 +0,0 @@
|
|||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
pytestmark = pytest.mark.pytest
|
||||
|
||||
from byteb4rb1e.testing.pytest.decorators import run_in_subprocess_once
|
||||
|
||||
|
||||
@run_in_subprocess_once()
|
||||
def test_run_in_subprocess_once(tmp_path):
|
||||
marker = tmp_path / "executed_in_subprocess.txt"
|
||||
|
||||
if marker.exists():
|
||||
raise AssertionError("Marker file exists before test logic ran (shouldn't happen in parent process)")
|
||||
|
||||
# Create proof of execution
|
||||
marker.write_text("Subprocess was here.")
|
||||
|
||||
# Now assert it
|
||||
assert marker.exists()
|
||||
|
|
@ -1,38 +0,0 @@
|
|||
from pathlib import Path
|
||||
import importlib.resources
|
||||
|
||||
import pytest
|
||||
|
||||
pytestmark = pytest.mark.pytest
|
||||
|
||||
from byteb4rb1e.testing.pytest.decorators import run_in_subprocess_once
|
||||
from byteb4rb1e.testing.pytest.fixtures import (
|
||||
current_test,
|
||||
mock_system_site_package_dir
|
||||
)
|
||||
|
||||
|
||||
def test_current_test(current_test):
|
||||
"""
|
||||
"""
|
||||
suite_path, case_name = current_test
|
||||
|
||||
assert str(Path(__file__)) == str(suite_path)
|
||||
assert case_name == "test_current_test"
|
||||
|
||||
|
||||
@run_in_subprocess_once()
|
||||
def test_mock_system_site_package_dir(mock_system_site_package_dir):
|
||||
"""
|
||||
"""
|
||||
dummy_data = 'Hello'
|
||||
|
||||
pkgdir = mock_system_site_package_dir('foobarpkg')
|
||||
|
||||
(pkgdir / 'data.txt').write_text(dummy_data)
|
||||
|
||||
assert (pkgdir / '__init__.py').exists()
|
||||
|
||||
result = next(importlib.resources.files('foobarpkg').glob('data.txt')).read_text()
|
||||
|
||||
assert result == dummy_data
|
||||
|
|
@ -1,5 +0,0 @@
|
|||
def pytest_configure(config):
|
||||
# register an additional marker
|
||||
config.addinivalue_line(
|
||||
"markers", "pytest: test pytest integration"
|
||||
)
|
||||
|
|
@ -1,52 +0,0 @@
|
|||
"""Tests for custom argparse actions."""
|
||||
|
||||
from argparse import ArgumentParser
|
||||
|
||||
import pytest
|
||||
|
||||
from byteb4rb1e.utils.argparse.actions import KeyValueAction
|
||||
|
||||
|
||||
def _parse(*args):
|
||||
parser = ArgumentParser()
|
||||
parser.add_argument("--config", action=KeyValueAction, default={}, metavar="KEY=VALUE")
|
||||
return parser.parse_args(list(args))
|
||||
|
||||
|
||||
class TestKeyValueAction:
|
||||
|
||||
def test_single_pair(self):
|
||||
args = _parse("--config", "key=value")
|
||||
assert args.config == {"key": "value"}
|
||||
|
||||
def test_multiple_pairs(self):
|
||||
args = _parse("--config", "a=1", "--config", "b=2")
|
||||
assert args.config == {"a": "1", "b": "2"}
|
||||
|
||||
def test_dotted_key(self):
|
||||
args = _parse("--config", "provider.base_url=http://localhost")
|
||||
assert args.config == {"provider.base_url": "http://localhost"}
|
||||
|
||||
def test_value_with_equals(self):
|
||||
args = _parse("--config", "url=http://host?a=1&b=2")
|
||||
assert args.config == {"url": "http://host?a=1&b=2"}
|
||||
|
||||
def test_empty_value(self):
|
||||
args = _parse("--config", "key=")
|
||||
assert args.config == {"key": ""}
|
||||
|
||||
def test_strips_whitespace(self):
|
||||
args = _parse("--config", " key = value ")
|
||||
assert args.config == {"key": "value"}
|
||||
|
||||
def test_overwrites_duplicate_key(self):
|
||||
args = _parse("--config", "key=first", "--config", "key=second")
|
||||
assert args.config == {"key": "second"}
|
||||
|
||||
def test_default_empty_dict(self):
|
||||
args = _parse()
|
||||
assert args.config == {}
|
||||
|
||||
def test_no_equals_raises(self):
|
||||
with pytest.raises(SystemExit):
|
||||
_parse("--config", "no_equals_here")
|
||||
|
|
@ -1,347 +0,0 @@
|
|||
"""Unit tests for the config framework."""
|
||||
|
||||
from argparse import ArgumentParser, Namespace
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from byteb4rb1e.utils.config import (
|
||||
add_config_arguments,
|
||||
apply_cli_overrides,
|
||||
apply_overrides,
|
||||
ensure_ini,
|
||||
ensure_ini_multi,
|
||||
format_help,
|
||||
format_section,
|
||||
load_ini,
|
||||
resolve_hints,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SampleConfig:
|
||||
name: str = "default"
|
||||
count: int = 10
|
||||
ratio: float = 0.5
|
||||
enabled: bool = True
|
||||
|
||||
|
||||
class TestLoadIni:
|
||||
def test_loads_values(self, tmp_path):
|
||||
ini = tmp_path / "test.ini"
|
||||
ini.write_text(
|
||||
"[sample]\n"
|
||||
"name = custom\n"
|
||||
"count = 42\n"
|
||||
"ratio = 0.75\n"
|
||||
)
|
||||
config = load_ini(SampleConfig, ini)
|
||||
assert config.name == "custom"
|
||||
assert config.count == 42
|
||||
assert config.ratio == 0.75
|
||||
assert config.enabled is True # default
|
||||
|
||||
def test_missing_section_uses_defaults(self, tmp_path):
|
||||
ini = tmp_path / "test.ini"
|
||||
ini.write_text("[other]\nfoo = bar\n")
|
||||
config = load_ini(SampleConfig, ini)
|
||||
assert config.name == "default"
|
||||
assert config.count == 10
|
||||
|
||||
def test_missing_file_uses_defaults(self, tmp_path):
|
||||
config = load_ini(
|
||||
SampleConfig, tmp_path / "missing.ini"
|
||||
)
|
||||
assert config.name == "default"
|
||||
|
||||
def test_unknown_key_raises(self, tmp_path):
|
||||
ini = tmp_path / "test.ini"
|
||||
ini.write_text("[sample]\nunknown_key = bad\n")
|
||||
with pytest.raises(ValueError, match="unknown_key"):
|
||||
load_ini(SampleConfig, ini)
|
||||
|
||||
def test_custom_section_name(self, tmp_path):
|
||||
ini = tmp_path / "test.ini"
|
||||
ini.write_text("[mysection]\nname = custom\n")
|
||||
config = load_ini(
|
||||
SampleConfig, ini, section="mysection"
|
||||
)
|
||||
assert config.name == "custom"
|
||||
|
||||
def test_comments_ignored(self, tmp_path):
|
||||
ini = tmp_path / "test.ini"
|
||||
ini.write_text(
|
||||
"[sample]\n"
|
||||
"# this is a comment\n"
|
||||
"name = works # inline comment\n"
|
||||
)
|
||||
config = load_ini(SampleConfig, ini)
|
||||
assert config.name == "works"
|
||||
|
||||
|
||||
class TestAddConfigArguments:
|
||||
def test_generates_flags(self):
|
||||
parser = ArgumentParser()
|
||||
add_config_arguments(SampleConfig, parser)
|
||||
args = parser.parse_args(
|
||||
["--name", "cli", "--count", "99"]
|
||||
)
|
||||
assert args.name == "cli"
|
||||
assert args.count == 99
|
||||
|
||||
def test_defaults_are_none(self):
|
||||
parser = ArgumentParser()
|
||||
add_config_arguments(SampleConfig, parser)
|
||||
args = parser.parse_args([])
|
||||
assert args.name is None
|
||||
assert args.count is None
|
||||
|
||||
def test_underscores_become_dashes(self):
|
||||
@dataclass
|
||||
class DashConfig:
|
||||
my_long_name: str = "x"
|
||||
|
||||
parser = ArgumentParser()
|
||||
add_config_arguments(DashConfig, parser)
|
||||
args = parser.parse_args(
|
||||
["--my-long-name", "val"]
|
||||
)
|
||||
assert args.my_long_name == "val"
|
||||
|
||||
|
||||
class TestApplyCliOverrides:
|
||||
def test_overrides_set_values(self):
|
||||
config = SampleConfig()
|
||||
args = Namespace(name="override", count=None,
|
||||
ratio=None, enabled=None)
|
||||
result = apply_cli_overrides(config, args)
|
||||
assert result.name == "override"
|
||||
assert result.count == 10 # unchanged
|
||||
|
||||
def test_no_overrides_returns_same(self):
|
||||
config = SampleConfig()
|
||||
args = Namespace(name=None, count=None,
|
||||
ratio=None, enabled=None)
|
||||
result = apply_cli_overrides(config, args)
|
||||
assert result.name == "default"
|
||||
assert result is config
|
||||
|
||||
|
||||
class TestEnsureIni:
|
||||
def test_creates_file_if_missing(self, tmp_path):
|
||||
ini = tmp_path / "new.ini"
|
||||
assert not ini.exists()
|
||||
config = ensure_ini(SampleConfig, ini)
|
||||
assert ini.exists()
|
||||
assert config.name == "default"
|
||||
assert config.count == 10
|
||||
|
||||
def test_created_file_has_all_fields(self, tmp_path):
|
||||
ini = tmp_path / "new.ini"
|
||||
ensure_ini(SampleConfig, ini)
|
||||
content = ini.read_text()
|
||||
assert "name" in content
|
||||
assert "count" in content
|
||||
assert "ratio" in content
|
||||
assert "enabled" in content
|
||||
|
||||
def test_created_file_has_comments(self, tmp_path):
|
||||
ini = tmp_path / "new.ini"
|
||||
ensure_ini(SampleConfig, ini)
|
||||
content = ini.read_text()
|
||||
assert "# name (str)" in content
|
||||
assert "# count (int)" in content
|
||||
|
||||
def test_reads_existing_file(self, tmp_path):
|
||||
ini = tmp_path / "existing.ini"
|
||||
ini.write_text("[sample]\ncount = 42\n")
|
||||
config = ensure_ini(SampleConfig, ini)
|
||||
assert config.count == 42
|
||||
|
||||
def test_does_not_overwrite_existing(self, tmp_path):
|
||||
ini = tmp_path / "existing.ini"
|
||||
ini.write_text("[sample]\ncount = 42\n")
|
||||
ensure_ini(SampleConfig, ini)
|
||||
content = ini.read_text()
|
||||
assert content == "[sample]\ncount = 42\n"
|
||||
|
||||
def test_created_file_is_loadable(self, tmp_path):
|
||||
ini = tmp_path / "new.ini"
|
||||
ensure_ini(SampleConfig, ini)
|
||||
config = load_ini(SampleConfig, ini)
|
||||
assert config.name == "default"
|
||||
assert config.count == 10
|
||||
assert config.ratio == 0.5
|
||||
|
||||
|
||||
class TestIntegration:
|
||||
def test_ini_then_cli_override(self, tmp_path):
|
||||
ini = tmp_path / "test.ini"
|
||||
ini.write_text("[sample]\ncount = 42\n")
|
||||
config = load_ini(SampleConfig, ini)
|
||||
assert config.count == 42
|
||||
|
||||
args = Namespace(name=None, count=99,
|
||||
ratio=None, enabled=None)
|
||||
config = apply_cli_overrides(config, args)
|
||||
assert config.count == 99
|
||||
assert config.name == "default"
|
||||
|
||||
def test_ensure_then_cli_override(self, tmp_path):
|
||||
ini = tmp_path / "new.ini"
|
||||
config = ensure_ini(SampleConfig, ini)
|
||||
assert config.count == 10
|
||||
|
||||
args = Namespace(name=None, count=99,
|
||||
ratio=None, enabled=None)
|
||||
config = apply_cli_overrides(config, args)
|
||||
assert config.count == 99
|
||||
assert config.name == "default"
|
||||
|
||||
# Config file unchanged
|
||||
reloaded = load_ini(SampleConfig, ini)
|
||||
assert reloaded.count == 10
|
||||
|
||||
|
||||
class TestResolveHints:
|
||||
def test_returns_type_dict(self):
|
||||
hints = resolve_hints(SampleConfig)
|
||||
assert hints["name"] is str
|
||||
assert hints["count"] is int
|
||||
assert hints["ratio"] is float
|
||||
assert hints["enabled"] is bool
|
||||
|
||||
|
||||
class TestFormatSection:
|
||||
def test_includes_section_header(self):
|
||||
text = format_section(SampleConfig)
|
||||
assert "[sample]" in text
|
||||
|
||||
def test_custom_section_name(self):
|
||||
text = format_section(SampleConfig, "custom")
|
||||
assert "[custom]" in text
|
||||
|
||||
def test_includes_all_fields(self):
|
||||
text = format_section(SampleConfig)
|
||||
assert "name = default" in text
|
||||
assert "count = 10" in text
|
||||
assert "ratio = 0.5" in text
|
||||
assert "enabled = True" in text
|
||||
|
||||
def test_includes_type_comments(self):
|
||||
text = format_section(SampleConfig)
|
||||
assert "# name (str)" in text
|
||||
assert "# count (int)" in text
|
||||
|
||||
def test_is_loadable(self, tmp_path):
|
||||
ini = tmp_path / "test.ini"
|
||||
ini.write_text(format_section(SampleConfig) + "\n")
|
||||
config = load_ini(SampleConfig, ini)
|
||||
assert config.name == "default"
|
||||
assert config.count == 10
|
||||
|
||||
|
||||
class TestEnsureIniMulti:
|
||||
def test_creates_file_with_multiple_sections(self, tmp_path):
|
||||
@dataclass
|
||||
class OtherConfig:
|
||||
host: str = "localhost"
|
||||
port: int = 8080
|
||||
|
||||
ini = tmp_path / "multi.ini"
|
||||
ensure_ini_multi([
|
||||
(SampleConfig, None),
|
||||
(OtherConfig, "server"),
|
||||
], ini)
|
||||
|
||||
content = ini.read_text()
|
||||
assert "[sample]" in content
|
||||
assert "[server]" in content
|
||||
assert "name = default" in content
|
||||
assert "host = localhost" in content
|
||||
|
||||
def test_does_not_overwrite_existing(self, tmp_path):
|
||||
ini = tmp_path / "multi.ini"
|
||||
ini.write_text("[existing]\nfoo = bar\n")
|
||||
ensure_ini_multi([(SampleConfig, None)], ini)
|
||||
content = ini.read_text()
|
||||
assert content == "[existing]\nfoo = bar\n"
|
||||
|
||||
def test_sections_are_loadable(self, tmp_path):
|
||||
@dataclass
|
||||
class DbConfig:
|
||||
url: str = "sqlite:///test.db"
|
||||
|
||||
ini = tmp_path / "multi.ini"
|
||||
ensure_ini_multi([
|
||||
(SampleConfig, None),
|
||||
(DbConfig, "database"),
|
||||
], ini)
|
||||
|
||||
sample = load_ini(SampleConfig, ini)
|
||||
db = load_ini(DbConfig, ini, section="database")
|
||||
assert sample.name == "default"
|
||||
assert db.url == "sqlite:///test.db"
|
||||
|
||||
|
||||
class TestApplyOverrides:
|
||||
def test_applies_dotted_path(self):
|
||||
config = SampleConfig()
|
||||
result = apply_overrides(config, {
|
||||
"provider.name": "custom",
|
||||
"provider.count": "99",
|
||||
}, prefix="provider")
|
||||
assert result.name == "custom"
|
||||
assert result.count == 99
|
||||
|
||||
def test_without_prefix(self):
|
||||
config = SampleConfig()
|
||||
result = apply_overrides(config, {
|
||||
"name": "direct",
|
||||
"count": "42",
|
||||
})
|
||||
assert result.name == "direct"
|
||||
assert result.count == 42
|
||||
|
||||
def test_no_matching_keys_returns_same(self):
|
||||
config = SampleConfig()
|
||||
result = apply_overrides(config, {"other.key": "val"}, prefix="provider")
|
||||
assert result is config
|
||||
|
||||
def test_bool_coercion(self):
|
||||
config = SampleConfig()
|
||||
result = apply_overrides(config, {"enabled": "false"})
|
||||
assert result.enabled is False
|
||||
|
||||
def test_preserves_unset_fields(self):
|
||||
config = SampleConfig()
|
||||
result = apply_overrides(config, {"name": "changed"})
|
||||
assert result.name == "changed"
|
||||
assert result.count == 10 # unchanged
|
||||
assert result.ratio == 0.5 # unchanged
|
||||
|
||||
|
||||
class TestFormatHelp:
|
||||
def test_lists_all_fields(self):
|
||||
lines = format_help(SampleConfig)
|
||||
assert len(lines) == 4
|
||||
assert any("name" in l for l in lines)
|
||||
assert any("count" in l for l in lines)
|
||||
|
||||
def test_includes_types(self):
|
||||
lines = format_help(SampleConfig)
|
||||
text = "\n".join(lines)
|
||||
assert "str" in text
|
||||
assert "int" in text
|
||||
|
||||
def test_includes_defaults(self):
|
||||
lines = format_help(SampleConfig)
|
||||
text = "\n".join(lines)
|
||||
assert "default" in text
|
||||
assert "10" in text
|
||||
|
||||
def test_with_prefix(self):
|
||||
lines = format_help(SampleConfig, prefix="provider")
|
||||
assert any("provider.name" in l for l in lines)
|
||||
assert any("provider.count" in l for l in lines)
|
||||
|
|
@ -1,217 +0,0 @@
|
|||
"""Tests for the generic HTTP client."""
|
||||
|
||||
import email.message
|
||||
import io
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
from types import TracebackType
|
||||
from typing import Dict, List, Optional, Tuple, Type, Union
|
||||
|
||||
import pytest
|
||||
|
||||
from byteb4rb1e.utils.http.client import HttpResponse, HttpSession
|
||||
|
||||
|
||||
class _FakeRawResponse:
|
||||
"""Stands in for the object returned by OpenerDirector.open()."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
status: int = 200,
|
||||
headers: Optional[Dict[str, str]] = None,
|
||||
data: bytes = b"",
|
||||
) -> None:
|
||||
self._status = status
|
||||
self._headers = headers or {}
|
||||
self._data = data
|
||||
|
||||
def getcode(self) -> int:
|
||||
return self._status
|
||||
|
||||
def getheaders(self) -> List[Tuple[str, str]]:
|
||||
return list(self._headers.items())
|
||||
|
||||
def read(self) -> bytes:
|
||||
return self._data
|
||||
|
||||
def __enter__(self) -> "_FakeRawResponse":
|
||||
return self
|
||||
|
||||
def __exit__(
|
||||
self,
|
||||
exc_type: Optional[Type[BaseException]],
|
||||
exc: Optional[BaseException],
|
||||
tb: Optional[TracebackType],
|
||||
) -> None:
|
||||
return None
|
||||
|
||||
|
||||
class _FakeOpener:
|
||||
"""Records requests and replays canned responses."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
responses: Optional[
|
||||
List[Union[_FakeRawResponse, Exception]]
|
||||
] = None,
|
||||
) -> None:
|
||||
self.requests: List[urllib.request.Request] = []
|
||||
self._responses = list(responses or [_FakeRawResponse()])
|
||||
|
||||
def open(
|
||||
self,
|
||||
req: urllib.request.Request,
|
||||
timeout: Optional[int] = None,
|
||||
) -> _FakeRawResponse:
|
||||
self.requests.append(req)
|
||||
response = self._responses.pop(0)
|
||||
if isinstance(response, Exception):
|
||||
raise response
|
||||
return response
|
||||
|
||||
|
||||
def _http_error(
|
||||
code: int = 404,
|
||||
data: bytes = b"",
|
||||
headers: Optional[Dict[str, str]] = None,
|
||||
) -> urllib.error.HTTPError:
|
||||
hdrs = email.message.Message()
|
||||
for key, value in (headers or {}).items():
|
||||
hdrs[key] = value
|
||||
return urllib.error.HTTPError(
|
||||
"http://testserver/", code, "error", hdrs, io.BytesIO(data),
|
||||
)
|
||||
|
||||
|
||||
class TestHttpResponse:
|
||||
|
||||
def test_json(self) -> None:
|
||||
resp = HttpResponse(200, {}, b'{"a": 1}')
|
||||
assert resp.json() == {"a": 1}
|
||||
|
||||
def test_text(self) -> None:
|
||||
resp = HttpResponse(200, {}, b"hello")
|
||||
assert resp.text == "hello"
|
||||
|
||||
def test_text_replaces_invalid_utf8(self) -> None:
|
||||
resp = HttpResponse(200, {}, b"\xff\xfe")
|
||||
assert "<EFBFBD>" in resp.text
|
||||
|
||||
def test_reason_defaults_to_none(self) -> None:
|
||||
resp = HttpResponse(200, {}, b"")
|
||||
assert resp.reason is None
|
||||
|
||||
def test_frozen(self) -> None:
|
||||
resp = HttpResponse(200, {}, b"")
|
||||
with pytest.raises(Exception):
|
||||
resp.status_code = 500
|
||||
|
||||
|
||||
class TestHttpSession:
|
||||
|
||||
def test_opener_has_cookie_processor(self) -> None:
|
||||
session = HttpSession()
|
||||
processors = [
|
||||
h for h in session._opener.handlers
|
||||
if isinstance(h, urllib.request.HTTPCookieProcessor)
|
||||
]
|
||||
assert len(processors) == 1
|
||||
assert processors[0].cookiejar is session._jar
|
||||
|
||||
def test_get(self) -> None:
|
||||
opener = _FakeOpener([
|
||||
_FakeRawResponse(200, {"X-Foo": "bar"}, b"body"),
|
||||
])
|
||||
session = HttpSession()
|
||||
session._opener = opener
|
||||
|
||||
resp = session.get("http://testserver/page")
|
||||
|
||||
assert resp.status_code == 200
|
||||
assert resp.data == b"body"
|
||||
assert resp.headers == {"X-Foo": "bar"}
|
||||
assert opener.requests[0].get_method() == "GET"
|
||||
assert opener.requests[0].full_url == "http://testserver/page"
|
||||
|
||||
def test_get_with_params(self) -> None:
|
||||
opener = _FakeOpener()
|
||||
session = HttpSession()
|
||||
session._opener = opener
|
||||
|
||||
session.get("http://testserver/page", params={"a": "1", "b": "x y"})
|
||||
|
||||
assert opener.requests[0].full_url == (
|
||||
"http://testserver/page?a=1&b=x+y"
|
||||
)
|
||||
|
||||
def test_default_headers_sent(self) -> None:
|
||||
opener = _FakeOpener()
|
||||
session = HttpSession(default_headers={"User-Agent": "test"})
|
||||
session._opener = opener
|
||||
|
||||
session.get("http://testserver/")
|
||||
|
||||
assert opener.requests[0].get_header("User-agent") == "test"
|
||||
|
||||
def test_request_headers_override_defaults(self) -> None:
|
||||
opener = _FakeOpener()
|
||||
session = HttpSession(default_headers={"X-Token": "default"})
|
||||
session._opener = opener
|
||||
|
||||
session.get("http://testserver/", headers={"X-Token": "override"})
|
||||
|
||||
assert opener.requests[0].get_header("X-token") == "override"
|
||||
|
||||
def test_post_form_encodes_data(self) -> None:
|
||||
opener = _FakeOpener()
|
||||
session = HttpSession()
|
||||
session._opener = opener
|
||||
|
||||
session.post("http://testserver/login", data={"user": "u", "pass": "p"})
|
||||
|
||||
req = opener.requests[0]
|
||||
assert req.get_method() == "POST"
|
||||
assert isinstance(req.data, bytes)
|
||||
assert dict(urllib.parse.parse_qsl(req.data.decode())) == {
|
||||
"user": "u",
|
||||
"pass": "p",
|
||||
}
|
||||
assert req.get_header("Content-type") == (
|
||||
"application/x-www-form-urlencoded"
|
||||
)
|
||||
|
||||
def test_post_keeps_explicit_content_type(self) -> None:
|
||||
opener = _FakeOpener()
|
||||
session = HttpSession()
|
||||
session._opener = opener
|
||||
|
||||
session.post(
|
||||
"http://testserver/",
|
||||
data={"a": "1"},
|
||||
headers={"Content-Type": "text/plain"},
|
||||
)
|
||||
|
||||
assert opener.requests[0].get_header("Content-type") == "text/plain"
|
||||
|
||||
def test_post_without_data(self) -> None:
|
||||
opener = _FakeOpener()
|
||||
session = HttpSession()
|
||||
session._opener = opener
|
||||
|
||||
session.post("http://testserver/")
|
||||
|
||||
assert opener.requests[0].data is None
|
||||
|
||||
def test_http_error_returned_as_response(self) -> None:
|
||||
opener = _FakeOpener([
|
||||
_http_error(404, b"missing", {"X-Err": "yes"}),
|
||||
])
|
||||
session = HttpSession()
|
||||
session._opener = opener
|
||||
|
||||
resp = session.get("http://testserver/nope")
|
||||
|
||||
assert resp.status_code == 404
|
||||
assert resp.data == b"missing"
|
||||
assert resp.headers["X-Err"] == "yes"
|
||||
|
|
@ -1,133 +0,0 @@
|
|||
"""Tests for the Forgejo API wrapper."""
|
||||
|
||||
import json
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
import pytest
|
||||
|
||||
from byteb4rb1e.utils.http.client import HttpResponse
|
||||
from byteb4rb1e.utils.saas import forgejo
|
||||
|
||||
HOST = "git.example.com"
|
||||
|
||||
|
||||
class _Recorder:
|
||||
"""Records http_client calls and replays a canned response."""
|
||||
|
||||
def __init__(self, response: HttpResponse) -> None:
|
||||
self.calls: List[Tuple[str, Dict[str, Any]]] = []
|
||||
self._response = response
|
||||
|
||||
def __call__(self, url: str, **kwargs: Any) -> HttpResponse:
|
||||
self.calls.append((url, kwargs))
|
||||
return self._response
|
||||
|
||||
|
||||
class TestApiUrl:
|
||||
|
||||
def test_host_only(self) -> None:
|
||||
assert forgejo.api_url(HOST) == "https://git.example.com/api/v1"
|
||||
|
||||
|
||||
class TestHttpHeaders:
|
||||
|
||||
def test_token_header(self) -> None:
|
||||
headers = forgejo.http_headers("s3cret")
|
||||
assert headers["Authorization"] == "token s3cret"
|
||||
assert headers["Accept"] == "application/json"
|
||||
assert headers["Content-Type"] == "application/json"
|
||||
|
||||
|
||||
class TestRepositoryExists:
|
||||
|
||||
def test_exists(self, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
recorder = _Recorder(HttpResponse(200, {}, b"{}"))
|
||||
monkeypatch.setattr(forgejo.http_client, "get", recorder)
|
||||
|
||||
assert forgejo.repository_exists(HOST, "tiara", "repo", "t") is True
|
||||
url, kwargs = recorder.calls[0]
|
||||
assert url == "https://git.example.com/api/v1/repos/tiara/repo"
|
||||
assert kwargs["headers"]["Authorization"] == "token t"
|
||||
|
||||
def test_missing(self, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
recorder = _Recorder(HttpResponse(404, {}, b""))
|
||||
monkeypatch.setattr(forgejo.http_client, "get", recorder)
|
||||
|
||||
assert forgejo.repository_exists(HOST, "tiara", "repo", "t") is False
|
||||
|
||||
|
||||
class TestCreateRepository:
|
||||
|
||||
def _create(
|
||||
self,
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
org: Optional[str] = None,
|
||||
**kwargs: Any,
|
||||
) -> _Recorder:
|
||||
recorder = _Recorder(HttpResponse(201, {}, b"{}"))
|
||||
monkeypatch.setattr(forgejo.http_client, "post", recorder)
|
||||
forgejo.create_repository(HOST, "repo", "t", org=org, **kwargs)
|
||||
return recorder
|
||||
|
||||
def test_user_repo_endpoint(
|
||||
self, monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
recorder = self._create(monkeypatch)
|
||||
url, _ = recorder.calls[0]
|
||||
assert url == "https://git.example.com/api/v1/user/repos"
|
||||
|
||||
def test_org_repo_endpoint(
|
||||
self, monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
recorder = self._create(monkeypatch, org="byteb4rb1e")
|
||||
url, _ = recorder.calls[0]
|
||||
assert url == "https://git.example.com/api/v1/orgs/byteb4rb1e/repos"
|
||||
|
||||
def test_body(self, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
recorder = self._create(
|
||||
monkeypatch, description="demo", is_private=False,
|
||||
)
|
||||
_, kwargs = recorder.calls[0]
|
||||
body = json.loads(kwargs["data"].decode("utf-8"))
|
||||
assert body == {
|
||||
"name": "repo",
|
||||
"private": False,
|
||||
"description": "demo",
|
||||
}
|
||||
|
||||
def test_defaults_to_private(
|
||||
self, monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
recorder = self._create(monkeypatch)
|
||||
_, kwargs = recorder.calls[0]
|
||||
body = json.loads(kwargs["data"].decode("utf-8"))
|
||||
assert body["private"] is True
|
||||
|
||||
def test_auth_header(self, monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
recorder = self._create(monkeypatch)
|
||||
_, kwargs = recorder.calls[0]
|
||||
assert kwargs["headers"]["Authorization"] == "token t"
|
||||
|
||||
def test_returns_response(
|
||||
self, monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
response = HttpResponse(201, {}, b'{"id": 1}')
|
||||
recorder = _Recorder(response)
|
||||
monkeypatch.setattr(forgejo.http_client, "post", recorder)
|
||||
|
||||
resp = forgejo.create_repository(HOST, "repo", "t")
|
||||
|
||||
assert resp is response
|
||||
|
||||
|
||||
class TestCloneUrls:
|
||||
|
||||
def test_ssh(self) -> None:
|
||||
assert forgejo.ssh_clone_url(HOST, "tiara", "repo") == (
|
||||
"git@git.example.com:tiara/repo.git"
|
||||
)
|
||||
|
||||
def test_https(self) -> None:
|
||||
assert forgejo.https_clone_url(HOST, "tiara", "repo") == (
|
||||
"https://git.example.com/tiara/repo.git"
|
||||
)
|
||||
|
|
@ -1,61 +0,0 @@
|
|||
import unittest
|
||||
|
||||
from byteb4rb1e.utils.string import RollingHash
|
||||
|
||||
class test_compute_initial_hash(unittest.TestCase):
|
||||
"""RollingHash.compute_initial_hash()
|
||||
|
||||
i calculated the hashes by hand, as a basis for this test case. Hopefully
|
||||
there's no logical flaw...
|
||||
"""
|
||||
|
||||
def test_default(self):
|
||||
"""computation of hash"""
|
||||
result = RollingHash.compute_initial_hash(
|
||||
b'abcdefg',
|
||||
base = 31,
|
||||
mod = 10**9 + 7
|
||||
)
|
||||
|
||||
self.assertEqual(result, 988021244)
|
||||
|
||||
class test___init__(unittest.TestCase):
|
||||
"""RollingHash.__init__()
|
||||
|
||||
Make sure the class instance is initialized correctly
|
||||
|
||||
I calculated the hashes by hand, as a basis for this test case. Hopefully
|
||||
there's no logical flaw...
|
||||
"""
|
||||
|
||||
def test_default(self):
|
||||
"""computation of initial hash and highest base factor"""
|
||||
instance = RollingHash(b'abcdefg')
|
||||
|
||||
self.assertEqual(instance._hash, 988021244)
|
||||
self.assertEqual(instance._hbase_factor, 887503681)
|
||||
|
||||
def test_defaults_override(self):
|
||||
"""override of defaults"""
|
||||
instance = RollingHash(
|
||||
b'abcdefg',
|
||||
base = 9,
|
||||
mod = 4
|
||||
)
|
||||
|
||||
self.assertEqual(instance._mod, 4)
|
||||
self.assertEqual(instance._base, 9)
|
||||
|
||||
|
||||
class test_roll(unittest.TestCase):
|
||||
"""RollingHash.roll()"""
|
||||
def test_rolling_hash(self):
|
||||
base=31
|
||||
mod=10**9 + 7
|
||||
|
||||
rh = RollingHash(b"foobar", base=base, mod=mod)
|
||||
rolled_hash = rh.roll(ord("f"), ord("n"))
|
||||
|
||||
control_hash = RollingHash.compute_initial_hash(b"oobarn", base, mod)
|
||||
|
||||
self.assertEqual(rolled_hash, control_hash)
|
||||
|
|
@ -1,93 +0,0 @@
|
|||
import os.path
|
||||
import sys
|
||||
import urllib.request
|
||||
|
||||
import pytest
|
||||
|
||||
from byteb4rb1e.testing.pytest.decorators import run_in_subprocess_once
|
||||
from byteb4rb1e.testing.pytest.fixtures import mock_system_site_package_dir
|
||||
from byteb4rb1e.utils.urllib.request import PkgHandler
|
||||
|
||||
|
||||
class TestPkgHandler:
|
||||
"""
|
||||
"""
|
||||
@run_in_subprocess_once()
|
||||
def test_text(self, mock_system_site_package_dir):
|
||||
"""
|
||||
"""
|
||||
_opener: urllib.request.OpenerDirector = urllib.request.build_opener(
|
||||
PkgHandler()
|
||||
)
|
||||
|
||||
dummy_data = 'Hello'
|
||||
|
||||
pkg_dir = mock_system_site_package_dir('foobarpkg')
|
||||
(pkg_dir / 'data.txt').write_text(dummy_data)
|
||||
|
||||
result = _opener.open('pkg://foobarpkg/data.txt').readline()
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert result == dummy_data
|
||||
|
||||
|
||||
@run_in_subprocess_once()
|
||||
def test_bytes(self, mock_system_site_package_dir):
|
||||
"""
|
||||
"""
|
||||
_opener: urllib.request.OpenerDirector = urllib.request.build_opener(
|
||||
PkgHandler()
|
||||
)
|
||||
|
||||
dummy_data = b'foobar123'
|
||||
|
||||
pkg_dir = mock_system_site_package_dir('foobarpkg')
|
||||
(pkg_dir / 'data.bin').write_bytes(dummy_data)
|
||||
|
||||
result = _opener.open('pkg://foobarpkg/data.bin').readline()
|
||||
|
||||
assert isinstance(result, bytes)
|
||||
assert result == dummy_data
|
||||
|
||||
|
||||
@run_in_subprocess_once()
|
||||
def test_subdir(self, mock_system_site_package_dir):
|
||||
"""
|
||||
"""
|
||||
_opener: urllib.request.OpenerDirector = urllib.request.build_opener(
|
||||
PkgHandler()
|
||||
)
|
||||
|
||||
dummy_data = 'foobar123'
|
||||
|
||||
pkg_dir = mock_system_site_package_dir('foobarpkg')
|
||||
|
||||
dummy_file = (pkg_dir / 'foo' / 'bar' / 'data.txt')
|
||||
|
||||
dummy_file.parent.mkdir(parents=True)
|
||||
dummy_file.write_text(dummy_data)
|
||||
|
||||
result = _opener.open('pkg://foobarpkg/foo/bar/data.txt').readline()
|
||||
|
||||
assert result == dummy_data
|
||||
|
||||
|
||||
@run_in_subprocess_once()
|
||||
def test_nested_module(self, mock_system_site_package_dir):
|
||||
"""
|
||||
"""
|
||||
_opener: urllib.request.OpenerDirector = urllib.request.build_opener(
|
||||
PkgHandler()
|
||||
)
|
||||
|
||||
dummy_data = 'foobar123'
|
||||
|
||||
pkg_dir = mock_system_site_package_dir('foo.bar.pkg')
|
||||
dummy_file = (pkg_dir / 'dummy' / 'data.txt')
|
||||
|
||||
dummy_file.parent.mkdir(parents=True)
|
||||
dummy_file.write_text(dummy_data)
|
||||
|
||||
result = _opener.open('pkg://foo.bar.pkg/dummy/data.txt').readline()
|
||||
|
||||
assert result == dummy_data
|
||||
|
|
@ -1,60 +0,0 @@
|
|||
"""Tests for the git subprocess wrapper's URL parsing helpers."""
|
||||
|
||||
import pytest
|
||||
|
||||
from byteb4rb1e.utils.vcs.git import parse_base_url, parse_repo_name
|
||||
|
||||
|
||||
class TestParseBaseUrl:
|
||||
|
||||
def test_bitbucket(self) -> None:
|
||||
result = parse_base_url("git@bitbucket.org:byteb4rb1e/foo.git")
|
||||
assert result == "byteb4rb1e"
|
||||
|
||||
def test_forgejo_host(self) -> None:
|
||||
result = parse_base_url(
|
||||
"git@git.code.tiararodney.com:h5p-mirror/foo.git"
|
||||
)
|
||||
assert result == "h5p-mirror"
|
||||
|
||||
def test_github_host(self) -> None:
|
||||
result = parse_base_url("git@github.com:h5p/h5p-multi-choice.git")
|
||||
assert result == "h5p"
|
||||
|
||||
def test_returns_str(self) -> None:
|
||||
result = parse_base_url("git@bitbucket.org:byteb4rb1e/foo.git")
|
||||
assert isinstance(result, str)
|
||||
|
||||
def test_rejects_https_url(self) -> None:
|
||||
with pytest.raises(ValueError):
|
||||
parse_base_url("https://bitbucket.org/byteb4rb1e/foo.git")
|
||||
|
||||
def test_rejects_url_without_colon(self) -> None:
|
||||
with pytest.raises(ValueError):
|
||||
parse_base_url("bitbucket.org/byteb4rb1e/foo.git")
|
||||
|
||||
|
||||
class TestParseRepoName:
|
||||
|
||||
def test_bitbucket(self) -> None:
|
||||
assert parse_repo_name(
|
||||
"git@bitbucket.org:byteb4rb1e/foo.git"
|
||||
) == "foo"
|
||||
|
||||
def test_forgejo_host(self) -> None:
|
||||
assert parse_repo_name(
|
||||
"git@git.code.tiararodney.com:h5p-mirror/foo.git"
|
||||
) == "foo"
|
||||
|
||||
def test_without_git_suffix(self) -> None:
|
||||
assert parse_repo_name(
|
||||
"git@git.code.tiararodney.com:h5p-mirror/foo"
|
||||
) == "foo"
|
||||
|
||||
def test_rejects_https_url(self) -> None:
|
||||
with pytest.raises(ValueError):
|
||||
parse_repo_name("https://git.code.tiararodney.com/x/foo.git")
|
||||
|
||||
def test_rejects_url_without_colon(self) -> None:
|
||||
with pytest.raises(ValueError):
|
||||
parse_repo_name("git.code.tiararodney.com/x/foo.git")
|
||||
|
|
@ -1,6 +1,6 @@
|
|||
import unittest
|
||||
|
||||
from byteb4rb1e.utils.collections import CircularBuffer
|
||||
from byteb4rb1e_utils.collections import CircularBuffer
|
||||
|
||||
class test_init(unittest.TestCase):
|
||||
"""CircularBuffer.__init__()"""
|
||||
|
|
@ -1,7 +1,7 @@
|
|||
from io import BytesIO, IOBase
|
||||
import unittest
|
||||
|
||||
from byteb4rb1e.utils.io import ChunksIO
|
||||
from byteb4rb1e_utils.io import ChunksIO
|
||||
|
||||
|
||||
class TestGetChunkSize(unittest.TestCase):
|
||||
54
tox.ini
54
tox.ini
|
|
@ -1,54 +0,0 @@
|
|||
[tox]
|
||||
requires =
|
||||
tox>=4.19
|
||||
env_list =
|
||||
unit-py3{9-13}
|
||||
integration-py3{9-13}-pytest8
|
||||
lint
|
||||
format
|
||||
|
||||
[testenv]
|
||||
deps =
|
||||
.
|
||||
|
||||
[testenv:lint]
|
||||
description = run type check on code base
|
||||
labels = static
|
||||
deps =
|
||||
mypy
|
||||
commands =
|
||||
mypy src tests --junit-xml test-reports/{env_name}.xml
|
||||
|
||||
[testenv:audit]
|
||||
description = run type check on code base
|
||||
labels = audit
|
||||
deps =
|
||||
pip-audit
|
||||
commands =
|
||||
pip-audit .
|
||||
|
||||
[testenv:format]
|
||||
description = run type check on code base
|
||||
labels = static
|
||||
deps =
|
||||
autopep8
|
||||
commands =
|
||||
autopep8 --diff --exit-code src tests
|
||||
|
||||
[testenv:unit-py3{9-13}]
|
||||
description = run type check on code base
|
||||
labels = unit
|
||||
deps =
|
||||
{[testenv]deps}
|
||||
pytest
|
||||
commands =
|
||||
pytest tests/unit --junitxml=test-reports/{env_name}.xml
|
||||
|
||||
[testenv:integration-py3{9-13}-pytest8]
|
||||
description = run pytest integration tests
|
||||
labels = integration
|
||||
deps =
|
||||
{[testenv]deps}
|
||||
pytest8: pytest>=8.0,<=9.0
|
||||
commands =
|
||||
pytest tests/integration -m pytest --junitxml=test-reports/{env_name}.xml
|
||||
Loading…
Add table
Add a link
Reference in a new issue