Skip to content

Commit

Permalink
feat: JOIN-49161 add possibility for filtration in pubsub lib (#109)
Browse files Browse the repository at this point in the history
  • Loading branch information
eugene-taran authored Dec 17, 2024
1 parent 5ae0733 commit 76c0f3d
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defaults: &defaults
environment:
NODE_OPTIONS: '--max_old_space_size=1536'
docker:
- image: cimg/node:20.11.0
- image: cimg/node:20.17.0
<<: *docker-auth

commands:
Expand Down
10 changes: 5 additions & 5 deletions packages/pubsub/src/Publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ export class Publisher<T = unknown> {
}
}

public async publishMsg(data: T): Promise<void> {
await this.sendAvroMessage(data)
public async publishMsg(data: T, attributes?: Record<string, string>): Promise<void> {
await this.sendAvroMessage(data, attributes)
}

public async flush(): Promise<void> {
Expand Down Expand Up @@ -96,8 +96,8 @@ export class Publisher<T = unknown> {
}
}

private async sendAvroMessage(data: T): Promise<void> {
let currentMessageMetadata = this.avroMessageMetadata
private async sendAvroMessage(data: T, attributes?: Record<string, string>): Promise<void> {
let currentMessageMetadata = { ...this.avroMessageMetadata, ...attributes}
if (this.optionArrayPaths && this.optionArrayPaths.length > 0) {
const undefinedOrNullOptionalArrays = this.fieldsProcessor.findAndReplaceUndefinedOrNullOptionalArrays(
data as Record<string, unknown>,
Expand All @@ -122,7 +122,7 @@ export class Publisher<T = unknown> {

const buffer = Buffer.from(this.readerAvroType.toString(data))
const messageId = await this.topic.publishMessage({ data: buffer, attributes: currentMessageMetadata })
this.logger?.info(`PubSub: Avro message sent for topic: ${this.topicName}:`, { data, messageId })
this.logger?.info(`PubSub: Avro message sent for topic: ${this.topicName}:`, { data, attributes, messageId })
}

private prepareAvroMessageMetadata(schema: SchemaWithMetadata): Record<string, string> {
Expand Down
2 changes: 1 addition & 1 deletion packages/pubsub/src/PublisherFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { Publisher } from './Publisher'
export interface IPublisher<T> {
topicName: string
initialize: () => Promise<void>
publishMsg: (data: T) => Promise<void>
publishMsg: (data: T, attributes: Record<string, string>) => Promise<void>
flush: () => Promise<void>
}

Expand Down
16 changes: 15 additions & 1 deletion packages/pubsub/src/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ export interface IMessageInfo {
receivedAt: Date
}

/**
* Subscription options
* Filter is immutable and can't be changed after subscription is created
*/
export interface ISubscriptionOptions {
ackDeadline?: number
allowExcessMessages?: boolean
Expand All @@ -34,7 +38,8 @@ export interface ISubscriptionOptions {
name: string
id: number
}
labels?: ({ [k: string]: string } | null);
labels?: ({ [k: string]: string } | null)
filter?: string
}

export interface ISubscriberOptions {
Expand All @@ -57,6 +62,7 @@ interface ISubscriptionInitializationOptions {
deadLetterPolicy: ISubscriptionDeadLetterPolicy | null
retryPolicy: ISubscriptionRetryPolicy
labels?: ({ [k: string]: string } | null);
filter?: string
}

export class Subscriber<T = unknown> {
Expand Down Expand Up @@ -222,6 +228,10 @@ export class Subscriber<T = unknown> {
this.logger?.info(`PubSub: Subscription ${subscriptionName} is created`)
} else if (options) {
const [existingSubscription] = await subscription.getMetadata()
if (options.filter != existingSubscription.filter) {
throw new Error(`PubSub: Subscriptions filters are immutable, they can't be changed, subscription: ${subscriptionName},` +
` currentFilter: ${existingSubscription.filter || 'undefined'}, newFilter: ${options.filter || 'undefined'}`)
}
if (this.isMetadataChanged(existingSubscription, options)) {
await subscription.setMetadata(options)
this.logger?.info(`PubSub: Subscription ${subscriptionName} metadata updated`)
Expand Down Expand Up @@ -304,6 +314,10 @@ export class Subscriber<T = unknown> {
}
}

if (subscriptionOptions.filter) {
options.filter = subscriptionOptions.filter
}

return options
}

Expand Down
17 changes: 15 additions & 2 deletions packages/pubsub/src/__tests__/Publisher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ const topic = 'topic-name'
const topicMock = getTopicMock()
const clientMock = getClientMock({ topicMock })
const type = Type.forSchema(SCHEMA_DEFINITION_EXAMPLE as Schema, {logicalTypes: {'timestamp-micros': DateType}})
const processAbortSpy = jest.spyOn(process, 'abort')
const schemas = {writer: SCHEMA_DEFINITION_EXAMPLE, reader: SCHEMA_DEFINITION_EXAMPLE}
const schemasWithArrays = {writer: SCHEMA_DEFINITION_WRITER_OPTIONAL_ARRAY_EXAMPLE,
reader: SCHEMA_DEFINITION_READER_OPTIONAL_ARRAY_EXAMPLE}
Expand All @@ -45,7 +44,6 @@ describe('Publisher', () => {
topicMock.publishMessage.mockReset()
topicMock.getMetadata.mockReset()
schemaMock.get.mockReset()
processAbortSpy.mockClear()
})

describe('initialize', () => {
Expand Down Expand Up @@ -96,6 +94,21 @@ describe('Publisher', () => {
expect(topicMock.publishMessage).toHaveBeenCalledWith({ data: avroMessage, attributes: metadata })
})

it('publishes avro json encoded object with attributes', async () => {
topicMock.exists.mockResolvedValue([true])
topicMock.getMetadata.mockResolvedValue([{ 'schemaSettings': { 'schema': 'mock-schema' } }])
await publisher.initialize()

await publisher.publishMsg(message, { 'testKey': 'testValue' })

expect(topicMock.publishMessage).toHaveBeenCalledWith({
data: avroMessage, attributes: {
...metadata,
'testKey': 'testValue',
},
})
})

it('publishes avro json with max allowed date value when date in micros overflows MAX_SAFE_INTEGER', async () => {
publisher = new Publisher(topic, clientMock as unknown as PubSub, schemas, new ConsoleLogger())
topicMock.exists.mockResolvedValue([true])
Expand Down
61 changes: 60 additions & 1 deletion packages/pubsub/src/__tests__/Subscriber.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { PubSub } from '@google-cloud/pubsub'
import { SchemaServiceClient, SubscriberClient } from '@google-cloud/pubsub/build/src/v1'
import { Schema, Type } from 'avsc'
import { createCallOptions } from '../createCallOptions'
import { ILogger } from '../ILogger'
import { DateType } from '../logical-types/DateType'
import { IParsedMessage, ISubscriptionOptions, Subscriber } from '../Subscriber'
import {
Expand Down Expand Up @@ -34,7 +35,6 @@ const readerTypeWithArrays = Type.forSchema(SCHEMA_DEFINITION_READER_OPTIONAL_AR
const typeWithPreserveNull = Type.forSchema(SCHEMA_DEFINITION_PRESERVE_NULL_EXAMPLE as Schema, {logicalTypes: {'timestamp-micros': DateType}})
const flushPromises = () => new Promise(setImmediate)


const subscriptionOptions: ISubscriptionOptions = {
ackDeadline: 10,
allowExcessMessages: true,
Expand All @@ -45,6 +45,12 @@ const subscriptionOptions: ISubscriptionOptions = {
labels: { testKey: 'testValue'}
}

const loggerMock = jest.mocked<ILogger>({
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
})

describe('Subscriber', () => {
let subscriber: Subscriber

Expand Down Expand Up @@ -125,6 +131,57 @@ describe('Subscriber', () => {
expect(subscriptionMock.setMetadata).not.toHaveBeenCalled()
})

it('creates subscription with filter', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([false])
const subscriberWithFilter = new Subscriber({
topicName, subscriptionName,
subscriptionOptions: {
...subscriptionOptions,
filter: 'attributes.testKey="testValue"',
},
}, clientMock as unknown as PubSub,
schemaClientMock as unknown as SchemaServiceClient, undefined as unknown as SubscriberClient, new ConsoleLogger())

await subscriberWithFilter.initialize()

expect(subscriptionMock.create).toHaveBeenCalledWith({
deadLetterPolicy: null,
retryPolicy: {
minimumBackoff: { seconds: subscriptionOptions.minBackoffSeconds },
maximumBackoff: { seconds: subscriptionOptions.maxBackoffSeconds },
},
labels: subscriptionOptions.labels,
gaxOpts: createCallOptions,
filter: 'attributes.testKey="testValue"'
})
})

it('throws error if filter has changed', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
subscriptionMock.getMetadata.mockResolvedValue([{
filter: 'attributes.testKey="currentValue"',
}])

const subscriberWithFilter = new Subscriber({
topicName, subscriptionName,
subscriptionOptions: {
...subscriptionOptions,
filter: 'attributes.testKey="newValue"',
},
}, clientMock as unknown as PubSub,
schemaClientMock as unknown as SchemaServiceClient, undefined as unknown as SubscriberClient, loggerMock)
const processAbortSpy = jest.spyOn(process, 'abort').mockImplementation()

await subscriberWithFilter.initialize()

expect(loggerMock.error).toHaveBeenCalledWith('PubSub: Failed to initialize subscriber subscription-name',
new Error('PubSub: Subscriptions filters are immutable, they can\'t be changed, subscription: subscription-name, ' +
'currentFilter: attributes.testKey="currentValue", newFilter: attributes.testKey="newValue"'))
processAbortSpy.mockClear()
})

it('updates metadata if backoff has changed', async () => {
topicMock.exists.mockResolvedValue([true])
subscriptionMock.exists.mockResolvedValue([true])
Expand Down Expand Up @@ -248,6 +305,8 @@ describe('Subscriber', () => {
expect(subscriptionMock.setMetadata).not.toHaveBeenCalled()
})



describe('dead letter policy', () => {
const deadLetterTopicName = 'subscription-name-unack'
const deadLetterSubscriptionName = 'subscription-name-unack'
Expand Down

0 comments on commit 76c0f3d

Please sign in to comment.