N
Nexus API Referencev2.4.1

Folder upload into a space chat

End-to-end demo for unique_sdk 0.10.x: ships an inline Space.create_chat polyfill (drops once you upgrade), mints a chat via POST /space/chat with no bootstrap message, uploads every file in a local folder inheriting the space's ingestion settings, waits for parallel ingestion, then submits the instruction prompt.

GET/v2/unique-sdk-demo/folder-upload-into-a-space-chatRequires authentication

Code Example

python
"""
Script 3: Space API — Folder upload into a space chat

Demonstrates the create → upload → wait → instruct pattern needed when
your use case requires documents to be in the chat *before* the first real
instruction is sent.

The Unique SDK 0.10.x line has no Space.create_chat() classmethod, but the
underlying endpoint (POST /space/chat → {id, title, createdAt}) is already
live on the platform. The polyfill block at the top of this file attaches
sync + async classmethods to Space iff they aren't already there, so the
demo below can call unique_sdk.Space.create_chat(...) using the same API
that newer SDKs ship natively. Once you upgrade past 0.10.x, the hasattr
guards make the patch a no-op — and you can delete the block entirely.

  1. Mints a chat via POST /space/chat (no bootstrap message in history)
  2. Uploads every file in a local folder, scoped to that chatId
     (no ingestionConfig passed, so the space's defaults are inherited)
  3. Waits for ingestion (parallel server-side) before instructing
  4. Sends the user instruction with the now-populated chat

Usage:
    uv run python 03_space_api.py <folder_path> "<instruction prompt>"
"""

import mimetypes
import os
import sys
import time
from pathlib import Path
from typing import cast

import httpx
from dotenv import load_dotenv

import unique_sdk
from unique_sdk.api_resources._space import Space


# --- Space.create_chat polyfill (drop once your SDK ships it natively) ---
# Mirrors the upstream classmethod: POST /space/chat with {title, assistantId}
# returning {id, title, createdAt}. Headers, auth, retries, and api_base all
# flow through the existing SDK request layer.
def _create_chat(cls, user_id: str, company_id: str, **params):
    return cast(
        "Space.ChatResult",
        cls._static_request(
            "post", "/space/chat", user_id, company_id, params=params,
        ),
    )


async def _create_chat_async(cls, user_id: str, company_id: str, **params):
    return cast(
        "Space.ChatResult",
        await cls._static_request_async(
            "post", "/space/chat", user_id, company_id, params=params,
        ),
    )


if not hasattr(Space, "create_chat"):
    Space.create_chat = classmethod(_create_chat)  # type: ignore[attr-defined]

if not hasattr(Space, "create_chat_async"):
    Space.create_chat_async = classmethod(_create_chat_async)  # type: ignore[attr-defined]
# --- end polyfill ---


load_dotenv()

unique_sdk.api_key = os.environ["UNIQUE_API_KEY"]
unique_sdk.app_id = os.environ["UNIQUE_APP_ID"]

USER_ID = os.environ["UNIQUE_USER_ID"]
COMPANY_ID = os.environ["UNIQUE_COMPANY_ID"]

ASSISTANT_ID = os.environ.get("UNIQUE_ASSISTANT_ID", "assistant_...")

POLL_INTERVAL = 1.0
REPLY_MAX_WAIT = 60.0
INGESTION_MAX_WAIT_CAP = 120.0


def _ingestion_max_wait(num_files: int) -> float:
    # Ingestion runs concurrently server-side, so wall-clock grows with the
    # slowest file, not the sum. Bump slightly per file to absorb queueing.
    return min(60.0 + 20.0 * max(num_files - 1, 0), INGESTION_MAX_WAIT_CAP)


def _wait_for_assistant_reply(chat_id: str) -> str:
    """Poll Space.get_latest_message until the assistant stops streaming."""
    deadline = time.monotonic() + REPLY_MAX_WAIT
    while time.monotonic() < deadline:
        latest = unique_sdk.Space.get_latest_message(
            user_id=USER_ID,
            company_id=COMPANY_ID,
            chat_id=chat_id,
        )
        if latest.get("stoppedStreamingAt") is not None:
            return latest.get("text") or ""
        time.sleep(POLL_INTERVAL)
    raise TimeoutError(f"Timed out waiting for assistant reply in chat {chat_id}")


def create_chat(title: str = "Folder upload session") -> str:
    """
    Mint a new chat in the space without sending a message.

    Resolves to the upstream classmethod on newer SDKs and to the polyfill
    declared at the top of this file on 0.10.x — the call site is identical
    either way.
    """
    print("\n--- Creating new chat ---")
    chat = unique_sdk.Space.create_chat(
        user_id=USER_ID,
        company_id=COMPANY_ID,
        title=title,
        assistantId=ASSISTANT_ID,
    )
    chat_id = chat["id"]
    print(f"  Chat ID: {chat_id}")
    return chat_id


def send_message(chat_id: str, text: str) -> str:
    """Send a message and wait for the assistant's full reply."""
    print("\n--- Sending message ---")
    print(f"  User: {text}")
    unique_sdk.Space.create_message(
        user_id=USER_ID,
        company_id=COMPANY_ID,
        assistantId=ASSISTANT_ID,
        chatId=chat_id,
        text=text,
    )
    reply = _wait_for_assistant_reply(chat_id)
    print(f"  Assistant: {reply[:500]}")
    return reply


def upload_document(chat_id: str, file_path: str):
    """
    Upload a single document scoped to a chat. No ingestionConfig is passed,
    so the space's configured ingestion settings are inherited. Returns the
    Content object; caller is responsible for waiting on ingestion.
    """
    file_name = os.path.basename(file_path)
    mime_type, _ = mimetypes.guess_type(file_name)
    mime_type = mime_type or "application/octet-stream"

    print(f"  -> {file_name} ({mime_type})")
    content = unique_sdk.Content.upsert(
        user_id=USER_ID,
        company_id=COMPANY_ID,
        chatId=chat_id,
        input={
            "key": file_name,
            "title": file_name,
            "mimeType": mime_type,
        },
    )

    write_url = content.writeUrl
    if not write_url:
        raise RuntimeError(f"Content.upsert returned no writeUrl for {file_name}")

    with open(file_path, "rb") as f:
        file_bytes = f.read()

    httpx.put(
        write_url,
        content=file_bytes,
        headers={"Content-Type": mime_type, "x-ms-blob-type": "BlockBlob"},
    ).raise_for_status()

    print(f"     uploaded {len(file_bytes)} bytes, content {content.id}")
    return content


def upload_folder(chat_id: str, folder_path: str) -> list:
    """Upload every (non-hidden, non-directory) file in a folder, in name
    order. Returns the list of Content objects."""
    folder = Path(folder_path)
    if not folder.is_dir():
        raise NotADirectoryError(f"Not a folder: {folder_path}")

    files = sorted(
        p for p in folder.iterdir()
        if p.is_file() and not p.name.startswith(".")
    )
    if not files:
        raise FileNotFoundError(f"No files to upload in: {folder_path}")

    print(f"\n--- Uploading {len(files)} document(s) from {folder} ---")
    return [upload_document(chat_id, str(p)) for p in files]


def wait_for_ingestion(chat_id: str, content_ids: list[str]):
    """
    Poll until every content has finished parse/chunk/embed. The platform
    ingests in parallel, so the timeout scales gently with N and is capped
    at INGESTION_MAX_WAIT_CAP.
    """
    max_wait = _ingestion_max_wait(len(content_ids))
    print(f"\n--- Waiting for ingestion of {len(content_ids)} doc(s) (≤{max_wait:.0f}s) ---")

    pending = set(content_ids)
    deadline = time.monotonic() + max_wait

    while pending and time.monotonic() < deadline:
        for cid in list(pending):
            results = unique_sdk.Content.search(
                user_id=USER_ID,
                company_id=COMPANY_ID,
                where={"id": {"equals": cid}},
                chatId=chat_id,
                includeFailedContent=True,
            )
            if not results:
                continue
            state = results[0].get("ingestionState")
            if state == "FINISHED":
                print(f"  {cid}: FINISHED")
                pending.discard(cid)
            elif isinstance(state, str) and state.startswith("FAILED"):
                raise RuntimeError(f"Ingestion failed for {cid}: {state}")
        if pending:
            time.sleep(POLL_INTERVAL)

    if pending:
        raise TimeoutError(f"Ingestion timed out for: {sorted(pending)}")


def get_space_info(space_id: str):
    """Retrieve and display space configuration."""
    print(f"\n--- Space info: {space_id} ---")
    space = unique_sdk.Space.get_space(
        user_id=USER_ID,
        company_id=COMPANY_ID,
        space_id=space_id,
    )
    print(f"  Name: {space.name}")
    print(f"  Language model: {space.languageModel}")
    print(f"  Chat upload: {space.chatUpload}")
    print(f"  Modules: {[m['name'] for m in space.modules]}")
    return space


def get_history(chat_id: str):
    """Retrieve the full message history for a chat."""
    print("\n--- Chat history ---")
    result = unique_sdk.Space.get_chat_messages(
        user_id=USER_ID,
        company_id=COMPANY_ID,
        chat_id=chat_id,
    )
    for msg in result["messages"]:
        role = msg["role"]
        text = msg.get("text") or "(empty)"
        print(f"  [{role}] {text[:200]}")
    return result


if __name__ == "__main__":
    if len(sys.argv) < 3:
        print('Usage: uv run python 03_space_api.py <folder_path> "<instruction prompt>"')
        sys.exit(1)

    folder_path = sys.argv[1]
    instruction = sys.argv[2]

    chat_id = create_chat()
    contents = upload_folder(chat_id, folder_path)
    wait_for_ingestion(chat_id, [c.id for c in contents])
    send_message(chat_id, instruction)
    get_history(chat_id)
Last updated: May 14, 2026python