User Guide¶
Building an Index¶
Use IndexBuilder to create a sparse index from
document impact vectors. Each document is represented as a set of term
indices with associated impact values.
import numpy as np
import impact_index
builder = impact_index.IndexBuilder("/path/to/index")
# Add documents: docid, term_indices, impact_values
terms = np.array([0, 5, 42], dtype=np.uintp)
values = np.array([1.2, 0.5, 3.1], dtype=np.float32)
builder.add(0, terms, values)
# More documents...
builder.add(1, np.array([2, 5, 8], dtype=np.uintp),
            np.array([0.3, 0.9, 1.1], dtype=np.float32))
# Finalize and get a searchable index
index = builder.build(in_memory=True)
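To make the data model concrete, here is a minimal pure-Python sketch of what a sparse impact index stores conceptually. It is not how impact_index is implemented, and the helper names (build_postings, score_all) are hypothetical. Each term maps to a posting list of (docid, impact) pairs, and a query is scored as a weighted sum over the matching terms.

```python
from collections import defaultdict

def build_postings(docs):
    """Invert {docid: {term: impact}} into {term: [(docid, impact), ...]}."""
    postings = defaultdict(list)
    for docid in sorted(docs):
        for term, impact in docs[docid].items():
            postings[term].append((docid, impact))
    return dict(postings)

def score_all(postings, query):
    """Exhaustively score every document that matches at least one query term."""
    scores = defaultdict(float)
    for term, weight in query.items():
        for docid, impact in postings.get(term, []):
            scores[docid] += weight * impact
    return dict(scores)

docs = {
    0: {0: 1.25, 5: 0.5, 42: 3.0},
    1: {2: 0.25, 5: 1.0, 8: 1.5},
}
postings = build_postings(docs)
scores = score_all(postings, {5: 1.0, 42: 2.0})
print(postings[5], scores)
```

Search algorithms such as WAND and MaxScore return the same top results as this exhaustive loop while skipping most postings.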
Builder options¶
Use BuilderOptions to control checkpointing
(for crash recovery) and memory usage:
options = impact_index.BuilderOptions()
options.checkpoint_frequency = 100000 # checkpoint every N documents
options.in_memory_threshold = 1000000 # max postings per term before flush
builder = impact_index.IndexBuilder("/path/to/index", options=options)
# Resume from a checkpoint (returns None if no checkpoint exists)
last_docid = builder.get_checkpoint_doc_id()
if last_docid is not None:
    print(f"Resuming from document {last_docid}")
Storage dtype¶
By default, impact values are stored as float32. You can choose a
different on-disk type to trade precision for space:
# Use float16 for smaller indices
builder = impact_index.IndexBuilder("/path/to/index", dtype="float16")
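The precision cost of a narrower type can be estimated without building an index. The sketch below uses only the standard struct module to round-trip the example impact values through float32, float16 and a simulated bfloat16. The bfloat16 helper truncates a float32 to its upper 16 bits, which is a simplification (real conversions usually round to nearest), and none of this reflects how impact_index converts values internally.

```python
import struct

def roundtrip_float32(x):
    return struct.unpack("<f", struct.pack("<f", x))[0]

def roundtrip_float16(x):
    # "e" is the IEEE 754 half-precision format code
    return struct.unpack("<e", struct.pack("<e", x))[0]

def roundtrip_bfloat16(x):
    # Keep only the upper 16 bits of the float32 representation (truncation)
    bits = struct.unpack("<I", struct.pack("<f", x))[0]
    return struct.unpack("<f", struct.pack("<I", bits & 0xFFFF0000))[0]

for value in [1.2, 0.5, 3.1]:
    print(value,
          roundtrip_float32(value),
          roundtrip_float16(value),
          roundtrip_bfloat16(value))
```

float16 keeps more mantissa bits than bfloat16, so it is more precise for impacts in a small range, while bfloat16 keeps the exponent range of float32.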
Supported dtypes: "float32" (default), "float16", "bfloat16",
"float64", "int32", "int64".
Searching¶
Load an existing index and search it with WAND or MaxScore. Both return
a list of ScoredDocument:
import impact_index
index = impact_index.Index.load("/path/to/index", in_memory=True)
# Query: {term_index: query_weight}
query = {5: 1.0, 10: 0.5, 42: 1.5}
# WAND algorithm
results = index.search_wand(query, top_k=10)
for doc in results:
    print(f"Document {doc.docid}: {doc.score}")
# MaxScore algorithm (often faster on compressed/split indices)
results = index.search_maxscore(query, top_k=10)
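The pruning idea behind MaxScore can be sketched in pure Python. This is a simplified illustration, not the library's implementation: it assumes non-negative weights and impacts, stores posting lists as dicts for random access (real implementations advance sorted cursors), and the function names are hypothetical. Terms whose combined upper bound cannot beat the current top-k threshold become "non-essential": documents that appear only in those lists are never scored.

```python
import heapq

def exhaustive_top_k(postings, query, k):
    scores = {}
    for term, weight in query.items():
        for docid, impact in postings.get(term, {}).items():
            scores[docid] = scores.get(docid, 0.0) + weight * impact
    return sorted(scores.items(), key=lambda item: (-item[1], item[0]))[:k]

def maxscore_top_k(postings, query, k):
    # (upper bound, term, weight), sorted by increasing upper bound
    terms = sorted((w * max(postings[t].values()), t, w)
                   for t, w in query.items() if postings.get(t))
    prefix = [0.0]  # prefix[i] = sum of the i smallest upper bounds
    for bound, _, _ in terms:
        prefix.append(prefix[-1] + bound)
    heap = []                  # min-heap of (score, -docid): current top k
    threshold = float("-inf")  # k-th best score once the heap is full
    first_essential = 0
    for docid in sorted({d for _, t, _ in terms for d in postings[t]}):
        while (first_essential < len(terms)
               and prefix[first_essential + 1] <= threshold):
            first_essential += 1
        essential = terms[first_essential:]
        if not any(docid in postings[t] for _, t, _ in essential):
            continue  # only in non-essential lists: cannot enter the top k
        score = sum(w * postings[t].get(docid, 0.0) for _, t, w in essential)
        pruned = False
        for i in range(first_essential - 1, -1, -1):
            if score + prefix[i + 1] <= threshold:
                pruned = True  # remaining terms cannot lift it high enough
                break
            _, t, w = terms[i]
            score += w * postings[t].get(docid, 0.0)
        if pruned:
            continue
        if len(heap) < k:
            heapq.heappush(heap, (score, -docid))
        elif score > heap[0][0]:
            heapq.heapreplace(heap, (score, -docid))
        if len(heap) == k:
            threshold = heap[0][0]
    return sorted(((-neg, s) for s, neg in heap), key=lambda x: (-x[1], x[0]))

postings = {
    5: {0: 0.5, 1: 1.0, 2: 0.25, 3: 0.75},
    10: {1: 0.5, 4: 2.0},
    42: {0: 3.0, 2: 1.0, 5: 0.5},
}
query = {5: 1.0, 10: 0.5, 42: 1.5}
print(maxscore_top_k(postings, query, k=2))
```

On this toy data the pruned search returns the same two documents as the exhaustive one, while documents 3 and 5 are discarded without being fully scored.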
Async search¶
For non-blocking retrieval (e.g., in a web server):
results = await index.aio_search_wand(query, top_k=10)
results = await index.aio_search_maxscore(query, top_k=10)
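In a server, the async methods make it possible to run several queries concurrently. The sketch below shows one way to do this with asyncio.gather. FakeIndex is a hypothetical stand-in so that the snippet runs without an index on disk; only the method name aio_search_wand and its (query, top_k) arguments are taken from this guide, and the stub simply echoes the highest-weighted query terms instead of returning real hits.

```python
import asyncio

class FakeIndex:
    """Hypothetical stand-in for a loaded index (not part of impact_index)."""

    async def aio_search_wand(self, query, top_k=10):
        await asyncio.sleep(0)  # yield to the event loop, as real work would
        ranked = sorted(query.items(), key=lambda item: -item[1])
        return ranked[:top_k]

async def search_many(index, queries, top_k=10):
    # Schedule all searches at once; results come back in query order.
    tasks = [index.aio_search_wand(q, top_k=top_k) for q in queries]
    return await asyncio.gather(*tasks)

batches = asyncio.run(
    search_many(FakeIndex(), [{5: 1.0, 42: 1.5}, {10: 0.5}], top_k=1)
)
print(batches)
```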
Iterating over postings¶
You can inspect individual posting lists. Each element is a
TermImpact.
BM25 and Bag-of-Words Indexing¶
For traditional IR with BM25 scoring, use
BOWIndexBuilder instead of
IndexBuilder. It automatically tracks document
lengths and optionally integrates text analysis (tokenization + stemming).
Pre-tokenized input¶
If you already have term indices and term-frequency values:
import numpy as np
import impact_index
builder = impact_index.BOWIndexBuilder("/path/to/index", dtype="int32")
# Add documents: docid, term_indices, tf_values
terms = np.array([0, 5, 42], dtype=np.uintp)
tf = np.array([3, 1, 2], dtype=np.int32)
builder.add(0, terms, tf)
builder.add(1, np.array([2, 5, 8], dtype=np.uintp),
            np.array([1, 4, 1], dtype=np.int32))
# Build returns searchable Index (doc metadata stored automatically)
index = builder.build(in_memory=True)
# Create a BM25-scored index (doc lengths loaded automatically)
scored = index.with_scoring(impact_index.BM25Scoring(k1=1.2, b=0.75))
# Search with MaxScore (often the fastest option)
query = {0: 1.0, 5: 1.0}
results = scored.search_maxscore(query, top_k=10)
for doc in results:
    print(f"Document {doc.docid}: {doc.score}")
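For reference, the role of k1 and b can be seen in a textbook BM25 term score. The function below is a sketch of the classic Robertson formula with a Lucene-style smoothed idf; it is an assumption that BM25Scoring computes something equivalent, since variants differ in details (for example, some drop the constant k1 + 1 factor). The function name and arguments are hypothetical.

```python
import math

def bm25_term_score(tf, doc_len, avg_doc_len, df, num_docs, k1=1.2, b=0.75):
    """Contribution of one query term to one document's BM25 score."""
    # Rare terms (small df) get a larger idf
    idf = math.log(1.0 + (num_docs - df + 0.5) / (df + 0.5))
    # b controls how strongly long documents are penalized
    norm = k1 * (1.0 - b + b * doc_len / avg_doc_len)
    # k1 controls how quickly repeated occurrences saturate
    return idf * tf * (k1 + 1.0) / (tf + norm)

for tf in (1, 2, 5, 20):
    print(tf, round(bm25_term_score(tf, 100, 100, df=10, num_docs=1000), 3))
```

The score grows with term frequency but saturates, which is why the index only needs to store integer TF values and the document lengths.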
Raw text input with stemming¶
For direct text indexing with automatic tokenization, stemming, and vocabulary management:
import impact_index
builder = impact_index.BOWIndexBuilder(
    "/path/to/index",
    dtype="int32",
    stemmer="porter",  # Lucene-compatible Porter stemmer
    stop_words=True,   # Lucene default English stop words
)
builder.add_text(0, "the quick brown fox jumps over the lazy dog")
builder.add_text(1, "a quick brown cat jumps high")
builder.add_text(2, "the lazy dog sleeps all day")
# Build index (doc metadata and analyzer config saved automatically)
index = builder.build(in_memory=True)
# BM25 scoring (doc lengths loaded automatically from index)
scored = index.with_scoring(impact_index.BM25Scoring())
# Query analysis (analyzer loaded automatically from index)
query = index.analyzer().analyze_query("quick fox")
results = scored.search_maxscore(query, top_k=10)
Stop words¶
Stop words (common words like “the”, “is”, “a”) can be filtered during indexing and querying to reduce index size and improve search speed. Built-in stop word lists from Lucene are available for 17 languages.
import impact_index
# Use default stop words for the language (matches Lucene/Pyserini)
builder = impact_index.BOWIndexBuilder(
    "/path/to/index",
    stemmer="snowball",
    language="english",
    stop_words=True,
)
# Or provide an explicit list
builder = impact_index.BOWIndexBuilder(
    "/path/to/index",
    stemmer="snowball",
    stop_words=["the", "a", "is", "in"],
)
# Get the stop word list for any supported language
words = impact_index.get_stop_words("english") # 33 words (Lucene default)
words = impact_index.get_stop_words("french") # 154 words
words = impact_index.get_stop_words("german") # 231 words
Supported languages: arabic, danish, dutch, english, finnish, french, german, greek, hungarian, italian, norwegian, portuguese, romanian, russian, spanish, swedish, turkish.
Note
For fair comparison with Pyserini/Lucene, always enable stop words. Without them, high-frequency terms like “the” create very long posting lists that slow down search significantly.
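The effect of stop word filtering is easy to see on a single sentence. The sketch below applies Lucene's default English stop word set (the 33 words mentioned above) after a deliberately simple tokenizer. The regular expression and the analyze function are illustrative assumptions, not the library's analyzer, and no stemming is applied.

```python
import re

# Lucene's default English stop word set (33 words)
ENGLISH_STOP_WORDS = frozenset("""
a an and are as at be but by for if in into is it no not of on or such
that the their then there these they this to was will with
""".split())

def analyze(text, stop_words=ENGLISH_STOP_WORDS):
    """Lowercase, split on non-alphanumeric characters, drop stop words."""
    tokens = re.findall(r"[a-z0-9]+", text.lower())
    return [token for token in tokens if token not in stop_words]

print(analyze("the quick brown fox jumps over the lazy dog"))
```

Both occurrences of "the" are dropped, so the longest posting list in a typical English collection is never created.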
Loading a saved index¶
The index automatically detects and loads auxiliary components (doc metadata, analyzer config, vocabulary) from the directory:
import impact_index
index = impact_index.Index.load("/path/to/index", in_memory=True)
# Doc metadata and analyzer are loaded automatically
scored = index.with_scoring(impact_index.BM25Scoring())
analyzer = index.analyzer()
query = analyzer.analyze_query("quick fox")
results = scored.search_maxscore(query, top_k=10)
Compression and Transforms¶
Compressed indices use PFOR-delta for doc IDs and adaptive bitpacking for values, with 128-posting blocks that enable block-max pruning.
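The reason sorted doc IDs compress well is that the gaps between them are small. The sketch below shows plain delta coding with one fixed bit width per block. It is a simplification and not PFOR itself (PFOR additionally stores outliers as exceptions), it ignores per-block headers, and the function names are hypothetical.

```python
def delta_encode(doc_ids):
    """Replace each sorted doc ID by its gap to the previous one."""
    gaps, previous = [], 0
    for doc_id in doc_ids:
        gaps.append(doc_id - previous)
        previous = doc_id
    return gaps

def delta_decode(gaps):
    doc_ids, current = [], 0
    for gap in gaps:
        current += gap
        doc_ids.append(current)
    return doc_ids

def packed_size_bits(doc_ids, block_size=128):
    """Bits needed when every block uses the width of its largest gap."""
    gaps = delta_encode(doc_ids)
    total = 0
    for start in range(0, len(gaps), block_size):
        block = gaps[start:start + block_size]
        width = max(1, max(block).bit_length())
        total += width * len(block)
    return total

doc_ids = list(range(0, 3000, 3))  # 1000 postings, one every 3 documents
print(packed_size_bits(doc_ids), "bits instead of", 32 * len(doc_ids))
```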
Quick compression¶
The simplest way to compress an index:
import impact_index
index = impact_index.Index.load("/path/to/raw_index", in_memory=True)
# Compress with defaults (PFOR doc IDs, lossless integer TF, block_size=128)
compressed = index.compress("/path/to/compressed")
# The compressed index is standalone — includes vocab, docmeta, analyzer
scored = compressed.with_scoring(impact_index.BM25Scoring())
# For neural IR with float impacts, use quantization:
compressed = index.compress("/path/to/compressed", nbits=8)
The compressed index is fully standalone: auxiliary files (vocabulary, analyzer config, document metadata) are automatically copied from the source index.
The default settings are optimized for BM25:
block_size=128: aligns with SIMD registers and enables effective block-max pruning during search.
nbits=0 (default): lossless integer bitpacking for TF counts (~2-3 bits per value). Use nbits=8 or nbits=16 for quantized float compression (neural IR like SPLADE).
Advanced compression¶
For full control over compressors, use
CompressionTransform:
import impact_index
index = impact_index.Index.load("/path/to/raw_index", in_memory=True)
# Choose compressors
docid_compressor = impact_index.BitPackingCompressor() # SIMD (recommended)
# docid_compressor = impact_index.EliasFanoCompressor() # alternative
# Fixed-range quantization (if you know the value range)
impact_compressor = impact_index.ImpactQuantizer(nbits=8, min=0.0, max=10.0)
# Or auto-ranging quantization (determines range from the index)
impact_compressor = impact_index.GlobalImpactQuantizer(nbits=8)
# Apply compression
transform = impact_index.CompressionTransform(
    max_block_size=128,
    doc_ids_compressor=docid_compressor,
    impacts_compressor=impact_compressor,
)
transform.process("/path/to/compressed", index)
# Load the compressed index
compressed = impact_index.Index.load("/path/to/compressed", in_memory=True)
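To get a feel for what nbits=8 with a fixed range costs in accuracy, the sketch below implements uniform scalar quantization. It is an assumption that ImpactQuantizer behaves like this (in particular regarding rounding and clipping); the quantize and dequantize helpers are hypothetical.

```python
def quantize(value, nbits, lo, hi):
    """Map a float in [lo, hi] to an integer code in [0, 2**nbits - 1]."""
    levels = (1 << nbits) - 1
    clipped = min(max(value, lo), hi)  # values outside the range saturate
    return round((clipped - lo) / (hi - lo) * levels)

def dequantize(code, nbits, lo, hi):
    levels = (1 << nbits) - 1
    return lo + code / levels * (hi - lo)

for value in [1.2, 0.5, 3.1, 12.0]:
    code = quantize(value, 8, 0.0, 10.0)
    print(value, code, round(dequantize(code, 8, 0.0, 10.0), 4))
```

With 8 bits over the range [0, 10], the reconstruction error of an in-range value is at most half a step, about 0.02. A range that is too wide wastes codes, and one that is too narrow saturates large impacts, which is the case an auto-ranging quantizer avoids.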
Splitting by quantiles¶
SplitIndexTransform partitions each term’s postings
into sub-lists by value ranges, enabling more aggressive pruning with
MaxScore:
base_transform = impact_index.CompressionTransform(
    max_block_size=128,
    doc_ids_compressor=impact_index.EliasFanoCompressor(),
    impacts_compressor=impact_index.GlobalImpactQuantizer(nbits=8),
)
split_transform = impact_index.SplitIndexTransform(
    quantiles=[0.5, 0.9],  # split at 50th and 90th percentile
    sink=base_transform,
)
split_transform.process("/path/to/split_index", index)
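The effect of splitting can be illustrated on a single posting list. The sketch below partitions (docid, impact) pairs at the 50th and 90th percentile using a simple nearest-rank rule. How SplitIndexTransform actually computes its boundaries is not specified here, so the rule and the function name are assumptions.

```python
def split_by_quantiles(postings, quantiles):
    """Partition (docid, impact) pairs into len(quantiles) + 1 sub-lists."""
    values = sorted(impact for _, impact in postings)
    # Boundary = value found at the quantile's rank (nearest-rank rule)
    bounds = [values[min(int(q * len(values)), len(values) - 1)]
              for q in quantiles]
    parts = [[] for _ in range(len(bounds) + 1)]
    for docid, impact in postings:
        level = sum(impact >= bound for bound in bounds)
        parts[level].append((docid, impact))  # doc ID order is preserved
    return parts

postings = [(docid, float(docid % 10)) for docid in range(20)]
parts = split_by_quantiles(postings, [0.5, 0.9])
for part in parts:
    print(len(part), "postings, max impact", max(v for _, v in part))
```

The low sub-list holds half of the postings but has a much smaller maximum impact, so MaxScore can often treat it as non-essential and skip it.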
BMP (Block-Max Pruning)¶
BMP implements “Faster Learned Sparse Retrieval with Block-Max Pruning” (SIGIR 2024) for fast approximate search.
Converting to BMP format¶
import impact_index
index = impact_index.Index.load("/path/to/index", in_memory=True)
# Streaming conversion (recommended — memory-efficient)
index.to_bmp_streaming("/path/to/bmp_index.bin", bsize=64, compress_range=True)
# Or legacy method (loads all postings into memory)
# index.to_bmp("/path/to/bmp_index.bin", bsize=64, compress_range=True)
Searching with BMP¶
Load a BMP index with BmpSearcher and search:
searcher = impact_index.BmpSearcher("/path/to/bmp_index.bin")
print(f"Documents: {searcher.num_documents()}")
# Query uses string term IDs
query = {"term1": 1.0, "term2": 0.5}
doc_ids, scores = searcher.search(query, k=10, alpha=1.0, beta=1.0)
for docid, score in zip(doc_ids, scores):
    print(f"{docid}: {score}")
BMP search parameters:
k: number of results to return
alpha: controls early termination aggressiveness (default: 1.0)
beta: controls block skipping (default: 1.0)
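The core idea of block-max pruning can be sketched in a few lines. This is an exact toy version, not the paper's or the library's implementation: it does not model the alpha and beta approximations, it stores postings as dicts, and the function names are hypothetical. Documents are grouped into blocks of bsize consecutive IDs, each block gets a score upper bound from per-term block maxima, and blocks are visited from the highest bound down until no remaining block can improve the top k.

```python
import heapq

def build_block_maxes(postings, bsize):
    """For each term, map block number (docid // bsize) to its largest impact."""
    block_max = {}
    for term, plist in postings.items():
        maxes = {}
        for docid, impact in plist.items():
            block = docid // bsize
            maxes[block] = max(maxes.get(block, 0.0), impact)
        block_max[term] = maxes
    return block_max

def bmp_search(postings, block_max, query, k, bsize):
    bounds = {}  # upper bound on any document score inside each block
    for term, weight in query.items():
        for block, largest in block_max.get(term, {}).items():
            bounds[block] = bounds.get(block, 0.0) + weight * largest
    heap = []  # min-heap of (score, -docid)
    for block, bound in sorted(bounds.items(), key=lambda item: -item[1]):
        if len(heap) == k and bound <= heap[0][0]:
            break  # no remaining block can improve the top k
        for docid in range(block * bsize, (block + 1) * bsize):
            score = sum(weight * postings.get(term, {}).get(docid, 0.0)
                        for term, weight in query.items())
            if score <= 0.0:
                continue
            if len(heap) < k:
                heapq.heappush(heap, (score, -docid))
            elif score > heap[0][0]:
                heapq.heapreplace(heap, (score, -docid))
    ranked = sorted(heap, key=lambda item: (-item[0], -item[1]))
    return [-neg for _, neg in ranked], [score for score, _ in ranked]

postings = {
    "term1": {0: 1.0, 1: 0.5, 4: 2.0, 7: 0.25},
    "term2": {1: 1.5, 2: 5.0, 5: 0.5},
}
block_max = build_block_maxes(postings, bsize=2)
doc_ids, scores = bmp_search(postings, block_max,
                             {"term1": 1.0, "term2": 0.5}, k=2, bsize=2)
print(doc_ids, scores)
```

On this toy data only two of the four blocks are scored before the search stops.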
Document Store¶
The document store provides compressed storage for document content and metadata, using zstd block compression. Documents can be retrieved by sequential number or by key fields.
Building a store¶
Use DocumentStoreBuilder to create a store:
import impact_index
builder = impact_index.DocumentStoreBuilder(
    "/path/to/store",
    block_size=4096,  # documents per compressed block
    zstd_level=3,     # compression level
)
# Add documents with key-value metadata and binary content
builder.add({"docno": "DOC001", "url": "http://example.com"}, b"document text here")
builder.add({"docno": "DOC002", "url": "http://example.com/2"}, b"another document")
# Finalize (can only be called once)
builder.build()
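The block_size trade-off is easier to reason about with a toy model of block compression. The sketch below groups documents into blocks and compresses each block as a unit. It uses zlib from the standard library as a stand-in for zstd and JSON as the block layout, both of which are assumptions for illustration only; the real on-disk format is not described here, and the function names are hypothetical.

```python
import json
import zlib

def build_store(docs, block_size):
    """Compress (keys, content) pairs in groups of block_size documents."""
    blocks = []
    for start in range(0, len(docs), block_size):
        chunk = [
            # latin-1 maps every byte to one character, so bytes round-trip
            {"keys": keys, "content": content.decode("latin-1")}
            for keys, content in docs[start:start + block_size]
        ]
        blocks.append(zlib.compress(json.dumps(chunk).encode("utf-8"), 3))
    return blocks

def get_by_number(blocks, block_size, number):
    """Decompress only the block that contains document `number` (0-based)."""
    chunk = json.loads(zlib.decompress(blocks[number // block_size]))
    record = chunk[number % block_size]
    return record["keys"], record["content"].encode("latin-1")

docs = [
    ({"docno": "DOC001", "url": "http://example.com"}, b"document text here"),
    ({"docno": "DOC002", "url": "http://example.com/2"}, b"another document"),
    ({"docno": "DOC003"}, b"third document"),
]
blocks = build_store(docs, block_size=2)
print(len(blocks), get_by_number(blocks, 2, 2))
```

Larger blocks usually compress better because similar documents share context, but each lookup then has to decompress more data.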
Resumable builds (crash recovery)¶
For long-running ingests, pass checkpoint_frequency to periodically
persist the in-flight builder state. If the process crashes or exits
before build(), re-instantiating the builder against the same folder
(with the same non-zero checkpoint_frequency) will resume from the
last checkpoint — any documents added after the last checkpoint but
before the crash are discarded, and the output files are rewound to a
consistent state.
builder = impact_index.DocumentStoreBuilder(
    "/path/to/store",
    checkpoint_frequency=10_000,  # checkpoint every 10k documents
)
for doc in documents:
    builder.add(doc.keys, doc.content)
builder.build()  # clears the checkpoint on success
# On resume after a crash:
builder = impact_index.DocumentStoreBuilder(
    "/path/to/store",
    checkpoint_frequency=10_000,
)
print(builder.num_documents()) # docs restored from the last checkpoint
# Continue adding from wherever your source left off, then build().
Call builder.checkpoint() directly to force a checkpoint at an
arbitrary point (e.g. before exiting normally so that no work is lost).
Passing checkpoint_frequency=0 (the default) disables checkpointing
and truncates any stale checkpoint on open.
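One way to "continue from wherever your source left off" is to skip as many source documents as the resumed builder already holds. The sketch below assumes the source can be replayed in the same order and that num_documents() returns the count restored from the checkpoint. FakeBuilder is a hypothetical stand-in so that the snippet runs without the library; it only mimics the num_documents(), add() and build() methods shown above.

```python
from itertools import islice

def ingest(builder, documents):
    """Add documents to a (possibly resumed) builder, skipping stored ones."""
    already_stored = builder.num_documents()
    for keys, content in islice(documents, already_stored, None):
        builder.add(keys, content)
    builder.build()

class FakeBuilder:
    """Hypothetical stand-in for a builder restored from a checkpoint."""

    def __init__(self, restored):
        self.docs = list(restored)
        self.built = False

    def num_documents(self):
        return len(self.docs)

    def add(self, keys, content):
        self.docs.append((keys, content))

    def build(self):
        self.built = True

source = [({"docno": f"DOC{i:03d}"}, f"text {i}".encode()) for i in range(5)]
builder = FakeBuilder(source[:2])  # pretend two documents were checkpointed
ingest(builder, source)
print(builder.num_documents(), builder.built)
```

The same function works for a fresh build, where num_documents() is 0 and nothing is skipped.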
Retrieving documents¶
Load a store with DocumentStore.load() and retrieve
Document objects by number or key. Each document
has keys (metadata dict) and
content (bytes):
store = impact_index.DocumentStore.load(
    "/path/to/store",
    content_access="memory",  # or "mmap" or "disk"
)
print(f"Total documents: {store.num_documents()}")
print(f"Key fields: {store.key_names()}")
# By sequential number (0-based)
docs = store.get_by_number([0, 1, 2])
for doc in docs:
    print(doc.keys, doc.content)
# By key field value
docs = store.get_by_key("docno", ["DOC001", "DOC002"])
for doc in docs:
    if doc is not None:
        print(doc.keys, doc.content)
The content_access parameter controls how content data is accessed:
"memory": loads all content into RAM (fastest, highest memory)
"mmap": memory-mapped I/O (OS manages caching)
"disk": reads from disk on demand (lowest memory)
Async retrieval¶
docs = await store.aio_get_by_number([0, 1, 2])
docs = await store.aio_get_by_key("docno", ["DOC001", "DOC002"])