Price Arbitrage Engine
5.1 Price Collection Architecture
┌───────────────────────────────────────────────────────────────────────────┐
│ PRICE COLLECTION FLOW │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ AWS Price │ │ Azure Price │ │ GCP Price │ │
│ │ Feed Worker │ │ Feed Worker │ │ Feed Worker │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ │ (CRON: every minute, */1 * * * *) │ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ Price Aggregator (Durable Object) │ │
│ │ │ │
│ │ ┌────────────────┐ ┌────────────────┐ ┌────────────────┐ │ │
│ │ │ Normalize │ │ Score │ │ Cache Best │ │ │
│ │ │ Prices │─▶│ Availability │─▶│ Options │ │ │
│ │ └────────────────┘ └────────────────┘ └────────────────┘ │ │
│ │ │ │
│ │ SQLite: spot_prices table │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────┐ │
│ │ KV Cache │ │
│ │ (Fast Lookups) │ │
│ └────────────────┘ │
└───────────────────────────────────────────────────────────────────────────┘
5.2 Price Sources
// price-feeds/src/aws.ts
/**
 * Collects recent AWS EC2 spot prices for Linux/UNIX instances.
 *
 * One `DescribeSpotPriceHistory` request is issued per region, asking for the
 * last hour of history. The regions are independent of each other, so the
 * requests are sent concurrently instead of one after another; the per-region
 * results are then concatenated in the order of the region list, which keeps
 * the output ordering identical to a sequential walk.
 *
 * Every history record returned by the API becomes one `SpotPrice`; records
 * are not de-duplicated here, so several entries for the same zone and
 * instance type can appear when the price changed during the window.
 *
 * NOTE(review): the EC2 API pages this operation via `NextToken`. Nothing in
 * this function follows it — confirm that `callAWSAPI` does, otherwise busy
 * regions will be truncated.
 *
 * @param env - Worker bindings; `AWS_CREDENTIALS` and `KIVERA_PROXY_ENDPOINT` are read.
 * @returns Normalized spot prices for all regions, grouped by region.
 * @throws Rejects with the first per-region failure raised by `callAWSAPI`.
 */
export async function collectAWSSpotPrices(env: Env): Promise<SpotPrice[]> {
  const regions = ['us-east-1', 'us-west-2', 'eu-west-1', 'ap-southeast-1', /* ... */];

  // Same look-back window (one hour) for every region.
  const startTime = new Date(Date.now() - 3600000).toISOString();

  const pricesByRegion = await Promise.all(
    regions.map(async (region): Promise<SpotPrice[]> => {
      // Call AWS EC2 DescribeSpotPriceHistory API
      const response = await callAWSAPI({
        service: 'ec2',
        region,
        action: 'DescribeSpotPriceHistory',
        params: {
          ProductDescriptions: ['Linux/UNIX'],
          StartTime: startTime,
        },
        credentials: env.AWS_CREDENTIALS,
        // Route through Kivera for audit
        proxy: env.KIVERA_PROXY_ENDPOINT,
      });

      const regionPrices: SpotPrice[] = [];
      for (const item of response.SpotPriceHistory) {
        regionPrices.push({
          provider: 'aws',
          region,
          availabilityZone: item.AvailabilityZone,
          instanceType: item.InstanceType,
          // The API reports the price as a decimal string.
          pricePerHour: parseFloat(item.SpotPrice),
          timestamp: new Date(item.Timestamp).getTime(),
          ...getInstanceSpecs(item.InstanceType), // vcpu, memory, gpu
        });
      }
      return regionPrices;
    })
  );

  // Promise.all keeps input order, so this matches the region list order.
  return pricesByRegion.flat();
}
// price-feeds/src/azure.ts
/**
 * Collects Azure Spot virtual-machine prices from the public Azure Retail
 * Prices API.
 *
 * The API returns results in pages and exposes the URL of the following page
 * in `NextPageLink`; this function keeps requesting pages until that link is
 * absent, so the result covers the whole filtered catalog rather than only
 * the first page. Each item is converted with `normalizeAzurePrice`.
 *
 * NOTE(review): every page is a separate subrequest — confirm the total stays
 * within the platform's per-invocation subrequest budget.
 *
 * @param env - Worker bindings (not read; the endpoint needs no credentials).
 * @returns Normalized spot prices from every result page, in API order.
 * @throws Error when any page request returns a non-2xx HTTP status.
 */
export async function collectAzureSpotPrices(env: Env): Promise<SpotPrice[]> {
  // Azure Retail Prices API
  const firstPageUrl =
    'https://prices.azure.com/api/retail/prices?' +
    '$filter=serviceName eq \'Virtual Machines\' and priceType eq \'Consumption\' and contains(skuName, \'Spot\')';

  const prices: SpotPrice[] = [];
  let pageUrl: string | null = firstPageUrl;

  while (pageUrl) {
    // Explicit annotation keeps the compiler from chasing the loop-carried
    // `pageUrl` assignment while inferring this type.
    const response: Response = await fetch(pageUrl, {
      headers: { 'Accept': 'application/json' },
    });

    // Fail loudly instead of trying to parse an error payload as prices.
    if (!response.ok) {
      throw new Error(
        `Azure Retail Prices API request failed: ${response.status} ${response.statusText}`
      );
    }

    const page = await response.json();
    for (const price of page.Items.map(normalizeAzurePrice)) {
      prices.push(price);
    }

    // A missing, null or empty link marks the last page.
    pageUrl = page.NextPageLink || null;
  }

  return prices;
}
// price-feeds/src/gcp.ts
/**
 * Collects GCP preemptible (spot) prices from the Cloud Billing Catalog API.
 *
 * Lists the SKUs of the Compute Engine service through `callGCPAPI`, keeps the
 * ones whose usage type is `Preemptible`, and converts each of them with
 * `normalizeGCPPrice`.
 *
 * @param env - Worker bindings; `GCP_CREDENTIALS` and `KIVERA_PROXY_ENDPOINT` are read.
 * @returns Normalized prices for the preemptible SKUs in the response.
 */
export async function collectGCPSpotPrices(env: Env): Promise<SpotPrice[]> {
  const { GCP_CREDENTIALS: credentials, KIVERA_PROXY_ENDPOINT: proxy } = env;

  // GCP Cloud Billing Catalog API
  const catalog = await callGCPAPI({
    service: 'cloudbilling',
    method: 'services.skus.list',
    // Service ID of Compute Engine
    params: { parent: 'services/6F81-5844-456A' },
    credentials,
    proxy,
  });

  // Only preemptible SKUs are relevant for spot pricing.
  const preemptibleSkus = catalog.skus.filter(sku => {
    return sku.category.usageType === 'Preemptible';
  });

  return preemptibleSkus.map(normalizeGCPPrice);
}
5.3 Optimal Placement Algorithm
// arbitrage-engine/src/placement.ts
/**
 * Resource and cost constraints a workload places on candidate instances.
 * Read by `findOptimalPlacement` (filtering) and `calculateScore` (efficiency).
 */
interface WorkloadRequirements {
/** Minimum vCPU count; candidates need `vcpu >= minCPU`. Also the numerator of the CPU efficiency ratio. */
minCPU: number;
/** Minimum memory in GB; candidates need `memoryGB >= minMemoryGB`. Also the numerator of the memory efficiency ratio. */
minMemoryGB: number;
/** Minimum GPU count. When unset or 0, no GPU-count constraint is applied. */
gpuCount?: number;
/** Required GPU model; candidates must match it exactly. When unset, any GPU type is accepted. */
gpuType?: 'nvidia-t4' | 'nvidia-a100' | 'nvidia-v100';
/**
 * Soft region preference: candidates are narrowed to these regions only when
 * at least one candidate is located there; otherwise the list is ignored.
 */
preferredRegions?: string[];
/** Upper bound on the hourly price. When unset or 0, no price cap is applied. */
maxPricePerHour?: number;
/**
 * Upper bound on a candidate's `interruptionRate`. When unset or 0, no cap is applied.
 * NOTE(review): presumably a fraction in [0, 1], matching how the score uses the rate — confirm.
 */
maxInterruptionRate?: number;
/** NOTE(review): not read by the placement code visible in this section. */
networkBandwidthGbps?: number;
}
/**
 * Result of `findOptimalPlacement`: the selected offer plus runner-up offers.
 */
interface PlacementDecision {
/** Cloud provider of the selected offer. */
provider: 'aws' | 'azure' | 'gcp';
/** Provider region of the selected offer. */
region: string;
/** Availability zone, when the price source reports one. */
availabilityZone?: string;
/** Provider-specific instance type name. */
instanceType: string;
/** Hourly spot price of the offer (presumably USD — confirm against the price feeds). */
pricePerHour: number;
/** Value returned by `calculateScore` for this offer; higher is better. */
score: number;
/** Up to three next-best offers, in descending score order. */
alternatives: PlacementDecision[];
}
/**
 * Selects the best-scoring spot offer that satisfies a workload's requirements.
 *
 * Steps:
 *  1. Drop offers that violate a hard constraint (CPU, memory, GPU, price cap,
 *     interruption-rate cap).
 *  2. If preferred regions are given and at least one remaining offer is in
 *     one of them, restrict the search to those offers; otherwise keep all.
 *  3. Score the remaining offers with `calculateScore` and rank them from
 *     highest to lowest score.
 *
 * The winner and its runners-up are all returned as proper `PlacementDecision`
 * objects. Alternatives carry an empty `alternatives` list of their own.
 *
 * NOTE(review): the optional numeric limits are tested for truthiness, so a
 * value of 0 means "no limit" rather than "must be zero" — confirm this is
 * the intended contract.
 *
 * @param requirements - Constraints and preferences of the workload.
 * @param prices - Current spot offers to choose from; not modified.
 * @returns The top-ranked offer with up to three alternatives.
 * @throws Error when no offer satisfies the hard constraints.
 */
export function findOptimalPlacement(
  requirements: WorkloadRequirements,
  prices: SpotPrice[]
): PlacementDecision {
  // Filter candidates that meet minimum requirements
  let candidates = prices.filter(p =>
    p.vcpu >= requirements.minCPU &&
    p.memoryGB >= requirements.minMemoryGB &&
    (!requirements.gpuCount || p.gpuCount >= requirements.gpuCount) &&
    (!requirements.gpuType || p.gpuType === requirements.gpuType) &&
    (!requirements.maxPricePerHour || p.pricePerHour <= requirements.maxPricePerHour) &&
    (!requirements.maxInterruptionRate || p.interruptionRate <= requirements.maxInterruptionRate)
  );

  // Apply region preference if specified (soft: ignored when nothing matches)
  const preferredRegions = requirements.preferredRegions;
  if (preferredRegions?.length) {
    const preferredCandidates = candidates.filter(p => preferredRegions.includes(p.region));
    if (preferredCandidates.length > 0) {
      candidates = preferredCandidates;
    }
  }

  // Nothing to rank: fail before doing any scoring work.
  if (candidates.length === 0) {
    throw new Error('No suitable instances available for requirements');
  }

  // Score candidates and sort by score (higher is better)
  const ranked = candidates
    .map(candidate => ({ candidate, score: calculateScore(candidate, requirements) }))
    .sort((a, b) => b.score - a.score);

  // Projects a scored offer onto the PlacementDecision shape.
  const toDecision = (
    entry: { candidate: SpotPrice; score: number }
  ): PlacementDecision => ({
    provider: entry.candidate.provider,
    region: entry.candidate.region,
    availabilityZone: entry.candidate.availabilityZone,
    instanceType: entry.candidate.instanceType,
    pricePerHour: entry.candidate.pricePerHour,
    score: entry.score,
    alternatives: [],
  });

  const [best, ...runnersUp] = ranked;
  return {
    ...toDecision(best),
    alternatives: runnersUp.slice(0, 3).map(toDecision), // Top 3 alternatives
  };
}
/**
 * Scores a spot offer for a workload; a higher score is a better fit.
 *
 * The score combines three terms:
 *  - price (weight 0.5): 100 at $0/hr falling linearly to 0 at $10/hr and above;
 *  - availability (weight 0.3): `(1 - interruptionRate) * 100`, using a default
 *    rate of 0.1 only when the offer does not report one;
 *  - efficiency (weight 0.2): mean of requested/offered CPU and memory ratios
 *    (each capped at 1), scaled to 0-50, which penalizes over-provisioning.
 *
 * NOTE(review): the efficiency term is on a 0-50 scale while the other two are
 * on 0-100, so its effective weight is half of the nominal 0.2 — confirm this
 * is intended.
 *
 * @param candidate - Offer to score.
 * @param requirements - Workload requirements; `minCPU` and `minMemoryGB` are read.
 * @returns The weighted score.
 */
function calculateScore(candidate: SpotPrice, requirements: WorkloadRequirements): number {
  // Base score from price (lower price = higher score)
  // Normalize to 0-100 scale assuming max price is $10/hr
  const priceScore = Math.max(0, 100 - candidate.pricePerHour * 10);

  // Availability score (lower interruption rate = higher score).
  // `??` rather than `||`: a reported rate of 0 is the best possible value and
  // must not be replaced by the 0.1 default used for unknown rates.
  const interruptionRate = candidate.interruptionRate ?? 0.1;
  const availabilityScore = (1 - interruptionRate) * 100;

  // Over-provisioning penalty (don't want way more resources than needed)
  const cpuEfficiency = Math.min(1, requirements.minCPU / candidate.vcpu);
  const memoryEfficiency = Math.min(1, requirements.minMemoryGB / candidate.memoryGB);
  const efficiencyScore = ((cpuEfficiency + memoryEfficiency) / 2) * 50;

  // Weighted combination
  return priceScore * 0.5 + availabilityScore * 0.3 + efficiencyScore * 0.2;
}