When you turn Synap on for an existing app, you usually have months of conversation logs sitting in a database. Pump them in via batch ingestion so the agent starts day one with the same context your users already remember about themselves.
import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import AsyncIterator
from maximem_synap import CreateMemoryRequest, MaximemSynapSDK

sdk = MaximemSynapSDK(api_key=...)   # replace ... with your API key

# `db` is your own async DB handle. Swap in your project's connection.
db = ...

@dataclass
class HistoricalTurn:
    user_id: str
    customer_id: str
    conversation_id: str
    message_index: int       # position of this turn within the conversation
    user_message: str
    assistant_message: str
    happened_at: datetime    # original timestamp from your DB


async def load_historical_turns() -> AsyncIterator[HistoricalTurn]:
    """Pull from your application database. Yield one turn at a time."""
    async for row in db.stream("SELECT * FROM conversations ORDER BY created_at"):
        yield HistoricalTurn(
            user_id=row.user_id,
            customer_id=row.customer_id,
            conversation_id=str(row.conversation_id),   # must be a UUID
            message_index=row.message_index,
            user_message=row.user_text,
            assistant_message=row.assistant_text,
            happened_at=row.created_at,
        )


async def batch_replay(batch_size: int = 50):
    batch: list[CreateMemoryRequest] = []
    total = 0

    async for turn in load_historical_turns():
        batch.append(CreateMemoryRequest(
            document=f"User: {turn.user_message}\nAssistant: {turn.assistant_message}",
            document_type="ai-chat-conversation",
            document_created_at=turn.happened_at,   # preserve original time
            user_id=turn.user_id,
            customer_id=turn.customer_id,
            mode="long-range",                       # deep extraction for historical data
            metadata={
                "conversation_id": turn.conversation_id,
                "source": "backfill",
            },
        ))

        # Flush a full batch, then reuse the list for the next one.
        if len(batch) >= batch_size:
            await sdk.memories.batch_create(documents=batch, fail_fast=False)
            total += len(batch)
            print(f"backfilled {total} turns")
            batch.clear()

    # Flush whatever is left over after the last full batch.
    if batch:
        await sdk.memories.batch_create(documents=batch, fail_fast=False)
        total += len(batch)

    # With fail_fast=False individual documents can still fail,
    # so this counts turns submitted, not turns accepted.
    print(f"done: {total} historical turns submitted")


async def main():
    # `await` is only valid inside a coroutine, so initialize the SDK here
    # rather than at module level.
    await sdk.initialize()
    await batch_replay()


asyncio.run(main())
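The accumulate-and-flush loop in batch_replay can also be factored into a small reusable helper. What follows is a minimal sketch, not part of the Synap SDK: `chunked` is a hypothetical name, and it works on any async iterator, so it can be exercised without a database or an API key.

```python
import asyncio
from typing import AsyncIterator, TypeVar

T = TypeVar("T")


async def chunked(source: AsyncIterator[T], size: int) -> AsyncIterator[list[T]]:
    """Group items from an async iterator into lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    chunk: list[T] = []
    async for item in source:
        chunk.append(item)
        if len(chunk) == size:
            yield chunk
            chunk = []   # fresh list, so a chunk already handed out is never mutated
    if chunk:
        yield chunk      # flush the final partial chunk


async def _demo_numbers(n: int) -> AsyncIterator[int]:
    """Stand-in for load_historical_turns(); yields 0..n-1."""
    for i in range(n):
        yield i


async def collect_chunks(n: int, size: int) -> list[list[int]]:
    """Drain the chunker into a plain list (handy for quick checks)."""
    return [chunk async for chunk in chunked(_demo_numbers(n), size)]
```

With a helper like this, the body of batch_replay could iterate over `chunked(requests, batch_size)` and call `sdk.memories.batch_create` once per chunk, where `requests` is an async generator (assumed here) that yields CreateMemoryRequest objects.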
Why mode="long-range" for backfill
Historical conversations are the highest-value extraction target you’ll ever have: they’re the one-shot chance to build a rich entity graph and detailed preferences for every user. long-range runs the full extraction pipeline (entity resolution, preference detection, emotion analysis, relationship mapping). It’s slower per document, but speed doesn’t matter for an offline backfill. fast mode would skip most of this, which is fine for runtime chat ingestion but wrong for backfill.

Why document_created_at
Synap uses document_created_at as the ground-truth timestamp for temporal reasoning, recency ranking, and aging. Don’t skip it: without it, every memory looks like it happened today, and recency-weighted retrieval breaks. Pass the original timestamp from your database.

Why batch and not one-by-one
batch_create accepts up to 100 documents per call, and the cloud queues them on a backfill-friendly path that doesn’t slow real-time ingestion. One-by-one would 30x your wall-clock time and burn rate limits.

Idempotency
If you re-run the backfill (because you found a bug, or because you’re bringing online a second region), pass a stable document_id derived from your DB row:
document_id=f"backfill:{turn.conversation_id}:{turn.message_index}",
Synap deduplicates on document_id; duplicates surface as per-document failures in the batch_create response (results[].status / results[].error_message). Inspect the response and skip rows that already exist:
import logging

log = logging.getLogger(__name__)

# Inside batch_replay(), in place of the bare batch_create call:
resp = await sdk.memories.batch_create(documents=batch, fail_fast=False)
for r in resp.results:
    if r.status == "failed":
        # e.g., duplicate document_id from a prior backfill run
        log.info("backfill_skip ingestion_id=%s err=%s", r.ingestion_id, r.error_message)
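The two idempotency pieces (a deterministic ID and a pass over the per-document results) can be sketched without the SDK. `make_document_id` and `split_results` are hypothetical helper names, and the `BatchResult` stand-in only mirrors the fields named above (status, ingestion_id, error_message); it is an assumed shape, not the SDK's actual response class.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


def make_document_id(conversation_id: str, message_index: int) -> str:
    """Derive the same ID for the same DB row on every backfill run."""
    return f"backfill:{conversation_id}:{message_index}"


@dataclass
class BatchResult:
    """Stand-in for one entry of resp.results (assumed shape, not the SDK class)."""
    ingestion_id: str
    status: str
    error_message: Optional[str] = None


def split_results(
    results: Iterable[BatchResult],
) -> tuple[list[BatchResult], list[BatchResult]]:
    """Separate failed entries from the rest so they can be logged or reviewed."""
    ok: list[BatchResult] = []
    failed: list[BatchResult] = []
    for r in results:
        # Only "failed" is named in the guide; any other status is treated as ok here.
        (failed if r.status == "failed" else ok).append(r)
    return ok, failed
```

Failures other than duplicates would land in the same `failed` list, so it is probably worth reading error_message before treating an entry as a safe skip.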
How long it takes
Backfill throughput is gated by per-Instance limits. Measure on a sample slice of your corpus before extrapolating; the numbers vary by document size and mode. Talk to support if you have multi-million-turn backfills and want a higher throughput allocation for the window.
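The measure-then-extrapolate step can be as simple as the arithmetic below. This is a rough sketch with hypothetical names; it assumes throughput stays roughly constant across the corpus, which may not hold if document sizes vary widely or if limits change during the run.

```python
import math


def estimate_backfill(sample_docs: int, sample_seconds: float,
                      total_docs: int, batch_size: int = 50) -> dict:
    """Linearly extrapolate a timed sample run to the full corpus."""
    if sample_docs <= 0 or sample_seconds <= 0:
        raise ValueError("sample_docs and sample_seconds must be positive")
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    docs_per_second = sample_docs / sample_seconds
    return {
        "docs_per_second": docs_per_second,
        "estimated_seconds": total_docs / docs_per_second,
        # Number of batch_create calls the full run would make.
        "batch_calls": math.ceil(total_docs / batch_size),
    }
```

With purely illustrative numbers (not measurements): a 500-turn sample that takes 100 seconds implies 5 turns per second, so a million turns would need about 200,000 seconds, a bit over two days, at the same rate.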

Going further