This commit is contained in:
Rodney, Tiara 2025-05-03 19:26:12 +02:00
commit 29c7308410
No known key found for this signature in database
GPG key ID: 5CD8EC1D46106723
17 changed files with 3755 additions and 0 deletions

View file

@ -0,0 +1,233 @@
from io import BytesIO, IOBase
import math
from typing import Optional, Tuple, List
class ChunksIO(IOBase):
"""handler for HTTP/1.1 chunked transfer-encoded (RFC 9112 §7) byte streams
Compact and predictable implementation of a RFC 9112 compliant stream
handler, which exposes a common IOBase interface for treating chunked byte
streams as pure, unencoded byte streams.
.. notice::
The implementation is currently only concerned with read operations,
though the layout is prepared for an easy straightforward implementation
of write operations.
"""
#: maximum allowed size of a chunk
# MiB by default, just guessing 10 MiB is a sensible limit
max_chunk_size = int(10 * (1024 ** 2))
#: optional write-through buffer
_buffer: BytesIO
#: chunks sizes
_chunks_size: List[Tuple[int, int]]
#: index of current chunk
_current_chunk: int
#: cursor position on the underlying stream, as the stream is not expected
# to implement ``tell()``. Limiting factor of how large the stream may be.
# Look at ``sys.maxsize`` for more information.
_cursor: int
#: chunk encoded stream
_stream: BytesIO
def __init__(
self,
stream: BytesIO,
buffer: Optional[BytesIO] = None,
):
"""initialize the instance
.. notice::
The write-through buffer is required to be seekable, writable and
readable and MUST be considered locked during any operation of the
ChunksIO implementation. The buffer's cursor position does not
reflect the cursor position of the underlying stream.
:param stream: a byte-stream to abstract
:param buffer: write-through buffer for all read operations on the
underlying stream. This can be useful, if the data needs
to be accessed again later on.
:param max_chunk_size: the maximum size of a single chunk (excluding
it's bytes size segment)
"""
if stream.readable() == False:
raise Exception('expected readable stream')
if buffer != None:
if buffer.writable() == False:
raise Exception('expected writable buffer')
self._buffer = buffer
self._chunks_size = []
self._current_chunk = 0
self._cursor = 0
self._stream = stream
super().__init__()
@staticmethod
def get_chunk_size(
stream: BytesIO,
max_size: int,
) -> Tuple[int, int]:
"""get the size of the next chunk from a RFC 9112 (§7) chunk encoded
byte stream
stream cursor position is assumed to be at the start of the preceeding
byte size segment of chunk data. The max_size parameter is converted to
its bytes representation, to determine early on if a read is feasible
and won't cause a denial-of-service.
:param stream: the stream to read the chunk size from
:param max_size: the maximum allowed size a chunk can be. I wasn't able
to find a definitive limit defined in the RFC so this
is guess working and at least curl has a pretty big
chunk size of more than 6 MiB.
:returns: tuple of the size of the bytes size segment and the data
bytes size, whose sum is the total size of the chunk
"""
_terminator = b'\r\n'
chunk_size = b''
terminator = b''
# calculate the number of bytes the max_size byte representation
# requires. This is a precaution so that chunks can't be arbitrarily
# long.
max_size_bytes = math.ceil(max_size.bit_length() / 8)
# the iteration could be handled with less system calls by reading a
# larger *chunk* of data and iterating over that in-memory cache.
# Though, this would come at the expense of unpredictable memory
# consumption and would require a write-through buffer by default, in
# addition to making the implementation more complex.
for _ in range(max_size_bytes + len(_terminator)):
buf = stream.read(1)
if buf in _terminator: terminator += buf
else: chunk_size += buf
if terminator == _terminator:
if (not chunk_size):
raise ValueError(
'terminator reached without having parsed ' +
'any byte size'
)
return (
len(chunk_size + terminator),
int.from_bytes(chunk_size, byteorder='big')
)
raise ValueError(
'unable to reach terminator with a max chunk size of ' +
f'{max_size / (1024 ** 2)} MiB'
)
def read(self, size = -1) -> bytes:
"""read an arbitrary amount of data from the underlying stream.
"""
buffer = b''
# if no chunk has been read yet
if len(self._chunks_size) == 0:
# determine the size of the initial chunk
try:
ichunk_size = ChunksIO.get_chunk_size(
self._stream,
self.max_chunk_size
)
except ValueError as e:
raise ValueError(
f'chunk #{self._current_chunk}: {e}'
) from e
self._chunks_size.append(ichunk_size)
self._cursor += self._chunks_size[self._current_chunk][0]
# end position of current chunk
cc_end = sum(
[sum(c) for c in self._chunks_size[:self._current_chunk + 1]]
)
# if the requested read end position exceeds the end position of the
# current chunk and it's not the end chunk
if self._cursor + size > cc_end and \
self._chunks_size[self._current_chunk][1] != 0:
# size of remaining bytes to read from current chunk
cc_remaining = cc_end - self._cursor
buffer += self._stream.read(cc_remaining)
if len(buffer) != cc_remaining:
raise ValueError(
f'chunk #{self._current_chunk}: stream yielded too few bytes'
)
if self._buffer: self._buffer.write(buffer)
# determine the size of the next chunk
try:
chunk_size = ChunksIO.get_chunk_size(
self._stream,
self.max_chunk_size
)
except ValueError as e:
raise ValueError(
f'chunk #{self._current_chunk + 1}: {e}'
) from e
self._chunks_size.append(chunk_size)
self._current_chunk += 1
self._cursor += self._chunks_size[self._current_chunk][0]
size = size - cc_remaining
buffer += self._stream.read(size)
if self._buffer: self._buffer.write(buffer)
self._cursor += len(buffer)
return buffer
def readable() -> bool:
"""
"""
return True
def readChunk() -> bytes:
"""read until the end of a chunk
if buffered and cursor is not at the start position of a chunk, position
will be seeked backwards, prior to reading. If unbuffered and not at the
start position of a chunk, exception will be raised.
"""
if self._cursor != self._offset:
raise Exception(
'cursor not at starting position of a chunk. Mixing ' +
'read() and readChunk() calls is currently not supported.'
)
buffer = self.read(self._chunks_size[self._current_chunk])
if self._buffer: self._buffer.write(buffer)
self._cursor += len(buffer)
return buffer
def readChunks() -> bytes:
"""yield all chunks until the terminating 0 byte chunk is reached
"""
def tell() -> int:
"""return the current stream position
"""
return this._cursor