Implemented Rabin-Karp rolling hash class abstraction. After testing multiple algorithms for efficient substring searching in a stream abstracted by a ring buffer, I've dropped the idea of using KMP in favor of implementing my own algorithm based on the Rabin-Karp rolling hash algorithm.
91 lines
2.9 KiB
Python
91 lines
2.9 KiB
Python
from typing import Optional
|
|
|
|
|
|
class RollingHash:
    """Rabin-Karp polynomial rolling hash over a fixed-size window of bytes.

    The hash of a window ``b[0], b[1], ..., b[n-1]`` is::

        (b[0] * base^(n-1) + b[1] * base^(n-2) + ... + b[n-1]) % mod

    After construction the window can be slid forward by one byte with
    :meth:`roll`, which updates the hash without re-reading the whole window.
    """

    #: default base
    base: int = 31

    #: default modulus
    mod: int = 10**9 + 7

    #: current computed hash
    _hash: int

    #: base actually used by this instance (e.g., 31)
    _base: int

    #: modulus actually used by this instance; keeps intermediate values small
    _mod: int

    #: precomputed ``base^(length-1) % mod``, the weight of the oldest byte in
    #: the window, used to remove that byte when rolling over
    _hbase_factor: int

    def __init__(
        self,
        data: bytes,
        base: Optional[int] = None,
        mod: Optional[int] = None
    ):
        """Initialize the rolling hash over the initial window ``data``.

        The window length is fixed to ``len(data)`` for the lifetime of the
        object.

        NOTE: for empty ``data`` the exponent passed to ``pow`` is ``-1``; the
        outcome then depends on how the running Python version handles a
        negative exponent in three-argument ``pow``.

        :param data: initial window contents
        :param base: polynomial base; falls back to :attr:`RollingHash.base`
            when ``None`` (or any other falsy value, such as ``0``)
        :param mod: modulus; falls back to :attr:`RollingHash.mod` when
            ``None`` (or any other falsy value, such as ``0``)
        """
        # Truthiness (not ``is None``) is kept on purpose: 0 is not a usable
        # base or modulus, so it is treated like "not given".
        self._base = base if base else RollingHash.base
        self._mod = mod if mod else RollingHash.mod

        self._hash = RollingHash.compute_initial_hash(
            data,
            self._base,
            self._mod
        )

        self._hbase_factor = pow(self._base, len(data) - 1, self._mod)

    @staticmethod
    def compute_initial_hash(
        data: bytes,
        base: int,
        mod: int,
    ) -> int:
        """Compute the polynomial hash of ``data``.

        Use this standalone for computing the hash of the search pattern, to
        avoid the overhead of instantiating an object.

        :param data: data to build the hash for
        :param base: polynomial base
        :param mod: modulus applied after every step

        :returns: hash of ``data`` (``0`` for empty data)
        """
        hash_ = 0
        for byte in data:
            # Reduce at every step so the running value never grows beyond
            # roughly ``mod * base`` instead of becoming a huge integer.
            hash_ = (hash_ * base + byte) % mod
        return hash_

    def roll(self, old_byte: int, new_byte: int) -> int:
        """Slide the window by one byte and return the updated hash.

        ``old_byte`` is removed using the precomputed weight of the highest
        polynomial term, so the update does not depend on the window length.

        The caller supplies ``old_byte`` because the object does not keep a
        copy of the window contents.

        :param old_byte: ordinal of the first byte in the buffer to roll over
        :param new_byte: ordinal of the byte newly appended to the buffer

        :returns: hash of the window after the roll
        """
        # Use the per-instance base and modulus so the result stays consistent
        # with the hash computed in ``__init__`` when custom values were given.
        # Python's ``%`` yields a non-negative result for a positive modulus,
        # so the subtraction needs no extra correction.
        without_old = (self._hash - old_byte * self._hbase_factor) % self._mod
        self._hash = (without_old * self._base + new_byte) % self._mod

        return self._hash
|