Pipecat Integration
Zentry seamlessly integrates with Pipecat, providing long-term memory capabilities for conversational AI agents. This integration allows your Pipecat-powered applications to remember past conversations and provide personalized responses based on user history.
Installation
To use Zentry with Pipecat, install the required dependencies:
pip install "pipecat-ai[zentry]"
You’ll also need to set up your Zentry API key as an environment variable:
export ZENTRY_API_KEY=your_zentry_api_key
You can obtain a Zentry API key by signing up at zentry.gg.
Configuration
Zentry integration is provided through the ZentryMemoryService class in Pipecat. Here’s how to configure it:

import os

from pipecat.services.zentry import ZentryMemoryService

memory = ZentryMemoryService(
    api_key=os.getenv("ZENTRY_API_KEY"),  # Your Zentry API key
    user_id="unique_user_id",             # Unique identifier for the end user
    agent_id="my_agent",                  # Identifier for the agent using the memory
    run_id="session_123",                 # Optional: specific conversation session ID
    params={                              # Optional: configuration parameters
        "search_limit": 10,               # Maximum memories to retrieve per query
        "search_threshold": 0.1,          # Relevance threshold (0.0 to 1.0)
        "system_prompt": "Here are your past memories:",  # Custom prefix for retrieved memories
        "add_as_system_message": True,    # Add memories as a system (True) or user (False) message
        "position": 1,                    # Position in the context at which to insert memories
    }
)
Pipeline Integration
The ZentryMemoryService should be positioned between your context aggregator and LLM service in the Pipecat pipeline:
pipeline = Pipeline([
    transport.input(),
    stt,                # Speech-to-text for audio input
    user_context,       # User context aggregator
    memory,             # Zentry memory service enhances context here
    llm,                # LLM for response generation
    tts,                # Optional: text-to-speech
    transport.output(),
    assistant_context,  # Assistant context aggregator
])
Example: Voice Agent with Memory
Here’s a complete example of a Pipecat voice agent with Zentry memory integration:
import os

from fastapi import FastAPI, WebSocket
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.serializers.protobuf import ProtobufFrameSerializer
from pipecat.services.openai import (
    OpenAIAssistantContextAggregator,
    OpenAILLMService,
    OpenAIUserContextAggregator,
)
from pipecat.services.whisper import WhisperSTTService
from pipecat.services.zentry import ZentryMemoryService
from pipecat.transports.network.fastapi_websocket import (
    FastAPIWebsocketParams,
    FastAPIWebsocketTransport,
)

app = FastAPI()

@app.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    # Basic setup with minimal configuration
    user_id = "user123"

    # WebSocket transport
    transport = FastAPIWebsocketTransport(
        websocket=websocket,
        params=FastAPIWebsocketParams(
            audio_out_enabled=True,
            vad_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            vad_audio_passthrough=True,
            serializer=ProtobufFrameSerializer(),
        ),
    )

    # Core services
    user_context = OpenAIUserContextAggregator()
    assistant_context = OpenAIAssistantContextAggregator()
    stt = WhisperSTTService(api_key=os.getenv("OPENAI_API_KEY"))

    # Memory service - the key component
    memory = ZentryMemoryService(
        api_key=os.getenv("ZENTRY_API_KEY"),
        user_id=user_id,
        agent_id="fastapi_memory_bot",
    )

    # LLM for response generation
    llm = OpenAILLMService(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-3.5-turbo",
        system_prompt="You are a helpful assistant that remembers past conversations.",
    )

    # Simple pipeline
    pipeline = Pipeline([
        transport.input(),
        stt,        # Speech-to-text for audio input
        user_context,
        memory,     # Memory service enhances context here
        llm,
        transport.output(),
        assistant_context,
    ])

    # Run the pipeline
    runner = PipelineRunner()
    task = PipelineTask(pipeline)

    # Event handlers for WebSocket connections
    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        # Send a welcome message when the client connects
        await task.queue_frame(TextFrame("Hello! I'm a memory bot. I'll remember our conversation."))

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        # Clean up when the client disconnects
        await task.cancel()

    await runner.run(task)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
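To try it out, run the script (python app.py, assuming you saved it as app.py) and point a Pipecat WebSocket client at ws://localhost:8000/chat. Reconnecting later with the same user_id lets the agent recall earlier sessions.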
How It Works
When integrated with Pipecat, Zentry provides two key functionalities:
1. Message Storage
All conversation messages are automatically stored in Zentry for future reference:
- Captures the full message history from context frames
- Associates messages with the specified user, agent, and run IDs
- Stores metadata to enable efficient retrieval
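Conceptually, the storage step behaves like the following standalone sketch. The ZentryClient class and its add() method are hypothetical stand-ins, not the actual internals of ZentryMemoryService; only the scoping IDs mirror the configuration shown earlier.

# Conceptual sketch only: ZentryClient and add() are hypothetical,
# not the real internals of ZentryMemoryService.
import os

from zentry import ZentryClient  # hypothetical client package

client = ZentryClient(api_key=os.getenv("ZENTRY_API_KEY"))

# Store one conversation turn, scoped by the same IDs the service uses
client.add(
    messages=[
        {"role": "user", "content": "I'm vegetarian and allergic to nuts."},
        {"role": "assistant", "content": "Got it, I'll remember that."},
    ],
    user_id="user123",    # end-user scope
    agent_id="my_agent",  # agent scope
    run_id="session_123", # optional session scope
    metadata={"channel": "voice"},  # extra metadata for later retrieval
)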
2. Memory Retrieval
When a new user message is detected:
- The message is used as a search query to find relevant past memories
- Relevant memories are retrieved from Zentry’s database
- Memories are formatted and added to the conversation context
- The enhanced context is passed to the LLM for response generation
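The retrieval side can be pictured the same way. Again, the client and its search() method are illustrative assumptions: the limit and threshold arguments mirror the search_limit and search_threshold parameters, and the final insertion mirrors system_prompt, add_as_system_message, and position.

# Conceptual sketch only: client.search() is a hypothetical stand-in
# for the service's internal retrieval call.
import os

from zentry import ZentryClient  # hypothetical client package

client = ZentryClient(api_key=os.getenv("ZENTRY_API_KEY"))

new_user_message = "What should I cook for dinner tonight?"

hits = client.search(
    query=new_user_message,  # the new user message drives the search
    user_id="user123",
    limit=10,                # mirrors search_limit
    threshold=0.1,           # mirrors search_threshold
)

# Format the retrieved memories and insert them into the context,
# mirroring system_prompt / add_as_system_message / position
memory_block = "Here are your past memories:\n" + "\n".join(
    f"- {hit['memory']}" for hit in hits
)
context = [
    {"role": "system", "content": memory_block},  # position=0 insertion
    {"role": "user", "content": new_user_message},
]
# context is what gets passed to the LLM for response generation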
Additional Configuration Options
Memory Search Parameters
You can customize how memories are retrieved and used:
memory = ZentryMemoryService(
    api_key=os.getenv("ZENTRY_API_KEY"),
    user_id="user123",
    params={
        "search_limit": 5,        # Retrieve up to 5 memories
        "search_threshold": 0.2,  # Higher threshold for more relevant matches
        "api_version": "v2",      # Zentry API version
    }
)
Memory Presentation Options
Control how memories are presented to the LLM:
memory = ZentryMemoryService(
    api_key=os.getenv("ZENTRY_API_KEY"),
    user_id="user123",
    params={
        "system_prompt": "Previous conversations with this user:",
        "add_as_system_message": True,  # Add as a system message instead of a user message
        "position": 0,                  # Insert at the beginning of the context
    }
)