NeoTask 版本演进

在微服务和分布式系统日益普及的今天,任务调度框架成为基础设施的重要组成部分。本文记录了从零开始设计和实现一个通用分布式任务池(NeoTask)的完整历程,包括架构设计、技术选型、演进路径以及关键问题的解决方案。希望能为正在构建类似系统的开发者提供参考。

📚 返回首页 | 📖 快速上手 | 📘 API 参考 | ⚙️ 配置说明 | 🐙 GitHub | 🐱 NeoTask 官网


项目定位

NeoTask 是一个嵌入式通用任务池组件,以库的形式被其他应用引用,通过函数调用方式使用,而非独立的 HTTP 服务。这意味着开发者可以在现有项目中直接引入,无需部署额外的服务进程。

核心目标

目标 说明
嵌入友好 用户通过 submit() / wait_for_result() 使用,无需理解分布式细节
可扩展 从单机到分布式集群平滑演进,配置切换即可升级
高可用 节点故障时任务自动恢复,保证任务不丢失
可观测 提供内置监控能力,可选的 Web UI 面板
任务编排 支持复杂的工作流 DAG 依赖管理

核心设计原则

  • 渐进式增强

    每个版本都是独立可用的完整产品,用户可以根据需求选择合适的版本:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    from neotask import TaskPool, TaskPoolConfig

    # 单机模式
    pool = TaskPool(
    executor=my_executor,
    config=TaskPoolConfig(storage_type="memory")
    )

    # SQLite 持久化模式
    pool = TaskPool(
    executor=my_executor,
    config=TaskPoolConfig(storage_type="sqlite", sqlite_path="tasks.db")
    )

    # 分布式模式
    pool = TaskPool(
    executor=my_executor,
    config=TaskPoolConfig(storage_type="redis", redis_url="redis://localhost:6379")
    )
  • 向后兼容

    • 新版本不破坏旧版 API
    • 存储格式支持平滑升级
    • 配置参数有合理默认值
  • 可观测性优先

    • 关键操作都有事件埋点

    • 指标默认收集

    • 支持 Prometheus 集成

用户使用方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from neotask import TaskPool

# 1. 定义业务执行函数
async def my_executor(task_data: dict) -> dict:
# 业务逻辑
return {"result": "processed"}

# 2. 创建任务池
pool = TaskPool(executor=my_executor)

# 3. 使用
task_id = pool.submit({"data": "value"})
result = pool.wait_for_result(task_id) # 同步等待

# 或异步等待
result = await pool.wait_for_result_async(task_id)

架构演进总览

核心架构图

graph TB
    subgraph UserLayer["用户层"]
        APP[用户应用代码]
    end

    subgraph APILayer["API 层"]
        TP[TaskPool
即时任务入口] TS[TaskScheduler
定时任务入口] WF[WorkflowEngine
工作流编排入口] end subgraph CoreLayer["核心层"] LM[LifecycleManager
任务生命周期] QS[QueueScheduler
队列调度器] WP[WorkerPool
Worker池] FM[FutureManager
异步等待] EB[EventBus
事件总线] end subgraph HA["高可用层 v1.0"] WD[WatchDog
看门狗] TD[TimeoutDetector
超时检测] REC[Reclaimer
任务回收] HB[Heartbeat
节点心跳] end subgraph OrchestrationLayer["编排层 v1.5"] DAG[DAG引擎] COND[条件分支] PARA[并行执行] end subgraph StorageLayer["存储层"] MEM[Memory] SQL[(SQLite)] REDIS[(Redis)] end subgraph MonitorLayer["监控层"] MC[MetricsCollector] HM[HealthChecker] REP[Reporter] end subgraph EnterpriseLayer["企业级层 v2.0"] WEB[Web UI] TENANT[多租户] PROM[Prometheus] end APP --> TP & TS & WF TP --> LM & QS & WP & FM TS --> TP WF --> TP & EB WP --> WD & TD & HB LM --> REC LM --> EB WP --> EB QS --> REDIS LM --> MEM & SQL & REDIS EB --> MC MC --> REP WP --> HM MC --> PROM WF --> WEB APP --> TENANT

演进路线图

timeline
    title NeoTask 架构演进路线图
    
    section v0.1
        基础任务池 : 本地内存队列
                   : 异步执行引擎
                   : 内存/SQLite存储
                   
    section v0.2
        可观测性 : 事件总线
                 : 指标收集器
                 : 健康检查
                 
    section v0.3
        定时调度 : 延时队列/时间轮
                 : 周期任务调度
                 : Cron表达式解析
                 
    section v0.4
        分布式基础 : Redis共享队列
                   : 分布式锁
                   : 多节点协调
                   
    section v0.5
        性能优化 : 预取机制
                 : 批量操作
                 : 连接池优化
                 : 任务进度实时更新

    section v1.0
        高可用保障 : 看门狗续期
                   : 超时检测
                   : 故障自动恢复

    section v1.5
        任务编排 : DAG工作流
                 : 条件分支
                 : 并行执行

    section v2.0
        企业级特性 : 独立Web UI
                   : 多租户隔离
                   : Prometheus集成

版本功能对比

功能 v0.1 v0.2 v0.3 v0.4 v0.5 v1.0 v1.5 v2.0
即时任务
优先级队列
内存/SQLite存储
可观测性 -
事件总线 -
指标收集 -
定时调度 - -
延时执行 - -
周期任务/Cron - -
分布式 - - -
Redis存储 - - -
分布式锁 - - -
性能优化 - - - -
预取机制 - - - -
高可用 - - - - -
看门狗 - - - - -
故障恢复 - - - - -
任务编排 - - - - - -
DAG工作流 - - - - - -
条件分支 - - - - - -
企业级 - - - - - - -
Web UI - - - - - - -
多租户 - - - - - - -
Prometheus - - - - - - -

技术选型

组件 v0.1 v0.2 v0.3 v0.4 v0.5 v1.0 v1.5 v2.0
异步框架 asyncio asyncio asyncio asyncio asyncio asyncio asyncio asyncio
存储 内存/SQLite 内存/SQLite 内存/SQLite Redis Redis Redis Redis Redis
分布式锁 - - - SET NX SET NX 看门狗 看门狗 看门狗
队列 deque deque deque Sorted Set Sorted Set Sorted Set Sorted Set Sorted Set
事件 - EventBus EventBus EventBus EventBus EventBus EventBus +WebSocket
延时任务 - - 时间轮 时间轮 时间轮 时间轮 时间轮 时间轮
周期任务 - - Cron Cron Cron Cron Cron Cron
Web框架 - - - - - - - FastAPI
监控 - 基础指标 指标收集 指标收集 +批处理 +超时检测 +工作流指标 +Prometheus
编排引擎 - - - - - - DAG DAG

v0.1:基础任务池

实现最核心的任务调度能力,验证架构可行性。

graph TB
    subgraph User["用户应用层"]
        APP[用户代码]
    end
    
    subgraph TaskPool["NeoTask 核心 v0.1"]
        TP[TaskPool
入口门面] subgraph Core["核心组件"] LM[LifecycleManager
任务生命周期管理] QS[QueueScheduler
队列调度器] WP[WorkerPool
Worker池/并发控制] FM[FutureManager
异步等待管理] end EX[TaskExecutor
用户业务逻辑] end subgraph Storage["存储层"] MEM[MemoryStorage] SQLITE[(SQLiteStorage)] end APP -->|submit/wait| TP TP --> LM TP --> QS TP --> WP TP --> FM LM --> MEM LM --> SQLITE WP -->|execute| EX

核心设计

任务状态机

stateDiagram-v2
    [*] --> PENDING: 创建任务
    
    PENDING --> RUNNING: Worker 开始执行
    PENDING --> CANCELLED: 用户取消
    
    RUNNING --> SUCCESS: 执行成功
    RUNNING --> FAILED: 执行失败
    RUNNING --> CANCELLED: 用户取消
    
    FAILED --> PENDING: 自动重试
    FAILED --> [*]: 超过重试次数
    
    SUCCESS --> [*]: 完成
    CANCELLED --> [*]: 已取消

数据流时序

sequenceDiagram
    participant User
    participant Pool as TaskPool
    participant LM as LifecycleManager
    participant QS as QueueScheduler
    participant WP as WorkerPool
    participant FM as FutureManager
    participant Storage as Storage

    User->>Pool: submit(data, priority)
    Pool->>LM: create_task()
    LM->>Storage: 保存任务(PENDING)
    LM-->>Pool: 返回 Task 对象
    Pool->>QS: push(task_id, priority)
    Pool->>FM: get(task_id)
    Pool-->>User: 返回 task_id
    
    loop Worker 调度循环
        WP->>QS: pop() 获取任务
        WP->>LM: start_task(task_id)
        WP->>Storage: 更新状态(RUNNING)
        
        alt 任务执行成功
            WP->>WP: executor.execute(data)
            WP->>LM: complete_task(result)
            WP->>FM: complete(task_id, result)
            WP->>Storage: 更新状态(SUCCESS)
        else 任务执行失败
            WP->>LM: fail_task(error)
            WP->>FM: complete(task_id, error)
            WP->>Storage: 更新状态(FAILED)
            WP->>QS: push(重试入队)
        else 任务取消
            WP->>LM: cancel_task()
            WP->>FM: complete(task_id, error)
        end
    end
    
    User->>Pool: wait_for_result(task_id)
    Pool->>FM: wait()
    FM-->>Pool: 返回结果
    Pool-->>User: 任务结果

关键设计决策

挑战 解决方案 设计理由
任务持久化 支持内存/SQLite 可选存储 满足不同场景需求,开发测试用内存,生产用 SQLite
并发控制 asyncio.Semaphore 控制并发数 充分利用 Python 异步能力,避免线程开销
优先级队列 heapq 堆实现优先级队列 内存高效,O(log n) 入队出队
异步等待 Future + Event 实现等待机制 符合 asyncio 编程模型,易于理解

v0.2:可观测性

增加可观测性,支持任务生命周期监控和指标收集。

graph TB
    subgraph Core["核心组件"]
        TP[TaskPool]
        LM[LifecycleManager]
        QS[QueueScheduler]
        WP[WorkerPool]
    end
    
    subgraph Monitor["监控模块 v0.2"]
        EB[EventBus
事件总线] MC[MetricsCollector
指标收集器] end subgraph Storage["存储层"] MEM[Memory] SQL[SQLite] end WP -.->|emit event| EB EB -.->|subscribe| MC MC --> STATS[统计数据] LM --> EB User[用户] -->|get_stats| TP TP --> MC

核心设计

事件驱动架构

事件总线是 v0.2 的核心组件,采用发布-订阅模式解耦各模块:

graph LR
    subgraph Producers["事件生产者"]
        LM[LifecycleManager]
        WP[WorkerPool]
        QS[QueueScheduler]
    end
    
    subgraph Bus["EventBus"]
        Q[事件队列]
        DISPATCH[分发器]
    end
    
    subgraph Consumers["事件消费者"]
        MC[MetricsCollector]
        LOG[日志记录]
        ALERT[告警系统]
    end
    
    Producers -->|emit| Bus
    Bus -->|notify| Consumers
sequenceDiagram
    participant TP as TaskProcessor
    participant EB as EventBus
    participant MC as MetricsCollector
    participant User

    TP->>EB: emit(TASK_STARTED, task_id)
    EB->>MC: on_task_started()
    MC->>MC: 记录开始时间
    
    TP->>TP: 执行任务...
    
    TP->>EB: emit(TASK_COMPLETED, duration)
    EB->>MC: on_task_completed()
    MC->>MC: 计算耗时、更新吞吐量
    
    User->>TP: get_stats()
    TP->>MC: get_current_stats()
    MC-->>User: 统计数据

事件类型定义

事件类型 触发时机 携带数据
task.created 任务创建 task_id, data
task.started 任务开始执行 task_id, node_id
task.completed 任务完成 task_id, result, duration
task.failed 任务失败 task_id, error
task.cancelled 任务取消 task_id
task.retry 任务重试 task_id, retry_count

指标收集体系

指标类型 指标名称 说明
计数器 任务提交总数 累计提交的任务数量
计数器 任务完成总数 累计完成的任务数量
计数器 任务失败总数 累计失败的任务数量
瞬时值 当前待处理数 队列中等待执行的任务数
瞬时值 当前执行中数 正在执行的任务数
分布统计 执行时间分布 P50/P90/P95/P99 执行时间
分布统计 排队时间分布 任务在队列中的等待时间
比率 成功率 成功任务数 / 总执行数
比率 吞吐量 每秒完成任务数

挑战与解决方案

挑战 解决方案 设计理由
事件丢失 同步事件总线,失败重试 保证关键事件不丢失
指标膨胀 环形缓冲区,限制历史长度 控制内存使用,保留最新数据
性能影响 异步事件处理,不阻塞主流程 保证任务执行不受监控影响

v0.3:定时调度

支持延时执行、周期任务和 Cron 表达式,满足定时任务场景需求。

类型 说明 示例
延时执行 指定延迟时间后执行 5分钟后执行
定时执行 指定具体时间点执行 2024-12-25 10:00:00 执行
周期性执行 按固定间隔重复执行 每5分钟执行一次
Cron 表达式 复杂时间规则 每天9点执行
graph TB
    subgraph User["用户层"]
        APP[用户应用]
    end
    
    subgraph TaskScheduler["TaskScheduler v0.3"]
        SUB[任务提交接口]
        
        subgraph Scheduler["调度引擎"]
            TIMER[调度循环
扫描到期任务] CRON[Cron解析器] end end subgraph TaskPool["TaskPool"] POOL[TaskPool
即时任务执行] end subgraph Storage["存储层"] DELAY_Q[延时队列
Sorted Set] PERIODIC_H[周期任务配置] end APP --> SUB SUB --> POOL SUB --> DELAY_Q SUB --> PERIODIC_H TIMER -->|扫描| DELAY_Q TIMER -->|触发| POOL CRON -->|计算| TIMER

核心设计

时间轮设计

时间轮是处理大量延时任务的高效数据结构,将时间划分为多个槽位,每个槽位存放该时间点需要执行的任务。

graph TB
    subgraph TimeWheel["时间轮(60个槽位,每槽1秒)"]
        TW[环形数组]
        
        subgraph Slot0["槽位0"]
            T1[任务A]
        end
        
        subgraph Slot1["槽位1"]
            T2[任务B]
            T3[任务C]
        end
        
        subgraph Slot2["槽位2"]
            T4[任务D]
        end
        
        subgraph SlotN["..."]
            TN[更多任务]
        end
    end
    
    POINTER[指针
每秒移动一格] POINTER --> TW POINTER -->|指向槽位0| EXEC[执行槽位0中的任务]

时间轮的优势:

  • O(1) 时间复杂度:添加和删除任务都是常数时间

  • 内存高效:无需为每个任务创建定时器

  • 批量处理:同一槽位的任务批量执行

周期任务调度流程

sequenceDiagram
    participant User
    participant API as TaskScheduler
    participant Redis
    participant Cron as Cron解析器
    participant Timer as 时间轮
    participant Executor

    User->>API: submit_cron(data, "*/5 * * * *")
    API->>Redis: 存储周期任务配置
    API->>Redis: 计算下次执行时间
    API->>Redis: 加入延时队列
    API-->>User: task_id

    loop 每个周期
        Timer->>Redis: 扫描到期任务
        Redis-->>Timer: 周期任务到期
        
        Timer->>Cron: 计算下次执行时间
        Cron-->>Timer: next_run_at
        
        Timer->>Executor: 执行任务
        Timer->>Redis: 更新下次执行时间
        Timer->>Redis: 重新加入延时队列
    end

Cron 表达式支持

表达式 含义 示例
* * * * * 每分钟执行 心跳检测
*/5 * * * * 每5分钟执行 定期检查
0 9 * * * 每天9:00执行 日报生成
0 9 * * 1 每周一9:00执行 周报生成
0 0 1 * * 每月1号0:00执行 月报生成
0 9-17 * * * 每天9-17点整点执行 营业时间任务

v0.4:分布式基础

支持多节点部署,任务可被任意节点消费,实现水平扩展。

graph TB
    subgraph Nodes["任务执行节点(对等架构)"]
        subgraph Node1["Node 1"]
            P1[TaskPool]
            Q1[QueueScheduler]
            W1[WorkerPool]
        end
        
        subgraph Node2["Node 2"]
            P2[TaskPool]
            Q2[QueueScheduler]
            W2[WorkerPool]
        end
        
        subgraph Node3["Node 3"]
            P3[TaskPool]
            Q3[QueueScheduler]
            W3[WorkerPool]
        end
    end
    
    subgraph Redis["Redis 共享存储"]
        PQ[优先级队列
Sorted Set] DQ[延迟队列
Sorted Set] TASKS[任务详情
Hash] LOCKS[分布式锁
String] INDEX[状态索引
Set] end Nodes --> Redis Redis --> Nodes

核心设计

去中心化对等架构

v0.4 采用去中心化设计,所有节点对等,无主从概念:

graph LR
    subgraph Decentralized["去中心化架构"]
        direction LR
        N1[节点A] --- N2[节点B]
        N2 --- N3[节点C]
        N3 --- N1
    end
    
    subgraph Traditional["传统主从架构"]
        direction TB
        L[Leader] --> F1[Follower1]
        L --> F2[Follower2]
        L --> F3[Follower3]
    end

去中心化架构的优势:

特性 去中心化 主从架构
单点故障 有(Leader 故障)
水平扩展 线性扩展 受 Leader 瓶颈
任务分配 抢占式 Leader 分配
复杂度 较低 较高(需选举)

分布式锁机制

分布式锁是保证任务不被重复执行的关键,使用 Redis 的 SET NX EX 命令实现原子操作:

sequenceDiagram
    participant NodeA as 节点A
    participant Redis as Redis
    participant NodeB as 节点B

    Note over NodeA,Redis: 节点A抢占任务
    NodeA->>Redis: SET lock:task-123 NX EX 30
    Redis-->>NodeA: OK(获取锁成功)
    NodeA->>Redis: ZREM queue task-123
    
    Note over NodeB,Redis: 节点B尝试抢占同一任务
    NodeB->>Redis: SET lock:task-123 NX EX 30
    Redis-->>NodeB: NULL(锁已被占用)
    
    Note over NodeA,Redis: 节点A执行完成,释放锁
    NodeA->>Redis: DEL lock:task-123

多节点协调

graph TB
    subgraph NodeCoordination["多节点协调机制"]
        REG[节点注册
/加入集群] HB[心跳上报
/健康证明] DISCOVER[节点发现
/获取活跃节点] FAIL[故障检测
/节点失联] end REG --> HB --> DISCOVER DISCOVER --> FAIL FAIL -.->|重新注册| REG

挑战与解决方案

分布式任务调度系统在从单机演进到多节点时,面临以下关键挑战:

graph TB
    subgraph Challenges["v0.4 核心挑战"]
        C1[任务重复消费]
        C2[节点协调难题]
        C3[数据一致性]
        C4[网络分区处理]
    end
    
    Challenges --> S[解决方案]
    
    subgraph Solutions["解决方案"]
        S1[分布式锁 + Lua原子操作]
        S2[去中心化对等架构]
        S3[Redis共享存储]
        S4[心跳检测 + 锁超时]
    end
挑战 解决方案 关键技术 复杂度
任务重复消费 分布式锁 + Lua 原子操作 SET NX EX, Lua 脚本
节点协调难题 去中心化对等架构 抢占式消费,无主节点
数据一致性 Redis 共享状态中心 原子操作,状态索引
网络分区处理 心跳检测 + 锁超时 TTL 机制,定期扫描

决策理由

  1. 去中心化架构:避免主节点瓶颈和单点故障,实现真正的水平扩展
  2. Redis Sorted Set:天然支持优先级排序和原子操作,性能优异
  3. 悲观锁机制:在分布式环境下更可靠,配合 TTL 避免死锁
  4. 心跳+TTL 组合:既能快速检测故障,又能容忍短暂网络抖动

挑战一:任务重复消费

在分布式环境下,多个节点同时从队列消费任务时,可能出现同一任务被多个节点同时获取的情况:

sequenceDiagram
    participant Redis as Redis队列
    participant NodeA as 节点A
    participant NodeB as 节点B

    Note over NodeA,NodeB: 问题场景:任务被重复消费
    
    NodeA->>Redis: ZPOPMIN(获取任务T1)
    NodeB->>Redis: ZPOPMIN(获取任务T1)
    Redis-->>NodeA: 返回T1
    Redis-->>NodeB: 返回T1(同一任务!)
    
    Note over NodeA,NodeB: 结果:T1被执行了两次

解决方案:分布式锁 + Lua 原子操作

flowchart LR
    subgraph Atomic["原子操作流程"]
        A[从队列取任务] --> B[尝试获取锁]
        B -->|成功| C[更新任务状态]
        B -->|失败| D[跳过该任务]
        C --> E[继续取下一个]
        D --> E
    end

核心设计要点

要点 实现方式 作用
原子性 Lua 脚本执行取任务+加锁 保证操作不可分割
锁唯一性 SET NX(不存在才设置) 只有一个节点能获取锁
锁自动释放 EX(过期时间) 防止节点崩溃导致死锁
锁归属校验 锁值存储节点ID 只有持有者才能释放锁

挑战二:节点协调难题

多节点环境下,需要解决以下协调问题:

graph LR
    subgraph Problems["节点协调问题"]
        P1[节点发现
如何知道其他节点存在?] P2[任务分配
任务该由哪个节点处理?] P3[负载均衡
如何避免节点忙闲不均?] P4[故障检测
如何发现节点已崩溃?] end

解决方案:去中心化对等架构

graph TB
    subgraph Decentralized["去中心化架构核心原则"]
        PR1[所有节点对等
无主从概念] PR2[抢占式消费
谁抢到谁处理] PR3[本地决策
无需全局协调] PR4[最终一致性
可容忍短暂不一致] end

各节点工作流程

flowchart TB
    subgraph NodeLifecycle["节点生命周期"]
        START[节点启动] --> REG[注册到Redis]
        REG --> HEARTBEAT[定期发送心跳]
        HEARTBEAT --> CONSUME[抢占消费任务]
        CONSUME --> CHECK{是否继续运行?}
        CHECK -->|是| HEARTBEAT
        CHECK -->|否| UNREG[注销节点]
    end

节点协调机制对比

机制 传统主从架构 NeoTask 去中心化
任务分配 Leader 统一分配 节点主动抢占
负载均衡 Leader 决策 自然竞争均衡
故障检测 Leader 检测 所有节点互相检测
协调开销 O(n) O(1)
单点风险 有(Leader故障)

挑战三:数据一致性

分布式环境下,任务状态需要在多个节点间保持一致:

stateDiagram-v2
    [*] --> PENDING: 创建任务
    
    PENDING --> RUNNING: 节点A认领
    RUNNING --> SUCCESS: 节点A完成
    
    note right of RUNNING
        问题场景:
        节点A认领后崩溃
        状态仍为RUNNING
        其他节点无法处理
    end note

解决方案:Redis 作为共享状态中心

graph TB
    subgraph StateStorage["任务状态存储设计"]
        TASK[任务Hash
task:id] LOCK[分布式锁
lock:id] QUEUE[优先级队列
queue:priority] INDEX[状态索引
status:state] end subgraph Benefits["设计优势"] B1[单一事实来源] B2[原子操作支持] B3[自动过期机制] B4[天然持久化] end StateStorage --> Benefits

状态一致性保证

操作 原子性保证 实现方式
任务创建 Redis HSET + SADD
任务认领 Lua 脚本(取队列+加锁+更新状态)
状态更新 分布式锁保护
故障恢复 最终一致 定期扫描 + 锁超时

挑战四:网络分区处理

分布式系统中,网络分区(节点间网络中断)是不可避免的:

graph LR
    subgraph Normal["正常情况"]
        N1[节点A] <--> R[(Redis)]
        N2[节点B] <--> R
    end
    
    subgraph Partition["网络分区"]
        P1[节点A] -.->|断开| R
        P2[节点B] <--> R
    end

解决方案:心跳检测 + 锁超时

flowchart TB
    subgraph Detection["故障检测机制"]
        HB[节点心跳
每5秒更新] LOCK[任务锁
30秒TTL] SCAN[扫描器
每10秒运行] end subgraph Judgement["判定逻辑"] J1{心跳超时>15秒?} J2{锁是否存在?} J3{锁是否过期?} end subgraph Action["处置动作"] A1[标记节点可疑] A2[等待宽限期] A3[回收任务] A4[任务重新入队] end Detection --> Judgement --> Action

分区处理策略

场景 判定条件 处理动作 恢复后
临时网络抖动 心跳超时 < 30秒 标记可疑,等待恢复 自动恢复
节点真崩溃 心跳超时 > 30秒 回收该节点所有任务 重新注册
锁持有者失联 锁存在但心跳超时 等待锁过期 其他节点接管
脑裂 双主同时存在 锁机制保证互斥 版本号仲裁

v0.5:性能优化

通过预取机制和批量操作,大幅提升系统吞吐量,减少网络开销。

预取机制

sequenceDiagram
    participant Node as 节点
    participant Redis as Redis
    participant Local as 本地队列

    loop 每 100ms
        Node->>Local: 检查本地队列大小
        
        alt 本地队列 < 阈值(饥饿状态)
            Node->>Redis: 批量预取任务(Lua原子操作)
            Note over Node,Redis: 1. 从队列取N个任务
2. 为每个任务设置锁
3. 更新任务状态 Redis-->>Node: 返回 task_ids Node->>Local: 批量放入本地队列 else 本地队列充足 Node->>Local: 直接从本地队列消费 end end

预取策略对比

策略 描述 适用场景 优缺点
SIZE_BASED 基于队列大小触发预取 负载平稳 简单有效,但对突发流量响应慢
TIME_BASED 基于时间间隔触发预取 周期性负载 可预测,但可能浪费资源
HYBRID 混合策略,结合大小和时间 通用场景 平衡响应速度和资源利用

性能提升数据

graph LR
    subgraph Before["优化前"]
        B1[单次获取1个任务] --> B2[每次操作都访问Redis]
        B3[高网络延迟] --> B4[吞吐量: ~10k QPS]
    end
    
    subgraph After["优化后"]
        A1[批量获取20个任务] --> A2[减少95% Redis访问]
        A3[本地队列缓存] --> A4[吞吐量: ~30k QPS]
    end
    
    Before -->|预取机制| After

批量操作优化

操作类型 优化前 优化后 提升倍数
任务提交 单次提交 批量提交 10x
任务获取 单次获取 批量预取 20x
状态更新 逐一更新 批量更新 5x
指标上报 实时上报 批量聚合 10x

挑战与解决方案

挑战 解决方案
任务重复消费 分布式锁 + Lua 原子操作
优先级反转 本地队列使用 PriorityQueue
锁竞争 预取批量减少操作次数
节点负载不均 本地消费能力决定处理量

v1.0:高可用保障

解决节点故障、任务超时等异常场景,实现生产级高可用。

graph TB
    subgraph Node["任务节点 v1.0"]
        EXE[任务执行器]
        WD[看门狗
锁续期] HB[心跳上报] end subgraph HA["高可用组件"] DET[超时检测器
定期扫描] REC[任务回收器
故障恢复] HM[健康监控
节点心跳] end subgraph Storage["Redis存储"] LOCKS[分布式锁] HEART[节点心跳] TASKS[任务状态] end WD -->|续期| LOCKS HB -->|写入| HEART DET -->|扫描| TASKS DET -->|检查| LOCKS DET -->|检查| HEART REC -->|回收| TASKS HM -->|检测| HEART

看门狗机制

长时间任务需要定期续期锁,防止锁过期导致任务被重复执行

sequenceDiagram
    participant Node as 节点
    participant WD as 看门狗
    participant Redis as Redis

    Node->>Redis: SET lock NX EX 30
    Node->>WD: 启动看门狗
    
    loop 每 10 秒
        WD->>Redis: 检查锁归属
        alt 锁仍被当前节点持有
            WD->>Redis: EXPIRE lock 30(续期)
        else 锁已丢失
            WD->>WD: 停止续期,任务失败
        end
    end
    
    Node->>Redis: 任务完成,DEL lock
    WD->>WD: 停止

故障恢复流程

flowchart TB
    START[开始] --> SCAN[扫描 RUNNING 状态任务]
    SCAN --> CHECK_LOCK{检查锁是否存在}
    
    CHECK_LOCK -->|锁存在| CHECK_NODE{节点心跳正常?}
    CHECK_NODE -->|正常| WAIT[继续等待]
    CHECK_NODE -->|超时| MARK_DEAD[标记节点死亡]
    
    CHECK_LOCK -->|锁不存在| MARK_ORPHAN[标记为孤儿任务]
    
    MARK_DEAD --> RECOVER
    MARK_ORPHAN --> RECOVER[任务恢复]
    
    RECOVER --> CHECK_RETRY{重试次数
是否超限?} CHECK_RETRY -->|未超限| REQUEUE[重新入队] CHECK_RETRY -->|已超限| DLQ[移入死信队列] REQUEUE --> END([结束]) DLQ --> ALERT[发送告警] ALERT --> END

恢复时间目标

故障类型 检测时间 恢复时间 总 RTO
节点崩溃 15秒 15秒 30秒
网络分区 30秒 30秒 60秒
任务超时 可配置 立即 超时阈值
锁过期 30秒 5秒 35秒

死信队列设计

graph LR
    subgraph Normal["正常流程"]
        T[任务] --> Q[队列] --> W[Worker] --> S[成功]
    end
    
    subgraph Failure["失败流程"]
        F[任务失败] --> R1[重试1]
        R1 --> R2[重试2]
        R2 --> R3[重试3]
        R3 --> DLQ[(死信队列)]
    end
    
    subgraph DLQProcess["死信处理"]
        DLQ --> ALERT[告警通知]
        DLQ --> REVIEW[人工审核]
        REVIEW -->|可恢复| REQUEUE[重新入队]
        REVIEW -->|业务错误| DISCARD[丢弃]
    end

挑战与解决方案

挑战 解决方案
锁续期与节点故障 心跳 + 锁双重检测
任务卡死检测 进度更新 + 超时阈值
节点宕机任务丢失 心跳检测 + 自动回收
重复回收 分布式锁保护回收操作

v1.5:任务编排

支持复杂的工作流 DAG 依赖管理,实现任务编排能力。

使用场景

任务编排使用场景:

  1. 顺序执行:A → B → C
  2. 并行执行:A 完成后,B 和 C 并行
  3. 条件分支:根据 A 的结果决定执行 B 还是 C
  4. 聚合等待:等待 B 和 C 都完成后执行 D
  5. 超时控制:整个 DAG 的超时时间
  6. 重试策略:子任务独立重试
graph TB
    subgraph Orchestration["任务编排层 v1.5"]
        DAG[DAG定义
任务依赖图] SCH[WorkflowScheduler
工作流调度器] ENG[WorkflowEngine
工作流执行引擎] CTX[WorkflowContext
上下文传递] end subgraph Core["核心层(已有)"] TP[TaskPool
即时任务执行] TS[TaskScheduler
定时任务调度] EB[EventBus
事件总线] end subgraph Storage["存储层扩展"] WF[(Workflow存储)] EXEC[(执行记录)] STATE[(状态存储)] end DAG --> SCH SCH --> ENG ENG --> TP ENG --> TS CTX --> ENG ENG --> WF ENG --> EXEC ENG --> STATE EB -.->|事件监听| ENG

工作流类型

graph LR
    subgraph Sequential["顺序执行"]
        A[A] --> B[B] --> C[C]
    end
    
    subgraph Parallel["并行执行"]
        A2[A] --> B2[B]
        A2 --> C2[C]
        B2 --> D2[D]
        C2 --> D2
    end
    
    subgraph Conditional["条件分支"]
        A3[A] --> COND{条件判断}
        COND -->|True| B3[B]
        COND -->|False| C3[C]
        B3 --> D3[D]
        C3 --> D3
    end
    
    subgraph Complex["复杂DAG"]
        E[E] --> F[F]
        E --> G[G]
        F --> H[H]
        G --> H
        H --> I[I]
    end

工作流状态机

stateDiagram-v2
    [*] --> PENDING: 创建工作流
    
    PENDING --> RUNNING: 开始执行
    PENDING --> CANCELLED: 用户取消
    
    RUNNING --> WAITING: 等待子任务完成
    WAITING --> RUNNING: 依赖满足
    
    RUNNING --> SUCCESS: 所有节点成功
    RUNNING --> FAILED: 任一节点失败
    RUNNING --> CANCELLED: 用户取消
    RUNNING --> TIMEOUT: 整体超时
    
    SUCCESS --> [*]
    FAILED --> [*]
    CANCELLED --> [*]
    TIMEOUT --> [*]

节点状态机

stateDiagram-v2
    [*] --> PENDING: 创建节点
    
    PENDING --> READY: 依赖满足
    PENDING --> SKIPPED: 条件不满足
    
    READY --> RUNNING: 开始执行
    READY --> CANCELLED: 工作流取消
    
    RUNNING --> SUCCESS: 执行成功
    RUNNING --> FAILED: 执行失败
    RUNNING --> CANCELLED: 工作流取消
    
    SUCCESS --> [*]
    FAILED --> [*]
    SKIPPED --> [*]
    CANCELLED --> [*]

条件分支示例

1
2
3
4
5
6
7
# 条件表达式示例
conditions = {
"status_check": "result.status == 'valid'",
"numeric_check": "result.count > 10",
"user_check": "context.user_id in ['admin', 'root']",
"range_check": "10 < result.value < 100"
}

错误处理策略

策略 行为 适用场景
FAIL_FAST 一失败立即停止工作流 关键任务,不允许部分成功
FAIL_SAFE 继续执行,记录失败节点 非关键任务,允许部分成功
RETRY 失败节点自动重试 网络波动等临时故障
FALLBACK 使用备用节点/数据 有降级方案的任务

DAG 可视化

graph LR
    subgraph DAG["DAG 工作流示例"]
        FETCH[fetch_data] --> VALIDATE[validate_data]
        
        VALIDATE --> PROCESS_A[process_a]
        VALIDATE --> PROCESS_B[process_b]
        
        PROCESS_A --> AGGREGATE[aggregate]
        PROCESS_B --> AGGREGATE
        
        AGGREGATE --> SAVE[save_result]
    end
    
    subgraph Status["执行状态"]
        direction LR
        S1[fetch_data] --> S2[validate_data]
        S2 --> S3[process_a]
        S2 --> S4[process_b]
    end

v2.0:企业级特性

提供可选的 Web 监控面板,支持多租户和 Prometheus 集成。

graph TB
    subgraph Nodes["任务节点"]
        N1[Node1
TaskPool] N2[Node2
TaskPool] N3[Node3
TaskPool] end subgraph Redis["Redis 共享存储"] DATA[(任务数据)] STATS[(统计数据)] TENANT[(租户隔离)] end subgraph UI["独立 Web UI 服务 v2.0"] API[FastAPI
REST API] WS[WebSocket
实时推送] FRONT[前端
HTMX/React] AUTH[认证授权
多租户] end subgraph Monitor["监控集成"] PROM[Prometheus
指标暴露] GRAFANA[Grafana
可视化] end Nodes --> Redis UI --> Redis UI --> PROM PROM --> GRAFANA User[用户] --> FRONT FRONT --> API & WS FRONT --> AUTH

Web UI 设计

嵌入式部署模式

UI 作为后台线程自动启动,用户无需额外命令,一个进程搞定所有。

graph TB
    subgraph Process["用户进程"]
        APP[用户应用]
        SCH[TaskScheduler]
        UI[FastAPI App
运行在独立线程] end APP -->|调用| SCH SCH -->|自动启动| UI UI -->|同进程| APP

API 端点设计

方法 路径 说明
GET /api/stats 实时统计
GET /api/tasks 任务列表(支持分页、筛选)
GET /api/tasks/{id} 任务详情
POST /api/tasks/{id}/cancel 取消任务
POST /api/tasks/{id}/retry 重试任务
GET /api/workflows 工作流列表
GET /api/workflows/{id} 工作流详情
GET /api/workflows/{id}/dag 工作流 DAG 可视化
GET /api/nodes 节点列表
GET /api/metrics 指标数据
WS /ws WebSocket 实时推送

WebSocket 实时推送

sequenceDiagram
    participant Browser as 浏览器
    participant Server as Web UI 服务
    participant EB as EventBus

    Browser->>Server: WebSocket 连接
    Server->>EB: 订阅所有事件
    
    loop 实时推送
        EB->>Server: 任务状态变更事件
        Server->>Browser: 推送更新
        Browser->>Browser: 更新界面
    end

多租户设计

租户隔离模型

graph TB
    subgraph TenantA["租户A"]
        TA1[任务队列]
        TA2[任务数据]
        TA3[工作流]
    end
    
    subgraph TenantB["租户B"]
        TB1[任务队列]
        TB2[任务数据]
        TB3[工作流]
    end
    
    subgraph Shared["共享资源"]
        WORKERS[Worker Pool]
        REDIS[Redis]
    end
    
    TenantA --> Shared
    TenantB --> Shared

租户配额管理

配额项 说明 默认值
max_tasks 最大任务数 10,000
max_queue_size 最大队列大小 1,000
max_concurrent 最大并发数 10
max_workflows 最大工作流数 100
task_ttl 任务保留时间 7天

Prometheus 集成

指标暴露

graph LR
    subgraph NeoTask
        MC[MetricsCollector]
        EXP[Exporter]
    end
    
    subgraph Prometheus
        SCRAPE[抓取器]
        TSDB[时序数据库]
    end
    
    subgraph Grafana
        DASH[仪表盘]
        ALERT[告警]
    end
    
    MC --> EXP
    EXP -->|HTTP:9090/metrics| SCRAPE
    SCRAPE --> TSDB
    TSDB --> DASH
    TSDB --> ALERT

核心指标

指标名 类型 说明
neotask_tasks_total Counter 任务总数(按状态、租户)
neotask_task_duration_seconds Histogram 任务执行时长分布
neotask_queue_size Gauge 队列大小
neotask_active_workers Gauge 活跃 Worker 数
neotask_success_rate Gauge 成功率
neotask_throughput Gauge 吞吐量

架构决策记录(ADR)

ADR-001: 选择嵌入式而非独立服务

  • 状态: 已采纳

  • 日期: 2026-04-01

  • 决策: 以库的形式提供,而非独立 HTTP 服务

  • 原因: 降低使用门槛,减少部署复杂度,用户无需额外运维

ADR-002: 使用 Sorted Set 实现优先级队列

  • 状态: 已采纳

  • 日期: 2026-04-01

  • 决策: Redis Sorted Set 存储任务队列

  • 原因: 天然支持优先级排序,原子操作,性能优异

ADR-003: 预取机制设计

  • 状态: 已采纳

  • 日期: 2026-04-01

  • 决策: 本地队列 + 批量预取

  • 原因: 减少 Redis 访问次数 95%,提高吞吐量 3 倍

ADR-004: 去中心化架构(无主节点)

  • 状态: 已采纳

  • 日期: 2026-05-02

  • 决策: 所有节点对等,采用抢占式消费

  • 原因: 避免单点故障,简化架构,线性扩展

ADR-005: 任务编排 DAG 设计

  • 状态: 已采纳

  • 日期: 2026-04-15

  • 决策: 基于依赖图的 DAG 执行引擎

  • 原因: 支持复杂业务场景,提升用户体验

ADR-006: 故障恢复策略

  • 状态: 已采纳

  • 日期: 2026-05-06

  • 决策: 心跳检测 + 锁超时 + 任务回收

  • 原因: 保证任务不丢失,5 分钟内自动恢复


性能基准

场景 配置 QPS P99延迟 说明
单机内存队列 1 Worker 50k 2ms 基础性能
单机内存队列 10 Worker 100k 5ms 并发提升
Redis 分布式 1 节点 10k 10ms 网络开销
Redis 分布式 3 节点 25k 15ms 水平扩展
预取机制 预取=20 30k 8ms 性能提升 3x
高可用模式 全功能 15k 20ms 稳定运行
任务编排 10节点 DAG 5k 50ms 编排开销

版本兼容性

升级路径 API 变更 存储变更 迁移成本
v0.1 → v0.2 新增事件 API 自动迁移
v0.2 → v0.3 新增定时 API 新增延时队列 自动迁移
v0.3 → v0.4 新增 Redis 结构 自动迁移
v0.4 → v0.5 无需迁移
v0.5 → v1.0 新增心跳表 自动迁移
v1.0 → v1.5 新增编排 API 新增 DAG 表 自动迁移
v1.5 → v2.0 新增 Web API 新增租户表 自动迁移

总结

架构演进历程

graph TB
    subgraph Phase1["Phase 1: 基础能力 v0.1-0.3"]
        P1[基础任务队列
可观测性
定时调度] end subgraph Phase2["Phase 2: 分布式 v0.4-0.5"] P2[Redis共享队列
分布式锁
预取机制] end subgraph Phase3["Phase 3: 高可用 v1.0"] P3[看门狗续期
超时检测
故障恢复] end subgraph Phase4["Phase 4: 任务编排 v1.5"] P4[DAG工作流
条件分支
并行执行] end subgraph Phase5["Phase 5: 企业级 v2.0"] P5[Web UI
多租户
Prometheus] end Phase1 --> Phase2 --> Phase3 --> Phase4 --> Phase5

各版本核心能力总结

版本 核心能力 关键组件 里程碑
v0.1 基础任务队列 LifecycleManager, WorkerPool 可用
v0.2 可观测性 EventBus, MetricsCollector 可监控
v0.3 定时调度 TaskScheduler, 时间轮, Cron 功能完整
v0.4 分布式基础 Redis 存储, 分布式锁 可扩展
v0.5 性能优化 预取机制, 批量操作 高性能
v1.0 高可用保障 看门狗, 故障恢复 生产级
v1.5 任务编排 DAG 引擎, 条件分支 复杂场景
v2.0 企业级 Web UI, 多租户 商业化

关键设计决策回顾

  1. 嵌入式设计:以库的形式提供,降低使用门槛

  2. 渐进式增强:每个版本都是完整产品,用户可按需选择

  3. 存储抽象:支持多种存储后端,平滑演进

  4. 事件驱动:EventBus 解耦各模块,支持监控和扩展

  5. 异步优先:基于 asyncio,充分利用 Python 异步能力

  6. 去中心化:无主节点,抢占式消费,简单可靠

  7. DAG 编排:支持复杂工作流,满足企业级需求


相关链接

返回首页

入门与参考

架构与设计

分布式核心

存储与监控

资源