From fad4d27bfebb80a374c2041b86ffab509845effe Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Wed, 23 Oct 2024 14:47:18 +0300 Subject: [PATCH] fix: subscriptions & webhooks (#7816) * fix: subscriptions & webhooks * chore(dependencies): updated changesets for modified dependencies * Fix lockfile * Fix TS * pipeLogs: true remove --------- Co-authored-by: github-actions[bot] --- ...l-mesh_fusion-runtime-7816-dependencies.md | 5 + .changeset/mean-papayas-look.md | 8 ++ .github/workflows/tests.yml | 2 +- .../json-schema-subscriptions.test.ts.snap | 17 ++-- .../json-schema-subscriptions.test.ts | 92 ++++++++++--------- e2e/json-schema-subscriptions/mesh.config.ts | 14 ++- .../composition/src/transforms/encapsulate.ts | 6 +- packages/fusion/runtime/package.json | 1 + .../fusion/runtime/src/federation/subgraph.ts | 11 --- .../runtime/src/federation/supergraph.ts | 49 +++++++--- packages/legacy/config/src/process.ts | 6 +- .../utils/src/resolve-additional-resolvers.ts | 5 +- .../serve-runtime/src/createGatewayRuntime.ts | 1 - website/src/pages/v1/_meta.ts | 1 + .../src/pages/v1/subscriptions-webhooks.mdx | 88 ++++++++++++++++++ yarn.lock | 1 + 16 files changed, 217 insertions(+), 90 deletions(-) create mode 100644 .changeset/@graphql-mesh_fusion-runtime-7816-dependencies.md create mode 100644 .changeset/mean-papayas-look.md create mode 100644 website/src/pages/v1/subscriptions-webhooks.mdx diff --git a/.changeset/@graphql-mesh_fusion-runtime-7816-dependencies.md b/.changeset/@graphql-mesh_fusion-runtime-7816-dependencies.md new file mode 100644 index 0000000000000..2fb3c2cf6cfb3 --- /dev/null +++ b/.changeset/@graphql-mesh_fusion-runtime-7816-dependencies.md @@ -0,0 +1,5 @@ +--- +"@graphql-mesh/fusion-runtime": patch +--- +dependencies updates: + - Added dependency [`@graphql-tools/merge@^9.0.8` ↗︎](https://www.npmjs.com/package/@graphql-tools/merge/v/9.0.8) (to `dependencies`) diff --git a/.changeset/mean-papayas-look.md b/.changeset/mean-papayas-look.md new file mode 100644 index 0000000000000..7253bb9ce3dff --- /dev/null +++ b/.changeset/mean-papayas-look.md @@ -0,0 +1,8 @@ +--- +'@graphql-mesh/fusion-composition': patch +'@graphql-mesh/fusion-runtime': patch +'@graphql-mesh/config': patch +'@graphql-mesh/utils': patch +--- + +Fix subscription fields fed by webhooks and defined in \`additionalTypeDefs\` with \`@resolveTo\` diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index ad2244768388e..1838ccd2a9724 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -126,7 +126,7 @@ jobs: command: yarn test:leak integration: - needs: [unit] + needs: [typecheck, unit] name: integration / node ${{matrix.node-version}} timeout-minutes: 60 runs-on: ubuntu-latest diff --git a/e2e/json-schema-subscriptions/__snapshots__/json-schema-subscriptions.test.ts.snap b/e2e/json-schema-subscriptions/__snapshots__/json-schema-subscriptions.test.ts.snap index 593d05f85f0ec..2638ce5f0005a 100644 --- a/e2e/json-schema-subscriptions/__snapshots__/json-schema-subscriptions.test.ts.snap +++ b/e2e/json-schema-subscriptions/__snapshots__/json-schema-subscriptions.test.ts.snap @@ -1,7 +1,7 @@ // Jest Snapshot v1, https://goo.gl/fbAQLP exports[`should compose the appropriate schema 1`] = ` -"schema @link(url: "https://specs.apollo.dev/link/v1.0") @link(url: "https://specs.apollo.dev/join/v0.3", for: EXECUTION) @link(url: "https://the-guild.dev/graphql/mesh/spec/v1.0", import: ["@example", "@httpOperation", "@pubsubOperation", "@transport", "@extraSchemaDefinitionDirective"]) { +"schema @link(url: "https://the-guild.dev/graphql/mesh/spec/v1.0") { query: Query mutation: Mutation subscription: Subscription @@ -31,6 +31,8 @@ directive @transport(subgraph: String, kind: String, location: String, headers: directive @extraSchemaDefinitionDirective(directives: _DirectiveExtensions) repeatable on OBJECT +directive @additionalField on FIELD_DEFINITION + directive @live on QUERY scalar join__FieldSet @@ -67,10 +69,10 @@ type query_todos_items @join__type(graph: API) { } type Mutation @join__type(graph: API) { - addTodo(input: mutationInput_addTodo_input_Input): mutation_addTodo @httpOperation(subgraph: "API", path: "/todo", httpMethod: POST) + addTodo(input: mutationInput_addTodo_input_Input): Todo @httpOperation(subgraph: "API", path: "/todo", httpMethod: POST) } -type mutation_addTodo @example(subgraph: "API", value: "{\\"id\\":0,\\"name\\":\\"TodoName\\",\\"content\\":\\"TodoContent\\"}") @join__type(graph: API) { +type Todo @example(subgraph: "API", value: "{\\"id\\":0,\\"name\\":\\"TodoName\\",\\"content\\":\\"TodoContent\\"}") @join__type(graph: API) { id: Int name: String content: String @@ -78,13 +80,8 @@ type mutation_addTodo @example(subgraph: "API", value: "{\\"id\\":0,\\"name\\":\ type Subscription @join__type(graph: API) { """PubSub Topic: webhook:post:/webhooks/todo_added""" - todoAdded: subscription_todoAdded @pubsubOperation(subgraph: "API", pubsubTopic: "webhook:post:/webhooks/todo_added") -} - -type subscription_todoAdded @example(subgraph: "API", value: "{\\"id\\":0,\\"name\\":\\"TodoName\\",\\"content\\":\\"TodoContent\\"}") @join__type(graph: API) { - id: Int - name: String - content: String + todoAddedFromSource: Todo @pubsubOperation(subgraph: "API", pubsubTopic: "webhook:post:/webhooks/todo_added") + todoAddedFromExtensions: Todo @resolveTo(pubsubTopic: "webhook:post:/webhooks/todo_added") @additionalField } enum HTTPMethod @join__type(graph: API) { diff --git a/e2e/json-schema-subscriptions/json-schema-subscriptions.test.ts b/e2e/json-schema-subscriptions/json-schema-subscriptions.test.ts index d483c7ab77b41..77689d5cd13ba 100644 --- a/e2e/json-schema-subscriptions/json-schema-subscriptions.test.ts +++ b/e2e/json-schema-subscriptions/json-schema-subscriptions.test.ts @@ -9,24 +9,26 @@ it('should compose the appropriate schema', async () => { expect(result).toMatchSnapshot(); }); -it('should query, mutate and subscribe', async () => { - const servePort = await getAvailablePort(); - const api = await service('api', { servePort }); - const { output } = await compose({ output: 'graphql', services: [api] }); - const { execute } = await serve({ supergraph: output, port: servePort }); +['todoAddedFromSource', 'todoAddedFromExtensions'].forEach(subscriptionField => { + describe(`Listen to ${subscriptionField}`, () => { + it('should query, mutate and subscribe', async () => { + const servePort = await getAvailablePort(); + const api = await service('api', { servePort }); + const { output } = await compose({ output: 'graphql', services: [api] }); + const { execute } = await serve({ supergraph: output, port: servePort }); - await expect( - execute({ - query: /* GraphQL */ ` - query Todos { - todos { - name - content - } - } - `, - }), - ).resolves.toMatchInlineSnapshot(` + await expect( + execute({ + query: /* GraphQL */ ` + query Todos { + todos { + name + content + } + } + `, + }), + ).resolves.toMatchInlineSnapshot(` { "data": { "todos": [], @@ -34,35 +36,35 @@ it('should query, mutate and subscribe', async () => { } `); - const sse = createClient({ - url: `http://localhost:${servePort}/graphql`, - retryAttempts: 0, - fetchFn: fetch, - }); + const sse = createClient({ + url: `http://localhost:${servePort}/graphql`, + retryAttempts: 0, + fetchFn: fetch, + }); - const sub = sse.iterate({ - query: /* GraphQL */ ` - subscription TodoAdded { - todoAdded { + const sub = sse.iterate({ + query: /* GraphQL */ ` + subscription ${subscriptionField} { + ${subscriptionField} { name content } } `, - }); + }); - await expect( - execute({ - query: /* GraphQL */ ` - mutation AddTodo { - addTodo(input: { name: "Shopping", content: "Buy Milk" }) { - name - content - } - } - `, - }), - ).resolves.toMatchInlineSnapshot(` + await expect( + execute({ + query: /* GraphQL */ ` + mutation AddTodo { + addTodo(input: { name: "Shopping", content: "Buy Milk" }) { + name + content + } + } + `, + }), + ).resolves.toMatchInlineSnapshot(` { "data": { "addTodo": { @@ -73,17 +75,19 @@ it('should query, mutate and subscribe', async () => { } `); - for await (const msg of sub) { - expect(msg).toMatchInlineSnapshot(` + for await (const msg of sub) { + expect(msg).toMatchInlineSnapshot(` { "data": { - "todoAdded": { + "${subscriptionField}": { "content": "Buy Milk", "name": "Shopping", }, }, } `); - break; - } + break; + } + }); + }); }); diff --git a/e2e/json-schema-subscriptions/mesh.config.ts b/e2e/json-schema-subscriptions/mesh.config.ts index c191da88dea92..698a13e8428d9 100644 --- a/e2e/json-schema-subscriptions/mesh.config.ts +++ b/e2e/json-schema-subscriptions/mesh.config.ts @@ -30,12 +30,14 @@ export const composeConfig = defineComposeConfig({ method: 'POST', requestSample: './addTodo.json', responseSample: './todo.json', + responseTypeName: 'Todo', }, { type: OperationTypeNode.SUBSCRIPTION, - field: 'todoAdded', + field: 'todoAddedFromSource', pubsubTopic: 'webhook:post:/webhooks/todo_added', responseSample: './todo.json', + responseTypeName: 'Todo', }, ], }), @@ -43,6 +45,16 @@ export const composeConfig = defineComposeConfig({ ], additionalTypeDefs: /* GraphQL */ ` directive @live on QUERY + + # If you don't have Subscription type defined anywhere + # You have to extend subscription definition + extend schema { + subscription: Subscription + } + + type Subscription { + todoAddedFromExtensions: Todo @resolveTo(pubsubTopic: "webhook:post:/webhooks/todo_added") + } `, }); diff --git a/packages/fusion/composition/src/transforms/encapsulate.ts b/packages/fusion/composition/src/transforms/encapsulate.ts index d631c7986848f..31b57957f6c1e 100644 --- a/packages/fusion/composition/src/transforms/encapsulate.ts +++ b/packages/fusion/composition/src/transforms/encapsulate.ts @@ -132,9 +132,9 @@ export const resolveToDirective = new GraphQLDirective({ result: { type: GraphQLString }, resultType: { type: GraphQLString }, sourceArgs: { type: resolveToSourceArgsScalar }, - sourceFieldName: { type: new GraphQLNonNull(GraphQLString) }, - sourceName: { type: new GraphQLNonNull(GraphQLString) }, + sourceFieldName: { type: GraphQLString }, + sourceName: { type: GraphQLString }, sourceSelectionSet: { type: GraphQLString }, - sourceTypeName: { type: new GraphQLNonNull(GraphQLString) }, + sourceTypeName: { type: GraphQLString }, }, }); diff --git a/packages/fusion/runtime/package.json b/packages/fusion/runtime/package.json index ada5e2cb562af..40217f26fb5ec 100644 --- a/packages/fusion/runtime/package.json +++ b/packages/fusion/runtime/package.json @@ -59,6 +59,7 @@ "@graphql-tools/delegate": "^10.0.26", "@graphql-tools/executor": "^1.3.2", "@graphql-tools/federation": "^2.2.17", + "@graphql-tools/merge": "^9.0.8", "@graphql-tools/stitch": "^9.2.15", "@graphql-tools/stitching-directives": "^3.1.7", "@graphql-tools/utils": "^10.5.5", diff --git a/packages/fusion/runtime/src/federation/subgraph.ts b/packages/fusion/runtime/src/federation/subgraph.ts index 2beb707175701..e6ca338800fa4 100644 --- a/packages/fusion/runtime/src/federation/subgraph.ts +++ b/packages/fusion/runtime/src/federation/subgraph.ts @@ -40,7 +40,6 @@ export interface HandleFederationSubschemaOpts { schemaDirectives?: Record; transportEntryMap: Record; additionalTypeDefs: TypeSource[]; - additionalResolvers: IResolvers[]; stitchingDirectivesTransformer: (subschemaConfig: SubschemaConfig) => SubschemaConfig; onSubgraphExecute: ReturnType; } @@ -51,7 +50,6 @@ export function handleFederationSubschema({ schemaDirectives, transportEntryMap, additionalTypeDefs, - additionalResolvers, stitchingDirectivesTransformer, onSubgraphExecute, }: HandleFederationSubschemaOpts) { @@ -140,15 +138,6 @@ export function handleFederationSubschema({ }, ], }); - for (const resolveToDirective of resolveToDirectives) { - additionalResolvers.push( - resolveAdditionalResolversWithoutImport({ - targetTypeName: typeName, - targetFieldName: fieldName, - ...resolveToDirective, - }), - ); - } } const additionalFieldDirectives = fieldDirectives.additionalField; if (additionalFieldDirectives?.length > 0) { diff --git a/packages/fusion/runtime/src/federation/supergraph.ts b/packages/fusion/runtime/src/federation/supergraph.ts index 6d9960422841b..abb1717a52ec3 100644 --- a/packages/fusion/runtime/src/federation/supergraph.ts +++ b/packages/fusion/runtime/src/federation/supergraph.ts @@ -1,8 +1,17 @@ -import { isEnumType, Kind, visit, type GraphQLSchema } from 'graphql'; +import { + isEnumType, + Kind, + print, + visit, + type GraphQLSchema, + type ObjectTypeDefinitionNode, +} from 'graphql'; import type { TransportEntry } from '@graphql-mesh/transport-common'; +import type { YamlConfig } from '@graphql-mesh/types'; import { resolveAdditionalResolversWithoutImport } from '@graphql-mesh/utils'; import type { SubschemaConfig } from '@graphql-tools/delegate'; import { getStitchedSchemaFromSupergraphSdl } from '@graphql-tools/federation'; +import { mergeTypeDefs } from '@graphql-tools/merge'; import { stitchingDirectives } from '@graphql-tools/stitching-directives'; import { asArray, @@ -122,17 +131,39 @@ export const handleFederationSupergraph: UnifiedGraphHandler = function ({ schemaDirectives, transportEntryMap, additionalTypeDefs, - additionalResolvers, stitchingDirectivesTransformer, onSubgraphExecute, }), batch, onStitchingOptions(opts: any) { subschemas = opts.subschemas; - opts.typeDefs = [opts.typeDefs, additionalTypeDefs]; + const mergedTypeDefs = mergeTypeDefs([opts.typeDefs, additionalTypeDefs]); + visit(mergedTypeDefs, { + [Kind.FIELD_DEFINITION](field, _key, _parent, _path, ancestors) { + const fieldDirectives = getDirectiveExtensions<{ + resolveTo: YamlConfig.AdditionalStitchingResolverObject; + }>({ astNode: field }); + const resolveToDirectives = fieldDirectives?.resolveTo; + if (resolveToDirectives?.length) { + const targetTypeName = (ancestors[ancestors.length - 1] as ObjectTypeDefinitionNode) + .name.value; + const targetFieldName = field.name.value; + for (const resolveToDirective of resolveToDirectives) { + additionalResolvers.push( + resolveAdditionalResolversWithoutImport({ + targetTypeName, + targetFieldName, + ...resolveToDirective, + }), + ); + } + } + }, + }); + opts.typeDefs = mergedTypeDefs; opts.resolvers = additionalResolvers; }, - onSubgraphAST(name, subgraphAST) { + onSubgraphAST(_name, subgraphAST) { return visit(subgraphAST, { [Kind.OBJECT_TYPE_DEFINITION](node) { const typeName = node.name.value; @@ -140,7 +171,6 @@ export const handleFederationSupergraph: UnifiedGraphHandler = function ({ ...node, fields: node.fields.filter(fieldNode => { const fieldDirectives = getDirectiveExtensions({ astNode: fieldNode }); - const fieldName = fieldNode.name.value; const resolveToDirectives = fieldDirectives.resolveTo; if (resolveToDirectives?.length > 0) { additionalTypeDefs.push({ @@ -153,15 +183,6 @@ export const handleFederationSupergraph: UnifiedGraphHandler = function ({ }, ], }); - for (const resolveToDirective of resolveToDirectives) { - additionalResolvers.push( - resolveAdditionalResolversWithoutImport({ - targetTypeName: typeName, - targetFieldName: fieldName, - ...(resolveToDirective as any), - }), - ); - } } const additionalFieldDirectives = fieldDirectives.additionalField; if (additionalFieldDirectives?.length > 0) { diff --git a/packages/legacy/config/src/process.ts b/packages/legacy/config/src/process.ts index 3012f2525ce71..3b7440b1d40b5 100644 --- a/packages/legacy/config/src/process.ts +++ b/packages/legacy/config/src/process.ts @@ -468,9 +468,9 @@ export async function processConfig( scalar ResolveToSourceArgs directive @resolveTo( requiredSelectionSet: String - sourceName: String! - sourceTypeName: String! - sourceFieldName: String! + sourceName: String + sourceTypeName: String + sourceFieldName: String sourceSelectionSet: String sourceArgs: ResolveToSourceArgs keyField: String diff --git a/packages/legacy/utils/src/resolve-additional-resolvers.ts b/packages/legacy/utils/src/resolve-additional-resolvers.ts index a6ebd2c8c5f88..3fb81d7fd0178 100644 --- a/packages/legacy/utils/src/resolve-additional-resolvers.ts +++ b/packages/legacy/utils/src/resolve-additional-resolvers.ts @@ -11,6 +11,7 @@ import { getNamedType, isAbstractType, isInterfaceType, isObjectType, Kind } fro import lodashGet from 'lodash.get'; import toPath from 'lodash.topath'; import { process } from '@graphql-mesh/cross-helpers'; +import type { MeshContext } from '@graphql-mesh/runtime'; import { stringInterpolator } from '@graphql-mesh/string-interpolation'; import type { ImportFn, MeshPubSub, YamlConfig } from '@graphql-mesh/types'; import type { IResolvers } from '@graphql-tools/utils'; @@ -162,10 +163,10 @@ export function resolveAdditionalResolversWithoutImport( [additionalResolver.targetTypeName]: { [additionalResolver.targetFieldName]: { subscribe: withFilter( - (root, args, context, info) => { + (root, args, context: MeshContext, info) => { const resolverData = { root, args, context, info, env: process.env }; const topic = stringInterpolator.parse(additionalResolver.pubsubTopic, resolverData); - return (pubsub || context.pubsub).asyncIterator(topic) as AsyncIterableIterator; + return (pubsub || context.pubsub).asyncIterator(topic)[Symbol.asyncIterator](); }, (root, args, context, info) => { return additionalResolver.filterBy diff --git a/packages/serve-runtime/src/createGatewayRuntime.ts b/packages/serve-runtime/src/createGatewayRuntime.ts index 063ae028c3069..8b178af7e79bf 100644 --- a/packages/serve-runtime/src/createGatewayRuntime.ts +++ b/packages/serve-runtime/src/createGatewayRuntime.ts @@ -370,7 +370,6 @@ export function createGatewayRuntime = Reco subschemaConfig, transportEntryMap, additionalTypeDefs, - additionalResolvers, stitchingDirectivesTransformer, onSubgraphExecute, }); diff --git a/website/src/pages/v1/_meta.ts b/website/src/pages/v1/_meta.ts index c047ee2f661f8..4d1b3c621e48b 100644 --- a/website/src/pages/v1/_meta.ts +++ b/website/src/pages/v1/_meta.ts @@ -8,6 +8,7 @@ export default { 'response-caching': 'Response Caching', 'rate-limit': 'Rate Limit', auth: 'Authentication', + 'subscriptions-webhooks': 'Subscriptions & Webhooks', 'consume-in-other-gateways': 'Consume in Other Gateways', 'local-execution': 'Local Execution', 'migration-from-v0': 'Migration from Mesh v0', diff --git a/website/src/pages/v1/subscriptions-webhooks.mdx b/website/src/pages/v1/subscriptions-webhooks.mdx new file mode 100644 index 0000000000000..de5d0d416604d --- /dev/null +++ b/website/src/pages/v1/subscriptions-webhooks.mdx @@ -0,0 +1,88 @@ +--- +description: + Handle Webhooks with GraphQL Subscriptions in your unified schema using GraphQL Mesh. Use PubSub + implementation to bind webhook paths to specific topics and consume them as subscription root + fields. You can also generate GraphQL type definitions from sample JSON responses using the JSON + Schema Handler. +--- + +import { Callout } from '@theguild/components' + +# Handle Webhooks with GraphQL Subscriptions + +GraphQL Mesh can consume Webhooks as GraphQL Subscriptions in the unified schema by using the +built-in PubSub implementation + +## Add a new Subscription field + +You can use `additionalTypeDefs` and `resolveTo` directive to add subscription root fields to your +unified schema. + +If you subscribe a pubSub topic with `webhook:EXPECTED_METHOD:EXPECTED_PATH`, the server will +automatically bind that path to that topic. + +```ts filename="mesh.config.ts" +import { defineConfig as defineComposeConfig } from '@graphql-mesh/compose-cli' + +export const composeConfig = defineComposeConfig({ + additionalTypeDefs: /* GraphQL */ ` + # If you don't have Subscription type defined anywhere + # You have to extend subscription definition + extend schema { + subscription: Subscription + } + type Subscription { + todoAdded: Todo @resolveTo( + pubsubTopic: "webhook:post:/webhooks/todo_added" + # result: "data.someProp.someOtherProp", # You can get nested fields + # filterBy: "root.userId === args.userId" # You can filter the payload by `userId` for example + ) + } + ` +}) +``` + +We can use existing types from our unified schema, and this root field is subscribed to our specific +`topic` in our PubSub service. + +But you have to enable `webhooks` flag in Hive Gateway in this case; + +```ts filename="mesh.config.ts" +import { defineConfig as defineGatewayConfig } from '@graphql-hive/gateway' + +export const gatewayConfig = defineGatewayConfig({ + webhooks: true +}) +``` + +### Use JSON Schema Handler instead + +You can also use the JSON Schema handler if you don't want to write an extra GraphQL type +definition. You can generate GraphQL type definitions from sample JSON response; + +Just add the following to your existing JSON schema handler configuration in `.meshrc.yml` file; + +```ts filename="mesh.config.ts" +import { defineConfig as defineComposeConfig } from '@graphql-mesh/compose-cli' +import { loadJSONSchemaSubgraph } from '@omnigraph/json-schema' + +export const composeConfig = defineComposeConfig({ + subgraphs: [ + { + sourceHandler: loadJSONSchemaSubgraph('API', { + /* Other configuration options */ + operations: [ + /** Other operations */ + { + type: OperationTypeNode.SUBSCRIPTION, + field: 'todoAdded', + pubsubTopic: 'webhook:post:/webhooks/todo_added', + responseSample: './todo.json', + responseTypeName: 'Todo' + } + ] + }) + } + ] +}) +``` diff --git a/yarn.lock b/yarn.lock index fd495833de7d8..eaa86e84a4c9c 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7181,6 +7181,7 @@ __metadata: "@graphql-tools/delegate": "npm:^10.0.26" "@graphql-tools/executor": "npm:^1.3.2" "@graphql-tools/federation": "npm:^2.2.17" + "@graphql-tools/merge": "npm:^9.0.8" "@graphql-tools/stitch": "npm:^9.2.15" "@graphql-tools/stitching-directives": "npm:^3.1.7" "@graphql-tools/utils": "npm:^10.5.5"