cover_image

学术研究 | ComfyUI源码分析-NODE RUNTIME TRACE

kitezhu LitGate
2024年03月26日 11:40


ComfyUI是一个基于节点流程式的stable diffusion AI 绘图工具。它是一个强大的、模块化的 Stable Diffusion GUI,具有图形 / 节点界面。ComfyUI 本质上是一个创建工作流程的工具-一系列可用于生成图像的步骤。通过将稳定扩散的算法融入节点工作流,ComfyUI不仅在图像生成速度上超越传统A111 WebUI,而且显存占用更少。


ComfyUI在WebUI领域开创了全新的篇章,引领着AI绘画软件的新潮流。笔者本文目的是记录ComfyUI源码的学习笔记,如有差误欢迎指正。本章节的重点在于trace从node创建到接入workflow最终生成,后端的执行过程,非相关部分本章节不会赘述,后续会在其他章节详述部分核心node的内部实现。



本文转自kitezhu的文章《ComfyUI源码分析-NODE RUNTIME TRACE》,未经授权,禁止转载


ComfyUI-整体链路


首先从整体上看下ComfyUI的执行流程,下图简单示意了ComfyUI各模块之间的链路调度流程。先过一遍,有个大概的印象,后面会对模块进行展开描述。

图片


如上图所示,笔者将整个框架拆成了三个部分解读,web模块、服务启动模块、后端异步处理模块。为了方便理解,先说服务启动模块,服务启动主要经历几个部分,1)node注册,node来源包括系统内置node和第三方实现的node,最后都会注册到nodes.py下面的NODE_CLASS_MAPPINGS里,作为一个全局变量。同时,如果node有web代码,会统一管理到EXTENSION_WEB_DIRS。2)异步构建任务处理器prompt_worker,单独创建了一个线程处理任务,任务的接受处理是通过线程安全队列PromptQueue通信的。接受到任务后,会调用PromptExecutor执行一次任务,任务的执行是workflow维度的,会将任务拆解成多个pipeline逐个执行,单pipeline内部是深度递归执行的。3)服务正式启动,服务启动前会涉及到模型地址绑定、路由注册等,这里不做过多描述,真正服务起来后,执行核心调的是/prompt的post接口。再说下web模块,代码位于./web文件夹,启动自web/index.html,如果node有额外的web信息,会在启动时将相关web代码放到web/extensions下面。第一次载入workflow.json时,会调服务端的/object_info get接口,一次性获取所有node的信息,用于渲染workflow。真正执行时是点击Queue Prompt,触发服务端/prompt的post接口,将任务放到服务端的线程安全队列PromptQueue中,多次点击的话会触发任务排队,服务端在执行时会实时进度回调,所以在web端能看到任务的处理进度。最后说下异步执行执行模块,web请求数据是一个表示了web界面完整流程的workflow,可能是一个pipeline,也可能是多个pipeline,给到的是一个nodes列表,每个node信息里包含了类型信息和输入信息,通过每个节点的上游信息,解析真正的执行流(可能是多个),进而将流程串起来,深度递归执行过程中,为了加速生成,会记录已实例化对象object_storage,已经执行过的nodeoutputs,待执行的pipelineto_execute。所以该模块除了执行,还有个比较重要的部分是"cache"的动态处理。下文会对部分模块进行展开描述。





NODE的管理


1

创建NODE


如何创建可以参考example_node.py.example。例子在nodes.py有很多,这里不展开描述。只说一下一个最简node的必要元素。后面也会基于这些要素构建最简node,构建一个最简测试pipeline。

class CLIPTextEncode:    @classmethod    def INPUT_TYPES(s):        return {"required": {"text": ("STRING", {"multiline": True}), "clip": ("CLIP", )}}    RETURN_TYPES = ("CONDITIONING",)    FUNCTION = "encode"
CATEGORY = "conditioning"
def encode(self, clip, text): tokens = clip.tokenize(text) cond, pooled = clip.encode_from_tokens(tokens, return_pooled=True) return ([[cond, {"pooled_output": pooled}]], )


CLIPTextEncode为例。

  1. INPUT_TYPES是对输入信息的描述,其中required表示要填字段,text(类型STRING)是用户文本框输入({"multiline": True})、clip(类型CLIP)是上游传递,这里有个注意点,参数的命名要和真实的函数入参,比如def encode(self, clip, text)cliptext。更多输入的方式可以参考class KSamplernode。

  2. RETURN_TYPES是对输出的类型描述,这里的类型并不一定是要真实存在的类型,但是一定要和下游node的输入类型对齐,比如class KSampler依赖CLIPTextEncode的返回值CONDITIONING,则KSampler对应的输入positive类型也要文本对齐CONDITIONING。另外node的输出是tuple元素,每个位置要和RETURN_TYPES对齐。

  3. FUNCTION的存在是为了统一node调用,后面真实调用是直接node_obj.FUNCTION(...),所以这里要映射到你真正要执行的函数。

  4. CATEGORY是node归类标签,对应右键Add Node操作的多级标签。

  5. 自定义函数,实现自己的自定义函数,用FUNCTION关联起来,输入输出描述清楚,就是用了。

此外也会有一些扩展参数。比如:ComfyUI的链路校验相对较弱,如果node需要自己的特殊校验,可以实现自定义校验函数VALIDATE_INPUTS

2

注册NODE

上文也提到,nodes.py里面维护着一个全局变量NODE_CLASS_MAPPINGS,所有的node都会注册到这里。不过node的来源却各不相同。nodes的来源分为三个部分。

  • nodes.py [文件]
    该文件统一管理node的注册。这里维护了系统默认支持的基础node,比如:KSampler VAEDecode VAEEncode CLIPTextEncode LoadImage LoraLoader CLIPLoader UNETLoader等。同时维护了全局的NODE_CLASS_MAPPINGS列表,类名称到类定义的映射。该文件的node是默认配置到全局映射表,不需要特别的load。同时也管理着其他模块load的方法init_custom_nodes,后面其他的模块也会调该文件里的方法merge进来。

  • comfy_extras [文件夹]
    这里面comfyui官方维护了一些自定义节点。主要是sd生态中比较重要且已经验证的feature。
    比如 nodes_clip_sdxl nodes_hypernetwork等。
    每个文件都维护了和文件feature相关node的NODE_CLASS_MAPPINGS映射表。在启动的时候,会在nodes.py 中的 init_custom_nodes里面按文件载入。

def init_custom_nodes():    extras_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy_extras")    extras_files = [        "nodes_latent.py",        "nodes_hypernetwork.py",


几个注意点:


1. 从源码可以看出,如果需要在这里新增node文件,需要在源码里面显示改代码加上,才会生效。

2. 这里的load_custom_node入参ignore为空,即这里面的node优先级最高,如果和其他有重名,会覆盖其他的node。是一种覆盖写的方式。

for node_file in extras_files:        load_custom_node(os.path.join(extras_dir, node_file)) # 没有传ignore


3. 每个py文件要有自己的NODE_CLASS_MAPPINGS,漏掉也不会注册。本质上是对不同文件NODE_CLASS_MAPPINGS的merge。

# 比如:comfy_extras/nodes_tomesd.pyNODE_CLASS_MAPPINGS = {    "TomePatchModel": TomePatchModel,}


  • custom_nodes ['第三方插件'文件夹]

这里面存在很多非官方的业界的各种扩展插件能力,用户可以根据自己的需要自己定制或者使用一些社区的插件能力。


比如 ComfyUI-Manager comfyui-animatediff等。在启动的时候,会在nodes.py 中的 init_custom_nodes里面载入。但是和上面不一样的是,这里多数是按照文件夹(即插件级别)载入的。


def load_custom_nodes():    base_node_names = set(NODE_CLASS_MAPPINGS.keys())    node_paths = folder_paths.get_folder_paths("custom_nodes")    for custom_node_path in node_paths:        possible_modules = os.listdir(custom_node_path)        for possible_module in possible_modules:            module_path = os.path.join(custom_node_path, possible_module)            if os.path.isfile(module_path) and os.path.splitext(module_path)[1] != ".py": continue            if module_path.endswith(".disabled"): continue            success = load_custom_node(module_path, base_node_names) # ignore=base_node_names


几个注意点:


  1. 如果插件不想被载入,则可以将文件夹的名字以.disabled结尾。

  2. 这里的load_custom_node入参ignore为已有的全集,即这里面的node如果和已有node重名,则会跳过。

  3. 如果插件node是文件夹的话,文件夹根目录要有一个__init__.py文件,并且将NODE_CLASS_MAPPINGS 引入。


3

常见NODE







官方有对核心node的功能及使用做说明,详见CoreNodes。 非官方的就推荐一个ComfyUI-Manager,直接clone下来放到custom_nodes文件夹就行,可以方便的管理插件和模型,也可以一键下载workflow miss插件。

图片

其他插件自行探索吧,不是本文重点。


NODE 运行时



本节是本章重点,为了方便理解,笔者从官方demo(https://comfyanonymous.github.io/ComfyUI_examples/)中拿一个base版本的t2i(具体workflow,详见附件),同步trace下node真正的执行流程。

图片

workflow创建好后,每个node在画布中都有了唯一的id,并且每个node的输入信息、链接信息都已经有了。为了方便起见,后续会直接沿用这里的唯一id,先陈列出来,如下,至于其他信息可以详见json文件。

unique_id   class_type7           CLIPTextEncode5           EmptyLatentImage3           KSampler8           VAEDecode4           CheckpointLoaderSimple10          PreviewImage6           CLIPTextEncode

从中我们可以看到,即使是两个同类型的node,比如CLIPTextEncode,编号也是不一样的。为了后续方便理解,这里笔者先定义一个概念,结果节点即上述workflow的输出节点,可能存在多个,每个都代表一个piepeline,比如上图中的PreviewImage,就是笔者定义的结果节点。上文中对链路流程已经做了大致介绍,就不再从头说起,直接从PromptExecutor开始trace。


1

NODE执行列表


核心函数:PromptExecutor.execute。入参简化如下:

结果节点: [10]node列表:{[3, info3], [4, info4], [5, info5], [6, info6], [7, info7], [8, info8], [9, info9], [10, info10]}
补充解释下,结果节点存储在execute_outputs字段,可以是多个, 源码中后面与to_execute字段有关系。node列表是无序字典,存储在prompt字段里(吐槽下,源码里prompt定义很容易产生误解...)里。首先将无序的node列表,拆分不同的pipeline,制定执行顺序(目前是以pipeline涉及node长度来排),目前的demo只有一个结果节点(即只有一个pipeline),不涉及pipeline间排序。首先是将无序的node串起来,确定执行node的顺序和长度,涉及核心函数recursive_will_execute。这是一个深度遍历的过程。

def recursive_will_execute(prompt, outputs, current_item):    """    递归获取将要执行的节点列表    """    # 获取当前节点的唯一标识符    unique_id = current_item    # 获取当前节点的输入数据    inputs = prompt[unique_id]['inputs']     # 存储将要执行的节点列表    will_execute = []    # 如果当前节点已经在输出中,则直接返回空列表    if unique_id in outputs:        return []
# 遍历当前节点的输入 for x in inputs: input_data = inputs[x] # 如果输入数据是一个列表 if isinstance(input_data, list): # 获取输入数据的唯一标识符和输出索引 input_unique_id = input_data[0] output_index = input_data[1] # 如果输入数据的唯一标识符不在输出中,则递归获取将要执行的节点列表 if input_unique_id not in outputs: will_execute += recursive_will_execute(prompt, outputs, input_unique_id)
# 返回将要执行的节点列表,包括当前节点的唯一标识符 return will_execute + [unique_id]


图片


针对上图,先解释一些基本信息,上图中的m_n表示id为m的node第n个出口的node,每个node的详情里面包含了输入依赖node的信息。比如workflow 中的PreviewImage节点(10node)依赖VAEDecode节点的IMAGE数据(8node的0输出位),即上图看到的10依赖8_0。从结果节点开始,通过节点内的上游依赖信息,深度递归遍历,最终trace到整个链路所依赖的node以及执行顺序。如果有多个结果节点,每个结果节点都会进行一次深度遍历的trace过程,最终根据执行路径长度决定先执行哪个pipeline(这里的pipeline是指上图所示的结果节点执行顺序)。


2

NODE列表执行


真正的pipeline node执行涉及的核心函数是recursive_execute函数,和上面提到的recursive_will_execute函数类似,也是一个深度优先的递归,这里不再赘述,不同点在于这里是真正的执行。

图片

在执行过程中,已经执行过的node会直接跳过,避免多次执行,比如上图中执行链路长度是10,node 4重复多次,会被跳过,所以真正的执行长度是7。下面展开介绍下单个node的具体执行流程。

1)构造node输入数据,核心的函数是get_input_data 。

def get_input_data(inputs, class_def, unique_id, outputs={}, prompt={}, extra_data={}):    """    该函数的主要功能是根据输入参数 inputs、class_def、unique_id, 以及可选参数 outputs、prompt、extra_data, 生成一个包含所有输入数据的字典 input_data_all。    """    valid_inputs = class_def.INPUT_TYPES()    input_data_all = {}    for x in inputs:        # 一部分数据来自输出        input_data = inputs[x]        if isinstance(input_data, list):            input_unique_id = input_data[0]            output_index = input_data[1]            if input_unique_id not in outputs:                input_data_all[x] = (None,)                continue            obj = outputs[input_unique_id][output_index]            input_data_all[x] = obj        else:            # 一部分数据来自输入            if ("required" in valid_inputs and x in valid_inputs["required"]) or ("optional" in valid_inputs and x in valid_inputs["optional"]):                input_data_all[x] = [input_data]
根据数据的输入场景,笔者将node分为两种:叶子节点,不依赖其他节点输入。节点的用户输入信息已经在workflow.json里面随请求传递下来。比如node 5(EmptyLatentImage) 中的width、height、batch_size,直接获取就可以:input_data_all["width"] = [inputs["width"]]非叶子节点,即可能有用户输入,又依赖其他节点输入。如果依赖其他节点输入,需要先从inputs获取依赖节点的input_unique_idoutput_index,再依据节点位置信息从outputs获取node结果。比如workflow 中的PreviewImage节点(10node)的images依赖VAEDecode节点的IMAGE数据(8node的0输出位),即input_data_all["images"] = outputs[8][0]

2)node调用 首先是node实例化,优先从object_storage复用,key是(unique_id, class_type),如果cache没有,则从NODE_CLASS_MAPPINGS获取具体对应类型,实力化对象:obj = NODE_CLASS_MAPPINGS[class_type]()。然后会调用obj.FUNCTION,真正执行。

# get a slice of inputs, repeat last input when list isn't long enough    def slice_dict(d, i):        d_new = dict()        for k,v in d.items():            d_new[k] = v[i if len(v) > i else -1]        return d_new        results = []    if input_is_list:        if allow_interrupt:            nodes.before_node_execution()        results.append(getattr(obj, func)(**input_data_all))    elif max_len_input == 0:        if allow_interrupt:            nodes.before_node_execution()        results.append(getattr(obj, func)())    else:        for i in range(max_len_input):            if allow_interrupt:                nodes.before_node_execution()            results.append(getattr(obj, func)(**slice_dict(input_data_all, i)))    return results

这里需要提一下,为了适配不同node的自定义输入,一般会将输入信息放入字典,用 * 运算符展开,自动匹配入参。所以在自定义node时,需要将INPUT_TYPES中的命名与obj.FUNCTION真实命名对齐(吐槽下,python没有强的类型校验...你敢传,它就敢接...)。

3)node结果统一管理 node的输出主要的注意点时要将位置对应上,然后结果会存到outputs里面,需要的时候从outputs。下面笔者以node 4(CheckpointLoaderSimple,输出MODEL、CLIP、VAE)为例,讲述输出的映射关系。obj.FUNCTION的输出一般是一个tuple结构,每个tuple元素代表一个输出的参数。即class CheckpointLoaderSimple的输出:

return (model_patcher, clip, vae)

为了更好的参数扩展,上层又将结果用list包装了一下。

output_data = [[model_patcher],[clip],[vae]]

然后结果会缓存到outputs。

outputs[4] = output_data

最后下游节点要使用的话,直接读取,比如node 6(CLIPTextEncode)要使用clip,则:

clip = outputs[4][1]

顺便提一句,每个node都会有一个RETURN_TYPES,比如class CheckpointLoaderSimple:

RETURN_TYPES = ("MODEL", "CLIP", "VAE")


这个主要是用于两个链接链接点之间的类型名称校验,并不是强的类型校验。主要还是依靠运行过程中的抛异常。

3

NODE链路优化

为了减少不必要的执行和模型加载,ComfyUI在多个维度进行了优化。

  • 显存优化,Dynamic-Offload。
    ComfyUI有个./comfy/model_management.py模块,维护着一个current_loaded_models全局变量,用于维护搬迁到显存的模型,基于LRU的策略缓存显存中的模型。每次任务执行前会调用comfy.model_management.cleanup_models(),查询current_loaded_models中显存模型的引用计数<= 2则清理掉。使用过程中需要模型在显存中使用时,先从current_loaded_models显存cache中获取,1)在缓存,则该模型头插到缓存。2)不在,则load到显存,并头插到current_loaded_models。3)当显存剩余大小不足与分配新的模型时,则从current_loaded_models尾部队列释放最近最久未使用的模型显存。

  • 执行链路优化
    1)单次执行链路内优化

图片

上文已经提到,在执行过程中,已经执行过的node会直接跳过,避免多次执行。还有一些场景没有覆盖到。比如多次不同任务间的情况。

2)多次执行链路见优化。
假如用户第一次执行workflow后,后台会保存每个node的生成结果outputs。还是以上面的workflow为例,假如修改了node 3(KSampler)的输入seed。则当用户第二次请求时,node 3的前置执行节点时不需要变动的,后续节点需要变动。于是:第二次的执行流程就可以精简如下:

图片


第一次用户请求需要执行7个node,第二次只需要执行3个node。怎么判断node是否改动呢?有两种场景,一个是node本身的请求带了is_changed标记;另一个是服务端本身记录了上一次执行的输入信息old_prompt,会判断输入信息是否有变动,inputs == old_prompt[unique_id]['inputs']

3)outputsobject_storage清理。
每次请求处理前都会对outputs进行瘦身。会及时清理掉不在当前请求node list中的cache,即保证outputs是当前请求node列表的子集。object_storage类似,维护的是node已实例化的对象池,每次请求前也会瘦身,保证是当前node列表子集。

4)workflow校验。
校验的核心函数是def validate_prompt(prompt):,要满足以下要求。


1.每个node的类型都要已注册(在`NODE_CLASS_MAPPINGS`中可查)。2.每个请求的workflow至少要有一个输出节点(node的定义含有`OUTPUT_NODE`属性并且`node.OUTPUT_NODE == True`)。3.判断node输入定义`INPUT_TYPES`中的必填字段(`required`相关字段)有没有在输入里面。4.判断依赖node链接是否正常。首先判断依赖“坐标”(前面提到的`m_n`,比如10依赖8_0)node是否真实存在、类型是否正确,然后本节点的输入类型和依赖node的对应位输出类型(`RETURN_TYPES[n]`)是否一致。5.基础类型校验。如果是`INT``FLOAT``STRING`这些基础类型,会进行类型强转判断,如果有设置`min``max`也会判断最小值和最大值范围。6.自定义输入校验。ComfyUI的链路校验相对较弱,如果node需要自己的特殊校验,可以实现自定义校验函数`VALIDATE_INPUTS`


4

T2I TRACE


上面是以node视角讲述了node的内部运转机制,现在以业务视角为例(这里还是复用上文的workflow),看一下最基本的T2I,在ComfyUI中都做了啥。

图片


上图来自Stable Diffusion论文,大致分为三块,构造Conditioning(text + image),基于vae图片Pixel空间与Latent空间的转换,以及基于unet迭代去噪。


SD 模型载入


核心入口函数CheckpointLoaderSimple.load_checkpoint,根据ckpt类型选择载入方式(safetensors.torch.load_filetorch.load)获取sd模型权重state_dict,默认使用半精度(保证较小的显存占有和较快的推理速度)。从comfy/model_detection.py中动态获取unet配置(detect_unet_config)、模型配置(model_config_from_unet_config),comfy/supported_models.py维护着支持的sd模型及相关配置。
models = [SD15, SD20, SD21UnclipL, SD21UnclipH, SDXLRefiner, SDXL, SSD1B]
  • unet
    这里是沿用ldm(comfy/ldm/modules/diffusionmodules/openaimodel.py)中unet的定义(UNetModel)。
    使用comfy/model_base.py 中的class BaseModel进行了一次再封装(unet=self.diffusion_model),unet模型权重为上文提到的sd模型权重state_dictkey前缀为"model.diffusion_model."的部分。

self.diffusion_model = UNetModel(**unet_config, device=device) def load_model_weights(self, sd, unet_prefix="model.diffusion_model."):        to_load = {}        keys = list(sd.keys())        for k in keys:            if k.startswith(unet_prefix):                to_load[k[len(unet_prefix):]] = sd.pop(k)
m, u = self.diffusion_model.load_state_dict(to_load, strict=False)

unet模型默认是load到cpu内存。


  • vae
    这里是沿用ldm中(comfy/ldm/models/autoencoder.py)vae的定义(AutoencoderKL)。使用comfy/sd.py 中的class VAE进行了一次再封装(vae=self.first_stage_model),unet模型权重为上文提到的sd模型权重state_dictkey前缀为"first_stage_model."的部分。vae权重根据不同的来源做了兼容适配。

vae_sd = comfy.utils.state_dict_prefix_replace(sd, {"first_stage_model.": ""}, filter_keys=True)vae_sd = model_config.process_vae_state_dict(vae_sd)vae = VAE(sd=vae_sd)...self.first_stage_model = AutoencoderKL(ddconfig=ddconfig, embed_dim=4)m, u = self.first_stage_model.load_state_dict(sd, strict=False)

vae模型默认是load到cpu内存。

  • clip
    每个sd版本的clip差异较大,ComfyUI针对不同版本封装了不同的实现,这里是以笔者使用的SD15comfy/supported_models.py)为例。

def clip_target(self):        return supported_models_base.ClipTarget(sd1_clip.SD1Tokenizer, sd1_clip.SD1ClipModel)
  • 使用的是comfy/sd1_clip.py中的SD1Tokenizer(背后是对transformers.models.clip.tokenization_clip.CLIPTokenizer的封装)、SD1ClipModel(背后是对transformers.models.clip.modeling_clip.CLIPTextModel的封装)。
    clip权重则是sd模型权重state_dict搬迁走unet、vae之后剩余的权重。clip模型默认是load到cpu内存。

  • model_patcher
    这里单独提一下ComfyUIcomfy/model_patcher.py中封装的class ModelPatcher。这个类的主要功能是对 PyTorch 模型进行补丁操作,包括添加补丁、计算新的权重、撤销补丁等。提供了一种灵活的方式来修改 PyTorch 模型的权重。
    上述的unet、vae、clip也有用model_patcher包一层,用于部分node灵活调整权重。


CLIP 构造Conditioning

1)text positive and negative 正向词和负向词处理流程一样,这里以正向词为例:

text='beautiful girl smile'。# 1 分词tokens = clip.tokenize(text) # text_model的输入token是77个(算上开始和结束符),超过截断,不够‘补0’。# 即:[<start>,<beautiful>,<girl>,<smile>,<end>,...,<end>]共`77`个,下面的1.0表示词权。# tokens = [(49406,1.0),(1215,1.0)(1611,1.0)(3490,1.0)(49407,1.0)...(49407,1.0)]# 2 load model to gpu# 3 text model encoder,获取构造onditioningcond, pooled = clip.encode_from_tokens(tokens, return_pooled=True)# 每个token生成大小为768的embedding,所以cond的shape是`torch.Size([1, 77, 768])`。


2)构造latent image 如果是I2I场景,这里是要根据真实输入图片构造latent image,即使用vae 将图片压缩到latent空间。当前demo是T2I,不需要输入图片,所以就直接根据输入长宽构造全0的latent image。

# 即长宽减少8倍,通道扩成4.# 这里有个点需要注意,图片的长宽最好是8的倍数。latent = torch.zeros([batch_size, 4, height // 8, width // 8])


UNET 迭代去噪

图片

采样器的调度过程比较复杂,为了便于理解,我们以入参的视角去分析每个参数的作用,以及在调用栈的位置。为了统一描述,上图中的调用栈都加上了编号。

  • model
    这里的model是指的unet部分,在上图调用栈3处load到显存中。真正的触发模型调用是在上图的蓝色部分(调用栈14),会触发多次,每调一次就是一次去噪过程。

  • positive、negative
    positive、negative一个代表正向次的embeddeding,一个代表负向词的embedding。在链路传递中主要是格式的改变,最终cat到一起,作为conditioning传递给了unet。结合调用栈,具体trace过程如下:

旧格式:positive: list (1) [pub]| [0]: list (2) [pub]| | [0]: Tensor (1)| | [1]: dict (1) [pub]| | | ['pooled_output']: Tensor (1)---------# 调用栈3 格式转换,`model_conds["c_crossattn"] = comfy.conds.CONDCrossAttn(c[0])`positive = convert_cond(positive)negative = convert_cond(negative)--------新格式:positive: list (1) [pub]| [0]: dict (2) [pub]| | ['pooled_output']: Tensor (1)| | ['model_conds']: dict (1) [pub]| | | ['c_crossattn']: CONDCrossAttn
---------# 调用栈11 格式转换,positive和negative的c_crossattn做cond_cat, `c = cond_cat(c)`---------新格式:c: dict (2) [pub]| ['c_crossattn']: Tensor (2) [pub]| | [0]: Tensor (77)| | [1]: Tensor (77)最终unet收到的是:context=c['c_crossattn']
  • latent_image
    将被去噪的潜在特征。latent_image的shape影响noise,I2I场景影响unet noise 输入,mask场景影响unet输出结果

# 调用栈2 在T2I场景,latent_image的shape会影响noise的生成。noise = comfy.sample.prepare_noise(latent_image, seed, batch_inds)
# 调用栈5,`latent_image = model.process_latent_in(latent_image)`。latent_image乘系数,该步对T2I没影响,latent_image本身就是0。`return latent * self.scale_factor`。
# 调用栈6,`noise += latent_image`, 噪声叠加latent_image信息,对I2I有影响,对T2I没影响。
# 调用栈8,I2I mask场景,影响输出。if denoise_mask is not None: out += self.latent_image * latent_mask
  • seed、control_after_generate
    control_after_generate提供在每次提示后更改上述种子编号的能力。节点可以randomize、 incrementdecrement或保留种子fixedseed是用于创建噪声的随机种子,固定seed可以复现结果。

# 调用栈2,seed影响noise创建。`generator = torch.manual_seed(seed)`
  • steps、denoise、scheduler
    我将stepsdenoisescheduler放在一起讲,是因为这三个参数本质上都是在影响sigmas,sigma可以理解为噪声强度,打印一个线上的数据:


[14.6146, 11.9175,  9.8142,  8.1584,  6.8430,  5.7885,  4.9356,  4.2397,  3.6669,  3.1913,  2.7931,  2.4569,  2.1705,  1.9246,  1.7116,  1.5257,  1.3619,  1.2166,  1.0865,  0.9691,  0.8622,  0.7640,  0.6730,  0.5877,  0.5067,  0.4286,  0.3515,  0.2722,  0.1835,  0.0292,  0.0000]


个数就是步长,数值由高到低。
steps影响的是迭代次数。
scheduler影响的是数值分布。
denoise中间结果影响迭代次数,从最终结果中取最后的(steps + 1) 个。看下下面代码应该就一目了然了。

# 调用栈3,# denoise 会影响 stepdef set_steps(self, steps, denoise=None):        self.steps = steps        if denoise is None or denoise > 0.9999:            self.sigmas = self.calculate_sigmas(steps).to(self.device)        else:            new_steps = int(steps/denoise)            sigmas = self.calculate_sigmas(new_steps).to(self.device)            self.sigmas = sigmas[-(steps + 1):]
# scheduler_name steps会影响sigmasdef calculate_sigmas_scheduler(model, scheduler_name, steps): if scheduler_name == "karras": sigmas = k_diffusion_sampling.get_sigmas_karras(n=steps, sigma_min=float(model.model_sampling.sigma_min), sigma_max=float(model.model_sampling.sigma_max)) elif scheduler_name == "exponential": sigmas = k_diffusion_sampling.get_sigmas_exponential(n=steps, sigma_min=float(model.model_sampling.sigma_min), sigma_max=float(model.model_sampling.sigma_max)) elif scheduler_name == "normal": sigmas = normal_scheduler(model, steps) elif scheduler_name == "simple": sigmas = simple_scheduler(model, steps) elif scheduler_name == "ddim_uniform": sigmas = ddim_scheduler(model, steps) elif scheduler_name == "sgm_uniform": sigmas = normal_scheduler(model, steps, sgm=True) else: print("error invalid scheduler", self.scheduler) return sigmas

  • cfg
    cfg可以理解为用户输入prompt的强度。从下面代码可以看到,cond_scale越大,那cond结果影响就越大。

# 调用栈11,这里要注意,这里的代码是拿到unet处理结果的后置处理。# 这里的 cond_scale=cfgreturn uncond + (cond - uncond) * cond_scale
  • sampler_name
    采样器核心的采样算法。整个去躁过程主要是采样器在控制,位于上面调用栈的橙色部分。ComfyUI目前支持的采样器。

KSAMPLER_NAMES = ["euler", "euler_ancestral", "heun", "heunpp2","dpm_2", "dpm_2_ancestral",                  "lms", "dpm_fast", "dpm_adaptive", "dpmpp_2s_ancestral", "dpmpp_sde", "dpmpp_sde_gpu",                  "dpmpp_2m", "dpmpp_2m_sde", "dpmpp_2m_sde_gpu", "dpmpp_3m_sde", "dpmpp_3m_sde_gpu", "ddpm", "lcm"]


上面调用栈之所以很复杂,就是中间为了适配各种采样器,加了很多封装,现在可以回过再看一遍调度过程,分析下作者封装的思路。

图片


采样器的封装,采样器有基于k_diffusion的,也有非k_diffusion的。所以需要一个基类class Sampler统一起来;k_diffusion的采样器是一堆函数,需要class KSAMPLER(Sampler)统一管理起来;类管理起来后还缺少上下文和数据的处理,需要class KSampler将类和数据关联起来。


VAE latent转换图片

过完KSampler拿到的是图片latents,需要通过vae decoder将latents转换为图片。直接调VAE model就行,入参只需要图片latents。
return (vae.decode(samples["samples"]), )
vae问题是显存占用过大,大图场景可能要考虑显卡的动态优化。如果对效果没有那么高要求的话,也可以使用taesd,比如上面的KSampler,每迭代一步都会回调一个sample latents,ComfyUI内置的就是用taesd反解预览。vae转换完之后就可以预览或者保存了,ComfyUI已经内置了相关能力。


与A111 WebUI插件机制的对比


笔者之前也trace过A111 WebUI。简要介绍下 WebUI插件执行流程。WebUI本身是有一个主线流程在的(T2I 或 I2I)。如下:

#a111 v1.5.0 webui梳理核心的python文件:processing.py   - 流程处理:   process_images_inner   https://github.com/AUTOMATIC1111/stable-diffusion-webui/blob/v1.5.0/modules/processing.py#L673   https://github.com/AUTOMATIC1111/stable-diffusion-webui/blob/v1.5.0/modules/processing.py#L689   - 核心的两个类:             - StableDiffusionProcessingTxt2Img       https://github.com/AUTOMATIC1111/stable-diffusion-webui/blob/v1.5.0/modules/processing.py#L922       核心的函数处理:def sample(       https://github.com/AUTOMATIC1111/stable-diffusion-webui/blob/v1.5.0/modules/processing.py#L1034             - StableDiffusionProcessingImg2Img                https://github.com/AUTOMATIC1111/stable-diffusion-webui/blob/v1.5.0/modules/processing.py#L1210                核心的函数处理:def sample(                https://github.com/AUTOMATIC1111/stable-diffusion-webui/blob/v1.5.0/modules/processing.py#L1363核心的python文件:sd_samplers_kdiffusion.py,采样器及步数迭代       - 采样器选择:https://github.com/AUTOMATIC1111/stable-diffusion-webui/blob/v1.5.0/modules/sd_samplers_kdiffusion.py#L13       - 处理 text2image : https://github.com/AUTOMATIC1111/stable-diffusion-webui/blob/v1.5.0/modules/sd_samplers_kdiffusion.py#L441       - 处理 image2image : https://github.com/AUTOMATIC1111/stable-diffusion-webui/blob/v1.5.0/modules/sd_samplers_kdiffusion.py#L397

WebUI插件的思想是在主链路(T2I或I2I)中,预留几个插件接入点:

  • p.scripts.before_process(p)https://github.com/AUTOMATIC1111/stable-diffusion-webui/blob/v1.5.0/modules/processing.py#L652

  • p.scripts.process(p)https://github.com/AUTOMATIC1111/stable-diffusion-webui/blob/v1.5.0/modules/processing.py#L726

  • p.scripts.before_process_batch(...)https://github.com/AUTOMATIC1111/stable-diffusion-webui/blob/v1.5.0/modules/processing.py#L759

  • p.scripts.process_batch(...)https://github.com/AUTOMATIC1111/stable-diffusion-webui/blob/v1.5.0/modules/processing.py#L771

  • p.scripts.postprocess_batch(...)https://github.com/AUTOMATIC1111/stable-diffusion-webui/blob/v1.5.0/modules/processing.py#L807

  • p.scripts.postprocess_imagehttps://github.com/AUTOMATIC1111/stable-diffusion-webui/blob/v1.5.0/modules/processing.py#L828

  • p.scripts.postprocesshttps://github.com/AUTOMATIC1111/stable-diffusion-webui/blob/v1.5.0/modules/processing.py#L905

从上面名称也可以看出,是在处理过程中的各个阶段埋了一些点,插件只能在这些位置触发,很受限;另外就是每个埋单触发时,如果有多个插件,会按照固定的顺序顺序执行,有没有相互影响,也很难确定。此外插件开发者为了打破这种限制,在代码里充斥着各种hook,走着走着可能就跳走了,整体流程不可控,比较黑盒。相对来讲,ComfyUI就显得简洁清晰,通过workflow就可以大致的了解整个流程了。此外不管是性能上还是可复现性方面,ComfyUI也相对更优一点。


一个栗子

做一个例子来结束本文吧。想来想起,做个最简单的加法器吧,"课间十分钟"就可以体验下。

# -*- coding: utf-8 -*-
class Input: @classmethod def INPUT_TYPES(s): return {"required": {"number": ("FLOAT", {"multiline": True})}} RETURN_TYPES = ("CalcFLOAT",) FUNCTION = "input" CATEGORY = "calc"
def input(self, number): return (number, ) class Add: @classmethod def INPUT_TYPES(s): return {"required": {"number1": ("CalcFLOAT",), "number2": ("CalcFLOAT",)}} RETURN_TYPES = ("CalcSTR",) FUNCTION = "add" CATEGORY = "calc"
def add(self, number1, number2): val = str(number1+number2) return (val, )
NODE_CLASS_MAPPINGS = { "Input": Input, "Add": Add,}

图片

结语


ComfyUI一直在迭代,且没有打tag。不保证文中内容不会过期,为了不必要的对齐,这里给下笔者发文时用的commit id:777f6b15225197898a5f49742682a2be859072d7。本文主要是从node为入手点,分析ComfyUI在后端的执行流程。后续会从模型视角出发,trace lora、controlnet等SD中的重要组件,是如何对模型动态调整权重、调整计算,未完待续。。。

关于LitGate

大家好,我是LitGate,一个专注于AI创作的游戏社区。我们的新版官网已经上线✨你可以在里面找到各种AI创作的实操案例,以及已经沉淀的AI游戏创意demo,相信一定能让你大开眼界!


我们还有一个讨论群📣,如果你对AI创作感兴趣,或者有什么问题想要咨询,欢迎加入我们的讨论群,和大家一起交流学习!(PS:目前群内人数较多,为了有一个优质的讨论环境,请各位添加社区管理员企业微信账号邀请入群


更多精彩活动和功能筹备上线中,敬请期待~


关注我们,一起探索AI创作的无限可能吧!


新版官网地址:www.litgate.ai

图片

继续滑动看下一个
LitGate
向上滑动看下一个