Initial commit
This commit is contained in:
25
packages/server/package.json
Normal file
25
packages/server/package.json
Normal file
@@ -0,0 +1,25 @@
|
||||
{
|
||||
"name": "@torrent-client/server",
|
||||
"version": "1.0.0",
|
||||
"private": true,
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
"start": "bun src/index.ts",
|
||||
"dev": "bun --watch src/index.ts",
|
||||
"build": "tsc",
|
||||
"test": "bun test"
|
||||
},
|
||||
"dependencies": {
|
||||
"@torrent-client/shared": "*",
|
||||
"express": "^4.18.2",
|
||||
"cors": "^2.8.5"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/express": "^4.17.17",
|
||||
"@types/cors": "^2.8.13",
|
||||
"@types/node": "^18.17.0",
|
||||
"bun-types": "latest",
|
||||
"ts-node": "^10.9.1",
|
||||
"nodemon": "^3.0.1"
|
||||
}
|
||||
}
|
||||
107
packages/server/src/download.integration.test.ts
Normal file
107
packages/server/src/download.integration.test.ts
Normal file
@@ -0,0 +1,107 @@
|
||||
import { expect, test, describe } from "bun:test";
|
||||
import { TorrentSession } from "./engine";
|
||||
import { Bitfield } from "@torrent-client/shared";
|
||||
import crypto from 'node:crypto';
|
||||
import { rmSync, existsSync } from 'node:fs';
|
||||
import { join } from 'node:path';
|
||||
|
||||
describe("Download Integration", () => {
|
||||
const MOCK_HASH = "A18230D43BDA105BE7DEF84CB711859018AAA92C"; // Dummy hash
|
||||
const MAGNET = `magnet:?xt=urn:btih:${MOCK_HASH}&dn=Test`;
|
||||
|
||||
test("Full Handshake and Piece Request Flow", async () => {
|
||||
// 1. Setup Mock Peer Server
|
||||
let receivedHandshake = false;
|
||||
let receivedRequest = false;
|
||||
|
||||
const server = Bun.listen({
|
||||
hostname: "127.0.0.1",
|
||||
port: 6881,
|
||||
socket: {
|
||||
data(s, data) {
|
||||
let buf = data;
|
||||
if (!receivedHandshake && buf.length >= 68 && buf[0] === 19) {
|
||||
receivedHandshake = true;
|
||||
// Send Handshake back
|
||||
const reserved = Buffer.alloc(8);
|
||||
const peerId = Buffer.alloc(20);
|
||||
Buffer.from("-BT0001-TESTPEER").copy(peerId);
|
||||
s.write(Buffer.concat([
|
||||
Buffer.from([19]), Buffer.from("BitTorrent protocol"),
|
||||
reserved, Buffer.from(MOCK_HASH, 'hex'), peerId
|
||||
]));
|
||||
// Send Bitfield (we have all pieces)
|
||||
const bitfield = Buffer.from([255]);
|
||||
const len = Buffer.alloc(4);
|
||||
len.writeUInt32BE(bitfield.length + 1);
|
||||
s.write(Buffer.concat([len, Buffer.from([5]), bitfield]));
|
||||
// Unchoke
|
||||
s.write(Buffer.from([0, 0, 0, 1, 1]));
|
||||
buf = buf.slice(68);
|
||||
}
|
||||
|
||||
while (buf.length >= 4) {
|
||||
const msgLen = buf.readUInt32BE(0);
|
||||
if (msgLen === 0) { buf = buf.slice(4); continue; }
|
||||
if (buf.length < msgLen + 4) break;
|
||||
|
||||
const id = buf[4];
|
||||
if (id === 6) {
|
||||
receivedRequest = true;
|
||||
// Send dummy piece back
|
||||
const index = buf.readUInt32BE(5);
|
||||
const block = Buffer.alloc(16384);
|
||||
const payload = Buffer.alloc(16384 + 8);
|
||||
payload.writeUInt32BE(index, 0);
|
||||
payload.writeUInt32BE(0, 4);
|
||||
block.copy(payload, 8);
|
||||
|
||||
const resLen = Buffer.alloc(4);
|
||||
resLen.writeUInt32BE(payload.length + 1);
|
||||
s.write(Buffer.concat([resLen, Buffer.from([7]), payload]));
|
||||
}
|
||||
buf = buf.slice(msgLen + 4);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// 2. Initialize Session
|
||||
const session = new TorrentSession(MAGNET);
|
||||
|
||||
// Mock metadata discovery for test
|
||||
(session as any).solvedMetadata = true;
|
||||
(session as any).files = [{ name: "test.dat", size: 16384 }];
|
||||
(session as any).pieceLength = 16384;
|
||||
(session as any).totalSize = 16384; // Added this field
|
||||
(session as any).bitfield = new Bitfield(1);
|
||||
(session as any).pieceHashes = crypto.createHash('sha1').update(Buffer.alloc(16384)).digest();
|
||||
(session as any).status = 'ready';
|
||||
(session as any).peers = ["127.0.0.1:6881"];
|
||||
|
||||
// 3. Start Download
|
||||
session.startDownload();
|
||||
|
||||
// 4. Wait for completion or timeout
|
||||
let attempts = 0;
|
||||
while (session.status !== 'completed' && attempts < 20) {
|
||||
await new Promise(r => setTimeout(r, 200));
|
||||
attempts++;
|
||||
}
|
||||
|
||||
// 5. Assertions
|
||||
expect(receivedHandshake).toBe(true);
|
||||
expect(receivedRequest).toBe(true);
|
||||
expect(session.status).toBe('completed');
|
||||
expect(session.progress).toBe(100);
|
||||
|
||||
server.stop();
|
||||
session.destroy();
|
||||
|
||||
// Cleanup
|
||||
const downloadPath = join(process.cwd(), 'downloads', MOCK_HASH);
|
||||
if (existsSync(downloadPath)) {
|
||||
rmSync(downloadPath, { recursive: true, force: true });
|
||||
}
|
||||
}, 10000);
|
||||
});
|
||||
42
packages/server/src/download.real.test.ts
Normal file
42
packages/server/src/download.real.test.ts
Normal file
@@ -0,0 +1,42 @@
|
||||
import { expect, test, describe } from "bun:test";
|
||||
import { TorrentSession } from "./engine";
|
||||
|
||||
describe("Real Swarm Download", () => {
|
||||
// Debian 12.8.0 netinst magnet (highly seeded)
|
||||
const REAL_MAGNET = "magnet:?xt=urn:btih:6a9759bffd5c0af65319979fb7832189f4f3c35d&dn=debian-12.8.0-amd64-netinst.iso";
|
||||
|
||||
test("Discover and download first block from real swarm", async () => {
|
||||
const session = new TorrentSession(REAL_MAGNET);
|
||||
|
||||
console.log("[Test] Waiting for metadata discovery...");
|
||||
let attempts = 0;
|
||||
while (session.status === 'discovering' && attempts < 60) {
|
||||
await new Promise(r => setTimeout(r, 1000));
|
||||
attempts++;
|
||||
}
|
||||
|
||||
expect(session.status).toBe('ready');
|
||||
console.log(`[Test] Metadata discovered! ${session.files[0].name} (${session.files[0].size} bytes)`);
|
||||
|
||||
console.log("[Test] Starting download...");
|
||||
session.startDownload();
|
||||
|
||||
// Wait for at least 1% progress or 1 active connection
|
||||
let downloadAttempts = 0;
|
||||
let foundPeers = false;
|
||||
while (session.progress < 1 && downloadAttempts < 60) {
|
||||
await new Promise(r => setTimeout(r, 1000));
|
||||
if (session.activeConnections > 0) foundPeers = true;
|
||||
if (session.progress > 0) break;
|
||||
downloadAttempts++;
|
||||
}
|
||||
|
||||
console.log(`[Test] Final Status: ${session.status}, Progress: ${session.progress}%, Active: ${session.activeConnections}`);
|
||||
|
||||
expect(foundPeers).toBe(true);
|
||||
// We don't strictly expect progress > 0 because peers might be slow/choked,
|
||||
// but we at least want to see active connections and INTERESTED signals in logs.
|
||||
|
||||
session.destroy();
|
||||
}, 125000); // 2 minute timeout for real network
|
||||
});
|
||||
296
packages/server/src/engine.ts
Normal file
296
packages/server/src/engine.ts
Normal file
@@ -0,0 +1,296 @@
|
||||
import { parseMagnetURI, type MagnetData, Bitfield } from '@torrent-client/shared';
|
||||
import { getPeersFromUDPTracker, fetchMetadataFromPeer, PUBLIC_TRACKERS } from './index';
|
||||
import { StorageManager } from './storage';
|
||||
import { PeerWorker } from './worker';
|
||||
import { PieceReassembler } from './reassembler';
|
||||
import crypto from 'node:crypto';
|
||||
|
||||
export type SessionStatus = 'discovering' | 'ready' | 'downloading' | 'paused' | 'completed' | 'error';
|
||||
|
||||
export interface TorrentFile {
|
||||
name: string;
|
||||
size: number;
|
||||
}
|
||||
|
||||
export class TorrentSession {
|
||||
public hash: string;
|
||||
public magnetData: MagnetData;
|
||||
public status: SessionStatus = 'discovering';
|
||||
public files: TorrentFile[] = [];
|
||||
public peers: string[] = [];
|
||||
public activeConnections: number = 0;
|
||||
public progress: number = 0;
|
||||
public errorMessage: string | null = null;
|
||||
|
||||
private peerSet = new Set<string>();
|
||||
private solvedMetadata = false;
|
||||
private isDestroyed = false;
|
||||
|
||||
private storage?: StorageManager;
|
||||
private bitfield?: Bitfield;
|
||||
private pieceHashes?: Uint8Array;
|
||||
private pieceLength: number = 0;
|
||||
private totalSize: number = 0;
|
||||
private workers: PeerWorker[] = [];
|
||||
private reassemblers: Map<number, PieceReassembler> = new Map();
|
||||
private peerIndex = 0;
|
||||
private MAX_WORKERS = 30;
|
||||
|
||||
constructor(magnetURI: string) {
|
||||
const parsed = parseMagnetURI(magnetURI);
|
||||
if (!parsed?.hash) throw new Error("Invalid magnet link");
|
||||
this.hash = parsed.hash;
|
||||
this.magnetData = parsed;
|
||||
|
||||
this.startDiscovery();
|
||||
}
|
||||
|
||||
private async startDiscovery() {
|
||||
const trackers = [...new Set([...(this.magnetData.tr || []), ...PUBLIC_TRACKERS])].filter(t => t.startsWith("udp:"));
|
||||
console.log(`[Engine] Session ${this.hash}: Starting discovery from ${trackers.length} trackers...`);
|
||||
|
||||
trackers.forEach(async (tracker) => {
|
||||
try {
|
||||
const trackerPeers = await getPeersFromUDPTracker(tracker, this.hash);
|
||||
for (const peer of trackerPeers) {
|
||||
if (this.isDestroyed || this.solvedMetadata || this.peerSet.has(peer)) continue;
|
||||
this.peerSet.add(peer);
|
||||
this.peers = Array.from(this.peerSet);
|
||||
|
||||
if (this.peerSet.size <= 50) {
|
||||
this.tryFetchMetadata(peer);
|
||||
}
|
||||
|
||||
// If we are already downloading, immediately try to use new peers
|
||||
if (this.status === 'downloading') {
|
||||
this.refillWorkers();
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
// Tracker error
|
||||
}
|
||||
});
|
||||
|
||||
setTimeout(() => {
|
||||
if (!this.isDestroyed && this.status === 'discovering' && !this.solvedMetadata) {
|
||||
this.status = 'error';
|
||||
this.errorMessage = "Metadata discovery timed out.";
|
||||
}
|
||||
}, 30000);
|
||||
}
|
||||
|
||||
private async tryFetchMetadata(peer: string) {
|
||||
if (this.isDestroyed || this.solvedMetadata) return;
|
||||
this.activeConnections++;
|
||||
const [host, p] = peer.split(':');
|
||||
try {
|
||||
const res = await fetchMetadataFromPeer(this.hash, host, parseInt(p));
|
||||
if (res && !this.solvedMetadata && !this.isDestroyed) {
|
||||
this.solvedMetadata = true;
|
||||
this.files = res.files || [{ name: res.name, size: res.size }];
|
||||
this.pieceHashes = res.pieces;
|
||||
this.pieceLength = res.pieceLength;
|
||||
this.totalSize = res.size;
|
||||
this.status = 'ready';
|
||||
|
||||
const totalPieces = Math.ceil(res.size / res.pieceLength);
|
||||
this.bitfield = new Bitfield(totalPieces);
|
||||
this.storage = new StorageManager(this.hash, this.files);
|
||||
|
||||
console.log(`[Engine] Session ${this.hash}: Metadata verified. ${totalPieces} pieces total.`);
|
||||
}
|
||||
} catch (e) {
|
||||
// Peer failed
|
||||
} finally {
|
||||
this.activeConnections--;
|
||||
}
|
||||
}
|
||||
|
||||
public startDownload() {
|
||||
if (this.status !== 'ready') return;
|
||||
this.status = 'downloading';
|
||||
console.log(`[Engine] Session ${this.hash}: Initializing swarm download...`);
|
||||
this.refillWorkers();
|
||||
}
|
||||
|
||||
private refillWorkers() {
|
||||
if (this.status !== 'downloading' || this.isDestroyed) return;
|
||||
|
||||
while (this.workers.length < this.MAX_WORKERS && this.peerIndex < this.peers.length) {
|
||||
const peer = this.peers[this.peerIndex++];
|
||||
const [host, p] = peer.split(':');
|
||||
|
||||
const worker = new PeerWorker(
|
||||
this.hash, host, parseInt(p),
|
||||
this.bitfield!.totalPieces,
|
||||
(idx, begin, block) => this.handleBlock(idx, begin, block),
|
||||
() => this.onWorkerReady(worker),
|
||||
() => this.onWorkerClose(worker)
|
||||
);
|
||||
this.workers.push(worker);
|
||||
worker.connect();
|
||||
}
|
||||
}
|
||||
|
||||
private onWorkerReady(worker: PeerWorker) {
|
||||
if (this.status !== 'downloading') return;
|
||||
worker.signalInterest(); // Proactively tell them we want data!
|
||||
this.activeConnections = this.workers.length;
|
||||
this.scheduleWork();
|
||||
}
|
||||
|
||||
private onWorkerClose(worker: PeerWorker) {
|
||||
this.workers = this.workers.filter(w => w !== worker);
|
||||
this.activeConnections = this.workers.length;
|
||||
this.refillWorkers(); // Try to get a new peer
|
||||
}
|
||||
|
||||
private scheduleWork() {
|
||||
if (this.status !== 'downloading' || !this.bitfield) return;
|
||||
|
||||
const allPending = new Set<string>();
|
||||
for (const w of this.workers) {
|
||||
for (const p of w.getPendingRequests()) allPending.add(p);
|
||||
}
|
||||
|
||||
for (const worker of this.workers) {
|
||||
if (worker.getPendingRequests().size >= 5) continue;
|
||||
|
||||
// 1. Prioritize ongoing piece reassembly
|
||||
let foundWork = false;
|
||||
for (const [pieceIdx, reassembler] of this.reassemblers.entries()) {
|
||||
if (worker.hasPiece(pieceIdx)) {
|
||||
const missing = reassembler.getMissingBlocks();
|
||||
for (const begin of missing) {
|
||||
const key = `${pieceIdx}:${begin}`;
|
||||
if (!allPending.has(key) && worker.getPendingRequests().size < 5) {
|
||||
const blockSize = Math.min(16384, reassembler.totalSize - begin);
|
||||
worker.requestBlock(pieceIdx, begin, blockSize);
|
||||
allPending.add(key);
|
||||
foundWork = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (foundWork) continue;
|
||||
|
||||
// 2. Start a new piece
|
||||
for (let i = 0; i < this.bitfield.totalPieces; i++) {
|
||||
if (!this.bitfield.has(i) && !this.reassemblers.has(i) && worker.hasPiece(i)) {
|
||||
const size = (i === this.bitfield.totalPieces - 1)
|
||||
? (this.totalSize % this.pieceLength || this.pieceLength)
|
||||
: this.pieceLength;
|
||||
const re = new PieceReassembler(size);
|
||||
this.reassemblers.set(i, re);
|
||||
|
||||
const missing = re.getMissingBlocks();
|
||||
for (const begin of missing) {
|
||||
const key = `${i}:${begin}`;
|
||||
if (!allPending.has(key) && worker.getPendingRequests().size < 5) {
|
||||
const blockSize = Math.min(16384, size - begin);
|
||||
worker.requestBlock(i, begin, blockSize);
|
||||
allPending.add(key);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private handleBlock(index: number, begin: number, block: Buffer) {
|
||||
if (!this.bitfield || this.bitfield.has(index)) return;
|
||||
const reassembler = this.reassemblers.get(index);
|
||||
if (!reassembler) return;
|
||||
|
||||
if (reassembler.addBlock(begin, block)) {
|
||||
// Piece is complete!
|
||||
const fullPiece = reassembler.getFullPiece();
|
||||
if (!fullPiece) return;
|
||||
|
||||
const expectedHash = this.pieceHashes?.slice(index * 20, (index + 1) * 20);
|
||||
const actualHash = crypto.createHash('sha1').update(fullPiece).digest();
|
||||
|
||||
if (expectedHash && Buffer.compare(actualHash, expectedHash) === 0) {
|
||||
this.bitfield.set(index);
|
||||
this.storage?.writePiece(index, this.pieceLength, fullPiece);
|
||||
this.reassemblers.delete(index);
|
||||
|
||||
const completed = Array.from({ length: this.bitfield.totalPieces }).filter((_, i) => this.bitfield!.has(i)).length;
|
||||
this.progress = Math.floor((completed / this.bitfield.totalPieces) * 100);
|
||||
|
||||
if (this.progress >= 100) {
|
||||
this.status = 'completed';
|
||||
console.log(`[Engine] Torrent ${this.hash} download COMPLETED!`);
|
||||
}
|
||||
} else {
|
||||
// Hash failed, restart piece
|
||||
this.reassemblers.delete(index);
|
||||
}
|
||||
}
|
||||
|
||||
this.scheduleWork();
|
||||
}
|
||||
|
||||
public destroy() {
|
||||
this.isDestroyed = true;
|
||||
this.solvedMetadata = true;
|
||||
this.status = 'paused';
|
||||
this.workers.forEach(w => { /* close socket if possible */ });
|
||||
console.log(`[Engine] Session ${this.hash}: Destroyed.`);
|
||||
}
|
||||
|
||||
public toJSON() {
|
||||
return {
|
||||
hash: this.hash,
|
||||
name: this.magnetData.dn || this.files[0]?.name || this.hash,
|
||||
status: this.status,
|
||||
progress: this.progress,
|
||||
peers: this.peers.length,
|
||||
activeConnections: this.workers.length,
|
||||
files: this.files,
|
||||
errorMessage: this.errorMessage
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
export class TorrentManager {
|
||||
private static instance: TorrentManager;
|
||||
private sessions = new Map<string, TorrentSession>();
|
||||
|
||||
private constructor() {}
|
||||
|
||||
public static getInstance(): TorrentManager {
|
||||
if (!TorrentManager.instance) {
|
||||
TorrentManager.instance = new TorrentManager();
|
||||
}
|
||||
return TorrentManager.instance;
|
||||
}
|
||||
|
||||
public addTorrent(magnetURI: string): TorrentSession {
|
||||
const parsed = parseMagnetURI(magnetURI);
|
||||
if (!parsed?.hash) throw new Error("Invalid magnet");
|
||||
|
||||
if (this.sessions.has(parsed.hash)) {
|
||||
return this.sessions.get(parsed.hash)!;
|
||||
}
|
||||
|
||||
const session = new TorrentSession(magnetURI);
|
||||
this.sessions.set(session.hash, session);
|
||||
return session;
|
||||
}
|
||||
|
||||
public getSession(hash: string): TorrentSession | undefined {
|
||||
return this.sessions.get(hash);
|
||||
}
|
||||
|
||||
public getAllSessions(): TorrentSession[] {
|
||||
return Array.from(this.sessions.values());
|
||||
}
|
||||
|
||||
public removeSession(hash: string) {
|
||||
const s = this.sessions.get(hash);
|
||||
if (s) s.destroy();
|
||||
this.sessions.delete(hash);
|
||||
}
|
||||
}
|
||||
317
packages/server/src/index.ts
Normal file
317
packages/server/src/index.ts
Normal file
@@ -0,0 +1,317 @@
|
||||
import { parseMagnetURI, BencodeDecoder, BencodeEncoder, MetadataReassembler } from "@torrent-client/shared";
|
||||
import dgram from "node:dgram";
|
||||
import crypto from "node:crypto";
|
||||
import { join } from "node:path";
|
||||
import { TorrentManager } from "./engine";
|
||||
|
||||
const port = process.env.PORT || 3001;
|
||||
const getCONFIG = () => ({
|
||||
TRACKER_TIMEOUT: parseInt(process.env.TRACKER_TIMEOUT || "3000"),
|
||||
METADATA_TIMEOUT: parseInt(process.env.METADATA_TIMEOUT || "10000"),
|
||||
});
|
||||
|
||||
export const PUBLIC_TRACKERS = [
|
||||
"udp://tracker.opentrackr.org:1337/announce",
|
||||
"udp://tracker.openbittorrent.com:6969/announce",
|
||||
"udp://exodus.desync.com:6969/announce",
|
||||
"udp://open.stealth.si:80/announce",
|
||||
"udp://tracker.torrent.eu.org:451/announce"
|
||||
];
|
||||
|
||||
console.log(`Torrent Engine (Bun) running at http://localhost:${port}`);
|
||||
|
||||
export async function getPeersFromUDPTracker(trackerUrl: string, infoHashHex: string): Promise<string[]> {
|
||||
const url = new URL(trackerUrl);
|
||||
if (url.protocol !== "udp:") return [];
|
||||
|
||||
return new Promise((resolve) => {
|
||||
const client = dgram.createSocket("udp4");
|
||||
const transactionId = crypto.randomBytes(4);
|
||||
const connectionId = Buffer.from("0000041727101980", "hex");
|
||||
|
||||
const connectMsg = Buffer.concat([connectionId, Buffer.from([0, 0, 0, 0]), transactionId]);
|
||||
|
||||
const timeout = setTimeout(() => { client.close(); resolve([]); }, getCONFIG().TRACKER_TIMEOUT);
|
||||
|
||||
client.on("message", (msg) => {
|
||||
const action = msg.readInt32BE(0);
|
||||
const receivedTransactionId = msg.slice(4, 8);
|
||||
if (!transactionId.equals(receivedTransactionId)) return;
|
||||
|
||||
if (action === 0) {
|
||||
const receivedConnectionId = msg.slice(8, 16);
|
||||
const announceMsg = Buffer.concat([
|
||||
receivedConnectionId, Buffer.from([0, 0, 0, 1]), transactionId,
|
||||
Buffer.from(infoHashHex, "hex"), Buffer.from("-BT0001-" + crypto.randomBytes(6).toString("hex")),
|
||||
Buffer.alloc(8), Buffer.alloc(8), Buffer.alloc(8),
|
||||
Buffer.from([0, 0, 0, 2]), Buffer.alloc(4), Buffer.alloc(4),
|
||||
Buffer.from([0xFF, 0xFF, 0xFF, 0xFF]), Buffer.from([0x1B, 0x39])
|
||||
]);
|
||||
client.send(announceMsg, parseInt(url.port), url.hostname);
|
||||
} else if (action === 1) {
|
||||
const peers = [];
|
||||
for (let i = 20; i + 6 <= msg.length; i += 6) {
|
||||
peers.push(`${msg[i]}.${msg[i + 1]}.${msg[i + 2]}.${msg[i + 3]}:${msg.readUInt16BE(i + 4)}`);
|
||||
}
|
||||
clearTimeout(timeout); client.close(); resolve(peers);
|
||||
}
|
||||
});
|
||||
|
||||
client.send(connectMsg, parseInt(url.port), url.hostname, () => {});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Full BEP 9/10 Metadata Retrieval
|
||||
*/
|
||||
export async function fetchMetadataFromPeer(hash: string, host: string, portNum: number): Promise<{ name: string; size: number; pieces: Uint8Array; pieceLength: number; files?: any[] } | null> {
|
||||
return new Promise((resolve) => {
|
||||
let receivedHandshake = false;
|
||||
let utMetadataId: number | null = null;
|
||||
let metadataSize: number | null = null;
|
||||
let reassembler: MetadataReassembler | null = null;
|
||||
let buffer = Buffer.alloc(0);
|
||||
let socketInstance: any = null;
|
||||
|
||||
let isSettled = false;
|
||||
const timer = setTimeout(() => {
|
||||
if (isSettled) return;
|
||||
if (socketInstance) {
|
||||
console.log(`[Metadata] Timeout reaching ${host}:${portNum}`);
|
||||
socketInstance.end();
|
||||
} else {
|
||||
console.log(`[Metadata] Connection attempt to ${host}:${portNum} timed out`);
|
||||
}
|
||||
isSettled = true;
|
||||
resolve(null);
|
||||
}, getCONFIG().METADATA_TIMEOUT);
|
||||
|
||||
const finish = (result: any) => {
|
||||
if (isSettled) return;
|
||||
isSettled = true;
|
||||
clearTimeout(timer);
|
||||
resolve(result);
|
||||
};
|
||||
|
||||
try {
|
||||
const socket = Bun.connect({
|
||||
hostname: host,
|
||||
port: portNum,
|
||||
socket: {
|
||||
open(s: any) {
|
||||
socketInstance = s;
|
||||
console.log(`[Metadata] Socket open to ${host}:${portNum}`);
|
||||
const reserved = Buffer.alloc(8);
|
||||
reserved[5] |= 0x10; // Extension protocol
|
||||
s.write(Buffer.concat([
|
||||
Buffer.from([19]), Buffer.from("BitTorrent protocol"),
|
||||
reserved, Buffer.from(hash, 'hex'), Buffer.from("-BT0001-" + crypto.randomBytes(6).toString("hex"))
|
||||
]));
|
||||
},
|
||||
data(s: any, data: Buffer) {
|
||||
buffer = Buffer.concat([buffer, data]);
|
||||
|
||||
if (!receivedHandshake) {
|
||||
if (buffer.length >= 68 && buffer[0] === 19) {
|
||||
receivedHandshake = true;
|
||||
console.log(`[Metadata] Handshake with ${host} OK`);
|
||||
buffer = buffer.slice(68);
|
||||
// Signal interested + Extended Handshake
|
||||
s.write(Buffer.from([0, 0, 0, 1, 2])); // interested
|
||||
const extHandshake = BencodeEncoder.encode({ m: { ut_metadata: 1 } });
|
||||
const msgLen = Buffer.alloc(4);
|
||||
msgLen.writeUInt32BE(extHandshake.length + 2);
|
||||
s.write(Buffer.concat([msgLen, Buffer.from([20, 0]), extHandshake]));
|
||||
} else return;
|
||||
}
|
||||
|
||||
while (buffer.length >= 4) {
|
||||
const length = buffer.readUInt32BE(0);
|
||||
if (length === 0) {
|
||||
buffer = buffer.slice(4);
|
||||
continue;
|
||||
}
|
||||
if (buffer.length < length + 4) break;
|
||||
|
||||
const msg = buffer.slice(4, length + 4);
|
||||
buffer = buffer.slice(length + 4);
|
||||
|
||||
if (msg[0] === 20) {
|
||||
const extId = msg[1];
|
||||
const payload = msg.slice(2);
|
||||
|
||||
if (extId === 0) {
|
||||
try {
|
||||
const decoded = BencodeDecoder.decode(payload) as any;
|
||||
utMetadataId = decoded.m?.ut_metadata;
|
||||
metadataSize = decoded.metadata_size;
|
||||
console.log(`[Metadata] ${host} metadata_size: ${metadataSize}, ut_metadata: ${utMetadataId}`);
|
||||
|
||||
if (utMetadataId !== undefined && utMetadataId !== null && metadataSize) {
|
||||
reassembler = new MetadataReassembler(metadataSize);
|
||||
console.log(`[Metadata] Requesting piece 0 from ${host}`);
|
||||
const req = BencodeEncoder.encode({ msg_type: 0, piece: 0 });
|
||||
const len = Buffer.alloc(4);
|
||||
len.writeUInt32BE(req.length + 2);
|
||||
s.write(Buffer.concat([len, Buffer.from([20, utMetadataId]), req]));
|
||||
}
|
||||
} catch (e) {
|
||||
console.error(`[Metadata] Bencode error from ${host}`);
|
||||
}
|
||||
} else if (extId === 1 && reassembler) { // Message for OUR ut_metadata assignment
|
||||
try {
|
||||
const decoder = new BencodeDecoder(payload);
|
||||
const dict = decoder.decode() as any;
|
||||
const pieceData = payload.slice(decoder.getOffset());
|
||||
console.log(`[Metadata] Received piece ${dict.piece} from ${host} (${pieceData.length} bytes)`);
|
||||
|
||||
if (dict.msg_type === 1) {
|
||||
const complete = reassembler.addPiece(dict.piece, pieceData);
|
||||
if (complete) {
|
||||
console.log(`[Metadata] Fetched all info from ${host}!`);
|
||||
const fullMetadata = reassembler.getFullMetadata();
|
||||
if (fullMetadata) {
|
||||
const info = BencodeDecoder.decode(fullMetadata) as any;
|
||||
const files = info.files
|
||||
? info.files.map((f: any) => ({
|
||||
name: Array.isArray(f.path)
|
||||
? f.path.map((p: any) => p instanceof Uint8Array ? new TextDecoder().decode(p) : p).join('/')
|
||||
: (f.path instanceof Uint8Array ? new TextDecoder().decode(f.path) : f.path),
|
||||
size: f.length
|
||||
}))
|
||||
: [{
|
||||
name: info.name instanceof Uint8Array ? new TextDecoder().decode(info.name) : info.name,
|
||||
size: info.length
|
||||
}];
|
||||
finish({
|
||||
name: info.name instanceof Uint8Array ? new TextDecoder().decode(info.name) : info.name,
|
||||
size: info.length || files.reduce((acc: number, f: any) => acc + f.size, 0),
|
||||
pieces: info.pieces,
|
||||
pieceLength: info['piece length'],
|
||||
files
|
||||
}); // Resolve before closing to prevent race
|
||||
s.end();
|
||||
}
|
||||
} else {
|
||||
console.log(`[Metadata] Requesting next piece from ${host}`);
|
||||
const req = BencodeEncoder.encode({ msg_type: 0, piece: dict.piece + 1 });
|
||||
const len = Buffer.alloc(4);
|
||||
len.writeUInt32BE(req.length + 2);
|
||||
s.write(Buffer.concat([len, Buffer.from([20, utMetadataId as number]), req]));
|
||||
}
|
||||
} else if (dict.msg_type === 2) {
|
||||
console.log(`[Metadata] Peer ${host} rejected piece ${dict.piece}`);
|
||||
}
|
||||
} catch (e) {
|
||||
console.log(`[Metadata] Error parsing data from ${host}`);
|
||||
}
|
||||
} else {
|
||||
console.log(`[Metadata] Received unknown extension ${extId} from ${host}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
close() { if (!isSettled) { console.log(`[Metadata] Connection closed by ${host}`); finish(null); } },
|
||||
connectError() { if (!isSettled) { console.log(`[Metadata] Connect error to ${host}:${portNum}`); finish(null); } },
|
||||
error(err: any) { if (!isSettled) { console.log(`[Metadata] Socket error from ${host}: ${err}`); finish(null); } }
|
||||
}
|
||||
});
|
||||
} catch (e) {
|
||||
console.log(`[Metadata] Bun.connect failed for ${host}: ${e}`);
|
||||
finish(null);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
export async function handleRequest(req: Request): Promise<Response> {
|
||||
const url = new URL(req.url);
|
||||
const headers = { "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "POST, OPTIONS", "Access-Control-Allow-Headers": "Content-Type" };
|
||||
|
||||
if (req.method === "OPTIONS") return new Response(null, { headers });
|
||||
|
||||
if (url.pathname === "/api/torrents" && req.method === "GET") {
|
||||
const sessions = TorrentManager.getInstance().getAllSessions().map(s => s.toJSON());
|
||||
return new Response(JSON.stringify(sessions), { headers });
|
||||
}
|
||||
|
||||
if (url.pathname === "/api/torrents" && req.method === "POST") {
|
||||
const { magnetURI } = await req.json();
|
||||
try {
|
||||
const session = TorrentManager.getInstance().addTorrent(magnetURI);
|
||||
return new Response(JSON.stringify(session.toJSON()), { headers });
|
||||
} catch (e: any) {
|
||||
return new Response(JSON.stringify({ error: e.message }), { status: 400, headers });
|
||||
}
|
||||
}
|
||||
|
||||
if (url.pathname.startsWith("/api/torrents/") && url.pathname.endsWith("/start") && req.method === "POST") {
|
||||
const hash = url.pathname.split("/")[3];
|
||||
const session = TorrentManager.getInstance().getSession(hash);
|
||||
if (session) {
|
||||
session.startDownload();
|
||||
return new Response(JSON.stringify({ success: true }), { headers });
|
||||
}
|
||||
return new Response(JSON.stringify({ error: "Session not found" }), { status: 404, headers });
|
||||
}
|
||||
|
||||
if (url.pathname.startsWith("/api/torrents/") && req.method === "DELETE") {
|
||||
const hash = url.pathname.split("/").pop();
|
||||
if (hash) TorrentManager.getInstance().removeSession(hash);
|
||||
return new Response(JSON.stringify({ success: true }), { headers });
|
||||
}
|
||||
|
||||
// File Download Endpoint
|
||||
const downloadMatch = url.pathname.match(/\/api\/torrents\/([^\/]+)\/download\/(.+)/);
|
||||
if (downloadMatch && req.method === "GET") {
|
||||
const hash = downloadMatch[1];
|
||||
const filename = decodeURIComponent(downloadMatch[2]);
|
||||
|
||||
const session = TorrentManager.getInstance().getSession(hash);
|
||||
if (!session) {
|
||||
return new Response(JSON.stringify({ error: "Session not found" }), { status: 404, headers });
|
||||
}
|
||||
|
||||
const filePath = join(process.cwd(), "downloads", hash, filename);
|
||||
const file = Bun.file(filePath);
|
||||
|
||||
if (await file.exists()) {
|
||||
return new Response(file, {
|
||||
headers: {
|
||||
...headers,
|
||||
"Content-Disposition": `attachment; filename="${filename.split('/').pop()}"`
|
||||
}
|
||||
});
|
||||
}
|
||||
return new Response(JSON.stringify({ error: "File not found on disk" }), { status: 404, headers });
|
||||
}
|
||||
|
||||
// Deprecated single-request metadata endpoint (maintained for compatibility)
|
||||
if (url.pathname === "/api/metadata") {
|
||||
const { magnetURI } = await req.json();
|
||||
const session = TorrentManager.getInstance().addTorrent(magnetURI);
|
||||
|
||||
// Return current state (might be discovering)
|
||||
return new Response(JSON.stringify({
|
||||
message: session.status === 'ready' ? "Successfully fetched metadata!" : "Metadata retrieval in progress...",
|
||||
parsed: session.magnetData,
|
||||
peers: session.peers.slice(0, 10),
|
||||
files: session.files
|
||||
}), { headers });
|
||||
}
|
||||
|
||||
const distPath = join(process.cwd(), "packages", "client", "dist");
|
||||
const filePath = url.pathname === "/" ? "index.html" : url.pathname.slice(1);
|
||||
const file = Bun.file(join(distPath, filePath));
|
||||
|
||||
if (await file.exists()) {
|
||||
return new Response(file);
|
||||
}
|
||||
return new Response(Bun.file(join(distPath, "index.html")));
|
||||
}
|
||||
|
||||
if (process.env.NODE_ENV !== "test") {
|
||||
Bun.serve({
|
||||
port: port,
|
||||
fetch: handleRequest
|
||||
});
|
||||
}
|
||||
33
packages/server/src/metadata.multi-tracker.test.ts
Normal file
33
packages/server/src/metadata.multi-tracker.test.ts
Normal file
@@ -0,0 +1,33 @@
|
||||
import { expect, test, describe } from "bun:test";
|
||||
import { handleRequest } from "./index";
|
||||
|
||||
describe("Metadata API (Multi-Tracker)", () => {
|
||||
test("aggregates peers from multiple trackers", async () => {
|
||||
// Magnet with multiple trackers
|
||||
const magnetURI = "magnet:?xt=urn:btih:A18230D43BDA105BE7DEF84CB711859018AAA92D&dn=Snow%20Crash&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=udp%3A%2F%2Fopen.stealth.si%3A80%2Fannounce&tr=udp%3A%2F%2Ftracker.torrent.eu.org%3A451%2Fannounce";
|
||||
|
||||
const req = new Request("http://localhost/api/metadata", {
|
||||
method: "POST",
|
||||
body: JSON.stringify({ magnetURI }),
|
||||
headers: { "Content-Type": "application/json" }
|
||||
});
|
||||
|
||||
console.log("[Test] Querying multiple trackers for aggregation...");
|
||||
const res: Response = await handleRequest(req);
|
||||
expect(res.status).toBe(200);
|
||||
|
||||
const body = await res.json();
|
||||
console.log(`[Test] Total peers found: ${body.peers.length}`);
|
||||
|
||||
// We expect a decent number of unique peers if trackers are responsive
|
||||
// Even if metadata retrieval fails, we want to see peer discovery working
|
||||
expect(body.peers).toBeDefined();
|
||||
|
||||
// Check if duplicates are handled (internal logic verification)
|
||||
const uniquePeers = new Set(body.peers);
|
||||
expect(uniquePeers.size).toBe(body.peers.length);
|
||||
|
||||
// Message should reflect that we found peers
|
||||
expect(body.message).toMatch(/Successfully fetched metadata!|Peers found but metadata retrieval timed out/);
|
||||
}, 30000);
|
||||
});
|
||||
32
packages/server/src/metadata.real.test.ts
Normal file
32
packages/server/src/metadata.real.test.ts
Normal file
@@ -0,0 +1,32 @@
|
||||
import { expect, test, describe } from "bun:test";
|
||||
import { handleRequest } from "./index";
|
||||
|
||||
describe("Metadata API (Real World)", () => {
|
||||
test("successfully fetches metadata for a real magnet link", async () => {
|
||||
// Snow Crash - Neal Stephenson (EPUB) - Very stable and highly seeded
|
||||
const magnetURI = "magnet:?xt=urn:btih:A18230D43BDA105BE7DEF84CB711859018AAA92D&dn=Snow%20Crash&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337";
|
||||
|
||||
const req = new Request("http://localhost/api/metadata", {
|
||||
method: "POST",
|
||||
body: JSON.stringify({ magnetURI }),
|
||||
headers: { "Content-Type": "application/json" }
|
||||
});
|
||||
|
||||
console.log("[Test] Starting real-world metadata fetch (timeout 60s)...");
|
||||
const res: Response = await handleRequest(req);
|
||||
expect(res.status).toBe(200);
|
||||
|
||||
const body = await res.json();
|
||||
console.log("[Test] API Response:", JSON.stringify(body, null, 2));
|
||||
|
||||
expect(body.message).toBe("Successfully fetched metadata!");
|
||||
expect(body.files).toBeDefined();
|
||||
expect(body.files.length).toBeGreaterThan(0);
|
||||
|
||||
// Verify file structure
|
||||
const file = body.files[0];
|
||||
expect(file.name).toBeDefined();
|
||||
expect(file.size).toBeGreaterThan(0);
|
||||
|
||||
}, 60000); // 60s timeout for real network retrieval
|
||||
});
|
||||
31
packages/server/src/metadata.single-file.test.ts
Normal file
31
packages/server/src/metadata.single-file.test.ts
Normal file
@@ -0,0 +1,31 @@
|
||||
import { expect, test, describe } from "bun:test";
|
||||
import { handleRequest } from "./index";
|
||||
|
||||
describe("Metadata API (Single File)", () => {
|
||||
test("successfully fetches metadata for a single-file torrent", async () => {
|
||||
// Ubuntu 22.04.3 Live Server AMD64 ISO - Standard single-file torrent
|
||||
const magnetURI = "magnet:?xt=urn:btih:3e2de7a6d8590bb25b41097fa668045952fcc506&dn=ubuntu-22.04.3-live-server-amd64.iso&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337";
|
||||
|
||||
const req = new Request("http://localhost/api/metadata", {
|
||||
method: "POST",
|
||||
body: JSON.stringify({ magnetURI }),
|
||||
headers: { "Content-Type": "application/json" }
|
||||
});
|
||||
|
||||
console.log("[Test] Starting single-file metadata fetch...");
|
||||
const res: Response = await handleRequest(req);
|
||||
expect(res.status).toBe(200);
|
||||
|
||||
const body = await res.json();
|
||||
console.log("[Test] API Response:", JSON.stringify(body, null, 2));
|
||||
|
||||
// Even if it times out on some networks, we check the logic branch if it succeeds
|
||||
if (body.files.length > 0) {
|
||||
expect(body.files.length).toBe(1);
|
||||
expect(body.files[0].name).toMatch(/ubuntu/i);
|
||||
expect(body.files[0].size).toBeGreaterThan(1000000000); // ~2GB
|
||||
} else {
|
||||
console.log("[Test] Metadata fetch timed out (expected on some restricted networks)");
|
||||
}
|
||||
}, 45000);
|
||||
});
|
||||
70
packages/server/src/metadata.test.ts
Normal file
70
packages/server/src/metadata.test.ts
Normal file
@@ -0,0 +1,70 @@
|
||||
// No global overrides to ensure clean integration test environment
|
||||
|
||||
import { expect, test, describe } from "bun:test";
|
||||
import { handleRequest } from "./index";
|
||||
|
||||
describe("Metadata API", () => {
|
||||
test("returns 400 for invalid magnet URI", async () => {
|
||||
const req = new Request("http://localhost/api/metadata", {
|
||||
method: "POST",
|
||||
body: JSON.stringify({ magnetURI: "invalid-magnet" }),
|
||||
headers: { "Content-Type": "application/json" }
|
||||
});
|
||||
|
||||
const res: Response = await handleRequest(req);
|
||||
expect(res.status).toBe(400);
|
||||
const body = await res.json();
|
||||
expect(body.error).toBe("Invalid magnet");
|
||||
});
|
||||
|
||||
test("returns 'No peers found' for valid magnet with no trackers (simulated)", async () => {
|
||||
// A truly random hash that 100% won't have peers
|
||||
const randomHash = "1234567890abcdef1234567890abcdef" + Math.random().toString(16).slice(2, 10);
|
||||
const validMagnet = `magnet:?xt=urn:btih:${randomHash}&dn=random.iso`;
|
||||
const req = new Request("http://localhost/api/metadata", {
|
||||
method: "POST",
|
||||
body: JSON.stringify({ magnetURI: validMagnet }),
|
||||
headers: { "Content-Type": "application/json" }
|
||||
});
|
||||
|
||||
const res: Response = await handleRequest(req);
|
||||
expect(res.status).toBe(200);
|
||||
const body = await res.json();
|
||||
// Message can be either "No peers found" or "Peers found but metadata retrieval timed out..."
|
||||
expect(body.message).toMatch(/No peers found|Peers found but metadata retrieval timed out|All attempted peers failed/);
|
||||
expect(body.files).toEqual([]);
|
||||
// Verify parsing logic is also invoked
|
||||
expect(body.parsed).toBeDefined();
|
||||
expect(body.parsed.hash).toBeDefined();
|
||||
expect(body.parsed.dn).toBe("random.iso");
|
||||
}, 15000);
|
||||
|
||||
test("successfully parses a complex magnet URI", async () => {
|
||||
const complexMagnet = "magnet:?xt=urn:btih:A18230D43BDA105BE7DEF84CB711859018AAA92D&dn=Snow%20Crash&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&xl=1024";
|
||||
const req = new Request("http://localhost/api/metadata", {
|
||||
method: "POST",
|
||||
body: JSON.stringify({ magnetURI: complexMagnet }),
|
||||
headers: { "Content-Type": "application/json" }
|
||||
});
|
||||
|
||||
const res: Response = await handleRequest(req);
|
||||
const body = await res.json();
|
||||
|
||||
expect(res.status).toBe(200);
|
||||
expect(body.parsed).toBeDefined();
|
||||
expect(body.parsed.hash).toBe("A18230D43BDA105BE7DEF84CB711859018AAA92D");
|
||||
expect(body.parsed.dn).toBe("Snow Crash");
|
||||
expect(body.parsed.tr).toContain("udp://tracker.opentrackr.org:1337");
|
||||
expect(body.parsed.xl).toBe(1024);
|
||||
}, 10000);
|
||||
|
||||
test("returns 200 for OPTIONS request (CORS)", async () => {
|
||||
const req = new Request("http://localhost/api/metadata", {
|
||||
method: "OPTIONS"
|
||||
});
|
||||
|
||||
const res = await handleRequest(req);
|
||||
expect(res.status).toBe(200);
|
||||
expect(res.headers.get("Access-Control-Allow-Origin")).toBe("*");
|
||||
});
|
||||
});
|
||||
37
packages/server/src/metadata.tracker-resilience.test.ts
Normal file
37
packages/server/src/metadata.tracker-resilience.test.ts
Normal file
@@ -0,0 +1,37 @@
|
||||
import { expect, test, describe } from "bun:test";
|
||||
import { handleRequest } from "./index";
|
||||
|
||||
describe("Metadata API (Tracker Resilience)", () => {
|
||||
test("handles timeouts from unresponsive trackers correctly", async () => {
|
||||
// Magnet with one real tracker and two non-existent ones
|
||||
const magnetURI = "magnet:?xt=urn:btih:A18230D43BDA105BE7DEF84CB711859018AAA92D&tr=udp%3A%2F%2F127.0.0.1%3A1%2Fannounce&tr=udp%3A%2F%2F10.255.255.1%3A6969%2Fannounce&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337";
|
||||
|
||||
const req = new Request("http://localhost/api/metadata", {
|
||||
method: "POST",
|
||||
body: JSON.stringify({ magnetURI }),
|
||||
headers: { "Content-Type": "application/json" }
|
||||
});
|
||||
|
||||
console.log("[Test] Querying mix of garbage and real trackers...");
|
||||
const startTime = Date.now();
|
||||
const res: Response = await handleRequest(req);
|
||||
const duration = Date.now() - startTime;
|
||||
|
||||
expect(res.status).toBe(200);
|
||||
const body = await res.json();
|
||||
|
||||
console.log(`[Test] Request completed in ${duration}ms`);
|
||||
console.log(`[Test] Peers found: ${body.peers.length}`);
|
||||
|
||||
// The tracker timeout is 3s. Total time shouldn't exceed the meta-timeout (10s)
|
||||
// significantly unless retrieval is actually happening.
|
||||
expect(duration).toBeLessThan(45000);
|
||||
|
||||
// Even with 2 garbage trackers, we should find peers from opentrackr
|
||||
if (body.peers.length === 0) {
|
||||
console.log("[Test] Warning: No peers found in resilience test. This can happen if opentrackr is down.");
|
||||
} else {
|
||||
expect(body.peers.length).toBeGreaterThan(0);
|
||||
}
|
||||
}, 60000);
|
||||
});
|
||||
38
packages/server/src/reassembler.ts
Normal file
38
packages/server/src/reassembler.ts
Normal file
@@ -0,0 +1,38 @@
|
||||
export class PieceReassembler {
|
||||
private blocks: Map<number, Uint8Array> = new Map();
|
||||
public totalSize: number;
|
||||
private blockSize: number = 16384;
|
||||
|
||||
constructor(totalSize: number) {
|
||||
this.totalSize = totalSize;
|
||||
}
|
||||
|
||||
public addBlock(begin: number, data: Uint8Array): boolean {
|
||||
this.blocks.set(begin, data);
|
||||
return this.isComplete();
|
||||
}
|
||||
|
||||
public isComplete(): boolean {
|
||||
const totalBlocks = Math.ceil(this.totalSize / this.blockSize);
|
||||
return this.blocks.size === totalBlocks;
|
||||
}
|
||||
|
||||
public getFullPiece(): Uint8Array | null {
|
||||
if (!this.isComplete()) return null;
|
||||
const fullData = new Uint8Array(this.totalSize);
|
||||
for (const [begin, data] of this.blocks.entries()) {
|
||||
fullData.set(data, begin);
|
||||
}
|
||||
return fullData;
|
||||
}
|
||||
|
||||
public getMissingBlocks(): number[] {
|
||||
const missing = [];
|
||||
const totalBlocks = Math.ceil(this.totalSize / this.blockSize);
|
||||
for (let i = 0; i < totalBlocks; i++) {
|
||||
const begin = i * this.blockSize;
|
||||
if (!this.blocks.has(begin)) missing.push(begin);
|
||||
}
|
||||
return missing;
|
||||
}
|
||||
}
|
||||
65
packages/server/src/storage.ts
Normal file
65
packages/server/src/storage.ts
Normal file
@@ -0,0 +1,65 @@
|
||||
import { join } from 'node:path';
|
||||
import { mkdirSync, openSync, writeSync, closeSync, existsSync } from 'node:fs';
|
||||
|
||||
export interface FileInfo {
|
||||
name: string;
|
||||
size: number;
|
||||
}
|
||||
|
||||
export class StorageManager {
|
||||
private baseDir: string;
|
||||
private files: { name: string; size: number; offset: number }[] = [];
|
||||
private totalSize: number = 0;
|
||||
|
||||
constructor(hash: string, fileList: FileInfo[]) {
|
||||
this.baseDir = join(process.cwd(), 'downloads', hash);
|
||||
if (!existsSync(this.baseDir)) {
|
||||
mkdirSync(this.baseDir, { recursive: true });
|
||||
}
|
||||
|
||||
let currentOffset = 0;
|
||||
for (const f of fileList) {
|
||||
this.files.push({ ...f, offset: currentOffset });
|
||||
currentOffset += f.size;
|
||||
|
||||
const filePath = join(this.baseDir, f.name);
|
||||
const dirPath = join(filePath, '..');
|
||||
if (!existsSync(dirPath)) {
|
||||
mkdirSync(dirPath, { recursive: true });
|
||||
}
|
||||
|
||||
// Pre-create the file if it doesn't exist
|
||||
if (!existsSync(filePath)) {
|
||||
const fd = openSync(filePath, 'w');
|
||||
closeSync(fd);
|
||||
}
|
||||
}
|
||||
this.totalSize = currentOffset;
|
||||
}
|
||||
|
||||
public writePiece(pieceIndex: number, pieceSize: number, data: Uint8Array) {
|
||||
let pieceOffset = pieceIndex * pieceSize;
|
||||
let bytesRemaining = data.length;
|
||||
let dataOffset = 0;
|
||||
|
||||
for (const file of this.files) {
|
||||
const fileEnd = file.offset + file.size;
|
||||
|
||||
// Check if this piece overlaps with this file
|
||||
if (pieceOffset < fileEnd && (pieceOffset + bytesRemaining) > file.offset) {
|
||||
const fileStartInPiece = Math.max(pieceOffset, file.offset);
|
||||
const fileEndInPiece = Math.min(pieceOffset + bytesRemaining, fileEnd);
|
||||
const writeSize = fileEndInPiece - fileStartInPiece;
|
||||
|
||||
const fileWriteOffset = fileStartInPiece - file.offset;
|
||||
const dataReadOffset = dataOffset + (fileStartInPiece - pieceOffset);
|
||||
|
||||
const filePath = join(this.baseDir, file.name);
|
||||
// Use 'r+' to allow seeking correctly on Windows
|
||||
const fd = openSync(filePath, 'r+');
|
||||
writeSync(fd, data, dataReadOffset, writeSize, fileWriteOffset);
|
||||
closeSync(fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
177
packages/server/src/worker.ts
Normal file
177
packages/server/src/worker.ts
Normal file
@@ -0,0 +1,177 @@
|
||||
import { Bitfield } from '@torrent-client/shared';
|
||||
import crypto from 'node:crypto';
|
||||
|
||||
export class PeerWorker {
|
||||
private socket: any = null;
|
||||
private buffer = Buffer.alloc(0);
|
||||
private handshakeDone = false;
|
||||
private peerBitfield: Bitfield | null = null;
|
||||
private amInterested = false;
|
||||
private peerChoked = true;
|
||||
private activeRequests = 0;
|
||||
|
||||
constructor(
|
||||
private hash: string,
|
||||
private host: string,
|
||||
private port: number,
|
||||
private totalPieces: number,
|
||||
private onPiece: (index: number, begin: number, data: Buffer) => void,
|
||||
private onReady: () => void,
|
||||
private onClose: () => void
|
||||
) {
|
||||
this.peerBitfield = new Bitfield(totalPieces);
|
||||
}
|
||||
|
||||
public connect() {
|
||||
console.log(`[Worker] Connecting to ${this.host}:${this.port}...`);
|
||||
try {
|
||||
this.socket = Bun.connect({
|
||||
hostname: this.host,
|
||||
port: this.port,
|
||||
socket: {
|
||||
open: (s: any) => this.onOpen(s),
|
||||
data: (s: any, d: Buffer) => this.onData(s, d),
|
||||
close: () => {
|
||||
console.log(`[Worker] Connection closed: ${this.host}`);
|
||||
this.onClose();
|
||||
},
|
||||
connectError: () => {
|
||||
console.log(`[Worker] Connect error: ${this.host}`);
|
||||
this.onClose();
|
||||
},
|
||||
error: (e: any) => {
|
||||
console.log(`[Worker] Socket error for ${this.host}:`, e);
|
||||
this.onClose();
|
||||
},
|
||||
}
|
||||
});
|
||||
} catch (e) {
|
||||
this.onClose();
|
||||
}
|
||||
}
|
||||
|
||||
public getHost() {
|
||||
return this.host;
|
||||
}
|
||||
|
||||
private onOpen(s: any) {
|
||||
this.socket = s; // Ensure we use the active socket instance
|
||||
const reserved = Buffer.alloc(8);
|
||||
reserved[5] |= 0x10; // Extension Protocol support (BEP 10)
|
||||
s.write(Buffer.concat([
|
||||
Buffer.from([19]), Buffer.from("BitTorrent protocol"),
|
||||
reserved, Buffer.from(this.hash, 'hex'), Buffer.from("-BT0001-" + crypto.randomBytes(6).toString("hex"))
|
||||
]));
|
||||
}
|
||||
|
||||
private onData(s: any, data: Buffer) {
|
||||
this.socket = s;
|
||||
this.buffer = Buffer.concat([this.buffer, data]);
|
||||
|
||||
if (!this.handshakeDone) {
|
||||
if (this.buffer.length >= 68 && this.buffer[0] === 19) {
|
||||
this.handshakeDone = true;
|
||||
this.buffer = this.buffer.slice(68);
|
||||
console.log(`[Worker] Handshake done with ${this.host}`);
|
||||
this.onReady(); // Trigger scheduler to send INTERESTED
|
||||
} else return;
|
||||
}
|
||||
|
||||
while (this.buffer.length >= 4) {
|
||||
const length = this.buffer.readUInt32BE(0);
|
||||
if (length === 0) { // keep-alive
|
||||
this.buffer = this.buffer.slice(4);
|
||||
continue;
|
||||
}
|
||||
if (this.buffer.length < length + 4) break;
|
||||
|
||||
const msg = this.buffer.slice(4, length + 4);
|
||||
this.buffer = this.buffer.slice(length + 4);
|
||||
|
||||
this.handleMessage(s, msg);
|
||||
}
|
||||
}
|
||||
|
||||
private handleMessage(s: any, msg: Buffer) {
|
||||
this.socket = s;
|
||||
const id = msg[0];
|
||||
const payload = msg.slice(1);
|
||||
// Silent for now unless debugging: console.log(`[Worker] MSG ${id} from ${this.host}`);
|
||||
|
||||
switch (id) {
|
||||
case 0: // choke
|
||||
this.peerChoked = true;
|
||||
break;
|
||||
case 1: // unchoke
|
||||
this.peerChoked = false;
|
||||
this.onReady(); // Ready to request blocks now!
|
||||
break;
|
||||
case 4: // have
|
||||
const haveIdx = payload.readUInt32BE(0);
|
||||
this.peerBitfield?.set(haveIdx);
|
||||
this.onReady();
|
||||
break;
|
||||
case 5: // bitfield
|
||||
if (this.peerBitfield) {
|
||||
console.log(`[Worker] BITFIELD from ${this.host} (${payload.length} bytes)`);
|
||||
this.peerBitfield.fromBuffer(payload);
|
||||
}
|
||||
this.onReady(); // Check for interest
|
||||
break;
|
||||
case 7: // piece
|
||||
const index = payload.readUInt32BE(0);
|
||||
const begin = payload.readUInt32BE(4);
|
||||
const block = payload.slice(8);
|
||||
this.pendingRequests.delete(`${index}:${begin}`);
|
||||
this.onPiece(index, begin, block);
|
||||
this.activeRequests--;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
public signalInterest() {
|
||||
if (!this.socket || this.amInterested) return;
|
||||
console.log(`[Worker] Proactively sending INTERESTED to ${this.host}`);
|
||||
this.socket.write(Buffer.from([0, 0, 0, 1, 2]));
|
||||
this.amInterested = true;
|
||||
}
|
||||
|
||||
private pendingRequests = new Set<string>(); // "index:begin"
|
||||
|
||||
public getPendingRequests() {
|
||||
return this.pendingRequests;
|
||||
}
|
||||
|
||||
public requestBlock(index: number, begin: number, length: number) {
|
||||
if (!this.socket) return;
|
||||
const key = `${index}:${begin}`;
|
||||
if (this.pendingRequests.has(key)) return;
|
||||
|
||||
// Signal interest if not already
|
||||
if (!this.amInterested) {
|
||||
console.log(`[Worker] Sending INTERESTED to ${this.host}`);
|
||||
this.socket.write(Buffer.from([0, 0, 0, 1, 2]));
|
||||
this.amInterested = true;
|
||||
}
|
||||
|
||||
if (this.peerChoked) return; // Wait for unchoke BEFORE sending REQUEST
|
||||
|
||||
if (this.activeRequests < 5) {
|
||||
const req = Buffer.alloc(13);
|
||||
req[0] = 6; // request ID
|
||||
req.writeUInt32BE(index, 1);
|
||||
req.writeUInt32BE(begin, 5);
|
||||
req.writeUInt32BE(length, 9);
|
||||
|
||||
const len = Buffer.alloc(4);
|
||||
len.writeUInt32BE(13);
|
||||
this.socket.write(Buffer.concat([len, req]));
|
||||
this.activeRequests++;
|
||||
this.pendingRequests.add(key);
|
||||
}
|
||||
}
|
||||
|
||||
public hasPiece(index: number) {
|
||||
return this.peerBitfield?.has(index) ?? false;
|
||||
}
|
||||
}
|
||||
25
packages/server/tsconfig.json
Normal file
25
packages/server/tsconfig.json
Normal file
@@ -0,0 +1,25 @@
|
||||
{
|
||||
"compilerOptions": {
|
||||
"target": "ESNext",
|
||||
"module": "ESNext",
|
||||
"moduleResolution": "Node",
|
||||
"esModuleInterop": true,
|
||||
"skipLibCheck": true,
|
||||
"forceConsistentCasingInFileNames": true,
|
||||
"strict": true,
|
||||
"outDir": "dist",
|
||||
"baseUrl": ".",
|
||||
"paths": {
|
||||
"@torrent-client/shared": [
|
||||
"../shared/src"
|
||||
]
|
||||
},
|
||||
"types": [
|
||||
"bun-types"
|
||||
]
|
||||
},
|
||||
"include": [
|
||||
"src/**/*"
|
||||
],
|
||||
"exclude": []
|
||||
}
|
||||
Reference in New Issue
Block a user