订阅(Subscriptions)
除了使用查询(queries)获取数据和使用变更(mutations)修改数据之外,GraphQL 规范还支持第三种操作类型,称为 subscription(订阅)。GraphQL 订阅是一种从服务器向选择监听实时消息的客户端推送数据的方式。订阅与查询类似,它们指定一组要传递给客户端的字段,但不是立即返回单个结果,而是打开一个通道,每当服务器上发生特定事件时,就会向客户端发送一个结果。
订阅的一个常见用例是通知客户端有关特定事件的信息,例如新对象的创建、字段更新等(在这里阅读更多内容)。
使用 Apollo 驱动启用订阅
要启用订阅,请将 installSubscriptionHandlers 属性设置为 true。
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
installSubscriptionHandlers: true,
}),警告
installSubscriptionHandlers 配置选项已从最新版本的 Apollo server 中移除,并且很快也会在本包中弃用。默认情况下,installSubscriptionHandlers 将回退使用 subscriptions-transport-ws(阅读更多),但我们强烈建议改用 graphql-ws(阅读更多)库。
要切换使用 graphql-ws 包,请使用以下配置:
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'graphql-ws': true
},
}),提示
你也可以同时使用两个包(subscriptions-transport-ws 和 graphql-ws),例如用于向后兼容。
代码优先
要使用代码优先方式创建订阅,我们使用 @Subscription() 装饰器(从 @nestjs/graphql 包导出)和 graphql-subscriptions 包中的 PubSub 类,该类提供了一个简单的 发布/订阅 API。
以下订阅处理程序通过调用 PubSub#asyncIterableIterator 来 订阅 事件。此方法接受一个参数 triggerName,对应于事件主题名称。
const pubSub = new PubSub();
@Resolver(() => Author)
export class AuthorResolver {
// ...
@Subscription(() => Comment)
commentAdded() {
return pubSub.asyncIterableIterator('commentAdded');
}
}提示
所有装饰器都从 @nestjs/graphql 包导出,而 PubSub 类从 graphql-subscriptions 包导出。
注意
PubSub 是一个暴露简单 publish 和 subscribe API 的类。在这里阅读更多内容。请注意,Apollo 文档警告默认实现不适用于生产环境(在这里阅读更多内容)。生产应用应使用由外部存储支持的 PubSub 实现(在这里阅读更多内容)。
这将在 SDL 中生成以下 GraphQL schema 部分:
type Subscription {
commentAdded(): Comment!
}请注意,订阅按定义返回一个对象,该对象具有单个顶级属性,其键是订阅的名称。该名称要么继承自订阅处理程序方法的名称(即上面的 commentAdded),要么通过将带有 name 键的选项作为第二个参数传递给 @Subscription() 装饰器来显式提供,如下所示。
@Subscription(() => Comment, {
name: 'commentAdded',
})
subscribeToCommentAdded() {
return pubSub.asyncIterableIterator('commentAdded');
}这种构造生成与前面代码示例相同的 SDL,但允许我们将方法名称与订阅解耦。
发布
现在,要发布事件,我们使用 PubSub#publish 方法。这通常在 mutation 中使用,以在对象图的一部分发生更改时触发客户端更新。例如:
// posts/posts.resolver.ts
@Mutation(() => Comment)
async addComment(
@Args('postId', { type: () => Int }) postId: number,
@Args('comment', { type: () => Comment }) comment: CommentInput,
) {
const newComment = this.commentsService.addComment({ id: postId, comment });
pubSub.publish('commentAdded', { commentAdded: newComment });
return newComment;
}PubSub#publish 方法接受 triggerName(同样,将其视为事件主题名称)作为第一个参数,事件负载作为第二个参数。如前所述,订阅按定义返回一个值,该值具有特定的形状。再看一下我们 commentAdded 订阅生成的 SDL:
type Subscription {
commentAdded(): Comment!
}这告诉我们订阅必须返回一个对象,该对象具有名为 commentAdded 的顶级属性,其值是一个 Comment 对象。需要注意的重要一点是,PubSub#publish 方法发出的事件负载的形状必须与订阅期望返回的值的形状相对应。因此,在上面的示例中,pubSub.publish('commentAdded', { commentAdded: newComment }) 语句发布了一个具有适当形状负载的 commentAdded 事件。如果这些形状不匹配,你的订阅将在 GraphQL 验证阶段失败。
过滤订阅
要过滤特定事件,请将 filter 属性设置为过滤函数。此函数的行为类似于传递给数组 filter 的函数。它接受两个参数:payload 包含事件负载(由事件发布者发送),variables 接收订阅请求期间传入的任何参数。它返回一个布尔值,确定是否应将此事件发布给客户端监听者。
@Subscription(() => Comment, {
filter: (payload, variables) =>
payload.commentAdded.title === variables.title,
})
commentAdded(@Args('title') title: string) {
return pubSub.asyncIterableIterator('commentAdded');
}修改订阅负载
要修改已发布的事件负载,请将 resolve 属性设置为一个函数。该函数接收事件负载(由事件发布者发送)并返回适当的值。
@Subscription(() => Comment, {
resolve: value => value,
})
commentAdded() {
return pubSub.asyncIterableIterator('commentAdded');
}注意
如果你使用 resolve 选项,你应该返回解包后的负载(例如,在我们的示例中,直接返回 newComment 对象,而不是 { commentAdded: newComment } 对象)。
如果你需要访问注入的 provider(例如,使用外部服务来验证数据),请使用以下构造。
@Subscription(() => Comment, {
resolve(this: AuthorResolver, value) {
// "this" 指向 "AuthorResolver" 的实例
return value;
}
})
commentAdded() {
return pubSub.asyncIterableIterator('commentAdded');
}同样的构造也适用于过滤器:
@Subscription(() => Comment, {
filter(this: AuthorResolver, payload, variables) {
// "this" 指向 "AuthorResolver" 的实例
return payload.commentAdded.title === variables.title;
}
})
commentAdded() {
return pubSub.asyncIterableIterator('commentAdded');
}Schema 优先
要在 Nest 中创建等效的订阅,我们将使用 @Subscription() 装饰器。
const pubSub = new PubSub();
@Resolver('Author')
export class AuthorResolver {
// ...
@Subscription()
commentAdded() {
return pubSub.asyncIterableIterator('commentAdded');
}
}要根据上下文和参数过滤特定事件,请设置 filter 属性。
@Subscription('commentAdded', {
filter: (payload, variables) =>
payload.commentAdded.title === variables.title,
})
commentAdded() {
return pubSub.asyncIterableIterator('commentAdded');
}要修改已发布的负载,我们可以使用 resolve 函数。
@Subscription('commentAdded', {
resolve: value => value,
})
commentAdded() {
return pubSub.asyncIterableIterator('commentAdded');
}如果你需要访问注入的 provider(例如,使用外部服务来验证数据),请使用以下构造:
@Subscription('commentAdded', {
resolve(this: AuthorResolver, value) {
// "this" 指向 "AuthorResolver" 的实例
return value;
}
})
commentAdded() {
return pubSub.asyncIterableIterator('commentAdded');
}同样的构造也适用于过滤器:
@Subscription('commentAdded', {
filter(this: AuthorResolver, payload, variables) {
// "this" 指向 "AuthorResolver" 的实例
return payload.commentAdded.title === variables.title;
}
})
commentAdded() {
return pubSub.asyncIterableIterator('commentAdded');
}最后一步是更新类型定义文件。
type Author {
id: Int!
firstName: String
lastName: String
posts: [Post]
}
type Post {
id: Int!
title: String
votes: Int
}
type Query {
author(id: Int!): Author
}
type Comment {
id: String
content: String
}
type Subscription {
commentAdded(title: String!): Comment
}这样,我们就创建了一个 commentAdded(title: String!): Comment 订阅。你可以在这里找到完整的示例实现。
PubSub
我们在上面实例化了一个本地的 PubSub 实例。首选的方法是将 PubSub 定义为一个 provider 并通过构造函数注入它(使用 @Inject() 装饰器)。这允许我们在整个应用中重用该实例。例如,定义一个如下的 provider,然后在需要的地方注入 'PUB_SUB'。
{
provide: 'PUB_SUB',
useValue: new PubSub(),
}自定义订阅服务器
要自定义订阅服务器(例如,更改路径),请使用 subscriptions 选项属性。
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'subscriptions-transport-ws': {
path: '/graphql'
},
}
}),如果你使用 graphql-ws 包进行订阅,请将 subscriptions-transport-ws 键替换为 graphql-ws,如下所示:
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'graphql-ws': {
path: '/graphql'
},
}
}),通过 WebSocket 进行身份验证
可以在 subscriptions 选项中指定的 onConnect 回调函数中检查用户是否已通过身份验证。
onConnect 将接收传递给 SubscriptionClient 的 connectionParams 作为第一个参数(阅读更多)。
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'subscriptions-transport-ws': {
onConnect: (connectionParams) => {
const authToken = connectionParams.authToken;
if (!isValid(authToken)) {
throw new Error('Token is not valid');
}
// 从 token 中提取用户信息
const user = parseToken(authToken);
// 返回用户信息,以便稍后将其添加到上下文中
return { user };
},
}
},
context: ({ connection }) => {
// connection.context 将等于 "onConnect" 回调返回的内容
},
}),此示例中的 authToken 仅在连接首次建立时由客户端发送一次。使用此连接进行的所有订阅将具有相同的 authToken,因此具有相同的用户信息。
注意
subscriptions-transport-ws 中存在一个 bug,允许连接跳过 onConnect 阶段(阅读更多)。你不应假设在用户启动订阅时已调用 onConnect,应始终检查 context 是否已填充。
如果你使用 graphql-ws 包,onConnect 回调的签名会略有不同:
GraphQLModule.forRoot<ApolloDriverConfig>({
driver: ApolloDriver,
subscriptions: {
'graphql-ws': {
onConnect: (context: Context<any>) => {
const { connectionParams, extra } = context;
// 用户验证与上面的示例相同
// 使用 graphql-ws 时,额外的上下文值应存储在 extra 字段中
extra.user = { user: {} };
},
},
},
context: ({ extra }) => {
// 你现在可以通过 extra 字段访问额外的上下文值
},
});使用 Mercurius 驱动启用订阅
要启用订阅,请将 subscription 属性设置为 true。
GraphQLModule.forRoot<MercuriusDriverConfig>({
driver: MercuriusDriver,
subscription: true,
}),提示
你还可以传递选项对象来设置自定义 emitter、验证传入连接等。在这里阅读更多内容(参见 subscription)。
代码优先
要使用代码优先方式创建订阅,我们使用 @Subscription() 装饰器(从 @nestjs/graphql 包导出)和 mercurius 包中的 PubSub 类,该类提供了一个简单的 发布/订阅 API。
以下订阅处理程序通过调用 PubSub#asyncIterableIterator 来 订阅 事件。此方法接受一个参数 triggerName,对应于事件主题名称。
@Resolver(() => Author)
export class AuthorResolver {
// ...
@Subscription(() => Comment)
commentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}
}提示
上面示例中使用的所有装饰器都从 @nestjs/graphql 包导出,而 PubSub 类从 mercurius 包导出。
注意
PubSub 是一个暴露简单 publish 和 subscribe API 的类。查看这个章节了解如何注册自定义 PubSub 类。
这将在 SDL 中生成以下 GraphQL schema 部分:
type Subscription {
commentAdded(): Comment!
}请注意,订阅按定义返回一个对象,该对象具有单个顶级属性,其键是订阅的名称。该名称要么继承自订阅处理程序方法的名称(即上面的 commentAdded),要么通过将带有 name 键的选项作为第二个参数传递给 @Subscription() 装饰器来显式提供,如下所示。
@Subscription(() => Comment, {
name: 'commentAdded',
})
subscribeToCommentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}这种构造生成与前面代码示例相同的 SDL,但允许我们将方法名称与订阅解耦。
发布
现在,要发布事件,我们使用 PubSub#publish 方法。这通常在 mutation 中使用,以在对象图的一部分发生更改时触发客户端更新。例如:
// posts/posts.resolver.ts
@Mutation(() => Comment)
async addComment(
@Args('postId', { type: () => Int }) postId: number,
@Args('comment', { type: () => Comment }) comment: CommentInput,
@Context('pubsub') pubSub: PubSub,
) {
const newComment = this.commentsService.addComment({ id: postId, comment });
await pubSub.publish({
topic: 'commentAdded',
payload: {
commentAdded: newComment
}
});
return newComment;
}如前所述,订阅按定义返回一个值,该值具有特定的形状。再看一下我们 commentAdded 订阅生成的 SDL:
type Subscription {
commentAdded(): Comment!
}这告诉我们订阅必须返回一个对象,该对象具有名为 commentAdded 的顶级属性,其值是一个 Comment 对象。需要注意的重要一点是,PubSub#publish 方法发出的事件负载的形状必须与订阅期望返回的值的形状相对应。因此,在上面的示例中,pubSub.publish({ topic: 'commentAdded', payload: { commentAdded: newComment } }) 语句发布了一个具有适当形状负载的 commentAdded 事件。如果这些形状不匹配,你的订阅将在 GraphQL 验证阶段失败。
过滤订阅
要过滤特定事件,请将 filter 属性设置为过滤函数。此函数的行为类似于传递给数组 filter 的函数。它接受两个参数:payload 包含事件负载(由事件发布者发送),variables 接收订阅请求期间传入的任何参数。它返回一个布尔值,确定是否应将此事件发布给客户端监听者。
@Subscription(() => Comment, {
filter: (payload, variables) =>
payload.commentAdded.title === variables.title,
})
commentAdded(@Args('title') title: string, @Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}如果你需要访问注入的 provider(例如,使用外部服务来验证数据),请使用以下构造。
@Subscription(() => Comment, {
filter(this: AuthorResolver, payload, variables) {
// "this" 指向 "AuthorResolver" 的实例
return payload.commentAdded.title === variables.title;
}
})
commentAdded(@Args('title') title: string, @Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}Schema 优先
要在 Nest 中创建等效的订阅,我们将使用 @Subscription() 装饰器。
const pubSub = new PubSub();
@Resolver('Author')
export class AuthorResolver {
// ...
@Subscription()
commentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}
}要根据上下文和参数过滤特定事件,请设置 filter 属性。
@Subscription('commentAdded', {
filter: (payload, variables) =>
payload.commentAdded.title === variables.title,
})
commentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}如果你需要访问注入的 provider(例如,使用外部服务来验证数据),请使用以下构造:
@Subscription('commentAdded', {
filter(this: AuthorResolver, payload, variables) {
// "this" 指向 "AuthorResolver" 的实例
return payload.commentAdded.title === variables.title;
}
})
commentAdded(@Context('pubsub') pubSub: PubSub) {
return pubSub.subscribe('commentAdded');
}最后一步是更新类型定义文件。
type Author {
id: Int!
firstName: String
lastName: String
posts: [Post]
}
type Post {
id: Int!
title: String
votes: Int
}
type Query {
author(id: Int!): Author
}
type Comment {
id: String
content: String
}
type Subscription {
commentAdded(title: String!): Comment
}这样,我们就创建了一个 commentAdded(title: String!): Comment 订阅。
PubSub
在上面的示例中,我们使用了默认的 PubSub emitter(mqemitter)。首选的方法(用于生产环境)是使用 mqemitter-redis。或者,可以提供自定义的 PubSub 实现(在这里阅读更多内容)。
GraphQLModule.forRoot<MercuriusDriverConfig>({
driver: MercuriusDriver,
subscription: {
emitter: require('mqemitter-redis')({
port: 6579,
host: '127.0.0.1',
}),
},
});通过 WebSocket 进行身份验证
可以在 subscription 选项中指定的 verifyClient 回调函数中检查用户是否已通过身份验证。
verifyClient 将接收 info 对象作为第一个参数,你可以使用它来获取请求的 headers。
GraphQLModule.forRoot<MercuriusDriverConfig>({
driver: MercuriusDriver,
subscription: {
verifyClient: (info, next) => {
const authorization = info.req.headers?.authorization as string;
if (!authorization?.startsWith('Bearer ')) {
return next(false);
}
next(true);
},
}
}),