
Server-Sent Events (SSE)

The @inversifyjs/http-sse package provides a type-safe, adapter-agnostic way to stream server-sent events (SSE) from your Inversify HTTP controllers. Send real-time updates to clients over standard HTTP connections using a simple, decorator-driven API that works across Express 4/5, Fastify, Hono, and uWebSockets.js.

What are Server-Sent Events?

Server-Sent Events (SSE) is a web standard for pushing real-time updates from server to client over HTTP. Unlike WebSockets, SSE is unidirectional (server → client) and uses plain HTTP, and the browser's EventSource client reconnects automatically when the connection drops. It is well suited to:

  • Live notifications and alerts
  • Real-time dashboards and metrics
  • Chat applications (server broadcasts)
  • Progress tracking for long-running operations
  • Event logs and activity streams

Key features

  • Adapter agnostic: Works with Express 4/5, Fastify, Hono, and uWebSockets.js
  • Type-safe: Full TypeScript support with typed message events
  • Backpressure handling: Built-in stream management prevents overwhelming clients
  • Flexible API: Use async generators or SseStream for manual control
  • Standards compliant: Follows the Server-Sent Events specification (originally published by the W3C, now maintained as part of the WHATWG HTML Living Standard)

Installation

npm install @inversifyjs/http-sse

The package has peer dependencies on @inversifyjs/framework-core and @inversifyjs/http-core, which are typically already installed when using the Inversify HTTP framework.

Basic usage

Use the @SsePublisher() decorator to inject an SSE publisher function into your controller method. Pass it an async generator that yields message events:

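import { Controller, Get } from '@inversifyjs/http-core';
import {
  type MessageEvent,
  SsePublisher,
  type SsePublisherOptions,
} from '@inversifyjs/http-sse';

@Controller('/events')
export class EventsController {
  @Get()
  public getEvents(
    @SsePublisher()
    ssePublisher: (options: SsePublisherOptions) => unknown,
  ): unknown {
    // Hand the generator to the publisher; it streams each yielded event
    return ssePublisher({
      events: this.#generateEvents(),
    });
  }

  async *#generateEvents(): AsyncGenerator<MessageEvent> {
    yield { data: 'Event 1' };
    yield { data: 'Event 2' };
    yield { data: 'Event 3' };
  }
}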

The client receives events in the standard SSE format:

data: Event 1

data: Event 2

data: Event 3
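
Because the publisher consumes an async generator, events can also be produced over time rather than all at once. The following is a minimal sketch of that idea, assuming a hypothetical `tick` generator and a local `SseMessage` type that stands in for the package's `MessageEvent`; neither name comes from the package.

```typescript
// Local stand-in for the package's MessageEvent type (assumption: only `data` is needed here).
interface SseMessage {
  data: string | string[];
}

// Hypothetical generator: yields `count` events, pausing `intervalMs` between them.
async function* tick(
  count: number,
  intervalMs: number,
): AsyncGenerator<SseMessage> {
  for (let i = 1; i <= count; i++) {
    yield { data: `Tick ${i}` };

    if (i < count) {
      // Wait before producing the next event
      await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
    }
  }
}
```

A generator like this could be passed as the `events` option in place of `#generateEvents()` above; the stream ends when the generator returns.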

Message event structure

Each message event supports multiple fields that control how the client processes it:

import { Controller, Get } from '@inversifyjs/http-core';
import {
  type MessageEvent,
  SsePublisher,
  type SsePublisherOptions,
} from '@inversifyjs/http-sse';

@Controller('/notifications')
export class NotificationsController {
  @Get()
  public getNotifications(
    @SsePublisher()
    ssePublisher: (options: SsePublisherOptions) => unknown,
  ): unknown {
    return ssePublisher({
      events: this.#generateNotifications(),
    });
  }

  async *#generateNotifications(): AsyncGenerator<MessageEvent> {
    // Simple message with just data
    yield { data: 'Hello, world!' };

    // Message with event type
    yield {
      data: 'New notification',
      type: 'notification',
    };

    // Message with ID for client tracking
    yield {
      data: 'Important update',
      id: '123',
      type: 'update',
    };

    // Message with retry interval (milliseconds)
    yield {
      data: 'Status changed',
      retry: 5000,
      type: 'status',
    };

    // Multi-line data
    yield {
      data: ['Line 1', 'Line 2', 'Line 3'],
      type: 'multiline',
    };
  }
}

MessageEvent fields:

  • data: Message content (string or array of strings for multi-line data). The library does not support embedded newline characters in a single string; multi-line messages must be supplied as an array of strings, where each array entry becomes a separate data: line in the SSE stream. If you have a multi-line string, split it on newlines before sending (e.g., message.split('\n')).
  • type: Event type for client-side filtering (triggers custom event listeners)
  • id: Unique identifier that the client stores as its last event ID and sends back in the Last-Event-ID request header when reconnecting
  • retry: Milliseconds before client should retry on connection loss
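
To make the field list concrete, here is a sketch of how one message maps onto the text/event-stream wire format defined by the SSE specification. The `SseMessage` type, `toDataLines`, and `serializeSseMessage` are hypothetical names used for illustration only; the package's own serializer may order fields differently.

```typescript
// Local stand-in for the package's MessageEvent type; field names follow the list above.
interface SseMessage {
  data: string | string[];
  id?: string;
  retry?: number;
  type?: string;
}

// Hypothetical helper: splits a multi-line string into the array form described above.
function toDataLines(text: string): string[] {
  return text.split(/\r?\n/);
}

// Hypothetical serializer: renders one message in the text/event-stream format.
function serializeSseMessage(message: SseMessage): string {
  const lines: string[] = [];

  if (message.type !== undefined) {
    lines.push(`event: ${message.type}`);
  }
  if (message.id !== undefined) {
    lines.push(`id: ${message.id}`);
  }
  if (message.retry !== undefined) {
    lines.push(`retry: ${message.retry}`);
  }

  // Each array entry becomes its own "data:" line
  const dataLines: string[] = Array.isArray(message.data)
    ? message.data
    : [message.data];
  for (const line of dataLines) {
    lines.push(`data: ${line}`);
  }

  // A blank line terminates the event
  return lines.join('\n') + '\n\n';
}
```

For example, `serializeSseMessage({ data: toDataLines('a\nb'), type: 'x' })` produces an `event: x` line followed by two `data:` lines and a blank line. On the client, the two data lines are rejoined with a newline.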

Using SseStream for manual control

For scenarios where you need manual control over when events are sent, use SseStream. This is a Node.js Readable stream with built-in backpressure handling:

import { Controller, Get } from '@inversifyjs/http-core';
import {
  SsePublisher,
  type SsePublisherOptions,
  SseStream,
} from '@inversifyjs/http-sse';

@Controller('/live-updates')
export class LiveUpdatesController {
  @Get()
  public getLiveUpdates(
    @SsePublisher()
    ssePublisher: (options: SsePublisherOptions) => unknown,
  ): unknown {
    const stream: SseStream = new SseStream();

    // Send events asynchronously; the handler returns without waiting
    void this.#sendUpdates(stream);

    return ssePublisher({
      events: stream,
    });
  }

  async #sendUpdates(stream: SseStream): Promise<void> {
    // Write events with backpressure handling
    await stream.writeMessageEvent({ data: 'Update 1' });

    // Simulate some work
    await new Promise((resolve: (value: unknown) => void) =>
      setTimeout(resolve, 1000),
    );

    await stream.writeMessageEvent({ data: 'Update 2' });
    await stream.writeMessageEvent({ data: 'Update 3' });

    // Signal end of stream
    stream.end();
  }
}

SseStream benefits:

  • Full control over event timing
  • Automatic backpressure handling (waits when client buffer is full)
  • Can be passed around to other services
  • Supports both push and pull patterns
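
Backpressure on a Node.js Readable means that `push()` returns false once the internal buffer reaches its high-water mark, and the producer should pause until the consumer reads again. The sketch below illustrates that idea with a hypothetical `PushStream` class and `writeChunk` method. It is not the package's SseStream implementation, whose internals are not shown in this guide.

```typescript
import { Readable } from 'node:stream';

// Hypothetical class for illustration; not the package's SseStream.
class PushStream extends Readable {
  // Writers waiting for the consumer to drain the buffer
  private waiters: Array<() => void> = [];

  constructor(highWaterMark: number) {
    super({ highWaterMark });
  }

  // Node calls _read when the consumer wants more data: release pending writers.
  override _read(): void {
    const waiters = this.waiters;
    this.waiters = [];
    for (const resume of waiters) {
      resume();
    }
  }

  // Pushes a chunk; if the buffer is full, resolves only after the consumer reads again.
  public async writeChunk(chunk: string): Promise<void> {
    if (this.push(chunk)) {
      return;
    }
    await new Promise<void>((resolve) => {
      this.waiters.push(resolve);
    });
  }

  // Signals end of stream
  public finish(): void {
    this.push(null);
  }
}
```

Awaiting each write, as the controller above does with `writeMessageEvent`, is what lets a slow consumer slow the producer down instead of letting the buffer grow without bound.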

Custom status codes

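By default, SSE connections return 200 OK. You can customize the status code, but note that browser EventSource clients treat any status other than 200 as a failed connection, so custom codes mainly suit non-browser consumers: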

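import { Controller, Get, HttpStatusCode } from '@inversifyjs/http-core';
import {
  type MessageEvent,
  SsePublisher,
  type SsePublisherOptions,
} from '@inversifyjs/http-sse';

@Controller('/events')
export class EventsController {
  @Get('/custom-status')
  public getEventsWithCustomStatus(
    @SsePublisher()
    ssePublisher: (options: SsePublisherOptions) => unknown,
  ): unknown {
    return ssePublisher({
      events: this.#generateEvents(),
      // Respond with 201 Created instead of the default 200 OK
      statusCode: HttpStatusCode.CREATED,
    });
  }

  async *#generateEvents(): AsyncGenerator<MessageEvent> {
    yield { data: 'Event with custom status' };
  }
}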

Real-world example: Live chat

The following example shows how to build a broadcast-style chat system with SSE. ChatMessage is an application-defined type that is not shown here:

import { Body, Controller, Get, Post } from '@inversifyjs/http-core';
import {
  SsePublisher,
  type SsePublisherOptions,
  SseStream,
} from '@inversifyjs/http-sse';
import { injectable } from 'inversify';

@injectable()
class ChatService {
  // One stream per connected client
  readonly #streams: Set<SseStream> = new Set();

  public subscribe(): SseStream {
    const stream: SseStream = new SseStream();
    this.#streams.add(stream);

    // Clean up when stream ends
    stream.on('close', () => {
      this.#streams.delete(stream);
    });

    return stream;
  }

  public async broadcast(message: ChatMessage): Promise<void> {
    const messageData: string = JSON.stringify(message);

    // Streams are written one after another, each awaiting backpressure
    for (const stream of this.#streams) {
      await stream.writeMessageEvent({
        data: messageData,
        type: 'message',
      });
    }
  }
}

@Controller('/chat')
export class ChatController {
  constructor(private readonly chatService: ChatService) {}

  @Get('/stream')
  public streamMessages(
    @SsePublisher()
    ssePublisher: (options: SsePublisherOptions) => unknown,
  ): unknown {
    const stream: SseStream = this.chatService.subscribe();

    return ssePublisher({
      events: stream,
    });
  }

  @Post('/send')
  public async sendMessage(@Body() message: ChatMessage): Promise<void> {
    await this.chatService.broadcast({
      ...message,
      timestamp: new Date().toISOString(),
    });
  }
}

How it works:

  1. Clients connect to /chat/stream and receive an SSE stream
  2. ChatService manages multiple active streams
  3. When a message is posted to /chat/send, it's broadcast to all connected clients
  4. Streams are automatically cleaned up when clients disconnect
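
In the service above, broadcast awaits each stream in turn, so a single slow client delays delivery to every client after it. One possible alternative is to write to all streams concurrently. The sketch below assumes a hypothetical `MessageSink` interface and `broadcastConcurrently` helper, and assumes that a failed write surfaces as a rejected promise, which this guide does not confirm for SseStream.

```typescript
// Hypothetical minimal interface: anything exposing an async write method.
interface MessageSink {
  writeMessageEvent(event: { data: string; type?: string }): Promise<void>;
}

// Writes to all sinks concurrently and returns the sinks whose write failed.
async function broadcastConcurrently(
  sinks: Iterable<MessageSink>,
  event: { data: string; type?: string },
): Promise<MessageSink[]> {
  const targets: MessageSink[] = Array.from(sinks);

  // The async wrapper turns synchronous throws into rejections as well
  const results = await Promise.allSettled(
    targets.map(async (sink: MessageSink) => sink.writeMessageEvent(event)),
  );

  return targets.filter(
    (_sink: MessageSink, index: number) =>
      results[index]?.status === 'rejected',
  );
}
```

The returned list lets the caller decide what to do with failing clients, for example removing them from the set of active streams.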

HTTP headers

The SSE publisher automatically sets the appropriate headers for SSE connections:

  • Content-Type: text/event-stream
  • Cache-Control: private, no-cache, no-store, must-revalidate, max-age=0, no-transform
  • Connection: keep-alive
  • Transfer-Encoding: chunked
  • X-Accel-Buffering: no (for NGINX support)
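
When debugging a proxy or comparing against a hand-written endpoint, it can help to see these headers applied manually. The sketch below assumes a hypothetical `applySseHeaders` helper and `HeaderTarget` interface; it only mirrors the list above and does not describe how the package sets the headers internally.

```typescript
// Header values copied from the list above.
const SSE_HEADERS: Readonly<Record<string, string>> = {
  'Content-Type': 'text/event-stream',
  'Cache-Control':
    'private, no-cache, no-store, must-revalidate, max-age=0, no-transform',
  Connection: 'keep-alive',
  'Transfer-Encoding': 'chunked',
  'X-Accel-Buffering': 'no',
};

// Hypothetical minimal response shape; Node's http.ServerResponse satisfies it.
interface HeaderTarget {
  setHeader(name: string, value: string): unknown;
}

// Applies every SSE header to the response before the first event is written.
function applySseHeaders(response: HeaderTarget): void {
  for (const [name, value] of Object.entries(SSE_HEADERS)) {
    response.setHeader(name, value);
  }
}
```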

Client-side usage

Connect to your SSE endpoint from the browser using the standard EventSource API:

// Basic connection
const eventSource = new EventSource('/events');

eventSource.onmessage = (event) => {
  console.log('Received:', event.data);
};

// Listen for custom event types
eventSource.addEventListener('notification', (event) => {
  console.log('Notification:', event.data);
});

// Handle errors and reconnection
eventSource.onerror = (error) => {
  console.error('Connection error:', error);
  // Browser automatically reconnects
};

// Close when done
eventSource.close();
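
Event data always arrives as a string, so payloads serialized with JSON.stringify, as in the chat example, need to be parsed on the client. The sketch below assumes a hypothetical `ChatMessage` shape with a `text` field and a hypothetical `parseChatMessage` helper; the real payload shape is whatever the application defines.

```typescript
// Hypothetical payload shape; this guide does not define ChatMessage.
interface ChatMessage {
  text: string;
  timestamp?: string;
}

// Parses an event's data string, returning undefined for malformed payloads.
function parseChatMessage(data: string): ChatMessage | undefined {
  try {
    const value: unknown = JSON.parse(data);

    if (
      typeof value === 'object' &&
      value !== null &&
      typeof (value as { text?: unknown }).text === 'string'
    ) {
      return value as ChatMessage;
    }
  } catch {
    // Malformed JSON: fall through and report no message
  }

  return undefined;
}

// Browser usage (EventSource is a browser API, so this part is left as a comment):
// const source = new EventSource('/chat/stream');
// source.addEventListener('message', (event) => {
//   const message = parseChatMessage(event.data);
//   if (message !== undefined) {
//     console.log(message.text);
//   }
// });
```

Validating the parsed value before use keeps a malformed or unexpected event from throwing inside the event listener.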