areal-project/AReaL

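Guides you through adding support for a new HuggingFace model architecture to the Archon training engine. Covers the full process of analyzing the model config and source code, choosing a reference implementation, and adapting the code; applies when adding model families such as Llama.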

25 skills 5,474

Install All Skills

npx skills add areal-project/AReaL --all -g -y
More Options

List skills in collection

npx skills add areal-project/AReaL --list

Skills in Collection (25)

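Guides you through adding support for a new HuggingFace model architecture to the Archon training engine. Covers the full process of analyzing the model config and source code, choosing a reference implementation, and adapting the code; applies when adding model families such as Llama.
Triggers: the user asks how to add a new model to Archon; the user wants to support a new model family (e.g., Llama, Mistral) in ArchonEngine; the user mentions adding a new ModelSpec or model type for Archon.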
.agents/skills/add-archon-model/SKILL.md
npx skills add areal-project/AReaL --skill add-archon-model -g -y
SKILL.md
Frontmatter
{
    "name": "add-archon-model",
    "description": "Guide for adding a new model to the Archon engine. Use when user wants to add support for a new HuggingFace model architecture in ArchonEngine."
}

Add Archon Model

Add support for a new HuggingFace model architecture in the Archon training engine.

When to Use

This skill is triggered when:

  • User asks "how do I add a model to Archon?"
  • User wants to support a new model family (e.g., Llama, Mistral, DeepSeek) in ArchonEngine
  • User mentions adding a new ModelSpec or model type for Archon

Prerequisites

Before starting, ensure:

  • The target model is available on HuggingFace (has config.json with model_type)
  • You know the HuggingFace model ID (e.g., meta-llama/Meta-Llama-3-8B)
  • The model uses a standard transformer architecture (decoder-only)

Step-by-Step Guide

Step 1: Analyze the Target Model Architecture

Read the HuggingFace model's source code to extract key architecture information.

Action: Fetch and analyze the model's HuggingFace configuration and modeling files.

  1. Read the model's config.json (via AutoConfig.from_pretrained) to identify:

    • model_type string (this is the key used for registry lookup)
    • All architecture hyperparameters (hidden_size, num_layers, etc.)
    • Any model-specific fields (e.g., qk_norm, attention_bias, MoE fields)
  2. Read the HuggingFace modeling_*.py source to identify:

    • Attention variant: Does it have Q/K norm? Attention bias? Sliding window? Multi-latent attention?
    • FFN variant: SwiGLU (gate_proj + up_proj + down_proj)? GeGLU? Standard MLP?
    • MoE support: Does it have MoE layers? What router type? Shared experts?
    • RoPE variant: Standard RoPE? YaRN? NTK-aware scaling? What is the inv_freq formula?
    • Normalization: RMSNorm or LayerNorm? Pre-norm or post-norm? Elementwise affine?
    • Weight tying: Does tie_word_embeddings appear in config?
    • State dict key names: What are the HF weight key naming conventions?
  3. Summarize findings in a checklist like:

Target model: <name>
HF model_type: "<model_type>" (and variants like "<model_type>_moe" if applicable)
Attention: [standard GQA / with QK norm / with bias / sliding window / ...]
FFN: [SwiGLU / GeGLU / standard MLP / ...]
MoE: [no / yes - num_experts, top_k, shared_experts]
RoPE: [standard / YaRN / NTK-aware / ...]
Norm: [RMSNorm / LayerNorm] with [pre-norm / post-norm]
Weight tying: [yes / no]
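
Part of this checklist can be filled in mechanically from a parsed config.json. The following is a minimal sketch, not part of AReaL: the helper name summarize_hf_config is hypothetical, and it only reads fields that commonly appear in HuggingFace configs. Features such as QK norm often show up only in the modeling file, so the source code still has to be read by hand.

```python
from typing import Any


def summarize_hf_config(cfg: dict[str, Any]) -> dict[str, str]:
    """Build a partial Step 1 checklist from a parsed config.json dict.

    Hypothetical helper for illustration; field names follow common
    HuggingFace conventions and may be absent for a given model.
    """
    n_heads = cfg.get("num_attention_heads")
    n_kv_heads = cfg.get("num_key_value_heads", n_heads)
    attention = "GQA" if n_kv_heads != n_heads else "MHA"
    if cfg.get("attention_bias", False):
        attention += " with bias"
    if cfg.get("sliding_window") is not None:
        attention += ", sliding window"

    # MoE configs name the expert count differently across model families.
    num_experts = cfg.get("num_experts", cfg.get("num_local_experts"))
    moe = f"yes - num_experts={num_experts}" if num_experts else "no"

    # rope_scaling is missing or None for standard RoPE.
    rope_scaling = cfg.get("rope_scaling") or {}
    rope = rope_scaling.get("rope_type", rope_scaling.get("type", "standard"))

    return {
        "HF model_type": cfg["model_type"],
        "Attention": attention,
        "MoE": moe,
        "RoPE": rope,
        "Weight tying": "yes" if cfg.get("tie_word_embeddings", False) else "no",
    }
```

In practice the dictionary would come from json.load on the model's config.json, or from the to_dict() method of a loaded config object.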

Step 2: Select the Reference Model

Choose the closest existing implementation as a starting point:

| Target characteristics | Reference | Why |
| --- | --- | --- |
| Dense-only, standard GQA, no QK norm | qwen2 | Simplest baseline, pure dense |
| Has QK norm, or has MoE support | qwen3 | Supports QK norm + MoE + shared experts |

Action: Copy the reference model directory as the starting point:

areal/experimental/models/archon/<model>/
  __init__.py
  spec.py
  model/
    args.py
    model.py
    rope.py
    state_dict_adapter.py
  infra/
    parallelize.py

Step 3: Implement args.py

Adapt <Model>ModelArgs to match the target model's HuggingFace config fields.

Key changes from reference:

  1. Update the @dataclass fields to match the target model's hyperparameters:

    • Field names should use Archon conventions (dim, n_layers, n_heads, n_kv_heads, vocab_size, head_dim, hidden_dim, norm_eps, rope_theta, etc.)
    • Default values should match the smallest variant of the target model
    • Add model-specific fields (e.g., attention_bias, qk_norm, sliding_window)
  2. Update from_hf_config() to correctly map HuggingFace config attributes:

    • Use getattr(hf_config, "field_name", default) for optional fields
    • Handle variant-specific fields (e.g., MoE fields only present in MoE variants)
    • The method must return an instance of the model args class

Critical: Verify every field mapping against the HF model's config.json. Incorrect mappings here cause silent errors downstream.

Base class contract (BaseModelArgs):

@dataclass
class <Model>ModelArgs(BaseModelArgs):
    # ... model-specific fields ...

    @classmethod
    def from_hf_config(
        cls,
        hf_config: PretrainedConfig,
        is_critic: bool = False,
        **kwargs,
    ) -> <Model>ModelArgs:
        # Map HF config fields to Archon model args
        ...
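
To make the mapping concrete, here is a self-contained sketch of the from_hf_config pattern. ToyModelArgs is a hypothetical stand-in that does not subclass BaseModelArgs, and the chosen fields and defaults are assumptions for illustration only; the HF attribute names are the ones commonly found in Llama-style configs.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any


@dataclass
class ToyModelArgs:
    """Hypothetical args class; a real one would subclass BaseModelArgs."""

    dim: int = 64
    n_layers: int = 2
    n_heads: int = 4
    n_kv_heads: int = 4
    vocab_size: int = 1000
    norm_eps: float = 1e-6
    rope_theta: float = 10000.0
    attention_bias: bool = False

    @classmethod
    def from_hf_config(
        cls, hf_config: Any, is_critic: bool = False, **kwargs: Any
    ) -> "ToyModelArgs":
        # is_critic and kwargs mirror the contract above; unused in this sketch.
        n_heads = hf_config.num_attention_heads
        return cls(
            dim=hf_config.hidden_size,
            n_layers=hf_config.num_hidden_layers,
            n_heads=n_heads,
            # Optional in HF configs: fall back to plain multi-head attention.
            n_kv_heads=getattr(hf_config, "num_key_value_heads", n_heads),
            vocab_size=hf_config.vocab_size,
            norm_eps=getattr(hf_config, "rms_norm_eps", 1e-6),
            rope_theta=getattr(hf_config, "rope_theta", 10000.0),
            attention_bias=getattr(hf_config, "attention_bias", False),
        )


# Usage with a stand-in config object (SimpleNamespace instead of PretrainedConfig).
example_args = ToyModelArgs.from_hf_config(
    SimpleNamespace(
        hidden_size=4096, num_hidden_layers=32, num_attention_heads=32, vocab_size=32000
    )
)
```

Required fields are read with plain attribute access so that a missing one fails loudly, while optional ones go through getattr with an explicit default.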

Step 4: Implement model.py

Adapt the model architecture to match the target model.

Key components to adapt:

  1. Normalization (RMSNorm or similar):

    • Check if elementwise_affine is configurable
    • Check the epsilon default value
    • If the model uses LayerNorm, implement accordingly
  2. Attention module:

    • Q/K/V projection: Check bias presence (nn.Linear(..., bias=True/False))
    • QK norm: Add q_norm/k_norm if the model has them, remove if it doesn't
    • GQA: n_kv_heads < n_heads for grouped-query attention
    • Ulysses SP: Keep the set_cp_group / _sp_enabled pattern from the reference
    • Output projection: Check bias presence
  3. FeedForward module:

    • SwiGLU: w2(silu(w1(x)) * w3(x)) -- most common for modern LLMs
    • Check bias in linear layers
    • For MoE models: MoE module replaces FeedForward on designated layers
  4. TransformerBlock: Pre-norm (most modern LLMs) vs post-norm

    • MoE layer detection via _is_moe_layer() if applicable
  5. Top-level Model (<Model>Model(BaseArchonModel)):

    • tok_embeddings, layers (as ModuleDict), norm, output/score
    • init_weights(): Match initialization scheme from HF
    • init_buffers(): RoPE cache + MoE buffers
    • forward(): Must follow BaseArchonModel signature: (tokens, positions, cu_seqlens, max_seqlen, tree_attn_meta=None) -> Tensor

Base class contract (BaseArchonModel):

class <Model>Model(BaseArchonModel):
    def forward(self, tokens, positions, cu_seqlens, max_seqlen, tree_attn_meta=None) -> torch.Tensor: ...
    def init_weights(self) -> None: ...
    def init_buffers(self, buffer_device) -> None: ...
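
For the GQA item in the attention component, the head bookkeeping can be sketched without any tensors. The function name is hypothetical, and the consecutive grouping shown here is an assumption that matches how HuggingFace's repeat_kv expands key/value heads; check the target model's modeling file if it groups heads differently.

```python
def kv_head_for_query_head(q_head: int, n_heads: int, n_kv_heads: int) -> int:
    """Return the index of the KV head that a given query head attends with.

    Assumes consecutive grouping: each KV head serves n_heads // n_kv_heads
    neighbouring query heads.
    """
    if n_heads % n_kv_heads != 0:
        raise ValueError("n_heads must be divisible by n_kv_heads")
    if not 0 <= q_head < n_heads:
        raise ValueError("q_head out of range")
    n_rep = n_heads // n_kv_heads  # query heads per KV head
    return q_head // n_rep
```

With n_heads=8 and n_kv_heads=2, query heads 0 to 3 share KV head 0 and query heads 4 to 7 share KV head 1. Setting n_kv_heads equal to n_heads recovers standard multi-head attention.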

Step 5: Implement rope.py

Handle the rotary position embedding variant.

Options:

  1. Standard RoPE (same as qwen2/qwen3): Re-export from qwen2:

    from areal.experimental.models.archon.qwen2.model.rope import (
        apply_rotary_emb,
        precompute_rope_cache,
        repeat_kv,
        reshape_for_broadcast,
        rotate_half,
    )
    
  2. Custom RoPE (YaRN, NTK-aware, etc.): Implement custom precompute_rope_cache() and apply_rotary_emb() functions. The key difference is usually in how inv_freq is computed (scaling factors, interpolation, etc.).
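
As a reference point when comparing against the HF modeling code, the standard inverse-frequency formula is inv_freq[i] = 1 / rope_theta^(2i / head_dim). The sketch below computes it in plain Python; the function names are hypothetical, and the linear position-interpolation variant is included only as the simplest example of a scaled inv_freq (it is not YaRN or NTK-aware scaling, which use more involved formulas).

```python
def standard_inv_freq(head_dim: int, rope_theta: float = 10000.0) -> list[float]:
    """Inverse frequencies for standard RoPE, one per pair of dimensions."""
    if head_dim % 2 != 0:
        raise ValueError("head_dim must be even")
    return [1.0 / (rope_theta ** (i / head_dim)) for i in range(0, head_dim, 2)]


def linear_scaled_inv_freq(
    head_dim: int, rope_theta: float, factor: float
) -> list[float]:
    """Linear position interpolation: every frequency is divided by `factor`."""
    return [f / factor for f in standard_inv_freq(head_dim, rope_theta)]
```

If the target model's inv_freq differs from standard_inv_freq for the same head_dim and rope_theta, a custom precompute_rope_cache() is likely needed rather than a re-export.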

Step 6: Implement state_dict_adapter.py

Map between HuggingFace and Archon weight key names.

This is the most error-prone step. The adapter must correctly handle:

  1. Key name mapping (from_hf_map dict):

    • Embedding: model.embed_tokens.weight -> tok_embeddings.weight
    • Attention: model.layers.{}.self_attn.q_proj.weight -> layers.{}.attention.wq.weight
    • FFN: model.layers.{}.mlp.gate_proj.weight -> layers.{}.feed_forward.w1.weight
    • Norms: model.layers.{}.input_layernorm.weight -> layers.{}.attention_norm.weight
    • Output: lm_head.weight -> output.weight
    • Skip keys (set to None): rotary_emb.inv_freq (computed at runtime)
    • Model-specific keys: bias terms, QK norm weights, etc.
  2. Reverse mapping (to_hf_map): Auto-generated from from_hf_map

  3. MoE expert weights (if applicable): 3D<->2D conversion for expert weights. Copy the MoE handling from qwen3 if the model has MoE.

  4. Weight tying: Skip output.weight during to_hf() if tie_word_embeddings=True

Verification approach: After implementation, the adapter should satisfy:

# Roundtrip: archon -> hf -> archon preserves all keys
hf_sd = adapter.to_hf(archon_sd)
roundtrip_sd = adapter.from_hf(hf_sd)
assert set(roundtrip_sd.keys()) == set(archon_sd.keys())

Base class contract (BaseStateDictAdapter):

class <Model>StateDictAdapter(BaseStateDictAdapter):
    def from_hf(self, hf_state_dict) -> dict[str, Any]: ...
    def to_hf(self, archon_state_dict) -> dict[str, Any]: ...
    def convert_single_to_hf(self, name, tensor) -> list[tuple[str, torch.Tensor]]: ...
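
The template-based key mapping in this step can be illustrated with a small standalone sketch. Everything here is hypothetical and independent of BaseStateDictAdapter: the map is a toy subset, the full inv_freq key is an assumed example, and the helper names are made up. It shows the three behaviours the adapter needs: layer-index substitution, skipped keys mapped to None, and a reverse map generated from the forward map.

```python
import re
from typing import Any, Optional

# Toy subset of a from_hf_map; "{}" stands for the layer index.
FROM_HF_MAP: dict[str, Optional[str]] = {
    "model.embed_tokens.weight": "tok_embeddings.weight",
    "model.layers.{}.self_attn.q_proj.weight": "layers.{}.attention.wq.weight",
    "model.layers.{}.mlp.gate_proj.weight": "layers.{}.feed_forward.w1.weight",
    "model.layers.{}.self_attn.rotary_emb.inv_freq": None,  # computed at runtime
    "lm_head.weight": "output.weight",
}
# Reverse map generated from the forward map; skipped keys have no reverse.
TO_HF_MAP = {v: k for k, v in FROM_HF_MAP.items() if v is not None}


def convert_key(key: str, mapping: dict[str, Optional[str]]) -> Optional[str]:
    """Map one key, or return None if the key is meant to be skipped."""
    # Abstract the first numeric path component into "{}" to find the template.
    match = re.search(r"\.(\d+)\.", key)
    if match is None:
        template, layer = key, None
    else:
        template = key[: match.start(1)] + "{}" + key[match.end(1):]
        layer = match.group(1)
    if template not in mapping:
        # Fail loudly: an unmapped key would otherwise be dropped silently.
        raise KeyError(f"No mapping for {key}")
    target = mapping[template]
    if target is None or layer is None:
        return target
    return target.format(layer)


def convert_state_dict(
    sd: dict[str, Any], mapping: dict[str, Optional[str]]
) -> dict[str, Any]:
    """Convert every key in a state dict, dropping keys mapped to None."""
    out = {}
    for key, value in sd.items():
        new_key = convert_key(key, mapping)
        if new_key is not None:
            out[new_key] = value
    return out
```

Raising on unknown keys, rather than ignoring them, is a deliberate choice in this sketch: it turns the "silent weight drop" failure mode into an immediate error.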

Step 7: Implement parallelize.py

Define the parallelization strategy for the model.

The parallelize function applies parallelism in this order:

  1. TP (Tensor Parallelism) -- shard attention/FFN across devices
  2. EP (Expert Parallelism) -- for MoE models only
  3. CP (Context Parallelism / Ulysses SP) -- sequence parallelism
  4. AC (Activation Checkpointing) -- memory optimization
  5. torch.compile -- compilation optimization
  6. FSDP (Fully Sharded Data Parallelism) -- data parallelism

Key adaptations by model architecture:

  • Attention with QK norm: wq/wk use use_local_output=False (DTensor output for norm), add SequenceParallel(sequence_dim=2) for q_norm/k_norm
  • Attention without QK norm: wq/wk/wv all use use_local_output=True
  • Attention with bias: Bias terms follow the same parallel plan as their weights
  • MoE layers: Separate TP plan for MoE input/output, router gate, and expert weights. Copy from qwen3's apply_moe_ep_tp() and apply_non_moe_tp()
  • Dense-only models: Simpler plan without MoE handling. Copy from qwen2

Function signature (must match ParallelizeFn protocol):

def parallelize_<model>(
    model: nn.Module,
    parallel_dims: ArchonParallelDims,
    param_dtype: torch.dtype = torch.bfloat16,
    reduce_dtype: torch.dtype = torch.float32,
    loss_parallel: bool = True,
    cpu_offload: bool = False,
    reshard_after_forward_policy: str = "default",
    ac_config: ActivationCheckpointConfig | None = None,
    enable_compile: bool = True,
) -> nn.Module:

Step 8: Create spec.py and Register

Assemble the ModelSpec and register it.

from areal.experimental.models.archon.model_spec import ModelSpec, register_model_spec
from areal.experimental.models.archon.pipeline_parallel import pipeline_llm
from areal.experimental.models.archon.<model>.infra.parallelize import parallelize_<model>
from areal.experimental.models.archon.<model>.model.args import <Model>ModelArgs
from areal.experimental.models.archon.<model>.model.model import <Model>Model
from areal.experimental.models.archon.<model>.model.state_dict_adapter import (
    <Model>StateDictAdapter,
)

<MODEL>_SPEC = ModelSpec(
    name="<Model>",
    model_class=<Model>Model,
    model_args_class=<Model>ModelArgs,
    state_dict_adapter_class=<Model>StateDictAdapter,
    parallelize_fn=parallelize_<model>,
    supported_model_types=frozenset({"<model_type>"}),  # From HF config.json
    pipelining_fn=pipeline_llm,
)

# Auto-register when module is imported
register_model_spec(<MODEL>_SPEC)

__all__ = ["<MODEL>_SPEC"]

Note: supported_model_types should include all HF model_type strings that this implementation handles (e.g., {"qwen3", "qwen3_moe"} for Qwen3).

Step 9: Register in __init__.py

Add the import to areal/experimental/models/archon/__init__.py:

from areal.experimental.models.archon.<model> import spec as <model>_spec  # noqa: F401

This triggers auto-registration when the module is imported.

Step 10: Verify and Test

Verification should be done in stages, adapting based on available hardware and the test patterns in tests/experimental/archon/.

Before writing tests, examine the existing test files to understand current patterns:

tests/experimental/archon/
  conftest.py             -- Pytest configuration (version checks)
  utils.py                -- Shared utilities (model loading, comparison)
  test_qwen3_args.py      -- Args unit tests (CPU-only)
  test_state_dict_adapter.py  -- State dict roundtrip tests
  test_weight_sync.py     -- Weight completeness tests (meta device)
  test_forward.py         -- Forward precision comparison (single GPU)
  ...

Test stages (write tests appropriate for the model's complexity):

Stage 1: Args Tests (CPU-only, always write these)

Test from_hf_config() with mock HuggingFace configs:

# Pattern: Create mock PretrainedConfig, verify args mapping
from unittest.mock import MagicMock

def test_args_from_hf_config():
    hf_config = MagicMock()
    hf_config.hidden_size = 4096
    hf_config.num_hidden_layers = 32
    # ... set all required fields
    args = <Model>ModelArgs.from_hf_config(hf_config)
    assert args.dim == 4096
    assert args.n_layers == 32
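
One caveat with this pattern: a bare MagicMock auto-creates any attribute that is accessed, so getattr(hf_config, "field_name", default) never falls back to its default. The standalone snippet below demonstrates the behaviour using only the standard library; restricting the mock with spec, or using a plain SimpleNamespace, are two possible workarounds (the existing tests may already use a different approach).

```python
from types import SimpleNamespace
from unittest.mock import MagicMock

# A bare MagicMock invents attributes on access, so the default is ignored.
loose = MagicMock()
loose.hidden_size = 4096
leaked = getattr(loose, "attention_bias", False)  # a MagicMock, not False

# With spec=[...], attributes outside the list raise AttributeError,
# which lets getattr fall back to the default.
strict = MagicMock(spec=["hidden_size", "num_hidden_layers"])
strict.hidden_size = 4096
strict_value = getattr(strict, "attention_bias", False)

# SimpleNamespace only has the attributes it was given.
plain = SimpleNamespace(hidden_size=4096, num_hidden_layers=32)
plain_value = getattr(plain, "attention_bias", False)
```

Because a MagicMock instance is truthy, a leaked mock in a boolean field such as attention_bias can silently enable a code path the test never intended to exercise.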

Stage 2: State Dict Adapter Tests (CPU-only)

Test key mapping roundtrip:

def test_state_dict_roundtrip():
    # Create adapter with mock config
    adapter = <Model>StateDictAdapter(mock_config)
    # Create fake archon state dict with expected keys
    archon_sd = {"tok_embeddings.weight": torch.randn(vocab, dim), ...}
    # Roundtrip
    hf_sd = adapter.to_hf(archon_sd)
    roundtrip = adapter.from_hf(hf_sd)
    assert set(roundtrip.keys()) == set(archon_sd.keys())

Stage 3: Weight Completeness (meta device, CPU-only)

Verify all model parameters have HF mappings:

def test_weight_completeness():
    # Create model on meta device
    with torch.device("meta"):
        model = <Model>Model(args)
    adapter = <Model>StateDictAdapter(hf_config)
    # Check every archon param has a HF mapping
    for name, _ in model.named_parameters():
        hf_pairs = adapter.convert_single_to_hf(name, torch.empty(0))
        assert len(hf_pairs) > 0, f"No HF mapping for {name}"

Stage 4: Forward Precision (single GPU, if available)

Compare Archon model output against HuggingFace reference:

@pytest.mark.skipif(not torch.cuda.is_available(), reason="Requires CUDA")
def test_forward_matches_hf():
    # Load both HF and Archon models
    # Run forward on same input
    # Compare logits within tolerance
    ...  # Placeholder so the stub parses; replace with the steps above

Important: Do NOT hardcode the test categories. Inspect the existing test files in tests/experimental/archon/ and follow the same patterns, fixtures, and markers. Adapt test scope to the model's specific features (e.g., add MoE-specific tests only if the model has MoE).

Reference Implementations

| Model | Directory | Features |
| --- | --- | --- |
| Qwen2 | areal/experimental/models/archon/qwen2/ | Dense, attention bias, no QK norm |
| Qwen3 | areal/experimental/models/archon/qwen3/ | Dense + MoE, QK norm, no attention bias, shared experts |

Architecture Decision Map

| Feature | qwen2 | qwen3 | What to check in target model |
| --- | --- | --- | --- |
| Attention bias | Yes | No | attention_bias in HF config |
| QK norm | No | Yes | qk_norm in HF config or QKNorm module in modeling file |
| MoE | No | Yes | num_experts/num_local_experts in HF config |
| Shared experts | No | Yes | num_shared_experts in HF config |
| Decoder sparse step | No | Yes | decoder_sparse_step in HF config |
| Weight tying | Both | Both | tie_word_embeddings in HF config |
| RoPE | Standard | Standard (re-export qwen2) | Check inv_freq formula in HF modeling code |

Common Mistakes

  • Not mapping all HF keys in state_dict_adapter.py (causes silent weight drops)
  • Wrong from_hf_config() field mapping (uses wrong HF config attribute name)
  • Forgetting to handle None keys in from_hf_map (keys to skip like rotary_emb.inv_freq)
  • Missing MoE expert weight 3D<->2D conversion when model has MoE
  • Wrong TP plan for attention with/without QK norm (use_local_output must match)
  • Forgetting to add import line in areal/experimental/models/archon/__init__.py
  • Not including all model_type variants in supported_model_types frozenset
  • Using print instead of areal.utils.logging.getLogger()

File Checklist

After completion, verify all files exist and are consistent:

  • areal/experimental/models/archon/<model>/__init__.py
  • areal/experimental/models/archon/<model>/spec.py -- ModelSpec + register
  • areal/experimental/models/archon/<model>/model/args.py -- ModelArgs + from_hf_config
  • areal/experimental/models/archon/<model>/model/model.py -- Model + Attention + FFN
  • areal/experimental/models/archon/<model>/model/rope.py -- RoPE (or re-export)
  • areal/experimental/models/archon/<model>/model/state_dict_adapter.py -- Key mapping
  • areal/experimental/models/archon/<model>/infra/parallelize.py -- Parallel strategy
  • areal/experimental/models/archon/__init__.py -- Import line added
  • tests/experimental/archon/test_<model>_*.py -- Tests

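A skill that guides adding a new dataset loader to the AReaL project. Applies when the user asks how to integrate a new dataset or create a loader; provides a step-by-step guide to creating the dataset file and registering it in __init__.py.
Triggers: the user asks how to add a dataset; the user wants to integrate a new dataset; the user mentions creating a dataset loader.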
.agents/skills/add-dataset/SKILL.md
npx skills add areal-project/AReaL --skill add-dataset -g -y
SKILL.md
Frontmatter
{
    "name": "add-dataset",
    "description": "Guide for adding a new dataset loader to AReaL. Use when user wants to add a new dataset."
}

Add Dataset

Add a new dataset loader to AReaL.

When to Use

This skill is triggered when:

  • User asks "how do I add a dataset?"
  • User wants to integrate a new dataset
  • User mentions creating a dataset loader

Step-by-Step Guide

Step 1: Create Dataset File

Create areal/dataset/<name>.py:

from datasets import Dataset, load_dataset


def get_<name>_sft_dataset(
    path: str,
    split: str,
    tokenizer,
    max_length: int | None = None,
) -> Dataset:
    """Load dataset for SFT training.

    Args:
        path: Path to dataset (HuggingFace hub or local path)
        split: Dataset split (train/validation/test)
        tokenizer: Tokenizer for processing
        max_length: Maximum sequence length (optional)

    Returns:
        HuggingFace Dataset with processed samples
    """
    dataset = load_dataset(path=path, split=split)

    def process(sample):
        # Tokenize the full sequence (prompt + response)
        seq_token = tokenizer.encode(
            sample["question"] + sample["answer"] + tokenizer.eos_token
        )
        prompt_token = tokenizer.encode(sample["question"])
        # Loss mask: 0 for prompt, 1 for response
        loss_mask = [0] * len(prompt_token) + [1] * (len(seq_token) - len(prompt_token))
        return {"input_ids": seq_token, "loss_mask": loss_mask}

    dataset = dataset.map(process).remove_columns(["question", "answer"])

    if max_length is not None:
        dataset = dataset.filter(lambda x: len(x["input_ids"]) <= max_length)

    return dataset


def get_<name>_rl_dataset(
    path: str,
    split: str,
    tokenizer,
    max_length: int | None = None,
) -> Dataset:
    """Load dataset for RL training.

    Args:
        path: Path to dataset
        split: Dataset split
        tokenizer: Tokenizer for length filtering
        max_length: Maximum sequence length

    Returns:
        HuggingFace Dataset with prompts and answers for reward computation
    """
    dataset = load_dataset(path=path, split=split)

    def process(sample):
        messages = [
            {
                "role": "user",
                "content": sample["question"],
            }
        ]
        return {"messages": messages, "answer": sample["answer"]}

    dataset = dataset.map(process).remove_columns(["question"])

    if max_length is not None:

        def filter_length(sample):
            content = sample["messages"][0]["content"]
            tokens = tokenizer.encode(content)
            return len(tokens) <= max_length

        dataset = dataset.filter(filter_length)

    return dataset
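
The loss-mask construction in the SFT loader relies on the prompt tokens being an exact prefix of the full-sequence tokens. The sketch below isolates that logic with a toy character-level tokenizer so it can be run without the datasets library; build_sft_sample and char_encode are hypothetical names, and the explicit prefix check is an addition for illustration, not something the loader above performs.

```python
from typing import Callable


def build_sft_sample(
    question: str,
    answer: str,
    encode: Callable[[str], list[int]],
    eos_token: str = "",
) -> dict[str, list[int]]:
    """Tokenize prompt + response and mark only response tokens for the loss."""
    seq_token = encode(question + answer + eos_token)
    prompt_token = encode(question)
    # Real subword tokenizers can merge characters across the prompt/answer
    # boundary or append special tokens, which breaks the prefix assumption.
    if seq_token[: len(prompt_token)] != prompt_token:
        raise ValueError("prompt tokens are not a prefix of the full sequence")
    loss_mask = [0] * len(prompt_token) + [1] * (len(seq_token) - len(prompt_token))
    return {"input_ids": seq_token, "loss_mask": loss_mask}


def char_encode(text: str) -> list[int]:
    """Toy tokenizer: one token per character."""
    return [ord(c) for c in text]
```

For example, build_sft_sample("ab", "cd", char_encode, "!") yields a loss mask of [0, 0, 1, 1, 1]: two prompt tokens are masked out, and the two answer tokens plus the end-of-sequence token contribute to the loss.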

Step 2: Register in __init__.py

Update areal/dataset/__init__.py:

# Add to VALID_DATASETS
VALID_DATASETS = [
    # ... existing datasets
    "<name>",
]

# Add to _get_custom_dataset function
def _get_custom_dataset(name: str, ...):
    # ... existing code
    elif name == "<name>":
        from areal.dataset.<name> import get_<name>_sft_dataset, get_<name>_rl_dataset
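        # Pass by keyword: the loaders take tokenizer before max_length.
        if dataset_type == "sft":
            return get_<name>_sft_dataset(
                path=path, split=split, tokenizer=tokenizer, max_length=max_length
            )
        else:
            return get_<name>_rl_dataset(
                path=path, split=split, tokenizer=tokenizer, max_length=max_length
            )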

Step 3: Add Config (Optional)

If the dataset needs special configuration, add to areal/api/cli_args.py:

@dataclass
class TrainDatasetConfig:
    # ... existing fields
    <name>_specific_field: str | None = None

Step 4: Add Tests

Create tests/test_<name>_dataset.py:

import pytest
from areal.dataset.<name> import get_<name>_sft_dataset, get_<name>_rl_dataset

def test_sft_dataset_loads(tokenizer):
    dataset = get_<name>_sft_dataset("path/to/data", split="train", tokenizer=tokenizer)
    assert len(dataset) > 0
    assert "input_ids" in dataset.column_names
    assert "loss_mask" in dataset.column_names

def test_rl_dataset_loads(tokenizer):
    dataset = get_<name>_rl_dataset("path/to/data", split="train", tokenizer=tokenizer)
    assert len(dataset) > 0
    assert "messages" in dataset.column_names
    assert "answer" in dataset.column_names

Reference Implementations

| Dataset | File | Description |
| --- | --- | --- |
| GSM8K | areal/dataset/gsm8k.py | Math word problems |
| Geometry3K | areal/dataset/geometry3k.py | Geometry problems |
| CLEVR | areal/dataset/clevr_count_70k.py | Visual counting |
| HH-RLHF | areal/dataset/hhrlhf.py | Helpfulness/Harmlessness |
| TORL | areal/dataset/torl_data.py | Tool-use RL |

Required Fields

SFT Dataset

{
    "messages": [
        {"role": "user", "content": "..."},
        {"role": "assistant", "content": "..."},
    ]
}

RL Dataset

{
    "messages": [
        {"role": "user", "content": "..."},
    ],
    "answer": "ground_truth_for_reward",
    # Optional metadata for reward function
}

Common Mistakes

  • Returning List[Dict] instead of HuggingFace Dataset
  • Using Python loops instead of dataset.map()/filter()
  • Missing "messages" field for RL datasets
  • Wrong message format (should be list of dicts with role and content)
  • Not registering in __init__.py
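
Guides the user through adding a custom reward function to the AReaL framework. Covers the full process: creating the Python file, registering the function in __init__.py, handling asynchronous blocking operations, and writing unit tests.
Triggers: the user asks how to add a reward function; the user wants to implement custom reward logic; the user mentions reward computation needs.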
.agents/skills/add-reward/SKILL.md
npx skills add areal-project/AReaL --skill add-reward -g -y
SKILL.md
Frontmatter
{
    "name": "add-reward",
    "description": "Guide for adding a new reward function to AReaL. Use when user wants to create a reward function."
}

Add Reward

Add a new reward function to AReaL.

When to Use

This skill is triggered when:

  • User asks "how do I add a reward function?"
  • User wants to implement custom rewards
  • User mentions reward computation

Step-by-Step Guide

Step 1: Create Reward File

Create areal/reward/<name>.py:

from typing import Any

from areal.utils import logging

logger = logging.getLogger("MyReward")


def <name>_reward_fn(
    prompt: str,
    completions: str,
    prompt_ids,
    completion_ids,
    answer: str | None = None,
    **kwargs: Any,
) -> float:
    """Compute reward for a single completion.

    Args:
        prompt: Prompt string
        completions: Completion string (model output)
        prompt_ids: Tokenized prompt IDs
        completion_ids: Tokenized completion IDs
        answer: Ground truth answer from dataset (optional)
        **kwargs: Additional data from dataset

    Returns:
        Reward value (float), typically 0.0 or 1.0
    """
    try:
        # Extract answer from completion
        extracted = _extract_answer(completions)

        # Compare with ground truth
        if answer is not None and extracted == str(answer):
            return 1.0
        return 0.0
    except Exception:
        logger.warning("Exception in reward computation", exc_info=True)
        return 0.0


def _extract_answer(completion: str) -> str:
    """Extract the answer from a completion string.

    Implement your extraction logic here.
    """
    # Example: Extract content from \boxed{}
    import re

    match = re.search(r"\\boxed\{([^}]+)\}", completion)
    if match:
        return match.group(1).strip()
    return completion.strip()
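
Note that the example pattern [^}]+ stops at the first closing brace, so for a completion such as \boxed{\frac{1}{2}} it captures only \frac{1. If nested braces matter for your task, a brace-counting extractor is one alternative. The sketch below is hypothetical and makes its own choices: it takes the last \boxed{...} in the text and returns None when there is no complete box, instead of falling back to the whole completion.

```python
import re
from typing import Optional


def extract_boxed(completion: str) -> Optional[str]:
    """Return the content of the last \\boxed{...}, honouring nested braces."""
    marker = "\\boxed{"
    start = completion.rfind(marker)
    if start == -1:
        return None
    begin = start + len(marker)
    depth = 1  # we are inside the opening brace of \boxed{
    for i in range(begin, len(completion)):
        ch = completion[i]
        if ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:
                return completion[begin:i].strip()
    return None  # unbalanced braces: no complete box found


# The regex from the example above, shown for comparison on nested input.
nested = "The answer is \\boxed{\\frac{1}{2}}"
regex_result = re.search(r"\\boxed\{([^}]+)\}", nested).group(1)
balanced_result = extract_boxed(nested)
```

Here regex_result is the truncated string \frac{1, while balanced_result is the full \frac{1}{2}.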

Step 2: Register in __init__.py

Update areal/reward/__init__.py:

# Add to VALID_REWARD_FN
VALID_REWARD_FN = [
    # ... existing reward functions
    "<name>",
]

# Add to get_reward_fn function
def get_reward_fn(name: str, **kwargs):
    # ... existing code
    elif name == "<name>":
        from areal.reward.<name> import <name>_reward_fn
        return <name>_reward_fn

Step 3: Handle Blocking Operations

If your reward function uses blocking operations (e.g., API calls, model inference), the workflow will wrap it with AsyncRewardWrapper:

# In your workflow
from areal.api.reward_api import AsyncRewardWrapper

self.reward_fn = AsyncRewardWrapper(reward_fn)

# Then call it asynchronously
reward = await self.reward_fn(prompt, completions, prompt_ids, completion_ids, **data)
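
To see why wrapping matters, the sketch below offloads a blocking reward function to a worker pool so the event loop stays responsive. ToyAsyncWrapper is a hypothetical stand-in built on the standard library; it is not AsyncRewardWrapper, whose internals (executor type, timeouts, error handling) may differ.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


class ToyAsyncWrapper:
    """Hypothetical wrapper that runs a blocking function off the event loop."""

    def __init__(self, fn: Callable[..., float], max_workers: int = 4) -> None:
        self._fn = fn
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    async def __call__(self, *args: Any, **kwargs: Any) -> float:
        loop = asyncio.get_running_loop()
        # run_in_executor forwards positional args only, so bind kwargs in a lambda.
        return await loop.run_in_executor(
            self._pool, lambda: self._fn(*args, **kwargs)
        )


def slow_reward(
    prompt: str, completions: str, prompt_ids: Any, completion_ids: Any, **kwargs: Any
) -> float:
    """Blocking reward: the sleep stands in for an API call or model inference."""
    time.sleep(0.05)
    return 1.0 if completions.strip() == kwargs.get("answer") else 0.0


async def score_all(completions: list[str]) -> list[float]:
    """Score several completions concurrently, one call per sample."""
    wrapped = ToyAsyncWrapper(slow_reward)
    tasks = [wrapped("What is 2+2?", c, None, None, answer="4") for c in completions]
    return list(await asyncio.gather(*tasks))
```

Calling asyncio.run(score_all(["4", "5"])) scores both completions while the blocking sleeps overlap in the pool instead of stalling the loop one after another.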

Step 4: Add Tests

Create tests/test_<name>_reward.py:

import pytest
from areal.reward.<name> import <name>_reward_fn

def test_reward_correct_answer():
    reward = <name>_reward_fn(
        prompt="What is 2+2?",
        completions="The answer is \\boxed{4}",
        prompt_ids=None,
        completion_ids=None,
        answer="4",
    )
    assert reward == 1.0

def test_reward_wrong_answer():
    reward = <name>_reward_fn(
        prompt="What is 2+2?",
        completions="The answer is \\boxed{5}",
        prompt_ids=None,
        completion_ids=None,
        answer="4",
    )
    assert reward == 0.0

Reference Implementations

| Reward | File | Description |
| --- | --- | --- |
| GSM8K | areal/reward/gsm8k.py | Math answer verification |
| Geometry3K | areal/reward/geometry3k.py | Geometry answer verification |
| CLEVR | areal/reward/clevr_count_70k.py | Counting verification |
| MathVerify | areal/reward/math_verify.py | General math verification |

Function Signature

All reward functions must follow this signature:

def reward_fn(
    prompt: str,               # Input prompt string
    completions: str,          # Model completion string
    prompt_ids,                # Tokenized prompt
    completion_ids,            # Tokenized completion
    **kwargs: Any,             # Additional data from dataset (e.g., answer)
) -> float:                    # Reward value (typically 0.0 or 1.0)

Note: The reward function is called once per sample. Batching is handled by AsyncRewardWrapper in the workflow.

Key Requirements

  1. Deterministic: Same inputs should produce same outputs
  2. Return float: Output is a single float value per sample
  3. No blocking in async context: Use AsyncRewardWrapper if needed
  4. Logging: Use areal.utils.logging, not print
  5. Handle exceptions: Return 0.0 on error, don't raise

Common Mistakes

  • Returning a tensor instead of a float
  • Expecting batched inputs (reward is called per sample)
  • Non-deterministic behavior
  • Blocking operations without AsyncRewardWrapper
  • Raising exceptions instead of returning 0.0
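
A skill that guides adding unit tests to the AReaL project. Covers distinguishing test types, file naming conventions, the Arrange-Act-Assert pattern, and pytest marker strategy; applies when testing new functionality or improving coverage.
Triggers: the user asks how to add tests; the user wants to increase test coverage; the user needs to write tests for new functionality; the user wants to understand AReaL testing patterns.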
.agents/skills/add-unit-tests/SKILL.md
npx skills add areal-project/AReaL --skill add-unit-tests -g -y
SKILL.md
Frontmatter
{
    "name": "add-unit-tests",
    "description": "Guide for adding unit tests to AReaL. Use when user wants to add tests for new functionality or increase test coverage."
}

Add Unit Tests

Add unit tests to AReaL following the project's testing conventions.

When to Use

This skill is triggered when:

  • User asks "how do I add tests?"
  • User wants to increase test coverage
  • User needs to write tests for new functionality
  • User wants to understand AReaL testing patterns

Step-by-Step Guide

Step 1: Understand Test Types

AReaL has two main test categories:

| Test Type | Purpose | Location Pattern | How It Runs |
| --- | --- | --- | --- |
| Unit Tests | Test individual functions/modules | tests/test_<module>_<feature>.py | Directly via pytest |
| Distributed Tests | Test distributed/parallel behavior | tests/torchrun/run_*.py | Via torchrun (called by pytest subprocess) |

Note: All tests are invoked via pytest. Distributed tests use torchrun but are still called from pytest test files.

Step 2: Create Test File Structure

Create test file with naming convention: test_<module>_<feature>.py

import pytest
import torch

# Import the module to test
from areal.dataset.gsm8k import get_gsm8k_sft_dataset
from tests.utils import get_dataset_path  # Optional test utilities
# For mocking tokenizer: from unittest.mock import MagicMock

Step 3: Write Test Functions

Follow Arrange-Act-Assert pattern:

def test_function_under_condition_returns_expected():
    """Test that function returns expected value under condition."""
    # Arrange
    input_data = 5
    expected_output = 10

    # Act
    result = function_under_test(input_data)

    # Assert
    assert result == expected_output

Step 4: Add Pytest Markers and CI Strategy

Use appropriate pytest markers:

| Marker | When to Use |
| --- | --- |
| @pytest.mark.slow | Test takes > 10 seconds (excluded from CI by default) |
| @pytest.mark.ci | Slow test that must run in CI (use with @pytest.mark.slow) |
| @pytest.mark.asyncio | Async test functions |
| @pytest.mark.skipif(cond, reason=...) | Conditional skip |
| @pytest.mark.parametrize(...) | Parameterized tests |

CI Test Strategy:

  • @pytest.mark.slow: Excluded from CI by default (CI runs pytest -m "not slow")
  • @pytest.mark.slow + @pytest.mark.ci: Slow but must run in CI
  • No marker: Runs in CI (fast unit tests)

@pytest.mark.asyncio
async def test_async_function():
    result = await async_function()
    assert result == expected

@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_gpu_feature():
    tensor = torch.tensor([1, 2, 3], device="cuda")
    # ... assertions

@pytest.mark.parametrize("batch_size", [1, 4, 16])
def test_with_parameters(batch_size):
    # Parameterized test: runs once per batch_size value
    ...

@pytest.mark.slow
def test_slow_function():
    # Excluded from CI by default
    ...

@pytest.mark.slow
@pytest.mark.ci
def test_slow_but_required_in_ci():
    # Slow but must run in CI
    ...

Step 5: Mock Distributed Environment

For unit tests that need distributed mocks:

import torch.distributed as dist

def test_distributed_function(monkeypatch):
    monkeypatch.setattr(dist, "get_rank", lambda: 0)
    monkeypatch.setattr(dist, "get_world_size", lambda: 2)
    result = distributed_function()
    assert result == expected

Step 6: Handle GPU Dependencies

Always skip gracefully when GPU unavailable:

CUDA_AVAILABLE = torch.cuda.is_available()

@pytest.mark.skipif(not CUDA_AVAILABLE, reason="CUDA not available")
def test_gpu_function():
    tensor = torch.tensor([1, 2, 3], device="cuda")
    # ... assertions

Key Requirements (Based on testing.md)

Mocking Distributed

  • Use torch.distributed.fake_pg for unit tests
  • Mock dist.get_rank() and dist.get_world_size() explicitly
  • Don't mock internals of FSDP/DTensor

GPU Test Constraints

  • Always skip gracefully when GPU unavailable
  • Clean up GPU memory: torch.cuda.empty_cache() in fixtures
  • Use smallest possible model/batch for unit tests

Assertions

  • Use torch.testing.assert_close() for tensor comparison
  • Specify rtol/atol explicitly for numerical tests
  • Avoid bare assert tensor.equal() - no useful error message
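
To choose rtol and atol deliberately, it helps to recall the criterion that torch.testing.assert_close documents for each element: |actual - expected| <= atol + rtol * |expected|. The plain-Python sketch below restates that inequality for scalars; is_close is a hypothetical helper for reasoning about tolerances, not a replacement for the PyTorch function.

```python
def is_close(actual: float, expected: float, rtol: float, atol: float) -> bool:
    """Scalar version of the closeness test: |a - e| <= atol + rtol * |e|."""
    return abs(actual - expected) <= atol + rtol * abs(expected)
```

The relative term vanishes when expected is zero, so comparisons against values near zero pass only if atol is large enough; with atol=0.0, is_close(1e-9, 0.0, rtol=1e-3, atol=0.0) is False even though the difference is tiny.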

Reference Implementations

| Test File | Description | Key Patterns |
| --- | --- | --- |
| tests/test_utils.py | Utility function tests | Fixtures, parametrized tests |
| tests/test_examples.py | Integration tests with dataset loading | Dataset path resolution, success pattern matching |
| tests/test_fsdp_engine_nccl.py | Distributed tests | Torchrun integration |

Common Mistakes

  • Missing test file registration: Ensure file follows test_*.py naming
  • GPU dependency without skip: Always use @pytest.mark.skipif for GPU tests
  • Incorrect tensor comparisons: Use torch.testing.assert_close() not assert tensor.equal()
  • Memory leaks in GPU tests: Clean up with torch.cuda.empty_cache()
  • Mocking too much: Don't mock FSDP/DTensor internals
  • Unclear test names: Follow test_<what>_<condition>_<expected> pattern
  • No docstrings: Add descriptive docstrings to test functions

Integration with Other Skills

This skill complements other AReaL development skills:

  • After /add-dataset: Add tests for new dataset loaders
  • After /add-workflow: Add tests for new workflows
  • After /add-reward: Add tests for new reward functions
  • With expert agents: Reference this skill when planning test implementation

Running Tests

# First check GPU availability (many tests require GPU)
python -c "import torch; print('GPU available:', torch.cuda.is_available())"

# Run specific test file
uv run pytest tests/test_<name>.py

# Skip slow tests (CI default)
uv run pytest -m "not slow"

# Run with verbose output
uv run pytest -v

# Run distributed tests (requires torchrun and multi-GPU)
# Note: Usually invoked via pytest test files
torchrun --nproc_per_node=2 tests/torchrun/run_<test>.py
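Guides the creation of a new RolloutWorkflow in AReaL. Covers understanding the requirements, writing the async workflow class, registering it in the module, and updating the training script; supports custom rollout implementations.
User asks how to add a workflow; user wants to create a new RolloutWorkflow; user mentions implementing a custom rollout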
.agents/skills/add-workflow/SKILL.md
npx skills add areal-project/AReaL --skill add-workflow -g -y
SKILL.md
Frontmatter
{
    "name": "add-workflow",
    "description": "Guide for adding a new RolloutWorkflow to AReaL. Use when user wants to create a new workflow."
}

Add Workflow

Add a new RolloutWorkflow implementation to AReaL.

When to Use

This skill is triggered when:

  • User asks "how do I add a workflow?"
  • User wants to create a new RolloutWorkflow
  • User mentions implementing a custom rollout

Prerequisites

Before starting, ensure you understand:

  • The workflow's purpose and requirements
  • Input/output data format
  • Reward function to use

Step-by-Step Guide

Step 1: Create Workflow File

Create areal/workflow/<name>.py:

import uuid
from typing import Any, Callable

import torch

from areal.api.cli_args import GenerationHyperparameters
from areal.api.engine_api import InferenceEngine
from areal.api.io_struct import ModelRequest, ModelResponse
from areal.api.reward_api import AsyncRewardWrapper
from areal.api.workflow_api import RolloutWorkflow
from areal.utils import logging

logger = logging.getLogger("MyWorkflow")


class MyWorkflow(RolloutWorkflow):
    """Description of your workflow."""

    def __init__(
        self,
        gconfig: GenerationHyperparameters,
        tokenizer,
        reward_fn: Callable,
    ):
        self.gconfig = gconfig.new_with_stop_and_pad_token_ids(tokenizer)
        self.tokenizer = tokenizer
        self.async_reward_fn = AsyncRewardWrapper(reward_fn)

    async def arun_episode(
        self,
        engine: InferenceEngine,
        data: dict[str, Any],
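    # The return annotation is quoted because InteractionWithTokenLogpReward
    # is not imported in this template.
    ) -> "dict[str, Any] | None | dict[str, InteractionWithTokenLogpReward]":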
        """Run a single episode. MUST be async and non-blocking."""

        # 1. Prepare input_ids from data
        input_ids = self.tokenizer.apply_chat_template(
            data["messages"],
            tokenize=True,
            add_generation_prompt=True,
        )

        # 2. Build ModelRequest
        req = ModelRequest(
            rid=uuid.uuid4().hex,
            input_ids=list(input_ids),
            gconfig=self.gconfig.new(n_samples=1),
            tokenizer=self.tokenizer,
        )

        # 3. Generate completion (async)
        resp: ModelResponse = await engine.agenerate(req)

        # 4. Compute reward (async)
        prompt_str = self.tokenizer.decode(input_ids)
        completion_str = self.tokenizer.decode(resp.output_tokens)
        reward = await self.async_reward_fn(
            prompt_str,
            completion_str,
            resp.input_tokens,
            resp.output_tokens,
            **data,
        )

        # 5. Return results in expected format
        return {
            "input_ids": torch.tensor(resp.input_tokens),
            "output_ids": torch.tensor(resp.output_tokens),
            "reward": torch.tensor(reward),
        }

Step 2: Register in __init__.py

Add to areal/workflow/__init__.py:

from areal.workflow.<name> import MyWorkflow

__all__ = [
    # ... existing exports
    "MyWorkflow",
]

Step 3: Update Entry Script

Update your training script to use the new workflow:

trainer.train(
    workflow="areal.workflow.<name>.MyWorkflow",
    # ... other args
)

Step 4: Add Tests

Create tests/test_<name>_workflow.py:

import pytest
from areal.workflow.<name> import MyWorkflow

@pytest.mark.asyncio
async def test_workflow_basic():
    # Test basic functionality
    pass

Reference Implementations

| Workflow | File | Description |
| -------- | ---- | ----------- |
| MultiTurnWorkflow | areal/workflow/multi_turn.py | Multi-turn conversation |
| RLVRWorkflow | areal/workflow/rlvr.py | RL with verifiable rewards |
| VisionRLVRWorkflow | areal/workflow/vision_rlvr.py | Vision + RLVR |

Key Requirements

  1. Async: arun_episode must be async def and non-blocking
  2. No sync I/O: Use aiofiles for file operations
  3. Wrap rewards: Use AsyncRewardWrapper for reward functions
  4. Tensor format: Output tensors should be [batch, seq_len, ...]
  5. Use helpers: concat_padded_tensors for combining outputs
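
Requirements 1 to 3 come down to keeping blocking work off the event loop. The sketch below illustrates the idea with the standard library only. `to_async` and `slow_reward` are hypothetical names for illustration; this is not AReaL's `AsyncRewardWrapper`, whose internals are not shown here.

```python
import asyncio
import time
from typing import Any, Callable


def to_async(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical helper: run a blocking function in a worker thread."""

    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        # asyncio.to_thread keeps the event loop free while fn blocks.
        return await asyncio.to_thread(fn, *args, **kwargs)

    return wrapper


def slow_reward(prompt: str, completion: str) -> float:
    """Toy blocking reward: 1.0 if the completion is non-empty."""
    time.sleep(0.05)  # stands in for blocking work such as answer checking
    return 1.0 if completion.strip() else 0.0


async def score_all(completions: list[str]) -> list[float]:
    reward = to_async(slow_reward)
    # All rewards are awaited concurrently; gather preserves input order.
    return await asyncio.gather(*(reward("q", c) for c in completions))


rewards = asyncio.run(score_all(["42", "", "7"]))
```

Calling `slow_reward` directly inside `arun_episode` would stall every other episode sharing the event loop for the duration of the call, which is the failure mode the requirements guard against.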

Common Mistakes

  • Using open() instead of aiofiles.open()
  • Forgetting to await async calls
  • Not wrapping reward function with AsyncRewardWrapper
  • Wrong tensor shape conventions
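Provides the Conventional Commits conventions for the AReaL repository, with the commit scope inferred automatically from file paths. Applies to every Git commit so that message format, type, and scope stay consistent and clear.
When running git commit; when creating a Pull Request; in any workflow where a sub-agent produces a commit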
.agents/skills/commit-conventions/SKILL.md
npx skills add areal-project/AReaL --skill commit-conventions -g -y
SKILL.md
Frontmatter
{
    "name": "commit-conventions",
    "description": "AReaL commit message conventions. MUST load on every git commit -- provides Conventional Commits format with scope inference from file paths."
}

Commit Conventions

Commit message conventions and scope inference rules for the AReaL repository.

When to Use

ALWAYS load this skill when making any git commit in AReaL. This includes:

  • Direct commits (git commit)
  • Commits during PR creation ($create-pr / /create-pr)
  • Commits delegated to sub-agents with this skill loaded
  • Any agent workflow that produces a commit

Commit Message Format

<type>(<scope>): <subject>

<body>

[Optional sections:]
Key changes:
- change 1
- change 2

Refs: #123, #456

Type Selection

| Type | When to Use |
| ---- | ----------- |
| feat | New feature or capability |
| fix | Bug fix |
| docs | Documentation only |
| gov | Governance or maintainer changes |
| style | Formatting/style-only changes |
| refactor | Code change without feature/fix |
| perf | Performance improvement |
| test | Adding or fixing tests |
| build | Build system or dependencies |
| ci | CI pipeline or workflow changes |
| chore | Build, deps, config changes |
| revert | Revert a previous commit |

Scope Inference

Infer scope from the primary changed file paths:

| File Path Pattern | Scope |
| ----------------- | ----- |
| areal/workflow/ | workflow |
| areal/engine/ | engine |
| areal/reward/ | reward |
| areal/dataset/ | dataset |
| areal/api/ | api |
| areal/utils/ | utils |
| areal/infra/ | infra |
| areal/trainer/ | trainer |
| areal/models/ | models |
| areal/experimental/ | experimental |
| docs/ | docs |
| examples/ | examples |
| AGENTS.md, .agents/, .claude/, .codex/, .opencode/ | agents |
| Multiple areas | Omit scope or use broader term |
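
The path-to-scope mapping can be sketched as a prefix lookup. `infer_scope` is a hypothetical helper, not part of AReaL. It takes the "omit scope" option when files span several areas or match no known prefix, which is only one of the two choices the table allows.

```python
from typing import Optional

# Prefix -> scope, copied from the scope inference table.
SCOPE_BY_PREFIX = {
    "areal/workflow/": "workflow",
    "areal/engine/": "engine",
    "areal/reward/": "reward",
    "areal/dataset/": "dataset",
    "areal/api/": "api",
    "areal/utils/": "utils",
    "areal/infra/": "infra",
    "areal/trainer/": "trainer",
    "areal/models/": "models",
    "areal/experimental/": "experimental",
    "docs/": "docs",
    "examples/": "examples",
    "AGENTS.md": "agents",
    ".agents/": "agents",
    ".claude/": "agents",
    ".codex/": "agents",
    ".opencode/": "agents",
}


def scope_of(path: str) -> Optional[str]:
    """Return the scope for one path, or None if no prefix matches."""
    for prefix, scope in SCOPE_BY_PREFIX.items():
        if path.startswith(prefix):
            return scope
    return None


def infer_scope(paths: list[str]) -> Optional[str]:
    """Return the single shared scope, or None to signal 'omit the scope'."""
    scopes = {scope_of(p) for p in paths}
    if len(scopes) == 1:
        return scopes.pop()  # may itself be None for unmatched paths
    return None
```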

Rules

  • Subject: imperative mood, ~50-72 chars, no trailing period
  • Body: explain "why" not "what", wrap at 72 chars
  • Key changes: bullet list of main modifications (for complex commits with 3+ files)
  • Refs: reference issues/PRs if applicable
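
The mechanical parts of these rules can be checked with a small script. This is a sketch under assumptions: `check_header` is a hypothetical helper, the 72-character limit is applied to the whole header line, and imperative mood is not checked because it needs human judgment.

```python
import re

# Types copied from the Type Selection table.
TYPES = {
    "feat", "fix", "docs", "gov", "style", "refactor",
    "perf", "test", "build", "ci", "chore", "revert",
}

# <type>(<scope>): <subject>, with the scope optional.
HEADER = re.compile(r"^(?P<type>[a-z]+)(?:\((?P<scope>[^()]+)\))?: (?P<subject>.+)$")


def check_header(header: str) -> list[str]:
    """Return a list of rule violations for the first line of a commit."""
    match = HEADER.match(header)
    if match is None:
        return ["header does not match <type>(<scope>): <subject>"]
    problems = []
    if match["type"] not in TYPES:
        problems.append(f"unknown type: {match['type']}")
    if match["subject"].endswith("."):
        problems.append("subject ends with a period")
    if len(header) > 72:
        problems.append("header longer than 72 characters")
    return problems
```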

Examples

Single file fix:

fix(reward): handle empty completion in gsm8k

Return 0 reward instead of raising exception when
completion string is empty after extraction.

Multi-file feature:

feat(engine): add CPU offload support to ArchonEngine

Enable torch_memory_saver for model offloading during
rollout phase to reduce GPU memory pressure.

Key changes:
- Add offload/onload methods to ArchonEngine
- Integrate with weight update flow
- Handle ROCm compatibility

Docs only:

docs: update algorithm comparison table

Add SAPO and GSPO to the algorithm family documentation
with configuration examples.

Agent/tooling changes:

chore(agents): port review-pr command to OpenCode

Add OpenCode-native commands with task() category
delegation instead of hardcoded model names.

Key changes:
- Create .opencode/command/ with review-pr, create-pr
- Replace hardcoded model routing with platform-native review routing
- Add expert subagent consultation patterns

Governance/maintainer changes:

gov(agents): add maintainer ownership for service modules

Update CODEOWNERS and maintainer references to reflect
current governance responsibilities.

Key changes:
- Add maintainers for agent_service and infra ownership
- Align governance docs with updated reviewer responsibilities

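Creates or updates a GitHub PR for the current branch. The flow covers preflight checks, rebase, squashing commits, generating a convention-compliant commit and PR description, then pushing and calling gh to create or edit the PR; Draft mode is supported.
User asks to create a Pull Request for the current branch; user asks to update an existing Pull Request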
.agents/skills/create-pr/SKILL.md
npx skills add areal-project/AReaL --skill create-pr -g -y
SKILL.md
Frontmatter
{
    "name": "create-pr",
    "description": "Rebase the current branch onto the latest base branch, squash local commits, generate a Conventional Commit message, and create or update the GitHub pull request."
}

Create Pull Request

Use this skill when the user asks to create or update a PR for the current branch.

Inputs

  • Optional --draft
  • Optional --base <branch> (default: main)

Preconditions

  1. Verify the current branch is not main or master.
  2. Check for uncommitted changes with git status --short.
  3. Ensure gh is available.
  4. If there are uncommitted changes, stop and ask the user whether to commit or stash them first.

Workflow

Step 1: Check for an existing PR

  • Run gh pr view --json number,title,url,state,isDraft.
  • If a PR already exists, tell the user before rewriting history or force-pushing.

Step 2: Fetch and rebase

git fetch origin <base>
git rebase origin/<base>

  • If rebase conflicts occur, abort the rebase and stop.
  • Tell the user which files conflicted and ask them to resolve manually.

Step 3: Squash into one commit

git reset --soft origin/<base>

  • Load the commit-conventions skill before generating the commit message.
  • Infer type and scope from the staged diff.
  • Keep the commit subject imperative and under about 72 characters.

Step 4: Generate PR title and body

  • Use the squashed commit message style for the PR title.
  • Follow the repository PR template at .github/PULL_REQUEST_TEMPLATE.md.
  • Summarize user-facing changes, risk areas, test commands run, and skipped suites with reasons.

Step 5: Push and create or update the PR

  • Push the branch.
  • If history was rewritten, confirm before force-pushing.
  • Create or update the PR with gh pr create or gh pr edit.
  • Respect --draft when requested.
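
The create-or-update decision in Step 5 can be sketched as a function that only builds the `gh` argument list, so it can be inspected before anything runs. `build_gh_command` is a hypothetical helper; it assumes the title and body were already generated in Step 4 and uses only the `--title`, `--body`, `--base`, and `--draft` flags.

```python
from typing import Optional


def build_gh_command(
    title: str,
    body: str,
    base: str = "main",
    draft: bool = False,
    existing_pr: Optional[int] = None,
) -> list[str]:
    """Build the gh invocation without executing it."""
    if existing_pr is not None:
        # Updating: edit the existing PR; its draft status is left unchanged.
        return [
            "gh", "pr", "edit", str(existing_pr),
            "--title", title, "--body", body, "--base", base,
        ]
    cmd = ["gh", "pr", "create", "--title", title, "--body", body, "--base", base]
    if draft:
        cmd.append("--draft")
    return cmd
```

The resulting list can be passed to `subprocess.run(cmd, check=True)` once the user has confirmed any force-push.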

Guardrails

  • Never create a PR from main or master.
  • Never silently force-push over an existing PR branch.
  • Never bypass commit-conventions for the squashed commit.
  • If gh authentication or remote permissions fail, stop and report the exact blocker.

Output

Report:

  • Base branch used
  • Final commit message
  • PR title
  • PR URL, if creation or update succeeded
  • Any steps that were skipped or require user follow-up
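A guide to troubleshooting AReaL distributed training (FSDP2/TP/CP/EP), covering deadlocks, wrong results, OOM, and communication failures. Provides a minimal-reproduction strategy, environment configuration, py-spy stack tracing, and gradient checks.
Training hangs or deadlocks; results are inconsistent across GPUs or numerically wrong; OOM errors in distributed setups; NCCL communication errors or device mesh problems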
.agents/skills/debug-distributed/SKILL.md
npx skills add areal-project/AReaL --skill debug-distributed -g -y
SKILL.md
Frontmatter
{
    "name": "debug-distributed",
    "description": "Guide for debugging distributed training issues in AReaL. Use when user encounters hangs, wrong results, OOM, or communication errors."
}

Debug Distributed Training

Debugging guide for distributed training issues in AReaL (FSDP2, TP, CP, EP).

When to Use

This skill is triggered when:

  • Training hangs or deadlocks
  • Results differ across ranks or are numerically wrong
  • OOM errors in distributed settings
  • NCCL/communication errors or device mesh issues

Debugging Principles

Minimal Reproduction

Always follow the minimal demo principle: Reproduce with the least amount of code to narrow down the issue faster.

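# Bad: Debug in full training loop
# Good: Create minimal script (launch with torchrun so LOCAL_RANK is set)
import os

import torch
import torch.distributed as dist

# Bind each rank to its own GPU before creating the NCCL process group
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
dist.init_process_group("nccl")
rank = dist.get_rank()

# Reproduce the exact operation that fails
tensor = torch.ones(10).cuda()
dist.all_reduce(tensor)  # <-- Isolate the failing op
print(f"Rank {rank}: {tensor}")

dist.destroy_process_group()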

Reduction strategy:

  1. Remove unrelated model components
  2. Use small tensor sizes
  3. Reduce world_size to minimum (e.g., 2 GPUs)
  4. Remove torch.compile if possible
  5. Disable activation checkpointing

Step-by-Step Debugging Guide

1. Hang Debugging (Deadlocks, Synchronization)

Environment Variables for Debugging:

# Full debug logging
export TORCH_DISTRIBUTED_DEBUG=DETAIL
export NCCL_DEBUG=INFO
export NCCL_DEBUG_SUBSYS=ALL

# torch.compile debugging
export TORCH_LOGS="+dynamo,recompiles"
export TORCHDYNAMO_VERBOSE=1

Dump Call Stack with py-spy (for hung processes):

# Find process IDs
ps aux | grep python

# Dump call stack of specific rank
py-spy dump --pid <PID>

# Record flame graph for performance analysis
py-spy record -o profile.svg --pid <PID> --duration 30

Common Causes:

  1. Mismatched Collectives: One rank calls all_reduce, another doesn't.
  2. Wrong Process Group: Using wrong group for collective.
  3. Tensor Shape Mismatch: Different shapes across ranks.

Debug Steps:

# Verify group membership
mesh = parallel_dims.get_mesh("dp_shard_cp")
group = mesh.get_group()
print(f"Rank {dist.get_rank()}: group size = {dist.get_world_size(group)}")

# Print shapes on all ranks
print(f"Rank {dist.get_rank()}: tensor.shape = {tensor.shape}")
dist.barrier()

Timeout Adjustment (for debugging only):

from areal.engine.core.distributed import patch_dist_group_timeout
from datetime import timedelta
patch_dist_group_timeout(timedelta(minutes=30))

2. Wrong Results (Gradient, Reduction Issues)

Check DTensor Placements:

from torch.distributed.tensor import DTensor

for name, param in model.named_parameters():
    if isinstance(param, DTensor):
        print(f"Param {name}: placements={param.placements}, mesh={param.device_mesh}")

Verify Gradient Reduction:

for name, param in model.named_parameters():
    if param.grad is not None:
        print(f"Rank {dist.get_rank()}: {name} grad_sum = {param.grad.sum().item()}")

3. OOM Issues (Memory, Sharding)

Check Memory Usage:

print(f"Rank {dist.get_rank()}: "
      f"allocated={torch.cuda.memory_allocated()/1e9:.2f}GB, "
      f"reserved={torch.cuda.memory_reserved()/1e9:.2f}GB")

Check FSDP Coverage:

for name, param in model.named_parameters():
    is_dtensor = isinstance(param, DTensor)
    print(f"{name}: is_dtensor={is_dtensor}, shape={param.shape}")

4. Communication Errors

| Error | Cause | Solution |
| ----- | ----- | -------- |
| NCCL WARN Cuda failure | GPU communication | Check NCCL version, GPU topology |
| RuntimeError: Timed out | Rank synchronization | Increase timeout, check code paths |
| Invalid device mesh | Mesh configuration | Verify world_size = dp * tp * cp |
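
The mesh check in the last row can be run before launching a job. `check_mesh` is a hypothetical helper that follows the formula exactly as written in the table; a setup with additional parallel dimensions would need the corresponding extra factors.

```python
def check_mesh(world_size: int, dp: int, tp: int = 1, cp: int = 1) -> None:
    """Raise if the parallel degrees do not multiply to world_size."""
    product = dp * tp * cp
    if product != world_size:
        raise ValueError(
            f"dp * tp * cp = {dp} * {tp} * {cp} = {product}, "
            f"but world_size = {world_size}"
        )


check_mesh(world_size=8, dp=2, tp=2, cp=2)  # consistent, no error
```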

Debugging Tools

Environment Variables Reference

| Variable | Purpose |
| -------- | ------- |
| TORCH_DISTRIBUTED_DEBUG=DETAIL | Detailed distributed logging |
| NCCL_DEBUG=INFO | NCCL communication logging |
| NCCL_DEBUG_SUBSYS=ALL | All NCCL subsystems |
| TORCH_LOGS="+dynamo,recompiles" | torch.compile logging |
| TORCHDYNAMO_VERBOSE=1 | Dynamo verbose output |
| CUDA_LAUNCH_BLOCKING=1 | Synchronous CUDA (slow, for debugging) |

py-spy for Call Stack Analysis

# Install
pip install py-spy

# Dump call stack of hung process
py-spy dump --pid <PID>

# Dump all Python processes
pgrep -f python | xargs -I {} py-spy dump --pid {}

# Record flame graph
py-spy record -o profile.svg --pid <PID> --duration 30

Rank-Conditional Printing

def print_all_ranks(msg):
    for r in range(dist.get_world_size()):
        if dist.get_rank() == r:
            print(f"[Rank {r}] {msg}")
        dist.barrier()

Check Device Mesh

def debug_mesh(parallel_dims):
    mesh = parallel_dims.world_mesh
    for dim_name in mesh.mesh_dim_names:
        submesh = parallel_dims.get_mesh(dim_name)
        if submesh is not None:
            print(f"Rank {dist.get_rank()}: {dim_name} size={submesh.size()}")

Validate Tensor Consistency

def check_tensor_consistency(tensor, name, group=None):
    local_sum = tensor.sum().item()
    tensor_sums = [None] * dist.get_world_size(group)
    dist.all_gather_object(tensor_sums, local_sum, group=group)
    if dist.get_rank() == 0 and len(set(tensor_sums)) > 1:
        print(f"WARNING: {name} inconsistent: {tensor_sums}")

Key Files Reference

| Component | File |
| --------- | ---- |
| Parallel Dims | areal/experimental/models/archon/parallel_dims.py |
| Expert Parallel | areal/experimental/models/archon/expert_parallel.py |
| Ulysses (CP) | areal/experimental/models/archon/ulysses.py |
| FSDP/TP Apply | areal/experimental/models/archon/qwen2/infra/parallelize.py |
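A read-only PR review workflow with risk analysis, targeted checklists, and Codex subagent consultation. Hard rules forbid modifying the environment; the workflow proceeds in phases: resolve context, analyze changes, plan the review, and produce a findings report ordered by severity.
User requests a code review of the current branch or a specific PR; a PR's risk level and likely failure modes need to be assessed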
.agents/skills/review-pr/SKILL.md
npx skills add areal-project/AReaL --skill review-pr -g -y
SKILL.md
Frontmatter
{
    "name": "review-pr",
    "description": "Read-only pull request review workflow with risk analysis, targeted checklists, and Codex subagent consultation."
}

Review Pull Request

Use this skill when the user asks for a PR review of the current branch or a specific PR.

Inputs

  • Optional PR number
  • Optional --quick to stop after the change analysis phase

Hard Rules

  • Stay read-only.
  • Do not edit files, commit, push, rebase, or change GitHub state.
  • Do not run build, install, or test commands that mutate the environment.
  • Use gh for PR metadata and git diff retrieval.

Reference Files

  • references/review-pr-domains-and-signals.md
  • references/review-pr-templates.md

Workflow

Phase 1: Resolve PR context

  1. Use gh pr view to fetch PR title, body, state, draft status, and changed files.
  2. If no PR exists, stop and report that clearly.
  3. If the PR is closed, stop.
  4. Record the branch name and changed file list.

Phase 2: Change analysis

  1. Classify changed files using references/review-pr-domains-and-signals.md.
  2. Determine the highest overall risk level: CRITICAL, HIGH, MEDIUM, or LOW.
  3. Build a CHANGE_ANALYSIS_REPORT that lists:
    • detected domains/signals
    • risk level
    • affected files
    • related frameworks
    • likely failure modes

If --quick is set, return the change analysis report and stop here.
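
The "highest overall risk level" in Phase 2 is a maximum over per-file levels. The sketch below shows only that reduction. `highest_risk` is a hypothetical helper, and the per-file levels are assumed to come from the classification rules in references/review-pr-domains-and-signals.md, which are not reproduced here.

```python
# Ordered from least to most severe.
RISK_ORDER = ["LOW", "MEDIUM", "HIGH", "CRITICAL"]


def highest_risk(levels: list[str]) -> str:
    """Return the most severe level, defaulting to LOW for an empty list."""
    return max(levels, key=RISK_ORDER.index, default="LOW")


overall = highest_risk(["LOW", "HIGH", "MEDIUM"])
```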

Phase 3: Review planning

  1. Select the smallest useful set of review passes from references/review-pr-templates.md.
  2. Split by risk area, not by file count.
  3. Always include at least one general logic pass.

Phase 4: Expert consultation

Consult the matching Codex subagents registered in .codex/config.toml when relevant:

  • archon-expert
  • fsdp-expert
  • megatron-expert
  • algorithm-expert
  • launcher-expert

If the Codex runtime supports parallel subagent execution, run independent review passes in parallel. Otherwise, execute them serially.

Phase 5: Final review

Produce findings first, ordered by severity:

  1. CRITICAL
  2. HIGH
  3. MEDIUM
  4. LOW

For every finding, include:

  • file path
  • line number when available
  • why it is a bug, regression, or risk
  • concrete fix direction
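
One possible way to hold findings so that severity ordering and the optional line number are handled consistently. The `Finding` class and the sample data are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional

SEVERITY_ORDER = ["CRITICAL", "HIGH", "MEDIUM", "LOW"]


@dataclass
class Finding:
    severity: str
    title: str
    path: str
    line: Optional[int] = None  # "line number when available"

    @property
    def location(self) -> str:
        """Render path:line, or just the path when no line is known."""
        return self.path if self.line is None else f"{self.path}:{self.line}"


def order_findings(findings: list[Finding]) -> list[Finding]:
    """Sort by severity; the stable sort keeps discovery order within a level."""
    return sorted(findings, key=lambda f: SEVERITY_ORDER.index(f.severity))


findings = order_findings([
    Finding("LOW", "Unused import", "pkg/a.py", 3),
    Finding("CRITICAL", "Collective skipped on some ranks", "pkg/b.py", 120),
    Finding("HIGH", "Missing await", "pkg/c.py"),
])
```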

What to Ignore

  • Pure style nits with no correctness impact
  • Issues outside the changed scope unless the PR makes them worse
  • Failures that standard linters or formatters would already catch
  • Speculative concerns with no concrete trigger in the diff

Output Shape

Use this structure:

CHANGE_ANALYSIS_REPORT:
- detected_domains: [...]
- detected_signals: [...]
- risk_level: ...
- affected_files: [...]
- related_frameworks: [...]
- identified_risks: [...]

Findings
1. [severity] Title — path:line
   - Problem: ...
   - Fix: ...

Open Questions
- ...

Residual Risk
- ...
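Translates English Markdown documents under docs/en/ into Chinese versions under docs/zh/. Supports full translation and incremental updates, preserves technical terms, code blocks, and Markdown structure, and handles directory creation and path mapping automatically.
User asks to translate an English document; content under docs/en/ needs to be synced to docs/zh/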
.agents/skills/translate-doc-zh/SKILL.md
npx skills add areal-project/AReaL --skill translate-doc-zh -g -y
SKILL.md
Frontmatter
{
    "name": "translate-doc-zh",
    "description": "Translate an English document under `docs\/en\/` into the matching Chinese document under `docs\/zh\/`."
}

Translate Docs EN to ZH

Use this skill when the user asks to translate a document from docs/en/ to docs/zh/.

Input

  • A markdown file path under docs/en/

Workflow

Step 1: Validate the source path

  1. Confirm the file exists.
  2. Confirm it is under docs/en/.
  3. Confirm it ends with .md.
  4. If any check fails, stop and ask the user for a valid docs/en/...md path.

Step 2: Resolve the target path

  • Source: docs/en/<path>.md
  • Target: docs/zh/<path>.md
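
The path checks from Step 1 and the mapping from Step 2 can be sketched together. `resolve_target` is a hypothetical helper; it works purely on the path string, so the "file exists" check is left out and would need a filesystem call.

```python
from pathlib import PurePosixPath


def resolve_target(source: str) -> str:
    """Map docs/en/<path>.md to docs/zh/<path>.md, validating the source."""
    path = PurePosixPath(source)
    if path.parts[:2] != ("docs", "en") or path.suffix != ".md":
        raise ValueError(f"expected a docs/en/...md path, got: {source}")
    # Keep everything after docs/en/ unchanged.
    return str(PurePosixPath("docs", "zh", *path.parts[2:]))


target = resolve_target("docs/en/guide/quickstart.md")
```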

Step 3: Choose translation mode

  • If the Chinese file already exists, update only the changed parts while preserving the rest.
  • If the Chinese file does not exist, translate the full document.

Translation Rules

  • Preserve technical terms such as FSDP, FSDP2, GRPO, PPO, DAPO, MoE, LLM, RL, RLVR, Codex, Claude Code, OpenCode, Megatron, Archon, SGLang, vLLM, PyTorch, HuggingFace, and Transformers.
  • Do not translate file paths, code blocks, CLI flags, or literal configuration keys.
  • Preserve Markdown structure, tables, and fenced code blocks.
  • Use concise, professional Chinese terminology.

Error Handling

  • If the target directory does not exist, create it before writing the translated file.
  • If the source document is partially translated already, preserve sections that do not need updates.
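Upgrades the runtime dependencies of the AReaL project. Parses package-version arguments, maintains the dual SGLang/vLLM manifest model, updates the pyproject files and lock files, reworks the Dockerfile, and runs an API compatibility audit for core packages to ensure structural completeness.
User requests upgrading specific Python package versions; inference backend dependencies (such as sglang or vllm) need updating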
.agents/skills/upgrade-deps/SKILL.md
npx skills add areal-project/AReaL --skill upgrade-deps -g -y
SKILL.md
Frontmatter
{
    "name": "upgrade-deps",
    "description": "Upgrade focused runtime dependencies in AReaL. First validates and updates per-package API checklists for structural completeness, then updates pyproject files, resolves conflicts, locks, updates the Dockerfile, and audits API compatibility against the checklists."
}

Usage

/upgrade-deps <package==version> [<package==version> ...]

Arguments: One or more pinned package versions, e.g., /upgrade-deps megatron-core==0.17.0 sglang==0.5.10 vllm==0.18.0 transformers==4.58.0.

If a package is omitted, its current version can be preserved or upgraded, depending on the resolution of uv lock.


Architecture

Dual-Manifest Model

AReaL maintains two pyproject files because SGLang and vLLM pin mutually-incompatible torch / torchao versions:

| File | Inference backend | Lock file |
| ---- | ----------------- | --------- |
| pyproject.toml | SGLang (default) | uv.lock |
| pyproject.vllm.toml | vLLM | uv.vllm.lock |

Both share the same core dependencies, megatron extras, and dev group. They diverge only in inference backend extras and torch/torchao version constraints.

The Dockerfile builds both variants from a single file using ARG VARIANT (sglang or vllm). The base image, torch install, and flash-attn wheels are all variant-specific.

Focused Packages

The following packages are focused — their API usage in AReaL is cataloged, and any version change triggers the API compatibility audit (Step 6):

| Package | Import path |
| ------- | ----------- |
| megatron-core | megatron.core |
| megatron-bridge | megatron.bridge |
| mbridge | mbridge |
| transformers | transformers |
| sglang | sglang |
| vllm | vllm |
| peft | peft |
| torchao | torchao |

torch is tracked in the Package Impact Matrix below for scope and Docker awareness, but is not a focused package — it does not receive API auditing.

Package Impact Matrix

Every entry below has a variant scope that determines which files to edit, which lock files to regenerate, and whether the Dockerfile needs review. All focused packages plus torch (tracked for Docker impact only, not API-audited) are listed.

| Package | Scope | pyproject.toml locations | pyproject.vllm.toml locations | Docker impact |
| ------- | ----- | ------------------------ | ----------------------------- | ------------- |
| sglang | sglang-only | [optional-deps].sglang | | Base image |
| vllm | vllm-only | | [optional-deps].vllm | No |
| megatron-core | shared | [optional-deps].megatron + [tool.uv].override-dependencies | [optional-deps].megatron + [tool.uv].override-dependencies | No |
| megatron-bridge | shared | [optional-deps].megatron | [optional-deps].megatron | No |
| mbridge | shared | [optional-deps].megatron (git pin) | [optional-deps].megatron (git pin) | No |
| transformers | shared | [project].dependencies | [project].dependencies | No |
| peft | shared | [project].dependencies | [project].dependencies | No |
| torch | shared-divergent | [project].dependencies (≥2.9.1) | [project].dependencies (≥2.10.0) | Stage 1 torch |
| torchao | shared-divergent | [tool.uv].override-dependencies (==0.15.0) | [project].dependencies (==0.16.0) | No |

Scope rules:

  • sglang-only / vllm-only: Edit and lock only the affected variant.
  • shared: Edit both files; lock both variants.
  • shared-divergent: The two variants intentionally use different versions of the same package. If the user supplies a single target version, ask which variant(s) to apply it to rather than assuming convergence. If the user supplies two targets (e.g., torch==2.9.1@sglang torch==2.10.0@vllm), apply each accordingly.
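
The scope rules reduce to a lookup from package to the variants whose files need editing and re-locking. The table contents are copied from this document; `affected_variants` is a hypothetical helper, and raising an error stands in for "ask the user".

```python
from typing import Optional

# Variant -> (pyproject file, lock file), from the Dual-Manifest Model table.
FILES = {
    "sglang": ("pyproject.toml", "uv.lock"),
    "vllm": ("pyproject.vllm.toml", "uv.vllm.lock"),
}

# Package -> scope, from the Package Impact Matrix.
SCOPE = {
    "sglang": "sglang-only",
    "vllm": "vllm-only",
    "megatron-core": "shared",
    "megatron-bridge": "shared",
    "mbridge": "shared",
    "transformers": "shared",
    "peft": "shared",
    "torch": "shared-divergent",
    "torchao": "shared-divergent",
}


def affected_variants(package: str, variant: Optional[str] = None) -> list[str]:
    """Return the variants to edit and lock for one package upgrade."""
    scope = SCOPE[package]
    if scope == "sglang-only":
        return ["sglang"]
    if scope == "vllm-only":
        return ["vllm"]
    if scope == "shared":
        return ["sglang", "vllm"]
    # shared-divergent: never assume the two variants converge.
    if variant is None:
        raise ValueError(f"{package} is shared-divergent: ask which variant(s)")
    return [variant]


files_to_touch = [FILES[v] for v in affected_variants("transformers")]
```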

Upgrade Families

Some packages are tightly coupled and should be upgraded together:

| Family | Members | Reason |
| ------ | ------- | ------ |
| megatron | megatron-core, megatron-bridge | megatron-bridge wraps megatron-core; tightly coupled APIs |
| inference | sglang or vllm + torch + torchao | Inference backends pin specific torch/torchao versions |

When the user upgrades one member, check the checklist of all family members for required co-upgrades and warn if a co-upgrade is needed but not requested.
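
A sketch of the family check: given the requested packages, list the family members that were not requested and should therefore be looked at. `co_upgrade_candidates` is a hypothetical helper. It only produces candidates, since whether a co-upgrade is actually required still has to be read from the checklists. The two inference backends are treated as alternatives, so neither is ever suggested because the other was requested.

```python
FAMILIES = {
    "megatron": {"megatron-core", "megatron-bridge"},
    "inference": {"sglang", "vllm", "torch", "torchao"},
}

# sglang and vllm live in separate variants and are alternatives to each other.
BACKENDS = {"sglang", "vllm"}


def co_upgrade_candidates(requested: set[str]) -> dict[str, set[str]]:
    """Map each touched family to its members that were not requested."""
    result = {}
    for family, members in FAMILIES.items():
        if not members & requested:
            continue  # family not touched by this upgrade
        missing = members - requested - BACKENDS
        if missing:
            result[family] = missing
    return result
```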

Per-Package API Checklists

Each focused package has a dedicated markdown file under checklists/ that documents how AReaL uses that package's APIs:

.agents/skills/upgrade-deps/
├── SKILL.md
└── checklists/
    ├── megatron-core.md
    ├── megatron-bridge.md
    ├── vllm.md
    ├── sglang.md
    ├── transformers.md
    ├── peft.md
    └── torchao.md

Each checklist file MUST contain the following sections:

---
package: <pip-package-name>
github: <org/repo>                    # e.g., NVIDIA/Megatron-LM
branch_template: "v${VERSION}"        # how to construct the git branch/tag from version
upstream_paths:                        # source paths to cross-reference
  - path/to/relevant/file.py
  - path/to/relevant/module/
---

## Affected Files

### Primary (most likely to break)
| File | Imports / Usage |
| ---- | --------------- |
| ...  | ...             |

### Secondary
...

### Tertiary (tests, infra)
...

## API Usage Catalog

For each function/class, verify the call signature against the upstream source.
Focus on: removed params, renamed params, new required params, changed return types,
moved/renamed modules.

### 1. `module.submodule.function_or_class`

**Source:** `upstream_path/file.py`

Called in `areal/path/to/file.py:LINE`:
\```python
actual_call_site_code()
\```

**Check:** [what to verify]

### 2. ...

## Version-Guarded Code (if any)
- [file:line] description of version guard and when it can be removed

To populate a checklist, catalog AReaL's usage of the package by grepping for imports and call sites, then fill in the template sections. Each checklist should be self-contained — all API signatures, file paths, and upstream references must be recorded directly in the checklist file.


Workflow

Step 0: Parse Input and Record Baseline

  1. Parse arguments. Each argument is <package==version> or, for shared-divergent packages, <package==version@variant> (e.g., torchao==0.16.0@vllm).
  2. Validate that each package name is a recognized focused package (see Focused Packages table). If unrecognized, warn but proceed — it may be a transitive dependency the user wants to pin.
  3. For shared-divergent packages (torch, torchao): if the user supplies a single version without a @variant qualifier, ask which variant(s) to apply it to before proceeding.
  4. Record the current resolved versions of ALL focused packages from the applicable lock file(s) as the baseline snapshot. Use uv.lock for sglang-scoped and shared packages; uv.vllm.lock for vllm-scoped and shared packages. Packages scoped to a single variant appear only in that variant's lock file. This baseline will be used later to detect which packages actually changed — including transitive bumps caused by uv lock resolution, not just explicit pin changes.
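
The argument grammar from items 1 and 3 can be sketched as a small parser. `Pin`, `parse_pin`, and `needs_variant_prompt` are hypothetical names, and the variant names are taken from the Dockerfile's `ARG VARIANT` values mentioned earlier.

```python
from typing import NamedTuple, Optional

SHARED_DIVERGENT = {"torch", "torchao"}
VARIANTS = {"sglang", "vllm"}


class Pin(NamedTuple):
    package: str
    version: str
    variant: Optional[str]


def parse_pin(arg: str) -> Pin:
    """Parse <package==version> or <package==version@variant>."""
    spec, _, variant = arg.partition("@")
    package, sep, version = spec.partition("==")
    if not sep or not package or not version:
        raise ValueError(f"expected <package==version>, got: {arg}")
    if variant and variant not in VARIANTS:
        raise ValueError(f"unknown variant: {variant}")
    return Pin(package, version, variant or None)


def needs_variant_prompt(pin: Pin) -> bool:
    """True when a shared-divergent package was given without @variant."""
    return pin.package in SHARED_DIVERGENT and pin.variant is None
```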

Step 0.5: Validate and Update Checklists

Before modifying any dependencies, verify that the API checklists for the packages being upgraded are structurally complete — i.e., they document all current AReaL import sites and call patterns. Stale checklists lead to missed breaking changes in Step 6.

For each focused package explicitly requested in the command that has a checklist file under checklists/:

  1. Discover all usages. Grep the AReaL codebase (areal/, tests/, examples/) for all imports matching the package's import path(s). See the Import Patterns table in CHECKLIST_MAINTENANCE.md § 2 for package-specific grep patterns. For HTTP-based integrations (sglang, vllm), also scan request-building code for endpoint paths and JSON field names.

  2. Compare against the checklist. Diff the discovered files against the Affected Files tables (Primary / Secondary / Tertiary) in the checklist. Identify:

    • Missing — files that import the package but are not listed in any tier.
    • Stale — files listed in the checklist that no longer import the package.
    • Changed — files whose actual imports differ from what's documented.
  3. Classify and update. For each discrepancy, follow the Structural Validation Procedure in CHECKLIST_MAINTENANCE.md § 3:

    • Add missing files to the appropriate tier (Primary / Secondary / Tertiary).
    • Add new API Usage Catalog entries for genuinely new call patterns.
    • Remove stale entries.
    • Renumber catalog entries if needed.
  4. Update the Checklist File Status table at the bottom of this file if entry counts changed.

  5. Report changes before proceeding:

    Checklist validation for <package>:
    - Added N files to Affected Files (P primary, S secondary, T tertiary)
    - Added M new API catalog entries: [brief list]
    - Removed K stale entries: [brief list]
    - No changes needed (if clean)
    

If a requested focused package does not have a checklist file, create one from checklists/_TEMPLATE.md using the full procedure in CHECKLIST_MAINTENANCE.md § 5.

Scope note: This step validates only the packages explicitly named in the command. Packages that are transitively bumped are identified later in Step 5; their checklists are validated at that point using the same procedure before the API audit in Step 6.
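
The comparison in item 2 of this step is a three-way set difference. The sketch below assumes both sides have already been reduced to a mapping from file path to the set of imported names; `diff_checklist` and the file names in the example are hypothetical.

```python
def diff_checklist(
    discovered: dict[str, set[str]],
    documented: dict[str, set[str]],
) -> dict[str, list[str]]:
    """Classify files as missing, stale, or changed relative to the checklist."""
    found, listed = set(discovered), set(documented)
    return {
        # Imports the package but is not listed in any tier.
        "missing": sorted(found - listed),
        # Listed in the checklist but no longer imports the package.
        "stale": sorted(listed - found),
        # Listed, but the actual imports differ from what is documented.
        "changed": sorted(f for f in found & listed if discovered[f] != documented[f]),
    }


report = diff_checklist(
    discovered={"a.py": {"AutoConfig"}, "b.py": {"AutoTokenizer", "AutoConfig"}},
    documented={"b.py": {"AutoTokenizer"}, "c.py": {"AutoModel"}},
)
```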

Step 1: Update Pyproject Files

For each requested package, using the Package Impact Matrix:

  1. Determine the variant scope (sglang-only, vllm-only, shared, shared-divergent).
  2. Edit the version pin in all declaration locations for each affected pyproject file. A single package may appear in multiple locations:
    • [project].dependencies
    • [project.optional-dependencies].<extra>
    • [tool.uv].override-dependencies
  3. For upgrade families: if megatron-core is being upgraded, also check whether megatron-bridge needs a corresponding version bump. Warn if it does but was not included in the command.

Step 2: Lock Dependencies (Variant-Aware)

Regenerate lock files for each affected variant. A variant is affected if any of its pyproject file's dependencies were modified. If conflicts arise during locking, do NOT attempt to resolve them in this step — just report them and defer resolution to Step 3.

SGLang variant (if pyproject.toml was modified):

bash scripts/uv_lock.sh

vLLM variant (if pyproject.vllm.toml was modified):

bash scripts/uv_lock.sh

If uv lock fails for either variant:

  1. Read the error message carefully.
  2. If it's a version conflict, proceed to Step 3 to resolve, then return here to re-lock.
  3. Do NOT modify dependencies without user approval to make the lock succeed.

Validation: After locking, verify both lock files exist and are non-empty.

Step 3: Resolve Dependency Conflicts

Resolve any conflicts from Step 2. After all conflicts are resolved, return to Step 2 and re-lock the affected variant(s). Classify each conflict:

Auto-resolvable — only AReaL's pin conflicts with an upstream package, and the upstream's required version is acceptable. Update AReaL's pin automatically.

Needs user input — two upstream packages have mutual conflicts (e.g., sglang requires torch==2.9.1 but vllm requires torch==2.10.0). Summarize and ask the user.

Output format:

Summary

---
Auto-resolved (no action required):
- <name>: <packageA> requires <versionA>, <packageB> requires <versionB>,
  AReaL specified <oldVersion>, updated to <newVersion>
- ...

---
Conflicts (need user resolution):
- <name>: <packageA> requires <versionA>, <packageB> requires <versionB>
- ...

You may use override-dependencies in [tool.uv] to force-pin versions where needed. Remember that sglang and vllm are separate variants maintained in different pyproject files — they are never installed together in the same environment.

Step 4: Update Dockerfile (If Required)

Review the Package Impact Matrix "Docker impact" column. Only proceed with Dockerfile changes if an upgraded package has Docker impact. The most common triggers are:

Base image change (triggered by sglang upgrade):

The Dockerfile base image is lmsysorg/sglang:v{SGLANG_VERSION}-cu129-amd64-runtime. If sglang was upgraded, update line 9 of the Dockerfile:

FROM lmsysorg/sglang:v{NEW_SGLANG_VERSION}-cu129-amd64-runtime

Verify that the new base image tag exists on Docker Hub / GHCR before committing.

Torch version change (triggered by torch upgrade):

The Dockerfile Stage 1 installs torch with variant-specific versions. Update the version mapping in the RUN uv venv command:

&& if [ "$VARIANT" = "vllm" ]; then TORCH_VER="{VLLM_TORCH}"; else TORCH_VER="{SGLANG_TORCH}"; fi \

Also check flash-attn wheel compatibility — both flash-attn-2 and flash-attn-3 installs use a TORCH_TAG that must match the torch major.minor version. Update both occurrences in the Dockerfile:

# flash-attn-2 (first flash-attn RUN block)
&& if [ "$VARIANT" = "vllm" ]; then TORCH_TAG="torch{VLLM_TORCH_MAJOR_MINOR}"; else TORCH_TAG="torch{SGLANG_TORCH_MAJOR_MINOR}"; fi \

# flash-attn-3 (second flash-attn RUN block — same TORCH_TAG pattern)
&& if [ "$VARIANT" = "vllm" ]; then TORCH_TAG="torch{VLLM_TORCH_MAJOR_MINOR}"; else TORCH_TAG="torch{SGLANG_TORCH_MAJOR_MINOR}"; fi \
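To illustrate the major.minor matching rule, here is a small sketch that derives a tag from a full torch version. The `torch<major>.<minor>` shape follows the `TORCH_TAG` pattern shown above; the helper name `torch_tag` is hypothetical, and the exact tag used in the wheel filenames should be confirmed against the Dockerfile.

```python
def torch_tag(torch_version: str) -> str:
    """Map a full torch version to its major.minor tag, e.g. '2.9.1' -> 'torch2.9'."""
    # Drop any local build suffix such as '+cu129' before splitting
    public = torch_version.split("+", 1)[0]
    major, minor = public.split(".")[:2]
    return f"torch{major}.{minor}"


if __name__ == "__main__":
    print(torch_tag("2.9.1"))  # torch2.9
```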

No Dockerfile changes needed for: megatron-core, megatron-bridge, transformers, peft, vllm, torchao. These are installed via uv pip install -r pyproject.toml in Stage 3, which reads the updated pyproject automatically.

Step 5: Identify Updated Focused Packages

Compare the baseline snapshot (Step 0) against the resolved versions in the lock files (uv.lock and/or uv.vllm.lock) produced by Step 2 (or re-locked after Step 3 conflict resolution). This catches not only explicitly requested upgrades but also transitive version bumps — e.g., upgrading sglang may pull in a newer transformers through dependency resolution.

Build a list of focused packages whose resolved version actually changed. This list determines which API checklists to audit in Step 6.

The focused packages to check are listed in the Focused Packages table (Architecture section):

  • megatron-core (imports as megatron.core)
  • megatron-bridge (imports as megatron.bridge)
  • transformers
  • sglang
  • vllm
  • peft
  • torchao

If a package version did NOT change (even if it was in the user's input but resolved to the same version), skip its API audit.

For any newly-identified package whose checklist was NOT already validated in Step 0.5, run the same structural validation procedure (see CHECKLIST_MAINTENANCE.md § 3) on its checklist before proceeding to Step 6.

Step 6: API Compatibility Audit

For each updated focused package that has a checklist file under checklists/:

6a. Clone upstream source

Read the checklist frontmatter to get github and branch_template. Clone or checkout the target version:

REPO_ROOT=$(pwd)
PKG_DIR="${REPO_ROOT}/<package>-src"
VERSION="<target_version>"
# Validate VERSION to prevent command injection
if [[ ! "$VERSION" =~ ^[a-zA-Z0-9._/-]+$ ]]; then
  echo "Error: Invalid version format: $VERSION"; exit 1
fi
# Use "|" as the sed delimiter: the validation above allows "/" in VERSION
BRANCH=$(echo "<branch_template>" | sed "s|\${VERSION}|$VERSION|")
if [ ! -d "$PKG_DIR" ]; then
  git clone --depth 1 --branch "$BRANCH" "https://github.com/<github>.git" "$PKG_DIR"
else
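  # An existing clone is shallow and single-branch, so fetch the target ref explicitly
  (cd "$PKG_DIR" && git fetch --depth 1 origin "$BRANCH" && git checkout FETCH_HEAD)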
fi

If cloning fails (tag doesn't exist, etc.), report to the user immediately.

6b. Audit API signatures

For EACH entry in the checklist's API Usage Catalog:

  1. Open the upstream source file at the target version (paths listed in the checklist's upstream_paths frontmatter).
  2. Compare the function/class signature against the current AReaL invocation.
  3. Flag any of:
    • Removed parameters still passed by AReaL → must remove from call site
    • Renamed parameters → must rename in call site
    • New required parameters (no default) → must add to call site
    • New optional parameters with useful defaults → document but skip
    • Changed return types → must update consumers
    • Removed functions/classes → must find replacement
    • Moved modules → must update import paths
    • Changed method signatures on returned objects → must update call sites
  4. Record findings per-file.
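Part of the comparison in item 3 can be mechanized. The sketch below uses the standard-library `inspect` module to diff two callables; it assumes both versions are importable as Python objects, which will often not hold when the upstream source is only cloned, so treat it as an illustration of the categories rather than the audit procedure itself. `diff_signatures`, `load_v1`, and `load_v2` are hypothetical.

```python
import inspect


def diff_signatures(old_fn, new_fn) -> dict[str, list[str]]:
    """Compare two callables and report parameter-level differences."""
    old = inspect.signature(old_fn).parameters
    new = inspect.signature(new_fn).parameters
    empty = inspect.Parameter.empty
    variadic = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    added = [n for n in new if n not in old]
    return {
        "removed": [n for n in old if n not in new],
        "new_required": [
            n for n in added
            if new[n].default is empty and new[n].kind not in variadic
        ],
        "new_optional": [n for n in added if new[n].default is not empty],
    }


# Stand-in functions representing the old and new upstream versions
def load_v1(path, strict=True, device="cpu"): ...
def load_v2(path, mesh, map_location="cpu", *, strict=True): ...


if __name__ == "__main__":
    print(diff_signatures(load_v1, load_v2))
```

A rename shows up as one removed and one added parameter, so renames, changed return types, and moved modules still need a manual read of the upstream source.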

6c. Check version-guarded code

If the checklist has a "Version-Guarded Code" section, check whether any guards reference versions at or below the new target. If so, verify the upstream fix is present and note the dead code for cleanup.

6d. Apply code changes (if any)

For each flagged incompatibility:

  1. Update the call site in the affected AReaL file.
  2. Preserve existing behavior — do NOT refactor beyond what's required.
  3. If a function was removed, check the upstream migration guide or changelog.
  4. Priority order: engine layer → model layer → infra layer → test files.

If there are unresolvable breaking changes, STOP and ask the user before proceeding.

6e. Update checklist file

Update the checklist file to reflect the post-upgrade state. Follow the Content Update Procedure in CHECKLIST_MAINTENANCE.md § 4:

  1. Update API signatures in catalog entries where upstream changed.
  2. Update call-site code snippets where Step 6d modified AReaL code.
  3. Update version-guarded code entries (remove cleaned-up guards, add new ones).
  4. Update frontmatter upstream_paths if source files moved.
  5. Update the entry count in the Checklist File Status table (bottom of this file).

This ensures the checklist remains an accurate reference for future upgrades.

6f. Clean up cloned repositories

Remove the cloned upstream source directories to avoid cluttering the workspace:

rm -rf "${REPO_ROOT}/<package>-src"

Step 7: Run Pre-Commit

pre-commit run --all-files

Step 8: Generate Upgrade Summary

Dump a formatted markdown summary to upgrade-summary.md in the repository root (this file is gitignored and ephemeral). The summary MUST include:

## Dependency Upgrade Summary

**Date:** YYYY-MM-DD
**Requested:** <original command>

### Version Changes

| Package | Old Version | New Version | Variant(s) |
| ------- | ----------- | ----------- | ---------- |
| ...     | ...         | ...         | ...        |

### Dependency Resolution

- <auto-resolved change description>
- ...

### Dockerfile Changes

- <change description, or "No changes required">

### API Compatibility Audit

#### <package-name> (old → new)

**Breaking changes found:**
- [file:line] description of change

**Module moves / renames:**
- [old_path] → [new_path]

**Version-guarded code:**
- [file:line] status (still needed / can be removed)

**No breaking changes found** _(if clean)_

#### ...

### Unresolved Issues (if any)

- <description of issue and why it could not be auto-resolved>

If the upgrade failed at any step, the summary should still be generated with the failure reason clearly documented in the "Unresolved Issues" section.

Step 9: Create PR and Trigger CI (Optional)

Ask the user if they want to create a PR.

If the user agrees:

  1. Load the create-pr skill to create the PR.
  2. Trigger the CI workflow manually via .github/workflows/build-docker-image.yml (only if the Dockerfile was modified or inference backend versions changed).
  3. The Docker build CI builds both sglang and vllm images, then automatically triggers testing on each. Debug until the overall workflow succeeds.
  4. If you encounter issues that cannot be resolved, ask the user for help.

Checklist File Status

| Package | Checklist file | Status |
| ------- | -------------- | ------ |
| megatron-core | checklists/megatron-core.md | ✅ 18 API entries (parallel_state, DDP, optimizer, pipeline, checkpointing, transformer config, FP8, GPTModel, tensor_parallel, layer specs, RoPE) |
| megatron-bridge | checklists/megatron-bridge.md | ✅ 7 API entries (AutoBridge, LoRA, save/load HF, monkey-patch guard) |
| mbridge | checklists/mbridge.md | ✅ 14 API entries (AutoBridge, Bridge properties, weight mappings, LLMBridge subclassing, register_model, monkey-patch target) |
| vllm | checklists/vllm.md | ✅ 14 API entries (entrypoints, LoRA manager, worker V0/V1, tool parsers, CLI) |
| sglang | checklists/sglang.md | ✅ 14 API entries (HTTP endpoints, tool/reasoning parsers, CLI flags, version guards) |
| transformers | checklists/transformers.md | ✅ 12 API entries (Auto* classes, tokenizer, flash attention monkey-patches, Qwen VL internals, LR schedulers) |
| peft | checklists/peft.md | ✅ 4 API entries (LoraConfig, TaskType, get_peft_model, weight key format) |
| torchao | checklists/torchao.md | ✅ 5 API entries (fp8_blockwise_mm, enable_fp8_linear/experts, shard validation, Triton kernels) |
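Guides adding a new HuggingFace model architecture to the Archon training engine. Applies when a new model family must be supported or a new ModelSpec added. The steps include analyzing the target model's architecture (attention, FFN, MoE, and similar features), selecting a reference implementation and copying its directory structure as a starting point, and confirming that the model follows the standard transformer design.
User asks how to add a model to Archon; user wants ArchonEngine to support a new model family; user mentions adding a new ModelSpec or model type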
.claude/skills/add-archon-model/SKILL.md
npx skills add areal-project/AReaL --skill add-archon-model -g -y
SKILL.md
Frontmatter
{
    "name": "add-archon-model",
    "description": "Guide for adding a new model to the Archon engine. Use when user wants to add support for a new HuggingFace model architecture in ArchonEngine."
}

Add Archon Model

Add support for a new HuggingFace model architecture in the Archon training engine.

When to Use

This skill is triggered when:

  • User asks "how do I add a model to Archon?"
  • User wants to support a new model family (e.g., Llama, Mistral, DeepSeek) in ArchonEngine
  • User mentions adding a new ModelSpec or model type for Archon

Prerequisites

Before starting, ensure:

  • The target model is available on HuggingFace (has config.json with model_type)
  • You know the HuggingFace model ID (e.g., meta-llama/Llama-3-8B)
  • The model uses a standard transformer architecture (decoder-only)

Step-by-Step Guide

Step 1: Analyze the Target Model Architecture

Read the HuggingFace model's source code to extract key architecture information.

Action: Fetch and analyze the model's HuggingFace configuration and modeling files.

  1. Read the model's config.json (via AutoConfig.from_pretrained) to identify:

    • model_type string (this is the key used for registry lookup)
    • All architecture hyperparameters (hidden_size, num_layers, etc.)
    • Any model-specific fields (e.g., qk_norm, attention_bias, MoE fields)
  2. Read the HuggingFace modeling_*.py source to identify:

    • Attention variant: Does it have Q/K norm? Attention bias? Sliding window? Multi-latent attention?
    • FFN variant: SwiGLU (gate_proj + up_proj + down_proj)? GeGLU? Standard MLP?
    • MoE support: Does it have MoE layers? What router type? Shared experts?
    • RoPE variant: Standard RoPE? YaRN? NTK-aware scaling? What is the inv_freq formula?
    • Normalization: RMSNorm or LayerNorm? Pre-norm or post-norm? Elementwise affine?
    • Weight tying: Does tie_word_embeddings appear in config?
    • State dict key names: What are the HF weight key naming conventions?
  3. Summarize findings in a checklist like:

Target model: <name>
HF model_type: "<model_type>" (and variants like "<model_type>_moe" if applicable)
Attention: [standard GQA / with QK norm / with bias / sliding window / ...]
FFN: [SwiGLU / GeGLU / standard MLP / ...]
MoE: [no / yes - num_experts, top_k, shared_experts]
RoPE: [standard / YaRN / NTK-aware / ...]
Norm: [RMSNorm / LayerNorm] with [pre-norm / post-norm]
Weight tying: [yes / no]
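Some checklist entries can be pre-filled from config.json alone. The following is a rough sketch, assuming the config has already been parsed into a dict; `summarize_hf_config` is a hypothetical helper, and the field names it probes (`attention_bias`, `num_experts`, `num_local_experts`, `sliding_window`, `rope_scaling`, `tie_word_embeddings`) are common HuggingFace config keys that a given model may or may not define. QK norm, FFN type, and normalization placement still have to be read from the modeling file.

```python
import json


def summarize_hf_config(config: dict) -> dict[str, object]:
    """Build a rough architecture checklist from a parsed config.json."""
    num_experts = config.get("num_experts") or config.get("num_local_experts")
    rope_scaling = config.get("rope_scaling") or {}
    return {
        "model_type": config.get("model_type"),
        "attention_bias": bool(config.get("attention_bias", False)),
        "sliding_window": config.get("sliding_window"),
        "moe": bool(num_experts),
        "num_experts": num_experts,
        # Newer configs use "rope_type", older ones use "type"
        "rope": rope_scaling.get("rope_type") or rope_scaling.get("type") or "standard",
        "tie_word_embeddings": bool(config.get("tie_word_embeddings", False)),
    }


if __name__ == "__main__":
    raw = '{"model_type": "qwen3_moe", "num_experts": 128, "rope_scaling": null}'
    for field, value in summarize_hf_config(json.loads(raw)).items():
        print(f"{field}: {value}")
```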

Step 2: Select the Reference Model

Choose the closest existing implementation as a starting point:

| Target characteristics | Reference | Why |
| ---------------------- | --------- | --- |
| Dense-only, standard GQA, no QK norm | qwen2 | Simplest baseline, pure dense |
| Has QK norm, or has MoE support | qwen3 | Supports QK norm + MoE + shared experts |

Action: Copy the reference model directory as the starting point:

areal/experimental/models/archon/<model>/
  __init__.py
  spec.py
  model/
    args.py
    model.py
    rope.py
    state_dict_adapter.py
  infra/
    parallelize.py

Step 3: Implement args.py

Adapt <Model>ModelArgs to match the target model's HuggingFace config fields.

Key changes from reference:

  1. Update the @dataclass fields to match the target model's hyperparameters:

    • Field names should use Archon conventions (dim, n_layers, n_heads, n_kv_heads, vocab_size, head_dim, hidden_dim, norm_eps, rope_theta, etc.)
    • Default values should match the smallest variant of the target model
    • Add model-specific fields (e.g., attention_bias, qk_norm, sliding_window)
  2. Update from_hf_config() to correctly map HuggingFace config attributes:

    • Use getattr(hf_config, "field_name", default) for optional fields
    • Handle variant-specific fields (e.g., MoE fields only present in MoE variants)
    • The method must return an instance of the model args class

Critical: Verify every field mapping against the HF model's config.json. Incorrect mappings here cause silent errors downstream.

Base class contract (BaseModelArgs):

@dataclass
class <Model>ModelArgs(BaseModelArgs):
    # ... model-specific fields ...

    @classmethod
    def from_hf_config(
        cls,
        hf_config: PretrainedConfig,
        is_critic: bool = False,
        **kwargs,
    ) -> <Model>ModelArgs:
        # Map HF config fields to Archon model args
        ...
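To make the mapping concrete, here is a self-contained sketch of a `from_hf_config()` for a Llama-style dense model. `ToyModelArgs` is a hypothetical stand-in that does not subclass `BaseModelArgs`, its defaults are arbitrary, and the HuggingFace attribute names (`hidden_size`, `num_hidden_layers`, `num_attention_heads`, `num_key_value_heads`, `intermediate_size`, `rms_norm_eps`) are the usual ones for this family and must be verified against the target model's config.json.

```python
from dataclasses import dataclass
from types import SimpleNamespace


@dataclass
class ToyModelArgs:
    """Hypothetical stand-in; a real class would subclass BaseModelArgs."""

    dim: int = 1024
    n_layers: int = 2
    n_heads: int = 16
    n_kv_heads: int = 16
    vocab_size: int = 32000
    head_dim: int = 64
    hidden_dim: int = 4096
    norm_eps: float = 1e-6
    rope_theta: float = 10000.0
    attention_bias: bool = False

    @classmethod
    def from_hf_config(cls, hf_config, is_critic: bool = False, **kwargs) -> "ToyModelArgs":
        # is_critic is accepted for signature compatibility and unused in this sketch
        n_heads = hf_config.num_attention_heads
        return cls(
            dim=hf_config.hidden_size,
            n_layers=hf_config.num_hidden_layers,
            n_heads=n_heads,
            # Optional fields fall back to a default when the config lacks them
            n_kv_heads=getattr(hf_config, "num_key_value_heads", n_heads),
            vocab_size=hf_config.vocab_size,
            head_dim=getattr(hf_config, "head_dim", None) or hf_config.hidden_size // n_heads,
            hidden_dim=hf_config.intermediate_size,
            norm_eps=getattr(hf_config, "rms_norm_eps", 1e-6),
            rope_theta=getattr(hf_config, "rope_theta", 10000.0),
            attention_bias=getattr(hf_config, "attention_bias", False),
        )


if __name__ == "__main__":
    cfg = SimpleNamespace(
        hidden_size=4096, num_hidden_layers=32, num_attention_heads=32,
        num_key_value_heads=8, vocab_size=128256, intermediate_size=14336,
    )
    print(ToyModelArgs.from_hf_config(cfg))
```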

Step 4: Implement model.py

Adapt the model architecture to match the target model.

Key components to adapt:

  1. Normalization (RMSNorm or similar):

    • Check if elementwise_affine is configurable
    • Check the epsilon default value
    • If the model uses LayerNorm, implement accordingly
  2. Attention module:

    • Q/K/V projection: Check bias presence (nn.Linear(..., bias=True/False))
    • QK norm: Add q_norm/k_norm if the model has them, remove if it doesn't
    • GQA: n_kv_heads < n_heads for grouped-query attention
    • Ulysses SP: Keep the set_cp_group / _sp_enabled pattern from the reference
    • Output projection: Check bias presence
  3. FeedForward module:

    • SwiGLU: w2(silu(w1(x)) * w3(x)) -- most common for modern LLMs
    • Check bias in linear layers
    • For MoE models: MoE module replaces FeedForward on designated layers
  4. TransformerBlock: Pre-norm (most modern LLMs) vs post-norm

    • MoE layer detection via _is_moe_layer() if applicable
  5. Top-level Model (<Model>Model(BaseArchonModel)):

    • tok_embeddings, layers (as ModuleDict), norm, output/score
    • init_weights(): Match initialization scheme from HF
    • init_buffers(): RoPE cache + MoE buffers
    • forward(): Must follow BaseArchonModel signature: (tokens, positions, cu_seqlens, max_seqlen, tree_attn_meta=None) -> Tensor

Base class contract (BaseArchonModel):

class <Model>Model(BaseArchonModel):
    def forward(self, tokens, positions, cu_seqlens, max_seqlen, tree_attn_meta=None) -> torch.Tensor: ...
    def init_weights(self) -> None: ...
    def init_buffers(self, buffer_device) -> None: ...

Step 5: Implement rope.py

Handle the rotary position embedding variant.

Options:

  1. Standard RoPE (same as qwen2/qwen3): Re-export from qwen2:

    from areal.experimental.models.archon.qwen2.model.rope import (
        apply_rotary_emb,
        precompute_rope_cache,
        repeat_kv,
        reshape_for_broadcast,
        rotate_half,
    )
    
  2. Custom RoPE (YaRN, NTK-aware, etc.): Implement custom precompute_rope_cache() and apply_rotary_emb() functions. The key difference is usually in how inv_freq is computed (scaling factors, interpolation, etc.).
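For orientation, the standard `inv_freq` computation is sketched below in plain Python, with linear position interpolation as one simple example of a scaling variant. This is an illustration under stated assumptions, not the contents of the qwen2 rope.py; the function name and the `linear_factor` parameter are hypothetical, and YaRN or NTK-aware scaling use different formulas that should be taken from the HF modeling code.

```python
def inv_freq(head_dim: int, theta: float = 10000.0, linear_factor: float = 1.0) -> list[float]:
    """Standard RoPE inverse frequencies: theta ** (-2i / head_dim) for i < head_dim / 2."""
    if head_dim % 2:
        raise ValueError("head_dim must be even")
    base = [theta ** (-(2 * i) / head_dim) for i in range(head_dim // 2)]
    # Linear interpolation divides positions by the factor, which is
    # equivalent to dividing every inverse frequency by it
    return [f / linear_factor for f in base]


if __name__ == "__main__":
    print(inv_freq(8))
    print(inv_freq(8, linear_factor=2.0))
```

Comparing these values against the `inv_freq` buffer of the HF model at the same `head_dim` and `rope_theta` is a quick way to tell whether the standard re-export is sufficient.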

Step 6: Implement state_dict_adapter.py

Map between HuggingFace and Archon weight key names.

This is the most error-prone step. The adapter must correctly handle:

  1. Key name mapping (from_hf_map dict):

    • Embedding: model.embed_tokens.weight -> tok_embeddings.weight
    • Attention: model.layers.{}.self_attn.q_proj.weight -> layers.{}.attention.wq.weight
    • FFN: model.layers.{}.mlp.gate_proj.weight -> layers.{}.feed_forward.w1.weight
    • Norms: model.layers.{}.input_layernorm.weight -> layers.{}.attention_norm.weight
    • Output: lm_head.weight -> output.weight
    • Skip keys (set to None): rotary_emb.inv_freq (computed at runtime)
    • Model-specific keys: bias terms, QK norm weights, etc.
  2. Reverse mapping (to_hf_map): Auto-generated from from_hf_map

  3. MoE expert weights (if applicable): 3D<->2D conversion for expert weights. Copy the MoE handling from qwen3 if the model has MoE.

  4. Weight tying: Skip output.weight during to_hf() if tie_word_embeddings=True

Verification approach: After implementation, the adapter should satisfy:

# Roundtrip: archon -> hf -> archon preserves all keys
hf_sd = adapter.to_hf(archon_sd)
roundtrip_sd = adapter.from_hf(hf_sd)
assert set(roundtrip_sd.keys()) == set(archon_sd.keys())

Base class contract (BaseStateDictAdapter):

class <Model>StateDictAdapter(BaseStateDictAdapter):
    def from_hf(self, hf_state_dict) -> dict[str, Any]: ...
    def to_hf(self, archon_state_dict) -> dict[str, Any]: ...
    def convert_single_to_hf(self, name, tensor) -> list[tuple[str, torch.Tensor]]: ...

Step 7: Implement parallelize.py

Define the parallelization strategy for the model.

The parallelize function applies parallelism in this order:

  1. TP (Tensor Parallelism) -- shard attention/FFN across devices
  2. EP (Expert Parallelism) -- for MoE models only
  3. CP (Context Parallelism / Ulysses SP) -- sequence parallelism
  4. AC (Activation Checkpointing) -- memory optimization
  5. torch.compile -- compilation optimization
  6. FSDP (Fully Sharded Data Parallelism) -- data parallelism

Key adaptations by model architecture:

  • Attention with QK norm: wq/wk use use_local_output=False (DTensor output for norm), add SequenceParallel(sequence_dim=2) for q_norm/k_norm
  • Attention without QK norm: wq/wk/wv all use use_local_output=True
  • Attention with bias: Bias terms follow the same parallel plan as their weights
  • MoE layers: Separate TP plan for MoE input/output, router gate, and expert weights. Copy from qwen3's apply_moe_ep_tp() and apply_non_moe_tp()
  • Dense-only models: Simpler plan without MoE handling. Copy from qwen2

Function signature (must match ParallelizeFn protocol):

def parallelize_<model>(
    model: nn.Module,
    parallel_dims: ArchonParallelDims,
    param_dtype: torch.dtype = torch.bfloat16,
    reduce_dtype: torch.dtype = torch.float32,
    loss_parallel: bool = True,
    cpu_offload: bool = False,
    reshard_after_forward_policy: str = "default",
    ac_config: ActivationCheckpointConfig | None = None,
    enable_compile: bool = True,
) -> nn.Module:

Step 8: Create spec.py and Register

Assemble the ModelSpec and register it.

from areal.experimental.models.archon.model_spec import ModelSpec, register_model_spec
from areal.experimental.models.archon.pipeline_parallel import pipeline_llm
from areal.experimental.models.archon.<model>.infra.parallelize import parallelize_<model>
from areal.experimental.models.archon.<model>.model.args import <Model>ModelArgs
from areal.experimental.models.archon.<model>.model.model import <Model>Model
from areal.experimental.models.archon.<model>.model.state_dict_adapter import (
    <Model>StateDictAdapter,
)

<MODEL>_SPEC = ModelSpec(
    name="<Model>",
    model_class=<Model>Model,
    model_args_class=<Model>ModelArgs,
    state_dict_adapter_class=<Model>StateDictAdapter,
    parallelize_fn=parallelize_<model>,
    supported_model_types=frozenset({"<model_type>"}),  # From HF config.json
    pipelining_fn=pipeline_llm,
)

# Auto-register when module is imported
register_model_spec(<MODEL>_SPEC)

__all__ = ["<MODEL>_SPEC"]

Note: supported_model_types should include all HF model_type strings that this implementation handles (e.g., {"qwen3", "qwen3_moe"} for Qwen3).

Step 9: Register in __init__.py

Add the import to areal/experimental/models/archon/__init__.py:

from areal.experimental.models.archon.<model> import spec as <model>_spec  # noqa: F401

This triggers auto-registration when the module is imported.

Step 10: Verify and Test

Verification should be done in stages, adapting based on available hardware and the test patterns in tests/experimental/archon/.

Before writing tests, examine the existing test files to understand current patterns:

tests/experimental/archon/
  conftest.py             -- Pytest configuration (version checks)
  utils.py                -- Shared utilities (model loading, comparison)
  test_qwen3_args.py      -- Args unit tests (CPU-only)
  test_state_dict_adapter.py  -- State dict roundtrip tests
  test_weight_sync.py     -- Weight completeness tests (meta device)
  test_forward.py         -- Forward precision comparison (single GPU)
  ...

Test stages (write tests appropriate for the model's complexity):

Stage 1: Args Tests (CPU-only, always write these)

Test from_hf_config() with mock HuggingFace configs:

# Pattern: Create mock PretrainedConfig, verify args mapping
from unittest.mock import MagicMock

def test_args_from_hf_config():
    hf_config = MagicMock()
    hf_config.hidden_size = 4096
    hf_config.num_hidden_layers = 32
    # ... set all required fields
    args = <Model>ModelArgs.from_hf_config(hf_config)
    assert args.dim == 4096
    assert args.n_layers == 32
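One caveat with the mock pattern above: a bare `MagicMock` creates any attribute on access, so `getattr(hf_config, "field_name", default)` never falls back to its default. The short, self-contained sketch below demonstrates the effect and two ways around it, using only the standard library.

```python
from types import SimpleNamespace
from unittest.mock import MagicMock

mock_cfg = MagicMock()
mock_cfg.hidden_size = 4096
# MagicMock invents the missing attribute, so the default is never used
leaked = getattr(mock_cfg, "attention_bias", False)
assert isinstance(leaked, MagicMock)

# SimpleNamespace only has what was set, so the default applies
plain_cfg = SimpleNamespace(hidden_size=4096)
assert getattr(plain_cfg, "attention_bias", False) is False

# MagicMock(spec=[...]) restricts the attribute set in the same way
strict_cfg = MagicMock(spec=["hidden_size"])
strict_cfg.hidden_size = 4096
assert getattr(strict_cfg, "attention_bias", False) is False
```

For optional-field tests, a `SimpleNamespace` or a spec-restricted mock therefore exercises the default path that a bare mock would hide.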

Stage 2: State Dict Adapter Tests (CPU-only)

Test key mapping roundtrip:

def test_state_dict_roundtrip():
    # Create adapter with mock config
    adapter = <Model>StateDictAdapter(mock_config)
    # Create fake archon state dict with expected keys
    archon_sd = {
        "tok_embeddings.weight": torch.randn(vocab, dim),
        # ... remaining expected keys
    }
    # Roundtrip
    hf_sd = adapter.to_hf(archon_sd)
    roundtrip = adapter.from_hf(hf_sd)
    assert set(roundtrip.keys()) == set(archon_sd.keys())

Stage 3: Weight Completeness (meta device, CPU-only)

Verify all model parameters have HF mappings:

def test_weight_completeness():
    # Create model on meta device
    with torch.device("meta"):
        model = <Model>Model(args)
    adapter = <Model>StateDictAdapter(hf_config)
    # Check every archon param has a HF mapping
    for name, _ in model.named_parameters():
        hf_pairs = adapter.convert_single_to_hf(name, torch.empty(0))
        assert len(hf_pairs) > 0, f"No HF mapping for {name}"

Stage 4: Forward Precision (single GPU, if available)

Compare Archon model output against HuggingFace reference:

import pytest
import torch

@pytest.mark.skipif(not torch.cuda.is_available(), reason="Requires CUDA")
def test_forward_matches_hf():
    # Load both HF and Archon models
    # Run forward on same input
    # Compare logits within tolerance
    ...

Important: Do NOT hardcode the test categories. Inspect the existing test files in tests/experimental/archon/ and follow the same patterns, fixtures, and markers. Adapt test scope to the model's specific features (e.g., add MoE-specific tests only if the model has MoE).

Reference Implementations

| Model | Directory | Features |
| ----- | --------- | -------- |
| Qwen2 | areal/experimental/models/archon/qwen2/ | Dense, attention bias, no QK norm |
| Qwen3 | areal/experimental/models/archon/qwen3/ | Dense + MoE, QK norm, no attention bias, shared experts |

Architecture Decision Map

| Feature | qwen2 | qwen3 | What to check in target model |
| ------- | ----- | ----- | ----------------------------- |
| Attention bias | Yes | No | attention_bias in HF config |
| QK norm | No | Yes | qk_norm in HF config or QKNorm module in modeling file |
| MoE | No | Yes | num_experts/num_local_experts in HF config |
| Shared experts | No | Yes | num_shared_experts in HF config |
| Decoder sparse step | No | Yes | decoder_sparse_step in HF config |
| Weight tying | Both | Both | tie_word_embeddings in HF config |
| RoPE | Standard | Standard (re-export qwen2) | Check inv_freq formula in HF modeling code |

Common Mistakes

  • Not mapping all HF keys in state_dict_adapter.py (causes silent weight drops)
  • Wrong from_hf_config() field mapping (uses wrong HF config attribute name)
  • Forgetting to handle None keys in from_hf_map (keys to skip like rotary_emb.inv_freq)
  • Missing MoE expert weight 3D<->2D conversion when model has MoE
  • Wrong TP plan for attention with/without QK norm (use_local_output must match)
  • Forgetting to add import line in areal/experimental/models/archon/__init__.py
  • Not including all model_type variants in supported_model_types frozenset
  • Using print instead of areal.utils.logging.getLogger()

File Checklist

After completion, verify all files exist and are consistent:

  • areal/experimental/models/archon/<model>/__init__.py
  • areal/experimental/models/archon/<model>/spec.py -- ModelSpec + register
  • areal/experimental/models/archon/<model>/model/args.py -- ModelArgs + from_hf_config
  • areal/experimental/models/archon/<model>/model/model.py -- Model + Attention + FFN
  • areal/experimental/models/archon/<model>/model/rope.py -- RoPE (or re-export)
  • areal/experimental/models/archon/<model>/model/state_dict_adapter.py -- Key mapping
  • areal/experimental/models/archon/<model>/infra/parallelize.py -- Parallel strategy
  • areal/experimental/models/archon/__init__.py -- Import line added
  • tests/experimental/archon/test_<model>_*.py -- Tests

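Guides adding a new dataset loader to the AReaL framework. Covers creating a Python file with the processing logic for SFT and RL training, and registering the dataset name and import function in __init__.py to integrate the dataset.
User asks how to add a dataset; user wants to integrate a new dataset; user mentions creating a dataset loader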
.claude/skills/add-dataset/SKILL.md
npx skills add areal-project/AReaL --skill add-dataset -g -y
SKILL.md
Frontmatter
{
    "name": "add-dataset",
    "description": "Guide for adding a new dataset loader to AReaL. Use when user wants to add a new dataset."
}

Add Dataset

Add a new dataset loader to AReaL.

When to Use

This skill is triggered when:

  • User asks "how do I add a dataset?"
  • User wants to integrate a new dataset
  • User mentions creating a dataset loader

Step-by-Step Guide

Step 1: Create Dataset File

Create areal/dataset/<name>.py:

from datasets import Dataset, load_dataset


def get_<name>_sft_dataset(
    path: str,
    split: str,
    tokenizer,
    max_length: int | None = None,
) -> Dataset:
    """Load dataset for SFT training.

    Args:
        path: Path to dataset (HuggingFace hub or local path)
        split: Dataset split (train/validation/test)
        tokenizer: Tokenizer for processing
        max_length: Maximum sequence length (optional)

    Returns:
        HuggingFace Dataset with processed samples
    """
    dataset = load_dataset(path=path, split=split)

    def process(sample):
        # Tokenize the full sequence (prompt + response)
        seq_token = tokenizer.encode(
            sample["question"] + sample["answer"] + tokenizer.eos_token
        )
        prompt_token = tokenizer.encode(sample["question"])
        # Loss mask: 0 for prompt, 1 for response
        loss_mask = [0] * len(prompt_token) + [1] * (len(seq_token) - len(prompt_token))
        return {"input_ids": seq_token, "loss_mask": loss_mask}

    dataset = dataset.map(process).remove_columns(["question", "answer"])

    if max_length is not None:
        dataset = dataset.filter(lambda x: len(x["input_ids"]) <= max_length)

    return dataset


def get_<name>_rl_dataset(
    path: str,
    split: str,
    tokenizer,
    max_length: int | None = None,
) -> Dataset:
    """Load dataset for RL training.

    Args:
        path: Path to dataset
        split: Dataset split
        tokenizer: Tokenizer for length filtering
        max_length: Maximum sequence length

    Returns:
        HuggingFace Dataset with prompts and answers for reward computation
    """
    dataset = load_dataset(path=path, split=split)

    def process(sample):
        messages = [
            {
                "role": "user",
                "content": sample["question"],
            }
        ]
        return {"messages": messages, "answer": sample["answer"]}

    dataset = dataset.map(process).remove_columns(["question"])

    if max_length is not None:

        def filter_length(sample):
            content = sample["messages"][0]["content"]
            tokens = tokenizer.encode(content)
            return len(tokens) <= max_length

        dataset = dataset.filter(filter_length)

    return dataset
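The loss-mask construction used by the SFT `process` function can be checked in isolation. The sketch below is a hypothetical helper, not part of AReaL; it makes explicit the assumption that the prompt tokens form a prefix of the full sequence, which can fail when a tokenizer merges tokens across the prompt/response boundary.

```python
def build_loss_mask(prompt_ids: list[int], seq_ids: list[int]) -> list[int]:
    """Return 0 for prompt positions and 1 for response positions."""
    if seq_ids[: len(prompt_ids)] != prompt_ids:
        raise ValueError("prompt tokens are not a prefix of the full sequence")
    return [0] * len(prompt_ids) + [1] * (len(seq_ids) - len(prompt_ids))


if __name__ == "__main__":
    print(build_loss_mask([1, 2], [1, 2, 3, 4, 5]))  # [0, 0, 1, 1, 1]
```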

Step 2: Register in __init__.py

Update areal/dataset/__init__.py:

# Add to VALID_DATASETS
VALID_DATASETS = [
    # ... existing datasets
    "<name>",
]

# Add to _get_custom_dataset function
def _get_custom_dataset(name: str, ...):
    # ... existing code
    elif name == "<name>":
        from areal.dataset.<name> import get_<name>_sft_dataset, get_<name>_rl_dataset
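        # Pass by keyword so the order cannot drift from the loader signatures
        if dataset_type == "sft":
            return get_<name>_sft_dataset(
                path=path, split=split, tokenizer=tokenizer, max_length=max_length
            )
        else:
            return get_<name>_rl_dataset(
                path=path, split=split, tokenizer=tokenizer, max_length=max_length
            )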

Step 3: Add Config (Optional)

If the dataset needs special configuration, add to areal/api/cli_args.py:

@dataclass
class TrainDatasetConfig:
    # ... existing fields
    <name>_specific_field: str | None = None

Step 4: Add Tests

Create tests/test_<name>_dataset.py:

import pytest
from areal.dataset.<name> import get_<name>_sft_dataset, get_<name>_rl_dataset

def test_sft_dataset_loads(tokenizer):
    dataset = get_<name>_sft_dataset("path/to/data", split="train", tokenizer=tokenizer)
    assert len(dataset) > 0
    assert "input_ids" in dataset.column_names
    assert "loss_mask" in dataset.column_names

def test_rl_dataset_loads(tokenizer):
    dataset = get_<name>_rl_dataset("path/to/data", split="train", tokenizer=tokenizer)
    assert len(dataset) > 0
    assert "messages" in dataset.column_names
    assert "answer" in dataset.column_names

Reference Implementations

| Dataset | File | Description |
| ------- | ---- | ----------- |
| GSM8K | areal/dataset/gsm8k.py | Math word problems |
| Geometry3K | areal/dataset/geometry3k.py | Geometry problems |
| CLEVR | areal/dataset/clevr_count_70k.py | Visual counting |
| HH-RLHF | areal/dataset/hhrlhf.py | Helpfulness/Harmlessness |
| TORL | areal/dataset/torl_data.py | Tool-use RL |

Required Fields

SFT Dataset

{
    "messages": [
        {"role": "user", "content": "..."},
        {"role": "assistant", "content": "..."},
    ]
}

RL Dataset

{
    "messages": [
        {"role": "user", "content": "..."},
    ],
    "answer": "ground_truth_for_reward",
    # Optional metadata for reward function
}

Common Mistakes

  • ❌ Returning List[Dict] instead of HuggingFace Dataset
  • ❌ Using Python loops instead of dataset.map()/filter()
  • ❌ Missing "messages" field for RL datasets
  • ❌ Wrong message format (should be list of dicts with role and content)
  • ❌ Not registering in __init__.py

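Guides creating and registering a custom reward function in the AReaL framework. Covers the full procedure: writing the reward logic, updating the registration file, handling blocking operations asynchronously, and adding unit tests. Applies when a specific evaluation metric needs to be implemented.
User asks how to add a reward function; user wants to implement custom reward logic; user mentions reward computation needs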
.claude/skills/add-reward/SKILL.md
npx skills add areal-project/AReaL --skill add-reward -g -y
SKILL.md
Frontmatter
{
    "name": "add-reward",
    "description": "Guide for adding a new reward function to AReaL. Use when user wants to create a reward function."
}

Add Reward

Add a new reward function to AReaL.

When to Use

This skill is triggered when:

  • User asks "how do I add a reward function?"
  • User wants to implement custom rewards
  • User mentions reward computation

Step-by-Step Guide

Step 1: Create Reward File

Create areal/reward/<name>.py:

from typing import Any

from areal.utils import logging

logger = logging.getLogger("MyReward")


def <name>_reward_fn(
    prompt: str,
    completions: str,
    prompt_ids,
    completion_ids,
    answer: str | None = None,
    **kwargs: Any,
) -> float:
    """Compute reward for a single completion.

    Args:
        prompt: Prompt string
        completions: Completion string (model output)
        prompt_ids: Tokenized prompt IDs
        completion_ids: Tokenized completion IDs
        answer: Ground truth answer from dataset (optional)
        **kwargs: Additional data from dataset

    Returns:
        Reward value (float), typically 0.0 or 1.0
    """
    try:
        # Extract answer from completion
        extracted = _extract_answer(completions)

        # Compare with ground truth
        if answer is not None and extracted == str(answer):
            return 1.0
        return 0.0
    except Exception:
        logger.warning("Exception in reward computation", exc_info=True)
        return 0.0


def _extract_answer(completion: str) -> str:
    """Extract the answer from a completion string.

    Implement your extraction logic here.
    """
    # Example: Extract content from \boxed{}
    import re

    match = re.search(r"\\boxed\{([^}]+)\}", completion)
    if match:
        return match.group(1).strip()
    return completion.strip()
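
One limitation of the regex in the template: `[^}]+` stops at the first closing brace, so an answer such as `\boxed{\frac{1}{2}}` would be cut short. A possible alternative is sketched below, assuming a hypothetical helper `extract_boxed` that tracks brace depth and picks the last `\boxed{...}` in the completion (choosing the last occurrence is an assumption about where final answers tend to appear, not an AReaL convention):

```python
from typing import Optional


def extract_boxed(completion: str) -> Optional[str]:
    r"""Return the contents of the last \boxed{...}, honoring nested braces.

    Hypothetical alternative to the regex above; returns None when no
    balanced \boxed{...} is present.
    """
    marker = "\\boxed{"
    start = completion.rfind(marker)
    if start == -1:
        return None
    content_start = start + len(marker)
    depth = 1  # we are inside the opening brace of \boxed{
    for i in range(content_start, len(completion)):
        ch = completion[i]
        if ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:
                return completion[content_start:i].strip()
    return None  # braces never balanced


print(extract_boxed("The answer is \\boxed{\\frac{1}{2}}"))
```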

Step 2: Register in __init__.py

Update areal/reward/__init__.py:

# Add to VALID_REWARD_FN
VALID_REWARD_FN = [
    # ... existing reward functions
    "<name>",
]

# Add to get_reward_fn function
def get_reward_fn(name: str, **kwargs):
    # ... existing code
    elif name == "<name>":
        from areal.reward.<name> import <name>_reward_fn
        return <name>_reward_fn

Step 3: Handle Blocking Operations

If your reward function uses blocking operations (e.g., API calls, model inference), the workflow will wrap it with AsyncRewardWrapper:

# In your workflow
from areal.reward import AsyncRewardWrapper

self.reward_fn = AsyncRewardWrapper(reward_fn)

# Then call it asynchronously
reward = await self.reward_fn(prompt, completions, prompt_ids, completion_ids, **data)
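
To illustrate why wrapping matters, here is a minimal sketch of the general idea: a blocking function is moved onto a worker thread so the event loop stays free. The helper `to_async` and the function `slow_reward` are hypothetical, and this is not how `AsyncRewardWrapper` is implemented (the real wrapper may use a different executor, timeouts, or retries):

```python
import asyncio
import functools
import time
from typing import Any, Awaitable, Callable


def to_async(fn: Callable[..., float]) -> Callable[..., Awaitable[float]]:
    """Wrap a blocking function so it can be awaited (illustrative only)."""

    @functools.wraps(fn)
    async def wrapper(*args: Any, **kwargs: Any) -> float:
        loop = asyncio.get_running_loop()
        # Run the blocking call in the default thread pool executor.
        return await loop.run_in_executor(None, functools.partial(fn, *args, **kwargs))

    return wrapper


def slow_reward(prompt: str, completions: str, prompt_ids, completion_ids, **kwargs: Any) -> float:
    time.sleep(0.05)  # stand-in for an API call or model inference
    return float(completions.strip() == str(kwargs.get("answer")))


async def main() -> list:
    async_reward = to_async(slow_reward)
    # The two calls overlap instead of running back to back.
    return await asyncio.gather(
        async_reward("2+2?", "4", None, None, answer="4"),
        async_reward("2+2?", "5", None, None, answer="4"),
    )


rewards = asyncio.run(main())
print(rewards)
```

Calling `slow_reward` directly inside an `async def` would stall every other episode sharing the same event loop for the duration of the sleep.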

Step 4: Add Tests

Create tests/test_<name>_reward.py:

import pytest
from areal.reward.<name> import <name>_reward_fn

def test_reward_correct_answer():
    reward = <name>_reward_fn(
        prompt="What is 2+2?",
        completions="The answer is \\boxed{4}",
        prompt_ids=None,
        completion_ids=None,
        answer="4",
    )
    assert reward == 1.0

def test_reward_wrong_answer():
    reward = <name>_reward_fn(
        prompt="What is 2+2?",
        completions="The answer is \\boxed{5}",
        prompt_ids=None,
        completion_ids=None,
        answer="4",
    )
    assert reward == 0.0

Reference Implementations

Reward | File | Description
GSM8K | areal/reward/gsm8k.py | Math answer verification
Geometry3K | areal/reward/geometry3k.py | Geometry answer verification
CLEVR | areal/reward/clevr_count_70k.py | Counting verification
MathVerify | areal/reward/math_verify.py | General math verification

Function Signature

All reward functions must follow this signature:

def reward_fn(
    prompt: str,               # Input prompt string
    completions: str,          # Model completion string
    prompt_ids,                # Tokenized prompt
    completion_ids,            # Tokenized completion
    **kwargs: Any,             # Additional data from dataset (e.g., answer)
) -> float:                    # Reward value (typically 0.0 or 1.0)

Note: The reward function is called once per sample. Batching is handled by AsyncRewardWrapper in the workflow.

Key Requirements

  1. Deterministic: Same inputs should produce same outputs
  2. Return float: Output is a single float value per sample
  3. No blocking in async context: Use AsyncRewardWrapper if needed
  4. Logging: Use areal.utils.logging, not print
  5. Handle exceptions: Return 0.0 on error, don't raise

Common Mistakes

  • ❌ Returning a tensor instead of a float
  • ❌ Expecting batched inputs (reward is called per sample)
  • ❌ Non-deterministic behavior
  • ❌ Blocking operations without AsyncRewardWrapper
  • ❌ Raising exceptions instead of returning 0.0

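A guide to adding unit tests in the AReaL project. It covers distinguishing test types, file naming conventions, the Arrange-Act-Assert pattern, and the pytest marker strategy. Suitable when testing new functionality or raising test coverage.
User asks how to add tests; user wants to increase test coverage; user needs to write tests for new functionality; user wants to understand AReaL testing patterns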
.claude/skills/add-unit-tests/SKILL.md
npx skills add areal-project/AReaL --skill add-unit-tests -g -y
SKILL.md
Frontmatter
{
    "name": "add-unit-tests",
    "description": "Guide for adding unit tests to AReaL. Use when user wants to add tests for new functionality or increase test coverage."
}

Add Unit Tests

Add unit tests to AReaL following the project's testing conventions.

When to Use

This skill is triggered when:

  • User asks "how do I add tests?"
  • User wants to increase test coverage
  • User needs to write tests for new functionality
  • User wants to understand AReaL testing patterns

Step-by-Step Guide

Step 1: Understand Test Types

AReaL has two main test categories:

Test Type | Purpose | Location Pattern | How It Runs
Unit Tests | Test individual functions/modules | tests/test_<module>_<feature>.py | Directly via pytest
Distributed Tests | Test distributed/parallel behavior | tests/torchrun/run_*.py | Via torchrun (called by pytest subprocess)

Note: All tests are invoked via pytest. Distributed tests use torchrun but are still called from pytest test files.

Step 2: Create Test File Structure

Create test file with naming convention: test_<module>_<feature>.py

import pytest
import torch

# Import the module to test
from areal.dataset.gsm8k import get_gsm8k_sft_dataset
from tests.utils import get_dataset_path  # Optional test utilities
# For mocking tokenizer: from unittest.mock import MagicMock

Step 3: Write Test Functions

Follow Arrange-Act-Assert pattern:

def test_function_under_condition_returns_expected():
    """Test that function returns expected value under condition."""
    # Arrange
    input_data = 5
    expected_output = 10

    # Act
    result = function_under_test(input_data)

    # Assert
    assert result == expected_output
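
As a concrete instance of the pattern, the sketch below tests a small, hypothetical function `clip_reward` (defined here only for illustration, not an AReaL function). The test names follow the `test_<what>_<condition>_<expected>` convention:

```python
def clip_reward(value: float, low: float = 0.0, high: float = 1.0) -> float:
    """Clamp a raw reward into [low, high] (hypothetical function under test)."""
    return max(low, min(high, value))


def test_clip_reward_above_range_returns_upper_bound():
    """clip_reward caps values above the upper bound."""
    # Arrange
    raw_reward = 3.5
    expected = 1.0

    # Act
    result = clip_reward(raw_reward)

    # Assert
    assert result == expected


def test_clip_reward_inside_range_returns_input():
    """clip_reward leaves in-range values untouched."""
    # Arrange
    raw_reward = 0.25

    # Act
    result = clip_reward(raw_reward)

    # Assert
    assert result == raw_reward
```

Keeping each test to one Act step makes a failure point to a single behavior.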

Step 4: Add Pytest Markers and CI Strategy

Use appropriate pytest markers:

Marker When to Use
@pytest.mark.slow Test takes > 10 seconds (excluded from CI by default)
@pytest.mark.ci Slow test that must run in CI (use with @pytest.mark.slow)
@pytest.mark.asyncio Async test functions
@pytest.mark.skipif(cond, reason=...) Conditional skip
@pytest.mark.parametrize(...) Parameterized tests

CI Test Strategy:

  • @pytest.mark.slow: Excluded from CI by default (CI runs pytest -m "not slow")
  • @pytest.mark.slow + @pytest.mark.ci: Slow but must run in CI
  • No marker: Runs in CI (fast unit tests)

@pytest.mark.asyncio
async def test_async_function():
    result = await async_function()
    assert result == expected

@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_gpu_feature():
    tensor = torch.tensor([1, 2, 3], device="cuda")
    # ... assertions

@pytest.mark.parametrize("batch_size", [1, 4, 16])
def test_with_parameters(batch_size):
    # Parameterized test: runs once per batch_size value
    ...

@pytest.mark.slow
def test_slow_function():
    # Excluded from CI by default
    ...

@pytest.mark.slow
@pytest.mark.ci
def test_slow_but_required_in_ci():
    # Slow but must run in CI
    ...
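
The CI rule described above reduces to a small boolean condition. The sketch below uses a hypothetical helper `runs_in_default_ci` to spell it out. One pytest marker expression with the same meaning would be `-m "not slow or ci"`; whether AReaL's CI uses exactly that expression is an assumption:

```python
def runs_in_default_ci(markers: set) -> bool:
    """Apply the rule above: slow tests are skipped unless also marked ci.

    Hypothetical helper for illustration; pytest itself evaluates marker
    expressions passed via -m.
    """
    return "slow" not in markers or "ci" in markers


for markers in (set(), {"slow"}, {"slow", "ci"}, {"asyncio"}):
    print(sorted(markers), "->", runs_in_default_ci(markers))
```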

Step 5: Mock Distributed Environment

For unit tests that need distributed mocks:

import torch.distributed as dist

def test_distributed_function(monkeypatch):
    monkeypatch.setattr(dist, "get_rank", lambda: 0)
    monkeypatch.setattr(dist, "get_world_size", lambda: 2)
    result = distributed_function()
    assert result == expected

Step 6: Handle GPU Dependencies

Always skip gracefully when GPU unavailable:

CUDA_AVAILABLE = torch.cuda.is_available()

@pytest.mark.skipif(not CUDA_AVAILABLE, reason="CUDA not available")
def test_gpu_function():
    tensor = torch.tensor([1, 2, 3], device="cuda")
    # ... assertions

Key Requirements (Based on testing.md)

Mocking Distributed

  • Use torch.distributed.fake_pg for unit tests
  • Mock dist.get_rank() and dist.get_world_size() explicitly
  • Don't mock internals of FSDP/DTensor

GPU Test Constraints

  • Always skip gracefully when GPU unavailable
  • Clean up GPU memory: torch.cuda.empty_cache() in fixtures
  • Use smallest possible model/batch for unit tests

Assertions

  • Use torch.testing.assert_close() for tensor comparison
  • Specify rtol/atol explicitly for numerical tests
  • Avoid bare assert tensor.equal() - no useful error message
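
For choosing `rtol` and `atol`, it helps to recall the closeness criterion that `torch.testing.assert_close` applies to each element: `|actual - expected| <= atol + rtol * |expected|`. The pure-Python sketch below mirrors that criterion for scalars only. The helper `is_close` is hypothetical, and the real function additionally checks shapes, dtypes, and NaN handling:

```python
def is_close(actual: float, expected: float, rtol: float, atol: float) -> bool:
    """Scalar version of the |actual - expected| <= atol + rtol * |expected| check."""
    return abs(actual - expected) <= atol + rtol * abs(expected)


# Relative tolerance scales with the magnitude of the expected value.
print(is_close(1000.5, 1000.0, rtol=1e-3, atol=0.0))
# When expected is 0, rtol contributes nothing, so atol has to cover the gap.
print(is_close(1e-6, 0.0, rtol=1e-3, atol=0.0))
print(is_close(1e-6, 0.0, rtol=1e-3, atol=1e-5))
```

This is why comparisons against zero-valued tensors usually need a non-zero `atol`.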

Reference Implementations

Test File | Description | Key Patterns
tests/test_utils.py | Utility function tests | Fixtures, parametrized tests
tests/test_examples.py | Integration tests with dataset loading | Dataset path resolution, success pattern matching
tests/test_fsdp_engine_nccl.py | Distributed tests | Torchrun integration

Common Mistakes

  • Missing test file registration: Ensure file follows test_*.py naming
  • GPU dependency without skip: Always use @pytest.mark.skipif for GPU tests
  • Incorrect tensor comparisons: Use torch.testing.assert_close() not assert tensor.equal()
  • Memory leaks in GPU tests: Clean up with torch.cuda.empty_cache()
  • Mocking too much: Don't mock FSDP/DTensor internals
  • Unclear test names: Follow test_<what>_<condition>_<expected> pattern
  • No docstrings: Add descriptive docstrings to test functions

Integration with Other Skills

This skill complements other AReaL development skills:

  • After /add-dataset: Add tests for new dataset loaders
  • After /add-workflow: Add tests for new workflows
  • After /add-reward: Add tests for new reward functions
  • With planner agent: Reference this skill when planning test implementation

Running Tests

# First check GPU availability (many tests require GPU)
python -c "import torch; print('GPU available:', torch.cuda.is_available())"

# Run specific test file
uv run pytest tests/test_<name>.py

# Skip slow tests (CI default)
uv run pytest -m "not slow"

# Run with verbose output
uv run pytest -v

# Run distributed tests (requires torchrun and multi-GPU)
# Note: Usually invoked via pytest test files
torchrun --nproc_per_node=2 tests/torchrun/run_<test>.py
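
A guide to creating a new RolloutWorkflow in AReaL. It covers prerequisites, writing the asynchronous inference and reward-computation code, registering the workflow in the module, and updating the training script, for implementing a custom rollout flow.
User asks how to add a workflow; user wants to create a new RolloutWorkflow; user mentions implementing a custom rollout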
.claude/skills/add-workflow/SKILL.md
npx skills add areal-project/AReaL --skill add-workflow -g -y
SKILL.md
Frontmatter
{
    "name": "add-workflow",
    "description": "Guide for adding a new RolloutWorkflow to AReaL. Use when user wants to create a new workflow."
}

Add Workflow

Add a new RolloutWorkflow implementation to AReaL.

When to Use

This skill is triggered when:

  • User asks "how do I add a workflow?"
  • User wants to create a new RolloutWorkflow
  • User mentions implementing a custom rollout

Prerequisites

Before starting, ensure you understand:

  • The workflow's purpose and requirements
  • Input/output data format
  • Reward function to use

Step-by-Step Guide

Step 1: Create Workflow File

Create areal/workflow/<name>.py:

import uuid
from typing import Any, Callable

import torch

from areal.api.cli_args import GenerationHyperparameters
from areal.api.engine_api import InferenceEngine
from areal.api.io_struct import ModelRequest, ModelResponse
from areal.api.reward_api import AsyncRewardWrapper
from areal.api.workflow_api import RolloutWorkflow
from areal.utils import logging

logger = logging.getLogger("MyWorkflow")


class MyWorkflow(RolloutWorkflow):
    """Description of your workflow."""

    def __init__(
        self,
        gconfig: GenerationHyperparameters,
        tokenizer,
        reward_fn: Callable,
    ):
        self.gconfig = gconfig.new_with_stop_and_pad_token_ids(tokenizer)
        self.tokenizer = tokenizer
        self.async_reward_fn = AsyncRewardWrapper(reward_fn)

    async def arun_episode(
        self,
        engine: InferenceEngine,
        data: dict[str, Any],
    ) -> dict[str, torch.Tensor]:
        """Run a single episode. MUST be async and non-blocking."""

        # 1. Prepare input_ids from data
        input_ids = self.tokenizer.apply_chat_template(
            data["messages"],
            tokenize=True,
            add_generation_prompt=True,
        )

        # 2. Build ModelRequest
        req = ModelRequest(
            rid=uuid.uuid4().hex,
            input_ids=list(input_ids),
            gconfig=self.gconfig.new(n_samples=1),
            tokenizer=self.tokenizer,
        )

        # 3. Generate completion (async)
        resp: ModelResponse = await engine.agenerate(req)

        # 4. Compute reward (async)
        prompt_str = self.tokenizer.decode(input_ids)
        completion_str = self.tokenizer.decode(resp.output_tokens)
        reward = await self.async_reward_fn(
            prompt_str,
            completion_str,
            resp.input_tokens,
            resp.output_tokens,
            **data,
        )

        # 5. Return results in expected format
        return {
            "input_ids": torch.tensor(resp.input_tokens),
            "output_ids": torch.tensor(resp.output_tokens),
            "reward": torch.tensor(reward),
        }

Step 2: Register in __init__.py

Add to areal/workflow/__init__.py:

from areal.workflow.<name> import MyWorkflow

__all__ = [
    # ... existing exports
    "MyWorkflow",
]

Step 3: Update Entry Script

Update your training script to use the new workflow:

trainer.train(
    workflow="areal.workflow.<name>.MyWorkflow",
    # ... other args
)

Step 4: Add Tests

Create tests/test_<name>_workflow.py:

import pytest
from areal.workflow.<name> import MyWorkflow

@pytest.mark.asyncio
async def test_workflow_basic():
    # Test basic functionality
    pass
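
The stub above can be filled in without a real inference engine by substituting a fake one. The sketch below shows the shape of such a test using only the standard library. `StubEngine`, `StubResponse`, and `run_episode` are hypothetical stand-ins (a real test would construct `MyWorkflow` and call `arun_episode`), and `asyncio.run` is used here in place of the `@pytest.mark.asyncio` marker so the example runs on its own:

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StubResponse:
    input_tokens: list
    output_tokens: list


class StubEngine:
    """Hypothetical stand-in for an inference engine; returns canned tokens."""

    async def agenerate(self, req: dict) -> StubResponse:
        await asyncio.sleep(0)  # yield control like a real async call would
        return StubResponse(input_tokens=list(req["input_ids"]), output_tokens=[7, 8, 9])


async def run_episode(engine, data: dict) -> dict:
    """Simplified episode: generate, then score with a toy reward."""
    resp = await engine.agenerate({"input_ids": data["input_ids"]})
    return {
        "input_ids": resp.input_tokens,
        "output_ids": resp.output_tokens,
        "reward": float(len(resp.output_tokens) > 0),
    }


def test_run_episode_returns_expected_keys():
    result = asyncio.run(run_episode(StubEngine(), {"input_ids": [1, 2, 3]}))
    assert set(result) == {"input_ids", "output_ids", "reward"}
    assert result["input_ids"] == [1, 2, 3]
    assert result["reward"] == 1.0


test_run_episode_returns_expected_keys()
```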

Reference Implementations

Workflow | File | Description
MultiTurnWorkflow | areal/workflow/multi_turn.py | Multi-turn conversation
RLVRWorkflow | areal/workflow/rlvr.py | RL with verifiable rewards
VisionRLVRWorkflow | areal/workflow/vision_rlvr.py | Vision + RLVR

Key Requirements

  1. Async: arun_episode must be async def and non-blocking
  2. No sync I/O: Use aiofiles for file operations
  3. Wrap rewards: Use AsyncRewardWrapper for reward functions
  4. Tensor format: Output tensors should be [batch, seq_len, ...]
  5. Use helpers: concat_padded_tensors for combining outputs

Common Mistakes

  • ❌ Using open() instead of aiofiles.open()
  • ❌ Forgetting to await async calls
  • ❌ Not wrapping reward function with AsyncRewardWrapper
  • ❌ Wrong tensor shape conventions

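Standardizes Git commit messages for the AReaL repository and enforces the Conventional Commits format. Supports inferring the scope from file paths, and provides a detailed type-selection guide and commit message structure rules.
When running git commit; when creating a PR or when an agent workflow produces a commit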
.claude/skills/commit-conventions/SKILL.md
npx skills add areal-project/AReaL --skill commit-conventions -g -y
SKILL.md
Frontmatter
{
    "name": "commit-conventions",
    "description": "AReaL commit message conventions. MUST load on every git commit -- provides Conventional Commits format with scope inference from file paths."
}

Commit Conventions

Commit message conventions and scope inference rules for the AReaL repository.

When to Use

ALWAYS load this skill when making any git commit in AReaL. This includes:

  • Direct commits (git commit)
  • Commits during PR creation (/create-pr)
  • Commits delegated via Agent tool with skills: ["commit-conventions"]
  • Any agent workflow that produces a commit

Commit Message Format

<type>(<scope>): <subject>

<body>

[Optional sections:]
Key changes:
- change 1
- change 2

Refs: #123, #456

Type Selection

Type When to Use
feat New feature or capability
fix Bug fix
docs Documentation only
gov Governance or maintainer changes
style Formatting/style-only changes
refactor Code change without feature/fix
perf Performance improvement
test Adding or fixing tests
build Build system or dependencies
ci CI pipeline or workflow changes
chore Build, deps, config changes
revert Revert a previous commit

Scope Inference

Infer scope from the primary changed file paths:

File Path Pattern Scope
areal/workflow/ workflow
areal/engine/ engine
areal/reward/ reward
areal/dataset/ dataset
areal/api/ api
areal/utils/ utils
areal/infra/ infra
areal/trainer/ trainer
areal/models/ models
areal/experimental/ archon
docs/ docs
examples/ examples
AGENTS.md, .agents/, .claude/, .codex/, .opencode/ agents
Multiple areas Omit scope or use broader term
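
The table can be applied mechanically. Below is a minimal sketch, assuming a hypothetical helper `infer_scope` that returns a scope only when every changed path maps to the same one and otherwise returns `None` (meaning: omit the scope or pick a broader term by hand). Treating all changed files equally is a simplification of "primary changed file paths":

```python
from typing import Optional

# (path prefix, scope) pairs copied from the table above.
SCOPE_PREFIXES = [
    ("areal/workflow/", "workflow"),
    ("areal/engine/", "engine"),
    ("areal/reward/", "reward"),
    ("areal/dataset/", "dataset"),
    ("areal/api/", "api"),
    ("areal/utils/", "utils"),
    ("areal/infra/", "infra"),
    ("areal/trainer/", "trainer"),
    ("areal/models/", "models"),
    ("areal/experimental/", "archon"),
    ("docs/", "docs"),
    ("examples/", "examples"),
    ("AGENTS.md", "agents"),
    (".agents/", "agents"),
    (".claude/", "agents"),
    (".codex/", "agents"),
    (".opencode/", "agents"),
]


def infer_scope(paths: list) -> Optional[str]:
    """Return the single scope covering all paths, or None if they span areas."""
    scopes = set()
    for path in paths:
        for prefix, scope in SCOPE_PREFIXES:
            if path.startswith(prefix):
                scopes.add(scope)
                break
        else:
            scopes.add(None)  # path is not covered by the table
    if len(scopes) == 1:
        return next(iter(scopes))
    return None


print(infer_scope(["areal/reward/gsm8k.py"]))
print(infer_scope(["areal/reward/gsm8k.py", "areal/engine/fsdp.py"]))
```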

Rules

  • Subject: imperative mood, ~50-72 chars, no trailing period
  • Body: explain "why" not "what", wrap at 72 chars
  • Key changes: bullet list of main modifications (for complex commits with 3+ files)
  • Refs: reference issues/PRs if applicable
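
The header line of a commit can be linted against these rules. The sketch below assumes a hypothetical helper `check_header` and treats 72 characters as a hard upper bound on the subject, which is stricter than the approximate "~50-72" guidance above. Imperative mood is not checked, since that needs human judgment:

```python
import re

COMMIT_TYPES = (
    "feat", "fix", "docs", "gov", "style", "refactor",
    "perf", "test", "build", "ci", "chore", "revert",
)
# <type>(<scope>): <subject>, where the (<scope>) part is optional.
HEADER_RE = re.compile(r"^(?P<type>[a-z]+)(\((?P<scope>[^()]+)\))?: (?P<subject>.+)$")


def check_header(header: str, max_subject_len: int = 72) -> list:
    """Return a list of rule violations for a commit header (empty if OK)."""
    match = HEADER_RE.match(header)
    if match is None:
        return ["header must look like '<type>(<scope>): <subject>'"]
    problems = []
    if match.group("type") not in COMMIT_TYPES:
        problems.append(f"unknown type {match.group('type')!r}")
    subject = match.group("subject")
    if subject.endswith("."):
        problems.append("subject must not end with a period")
    if len(subject) > max_subject_len:
        problems.append(f"subject longer than {max_subject_len} characters")
    return problems


print(check_header("fix(reward): handle empty completion in gsm8k"))
print(check_header("feature(reward): add thing."))
```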

Examples

Single file fix:

fix(reward): handle empty completion in gsm8k

Return 0 reward instead of raising exception when
completion string is empty after extraction.

Multi-file feature:

feat(engine): add CPU offload support to ArchonEngine

Enable torch_memory_saver for model offloading during
rollout phase to reduce GPU memory pressure.

Key changes:
- Add offload/onload methods to ArchonEngine
- Integrate with weight update flow
- Handle ROCm compatibility

Docs only:

docs: update algorithm comparison table

Add SAPO and GSPO to the algorithm family documentation
with configuration examples.

Agent/tooling changes:

chore(agents): port review-pr command to OpenCode

Add OpenCode-native commands with task() category
delegation instead of hardcoded model names.

Key changes:
- Create .opencode/command/ with review-pr, create-pr
- Replace hardcoded model routing with platform-native review routing
- Add expert subagent consultation patterns

Governance/maintainer changes:

gov(agents): add maintainer ownership for service modules

Update CODEOWNERS and maintainer references to reflect
current governance responsibilities.

Key changes:
- Add maintainers for agent_service and infra ownership
- Align governance docs with updated reviewer responsibilities

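A debugging guide for AReaL distributed training (FSDP2, TP, and so on). It covers troubleshooting deadlocks, wrong results, OOM, and communication errors. It emphasizes the minimal-reproduction principle and provides concrete steps and code examples for environment configuration, py-spy stack analysis, and DTensor verification.
Training hangs or deadlocks; results are inconsistent across ranks or numerically wrong; OOM errors in distributed settings; NCCL communication errors or device mesh issues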
.claude/skills/debug-distributed/SKILL.md
npx skills add areal-project/AReaL --skill debug-distributed -g -y
SKILL.md
Frontmatter
{
    "name": "debug-distributed",
    "description": "Guide for debugging distributed training issues in AReaL. Use when user encounters hangs, wrong results, OOM, or communication errors."
}

Debug Distributed Training

Debugging guide for distributed training issues in AReaL (FSDP2, TP, CP, EP).

When to Use

This skill is triggered when:

  • Training hangs or deadlocks
  • Results differ across ranks or are numerically wrong
  • OOM errors in distributed settings
  • NCCL/communication errors or device mesh issues

Debugging Principles

Minimal Reproduction

Always follow the minimal demo principle: Reproduce with the least amount of code to narrow down the issue faster.

# Bad: Debug in full training loop
# Good: Create minimal script
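import os

import torch
import torch.distributed as dist

# torchrun sets LOCAL_RANK; bind each process to its own GPU before any collective
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
dist.init_process_group("nccl")
rank = dist.get_rank()

# Reproduce the exact operation that fails
tensor = torch.ones(10, device="cuda")
dist.all_reduce(tensor)  # <-- Isolate the failing op
print(f"Rank {rank}: {tensor}")

dist.destroy_process_group()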

Reduction strategy:

  1. Remove unrelated model components
  2. Use small tensor sizes
  3. Reduce world_size to minimum (e.g., 2 GPUs)
  4. Remove torch.compile if possible
  5. Disable activation checkpointing

Step-by-Step Debugging Guide

1. Hang Debugging (Deadlocks, Synchronization)

Environment Variables for Debugging:

# Full debug logging
export TORCH_DISTRIBUTED_DEBUG=DETAIL
export NCCL_DEBUG=INFO
export NCCL_DEBUG_SUBSYS=ALL

# torch.compile debugging
export TORCH_LOGS="+dynamo,recompiles"
export TORCHDYNAMO_VERBOSE=1

Dump Call Stack with py-spy (for hung processes):

# Find process IDs
ps aux | grep python

# Dump call stack of specific rank
py-spy dump --pid <PID>

# Record flame graph for performance analysis
py-spy record -o profile.svg --pid <PID> --duration 30

Common Causes:

  1. Mismatched Collectives: One rank calls all_reduce, another doesn't.
  2. Wrong Process Group: Using wrong group for collective.
  3. Tensor Shape Mismatch: Different shapes across ranks.

Debug Steps:

# Verify group membership
mesh = parallel_dims.get_mesh("dp_shard_cp")
group = mesh.get_group()
print(f"Rank {dist.get_rank()}: group size = {dist.get_world_size(group)}")

# Print shapes on all ranks
print(f"Rank {dist.get_rank()}: tensor.shape = {tensor.shape}")
dist.barrier()

Timeout Adjustment (for debugging only):

from areal.engine.core.distributed import patch_dist_group_timeout
from datetime import timedelta
patch_dist_group_timeout(timedelta(minutes=30))

2. Wrong Results (Gradient, Reduction Issues)

Check DTensor Placements:

from torch.distributed.tensor import DTensor
if isinstance(param, DTensor):
    print(f"Param {name}: placements={param.placements}, mesh={param.device_mesh}")

Verify Gradient Reduction:

for name, param in model.named_parameters():
    if param.grad is not None:
        print(f"Rank {dist.get_rank()}: {name} grad_sum = {param.grad.sum().item()}")

3. OOM Issues (Memory, Sharding)

Check Memory Usage:

print(f"Rank {dist.get_rank()}: "
      f"allocated={torch.cuda.memory_allocated()/1e9:.2f}GB, "
      f"reserved={torch.cuda.memory_reserved()/1e9:.2f}GB")

Check FSDP Coverage:

for name, param in model.named_parameters():
    is_dtensor = isinstance(param, DTensor)
    print(f"{name}: is_dtensor={is_dtensor}, shape={param.shape}")

4. Communication Errors

Error | Cause | Solution
NCCL WARN Cuda failure | GPU communication | Check NCCL version, GPU topology
RuntimeError: Timed out | Rank synchronization | Increase timeout, check code paths
Invalid device mesh | Mesh configuration | Verify world_size = dp * tp * cp
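
The mesh check in the last row can be done before launching a job. A minimal sketch follows, assuming a hypothetical helper `check_mesh` that takes the parallel degrees as keyword arguments. The names dp, tp, and cp come from the table; other dimensions such as pp would be passed the same way if a configuration uses them:

```python
import math


def check_mesh(world_size: int, **dims: int) -> None:
    """Raise ValueError unless the parallel degrees multiply to world_size.

    Hypothetical helper for illustration; not part of AReaL.
    """
    for name, degree in dims.items():
        if degree < 1:
            raise ValueError(f"{name} must be >= 1, got {degree}")
    product = math.prod(dims.values())
    if product != world_size:
        detail = " * ".join(f"{name}={degree}" for name, degree in dims.items())
        raise ValueError(f"{detail} = {product}, but world_size = {world_size}")


check_mesh(8, dp=2, tp=2, cp=2)  # consistent: passes silently

try:
    check_mesh(8, dp=2, tp=2, cp=1)
except ValueError as err:
    print(err)
```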

Debugging Tools

Environment Variables Reference

Variable Purpose
TORCH_DISTRIBUTED_DEBUG=DETAIL Detailed distributed logging
NCCL_DEBUG=INFO NCCL communication logging
NCCL_DEBUG_SUBSYS=ALL All NCCL subsystems
TORCH_LOGS="+dynamo,recompiles" torch.compile logging
TORCHDYNAMO_VERBOSE=1 Dynamo verbose output
CUDA_LAUNCH_BLOCKING=1 Synchronous CUDA (slow, for debugging)

py-spy for Call Stack Analysis

# Install
pip install py-spy

# Dump call stack of hung process
py-spy dump --pid <PID>

# Dump all Python processes
pgrep -f python | xargs -I {} py-spy dump --pid {}

# Record flame graph
py-spy record -o profile.svg --pid <PID> --duration 30

Rank-Conditional Printing

def print_all_ranks(msg):
    for r in range(dist.get_world_size()):
        if dist.get_rank() == r:
            print(f"[Rank {r}] {msg}")
        dist.barrier()

Check Device Mesh

def debug_mesh(parallel_dims):
    mesh = parallel_dims.world_mesh
    for dim_name in mesh.mesh_dim_names:
        submesh = parallel_dims.get_mesh(dim_name)
        if submesh:
            print(f"Rank {dist.get_rank()}: {dim_name} size={submesh.size()}")

Validate Tensor Consistency

def check_tensor_consistency(tensor, name, group=None):
    local_sum = tensor.sum().item()
    tensor_sums = [None] * dist.get_world_size(group)
    dist.all_gather_object(tensor_sums, local_sum, group=group)
    if dist.get_rank() == 0 and len(set(tensor_sums)) > 1:
        print(f"WARNING: {name} inconsistent: {tensor_sums}")

Key Files Reference

Component File
Parallel Dims areal/experimental/models/archon/parallel_dims.py
Expert Parallel areal/experimental/models/archon/expert_parallel.py
Ulysses (CP) areal/experimental/models/archon/ulysses.py
FSDP/TP Apply areal/experimental/models/archon/qwen2/infra/parallelize.py

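A guide to adding support for a new HuggingFace model architecture in the Archon training engine. Suitable when a user wants to integrate new models such as Llama or Mistral; it provides the complete steps from analyzing the model configuration to selecting a reference implementation.
User asks how to add a model to Archon; user wants to support a new model family (e.g., Llama, Mistral); user mentions adding a new ModelSpec or model type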
.opencode/skills/add-archon-model/SKILL.md
npx skills add areal-project/AReaL --skill add-archon-model -g -y
SKILL.md
Frontmatter
{
    "name": "add-archon-model",
    "description": "Guide for adding a new model to the Archon engine. Use when user wants to add support for a new HuggingFace model architecture in ArchonEngine."
}

Add Archon Model

Add support for a new HuggingFace model architecture in the Archon training engine.

When to Use

This skill is triggered when:

  • User asks "how do I add a model to Archon?"
  • User wants to support a new model family (e.g., Llama, Mistral, DeepSeek) in ArchonEngine
  • User mentions adding a new ModelSpec or model type for Archon

Prerequisites

Before starting, ensure:

  • The target model is available on HuggingFace (has config.json with model_type)
  • You know the HuggingFace model ID (e.g., meta-llama/Llama-3-8B)
  • The model uses a standard transformer architecture (decoder-only)

Step-by-Step Guide

Step 1: Analyze the Target Model Architecture

Read the HuggingFace model's source code to extract key architecture information.

Action: Fetch and analyze the model's HuggingFace configuration and modeling files.

  1. Read the model's config.json (via AutoConfig.from_pretrained) to identify:

    • model_type string (this is the key used for registry lookup)
    • All architecture hyperparameters (hidden_size, num_layers, etc.)
    • Any model-specific fields (e.g., qk_norm, attention_bias, MoE fields)
  2. Read the HuggingFace modeling_*.py source to identify:

    • Attention variant: Does it have Q/K norm? Attention bias? Sliding window? Multi-latent attention?
    • FFN variant: SwiGLU (gate_proj + up_proj + down_proj)? GeGLU? Standard MLP?
    • MoE support: Does it have MoE layers? What router type? Shared experts?
    • RoPE variant: Standard RoPE? YaRN? NTK-aware scaling? What is the inv_freq formula?
    • Normalization: RMSNorm or LayerNorm? Pre-norm or post-norm? Elementwise affine?
    • Weight tying: Does tie_word_embeddings appear in config?
    • State dict key names: What are the HF weight key naming conventions?
  3. Summarize findings in a checklist like:

Target model: <name>
HF model_type: "<model_type>" (and variants like "<model_type>_moe" if applicable)
Attention: [standard GQA / with QK norm / with bias / sliding window / ...]
FFN: [SwiGLU / GeGLU / standard MLP / ...]
MoE: [no / yes - num_experts, top_k, shared_experts]
RoPE: [standard / YaRN / NTK-aware / ...]
Norm: [RMSNorm / LayerNorm] with [pre-norm / post-norm]
Weight tying: [yes / no]

Step 2: Select the Reference Model

Choose the closest existing implementation as a starting point:

Target characteristics | Reference | Why
Dense-only, standard GQA, no QK norm | qwen2 | Simplest baseline, pure dense
Has QK norm, or has MoE support | qwen3 | Supports QK norm + MoE + shared experts

Action: Copy the reference model directory as the starting point:

areal/experimental/models/archon/<model>/
  __init__.py
  spec.py
  model/
    args.py
    model.py
    rope.py
    state_dict_adapter.py
  infra/
    parallelize.py

Step 3: Implement args.py

Adapt <Model>ModelArgs to match the target model's HuggingFace config fields.

Key changes from reference:

  1. Update the @dataclass fields to match the target model's hyperparameters:

    • Field names should use Archon conventions (dim, n_layers, n_heads, n_kv_heads, vocab_size, head_dim, hidden_dim, norm_eps, rope_theta, etc.)
    • Default values should match the smallest variant of the target model
    • Add model-specific fields (e.g., attention_bias, qk_norm, sliding_window)
  2. Update from_hf_config() to correctly map HuggingFace config attributes:

    • Use getattr(hf_config, "field_name", default) for optional fields
    • Handle variant-specific fields (e.g., MoE fields only present in MoE variants)
    • The method must return an instance of the model args class

Critical: Verify every field mapping against the HF model's config.json. Incorrect mappings here cause silent errors downstream.

Base class contract (BaseModelArgs):

@dataclass
class <Model>ModelArgs(BaseModelArgs):
    # ... model-specific fields ...

    @classmethod
    def from_hf_config(
        cls,
        hf_config: PretrainedConfig,
        is_critic: bool = False,
        **kwargs,
    ) -> <Model>ModelArgs:
        # Map HF config fields to Archon model args
        ...
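
To make the mapping guidance concrete, here is a minimal sketch with a hypothetical `ToyModelArgs` class. It does not subclass `BaseModelArgs` so that it stays self-contained, and the HF attribute names (`hidden_size`, `num_hidden_layers`, `num_attention_heads`, `num_key_value_heads`, `rms_norm_eps`, `attention_bias`) are those used by common Llama-style configs; they should be verified against the target model's `config.json`:

```python
from dataclasses import dataclass
from types import SimpleNamespace


@dataclass
class ToyModelArgs:
    """Hypothetical args class; a real one would subclass BaseModelArgs."""

    dim: int = 1024
    n_layers: int = 12
    n_heads: int = 16
    n_kv_heads: int = 16
    norm_eps: float = 1e-6
    attention_bias: bool = False

    @classmethod
    def from_hf_config(cls, hf_config, is_critic: bool = False, **kwargs) -> "ToyModelArgs":
        n_heads = hf_config.num_attention_heads
        return cls(
            dim=hf_config.hidden_size,
            n_layers=hf_config.num_hidden_layers,
            n_heads=n_heads,
            # Optional field: fall back to plain multi-head attention.
            n_kv_heads=getattr(hf_config, "num_key_value_heads", n_heads),
            norm_eps=getattr(hf_config, "rms_norm_eps", 1e-6),
            attention_bias=getattr(hf_config, "attention_bias", False),
        )


# SimpleNamespace stands in for a PretrainedConfig in this sketch.
hf_config = SimpleNamespace(
    hidden_size=2048, num_hidden_layers=24, num_attention_heads=16, rms_norm_eps=1e-5
)
args = ToyModelArgs.from_hf_config(hf_config)
print(args)
```

Required fields are read with plain attribute access so that a missing one fails loudly, while only truly optional fields go through `getattr` with a default.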

Step 4: Implement model.py

Adapt the model architecture to match the target model.

Key components to adapt:

  1. Normalization (RMSNorm or similar):

    • Check if elementwise_affine is configurable
    • Check the epsilon default value
    • If the model uses LayerNorm, implement accordingly
  2. Attention module:

    • Q/K/V projection: Check bias presence (nn.Linear(..., bias=True/False))
    • QK norm: Add q_norm/k_norm if the model has them, remove if it doesn't
    • GQA: n_kv_heads < n_heads for grouped-query attention
    • Ulysses SP: Keep the set_cp_group / _sp_enabled pattern from the reference
    • Output projection: Check bias presence
  3. FeedForward module:

    • SwiGLU: w2(silu(w1(x)) * w3(x)) -- most common for modern LLMs
    • Check bias in linear layers
    • For MoE models: MoE module replaces FeedForward on designated layers
  4. TransformerBlock: Pre-norm (most modern LLMs) vs post-norm

    • MoE layer detection via _is_moe_layer() if applicable
  5. Top-level Model (<Model>Model(BaseArchonModel)):

    • tok_embeddings, layers (as ModuleDict), norm, output/score
    • init_weights(): Match initialization scheme from HF
    • init_buffers(): RoPE cache + MoE buffers
    • forward(): Must follow BaseArchonModel signature: (tokens, positions, cu_seqlens, max_seqlen, tree_attn_meta=None) -> Tensor

Base class contract (BaseArchonModel):

class <Model>Model(BaseArchonModel):
    def forward(self, tokens, positions, cu_seqlens, max_seqlen, tree_attn_meta=None) -> torch.Tensor: ...
    def init_weights(self) -> None: ...
    def init_buffers(self, buffer_device) -> None: ...

Step 5: Implement rope.py

Handle the rotary position embedding variant.

Options:

  1. Standard RoPE (same as qwen2/qwen3): Re-export from qwen2:

    from areal.experimental.models.archon.qwen2.model.rope import (
        apply_rotary_emb,
        precompute_rope_cache,
        repeat_kv,
        reshape_for_broadcast,
        rotate_half,
    )
    
  2. Custom RoPE (YaRN, NTK-aware, etc.): Implement custom precompute_rope_cache() and apply_rotary_emb() functions. The key difference is usually in how inv_freq is computed (scaling factors, interpolation, etc.).
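
For reference when comparing variants, standard RoPE computes `inv_freq[i] = 1 / rope_theta ** (2i / head_dim)` for `i` in `[0, head_dim / 2)`. The pure-Python sketch below shows only that baseline formula. The function names are hypothetical and it does not reproduce Archon's `precompute_rope_cache`; scaled variants such as YaRN or NTK-aware change how these frequencies are derived:

```python
def standard_inv_freq(head_dim: int, rope_theta: float = 10000.0) -> list:
    """Baseline RoPE inverse frequencies, one per rotated feature pair."""
    if head_dim % 2 != 0:
        raise ValueError("head_dim must be even for RoPE")
    return [1.0 / (rope_theta ** (2 * i / head_dim)) for i in range(head_dim // 2)]


def rotation_angles(position: int, inv_freq: list) -> list:
    """Rotation angle applied to each feature pair at a given token position."""
    return [position * freq for freq in inv_freq]


inv_freq = standard_inv_freq(head_dim=8, rope_theta=10000.0)
print(inv_freq)                      # frequencies fall off geometrically
print(rotation_angles(3, inv_freq))  # angles grow linearly with position
```

Comparing this list against the `inv_freq` produced by the HF implementation for the same `head_dim` and `rope_theta` is a quick way to detect a scaled variant.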

Step 6: Implement state_dict_adapter.py

Map between HuggingFace and Archon weight key names.

This is the most error-prone step. The adapter must correctly handle:

  1. Key name mapping (from_hf_map dict):

    • Embedding: model.embed_tokens.weight -> tok_embeddings.weight
    • Attention: model.layers.{}.self_attn.q_proj.weight -> layers.{}.attention.wq.weight
    • FFN: model.layers.{}.mlp.gate_proj.weight -> layers.{}.feed_forward.w1.weight
    • Norms: model.layers.{}.input_layernorm.weight -> layers.{}.attention_norm.weight
    • Output: lm_head.weight -> output.weight
    • Skip keys (set to None): rotary_emb.inv_freq (computed at runtime)
    • Model-specific keys: bias terms, QK norm weights, etc.
  2. Reverse mapping (to_hf_map): Auto-generated from from_hf_map

  3. MoE expert weights (if applicable): 3D<->2D conversion for expert weights. Copy the MoE handling from qwen3 if the model has MoE.

  4. Weight tying: Skip output.weight during to_hf() if tie_word_embeddings=True

Verification approach: After implementation, the adapter should satisfy:

# Roundtrip: archon -> hf -> archon preserves all keys
hf_sd = adapter.to_hf(archon_sd)
roundtrip_sd = adapter.from_hf(hf_sd)
assert set(roundtrip_sd.keys()) == set(archon_sd.keys())
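
The mapping mechanics can be sketched independently of the real adapter classes. In the example below, `FROM_HF_MAP` holds a few of the key pairs listed above, with `{}` standing for the layer index. The helpers `translate` and `convert`, and the full `rotary_emb.inv_freq` key path, are hypothetical and only illustrate the template lookup, the skipped keys, and the roundtrip property:

```python
import re
from typing import Optional

FROM_HF_MAP = {
    "model.embed_tokens.weight": "tok_embeddings.weight",
    "model.layers.{}.self_attn.q_proj.weight": "layers.{}.attention.wq.weight",
    "model.layers.{}.mlp.gate_proj.weight": "layers.{}.feed_forward.w1.weight",
    "model.layers.{}.input_layernorm.weight": "layers.{}.attention_norm.weight",
    "lm_head.weight": "output.weight",
    "model.layers.{}.self_attn.rotary_emb.inv_freq": None,  # skipped key
}
# Reverse mapping is derived from the forward one; skipped keys have no inverse.
TO_HF_MAP = {v: k for k, v in FROM_HF_MAP.items() if v is not None}


def translate(key: str, mapping: dict) -> Optional[str]:
    """Translate one key; unknown keys raise KeyError so gaps are noticed."""
    match = re.search(r"layers\.(\d+)\.", key)
    if match is None:
        return mapping[key]
    layer = match.group(1)
    template = key.replace(f"layers.{layer}.", "layers.{}.", 1)
    target = mapping[template]
    return None if target is None else target.format(layer)


def convert(state_dict: dict, mapping: dict) -> dict:
    out = {}
    for key, value in state_dict.items():
        new_key = translate(key, mapping)
        if new_key is not None:  # None means "drop this key"
            out[new_key] = value
    return out


archon_sd = {"tok_embeddings.weight": 0, "layers.3.attention.wq.weight": 1, "output.weight": 2}
hf_sd = convert(archon_sd, TO_HF_MAP)
roundtrip_sd = convert(hf_sd, FROM_HF_MAP)
print(hf_sd)
```

Raising on unmapped keys, rather than passing them through, surfaces a forgotten bias or norm weight at conversion time instead of at load time.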

Base class contract (BaseStateDictAdapter):

class <Model>StateDictAdapter(BaseStateDictAdapter):
    def from_hf(self, hf_state_dict) -> dict[str, Any]: ...
    def to_hf(self, archon_state_dict) -> dict[str, Any]: ...
    def convert_single_to_hf(self, name, tensor) -> list[tuple[str, torch.Tensor]]: ...

Step 7: Implement parallelize.py

Define the parallelization strategy for the model.

The parallelize function applies parallelism in this order:

  1. TP (Tensor Parallelism) -- shard attention/FFN across devices
  2. EP (Expert Parallelism) -- for MoE models only
  3. CP (Context Parallelism / Ulysses SP) -- sequence parallelism
  4. AC (Activation Checkpointing) -- memory optimization
  5. torch.compile -- compilation optimization
  6. FSDP (Fully Sharded Data Parallelism) -- data parallelism

Key adaptations by model architecture:

  • Attention with QK norm: wq/wk use use_local_output=False (DTensor output for norm), add SequenceParallel(sequence_dim=2) for q_norm/k_norm
  • Attention without QK norm: wq/wk/wv all use use_local_output=True
  • Attention with bias: Bias terms follow the same parallel plan as their weights
  • MoE layers: Separate TP plan for MoE input/output, router gate, and expert weights. Copy from qwen3's apply_moe_ep_tp() and apply_non_moe_tp()
  • Dense-only models: Simpler plan without MoE handling. Copy from qwen2

Function signature (must match ParallelizeFn protocol):

def parallelize_<model>(
    model: nn.Module,
    parallel_dims: ArchonParallelDims,
    param_dtype: torch.dtype = torch.bfloat16,
    reduce_dtype: torch.dtype = torch.float32,
    loss_parallel: bool = True,
    cpu_offload: bool = False,
    reshard_after_forward_policy: str = "default",
    ac_config: ActivationCheckpointConfig | None = None,
    enable_compile: bool = True,
) -> nn.Module: ...

Step 8: Create spec.py and Register

Assemble the ModelSpec and register it.

from areal.experimental.models.archon.model_spec import ModelSpec, register_model_spec
from areal.experimental.models.archon.pipeline_parallel import pipeline_llm
from areal.experimental.models.archon.<model>.infra.parallelize import parallelize_<model>
from areal.experimental.models.archon.<model>.model.args import <Model>ModelArgs
from areal.experimental.models.archon.<model>.model.model import <Model>Model
from areal.experimental.models.archon.<model>.model.state_dict_adapter import (
    <Model>StateDictAdapter,
)

<MODEL>_SPEC = ModelSpec(
    name="<Model>",
    model_class=<Model>Model,
    model_args_class=<Model>ModelArgs,
    state_dict_adapter_class=<Model>StateDictAdapter,
    parallelize_fn=parallelize_<model>,
    supported_model_types=frozenset({"<model_type>"}),  # From HF config.json
    pipelining_fn=pipeline_llm,
)

# Auto-register when module is imported
register_model_spec(<MODEL>_SPEC)

__all__ = ["<MODEL>_SPEC"]

Note: supported_model_types should include all HF model_type strings that this implementation handles (e.g., {"qwen3", "qwen3_moe"} for Qwen3).

Step 9: Register in __init__.py

Add the import to areal/experimental/models/archon/__init__.py:

from areal.experimental.models.archon.<model> import spec as <model>_spec  # noqa: F401

This triggers auto-registration when the module is imported.
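The import-time registration pattern can be illustrated with a toy registry. This is a sketch under stated assumptions: `ToySpec`, `register_toy_spec`, and `lookup_toy_spec` are hypothetical stand-ins, and the real `ModelSpec` and `register_model_spec` may behave differently (for example on duplicate registration).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToySpec:
    """Hypothetical stand-in for ModelSpec with only the lookup-relevant fields."""

    name: str
    supported_model_types: frozenset


_REGISTRY: dict = {}  # HF model_type string -> ToySpec


def register_toy_spec(spec: ToySpec) -> None:
    """Register the spec under every model_type it claims to support."""
    for model_type in spec.supported_model_types:
        if model_type in _REGISTRY:
            raise ValueError(f"model_type {model_type!r} is already registered")
        _REGISTRY[model_type] = spec


def lookup_toy_spec(model_type: str) -> ToySpec:
    """Look up by the model_type read from the HF config.json."""
    try:
        return _REGISTRY[model_type]
    except KeyError:
        raise KeyError(
            f"no spec registered for {model_type!r}; was its module imported?"
        ) from None


# Module-level call: runs once, as a side effect of importing this module.
register_toy_spec(ToySpec("Qwen3", frozenset({"qwen3", "qwen3_moe"})))
```

Because registration is a side effect of the import, a spec module that is never imported is simply absent from the registry, which is why the import line in `__init__.py` matters.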

Step 10: Verify and Test

Verification should be done in stages, adapting based on available hardware and the test patterns in tests/experimental/archon/.

Before writing tests, examine the existing test files to understand current patterns:

tests/experimental/archon/
  conftest.py             -- Pytest configuration (version checks)
  utils.py                -- Shared utilities (model loading, comparison)
  test_qwen3_args.py      -- Args unit tests (CPU-only)
  test_state_dict_adapter.py  -- State dict roundtrip tests
  test_weight_sync.py     -- Weight completeness tests (meta device)
  test_forward.py         -- Forward precision comparison (single GPU)
  ...

Test stages (write tests appropriate for the model's complexity):

Stage 1: Args Tests (CPU-only, always write these)

Test from_hf_config() with mock HuggingFace configs:

# Pattern: Create mock PretrainedConfig, verify args mapping
from unittest.mock import MagicMock

def test_args_from_hf_config():
    hf_config = MagicMock()
    hf_config.hidden_size = 4096
    hf_config.num_hidden_layers = 32
    # ... set all required fields
    args = <Model>ModelArgs.from_hf_config(hf_config)
    assert args.dim == 4096
    assert args.n_layers == 32
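One caveat with the MagicMock pattern: a plain `MagicMock` invents a child mock for any attribute that was never set, so a misspelled or forgotten config field does not fail the test. A minimal sketch of two stricter alternatives from the standard library follows; `missing_field_is_detected` is a hypothetical helper used only to show the difference.

```python
from types import SimpleNamespace
from unittest.mock import MagicMock


def missing_field_is_detected(config, field: str) -> bool:
    """Return True if reading `field` from `config` raises AttributeError."""
    try:
        getattr(config, field)
    except AttributeError:
        return True
    return False


# Plain MagicMock: unset attributes silently become new mocks.
loose = MagicMock()
loose.hidden_size = 4096

# SimpleNamespace: only the attributes passed in exist.
strict = SimpleNamespace(hidden_size=4096, num_hidden_layers=32)

# MagicMock with a spec list: attributes outside the list raise AttributeError.
spec_mock = MagicMock(spec=["hidden_size", "num_hidden_layers"])
spec_mock.hidden_size = 4096
spec_mock.num_hidden_layers = 32
```

With either stricter variant, a `from_hf_config()` that reads the wrong attribute name fails immediately instead of propagating a mock into the args object.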

Stage 2: State Dict Adapter Tests (CPU-only)

Test key mapping roundtrip:

def test_state_dict_roundtrip():
    # Create adapter with mock config
    adapter = <Model>StateDictAdapter(mock_config)
    # Create fake archon state dict with expected keys
    archon_sd = {"tok_embeddings.weight": torch.randn(vocab, dim), ...}
    # Roundtrip
    hf_sd = adapter.to_hf(archon_sd)
    roundtrip = adapter.from_hf(hf_sd)
    assert set(roundtrip.keys()) == set(archon_sd.keys())

Stage 3: Weight Completeness (meta device, CPU-only)

Verify all model parameters have HF mappings:

def test_weight_completeness():
    # Create model on meta device
    with torch.device("meta"):
        model = <Model>Model(args)
    adapter = <Model>StateDictAdapter(hf_config)
    # Check every archon param has a HF mapping
    for name, _ in model.named_parameters():
        hf_pairs = adapter.convert_single_to_hf(name, torch.empty(0))
        assert len(hf_pairs) > 0, f"No HF mapping for {name}"

Stage 4: Forward Precision (single GPU, if available)

Compare Archon model output against HuggingFace reference:

@pytest.mark.skipif(not torch.cuda.is_available(), reason="Requires CUDA")
def test_forward_matches_hf():
    # Load both HF and Archon models
    # Run forward on same input
    # Compare logits within tolerance
    ...

Important: Do NOT hardcode the test categories. Inspect the existing test files in tests/experimental/archon/ and follow the same patterns, fixtures, and markers. Adapt test scope to the model's specific features (e.g., add MoE-specific tests only if the model has MoE).

Reference Implementations

| Model | Directory | Features |
| --- | --- | --- |
| Qwen2 | areal/experimental/models/archon/qwen2/ | Dense, attention bias, no QK norm |
| Qwen3 | areal/experimental/models/archon/qwen3/ | Dense + MoE, QK norm, no attention bias, shared experts |

Architecture Decision Map

| Feature | qwen2 | qwen3 | What to check in target model |
| --- | --- | --- | --- |
| Attention bias | Yes | No | attention_bias in HF config |
| QK norm | No | Yes | qk_norm in HF config or QKNorm module in modeling file |
| MoE | No | Yes | num_experts/num_local_experts in HF config |
| Shared experts | No | Yes | num_shared_experts in HF config |
| Decoder sparse step | No | Yes | decoder_sparse_step in HF config |
| Weight tying | Both | Both | tie_word_embeddings in HF config |
| RoPE | Standard | Standard (re-export qwen2) | Check inv_freq formula in HF modeling code |

Common Mistakes

  • Not mapping all HF keys in state_dict_adapter.py (causes silent weight drops)
  • Wrong from_hf_config() field mapping (uses wrong HF config attribute name)
  • Forgetting to handle None keys in from_hf_map (keys to skip like rotary_emb.inv_freq)
  • Missing MoE expert weight 3D<->2D conversion when model has MoE
  • Wrong TP plan for attention with/without QK norm (use_local_output must match)
  • Forgetting to add import line in areal/experimental/models/archon/__init__.py
  • Not including all model_type variants in supported_model_types frozenset
  • Using print instead of areal.utils.logging.getLogger()

File Checklist

After completion, verify all files exist and are consistent:

  • areal/experimental/models/archon/<model>/__init__.py
  • areal/experimental/models/archon/<model>/spec.py -- ModelSpec + register
  • areal/experimental/models/archon/<model>/model/args.py -- ModelArgs + from_hf_config
  • areal/experimental/models/archon/<model>/model/model.py -- Model + Attention + FFN
  • areal/experimental/models/archon/<model>/model/rope.py -- RoPE (or re-export)
  • areal/experimental/models/archon/<model>/model/state_dict_adapter.py -- Key mapping
  • areal/experimental/models/archon/<model>/infra/parallelize.py -- Parallel strategy
  • areal/experimental/models/archon/__init__.py -- Import line added
  • tests/experimental/archon/test_<model>_*.py -- Tests

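Guides the user through adding a new dataset loader to the AReaL framework. Covers creating a Python file with data-processing functions for SFT and RL training, and registering the new dataset in __init__.py.
User asks how to add a dataset; user wants to integrate a new dataset; user mentions creating a dataset loader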
.opencode/skills/add-dataset/SKILL.md
npx skills add areal-project/AReaL --skill add-dataset -g -y
SKILL.md
Frontmatter
{
    "name": "add-dataset",
    "description": "Guide for adding a new dataset loader to AReaL. Use when user wants to add a new dataset."
}

Add Dataset

Add a new dataset loader to AReaL.

When to Use

This skill is triggered when:

  • User asks "how do I add a dataset?"
  • User wants to integrate a new dataset
  • User mentions creating a dataset loader

Step-by-Step Guide

Step 1: Create Dataset File

Create areal/dataset/<name>.py:

from datasets import Dataset, load_dataset


def get_<name>_sft_dataset(
    path: str,
    split: str,
    tokenizer,
    max_length: int | None = None,
) -> Dataset:
    """Load dataset for SFT training.

    Args:
        path: Path to dataset (HuggingFace hub or local path)
        split: Dataset split (train/validation/test)
        tokenizer: Tokenizer for processing
        max_length: Maximum sequence length (optional)

    Returns:
        HuggingFace Dataset with processed samples
    """
    dataset = load_dataset(path=path, split=split)

    def process(sample):
        # Tokenize the full sequence (prompt + response)
        seq_token = tokenizer.encode(
            sample["question"] + sample["answer"] + tokenizer.eos_token
        )
        prompt_token = tokenizer.encode(sample["question"])
        # Loss mask: 0 for prompt, 1 for response
        loss_mask = [0] * len(prompt_token) + [1] * (len(seq_token) - len(prompt_token))
        return {"input_ids": seq_token, "loss_mask": loss_mask}

    dataset = dataset.map(process).remove_columns(["question", "answer"])

    if max_length is not None:
        dataset = dataset.filter(lambda x: len(x["input_ids"]) <= max_length)

    return dataset


def get_<name>_rl_dataset(
    path: str,
    split: str,
    tokenizer,
    max_length: int | None = None,
) -> Dataset:
    """Load dataset for RL training.

    Args:
        path: Path to dataset
        split: Dataset split
        tokenizer: Tokenizer for length filtering
        max_length: Maximum sequence length

    Returns:
        HuggingFace Dataset with prompts and answers for reward computation
    """
    dataset = load_dataset(path=path, split=split)

    def process(sample):
        messages = [
            {
                "role": "user",
                "content": sample["question"],
            }
        ]
        return {"messages": messages, "answer": sample["answer"]}

    dataset = dataset.map(process).remove_columns(["question"])

    if max_length is not None:

        def filter_length(sample):
            content = sample["messages"][0]["content"]
            tokens = tokenizer.encode(content)
            return len(tokens) <= max_length

        dataset = dataset.filter(filter_length)

    return dataset
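The loss-mask construction in the SFT loader can be checked by hand with a toy tokenizer. This is a sketch under stated assumptions: `CharTokenizer` is a hypothetical character-level tokenizer, not a HuggingFace one, and the prefix check is an addition that the loader above does not perform.

```python
class CharTokenizer:
    """Toy character-level tokenizer for illustration only."""

    eos_token = "\n"

    def encode(self, text: str) -> list:
        return [ord(ch) for ch in text]


def build_loss_mask(tokenizer, question: str, answer: str) -> dict:
    """Tokenize prompt + response and mask out the prompt positions."""
    seq_token = tokenizer.encode(question + answer + tokenizer.eos_token)
    prompt_token = tokenizer.encode(question)
    # The mask is only meaningful if the prompt tokens are a prefix of the
    # full sequence; subword tokenizers can merge across the boundary.
    if seq_token[: len(prompt_token)] != prompt_token:
        raise ValueError("prompt tokens are not a prefix of the full sequence")
    # Loss mask: 0 for prompt, 1 for response (including EOS)
    loss_mask = [0] * len(prompt_token) + [1] * (len(seq_token) - len(prompt_token))
    return {"input_ids": seq_token, "loss_mask": loss_mask}


sample = build_loss_mask(CharTokenizer(), "2+2=", "4")
# "2+2=" is 4 tokens, "4" plus EOS is 2 tokens -> mask [0, 0, 0, 0, 1, 1]
```

With a real subword tokenizer the prefix property is not guaranteed, so a check like the one above (or tokenizing prompt and response separately) is worth considering when adapting the template.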

Step 2: Register in __init__.py

Update areal/dataset/__init__.py:

# Add to VALID_DATASETS
VALID_DATASETS = [
    # ... existing datasets
    "<name>",
]

# Add to _get_custom_dataset function
def _get_custom_dataset(name: str, ...):
    # ... existing code
    elif name == "<name>":
        from areal.dataset.<name> import get_<name>_sft_dataset, get_<name>_rl_dataset
        if dataset_type == "sft":
            return get_<name>_sft_dataset(path, split, tokenizer, max_length)
        else:
            return get_<name>_rl_dataset(path, split, tokenizer, max_length)

Step 3: Add Config (Optional)

If the dataset needs special configuration, add to areal/api/cli_args.py:

@dataclass
class TrainDatasetConfig:
    # ... existing fields
    <name>_specific_field: Optional[str] = None

Step 4: Add Tests

Create tests/test_<name>_dataset.py:

import pytest
from areal.dataset.<name> import get_<name>_sft_dataset, get_<name>_rl_dataset

def test_sft_dataset_loads(tokenizer):
    dataset = get_<name>_sft_dataset("path/to/data", split="train", tokenizer=tokenizer)
    assert len(dataset) > 0
    assert "input_ids" in dataset.column_names
    assert "loss_mask" in dataset.column_names

def test_rl_dataset_loads(tokenizer):
    dataset = get_<name>_rl_dataset("path/to/data", split="train", tokenizer=tokenizer)
    assert len(dataset) > 0
    assert "messages" in dataset.column_names
    assert "answer" in dataset.column_names

Reference Implementations

| Dataset | File | Description |
| --- | --- | --- |
| GSM8K | areal/dataset/gsm8k.py | Math word problems |
| Geometry3K | areal/dataset/geometry3k.py | Geometry problems |
| CLEVR | areal/dataset/clevr_count_70k.py | Visual counting |
| HH-RLHF | areal/dataset/hhrlhf.py | Helpfulness/Harmlessness |
| TORL | areal/dataset/torl_data.py | Tool-use RL |

Required Fields

SFT Dataset

{
    "messages": [
        {"role": "user", "content": "..."},
        {"role": "assistant", "content": "..."},
    ]
}

RL Dataset

{
    "messages": [
        {"role": "user", "content": "..."},
    ],
    "answer": "ground_truth_for_reward",
    # Optional metadata for reward function
}

Common Mistakes

  • Returning List[Dict] instead of HuggingFace Dataset
  • Using Python loops instead of dataset.map()/filter()
  • Missing "messages" field for RL datasets
  • Wrong message format (should be list of dicts with role and content)
  • Not registering in __init__.py
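The message-format mistakes above can be caught early with a small check run over a few samples. A minimal sketch, assuming text-only RL samples shaped like the Required Fields example; `validate_rl_sample` is a hypothetical helper, not part of AReaL, and multimodal datasets may legitimately use non-string content.

```python
def validate_rl_sample(sample: dict) -> list:
    """Return a list of problems; an empty list means the sample looks well-formed."""
    messages = sample.get("messages")
    if not isinstance(messages, list) or not messages:
        return ["'messages' must be a non-empty list"]
    problems = []
    for i, msg in enumerate(messages):
        if not isinstance(msg, dict):
            problems.append(f"messages[{i}] is not a dict")
            continue
        for field in ("role", "content"):
            if not isinstance(msg.get(field), str):
                problems.append(f"messages[{i}] lacks a string {field!r}")
    if "answer" not in sample:
        problems.append("'answer' is missing")
    return problems


good = {"messages": [{"role": "user", "content": "What is 2+2?"}], "answer": "4"}
bad = {"messages": [{"role": "user"}]}
```

Running this over the first few rows of a processed dataset is a cheap sanity check before wiring the loader into training.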

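Guides how to add a custom reward function to the AReaL framework. Covers the full set of steps: creating the Python file, registering it in __init__.py, handling blocking operations asynchronously, and writing test cases. Applies when implementing or modifying the reward logic used for model evaluation.
User asks how to add a reward function; user wants to implement a custom reward; user mentions reward computation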
.opencode/skills/add-reward/SKILL.md
npx skills add areal-project/AReaL --skill add-reward -g -y
SKILL.md
Frontmatter
{
    "name": "add-reward",
    "description": "Guide for adding a new reward function to AReaL. Use when user wants to create a reward function."
}

Add Reward

Add a new reward function to AReaL.

When to Use

This skill is triggered when:

  • User asks "how do I add a reward function?"
  • User wants to implement custom rewards
  • User mentions reward computation

Step-by-Step Guide

Step 1: Create Reward File

Create areal/reward/<name>.py:

from typing import Any

from areal.utils import logging

logger = logging.getLogger("MyReward")


def <name>_reward_fn(
    prompt: str,
    completions: str,
    prompt_ids,
    completion_ids,
    answer: str | None = None,
    **kwargs: Any,
) -> float:
    """Compute reward for a single completion.

    Args:
        prompt: Prompt string
        completions: Completion string (model output)
        prompt_ids: Tokenized prompt IDs
        completion_ids: Tokenized completion IDs
        answer: Ground truth answer from dataset (optional)
        **kwargs: Additional data from dataset

    Returns:
        Reward value (float), typically 0.0 or 1.0
    """
    try:
        # Extract answer from completion
        extracted = _extract_answer(completions)

        # Compare with ground truth
        if answer is not None and extracted == str(answer):
            return 1.0
        return 0.0
    except Exception:
        logger.warning("Exception in reward computation", exc_info=True)
        return 0.0


def _extract_answer(completion: str) -> str:
    """Extract the answer from a completion string.

    Implement your extraction logic here.
    """
    # Example: Extract content from \boxed{}
    import re

    match = re.search(r"\\boxed\{([^}]+)\}", completion)
    if match:
        return match.group(1).strip()
    return completion.strip()
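The example regex `[^}]+` stops at the first closing brace, so it truncates answers that contain nested braces, such as a boxed fraction. A minimal sketch of a brace-balanced alternative follows; `extract_boxed` and `regex_extract` are hypothetical helpers for comparison, and returning `None` when no box is found is a design choice that differs from the template's fallback to the stripped completion.

```python
import re
from typing import Optional


def regex_extract(completion: str) -> Optional[str]:
    """The pattern from the template above, for comparison."""
    match = re.search(r"\\boxed\{([^}]+)\}", completion)
    return match.group(1).strip() if match else None


def extract_boxed(completion: str) -> Optional[str]:
    """Return the content of the first \\boxed{...}, honoring nested braces."""
    marker = "\\boxed{"
    start = completion.find(marker)
    if start == -1:
        return None
    begin = start + len(marker)
    depth = 1  # we are inside the opening brace of \boxed{
    for pos in range(begin, len(completion)):
        ch = completion[pos]
        if ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:
                return completion[begin:pos].strip()
    return None  # unbalanced braces


text = "The answer is \\boxed{\\frac{1}{2}}."
# regex_extract(text) stops at the first "}", extract_boxed(text) does not.
```

For flat answers such as `\boxed{4}` the two functions agree, so the simpler regex remains adequate when the dataset never produces nested braces.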

Step 2: Register in __init__.py

Update areal/reward/__init__.py:

# Add to VALID_REWARD_FN
VALID_REWARD_FN = [
    # ... existing reward functions
    "<name>",
]

# Add to get_reward_fn function
def get_reward_fn(name: str, **kwargs):
    # ... existing code
    elif name == "<name>":
        from areal.reward.<name> import <name>_reward_fn
        return <name>_reward_fn

Step 3: Handle Blocking Operations

If your reward function uses blocking operations (e.g., API calls, model inference), the workflow will wrap it with AsyncRewardWrapper:

# In your workflow
from areal.reward import AsyncRewardWrapper

self.reward_fn = AsyncRewardWrapper(reward_fn)

# Then call it asynchronously
rewards = await self.reward_fn(prompt, completions, **data)
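The general idea of keeping a blocking reward function off the event loop can be shown with the standard library alone. This is a sketch of the technique, not AReaL's `AsyncRewardWrapper`: `ToyAsyncWrapper` is hypothetical, and the use of a thread pool is an assumption made for the example.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial


class ToyAsyncWrapper:
    """Illustrative only: runs a blocking function in a worker thread."""

    def __init__(self, fn, max_workers: int = 4):
        self._fn = fn
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    async def __call__(self, *args, **kwargs) -> float:
        loop = asyncio.get_running_loop()
        # run_in_executor takes no kwargs, so bind them with partial.
        return await loop.run_in_executor(
            self._pool, partial(self._fn, *args, **kwargs)
        )


def slow_reward(prompt: str, completions: str, **kwargs) -> float:
    """Blocking reward: the sleep stands in for an API call."""
    time.sleep(0.05)
    return float(completions.strip() == kwargs.get("answer"))


async def score_all() -> list:
    wrapped = ToyAsyncWrapper(slow_reward)
    # Both calls overlap instead of blocking the event loop one after another.
    return await asyncio.gather(
        wrapped("2+2?", "4", answer="4"),
        wrapped("2+2?", "5", answer="4"),
    )


rewards = asyncio.run(score_all())
```

The reward function itself stays synchronous and per-sample; only the call site changes, which matches the division of labor described above.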

Step 4: Add Tests

Create tests/test_<name>_reward.py:

import pytest
from areal.reward.<name> import <name>_reward_fn

def test_reward_correct_answer():
    reward = <name>_reward_fn(
        prompt="What is 2+2?",
        completions="The answer is \\boxed{4}",
        prompt_ids=None,
        completion_ids=None,
        answer="4",
    )
    assert reward == 1.0

def test_reward_wrong_answer():
    reward = <name>_reward_fn(
        prompt="What is 2+2?",
        completions="The answer is \\boxed{5}",
        prompt_ids=None,
        completion_ids=None,
        answer="4",
    )
    assert reward == 0.0

Reference Implementations

| Reward | File | Description |
| --- | --- | --- |
| GSM8K | areal/reward/gsm8k.py | Math answer verification |
| Geometry3K | areal/reward/geometry3k.py | Geometry answer verification |
| CLEVR | areal/reward/clevr_count_70k.py | Counting verification |
| MathVerify | areal/reward/math_verify.py | General math verification |

Function Signature

All reward functions must follow this signature:

def reward_fn(
    prompt: str,               # Input prompt string
    completions: str,          # Model completion string
    prompt_ids,                # Tokenized prompt
    completion_ids,            # Tokenized completion
    **kwargs: Any,             # Additional data from dataset (e.g., answer)
) -> float:                    # Reward value (typically 0.0 or 1.0)
    ...

Note: The reward function is called once per sample. Batching is handled by AsyncRewardWrapper in the workflow.

Key Requirements

  1. Deterministic: Same inputs should produce same outputs
  2. Return float: Output is a single float value per sample
  3. No blocking in async context: Use AsyncRewardWrapper if needed
  4. Logging: Use areal.utils.logging, not print
  5. Handle exceptions: Return 0.0 on error, don't raise

Common Mistakes

  • Returning a tensor instead of a float
  • Expecting batched inputs (reward is called per sample)
  • Non-deterministic behavior
  • Blocking operations without AsyncRewardWrapper
  • Raising exceptions instead of returning 0.0

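Guides adding unit tests to the AReaL project, covering the distinction between test types, file naming conventions, the Arrange-Act-Assert pattern, and pytest marker strategy, with the aim of improving code coverage and keeping the CI pipeline stable.
User asks how to add tests; needs to write tests for new functionality; wants to increase test coverage; wants to understand AReaL's testing patterns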
.opencode/skills/add-unit-tests/SKILL.md
npx skills add areal-project/AReaL --skill add-unit-tests -g -y
SKILL.md
Frontmatter
{
    "name": "add-unit-tests",
    "description": "Guide for adding unit tests to AReaL. Use when user wants to add tests for new functionality or increase test coverage."
}

Add Unit Tests

Add unit tests to AReaL following the project's testing conventions.

When to Use

This skill is triggered when:

  • User asks "how do I add tests?"
  • User wants to increase test coverage
  • User needs to write tests for new functionality
  • User wants to understand AReaL testing patterns

Step-by-Step Guide

Step 1: Understand Test Types

AReaL has two main test categories:

| Test Type | Purpose | Location Pattern | How It Runs |
| --- | --- | --- | --- |
| Unit Tests | Test individual functions/modules | tests/test_<module>_<feature>.py | Directly via pytest |
| Distributed Tests | Test distributed/parallel behavior | tests/torchrun/run_*.py | Via torchrun (called by pytest subprocess) |

Note: All tests are invoked via pytest. Distributed tests use torchrun but are still called from pytest test files.

Step 2: Create Test File Structure

Create test file with naming convention: test_<module>_<feature>.py

import pytest
import torch

# Import the module to test
from areal.dataset.gsm8k import get_gsm8k_sft_dataset
from tests.utils import get_dataset_path  # Optional test utilities
# For mocking tokenizer: from unittest.mock import MagicMock

Step 3: Write Test Functions

Follow Arrange-Act-Assert pattern:

def test_function_under_condition_returns_expected():
    """Test that function returns expected value under condition."""
    # Arrange
    input_data = 5
    expected_output = 10

    # Act
    result = function_under_test(input_data)

    # Assert
    assert result == expected_output

Step 4: Add Pytest Markers and CI Strategy

Use appropriate pytest markers:

| Marker | When to Use |
| --- | --- |
| @pytest.mark.slow | Test takes > 10 seconds (excluded from CI by default) |
| @pytest.mark.ci | Slow test that must run in CI (use with @pytest.mark.slow) |
| @pytest.mark.asyncio | Async test functions |
| @pytest.mark.skipif(cond, reason=...) | Conditional skip |
| @pytest.mark.parametrize(...) | Parameterized tests |

CI Test Strategy:

  • @pytest.mark.slow: Excluded from CI by default (CI runs pytest -m "not slow")
  • @pytest.mark.slow + @pytest.mark.ci: Slow but must run in CI
  • No marker: Runs in CI (fast unit tests)

@pytest.mark.asyncio
async def test_async_function():
    result = await async_function()
    assert result == expected

@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_gpu_feature():
    tensor = torch.tensor([1, 2, 3], device="cuda")
    # ... assertions

@pytest.mark.parametrize("batch_size", [1, 4, 16])
def test_with_parameters(batch_size):
    # Parameterized test
    ...

@pytest.mark.slow
def test_slow_function():
    # Excluded from CI by default
    ...

@pytest.mark.slow
@pytest.mark.ci
def test_slow_but_required_in_ci():
    # Slow but must run in CI
    ...
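The marker rules above reduce to a small truth table. A minimal sketch, assuming CI selects tests with a marker expression equivalent to `not slow or ci`; the text only quotes `not slow`, so the exact expression is an assumption to verify against the repository's CI configuration. `runs_in_ci` is a hypothetical helper, not part of pytest or AReaL.

```python
def runs_in_ci(markers: set) -> bool:
    """Mirror of the assumed marker expression "not slow or ci"."""
    return "slow" not in markers or "ci" in markers


# One entry per rule in the CI test strategy list.
cases = {
    "no marker": set(),
    "slow only": {"slow"},
    "slow + ci": {"slow", "ci"},
}
selected = {name: runs_in_ci(markers) for name, markers in cases.items()}
```

Under the plain expression `not slow`, the "slow + ci" case would be deselected, which is why the combined expression is assumed here.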

Step 5: Mock Distributed Environment

For unit tests that need distributed mocks:

import torch.distributed as dist

def test_distributed_function(monkeypatch):
    monkeypatch.setattr(dist, "get_rank", lambda: 0)
    monkeypatch.setattr(dist, "get_world_size", lambda: 2)
    result = distributed_function()
    assert result == expected

Step 6: Handle GPU Dependencies

Always skip gracefully when GPU unavailable:

CUDA_AVAILABLE = torch.cuda.is_available()

@pytest.mark.skipif(not CUDA_AVAILABLE, reason="CUDA not available")
def test_gpu_function():
    tensor = torch.tensor([1, 2, 3], device="cuda")
    # ... assertions

Key Requirements (Based on testing.md)

Mocking Distributed

  • Use torch.distributed.fake_pg for unit tests
  • Mock dist.get_rank() and dist.get_world_size() explicitly
  • Don't mock internals of FSDP/DTensor

GPU Test Constraints

  • Always skip gracefully when GPU unavailable
  • Clean up GPU memory: torch.cuda.empty_cache() in fixtures
  • Use smallest possible model/batch for unit tests

Assertions

  • Use torch.testing.assert_close() for tensor comparison
  • Specify rtol/atol explicitly for numerical tests
  • Avoid bare assert tensor.equal() - no useful error message

Reference Implementations

| Test File | Description | Key Patterns |
| --- | --- | --- |
| tests/test_utils.py | Utility function tests | Fixtures, parametrized tests |
| tests/test_examples.py | Integration tests with dataset loading | Dataset path resolution, success pattern matching |
| tests/test_fsdp_engine_nccl.py | Distributed tests | Torchrun integration |

Common Mistakes

  • Missing test file registration: Ensure file follows test_*.py naming
  • GPU dependency without skip: Always use @pytest.mark.skipif for GPU tests
  • Incorrect tensor comparisons: Use torch.testing.assert_close() not assert tensor.equal()
  • Memory leaks in GPU tests: Clean up with torch.cuda.empty_cache()
  • Mocking too much: Don't mock FSDP/DTensor internals
  • Unclear test names: Follow test_<what>_<condition>_<expected> pattern
  • No docstrings: Add descriptive docstrings to test functions

Integration with Other Skills

This skill complements other AReaL development skills:

  • After /add-dataset: Add tests for new dataset loaders
  • After /add-workflow: Add tests for new workflows
  • After /add-reward: Add tests for new reward functions
  • With expert agents: Reference this skill when planning test implementation

Running Tests

# First check GPU availability (many tests require GPU)
python -c "import torch; print('GPU available:', torch.cuda.is_available())"

# Run specific test file
uv run pytest tests/test_<name>.py

# Skip slow tests (CI default)
uv run pytest -m "not slow"

# Run with verbose output
uv run pytest -v

# Run distributed tests (requires torchrun and multi-GPU)
# Note: Usually invoked via pytest test files
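torchrun --nproc_per_node=2 tests/torchrun/run_<test>.py

Guides the user through creating and registering a new RolloutWorkflow in AReaL. Covers the full set of steps: writing a custom workflow class, registering it in __init__.py, and updating the training script to use the new workflow.
User asks how to add a workflow; user wants to create a new RolloutWorkflow; user mentions implementing a custom rollout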
.opencode/skills/add-workflow/SKILL.md
npx skills add areal-project/AReaL --skill add-workflow -g -y
SKILL.md
Frontmatter
{
    "name": "add-workflow",
    "description": "Guide for adding a new RolloutWorkflow to AReaL. Use when user wants to create a new workflow."
}

Add Workflow

Add a new RolloutWorkflow implementation to AReaL.

When to Use

This skill is triggered when:

  • User asks "how do I add a workflow?"
  • User wants to create a new RolloutWorkflow
  • User mentions implementing a custom rollout

Prerequisites

Before starting, ensure you understand:

  • The workflow's purpose and requirements
  • Input/output data format
  • Reward function to use

Step-by-Step Guide

Step 1: Create Workflow File

Create areal/workflow/<name>.py:

import uuid
from typing import Any, Callable

import torch

from areal.api.cli_args import GenerationHyperparameters
from areal.api.engine_api import InferenceEngine
from areal.api.io_struct import ModelRequest, ModelResponse
from areal.api.reward_api import AsyncRewardWrapper
from areal.api.workflow_api import RolloutWorkflow
from areal.utils import logging

logger = logging.getLogger("MyWorkflow")


class MyWorkflow(RolloutWorkflow):
    """Description of your workflow."""

    def __init__(
        self,
        gconfig: GenerationHyperparameters,
        tokenizer,
        reward_fn: Callable,
    ):
        self.gconfig = gconfig.new_with_stop_and_pad_token_ids(tokenizer)
        self.tokenizer = tokenizer
        self.async_reward_fn = AsyncRewardWrapper(reward_fn)

    async def arun_episode(
        self,
        engine: InferenceEngine,
        data: dict[str, Any],
    ) -> dict[str, Any] | None | dict[str, "InteractionWithTokenLogpReward"]:
        """Run a single episode. MUST be async and non-blocking."""

        # 1. Prepare input_ids from data
        input_ids = self.tokenizer.apply_chat_template(
            data["messages"],
            tokenize=True,
            add_generation_prompt=True,
        )

        # 2. Build ModelRequest
        req = ModelRequest(
            rid=uuid.uuid4().hex,
            input_ids=list(input_ids),
            gconfig=self.gconfig.new(n_samples=1),
            tokenizer=self.tokenizer,
        )

        # 3. Generate completion (async)
        resp: ModelResponse = await engine.agenerate(req)

        # 4. Compute reward (async)
        prompt_str = self.tokenizer.decode(input_ids)
        completion_str = self.tokenizer.decode(resp.output_tokens)
        reward = await self.async_reward_fn(
            prompt_str,
            completion_str,
            resp.input_tokens,
            resp.output_tokens,
            **data,
        )

        # 5. Return results in expected format
        return {
            "input_ids": torch.tensor(resp.input_tokens),
            "output_ids": torch.tensor(resp.output_tokens),
            "reward": torch.tensor(reward),
        }

Step 2: Register in __init__.py

Add to areal/workflow/__init__.py:

from areal.workflow.<name> import MyWorkflow

__all__ = [
    # ... existing exports
    "MyWorkflow",
]

Step 3: Update Entry Script

Update your training script to use the new workflow:

trainer.train(
    workflow="areal.workflow.<name>.MyWorkflow",
    # ... other args
)

Step 4: Add Tests

Create tests/test_<name>_workflow.py:

import pytest
from areal.workflow.<name> import MyWorkflow

@pytest.mark.asyncio
async def test_workflow_basic():
    # Test basic functionality
    pass

Reference Implementations

| Workflow | File | Description |
| --- | --- | --- |
| MultiTurnWorkflow | areal/workflow/multi_turn.py | Multi-turn conversation |
| RLVRWorkflow | areal/workflow/rlvr.py | RL with verifiable rewards |
| VisionRLVRWorkflow | areal/workflow/vision_rlvr.py | Vision + RLVR |

Key Requirements

  1. Async: arun_episode must be async def and non-blocking
  2. No sync I/O: Use aiofiles for file operations
  3. Wrap rewards: Use AsyncRewardWrapper for reward functions
  4. Tensor format: Output tensors should be [batch, seq_len, ...]
  5. Use helpers: concat_padded_tensors for combining outputs

Common Mistakes

  • Using open() instead of aiofiles.open()
  • Forgetting to await async calls
  • Not wrapping reward function with AsyncRewardWrapper
  • Wrong tensor shape conventions
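The "forgetting to await" mistake is easy to miss because calling an async function without `await` raises nothing; it just returns a coroutine object that has not run. A minimal standard-library sketch, where `fake_agenerate` is a hypothetical stand-in for an engine call:

```python
import asyncio
import inspect


async def fake_agenerate(prompt: str) -> str:
    """Stand-in for an async engine call."""
    await asyncio.sleep(0)
    return prompt.upper()


async def episode() -> tuple:
    pending = fake_agenerate("hi")  # forgot `await`: nothing has run yet
    was_coroutine = inspect.iscoroutine(pending)
    text = await pending  # awaiting it produces the actual result
    return was_coroutine, text


result = asyncio.run(episode())
```

If the un-awaited object were passed on to `tokenizer.decode` or `torch.tensor`, the failure would surface there, far from the missing `await`.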

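Provides the Conventional Commits conventions for the AReaL repository, covering type selection, format requirements, and automatic scope inference from file paths, so that Git commit messages are standardized.
When running git commit; when creating a pull request; when a commit is delegated through the task agent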
.opencode/skills/commit-conventions/SKILL.md
npx skills add areal-project/AReaL --skill commit-conventions -g -y
SKILL.md
Frontmatter
{
    "name": "commit-conventions",
    "description": "AReaL commit message conventions. MUST load on every git commit -- provides Conventional Commits format with scope inference from file paths."
}

Commit Conventions

Commit message conventions and scope inference rules for the AReaL repository.

When to Use

ALWAYS load this skill when making any git commit in AReaL. This includes:

  • Direct commits (git commit)
  • Commits during PR creation (/create-pr)
  • Commits delegated via task(load_skills=["commit-conventions"], ...)
  • Any agent workflow that produces a commit

Commit Message Format

<type>(<scope>): <subject>

<body>

[Optional sections:]
Key changes:
- change 1
- change 2

Refs: #123, #456

Type Selection

| Type | When to Use |
| --- | --- |
| feat | New feature or capability |
| fix | Bug fix |
| docs | Documentation only |
| gov | Governance or maintainer changes |
| style | Formatting/style-only changes |
| refactor | Code change without feature/fix |
| perf | Performance improvement |
| test | Adding or fixing tests |
| build | Build system or dependencies |
| ci | CI pipeline or workflow changes |
| chore | Build, deps, config changes |
| revert | Revert a previous commit |

Scope Inference

Infer scope from the primary changed file paths:

| File Path Pattern | Scope |
| --- | --- |
| areal/workflow/ | workflow |
| areal/engine/ | engine |
| areal/reward/ | reward |
| areal/dataset/ | dataset |
| areal/api/ | api |
| areal/utils/ | utils |
| areal/infra/ | infra |
| areal/trainer/ | trainer |
| areal/models/ | models |
| areal/experimental/ | archon |
| docs/ | docs |
| examples/ | examples |
| AGENTS.md, .agents/, .claude/, .codex/, .opencode/ | agents |
| Multiple areas | Omit scope or use broader term |
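The scope table can be applied mechanically with prefix matching. A minimal sketch, assuming every changed path counts equally (the text says "primary" paths, which needs human judgment) and that mixed or unmatched paths mean the scope is omitted; `infer_scope` is a hypothetical helper, not an existing AReaL tool.

```python
from typing import Optional

# (path prefix, scope) pairs copied from the table above.
SCOPE_RULES = [
    ("areal/workflow/", "workflow"),
    ("areal/engine/", "engine"),
    ("areal/reward/", "reward"),
    ("areal/dataset/", "dataset"),
    ("areal/api/", "api"),
    ("areal/utils/", "utils"),
    ("areal/infra/", "infra"),
    ("areal/trainer/", "trainer"),
    ("areal/models/", "models"),
    ("areal/experimental/", "archon"),
    ("docs/", "docs"),
    ("examples/", "examples"),
    ("AGENTS.md", "agents"),
    (".agents/", "agents"),
    (".claude/", "agents"),
    (".codex/", "agents"),
    (".opencode/", "agents"),
]


def infer_scope(paths: list) -> Optional[str]:
    """Return the single scope shared by all paths, or None to omit the scope."""
    scopes = set()
    for path in paths:
        matched = next(
            (scope for prefix, scope in SCOPE_RULES if path.startswith(prefix)),
            None,  # path matches no rule
        )
        scopes.add(matched)
    if len(scopes) == 1:
        return scopes.pop()
    return None  # multiple areas (or no paths): omit the scope
```

For example, a change touching only `areal/reward/gsm8k.py` yields `reward`, while a change spanning `areal/reward/` and `areal/dataset/` yields `None`, matching the "Multiple areas" row.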

Rules

  • Subject: imperative mood, ~50-72 chars, no trailing period
  • Body: explain "why" not "what", wrap at 72 chars
  • Key changes: bullet list of main modifications (for complex commits with 3+ files)
  • Refs: reference issues/PRs if applicable
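The header-line rules can be checked automatically. A minimal sketch, assuming the 72-character limit applies to the subject text and that the type list is exactly the one in the Type Selection table; `check_header` is a hypothetical helper, and mood ("imperative") is not something this check attempts.

```python
import re

TYPES = (
    "feat", "fix", "docs", "gov", "style", "refactor",
    "perf", "test", "build", "ci", "chore", "revert",
)
# <type>(<scope>): <subject>, with the scope part optional
HEADER_RE = re.compile(
    r"^(?P<type>[a-z]+)(?:\((?P<scope>[^()]+)\))?: (?P<subject>.+)$"
)


def check_header(header: str, max_subject_len: int = 72) -> list:
    """Return a list of rule violations; an empty list means the header passes."""
    match = HEADER_RE.match(header)
    if match is None:
        return ["header does not match <type>(<scope>): <subject>"]
    problems = []
    if match.group("type") not in TYPES:
        problems.append(f"unknown type {match.group('type')!r}")
    subject = match.group("subject")
    if subject.endswith("."):
        problems.append("subject ends with a period")
    if len(subject) > max_subject_len:
        problems.append(f"subject longer than {max_subject_len} chars")
    return problems
```

A check like this fits naturally in a commit-msg hook, though whether AReaL uses one is not stated here.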

Examples

Single file fix:

fix(reward): handle empty completion in gsm8k

Return 0 reward instead of raising exception when
completion string is empty after extraction.

Multi-file feature:

feat(engine): add CPU offload support to ArchonEngine

Enable torch_memory_saver for model offloading during
rollout phase to reduce GPU memory pressure.

Key changes:
- Add offload/onload methods to ArchonEngine
- Integrate with weight update flow
- Handle ROCm compatibility

Docs only:

docs: update algorithm comparison table

Add SAPO and GSPO to the algorithm family documentation
with configuration examples.

Agent/tooling changes:

chore(agents): port review-pr command to OpenCode

Add OpenCode-native commands with task() category
delegation instead of hardcoded model names.

Key changes:
- Create .opencode/command/ with review-pr, create-pr
- Replace hardcoded model routing with platform-native review routing
- Add expert subagent consultation patterns

Governance/maintainer changes:

gov(agents): add maintainer ownership for service modules

Update CODEOWNERS and maintainer references to reflect
current governance responsibilities.

Key changes:
- Add maintainers for agent_service and infra ownership
- Align governance docs with updated reviewer responsibilities

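Debugging guide for AReaL distributed training (FSDP2, TP, and so on), covering deadlocks, wrong results, OOM, and communication failures. Provides a minimal-reproduction strategy, environment configuration, py-spy stack analysis, and concrete troubleshooting steps.
Training hangs or deadlocks; results are inconsistent across GPUs or numerically wrong; distributed OOM errors; NCCL communication or device mesh anomalies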
.opencode/skills/debug-distributed/SKILL.md
npx skills add areal-project/AReaL --skill debug-distributed -g -y
SKILL.md
Frontmatter
{
    "name": "debug-distributed",
    "description": "Guide for debugging distributed training issues in AReaL. Use when user encounters hangs, wrong results, OOM, or communication errors."
}

Debug Distributed Training

Debugging guide for distributed training issues in AReaL (FSDP2, TP, CP, EP).

When to Use

This skill is triggered when:

  • Training hangs or deadlocks
  • Results differ across ranks or are numerically wrong
  • OOM errors in distributed settings
  • NCCL/communication errors or device mesh issues

Debugging Principles

Minimal Reproduction

Always follow the minimal demo principle: reproduce the failure with as little code as possible, so the failing operation is isolated quickly.

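# Bad: debug inside the full training loop
# Good: create a minimal script (launch with torchrun so LOCAL_RANK is set)
import os

import torch
import torch.distributed as dist

# Bind each process to its own GPU; otherwise every rank on a node defaults to cuda:0
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
dist.init_process_group("nccl")
rank = dist.get_rank()

# Reproduce the exact operation that fails
tensor = torch.ones(10, device="cuda")
dist.all_reduce(tensor)  # <-- Isolate the failing op
print(f"Rank {rank}: {tensor}")

dist.destroy_process_group()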

Reduction strategy:

  1. Remove unrelated model components
  2. Use small tensor sizes
  3. Reduce world_size to minimum (e.g., 2 GPUs)
  4. Remove torch.compile if possible
  5. Disable activation checkpointing

Step-by-Step Debugging Guide

1. Hang Debugging (Deadlocks, Synchronization)

Environment Variables for Debugging:

# Full debug logging
export TORCH_DISTRIBUTED_DEBUG=DETAIL
export NCCL_DEBUG=INFO
export NCCL_DEBUG_SUBSYS=ALL

# torch.compile debugging
export TORCH_LOGS="+dynamo,recompiles"
export TORCHDYNAMO_VERBOSE=1

Dump Call Stack with py-spy (for hung processes):

# Find process IDs
ps aux | grep python

# Dump call stack of specific rank
py-spy dump --pid <PID>

# Record flame graph for performance analysis
py-spy record -o profile.svg --pid <PID> --duration 30

Common Causes:

  1. Mismatched Collectives: One rank calls all_reduce, another doesn't.
  2. Wrong Process Group: Using wrong group for collective.
  3. Tensor Shape Mismatch: Different shapes across ranks.

Debug Steps:

# Verify group membership
mesh = parallel_dims.get_mesh("dp_shard_cp")
group = mesh.get_group()
print(f"Rank {dist.get_rank()}: group size = {dist.get_world_size(group)}")

# Print shapes on all ranks
print(f"Rank {dist.get_rank()}: tensor.shape = {tensor.shape}")
dist.barrier()
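
Printing shapes works for a handful of ranks, but a mismatch is easier to spot when the shapes are gathered and only the outliers are reported. The following is a minimal sketch, not AReaL code: `ranks_with_mismatched_shape` and `report_shape_mismatch` are hypothetical helper names, and the shape held by group rank 0 is assumed to be the reference. `dist.all_gather_object` is itself a collective, so every rank in the group has to call the helper.

```python
from typing import Sequence


def ranks_with_mismatched_shape(shapes: Sequence[tuple]) -> list[int]:
    """Return the indices whose shape differs from index 0 (the reference)."""
    reference = tuple(shapes[0])
    return [i for i, shape in enumerate(shapes) if tuple(shape) != reference]


def report_shape_mismatch(tensor, name: str, group=None) -> list[int]:
    """Gather tensor.shape from every rank in `group` and report outliers.

    Hypothetical helper: it must be called by all ranks in the group.
    """
    # Imported here so the pure helper above can be used without torch.
    import torch.distributed as dist

    shapes = [None] * dist.get_world_size(group)
    dist.all_gather_object(shapes, tuple(tensor.shape), group=group)
    bad = ranks_with_mismatched_shape(shapes)
    if bad and dist.get_rank(group) == 0:
        print(f"{name}: shapes differ from group rank 0 on ranks {bad}: {shapes}")
    return bad
```

The returned indices are ranks within the group, not global ranks. Calling this right before the collective that hangs narrows the search to the ranks that built a different tensor.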

Timeout Adjustment (for debugging only):

from areal.engine.core.distributed import patch_dist_group_timeout
from datetime import timedelta
patch_dist_group_timeout(timedelta(minutes=30))

2. Wrong Results (Gradient, Reduction Issues)

Check DTensor Placements:

from torch.distributed.tensor import DTensor

# `model` is the parallelized module under inspection
for name, param in model.named_parameters():
    if isinstance(param, DTensor):
        print(f"Param {name}: placements={param.placements}, mesh={param.device_mesh}")

Verify Gradient Reduction:

for name, param in model.named_parameters():
    if param.grad is not None:
        print(f"Rank {dist.get_rank()}: {name} grad_sum = {param.grad.sum().item()}")

3. OOM Issues (Memory, Sharding)

Check Memory Usage:

print(f"Rank {dist.get_rank()}: "
      f"allocated={torch.cuda.memory_allocated()/1e9:.2f}GB, "
      f"reserved={torch.cuda.memory_reserved()/1e9:.2f}GB")

Check FSDP Coverage:

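from torch.distributed.tensor import DTensor

# is_dtensor=False means the parameter was not sharded and is held in full on this rank
for name, param in model.named_parameters():
    is_dtensor = isinstance(param, DTensor)
    print(f"{name}: is_dtensor={is_dtensor}, shape={param.shape}")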
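
As a rough sanity check on the numbers above, the per-rank share of evenly sharded state can be estimated with simple arithmetic. This is a sketch under stated assumptions: state is split evenly across the shard group, and activations, padding, buffers, and temporarily all-gathered parameters are ignored. `sharded_state_gb` is a hypothetical helper, and the caller picks `bytes_per_param` (for example 2 for bf16 weights).

```python
def sharded_state_gb(num_params: int, bytes_per_param: float, shard_degree: int) -> float:
    """Approximate per-rank memory in GB (1e9 bytes) for evenly sharded state."""
    if shard_degree < 1:
        raise ValueError("shard_degree must be >= 1")
    return num_params * bytes_per_param / shard_degree / 1e9


# Example: 8e9 parameters stored in bf16 (2 bytes each), sharded over 8 ranks.
print(f"{sharded_state_gb(8_000_000_000, 2, 8):.2f} GB per rank")  # prints "2.00 GB per rank"
```

If `torch.cuda.memory_allocated()` is far above this estimate right after model setup, some parameters are probably not sharded, which the coverage loop above would show as `is_dtensor=False`.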

4. Communication Errors

| Error | Cause | Solution |
| --- | --- | --- |
| NCCL WARN Cuda failure | GPU communication | Check NCCL version, GPU topology |
| RuntimeError: Timed out | Rank synchronization | Increase timeout, check code paths |
| Invalid device mesh | Mesh configuration | Verify world_size = dp * tp * cp |
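
The mesh check in the last row can be run before any process group is created. This is a minimal sketch: `check_mesh_dims` is a hypothetical helper, and which parallel degrees enter the product depends on the actual configuration (the table names dp, tp, and cp; other dimensions may apply in a given setup).

```python
import math


def check_mesh_dims(world_size: int, **dims: int) -> None:
    """Raise ValueError if the product of parallel degrees != world_size."""
    for name, degree in dims.items():
        if degree < 1:
            raise ValueError(f"{name} must be >= 1, got {degree}")
    product = math.prod(dims.values())
    if product != world_size:
        factors = " * ".join(f"{k}={v}" for k, v in dims.items())
        raise ValueError(f"world_size={world_size} but {factors} = {product}")


check_mesh_dims(8, dp=2, tp=2, cp=2)  # passes silently
```

Failing fast here gives a readable message, rather than an error raised later during mesh construction.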

Debugging Tools

Environment Variables Reference

| Variable | Purpose |
| --- | --- |
| TORCH_DISTRIBUTED_DEBUG=DETAIL | Detailed distributed logging |
| NCCL_DEBUG=INFO | NCCL communication logging |
| NCCL_DEBUG_SUBSYS=ALL | All NCCL subsystems |
| TORCH_LOGS="+dynamo,recompiles" | torch.compile logging |
| TORCHDYNAMO_VERBOSE=1 | Dynamo verbose output |
| CUDA_LAUNCH_BLOCKING=1 | Synchronous CUDA (slow, for debugging) |

py-spy for Call Stack Analysis

# Install
pip install py-spy

# Dump call stack of hung process
py-spy dump --pid <PID>

# Dump all Python processes
pgrep -f python | xargs -I {} py-spy dump --pid {}

# Record flame graph
py-spy record -o profile.svg --pid <PID> --duration 30
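
When several ranks hang, it helps to save one dump per process and compare them side by side. The sketch below wraps the `py-spy dump --pid` command shown above. `build_dump_cmd`, `dump_stacks`, and the `stack_dumps` output directory are hypothetical names chosen for illustration, and py-spy is assumed to be installed and on `PATH`.

```python
import subprocess
from pathlib import Path


def build_dump_cmd(pid: int) -> list[str]:
    """Command line for dumping one process's Python stack with py-spy."""
    return ["py-spy", "dump", "--pid", str(pid)]


def dump_stacks(pids: list[int], out_dir: str = "stack_dumps") -> None:
    """Write one py-spy dump per PID to <out_dir>/pid_<PID>.txt."""
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    for pid in pids:
        result = subprocess.run(
            build_dump_cmd(pid), capture_output=True, text=True, check=False
        )
        # Keep stderr too: py-spy reports permission errors there.
        (out / f"pid_{pid}.txt").write_text(result.stdout + result.stderr)
```

Call `dump_stacks` with the PIDs found via `ps` or `pgrep`. Depending on the system's ptrace settings, py-spy may need elevated privileges to attach. Ranks whose dumps stop in different collectives are the usual starting point for a mismatched-collective hang.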

Rank-Conditional Printing

def print_all_ranks(msg):
    for r in range(dist.get_world_size()):
        if dist.get_rank() == r:
            print(f"[Rank {r}] {msg}")
        dist.barrier()

Check Device Mesh

def debug_mesh(parallel_dims):
    mesh = parallel_dims.world_mesh
    for dim_name in mesh.mesh_dim_names:
        submesh = parallel_dims.get_mesh(dim_name)
        if submesh is not None:
            print(f"Rank {dist.get_rank()}: {dim_name} size={submesh.size()}")

Validate Tensor Consistency

def check_tensor_consistency(tensor, name, group=None):
    local_sum = tensor.sum().item()
    tensor_sums = [None] * dist.get_world_size(group)
    dist.all_gather_object(tensor_sums, local_sum, group=group)
    if dist.get_rank() == 0 and len(set(tensor_sums)) > 1:
        print(f"WARNING: {name} inconsistent: {tensor_sums}")
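
The check above compares float sums for exact equality, which may flag ranks that differ only by rounding. A tolerance-based comparison is one alternative. This is a minimal sketch: `ranks_outside_tolerance` is a hypothetical helper, the default tolerances are illustrative rather than recommended values, and index 0 is assumed to be the reference.

```python
import math
from typing import Sequence


def ranks_outside_tolerance(
    values: Sequence[float], rel_tol: float = 1e-5, abs_tol: float = 1e-8
) -> list[int]:
    """Return the indices whose value is not close to values[0]."""
    reference = values[0]
    return [
        i
        for i, v in enumerate(values)
        if not math.isclose(v, reference, rel_tol=rel_tol, abs_tol=abs_tol)
    ]
```

It can be applied to the `tensor_sums` list gathered in `check_tensor_consistency`. NaN values are always reported, because `math.isclose` never treats NaN as close to anything.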

Key Files Reference

| Component | File |
| --- | --- |
| Parallel Dims | areal/experimental/models/archon/parallel_dims.py |
| Expert Parallel | areal/experimental/models/archon/expert_parallel.py |
| Ulysses (CP) | areal/experimental/models/archon/ulysses.py |
| FSDP/TP Apply | areal/experimental/models/archon/qwen2/infra/parallelize.py |
