
Overview

Ingestion is how you feed data into Synap’s memory system. Every conversation, document, email, or transcript you send through sdk.memories.create() enters the ingestion pipeline, where it is categorized and chunked, entities are extracted and resolved, and the result is persisted across Synap’s vector and graph storage engines. Ingestion is asynchronous: when you call create(), Synap immediately returns an ingestion_id that you can use to poll the processing status. This design supports high-throughput workloads without blocking your application.
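To make the create-then-poll lifecycle concrete, here is a minimal sketch. The ingest_and_wait helper, the fixed polling interval, and the terminal-status set are illustrative assumptions layered on the create() and status() calls documented on this page:

```python
import asyncio

# Statuses after which polling can stop (see "Ingestion Statuses" below).
TERMINAL_STATUSES = {"completed", "failed", "partial_success"}

def is_terminal(status: str) -> bool:
    """Return True once an ingestion job has finished processing."""
    return status in TERMINAL_STATUSES

async def ingest_and_wait(sdk, document: str, poll_interval: float = 2.0):
    """Submit a document, then poll until the job reaches a terminal state."""
    response = await sdk.memories.create(
        document=document,
        document_type="ai-chat-conversation",
        user_id="user_12345",
        customer_id="cust_67890",
    )
    while True:
        status = await sdk.memories.status(ingestion_id=response.ingestion_id)
        if is_terminal(status.status):
            return status
        await asyncio.sleep(poll_interval)
```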

Creating a Memory

Use sdk.memories.create() to send a single document into the ingestion pipeline.

response = await sdk.memories.create(
    document="User: What's the status of Project Atlas?\nAssistant: Project Atlas is on track for Q2 launch...",
    document_type="ai-chat-conversation",
    user_id="user_12345",
    customer_id="cust_67890",
    mode="long-range",
    metadata={"session_id": "sess_abc", "agent_version": "2.1.0"}
)

print(f"Ingestion ID: {response.ingestion_id}")
print(f"Status: {response.status}")

Parameter Reference

document
str
required
The raw content to ingest. For conversations, include the full transcript with speaker labels. For documents, include the full text content.
document_type
str
default:"ai-chat-conversation"
The type of content being ingested. Determines which extraction and chunking strategies are applied. See Document Types below.
document_id
str
An optional idempotency key. If you submit the same document_id twice, the second submission updates the existing memory rather than creating a duplicate. Useful for re-ingesting edited documents.
document_created_at
datetime
The original creation timestamp of the document. If omitted, defaults to the current time. Providing the true creation time improves temporal reasoning during retrieval.
user_id
str
required
The end-user identifier. Scopes the resulting memory to this user. Memories created with a user_id are retrievable via sdk.user.context.fetch() and sdk.conversation.context.fetch() when the same user is queried.
customer_id
str
required
The customer (organization/tenant) identifier. Scopes the resulting memory to this customer. Use this when multiple users belong to the same customer and should share certain memories.
mode
str
default:"long-range"
The ingestion mode. Controls the depth of extraction and processing. See Ingest Modes below.
metadata
dict
Arbitrary key-value metadata attached to the memory. Metadata is stored but not indexed for retrieval. Use it for your own bookkeeping (session IDs, agent versions, source systems, etc.).

Response

ingestion_id
UUID
required
Unique identifier for this ingestion job. Use with sdk.memories.status() to track progress.
status
str
required
Initial status of the ingestion job. Always "queued" on creation.
memory_ids
list[UUID]
The IDs of the memories that will be created. Available immediately so you can reference them before processing completes.

Document Types

The document_type parameter tells the ingestion pipeline which extraction and chunking strategies to apply.
Document Type         Description                             Optimized For
ai-chat-conversation  Multi-turn AI assistant conversations   Speaker turns, intent extraction, preference detection
document              General text documents                  Paragraph chunking, topic extraction
email                 Email messages and threads              Sender/recipient extraction, action items, thread context
pdf                   PDF document content (text extracted)   Section-aware chunking, header/footer handling
image                 Image descriptions or OCR text          Entity extraction from visual content descriptions
audio                 Audio transcriptions                    Speaker diarization awareness, temporal markers
meeting-transcript    Meeting transcription content           Multi-speaker extraction, action items, decisions
For image and audio types, you provide the text content (description, transcript, or OCR output), not the raw binary file. Media processing and transcription should be handled upstream of Synap.
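As an illustration, diarized segments from an upstream speech-to-text step might be flattened into speaker-labeled text before ingestion. This is a sketch: the format_transcript helper and the (speaker, text) segment shape are assumptions, not part of the SDK.

```python
def format_transcript(segments: list[tuple[str, str]]) -> str:
    """Flatten (speaker, text) segments into speaker-labeled lines."""
    return "\n".join(f"{speaker}: {text}" for speaker, text in segments)

segments = [
    ("Alice", "Let's review the Q2 roadmap."),
    ("Bob", "The SDK launch slipped by two weeks."),
]

async def ingest_transcript(sdk, segments):
    # Synap receives the transcript text, not the raw audio file.
    return await sdk.memories.create(
        document=format_transcript(segments),
        document_type="audio",
        user_id="user_12345",
        customer_id="cust_67890",
    )
```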

Ingest Modes

Synap offers two ingestion modes that trade off processing depth against throughput.

fast

Optimized for speed. Performs basic chunking, lightweight entity extraction, and vector embedding. Skips deep relationship mapping and advanced categorization.
  • Processing time: seconds
  • Best for: high-throughput pipelines, real-time chat logging, non-critical data

long-range

Optimized for quality. Runs the full extraction pipeline including deep entity resolution, relationship mapping, preference detection, emotional analysis, and graph storage.
  • Processing time: seconds to minutes
  • Best for: conversations, documents where deep extraction matters, building long-term user profiles
Use long-range mode for conversations and documents where deep extraction matters. Use fast for high-throughput scenarios where speed is critical. You can always re-ingest a document in long-range mode later by resubmitting with the same document_id.
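For example, a content-derived document_id makes later re-ingestion idempotent. In this sketch, deriving the ID from a SHA-256 hash of the source URI is an illustrative convention, not an SDK requirement:

```python
import hashlib

def stable_document_id(source_uri: str) -> str:
    """Derive a deterministic document_id from the document's source URI."""
    return hashlib.sha256(source_uri.encode("utf-8")).hexdigest()

async def reingest_deep(sdk, source_uri: str, text: str):
    # Resubmitting with the same document_id updates the earlier
    # fast-mode memory instead of creating a duplicate.
    return await sdk.memories.create(
        document=text,
        document_type="document",
        document_id=stable_document_id(source_uri),
        user_id="user_12345",
        customer_id="cust_67890",
        mode="long-range",
    )
```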

Code Examples

Ingesting a Conversation

response = await sdk.memories.create(
    document="""User: I'm planning a trip to Japan in April.
Assistant: Great choice! April is cherry blossom season in Japan. Would you like recommendations for Tokyo or Kyoto?
User: Both! I prefer boutique hotels over large chains, and I'm vegetarian.
Assistant: I'll keep your preference for boutique hotels and vegetarian dining in mind...""",
    document_type="ai-chat-conversation",
    user_id="user_12345",
    customer_id="cust_67890",
    mode="long-range"
)

Ingesting a Document

from datetime import datetime

response = await sdk.memories.create(
    document="Q3 2025 Engineering OKRs\n\n1. Ship Synap SDK v2.0...",
    document_type="document",
    user_id="user_12345",
    customer_id="cust_67890",
    document_created_at=datetime(2025, 7, 1),
    metadata={"source": "confluence", "page_id": "12345"}
)

Ingesting with User and Customer Scoping

When both user_id and customer_id are provided, the memory is accessible at both scopes. This is useful when a user’s conversation may contain information relevant to the broader organization.
response = await sdk.memories.create(
    document="User: Our team decided to switch from Jira to Linear...",
    document_type="ai-chat-conversation",
    user_id="user_alice",
    customer_id="cust_acme_corp",
    mode="long-range"
)

# This memory is now retrievable via:
# - sdk.user.context.fetch() for user_alice
# - sdk.customer.context.fetch() for cust_acme_corp
# - sdk.conversation.context.fetch() for the conversation

Batch Ingestion

For bulk workloads, use sdk.memories.batch_create() to submit multiple documents in a single request.
from synap.types import CreateMemoryRequest

documents = [
    CreateMemoryRequest(
        document="User: Book me a flight to NYC next Tuesday...",
        document_type="ai-chat-conversation",
        user_id="user_12345",
        mode="long-range"
    ),
    CreateMemoryRequest(
        document="Meeting notes from sprint planning...",
        document_type="meeting-transcript",
        customer_id="cust_67890",
        mode="fast"
    ),
    CreateMemoryRequest(
        document="Support ticket: Login issues after password reset...",
        document_type="document",
        user_id="user_67890",
        customer_id="cust_67890",
        mode="long-range"
    ),
]

batch_response = await sdk.memories.batch_create(
    documents=documents,
    fail_fast=False
)

print(f"Submitted: {batch_response.total}")
print(f"Succeeded: {batch_response.succeeded}")
print(f"Failed: {batch_response.failed}")

for result in batch_response.results:
    print(f"  {result.ingestion_id}: {result.status}")

The fail_fast Option

fail_fast
bool
default:"False"
Controls batch behavior on validation errors.
  • False (default): Process all documents. Invalid documents are rejected individually while valid ones proceed.
  • True: Abort the entire batch if any document fails validation. No documents are ingested.

Checking Ingestion Status

Ingestion is asynchronous. Use sdk.memories.status() to poll the processing state of a submitted document.
status = await sdk.memories.status(ingestion_id=response.ingestion_id)

print(f"Status: {status.status}")
print(f"Queued at: {status.queued_at}")
print(f"Started at: {status.started_at}")
print(f"Completed at: {status.completed_at}")

if status.status == "completed":
    print(f"Memory IDs: {status.memory_ids}")
    print(f"Memories created: {status.memories_created}")
elif status.status == "failed":
    print(f"Error: {status.error_message}")

Ingestion Statuses

Status           Description
queued           Document accepted and waiting for processing
processing       Actively being processed through the ingestion pipeline
completed        Successfully ingested; memories are available for retrieval
failed           Processing failed; check error_message for details
partial_success  Some extractions succeeded but others failed; partial results are available
The partial_success status typically occurs with large documents where some chunks process successfully while others encounter extraction errors. The successfully processed portions are still available for retrieval.

Updating Memories

Update an existing memory using sdk.memories.update(). This is useful when the source document has been edited or when you want to append new information.
updated = await sdk.memories.update(
    memory_id=memory_id,
    document="Updated conversation transcript with additional turns...",
    merge_strategy="smart-merge",
    metadata={"updated_at": "2025-03-15", "reason": "new turns added"}
)

Merge Strategies

replace
Completely replaces the existing memory content with the new document. Previous extractions are discarded and re-extracted from the new content. Use when the document has been fully rewritten.

append
Adds the new content to the end of the existing memory. Previous extractions are preserved, and new extractions are generated only from the appended content. Use when adding new turns to a conversation.

smart-merge
Intelligently merges the new content with the existing memory. The pipeline detects overlapping sections, deduplicates extractions, and reconciles conflicting information by preferring the newer version. Use when the document has been partially edited.
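The choice of strategy can be driven by how the source document changed. A sketch, assuming the strategies accept the strings "replace", "append", and "smart-merge" (only "smart-merge" appears in the update example above; the other two strings and the helper are illustrative assumptions):

```python
def choose_merge_strategy(fully_rewritten: bool, new_turns_only: bool) -> str:
    """Pick a merge strategy from how the source document changed."""
    if fully_rewritten:
        return "replace"      # discard and re-extract everything
    if new_turns_only:
        return "append"       # extract only the appended content
    return "smart-merge"      # deduplicate and reconcile overlaps

async def update_with_strategy(sdk, memory_id, document, **change_flags):
    return await sdk.memories.update(
        memory_id=memory_id,
        document=document,
        merge_strategy=choose_merge_strategy(**change_flags),
    )
```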

Deleting Memories

Remove a memory and all its associated extractions permanently.
await sdk.memories.delete(memory_id=memory_id)
Deletion is permanent and irreversible. All extractions, entity associations, and graph relationships derived from this memory are removed. This operation cannot be undone.

Best Practices

  • Always prefix conversation turns with speaker labels (User:, Assistant:, or actual names). The ingestion pipeline uses these labels to correctly attribute preferences, facts, and intents to the right participant.
  • Use stable, deterministic identifiers for user_id and customer_id. These IDs form the basis of scoped retrieval; inconsistent IDs fragment the user’s memory across multiple scopes.
  • When re-ingesting content that may have been submitted before (e.g., webhook retries), always provide a document_id. This prevents duplicate memories and ensures updates are applied cleanly.
  • When backfilling historical conversations or documents, set document_created_at to the original timestamp. This enables accurate temporal reasoning (“What did the user say last month?”).
  • Reserve long-range mode for content where deep understanding matters (user conversations, strategic documents). Use fast mode for high-volume, lower-priority data (automated logs, bulk imports).
  • For bulk ingestion (backfills, migrations), use batch_create() with fail_fast=False. This reduces HTTP overhead and allows the pipeline to optimize scheduling.
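Putting the backfill tips together: historical records can be grouped into batches before submission. This is a sketch; the batch size of 50 and the record dictionary shape are illustrative assumptions.

```python
def chunk(items: list, size: int) -> list[list]:
    """Split a list into consecutive batches of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]

async def backfill(sdk, records):
    """Backfill historical documents in batches (illustrative batch size: 50)."""
    from synap.types import CreateMemoryRequest  # imported lazily for the sketch
    for batch in chunk(records, 50):
        requests = [
            CreateMemoryRequest(
                document=r["text"],
                document_type="document",
                document_id=r["id"],                  # idempotent across retries
                document_created_at=r["created_at"],  # true creation time
                user_id=r["user_id"],
                customer_id=r["customer_id"],
                mode="fast",                          # bulk import: throughput first
            )
            for r in batch
        ]
        await sdk.memories.batch_create(documents=requests, fail_fast=False)
```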

Next Steps

Context Fetch

Query the memories you have ingested for contextual retrieval.

Entity Resolution

Understand how entities are automatically resolved during ingestion.

Context Compaction

Compress long conversations to reduce token costs.

SDK Configuration

Configure SDK behavior, timeouts, and retry policies.