Event Model & Life Stream

In Part 0, we introduced the concept of a life stream: a unified event log of your digital interactions. Now it’s time to get concrete. How do we actually model these events? What makes a good event schema? And how do we ensure that the same link saved twice doesn’t create duplicate chaos?

This is the foundation everything else builds on. Get the event model right, and the rest flows naturally. Get it wrong, and you’ll be fighting your own architecture forever.

What is a Life Stream?

A life stream is an append-only log of everything that happens in your digital life. Every link saved, every note created, every temperature reading from your smart home: each becomes an immutable event in the stream.

But here’s the key insight: we’re not storing state. We’re storing facts about what happened.

This distinction matters more than it might seem. When you save a bookmark, you could either:

  1. Store state: Update a bookmarks table with the new URL
  2. Store an event: Append a link.added event to the stream

The event-first approach gives you superpowers:

  • Time travel: Replay events to see what your bookmarks looked like last Tuesday
  • Derived views: Build multiple state representations from the same events
  • Auditability: Know exactly when and how every piece of data entered your system
  • Decoupling: Producers don’t need to know about consumers

The tradeoff? You need to materialize state for queries. But that’s what databases (and eventually Flink) are for.
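
To make "materialize state" concrete, here's a minimal sketch (not from the repository) that folds over the event log to build a current view of your bookmarks. It assumes the Event interface shown in the next section; the import path and BookmarkView shape are made up for illustration.

// Minimal sketch, not repository code: derive bookmark state from events.
import type { Event } from "./db"; // path assumed

interface BookmarkView {
  url: string;
  tags?: string[];
}

function materializeBookmarks(events: Event[]): Map<string, BookmarkView> {
  const bookmarks = new Map<string, BookmarkView>();

  for (const event of events) {
    if (event.subject !== "link") continue;

    if (event.event_type === "link.added") {
      bookmarks.set(event.subject_id, { url: String(event.payload.url) });
    } else if (event.event_type === "enrichment.completed") {
      const existing = bookmarks.get(event.subject_id);
      if (existing) existing.tags = event.payload.tags as string[];
    }
  }

  return bookmarks;
}

// "Time travel" is the same fold over a filtered stream:
// materializeBookmarks(events.filter((e) => e.occurred_at <= "2024-01-09T00:00:00Z"));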

The Event Model

Every event in the life stream follows a consistent structure. Here’s the core interface:

// From stream-agents/src/lib/db.ts

export interface Event {
  id?: string;
  occurred_at: string;
  ingested_at?: string;
  source: string;
  subject: string;
  subject_id: string;
  event_type: string;
  payload: Record<string, unknown>;
}

Let’s break down each field:

Timestamps: When Things Happened

Every event has two temporal dimensions:

  • occurred_at: When the event actually happened at the source. If you save a link on your phone at 3pm, that’s occurred_at.
  • ingested_at (or received_at in the schema): When the system recorded the event. There’s always some lag.

This distinction matters for ordering. You might ingest events out of order (network delays, batch imports), but you always know when they really happened.

Source: Where It Came From

The source field identifies the origin of an event:

  • "phone" - Mobile app captures
  • "chrome" - Browser extension
  • "homeassistant" - Smart home integrations
  • "todoist" - Task management sync
  • "agent:fetcher" - The fetcher agent (yes, agents are sources too)
  • "agent:enricher" - The enricher agent

Agents emit events just like any other source. When the fetcher agent successfully retrieves content, it emits a content.fetched event with source: 'agent:fetcher'. This creates a complete audit trail of what happened and who (or what) did it.

Subject and Subject ID: What It’s About

Every event is about something: a link, a todo, a sensor reading. We capture this with two fields:

  • subject: The type of entity (e.g., "link", "todo.item", "home.temperature")
  • subject_id: A stable, unique identifier for that specific entity

The subject_id is the linchpin of the entire system. It’s how we correlate events about the same entity across time and sources. A link saved on your phone and later enriched by an agent both share the same subject_id.

We’ll dive deep into subject ID generation in the next section.

Event Type: What Happened

The event_type field describes the action:

  • "link.added" - A new link was saved
  • "content.fetched" - Content was retrieved from a URL
  • "enrichment.completed" - AI processing finished
  • "reading.recorded" - A sensor reading was captured
  • "item.completed" - A todo was marked done

The naming convention is {domain}.{action} in past tense. Events are facts about what already happened, not commands for what should happen.

Payload: The Details

The payload is a flexible JSON object containing event-specific data. For a link.added event:

{
  "url": "https://example.com/article",
  "source_context": "twitter_timeline",
  "tags": ["ai", "research"]
}

For a content.fetched event:

{
  "final_url": "https://example.com/article",
  "title": "An Interesting Article",
  "text_content": "The full extracted text...",
  "fetch_error": null
}

The schema is intentionally loose. Different event types have different payloads, and we validate at the application layer rather than the database.
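
In practice, "validate at the application layer" can be as simple as a type guard a consumer runs before trusting the payload. A sketch (not from the repository), using the link.added fields shown above:

// Illustrative sketch, not repository code: validate a link.added payload
// before processing it. Field names follow the example payload above.
type LinkAddedPayload = {
  url: string;
  source_context?: string;
  tags?: string[];
};

function isLinkAddedPayload(payload: unknown): payload is LinkAddedPayload {
  if (typeof payload !== "object" || payload === null) return false;
  const p = payload as Record<string, unknown>;
  return (
    typeof p.url === "string" &&
    (p.tags === undefined || Array.isArray(p.tags))
  );
}

// In a consumer:
// if (!isLinkAddedPayload(event.payload)) {
//   console.warn("[consumer] malformed link.added payload, skipping", event.id);
//   return;
// }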

Subject IDs: Stable Entity Identification

The subject_id is perhaps the most critical design decision in the entire system. It needs to be:

  1. Deterministic: The same entity always gets the same ID
  2. Stable: The ID doesn’t change even if other attributes do
  3. Unique: No collisions between different entities
  4. Meaningful: You can tell what type of entity it is at a glance

Here’s how we generate them:

// From stream-agents/src/lib/subject_id.ts

/**
 * Subject ID generation utilities
 *
 * Subject IDs are stable, deterministic identifiers for entities in the system.
 * Format: "{type}:{identifier}"
 *
 * Examples:
 * - link:a3f2b1c4d5e6... (SHA256 of normalized URL)
 * - sensor:living_room
 * - todoist:12345
 * - anno:uuid
 */

The format is always {type}:{identifier}. The type prefix makes it immediately clear what kind of entity you’re dealing with, and the identifier is derived deterministically from the entity’s natural key.

URL Normalization

For links, the natural key is the URL. But URLs are tricky. The same page can be referenced many different ways:

  • https://Example.COM/page
  • https://example.com/page/
  • https://example.com/page?utm_source=twitter
  • HTTP://example.com:80/page

We need to normalize these to a canonical form before hashing:

// From stream-agents/src/lib/subject_id.ts

/**
 * Normalize a URL for consistent hashing:
 * - Lowercase scheme and host
 * - Remove default ports (80, 443)
 * - Sort query parameters
 * - Remove trailing slashes (except root)
 * - Remove fragments
 */
export function normalizeUrl(url: string): string {
  try {
    const parsed = new URL(url);

    // Lowercase scheme and host
    parsed.protocol = parsed.protocol.toLowerCase();
    parsed.hostname = parsed.hostname.toLowerCase();

    // Remove default ports
    if (
      (parsed.protocol === "http:" && parsed.port === "80") ||
      (parsed.protocol === "https:" && parsed.port === "443")
    ) {
      parsed.port = "";
    }

    // Sort query parameters
    const params = new URLSearchParams(parsed.search);
    const sortedParams = new URLSearchParams([...params.entries()].sort());
    parsed.search = sortedParams.toString();

    // Remove fragment
    parsed.hash = "";

    // Build normalized URL
    let normalized = parsed.toString();

    // Remove trailing slash (except for root path)
    if (normalized.endsWith("/") && parsed.pathname !== "/") {
      normalized = normalized.slice(0, -1);
    }

    return normalized;
  } catch {
    // If URL parsing fails, return original (will still get hashed)
    return url;
  }
}

With normalization in place, generating a link’s subject ID is straightforward:

// From stream-agents/src/lib/subject_id.ts

/**
 * Generate SHA256 hash of a string (first 16 chars for brevity)
 */
export function sha256Short(input: string): string {
  return createHash("sha256").update(input).digest("hex").slice(0, 16);
}

/**
 * Generate a link subject_id from a URL
 * Format: "link:{sha256(normalized_url)}"
 */
export function linkSubjectId(url: string): string {
  const normalized = normalizeUrl(url);
  return `link:${sha256Short(normalized)}`;
}

The result looks like link:a3f2b1c4d5e6f7a8. Compact, deterministic, and collision-resistant.
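
A quick usage sketch (the import path is an assumption): case and trailing-slash variants collapse to the same ID, while query strings are only sorted, not stripped, so tracking parameters still produce a distinct ID unless they're removed upstream.

// Usage sketch; import path assumed.
import { linkSubjectId } from "./subject_id";

// Both normalize to "https://example.com/page" and therefore hash identically.
const a = linkSubjectId("https://Example.COM/page");
const b = linkSubjectId("https://example.com/page/");
console.log(a === b); // true

// Query params are sorted but kept, so this yields a different subject_id.
const c = linkSubjectId("https://example.com/page?utm_source=twitter");
console.log(a === c); // false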

Other Entity Types

Different entity types use different identification strategies:

// From stream-agents/src/lib/subject_id.ts

/**
 * Generate a sensor subject_id
 * Format: "sensor:{location}"
 */
export function sensorSubjectId(location: string): string {
  return `sensor:${location.toLowerCase().replace(/\s+/g, "_")}`;
}

/**
 * Generate a todo subject_id (for external todo systems)
 * Format: "{source}:{external_id}"
 */
export function todoSubjectId(source: string, externalId: string): string {
  return `${source}:${externalId}`;
}

/**
 * Generate an annotation subject_id
 * Format: "anno:{uuid}"
 */
export function annotationSubjectId(uuid: string): string {
  return `anno:${uuid}`;
}

Sensors use human-readable locations (sensor:living_room). Todos use external system IDs (todoist:12345). Annotations get UUIDs because they don’t have natural keys.

The pattern adapts to each domain while maintaining the type:identifier structure.

The Events Table

With the event model defined, here’s how it maps to PostgreSQL:

-- From stream-agents/schema.sql

-- Event ledger (append-only)
create table if not exists events (
  id              uuid primary key default gen_random_uuid(),

  occurred_at      timestamptz not null,          -- when it happened at the source
  received_at      timestamptz not null default now(), -- when we stored it

  source           text not null,                 -- "phone", "chrome", "homeassistant", "todoist", "agent:summarizer"
  subject          text not null,                 -- matches subjects.subject
  subject_id       text not null,                 -- matches subjects.subject_id

  event_type       text not null,                 -- "created", "reading.recorded", "item.completed", "republish.requested"
  schema_version   int  not null default 1,

  payload          jsonb not null default '{}'::jsonb,

  -- tracing (optional but great)
  correlation_id   uuid null,                     -- ties together a pipeline run
  causation_id     uuid null,                     -- which prior event caused this event

  -- Kafka lineage (optional)
  kafka_topic      text null,
  kafka_partition  int  null,
  kafka_offset     bigint null
);

A few things worth noting:

Schema Versioning

The schema_version field future-proofs the system. When you need to evolve the payload structure, bump the version and handle both formats in your consumers.
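For example, a consumer might branch on the version when reading the payload. A hypothetical sketch (the v2 field rename is invented purely for illustration):

// Hypothetical sketch: read a payload field across schema versions.
// The v2 rename (url -> href) is made up for illustration only.
function extractUrl(event: {
  schema_version?: number;
  payload: Record<string, unknown>;
}): string | undefined {
  const version = event.schema_version ?? 1;
  const value = version >= 2 ? event.payload.href : event.payload.url;
  return typeof value === "string" ? value : undefined;
}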

Correlation and Causation

These tracing fields are optional but invaluable for debugging:

  • correlation_id: Groups all events from a single pipeline run. When you save a link, the subsequent content.fetched and enrichment.completed events share the same correlation ID.
  • causation_id: Points to the specific event that triggered this one. The enrichment.completed event references the content.fetched event that caused it.
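
Here's roughly how an agent could propagate both fields when emitting a derived event. This is a sketch, not the repository's actual emit path: appendEvent is a made-up helper, and the Event interface shown earlier doesn't expose the tracing columns, so we widen it here.

// Sketch, not repository code: carry tracing fields onto a derived event.
import type { Event } from "./db"; // path assumed

type TracedEvent = Event & { correlation_id?: string; causation_id?: string };

declare function appendEvent(event: TracedEvent): Promise<void>; // hypothetical helper

async function emitContentFetched(
  trigger: TracedEvent,
  payload: Record<string, unknown>,
): Promise<void> {
  await appendEvent({
    occurred_at: new Date().toISOString(),
    source: "agent:fetcher",
    subject: trigger.subject,
    subject_id: trigger.subject_id,
    event_type: "content.fetched",
    payload,
    // Keep the pipeline's correlation_id, or start one at the first hop.
    correlation_id: trigger.correlation_id ?? trigger.id,
    // Point causation at the specific event that triggered this one.
    causation_id: trigger.id,
  });
}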

Kafka Lineage

If events flow through Kafka before landing in Postgres, we record the topic, partition, and offset. Combined with the dedupe strategies later in this part, that lineage gives effectively-once ingestion, and it makes debugging much easier when things go sideways.

Indexes for Common Queries

-- From stream-agents/schema.sql

-- Fast timeline queries + per-entity queries
create index if not exists events_subject_idx
  on events(subject, subject_id, occurred_at desc);

create index if not exists events_type_idx
  on events(event_type, occurred_at desc);

create index if not exists events_source_idx
  on events(source, occurred_at desc);

Three index strategies cover most query patterns:

  1. By subject: “Show me all events for this link”
  2. By type: “Show me all enrichment completions today”
  3. By source: “Show me everything from the phone this week”
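
To illustrate, here's roughly what those three query shapes look like with the sql tagged-template client used elsewhere in the codebase (the SELECT lists and bindings are assumptions, and subjectId is assumed to be in scope):

// Query sketches matching the three indexes above; `sql` is assumed to be
// the same tagged-template client the agents use.

// 1. All events for one entity, newest first (events_subject_idx)
const forLink = await sql`
  SELECT * FROM events
  WHERE subject = 'link' AND subject_id = ${subjectId}
  ORDER BY occurred_at DESC
`;

// 2. All enrichment completions today (events_type_idx)
const enrichedToday = await sql`
  SELECT * FROM events
  WHERE event_type = 'enrichment.completed'
    AND occurred_at >= date_trunc('day', now())
  ORDER BY occurred_at DESC
`;

// 3. Everything from the phone this week (events_source_idx)
const fromPhone = await sql`
  SELECT * FROM events
  WHERE source = 'phone'
    AND occurred_at >= date_trunc('week', now())
  ORDER BY occurred_at DESC
`;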

The Subjects Registry

Events reference subjects, but sometimes you need to query entities directly. The subjects table provides a registry:

-- From stream-agents/schema.sql

-- Canonical subjects registry
create table if not exists subjects (
  subject         text not null,           -- e.g. "link", "todo.item", "home.temperature", "annotation"
  subject_id      text not null,           -- stable key (string)
  created_at      timestamptz not null default now(),
  display_name    text null,
  visibility      text not null default 'private', -- private|public
  meta            jsonb not null default '{}'::jsonb,

  primary key (subject, subject_id)
);

And the corresponding TypeScript interface:

// From stream-agents/src/lib/db.ts

export interface Subject {
  subject: string;
  subject_id: string;
  created_at: string;
  display_name?: string;
  visibility: "public" | "private" | "unlisted";
  meta: Record<string, unknown>;
}

The visibility field is particularly useful. It powers the public-facing site by filtering to only show public subjects.

Event Types Catalog

The system currently handles these event types:

| Event Type | Subject | Description | Emitted By |
| --- | --- | --- | --- |
| link.added | link | A new URL was saved | phone, chrome, api |
| content.fetched | link | HTML content was retrieved and parsed | agent:fetcher |
| enrichment.completed | link | AI-generated tags and summary ready | agent:enricher |
| reading.recorded | home.temperature | Sensor reading captured | homeassistant |
| item.created | todo.item | New todo created | todoist |
| item.completed | todo.item | Todo marked done | todoist |

Each event type has an implicit schema (the payload structure), documented by convention rather than formal schemas. For a personal system, this is pragmatic. For a team, you might want JSON Schema or Protobuf.

The Event Flow

Events form a directed graph. Here’s how a link flows through the system:

link.added (source: phone)
    ↓
content.fetched (source: agent:fetcher, causation_id → link.added)
    ↓
enrichment.completed (source: agent:enricher, causation_id → content.fetched)

Each step produces a new event rather than mutating state. The materializer (covered in Part 3) watches these events and updates the links, link_content, and link_metadata tables accordingly.

Idempotency: Why It Matters

In distributed systems, messages can be delivered more than once. Network hiccups, consumer restarts, Kafka rebalances: duplicates happen. Your system needs to handle them gracefully.

There are several strategies for achieving idempotency:

Strategy 1: Check Before Acting

Before processing an event, check if it’s already been handled:

// From stream-agents/scripts/agent_fetcher.ts

async function contentAlreadyFetched(subjectId: string): Promise<boolean> {
  const result = await sql`
    SELECT 1 FROM lifestream.link_content
    WHERE subject_id = ${subjectId}
  `;
  return result.length > 0;
}

// In the message handler:
if (await contentAlreadyFetched(event.subject_id)) {
  console.log(`[fetcher] Already fetched: ${event.subject_id}`);
  return;
}

The fetcher agent checks if content already exists for a subject_id before making the HTTP request. Simple and effective.

Strategy 2: Kafka Offset Tracking

For events flowing from Kafka to Postgres, we track which offsets have been processed:

-- From stream-agents/schema.sql

-- Track last committed offset per consumer group
create table if not exists kafka_offsets (
  consumer_group  text not null,
  topic           text not null,
  partition       int  not null,
  last_offset     bigint not null,
  updated_at      timestamptz not null default now(),
  primary key (consumer_group, topic, partition)
);

Before processing a message, check whether its offset is beyond the last committed checkpoint; if it isn't, the message has already been handled and can be skipped. After processing, update the checkpoint. This prevents reprocessing on restart.
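
A sketch of that checkpoint logic against the kafka_offsets table, using the same sql tag (the upsert shape is an assumption, not the repository's actual consumer code):

// Sketch: checkpoint Kafka offsets in Postgres. Offsets are handled as
// strings and compared as BigInt to avoid precision loss.
async function alreadyProcessed(
  group: string, topic: string, partition: number, offset: string,
): Promise<boolean> {
  const rows = await sql`
    SELECT last_offset FROM kafka_offsets
    WHERE consumer_group = ${group} AND topic = ${topic} AND partition = ${partition}
  `;
  return rows.length > 0 && BigInt(rows[0].last_offset) >= BigInt(offset);
}

async function commitOffset(
  group: string, topic: string, partition: number, offset: string,
): Promise<void> {
  await sql`
    INSERT INTO kafka_offsets (consumer_group, topic, partition, last_offset)
    VALUES (${group}, ${topic}, ${partition}, ${offset})
    ON CONFLICT (consumer_group, topic, partition)
    DO UPDATE SET last_offset = excluded.last_offset, updated_at = now()
  `;
}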

Strategy 3: Explicit Dedupe Table

For belt-and-suspenders safety, an explicit dedupe table ensures we never insert the same Kafka record twice:

-- From stream-agents/schema.sql

-- Extra dedupe barrier for events
create table if not exists event_ingest_dedupe (
  topic           text not null,
  partition       int  not null,
  kafka_offset    bigint not null,
  inserted_at     timestamptz not null default now(),
  primary key (topic, partition, kafka_offset)
);

Insert into the dedupe table in the same transaction as the event. If the dedupe insert fails (duplicate key), the whole transaction rolls back.
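
A sketch of that transaction, assuming the sql tag is the porsager/postgres client (which exposes sql.begin for transactions); the column list beyond the dedupe key is an assumption:

// Sketch: dedupe row + event row in one transaction. A duplicate Kafka
// record violates the dedupe primary key and rolls the whole thing back.
import type { Event } from "./db"; // path assumed

async function ingestFromKafka(
  event: Event, topic: string, partition: number, offset: string,
): Promise<void> {
  await sql.begin(async (tx) => {
    await tx`
      INSERT INTO event_ingest_dedupe (topic, partition, kafka_offset)
      VALUES (${topic}, ${partition}, ${offset})
    `;
    await tx`
      INSERT INTO events (occurred_at, source, subject, subject_id, event_type,
                          payload, kafka_topic, kafka_partition, kafka_offset)
      VALUES (${event.occurred_at}, ${event.source}, ${event.subject},
              ${event.subject_id}, ${event.event_type}, ${JSON.stringify(event.payload)},
              ${topic}, ${partition}, ${offset})
    `;
  });
}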

Strategy 4: Natural Key Uniqueness

For links, the normalized URL provides natural deduplication:

-- From stream-agents/schema.sql

create unique index if not exists links_url_norm_uq on links(url_norm);

Saving the same URL twice fails at the database level, which the application handles gracefully by treating it as a no-op.
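
One way to express that no-op behaviour, leaning on the unique index (a sketch; the links column names beyond url_norm are assumptions, since that table isn't shown here):

// Sketch: re-saving an already-known URL degrades to a no-op.
// Assumes `url`, `sql`, linkSubjectId, and normalizeUrl are in scope.
await sql`
  INSERT INTO links (subject_id, url, url_norm)
  VALUES (${linkSubjectId(url)}, ${url}, ${normalizeUrl(url)})
  ON CONFLICT (url_norm) DO NOTHING
`;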

Putting It All Together

Let’s trace through a complete example. You’re browsing Twitter and see an interesting article. You tap the share button and send it to your life stream:

  1. Phone captures the URL and emits a link.added event:

    {
      "occurred_at": "2024-01-15T10:30:00Z",
      "source": "phone",
      "subject": "link",
      "subject_id": "link:a3f2b1c4d5e6f7a8",
      "event_type": "link.added",
      "payload": {
        "url": "https://example.com/article",
        "source_context": "twitter_share"
      }
    }
  2. Kafka receives the event and durably stores it in the events.raw topic.

  3. Fetcher agent consumes the event, checks idempotency, fetches the URL, and emits:

    {
      "occurred_at": "2024-01-15T10:30:05Z",
      "source": "agent:fetcher",
      "subject": "link",
      "subject_id": "link:a3f2b1c4d5e6f7a8",
      "event_type": "content.fetched",
      "correlation_id": "<original-event-id>",
      "payload": {
        "title": "An Interesting Article",
        "text_content": "The full article text..."
      }
    }
  4. Enricher agent consumes the content.fetched event, calls the LLM, and emits:

    {
      "occurred_at": "2024-01-15T10:30:10Z",
      "source": "agent:enricher",
      "subject": "link",
      "subject_id": "link:a3f2b1c4d5e6f7a8",
      "event_type": "enrichment.completed",
      "correlation_id": "<original-event-id>",
      "payload": {
        "tags": ["technology", "ai"],
        "summary_short": "A deep dive into...",
        "summary_long": "..."
      }
    }
  5. Materializer updates state tables based on each event, populating links, link_content, and link_metadata.

  6. The link is now ready to appear on your public site (if visibility is set to public).

All of this happens in seconds, and every step is recorded as an immutable event. You can replay, debug, and audit at will.

What’s Next

We’ve established the event model, the foundation everything builds on. In Part 2: Agents and Enrichment Loops, we’ll dive into the agents themselves: how they consume events, call LLMs, and maintain state through Kafka-native patterns.

The fetcher and enricher agents we glimpsed here are just the beginning. The real power comes from composing agents into pipelines that transform raw signals into actionable insights.


Key takeaways from this part:

  • Events are immutable facts; state is derived
  • Subject IDs provide stable, deterministic entity identification
  • URL normalization ensures the same link always gets the same ID
  • Multiple idempotency strategies compound safety
  • Correlation and causation IDs enable distributed tracing