Workload Lifecycle Management
6.1 Lifecycle State Machine
┌─────────────────────────────────────────────────────────────────────────────┐
│                      WORKLOAD LIFECYCLE STATE MACHINE                       │
│                                                                             │
│                                ┌──────────┐                                 │
│                                │ PENDING  │                                 │
│                                └─────┬────┘                                 │
│                                      │                                      │
│                              [Placement found]                              │
│                                      │                                      │
│                                      ▼                                      │
│                              ┌──────────────┐                               │
│                ┌─────────────│ PROVISIONING │──────────────┐                │
│                │             └───────┬──────┘              │                │
│                │                     │                     │                │
│       [Provision failed]     [Instance ready]          [Timeout]            │
│                │                     │                     │                │
│                ▼                     ▼                     ▼                │
│          ┌──────────┐          ┌──────────┐          ┌──────────┐           │
│          │  FAILED  │          │ RUNNING  │          │  FAILED  │           │
│          └──────────┘          └─────┬────┘          └──────────┘           │
│                                      │                                      │
│                ┌─────────────────────┼─────────────────────┐                │
│                │                     │                     │                │
│        [User terminate]      [Spot interrupt]     [Health check fail]       │
│                │                     │                     │                │
│                ▼                     ▼                     ▼                │
│          ┌──────────┐         ┌─────────────┐        ┌───────────┐          │
│          │ STOPPING │         │ INTERRUPTED │        │ UNHEALTHY │          │
│          └─────┬────┘         └──────┬──────┘        └─────┬─────┘          │
│                │                     │                     │                │
│                │            [Cleanup complete]             │                │
│                │                     │                     │                │
│                ▼                     ▼                     ▼                │
│          ┌───────────────────────────────────────────────────────┐          │
│          │                      TERMINATED                       │          │
│          └───────────────────────────────────────────────────────┘          │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
6.2 Workload Provisioning Workflow
// orchestrator/src/provision-workflow.ts
export class ProvisionWorkflow extends WorkflowEntrypoint<Env, ProvisionRequest> {
  /**
   * Provisions a spot instance for a workload and moves it from PROVISIONING to
   * RUNNING. The stages are: quota check, placement, instance creation,
   * readiness and health waits, routing registration, monitoring start, and
   * customer notification.
   *
   * If instance creation, the readiness/health waits, or routing registration
   * ultimately fails (after step retries), the workload is marked FAILED. Any
   * instance already created is terminated before the original error is
   * rethrown, so the workload is not left stuck in PROVISIONING with a live
   * instance.
   *
   * @returns Summary of the chosen placement and the workload's public hostname.
   * @throws Error if the customer does not exist, its concurrent-workload quota
   *   is exhausted, or a provisioning step fails.
   */
  async run(event: WorkflowEvent<ProvisionRequest>, step: WorkflowStep) {
    const { customerId, workloadId, requirements, config } = event.payload;
    const healthCheckPort = config.healthCheckPort || 8080;
    const publicEndpoint = `${workloadId}.workloads.computearbitrage.com`;

    // Step 1: Validate customer quotas.
    // NOTE(review): the check and the PROVISIONING update (step 3) are separate
    // steps, so two workflows for the same customer can still race past the
    // quota. A quota rejection is also retried like any other step error;
    // consider a non-retryable error for it.
    await step.do('validate-quotas', async () => {
      const customer = await this.env.DB.prepare(
        'SELECT max_concurrent_workloads FROM customers WHERE id = ?'
      ).bind(customerId).first();
      if (!customer) {
        throw new Error(`Customer not found: ${customerId}`);
      }
      // Workloads that are still provisioning hold capacity too; counting only
      // RUNNING let concurrent provisions exceed the quota.
      const active = await this.env.DB.prepare(
        'SELECT COUNT(*) as count FROM workloads WHERE customer_id = ? AND status IN (?, ?)'
      ).bind(customerId, 'PROVISIONING', 'RUNNING').first();
      if (Number(active?.count ?? 0) >= Number(customer.max_concurrent_workloads)) {
        throw new Error(`Quota exceeded: max ${customer.max_concurrent_workloads} concurrent workloads`);
      }
    });

    // Step 2: Find optimal placement via the single 'global' arbitrage engine object.
    // NOTE(review): assumes findOptimalPlacement returns a placement or throws;
    // confirm it never resolves to null/undefined.
    const placement = await step.do('find-placement', async () => {
      const engine = this.env.ARBITRAGE_ENGINE.get(
        this.env.ARBITRAGE_ENGINE.idFromName('global')
      );
      return engine.findOptimalPlacement(requirements);
    });

    // Step 3: Update status to PROVISIONING and record the start time.
    await step.do('update-status-provisioning', async () => {
      await this.env.DB.prepare(
        'UPDATE workloads SET status = ?, started_at = ? WHERE id = ?'
      ).bind('PROVISIONING', Date.now(), workloadId).run();
    });

    // Step 4: Provision cloud resources. If the step still fails after its
    // retries, mark the workload FAILED before the error propagates.
    // NOTE(review): if a create call succeeds but its response is lost, a retry
    // could launch a second instance; confirm createSpotInstance is idempotent.
    const instance = await step
      .do('provision-cloud-resources', {
        retries: { limit: 3, delay: '10 seconds', backoff: 'exponential' },
      }, async () => {
        const provisioner = new CloudProvisioner(this.env, placement.provider);
        return provisioner.createSpotInstance({
          region: placement.region,
          availabilityZone: placement.availabilityZone,
          instanceType: placement.instanceType,
          // NOTE(review): getDefaultImage / generateUserData are not defined in
          // this class as shown; confirm where they are provided.
          imageId: config.imageId || await this.getDefaultImage(placement.provider),
          userData: this.generateUserData(config),
          tags: {
            'workload-id': workloadId,
            'customer-id': customerId,
            'managed-by': 'compute-arbitrage',
          },
          spotOptions: {
            maxPrice: placement.pricePerHour * 1.1, // 10% buffer over the quoted price
            interruptionBehavior: 'terminate',
          },
        });
      })
      .catch(async (error: unknown) => {
        await this.markProvisionFailed(step, workloadId);
        throw error;
      });

    try {
      // Step 5: Wait for the instance to report 'running'. It polls every 5 s,
      // 60 times (about 5 minutes per step attempt).
      await step.do('wait-for-instance', async () => {
        const provisioner = new CloudProvisioner(this.env, placement.provider);
        for (let attempt = 0; attempt < 60; attempt++) {
          const status = await provisioner.getInstanceStatus(instance.instanceId);
          if (status === 'running') {
            return;
          }
          if (status === 'terminated' || status === 'failed') {
            throw new Error(`Instance failed to start: ${status}`);
          }
          await new Promise(resolve => setTimeout(resolve, 5000));
        }
        throw new Error('Timeout waiting for instance to start');
      });

      // Step 6: Wait for the workload's /health endpoint to return 2xx.
      // NOTE(review): instance.publicIP comes from the create response; confirm
      // it is populated before the instance is running. Also confirm that the
      // Workers runtime permits fetches to a raw IP on this port.
      await step.do('wait-for-workload-ready', async () => {
        const healthEndpoint = `http://${instance.publicIP}:${healthCheckPort}/health`;
        for (let attempt = 0; attempt < 60; attempt++) { // ~5 minutes per step attempt
          try {
            // Bound each probe so an unresponsive host cannot stall the loop.
            const response = await fetch(healthEndpoint, { signal: AbortSignal.timeout(5000) });
            // Only the status matters; release the body instead of leaving it unread.
            await response.body?.cancel();
            if (response.ok) {
              return;
            }
          } catch {
            // Connection refused or probe timed out: workload not ready yet.
          }
          await new Promise(resolve => setTimeout(resolve, 5000));
        }
        throw new Error('Timeout waiting for workload health check');
      });

      // Step 7: Register routing and flip the workload to RUNNING.
      await step.do('register-routing', async () => {
        // KV entries used for fast routing lookups.
        await this.env.ROUTING_KV.put(
          `workload:${workloadId}:endpoint`,
          instance.publicIP
        );
        await this.env.ROUTING_KV.put(
          `workload:${workloadId}:status`,
          'RUNNING'
        );

        const now = Date.now();
        // D1 executes a batch as a single transaction. A failed attempt therefore
        // leaves none of these rows behind, e.g. no RUNNING status without an
        // instance record.
        await this.env.DB.batch([
          this.env.DB.prepare(`
            INSERT INTO workload_routing (workload_id, endpoint, provider, region, instance_id, updated_at)
            VALUES (?, ?, ?, ?, ?, ?)
            ON CONFLICT (workload_id) DO UPDATE SET
              endpoint = excluded.endpoint,
              provider = excluded.provider,
              region = excluded.region,
              instance_id = excluded.instance_id,
              updated_at = excluded.updated_at
          `).bind(
            workloadId,
            instance.publicIP,
            placement.provider,
            placement.region,
            instance.instanceId,
            now
          ),
          this.env.DB.prepare(
            'UPDATE workloads SET status = ? WHERE id = ?'
          ).bind('RUNNING', workloadId),
          this.env.DB.prepare(`
            INSERT INTO workload_instances
              (id, workload_id, provider, region, instance_type, instance_id, public_ip, status, spot_price, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
          `).bind(
            crypto.randomUUID(),
            workloadId,
            placement.provider,
            placement.region,
            placement.instanceType,
            instance.instanceId,
            instance.publicIP,
            'RUNNING',
            placement.pricePerHour,
            now
          ),
        ]);
      });
    } catch (error) {
      await this.markProvisionFailed(step, workloadId);
      // Do not leave a cloud instance running for a workload that failed. If
      // termination itself keeps failing, that error surfaces instead, so the
      // leaked instance is visible.
      await step.do('terminate-failed-instance', {
        retries: { limit: 3, delay: '5 seconds' },
      }, async () => {
        const provisioner = new CloudProvisioner(this.env, placement.provider);
        await provisioner.terminateInstance(instance.instanceId, placement.region);
      });
      throw error;
    }

    // Step 8: Start health monitoring in the per-workload monitor object.
    await step.do('start-health-monitoring', async () => {
      const monitor = this.env.HEALTH_MONITOR.get(
        this.env.HEALTH_MONITOR.idFromName(workloadId)
      );
      await monitor.startMonitoring({
        workloadId,
        instanceId: instance.instanceId,
        provider: placement.provider,
        region: placement.region,
        endpoint: instance.publicIP,
        healthCheckPort,
      });
    });

    // Step 9: Notify the customer via the notification queue.
    await step.do('notify-customer', async () => {
      await this.env.NOTIFICATION_QUEUE.send({
        type: 'WORKLOAD_READY',
        customerId,
        workloadId,
        endpoint: publicEndpoint,
        provider: placement.provider,
        region: placement.region,
        instanceType: placement.instanceType,
        estimatedCostPerHour: placement.pricePerHour,
      });
    });

    return {
      success: true,
      workloadId,
      endpoint: publicEndpoint,
      provider: placement.provider,
      region: placement.region,
      instanceType: placement.instanceType,
      pricePerHour: placement.pricePerHour,
    };
  }

  /**
   * Marks a workload FAILED after a provisioning error. It also removes any KV
   * routing keys that the routing step may already have published.
   */
  private async markProvisionFailed(step: WorkflowStep, workloadId: string): Promise<void> {
    await step.do('mark-provision-failed', async () => {
      await this.env.DB.prepare(
        'UPDATE workloads SET status = ? WHERE id = ?'
      ).bind('FAILED', workloadId).run();
      await this.env.ROUTING_KV.delete(`workload:${workloadId}:endpoint`);
      await this.env.ROUTING_KV.delete(`workload:${workloadId}:status`);
    });
  }
}
6.3 Termination Workflow
// orchestrator/src/terminate-workflow.ts
export class TerminateWorkflow extends WorkflowEntrypoint<Env, TerminateRequest> {
  /**
   * Tears a workload down in this order: STOPPING, stop monitoring, terminate
   * the cloud instance, remove routing, TERMINATED, then enqueue a billing
   * record for the workload's most recent instance.
   *
   * @returns `{ success: true, workloadId }`. The same result is returned
   *   without doing any work when the workload is already TERMINATED.
   * @throws Error('Workload not found') if no workload row exists for the id.
   */
  async run(event: WorkflowEvent<TerminateRequest>, step: WorkflowStep) {
    const { workloadId, reason } = event.payload;

    // Step 1: Get workload details. This uses a LEFT JOIN because a workload
    // with no routing row (e.g. one that never reached RUNNING) must still be
    // terminable. Its provider/region/instance_id are then NULL. Only the
    // needed columns are selected so routing columns cannot collide with w.*.
    const workload = await step.do('get-workload', async () => {
      const row = await this.env.DB.prepare(`
        SELECT w.customer_id, w.status, wr.provider, wr.region, wr.instance_id
        FROM workloads w
        LEFT JOIN workload_routing wr ON w.id = wr.workload_id
        WHERE w.id = ?
      `).bind(workloadId).first();
      if (!row) {
        throw new Error('Workload not found');
      }
      return row;
    });

    // Teardown already completed in an earlier run. Without this guard the LEFT
    // JOIN above would let a re-run reach the billing step and enqueue a
    // second billing message.
    if (workload.status === 'TERMINATED') {
      return { success: true, workloadId };
    }

    // Step 2: Update status to STOPPING in D1 and KV.
    await step.do('update-status-stopping', async () => {
      await this.env.DB.prepare(
        'UPDATE workloads SET status = ? WHERE id = ?'
      ).bind('STOPPING', workloadId).run();
      await this.env.ROUTING_KV.put(`workload:${workloadId}:status`, 'STOPPING');
    });

    // Step 3: Stop health monitoring in the per-workload monitor object.
    await step.do('stop-health-monitoring', async () => {
      const monitor = this.env.HEALTH_MONITOR.get(
        this.env.HEALTH_MONITOR.idFromName(workloadId)
      );
      await monitor.stopMonitoring();
    });

    // Step 4: Terminate the cloud instance, but only if routing recorded one.
    if (workload.instance_id) {
      await step.do('terminate-instance', {
        retries: { limit: 3, delay: '5 seconds' },
      }, async () => {
        const provisioner = new CloudProvisioner(this.env, workload.provider);
        await provisioner.terminateInstance(workload.instance_id, workload.region);
      });
    }

    // Step 5: Remove routing from KV and D1.
    await step.do('remove-routing', async () => {
      await this.env.ROUTING_KV.delete(`workload:${workloadId}:endpoint`);
      await this.env.ROUTING_KV.delete(`workload:${workloadId}:status`);
      await this.env.DB.prepare(
        'DELETE FROM workload_routing WHERE workload_id = ?'
      ).bind(workloadId).run();
    });

    // Step 6: Record the final status. One timestamp is used for both rows, and
    // they are committed as a single D1 batch (transaction).
    await step.do('update-final-status', async () => {
      const terminatedAt = Date.now();
      await this.env.DB.batch([
        // D1 rejects `undefined` bind values, so an omitted reason is stored as NULL.
        this.env.DB.prepare(`
          UPDATE workloads SET status = ?, terminated_at = ?, termination_reason = ?
          WHERE id = ?
        `).bind('TERMINATED', terminatedAt, reason ?? null, workloadId),
        this.env.DB.prepare(`
          UPDATE workload_instances SET status = ?, terminated_at = ?
          WHERE workload_id = ? AND status = 'RUNNING'
        `).bind('TERMINATED', terminatedAt, workloadId),
      ]);
    });

    // Step 7: Calculate and enqueue billing for the most recent instance.
    await step.do('record-billing', async () => {
      const instance = await this.env.DB.prepare(`
        SELECT * FROM workload_instances WHERE workload_id = ? ORDER BY created_at DESC LIMIT 1
      `).bind(workloadId).first();
      if (!instance) {
        // No instance row exists for this workload, so there is nothing to bill.
        return;
      }
      const createdAt = Number(instance.created_at);
      // Step 6 only stamps rows that were RUNNING, so terminated_at may be NULL
      // here. NULL used to coerce to 0 and yield a negative duration. Fall back
      // to the current time and never bill a negative duration.
      const endedAt = instance.terminated_at == null ? Date.now() : Number(instance.terminated_at);
      const durationHours = Math.max(0, endedAt - createdAt) / 3600000;
      const spotPricePerHour = Number(instance.spot_price);
      const totalCost = durationHours * spotPricePerHour;
      // NOTE(review): only the latest instance row is billed; confirm a workload
      // never has several billable instances.
      await this.env.BILLING_QUEUE.send({
        customerId: workload.customer_id,
        workloadId,
        provider: instance.provider,
        region: instance.region,
        instanceType: instance.instance_type,
        durationHours,
        spotPricePerHour,
        totalCost,
      });
    });

    return { success: true, workloadId };
  }
}