LangGraph持久化报错
大约 2 分钟
1. 使用MySQL作为CheckPoint后端时报错
1.1 问题
`asyncmy.errors.OperationalError: (1267, "Illegal mix of collations (utf8mb4_general_ci,IMPLICIT) and (utf8mb4_0900_ai_ci,IMPLICIT) for operation '='")`
1.2 原因
数据库表的排序规则(collation)不一致:比较操作两侧的列分别使用了 utf8mb4_general_ci 和 utf8mb4_0900_ai_ci。官方仓库也有相关 issue,但目前仍处于 Open 状态:「LangGraph: Checkpointer causes MySQL collation errors (1267/1366) + Hindi TTS loses Devanagari matras」
1.3 临时方案
打印了LangGraph的Migration脚本:
# Print the table-creation migrations bundled with the LangGraph MySQL checkpointer.
python -c "
from langgraph.checkpoint.mysql.asyncmy import AsyncMySaver
print('MIGRATIONS内容:')
for i, migration in enumerate(AsyncMySaver.MIGRATIONS):
    print(f'Migration {i}:')
    print(migration)
    print('=' * 50)
"
参考上面打印出的建表语句,自己手动建表,并为每张表显式指定 COLLATE:
#!/usr/bin/env python3
"""
Recreate the langgraph checkpoint tables so that they use the intended
character set and collation.
"""
import asyncio
import sys
import os
# Put this script's own directory (presumably the project root) on the Python
# path so that the `app` package below can be imported.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.config.config import cfg
import asyncmy
async def reset_checkpoint_tables():
    """Drop and recreate the LangGraph checkpoint tables with one collation.

    Connects to the MySQL database described by ``cfg.database``, drops the
    ``checkpoints``, ``checkpoint_blobs`` and ``checkpoint_writes`` tables if
    they exist, and recreates each one as InnoDB with charset ``utf8mb4`` and
    collation ``utf8mb4_0900_ai_ci``.

    WARNING: this is destructive; any rows in the existing tables are lost.

    Raises:
        Exception: any error raised while executing the statements is
            reported on stderr and re-raised after a rollback attempt, so
            the caller (and the process exit code) sees the failure.
            Errors raised while connecting propagate unchanged.
    """
    db_config = cfg.database
    # Connect to the database.
    connection = await asyncmy.connect(
        host=db_config.host,
        port=db_config.port,
        user=db_config.user,
        password=db_config.password,
        database=db_config.database,
        charset='utf8mb4'
    )
    try:
        async with connection.cursor() as cursor:
            # Set the session collation so it matches the tables created below.
            await cursor.execute("SET collation_connection = 'utf8mb4_0900_ai_ci'")
            # NOTE(review): recent MySQL 8.0 releases appear to treat setting
            # the session value of collation_database as a restricted
            # operation; confirm the connecting user is allowed to do this.
            await cursor.execute("SET collation_database = 'utf8mb4_0900_ai_ci'")
            print("删除现有的 checkpoint 表...")
            # Drop the existing tables.
            for table in ("checkpoints", "checkpoint_blobs", "checkpoint_writes"):
                await cursor.execute(f"DROP TABLE IF EXISTS {table}")
            print("创建新的 checkpoint 表...")
            # Create the checkpoints table.
            await cursor.execute("""
                CREATE TABLE checkpoints (
                    thread_id VARCHAR(100) NOT NULL,
                    checkpoint_ns VARCHAR(100) NOT NULL DEFAULT '',
                    checkpoint_ns_hash BINARY(16) NOT NULL,
                    checkpoint_id VARCHAR(100) NOT NULL,
                    parent_checkpoint_id VARCHAR(100),
                    checkpoint JSON NOT NULL,
                    metadata JSON NOT NULL,
                    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
            """)
            # Create the checkpoint_blobs table.
            await cursor.execute("""
                CREATE TABLE checkpoint_blobs (
                    thread_id VARCHAR(100) NOT NULL,
                    checkpoint_ns VARCHAR(100) NOT NULL DEFAULT '',
                    checkpoint_ns_hash BINARY(16) NOT NULL,
                    channel VARCHAR(100) NOT NULL,
                    version VARCHAR(100) NOT NULL,
                    type VARCHAR(128) NOT NULL,
                    `blob` LONGBLOB,
                    PRIMARY KEY (thread_id, checkpoint_ns, channel, version)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
            """)
            # Create the checkpoint_writes table.
            await cursor.execute("""
                CREATE TABLE checkpoint_writes (
                    thread_id VARCHAR(100) NOT NULL,
                    checkpoint_ns VARCHAR(100) NOT NULL DEFAULT '',
                    checkpoint_ns_hash BINARY(16) NOT NULL,
                    checkpoint_id VARCHAR(100) NOT NULL,
                    task_id VARCHAR(100) NOT NULL,
                    task_path VARCHAR(200),
                    idx INTEGER NOT NULL,
                    channel VARCHAR(100) NOT NULL,
                    type VARCHAR(128),
                    `blob` LONGBLOB,
                    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
            """)
        await connection.commit()
    except Exception as e:
        # Report the failure instead of swallowing it; a silent failure would
        # make a half-finished rebuild look like success.
        print(f"重建 checkpoint 表失败: {e!r}", file=sys.stderr)
        # MySQL DDL statements commit implicitly, so this rollback cannot
        # restore tables that were already dropped; it only discards any
        # still-open transaction state.
        await connection.rollback()
        raise
    finally:
        # Always release the connection, on success and on failure.
        await connection.ensure_closed()
# Run the table reset only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(reset_checkpoint_tables())