Go SDK

Idiomatic Go for MUXI

Build Go applications that interact with MUXI formations. Full support for chat, streaming, context cancellation, and all Formation API operations.

GitHub: muxi-ai/muxi-go

Installation

go get github.com/muxi-ai/muxi-go

Quick Start

package main

import (
    "context"
    "fmt"
    "log"
    "os"

    muxi "github.com/muxi-ai/muxi-go"
)

func main() {
    ctx := context.Background()

    client := muxi.NewFormationClient(&muxi.FormationConfig{
        FormationID: "my-assistant",
        ServerURL:   os.Getenv("MUXI_SERVER_URL"),
        ClientKey:   os.Getenv("MUXI_CLIENT_KEY"),
    })

    resp, err := client.Chat(ctx, &muxi.ChatRequest{
        Message: "Hello!",
        UserID:  "user_123",
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(resp.Response)
}

Formation Client

Initialize

import muxi "github.com/muxi-ai/muxi-go"

// Production (via server proxy)
client := muxi.NewFormationClient(&muxi.FormationConfig{
    FormationID: "my-assistant",
    ServerURL:   "http://localhost:7890",
    ClientKey:   "your_client_key",
    AdminKey:    "your_admin_key",  // Optional, for admin operations
})

// Local development (with muxi up)
client := muxi.NewFormationClient(&muxi.FormationConfig{
    FormationID: "my-assistant",
    ServerURL:   "http://localhost:7890",
    Mode:        "draft",  // Uses /draft/ prefix instead of /api/
    ClientKey:   "your_client_key",
})

// Direct connection (bypasses server proxy)
client := muxi.NewFormationClient(&muxi.FormationConfig{
    URL:       "http://localhost:8001",  // Direct to formation port
    ClientKey: "your_client_key",
})

Local dev workflow: Use Mode: "draft" during development with muxi up, then remove it when deploying to production.

Chat

// Simple chat
resp, err := client.Chat(ctx, &muxi.ChatRequest{
    Message: "Hello!",
    UserID:  "user_123",
})
if err != nil {
    log.Fatal(err)
}
fmt.Println(resp.Response)

Streaming

stream, errs := client.ChatStream(ctx, &muxi.ChatRequest{
    Message: "Tell me a story",
    UserID:  "user_123",
})

for stream != nil {
    select {
    case chunk, ok := <-stream:
        if !ok {
            stream = nil
            continue
        }
        if chunk.Type == "text" {
            fmt.Print(chunk.Text)
        }
    case err := <-errs:
        if err != nil {
            log.Fatal(err)
        }
        errs = nil
    }
}

Chunk types: text, tool_call, tool_result, agent_handoff, thinking, error, done.

Memory

// Get memory config
config, err := client.GetMemoryConfig(ctx)

// Get memories for user
memories, err := client.GetMemories(ctx, "user_123")

// Add a memory (ctx, userID, memType, detail)
err = client.AddMemory(ctx, "user_123", "preference", "User prefers Go")

// Clear user buffer
err = client.ClearUserBuffer(ctx, "user_123")

// Get buffer stats
stats, err := client.GetBufferStats(ctx)

Scheduler

// List scheduled jobs
jobs, err := client.GetSchedulerJobs(ctx, "user_123")

// Create a job (ctx, jobType, schedule, message, userID)
job, err := client.CreateSchedulerJob(ctx, "prompt", "0 9 * * *", "Generate daily summary", "user_123")

// Delete a job
err = client.DeleteSchedulerJob(ctx, "job_abc123")

Sessions & Requests

// Get sessions
sessions, err := client.GetSessions(ctx, "user_123")

// Get request status
status, err := client.GetRequestStatus(ctx, "req_abc123")

// Cancel a request
err = client.CancelRequest(ctx, "req_abc123")

// Stream request events
stream, errs := client.StreamRequest(ctx, "req_abc123")

Event & Log Streaming

// Stream events for a user
events, errs := client.StreamEvents(ctx, "user_123")
for event := range events {
    fmt.Println(event)
}

// Stream logs (admin)
logs, errs := client.StreamLogs(ctx, &muxi.LogFilter{Level: "info"})
for log := range logs {
    fmt.Println(log)
}

Server Client

For managing formations (deploy, start, stop):

import muxi "github.com/muxi-ai/muxi-go"

server := muxi.NewServerClient(&muxi.ServerConfig{
    URL:       os.Getenv("MUXI_SERVER_URL"),
    KeyID:     os.Getenv("MUXI_KEY_ID"),
    SecretKey: os.Getenv("MUXI_SECRET_KEY"),
})

// Check server status
status, err := server.Status(ctx)
fmt.Println(status)

// List formations
formations, err := server.ListFormations(ctx)

// Deploy a formation
res, err := server.DeployFormation(ctx, &muxi.DeployRequest{
    BundlePath: "my-bot.tar.gz",
})
fmt.Println("Deployed:", res.FormationID)

// Stop/start/restart
server.StopFormation(ctx, "my-bot")
server.StartFormation(ctx, "my-bot")
server.RestartFormation(ctx, "my-bot")

// Get server logs
logs, err := server.GetServerLogs(ctx, 200)

Error Handling

resp, err := client.Chat(ctx, &muxi.ChatRequest{Message: "Hello!", UserID: "u1"})
if err != nil {
    var authErr *muxi.AuthenticationError
    var notFound *muxi.NotFoundError
    var rateLimit *muxi.RateLimitError

    switch {
    case errors.As(err, &authErr):
        log.Println("Authentication failed")
    case errors.As(err, &notFound):
        log.Println("Not found:", notFound.Message)
    case errors.As(err, &rateLimit):
        log.Printf("Rate limited, retry after %d seconds", rateLimit.RetryAfter)
    default:
        log.Fatal(err)
    }
}

Error types: AuthenticationError, AuthorizationError, NotFoundError, ValidationError, RateLimitError, ServerError, ConnectionError.

Webhook Handlers

Handle incoming async/trigger webhook callbacks with signature verification:

import "github.com/muxi-ai/muxi-go/webhook"

func handleWebhook(w http.ResponseWriter, r *http.Request) {
    payload, _ := io.ReadAll(r.Body)
    sig := r.Header.Get("X-Muxi-Signature")
    
    // Verify signature (prevents spoofing and replay attacks)
    if err := webhook.VerifySignature(payload, sig, webhookSecret); err != nil {
        http.Error(w, "Invalid signature", http.StatusUnauthorized)
        return
    }
    
    // Parse into typed WebhookEvent
    event, err := webhook.Parse(payload)
    if err != nil {
        http.Error(w, "Invalid payload", http.StatusBadRequest)
        return
    }
    
    switch event.Status {
    case "completed":
        for _, item := range event.Content {
            if item.Type == "text" {
                fmt.Println(item.Text)
            }
        }
    case "failed":
        fmt.Printf("Error: %s - %s
", event.Error.Code, event.Error.Message)
    case "awaiting_clarification":
        fmt.Printf("Question: %s
", event.Clarification.Question)
    }
    
    w.WriteHeader(http.StatusOK)
}

WebhookEvent fields:

  • RequestID, Status, Timestamp
  • Content - Slice of ContentItem (Type, Text, File)
  • Error - *ErrorDetails (Code, Message, Trace)
  • Clarification - *Clarification (Question, ClarificationRequestID)
  • FormationID, UserID, ProcessingTime, Raw

Configuration

  • Default timeout: 30s (no timeout for streaming)
  • Retries: GET/DELETE only, exponential backoff (500ms × 2, max 30s, ±10% jitter)
  • Idempotency: X-Muxi-Idempotency-Key auto-generated on every request
  • Context: All methods accept context.Context for cancellation

Examples

Chat Bot

package main

import (
    "bufio"
    "context"
    "fmt"
    "os"
    "strings"

    muxi "github.com/muxi-ai/muxi-go"
)

func main() {
    ctx := context.Background()
    client := muxi.NewFormationClient(&muxi.FormationConfig{
        FormationID: "my-assistant",
        ServerURL:   os.Getenv("MUXI_SERVER_URL"),
        ClientKey:   os.Getenv("MUXI_CLIENT_KEY"),
    })

    reader := bufio.NewReader(os.Stdin)
    userID := "user_123"

    for {
        fmt.Print("You: ")
        input, _ := reader.ReadString('
')
        input = strings.TrimSpace(input)

        if input == "quit" {
            break
        }

        fmt.Print("Assistant: ")
        stream, errs := client.ChatStream(ctx, &muxi.ChatRequest{
            Message: input,
            UserID:  userID,
        })

        for stream != nil {
            select {
            case chunk, ok := <-stream:
                if !ok {
                    stream = nil
                    continue
                }
                if chunk.Type == "text" {
                    fmt.Print(chunk.Text)
                }
            case err := <-errs:
                if err != nil {
                    fmt.Println("Error:", err)
                }
                errs = nil
            }
        }
        fmt.Println()
    }
}

HTTP Handler

package main

import (
    "context"
    "encoding/json"
    "net/http"
    "os"

    muxi "github.com/muxi-ai/muxi-go"
)

var client = muxi.NewFormationClient(&muxi.FormationConfig{
    FormationID: os.Getenv("MUXI_FORMATION_ID"),
    ServerURL:   os.Getenv("MUXI_SERVER_URL"),
    ClientKey:   os.Getenv("MUXI_CLIENT_KEY"),
})

func chatHandler(w http.ResponseWriter, r *http.Request) {
    var req struct {
        Message string json:"message"
        UserID  string json:"user_id"
    }
    json.NewDecoder(r.Body).Decode(&req)

    resp, err := client.Chat(r.Context(), &muxi.ChatRequest{
        Message: req.Message,
        UserID:  req.UserID,
    })
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    json.NewEncoder(w).Encode(map[string]string{
        "response": resp.Response,
    })
}

func main() {
    http.HandleFunc("/chat", chatHandler)
    http.ListenAndServe(":3000", nil)
}

Learn More