User Guide

Building an Index

Use IndexBuilder to create a sparse index from document impact vectors. Each document is represented as a set of term indices with associated impact values.

import numpy as np
import impact_index

builder = impact_index.IndexBuilder("/path/to/index")

# Add documents: docid, term_indices, impact_values
terms = np.array([0, 5, 42], dtype=np.uintp)
values = np.array([1.2, 0.5, 3.1], dtype=np.float32)
builder.add(0, terms, values)

# More documents...
builder.add(1, np.array([2, 5, 8], dtype=np.uintp),
            np.array([0.3, 0.9, 1.1], dtype=np.float32))

# Finalize and get a searchable index
index = builder.build(in_memory=True)
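
To make the data model concrete, here is a minimal pure-Python sketch of what a sparse impact index stores conceptually: one posting list per term, each holding (docid, impact) pairs. It is an illustration only and says nothing about the library's on-disk layout; build_postings is a hypothetical helper, not part of impact_index.

```python
from collections import defaultdict

def build_postings(documents):
    """Invert {docid: {term: impact}} into {term: [(docid, impact), ...]}.

    Hypothetical helper for illustration; not part of impact_index.
    """
    postings = defaultdict(list)
    # Visit documents in increasing docid order so each list stays sorted.
    for docid in sorted(documents):
        for term, impact in documents[docid].items():
            postings[term].append((docid, impact))
    return dict(postings)

# The two documents added in the example above.
documents = {
    0: {0: 1.2, 5: 0.5, 42: 3.1},
    1: {2: 0.3, 5: 0.9, 8: 1.1},
}
postings = build_postings(documents)
print(postings[5])  # [(0, 0.5), (1, 0.9)]
```

Term 5 occurs in both documents, so its posting list has two entries; every other term has one.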

Builder options

Use BuilderOptions to control checkpointing (for crash recovery) and memory usage:

options = impact_index.BuilderOptions()
options.checkpoint_frequency = 100000   # checkpoint every N documents
options.in_memory_threshold = 1000000   # max postings per term before flush

builder = impact_index.IndexBuilder("/path/to/index", options=options)

# Resume from a checkpoint (returns None if no checkpoint exists)
last_docid = builder.get_checkpoint_doc_id()
if last_docid is not None:
    print(f"Resuming from document {last_docid}")

Storage dtype

By default, impact values are stored as float32. You can choose a different on-disk type to trade precision for space:

# Use float16 for smaller indices
builder = impact_index.IndexBuilder("/path/to/index", dtype="float16")

Supported dtypes: "float32" (default), "float16", "bfloat16", "float64", "int32", "int64".
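
The precision/space trade-off can be checked without the library. The sketch below uses only Python's struct module to round-trip one impact value through the IEEE float16, float32, and float64 encodings. It shows the general behaviour of these formats, not the library's storage code, and it does not cover bfloat16 or the integer types.

```python
import struct

def roundtrip(value, fmt):
    """Pack a Python float into the given struct format and read it back."""
    return struct.unpack(fmt, struct.pack(fmt, value))[0]

# struct format codes: "e" = float16, "f" = float32, "d" = float64
for name, fmt in [("float16", "<e"), ("float32", "<f"), ("float64", "<d")]:
    stored = roundtrip(3.1, fmt)
    size = struct.calcsize(fmt)
    print(f"{name}: {size} bytes per value, 3.1 stored as {stored!r}")
```

float16 halves the per-value size relative to float32 at the cost of roughly three significant decimal digits, which is often acceptable for impact scores but worth checking against your own ranking quality.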

Searching

Load an existing index and search it with WAND or MaxScore. Both return a list of ScoredDocument objects, each exposing docid and score:

import impact_index

index = impact_index.Index.load("/path/to/index", in_memory=True)

# Query: {term_index: query_weight}
query = {5: 1.0, 10: 0.5, 42: 1.5}

# WAND algorithm
results = index.search_wand(query, top_k=10)
for doc in results:
    print(f"Document {doc.docid}: {doc.score}")

# MaxScore algorithm (often faster on compressed/split indices)
results = index.search_maxscore(query, top_k=10)
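
A brute-force reference is useful for sanity-checking results. WAND and MaxScore are dynamic pruning strategies designed to return the same top-k as exhaustive scoring. The sketch below assumes the usual impact-index scoring, where a document's score is the sum of query_weight * impact over matching terms. exhaustive_top_k is a hypothetical helper, and the library's tie-breaking may differ.

```python
import heapq

def exhaustive_top_k(documents, query, top_k):
    """Score every document as sum(query_weight * impact) and keep the best top_k."""
    scores = {}
    for docid, vector in documents.items():
        score = sum(weight * vector[term]
                    for term, weight in query.items() if term in vector)
        if score > 0.0:
            scores[docid] = score
    # Highest score first; ties broken by smaller docid.
    return heapq.nlargest(top_k, scores.items(),
                          key=lambda item: (item[1], -item[0]))

# Same documents and query as in the examples above.
documents = {
    0: {0: 1.2, 5: 0.5, 42: 3.1},
    1: {2: 0.3, 5: 0.9, 8: 1.1},
}
query = {5: 1.0, 10: 0.5, 42: 1.5}
for docid, score in exhaustive_top_k(documents, query, top_k=10):
    print(f"Document {docid}: {score:.2f}")
```

Term 10 appears in no document and contributes nothing; document 0 ranks first because it matches both term 5 and the heavily weighted term 42.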

Iterating over postings

You can inspect individual posting lists. Each element is a TermImpact:

term_id = 5  # any term index present in the index
iterator = index.postings(term_id)
print(f"Length: {iterator.length()}")
print(f"Max impact: {iterator.max_value()}")
print(f"Max doc ID: {iterator.max_doc_id()}")

for posting in iterator:
    print(f"Doc {posting.docid}: {posting.value}")

BM25 and Bag-of-Words Indexing

For traditional IR with BM25 scoring, use BOWIndexBuilder instead of IndexBuilder. It automatically tracks document lengths and optionally integrates text analysis (tokenization + stemming).

Pre-tokenized input

If you already have term indices and term-frequency values:

import numpy as np
import impact_index

builder = impact_index.BOWIndexBuilder("/path/to/index", dtype="int32")

# Add documents: docid, term_indices, tf_values
terms = np.array([0, 5, 42], dtype=np.uintp)
tf = np.array([3, 1, 2], dtype=np.int32)
builder.add(0, terms, tf)

builder.add(1, np.array([2, 5, 8], dtype=np.uintp),
            np.array([1, 4, 1], dtype=np.int32))

# build() returns a searchable Index (doc metadata is stored automatically)
index = builder.build(in_memory=True)

# Create a BM25-scored index (doc lengths loaded automatically)
scored = index.with_scoring(impact_index.BM25Scoring(k1=1.2, b=0.75))

# Search with MaxScore (often the faster of the two algorithms)
query = {0: 1.0, 5: 1.0}
results = scored.search_maxscore(query, top_k=10)
for doc in results:
    print(f"Document {doc.docid}: {doc.score}")
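
For reference, the role of k1 and b can be seen in the textbook BM25 term weight. The sketch below uses the Lucene-style IDF; whether BM25Scoring uses exactly this variant is an assumption, and bm25_term_score is a hypothetical helper rather than a library function.

```python
import math

def bm25_term_score(tf, doc_len, avg_doc_len, doc_freq, num_docs, k1=1.2, b=0.75):
    """Textbook BM25 contribution of one term to one document's score."""
    # Lucene-style IDF, always positive.
    idf = math.log(1.0 + (num_docs - doc_freq + 0.5) / (doc_freq + 0.5))
    # Length normalization: b=0 ignores document length, b=1 normalizes fully.
    norm = k1 * (1.0 - b + b * doc_len / avg_doc_len)
    return idf * tf * (k1 + 1.0) / (tf + norm)

# Term frequency saturates: going from 1 to 2 occurrences helps more than 10 to 11.
for tf in (1, 2, 10, 11):
    score = bm25_term_score(tf, doc_len=6, avg_doc_len=6, doc_freq=1, num_docs=2)
    print(tf, round(score, 4))
```

A larger k1 delays saturation, and a larger b penalizes documents that are longer than average more strongly.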

Raw text input with stemming

For direct text indexing with automatic tokenization, stemming, and vocabulary management:

import impact_index

builder = impact_index.BOWIndexBuilder(
    "/path/to/index",
    dtype="int32",
    stemmer="porter",  # Lucene-compatible Porter stemmer
    stop_words=True,   # Lucene default English stop words
)

builder.add_text(0, "the quick brown fox jumps over the lazy dog")
builder.add_text(1, "a quick brown cat jumps high")
builder.add_text(2, "the lazy dog sleeps all day")

# Build index (doc metadata and analyzer config saved automatically)
index = builder.build(in_memory=True)

# BM25 scoring (doc lengths loaded automatically from index)
scored = index.with_scoring(impact_index.BM25Scoring())

# Query analysis (analyzer loaded automatically from index)
query = index.analyzer().analyze_query("quick fox")
results = scored.search_maxscore(query, top_k=10)
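
Conceptually, text analysis turns a string into the same {term_index: value} form used for pre-tokenized input. The following simplified sketch covers tokenization, stop-word removal, vocabulary assignment, and term-frequency counting. It omits stemming, uses a two-word stop list for brevity, and the analyze function is hypothetical; the library's analyzer is more elaborate.

```python
import re
from collections import Counter

STOP_WORDS = {"the", "a"}  # tiny illustrative list, not Lucene's full set

def analyze(text, vocabulary, stop_words=STOP_WORDS):
    """Tokenize text and return {term_index: term_frequency}.

    New tokens are assigned the next free index in `vocabulary`.
    Stemming is deliberately omitted to keep the sketch short.
    """
    tokens = [t for t in re.findall(r"[a-z0-9]+", text.lower())
              if t not in stop_words]
    counts = Counter(tokens)
    return {vocabulary.setdefault(token, len(vocabulary)): tf
            for token, tf in counts.items()}

vocabulary = {}
doc0 = analyze("the quick brown fox jumps over the lazy dog", vocabulary)
query = analyze("quick fox", vocabulary)
print(doc0)
print(query)
```

Note that this sketch also grows the vocabulary when analyzing a query; a real analyzer would typically drop query terms that were never indexed.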

Stop words

Stop words (common words like “the”, “is”, “a”) can be filtered during indexing and querying to reduce index size and improve search speed. Built-in stop word lists from Lucene are available for 17 languages.

import impact_index

# Use default stop words for the language (matches Lucene/Pyserini)
builder = impact_index.BOWIndexBuilder(
    "/path/to/index",
    stemmer="snowball",
    language="english",
    stop_words=True,
)

# Or provide an explicit list
builder = impact_index.BOWIndexBuilder(
    "/path/to/index",
    stemmer="snowball",
    stop_words=["the", "a", "is", "in"],
)

# Get the stop word list for any supported language
words = impact_index.get_stop_words("english")   # 33 words (Lucene default)
words = impact_index.get_stop_words("french")     # 154 words
words = impact_index.get_stop_words("german")     # 231 words

Supported languages: arabic, danish, dutch, english, finnish, french, german, greek, hungarian, italian, norwegian, portuguese, romanian, russian, spanish, swedish, turkish.

Note

For a fair comparison with Pyserini/Lucene, always enable stop-word filtering (stop_words=True). Without it, high-frequency terms like “the” produce very long posting lists that slow down search significantly.

Loading a saved index

The index automatically detects and loads auxiliary components (doc metadata, analyzer config, vocabulary) from the directory:

import impact_index

index = impact_index.Index.load("/path/to/index", in_memory=True)

# Doc metadata and analyzer are loaded automatically
scored = index.with_scoring(impact_index.BM25Scoring())
analyzer = index.analyzer()
query = analyzer.analyze_query("quick fox")
results = scored.search_maxscore(query, top_k=10)

Compression and Transforms

By default, compressed indices use PFOR-delta for doc IDs and adaptive bitpacking for values, with 128-posting blocks that enable block-max pruning.

Quick compression

The simplest way to compress an index:

import impact_index

index = impact_index.Index.load("/path/to/raw_index", in_memory=True)

# Compress with defaults (PFOR doc IDs, lossless integer TF, block_size=128)
compressed = index.compress("/path/to/compressed")

# The compressed index is standalone — includes vocab, docmeta, analyzer
scored = compressed.with_scoring(impact_index.BM25Scoring())

# For neural IR with float impacts, use quantization:
compressed = index.compress("/path/to/compressed", nbits=8)

The compressed index is fully standalone: auxiliary files (vocabulary, analyzer config, document metadata) are automatically copied from the source index.

The default settings are optimized for BM25:

  • block_size=128: aligns with SIMD registers and enables effective block-max pruning during search.

  • nbits=0 (default): lossless integer bitpacking for TF counts (~2-3 bits per value). Use nbits=8 or nbits=16 for quantized float compression (neural IR like SPLADE).
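
To see why block-wise bitpacking of doc IDs is effective, the sketch below delta-encodes a sorted docid list and reports how many bits per value each 128-entry block would need if every value in a block used the width of its largest entry. This is a simplified illustration of the general idea, not the library's PFOR implementation; in particular, PFOR stores outliers as exceptions, which this sketch does not model.

```python
def gaps(doc_ids):
    """Delta-encode a non-empty, strictly increasing docid list (first value kept as-is)."""
    return [doc_ids[0]] + [b - a for a, b in zip(doc_ids, doc_ids[1:])]

def bits_per_block(values, block_size=128):
    """Bits needed per value in each block if all values use the block's max width."""
    widths = []
    for start in range(0, len(values), block_size):
        block = values[start:start + block_size]
        widths.append(max(max(block).bit_length(), 1))
    return widths

# 256 doc IDs spaced 3 apart, starting at 100000 (two full blocks).
doc_ids = list(range(100000, 100000 + 256 * 3, 3))
print("raw ids:", bits_per_block(doc_ids))
print("gaps:   ", bits_per_block(gaps(doc_ids)))
```

The second block of gaps needs only 2 bits per value. The first block is dominated by a single large value (the initial docid), which is exactly the kind of outlier that exception handling in PFOR is meant to absorb.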

Advanced compression

For full control over compressors, use CompressionTransform:

import impact_index

index = impact_index.Index.load("/path/to/raw_index", in_memory=True)

# Choose compressors
docid_compressor = impact_index.BitPackingCompressor()   # SIMD (recommended)
# docid_compressor = impact_index.EliasFanoCompressor()  # alternative

# Fixed-range quantization (if you know the value range)
impact_compressor = impact_index.ImpactQuantizer(nbits=8, min=0.0, max=10.0)

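# Or auto-ranging quantization (determines the range from the index).
# Keep only one of the two assignments; as written, this one overrides the fixed-range quantizer.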
impact_compressor = impact_index.GlobalImpactQuantizer(nbits=8)

# Apply compression
transform = impact_index.CompressionTransform(
    max_block_size=128,
    doc_ids_compressor=docid_compressor,
    impacts_compressor=impact_compressor,
)
transform.process("/path/to/compressed", index)

# Load the compressed index
compressed = impact_index.Index.load("/path/to/compressed", in_memory=True)
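
The effect of nbits, min, and max can be illustrated with plain uniform quantization. The mapping below is an assumption made for illustration (the library's quantizers may round or reserve codes differently), and quantize/dequantize are hypothetical helpers.

```python
def quantize(value, nbits, lo, hi):
    """Map a float in [lo, hi] to an integer code in [0, 2**nbits - 1]."""
    levels = (1 << nbits) - 1
    clamped = min(max(value, lo), hi)  # out-of-range values saturate
    return round((clamped - lo) / (hi - lo) * levels)

def dequantize(code, nbits, lo, hi):
    """Map an integer code back to the float it represents."""
    levels = (1 << nbits) - 1
    return lo + code / levels * (hi - lo)

for value in (1.2, 0.5, 3.1):
    code = quantize(value, 8, 0.0, 10.0)
    print(value, code, round(dequantize(code, 8, 0.0, 10.0), 4))
```

With 8 bits over the range [0, 10], the worst-case reconstruction error is half a step, about 0.02. A range that is wider than the actual values wastes codes, which is the motivation for determining the range from the index.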

Splitting by quantiles

SplitIndexTransform partitions each term’s postings into sub-lists by value ranges, enabling more aggressive pruning with MaxScore:

base_transform = impact_index.CompressionTransform(
    max_block_size=128,
    doc_ids_compressor=impact_index.EliasFanoCompressor(),
    impacts_compressor=impact_index.GlobalImpactQuantizer(nbits=8),
)

split_transform = impact_index.SplitIndexTransform(
    quantiles=[0.5, 0.9],   # split at 50th and 90th percentile
    sink=base_transform,
)
split_transform.process("/path/to/split_index", index)
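
The idea behind quantile splitting can be sketched in a few lines: each posting list is partitioned by value so that every sub-list gets its own, tighter maximum impact. The nearest-rank threshold choice and the split_by_quantiles helper are assumptions for illustration, not the library's exact procedure.

```python
def split_by_quantiles(postings, quantiles):
    """Partition [(docid, value), ...] into len(quantiles) + 1 sub-lists by value.

    `quantiles` is assumed to be sorted in increasing order.
    """
    ordered = sorted(value for _, value in postings)
    # Value at each requested quantile (simple nearest-rank choice).
    thresholds = [ordered[min(int(q * len(ordered)), len(ordered) - 1)]
                  for q in quantiles]
    parts = [[] for _ in range(len(thresholds) + 1)]
    for docid, value in postings:
        # Bucket index = number of thresholds the value reaches.
        bucket = sum(value >= t for t in thresholds)
        parts[bucket].append((docid, value))
    return parts

postings = [(docid, float(docid + 1)) for docid in range(10)]
low, mid, high = split_by_quantiles(postings, [0.5, 0.9])
print([max(v for _, v in part) for part in (low, mid, high)])  # [5.0, 9.0, 10.0]
```

Without splitting, the whole list has an upper bound of 10.0. After splitting, half of the postings sit in a sub-list bounded by 5.0, so a pruning algorithm can discard them earlier once the score threshold rises.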

BMP (Block-Max Pruning)

BMP implements “Faster Learned Sparse Retrieval with Block-Max Pruning” (SIGIR 2024) for fast approximate search.

Converting to BMP format

import impact_index

index = impact_index.Index.load("/path/to/index", in_memory=True)

# Streaming conversion (recommended — memory-efficient)
index.to_bmp_streaming("/path/to/bmp_index.bin", bsize=64, compress_range=True)

# Or legacy method (loads all postings into memory)
# index.to_bmp("/path/to/bmp_index.bin", bsize=64, compress_range=True)

Searching with BMP

Load a BMP index with BmpSearcher and search:

searcher = impact_index.BmpSearcher("/path/to/bmp_index.bin")
print(f"Documents: {searcher.num_documents()}")

# Query uses string term IDs
query = {"term1": 1.0, "term2": 0.5}
doc_ids, scores = searcher.search(query, k=10, alpha=1.0, beta=1.0)
for docid, score in zip(doc_ids, scores):
    print(f"{docid}: {score}")

BMP search parameters:

  • k — number of results to return

  • alpha — controls early termination aggressiveness (default: 1.0)

  • beta — controls block skipping (default: 1.0)
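
The core idea of block-max pruning can be sketched in pure Python: documents are grouped into fixed-size blocks, each block stores the maximum impact per term, and blocks are visited in decreasing order of their score upper bound until no remaining block can beat the current k-th score. This is a simplified, exact-search illustration that assumes non-negative impacts and weights. It does not model alpha or beta and is not the BMP implementation; block_max_search is a hypothetical helper.

```python
import heapq
from collections import defaultdict

def block_max_search(documents, query, k, bsize):
    """Exact top-k search that skips blocks whose score upper bound is too low.

    documents: {docid: {term: impact}}, query: {term: weight}.
    Returns ([(score, docid), ...] best first, number of blocks scored).
    """
    # Per-block, per-term maximum impact (the "block-max" index).
    block_max = defaultdict(dict)
    for docid, vector in documents.items():
        maxima = block_max[docid // bsize]
        for term, impact in vector.items():
            maxima[term] = max(maxima.get(term, 0.0), impact)

    # Upper bound on any document score inside each block, largest first.
    bounds = sorted(
        ((sum(w * maxima.get(t, 0.0) for t, w in query.items()), block)
         for block, maxima in block_max.items()),
        reverse=True,
    )

    top = []  # min-heap of (score, docid) holding the current best k
    scored_blocks = 0
    for bound, block in bounds:
        if len(top) == k and bound <= top[0][0]:
            break  # no remaining block can beat the current k-th score
        scored_blocks += 1
        for docid in range(block * bsize, (block + 1) * bsize):
            vector = documents.get(docid)
            if vector is None:
                continue
            score = sum(w * vector.get(t, 0.0) for t, w in query.items())
            if len(top) < k:
                heapq.heappush(top, (score, docid))
            elif score > top[0][0]:
                heapq.heapreplace(top, (score, docid))
    return sorted(top, reverse=True), scored_blocks

impacts = [0.1, 0.2, 5.0, 4.0, 0.3, 0.1, 0.2, 0.4]
documents = {docid: {7: value} for docid, value in enumerate(impacts)}
results, scored_blocks = block_max_search(documents, {7: 1.0}, k=2, bsize=2)
print(results, scored_blocks)  # [(5.0, 2), (4.0, 3)] 1
```

Only one of the four blocks is scored here: after the best block fills the top-2, every other block's upper bound is already below the second-best score.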

Document Store

The document store provides compressed storage for document content and metadata, using zstd block compression. Documents can be retrieved by sequential number or by key fields.

Building a store

Use DocumentStoreBuilder to create a store:

import impact_index

builder = impact_index.DocumentStoreBuilder(
    "/path/to/store",
    block_size=4096,    # documents per compressed block
    zstd_level=3,       # compression level
)

# Add documents with key-value metadata and binary content
builder.add({"docno": "DOC001", "url": "http://example.com"}, b"document text here")
builder.add({"docno": "DOC002", "url": "http://example.com/2"}, b"another document")

# Finalize (can only be called once)
builder.build()
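
The block_size trade-off is easier to see with a toy version of block compression. The sketch below groups document contents into blocks, compresses each block as a unit, and decompresses only the block that holds the requested document. It uses zlib from the standard library as a stand-in for zstd, and the length-prefixed layout is hypothetical, not the store's actual file format.

```python
import struct
import zlib

def build_blocks(contents, block_size, level=3):
    """Compress documents in groups of `block_size` (zlib stands in for zstd)."""
    blocks = []
    for start in range(0, len(contents), block_size):
        # Each document is stored as a 4-byte little-endian length plus its bytes.
        payload = b"".join(struct.pack("<I", len(c)) + c
                           for c in contents[start:start + block_size])
        blocks.append(zlib.compress(payload, level))
    return blocks

def get_by_number(blocks, block_size, number):
    """Decompress only the block that holds document `number` (0-based)."""
    payload = zlib.decompress(blocks[number // block_size])
    offset = 0
    for _ in range(number % block_size):  # skip earlier documents in the block
        (length,) = struct.unpack_from("<I", payload, offset)
        offset += 4 + length
    (length,) = struct.unpack_from("<I", payload, offset)
    return payload[offset + 4:offset + 4 + length]

contents = [b"document text here", b"another document", b"third"]
blocks = build_blocks(contents, block_size=2)
print(get_by_number(blocks, 2, 1))  # b'another document'
```

Larger blocks give the compressor more context and usually a better ratio, but a single lookup then has to decompress more data.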

Resumable builds (crash recovery)

For long-running ingests, pass checkpoint_frequency to periodically persist the in-flight builder state. If the process crashes or exits before build(), re-instantiating the builder on the same folder (with the same non-zero checkpoint_frequency) resumes from the last checkpoint. Documents added after that checkpoint but before the crash are discarded, and the output files are rewound to a consistent state.

builder = impact_index.DocumentStoreBuilder(
    "/path/to/store",
    checkpoint_frequency=10_000,  # checkpoint every 10k documents
)

# `documents` is your own iterable of records exposing .keys (dict) and .content (bytes)
for doc in documents:
    builder.add(doc.keys, doc.content)

builder.build()  # clears the checkpoint on success

# On resume after a crash:
builder = impact_index.DocumentStoreBuilder(
    "/path/to/store",
    checkpoint_frequency=10_000,
)
print(builder.num_documents())  # docs restored from the last checkpoint
# Continue adding from wherever your source left off, then build().

Call builder.checkpoint() directly to force a checkpoint at an arbitrary point (e.g. before exiting normally so that no work is lost). Passing checkpoint_frequency=0 (the default) disables checkpointing and truncates any stale checkpoint on open.
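
One way to continue from where the source left off is to skip as many source records as the builder reports after reopening. The sketch below assumes the source can be replayed in the same order on every run; remaining is a hypothetical helper, and the count of 6 stands in for the value returned by builder.num_documents().

```python
from itertools import islice

def remaining(source, already_stored):
    """Skip the first `already_stored` items of a replayable, ordered source."""
    return islice(source, already_stored, None)

source = [f"doc-{i}" for i in range(10)]
# Suppose builder.num_documents() reported 6 after reopening the store.
to_add = list(remaining(source, 6))
print(to_add)  # ['doc-6', 'doc-7', 'doc-8', 'doc-9']
```

If the source order is not stable between runs, track progress by a document key instead of by position.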

Retrieving documents

Load a store with DocumentStore.load() and retrieve Document objects by number or key. Each document has keys (a metadata dict) and content (bytes):

store = impact_index.DocumentStore.load(
    "/path/to/store",
    content_access="memory",  # or "mmap" or "disk"
)

print(f"Total documents: {store.num_documents()}")
print(f"Key fields: {store.key_names()}")

# By sequential number (0-based)
docs = store.get_by_number([0, 1, 2])
for doc in docs:
    print(doc.keys, doc.content)

# By key field value
docs = store.get_by_key("docno", ["DOC001", "DOC002"])
for doc in docs:
    if doc is not None:
        print(doc.keys, doc.content)

The content_access parameter controls how content data is accessed:

  • "memory" — loads all content into RAM (fastest, highest memory)

  • "mmap" — memory-mapped I/O (OS manages caching)

  • "disk" — reads from disk on demand (lowest memory)

Async retrieval

# The aio_* methods are awaitable, so call them from inside a coroutine (async def)
docs = await store.aio_get_by_number([0, 1, 2])
docs = await store.aio_get_by_key("docno", ["DOC001", "DOC002"])
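
From synchronous code, awaitable calls like these need an event loop. The sketch below shows the general asyncio pattern with a stub in place of a real store: StubStore only mimics the call shape of aio_get_by_number, and whether concurrent requests bring a speedup with the real store is an assumption to verify.

```python
import asyncio

class StubStore:
    """Stand-in for a loaded DocumentStore; only mimics the awaitable call shape."""

    async def aio_get_by_number(self, numbers):
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        return [f"document {n}" for n in numbers]

async def fetch_batches(store, batches):
    """Issue one retrieval per batch and wait for all of them concurrently."""
    return await asyncio.gather(*(store.aio_get_by_number(b) for b in batches))

results = asyncio.run(fetch_batches(StubStore(), [[0, 1], [2]]))
print(results)  # [['document 0', 'document 1'], ['document 2']]
```

asyncio.gather returns the results in the order the requests were passed, regardless of which one completes first.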