import asyncio
import logging
from maximem_synap import MaximemSynapSDK, SynapError, SynapTransientError
# Module-wide Synap SDK client shared by the coroutines below.
# NOTE(review): `api_key=...` passes the Ellipsis literal — presumably a
# placeholder for a real API key; confirm before running.
sdk = MaximemSynapSDK(api_key=...)
# Logger named after this module; used for all diagnostics in this file.
log = logging.getLogger(__name__)
async def safe_fetch_context(conversation_id: str, query: str):
    """Fetch conversation context without letting Synap errors escape.

    The fetch is issued in ``"fast"`` mode for at most 8 results and is
    bounded by a timeout, so a slow or failing backend cannot stall or
    crash the caller.

    Args:
        conversation_id: Conversation whose context is requested.
        query: Search text; sent to the SDK as a single-element
            ``search_query`` list.

    Returns:
        Whatever the SDK fetch call returns on success, or ``None`` when
        the call timed out or raised a ``SynapError`` (including the
        transient variant). Each failure is logged before returning.
    """
    context = None
    try:
        pending = sdk.conversation.context.fetch(
            conversation_id=conversation_id,
            search_query=[query],
            mode="fast",
            max_results=8,
        )
        # NOTE(review): YOUR_CONVERSATIONAL_BUDGET_SECONDS is not defined in
        # the visible source; it must be supplied elsewhere.
        context = await asyncio.wait_for(
            pending,
            timeout=YOUR_CONVERSATIONAL_BUDGET_SECONDS,  # illustrative — tune to your own conversational budget
        )
    except asyncio.TimeoutError:
        # The budget elapsed before the fetch finished.
        log.warning("synap_context_timeout conv=%s", conversation_id)
    except SynapTransientError as exc:
        # Transient failure: warn only, the caller falls back gracefully.
        log.warning("synap_transient err=%s correlation_id=%s", exc, exc.correlation_id)
    except SynapError as exc:
        # Any other SDK failure is logged at error level.
        log.error("synap_unexpected err=%s correlation_id=%s", exc, exc.correlation_id)
    return context
# Strong references to in-flight background ingest tasks. The event loop only
# keeps weak references to tasks, so a task whose result is discarded can be
# garbage-collected before it finishes; holding it here prevents that.
_pending_ingest_tasks = set()


async def handle_turn(user_id: str, customer_id: str, conversation_id: str, msg: str) -> str:
    """Answer one user message, using stored memory when it is available.

    Context is fetched via ``safe_fetch_context``. If that yields ``None``
    the turn runs in degraded mode with an empty memory block rather than
    failing. After the reply is produced, ingestion of the exchange is
    scheduled as a background task and is not awaited.

    Args:
        user_id: Identifier of the user; logged and forwarded to ingestion.
        customer_id: Customer identifier forwarded to ingestion.
        conversation_id: Conversation used for context lookup and ingestion.
        msg: The user's message for this turn.

    Returns:
        The value returned by ``call_llm`` for this turn (``call_llm`` is
        defined outside the visible source).
    """
    ctx = await safe_fetch_context(conversation_id, msg)
    if ctx is None:
        # Degraded mode — call the LLM without memory rather than 500ing
        memory_block = ""
        log.info("turn_degraded user=%s", user_id)
    else:
        # Only the first five facts are included, one bullet per line.
        memory_block = "\n".join(f"- {f.content}" for f in ctx.facts[:5])

    reply = await call_llm(memory_block, msg)

    # Ingest in the background. If it fails, queue for retry — don't await.
    # Keep a reference until the task completes so it cannot be collected
    # mid-flight; the done callback drops the reference afterwards.
    ingest_task = asyncio.create_task(
        safe_ingest(user_id, customer_id, conversation_id, msg, reply)
    )
    _pending_ingest_tasks.add(ingest_task)
    ingest_task.add_done_callback(_pending_ingest_tasks.discard)
    return reply
async def safe_ingest(user_id, customer_id, conversation_id, msg, reply, _retries=0):
    """Store one user/assistant exchange as a memory, retrying transient errors.

    Transient failures are retried while the retry counter is below 3,
    sleeping ``2 ** counter`` seconds before each retry. Once retries are
    exhausted the drop is logged and the exchange is handed to
    ``enqueue_for_replay``. Any other ``SynapError`` is logged and not
    retried.

    Args:
        user_id: User the memory belongs to.
        customer_id: Customer the memory belongs to.
        conversation_id: Stored in the memory's metadata.
        msg: The user's message.
        reply: The assistant's reply.
        _retries: Starting value of the retry counter; callers normally
            leave it at the default.

    Returns:
        None.
    """
    attempt = _retries
    while True:
        try:
            await sdk.memories.create(
                document=f"User: {msg}\nAssistant: {reply}",
                document_type="ai-chat-conversation",
                user_id=user_id,
                customer_id=customer_id,
                metadata={"conversation_id": conversation_id},
            )
        except SynapTransientError as exc:
            if attempt < 3:
                # Exponential backoff before the next attempt.
                await asyncio.sleep(2 ** attempt)
                attempt += 1
                continue
            log.error("synap_ingest_dropped after retries user=%s msg_excerpt=%r correlation_id=%s",
                      user_id, msg[:80], exc.correlation_id)
            # Optionally: enqueue for an out-of-band replayer
            await enqueue_for_replay(user_id, customer_id, conversation_id, msg, reply)
        except SynapError as exc:
            # Non-transient SDK error: log only, no retry and no replay.
            log.error("synap_ingest_permanent err=%s correlation_id=%s", exc, exc.correlation_id)
        return