136 lines
3.8 KiB
TypeScript
136 lines
3.8 KiB
TypeScript
// isolator/deno_server.ts
|
|
import { serve } from "https://deno.land/std@0.201.0/http/server.ts";
|
|
|
|
const TIMEOUT_MS = 5_000;
|
|
const States = Object.freeze({
|
|
TIMEOUT: "TIMEOUT",
|
|
});
|
|
|
|
// Create a pool of workers.
|
|
const workerPool: { worker: Worker; inUse: boolean }[] = [];
|
|
// Allow the number of workers to be configured via an environment variable.
|
|
const numWorkers =
|
|
parseInt(Deno.env.get("NUM_WORKERS") || "0") || navigator.hardwareConcurrency;
|
|
|
|
for (let i = 0; i < numWorkers; i++) {
|
|
const worker = new Worker(new URL("./worker.ts", import.meta.url).href, {
|
|
type: "module",
|
|
deno: {
|
|
namespace: true,
|
|
permissions: "inherit",
|
|
},
|
|
});
|
|
workerPool.push({ worker, inUse: false });
|
|
}
|
|
|
|
const requestQueue: ((
|
|
value:
|
|
| { worker: Worker; inUse: boolean }
|
|
| PromiseLike<{ worker: Worker; inUse: boolean }>
|
|
) => void)[] = [];
|
|
|
|
function getAvailableWorker() {
|
|
return new Promise((resolve) => {
|
|
const availableWorker = workerPool.find((w) => !w.inUse);
|
|
if (availableWorker) {
|
|
availableWorker.inUse = true;
|
|
resolve(availableWorker);
|
|
} else {
|
|
requestQueue.push(resolve);
|
|
}
|
|
});
|
|
}
|
|
|
|
function releaseWorker(worker) {
|
|
if (requestQueue.length > 0) {
|
|
const nextRequest = requestQueue.shift();
|
|
if (nextRequest) {
|
|
nextRequest(worker);
|
|
}
|
|
} else {
|
|
worker.inUse = false;
|
|
}
|
|
}
|
|
|
|
async function handler(req: Request): Promise<Response> {
|
|
if (req.method !== "POST" || new URL(req.url).pathname !== "/execute") {
|
|
return new Response("Not Found", { status: 404 });
|
|
}
|
|
|
|
const availableWorker = (await getAvailableWorker()) as {
|
|
worker: Worker;
|
|
inUse: boolean;
|
|
};
|
|
|
|
try {
|
|
const body = await req.json();
|
|
const {
|
|
code = "",
|
|
request = {},
|
|
environment = {},
|
|
name = "userFunc",
|
|
} = body;
|
|
|
|
const executionPromise = new Promise((resolve, reject) => {
|
|
let timeoutId: number;
|
|
|
|
const cleanup = () => {
|
|
availableWorker.worker.removeEventListener("message", messageHandler);
|
|
availableWorker.worker.removeEventListener("error", errorHandler);
|
|
if (timeoutId) clearTimeout(timeoutId);
|
|
};
|
|
|
|
const messageHandler = (e: MessageEvent) => {
|
|
cleanup();
|
|
resolve(e.data);
|
|
};
|
|
|
|
const errorHandler = (e: ErrorEvent) => {
|
|
cleanup();
|
|
reject(new Error(`Worker error: ${e.message}`));
|
|
};
|
|
|
|
timeoutId = setTimeout(() => {
|
|
cleanup();
|
|
reject(new Error("Timeout"));
|
|
}, TIMEOUT_MS);
|
|
|
|
availableWorker.worker.addEventListener("message", messageHandler);
|
|
availableWorker.worker.addEventListener("error", errorHandler);
|
|
|
|
availableWorker.worker.postMessage({ code, request, environment, name });
|
|
});
|
|
|
|
const startTime = performance.now();
|
|
try {
|
|
const result = await executionPromise;
|
|
return new Response(JSON.stringify(result), {
|
|
headers: { "Content-Type": "application/json" },
|
|
});
|
|
} catch (err) {
|
|
const executionTime = performance.now() - startTime;
|
|
const payload = {
|
|
status: err.message === "Timeout" ? States.TIMEOUT : "ERROR",
|
|
result: err.message,
|
|
logs: [],
|
|
environment: environment,
|
|
execution_time: executionTime,
|
|
};
|
|
return new Response(JSON.stringify(payload), {
|
|
status: 500,
|
|
headers: { "Content-Type": "application/json" },
|
|
});
|
|
}
|
|
} catch (e) {
|
|
return new Response(`Bad Request: ${e.message}`, { status: 400 });
|
|
} finally {
|
|
releaseWorker(availableWorker); // Release the worker.
|
|
}
|
|
}
|
|
|
|
const port = parseInt(Deno.env.get("PORT") || "8000");
|
|
|
|
console.log(`⚡ Deno server ready on :${port} with ${numWorkers} workers.`);
|
|
|
|
await serve(handler, { port });
|