Supporting SSE in Business API Design
A comprehensive guide to Server-Sent Events (SSE) support in Mindbricks Business APIs. Covers streaming modes, stream sources, progress events, iterator actions, AI streaming, Kafka progress listeners, and SSE-aware workflow actions — all configured declaratively through patterns.
Overview
Every Mindbricks Business API runs over REST by default — one request, one response. But some operations don't fit that model:
- Large datasets that should be streamed chunk-by-chunk instead of loaded into a single response.
- Long-running operations (AI generation, bulk processing, image analysis) where the client needs progress updates.
- External data sources (third-party SSE feeds, WebSocket services, AI token streams) that produce data incrementally.
The SSE layer solves all of these by adding a companion /stream endpoint alongside the existing REST endpoint. Both share the same Business API pipeline — authentication, parameters, authorization, workflow, actions — but the SSE endpoint delivers results progressively over a persistent HTTP connection using the Server-Sent Events protocol.
Everything is declarative. You configure SSE behavior through patterns; the code generator produces the streaming implementation. No custom code required.
Enabling SSE on a Business API
Add sseSettings to any Business API configuration:
{
  "sseSettings": {
    "hasSseController": true,
    "configuration": {
      "responseMode": "stream",
      "timeout": 300000,
      "chunkSize": 100
    }
  }
}
This generates a GET /v1/{resource}/stream endpoint (or GET /v1/{resource}/:id/stream for single-object APIs) alongside the existing REST endpoint.
Configuration Properties
| Property | Type | Default | Description |
|---|---|---|---|
| hasSseController | Boolean | false | Enables the SSE controller for this API |
| responseMode | "stream" \| "events" | "stream" | How the SSE endpoint delivers data |
| timeout | Integer | 300000 | Maximum SSE connection duration in milliseconds (5 minutes) |
| chunkSize | Integer | 100 | Rows per chunk when streaming list results |
| streamSource | StreamSourceConfig | null | Custom stream source (see below) |
| kafkaProgressListeners | KafkaProgressListener[] | [] | Kafka topic listeners for remote progress |
Response Modes
Stream Mode ("stream")
Designed for delivering data in sequential chunks. The client receives a series of chunk events followed by a complete event.
Typical use cases:
- Streaming a list of database records
- AI token-by-token text generation
- Relaying data from an external SSE or WebSocket source
- File export with progressive delivery
Event sequence:
event: meta
data: { "dataName": "users", "action": "list", "chunkSize": 100, ... }
event: chunk
data: { "index": 0, "users": [...], "count": 100 }
event: chunk
data: { "index": 1, "users": [...], "count": 47 }
event: complete
data: { "totalChunks": 2, "totalRows": 147, "httpStatus": 200 }
The meta event is emitted first and contains session metadata: dataName, action, userId, sessionId, requestId, appVersion, and chunkSize.
The complete event may include additional data from addToOutput() if actions write to the output after the main streaming is done (see Hybrid Stream + Events).
Events Mode ("events")
Designed for long-running operations where the client needs status updates. The pipeline emits progress events at each lifecycle stage, and the final result is delivered as a single result event.
Typical use cases:
- Multi-step operations (validate → process → enrich → save)
- Operations where the result is compact but the process is slow
Event sequence:
event: progress
data: { "step": "auth", "message": "Authentication verified" }
event: progress
data: { "step": "params", "message": "Parameters validated" }
event: progress
data: { "step": "mainOperation", "message": "Starting main operation" }
event: progress
data: { "step": "mainOperation", "message": "Main operation completed" }
event: progress
data: { "step": "buildOutput", "message": "Building response" }
event: result
data: { "status": "OK", "user": { ... }, "httpStatus": 200 }
The Pipeline in SSE Mode
Both SSE modes share the same pre-pipeline as REST:
┌─ Pre-Pipeline (identical to REST) ─────────────────┐
│ checkValidLogin → readParameters → transformParams │
│ → checkParameters → checkBasicAuth │
│ → buildWhereClause → fetchInstance → checkInstance │
│ → buildDataClause │
└─────────────────────────────────────────────────────┘
                         │
              ┌──────────┴──────────┐
              │  SSE diverges here  │
              └──────────┬──────────┘
                         │
           ┌─────────────┴─────────────┐
           │                           │
     Stream Mode                  Events Mode
     ─────────────                ────────────
     meta event                   progress events
     chunk events                 at each stage
          ↓                            ↓
     runAfterMainOperation        mainOperation
     addToOutput                  runAfterMainOperation
     complete event               buildOutput
                                  result event
           │                           │
           └─────────────┬─────────────┘
                         │
          runAfterResponse (Kafka events, etc.)
Key Differences from REST
- buildOutput() is skipped in stream mode — data is sent incrementally, not assembled into a single response object.
- this.output is populated with a lightweight summary (totalChunks, totalRows, elapsedMs) for event publishers and runAfterResponse().
- emitProgress() is a no-op in REST mode — actions can call it safely regardless of context.
Stream Sources
For list APIs, streaming is automatic. The system uses cursor-based (keyset) pagination internally, fetching chunkSize rows at a time using createdAt + id as the composite cursor. No configuration needed.
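Conceptually, the generated list streaming loop behaves like the following sketch. It is illustrative only; fetchPage is a hypothetical helper standing in for the generated keyset query.

// Minimal sketch of keyset pagination over (createdAt, id), not the generated code.
async function* streamListRows(fetchPage, chunkSize) {
  let cursor = null; // last (createdAt, id) seen, or null for the first chunk
  while (true) {
    // fetchPage is expected to run something like:
    //   SELECT * FROM t
    //   WHERE (createdAt, id) > (:cursor.createdAt, :cursor.id)  -- skipped when cursor is null
    //   ORDER BY createdAt, id LIMIT :chunkSize
    const rows = await fetchPage(cursor, chunkSize);
    if (rows.length === 0) return;   // no more data: end the stream
    yield rows;                      // each batch becomes one SSE chunk event
    const last = rows[rows.length - 1];
    cursor = { createdAt: last.createdAt, id: last.id };
  }
}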
For non-list APIs (or when data comes from outside the database), configure a streamSource:
"streamSource": {
"sourceType": "iteratorAction",
"iteratorAction": "chatCompletion"
}
Source Types
libFunction
Uses a registered async generator library function.
{
  "sourceType": "libFunction",
  "libFunctionName": "streamReportData"
}
The lib function receives the ApiManager context (this) and must return an async iterable:
// In your library module
async function* streamReportData(ctx) {
  const rows = await db.fetchLargeDataset(ctx.request.query);
  for (const batch of chunk(rows, 100)) {
    yield batch;
  }
}
iteratorAction
Uses a declared action from the action store (either AiCallAction or IteratorAction). The action must be an async generator.
{
  "sourceType": "iteratorAction",
  "iteratorAction": "chatCompletion"
}
See AiCallAction and IteratorAction below for details.
sseRelay
Connects to an external SSE endpoint and relays its events as chunks.
{
  "sourceType": "sseRelay",
  "relayUrl": "process.env.ANALYTICS_SSE_URL + '/stream?userId=' + this.request.params.userId",
  "relayAuth": "'Bearer ' + process.env.ANALYTICS_API_KEY"
}
relayUrl and relayAuth are MScript expressions compiled at build time. Use process.env for environment variables, this.request.params for route params, this.request.query for query params, and this for the ApiManager instance.
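Conceptually, a relay source consumes the upstream feed and forwards each message as a chunk. The sketch below is illustrative only; it is not the generated code and assumes the eventsource npm package (v1/v2 API) for header support.

// Rough sketch of how an sseRelay source behaves (illustrative, not generated output).
const EventSource = require("eventsource");

async function* relayExternalSse(relayUrl, relayAuth) {
  const upstream = new EventSource(relayUrl, {
    headers: relayAuth ? { Authorization: relayAuth } : {},
  });
  const pending = [];   // messages received but not yet yielded
  let wakeUp = null;    // resolver for a consumer waiting on new data
  upstream.onmessage = (e) => {
    pending.push(JSON.parse(e.data));
    if (wakeUp) { wakeUp(); wakeUp = null; }
  };
  try {
    while (true) {
      if (pending.length === 0) await new Promise((r) => (wakeUp = r));
      yield pending.shift();   // each upstream message becomes one chunk event
    }
  } finally {
    upstream.close();          // closed when the consumer stops iterating
  }
}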
wsRelay
Connects to an external WebSocket endpoint and relays its messages as chunks.
{
  "sourceType": "wsRelay",
  "relayUrl": "process.env.PROCESSOR_WS_URL + '/live/' + this.request.params.jobId",
  "relayAuth": "'Bearer ' + process.env.PROCESSOR_API_KEY"
}
SSE-Aware Actions
emitProgressInSSEContext — Universal Progress Wrapping
Any action in the BusinessApiActionStore can set emitProgressInSSEContext: true. When enabled, the workflow generator wraps the action invocation with progress events:
event: progress → { step: "enrichUser", message: "Starting enrichUser" }
[action executes]
event: progress → { step: "enrichUser", message: "enrichUser completed" }
This is a simple before/after wrapper. It works with any action type and requires no changes to the action itself. In REST mode, emitProgress() is a no-op — the wrapping is harmless.
{
  "actions": {
    "functionCallActions": [{
      "name": "enrichUser",
      "emitProgressInSSEContext": true,
      "callScript": "await LIB.enrichUserProfile(this.user)"
    }]
  }
}
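For intuition, the wrapping generated for the configuration above is roughly equivalent to this sketch; the exact emitProgress signature and the generated code may differ.

// Illustrative sketch of the before/after progress wrapping, not generated output.
// (Argument shape follows the progress events shown above; the real signature may differ.)
this.emitProgress({ step: "enrichUser", message: "Starting enrichUser" });
await LIB.enrichUserProfile(this.user);   // the action's callScript
this.emitProgress({ step: "enrichUser", message: "enrichUser completed" });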
EmitSseEventAction — Custom Named Events
A dedicated action type for emitting arbitrary SSE events during the workflow. Useful for sending structured data at specific pipeline stages.
{
  "actions": {
    "emitSseEventActions": [{
      "name": "emitUserLoaded",
      "eventName": "userLoaded",
      "data": "{ userId: this.user.id, fullname: this.user.fullname }"
    }]
  },
  "workflow": {
    "get": {
      "afterMainGetOperation": ["emitUserLoaded"]
    }
  }
}
The data field is an MScript expression evaluated against the pipeline context. The client receives:
event: userLoaded
data: { "userId": "abc-123", "fullname": "Jane Doe" }
In REST mode, this action is a no-op.
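On the client, the custom event is consumed with a listener for the configured event name, using the same EventSource-style client shown under Client-Side Integration below:

// Listen for the custom event emitted by emitUserLoaded above.
evtSource.addEventListener("userLoaded", (e) => {
  const payload = JSON.parse(e.data);
  console.log("User loaded:", payload.userId, payload.fullname);
});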
AiCallAction — Auto-Streaming
AiCallAction automatically detects SSE context and streams AI tokens. No flag or extra configuration is needed.
In SSE context: The action calls the AI provider's streaming API, yielding tokens one-by-one. Each token becomes an SSE chunk event to the client.
In REST context: Tokens are collected internally and joined into the full response text. Post-processing (JSON parsing, aiFetchProperty extraction, isArray handling) is applied to the collected result.
{
  "actions": {
    "aiCallActions": [{
      "name": "chatCompletion",
      "model": "gpt-4",
      "promptTemplate": {
        "systemPrompt": "You are a helpful assistant.",
        "userPrompt": "this.request.body.message"
      }
    }]
  },
  "sseSettings": {
    "hasSseController": true,
    "configuration": {
      "responseMode": "stream",
      "streamSource": {
        "sourceType": "iteratorAction",
        "iteratorAction": "chatCompletion"
      }
    }
  }
}
The client sees:
event: meta → { "dataName": "chat", ... }
event: chunk → { "index": 0, "chat": "Hello", "count": 1 }
event: chunk → { "index": 1, "chat": " there", "count": 1 }
event: chunk → { "index": 2, "chat": "!", "count": 1 }
event: complete → { "totalChunks": 3, "totalRows": 3 }
AiCallAction can also be used as a workflow step (not a stream source). When placed in a workflow milestone:
- SSE mode: tokens stream to the client via _outputChannel.
- REST mode: tokens are collected into a string and stored in this.context["actionName"].
Supported providers: OpenAI (and all OpenAI-compatible APIs: Ollama, Together, Groq) and Anthropic. The provider is determined from the model field.
RefineByAiAction — Auto-Streaming Text Refinement
Like AiCallAction, RefineByAiAction auto-streams in SSE context. It sends the input text to an AI provider for refinement (grammar correction, summarization, tone adjustment) and yields tokens incrementally.
{
  "actions": {
    "refineByAiActions": [{
      "name": "polishDescription",
      "provider": "openai",
      "inputData": "this.data.description",
      "systemPrompt": "You are a professional editor. Improve clarity.",
      "mode": "rewrite"
    }]
  }
}
IteratorAction — Generic Streaming
A generic action for any async iterable source. The sourceScript is an MScript expression that evaluates to an object implementing Symbol.asyncIterator or Symbol.iterator.
{
  "actions": {
    "iteratorActions": [{
      "name": "streamAnalytics",
      "sourceScript": "LIB.streamAnalyticsData(this.request.query)"
    }]
  },
  "sseSettings": {
    "hasSseController": true,
    "configuration": {
      "responseMode": "stream",
      "streamSource": {
        "sourceType": "iteratorAction",
        "iteratorAction": "streamAnalytics"
      }
    }
  }
}
The library function must return an async iterable:
async function* streamAnalyticsData(query) {
  const cursor = db.createCursor(query);
  while (cursor.hasNext()) {
    yield await cursor.next();
  }
}
LoopAction — Per-Iteration Progress
When emitProgressInSSEContext: true, LoopAction emits a progress event after each iteration with step count and total:
{
  "actions": {
    "loopActions": [{
      "name": "processItems",
      "emitProgressInSSEContext": true,
      "loopFor": "this.data.items",
      "loopItemName": "item",
      "actions": ["enrichItem", "validateItem"]
    }]
  }
}
The client receives:
event: progress → { "step": "processItems", "message": "Starting processItems" }
event: progress → { "step": 1, "total": 20, "message": "Processing 1/20" }
event: progress → { "step": 2, "total": 20, "message": "Processing 2/20" }
...
event: progress → { "step": "processItems", "message": "processItems completed" }
The outer before/after wrapping (from emitProgressInSSEContext) and the inner per-iteration progress work together — the outer events provide the bookends, the inner events provide the granular detail.
CreateBulkCrudAction — Batch Progress
When emitProgressInSSEContext: true, emits progress events showing how many records are being created and how many succeeded:
{
  "actions": {
    "createBulkCrudActions": [{
      "name": "createTickets",
      "emitProgressInSSEContext": true,
      "childObject": "ticket",
      "objects": "this.data.tickets.map(t => ({ ...t, eventId: this.eventId }))"
    }]
  }
}
The client receives:
event: progress → { "total": 50, "message": "Creating 50 records" }
[bulk insert executes]
event: progress → { "created": 50, "total": 50, "message": "Created 50/50 records" }
Hybrid Stream + Events Flow
A powerful pattern: stream data chunks first, then emit additional events from workflow actions that run after the main operation.
Actions placed in afterMainOperation (or similar post-streaming workflow slots) can still emit progress and custom events after the chunks have been sent, and anything they write to the output via addToOutput() is merged into the complete event's payload.
{
  "sseSettings": {
    "hasSseController": true,
    "configuration": {
      "responseMode": "stream",
      "chunkSize": 5
    }
  },
  "actions": {
    "emitSseEventActions": [{
      "name": "emitSummary",
      "eventName": "summary",
      "data": "{ totalUsers: this.dbResult?.totalCount }"
    }],
    "functionCallActions": [{
      "name": "computeStats",
      "emitProgressInSSEContext": true,
      "callScript": "this.stats = { avgAge: 32 }"
    }],
    "addToResponseActions": [{
      "name": "addStats",
      "key": "stats",
      "value": "this.stats"
    }]
  },
  "workflow": {
    "list": {
      "afterMainListOperation": [
        "emitSummary",
        "computeStats",
        "addStats"
      ]
    }
  }
}
The client sees:
event: meta → { "dataName": "users", ... }
event: chunk → { "index": 0, "users": [...], "count": 5 }
event: chunk → { "index": 1, "users": [...], "count": 3 }
event: summary → { "totalUsers": 8 }
event: progress → { "step": "computeStats", "message": "Starting computeStats" }
event: progress → { "step": "computeStats", "message": "computeStats completed" }
event: complete → { "totalChunks": 2, "totalRows": 8, "stats": { "avgAge": 32 } }
Data added via addToOutput() is merged into the complete event, giving the client both the streamed data and any post-processing results.
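On the client, the merged fields arrive on the final complete event. A minimal sketch (renderStats is a placeholder handler):

evtSource.addEventListener("summary", (e) => {
  console.log("Total users:", JSON.parse(e.data).totalUsers);
});
evtSource.addEventListener("complete", (e) => {
  const done = JSON.parse(e.data);
  renderStats(done.stats);   // "stats" was merged in via the addStats action above
});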
Kafka Progress Listeners
For operations that trigger remote processes (background jobs, external services, cron tasks), the SSE connection can stay open after the pipeline completes and listen to Kafka topics for progress updates.
{
  "sseSettings": {
    "hasSseController": true,
    "configuration": {
      "responseMode": "events",
      "kafkaProgressListeners": [{
        "topic": "image-processing-progress",
        "correlationField": "dbResult.jobId",
        "kafkaFilterField": "jobId",
        "eventName": "processingProgress",
        "payloadMapping": "{ percent: msg.progress, status: msg.status }",
        "completeWhen": "msg.status === 'completed'"
      }]
    }
  }
}
How It Works
- The pipeline executes normally (events or stream mode).
- After the pipeline completes, instead of closing the SSE connection, the controller subscribes to the configured Kafka topics.
- Each incoming Kafka message is checked:
  - The kafkaFilterField in the message payload is compared to the correlationField evaluated from the pipeline context.
  - Only matching messages pass through.
- Matching messages are transformed via payloadMapping (MScript) and emitted as SSE events with the configured eventName.
- If completeWhen evaluates to true, the listener stops and the SSE connection closes.
- The connection also closes on timeout or client disconnect.
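In rough pseudocode, the matching loop looks like this. It is illustrative only; evaluate, consumer, and sse are hypothetical stand-ins, not the generated implementation.

// Hypothetical sketch of the Kafka progress listener loop.
const correlationId = evaluate(correlationField, pipelineContext);  // e.g. dbResult.jobId
consumer.onMessage((msg) => {
  if (msg[kafkaFilterField] !== correlationId) return;              // drop unrelated messages
  const payload = payloadMapping ? evaluate(payloadMapping, { msg }) : msg;
  sse.emit(eventName, payload);                                     // relay as an SSE event
  if (completeWhen && evaluate(completeWhen, { msg })) sse.end();   // stop listening and close
});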
Configuration Properties
| Property | Type | Required | Description |
|---|---|---|---|
| topic | String | Yes | Kafka topic to subscribe to |
| correlationField | MScript | Yes | Expression to extract the correlation ID from the pipeline context |
| kafkaFilterField | String | Yes | Field in the Kafka message payload to match against the correlation ID |
| eventName | String | Yes | SSE event name for relayed messages |
| payloadMapping | MScript | No | Transforms the Kafka payload before sending; msg is the full message. If null, the entire payload is forwarded. |
| completeWhen | MScript | No | Condition to stop listening; msg is the current message. If null, listens until timeout or disconnect. |
Error Handling
Errors at any pipeline stage are caught and delivered as SSE events:
event: error
data: { "message": "User not found", "code": "SSE_ERROR", "httpStatus": 404 }
If headers haven't been sent yet (error during init() or before the first SSE write), the error is thrown as a normal HTTP error response.
The SSE connection includes:
- Timeout protection: Connections are closed after timeout milliseconds with an SSE_TIMEOUT error event.
- Client disconnect handling: When the client closes the connection, Kafka listeners are cleaned up and the in-progress abort signal is set.
- Abort propagation: this._aborted is set to true, which stops list streaming loops and stream source consumption.
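Custom stream sources can cooperate with abort propagation by checking the flag between yields. A minimal sketch (fetchBatches is a placeholder helper):

// Illustrative: stop producing chunks once the client has disconnected.
async function* streamUntilAborted(ctx) {
  for await (const batch of fetchBatches(ctx.request.query)) {
    if (ctx._aborted) return;   // abort flag is set on client disconnect or timeout
    yield batch;
  }
}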
SSE-Aware Action Summary
| Action Type | SSE Behavior | Configuration |
|---|---|---|
| AiCallAction | Auto-streams AI tokens as chunks | Automatic — no flag needed |
| RefineByAiAction | Auto-streams refined text tokens | Automatic — no flag needed |
| IteratorAction | Streams any async iterable | sourceScript MScript expression |
| EmitSseEventAction | Emits custom named events | eventName + data MScript |
| LoopAction | Per-iteration progress (step/total) | emitProgressInSSEContext: true |
| CreateBulkCrudAction | Before/after batch count progress | emitProgressInSSEContext: true |
| Any other action | Before/after "Starting…" / "…completed" | emitProgressInSSEContext: true |
All SSE-specific behavior is no-op in REST mode. Actions can be used in APIs that have both REST and SSE endpoints without any conditional logic.
Client-Side Integration
JavaScript (EventSource)
const token = "Bearer eyJ...";
// Note: passing custom headers assumes an EventSource implementation that
// supports them (see the note below); the native browser EventSource does not.
const evtSource = new EventSource(
  "https://api.example.com/v1/users/stream",
  { headers: { Authorization: token } }
);

evtSource.addEventListener("meta", (e) => {
  const meta = JSON.parse(e.data);
  console.log("Stream started:", meta.dataName);
});

evtSource.addEventListener("chunk", (e) => {
  const chunk = JSON.parse(e.data);
  appendToTable(chunk.users);
});

evtSource.addEventListener("progress", (e) => {
  const progress = JSON.parse(e.data);
  updateProgressBar(progress);
});

evtSource.addEventListener("complete", (e) => {
  const result = JSON.parse(e.data);
  console.log(`Done: ${result.totalRows} rows in ${result.totalChunks} chunks`);
  evtSource.close();
});

evtSource.addEventListener("error", (e) => {
  if (e.data) {
    const err = JSON.parse(e.data);
    showError(err.message);
  }
  evtSource.close();
});
Note: The native EventSource API does not support custom headers. For authenticated SSE endpoints, use a library such as eventsource (Node.js) or fetch-event-source (browser), or pass the token as a query parameter.
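For example, with the @microsoft/fetch-event-source package (shown as one option, not a required client):

import { fetchEventSource } from "@microsoft/fetch-event-source";

await fetchEventSource("https://api.example.com/v1/users/stream", {
  headers: { Authorization: "Bearer eyJ..." },
  onmessage(ev) {
    if (!ev.data) return;                 // ignore comments / heartbeats
    const data = JSON.parse(ev.data);
    if (ev.event === "chunk") appendToTable(data.users);
    if (ev.event === "complete") console.log("Done:", data.totalRows, "rows");
  },
  onerror(err) {
    throw err;                            // rethrow to stop automatic retries
  },
});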
cURL
curl -N -H "Authorization: Bearer $TOKEN" \
http://localhost:3001/v1/users/stream
Postman
- Create a new request with method GET.
- Set the URL to the /stream endpoint.
- Add the Authorization header.
- Send the request — Postman will display SSE events in the response body.
Known issue: When sending the same SSE request a second time in the same Postman tab, only the last event may be displayed. Open a new tab or close/reopen the response panel between requests.
Full Example: AI Chat with Progress
A complete configuration for an AI chat API that streams tokens and emits post-generation events:
{
  "apiOptions": {
    "name": "chatWithAI",
    "crudType": "create",
    "dataObjectName": "conversation",
    "raiseApiEvent": false
  },
  "sseSettings": {
    "hasSseController": true,
    "configuration": {
      "responseMode": "stream",
      "streamSource": {
        "sourceType": "iteratorAction",
        "iteratorAction": "generateResponse"
      }
    }
  },
  "actions": {
    "aiCallActions": [{
      "name": "generateResponse",
      "model": "gpt-4",
      "promptTemplate": {
        "systemPrompt": "You are a helpful assistant.",
        "userPrompt": "this.request.body.message"
      }
    }],
    "functionCallActions": [{
      "name": "saveToHistory",
      "emitProgressInSSEContext": true,
      "callScript": "await LIB.saveConversation(this.conversationId, this.context.generateResponse)"
    }],
    "emitSseEventActions": [{
      "name": "emitTokenCount",
      "eventName": "stats",
      "data": "{ tokens: this.context.generateResponse?.length }"
    }]
  },
  "workflow": {
    "create": {
      "afterMainCreateOperation": [
        "saveToHistory",
        "emitTokenCount"
      ]
    }
  }
}
Client receives:
event: meta → { "dataName": "conversation", "action": "create", ... }
event: chunk → { "index": 0, "conversation": "Sure", "count": 1 }
event: chunk → { "index": 1, "conversation": ", I", "count": 1 }
event: chunk → { "index": 2, "conversation": " can", "count": 1 }
event: chunk → { "index": 3, "conversation": " help", "count": 1 }
...
event: progress → { "step": "saveToHistory", "message": "Starting saveToHistory" }
event: progress → { "step": "saveToHistory", "message": "saveToHistory completed" }
event: stats → { "tokens": 42 }
event: complete → { "totalChunks": 42, "totalRows": 42, "httpStatus": 200 }
Full Example: Background Job with Kafka Progress
An API that starts an image processing job and tracks its progress via Kafka:
{
  "apiOptions": {
    "name": "processImage",
    "crudType": "create",
    "dataObjectName": "imageJob"
  },
  "sseSettings": {
    "hasSseController": true,
    "configuration": {
      "responseMode": "events",
      "timeout": 600000,
      "kafkaProgressListeners": [{
        "topic": "image-processing-updates",
        "correlationField": "dbResult.id",
        "kafkaFilterField": "jobId",
        "eventName": "processingUpdate",
        "payloadMapping": "{ percent: msg.progress, phase: msg.phase, preview: msg.thumbnailUrl }",
        "completeWhen": "msg.status === 'done' || msg.status === 'failed'"
      }]
    }
  }
}
Client receives:
event: progress → { "step": "auth", "message": "Authentication verified" }
event: progress → { "step": "mainOperation", "message": "Main operation completed" }
event: result → { "status": "OK", "imageJob": { "id": "job-123", "status": "queued" } }
event: processingUpdate → { "percent": 10, "phase": "resize", "preview": null }
event: processingUpdate → { "percent": 45, "phase": "filter", "preview": null }
event: processingUpdate → { "percent": 80, "phase": "optimize", "preview": "https://..." }
event: processingUpdate → { "percent": 100, "phase": "complete", "preview": "https://..." }
event: complete → { "message": "Kafka listener completed" }
The SSE connection stays alive after the result event, waiting for Kafka messages. Once the completeWhen condition matches, the connection closes.