From 54ea53c6d8f1697cd8e3f7d1f7cbe590c2a2cd1c Mon Sep 17 00:00:00 2001 From: Lorenzo Mangani Date: Mon, 27 Jan 2025 21:19:28 +0100 Subject: [PATCH] Update kompactor.ts --- kompactor.ts | 200 +++++++++++++++++++++++++++++++++------------------ 1 file changed, 132 insertions(+), 68 deletions(-) diff --git a/kompactor.ts b/kompactor.ts index 4e32df3..578736d 100644 --- a/kompactor.ts +++ b/kompactor.ts @@ -1,19 +1,19 @@ -import { readdir, mkdir, readFile, writeFile } from 'node:fs/promises'; -import { join, dirname, resolve } from 'node:path'; +import { readdir, mkdir, readFile, writeFile, access } from 'node:fs/promises'; +import { join, dirname, resolve, sep, basename } from 'node:path'; import duckdb, { DuckDBInstance, Connection } from '@duckdb/node-api'; class ParquetCompactor { - private snapshotsDir: string; - private outputDir: string; - private dryRun: boolean; - private verbose: boolean; - private instance: DuckDBInstance | null = null; - private connection: Connection | null = null; - private log: (...args: any[]) => void; - - constructor(snapshotsDir: string, outputDir: string, options: { dryRun?: boolean; verbose?: boolean } = {}) { - this.snapshotsDir = snapshotsDir; - this.outputDir = outputDir; + dataDir: string; + hosts: string[]; + dryRun: boolean; + verbose: boolean; + instance: DuckDBInstance | null = null; + connection: Connection | null = null; + log: (...args: any[]) => void; + + constructor(dataDir: string, hosts: string[], options: { dryRun?: boolean; verbose?: boolean } = {}) { + this.dataDir = dataDir; + this.hosts = hosts; this.dryRun = options.dryRun || false; this.verbose = options.verbose || false; @@ -22,7 +22,52 @@ class ParquetCompactor { () => {}; } - private async initializeDuckDB(): Promise { + getHostDir(host: string) { + return join(this.dataDir, host); + } + + getSnapshotsDir(host: string) { + return join(this.getHostDir(host), 'snapshots'); + } + + getDbsDir(host: string) { + return join(this.getHostDir(host), 'dbs'); + } + + async validateDirectories() { + // Check if data directory exists + try { + await access(this.dataDir); + } catch { + throw new Error(`Data directory does not exist: ${this.dataDir}`); + } + + // Check if each host directory exists + for (const host of this.hosts) { + const hostDir = this.getHostDir(host); + try { + await access(hostDir); + } catch { + throw new Error(`Host directory does not exist: ${hostDir}`); + } + + // Check for snapshots directory + try { + await access(this.getSnapshotsDir(host)); + } catch { + throw new Error(`Snapshots directory does not exist: ${this.getSnapshotsDir(host)}`); + } + + // Check for dbs directory + try { + await access(this.getDbsDir(host)); + } catch { + throw new Error(`DBs directory does not exist: ${this.getDbsDir(host)}`); + } + } + } + + async initializeDuckDB() { this.log('Initializing DuckDB instance...'); this.log('DuckDB version:', duckdb.version()); @@ -48,21 +93,11 @@ class ParquetCompactor { this.log('DuckDB extensions initialized successfully'); } - private async findSnapshotFiles(): Promise { - const files = await readdir(this.snapshotsDir); - return files.filter(file => file.endsWith('.info.json')); - } - - private async readSnapshotMetadata(filename: string): Promise { - const content = await readFile(join(this.snapshotsDir, filename), 'utf-8'); - return JSON.parse(content); - } - - private async compactParquetFiles(tableFiles: any[], outputPath: string): Promise { + async compactParquetFiles(host: string, tableFiles: any[], outputPath: string) { if (!this.connection) throw new Error('DuckDB connection not initialized'); - // Convert file paths to absolute paths - const filePaths = tableFiles.map(f => resolve(this.snapshotsDir, f.path)); + // Convert file paths to absolute paths, ensuring proper host directory is used + const filePaths = tableFiles.map(f => join(this.getHostDir(host), f.path)); this.log('Compacting files:', filePaths); this.log('Output path:', outputPath); @@ -84,7 +119,7 @@ class ParquetCompactor { `); } - private async getParquetStats(filePath: string, sourceFiles?: any[]): Promise { + async getParquetStats(filePath: string, sourceFiles?: any[]) { if (!this.connection) throw new Error('DuckDB connection not initialized'); if (this.dryRun && sourceFiles) { @@ -117,11 +152,20 @@ class ParquetCompactor { return stats; } - private async processTableGroup(tableId: number, files: any[]): Promise { + async processTableGroup(host: string, tableId: number, files: any[]) { this.log(`\nProcessing table group ${tableId} with ${files.length} files`); this.log('Files to process:', JSON.stringify(files, null, 2)); - const outputPath = join(this.outputDir, `table_${tableId}_compacted.parquet`); + // Use the first file's path components to build the output path + const firstFile = files[0]; + const baseFileName = basename(firstFile.path); // Using imported basename directly + const firstFilePath = firstFile.path; + + // The output path should exactly match the input path format + const outputPath = join( + this.getDbsDir(host), + ...firstFilePath.split('/').slice(2) // Remove 'my_host/dbs' prefix as it's added by getDbsDir + ); if (this.dryRun) { this.log(`[DRY-RUN] Would create directory: ${dirname(outputPath)}`); @@ -129,21 +173,24 @@ class ParquetCompactor { await mkdir(dirname(outputPath), { recursive: true }); } - await this.compactParquetFiles(files, outputPath); + await this.compactParquetFiles(host, files, outputPath); const stats = await this.getParquetStats(outputPath, files); + // Keep the exact same path format as input + const relativePath = firstFilePath; + return { - id: files[0].id, - path: outputPath, + id: firstFile.id, + path: relativePath, size_bytes: stats.size_bytes, row_count: stats.row_count, - chunk_time: files[0].chunk_time, + chunk_time: firstFile.chunk_time, min_time: stats.min_time, max_time: stats.max_time }; } - private async updateMetadata(metadata: any, tableId: number, compactedFile: any): Promise { + async updateMetadata(metadata: any, tableId: number, compactedFile: any) { this.log('\nUpdating metadata for table:', tableId); this.log('Compacted file info:', JSON.stringify(compactedFile, null, 2)); @@ -179,48 +226,57 @@ class ParquetCompactor { return metadata; } - public async compact(): Promise { + async readSnapshotMetadata(host: string, filename: string) { + const content = await readFile(join(this.getSnapshotsDir(host), filename), 'utf-8'); + return JSON.parse(content); + } + + async compact() { try { + await this.validateDirectories(); await this.initializeDuckDB(); - const snapshotFiles = await this.findSnapshotFiles(); - - for (const snapshotFile of snapshotFiles) { - const metadata = await this.readSnapshotMetadata(snapshotFile); - - for (const [dbId, dbInfo] of metadata.databases) { - for (const [tableId, files] of dbInfo.tables) { - console.log(`Processing table ${tableId} with ${files.length} files...`); - - const compactedFile = await this.processTableGroup(tableId, files); - await this.updateMetadata(metadata, tableId, compactedFile); + + for (const host of this.hosts) { + this.log(`\nProcessing host: ${host}`); + const snapshotFiles = await readdir(this.getSnapshotsDir(host)); + const jsonFiles = snapshotFiles.filter(file => file.endsWith('.info.json')); + + for (const snapshotFile of jsonFiles) { + const metadata = await this.readSnapshotMetadata(host, snapshotFile); + + for (const [dbId, dbInfo] of metadata.databases) { + for (const [tableId, files] of dbInfo.tables) { + console.log(`Processing table ${tableId} with ${files.length} files...`); + + const compactedFile = await this.processTableGroup(host, tableId, files); + await this.updateMetadata(metadata, tableId, compactedFile); + } } - } - const outputMetadataPath = join(this.outputDir, snapshotFile); - if (!this.dryRun) { - await writeFile( - outputMetadataPath, - JSON.stringify(metadata, null, 2) - ); - } else { - this.log(`[DRY-RUN] Would write updated metadata to: ${outputMetadataPath}`); + const outputMetadataPath = join(this.getSnapshotsDir(host), snapshotFile); + if (!this.dryRun) { + await writeFile( + outputMetadataPath, + JSON.stringify(metadata, null, 2) + ); + } else { + this.log(`[DRY-RUN] Would write updated metadata to: ${outputMetadataPath}`); + } } } } finally { - // DuckDB resources will be automatically cleaned up this.connection = null; this.instance = null; } } } -// CLI handling const usage = ` -Usage: bun run parquet-compactor.ts [options] +Usage: bun run kompactor.ts --hosts [options] Arguments: - input-dir Directory containing snapshot metadata and parquet files - output-dir Directory where compacted files will be written + data-dir Root data directory (e.g., /data) + --hosts Comma-separated list of host folders to process (e.g., my_host,other_host) Options: --dry-run Run without making any changes @@ -228,29 +284,37 @@ Options: --help Show this help message Example: - bun run parquet-compactor.ts ./snapshots ./compacted --dry-run --verbose + bun run kompactor.ts /data --hosts my_host --dry-run --verbose + bun run kompactor.ts /data --hosts my_host,other_host `; const args = process.argv.slice(2); -if (args.includes('--help') || args.length < 2) { +if (args.includes('--help') || args.length < 1) { console.log(usage); process.exit(args.includes('--help') ? 0 : 1); } -const inputDir = args[0]; -const outputDir = args[1]; +const dataDir = args[0]; +const hostsIndex = args.indexOf('--hosts'); +if (hostsIndex === -1 || !args[hostsIndex + 1]) { + console.error('Error: --hosts parameter is required'); + console.log(usage); + process.exit(1); +} + +const hosts = args[hostsIndex + 1].split(','); const dryRun = args.includes('--dry-run'); const verbose = args.includes('--verbose'); console.log(`Starting compactor with: -Input directory: ${inputDir} -Output directory: ${outputDir} +Data directory: ${dataDir} +Hosts to process: ${hosts.join(', ')} Dry run: ${dryRun} Verbose: ${verbose} `); -const compactor = new ParquetCompactor(inputDir, outputDir, { dryRun, verbose }); +const compactor = new ParquetCompactor(dataDir, hosts, { dryRun, verbose }); try { await compactor.compact();