在Python开发中,有限状态机(FSM)是处理对象状态流转的经典方案。transitions库凭借其简洁的API和强大的扩展能力,成为实现状态机的首选工具。本文将系统讲解transitions的核心用法,并结合异步SQLAlchemy 2.0与PostgreSQL,演示如何在真实业务中持久化状态数据。
一、安装与基础概念
推荐使用uv管理依赖:- uv pip install transitions
- # 如需绘制状态转移图(需安装graphviz)
- uv pip install transitions[diagrams]
复制代码
transitions 由 States(状态)、Transitions(转换规则)和 Machine(状态机)三要素组成。以下是最简示例:- from transitions import Machine
- states = ['off', 'on']
- transitions = [
- {'trigger': 'turn_on', 'source': 'off', 'dest': 'on'},
- {'trigger': 'turn_off', 'source': 'on', 'dest': 'off'}
- ]
- machine = Machine(states=states, transitions=transitions, initial='off')
- print(machine.state) # off
- machine.turn_on()
- print(machine.state) # on
复制代码 默认情况下,Machine实例本身充当模型。实际开发中,通常将状态机绑定到自定义业务对象。
二、绑定自定义业务模型
通过model参数将状态机注入到业务类的实例中,触发器和state属性将动态添加到该对象上:- class LightBulb:
- def __init__(self):
- self.brightness = 100
- bulb = LightBulb()
- states = ['off', 'on', 'broken']
- machine = Machine(model=bulb, states=states, initial='off')
- machine.add_transition(trigger='burn_out', source='*', dest='broken')
- machine.add_transition(trigger='turn_on', source='off', dest='on')
- print(bulb.state) # off
- bulb.turn_on()
- print(bulb.state) # on
复制代码
三、条件限制与异常处理
使用conditions或unless参数控制转换是否发生,只有当条件函数返回True(或unless返回False)时,转换才会执行。设置ignore_invalid_triggers=True可避免无效触发时抛出异常:- class Water:
- def __init__(self):
- self.temperature = 20
- def is_hot(self):
- return self.temperature >= 100
- water = Water()
- machine = Machine(model=water, states=['liquid', 'gas'], initial='liquid', ignore_invalid_triggers=True)
- machine.add_transition(trigger='evaporate', source='liquid', dest='gas', conditions='is_hot')
- success = water.evaporate()
- print(success) # False
- water.temperature = 100
- success = water.evaporate()
- print(success) # True
复制代码
四、生命周期回调函数
transitions 提供多个钩子:prepare(转换开始前,条件未判断时触发)、before(条件满足后触发)、after(转换成功后触发)。示例:- class Hero:
- def wear_armor(self):
- print("[Callback] 穿上战甲!")
- def log_success(self):
- print("[Callback] 状态成功转换。")
- hero = Hero()
- states = ['normal', 'combat']
- machine = Machine(model=hero, states=states, initial='normal')
- machine.add_transition(trigger='encounter_enemy', source='normal', dest='combat',
- before='wear_armor', after='log_success')
- hero.encounter_enemy()
- # 输出:
- # [Callback] 穿上战甲!
- # [Callback] 状态成功转换。
复制代码
五、分层/嵌套状态机
当业务状态具有层级关系时,使用HierarchicalMachine。子状态名称通过父状态加下划线拼接(例如on_standby):- from transitions.extensions import HierarchicalMachine as Machine
- states = [
- 'off',
- {'name': 'on', 'children': ['standby', 'working'], 'initial': 'standby'}
- ]
- machine = Machine(states=states, initial='off')
- machine.add_transition('power_on', 'off', 'on')
- machine.add_transition('activate', 'on_standby', 'on_working')
- machine.add_transition('power_off', 'on', 'off')
- print(machine.state) # off
- machine.power_on()
- print(machine.state) # on_standby
- machine.activate()
- print(machine.state) # on_working
复制代码
六、异步状态机
在异步上下文(如FastAPI)中,使用AsyncMachine。所有回调和触发器均以协程形式执行:- import asyncio
- from transitions.extensions.asyncio import AsyncMachine
- class AsyncTask:
- async def notify_server(self):
- print("开始同步")
- await asyncio.sleep(0.5)
- print("同步完成")
- task = AsyncTask()
- machine = AsyncMachine(model=task, states=['pending', 'completed'], initial='pending')
- machine.add_transition(trigger='complete', source='pending', dest='completed', before='notify_server')
- async def main():
- await task.complete()
- print(f"状态: {task.state}")
- asyncio.run(main())
复制代码
七、数据库持久化实践(异步SQLAlchemy 2.0 + PostgreSQL)
真实业务中需要将状态持久化到数据库。结合transitions与SQLAlchemy,通过model_attribute参数指定ORM字段,状态变更会自动反映到该字段上。
1. 安装依赖- uv pip install sqlalchemy asyncpg transitions
复制代码
2. 定义数据库模型与全局状态机- # database.py
- import asyncio
- from sqlalchemy import String, Integer
- from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, DeclarativeBase
- from sqlalchemy.orm import Mapped, mapped_column
- from transitions import Machine
- DATABASE_URL = "postgresql+asyncpg://postgres:password@localhost:5432/mydb"
- engine = create_async_engine(DATABASE_URL, echo=True)
- async_session = async_sessionmaker(engine, expire_on_commit=False)
- class Base(DeclarativeBase):
- pass
- class Order(Base):
- __tablename__ = "orders"
- id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
- title: Mapped[str] = mapped_column(String(100))
- status: Mapped[str] = mapped_column(String(50), default="created")
- states = ["created", "paid", "shipped", "completed", "cancelled"]
- order_machine = Machine(
- model=None,
- states=states,
- initial="created",
- model_attribute="status",
- ignore_invalid_triggers=True
- )
- order_machine.add_transition(trigger="pay", source="created", dest="paid")
- order_machine.add_transition(trigger="ship", source="paid", dest="shipped")
- order_machine.add_transition(trigger="complete", source="shipped", dest="completed")
- order_machine.add_transition(trigger="cancel", source=["created", "paid"], dest="cancelled")
复制代码
3. 业务层使用:查询、转换与持久化- # service.py
- from database import async_session, Order, order_machine
- from sqlalchemy import select
- async def create_new_order(title: str) -> int:
- async with async_session() as session:
- async with session.begin():
- new_order = Order(title=title)
- session.add(new_order)
- return new_order.id
- async def process_order_payment(order_id: int):
- async with async_session() as session:
- async with session.begin():
- result = await session.execute(select(Order).where(Order.id == order_id))
- order = result.scalar_one_or_none()
- if not order:
- print("订单不存在")
- return
- order_machine.add_model(order)
- success = order.pay()
- if success:
- print(f"状态变更为 {order.status}")
- else:
- print("转换失败")
- order_machine.remove_model(order)
复制代码
4. 进阶:在生命周期回调中读写数据库
若需在状态变更时记录日志,可以结合AsyncMachine与回调,将数据库session作为参数传入:- from transitions.extensions.asyncio import AsyncMachine
- class OrderWithCallback(Order):
- async def log_status_change(self, event_data):
- session = event_data.kwargs.get("session")
- if session:
- print(f"订单 {self.id} 状态从 {event_data.transition.source} 变更为 {event_data.transition.dest}")
- # 可执行额外DB操作,如 session.add(OrderLog(...))
- async_order_machine = AsyncMachine(
- model=None,
- states=states,
- initial="created",
- model_attribute="status"
- )
- async_order_machine.add_transition(
- trigger="pay",
- source="created",
- dest="paid",
- after="log_status_change"
- )
- async def pay_with_callback(order_id: int):
- async with async_session() as session:
- async with session.begin():
- result = await session.execute(select(OrderWithCallback).where(OrderWithCallback.id == order_id))
- order = result.scalar_one()
- async_order_machine.add_model(order)
- await order.pay(session=session)
复制代码
最佳实践总结
- 单例状态机:全局只创建一个Machine实例,通过add_model/remove_model动态绑定查询出的数据库实体。
- model_attribute:必须设置该参数,指向ORM托管的字段(如status),否则状态机默认读写state属性。
- 事务一致性:状态转移仅修改内存中的属性,务必在同一个数据库事务中提交session.commit(),确保持久化一致性。
通过以上步骤,你可以轻松将transitions集成到基于SQLAlchemy和PostgreSQL的项目中,实现健壮的状态管理。 |