Initial commit: Upload Checkpoint project
commit c0e3781244
32 changed files with 6121 additions and 0 deletions
plugins/ipfilter.js (new file, 470 lines)
@@ -0,0 +1,470 @@
import { registerPlugin, loadConfig, rootDir } from '../index.js';
import fs from 'fs';
import { dirname, join } from 'path';
import { fileURLToPath } from 'url';
import maxmind from 'maxmind';
import { AhoCorasick } from 'string-dsa';
import { getRealIP } from '../utils/network.js';
import { createGunzip } from 'zlib';
import tarStream from 'tar-stream';
import { Buffer } from 'buffer';
import * as logs from '../utils/logs.js';
import { recordEvent } from './stats.js';

const cfg = {};
await loadConfig('ipfilter', cfg);

// Map configuration to internal structure
const enabled = cfg.Core.Enabled;
const accountId = cfg.Core.AccountID || process.env.MAXMIND_ACCOUNT_ID;
const licenseKey = cfg.Core.LicenseKey || process.env.MAXMIND_LICENSE_KEY;
const dbUpdateInterval = cfg.Core.DBUpdateIntervalHours;

const ipBlockCacheTTL = cfg.Cache.IPBlockCacheTTLSec * 1000;
const ipBlockCacheMaxEntries = cfg.Cache.IPBlockCacheMaxEntries;

const blockedCountryCodes = new Set(cfg.Blocking.CountryCodes);
const blockedContinentCodes = new Set(cfg.Blocking.ContinentCodes);
const defaultBlockPage = cfg.Blocking.DefaultBlockPage;

// Process ASN blocks
const blockedASNs = {};
const asnGroupBlockPages = {};
for (const [group, config] of Object.entries(cfg.ASN || {})) {
  blockedASNs[group] = config.Numbers || [];
  asnGroupBlockPages[group] = config.BlockPage;
}

// Process ASN name blocks
const blockedASNNames = {};
for (const [group, config] of Object.entries(cfg.ASNNames || {})) {
  blockedASNNames[group] = config.Patterns || [];
  if (config.BlockPage) {
    asnGroupBlockPages[group] = config.BlockPage;
  }
}

const countryBlockPages = cfg.CountryBlockPages || {};
const continentBlockPages = cfg.ContinentBlockPages || {};
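
// A minimal sketch of the expected config/ipfilter.toml shape, based on the
// keys read above (the concrete values here are hypothetical examples):
//
//   [Core]
//   Enabled = true
//   DBUpdateIntervalHours = 24
//
//   [Cache]
//   IPBlockCacheTTLSec = 600
//   IPBlockCacheMaxEntries = 10000
//
//   [Blocking]
//   CountryCodes = ["KP"]
//   ContinentCodes = []
//   DefaultBlockPage = "pages/blocked.html"
//
//   [ASN.Hosting]
//   Numbers = [16509]
//   BlockPage = "pages/blocked-asn.html"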

const ipBlockCache = new Map();

const blockPageCache = new Map();
async function loadBlockPage(filePath) {
  if (!blockPageCache.has(filePath)) {
    try {
      const txt = await fs.promises.readFile(filePath, 'utf8');
      blockPageCache.set(filePath, txt);
    } catch {
      blockPageCache.set(filePath, null);
    }
  }
  return blockPageCache.get(filePath);
}

const __dirname = dirname(fileURLToPath(import.meta.url));

const geoIPCountryDBPath = join(rootDir, 'data/GeoLite2-Country.mmdb');
const geoIPASNDBPath = join(rootDir, 'data/GeoLite2-ASN.mmdb');
const updateTimestampPath = join(rootDir, 'data/ipfilter_update.json');

let geoipCountryReader, geoipASNReader;

let isReloading = false;
let reloadLock = Promise.resolve();

async function getLastUpdateTimestamp() {
  try {
    if (fs.existsSync(updateTimestampPath)) {
      const data = await fs.promises.readFile(updateTimestampPath, 'utf8');
      const json = JSON.parse(data);
      return json.lastUpdated || 0;
    }
  } catch (err) {
    logs.warn('ipfilter', `Failed to read last update timestamp: ${err}`);
  }
  return 0;
}

async function saveUpdateTimestamp() {
  try {
    const timestamp = Date.now();
    await fs.promises.writeFile(
      updateTimestampPath,
      JSON.stringify({ lastUpdated: timestamp }),
      'utf8',
    );
    return timestamp;
  } catch (err) {
    logs.error('ipfilter', `Failed to save update timestamp: ${err}`);
    return Date.now();
  }
}

// Ensure the update timestamp file exists on first run
if (!fs.existsSync(updateTimestampPath)) {
  try {
    await saveUpdateTimestamp();
  } catch (err) {
    logs.error('ipfilter', `Failed to initialize update timestamp file: ${err}`);
  }
}

// Download GeoIP databases if missing
async function downloadGeoIPDatabases() {
  if (!licenseKey || !accountId) {
    logs.warn(
      'ipfilter',
      'No MaxMind credentials found; skipping GeoIP database download. Set the MAXMIND_ACCOUNT_ID and MAXMIND_LICENSE_KEY environment variables, or add AccountID and LicenseKey to config/ipfilter.toml.',
    );
    return;
  }
  const editions = [
    { id: 'GeoLite2-Country', filePath: geoIPCountryDBPath },
    { id: 'GeoLite2-ASN', filePath: geoIPASNDBPath },
  ];
  for (const { id, filePath } of editions) {
    if (!fs.existsSync(filePath)) {
      logs.plugin('ipfilter', `Downloading ${id} database...`);
      const url = `https://download.maxmind.com/app/geoip_download?edition_id=${id}&license_key=${licenseKey}&suffix=tar.gz`;
      const res = await fetch(url);
      if (!res.ok) {
        logs.error(
          'ipfilter',
          `Failed to download ${id} database: ${res.status} ${res.statusText}`,
        );
        continue;
      }
      const tempTar = join(rootDir, 'data', `${id}.tar.gz`);
      // Write the response body into a .tar.gz file
      const arrayBuf = await res.arrayBuffer();
      await fs.promises.writeFile(tempTar, Buffer.from(arrayBuf));
      // Extract .mmdb files from the downloaded tar.gz
      const extract = tarStream.extract();
      extract.on('entry', (header, stream, next) => {
        if (header.name.endsWith('.mmdb')) {
          const filename = header.name.split('/').pop();
          const outPath = join(rootDir, 'data', filename);
          const ws = fs.createWriteStream(outPath);
          stream
            .pipe(ws)
            .on('finish', next)
            .on('error', (err) => {
              logs.error('ipfilter', `Extraction error: ${err}`);
              next();
            });
        } else {
          stream.resume();
          next();
        }
      });
      await new Promise((resolve, reject) => {
        fs.createReadStream(tempTar)
          .pipe(createGunzip())
          .pipe(extract)
          .on('finish', resolve)
          .on('error', reject);
      });
      await fs.promises.unlink(tempTar);
      logs.plugin('ipfilter', `${id} database downloaded and extracted.`);
    }
  }
}

await downloadGeoIPDatabases();

async function loadGeoDatabases() {
  if (isReloading) {
    await reloadLock;
    return true;
  }

  isReloading = true;
  let lockResolve;
  reloadLock = new Promise((resolve) => {
    lockResolve = resolve;
  });

  try {
    const countryStats = fs.statSync(geoIPCountryDBPath);
    const asnStats = fs.statSync(geoIPASNDBPath);

    if (countryStats.size > 1024 && asnStats.size > 1024) {
      logs.plugin('ipfilter', 'Initializing GeoIP databases from disk...');
      const newCountryReader = await maxmind.open(geoIPCountryDBPath);
      const newASNReader = await maxmind.open(geoIPASNDBPath);

      // Sanity-check the fresh readers before swapping them in
      try {
        const testIP = '8.8.8.8';
        const countryTest = newCountryReader.get(testIP);
        const asnTest = newASNReader.get(testIP);

        if (!countryTest || !asnTest) {
          throw new Error('Database validation failed: test lookups returned empty results');
        }
      } catch (validationErr) {
        logs.error('ipfilter', `GeoIP database validation failed: ${validationErr}`);

        try {
          await newCountryReader.close();
        } catch (e) {}
        try {
          await newASNReader.close();
        } catch (e) {}
        throw new Error('Database validation failed');
      }

      const oldCountryReader = geoipCountryReader;
      const oldASNReader = geoipASNReader;

      geoipCountryReader = newCountryReader;
      geoipASNReader = newASNReader;
      if (oldCountryReader || oldASNReader) {
        logs.plugin('ipfilter', 'GeoIP databases reloaded and active');
      } else {
        logs.plugin('ipfilter', 'GeoIP databases loaded and active');
      }

      ipBlockCache.clear();

      await saveUpdateTimestamp();

      // Close the old readers after a grace period so in-flight lookups can finish
      if (oldCountryReader || oldASNReader) {
        setTimeout(async () => {
          if (oldCountryReader) {
            try {
              await oldCountryReader.close();
            } catch (e) {}
          }
          if (oldASNReader) {
            try {
              await oldASNReader.close();
            } catch (e) {}
          }
          logs.plugin('ipfilter', 'Old GeoIP database instances closed successfully');
        }, 5000);
      }

      return true;
    } else {
      logs.warn(
        'ipfilter',
        'GeoIP database files are empty or too small. IP filtering will be disabled.',
      );
      return false;
    }
  } catch (err) {
    logs.error('ipfilter', `Failed to load GeoIP databases: ${err}`);
    return false;
  } finally {
    isReloading = false;
    lockResolve();
  }
}

async function checkAndUpdateDatabases() {
  if (isReloading) return false;

  const lastUpdate = await getLastUpdateTimestamp();
  const now = Date.now();
  const hoursSinceUpdate = (now - lastUpdate) / (1000 * 60 * 60);

  if (hoursSinceUpdate >= dbUpdateInterval) {
    logs.plugin(
      'ipfilter',
      `GeoIP databases last updated ${hoursSinceUpdate.toFixed(1)} hours ago, reloading...`,
    );
    return await loadGeoDatabases();
  }

  return false;
}

function startPeriodicDatabaseUpdates() {
  // Calculate interval in milliseconds
  const intervalMs = dbUpdateInterval * 60 * 60 * 1000;

  // Schedule periodic updates
  setInterval(async () => {
    try {
      await checkAndUpdateDatabases();
    } catch (err) {
      logs.error('ipfilter', `Failed during periodic database update: ${err}`);
    }
  }, intervalMs);

  logs.plugin('ipfilter', `Scheduled GeoIP database updates every ${dbUpdateInterval} hours`);
}

await loadGeoDatabases();

startPeriodicDatabaseUpdates();

const asnNameMatchers = new Map();
for (const [group, names] of Object.entries(blockedASNNames)) {
  asnNameMatchers.set(group, new AhoCorasick(names));
}
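
// Illustrative use of the matchers (hypothetical patterns and org name):
// Aho-Corasick finds every configured substring in a single pass over the
// text, which is why matcher.find(orgName) below stays cheap even with many
// patterns per group.
//
//   const m = new AhoCorasick(['AMAZON', 'DIGITALOCEAN']);
//   m.find('AMAZON-02').length > 0; // => true: 'AMAZON' occurs in the name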

function cacheAndReturn(ip, blocked, blockType, blockValue, customPage, asnOrgName) {
  const expiresAt = Date.now() + ipBlockCacheTTL;
  ipBlockCache.set(ip, { blocked, blockType, blockValue, customPage, asnOrgName, expiresAt });
  // Enforce maximum cache size
  if (ipBlockCacheMaxEntries > 0 && ipBlockCache.size > ipBlockCacheMaxEntries) {
    // Remove the oldest entry (first key in insertion order)
    const oldestKey = ipBlockCache.keys().next().value;
    ipBlockCache.delete(oldestKey);
  }
  return [blocked, blockType, blockValue, customPage, asnOrgName];
}
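
// Why delete-then-set acts as an LRU: a JavaScript Map iterates keys in
// insertion order, so re-inserting an entry moves it to the back and
// keys().next() always yields the least-recently-used key. A tiny sketch:
//
//   const lru = new Map([['a', 1], ['b', 2]]);
//   lru.delete('a'); lru.set('a', 1); // refresh 'a'
//   lru.keys().next().value;          // => 'b' (now the oldest entry)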

function isBlockedIPExtended(ip) {
  const now = Date.now();
  const entry = ipBlockCache.get(ip);
  if (entry) {
    if (entry.expiresAt > now) {
      // Refresh recency by re-inserting the entry
      ipBlockCache.delete(ip);
      ipBlockCache.set(ip, entry);
      return [entry.blocked, entry.blockType, entry.blockValue, entry.customPage, entry.asnOrgName];
    } else {
      // Entry expired, remove it
      ipBlockCache.delete(ip);
    }
  }

  const countryReader = geoipCountryReader;
  const asnReader = geoipASNReader;

  if (!countryReader || !asnReader) {
    return [false, '', '', '', ''];
  }

  let countryInfo;
  try {
    countryInfo = countryReader.get(ip);
  } catch (e) {}
  if (countryInfo?.country && blockedCountryCodes.has(countryInfo.country.iso_code)) {
    const page = countryBlockPages[countryInfo.country.iso_code] || defaultBlockPage;
    return cacheAndReturn(ip, true, 'country', countryInfo.country.iso_code, page, '');
  }

  if (countryInfo?.continent && blockedContinentCodes.has(countryInfo.continent.code)) {
    const page = continentBlockPages[countryInfo.continent.code] || defaultBlockPage;
    return cacheAndReturn(ip, true, 'continent', countryInfo.continent.code, page, '');
  }

  let asnInfo;
  try {
    asnInfo = asnReader.get(ip);
  } catch (e) {}
  if (asnInfo?.autonomous_system_number) {
    const asn = asnInfo.autonomous_system_number;
    const orgName = asnInfo.autonomous_system_organization || '';

    for (const [group, arr] of Object.entries(blockedASNs)) {
      if (arr.includes(asn)) {
        const page = asnGroupBlockPages[group] || defaultBlockPage;
        return cacheAndReturn(ip, true, 'asn', group, page, orgName);
      }
    }

    for (const [group, matcher] of asnNameMatchers.entries()) {
      const matches = matcher.find(orgName);
      if (matches.length) {
        const page = asnGroupBlockPages[group] || defaultBlockPage;
        return cacheAndReturn(ip, true, 'asn', group, page, orgName);
      }
    }
  }

  return cacheAndReturn(ip, false, '', '', '', '');
}
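
// Usage sketch (hypothetical address): callers destructure the 5-tuple, e.g.
//
//   const [blocked, type, value, page, org] = isBlockedIPExtended('203.0.113.7');
//   if (blocked) logs.plugin('ipfilter', `blocked by ${type}=${value}`);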

function IPBlockMiddleware() {
  return async (request, server) => {
    const clientIP = getRealIP(request, server);
    logs.plugin('ipfilter', `Incoming request from IP: ${clientIP}`);
    const [blocked, blockType, blockValue, customPage, asnOrgName] = isBlockedIPExtended(clientIP);

    if (blocked) {
      recordEvent('ipfilter.block', {
        type: blockType,
        value: blockValue,
        asn_org: asnOrgName,
        ip: clientIP, // Include the IP address for stats
      });
      const url = new URL(request.url);

      if (url.pathname.startsWith('/api')) {
        return new Response(
          JSON.stringify({
            error: 'Access denied from your location or network.',
            reason: 'geoip',
            type: blockType,
            value: blockValue,
            asn_org: asnOrgName,
          }),
          {
            status: 403,
            headers: { 'Content-Type': 'application/json' },
          },
        );
      }

      // Normalize page paths by stripping leading slashes
      const cleanCustomPage = customPage.replace(/^\/+/, '');
      const cleanDefaultPage = defaultBlockPage.replace(/^\/+/, '');

      let html = '';
      logs.plugin(
        'ipfilter',
        `Block pages: custom="${cleanCustomPage}", default="${cleanDefaultPage}"`,
      );
      logs.plugin('ipfilter', 'Searching for block page in the following locations:');
      const paths = [
        // Check the custom page relative to the project root first
        join(rootDir, cleanCustomPage),
      ];
      // Fall back to the default block page if the custom page isn't found
      if (customPage !== defaultBlockPage) {
        paths.push(
          // Check the default page at the project root
          join(rootDir, cleanDefaultPage),
        );
      }

      for (const p of paths) {
        logs.plugin('ipfilter', `Trying block page at: ${p}`);
        const content = await loadBlockPage(p);
        logs.plugin('ipfilter', `Load result for ${p}: ${content ? 'FOUND' : 'NOT FOUND'}`);
        if (content) {
          html = content;
          break;
        }
      }

      if (html) {
        const output = html.replace('{{.ASNName}}', asnOrgName || 'Blocked Network');
        return new Response(output, {
          status: 403,
          headers: { 'Content-Type': 'text/html; charset=utf-8' },
        });
      } else {
        return new Response('Access denied from your location or network.', {
          status: 403,
          headers: { 'Content-Type': 'text/plain' },
        });
      }
    }

    return undefined;
  };
}

if (enabled) {
  registerPlugin('ipfilter', IPBlockMiddleware());
} else {
  logs.plugin('ipfilter', 'IP filter plugin disabled via config');
}

export { checkAndUpdateDatabases, loadGeoDatabases };
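
// The exports allow other modules to force a refresh; a hypothetical admin
// endpoint might do:
//
//   import { checkAndUpdateDatabases } from './plugins/ipfilter.js';
//   await checkAndUpdateDatabases(); // reloads only if the interval elapsed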

plugins/proxy.js (new file, 303 lines)
@@ -0,0 +1,303 @@
import { registerPlugin, loadConfig } from '../index.js';
import * as logs from '../utils/logs.js';

const proxyConfig = {};
await loadConfig('proxy', proxyConfig);

// Map configuration to internal structure
const enabled = proxyConfig.Core.Enabled;
const wsTimeout = proxyConfig.Timeouts.WebSocketTimeoutMs;
const upstreamTimeout = proxyConfig.Timeouts.UpstreamTimeoutMs;

// Build proxy mappings from array format
const proxyMappings = {};
proxyConfig.Mapping.forEach((mapping) => {
  proxyMappings[mapping.Host] = mapping.Target;
});

logs.plugin('proxy', `Proxy mappings loaded: ${JSON.stringify(proxyMappings)}`);
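
// A minimal sketch of the mapping section of the proxy config (presumably
// config/proxy.toml, mirroring the ipfilter config; hosts and targets here
// are hypothetical):
//
//   [[Mapping]]
//   Host = "app.example.com"
//   Target = "http://127.0.0.1:3000"
//
// which yields proxyMappings = { "app.example.com": "http://127.0.0.1:3000" }.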

const HOP_BY_HOP_HEADERS = [
  'connection',
  'keep-alive',
  'proxy-authenticate',
  'proxy-authorization',
  'te',
  'trailer',
  'transfer-encoding',
  'upgrade',
];

// Connect to upstream WebSocket with a handshake timeout
async function connectUpstreamWebSocket(url, headers) {
  return new Promise((resolve, reject) => {
    const ws = new WebSocket(url, { headers });
    const timer = setTimeout(() => {
      ws.close();
      reject(new Error('timeout'));
    }, wsTimeout);
    ws.onopen = () => {
      clearTimeout(timer);
      resolve(ws);
    };
    ws.onerror = (err) => {
      clearTimeout(timer);
      reject(err);
    };
    ws.onclose = () => {
      clearTimeout(timer);
      reject(new Error('closed'));
    };
  });
}

async function createProxyResponse(targetURL, request) {
  try {
    const url = new URL(request.url);
    const targetPathAndQuery = url.pathname + url.search;
    const fullTargetURL = new URL(targetPathAndQuery, targetURL).toString();

    const startTime = Date.now();

    const outgoingHeaders = new Headers(request.headers);
    outgoingHeaders.delete('host');

    // Set the proper Host header for the target
    const targetHost = new URL(targetURL).host;
    outgoingHeaders.set('host', targetHost);

    // Forward the original host as X-Forwarded-Host for applications that need it
    outgoingHeaders.set('x-forwarded-host', request.headers.get('host'));
    outgoingHeaders.set('x-forwarded-proto', url.protocol.replace(':', ''));

    // Preserve important headers for authentication.
    // Don't delete content-length or transfer-encoding here; they are handled properly below.
    const options = {
      method: request.method,
      headers: outgoingHeaders,
      // Follow redirects automatically for GET; forward redirects for non-GET.
      // Absolute requirement: DON'T REMOVE
      redirect: request.method === 'GET' ? 'follow' : 'manual',
      credentials: 'include',
    };

    const isChunked = request.headers.get('transfer-encoding')?.toLowerCase() === 'chunked';

    // Methods that can legitimately have request bodies
    const methodsWithBody = new Set(['POST', 'PUT', 'PATCH', 'DELETE']);

    if (methodsWithBody.has(request.method) && request.body) {
      if (isChunked) {
        logs.plugin('proxy', `De-chunking request body for ${request.method} ${request.url}`);
        try {
          const bodyBuffer = await request.arrayBuffer();
          options.body = bodyBuffer;
          outgoingHeaders.set('content-length', String(bodyBuffer.byteLength));
          outgoingHeaders.delete('transfer-encoding');
        } catch (bufferError) {
          logs.error('proxy', `Error buffering chunked request body: ${bufferError}`);
          return new Response('Error processing chunked request body', { status: 500 });
        }
      } else {
        // For non-chunked bodies, preserve the body stream
        options.body = request.body;
        // Keep the original content-length if it exists
        if (request.headers.has('content-length')) {
          outgoingHeaders.set('content-length', request.headers.get('content-length'));
        }
      }
    }

    // Add a timeout controller for the upstream fetch
    const controller = new AbortController();
    const timeoutId = setTimeout(() => {
      logs.warn(
        'proxy',
        `Upstream request to ${fullTargetURL} timed out after ${upstreamTimeout}ms`,
      );
      controller.abort();
    }, upstreamTimeout);

    let response;
    try {
      response = await fetch(fullTargetURL, {
        ...options,
        signal: controller.signal,
        verbose: true,
      });
    } catch (fetchErr) {
      clearTimeout(timeoutId);
      if (fetchErr.name === 'AbortError') {
        logs.error('proxy', `Upstream fetch aborted for ${fullTargetURL} (likely due to timeout)`);
        return new Response('Gateway Timeout', { status: 504 });
      }
      throw fetchErr;
    }
    clearTimeout(timeoutId);

    const latency = Date.now() - startTime;

    logs.plugin(
      'proxy',
      `Proxied request to: ${fullTargetURL} (${response.status} ${response.statusText}) (${latency}ms)`,
    );

    const responseHeaders = new Headers(response.headers);

    // Remove hop-by-hop headers
    HOP_BY_HOP_HEADERS.forEach((h) => responseHeaders.delete(h));

    // Remove content-encoding and content-length headers.
    // This is necessary because Bun/fetch automatically decompresses the response body
    // but leaves the content-encoding header, causing the browser to try to decompress
    // already-decompressed content.
    responseHeaders.delete('content-encoding');
    responseHeaders.delete('content-length');

    // Add proxy information
    responseHeaders.set('X-Proxy-Latency', `${latency}ms`);

    // Handle Set-Cookie headers - rewrite attributes if needed
    const setCookieHeaders = response.headers.getSetCookie ? response.headers.getSetCookie() : [];
    if (setCookieHeaders.length > 0) {
      responseHeaders.delete('set-cookie');

      setCookieHeaders.forEach((cookieStr) => {
        // Parse and potentially rewrite the cookie attributes
        let modifiedCookie = cookieStr;

        // Remove domain restrictions that might prevent the cookie from working
        modifiedCookie = modifiedCookie.replace(/;\s*domain=[^;]*/gi, '');

        // If the cookie has SameSite=None, ensure it also has Secure
        if (modifiedCookie.match(/samesite\s*=\s*none/i) && !modifiedCookie.match(/secure/i)) {
          modifiedCookie += '; Secure';
        }

        // For local development over plain HTTP, downgrade SameSite=None to Lax
        if (url.protocol === 'http:' && modifiedCookie.match(/samesite\s*=\s*none/i)) {
          modifiedCookie = modifiedCookie.replace(/;\s*samesite=[^;]*/gi, '; SameSite=Lax');
          modifiedCookie = modifiedCookie.replace(/;\s*secure/gi, '');
        }

        responseHeaders.append('set-cookie', modifiedCookie);
      });
    }

    return new Response(response.body, {
      status: response.status,
      statusText: response.statusText,
      headers: responseHeaders,
    });
  } catch (err) {
    logs.error(
      'proxy',
      `Proxy error processing ${request.method} ${request.url}: ${err.message}${
        err.cause ? ' - Cause: ' + err.cause : ''
      }`,
    );
    let causeDetails = '';
    if (err.cause) {
      causeDetails = typeof err.cause === 'object' ? JSON.stringify(err.cause) : String(err.cause);
    }
    logs.error(
      'proxy',
      `Full error details: ${err.stack}${err.cause ? '\nCause: ' + causeDetails : ''}`,
    );
    return new Response('Bad Gateway', { status: 502 });
  }
}

function proxyMiddleware() {
  return async (request, server) => {
    const url = new URL(request.url);
    const path = url.pathname;

    // Skip checkpoint endpoints
    if (path.startsWith('/api/challenge') || path.startsWith('/api/verify')) return undefined;

    // Skip static assets
    if (path.startsWith('/webfont/') || path.startsWith('/js/')) return undefined;

    // Get the hostname from the request
    const hostname = request.headers.get('host')?.split(':')[0];
    const target = proxyMappings[hostname];
    if (!target) return undefined;

    // Handle WebSocket upgrade requests
    const upgradeHeader = request.headers.get('upgrade')?.toLowerCase();
    if (upgradeHeader === 'websocket') {
      const targetUrl = new URL(url.pathname + url.search, target);
      targetUrl.protocol = targetUrl.protocol.replace(/^http/, 'ws');

      // Forward important headers for the WebSocket handshake
      const wsHeaders = new Headers();
      if (request.headers.has('cookie')) wsHeaders.set('Cookie', request.headers.get('cookie'));
      if (request.headers.has('authorization'))
        wsHeaders.set('Authorization', request.headers.get('authorization'));
      if (request.headers.has('origin')) wsHeaders.set('Origin', request.headers.get('origin'));
      if (request.headers.has('sec-websocket-protocol'))
        wsHeaders.set('Sec-WebSocket-Protocol', request.headers.get('sec-websocket-protocol'));
      if (request.headers.has('sec-websocket-extensions'))
        wsHeaders.set('Sec-WebSocket-Extensions', request.headers.get('sec-websocket-extensions'));

      let upstream;
      try {
        // Convert the Headers object to a plain object for the WebSocket constructor
        const plainWsHeaders = {};
        for (const [key, value] of wsHeaders) {
          plainWsHeaders[key] = value;
        }
        upstream = await connectUpstreamWebSocket(targetUrl.toString(), plainWsHeaders);
      } catch (err) {
        logs.error('proxy', `Upstream WebSocket connection failed: ${err}`);
        return new Response('Bad Gateway', { status: 502 });
      }
      // Upgrade the incoming client connection and attach the upstream socket
      const ok = server.upgrade(request, { data: { upstream } });
      if (!ok) {
        logs.error('proxy', 'WebSocket upgrade failed');
        upstream.close();
        return new Response('Bad Gateway', { status: 502 });
      }
      logs.plugin('proxy', `WebSocket proxied to: ${targetUrl.toString()}`);
      return;
    }

    return createProxyResponse(target, request);
  };
}

// WebSocket handlers for proxying messages between client and upstream
export const proxyWebSocketHandler = {
  open(ws) {
    const upstream = ws.data.upstream;
    upstream.onopen = () => logs.plugin('proxy', 'Upstream WebSocket connected');
    // Forward messages from target to client
    upstream.onmessage = (event) => ws.send(event.data);
    upstream.onerror = (err) => {
      logs.error('proxy', `Upstream WebSocket error: ${err}`);
      ws.close(1011, 'Upstream error');
    };
    upstream.onclose = ({ code, reason }) => ws.close(code, reason);
  },
  message(ws, message) {
    const upstream = ws.data.upstream;
    // Forward messages from client to target
    upstream.send(message);
  },
  close(ws, code, reason) {
    const upstream = ws.data.upstream;
    upstream.close(code, reason);
  },
  error(ws, err) {
    logs.error('proxy', `WebSocket proxy error: ${err}`);
    const upstream = ws.data.upstream;
    upstream.close();
  },
};

if (enabled) {
  registerPlugin('proxy', proxyMiddleware());
} else {
  logs.plugin('proxy', 'Proxy plugin disabled via config');
}
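
// Sketch of how the exported handler is presumably wired up by the server
// entry point (the actual wiring lives in index.js, which is not shown here):
//
//   import { proxyWebSocketHandler } from './plugins/proxy.js';
//   Bun.serve({
//     fetch(request, server) { /* dispatch to registered plugin middlewares */ },
//     websocket: proxyWebSocketHandler, // open/message/close/error hooks
//   });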

plugins/stats.js (new file, 132 lines)
@@ -0,0 +1,132 @@
import { registerPlugin, rootDir, loadConfig } from '../index.js';
import { Level } from 'level';
import ttl from 'level-ttl';
import fs from 'fs/promises';
import path from 'path';
import { fileURLToPath } from 'url';
import { Readable } from 'stream';
import cookie from 'cookie';
import { getRealIP } from '../utils/network.js';
import { parseDuration } from '../utils/time.js';

// Load stats configuration
const statsConfig = {};
await loadConfig('stats', statsConfig);

// Map configuration to internal structure
const enabled = statsConfig.Core.Enabled;
const statsTTL = parseDuration(statsConfig.Storage.StatsTTL);
const statsUIPath = statsConfig.WebUI.StatsUIPath;
const statsAPIPath = statsConfig.WebUI.StatsAPIPath;

// Determine __dirname for ES modules
const __dirname = path.dirname(fileURLToPath(import.meta.url));

/**
 * Adds createReadStream support to LevelDB instances using an async iterator.
 */
function addReadStreamSupport(dbInstance) {
  if (!dbInstance.createReadStream) {
    dbInstance.createReadStream = (opts) =>
      Readable.from(
        (async function* () {
          for await (const [key, value] of dbInstance.iterator(opts)) {
            yield { key, value };
          }
        })(),
      );
  }
  return dbInstance;
}

// Initialize LevelDB for stats under db/stats with TTL and stream support
const statsDBPath = path.join(rootDir, 'db', 'stats');
await fs.mkdir(statsDBPath, { recursive: true });
let rawStatsDB = new Level(statsDBPath, { valueEncoding: 'json' });
rawStatsDB = addReadStreamSupport(rawStatsDB);
const statsDB = ttl(rawStatsDB, { defaultTTL: statsTTL });
addReadStreamSupport(statsDB);

/**
 * Record a stat event with a metric name and optional data.
 * @param {string} metric
 * @param {object} data
 */
function recordEvent(metric, data = {}) {
  // Skip if statsDB is not initialized
  if (typeof statsDB === 'undefined' || !statsDB || typeof statsDB.put !== 'function') {
    console.warn(`stats: cannot record "${metric}", statsDB not available`);
    return;
  }
  const timestamp = Date.now();
  // The key includes the metric, a timestamp, and a random suffix to avoid collisions
  const key = `${metric}:${timestamp}:${Math.random().toString(36).slice(2, 8)}`;
  try {
    // Use the callback form to avoid promise chaining
    statsDB.put(key, { timestamp, metric, ...data }, (err) => {
      if (err) console.error('stats: failed to record event', err);
    });
  } catch (err) {
    console.error('stats: failed to record event', err);
  }
}
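
// Usage sketch: callers pick the metric name and fields. ipfilter.js, for
// example, records blocks like this (values hypothetical):
//
//   recordEvent('ipfilter.block', { type: 'country', value: 'KP', ip: '203.0.113.7' });
//
// which stores { timestamp, metric, type, value, ip } under a key such as
// 'ipfilter.block:1700000000000:a1b2c3' for the range scan in handleStatsAPI.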

// Handler for serving the stats HTML UI
async function handleStatsPage(request) {
  const url = new URL(request.url);
  if (url.pathname !== statsUIPath) return undefined;
  try {
    const html = await fs.readFile(path.join(__dirname, 'stats.html'), 'utf8');
    return new Response(html, {
      status: 200,
      headers: { 'Content-Type': 'text/html; charset=utf-8' },
    });
  } catch (e) {
    return new Response('Stats UI not found', { status: 404 });
  }
}

// Handler for the stats API
async function handleStatsAPI(request) {
  const url = new URL(request.url);
  if (url.pathname !== statsAPIPath) return undefined;
  const metric = url.searchParams.get('metric');
  const start = parseInt(url.searchParams.get('start') || '0', 10);
  const end = parseInt(url.searchParams.get('end') || `${Date.now()}`, 10);
  const result = [];
  // Iterate over keys for this metric in the time range
  for await (const [key, value] of statsDB.iterator({
    gte: `${metric}:${start}`,
    lte: `${metric}:${end}\uffff`,
  })) {
    result.push(value);
  }
  return new Response(JSON.stringify(result), {
    status: 200,
    headers: { 'Content-Type': 'application/json' },
  });
}
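
// Example query (the path comes from WebUI.StatsAPIPath; the timestamps are
// hypothetical epoch milliseconds):
//
//   GET <StatsAPIPath>?metric=ipfilter.block&start=1700000000000&end=1700086400000
//
// responds with a JSON array of the matching event objects.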

// Middleware for the stats plugin
function StatsMiddleware() {
  return async (request) => {
    // Always serve the stats UI and API first, bypassing auth
    const pageResp = await handleStatsPage(request);
    if (pageResp) return pageResp;
    const apiResp = await handleStatsAPI(request);
    if (apiResp) return apiResp;

    // Do not handle any other routes
    return undefined;
  };
}

// Register the stats plugin
if (enabled) {
  registerPlugin('stats', StatsMiddleware());
} else {
  console.log('Stats plugin disabled via config');
}

// Export recordEvent for other plugins to use
export { recordEvent };