Skip to content

Commit

Permalink
Update kompactor.ts
Browse files Browse the repository at this point in the history
  • Loading branch information
lmangani authored Jan 27, 2025
1 parent 7ef0d33 commit 54ea53c
Showing 1 changed file with 132 additions and 68 deletions.
200 changes: 132 additions & 68 deletions kompactor.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import { readdir, mkdir, readFile, writeFile } from 'node:fs/promises';
import { join, dirname, resolve } from 'node:path';
import { readdir, mkdir, readFile, writeFile, access } from 'node:fs/promises';
import { join, dirname, resolve, sep, basename } from 'node:path';
import duckdb, { DuckDBInstance, Connection } from '@duckdb/node-api';

class ParquetCompactor {
private snapshotsDir: string;
private outputDir: string;
private dryRun: boolean;
private verbose: boolean;
private instance: DuckDBInstance | null = null;
private connection: Connection | null = null;
private log: (...args: any[]) => void;

constructor(snapshotsDir: string, outputDir: string, options: { dryRun?: boolean; verbose?: boolean } = {}) {
this.snapshotsDir = snapshotsDir;
this.outputDir = outputDir;
dataDir: string;
hosts: string[];
dryRun: boolean;
verbose: boolean;
instance: DuckDBInstance | null = null;
connection: Connection | null = null;
log: (...args: any[]) => void;

constructor(dataDir: string, hosts: string[], options: { dryRun?: boolean; verbose?: boolean } = {}) {
this.dataDir = dataDir;
this.hosts = hosts;
this.dryRun = options.dryRun || false;
this.verbose = options.verbose || false;

Expand All @@ -22,7 +22,52 @@ class ParquetCompactor {
() => {};
}

private async initializeDuckDB(): Promise<void> {
/**
 * Resolve the directory that holds one host's data.
 *
 * @param host - Name of the host folder directly under the data directory.
 * @returns `this.dataDir` joined with `host`.
 */
getHostDir(host: string) {
  const hostDir = join(this.dataDir, host);
  return hostDir;
}

/**
 * Resolve the `snapshots` directory of a host.
 *
 * @param host - Name of the host folder.
 * @returns The host directory joined with `snapshots`.
 */
getSnapshotsDir(host: string) {
  const hostRoot = this.getHostDir(host);
  return join(hostRoot, 'snapshots');
}

/**
 * Resolve the `dbs` directory of a host.
 *
 * @param host - Name of the host folder.
 * @returns The host directory joined with `dbs`.
 */
getDbsDir(host: string) {
  const hostRoot = this.getHostDir(host);
  return join(hostRoot, 'dbs');
}

/**
 * Verify that the directory layout the compactor relies on is reachable.
 *
 * The data directory is probed first; then, for every configured host, its
 * host directory, `snapshots` directory and `dbs` directory are probed in
 * that order. Probes run sequentially and stop at the first failure.
 *
 * @throws Error naming the first directory for which `access` rejected.
 */
async validateDirectories() {
  // Probe a single path; any rejection from `access` is reported as `message`.
  const ensureAccessible = async (target: string, message: string) => {
    try {
      await access(target);
    } catch {
      throw new Error(message);
    }
  };

  await ensureAccessible(this.dataDir, `Data directory does not exist: ${this.dataDir}`);

  for (const host of this.hosts) {
    const hostDir = this.getHostDir(host);
    await ensureAccessible(hostDir, `Host directory does not exist: ${hostDir}`);

    const snapshotsDir = this.getSnapshotsDir(host);
    await ensureAccessible(snapshotsDir, `Snapshots directory does not exist: ${snapshotsDir}`);

    const dbsDir = this.getDbsDir(host);
    await ensureAccessible(dbsDir, `DBs directory does not exist: ${dbsDir}`);
  }
}

async initializeDuckDB() {
this.log('Initializing DuckDB instance...');
this.log('DuckDB version:', duckdb.version());

Expand All @@ -48,21 +93,11 @@ class ParquetCompactor {
this.log('DuckDB extensions initialized successfully');
}

/**
 * List the snapshot metadata files in `this.snapshotsDir`.
 *
 * @returns The directory entries whose names end with `.info.json`
 *          (bare file names as returned by `readdir`, not joined paths).
 */
private async findSnapshotFiles(): Promise<string[]> {
const files = await readdir(this.snapshotsDir);
return files.filter(file => file.endsWith('.info.json'));
}

/**
 * Read one snapshot metadata file from `this.snapshotsDir` and parse it.
 *
 * @param filename - File name relative to `this.snapshotsDir`.
 * @returns The value produced by `JSON.parse` on the file's UTF-8 contents;
 *          no validation of its shape is performed here.
 */
private async readSnapshotMetadata(filename: string): Promise<any> {
const content = await readFile(join(this.snapshotsDir, filename), 'utf-8');
return JSON.parse(content);
}

private async compactParquetFiles(tableFiles: any[], outputPath: string): Promise<void> {
async compactParquetFiles(host: string, tableFiles: any[], outputPath: string) {
if (!this.connection) throw new Error('DuckDB connection not initialized');

// Convert file paths to absolute paths
const filePaths = tableFiles.map(f => resolve(this.snapshotsDir, f.path));
// Convert file paths to absolute paths, ensuring proper host directory is used
const filePaths = tableFiles.map(f => join(this.getHostDir(host), f.path));

this.log('Compacting files:', filePaths);
this.log('Output path:', outputPath);
Expand All @@ -84,7 +119,7 @@ class ParquetCompactor {
`);
}

private async getParquetStats(filePath: string, sourceFiles?: any[]): Promise<any> {
async getParquetStats(filePath: string, sourceFiles?: any[]) {
if (!this.connection) throw new Error('DuckDB connection not initialized');

if (this.dryRun && sourceFiles) {
Expand Down Expand Up @@ -117,33 +152,45 @@ class ParquetCompactor {
return stats;
}

/**
 * Compact all Parquet files of one table into a single file and build the
 * entry that describes the compacted result.
 *
 * The output location is derived from the first file's recorded path, and
 * that recorded path is returned unchanged as the entry's `path`.
 *
 * @param host - Host folder whose files are being compacted.
 * @param tableId - Table identifier; used only for logging in this method.
 * @param files - File entries of the table. The first entry supplies `id`,
 *                `path` and `chunk_time` for the returned entry.
 * @returns An entry with `id`, `path` and `chunk_time` taken from the first
 *          file and `size_bytes`, `row_count`, `min_time`, `max_time` taken
 *          from `getParquetStats`.
 * @throws Error if `files` is empty, since there is no first file to derive
 *         the output path from.
 */
async processTableGroup(host: string, tableId: number, files: any[]) {
  this.log(`\nProcessing table group ${tableId} with ${files.length} files`);
  this.log('Files to process:', JSON.stringify(files, null, 2));

  // Fail with a clear message instead of a TypeError on `files[0].path`.
  if (files.length === 0) {
    throw new Error(`Table group ${tableId} on host ${host} has no files to compact`);
  }

  const firstFile = files[0];
  const firstFilePath = firstFile.path;

  // Drop the first two path components and re-root the remainder under the
  // host's dbs directory. Per the original comment these components are the
  // '<host>/dbs' prefix that getDbsDir adds back.
  // NOTE(review): assumes recorded paths use '/' separators and always start
  // with that two-component prefix. compactParquetFiles joins the host
  // directory with the full recorded path, so confirm both agree with the
  // on-disk layout.
  const outputPath = join(this.getDbsDir(host), ...firstFilePath.split('/').slice(2));
  const outputDir = dirname(outputPath);

  if (this.dryRun) {
    this.log(`[DRY-RUN] Would create directory: ${outputDir}`);
  } else {
    await mkdir(outputDir, { recursive: true });
  }

  await this.compactParquetFiles(host, files, outputPath);
  const stats = await this.getParquetStats(outputPath, files);

  return {
    id: firstFile.id,
    // Keep the exact same path format as the input entry.
    path: firstFilePath,
    size_bytes: stats.size_bytes,
    row_count: stats.row_count,
    chunk_time: firstFile.chunk_time,
    min_time: stats.min_time,
    max_time: stats.max_time
  };
}

private async updateMetadata(metadata: any, tableId: number, compactedFile: any): Promise<any> {
async updateMetadata(metadata: any, tableId: number, compactedFile: any) {
this.log('\nUpdating metadata for table:', tableId);
this.log('Compacted file info:', JSON.stringify(compactedFile, null, 2));

Expand Down Expand Up @@ -179,78 +226,95 @@ class ParquetCompactor {
return metadata;
}

public async compact(): Promise<void> {
/**
 * Read one snapshot metadata file of a host and parse it as JSON.
 *
 * @param host - Host folder whose `snapshots` directory contains the file.
 * @param filename - File name relative to that `snapshots` directory.
 * @returns The value produced by `JSON.parse` on the file's UTF-8 contents;
 *          its shape is not validated here.
 */
async readSnapshotMetadata(host: string, filename: string) {
  const metadataPath = join(this.getSnapshotsDir(host), filename);
  const rawJson = await readFile(metadataPath, 'utf-8');
  return JSON.parse(rawJson);
}

/**
 * Run the compaction for every configured host.
 *
 * After validating the directory layout and initializing DuckDB, each host's
 * `snapshots` directory is scanned for `*.info.json` files. For every table
 * listed in a snapshot, its files are compacted via `processTableGroup` and
 * the snapshot metadata is updated via `updateMetadata`. The updated
 * metadata is then written back to the same snapshot file, or only logged in
 * dry-run mode.
 *
 * Hosts, snapshots and tables are processed sequentially; the first error
 * aborts the run and propagates to the caller.
 */
async compact() {
  try {
    await this.validateDirectories();
    await this.initializeDuckDB();

    for (const host of this.hosts) {
      this.log(`\nProcessing host: ${host}`);

      const snapshotsDir = this.getSnapshotsDir(host);
      const snapshotFiles = await readdir(snapshotsDir);
      const jsonFiles = snapshotFiles.filter(file => file.endsWith('.info.json'));

      for (const snapshotFile of jsonFiles) {
        const metadata = await this.readSnapshotMetadata(host, snapshotFile);

        // `databases` and `tables` are iterated as [id, value] pairs; the
        // database id itself is not needed here.
        for (const [, dbInfo] of metadata.databases) {
          for (const [tableId, files] of dbInfo.tables) {
            console.log(`Processing table ${tableId} with ${files.length} files...`);

            const compactedFile = await this.processTableGroup(host, tableId, files);
            await this.updateMetadata(metadata, tableId, compactedFile);
          }
        }

        // The snapshot file is rewritten in place with the updated metadata.
        const outputMetadataPath = join(snapshotsDir, snapshotFile);
        if (this.dryRun) {
          this.log(`[DRY-RUN] Would write updated metadata to: ${outputMetadataPath}`);
        } else {
          await writeFile(outputMetadataPath, JSON.stringify(metadata, null, 2));
        }
      }
    }
  } finally {
    // Drop the references whether or not the run succeeded.
    // NOTE(review): nothing is explicitly closed here; confirm that the
    // DuckDB binding releases the instance and connection once they are
    // unreferenced.
    this.connection = null;
    this.instance = null;
  }
}
}

// CLI handling
//
// Expected invocation: <data-dir> --hosts <host1,host2,...> [--dry-run] [--verbose]
// The data directory must be the first argument; flags may follow in any order.
const usage = `
Usage: bun run kompactor.ts <data-dir> --hosts <host1,host2,...> [options]

Arguments:
  data-dir    Root data directory (e.g., /data)
  --hosts     Comma-separated list of host folders to process (e.g., my_host,other_host)

Options:
  --dry-run   Run without making any changes
  --verbose   Enable detailed logging
  --help      Show this help message

Example:
  bun run kompactor.ts /data --hosts my_host --dry-run --verbose
  bun run kompactor.ts /data --hosts my_host,other_host
`;

const args = process.argv.slice(2);

if (args.includes('--help') || args.length < 1) {
  console.log(usage);
  process.exit(args.includes('--help') ? 0 : 1);
}

// Reject a flag in the data-dir position (e.g. `kompactor.ts --hosts h`)
// instead of treating the flag text as a directory name.
const dataDir = args[0];
if (dataDir === undefined || dataDir.startsWith('--')) {
  console.error('Error: <data-dir> must be the first argument');
  console.log(usage);
  process.exit(1);
}

const hostsIndex = args.indexOf('--hosts');
const hostsValue = hostsIndex === -1 ? undefined : args[hostsIndex + 1];

// A following flag (e.g. `--hosts --dry-run`) is not a host list. Blank
// entries from input like `a,,b` or `a, b` are trimmed and dropped: an empty
// host name would make join(dataDir, '') resolve to the data directory itself.
const hosts =
  hostsValue === undefined || hostsValue.startsWith('--')
    ? []
    : hostsValue
        .split(',')
        .map((name) => name.trim())
        .filter((name) => name.length > 0);

if (hosts.length === 0) {
  console.error('Error: --hosts parameter is required');
  console.log(usage);
  process.exit(1);
}

const dryRun = args.includes('--dry-run');
const verbose = args.includes('--verbose');

console.log(`Starting compactor with:
  Data directory: ${dataDir}
  Hosts to process: ${hosts.join(', ')}
  Dry run: ${dryRun}
  Verbose: ${verbose}
`);

const compactor = new ParquetCompactor(dataDir, hosts, { dryRun, verbose });

try {
await compactor.compact();
Expand Down

0 comments on commit 54ea53c

Please sign in to comment.