cover_image

从 GraphQL N+1 问题到 DataLoader 源码梳理

穹心 大淘宝前端技术
2021年09月28日 10:30

前言

在开始阅读本文前,请确保你对 GraphQL 至少有一定了解,比如 GraphQL Schema 基础语法、Resolver、Object Type等,而对于本文主角的 GraphQL N+1 问题,如果你还未曾接触到反而不要紧,在开头部分,我会详细介绍 N+1 问题产生的原因,然后才是后面的解决方案与 DataLoader 源码梳理。

作为 RESTFul API 的竞品,GraphQL有着自己无可取代的优势,如强类型、版本兼容性、按需返回、查询图式(嵌套)数据时的简洁等,也有着不少为人诟病之处,如作为本文主角的 N+1 问题、缓存需要使用特殊算法(犹记得 Apollo Client(一个专用于 GraphQL 请求的前端 Client) 的缓存功能,其开发时间是以年计的)等,对于其中心化特性导致的集群架构需求更是不必多说。

篇幅所限,本文不会涉及除了 N+1 问题以外的优缺点(或许以后有机会我会给每个优缺点安排一篇文章也说不定)。

N+1 问题的产生

假设现在有这么一个场景,,我们要拿到所有用户的所有宠物信息,如果使用 RESTFul API 的话,通常要先拿到所有用户的ID,再去宠物的接口拿到所有宠物的信息:

GET /users
[
{
"id": 1,
"name": "aaa",
"petsId": [1, 2]
},
{
"id": 2,
"name": "bbb",
"petsId": [2, 3, 4]
}
]

GET /pet/:id
[
{
"id": 1,
"kind": "Cat"
},
{
"id": 2,
"kind": "Dog"
}
// ...
]

别问为什么宠物 ID 还有一样的,万一以后有共享宠物呢。

这样的话,如果有 N 个用户,我们就需要查询 N+1 次接口,数据库 I/O 同样是 N+1 次。

如果 /users 接口返回的 user 信息不包含 petsId 字段,那么甚至还要再查询N遍 /user/:id 接口,来拿到用户个体的详细信息,整体的查询次数与数据库 I/O 次数就达到了 2N+1 次。

而对于GraphQL来说,查询语句通常是这样的:

query {
fetchAllUsers {
id
name
pets {
id
kind
age
isMale
}
}
}

看起来查询次数只有一次对吧?那么数据库 I/O 次数呢?看起来好像也只有一次?因为数据是一起查回来的?实际上这里的数据库 I/O 次数达到了N+1次。

具体的 demo 我们会在下面给出,先来简单介绍下上面的 GraphQL语句 基本含义:

  • query 意味着本次查询是 只读 的,这是GraphQL中的 操作类型 概念,其他的操作类型还有 mutationsubscription,分别代表着 读写操作(可以想象为 RESTFul 中的 POST / PUT / Delete 等带方法) 与 订阅操作(可以理解为WebSocket,实际上 subscription 操作也需要通过 WebSocket 承接)。
  • fetchAllUsers 是一个对象类型,它的内部包含了 id name 这些基础属性,以及 pets 这一对象属性,再进入到pets中,它拥有的都是基础属性。 在一个大型 GraphQL API 中,对象属性的嵌套可能会达到数十层甚至更多(这是 GraphQL 的特色之一,但往往也可能带来严重的性能问题,所以通常会限制嵌套的深度,抛弃掉超出一定深度的请求)。
  • REST API中我们通常有 Controller 的概念,来针对路由级别做出响应。 对应的,GraphQL中也有 Resolver 的概念,但是它针对的则是对象类型(ObjectType)。 也就是说,每一个对象类型都会有自己的解析器,比如上面的 fetchAllUsers  和  pets 实际上分别是 User 类型和 Pet 类型的解析器。 你可以理解为 Resolver 就是针对对象类型的函数

现在假设我们有一百个用户, 调用 fetchAllUsers 的解析器会返回这一百个用户的数组, 接着 User类型内还存在着对象属性pets, 那么我们还需要调用 pets 对应的解析器次数是?

理想情况当然是1次, 因为我们可以在第一次查询中拿到所有用户的信息, 那么只要用所有的宠物ID再查一次,不就能得到所有的宠物信息了吗?比如getPetlistByIds

但实际情况中, GraphQL会首先执行用户1的 pets 解析器, 得到用户1的宠物信息, 然后执行用户2, ..., 一共100次, 加上查询所有用户的那1次, 这就是GraphQL的 N+1 问题。

在这个示例里,如果抛开 N+1 的问题不论,你会发现 GraphQL 在查询嵌套的对象数据时确实有无可比拟的优势:简洁、灵活、可控。比如我们来了个神奇的需求,查询 所有用户的亲密度高的男性好友的最近半年内的点赞,在 GraphQL 中会是这样的:

query MagicQuery {
FetchAllUser {
Friend(male: true,intimacy: high){
Like(dateRange: half-year){
...
}
}
}
}

这还只是两层嵌套,想想如果用 REST API 来做,得多麻烦?要么后端通过参数控制和复杂的逻辑判断(不太可能实际上),要么前端把所有数据拿回来后自己裁剪过滤处理。无论哪种都不是特别双赢的解决方案。

当然,你可能会说可以在前后端之间加个BFF(Backend For Frontend,可以理解为专为前端舒服实现的中间胶水层),去后端拿到数据,拼接成前端想要的样子...,不对,这样子好像只是把数据处理的逻辑换了个地方,实际的复杂度还在。那么为啥不用 GraphQL 来做这个 BFF 呢?尤其是在存在多个端消费方式不同,而 REST API 只有一个的情况下?

恭喜,你已经想到了 FaceBook 当年开发 GraphQL 时正面临着什么情景,如果你有兴趣了解更多,GitHub 和 YouTube 上可以找到许多关于它诞生历程的介绍的有趣故事。

再次回到本文的整体,弄清楚了 GraphQL 的 N+1 问题究竟是如何导致的以后,现在我们可以来复现例子,并着手解决它了。

实际示例

本文的所有代码均存放在GitHub,你可以在 DataLoader-Source-Explore 这个仓库中查看。为了节省本文的篇幅,在示例中复现 N+1 问题以及 DataLoader 的具体使用请参见仓库的 README 部分。

在这个示例中,我用 Apollo-Server 写了一个简单的 GraphQL Server,数据直接来自于本地。 如果你想建立一个完善的GraphQL Server, 推荐使用 TypeGraphQL 以及 Apollo-GraphQL 的开源项目(包括了前后端、网关、监控功能), 也可参考我很久之前写的这个大demo: GraphQL-Explorer-Server

注意,Apollo Server V3版本存在一些 Breaking Change,因此在上手示例时,不要升级版本。

针对于上面的 User 和 Pet,我们的GraphQL Schema 是这样的:

type Query {
  fetchAllUsers: [User]
  fetchUserByName(name: String!): User
}

type User {
  id: Int!
  name: String!
  partner: User
  pets: [Pet]
}

type Pet {
  id: Int!
  kind: String!
  age: Int!
  isMale: Boolean!
}

你会发现即使你现在处于零基础状态,也能大致看懂这个 Schema 代表了怎样的数据结构。

DataLoader 源码梳理

DataLoader 的仓库 README 讲述了它的来源与历史,不夸张的说,除了练手项目以及性能未出现不足的小规模项目以外,只要是正式使用的 GraphQL Server 几乎都有 DataLoader 的身影,但也有例外,比如使用了Hasura 与 PostGraphile 这一类直接接管数据库的方案.

DataLoader 原本是用 Flow(GraphQL也是,这是 FaceBook 出品的 JavaScript 类型工具,对标 TypeScript)书写的,这里为了阅读方便我把它修改成了TypeScript版本,见dataLoader.ts。

下面就正式进入源码阅读环节,DataLoader的核心功能主要有 BatchCache,在上一版文章中,我去掉了 Cache 相关的功能实现而只关注于 Batch 能力,但发现实际上这部分逻辑是非常重要的,Batch 的部分执行流程中也涉及到了 Cache 的处理。因此,在这一版文章中我不会再去掉哪一部分(但实际上简化后的 Cache 代码也就几十行)。

鲁迅曾经说过,阅读 TypeScript 代码最好的方式就是从类型声明开始。

DataLoader的构造函数签名是这样的:

class DataLoader<K,V,C = K> {
  constructor(batchLoadFn: BatchLoadFn<K,V>,options?: Options<K,V,C>) {
    // ...
  }
  //...
}

首先是batchLoadFn,这是我们传入的批处理函数。简单地说,就是一个能根据一组ID拿到一组对应数据的函数

看到这里你可能就已经明白 DataLoader 的解决思路了,全文完。或者在最开始我们讲到造成 GraphQL N+1 问题的根源你可能就明白了,只要收集一批需要查询的数据ID,在最后一起解析不就行了吗。这正是 DataLoader 背后的原理, 所以我们才需要在实例化时传入一个批查询函数。你可能会说,就这?但实际上 DataLoader 内部还做了许多工作,才实现 “最后一起解析” 看上去不起眼的功能,比如:

  • 怎么划定一批数据是需要一起处理的?
  • 怎么实现最后一起解析?
  • 怎么确保在批处理完毕后,每一次 loader.loadloader.loadMany能获取到对应的数据?
  • 如果有传入了重复 ID 的 load 调用,应该如何处理?
  • ...

实例化 DataLoader,比如这个是供 User 使用的:

// 实例化
const userLoader = new DataLoader(async (userIds: Readonly<number[]>) => {
  // 批处理函数的查询结果
  const users = await mockService.getUsersByIds(userIds as number[]);
  return users.sort(
    (prev, curr) => userIds.indexOf(prev.id) - userIds.indexOf(curr.id)
  );
// 实际使用
userLoader.load(1)
userLoader.load(2)  
userLoader.load(3)
// ...

注意点:

  • 在完成实例化后,DataLoader实例上主要有 load 和 loadMany这两个方法,对于这里的 User.partner(一对一关系),应当使用 load 方法,对于 User.pets(一对多),应当使用 loadMany 方法。
  • 确保入参和返回值的顺序是一一对应的,并且入参的数量需要和返回的结果数量相同。

除了批处理函数以外,还可以传入选项:

export type Options<K, V, C = K> = {
  batch?: boolean;
  maxBatchSize?: number;
  batchScheduleFn?: (callback: () => void) => void;
  cache?: boolean;
  cacheKeyFn?: (key: K) => C;
  cacheMap?: CacheMap<C, Promise<V>> | null;
};

分别解释下这些属性:

  • batch:是否启用 Batch 能力,这一选项使得你可以手动控制当前的 DataLoader 实例的功能。

  • maxBatchSize:在同一批(后文用 batch 代替,更接近源码意义)内最多同时处理数据的数量,默认使用 Infinity 值。

  • batchScheduleFn:负责调度 batch 的函数,你可以理解为这个函数决定了怎么确定 batch 的范围,即怎样认为这个 batch 是开始了,而怎样则说明这个 batch 结束,需要返回数据了。

  • cache:是否启用 Cache 能力,这里需要重点说明,这一属性并非指通常意义上的 cache,即并非 DataLoader 实例在 load、loadMany方法被调用时,缓存传入的 ID并在后续 ID 相同时直接返回。在上一版文章中有同学就提了这个问题,疑惑为什么配置启用cache,但后续调用 loader.load 方法时仍然请求数据。实际上这里的 cache 指的是,在同一 batch 内,如果 load 方法被传入同一 ID,后续传入的 ID 会被认为与之前的一致,直接返回相同的结果。在后续源码讲解还会有详细介绍。

  • cacheKeyFn:从 cache 的讲解中我们知道它的 cache 功能并不是平时我们理解的那样,但它仍然具有 缓存 的能力。而要实现缓存,就需要有对应的 key 来进行识别。这一函数就是用于转换你传入的 key(ID),来作为最终的缓存标识。如果不传入,则会直接使用 key。

  • cacheMap:缓存既然有 key 就肯定会有 value,只不过这里的 value 指的不是执行完毕的返回值,而是“待执行的值”。我们前面说过,收集一批ID,然后最后用批处理函数一次性读取完毕再对应的返回结果,你可以理解为这里的 cacheMap 就是 键:ID,值:待返回的结果  形式的对象关系。

    默认情况下,会使用一个原生的 Map。如果需要传入自己的 cacheMap,则需要实现 get / set / delete / clear 方法。

然后是内部属性,其实就是对实例化时传入的值的处理:

class DataLoader {
  _batchLoadFn: BatchLoadFn<K, V>;
  _batchScheduleFn: (fn: () => void) => void;
  _maxBatchSize: number;
  _cacheKeyFn: (key: K) => C;
  _cacheMap: CacheMap<C, Promise<V>> | null;
  _batch: Batch<K, V> | null;
}

一个个来看:

this._batchLoadFn = batchLoadFn;
this._batchScheduleFn = getValidBatchScheduleFn(options);

function getValidBatchScheduleFn(
  options?: Options<anyanyany>
): (fn: () => void) => void 
{
  let batchScheduleFn = options && options.batchScheduleFn;
  if (batchScheduleFn === undefined) {
    return enqueuePostPromiseJob;
  }
  return batchScheduleFn;
}

注意这里,如果没有传入自定义的调度函数,则会使用这个神奇的 enqueuePostPromiseJob!结合上面说到调度函数的作用:决定一个 batch 的起止。再来看看这个神奇的家伙:

let enqueuePostPromiseJob =
  typeof process === "object" && typeof process.nextTick === "function"
    ? function (fn: Function{
        if (!resolvedPromise) {
          resolvedPromise = Promise.resolve();
        }
        resolvedPromise.then(() => {
          process.nextTick(fn);
        });
      }
    : setImmediate || setTimeout;

let resolvedPromise: Promise<void>;

先梳理下流程,如果在 Node 环境下 (通过判断 process.nextTick),则使用一个特殊的函数(称为 executor),executor接受一个函数作为参数(称为 handler)。executor内部会确保 handler 在一个 Promise 的 then 方法块中执行(通过在全局缓存一个工具人 Promise,要是用过了(resolved掉了)就重新赋值为一个 resolve 的Promise)。即:

Promise.resolve().then(()=>{
  process.nextTick(fn)
})

如果不在 Node 环境下,则使用 setImmediate 或者 setTimeout。

  • 只有最新版的IE实现了setImmediate,Gecko 和 Webkit 内核都没有实现此API。

    图片
    mdn
  • 可以使用setTimeout(fn,0)模拟setImmediate,详见 setImmediate

  • 使用 setTimeout 作为调度函数相当于:

    setTimeout(fn)

你可能会疑惑,这里一通操作是为了啥?先看看这个名字:enqueuePostPromiseJob,光是从字面意义上可以确认,是为了确保一个操作能够在 Promise 以后执行,再联想到 DataLoader 的执行关键词:收集一批、最后处理... 我想你应该明白了:DataLoader 将会收集发生在同一帧内的 load / loadMany 调用,将这些收集到的 key存储下来,由批处理函数统一完成数据获取,通过使用 enqueuePostPromiseJob 来入队一个一定会在 Promise 任务(也就是常说的微任务)后执行的任务

简单的复习下事件循环的概念:当前执行栈空闲时,执行所有微任务(Promise)队列中的事件,然后执行一个宏任务(setTimeout / setImmediate)。由于微任务永远在宏任务之前执行,所以这里就通过入队一个宏任务来确保能收集到所有 load / loadMany 调用。

而在 NodeJS 中,其事件循环机制由 Libuv 引擎决定,关于其具体的事件循环顺序、执行机制我们这里不做讲解。我们主要关注 process.nextTick 这一方法,在 NodeJS 中 process.nextTick方法拥有自己专用的任务队列,就叫它 nextTick Queue 吧。在通常意义上的 NodeJS 事件循环中的各个阶段并不包括它,然而它的执行时机就在各个阶段之间,在 NodeJS 事件循环即将进入下一阶段前会检查 nextTick Queue 中是否有留存的任务,如果有,则会执行其中的所有任务。

你可以在 NodeJS 源码中看到许多使用 process.nextTick 来延迟任务执行的例子,如 addCatch 以及 setupListenHandle。对于后面一个例子,这里稍微做一点展开,考虑这样一个例子:

import net from "net";

const server = net.createServer(() => {}).listen(8080);

server.on("listening"() => {});

可以看到我们在注册 listening 事件时,server 已经创建完毕并绑定到端口,如果事件回调立刻执行,此时 callback 还未注册(发生在 listen 后),这样的效果不是我们预期的,因此这里使用了  process.nextTick  来确保事件在回调注册完毕之后才触发。

这一部分实际上就是 DataLoader 最核心的功能实现,也是最精华的部分,因此如果你赶时间,其实可以到这里了(不准,继续看完)。

maxBatchSize:

this._maxBatchSize = getValidMaxBatchSize(options);

function getValidMaxBatchSize(options?: Options<anyanyany>): number {
  let shouldBatch = !options || options.batch !== false;
  if (!shouldBatch) {
    return 1;
  }
  let maxBatchSize = options && options.maxBatchSize;
  if (maxBatchSize === undefined) {
    return Infinity;
  }
  return maxBatchSize;
}

做了简单的检查和赋默认值,注意这里如果设置 batch 为false,即关闭 batch 功能,则 maxBatchSize 会被置为1,也就意味着会在 load 被调用一次时立刻就调用批查询函数,而不会再等收集一批。

this._cacheKeyFn = getValidCacheKeyFn(options);
this._cacheMap = getValidCacheMap(options);

function getValidCacheKeyFn<KC>(options?: Options<K, any, C>): (arg: K) => C {
  let cacheKeyFn = options && options.cacheKeyFn;
  if (cacheKeyFn === undefined) {
    return (key) => key;
  }
  return cacheKeyFn;
}



function getValidCacheMap<KVC>(
  options?: Options<K, V, C>
): CacheMap<CPromise<V>> | null 
{
  let shouldCache = !options || options.cache !== false;
  if (!shouldCache) {
    return null;
  }
  let cacheMap = options && options.cacheMap;
  if (cacheMap === undefined) {
    return new Map();
  }
  if (cacheMap !== null) {
    let cacheFunctions = ["get""set""delete""clear"];
    let missingFunctions = cacheFunctions.filter(
      (fnName) => cacheMap && typeof cacheMap[fnName] !== "function"
    );
  }
  return cacheMap;
}

这里的就比较简单了,默认使用 (key) => key 作为缓存 key 的处理函数,以及默认使用 new Map() 作为缓存的存储。

我们注意到这里有个特别的实例属性,看看它的类型定义:

_batch: Batch<K,V> | null;

type Batch<K, V> = {
  hasDispatched: boolean;
  keys: Array<K>;
  callbacks: Array<{
    resolve: (value: V) => void;
    reject: (error: Error) => void;
  }>;
  cacheHits?: Array<() => void>;
};

关于它的属性,由于比较特殊无法独立讲解,请参看后面。不过你也猜猜他们都是啥意思,比如 hasDispatched 看起来像是标记当前的 batch 是否已派发、keys 看起来像是收集到的key...

整理一下上面的内容:

  • _batchLoadFn:直接使用传入的批查询函数。
  • _batchScheduleFn:一个一定会在 Promise 后执行的调度函数,NodeJS 下使用 process.nextTick,浏览器中使用 setImmediate  / setTimeout
  • _maxBatchSize:默认为无限大。如果禁用 Batch 功能,则为 1。
  • _cacheKeyFn:缓存 Key 的转换函数
  • _cacheMap:缓存的记录
  • _batch:详见下文

属性分析就到这里,看起来好像没有什么了?接下来我们就该直接去看调用 .load 方法时都发生了什么了。

load(key: K): Promise<V> {
 let batch = getCurrentBatch(this);
}
function getCurrentBatch<KV>(loader: DataLoader<K, V, any>): Batch<KV{
  let existingBatch = loader._batch;

  if (
    existingBatch !== null &&
    !existingBatch.hasDispatched &&
    existingBatch.keys.length < loader._maxBatchSize &&
    (!existingBatch.cacheHits ||
      existingBatch.cacheHits.length < loader._maxBatchSize)
  ) {
    return existingBatch;
  }

  let newBatch = { hasDispatched: false, keys: [], callbacks: [] };

  loader._batch = newBatch;

  loader._batchScheduleFn(() => {
    dispatchBatch(loader, newBatch);
  });

  return newBatch;
}

现在可以来讲讲 batch 到底是个啥了,看 getCurrentBatch 方法:

  • 先获取到当前的 batch
  • 如果满足这些条件,就使用当前的batch
    • 当前 batch 不为 null
    • 当前 batch 未派发
    • 当前已收集的 keys 未超过最大并发量
    • 不存在 cacheHits 或 存在的 cacheHits 未超过最大并发量,这是个啥?我们后面再讲
  • 否则,就很直接的创建一个新的 batch 返回,覆盖当前实例的 _batch 属性
  • 【重要】调用调度函数,入队一个 dispatchBatch(loader,newBatch) 函数
  • 关于 dispatchBatch,我们后面再讲。
  • 总结:getCurrentBatch 会返回当前的 batch,或创建一个新的 batch 返回。

剩下部分其实就可以放一起看了,直接看 load 方法的完整版:

load(key: K): Promise<V> {
 
    let batch = getCurrentBatch(this);

    let cacheMap = this._cacheMap;

    let cacheKey = this._cacheKeyFn(key);

    if (cacheMap) {
     
      let cachedPromise = cacheMap.get(cacheKey);

      if (cachedPromise) {
        let cacheHits = batch.cacheHits || (batch.cacheHits = []);
        return new Promise((resolve) => {
          cacheHits.push(() => {
            resolve(cachedPromise as V | PromiseLike<V>);
          });
        });
      }
    }

    batch.keys.push(key);
    const promise: Promise<V> = new Promise((resolve, reject) => {
      batch.callbacks.push({ resolve, reject });
    });


    if (cacheMap) {
      cacheMap.set(cacheKey, promise);
    }

    return promise;
  }

梳理下逻辑:

  • 获取到缓存的记录、以及当前 key 转换后值,查看这个转换后的值是否已经存在于记录中。
    • 如果还没?兴高采烈的将这个新的 key 存放进 batch.keys 中,同时实例化一个新的Promise,将它的 resolve、reject 参数存放进 batch.callbacks 中。最后将这个 key 缓存起来,然后 返回这个Promise
    • 如果已经有了,那也没事,同样实例化一个新的Promise,并将 resolve 存放在 batch.cacheHits 中,同样返回这个Promise

这里可能有点让人摸不着头脑:

  • 为什么要实例化一个 Promise,把 resolve、reject 存放为回调?
  • cacheHits又是用来干什么的?

结合 Batch 的类型定义来回答:

type Batch<K, V> = {
  hasDispatched: boolean;
  keys: Array<K>;
  callbacks: Array<{
    resolve: (value: V) => void;
    reject: (error: Error) => void;
  }>;
  cacheHits?: Array<() => void>;
};
  • 把 resolve、reject 存放为回调,还是因为我们最开始提的一个问题:怎么确保在批处理完毕后,每一次 loader.loadloader.loadMany能获取到对应的数据?,注意这里 load 方法返回的 Promise,如果我们在批查询函数返回结果后,去 callbacks 里拿到所有注册的 resolve,依次调用,就相当于 load 方法返回的 Promise 状态变为 resolved!所以就确保了实际使用调用 load 方法的地方能够获取到数据。

  • 注意我们在遇到一个新的 key 时,除了注册回调,还会将 Promise 注册到缓存记录中。后面如果再次遇到这个 key(也就是 load 方法被以相同的参数调用),就能够在缓存记录中查询到,此时返回的 Promise 的 resolve 方法由 cacheHits 接管,但是需要注意的是,这里传入的一个函数,在调用这个函数时才会 resolve 掉这个Promise,并且,resolve的参数是前面已经缓存的 Promise。

    具体逻辑还要看后面的解析。

我们再回到 getCurrentBatch 方法中的 dispatchBatch,它就是最后让这个 batch 执行的推动者:

function dispatchBatch<KV>(
  loader: DataLoader<K, V, any>,
  batch: Batch<K, V>
{
  batch.hasDispatched = true;

  if (batch.keys.length === 0) {
    resolveCacheHits(batch);
    return;
  }

  let batchPromise = loader._batchLoadFn(batch.keys);

  batchPromise
    .then((values) => {
      resolveCacheHits(batch);

      for (let i = 0; i < batch.callbacks.length; i++) {
        let value = values[i];
        if (value instanceof Error) {
          batch.callbacks[i].reject(value);
        } else {
          console.log(`${i} ${JSON.stringify(value)}`);
          batch.callbacks[i].resolve(value);
        }
      }
    })
    .catch((error) => {
      failedDispatch(loader, batch, error);
    });
}

function resolveCacheHits(batch: Batch<anyany>{
  if (batch.cacheHits) {
    for (let i = 0; i < batch.cacheHits.length; i++) {
      batch.cacheHits[i]();
    }
  }
}

  • 首先标记当前的 batch 为已派发完毕
  • 根据批处理函数与前面收集得到的keys,查询得到一个 batchPromise
  • 在这个 batchPromise 中,等待它返回后:
    • 首先调用 resolveCacheHits 方法,resolve 掉所有的 cacheHits,这里的 batch.cacheHits[i]() 也就意味着 load 方法返回的Promise将被置为 resolved。
    • 遍历 callbacks,依次根据请求情况调用 resolve / reject

试着走一遍流程:

  • 调用 load(1)  load(2)  load(3) load(1)
  • 在前三次每次调用中,batch的 callbacks 与 keys 都会被新增成员。
  • 第四次调用,由于 1 对应的值已经存在,因此走 cacheHits 逻辑,不会注册为新的回调。
  • 在 dispatchBatch 中,首先 调用 resolveCacheHits,这个方法实际上最终会等待第一次的 1 对应的 Promise resolved 之后,也就是说两次 1 的效果是一致的。
  • 使用 [1,2,3] 来一次性查询到所有数据,依次 resolve / reject。

到这里 load 方法就讲解完毕了,全文完

讲完了 load 方法,再来看看 loadMany 方法,上面的难关闯了下来,再来看这个就简单多了:

loadMany(keys: Readonly<Array<K>>): Promise<Array<V | Error>> {
    const loadPromises: Promise<any>[] = [];
    for (let i = 0; i < keys.length; i++) {
      loadPromises.push(this.load(keys[i]).catch((error) => error));
    }
    return Promise.all(loadPromises);
  }

load 方法一次只能 "load" 一个 key,为了满足需要一次性 load 多个 key 的需求,就有了 loadMany 方法。本质上二者并无不同,batch的创建、派发、缓存逻辑等都是一样的。唯一不同的是,在 loadMany 中,如果某个 key 对应的 Promise reject 了,也不会将整个 Promise.all reject 掉,而是将这个 reject 的 Promise 转变为错误实例。

DataLoader 实例的方法少得可怜,除了 load 和 loadMany 方法以外,就是几个缓存相关的方法,我们来一笔带过:

  • clear:根据一个 key 来清除缓存,由于在使用这个 key 进行查找时会先使用转换函数处理一次,因此需要传入原生的 key。
  • clearAll:清空所有的缓存记录。
  • prime:手动的添加一对缓存键值对到缓存记录中,这里还会判断这个传入的键值类型,如果是一个错误,则会将 值 转换为一个 使用这个错误 reject 的 Promise 示例。

好的,DataLoader的整体源码解析就到这里了,在完成阅读后,不妨再次根据 DataLoader-Source-Explore 这个仓库中的示例代码,来手动的走一遍 load - batch - dispatch 流程。

展望

在这个仓库中,除了 TypeScript 版本的 DataLoader 源码与示例代码以外,还包括:

  • 一个简化版本的,100+ 行代码的 DataLoader。大致讲述下它都做了那些精简:

    首先,最重要的,不再判断环境,直接使用 process.nextTick 作为批调度函数。其次,原版的代码中实际上是在一个 batch 被创建时开始派发的:

     let newBatch = { hasDispatched: false, keys: [], callbacks: [] };

    loader._batch = newBatch;

    loader._batchScheduleFn(() => {
      dispatchBatch(loader, newBatch);
    );

    这里我们在精简版中就不区分 新/旧 batch 了,直接在实例化,第一个 key 被添加进来时开始派发这个 batch。

在 Prisma 2 中,实际也包含了 DataLoader 的相关代码,这一点不难理解,毕竟 Prisma 最初就号称 GraphQL 的天作之合。源码中的实现见 runtime/DataLoader.ts。在我提供的示例仓库中有添加了注释的版本。

另外,如果你对 Prisma 感兴趣,不妨阅读这两篇系列文章:Prisma: 下一代 ORM,不仅仅是ORM:上篇、下篇,同样由我来带各位学习传说中 NodeJS 的下一代ORM Prisma,或者如果你已经迫不及待想上手体验,也可以查看 Prisma Example 这个仓库。

DataLoader的集成

如果你已经看过了示例代码,肯定会想到,如果是在实际生产环境或者是稍微有点追求,都不可能用这么简陋的方式来解决 N+1 问题,即手动注册每个嵌套关系 DataLoader 实例,因此社区存在一些尝试从不同方面简化使用的操作:

  • NestJS-DataLoader,NestJS 的 DataLoader 工具库,你仍然需要自己提供批查询函数,然后把这个 DataLoader 实例通过方法参数装饰器注入来实际使用,如:

    @Resolver(Account)
    export class AccountResolver {

        @Query(() => [Account])
        public getAccounts(
            @Args({ name: 'ids, type: () => [String] }) ids: string[],
            @Loader(AccountLoader.name) accountLoader: DataLoader<Account['
    id'], Account>): Promise<Account[]> {
            return accountLoader.loadMany(ids);
        }
    }
  • TypeGraphQL-DataLoader,仅适用于 TypeGraphQL 与 TypeORM 的 DataLoader 解决方案,和上面的不同,你不再需要手动提供批查询函数了。这一工具库会自动从 TypeORM 实体类获取被级联装饰器(如@OneToMany)修饰的属性以及关联的实体定义,并使用 TypeGraphQL 的中间件拦截掉解析过程,自己通过 TypeORM 的 API 去查询数据并返回。

    另外,这个工具库为了保持灵活性,也提供了手动注册 DataLoader 实例与批查询函数的方法。

  • Hasura Engine  / PostGraphile,严格地说,这两者和上面的几个方案不再是一个维度的存在了。作为GraphQL PaaS,使用它们意味着你不再需要编写自己的后端代码,通过对数据库的上层封装,它们直接屏蔽掉了普通的 GraphQL Server 可能存在的一系列问题,如 N+1 问题以及 Relay Mutation 相关的适配

尾声

DataLoader相关的知识就分享到这里,回顾一下,实际上最核心的思路还是enqueuePostPromiseJob,通过这种方式巧妙地将 一批单次的数据查询(GetSingleUserById) 转化为 一次批量的数据查询(GetBatchUsersByIds)

在最后我想有必要补充一点,DataLoader并不一定能提升你的 GraphQL API 响应速度,你可以通过ApolloServer的 tracing 选项来开启请求链路耗时追踪,以查看实际效果:

图片
image-20210130210834191

代码示例中并不涉及真正的数据库I/O,因此不能作为示例,此处仅做代示例。

在不使用DataLoader时,假设执行 N 次 GetSingleUserById,那么 I/O 会是这样的:

----->
----->
------>
---->

即多个耗时较小的 I/O 并行执行。

而执行 1次GetBatchUsersByIds,I/O可能会是这样的。

--------------->

即单个耗时较大的I/O执行。

所以,使用DataLoader并不一定能提升你的接口响应速度,只有在数据量级达到一定程度时,才有可能带来明显的提升,在数据量级较小时它反而可能带来反作用。


图片

往期推荐

图片


图片

20秒完成机器学习模型训练和部署?! 说说 Pipcook 2.0




继续滑动看下一个
大淘宝前端技术
向上滑动看下一个