[TRA-572] Add handler for upsert vault events to ender. #2274

Merged: 3 commits, Sep 17, 2024
2 changes: 2 additions & 0 deletions indexer/packages/postgres/src/constants.ts
@@ -15,6 +15,7 @@ import PerpetualPositionModel from './models/perpetual-position-model';
import SubaccountModel from './models/subaccount-model';
import TradingRewardModel from './models/trading-reward-model';
import TransferModel from './models/transfer-model';
import VaultModel from './models/vault-model';
import {
APITimeInForce,
CandleResolution,
@@ -103,6 +104,7 @@ export const SQL_TO_JSON_DEFINED_MODELS = [
SubaccountModel,
TransferModel,
TradingRewardModel,
VaultModel,
];

export type SpecifiedClobPairStatus =
@@ -6,7 +6,7 @@ export async function up(knex: Knex): Promise<void> {
table.bigInteger('clobPairId').notNullable(); // clob pair id for vault
table.enum('status', [
'DEACTIVATED',
- 'STANDBY',
+ 'STAND_BY',
'QUOTING',
'CLOSE_ONLY',
]).notNullable(); // quoting status of vault
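The rename from STANDBY to STAND_BY aligns the table enum with the protocol's VaultStatus names (VAULT_STATUS_STAND_BY and friends) used by the new handler and tests below. A rough TypeScript sketch of that correspondence follows; the shipped conversion actually happens in the dydx_protocol_vault_status_to_vault_status.sql helper added later in this diff, and VAULT_STATUS_DEACTIVATED is assumed here by analogy with the table enum.

import { VaultStatus } from '@dydxprotocol-indexer/v4-protos';

// Sketch only: illustrates how protocol statuses are expected to map onto the
// vaults table enum; the real conversion lives in the SQL helper script.
function protocolVaultStatusToTableStatus(status: VaultStatus): string {
  switch (status) {
    case VaultStatus.VAULT_STATUS_DEACTIVATED: // assumed by analogy with the table enum
      return 'DEACTIVATED';
    case VaultStatus.VAULT_STATUS_STAND_BY:
      return 'STAND_BY';
    case VaultStatus.VAULT_STATUS_QUOTING:
      return 'QUOTING';
    case VaultStatus.VAULT_STATUS_CLOSE_ONLY:
      return 'CLOSE_ONLY';
    default:
      throw new Error(`Unexpected vault status: ${status}`);
  }
}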
1 change: 1 addition & 0 deletions indexer/packages/postgres/src/index.ts
@@ -48,6 +48,7 @@ export * as SubaccountUsernamesTable from './stores/subaccount-usernames-table';
export * as PersistentCacheTable from './stores/persistent-cache-table';
export * as AffiliateReferredUsersTable from './stores/affiliate-referred-users-table';
export * as FirebaseNotificationTokenTable from './stores/firebase-notification-token-table';
export * as VaultTable from './stores/vault-table';

export * as perpetualMarketRefresher from './loops/perpetual-market-refresher';
export * as assetRefresher from './loops/asset-refresher';
16 changes: 16 additions & 0 deletions indexer/packages/postgres/src/models/vault-model.ts
@@ -32,6 +32,22 @@ export default class VaultModel extends BaseModel {
};
}

/**
* A mapping from column name to JSON conversion expected.
* See getSqlConversionForDydxModelTypes for valid conversions.
*
* TODO(IND-239): Ensure that jsonSchema() / sqlToJsonConversions() / model fields match.
*/
static get sqlToJsonConversions() {
return {
address: 'string',
clobPairId: 'string',
status: 'string',
createdAt: 'date-time',
updatedAt: 'date-time',
};
}

address!: string;

clobPairId!: string;
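The sqlToJsonConversions map is consumed by the indexer's generic SQL-to-JSON plumbing (getSqlConversionForDydxModelTypes, referenced in the doc comment above). The helper below is a purely illustrative, hypothetical sketch of the kind of per-column coercion those declared conversions imply, not the indexer's implementation.

// Hypothetical helper, for illustration only: coerces a raw column value based
// on the conversion declared in a model's sqlToJsonConversions map.
type SqlToJsonConversion = 'string' | 'date-time';

function convertColumn(value: unknown, conversion: SqlToJsonConversion): string {
  switch (conversion) {
    case 'date-time':
      // Timestamps surface as ISO-8601 strings, which is why the handler tests
      // below compare createdAt/updatedAt against block.time?.toISOString().
      return new Date(value as string).toISOString();
    case 'string':
    default:
      return String(value);
  }
}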
1 change: 1 addition & 0 deletions indexer/packages/v4-protos/src/index.ts
@@ -16,4 +16,5 @@ export * from './codegen/google/protobuf/timestamp';
export * from './codegen/dydxprotocol/indexer/protocol/v1/clob';
export * from './codegen/dydxprotocol/indexer/protocol/v1/subaccount';
export * from './codegen/dydxprotocol/indexer/shared/removal_reason';
export * from './codegen/dydxprotocol/vault/vault';
export * from './utils';
@@ -0,0 +1,150 @@
import {
IndexerTendermintBlock,
UpsertVaultEventV1,
VaultStatus,
} from '@dydxprotocol-indexer/v4-protos';
import {
dbHelpers,
testMocks,
testConstants,
VaultFromDatabase,
VaultTable,
VaultStatus as IndexerVaultStatus,
} from '@dydxprotocol-indexer/postgres';
import { KafkaMessage } from 'kafkajs';
import { createKafkaMessage } from '@dydxprotocol-indexer/kafka';
import { onMessage } from '../../src/lib/on-message';
import { DydxIndexerSubtypes } from '../../src/lib/types';
import { createIndexerTendermintBlock, createIndexerTendermintEvent } from '../helpers/indexer-proto-helpers';
import {
defaultHeight,
defaultPreviousHeight,
defaultTime,
defaultTxHash,
} from '../helpers/constants';
import { updateBlockCache } from '../../src/caches/block-cache';
import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions';

describe('upsertVaultHandler', () => {
beforeAll(async () => {
await dbHelpers.migrate();
await dbHelpers.clearData();
await createPostgresFunctions();
});

beforeEach(async () => {
await testMocks.seedData();
updateBlockCache(defaultPreviousHeight);
});

afterEach(async () => {
await dbHelpers.clearData();
jest.clearAllMocks();
});

afterAll(async () => {
await dbHelpers.teardown();
jest.resetAllMocks();
});

it('should upsert new vaults in single block', async () => {
const events: UpsertVaultEventV1[] = [
{
address: testConstants.defaultVaultAddress,
clobPairId: 0,
status: VaultStatus.VAULT_STATUS_QUOTING,
}, {
address: testConstants.defaultAddress,
clobPairId: 1,
status: VaultStatus.VAULT_STATUS_STAND_BY,
},
];
const block: IndexerTendermintBlock = createBlockFromEvents(
defaultHeight,
...events,
);
const binaryBlock: Uint8Array = IndexerTendermintBlock.encode(block).finish();
const kafkaMessage: KafkaMessage = createKafkaMessage(Buffer.from(binaryBlock));

await onMessage(kafkaMessage);

const vaults: VaultFromDatabase[] = await VaultTable.findAll(
{},
[],
{},
);
expect(vaults).toHaveLength(2);
expect(vaults[0]).toEqual({
address: testConstants.defaultVaultAddress,
clobPairId: '0',
status: IndexerVaultStatus.QUOTING,
createdAt: block.time?.toISOString(),
updatedAt: block.time?.toISOString(),
});
expect(vaults[1]).toEqual({
address: testConstants.defaultAddress,
clobPairId: '1',
status: IndexerVaultStatus.STAND_BY,
createdAt: block.time?.toISOString(),
updatedAt: block.time?.toISOString(),
});
});

it('should upsert an existing vault', async () => {
await VaultTable.create(testConstants.defaultVault);

const events: UpsertVaultEventV1[] = [
{
address: testConstants.defaultVaultAddress,
clobPairId: 0,
status: VaultStatus.VAULT_STATUS_CLOSE_ONLY,
},
];
const block: IndexerTendermintBlock = createBlockFromEvents(
defaultHeight,
...events,
);
const binaryBlock: Uint8Array = IndexerTendermintBlock.encode(block).finish();
const kafkaMessage: KafkaMessage = createKafkaMessage(Buffer.from(binaryBlock));

await onMessage(kafkaMessage);

const vaults: VaultFromDatabase[] = await VaultTable.findAll(
{},
[],
{},
);
expect(vaults).toHaveLength(1);
expect(vaults[0]).toEqual({
address: testConstants.defaultVault.address,
clobPairId: testConstants.defaultVault.clobPairId,
status: IndexerVaultStatus.CLOSE_ONLY,
createdAt: testConstants.defaultVault.createdAt,
updatedAt: block.time?.toISOString(),
});
});
});

function createBlockFromEvents(
height: number,
...events: UpsertVaultEventV1[]
): IndexerTendermintBlock {
const transactionIndex = 0;
let eventIndex = 0;

return createIndexerTendermintBlock(
height,
defaultTime,
events.map((event) => {
const indexerEvent = createIndexerTendermintEvent(
DydxIndexerSubtypes.UPSERT_VAULT,
UpsertVaultEventV1.encode(event).finish(),
transactionIndex,
eventIndex,
);
eventIndex += 1;
return indexerEvent;
}),
[defaultTxHash],
);
}
@@ -0,0 +1,92 @@
import { ParseMessageError, logger } from '@dydxprotocol-indexer/base';
import {
IndexerTendermintBlock,
IndexerTendermintEvent,
UpsertVaultEventV1,
VaultStatus,
} from '@dydxprotocol-indexer/v4-protos';
import { dbHelpers, testConstants, testMocks } from '@dydxprotocol-indexer/postgres';
import { DydxIndexerSubtypes } from '../../src/lib/types';
import { defaultHeight, defaultTime, defaultTxHash } from '../helpers/constants';
import {
createIndexerTendermintBlock,
createIndexerTendermintEvent,
} from '../helpers/indexer-proto-helpers';
import { expectDidntLogError } from '../helpers/validator-helpers';
import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions';
import { UpsertVaultValidator } from '../../src/validators/upsert-vault-validator';

describe('upsert-vault-validator', () => {
beforeAll(async () => {
await dbHelpers.migrate();
await createPostgresFunctions();
});

beforeEach(async () => {
await testMocks.seedData();
jest.spyOn(logger, 'error');
});

afterEach(async () => {
await dbHelpers.clearData();
jest.clearAllMocks();
});

afterAll(async () => {
await dbHelpers.teardown();
jest.resetAllMocks();
});

describe('validate', () => {
it('does not throw error on valid upsert vault event', () => {
const event: UpsertVaultEventV1 = {
address: testConstants.defaultVaultAddress,
clobPairId: 0,
status: VaultStatus.VAULT_STATUS_QUOTING,
};
const validator: UpsertVaultValidator = new UpsertVaultValidator(
event,
createBlock(event),
0,
);

validator.validate();
expectDidntLogError();
});

it('throws error if address in event is empty', () => {
const event: UpsertVaultEventV1 = {
address: '',
clobPairId: 0,
status: VaultStatus.VAULT_STATUS_QUOTING,
};
const validator: UpsertVaultValidator = new UpsertVaultValidator(
event,
createBlock(event),
0,
);

expect(() => validator.validate()).toThrow(new ParseMessageError(
'UpsertVaultEvent address is not populated',
));
});
});
});

function createBlock(
upsertVaultEvent: UpsertVaultEventV1,
): IndexerTendermintBlock {
const event: IndexerTendermintEvent = createIndexerTendermintEvent(
DydxIndexerSubtypes.UPSERT_VAULT,
UpsertVaultEventV1.encode(upsertVaultEvent).finish(),
0,
0,
);

return createIndexerTendermintBlock(
defaultHeight,
defaultTime,
[event],
[defaultTxHash],
);
}
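The UpsertVaultValidator implementation itself is not shown in this diff; below is a minimal sketch of the check these tests exercise. The class shape is an assumption (the real validator extends ender's shared Validator base class), only the asserted behaviour is taken from the tests above.

import { ParseMessageError } from '@dydxprotocol-indexer/base';
import { UpsertVaultEventV1 } from '@dydxprotocol-indexer/v4-protos';

// Sketch only: captures the behaviour asserted above, not the shipped class.
class UpsertVaultValidatorSketch {
  constructor(private readonly event: UpsertVaultEventV1) {}

  public validate(): void {
    if (this.event.address === '') {
      // Error message matches the empty-address test case above.
      throw new ParseMessageError('UpsertVaultEvent address is not populated');
    }
  }
}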
18 changes: 18 additions & 0 deletions indexer/services/ender/src/handlers/upsert-vault-handler.ts
@@ -0,0 +1,18 @@
import { UpsertVaultEventV1 } from '@dydxprotocol-indexer/v4-protos';
import * as pg from 'pg';

import { ConsolidatedKafkaEvent } from '../lib/types';
import { Handler } from './handler';

export class UpsertVaultHandler extends Handler<UpsertVaultEventV1> {
eventType: string = 'UpsertVaultEventV1';

public getParallelizationIds(): string[] {
return [];
}

// eslint-disable-next-line @typescript-eslint/require-await
public async internalHandle(_: pg.QueryResultRow): Promise<ConsolidatedKafkaEvent[]> {
return [];
}
}
@@ -46,6 +46,7 @@ const HANDLER_SCRIPTS: string[] = [
'dydx_transfer_handler.sql',
'dydx_update_clob_pair_handler.sql',
'dydx_update_perpetual_handler.sql',
'dydx_vault_upsert_handler.sql',
];

const DB_SETUP_SCRIPTS: string[] = [
@@ -91,6 +92,7 @@ const HELPER_SCRIPTS: string[] = [
'dydx_uuid_from_transaction_parts.sql',
'dydx_uuid_from_transfer_parts.sql',
'dydx_protocol_market_type_to_perpetual_market_type.sql',
'dydx_protocol_vault_status_to_vault_status.sql',
];

const MAIN_SCRIPTS: string[] = [
3 changes: 3 additions & 0 deletions indexer/services/ender/src/lib/block-processor.ts
@@ -30,6 +30,7 @@ import { TradingRewardsValidator } from '../validators/trading-rewards-validator';
import { TransferValidator } from '../validators/transfer-validator';
import { UpdateClobPairValidator } from '../validators/update-clob-pair-validator';
import { UpdatePerpetualValidator } from '../validators/update-perpetual-validator';
import { UpsertVaultValidator } from '../validators/upsert-vault-validator';
import { Validator, ValidatorInitializer } from '../validators/validator';
import { BatchedHandlers } from './batched-handlers';
import { indexerTendermintEventToEventProtoWithType, indexerTendermintEventToTransactionIndex } from './helper';
@@ -55,13 +56,15 @@ const TXN_EVENT_SUBTYPE_VERSION_TO_VALIDATOR_MAPPING: Record<string, ValidatorInitializer> = {
[serializeSubtypeAndVersion(DydxIndexerSubtypes.DELEVERAGING.toString(), 1)]: DeleveragingValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.LIQUIDITY_TIER.toString(), 2)]: LiquidityTierValidatorV2,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.REGISTER_AFFILIATE.toString(), 1)]: RegisterAffiliateValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.UPSERT_VAULT.toString(), 1)]: UpsertVaultValidator,
};

const BLOCK_EVENT_SUBTYPE_VERSION_TO_VALIDATOR_MAPPING: Record<string, ValidatorInitializer> = {
[serializeSubtypeAndVersion(DydxIndexerSubtypes.FUNDING.toString(), 1)]: FundingValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.TRADING_REWARD.toString(), 1)]: TradingRewardsValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.STATEFUL_ORDER.toString(), 1)]: StatefulOrderValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.OPEN_INTEREST_UPDATE.toString(), 1)]: OpenInterestUpdateValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.UPSERT_VAULT.toString(), 1)]: UpsertVaultValidator,
};

function serializeSubtypeAndVersion(
10 changes: 10 additions & 0 deletions indexer/services/ender/src/lib/helper.ts
@@ -27,6 +27,7 @@ import {
OpenInterestUpdateEventV1,
TradingRewardsEventV1,
RegisterAffiliateEventV1,
UpsertVaultEventV1,
} from '@dydxprotocol-indexer/v4-protos';
import Big from 'big.js';
import _ from 'lodash';
@@ -251,6 +252,15 @@ export function indexerTendermintEventToEventProtoWithType(
blockEventIndex,
};
}
case (DydxIndexerSubtypes.UPSERT_VAULT.toString()): {
return {
type: DydxIndexerSubtypes.UPSERT_VAULT,
eventProto: UpsertVaultEventV1.decode(eventDataBinary),
indexerTendermintEvent: event,
version,
blockEventIndex,
};
}
default: {
const message: string = `Unable to parse event subtype: ${event.subtype}`;
logger.error({