GoodMem
ReferenceClient SDKsGo SDK

Streaming Client

Custom streaming client for real-time memory retrieval

Overview

The StreamingClient provides custom streaming functionality for real-time memory retrieval using Server-Sent Events (SSE) or NDJSON format.

Features

  • Real-time memory retrieval with streaming
  • Support for SSE and NDJSON formats
  • Simple and advanced streaming interfaces
  • Automatic event parsing and handling
  • Built-in error handling and connection management

Creating a Streaming Client

import goodmem_client "github.com/PAIR-Systems-Inc/goodmem/clients/go"

// Create API client
configuration := goodmem_client.NewConfiguration()
configuration.Host = "http://localhost:8080"
configuration.DefaultHeader["Authorization"] = "Bearer your-api-key"

apiClient := goodmem_client.NewAPIClient(configuration)

// Create streaming client
streamingClient := goodmem_client.NewStreamingClient(apiClient)

Simple Streaming

The simplest way to retrieve memories with streaming:

ctx := context.Background()
spaceIDs := []string{"your-space-id"}

stream, err := streamingClient.RetrieveMemoryStreamSimple(
    ctx,
    "your search query",
    spaceIDs,
)
if err != nil {
    log.Fatalf("Error starting stream: %v", err)
}
defer stream.Close()

// Process streaming events
for {
    event, err := stream.Recv()
    if err != nil {
        if err.Error() == "EOF" {
            break // End of stream
        }
        log.Printf("Stream error: %v", err)
        break
    }

    // Handle events
    if event.AbstractReply != nil {
        fmt.Printf("Abstract: %s\n", event.AbstractReply.GetText())
    } else if event.RetrievedItem != nil {
        fmt.Printf("Memory ID: %s\n", event.RetrievedItem.Memory.GetMemoryId())
    }
}

Advanced Streaming Configuration

For more control over streaming behavior:

advancedConfig := &goodmem_client.StreamingConfig{
    RequestedSize:        10,
    FetchMemory:         true,
    FetchMemoryContent:  false,
    Format:              "ndjson",
    PostProcessorName:   "com.goodmem.retrieval.postprocess.ChatPostProcessorFactory",
    PostProcessorConfig: map[string]interface{}{
        "llm_id":              "your-llm-uuid",
        "reranker_id":         "your-reranker-uuid",
        "relevance_threshold": 0.5,
        "max_results":         10,
        "chronological_resort": true,
    },
}

stream, err := streamingClient.RetrieveMemoryStreamAdvanced(
    ctx,
    "your search query",
    spaceIDs,
    advancedConfig,
)

StreamingConfig Fields

FieldTypeDescription
RequestedSizeint32Number of results to request
FetchMemoryboolWhether to fetch full memory objects
FetchMemoryContentboolWhether to include memory content
FormatstringStreaming format: "sse" or "ndjson"
PostProcessorNamestringPost-processor factory class name
PostProcessorConfigmap[string]interface{}Post-processor configuration

Event Types

The streaming client returns RetrieveMemoryEvent objects with the following fields:

  • AbstractReply - Generated abstract/summary from LLM
  • RetrievedItem - Memory item with relevance score
  • MemoryDefinition - Memory metadata and structure

Error Handling

for {
    event, err := stream.Recv()
    if err != nil {
        // Check for end of stream
        if err.Error() == "EOF" {
            break
        }

        // Handle other errors
        log.Printf("Stream error: %v", err)
        break
    }

    // Process event...
}

Best Practices

  1. Always close streams: Use defer stream.Close() to ensure cleanup
  2. Handle EOF properly: Check for EOF to detect end of stream
  3. Use appropriate format: NDJSON for reliability, SSE for real-time
  4. Configure timeouts: Set appropriate HTTP client timeouts
  5. Process incrementally: Handle events as they arrive

See Also