chore: rename package

This commit is contained in:
Tiara Rodney 2025-06-20 20:33:37 +02:00
parent dd57ecabb9
commit 1fb1e0d0bf
No known key found for this signature in database
GPG key ID: 5F43FAB4FBE5B5EB
11 changed files with 7 additions and 7 deletions

View file

View file

@ -0,0 +1,82 @@
import unittest
from byteb4rb1e.utils.collections import CircularBuffer
class test_init(unittest.TestCase):
"""CircularBuffer.__init__()"""
def test_default(self):
"""Test buffer initializes correctly."""
buffer = CircularBuffer(10)
self.assertEqual(len(buffer.buf), 10)
self.assertEqual(buffer.size, 10)
self.assertEqual(buffer.start, 0)
self.assertEqual(buffer.end, 0)
self.assertFalse(buffer.filled)
class test_append(unittest.TestCase):
"""CircularBuffer.append()"""
def test_without_overflow(self):
"""Test appending bytes without overwriting."""
buffer = CircularBuffer(5)
buffer.append(b"abc")
self.assertEqual(buffer.buf[:3], bytearray(b"abc"))
self.assertEqual(buffer.start, 0)
self.assertEqual(buffer.end, 3)
self.assertFalse(buffer.filled)
def test_with_overflow(self):
"""appending bytes with overwriting"""
buffer = CircularBuffer(5)
buffer.append(b"abcde") # Fill buffer completely
buffer.append(b"fg") # Overwrite some elements
self.assertEqual(buffer.buf, bytearray(b"fgcde")) # Last five elements should remain
self.assertEqual(buffer.start, 3) # Should have moved forward due to overwrite
self.assertEqual(buffer.end, 2) # Wrapped around
self.assertTrue(buffer.filled) # Buffer should indicate overwrite occurred
def test_wraparound_behavior(self):
"""correct handling of buffer wraparound"""
buffer = CircularBuffer(4)
buffer.append(b"abcd") # Fill buffer completely
buffer.append(b"e") # Overwrite first element
self.assertEqual(buffer.buf, bytearray(b"ebcd")) # Should have wrapped around
self.assertEqual(buffer.start, 2)
self.assertEqual(buffer.end, 1)
self.assertTrue(buffer.filled)
def test_consecutive_overwrites(self):
"""repeated buffer overwrites across multiple cycles
verifies that the circular buffer correctly handles multiple overwrite cycles.
- Initially, the buffer is filled completely (`abcde`).
- New data (`fghij`) is appended, causing the oldest elements to be replaced.
- The buffer does not physically shift; only the `start` index moves forward as data is overwritten.
- When wrapping around, the `start` advances while `end` cycles back to zero.
Expected:
- Buffer contents should be `bytearray(b"fghij")`, containing only
the most recent data
- `start` moves forward due to full overwriting.
- `end` correctly wraps around.
- `filled` flag indicates that overwrites have occurred.
**Why `start = 1` instead of `0`?**
Well it took some brain juice for me to fully comprehend it, but in a
circular buffer, `start` always marks the oldest remaining element.
When a full overwrite occurs, `start` moves forward to indicate the next
oldest position, ensuring sequential ordering is preserved.
"""
buffer = CircularBuffer(5)
buffer.append(b"abcde") # Fill buffer
buffer.append(b"fghij") # Overwrite completely
self.assertEqual(buffer.buf, bytearray(b"fghij")) # Only the latest data remains
self.assertEqual(buffer.start, 1)
self.assertEqual(buffer.end, 0)
self.assertTrue(buffer.filled)

View file

@ -0,0 +1,186 @@
from io import BytesIO, IOBase
import unittest
from byteb4rb1e.utils.io import ChunksIO
class TestGetChunkSize(unittest.TestCase):
def test_default(self):
sample = int.to_bytes(100) + b'\r\n'
self.assertEqual(
ChunksIO.get_chunk_size(BytesIO(sample), ChunksIO.max_chunk_size),
(3, 100)
)
def test_oversized(self):
"""any 4-digit integer, exceeds the byte size definition of 512
"""
chunk_size = 512
sample = b''.join([int.to_bytes(1) for _ in range(4)]) + b'\r\n'
with self.assertRaises(ValueError) as result:
ChunksIO.get_chunk_size(BytesIO(sample), max_size=chunk_size)
self.assertTrue('unable to reach terminator' in str(result.exception))
def test_missing_terminator(self):
chunk_size = 512
sample = b''.join([int.to_bytes(9) for _ in range(chunk_size)])
with self.assertRaises(ValueError) as result:
ChunksIO.get_chunk_size(BytesIO(sample), max_size=chunk_size)
self.assertTrue('unable to reach terminator' in str(result.exception))
def test_missing_byte_size(self):
chunk_size = 512
sample = b'\r\n'
with self.assertRaises(ValueError) as result:
ChunksIO.get_chunk_size(BytesIO(sample), max_size=chunk_size)
self.assertTrue(
'without having parsed any byte size' in str(result.exception)
)
class TestRead(unittest.TestCase):
def test_default(self):
"""
"""
chunk_data = 'Foobar'.encode('utf-8')
chunk = int.to_bytes(len(chunk_data), byteorder="big") + \
b'\r\n' + \
chunk_data
self.assertEqual(ChunksIO(BytesIO(chunk)).read(), b'Foobar')
def test_perfect_multiple(self):
"""read operations match sizes of chunks
chunk 1 chunk 2
|----------|---------|
^
cursor is here
>------- --|
^
first requested read ends here
>---------|
^
second requested read ends here
"""
chunk1_data = 'Foobar'.encode('utf-8')
chunk1 = int.to_bytes(len(chunk1_data), byteorder="big") + \
b'\r\n' + \
chunk1_data
chunk2_data = 'RTFM'.encode('utf-8')
chunk2 = int.to_bytes(len(chunk2_data), byteorder="big") + \
b'\r\n' + \
chunk2_data
handler = ChunksIO(BytesIO(chunk1 + chunk2))
self.assertEqual(handler.read(6), b'Foobar')
self.assertEqual(handler.read(4), b'RTFM')
def test_imperfect_multiple_first(self):
"""first read operation does not match the size of the current chunk
chunk 1 chunk 2
|----------|---------|
^
cursor is here
>--------------|
^
first requested read ends here
>-----|
^
second requested read ends here
"""
chunk1_data = 'Foo'.encode('utf-8')
chunk1 = int.to_bytes(len(chunk1_data), byteorder="big") + \
b'\r\n' + \
chunk1_data
chunk2_data = 'barRTFM'.encode('utf-8')
chunk2 = int.to_bytes(len(chunk2_data), byteorder="big") + \
b'\r\n' + \
chunk2_data
handler = ChunksIO(BytesIO(chunk1 + chunk2))
self.assertEqual(handler.read(6), b'Foobar')
self.assertEqual(handler.read(4), b'RTFM')
def test_imperfect_multiple_second(self):
"""first read operation does not match the size of the current chunk
chunk 1 chunk 2
|----------|---------|
^
cursor is here
>------|
^
first requested read ends here
>-------------|
^
second requested read ends here
"""
chunk1_data = 'FoobarRT'.encode('utf-8')
chunk1 = int.to_bytes(len(chunk1_data), byteorder="big") + \
b'\r\n' + \
chunk1_data
chunk2_data = 'FM'.encode('utf-8')
chunk2 = int.to_bytes(len(chunk2_data), byteorder="big") + \
b'\r\n' + \
chunk2_data
handler = ChunksIO(BytesIO(chunk1 + chunk2))
self.assertEqual(handler.read(6), b'Foobar')
self.assertEqual(handler.read(4), b'RTFM')
def test_properly_terminated(self):
"""a proper termination chunk is emitted, resulting in no further
attempts to retrieved chunks, exposing the behavior of the underlying
stream
"""
chunk1_data = 'Foobar'.encode('utf-8')
chunk1 = int.to_bytes(len(chunk1_data), byteorder="big") + \
b'\r\n' + \
chunk1_data
chunk2_data = ''.encode('utf-8')
chunk2 = int.to_bytes(len(chunk2_data), byteorder="big") + \
b'\r\n' + \
chunk2_data
handler = ChunksIO(BytesIO(chunk1 + chunk2))
self.assertEqual(handler.read(6), b'Foobar')
self.assertEqual(handler.read(4), b'')
self.assertEqual(handler.read(4), b'')
def test_not_properly_terminated(self):
"""a proper termination chunk is emitted, resulting in no further
attempts to retrieved chunks, exposing the behavior of the underlying
stream
"""
chunk_data = 'Foobar'.encode('utf-8')
chunk = int.to_bytes(len(chunk_data), byteorder="big") + \
b'\r\n' + \
chunk_data
handler = ChunksIO(BytesIO(chunk))
self.assertEqual(handler.read(6), b'Foobar')
with self.assertRaises(ValueError) as context:
handler.read(4)
self.assertTrue('unable to reach terminator' in str(context.exception))

View file

@ -0,0 +1,61 @@
import unittest
from byteb4rb1e.utils.string import RollingHash
class test_compute_initial_hash(unittest.TestCase):
"""RollingHash.compute_initial_hash()
i calculated the hashes by hand, as a basis for this test case. Hopefully
there's no logical flaw...
"""
def test_default(self):
"""computation of hash"""
result = RollingHash.compute_initial_hash(
b'abcdefg',
base = 31,
mod = 10**9 + 7
)
self.assertEqual(result, 988021244)
class test___init__(unittest.TestCase):
"""RollingHash.__init__()
Make sure the class instance is initialized correctly
I calculated the hashes by hand, as a basis for this test case. Hopefully
there's no logical flaw...
"""
def test_default(self):
"""computation of initial hash and highest base factor"""
instance = RollingHash(b'abcdefg')
self.assertEqual(instance._hash, 988021244)
self.assertEqual(instance._hbase_factor, 887503681)
def test_defaults_override(self):
"""override of defaults"""
instance = RollingHash(
b'abcdefg',
base = 9,
mod = 4
)
self.assertEqual(instance._mod, 4)
self.assertEqual(instance._base, 9)
class test_roll(unittest.TestCase):
"""RollingHash.roll()"""
def test_rolling_hash(self):
base=31
mod=10**9 + 7
rh = RollingHash(b"foobar", base=base, mod=mod)
rolled_hash = rh.roll(ord("f"), ord("n"))
control_hash = RollingHash.compute_initial_hash(b"oobarn", base, mod)
self.assertEqual(rolled_hash, control_hash)