Core Components

2.1 Routing Layer

The routing layer provides a stable endpoint for users to access their workloads regardless of where infrastructure is running.

HTTP Router (Cloudflare Workers)

// http-router/src/index.ts
export default {
  /**
   * Proxies an authenticated request to the backend currently registered for its workload.
   *
   * Reads `workload:<id>:status` and `workload:<id>:endpoint` from ROUTING_KV and answers
   * locally in these cases:
   * - 401 when `authenticateRequest` reports the request as invalid;
   * - 503 with `Retry-After` when the status is `INTERRUPTED` (the health monitor writes
   *   this value when a spot instance is interrupted);
   * - 404 when no endpoint is registered.
   * Otherwise the request is forwarded with its hostname rewritten to the stored endpoint
   * and tracing headers added.
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);
    const workloadId = extractWorkloadId(url, request.headers);

    // Authenticate before consulting any routing state.
    const auth = await authenticateRequest(request, env);
    if (!auth.valid) {
      return new Response('Unauthorized', { status: 401 });
    }

    // The two lookups are independent, so issue them concurrently.
    const [status, endpoint] = await Promise.all([
      env.ROUTING_KV.get(`workload:${workloadId}:status`),
      env.ROUTING_KV.get(`workload:${workloadId}:endpoint`),
    ]);

    // Check status before the endpoint. An interrupted workload can still have an
    // unexpired endpoint key, and proxying to it would hit a reclaimed instance.
    if (status === 'INTERRUPTED') {
      return new Response('Workload temporarily unavailable', {
        status: 503,
        // NOTE(review): 60s is a placeholder hint; tune to the expected recovery time.
        headers: { 'Retry-After': '60' },
      });
    }

    if (!endpoint) {
      return new Response('Workload not found', { status: 404 });
    }

    const targetUrl = new URL(request.url);
    targetUrl.hostname = endpoint;

    // Forward request with tracing headers.
    return fetch(new Request(targetUrl, {
      method: request.method,
      headers: addTracingHeaders(request.headers, workloadId),
      body: request.body,
      // A reverse proxy should hand backend redirects to the client, not follow them itself.
      redirect: 'manual',
    }));
  }
};

TCP Router (Spectrum + Durable Objects)

For TCP traffic, we use Cloudflare Spectrum combined with Durable Objects for connection state management.

// tcp-router/src/connection-manager.ts
export class ConnectionManager extends DurableObject {
  /** Active connections held in memory, keyed by connection id. */
  private connections: Map<string, ConnectionState> = new Map();

  /**
   * Pins a new TCP connection to the workload's current backend endpoint.
   *
   * The mapping is written to the `connections` SQLite table first and then cached
   * in memory. A failed insert therefore leaves no in-memory entry behind.
   *
   * @param connectionId - id of the incoming connection.
   * @param workloadId - workload whose endpoint is looked up in ROUTING_KV.
   * @returns the backend endpoint the connection should be forwarded to.
   * @throws Error when no endpoint is registered for `workloadId`.
   */
  async handleConnection(connectionId: string, workloadId: string): Promise<string> {
    const endpoint = await this.env.ROUTING_KV.get(`workload:${workloadId}:endpoint`);
    if (!endpoint) {
      // Previously a null backend was stored and returned despite the string return type.
      throw new Error(`No endpoint registered for workload ${workloadId}`);
    }

    // One timestamp so the durable row and the in-memory entry agree.
    const connectedAt = Date.now();

    await this.ctx.storage.sql.exec(
      `INSERT INTO connections (id, workload_id, backend, connected_at) 
       VALUES (?, ?, ?, ?)`,
      connectionId, workloadId, endpoint, connectedAt
    );

    this.connections.set(connectionId, {
      workloadId,
      backend: endpoint,
      connectedAt,
    });

    return endpoint;
  }

  /**
   * Forgets a closed connection, both in memory and in the `connections` table.
   *
   * @param connectionId - id of the connection that ended.
   */
  async handleDisconnect(connectionId: string): Promise<void> {
    this.connections.delete(connectionId);
    await this.ctx.storage.sql.exec(
      `DELETE FROM connections WHERE id = ?`,
      connectionId
    );
  }
}

2.2 Price Arbitrage Engine

The arbitrage engine polls spot prices from all three cloud providers (AWS, Azure, and GCP) every 60 seconds and selects the optimal placement for each workload.

// arbitrage-engine/src/price-collector.ts
export class PriceCollector extends DurableObject {
  /** Milliseconds between price collection runs. */
  private static readonly COLLECTION_INTERVAL_MS = 60000;
  /** Default ceiling on interruption_rate when the caller does not specify one. */
  private static readonly DEFAULT_MAX_INTERRUPTION_RATE = 0.2;

  private prices: Map<string, SpotPrice[]> = new Map();
  /** Epoch ms of the last successful store of collected prices (0 = never). */
  private lastUpdate: number = 0;

  /**
   * Periodic alarm: collects spot prices from AWS, Azure and GCP, normalizes them,
   * stores them in SQLite and refreshes the KV cache.
   *
   * Each provider is collected independently. A failing provider is logged and
   * skipped rather than aborting the whole run. The store and cache refresh are
   * skipped only when every provider failed. The next alarm is always scheduled,
   * so a transient outage cannot stop collection permanently.
   * NOTE(review): with a partial result, confirm that `storePrices` and
   * `updatePriceCache` do not drop the failed provider's previous prices.
   */
  async alarm(): Promise<void> {
    try {
      const providerNames = ['aws', 'azure', 'gcp'];
      const results = await Promise.allSettled([
        this.collectAWSPrices(),
        this.collectAzurePrices(),
        this.collectGCPPrices(),
      ]);

      const rawPrices = results.flatMap((result, index) => {
        if (result.status === 'fulfilled') {
          return result.value;
        }
        console.error(
          `PriceCollector: ${providerNames[index]} price collection failed`,
          result.reason
        );
        return [];
      });

      if (results.some((result) => result.status === 'fulfilled')) {
        const normalizedPrices = this.normalizePrices(rawPrices);

        // Store in SQLite for querying.
        await this.storePrices(normalizedPrices);

        // Update KV cache for fast lookups.
        await this.updatePriceCache(normalizedPrices);

        this.lastUpdate = Date.now();
      }
    } finally {
      // Reschedule even if storing failed, so collection keeps running.
      await this.ctx.storage.setAlarm(Date.now() + PriceCollector.COLLECTION_INTERVAL_MS);
    }
  }

  /**
   * Finds up to 10 of the cheapest stored offers that satisfy `requirements`, then
   * scores them by price, reliability and region preference.
   *
   * @param requirements - minimum CPU, memory and GPU count. Also an optional
   *   `maxInterruptionRate`, which defaults to 0.2 only when it is null or undefined.
   *   An explicit 0 is honoured.
   * @returns the placement chosen by `scoreCandidates`.
   */
  async findOptimalPlacement(requirements: WorkloadRequirements): Promise<PlacementDecision> {
    // NOTE(review): sql.exec returns a cursor, not an array; confirm scoreCandidates
    // iterates it rather than relying on array methods (use `.toArray()` otherwise).
    const candidates = await this.ctx.storage.sql.exec(`
      SELECT provider, region, instance_type, price_per_hour, availability_score
      FROM spot_prices
      WHERE vcpu >= ? AND memory_gb >= ? AND gpu_count >= ?
        AND interruption_rate < ?
      ORDER BY price_per_hour ASC
      LIMIT 10
    `, requirements.minCPU, requirements.minMemoryGB,
       requirements.gpuCount ?? 0,
       // `??` rather than `||`: a caller asking for 0 must not be widened to the default.
       requirements.maxInterruptionRate ?? PriceCollector.DEFAULT_MAX_INTERRUPTION_RATE);

    return this.scoreCandidates(candidates, requirements);
  }
}

2.3 Workload Orchestrator (Cloudflare Workflows)

The long-running workload lifecycle is managed by Cloudflare Workflows, whose step-by-step execution provides durability.

// orchestrator/src/workload-workflow.ts
export class WorkloadProvisioningWorkflow extends WorkflowEntrypoint<Env, WorkloadRequest> {

  /**
   * Provisions a workload end to end as a sequence of durable workflow steps:
   * placement → provisioning → readiness wait → deployment → routing registration
   * → health monitoring.
   *
   * @param event - carries `workloadId`, `requirements` and `workloadConfig`.
   * @param step - workflow step runner; each `step.do` result is persisted.
   * @returns endpoint, placement and estimated hourly cost for the new workload.
   */
  async run(event: WorkflowEvent<WorkloadRequest>, step: WorkflowStep): Promise<WorkloadResult> {
    const { workloadId, requirements, workloadConfig } = event.payload;

    // Step 1: Ask the single global arbitrage engine for the best placement.
    const placement = await step.do('find-optimal-placement', async () => {
      const arbitrageEngine = this.env.ARBITRAGE_ENGINE.get(
        this.env.ARBITRAGE_ENGINE.idFromName('global')
      );
      return arbitrageEngine.findOptimalPlacement(requirements);
    });

    // Step 2: Provision infrastructure via Kivera proxy.
    // NOTE(review): step.do may retry; confirm provisioning is idempotent so a retry
    // cannot launch a second instance.
    const instance = await step.do('provision-instance', async () => {
      return this.provisionInstance(placement, workloadConfig);
    });

    // Step 3: Wait for instance to be ready.
    await step.do('wait-for-ready', async () => {
      return this.waitForInstanceReady(instance, { timeout: '5 minutes' });
    });

    // Step 4: Deploy workload.
    await step.do('deploy-workload', async () => {
      return this.deployWorkload(instance, workloadConfig);
    });

    // Step 5: Register routing in KV (read by the routers) and D1 (for querying).
    await step.do('register-routing', async () => {
      // NOTE(review): the endpoint key expires after 24h; confirm something refreshes
      // it for workloads that run longer, or routers will start returning 404.
      await this.env.ROUTING_KV.put(
        `workload:${workloadId}:endpoint`,
        instance.publicIP,
        { expirationTtl: 86400 }
      );

      await this.env.DB.prepare(`
        INSERT INTO workload_routing (workload_id, endpoint, provider, region, instance_id)
        VALUES (?, ?, ?, ?, ?)
      `).bind(
        workloadId,
        instance.publicIP,
        placement.provider,
        placement.region,
        instance.instanceId
      ).run();
    });

    // Step 6: Start health monitoring in a per-workload Durable Object.
    await step.do('start-monitoring', async () => {
      const monitor = this.env.HEALTH_MONITOR.get(
        this.env.HEALTH_MONITOR.idFromName(workloadId)
      );
      await monitor.startMonitoring(instance);
    });

    return {
      workloadId,
      endpoint: instance.publicIP,
      provider: placement.provider,
      region: placement.region,
      instanceType: placement.instanceType,
      estimatedCostPerHour: placement.pricePerHour,
    };
  }

  /**
   * Dispatches provisioning to the provider-specific helper. Every call is routed
   * through the Kivera proxy for policy enforcement.
   *
   * @throws Error when `placement.provider` is not one of aws, azure or gcp.
   */
  private async provisionInstance(
    placement: PlacementDecision,
    config: WorkloadConfig
  ): Promise<InstanceDetails> {
    const kiveraEndpoint = this.env.KIVERA_PROXY_ENDPOINT;

    switch (placement.provider) {
      case 'aws':
        return this.provisionAWSSpotInstance(placement, config, kiveraEndpoint);
      case 'azure':
        return this.provisionAzureSpotVM(placement, config, kiveraEndpoint);
      case 'gcp':
        return this.provisionGCPPreemptibleVM(placement, config, kiveraEndpoint);
      default:
        // Without this, an unknown provider silently resolved to undefined.
        throw new Error(`Unsupported provider: ${String(placement.provider)}`);
    }
  }
}

2.4 Health Monitor

The health monitor checks each workload's instance every 30 seconds and detects spot interruptions.

// health-monitor/src/index.ts
export class HealthMonitor extends DurableObject {
  /** Milliseconds between health checks. */
  private static readonly CHECK_INTERVAL_MS = 30000;
  /** Consecutive failed checks before the workload is marked unhealthy. */
  private static readonly FAILURE_THRESHOLD = 3;

  // In-memory copies. They do not survive eviction between alarms, so `alarm`
  // rehydrates them from storage each time.
  private workloadId: string | undefined;
  private instance: InstanceDetails | undefined;
  private consecutiveFailures: number = 0;

  /**
   * Persists the instance to monitor and schedules the first health check.
   *
   * @param instance - instance whose health is checked on each alarm.
   * @param workloadId - workload served by `instance`. Optional for backward
   *   compatibility; when omitted, falls back to `this.ctx.id.name` (the orchestrator
   *   creates this object with `idFromName(workloadId)`).
   *   NOTE(review): confirm `ctx.id.name` is populated inside the object on this
   *   runtime, or pass `workloadId` explicitly from callers.
   */
  async startMonitoring(instance: InstanceDetails, workloadId?: string): Promise<void> {
    const resolvedWorkloadId = workloadId ?? this.ctx.id.name;

    this.instance = instance;
    this.workloadId = resolvedWorkloadId;
    this.consecutiveFailures = 0;

    await this.ctx.storage.put('instance', instance);
    await this.ctx.storage.put('consecutiveFailures', 0);
    if (resolvedWorkloadId !== undefined) {
      await this.ctx.storage.put('workloadId', resolvedWorkloadId);
    }

    await this.ctx.storage.setAlarm(Date.now() + HealthMonitor.CHECK_INTERVAL_MS);
  }

  /**
   * Runs one health check and reschedules itself.
   *
   * Outcomes:
   * - A spot interruption is handled immediately and monitoring stops (no reschedule).
   * - Otherwise failures are counted. After FAILURE_THRESHOLD consecutive failures the
   *   workload is marked unhealthy. A healthy check resets the count.
   * - Does nothing when no instance is stored.
   */
  async alarm(): Promise<void> {
    const instance = await this.ctx.storage.get<InstanceDetails>('instance');
    if (!instance) return;

    // Rehydrate state that may have been lost if the object was evicted since the last alarm.
    this.instance = instance;
    this.workloadId = (await this.ctx.storage.get<string>('workloadId')) ?? this.ctx.id.name;
    this.consecutiveFailures = (await this.ctx.storage.get<number>('consecutiveFailures')) ?? 0;

    const healthResult = await this.checkHealth(instance);

    if (healthResult.healthy) {
      this.consecutiveFailures = 0;
    } else {
      this.consecutiveFailures++;

      if (healthResult.spotInterruption) {
        await this.handleSpotInterruption(instance);
        return;
      }

      if (this.consecutiveFailures >= HealthMonitor.FAILURE_THRESHOLD) {
        await this.markUnhealthy(instance);
      }
    }

    // Persist only after the handlers succeed. If one throws and the alarm is
    // retried, the failure is not counted twice.
    await this.ctx.storage.put('consecutiveFailures', this.consecutiveFailures);

    await this.ctx.storage.setAlarm(Date.now() + HealthMonitor.CHECK_INTERVAL_MS);
  }

  /**
   * Reacts to a spot interruption: notifies the user, marks the workload's routing
   * status as INTERRUPTED for one hour, then starts cleanup of the instance.
   */
  private async handleSpotInterruption(instance: InstanceDetails): Promise<void> {
    const workloadId = this.workloadId;

    await this.notifyUser({
      type: 'SPOT_INTERRUPTION',
      workloadId: workloadId,
      instance: instance,
      message: 'Spot instance interrupted by cloud provider',
    });

    // Routing reads this key to answer 503 with retry-after.
    if (workloadId !== undefined) {
      await this.ctx.waitUntil === undefined
        ? undefined
        : undefined;
      await this.env.ROUTING_KV.put(
        `workload:${workloadId}:status`,
        'INTERRUPTED',
        { expirationTtl: 3600 }
      );
    } else {
      // Writing `workload:undefined:status` would not affect the real workload's routing.
      console.error('HealthMonitor: workloadId unknown; routing status not updated');
    }

    await this.initiateCleanup(instance);
  }
}