diff --git a/src/byteb4rb1e/utils/urllib/request.py b/src/byteb4rb1e/utils/urllib/request.py index 28e8e8e..3f9ee20 100644 --- a/src/byteb4rb1e/utils/urllib/request.py +++ b/src/byteb4rb1e/utils/urllib/request.py @@ -1,4 +1,6 @@ +import email import importlib.resources +import mimetypes from urllib.request import URLError import urllib.request @@ -6,15 +8,13 @@ import urllib.request class PkgHandler(urllib.request.BaseHandler): """ """ - def pkg_open(self, req): + def pkg_open(self, req) -> urllib.request.addinfourl: pkg_files = importlib.resources.files(req.host) - raise Exception(sorted(pkg_files.glob('**/*'))) - try: - fh = list( + fh = next( pkg_files.glob(req.selector.lstrip('//')) - )[0].open('rb') + ).open('rb') except Exception as e: raise URLError(f'{e.__class__.__name__}: {e}') from e @@ -22,15 +22,17 @@ class PkgHandler(urllib.request.BaseHandler): size = fh.tell(); fh.seek(0); - mtype, _ = mimetypes.guess_type(url) + mtype, _ = mimetypes.guess_type(req.selector) headers = email.message_from_string( 'Content-Type: %s\nContent-Length: %d\n' % (mtype or 'text/plain', size) ) - if not mtype or mtype.starts_with('text/'): + if not mtype or mtype.startswith('text/'): fh.close() - fh = importlib.resources.files(req.host).glob(req.selector)[0].open('r') + fh = next( + pkg_files.glob(req.selector.lstrip('//')) + ).open('r') - return urllib.request.addinfourl(fh, header) + return urllib.request.addinfourl(fh, headers, None) diff --git a/tests/unit/byteb4rb1e/utils/urllib/test_request.py b/tests/unit/byteb4rb1e/utils/urllib/test_request.py index 74cf40d..5fdf7c0 100644 --- a/tests/unit/byteb4rb1e/utils/urllib/test_request.py +++ b/tests/unit/byteb4rb1e/utils/urllib/test_request.py @@ -1,13 +1,90 @@ +import os.path +import sys +import urllib.request + import pytest +from byteb4rb1e.utils.testing.pytest.decorators import run_in_subprocess_once +from byteb4rb1e.utils.testing.pytest.fixtures import mock_pkg from byteb4rb1e.utils.urllib.request import PkgHandler class TestPkgHandler: """ """ - def test_default(self): + @run_in_subprocess_once() + def test_text(self, mock_pkg): """ """ - pass + _opener: urllib.request.OpenerDirector = urllib.request.build_opener( + PkgHandler() + ) + dummy_data = 'Hello' + + mock_pkg('foobarpkg', { + 'data.txt': dummy_data + }) + + result = _opener.open('pkg://foobarpkg/data.txt').readline() + + assert isinstance(result, str) + assert result == dummy_data + + + @run_in_subprocess_once() + def test_bytes(self, mock_pkg): + """ + """ + _opener: urllib.request.OpenerDirector = urllib.request.build_opener( + PkgHandler() + ) + + dummy_data = b'foobar123' + + mock_pkg('foobarpkg', { + 'data.bin': dummy_data + }) + + result = _opener.open('pkg://foobarpkg/data.bin').readline() + + assert isinstance(result, bytes) + assert result == dummy_data + + + @run_in_subprocess_once() + def test_subdir(self, mock_pkg): + """ + """ + _opener: urllib.request.OpenerDirector = urllib.request.build_opener( + PkgHandler() + ) + + dummy_data = 'foobar123' + + mock_pkg('foobarpkg', { + 'foo/bar/data.txt': dummy_data + }) + + result = _opener.open('pkg://foobarpkg/foo/bar/data.txt').readline() + + assert result == dummy_data + + + @run_in_subprocess_once() + def test_nested_module(self, mock_pkg): + """ + """ + _opener: urllib.request.OpenerDirector = urllib.request.build_opener( + PkgHandler() + ) + + dummy_data = 'foobar123' + + mock_pkg('foo.bar.pkg', { + 'dummy/data.txt': dummy_data + }) + + result = _opener.open('pkg://foo.bar.pkg/dummy/data.txt').readline() + + assert result == dummy_data