Kafka Patterns

In Part 0, we introduced the concept of a life stream: a unified event log of your digital interactions. But how do you actually transport those events reliably? How do you ensure that a saved bookmark at 2 AM doesn’t disappear into the void before your enrichment agents wake up?

Enter Kafka.

Why Kafka?

You might wonder: why not just write events directly to Postgres and call it a day? For a simple system, that works fine. But as your life stream grows, you’ll run into three problems:

  1. Durability under failure - What happens when your enrichment agent crashes mid-processing? With direct database writes, you’re left guessing which events were handled.

  2. Replay capability - When you deploy a new agent or fix a bug, you want to reprocess historical events. Databases aren’t designed for efficient sequential replay.

  3. Decoupled consumers - Multiple agents need the same events. A fetcher agent grabs content, an enricher agent adds tags, a summarizer generates summaries. They shouldn’t step on each other.

Kafka solves all three. Events are durably stored, can be replayed from any point, and multiple consumers can process the same stream independently.

The Right Tool for the Job

Kafka isn’t always the answer. For a personal life stream with dozens of events per day, it might feel like overkill. But consider what you’re building: a system designed to grow with you for years. The patterns you establish now (durable transport, checkpointed processing, consumer groups) will serve you well as your stream expands.

Plus, running a single-node Kafka cluster is surprisingly lightweight. We’ll cover that setup later.

Topics and Partitions

A Kafka topic is a named stream of events. For our life stream, we use two primary topics:

// From: stream-agents/scripts/init_kafka.ts
const TOPICS: TopicConfig[] = [
  {
    name: 'events.raw',
    numPartitions: 3,
    configEntries: [
      { name: 'retention.ms', value: String(7 * 24 * 60 * 60 * 1000) } // 7 days
    ]
  },
  {
    name: 'tags.catalog',
    numPartitions: 1, // Single partition for KTable pattern
    configEntries: [
      { name: 'cleanup.policy', value: 'compact' },
      { name: 'min.compaction.lag.ms', value: '0' },
      { name: 'segment.ms', value: String(60 * 60 * 1000) } // 1 hour segments
    ]
  }
];

events.raw

This is the main event stream. Every link.added, note.created, and content.fetched event flows through here. Three partitions give us room for parallel processing (though for a personal stream, even one partition would suffice).

The 7-day retention means events stick around long enough to replay if something goes wrong, but don’t accumulate forever.
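
If you later decide you want a longer window, retention can be changed on a live topic without recreating it. A minimal sketch using the kafkajs admin API (the getKafka import path is assumed to match the init script's helper):

// Sketch (not in the repository): bump events.raw retention to 30 days.
import { ConfigResourceTypes } from 'kafkajs';
import { getKafka } from '../src/lib/kafka'; // hypothetical path

const admin = getKafka().admin();
await admin.connect();

await admin.alterConfigs({
  validateOnly: false,
  resources: [{
    type: ConfigResourceTypes.TOPIC,
    name: 'events.raw',
    configEntries: [
      { name: 'retention.ms', value: String(30 * 24 * 60 * 60 * 1000) } // 30 days
    ]
  }]
});

await admin.disconnect();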

tags.catalog

This topic uses log compaction, a different pattern. Instead of deleting old messages after a time window, Kafka keeps only the latest message for each key. We use this to maintain a catalog of all known tags, enabling the enricher agent to suggest consistent tagging across articles.
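
The contract for a compacted topic is to key each message by its identity (here, the tag name) and always publish the full latest record; a null value acts as a tombstone that eventually removes the key. A hypothetical sketch of publishing catalog entries — the payload shape is illustrative, not the repository's actual schema:

// Sketch (not in the repository): upsert a tag into the compacted tags.catalog topic.
// Key = tag name, value = latest metadata; compaction keeps only the newest record
// per key, so replaying the topic from the beginning rebuilds the full catalog.
import { Producer } from 'kafkajs';

interface TagRecord {
  name: string;
  description?: string;
  updatedAt: string;
}

async function publishTag(producer: Producer, tag: TagRecord): Promise<void> {
  await producer.send({
    topic: 'tags.catalog',
    messages: [{ key: tag.name, value: JSON.stringify(tag) }],
  });
}

// Deleting a tag: send a tombstone (null value) for the same key.
async function deleteTag(producer: Producer, name: string): Promise<void> {
  await producer.send({
    topic: 'tags.catalog',
    messages: [{ key: name, value: null }],
  });
}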

Creating Topics

Topic creation is idempotent. Run it as many times as you want:

// From: stream-agents/scripts/init_kafka.ts
const kafka = getKafka();
const admin = kafka.admin();

await admin.connect();

const existingTopics = await admin.listTopics();

for (const topicConfig of TOPICS) {
  if (existingTopics.includes(topicConfig.name)) {
    console.log(`Topic '${topicConfig.name}' already exists.`);
  } else {
    await admin.createTopics({
      topics: [{
        topic: topicConfig.name,
        numPartitions: topicConfig.numPartitions,
        replicationFactor: 1, // Single node cluster
        configEntries: topicConfig.configEntries
      }]
    });
    console.log(`Topic '${topicConfig.name}' created.`);
  }
}

await admin.disconnect();

The Publisher

Events originate in Postgres (from your iOS Shortcut, browser extension, or API calls), but they need to flow into Kafka for processing. The publisher is a simple forwarder that bridges these two worlds.

The key challenge is delivery semantics. If the publisher crashes after sending to Kafka but before recording its progress, it will resend some events on restart. Checkpointing keeps those re-sends small and bounded, and idempotent consumers (covered below) make the duplicates harmless — at-least-once delivery with effectively-once processing.

Checkpoint Storage

// From: stream-agents/scripts/publish_events_to_kafka.ts
interface Checkpoint {
  timestamp: Date | string;
  id: string;
}

async function getCheckpoint(): Promise<Checkpoint> {
  const result = await sql`
    SELECT last_timestamp, last_event_id::text
    FROM lifestream.publisher_checkpoint
    WHERE publisher_id = ${PUBLISHER_ID}
  `;

  if (result.length > 0) {
    return {
      timestamp: result[0].last_timestamp,
      id: result[0].last_event_id,
    };
  }

  // No checkpoint exists yet - start from beginning
  return {
    timestamp: '1970-01-01T00:00:00.000Z',
    id: '00000000-0000-0000-0000-000000000000',
  };
}

async function saveCheckpoint(cp: Checkpoint): Promise<void> {
  await sql`
    INSERT INTO lifestream.publisher_checkpoint
      (publisher_id, last_timestamp, last_event_id)
    VALUES (${PUBLISHER_ID}, ${cp.timestamp}, ${cp.id}::uuid)
    ON CONFLICT (publisher_id) DO UPDATE SET
      last_timestamp = EXCLUDED.last_timestamp,
      last_event_id = EXCLUDED.last_event_id,
      updated_at = now()
  `;
}
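
The snippets above assume a lifestream.publisher_checkpoint table exists. A plausible definition (a sketch — the actual migration in the repository may differ):

// Sketch (not in the repository): one checkpoint row per publisher,
// upserted after every successfully published batch.
await sql`
  CREATE TABLE IF NOT EXISTS lifestream.publisher_checkpoint (
    publisher_id   text PRIMARY KEY,
    last_timestamp timestamptz NOT NULL,
    last_event_id  uuid NOT NULL,
    updated_at     timestamptz NOT NULL DEFAULT now()
  )
`;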

The Publish Loop

The publisher runs continuously, polling for new events:

// From: stream-agents/scripts/publish_events_to_kafka.ts
while (true) {
  const cp = await getCheckpoint();

  // Get events after checkpoint
  const events = await sql`
    SELECT *
    FROM lifestream.events
    WHERE received_at >= ${cp.timestamp}
      AND id != ${cp.id}::uuid
    ORDER BY received_at ASC, id ASC
    LIMIT 50
  `;

  if (events.length > 0) {
    const messages = events.map(e => ({
      key: e.subject_id,
      value: JSON.stringify(e),
      headers: {
        event_type: e.event_type,
        source: e.source
      }
    }));

    await producer.send({
      topic: 'events.raw',
      messages
    });

    // Checkpoint AFTER successful publish
    const lastEvent = events[events.length - 1];
    await saveCheckpoint({
      timestamp: lastEvent.received_at,
      id: lastEvent.id
    });

    console.log(`Published ${events.length} events.`);
  }

  await new Promise(r => setTimeout(r, 5000));
}

Notice the order: send to Kafka first, then save the checkpoint. If we crash between these two operations, we’ll resend some events on restart, but that’s fine because our consumers are idempotent.
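
One piece the loop doesn't show is producer setup. A minimal sketch, assuming the shared kafka client from src/lib/kafka.ts (the repository's actual wiring may differ):

// Sketch (not in the repository): create and connect a producer against the shared client.
const producer = kafka.producer();
await producer.connect();

// Flush and release the connection on shutdown.
process.on('SIGTERM', async () => {
  await producer.disconnect();
  process.exit(0);
});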

Consumer Groups

Kafka’s consumer groups are where the magic happens. Multiple consumers in the same group coordinate to process a topic in parallel, with each partition assigned to exactly one consumer.

Creating a Consumer

// From: stream-agents/src/lib/kafka.ts
export const createConsumer = async (groupId: string): Promise<Consumer> => {
  const consumer = kafka.consumer({ groupId });
  await consumer.connect();
  return consumer;
};

The Fetcher Agent Pattern

Here’s how our fetcher agent (which downloads and parses article content) consumes events:

// From: stream-agents/scripts/agent_fetcher.ts
const CONSUMER_GROUP = 'fetcher-agent-v1';
const TOPIC = 'events.raw';

async function main(): Promise<void> {
  console.log('Starting Fetcher Agent...');
  console.log(`Consumer Group: ${CONSUMER_GROUP}`);

  const consumer = await createConsumer(CONSUMER_GROUP);

  await consumer.subscribe({ topic: TOPIC, fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      if (!message.value) return;

      const event = JSON.parse(message.value.toString());

      // Only process link.added events
      if (event.event_type !== 'link.added') return;

      // Idempotency check
      if (await contentAlreadyFetched(event.subject_id)) {
        console.log(`Already fetched: ${event.subject_id}`);
        return;
      }

      // Process the event
      const result = await fetchAndExtract(event.payload.url);
      await emitContentFetched(event.subject_id, result);
    },
  });
}

Key Consumer Concepts

Group ID versioning: Notice fetcher-agent-v1. When you change processing logic significantly, bump the version. This creates a new consumer group that starts from the beginning, reprocessing all events with the new logic.

fromBeginning: true: For a new consumer group, start from the earliest available offset. This ensures you don’t miss events that arrived before the consumer started.

Idempotency: Always check if you’ve already processed an event. Kafka guarantees at-least-once delivery, so your consumer must handle duplicates gracefully.
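
In practice the check is a single lookup against the destination store. A hypothetical sketch of contentAlreadyFetched, assuming fetched content lands in a lifestream.content table keyed by subject_id:

// Sketch (not in the repository): has content for this subject already been stored?
async function contentAlreadyFetched(subjectId: string): Promise<boolean> {
  const rows = await sql`
    SELECT 1
    FROM lifestream.content
    WHERE subject_id = ${subjectId}
    LIMIT 1
  `;
  return rows.length > 0;
}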

Multi-Topic Consumers

Some agents need multiple topics. The enricher agent, for example, consumes both events.raw and tags.catalog:

// From: stream-agents/scripts/agent_enricher.ts
const CONSUMER_GROUP = 'enricher-agent-v1';
const EVENTS_TOPIC = 'events.raw';
const TAGS_TOPIC = 'tags.catalog';

// Subscribe to both topics
await consumer.subscribe({
  topics: [EVENTS_TOPIC, TAGS_TOPIC],
  fromBeginning: true
});

await consumer.run({
  eachMessage: async ({ topic, message }) => {
    if (topic === TAGS_TOPIC) {
      // Update local tag catalog
      handleTagCatalogMessage(message);
      return;
    }

    // Handle events.raw messages
    if (!message.value) return;
    const event = JSON.parse(message.value.toString());
    if (event.event_type === 'content.fetched') {
      await handleContentFetchedEvent(event);
    }
  },
});

This pattern builds a local cache of the tag catalog, keeping the enricher agent’s view of known tags up-to-date.
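
A minimal version of that cache, assuming catalog messages are keyed by tag name with JSON values and null tombstones (the repository's handleTagCatalogMessage may differ):

// Sketch (not in the repository): in-memory view of tags.catalog,
// rebuilt on startup by consuming the compacted topic from the beginning.
import type { KafkaMessage } from 'kafkajs';

const tagCatalog = new Map<string, unknown>();

function handleTagCatalogMessage(message: KafkaMessage): void {
  const key = message.key?.toString();
  if (!key) return;

  // Tombstone: the tag was deleted upstream.
  if (!message.value) {
    tagCatalog.delete(key);
    return;
  }

  tagCatalog.set(key, JSON.parse(message.value.toString()));
}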

Authentication

For a personal system running on your own infrastructure, you have options ranging from “no auth” to “full enterprise security.” We’ll cover the two most practical approaches.

SASL/PLAIN

Simple username/password authentication. Easy to set up, works everywhere:

// From: stream-agents/src/lib/kafka.ts
const getSaslConfig = (): SASLOptions | undefined => {
  if (authMethod === 'plain') {
    if (!plainUser || !plainPass) {
      throw new Error(
        'KAFKA_SASL_USERNAME and KAFKA_SASL_PASSWORD required'
      );
    }
    return {
      mechanism: 'plain',
      username: plainUser,
      password: plainPass,
    };
  }
  return undefined;
};

Set these environment variables:

KAFKA_AUTH_METHOD=plain
KAFKA_SASL_USERNAME=your-username
KAFKA_SASL_PASSWORD=your-secure-password
KAFKA_SSL=true

AWS IAM (for MSK)

If you’re running on Amazon MSK, IAM authentication is the cleanest option:

// From: stream-agents/src/lib/kafka.ts
import { generateAuthToken } from 'aws-msk-iam-sasl-signer-js';

const getSaslConfig = (): SASLOptions | undefined => {
  if (authMethod === 'aws-iam') {
    const region = process.env.AWS_REGION || 'us-west-2';

    return {
      mechanism: 'oauthbearer',
      oauthBearerProvider: async () => {
        const result = await generateAuthToken({ region });
        return { value: result.token };
      }
    };
  }
  // ...
};

This uses your AWS credentials (from environment, instance profile, or credentials file) to generate short-lived tokens. No passwords to manage.

The Full Client Configuration

Putting it together:

// From: stream-agents/src/lib/kafka.ts
import { Kafka, KafkaConfig } from 'kafkajs';

const brokers = process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'];
const clientId = process.env.KAFKA_CLIENT_ID || 'stream-agents';
const authMethod = process.env.KAFKA_AUTH_METHOD || 'none';

const useSSL = authMethod === 'aws-iam' || process.env.KAFKA_SSL === 'true';

const kafkaConfig: KafkaConfig = {
  clientId,
  brokers,
  ssl: useSSL,
  connectionTimeout: 10000,
  authenticationTimeout: 10000,
  retry: {
    initialRetryTime: 500,
    retries: 8
  }
};

const sasl = getSaslConfig();
if (sasl) {
  kafkaConfig.sasl = sasl;
}

const kafka = new Kafka(kafkaConfig);
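
Earlier snippets call getKafka(); a minimal version simply returns this shared instance (a sketch — the repository's helper may do more, such as lazy initialization):

// Sketch (not in the repository): expose the shared client so scripts and agents
// reuse a single configuration.
export const getKafka = (): Kafka => kafka;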

Local Development

You don’t need a managed Kafka service to get started. A single EC2 instance running Kafka in KRaft mode (no ZooKeeper) works beautifully for personal use.

The Architecture

┌─────────────────────────────────────────────────┐
│                   EC2 Instance                  │
│  ┌───────────────┐      ┌───────────────────┐   │
│  │     nginx     │──────│      Kafka        │   │
│  │   (TLS proxy) │ 9092 │   (KRaft mode)    │   │
│  │    :9093      │      │                   │   │
│  └───────────────┘      └───────────────────┘   │
│         │                                       │
│         │ Let's Encrypt                         │
│         │ TLS termination                       │
└─────────┼───────────────────────────────────────┘
          │
          │   Public Internet
          │
    ┌─────┴─────┐
    │  Clients  │
    └───────────┘

Kafka listens on localhost:9092. nginx terminates TLS on port 9093 and proxies to Kafka. Let’s Encrypt provides free, auto-renewing certificates.

Why This Setup?

  1. TLS everywhere - Your events flow over the public internet. Encryption is non-negotiable.

  2. SASL/PLAIN auth - Simple but effective. Combined with TLS, it’s secure enough for personal use.

  3. KRaft mode - Kafka 3.x+ can run without ZooKeeper, dramatically simplifying single-node deployments.

  4. Minimal cost - A t3.small instance (~$15/month) handles a personal life stream easily.

Terraform Deployment

We use Terraform to provision the infrastructure. Here’s the key configuration:

# From: stream-agents/infrastructure/ec2-kafka/ec2.tf
resource "aws_instance" "kafka" {
  ami           = data.aws_ami.ubuntu.id
  instance_type = "t3.small"

  root_block_device {
    volume_size = 20 # GB
    volume_type = "gp3"
  }

  user_data = <<-EOF
#!/bin/bash
set -e

# Install Docker and nginx
apt-get update
apt-get install -y docker.io nginx libnginx-mod-stream certbot

# Obtain Let's Encrypt certificate
certbot certonly --standalone \
  --non-interactive \
  --agree-tos \
  --email admin@kafka.yourdomain.com \
  --domain kafka.yourdomain.com

# Configure nginx as TLS proxy
# ... (see full config in repository)

# Start Kafka in KRaft mode
docker run -d --name kafka \
  --restart always \
  -p 127.0.0.1:9092:9092 \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES=controller,broker \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094 \
  -e KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://kafka.yourdomain.com:9093" \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT \
  -e KAFKA_SASL_ENABLED_MECHANISMS=PLAIN \
  # ... additional config
  confluentinc/cp-kafka:7.6.0
EOF
}

Connecting to Your Cluster

Once deployed, configure your clients:

export KAFKA_BROKERS=kafka.yourdomain.com:9093
export KAFKA_AUTH_METHOD=plain
export KAFKA_SASL_USERNAME=your-username
export KAFKA_SASL_PASSWORD=your-password
export KAFKA_SSL=true

Then initialize topics and start publishing:

bun run scripts/init_kafka.ts
bun run scripts/publish_events_to_kafka.ts
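
A quick way to confirm the TLS and SASL settings work end to end is to list topics through the admin client (a hypothetical one-off check, not a script in the repository):

// Sketch (not in the repository): connectivity smoke test over TLS + SASL.
import { getKafka } from '../src/lib/kafka'; // hypothetical path

const admin = getKafka().admin();
await admin.connect();
console.log('Topics:', await admin.listTopics());
await admin.disconnect();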

Putting It Together

With Kafka in place, your life stream architecture looks like this:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Postgres  │────▶│    Kafka    │────▶│   Agents    │
│  (events)   │     │ events.raw  │     │  (process)  │
└─────────────┘     └─────────────┘     └─────────────┘
      │                    │                    │
      │                    │                    │
      ▼                    ▼                    ▼
   Source of           Durable              Emit new
    Truth             Transport             events

Events are born in Postgres, flow through Kafka for durable transport and fanout, get processed by agents, and the results land back in Postgres for querying.
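
"Land back in Postgres" concretely means agents insert derived events into the same lifestream.events table the publisher polls. A hypothetical sketch of the fetcher's emitContentFetched helper under that assumption (column names are illustrative):

// Sketch (not in the repository): persist a derived event; the publisher forwards
// it to Kafka on its next poll, so downstream agents (enricher, summarizer) see it too.
async function emitContentFetched(
  subjectId: string,
  result: { title: string; text: string }
): Promise<void> {
  await sql`
    INSERT INTO lifestream.events (event_type, source, subject_id, payload, received_at)
    VALUES ('content.fetched', 'fetcher-agent', ${subjectId}, ${JSON.stringify(result)}::jsonb, now())
  `;
}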


Next up: Part 4 (coming soon) explores real-time stream processing with Apache Flink. We’ll take the patterns established here and add windowed aggregations, complex event processing, and joins across streams.

For more on the overall architecture, revisit Part 0: Vibe Decoding.