查看: 154|回复: 1

Python transitions库实战:从基础状态机到SQLAlchemy+PostgreSQL持久化

[复制链接]
发表于 3 小时前 | 显示全部楼层 |阅读模式
在Python开发中,有限状态机(FSM)是处理对象状态流转的经典方案。transitions库凭借其简洁的API和强大的扩展能力,成为实现状态机的首选工具。本文将系统讲解transitions的核心用法,并结合异步SQLAlchemy 2.0与PostgreSQL,演示如何在真实业务中持久化状态数据。

一、安装与基础概念

推荐使用uv管理依赖:
  1. uv pip install transitions
  2. # 如需绘制状态转移图(需安装graphviz)
  3. uv pip install transitions[diagrams]
复制代码

transitions 由 States(状态)、Transitions(转换规则)和 Machine(状态机)三要素组成。以下是最简示例:
  1. from transitions import Machine
  2. states = ['off', 'on']
  3. transitions = [
  4.     {'trigger': 'turn_on', 'source': 'off', 'dest': 'on'},
  5.     {'trigger': 'turn_off', 'source': 'on', 'dest': 'off'}
  6. ]
  7. machine = Machine(states=states, transitions=transitions, initial='off')
  8. print(machine.state)  # off
  9. machine.turn_on()
  10. print(machine.state)  # on
复制代码
默认情况下,Machine实例本身充当模型。实际开发中,通常将状态机绑定到自定义业务对象。

二、绑定自定义业务模型

通过model参数将状态机注入到业务类的实例中,触发器和state属性将动态添加到该对象上:
  1. class LightBulb:
  2.     def __init__(self):
  3.         self.brightness = 100
  4. bulb = LightBulb()
  5. states = ['off', 'on', 'broken']
  6. machine = Machine(model=bulb, states=states, initial='off')
  7. machine.add_transition(trigger='burn_out', source='*', dest='broken')
  8. machine.add_transition(trigger='turn_on', source='off', dest='on')
  9. print(bulb.state)  # off
  10. bulb.turn_on()
  11. print(bulb.state)  # on
复制代码

三、条件限制与异常处理

使用conditions或unless参数控制转换是否发生,只有当条件函数返回True(或unless返回False)时,转换才会执行。设置ignore_invalid_triggers=True可避免无效触发时抛出异常:
  1. class Water:
  2.     def __init__(self):
  3.         self.temperature = 20
  4.     def is_hot(self):
  5.         return self.temperature >= 100
  6. water = Water()
  7. machine = Machine(model=water, states=['liquid', 'gas'], initial='liquid', ignore_invalid_triggers=True)
  8. machine.add_transition(trigger='evaporate', source='liquid', dest='gas', conditions='is_hot')
  9. success = water.evaporate()
  10. print(success)  # False
  11. water.temperature = 100
  12. success = water.evaporate()
  13. print(success)  # True
复制代码

四、生命周期回调函数

transitions 提供多个钩子:prepare(转换开始前,条件未判断时触发)、before(条件满足后触发)、after(转换成功后触发)。示例:
  1. class Hero:
  2.     def wear_armor(self):
  3.         print("[Callback] 穿上战甲!")
  4.     def log_success(self):
  5.         print("[Callback] 状态成功转换。")
  6. hero = Hero()
  7. states = ['normal', 'combat']
  8. machine = Machine(model=hero, states=states, initial='normal')
  9. machine.add_transition(trigger='encounter_enemy', source='normal', dest='combat',
  10.                        before='wear_armor', after='log_success')
  11. hero.encounter_enemy()
  12. # 输出:
  13. # [Callback] 穿上战甲!
  14. # [Callback] 状态成功转换。
复制代码

五、分层/嵌套状态机

当业务状态具有层级关系时,使用HierarchicalMachine。子状态名称通过父状态加下划线拼接(例如on_standby):
  1. from transitions.extensions import HierarchicalMachine as Machine
  2. states = [
  3.     'off',
  4.     {'name': 'on', 'children': ['standby', 'working'], 'initial': 'standby'}
  5. ]
  6. machine = Machine(states=states, initial='off')
  7. machine.add_transition('power_on', 'off', 'on')
  8. machine.add_transition('activate', 'on_standby', 'on_working')
  9. machine.add_transition('power_off', 'on', 'off')
  10. print(machine.state)  # off
  11. machine.power_on()
  12. print(machine.state)  # on_standby
  13. machine.activate()
  14. print(machine.state)  # on_working
复制代码

六、异步状态机

在异步上下文(如FastAPI)中,使用AsyncMachine。所有回调和触发器均以协程形式执行:
  1. import asyncio
  2. from transitions.extensions.asyncio import AsyncMachine
  3. class AsyncTask:
  4.     async def notify_server(self):
  5.         print("开始同步")
  6.         await asyncio.sleep(0.5)
  7.         print("同步完成")
  8. task = AsyncTask()
  9. machine = AsyncMachine(model=task, states=['pending', 'completed'], initial='pending')
  10. machine.add_transition(trigger='complete', source='pending', dest='completed', before='notify_server')
  11. async def main():
  12.     await task.complete()
  13.     print(f"状态: {task.state}")
  14. asyncio.run(main())
复制代码

七、数据库持久化实践(异步SQLAlchemy 2.0 + PostgreSQL)

真实业务中需要将状态持久化到数据库。结合transitions与SQLAlchemy,通过model_attribute参数指定ORM字段,状态变更会自动反映到该字段上。

1. 安装依赖
  1. uv pip install sqlalchemy asyncpg transitions
复制代码

2. 定义数据库模型与全局状态机
  1. # database.py
  2. import asyncio
  3. from sqlalchemy import String, Integer
  4. from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, DeclarativeBase
  5. from sqlalchemy.orm import Mapped, mapped_column
  6. from transitions import Machine
  7. DATABASE_URL = "postgresql+asyncpg://postgres:password@localhost:5432/mydb"
  8. engine = create_async_engine(DATABASE_URL, echo=True)
  9. async_session = async_sessionmaker(engine, expire_on_commit=False)
  10. class Base(DeclarativeBase):
  11.     pass
  12. class Order(Base):
  13.     __tablename__ = "orders"
  14.     id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
  15.     title: Mapped[str] = mapped_column(String(100))
  16.     status: Mapped[str] = mapped_column(String(50), default="created")
  17. states = ["created", "paid", "shipped", "completed", "cancelled"]
  18. order_machine = Machine(
  19.     model=None,
  20.     states=states,
  21.     initial="created",
  22.     model_attribute="status",
  23.     ignore_invalid_triggers=True
  24. )
  25. order_machine.add_transition(trigger="pay", source="created", dest="paid")
  26. order_machine.add_transition(trigger="ship", source="paid", dest="shipped")
  27. order_machine.add_transition(trigger="complete", source="shipped", dest="completed")
  28. order_machine.add_transition(trigger="cancel", source=["created", "paid"], dest="cancelled")
复制代码

3. 业务层使用:查询、转换与持久化
  1. # service.py
  2. from database import async_session, Order, order_machine
  3. from sqlalchemy import select
  4. async def create_new_order(title: str) -> int:
  5.     async with async_session() as session:
  6.         async with session.begin():
  7.             new_order = Order(title=title)
  8.             session.add(new_order)
  9.             return new_order.id
  10. async def process_order_payment(order_id: int):
  11.     async with async_session() as session:
  12.         async with session.begin():
  13.             result = await session.execute(select(Order).where(Order.id == order_id))
  14.             order = result.scalar_one_or_none()
  15.             if not order:
  16.                 print("订单不存在")
  17.                 return
  18.             order_machine.add_model(order)
  19.             success = order.pay()
  20.             if success:
  21.                 print(f"状态变更为 {order.status}")
  22.             else:
  23.                 print("转换失败")
  24.             order_machine.remove_model(order)
复制代码

4. 进阶:在生命周期回调中读写数据库

若需在状态变更时记录日志,可以结合AsyncMachine与回调,将数据库session作为参数传入:
  1. from transitions.extensions.asyncio import AsyncMachine
  2. class OrderWithCallback(Order):
  3.     async def log_status_change(self, event_data):
  4.         session = event_data.kwargs.get("session")
  5.         if session:
  6.             print(f"订单 {self.id} 状态从 {event_data.transition.source} 变更为 {event_data.transition.dest}")
  7.             # 可执行额外DB操作,如 session.add(OrderLog(...))
  8. async_order_machine = AsyncMachine(
  9.     model=None,
  10.     states=states,
  11.     initial="created",
  12.     model_attribute="status"
  13. )
  14. async_order_machine.add_transition(
  15.     trigger="pay",
  16.     source="created",
  17.     dest="paid",
  18.     after="log_status_change"
  19. )
  20. async def pay_with_callback(order_id: int):
  21.     async with async_session() as session:
  22.         async with session.begin():
  23.             result = await session.execute(select(OrderWithCallback).where(OrderWithCallback.id == order_id))
  24.             order = result.scalar_one()
  25.             async_order_machine.add_model(order)
  26.             await order.pay(session=session)
复制代码

最佳实践总结

- 单例状态机:全局只创建一个Machine实例,通过add_model/remove_model动态绑定查询出的数据库实体。
- model_attribute:必须设置该参数,指向ORM托管的字段(如status),否则状态机默认读写state属性。
- 事务一致性:状态转移仅修改内存中的属性,务必在同一个数据库事务中提交session.commit(),确保持久化一致性。

通过以上步骤,你可以轻松将transitions集成到基于SQLAlchemy和PostgreSQL的项目中,实现健壮的状态管理。
回复

使用道具 举报

发表于 3 小时前 | 显示全部楼层

Re: Python transitions库实战:从基础状态机到SQLAlchemy+PostgreSQL持久化

这篇教程写得非常系统,从最基础的状态机构建到绑定自定义模型、条件限制、回调钩子、分层状态机,再到异步场景,层层递进逻辑清晰。尤其喜欢把 `conditions` 和 `ignore_invalid_triggers` 放在一起讲,实际项目里这两个搭配确实能避免很多烦人的异常处理。期待继续看到第七部分关于 PostgreSQL 持久化的具体实现,以及异步 SQLAlchemy 2.0 下 `on_enter`/`on_exit` 回调结合 async session 的写法。谢谢分享!
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 注册

本版积分规则

指导单位

江苏省公安厅

江苏省通信管理局

浙江省台州刑侦支队

DEFCON GROUP 86025

Hacking Group 021A

旗下站点

态势感知中心

应急响应中心

红盟安全

联系我们

官方QQ群:112851260

官方邮箱:security#ihonker.org(#改成@)

官方核心成员

关注微信公众号

Archiver|手机版|小黑屋| ( 沪ICP备2021026908号 )

GMT+8, 2026-6-10 19:15 , Processed in 0.024919 second(s), 18 queries , Gzip On, Redis On.

Powered by ihonker.com

Copyright © 2015-现在.

  • 返回顶部