Commit c9a437ef authored by Vũ Hoàng Anh

Initial commit: Chatbot Canifa system

# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual Environment
.venv/
venv/
ENV/
# IDEs
.vscode/
.idea/
# Backend specifically
backend/.env
backend/.venv/
backend/__pycache__/
backend/*.pyc
# OS
.DS_Store
Thumbs.db
# Misc
.ruff_cache/
*.log
!backend/requirements.txt
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.8.4
hooks:
# Run the linter.
- id: ruff
args: [ --fix ]
# Run the formatter.
- id: ruff-format
__pycache__
*.pyc
.env
.venv
venv
.git
.gitignore
.dockerignore
logs
data
# EditorConfig - ensures consistent indentation
# https://editorconfig.org
root = true
[*]
charset = utf-8
end_of_line = lf
insert_final_newline = true
trim_trailing_whitespace = true
[*.py]
indent_style = space
indent_size = 4
max_line_length = 120
[*.{json,yml,yaml}]
indent_style = space
indent_size = 2
[*.md]
trim_trailing_whitespace = false
# Ignore embedded repo
preference/
Requirement already satisfied: python-socketio in c:\users\fptshop\miniconda3\envs\robot\lib\site-packages (5.13.0)
Requirement already satisfied: bidict>=0.21.0 in c:\users\fptshop\miniconda3\envs\robot\lib\site-packages (from python-socketio) (0.23.1)
Requirement already satisfied: python-engineio>=4.11.0 in c:\users\fptshop\miniconda3\envs\robot\lib\site-packages (from python-socketio) (4.12.2)
Requirement already satisfied: simple-websocket>=0.10.0 in c:\users\fptshop\miniconda3\envs\robot\lib\site-packages (from python-engineio>=4.11.0->python-socketio) (1.1.0)
Requirement already satisfied: wsproto in c:\users\fptshop\miniconda3\envs\robot\lib\site-packages (from simple-websocket>=0.10.0->python-engineio>=4.11.0->python-socketio) (1.2.0)
Requirement already satisfied: h11<1,>=0.9.0 in c:\users\fptshop\miniconda3\envs\robot\lib\site-packages (from wsproto->simple-websocket>=0.10.0->python-engineio>=4.11.0->python-socketio) (0.16.0)
# Use Python 3.11 slim to keep the image small
FROM python:3.11-slim
# Set the working directory
WORKDIR /app
# Install required system dependencies (if any, e.g. build tools)
# RUN apt-get update && apt-get install -y gcc libpq-dev && rm -rf /var/lib/apt/lists/*
# Copy requirements.txt first to leverage the Docker layer cache
COPY requirements.txt .
# Install the Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy the entire source code into the image
COPY . .
# Expose port 5000 (the server port)
EXPOSE 5000
# Command to start the server
CMD ["python", "server.py"]
"""
Backend Package
"""
"""
Fashion Q&A Agent Package
"""
from .graph import build_graph
from .models import AgentConfig, AgentState, get_config
__all__ = [
"AgentConfig",
"AgentState",
"build_graph",
"get_config",
]
"""
Fashion Q&A Agent Controller
Orchestrates the Agent's execution flow and integrates ConversationManager (Postgres memory).
Switched to LangSmith for tracing (configured via environment variables).
"""
import json
import logging
import uuid
from collections.abc import AsyncGenerator
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.runnables import RunnableConfig
from common.llm_factory import create_llm
from common.conversation_manager import get_conversation_manager, ConversationManager
from config import DEFAULT_MODEL
from .graph import build_graph
from .models import AgentState, get_config
from .tools.get_tools import get_all_tools
logger = logging.getLogger(__name__)
async def chat_controller(
query: str, user_id: str, model_name: str = DEFAULT_MODEL, conversation_id: str | None = None, images: list[str] | None = None
) -> AsyncGenerator[str, None]:
    # 1. Initialization & setup (dependency injection)
logger.info(f"▶️ Starting chat_controller with model: {model_name} for user: {user_id}")
config = get_config()
config.model_name = model_name
    # Initialize resources - the factory picks the provider automatically from the model name
llm = create_llm(model_name=model_name, streaming=True)
tools = get_all_tools()
graph = build_graph(config, llm=llm, tools=tools)
# Init ConversationManager (Singleton)
memory = get_conversation_manager()
actual_conv_id = conversation_id or str(uuid.uuid4())
# LOAD HISTORY & Prepare State
# Get history from Postgres (returns list of dicts)
history_dicts = memory.get_chat_history(user_id, limit=10)
# Convert to BaseMessage objects
history = []
for h in reversed(history_dicts): # API returns desc, we want chronological for context
if h['is_human']:
history.append(HumanMessage(content=h['message']))
else:
history.append(AIMessage(content=h['message']))
initial_state, exec_config = _prepare_execution_context(
query=query, user_id=user_id, actual_conv_id=actual_conv_id, history=history, images=images
)
final_ai_message = None
# 3. Stream Engine
try:
async for event in graph.astream(initial_state, config=exec_config, stream_mode="values"):
final_ai_message = _extract_last_ai_message(event) or final_ai_message
# Serialize messages to dicts to avoid "content='...'" string representation
if "messages" in event:
event["messages"] = [
m.dict() if hasattr(m, "dict") else m
for m in event["messages"]
]
yield f"data: {json.dumps(event, default=str, ensure_ascii=False)}\n\n"
        # 4. Post-processing (save to DB)
_handle_post_chat(
memory=memory,
user_id=user_id,
human_query=query,
ai_msg=final_ai_message,
)
yield "data: [DONE]\n\n"
except Exception as e:
logger.error(f"💥 Stream error: {e}", exc_info=True)
yield f"data: {json.dumps({'error': str(e)})}\n\n"
finally:
logger.info(f"✅ Request completed for conversation {actual_conv_id}")
def _prepare_execution_context(query: str, user_id: str, actual_conv_id: str, history: list, images: list | None):
"""Tách logic chuẩn bị state và config để giảm độ phức tạp."""
initial_state: AgentState = {
"messages": [HumanMessage(content=query)],
"history": history,
"user_id": user_id,
"images": [],
"thread_id": actual_conv_id,
"image_analysis": None,
}
run_id = str(uuid.uuid4())
# Metadata for LangSmith
metadata = {
"conversation_id": actual_conv_id,
"user_id": user_id,
"run_id": run_id
}
exec_config = RunnableConfig(
configurable={
"conversation_id": actual_conv_id,
"user_id": user_id,
"transient_images": images or [],
"run_id": run_id,
},
run_id=run_id,
metadata=metadata, # Attach metadata for LangSmith
)
return initial_state, exec_config
def _extract_last_ai_message(event: dict) -> AIMessage | None:
"""Trích xuất tin nhắn AI cuối cùng từ event stream."""
if event.get("messages"):
last_msg = event["messages"][-1]
if isinstance(last_msg, AIMessage):
return last_msg
return None
def _handle_post_chat(memory: ConversationManager, user_id: str, human_query: str, ai_msg: AIMessage | None):
"""Xử lý lưu history sau khi kết thúc stream. LangSmith tự động trace nên không cần update thủ công."""
if ai_msg:
# Save User Message
memory.save_message(user_id, human_query, True)
# Save AI Message
memory.save_message(user_id, ai_msg.content, False)
logger.info(f"💾 Saved conversation for user {user_id} to Postgres")
"""
Fashion Q&A Agent Controller
Orchestrates the Agent's execution flow and integrates ConversationManager (Postgres memory).
Using Langfuse @observe() decorator for automatic trace creation.
"""
import json
import logging
import uuid
from collections.abc import AsyncGenerator
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.runnables import RunnableConfig
from langfuse import get_client, observe, propagate_attributes
from langfuse.langchain import CallbackHandler
from common.llm_factory import create_llm
from common.conversation_manager import get_conversation_manager, ConversationManager
from .graph import build_graph
from .models import AgentState, get_config
from .tools.get_tools import get_all_tools
logger = logging.getLogger(__name__)
@observe(capture_input=False, capture_output=False)
async def chat_controller(
query: str, user_id: str, model_name: str, conversation_id: str | None = None, images: list[str] | None = None
) -> AsyncGenerator[str, None]:
    # 1. Initialization & setup (dependency injection)
logger.info(f"▶️ Starting chat_controller for user: {user_id}")
config = get_config()
config.model_name = model_name
    # Initialize resources outside the graph so they are easy to test/mock
llm = create_llm(model_name=model_name, api_key=config.openai_api_key, streaming=True)
tools = get_all_tools()
graph = build_graph(config, llm=llm, tools=tools)
# Init ConversationManager (Singleton)
memory = get_conversation_manager()
actual_conv_id = conversation_id or str(uuid.uuid4())
    # 2. Run the main logic inside the trace context
with propagate_attributes(session_id=actual_conv_id, user_id=user_id, tags=["canifa", "chatbot"]):
# LOAD HISTORY & Prepare State
# Get history from Postgres (returns list of dicts)
history_dicts = memory.get_chat_history(user_id, limit=10)
# Convert to BaseMessage objects
history = []
for h in reversed(history_dicts): # API returns desc, we want chronological for context
if h['is_human']:
history.append(HumanMessage(content=h['message']))
else:
history.append(AIMessage(content=h['message']))
initial_state, exec_config = _prepare_execution_context(
query=query, user_id=user_id, actual_conv_id=actual_conv_id, history=history, images=images
)
final_ai_message = None
# 3. Stream Engine
try:
async for event in graph.astream(initial_state, config=exec_config, stream_mode="values"):
final_ai_message = _extract_last_ai_message(event) or final_ai_message
# Serialize messages to dicts to avoid "content='...'" string representation
if "messages" in event:
event["messages"] = [
m.dict() if hasattr(m, "dict") else m
for m in event["messages"]
]
yield f"data: {json.dumps(event, default=str, ensure_ascii=False)}\n\n"
            # 4. Post-processing (save to DB & update trace)
_handle_post_chat(
memory=memory,
user_id=user_id,
human_query=query,
ai_msg=final_ai_message,
query=query,
images_count=len(images) if images else 0,
)
yield "data: [DONE]\n\n"
except Exception as e:
logger.error(f"💥 Stream error: {e}", exc_info=True)
yield f"data: {json.dumps({'error': str(e)})}\n\n"
finally:
logger.info(f"✅ Request completed for conversation {actual_conv_id}")
def _prepare_execution_context(query: str, user_id: str, actual_conv_id: str, history: list, images: list | None):
"""Tách logic chuẩn bị state và config để giảm độ phức tạp."""
initial_state: AgentState = {
"messages": [HumanMessage(content=query)],
"history": history,
"user_id": user_id,
"images": [],
"thread_id": actual_conv_id,
"image_analysis": None,
}
run_id = str(uuid.uuid4())
exec_config = RunnableConfig(
configurable={
"conversation_id": actual_conv_id,
"user_id": user_id,
"transient_images": images or [],
"run_id": run_id,
},
run_id=run_id,
callbacks=[CallbackHandler()],
)
return initial_state, exec_config
def _extract_last_ai_message(event: dict) -> AIMessage | None:
"""Trích xuất tin nhắn AI cuối cùng từ event stream."""
if event.get("messages"):
last_msg = event["messages"][-1]
if isinstance(last_msg, AIMessage):
return last_msg
return None
def _handle_post_chat(memory: ConversationManager, user_id: str, human_query: str, ai_msg: AIMessage | None, query: str, images_count: int):
"""Xử lý lưu history và update trace sau khi kết thúc stream."""
if ai_msg:
# Save User Message
memory.save_message(user_id, human_query, True)
# Save AI Message
memory.save_message(user_id, ai_msg.content, False)
logger.info(f"💾 Saved conversation for user {user_id} to Postgres")
# Update trace
try:
langfuse = get_client()
langfuse.update_current_trace(
name="canifa-chatbot-query",
input={"query": query, "images_count": images_count},
output={"response": ai_msg.content if ai_msg else None},
)
except Exception as e:
logger.warning(f"Failed to update trace: {e}")
"""
Fashion Q&A Agent Graph
LangGraph workflow with a clean architecture.
All resources (LLM, Tools) are initialized in __init__.
Uses ConversationManager (Postgres) to persist history instead of a checkpointer.
"""
import logging
from typing import Any
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import HumanMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnableConfig
from langgraph.cache.memory import InMemoryCache
from langgraph.graph import END, StateGraph
from langgraph.prebuilt import ToolNode
from langgraph.types import CachePolicy
from common.llm_factory import create_llm
from .models import AgentConfig, AgentState, get_config
from .prompt import get_system_prompt
from .tools.get_tools import get_all_tools, get_collection_tools
logger = logging.getLogger(__name__)
class CANIFAGraph:
"""
Fashion Q&A Agent Graph Manager.
    Initializes all resources in __init__; nodes access them via self.xxx.
"""
def __init__(
self,
config: AgentConfig | None = None,
llm: BaseChatModel | None = None,
tools: list | None = None,
):
self.config = config or get_config()
self._compiled_graph: Any | None = None
        # Dependency injection: prefer the llm/tools passed in
self.llm: BaseChatModel = llm or create_llm(
model_name=self.config.model_name, api_key=self.config.openai_api_key, streaming=True
)
        # Classify tools
        self.all_tools = tools or get_all_tools()
        self.collection_tools = get_collection_tools()  # Name list is still needed for routing
        # Retrieval tools are logically all tools minus the collection tools.
        # Binding all_tools to the retrieval ToolNode keeps tool usage robust,
        # so we reuse it here instead of maintaining a separate list.
        self.retrieval_tools = self.all_tools
self.llm_with_tools = self.llm.bind_tools(self.all_tools)
self.system_prompt = get_system_prompt()
self.cache = InMemoryCache()
async def _agent_node(self, state: AgentState, config: RunnableConfig) -> dict:
"""Agent node - LLM reasoning với tools và history sạch."""
messages = state.get("messages", [])
history = state.get("history", [])
prompt = ChatPromptTemplate.from_messages(
[
("system", self.system_prompt),
MessagesPlaceholder(variable_name="history"), # Long-term clean history
MessagesPlaceholder(variable_name="messages"), # Current turn technical messages
]
)
        # Handle the image hint (taken from this run's config)
transient_images = config.get("configurable", {}).get("transient_images", [])
if transient_images and messages:
# Removed image processing logic as requested
pass
# Invoke LLM
chain = prompt | self.llm_with_tools
response = await chain.ainvoke({"messages": messages, "history": history})
return {"messages": [response]}
def _should_continue(self, state: AgentState) -> str:
"""Routing: tool nodes hoặc end."""
last_message = state["messages"][-1]
if not hasattr(last_message, "tool_calls") or not last_message.tool_calls:
logger.info("🏁 Agent finished")
return "end"
tool_names = [tc["name"] for tc in last_message.tool_calls]
collection_names = [t.name for t in self.collection_tools]
if any(name in collection_names for name in tool_names):
logger.info(f"🔄 → collect_tools: {tool_names}")
return "collect_tools"
logger.info(f"🔄 → retrieve_tools: {tool_names}")
return "retrieve_tools"
def build(self) -> Any:
"""Build và compile LangGraph workflow (Không dùng Checkpointer)."""
if self._compiled_graph is not None:
return self._compiled_graph
logger.info("🔨 Building LangGraph workflow (No Checkpointer)...")
workflow = StateGraph(AgentState)
# Nodes
workflow.add_node("agent", self._agent_node)
workflow.add_node("retrieve_tools", ToolNode(self.retrieval_tools), cache_policy=CachePolicy(ttl=3600))
workflow.add_node("collect_tools", ToolNode(self.collection_tools))
# Edges
workflow.set_entry_point("agent")
workflow.add_conditional_edges(
"agent",
self._should_continue,
{"retrieve_tools": "retrieve_tools", "collect_tools": "collect_tools", "end": END},
)
workflow.add_edge("retrieve_tools", "agent")
workflow.add_edge("collect_tools", "agent")
# Compile WITHOUT checkpointer
self._compiled_graph = workflow.compile(cache=self.cache)
        # ❌ Do NOT attach the Langfuse callback to the compiled graph
        # ✅ The callback is passed in each run's runtime config instead
logger.info("✅ Graph compiled (Langfuse callback will be per-run)")
return self._compiled_graph
@property
def graph(self) -> Any:
return self.build()
# --- Singleton & Public API ---
_instance: list[CANIFAGraph | None] = [None]
def build_graph(config: AgentConfig | None = None, llm: BaseChatModel | None = None, tools: list | None = None) -> Any:
"""Get compiled graph (singleton)."""
if _instance[0] is None:
_instance[0] = CANIFAGraph(config, llm, tools)
return _instance[0].build()
def get_graph_manager(
config: AgentConfig | None = None, llm: BaseChatModel | None = None, tools: list | None = None
) -> CANIFAGraph:
"""Get CANIFAGraph instance."""
if _instance[0] is None:
_instance[0] = CANIFAGraph(config, llm, tools)
return _instance[0]
def reset_graph() -> None:
"""Reset singleton for testing."""
_instance[0] = None
# """
# Simple Memory Manager for Fashion Q&A Agent
# Minimal design: uses a single 'conversations' collection in MongoDB
# """
# logger = logging.getLogger(__name__)
class SimpleMemoryManager:
"""
    Minimal memory manager: saves/loads messages from a single collection.
    Uses conversation_id as the primary identifier.
    LOGIC TEMPORARILY COMMENTED OUT FOR DRY-RUN TESTING
"""
def __init__(self):
# self.client = get_mongo_client()
# self.db = self.client[MONGODB_DB_NAME or "ai_law"]
        # self.collection = self.db["conversations"]  # Minimal collection name
# self._indexes_created = False
pass
async def _ensure_indexes(self):
# if not self._indexes_created:
        # # Index by conversation ID and update time
# await self.collection.create_index("_id")
# await self.collection.create_index("updated_at")
# self._indexes_created = True
pass
async def save_messages(
self,
conversation_id: str,
messages: list, # List[BaseMessage],
user_id: str | None = None, # Optional[str] = None
):
"""Lưu toàn bộ danh sách tin nhắn vào cuộc hội thoại."""
# try:
# await self._ensure_indexes()
# messages_dict = [self._message_to_dict(msg) for msg in messages]
# await self.collection.update_one(
# {"_id": conversation_id},
# {
# "$set": {
# "user_id": user_id,
# "messages": messages_dict,
# "updated_at": datetime.utcnow(),
# },
# "$setOnInsert": {
# "created_at": datetime.utcnow(),
# }
# },
# upsert=True
# )
# except Exception as e:
# # logger.error(f"❌ Save memory error: {e}")
# raise
pass
async def load_messages(self, conversation_id: str, limit: int = 20) -> list: # List[BaseMessage]:
"""Load N tin nhắn gần nhất của cuộc hội thoại."""
# try:
# await self._ensure_indexes()
# doc = await self.collection.find_one({"_id": conversation_id})
# if not doc or "messages" not in doc:
# return []
# msgs_dict = doc["messages"]
        # # Only take a limited number of messages to save tokens
# if limit:
# msgs_dict = msgs_dict[-limit:]
# return [self._dict_to_message(m) for m in msgs_dict]
# except Exception as e:
# # logger.error(f"❌ Load memory error: {e}")
# return []
return []
# def _message_to_dict(self, msg: BaseMessage) -> dict:
# return {
# "type": msg.__class__.__name__,
# "content": msg.content,
# "timestamp": datetime.utcnow().isoformat(),
# }
# def _dict_to_message(self, msg_dict: dict) -> BaseMessage:
# m_type = msg_dict.get("type", "HumanMessage")
# content = msg_dict.get("content", "")
# if m_type == "AIMessage": return AIMessage(content=content)
# if m_type == "SystemMessage": return SystemMessage(content=content)
# return HumanMessage(content=content)
# Singleton
_memory_manager = None
def get_memory_manager():
global _memory_manager
if _memory_manager is None:
_memory_manager = SimpleMemoryManager()
return _memory_manager
from typing import Annotated, Any, TypedDict
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
from pydantic import BaseModel
import config as global_config
class QueryRequest(BaseModel):
"""API Request model cho Fashion Q&A Chat"""
query: str
history: list[BaseMessage] | None = None
model_name: str = global_config.DEFAULT_MODEL
user_id: str | None = None
images: list[str] | None = None
image_analysis: dict[str, Any] | None = None
conversation_id: str | None = None
class AgentState(TypedDict):
"""Trạng thái của Agent trong LangGraph."""
messages: Annotated[list[BaseMessage], add_messages]
    history: list[BaseMessage]  # Clean conversation history (Human + AI)
user_id: str | None
images: list[str] | None
image_analysis: dict[str, Any] | None
thread_id: str | None
class AgentConfig:
"""Class chứa cấu hình runtime cho Agent."""
def __init__(self, **kwargs):
self.model_name = kwargs.get("model_name") or global_config.DEFAULT_MODEL
self.openai_api_key = kwargs.get("openai_api_key")
self.google_api_key = kwargs.get("google_api_key")
self.groq_api_key = kwargs.get("groq_api_key")
self.supabase_url = kwargs.get("supabase_url")
self.supabase_key = kwargs.get("supabase_key")
self.langfuse_public_key = kwargs.get("langfuse_public_key")
self.langfuse_secret_key = kwargs.get("langfuse_secret_key")
self.langfuse_base_url = kwargs.get("langfuse_base_url")
def get_config() -> AgentConfig:
"""Khởi tạo cấu hình Agent từ các biến môi trường."""
return AgentConfig(
model_name=global_config.DEFAULT_MODEL,
openai_api_key=global_config.OPENAI_API_KEY,
google_api_key=global_config.GOOGLE_API_KEY,
groq_api_key=global_config.GROQ_API_KEY,
supabase_url=global_config.AI_SUPABASE_URL,
supabase_key=global_config.AI_SUPABASE_KEY,
langfuse_public_key=global_config.LANGFUSE_PUBLIC_KEY,
langfuse_secret_key=global_config.LANGFUSE_SECRET_KEY,
langfuse_base_url=global_config.LANGFUSE_BASE_URL,
)
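get_config() reads everything from a global config module. A minimal sketch of the env-backed config.py it implies; the attribute names come from the usage above, while the default model value is purely a placeholder:
import os
# Hypothetical config.py; the names match what get_config() above reads.
DEFAULT_MODEL = os.getenv("DEFAULT_MODEL", "gpt-4o-mini")  # placeholder default
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
AI_SUPABASE_URL = os.getenv("AI_SUPABASE_URL")
AI_SUPABASE_KEY = os.getenv("AI_SUPABASE_KEY")
LANGFUSE_PUBLIC_KEY = os.getenv("LANGFUSE_PUBLIC_KEY")
LANGFUSE_SECRET_KEY = os.getenv("LANGFUSE_SECRET_KEY")
LANGFUSE_BASE_URL = os.getenv("LANGFUSE_BASE_URL")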
"""
Agent Nodes Package
"""
from .agent import agent_node
__all__ = ["agent_node"]
"""
Tools Package
Exports the tools and factory functions
"""
from .data_retrieval_tool import data_retrieval_tool
from .get_tools import get_all_tools
__all__ = ["data_retrieval_tool", "get_all_tools"]
"""
Tool for collecting customer information (name, phone number, email).
Used to push the data to a CRM or other customer-storage system.
"""
import json
import logging
from langchain_core.tools import tool
logger = logging.getLogger(__name__)
@tool
async def collect_customer_info(name: str, phone: str, email: str | None = None) -> str:
"""
    Use this tool to record customer information when the customer asks for
    deeper consultation, promotions, or wants to place an order.
    Args:
        name: The customer's name
        phone: The customer's phone number
        email: The customer's email (optional)
"""
try:
print(f"\n[TOOL] --- 📝 Thu thập thông tin khách hàng: {name} - {phone} ---")
logger.info(f"📝 Collecting customer info: {name}, {phone}, {email}")
        # Simulate pushing the data out (CRM/Sheet)
        # In production, you would call an API here
db_record = {
"customer_name": name,
"phone_number": phone,
"email_address": email,
"status": "pending_consultation",
}
        # Return a success result
return json.dumps(
{
"status": "success",
"message": (
f"Cảm ơn anh/chị {name}. CiCi đã ghi nhận thông tin và sẽ có nhân viên "
f"liên hệ tư vấn qua số điện thoại {phone} sớm nhất ạ!"
),
"data_captured": db_record,
},
ensure_ascii=False,
)
except Exception as e:
logger.error(f"❌ Lỗi khi thu thập thông tin: {e}")
return json.dumps(
{
"status": "error",
"message": f"Xin lỗi, CiCi gặp sự cố khi lưu thông tin. Anh/chị vui lòng thử lại sau ạ. Lỗi: {e!s}",
},
ensure_ascii=False,
)
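A quick standalone usage sketch for this tool (the sample values are hypothetical); tools built with @tool expose ainvoke, which takes the arguments as a dict:
import asyncio
from agent.tools.customer_info_tool import collect_customer_info
async def demo():
    # Hypothetical sample data; in production the LLM supplies these via a tool call.
    result = await collect_customer_info.ainvoke(
        {"name": "Nguyen Van A", "phone": "0901234567", "email": "a@example.com"}
    )
    print(result)  # JSON string with status / message / data_captured
if __name__ == "__main__":
    asyncio.run(demo())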
"""
Tools Factory
Returns the retrieval and collection tools for the Agent.
"""
from langchain_core.tools import Tool
from .customer_info_tool import collect_customer_info
from .data_retrieval_tool import data_retrieval_tool
def get_retrieval_tools() -> list[Tool]:
"""Các tool chỉ dùng để đọc/truy vấn dữ liệu (Có thể cache)"""
return [data_retrieval_tool]
def get_collection_tools() -> list[Tool]:
"""Các tool dùng để ghi/thu thập dữ liệu (KHÔNG cache)"""
return [collect_customer_info]
def get_all_tools() -> list[Tool]:
"""Return toàn bộ list tools cho Agent"""
return get_retrieval_tools() + get_collection_tools()
"""
Fashion Q&A Agent Router
FastAPI endpoints for the Fashion Q&A Agent service.
The router only defines the API; the logic lives in the controller.
"""
import json
import logging
from collections.abc import AsyncGenerator
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import StreamingResponse
from agent.controller import chat_controller
from agent.models import QueryRequest
from config import DEFAULT_MODEL
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/chat", summary="Fashion Q&A Chat (Non-streaming)")
async def fashion_qa_chat(req: QueryRequest, request: Request):
"""
    Non-streaming chat endpoint - returns the complete JSON response in one shot.
"""
    # Extract user_id from the request (auth middleware)
user_id = getattr(request.state, "user_id", None) or req.user_id or "default_user"
logger.info(f"📥 [Incoming Query - NonStream] User: {user_id} | Query: {req.query}")
try:
        # Call the controller to handle the logic and get the stream generator
        # Note: chat_controller is an async generator; calling it returns the
        # generator directly, and the @observe() decorator wraps it transparently
generator: AsyncGenerator[str, None] = chat_controller(
query=req.query,
user_id=user_id,
model_name=DEFAULT_MODEL,
conversation_id=req.conversation_id,
images=req.images,
)
        # Collect all events from the generator
final_response = None
async for chunk in generator:
# Parse SSE data format
if chunk.startswith("data: "):
data_str = chunk[6:].strip()
if data_str != "[DONE]":
final_response = json.loads(data_str)
        # Return the final response
if final_response and "messages" in final_response:
last_message = final_response["messages"][-1]
response_text = last_message.get("content", "") if isinstance(last_message, dict) else str(last_message)
logger.info(f"📤 [Outgoing Response - NonStream] User: {user_id} | Response: {response_text}")
return {
"status": "success",
"response": response_text,
"conversation_id": req.conversation_id,
}
return {"status": "error", "message": "No response generated"}
except Exception as e:
logger.error(f"Error in fashion_qa_chat: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
# ====================== FASHION Q&A CHAT API ======================
@router.post("/stream/chat", summary="Fashion Q&A Chat with Streaming Response")
async def fashion_qa_chat_stream(req: QueryRequest, request: Request):
"""
    Streaming chat endpoint for the Fashion Agent (Server-Sent Events).
"""
    # Extract user_id from the request (auth middleware)
user_id = getattr(request.state, "user_id", None) or req.user_id or "default_user"
logger.info(f"📥 [Incoming Query] User: {user_id} | Query: {req.query}")
try:
        # Call the controller to handle the logic and get the stream generator
        # Note: chat_controller is an async generator; calling it returns the
        # generator directly, and the @observe() decorator wraps it transparently
generator: AsyncGenerator[str, None] = chat_controller(
query=req.query,
user_id=user_id,
model_name=DEFAULT_MODEL,
conversation_id=req.conversation_id,
images=req.images,
)
async def logging_generator(gen: AsyncGenerator[str, None]):
full_response_log = ""
first_chunk = True
try:
async for chunk in gen:
if first_chunk:
logger.info("🚀 [Stream Started] First chunk received")
first_chunk = False
full_response_log += chunk
yield chunk
except Exception as e:
logger.error(f"❌ [Stream Error] {e}")
yield f"data: {json.dumps({'error': str(e)})}\n\n"
logger.info(f"📤 [Outgoing Response Stream Finished] Total Chunks Length: {len(full_response_log)}")
return StreamingResponse(
logging_generator(generator),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
except Exception as e:
logger.error(f"Error in fashion_qa_chat: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
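A minimal client sketch for consuming the SSE stream, assuming the server listens on port 5000 (per the Dockerfile) and the router is mounted at the root; base URL, mount path, and payload values are assumptions:
import asyncio
import json
import httpx
async def stream_chat(query: str) -> None:
    url = "http://localhost:5000/stream/chat"  # assumed base URL and mount path
    payload = {"query": query, "user_id": "demo_user"}  # hypothetical values
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", url, json=payload) as resp:
            async for line in resp.aiter_lines():
                if not line.startswith("data: "):
                    continue  # skip blank separator lines between events
                data = line[len("data: "):]
                if data == "[DONE]":  # the controller's end-of-stream marker
                    break
                event = json.loads(data)
                # Each event is a serialized AgentState snapshot; the last
                # message carries the newest model output.
                if event.get("messages"):
                    print(event["messages"][-1].get("content", ""))
if __name__ == "__main__":
    asyncio.run(stream_chat("Gợi ý áo khoác cho mùa đông"))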
"""Debug helper: list which pool-related attributes AsyncPostgresSaver exposes."""
import asyncio
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from psycopg_pool import AsyncConnectionPool
async def test():
    # An empty conninfo is fine here: the pool is never opened (open=False);
    # we only need a pool object to construct the saver.
    pool = AsyncConnectionPool("", open=False)
    saver = AsyncPostgresSaver(pool)
    print(f"Attributes with 'pool': {[a for a in dir(saver) if 'pool' in a.lower()]}")
    await pool.close()
if __name__ == "__main__":
    asyncio.run(test())
cdcdccdc @ f5deb28b
chatbot-rsa @ d6b45f42