回调函数

回调函数(Callback Function) 是一种编程模式,指的是将一个函数作为参数传递给另一个函数,并在特定时机(如事件触发、异步操作完成、条件满足等)被调用执行。

  • 实现异步操作(如网络请求、文件读写完成后的处理)

  • 实现事件驱动(如 GUI 中按钮点击、定时器触发)

  • 提高代码灵活性和复用性(不同场景传入不同回调函数)

优点

  • 灵活性高:行为可动态配置

  • 解耦:调用者和被调用者无需直接依赖

  • 适合事件/异步模型

缺点

  • 回调地狱(Callback Hell):多层嵌套回调导致代码难以阅读和维护

  • 错误处理复杂:异常传播困难

  • 调试困难:调用栈不直观

实现方式

  • 基础回调

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    # 需要回调的函数
    def greet(name):
    print(f"Hello, {name}!")

    # 主函数,将回调函数作为参数
    def process_user(name, callback):
    print("Processing user...")
    callback(name) # 回调!

    # 调用主函数
    process_user("Alice", greet)
    # 输出:
    # Processing user...
    # Hello, Alice!
  • 带返回值的回调

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    def square(x):
    return x * x

    # 调用回调函数计算
    def apply_operation(numbers, operation):
    return [operation(num) for num in numbers]

    # 输出结果
    result = apply_operation([1, 2, 3, 4], square)
    print(result) # [1, 4, 9, 16]
  • 匿名函数(lambda)的回调

    1
    2
    3
    numbers = [1, 2, 3, 4, 5]
    result = list(map(lambda x: x ** 2, numbers))
    print(result) # [1, 4, 9, 16, 25]
  • 事件驱动回调

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    class Button:
    def __init__(self):
    self.click_callbacks = []

    def on_click(self, callback):
    self.click_callbacks.append(callback)

    def click(self):
    print("Button clicked!")
    for callback in self.click_callbacks:
    callback()

    def say_hello():
    print("Hello from callback!")

    def log_click():
    print("Button was clicked at some time.")

    btn = Button()
    btn.on_click(say_hello)
    btn.on_click(log_click)

    btn.click()
    # 输出:
    # Button clicked!
    # Hello from callback!
    # Button was clicked at some time.

异步回调

  • 使用 threading 的异步回调

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import threading
    import time

    def async_task(callback):
    def worker():
    time.sleep(2) # 模拟耗时操作
    result = "Task completed!"
    callback(result)
    # 异步执行
    thread = threading.Thread(target=worker)
    thread.start()

    # 回调函数
    def handle_result(result):
    print(f"Received result: {result}")

    print("Starting async task...")
    async_task(handle_result)
  • 回调 + 参数绑定(functools)

    回调函数需要额外参数,但调用方只允许传入无参函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    from functools import partial

    def notify_user(user_id, message):
    print(f"User {user_id}: {message}")

    # 绑定部分参数
    callback = partial(notify_user, 123, "Your task is done!")

    # 模拟触发
    callback() # 输出: User 123: Your task is done!

弱引用

弱引用(Weak Reference) 是一种特殊的引用方式,它不会增加对象的引用计数,因此不会阻止对象被垃圾回收机制回收。

Python 使用引用计数作为主要的垃圾回收机制。当一个对象的引用计数变为 0 时,它就会被自动回收。

但在某些场景下,我们希望“观察”或“缓存”一个对象,但又不希望因为我们的引用而阻止它被回收 —— 这就是弱引用的用武之地。

  • 强引用(Strong Reference):普通的变量赋值、容器存储等,会增加引用计数。

  • 弱引用(Weak Reference):不会增加引用计数,对象可被正常回收。

  • 弱引用指向的对象一旦被回收,弱引用会自动失效(变成 None 或抛出异常)。

不是所有对象都支持弱引用!只有支持弱引用协议的对象才可以

支持弱引用的对象:

  • 用户自定义类的实例(默认支持)

  • 部分内置类型(如 list, dict 在某些 Python 版本中支持,但不推荐依赖)

不支持弱引用的对象:

  • int, str, tuple, float不可变内置类型(出于性能和实现原因)

  • 部分 C 扩展对象

使用 weakref

Python 提供了标准库 weakref 来创建和管理弱引用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import weakref

class MyClass:
def __init__(self, name):
self.name = name

def __del__(self):
print(f"{self.name} 被销毁了")

obj = MyClass("TestObject")
print("引用计数:", sys.getrefcount(obj) - 1) # 减1是因为getrefcount本身也引用了一次

# 创建弱引用
weak_obj = weakref.ref(obj)

print("弱引用指向的对象:", weak_obj()) # <__main__.MyClass object at 0x...>
print("是否存活:", weak_obj() is not None) # True

# 删除强引用
del obj

print("弱引用指向的对象:", weak_obj()) # None
print("是否存活:", weak_obj() is not None) # False

带回调的弱引用

可以在对象被回收时触发一个回调函数:

1
2
3
4
5
6
7
8
9
10
11
12
import weakref

def callback(ref):
print("对象已被回收,弱引用失效!")

obj = MyClass("CallbackObject")
weak_obj = weakref.ref(obj, callback) # 注册回调

del obj # 触发回收和回调
# 输出:
# CallbackObject 被销毁了
# 对象已被回收,弱引用失效!

弱引用字典

  • WeakKeyDictionary:键是弱引用(键可被回收)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import weakref

class Key:
def __init__(self, name):
self.name = name
def __repr__(self):
return f"Key({self.name})"

key1 = Key("A")
key2 = Key("B")

weak_dict = weakref.WeakKeyDictionary()
weak_dict[key1] = "value1"
weak_dict[key2] = "value2"

print("当前字典:", dict(weak_dict)) # {Key(A): 'value1', Key(B): 'value2'}

del key1
print("删除 key1 后:", dict(weak_dict)) # {Key(B): 'value2'}
  • WeakValueDictionary:值是弱引用(值可被回收)

    键或值必须是可哈希对象(通常是自定义类实例),不能是 list, dict 等不可哈希类型。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    weak_val_dict = weakref.WeakValueDictionary()
    obj1 = MyClass("Obj1")
    obj2 = MyClass("Obj2")

    weak_val_dict["a"] = obj1
    weak_val_dict["b"] = obj2

    print("当前字典:", dict(weak_val_dict))

    del obj1
    print("删除 obj1 后:", dict(weak_val_dict)) # 只剩 'b': obj2

弱引用集合

类似 set,但元素是弱引用:

1
2
3
4
5
6
7
8
9
10
11
12
weak_set = weakref.WeakSet()

obj1 = MyClass("SetObj1")
obj2 = MyClass("SetObj2")

weak_set.add(obj1)
weak_set.add(obj2)

print("当前集合:", list(weak_set))

del obj1
print("删除 obj1 后:", list(weak_set)) # 只剩 obj2

应用场景

  • 缓存系统,避免内存泄漏

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    import weakref

    class DataCache:
    def __init__(self):
    self._cache = weakref.WeakValueDictionary()

    def get(self, key):
    return self._cache.get(key)

    def set(self, key, value):
    self._cache[key] = value

    cache = DataCache()
    big_data = MyClass("BigData")
    cache.set("data1", big_data)

    print(cache.get("data1")) # 存在
    del big_data
    print(cache.get("data1")) # None —— 自动清理!
  • 观察者模式 / 事件监听器,避免循环引用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    class Subject:
    def __init__(self):
    self._observers = weakref.WeakSet()

    def add_observer(self, observer):
    self._observers.add(observer)

    def notify(self):
    for obs in self._observers:
    obs.update()

    class Observer:
    def update(self):
    print("收到通知!")

    subject = Subject()
    observer = Observer()
    subject.add_observer(observer)
    subject.notify() # 收到通知!

    del observer # observer 被回收,自动从 WeakSet 中移除
    subject.notify() # 无输出,不会报错
  • 父子对象引用,避免循环引用导致内存泄漏

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    class Parent:
    def __init__(self, name):
    self.name = name
    self.children = []

    class Child:
    def __init__(self, name, parent):
    self.name = name
    self.parent = weakref.ref(parent) # 弱引用父对象,避免循环引用

    p = Parent("Parent1")
    c = Child("Child1", p)
    p.children.append(c)

    print(c.parent().name) # Parent1

    del p # 父对象可被正常回收
    print(c.parent()) # None

系统中的应用

支持传递多个参数+弱引用的回调函数使用

定义回调函数

支持多个参数:用于处理主函数完成后的操作:成功或失败的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def handle_workflow_completion(self, task_id, prompt_id, success, msg, **kwargs):
try:
with lock:
if success:
task.status = TaskStatus.SUCCESS.value
task.task_msg = f"任务执行成功,工作流已完成:{msg}"
self.on_complete(task, prompt_id, file_names)

else:
if task.execution_count <= self.task_max_retry:
task.status = TaskStatus.QUEUED.value
task.task_msg = msg
task.end_time = None # 清除结束时间
else:
self.on_error(task, f"任务执行失败:已重试超过{self.task_max_retry}次,{msg}")
async_send_failure_email(task_id, task.task_type, task.task_msg)

except Exception as e:
print_log_exception()

定义主函数

主函数内部可以继续使用回调函数:on_completeon_error

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def async_run_workflow(self, on_complete=None, on_error=None, task_id=None):
try:
if true:
on_complete(task_id, prompt_id)

workflow_status_checker.check_workflow_status_async(
# on_complete=weakref.WeakMethod(task_callback_handler.handle_workflow_completion),
# on_timeout=weakref.WeakMethod(task_callback_handler.handle_workflow_timeout),
on_complete=task_callback_handler.handle_workflow_completion,
on_timeout=task_callback_handler.handle_workflow_timeout,
prompt_id=prompt_id,
task_id=task_id
)

return prompt_id

except Exception as e:
print_log_exception()
on_error(task_id, {str(e)})
return ""

定义执行函数

执行主函数,调用回调函数

1
2
3
4
5
6
7
8
9
def check_workflow_status_async(self, prompt_id: str,
on_complete: Callable[[str, bool], None],
task_id: str = None) -> str:
if true:
self.callback_with_complete(task_id, prompt_id, True, "成功", on_complete)

else:
# 执行完成回调,标记为失败
self.callback_with_complete(task_id, prompt_id, False, "工作流执行出错,错误", on_complete)

弱引用调用回调函数

使用 functools 包装所有参数,使用 weakref 调用 functools

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def callback_with_complete(self, task_id: str, prompt_id: str, success: bool, msg: str, on_complete):
# 执行完成回调,标记为失败
try:
callback_with_args = functools.partial(
on_complete,
task_id,
prompt_id,
success,
output_name,
msg
)

weak_callback = weakref.ref(callback_with_args)
# 调用弱引用回调
if weak_callback() is not None:
weak_callback()()
else:
warning("weak_callback_complete 对象已被垃圾回收")

except Exception as e:
error(f"weak_callback_complete 执行完成回调时出错: {str(e)}")
print_log_exception()