Extending the System

Your life stream has been learning about you: capturing links, enriching them with tags, transcribing voice notes. But so far, it’s been a one-way street. Information flows in, gets processed, sits in a database.

What if the system could also act on the world?

This is where extending the system gets interesting. Adding a new source isn’t just about ingesting another data type. It’s about completing the loop: letting the life stream both read from and write to the systems you already use.

The Bidirectional Pattern

Calendar integration is the perfect example. There are two valuable directions:

  1. Ingest: Calendar events → Life Stream (enrich meetings with context)
  2. Emit: Life Stream → Calendar (todos with due dates appear as events)

Most systems only do the first. The life stream can do both.

┌─────────────────┐
│ Google Calendar │
└────────┬────────┘

         │  ┌──── sync events ────┐
         │  │                     │
         ▼  │                     │
┌─────────────────┐       ┌─────────────────┐
│   Life Stream   │ ◄───► │    Calendar     │
│  (events.raw)   │       │     Domain      │
└─────────────────┘       └─────────────────┘
         │                        ▲
         │                        │
         │  ┌─── create events ───┘
         │  │
         ▼  │
┌─────────────────┐
│   Todo Domain   │
│ (with due dates)│
└─────────────────┘

The Extension Pattern

Before diving into calendar specifics, let’s review the pattern for adding any new source. Every extension follows the same steps:

  1. Schema: Define state tables for the domain
  2. Subject IDs: Create deterministic identifiers
  3. Event Types: Document the contracts
  4. Sync/Import: Build the ingestion mechanism
  5. Materializer: Add handlers for new event types
  6. Agents (optional): Enrichment or action agents

This modularity is what makes the system extensible. Each piece is independent. You can add a new domain without touching existing code: just define your schema, emit events in the right format, and add materializer handlers.

Direction 1: Ingesting Calendar Events

Let’s start with reading from the calendar. The goal: your life stream knows about your meetings, can tag them, and can surface relevant links when a meeting topic matches something you’ve bookmarked.

Schema Design

Calendar events need their own state tables. Following the patterns established for links and voice notes:

-- Calendar events domain
create table if not exists calendar_events (
  subject_id text primary key,          -- "gcal:{calendar_id}:{event_id}"
  calendar_id text not null,
  external_event_id text not null,
  title text,
  description text,
  start_at timestamptz,
  end_at timestamptz,
  location text,
  attendees text[],
  status text default 'confirmed',      -- confirmed|tentative|cancelled
  visibility text default 'private',
  created_at timestamptz default now(),

  unique(calendar_id, external_event_id)
);

-- Metadata table for enrichment results
create table if not exists calendar_event_metadata (
  subject_id text primary key references calendar_events(subject_id) on delete cascade,
  tags text[] not null default '{}',
  related_subjects text[],              -- links to other subject_ids
  summary text,
  model_version text,
  updated_at timestamptz not null default now()
);

-- Index for time-range queries
create index if not exists calendar_events_time_idx
  on calendar_events(start_at desc);

The schema mirrors what we’ve done elsewhere: a main state table for the entity, a separate metadata table for enrichment results, and appropriate indexes for common queries.

Subject ID Strategy

Subject IDs must be deterministic: the same calendar event should always produce the same ID, even if synced multiple times. For calendar events, we combine the calendar ID and event ID:

// From: stream-agents/src/lib/subject_id.ts

/**
 * Generate a calendar event subject_id
 * Format: "gcal:{calendar_id}:{event_id}"
 */
export function calendarEventSubjectId(calendarId: string, eventId: string): string {
  return `gcal:${calendarId}:${eventId}`;
}

// Examples:
// calendarEventSubjectId("primary", "abc123def456")
//   → "gcal:primary:abc123def456"
//
// calendarEventSubjectId("work@company.com", "meeting_xyz")
//   → "gcal:work@company.com:meeting_xyz"

The gcal: prefix indicates Google Calendar. If you later add Apple Calendar or Outlook, you’d use ical: or outlook: prefixes. The subject ID format makes the source system clear while remaining collision-free.
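
For example, an Outlook variant would be the same one-liner with a different prefix. This helper is hypothetical; only the Google Calendar generator exists in the chapter:

// Hypothetical Outlook equivalent: same shape, different prefix
export function outlookEventSubjectId(calendarId: string, eventId: string): string {
  return `outlook:${calendarId}:${eventId}`;
}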

Event Types

Calendar sync introduces two new event types:

  • calendar.event_synced - A calendar event was imported/updated
  • calendar.event_enriched - Tags and metadata were added by an enricher

// calendar.event_synced payload
interface CalendarEventSyncedPayload {
  title: string;
  description?: string;
  start_at: string;           // ISO timestamp
  end_at: string;
  location?: string;
  attendees?: string[];
  calendar_name?: string;
  is_recurring: boolean;
  recurrence_id?: string;     // For recurring event instances
}
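
Only the synced payload is shown here. A plausible shape for calendar.event_enriched, mirroring the calendar_event_metadata columns above, might be:

// calendar.event_enriched payload (assumed shape, mirroring calendar_event_metadata)
interface CalendarEventEnrichedPayload {
  tags: string[];
  related_subjects?: string[];   // subject_ids of related links, notes, etc.
  summary?: string;
  model_version?: string;
}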

The Sync Script

The sync script polls the Google Calendar API and emits events for new or changed entries:

// Conceptual: scripts/sync_calendar.ts

import { sql } from '../src/lib/db';
import { calendarEventSubjectId } from '../src/lib/subject_id';
import { google } from 'googleapis';

const SYNC_INTERVAL_MS = 5 * 60 * 1000;  // 5 minutes

async function syncCalendar(calendarId: string): Promise<void> {
  const calendar = google.calendar({ version: 'v3', auth: /* oauth client */ });

  // Get events modified since last sync
  const lastSync = await getLastSyncTime(calendarId);

  const response = await calendar.events.list({
    calendarId,
    updatedMin: lastSync?.toISOString(),
    singleEvents: true,
    orderBy: 'updated',
    maxResults: 100,
  });

  for (const event of response.data.items ?? []) {
    const subjectId = calendarEventSubjectId(calendarId, event.id!);

    // Emit event to the life stream
    await sql`
      INSERT INTO lifestream.events
      (occurred_at, source, subject, subject_id, event_type, payload)
      VALUES (
        ${event.updated ?? new Date().toISOString()},
        'calendar_sync',
        'calendar',
        ${subjectId},
        'calendar.event_synced',
        ${sql.json({
          title: event.summary,
          description: event.description,
          start_at: event.start?.dateTime ?? event.start?.date,
          end_at: event.end?.dateTime ?? event.end?.date,
          location: event.location,
          attendees: event.attendees?.map(a => a.email),
          calendar_name: calendarId,
          is_recurring: !!event.recurringEventId,
        })}
      )
    `;
  }

  await updateLastSyncTime(calendarId);
}

// Run sync loop
async function main(): Promise<void> {
  console.log('Starting calendar sync...');

  while (true) {
    await syncCalendar('primary');
    await new Promise(r => setTimeout(r, SYNC_INTERVAL_MS));
  }
}

main().catch(err => {
  console.error('Calendar sync failed:', err);
  process.exit(1);
});
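
The getLastSyncTime and updateLastSyncTime helpers are left undefined above. A minimal sketch, assuming a small calendar_sync_state table keyed by calendar ID (not part of the schema shown earlier):

// Hypothetical sync-cursor helpers backed by an assumed table:
//   calendar_sync_state(calendar_id text primary key, last_synced_at timestamptz)
async function getLastSyncTime(calendarId: string): Promise<Date | null> {
  const rows = await sql`
    SELECT last_synced_at
    FROM lifestream.calendar_sync_state
    WHERE calendar_id = ${calendarId}
  `;
  return rows.length ? new Date(rows[0].last_synced_at) : null;
}

async function updateLastSyncTime(calendarId: string): Promise<void> {
  await sql`
    INSERT INTO lifestream.calendar_sync_state (calendar_id, last_synced_at)
    VALUES (${calendarId}, now())
    ON CONFLICT (calendar_id) DO UPDATE SET last_synced_at = now()
  `;
}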

Materializer Handler

The materializer needs a new handler for calendar.event_synced:

// From: stream-agents/scripts/consume_kafka_materialize.ts

async function handleCalendarEventSynced(event: LifestreamEvent): Promise<void> {
  const {
    title, description, start_at, end_at,
    location, attendees, calendar_name, is_recurring
  } = event.payload as CalendarEventSyncedPayload;

  // Extract calendar_id and external_event_id from subject_id
  // Format: "gcal:{calendar_id}:{event_id}"
  const parts = event.subject_id.split(':');
  const calendarId = parts[1];
  const externalEventId = parts.slice(2).join(':');

  // Upsert subject
  await sql`
    INSERT INTO lifestream.subjects (subject, subject_id, created_at, visibility)
    VALUES ('calendar', ${event.subject_id}, ${event.occurred_at}, 'private')
    ON CONFLICT (subject, subject_id) DO NOTHING
  `;

  // Upsert calendar event
  await sql`
    INSERT INTO lifestream.calendar_events
    (subject_id, calendar_id, external_event_id, title, description,
     start_at, end_at, location, attendees, status)
    VALUES (
      ${event.subject_id}, ${calendarId}, ${externalEventId},
      ${title}, ${description ?? null},
      ${start_at}::timestamptz, ${end_at}::timestamptz,
      ${location ?? null}, ${attendees ?? []}, 'confirmed'
    )
    ON CONFLICT (subject_id) DO UPDATE SET
      title = EXCLUDED.title,
      description = EXCLUDED.description,
      start_at = EXCLUDED.start_at,
      end_at = EXCLUDED.end_at,
      location = EXCLUDED.location,
      attendees = EXCLUDED.attendees
  `;

  console.log(`  [calendar.event_synced] ${title} @ ${start_at}`);
}
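
Routing the handler is one more case in the materializer's dispatch. The existing cases aren't reproduced here; the sketch below only shows the added branch:

// Inside the materializer's dispatch (existing cases for links, voice notes,
// and todos elided)
switch (event.event_type) {
  // ...existing cases...
  case 'calendar.event_synced':
    await handleCalendarEventSynced(event);
    break;
  default:
    // Unknown event types are skipped so new producers never break the consumer
    break;
}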

Enrichment: What Can We Do With Calendar Events?

Once calendar events are in the life stream, the enrichment possibilities are interesting:

  • Auto-tagging: Extract topics from meeting titles (“Q1 Planning” → quarterly-planning, strategy)
  • Attendee patterns: Detect recurring 1:1s, team standups, cross-functional meetings
  • Related content: Surface links you’ve saved that match meeting topics
  • Time analysis: When do your deep work blocks actually happen?

The existing enricher agent could be extended to handle calendar events, or you could create a dedicated calendar enrichment agent. The pattern is the same: consume events, call an LLM, emit enriched results.
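
As a sketch of what a dedicated calendar enricher might look like (callLLM stands in for whatever model client the other enrichers use; it is not a function from the repo):

// Placeholder for the model client used by the other enrichers
declare function callLLM(prompt: string): Promise<string[]>;

// Hypothetical: consume calendar.event_synced, ask an LLM for tags,
// emit calendar.event_enriched for the materializer to store.
async function enrichCalendarEvent(event: LifestreamEvent): Promise<void> {
  if (event.event_type !== 'calendar.event_synced') return;

  const { title, description } = event.payload as CalendarEventSyncedPayload;

  const tags = await callLLM(
    `Suggest 2-4 short, lowercase tags for this meeting:\n${title}\n${description ?? ''}`
  );

  await sql`
    INSERT INTO lifestream.events
    (occurred_at, source, subject, subject_id, event_type, payload, causation_id)
    VALUES (
      now(), 'agent:calendar_enricher', 'calendar', ${event.subject_id},
      'calendar.event_enriched',
      ${sql.json({ tags, model_version: 'example-model' })},
      ${event.id}::uuid
    )
  `;
}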

Direction 2: Emitting to Calendar

Now the more interesting direction. Your life stream contains todos with due dates. What if they automatically appeared on your calendar?

The Flow

todo.created (with due_at)
              │
              ▼
┌─────────────────────────────┐
│  agent:calendar_publisher   │
│  - Filters todos with dates │
│  - Creates calendar events  │
│  - Tracks what was created  │
└─────────────────────────────┘
              │
              ▼
     Google Calendar API
              │
              ▼
    calendar.event_created
              │
              ▼
  Materializer updates state

The Publisher Agent

This agent listens for todo.created events and creates corresponding calendar entries:

// Conceptual: scripts/agent_calendar_publisher.ts
// (Kafka consumer wiring is omitted; it mirrors the other agents.)

import { google } from 'googleapis';
import type { EachMessagePayload } from 'kafkajs';
import { sql } from '../src/lib/db';
import { calendarEventSubjectId } from '../src/lib/subject_id';

const CONSUMER_GROUP = 'calendar-publisher-v1';

async function handleMessage({ message }: EachMessagePayload): Promise<void> {
  const event: LifestreamEvent = JSON.parse(message.value!.toString());

  // Only process todos
  if (event.event_type !== 'todo.created') return;

  const { title, due_at, project } = event.payload as {
    title: string;
    due_at?: string;
    project?: string;
  };

  // Skip todos without due dates
  if (!due_at) return;

  // Idempotency: check if we already created a calendar event for this todo
  if (await calendarEventExistsForTodo(event.subject_id)) {
    console.log(`[calendar-publisher] Already created event for ${event.subject_id}`);
    return;
  }

  // Create the calendar event
  const calendar = google.calendar({ version: 'v3', auth: /* oauth client */ });

  const calendarEvent = await calendar.events.insert({
    calendarId: 'primary',
    requestBody: {
      summary: title,
      description: `Todo from life stream: ${event.subject_id}`,
      start: {
        dateTime: due_at,
        timeZone: 'America/New_York',
      },
      end: {
        dateTime: new Date(new Date(due_at).getTime() + 60 * 60 * 1000).toISOString(),  // 1 hour default duration
        timeZone: 'America/New_York',
      },
      // Store the todo subject_id for two-way sync
      extendedProperties: {
        private: {
          lifestream_todo_id: event.subject_id,
        },
      },
    },
  });

  // Emit confirmation event
  const calendarSubjectId = calendarEventSubjectId('primary', calendarEvent.data.id!);

  await sql`
    INSERT INTO lifestream.events
    (occurred_at, source, subject, subject_id, event_type, payload, causation_id)
    VALUES (
      now(),
      'agent:calendar_publisher',
      'calendar',
      ${calendarSubjectId},
      'calendar.event_created',
      ${sql.json({
        title,
        start_at: due_at,
        todo_subject_id: event.subject_id,
        external_event_id: calendarEvent.data.id,
      })},
      ${event.id}::uuid
    )
  `;

  console.log(`[calendar-publisher] Created calendar event for todo: ${title}`);
}
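
The calendarEventExistsForTodo check is left undefined above. One way to implement it, using the confirmation events this agent already emits:

// Hypothetical idempotency check: has a calendar.event_created already been
// emitted for this todo? The payload stores todo_subject_id (see above).
async function calendarEventExistsForTodo(todoSubjectId: string): Promise<boolean> {
  const rows = await sql`
    SELECT 1
    FROM lifestream.events
    WHERE event_type = 'calendar.event_created'
      AND payload->>'todo_subject_id' = ${todoSubjectId}
    LIMIT 1
  `;
  return rows.length > 0;
}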

Tracking the Relationship

Notice the extendedProperties in the calendar event creation. This stores the todo’s subject_id in the calendar event itself, enabling two-way sync later.

We also emit a calendar.event_created event with a causation_id pointing back to the original todo.created event. This creates a traceable chain:

todo.created (id: abc123)
    │
    └── causation_id ──► calendar.event_created (causation_id: abc123)
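
Because both the event id and the causation_id live in the events table, the chain is queryable. A minimal one-hop lookup, using the column names from the inserts above:

// Find every event a given event caused (one hop down the chain)
async function findCausedEvents(eventId: string) {
  return sql`
    SELECT id, event_type, subject_id, occurred_at
    FROM lifestream.events
    WHERE causation_id = ${eventId}::uuid
    ORDER BY occurred_at
  `;
}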

The Sync Problem

Publishing todos to the calendar raises a question: what happens when you edit or move the event on the calendar side? If you do want two-way sync, the pattern is:

  1. Calendar sync script detects changes to events with lifestream_todo_id
  2. Emit calendar.event_updated with the todo reference
  3. A handler (or agent) updates the todo’s due_at accordingly
  4. Careful conflict resolution: which system is the source of truth?

For most personal systems, one-way is sufficient. The calendar becomes a view into your todos, not a separate source of truth.
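
If you do go bidirectional, step 3 of the list above might look like the following. The todo.updated event type, its payload shape, and the agent:calendar_sync source name are all assumptions; the todo domain isn't defined in this chapter:

// Hypothetical handler: push an edited calendar time back onto the todo
async function handleCalendarEventUpdated(event: LifestreamEvent): Promise<void> {
  const { start_at, lifestream_todo_id } = event.payload as {
    start_at: string;
    lifestream_todo_id?: string;
  };

  // Only events that originated from a todo carry the back-reference
  if (!lifestream_todo_id) return;

  await sql`
    INSERT INTO lifestream.events
    (occurred_at, source, subject, subject_id, event_type, payload, causation_id)
    VALUES (
      now(), 'agent:calendar_sync', 'todo', ${lifestream_todo_id},
      'todo.updated',
      ${sql.json({ due_at: start_at, reason: 'calendar_edit' })},
      ${event.id}::uuid
    )
  `;
}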

The Extension Checklist

When adding any new source, work through this checklist:

1. Schema

  • State table(s) in schema.sql
  • Metadata table if enrichment is planned
  • Appropriate indexes for query patterns

2. Subject IDs

  • Deterministic ID generator in subject_id.ts
  • Clear prefix indicating source system
  • Collision-free across instances

3. Event Types

  • Document each event type and its payload
  • Define which events trigger which actions
  • Consider both sync (ingest) and action (emit) directions

4. Ingestion

  • Sync script or webhook handler
  • Idempotent event emission
  • Rate limiting for external APIs
  • Error handling that doesn’t block the pipeline

5. Materializer

  • Handler for each new event type
  • Upsert patterns for state tables
  • Proper subject registration

6. Agents (optional)

  • Enrichment agent if LLM processing needed
  • Action agent if emitting to external systems
  • Idempotency checks

Cross-Domain Potential

We’re keeping cross-domain enrichment brief, but it’s worth mentioning: the tag catalog we built for links works for any domain.

When the calendar enricher runs, it can:

  1. Read the existing tag catalog (KTable pattern)
  2. Prefer existing tags when they fit meeting topics
  3. Add new tags when needed
  4. Emit the results so the materializer stores them in calendar_event_metadata.tags

Now queries can span domains: “Show me links, voice notes, and meetings related to kubernetes.” The architecture already supports this; you just need to run the queries.
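
A single tag lookup across domains might look like this. The link and voice-note metadata table names are assumptions based on the naming pattern; only the calendar tables are defined in this chapter:

// All subjects tagged 'kubernetes', across domains. link_metadata and
// voice_note_metadata are assumed names following the *_metadata convention.
const tag = 'kubernetes';
const tagged = await sql`
  SELECT 'link' AS kind, subject_id FROM lifestream.link_metadata
   WHERE ${tag} = ANY(tags)
  UNION ALL
  SELECT 'voice_note' AS kind, subject_id FROM lifestream.voice_note_metadata
   WHERE ${tag} = ANY(tags)
  UNION ALL
  SELECT 'calendar_event' AS kind, subject_id FROM lifestream.calendar_event_metadata
   WHERE ${tag} = ANY(tags)
`;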

What Makes This Extensible

The patterns we’ve established make adding new sources straightforward:

  1. Unified event format: Everything is an event with the same structure
  2. Subject ID consistency: Any entity can be referenced across domains
  3. Materializer routing: One switch statement, add a case
  4. Agent independence: New agents don’t affect existing ones
  5. Tag catalog sharing: Semantic consistency across all domains

The calendar integration we’ve sketched here (syncing events in, publishing todos out) follows exactly the patterns used for links and voice notes. The code structure is familiar. The debugging tools work the same way. The replay semantics are identical.

That’s the payoff of investing in a consistent architecture. Each new domain is a straightforward addition, not a new system to understand.


Extending the system isn’t about writing more code; it’s about fitting new sources into existing patterns. Calendar is just one example. RSS feeds, GitHub activity, reading lists, health data from wearables: they all follow the same flow (schema, subject IDs, events, materializer, optional agents).

The life stream grows with you. Add the sources that matter to your workflow. Connect them through tags and context. Let the system participate in your digital life, not just observe it.
