Building a Basic RAG Agent with GoodMem
Overview
This tutorial will guide you through building a complete Retrieval-Augmented Generation (RAG) system using GoodMem's vector memory capabilities. By the end of this guide, you'll have a functional Q&A system that can:
- 🔍 Semantically search through your documents
- 📝 Generate contextual answers using retrieved information
- 🏗️ Scale to handle large document collections
What is RAG?
RAG combines the power of retrieval (finding relevant information) with generation (creating natural language responses). This approach allows AI systems to provide accurate, context-aware answers by:
- Retrieving relevant documents from a knowledge base
- Augmenting the query with this context
- Generating a comprehensive answer using both the query and retrieved information
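To make these three steps concrete, here is a toy sketch of the retrieve-and-augment flow. The function names and sample documents are illustrative only, and the keyword-overlap "retriever" is a deliberately naive stand-in for the semantic search GoodMem provides later in this tutorial:

```python
# Toy RAG pipeline: retrieve() scores documents by shared query words,
# augment() folds the retrieved context into the prompt. A real system
# replaces retrieve() with vector search.

def retrieve(query: str, documents: list[str], top_k: int = 2) -> list[str]:
    """Rank documents by how many query words they share (toy retriever)."""
    query_words = set(query.lower().split())
    scored = sorted(
        documents,
        key=lambda d: len(query_words & set(d.lower().split())),
        reverse=True,
    )
    return scored[:top_k]

def augment(query: str, context: list[str]) -> str:
    """Combine the retrieved context with the original question."""
    return "Context:\n" + "\n".join(context) + f"\n\nQuestion: {query}"

docs = [
    "Employees accrue 15 vacation days per year.",
    "The API rate limit is 100 requests per minute.",
]
prompt = augment("How many vacation days do employees get?",
                 retrieve("vacation days", docs))
# The assembled prompt would then go to an LLM for the generation step.
```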
Why GoodMem for RAG?
GoodMem provides enterprise-grade vector storage with:
- Multiple embedder support for optimal retrieval accuracy
- Streaming APIs for real-time responses
- Advanced post-processing with reranking and summarization
- Scalable architecture for production workloads
Prerequisites
Before starting, ensure you have:
- ✅ GoodMem server running (install with: curl -s https://get.goodmem.ai | bash)
- ✅ Python 3.9+ installed
- ✅ API key for your GoodMem instance
Installation & Setup
First, let's install the required packages:
# Install required packages
!pip install goodmem-client openai python-dotenv
Authentication & Configuration
Let's configure our GoodMem client and test the connection:
import os
import json
import time
from typing import List, Dict, Optional
from dotenv import load_dotenv
# Load environment variables (optional)
load_dotenv()
# Configuration - Update these values for your setup
GOODMEM_HOST = os.getenv('GOODMEM_HOST', 'http://localhost:8080')
GOODMEM_API_KEY = os.getenv('GOODMEM_API_KEY', 'your-api-key-here')
print(f"GoodMem Host: {GOODMEM_HOST}")
print(f"API Key configured: {'Yes' if GOODMEM_API_KEY != 'your-api-key-here' else 'No - Please update'}")
# Import GoodMem client libraries
from goodmem_client.api import SpacesApi, MemoriesApi, EmbeddersApi
from goodmem_client.configuration import Configuration
from goodmem_client.api_client import ApiClient
from goodmem_client.streaming import MemoryStreamClient
from goodmem_client.exceptions import ApiException
# Configure the API client
def create_goodmem_clients():
    """Create and configure GoodMem API clients."""
    configuration = Configuration()
    configuration.host = GOODMEM_HOST
    # Create API client instance
    api_client = ApiClient(configuration=configuration)
    # Add authentication header
    api_client.default_headers["x-api-key"] = GOODMEM_API_KEY
    # Create API instances
    spaces_api = SpacesApi(api_client=api_client)
    memories_api = MemoriesApi(api_client=api_client)
    embedders_api = EmbeddersApi(api_client=api_client)
    stream_client = MemoryStreamClient(api_client)
    return spaces_api, memories_api, embedders_api, stream_client
# Test connection
try:
    spaces_api, memories_api, embedders_api, stream_client = create_goodmem_clients()
    # Test the connection by listing spaces
    response = spaces_api.list_spaces()
    print(f"✅ Successfully connected to GoodMem!")
    print(f"   Found {len(getattr(response, 'spaces', []))} existing spaces")
except ApiException as e:
    print(f"❌ Error connecting to GoodMem: {e}")
    print("   Please check your API key and host configuration")
except Exception as e:
    print(f"❌ Unexpected error: {e}")
Creating Your First Space
In GoodMem, a Space is a logical container for organizing memories. Each space has:
- Associated embedders for generating vector representations
- Access controls (public/private)
- Metadata labels for organization
Let's create a space for our RAG demo:
# First, let's see what embedders are available
try:
    embedders_response = embedders_api.list_embedders()
    available_embedders = getattr(embedders_response, 'embedders', [])
    print(f"📋 Available Embedders ({len(available_embedders)}):")
    for i, embedder in enumerate(available_embedders):
        print(f"  {i+1}. {embedder.display_name} - {embedder.provider_type}")
        print(f"     Model: {getattr(embedder, 'model_identifier', 'N/A')}")
        print(f"     ID: {embedder.embedder_id}")
        print()
    if available_embedders:
        default_embedder = available_embedders[0]
        print(f"🎯 Using embedder: {default_embedder.display_name}")
    else:
        print("⚠️ No embedders found. You may need to configure an embedder first.")
        print("   See the documentation: https://docs.goodmem.ai/docs/reference/cli/goodmem_embedder_create/")
except ApiException as e:
    print(f"❌ Error listing embedders: {e}")
    default_embedder = None
from goodmem_client.models import SpaceCreationRequest, SpaceEmbedderConfig
# Create a space for our RAG demo
SPACE_NAME = "RAG Demo Knowledge Base"
# Define chunking configuration that we'll reuse throughout the tutorial
# Save this configuration to ensure consistency across all memory creation requests
DEMO_CHUNKING_CONFIG = {
    "recursive": {
        "chunk_size": 256,       # 256-character chunks for optimal RAG performance
        "chunk_overlap": 25,     # 25-character overlap between chunks
        "separators": ["\n\n", "\n", ". ", " ", ""],  # Hierarchical splitting
        "keep_strategy": "KEEP_END",             # Append separator to preceding chunk
        "separator_is_regex": False,             # Plain-text separators
        "length_measurement": "CHARACTER_COUNT"  # Measure by characters
    }
}
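For intuition about what `chunk_size` and `chunk_overlap` mean, the sketch below slices text into fixed-size windows that overlap by 25 characters. This is a simplification: GoodMem's recursive splitter additionally prefers to cut at the configured separators rather than at fixed offsets.

```python
# Naive fixed-window chunking, for intuition only (GoodMem's recursive
# splitter also honors the separator hierarchy when choosing cut points).
def naive_chunks(text: str, chunk_size: int = 256, chunk_overlap: int = 25) -> list[str]:
    step = chunk_size - chunk_overlap  # each new chunk starts 231 chars after the last
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

pieces = naive_chunks("x" * 600)
# With a 231-character stride, 600 characters produce chunks starting at 0, 231, 462,
# and each chunk after the first repeats the last 25 characters of the previous one.
```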
def create_demo_space():
    """Create a space for our RAG demonstration."""
    try:
        # Check if the space already exists
        existing_spaces = spaces_api.list_spaces()
        for space in getattr(existing_spaces, 'spaces', []):
            if space.name == SPACE_NAME:
                print(f"📁 Space '{SPACE_NAME}' already exists")
                print(f"   Space ID: {space.space_id}")
                print("   To remove an existing space, see https://docs.goodmem.ai/docs/reference/cli/goodmem_space_delete/")
                return space
        # Configure space embedders if we have available embedders
        space_embedders = []
        if available_embedders:
            space_embedders = [
                SpaceEmbedderConfig(
                    embedder_id=default_embedder.embedder_id,
                    default_retrieval_weight=1.0
                )
            ]
        # Build the space request with our saved chunking configuration
        create_request = SpaceCreationRequest(
            name=SPACE_NAME,
            labels={
                "purpose": "rag-demo",
                "environment": "tutorial",
                "content-type": "documentation"
            },
            space_embedders=space_embedders,
            public_read=False,  # Private space
            default_chunking_config=DEMO_CHUNKING_CONFIG  # Use our saved config
        )
        # Create the space
        new_space = spaces_api.create_space(create_request)
        print(f"✅ Created space: {new_space.name}")
        print(f"   Space ID: {new_space.space_id}")
        print(f"   Embedders: {len(new_space.space_embedders)}")
        print(f"   Labels: {dict(new_space.labels)}")
        print(f"   Chunking config saved: {DEMO_CHUNKING_CONFIG['recursive']['chunk_size']} chars with {DEMO_CHUNKING_CONFIG['recursive']['chunk_overlap']} overlap")
        print("   💡 This chunking config will be reused for all memory creation!")
        return new_space
    except ApiException as e:
        print(f"❌ Error creating space: {e}")
        return None
# Create our demo space
demo_space = create_demo_space()
# Verify our space configuration
if demo_space:
    try:
        # Get detailed space information
        space_details = spaces_api.get_space(demo_space.space_id)
        print(f"🔍 Space Configuration:")
        print(f"   Name: {space_details.name}")
        print(f"   Owner ID: {space_details.owner_id}")
        print(f"   Public Read: {space_details.public_read}")
        print(f"   Created: {space_details.created_at}")
        print(f"   Labels: {dict(space_details.labels)}")
        print(f"\n🤖 Associated Embedders:")
        for embedder_assoc in space_details.space_embedders:
            print(f"   Embedder ID: {embedder_assoc.embedder_id}")
            print(f"   Retrieval Weight: {embedder_assoc.default_retrieval_weight}")
    except ApiException as e:
        print(f"❌ Error getting space details: {e}")
else:
    print("⚠️ No space available for the demo")
Adding Documents to Memory
Now let's add some sample documents to our space. GoodMem will automatically:
- Chunk the documents into optimal sizes
- Generate embeddings using the configured embedders
- Index the content for fast retrieval
We'll use sample company documents that represent common business use cases:
import os
# Load our sample documents
def load_sample_documents():
    """Load sample documents from the sample_documents directory."""
    documents = []
    sample_dir = "sample_documents"
    # Document files and their descriptions
    doc_files = {
        "company_handbook.txt": "Employee handbook with policies and procedures",
        "technical_documentation.txt": "API documentation and technical guides",
        "product_faq.txt": "Frequently asked questions about products",
        "security_policy.txt": "Information security policies and procedures"
    }
    for filename, description in doc_files.items():
        filepath = os.path.join(sample_dir, filename)
        if os.path.exists(filepath):
            with open(filepath, 'r', encoding='utf-8') as f:
                content = f.read()
            documents.append({
                'filename': filename,
                'description': description,
                'content': content
            })
            print(f"📄 Loaded: {filename} ({len(content):,} characters)")
        else:
            print(f"⚠️ File not found: {filepath}")
    return documents
# Load the documents
sample_docs = load_sample_documents()
print(f"\n📚 Total documents loaded: {len(sample_docs)}")
# Create the first memory individually to demonstrate single memory creation
from goodmem_client.models import MemoryCreationRequest
def create_single_memory(space_id: str, document: dict) -> dict:
    """Create a single memory in GoodMem to demonstrate individual memory creation."""
    try:
        # Build the memory request
        memory_request = MemoryCreationRequest(
            space_id=space_id,
            original_content=document['content'],
            content_type="text/plain",
            metadata={
                "filename": document['filename'],
                "description": document['description'],
                "source": "sample_documents",
                "document_type": document['filename'].split('_')[0],
                "ingestion_method": "single"  # Track how this was ingested
            },
            chunking_config=DEMO_CHUNKING_CONFIG
        )
        # Create the memory
        memory = memories_api.create_memory(memory_request)
        print(f"✅ Created single memory: {document['filename']}")
        print(f"   Memory ID: {memory.memory_id}")
        print(f"   Status: {memory.processing_status}")
        print(f"   Content Length: {len(document['content'])} characters")
        print()
        return memory
    except ApiException as e:
        print(f"❌ Error creating memory for {document['filename']}: {e}")
        return None
    except Exception as e:
        print(f"❌ Unexpected error with {document['filename']}: {e}")
        return None
if demo_space and sample_docs:
    # Create the first document using single memory creation
    first_doc = sample_docs[0]
    print(f"📝 Creating first document using CreateMemory API:")
    print(f"   Document: {first_doc['filename']}")
    print(f"   Method: Individual memory creation")
    print()
    single_memory = create_single_memory(demo_space.space_id, first_doc)
    if single_memory:
        print(f"🎯 Single memory creation completed successfully!")
    else:
        print(f"⚠️ Single memory creation failed")
else:
    print("⚠️ Cannot create memory: missing space or documents")
    single_memory = None
# Demonstrate retrieving a memory by ID using get_memory
import base64
if single_memory:
    try:
        print(f"📖 Retrieving memory details using get_memory API:")
        print(f"   Memory ID: {single_memory.memory_id}")
        print()
        # Retrieve the memory without content
        retrieved_memory = memories_api.get_memory(
            id=single_memory.memory_id,
            include_content=False
        )
        print(f"✅ Successfully retrieved memory:")
        print(f"   Memory ID: {retrieved_memory.memory_id}")
        print(f"   Space ID: {retrieved_memory.space_id}")
        print(f"   Status: {retrieved_memory.processing_status}")
        print(f"   Content Type: {retrieved_memory.content_type}")
        print(f"   Created At: {retrieved_memory.created_at}")
        print(f"   Updated At: {retrieved_memory.updated_at}")
        if retrieved_memory.metadata:
            print(f"\n   📋 Metadata:")
            for key, value in retrieved_memory.metadata.items():
                print(f"      {key}: {value}")
        # Now retrieve with content included
        print(f"\n📖 Retrieving memory with content:")
        retrieved_with_content = memories_api.get_memory(
            id=single_memory.memory_id,
            include_content=True
        )
        if retrieved_with_content.original_content:
            # Decode the base64-encoded content
            decoded_content = base64.b64decode(retrieved_with_content.original_content).decode('utf-8')
            print(f"✅ Content retrieved and decoded:")
            print(f"   Content length: {len(decoded_content)} characters")
            print(f"   First 200 chars: {decoded_content[:200]}...")
        else:
            print(f"⚠️ No content available")
    except ApiException as e:
        print(f"❌ Error retrieving memory: {e}")
    except Exception as e:
        print(f"❌ Unexpected error: {e}")
else:
    print("⚠️ No memory available to retrieve")
# Create the remaining documents using batch memory creation
from goodmem_client.models import BatchMemoryCreationRequest
def create_batch_memories(space_id: str, documents: List[dict]) -> None:
    """Create multiple memories in GoodMem using batch creation for efficiency."""
    # Prepare batch memory requests using our saved chunking configuration
    memory_requests = []
    for doc in documents:
        memory_request = MemoryCreationRequest(
            space_id=space_id,
            original_content=doc['content'],
            content_type="text/plain",
            chunking_config=DEMO_CHUNKING_CONFIG,  # Reuse saved chunking configuration
            metadata={
                "filename": doc['filename'],
                "description": doc['description'],
                "source": "sample_documents",
                "document_type": doc['filename'].split('_')[0],
                "ingestion_method": "batch"
            }
        )
        memory_requests.append(memory_request)
    try:
        # Build the batch request
        batch_request = BatchMemoryCreationRequest(
            requests=memory_requests
        )
        print(f"📦 Creating {len(memory_requests)} memories using BatchCreateMemory API:")
        # Execute batch creation - this returns None on success
        memories_api.batch_create_memory(batch_request)
    except ApiException as e:
        print(f"❌ Error during batch creation: {e}")
    except Exception as e:
        print(f"❌ Unexpected error during batch creation: {e}")
if demo_space and sample_docs and len(sample_docs) > 1:
    # Create the remaining documents (skip the first one we already created)
    remaining_docs = sample_docs[1:]  # All documents except the first
    create_batch_memories(demo_space.space_id, remaining_docs)
    print(f"\n📋 Total Memory Creation Summary:")
    print(f"   📄 Single CreateMemory: 1 document")
    print(f"   📦 Batch CreateMemory: {len(remaining_docs)} documents submitted")
    print(f"   ⏳ Check processing status in the next cell")
else:
    print("⚠️ Cannot create batch memories: insufficient documents or missing space")
# List all memories in our space to verify they're ready
if demo_space:
    try:
        memories_response = memories_api.list_memories(space_id=demo_space.space_id)
        memories = getattr(memories_response, 'memories', [])
        print(f"📚 Memories in space '{demo_space.name}':")
        print(f"   Total memories: {len(memories)}")
        print()
        for i, memory in enumerate(memories, 1):
            metadata = memory.metadata or {}
            filename = metadata.get('filename', 'Unknown')
            description = metadata.get('description', 'No description')
            print(f"   {i}. {filename}")
            print(f"      Status: {memory.processing_status}")
            print(f"      Description: {description}")
            print(f"      Created: {memory.created_at}")
            print()
    except ApiException as e:
        print(f"❌ Error listing memories: {e}")
# Monitor processing status for all created memories
def wait_for_processing_completion(space_id: str, max_wait_seconds: int = 120):
    """Wait for memories to finish processing."""
    print("⏳ Waiting for document processing to complete...")
    print("   💡 Note: Batch memories are processed asynchronously, so we check by listing all memories in the space")
    print()
    start_time = time.time()
    while time.time() - start_time < max_wait_seconds:
        try:
            # List memories in our space
            memories_response = memories_api.list_memories(space_id=space_id)
            memories = getattr(memories_response, 'memories', [])
            # Tally processing status
            status_counts = {}
            for memory in memories:
                status = memory.processing_status
                status_counts[status] = status_counts.get(status, 0) + 1
            print(f"📊 Processing status: {dict(status_counts)} (Total: {len(memories)} memories)")
            # Check if all are completed
            if all(memory.processing_status == 'COMPLETED' for memory in memories):
                print("✅ All documents processed successfully!")
                return True
            # Check for any failures
            failed_count = status_counts.get('FAILED', 0)
            if failed_count > 0:
                print(f"❌ {failed_count} memories failed processing")
                return False
            time.sleep(5)  # Wait 5 seconds before checking again
        except ApiException as e:
            print(f"❌ Error checking processing status: {e}")
            return False
    print(f"⏰ Timeout waiting for processing (waited {max_wait_seconds}s)")
    return False
if demo_space:
    # Wait for processing to complete for all memories (single + batch)
    # Since batch_create_memory returns None, we monitor by listing all memories
    processing_complete = wait_for_processing_completion(demo_space.space_id)
    if processing_complete:
        print("🎉 Ready for semantic search and retrieval!")
        print(f"📈 Batch API benefit: Multiple documents submitted in a single API call")
        print(f"🔧 Consistent chunking: All memories use DEMO_CHUNKING_CONFIG")
    else:
        print("⚠️ Some documents may still be processing. You can continue with the tutorial.")
else:
    print("⚠️ Skipping processing check - no space available")
    processing_complete = False
Semantic Search & Retrieval
Now comes the exciting part! Let's perform semantic search using GoodMem's streaming API. This will:
- Find relevant chunks based on semantic similarity
- Stream results in real-time
- Include relevance scores for ranking
- Return structured data for easy processing
def semantic_search(query: str, space_id: str, max_results: int = 5) -> List[dict]:
    """
    Perform semantic search using GoodMem's streaming API.

    Args:
        query: The search query
        space_id: ID of the space to search
        max_results: Maximum number of results to return

    Returns:
        List of search results with chunks and metadata
    """
    try:
        print(f"🔍 Searching for: '{query}'")
        print(f"📁 Space ID: {space_id}")
        print(f"📊 Max results: {max_results}")
        print("-" * 50)
        # Perform streaming search
        event_count = 0
        retrieved_chunks = []
        for event in stream_client.retrieve_memory_stream(
            message=query,
            space_ids=[space_id],
            requested_size=max_results,
            fetch_memory=True,
            fetch_memory_content=False,  # We don't need full content for this demo
            format="ndjson"
        ):
            event_count += 1
            if event.retrieved_item and event.retrieved_item.chunk:
                chunk_info = event.retrieved_item.chunk
                chunk_data = chunk_info.chunk
                retrieved_chunks.append({
                    'chunk_text': chunk_data.get('chunkText', ''),
                    'relevance_score': chunk_info.relevance_score,
                    'memory_index': chunk_info.memory_index,
                    'result_set_id': chunk_info.result_set_id,
                    'chunk_sequence': chunk_data.get('chunkSequenceNumber', 0)
                })
                print(f"📄 Chunk {len(retrieved_chunks)}:")
                print(f"   Relevance: {chunk_info.relevance_score:.3f}")
                print(f"   Text: {chunk_data.get('chunkText', '')[:150]}...")
                print()
        print(f"✅ Search completed: {len(retrieved_chunks)} chunks found, {event_count} events processed")
        return retrieved_chunks
    except Exception as e:
        print(f"❌ Error during search: {e}")
        return []
# Test semantic search with a sample query
if demo_space:
    sample_query = "What is the vacation policy for employees?"
    search_results = semantic_search(sample_query, demo_space.space_id)
else:
    print("⚠️ No space available for search")
    search_results = []
# Let's try a few different queries to see how semantic search works
def test_multiple_queries(space_id: str):
    """Test semantic search with different types of queries."""
    test_queries = [
        "How do I reset my password?",
        "What are the security requirements for remote work?",
        "API authentication and rate limits",
        "Employee benefits and health insurance",
        "How much does the software cost?"
    ]
    for i, query in enumerate(test_queries, 1):
        print(f"\n🔍 Test Query {i}: {query}")
        print("=" * 60)
        semantic_search(query, space_id, max_results=3)
        print("\n" + "-" * 60)
if demo_space:
    test_multiple_queries(demo_space.space_id)
else:
    print("⚠️ No space available for testing multiple queries")
Next Steps & Advanced Features
Congratulations! 🎉 You've successfully built a semantic search system using GoodMem. Here's what you've accomplished:
✅ What You Built
- Document ingestion pipeline with automatic chunking and embedding
- Semantic search system with relevance scoring
- Simple Q&A system using GoodMem's vector capabilities
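The generation step itself is a natural next addition. Below is a minimal sketch that feeds the chunks returned by semantic_search into an LLM prompt; it assumes an OpenAI key in the environment, and the model name and prompt wording are placeholders to adapt to your provider (the openai package was installed at the start of this tutorial).

```python
# Sketch of the generation step that completes the RAG loop: assemble the
# retrieved chunks into a prompt and ask an LLM to answer from that context.

def build_rag_prompt(query: str, chunks: list[dict]) -> str:
    """Join retrieved chunk texts (as returned by semantic_search) into a prompt."""
    context = "\n\n".join(c.get("chunk_text", "") for c in chunks)
    return (
        "Answer the question using only the context below.\n\n"
        f"Context:\n{context}\n\nQuestion: {query}\nAnswer:"
    )

def answer_question(query: str, chunks: list[dict]) -> str:
    from openai import OpenAI  # requires OPENAI_API_KEY in the environment
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4o-mini",  # assumption: any chat-completion model works here
        messages=[{"role": "user", "content": build_rag_prompt(query, chunks)}],
    )
    return response.choices[0].message.content
```

For example, `answer_question("What is the vacation policy?", search_results)` would ground the model's answer in the chunks retrieved earlier.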
🚀 Next Steps for Advanced Implementation
1. Multiple Embedders & Reranking
- Coming Soon
2. Integration with Popular Frameworks
- Coming Soon
3. Advanced Post-Processing
- Coming Soon
📚 Additional Resources
- GoodMem Documentation: https://docs.goodmem.ai/