Async Processing

Let users keep working while agents handle big tasks

Some tasks take time: major research, multi-step workflows, document generation. Users shouldn't have to sit and wait. With async processing, MUXI accepts the request, works in the background, and notifies users when it's done.

Why Async?

Async processing is a productivity feature, not a technical workaround.

Synchronous (user waits):

User:  "Research AI trends and create comprehensive report"
         ↓
User waits... (30 seconds)
         ↓
User waits... (60 seconds)
         ↓
User waits... (2 minutes)
         ↓
Finally: "Here's your report"

User: lost 2 minutes staring at a spinner

Asynchronous (user keeps working):

User:  "Research AI trends and create comprehensive report"
         ↓
MUXI: "Got it. I'll notify you when it's ready."
         ↓
User continues with other work
         ↓
[MUXI works in background]
         ↓
Webhook/notification: "Your report is ready!"

User: stayed productive the whole time

When MUXI Uses Async

MUXI switches to async automatically, based on the configured threshold:

async:
  threshold_seconds: 60          # Default: 60 seconds

If the estimated execution time is at or above the threshold, MUXI uses async mode automatically.

Typical async operations:

  • ✅ Complex research (5+ sources)
  • ✅ Multi-step workflows (decomposed tasks)
  • ✅ Large file processing
  • ✅ External API calls (slow responses)
  • ✅ Database queries (large datasets)

How It Works

User Request → Overlord
         ↓
Estimate execution time:
  - Complexity score
  - Number of subtasks
  - Tool requirements
  - Historical data
         ↓
Estimated time ≥ threshold?
         ↓
    YES: Async Mode
      - Return request_id immediately
      - Execute in background
      - Notify via webhook (optional)
         ↓
    NO: Sync Mode
      - Execute immediately
      - Return result when done

Execution Time Estimation

MUXI analyzes:

Factor               Time Added
Research required    +5-15s per source
Complex analysis     +10-30s
Tool execution       +1-10s per tool
File generation      +5-20s per file
External API calls   +2-10s per call
Agent coordination   +1-5s per agent

Example:

Request: "Research 3 competitors, compare features, create report"

Estimated breakdown:
- Research 3 companies: 3 × 10s = 30s
- Feature comparison: 15s
- Report generation: 10s
- Agent coordination: 5s

Total estimate: 60s ≥ 60s threshold (default)
→ Use async mode
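
The arithmetic above can be sketched in a few lines of Python. This only illustrates summing per-step estimates and comparing the total to the threshold; the `estimate_seconds` and `choose_mode` names are hypothetical, and the Overlord's actual estimator (which also weighs complexity and historical data) is not shown here.

```python
# Hypothetical sketch: sum per-step estimates, then compare to the threshold.
# Step names and durations mirror the example above; they are not MUXI internals.

def estimate_seconds(steps: dict[str, float]) -> float:
    """Return the total estimated execution time in seconds."""
    return sum(steps.values())


def choose_mode(estimated: float, threshold_seconds: float = 60) -> str:
    """Pick async when the estimate is at or above the threshold."""
    return "async" if estimated >= threshold_seconds else "sync"


steps = {
    "research_3_companies": 3 * 10,  # 3 sources at roughly 10s each
    "feature_comparison": 15,
    "report_generation": 10,
    "agent_coordination": 5,
}

total = estimate_seconds(steps)
print(total, choose_mode(total))  # 60 async
```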

Sync vs Async

Synchronous (Default)

Best for:

  • Quick questions (<10 seconds)
  • Interactive conversations
  • Real-time responses needed

Example:

User:  "What's the weather in SF?"
         ↓
MUXI executes immediately
         ↓
Response: "It's 68°F and sunny" (2 seconds)

Asynchronous

Best for:

  • Long-running tasks (>10 seconds)
  • Background processing
  • Batch operations
  • Webhook triggers

Example:

User:  "Analyze entire codebase and create security report"
         ↓
MUXI: "Task started. ID: req_abc123"
         ↓
[Executes in background]
         ↓
Webhook: "Task complete! Security report ready."

Configuration

Async Modes

There are three ways to control async behavior:

# Mode 1: Let Overlord decide (RECOMMENDED)
async:
  mode: auto                     # Default - Overlord estimates time
  threshold_seconds: 60          # Switch to async if estimated ≥ 60s

# Mode 2: Everything synchronous (not recommended)
async:
  mode: sync                     # Always wait for completion

# Mode 3: Everything asynchronous (not recommended)
async:
  mode: async                    # Always return request_id immediately

Mode 1 (auto) is recommended because the Overlord intelligently estimates how long each request will take. Quick questions get immediate answers; complex tasks go async.

Per-request override: You can also pass async: true or async: false in individual requests to override the formation setting.
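
One way to picture how these settings combine is the small resolver below. It is a sketch under the assumption that a per-request `async` flag always wins over the formation's `mode`, as the paragraph above describes; `resolve_mode` and its parameters are hypothetical names, not part of the MUXI SDK.

```python
from typing import Optional


def resolve_mode(
    formation_mode: str,
    estimated_seconds: float,
    threshold_seconds: float = 60,
    request_async: Optional[bool] = None,
) -> str:
    """Return "sync" or "async" for a single request."""
    # A per-request flag, when present, overrides the formation setting.
    if request_async is not None:
        return "async" if request_async else "sync"
    # The fixed modes ignore the time estimate entirely.
    if formation_mode in ("sync", "async"):
        return formation_mode
    if formation_mode != "auto":
        raise ValueError(f"unknown async mode: {formation_mode!r}")
    # Auto mode: compare the estimate to the threshold.
    return "async" if estimated_seconds >= threshold_seconds else "sync"


print(resolve_mode("auto", estimated_seconds=5))                      # sync
print(resolve_mode("auto", estimated_seconds=90))                     # async
print(resolve_mode("sync", estimated_seconds=90, request_async=True)) # async
```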

Threshold Configuration

async:
  threshold_seconds: 60          # Default: 60 seconds

The Overlord compares estimated execution time to this threshold:

  • Below threshold → synchronous (user waits)
  • At or above threshold → async (immediate response with request ID)

Adjust threshold based on UX needs:

Threshold  When to Use
10-30s     Mobile apps, impatient users
60s        Default, balanced experience
120-300s   Desktop apps, tolerant users

Webhook Configuration

Set up automatic notifications when async tasks complete:

async:
  webhook_url: "https://your-app.com/muxi-callback"  # Default webhook
  webhook_retries: 3              # Retry failed deliveries (default: 3)
  webhook_timeout: 10             # Timeout per delivery attempt in seconds (default: 10)

The webhook URL can also be specified per-request in the API call.
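
To make the retry and timeout settings concrete, here is a rough Python sketch of a delivery loop. It assumes `webhook_retries` counts retries after the first attempt and that no delay is inserted between attempts; neither detail is confirmed above. The `deliver_with_retries` function and the injected `send` callable are hypothetical.

```python
from typing import Callable, Optional


def deliver_with_retries(
    send: Callable[[float], bool],
    webhook_retries: int = 3,
    webhook_timeout: float = 10,
) -> Optional[int]:
    """Call send(timeout) until it succeeds.

    Returns the number of attempts used, or None if every attempt failed.
    """
    # One initial attempt plus up to webhook_retries retries (an assumption).
    for attempt in range(1, webhook_retries + 2):
        try:
            if send(webhook_timeout):
                return attempt
        except OSError:
            # Network errors and timeouts count as a failed delivery.
            pass
    return None


# Simulated endpoint: two failed deliveries, then a success.
outcomes = iter([False, False, True])
attempts = deliver_with_retries(lambda timeout: next(outcomes))
print(attempts)  # 3
```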

Using Async Mode

1. Explicit Async Request

API:

curl -X POST http://localhost:8001/v1/chat \
  -H "X-Muxi-Client-Key: fmc_..." \
  -d '{
    "message": "Research AI trends",
    "async": true
  }'

Response (immediate):

{
  "request_id": "req_abc123",
  "status": "processing",
  "estimated_time": "45-60 seconds"
}

2. Check Status

Poll for completion:

curl http://localhost:8001/v1/requests/req_abc123 \
  -H "X-Muxi-Client-Key: fmc_..."

Response:

{
  "request_id": "req_abc123",
  "status": "processing",
  "progress": 60,
  "message": "Analyzing third data source..."
}

When complete:

{
  "request_id": "req_abc123",
  "status": "completed",
  "result": {
    "text": "Based on my research...",
    "artifacts": ["ai_trends_report.pdf"]
  },
  "duration_ms": 47320
}
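
A polling loop should stop on every terminal state, not just `completed`, and should give up eventually. The sketch below shows one way to do that with a caller-supplied fetch function; `poll_until_done` is a hypothetical helper, and the terminal states are taken from the Request States table in this document.

```python
import time
from typing import Callable

# Terminal states from the Request States table.
TERMINAL_STATES = {"completed", "failed", "cancelled"}


def poll_until_done(
    fetch_status: Callable[[], dict],
    interval: float = 2.0,
    timeout: float = 300.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call fetch_status() until the request reaches a terminal state."""
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if status.get("status") in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError("request did not finish before the polling deadline")
        sleep(interval)


# Simulated status responses standing in for GET /v1/requests/<id>.
responses = iter([
    {"status": "processing", "progress": 60},
    {"status": "completed", "result": {"text": "Based on my research..."}},
])
final = poll_until_done(lambda: next(responses), sleep=lambda _: None)
print(final["status"])  # completed
```

In real use, `fetch_status` would wrap the status request shown above; injecting it keeps the loop testable without a running server.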

3. Webhook Notification

Request with webhook:

curl -X POST http://localhost:8001/v1/chat \
  -H "X-Muxi-Client-Key: fmc_..." \
  -d '{
    "message": "Research AI trends",
    "async": true,
    "webhook_url": "https://your-app.com/muxi-callback"
  }'

MUXI posts to your webhook when done:

POST https://your-app.com/muxi-callback
Content-Type: application/json
X-Muxi-Signature: t=1706616000,v1=abc123...

{
  "id": "req_abc123",
  "status": "completed",
  "response": [
    {"type": "text", "text": "Based on my research..."}
  ]
}

4. Handling Webhooks in Your Application

When MUXI sends a webhook callback, you need to:

  1. Verify the signature (security - prevents spoofed requests)
  2. Parse the payload (typed access to response data)

The SDKs provide helpers for both.

Webhook Payload Structure

{
  "id": "req_abc123",
  "status": "completed",
  "timestamp": 1706616000,
  "response": [
    {"type": "text", "text": "Here's your report..."},
    {"type": "file", "file": {"name": "report.pdf", "url": "..."}}
  ],
  "error": null,
  "formation_id": "my-assistant",
  "user_id": "user_123",
  "processing_time": 47.32,
  "processing_mode": "async"
}

Field          Description
id             Request ID
status         completed, failed, or awaiting_clarification
response       Array of content items (text, files)
error          Error details if status is failed
clarification  Question details if status is awaiting_clarification

Python:

# Assumes a FastAPI app; adapt the routing to your framework.
from fastapi import FastAPI, HTTPException, Request
from muxi import webhook

app = FastAPI()
WEBHOOK_SECRET = "..."  # Your webhook secret

@app.post("/webhooks/muxi")
async def handle_webhook(request: Request):
    # Use the raw body: the signature is computed over the exact bytes sent
    payload = await request.body()
    signature = request.headers.get("X-Muxi-Signature")

    # Verify signature (prevents spoofing and replay attacks)
    if not webhook.verify_signature(payload, signature, WEBHOOK_SECRET):
        raise HTTPException(401, "Invalid signature")

    # Parse into typed object
    event = webhook.parse(payload)

    if event.status == "completed":
        for item in event.content:
            if item.type == "text":
                print(item.text)
            elif item.type == "file":
                print(f"File: {item.file['name']}")
    elif event.status == "failed":
        print(f"Error: {event.error.code} - {event.error.message}")
    elif event.status == "awaiting_clarification":
        print(f"Clarification needed: {event.clarification.question}")

    return {"received": True}

TypeScript:

import { webhook } from "@muxi-ai/muxi-typescript";

app.post("/webhooks/muxi", (req, res) => {
    const signature = req.headers["x-muxi-signature"] as string;

    // Verify signature against the raw (unparsed) request body
    if (!webhook.verifySignature(req.rawBody, signature, WEBHOOK_SECRET)) {
        return res.status(401).send("Invalid signature");
    }

    // Parse into typed object
    const event = webhook.parse(req.rawBody);

    if (event.status === "completed") {
        for (const item of event.content) {
            if (item.type === "text") console.log(item.text);
        }
    } else if (event.status === "failed") {
        console.error(`Error: ${event.error?.message}`);
    }

    res.json({ received: true });
});

Go:

import (
    "fmt"
    "io"
    "net/http"

    "github.com/muxi-ai/muxi-go/webhook"
)

func handleWebhook(w http.ResponseWriter, r *http.Request) {
    payload, err := io.ReadAll(r.Body)
    if err != nil {
        http.Error(w, "Cannot read body", http.StatusBadRequest)
        return
    }
    sig := r.Header.Get("X-Muxi-Signature")

    // Verify signature (secret holds your webhook secret)
    if err := webhook.VerifySignature(payload, sig, secret); err != nil {
        http.Error(w, "Invalid signature", http.StatusUnauthorized)
        return
    }

    // Parse into typed object
    event, err := webhook.Parse(payload)
    if err != nil {
        http.Error(w, "Invalid payload", http.StatusBadRequest)
        return
    }

    switch event.Status {
    case "completed":
        for _, item := range event.Content {
            if item.Type == "text" {
                fmt.Println(item.Text)
            }
        }
    case "failed":
        fmt.Printf("Error: %s\n", event.Error.Message)
    }

    w.WriteHeader(http.StatusOK)
}

Signature Verification Details

The X-Muxi-Signature header format: t=<timestamp>,v1=<signature>

  • t: Unix timestamp when webhook was sent
  • v1: HMAC-SHA256 of {timestamp}.{payload} using your webhook secret

Security features:

  • Replay attack prevention: Signatures expire after 5 minutes (configurable)
  • Constant-time comparison: Prevents timing attacks
  • HMAC-SHA256: Cryptographically secure verification

Webhook secret: Use your formation's admin_key or configure a dedicated webhook secret.
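
If you cannot use an SDK helper, the scheme described above can be approximated with the Python standard library. This is a sketch, not the SDK's implementation: it assumes the `v1` value is a lowercase hex digest and that the secret is UTF-8 text, neither of which is stated above. The `sign` and `verify` functions are hypothetical names.

```python
import hashlib
import hmac
import time
from typing import Optional


def sign(payload: bytes, secret: str, timestamp: int) -> str:
    """Build a header value of the form t=<timestamp>,v1=<signature>."""
    signed = f"{timestamp}.".encode() + payload  # "{timestamp}.{payload}"
    digest = hmac.new(secret.encode(), signed, hashlib.sha256).hexdigest()
    return f"t={timestamp},v1={digest}"


def verify(payload: bytes, header: str, secret: str,
           tolerance: int = 300, now: Optional[float] = None) -> bool:
    """Check the timestamp freshness and the HMAC of a signature header."""
    try:
        parts = dict(item.split("=", 1) for item in header.split(","))
        timestamp = int(parts["t"])
        received = parts["v1"]
    except (KeyError, ValueError):
        return False  # Malformed header
    current = time.time() if now is None else now
    if abs(current - timestamp) > tolerance:
        return False  # Too old (or too far in the future): possible replay
    expected = sign(payload, secret, timestamp).split("v1=", 1)[1]
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(expected.encode(), received.encode())


body = b'{"id": "req_abc123", "status": "completed"}'
header = sign(body, "my-secret", 1706616000)
print(verify(body, header, "my-secret", now=1706616010))  # True
print(verify(body, header, "my-secret", now=1706616400))  # False (older than 5 minutes)
```

Always verify against the raw request bytes; re-serializing parsed JSON can change whitespace or key order and invalidate the signature.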

Request States

State       Description
pending     Queued, not started yet
processing  Currently executing
completed   Finished successfully
failed      Error occurred
cancelled   Cancelled by user

Progress Tracking

For long-running tasks:

{
  "request_id": "req_abc123",
  "status": "processing",
  "progress": 45,
  "current_step": "Researching competitor B",
  "total_steps": 5,
  "completed_steps": 2
}

SDK Examples

Python

Polling for async request status:

from muxi import FormationClient
import time

formation = FormationClient(
    server_url="http://localhost:7890",
    formation_id="my-assistant",
    client_key="...",
)

# Streaming chat - long operations return request_id
request_id = None
for event in formation.chat_stream(
    {"message": "Research AI trends"},
    user_id="user_123"
):
    if event.get("request_id"):
        request_id = event.get("request_id")
        print(f"Request ID: {request_id}")

# Poll until the request reaches a terminal state
while request_id:
    status = formation.get_request_status(request_id)
    state = status.get("status")

    if state == "completed":
        print(status.get("result"))
        break
    if state in ("failed", "cancelled"):
        print(f"Request ended with status: {state}")
        break

    print(f"Status: {state}")
    time.sleep(2)

Webhook configuration: Configure webhooks in your formation.afs:

async:
  webhook_url: "https://your-app.com/callback"
  webhook_retries: 3

Async with callback:

def on_complete(result):
    print(f"Done! {result.text}")

def on_progress(status):
    print(f"Progress: {status.progress}%")

# Start with callback
request = formation.chat_async(
    "Research AI trends",
    on_complete=on_complete,
    on_progress=on_progress
)

TypeScript

import { FormationClient } from "@muxi-ai/muxi-typescript";

const formation = new FormationClient({
  serverUrl: "http://localhost:7890",
  formationId: "my-assistant",
  clientKey: "...",
});

// Get request ID from streaming response
let requestId: string;
for await (const chunk of await formation.chatStream(
  { message: "Research AI trends" },
  "user_123"
)) {
  if (chunk.requestId) {
    requestId = chunk.requestId;
    console.log(`Request ID: ${requestId}`);
  }
}

// Poll for completion
while (true) {
  const status = await formation.getRequestStatus(requestId);

  if (status.status === "completed") {
    console.log(status.result);
    break;
  }

  console.log(`Status: ${status.status}`);
  await new Promise(resolve => setTimeout(resolve, 2000));
}

// Async with webhook
await formation.chatAsync("Research AI trends", {
  webhookUrl: "https://your-app.com/callback"
});

Triggers (Default Async)

Webhook triggers default to async:

# GitHub webhook triggers MUXI
POST http://localhost:8001/v1/triggers/github-issue
{
  "action": "opened",
  "issue": {...}
}

# Returns immediately
{
  "request_id": "req_xyz789",
  "status": "processing"
}

# Webhook caller doesn't wait
# MUXI processes in background

Force sync trigger:

POST http://localhost:8001/v1/triggers/github-issue
{
  "data": {...},
  "use_async": false     # Wait for completion
}

Timeout Configuration

Task and workflow timeouts are configured under overlord.workflow:

overlord:
  workflow:
    timeouts:
      task_timeout: 300          # 5 minutes per task (default: 300)
      workflow_timeout: 1800     # 30 minutes total (default: 3600)

What happens on timeout:

Task running for 5 minutes...
         ↓
Timeout reached
         ↓
Task cancelled
         ↓
Status: "failed"
Error: "Task exceeded timeout (300s)"
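
The same pattern (run, cancel at the deadline, report a failure) can be sketched with `asyncio.wait_for`. This illustrates the behavior described above rather than how MUXI enforces timeouts internally; `run_with_timeout` and `slow_task` are hypothetical names.

```python
import asyncio


async def run_with_timeout(coro, task_timeout: float) -> dict:
    """Await coro; report a failed status if it exceeds task_timeout seconds."""
    try:
        result = await asyncio.wait_for(coro, timeout=task_timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the underlying task before raising.
        return {
            "status": "failed",
            "error": f"Task exceeded timeout ({task_timeout:g}s)",
        }
    return {"status": "completed", "result": result}


async def slow_task() -> str:
    await asyncio.sleep(1)  # Stands in for a long-running task
    return "done"


outcome = asyncio.run(run_with_timeout(slow_task(), task_timeout=0.05))
print(outcome)  # {'status': 'failed', 'error': 'Task exceeded timeout (0.05s)'}
```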

Cancelling Requests

Cancel in-flight request:

DELETE /v1/requests/req_abc123

Response:

{
  "request_id": "req_abc123",
  "status": "cancelled",
  "message": "Request cancelled by user"
}

SDK:

formation.cancel_request("req_abc123")

Error Handling

Failed Requests

{
  "request_id": "req_abc123",
  "status": "failed",
  "error": {
    "type": "tool_error",
    "message": "Web search API rate limit exceeded",
    "retryable": true
  }
}

Automatic Retries

Configure retry behavior in overlord workflow settings:

overlord:
  workflow:
    retry:
      max_attempts: 3            # Default: 3
      initial_delay: 1.0         # Seconds before first retry (default: 1.0)
      max_delay: 60.0            # Max retry delay (default: 60.0)
      backoff_factor: 2.0        # Exponential backoff (default: 2.0)

On failure:

Task fails (API timeout)
         ↓
Wait 1 second (initial_delay)
         ↓
Retry (attempt 2/3)
         ↓
Success! Continue processing
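
The retry settings imply a delay schedule that grows exponentially up to a cap. The sketch below computes that schedule under two assumptions that are not confirmed above: `max_attempts` includes the first attempt, and the delay before retry n is `initial_delay * backoff_factor ** (n - 1)`, capped at `max_delay`. The `retry_delays` function is a hypothetical helper.

```python
def retry_delays(
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    backoff_factor: float = 2.0,
) -> list[float]:
    """Return the wait before each retry (max_attempts - 1 retries in total)."""
    delays = []
    for retry in range(max_attempts - 1):
        # Exponential growth, capped at max_delay.
        delays.append(min(initial_delay * backoff_factor ** retry, max_delay))
    return delays


print(retry_delays())                # [1.0, 2.0]
print(retry_delays(max_attempts=9))  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

With the defaults, a task that fails twice waits 1 second, then 2 seconds, before its third and final attempt.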

Use Cases

1. Research & Analysis

# Long research task
request = formation.chat_async(
    "Research top 10 AI companies, analyze products, create comparison report"
)
# Estimated: 2-3 minutes

2. Batch Processing

# Process 100 documents
for doc in documents:
    formation.chat_async(
        f"Analyze {doc.name}",
        webhook_url=f"https://app.com/callback/{doc.id}"
    )
# All process in parallel

3. Scheduled Reports

# Daily report generation
@schedule.daily("09:00")
def generate_report():
    request = formation.chat_async(
        "Generate daily analytics report",
        webhook_url="https://app.com/report-ready"
    )

4. Webhook Integrations

# GitHub webhook → MUXI
@app.post("/github-webhook")
def handle_github(payload):
    request = formation.trigger_async(
        "github-issue-created",
        data=payload
    )
    return {"request_id": request.id}

Best Practices

DO:

  • ✅ Use async for tasks >10 seconds
  • ✅ Implement webhooks (better than polling)
  • ✅ Handle failures gracefully
  • ✅ Set reasonable timeouts
  • ✅ Show progress to users
  • ✅ Provide cancel option

DON'T:

  • ❌ Poll too frequently (use webhooks)
  • ❌ Set timeout too low (causes failures)
  • ❌ Block UI waiting for async tasks
  • ❌ Ignore failure status
  • ❌ Use async for quick tasks (<5s)

Sync vs Async Decision

Use SYNC when:

  • ✅ Task completes in <10 seconds
  • ✅ User needs immediate response
  • ✅ Interactive conversation
  • ✅ Simple queries

Use ASYNC when:

  • ✅ Task takes >10 seconds
  • ✅ Background processing acceptable
  • ✅ Batch operations
  • ✅ Webhook triggers
  • ✅ Resource-intensive tasks

Monitoring

Track async performance:

observability:
  track_async_tasks: true

Metrics:

  • Average async task duration
  • Async task success rate
  • Webhook delivery success
  • Queue depth

Troubleshooting

Task Stuck in "Processing"

# Check timeout settings
overlord:
  workflow:
    timeouts:
      task_timeout: 600          # Increase if needed (default: 300)
      workflow_timeout: 7200     # Increase overall limit (default: 3600)

Webhook Not Receiving Callbacks

1. Check webhook URL is accessible
2. Verify webhook endpoint accepts POST
3. Check firewall/security settings
4. Test with ngrok for local development

Polling Too Slow

# Poll at a steady, moderate interval (but don't spam)
while True:
    status = formation.get_request_status(request_id)
    if status.get("status") in ("completed", "failed", "cancelled"):
        break
    time.sleep(2)  # Poll every 2 seconds (reasonable)
