跳至主要內容

LangGraph持久化报错

pptg大约 2 分钟

1. 使用MySQL作为CheckPoint后端时报错

1.1 问题

asyncmy.errors.OperationalError: (1267, "Illegal mix of collations (utf8mb4_general_ci,IMPLICIT) and (utf8mb4_0900_ai_ci,IMPLICIT) for operation '='")`

1.2 原因

数据库表的排序规则不一致,官方也有这个issue,但是目前还是Open的LangGraph: Checkpointer causes MySQL collation errors (1267/1366) + Hindi TTS loses Devanagari matrasopen in new window

1.3 临时方案

打印了LangGraph的Migration脚本:

python -c "
from langgraph.checkpoint.mysql.asyncmy import AsyncMySaver
print('MIGRATIONS内容:')
for i, migration in enumerate(AsyncMySaver.MIGRATIONS):
    print(f'Migration {i}:')
    print(migration)
    print('=' * 50)
" 

自己手动建表,并设置COLLATE

#!/usr/bin/env python3
"""
重新创建 langgraph checkpoint 表,确保使用正确的字符集和排序规则
"""
import asyncio
import sys
import os

# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from app.config.config import cfg
import asyncmy


async def reset_checkpoint_tables():
    """重新创建 checkpoint 表"""
    db_config = cfg.database
    
    # 连接到数据库
    connection = await asyncmy.connect(
        host=db_config.host,
        port=db_config.port,
        user=db_config.user,
        password=db_config.password,
        database=db_config.database,
        charset='utf8mb4'
    )
    
    try:
        async with connection.cursor() as cursor:
            # 设置会话的排序规则
            await cursor.execute("SET collation_connection = 'utf8mb4_0900_ai_ci'")
            await cursor.execute("SET collation_database = 'utf8mb4_0900_ai_ci'")
            
            print("删除现有的 checkpoint 表...")
            
            # 删除现有表
            await cursor.execute("DROP TABLE IF EXISTS checkpoints")
            await cursor.execute("DROP TABLE IF EXISTS checkpoint_blobs")
            await cursor.execute("DROP TABLE IF EXISTS checkpoint_writes")
            
            print("创建新的 checkpoint 表...")
            
            # 创建 checkpoints 表
            await cursor.execute("""
                CREATE TABLE checkpoints (
                    thread_id VARCHAR(100) NOT NULL,
                    checkpoint_ns VARCHAR(100) NOT NULL DEFAULT '',
                    checkpoint_ns_hash BINARY(16) NOT NULL,
                    checkpoint_id VARCHAR(100) NOT NULL,
                    parent_checkpoint_id VARCHAR(100),
                    checkpoint JSON NOT NULL,
                    metadata JSON NOT NULL,
                    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
            """)
            
            # 创建 checkpoint_blobs 表
            await cursor.execute("""
                CREATE TABLE checkpoint_blobs (
                    thread_id VARCHAR(100) NOT NULL,
                    checkpoint_ns VARCHAR(100) NOT NULL DEFAULT '',
                    checkpoint_ns_hash BINARY(16) NOT NULL,
                    channel VARCHAR(100) NOT NULL,
                    version VARCHAR(100) NOT NULL,
                    type VARCHAR(128) NOT NULL,
                    `blob` LONGBLOB,
                    PRIMARY KEY (thread_id, checkpoint_ns, channel, version)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
            """)
            
            # 创建 checkpoint_writes 表
            await cursor.execute("""
                CREATE TABLE checkpoint_writes (
                    thread_id VARCHAR(100) NOT NULL,
                    checkpoint_ns VARCHAR(100) NOT NULL DEFAULT '',
                    checkpoint_ns_hash BINARY(16) NOT NULL,
                    checkpoint_id VARCHAR(100) NOT NULL,
                    task_id VARCHAR(100) NOT NULL,
                    task_path VARCHAR(200),
                    idx INTEGER NOT NULL,
                    channel VARCHAR(100) NOT NULL,
                    type VARCHAR(128),
                    `blob` LONGBLOB,
                    PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
            """)
            
            await connection.commit()
            
    except Exception as e:
        await connection.rollback()
    finally:
        await connection.ensure_closed()


if __name__ == "__main__":
    asyncio.run(reset_checkpoint_tables())