vector-index-tuning

Optimize vector index performance across latency, recall, and memory tradeoffs.

  • Covers HNSW parameter tuning (M, efConstruction, efSearch) with benchmarking templates and automated recommendation logic based on vector count and target recall
  • Includes quantization strategies (scalar INT8, product quantization, binary quantization, and FP16 compression) with memory estimation tools
  • Provides Qdrant collection configuration templates for four scenarios: recall-focused, speed-focused, balanced, and memory-constrained deployments
  • Includes search performance monitoring, latency profiling (p50/p95/p99), and recall measurement against ground truth

INSTALLATION
npx skills add https://github.com/wshobson/agents --skill vector-index-tuning
Run in your project or agent environment. Adjust flags if your CLI version differs.

SKILL.md

Vector Index Tuning

Guide to optimizing vector indexes for production performance.

When to Use This Skill

  • Tuning HNSW parameters
  • Implementing quantization
  • Optimizing memory usage
  • Reducing search latency
  • Balancing recall vs speed
  • Scaling to billions of vectors

Core Concepts

1. Index Type Selection

Data Size           Recommended Index
────────────────────────────────────────
< 10K vectors  →    Flat (exact search)
10K - 1M       →    HNSW
1M - 100M      →    HNSW + Quantization
> 100M         →    IVF + PQ or DiskANN
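
The thresholds in the table above can be written as a small lookup. This is a minimal sketch: the function name `recommend_index_type` is hypothetical, and the cutoffs are the rule-of-thumb values from the table rather than hard limits.

```python
def recommend_index_type(num_vectors: int) -> str:
    """Map a vector count to the index family suggested in the table above."""
    if num_vectors < 0:
        raise ValueError("num_vectors must be non-negative")
    if num_vectors < 10_000:
        return "Flat (exact search)"
    if num_vectors <= 1_000_000:
        return "HNSW"
    if num_vectors <= 100_000_000:
        return "HNSW + Quantization"
    return "IVF + PQ or DiskANN"


# Print the suggestion for a few illustrative collection sizes.
for count in (5_000, 250_000, 20_000_000, 2_000_000_000):
    print(f"{count:>13,} vectors -> {recommend_index_type(count)}")
```

Treat the result as a starting point; dimensionality, filtering, and update rate can all shift the right choice.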

2. HNSW Parameters

Parameter        Default   Effect
──────────────────────────────────────────────────────────────────────────
M                16        Connections per node, ↑ = better recall, more memory
efConstruction   100       Build quality, ↑ = better index, slower build
efSearch         50        Search quality, ↑ = better recall, slower search

3. Quantization Types

Full Precision (FP32): 4 bytes × dimensions
Half Precision (FP16): 2 bytes × dimensions
INT8 Scalar:           1 byte × dimensions
Product Quantization:  ~32-64 bytes per vector
Binary:                dimensions/8 bytes
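
To make the sizes above concrete, the sketch below computes storage per vector for each scheme. The helper name `bytes_per_vector`, the 768-dimension embedding, and the choice of 32 PQ subvectors (one 1-byte code each, which assumes 256 centroids per codebook) are illustrative assumptions, not values from any particular library.

```python
import math


def bytes_per_vector(dimensions: int, quantization: str, pq_subvectors: int = 32) -> int:
    """Raw storage for one vector, excluding index overhead and PQ codebooks."""
    if quantization == "fp32":
        return 4 * dimensions
    if quantization == "fp16":
        return 2 * dimensions
    if quantization == "int8":
        return dimensions
    if quantization == "pq":
        # One 1-byte code per subvector (assumes at most 256 centroids per codebook).
        return pq_subvectors
    if quantization == "binary":
        return math.ceil(dimensions / 8)
    raise ValueError(f"unknown quantization: {quantization}")


dims = 768  # hypothetical embedding size
baseline = bytes_per_vector(dims, "fp32")
for scheme in ("fp32", "fp16", "int8", "pq", "binary"):
    size = bytes_per_vector(dims, scheme)
    print(f"{scheme:>6}: {size:>5} bytes/vector, {baseline / size:.0f}x compression")
```

These figures cover vector payload only; graph links and codebooks add to the total, as the memory estimator in Template 2 shows.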

Templates

Template 1: HNSW Parameter Tuning

import time
from typing import List, Tuple

import numpy as np


def benchmark_hnsw_parameters(
    vectors: np.ndarray,
    queries: np.ndarray,
    ground_truth: np.ndarray,
    m_values: List[int] = [8, 16, 32, 64],
    ef_construction_values: List[int] = [64, 128, 256],
    ef_search_values: List[int] = [32, 64, 128, 256]
) -> List[dict]:
    """Benchmark different HNSW configurations."""
    import hnswlib  # Imported lazily so this module loads without hnswlib installed

    results = []
    dim = vectors.shape[1]
    n = vectors.shape[0]

    for m in m_values:
        for ef_construction in ef_construction_values:
            # Build index
            index = hnswlib.Index(space='cosine', dim=dim)
            index.init_index(max_elements=n, M=m, ef_construction=ef_construction)

            # perf_counter is monotonic and better suited to timing than time.time
            build_start = time.perf_counter()
            index.add_items(vectors)
            build_time = time.perf_counter() - build_start

            # Estimate memory usage (a rough formula, not a measurement)
            memory_bytes = index.element_count * (
                dim * 4 +  # Vector storage (FP32)
                m * 2 * 4  # Graph edges (approximate)
            )

            for ef_search in ef_search_values:
                index.set_ef(ef_search)

                # Measure search over the whole query batch
                search_start = time.perf_counter()
                labels, distances = index.knn_query(queries, k=10)
                search_time = time.perf_counter() - search_start

                # Calculate recall
                recall = calculate_recall(labels, ground_truth, k=10)

                results.append({
                    "M": m,
                    "ef_construction": ef_construction,
                    "ef_search": ef_search,
                    "build_time_s": build_time,
                    # Average per-query latency for the batch
                    "search_time_ms": search_time * 1000 / len(queries),
                    "recall@10": recall,
                    "memory_mb": memory_bytes / 1024 / 1024
                })

    return results

def calculate_recall(predictions: np.ndarray, ground_truth: np.ndarray, k: int) -> float:
    """Calculate recall@k."""
    correct = 0
    for pred, truth in zip(predictions, ground_truth):
        # Count predicted IDs that also appear in the true top-k
        correct += len(set(pred[:k]) & set(truth[:k]))
    return correct / (len(predictions) * k)

def recommend_hnsw_params(
    num_vectors: int,
    target_recall: float = 0.95,
    max_latency_ms: float = 10,
    available_memory_gb: float = 8
) -> dict:
    """Recommend HNSW parameters based on requirements.

    Note: max_latency_ms and available_memory_gb are accepted but are not
    used by this heuristic; only num_vectors and target_recall affect the result.
    """
    # Base recommendations: larger collections get denser graphs
    if num_vectors < 100_000:
        m = 16
        ef_construction = 100
    elif num_vectors < 1_000_000:
        m = 32
        ef_construction = 200
    else:
        m = 48
        ef_construction = 256

    # Adjust ef_search based on recall target
    if target_recall >= 0.99:
        ef_search = 256
    elif target_recall >= 0.95:
        ef_search = 128
    else:
        ef_search = 64

    return {
        "M": m,
        "ef_construction": ef_construction,
        "ef_search": ef_search,
        "notes": f"Estimated for {num_vectors:,} vectors, {target_recall:.0%} recall"
    }
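
Once the benchmark has produced its list of result rows, a common next step is to pick the cheapest configuration that still meets the recall target. The sketch below assumes rows shaped like those returned by `benchmark_hnsw_parameters`; the helper name `pick_fastest_config` is hypothetical, and the sample rows are made-up numbers for illustration only, not measurements.

```python
from typing import List, Optional


def pick_fastest_config(results: List[dict], target_recall: float = 0.95) -> Optional[dict]:
    """Return the lowest-latency row whose recall@10 meets the target, or None."""
    eligible = [row for row in results if row["recall@10"] >= target_recall]
    if not eligible:
        return None
    return min(eligible, key=lambda row: row["search_time_ms"])


# Made-up rows for illustration; real rows come from benchmark_hnsw_parameters.
sample_results = [
    {"M": 16, "ef_construction": 128, "ef_search": 64, "search_time_ms": 0.40, "recall@10": 0.93},
    {"M": 16, "ef_construction": 128, "ef_search": 128, "search_time_ms": 0.70, "recall@10": 0.96},
    {"M": 32, "ef_construction": 128, "ef_search": 128, "search_time_ms": 0.90, "recall@10": 0.98},
    {"M": 32, "ef_construction": 256, "ef_search": 256, "search_time_ms": 1.60, "recall@10": 0.995},
]

best = pick_fastest_config(sample_results, target_recall=0.95)
print(best)
```

If memory matters more than latency, the same pattern works with `memory_mb` as the sort key.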

Template 2: Quantization Strategies

from typing import Optional, Tuple

import numpy as np


class VectorQuantizer:
    """Quantization strategies for vector compression."""

    @staticmethod
    def scalar_quantize_int8(
        vectors: np.ndarray,
        min_val: Optional[float] = None,
        max_val: Optional[float] = None
    ) -> Tuple[np.ndarray, dict]:
        """Scalar quantization to 8 bits per dimension (stored as uint8)."""
        if min_val is None:
            min_val = vectors.min()
        if max_val is None:
            max_val = vectors.max()

        # Scale to 0-255 range; guard against a zero range (all values equal)
        value_range = max_val - min_val
        scale = 255.0 / value_range if value_range > 0 else 1.0

        quantized = np.clip(
            np.round((vectors - min_val) * scale),
            0, 255
        ).astype(np.uint8)

        params = {"min_val": min_val, "max_val": max_val, "scale": scale}
        return quantized, params

    @staticmethod
    def dequantize_int8(
        quantized: np.ndarray,
        params: dict
    ) -> np.ndarray:
        """Dequantize INT8 vectors."""
        # Inverse of the scaling applied in scalar_quantize_int8
        return quantized.astype(np.float32) / params["scale"] + params["min_val"]

    @staticmethod
    def product_quantize(
        vectors: np.ndarray,
        n_subvectors: int = 8,
        n_centroids: int = 256
    ) -> Tuple[np.ndarray, dict]:
        """Product quantization for aggressive compression."""
        from sklearn.cluster import KMeans

        n, dim = vectors.shape
        assert dim % n_subvectors == 0, "dim must be divisible by n_subvectors"
        assert n_centroids <= 256, "codes are stored as uint8"
        subvector_dim = dim // n_subvectors

        codebooks = []
        codes = np.zeros((n, n_subvectors), dtype=np.uint8)

        # Train one codebook per subvector slice and store each centroid index
        for i in range(n_subvectors):
            start = i * subvector_dim
            end = (i + 1) * subvector_dim
            subvectors = vectors[:, start:end]

            kmeans = KMeans(n_clusters=n_centroids, random_state=42)
            codes[:, i] = kmeans.fit_predict(subvectors)
            codebooks.append(kmeans.cluster_centers_)

        params = {
            "codebooks": codebooks,
            "n_subvectors": n_subvectors,
            "subvector_dim": subvector_dim
        }
        return codes, params

    @staticmethod
    def binary_quantize(vectors: np.ndarray) -> np.ndarray:
        """Binary quantization (sign of each dimension)."""
        # Convert to binary: positive = 1, zero or negative = 0
        binary = (vectors > 0).astype(np.uint8)

        # Pack 8 dimensions per byte, least significant bit first;
        # the last byte is zero-padded when dim is not a multiple of 8
        return np.packbits(binary, axis=1, bitorder="little")

def estimate_memory_usage(
    num_vectors: int,
    dimensions: int,
    quantization: str = "fp32",
    index_type: str = "hnsw",
    hnsw_m: int = 16
) -> dict:
    """Estimate memory usage for different configurations."""
    # Vector storage
    bytes_per_dimension = {
        "fp32": 4,
        "fp16": 2,
        "int8": 1,
        "pq": 0.05,  # Approximate
        "binary": 0.125
    }
    vector_bytes = num_vectors * dimensions * bytes_per_dimension[quantization]

    # Index overhead
    if index_type == "hnsw":
        # Each node has ~M*2 edges, each edge is 4 bytes (int32)
        index_bytes = num_vectors * hnsw_m * 2 * 4
    elif index_type == "ivf":
        # Inverted lists + centroids
        index_bytes = num_vectors * 8 + 65536 * dimensions * 4
    else:
        index_bytes = 0

    total_bytes = vector_bytes + index_bytes

    return {
        "vector_storage_mb": vector_bytes / 1024 / 1024,
        "index_overhead_mb": index_bytes / 1024 / 1024,
        "total_mb": total_bytes / 1024 / 1024,
        "total_gb": total_bytes / 1024 / 1024 / 1024
    }
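
Template 2 produces packed sign-bit codes but does not show how they are compared at query time. Hamming distance (the number of differing bits) is the usual measure for such codes. The pure-Python sketch below illustrates it; the helper names are hypothetical, and the bit layout (least significant bit first) is an assumption chosen to match `binary_quantize` above.

```python
from typing import List, Sequence


def pack_sign_bits(vector: Sequence[float]) -> bytes:
    """Pack one bit per dimension (1 if the value is > 0), least significant bit first."""
    packed = bytearray((len(vector) + 7) // 8)
    for i, value in enumerate(vector):
        if value > 0:
            packed[i // 8] |= 1 << (i % 8)
    return bytes(packed)


def hamming_distance(a: bytes, b: bytes) -> int:
    """Count the bit positions where two equal-length packed codes differ."""
    if len(a) != len(b):
        raise ValueError("codes must have the same length")
    return sum(bin(x ^ y).count("1") for x, y in zip(a, b))


def rank_by_hamming(query: bytes, codes: List[bytes], k: int) -> List[int]:
    """Return the indices of the k codes closest to the query in Hamming distance."""
    order = sorted(range(len(codes)), key=lambda i: hamming_distance(query, codes[i]))
    return order[:k]


# Toy 4-dimensional vectors, for illustration only.
corpus = [pack_sign_bits(v) for v in ([0.3, -0.1, 0.8, 0.2], [-0.5, -0.2, 0.1, -0.9])]
query = pack_sign_bits([0.4, -0.3, 0.6, 0.1])
print(rank_by_hamming(query, corpus, k=1))
```

Because sign bits discard magnitude, candidates found this way are commonly rescored against the original vectors, which is what the `rescore` option in Template 3 refers to.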

Template 3: Qdrant Index Configuration

from qdrant_client import QdrantClient
from qdrant_client.http import models


def create_optimized_collection(
    client: QdrantClient,
    collection_name: str,
    vector_size: int,
    num_vectors: int,
    optimize_for: str = "balanced"  # "recall", "speed", "balanced", "memory"
) -> None:
    """Create collection with optimized settings."""
    # HNSW configuration based on optimization target
    hnsw_configs = {
        "recall": models.HnswConfigDiff(m=32, ef_construct=256),
        "speed": models.HnswConfigDiff(m=16, ef_construct=64),
        "balanced": models.HnswConfigDiff(m=16, ef_construct=128),
        "memory": models.HnswConfigDiff(m=8, ef_construct=64)
    }

    # Quantization configuration
    quantization_configs = {
        "recall": None,  # No quantization for max recall
        "speed": models.ScalarQuantization(
            scalar=models.ScalarQuantizationConfig(
                type=models.ScalarType.INT8,
                quantile=0.99,
                always_ram=True
            )
        ),
        "balanced": models.ScalarQuantization(
            scalar=models.ScalarQuantizationConfig(
                type=models.ScalarType.INT8,
                quantile=0.99,
                always_ram=False
            )
        ),
        "memory": models.ProductQuantization(
            product=models.ProductQuantizationConfig(
                compression=models.CompressionRatio.X16,
                always_ram=False
            )
        )
    }

    # Optimizer configuration
    optimizer_configs = {
        "recall": models.OptimizersConfigDiff(
            indexing_threshold=10000,
            memmap_threshold=50000
        ),
        "speed": models.OptimizersConfigDiff(
            indexing_threshold=5000,
            memmap_threshold=20000
        ),
        "balanced": models.OptimizersConfigDiff(
            indexing_threshold=20000,
            memmap_threshold=50000
        ),
        "memory": models.OptimizersConfigDiff(
            indexing_threshold=50000,
            memmap_threshold=10000  # Use disk sooner
        )
    }

    client.create_collection(
        collection_name=collection_name,
        vectors_config=models.VectorParams(
            size=vector_size,
            distance=models.Distance.COSINE
        ),
        hnsw_config=hnsw_configs[optimize_for],
        quantization_config=quantization_configs[optimize_for],
        optimizers_config=optimizer_configs[optimize_for]
    )

def tune_search_parameters(
    client: QdrantClient,
    collection_name: str,
    target_recall: float = 0.95
) -> models.SearchParams:
    """Tune search parameters for target recall."""
    # Search parameter recommendations
    if target_recall >= 0.99:
        search_params = models.SearchParams(
            hnsw_ef=256,
            exact=False,
            quantization=models.QuantizationSearchParams(
                ignore=True,  # Don't use quantization for search
                rescore=True
            )
        )
    elif target_recall >= 0.95:
        search_params = models.SearchParams(
            hnsw_ef=128,
            exact=False,
            quantization=models.QuantizationSearchParams(
                ignore=False,
                rescore=True,
                oversampling=2.0
            )
        )
    else:
        search_params = models.SearchParams(
            hnsw_ef=64,
            exact=False,
            quantization=models.QuantizationSearchParams(
                ignore=False,
                rescore=False
            )
        )

    return search_params

Template 4: Performance Monitoring

import time
from dataclasses import dataclass
from typing import List

import numpy as np


@dataclass
class SearchMetrics:
    latency_p50_ms: float
    latency_p95_ms: float
    latency_p99_ms: float
    recall: float
    qps: float

class VectorSearchMonitor:
    """Monitor vector search performance."""

    def __init__(self, ground_truth_fn=None):
        self.latencies = []
        self.recalls = []
        self.ground_truth_fn = ground_truth_fn

    def measure_search(
        self,
        search_fn,
        query_vectors: np.ndarray,
        k: int = 10,
        num_iterations: int = 100
    ) -> SearchMetrics:
        """Benchmark search performance."""
        latencies = []

        # Time each query individually so percentiles reflect per-query latency
        for _ in range(num_iterations):
            for query in query_vectors:
                start = time.perf_counter()
                search_fn(query, k=k)
                latency = (time.perf_counter() - start) * 1000
                latencies.append(latency)

        latencies = np.array(latencies)
        total_queries = num_iterations * len(query_vectors)
        total_time = latencies.sum() / 1000  # seconds

        return SearchMetrics(
            latency_p50_ms=np.percentile(latencies, 50),
            latency_p95_ms=np.percentile(latencies, 95),
            latency_p99_ms=np.percentile(latencies, 99),
            recall=self._calculate_recall(search_fn, query_vectors, k) if self.ground_truth_fn else 0.0,
            qps=total_queries / total_time  # Sequential, single-client throughput
        )

    def _calculate_recall(self, search_fn, queries: np.ndarray, k: int) -> float:
        """Calculate recall against ground truth."""
        if not self.ground_truth_fn:
            return 0.0

        correct = 0
        total = 0

        for query in queries:
            # Both functions must return hashable result IDs
            predicted = set(search_fn(query, k=k))
            actual = set(self.ground_truth_fn(query, k=k))
            correct += len(predicted & actual)
            total += k

        return correct / total

def profile_index_build(
    build_fn,
    vectors: np.ndarray,
    batch_sizes: List[int] = [1000, 10000, 50000]
) -> dict:
    """Profile index build performance.

    build_fn is called once per batch for every batch size, so every vector
    is inserted once per batch size.
    """
    results = {}

    for batch_size in batch_sizes:
        times = []

        for i in range(0, len(vectors), batch_size):
            batch = vectors[i:i + batch_size]

            start = time.perf_counter()
            build_fn(batch)
            times.append(time.perf_counter() - start)

        results[batch_size] = {
            "avg_batch_time_s": np.mean(times),
            # Use the true vector count: the last batch may be smaller than batch_size
            "vectors_per_second": len(vectors) / sum(times)
        }

    return results
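
`VectorSearchMonitor` takes a `ground_truth_fn(query, k=k)` but Template 4 does not define one. A minimal sketch, assuming cosine similarity and a corpus small enough to scan exhaustively, might look like the following. The factory name `make_brute_force_ground_truth` is hypothetical, and the function returns positional indices, so it only matches a `search_fn` that returns the same kind of IDs.

```python
import math
from typing import Callable, List, Sequence


def make_brute_force_ground_truth(
    corpus: Sequence[Sequence[float]],
) -> Callable[..., List[int]]:
    """Build a ground_truth_fn(query, k) returning exact top-k indices by cosine similarity."""
    # Precompute corpus norms once; they do not depend on the query.
    norms = [math.sqrt(sum(x * x for x in vec)) for vec in corpus]

    def ground_truth_fn(query: Sequence[float], k: int = 10) -> List[int]:
        q_norm = math.sqrt(sum(x * x for x in query))
        scores = []
        for idx, (vec, norm) in enumerate(zip(corpus, norms)):
            dot = sum(a * b for a, b in zip(vec, query))
            denom = norm * q_norm
            # Zero vectors get similarity 0.0 instead of dividing by zero.
            scores.append((dot / denom if denom else 0.0, idx))
        # Highest similarity first; ties broken by lower index for determinism.
        scores.sort(key=lambda pair: (-pair[0], pair[1]))
        return [idx for _, idx in scores[:k]]

    return ground_truth_fn


# Toy 2-dimensional corpus, for illustration only.
toy_corpus = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0], [-1.0, 0.0]]
exact_top_k = make_brute_force_ground_truth(toy_corpus)
print(exact_top_k([1.0, 0.1], k=2))
```

An exhaustive scan costs one pass over the corpus per query, so in practice it is usually run on a sampled query set rather than on live traffic.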

Best Practices

Do's

  • Benchmark with real queries - Synthetic may not represent production
  • Monitor recall continuously - Can degrade with data drift
  • Start with defaults - Tune only when needed
  • Use quantization - Significant memory savings
  • Consider tiered storage - Hot/cold data separation

Don'ts

  • Don't over-optimize early - Profile first
  • Don't ignore build time - Index updates have cost
  • Don't forget reindexing - Plan for maintenance
  • Don't skip warming - Cold indexes are slow
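
The warming advice above can be sketched as a helper that replays representative queries before the index takes traffic. `warm_up_index` and its parameters are hypothetical names; `search_fn` is assumed to have the same `search_fn(query, k=...)` shape used in Template 4, and three rounds is an arbitrary starting point.

```python
import time
from typing import Callable, List, Sequence


def warm_up_index(
    search_fn: Callable[..., object],
    warmup_queries: Sequence[Sequence[float]],
    k: int = 10,
    rounds: int = 3,
) -> List[float]:
    """Replay the warm-up queries and return the mean latency (ms) of each round."""
    per_round_ms = []
    for _ in range(rounds):
        start = time.perf_counter()
        for query in warmup_queries:
            search_fn(query, k=k)
        elapsed_ms = (time.perf_counter() - start) * 1000
        # max(..., 1) avoids dividing by zero when no queries are supplied.
        per_round_ms.append(elapsed_ms / max(len(warmup_queries), 1))
    return per_round_ms


def fake_search(query, k=10):
    """Stand-in for a real search call; returns the first k positions."""
    return list(range(k))


print(warm_up_index(fake_search, [[0.1, 0.2], [0.3, 0.4]], k=5, rounds=3))
```

If latency is still falling between the last rounds, the index is probably not fully warm yet and more rounds may help.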