Compare commits

...
Sign in to create a new pull request.

1 commit

Author SHA1 Message Date
Rodney, Tiara
e5f0d1df58
feat(string): init ChunkedRollingHash 2025-05-06 16:39:44 +02:00
3 changed files with 282 additions and 1 deletions

88
NOTES Normal file
View file

@ -0,0 +1,88 @@
These are just a couple of brain farts that came up and I'd rather note down.
There's no clear structure.
RFC 1341 Boundary Matching in a Circular Buffer
1. Algorithm Considerations
Knuth-Morris-Pratt (KMP) Limitations:
Useful when patterns have prefix-suffix overlaps for efficient skipping.
If the failure table consists only of zeros, KMP provides no speed advantage
over naive searching.
Boundary pattern is arbitrary, meaning KMPs preprocessing may not be
beneficial.
Alternatives to KMP:
Rabin-Karp rolling hash → Uses fast hash comparisons instead of
character-by-character matching.
Boyer-Moore-Horspool → Precomputes skip distances to avoid redundant
comparisons, works well for longer patterns.
Crochemore-Perrin two-way search → used by str.find(), flexible
but assumes a linear memory layout so not really applicable for my circular
buffer approach
2. Boundary Characteristics
Max length: 70 bytes. Character set: ASCII only. No structure guarantees: The
boundary is client-defined, so I must be able to handle arbitrary sequences.
3. Algorithm Selection
Rolling Hash → Best for arbitrary short-to-medium patterns in a circular buffer.
Boyer-Moore → Ideal if the boundary has distinct character distributions to
optimize skipping.
# Optimized Chunk-Based Rolling Hash Matching
We need to efficiently detect an RFC 1341 multipart boundary inside a circular
buffer, ensuring minimal overhead while avoiding unnecessary comparisons.
Traditional approaches like Knuth-Morris-Pratt (KMP) dont provide an advantage
when the boundary lacks repeated subpatterns. Meanwhile, full rolling hash
matching scans every byte, which can be wasteful.
Thus, we introduce a chunk-wise hash-based skipping strategy, allowing us to
skip large sections of the buffer when an early non-match is detected.
## Core Idea
Precompute hashes for evenly sized chunks of the boundary. -> First, match only
the hash of the first chunk → immediately skip unnecessary buffer sections if no
match. -> If the first chunk matches, progressively verify subsequent chunks
until the full boundary is confirmed. Benefits Over Full Matching
## Benefits Over Full Matching
- Reduces comparisons significantly → eliminates large sections early when
non-matches occur.
- Balances preprocessing cost vs runtime → faster
elimination means fewer wasted cycles.
Integrates seamlessly into circular buffers → allows skipping intelligently.
### Precompute Chunk Hashes
- Divide the pattern into `N` equal-sized chunks (e.g., 7 chunks of 10 bytes
for a 70-byte boundary).
- Compute a rolling hash for each chunk in addition to the full pattern, storing
them for quick lookup.
### Sliding Window Search in the Buffer
- Compute the rolling hash for each window of size chunk_size.
- Compare the first chunks hash with the buffer window.
- If no match, skip boundary_length - chunk_size bytes.
### Progressive Chunk Verification
- If the first chunk matches, verify the next chunk sequentially.
- Continue matching chunks until the full boundary is confirmed.
- Perform final character-by-character validation to rule out hash collisions.

View file

@ -1,4 +1,6 @@
from typing import Optional from dataclasses import dataclass
import math
from typing import List, Optional, Tuple
class RollingHash: class RollingHash:
@ -89,3 +91,138 @@ class RollingHash:
self._hash = (self._hash * self.base + new_byte) % self.mod self._hash = (self._hash * self.base + new_byte) % self.mod
return self._hash return self._hash
@dataclass
class ChunkedRollingHashOptions:
"""
"""
max_chunk_size: int = 10
base: int = RollingHash.base
mod: int = RollingHash.mod
class ChunkedRollingHash:
"""Chunked Rolling hash for linear and circular buffers
This implementation was inspired by the Rabin-Karp rolling hash
algorithm.
A search pattern is chunked and for each chunk its hash is calculated.
I came up with this approach as the requirement for efficient RFC1341 HTTP
multipart entity boundary matching for stream data in a circular/ring
buffer. I've tested a couple of algorithms, but none gave me any real
performance improvements over a naive/bruteforce search.
That's how this algorithm came to be. Big O? I don't know (yet)...
Why this is more performant for my specific use-cases?
------------------------------------------------------
#. Precompute hashes for evenly sized chunks of a search pattern, in
addition of a hash of the full search-pattern.
#. First, match only the hash of the first chunk → immediately skip
unnecessary buffer sections if no match.
#. If the first chunk matches, progressively verify subsequent chunks,
until the full search pattern is confirmed.
Benefits Over Full Matching
---------------------------
- Reduces comparisons significantly eliminates large sections early when
non-matches occur.
- Balances preprocessing cost vs runtime faster elimination means fewer
wasted cycles.
- Integrates seamlessly into circular buffers allows skipping
intelligently.
"""
_chunk_count: int
#: hashes of chunks of search string
_chunks_hash: List[int]
#: hash of the full search string
_hash: int
#: length of search string
_length: int
#: remainder for calculating the actual size of the last chunk
_remainder: int
_base: int
_mod: int
def __init__(
self,
data: bytes,
options: ChunkedRollingHashOptions = ChunkedRollingHashOptions()
):
"""
"""
self._base = options.base
self._mod = options.mod
self._max_chunk_size = options.max_chunk_size
self._chunks_hash = []
self._hash = RollingHash.compute_initial_hash(
data,
base = self._base,
mod = self._mod
)
self._length = len(data)
# only the last chunk differs in size; store its remainder separately
# for optimized handling
self._remainder = self._length % self._max_chunk_size
self._chunk_count = math.ceil(self._length / self._max_chunk_size)
# tracks chunk progression during matching
self._current = 0
# precompute hashes for all chunks to enable rapid comparison
for i in range(0, self._chunk_count):
chunk = data[i*self._max_chunk_size:(i+1)*self._max_chunk_size]
self._chunks_hash.append(
RollingHash.compute_initial_hash(chunk, base=self._base, mod=self._mod)
)
def match(
self,
data: bytes
):
"""match a buffer against a search string through chunked hashing
"""
# progressively match each chunk
for i in range(self._current, self._chunk_count - 1):
chunk = data[i*self._max_chunk_size:(i+1)*self._max_chunk_size]
# no more data left to process
if chunk == b'': break
chunk_hash = RollingHash.compute_initial_hash(
chunk,
base = self._base,
mod = self._mod
)
if chunk_hash != self._chunks_hash[i]:
self._current = 0
return False
self._current += 1
# processing hasn't completed for last chunk to be processed yet
if self._current != self._chunk_count - 1:
return
last_chunk = data[-self._remainder:]
last_chunk_hash = RollingHash.compute_initial_hash(
last_chunk,
base = self._base,
mod = self._mod
)
if self._chunks_hash[self._current] == last_chunk_hash:
return True
self._current = 0
return False

View file

@ -0,0 +1,56 @@
import unittest
from byteb4rb1e_utils.string import (
ChunkedRollingHash,
ChunkedRollingHashOptions,
RollingHash,
)
class test___init__(unittest.TestCase):
"""ChunkedRollingHash.__init__()"""
def test_default(self):
"""default options"""
result = ChunkedRollingHash(b'abcdefgh')
self.assertEqual(result._mod, ChunkedRollingHashOptions.mod)
self.assertEqual(result._base, ChunkedRollingHashOptions.base)
self.assertEqual(result._max_chunk_size, ChunkedRollingHashOptions.max_chunk_size)
control_hash = RollingHash.compute_initial_hash(
b'abcdefgh',
base = result._base,
mod = result._mod
)
self.assertEqual(result._length, 8)
self.assertEqual(result._chunk_count, 1)
self.assertEqual(len(result._chunks_hash), result._chunk_count)
self.assertEqual(result._hash, control_hash)
self.assertEqual(result._chunks_hash[0], control_hash)
def test_override(self):
"""override of options"""
options = ChunkedRollingHashOptions(
mod = 4,
base = 10,
max_chunk_size = 5,
)
result = ChunkedRollingHash(b'abcdefgh', options)
self.assertEqual(result._mod, options.mod)
self.assertEqual(result._base, options.base)
self.assertEqual(result._max_chunk_size, options.max_chunk_size)
control_hash1 = RollingHash.compute_initial_hash(
b'abcde',
base = result._base,
mod = result._mod
)
control_hash2 = RollingHash.compute_initial_hash(
b'fgh',
base = result._base,
mod = result._mod
)
self.assertEqual(result._chunks_hash[0], control_hash1)
self.assertEqual(result._chunks_hash[1], control_hash2)