LLM Agent Context Window Limits: Understanding and Solutions
Context window limitations are the silent killer of AI agent projects. Your agent works perfectly in short conversations, then mysteriously starts forgetting previous interactions, losing track of multi-step tasks, and making decisions as if earlier context never existed.
Unlike traditional software where memory constraints are explicit and predictable, LLM context windows create a moving boundary that shrinks your agent’s “working memory” as conversations grow. This fundamental constraint affects every aspect of agent design—from tool usage patterns to memory architecture to user experience.
This guide provides a comprehensive understanding of context window limitations and proven strategies for building agents that work reliably regardless of conversation length or task complexity.
Understanding LLM Context Window Limitations
What Are Context Window Limits and Why They Matter
A context window is the maximum number of tokens (words, parts of words, and symbols) an LLM can process in a single request. This includes:
- System prompt and instructions
- Conversation history
- Tool outputs and intermediate results
- User input
- Reserved space for response generation
Current Context Window Sizes by Provider (as of 2026):
| Provider | Model | Context Window | Practical Limit* |
|---|---|---|---|
| OpenAI | GPT-4o | 128,000 tokens | ~100,000 tokens |
| OpenAI | GPT-4-Turbo | 128,000 tokens | ~100,000 tokens |
| Anthropic | Claude 3 | 200,000 tokens | ~160,000 tokens |
| Google | Gemini 2.5 Pro | 1,048,576 tokens | ~1,000,000 tokens |
*Practical limit accounts for response generation space and reliability at scale
Why Context Limits Create Unique Problems:
Traditional memory limits are fixed and known up front. A context window is also fixed in size, but the share of it left for new input shrinks as conversation history and tool outputs accumulate, so the effective limit moves with conversation length. This means:
- Unpredictable Failures: Agents that work in testing may fail in production as conversations grow longer
- Silent Degradation: Context truncation often happens without explicit errors, leading to subtle performance issues
- Cascading Effects: Lost context affects all subsequent decisions and tool usage
- Non-Linear Impact: The effect of context loss compounds over multi-step workflows
How Context Limits Affect Different Agent Workflows
Short-Task Agents (Single request/response):
- Minimal impact if task fits within context
- Problems arise with complex instructions or large inputs
- Tool outputs may push context over limits
Conversational Agents (Multi-turn dialogue):
- Gradual context accumulation over conversation
- Loss of early conversation context affects coherence
- Personality and preferences may be forgotten
Workflow Agents (Multi-step task execution):
- Each tool use adds to context consumption
- Intermediate results consume significant tokens
- Failed tasks due to context loss mid-workflow
Research/Analysis Agents (Large data processing):
- Document summaries and extracts consume context rapidly
- Analysis depth limited by available context space
- Information loss during iterative analysis
Common Problems Caused by Context Limits
Context Window Overflow Example:
Token Breakdown for Failed Agent:
System Prompt: 1,500 tokens
Conversation History: 8,000 tokens
Tool Outputs: 12,000 tokens
Current Request: 2,000 tokens
Response Reserve: 4,000 tokens
Total: 27,500 tokens
Context Limit: 24,000 tokens
Overflow: 3,500 tokens (silently truncated)
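The arithmetic in this breakdown can be checked before every request. The sketch below is a minimal illustration, not part of any provider SDK: the function name `context_budget` and the dictionary keys are made up for this example, and the numbers are the ones from the breakdown above.

```python
def context_budget(parts: dict[str, int], context_limit: int) -> dict[str, int]:
    """Sum per-component token counts and compare them with the context limit."""
    total = sum(parts.values())
    return {
        "total": total,
        "limit": context_limit,
        # Overflow is zero when everything fits
        "overflow": max(0, total - context_limit),
    }


# Token counts taken from the failed-agent breakdown
breakdown = {
    "system_prompt": 1_500,
    "conversation_history": 8_000,
    "tool_outputs": 12_000,
    "current_request": 2_000,
    "response_reserve": 4_000,
}

report = context_budget(breakdown, context_limit=24_000)
print(report)  # {'total': 27500, 'limit': 24000, 'overflow': 3500}
```

Running a check like this before sending the request turns a silent truncation into an explicit signal that something must be trimmed or summarized first.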
Tool Use Failures Due to Context Overflow:
When context limits are exceeded, the model may lose:
- Previous tool outputs needed for next steps
- Instructions about tool usage patterns
- Context about what tools have already been tried
- Error recovery information from failed attempts
Example Failure Scenario:
# Step 1: Agent searches database (success)
search_result = "Found 150 matching records..."
# Step 2: Agent analyzes results (context growing)
analysis = "Analysis shows patterns in data..."
# Step 3: Context limit exceeded, search_result truncated
# Agent tries to re-search same data, entering infinite loop
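One way to keep this scenario from looping is to record tool calls outside the context window, so the record survives truncation. The following is a hedged sketch only: `ToolCallGuard`, `should_run`, and `max_repeats` are hypothetical names introduced here, and it assumes tool arguments are JSON-serializable.

```python
import hashlib
import json


class ToolCallGuard:
    """Remember (tool, arguments) pairs so repeated identical calls can be refused."""

    def __init__(self, max_repeats: int = 1):
        self.max_repeats = max_repeats
        self._seen: dict[str, int] = {}

    def _key(self, tool_name: str, arguments: dict) -> str:
        # Sort keys so logically identical argument dicts hash the same way
        payload = json.dumps({"tool": tool_name, "args": arguments}, sort_keys=True)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def should_run(self, tool_name: str, arguments: dict) -> bool:
        """Return False once the same call has already run max_repeats times."""
        key = self._key(tool_name, arguments)
        count = self._seen.get(key, 0)
        if count >= self.max_repeats:
            return False
        self._seen[key] = count + 1
        return True


guard = ToolCallGuard(max_repeats=1)
first = guard.should_run("search_database", {"query": "open invoices"})
second = guard.should_run("search_database", {"query": "open invoices"})
print(first, second)  # True False
```

When `should_run` returns False, the agent can be handed the stored result of the earlier call instead of searching again.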
Context Window Management Strategies
Sliding Window Techniques
Implement intelligent context management that preserves critical information:
class SlidingContextManager:
    def __init__(self, max_tokens=4000, preservation_strategy='priority'):
        self.max_tokens = max_tokens
        self.preservation_strategy = preservation_strategy
        self.context_segments = {
            'system': {'tokens': 0, 'content': [], 'priority': 10},
            'user_preferences': {'tokens': 0, 'content': [], 'priority': 9},
            'recent_messages': {'tokens': 0, 'content': [], 'priority': 8},
            'tool_outputs': {'tokens': 0, 'content': [], 'priority': 7},
            'old_messages': {'tokens': 0, 'content': [], 'priority': 5}
        }

    def add_message(self, message, segment_type='recent_messages'):
        """Add message to appropriate context segment"""
        tokens = self._count_tokens(message)
        self.context_segments[segment_type]['content'].append(message)
        self.context_segments[segment_type]['tokens'] += tokens
        # Check if context management needed
        if self._total_tokens() > self.max_tokens:
            self._manage_context()

    def _manage_context(self):
        """Intelligently manage context to stay within limits"""
        if self.preservation_strategy == 'priority':
            self._priority_based_management()
        elif self.preservation_strategy == 'sliding':
            self._sliding_window_management()
        elif self.preservation_strategy == 'semantic':
            self._semantic_compression_management()

    def _priority_based_management(self):
        """Remove content based on priority levels"""
        total_tokens = self._total_tokens()
        tokens_to_remove = total_tokens - self.max_tokens
        # Sort segments by priority (lowest first)
        sorted_segments = sorted(
            self.context_segments.items(),
            key=lambda x: x[1]['priority']
        )
        for segment_name, segment in sorted_segments:
            if tokens_to_remove <= 0:
                break
            # Remove oldest content from this segment
            while segment['content'] and tokens_to_remove > 0:
                removed_message = segment['content'].pop(0)
                removed_tokens = self._count_tokens(removed_message)
                segment['tokens'] -= removed_tokens
                tokens_to_remove -= removed_tokens
    def _sliding_window_management(self):
        """Maintain recent context while preserving critical segments"""
        # Always preserve system and user preferences
        protected_tokens = (
            self.context_segments['system']['tokens'] +
            self.context_segments['user_preferences']['tokens']
        )
        available_tokens = self.max_tokens - protected_tokens
        # Allocate remaining tokens to other segments
        recent_allocation = int(available_tokens * 0.4)
        tool_allocation = int(available_tokens * 0.3)
        old_allocation = available_tokens - recent_allocation - tool_allocation
        self._trim_segment('recent_messages', recent_allocation)
        self._trim_segment('tool_outputs', tool_allocation)
        self._trim_segment('old_messages', old_allocation)

    def _trim_segment(self, segment_name, max_tokens):
        """Trim segment to specified token limit"""
        segment = self.context_segments[segment_name]
        while segment['tokens'] > max_tokens and segment['content']:
            # Remove oldest message
            removed_message = segment['content'].pop(0)
            removed_tokens = self._count_tokens(removed_message)
            segment['tokens'] -= removed_tokens

    def build_context(self):
        """Build final context string from managed segments"""
        context_parts = []
        # Add segments in logical order
        for segment_name in ['system', 'user_preferences', 'old_messages', 'tool_outputs', 'recent_messages']:
            segment_content = self.context_segments[segment_name]['content']
            if segment_content:
                context_parts.extend(segment_content)
        return '\n'.join(context_parts)

    def get_context_summary(self):
        """Get summary of current context usage"""
        return {
            'total_tokens': self._total_tokens(),
            'max_tokens': self.max_tokens,
            'utilization': f"{(self._total_tokens() / self.max_tokens) * 100:.1f}%",
            'segments': {
                name: {
                    'tokens': segment['tokens'],
                    'messages': len(segment['content'])
                }
                for name, segment in self.context_segments.items()
            }
        }
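The class above calls `_count_tokens` and `_total_tokens` without showing them. As a rough, runnable illustration of the same priority-based eviction, the sketch below swaps in a crude estimate of about four characters per token. That heuristic is an assumption for English text and not a real tokenizer, and `estimate_tokens` and `evict_by_priority` are names invented for this example.

```python
def estimate_tokens(text: str) -> int:
    """Very rough token estimate. Assumption: about 4 characters per token."""
    return max(1, len(text) // 4)


def evict_by_priority(segments: dict, max_tokens: int) -> dict:
    """Drop the oldest messages from the lowest-priority segments until the total fits.

    segments maps a name to {'priority': int, 'content': list[str]} and is
    modified in place.
    """
    total = sum(
        estimate_tokens(message)
        for segment in segments.values()
        for message in segment["content"]
    )
    excess = total - max_tokens
    # Visit segments from lowest to highest priority
    for _, segment in sorted(segments.items(), key=lambda kv: kv[1]["priority"]):
        while segment["content"] and excess > 0:
            excess -= estimate_tokens(segment["content"].pop(0))
    return segments


segments = {
    "system": {"priority": 10, "content": ["You are a helpful agent."]},
    "recent_messages": {"priority": 8, "content": ["c" * 40]},
    "old_messages": {"priority": 5, "content": ["a" * 40, "b" * 40]},
}
evict_by_priority(segments, max_tokens=20)
print({name: len(s["content"]) for name, s in segments.items()})
# {'system': 1, 'recent_messages': 1, 'old_messages': 0}
```

In production a model-specific tokenizer should replace the character heuristic, since the estimate can be far off for code, non-English text, or structured tool output.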
Context Summarization and Compression
Implement intelligent compression that preserves semantic meaning:
class ContextCompressor:
    def __init__(self, compression_model='gpt-3.5-turbo'):
        self.compression_model = compression_model
        self.compression_prompts = {
            'conversation': """Summarize this conversation, preserving:
- Key decisions made
- Important information shared
- User preferences mentioned
- Unresolved questions or tasks
Conversation to summarize:
{content}
Summary:""",
            'tool_outputs': """Summarize these tool outputs, preserving:
- Key results and findings
- Data patterns discovered
- Errors encountered
- Next steps suggested
Tool outputs:
{content}
Summary:""",
            'analysis': """Compress this analysis, keeping:
- Main conclusions
- Critical data points
- Methodology used
- Recommendations
Analysis:
{content}
Compressed analysis:"""
        }
    async def compress_context_segment(self, content, segment_type, target_ratio=0.7):
        """Compress context segment while preserving key information.

        target_ratio is the fraction of the original token count to keep
        (0.7 keeps roughly 70%, i.e. a 30% reduction).
        """
        original_tokens = self._count_tokens(content)
        target_tokens = int(original_tokens * target_ratio)
        # Choose compression strategy based on content size and type
        if original_tokens < 500:  # Small content, no compression needed
            return content
        elif original_tokens < 2000:  # Medium content, extractive summarization
            return await self._extractive_summarization(content, target_tokens)
        else:  # Large content, abstractive summarization
            return await self._abstractive_summarization(content, segment_type, target_tokens)

    async def _abstractive_summarization(self, content, segment_type, target_tokens):
        """Use LLM for intelligent summarization"""
        prompt = self.compression_prompts.get(segment_type, self.compression_prompts['conversation'])
        compression_request = {
            'model': self.compression_model,
            'messages': [
                {
                    'role': 'user',
                    'content': prompt.format(content=content)
                }
            ],
            'max_tokens': target_tokens,
            'temperature': 0.1  # Low temperature for consistent summarization
        }
        try:
            response = await self._make_llm_request(compression_request)
            summary = response['choices'][0]['message']['content']
            # Verify compression achieved target
            summary_tokens = self._count_tokens(summary)
            if summary_tokens > target_tokens:
                # Still over budget: truncate proportionally by character count
                # (approximate, and may cut mid-sentence)
                return summary[:int(len(summary) * (target_tokens / summary_tokens))]
            return summary
        except Exception:
            # Fallback to extractive summarization on error
            return await self._extractive_summarization(content, target_tokens)
    async def _extractive_summarization(self, content, target_tokens):
        """Extract most important sentences to meet token target"""
        sentences = self._split_into_sentences(content)
        sentence_scores = await self._score_sentence_importance(sentences)
        # Keep each sentence's original position so chronological order can be
        # restored even when the same sentence appears more than once
        scored_sentences = list(zip(range(len(sentences)), sentences, sentence_scores))
        # Sort by importance score
        scored_sentences.sort(key=lambda x: x[2], reverse=True)
        # Select sentences until target tokens reached
        selected_sentences = []
        current_tokens = 0
        for index, sentence, score in scored_sentences:
            sentence_tokens = self._count_tokens(sentence)
            if current_tokens + sentence_tokens <= target_tokens:
                selected_sentences.append((index, sentence))
                current_tokens += sentence_tokens
            else:
                break
        # Reorder selected sentences chronologically
        selected_sentences.sort(key=lambda x: x[0])
        return ' '.join(sentence for _, sentence in selected_sentences)
    async def _score_sentence_importance(self, sentences):
        """Score sentence importance for extractive summarization"""
        import re  # Imported once per call; better placed at module top

        # Lowercase substrings that signal decisions, requirements, or outcomes
        key_indicators = [
            'decided', 'concluded', 'important', 'key', 'critical',
            'user wants', 'preference', 'requirement', 'must',
            'error', 'failed', 'success', 'found', 'result'
        ]
        scores = []
        for sentence in sentences:
            score = 0
            lowered = sentence.lower()
            # Higher score for sentences with key indicators
            for indicator in key_indicators:
                if indicator in lowered:
                    score += 1
            # Higher score for sentences with numbers/data
            if re.search(r'\d+', sentence):
                score += 0.5
            # Lower score for very short or very long sentences
            word_count = len(sentence.split())
            if word_count < 5 or word_count > 50:
                score -= 0.5
            scores.append(max(0, score))  # Ensure non-negative
        return scores
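The extractive path relies on helpers such as `_split_into_sentences` that are not shown. The sketch below is a small, self-contained version of the same idea under simplifying assumptions: sentences are split with a plain regular expression, the keyword list is shortened, and a word count stands in for a token count. All function names are illustrative.

```python
import re

# Shortened, illustrative keyword list
KEY_INDICATORS = ("decided", "error", "must", "result", "found")


def split_sentences(text: str) -> list[str]:
    """Naive splitter: break after '.', '!' or '?' followed by whitespace."""
    return [s.strip() for s in re.split(r"(?<=[.!?])\s+", text) if s.strip()]


def score_sentence(sentence: str) -> float:
    lowered = sentence.lower()
    score = float(sum(1 for keyword in KEY_INDICATORS if keyword in lowered))
    if re.search(r"\d", sentence):
        score += 0.5  # Sentences carrying numbers often carry data
    return score


def extractive_summary(text: str, max_words: int) -> str:
    """Keep the highest-scoring sentences that fit, in their original order."""
    sentences = split_sentences(text)
    ranked = sorted(range(len(sentences)),
                    key=lambda i: score_sentence(sentences[i]), reverse=True)
    chosen, used = [], 0
    for i in ranked:
        words = len(sentences[i].split())
        if used + words <= max_words:  # Word count stands in for tokens here
            chosen.append(i)
            used += words
    return " ".join(sentences[i] for i in sorted(chosen))


text = ("We chatted about the weather for a while. "
        "The team decided to use PostgreSQL. "
        "The migration found 3 errors. "
        "Thanks for your time today.")
summary = extractive_summary(text, max_words=12)
print(summary)
# The team decided to use PostgreSQL. The migration found 3 errors.
```

Tracking sentence positions by index, rather than looking sentences up again by value, keeps the output order correct even if a sentence is repeated.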
Selective Context Retention Patterns
Implement intelligent patterns for retaining the most important context:
import math


class SelectiveContextRetainer:
    def __init__(self):
        self.retention_rules = {
            'user_goals': {'weight': 10, 'always_retain': True},
            'successful_actions': {'weight': 8, 'decay_rate': 0.1},
            'error_patterns': {'weight': 7, 'decay_rate': 0.2},
            'user_preferences': {'weight': 9, 'always_retain': True},
            'tool_configurations': {'weight': 6, 'decay_rate': 0.3},
            'intermediate_results': {'weight': 5, 'decay_rate': 0.5}
        }

    def evaluate_context_importance(self, context_item, current_time):
        """Evaluate importance of a context item for retention.

        current_time and the item's 'timestamp' are expected to be datetime objects.
        """
        item_type = self._classify_context_item(context_item)
        # Unclassified ('general') items fall back to a low weight and fast decay
        rule = self.retention_rules.get(item_type, {'weight': 1, 'decay_rate': 0.8})
        base_importance = rule['weight']
        # Apply time-based decay (except for always_retain items)
        if not rule.get('always_retain', False):
            time_since_creation = current_time - context_item.get('timestamp', current_time)
            # Exponential decay with the rate expressed per hour
            decay_factor = math.exp(-rule.get('decay_rate', 0.5) * time_since_creation.total_seconds() / 3600)
            importance = base_importance * decay_factor
        else:
            importance = base_importance
        # Boost importance for frequently referenced items
        reference_count = context_item.get('reference_count', 0)
        importance *= (1 + 0.1 * reference_count)
        # Boost importance for items that led to successful outcomes
        if context_item.get('led_to_success', False):
            importance *= 1.5
        return importance
    def _classify_context_item(self, context_item):
        """Classify context item for retention rule selection"""
        content = context_item.get('content', '').lower()
        item_type = context_item.get('type', 'unknown')
        if item_type == 'user_message' and any(keyword in content for keyword in ['want', 'need', 'goal', 'objective']):
            return 'user_goals'
        elif item_type == 'tool_output' and context_item.get('success', False):
            return 'successful_actions'
        elif item_type == 'error' or 'error' in content:
            return 'error_patterns'
        elif 'prefer' in content or 'like' in content or 'setting' in content:
            return 'user_preferences'
        elif item_type == 'tool_config':
            return 'tool_configurations'
        elif item_type == 'intermediate_result':
            return 'intermediate_results'
        else:
            return 'general'

    def select_context_for_retention(self, context_items, max_tokens, current_time):
        """Select most important context items within token budget"""
        # Score all context items
        scored_items = []
        for item in context_items:
            importance = self.evaluate_context_importance(item, current_time)
            token_count = self._count_tokens(item.get('content', ''))
            efficiency = importance / max(token_count, 1)  # Importance per token
            scored_items.append({
                'item': item,
                'importance': importance,
                'tokens': token_count,
                'efficiency': efficiency
            })
        # Sort by importance, then efficiency
        scored_items.sort(key=lambda x: (x['importance'], x['efficiency']), reverse=True)
        # Select items until token budget exhausted
        selected_items = []
        total_tokens = 0
        for scored_item in scored_items:
            if total_tokens + scored_item['tokens'] <= max_tokens:
                selected_items.append(scored_item['item'])
                total_tokens += scored_item['tokens']
            else:
                break
        return selected_items, total_tokens
    def create_context_summary(self, selected_items):
        """Create coherent summary from selected context items"""
        # Group items by type and chronology
        grouped_items = {}
        for item in selected_items:
            item_type = self._classify_context_item(item)
            if item_type not in grouped_items:
                grouped_items[item_type] = []
            grouped_items[item_type].append(item)
        # Build summary with logical flow
        summary_parts = []
        # Start with user goals and preferences
        if 'user_goals' in grouped_items:
            goals_text = self._format_goals(grouped_items['user_goals'])
            summary_parts.append(f"User Goals: {goals_text}")
        if 'user_preferences' in grouped_items:
            prefs_text = self._format_preferences(grouped_items['user_preferences'])
            summary_parts.append(f"User Preferences: {prefs_text}")
        # Add successful actions and results
        if 'successful_actions' in grouped_items:
            actions_text = self._format_actions(grouped_items['successful_actions'])
            summary_parts.append(f"Successful Actions: {actions_text}")
        # Add important errors to avoid
        if 'error_patterns' in grouped_items:
            errors_text = self._format_errors(grouped_items['error_patterns'])
            summary_parts.append(f"Issues to Avoid: {errors_text}")
        return '\n\n'.join(summary_parts)
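To make the decay rule concrete, the sketch below isolates the scoring formula used in `evaluate_context_importance`: weight times `exp(-decay_rate * age_in_hours)`, followed by the reference and success boosts. The standalone function `decayed_importance` is an illustrative name, and the weights and decay rates are the ones from the retention rules above.

```python
import math
from datetime import datetime, timedelta


def decayed_importance(weight, decay_rate, created_at, now,
                       reference_count=0, led_to_success=False):
    """Importance = weight * exp(-decay_rate * age_in_hours), then boosts."""
    age_hours = (now - created_at).total_seconds() / 3600
    importance = weight * math.exp(-decay_rate * age_hours)
    importance *= 1 + 0.1 * reference_count  # +10% per recorded reference
    if led_to_success:
        importance *= 1.5
    return importance


now = datetime(2024, 1, 1, 12, 0)
four_hours_ago = now - timedelta(hours=4)

# 'intermediate_results' rule: weight 5, decay_rate 0.5
intermediate = decayed_importance(5, 0.5, four_hours_ago, now)
# 'successful_actions' rule: weight 8, decay_rate 0.1
successful = decayed_importance(8, 0.1, four_hours_ago, now)

print(round(intermediate, 3), round(successful, 3))  # 0.677 5.363
```

After four hours the intermediate result has lost most of its weight while the successful action keeps about two thirds of it, which is the ordering the retention rules are meant to produce.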
For comprehensive memory architecture strategies that work alongside context window management, see our guide on external memory alternatives to context window storage.
Alternative Memory Architectures
External Memory Systems for Agents
Implement external memory that supplements context windows:
class ExternalMemoryAgent:
    def __init__(self, config):
        self.context_manager = ContextManager(config.context_window_size)
        self.external_memory = ExternalMemorySystem(config.memory_config)
        self.memory_retriever = MemoryRetriever(self.external_memory)
        # Assumption: the LLM client used in process_request is supplied via config
        self.llm_service = config.llm_service

    async def process_request(self, request):
        """Process request using external memory + context window"""
        # 1. Store current request in external memory
        await self.external_memory.store_interaction(request)
        # 2. Retrieve relevant memories from external storage
        relevant_memories = await self.memory_retriever.retrieve_relevant(
            query=request['content'],
            context_budget=self.context_manager.available_memory_tokens()
        )
        # 3. Build context with current conversation + relevant memories
        context = await self.context_manager.build_context(
            current_request=request,
            relevant_memories=relevant_memories,
            conversation_history=self._get_recent_history()
        )
        # 4. Generate response
        response = await self.llm_service.generate(context)
        # 5. Store response and update memory
        await self.external_memory.store_interaction(response)
        await self.external_memory.update_memory_relevance(
            request, response, relevant_memories
        )
        return response
import uuid
from datetime import datetime


class ExternalMemorySystem:
    def __init__(self, config):
        self.vector_store = VectorDatabase(config.vector_config)
        self.sql_store = SQLDatabase(config.sql_config)
        self.cache = MemoryCache(config.cache_config)

    async def store_interaction(self, interaction):
        """Store interaction in multiple memory layers"""
        interaction_id = str(uuid.uuid4())
        timestamp = datetime.now()
        # Store in vector database for semantic search
        embedding = await self._generate_embedding(interaction['content'])
        await self.vector_store.store(
            id=interaction_id,
            vector=embedding,
            metadata={
                'timestamp': timestamp,
                'type': interaction['type'],
                'user_id': interaction.get('user_id'),
                'session_id': interaction.get('session_id')
            }
        )
        # Store in SQL for structured queries
        await self.sql_store.execute("""
            INSERT INTO interactions (id, content, type, user_id, session_id, timestamp)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (interaction_id, interaction['content'], interaction['type'],
              interaction.get('user_id'), interaction.get('session_id'), timestamp))
        # Cache recent interactions for fast access
        await self.cache.set(
            f"recent:{interaction.get('session_id', 'default')}:{interaction_id}",
            interaction,
            ttl=3600  # 1 hour
        )
        return interaction_id
    async def retrieve_memories(self, query, filters=None, limit=10):
        """Retrieve memories using hybrid search"""
        # Semantic search via vector store
        query_embedding = await self._generate_embedding(query)
        vector_results = await self.vector_store.search(
            vector=query_embedding,
            filters=filters,
            limit=limit * 2  # Get more candidates
        )
        # Structured search via SQL (if filters specified)
        sql_results = []
        if filters:
            # _build_sql_conditions must emit only trusted column names;
            # user-supplied values should never be interpolated into the SQL string
            sql_conditions = self._build_sql_conditions(filters)
            # Await execute() to get the cursor, then fetch its rows
            # (assumes an aiosqlite-style async cursor)
            cursor = await self.sql_store.execute(f"""
                SELECT * FROM interactions
                WHERE {sql_conditions}
                ORDER BY timestamp DESC
                LIMIT ?
            """, (limit,))
            sql_results = await cursor.fetchall()
        # Merge and rank results
        merged_results = self._merge_search_results(vector_results, sql_results, limit)
        return merged_results
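The merge step, `_merge_search_results`, is referenced but not shown. One plausible shape for it is sketched below under stated assumptions: vector hits arrive as dicts with `id` and `score`, SQL rows arrive newest first as dicts with `id`, and rows matching the structured filters receive a small recency bonus. The `0.2` weight and the function name are hypothetical choices for this example, not values from the original system.

```python
def merge_search_results(vector_results, sql_results, limit, sql_weight=0.2):
    """Deduplicate by id and rank by similarity score plus a recency bonus.

    vector_results: list of {'id': str, 'score': float}
    sql_results:    list of {'id': str, ...}, ordered newest first
    """
    merged = {}
    for hit in vector_results:
        merged[hit["id"]] = {"id": hit["id"], "score": hit["score"]}
    for rank, row in enumerate(sql_results):
        # Newer rows (lower rank) receive a larger bonus
        bonus = sql_weight / (rank + 1)
        entry = merged.setdefault(row["id"], {"id": row["id"], "score": 0.0})
        entry["score"] += bonus
    ranked = sorted(merged.values(), key=lambda e: e["score"], reverse=True)
    return ranked[:limit]


vector_hits = [{"id": "a", "score": 0.9}, {"id": "b", "score": 0.8}]
sql_rows = [{"id": "b"}, {"id": "c"}]
top = merge_search_results(vector_hits, sql_rows, limit=2)
print([entry["id"] for entry in top])  # ['b', 'a']
```

Item `b` overtakes `a` because it appears in both result sets, which is the usual motivation for hybrid retrieval.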
Hierarchical Context Management
Implement multi-level context management that mirrors human memory patterns:
class HierarchicalContextManager:
    def __init__(self, config):
        self.levels = {
            'immediate': ImmediateContext(config.immediate_tokens),  # Last few exchanges
            'working': WorkingContext(config.working_tokens),        # Current session
            'episodic': EpisodicContext(config.episodic_tokens),     # Related episodes
            'semantic': SemanticContext(config.semantic_tokens)      # General knowledge
        }
        self.total_budget = config.total_context_tokens
        self.allocation_strategy = config.allocation_strategy
    async def build_hierarchical_context(self, current_request):
        """Build context using hierarchical memory structure"""
        # Allocate tokens across memory levels
        token_allocation = self._calculate_token_allocation(current_request)
        context_components = {}
        # Level 1: Immediate context (always included)
        context_components['immediate'] = await self.levels['immediate'].get_context(
            current_request, token_allocation['immediate']
        )
        # Level 2: Working memory for current session
        context_components['working'] = await self.levels['working'].get_context(
            current_request, token_allocation['working']
        )
        # Level 3: Episodic memory from similar situations
        context_components['episodic'] = await self.levels['episodic'].get_context(
            current_request, token_allocation['episodic']
        )
        # Level 4: Semantic knowledge relevant to request
        context_components['semantic'] = await self.levels['semantic'].get_context(
            current_request, token_allocation['semantic']
        )
        # Combine contexts in logical order
        final_context = self._combine_contexts(context_components)
        return final_context

    def _calculate_token_allocation(self, current_request):
        """Calculate optimal token allocation across memory levels.

        current_request is only used by the adaptive strategy.
        """
        if self.allocation_strategy == 'balanced':
            return {
                'immediate': int(self.total_budget * 0.3),
                'working': int(self.total_budget * 0.3),
                'episodic': int(self.total_budget * 0.2),
                'semantic': int(self.total_budget * 0.2)
            }
        elif self.allocation_strategy == 'recent_focused':
            return {
                'immediate': int(self.total_budget * 0.5),
                'working': int(self.total_budget * 0.3),
                'episodic': int(self.total_budget * 0.1),
                'semantic': int(self.total_budget * 0.1)
            }
        elif self.allocation_strategy == 'knowledge_focused':
            return {
                'immediate': int(self.total_budget * 0.2),
                'working': int(self.total_budget * 0.2),
                'episodic': int(self.total_budget * 0.3),
                'semantic': int(self.total_budget * 0.3)
            }
        else:
            # Adaptive allocation based on request analysis
            return self._adaptive_token_allocation(current_request)
    def _adaptive_token_allocation(self, request):
        """Dynamically allocate tokens based on request characteristics"""
        request_analysis = self._analyze_request(request)
        base_allocation = {
            'immediate': 0.25,
            'working': 0.25,
            'episodic': 0.25,
            'semantic': 0.25
        }
        # Adjust based on request characteristics (each adjustment sums to zero)
        if request_analysis['requires_recent_context']:
            base_allocation['immediate'] += 0.1
            base_allocation['working'] += 0.1
            base_allocation['episodic'] -= 0.1
            base_allocation['semantic'] -= 0.1
        if request_analysis['requires_domain_knowledge']:
            base_allocation['semantic'] += 0.15
            base_allocation['immediate'] -= 0.05
            base_allocation['working'] -= 0.05
            base_allocation['episodic'] -= 0.05
        if request_analysis['continues_previous_task']:
            base_allocation['episodic'] += 0.15
            base_allocation['immediate'] -= 0.05
            base_allocation['working'] += 0.1
            base_allocation['semantic'] -= 0.2
        # Adjustments stack when several flags are true and can push a share
        # below zero (e.g. 'semantic' with recent context plus task continuation),
        # so clamp at zero and renormalize before converting to token counts
        clamped = {level: max(0.0, share) for level, share in base_allocation.items()}
        total_share = sum(clamped.values()) or 1.0
        return {
            level: int(self.total_budget * share / total_share)
            for level, share in clamped.items()
        }
class EpisodicContext:
    def __init__(self, max_tokens):
        self.max_tokens = max_tokens
        self.episode_store = EpisodeStore()

    async def get_context(self, current_request, token_budget):
        """Retrieve relevant episodic context"""
        # Find episodes similar to current situation
        similar_episodes = await self.episode_store.find_similar(
            current_request,
            similarity_threshold=0.7
        )
        # Select most relevant episodes within token budget
        selected_episodes = []
        current_tokens = 0
        for episode in similar_episodes:
            episode_summary = await self._summarize_episode(episode)
            episode_tokens = self._count_tokens(episode_summary)
            if current_tokens + episode_tokens <= token_budget:
                selected_episodes.append(episode_summary)
                current_tokens += episode_tokens
            else:
                break
        return '\n\n'.join([
            "Relevant Past Episodes:",
            *selected_episodes
        ]) if selected_episodes else ""

    async def _summarize_episode(self, episode):
        """Create concise episode summary for context inclusion"""
        summary_parts = []
        if episode.get('goal'):
            summary_parts.append(f"Goal: {episode['goal']}")
        if episode.get('actions_taken'):
            key_actions = episode['actions_taken'][:3]  # First 3 actions
            summary_parts.append(f"Actions: {', '.join(key_actions)}")
        if episode.get('outcome'):
            summary_parts.append(f"Outcome: {episode['outcome']}")
        if episode.get('lessons_learned'):
            summary_parts.append(f"Lessons: {episode['lessons_learned']}")
        return ' | '.join(summary_parts)
Hybrid In-Context and External Memory
Combine the benefits of both approaches:
from datetime import datetime


class HybridMemoryAgent:
    def __init__(self, config):
        self.context_window_size = config.context_window_size
        self.working_memory_ratio = config.working_memory_ratio  # e.g., 0.6
        self.external_memory = ExternalMemorySystem(config)
        # Assumption: the LLM client used in process_request is supplied via config
        self.llm_service = config.llm_service
        # Calculate memory allocations
        self.working_memory_tokens = int(self.context_window_size * self.working_memory_ratio)
        self.retrieved_memory_tokens = self.context_window_size - self.working_memory_tokens

    async def process_request(self, request):
        """Process request using hybrid memory approach"""
        # Phase 1: Retrieve relevant external memories
        # Note: ExternalMemorySystem above exposes retrieve_memories();
        # retrieve_relevant() is assumed to be a token-budgeted wrapper around it
        external_memories = await self.external_memory.retrieve_relevant(
            query=request['content'],
            max_tokens=self.retrieved_memory_tokens,
            relevance_threshold=0.6
        )
        # Phase 2: Build working memory context
        working_context = await self._build_working_context(
            request,
            max_tokens=self.working_memory_tokens
        )
        # Phase 3: Intelligently merge contexts
        merged_context = await self._merge_contexts(
            working_context,
            external_memories,
            request
        )
        # Phase 4: Generate response with hybrid context
        response = await self.llm_service.generate(merged_context)
        # Phase 5: Update both memory systems
        await self._update_working_memory(request, response)
        # Note: store_interaction() as defined earlier reads 'content' and 'type'
        # keys, so this payload assumes an extended version of that method
        await self.external_memory.store_interaction({
            'request': request,
            'response': response,
            'context_used': merged_context,
            'timestamp': datetime.now()
        })
        return response
    async def _merge_contexts(self, working_context, external_memories, request):
        """Intelligently merge working and external memory contexts"""
        # Analyze request to determine optimal merge strategy
        request_type = self._classify_request_type(request)
        if request_type == 'continuation':
            # For task continuation, prioritize working memory
            merge_strategy = 'working_priority'
        elif request_type == 'new_topic':
            # For new topics, balance both memory types
            merge_strategy = 'balanced'
        elif request_type == 'knowledge_query':
            # For knowledge queries, prioritize external memory
            merge_strategy = 'external_priority'
        else:
            merge_strategy = 'balanced'
        return await self._apply_merge_strategy(
            working_context, external_memories, merge_strategy
        )
    async def _apply_merge_strategy(self, working_context, external_memories, strategy):
        """Apply specific merge strategy for context combination"""
        if strategy == 'working_priority':
            # Include full working context, compress external if needed
            context_parts = [working_context]
            # Never pass a negative budget if working context alone fills the window
            remaining_tokens = max(0, self.context_window_size - self._count_tokens(working_context))
            compressed_external = await self._compress_memories(
                external_memories, remaining_tokens
            )
            if compressed_external:
                context_parts.append(f"Relevant Background:\n{compressed_external}")
        elif strategy == 'external_priority':
            # Include key external memories, summarize working context
            context_parts = []
            # Reserve space for external memories
            external_token_budget = int(self.context_window_size * 0.7)
            working_token_budget = self.context_window_size - external_token_budget
            summarized_working = await self._summarize_working_context(
                working_context, working_token_budget
            )
            context_parts.append(summarized_working)
            relevant_external = await self._select_top_memories(
                external_memories, external_token_budget
            )
            context_parts.append(f"Relevant Information:\n{relevant_external}")
        else:  # balanced
            # Balance both memory types equally
            working_budget = int(self.context_window_size * 0.5)
            external_budget = self.context_window_size - working_budget
            balanced_working = await self._fit_to_budget(working_context, working_budget)
            balanced_external = await self._select_top_memories(external_memories, external_budget)
            context_parts = [
                balanced_working,
                f"Additional Context:\n{balanced_external}" if balanced_external else ""
            ]
        return '\n\n'.join([part for part in context_parts if part.strip()])
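The balanced strategy depends on a `_fit_to_budget` helper that is not shown. A minimal synchronous sketch of what such a helper might do follows, assuming that the most recent lines matter most and that a rough four-characters-per-token estimate is acceptable. Both are assumptions, and `fit_to_budget` and `rough_token_count` are illustrative names.

```python
def rough_token_count(text: str) -> int:
    """Assumption: about 4 characters per token; swap in a real tokenizer if available."""
    return max(1, len(text) // 4)


def fit_to_budget(text: str, max_tokens: int, count_tokens=rough_token_count) -> str:
    """Keep the most recent lines of text that fit within max_tokens."""
    kept = []
    used = 0
    # Walk backwards so the newest lines are kept first
    for line in reversed(text.splitlines()):
        cost = count_tokens(line)
        if used + cost > max_tokens:
            break
        kept.append(line)
        used += cost
    # Restore chronological order
    return "\n".join(reversed(kept))


history = "\n".join([
    "user: first question here",       # oldest
    "agent: first answer here",
    "user: follow-up question",
    "agent: follow-up answer",         # newest
])
trimmed = fit_to_budget(history, max_tokens=12)
print(trimmed)
```

Dropping whole lines from the oldest end keeps each retained message intact. Summarizing the dropped portion instead of discarding it is the natural next refinement.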
For understanding how these memory solutions prevent production failures, see our comprehensive guide on context window related failures and their prevention.
Optimization and Performance Tuning
Token Usage Monitoring and Optimization
Implement comprehensive token monitoring for optimization:
from datetime import datetime


class TokenUsageMonitor:
    def __init__(self):
        self.usage_history = []
        self.optimization_rules = []
        self.alert_thresholds = {
            'context_utilization': 0.85,  # Alert at 85% context usage
            'token_waste': 0.15,          # Alert if >15% tokens unused
            'compression_ratio': 0.3      # Alert if compression <30%
        }

    async def monitor_request(self, request, context, response):
        """Monitor token usage for a single request"""
        usage_data = {
            'timestamp': datetime.now(),
            'request_tokens': self._count_tokens(request.get('content', '')),
            'context_tokens': self._count_tokens(context),
            'response_tokens': self._count_tokens(response.get('content', '')),
            'total_tokens': 0,
            'context_utilization': 0,
            'efficiency_metrics': {}
        }
        usage_data['total_tokens'] = (
            usage_data['request_tokens'] +
            usage_data['context_tokens'] +
            usage_data['response_tokens']
        )
        # Calculate utilization metrics
        max_context = request.get('max_context_tokens', 4000)
        usage_data['context_utilization'] = usage_data['context_tokens'] / max_context
        # Analyze efficiency
        usage_data['efficiency_metrics'] = await self._analyze_efficiency(
            request, context, response, usage_data
        )
        self.usage_history.append(usage_data)
        # Check for optimization opportunities
        await self._check_optimization_triggers(usage_data)
        return usage_data
    async def _analyze_efficiency(self, request, context, response, usage_data):
        """Analyze token usage efficiency"""
        metrics = {}
        # Context relevance analysis
        context_relevance = await self._measure_context_relevance(context, request)
        metrics['context_relevance'] = context_relevance
        # Response quality vs token usage
        response_quality = await self._estimate_response_quality(response)
        metrics['quality_per_token'] = response_quality / max(usage_data['total_tokens'], 1)
        # Redundancy detection
        redundancy_ratio = await self._detect_context_redundancy(context)
        metrics['redundancy_ratio'] = redundancy_ratio
        # Compression opportunity
        potential_compression = await self._estimate_compression_potential(context)
        metrics['compression_potential'] = potential_compression
        return metrics

    async def _check_optimization_triggers(self, usage_data):
        """Check if optimization is needed based on usage patterns"""
        # High context utilization
        if usage_data['context_utilization'] > self.alert_thresholds['context_utilization']:
            await self._trigger_optimization('high_context_usage', usage_data)
        # Low context relevance
        if usage_data['efficiency_metrics']['context_relevance'] < 0.6:
            await self._trigger_optimization('low_context_relevance', usage_data)
        # High redundancy
        if usage_data['efficiency_metrics']['redundancy_ratio'] > 0.4:
            await self._trigger_optimization('high_redundancy', usage_data)
    async def get_optimization_recommendations(self, time_window_hours=24):
        """Generate optimization recommendations based on usage history"""
        recent_usage = [
            usage for usage in self.usage_history
            if (datetime.now() - usage['timestamp']).total_seconds() < time_window_hours * 3600
        ]
        if not recent_usage:
            return []
        recommendations = []
        # Analyze patterns
        avg_utilization = sum(u['context_utilization'] for u in recent_usage) / len(recent_usage)
        avg_redundancy = sum(u['efficiency_metrics']['redundancy_ratio'] for u in recent_usage) / len(recent_usage)
        avg_relevance = sum(u['efficiency_metrics']['context_relevance'] for u in recent_usage) / len(recent_usage)
        # Generate specific recommendations
        if avg_utilization > 0.8:
            recommendations.append({
                'type': 'context_compression',
                'priority': 'high',
                'description': f'Average context utilization is {avg_utilization:.1%}. Implement aggressive context compression.',
                'estimated_savings': f'{(avg_utilization - 0.7) * 100:.1f}% token reduction'
            })
        if avg_redundancy > 0.3:
            recommendations.append({
                'type': 'redundancy_removal',
                'priority': 'medium',
                'description': f'High redundancy detected ({avg_redundancy:.1%}). Implement deduplication.',
                'estimated_savings': f'{avg_redundancy * 100:.1f}% token reduction'
            })
        if avg_relevance < 0.7:
            recommendations.append({
                'type': 'relevance_filtering',
                'priority': 'high',
                'description': f'Low context relevance ({avg_relevance:.1%}). Improve context selection.',
                'estimated_improvement': 'Better response quality with fewer tokens'
            })
        return recommendations
class ContextOptimizer:
    def __init__(self, token_monitor):
        self.token_monitor = token_monitor
        self.optimization_strategies = {
            'compression': self._apply_compression,
            'deduplication': self._remove_duplicates,
            'relevance_filtering': self._filter_by_relevance,
            'smart_truncation': self._smart_truncate
        }
    async def optimize_context(self, context, target_reduction=0.3):
        """Apply multiple optimization strategies to reduce context size"""
        original_tokens = self._count_tokens(context)
        target_tokens = int(original_tokens * (1 - target_reduction))
        optimized_context = context
        applied_optimizations = []
        # Strategies run in the insertion order of self.optimization_strategies
        for strategy_name, strategy_func in self.optimization_strategies.items():
            current_tokens = self._count_tokens(optimized_context)
            if current_tokens <= target_tokens:
                break
            try:
                new_context = await strategy_func(optimized_context, target_tokens)
                # Keep the result only if it actually shrank the context
                if self._count_tokens(new_context) < current_tokens:
                    applied_optimizations.append(strategy_name)
                    optimized_context = new_context
            except Exception as e:
                logging.warning(f"Optimization strategy {strategy_name} failed: {e}")
                continue
        final_tokens = self._count_tokens(optimized_context)
        # Guard against division by zero when the original context is empty
        reduction_achieved = (original_tokens - final_tokens) / max(original_tokens, 1)
        return {
            'optimized_context': optimized_context,
            'original_tokens': original_tokens,
            'final_tokens': final_tokens,
            'reduction_achieved': reduction_achieved,
            'applied_optimizations': applied_optimizations
        }
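    async def _apply_compression(self, context, target_tokens):
        """Apply intelligent compression to context"""
        compressor = ContextCompressor()
        current_tokens = max(self._count_tokens(context), 1)
        # Fraction to remove so the result lands near target_tokens
        # (assumes target_reduction means "fraction to cut", as in optimize_context)
        target_reduction = max(0.0, 1 - target_tokens / current_tokens)
        return await compressor.compress_context_segment(
            context, 'general', target_reduction=target_reduction
        )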
    async def _remove_duplicates(self, context, target_tokens):
        """Remove duplicate or near-duplicate content"""
        sentences = self._split_into_sentences(context)
        unique_sentences = []
        seen_embeddings = []
        for sentence in sentences:
            sentence_embedding = await self._get_sentence_embedding(sentence)
            # Check similarity with existing sentences
            is_duplicate = False
            for existing_embedding in seen_embeddings:
                similarity = cosine_similarity([sentence_embedding], [existing_embedding])[0][0]
                if similarity > 0.9:  # High similarity threshold
                    is_duplicate = True
                    break
            if not is_duplicate:
                unique_sentences.append(sentence)
                seen_embeddings.append(sentence_embedding)
            # Stop if we've reached target
            if self._count_tokens(' '.join(unique_sentences)) >= target_tokens:
                break
        return ' '.join(unique_sentences)
    async def _filter_by_relevance(self, context, target_tokens):
        """Keep only the most relevant parts of context"""
        sentences = self._split_into_sentences(context)
        # This would need a query or topic to determine relevance
        # For now, use a simple heuristic based on key terms
        scored_sentences = []
        for index, sentence in enumerate(sentences):
            relevance_score = self._calculate_relevance_score(sentence)
            scored_sentences.append((index, sentence, relevance_score))
        # Visit sentences from most to least relevant, keeping those that fit the budget
        scored_sentences.sort(key=lambda x: x[2], reverse=True)
        selected = []
        current_tokens = 0
        for index, sentence, score in scored_sentences:
            sentence_tokens = self._count_tokens(sentence)
            if current_tokens + sentence_tokens <= target_tokens:
                selected.append((index, sentence))
                current_tokens += sentence_tokens
            # A sentence that does not fit is skipped so shorter ones can still be used
        # Restore original order so the filtered context still reads coherently
        selected.sort(key=lambda x: x[0])
        return ' '.join(sentence for _, sentence in selected)
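The deduplication and relevance-filtering steps above depend on helpers that are not shown (`_get_sentence_embedding`, `_calculate_relevance_score`, `_count_tokens`). The following is a minimal, self-contained sketch of the same two ideas under simplifying assumptions: word-overlap (Jaccard) similarity stands in for embeddings, a key-term count stands in for the relevance score, and a whitespace word count stands in for a real tokenizer. All function names here are hypothetical.

```python
import re

def count_tokens(text):
    # Assumption: whitespace word count as a rough stand-in for a tokenizer
    return len(text.split())

def split_sentences(text):
    # Naive splitter: break after ., ! or ? followed by whitespace
    return [s.strip() for s in re.split(r"(?<=[.!?])\s+", text) if s.strip()]

def words(text):
    return set(re.findall(r"\w+", text.lower()))

def jaccard(a, b):
    wa, wb = words(a), words(b)
    return len(wa & wb) / len(wa | wb) if (wa | wb) else 0.0

def remove_near_duplicates(text, threshold=0.8):
    """Drop sentences whose word overlap with an earlier kept sentence exceeds threshold."""
    kept = []
    for sentence in split_sentences(text):
        if all(jaccard(sentence, earlier) <= threshold for earlier in kept):
            kept.append(sentence)
    return " ".join(kept)

def select_within_budget(text, key_terms, budget):
    """Keep the highest-scoring sentences that fit the budget, in original order."""
    sentences = split_sentences(text)
    scores = [len(words(s) & key_terms) for s in sentences]
    ranked = sorted(range(len(sentences)), key=lambda i: scores[i], reverse=True)
    chosen, used = set(), 0
    for i in ranked:
        cost = count_tokens(sentences[i])
        if used + cost <= budget:  # skip sentences that do not fit, keep trying others
            chosen.add(i)
            used += cost
    return " ".join(sentences[i] for i in sorted(chosen))
```

For example, given "The deploy failed on Tuesday. The deploy failed on Tuesday. Lunch was nice. The rollback fixed the deploy.", deduplication removes the repeated sentence, and selecting with key terms `{"deploy", "rollback"}` and a budget of 10 words keeps the two deploy-related sentences in their original order.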
Context-Aware Tool Selection
Optimize tool usage based on available context space:
class ContextAwareToolManager:
    def __init__(self, available_tools, context_budget_tracker):
        self.tools = available_tools
        self.budget_tracker = context_budget_tracker
        self.tool_efficiency_history = {}
    async def select_optimal_tools(self, task_requirements, available_context_tokens):
        """Select tools that maximize capability within context constraints"""
        # Analyze each tool's context requirements and capabilities
        tool_analysis = []
        for tool_name, tool in self.tools.items():
            analysis = await self._analyze_tool_context_requirements(tool, task_requirements)
            analysis['name'] = tool_name
            analysis['tool'] = tool
            tool_analysis.append(analysis)
        # Score tools based on capability vs context cost
        scored_tools = self._score_tools_for_context(tool_analysis, available_context_tokens)
        # Select optimal tool combination
        selected_tools = self._select_tool_combination(scored_tools, available_context_tokens)
        return selected_tools
    async def _analyze_tool_context_requirements(self, tool, task_requirements):
        """Analyze how much context a tool typically consumes"""
        # Get historical usage data
        tool_name = tool.__class__.__name__
        historical_usage = self.tool_efficiency_history.get(tool_name, {})
        # Estimate context consumption
        estimated_input_tokens = await self._estimate_input_tokens(tool, task_requirements)
        estimated_output_tokens = await self._estimate_output_tokens(tool, task_requirements)
        # Calculate efficiency metrics
        efficiency_score = historical_usage.get('success_rate', 0.5) * historical_usage.get('avg_quality', 0.5)
        context_efficiency = efficiency_score / max(estimated_input_tokens + estimated_output_tokens, 1)
        return {
            'estimated_input_tokens': estimated_input_tokens,
            'estimated_output_tokens': estimated_output_tokens,
            'total_estimated_tokens': estimated_input_tokens + estimated_output_tokens,
            'efficiency_score': efficiency_score,
            'context_efficiency': context_efficiency,
            'historical_data': historical_usage
        }
    def _score_tools_for_context(self, tool_analysis, available_tokens):
        """Score tools based on their value within context constraints"""
        scored_tools = []
        for analysis in tool_analysis:
            # Base capability score
            capability_score = self._calculate_capability_score(analysis, available_tokens)
            # Context cost penalty, normalized to [0, 1]; guard against a zero budget
            context_cost = analysis['total_estimated_tokens']
            cost_penalty = min(context_cost / max(available_tokens, 1), 1.0)
            # Final score combines capability and efficiency
            final_score = (
                capability_score * (1 - cost_penalty) * analysis['context_efficiency']
            )
            scored_tools.append({
                **analysis,
                'capability_score': capability_score,
                'cost_penalty': cost_penalty,
                'final_score': final_score
            })
        return sorted(scored_tools, key=lambda x: x['final_score'], reverse=True)
    def _select_tool_combination(self, scored_tools, available_tokens):
        """Select combination of tools that maximizes value within constraints"""
        # Greedy selection: take tools in score order while they fit the budget
        selected_tools = []
        # Set aside 20% for response generation before selecting, so no tool can eat into it
        spendable_tokens = available_tokens - int(available_tokens * 0.2)
        used_tokens = 0
        total_value = 0
        for tool_info in scored_tools:
            tool_cost = tool_info['total_estimated_tokens']
            if used_tokens + tool_cost <= spendable_tokens:
                selected_tools.append(tool_info['tool'])
                used_tokens += tool_cost
                total_value += tool_info['final_score']
        return {
            'selected_tools': selected_tools,
            'estimated_total_tokens': used_tokens,
            'estimated_value': total_value,
            'remaining_tokens': available_tokens - used_tokens
        }
    async def execute_with_context_monitoring(self, tool, inputs, context_budget):
        """Execute tool while monitoring context usage"""
        start_time = time.time()
        try:
            # Monitor context before execution
            pre_execution_context = await self.budget_tracker.get_current_usage()
            # Execute tool
            result = await tool.execute(inputs)
            # Monitor context after execution
            post_execution_context = await self.budget_tracker.get_current_usage()
            # Calculate actual usage
            actual_tokens_used = post_execution_context['tokens'] - pre_execution_context['tokens']
            execution_time = time.time() - start_time
            # Update efficiency tracking
            tool_name = tool.__class__.__name__
            await self._update_tool_efficiency(tool_name, {
                'tokens_used': actual_tokens_used,
                'execution_time': execution_time,
                'success': True,
                'result_quality': await self._assess_result_quality(result)
            })
            return result
        except Exception as e:
            # Record failed execution
            tool_name = tool.__class__.__name__
            await self._update_tool_efficiency(tool_name, {
                'tokens_used': 0,
                'execution_time': time.time() - start_time,
                'success': False,
                'error': str(e)
            })
            raise
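The greedy selection step above can be tried in isolation. This is a minimal sketch, not the manager class itself: `ToolEstimate`, `select_tools`, and the token and score figures are hypothetical, and the reply reserve is set aside before any tool is chosen so that no selection can consume it.

```python
from dataclasses import dataclass

@dataclass
class ToolEstimate:
    name: str
    est_tokens: int  # estimated input + output tokens for one call
    score: float     # precomputed value score (higher is better)

def select_tools(candidates, available_tokens, reserve_ratio=0.2):
    """Greedily pick tools by score while leaving a reserve for the model's reply."""
    reserve = int(available_tokens * reserve_ratio)
    spendable = available_tokens - reserve
    selected, used = [], 0
    for tool in sorted(candidates, key=lambda t: t.score, reverse=True):
        if used + tool.est_tokens <= spendable:  # skip tools that would break the budget
            selected.append(tool.name)
            used += tool.est_tokens
    return selected, available_tokens - used

candidates = [
    ToolEstimate("web_search", 2000, 0.9),
    ToolEstimate("data_analysis", 3000, 0.8),
    ToolEstimate("code_execution", 1500, 0.7),
    ToolEstimate("calculator", 200, 0.3),
]
selected, remaining = select_tools(candidates, available_tokens=8000)
```

With an 8,000-token budget and a 20% reserve, 6,400 tokens are spendable: `web_search` and `data_analysis` fit, `code_execution` would push usage to 6,500 and is skipped, and `calculator` still fits afterward. Greedy selection is simple and fast but not guaranteed optimal; a knapsack-style search is an alternative when the tool list is small.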
Dynamic Context Sizing Based on Task Complexity
Implement adaptive context sizing:
class DynamicContextSizer:
    def __init__(self, base_context_size=4000, max_context_size=16000):
        self.base_context_size = base_context_size
        self.max_context_size = max_context_size
        self.sizing_models = {
            'task_complexity': self._size_for_task_complexity,
            'conversation_length': self._size_for_conversation,
            'tool_requirements': self._size_for_tools,
            'user_preference': self._size_for_user_preference
        }
    async def calculate_optimal_context_size(self, request_analysis, conversation_state):
        """Calculate optimal context size based on multiple factors"""
        # Get sizing recommendations from different models
        size_recommendations = {}
        for model_name, model_func in self.sizing_models.items():
            try:
                recommendation = await model_func(request_analysis, conversation_state)
                size_recommendations[model_name] = recommendation
            except Exception as e:
                logging.warning(f"Sizing model {model_name} failed: {e}")
                size_recommendations[model_name] = self.base_context_size
        # Combine recommendations with weighted average
        weights = {
            'task_complexity': 0.3,
            'conversation_length': 0.25,
            'tool_requirements': 0.3,
            'user_preference': 0.15
        }
        weighted_size = sum(
            size_recommendations[model] * weights.get(model, 0.25)
            for model in size_recommendations
        )
        # Round to the nearest 1000 first, then clamp so rounding cannot escape the bounds
        rounded_size = round(weighted_size / 1000) * 1000
        final_size = max(
            self.base_context_size,
            min(rounded_size, self.max_context_size)
        )
        return {
            'recommended_size': final_size,
            'size_breakdown': size_recommendations,
            'reasoning': self._explain_sizing_decision(size_recommendations, final_size)
        }
    async def _size_for_task_complexity(self, request_analysis, conversation_state):
        """Size context based on task complexity analysis"""
        complexity_indicators = {
            'multi_step': request_analysis.get('requires_multi_step', False),
            'data_analysis': request_analysis.get('involves_data_analysis', False),
            'long_term_planning': request_analysis.get('requires_planning', False),
            'complex_reasoning': request_analysis.get('complex_reasoning_required', False),
            'multiple_domains': request_analysis.get('crosses_domains', False)
        }
        # Multiplier applied for each complexity factor that is present
        multipliers = {
            'multi_step': 1.3,
            'data_analysis': 1.4,
            'long_term_planning': 1.2,
            'complex_reasoning': 1.25,
            'multiple_domains': 1.15
        }
        base_size = self.base_context_size
        complexity_multiplier = 1.0
        # Increase size for each complexity factor
        for indicator, present in complexity_indicators.items():
            if present:
                complexity_multiplier *= multipliers[indicator]
        return int(base_size * complexity_multiplier)
    async def _size_for_conversation(self, request_analysis, conversation_state):
        """Size context based on conversation characteristics"""
        conversation_length = len(conversation_state.get('message_history', []))
        context_importance = conversation_state.get('context_importance_score', 0.5)
        if conversation_length < 5:
            # Short conversations need less context
            return int(self.base_context_size * 0.8)
        elif conversation_length < 15:
            # Medium conversations use base context
            return self.base_context_size
        else:
            # Long conversations need more context for coherence
            length_multiplier = min(1 + (conversation_length - 15) * 0.1, 2.0)
            importance_multiplier = 0.5 + context_importance
            return int(self.base_context_size * length_multiplier * importance_multiplier)
    async def _size_for_tools(self, request_analysis, conversation_state):
        """Size context based on tool usage requirements"""
        required_tools = request_analysis.get('required_tools', [])
        tool_context_requirements = {}
        # Analyze context requirements for each tool
        total_tool_overhead = 0
        for tool_name in required_tools:
            # Estimate context overhead for this tool
            tool_overhead = await self._estimate_tool_context_overhead(tool_name)
            tool_context_requirements[tool_name] = tool_overhead
            total_tool_overhead += tool_overhead
        # Base context + tool overhead + buffer
        recommended_size = self.base_context_size + total_tool_overhead + 1000
        return min(recommended_size, self.max_context_size)
    async def _estimate_tool_context_overhead(self, tool_name):
        """Estimate context overhead for specific tools"""
        # These would be based on empirical measurements
        tool_overheads = {
            'web_search': 2000,       # Search results can be large
            'data_analysis': 3000,    # Data summaries and charts
            'code_execution': 1500,   # Code + output
            'file_operations': 1000,  # File contents
            'api_calls': 800,         # API responses
            'calculator': 200         # Minimal overhead
        }
        return tool_overheads.get(tool_name, 1000)  # Default overhead
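To make the weighted combination concrete, here is a small worked sketch of the arithmetic using the same weights, base size, and maximum as above. The function name `combine_sizes` and the sample recommendations are hypothetical; the numbers are illustrative, not measurements.

```python
WEIGHTS = {
    "task_complexity": 0.3,
    "conversation_length": 0.25,
    "tool_requirements": 0.3,
    "user_preference": 0.15,
}

def combine_sizes(recommendations, base=4000, max_size=16000, step=1000):
    """Weighted average of per-factor sizes, rounded to a step and clamped to [base, max_size]."""
    weighted = sum(recommendations[name] * weight for name, weight in WEIGHTS.items())
    rounded = round(weighted / step) * step
    return max(base, min(rounded, max_size))

# Illustrative inputs: a multi-step data-analysis request (about 4000 x 1.3 x 1.4),
# a medium-length conversation, three tools (2000 + 1500 + 1000 overhead plus
# the 1000-token buffer on top of the base), and a neutral user preference.
recommendations = {
    "task_complexity": 7280,
    "conversation_length": 4000,
    "tool_requirements": 8500,
    "user_preference": 4000,
}
size = combine_sizes(recommendations)
```

Here the weighted sum is 2184 + 1000 + 2550 + 600 = 6334 tokens, which rounds to a recommended size of 6,000.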
Frequently Asked Questions
Q: What happens when my agent hits the context window limit?
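A: It depends on where the limit is enforced. Most hosted LLM APIs reject a request that exceeds the model’s context window and return an error rather than truncating it. Many agent frameworks and chat wrappers avoid that error by trimming the history before the call, usually dropping the oldest messages first, and they often do so silently. When that happens, your agent can lose critical information, forget previous tool outputs, or repeat actions. It keeps running but may produce incorrect or inconsistent results.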
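One way to keep truncation from happening silently is to trim the history yourself before each request and report what was dropped. The sketch below assumes messages are dicts with `role` and `content` keys and uses a whitespace word count in place of the provider’s tokenizer; `fit_to_window` is a hypothetical helper, not part of any SDK.

```python
def fit_to_window(messages, max_tokens, reserve_for_reply, count_tokens=None):
    """Keep system messages plus the newest messages that fit; report how many were dropped."""
    # Assumption: whitespace word count stands in for a real tokenizer
    count = count_tokens or (lambda text: len(text.split()))
    budget = max_tokens - reserve_for_reply
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]
    used = sum(count(m["content"]) for m in system)
    kept = []
    for message in reversed(rest):  # walk from newest to oldest
        cost = count(message["content"])
        if used + cost > budget:
            break  # everything older than this point is dropped
        kept.append(message)
        used += cost
    kept.reverse()
    dropped = len(rest) - len(kept)
    return system + kept, dropped
```

A non-zero `dropped` count is the signal to log a warning, summarize the removed turns, or move them to external memory instead of losing them.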
Q: How do I know if my agent is experiencing context window issues?
A: Monitor these warning signs: agents asking questions you’ve already answered, forgetting previous tool outputs, repeating the same actions, degraded response quality in longer sessions, or responses that ignore earlier conversation context.
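Repeated actions are the easiest of these signs to detect automatically. As a rough sketch, assuming each logged tool call is a dict with `tool` and `args` keys (a hypothetical log format), identical calls within a session can be counted like this:

```python
import json
from collections import Counter

def repeated_tool_calls(calls, min_repeats=2):
    """Return tool calls that occur at least min_repeats times with identical arguments."""
    # Serialize args with sorted keys so equal dicts compare equal regardless of key order
    keys = [(call["tool"], json.dumps(call["args"], sort_keys=True)) for call in calls]
    counts = Counter(keys)
    return [
        {"tool": tool, "args": json.loads(args), "count": n}
        for (tool, args), n in counts.items()
        if n >= min_repeats
    ]
```

A rising number of repeats late in long sessions suggests earlier tool outputs have fallen out of context.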
Q: Should I use the largest available context window for better performance?
A: Not necessarily. Larger context windows cost more, process more slowly, and don’t guarantee better performance. Many tasks work better with smaller, well-managed context windows plus external memory systems. Only use large context windows when you specifically need to process large documents or maintain very long conversation context.
Q: What’s the difference between context compression and external memory?
A: Context compression reduces the size of information within the context window through summarization or filtering. External memory stores information outside the context window entirely, retrieving relevant pieces when needed. Use compression for recent, important context and external memory for historical information.
Q: How do I implement external memory without making my agent slow?
A: Use fast vector databases for semantic search, implement caching for frequently accessed memories, preload relevant context based on conversation patterns, and use parallel retrieval while the agent processes other tasks. As a rough target, well-implemented external memory should add less than about 200 ms to response times; measure this in your own stack rather than assuming it.
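Caching, parallel retrieval, and a latency cap can be combined in a thin wrapper around whatever store you use. The sketch below is illustrative only: `CachedMemory` is a hypothetical class, the backend is assumed to be an async callable that maps a query to a list of snippets, and the cache is a plain unbounded dict.

```python
import asyncio

class CachedMemory:
    """Hypothetical wrapper adding a cache and a latency cap to an async retrieval backend."""

    def __init__(self, backend, timeout_s=0.2):
        self._backend = backend      # assumed: async callable, query -> list of snippets
        self._timeout_s = timeout_s  # latency budget per lookup
        self._cache = {}             # unbounded dict; use an LRU with eviction in real use

    async def retrieve(self, query):
        if query in self._cache:
            return self._cache[query]
        try:
            result = await asyncio.wait_for(self._backend(query), timeout=self._timeout_s)
        except asyncio.TimeoutError:
            return []  # degrade gracefully: answer without retrieved memory
        self._cache[query] = result
        return result

    async def retrieve_many(self, queries):
        # Run lookups concurrently so total latency tracks the slowest lookup, not the sum
        results = await asyncio.gather(*(self.retrieve(q) for q in queries))
        return dict(zip(queries, results))
```

Returning an empty list on timeout trades completeness for responsiveness; whether that trade is acceptable depends on how much the agent relies on retrieved memory for the current step.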
Q: Can I split long tasks across multiple context windows?
A: Yes, but carefully. Break tasks into discrete steps, persist intermediate results in external memory, design clear handoff points between context windows, and implement recovery mechanisms for failed steps. Each step should be mostly self-contained.
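A minimal sketch of this pattern, assuming each step is a function that receives only the compact results of earlier steps and that a plain dict stands in for external memory (`run_task` and the `store` layout are hypothetical):

```python
def run_task(steps, store):
    """Run named steps in order, skipping any whose result is already in the store.

    steps: list of (name, fn) pairs; fn receives the dict of earlier results.
    store: dict-like stand-in for external memory (use a database in practice).
    """
    results = store.setdefault("results", {})
    for name, step in steps:
        if name in results:
            continue  # finished in an earlier run; do not repeat it
        # Each step sees only the compact results of earlier steps, not their transcripts
        results[name] = step(results)
    return results
```

If a step raises, results from completed steps remain in the store, so calling `run_task` again with the same store resumes at the failed step instead of starting over.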
Q: What’s the optimal ratio of working memory to retrieved memory in hybrid approaches?
A: Start with 60% working memory (immediate conversation) and 40% retrieved memory (external context). Adjust based on your use case: conversational agents need more working memory, while research/analysis agents benefit from more retrieved memory.
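As a quick illustration of the arithmetic, the split can be computed from whatever remains after the system prompt and reply reserve are set aside. The function name and the sample figures below are hypothetical.

```python
def split_budget(context_window, reserve_for_reply, system_tokens, working_ratio=0.6):
    """Split the usable context between working memory and retrieved memory."""
    available = context_window - reserve_for_reply - system_tokens
    if available <= 0:
        raise ValueError("No context left after the system prompt and reply reserve")
    working = round(available * working_ratio)
    return {"working": working, "retrieved": available - working}

# Example: a 128,000-token window, 4,000 tokens reserved for the reply,
# and a 2,000-token system prompt leave 122,000 tokens to divide.
budget = split_budget(128_000, reserve_for_reply=4_000, system_tokens=2_000)
```

With the default 60/40 ratio this yields 73,200 tokens of working memory and 48,800 tokens for retrieved memory; a research-style agent might pass `working_ratio=0.4` instead.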
For comprehensive memory architecture implementation that supports these context window solutions, see our guide on external memory alternatives that prevent context window limitations. To understand how context window issues contribute to broader production failures, review our analysis of solving context overflow and memory loss issues.