Agents & Enrichment Loops

In Part 0, we introduced the concept of a life stream: a unified event log of your digital interactions. But raw events are just data. The magic happens when we transform them into something meaningful.

This is where agents come in.

What is an Agent?

In the Vibe Decoding system, an agent is a Kafka consumer that does one thing well: it receives work commands from a dedicated queue, performs some transformation or enrichment, and emits new events as output.

Agents are the workhorses of the enrichment pipeline. Each agent:

  1. Subscribes to a dedicated work topic (e.g., work.fetch_link)
  2. Processes every message it receives (no filtering needed)
  3. Transforms the data (fetch content, call an LLM, compute derived values)
  4. Emits new events that flow back through the system (a minimal sketch follows this list)
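
In code, each of these loops is just a Kafka consumer paired with a producer. Here's a minimal sketch of the shape every agent shares, using kafkajs with placeholder topic and group names (the real agents, shown below, add their own transformation logic):

// Sketch only: placeholder names; each real agent has its own group, work topic, and output topic.
import { Kafka, EachMessagePayload } from 'kafkajs';

const kafka = new Kafka({ clientId: 'example-agent', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'example-agent-v1' });
const producer = kafka.producer();

async function run(): Promise<void> {
  await consumer.connect();
  await producer.connect();
  await consumer.subscribe({ topic: 'work.example', fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ message }: EachMessagePayload) => {
      if (!message.value) return;
      const work = JSON.parse(message.value.toString());

      // 1. Do the agent's one job (fetch, call an LLM, compute derived values, ...)
      const result = { subject_id: work.subject_id, processed_at: new Date().toISOString() };

      // 2. Emit a new event for the rest of the system to react to
      await producer.send({
        topic: 'events.example',
        messages: [{ key: work.subject_id, value: JSON.stringify(result) }],
      });
    },
  });
}

run().catch((err) => {
  console.error('[example-agent] fatal:', err);
  process.exit(1);
});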

The beauty of this architecture is that agents are decoupled. The fetcher doesn’t know about the enricher. The enricher doesn’t know about the materializer. A central router dispatches work to each agent’s queue, handling idempotency checks and retry logic. This means:

  • Agents trust their input (if they receive work, it needs to be done)
  • Retry logic is centralized, not duplicated
  • You can monitor backlog per agent independently
  • You can scale agents independently based on their workload

The Fetcher Agent

The first agent in our pipeline is the Fetcher. When a work message arrives on work.fetch_link, the fetcher’s job is simple: fetch the URL and extract readable content.

Here’s the core loop:

// From: stream-agents/scripts/agent_fetcher.ts

const CONSUMER_GROUP = 'fetcher-agent-v2';
const TOPIC = 'work.fetch_link';

async function handleMessage({ message }: EachMessagePayload): Promise<void> {
  if (!message.value) return;

  const workMessage: WorkMessage = JSON.parse(message.value.toString());
  const { url } = workMessage.payload as { url?: string };

  if (!url) {
    console.log(`[fetcher] WARNING: Missing url, skipping: ${workMessage.subject_id}`);
    return;
  }

  console.log(`[fetcher] Fetching: ${url} (attempt ${workMessage.attempt}/${workMessage.max_attempts})`);

  try {
    // Rate limit per domain
    const domain = getDomain(url);
    await rateLimitForDomain(domain);

    // Fetch and extract content
    const result = await fetchAndExtract(url);

    // Check if fetch resulted in an error that should trigger retry
    if (result.error && !result.textContent) {
      await emitWorkFailed(workMessage, result.error);
      return;
    }

    // Emit content.fetched event
    await emitContentFetched(workMessage.subject_id, result, workMessage.correlation_id);
  } catch (err) {
    // Emit work.failed for router to handle retry
    const message = err instanceof Error ? err.message : String(err);
    await emitWorkFailed(workMessage, message);
  }
}

Notice what’s not in this code:

  • No event type filtering (we only receive work meant for us)
  • No idempotency check (the router handles this)
  • Retry logic is delegated to the router via work.failed events

The fetcher uses Mozilla Readability, the same library that powers Firefox’s Reader View, to extract the main content from web pages. This strips away navigation, ads, and other noise, leaving just the article text.

// From: stream-agents/scripts/agent_fetcher.ts

async function fetchAndExtract(url: string): Promise<FetchResult> {
  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS);

  try {
    const response = await fetch(url, {
      signal: controller.signal,
      headers: {
        'User-Agent': 'Mozilla/5.0 (compatible; LifestreamBot/1.0)',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
      },
      redirect: 'follow',
    });

    clearTimeout(timeoutId);

    if (!response.ok) {
      return {
        finalUrl: response.url,
        title: null,
        textContent: null,
        error: `HTTP ${response.status}: ${response.statusText}`,
      };
    }

    const html = await response.text();
    const { document } = parseHTML(html);

    // Try Readability extraction
    const reader = new Readability(document);
    const article = reader.parse();

    if (article) {
      return {
        finalUrl: response.url,
        title: article.title,
        textContent: article.textContent,
        error: null,
      };
    }

    // Fallback: extract title from document
    const title = document.querySelector('title')?.textContent ?? null;
    return {
      finalUrl: response.url,
      title,
      textContent: null,
      error: 'Readability could not parse article',
    };
  } catch (err) {
    clearTimeout(timeoutId);
    // ... error handling
  }
}

Notice a few important details:

  • Rate limiting per domain: We track the last fetch time for each domain and wait at least 1 second between requests to the same host (see the sketch after this list). This keeps us from hammering servers.
  • Timeout handling: Network requests can hang. We use an AbortController with a 30-second timeout.
  • Error tracking: When extraction fails, we still emit an event, but with the error recorded. This lets downstream systems know what happened without blocking the pipeline.
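
The rate limiter itself is tiny. Here's a rough sketch of what getDomain and rateLimitForDomain could look like, assuming a simple in-memory map and a 1-second minimum gap per host (the actual helpers in agent_fetcher.ts may differ):

// Sketch only: an in-memory, per-domain rate limiter.
const MIN_DELAY_MS = 1000;
const lastFetchByDomain = new Map<string, number>();

function getDomain(url: string): string {
  return new URL(url).hostname;
}

async function rateLimitForDomain(domain: string): Promise<void> {
  const last = lastFetchByDomain.get(domain) ?? 0;
  const waitMs = last + MIN_DELAY_MS - Date.now();
  if (waitMs > 0) {
    await new Promise((resolve) => setTimeout(resolve, waitMs));
  }
  lastFetchByDomain.set(domain, Date.now());
}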

The Enricher Agent

Once we have content, the Enricher agent takes over. This is where AI enters the picture.

The enricher listens for content.fetched events and calls an LLM to generate:

  • Tags for categorization
  • Short summary (1-2 sentences)
  • Long summary (2-3 paragraphs)
  • Language detection

Here’s the enrichment function:

// From: stream-agents/scripts/agent_enricher.ts

async function enrichContent(
  title: string | null,
  textContent: string,
  existingTags: Set<string>
): Promise<EnrichmentResult> {
  // Truncate content for cost control
  const truncated = textContent.slice(0, MAX_CONTENT_CHARS);

  // Build tag list for prompt (limit size)
  const tagList = Array.from(existingTags).slice(0, MAX_TAGS_IN_PROMPT).join(', ');

  const prompt = `Analyze this article and provide tags and summaries.

EXISTING TAGS (prefer these when appropriate, but create new ones if needed):
${tagList || '(none yet)'}

Rules for tags:
- Use 3-7 tags per article
- Prefer existing tags when they fit
- New tags should be lowercase, hyphenated (e.g., "machine-learning", "web-development")
- Be specific but not too narrow

${title ? `Title: ${title}\n\n` : ''}Content:
${truncated}

Respond ONLY with valid JSON in this exact format:
{
  "tags": ["tag1", "tag2"],
  "summary_short": "1-2 sentences, max 200 chars",
  "summary_long": "2-3 paragraphs",
  "language": "en"
}`;

  const response = await openai.chat.completions.create({
    model: MODEL,
    messages: [{ role: 'user', content: prompt }],
    max_tokens: 1024,
    temperature: 0.3,
    response_format: { type: 'json_object' },
  });

  const content = response.choices[0]?.message?.content;
  if (!content) {
    throw new Error('Empty response from OpenAI');
  }

  return JSON.parse(content) as EnrichmentResult;
}

The enricher uses gpt-4o-mini with a low temperature (0.3) to keep outputs consistent, though not strictly deterministic. We also use JSON mode (response_format: { type: 'json_object' }) so we get structured data back.

Any LLM with structured output works here: Claude, local models via Ollama, or other providers. The key is consistent JSON responses.
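
Even in JSON mode, it's worth validating the parsed object before trusting it, since a model can omit fields or return the wrong types. A small sketch of such a guard (field names follow the prompt above; the real enricher may validate differently):

// Sketch: guard the LLM response before using it. Throwing here lets the
// failure flow through work.failed and the router's retry logic.
function parseEnrichmentResult(raw: string): EnrichmentResult {
  const parsed = JSON.parse(raw) as Partial<EnrichmentResult>;

  if (!Array.isArray(parsed.tags) || parsed.tags.length === 0) {
    throw new Error('Enrichment response missing tags');
  }
  if (typeof parsed.summary_short !== 'string' || typeof parsed.summary_long !== 'string') {
    throw new Error('Enrichment response missing summaries');
  }

  return {
    tags: parsed.tags.map((t) => String(t).toLowerCase().trim()),
    summary_short: parsed.summary_short,
    summary_long: parsed.summary_long,
    language: typeof parsed.language === 'string' ? parsed.language : 'en',
  };
}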

The Materializer

Agents emit events, but something needs to turn those events into queryable state. That’s the job of the Materializer.

The materializer is a special consumer that listens to all events and projects them into database tables. It’s the bridge between the event stream and the application layer.

// From: stream-agents/scripts/consume_kafka_materialize.ts

async function processEvent(event: LifestreamEvent): Promise<void> {
  switch (event.event_type) {
    case 'link.added':
      await handleLinkAdded(event);
      break;
    case 'content.fetched':
      await handleContentFetched(event);
      break;
    case 'enrichment.completed':
      await handleEnrichmentCompleted(event);
      break;
    case 'publish.completed':
      await handlePublishCompleted(event);
      break;
    case 'temp.reading_recorded':
      await handleTempReading(event);
      break;
    case 'todo.created':
      await handleTodoCreated(event);
      break;
    case 'todo.completed':
      await handleTodoCompleted(event);
      break;
    case 'annotation.added':
      await handleAnnotationAdded(event);
      break;
    default:
      console.log(`  [unknown] Skipping unhandled event type: ${event.event_type}`);
  }
}

Each handler knows how to project its event type into the appropriate tables. For example, when an enrichment.completed event arrives:

// From: stream-agents/scripts/consume_kafka_materialize.ts

async function handleEnrichmentCompleted(event: LifestreamEvent): Promise<void> {
  const { tags, summary_short, summary_long, summary, language, model_version } = event.payload as {
    tags?: string[];
    summary_short?: string;
    summary_long?: string;
    summary?: string;
    language?: string;
    model_version?: string;
  };

  const shortSummary = summary_short ?? summary ?? null;

  // Upsert link_metadata
  await sql`
    INSERT INTO lifestream.link_metadata (subject_id, tags, summary_short, summary_long, language, model_version)
    VALUES (${event.subject_id}, ${tags ?? []}, ${shortSummary}, ${summary_long ?? null}, ${language ?? null}, ${model_version ?? null})
    ON CONFLICT (subject_id) DO UPDATE SET
      tags = CASE
        WHEN array_length(EXCLUDED.tags, 1) > 0 THEN EXCLUDED.tags
        ELSE COALESCE(NULLIF(lifestream.link_metadata.tags, '{}'), EXCLUDED.tags)
      END,
      summary_short = COALESCE(EXCLUDED.summary_short, lifestream.link_metadata.summary_short),
      summary_long = COALESCE(EXCLUDED.summary_long, lifestream.link_metadata.summary_long),
      language = COALESCE(EXCLUDED.language, lifestream.link_metadata.language),
      model_version = COALESCE(EXCLUDED.model_version, lifestream.link_metadata.model_version)
  `;

  // Update link status
  await sql`
    UPDATE lifestream.links
    SET status = 'enriched'
    WHERE subject_id = ${event.subject_id} AND status IN ('new', 'fetched')
  `;
}

The materializer uses ON CONFLICT ... DO UPDATE (upserts) extensively. This means events can be replayed safely: the same event processed twice will produce the same result.

Idempotency and the Router

A critical property of any event-driven system is idempotency: processing the same event multiple times should produce the same result.

Why does this matter? In distributed systems, things fail. Consumers crash. Networks hiccup. Kafka might deliver the same message twice. Without idempotency, you’d end up with duplicate processing, wasted resources, and potentially inconsistent state.

In our architecture, idempotency is handled by the router, not by individual agents. This centralizes the logic and simplifies agent code.

Here’s how the router implements idempotency checks:

// From: stream-agents/scripts/router.ts

async function contentAlreadyFetched(subjectId: string): Promise<boolean> {
  const result = await sql`
    SELECT 1 FROM lifestream.link_content
    WHERE subject_id = ${subjectId}
  `;
  return result.length > 0;
}

async function alreadyEnriched(subjectId: string): Promise<boolean> {
  const result = await sql`
    SELECT 1 FROM lifestream.link_metadata
    WHERE subject_id = ${subjectId}
      AND array_length(tags, 1) > 0
  `;
  return result.length > 0;
}

async function processEvent(event: LifestreamEvent): Promise<void> {
  switch (event.event_type) {
    case 'link.added':
      if (await contentAlreadyFetched(event.subject_id)) return;
      await emitWorkMessage('fetch_link', event);
      break;
    case 'content.fetched':
      if (await alreadyEnriched(event.subject_id)) return;
      await emitWorkMessage('enrich_link', event);
      break;
    // ... etc
  }
}

The router also handles retry logic. When an agent fails, it emits a work.failed event. The router catches this and either retries (up to 3 attempts) or sends to the dead letter queue:

// From: stream-agents/scripts/router.ts

async function handleWorkFailed(event: LifestreamEvent): Promise<void> {
  const { work_message, error, agent } = event.payload as {
    work_message: WorkMessage;
    error: string;
    agent: string;
  };

  if (work_message.attempt < work_message.max_attempts) {
    // Retry with incremented attempt count
    await emitWorkMessage(work_message.work_type, {
      ...work_message,
      attempt: work_message.attempt + 1,
      last_error: error,
    });
  } else {
    // Send to dead letter queue
    await emitToDeadLetter(work_message, error, agent);
  }
}
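
The dead letter path is just another producer send. Here's a sketch of what emitToDeadLetter might look like; the work.dead_letter topic name is an assumption, not something confirmed by the repo:

// Sketch only: the actual topic name and payload shape may differ.
async function emitToDeadLetter(
  workMessage: WorkMessage,
  error: string,
  agent: string
): Promise<void> {
  const producer = await getProducer();
  await producer.send({
    topic: 'work.dead_letter', // assumed topic name
    messages: [{
      key: workMessage.subject_id,
      value: JSON.stringify({
        ...workMessage,
        failed_agent: agent,
        last_error: error,
        failed_at: new Date().toISOString(),
      }),
    }],
  });
  console.log(`[router] Dead-lettered ${workMessage.subject_id} after ${workMessage.attempt} attempts`);
}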

The materializer still implements idempotency at the Kafka level, tracking which offsets have been processed:

// From: stream-agents/scripts/consume_kafka_materialize.ts

async function isAlreadyProcessed(topic: string, partition: number, offset: bigint): Promise<boolean> {
  const result = await sql`
    SELECT 1 FROM lifestream.event_ingest_dedupe
    WHERE topic = ${topic}
      AND partition = ${partition}
      AND kafka_offset = ${offset}
  `;
  return result.length > 0;
}
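
The write side of that check isn't shown above, but conceptually it's a single insert into the same table once an event has been projected. A sketch, assuming the table has exactly the columns the SELECT uses:

// Sketch: record the offset after the event's handler succeeds.
// ON CONFLICT DO NOTHING keeps replays harmless.
async function markProcessed(topic: string, partition: number, offset: bigint): Promise<void> {
  await sql`
    INSERT INTO lifestream.event_ingest_dedupe (topic, partition, kafka_offset)
    VALUES (${topic}, ${partition}, ${offset})
    ON CONFLICT DO NOTHING
  `;
}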

This layered approach (router for business logic, materializer for infrastructure) makes the system robust against restarts, replays, and failures while keeping agent code simple.

The KTable Pattern

Here’s a problem: when the enricher generates tags, we want consistency. If one article is tagged machine-learning and another is tagged ml, that’s confusing. We want the LLM to prefer existing tags when they fit.

But how does the enricher know what tags already exist? It could query the database, but that adds latency and couples the agent to the materializer’s schema.

The solution is a KTable, a Kafka-native pattern for maintaining stateful lookups.

The enricher maintains a local set of known tags:

// From: stream-agents/scripts/agent_enricher.ts

const knownTags = new Set<string>();

function handleTagCatalogMessage(message: EachMessagePayload['message']): void {
  if (!message.value) return;

  try {
    const tags: string[] = JSON.parse(message.value.toString());
    knownTags.clear();
    tags.forEach(t => knownTags.add(t));
    console.log(`[enricher] Tag catalog updated: ${knownTags.size} tags`);
  } catch (err) {
    console.error(`[enricher] Failed to parse tag catalog:`, err);
  }
}

When the enricher discovers new tags, it updates the catalog:

// From: stream-agents/scripts/agent_enricher.ts

async function updateTagCatalog(newTags: string[]): Promise<void> {
  const actuallyNew = newTags.filter(t => !knownTags.has(t));

  if (actuallyNew.length === 0) return;

  // Add to local state
  actuallyNew.forEach(t => knownTags.add(t));

  // Emit updated catalog to Kafka
  try {
    const producer = await getProducer();
    await producer.send({
      topic: TAGS_TOPIC,
      messages: [{
        key: 'all',
        value: JSON.stringify(Array.from(knownTags))
      }]
    });

    console.log(`[enricher] Added ${actuallyNew.length} new tags: ${actuallyNew.join(', ')}`);
  } catch (err) {
    console.error(`[enricher] Failed to update tag catalog:`, err);
    // Continue anyway - local state is still updated
  }
}

The tags.catalog topic is configured with log compaction, which means Kafka will keep only the latest message for each key. Since we use a single key (all), the topic effectively stores just the current state of all tags.
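
If you're setting this up yourself, a compacted topic can be created with kafkajs's admin client. The broker address and partition count below are assumptions; the essential part is cleanup.policy=compact:

// Sketch: one-off setup for the compacted tag catalog topic.
import { Kafka } from 'kafkajs';

async function ensureTagsCatalogTopic(): Promise<void> {
  const kafka = new Kafka({ clientId: 'setup', brokers: ['localhost:9092'] });
  const admin = kafka.admin();
  await admin.connect();

  await admin.createTopics({
    topics: [{
      topic: 'tags.catalog',
      numPartitions: 1, // one key, tiny volume: a single partition is plenty
      configEntries: [
        { name: 'cleanup.policy', value: 'compact' }, // keep only the latest value per key
      ],
    }],
  });

  await admin.disconnect();
}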

When a new enricher instance starts up, it subscribes to both its work topic and the tags catalog:

// From: stream-agents/scripts/agent_enricher.ts

const WORK_TOPIC = 'work.enrich_link';
const TAGS_TOPIC = 'tags.catalog';

await consumer.subscribe({ topics: [WORK_TOPIC, TAGS_TOPIC], fromBeginning: true });

By reading fromBeginning, it rebuilds its local tag catalog before processing any work. This ensures tag consistency even after restarts.
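
Because one consumer now receives messages from two topics, the message handler has to branch on the topic before doing anything else. A sketch of that dispatch (handleWorkMessage is a stand-in for the enricher's actual work handler):

// Sketch: route by topic inside a single consumer.
await consumer.run({
  eachMessage: async (payload: EachMessagePayload) => {
    if (payload.topic === TAGS_TOPIC) {
      // Catalog messages only refresh the in-memory tag set
      handleTagCatalogMessage(payload.message);
      return;
    }
    // Everything else is enrichment work from work.enrich_link
    await handleWorkMessage(payload);
  },
});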

The Enrichment Pipeline

Let’s trace through a complete example. You save a link to an article about React Server Components:

  1. Input capture emits link.added to Kafka
  2. Router sees link.added, checks idempotency, emits to work.fetch_link
  3. Fetcher processes work, fetches the URL, extracts content, emits content.fetched
  4. Router sees content.fetched, emits to work.enrich_link
  5. Enricher processes work, calls the LLM with known tags, emits enrichment.completed
  6. Router sees enrichment.completed, emits to work.publish_link
  7. Publisher emits publish.completed
  8. Materializer sees all events, projects them into links, link_content, link_metadata, and publish_state tables

The link is now fully enriched, searchable by tags, and ready to surface when relevant.

                              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
link.added ──► Router ──────► β”‚work.fetch   │──► Fetcher ──► content.fetched
                              β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                       β”‚
                                                                    β–Ό
                              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                    Router
                              β”‚work.enrich  β”‚β—„β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                              β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                    β”‚
                                    β–Ό
                               Enricher ──► enrichment.completed ──► Router
                                                                       β”‚
                              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                          β”‚
                              β”‚work.publish β”‚β—„β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                              β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                    β”‚
                                    β–Ό
                               Publisher ──► publish.completed
                                                    β”‚
                                                    β–Ό
                                            β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                                            β”‚  Materializer β”‚
                                            β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                                    β”‚
                                                    β–Ό
                                            β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                                            β”‚   Database    β”‚
                                            β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
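
Once the materializer has done its job, pulling enriched links back out is plain SQL. For example, something like this (column names follow the upsert shown earlier) would list everything tagged react:

// Sketch: query enriched links by tag.
const reactLinks = await sql`
  SELECT l.subject_id, m.summary_short, m.tags
  FROM lifestream.links l
  JOIN lifestream.link_metadata m USING (subject_id)
  WHERE 'react' = ANY(m.tags)
`;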

What’s Next

We’ve built the core enrichment pipeline: fetching content, calling LLMs for analysis, and projecting events into queryable state. In Part 3, we’ll dive deeper into Kafka patterns for personal streams: topics, partitions, consumer groups, and the tradeoffs involved.

But first, try running the agents yourself. Save a few links and watch them flow through the pipeline. There’s something satisfying about seeing an article transform from a bare URL into a tagged, summarized knowledge artifact.


Ready to continue? Head to Part 3: Kafka Patterns (coming soon) to explore the infrastructure that makes this all work.