Skip to content

Commit

Permalink
gRPC -> HTTP express (#50)
Browse files Browse the repository at this point in the history
* grpc -> express server

Changes:

cleanup

add tests

add tsoa for openapi generation

update example client

typing and bug fixes

* update tests

* pr comments

* remove yarn-error.log

* add test

* migrate to blocks as hex strings

* update docstring
  • Loading branch information
jowparks authored Mar 19, 2024
1 parent ae2a0cf commit ff7ff2d
Show file tree
Hide file tree
Showing 46 changed files with 5,602 additions and 2,303 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ node_modules/
.eslintcache
*-cache*
*testdb*
*yarn-error.log
3 changes: 0 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,5 @@ COPY . .
# Build the app
RUN npm run build

# Make port 50051 available to the outside
EXPOSE 50051

# Define the command to run your app using CMD which defines your runtime
CMD ["npm", "run", "start"]
6 changes: 4 additions & 2 deletions example/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@
"main": "index.js",
"scripts": {
"dev": "ts-node src/index.ts",
"gen-api": "yarn swagger-typescript-api -p ../src/swagger/swagger.json -o src/api",
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "Demo",
"license": "ISC",
"devDependencies": {
"@ironfish/rust-nodejs": "^1.8.0",
"buffer-map": "^0.0.7",
"dotenv": "^16.3.1",
"leveldown": "^6.1.1",
"levelup": "^5.1.1",
"swagger-typescript-api": "^13.0.3",
"ts-node": "^10.9.1",
"typescript": "^5.1.6",
"buffer-map": "^0.0.7"
"typescript": "^5.1.6"
}
}
23 changes: 4 additions & 19 deletions example/src/Client/Client.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
import { ServiceError, credentials } from "@grpc/grpc-js";
import {
LightStreamerClient,
SendResponse,
Transaction,
} from "../../../src/models/lightstreamer";
import { BlockProcessor } from "./utils/BlockProcessor";
import { AccountData, AccountsManager } from "./utils/AccountsManager";
import { BlockCache } from "./utils/BlockCache";
Expand All @@ -15,11 +9,9 @@ import {
} from "@ironfish/rust-nodejs";
import { MerkleWitness, notesTree } from "./utils/MerkleTree";
import { BufferMap } from "buffer-map";
import { Api } from "../api/Api";

const client = new LightStreamerClient(
process.env["WALLET_SERVER_HOST"] ?? "localhost:50051",
credentials.createInsecure(),
);
const api = new Api({ baseUrl: process.env["WALLET_SERVER_HOST"] });

export class Client {
private blockCache: BlockCache;
Expand All @@ -31,7 +23,7 @@ export class Client {
this.blockCache = new BlockCache();
this.accountsManager = new AccountsManager(this.blockCache);
this.blockProcessor = new BlockProcessor(
client,
api,
this.blockCache,
this.accountsManager,
);
Expand Down Expand Up @@ -173,13 +165,6 @@ export class Client {
}

public async sendTransaction(transaction: Buffer) {
return new Promise<[ServiceError | null, SendResponse]>((res) => {
client.sendTransaction(
Transaction.create({ data: transaction }),
(error, result) => {
res([error, result]);
},
);
});
return api.transaction.broadcastTransaction(transaction.toString("hex"));
}
}
16 changes: 10 additions & 6 deletions example/src/Client/utils/AccountsManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ export class AccountsManager {
currentNotePosition++;

this._processNote(
new NoteEncrypted(output.note),
new NoteEncrypted(Buffer.from(output.note, "hex")),
parsedBlock,
tx,
index,
Expand Down Expand Up @@ -247,11 +247,11 @@ export class AccountsManager {
accountId: publicKey,
note: deserializedNote,
spent: false,
transactionHash: tx.hash,
transactionHash: Buffer.from(tx.hash, "hex"),
index,
nullifier,
merkleIndex: position,
blockHash: block.hash,
blockHash: Buffer.from(block.hash, "hex"),
sequence: block.sequence,
});

Expand Down Expand Up @@ -283,7 +283,9 @@ export class AccountsManager {

private _processSpend(spend: LightSpend) {
for (const account of this.accounts.values()) {
const noteHash = account.noteHashByNullifier.get(spend.nf);
const noteHash = account.noteHashByNullifier.get(
Buffer.from(spend.nf, "hex"),
);

if (!noteHash) return;

Expand Down Expand Up @@ -311,7 +313,9 @@ export class AccountsManager {
// First we'll process spends
tx.spends.forEach((spend) => {
// If we have a note hash for the nullifier, it means we have a note for this account.
const noteHash = account.noteHashByNullifier.get(spend.nf);
const noteHash = account.noteHashByNullifier.get(
Buffer.from(spend.nf, "hex"),
);

if (!noteHash) return;

Expand All @@ -328,7 +332,7 @@ export class AccountsManager {

// Next we'll process outputs
tx.outputs.forEach((output) => {
const note = new NoteEncrypted(output.note);
const note = new NoteEncrypted(Buffer.from(output.note, "hex"));

// Decrypt note using view key
const decryptedNoteBuffer = note.decryptNoteForOwner(
Expand Down
116 changes: 35 additions & 81 deletions example/src/Client/utils/BlockProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,31 @@
import { EventEmitter } from "events";

import { ServiceError } from "@grpc/grpc-js";
import { BlockCache } from "./BlockCache";
import {
BlockID,
Empty,
LightBlock,
LightStreamerClient,
} from "../../../../src/models/lightstreamer";
import { LightBlock } from "../../../../src/models/lightstreamer";
import {
addNotesToMerkleTree,
getNotesTreeSize,
revertToNoteSize,
} from "./MerkleTree";
import { logThrottled } from "./logThrottled";
import { AccountsManager } from "./AccountsManager";
import { Api } from "example/src/api/Api";

const POLL_INTERVAL = 30 * 1000;

export class BlockProcessor {
private client: LightStreamerClient;
private api: Api<unknown>;
private pollInterval?: NodeJS.Timer;
private isProcessingBlocks: boolean = false;
private blockCache: BlockCache;
private accountsManager: AccountsManager;
private events: EventEmitter = new EventEmitter(); // Event emitter for block events

constructor(
client: LightStreamerClient,
api: Api<unknown>,
blockCache: BlockCache,
accountsManager: AccountsManager,
) {
this.client = client;
this.api = api;
this.blockCache = blockCache;
this.accountsManager = accountsManager;
}
Expand Down Expand Up @@ -67,11 +61,7 @@ export class BlockProcessor {
if (this.isProcessingBlocks) {
return;
}
const [latestBlockError, latestBlock] = await this._getLatestBlock();

if (latestBlockError) {
throw latestBlockError;
}
const latestBlock = await this._getLatestBlock();

const headSequence = latestBlock.sequence;

Expand All @@ -96,75 +86,43 @@ export class BlockProcessor {
return;
}

private _getLatestBlock() {
return new Promise<[ServiceError | null, BlockID]>((res) => {
this.client.getLatestBlock(Empty, (error, result) => {
res([error, result]);
});
});
private async _getLatestBlock(): Promise<{
hash: string;
sequence: number;
}> {
const response = await this.api.latestBlock.getLatestBlock();
return response.data;
}

private _getBlockBySequence(sequence: number) {
return new Promise<[ServiceError | null, BlockID]>((res) => {
this.client.getBlock({ sequence }, (error, result) => {
res([error, result]);
});
});
private async _getBlockBySequence(sequence: number): Promise<LightBlock> {
const response = await this.api.block.getBlock({ sequence });
return LightBlock.decode(Buffer.from(response.data, "hex"));
}
private async _processBlockRange(startSequence: number, endSequence: number) {
console.log(`Processing blocks from ${startSequence} to ${endSequence}`);

let blocksProcessed = startSequence;
let processingChain = Promise.resolve(); // Initialize a Promise chain

const stream = this.client.getBlockRange({
start: {
sequence: startSequence,
},
end: {
sequence: endSequence,
},
const response = await this.api.blockRange.getBlockRange({
start: startSequence,
end: endSequence,
});

const resolveWhenDone = (resolve: (value: unknown) => void) => {
if (blocksProcessed === endSequence + 1) {
resolve(true);
console.log("Finished processing blocks");
}
};

try {
await new Promise((res, rej) => {
stream.on("data", (block: LightBlock) => {
// Append the next block's processing to the promise chain
processingChain = processingChain
.then(() => this._processBlock(block))
.then(() => {
blocksProcessed++;
logThrottled(
`Processed ${blocksProcessed}/${endSequence} blocks`,
100,
blocksProcessed,
);
resolveWhenDone(res); // Check if all blocks have been processed
})
.catch((err) => {
console.error("Error processing block:", err);
rej(err);
});
});

stream.on("end", () => {
resolveWhenDone(res); // Check if all blocks have been processed
});

stream.on("error", (err) => {
rej(err);
});
});
const blocks = response.data;
for (const block of blocks) {
try {
await this._processBlock(
LightBlock.decode(Buffer.from(block, "hex")),
);
} catch (err) {
console.error("Error processing block:", err);
throw err;
}
}
} catch (err) {
console.error(err);
}
console.log(
`Finished processing blocks from ${startSequence} to ${endSequence}`,
);
}

private async _processBlock(block: LightBlock) {
Expand All @@ -178,7 +136,7 @@ export class BlockProcessor {

for (const transaction of block.transactions) {
for (const output of transaction.outputs) {
notes.push(output.note);
notes.push(Buffer.from(output.note, "hex"));
}
}

Expand Down Expand Up @@ -206,7 +164,7 @@ export class BlockProcessor {

// If the incoming block's previous block hash matches the previous block's hash,
// there is no reorg. Note that Buffer.compare returns 0 if the two buffers are equal.
if (block.previousBlockHash.compare(prevCachedBlock.hash) === 0) {
if (block.previousBlockHash == prevCachedBlock.hash) {
return false;
}

Expand All @@ -216,11 +174,7 @@ export class BlockProcessor {

while (!lastMainChainBlock) {
// Get block from server
const [err, block] = await this._getBlockBySequence(currentSequence);

if (err) {
throw err;
}
const block = await this._getBlockBySequence(currentSequence);

// Get block from cache
const cachedBlock = await this.blockCache.getBlockBySequence(
Expand Down
Loading

0 comments on commit ff7ff2d

Please sign in to comment.