embedding-strategies

Comprehensive guide for selecting, implementing, and optimizing embedding models for vector search and RAG applications. Covers 10+ embedding models with dimensions, token limits, and domain specialization (Voyage AI, OpenAI, open-source options for code, finance, legal, and multilingual content) Provides four chunking strategies: token-based, sentence-based, semantic sections, and recursive character splitting with overlap handling Includes three implementation templates for Voyage AI, OpenAI, and local Sentence Transformers with specialized query/document prefixes Features domain-specific pipelines for general documents and code, plus evaluation metrics (precision, recall, MRR, NDCG) for retrieval quality assessment Best practices section covers model selection, preprocessing, batching, caching, and common pitfalls

INSTALLATION
npx skills add https://github.com/wshobson/agents --skill embedding-strategies
Run in your project or agent environment. Adjust flags if your CLI version differs.

SKILL.md

Embedding Strategies

Guide to selecting and optimizing embedding models for vector search applications.

When to Use This Skill

  • Choosing embedding models for RAG
  • Optimizing chunking strategies
  • Fine-tuning embeddings for domains
  • Comparing embedding model performance
  • Reducing embedding dimensions
  • Handling multilingual content

Core Concepts

1. Embedding Model Comparison (2026)

Model

Dimensions

Max Tokens

Best For

voyage-3-large

1024

32000

Claude apps (Anthropic recommended)

voyage-3

1024

32000

Claude apps, cost-effective

voyage-code-3

1024

32000

Code search

voyage-finance-2

1024

32000

Financial documents

voyage-law-2

1024

32000

Legal documents

text-embedding-3-large

3072

8191

OpenAI apps, high accuracy

text-embedding-3-small

1536

8191

OpenAI apps, cost-effective

bge-large-en-v1.5

1024

512

Open source, local deployment

all-MiniLM-L6-v2

384

256

Fast, lightweight

multilingual-e5-large

1024

512

Multi-language

2. Embedding Pipeline

Document → Chunking → Preprocessing → Embedding Model → Vector

                ↓

        [Overlap, Size]  [Clean, Normalize]  [API/Local]

Templates

Template 1: Voyage AI Embeddings (Recommended for Claude)

from langchain_voyageai import VoyageAIEmbeddings

from typing import List

import os

# Initialize Voyage AI embeddings (recommended by Anthropic for Claude)

embeddings = VoyageAIEmbeddings(

    model="voyage-3-large",

    voyage_api_key=os.environ.get("VOYAGE_API_KEY")

)

def get_embeddings(texts: List[str]) -> List[List[float]]:

    """Get embeddings from Voyage AI."""

    return embeddings.embed_documents(texts)

def get_query_embedding(query: str) -> List[float]:

    """Get single query embedding."""

    return embeddings.embed_query(query)

# Specialized models for domains

code_embeddings = VoyageAIEmbeddings(model="voyage-code-3")

finance_embeddings = VoyageAIEmbeddings(model="voyage-finance-2")

legal_embeddings = VoyageAIEmbeddings(model="voyage-law-2")

Template 2: OpenAI Embeddings

from openai import OpenAI

from typing import List

import numpy as np

client = OpenAI()

def get_embeddings(

    texts: List[str],

    model: str = "text-embedding-3-small",

    dimensions: int = None

) -> List[List[float]]:

    """Get embeddings from OpenAI with optional dimension reduction."""

    # Handle batching for large lists

    batch_size = 100

    all_embeddings = []

    for i in range(0, len(texts), batch_size):

        batch = texts[i:i + batch_size]

        kwargs = {"input": batch, "model": model}

        if dimensions:

            # Matryoshka dimensionality reduction

            kwargs["dimensions"] = dimensions

        response = client.embeddings.create(**kwargs)

        embeddings = [item.embedding for item in response.data]

        all_embeddings.extend(embeddings)

    return all_embeddings

def get_embedding(text: str, **kwargs) -> List[float]:

    """Get single embedding."""

    return get_embeddings([text], **kwargs)[0]

# Dimension reduction with Matryoshka embeddings

def get_reduced_embedding(text: str, dimensions: int = 512) -> List[float]:

    """Get embedding with reduced dimensions (Matryoshka)."""

    return get_embedding(

        text,

        model="text-embedding-3-small",

        dimensions=dimensions

    )

Template 3: Local Embeddings with Sentence Transformers

from sentence_transformers import SentenceTransformer

from typing import List, Optional

import numpy as np

class LocalEmbedder:

    """Local embedding with sentence-transformers."""

    def __init__(

        self,

        model_name: str = "BAAI/bge-large-en-v1.5",

        device: str = "cuda"

    ):

        self.model = SentenceTransformer(model_name, device=device)

        self.model_name = model_name

    def embed(

        self,

        texts: List[str],

        normalize: bool = True,

        show_progress: bool = False

    ) -> np.ndarray:

        """Embed texts with optional normalization."""

        embeddings = self.model.encode(

            texts,

            normalize_embeddings=normalize,

            show_progress_bar=show_progress,

            convert_to_numpy=True

        )

        return embeddings

    def embed_query(self, query: str) -> np.ndarray:

        """Embed a query with appropriate prefix for retrieval models."""

        # BGE and similar models benefit from query prefix

        if "bge" in self.model_name.lower():

            query = f"Represent this sentence for searching relevant passages: {query}"

        return self.embed([query])[0]

    def embed_documents(self, documents: List[str]) -> np.ndarray:

        """Embed documents for indexing."""

        return self.embed(documents)

# E5 model with instructions

class E5Embedder:

    def __init__(self, model_name: str = "intfloat/multilingual-e5-large"):

        self.model = SentenceTransformer(model_name)

    def embed_query(self, query: str) -> np.ndarray:

        """E5 requires 'query:' prefix for queries."""

        return self.model.encode(f"query: {query}")

    def embed_document(self, document: str) -> np.ndarray:

        """E5 requires 'passage:' prefix for documents."""

        return self.model.encode(f"passage: {document}")

Template 4: Chunking Strategies

from typing import List, Tuple

import re

def chunk_by_tokens(

    text: str,

    chunk_size: int = 512,

    chunk_overlap: int = 50,

    tokenizer=None

) -> List[str]:

    """Chunk text by token count."""

    import tiktoken

    tokenizer = tokenizer or tiktoken.get_encoding("cl100k_base")

    tokens = tokenizer.encode(text)

    chunks = []

    start = 0

    while start < len(tokens):

        end = start + chunk_size

        chunk_tokens = tokens[start:end]

        chunk_text = tokenizer.decode(chunk_tokens)

        chunks.append(chunk_text)

        start = end - chunk_overlap

    return chunks

def chunk_by_sentences(

    text: str,

    max_chunk_size: int = 1000,

    min_chunk_size: int = 100

) -> List[str]:

    """Chunk text by sentences, respecting size limits."""

    import nltk

    sentences = nltk.sent_tokenize(text)

    chunks = []

    current_chunk = []

    current_size = 0

    for sentence in sentences:

        sentence_size = len(sentence)

        if current_size + sentence_size > max_chunk_size and current_chunk:

            chunks.append(" ".join(current_chunk))

            current_chunk = []

            current_size = 0

        current_chunk.append(sentence)

        current_size += sentence_size

    if current_chunk:

        chunks.append(" ".join(current_chunk))

    return chunks

def chunk_by_semantic_sections(

    text: str,

    headers_pattern: str = r'^#{1,3}\s+.+$'

) -> List[Tuple[str, str]]:

    """Chunk markdown by headers, preserving hierarchy."""

    lines = text.split('\n')

    chunks = []

    current_header = ""

    current_content = []

    for line in lines:

        if re.match(headers_pattern, line, re.MULTILINE):

            if current_content:

                chunks.append((current_header, '\n'.join(current_content)))

            current_header = line

            current_content = []

        else:

            current_content.append(line)

    if current_content:

        chunks.append((current_header, '\n'.join(current_content)))

    return chunks

def recursive_character_splitter(

    text: str,

    chunk_size: int = 1000,

    chunk_overlap: int = 200,

    separators: List[str] = None

) -> List[str]:

    """LangChain-style recursive splitter."""

    separators = separators or ["\n\n", "\n", ". ", " ", ""]

    def split_text(text: str, separators: List[str]) -> List[str]:

        if not text:

            return []

        separator = separators[0]

        remaining_separators = separators[1:]

        if separator == "":

            # Character-level split

            return [text[i:i+chunk_size] for i in range(0, len(text), chunk_size - chunk_overlap)]

        splits = text.split(separator)

        chunks = []

        current_chunk = []

        current_length = 0

        for split in splits:

            split_length = len(split) + len(separator)

            if current_length + split_length > chunk_size and current_chunk:

                chunk_text = separator.join(current_chunk)

                # Recursively split if still too large

                if len(chunk_text) > chunk_size and remaining_separators:

                    chunks.extend(split_text(chunk_text, remaining_separators))

                else:

                    chunks.append(chunk_text)

                # Start new chunk with overlap

                overlap_splits = []

                overlap_length = 0

                for s in reversed(current_chunk):

                    if overlap_length + len(s) <= chunk_overlap:

                        overlap_splits.insert(0, s)

                        overlap_length += len(s)

                    else:

                        break

                current_chunk = overlap_splits

                current_length = overlap_length

            current_chunk.append(split)

            current_length += split_length

        if current_chunk:

            chunks.append(separator.join(current_chunk))

        return chunks

    return split_text(text, separators)

Template 5: Domain-Specific Embedding Pipeline

import re

from typing import List, Optional

from dataclasses import dataclass

@dataclass

class EmbeddedDocument:

    id: str

    document_id: str

    chunk_index: int

    text: str

    embedding: List[float]

    metadata: dict

class DomainEmbeddingPipeline:

    """Pipeline for domain-specific embeddings."""

    def __init__(

        self,

        embedding_model: str = "voyage-3-large",

        chunk_size: int = 512,

        chunk_overlap: int = 50,

        preprocessing_fn=None

    ):

        self.embeddings = VoyageAIEmbeddings(model=embedding_model)

        self.chunk_size = chunk_size

        self.chunk_overlap = chunk_overlap

        self.preprocess = preprocessing_fn or self._default_preprocess

    def _default_preprocess(self, text: str) -> str:

        """Default preprocessing."""

        # Remove excessive whitespace

        text = re.sub(r'\s+', ' ', text)

        # Remove special characters (customize for your domain)

        text = re.sub(r'[^\w\s.,!?-]', '', text)

        return text.strip()

    async def process_documents(

        self,

        documents: List[dict],

        id_field: str = "id",

        content_field: str = "content",

        metadata_fields: Optional[List[str]] = None

    ) -> List[EmbeddedDocument]:

        """Process documents for vector storage."""

        processed = []

        for doc in documents:

            content = doc[content_field]

            doc_id = doc[id_field]

            # Preprocess

            cleaned = self.preprocess(content)

            # Chunk

            chunks = chunk_by_tokens(

                cleaned,

                self.chunk_size,

                self.chunk_overlap

            )

            # Create embeddings

            embeddings = await self.embeddings.aembed_documents(chunks)

            # Create records

            for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):

                metadata = {"document_id": doc_id, "chunk_index": i}

                # Add specified metadata fields

                if metadata_fields:

                    for field in metadata_fields:

                        if field in doc:

                            metadata[field] = doc[field]

                processed.append(EmbeddedDocument(

                    id=f"{doc_id}_chunk_{i}",

                    document_id=doc_id,

                    chunk_index=i,

                    text=chunk,

                    embedding=embedding,

                    metadata=metadata

                ))

        return processed

# Code-specific pipeline

class CodeEmbeddingPipeline:

    """Specialized pipeline for code embeddings."""

    def __init__(self):

        # Use Voyage's code-specific model

        self.embeddings = VoyageAIEmbeddings(model="voyage-code-3")

    def chunk_code(self, code: str, language: str) -> List[dict]:

        """Chunk code by functions/classes using tree-sitter."""

        try:

            import tree_sitter_languages

            parser = tree_sitter_languages.get_parser(language)

            tree = parser.parse(bytes(code, "utf8"))

            chunks = []

            # Extract function and class definitions

            self._extract_nodes(tree.root_node, code, chunks)

            return chunks

        except ImportError:

            # Fallback to simple chunking

            return [{"text": code, "type": "module"}]

    def _extract_nodes(self, node, source_code: str, chunks: list):

        """Recursively extract function/class definitions."""

        if node.type in ['function_definition', 'class_definition', 'method_definition']:

            text = source_code[node.start_byte:node.end_byte]

            chunks.append({

                "text": text,

                "type": node.type,

                "name": self._get_name(node),

                "start_line": node.start_point[0],

                "end_line": node.end_point[0]

            })

        for child in node.children:

            self._extract_nodes(child, source_code, chunks)

    def _get_name(self, node) -> str:

        """Extract name from function/class node."""

        for child in node.children:

            if child.type == 'identifier' or child.type == 'name':

                return child.text.decode('utf8')

        return "unknown"

    async def embed_with_context(

        self,

        chunk: str,

        context: str = ""

    ) -> List[float]:

        """Embed code with surrounding context."""

        if context:

            combined = f"Context: {context}\n\nCode:\n{chunk}"

        else:

            combined = chunk

        return await self.embeddings.aembed_query(combined)

Template 6: Embedding Quality Evaluation

import numpy as np

from typing import List, Dict

def evaluate_retrieval_quality(

    queries: List[str],

    relevant_docs: List[List[str]],  # List of relevant doc IDs per query

    retrieved_docs: List[List[str]],  # List of retrieved doc IDs per query

    k: int = 10

) -> Dict[str, float]:

    """Evaluate embedding quality for retrieval."""

    def precision_at_k(relevant: set, retrieved: List[str], k: int) -> float:

        retrieved_k = retrieved[:k]

        relevant_retrieved = len(set(retrieved_k) &#x26; relevant)

        return relevant_retrieved / k if k > 0 else 0

    def recall_at_k(relevant: set, retrieved: List[str], k: int) -> float:

        retrieved_k = retrieved[:k]

        relevant_retrieved = len(set(retrieved_k) &#x26; relevant)

        return relevant_retrieved / len(relevant) if relevant else 0

    def mrr(relevant: set, retrieved: List[str]) -> float:

        for i, doc in enumerate(retrieved):

            if doc in relevant:

                return 1 / (i + 1)

        return 0

    def ndcg_at_k(relevant: set, retrieved: List[str], k: int) -> float:

        dcg = sum(

            1 / np.log2(i + 2) if doc in relevant else 0

            for i, doc in enumerate(retrieved[:k])

        )

        ideal_dcg = sum(1 / np.log2(i + 2) for i in range(min(len(relevant), k)))

        return dcg / ideal_dcg if ideal_dcg > 0 else 0

    metrics = {

        f"precision@{k}": [],

        f"recall@{k}": [],

        "mrr": [],

        f"ndcg@{k}": []

    }

    for relevant, retrieved in zip(relevant_docs, retrieved_docs):

        relevant_set = set(relevant)

        metrics[f"precision@{k}"].append(precision_at_k(relevant_set, retrieved, k))

        metrics[f"recall@{k}"].append(recall_at_k(relevant_set, retrieved, k))

        metrics["mrr"].append(mrr(relevant_set, retrieved))

        metrics[f"ndcg@{k}"].append(ndcg_at_k(relevant_set, retrieved, k))

    return {name: np.mean(values) for name, values in metrics.items()}

def compute_embedding_similarity(

    embeddings1: np.ndarray,

    embeddings2: np.ndarray,

    metric: str = "cosine"

) -> np.ndarray:

    """Compute similarity matrix between embedding sets."""

    if metric == "cosine":

        # Normalize and compute dot product

        norm1 = embeddings1 / np.linalg.norm(embeddings1, axis=1, keepdims=True)

        norm2 = embeddings2 / np.linalg.norm(embeddings2, axis=1, keepdims=True)

        return norm1 @ norm2.T

    elif metric == "euclidean":

        from scipy.spatial.distance import cdist

        return -cdist(embeddings1, embeddings2, metric='euclidean')

    elif metric == "dot":

        return embeddings1 @ embeddings2.T

    else:

        raise ValueError(f"Unknown metric: {metric}")

def compare_embedding_models(

    texts: List[str],

    models: Dict[str, callable],

    queries: List[str],

    relevant_indices: List[List[int]],

    k: int = 5

) -> Dict[str, Dict[str, float]]:

    """Compare multiple embedding models on retrieval quality."""

    results = {}

    for model_name, embed_fn in models.items():

        # Embed all texts

        doc_embeddings = np.array(embed_fn(texts))

        retrieved_per_query = []

        for query in queries:

            query_embedding = np.array(embed_fn([query])[0])

            # Compute similarities

            similarities = compute_embedding_similarity(

                query_embedding.reshape(1, -1),

                doc_embeddings,

                metric="cosine"

            )[0]

            # Get top-k indices

            top_k_indices = np.argsort(similarities)[::-1][:k]

            retrieved_per_query.append([str(i) for i in top_k_indices])

        # Convert relevant indices to string IDs

        relevant_docs = [[str(i) for i in indices] for indices in relevant_indices]

        results[model_name] = evaluate_retrieval_quality(

            queries, relevant_docs, retrieved_per_query, k

        )

    return results

Best Practices

Do's

  • Match model to use case: Code vs prose vs multilingual
  • Chunk thoughtfully: Preserve semantic boundaries
  • Normalize embeddings: For cosine similarity search
  • Batch requests: More efficient than one-by-one
  • Cache embeddings: Avoid recomputing for static content
  • Use Voyage AI for Claude apps: Recommended by Anthropic

Don'ts

  • Don't ignore token limits: Truncation loses information
  • Don't mix embedding models: Incompatible vector spaces
  • Don't skip preprocessing: Garbage in, garbage out
  • Don't over-chunk: Lose important context
  • Don't forget metadata: Essential for filtering and debugging
BrowserAct

Let your agent run on any real-world website

Bypass CAPTCHA & anti-bot for free. Start local, scale to cloud.

Explore BrowserAct Skills →

Stop writing automation&scrapers

Install the CLI. Run your first Skill in 30 seconds. Scale when you're ready.

Start free
free · no credit card