Revert "Cache the scripts on disk, in attempt to increase perfrmance, may need to revisit this"

This reverts commit 4a335dc936.
This commit is contained in:
Peter Stockings
2025-07-26 21:53:31 +10:00
parent 4a335dc936
commit 2fbace641d
5 changed files with 77 additions and 95 deletions

View File

@@ -1,114 +1,106 @@
// isolator/deno_server.ts
import { serve } from "https://deno.land/std@0.201.0/http/server.ts";
import { crypto } from "https://deno.land/std@0.201.0/crypto/mod.ts";
import { ensureDir } from "https://deno.land/std@0.201.0/fs/ensure_dir.ts";
const TIMEOUT_MS = 5_000;
const States = Object.freeze({
TIMEOUT: "TIMEOUT",
});
const CACHE_DIR = "./.cache";
await ensureDir(CACHE_DIR);
// Create a pool of workers.
const workerPool: { worker: Worker; inUse: boolean }[] = [];
// Allow the number of workers to be configured via an environment variable.
const numWorkers =
parseInt(Deno.env.get("NUM_WORKERS") || "0") || navigator.hardwareConcurrency;
for (let i = 0; i < numWorkers; i++) {
const worker = new Worker(new URL("./worker.ts", import.meta.url).href, {
type: "module",
deno: {
namespace: true,
permissions: "inherit",
},
});
workerPool.push({ worker, inUse: false });
}
const requestQueue: ((
value:
| { worker: Worker; inUse: boolean }
| PromiseLike<{ worker: Worker; inUse: boolean }>
) => void)[] = [];
function getAvailableWorker() {
return new Promise((resolve) => {
const availableWorker = workerPool.find((w) => !w.inUse);
if (availableWorker) {
availableWorker.inUse = true;
resolve(availableWorker);
} else {
requestQueue.push(resolve);
}
});
}
function releaseWorker(worker) {
if (requestQueue.length > 0) {
const nextRequest = requestQueue.shift();
if (nextRequest) {
nextRequest(worker);
}
} else {
worker.inUse = false;
}
}
async function handler(req: Request): Promise<Response> {
if (req.method !== "POST" || new URL(req.url).pathname !== "/execute") {
return new Response("Not Found", { status: 404 });
}
let worker: Worker | undefined;
const availableWorker = (await getAvailableWorker()) as {
worker: Worker;
inUse: boolean;
};
try {
const body = await req.json();
const {
code = "",
request = {},
environment = {},
name = "userFunc",
} = await req.json();
// Create a content-addressed hash of the code.
const hash = await crypto.subtle.digest(
"SHA-256",
new TextEncoder().encode(code)
);
const hashString = Array.from(new Uint8Array(hash))
.map((b) => b.toString(16).padStart(2, "0"))
.join("");
const filePath = `${CACHE_DIR}/${hashString}.mjs`;
// Write the code to a file if it doesn't already exist.
try {
await Deno.stat(filePath);
} catch (e) {
if (e instanceof Deno.errors.NotFound) {
await Deno.writeTextFile(filePath, code);
} else {
throw e;
}
}
// Create a new worker for each request.
const worker = new Worker(new URL("./worker.ts", import.meta.url).href, {
type: "module",
deno: {
namespace: true,
permissions: "inherit",
},
});
} = body;
const executionPromise = new Promise((resolve, reject) => {
const messageHandler = (e: MessageEvent) => {
const messageHandler = (e) => {
resolve(e.data);
cleanup();
};
const errorHandler = (e: ErrorEvent) => {
const errorHandler = (e) => {
reject(new Error(`Worker error: ${e.message}`));
cleanup();
};
const cleanup = () => {
worker.removeEventListener("message", messageHandler);
worker.removeEventListener("error", errorHandler);
availableWorker.worker.removeEventListener("message", messageHandler);
availableWorker.worker.removeEventListener("error", errorHandler);
};
worker.addEventListener("message", messageHandler);
worker.addEventListener("error", errorHandler);
availableWorker.worker.addEventListener("message", messageHandler);
availableWorker.worker.addEventListener("error", errorHandler);
});
const timeoutPromise = new Promise((_, reject) =>
setTimeout(() => reject(new Error("Timeout")), TIMEOUT_MS)
);
worker.postMessage({
filePath: new URL(filePath, import.meta.url).href,
request,
environment,
name,
});
availableWorker.worker.postMessage({ code, request, environment, name });
const startTime = performance.now();
try {
const result: any = await Promise.race([
executionPromise,
timeoutPromise,
]);
// Check if the result from the worker looks like a Response object.
if (
result &&
typeof result === "object" &&
"body" in result &&
"status" in result &&
"headers" in result
) {
return new Response(result.body, {
status: result.status,
headers: result.headers,
});
} else {
// Otherwise, fall back to the old behavior.
return new Response(JSON.stringify(result), {
headers: { "Content-Type": "application/json" },
});
}
const result = await Promise.race([executionPromise, timeoutPromise]);
return new Response(JSON.stringify(result), {
headers: { "Content-Type": "application/json" },
});
} catch (err) {
const executionTime = performance.now() - startTime;
const payload = {
@@ -126,14 +118,12 @@ async function handler(req: Request): Promise<Response> {
} catch (e) {
return new Response(`Bad Request: ${e.message}`, { status: 400 });
} finally {
if (worker) {
worker.terminate(); // Terminate the worker after the request is handled.
}
releaseWorker(availableWorker); // Release the worker.
}
}
const port = parseInt(Deno.env.get("PORT") || "8000");
console.log(`⚡ Deno server ready on :${port}, ready to spawn workers.`);
console.log(`⚡ Deno server ready on :${port} with ${numWorkers} workers.`);
await serve(handler, { port });