Streaming Client
Custom streaming client for real-time memory retrieval
Overview
The StreamingClient provides custom streaming functionality for real-time memory retrieval using Server-Sent Events (SSE) or NDJSON format.
Features
- Real-time memory retrieval with streaming
- Support for SSE and NDJSON formats
- Simple and advanced streaming interfaces
- Automatic event parsing and handling
- Built-in error handling and connection management
Creating a Streaming Client
import (
	// context, fmt, and log are used by the streaming examples on this page
	"context"
	"fmt"
	"log"

	goodmem_client "github.com/PAIR-Systems-Inc/goodmem/clients/go"
)
// Create API client
configuration := goodmem_client.NewConfiguration()
configuration.Host = "http://localhost:8080"
configuration.DefaultHeader["Authorization"] = "Bearer your-api-key"
apiClient := goodmem_client.NewAPIClient(configuration)
// Create streaming client
streamingClient := goodmem_client.NewStreamingClient(apiClient)
Simple Streaming
The simplest way to retrieve memories with streaming:
ctx := context.Background()
spaceIDs := []string{"your-space-id"}
stream, err := streamingClient.RetrieveMemoryStreamSimple(
	ctx,
	"your search query",
	spaceIDs,
)
if err != nil {
	log.Fatalf("Error starting stream: %v", err)
}
defer stream.Close()
// Process streaming events
for {
	event, err := stream.Recv()
	if err != nil {
		if err.Error() == "EOF" {
			break // End of stream
		}
		log.Printf("Stream error: %v", err)
		break
	}
	// Handle events
	if event.AbstractReply != nil {
		fmt.Printf("Abstract: %s\n", event.AbstractReply.GetText())
	} else if event.RetrievedItem != nil {
		fmt.Printf("Memory ID: %s\n", event.RetrievedItem.Memory.GetMemoryId())
	}
}
Advanced Streaming Configuration
For more control over streaming behavior:
advancedConfig := &goodmem_client.StreamingConfig{
	RequestedSize:      10,
	FetchMemory:        true,
	FetchMemoryContent: false,
	Format:             "ndjson",
	PostProcessorName:  "com.goodmem.retrieval.postprocess.ChatPostProcessorFactory",
	PostProcessorConfig: map[string]interface{}{
		"llm_id":               "your-llm-uuid",
		"reranker_id":          "your-reranker-uuid",
		"relevance_threshold":  0.5,
		"max_results":          10,
		"chronological_resort": true,
	},
}
stream, err := streamingClient.RetrieveMemoryStreamAdvanced(
	ctx,
	"your search query",
	spaceIDs,
	advancedConfig,
)
StreamingConfig Fields
| Field | Type | Description |
|---|---|---|
| RequestedSize | int32 | Number of results to request |
| FetchMemory | bool | Whether to fetch full memory objects |
| FetchMemoryContent | bool | Whether to include memory content |
| Format | string | Streaming format: "sse" or "ndjson" |
| PostProcessorName | string | Post-processor factory class name |
| PostProcessorConfig | map[string]interface{} | Post-processor configuration |
Event Types
The streaming client returns RetrieveMemoryEvent objects with the following fields:
- AbstractReply: Generated abstract or summary from the LLM
- RetrievedItem: Memory item with relevance score
- MemoryDefinition: Memory metadata and structure
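Since each event is identified by which of these fields is non-nil, a handler can dispatch with a single switch. The sketch below uses simplified stand-in types so that it compiles on its own; the struct layouts and the `Score` and `MemoryID` field names are assumptions for illustration, not the generated SDK models.

```go
package main

import "fmt"

// Stand-in types mirroring the three fields listed above. They are
// simplified placeholders, not the SDK's RetrieveMemoryEvent model.
type abstractReply struct{ Text string }

type retrievedItem struct {
	MemoryID string
	Score    float64
}

type memoryDefinition struct{ MemoryID string }

type retrieveMemoryEvent struct {
	AbstractReply    *abstractReply
	RetrievedItem    *retrievedItem
	MemoryDefinition *memoryDefinition
}

// describe returns a one-line summary for whichever field is set.
func describe(ev retrieveMemoryEvent) string {
	switch {
	case ev.AbstractReply != nil:
		return "abstract: " + ev.AbstractReply.Text
	case ev.RetrievedItem != nil:
		return fmt.Sprintf("item %s (score %.2f)", ev.RetrievedItem.MemoryID, ev.RetrievedItem.Score)
	case ev.MemoryDefinition != nil:
		return "definition: " + ev.MemoryDefinition.MemoryID
	default:
		// No known field set; a caller may prefer to log and skip these.
		return "unknown event"
	}
}

func main() {
	events := []retrieveMemoryEvent{
		{MemoryDefinition: &memoryDefinition{MemoryID: "m-1"}},
		{RetrievedItem: &retrievedItem{MemoryID: "m-1", Score: 0.5}},
		{AbstractReply: &abstractReply{Text: "summary"}},
	}
	for _, ev := range events {
		fmt.Println(describe(ev))
	}
}
```

Keeping a default branch means an event with none of the known fields set is surfaced rather than silently dropped.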
Error Handling
for {
	event, err := stream.Recv()
	if err != nil {
		// Check for end of stream
		if err.Error() == "EOF" {
			break
		}
		// Handle other errors
		log.Printf("Stream error: %v", err)
		break
	}
	// Process event...
}
Best Practices
- Always close streams: Use defer stream.Close() to ensure cleanup
- Handle EOF properly: Check for EOF to detect end of stream
- Use an appropriate format: NDJSON for reliability, SSE for real-time delivery
- Configure timeouts: Set appropriate HTTP client timeouts
- Process incrementally: Handle events as they arrive
See Also
- Memories API - Memory retrieval endpoints
- RetrieveMemoryEvent - Event structure
- Go SDK Overview - Getting started guide