From 5bf4a7eee4386822872423b09539d1517fd4967c Mon Sep 17 00:00:00 2001
From: Tiara Rodney
Date: Wed, 4 Mar 2026 13:11:07 +0100
Subject: [PATCH] migrate sphinxcontrib.h5p.utils

---
 src/byteb4rb1e/utils/argparse/__init__.py   |   6 +
 src/byteb4rb1e/utils/argparse/command.py    |  56 ++++
 src/byteb4rb1e/utils/argparse/dispatcher.py | 150 +++++++++
 src/byteb4rb1e/utils/http/client.py         | 124 ++++++++
 src/byteb4rb1e/utils/saas/bitbucket.py      |  83 +++++
 src/byteb4rb1e/utils/saas/github.py         |  80 +++++
 src/byteb4rb1e/utils/vcs/__init__.py        |   0
 src/byteb4rb1e/utils/vcs/git.py             | 333 ++++++++++++++++++++
 8 files changed, 832 insertions(+)
 create mode 100644 src/byteb4rb1e/utils/argparse/__init__.py
 create mode 100644 src/byteb4rb1e/utils/argparse/command.py
 create mode 100644 src/byteb4rb1e/utils/argparse/dispatcher.py
 create mode 100644 src/byteb4rb1e/utils/http/client.py
 create mode 100644 src/byteb4rb1e/utils/saas/bitbucket.py
 create mode 100644 src/byteb4rb1e/utils/saas/github.py
 create mode 100644 src/byteb4rb1e/utils/vcs/__init__.py
 create mode 100644 src/byteb4rb1e/utils/vcs/git.py

diff --git a/src/byteb4rb1e/utils/argparse/__init__.py b/src/byteb4rb1e/utils/argparse/__init__.py
new file mode 100644
index 0000000..84ae3ed
--- /dev/null
+++ b/src/byteb4rb1e/utils/argparse/__init__.py
@@ -0,0 +1,6 @@
+"""Utilities for building composable CLIs from command dataclasses."""
+
+from byteb4rb1e.utils.argparse.command import CLICommand
+from byteb4rb1e.utils.argparse.dispatcher import CLI
+
+__all__ = ["CLI", "CLICommand"]
diff --git a/src/byteb4rb1e/utils/argparse/command.py b/src/byteb4rb1e/utils/argparse/command.py
new file mode 100644
index 0000000..199e4f1
--- /dev/null
+++ b/src/byteb4rb1e/utils/argparse/command.py
@@ -0,0 +1,56 @@
+"""Base command dataclass for composable CLI trees."""
+
+from __future__ import annotations
+
+from argparse import ArgumentParser
+from dataclasses import dataclass, fields
+from typing import Any, ClassVar, Dict, List, Optional, Type
+
+
+@dataclass
+class CLICommand:
+    """Base class for CLI commands.
+
+    Subclasses define their identity (name, help, description) as
+    dataclass fields. These are passed as kwargs to
+    ``subparsers.add_parser()``.
+
+    Override ``add_arguments`` to register flags and positionals.
+    Override ``execute`` to implement the command's logic.
+
+    Nest subcommands by setting ``_subcommands`` as a class variable.
+    """
+
+    name: str = ""
+    help: str = ""
+    description: str = ""
+
+    # Class-level default shared by every command: subclasses should
+    # assign their own list instead of mutating this one in place.
+    _subcommands: ClassVar[List[Type[CLICommand]]] = []
+
+    def add_arguments(self, parser: ArgumentParser) -> None:
+        """Add arguments to the parser. Override in subclasses."""
+
+    def execute(self, args: Any) -> int:
+        """Run the command. Override in subclasses.
+
+        Returns an exit code (0 = success).
+        """
+        return 0
+
+    def parser_kwargs(self) -> Dict[str, Any]:
+        """Return the dataclass fields as kwargs for add_parser.
+
+        Excludes ``name`` (used as the positional parser name) and
+        any empty-string fields so argparse defaults apply.
+        """
+        skip = {"name"}
+        kwargs: Dict[str, Any] = {}
+        for f in fields(self):
+            if f.name in skip or f.name.startswith("_"):
+                continue
+            val = getattr(self, f.name)
+            if val != "":
+                kwargs[f.name] = val
+        return kwargs
diff --git a/src/byteb4rb1e/utils/argparse/dispatcher.py b/src/byteb4rb1e/utils/argparse/dispatcher.py
new file mode 100644
index 0000000..0c8768b
--- /dev/null
+++ b/src/byteb4rb1e/utils/argparse/dispatcher.py
@@ -0,0 +1,150 @@
+"""CLI dispatcher — builds parser trees from command dataclasses."""
+
+from __future__ import annotations
+
+import logging
+from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
+from typing import Any, Dict, List, Optional, Type
+
+from byteb4rb1e.utils.argparse.command import CLICommand
+
+
+class CLI:
+    """Composable CLI built from a tree of CLICommand dataclasses.
+
+    Recursively bootstraps an argparse parser hierarchy and tracks
+    dest names so ``run()`` can dispatch to the correct leaf command
+    without dest chaining in the caller.
+
+    Usage::
+
+        cli = CLI(prog="repository", description="...")
+        cli.bootstrap([MirrorCommand, IndexCommand])
+        cli.run()
+    """
+
+    # ``dest`` of the top-level subparsers action.
+    _ROOT_DEST = "command"
+
+    def __init__(
+        self,
+        prog: Optional[str] = None,
+        description: str = "",
+    ) -> None:
+        kwargs: Dict[str, Any] = {}
+        if prog:
+            kwargs["prog"] = prog
+        if description:
+            kwargs["description"] = description
+        kwargs.setdefault(
+            "formatter_class", ArgumentDefaultsHelpFormatter,
+        )
+        self.parser = ArgumentParser(**kwargs)
+        self._dests: List[str] = []
+        self._commands: Dict[str, CLICommand] = {}
+
+    def add_arguments(self, parser: ArgumentParser) -> None:
+        """Add global arguments to the root parser."""
+        parser.add_argument(
+            "-v", "--verbose", action="count", default=0,
+            help="Increase verbosity (-v for INFO, -vv for DEBUG)",
+        )
+
+    def bootstrap(
+        self,
+        commands: List[Type[CLICommand]],
+    ) -> None:
+        """Build the parser tree from a list of top-level commands."""
+        self.add_arguments(self.parser)
+        dest = self._ROOT_DEST
+        self._dests.append(dest)
+        sub = self.parser.add_subparsers(dest=dest)
+        for cmd_cls in commands:
+            self._add(sub, cmd_cls, prefix="")
+
+    def _add(
+        self,
+        subparsers: Any,
+        cmd_cls: Type[CLICommand],
+        prefix: str,
+    ) -> None:
+        """Recursively add a command and its subcommands.
+
+        Each command is registered under its dotted path
+        (``parent.child``). A command with subcommands gets its own
+        subparsers action whose dest is ``NAME_command``.
+        """
+        cmd = cmd_cls()
+        parser = subparsers.add_parser(
+            cmd.name,
+            formatter_class=ArgumentDefaultsHelpFormatter,
+            **cmd.parser_kwargs(),
+        )
+        cmd.add_arguments(parser)
+
+        key = "%s.%s" % (prefix, cmd.name) if prefix else cmd.name
+        self._commands[key] = cmd
+
+        if cmd._subcommands:
+            dest = "%s_command" % cmd.name
+            self._dests.append(dest)
+            child_sub = parser.add_subparsers(dest=dest)
+            for sc_cls in cmd._subcommands:
+                self._add(child_sub, sc_cls, prefix=key)
+
+    def _resolve(self, args: Any) -> Optional[CLICommand]:
+        """Walk the dest chain to find the deepest selected command.
+
+        Only the branch that was actually parsed is followed: the
+        value stored under one dest names the command whose
+        ``NAME_command`` dest is inspected next. Scanning every
+        registered dest in registration order would stop at the
+        first unselected sibling and dispatch to a parent command.
+
+        Returns None when no command was selected.
+        """
+        parts: List[str] = []
+        # Each dest is consumed once, so a subcommand named like its
+        # parent cannot make the walk revisit the same dest forever.
+        pending = set(self._dests)
+        dest = self._ROOT_DEST
+        while dest in pending:
+            pending.discard(dest)
+            val = getattr(args, dest, None)
+            if val is None:
+                break
+            parts.append(val)
+            dest = "%s_command" % val
+        if not parts:
+            return None
+        return self._commands.get(".".join(parts))
+
+    @staticmethod
+    def _setup_logging(verbosity: int) -> None:
+        """Configure root logging from the ``-v`` count."""
+        if verbosity >= 2:
+            level = logging.DEBUG
+        elif verbosity >= 1:
+            level = logging.INFO
+        else:
+            level = logging.WARNING
+        logging.basicConfig(
+            level=level,
+            format="%(asctime)s [%(levelname)s] %(message)s",
+            handlers=[logging.StreamHandler()],
+        )
+
+    def run(self, argv: Optional[List[str]] = None) -> None:
+        """Parse args and dispatch to the leaf command.
+
+        *argv* defaults to ``sys.argv[1:]`` when None. Always exits
+        by raising SystemExit: with the command's exit code, or with
+        1 after printing help when no command was selected.
+        """
+        args = self.parser.parse_args(argv)
+        self._setup_logging(getattr(args, "verbose", 0))
+        cmd = self._resolve(args)
+        if cmd is None:
+            self.parser.print_help()
+            raise SystemExit(1)
+        raise SystemExit(cmd.execute(args))
diff --git a/src/byteb4rb1e/utils/http/client.py b/src/byteb4rb1e/utils/http/client.py
new file mode 100644
index 0000000..0962445
--- /dev/null
+++ b/src/byteb4rb1e/utils/http/client.py
@@ -0,0 +1,124 @@
+#!/usr/bin/env python3
+"""Generic HTTP client.
+
+Thin urllib wrapper with retry-on-rate-limit. No domain knowledge —
+GitHub, Bitbucket, etc. are handled by higher-level modules.
+"""
+
+import json
+import time
+from typing import Any, Dict, Optional
+import urllib.error
+import urllib.parse
+import urllib.request
+from warnings import warn
+
+
+class HttpResponse:
+    """Status, headers and raw body of a completed HTTP exchange."""
+
+    def __init__(self, status: int, headers: dict, data: bytes, reason: str):
+        self.status_code = status
+        self.headers = headers
+        self.data = data
+        self.reason = reason
+        # Lenient decode so ``text`` never raises on malformed bodies.
+        self.text = data.decode("utf-8", errors="replace")
+
+    def json(self):
+        """Parse the body as JSON (strict UTF-8 decoding)."""
+        return json.loads(self.data.decode("utf-8"))
+
+
+def _request(
+    url: str,
+    method: str = "GET",
+    params: Optional[Dict[str, Any]] = None,
+    headers: Optional[Dict[str, str]] = None,
+    data: Optional[bytes] = None,
+) -> HttpResponse:
+    """Send one HTTP request, retrying while the server answers 429.
+
+    HTTP error statuses are returned as an HttpResponse rather than
+    raised. If every attempt is rate-limited, the last 429 response
+    is returned so callers see the real status, headers and body.
+
+    Raises:
+        ConnectionError: the request failed below the HTTP level
+            (urllib reported a URLError).
+    """
+    # TODO: do proper exponential backoff
+    backoff = [1, 2, 4]
+
+    if params:
+        query = urllib.parse.urlencode(params)
+        # Respect a query string that is already part of *url*.
+        separator = "&" if "?" in url else "?"
+        url = f"{url}{separator}{query}"
+
+    req = urllib.request.Request(
+        url,
+        headers=headers or {},
+        method=method,
+        data=data,
+    )
+
+    response: Optional[HttpResponse] = None
+    for attempt, delay in enumerate(backoff, start=1):
+        try:
+            with urllib.request.urlopen(req, timeout=30) as resp:
+                response = HttpResponse(
+                    resp.getcode(),
+                    dict(resp.getheaders()),
+                    resp.read(),
+                    resp.reason,
+                )
+        except urllib.error.HTTPError as e:
+            # urlopen raises on 4xx/5xx; the error object still
+            # carries the status, headers and body of the answer.
+            response = HttpResponse(
+                e.code, dict(e.headers.items()), e.read(), e.reason,
+            )
+        except urllib.error.URLError as e:
+            # Exception() does not interpolate logging-style
+            # arguments, so the message is formatted explicitly.
+            raise ConnectionError(
+                f"Network error on {url}: {e}"
+            ) from e
+
+        if response.status_code != 429:
+            return response
+        if attempt < len(backoff):
+            warn(f"Rate-limited on {url} (HTTP 429)."
+                 f" Backing off {delay}s...")
+            time.sleep(delay)
+
+    # Every attempt was rate-limited: hand back the last real answer.
+    return response
+
+
+def get(
+    url: str,
+    params: Optional[Dict[str, Any]] = None,
+    headers: Optional[Dict[str, str]] = None,
+) -> HttpResponse:
+    """Send a GET request; *params* become the query string."""
+    return _request(url, method="GET", params=params, headers=headers)
+
+
+def post(
+    url: str,
+    data: Optional[bytes] = None,
+    headers: Optional[Dict[str, str]] = None,
+) -> HttpResponse:
+    """Send a POST request with *data* as the raw request body."""
+    return _request(url, method="POST", headers=headers, data=data)
+
+
+def put(
+    url: str,
+    data: Optional[bytes] = None,
+    headers: Optional[Dict[str, str]] = None,
+) -> HttpResponse:
+    """Send a PUT request with *data* as the raw request body."""
+    return _request(url, method="PUT", headers=headers, data=data)
diff --git a/src/byteb4rb1e/utils/saas/bitbucket.py b/src/byteb4rb1e/utils/saas/bitbucket.py
new file mode 100644
index 0000000..d9b5bad
--- /dev/null
+++ b/src/byteb4rb1e/utils/saas/bitbucket.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python3
+"""Bitbucket Cloud REST API v2.0 wrapper.
+
+Thin layer over the generic HTTP client for Bitbucket-specific
+operations:
+
+- Bearer token authentication
+- Repository existence checks
+- Repository creation within a workspace/project
+"""
+
+import json
+from typing import Any, Dict, Optional
+
+from byteb4rb1e.utils.http import client as http_client
+
+
+BITBUCKET_API = "https://api.bitbucket.org/2.0"
+
+
+def http_headers(token: str) -> Dict[str, str]:
+    """Construct Bitbucket API headers with Bearer token auth."""
+    return {
+        "Authorization": f"Bearer {token}",
+        "Accept": "application/json",
+        "Content-Type": "application/json",
+    }
+
+
+def repository_exists(
+    workspace: str,
+    repo_slug: str,
+    token: str,
+) -> bool:
+    """Check whether a repository exists in the workspace.
+
+    Only an HTTP 200 counts as "exists"; any other status, including
+    authentication failures, yields False.
+    """
+    url = f"{BITBUCKET_API}/repositories/{workspace}/{repo_slug}"
+    resp = http_client.get(url, headers=http_headers(token))
+    return resp.status_code == 200
+
+
+def create_repository(
+    workspace: str,
+    repo_slug: str,
+    token: str,
+    project: Optional[str] = None,
+    description: str = "",
+    is_private: bool = True,
+) -> http_client.HttpResponse:
+    """Create a new repository in the workspace.
+
+    When *project* is given the repository is assigned to that
+    Bitbucket project (by key). This is required for workspaces
+    that scope access keys at the project level.
+
+    Returns the API response. Caller should check status_code == 200
+    for success.
+    """
+    url = f"{BITBUCKET_API}/repositories/{workspace}/{repo_slug}"
+    body: Dict[str, Any] = {
+        "scm": "git",
+        "is_private": is_private,
+        "description": description,
+        "fork_policy": "no_forks",
+    }
+    if project:
+        body["project"] = {"key": project}
+    return http_client.put(
+        url,
+        data=json.dumps(body).encode("utf-8"),
+        headers=http_headers(token),
+    )
+
+
+def clone_url(
+    workspace: str,
+    repo_slug: str,
+) -> str:
+    """Return the SSH clone URL for a Bitbucket repository."""
+    return f"git@bitbucket.org:{workspace}/{repo_slug}.git"
diff --git a/src/byteb4rb1e/utils/saas/github.py b/src/byteb4rb1e/utils/saas/github.py
new file mode 100644
index 0000000..d174827
--- /dev/null
+++ b/src/byteb4rb1e/utils/saas/github.py
@@ -0,0 +1,80 @@
+#!/usr/bin/env python3
+"""GitHub REST API helpers built on the generic HTTP client."""
+
+import hashlib
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+from byteb4rb1e.utils.http import client as http_client
+
+
+GITHUB_API = "https://api.github.com"
+
+
+def http_headers(token: Optional[str]) -> Dict[str, str]:
+    """Build GitHub API headers, authenticated when *token* is set."""
+    headers = {
+        "Accept": "application/vnd.github+json",
+        "User-Agent": "sphinx-h5p-worker1",
+    }
+    if token:
+        # Use standard PAT header; token not logged anywhere.
+        headers["Authorization"] = f"Bearer {token}"
+    return headers
+
+
+def blob_sha(path: Path) -> str:
+    """Calculate Git blob SHA-1 for a file, matching GitHub API 'sha'."""
+    data = path.read_bytes()
+    header = f"blob {len(data)}\0".encode("utf-8")
+    store = header + data
+    return hashlib.sha1(store).hexdigest()
+
+
+def list_org_repos(org: str, token: Optional[str]) -> List[Dict[str, Any]]:
+    """Return every public repository of *org*, following pagination.
+
+    Raises:
+        RuntimeError: a page request did not return HTTP 200.
+    """
+    repos: List[Dict[str, Any]] = []
+    page = 1
+    per_page = 100
+    url = f"{GITHUB_API}/orgs/{org}/repos"
+    while True:
+        resp = http_client.get(
+            url,
+            params={"page": page, "per_page": per_page, "type": "public"},
+            headers=http_headers(token),
+        )
+        if resp.status_code != 200:
+            raise RuntimeError(
+                f"Failed to list repos for org {org}:"
+                f" {resp.status_code} {resp.text}"
+            )
+        batch = resp.json()
+        if not batch:
+            break
+        repos.extend(batch)
+        page += 1
+    return repos
+
+
+def fetch_file(
+    org: str,
+    repo: str,
+    path: str,
+    token: Optional[str],
+) -> http_client.HttpResponse:
+    """GET ``/repos/{org}/{repo}/{path}`` and return the response.
+
+    *path* is appended verbatim after the repository, so it must
+    carry the API resource prefix itself (presumably
+    ``contents/...`` for file lookups; confirm against callers).
+    """
+    url = f"{GITHUB_API}/repos/{org}/{repo}/{path}"
+
+    return http_client.get(
+        url,
+        headers=http_headers(token),
+    )
diff --git a/src/byteb4rb1e/utils/vcs/__init__.py b/src/byteb4rb1e/utils/vcs/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/byteb4rb1e/utils/vcs/git.py b/src/byteb4rb1e/utils/vcs/git.py
new file mode 100644
index 0000000..ab7e87f
--- /dev/null
+++ b/src/byteb4rb1e/utils/vcs/git.py
@@ -0,0 +1,333 @@
+#!/usr/bin/env python3
+"""Git subprocess wrapper for repository operations.
+
+Provides primitives for mirror cloning, syncing, remote management,
+file extraction from bare repos, and submodule management.
+No pygit2 or gitpython, uses subprocess only.
+"""
+import logging
+import subprocess
+from pathlib import Path
+from typing import List, Optional
+from urllib.parse import urlparse
+
+logger = logging.getLogger(__name__)
+
+
+class GitError(Exception):
+    """A git subprocess returned a non-zero exit code."""
+
+    def __init__(self, args: List[str], returncode: int, stderr: str):
+        self.args_list = args
+        self.returncode = returncode
+        self.stderr = stderr
+        super().__init__(
+            f"git exited {returncode}: {' '.join(args)}\n{stderr}"
+        )
+
+
+def _split_scp_url(url: str) -> str:
+    """Validate an SCP-style bitbucket.org URL and return its path.
+
+    Raises ValueError when *url* is not SCP-style, targets another
+    host, or has an empty path.
+    """
+    # SCP-style: git@bitbucket.org:workspace[/repository.git]
+    if ":" not in url or "//" in url:
+        raise ValueError(
+            f"Expected SCP-style URL (git@bitbucket.org:workspace), "
+            f"got: {url}"
+        )
+    host_part, path = url.split(":", 1)
+    # host_part is e.g. "git@bitbucket.org"
+    host = host_part.split("@", 1)[-1]
+    if host != "bitbucket.org":
+        raise ValueError(
+            f"Mirror base URL must target bitbucket.org, "
+            f"got host: {host}"
+        )
+    path = path.strip("/")
+    if not path:
+        raise ValueError(f"Missing workspace in URL: {url}")
+    return path
+
+
+def parse_base_url(base_url: str) -> str:
+    """Extract the workspace from an SCP-style Bitbucket URL.
+
+    Accepts a bare base URL as well as a full repository URL. The
+    host part must be exactly ``bitbucket.org`` — bootstrapping
+    requires the Bitbucket API, so other hosts are rejected.
+
+    >>> parse_base_url("git@bitbucket.org:byteb4rb1e")
+    'byteb4rb1e'
+    >>> parse_base_url("git@bitbucket.org:byteb4rb1e/utils.git")
+    'byteb4rb1e'
+    """
+    path = _split_scp_url(base_url)
+    # Everything before the last "/" is the workspace; a path
+    # without "/" is the workspace itself.
+    workspace = path.rpartition("/")[0]
+    return workspace or path
+
+
+def parse_repo_name(base_url: str) -> str:
+    """Extract the repository name from an SCP-style Bitbucket URL.
+
+    Returns the last path component without a trailing ``.git``;
+    dots elsewhere in the name are preserved. The host part must be
+    exactly ``bitbucket.org``.
+
+    >>> parse_repo_name("git@bitbucket.org:byteb4rb1e/utils.git")
+    'utils'
+    >>> parse_repo_name("git@bitbucket.org:byteb4rb1e/h5p.core.git")
+    'h5p.core'
+    """
+    name = _split_scp_url(base_url).rpartition("/")[2]
+    if name.endswith(".git"):
+        name = name[:-4]
+    return name
+
+
+def _run(
+    args: List[str],
+    cwd: Optional[Path] = None,
+    capture_stdout: bool = False,
+) -> subprocess.CompletedProcess:  # type: ignore[type-arg]
+    """Run a git command, raising GitError on failure.
+
+    stdout and stderr are always captured as text; *capture_stdout*
+    is kept for existing call sites and has no effect.
+    """
+    cmd = ["git"] + args
+    logger.debug("$ %s", " ".join(cmd))
+    result = subprocess.run(
+        cmd,
+        cwd=cwd,
+        capture_output=True,
+        text=True,
+    )
+    if result.returncode != 0:
+        raise GitError(cmd, result.returncode, result.stderr.strip())
+    return result
+
+
+def mirror_clone(source_url: str, dest: Path) -> None:
+    """Clone a repository as a bare mirror.
+
+    Equivalent to ``git clone --mirror SOURCE_URL DEST``.
+    The destination directory must not already exist.
+    """
+    _run(["clone", "--mirror", source_url, str(dest)])
+    logger.info("Cloned mirror %s → %s", source_url, dest)
+
+
+def add_remote(repo: Path, name: str, url: str) -> None:
+    """Add a named remote to a bare repository."""
+    _run(["remote", "add", name, url], cwd=repo)
+    logger.debug("Added remote %s → %s in %s", name, url, repo)
+
+
+def has_remote(repo: Path, name: str) -> bool:
+    """Check whether a named remote exists."""
+    result = _run(["remote"], cwd=repo)
+    return name in result.stdout.splitlines()
+
+
+def mirror_update(repo: Path) -> None:
+    """Fetch all remotes in a bare mirror repository.
+
+    Equivalent to ``git remote update`` inside the bare repo.
+    """
+    _run(["remote", "update"], cwd=repo)
+    logger.debug("Updated remotes in %s", repo)
+
+
+def fetch(repo: Path, remote: str = "origin") -> None:
+    """Fetch from a single remote."""
+    _run(["fetch", remote], cwd=repo)
+    logger.debug("fetched %s in %s", remote, repo)
+
+
+def show_ref(repo: Path) -> str:
+    """Return the raw output of ``git show-ref`` (all refs + SHAs).
+
+    Returns an empty string if the repo has no refs. Other git
+    failures also yield an empty string but are logged as warnings.
+    """
+    try:
+        result = _run(["show-ref"], cwd=repo)
+    except GitError as err:
+        # Exit status 1 just means "no refs"; anything else is a
+        # real failure, so leave a trace instead of hiding it.
+        if err.returncode != 1:
+            logger.warning("git show-ref failed in %s: %s", repo, err)
+        return ""
+    return result.stdout
+
+
+def mirror_push(repo: Path, remote: str) -> None:
+    """Push the full mirror to a remote.
+
+    Equivalent to ``git push --mirror REMOTE``.
+    """
+    _run(["push", "--mirror", remote], cwd=repo)
+    logger.info("Pushed mirror to %s from %s", remote, repo)
+
+
+def read_file(
+    repo: Path,
+    filepath: str,
+    ref: str = "HEAD",
+) -> Optional[str]:
+    """Extract a file's contents from a bare repo without checkout.
+
+    Returns the file content as a string, or None if the file does
+    not exist at the given ref. Any other ``git show`` failure (for
+    example an unknown ref) also yields None.
+    """
+    try:
+        result = _run(
+            ["show", f"{ref}:{filepath}"],
+            cwd=repo,
+            capture_stdout=True,
+        )
+    except GitError:
+        return None
+    return result.stdout
+
+
+# -------------------------------------------------------------------
+# Ref / tag primitives
+# -------------------------------------------------------------------
+
+def list_tags(repo: Path) -> List[str]:
+    """List all tags in a repository."""
+    result = _run(["tag", "-l"], cwd=repo)
+    return [t for t in result.stdout.splitlines() if t]
+
+
+def resolve_ref(repo: Path, ref: str) -> str:
+    """Resolve a ref to a full SHA.
+
+    Raises GitError if the ref cannot be resolved.
+    """
+    result = _run(
+        ["rev-parse", ref], cwd=repo, capture_stdout=True,
+    )
+    return result.stdout.strip()
+
+
+def head_ref(repo: Path) -> str:
+    """Return the full SHA of HEAD."""
+    return resolve_ref(repo, "HEAD")
+
+
+# -------------------------------------------------------------------
+# Pull-through bare clone cache
+# -------------------------------------------------------------------
+
+def bare_path_for_url(url: str, cache_dir: Path) -> Path:
+    """Derive a cache path from a clone URL.
+
+    Strips scheme/host, keeps the path component, appends ``.git``.
+    Leading and trailing slashes are dropped first, so an absolute
+    path cannot replace *cache_dir* in the join.
+
+    Examples::
+
+        https://github.com/h5p/h5p-multi-choice
+            → cache_dir / h5p / h5p-multi-choice.git
+        git@github.com:h5p/h5p-multi-choice.git
+            → cache_dir / h5p / h5p-multi-choice.git
+    """
+    # Handle SCP-style URLs (git@host:path)
+    if ":" in url and "//" not in url:
+        path_part = url.split(":", 1)[1]
+    else:
+        # Strip scheme + host
+        path_part = urlparse(url).path
+
+    # An absolute right-hand operand would replace cache_dir in the
+    # join below, and a trailing "/" would produce ".../.git".
+    path_part = path_part.strip("/")
+
+    # Strip trailing .git if present, then re-add it
+    if path_part.endswith(".git"):
+        path_part = path_part[:-4]
+
+    return cache_dir / (path_part + ".git")
+
+
+def ensure_bare_clone(url: str, cache_dir: Path) -> Path:
+    """Ensure a bare mirror clone exists in *cache_dir*.
+
+    If the bare repo already exists, fetches updates via
+    ``mirror_update``. Otherwise, creates a new mirror clone.
+    Returns the path to the bare repo.
+    """
+    bare_path = bare_path_for_url(url, cache_dir)
+    if bare_path.exists():
+        mirror_update(bare_path)
+        logger.debug("Updated existing cache %s", bare_path)
+    else:
+        bare_path.parent.mkdir(parents=True, exist_ok=True)
+        mirror_clone(url, bare_path)
+        logger.info("Cached new bare clone %s", bare_path)
+    return bare_path
+
+
+# -------------------------------------------------------------------
+# Submodule operations
+# -------------------------------------------------------------------
+
+def has_submodule(repo: Path, path: str) -> bool:
+    """Check whether a submodule is registered at *path*.
+
+    Reads ``.gitmodules`` to determine whether the submodule exists.
+    Returns False if ``.gitmodules`` does not exist.
+    """
+    gitmodules = repo / ".gitmodules"
+    if not gitmodules.is_file():
+        return False
+    try:
+        result = _run(
+            ["config", "--file", ".gitmodules",
+             "--get-regexp", r"submodule\..*\.path"],
+            cwd=repo,
+        )
+    except GitError:
+        return False
+    for line in result.stdout.splitlines():
+        parts = line.split(None, 1)
+        if len(parts) == 2 and parts[1] == path:
+            return True
+    return False
+
+
+def submodule_add(repo: Path, url: str, path: str) -> None:
+    """Add a git submodule at *path* pointing to *url*.
+
+    Equivalent to ``git submodule add URL PATH`` inside *repo*.
+    """
+    _run(["submodule", "add", url, path], cwd=repo)
+    logger.info("Added submodule %s → %s", url, path)
+
+
+def submodule_update(repo: Path, path: str) -> None:
+    """Fetch and update a submodule to the latest remote HEAD.
+
+    Enters the submodule directory, fetches origin, and checks out
+    the latest commit on the remote default branch.
+    """
+    sub_path = repo / path
+    _run(["fetch", "origin"], cwd=sub_path)
+    # Determine default branch from remote HEAD
+    result = _run(
+        ["symbolic-ref", "refs/remotes/origin/HEAD",
+         "--short"],
+        cwd=sub_path,
+    )
+    default_branch = result.stdout.strip()
+    _run(["checkout", default_branch], cwd=sub_path)
+    logger.info("Updated submodule %s to %s", path, default_branch)