Pipecat Integration

Zentry seamlessly integrates with Pipecat, providing long-term memory capabilities for conversational AI agents. This integration allows your Pipecat-powered applications to remember past conversations and provide personalized responses based on user history.

Installation

To use Zentry with Pipecat, install the required dependencies:

pip install "pipecat-ai[zentry]"

You’ll also need to set up your Zentry API key as an environment variable:

export ZENTRY_API_KEY=your_zentry_api_key

You can obtain a Zentry API key by signing up at zentry.gg.
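
Optionally, add a quick sanity check at startup so a missing key fails fast. This is a minimal sketch; it only verifies that the variable is set, not that the key is valid:

import os

# Fail fast if the Zentry API key was not exported
if not os.getenv("ZENTRY_API_KEY"):
    raise RuntimeError("ZENTRY_API_KEY is not set; export it before starting the agent")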

Configuration

Zentry integration is provided through the ZentryMemoryService class in Pipecat. Here’s how to configure it:

import os

from pipecat.services.zentry import ZentryMemoryService

memory = ZentryMemoryService(
    api_key=os.getenv("ZENTRY_API_KEY"),  # Your Zentry API key
    user_id="unique_user_id",           # Unique identifier for the end user
    agent_id="my_agent",                # Identifier for the agent using the memory
    run_id="session_123",               # Optional: specific conversation session ID
    params={                            # Optional: configuration parameters
        "search_limit": 10,             # Maximum memories to retrieve per query
        "search_threshold": 0.1,        # Relevance threshold (0.0 to 1.0)
        "system_prompt": "Here are your past memories:", # Custom prefix for memories
        "add_as_system_message": True,  # Add memories as system (True) or user (False) message
        "position": 1,                  # Position in context to insert memories
    }
)

Pipeline Integration

The ZentryMemoryService should be positioned between your context aggregator and LLM service in the Pipecat pipeline:

pipeline = Pipeline([
    transport.input(),
    stt,                # Speech-to-text for audio input
    user_context,       # User context aggregator
    memory,             # Zentry Memory service enhances context here
    llm,                # LLM for response generation
    tts,                # Optional: Text-to-speech
    transport.output(),
    assistant_context   # Assistant context aggregator
])
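
This ordering matters: the memory service reads the aggregated context frames produced by the user context aggregator, injects relevant memories, and passes the enriched context downstream to the LLM. If it were placed after the LLM, the retrieved memories could not influence the response.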

Example: Voice Agent with Memory

Here’s a complete example of a Pipecat voice agent with Zentry memory integration:

import os
from fastapi import FastAPI, WebSocket

from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.services.zentry import ZentryMemoryService
from pipecat.services.openai import OpenAILLMService
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.transports.network.fastapi_websocket import (
    FastAPIWebsocketTransport,
    FastAPIWebsocketParams
)
from pipecat.serializers.protobuf import ProtobufFrameSerializer
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.services.whisper import WhisperSTTService

app = FastAPI()

@app.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    
    # Basic setup with minimal configuration
    user_id = "user123"
    
    # WebSocket transport
    transport = FastAPIWebsocketTransport(
        websocket=websocket,
        params=FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            vad_audio_passthrough=True,
            serializer=ProtobufFrameSerializer(),
        )
    )
    
    # Speech-to-text (Pipecat's Whisper service runs a local model,
    # so it does not take an API key)
    stt = WhisperSTTService()

    # LLM for response generation
    llm = OpenAILLMService(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-3.5-turbo",
    )

    # Conversation context seeded with a system prompt; the paired
    # aggregators keep user and assistant turns in sync with it
    context = OpenAILLMContext(
        messages=[{
            "role": "system",
            "content": "You are a helpful assistant that remembers past conversations.",
        }]
    )
    context_aggregator = llm.create_context_aggregator(context)

    # Memory service - the key component
    memory = ZentryMemoryService(
        api_key=os.getenv("ZENTRY_API_KEY"),
        user_id=user_id,
        agent_id="fastapi_memory_bot"
    )
    
    # Simple pipeline
    pipeline = Pipeline([
        transport.input(),
        stt,                            # Speech-to-text for audio input
        context_aggregator.user(),      # User context aggregator
        memory,                         # Zentry Memory service enhances context here
        llm,
        transport.output(),
        context_aggregator.assistant()  # Assistant context aggregator
    ])
    
    # Run the pipeline
    runner = PipelineRunner()
    task = PipelineTask(pipeline)
    
    # Event handlers for WebSocket connections
    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        # Send welcome message when client connects
        await task.queue_frame(TextFrame("Hello! I'm a memory bot. I'll remember our conversation."))
    
    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        # Clean up when client disconnects
        await task.cancel()
    
    await runner.run(task)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
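
To try it out, run the script directly (it starts uvicorn on port 8000) and connect a Pipecat-compatible client to ws://localhost:8000/chat. Note that the transport uses ProtobufFrameSerializer, so a plain-text WebSocket client will not understand the framing; the client must speak the same protobuf frame format.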

How It Works

When integrated with Pipecat, Zentry provides two key functionalities:

1. Message Storage

All conversation messages are automatically stored in Zentry for future reference, as sketched after this list:

  • Captures the full message history from context frames
  • Associates messages with the specified user, agent, and run IDs
  • Stores metadata to enable efficient retrieval
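
Conceptually, the storage step looks like the sketch below. The client object and its add() method are hypothetical names used for illustration, not a documented Zentry API; ZentryMemoryService handles this internally.

# Illustrative sketch only: "client.add(...)" is a hypothetical call,
# not a documented Zentry API. ZentryMemoryService does this internally.
def store_conversation(client, messages, user_id, agent_id, run_id=None):
    # "messages" is the full history pulled from the context frame,
    # e.g. [{"role": "user", "content": "..."}, ...]
    client.add(
        messages=messages,
        user_id=user_id,    # ties memories to the end user
        agent_id=agent_id,  # ties memories to this agent
        run_id=run_id,      # optional per-session scoping
        metadata={"source": "pipecat"},  # illustrative metadata for retrieval
    )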

2. Memory Retrieval

When a new user message is detected, the service performs the following steps (sketched below):

  1. The message is used as a search query to find relevant past memories
  2. Relevant memories are retrieved from Zentry’s database
  3. Memories are formatted and added to the conversation context
  4. The enhanced context is passed to the LLM for response generation
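
The retrieval side can be sketched the same way. client.search() and its parameters are again hypothetical, but they mirror the search_limit, search_threshold, system_prompt, and position options shown above:

# Illustrative sketch only: "client.search(...)" is a hypothetical call.
def enhance_context(client, messages, user_id,
                    limit=10, threshold=0.1, position=1):
    query = messages[-1]["content"]  # step 1: latest user message as query
    memories = client.search(query=query, user_id=user_id,
                             limit=limit, threshold=threshold)  # step 2
    if not memories:
        return messages
    # step 3: format memories and add them to the context
    memory_text = "Here are your past memories:\n" + "\n".join(
        f"- {m['memory']}" for m in memories
    )
    memory_message = {"role": "system", "content": memory_text}
    enhanced = messages[:position] + [memory_message] + messages[position:]
    return enhanced  # step 4: the enhanced context goes to the LLM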

Additional Configuration Options

Memory Search Parameters

You can customize how memories are retrieved and used:

memory = ZentryMemoryService(
    api_key=os.getenv("ZENTRY_API_KEY"),
    user_id="user123",
    params={
        "search_limit": 5,            # Retrieve up to 5 memories
        "search_threshold": 0.2,      # Higher threshold for more relevant matches
        "api_version": "v2",          # Zentry API version
    }
)
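
As a rule of thumb, raising search_threshold trades recall for precision (fewer but more relevant memories are injected), while search_limit caps how many memories can be added per turn and therefore bounds the extra tokens sent to the LLM.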

Memory Presentation Options

Control how memories are presented to the LLM:

memory = ZentryMemoryService(
    api_key=os.getenv("ZENTRY_API_KEY"),
    user_id="user123",
    params={
        "system_prompt": "Previous conversations with this user:",
        "add_as_system_message": True,  # Add as system message instead of user message
        "position": 0,                  # Insert at the beginning of the context
    }
)
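
With these settings, the context handed to the LLM would look roughly like the sketch below; the memory contents are made-up examples and the exact formatting may differ:

# Approximate shape of the enhanced context with position=0 and
# add_as_system_message=True; memory contents are invented examples.
enhanced_messages = [
    {"role": "system", "content": (
        "Previous conversations with this user:\n"
        "- Prefers vegetarian recipes\n"
        "- Lives in Berlin"
    )},
    {"role": "user", "content": "Any dinner ideas for tonight?"},
]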

Resources