Core Components
2.1 Routing Layer
The routing layer gives users a single, stable endpoint for reaching their workloads, regardless of which cloud provider or region the underlying infrastructure is currently running in.
HTTP Router (Cloudflare Workers)
// http-router/src/index.ts
export default {
  /**
   * HTTP entry point of the router Worker.
   *
   * Authenticates the request, resolves the workload's current backend from
   * `ROUTING_KV`, and proxies the request to it with tracing headers added.
   *
   * Responses produced by the router itself:
   * - 401 when `authenticateRequest` reports the request as invalid.
   * - 503 (with `Retry-After`) when `workload:<id>:status` is `'INTERRUPTED'`.
   * - 404 when no `workload:<id>:endpoint` entry exists.
   * - 502 when the backend cannot be reached.
   *
   * @param request Incoming client request.
   * @param env     Worker bindings; `ROUTING_KV` is read here.
   * @returns The backend's response, or one of the router responses above.
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);
    const workloadId = extractWorkloadId(url, request.headers);

    // Authenticate request
    const auth = await authenticateRequest(request, env);
    if (!auth.valid) {
      return new Response('Unauthorized', { status: 401 });
    }

    // Look up the workload's status and current endpoint in one round trip.
    const [status, endpoint] = await Promise.all([
      env.ROUTING_KV.get(`workload:${workloadId}:status`),
      env.ROUTING_KV.get(`workload:${workloadId}:endpoint`),
    ]);

    // The health monitor flags interrupted spot instances under the status
    // key; do not forward traffic to a backend that is known to be gone.
    // NOTE(review): the status key expires on its own TTL. Whatever
    // re-provisions a workload should clear it, otherwise this 503 persists
    // until the key expires.
    if (status === 'INTERRUPTED') {
      return new Response('Workload temporarily unavailable', {
        status: 503,
        // Chosen default; tune to the expected re-provisioning time.
        headers: { 'Retry-After': '30' },
      });
    }

    if (!endpoint) {
      return new Response('Workload not found', { status: 404 });
    }

    // Keep path, query, scheme and port; only the host is swapped.
    const targetUrl = new URL(request.url);
    targetUrl.hostname = endpoint;

    // Forward request with tracing headers
    try {
      return await fetch(new Request(targetUrl, {
        method: request.method,
        headers: addTracingHeaders(request.headers, workloadId),
        body: request.body,
      }));
    } catch (err: unknown) {
      // A network-level failure reaching the backend is reported as a
      // gateway error instead of an unhandled Worker exception.
      console.error(`Upstream fetch failed for workload ${workloadId}`, err);
      return new Response('Bad gateway', { status: 502 });
    }
  }
};
TCP Router (Spectrum + Durable Objects)
For TCP traffic, we use Cloudflare Spectrum combined with Durable Objects for connection state management.
// tcp-router/src/connection-manager.ts
/**
 * Durable Object that tracks active TCP connections and the backend each one
 * was routed to. State is kept both in an in-memory map and in the object's
 * SQLite storage (`connections` table).
 */
export class ConnectionManager extends DurableObject {
  private connections: Map<string, ConnectionState> = new Map();

  /**
   * Resolves the backend for a new connection and records the connection.
   *
   * @param connectionId Identifier of the new connection.
   * @param workloadId   Workload the connection targets.
   * @returns The backend endpoint read from `ROUTING_KV`.
   * @throws Error when no endpoint is registered for the workload.
   */
  async handleConnection(connectionId: string, workloadId: string): Promise<string> {
    // Get current backend endpoint
    const endpoint = await this.env.ROUTING_KV.get(`workload:${workloadId}:endpoint`);
    if (!endpoint) {
      // Without this guard a null backend would be persisted and returned,
      // contradicting the declared `Promise<string>` return type.
      throw new Error(`No backend endpoint registered for workload ${workloadId}`);
    }

    // One timestamp so the in-memory and persisted records agree.
    const connectedAt = Date.now();

    // Persist first: if the insert fails, the in-memory map is left untouched
    // rather than holding an entry that was never made durable.
    await this.ctx.storage.sql.exec(
      `INSERT INTO connections (id, workload_id, backend, connected_at)
       VALUES (?, ?, ?, ?)`,
      connectionId, workloadId, endpoint, connectedAt
    );

    // Track active connection
    this.connections.set(connectionId, {
      workloadId,
      backend: endpoint,
      connectedAt,
    });

    return endpoint;
  }

  /**
   * Forgets a connection, removing it from both the in-memory map and the
   * `connections` table.
   *
   * @param connectionId Identifier of the connection that closed.
   */
  async handleDisconnect(connectionId: string): Promise<void> {
    this.connections.delete(connectionId);
    await this.ctx.storage.sql.exec(
      `DELETE FROM connections WHERE id = ?`,
      connectionId
    );
  }
}
2.2 Price Arbitrage Engine
The arbitrage engine continuously monitors spot prices across all three cloud providers (AWS, Azure, and GCP) and selects the optimal placement for each workload.
// arbitrage-engine/src/price-collector.ts
/**
 * Durable Object that periodically collects spot prices from AWS, Azure and
 * GCP, stores them, and answers placement queries against the stored prices.
 */
export class PriceCollector extends DurableObject {
  private prices: Map<string, SpotPrice[]> = new Map();
  private lastUpdate: number = 0;

  /**
   * Alarm handler: runs one collection cycle and schedules the next one.
   *
   * A failed cycle is logged and skipped, and the next alarm is always set.
   * Letting the error escape would leave the loop dependent on the runtime's
   * bounded alarm retries, after which no further collection would happen.
   */
  async alarm(): Promise<void> {
    try {
      // Collect prices from all providers in parallel
      // NOTE(review): a single failing provider still aborts the whole cycle.
      // Switching to Promise.allSettled would allow partial updates, but only
      // if storePrices/updatePriceCache tolerate a partial set — confirm.
      const [awsPrices, azurePrices, gcpPrices] = await Promise.all([
        this.collectAWSPrices(),
        this.collectAzurePrices(),
        this.collectGCPPrices(),
      ]);

      // Normalize and store prices
      const normalizedPrices = this.normalizePrices([
        ...awsPrices,
        ...azurePrices,
        ...gcpPrices,
      ]);

      // Store in SQLite for querying
      await this.storePrices(normalizedPrices);

      // Update KV cache for fast lookups
      await this.updatePriceCache(normalizedPrices);

      this.lastUpdate = Date.now();
    } catch (err: unknown) {
      console.error('Spot price collection cycle failed', err);
    } finally {
      // Schedule next collection (every 60 seconds)
      await this.ctx.storage.setAlarm(Date.now() + 60000);
    }
  }

  /**
   * Picks a placement for a workload from the stored spot prices.
   *
   * Selects up to the 10 cheapest rows of `spot_prices` that satisfy the CPU,
   * memory, GPU and interruption-rate constraints, then delegates the final
   * choice to `scoreCandidates`.
   *
   * @param requirements Resource requirements. `gpuCount` defaults to 0 and
   *                     `maxInterruptionRate` to 0.2 when not provided.
   * @returns The decision produced by `scoreCandidates`.
   */
  async findOptimalPlacement(requirements: WorkloadRequirements): Promise<PlacementDecision> {
    // `??` rather than `||`: an explicit maxInterruptionRate of 0 must not be
    // silently relaxed to the 0.2 default.
    const minGpuCount = requirements.gpuCount ?? 0;
    const maxInterruptionRate = requirements.maxInterruptionRate ?? 0.2;

    // Query available options matching requirements
    const candidates = await this.ctx.storage.sql.exec(`
      SELECT provider, region, instance_type, price_per_hour, availability_score
      FROM spot_prices
      WHERE vcpu >= ? AND memory_gb >= ? AND gpu_count >= ?
      AND interruption_rate < ?
      ORDER BY price_per_hour ASC
      LIMIT 10
    `, requirements.minCPU, requirements.minMemoryGB,
      minGpuCount, maxInterruptionRate);

    // Score candidates based on price, reliability, and region preference
    return this.scoreCandidates(candidates, requirements);
  }
}
2.3 Workload Orchestrator (Cloudflare Workflows)
The long-running workload lifecycle is managed via Cloudflare Workflows, so that each provisioning step is durable and survives restarts.
// orchestrator/src/workload-workflow.ts
/**
 * Workflow that provisions a workload end to end: choose a placement,
 * provision the instance, wait for readiness, deploy, register routing and
 * start health monitoring. Each stage runs as a named workflow step.
 */
export class WorkloadProvisioningWorkflow extends WorkflowEntrypoint<Env, WorkloadRequest> {
  /**
   * Runs the provisioning sequence for one workload request.
   *
   * @param event Workflow event whose payload carries `workloadId`,
   *              `requirements` and `workloadConfig`.
   * @param step  Workflow step API used to run each stage.
   * @returns Where the workload ended up and its estimated hourly cost.
   */
  async run(event: WorkflowEvent<WorkloadRequest>, step: WorkflowStep): Promise<WorkloadResult> {
    const { workloadId, requirements, workloadConfig } = event.payload;

    // Step 1: Find optimal placement
    const placement = await step.do('find-optimal-placement', async () => {
      const arbitrageEngine = this.env.ARBITRAGE_ENGINE.get(
        this.env.ARBITRAGE_ENGINE.idFromName('global')
      );
      return arbitrageEngine.findOptimalPlacement(requirements);
    });

    // Step 2: Provision infrastructure via Kivera proxy
    const instance = await step.do('provision-instance', async () => {
      return this.provisionInstance(placement, workloadConfig);
    });

    // Step 3: Wait for instance to be ready
    await step.do('wait-for-ready', async () => {
      return this.waitForInstanceReady(instance, { timeout: '5 minutes' });
    });

    // Step 4: Deploy workload
    const deployment = await step.do('deploy-workload', async () => {
      return this.deployWorkload(instance, workloadConfig);
    });

    // Step 5: Register routing
    await step.do('register-routing', async () => {
      await this.env.ROUTING_KV.put(
        `workload:${workloadId}:endpoint`,
        instance.publicIP,
        { expirationTtl: 86400 }
      );

      // The health monitor writes 'INTERRUPTED' under this key when a spot
      // instance is reclaimed. Clear any leftover value so it does not
      // outlive the endpoint that was just registered.
      await this.env.ROUTING_KV.delete(`workload:${workloadId}:status`);

      // Also store in D1 for querying
      // NOTE(review): if this step is retried after the insert succeeded, a
      // second row is inserted — confirm the table's key/uniqueness handling.
      await this.env.DB.prepare(`
        INSERT INTO workload_routing (workload_id, endpoint, provider, region, instance_id)
        VALUES (?, ?, ?, ?, ?)
      `).bind(
        workloadId,
        instance.publicIP,
        placement.provider,
        placement.region,
        instance.instanceId
      ).run();
    });

    // Step 6: Start health monitoring
    await step.do('start-monitoring', async () => {
      // Schedule health checks via Durable Object alarm
      const monitor = this.env.HEALTH_MONITOR.get(
        this.env.HEALTH_MONITOR.idFromName(workloadId)
      );
      await monitor.startMonitoring(instance);
    });

    return {
      workloadId,
      endpoint: instance.publicIP,
      provider: placement.provider,
      region: placement.region,
      instanceType: placement.instanceType,
      estimatedCostPerHour: placement.pricePerHour,
    };
  }

  /**
   * Dispatches provisioning to the provider-specific helper, passing the
   * Kivera proxy endpoint through to it.
   *
   * @param placement Placement decision; `provider` selects the helper.
   * @param config    Workload configuration forwarded to the helper.
   * @returns Details of the provisioned instance.
   * @throws Error when `placement.provider` is not 'aws', 'azure' or 'gcp'.
   */
  private async provisionInstance(
    placement: PlacementDecision,
    config: WorkloadConfig
  ): Promise<InstanceDetails> {
    // Route through Kivera proxy for policy enforcement
    const kiveraEndpoint = this.env.KIVERA_PROXY_ENDPOINT;

    switch (placement.provider) {
      case 'aws':
        return this.provisionAWSSpotInstance(placement, config, kiveraEndpoint);
      case 'azure':
        return this.provisionAzureSpotVM(placement, config, kiveraEndpoint);
      case 'gcp':
        return this.provisionGCPPreemptibleVM(placement, config, kiveraEndpoint);
      default:
        // Fail loudly instead of resolving to undefined and breaking the
        // later steps that dereference `instance`.
        throw new Error(`Unsupported cloud provider: ${String(placement.provider)}`);
    }
  }
}
2.4 Health Monitor
The health monitor checks each workload's instance every 30 seconds and detects spot interruptions.
// health-monitor/src/index.ts
/**
 * Durable Object that health-checks one workload's instance on a 30-second
 * alarm and reacts to spot interruptions and repeated failures.
 *
 * The monitored instance, the workload ID and the failure counter are kept in
 * durable storage so they survive the object being evicted between alarms.
 */
export class HealthMonitor extends DurableObject {
  private workloadId?: string;
  private instance?: InstanceDetails;
  private consecutiveFailures: number = 0;

  /**
   * Begins monitoring an instance and schedules the first health check.
   *
   * @param instance   Instance to monitor.
   * @param workloadId Optional workload ID used for the routing status key
   *                   and notifications. When omitted, the object's own ID
   *                   name is used if the runtime exposes it.
   */
  async startMonitoring(instance: InstanceDetails, workloadId?: string): Promise<void> {
    this.instance = instance;
    this.consecutiveFailures = 0;
    await this.ctx.storage.put('instance', instance);
    await this.ctx.storage.put('consecutiveFailures', 0);

    // NOTE(review): `ctx.id.name` may not be populated inside the object on
    // every runtime version; callers should pass `workloadId` explicitly.
    const resolvedWorkloadId = workloadId ?? this.ctx.id.name;
    if (resolvedWorkloadId) {
      this.workloadId = resolvedWorkloadId;
      await this.ctx.storage.put('workloadId', resolvedWorkloadId);
    }

    // Start health check alarm (every 30 seconds)
    await this.ctx.storage.setAlarm(Date.now() + 30000);
  }

  /**
   * Alarm handler: performs one health check and schedules the next.
   *
   * Does nothing when no instance is stored. A spot interruption is handled
   * immediately and stops further checks; otherwise the instance is marked
   * unhealthy once three or more consecutive checks have failed.
   */
  async alarm(): Promise<void> {
    const instance = await this.ctx.storage.get<InstanceDetails>('instance');
    if (!instance) return;

    // Rehydrate state that an eviction would have wiped from memory.
    if (!this.workloadId) {
      this.workloadId =
        (await this.ctx.storage.get<string>('workloadId')) ?? this.ctx.id.name;
    }
    const storedFailures = await this.ctx.storage.get<number>('consecutiveFailures');
    if (storedFailures !== undefined) {
      this.consecutiveFailures = storedFailures;
    }

    // Check instance health
    const healthResult = await this.checkHealth(instance);

    if (!healthResult.healthy) {
      this.consecutiveFailures++;
      await this.ctx.storage.put('consecutiveFailures', this.consecutiveFailures);

      // Check if this is a spot interruption
      if (healthResult.spotInterruption) {
        await this.handleSpotInterruption(instance);
        return;
      }

      // After 3 consecutive failures, mark unhealthy
      if (this.consecutiveFailures >= 3) {
        await this.markUnhealthy(instance);
      }
    } else {
      // Only write when the counter actually changes, to avoid a storage
      // write on every healthy check.
      if (this.consecutiveFailures !== 0) {
        this.consecutiveFailures = 0;
        await this.ctx.storage.put('consecutiveFailures', 0);
      }
    }

    // Schedule next check
    await this.ctx.storage.setAlarm(Date.now() + 30000);
  }

  /**
   * Reacts to a spot interruption: notifies the user, flags the workload as
   * `'INTERRUPTED'` in `ROUTING_KV` for one hour, and starts cleanup.
   *
   * When the workload ID is unknown, the notification and the routing update
   * are skipped (both are keyed on it) and an error is logged. Cleanup still
   * runs.
   *
   * @param instance The interrupted instance.
   */
  private async handleSpotInterruption(instance: InstanceDetails): Promise<void> {
    const workloadId = this.workloadId;

    if (workloadId) {
      // Notify user via webhook/email
      await this.notifyUser({
        type: 'SPOT_INTERRUPTION',
        workloadId,
        instance: instance,
        message: 'Spot instance interrupted by cloud provider',
      });

      // Update routing to return 503 with retry-after
      await this.env.ROUTING_KV.put(
        `workload:${workloadId}:status`,
        'INTERRUPTED',
        { expirationTtl: 3600 }
      );
    } else {
      // Writing `workload:undefined:status` would flag nothing useful.
      console.error(
        'HealthMonitor: workload ID unknown; skipping notification and routing update'
      );
    }

    // Clean up resources
    await this.initiateCleanup(instance);
  }
}