6. Workload Lifecycle Management

6.1 Lifecycle State Machine

┌─────────────────────────────────────────────────────────────────────────────┐
│                        WORKLOAD LIFECYCLE STATE MACHINE                      │
│                                                                              │
│                           ┌──────────┐                                      │
│                           │ PENDING  │                                      │
│                           └────┬─────┘                                      │
│                                │                                             │
│                    [Placement found]                                         │
│                                │                                             │
│                                ▼                                             │
│                        ┌──────────────┐                                     │
│           ┌───────────│ PROVISIONING │───────────┐                         │
│           │            └──────┬───────┘           │                         │
│           │                   │                   │                         │
│    [Provision failed]   [Instance ready]   [Timeout]                        │
│           │                   │                   │                         │
│           ▼                   ▼                   ▼                         │
│    ┌──────────┐        ┌──────────┐        ┌──────────┐                    │
│    │  FAILED  │        │ RUNNING  │        │  FAILED  │                    │
│    └──────────┘        └────┬─────┘        └──────────┘                    │
│                             │                                               │
│           ┌─────────────────┼─────────────────┐                            │
│           │                 │                 │                             │
│    [User terminate]  [Spot interrupt]  [Health check fail]                  │
│           │                 │                 │                             │
│           ▼                 ▼                 ▼                             │
│    ┌──────────┐      ┌─────────────┐   ┌──────────┐                        │
│    │ STOPPING │      │ INTERRUPTED │   │ UNHEALTHY│                        │
│    └────┬─────┘      └──────┬──────┘   └────┬─────┘                        │
│         │                   │               │                               │
│         │            [Cleanup complete]     │                               │
│         │                   │               │                               │
│         ▼                   ▼               ▼                               │
│    ┌─────────────────────────────────────────────┐                         │
│    │                  TERMINATED                  │                         │
│    └─────────────────────────────────────────────┘                         │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

6.2 Workload Provisioning Workflow

// orchestrator/src/provision-workflow.ts
/**
 * Durable workflow that takes a workload from PENDING to RUNNING.
 *
 * Steps, in order: validate the customer's concurrency quota, ask the
 * arbitrage engine for a placement, launch a spot instance, wait for the
 * instance and then for the workload's `/health` endpoint, register routing
 * (KV + D1), optionally initialise storage, start health monitoring, and
 * queue a customer notification.
 *
 * Failure handling: once the workload has been moved to PROVISIONING, any
 * error up to and including the health-check wait marks the workload FAILED
 * and terminates the instance launched for it (if one exists) before the
 * original error is rethrown. This implements the PROVISIONING -> FAILED
 * transitions of the lifecycle state machine.
 */
export class ProvisionWorkflow extends WorkflowEntrypoint<Env, ProvisionRequest> {

  /**
   * Runs the provisioning steps for one workload.
   *
   * @param event - Workflow event whose payload carries `customerId`,
   *   `workloadId`, the placement `requirements` and the workload `config`.
   * @param step - Workflow step API; each `step.do` is a named unit of work.
   * @returns Summary of where the workload was placed and its public endpoint.
   * @throws Error when the customer is unknown, the quota is exhausted, the
   *   instance fails to start, or either wait loop times out.
   */
  async run(event: WorkflowEvent<ProvisionRequest>, step: WorkflowStep) {
    const { customerId, workloadId, requirements, config } = event.payload;

    // Derived once so every step that needs them agrees on the value.
    // `||` is deliberate: a port of 0 is unusable and also falls back to 8080.
    const healthCheckPort = config.healthCheckPort || 8080;
    const publicEndpoint = `${workloadId}.workloads.computearbitrage.com`;

    // 60 polls with a 5 s pause between them: roughly 5 minutes of waiting,
    // not counting the time each status/health request itself takes.
    const pollAttempts = 60;
    const pollIntervalMs = 5000;
    const pause = () => new Promise(resolve => setTimeout(resolve, pollIntervalMs));

    // Step 1: Validate customer quotas
    await step.do('validate-quotas', async () => {
      const customer = await this.env.DB.prepare(
        'SELECT * FROM customers WHERE id = ?'
      ).bind(customerId).first<{ max_concurrent_workloads: number }>();

      if (!customer) {
        throw new Error(`Customer not found: ${customerId}`);
      }

      // Workloads that are still PROVISIONING will become RUNNING, so they
      // must count against the limit too; otherwise several launches started
      // at the same time would all pass the check. The workload being
      // provisioned here is excluded from its own count.
      const activeWorkloads = await this.env.DB.prepare(
        'SELECT COUNT(*) as count FROM workloads WHERE customer_id = ? AND id != ? AND status IN (?, ?)'
      ).bind(customerId, workloadId, 'PROVISIONING', 'RUNNING').first<{ count: number }>();

      const activeCount = Number(activeWorkloads?.count ?? 0);
      if (activeCount >= customer.max_concurrent_workloads) {
        throw new Error(`Quota exceeded: max ${customer.max_concurrent_workloads} concurrent workloads`);
      }
    });

    // Step 2: Find optimal placement
    const placement = await step.do('find-placement', async () => {
      const engine = this.env.ARBITRAGE_ENGINE.get(
        this.env.ARBITRAGE_ENGINE.idFromName('global')
      );
      return engine.findOptimalPlacement(requirements);
    });

    // Set as soon as the cloud call returns, so the failure path below knows
    // whether there is an instance that has to be released.
    let launched: Awaited<ReturnType<CloudProvisioner['createSpotInstance']>> | undefined;

    try {
      // Step 3: Update status to PROVISIONING
      await step.do('update-status-provisioning', async () => {
        await this.env.DB.prepare(
          'UPDATE workloads SET status = ?, started_at = ? WHERE id = ?'
        ).bind('PROVISIONING', Date.now(), workloadId).run();
      });

      // Step 4: Provision cloud resources
      const created = await step.do('provision-cloud-resources', {
        retries: { limit: 3, delay: '10 seconds', backoff: 'exponential' },
      }, async () => {
        const provisioner = new CloudProvisioner(this.env, placement.provider);

        return provisioner.createSpotInstance({
          region: placement.region,
          availabilityZone: placement.availabilityZone,
          instanceType: placement.instanceType,
          imageId: config.imageId || await this.getDefaultImage(placement.provider),
          userData: this.generateUserData(config),
          tags: {
            'workload-id': workloadId,
            'customer-id': customerId,
            'managed-by': 'compute-arbitrage',
          },
          spotOptions: {
            maxPrice: placement.pricePerHour * 1.1, // 10% buffer
            interruptionBehavior: 'terminate',
          },
        });
      });
      launched = created;

      // Step 5: Wait for instance to be running
      await step.do('wait-for-instance', async () => {
        const provisioner = new CloudProvisioner(this.env, placement.provider);

        for (let attempt = 0; attempt < pollAttempts; attempt++) {
          const status = await provisioner.getInstanceStatus(created.instanceId);

          if (status === 'running') {
            return;
          }

          if (status === 'terminated' || status === 'failed') {
            throw new Error(`Instance failed to start: ${status}`);
          }

          await pause();
        }

        throw new Error('Timeout waiting for instance to start');
      });

      // Step 6: Wait for workload to be ready (health check passes)
      await step.do('wait-for-workload-ready', async () => {
        const healthEndpoint = `http://${created.publicIP}:${healthCheckPort}/health`;

        for (let attempt = 0; attempt < pollAttempts; attempt++) {
          try {
            const response = await fetch(healthEndpoint);
            if (response.ok) {
              return;
            }
          } catch {
            // Connection errors are expected while the workload is still
            // booting; keep polling until the attempts run out.
          }

          await pause();
        }

        throw new Error('Timeout waiting for workload health check');
      });
    } catch (error) {
      const orphan = launched;

      // PROVISIONING -> FAILED: without this the row would stay in
      // PROVISIONING forever after a failed or timed-out launch.
      await step.do('mark-provisioning-failed', async () => {
        await this.env.DB.prepare(
          'UPDATE workloads SET status = ? WHERE id = ?'
        ).bind('FAILED', workloadId).run();
      });

      // Release the instance so a failed launch does not keep accruing cost.
      if (orphan) {
        await step.do('terminate-failed-instance', {
          retries: { limit: 3, delay: '5 seconds' },
        }, async () => {
          const provisioner = new CloudProvisioner(this.env, placement.provider);
          await provisioner.terminateInstance(orphan.instanceId, placement.region);
        });
      }

      throw error;
    }

    // The catch block always rethrows, so reaching this line means the
    // instance was launched and passed its health check.
    const instance = launched;

    // Step 7: Register routing
    // NOTE(review): the final INSERT uses a fresh random id and has no
    // conflict clause, so a retry of this step after that INSERT succeeded
    // would add a second workload_instances row - confirm whether that matters.
    await step.do('register-routing', async () => {
      // Update KV for fast routing
      await this.env.ROUTING_KV.put(
        `workload:${workloadId}:endpoint`,
        instance.publicIP
      );
      await this.env.ROUTING_KV.put(
        `workload:${workloadId}:status`,
        'RUNNING'
      );

      // Update D1 for consistency
      await this.env.DB.prepare(`
        INSERT INTO workload_routing (workload_id, endpoint, provider, region, instance_id, updated_at)
        VALUES (?, ?, ?, ?, ?, ?)
        ON CONFLICT (workload_id) DO UPDATE SET
          endpoint = excluded.endpoint,
          provider = excluded.provider,
          region = excluded.region,
          instance_id = excluded.instance_id,
          updated_at = excluded.updated_at
      `).bind(
        workloadId,
        instance.publicIP,
        placement.provider,
        placement.region,
        instance.instanceId,
        Date.now()
      ).run();

      // Update workload record
      await this.env.DB.prepare(
        'UPDATE workloads SET status = ? WHERE id = ?'
      ).bind('RUNNING', workloadId).run();

      // Insert instance record
      await this.env.DB.prepare(`
        INSERT INTO workload_instances 
        (id, workload_id, provider, region, instance_type, instance_id, public_ip, status, spot_price, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
      `).bind(
        crypto.randomUUID(),
        workloadId,
        placement.provider,
        placement.region,
        placement.instanceType,
        instance.instanceId,
        instance.publicIP,
        'RUNNING',
        placement.pricePerHour,
        Date.now()
      ).run();
    });

    // Step 8: Initialize workload storage (if enabled)
    await step.do('initialize-storage', async () => {
      if (config.storage?.enabled) {
        // Defaults: 10 GB quota, 30 days retention.
        const quotaBytes = (config.storage.quotaGB || 10) * 1024 * 1024 * 1024;
        const retentionDays = config.storage.retentionDays || 30;

        // Create storage config record
        await this.env.DB.prepare(`
          INSERT INTO workload_storage_config 
          (workload_id, storage_enabled, storage_quota_bytes, retention_days, created_at, updated_at)
          VALUES (?, ?, ?, ?, ?, ?)
        `).bind(
          workloadId,
          true,
          quotaBytes,
          retentionDays,
          Date.now(),
          Date.now()
        ).run();

        // Initialize KV cache for fast quota checks
        await this.env.ROUTING_KV.put(`workload:${workloadId}:storage:enabled`, 'true');
        await this.env.ROUTING_KV.put(`workload:${workloadId}:storage:quota`, String(quotaBytes));
        await this.env.ROUTING_KV.put(`workload:${workloadId}:storage:used`, '0');
      }
    });

    // Step 9: Start health monitoring
    await step.do('start-health-monitoring', async () => {
      const monitor = this.env.HEALTH_MONITOR.get(
        this.env.HEALTH_MONITOR.idFromName(workloadId)
      );
      await monitor.startMonitoring({
        workloadId,
        instanceId: instance.instanceId,
        provider: placement.provider,
        region: placement.region,
        endpoint: instance.publicIP,
        healthCheckPort,
      });
    });

    // Step 10: Notify customer
    await step.do('notify-customer', async () => {
      await this.env.NOTIFICATION_QUEUE.send({
        type: 'WORKLOAD_READY',
        customerId,
        workloadId,
        endpoint: publicEndpoint,
        provider: placement.provider,
        region: placement.region,
        instanceType: placement.instanceType,
        estimatedCostPerHour: placement.pricePerHour,
      });
    });

    return {
      success: true,
      workloadId,
      endpoint: publicEndpoint,
      provider: placement.provider,
      region: placement.region,
      instanceType: placement.instanceType,
      pricePerHour: placement.pricePerHour,
    };
  }
}

6.3 Termination Workflow

// orchestrator/src/terminate-workflow.ts
/**
 * Durable workflow that shuts a workload down and settles its bookkeeping.
 *
 * Steps, in order: load the workload with its routing row, mark it STOPPING,
 * stop health monitoring, terminate the cloud instance, schedule storage
 * expiry (when storage is enabled), remove routing, mark the workload and its
 * running instance rows TERMINATED, and queue a billing record.
 */
export class TerminateWorkflow extends WorkflowEntrypoint<Env, TerminateRequest> {

  /**
   * Runs the termination steps for one workload.
   *
   * @param event - Workflow event whose payload carries `workloadId` and the
   *   termination `reason` stored on the workload row.
   * @param step - Workflow step API; each `step.do` is a named unit of work.
   * @returns `{ success: true, workloadId }` once every step has completed.
   * @throws Error('Workload not found') when no workload row joined with a
   *   routing row exists for `workloadId`.
   */
  async run(event: WorkflowEvent<TerminateRequest>, step: WorkflowStep) {
    const { workloadId, reason } = event.payload;

    // Step 1: Get workload details
    // The inner JOIN means a workload without a routing row is reported as
    // not found.
    const workload = await step.do('get-workload', async () => {
      const row = await this.env.DB.prepare(`
        SELECT w.*, wr.provider, wr.region, wr.instance_id
        FROM workloads w
        JOIN workload_routing wr ON w.id = wr.workload_id
        WHERE w.id = ?
      `).bind(workloadId).first();

      if (!row) {
        throw new Error('Workload not found');
      }

      return row;
    });

    // Step 2: Update status to STOPPING
    await step.do('update-status-stopping', async () => {
      await this.env.DB.prepare(
        'UPDATE workloads SET status = ? WHERE id = ?'
      ).bind('STOPPING', workloadId).run();

      await this.env.ROUTING_KV.put(`workload:${workloadId}:status`, 'STOPPING');
    });

    // Step 3: Stop health monitoring
    await step.do('stop-health-monitoring', async () => {
      const monitor = this.env.HEALTH_MONITOR.get(
        this.env.HEALTH_MONITOR.idFromName(workloadId)
      );
      await monitor.stopMonitoring();
    });

    // Step 4: Terminate cloud instance
    await step.do('terminate-instance', {
      retries: { limit: 3, delay: '5 seconds' },
    }, async () => {
      const provisioner = new CloudProvisioner(this.env, workload.provider);
      await provisioner.terminateInstance(workload.instance_id, workload.region);
    });

    // Step 5: Schedule storage cleanup (if storage enabled)
    await step.do('schedule-storage-cleanup', async () => {
      const storageConfig = await this.env.DB.prepare(
        'SELECT * FROM workload_storage_config WHERE workload_id = ?'
      ).bind(workloadId).first();

      if (storageConfig?.storage_enabled) {
        const expiresAt = Date.now() + (storageConfig.retention_days * 24 * 60 * 60 * 1000);

        // Mark all objects with expiration timestamp
        await this.env.DB.prepare(`
          UPDATE workload_storage_objects 
          SET expires_at = ? 
          WHERE workload_id = ? AND expires_at IS NULL
        `).bind(expiresAt, workloadId).run();

        // Clean up KV cache
        await this.env.ROUTING_KV.delete(`workload:${workloadId}:storage:enabled`);
        await this.env.ROUTING_KV.delete(`workload:${workloadId}:storage:quota`);
        await this.env.ROUTING_KV.delete(`workload:${workloadId}:storage:used`);

        // Queue cleanup job for after retention period
        await this.env.EVENTS_QUEUE.send({
          type: 'STORAGE_CLEANUP_SCHEDULED',
          workloadId,
          expiresAt,
          scheduledAt: Date.now(),
        });
      }
    });

    // Step 6: Remove routing
    await step.do('remove-routing', async () => {
      await this.env.ROUTING_KV.delete(`workload:${workloadId}:endpoint`);
      await this.env.ROUTING_KV.delete(`workload:${workloadId}:status`);

      await this.env.DB.prepare(
        'DELETE FROM workload_routing WHERE workload_id = ?'
      ).bind(workloadId).run();
    });

    // Step 7: Update final status
    await step.do('update-final-status', async () => {
      // One timestamp for both updates so the workload row and its instance
      // rows agree on when termination happened.
      const terminatedAt = Date.now();

      await this.env.DB.prepare(`
        UPDATE workloads SET status = ?, terminated_at = ?, termination_reason = ?
        WHERE id = ?
      `).bind('TERMINATED', terminatedAt, reason, workloadId).run();

      await this.env.DB.prepare(`
        UPDATE workload_instances SET status = ?, terminated_at = ?
        WHERE workload_id = ? AND status = 'RUNNING'
      `).bind('TERMINATED', terminatedAt, workloadId).run();
    });

    // Step 8: Calculate and record billing
    await step.do('record-billing', async () => {
      const instance = await this.env.DB.prepare(`
        SELECT * FROM workload_instances WHERE workload_id = ? ORDER BY created_at DESC LIMIT 1
      `).bind(workloadId).first();

      if (!instance) {
        // No instance row was recorded for this workload, so there is no
        // usage to bill; dereferencing the missing row would only crash.
        return;
      }

      // terminated_at is NULL when the row was not in RUNNING state during the
      // final-status update; fall back to "now" and never bill a negative span.
      const endedAt = instance.terminated_at ?? Date.now();
      const durationHours = Math.max(0, endedAt - instance.created_at) / 3600000;
      const totalCost = durationHours * instance.spot_price;

      await this.env.BILLING_QUEUE.send({
        customerId: workload.customer_id,
        workloadId,
        provider: instance.provider,
        region: instance.region,
        instanceType: instance.instance_type,
        durationHours,
        spotPricePerHour: instance.spot_price,
        totalCost,
      });
    });

    return { success: true, workloadId };
  }
}

6.4 Workload State Storage (R2)

Workloads can optionally persist state and artifacts to Cloudflare R2, enabling:

  • State preservation during migrations - Checkpoints survive spot interruptions and provider changes
  • Zero egress costs - Download results without per-GB fees (unlike AWS S3, Azure Blob, GCP Cloud Storage)
  • Cloud-agnostic storage - Decoupled from provider-specific storage services

Storage Architecture

┌─────────────────────────────────────────────────────────────────────────────────┐
│                         WORKLOAD STATE STORAGE (R2)                             │
│                                                                                 │
│  ┌─────────────────┐         ┌─────────────────┐         ┌─────────────────┐   │
│  │ Workload        │         │ Cloudflare      │         │ Workload        │   │
│  │ Instance (AWS)  │────────▶│ R2 Bucket       │◀────────│ Instance (GCP)  │   │
│  └─────────────────┘         │                 │         └─────────────────┘   │
│         │                    │  workloads/     │                │              │
│         │                    │    {id}/        │                │              │
│         │                    │      state/     │                │              │
│         │                    │      checkpts/  │                │              │
│  [Spot Interruption]         │      artifacts/ │         [New Instance]        │
│         │                    └────────┬────────┘                │              │
│         │                             │                         │              │
│         └─────── Checkpoint ──────────┴────── Restore ──────────┘              │
│                                                                                 │
│  Migration Flow:                                                                │
│  1. Spot interruption detected on AWS instance                                  │
│  2. Final checkpoint uploaded to R2                                             │
│  3. New instance provisioned on GCP (cheaper option)                            │
│  4. Checkpoint restored from R2                                                 │
│  5. Workload resumes with state intact                                          │
│                                                                                 │
└─────────────────────────────────────────────────────────────────────────────────┘

Storage Namespace Structure

compute-arbitrage-workload-storage/
  workloads/
    {workloadId}/
      state/                    # Application state files
        config.json
        session.dat
      checkpoints/              # Full checkpoint snapshots
        {timestamp}/
          manifest.json
          data/
            chunk-001.bin
            chunk-002.bin
      artifacts/                # Output files, results, logs
        results/
          output-001.csv
        logs/
          app.log

Pre-signed URL Generation

Workloads interact with R2 storage via pre-signed URLs, enabling direct uploads/downloads without proxying through Workers.

// storage/src/storage-manager.ts
/**
 * Issues upload and download handles for one workload's objects.
 *
 * Every object key is namespaced as `workloads/{workloadId}/{key}`. Quota
 * figures are read from the `workload:{workloadId}:storage:*` KV entries.
 */
export class WorkloadStorageManager {
  constructor(
    private env: Env,
    private workloadId: string
  ) {}

  /**
   * Starts an upload for `key` after checking the workload's storage quota.
   *
   * NOTE(review): the value returned as `uploadUrl` is the `uploadId` of a
   * multipart upload created on the bucket binding, not an HTTP URL, and
   * `expiresIn` only feeds the returned `expiresAt` - nothing here enforces
   * it. Confirm what callers expect before relying on either field.
   *
   * @param key - Object key relative to the workload's namespace.
   * @param contentType - Stored as the object's HTTP content type.
   * @param expiresIn - Validity window in seconds (default 3600).
   * @returns The upload handle and the epoch-millisecond expiry.
   * @throws Error('Storage quota exceeded') when usage has reached the quota.
   */
  async generateUploadUrl(
    key: string,
    contentType: string,
    expiresIn: number = 3600
  ): Promise<{ uploadUrl: string; expiresAt: number }> {
    // Validate quota
    const quotaOk = await this.checkQuota();
    if (!quotaOk) {
      throw new Error('Storage quota exceeded');
    }

    const objectKey = `workloads/${this.workloadId}/${key}`;

    const upload = await this.env.WORKLOAD_STORAGE.createMultipartUpload(objectKey, {
      httpMetadata: { contentType },
    });

    return {
      uploadUrl: upload.uploadId,
      expiresAt: Date.now() + (expiresIn * 1000),
    };
  }

  /**
   * Builds a signed download URL for an existing object.
   *
   * @param key - Object key relative to the workload's namespace.
   * @param expiresIn - Validity window in seconds (default 3600).
   * @returns The URL and the epoch-millisecond expiry embedded in it.
   * @throws Error('Object not found') when the bucket has no such object.
   */
  async generateDownloadUrl(
    key: string,
    expiresIn: number = 3600
  ): Promise<{ downloadUrl: string; expiresAt: number }> {
    const objectKey = `workloads/${this.workloadId}/${key}`;

    const object = await this.env.WORKLOAD_STORAGE.head(objectKey);
    if (!object) {
      throw new Error('Object not found');
    }

    const expiresAt = Date.now() + (expiresIn * 1000);
    // The signature covers the raw (unencoded) object key.
    const signature = await this.generateSignature(objectKey, expiresAt);

    // Percent-encode each path segment so keys containing '?', '#', spaces or
    // '%' still yield a URL whose path decodes back to the exact object key.
    // The '/' separators are kept so the path mirrors the key hierarchy.
    const encodedPath = objectKey.split('/').map(encodeURIComponent).join('/');

    return {
      downloadUrl: `https://storage.computearbitrage.com/${encodedPath}?expires=${expiresAt}&sig=${signature}`,
      expiresAt,
    };
  }

  /**
   * Reports whether the workload is still below its storage quota.
   *
   * Missing KV entries default to 0 bytes used and a 10 GiB quota
   * (10737418240 bytes). An unparseable entry makes the comparison false,
   * i.e. the quota is treated as exhausted.
   */
  private async checkQuota(): Promise<boolean> {
    // The two reads are independent, so issue them together.
    const [quota, used] = await Promise.all([
      this.env.ROUTING_KV.get(`workload:${this.workloadId}:storage:quota`),
      this.env.ROUTING_KV.get(`workload:${this.workloadId}:storage:used`),
    ]);

    return parseInt(used || '0', 10) < parseInt(quota || '10737418240', 10);
  }
}

Retention Policy

When a workload is terminated, its storage objects are kept for the configured retention period (30 days by default); once that period has elapsed, a scheduled cleanup job deletes them:

// storage/src/retention-worker.ts
/**
 * Starts the retention countdown for a workload's stored objects.
 *
 * Stamps every object of the workload that has no expiry yet with
 * `now + retentionDays`, then queues a `STORAGE_CLEANUP_SCHEDULED` event
 * carrying that expiry.
 *
 * @param env - Bindings; uses `DB` and `EVENTS_QUEUE`.
 * @param workloadId - Workload whose objects should start expiring.
 * @param retentionDays - Number of days the objects are kept.
 */
export async function processStorageRetention(
  env: Env,
  workloadId: string,
  retentionDays: number
): Promise<void> {
  const retentionMs = retentionDays * 24 * 60 * 60 * 1000;
  const expiresAt = Date.now() + retentionMs;

  // Objects that already carry an expiry are left untouched.
  const stampExpiry = env.DB.prepare(`
    UPDATE workload_storage_objects 
    SET expires_at = ? 
    WHERE workload_id = ? AND expires_at IS NULL
  `);
  await stampExpiry.bind(expiresAt, workloadId).run();

  // Announce the pending cleanup; scheduledAt records when it was queued.
  await env.EVENTS_QUEUE.send({
    type: 'STORAGE_CLEANUP_SCHEDULED',
    workloadId,
    expiresAt,
    scheduledAt: Date.now(),
  });
}

/**
 * Cron job: deletes one page (up to 1000) of expired storage objects.
 *
 * Each run selects up to 1000 rows whose `expires_at` is in the past, deletes
 * the matching objects from the bucket, and then removes exactly those rows
 * from the database. Rows beyond the page are left in place for the next run,
 * so a row is never dropped while its object still exists in the bucket.
 *
 * @param env - Bindings; uses `DB` and `WORKLOAD_STORAGE`.
 */
export async function cleanupExpiredStorage(env: Env): Promise<void> {
  const now = Date.now();

  // Find expired objects
  const expired = await env.DB.prepare(`
    SELECT workload_id, object_key
    FROM workload_storage_objects
    WHERE expires_at IS NOT NULL AND expires_at < ?
    LIMIT 1000
  `).bind(now).all();

  const rows = expired.results ?? [];
  if (rows.length === 0) {
    return;
  }

  // Delete from R2 in one call; the page size matches the 1000-key limit of
  // a multi-key delete.
  const objectKeys = rows.map(row => `workloads/${row.workload_id}/${row.object_key}`);
  await env.WORKLOAD_STORAGE.delete(objectKeys);

  // Remove from database - only the rows whose objects were just deleted.
  // A blanket "expires_at < now" delete would also drop rows that did not fit
  // in the page above, orphaning their objects in the bucket.
  const deleteRow = env.DB.prepare(
    'DELETE FROM workload_storage_objects WHERE workload_id = ? AND object_key = ?'
  );
  const statementsPerBatch = 100;
  for (let start = 0; start < rows.length; start += statementsPerBatch) {
    const chunk = rows.slice(start, start + statementsPerBatch);
    await env.DB.batch(chunk.map(row => deleteRow.bind(row.workload_id, row.object_key)));
  }
}