diff --git a/src/byteb4rb1e_utils/collections.py b/src/byteb4rb1e_utils/collections.py new file mode 100644 index 0000000..84cc82c --- /dev/null +++ b/src/byteb4rb1e_utils/collections.py @@ -0,0 +1,37 @@ +class CircularBuffer: + """circular buffer implementation for managing streamed data + """ + #: internal buffer storage maintaining a fixed size + buf: bytearray + #: maximum capacity of the buffer + size: int + #: index of the oldest element in the buffer + start: int + #: index where the next element will be inserted + end: int + #: indicates whether the buffer has overwritten older data + filled: bool + + def __init__(self, size: int): + """initializes the circular buffer with a fixed capacity + + :param size: maximum number of bytes the buffer can hold + """ + self.buf = bytearray(size) + self.size = size + self.start = 0 + self.end = 0 + self.filled = False + + def append(self, data: bytes): + """adds data to the circular buffer, overwriting old data if necessary + + :param data: byte sequence to append to the buffer + """ + for byte in data: + self.buf[self.end] = byte + self.end = (self.end + 1) % self.size + if self.end == self.start: # Overwriting case + self.start = (self.start + 1) % self.size + self.filled = True +