233 lines
8 KiB
Python
233 lines
8 KiB
Python
import math
from io import BytesIO, IOBase
from typing import Iterator, List, Optional, Tuple
|
|
|
|
|
|
class ChunksIO(IOBase):
    """Handler for HTTP/1.1 chunked transfer-encoded (RFC 9112 §7) byte streams.

    Compact and predictable implementation of an RFC 9112 compliant stream
    handler, which exposes a common IOBase interface for treating chunked
    byte streams as pure, unencoded byte streams: ``read()`` returns only the
    chunk data, never the size lines, CRLF delimiters or trailer fields.

    .. note::

        The implementation is currently only concerned with read operations,
        though the layout is prepared for a straightforward implementation
        of write operations.
    """

    #: maximum allowed size of a single chunk's data, in bytes.
    # The RFC defines no limit; 10 MiB is a guess at a sensible default.
    max_chunk_size = int(10 * (1024 ** 2))

    #: maximum number of bytes of chunk extensions (``;name=value``) accepted
    # on a single chunk size line. Extensions are read and ignored, as
    # RFC 9112 §7.1.1 requires for unrecognized extensions.
    max_extension_size = 1024

    #: maximum total size of the trailer section following the last chunk,
    # in bytes. Trailer fields are consumed and discarded.
    max_trailer_size = 8 * 1024

    #: optional write-through buffer receiving every decoded data byte
    _buffer: Optional[BytesIO]

    #: (size line length incl. CRLF, data length) of every chunk parsed so far
    _chunks_size: List[Tuple[int, int]]

    #: index of current chunk
    _current_chunk: int

    #: number of data bytes of the current chunk that have not been read yet
    _chunk_remaining: int

    #: cursor position on the underlying stream, as the stream is not expected
    # to implement ``tell()``. Counts every consumed byte, framing included.
    _cursor: int

    #: set once the last chunk and the trailer section have been consumed
    _eof: bool

    #: chunk encoded stream
    _stream: BytesIO

    def __init__(
        self,
        stream: BytesIO,
        buffer: Optional[BytesIO] = None,
        max_chunk_size: Optional[int] = None,
    ):
        """Initialize the instance.

        .. note::

            The write-through buffer is required to be writable and MUST be
            considered locked during any operation of the ChunksIO
            implementation. The buffer's cursor position does not reflect
            the cursor position of the underlying stream.

        :param stream: a readable binary stream holding the chunk encoded
                       data, positioned at the start of the first chunk size
                       line
        :param buffer: write-through buffer receiving all decoded data read
                       from the underlying stream. This can be useful if the
                       data needs to be accessed again later on.
        :param max_chunk_size: the maximum size of a single chunk's data
                               (excluding its size line); defaults to the
                               class attribute of the same name
        :raises ValueError: if the stream is not readable or the buffer is
                            not writable
        """
        if not stream.readable():
            raise ValueError('expected readable stream')

        if buffer is not None and not buffer.writable():
            raise ValueError('expected writable buffer')

        # instance attribute shadows the class-wide default
        if max_chunk_size is not None:
            self.max_chunk_size = max_chunk_size

        self._buffer = buffer
        self._chunks_size = []
        self._current_chunk = 0
        self._chunk_remaining = 0
        self._cursor = 0
        self._eof = False
        self._stream = stream

        super().__init__()

    @staticmethod
    def _read_line(stream: BytesIO, limit: int) -> Optional[bytes]:
        """Read one CRLF terminated line of at most *limit* bytes.

        Reads byte by byte so that nothing past the line terminator is
        consumed from the underlying stream, which cannot be rewound. Reading
        a larger block and caching it would need a mandatory buffer and make
        memory consumption unpredictable.

        :returns: the line without its CRLF, or ``None`` if the line is
                  longer than *limit* bytes
        :raises ValueError: on end of stream, or a CR/LF not forming a CRLF
        """
        line = bytearray()
        while True:
            byte = stream.read(1)
            if not byte:
                raise ValueError('unexpected end of stream')
            if byte == b'\r':
                if stream.read(1) != b'\n':
                    raise ValueError('expected LF after CR')
                return bytes(line)
            if byte == b'\n':
                raise ValueError('bare LF is not a valid line terminator')
            if len(line) >= limit:
                return None
            line += byte

    @staticmethod
    def get_chunk_size(
        stream: BytesIO,
        max_size: int,
        max_extension_size: Optional[int] = None,
    ) -> Tuple[int, int]:
        """Get the size of the next chunk from an RFC 9112 (§7) chunk encoded
        byte stream.

        The stream cursor position is assumed to be at the start of the size
        line preceding the chunk data. That line is
        ``1*HEXDIG [ chunk-ext ] CRLF``: the size is hexadecimal ASCII and
        any chunk extensions are ignored. The line length is capped at the
        number of hex digits needed to spell *max_size* plus
        *max_extension_size*. This rejects an overlong line early and avoids
        a denial-of-service.

        :param stream: the stream to read the chunk size line from
        :param max_size: the maximum allowed data size of a chunk. There is
                         no definitive limit in the RFC, so this is a guess;
                         at least curl uses chunks of more than 6 MiB.
        :param max_extension_size: maximum number of bytes allowed for chunk
                                   extensions; defaults to
                                   ``ChunksIO.max_extension_size``
        :returns: tuple of the size of the size line (including its CRLF)
                  and the data bytes size, whose sum is the size of the
                  chunk excluding the CRLF that follows its data
        :raises ValueError: on a malformed, overlong or oversized size line
        """
        if max_extension_size is None:
            max_extension_size = ChunksIO.max_extension_size

        # number of hex digits required to spell max_size
        max_digits = len(f'{max_size:x}')

        line = ChunksIO._read_line(stream, max_digits + max_extension_size)
        if line is None:
            raise ValueError(
                'unable to reach terminator with a max chunk size of ' +
                f'{max_size / (1024 ** 2)} MiB'
            )

        # a chunk extension starts at the first ";", optionally preceded by
        # bad whitespace (BWS)
        size_field = line.partition(b';')[0].rstrip(b' \t')
        if not size_field:
            raise ValueError(
                'terminator reached without having parsed ' +
                'any byte size'
            )

        # int(..., 16) alone would also accept signs, "0x" prefixes,
        # underscores and whitespace, none of which are valid HEXDIGs
        if any(c not in b'0123456789abcdefABCDEF' for c in size_field):
            raise ValueError(f'invalid chunk size {size_field!r}')

        size = int(size_field, 16)
        if size > max_size:
            raise ValueError(
                f'chunk size of {size} bytes exceeds the maximum of '
                f'{max_size} bytes'
            )

        return len(line) + 2, size

    def _read_exact(self, size: int) -> bytes:
        """Read exactly *size* bytes from the stream, tolerating short reads.

        :raises ValueError: if the stream ends before *size* bytes were read
        """
        data = bytearray()
        while len(data) < size:
            piece = self._stream.read(size - len(data))
            if not piece:
                raise ValueError(
                    f'chunk #{self._current_chunk}: stream yielded too few bytes'
                )
            data += piece
        self._cursor += size
        return bytes(data)

    def _consume_trailers(self) -> None:
        """Consume and discard the trailer section ending the chunked body."""
        budget = self.max_trailer_size
        while True:
            try:
                line = ChunksIO._read_line(self._stream, budget)
            except ValueError as e:
                raise ValueError(f'trailer section: {e}') from e
            if line is None:
                raise ValueError(
                    f'trailer section exceeds {self.max_trailer_size} bytes'
                )
            self._cursor += len(line) + 2
            # an empty line terminates the trailer section
            if not line:
                return
            budget -= len(line)

    def _next_chunk(self) -> bool:
        """Parse the next chunk size line and make it the current chunk.

        :returns: ``True`` if a chunk carrying data follows, ``False`` once
                  the last (zero sized) chunk and the trailer section have
                  been consumed
        """
        if self._eof:
            return False

        index = len(self._chunks_size)
        try:
            chunk_size = ChunksIO.get_chunk_size(
                self._stream,
                self.max_chunk_size,
                self.max_extension_size,
            )
        except ValueError as e:
            raise ValueError(f'chunk #{index}: {e}') from e

        self._chunks_size.append(chunk_size)
        self._current_chunk = index
        self._cursor += chunk_size[0]
        self._chunk_remaining = chunk_size[1]

        # a zero sized chunk is the last chunk; only trailers may follow
        if chunk_size[1] == 0:
            self._consume_trailers()
            self._eof = True
            return False

        return True

    def read(self, size: Optional[int] = -1) -> bytes:
        """Read up to *size* decoded bytes from the underlying stream.

        Chunk boundaries are crossed transparently. A negative or ``None``
        size reads everything up to the last chunk. Returns ``b''`` once the
        chunked body has been fully consumed.

        :raises ValueError: if the instance is closed, or the stream is
                            malformed or truncated
        """
        if self.closed:
            raise ValueError('I/O operation on closed file.')

        remaining = None if size is None or size < 0 else size
        parts = []

        while remaining is None or remaining > 0:
            if not self._chunk_remaining and not self._next_chunk():
                break

            count = self._chunk_remaining
            if remaining is not None:
                count = min(count, remaining)
                remaining -= count

            data = self._read_exact(count)
            self._chunk_remaining -= count
            if self._buffer is not None:
                self._buffer.write(data)
            parts.append(data)

            # chunk data is followed by CRLF. Consume it right away so the
            # stream is always positioned at a chunk size line between chunks.
            if not self._chunk_remaining:
                if self._read_exact(2) != b'\r\n':
                    raise ValueError(
                        f'chunk #{self._current_chunk}: chunk data not ' +
                        'terminated by CRLF'
                    )

        return b''.join(parts)

    def readable(self) -> bool:
        """Return True, as this stream supports reading."""
        return True

    def readChunk(self) -> bytes:
        """Read the data of the next complete chunk.

        Returns ``b''`` once the last chunk has been reached. Rewinding a
        partially read chunk through the write-through buffer is not
        implemented.

        :raises ValueError: if the current chunk was partially consumed by
                            ``read()``
        """
        if self.closed:
            raise ValueError('I/O operation on closed file.')

        if self._chunk_remaining:
            raise ValueError(
                'cursor not at starting position of a chunk. Mixing ' +
                'read() and readChunk() calls is currently not supported.'
            )

        if not self._next_chunk():
            return b''

        # read() writes to the buffer and advances the cursor itself
        return self.read(self._chunk_remaining)

    def readChunks(self) -> Iterator[bytes]:
        """Yield the data of every chunk until the terminating 0 byte chunk."""
        while True:
            chunk = self.readChunk()
            # only the last chunk is empty, so b'' marks the end of the body
            if not chunk:
                return
            yield chunk

    def tell(self) -> int:
        """Return the number of bytes consumed from the underlying stream
        since construction, framing (size lines, CRLFs, trailers) included.
        """
        return self._cursor
|