Agents & Enrichment Loops
In Part 0, we introduced the concept of a life stream: a unified event log of your digital interactions. But raw events are just data. The magic happens when we transform them into something meaningful.
This is where agents come in.
What is an Agent?
In the Vibe Decoding system, an agent is a Kafka consumer that does one thing well: it listens for specific event types, performs some transformation or enrichment, and emits new events as output.
Agents are the workhorses of the enrichment pipeline. Each agent:
- Subscribes to one or more Kafka topics
- Filters for events it cares about
- Transforms the event data (fetch content, call an LLM, compute derived values)
- Emits new events that downstream agents or the materializer can consume
The beauty of this architecture is that agents are decoupled. The fetcher doesn’t know about the enricher. The enricher doesn’t know about the materializer. They communicate only through events, which means you can:
- Replay events to reprocess data
- Add new agents without touching existing ones
- Scale agents independently based on their workload
- Replace an agent’s implementation without changing the interface
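Concretely, every agent is the same consume, filter, transform, produce loop. Here's a minimal skeleton of that shape using kafkajs (which the real agents use, judging by the EachMessagePayload type that appears later in this post); the broker address, topic name, group id, and emitted payload are illustrative placeholders, not the project's actual configuration:
// Minimal agent skeleton (sketch). Broker address, topic name, and group id
// are placeholders, not the project's real configuration.
import { Kafka, EachMessagePayload } from 'kafkajs';

const kafka = new Kafka({ clientId: 'example-agent', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'example-agent' });
const producer = kafka.producer();

async function run(): Promise<void> {
  await consumer.connect();
  await producer.connect();
  await consumer.subscribe({ topics: ['lifestream.events'], fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ message }: EachMessagePayload) => {
      if (!message.value) return;
      const event = JSON.parse(message.value.toString());

      // 1. Filter: only handle the event types this agent cares about
      if (event.event_type !== 'link.added') return;

      // 2. Transform: do the agent's one job (fetch, enrich, compute, ...)
      const derived = { note: 'derived data goes here' };

      // 3. Emit: publish a new event for downstream consumers
      await producer.send({
        topic: 'lifestream.events',
        messages: [{ key: event.subject_id, value: JSON.stringify(derived) }],
      });
    },
  });
}

run().catch(console.error);
Every agent in the rest of this post is a variation on this loop; only the filter and the transform change.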
The Fetcher Agent
The first agent in our pipeline is the Fetcher. When a link.added event arrives, the fetcher’s job is simple: fetch the URL and extract readable content.
Here’s the core loop:
// From: stream-agents/scripts/agent_fetcher.ts
async function handleMessage({ topic, partition, message }: EachMessagePayload): Promise<void> {
  if (!message.value) {
    return;
  }

  const event: LifestreamEvent = JSON.parse(message.value.toString());

  // Only process link.added events
  if (event.event_type !== 'link.added') {
    return;
  }

  const { url } = event.payload as { url?: string };
  if (!url) {
    console.log(`[fetcher] WARNING: Missing url in payload, skipping: ${event.subject_id}`);
    return;
  }

  // Check if already fetched (idempotency)
  if (await contentAlreadyFetched(event.subject_id)) {
    console.log(`[fetcher] Already fetched: ${event.subject_id}`);
    return;
  }

  console.log(`[fetcher] Fetching: ${url}`);

  // Rate limit per domain
  const domain = getDomain(url);
  await rateLimitForDomain(domain);

  // Fetch and extract content
  const result = await fetchAndExtract(url);

  // Emit content.fetched event
  await emitContentFetched(event.subject_id, result, event.id);
}
The fetcher uses Mozilla Readability, the same library that powers Firefox’s Reader View, to extract the main content from web pages. This strips away navigation, ads, and other noise, leaving just the article text.
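The extraction code leans on two libraries: @mozilla/readability for pulling out the article, and a server-side DOM to feed it. The parseHTML call below matches linkedom's API, though that's an inference; jsdom would work with minor changes. The imports would look roughly like this:
// Likely imports for the extraction code below. linkedom is an assumption
// based on the parseHTML signature; jsdom is a workable alternative.
import { Readability } from '@mozilla/readability';
import { parseHTML } from 'linkedom';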
// From: stream-agents/scripts/agent_fetcher.ts
async function fetchAndExtract(url: string): Promise<FetchResult> {
  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS);

  try {
    const response = await fetch(url, {
      signal: controller.signal,
      headers: {
        'User-Agent': 'Mozilla/5.0 (compatible; LifestreamBot/1.0)',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
      },
      redirect: 'follow',
    });
    clearTimeout(timeoutId);

    if (!response.ok) {
      return {
        finalUrl: response.url,
        title: null,
        textContent: null,
        error: `HTTP ${response.status}: ${response.statusText}`,
      };
    }

    const html = await response.text();
    const { document } = parseHTML(html);

    // Try Readability extraction
    const reader = new Readability(document);
    const article = reader.parse();

    if (article) {
      return {
        finalUrl: response.url,
        title: article.title,
        textContent: article.textContent,
        error: null,
      };
    }

    // Fallback: extract title from document
    const title = document.querySelector('title')?.textContent ?? null;
    return {
      finalUrl: response.url,
      title,
      textContent: null,
      error: 'Readability could not parse article',
    };
  } catch (err) {
    clearTimeout(timeoutId);
    // ... error handling
  }
}
Notice a few important details:
- Rate limiting per domain: We track the last fetch time for each domain and wait at least 1 second between requests to the same host. This keeps us from hammering servers; a sketch of the rate-limiting helpers follows this list.
- Timeout handling: Network requests can hang. We use an AbortController with a 30-second timeout.
- Error tracking: When extraction fails, we still emit an event, but with the error recorded. This lets downstream systems know what happened without blocking the pipeline.
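Neither getDomain nor rateLimitForDomain is shown in the snippet above. A minimal version, assuming an in-memory map of last-fetch times and a 1-second floor between requests, could look like this:
// Sketch of per-domain rate limiting: remember the last fetch time per host
// and sleep until at least MIN_DELAY_MS has passed. The map and constant
// names are assumptions, not the project's actual code.
const MIN_DELAY_MS = 1000;
const lastFetchByDomain = new Map<string, number>();

function getDomain(url: string): string {
  return new URL(url).hostname;
}

async function rateLimitForDomain(domain: string): Promise<void> {
  const last = lastFetchByDomain.get(domain) ?? 0;
  const waitMs = last + MIN_DELAY_MS - Date.now();
  if (waitMs > 0) {
    await new Promise(resolve => setTimeout(resolve, waitMs));
  }
  lastFetchByDomain.set(domain, Date.now());
}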
The Enricher Agent
Once we have content, the Enricher agent takes over. This is where AI enters the picture.
The enricher listens for content.fetched events and calls an LLM to generate:
- Tags for categorization
- Short summary (1-2 sentences)
- Long summary (2-3 paragraphs)
- Language detection
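The prompt below pins down the exact JSON shape, so the result type presumably mirrors it. A plausible definition, inferred from the prompt rather than copied from the project:
// Shape of the enricher's output, inferred from the prompt's JSON format.
interface EnrichmentResult {
  tags: string[];        // lowercase, hyphenated, e.g. "machine-learning"
  summary_short: string; // 1-2 sentences, max ~200 chars
  summary_long: string;  // 2-3 paragraphs
  language: string;      // e.g. "en"
}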
// From: stream-agents/scripts/agent_enricher.ts
async function enrichContent(
  title: string | null,
  textContent: string,
  existingTags: Set<string>
): Promise<EnrichmentResult> {
  // Truncate content for cost control
  const truncated = textContent.slice(0, MAX_CONTENT_CHARS);

  // Build tag list for prompt (limit size)
  const tagList = Array.from(existingTags).slice(0, MAX_TAGS_IN_PROMPT).join(', ');

  const prompt = `Analyze this article and provide tags and summaries.

EXISTING TAGS (prefer these when appropriate, but create new ones if needed):
${tagList || '(none yet)'}

Rules for tags:
- Use 3-7 tags per article
- Prefer existing tags when they fit
- New tags should be lowercase, hyphenated (e.g., "machine-learning", "web-development")
- Be specific but not too narrow

${title ? `Title: ${title}\n\n` : ''}Content:
${truncated}

Respond ONLY with valid JSON in this exact format:
{
  "tags": ["tag1", "tag2"],
  "summary_short": "1-2 sentences, max 200 chars",
  "summary_long": "2-3 paragraphs",
  "language": "en"
}`;

  const response = await openai.chat.completions.create({
    model: MODEL,
    messages: [{ role: 'user', content: prompt }],
    max_tokens: 1024,
    temperature: 0.3,
    response_format: { type: 'json_object' },
  });

  const content = response.choices[0]?.message?.content;
  if (!content) {
    throw new Error('Empty response from OpenAI');
  }

  return JSON.parse(content) as EnrichmentResult;
}
The enricher uses gpt-4o-mini with a low temperature (0.3) for consistent, if not strictly deterministic, outputs. We also use JSON mode (response_format: { type: 'json_object' }) to ensure we get structured data back.
Any LLM with structured output works here: Claude, local models via Ollama, or other providers. The key is consistent JSON responses.
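Because the enricher speaks the standard chat-completions interface, swapping providers is mostly a matter of constructing a different client. Ollama, for instance, exposes an OpenAI-compatible endpoint, so pointing the same SDK at a local model is roughly this (the model name is just an example, and JSON-mode support depends on the model and Ollama version):
// Sketch: reuse the OpenAI SDK against Ollama's OpenAI-compatible API.
import OpenAI from 'openai';

const openai = new OpenAI({
  baseURL: 'http://localhost:11434/v1', // Ollama's default local endpoint
  apiKey: 'ollama',                     // the SDK requires a value; Ollama ignores it
});

const MODEL = 'llama3.1'; // example local model; enrichContent itself stays unchanged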
The Materializer
Agents emit events, but something needs to turn those events into queryable state. That’s the job of the Materializer.
The materializer is a special consumer that listens to all events and projects them into database tables. It’s the bridge between the event stream and the application layer.
// From: stream-agents/scripts/consume_kafka_materialize.ts
async function processEvent(event: LifestreamEvent): Promise<void> {
  switch (event.event_type) {
    case 'link.added':
      await handleLinkAdded(event);
      break;
    case 'content.fetched':
      await handleContentFetched(event);
      break;
    case 'enrichment.completed':
      await handleEnrichmentCompleted(event);
      break;
    case 'publish.completed':
      await handlePublishCompleted(event);
      break;
    case 'temp.reading_recorded':
      await handleTempReading(event);
      break;
    case 'todo.created':
      await handleTodoCreated(event);
      break;
    case 'todo.completed':
      await handleTodoCompleted(event);
      break;
    case 'annotation.added':
      await handleAnnotationAdded(event);
      break;
    default:
      console.log(` [unknown] Skipping unhandled event type: ${event.event_type}`);
  }
}
Each handler knows how to project its event type into the appropriate tables. For example, when an enrichment.completed event arrives:
// From: stream-agents/scripts/consume_kafka_materialize.ts
async function handleEnrichmentCompleted(event: LifestreamEvent): Promise<void> {
  const { tags, summary_short, summary_long, summary, language, model_version } = event.payload as {
    tags?: string[];
    summary_short?: string;
    summary_long?: string;
    summary?: string;
    language?: string;
    model_version?: string;
  };

  const shortSummary = summary_short ?? summary ?? null;

  // Upsert link_metadata
  await sql`
    INSERT INTO lifestream.link_metadata (subject_id, tags, summary_short, summary_long, language, model_version)
    VALUES (${event.subject_id}, ${tags ?? []}, ${shortSummary}, ${summary_long ?? null}, ${language ?? null}, ${model_version ?? null})
    ON CONFLICT (subject_id) DO UPDATE SET
      tags = CASE
        WHEN array_length(EXCLUDED.tags, 1) > 0 THEN EXCLUDED.tags
        ELSE COALESCE(NULLIF(lifestream.link_metadata.tags, '{}'), EXCLUDED.tags)
      END,
      summary_short = COALESCE(EXCLUDED.summary_short, lifestream.link_metadata.summary_short),
      summary_long = COALESCE(EXCLUDED.summary_long, lifestream.link_metadata.summary_long),
      language = COALESCE(EXCLUDED.language, lifestream.link_metadata.language),
      model_version = COALESCE(EXCLUDED.model_version, lifestream.link_metadata.model_version)
  `;

  // Update link status
  await sql`
    UPDATE lifestream.links
    SET status = 'enriched'
    WHERE subject_id = ${event.subject_id} AND status IN ('new', 'fetched')
  `;
}
The materializer uses ON CONFLICT ... DO UPDATE (upserts) extensively. This means events can be replayed safely: the same event processed twice will produce the same result.
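The other handlers follow the same shape. For instance, handleLinkAdded could be little more than an insert that ignores duplicates; this is a sketch, and the actual column list in lifestream.links may differ:
// Sketch of the link.added projection, assuming a links table keyed on
// subject_id with url and status columns (column names are assumptions).
async function handleLinkAdded(event: LifestreamEvent): Promise<void> {
  const { url } = event.payload as { url?: string };

  await sql`
    INSERT INTO lifestream.links (subject_id, url, status)
    VALUES (${event.subject_id}, ${url ?? null}, 'new')
    ON CONFLICT (subject_id) DO NOTHING
  `;
}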
Agent Idempotency
A critical property of any event-driven system is idempotency: processing the same event multiple times should produce the same result.
Why does this matter? In distributed systems, things fail. Consumers crash. Networks hiccup. Kafka might deliver the same message twice. Without idempotency, you’d end up with duplicate processing, wasted resources, and potentially inconsistent state.
Here’s how the fetcher implements this:
// From: stream-agents/scripts/agent_fetcher.ts
async function contentAlreadyFetched(subjectId: string): Promise<boolean> {
  const result = await sql`
    SELECT 1 FROM lifestream.link_content
    WHERE subject_id = ${subjectId}
  `;
  return result.length > 0;
}
And the enricher:
// From: stream-agents/scripts/agent_enricher.ts
async function alreadyEnriched(subjectId: string): Promise<boolean> {
  const result = await sql`
    SELECT 1 FROM lifestream.link_metadata
    WHERE subject_id = ${subjectId}
      AND array_length(tags, 1) > 0
  `;
  return result.length > 0;
}
Notice the enricher’s check is slightly more sophisticated: it not only checks if a record exists, but whether it has actual tags. This handles the case where a previous enrichment attempt failed partway through.
The materializer also implements idempotency at the Kafka level, tracking which offsets have been processed:
// From: stream-agents/scripts/consume_kafka_materialize.ts
async function isAlreadyProcessed(topic: string, partition: number, offset: bigint): Promise<boolean> {
  const result = await sql`
    SELECT 1 FROM lifestream.event_ingest_dedupe
    WHERE topic = ${topic}
      AND partition = ${partition}
      AND kafka_offset = ${offset}
  `;
  return result.length > 0;
}
This double-layered idempotency (at both the business logic and infrastructure levels) makes the system robust against restarts, replays, and failures.
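The other half of the offset check is recording an offset once its event has been handled. A sketch of how the consume loop could tie the two together (the markAsProcessed helper and the exact dedupe columns are assumptions that mirror the query above):
// Sketch: record each processed offset so isAlreadyProcessed skips it on replay.
async function markAsProcessed(topic: string, partition: number, offset: bigint): Promise<void> {
  await sql`
    INSERT INTO lifestream.event_ingest_dedupe (topic, partition, kafka_offset)
    VALUES (${topic}, ${partition}, ${offset})
    ON CONFLICT DO NOTHING
  `;
}

async function handleMaterializerMessage({ topic, partition, message }: EachMessagePayload): Promise<void> {
  if (!message.value) return;
  const offset = BigInt(message.offset);

  if (await isAlreadyProcessed(topic, partition, offset)) return;

  const event: LifestreamEvent = JSON.parse(message.value.toString());
  await processEvent(event);
  await markAsProcessed(topic, partition, offset);
}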
The KTable Pattern
Here’s a problem: when the enricher generates tags, we want consistency. If one article is tagged machine-learning and another is tagged ml, that’s confusing. We want the LLM to prefer existing tags when they fit.
But how does the enricher know what tags already exist? It could query the database, but that adds latency and couples the agent to the materializer’s schema.
The solution is a KTable, a Kafka-native pattern for maintaining stateful lookups.
The enricher maintains a local set of known tags:
// From: stream-agents/scripts/agent_enricher.ts
const knownTags = new Set<string>();

function handleTagCatalogMessage(message: EachMessagePayload['message']): void {
  if (!message.value) return;

  try {
    const tags: string[] = JSON.parse(message.value.toString());
    knownTags.clear();
    tags.forEach(t => knownTags.add(t));
    console.log(`[enricher] Tag catalog updated: ${knownTags.size} tags`);
  } catch (err) {
    console.error(`[enricher] Failed to parse tag catalog:`, err);
  }
}
When the enricher discovers new tags, it updates the catalog:
// From: stream-agents/scripts/agent_enricher.ts
async function updateTagCatalog(newTags: string[]): Promise<void> {
  const actuallyNew = newTags.filter(t => !knownTags.has(t));
  if (actuallyNew.length === 0) return;

  // Add to local state
  actuallyNew.forEach(t => knownTags.add(t));

  // Emit updated catalog to Kafka
  try {
    const producer = await getProducer();
    await producer.send({
      topic: TAGS_TOPIC,
      messages: [{
        key: 'all',
        value: JSON.stringify(Array.from(knownTags))
      }]
    });
    console.log(`[enricher] Added ${actuallyNew.length} new tags: ${actuallyNew.join(', ')}`);
  } catch (err) {
    console.error(`[enricher] Failed to update tag catalog:`, err);
    // Continue anyway - local state is still updated
  }
}
The tags.catalog topic is configured with log compaction, which means Kafka will keep only the latest message for each key. Since we use a single key (all), the topic effectively stores just the current state of all tags.
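If you create the topic yourself, compaction has to be enabled explicitly, since Kafka's default cleanup policy is delete. With kafkajs's admin client that could look like this (the topic name comes from the text above, the partition count is arbitrary, and kafka is a client instance like the one in the earlier skeleton):
// Sketch: create the tags.catalog topic with log compaction enabled.
const admin = kafka.admin();
await admin.connect();

await admin.createTopics({
  topics: [{
    topic: 'tags.catalog',
    numPartitions: 1,
    configEntries: [
      { name: 'cleanup.policy', value: 'compact' },
    ],
  }],
});

await admin.disconnect();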
When a new enricher instance starts up, it subscribes to both topics:
// From: stream-agents/scripts/agent_enricher.ts
await consumer.subscribe({ topics: [EVENTS_TOPIC, TAGS_TOPIC], fromBeginning: true });
By reading fromBeginning, it rebuilds its local tag catalog before processing any events. This ensures tag consistency even after restarts.
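Since one consumer now receives messages from two topics, the run loop just branches on the topic name before dispatching. A sketch, where handleMessage stands in for the enricher's own event handler:
// Sketch: route catalog updates and lifestream events to the right handler.
await consumer.run({
  eachMessage: async (payload: EachMessagePayload) => {
    if (payload.topic === TAGS_TOPIC) {
      handleTagCatalogMessage(payload.message);
    } else {
      await handleMessage(payload);
    }
  },
});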
The Enrichment Pipeline
Let’s trace through a complete example. You save a link to an article about React Server Components:
- Input capture emits link.added to Kafka
- Fetcher sees link.added, fetches the URL, extracts content, emits content.fetched
- Enricher sees content.fetched, calls the LLM with known tags, emits enrichment.completed
- Materializer sees all three events, projects them into the links, link_content, and link_metadata tables
The link is now fully enriched, searchable by tags, and ready to surface when relevant.
link.added ──► Fetcher ──► content.fetched ──► Enricher ──► enrichment.completed
                                   │
                                   ▼
                           ┌──────────────┐
                           │ Materializer │
                           └──────────────┘
                                   │
                                   ▼
                           ┌──────────────┐
                           │   Database   │
                           │  (queryable  │
                           │    state)    │
                           └──────────────┘
What’s Next
We’ve built the core enrichment pipeline: fetching content, calling LLMs for analysis, and projecting events into queryable state. In Part 3, we’ll dive deeper into Kafka patterns for personal streams: topics, partitions, consumer groups, and the tradeoffs involved.
But first, try running the agents yourself. Save a few links and watch them flow through the pipeline. There’s something satisfying about seeing an article transform from a bare URL into a tagged, summarized knowledge artifact.
Ready to continue? Head to Part 3: Kafka Patterns (coming soon) to explore the infrastructure that makes this all work.