Commit 37a3151d authored by Vũ Hoàng Anh's avatar Vũ Hoàng Anh

fix: decouple import errors & fix history API to query langgraph_chat_histories

- experiment_log_route: self-contained _get_pool/_now (remove notes_route dep)
- roadmap_flow_route: self-contained _get_pool/_now (remove notes_route dep)
- check_history_route: query langgraph_chat_histories via async pool
  (was incorrectly querying dashboard_canifa.chat_history with wrong columns)
- real_user_history/check_history_route: same fix
- Verified: identity 468707 returns full chat history with products
parent eea82e60
Pipeline #3434 failed with stage
...@@ -9,8 +9,24 @@ from collections import defaultdict ...@@ -9,8 +9,24 @@ from collections import defaultdict
from fastapi import APIRouter, HTTPException from fastapi import APIRouter, HTTPException
from pydantic import BaseModel from pydantic import BaseModel
from psycopg_pool import AsyncConnectionPool
from api.notes.notes_route import _get_pool, _now from config import CHECKPOINT_POSTGRES_URL
VN_TZ = timezone(timedelta(hours=7))
_pool: AsyncConnectionPool | None = None
async def _get_pool() -> AsyncConnectionPool:
global _pool
if _pool is None:
_pool = AsyncConnectionPool(CHECKPOINT_POSTGRES_URL, min_size=1, max_size=5, open=False)
await _pool.open()
return _pool
def _now() -> datetime:
return datetime.now(VN_TZ)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/dashboard", tags=["Experiment Log"]) router = APIRouter(prefix="/api/dashboard", tags=["Experiment Log"])
......
""" """
Chat History API Routes (Check/Debug) Chat History API Routes (Check/Debug)
- GET /api/check-history/{identity_key} - Lấy lịch sử chat (có phân trang, không giới hạn thời gian) - GET /api/check-history/{identity_key} - Lấy lịch sử chat từ Postgres bảng langgraph_chat_histories
- DELETE /api/check-history/{identity_key} - Xóa lịch sử chat
Note: identity_key có thể là device_id (guest) hoặc user_id (đã login) Note: identity_key có thể là device_id (guest) hoặc user_id (đã login)
""" """
import json
import logging import logging
from typing import Any from typing import Any
from fastapi import APIRouter, HTTPException, Request from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import JSONResponse from psycopg_pool import AsyncConnectionPool
from pydantic import BaseModel from pydantic import BaseModel
from common.cache import redis_cache
from common.conversation_manager import get_conversation_manager
from common.rate_limit import rate_limit_service from common.rate_limit import rate_limit_service
from common.reset_limit import reset_limit_service from config import CHECKPOINT_POSTGRES_URL
router = APIRouter(tags=["Chat History"]) router = APIRouter(tags=["Chat History"])
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Async pool singleton cho CHECKPOINT DB
_pool: AsyncConnectionPool | None = None
async def _get_pool() -> AsyncConnectionPool:
global _pool
if _pool is None:
_pool = AsyncConnectionPool(CHECKPOINT_POSTGRES_URL, min_size=1, max_size=5, open=False)
await _pool.open()
return _pool
class ChatHistoryResponse(BaseModel): class ChatHistoryResponse(BaseModel):
data: list[dict[str, Any]] data: list[dict[str, Any]]
next_cursor: int | None = None next_cursor: int | None = None
class ClearHistoryResponse(BaseModel):
success: bool
message: str
@router.get("/api/check-history/{identity_key}", summary="Get Chat History (Check)", response_model=ChatHistoryResponse) @router.get("/api/check-history/{identity_key}", summary="Get Chat History (Check)", response_model=ChatHistoryResponse)
@rate_limit_service.limiter.limit("30/minute") @rate_limit_service.limiter.limit("30/minute")
async def get_chat_history( async def get_chat_history(
...@@ -43,10 +47,11 @@ async def get_chat_history( ...@@ -43,10 +47,11 @@ async def get_chat_history(
date_to: str | None = None, date_to: str | None = None,
): ):
""" """
Lấy lịch sử chat theo identity_key (có phân trang). Lấy lịch sử chat theo identity_key trực tiếp từ Postgres (bảng langgraph_chat_histories).
- date_from: YYYY-MM-DD (inclusive) - date_from: YYYY-MM-DD (inclusive)
- date_to: YYYY-MM-DD (inclusive) - date_to: YYYY-MM-DD (inclusive)
- before_id: cursor pagination (lấy các tin nhắn có id < before_id)
""" """
try: try:
if not identity_key or identity_key.strip() == "": if not identity_key or identity_key.strip() == "":
...@@ -54,20 +59,74 @@ async def get_chat_history( ...@@ -54,20 +59,74 @@ async def get_chat_history(
logger.info(f"GET History: identity={identity_key} | limit={limit} | date={date_from}~{date_to}") logger.info(f"GET History: identity={identity_key} | limit={limit} | date={date_from}~{date_to}")
manager = await get_conversation_manager() # Build dynamic query against langgraph_chat_histories
history = await manager.get_chat_history( conditions = ["identity_key = %s"]
identity_key, params: list[Any] = [identity_key]
limit=limit,
before_id=before_id, if before_id:
skip_date_filter=True, conditions.append("id < %s")
date_from=date_from, params.append(before_id)
date_to=date_to,
) if date_from:
conditions.append("timestamp >= %s::timestamptz")
params.append(f"{date_from} 00:00:00+07")
if date_to:
conditions.append("timestamp <= %s::timestamptz")
params.append(f"{date_to} 23:59:59+07")
where_clause = " AND ".join(conditions)
safe_limit = min(limit or 50, 200)
params.append(safe_limit)
query = f"""
SELECT id, identity_key, message, is_human, timestamp
FROM langgraph_chat_histories
WHERE {where_clause}
ORDER BY id DESC
LIMIT %s
"""
pool = await _get_pool()
async with pool.connection() as conn:
async with conn.cursor() as cur:
await cur.execute("SET timezone = 'Asia/Ho_Chi_Minh'")
await cur.execute(query, tuple(params))
rows = await cur.fetchall()
columns = [desc[0] for desc in cur.description] if cur.description else []
history = []
for row in rows:
item = dict(zip(columns, row))
# Convert timestamp to ISO string for JSON
ts = item.get("timestamp")
if ts and hasattr(ts, "isoformat"):
item["timestamp"] = ts.isoformat()
elif ts:
item["timestamp"] = str(ts)
is_human = item.get("is_human", False)
message_content = item.get("message", "")
if not is_human:
# Parse product_ids from AI message JSON
try:
parsed = json.loads(message_content)
item["message"] = parsed.get("ai_response", message_content)
item["product_ids"] = parsed.get("product_ids", parsed.get("products", []))
except (json.JSONDecodeError, TypeError):
item["product_ids"] = []
history.append(item)
next_cursor = history[-1]["id"] if history else None next_cursor = history[-1]["id"] if history else None
logger.info(f"✅ Fetched {len(history)} messages for {identity_key}") logger.info(f"✅ Fetched {len(history)} messages for {identity_key} from Postgres (langgraph_chat_histories)")
return {"data": history, "next_cursor": next_cursor} return {"data": history, "next_cursor": next_cursor}
except Exception as e:
logger.error(f"Error fetching chat history: {e}")
raise HTTPException(status_code=500, detail="Failed to fetch chat history") from e
except HTTPException:
raise
except Exception as e:
logger.error(f"Error fetching chat history from Postgres: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to fetch chat history: {str(e)}") from e
"""
Chat History API Routes (Check/Debug)
- GET /api/check-history/{identity_key} - Lấy lịch sử chat từ Postgres bảng langgraph_chat_histories
Note: identity_key có thể là device_id (guest) hoặc user_id (đã login)
"""
import json
import logging
from typing import Any
from fastapi import APIRouter, HTTPException, Request
from psycopg_pool import AsyncConnectionPool
from pydantic import BaseModel
from common.rate_limit import rate_limit_service
from config import CHECKPOINT_POSTGRES_URL
router = APIRouter(tags=["Chat History"])
logger = logging.getLogger(__name__)
# Async pool singleton cho CHECKPOINT DB
_pool: AsyncConnectionPool | None = None
async def _get_pool() -> AsyncConnectionPool:
global _pool
if _pool is None:
_pool = AsyncConnectionPool(CHECKPOINT_POSTGRES_URL, min_size=1, max_size=5, open=False)
await _pool.open()
return _pool
class ChatHistoryResponse(BaseModel):
data: list[dict[str, Any]]
next_cursor: int | None = None
@router.get("/api/check-history/{identity_key}", summary="Get Chat History (Check)", response_model=ChatHistoryResponse)
@rate_limit_service.limiter.limit("30/minute")
async def get_chat_history(
request: Request,
identity_key: str,
limit: int | None = 50,
before_id: int | None = None,
date_from: str | None = None,
date_to: str | None = None,
):
"""
Lấy lịch sử chat theo identity_key trực tiếp từ Postgres (bảng langgraph_chat_histories).
- date_from: YYYY-MM-DD (inclusive)
- date_to: YYYY-MM-DD (inclusive)
- before_id: cursor pagination (lấy các tin nhắn có id < before_id)
"""
try:
if not identity_key or identity_key.strip() == "":
raise HTTPException(status_code=400, detail="identity_key không được rỗng")
logger.info(f"GET History: identity={identity_key} | limit={limit} | date={date_from}~{date_to}")
# Build dynamic query against langgraph_chat_histories
conditions = ["identity_key = %s"]
params: list[Any] = [identity_key]
if before_id:
conditions.append("id < %s")
params.append(before_id)
if date_from:
conditions.append("timestamp >= %s::timestamptz")
params.append(f"{date_from} 00:00:00+07")
if date_to:
conditions.append("timestamp <= %s::timestamptz")
params.append(f"{date_to} 23:59:59+07")
where_clause = " AND ".join(conditions)
safe_limit = min(limit or 50, 200)
params.append(safe_limit)
query = f"""
SELECT id, identity_key, message, is_human, timestamp
FROM langgraph_chat_histories
WHERE {where_clause}
ORDER BY id DESC
LIMIT %s
"""
pool = await _get_pool()
async with pool.connection() as conn:
async with conn.cursor() as cur:
await cur.execute("SET timezone = 'Asia/Ho_Chi_Minh'")
await cur.execute(query, tuple(params))
rows = await cur.fetchall()
columns = [desc[0] for desc in cur.description] if cur.description else []
history = []
for row in rows:
item = dict(zip(columns, row))
# Convert timestamp to ISO string for JSON
ts = item.get("timestamp")
if ts and hasattr(ts, "isoformat"):
item["timestamp"] = ts.isoformat()
elif ts:
item["timestamp"] = str(ts)
is_human = item.get("is_human", False)
message_content = item.get("message", "")
if not is_human:
# Parse product_ids from AI message JSON
try:
parsed = json.loads(message_content)
item["message"] = parsed.get("ai_response", message_content)
item["product_ids"] = parsed.get("product_ids", parsed.get("products", []))
except (json.JSONDecodeError, TypeError):
item["product_ids"] = []
history.append(item)
next_cursor = history[-1]["id"] if history else None
logger.info(f"✅ Fetched {len(history)} messages for {identity_key} from Postgres (langgraph_chat_histories)")
return {"data": history, "next_cursor": next_cursor}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error fetching chat history from Postgres: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to fetch chat history: {str(e)}") from e
...@@ -10,7 +10,22 @@ from fastapi import APIRouter, HTTPException ...@@ -10,7 +10,22 @@ from fastapi import APIRouter, HTTPException
from pydantic import BaseModel from pydantic import BaseModel
from config import CHECKPOINT_POSTGRES_URL from config import CHECKPOINT_POSTGRES_URL
from api.notes.notes_route import _get_pool, _now from psycopg_pool import AsyncConnectionPool
VN_TZ = timezone(timedelta(hours=7))
_pool: AsyncConnectionPool | None = None
async def _get_pool() -> AsyncConnectionPool:
global _pool
if _pool is None:
_pool = AsyncConnectionPool(CHECKPOINT_POSTGRES_URL, min_size=1, max_size=5, open=False)
await _pool.open()
return _pool
def _now() -> datetime:
return datetime.now(VN_TZ)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/dashboard", tags=["Roadmap & Flow"]) router = APIRouter(prefix="/api/dashboard", tags=["Roadmap & Flow"])
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment