session-compression

AI session compression techniques for managing multi-turn conversations efficiently through summarization, embedding-based retrieval, and intelligent context…

INSTALLATION
npx skills add https://github.com/bobmatnyc/claude-mpm-skills --skill session-compression
Run in your project or agent environment. Adjust flags if your CLI version differs.

SKILL.md

AI Session Compression Techniques

Summary

Compress long AI conversations to fit context windows while preserving critical information.

Session compression enables production AI applications to manage multi-turn conversations efficiently by reducing token usage by 70-95% through summarization, embedding-based retrieval, and intelligent context management. Achieve 3-20x compression ratios with minimal performance degradation.

Key Benefits:

  • Cost Reduction: 80-90% token cost savings through hierarchical memory
  • Performance: 2x faster responses with compressed context
  • Scalability: Handle conversations exceeding 1M tokens
  • Quality: Preserve critical information with <2% accuracy loss

When to Use

Use session compression when:

  • Multi-turn conversations approach context window limits (>50% capacity)
  • Long-running chat sessions (customer support, tutoring, code assistants)
  • Token costs become significant (high-volume applications)
  • Response latency increases due to large context
  • Managing conversation history across multiple sessions

Don't use when:

  • Short conversations (<10 turns) fitting easily in context
  • Every detail must be preserved verbatim (legal, compliance)
  • Single-turn or stateless interactions
  • Context window usage is <30%

Ideal scenarios:

  • Chatbots with 50+ turn conversations
  • AI code assistants tracking long development sessions
  • Customer support with multi-session ticket history
  • Educational tutors with student progress tracking
  • Multi-day collaborative AI workflows

Quick Start

Basic Setup with LangChain

from langchain.memory import ConversationSummaryBufferMemory

from langchain_anthropic import ChatAnthropic

from anthropic import Anthropic

# Initialize Claude client

llm = ChatAnthropic(

    model="claude-3-5-sonnet-20241022",

    api_key="your-api-key"

)

# Setup memory with automatic summarization

memory = ConversationSummaryBufferMemory(

    llm=llm,

    max_token_limit=2000,  # Summarize when exceeding this

    return_messages=True

)

# Add conversation turns

memory.save_context(

    {"input": "What's session compression?"},

    {"output": "Session compression reduces conversation token usage..."}

)

# Retrieve compressed context

context = memory.load_memory_variables({})

Progressive Compression Pattern

from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

class ProgressiveCompressor:

    def __init__(self, thresholds=[0.70, 0.85, 0.95]):

        self.thresholds = thresholds

        self.messages = []

        self.max_tokens = 200000  # Claude context window

    def add_message(self, role: str, content: str):

        self.messages.append({"role": role, "content": content})

        # Check if compression needed

        current_usage = self._estimate_tokens()

        usage_ratio = current_usage / self.max_tokens

        if usage_ratio >= self.thresholds[0]:

            self._compress(level=self._get_compression_level(usage_ratio))

    def _estimate_tokens(self):

        return sum(len(m["content"]) // 4 for m in self.messages)

    def _get_compression_level(self, ratio):

        for i, threshold in enumerate(self.thresholds):

            if ratio < threshold:

                return i

        return len(self.thresholds)

    def _compress(self, level: int):

        """Apply compression based on severity level."""

        if level == 1:  # 70% threshold: Light compression

            self._remove_redundant_messages()

        elif level == 2:  # 85% threshold: Medium compression

            self._summarize_old_messages(keep_recent=10)

        else:  # 95% threshold: Aggressive compression

            self._summarize_old_messages(keep_recent=5)

    def _remove_redundant_messages(self):

        """Remove duplicate or low-value messages."""

        # Implementation: Use semantic deduplication

        pass

    def _summarize_old_messages(self, keep_recent: int):

        """Summarize older messages, keep recent ones verbatim."""

        if len(self.messages) <= keep_recent:

            return

        # Messages to summarize

        to_summarize = self.messages[:-keep_recent]

        recent = self.messages[-keep_recent:]

        # Generate summary

        conversation_text = "\n\n".join([

            f"{m['role'].upper()}: {m['content']}"

            for m in to_summarize

        ])

        response = client.messages.create(

            model="claude-3-5-haiku-20241022",

            max_tokens=500,

            messages=[{

                "role": "user",

                "content": f"Summarize this conversation:\n\n{conversation_text}"

            }]

        )

        # Replace old messages with summary

        summary = {

            "role": "system",

            "content": f"[Summary]\n{response.content[0].text}"

        }

        self.messages = [summary] + recent

# Usage

compressor = ProgressiveCompressor()

for i in range(100):

    compressor.add_message("user", f"Message {i}")

    compressor.add_message("assistant", f"Response {i}")

Using Anthropic Prompt Caching (90% Cost Reduction)

from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

# Build context with cache control

messages = [

    {

        "role": "user",

        "content": [

            {

                "type": "text",

                "text": "Long conversation context here...",

                "cache_control": {"type": "ephemeral"}  # Cache this

            }

        ]

    },

    {

        "role": "assistant",

        "content": "Previous response..."

    },

    {

        "role": "user",

        "content": "New question"  # Not cached, changes frequently

    }

]

response = client.messages.create(

    model="claude-3-5-sonnet-20241022",

    max_tokens=1024,

    messages=messages

)

# Cache hit reduces costs by 90% for cached content

Core Concepts

Context Windows and Token Limits

Context window: Maximum tokens an LLM can process in a single request (input + output).

Current limits (2025):

  • Claude 3.5 Sonnet: 200K tokens (~150K words, ~600 pages)
  • GPT-4 Turbo: 128K tokens (~96K words, ~384 pages)
  • Gemini 1.5 Pro: 2M tokens (~1.5M words, ~6000 pages)

Token estimation:

  • English: ~4 characters per token
  • Code: ~3 characters per token
  • Rule of thumb: 1 token ≈ 0.75 words

Why compression matters:

  • Cost: Claude Sonnet costs $3/$15 per 1M input/output tokens
  • Latency: Larger contexts increase processing time
  • Quality: Excessive context can dilute attention on relevant information

Compression Ratios

Compression ratio = Original tokens / Compressed tokens

Industry benchmarks:

  • Extractive summarization: 2-3x
  • Abstractive summarization: 5-10x
  • Hierarchical summarization: 20x+
  • LLMLingua (prompt compression): 20x with 1.5% accuracy loss
  • KVzip (KV cache compression): 3-4x with 2x speed improvement

Target ratios by use case:

  • Customer support: 5-7x (preserve details)
  • General chat: 8-12x (balance quality/efficiency)
  • Code assistants: 3-5x (preserve technical accuracy)
  • Long documents: 15-20x (extract key insights)

Progressive Compression Thresholds

Industry standard pattern:

Context Usage    Action                     Technique

─────────────────────────────────────────────────────────

0-70%           No compression             Store verbatim

70-85%          Light compression          Remove redundancy

85-95%          Medium compression         Summarize old messages

95-100%         Aggressive compression     Hierarchical + RAG

Implementation guidelines:

  • 70% threshold: Remove duplicate/redundant messages, semantic deduplication
  • 85% threshold: Summarize messages older than 20 turns, keep recent 10-15
  • 95% threshold: Multi-level hierarchical summarization + vector store archival
  • Emergency (100%): Drop least important messages, aggressive summarization

Compression Techniques

1. Summarization Techniques

#### 1.1 Extractive Summarization

Selects key sentences/phrases without modification.

Pros: No hallucination, fast, deterministic

Cons: Limited compression (2-3x), may feel disjointed

Best for: Legal/compliance, short-term compression

from sklearn.feature_extraction.text import TfidfVectorizer

import numpy as np

def extractive_compress(messages: list, compression_ratio: float = 0.3):

    """Extract most important messages using TF-IDF scoring."""

    texts = [msg['content'] for msg in messages]

    # Calculate TF-IDF scores

    vectorizer = TfidfVectorizer()

    tfidf_matrix = vectorizer.fit_transform(texts)

    scores = np.array(tfidf_matrix.sum(axis=1)).flatten()

    # Select top messages

    n_keep = max(1, int(len(messages) * compression_ratio))

    top_indices = sorted(np.argsort(scores)[-n_keep:])

    return [messages[i] for i in top_indices]

#### 1.2 Abstractive Summarization

Uses LLMs to semantically condense conversation history.

Pros: Higher compression (5-10x), coherent, synthesizes information

Cons: Risk of hallucination, higher cost, less deterministic

Best for: General chat, customer support, multi-session continuity

from anthropic import Anthropic

def abstractive_compress(messages: list, client: Anthropic):

    """Generate semantic summary using Claude."""

    conversation_text = "\n\n".join([

        f"{msg['role'].upper()}: {msg['content']}"

        for msg in messages

    ])

    response = client.messages.create(

        model="claude-3-5-sonnet-20241022",

        max_tokens=500,

        messages=[{

            "role": "user",

            "content": f"""Summarize this conversation, preserving:

1. Key decisions made

2. Important context and facts

3. Unresolved questions

4. Action items

Conversation:

{conversation_text}

Summary (aim for 1/5 the original length):"""

        }]

    )

    return {

        "role": "assistant",

        "content": f"[Summary]\n{response.content[0].text}"

    }

#### 1.3 Hierarchical Summarization (Multi-Level)

Creates summaries of summaries in a tree structure.

Pros: Extreme compression (20x+), handles 1M+ token conversations

Cons: Complex implementation, multiple LLM calls, information loss accumulates

Best for: Long-running conversations, multi-session applications

Architecture:

Level 0 (Raw):    [Msg1][Msg2][Msg3][Msg4][Msg5][Msg6][Msg7][Msg8]

Level 1 (Chunk):  [Summary1-2]  [Summary3-4]  [Summary5-6]  [Summary7-8]

Level 2 (Group):  [Summary1-4]              [Summary5-8]

Level 3 (Session): [Overall Session Summary]
from anthropic import Anthropic

from typing import List, Dict

class HierarchicalMemory:

    def __init__(self, client: Anthropic, chunk_size: int = 10):

        self.client = client

        self.chunk_size = chunk_size

        self.levels: List[List[Dict]] = [[]]  # Level 0 = raw messages

    def add_message(self, message: Dict):

        """Add message and trigger summarization if needed."""

        self.levels[0].append(message)

        if len(self.levels[0]) >= self.chunk_size * 2:

            self._summarize_level(0)

    def _summarize_level(self, level: int):

        """Summarize a level into the next higher level."""

        messages = self.levels[level]

        # Ensure next level exists

        while len(self.levels) <= level + 1:

            self.levels.append([])

        # Summarize first chunk

        chunk = messages[:self.chunk_size]

        summary = self._generate_summary(chunk, level)

        # Move to next level

        self.levels[level + 1].append(summary)

        self.levels[level] = messages[self.chunk_size:]

        # Recursively check if next level needs summarization

        if len(self.levels[level + 1]) >= self.chunk_size * 2:

            self._summarize_level(level + 1)

    def _generate_summary(self, messages: List[Dict], level: int) -> Dict:

        """Generate summary for a chunk."""

        conversation_text = "\n\n".join([

            f"{msg['role'].upper()}: {msg['content']}"

            for msg in messages

        ])

        response = self.client.messages.create(

            model="claude-3-5-haiku-20241022",

            max_tokens=300,

            messages=[{

                "role": "user",

                "content": f"Summarize this Level {level} conversation chunk:\n\n{conversation_text}"

            }]

        )

        return {

            "role": "system",

            "content": f"[L{level+1} Summary] {response.content[0].text}",

            "level": level + 1

        }

    def get_context(self, max_tokens: int = 4000) -> List[Dict]:

        """Retrieve context within token budget."""

        context = []

        token_count = 0

        # Prioritize recent raw messages

        for msg in reversed(self.levels[0]):

            msg_tokens = len(msg['content']) // 4

            if token_count + msg_tokens > max_tokens * 0.6:

                break

            context.insert(0, msg)

            token_count += msg_tokens

        # Add summaries from higher levels

        for level in range(1, len(self.levels)):

            for summary in self.levels[level]:

                summary_tokens = len(summary['content']) // 4

                if token_count + summary_tokens > max_tokens:

                    break

                context.insert(0, summary)

                token_count += summary_tokens

        return context

Academic reference: "Recursively Summarizing Enables Long-Term Dialogue Memory in Large Language Models" (arXiv:2308.15022)

#### 1.4 Rolling Summarization (Continuous)

Continuously compresses conversation with sliding window.

Pros: Low latency, predictable token usage, simple

Cons: Early details over-compressed, no information recovery

Best for: Real-time chat, streaming conversations

from anthropic import Anthropic

class RollingMemory:

    def __init__(self, client: Anthropic, window_size: int = 10, compress_threshold: int = 15):

        self.client = client

        self.window_size = window_size

        self.compress_threshold = compress_threshold

        self.rolling_summary = None

        self.recent_messages = []

    def add_message(self, message: dict):

        self.recent_messages.append(message)

        if len(self.recent_messages) >= self.compress_threshold:

            self._compress()

    def _compress(self):

        """Compress older messages into rolling summary."""

        messages_to_compress = self.recent_messages[:-self.window_size]

        parts = []

        if self.rolling_summary:

            parts.append(f"Existing summary:\n{self.rolling_summary}")

        parts.append("\nNew messages:\n" + "\n\n".join([

            f"{msg['role']}: {msg['content']}"

            for msg in messages_to_compress

        ]))

        response = self.client.messages.create(

            model="claude-3-5-haiku-20241022",

            max_tokens=400,

            messages=[{

                "role": "user",

                "content": "\n".join(parts) + "\n\nUpdate the summary:"

            }]

        )

        self.rolling_summary = response.content[0].text

        self.recent_messages = self.recent_messages[-self.window_size:]

    def get_context(self):

        context = []

        if self.rolling_summary:

            context.append({

                "role": "system",

                "content": f"[Summary]\n{self.rolling_summary}"

            })

        context.extend(self.recent_messages)

        return context

2. Embedding-Based Approaches

#### 2.1 RAG (Retrieval-Augmented Generation)

Store full conversation in vector database, retrieve only relevant chunks.

Pros: Extremely scalable, no information loss, high relevance

Cons: Requires vector DB infrastructure, retrieval latency

Best for: Knowledge bases, customer support with large history

from anthropic import Anthropic

from openai import OpenAI

import chromadb

class RAGMemory:

    def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):

        self.anthropic = anthropic_client

        self.openai = openai_client

        # Initialize vector store

        self.chroma = chromadb.Client()

        self.collection = self.chroma.create_collection(

            name="conversation",

            metadata={"hnsw:space": "cosine"}

        )

        self.recent_messages = []

        self.recent_window = 5

        self.message_counter = 0

    def add_message(self, message: dict):

        """Add to recent memory and vector store."""

        self.recent_messages.append(message)

        if len(self.recent_messages) > self.recent_window:

            old_msg = self.recent_messages.pop(0)

            self._store_in_vectordb(old_msg)

    def _store_in_vectordb(self, message: dict):

        """Archive to vector database."""

        # Generate embedding

        response = self.openai.embeddings.create(

            model="text-embedding-3-small",

            input=message['content']

        )

        self.collection.add(

            embeddings=[response.data[0].embedding],

            documents=[message['content']],

            metadatas=[{"role": message['role']}],

            ids=[f"msg_{self.message_counter}"]

        )

        self.message_counter += 1

    def retrieve_context(self, query: str, max_tokens: int = 4000):

        """Retrieve relevant context using RAG."""

        context = []

        token_count = 0

        # 1. Recent messages (short-term memory)

        for msg in self.recent_messages:

            context.append(msg)

            token_count += len(msg['content']) // 4

        # 2. Retrieve relevant historical context

        if token_count < max_tokens:

            query_embedding = self.openai.embeddings.create(

                model="text-embedding-3-small",

                input=query

            )

            n_results = min(10, (max_tokens - token_count) // 100)

            results = self.collection.query(

                query_embeddings=[query_embedding.data[0].embedding],

                n_results=n_results

            )

            for i, doc in enumerate(results['documents'][0]):

                if token_count + len(doc) // 4 > max_tokens:

                    break

                metadata = results['metadatas'][0][i]

                context.insert(0, {

                    "role": metadata['role'],

                    "content": f"[Retrieved] {doc}"

                })

                token_count += len(doc) // 4

        return context

Vector database options:

  • ChromaDB: Embedded, easy local development
  • Pinecone: Managed, 50ms p95 latency
  • Weaviate: Open-source, hybrid search
  • Qdrant: High performance, payload filtering

#### 2.2 Vector Search and Clustering

Group similar messages into clusters, represent with centroids.

Pros: Reduces redundancy, identifies themes, multi-topic handling

Cons: Requires sufficient data, may lose nuances

Best for: Multi-topic conversations, meeting summaries

from sklearn.cluster import KMeans

from openai import OpenAI

import numpy as np

class ClusteredMemory:

    def __init__(self, openai_client: OpenAI, n_clusters: int = 5):

        self.client = openai_client

        self.n_clusters = n_clusters

        self.messages = []

        self.embeddings = []

    def add_messages(self, messages: list):

        for msg in messages:

            self.messages.append(msg)

            response = self.client.embeddings.create(

                model="text-embedding-3-small",

                input=msg['content']

            )

            self.embeddings.append(response.data[0].embedding)

    def compress_by_clustering(self):

        """Cluster messages and return representatives."""

        if len(self.messages) < self.n_clusters:

            return self.messages

        embeddings_array = np.array(self.embeddings)

        kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)

        labels = kmeans.fit_predict(embeddings_array)

        # Select message closest to each centroid

        compressed = []

        for cluster_id in range(self.n_clusters):

            cluster_indices = np.where(labels == cluster_id)[0]

            centroid = kmeans.cluster_centers_[cluster_id]

            cluster_embeddings = embeddings_array[cluster_indices]

            distances = np.linalg.norm(cluster_embeddings - centroid, axis=1)

            closest_idx = cluster_indices[np.argmin(distances)]

            compressed.append({

                **self.messages[closest_idx],

                "cluster_id": int(cluster_id),

                "cluster_size": len(cluster_indices)

            })

        return compressed

#### 2.3 Semantic Deduplication

Remove semantically similar messages that convey redundant information.

Pros: Reduces redundancy without losing unique content

Cons: Requires threshold tuning, O(n²) complexity

Best for: FAQ systems, repetitive conversations

from openai import OpenAI

import numpy as np

from sklearn.metrics.pairwise import cosine_similarity

class SemanticDeduplicator:

    def __init__(self, openai_client: OpenAI, similarity_threshold: float = 0.85):

        self.client = openai_client

        self.threshold = similarity_threshold

    def deduplicate(self, messages: list):

        """Remove semantically similar messages."""

        if len(messages) <= 1:

            return messages

        # Generate embeddings

        embeddings = []

        for msg in messages:

            response = self.client.embeddings.create(

                model="text-embedding-3-small",

                input=msg['content']

            )

            embeddings.append(response.data[0].embedding)

        embeddings_array = np.array(embeddings)

        similarity_matrix = cosine_similarity(embeddings_array)

        # Mark unique messages

        keep_indices = []

        for i in range(len(messages)):

            is_unique = True

            for j in keep_indices:

                if similarity_matrix[i][j] > self.threshold:

                    is_unique = False

                    break

            if is_unique:

                keep_indices.append(i)

        return [messages[i] for i in keep_indices]

3. Token-Efficient Strategies

#### 3.1 Message Prioritization

Assign importance scores and retain only high-priority content.

Pros: Retains most important information, flexible criteria

Cons: Scoring is heuristic-based, may break flow

Best for: Mixed-importance conversations, filtering noise

import re

class MessagePrioritizer:

    def score_message(self, msg: dict, index: int, total: int) -> float:

        """Calculate composite importance score."""

        scores = []

        # Length score (longer = more info)

        scores.append(min(len(msg['content']) / 500, 1.0))

        # Question score

        if msg['role'] == 'user':

            scores.append(min(msg['content'].count('?') * 0.5, 1.0))

        # Entity score (capitalized words)

        entities = len(re.findall(r'\b[A-Z][a-z]+', msg['content']))

        scores.append(min(entities / 10, 1.0))

        # Recency score (linear decay)

        scores.append(index / max(total - 1, 1))

        # Role score

        scores.append(0.6 if msg['role'] == 'user' else 0.4)

        return sum(scores) / len(scores)

    def prioritize(self, messages: list, target_count: int):

        """Select top N messages by priority."""

        scored = [

            (msg, self.score_message(msg, i, len(messages)), i)

            for i, msg in enumerate(messages)

        ]

        scored.sort(key=lambda x: x[1], reverse=True)

        top_messages = scored[:target_count]

        top_messages.sort(key=lambda x: x[2])  # Restore chronological order

        return [msg for msg, score, idx in top_messages]

#### 3.2 Delta Compression

Store only changes between consecutive messages.

Pros: Highly efficient for incremental changes

Cons: Reconstruction overhead, not suitable for all content

Best for: Code assistants with incremental edits

import difflib

class DeltaCompressor:

    def __init__(self):

        self.base_messages = []

        self.deltas = []

    def add_message(self, message: dict):

        if not self.base_messages:

            self.base_messages.append(message)

            return

        # Find most similar previous message

        last_msg = self.base_messages[-1]

        if last_msg['role'] == message['role']:

            # Calculate delta

            diff = list(difflib.unified_diff(

                last_msg['content'].splitlines(),

                message['content'].splitlines(),

                lineterm=''

            ))

            if len('\n'.join(diff)) < len(message['content']) * 0.7:

                # Store as delta if compression achieved

                self.deltas.append({

                    'base_index': len(self.base_messages) - 1,

                    'delta': diff,

                    'role': message['role']

                })

                return

        # Store as new base message

        self.base_messages.append(message)

    def reconstruct(self):

        """Reconstruct full conversation from bases + deltas."""

        messages = self.base_messages.copy()

        for delta_info in self.deltas:

            base_content = messages[delta_info['base_index']]['content']

            # Apply diff to reconstruct (simplified)

            reconstructed = base_content  # Full implementation would apply diff

            messages.append({

                'role': delta_info['role'],

                'content': reconstructed

            })

        return messages

4. LangChain Memory Types

#### 4.1 ConversationSummaryMemory

Automatically summarizes conversation as it progresses.

from langchain.memory import ConversationSummaryMemory

from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")

memory = ConversationSummaryMemory(llm=llm)

# Add conversation

memory.save_context(

    {"input": "Hi, I'm working on a Python project"},

    {"output": "Great! How can I help with your Python project?"}

)

# Get summary

summary = memory.load_memory_variables({})

print(summary['history'])

Pros: Automatic summarization, simple API

Cons: Every turn triggers LLM call

Best for: Medium conversations (20-50 turns)

#### 4.2 ConversationSummaryBufferMemory

Hybrid: Recent messages verbatim, older summarized.

from langchain.memory import ConversationSummaryBufferMemory

from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-haiku-20241022")

memory = ConversationSummaryBufferMemory(

    llm=llm,

    max_token_limit=2000,  # Summarize when exceeding

    return_messages=True

)

# Add conversation

for i in range(50):

    memory.save_context(

        {"input": f"Question {i}"},

        {"output": f"Answer {i}"}

    )

# Automatically keeps recent messages + summary of old

context = memory.load_memory_variables({})

Pros: Best balance of detail and compression

Cons: Requires token limit tuning

Best for: Most production applications

#### 4.3 ConversationTokenBufferMemory

Maintains fixed token budget, drops oldest when exceeded.

from langchain.memory import ConversationTokenBufferMemory

from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")

memory = ConversationTokenBufferMemory(

    llm=llm,

    max_token_limit=2000

)

# Simple FIFO when token limit exceeded

Pros: Predictable token usage, simple

Cons: Loses old information completely

Best for: Real-time chat with strict limits

#### 4.4 VectorStoreRetrieverMemory

Stores all messages in vector database, retrieves relevant ones.

from langchain.memory import VectorStoreRetrieverMemory

from langchain_community.vectorstores import Chroma

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()

vectorstore = Chroma(embedding_function=embeddings)

memory = VectorStoreRetrieverMemory(

    retriever=vectorstore.as_retriever(search_kwargs={"k": 5})

)

# Automatically retrieves most relevant context

Pros: Infinite conversation length, semantic retrieval

Cons: Requires vector DB, retrieval overhead

Best for: Long-running conversations, knowledge bases

5. Anthropic-Specific Patterns

#### 5.1 Prompt Caching (90% Cost Reduction)

Cache static context to reduce token costs.

from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

# Long conversation context

conversation_history = [

    {"role": "user", "content": "Message 1"},

    {"role": "assistant", "content": "Response 1"},

    # ... many more messages

]

# Mark context for caching

messages = []

for i, msg in enumerate(conversation_history[:-1]):

    content = msg['content']

    # Add cache control to last context message

    if i == len(conversation_history) - 2:

        messages.append({

            "role": msg['role'],

            "content": [

                {

                    "type": "text",

                    "text": content,

                    "cache_control": {"type": "ephemeral"}

                }

            ]

        })

    else:

        messages.append(msg)

# Add new user message (not cached)

messages.append(conversation_history[-1])

response = client.messages.create(

    model="claude-3-5-sonnet-20241022",

    max_tokens=1024,

    messages=messages

)

# Subsequent calls with same cached context cost 90% less

Cache TTL: 5 minutes

Savings: 90% cost reduction for cached tokens

Limits: Max 4 cache breakpoints per request

Best practices:

  • Cache conversation history, not current query
  • Update cache when context changes significantly
  • Combine with summarization for maximum efficiency

#### 5.2 Extended Thinking for Compression Planning

Use extended thinking to plan optimal compression strategy.

from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

response = client.messages.create(

    model="claude-3-7-sonnet-20250219",

    max_tokens=16000,

    thinking={

        "type": "enabled",

        "budget_tokens": 10000

    },

    messages=[{

        "role": "user",

        "content": f"""Analyze this conversation and recommend compression:

{conversation_text}

Current token count: {current_tokens}

Target: {target_tokens}

Required compression: {compression_ratio}x

Recommend optimal strategy."""

    }]

)

# Access thinking process

thinking_content = [

    block for block in response.content

    if block.type == "thinking"

]

# Get compression recommendation

recommendation = response.content[-1].text

Production Patterns

Checkpointing and Persistence

Save compression state for recovery and resume.

import json

import pickle

from pathlib import Path

class PersistentMemory:

    def __init__(self, checkpoint_dir: str = "./checkpoints"):

        self.checkpoint_dir = Path(checkpoint_dir)

        self.checkpoint_dir.mkdir(exist_ok=True)

        self.memory = []

        self.summary = None

    def save_checkpoint(self, session_id: str):

        """Save current memory state."""

        checkpoint = {

            'messages': self.memory,

            'summary': self.summary,

            'timestamp': time.time()

        }

        checkpoint_file = self.checkpoint_dir / f"{session_id}.json"

        with open(checkpoint_file, 'w') as f:

            json.dump(checkpoint, f, indent=2)

    def load_checkpoint(self, session_id: str):

        """Load memory state from checkpoint."""

        checkpoint_file = self.checkpoint_dir / f"{session_id}.json"

        if checkpoint_file.exists():

            with open(checkpoint_file, 'r') as f:

                checkpoint = json.load(f)

            self.memory = checkpoint['messages']

            self.summary = checkpoint.get('summary')

            return True

        return False

    def auto_checkpoint(self, session_id: str, interval: int = 10):

        """Automatically save every N messages."""

        if len(self.memory) % interval == 0:

            self.save_checkpoint(session_id)

Resume Workflows

Continue conversations across sessions.

from anthropic import Anthropic

import json

class ResumableConversation:

    def __init__(self, client: Anthropic, session_id: str):

        self.client = client

        self.session_id = session_id

        self.memory = self._load_or_create()

    def _load_or_create(self):

        """Load existing session or create new."""

        try:

            with open(f'sessions/{self.session_id}.json', 'r') as f:

                return json.load(f)

        except FileNotFoundError:

            return {

                'messages': [],

                'summary': None,

                'created_at': time.time()

            }

    def add_turn(self, user_message: str):

        """Add user message and get response."""

        # Add user message

        self.memory['messages'].append({

            'role': 'user',

            'content': user_message

        })

        # Build context (with compression)

        context = self._build_context()

        # Get response

        response = self.client.messages.create(

            model="claude-3-5-sonnet-20241022",

            max_tokens=1024,

            messages=context + [{

                'role': 'user',

                'content': user_message

            }]

        )

        # Save response

        assistant_message = response.content[0].text

        self.memory['messages'].append({

            'role': 'assistant',

            'content': assistant_message

        })

        # Compress if needed

        if len(self.memory['messages']) > 20:

            self._compress()

        # Save state

        self._save()

        return assistant_message

    def _build_context(self):

        """Build context with compression."""

        context = []

        # Add summary if exists

        if self.memory['summary']:

            context.append({

                'role': 'system',

                'content': f"[Previous conversation summary]\n{self.memory['summary']}"

            })

        # Add recent messages

        context.extend(self.memory['messages'][-10:])

        return context

    def _compress(self):

        """Compress older messages."""

        if len(self.memory['messages']) < 15:

            return

        # Messages to summarize

        to_summarize = self.memory['messages'][:-10]

        # Generate summary

        conversation_text = "\n\n".join([

            f"{msg['role']}: {msg['content']}"

            for msg in to_summarize

        ])

        response = self.client.messages.create(

            model="claude-3-5-haiku-20241022",

            max_tokens=500,

            messages=[{

                'role': 'user',

                'content': f"Summarize this conversation:\n\n{conversation_text}"

            }]

        )

        # Update memory

        self.memory['summary'] = response.content[0].text

        self.memory['messages'] = self.memory['messages'][-10:]

    def _save(self):

        """Save session to disk."""

        with open(f'sessions/{self.session_id}.json', 'w') as f:

            json.dump(self.memory, f, indent=2)

# Usage

client = Anthropic(api_key="your-api-key")

conversation = ResumableConversation(client, session_id="user123_session1")

# Continue across multiple sessions

response1 = conversation.add_turn("What's Python?")

# ... later session

response2 = conversation.add_turn("Show me an example")  # Remembers context

Hybrid Approaches (Best Practice)

Combine multiple techniques for optimal results.

from anthropic import Anthropic

from openai import OpenAI

import chromadb

class HybridMemorySystem:

    """

    Combines:

    - Rolling summarization (short-term compression)

    - RAG retrieval (long-term memory)

    - Prompt caching (cost optimization)

    - Progressive compression (adaptive behavior)

    """

    def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):

        self.anthropic = anthropic_client

        self.openai = openai_client

        # Recent messages (verbatim)

        self.recent_messages = []

        self.recent_window = 10

        # Rolling summary

        self.rolling_summary = None

        # Vector store (long-term)

        self.chroma = chromadb.Client()

        self.collection = self.chroma.create_collection(name="memory")

        self.message_counter = 0

        # Compression thresholds

        self.thresholds = {

            'light': 0.70,    # Start basic compression

            'medium': 0.85,   # Aggressive summarization

            'heavy': 0.95     # Emergency measures

        }

    def add_message(self, message: dict):

        """Add message with intelligent compression."""

        self.recent_messages.append(message)

        # Check compression needs

        usage_ratio = self._estimate_usage()

        if usage_ratio >= self.thresholds['heavy']:

            self._emergency_compress()

        elif usage_ratio >= self.thresholds['medium']:

            self._medium_compress()

        elif usage_ratio >= self.thresholds['light']:

            self._light_compress()

    def _light_compress(self):

        """Remove redundancy, archive to vector store."""

        if len(self.recent_messages) > self.recent_window * 1.5:

            # Archive oldest to vector store

            to_archive = self.recent_messages[:5]

            for msg in to_archive:

                self._archive_to_vectorstore(msg)

            self.recent_messages = self.recent_messages[5:]

    def _medium_compress(self):

        """Generate rolling summary, aggressive archival."""

        if len(self.recent_messages) > self.recent_window:

            # Summarize older messages

            to_summarize = self.recent_messages[:-self.recent_window]

            summary_text = "\n\n".join([

                f"{msg['role']}: {msg['content']}"

                for msg in to_summarize

            ])

            if self.rolling_summary:

                summary_text = f"Existing: {self.rolling_summary}\n\nNew: {summary_text}"

            response = self.anthropic.messages.create(

                model="claude-3-5-haiku-20241022",

                max_tokens=400,

                messages=[{

                    'role': 'user',

                    'content': f"Update summary:\n{summary_text}"

                }]

            )

            self.rolling_summary = response.content[0].text

            # Archive all summarized messages

            for msg in to_summarize:

                self._archive_to_vectorstore(msg)

            self.recent_messages = self.recent_messages[-self.recent_window:]

    def _emergency_compress(self):

        """Extreme compression for near-limit situations."""

        # Keep only 5 most recent messages

        to_archive = self.recent_messages[:-5]

        for msg in to_archive:

            self._archive_to_vectorstore(msg)

        self.recent_messages = self.recent_messages[-5:]

        # Compress summary further if needed

        if self.rolling_summary and len(self.rolling_summary) > 1000:

            response = self.anthropic.messages.create(

                model="claude-3-5-haiku-20241022",

                max_tokens=200,

                messages=[{

                    'role': 'user',

                    'content': f"Create ultra-concise summary:\n{self.rolling_summary}"

                }]

            )

            self.rolling_summary = response.content[0].text

    def _archive_to_vectorstore(self, message: dict):

        """Store in vector database for retrieval."""

        embedding_response = self.openai.embeddings.create(

            model="text-embedding-3-small",

            input=message['content']

        )

        self.collection.add(

            embeddings=[embedding_response.data[0].embedding],

            documents=[message['content']],

            metadatas=[{'role': message['role']}],

            ids=[f"msg_{self.message_counter}"]

        )

        self.message_counter += 1

    def get_context(self, current_query: str, max_tokens: int = 8000):

        """Build optimal context for current query."""

        context = []

        token_count = 0

        # 1. Add rolling summary (if exists)

        if self.rolling_summary:

            summary_msg = {

                'role': 'system',

                'content': [

                    {

                        'type': 'text',

                        'text': f"[Conversation Summary]\n{self.rolling_summary}",

                        'cache_control': {'type': 'ephemeral'}  # Cache it

                    }

                ]

            }

            context.append(summary_msg)

            token_count += len(self.rolling_summary) // 4

        # 2. Retrieve relevant historical context (RAG)

        if token_count < max_tokens * 0.3:

            query_embedding = self.openai.embeddings.create(

                model="text-embedding-3-small",

                input=current_query

            )

            results = self.collection.query(

                query_embeddings=[query_embedding.data[0].embedding],

                n_results=5

            )

            for i, doc in enumerate(results['documents'][0]):

                if token_count + len(doc) // 4 > max_tokens * 0.3:

                    break

                metadata = results['metadatas'][0][i]

                context.append({

                    'role': metadata['role'],

                    'content': f"[Retrieved] {doc}"

                })

                token_count += len(doc) // 4

        # 3. Add recent messages verbatim

        for msg in self.recent_messages:

            if token_count + len(msg['content']) // 4 > max_tokens * 0.8:

                break

            context.append(msg)

            token_count += len(msg['content']) // 4

        return context

    def _estimate_usage(self):

        """Estimate current context window usage."""

        total_tokens = 0

        if self.rolling_summary:

            total_tokens += len(self.rolling_summary) // 4

        for msg in self.recent_messages:

            total_tokens += len(msg['content']) // 4

        return total_tokens / 200000  # Claude Sonnet context window

# Usage

anthropic_client = Anthropic(api_key="your-anthropic-key")

openai_client = OpenAI(api_key="your-openai-key")

memory = HybridMemorySystem(anthropic_client, openai_client)

# Add messages over time

for i in range(1000):

    memory.add_message({

        'role': 'user' if i % 2 == 0 else 'assistant',

        'content': f"Message {i} with some content..."

    })

# Retrieve optimized context

current_query = "What did we discuss about pricing?"

context = memory.get_context(current_query)

# Use with Claude

response = anthropic_client.messages.create(

    model="claude-3-5-sonnet-20241022",

    max_tokens=1024,

    messages=context + [{

        'role': 'user',

        'content': current_query

    }]

)

Performance Benchmarks

Compression Efficiency

Technique

Compression Ratio

Quality Loss

Latency

Cost Impact

Extractive

2-3x

<1%

<10ms

None

Abstractive

5-10x

2-5%

1-2s

+$0.001/turn

Hierarchical

20x+

5-8%

2-5s

+$0.003/turn

LLMLingua

20x

1.5%

500ms

None

RAG

Variable

<1%

100-300ms

+$0.0005/turn

Prompt Caching

N/A

0%

0ms

-90%

Token Savings by Use Case

Customer Support (50-turn conversation):

  • No compression: ~8,000 tokens/request
  • Rolling summary: ~2,000 tokens/request (75% reduction)
  • Hybrid (RAG + summary): ~1,500 tokens/request (81% reduction)

Code Assistant (100-turn session):

  • No compression: ~25,000 tokens/request
  • Hierarchical: ~5,000 tokens/request (80% reduction)
  • Hybrid + caching: ~1,000 tokens/request effective (96% cost reduction)

Educational Tutor (multi-session):

  • No compression: Would exceed context window
  • RAG + summarization: ~3,000 tokens/request
  • Infinite session length enabled

Cost Analysis

Example: Claude Sonnet pricing ($3 input, $15 output per 1M tokens)

1,000 conversations, 50 turns each:

-

No compression:

  • Avg 8K tokens/request × 50K requests = 400M tokens
  • Cost: $1,200

-

With rolling summarization:

  • Avg 2K tokens/request × 50K requests = 100M tokens
  • Summarization overhead: +10M tokens
  • Cost: $330 (72% savings)

-

With hybrid system + caching:

  • First turn: 2K tokens (no cache)
  • Subsequent: 200 tokens effective (90% cache hit)
  • Total: ~15M tokens effective
  • Cost: $45 (96% savings)

Tool Recommendations

Memory Management Tools

#### Mem0 (Recommended for Production)

Best for: Hybrid memory systems with minimal code

from mem0 import MemoryClient

client = MemoryClient(api_key="your-mem0-key")

# Automatically handles compression, summarization, RAG

memory = client.create_memory(

    user_id="user123",

    messages=[

        {"role": "user", "content": "I'm working on a Python project"},

        {"role": "assistant", "content": "Great! What kind of project?"}

    ]

)

# Retrieve relevant context

context = client.get_memory(

    user_id="user123",

    query="What programming language am I using?"

)

Features:

  • Automatic hierarchical summarization
  • Built-in RAG retrieval
  • Multi-user session management
  • Analytics dashboard

Pricing: $0.40/1K memory operations

#### Zep

Best for: Low-latency production deployments**

from zep_python import ZepClient

client = ZepClient(api_key="your-zep-key")

# Add to session

client.memory.add_memory(

    session_id="session123",

    messages=[

        {"role": "user", "content": "Hello"},

        {"role": "assistant", "content": "Hi there!"}

    ]

)

# Auto-summarized retrieval

memory = client.memory.get_memory(session_id="session123")

Features:

  • <100ms retrieval latency
  • Automatic fact extraction
  • Entity recognition
  • Session management

Pricing: Open-source (self-hosted) or $0.50/1K operations (cloud)

#### ChromaDB

Best for: Self-hosted vector storage**

import chromadb

client = chromadb.Client()

collection = client.create_collection("conversations")

# Store embeddings

collection.add(

    documents=["Message content"],

    embeddings=[[0.1, 0.2, ...]],

    ids=["msg1"]

)

# Retrieve

results = collection.query(

    query_embeddings=[[0.1, 0.2, ...]],

    n_results=5

)

Features:

  • Fully open-source
  • Embedded or client-server
  • Fast local development

Pricing: Free (self-hosted)

#### LangChain

Best for: Rapid prototyping and experimentation**

from langchain.memory import ConversationSummaryBufferMemory

from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")

memory = ConversationSummaryBufferMemory(llm=llm, max_token_limit=2000)

Features:

  • Multiple memory types
  • Framework integration
  • Extensive documentation

Pricing: Free (uses your LLM API costs)

Compression Libraries

#### LLMLingua

Best for: Extreme compression with minimal quality loss**

from llmlingua import PromptCompressor

compressor = PromptCompressor()

compressed = compressor.compress_prompt(

    context="Long conversation history...",

    instruction="Current user query",

    target_token=500

)

# Achieves 20x compression with 1.5% accuracy loss

Features:

  • 20x compression ratios
  • <2% quality degradation
  • Fast inference (<500ms)

Pricing: Free (open-source)

Use Cases and Patterns

Chatbot (Customer Support)

Requirements:

  • Multi-turn conversations (50-100 turns)
  • Preserve customer context
  • Fast response times
  • Cost-efficient

Recommended approach:

  • ConversationSummaryBufferMemory (LangChain)
  • 70% threshold: Semantic deduplication
  • 85% threshold: Rolling summarization
  • Prompt caching for frequent patterns

Implementation:

from langchain.memory import ConversationSummaryBufferMemory

from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-haiku-20241022")

memory = ConversationSummaryBufferMemory(

    llm=llm,

    max_token_limit=2000,

    return_messages=True

)

# Add customer conversation

for turn in customer_conversation:

    memory.save_context(

        {"input": turn['customer_message']},

        {"output": turn['agent_response']}

    )

# Retrieve compressed context

context = memory.load_memory_variables({})

Code Assistant

Requirements:

  • Long development sessions (100+ turns)
  • Preserve technical details
  • Handle large code blocks
  • Track incremental changes

Recommended approach:

  • Hierarchical summarization for overall context
  • RAG retrieval for specific code references
  • Delta compression for iterative edits
  • Prompt caching for system prompts

Implementation:

from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

class CodeAssistantMemory:

    def __init__(self):

        self.hierarchy = HierarchicalMemory(client, chunk_size=15)

        self.rag = RAGMemory(anthropic_client=client, openai_client=openai_client)

        self.deltas = DeltaCompressor()

    def add_interaction(self, code_change: dict):

        # Store in hierarchy

        self.hierarchy.add_message({

            'role': 'user',

            'content': code_change['description']

        })

        # Store in RAG for retrieval

        self.rag.add_message(code_change)

        # Store as delta if incremental

        if code_change.get('is_incremental'):

            self.deltas.add_message(code_change)

    def get_context(self, current_query: str):

        # Combine hierarchical summary + RAG retrieval

        summary_context = self.hierarchy.get_context(max_tokens=2000)

        rag_context = self.rag.retrieve_context(current_query, max_tokens=2000)

        return summary_context + rag_context

Educational Tutor

Requirements:

  • Multi-session tracking
  • Student progress persistence
  • Personalized context retrieval
  • Long-term knowledge retention

Recommended approach:

  • VectorStoreRetrieverMemory for multi-session
  • Fact extraction for student knowledge
  • Progressive compression across sessions
  • Resumable conversations

Implementation:

from langchain.memory import VectorStoreRetrieverMemory

from langchain_community.vectorstores import Chroma

from langchain_openai import OpenAIEmbeddings

class TutorMemory:

    def __init__(self, student_id: str):

        self.student_id = student_id

        # Vector store for all sessions

        embeddings = OpenAIEmbeddings()

        vectorstore = Chroma(

            collection_name=f"student_{student_id}",

            embedding_function=embeddings

        )

        self.memory = VectorStoreRetrieverMemory(

            retriever=vectorstore.as_retriever(search_kwargs={"k": 10})

        )

    def add_lesson_content(self, lesson: dict):

        """Add lesson interaction to student memory."""

        self.memory.save_context(

            {"input": lesson['topic']},

            {"output": lesson['explanation']}

        )

    def get_student_context(self, current_topic: str):

        """Retrieve relevant past lessons for current topic."""

        return self.memory.load_memory_variables({

            "prompt": current_topic

        })

Best Practices

1. Choose the Right Technique for Your Use Case

  • Short conversations (<20 turns): No compression needed
  • Medium conversations (20-50 turns): ConversationSummaryBufferMemory
  • Long conversations (50-100 turns): Hierarchical or rolling summarization
  • Very long (100+ turns): Hybrid (RAG + summarization + caching)
  • Multi-session: VectorStoreRetrieverMemory or Mem0

2. Implement Progressive Compression

Don't compress aggressively from the start. Use thresholds:

  • 0-70%: Store verbatim
  • 70-85%: Light compression (deduplication)
  • 85-95%: Medium compression (summarization)
  • 95-100%: Aggressive compression (hierarchical)

3. Combine Techniques

Single-technique approaches are suboptimal. Best production systems use:

  • Rolling summarization (short-term)
  • RAG retrieval (long-term)
  • Prompt caching (cost optimization)
  • Semantic deduplication (redundancy removal)

4. Monitor Quality Metrics

Track compression impact:

  • Response relevance score
  • Information retention rate
  • User satisfaction metrics
  • Token usage reduction

5. Use Prompt Caching Strategically

Cache stable content:

  • Conversation summaries
  • System prompts
  • Knowledge base context
  • User profiles

Don't cache frequently changing content:

  • Current user query
  • Real-time data
  • Session-specific state

6. Implement Checkpointing

Save compression state for:

  • Recovery from failures
  • Multi-session continuity
  • Analytics and debugging
  • A/B testing different strategies

7. Tune Compression Parameters

Test and optimize:

  • Summary token limits
  • Compression thresholds
  • Retrieval result counts
  • Cache TTLs
  • Chunk sizes for hierarchical

8. Handle Edge Cases

Plan for:

  • Very long messages (split or compress individually)
  • Code blocks (preserve formatting)
  • Multi-language content
  • Rapidly changing context

Troubleshooting

Problem: Summary loses critical information

Solutions:

  • Lower compression ratio (less aggressive)
  • Implement importance scoring to preserve key messages
  • Use extractive summarization for critical sections
  • Increase summary token budget

Problem: Retrieval returns irrelevant context

Solutions:

  • Improve embedding model quality
  • Add metadata filtering (timestamps, topics)
  • Adjust similarity threshold
  • Use hybrid search (semantic + keyword)

Problem: High latency from compression

Solutions:

  • Compress asynchronously (background tasks)
  • Use faster models for summarization (Haiku instead of Sonnet)
  • Cache summaries more aggressively
  • Reduce compression frequency

Problem: Conversations still exceeding context window

Solutions:

  • Implement hierarchical compression
  • Archive to vector database more aggressively
  • Use more aggressive compression ratios
  • Consider switching to model with larger context window

Problem: High costs despite compression

Solutions:

  • Implement prompt caching
  • Use cheaper models for summarization (Haiku)
  • Batch summarization operations
  • Reduce summarization frequency

Problem: Lost conversation continuity

Solutions:

  • Increase recent message window
  • Include summary in every request
  • Use more descriptive summaries
  • Implement session resumption with context injection

Advanced Topics

Streaming Compression

Compress in real-time as conversation progresses:

async def streaming_compress(messages: list):

    """Compress while streaming responses."""

    compressor = ProgressiveCompressor()

    async for message in conversation_stream:

        compressor.add_message(message)

        # Compression happens asynchronously

        if compressor.should_compress():

            asyncio.create_task(compressor.compress_async())

    return compressor.get_context()

Multi-User Session Management

Handle concurrent conversations with shared context:

class MultiUserMemory:

    def __init__(self):

        self.user_sessions = {}

    def get_or_create_session(self, user_id: str):

        if user_id not in self.user_sessions:

            self.user_sessions[user_id] = HybridMemorySystem(...)

        return self.user_sessions[user_id]

    def cleanup_inactive_sessions(self, timeout: int = 3600):

        """Remove sessions inactive for > timeout seconds."""

        current_time = time.time()

        inactive = [

            user_id for user_id, session in self.user_sessions.items()

            if current_time - session.last_activity > timeout

        ]

        for user_id in inactive:

            self._archive_session(user_id)

            del self.user_sessions[user_id]

Custom Importance Scoring

Train ML models to score message importance:

from transformers import pipeline

class MLImportanceScorer:

    def __init__(self):

        # Use pre-trained classifier or fine-tune on your data

        self.classifier = pipeline(

            "text-classification",

            model="your-importance-model"

        )

    def score(self, message: dict) -> float:

        """Score message importance (0-1)."""

        result = self.classifier(message['content'])

        return result[0]['score']

Context Window Utilization Optimization

Maximize information density within token budget:

def optimize_context_allocation(

    summary_tokens: int,

    recent_tokens: int,

    retrieval_tokens: int,

    max_tokens: int

):

    """

    Optimal allocation (empirically tested):

    - 20% summary

    - 50% recent messages

    - 30% retrieved context

    """

    return {

        'summary': int(max_tokens * 0.20),

        'recent': int(max_tokens * 0.50),

        'retrieval': int(max_tokens * 0.30)

    }

Future Directions

Emerging Techniques (2025+)

1. Infinite Attention Mechanisms

  • Models with >10M token context windows (Gemini 1.5, future Claude)
  • Reduces need for compression but doesn't eliminate cost concerns

2. Learned Compression Models

  • Neural networks trained to compress conversation optimally
  • Maintain semantic meaning while minimizing tokens
  • Examples: LLMLingua v2, PromptCompressor

3. Multimodal Session Compression

  • Compress conversations with images, audio, video
  • Maintain cross-modal context relationships

4. Federated Memory Systems

  • Distributed compression across multiple memory stores
  • Privacy-preserving compression for sensitive conversations

5. Adaptive Compression Strategies

  • RL-based systems that learn optimal compression per user/domain
  • Dynamic threshold adjustment based on conversation importance

References

Academic Papers

  • "Recursively Summarizing Enables Long-Term Dialogue Memory" (arXiv:2308.15022)
  • "LLMLingua: Compressing Prompts for Accelerated Inference" (arXiv:2310.05736)
  • "Lost in the Middle: How Language Models Use Long Contexts" (arXiv:2307.03172)

Documentation

Tools

  • Mem0 - Managed memory service
  • Zep - Fast memory layer

Last Updated: 2025-11-30

Version: 1.0.0

License: MIT

BrowserAct

Let your agent run on any real-world website

Bypass CAPTCHA & anti-bot for free. Start local, scale to cloud.

Explore BrowserAct Skills →

Stop writing automation&scrapers

Install the CLI. Run your first Skill in 30 seconds. Scale when you're ready.

Start free
free · no credit card