186 lines
5.8 KiB
Python
186 lines
5.8 KiB
Python
from io import BytesIO, IOBase
|
|
import unittest
|
|
|
|
from byteb4rb1e_utils.io import ChunksIO
|
|
|
|
|
|
class TestGetChunkSize(unittest.TestCase):
|
|
|
|
def test_default(self):
|
|
sample = int.to_bytes(100) + b'\r\n'
|
|
|
|
self.assertEqual(
|
|
ChunksIO.get_chunk_size(BytesIO(sample), ChunksIO.max_chunk_size),
|
|
(3, 100)
|
|
)
|
|
|
|
def test_oversized(self):
|
|
"""any 4-digit integer, exceeds the byte size definition of 512
|
|
"""
|
|
chunk_size = 512
|
|
sample = b''.join([int.to_bytes(1) for _ in range(4)]) + b'\r\n'
|
|
|
|
with self.assertRaises(ValueError) as result:
|
|
ChunksIO.get_chunk_size(BytesIO(sample), max_size=chunk_size)
|
|
|
|
self.assertTrue('unable to reach terminator' in str(result.exception))
|
|
|
|
def test_missing_terminator(self):
|
|
chunk_size = 512
|
|
sample = b''.join([int.to_bytes(9) for _ in range(chunk_size)])
|
|
|
|
with self.assertRaises(ValueError) as result:
|
|
ChunksIO.get_chunk_size(BytesIO(sample), max_size=chunk_size)
|
|
|
|
self.assertTrue('unable to reach terminator' in str(result.exception))
|
|
|
|
def test_missing_byte_size(self):
|
|
chunk_size = 512
|
|
sample = b'\r\n'
|
|
|
|
with self.assertRaises(ValueError) as result:
|
|
ChunksIO.get_chunk_size(BytesIO(sample), max_size=chunk_size)
|
|
|
|
self.assertTrue(
|
|
'without having parsed any byte size' in str(result.exception)
|
|
)
|
|
|
|
|
|
class TestRead(unittest.TestCase):
|
|
def test_default(self):
|
|
"""
|
|
"""
|
|
chunk_data = 'Foobar'.encode('utf-8')
|
|
chunk = int.to_bytes(len(chunk_data), byteorder="big") + \
|
|
b'\r\n' + \
|
|
chunk_data
|
|
|
|
self.assertEqual(ChunksIO(BytesIO(chunk)).read(), b'Foobar')
|
|
|
|
def test_perfect_multiple(self):
|
|
"""read operations match sizes of chunks
|
|
|
|
chunk 1 chunk 2
|
|
|----------|---------|
|
|
^
|
|
cursor is here
|
|
>------- --|
|
|
^
|
|
first requested read ends here
|
|
>---------|
|
|
^
|
|
second requested read ends here
|
|
"""
|
|
chunk1_data = 'Foobar'.encode('utf-8')
|
|
chunk1 = int.to_bytes(len(chunk1_data), byteorder="big") + \
|
|
b'\r\n' + \
|
|
chunk1_data
|
|
|
|
chunk2_data = 'RTFM'.encode('utf-8')
|
|
chunk2 = int.to_bytes(len(chunk2_data), byteorder="big") + \
|
|
b'\r\n' + \
|
|
chunk2_data
|
|
|
|
handler = ChunksIO(BytesIO(chunk1 + chunk2))
|
|
|
|
self.assertEqual(handler.read(6), b'Foobar')
|
|
self.assertEqual(handler.read(4), b'RTFM')
|
|
|
|
def test_imperfect_multiple_first(self):
|
|
"""first read operation does not match the size of the current chunk
|
|
|
|
chunk 1 chunk 2
|
|
|----------|---------|
|
|
^
|
|
cursor is here
|
|
>--------------|
|
|
^
|
|
first requested read ends here
|
|
>-----|
|
|
^
|
|
second requested read ends here
|
|
"""
|
|
chunk1_data = 'Foo'.encode('utf-8')
|
|
chunk1 = int.to_bytes(len(chunk1_data), byteorder="big") + \
|
|
b'\r\n' + \
|
|
chunk1_data
|
|
|
|
chunk2_data = 'barRTFM'.encode('utf-8')
|
|
chunk2 = int.to_bytes(len(chunk2_data), byteorder="big") + \
|
|
b'\r\n' + \
|
|
chunk2_data
|
|
|
|
handler = ChunksIO(BytesIO(chunk1 + chunk2))
|
|
|
|
self.assertEqual(handler.read(6), b'Foobar')
|
|
self.assertEqual(handler.read(4), b'RTFM')
|
|
|
|
def test_imperfect_multiple_second(self):
|
|
"""first read operation does not match the size of the current chunk
|
|
|
|
chunk 1 chunk 2
|
|
|----------|---------|
|
|
^
|
|
cursor is here
|
|
>------|
|
|
^
|
|
first requested read ends here
|
|
>-------------|
|
|
^
|
|
second requested read ends here
|
|
"""
|
|
chunk1_data = 'FoobarRT'.encode('utf-8')
|
|
chunk1 = int.to_bytes(len(chunk1_data), byteorder="big") + \
|
|
b'\r\n' + \
|
|
chunk1_data
|
|
|
|
chunk2_data = 'FM'.encode('utf-8')
|
|
chunk2 = int.to_bytes(len(chunk2_data), byteorder="big") + \
|
|
b'\r\n' + \
|
|
chunk2_data
|
|
|
|
handler = ChunksIO(BytesIO(chunk1 + chunk2))
|
|
|
|
self.assertEqual(handler.read(6), b'Foobar')
|
|
self.assertEqual(handler.read(4), b'RTFM')
|
|
|
|
def test_properly_terminated(self):
|
|
"""a proper termination chunk is emitted, resulting in no further
|
|
attempts to retrieved chunks, exposing the behavior of the underlying
|
|
stream
|
|
"""
|
|
chunk1_data = 'Foobar'.encode('utf-8')
|
|
chunk1 = int.to_bytes(len(chunk1_data), byteorder="big") + \
|
|
b'\r\n' + \
|
|
chunk1_data
|
|
|
|
chunk2_data = ''.encode('utf-8')
|
|
chunk2 = int.to_bytes(len(chunk2_data), byteorder="big") + \
|
|
b'\r\n' + \
|
|
chunk2_data
|
|
|
|
handler = ChunksIO(BytesIO(chunk1 + chunk2))
|
|
|
|
self.assertEqual(handler.read(6), b'Foobar')
|
|
self.assertEqual(handler.read(4), b'')
|
|
self.assertEqual(handler.read(4), b'')
|
|
|
|
def test_not_properly_terminated(self):
|
|
"""a proper termination chunk is emitted, resulting in no further
|
|
attempts to retrieved chunks, exposing the behavior of the underlying
|
|
stream
|
|
"""
|
|
chunk_data = 'Foobar'.encode('utf-8')
|
|
chunk = int.to_bytes(len(chunk_data), byteorder="big") + \
|
|
b'\r\n' + \
|
|
chunk_data
|
|
|
|
handler = ChunksIO(BytesIO(chunk))
|
|
|
|
self.assertEqual(handler.read(6), b'Foobar')
|
|
|
|
with self.assertRaises(ValueError) as context:
|
|
handler.read(4)
|
|
|
|
self.assertTrue('unable to reach terminator' in str(context.exception))
|
|
|