Compare commits
2 commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b9a67eb0a5 | ||
|
|
6f267c29e6 |
1 changed files with 68 additions and 0 deletions
68
src/byteb4rb1e_utils/string.py
Normal file
68
src/byteb4rb1e_utils/string.py
Normal file
|
|
@ -0,0 +1,68 @@
|
||||||
|
from typing import List, Union
|
||||||
|
|
||||||
|
from byteb4rb1e_utils.collections import CircularBuffer
|
||||||
|
|
||||||
|
|
||||||
|
class KnuthMorrisPratt:
    """Knuth-Morris-Pratt string searching algorithm implemented as a class

    The failure table is computed once per pattern in the constructor and
    reused by every subsequent match call.

    https://gwern.net/doc/cs/algorithm/1977-knuth.pdf
    """

    def __init__(self, pattern: bytes):
        """Store the pattern and precompute its failure table.

        :param pattern: byte sequence to search for
        """
        self._table = KnuthMorrisPratt.build_table(pattern)
        self._pattern = pattern

    @staticmethod
    def build_table(pattern: bytes) -> List[int]:
        """builds the failure table

        ``table[i]`` is the length of the longest proper prefix of
        ``pattern[:i + 1]`` that is also a suffix of it.

        :param pattern: byte sequence to analyse
        :returns: list with one entry per pattern byte (empty for an empty
            pattern)
        """
        table = [0] * len(pattern)
        j = 0  # length of the prefix matched so far
        for i in range(1, len(pattern)):
            # On mismatch fall back to the next shorter border.
            while j > 0 and pattern[i] != pattern[j]:
                j = table[j - 1]
            if pattern[i] == pattern[j]:
                j += 1
            table[i] = j
        return table

    def match_linear(
        self,
        data: Union[bytes, bytearray, memoryview],
        start: int = 0
    ) -> int:
        """match against a linear fixed-size buffer

        :param data: buffer to scan
        :param start: index in ``data`` at which scanning begins
        :returns: index of the match or -1 if not found; an empty pattern
            never matches
        """
        pattern, table = self._pattern, self._table
        m, j = len(pattern), 0
        if m == 0:
            # Nothing to compare against; indexing pattern[0] would fail.
            return -1

        for i in range(start, len(data)):
            # Shrink the partial match until it can be extended by data[i].
            while j > 0 and data[i] != pattern[j]:
                j = table[j - 1]

            if data[i] == pattern[j]:
                j += 1

            if j == m:
                return i - m + 1
        return -1

    def match_circular(self, data: "CircularBuffer") -> bool:
        """Finds the boundary using KMP, handling circular wraparound.

        Reads ``data.buf``, ``data.start``, ``data.end``, ``data.size`` and
        ``data.filled``.
        NOTE(review): assumes ``start``/``end`` are indices in
        ``[0, size)`` and that ``filled`` means all ``size`` slots hold
        valid data -- confirm against CircularBuffer.

        :param data: circular buffer to scan, beginning at ``data.start``
        :returns: True if the pattern occurs in the buffered bytes, else
            False; an empty pattern never matches
        """
        pattern, table = self._pattern, self._table
        m = len(pattern)
        size = data.size
        if m == 0 or not size:
            return False

        # Number of bytes to examine: one full lap when the buffer is
        # filled, otherwise the distance from start to end.  Bounding the
        # scan by a count guarantees termination when no match exists.
        available = size if data.filled else (data.end - data.start) % size

        i, j = data.start, 0  # Start checking from the oldest data
        consumed = 0
        while consumed < available:
            if data.buf[i] == pattern[j]:
                i = (i + 1) % size
                consumed += 1
                j += 1
                if j == m:  # Full match found
                    return True
            elif j > 0:
                # Retry the same byte against a shorter partial match.
                j = table[j - 1]
            else:
                i = (i + 1) % size
                consumed += 1

        return False
|
||||||
Loading…
Add table
Add a link
Reference in a new issue