Commit 288fcc98 authored by Vũ Hoàng Anh

refactor: all codebase

parent c9a437ef
# Makefile for CANIFA Chatbot
.PHONY: up down restart logs build ps clean setup-nginx monitor-up monitor-down
up:
docker-compose up -d --build
down:
docker-compose down
restart:
docker-compose restart backend
logs:
docker-compose logs -f backend
ps:
docker-compose ps
build:
docker-compose build
clean:
docker-compose down -v --rmi all --remove-orphans
setup-nginx:
@echo "🚀 Đang cấu hình Nginx..."
sudo cp nginx.conf /etc/nginx/sites-available/chatbot
sudo ln -sf /etc/nginx/sites-available/chatbot /etc/nginx/sites-enabled/
sudo nginx -t && sudo systemctl restart nginx
@echo "✅ Nginx đã được cấu hình và restart!"
"""
Fashion Q&A Agent Controller
Orchestrates the Agent's execution flow and integrates ConversationManager (Postgres memory).
Switched to LangSmith for tracing (configured via environment variables).
"""
import json
import logging
import uuid
from collections.abc import AsyncGenerator
from langchain_core.messages import AIMessage, HumanMessage
from fastapi import BackgroundTasks
from langchain_core.messages import AIMessage, HumanMessage, ToolMessage
from langchain_core.runnables import RunnableConfig
from common.conversation_manager import ConversationManager, get_conversation_manager
from common.llm_factory import create_llm
from common.conversation_manager import get_conversation_manager, ConversationManager
from config import DEFAULT_MODEL
from .graph import build_graph
......@@ -22,96 +21,117 @@ from .tools.get_tools import get_all_tools
logger = logging.getLogger(__name__)
async def chat_controller(
query: str, user_id: str, model_name: str = DEFAULT_MODEL, conversation_id: str | None = None, images: list[str] | None = None
) -> AsyncGenerator[str, None]:
# 1. Initialization & preparation (dependency injection)
query: str,
user_id: str,
background_tasks: BackgroundTasks,
model_name: str = DEFAULT_MODEL,
images: list[str] | None = None,
) -> dict:
"""
Controller main logic for non-streaming chat requests.
1. Initialize resources (LLM, tools, graph, conversation manager)
"""
logger.info(f"▶️ Starting chat_controller with model: {model_name} for user: {user_id}")
config = get_config()
config.model_name = model_name
# Initialize resources - the factory picks the provider automatically from the model name
llm = create_llm(model_name=model_name, streaming=True)
# Enable JSON mode to ensure structured output
llm = create_llm(model_name=model_name, streaming=False, json_mode=True)
tools = get_all_tools()
graph = build_graph(config, llm=llm, tools=tools)
# Init ConversationManager (Singleton)
memory = get_conversation_manager()
actual_conv_id = conversation_id or str(uuid.uuid4())
memory = await get_conversation_manager()
# LOAD HISTORY & Prepare State
# Get history from Postgres (returns list of dicts)
history_dicts = memory.get_chat_history(user_id, limit=10)
history_dicts = await memory.get_chat_history(user_id, limit=20)
# Convert to BaseMessage objects
history = []
for h in reversed(history_dicts): # API returns desc, we want chronological for context
if h['is_human']:
history.append(HumanMessage(content=h['message']))
for h in reversed(history_dicts):
if h["is_human"]:
history.append(HumanMessage(content=h["message"]))
else:
history.append(AIMessage(content=h['message']))
current_human_msg = HumanMessage(content=query)
history.append(AIMessage(content=h["message"]))
initial_state, exec_config = _prepare_execution_context(
query=query, user_id=user_id, actual_conv_id=actual_conv_id, history=history, images=images
query=query, user_id=user_id, history=history, images=images
)
final_ai_message = None
# 3. Stream Engine
try:
async for event in graph.astream(initial_state, config=exec_config, stream_mode="values"):
final_ai_message = _extract_last_ai_message(event) or final_ai_message
result = await graph.ainvoke(initial_state, config=exec_config)
# logger.info(f"Answer result from ai: {result}")
# Serialize messages to dicts to avoid "content='...'" string representation
if "messages" in event:
event["messages"] = [
m.dict() if hasattr(m, "dict") else m
for m in event["messages"]
]
# take ai message from result
final_ai_message = result.get("ai_response")
yield f"data: {json.dumps(event, default=str, ensure_ascii=False)}\n\n"
# Extract product IDs from tool messages
product_ids = _extract_product_ids(result.get("messages", []))
# 4. Post-processing (save to DB)
_handle_post_chat(
# Save to DB in background after response is sent
background_tasks.add_task(
_handle_post_chat_async,
memory=memory,
user_id=user_id,
human_query=query,
ai_msg=final_ai_message,
)
yield "data: [DONE]\n\n"
logger.info(f"✅ Request completed for user {user_id} with {len(product_ids)} products")
return {
"ai_response": final_ai_message.content if final_ai_message else "",
"product_ids": product_ids,
}
except Exception as e:
logger.error(f"💥 Stream error: {e}", exc_info=True)
yield f"data: {json.dumps({'error': str(e)})}\n\n"
finally:
logger.info(f"✅ Request completed for conversation {actual_conv_id}")
logger.error(f"💥 Chat error: {e}", exc_info=True)
raise
def _extract_product_ids(messages: list) -> list[str]:
"""
Extract product internal_ref_code from tool messages (data_retrieval_tool results).
Returns list of unique product IDs.
"""
product_ids = []
for msg in messages:
if isinstance(msg, ToolMessage):
try:
# Tool result is JSON string
tool_result = json.loads(msg.content)
def _prepare_execution_context(query: str, user_id: str, actual_conv_id: str, history: list, images: list | None):
"""Tách logic chuẩn bị state và config để giảm độ phức tạp."""
# Check if tool returned products
if tool_result.get("status") == "success" and "products" in tool_result:
for product in tool_result["products"]:
product_id = product.get("internal_ref_code")
if product_id and product_id not in product_ids:
product_ids.append(product_id)
except (json.JSONDecodeError, KeyError, TypeError) as e:
logger.debug(f"Could not parse tool message for product IDs: {e}")
continue
return product_ids
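For reference, a minimal sketch (not from the repo) of the tool-result shape this parser expects; the field names `status`, `products`, and `internal_ref_code` mirror the function above, while the sample values are illustrative only:

```python
# Minimal sketch of the tool-result JSON that _extract_product_ids() parses.
# Field names mirror the parser above; sample values are illustrative only.
import json
from langchain_core.messages import ToolMessage

sample_result = {
    "status": "success",
    "products": [
        {"internal_ref_code": "8TS24W001", "name": "Áo thun nam"},
        {"internal_ref_code": "8TS24W001"},  # duplicate -> de-duplicated
        {"internal_ref_code": "1DS24C015"},
    ],
}
messages = [ToolMessage(content=json.dumps(sample_result), tool_call_id="call_1")]
assert _extract_product_ids(messages) == ["8TS24W001", "1DS24C015"]
```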
def _prepare_execution_context(query: str, user_id: str, history: list, images: list | None):
"""Prepare initial state and execution config for the graph run."""
initial_state: AgentState = {
"user_query": HumanMessage(content=query),
"messages": [HumanMessage(content=query)],
"history": history,
"user_id": user_id,
"images": [],
"thread_id": actual_conv_id,
"image_analysis": None,
"images_embedding": [],
"ai_response": None,
}
run_id = str(uuid.uuid4())
# Metadata for LangSmith
metadata = {
"conversation_id": actual_conv_id,
"user_id": user_id,
"run_id": run_id
}
metadata = {"user_id": user_id, "run_id": run_id}
exec_config = RunnableConfig(
configurable={
"conversation_id": actual_conv_id,
"user_id": user_id,
"transient_images": images or [],
"run_id": run_id,
......@@ -122,21 +142,13 @@ def _prepare_execution_context(query: str, user_id: str, actual_conv_id: str, hi
return initial_state, exec_config
def _extract_last_ai_message(event: dict) -> AIMessage | None:
"""Trích xuất tin nhắn AI cuối cùng từ event stream."""
if event.get("messages"):
last_msg = event["messages"][-1]
if isinstance(last_msg, AIMessage):
return last_msg
return None
def _handle_post_chat(memory: ConversationManager, user_id: str, human_query: str, ai_msg: AIMessage | None):
"""Xử lý lưu history sau khi kết thúc stream. LangSmith tự động trace nên không cần update thủ công."""
async def _handle_post_chat_async(
memory: ConversationManager, user_id: str, human_query: str, ai_msg: AIMessage | None
):
"""Save chat history in background task after response is sent."""
if ai_msg:
# Save User Message
memory.save_message(user_id, human_query, True)
# Save AI Message
memory.save_message(user_id, ai_msg.content, False)
logger.info(f"💾 Saved conversation for user {user_id} to Postgres")
try:
await memory.save_conversation_turn(user_id, human_query, ai_msg.content)
logger.debug(f"Saved conversation for user {user_id}")
except Exception as e:
logger.error(f"Failed to save conversation for user {user_id}: {e}", exc_info=True)
"""
Fashion Q&A Agent Controller
Orchestrates the Agent's execution flow and integrates ConversationManager (Postgres memory).
Using Langfuse @observe() decorator for automatic trace creation.
"""
import json
import logging
import uuid
from collections.abc import AsyncGenerator
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.runnables import RunnableConfig
from langfuse import get_client, observe, propagate_attributes
from langfuse.langchain import CallbackHandler
from common.llm_factory import create_llm
from common.conversation_manager import get_conversation_manager, ConversationManager
from .graph import build_graph
from .models import AgentState, get_config
from .tools.get_tools import get_all_tools
logger = logging.getLogger(__name__)
@observe(capture_input=False, capture_output=False)
async def chat_controller(
query: str, user_id: str, model_name: str, conversation_id: str | None = None, images: list[str] | None = None
) -> AsyncGenerator[str, None]:
# 1. Initialization & preparation (dependency injection)
logger.info(f"▶️ Starting chat_controller for user: {user_id}")
config = get_config()
config.model_name = model_name
# Initialize resources externally for easier testing/mocking
llm = create_llm(model_name=model_name, api_key=config.openai_api_key, streaming=True)
tools = get_all_tools()
graph = build_graph(config, llm=llm, tools=tools)
# Init ConversationManager (Singleton)
memory = get_conversation_manager()
actual_conv_id = conversation_id or str(uuid.uuid4())
# 2. Run the main logic inside the trace context
with propagate_attributes(session_id=actual_conv_id, user_id=user_id, tags=["canifa", "chatbot"]):
# LOAD HISTORY & Prepare State
# Get history from Postgres (returns list of dicts)
history_dicts = memory.get_chat_history(user_id, limit=10)
# Convert to BaseMessage objects
history = []
for h in reversed(history_dicts): # API returns desc, we want chronological for context
if h['is_human']:
history.append(HumanMessage(content=h['message']))
else:
history.append(AIMessage(content=h['message']))
current_human_msg = HumanMessage(content=query)
initial_state, exec_config = _prepare_execution_context(
query=query, user_id=user_id, actual_conv_id=actual_conv_id, history=history, images=images
)
final_ai_message = None
# 3. Stream Engine
try:
async for event in graph.astream(initial_state, config=exec_config, stream_mode="values"):
final_ai_message = _extract_last_ai_message(event) or final_ai_message
# Serialize messages to dicts to avoid "content='...'" string representation
if "messages" in event:
event["messages"] = [
m.dict() if hasattr(m, "dict") else m
for m in event["messages"]
]
yield f"data: {json.dumps(event, default=str, ensure_ascii=False)}\n\n"
# 4. Post-processing (save to DB & trace)
_handle_post_chat(
memory=memory,
user_id=user_id,
human_query=query,
ai_msg=final_ai_message,
query=query,
images_count=len(images) if images else 0,
)
yield "data: [DONE]\n\n"
except Exception as e:
logger.error(f"💥 Stream error: {e}", exc_info=True)
yield f"data: {json.dumps({'error': str(e)})}\n\n"
finally:
logger.info(f"✅ Request completed for conversation {actual_conv_id}")
def _prepare_execution_context(query: str, user_id: str, actual_conv_id: str, history: list, images: list | None):
"""Tách logic chuẩn bị state và config để giảm độ phức tạp."""
initial_state: AgentState = {
"messages": [HumanMessage(content=query)],
"history": history,
"user_id": user_id,
"images": [],
"thread_id": actual_conv_id,
"image_analysis": None,
}
run_id = str(uuid.uuid4())
exec_config = RunnableConfig(
configurable={
"conversation_id": actual_conv_id,
"user_id": user_id,
"transient_images": images or [],
"run_id": run_id,
},
run_id=run_id,
callbacks=[CallbackHandler()],
)
return initial_state, exec_config
def _extract_last_ai_message(event: dict) -> AIMessage | None:
"""Trích xuất tin nhắn AI cuối cùng từ event stream."""
if event.get("messages"):
last_msg = event["messages"][-1]
if isinstance(last_msg, AIMessage):
return last_msg
return None
def _handle_post_chat(memory: ConversationManager, user_id: str, human_query: str, ai_msg: AIMessage | None, query: str, images_count: int):
"""Xử lý lưu history và update trace sau khi kết thúc stream."""
if ai_msg:
# Save User Message
memory.save_message(user_id, human_query, True)
# Save AI Message
memory.save_message(user_id, ai_msg.content, False)
logger.info(f"💾 Saved conversation for user {user_id} to Postgres")
# Update trace
try:
langfuse = get_client()
langfuse.update_current_trace(
name="canifa-chatbot-query",
input={"query": query, "images_count": images_count},
output={"response": ai_msg.content if ai_msg else None},
)
except Exception as e:
logger.warning(f"Failed to update trace: {e}")
......@@ -9,7 +9,6 @@ import logging
from typing import Any
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import HumanMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnableConfig
from langgraph.cache.memory import InMemoryCache
......@@ -29,7 +28,6 @@ logger = logging.getLogger(__name__)
class CANIFAGraph:
"""
Fashion Q&A Agent Graph Manager.
Initializes all resources in __init__; the nodes use them via self.xxx.
"""
def __init__(
......@@ -40,55 +38,43 @@ class CANIFAGraph:
):
self.config = config or get_config()
self._compiled_graph: Any | None = None
# Dependency Injection: prefer the llm/tools passed in
self.llm: BaseChatModel = llm or create_llm(
model_name=self.config.model_name, api_key=self.config.openai_api_key, streaming=True
)
# Categorize tools
self.all_tools = tools or get_all_tools()
self.collection_tools = get_collection_tools()  # still fetch the name list for routing
# Retrieval tools: for simplicity and robust tool usage, bind all tools to the
# retrieval ToolNode; split out a dedicated retrieval list only if distinct behavior is needed.
self.retrieval_tools = self.all_tools
self.llm_with_tools = self.llm.bind_tools(self.all_tools)
self.system_prompt = get_system_prompt()
self.prompt_template = ChatPromptTemplate.from_messages(
[
("system", self.system_prompt),
MessagesPlaceholder(variable_name="history"),
MessagesPlaceholder(variable_name="user_query"),
MessagesPlaceholder(variable_name="messages"),
]
)
self.chain = self.prompt_template | self.llm_with_tools
self.cache = InMemoryCache()
async def _agent_node(self, state: AgentState, config: RunnableConfig) -> dict:
"""Agent node - LLM reasoning với tools và history sạch."""
"""Agent node - Chỉ việc đổ dữ liệu riêng vào khuôn đã có sẵn."""
messages = state.get("messages", [])
history = state.get("history", [])
user_query = state.get("user_query")
prompt = ChatPromptTemplate.from_messages(
[
("system", self.system_prompt),
MessagesPlaceholder(variable_name="history"), # Long-term clean history
MessagesPlaceholder(variable_name="messages"), # Current turn technical messages
]
)
# 2. Handle the image hint (taken from this run's config)
transient_images = config.get("configurable", {}).get("transient_images", [])
if transient_images and messages:
# Removed image processing logic as requested
pass
# Invoke chain with user_query, history, and messages
response = await self.chain.ainvoke({
"user_query": [user_query] if user_query else [],
"history": history,
"messages": messages
})
return {"messages": [response], "ai_response": response}
# Invoke LLM
chain = prompt | self.llm_with_tools
response = await chain.ainvoke({"messages": messages, "history": history})
return {"messages": [response]}
def _should_continue(self, state: AgentState) -> str:
"""Routing: tool nodes hoặc end."""
......@@ -109,12 +95,9 @@ class CANIFAGraph:
return "retrieve_tools"
def build(self) -> Any:
"""Build và compile LangGraph workflow (Không dùng Checkpointer)."""
"""Build và compile LangGraph workflow."""
if self._compiled_graph is not None:
return self._compiled_graph
logger.info("🔨 Building LangGraph workflow (No Checkpointer)...")
workflow = StateGraph(AgentState)
# Nodes
......@@ -132,11 +115,7 @@ class CANIFAGraph:
workflow.add_edge("retrieve_tools", "agent")
workflow.add_edge("collect_tools", "agent")
# Compile WITHOUT checkpointer
self._compiled_graph = workflow.compile(cache=self.cache)
# ❌ Do NOT attach the Langfuse callback to the compiled graph
# ✅ The callback is passed into each run's runtime config instead
self._compiled_graph = workflow.compile(cache=self.cache) # No Checkpointer
logger.info("✅ Graph compiled (Langfuse callback will be per-run)")
return self._compiled_graph
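A minimal sketch of the per-run pattern the comments above describe: the graph is compiled once, while run metadata and any tracing callback are supplied in each invocation's `RunnableConfig` (state keys and config names mirror the controller; the callback handler type is deployment-specific):

```python
# Per-run config sketch: the compiled graph is reused, while run_id, user_id and
# tracing callbacks are supplied per invocation (handler type is deployment-specific).
import uuid
from langchain_core.messages import HumanMessage
from langchain_core.runnables import RunnableConfig

async def run_once(graph, user_id: str, query: str) -> dict:
    run_id = str(uuid.uuid4())
    state = {"messages": [HumanMessage(content=query)], "history": [], "user_id": user_id}
    config = RunnableConfig(
        configurable={"user_id": user_id, "run_id": run_id},
        run_id=run_id,
        # callbacks=[CallbackHandler()],  # attach the tracer per run, not at compile time
    )
    return await graph.ainvoke(state, config=config)
```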
......@@ -149,7 +128,6 @@ class CANIFAGraph:
# --- Singleton & Public API ---
_instance: list[CANIFAGraph | None] = [None]
def build_graph(config: AgentConfig | None = None, llm: BaseChatModel | None = None, tools: list | None = None) -> Any:
"""Get compiled graph (singleton)."""
if _instance[0] is None:
......
# """
# Simple Memory Manager for Fashion Q&A Agent
# Minimal design: uses a single 'conversations' collection in MongoDB
# """
# logger = logging.getLogger(__name__)
class SimpleMemoryManager:
"""
Minimal memory manager: save/load messages from a single collection.
Uses conversation_id as the primary identifier.
LOGIC TEMPORARILY COMMENTED OUT FOR DRY-RUN TESTING
"""
def __init__(self):
# self.client = get_mongo_client()
# self.db = self.client[MONGODB_DB_NAME or "ai_law"]
# self.collection = self.db["conversations"]  # minimal collection name
# self._indexes_created = False
pass
async def _ensure_indexes(self):
# if not self._indexes_created:
# # Index by conversation ID and update time
# await self.collection.create_index("_id")
# await self.collection.create_index("updated_at")
# self._indexes_created = True
pass
async def save_messages(
self,
conversation_id: str,
messages: list, # List[BaseMessage],
user_id: str | None = None, # Optional[str] = None
):
"""Lưu toàn bộ danh sách tin nhắn vào cuộc hội thoại."""
# try:
# await self._ensure_indexes()
# messages_dict = [self._message_to_dict(msg) for msg in messages]
# await self.collection.update_one(
# {"_id": conversation_id},
# {
# "$set": {
# "user_id": user_id,
# "messages": messages_dict,
# "updated_at": datetime.utcnow(),
# },
# "$setOnInsert": {
# "created_at": datetime.utcnow(),
# }
# },
# upsert=True
# )
# except Exception as e:
# # logger.error(f"❌ Save memory error: {e}")
# raise
pass
async def load_messages(self, conversation_id: str, limit: int = 20) -> list: # List[BaseMessage]:
"""Load N tin nhắn gần nhất của cuộc hội thoại."""
# try:
# await self._ensure_indexes()
# doc = await self.collection.find_one({"_id": conversation_id})
# if not doc or "messages" not in doc:
# return []
# msgs_dict = doc["messages"]
# # Only keep a limited number of messages to save tokens
# if limit:
# msgs_dict = msgs_dict[-limit:]
# return [self._dict_to_message(m) for m in msgs_dict]
# except Exception as e:
# # logger.error(f"❌ Load memory error: {e}")
# return []
return []
# def _message_to_dict(self, msg: BaseMessage) -> dict:
# return {
# "type": msg.__class__.__name__,
# "content": msg.content,
# "timestamp": datetime.utcnow().isoformat(),
# }
# def _dict_to_message(self, msg_dict: dict) -> BaseMessage:
# m_type = msg_dict.get("type", "HumanMessage")
# content = msg_dict.get("content", "")
# if m_type == "AIMessage": return AIMessage(content=content)
# if m_type == "SystemMessage": return SystemMessage(content=content)
# return HumanMessage(content=content)
# Singleton
_memory_manager = None
def get_memory_manager():
global _memory_manager
if _memory_manager is None:
_memory_manager = SimpleMemoryManager()
return _memory_manager
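A short usage sketch of the stubbed manager above; while the MongoDB logic is commented out, saving is a no-op and loading returns an empty list:

```python
# Usage sketch for the (currently stubbed) SimpleMemoryManager singleton above.
import asyncio
from langchain_core.messages import AIMessage, HumanMessage

async def demo_memory() -> None:
    manager = get_memory_manager()
    await manager.save_messages("conv-1", [HumanMessage(content="hi"), AIMessage(content="hello")])
    print(await manager.load_messages("conv-1", limit=20))  # [] while the logic is stubbed out

asyncio.run(demo_memory())
```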
......@@ -10,24 +10,21 @@ import config as global_config
class QueryRequest(BaseModel):
"""API Request model cho Fashion Q&A Chat"""
query: str
history: list[BaseMessage] | None = None
model_name: str = global_config.DEFAULT_MODEL
user_id: str | None = None
user_query: str
images: list[str] | None = None
image_analysis: dict[str, Any] | None = None
conversation_id: str | None = None
class AgentState(TypedDict):
"""Trạng thái của Agent trong LangGraph."""
messages: Annotated[list[BaseMessage], add_messages]
history: list[BaseMessage]  # Clean conversation history (Human + AI)
user_query: BaseMessage
history: list[BaseMessage]
user_id: str | None
images: list[str] | None
image_analysis: dict[str, Any] | None
thread_id: str | None
ai_response: BaseMessage | None
images_embedding: list[str] | None
messages: Annotated[list[BaseMessage], add_messages]
class AgentConfig:
......
......@@ -13,6 +13,8 @@ from pydantic import BaseModel, Field
from agent.tools.product_search_helpers import build_starrocks_query
from common.starrocks_connection import StarRocksConnection
from langsmith import traceable
logger = logging.getLogger(__name__)
......@@ -52,8 +54,6 @@ class SearchParams(BaseModel):
action: str = Field("search", description="Action: 'search' (text search) or 'visual_search' (image analysis)")
from langsmith import traceable
@tool(args_schema=SearchParams)
@traceable(run_type="tool", name="data_retrieval_tool")
async def data_retrieval_tool(
......@@ -76,29 +76,62 @@ async def data_retrieval_tool(
price_max: float | None = None,
) -> str:
"""
Search CANIFA's product database using semantic search, keywords, or attribute filters.
How it works (Hybrid Search):
- If 'query' is given: the system creates a vector embedding and searches by semantic similarity.
- If 'keywords' or other attributes are given: the system builds SQL WHERE clauses to filter results exactly.
- Combine both for the best results.
Usage examples:
1. Search by general intent:
User: "Tìm cho mình một bộ đồ đi biển mát mẻ"
Tool call: data_retrieval_tool(query="bộ đồ đi biển mát mẻ", gender_by_product="Female")
2. Exact search by product type and price:
User: "Áo polo nam dưới 400k"
Tool call: data_retrieval_tool(keywords="áo polo", gender_by_product="Male", price_max=400000)
3. Search by a specific product code:
User: "Check sản phẩm 8TS24W001"
Tool call: data_retrieval_tool(internal_ref_code="8TS24W001")
4. Combined deep search:
User: "Áo khoác len mùa đông cho bé trai từ 200k đến 500k"
Tool call: data_retrieval_tool(query="áo khoác len ấm áp", material_group="Len", age_by_product="Kids", price_min=200000, price_max=500000)
CANIFA product search - clearly distinguishes Semantic Search from Metadata Filters.
⚠️ IMPORTANT - WHEN TO USE WHAT:
1️⃣ USE 'query' (Semantic Search - Vector Embedding):
✅ When the user asks about PURPOSE, CONTEXT, or LIFESTYLE
✅ Abstract questions with no explicit product keyword
✅ Examples:
- "Tìm đồ đi biển mát mẻ" → query="đồ đi biển mát mẻ"
- "Quần áo cho buổi hẹn hò" → query="trang phục hẹn hò lịch sự"
- "Đồ mặc dự tiệc sang trọng" → query="trang phục dự tiệc sang trọng"
- "Outfit cho mùa đông ấm áp" → query="trang phục mùa đông ấm áp"
2️⃣ USE 'keywords' + METADATA FILTERS (Exact Match):
✅ When the user asks about SPECIFIC PRODUCT ATTRIBUTES
✅ There is an explicit PRODUCT NAME (áo polo, quần jean, váy liền, ...)
✅ There is a PRICE, COLOR, SIZE, or PRODUCT CODE
✅ Examples:
- "Áo polo nam" → keywords="áo polo", gender_by_product="male"
- "Quần jean nữ dưới 500k" → keywords="quần jean", gender_by_product="female", price_max=500000
- "Áo thun đen giá rẻ" → keywords="áo thun", master_color="Đen", price_max=200000
- "Sản phẩm 8TS24W001" → internal_ref_code="8TS24W001"
- "Váy liền cho bé gái màu hồng" → keywords="váy liền", gender_by_product="female", age_by_product="others", master_color="Hồng"
🚫 NEVER USE 'query' FOR:
- Questions about PRICE (under 400k, 200k-500k, cheap, ...)
- Questions about a SPECIFIC COLOR (black, white, red, ...)
- Questions about a PRODUCT NAME (áo polo, quần jean, váy liền, ...)
- Questions about a PRODUCT CODE (8TS24W001, 1DS24C015, ...)
💡 COMBINE BOTH (Hybrid):
Only when the question has both an abstract CONTEXT and SPECIFIC ATTRIBUTES:
- "Tìm áo khoác ấm áp cho mùa đông, giá dưới 1 triệu"
→ query="áo khoác ấm áp mùa đông", price_max=1000000
📝 DETAILED EXAMPLES:
Example 1 - Semantic Search (PURPOSE):
User: "Tìm đồ đi làm chuyên nghiệp"
Tool: data_retrieval_tool(query="trang phục công sở chuyên nghiệp")
Example 2 - Metadata Filter (ATTRIBUTES):
User: "Cho tôi xem áo polo nam dưới 400k"
Tool: data_retrieval_tool(keywords="áo polo", gender_by_product="male", price_max=400000)
Example 3 - Metadata Only (PRICE + COLOR):
User: "Quần short đen giá rẻ"
Tool: data_retrieval_tool(keywords="quần short", master_color="Đen", price_max=300000)
Example 4 - Exact Match (PRODUCT CODE):
User: "Cho tôi thông tin sản phẩm 8TS24W001"
Tool: data_retrieval_tool(internal_ref_code="8TS24W001")
Example 5 - Hybrid (CONTEXT + FILTER):
User: "Tìm áo khoác ấm cho mùa đông, cho bé trai, từ 200k-500k"
Tool: data_retrieval_tool(query="áo khoác ấm áp mùa đông", age_by_product="others", gender_by_product="male", price_min=200000, price_max=500000)
"""
try:
# 1. Log & Prepare Params
......@@ -128,6 +161,7 @@ async def data_retrieval_tool(
query_vector = None
if query:
from common.embedding_service import create_embedding_async
query_vector = await create_embedding_async(query)
# 3. Execute Search (Async)
......
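For reference, a minimal sketch of calling the tool directly (outside the agent) with the argument patterns the docstring prescribes; `ainvoke` with a dict of arguments is the standard LangChain tool entry point, and the parameter values are copied from the docstring examples:

```python
# Sketch: invoking data_retrieval_tool directly with the argument patterns from
# the docstring above (values copied from its examples; requires a live DB/config).
import asyncio

async def demo_search() -> None:
    # Metadata filter: explicit product type + price cap
    by_filter = await data_retrieval_tool.ainvoke(
        {"keywords": "áo polo", "gender_by_product": "male", "price_max": 400000}
    )
    # Semantic search: abstract intent only
    by_intent = await data_retrieval_tool.ainvoke({"query": "trang phục công sở chuyên nghiệp"})
    print(by_filter[:200], by_intent[:200])

asyncio.run(demo_search())
```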
......@@ -4,12 +4,9 @@ FastAPI endpoints cho Fashion Q&A Agent service.
The router only contains API definitions; the logic lives in the controller.
"""
import json
import logging
from collections.abc import AsyncGenerator
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import StreamingResponse
from fastapi import APIRouter, BackgroundTasks, HTTPException
from agent.controller import chat_controller
from agent.models import QueryRequest
......@@ -20,103 +17,84 @@ router = APIRouter()
@router.post("/chat", summary="Fashion Q&A Chat (Non-streaming)")
async def fashion_qa_chat(req: QueryRequest, request: Request):
async def fashion_qa_chat(req: QueryRequest, background_tasks: BackgroundTasks):
"""
Non-streaming chat endpoint - returns the full JSON response in one shot.
"""
# Extract user_id from the request (auth middleware)
user_id = getattr(request.state, "user_id", None) or req.user_id or "default_user"
user_id = req.user_id or "default_user"
logger.info(f"📥 [Incoming Query - NonStream] User: {user_id} | Query: {req.query}")
logger.info(f"📥 [Incoming Query - NonStream] User: {user_id} | Query: {req.user_query}")
try:
# Call the controller and receive the stream generator
# Note: because chat_controller is decorated with @observe(), it must be awaited to unwrap
generator: AsyncGenerator[str, None] = chat_controller(
query=req.query,
# Call the controller to handle the logic (non-streaming)
result = await chat_controller(
query=req.user_query,
user_id=user_id,
background_tasks=background_tasks,
model_name=DEFAULT_MODEL,
conversation_id=req.conversation_id,
images=req.images,
)
# Collect all events from the generator
final_response = None
async for chunk in generator:
# Parse SSE data format
if chunk.startswith("data: "):
data_str = chunk[6:].strip()
if data_str != "[DONE]":
final_response = json.loads(data_str)
# Return the final response
if final_response and "messages" in final_response:
last_message = final_response["messages"][-1]
response_text = last_message.get("content", "") if isinstance(last_message, dict) else str(last_message)
logger.info(f"📤 [Outgoing Response - NonStream] User: {user_id} | Response: {response_text}")
logger.info(f"📤 [Outgoing Response - NonStream] User: {user_id}")
return {
"status": "success",
"response": response_text,
"conversation_id": req.conversation_id,
"ai_response": result["ai_response"],
"product_ids": result.get("product_ids", []),
}
return {"status": "error", "message": "No response generated"}
except Exception as e:
logger.error(f"Error in fashion_qa_chat: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
# ====================== FASHION Q&A CHAT API ======================
@router.post("/stream/chat", summary="Fashion Q&A Chat with Streaming Response")
async def fashion_qa_chat_stream(req: QueryRequest, request: Request):
"""
The single endpoint for chatting with the Fashion Agent.
"""
# Extract user_id from the request (auth middleware)
user_id = getattr(request.state, "user_id", None) or req.user_id or "default_user"
logger.info(f"📥 [Incoming Query] User: {user_id} | Query: {req.query}")
try:
# Call the controller and receive the stream generator
# Note: because chat_controller is decorated with @observe(), it must be awaited to unwrap
generator: AsyncGenerator[str, None] = chat_controller(
query=req.query,
user_id=user_id,
model_name=DEFAULT_MODEL,
conversation_id=req.conversation_id,
images=req.images,
)
async def logging_generator(gen: AsyncGenerator[str, None]):
full_response_log = ""
first_chunk = True
try:
async for chunk in gen:
if first_chunk:
logger.info("🚀 [Stream Started] First chunk received")
first_chunk = False
full_response_log += chunk
yield chunk
except Exception as e:
logger.error(f"❌ [Stream Error] {e}")
yield f"data: {json.dumps({'error': str(e)})}\n\n"
logger.info(f"📤 [Outgoing Response Stream Finished] Total Chunks Length: {len(full_response_log)}")
return StreamingResponse(
logging_generator(generator),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
except Exception as e:
logger.error(f"Error in fashion_qa_chat: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
# @router.post("/stream/chat", summary="Fashion Q&A Chat with Streaming Response")
# async def fashion_qa_chat_stream(req: QueryRequest, request: Request):
# """
# The single endpoint for chatting with the Fashion Agent.
# """
# # Extract user_id from the request (auth middleware)
# user_id = getattr(request.state, "user_id", None) or req.user_id or "default_user"
# logger.info(f"📥 [Incoming Query] User: {user_id} | Query: {req.query}")
# try:
# # Call the controller and receive the stream generator
# # Note: because chat_controller is decorated with @observe(), it must be awaited to unwrap
# generator: AsyncGenerator[str, None] = chat_controller(
# query=req.query,
# user_id=user_id,
# model_name=DEFAULT_MODEL,
# conversation_id=req.conversation_id,
# images=req.images,
# )
# async def logging_generator(gen: AsyncGenerator[str, None]):
# full_response_log = ""
# first_chunk = True
# try:
# async for chunk in gen:
# if first_chunk:
# logger.info("🚀 [Stream Started] First chunk received")
# first_chunk = False
# full_response_log += chunk
# yield chunk
# except Exception as e:
# logger.error(f"❌ [Stream Error] {e}")
# yield f"data: {json.dumps({'error': str(e)})}\n\n"
# logger.info(f"📤 [Outgoing Response Stream Finished] Total Chunks Length: {len(full_response_log)}")
# return StreamingResponse(
# logging_generator(generator),
# media_type="text/event-stream",
# headers={
# "Cache-Control": "no-cache",
# "Connection": "keep-alive",
# "X-Accel-Buffering": "no",
# },
# )
# except Exception as e:
# logger.error(f"Error in fashion_qa_chat: {e}", exc_info=True)
# raise HTTPException(status_code=500, detail=str(e)) from e
from fastapi import APIRouter, HTTPException
from typing import List, Dict, Any, Optional
from pydantic import BaseModel
import logging
from typing import Any
from fastapi import APIRouter
from pydantic import BaseModel
from common.conversation_manager import get_conversation_manager
router = APIRouter(tags=["Conversation"])
logger = logging.getLogger(__name__)
class ChatMessage(BaseModel):
id: int
user_id: str | None = None  # optional; usually not needed in a list response but kept for consistency
......@@ -15,32 +17,29 @@ class ChatMessage(BaseModel):
is_human: bool
timestamp: Any
class ChatHistoryResponse(BaseModel):
data: List[Dict[str, Any]]
next_cursor: Optional[int] = None
data: list[dict[str, Any]]
next_cursor: int | None = None
@router.get("/history/{user_id}", summary="Get Chat History by User ID", response_model=ChatHistoryResponse)
async def get_chat_history(user_id: str, limit: Optional[int] = 20, before_id: Optional[int] = None):
async def get_chat_history(user_id: str, limit: int | None = 50, before_id: int | None = None):
"""
Fetch the user's chat history from the Postgres database.
Returns an object containing `data` (list of messages) and `next_cursor` for the next page.
"""
try:
# Use the ConversationManager singleton
manager = get_conversation_manager()
manager = await get_conversation_manager()
# Fetch history from the DB with pagination
history = manager.get_chat_history(user_id, limit=limit, before_id=before_id)
# Fetch history from the DB
history = await manager.get_chat_history(user_id, limit=limit, before_id=before_id)
next_cursor = None
if history and len(history) > 0:
# The cursor for the next page is the ID of the last message (the oldest in this batch)
next_cursor = history[-1]['id']
next_cursor = history[-1]["id"]
return {
"data": history,
"next_cursor": next_cursor
}
return {"data": history, "next_cursor": next_cursor}
except Exception as e:
logger.error(f"Error fetching chat history for user {user_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
import logging
import psycopg2
from typing import List, Dict, Optional, Any
from datetime import datetime
from config import CHECKPOINT_POSTGRES_URL
from typing import Any
logger = logging.getLogger(__name__)
from psycopg_pool import AsyncConnectionPool
class DatabaseConnection:
def __init__(self, connection_url: str):
self.connection_url = connection_url
self.conn = None
from config import CHECKPOINT_POSTGRES_URL
def connect(self):
if not self.conn or self.conn.closed:
self.conn = psycopg2.connect(self.connection_url)
return self.conn
logger = logging.getLogger(__name__)
def close(self):
if self.conn and not self.conn.closed:
self.conn.close()
class ConversationManager:
def __init__(self, connection_url: str = CHECKPOINT_POSTGRES_URL, table_name: str = "langgraph_chat_histories"):
self.db = DatabaseConnection(connection_url)
def __init__(
self,
connection_url: str = CHECKPOINT_POSTGRES_URL,
table_name: str = "langgraph_chat_histories",
):
self.connection_url = connection_url
self.table_name = table_name
self._pool: AsyncConnectionPool | None = None
async def _get_pool(self) -> AsyncConnectionPool:
"""Get or create async connection pool."""
if self._pool is None:
self._pool = AsyncConnectionPool(self.connection_url, open=False)
await self._pool.open()
return self._pool
def initialize_table(self):
async def initialize_table(self):
"""Create the chat history table if it doesn't exist"""
try:
conn = self.db.connect()
with conn.cursor() as cursor:
cursor.execute(f"""
pool = await self._get_pool()
async with pool.connection() as conn:
async with conn.cursor() as cursor:
await cursor.execute(f"""
CREATE TABLE IF NOT EXISTS {self.table_name} (
id SERIAL PRIMARY KEY,
user_id VARCHAR(255) NOT NULL,
......@@ -40,36 +42,48 @@ class ConversationManager:
)
""")
# Create index
cursor.execute(f"""
await cursor.execute(f"""
CREATE INDEX IF NOT EXISTS idx_{self.table_name}_user_timestamp
ON {self.table_name} (user_id, timestamp)
""")
conn.commit()
logger.info(f"Table {self.table_name} initialized successfully")
await conn.commit()
logger.info(f"Table {self.table_name} initialized successfully")
except Exception as e:
logger.error(f"Error initializing table: {e}")
raise e
raise
def save_message(self, user_id: str, message: str, is_human: bool):
"""Save a message to the chat history"""
async def save_conversation_turn(self, user_id: str, human_message: str, ai_message: str):
"""Save both human and AI messages in a single atomic transaction."""
try:
conn = self.db.connect()
with conn.cursor() as cursor:
cursor.execute(
pool = await self._get_pool()
timestamp = datetime.now()
async with pool.connection() as conn:
async with conn.cursor() as cursor:
await cursor.execute(
f"""INSERT INTO {self.table_name} (user_id, message, is_human, timestamp)
VALUES (%s, %s, %s, %s)""",
(user_id, message, is_human, datetime.now())
VALUES (%s, %s, %s, %s), (%s, %s, %s, %s)""",
(
user_id,
human_message,
True,
timestamp,
user_id,
ai_message,
False,
timestamp,
),
)
conn.commit()
logger.debug(f"💾 Saved message for user {user_id}: {message[:50]}...")
await conn.commit()
logger.debug(f"Saved conversation turn for user {user_id}")
except Exception as e:
logger.error(f"Error saving message: {e}")
logger.error(f"Failed to save conversation for user {user_id}: {e}", exc_info=True)
raise
def get_chat_history(self, user_id: str, limit: Optional[int] = None, before_id: Optional[int] = None) -> List[Dict[str, Any]]:
"""Retrieve chat history for a user using cursor-based pagination (before_id)"""
async def get_chat_history(
self, user_id: str, limit: int | None = None, before_id: int | None = None
) -> list[dict[str, Any]]:
"""Retrieve chat history for a user using cursor-based pagination."""
try:
# Base query
query = f"""
SELECT message, is_human, timestamp, id
FROM {self.table_name}
......@@ -77,29 +91,27 @@ class ConversationManager:
"""
params = [user_id]
# Add cursor condition if provided
if before_id:
query += " AND id < %s"
params.append(before_id)
# Order by id DESC (ensures strict chronological consistency with ID cursor)
query += " ORDER BY id DESC"
if limit:
query += " LIMIT %s"
params.append(limit)
conn = self.db.connect()
with conn.cursor() as cursor:
cursor.execute(query, tuple(params))
results = cursor.fetchall()
pool = await self._get_pool()
async with pool.connection() as conn, conn.cursor() as cursor:
await cursor.execute(query, tuple(params))
results = await cursor.fetchall()
return [
{
'message': row[0],
'is_human': row[1],
'timestamp': row[2],
'id': row[3]
"message": row[0],
"is_human": row[1],
"timestamp": row[2],
"id": row[3],
}
for row in results
]
......@@ -107,37 +119,44 @@ class ConversationManager:
logger.error(f"Error retrieving chat history: {e}")
return []
def clear_history(self, user_id: str):
async def clear_history(self, user_id: str):
"""Clear all chat history for a user"""
try:
conn = self.db.connect()
with conn.cursor() as cursor:
cursor.execute(f"DELETE FROM {self.table_name} WHERE user_id = %s", (user_id,))
conn.commit()
logger.info(f"🗑️ Cleared chat history for user {user_id}")
pool = await self._get_pool()
async with pool.connection() as conn:
async with conn.cursor() as cursor:
await cursor.execute(f"DELETE FROM {self.table_name} WHERE user_id = %s", (user_id,))
await conn.commit()
logger.info(f"Cleared chat history for user {user_id}")
except Exception as e:
logger.error(f"Error clearing chat history: {e}")
def get_user_count(self) -> int:
async def get_user_count(self) -> int:
"""Get total number of unique users"""
try:
conn = self.db.connect()
with conn.cursor() as cursor:
cursor.execute(f"SELECT COUNT(DISTINCT user_id) FROM {self.table_name}")
result = cursor.fetchone()
pool = await self._get_pool()
async with pool.connection() as conn, conn.cursor() as cursor:
await cursor.execute(f"SELECT COUNT(DISTINCT user_id) FROM {self.table_name}")
result = await cursor.fetchone()
return result[0] if result else 0
except Exception as e:
logger.error(f"Error getting user count: {e}")
return 0
async def close(self):
"""Close the connection pool"""
if self._pool:
await self._pool.close()
# --- Singleton ---
_instance: Optional[ConversationManager] = None
_instance: ConversationManager | None = None
def get_conversation_manager() -> ConversationManager:
"""Get or create generic ConversationManager singleton"""
async def get_conversation_manager() -> ConversationManager:
"""Get or create async ConversationManager singleton"""
global _instance
if _instance is None:
_instance = ConversationManager()
# Initialize table on first creation
_instance.initialize_table()
await _instance.initialize_table()
return _instance
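A brief usage sketch of the async singleton above (the table is created on first access; user IDs and messages are illustrative):

```python
# Usage sketch for the async ConversationManager singleton above.
import asyncio

async def demo_history() -> None:
    manager = await get_conversation_manager()  # creates the table on first call
    await manager.save_conversation_turn("user_12345", "Áo polo nam?", "Dạ, em gợi ý vài mẫu ạ.")
    page = await manager.get_chat_history("user_12345", limit=20)
    if page:  # cursor-based pagination: pass the oldest id as before_id
        older = await manager.get_chat_history("user_12345", limit=20, before_id=page[-1]["id"])
        print(len(page), len(older))

asyncio.run(demo_history())
```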
"""
LLM Factory - Centralized LLM creation for OpenAI & Gemini.
Quản lý việc khởi tạo và caching các LLM models, tự động nhận diện provider.
Manages initialization and caching of LLM models with automatic provider detection.
"""
import contextlib
import logging
from typing import cast
from langchain_core.language_models import BaseChatModel
from langchain_openai import ChatOpenAI
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from config import GOOGLE_API_KEY, OPENAI_API_KEY
......@@ -17,18 +16,15 @@ logger = logging.getLogger(__name__)
class LLMFactory:
"""
Singleton Class quản lý việc khởi tạo và caching các LLM Models.
"""
"""Singleton factory for managing LLM instances with caching and provider auto-detection."""
COMMON_MODELS: list[str] = [
"gpt-4o-mini",
"gemini-2.0-flash-lite-preview-02-05",
"gemini-1.5-flash",
]
def __init__(self):
# Cache dict: Key=(model_name, streaming, json_mode, api_key), Value=LLM Instance
"""Initialize LLM factory with empty cache."""
self._cache: dict[tuple[str, bool, bool, str | None], BaseChatModel] = {}
def get_model(
......@@ -39,109 +35,148 @@ class LLMFactory:
api_key: str | None = None,
) -> BaseChatModel:
"""
Lấy LLM instance từ cache hoặc tạo mới.
Get or create an LLM instance from cache.
Args:
model_name: Model identifier (e.g., "gpt-4o-mini", "gemini-2.0-flash-lite-preview-02-05")
streaming: Enable streaming responses
json_mode: Enable JSON output format
api_key: Optional API key override
Returns:
Configured LLM instance
"""
# Clean model name
clean_model = model_name.split("/")[-1] if "/" in model_name else model_name
cache_key = (clean_model, streaming, json_mode, api_key)
# 1. Hit Cache
if cache_key in self._cache:
logger.debug(f"♻️ Using cached model: {clean_model}")
return self._cache[cache_key]
# 2. Miss Cache -> Create New
return self._create_new_instance(clean_model, streaming, json_mode, api_key)
logger.info(f"Creating new LLM instance: {clean_model}")
return self._create_instance(clean_model, streaming, json_mode, api_key)
def _create_new_instance(
def _create_instance(
self,
model_name: str,
streaming: bool = False,
json_mode: bool = False,
api_key: str | None = None,
) -> BaseChatModel:
"""Khởi tạo LLM instance dựa trên tên model"""
"""
Create and cache a new LLM instance based on model name.
Args:
model_name: Clean model identifier
streaming: Enable streaming
json_mode: Enable JSON mode
api_key: Optional API key override
Returns:
Configured LLM instance
Raises:
ValueError: If required API keys are missing
"""
try:
# 1. Detect Gemini
if "gemini" in model_name.lower():
effective_key = api_key or GOOGLE_API_KEY
if not effective_key:
raise ValueError("GOOGLE_API_KEY is missing for Gemini model")
if self._is_gemini_model(model_name):
llm = self._create_gemini(model_name, streaming, api_key)
else:
llm = self._create_openai(model_name, streaming, api_key)
if json_mode:
llm = self._enable_json_mode(llm, model_name)
cache_key = (model_name, streaming, json_mode, api_key)
self._cache[cache_key] = llm
return llm
except Exception as e:
logger.error(f"❌ Failed to create model {model_name}: {e}")
raise
def _is_gemini_model(self, model_name: str) -> bool:
"""Check if model name is a Gemini model."""
return "gemini" in model_name.lower()
def _create_gemini(self, model_name: str, streaming: bool, api_key: str | None) -> BaseChatModel:
"""Create Gemini model instance."""
key = api_key or GOOGLE_API_KEY
if not key:
raise ValueError("GOOGLE_API_KEY is required for Gemini models")
llm = ChatGoogleGenerativeAI(
model=model_name,
streaming=streaming,
google_api_key=effective_key,
temperature=0
google_api_key=key,
temperature=0,
)
logger.info(f"✨ Created Gemini model: {model_name}")
logger.info(f"✨ Created Gemini: {model_name}")
return llm
# 2. Detect OpenAI (or default)
else:
effective_key = api_key or OPENAI_API_KEY
if not effective_key:
# If there is no OpenAI key, try Gemini as the final fallback
def _create_openai(self, model_name: str, streaming: bool, api_key: str | None) -> BaseChatModel:
"""Create OpenAI model instance with fallback to Gemini if needed."""
key = api_key or OPENAI_API_KEY
if not key:
logger.warning("⚠️ No OpenAI key, attempting Gemini fallback")
if GOOGLE_API_KEY:
logger.warning(f"⚠️ No OpenAI key found, falling back to Gemini for {model_name}")
llm = ChatGoogleGenerativeAI(
model="gemini-1.5-flash",
streaming=streaming,
google_api_key=GOOGLE_API_KEY,
temperature=0
)
else:
raise ValueError("Neither OPENAI_API_KEY nor GOOGLE_API_KEY is available.")
else:
# Initialize OpenAI
# Note: if gpt-5-nano does not exist, the OpenAI API will return an error
return self._create_gemini("gemini-1.5-flash", streaming, GOOGLE_API_KEY)
raise ValueError("Neither OPENAI_API_KEY nor GOOGLE_API_KEY is available")
llm = ChatOpenAI(
model=model_name,
streaming=streaming,
api_key=effective_key,
temperature=0
api_key=key,
temperature=0,
)
logger.info(f"✅ Created OpenAI model: {model_name}")
logger.info(f"✅ Created OpenAI: {model_name}")
return llm
# Apply JSON mode if needed
if json_mode:
def _enable_json_mode(self, llm: BaseChatModel, model_name: str) -> BaseChatModel:
"""Enable JSON mode for the LLM."""
try:
llm = llm.bind(response_format={"type": "json_object"})
logger.debug(f"⚙️ Enabled JSON Mode for {model_name}")
except Exception as ex:
logger.warning(f"⚠️ Failed to bind JSON mode: {ex}")
# Save to cache
cache_key = (model_name, streaming, json_mode, api_key)
self._cache[cache_key] = cast(BaseChatModel, llm)
return self._cache[cache_key]
logger.debug(f"⚙️ JSON mode enabled for {model_name}")
except Exception as e:
logger.error(f"❌ Failed to create model {model_name}: {e}")
raise
logger.warning(f"⚠️ JSON mode not supported: {e}")
return llm
def initialize(self, skip_warmup: bool = True) -> None:
"""
Pre-initialize common models.
def initialize(self, skip_warmup: bool = True):
"""Pre-initialize common models"""
Args:
skip_warmup: Skip initialization if True
"""
if skip_warmup or self._cache:
return
logger.info("🔥 Warming up LLM Factory...")
for name in self.COMMON_MODELS:
for model_name in self.COMMON_MODELS:
with contextlib.suppress(Exception):
self.get_model(name, streaming=True)
self.get_model(model_name, streaming=True)
# --- Singleton Instance & Public API ---
_factory = LLMFactory()
def create_llm(model_name: str, streaming: bool = True, json_mode: bool = False, api_key: str | None = None):
def create_llm(
model_name: str,
streaming: bool = True,
json_mode: bool = False,
api_key: str | None = None,
) -> BaseChatModel:
"""Create or get cached LLM instance."""
return _factory.get_model(model_name, streaming=streaming, json_mode=json_mode, api_key=api_key)
def init_llm_factory(skip_warmup: bool = True):
def init_llm_factory(skip_warmup: bool = True) -> None:
"""Initialize the LLM factory."""
_factory.initialize(skip_warmup)
def create_embedding_model():
"""Helper để tạo embedding model (OpenAI focus)"""
from langchain_openai import OpenAIEmbeddings
from config import OPENAI_API_KEY
def create_embedding_model() -> OpenAIEmbeddings:
"""Create OpenAI embeddings model."""
return OpenAIEmbeddings(model="text-embedding-3-small", api_key=OPENAI_API_KEY)
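A short usage sketch of the factory API above; because the cache key is (model, streaming, json_mode, api_key), repeated calls with the same arguments return the same cached instance (model names are illustrative and require the corresponding API keys):

```python
# Usage sketch for the LLM factory above; model names are illustrative.
llm_a = create_llm("gpt-4o-mini", streaming=False, json_mode=True)
llm_b = create_llm("gpt-4o-mini", streaming=False, json_mode=True)
assert llm_a is llm_b  # identical cache key -> same cached instance

gemini = create_llm("gemini-1.5-flash")  # provider auto-detected from the name
embeddings = create_embedding_model()    # OpenAI text-embedding-3-small
```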
......@@ -4,6 +4,7 @@ Lấy giá trị từ file .env qua os.getenv
"""
import os
from dotenv import load_dotenv
# Load environment variables from .env file
......@@ -66,7 +67,8 @@ OPENAI_API_KEY: str | None = os.getenv("OPENAI_API_KEY")
GOOGLE_API_KEY: str | None = os.getenv("GOOGLE_API_KEY")
GROQ_API_KEY: str | None = os.getenv("GROQ_API_KEY")
DEFAULT_MODEL: str = os.getenv("DEFAULT_MODEL", "gemini-2.0-flash-lite-preview-02-05")
DEFAULT_MODEL: str = os.getenv("DEFAULT_MODEL", "gpt-5-nano")
# DEFAULT_MODEL: str = os.getenv("DEFAULT_MODEL")
# ====================== JWT CONFIGURATION ======================
JWT_SECRET: str | None = os.getenv("JWT_SECRET")
......
......@@ -6,12 +6,12 @@ services:
build: .
container_name: canifa_backend
ports:
- "5000:5000"
- "8000:8000"
volumes:
- .:/app # Mount code for hot-reload during dev (optional)
- .:/app
environment:
- PORT=8000
- CHECKPOINT_POSTGRES_URL=postgresql://postgres:password@postgres_db:5432/canifa_chat
# Other environment variables can be added here or via a .env file
- OPENAI_API_KEY=${OPENAI_API_KEY}
- LANGFUSE_PUBLIC_KEY=${LANGFUSE_PUBLIC_KEY}
- LANGFUSE_SECRET_KEY=${LANGFUSE_SECRET_KEY}
......@@ -24,6 +24,40 @@ services:
depends_on:
- postgres_db
restart: unless-stopped
logging:
driver: "json-file"
options:
tag: "{{.Name}}"
# --- Monitoring Stack (Loki + Promtail + Grafana) ---
loki:
image: grafana/loki:2.9.0
container_name: canifa_loki
ports:
- "3100:3100"
command: -config.file=/etc/loki/local-config.yaml
restart: unless-stopped
promtail:
image: grafana/promtail:2.9.0
container_name: canifa_promtail
volumes:
- /var/lib/docker/containers:/var/lib/docker/containers:ro
- /var/run/docker.sock:/var/run/docker.sock
- ./promtail-config.yaml:/etc/promtail/config.yml
command: -config.file=/etc/promtail/config.yml
restart: unless-stopped
grafana:
image: grafana/grafana:10.1.0
container_name: canifa_grafana
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin # change this password in production
depends_on:
- loki
restart: unless-stopped
# --- Database Service (Postgres) ---
postgres_db:
......
server {
listen 80;
server_name _;  # catch-all; replace with the bot server's IP or domain
# Log files
access_log /var/log/nginx/chatbot_access.log;
error_log /var/log/nginx/chatbot_error.log;
location /chat {
# allow 1.2.3.4;
# deny all;
proxy_pass http://127.0.0.1:5000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_read_timeout 300s;
proxy_connect_timeout 300s;
proxy_send_timeout 300s;
}
# endpoint for history
location /history {
# allow 1.2.3.4;
# deny all;
proxy_pass http://127.0.0.1:5000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
location / {
proxy_pass http://127.0.0.1:8000;
}
}
server:
http_listen_port: 9080
grpc_listen_port: 0
positions:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
scrape_configs:
- job_name: docker
docker_sd_configs:
- host: unix:///var/run/docker.sock
refresh_interval: 5s
relabel_configs:
- source_labels: ['__meta_docker_container_name']
regex: '/(.*)'
target_label: 'container'
.\.venv\Scripts\activate
\ No newline at end of file
# CANIFA Chatbot API Documentation
API for the CANIFA fashion-advisory chatbot system - CiCi Assistant.
## Base URL
```
http://localhost:8000
```
---
## API Endpoints
### 1. Chat with the Chatbot
**Endpoint:** `POST /chat`
**Description:** Send a message to the chatbot and receive a fashion-advice reply together with a list of related products.
#### Request Body
```json
{
"user_id": "string",
"user_query": "string"
}
```
**Parameters:**
| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `user_id` | string | ✅ | User identifier (used to store chat history) |
| `user_query` | string | ✅ | The user's message content |
**Example Request:**
```json
{
"user_id": "user_12345",
"user_query": "Cho em xem áo sơ mi nam dưới 500k"
}
```
#### Response
**Success Response (200 OK):**
```json
{
"status": "success",
"ai_response": "string",
"product_ids": ["string"]
}
```
**Response Fields:**
| Field | Type | Description |
|-------|------|-------------|
| `status` | string | Request processing status (`"success"` or `"error"`) |
| `ai_response` | string | The chatbot's reply (advisory text) |
| `product_ids` | array[string] | List of recommended product codes (internal_ref_code) |
**Example Response:**
```json
{
"status": "success",
"ai_response": "Em chào anh! Em đã tìm thấy một số mẫu áo sơ mi nam đẹp trong tầm giá dưới 500k:\n\n1. Áo Sơ Mi Nam Cotton - 399.000đ\n2. Áo Sơ Mi Slim Fit - 449.000đ\n\nCác sản phẩm này đều là chất liệu cotton thoáng mát, phù hợp cho mùa hè ạ!",
"product_ids": ["SM12345", "SM12346"]
}
```
**Error Response (500 Internal Server Error):**
```json
{
"status": "error",
"ai_response": "Xin lỗi, đã có lỗi xảy ra. Vui lòng thử lại sau.",
"product_ids": []
}
```
---
### 2. Get Chat History
**Endpoint:** `GET /history/{user_id}`
**Description:** Fetch a user's chat history with cursor-based pagination.
#### Path Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `user_id` | string | ✅ | ID of the user whose history is requested |
#### Query Parameters
| Parameter | Type | Required | Default | Description |
|-----------|------|----------|---------|-------------|
| `limit` | integer | ❌ | 50 | Maximum number of messages per page (1-100) |
| `before_id` | integer | ❌ | null | Message ID to fetch messages older than (used for pagination) |
**Example Request:**
```
GET /history/user_12345?limit=20&before_id=150
```
#### Response
**Success Response (200 OK):**
```json
{
"data": [
{
"id": 149,
"user_id": "user_12345",
"message": "Cho em xem áo sơ mi nam",
"is_human": true,
"timestamp": "2025-12-25T14:30:00"
},
{
"id": 148,
"user_id": "user_12345",
"message": "Em đã tìm thấy một số mẫu áo sơ mi nam đẹp...",
"is_human": false,
"timestamp": "2025-12-25T14:30:02"
}
],
"next_cursor": 130
}
```
**Response Fields:**
| Field | Type | Description |
|-------|------|-------------|
| `data` | array[object] | List of chat messages (sorted newest → oldest) |
| `data[].id` | integer | Unique message ID |
| `data[].user_id` | string | User ID |
| `data[].message` | string | Message content |
| `data[].is_human` | boolean | `true` = user message, `false` = bot message |
| `data[].timestamp` | string | Time the message was sent (ISO 8601 format) |
| `next_cursor` | integer \| null | ID of the last message (use as `before_id` for the next request). `null` when there is no more data |
---
## Pagination
The API uses **cursor-based pagination** for chat history:
### How it works:
1. **First request** - fetch the 20 most recent messages:
```
GET /history/user_12345?limit=20
```
Response:
```json
{
"data": [...], // 20 messages (ID: 200 → 181)
"next_cursor": 181
}
```
2. **Next request** - fetch 20 older messages:
```
GET /history/user_12345?limit=20&before_id=181
```
Response:
```json
{
"data": [...], // 20 messages (ID: 180 → 161)
"next_cursor": 161
}
```
3. **Final request** - when there is no more data:
```json
{
"data": [...], // the 5 remaining messages
"next_cursor": null
}
```
### Pagination logic:
- `next_cursor` is always the **ID of the last message** in `data`
- Use `next_cursor` as `before_id` for the next request
- When `next_cursor = null` → there is no more data
---
## Chat Workflow
```mermaid
graph LR
A[User sends message] --> B[POST /chat]
B --> C{Agent processing}
C --> D[Product search]
C --> E[Advisory reply]
D --> F[Extract product_ids]
E --> F
F --> G[Response: ai_response + product_ids]
G --> H[Save to PostgreSQL]
H --> I[Return to client]
```
### Processing flow:
1. The user sends a message via the `/chat` API
2. The agent analyzes the intent
3. If a product lookup is needed → call `data_retrieval_tool` with suitable parameters
4. The agent aggregates the information → returns an advisory answer
5. `product_ids` are extracted from the search results
6. The chat history is saved to PostgreSQL (background task)
7. JSON with `ai_response` and `product_ids` is returned
---
## Error Handling
### Error Response Format
```json
{
"status": "error",
"ai_response": "Mô tả lỗi hoặc thông báo fallback",
"product_ids": []
}
```
### HTTP Status Codes
| Code | Meaning | When it occurs |
|------|---------|----------------|
| 200 | OK | Request succeeded |
| 400 | Bad Request | Missing `user_id` or `user_query` |
| 500 | Internal Server Error | System error (database, LLM, ...) |
---
## Usage Examples
### Python
```python
import requests
# Chat with the bot
response = requests.post("http://localhost:8000/chat", json={
"user_id": "user_12345",
"user_query": "Cho em xem váy đầm dự tiệc dưới 1 triệu"
})
data = response.json()
print(f"Bot: {data['ai_response']}")
print(f"Sản phẩm: {data['product_ids']}")
# Lấy lịch sử chat
history = requests.get("http://localhost:8000/history/user_12345?limit=10")
messages = history.json()["data"]
for msg in messages:
sender = "User" if msg["is_human"] else "Bot"
print(f"{sender}: {msg['message']}")
```
### JavaScript (Fetch API)
```javascript
// Chat with the bot
const response = await fetch('http://localhost:8000/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
user_id: 'user_12345',
user_query: 'Cho em xem áo khoác nữ'
})
});
const data = await response.json();
console.log('Bot:', data.ai_response);
console.log('Products:', data.product_ids);
// Get chat history (paginated)
let cursor = null;
const allMessages = [];
do {
const url = cursor
? `http://localhost:8000/history/user_12345?limit=50&before_id=${cursor}`
: `http://localhost:8000/history/user_12345?limit=50`;
const historyResponse = await fetch(url);
const { data: messages, next_cursor } = await historyResponse.json();
allMessages.push(...messages);
cursor = next_cursor;
} while (cursor !== null);
console.log(`Total messages: ${allMessages.length}`);
```
### cURL
```bash
# Chat with the bot
curl -X POST "http://localhost:8000/chat" \
-H "Content-Type: application/json" \
-d '{
"user_id": "user_12345",
"user_query": "Cho em xem giày thể thao nam"
}'
# Get chat history
curl "http://localhost:8000/history/user_12345?limit=20"
# Get the next page
curl "http://localhost:8000/history/user_12345?limit=20&before_id=150"
```
---
## Notes
### 1. Product IDs
- `product_ids` returns a list of `internal_ref_code` values (internal product codes)
- The frontend can use them to render a product carousel or link to detail pages
- If no products are found → `product_ids = []`
### 2. Conversation History
- Chat history is saved automatically after each conversation turn (background task)
- Data is stored in PostgreSQL with an index on `user_id` and `id`
- Sorted newest → oldest
### 3. Rate Limiting
- There is currently no rate limiting
- Implementing a rate limit is recommended for production deployments
### 4. Authentication
- The API currently requires no authentication
- `user_id` is generated by the client and sent with each request
- Recommendation: integrate Clerk Auth or JWT tokens for production
---
## Environment Variables
```bash
# PostgreSQL
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DB=chatbot_db
POSTGRES_USER=postgres
POSTGRES_PASSWORD=your_password
# OpenAI
OPENAI_API_KEY=sk-...
# StarRocks (Vector Database)
STARROCKS_HOST=localhost
STARROCKS_PORT=9030
STARROCKS_USER=root
STARROCKS_PASSWORD=your_password
STARROCKS_DB=chatbot_products
# Server
PORT=8000
HOST=0.0.0.0
```
---
## Testing
Visit `http://localhost:8000/static/index.html` to test the chatbot through a simple UI.
......@@ -402,7 +402,20 @@
botMsgDiv.className = 'message bot';
if (data.status === 'success') {
botMsgDiv.innerText = data.response;
// Display AI response
botMsgDiv.innerText = data.ai_response || data.response || 'No response';
// Add product IDs if available
if (data.product_ids && data.product_ids.length > 0) {
const productInfo = document.createElement('div');
productInfo.style.marginTop = '8px';
productInfo.style.fontSize = '0.85em';
productInfo.style.color = '#aaa';
productInfo.style.borderTop = '1px solid #555';
productInfo.style.paddingTop = '8px';
productInfo.innerText = `📦 Products: ${data.product_ids.join(', ')}`;
botMsgDiv.appendChild(productInfo);
}
} else {
botMsgDiv.innerText = "Error: " + (data.message || "Unknown error");
botMsgDiv.style.color = 'red';
......
import dis
# Code with a type annotation:
def func1():
    x: int = 5
    return x
# Code without a type annotation:
def func2():
    x = 5
    return x
print("=== Func1 (typed) ===")
dis.dis(func1)
print("\n=== Func2 (untyped) ===")
dis.dis(func2)
# RESULT: the bytecode is identical! Local-variable type annotations are ignored completely.
\ No newline at end of file
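As a complementary note (standard CPython behavior, not part of the original snippet): parameter and return annotations are still recorded on the function object, even though local-variable annotations inside the body add no bytecode:

```python
# Complementary sketch: parameter/return annotations are stored in __annotations__,
# while local-variable annotations inside the body add no bytecode and are not stored.
def func3(x: int) -> int:
    y: str = "ignored at runtime"  # local annotation: no extra bytecode, not recorded
    return x

print(func3.__annotations__)  # {'x': <class 'int'>, 'return': <class 'int'>}
```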