Compare commits

...
Sign in to create a new pull request.

2 commits

Author SHA1 Message Date
Rodney, Tiara
b9a67eb0a5
feat(string): add circular buffer support for KMP search 2025-05-04 03:03:14 +02:00
Rodney, Tiara
6f267c29e6
feat(string): init kmp string search 2025-05-04 02:34:49 +02:00

View file

@ -0,0 +1,68 @@
from typing import List, Union
from byteb4rb1e_utils.collections import CircularBuffer
class KnuthMorrisPratt:
    """Knuth-Morris-Pratt string searching algorithm implemented as a class

    The failure table is computed once per pattern at construction time and
    reused by every search.

    https://gwern.net/doc/cs/algorithm/1977-knuth.pdf
    """

    def __init__(self, pattern: bytes):
        """prepare a searcher for a fixed pattern

        :param pattern: byte sequence to search for
        """
        self._table = KnuthMorrisPratt.build_table(pattern)
        self._pattern = pattern

    @staticmethod
    def build_table(pattern: bytes) -> List[int]:
        """builds the failure table

        ``table[i]`` is the length of the longest proper prefix of
        ``pattern[:i + 1]`` that is also a suffix of it.

        :param pattern: byte sequence to build the table for
        :returns: list with one entry per pattern byte (empty for an empty
            pattern)
        """
        table = [0] * len(pattern)
        j = 0  # length of the currently matched prefix
        for i in range(1, len(pattern)):
            # on mismatch fall back to the next shorter candidate prefix
            while j > 0 and pattern[i] != pattern[j]:
                j = table[j - 1]
            if pattern[i] == pattern[j]:
                j += 1
            table[i] = j
        return table

    def match_linear(
        self,
        data: Union[bytes, bytearray, memoryview],
        start: int = 0
    ) -> int:
        """match against a linear fixed-size buffer

        :param data: buffer to search in
        :param start: index in ``data`` at which the search begins
        :returns: index of the first match at or after ``start``, or -1 if
            not found; an empty pattern matches at ``start`` as long as
            ``0 <= start <= len(data)``
        """
        pattern, table = self._pattern, self._table
        m = len(pattern)
        if m == 0:
            # nothing to compare; indexing pattern[0] below would fail
            return start if 0 <= start <= len(data) else -1
        j = 0  # number of pattern bytes matched so far
        for i in range(start, len(data)):
            byte = data[i]
            while j > 0 and byte != pattern[j]:
                j = table[j - 1]
            if byte == pattern[j]:
                j += 1
                if j == m:
                    return i - m + 1
        return -1

    def match_circular(self, data: "CircularBuffer") -> bool:
        """Finds the boundary using KMP, handling circular wraparound.

        The buffer is scanned exactly once, from its oldest element to its
        newest, so the search always terminates.

        :param data: circular buffer to search in; read through its
            ``start``, ``end``, ``filled``, ``size`` and ``buf`` attributes
        :returns: True if the pattern occurs in the buffered data, False
            otherwise; an empty pattern always matches
        """
        pattern, table = self._pattern, self._table
        m = len(pattern)
        if m == 0:
            return True
        size = data.size
        if not size:
            return False
        # NOTE(review): assumes ``end`` is one past the newest element and
        # that ``filled`` means all ``size`` slots hold data - confirm
        # against CircularBuffer.
        count = size if data.filled else (data.end - data.start) % size
        buf = data.buf
        i = data.start  # physical index, starting at the oldest data
        j = 0  # number of pattern bytes matched so far
        for _ in range(count):
            byte = buf[i]
            while j > 0 and byte != pattern[j]:
                j = table[j - 1]
            if byte == pattern[j]:
                j += 1
                if j == m:  # Full match found
                    return True
            i = (i + 1) % size
        return False