migrate sphinxcontrib.h5p.utils

This commit is contained in:
Tiara Rodney 2026-03-04 13:11:07 +01:00
parent cc4b567181
commit 5bf4a7eee4
No known key found for this signature in database
GPG key ID: 5CD8EC1D46106723
8 changed files with 742 additions and 0 deletions

View file

@ -0,0 +1,6 @@
"""Utilities for building composable CLIs from command dataclasses."""
from byteb4rb1e.utils.argparse.command import CLICommand
from byteb4rb1e.utils.argparse.dispatcher import CLI
# Names re-exported as this package's public API.
__all__ = ["CLI", "CLICommand"]

View file

@ -0,0 +1,54 @@
"""Base command dataclass for composable CLI trees."""
from __future__ import annotations
from argparse import ArgumentParser
from dataclasses import dataclass, fields
from typing import Any, ClassVar, Dict, List, Optional, Type
@dataclass
class CLICommand:
    """Base class for CLI commands.

    Subclasses define their identity (name, help, description) as
    dataclass fields. ``name`` is used as the positional parser name;
    the remaining non-empty fields are passed as kwargs to
    ``subparsers.add_parser()``.

    Override ``add_arguments`` to register flags and positionals.
    Override ``execute`` to implement the command's logic.
    Nest subcommands by setting ``_subcommands`` as a class variable.
    """

    name: str = ""
    help: str = ""
    description: str = ""

    # Child command classes. Declared as a ClassVar so it is not a
    # dataclass field and therefore never shows up in ``parser_kwargs``.
    # The list is shared at class level: subclasses should assign their
    # own list instead of mutating this default in place.
    _subcommands: ClassVar[List[Type["CLICommand"]]] = []

    def add_arguments(self, parser: ArgumentParser) -> None:
        """Add arguments to the parser. Override in subclasses."""

    def execute(self, args: Any) -> int:
        """Run the command. Override in subclasses.

        Returns an exit code (0 = success).
        """
        return 0

    def parser_kwargs(self) -> Dict[str, Any]:
        """Return the dataclass fields as kwargs for ``add_parser``.

        Excludes ``name`` (used as the positional parser name), any
        field whose name starts with an underscore, and any empty-string
        field so that argparse's own defaults apply.
        """
        skip = {"name"}
        kwargs = {}  # type: Dict[str, Any]
        for f in fields(self):
            if f.name in skip or f.name.startswith("_"):
                continue
            val = getattr(self, f.name)
            if val != "":
                kwargs[f.name] = val
        return kwargs

View file

@ -0,0 +1,122 @@
"""CLI dispatcher — builds parser trees from command dataclasses."""
from __future__ import annotations
import logging
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from typing import Any, Dict, List, Optional, Type
from byteb4rb1e.utils.argparse.command import CLICommand
class CLI:
    """Composable CLI built from a tree of CLICommand dataclasses.

    Recursively bootstraps an argparse parser hierarchy and records,
    for every command that has subcommands, which namespace attribute
    (``dest``) holds the selected child. ``run()`` follows that chain
    to dispatch to the correct leaf command without dest chaining in
    the caller.

    Usage::

        cli = CLI(prog="repository", description="...")
        cli.bootstrap([MirrorCommand, IndexCommand])
        cli.run()
    """

    # Namespace attribute that holds the selected top-level command.
    _ROOT_DEST = "command"

    def __init__(
        self,
        prog: Optional[str] = None,
        description: str = "",
    ) -> None:
        """Create the root parser.

        Args:
            prog: Program name shown in usage; argparse's default is
                used when empty or None.
            description: Root parser description; omitted when empty.
        """
        kwargs = {}  # type: Dict[str, Any]
        if prog:
            kwargs["prog"] = prog
        if description:
            kwargs["description"] = description
        kwargs.setdefault(
            "formatter_class", ArgumentDefaultsHelpFormatter,
        )
        self.parser = ArgumentParser(**kwargs)
        # Every dest registered, in registration order (kept for
        # introspection; resolution uses ``_child_dests``).
        self._dests = []  # type: List[str]
        # Dotted command path ("a.b.c") -> command instance.
        self._commands = {}  # type: Dict[str, CLICommand]
        # Dotted command path -> dest that selects one of its children.
        self._child_dests = {}  # type: Dict[str, str]

    def add_arguments(self, parser: ArgumentParser) -> None:
        """Add global arguments to the root parser."""
        parser.add_argument(
            "-v", "--verbose", action="count", default=0,
            help="Increase verbosity (-v for INFO, -vv for DEBUG)",
        )

    def bootstrap(
        self,
        commands: List[Type["CLICommand"]],
    ) -> None:
        """Build the parser tree from a list of top-level commands."""
        self.add_arguments(self.parser)
        dest = self._ROOT_DEST
        self._dests.append(dest)
        sub = self.parser.add_subparsers(dest=dest)
        for cmd_cls in commands:
            self._add(sub, cmd_cls, prefix="")

    def _add(
        self,
        subparsers: Any,
        cmd_cls: Type["CLICommand"],
        prefix: str,
    ) -> None:
        """Recursively add a command and its subcommands.

        Args:
            subparsers: The object returned by ``add_subparsers`` on the
                parent parser.
            cmd_cls: Command class to instantiate and register.
            prefix: Dotted path of the parent command, or "" at the top
                level.
        """
        cmd = cmd_cls()
        parser = subparsers.add_parser(
            cmd.name,
            formatter_class=ArgumentDefaultsHelpFormatter,
            **cmd.parser_kwargs(),
        )
        cmd.add_arguments(parser)

        key = "%s.%s" % (prefix, cmd.name) if prefix else cmd.name
        self._commands[key] = cmd

        if cmd._subcommands:
            dest = "%s_command" % cmd.name
            self._dests.append(dest)
            # Remember which dest belongs to *this* command so that
            # resolution follows the selected branch only.
            self._child_dests[key] = dest
            child_sub = parser.add_subparsers(dest=dest)
            for sc_cls in cmd._subcommands:
                self._add(child_sub, sc_cls, prefix=key)

    def _resolve(self, args: Any) -> Optional["CLICommand"]:
        """Follow the selected dest chain to find the deepest command.

        Only the dests of the branch that was actually selected are
        consulted. Dests of sibling branches are absent from the parsed
        namespace and must not stop the walk.

        Returns:
            The command for the deepest selected path, or None when no
            top-level command was given or the path is unknown.
        """
        key = getattr(args, self._ROOT_DEST, None)
        if key is None:
            return None
        while True:
            dest = self._child_dests.get(key)
            if dest is None:
                break  # current command has no subcommands
            child = getattr(args, dest, None)
            if child is None:
                break  # subcommand not given; dispatch to the parent
            key = "%s.%s" % (key, child)
        return self._commands.get(key)

    @staticmethod
    def _setup_logging(verbosity: int) -> None:
        """Configure root logging: 0 = WARNING, 1 = INFO, 2+ = DEBUG."""
        if verbosity >= 2:
            level = logging.DEBUG
        elif verbosity >= 1:
            level = logging.INFO
        else:
            level = logging.WARNING
        logging.basicConfig(
            level=level,
            format="%(asctime)s [%(levelname)s] %(message)s",
            handlers=[logging.StreamHandler()],
        )

    def run(self, argv: Optional[List[str]] = None) -> None:
        """Parse args and dispatch to the leaf command.

        Args:
            argv: Argument list to parse; ``None`` (the default) lets
                argparse read ``sys.argv[1:]``.

        Raises:
            SystemExit: Always. The code is 1 (after printing help) when
                no command could be resolved, otherwise whatever the
                command's ``execute`` returned.
        """
        args = self.parser.parse_args(argv)
        self._setup_logging(getattr(args, "verbose", 0))
        cmd = self._resolve(args)
        if cmd is None:
            self.parser.print_help()
            raise SystemExit(1)
        raise SystemExit(cmd.execute(args))

View file

@ -0,0 +1,109 @@
#!/usr/bin/env python3
"""Generic HTTP client.
Thin urllib wrapper with retry-on-rate-limit. No domain knowledge:
GitHub, Bitbucket, etc. are handled by higher-level modules.
"""
import json
import time
from typing import Any, Dict, Optional
import urllib.request
import urllib.parse
from warnings import warn
class HttpResponse:
    """Plain value object describing the outcome of one HTTP exchange.

    Attributes:
        status_code: The ``status`` passed to the constructor.
        headers: Response headers as given.
        data: Raw response body.
        reason: Reason phrase as given.
        text: ``data`` decoded as UTF-8, with undecodable bytes replaced.
    """

    def __init__(self, status: int, headers: dict, data: bytes, reason: str):
        self.status_code, self.headers = status, headers
        self.data, self.reason = data, reason
        # Lenient decode: never raises on malformed bytes.
        self.text = data.decode("utf-8", errors="replace")

    def json(self):
        """Parse the body as JSON using a strict UTF-8 decode."""
        decoded = self.data.decode("utf-8")
        return json.loads(decoded)
def _request(
    url: str,
    method: str = "GET",
    params: Optional[Dict[str, Any]] = None,
    headers: Optional[Dict[str, str]] = None,
    data: Optional[bytes] = None,
) -> "HttpResponse":
    """Perform one HTTP request, retrying while the server answers 429.

    Args:
        url: Target URL.
        method: HTTP method name.
        params: Query parameters; URL-encoded and appended to ``url``
            (with ``&`` when ``url`` already carries a query string).
        headers: Request headers.
        data: Raw request body.

    Returns:
        An ``HttpResponse`` for any non-429 answer, including HTTP error
        statuses. If every attempt was rate-limited, a synthetic 503
        "Service unavailable" response with an empty body.

    Raises:
        Exception: On a network-level failure (``URLError``); the
            original error is chained as ``__cause__``.
    """
    # TODO: do proper exponential backoff
    backoff = [1, 2, 4]

    if params:
        separator = "&" if "?" in url else "?"
        url = f"{url}{separator}{urllib.parse.urlencode(params)}"

    req = urllib.request.Request(
        url,
        headers=headers or {},
        method=method,
        data=data,
    )

    last_attempt = len(backoff) - 1
    for attempt, delay in enumerate(backoff):
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                status = resp.getcode()
                payload = resp.read()
                resp_headers = dict(resp.getheaders())
                reason = resp.reason
        # ``urllib.error`` is loaded as a side effect of importing
        # ``urllib.request``. HTTPError must be caught before URLError
        # because it is a subclass of it.
        except urllib.error.HTTPError as e:
            status = e.code
            payload = e.read()
            resp_headers = dict(e.headers.items())
            reason = e.reason
        except urllib.error.URLError as e:
            raise Exception(f"Network error on {url}: {e}") from e

        if status != 429:
            return HttpResponse(status, resp_headers, payload, reason)

        if attempt == last_attempt:
            # No further attempt follows, so sleeping would only delay
            # the caller.
            warn(f"Rate-limited on {url} (HTTP {status})."
                 f" Retries exhausted.")
            break
        warn(f"Rate-limited on {url} (HTTP {status})."
             f" Backing off {delay}s...")
        time.sleep(delay)

    # All attempts were rate-limited: report a generic unavailable response.
    return HttpResponse(503, {}, b"", "Service unavailable")
def get(
    url: str,
    params: Optional[Dict[str, Any]] = None,
    headers: Optional[Dict[str, str]] = None,
) -> HttpResponse:
    """Issue a GET request to *url* and return the response.

    *params* are forwarded as query parameters and *headers* as request
    headers; both are handled by ``_request``.
    """
    response = _request(url, "GET", params, headers)
    return response
def post(
    url: str,
    data: Optional[bytes] = None,
    headers: Optional[Dict[str, str]] = None,
) -> HttpResponse:
    """Issue a POST request to *url* with an optional raw body."""
    response = _request(url, "POST", None, headers, data)
    return response
def put(
    url: str,
    data: Optional[bytes] = None,
    headers: Optional[Dict[str, str]] = None,
) -> HttpResponse:
    """Issue a PUT request to *url* with an optional raw body."""
    response = _request(url, "PUT", None, headers, data)
    return response

View file

@ -0,0 +1,78 @@
#!/usr/bin/env python3
"""Bitbucket Cloud REST API v2.0 wrapper.
Thin layer over http.py for Bitbucket-specific operations:
- Bearer token authentication
- Repository existence checks
- Repository creation within a workspace/project
"""
import json
from typing import Any, Dict, Optional
from byteb4rb1e.utils.http import client as http_client
# Base URL that every Bitbucket endpoint in this module is built on.
BITBUCKET_API = "https://api.bitbucket.org/2.0"
def http_headers(token: str) -> Dict[str, str]:
    """Build the request headers for a Bitbucket API call.

    The token is sent as a Bearer credential; both the accepted and the
    sent content type are JSON.
    """
    json_mime = "application/json"
    headers = {"Authorization": f"Bearer {token}"}
    headers["Accept"] = json_mime
    headers["Content-Type"] = json_mime
    return headers
def repository_exists(
    workspace: str,
    repo_slug: str,
    token: str,
) -> bool:
    """Report whether a GET on the repository endpoint answers HTTP 200.

    Any other status (including authentication or server errors) is
    reported as ``False``.
    """
    endpoint = f"{BITBUCKET_API}/repositories/{workspace}/{repo_slug}"
    response = http_client.get(endpoint, headers=http_headers(token))
    found = response.status_code == 200
    return found
def create_repository(
    workspace: str,
    repo_slug: str,
    token: str,
    project: Optional[str] = None,
    description: str = "",
    is_private: bool = True,
) -> http_client.HttpResponse:
    """Create a new repository in the workspace.

    Sends a PUT to the repository endpoint with a JSON body describing
    a git repository that disallows forks.

    When *project* is given the repository is assigned to that
    Bitbucket project (by key). This is required for workspaces
    that scope access keys at the project level.

    Returns the API response. Caller should check status_code == 200
    for success.
    """
    endpoint = f"{BITBUCKET_API}/repositories/{workspace}/{repo_slug}"
    payload: Dict[str, Any] = dict(
        scm="git",
        is_private=is_private,
        description=description,
        fork_policy="no_forks",
    )
    if project:
        # Bitbucket identifies the target project by its key.
        payload["project"] = {"key": project}
    encoded = json.dumps(payload).encode("utf-8")
    return http_client.put(endpoint, data=encoded, headers=http_headers(token))
def clone_url(
    workspace: str,
    repo_slug: str,
) -> str:
    """Build the SCP-style SSH clone URL of a Bitbucket repository."""
    remote_path = f"{workspace}/{repo_slug}.git"
    return f"git@bitbucket.org:{remote_path}"

View file

@ -0,0 +1,65 @@
#!/usr/bin/env python3
import hashlib
from pathlib import Path
from typing import Any, Dict, List, Optional
from byteb4rb1e.utils.http import client as http_client
# Base URL that every GitHub endpoint in this module is built on.
GITHUB_API = "https://api.github.com"
def http_headers(token: Optional[str]) -> Dict[str, str]:
    """Build the request headers for a GitHub API call.

    An ``Authorization: Bearer`` header is added only when *token* is
    truthy, so ``None`` or an empty token yields unauthenticated headers.
    """
    base = {
        "Accept": "application/vnd.github+json",
        "User-Agent": "sphinx-h5p-worker1",
    }
    if not token:
        return base
    # The token is only placed in the header; it is never logged here.
    return {**base, "Authorization": f"Bearer {token}"}
def blob_sha(path: Path) -> str:
    """Return the SHA-1 hex digest of ``blob <size>\\0`` plus the file bytes.

    The whole file is read into memory and hashed together with that
    header, which is the value the original docstring describes as
    matching the GitHub API ``sha`` field.
    """
    payload = path.read_bytes()
    digest = hashlib.sha1()
    digest.update(f"blob {len(payload)}\0".encode("utf-8"))
    digest.update(payload)
    return digest.hexdigest()
def list_org_repos(org: str, token: Optional[str]) -> List[Dict[str, Any]]:
    """Collect the public repositories of *org*, following pagination.

    Pages of 100 entries are requested until an empty page comes back.

    Raises:
        RuntimeError: If any page request does not answer HTTP 200.
    """
    endpoint = f"{GITHUB_API}/orgs/{org}/repos"
    per_page = 100
    collected: List[Dict[str, Any]] = []
    page = 1
    while True:
        resp = http_client.get(
            endpoint,
            params={"page": page, "per_page": per_page, "type": "public"},
            headers=http_headers(token),
        )
        if resp.status_code != 200:
            raise RuntimeError(f"Failed to list repos for org {org}: {resp.status_code} {resp.text}")
        batch = resp.json()
        if not batch:
            # An empty page marks the end of the listing.
            return collected
        collected += batch
        page += 1
def fetch_file(
    org: str,
    repo: str,
    path: str,
    token: str
) -> http_client.HttpResponse:
    """GET ``{GITHUB_API}/repos/{org}/{repo}/{path}`` and return the response.

    *path* is appended verbatim and the response is returned unchecked,
    so the caller must inspect ``status_code``.

    NOTE(review): GitHub's file-contents endpoint is
    ``/repos/{owner}/{repo}/contents/{path}``; confirm that callers
    include the ``contents/`` prefix in *path*.
    """
    endpoint = f"{GITHUB_API}/repos/{org}/{repo}/{path}"
    request_headers = http_headers(token)
    return http_client.get(endpoint, headers=request_headers)

View file

View file

@ -0,0 +1,308 @@
#!/usr/bin/env python3
"""Git subprocess wrapper for repository operations.
Provides primitives for mirror cloning, syncing, remote management,
file extraction from bare repos, and submodule management.
No pygit2 or gitpython, uses subprocess only.
"""
import logging
import subprocess
from pathlib import Path
from typing import List, Optional
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class GitError(Exception):
    """A git subprocess returned a non-zero exit code.

    Attributes:
        args_list: The argument list passed to the constructor.
        returncode: The process exit code.
        stderr: The captured standard error text.
    """

    def __init__(self, args: List[str], returncode: int, stderr: str):
        command_line = " ".join(args)
        super().__init__(f"git exited {returncode}: {command_line}\n{stderr}")
        # Stored under ``args_list`` because ``Exception.args`` is
        # already taken by the message tuple.
        self.args_list = args
        self.returncode = returncode
        self.stderr = stderr
def parse_base_url(base_url: str) -> str:
    """Extract the workspace from an SCP-style Bitbucket URL.

    Accepts either a repository URL (``git@bitbucket.org:ws/repo.git``)
    or a bare base URL (``git@bitbucket.org:ws``). For a repository URL
    everything before the last path component is returned.

    The host part must be exactly ``bitbucket.org``: bootstrapping
    requires the Bitbucket API, so other hosts are rejected.

    >>> parse_base_url("git@bitbucket.org:byteb4rb1e/utils.git")
    'byteb4rb1e'
    >>> parse_base_url("git@bitbucket.org:byteb4rb1e")
    'byteb4rb1e'

    Raises:
        ValueError: If the URL is not SCP-style, targets another host,
            or carries no path after the colon.
    """
    # SCP-style: git@bitbucket.org:workspace[/repo.git]
    if ":" not in base_url or "//" in base_url:
        raise ValueError(
            f"Expected SCP-style URL (git@bitbucket.org:workspace), "
            f"got: {base_url}"
        )

    host_part, remote_path = base_url.split(":", 1)
    # host_part is e.g. "git@bitbucket.org"
    host = host_part.split("@", 1)[-1]
    if host != "bitbucket.org":
        raise ValueError(
            f"Mirror base URL must target bitbucket.org, "
            f"got host: {host}"
        )

    remote_path = remote_path.strip("/")
    if not remote_path:
        raise ValueError(f"No workspace found in URL: {base_url}")

    # Drop the last component (the repository) when there is one;
    # a single component is already the workspace.
    parent, slash, _repo = remote_path.rpartition("/")
    return parent if slash else remote_path
def parse_repo_name(base_url: str) -> str:
    """Extract the repository name from an SCP-style Bitbucket URL.

    The name is the last path component with a trailing ``.git``
    removed. Dots inside the name are preserved.

    The host part must be exactly ``bitbucket.org``: bootstrapping
    requires the Bitbucket API, so other hosts are rejected.

    >>> parse_repo_name("git@bitbucket.org:byteb4rb1e/utils.git")
    'utils'
    >>> parse_repo_name("git@bitbucket.org:byteb4rb1e/my.repo.git")
    'my.repo'

    Raises:
        ValueError: If the URL is not SCP-style, targets another host,
            or carries no repository name.
    """
    # SCP-style: git@bitbucket.org:workspace/repo.git
    if ":" not in base_url or "//" in base_url:
        raise ValueError(
            f"Expected SCP-style URL (git@bitbucket.org:workspace), "
            f"got: {base_url}"
        )

    host_part, remote_path = base_url.split(":", 1)
    # host_part is e.g. "git@bitbucket.org"
    host = host_part.split("@", 1)[-1]
    if host != "bitbucket.org":
        raise ValueError(
            f"Mirror base URL must target bitbucket.org, "
            f"got host: {host}"
        )

    name = remote_path.rstrip("/").rpartition("/")[2]
    # Strip only the ".git" suffix; splitting on the first dot would
    # truncate names such as "my.repo".
    if name.endswith(".git"):
        name = name[:-len(".git")]
    if not name:
        raise ValueError(f"No repository name found in URL: {base_url}")
    return name
def _run(
    args: List[str],
    cwd: Optional[Path] = None,
    capture_stdout: bool = False,
) -> subprocess.CompletedProcess:  # type: ignore[type-arg]
    """Execute ``git <args>`` and return the completed process.

    Output is always captured as text. *capture_stdout* is accepted but
    not consulted by this implementation.

    Raises:
        GitError: If git exits with a non-zero status; carries the full
            command, the exit code and the stripped stderr.
    """
    argv = ["git"] + args
    logger.debug("$ %s", " ".join(argv))
    completed = subprocess.run(argv, cwd=cwd, capture_output=True, text=True)
    if completed.returncode == 0:
        return completed
    raise GitError(argv, completed.returncode, completed.stderr.strip())
def mirror_clone(source_url: str, dest: Path) -> None:
    """Clone a repository as a bare mirror.

    Equivalent to ``git clone --mirror <source_url> <dest>``.
    The destination directory must not already exist.

    Raises:
        GitError: If the clone fails.
    """
    _run(["clone", "--mirror", source_url, str(dest)])
    # Separate source and destination; "%s%s" ran them together.
    logger.info("Cloned mirror %s -> %s", source_url, dest)
def add_remote(repo: Path, name: str, url: str) -> None:
    """Add a named remote to a bare repository.

    Raises:
        GitError: If ``git remote add`` fails.
    """
    _run(["remote", "add", name, url], cwd=repo)
    # Separate name and URL; "%s%s" ran them together.
    logger.debug("Added remote %s -> %s in %s", name, url, repo)
def has_remote(repo: Path, name: str) -> bool:
    """Tell whether *name* is one of the lines printed by ``git remote``."""
    remotes = _run(["remote"], cwd=repo).stdout.splitlines()
    return name in remotes
def mirror_update(repo: Path) -> None:
    """Run ``git remote update`` inside *repo*.

    Used to refresh all remotes of a bare mirror repository.
    """
    command = ["remote", "update"]
    _run(command, cwd=repo)
    logger.debug("Updated remotes in %s", repo)
def fetch(repo: Path, remote: str = "origin") -> None:
    """Run ``git fetch <remote>`` inside *repo*."""
    command = ["fetch", remote]
    _run(command, cwd=repo)
    logger.debug("fetched %s in %s", remote, repo)
def show_ref(repo: Path) -> str:
    """Return the raw output of ``git show-ref`` (all refs + SHAs).

    Any ``GitError`` is mapped to an empty string, which covers a
    repository without refs but also every other git failure.
    """
    try:
        listing = _run(["show-ref"], cwd=repo).stdout
    except GitError:
        listing = ""
    return listing
def mirror_push(repo: Path, remote: str) -> None:
    """Run ``git push --mirror <remote>`` inside *repo*."""
    command = ["push", "--mirror", remote]
    _run(command, cwd=repo)
    logger.info("Pushed mirror to %s from %s", remote, repo)
def read_file(
    repo: Path,
    filepath: str,
    ref: str = "HEAD",
) -> Optional[str]:
    """Read a file's contents at *ref* via ``git show <ref>:<filepath>``.

    No checkout is needed, so this works on bare repositories.

    Returns:
        The file content as text, or ``None`` when git reports an error
        (for instance because the file does not exist at that ref).
    """
    object_spec = f"{ref}:{filepath}"
    try:
        shown = _run(["show", object_spec], cwd=repo, capture_stdout=True)
        return shown.stdout
    except GitError:
        return None
# -------------------------------------------------------------------
# Ref / tag primitives
# -------------------------------------------------------------------
def list_tags(repo: Path) -> List[str]:
    """Return the non-empty lines printed by ``git tag -l``."""
    lines = _run(["tag", "-l"], cwd=repo).stdout.splitlines()
    return list(filter(None, lines))
def resolve_ref(repo: Path, ref: str) -> str:
    """Return the stripped output of ``git rev-parse <ref>``.

    Raises GitError if the ref cannot be resolved.
    """
    parsed = _run(["rev-parse", ref], cwd=repo, capture_stdout=True)
    sha = parsed.stdout.strip()
    return sha
def head_ref(repo: Path) -> str:
    """Resolve ``HEAD`` of *repo* through ``resolve_ref``."""
    head_sha = resolve_ref(repo, ref="HEAD")
    return head_sha
# -------------------------------------------------------------------
# Pull-through bare clone cache
# -------------------------------------------------------------------
def bare_path_for_url(url: str, cache_dir: Path) -> Path:
    """Derive a cache path from a clone URL.

    Strips scheme/host, keeps the path component, appends ``.git``.
    Leading and trailing slashes of the path component are removed so
    the result always stays below *cache_dir*.

    Examples::

        https://github.com/h5p/h5p-multi-choice
            -> cache_dir / h5p / h5p-multi-choice.git
        git@github.com:h5p/h5p-multi-choice.git
            -> cache_dir / h5p / h5p-multi-choice.git
    """
    # Handle SCP-style URLs (git@host:path)
    if ":" in url and "//" not in url:
        path_part = url.split(":", 1)[1]
    else:
        # Strip scheme + host
        from urllib.parse import urlparse
        path_part = urlparse(url).path

    # A leading "/" would make the joined path absolute and discard
    # cache_dir; a trailing "/" would produce a ".../.git" entry.
    path_part = path_part.strip("/")

    # Strip trailing .git if present, then re-add it
    if path_part.endswith(".git"):
        path_part = path_part[:-4]
    return cache_dir / (path_part + ".git")
def ensure_bare_clone(url: str, cache_dir: Path) -> Path:
    """Make sure a bare mirror of *url* exists below *cache_dir*.

    A missing cache entry is created with ``mirror_clone`` (parent
    directories are created first); an existing one is refreshed with
    ``mirror_update``.

    Returns the path to the bare repo.
    """
    target = bare_path_for_url(url, cache_dir)
    if not target.exists():
        target.parent.mkdir(parents=True, exist_ok=True)
        mirror_clone(url, target)
        logger.info("Cached new bare clone %s", target)
        return target

    mirror_update(target)
    logger.debug("Updated existing cache %s", target)
    return target
# -------------------------------------------------------------------
# Submodule operations
# -------------------------------------------------------------------
def has_submodule(repo: Path, path: str) -> bool:
    """Tell whether ``.gitmodules`` registers a submodule at *path*.

    The ``submodule.*.path`` entries are listed through ``git config
    --file .gitmodules`` and their values compared with *path*.

    Returns False if ``.gitmodules`` is not a file or if git fails to
    read it.
    """
    if not (repo / ".gitmodules").is_file():
        return False

    try:
        listing = _run(
            ["config", "--file", ".gitmodules",
             "--get-regexp", r"submodule\..*\.path"],
            cwd=repo,
        ).stdout
    except GitError:
        return False

    # Each line is "<key> <value>"; only the value is compared.
    entries = (line.split(None, 1) for line in listing.splitlines())
    return any(len(entry) == 2 and entry[1] == path for entry in entries)
def submodule_add(repo: Path, url: str, path: str) -> None:
    """Add a git submodule at *path* pointing to *url*.

    Equivalent to ``git submodule add <url> <path>`` inside *repo*.

    Raises:
        GitError: If git fails to add the submodule.
    """
    _run(["submodule", "add", url, path], cwd=repo)
    # Separate URL and path; "%s%s" ran them together.
    logger.info("Added submodule %s -> %s", url, path)
def submodule_update(repo: Path, path: str) -> None:
    """Fetch a submodule's origin and check out its remote default ref.

    Inside ``repo / path`` this runs ``git fetch origin``, reads the
    short name of ``refs/remotes/origin/HEAD`` and checks that name out.
    """
    checkout_dir = repo / path
    _run(["fetch", "origin"], cwd=checkout_dir)

    # Determine default branch from remote HEAD
    remote_head = _run(
        ["symbolic-ref", "refs/remotes/origin/HEAD", "--short"],
        cwd=checkout_dir,
    )
    default_branch = remote_head.stdout.strip()

    _run(["checkout", default_branch], cwd=checkout_dir)
    logger.info("Updated submodule %s to %s", path, default_branch)