Workload Lifecycle Management
6.1 Lifecycle State Machine
┌─────────────────────────────────────────────────────────────────────────────┐
│ WORKLOAD LIFECYCLE STATE MACHINE │
│ │
│ ┌──────────┐ │
│ │ PENDING │ │
│ └────┬─────┘ │
│ │ │
│ [Placement found] │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ ┌───────────│ PROVISIONING │───────────┐ │
│ │ └──────┬───────┘ │ │
│ │ │ │ │
│ [Provision failed] [Instance ready] [Timeout] │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ FAILED │ │ RUNNING │ │ FAILED │ │
│ └──────────┘ └────┬─────┘ └──────────┘ │
│ │ │
│ ┌─────────────────┼─────────────────┐ │
│ │ │ │ │
│ [User terminate] [Spot interrupt] [Health check fail] │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌─────────────┐ ┌──────────┐ │
│ │ STOPPING │ │ INTERRUPTED │ │ UNHEALTHY│ │
│ └────┬─────┘ └──────┬──────┘ └────┬─────┘ │
│ │ │ │ │
│ │ [Cleanup complete] │ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ TERMINATED │ │
│ └─────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
6.2 Workload Provisioning Workflow
// orchestrator/src/provision-workflow.ts
/**
 * Durable workflow that drives a workload from PENDING to RUNNING:
 * quota validation -> placement -> cloud provisioning -> instance/health
 * gating -> routing registration -> optional storage init -> monitoring ->
 * customer notification.
 *
 * Each step.do() is checkpointed by the Workflows runtime, so a crash or
 * redeploy resumes at the first incomplete step instead of repeating the
 * side effects of earlier steps.
 */
export class ProvisionWorkflow extends WorkflowEntrypoint<Env, ProvisionRequest> {
  async run(event: WorkflowEvent<ProvisionRequest>, step: WorkflowStep) {
    const { customerId, workloadId, requirements, config } = event.payload;

    // Step 1: Validate customer quotas before doing any expensive work.
    await step.do('validate-quotas', async () => {
      const customer = await this.env.DB.prepare(
        'SELECT * FROM customers WHERE id = ?'
      ).bind(customerId).first();
      // Fail fast with an actionable message instead of a TypeError on the
      // property read below when the customer row does not exist.
      if (!customer) {
        throw new Error(`Customer not found: ${customerId}`);
      }
      const activeWorkloads = await this.env.DB.prepare(
        'SELECT COUNT(*) as count FROM workloads WHERE customer_id = ? AND status = ?'
      ).bind(customerId, 'RUNNING').first();
      if ((activeWorkloads?.count ?? 0) >= customer.max_concurrent_workloads) {
        throw new Error(`Quota exceeded: max ${customer.max_concurrent_workloads} concurrent workloads`);
      }
    });

    // Step 2: Ask the (singleton) arbitrage engine for the cheapest
    // placement satisfying the workload's requirements.
    const placement = await step.do('find-placement', async () => {
      const engine = this.env.ARBITRAGE_ENGINE.get(
        this.env.ARBITRAGE_ENGINE.idFromName('global')
      );
      return engine.findOptimalPlacement(requirements);
    });

    // Step 3: Update status to PROVISIONING (PENDING -> PROVISIONING in the
    // lifecycle state machine).
    await step.do('update-status-provisioning', async () => {
      await this.env.DB.prepare(
        'UPDATE workloads SET status = ?, started_at = ? WHERE id = ?'
      ).bind('PROVISIONING', Date.now(), workloadId).run();
    });

    // Step 4: Provision cloud resources. Retried with exponential backoff
    // because spot capacity requests are flaky by nature.
    const instance = await step.do('provision-cloud-resources', {
      retries: { limit: 3, delay: '10 seconds', backoff: 'exponential' },
    }, async () => {
      const provisioner = new CloudProvisioner(this.env, placement.provider);
      return provisioner.createSpotInstance({
        region: placement.region,
        availabilityZone: placement.availabilityZone,
        instanceType: placement.instanceType,
        imageId: config.imageId || await this.getDefaultImage(placement.provider),
        userData: this.generateUserData(config),
        tags: {
          'workload-id': workloadId,
          'customer-id': customerId,
          'managed-by': 'compute-arbitrage',
        },
        spotOptions: {
          maxPrice: placement.pricePerHour * 1.1, // 10% buffer
          interruptionBehavior: 'terminate',
        },
      });
    });

    // Step 5: Poll until the cloud provider reports the instance running.
    // 60 polls x 5s = max 5 minutes before giving up.
    await step.do('wait-for-instance', async () => {
      const provisioner = new CloudProvisioner(this.env, placement.provider);
      for (let i = 0; i < 60; i++) { // Max 5 minutes
        const status = await provisioner.getInstanceStatus(instance.instanceId);
        if (status === 'running') {
          return;
        }
        if (status === 'terminated' || status === 'failed') {
          throw new Error(`Instance failed to start: ${status}`);
        }
        await new Promise(resolve => setTimeout(resolve, 5000));
      }
      throw new Error('Timeout waiting for instance to start');
    });

    // Step 6: Poll the workload's own /health endpoint until it answers
    // 2xx. Fetch errors are expected while the app boots and are swallowed.
    await step.do('wait-for-workload-ready', async () => {
      const healthEndpoint = `http://${instance.publicIP}:${config.healthCheckPort || 8080}/health`;
      for (let i = 0; i < 60; i++) { // Max 5 minutes
        try {
          const response = await fetch(healthEndpoint);
          if (response.ok) {
            return;
          }
        } catch (e) {
          // Instance not ready yet
        }
        await new Promise(resolve => setTimeout(resolve, 5000));
      }
      throw new Error('Timeout waiting for workload health check');
    });

    // Step 7: Register routing in both KV (fast edge reads) and D1
    // (consistent system of record), then mark the workload RUNNING.
    await step.do('register-routing', async () => {
      // Update KV for fast routing
      await this.env.ROUTING_KV.put(
        `workload:${workloadId}:endpoint`,
        instance.publicIP
      );
      await this.env.ROUTING_KV.put(
        `workload:${workloadId}:status`,
        'RUNNING'
      );
      // Update D1 for consistency; upsert so re-provisioning a workload
      // replaces its previous routing row.
      await this.env.DB.prepare(`
        INSERT INTO workload_routing (workload_id, endpoint, provider, region, instance_id, updated_at)
        VALUES (?, ?, ?, ?, ?, ?)
        ON CONFLICT (workload_id) DO UPDATE SET
          endpoint = excluded.endpoint,
          provider = excluded.provider,
          region = excluded.region,
          instance_id = excluded.instance_id,
          updated_at = excluded.updated_at
      `).bind(
        workloadId,
        instance.publicIP,
        placement.provider,
        placement.region,
        instance.instanceId,
        Date.now()
      ).run();
      // Update workload record
      await this.env.DB.prepare(
        'UPDATE workloads SET status = ? WHERE id = ?'
      ).bind('RUNNING', workloadId).run();
      // Insert instance record (price captured here for later billing).
      await this.env.DB.prepare(`
        INSERT INTO workload_instances
        (id, workload_id, provider, region, instance_type, instance_id, public_ip, status, spot_price, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
      `).bind(
        crypto.randomUUID(),
        workloadId,
        placement.provider,
        placement.region,
        placement.instanceType,
        instance.instanceId,
        instance.publicIP,
        'RUNNING',
        placement.pricePerHour,
        Date.now()
      ).run();
    });

    // Step 8: Initialize workload storage (if enabled). Defaults: 10 GB
    // quota, 30-day retention.
    await step.do('initialize-storage', async () => {
      if (config.storage?.enabled) {
        const quotaBytes = (config.storage.quotaGB || 10) * 1024 * 1024 * 1024;
        const retentionDays = config.storage.retentionDays || 30;
        // Create storage config record
        await this.env.DB.prepare(`
          INSERT INTO workload_storage_config
          (workload_id, storage_enabled, storage_quota_bytes, retention_days, created_at, updated_at)
          VALUES (?, ?, ?, ?, ?, ?)
        `).bind(
          workloadId,
          true,
          quotaBytes,
          retentionDays,
          Date.now(),
          Date.now()
        ).run();
        // Initialize KV cache for fast quota checks
        await this.env.ROUTING_KV.put(`workload:${workloadId}:storage:enabled`, 'true');
        await this.env.ROUTING_KV.put(`workload:${workloadId}:storage:quota`, String(quotaBytes));
        await this.env.ROUTING_KV.put(`workload:${workloadId}:storage:used`, '0');
      }
    });

    // Step 9: Start health monitoring via a per-workload Durable Object.
    await step.do('start-health-monitoring', async () => {
      const monitor = this.env.HEALTH_MONITOR.get(
        this.env.HEALTH_MONITOR.idFromName(workloadId)
      );
      await monitor.startMonitoring({
        workloadId,
        instanceId: instance.instanceId,
        provider: placement.provider,
        region: placement.region,
        endpoint: instance.publicIP,
        healthCheckPort: config.healthCheckPort || 8080,
      });
    });

    // Step 10: Notify customer through the notification queue.
    await step.do('notify-customer', async () => {
      await this.env.NOTIFICATION_QUEUE.send({
        type: 'WORKLOAD_READY',
        customerId,
        workloadId,
        endpoint: `${workloadId}.workloads.computearbitrage.com`,
        provider: placement.provider,
        region: placement.region,
        instanceType: placement.instanceType,
        estimatedCostPerHour: placement.pricePerHour,
      });
    });

    return {
      success: true,
      workloadId,
      endpoint: `${workloadId}.workloads.computearbitrage.com`,
      provider: placement.provider,
      region: placement.region,
      instanceType: placement.instanceType,
      pricePerHour: placement.pricePerHour,
    };
  }
}
6.3 Termination Workflow
// orchestrator/src/terminate-workflow.ts
/**
 * Durable workflow that drives a workload from RUNNING/STOPPING to
 * TERMINATED: stop monitoring -> terminate the cloud instance ->
 * schedule storage retention cleanup -> remove routing -> finalize
 * status -> record billing.
 */
export class TerminateWorkflow extends WorkflowEntrypoint<Env, TerminateRequest> {
  async run(event: WorkflowEvent<TerminateRequest>, step: WorkflowStep) {
    const { workloadId, reason } = event.payload;

    // Step 1: Get workload details joined with its routing row (provider,
    // region, cloud instance id) needed for teardown.
    const workload = await step.do('get-workload', async () => {
      const row = await this.env.DB.prepare(`
        SELECT w.*, wr.provider, wr.region, wr.instance_id
        FROM workloads w
        JOIN workload_routing wr ON w.id = wr.workload_id
        WHERE w.id = ?
      `).bind(workloadId).first();
      if (!row) {
        throw new Error('Workload not found');
      }
      return row;
    });

    // Step 2: Update status to STOPPING in both D1 and the KV routing cache.
    await step.do('update-status-stopping', async () => {
      await this.env.DB.prepare(
        'UPDATE workloads SET status = ? WHERE id = ?'
      ).bind('STOPPING', workloadId).run();
      await this.env.ROUTING_KV.put(`workload:${workloadId}:status`, 'STOPPING');
    });

    // Step 3: Stop health monitoring so the monitor DO does not flag the
    // intentional shutdown as a health failure.
    await step.do('stop-health-monitoring', async () => {
      const monitor = this.env.HEALTH_MONITOR.get(
        this.env.HEALTH_MONITOR.idFromName(workloadId)
      );
      await monitor.stopMonitoring();
    });

    // Step 4: Terminate the cloud instance; retried because provider APIs
    // are transiently flaky.
    await step.do('terminate-instance', {
      retries: { limit: 3, delay: '5 seconds' },
    }, async () => {
      const provisioner = new CloudProvisioner(this.env, workload.provider);
      await provisioner.terminateInstance(workload.instance_id, workload.region);
    });

    // Step 5: Schedule storage cleanup (if storage enabled). Objects are
    // kept for the configured retention window, then reaped by cron.
    await step.do('schedule-storage-cleanup', async () => {
      const storageConfig = await this.env.DB.prepare(
        'SELECT * FROM workload_storage_config WHERE workload_id = ?'
      ).bind(workloadId).first();
      if (storageConfig?.storage_enabled) {
        const expiresAt = Date.now() + (storageConfig.retention_days * 24 * 60 * 60 * 1000);
        // Mark all objects with expiration timestamp
        await this.env.DB.prepare(`
          UPDATE workload_storage_objects
          SET expires_at = ?
          WHERE workload_id = ? AND expires_at IS NULL
        `).bind(expiresAt, workloadId).run();
        // Clean up KV cache
        await this.env.ROUTING_KV.delete(`workload:${workloadId}:storage:enabled`);
        await this.env.ROUTING_KV.delete(`workload:${workloadId}:storage:quota`);
        await this.env.ROUTING_KV.delete(`workload:${workloadId}:storage:used`);
        // Queue cleanup job for after retention period
        await this.env.EVENTS_QUEUE.send({
          type: 'STORAGE_CLEANUP_SCHEDULED',
          workloadId,
          expiresAt,
          scheduledAt: Date.now(),
        });
      }
    });

    // Step 6: Remove routing from the KV cache and the D1 routing table.
    await step.do('remove-routing', async () => {
      await this.env.ROUTING_KV.delete(`workload:${workloadId}:endpoint`);
      await this.env.ROUTING_KV.delete(`workload:${workloadId}:status`);
      await this.env.DB.prepare(
        'DELETE FROM workload_routing WHERE workload_id = ?'
      ).bind(workloadId).run();
    });

    // Step 7: Update final status (STOPPING -> TERMINATED) and close out
    // any still-RUNNING instance rows.
    await step.do('update-final-status', async () => {
      await this.env.DB.prepare(`
        UPDATE workloads SET status = ?, terminated_at = ?, termination_reason = ?
        WHERE id = ?
      `).bind('TERMINATED', Date.now(), reason, workloadId).run();
      await this.env.DB.prepare(`
        UPDATE workload_instances SET status = ?, terminated_at = ?
        WHERE workload_id = ? AND status = 'RUNNING'
      `).bind('TERMINATED', Date.now(), workloadId).run();
    });

    // Step 8: Calculate and record billing for the most recent instance.
    await step.do('record-billing', async () => {
      const instance = await this.env.DB.prepare(`
        SELECT * FROM workload_instances WHERE workload_id = ? ORDER BY created_at DESC LIMIT 1
      `).bind(workloadId).first();
      // A workload that never finished provisioning has no instance record;
      // there is nothing to bill in that case (previously this crashed the
      // workflow with a null dereference).
      if (!instance) {
        return;
      }
      // terminated_at can be null if the instance row was not in RUNNING
      // status when step 7 ran (e.g. already INTERRUPTED); fall back to
      // "now" so the duration never becomes NaN.
      const endedAt = instance.terminated_at ?? Date.now();
      const durationHours = (endedAt - instance.created_at) / 3600000;
      const totalCost = durationHours * instance.spot_price;
      await this.env.BILLING_QUEUE.send({
        customerId: workload.customer_id,
        workloadId,
        provider: instance.provider,
        region: instance.region,
        instanceType: instance.instance_type,
        durationHours,
        spotPricePerHour: instance.spot_price,
        totalCost,
      });
    });

    return { success: true, workloadId };
  }
}
6.4 Workload State Storage (R2)
Workloads can optionally persist state and artifacts to Cloudflare R2, enabling:
- State preservation during migrations - Checkpoints survive spot interruptions and provider changes
- Zero egress costs - Download results without per-GB fees (unlike AWS S3, Azure Blob, GCP Cloud Storage)
- Cloud-agnostic storage - Decoupled from provider-specific storage services
Storage Architecture
┌─────────────────────────────────────────────────────────────────────────────────┐
│ WORKLOAD STATE STORAGE (R2) │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Workload │ │ Cloudflare │ │ Workload │ │
│ │ Instance (AWS) │────────▶│ R2 Bucket │◀────────│ Instance (GCP) │ │
│ └─────────────────┘ │ │ └─────────────────┘ │
│ │ │ workloads/ │ │ │
│ │ │ {id}/ │ │ │
│ │ │ state/ │ │ │
│ │ │ checkpts/ │ │ │
│ [Spot Interruption] │ artifacts/ │ [New Instance] │
│ │ └────────┬────────┘ │ │
│ │ │ │ │
│ └─────── Checkpoint ──────────┴────── Restore ──────────┘ │
│ │
│ Migration Flow: │
│ 1. Spot interruption detected on AWS instance │
│ 2. Final checkpoint uploaded to R2 │
│ 3. New instance provisioned on GCP (cheaper option) │
│ 4. Checkpoint restored from R2 │
│ 5. Workload resumes with state intact │
│ │
└─────────────────────────────────────────────────────────────────────────────────┘
Storage Namespace Structure
compute-arbitrage-workload-storage/
workloads/
{workloadId}/
state/ # Application state files
config.json
session.dat
checkpoints/ # Full checkpoint snapshots
{timestamp}/
manifest.json
data/
chunk-001.bin
chunk-002.bin
artifacts/ # Output files, results, logs
results/
output-001.csv
logs/
app.log
Pre-signed URL Generation
Workloads interact with R2 storage via time-limited signed URLs (HMAC-signed and verified at the storage edge host), enabling direct uploads and downloads without proxying payload bytes through Workers.
// storage/src/storage-manager.ts
/**
 * Issues time-limited signed URLs for a workload's slice of the R2 bucket
 * (`workloads/{workloadId}/...`) and enforces the per-workload storage
 * quota cached in KV.
 *
 * URLs are HMAC-signed and served by the storage edge host, so workloads
 * upload/download directly without proxying payload bytes through Workers.
 */
export class WorkloadStorageManager {
  constructor(
    private env: Env,
    private workloadId: string
  ) {}

  /**
   * Returns a signed URL the workload can PUT object bytes to directly.
   *
   * FIX: the previous implementation returned an R2 multipart `uploadId`
   * as the "URL" — not a URL at all, and unusable by clients (R2 bucket
   * bindings do not produce presigned URLs). It now issues the same
   * HMAC-signed URL scheme used by generateDownloadUrl.
   *
   * @param key        Object key relative to the workload's namespace.
   * @param contentType MIME type the uploader must send.
   * @param expiresIn  URL lifetime in seconds (default 1 hour).
   * @throws Error when the workload's storage quota is already exhausted.
   */
  async generateUploadUrl(
    key: string,
    contentType: string,
    expiresIn: number = 3600
  ): Promise<{ uploadUrl: string; expiresAt: number }> {
    // Reject before handing out a URL if the quota is already exceeded.
    const quotaOk = await this.checkQuota();
    if (!quotaOk) {
      throw new Error('Storage quota exceeded');
    }
    const objectKey = `workloads/${this.workloadId}/${key}`;
    const expiresAt = Date.now() + (expiresIn * 1000);
    const signature = await this.generateSignature(objectKey, expiresAt);
    return {
      uploadUrl: `https://storage.computearbitrage.com/${objectKey}?expires=${expiresAt}&sig=${signature}&contentType=${encodeURIComponent(contentType)}`,
      expiresAt,
    };
  }

  /**
   * Returns a signed URL for downloading an existing object.
   *
   * @throws Error when the object does not exist in R2.
   */
  async generateDownloadUrl(
    key: string,
    expiresIn: number = 3600
  ): Promise<{ downloadUrl: string; expiresAt: number }> {
    const objectKey = `workloads/${this.workloadId}/${key}`;
    // HEAD first so callers get a clean "not found" instead of a signed
    // URL that 404s later.
    const object = await this.env.WORKLOAD_STORAGE.head(objectKey);
    if (!object) {
      throw new Error('Object not found');
    }
    const expiresAt = Date.now() + (expiresIn * 1000);
    const signature = await this.generateSignature(objectKey, expiresAt);
    return {
      downloadUrl: `https://storage.computearbitrage.com/${objectKey}?expires=${expiresAt}&sig=${signature}`,
      expiresAt,
    };
  }

  /**
   * True while used bytes are below the quota. Defaults when the KV cache
   * is cold: 10 GiB quota, 0 bytes used.
   */
  private async checkQuota(): Promise<boolean> {
    // The two counters are independent — read them in parallel.
    const [quota, used] = await Promise.all([
      this.env.ROUTING_KV.get(`workload:${this.workloadId}:storage:quota`),
      this.env.ROUTING_KV.get(`workload:${this.workloadId}:storage:used`),
    ]);
    return parseInt(used || '0') < parseInt(quota || '10737418240');
  }

  /**
   * HMAC-SHA256 over `${objectKey}:${expiresAt}`, hex-encoded.
   *
   * FIX: this method was referenced by generateDownloadUrl but never
   * defined in the class, so every call threw at runtime.
   * NOTE(review): the signing-secret binding name is an assumption —
   * align it with the project's Env definition before merging.
   */
  private async generateSignature(objectKey: string, expiresAt: number): Promise<string> {
    const secret = (this.env as { STORAGE_SIGNING_SECRET?: string }).STORAGE_SIGNING_SECRET ?? '';
    const encoder = new TextEncoder();
    const cryptoKey = await crypto.subtle.importKey(
      'raw',
      encoder.encode(secret),
      { name: 'HMAC', hash: 'SHA-256' },
      false,
      ['sign']
    );
    const mac = await crypto.subtle.sign(
      'HMAC',
      cryptoKey,
      encoder.encode(`${objectKey}:${expiresAt}`)
    );
    return [...new Uint8Array(mac)]
      .map((b) => b.toString(16).padStart(2, '0'))
      .join('');
  }
}
Retention Policy
When a workload is terminated, storage objects are retained based on the configured retention period:
// storage/src/retention-worker.ts
/**
 * Applies the retention policy to a terminated workload's storage:
 * stamps every object that does not yet have an expiration with
 * now + retentionDays, then emits a STORAGE_CLEANUP_SCHEDULED event.
 *
 * @param env           Worker bindings (D1 + events queue).
 * @param workloadId    Workload whose objects should be stamped.
 * @param retentionDays Days to keep objects after termination.
 */
export async function processStorageRetention(
  env: Env,
  workloadId: string,
  retentionDays: number
): Promise<void> {
  const msPerDay = 24 * 60 * 60 * 1000;
  const expiresAt = Date.now() + retentionDays * msPerDay;

  // Stamp only rows without an expiration, so re-running is idempotent.
  const stamp = env.DB.prepare(`
    UPDATE workload_storage_objects
    SET expires_at = ?
    WHERE workload_id = ? AND expires_at IS NULL
  `);
  await stamp.bind(expiresAt, workloadId).run();

  // Announce the pending cleanup so downstream consumers can act on it.
  const message = {
    type: 'STORAGE_CLEANUP_SCHEDULED',
    workloadId,
    expiresAt,
    scheduledAt: Date.now(),
  };
  await env.EVENTS_QUEUE.send(message);
}
/**
 * Cron entry point: permanently removes storage objects whose retention
 * window has elapsed. Processes at most 1000 objects per invocation;
 * remaining rows are picked up by subsequent runs.
 *
 * FIX: the previous version SELECTed at most 1000 expired rows, deleted
 * those objects from R2, then ran a blanket time-based DELETE that removed
 * ALL expired rows — including rows beyond the 1000-row batch whose R2
 * objects were never deleted, orphaning that data in the bucket. DB rows
 * are now deleted per fetched object only.
 */
export async function cleanupExpiredStorage(env: Env): Promise<void> {
  const now = Date.now();
  // Find a bounded batch of expired objects.
  const expired = await env.DB.prepare(`
    SELECT workload_id, object_key
    FROM workload_storage_objects
    WHERE expires_at IS NOT NULL AND expires_at < ?
    LIMIT 1000
  `).bind(now).all();
  if (expired.results.length === 0) {
    return;
  }
  // Delete the blobs from R2 in one call — the binding's delete() accepts
  // up to 1000 keys per request.
  const keys = expired.results.map(
    (obj) => `workloads/${obj.workload_id}/${obj.object_key}`
  );
  await env.WORKLOAD_STORAGE.delete(keys);
  // Remove only the rows whose R2 objects were actually deleted above.
  const del = env.DB.prepare(
    'DELETE FROM workload_storage_objects WHERE workload_id = ? AND object_key = ?'
  );
  await env.DB.batch(
    expired.results.map((obj) => del.bind(obj.workload_id, obj.object_key))
  );
}