Streaming de respuestas
Sin streaming, el servidor espera a que el LLM genere toda la respuesta antes de enviarla al cliente. Esto puede tardar segundos, haciendo que la interfaz parezca congelada.
Con streaming, los tokens se envían conforme se generan, creando la sensación de que el asistente "está escribiendo en tiempo real".
Sin streaming:
Con streaming:
Tecnologías para streaming en Node.js
| Tecnología | Descripción | Cuándo usarla |
|---|---|---|
| SSE (Server-Sent Events) | HTTP unidireccional del server al cliente | Chats, logs en tiempo real |
| WebSockets | Bidireccional | Chat bidireccional, colaboración |
| Streams de Node.js | Streams nativos | Pipelines internos, procesamiento |
Para la mayoría de casos de uso de chat con IA, SSE (Server-Sent Events) es la opción ideal: más simple que WebSockets y suficiente para enviar tokens de texto.
SSE (Server-Sent Events)
Es una conexión HTTP de larga duración donde el servidor envía eventos de texto con el formato:
data: {"texto": "Ho"}\n\n
data: {"texto": "la"}\n\n
data: {"texto": " mundo"}\n\n
data: [DONE]\n\n
Backend: Express con SSE + OpenAI
import express from 'express';
import OpenAI from 'openai';
const app = express();
app.use(express.json());
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
app.post('/chat/stream', async (req, res) => {
const { messages } = req.body;
// Cabeceras necesarias para SSE
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*'); // CORS si es necesario
try {
const stream = await openai.chat.completions.create({
model: 'gpt-4o-mini',
messages,
stream: true
});
for await (const chunk of stream) {
const delta = chunk.choices[0]?.delta?.content;
if (delta) {
// Formato SSE: "data: <payload>\n\n"
res.write(`data: ${JSON.stringify({ text: delta })}\n\n`);
}
// Señal de fin de stream
if (chunk.choices[0]?.finish_reason === 'stop') {
res.write('data: [DONE]\n\n');
res.end();
}
}
} catch (err) {
res.write(`data: ${JSON.stringify({ error: err.message })}\n\n`);
res.end();
}
});
app.listen(3000, () => console.log('Servidor en http://localhost:3000'));
Frontend: consumir SSE
// Vanilla JS
async function enviarMensaje(messages) {
const response = await fetch('/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// Procesar líneas completas
const lines = buffer.split('\n');
buffer = lines.pop(); // La última línea puede estar incompleta
for (const line of lines) {
if (line.startsWith('data: ')) {
const payload = line.slice(6).trim();
if (payload === '[DONE]') return;
try {
const { text } = JSON.parse(payload);
if (text) {
// Añadir a la UI
document.getElementById('respuesta').textContent += text;
}
} catch {
// Ignorar líneas malformadas
}
}
}
}
}
Streaming con Vercel AI SDK
El AI SDK simplifica mucho el streaming en el backend:
import express from 'express';
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
const app = express();
app.use(express.json());
app.post('/chat', async (req, res) => {
const { messages } = req.body;
// El AI SDK devuelve un objeto con helpers para distintos formatos
const result = streamText({
model: openai('gpt-4o-mini'),
messages
});
// pipeDataStreamToResponse maneja todas las cabeceras y el formato SSE
result.pipeDataStreamToResponse(res);
});
app.listen(3000);
Streaming con Ollama
import express from 'express';
import ollama from 'ollama';
const app = express();
app.use(express.json());
app.post('/chat/stream', async (req, res) => {
const { messages } = req.body;
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
const stream = await ollama.chat({
model: 'llama3.2',
messages,
stream: true
});
for await (const chunk of stream) {
const content = chunk.message?.content;
if (content) {
res.write(`data: ${JSON.stringify({ text: content })}\n\n`);
}
}
res.write('data: [DONE]\n\n');
res.end();
});
app.listen(3000);
WebSockets para chat bidireccional
Cuando el usuario también necesita enviar mensajes durante el streaming (por ejemplo, para interrumpirlo), WebSockets es más adecuado:
npm install ws
import { WebSocketServer } from 'ws';
import OpenAI from 'openai';
import http from 'http';
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const server = http.createServer();
const wss = new WebSocketServer({ server });
wss.on('connection', (ws) => {
const historial = [];
let streamActivo = null;
ws.on('message', async (data) => {
const { type, content } = JSON.parse(data);
if (type === 'message') {
historial.push({ role: 'user', content });
try {
const stream = await openai.chat.completions.create({
model: 'gpt-4o-mini',
messages: historial,
stream: true
});
streamActivo = stream;
let respuestaCompleta = '';
for await (const chunk of stream) {
// Verificar si el cliente canceló
if (ws.readyState !== ws.OPEN) break;
const delta = chunk.choices[0]?.delta?.content;
if (delta) {
respuestaCompleta += delta;
ws.send(JSON.stringify({ type: 'chunk', text: delta }));
}
}
// Guardar respuesta en historial
historial.push({ role: 'assistant', content: respuestaCompleta });
ws.send(JSON.stringify({ type: 'done' }));
} catch (err) {
ws.send(JSON.stringify({ type: 'error', message: err.message }));
}
}
if (type === 'cancel' && streamActivo) {
streamActivo.controller.abort();
ws.send(JSON.stringify({ type: 'cancelled' }));
}
});
});
server.listen(3000, () => console.log('WS en ws://localhost:3000'));
Gestión de errores y reconexión en SSE
// Cliente con reconexión automática
function conectarSSE(url, payload, onChunk) {
const abortController = new AbortController();
async function conectar() {
try {
const res = await fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
signal: abortController.signal
});
if (!res.ok) throw new Error(`HTTP ${res.status}`);
const reader = res.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
const lines = text.split('\n\n').filter(Boolean);
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data !== '[DONE]') onChunk(JSON.parse(data));
}
}
}
} catch (err) {
if (err.name !== 'AbortError') {
console.warn('Reconectando en 2s...', err.message);
setTimeout(conectar, 2000);
}
}
}
conectar();
return () => abortController.abort(); // función para cancelar
}