Generation Pipelines
Build non-chat AI generation workflows for batch content creation, media generation, and multi-stage processing pipelines.
AI workflows are not limited to chat interfaces. Many production use cases involve batch content generation, image and media creation, text analysis, and multi-stage processing pipelines. Workflow DevKit makes these pipelines durable, retryable, and observable using the same workflow and step primitives you already know.
This guide covers patterns for building non-chat AI generation workflows.
Batch Content Generation
Process multiple items through an AI pipeline by combining Promise.all with step functions. For batch scenarios where you do not need streaming, use AI SDK's generateText to get complete results from each call.
```typescript
import { generateText } from "ai";
import { openai } from "@workflow/ai/openai";

async function generateContent(prompt: string) {
  "use step";
  const { text } = await generateText({
    model: openai("gpt-4o"),
    prompt,
  });
  return text;
}

export async function batchGenerateWorkflow(prompts: string[]) {
  "use workflow";
  const results = await Promise.all(
    prompts.map((prompt) => generateContent(prompt))
  );
  return results;
}
```

Each call to generateContent runs as an independent step. If one fails, it retries without affecting the others. Results are persisted to the event log as they complete.
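To kick off a batch run from your application, start the workflow from a route handler, as the streaming example later in this guide does. A minimal sketch, assuming a Next.js-style route and a hypothetical module path; the exact fields on the run handle may differ by version:

```typescript
import { start } from "workflow/api";
// Hypothetical module path - point this at wherever batchGenerateWorkflow lives
import { batchGenerateWorkflow } from "@/workflows/batch-generate";

export async function POST(request: Request) {
  const { prompts } = await request.json() as { prompts: string[] };
  // start() enqueues a durable run and returns without waiting for completion
  const run = await start(batchGenerateWorkflow, [prompts]);
  // runId is assumed here - check the shape of start()'s return value in your version
  return Response.json({ runId: run.runId });
}
```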
Image and Media Generation
For workflows that call third-party generation services, use getWritable() to stream progress updates back to the client while the generation runs.
```typescript
import { getWritable, getStepMetadata } from "workflow";

type ProgressUpdate = {
  status: string;
  imageUrl?: string;
};

async function generateImage(prompt: string) {
  "use step";
  const { stepId } = getStepMetadata();
  const writable = getWritable<ProgressUpdate>();
  const writer = writable.getWriter();
  try {
    await writer.write({ status: "Submitting generation request..." });

    // Call your image generation service; stepId doubles as a request
    // identifier the service can use to deduplicate retried submissions
    const response = await fetch("https://api.example.com/generate", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ prompt, requestId: stepId }),
    });
    const { jobId } = await response.json() as { jobId: string };

    // Poll for completion
    let result: { status: string; url?: string } | undefined;
    while (!result?.url) {
      await new Promise((r) => setTimeout(r, 2000));
      const poll = await fetch(`https://api.example.com/status/${jobId}`);
      result = await poll.json() as { status: string; url?: string };
      await writer.write({ status: `Generation ${result.status}...` });
    }

    await writer.write({ status: "complete", imageUrl: result.url });
    return result.url;
  } finally {
    writer.releaseLock();
  }
}

export async function imageGenerationWorkflow(prompt: string) {
  "use workflow";
  const imageUrl = await generateImage(prompt);
  return { imageUrl };
}
```

The client receives progress updates in real time. If the step fails midway through polling, the retry re-executes the entire step from the beginning - so make sure your generation service supports idempotent requests or that you can handle duplicate submissions.
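One way to narrow the duplicate-submission window is to split the work into two steps: one that submits the request and returns the jobId (which is persisted to the event log), and one that only polls. A retry during polling then never re-submits. A sketch reusing the hypothetical api.example.com endpoints from above:

```typescript
import { getStepMetadata } from "workflow";

async function submitGeneration(prompt: string) {
  "use step";
  // stepId doubles as a request identifier the service can deduplicate on
  const { stepId } = getStepMetadata();
  const response = await fetch("https://api.example.com/generate", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ prompt, requestId: stepId }),
  });
  const { jobId } = await response.json() as { jobId: string };
  // The returned jobId is persisted, so a later polling failure
  // resumes from here instead of submitting a second job
  return jobId;
}

async function waitForResult(jobId: string) {
  "use step";
  let result: { status: string; url?: string } | undefined;
  while (!result?.url) {
    await new Promise((r) => setTimeout(r, 2000));
    const poll = await fetch(`https://api.example.com/status/${jobId}`);
    result = await poll.json() as { status: string; url?: string };
  }
  return result.url;
}

export async function splitImageGenerationWorkflow(prompt: string) {
  "use workflow";
  const jobId = await submitGeneration(prompt);
  const imageUrl = await waitForResult(jobId);
  return { imageUrl };
}
```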
Multi-Stage Pipelines
Chain AI steps to build processing pipelines where each stage transforms the output of the previous one. Each step is independently retryable and observable.
```typescript
import { generateText } from "ai";
import { openai } from "@workflow/ai/openai";

async function extractEntities(text: string) {
  "use step";
  const { text: entities } = await generateText({
    model: openai("gpt-4o"),
    system: "Extract all named entities from the text. Return as JSON array.",
    prompt: text,
  });
  return JSON.parse(entities) as string[];
}

async function analyzeSentiment(text: string) {
  "use step";
  const { text: sentiment } = await generateText({
    model: openai("gpt-4o"),
    system: "Analyze the sentiment. Return: positive, negative, or neutral.",
    prompt: text,
  });
  return sentiment.trim();
}

async function generateSummary(
  text: string,
  entities: string[],
  sentiment: string
) {
  "use step";
  const { text: summary } = await generateText({
    model: openai("gpt-4o"),
    system:
      "Write a brief summary incorporating the extracted entities and sentiment analysis.",
    prompt: `Text: ${text}\nEntities: ${entities.join(", ")}\nSentiment: ${sentiment}`,
  });
  return summary;
}

export async function analysisPipelineWorkflow(document: string) {
  "use workflow";
  const entities = await extractEntities(document);
  const sentiment = await analyzeSentiment(document);
  const summary = await generateSummary(document, entities, sentiment);
  return { entities, sentiment, summary };
}
```

If the generateSummary step fails, only that step retries. The extractEntities and analyzeSentiment results are already persisted and replayed from the event log.
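Because extractEntities and analyzeSentiment each take only the original document, the first two stages can also run concurrently; only generateSummary has to wait for both:

```typescript
export async function parallelAnalysisWorkflow(document: string) {
  "use workflow";
  // The first two stages are independent, so fan them out together;
  // each call still runs (and retries) as its own step
  const [entities, sentiment] = await Promise.all([
    extractEntities(document),
    analyzeSentiment(document),
  ]);
  const summary = await generateSummary(document, entities, sentiment);
  return { entities, sentiment, summary };
}
```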
Parallel Processing with Fan-Out
Use Promise.all to process multiple documents through AI analysis concurrently. Each item runs as its own step, so failures are isolated.
```typescript
import { generateText } from "ai";
import { openai } from "@workflow/ai/openai";

type AnalysisResult = {
  documentId: string;
  summary: string;
  topics: string[];
};

async function analyzeDocument(
  documentId: string,
  content: string
): Promise<AnalysisResult> {
  "use step";
  const { text } = await generateText({
    model: openai("gpt-4o"),
    system:
      "Analyze this document. Return JSON with 'summary' (string) and 'topics' (string array).",
    prompt: content,
  });
  const parsed = JSON.parse(text) as { summary: string; topics: string[] };
  return { documentId, ...parsed };
}

export async function fanOutAnalysisWorkflow(
  documents: Array<{ id: string; content: string }>
) {
  "use workflow";
  const results = await Promise.all(
    documents.map((doc) => analyzeDocument(doc.id, doc.content))
  );
  return results;
}
```

All documents are processed in parallel. If one document's analysis fails and exhausts retries, the workflow fails - but all successful results are preserved in the event log. See Common Patterns for more on parallel execution.
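If you would rather collect partial results than fail the whole run, settle each document individually. A sketch using Promise.allSettled, assuming your runtime resolves it over step calls the same way it does Promise.all:

```typescript
export async function tolerantFanOutWorkflow(
  documents: Array<{ id: string; content: string }>
) {
  "use workflow";
  // allSettled resolves even when individual steps exhaust their retries,
  // so one bad document no longer fails the entire workflow
  const settled = await Promise.allSettled(
    documents.map((doc) => analyzeDocument(doc.id, doc.content))
  );
  const succeeded = settled
    .filter(
      (r): r is PromiseFulfilledResult<AnalysisResult> =>
        r.status === "fulfilled"
    )
    .map((r) => r.value);
  const failedCount = settled.length - succeeded.length;
  return { succeeded, failedCount };
}
```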
Progress Streaming for Generation Tasks
For long-running generation tasks, stream progress updates to clients using getWritable(). This keeps users informed without waiting for the entire workflow to finish.
```typescript
import { getWritable } from "workflow";

type GenerationProgress = {
  completed: number;
  total: number;
  currentItem: string;
};

async function processItem(item: string, index: number, total: number) {
  "use step";
  const writable = getWritable<GenerationProgress>();
  const writer = writable.getWriter();
  try {
    // Report progress before processing
    await writer.write({
      completed: index,
      total,
      currentItem: item,
    });

    // Do the actual work
    const result = await fetch("https://api.example.com/process", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ item }),
    });
    return result.json();
  } finally {
    writer.releaseLock();
  }
}

async function closeProgressStream() {
  "use step";
  await getWritable().close();
}

export async function progressWorkflow(items: string[]) {
  "use workflow";
  const results = [];
  for (let i = 0; i < items.length; i++) {
    const result = await processItem(items[i], i, items.length);
    results.push(result);
  }
  await closeProgressStream();
  return results;
}
```

Start the workflow from a route handler and return its readable stream to the client:

```typescript
import { start } from "workflow/api";
import { progressWorkflow } from "@/workflows/progress-stream";

// Matches the shape of values the workflow writes to its stream
type GenerationProgress = {
  completed: number;
  total: number;
  currentItem: string;
};

export async function POST(request: Request) {
  const { items } = await request.json() as { items: string[] };
  const run = await start(progressWorkflow, [items]);
  return new Response(run.readable, {
    headers: { "Content-Type": "application/json" },
  });
}
```

For more streaming patterns, see the Streaming guide.
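On the client, the response body is an ordinary ReadableStream. How each GenerationProgress value is framed on the wire depends on the runtime's serialization, so this sketch (against a hypothetical /api/progress route) only decodes and logs raw chunks; adapt the parsing to your wire format:

```typescript
// Hypothetical client-side consumer for the route above
async function watchProgress(items: string[]) {
  const response = await fetch("/api/progress", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ items }),
  });
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // Each chunk carries one or more serialized GenerationProgress values;
    // the exact framing (e.g. newline-delimited JSON) depends on your setup
    console.log(decoder.decode(value, { stream: true }));
  }
}
```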
Error Handling for AI Services
AI services have specific failure modes that benefit from targeted error handling. Use RetryableError for transient failures like rate limits, and FatalError for permanent failures like content policy violations.
```typescript
import { FatalError, RetryableError, getStepMetadata } from "workflow";

async function callAIService(prompt: string) {
  "use step";
  const metadata = getStepMetadata();
  const response = await fetch("https://api.example.com/generate", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ prompt }),
  });

  if (response.status === 429) {
    // Rate limited - retry with exponential backoff (1s, 2s, 4s, ...)
    throw new RetryableError("Rate limited", {
      retryAfter: 2 ** metadata.attempt * 1000,
    });
  }
  if (response.status === 400) {
    const body = await response.json() as { error?: string };
    // Content policy violation - do not retry
    throw new FatalError(`Request rejected: ${body.error}`);
  }
  if (response.status >= 500) {
    // Server error - use default retry behavior
    throw new Error(`AI service error: ${response.status}`);
  }
  return response.json();
}

callAIService.maxRetries = 5; // Allow more retries for AI services
```

The metadata.attempt value starts at 0 and increments on each retry, enabling exponential backoff. For a full breakdown of retry semantics, see Errors and Retries.
When retrying AI generation steps, ensure your requests are idempotent. If your service does not deduplicate requests, retries may produce duplicate content. See Idempotency for strategies.
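One common approach, when the provider supports it, is to reuse the step's identifier as a deduplication key - the image generation example above does this by passing stepId as requestId. A sketch, assuming a hypothetical service that honors an Idempotency-Key header:

```typescript
import { getStepMetadata } from "workflow";

async function generateIdempotently(prompt: string) {
  "use step";
  const { stepId } = getStepMetadata();
  const response = await fetch("https://api.example.com/generate", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      // Hypothetical header - use whatever deduplication
      // mechanism your generation service actually offers
      "Idempotency-Key": stepId,
    },
    body: JSON.stringify({ prompt }),
  });
  return response.json();
}
```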
Related Documentation
- Building Durable AI Agents - Chat-based agent patterns with DurableAgent
- Streaming - In-depth streaming guide
- Common Patterns - Sequential, parallel, and composition patterns
- Errors and Retries - Retry behavior and error types
- Serialization - Data types that can be passed in workflows