Mastering MindbricksSSE in Business API
Mastering Mindbricks

Supporting SSE in Business API Design

A comprehensive guide to Server-Sent Events (SSE) support in Mindbricks Business APIs. Covers streaming modes, stream sources, progress events, iterator actions, AI streaming, Kafka progress listeners, and SSE-aware workflow actions — all configured declaratively through patterns.

Supporting SSE in Business API Design

Overview

Every Mindbricks Business API runs over REST by default — one request, one response. But some operations don't fit that model:

  • Large datasets that should be streamed chunk-by-chunk instead of loaded into a single response.
  • Long-running operations (AI generation, bulk processing, image analysis) where the client needs progress updates.
  • External data sources (third-party SSE feeds, WebSocket services, AI token streams) that produce data incrementally.

The SSE layer solves all of these by adding a companion /stream endpoint alongside the existing REST endpoint. Both share the same Business API pipeline — authentication, parameters, authorization, workflow, actions — but the SSE endpoint delivers results progressively over a persistent HTTP connection using the Server-Sent Events protocol.

Everything is declarative. You configure SSE behavior through patterns; the code generator produces the streaming implementation. No custom code required.


Enabling SSE on a Business API

Add sseSettings to any Business API configuration:

{
  "sseSettings": {
    "hasSseController": true,
    "configuration": {
      "responseMode": "stream",
      "timeout": 300000,
      "chunkSize": 100
    }
  }
}

This generates a GET /v1/{resource}/stream endpoint (or GET /v1/{resource}/:id/stream for single-object APIs) alongside the existing REST endpoint.

Configuration Properties

PropertyTypeDefaultDescription
hasSseControllerBooleanfalseEnables the SSE controller for this API
responseMode"stream" | "events""stream"How the SSE endpoint delivers data
timeoutInteger300000Maximum SSE connection duration in milliseconds (5 minutes)
chunkSizeInteger100Rows per chunk when streaming list results
streamSourceStreamSourceConfignullCustom stream source (see below)
kafkaProgressListenersKafkaProgressListener[][]Kafka topic listeners for remote progress

Response Modes

Stream Mode ("stream")

Designed for delivering data in sequential chunks. The client receives a series of chunk events followed by a complete event.

Typical use cases:

  • Streaming a list of database records
  • AI token-by-token text generation
  • Relaying data from an external SSE or WebSocket source
  • File export with progressive delivery

Event sequence:

event: meta
data: { "dataName": "users", "action": "list", "chunkSize": 100, ... }

event: chunk
data: { "index": 0, "users": [...], "count": 100 }

event: chunk
data: { "index": 1, "users": [...], "count": 47 }

event: complete
data: { "totalChunks": 2, "totalRows": 147, "httpStatus": 200 }

The meta event is emitted first and contains session metadata: dataName, action, userId, sessionId, requestId, appVersion, and chunkSize.

The complete event may include additional data from addToOutput() if actions write to the output after the main streaming is done (see Hybrid Stream + Events).

Events Mode ("events")

Designed for long-running operations where the client needs status updates. The pipeline emits progress events at each lifecycle stage, and the final result is delivered as a single result event.

Typical use cases:

  • Multi-step operations (validate → process → enrich → save)
  • Operations where the result is compact but the process is slow

Event sequence:

event: progress
data: { "step": "auth", "message": "Authentication verified" }

event: progress
data: { "step": "params", "message": "Parameters validated" }

event: progress
data: { "step": "mainOperation", "message": "Starting main operation" }

event: progress
data: { "step": "mainOperation", "message": "Main operation completed" }

event: progress
data: { "step": "buildOutput", "message": "Building response" }

event: result
data: { "status": "OK", "user": { ... }, "httpStatus": 200 }

The Pipeline in SSE Mode

Both SSE modes share the same pre-pipeline as REST:

┌─ Pre-Pipeline (identical to REST) ─────────────────┐
│  checkValidLogin → readParameters → transformParams │
│  → checkParameters → checkBasicAuth                 │
│  → buildWhereClause → fetchInstance → checkInstance  │
│  → buildDataClause                                  │
└─────────────────────────────────────────────────────┘

             ┌──────────┴──────────┐
             │  SSE diverges here  │
             └──────────┬──────────┘

          ┌─────────────┴─────────────┐
          │                           │
    Stream Mode                 Events Mode
    ─────────────              ────────────
    meta event                 progress events
    chunk events               at each stage
    ↓                          ↓
    runAfterMainOperation      mainOperation
    addToOutput                runAfterMainOperation
    complete event             buildOutput
                               result event
          │                           │
          └─────────────┬─────────────┘

           runAfterResponse (Kafka events, etc.)

Key Differences from REST

  1. buildOutput() is skipped in stream mode — data is sent incrementally, not assembled into a single response object.
  2. this.output is populated with a lightweight summary (totalChunks, totalRows, elapsedMs) for event publishers and runAfterResponse().
  3. emitProgress() is a no-op in REST mode — actions can call it safely regardless of context.

Stream Sources

For list APIs, streaming is automatic. The system uses cursor-based (keyset) pagination internally, fetching chunkSize rows at a time using createdAt + id as the composite cursor. No configuration needed.

For non-list APIs (or when data comes from outside the database), configure a streamSource:

"streamSource": {
  "sourceType": "iteratorAction",
  "iteratorAction": "chatCompletion"
}

Source Types

libFunction

Uses a registered async generator library function.

{
  "sourceType": "libFunction",
  "libFunctionName": "streamReportData"
}

The lib function receives the ApiManager context (this) and must return an async iterable:

// In your library module
async function* streamReportData(ctx) {
  const rows = await db.fetchLargeDataset(ctx.request.query);
  for (const batch of chunk(rows, 100)) {
    yield batch;
  }
}

iteratorAction

Uses a declared action from the action store (either AiCallAction or IteratorAction). The action must be an async generator.

{
  "sourceType": "iteratorAction",
  "iteratorAction": "chatCompletion"
}

See AiCallAction and IteratorAction below for details.

sseRelay

Connects to an external SSE endpoint and relays its events as chunks.

{
  "sourceType": "sseRelay",
  "relayUrl": "process.env.ANALYTICS_SSE_URL + '/stream?userId=' + this.request.params.userId",
  "relayAuth": "'Bearer ' + process.env.ANALYTICS_API_KEY"
}

relayUrl and relayAuth are MScript expressions compiled at build time. Use process.env for environment variables, this.request.params for route params, this.request.query for query params, and this for the ApiManager instance.

wsRelay

Connects to an external WebSocket endpoint and relays its messages as chunks.

{
  "sourceType": "wsRelay",
  "relayUrl": "process.env.PROCESSOR_WS_URL + '/live/' + this.request.params.jobId",
  "relayAuth": "'Bearer ' + process.env.PROCESSOR_API_KEY"
}

SSE-Aware Actions

emitProgressInSSEContext — Universal Progress Wrapping

Any action in the BusinessApiActionStore can set emitProgressInSSEContext: true. When enabled, the workflow generator wraps the action invocation with progress events:

event: progress → { step: "enrichUser", message: "Starting enrichUser" }
[action executes]
event: progress → { step: "enrichUser", message: "enrichUser completed" }

This is a simple before/after wrapper. It works with any action type and requires no changes to the action itself. In REST mode, emitProgress() is a no-op — the wrapping is harmless.

{
  "actions": {
    "functionCallActions": [{
      "name": "enrichUser",
      "emitProgressInSSEContext": true,
      "callScript": "await LIB.enrichUserProfile(this.user)"
    }]
  }
}

EmitSseEventAction — Custom Named Events

A dedicated action type for emitting arbitrary SSE events during the workflow. Useful for sending structured data at specific pipeline stages.

{
  "actions": {
    "emitSseEventActions": [{
      "name": "emitUserLoaded",
      "eventName": "userLoaded",
      "data": "{ userId: this.user.id, fullname: this.user.fullname }"
    }]
  },
  "workflow": {
    "get": {
      "afterMainGetOperation": ["emitUserLoaded"]
    }
  }
}

The data field is an MScript expression evaluated against the pipeline context. The client receives:

event: userLoaded
data: { "userId": "abc-123", "fullname": "Jane Doe" }

In REST mode, this action is a no-op.

AiCallAction — Auto-Streaming

AiCallAction automatically detects SSE context and streams AI tokens. No flag or extra configuration is needed.

In SSE context: The action calls the AI provider's streaming API, yielding tokens one-by-one. Each token becomes an SSE chunk event to the client.

In REST context: Tokens are collected internally and joined into the full response text. Post-processing (JSON parsing, aiFetchProperty extraction, isArray handling) is applied to the collected result.

{
  "actions": {
    "aiCallActions": [{
      "name": "chatCompletion",
      "model": "gpt-4",
      "promptTemplate": {
        "systemPrompt": "You are a helpful assistant.",
        "userPrompt": "this.request.body.message"
      }
    }]
  },
  "sseSettings": {
    "hasSseController": true,
    "configuration": {
      "responseMode": "stream",
      "streamSource": {
        "sourceType": "iteratorAction",
        "iteratorAction": "chatCompletion"
      }
    }
  }
}

The client sees:

event: meta   → { "dataName": "chat", ... }
event: chunk  → { "index": 0, "chat": "Hello", "count": 1 }
event: chunk  → { "index": 1, "chat": " there", "count": 1 }
event: chunk  → { "index": 2, "chat": "!", "count": 1 }
event: complete → { "totalChunks": 3, "totalRows": 3 }

AiCallAction can also be used as a workflow step (not a stream source). When placed in a workflow milestone:

  • SSE mode: tokens stream to the client via _outputChannel
  • REST mode: tokens are collected into a string and stored in this.context["actionName"]

Supported providers: OpenAI (and all OpenAI-compatible APIs: Ollama, Together, Groq) and Anthropic. The provider is determined from the model field.

RefineByAiAction — Auto-Streaming Text Refinement

Like AiCallAction, RefineByAiAction auto-streams in SSE context. It sends the input text to an AI provider for refinement (grammar correction, summarization, tone adjustment) and yields tokens incrementally.

{
  "actions": {
    "refineByAiActions": [{
      "name": "polishDescription",
      "provider": "openai",
      "inputData": "this.data.description",
      "systemPrompt": "You are a professional editor. Improve clarity.",
      "mode": "rewrite"
    }]
  }
}

IteratorAction — Generic Streaming

A generic action for any async iterable source. The sourceScript is an MScript expression that evaluates to an object implementing Symbol.asyncIterator or Symbol.iterator.

{
  "actions": {
    "iteratorActions": [{
      "name": "streamAnalytics",
      "sourceScript": "LIB.streamAnalyticsData(this.request.query)"
    }]
  },
  "sseSettings": {
    "hasSseController": true,
    "configuration": {
      "responseMode": "stream",
      "streamSource": {
        "sourceType": "iteratorAction",
        "iteratorAction": "streamAnalytics"
      }
    }
  }
}

The library function must return an async iterable:

async function* streamAnalyticsData(query) {
  const cursor = db.createCursor(query);
  while (cursor.hasNext()) {
    yield await cursor.next();
  }
}

LoopAction — Per-Iteration Progress

When emitProgressInSSEContext: true, LoopAction emits a progress event after each iteration with step count and total:

{
  "actions": {
    "loopActions": [{
      "name": "processItems",
      "emitProgressInSSEContext": true,
      "loopFor": "this.data.items",
      "loopItemName": "item",
      "actions": ["enrichItem", "validateItem"]
    }]
  }
}

The client receives:

event: progress → { "step": "processItems", "message": "Starting processItems" }
event: progress → { "step": 1, "total": 20, "message": "Processing 1/20" }
event: progress → { "step": 2, "total": 20, "message": "Processing 2/20" }
...
event: progress → { "step": "processItems", "message": "processItems completed" }

The outer before/after wrapping (from emitProgressInSSEContext) and the inner per-iteration progress work together — the outer events provide the bookends, the inner events provide the granular detail.

CreateBulkCrudAction — Batch Progress

When emitProgressInSSEContext: true, emits progress events showing how many records are being created and how many succeeded:

{
  "actions": {
    "createBulkCrudActions": [{
      "name": "createTickets",
      "emitProgressInSSEContext": true,
      "childObject": "ticket",
      "objects": "this.data.tickets.map(t => ({ ...t, eventId: this.eventId }))"
    }]
  }
}

The client receives:

event: progress → { "total": 50, "message": "Creating 50 records" }
[bulk insert executes]
event: progress → { "created": 50, "total": 50, "message": "Created 50/50 records" }

Hybrid Stream + Events Flow

A powerful pattern: stream data chunks first, then emit additional events from workflow actions that run after the main operation.

When actions placed in afterMainOperation (or similar post-streaming workflow slots) call emitProgress() or write to the output via addToOutput(), their data is included in the complete event's payload.

{
  "sseSettings": {
    "hasSseController": true,
    "configuration": {
      "responseMode": "stream",
      "chunkSize": 5
    }
  },
  "actions": {
    "emitSseEventActions": [{
      "name": "emitSummary",
      "eventName": "summary",
      "data": "{ totalUsers: this.dbResult?.totalCount }"
    }],
    "functionCallActions": [{
      "name": "computeStats",
      "emitProgressInSSEContext": true,
      "callScript": "this.stats = { avgAge: 32 }"
    }],
    "addToResponseActions": [{
      "name": "addStats",
      "key": "stats",
      "value": "this.stats"
    }]
  },
  "workflow": {
    "list": {
      "afterMainListOperation": [
        "emitSummary",
        "computeStats",
        "addStats"
      ]
    }
  }
}

The client sees:

event: meta     → { "dataName": "users", ... }
event: chunk    → { "index": 0, "users": [...], "count": 5 }
event: chunk    → { "index": 1, "users": [...], "count": 3 }
event: summary  → { "totalUsers": 8 }
event: progress → { "step": "computeStats", "message": "Starting computeStats" }
event: progress → { "step": "computeStats", "message": "computeStats completed" }
event: complete → { "totalChunks": 2, "totalRows": 8, "stats": { "avgAge": 32 } }

Data added via addToOutput() is merged into the complete event, giving the client both the streamed data and any post-processing results.


Kafka Progress Listeners

For operations that trigger remote processes (background jobs, external services, cron tasks), the SSE connection can stay open after the pipeline completes and listen to Kafka topics for progress updates.

{
  "sseSettings": {
    "hasSseController": true,
    "configuration": {
      "responseMode": "events",
      "kafkaProgressListeners": [{
        "topic": "image-processing-progress",
        "correlationField": "dbResult.jobId",
        "kafkaFilterField": "jobId",
        "eventName": "processingProgress",
        "payloadMapping": "{ percent: msg.progress, status: msg.status }",
        "completeWhen": "msg.status === 'completed'"
      }]
    }
  }
}

How It Works

  1. The pipeline executes normally (events or stream mode).
  2. After the pipeline completes, instead of closing the SSE connection, the controller subscribes to the configured Kafka topics.
  3. Each incoming Kafka message is checked:
    • The kafkaFilterField in the message payload is compared to the correlationField evaluated from the pipeline context.
    • Only matching messages pass through.
  4. Matching messages are transformed via payloadMapping (MScript) and emitted as SSE events with the configured eventName.
  5. If completeWhen evaluates to true, the listener stops and the SSE connection closes.
  6. The connection also closes on timeout or client disconnect.

Configuration Properties

PropertyTypeRequiredDescription
topicStringYesKafka topic to subscribe to
correlationFieldMScriptYesExpression to extract correlation ID from pipeline context
kafkaFilterFieldStringYesField in Kafka message payload to match against correlation ID
eventNameStringYesSSE event name for relayed messages
payloadMappingMScriptNoTransform Kafka payload before sending. msg is the full message. If null, entire payload is forwarded.
completeWhenMScriptNoCondition to stop listening. msg is the current message. If null, listens until timeout or disconnect.

Error Handling

Errors at any pipeline stage are caught and delivered as SSE events:

event: error
data: { "message": "User not found", "code": "SSE_ERROR", "httpStatus": 404 }

If headers haven't been sent yet (error during init() or before the first SSE write), the error is thrown as a normal HTTP error response.

The SSE connection includes:

  • Timeout protection: Connections are closed after timeout milliseconds with an SSE_TIMEOUT error event.
  • Client disconnect handling: When the client closes the connection, Kafka listeners are cleaned up and any in-progress abort signal is set.
  • Abort propagation: this._aborted is set to true, which stops list streaming loops and stream source consumption.

SSE-Aware Action Summary

Action TypeSSE BehaviorConfiguration
AiCallActionAuto-streams AI tokens as chunksAutomatic — no flag needed
RefineByAiActionAuto-streams refined text tokensAutomatic — no flag needed
IteratorActionStreams any async iterablesourceScript MScript expression
EmitSseEventActionEmits custom named eventseventName + data MScript
LoopActionPer-iteration progress (step/total)emitProgressInSSEContext: true
CreateBulkCrudActionBefore/after batch count progressemitProgressInSSEContext: true
Any other actionBefore/after "Starting…" / "…completed"emitProgressInSSEContext: true

All SSE-specific behavior is no-op in REST mode. Actions can be used in APIs that have both REST and SSE endpoints without any conditional logic.


Client-Side Integration

JavaScript (EventSource)

const token = "Bearer eyJ...";
const evtSource = new EventSource(
  "https://api.example.com/v1/users/stream",
  { headers: { Authorization: token } }
);

evtSource.addEventListener("meta", (e) => {
  const meta = JSON.parse(e.data);
  console.log("Stream started:", meta.dataName);
});

evtSource.addEventListener("chunk", (e) => {
  const chunk = JSON.parse(e.data);
  appendToTable(chunk.users);
});

evtSource.addEventListener("progress", (e) => {
  const progress = JSON.parse(e.data);
  updateProgressBar(progress);
});

evtSource.addEventListener("complete", (e) => {
  const result = JSON.parse(e.data);
  console.log(`Done: ${result.totalRows} rows in ${result.totalChunks} chunks`);
  evtSource.close();
});

evtSource.addEventListener("error", (e) => {
  if (e.data) {
    const err = JSON.parse(e.data);
    showError(err.message);
  }
  evtSource.close();
});

Note: The native EventSource API does not support custom headers. For authenticated SSE endpoints, use a library like eventsource (Node.js), fetch-event-source (browser), or pass the token as a query parameter.

cURL

curl -N -H "Authorization: Bearer $TOKEN" \
  http://localhost:3001/v1/users/stream

Postman

  1. Create a new request with method GET.
  2. Set the URL to the /stream endpoint.
  3. Add the Authorization header.
  4. Send the request — Postman will display SSE events in the response body.

Known issue: When sending the same SSE request a second time in the same Postman tab, only the last event may be displayed. Open a new tab or close/reopen the response panel between requests.


Full Example: AI Chat with Progress

A complete configuration for an AI chat API that streams tokens and emits post-generation events:

{
  "apiOptions": {
    "name": "chatWithAI",
    "crudType": "create",
    "dataObjectName": "conversation",
    "raiseApiEvent": false
  },
  "sseSettings": {
    "hasSseController": true,
    "configuration": {
      "responseMode": "stream",
      "streamSource": {
        "sourceType": "iteratorAction",
        "iteratorAction": "generateResponse"
      }
    }
  },
  "actions": {
    "aiCallActions": [{
      "name": "generateResponse",
      "model": "gpt-4",
      "promptTemplate": {
        "systemPrompt": "You are a helpful assistant.",
        "userPrompt": "this.request.body.message"
      }
    }],
    "functionCallActions": [{
      "name": "saveToHistory",
      "emitProgressInSSEContext": true,
      "callScript": "await LIB.saveConversation(this.conversationId, this.context.generateResponse)"
    }],
    "emitSseEventActions": [{
      "name": "emitTokenCount",
      "eventName": "stats",
      "data": "{ tokens: this.context.generateResponse?.length }"
    }]
  },
  "workflow": {
    "create": {
      "afterMainCreateOperation": [
        "saveToHistory",
        "emitTokenCount"
      ]
    }
  }
}

Client receives:

event: meta       → { "dataName": "conversation", "action": "create", ... }
event: chunk      → { "index": 0, "conversation": "Sure", "count": 1 }
event: chunk      → { "index": 1, "conversation": ", I", "count": 1 }
event: chunk      → { "index": 2, "conversation": " can", "count": 1 }
event: chunk      → { "index": 3, "conversation": " help", "count": 1 }
...
event: progress   → { "step": "saveToHistory", "message": "Starting saveToHistory" }
event: progress   → { "step": "saveToHistory", "message": "saveToHistory completed" }
event: stats      → { "tokens": 42 }
event: complete   → { "totalChunks": 42, "totalRows": 42, "httpStatus": 200 }

Full Example: Background Job with Kafka Progress

An API that starts an image processing job and tracks its progress via Kafka:

{
  "apiOptions": {
    "name": "processImage",
    "crudType": "create",
    "dataObjectName": "imageJob"
  },
  "sseSettings": {
    "hasSseController": true,
    "configuration": {
      "responseMode": "events",
      "timeout": 600000,
      "kafkaProgressListeners": [{
        "topic": "image-processing-updates",
        "correlationField": "dbResult.id",
        "kafkaFilterField": "jobId",
        "eventName": "processingUpdate",
        "payloadMapping": "{ percent: msg.progress, phase: msg.phase, preview: msg.thumbnailUrl }",
        "completeWhen": "msg.status === 'done' || msg.status === 'failed'"
      }]
    }
  }
}

Client receives:

event: progress         → { "step": "auth", "message": "Authentication verified" }
event: progress         → { "step": "mainOperation", "message": "Main operation completed" }
event: result           → { "status": "OK", "imageJob": { "id": "job-123", "status": "queued" } }
event: processingUpdate → { "percent": 10, "phase": "resize", "preview": null }
event: processingUpdate → { "percent": 45, "phase": "filter", "preview": null }
event: processingUpdate → { "percent": 80, "phase": "optimize", "preview": "https://..." }
event: processingUpdate → { "percent": 100, "phase": "complete", "preview": "https://..." }
event: complete         → { "message": "Kafka listener completed" }

The SSE connection stays alive after the result event, waiting for Kafka messages. Once the completeWhen condition matches, the connection closes.

Was this page helpful?
Built with Documentation.AI

Last updated today