Kafka Patterns

In Part 0, we introduced the concept of a life stream: a unified event log of your digital interactions. But how do you actually transport those events reliably? How do you ensure that a saved bookmark at 2 AM doesn’t disappear into the void before your enrichment agents wake up?

Enter Kafka.

Why Kafka?

You might wonder: why not just write events directly to Postgres and call it a day? For a simple system, that works fine. But as your life stream grows, you’ll run into three problems:

  1. Durability under failure - What happens when your enrichment agent crashes mid-processing? With direct database writes, you’re left guessing which events were handled.

  2. Replay capability - When you deploy a new agent or fix a bug, you want to reprocess historical events. Databases aren’t designed for efficient sequential replay.

  3. Decoupled consumers - Multiple agents need the same events. A fetcher agent grabs content, an enricher agent adds tags, a summarizer generates summaries. They shouldn’t step on each other.

Kafka solves all three. Events are durably stored, can be replayed from any point, and multiple consumers can process the same stream independently.

The Right Tool for the Job

Kafka isn’t always the answer. For a personal life stream with dozens of events per day, it might feel like overkill. But consider what you’re building: a system designed to grow with you for years. The patterns you establish now (durable transport, checkpointed processing, consumer groups) will serve you well as your stream expands.

Plus, running a single-node Kafka cluster is surprisingly lightweight. We’ll cover that setup later.

Topics and Partitions

A Kafka topic is a named stream of events. For our life stream, we use several topics organized into two categories: event topics (facts about what happened) and work topics (commands for what to do).

// From: stream-agents/scripts/init_kafka.ts
const TOPICS: TopicConfig[] = [
  // Event topics (facts)
  {
    name: 'events.raw',
    numPartitions: 3,
    configEntries: [
      { name: 'retention.ms', value: String(7 * 24 * 60 * 60 * 1000) } // 7 days
    ]
  },
  {
    name: 'tags.catalog',
    numPartitions: 1, // Single partition for KTable pattern
    configEntries: [
      { name: 'cleanup.policy', value: 'compact' },
      { name: 'min.compaction.lag.ms', value: '0' },
      { name: 'segment.ms', value: String(60 * 60 * 1000) } // 1 hour segments
    ]
  },
  // Work topics (tasks/commands); per-topic retention config elided
  { name: 'work.fetch_link', numPartitions: 3 },
  { name: 'work.enrich_link', numPartitions: 3 },
  { name: 'work.publish_link', numPartitions: 3 },
  {
    name: 'work.dead_letter',
    numPartitions: 1,
    configEntries: [
      { name: 'retention.ms', value: String(30 * 24 * 60 * 60 * 1000) } // 30 days
    ]
  },
];

events.raw

This is the main event stream. Every link.added, note.created, and content.fetched event flows through here. Three partitions give us room for parallel processing (though for a personal stream, even one partition would suffice).

The 7-day retention means events stick around long enough to replay if something goes wrong, but don’t accumulate forever.

tags.catalog

This topic uses log compaction, a different pattern. Instead of deleting old messages after a time window, Kafka keeps only the latest message for each key. We use this to maintain a catalog of all known tags, enabling the enricher agent to suggest consistent tagging across articles.
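Because compaction keys on the message key, publishing a tag is just a keyed produce, and deleting one is a null-valued tombstone. A minimal sketch (the helper names and TagRecord shape are illustrative, not from the repo):

```typescript
// Illustrative helpers for the compacted tags.catalog topic.
// Compaction keeps only the newest value per key, so re-publishing the
// same tag name replaces its metadata; a null value is a tombstone.
interface TagRecord {
  name: string;
  usage_count: number;
}

function tagMessage(tag: TagRecord): { key: string; value: string } {
  return { key: tag.name, value: JSON.stringify(tag) };
}

function tagTombstone(name: string): { key: string; value: null } {
  return { key: name, value: null };
}

// With kafkajs these would be sent as:
//   await producer.send({ topic: 'tags.catalog', messages: [tagMessage(tag)] });
```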

work.* topics

These are work queues for agents. Instead of agents filtering through all events, the router dispatches work to specific topics:

Topic               Consumer          Purpose
work.fetch_link     Fetcher Agent     URLs that need content extraction
work.enrich_link    Enricher Agent    Content that needs LLM processing
work.publish_link   Publisher Agent   Links ready for publication
work.dead_letter    (Manual)          Failed work after max retries
This separation provides cleaner architecture, per-queue monitoring, and simplified agent code.

Creating Topics

Topic creation is idempotent. Run it as many times as you want:

// From: stream-agents/scripts/init_kafka.ts
const kafka = getKafka();
const admin = kafka.admin();

await admin.connect();

const existingTopics = await admin.listTopics();

for (const topicConfig of TOPICS) {
  if (existingTopics.includes(topicConfig.name)) {
    console.log(`Topic '${topicConfig.name}' already exists.`);
  } else {
    await admin.createTopics({
      topics: [{
        topic: topicConfig.name,
        numPartitions: topicConfig.numPartitions,
        replicationFactor: 1, // Single node cluster
        configEntries: topicConfig.configEntries
      }]
    });
    console.log(`Topic '${topicConfig.name}' created.`);
  }
}

await admin.disconnect();

The Publisher

Events originate in Postgres (from your iOS Shortcut, browser extension, or API calls), but they need to flow into Kafka for processing. The publisher is a simple forwarder that bridges these two worlds.

The key challenge: at-least-once semantics. If the publisher crashes after sending to Kafka but before recording its progress, it might resend the same events on restart. We solve this by tracking publication status directly on each event.

Why Not Timestamp-Based Checkpoints?

An earlier version of this system used a checkpoint table storing the last published timestamp and event ID. This approach had a subtle but serious flaw: if the publisher stopped running and restarted later, events created during the downtime could be skipped if newer events were processed first.

The boolean column approach is more robust:

  • No events skipped: Every event starts with published_to_kafka = false
  • Restart-safe: The publisher picks up where it left off, regardless of timing
  • Simple queries: Just WHERE published_to_kafka = false

The tradeoff is write amplification (an UPDATE after each INSERT), but for a personal system this is negligible.

Schema

-- From: stream-agents/schema.sql
CREATE TABLE IF NOT EXISTS events (
  id              uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  occurred_at     timestamptz NOT NULL,
  received_at     timestamptz NOT NULL DEFAULT now(),
  source          text NOT NULL,
  subject         text NOT NULL,
  subject_id      text NOT NULL,
  event_type      text NOT NULL,
  payload         jsonb NOT NULL DEFAULT '{}'::jsonb,
  -- ... other columns ...

  -- Kafka publication tracking
  published_to_kafka boolean NOT NULL DEFAULT false
);

-- Partial index for efficient queries
CREATE INDEX IF NOT EXISTS events_unpublished_idx
  ON events(received_at) WHERE published_to_kafka = false;

The Publish Loop

The publisher runs continuously, polling for unpublished events:

// From: stream-agents/scripts/publish_events_to_kafka.ts
while (true) {
  // Query unpublished events
  const events = await sql`
    SELECT *
    FROM lifestream.events
    WHERE published_to_kafka = false
    ORDER BY received_at ASC
    LIMIT 50
  `;

  if (events.length > 0) {
    const messages = events.map(e => ({
      key: e.subject_id,
      value: JSON.stringify(e),
      headers: {
        event_type: e.event_type,
        source: e.source
      }
    }));

    await producer.send({
      topic: 'events.raw',
      messages
    });

    // Mark events as published
    const eventIds = events.map(e => e.id);
    await sql`
      UPDATE lifestream.events
      SET published_to_kafka = true
      WHERE id = ANY(${eventIds}::uuid[])
    `;

    console.log(`Published ${events.length} events to Kafka.`);
  }

  await new Promise(r => setTimeout(r, 5000));
}

Notice the order: send to Kafka first, then mark as published. If we crash between these two operations, we’ll resend some events on restart, but that’s fine because our consumers are idempotent.
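Consumer-side idempotency can be as simple as deduplicating on the event's unique id before doing any side effects. A hedged sketch, with an in-memory Set standing in for what would more realistically be an `INSERT ... ON CONFLICT (id) DO NOTHING` against Postgres:

```typescript
// Illustrative consumer-side idempotency: dedupe redelivered events by id
// before performing side effects. In a real deployment the "seen" check
// would live in Postgres so it survives restarts; a Set stands in here.
const processed = new Set<string>();

function processOnce(eventId: string, handler: () => void): boolean {
  if (processed.has(eventId)) return false; // duplicate delivery: skip
  processed.add(eventId);
  handler();
  return true;
}
```

With this in place, the publisher resending a batch after a crash costs nothing: the duplicate deliveries are recognized and dropped.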

The Router Pattern

(Diagram: events.raw flows into the Router, which dispatches to work.fetch_link, work.enrich_link, and work.publish_link; the Fetcher, Enricher, and Publisher agents consume those queues, while a Materializer syncs results back to Postgres.)

Before diving into consumer groups, let’s talk about a key architectural pattern: the router.

In the early versions of this system, each agent consumed directly from events.raw and filtered for the events it cared about. This worked, but had drawbacks:

  • Every agent received every event and had to filter locally
  • Idempotency checks were duplicated across agents
  • No centralized place for retry logic
  • Hard to monitor backlog per work type

The solution is a router that separates “facts” (events) from “tasks” (work commands):

                         events.raw

                        [ROUTER]
              (idempotency + retry logic)

        ┌────────────────────┼────────────────────┐
        ↓                    ↓                    ↓
   work.fetch_link     work.enrich_link    work.publish_link
        │                    │                    │
        ↓                    ↓                    ↓
   [Fetcher]            [Enricher]          [Publisher]

Router Implementation

The router consumes from events.raw and, for each event, decides whether to create work:

// From: stream-agents/scripts/router.ts
async function processEvent(event: LifestreamEvent): Promise<void> {
  switch (event.event_type) {
    case 'link.added':
      // Check if already fetched (idempotency)
      if (await contentAlreadyFetched(event.subject_id)) return;
      await emitWorkMessage('fetch_link', event);
      break;
    case 'content.fetched':
      // Skip if fetch had error or no content
      if (event.payload.fetch_error || !event.payload.text_content) return;
      if (await alreadyEnriched(event.subject_id)) return;
      await emitWorkMessage('enrich_link', event);
      break;
    case 'enrichment.completed':
      if (await alreadyPublished(event.subject_id)) return;
      await emitWorkMessage('publish_link', event);
      break;
    case 'work.failed':
      await handleWorkFailed(event);
      break;
  }
}

All idempotency checks happen here, in one place. Agents can trust that if they receive a work message, the work needs to be done.

Work Messages

Work messages are simpler than events—they’re commands, not facts:

// From: stream-agents/src/lib/work_message.ts
interface WorkMessage {
  subject_id: string;
  work_type: 'fetch_link' | 'enrich_link' | 'publish_link';
  correlation_id: string;
  triggered_by_event_id: string;
  attempt: number;        // starts at 1, incremented on retry
  max_attempts: number;   // default 3
  created_at: string;
  last_error?: string;    // populated on retry
  payload: Record<string, unknown>;
}

The attempt and max_attempts fields enable retry logic, which we’ll cover shortly.
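A small factory keeps the first-attempt defaults in one place. This is a hypothetical helper, not from the repo; the interface is re-declared so the sketch is self-contained:

```typescript
import { randomUUID } from 'node:crypto';

// Shape mirrors src/lib/work_message.ts above.
type WorkType = 'fetch_link' | 'enrich_link' | 'publish_link';

interface WorkMessage {
  subject_id: string;
  work_type: WorkType;
  correlation_id: string;
  triggered_by_event_id: string;
  attempt: number;
  max_attempts: number;
  created_at: string;
  last_error?: string;
  payload: Record<string, unknown>;
}

// Hypothetical factory: first-attempt defaults live in one place.
function createWorkMessage(
  workType: WorkType,
  subjectId: string,
  triggeredByEventId: string,
  payload: Record<string, unknown>,
): WorkMessage {
  return {
    subject_id: subjectId,
    work_type: workType,
    correlation_id: randomUUID(),
    triggered_by_event_id: triggeredByEventId,
    attempt: 1,        // first try
    max_attempts: 3,   // the default noted above
    created_at: new Date().toISOString(),
    payload,
  };
}
```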

Retry and Dead Letter Queue

When an agent fails to process work, it emits a work.failed event. The router catches this and decides whether to retry:

// From: stream-agents/scripts/router.ts
async function handleWorkFailed(event: LifestreamEvent): Promise<void> {
  const { work_message, error, agent } = event.payload;

  if (work_message.attempt < work_message.max_attempts) {
    // Retry: re-emit with incremented attempt
    const retryWork = {
      ...work_message,
      attempt: work_message.attempt + 1,
      last_error: error,
    };
    await emitWorkMessage(work_message.work_type, retryWork);
  } else {
    // Max retries exceeded: send to dead letter queue
    await emitToDeadLetter(work_message, error, agent);
  }
}

The dead letter queue (work.dead_letter) retains failed messages for 30 days. You can inspect them, fix the underlying issue, and replay manually.
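Manual replay boils down to stripping the failure metadata and resetting the attempt counter so the message reads as fresh work on its original topic. A hedged sketch of that transform (the DeadLetterEntry shape is assumed; wiring to a kafkajs producer is omitted):

```typescript
// Hypothetical replay transform for dead-lettered work: drop last_error
// and reset attempt so agents treat it as a fresh first try.
interface DeadLetterEntry {
  work_message: {
    work_type: string;
    attempt: number;
    max_attempts: number;
    last_error?: string;
    [key: string]: unknown;
  };
  error: string;
  agent: string;
}

function prepareReplay(entry: DeadLetterEntry): {
  topic: string;
  message: Record<string, unknown>;
} {
  const { last_error, ...rest } = entry.work_message;
  return {
    topic: `work.${entry.work_message.work_type}`, // back to the original queue
    message: { ...rest, attempt: 1 },
  };
}
```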

Consumer Groups

Kafka’s consumer groups are where the magic happens. Multiple consumers in the same group coordinate to process a topic in parallel, with each partition assigned to exactly one consumer.
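To see why partition count caps useful parallelism, here is a toy round-robin assignment. This is illustrative only; real assignment is negotiated by Kafka's group coordinator, not client code:

```typescript
// Toy round-robin assignment: each partition goes to exactly one consumer,
// so partition count is the ceiling on parallelism within a group.
function assignPartitions(
  partitions: number[],
  consumers: string[],
): Map<string, number[]> {
  const out = new Map<string, number[]>(
    consumers.map((c): [string, number[]] => [c, []]),
  );
  partitions.forEach((p, i) => {
    out.get(consumers[i % consumers.length])!.push(p);
  });
  return out;
}
```

With events.raw's three partitions, two fetcher instances would split the work 2/1, and a fourth instance in the same group would sit idle.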

Creating a Consumer

// From: stream-agents/src/lib/kafka.ts
export const createConsumer = async (groupId: string): Promise<Consumer> => {
  const consumer = kafka.consumer({ groupId });
  await consumer.connect();
  return consumer;
};

The Fetcher Agent Pattern

With the router in place, agents become much simpler. They consume from their dedicated work topic and trust that every message needs processing:

// From: stream-agents/scripts/agent_fetcher.ts
const CONSUMER_GROUP = 'fetcher-agent-v2';
const TOPIC = 'work.fetch_link';  // Dedicated work topic

async function main(): Promise<void> {
  console.log('Starting Fetcher Agent...');

  const consumer = await createConsumer(CONSUMER_GROUP);
  await consumer.subscribe({ topic: TOPIC, fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ message }) => {
      if (!message.value) return;

      const workMessage = JSON.parse(message.value.toString());
      const { url } = workMessage.payload;

      console.log(`Fetching: ${url} (attempt ${workMessage.attempt}/${workMessage.max_attempts})`);

      try {
        const result = await fetchAndExtract(url);
        await emitContentFetched(workMessage.subject_id, result, workMessage.correlation_id);
      } catch (err) {
        // Emit work.failed for router to handle retry
        // (err is `unknown` in TypeScript, so narrow before reading .message)
        const reason = err instanceof Error ? err.message : String(err);
        await emitWorkFailed(workMessage, reason);
      }
    },
  });
}

Notice what’s missing compared to the old pattern:

  • No event type filtering (every message is relevant)
  • No idempotency check (router handles this)
  • Retry is handled by emitting work.failed, not by the agent

Key Consumer Concepts

Group ID versioning: Notice fetcher-agent-v2. When you change processing logic significantly, bump the version. This creates a new consumer group that starts from the beginning, reprocessing all work with the new logic.

fromBeginning: true: For a new consumer group, start from the earliest available offset. This ensures you don’t miss work that arrived before the consumer started.

Trust the router: Agents no longer do idempotency checks. If they receive a work message, the work needs to be done.

Multi-Topic Consumers

Some agents still need multiple topics. The enricher agent consumes both its work topic and tags.catalog:

// From: stream-agents/scripts/agent_enricher.ts
const CONSUMER_GROUP = 'enricher-agent-v2';
const WORK_TOPIC = 'work.enrich_link';
const TAGS_TOPIC = 'tags.catalog';

// Subscribe to both topics
await consumer.subscribe({
  topics: [WORK_TOPIC, TAGS_TOPIC],
  fromBeginning: true
});

await consumer.run({
  eachMessage: async ({ topic, message }) => {
    if (topic === TAGS_TOPIC) {
      // Update local tag catalog
      handleTagCatalogMessage(message);
      return;
    }

    // Handle work messages (guard against a null value before parsing)
    if (!message.value) return;
    const workMessage = JSON.parse(message.value.toString());
    await processEnrichment(workMessage);
  },
});

This pattern builds a local cache of the tag catalog, keeping the enricher agent’s view of known tags up-to-date.
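A sketch of what that local view might look like, following the KTable pattern: latest value per key wins, and a null value (a compaction tombstone) removes the entry. The signature is simplified to (key, value) for clarity; the real handler takes a Kafka message:

```typescript
// Local materialized view of tags.catalog: last write wins per key,
// mirroring how log compaction itself treats the topic.
const tagCatalog = new Map<string, { name: string; usage_count: number }>();

function handleTagCatalogMessage(key: string, value: string | null): void {
  if (value === null) {
    tagCatalog.delete(key); // tombstone: tag was removed
    return;
  }
  tagCatalog.set(key, JSON.parse(value)); // newer record replaces older
}
```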

Authentication

For a personal system running on your own infrastructure, you have options ranging from “no auth” to “full enterprise security.” We’ll cover the two most practical approaches.

SASL/PLAIN

Simple username/password authentication. Easy to set up, works everywhere:

// From: stream-agents/src/lib/kafka.ts
const getSaslConfig = (): SASLOptions | undefined => {
  if (authMethod === 'plain') {
    if (!plainUser || !plainPass) {
      throw new Error(
        'KAFKA_SASL_USERNAME and KAFKA_SASL_PASSWORD required'
      );
    }
    return {
      mechanism: 'plain',
      username: plainUser,
      password: plainPass,
    };
  }
  return undefined;
};

Set these environment variables:

KAFKA_AUTH_METHOD=plain
KAFKA_SASL_USERNAME=your-username
KAFKA_SASL_PASSWORD=your-secure-password
KAFKA_SSL=true

AWS IAM (for MSK)

If you’re running on Amazon MSK, IAM authentication is the cleanest option:

// From: stream-agents/src/lib/kafka.ts
import { generateAuthToken } from 'aws-msk-iam-sasl-signer-js';

const getSaslConfig = (): SASLOptions | undefined => {
  if (authMethod === 'aws-iam') {
    const region = process.env.AWS_REGION || 'us-west-2';

    return {
      mechanism: 'oauthbearer',
      oauthBearerProvider: async () => {
        const result = await generateAuthToken({ region });
        return { value: result.token };
      }
    };
  }
  // ...
};

This uses your AWS credentials (from environment, instance profile, or credentials file) to generate short-lived tokens. No passwords to manage.

The Full Client Configuration

Putting it together:

// From: stream-agents/src/lib/kafka.ts
const brokers = process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'];
const clientId = process.env.KAFKA_CLIENT_ID || 'stream-agents';
const authMethod = process.env.KAFKA_AUTH_METHOD || 'none';

const useSSL = authMethod === 'aws-iam' || process.env.KAFKA_SSL === 'true';

const kafkaConfig: KafkaConfig = { // typed (from 'kafkajs') so the optional sasl field can be set below
  clientId,
  brokers,
  ssl: useSSL,
  connectionTimeout: 10000,
  authenticationTimeout: 10000,
  retry: {
    initialRetryTime: 500,
    retries: 8
  }
};

const sasl = getSaslConfig();
if (sasl) {
  kafkaConfig.sasl = sasl;
}

const kafka = new Kafka(kafkaConfig);

Local Development

You don’t need a managed Kafka service to get started. A single EC2 instance running Kafka in KRaft mode (no ZooKeeper) works beautifully for personal use.

The Architecture

┌─────────────────────────────────────────────────┐
│                   EC2 Instance                  │
│  ┌───────────────┐      ┌───────────────────┐   │
│  │     nginx     │──────│      Kafka        │   │
│  │   (TLS proxy) │ 9092 │   (KRaft mode)    │   │
│  │    :9093      │      │                   │   │
│  └───────────────┘      └───────────────────┘   │
│         │                                        │
│         │ Let's Encrypt                          │
│         │ TLS termination                        │
└─────────┼───────────────────────────────────────┘

    Public Internet

    ┌─────┴─────┐
    │  Clients  │
    └───────────┘

Kafka listens on localhost:9092. nginx terminates TLS on port 9093 and proxies to Kafka. Let’s Encrypt provides free, auto-renewing certificates.
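The nginx side uses the stream module (plain TCP proxying, not HTTP). A minimal sketch of what that config might look like; the repo's full version may differ:

```nginx
# Hypothetical sketch of the TLS-terminating proxy for Kafka.
# Requires libnginx-mod-stream (installed in the user_data script below).
stream {
    server {
        listen 9093 ssl;
        ssl_certificate     /etc/letsencrypt/live/kafka.yourdomain.com/fullchain.pem;
        ssl_certificate_key /etc/letsencrypt/live/kafka.yourdomain.com/privkey.pem;
        proxy_pass 127.0.0.1:9092;   # plaintext Kafka listener, localhost only
    }
}
```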

Why This Setup?

  1. TLS everywhere - Your events flow over the public internet. Encryption is non-negotiable.

  2. SASL/PLAIN auth - Simple but effective. Combined with TLS, it’s secure enough for personal use.

  3. KRaft mode - Kafka 3.x+ can run without ZooKeeper, dramatically simplifying single-node deployments.

  4. Minimal cost - A t3.small instance (~$15/month) handles a personal life stream easily.

Terraform Deployment

We use Terraform to provision the infrastructure. Here’s the key configuration:

# From: stream-agents/infrastructure/ec2-kafka/ec2.tf
resource "aws_instance" "kafka" {
  ami           = data.aws_ami.ubuntu.id
  instance_type = "t3.small"

  root_block_device {
    volume_size = 20 # GB
    volume_type = "gp3"
  }

  user_data = <<-EOF
#!/bin/bash
set -e

# Install Docker and nginx
apt-get update
apt-get install -y docker.io nginx libnginx-mod-stream certbot

# Obtain Let's Encrypt certificate
certbot certonly --standalone \
  --non-interactive \
  --agree-tos \
  --email admin@kafka.yourdomain.com \
  --domain kafka.yourdomain.com

# Configure nginx as TLS proxy
# ... (see full config in repository)

# Start Kafka in KRaft mode
docker run -d --name kafka \
  --restart always \
  -p 127.0.0.1:9092:9092 \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES=controller,broker \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094 \
  -e KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://kafka.yourdomain.com:9093" \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT \
  -e KAFKA_SASL_ENABLED_MECHANISMS=PLAIN \
  confluentinc/cp-kafka:7.6.0   # ... additional SASL/JAAS config elided
EOF
}

Connecting to Your Cluster

Once deployed, configure your clients:

export KAFKA_BROKERS=kafka.yourdomain.com:9093
export KAFKA_AUTH_METHOD=plain
export KAFKA_SASL_USERNAME=your-username
export KAFKA_SASL_PASSWORD=your-password
export KAFKA_SSL=true

Then initialize topics and start publishing:

bun run scripts/init_kafka.ts
bun run scripts/publish_events_to_kafka.ts

Putting It Together

With Kafka and the router in place, your life stream architecture looks like this:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Postgres  │────▶│    Kafka    │────▶│   Router    │────▶│   Agents    │
│  (events)   │     │ events.raw  │     │ (dispatch)  │     │  (process)  │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
      │                    │                    │                    │
      │                    │                    │                    │
      ▼                    ▼                    ▼                    ▼
   Source of           Durable            Idempotency           Emit new
    Truth             Transport            + Retry               events

Events are born in Postgres, flow through Kafka for durable transport, get dispatched by the router to agent-specific work queues, get processed by agents, and the results land back in Postgres for querying.

Running the Full Pipeline

Start these in separate terminals:

# Infrastructure
bun run kafka:publish      # DB → Kafka forwarder
bun run kafka:materialize  # Kafka → DB state sync
bun run router             # events.raw → work.* topics

# Agents
bun run agent:fetcher      # Fetches URLs, extracts content
bun run agent:enricher     # Calls OpenAI for tags/summaries
bun run agent:publisher    # Marks links as published

Next up: Part 4 explores real-time stream processing with Apache Flink (coming soon). We’ll take the patterns established here and add windowed aggregations, complex event processing, and joins across streams.

For more on the overall architecture, revisit Part 0: Vibe Decoding.