Event Model & Life Stream
In Part 0, we introduced the concept of a life stream: a unified event log of your digital interactions. Now it’s time to get concrete. How do we actually model these events? What makes a good event schema? And how do we ensure that the same link saved twice doesn’t create duplicate chaos?
This is the foundation everything else builds on. Get the event model right, and the rest flows naturally. Get it wrong, and you’ll be fighting your own architecture forever.
What is a Life Stream?
A life stream is an append-only log of everything that happens in your digital life. Every link saved, every note created, every temperature reading from your smart home: each becomes an immutable event in the stream.
But here’s the key insight: we’re not storing state. We’re storing facts about what happened.
This distinction matters more than it might seem. When you save a bookmark, you could either:
- Store state: Update a bookmarks table with the new URL
- Store an event: Append a link.added event to the stream
The event-first approach gives you superpowers:
- Time travel: Replay events to see what your bookmarks looked like last Tuesday
- Derived views: Build multiple state representations from the same events
- Auditability: Know exactly when and how every piece of data entered your system
- Decoupling: Producers don’t need to know about consumers
The tradeoff? You need to materialize state for queries. But that’s what databases (and eventually Flink) are for.
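To make "derived views" and "time travel" concrete, here's a minimal sketch (not code from the repo) that rebuilds a bookmark view by replaying events up to a chosen point in time. It assumes ISO-8601 UTC timestamps, which compare correctly as plain strings:

// Illustrative only: derive a bookmark view from the event log.
interface ReplayEvent {
  occurred_at: string;
  event_type: string;
  subject_id: string;
  payload: Record<string, unknown>;
}

function bookmarksAsOf(events: ReplayEvent[], asOf: string): Map<string, unknown> {
  const view = new Map<string, unknown>();
  for (const e of events) {
    if (e.occurred_at > asOf) continue;      // "time travel": ignore anything after the cutoff
    if (e.event_type === "link.added") {
      view.set(e.subject_id, e.payload.url); // state is derived from facts, never stored directly
    }
  }
  return view;
}

Run it with a different asOf and you get last Tuesday's bookmarks; run a different reducer over the same events and you get a different view entirely.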
The Event Model
Every event in the life stream follows a consistent structure. Here’s the core interface:
// From stream-agents/src/lib/db.ts
export interface Event {
id?: string;
occurred_at: string;
ingested_at?: string;
source: string;
subject: string;
subject_id: string;
event_type: string;
payload: Record<string, unknown>;
}
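To make this concrete, here's what a link.added event might look like as a value of this interface (the values are illustrative):

const example: Event = {
  occurred_at: "2024-01-15T10:30:00Z",  // when you tapped "save" on the phone
  ingested_at: "2024-01-15T10:30:02Z",  // when the system recorded it (note the lag)
  source: "phone",
  subject: "link",
  subject_id: "link:a3f2b1c4d5e6f7a8",
  event_type: "link.added",
  payload: {
    url: "https://example.com/article",
    source_context: "twitter_share",
  },
};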
Let’s break down each field:
Timestamps: When Things Happened
Every event has two temporal dimensions:
- occurred_at: When the event actually happened at the source. If you save a link on your phone at 3pm, that's occurred_at.
- ingested_at (or received_at in the schema): When the system recorded the event. There's always some lag.
This distinction matters for ordering. You might ingest events out of order (network delays, batch imports), but you always know when they really happened.
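In practice that means readers should sort on occurred_at rather than arrival order. A tiny sketch (assuming ISO-8601 UTC timestamps, which order correctly as strings):

// Events may be ingested out of order; views should sort on occurred_at.
function byOccurredAt<T extends { occurred_at: string }>(events: T[]): T[] {
  return [...events].sort((a, b) => a.occurred_at.localeCompare(b.occurred_at));
}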
Source: Where It Came From
The source field identifies the origin of an event:
"phone"- Mobile app captures"chrome"- Browser extension"homeassistant"- Smart home integrations"todoist"- Task management sync"agent:fetcher"- The fetcher agent (yes, agents are sources too)"agent:enricher"- The enricher agent
Agents emit events just like any other source. When the fetcher agent successfully retrieves content, it emits a content.fetched event with source: 'agent:fetcher'. This creates a complete audit trail of what happened and who (or what) did it.
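As a sketch of what that emission could look like (the emitEvent helper here is hypothetical, not the repo's actual API):

// Hypothetical emit helper: in the real system this would produce to Kafka
// or insert into the events table.
async function emitEvent(event: Omit<Event, "id" | "ingested_at">): Promise<void> {
  console.log("emit", event.event_type, event.subject_id); // stand-in for the real producer
}

await emitEvent({
  occurred_at: new Date().toISOString(),
  source: "agent:fetcher",              // the agent is the source
  subject: "link",
  subject_id: "link:a3f2b1c4d5e6f7a8",
  event_type: "content.fetched",
  payload: { title: "An Interesting Article" },
});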
Subject and Subject ID: What It’s About
Every event is about something: a link, a todo, a sensor reading. We capture this with two fields:
- subject: The type of entity (e.g., "link", "todo.item", "home.temperature")
- subject_id: A stable, unique identifier for that specific entity
The subject_id is the linchpin of the entire system. It’s how we correlate events about the same entity across time and sources. A link saved on your phone and later enriched by an agent both share the same subject_id.
We’ll dive deep into subject ID generation in the next section.
Event Type: What Happened
The event_type field describes the action:
"link.added"- A new link was saved"content.fetched"- Content was retrieved from a URL"enrichment.completed"- AI processing finished"reading.recorded"- A sensor reading was captured"item.completed"- A todo was marked done
The naming convention is {domain}.{action} in past tense. Events are facts about what already happened, not commands for what should happen.
Payload: The Details
The payload is a flexible JSON object containing event-specific data. For a link.added event:
{
"url": "https://example.com/article",
"source_context": "twitter_timeline",
"tags": ["ai", "research"]
}
For a content.fetched event:
{
"final_url": "https://example.com/article",
"title": "An Interesting Article",
"text_content": "The full extracted text...",
"fetch_error": null
}
The schema is intentionally loose. Different event types have different payloads, and we validate at the application layer rather than the database.
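For instance, a consumer might guard a link.added payload with a small type check before trusting it. Here's a hand-rolled sketch (a schema library such as zod would work just as well):

type LinkAddedPayload = {
  url: string;
  source_context?: string;
  tags?: string[];
};

function isLinkAddedPayload(p: unknown): p is LinkAddedPayload {
  if (typeof p !== "object" || p === null) return false;
  const o = p as Record<string, unknown>;
  return typeof o.url === "string" &&
    (o.tags === undefined || Array.isArray(o.tags));
}

// In a consumer: if (!isLinkAddedPayload(event.payload)) { skip or dead-letter }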
Subject IDs: Stable Entity Identification
The subject_id is perhaps the most critical design decision in the entire system. It needs to be:
- Deterministic: The same entity always gets the same ID
- Stable: The ID doesn’t change even if other attributes do
- Unique: No collisions between different entities
- Meaningful: You can tell what type of entity it is at a glance
Here’s how we generate them:
// From stream-agents/src/lib/subject_id.ts
/**
* Subject ID generation utilities
*
* Subject IDs are stable, deterministic identifiers for entities in the system.
* Format: "{type}:{identifier}"
*
* Examples:
* - link:a3f2b1c4d5e6... (SHA256 of normalized URL)
* - sensor:living_room
* - todoist:12345
* - anno:uuid
*/
The format is always {type}:{identifier}. The type prefix makes it immediately clear what kind of entity you’re dealing with, and the identifier is derived deterministically from the entity’s natural key.
URL Normalization
For links, the natural key is the URL. But URLs are tricky. The same page can be referenced many different ways:
- https://Example.COM/page
- https://example.com/page/
- https://example.com/page?utm_source=twitter
- HTTP://example.com:80/page
We need to normalize these to a canonical form before hashing:
// From stream-agents/src/lib/subject_id.ts
/**
* Normalize a URL for consistent hashing:
* - Lowercase scheme and host
* - Remove default ports (80, 443)
* - Sort query parameters
* - Remove trailing slashes (except root)
* - Remove fragments
*/
export function normalizeUrl(url: string): string {
try {
const parsed = new URL(url);
// Lowercase scheme and host
parsed.protocol = parsed.protocol.toLowerCase();
parsed.hostname = parsed.hostname.toLowerCase();
// Remove default ports
if (
(parsed.protocol === "http:" && parsed.port === "80") ||
(parsed.protocol === "https:" && parsed.port === "443")
) {
parsed.port = "";
}
// Sort query parameters
const params = new URLSearchParams(parsed.search);
const sortedParams = new URLSearchParams([...params.entries()].sort());
parsed.search = sortedParams.toString();
// Remove fragment
parsed.hash = "";
// Build normalized URL
let normalized = parsed.toString();
// Remove trailing slash (except for root path)
if (normalized.endsWith("/") && parsed.pathname !== "/") {
normalized = normalized.slice(0, -1);
}
return normalized;
} catch {
// If URL parsing fails, return original (will still get hashed)
return url;
}
}
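A few quick examples of what the normalization produces (the outputs follow directly from the code above):

normalizeUrl("https://Example.COM/page");           // "https://example.com/page"
normalizeUrl("https://example.com/page/");          // "https://example.com/page"
normalizeUrl("https://example.com/page?b=2&a=1#x"); // "https://example.com/page?a=1&b=2"

Note that query parameters are sorted rather than stripped, so a utm_source variant still hashes to a different ID than the bare URL.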
With normalization in place, generating a link’s subject ID is straightforward:
// From stream-agents/src/lib/subject_id.ts
import { createHash } from "node:crypto"; // import shown here for completeness

/**
 * Generate SHA256 hash of a string (first 16 chars for brevity)
 */
export function sha256Short(input: string): string {
  return createHash("sha256").update(input).digest("hex").slice(0, 16);
}

/**
 * Generate a link subject_id from a URL
 * Format: "link:{sha256(normalized_url)}"
 */
export function linkSubjectId(url: string): string {
  const normalized = normalizeUrl(url);
  return `link:${sha256Short(normalized)}`;
}
The result looks like link:a3f2b1c4d5e6f7a8. Compact, deterministic, and collision-resistant.
Other Entity Types
Different entity types use different identification strategies:
// From stream-agents/src/lib/subject_id.ts
/**
* Generate a sensor subject_id
* Format: "sensor:{location}"
*/
export function sensorSubjectId(location: string): string {
return `sensor:${location.toLowerCase().replace(/\s+/g, "_")}`;
}
/**
* Generate a todo subject_id (for external todo systems)
* Format: "{source}:{external_id}"
*/
export function todoSubjectId(source: string, externalId: string): string {
return `${source}:${externalId}`;
}
/**
* Generate an annotation subject_id
* Format: "anno:{uuid}"
*/
export function annotationSubjectId(uuid: string): string {
return `anno:${uuid}`;
}
Sensors use human-readable locations (sensor:living_room). Todos use external system IDs (todoist:12345). Annotations get UUIDs because they don’t have natural keys.
The pattern adapts to each domain while maintaining the type:identifier structure.
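For example (the outputs follow directly from the functions above):

sensorSubjectId("Living Room");    // "sensor:living_room"
todoSubjectId("todoist", "12345"); // "todoist:12345"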
The Events Table
With the event model defined, here’s how it maps to PostgreSQL:
-- From stream-agents/schema.sql
-- Event ledger (append-only)
create table if not exists events (
id uuid primary key default gen_random_uuid(),
occurred_at timestamptz not null, -- when it happened at the source
received_at timestamptz not null default now(), -- when we stored it
source text not null, -- "phone", "chrome", "homeassistant", "todoist", "agent:summarizer"
subject text not null, -- matches subjects.subject
subject_id text not null, -- matches subjects.subject_id
event_type text not null, -- "created", "reading.recorded", "item.completed", "republish.requested"
schema_version int not null default 1,
payload jsonb not null default '{}'::jsonb,
-- tracing (optional but great)
correlation_id uuid null, -- ties together a pipeline run
causation_id uuid null, -- which prior event caused this event
-- Kafka lineage (optional)
kafka_topic text null,
kafka_partition int null,
kafka_offset bigint null
);
A few things worth noting:
Schema Versioning
The schema_version field future-proofs the system. When you need to evolve the payload structure, bump the version and handle both formats in your consumers.
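A sketch of what version-aware handling might look like in a consumer (the v1/v2 payload shapes here are hypothetical):

// Hypothetical evolution: v1 stored a single "tag", v2 stores a "tags" array.
function tagsFrom(payload: Record<string, unknown>, schemaVersion: number): string[] {
  if (schemaVersion >= 2 && Array.isArray(payload.tags)) {
    return payload.tags as string[];
  }
  return typeof payload.tag === "string" ? [payload.tag] : [];
}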
Correlation and Causation
These tracing fields are optional but invaluable for debugging:
- correlation_id: Groups all events from a single pipeline run. When you save a link, the subsequent content.fetched and enrichment.completed events share the same correlation ID.
- causation_id: Points to the specific event that triggered this one. The enrichment.completed event references the content.fetched event that caused it.
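A sketch of how an agent could propagate these when emitting a follow-up event (illustrative, not the repo's helper):

// Carry the incoming event's correlation_id forward and point causation_id at it.
function traceFieldsFor(incoming: { id?: string; correlation_id?: string }) {
  return {
    correlation_id: incoming.correlation_id ?? incoming.id, // start a chain if none exists yet
    causation_id: incoming.id,
  };
}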
Kafka Lineage
If events flow through Kafka before landing in Postgres, we record the topic, partition, and offset. This enables exactly-once semantics and debugging when things go sideways.
Indexes for Common Queries
-- From stream-agents/schema.sql
-- Fast timeline queries + per-entity queries
create index if not exists events_subject_idx
on events(subject, subject_id, occurred_at desc);
create index if not exists events_type_idx
on events(event_type, occurred_at desc);
create index if not exists events_source_idx
on events(source, occurred_at desc);
Three index strategies cover most query patterns:
- By subject: “Show me all events for this link”
- By type: “Show me all enrichment completions today”
- By source: “Show me everything from the phone this week”
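As an illustration, here's roughly what those three patterns look like as queries, assuming the same sql tagged-template helper the agents use, the unqualified table name from schema.sql, and a subjectId variable in scope:

// Per-entity timeline (matches events_subject_idx)
await sql`
  SELECT * FROM events
  WHERE subject = 'link' AND subject_id = ${subjectId}
  ORDER BY occurred_at DESC
`;

// By type: enrichment completions in the last day (matches events_type_idx)
await sql`
  SELECT * FROM events
  WHERE event_type = 'enrichment.completed'
    AND occurred_at >= now() - interval '1 day'
`;

// By source: everything from the phone this week (matches events_source_idx)
await sql`
  SELECT * FROM events
  WHERE source = 'phone'
    AND occurred_at >= now() - interval '7 days'
`;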
The Subjects Registry
Events reference subjects, but sometimes you need to query entities directly. The subjects table provides a registry:
-- From stream-agents/schema.sql
-- Canonical subjects registry
create table if not exists subjects (
subject text not null, -- e.g. "link", "todo.item", "home.temperature", "annotation"
subject_id text not null, -- stable key (string)
created_at timestamptz not null default now(),
display_name text null,
visibility text not null default 'private', -- private|public
meta jsonb not null default '{}'::jsonb,
primary key (subject, subject_id)
);
And the corresponding TypeScript interface:
// From stream-agents/src/lib/db.ts
export interface Subject {
subject: string;
subject_id: string;
created_at: string;
display_name?: string;
visibility: "public" | "private" | "unlisted";
meta: Record<string, unknown>;
}
The visibility field is particularly useful. It powers the public-facing site by filtering to only show public subjects.
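When an event for a new entity arrives, the registry row can be created idempotently. A minimal sketch (assuming the same sql helper and an event variable in scope):

// Register the subject if it isn't already known; safe to run for every event.
await sql`
  INSERT INTO subjects (subject, subject_id)
  VALUES (${event.subject}, ${event.subject_id})
  ON CONFLICT (subject, subject_id) DO NOTHING
`;

Because visibility defaults to 'private', nothing becomes public by accident; publishing is an explicit choice per subject.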
Event Types Catalog
The system currently handles these event types:
| Event Type | Subject | Description | Emitted By |
|---|---|---|---|
| link.added | link | A new URL was saved | phone, chrome, api |
| content.fetched | link | HTML content was retrieved and parsed | agent:fetcher |
| enrichment.completed | link | AI-generated tags and summary ready | agent:enricher |
| reading.recorded | home.temperature | Sensor reading captured | homeassistant |
| item.created | todo.item | New todo created | todoist |
| item.completed | todo.item | Todo marked done | todoist |
Each event type has an implicit schema (the payload structure), documented by convention rather than formal schemas. For a personal system, this is pragmatic. For a team, you might want JSON Schema or Protobuf.
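If you want a little more type safety without going all the way to JSON Schema, a TypeScript discriminated union over event_type is one lightweight option. This is a sketch, with field names mirroring the payload examples above:

type LifeStreamEvent =
  | { event_type: "link.added"; payload: { url: string; source_context?: string; tags?: string[] } }
  | { event_type: "content.fetched"; payload: { final_url?: string; title?: string; text_content?: string; fetch_error?: string | null } }
  | { event_type: "enrichment.completed"; payload: { tags: string[]; summary_short?: string; summary_long?: string } };

function handle(e: LifeStreamEvent) {
  switch (e.event_type) {
    case "link.added":
      return e.payload.url;             // narrowed to the link.added payload shape
    case "content.fetched":
      return e.payload.title;
    case "enrichment.completed":
      return e.payload.tags.join(", ");
  }
}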
The Event Flow
Events form a directed graph. Here’s how a link flows through the system:
link.added (source: phone)
↓
content.fetched (source: agent:fetcher, causation_id → link.added)
↓
enrichment.completed (source: agent:enricher, causation_id → content.fetched)
Each step produces a new event rather than mutating state. The materializer (covered in Part 3) watches these events and updates the links, link_content, and link_metadata tables accordingly.
Idempotency: Why It Matters
In distributed systems, messages can be delivered more than once. Network hiccups, consumer restarts, Kafka rebalances: duplicates happen. Your system needs to handle them gracefully.
There are several strategies for achieving idempotency:
Strategy 1: Check Before Acting
Before processing an event, check if it’s already been handled:
// From stream-agents/scripts/agent_fetcher.ts
async function contentAlreadyFetched(subjectId: string): Promise<boolean> {
const result = await sql`
SELECT 1 FROM lifestream.link_content
WHERE subject_id = ${subjectId}
`;
return result.length > 0;
}
// In the message handler:
if (await contentAlreadyFetched(event.subject_id)) {
console.log(`[fetcher] Already fetched: ${event.subject_id}`);
return;
}
The fetcher agent checks if content already exists for a subject_id before making the HTTP request. Simple and effective.
Strategy 2: Kafka Offset Tracking
For events flowing from Kafka to Postgres, we track which offsets have been processed:
-- From stream-agents/schema.sql
-- Track last committed offset per consumer group
create table if not exists kafka_offsets (
consumer_group text not null,
topic text not null,
partition int not null,
last_offset bigint not null,
updated_at timestamptz not null default now(),
primary key (consumer_group, topic, partition)
);
Before processing a message, check if its offset is beyond the last committed. After processing, update the checkpoint. This prevents reprocessing on restart.
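A sketch of that read/advance cycle around the kafka_offsets table (again assuming the sql helper; the actual consumer code will differ):

// Has this offset already been processed by this consumer group?
async function alreadyProcessed(
  group: string, topic: string, partition: number, offset: number,
): Promise<boolean> {
  const rows = await sql`
    SELECT last_offset FROM kafka_offsets
    WHERE consumer_group = ${group} AND topic = ${topic} AND partition = ${partition}
  `;
  return rows.length > 0 && Number(rows[0].last_offset) >= offset;
}

// After successful processing, advance the checkpoint (never move it backwards).
async function commitOffset(
  group: string, topic: string, partition: number, offset: number,
): Promise<void> {
  await sql`
    INSERT INTO kafka_offsets (consumer_group, topic, partition, last_offset)
    VALUES (${group}, ${topic}, ${partition}, ${offset})
    ON CONFLICT (consumer_group, topic, partition)
      DO UPDATE SET last_offset = GREATEST(kafka_offsets.last_offset, ${offset}),
                    updated_at = now()
  `;
}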
Strategy 3: Explicit Dedupe Table
For belt-and-suspenders safety, an explicit dedupe table ensures we never insert the same Kafka record twice:
-- From stream-agents/schema.sql
-- Extra dedupe barrier for events
create table if not exists event_ingest_dedupe (
topic text not null,
partition int not null,
kafka_offset bigint not null,
inserted_at timestamptz not null default now(),
primary key (topic, partition, kafka_offset)
);
Insert into the dedupe table in the same transaction as the event. If the dedupe insert fails (duplicate key), the whole transaction rolls back.
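A sketch of that transactional pattern, assuming a postgres.js-style driver where sql.begin wraps a transaction and topic, partition, offset, and event are in scope from the consumer loop:

try {
  await sql.begin(async (tx) => {
    // The primary key on (topic, partition, kafka_offset) is the dedupe barrier.
    await tx`
      INSERT INTO event_ingest_dedupe (topic, partition, kafka_offset)
      VALUES (${topic}, ${partition}, ${offset})
    `;
    await tx`
      INSERT INTO events (occurred_at, source, subject, subject_id, event_type, payload,
                          kafka_topic, kafka_partition, kafka_offset)
      VALUES (${event.occurred_at}, ${event.source}, ${event.subject}, ${event.subject_id},
              ${event.event_type}, ${JSON.stringify(event.payload)}::jsonb,
              ${topic}, ${partition}, ${offset})
    `;
  });
} catch (err) {
  const isDuplicate = (err as { code?: string }).code === "23505"; // unique_violation
  if (!isDuplicate) throw err;
  // Duplicate Kafka record: already ingested, safe to skip.
}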
Strategy 4: Natural Key Uniqueness
For links, the normalized URL provides natural deduplication:
-- From stream-agents/schema.sql
create unique index if not exists links_url_norm_uq on links(url_norm);
Saving the same URL twice fails at the database level, which the application handles gracefully by treating it as a no-op.
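That no-op can also be expressed directly in SQL when materializing the link row. A sketch; column names other than url_norm are assumptions about the links table:

// A duplicate normalized URL simply does nothing.
await sql`
  INSERT INTO links (subject_id, url, url_norm)
  VALUES (${subjectId}, ${url}, ${normalizeUrl(url)})
  ON CONFLICT (url_norm) DO NOTHING
`;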
Putting It All Together
Let’s trace through a complete example. You’re browsing Twitter and see an interesting article. You tap the share button and send it to your life stream:
1. Phone captures the URL and emits a link.added event:

   {
     "occurred_at": "2024-01-15T10:30:00Z",
     "source": "phone",
     "subject": "link",
     "subject_id": "link:a3f2b1c4d5e6f7a8",
     "event_type": "link.added",
     "payload": {
       "url": "https://example.com/article",
       "source_context": "twitter_share"
     }
   }

2. Kafka receives the event and durably stores it in the events.raw topic.

3. Fetcher agent consumes the event, checks idempotency, fetches the URL, and emits:

   {
     "occurred_at": "2024-01-15T10:30:05Z",
     "source": "agent:fetcher",
     "subject": "link",
     "subject_id": "link:a3f2b1c4d5e6f7a8",
     "event_type": "content.fetched",
     "correlation_id": "<original-event-id>",
     "payload": {
       "title": "An Interesting Article",
       "text_content": "The full article text..."
     }
   }

4. Enricher agent consumes the content.fetched event, calls the LLM, and emits:

   {
     "occurred_at": "2024-01-15T10:30:10Z",
     "source": "agent:enricher",
     "subject": "link",
     "subject_id": "link:a3f2b1c4d5e6f7a8",
     "event_type": "enrichment.completed",
     "correlation_id": "<original-event-id>",
     "payload": {
       "tags": ["technology", "ai"],
       "summary_short": "A deep dive into...",
       "summary_long": "..."
     }
   }

5. Materializer updates state tables based on each event, populating links, link_content, and link_metadata.

6. The link is now ready to appear on your public site (if visibility is set to public).
All of this happens in seconds, and every step is recorded as an immutable event. You can replay, debug, and audit at will.
What’s Next
We’ve established the event model, the foundation everything builds on. In Part 2: Agents and Enrichment Loops, we’ll dive into the agents themselves: how they consume events, call LLMs, and maintain state through Kafka-native patterns.
The fetcher and enricher agents we glimpsed here are just the beginning. The real power comes from composing agents into pipelines that transform raw signals into actionable insights.
Key takeaways from this part:
- Events are immutable facts; state is derived
- Subject IDs provide stable, deterministic entity identification
- URL normalization ensures the same link always gets the same ID
- Multiple idempotency strategies compound safety
- Correlation and causation IDs enable distributed tracing