Events URL
Connect to the events endpoint to receive Server-Sent Events (SSE) when memories you have subscribed to are created, updated, or deleted:
https://events.ensue-network.ai/mcp
Subscription Tools
Use these MCP tools via the Public API URL to manage your memory subscriptions:
subscribe_to_memory
Subscribe to notifications for changes to a specific memory key.
| Parameter | Description |
|---|---|
key_name |
The memory key to watch |
Returns subscription details including subscribed_at and expires_at timestamps. Subscriptions are time-limited based on your organization's tier (free tier: 3 hours).
unsubscribe_from_memory
Stop receiving notifications for a memory key.
| Parameter | Description |
|---|---|
key_name |
The memory key to unsubscribe from |
Returns unsubscribed boolean indicating success.
list_subscriptions
View all active memory subscriptions for the current user. Returns a list of subscriptions with key names and expiration times.
Event Types
Notifications are delivered when subscribed memories change:
- Created - A new memory is stored at a subscribed key
- Updated - An existing subscribed memory is modified
- Deleted - A subscribed memory is removed
Example Listener
Use this Python script to monitor events from the Events API. It connects using the MCP SDK and prints notifications when subscribed memories change.
#!/usr/bin/env python3
"""
Listener for MCP Streamable HTTP notifications.
Connects to the notification service and prints received notifications.
"""
import sys
import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
import mcp.types as types
async def main():
api_key = sys.argv[1]
url = sys.argv[2] if len(sys.argv) > 2 else "https://events.ensue-network.ai/mcp"
headers = {
"Authorization": f"Bearer {api_key}",
}
stop_event = asyncio.Event()
async def message_handler(message):
if isinstance(message, types.ServerNotification):
notification = message.root
if isinstance(notification, types.ResourceUpdatedNotification):
uri = notification.params.uri
print(f"NOTIFICATION: {uri}", flush=True)
async with streamablehttp_client(url, headers=headers) as (read, write, _):
async with ClientSession(read, write, message_handler=message_handler) as session:
await session.initialize()
print("CONNECTED", flush=True)
try:
await stop_event.wait()
except asyncio.CancelledError:
pass
if __name__ == "__main__":
asyncio.run(main())
Install the MCP SDK and run the listener:
pip install mcp
python listener.py YOUR_API_KEY https://events.ensue-network.ai/mcp
Orchestrator Pattern
A common pattern uses a coordinator agent that subscribes to task keys and delegates work:
- Subscribe to a task queue key (e.g.,
tasks/pending) - Receive notifications when new tasks arrive
- Delegate subtasks to worker agents by writing to their memory keys
- Monitor worker completion keys and synthesize results
This enables fully reactive, event-driven multiagent coordination without polling.
Best Practices
Idempotent Handlers
Ensure your notification handlers can be safely retried. Store a "processed" marker to avoid duplicate processing.
Error Handling
When processing fails, store the failure in a separate key for retry or alerting.
Subscription Expiration
Remember that subscriptions expire based on your tier. Re-subscribe before expiration to maintain continuous monitoring.
Next Steps
- Search & Hypergraph - Understand retrieval for finding relevant memories
- Putnam Proof Example - See orchestration patterns in action
- API Reference - Complete subscription API