Agents & Enrichment Loops
In Part 0, we introduced the concept of a life stream: a unified event log of your digital interactions. But raw events are just data. The magic happens when we transform them into something meaningful.
This is where agents come in.
What is an Agent?
In the Vibe Decoding system, an agent is a Kafka consumer that does one thing well: it receives work commands from a dedicated queue, performs some transformation or enrichment, and emits new events as output.
Agents are the workhorses of the enrichment pipeline. Each agent:
- Subscribes to a dedicated work topic (e.g., work.fetch_link)
- Processes every message it receives (no filtering needed)
- Transforms the data (fetch content, call an LLM, compute derived values)
- Emits new events that flow back through the system
The beauty of this architecture is that agents are decoupled. The fetcher doesn't know about the enricher. The enricher doesn't know about the materializer. A central router dispatches work to each agent's queue, handling idempotency checks and retry logic. This means:
- Agents trust their input (if they receive work, it needs to be done)
- Retry logic is centralized, not duplicated
- You can monitor backlog per agent independently
- You can scale agents independently based on their workload
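The messages these agents exchange share a common envelope. Here's a minimal sketch of what that envelope implies, inferred from the fields used in the snippets later in this post (the exact `WorkMessage` type in the repo is an assumption):

```typescript
// Sketch of the work-message envelope implied by the agent code in this post.
// Field names come from the snippets shown below; defaults are assumptions.
interface WorkMessage {
  work_type: string;                 // e.g. 'fetch_link', 'enrich_link'
  subject_id: string;                // the entity this work is about
  payload: Record<string, unknown>;  // agent-specific input, e.g. { url }
  attempt: number;                   // 1-based, incremented by the router on retry
  max_attempts: number;              // after this many failures, dead-letter
  correlation_id: string;            // traces one link through the whole pipeline
  last_error?: string;               // set by the router when re-dispatching a failure
}

// Parse a raw Kafka message value into a WorkMessage, rejecting obvious junk.
function parseWorkMessage(raw: string): WorkMessage | null {
  try {
    const msg = JSON.parse(raw) as Partial<WorkMessage>;
    if (typeof msg.work_type !== 'string' || typeof msg.subject_id !== 'string') {
      return null;
    }
    // Fill in defaults first, then let the message's own fields win.
    return {
      payload: {},
      attempt: 1,
      max_attempts: 3,
      correlation_id: '',
      ...msg,
    } as WorkMessage;
  } catch {
    return null; // not JSON at all
  }
}
```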
The Fetcher Agent
The first agent in our pipeline is the Fetcher. When a work message arrives on work.fetch_link, the fetcher's job is simple: fetch the URL and extract readable content.
Here's the core loop:
```typescript
// From: stream-agents/scripts/agent_fetcher.ts
const CONSUMER_GROUP = 'fetcher-agent-v2';
const TOPIC = 'work.fetch_link';

async function handleMessage({ message }: EachMessagePayload): Promise<void> {
  if (!message.value) return;

  const workMessage: WorkMessage = JSON.parse(message.value.toString());
  const { url } = workMessage.payload as { url?: string };

  if (!url) {
    console.log(`[fetcher] WARNING: Missing url, skipping: ${workMessage.subject_id}`);
    return;
  }

  console.log(`[fetcher] Fetching: ${url} (attempt ${workMessage.attempt}/${workMessage.max_attempts})`);

  try {
    // Rate limit per domain
    const domain = getDomain(url);
    await rateLimitForDomain(domain);

    // Fetch and extract content
    const result = await fetchAndExtract(url);

    // Check if fetch resulted in an error that should trigger retry
    if (result.error && !result.textContent) {
      await emitWorkFailed(workMessage, result.error);
      return;
    }

    // Emit content.fetched event
    await emitContentFetched(workMessage.subject_id, result, workMessage.correlation_id);
  } catch (err) {
    // Emit work.failed for router to handle retry
    // (err is `unknown` in modern TS, so narrow before reading .message)
    await emitWorkFailed(workMessage, err instanceof Error ? err.message : String(err));
  }
}
```
Notice what's not in this code:
- No event type filtering (we only receive work meant for us)
- No idempotency check (the router handles this)
- Retry logic is delegated to the router via work.failed events
The fetcher uses Mozilla Readability, the same library that powers Firefox's Reader View, to extract the main content from web pages. This strips away navigation, ads, and other noise, leaving just the article text.
```typescript
// From: stream-agents/scripts/agent_fetcher.ts
async function fetchAndExtract(url: string): Promise<FetchResult> {
  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS);

  try {
    const response = await fetch(url, {
      signal: controller.signal,
      headers: {
        'User-Agent': 'Mozilla/5.0 (compatible; LifestreamBot/1.0)',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
      },
      redirect: 'follow',
    });
    clearTimeout(timeoutId);

    if (!response.ok) {
      return {
        finalUrl: response.url,
        title: null,
        textContent: null,
        error: `HTTP ${response.status}: ${response.statusText}`,
      };
    }

    const html = await response.text();
    const { document } = parseHTML(html);

    // Try Readability extraction
    const reader = new Readability(document);
    const article = reader.parse();

    if (article) {
      return {
        finalUrl: response.url,
        title: article.title,
        textContent: article.textContent,
        error: null,
      };
    }

    // Fallback: extract title from document
    const title = document.querySelector('title')?.textContent ?? null;
    return {
      finalUrl: response.url,
      title,
      textContent: null,
      error: 'Readability could not parse article',
    };
  } catch (err) {
    clearTimeout(timeoutId);
    // ... error handling
  }
}
```
Notice a few important details:
- Rate limiting per domain: We track the last fetch time for each domain and wait at least 1 second between requests to the same host. This keeps us from hammering servers.
- Timeout handling: Network requests can hang. We use an AbortController with a 30-second timeout.
- Error tracking: When extraction fails, we still emit an event, but with the error recorded. This lets downstream systems know what happened without blocking the pipeline.
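The per-domain limiter amounts to a map of last-fetch timestamps. The actual `getDomain` and `rateLimitForDomain` implementations aren't shown in the post, so this is one plausible sketch of the behavior described above:

```typescript
// Sketch of per-domain rate limiting: remember when we last hit each host and
// sleep until at least MIN_INTERVAL_MS has elapsed. The real helpers are not
// shown in the post; this is an assumed implementation of the same idea.
const MIN_INTERVAL_MS = 1000;
const lastFetchByDomain = new Map<string, number>();

function getDomain(url: string): string {
  return new URL(url).hostname;
}

// Pure helper: how many ms must we wait before fetching this domain again?
function delayNeeded(domain: string, now: number): number {
  const last = lastFetchByDomain.get(domain);
  if (last === undefined) return 0; // never fetched: go immediately
  return Math.max(0, last + MIN_INTERVAL_MS - now);
}

async function rateLimitForDomain(domain: string): Promise<void> {
  const wait = delayNeeded(domain, Date.now());
  if (wait > 0) await new Promise(res => setTimeout(res, wait));
  lastFetchByDomain.set(domain, Date.now());
}
```

Keeping `delayNeeded` pure (time passed in as an argument) makes the throttling logic trivially testable without real sleeps.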
The Enricher Agent
Once we have content, the Enricher agent takes over. This is where AI enters the picture.
When a content.fetched event arrives, the router dispatches work to work.enrich_link; the enricher picks it up and calls an LLM to generate:
- Tags for categorization
- Short summary (1-2 sentences)
- Long summary (2-3 paragraphs)
- Language detection
```typescript
// From: stream-agents/scripts/agent_enricher.ts
async function enrichContent(
  title: string | null,
  textContent: string,
  existingTags: Set<string>
): Promise<EnrichmentResult> {
  // Truncate content for cost control
  const truncated = textContent.slice(0, MAX_CONTENT_CHARS);

  // Build tag list for prompt (limit size)
  const tagList = Array.from(existingTags).slice(0, MAX_TAGS_IN_PROMPT).join(', ');

  const prompt = `Analyze this article and provide tags and summaries.

EXISTING TAGS (prefer these when appropriate, but create new ones if needed):
${tagList || '(none yet)'}

Rules for tags:
- Use 3-7 tags per article
- Prefer existing tags when they fit
- New tags should be lowercase, hyphenated (e.g., "machine-learning", "web-development")
- Be specific but not too narrow

${title ? `Title: ${title}\n\n` : ''}Content:
${truncated}

Respond ONLY with valid JSON in this exact format:
{
  "tags": ["tag1", "tag2"],
  "summary_short": "1-2 sentences, max 200 chars",
  "summary_long": "2-3 paragraphs",
  "language": "en"
}`;

  const response = await openai.chat.completions.create({
    model: MODEL,
    messages: [{ role: 'user', content: prompt }],
    max_tokens: 1024,
    temperature: 0.3,
    response_format: { type: 'json_object' },
  });

  const content = response.choices[0]?.message?.content;
  if (!content) {
    throw new Error('Empty response from OpenAI');
  }

  return JSON.parse(content) as EnrichmentResult;
}
```
The enricher uses gpt-4o-mini with a low temperature (0.3) for consistent, deterministic outputs. We also use JSON mode (response_format: { type: 'json_object' }) to ensure we get structured data back.
Any LLM with structured output works here: Claude, local models via Ollama, or other providers. The key is consistent JSON responses.
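One caveat: JSON mode guarantees syntactically valid JSON, not the right shape. It's worth validating the response before trusting it. A sketch of a defensive parser (the `EnrichmentResult` fields here mirror the prompt above, not a confirmed internal type):

```typescript
// JSON mode guarantees valid JSON, not the right shape. This validator checks
// the fields the prompt asks for before the result is trusted downstream.
// The EnrichmentResult shape mirrors the prompt; it is an assumption, not the
// repo's actual type definition.
interface EnrichmentResult {
  tags: string[];
  summary_short: string;
  summary_long: string;
  language: string;
}

function parseEnrichmentResult(raw: string): EnrichmentResult {
  const parsed = JSON.parse(raw) as Partial<EnrichmentResult>;
  if (!Array.isArray(parsed.tags) || parsed.tags.some(t => typeof t !== 'string')) {
    throw new Error('Invalid tags in LLM response');
  }
  if (typeof parsed.summary_short !== 'string' || typeof parsed.summary_long !== 'string') {
    throw new Error('Missing summaries in LLM response');
  }
  return {
    // Normalize tags so "ML " and "ml" don't drift apart in the catalog
    tags: parsed.tags.map(t => t.toLowerCase().trim()),
    summary_short: parsed.summary_short.slice(0, 200), // enforce the prompt's cap
    summary_long: parsed.summary_long,
    language: typeof parsed.language === 'string' ? parsed.language : 'en',
  };
}
```

A validation failure here can simply throw, which flows into the same work.failed retry path as any other agent error.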
The Materializer
Agents emit events, but something needs to turn those events into queryable state. Thatβs the job of the Materializer.
The materializer is a special consumer that listens to all events and projects them into database tables. It's the bridge between the event stream and the application layer.
```typescript
// From: stream-agents/scripts/consume_kafka_materialize.ts
async function processEvent(event: LifestreamEvent): Promise<void> {
  switch (event.event_type) {
    case 'link.added':
      await handleLinkAdded(event);
      break;
    case 'content.fetched':
      await handleContentFetched(event);
      break;
    case 'enrichment.completed':
      await handleEnrichmentCompleted(event);
      break;
    case 'publish.completed':
      await handlePublishCompleted(event);
      break;
    case 'temp.reading_recorded':
      await handleTempReading(event);
      break;
    case 'todo.created':
      await handleTodoCreated(event);
      break;
    case 'todo.completed':
      await handleTodoCompleted(event);
      break;
    case 'annotation.added':
      await handleAnnotationAdded(event);
      break;
    default:
      console.log(`  [unknown] Skipping unhandled event type: ${event.event_type}`);
  }
}
```
Each handler knows how to project its event type into the appropriate tables. For example, when an enrichment.completed event arrives:
```typescript
// From: stream-agents/scripts/consume_kafka_materialize.ts
async function handleEnrichmentCompleted(event: LifestreamEvent): Promise<void> {
  const { tags, summary_short, summary_long, summary, language, model_version } = event.payload as {
    tags?: string[];
    summary_short?: string;
    summary_long?: string;
    summary?: string;
    language?: string;
    model_version?: string;
  };

  const shortSummary = summary_short ?? summary ?? null;

  // Upsert link_metadata
  await sql`
    INSERT INTO lifestream.link_metadata (subject_id, tags, summary_short, summary_long, language, model_version)
    VALUES (${event.subject_id}, ${tags ?? []}, ${shortSummary}, ${summary_long ?? null}, ${language ?? null}, ${model_version ?? null})
    ON CONFLICT (subject_id) DO UPDATE SET
      tags = CASE
        WHEN array_length(EXCLUDED.tags, 1) > 0 THEN EXCLUDED.tags
        ELSE COALESCE(NULLIF(lifestream.link_metadata.tags, '{}'), EXCLUDED.tags)
      END,
      summary_short = COALESCE(EXCLUDED.summary_short, lifestream.link_metadata.summary_short),
      summary_long = COALESCE(EXCLUDED.summary_long, lifestream.link_metadata.summary_long),
      language = COALESCE(EXCLUDED.language, lifestream.link_metadata.language),
      model_version = COALESCE(EXCLUDED.model_version, lifestream.link_metadata.model_version)
  `;

  // Update link status
  await sql`
    UPDATE lifestream.links
    SET status = 'enriched'
    WHERE subject_id = ${event.subject_id} AND status IN ('new', 'fetched')
  `;
}
```
The materializer uses ON CONFLICT ... DO UPDATE (upserts) extensively. This means events can be replayed safely: the same event processed twice will produce the same result.
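The replay-safety argument can be sketched in memory. This is an illustration of the projection semantics, not the actual SQL above:

```typescript
// In-memory illustration of why upsert-style projection is replay safe:
// applying the same enrichment event twice yields identical state, and
// later sparse events don't clobber earlier data. This mirrors the intent
// of the SQL upsert, not its exact implementation.
interface LinkMetadata { tags: string[]; summary_short: string | null }
const metadata = new Map<string, LinkMetadata>();

function projectEnrichment(subjectId: string, tags: string[], summaryShort: string | null): void {
  const existing = metadata.get(subjectId);
  metadata.set(subjectId, {
    // Mirror the CASE expression: prefer non-empty incoming tags, else keep old
    tags: tags.length > 0 ? tags : existing?.tags ?? [],
    // Mirror COALESCE: incoming value wins only when it is non-null
    summary_short: summaryShort ?? existing?.summary_short ?? null,
  });
}
```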
Idempotency and the Router
A critical property of any event-driven system is idempotency: processing the same event multiple times should produce the same result.
Why does this matter? In distributed systems, things fail. Consumers crash. Networks hiccup. Kafka might deliver the same message twice. Without idempotency, you'd end up with duplicate processing, wasted resources, and potentially inconsistent state.
In our architecture, idempotency is handled by the router, not by individual agents. This centralizes the logic and simplifies agent code.
Here's how the router implements idempotency checks:
```typescript
// From: stream-agents/scripts/router.ts
async function contentAlreadyFetched(subjectId: string): Promise<boolean> {
  const result = await sql`
    SELECT 1 FROM lifestream.link_content
    WHERE subject_id = ${subjectId}
  `;
  return result.length > 0;
}

async function alreadyEnriched(subjectId: string): Promise<boolean> {
  const result = await sql`
    SELECT 1 FROM lifestream.link_metadata
    WHERE subject_id = ${subjectId}
      AND array_length(tags, 1) > 0
  `;
  return result.length > 0;
}

async function processEvent(event: LifestreamEvent): Promise<void> {
  switch (event.event_type) {
    case 'link.added':
      if (await contentAlreadyFetched(event.subject_id)) return;
      await emitWorkMessage('fetch_link', event);
      break;
    case 'content.fetched':
      if (await alreadyEnriched(event.subject_id)) return;
      await emitWorkMessage('enrich_link', event);
      break;
    // ... etc
  }
}
```
The router also handles retry logic. When an agent fails, it emits a work.failed event. The router catches this and either retries (up to 3 attempts) or sends to the dead letter queue:
```typescript
// From: stream-agents/scripts/router.ts
async function handleWorkFailed(event: LifestreamEvent): Promise<void> {
  const { work_message, error, agent } = event.payload;

  if (work_message.attempt < work_message.max_attempts) {
    // Retry with incremented attempt count
    await emitWorkMessage(work_message.work_type, {
      ...work_message,
      attempt: work_message.attempt + 1,
      last_error: error,
    });
  } else {
    // Send to dead letter queue
    await emitToDeadLetter(work_message, error, agent);
  }
}
```
The materializer adds its own idempotency layer at the Kafka level, tracking which offsets have already been processed:
```typescript
// From: stream-agents/scripts/consume_kafka_materialize.ts
async function isAlreadyProcessed(topic: string, partition: number, offset: bigint): Promise<boolean> {
  const result = await sql`
    SELECT 1 FROM lifestream.event_ingest_dedupe
    WHERE topic = ${topic}
      AND partition = ${partition}
      AND kafka_offset = ${offset}
  `;
  return result.length > 0;
}
```
This layered approach (router for business logic, materializer for infrastructure) makes the system robust against restarts, replays, and failures while keeping agent code simple.
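The offset-level dedupe boils down to a membership check keyed on (topic, partition, offset). An in-memory sketch of the same idea, with a Set standing in for the event_ingest_dedupe table (the real code uses a bigint offset; a number is used here to keep the sketch simple):

```typescript
// In-memory sketch of offset-level dedupe: the real system stores
// (topic, partition, offset) rows in lifestream.event_ingest_dedupe;
// here a Set plays that role. Offsets are numbers here for simplicity;
// the real code uses bigint.
const processed = new Set<string>();

function dedupeKey(topic: string, partition: number, offset: number): string {
  return `${topic}/${partition}/${offset}`;
}

// Returns true if this delivery is new (and marks it processed);
// false if it is a redelivery that should be skipped.
function claimOffset(topic: string, partition: number, offset: number): boolean {
  const key = dedupeKey(topic, partition, offset);
  if (processed.has(key)) return false;
  processed.add(key);
  return true;
}
```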
The KTable Pattern
Here's a problem: when the enricher generates tags, we want consistency. If one article is tagged machine-learning and another is tagged ml, that's confusing. We want the LLM to prefer existing tags when they fit.
But how does the enricher know what tags already exist? It could query the database, but that adds latency and couples the agent to the materializerβs schema.
The solution is a KTable, a Kafka-native pattern for maintaining stateful lookups.
The enricher maintains a local set of known tags:
```typescript
// From: stream-agents/scripts/agent_enricher.ts
const knownTags = new Set<string>();

function handleTagCatalogMessage(message: EachMessagePayload['message']): void {
  if (!message.value) return;
  try {
    const tags: string[] = JSON.parse(message.value.toString());
    knownTags.clear();
    tags.forEach(t => knownTags.add(t));
    console.log(`[enricher] Tag catalog updated: ${knownTags.size} tags`);
  } catch (err) {
    console.error(`[enricher] Failed to parse tag catalog:`, err);
  }
}
```
When the enricher discovers new tags, it updates the catalog:
```typescript
// From: stream-agents/scripts/agent_enricher.ts
async function updateTagCatalog(newTags: string[]): Promise<void> {
  const actuallyNew = newTags.filter(t => !knownTags.has(t));
  if (actuallyNew.length === 0) return;

  // Add to local state
  actuallyNew.forEach(t => knownTags.add(t));

  // Emit updated catalog to Kafka
  try {
    const producer = await getProducer();
    await producer.send({
      topic: TAGS_TOPIC,
      messages: [{
        key: 'all',
        value: JSON.stringify(Array.from(knownTags)),
      }],
    });
    console.log(`[enricher] Added ${actuallyNew.length} new tags: ${actuallyNew.join(', ')}`);
  } catch (err) {
    console.error(`[enricher] Failed to update tag catalog:`, err);
    // Continue anyway - local state is still updated
  }
}
```
The tags.catalog topic is configured with log compaction, which means Kafka will keep only the latest message for each key. Since we use a single key (all), the topic effectively stores just the current state of all tags.
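Compaction can be pictured as "keep the last value per key". A small simulation of that retention rule (this models the semantics, it is not Kafka's actual compaction code):

```typescript
// Simulation of log-compaction semantics: Kafka eventually retains only the
// latest record per key, so replaying a compacted topic from the beginning
// yields the current state. This models the retention rule, nothing more.
interface Msg { key: string; value: string }

function compact(log: Msg[]): Msg[] {
  const latest = new Map<string, string>();
  for (const rec of log) latest.set(rec.key, rec.value); // later records win
  const out: Msg[] = [];
  latest.forEach((value, key) => out.push({ key, value }));
  return out;
}
```

With a single key (all), the compacted topic collapses to exactly one record: the full current tag list.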
When a new enricher instance starts up, it subscribes to both its work topic and the tags catalog:
```typescript
// From: stream-agents/scripts/agent_enricher.ts
const WORK_TOPIC = 'work.enrich_link';
const TAGS_TOPIC = 'tags.catalog';

await consumer.subscribe({ topics: [WORK_TOPIC, TAGS_TOPIC], fromBeginning: true });
```
By reading fromBeginning, it rebuilds its local tag catalog before processing any work. This ensures tag consistency even after restarts.
The Enrichment Pipeline
Let's trace through a complete example. You save a link to an article about React Server Components:
1. Input capture emits link.added to Kafka
2. Router sees link.added, checks idempotency, emits to work.fetch_link
3. Fetcher processes the work, fetches the URL, extracts content, emits content.fetched
4. Router sees content.fetched, emits to work.enrich_link
5. Enricher processes the work, calls the LLM with known tags, emits enrichment.completed
6. Router sees enrichment.completed, emits to work.publish_link
7. Publisher emits publish.completed
8. Materializer sees all events, projects them into the links, link_content, link_metadata, and publish_state tables
The link is now fully enriched, searchable by tags, and ready to surface when relevant.
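The trace above is driven by a simple dispatch table in the router: each completed event type maps to the next work type. A sketch of that mapping, inferred from the pipeline steps (the real router's internal structure may differ):

```typescript
// Sketch of the router's dispatch table, inferred from the pipeline trace:
// each completed event type maps to the next work type, or null when the
// pipeline is done and only the materializer cares. The real router's
// internals may be structured differently.
const NEXT_WORK: Record<string, string | null> = {
  'link.added': 'fetch_link',
  'content.fetched': 'enrich_link',
  'enrichment.completed': 'publish_link',
  'publish.completed': null, // terminal: nothing left to dispatch
};

function nextWorkType(eventType: string): string | null {
  return NEXT_WORK[eventType] ?? null; // unknown events dispatch nothing
}
```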
```
link.added ---> Router ---> work.fetch_link ---> Fetcher ---> content.fetched
                                                                    |
                                                                    v
                            work.enrich_link <--- Router <----------+
                                    |
                                    v
                                Enricher ---> enrichment.completed
                                                    |
                                                    v
                            work.publish_link <--- Router
                                    |
                                    v
                                Publisher ---> publish.completed
                                                    |
                                                    v
                                              Materializer ---> Database
```
(The Materializer also consumes every intermediate event, not just publish.completed.)
What's Next
We've built the core enrichment pipeline: fetching content, calling LLMs for analysis, and projecting events into queryable state. In Part 3, we'll dive deeper into Kafka patterns for personal streams: topics, partitions, consumer groups, and the tradeoffs involved.
But first, try running the agents yourself. Save a few links and watch them flow through the pipeline. There's something satisfying about seeing an article transform from a bare URL into a tagged, summarized knowledge artifact.
Ready to continue? Head to Part 3: Kafka Patterns (coming soon) to explore the infrastructure that makes this all work.