Commit e9e60dfa authored by Vũ Hoàng Anh


feat: Codex/Responses API compatibility, cache TTL 24h, prompt optimization, n8n test tools, gitignore cleanup
parent 90ea36b3
......@@ -2,6 +2,19 @@
"mcpServers": {
"canifa-api": {
"url": "http://localhost:5000/mcp"
},
"n8n-mcp": {
"command": "npx",
"args": [
"n8n-mcp"
],
"env": {
"MCP_MODE": "stdio",
"LOG_LEVEL": "error",
"DISABLE_CONSOLE_OUTPUT": "true",
"N8N_API_URL": "http://localhost:5678",
"N8N_API_KEY": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIwOTdkMTNhOS01NzQ0LTQyY2UtYTM5Yi00YjMwZTk4NDU4OWMiLCJpc3MiOiJuOG4iLCJhdWQiOiJwdWJsaWMtYXBpIiwianRpIjoiMTVmZmNlZjUtNzkzOC00MWU4LTg5NzktY2NhMWI0YzUzY2RmIiwiaWF0IjoxNzcyNjc1OTM3fQ.K58ZsX8BgdukDdON15sMCQ0eynTeYSEbi7nF6xIPY9I"
}
}
}
}
\ No newline at end of file
......@@ -55,3 +55,25 @@ Thumbs.db
run.txt
backend/agent/tools/query.txt
backend/schema_dump.json
# Document folder
document/
# n8n workflow exports & temp files
canifa_workflow_export.json
prod_workflow.json
prod_workflow_fixed.json
fix_n8n_connections.py
*.png
!backend/static/**/*.png
# Playwright MCP
.playwright-mcp/
# Test credentials (sensitive)
backend/tests/google_credentials.json
backend/tests/google_sheets_credentials.json
backend/tests/sheet_info.json
backend/tests/test_n8n_api_output.txt
backend/n8n_result.json
diff_*.txt
# 🔬 VERIFICATION: LangGraph Streaming Behavior
## 🎯 PURPOSE
Check whether LangGraph `astream()` streams **incrementally** (piece by piece) or only emits an event **after the node completes**.
---
## 📊 EXPECTED RESULTS
### **Scenario 1: Incremental Streaming (ideal)** ✅
If LangGraph streams incrementally, the backend logs will show:
```
🌊 Starting LLM streaming...
📦 Event #1 at t=2.50s | Keys: ['messages']
📦 Event #2 at t=3.20s | Keys: ['ai_response']
📡 Event #2 (t=3.20s): ai_response with 150 chars
Preview: {"ai_response": "Anh chọn áo thun th...
📦 Event #3 at t=4.10s | Keys: ['ai_response']
📡 Event #3 (t=4.10s): ai_response with 380 chars
Preview: {"ai_response": "Anh chọn áo thun thể thao nam chuẩn luôn! Em tìm...
📦 Event #4 at t=5.50s | Keys: ['ai_response']
📡 Event #4 (t=5.50s): ai_response with 620 chars
Preview: {"ai_response": "...", "product_ids": ["SKU1", "SKU2"]...
🎯 Event #4 (t=5.50s): Regex matched product_ids!
✅ Extracted 3 SKUs: ['SKU1', 'SKU2', 'SKU3']
🚨 BREAKING at Event #4 (t=5.50s) - user_insight KHÔNG ĐỢI!
```
**→ Content grows incrementally (150 → 380 → 620 chars)**
**→ Breaks early once product_ids appears (t=5.5s instead of t=12s)**
---
### **Scenario 2: Event-based (emitted only after completion)** ❌
If LangGraph only emits after the node finishes, the logs will look like:
```
🌊 Starting LLM streaming...
📦 Event #1 at t=2.30s | Keys: ['messages'] ← Tool execution
📦 Event #2 at t=11.80s | Keys: ['ai_response'] ← LLM node hoàn thành
📡 Event #2 (t=11.80s): ai_response with 1250 chars ← TOÀN BỘ RESPONSE
Preview: {"ai_response": "Anh chọn áo thun thể thao nam chuẩn luôn!...", "product_ids": ["SKU1", "SKU2", "SKU3"], "user_insight": {...}}
🎯 Event #2 (t=11.80s): Regex matched product_ids!
✅ Extracted 3 SKUs: ['SKU1', 'SKU2', 'SKU3']
🚨 BREAKING at Event #2 (t=11.80s) - user_insight KHÔNG ĐỢI!
```
**→ ONLY 1 event, containing the full content**
**→ Emitted only after the LLM has fully finished (t=11.8s)**
**→ CANNOT break any earlier!**
---
## 🔍 ANALYSIS
### **If Scenario 2 (Event-based):**
**Explanation:**
- The LLM **is streaming tokens internally** from t=2s → t=12s
- LangGraph **waits for the node to finish** before emitting the event
- The event therefore contains the **full response**
- The regex matches immediately because everything is already present
**Conclusion:**
- ✅ The code is correct and streaming is enabled
- ❌ But we cannot break any earlier, because no earlier event exists
- ⏱️ Latency cannot be reduced (~12s)
---
## 💡 SOLUTIONS
If the result is Scenario 2 and real streaming is needed, the options are:
### **Option A: Custom Streaming Callback**
```python
import asyncio

from langchain.callbacks.base import AsyncCallbackHandler

class StreamingCallback(AsyncCallbackHandler):
    def __init__(self):
        self.accumulated = ""
        self.product_found_event = asyncio.Event()

    async def on_llm_new_token(self, token: str, **kwargs):
        # Accumulate tokens and check for product_ids as they arrive
        self.accumulated += token
        if '"product_ids"' in self.accumulated and not self.product_found_event.is_set():
            # Signal the controller so it can break out of the stream early
            self.product_found_event.set()
```
### **Option B: SSE Endpoint**
Stream events directly to the client and let the client parse them itself (a minimal sketch follows).
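A possible sketch of this option, assuming FastAPI (already used by the backend) and a compiled LangGraph app named `graph`; the route path and payload shape are illustrative, not the repository's actual API:
```python
import json

from fastapi import APIRouter
from fastapi.responses import StreamingResponse

router = APIRouter()

@router.post("/api/agent/chat-stream")
async def chat_stream(payload: dict):
    async def event_source():
        # Forward every LangGraph event to the client as an SSE frame;
        # the client decides when it has enough (e.g. once product_ids appears).
        async for event in graph.astream(payload):  # `graph` = compiled LangGraph app (assumption)
            yield f"data: {json.dumps(event, default=str)}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(event_source(), media_type="text/event-stream")
```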
### **Option C: Keep as-is**
The code is already as optimized as the constraints allow; accept the latency.
---
## 📝 NOTES
- **streaming=True** on the LLM → LangChain streams tokens internally
- **graph.astream()** → streams events, not tokens
- **Breaking early** only helps if events are emitted incrementally
**Check the backend logs to determine which scenario applies!**
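A minimal sketch of a verification loop that would produce logs like the ones above (the real logic lives in the chat controller; `graph` and the payload shape are assumptions):
```python
import json
import time

async def verify_streaming(graph, payload: dict) -> None:
    """Log each event's arrival time and size to tell Scenario 1 from Scenario 2."""
    start = time.perf_counter()
    event_no = 0
    async for event in graph.astream(payload):
        event_no += 1
        t = time.perf_counter() - start
        content = json.dumps(event, default=str)
        print(f"📦 Event #{event_no} at t={t:.2f}s | Keys: {list(event.keys())} | {len(content)} chars")
        if '"product_ids"' in content:
            print(f"🚨 BREAKING at Event #{event_no} (t={t:.2f}s)")
            break
```
If only one large `ai_response` event ever arrives, the graph is event-based (Scenario 2); growing partial events indicate incremental streaming (Scenario 1).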
......@@ -268,6 +268,8 @@ async def chat_controller(
# Extract ai_response from streaming content (fallbacks)
if early_response and not ai_text_response:
raw_content = streaming_callback.accumulated_content
# Strip Codex reasoning objects before parsing
raw_content = ProductIDStreamingCallback.strip_reasoning(raw_content)
if raw_content:
try:
raw_normalized = raw_content.replace("{{", "{").replace("}}", "}")
......@@ -284,10 +286,21 @@ async def chat_controller(
if not ai_text_response and all_accumulated_messages:
for msg in reversed(all_accumulated_messages):
if isinstance(msg, AIMessage) and msg.content:
ai_text_response = msg.content
# Responses API may return content as list
content = msg.content
if isinstance(content, list):
content = "".join(str(c.get("text", c) if isinstance(c, dict) else c) for c in content)
# Strip Codex reasoning objects
content = ProductIDStreamingCallback.strip_reasoning(content)
ai_text_response = content
break
# Parse JSON-wrapped ai_response
# Ensure ai_text_response is str (Responses API may return list)
if isinstance(ai_text_response, list):
ai_text_response = "".join(str(c.get("text", c) if isinstance(c, dict) else c) for c in ai_text_response)
# Strip Codex reasoning objects before JSON parse
ai_text_response = ProductIDStreamingCallback.strip_reasoning(ai_text_response)
if ai_text_response and ai_text_response.lstrip().startswith("{"):
try:
ai_normalized = ai_text_response.replace("{{", "{").replace("}}", "}")
......@@ -297,7 +310,13 @@ async def chat_controller(
if not final_product_ids and isinstance(ai_json.get("product_ids"), list):
final_product_ids = [str(s) for s in ai_json["product_ids"]]
except json.JSONDecodeError:
pass
# Regex fallback for Codex {{/}} braces that break JSON parse
ai_match = re.search(r'"ai_response"\s*:\s*"((?:[^"\\]|\\.)*)"\s*,\s*"product_ids"', ai_text_response, re.DOTALL)
if ai_match:
ai_text_response = ai_match.group(1).replace('\\"', '"').replace("\\n", "\n")
pid_match = re.search(r'"product_ids"\s*:\s*\[(.*?)\]', ai_text_response if not ai_match else ai_normalized, re.DOTALL)
if pid_match and not final_product_ids:
final_product_ids = re.findall(r'"([^"]+)"', pid_match.group(1))
# Extract & filter products
enriched_products = []
......
......@@ -360,17 +360,14 @@ async def parse_ai_response_async(ai_raw_content: str, all_products: list) -> tu
mentioned_skus_in_text = set(re.findall(r"\[([A-Z0-9]+)\]", ai_text_response))
logger.info(f"📝 SKUs mentioned in ai_response: {mentioned_skus_in_text}")
# Determine target SKUs
target_skus = set()
# 1. Use explicit SKUs if available and confirmed by text, OR just explicit
if explicit_skus and isinstance(explicit_skus, list):
# Optional: Filter explicit SKUs to only those actually in text to reduce hallucination
# But if explicit list is provided, we generally trust it unless we want strict text-match
if mentioned_skus_in_text:
explicit_set = set(str(s) for s in explicit_skus)
target_skus = explicit_set.intersection(mentioned_skus_in_text)
if not target_skus: # If intersection empty, fallback to text mentions
if not target_skus:
target_skus = mentioned_skus_in_text
else:
target_skus = set(str(s) for s in explicit_skus)
......
......@@ -26,14 +26,17 @@
**🛒 HƯỚNG DẪN ĐẶT HÀNG (BẮT BUỘC KHI KHÁCH HỎI CÁCH MUA):**
**Khi đã show sản phẩm ra (có product card):**
→ "Bạn bấm vào icon 🛒 **Giỏ hàng** ở góc dưới bên phải sản phẩm, chọn size, chọn màu rồi thêm vào giỏ hàng là đặt hàng được luôn nhé!"
**Khi ĐÃ show sản phẩm (có product card trong conversation):**
→ Nói khách bấm icon 🛒 ở góc dưới bên phải hình sản phẩm, chọn size + màu rồi thêm vào giỏ hàng.
→ Hỏi khách cần xem thêm SP khác không.
**Khi chưa show sản phẩm (hỏi chung "mua sao?"):**
→ "Bạn ghé **canifa.com** để xem sản phẩm nhé! Hoặc nói mình biết bạn đang tìm gì, mình tìm giúp luôn! 😊"
**Khi CHƯA show sản phẩm (conversation mới, chưa tìm SP):**
→ Hướng dẫn 5 bước: vào canifa.com/App → tìm SP → chọn size + màu → thêm giỏ hàng → thanh toán.
→ Hỏi khách cần mình tìm SP gì không.
⚠️ **QUAN TRỌNG:**
- Khi khách hỏi "mua sao?", "đặt hàng sao?", "làm sao để mua?", "mua ở đâu?" → Trả lời ĐÚNG theo 2 case trên
- Phải TỰ VIẾT câu trả lời tự nhiên theo ngữ cảnh, KHÔNG copy nguyên mẫu!
- **CHECK context** trước: đã show SP hay chưa → chọn case A hoặc B
- **KHÔNG** hướng dẫn vào website tìm mã SP khi đã có product card → chỉ cần bấm icon 🛒
- Sau khi giới thiệu SP ưng ý → nhắc khách bấm 🛒 để đặt hàng
......
......@@ -473,6 +473,17 @@ Trước khi trả lời, bạn phải đối chiếu kết quả từ tool vớ
- **LUÔN DÙNG NGOẶC KÉP `{{` và `}}` CHO TẤT CẢ JSON OUTPUT**
- **⛔ CẤM TỰ SUY DIỄN gender/age** khi user không nói rõ. "quần váy" → gender: null. "áo lót" → gender: null. CHỈ điền khi user NÓI RÕ!
**⛔⛔⛔ TỐI HẬU THƯ — HƯỚNG DẪN ĐẶT HÀNG ⛔⛔⛔**
- Khi khách hỏi "hướng dẫn đặt hàng" mà CHƯA show sản phẩm nào → Hướng dẫn vào canifa.com/App, tìm SP, chọn size + màu, thêm giỏ hàng, thanh toán
- Khi khách hỏi "hướng dẫn đặt hàng" mà ĐÃ show sản phẩm → Nói bấm icon 🛒 ở góc dưới bên phải hình SP
- ⛔ **CẤM** nhét câu "Nếu mình đã tìm được SP cho bạn rồi..." vào khi CHƯA tìm SP nào!
- ⛔ **CẤM copy nguyên mẫu** template! TỰ VIẾT tự nhiên theo context!
**⛔⛔⛔ TỐI HẬU THƯ — CẤM BỊA MÃ SKU ⛔⛔⛔**
- Chỉ dùng mã SKU ĐÚNG NGUYÊN từ data_retrieval_tool hoặc khách đưa
- ❌ CẤM tự thêm suffix: "6TE25S001" → KHÔNG ĐƯỢC bịa thành "6TE25S001-SZ001"
- Tool tự expand biến thể, bot KHÔNG cần tự ghép color code!
**⚡ QUY TẮC [LAST_ACTION] - QUAN TRỌNG:**
- **TRƯỚC KHI TRẢ LỜI** → Đọc `[LAST_ACTION]` từ insight turn trước để hiểu context
- **TỰ SUY RA** bước tiếp theo dựa trên LAST_ACTION + tin nhắn mới của khách
......@@ -510,6 +521,11 @@ Mình check ngay cho bạn! ⚡"
---
### "Hướng dẫn đặt hàng online"
"Bạn đang muốn đặt sản phẩm gì ạ? 🛒
Bạn cho mình biết để mình tư vấn và hỗ trợ
đặt hàng luôn cho tiện nha! 😄"
⚠️ PHÂN BIỆT 2 CASE — check context trước khi trả lời:
**CASE A: ĐÃ show SP trước đó** → Nói khách bấm icon 🛒 ở góc dưới bên phải hình SP, chọn size + màu, thêm giỏ hàng. Hỏi cần xem SP khác không.
**CASE B: CHƯA show SP** → Hướng dẫn các bước: vào canifa.com/App → tìm SP → chọn size + màu → thêm giỏ hàng → thanh toán. Hỏi cần tìm SP gì không.
⛔ **TỰ VIẾT** câu trả lời tự nhiên, **KHÔNG copy nguyên** mẫu! Mỗi lần trả lời phải khác nhau, tự nhiên như đang nói chuyện.
\ No newline at end of file
......@@ -18,9 +18,9 @@ logger = logging.getLogger(__name__)
LANGFUSE_SYSTEM_PROMPT_NAME = "canifa-stylist-system-prompt"
# Cache for 5 minutes — balance between fast updates and performance
# Call force_refresh_prompts() if an immediate update is needed
CACHE_TTL = 300
# Long-lived cache (24h) — only refreshed when force_refresh_prompts() is called
# Previously 300s (5 minutes); the prompt now stays in RAM
CACHE_TTL = 86400  # 24 hours — effectively permanent
LANGFUSE_TOOL_PROMPT_MAP = {
"brand_knowledge_tool": "canifa-tool-brand-knowledge",
......
......@@ -19,6 +19,11 @@ class ProductIDStreamingCallback(AsyncCallbackHandler):
Khi có product_ids → trigger break ngay, không đợi user_insight!
"""
# Regex to match Codex reasoning objects like {'id': 'rs_...', 'type': 'reasoning', ...}
_REASONING_RE = re.compile(
r"\{['\"]id['\"]\s*:\s*['\"]rs_[^}]*['\"]type['\"]\s*:\s*['\"]reasoning['\"][^}]*\}",
)
def __init__(self):
self.accumulated_content = ""
self.product_ids_found = False
......@@ -26,16 +31,31 @@ class ProductIDStreamingCallback(AsyncCallbackHandler):
self.product_skus = []
self.product_found_event = asyncio.Event() # ✅ Event thay vì polling!
@staticmethod
def strip_reasoning(text: str) -> str:
"""Remove Codex reasoning objects from text."""
if not text or "reasoning" not in text:
return text
return ProductIDStreamingCallback._REASONING_RE.sub("", text).strip()
async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
"""
Callback khi LLM sinh token mới.
Accumulate và check regex ngay!
"""
# Responses API may send token as list instead of str
if isinstance(token, list):
token = "".join(str(t) for t in token)
elif not isinstance(token, str):
token = str(token)
self.accumulated_content += token
# Check xem đã có product_ids chưa
if not self.product_ids_found:
product_match = re.search(r'"product_ids"\s*:\s*\[(.*?)\]', self.accumulated_content, re.DOTALL)
# Strip reasoning objects (Codex) + normalize {{/}} before regex matching
clean_content = self.strip_reasoning(self.accumulated_content)
clean_content = clean_content.replace("{{", "{").replace("}}", "}")
product_match = re.search(r'"product_ids"\s*:\s*\[(.*?)\]', clean_content, re.DOTALL)
if product_match:
logger.warning(f"🎯 FOUND product_ids at {len(self.accumulated_content)} chars!")
......@@ -44,7 +64,7 @@ class ProductIDStreamingCallback(AsyncCallbackHandler):
# Extract ai_response với regex robust hơn (handle escaped quotes)
ai_text_match = re.search(
r'"ai_response"\s*:\s*"((?:[^"\\\\]|\\\\.)*)"\s*,\s*"product_ids"',
self.accumulated_content,
clean_content,
re.DOTALL,
)
......
......@@ -20,11 +20,15 @@ QUY TẮC CỰC QUAN TRỌNG KHI GỌI TOOL:
- Chỉ tạo tool_call với đúng tham số, KHÔNG trả lời người dùng trong cùng message đó.
- Sau khi tool trả kết quả mới được sinh ai_response.
⛔ CẤM TUYỆT ĐỐI TỰ BỊA MÃ SKU:
- Truyền ĐÚNG NGUYÊN MÃ khách đưa, KHÔNG tự ghép/sáng tạo suffix.
⛔⛔⛔ TỐI HẬU THƯ — CẤM TUYỆT ĐỐI TỰ BỊA MÃ SKU ⛔⛔⛔
- Truyền ĐÚNG NGUYÊN MÃ từ data_retrieval_tool trả về hoặc khách đưa.
- KHÔNG ĐƯỢC tự ghép thêm suffix -SZ001, -SK010, -SW001 hay BẤT KỲ ký tự nào!
- Tool trả về sku="6TE25S001" → skus: "6TE25S001" (ĐÚNG)
❌ skus: "6TE25S001-SZ001" (SAI — BỊA MÃ!)
❌ skus: "6TE25S001-SK010" (SAI — BỊA MÃ!)
- Khách nói "6TS25S018 còn size S không?" → skus: "6TS25S018" (ĐÚNG)
- KHÔNG ĐƯỢC bịa thành "6TS25S018-SZ001" hay bất kỳ mã nào khách KHÔNG đưa.
- Nếu khách chỉ cho base code (VD: 6TS25S018) → truyền base code đó, tool sẽ tự expand.
❌ skus: "6TS25S018-SZ001" (SAI — BỊA!)
- Tool sẽ TỰ EXPAND ra tất cả biến thể từ DB, KHÔNG cần bot tự thêm color code!
----- VÍ DỤ CHI TIẾT -----
......
......@@ -74,7 +74,8 @@ Chỉ CHUẨN HÓA khi user dùng từ đồng nghĩa RÕ RÀNG (bảng mapping
📋 BẢNG MAPPING SYNONYM → TÊN DB (tool tự xử lý, LLM giữ nguyên từ user):
áo thun, áo thun ngắn tay, áo cổ v, áo cổ tym → Áo phông
áo cổ bẻ → Áo Polo
áo bra, áo ngực, áo quây → Áo lót
áo bra, áo bra active, bra → Áo bra active (liên quan: Áo lót)
áo ngực, áo quây → Áo lót (liên quan: Áo bra active)
áo gió, áo khoác mỏng → Áo khoác gió
áo croptop, croptop, baby tee, áo lửng, áo dáng ngắn → Áo Body
áo sát nách, tanktop, tank top, áo dây, áo 2 dây, áo hai dây → Áo ba lỗ
......@@ -205,11 +206,18 @@ CASE 10: "Áo khaki"
→ description: "product_name: Áo khaki. description_text: Áo chất liệu khaki form đẹp"
→ product_line_vn: "Áo"
CASE 11: "Áo lót" hoặc "Áo bra" (NHÓM SP LIÊN QUAN)
→ description: "product_name: Áo lót/Áo bra active. description_text: Áo lót. Áo bra active thoáng mát co giãn tốt"
→ product_name: "Áo lót/Áo bra active"
⚠️ KHÔNG tự suy gender/age! User nói "áo lót" chung → để null. Chỉ điền khi user NÓI RÕ (VD: "áo lót nữ" → women, "áo lót trẻ em" → kid)
⚠️ description_text PHẢI ghi CẢ 2 tên (Áo lót + Áo bra active) để semantic search tìm được cả 2 loại!
CASE 11: "Áo bra" → product_name PHẢI là "Áo bra" (tool tự resolve → Áo bra active + Áo lót)
→ description: "product_name: Áo bra. description_text: Áo bra active thể thao thoáng mát co giãn tốt hỗ trợ tập luyện"
→ product_name: "Áo bra"
→ product_line_vn: "Áo"
⚠️ KHÔNG tự suy gender/age! User nói "áo bra" chung → để null.
CASE 12: "Áo lót" → product_name PHẢI là "Áo lót" (tool tự resolve → Áo lót + Áo bra active)
→ description: "product_name: Áo lót. description_text: Áo lót thoáng mát mềm mại thoải mái"
→ product_name: "Áo lót"
→ product_line_vn: "Áo"
⚠️ KHÔNG tự suy gender/age! Chỉ điền khi user NÓI RÕ (VD: "áo lót nữ" → women, "áo lót trẻ em" → kid)
⚠️ Tool tự tìm CẢ 2 loại (Áo lót + Áo bra active) nhờ RELATED_LINES — LLM chỉ cần giữ đúng tên user nói!
═══════════════════════════════════════════════════════════════
🎉 DỊP LỄ / SỰ KIỆN — description_text ghi lý do + gợi ý phong cách
......
......@@ -111,24 +111,19 @@ async def mock_db_search(req: MockDBRequest):
logger.info("📍 Data Retrieval Tool called")
start_time = time.time()
# Xây dựng SearchItem từ request - include all required fields
# Xây dựng SearchItem từ request - pass all required fields
search_item = SearchItem(
query=req.query or "sản phẩm",
description=f"product_name: {req.query or 'sản phẩm'}. product_line_vn: {req.query or 'sản phẩm'}",
product_name=None,
magento_ref_code=req.magento_ref_code,
price_min=req.price_min,
price_max=req.price_max,
action="search",
# Metadata fields - all required with None default
gender_by_product=None,
age_by_product=None,
product_name=None,
style=None,
master_color=None,
season=None,
material_group=None,
fitting=None,
form_neckline=None,
form_sleeve=None,
price_min=req.price_min,
price_max=req.price_max,
discount_min=None,
discount_max=None,
discovery_mode=None,
)
......@@ -173,24 +168,19 @@ async def mock_retriever_db(req: MockRetrieverRequest):
logger.info(f"📍 Retriever DB started: {req.user_query}")
start_time = time.time()
# Xây dựng SearchItem từ request - include all required fields
# Xây dựng SearchItem từ request - pass all required fields
search_item = SearchItem(
query=req.user_query,
description=f"product_name: {req.user_query}. product_line_vn: {req.user_query}",
product_name=None,
magento_ref_code=req.magento_ref_code,
price_min=req.price_min,
price_max=req.price_max,
action="search",
# Metadata fields - all required with None default
gender_by_product=None,
age_by_product=None,
product_name=None,
style=None,
master_color=None,
season=None,
material_group=None,
fitting=None,
form_neckline=None,
form_sleeve=None,
price_min=req.price_min,
price_max=req.price_max,
discount_min=None,
discount_max=None,
discovery_mode=None,
)
......
import logging
from fastapi import APIRouter, HTTPException
import re
from datetime import datetime
import httpx
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel, Field
from common.starrocks_connection import get_db_connection
logger = logging.getLogger(__name__)
router = APIRouter()
router = APIRouter(tags=["n8n-agent-tools"])
TABLE_NAME = "shared_source.magento_product_dimension_with_text_embedding"
STORE_TABLE = "shared_source.chatbot_rsa_store_schedule_with_text_embedding"
KNOWLEDGE_TABLE = "shared_source.chatbot_rsa_knowledge"
PROMOTION_TABLE = "shared_source.chatbot_rsa_salerule_with_text_embedding"
CANIFA_STOCK_API = "https://canifa.com/v1/middleware/stock_get_stock_list_parent"
SAFE_TEXT_PATTERN = re.compile(r"[^a-zA-Z0-9À-ỹ\s-]")
def _sanitize_text(value: str | None) -> str:
if not value:
return ""
return SAFE_TEXT_PATTERN.sub("", value).strip()
def _name_contains(product_name: str | None, query: str) -> bool:
return query.lower() in (product_name or "").lower()
def _build_verify_result(candidates: list[dict], sku: str, product_name: str) -> tuple[bool, str, list[dict]]:
sku_matches = [item for item in candidates if sku and (item.get("magento_ref_code") == sku)]
name_matches = [item for item in candidates if product_name and _name_contains(item.get("product_name"), product_name)]
@router.get("/api/agent/n8n/products", summary="N8N Specific: Get Sample Products")
async def n8n_get_sample_products(limit: int = 10):
if sku and product_name:
matched_products = [item for item in sku_matches if _name_contains(item.get("product_name"), product_name)]
if matched_products:
return True, "sku_name_match", matched_products
if sku_matches:
return False, "sku_found_name_mismatch", sku_matches
if name_matches:
return False, "name_found_sku_mismatch", name_matches
return False, "no_match", []
if sku:
is_valid = bool(sku_matches)
return is_valid, ("sku_match" if is_valid else "sku_not_found"), sku_matches
is_valid = bool(name_matches)
return is_valid, ("name_match" if is_valid else "name_not_found"), name_matches
class ProductVerifyRequest(BaseModel):
sku: str | None = Field(default=None, description="Magento reference code, ví dụ: 6ST24S001")
product_name: str | None = Field(default=None, description="Tên sản phẩm cần đối chiếu")
limit: int = Field(default=5, ge=1, le=20, description="Số bản ghi trả về để agent đối chiếu")
@router.get("/api/agent/n8n/products", summary="N8N Specific: Get Sample Products or Search")
async def n8n_get_sample_products(
limit: int = Query(default=10, ge=1, le=50),
q: str | None = Query(default=None, max_length=120),
):
"""
API DÀNH RIÊNG CHO N8N để lấy danh sách sản phẩm thực tế làm data cho AI sinh câu hỏi.
API DÀNH RIÊNG CHO N8N để lấy danh sách sản phẩm hoặc tìm kiếm (phục vụ AI Agent verify).
- Có thể truyền ?q=SKU hoặc từ khóa để tìm sản phẩm cụ thể.
- Code hoàn toàn tách biệt khỏi hệ thống cũ (không đụng vào logic SearchItem/Embedding)
- Trả về danh sách ngẫu nhiên từ StarRocks
"""
try:
from common.starrocks_connection import get_db_connection
db = get_db_connection()
mode = "sample"
normalized_q = None
# Lấy random các sản phẩm có hiển thị trên web, có giá
query = f"""
if q is not None:
normalized_q = _sanitize_text(q)
if not normalized_q:
raise HTTPException(status_code=400, detail="q không hợp lệ sau khi sanitize.")
mode = "search"
# Lấy các sản phẩm có hiển thị trên web, có giá
if normalized_q:
query = """
SELECT
magento_ref_code,
product_name,
description_text,
original_price,
sale_price,
master_color,
gender_by_product,
product_web_url,
product_image_url_thumbnail
FROM shared_source.magento_product_dimension_with_text_embedding
WHERE sale_price > 0
AND quantity_sold > 0
AND (magento_ref_code = %s OR LOWER(product_name) LIKE LOWER(%s))
LIMIT %s
"""
params: tuple[object, ...] = (normalized_q, f"%{normalized_q}%", limit)
else:
query = """
SELECT
magento_ref_code,
product_name,
......@@ -28,17 +111,321 @@ async def n8n_get_sample_products(limit: int = 10):
product_web_url,
product_image_url_thumbnail
FROM shared_source.magento_product_dimension_with_text_embedding
WHERE sale_price > 0 AND quantity_sold > 0
WHERE sale_price > 0
AND quantity_sold > 0
ORDER BY rand()
LIMIT {limit}
LIMIT %s
"""
products = await db.execute_query_async(query)
params = (limit,)
products = await db.execute_query_async(query, params=params)
return {
"status": "success",
"mode": mode,
"query": q,
"normalized_query": normalized_q,
"total": len(products),
"products": products
"products": products,
}
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Error in N8N Dedicated Product Fetch API: {e!s}", exc_info=True)
raise HTTPException(status_code=500, detail=f"N8N API Error: {e!s}")
raise HTTPException(status_code=500, detail=f"N8N API Error: {e!s}") from e
@router.post("/api/agent/n8n/products/verify", summary="N8N Specific: Verify product by SKU/name")
async def n8n_verify_product(req: ProductVerifyRequest):
"""
Endpoint verify cho AI Agent (n8n):
- Nhận SKU và/hoặc tên sản phẩm.
- Trả về cờ is_valid + reason để agent quyết định.
"""
try:
db = get_db_connection()
sku = _sanitize_text(req.sku)
product_name = _sanitize_text(req.product_name)
if not sku and not product_name:
raise HTTPException(status_code=400, detail="Cần truyền ít nhất một trong hai: sku hoặc product_name.")
if req.sku is not None and not sku:
raise HTTPException(status_code=400, detail="sku không hợp lệ sau khi sanitize.")
if req.product_name is not None and not product_name:
raise HTTPException(status_code=400, detail="product_name không hợp lệ sau khi sanitize.")
if sku and product_name:
query = """
SELECT
magento_ref_code,
product_name,
description_text,
original_price,
sale_price,
master_color,
gender_by_product,
product_web_url,
product_image_url_thumbnail
FROM shared_source.magento_product_dimension_with_text_embedding
WHERE sale_price > 0
AND quantity_sold > 0
AND (
magento_ref_code = %s
OR LOWER(product_name) LIKE LOWER(%s)
)
ORDER BY quantity_sold DESC
LIMIT %s
"""
params: tuple[object, ...] = (sku, f"%{product_name}%", req.limit)
elif sku:
query = """
SELECT
magento_ref_code,
product_name,
description_text,
original_price,
sale_price,
master_color,
gender_by_product,
product_web_url,
product_image_url_thumbnail
FROM shared_source.magento_product_dimension_with_text_embedding
WHERE sale_price > 0
AND quantity_sold > 0
AND magento_ref_code = %s
ORDER BY quantity_sold DESC
LIMIT %s
"""
params = (sku, req.limit)
else:
query = """
SELECT
magento_ref_code,
product_name,
description_text,
original_price,
sale_price,
master_color,
gender_by_product,
product_web_url,
product_image_url_thumbnail
FROM shared_source.magento_product_dimension_with_text_embedding
WHERE sale_price > 0
AND quantity_sold > 0
AND LOWER(product_name) LIKE LOWER(%s)
ORDER BY quantity_sold DESC
LIMIT %s
"""
params = (f"%{product_name}%", req.limit)
candidates = await db.execute_query_async(query, params=params)
is_valid, reason, matched_products = _build_verify_result(candidates, sku, product_name)
return {
"status": "success",
"is_valid": is_valid,
"reason": reason,
"input": {
"sku": req.sku,
"product_name": req.product_name,
},
"normalized_input": {
"sku": sku or None,
"product_name": product_name or None,
},
"candidate_count": len(candidates),
"candidates": candidates,
"matched_count": len(matched_products),
"matched_products": matched_products,
}
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Error in N8N Product Verify API: {e!s}", exc_info=True)
raise HTTPException(status_code=500, detail=f"N8N Verify API Error: {e!s}") from e
# =====================================================================
# TOOL 3: CHECK STOCK (Mirror of check_is_stock tool)
# =====================================================================
@router.get("/api/agent/n8n/stock", summary="N8N: Check product stock via Canifa API")
async def n8n_check_stock(
sku: str = Query(..., max_length=120, description="Mã sản phẩm (base code hoặc product_color_code). Nhiều mã cách nhau bằng dấu phẩy."),
):
"""
Proxy tới API tồn kho của canifa.com.
Nhận SKU (VD: 6TS25S018 hoặc 5TS25S023-SY322), trả về thông tin còn hàng theo size.
"""
try:
safe_sku = _sanitize_text(sku)
if not safe_sku:
raise HTTPException(status_code=400, detail="SKU không hợp lệ.")
sku_list = [s.strip() for s in safe_sku.split(",") if s.strip()]
if not sku_list:
raise HTTPException(status_code=400, detail="Không có mã sản phẩm hợp lệ.")
sku_string = ",".join(sku_list)
url = f"{CANIFA_STOCK_API}?skus={sku_string}"
async with httpx.AsyncClient(timeout=8.0) as client:
resp = await client.get(url)
resp.raise_for_status()
stock_data = resp.json()
return {
"status": "success",
"input_skus": sku_list,
"stock_data": stock_data,
}
except HTTPException:
raise
except httpx.TimeoutException:
raise HTTPException(status_code=504, detail="Canifa Stock API timeout.")
except Exception as e:
logger.error(f"❌ Error in N8N Stock API: {e!s}", exc_info=True)
raise HTTPException(status_code=500, detail=f"N8N Stock API Error: {e!s}") from e
# =====================================================================
# TOOL 4: PROMOTIONS (Mirror of canifa_get_promotions tool)
# =====================================================================
@router.get("/api/agent/n8n/promotions", summary="N8N: Get active promotions")
async def n8n_get_promotions(
date: str | None = Query(default=None, max_length=10, description="Ngày kiểm tra (YYYY-MM-DD). Mặc định: hôm nay."),
):
"""
Tra cứu các chương trình khuyến mãi đang diễn ra theo ngày.
"""
try:
target_date = date or datetime.now().strftime("%Y-%m-%d")
db = get_db_connection()
query = f"""
SELECT
name,
description,
description_full,
from_date,
to_date,
applied_channel
FROM {PROMOTION_TABLE}
WHERE %s >= DATE(from_date)
AND %s <= DATE(to_date)
ORDER BY
CASE applied_channel
WHEN 'only_online' THEN 0
WHEN 'both' THEN 1
WHEN 'unknown' THEN 2
WHEN 'only_offline' THEN 3
ELSE 4
END,
to_date ASC
LIMIT 20
"""
results = await db.execute_query_async(query, params=(target_date, target_date))
return {
"status": "success",
"check_date": target_date,
"total": len(results),
"promotions": results,
}
except Exception as e:
logger.error(f"❌ Error in N8N Promotions API: {e!s}", exc_info=True)
raise HTTPException(status_code=500, detail=f"N8N Promotions API Error: {e!s}") from e
# =====================================================================
# TOOL 5: STORE SEARCH (Mirror of canifa_store_search tool)
# =====================================================================
@router.get("/api/agent/n8n/stores", summary="N8N: Search Canifa stores by location")
async def n8n_search_stores(
location: str = Query(..., max_length=120, description="Tên quận/huyện/tỉnh/thành phố."),
):
"""
Tìm kiếm cửa hàng CANIFA theo địa điểm/khu vực.
"""
try:
clean = location.lower().strip()
for prefix in ["quận ", "huyện ", "tỉnh ", "thành phố ", "tp. ", "tp "]:
clean = clean.replace(prefix, "")
clean = clean.strip()
if not clean:
raise HTTPException(status_code=400, detail="Location không hợp lệ.")
db = get_db_connection()
query = f"""
SELECT store_name, address, city, state, phone_number,
schedule_name, time_open_today, time_close_today
FROM {STORE_TABLE}
WHERE LOWER(city) LIKE LOWER(%s)
OR LOWER(state) LIKE LOWER(%s)
OR LOWER(address) LIKE LOWER(%s)
OR LOWER(store_name) LIKE LOWER(%s)
ORDER BY state, city, store_name
LIMIT 20
"""
like_pattern = f"%{clean}%"
results = await db.execute_query_async(query, params=(like_pattern, like_pattern, like_pattern, like_pattern))
return {
"status": "success",
"location": location,
"total": len(results),
"stores": results,
}
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Error in N8N Store Search API: {e!s}", exc_info=True)
raise HTTPException(status_code=500, detail=f"N8N Store Search API Error: {e!s}") from e
# =====================================================================
# TOOL 6: KNOWLEDGE SEARCH (Mirror of canifa_knowledge_search tool)
# =====================================================================
@router.get("/api/agent/n8n/knowledge", summary="N8N: Search brand knowledge (policies, size chart, etc.)")
async def n8n_search_knowledge(
q: str = Query(..., max_length=200, description="Câu hỏi về chính sách, bảng size, KHTT, v.v."),
limit: int = Query(default=5, ge=1, le=20),
):
"""
Tra cứu thông tin thương hiệu Canifa (chính sách đổi trả, bảng size, KHTT, v.v.).
Sử dụng semantic search (vector similarity) trên kho kiến thức.
"""
try:
from common.embedding_service import create_embedding_async
query_vector = await create_embedding_async(q)
if not query_vector:
raise HTTPException(status_code=500, detail="Không tạo được embedding cho câu hỏi.")
import json as _json
v_str = _json.dumps(query_vector)
db = get_db_connection()
query = f"""
SELECT content, metadata
FROM {KNOWLEDGE_TABLE}
ORDER BY approx_cosine_similarity(embedding, {v_str}) DESC
LIMIT %s
"""
results = await db.execute_query_async(query, params=(limit,))
return {
"status": "success",
"query": q,
"total": len(results),
"knowledge": results,
}
except HTTPException:
raise
except Exception as e:
logger.error(f"❌ Error in N8N Knowledge Search API: {e!s}", exc_info=True)
raise HTTPException(status_code=500, detail=f"N8N Knowledge API Error: {e!s}") from e
......@@ -9,7 +9,7 @@ import logging
from langchain_core.language_models import BaseChatModel
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from config import OPENAI_API_KEY
from config import GROQ_API_KEY, OPENAI_API_KEY
logger = logging.getLogger(__name__)
......@@ -54,8 +54,8 @@ class LLMFactory:
logger.debug(f"♻️ Using cached model: {clean_model}")
return self._cache[cache_key]
logger.info(f"Creating new LLM instance: {clean_model}")
return self._create_instance(clean_model, streaming, json_mode, api_key)
logger.info(f"Creating new LLM instance: {model_name}")
return self._create_instance(model_name, streaming, json_mode, api_key)
def _create_instance(
self,
......@@ -77,26 +77,52 @@ class LLMFactory:
raise
def _create_openai(self, model_name: str, streaming: bool, json_mode: bool, api_key: str | None) -> BaseChatModel:
"""Create OpenAI model instance."""
"""Create OpenAI-compatible model instance (OpenAI or Groq)."""
# --- Auto-detect provider ---
is_groq = any(kw in model_name.lower() for kw in ("gpt-oss", "llama", "mixtral", "gemma", "qwen", "deepseek"))
# Also detect openai/ prefix used by Groq (e.g. "openai/gpt-oss-120b")
if model_name.startswith("openai/"):
is_groq = True
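# Examples (illustrative): "gpt-4o-mini" stays on OpenAI; "openai/gpt-oss-120b" or any "llama*"/"mixtral*" name routes to Groq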
if is_groq:
# Always use GROQ_API_KEY for Groq models (ignore api_key param which may be OpenAI key)
key = GROQ_API_KEY
base_url = "https://api.groq.com/openai/v1"
if not key:
raise ValueError("GROQ_API_KEY is required for Groq models")
else:
key = api_key or OPENAI_API_KEY
base_url = None # default OpenAI
if not key:
raise ValueError("OPENAI_API_KEY is required")
# Models that require /v1/responses API instead of /v1/chat/completions
needs_responses_api = "codex" in model_name.lower()
llm_kwargs = {
"model": model_name,
"streaming": streaming, # ← STREAMING CONFIG
"streaming": streaming,
"api_key": key,
"temperature": 0,
"max_tokens": 1500,
}
if base_url:
llm_kwargs["base_url"] = base_url
if needs_responses_api:
llm_kwargs["use_responses_api"] = True
logger.info(f"🔄 Using Responses API for model: {model_name}")
if json_mode:
llm_kwargs["model_kwargs"] = {"response_format": {"type": "json_object"}}
logger.info(f"⚙️ Initializing OpenAI in JSON mode: {model_name}")
provider = "Groq" if is_groq else "OpenAI"
logger.warning(f"🔍 DEBUG: provider={provider} | model={model_name} | base_url={base_url} | key={key[:10]}... | is_groq={is_groq}")
llm = ChatOpenAI(**llm_kwargs)
logger.info(f"✅ Created OpenAI: {model_name} | Streaming: {streaming}")
logger.info(f"✅ Created {provider}: {model_name} | Streaming: {streaming}")
return llm
def _enable_json_mode(self, llm: BaseChatModel, model_name: str) -> BaseChatModel:
......
"""
⚡ FastAPI Bottleneck Middleware
================================
Thêm vào server.py để tự động đo latency từng request.
Dùng:
1. Import vào server.py:
from common.profiler_middleware import ProfilerMiddleware
2. Thêm middleware:
app.add_middleware(ProfilerMiddleware)
3. Xem báo cáo:
GET /debug/profiler/stats
GET /debug/profiler/slow (các request chậm nhất)
GET /debug/profiler/reset
"""
import logging
import time
from collections import deque
from dataclasses import dataclass, field
from statistics import mean, median
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse
logger = logging.getLogger("profiler.middleware")
@dataclass
class RequestMetric:
path: str
method: str
duration: float
status_code: int
timestamp: float
class ProfilerMiddleware(BaseHTTPMiddleware):
"""Middleware đo latency từng request + expose metrics endpoint."""
# Class-level storage (shared across instances)
_metrics: deque = deque(maxlen=1000) # Last 1000 requests
_slow_threshold: float = 5.0 # Seconds
async def dispatch(self, request: Request, call_next):
# Skip profiler endpoints
if request.url.path.startswith("/debug/profiler"):
return await self._handle_profiler_endpoint(request)
start = time.perf_counter()
response = await call_next(request)
duration = time.perf_counter() - start
metric = RequestMetric(
path=request.url.path,
method=request.method,
duration=duration,
status_code=response.status_code,
timestamp=time.time(),
)
self._metrics.append(metric)
# Log slow requests
if duration > self._slow_threshold:
logger.warning(
f"🐌 SLOW REQUEST: {request.method} {request.url.path} "
f"took {duration:.2f}s (>{self._slow_threshold}s)"
)
# Add timing header
response.headers["X-Response-Time"] = f"{duration:.3f}s"
return response
async def _handle_profiler_endpoint(self, request: Request) -> JSONResponse:
path = request.url.path
if path == "/debug/profiler/stats":
return self._get_stats()
elif path == "/debug/profiler/slow":
return self._get_slow_requests()
elif path == "/debug/profiler/reset":
self._metrics.clear()
return JSONResponse({"status": "reset", "message": "Metrics cleared"})
return JSONResponse({"error": "Unknown profiler endpoint"}, status_code=404)
def _get_stats(self) -> JSONResponse:
if not self._metrics:
return JSONResponse({"message": "No data yet"})
metrics = list(self._metrics)
durations = [m.duration for m in metrics]
# Group by path
path_stats = {}
for m in metrics:
key = f"{m.method} {m.path}"
if key not in path_stats:
path_stats[key] = []
path_stats[key].append(m.duration)
path_summary = {}
for path, times in sorted(path_stats.items(), key=lambda x: -max(x[1])):
path_summary[path] = {
"count": len(times),
"avg": round(mean(times), 3),
"median": round(median(times), 3),
"min": round(min(times), 3),
"max": round(max(times), 3),
}
return JSONResponse({
"total_requests": len(metrics),
"overall": {
"avg": round(mean(durations), 3),
"median": round(median(durations), 3),
"min": round(min(durations), 3),
"max": round(max(durations), 3),
},
"by_path": path_summary,
"slow_count": sum(1 for d in durations if d > self._slow_threshold),
})
def _get_slow_requests(self) -> JSONResponse:
slow = [
{
"path": m.path,
"method": m.method,
"duration": round(m.duration, 3),
"status": m.status_code,
"timestamp": m.timestamp,
}
for m in self._metrics
if m.duration > self._slow_threshold
]
slow.sort(key=lambda x: -x["duration"])
return JSONResponse({"threshold": self._slow_threshold, "slow_requests": slow[:50]})
......@@ -76,7 +76,7 @@ OPENAI_API_KEY: str | None = os.getenv("OPENAI_API_KEY")
GOOGLE_API_KEY: str | None = os.getenv("GOOGLE_API_KEY")
GROQ_API_KEY: str | None = os.getenv("GROQ_API_KEY")
DEFAULT_MODEL: str = os.getenv("DEFAULT_MODEL", "gpt-5-nano")
DEFAULT_MODEL: str = os.getenv("DEFAULT_MODEL")
# DEFAULT_MODEL: str = os.getenv("DEFAULT_MODEL")
# ====================== JWT CONFIGURATION ======================
......
services:
# --- n8n Workflow Automation ---
n8n:
image: docker.n8n.io/n8nio/n8n:latest
container_name: canifa_n8n
ports:
- "5678:5678"
environment:
- N8N_HOST=0.0.0.0
- N8N_PORT=5678
- N8N_PROTOCOL=http
- WEBHOOK_URL=http://localhost:5678/
- GENERIC_TIMEZONE=Asia/Ho_Chi_Minh
- TZ=Asia/Ho_Chi_Minh
# Basic auth - change this password before using in production
- N8N_BASIC_AUTH_ACTIVE=true
- N8N_BASIC_AUTH_USER=admin
- N8N_BASIC_AUTH_PASSWORD=canifa2026
volumes:
- n8n_data:/home/node/.n8n
restart: unless-stopped
networks:
- backend_network
networks:
backend_network:
driver: bridge
ipam:
driver: default
config:
- subnet: "172.24.0.0/16"
gateway: "172.24.0.1"
volumes:
n8n_data:
driver: local
......@@ -20,3 +20,4 @@ Get-NetTCPConnection -LocalPort 5000 | ForEach-Object { Stop-Process -Id $_.Owni
taskkill /F /IM python.exe
netstat -ano | findstr :5000 | ForEach-Object { $_.Split()[-1] } | Sort-Object -Unique | ForEach-Object { taskkill /F /PID $_ }
\ No newline at end of file
# TEST STREAMING + BACKGROUND USER_INSIGHT
Write-Host "`n==== STREAMING TEST ====`n" -ForegroundColor Cyan
$query = "Ao khoac nam mua dong"
$deviceId = "test_stream_verify"
Write-Host "Sending request..." -ForegroundColor Green
$timing = Measure-Command {
$body = '{"user_query":"' + $query + '","device_id":"' + $deviceId + '"}'
$result = $body | curl.exe -s -X POST "http://localhost:5000/api/agent/chat" -H "Content-Type: application/json" --data-binary "@-"
$result | Out-Null
}
Write-Host "`nResponse Time: $($timing.TotalMilliseconds) ms" -ForegroundColor Green
Write-Host "`nCheck backend logs for:" -ForegroundColor Yellow
Write-Host " - Starting LLM streaming" -ForegroundColor Gray
Write-Host " - Regex matched product_ids" -ForegroundColor Gray
Write-Host " - BREAKING STREAM NOW" -ForegroundColor Gray
Write-Host " - Background task extraction" -ForegroundColor Gray
Write-Host "`nDone!" -ForegroundColor Green
"""
Canifa Chatbot Automated Testing Script
========================================
Tự động test chatbot bằng cách:
1. Tạo Google Sheet (nếu chưa có)
2. Ghi danh sách câu hỏi test lên sheet
3. Call API chatbot cho từng câu hỏi
4. Ghi câu trả lời vào sheet
Usage:
python backend/tests/auto_test_chatbot.py
python backend/tests/auto_test_chatbot.py --api-url http://172.16.2.207:5000
"""
from __future__ import annotations
import argparse
import json
import logging
import sys
import time
from datetime import datetime
from pathlib import Path
import gspread
import requests
from google.oauth2.service_account import Credentials
# ==============================================================================
# CONFIGURATION
# ==============================================================================
# Google Sheets
CREDENTIALS_FILE = Path(__file__).parent / "google_sheets_credentials.json"
SHEET_NAME = "Canifa Chatbot Test Results"
SCOPES = [
"https://www.googleapis.com/auth/spreadsheets",
"https://www.googleapis.com/auth/drive",
]
# Service account email - share sheet với email này
SERVICE_ACCOUNT_EMAIL = "canifa-chatbot-test@rapid-potential-480209-q7.iam.gserviceaccount.com"
# Chatbot API
DEFAULT_API_URL = "http://localhost:5000"
CHAT_ENDPOINT = "/api/agent/chat-dev"
REQUEST_TIMEOUT = 120 # seconds
# Test questions - Các câu hỏi để test chatbot
TEST_QUESTIONS = [
# Tìm kiếm sản phẩm cơ bản
"Tìm cho mình chân váy màu đỏ",
"Tìm quần màu đỏ",
"Tìm áo polo nam",
"Tìm áo khoác nữ mùa đông",
# Tư vấn thời trang
"Mình muốn mua đồ đi biển, gợi ý cho mình",
"Cho mình xem áo sơ mi đi làm",
"Gợi ý outfit đi dự tiệc",
# Các tình huống đặc biệt
"Áo size S giá dưới 500k",
"Có khuyến mãi gì không?",
"Cách đặt hàng online",
"Cửa hàng nào gần nhất ở Hà Nội",
# Edge cases
"Xin chào",
"Cảm ơn bạn",
"Tìm sản phẩm abc123 không tồn tại",
]
# ==============================================================================
# LOGGING
# ==============================================================================
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)-7s | %(message)s",
datefmt="%H:%M:%S",
)
logger = logging.getLogger(__name__)
# ==============================================================================
# GOOGLE SHEETS FUNCTIONS
# ==============================================================================
def get_gspread_client() -> gspread.Client:
"""Khởi tạo gspread client với service account credentials."""
if not CREDENTIALS_FILE.exists():
logger.error(f"❌ Không tìm thấy file credentials: {CREDENTIALS_FILE}")
logger.info(f" Hãy download JSON key từ Google Cloud Console")
logger.info(f" và đặt tại: {CREDENTIALS_FILE}")
sys.exit(1)
creds = Credentials.from_service_account_file(str(CREDENTIALS_FILE), scopes=SCOPES)
return gspread.authorize(creds)
def create_or_get_sheet(client: gspread.Client) -> gspread.Spreadsheet:
"""Tạo mới hoặc mở Google Sheet đã tồn tại."""
try:
# Thử mở sheet đã có
spreadsheet = client.open(SHEET_NAME)
logger.info(f"📄 Đã tìm thấy sheet: {SHEET_NAME}")
return spreadsheet
except gspread.SpreadsheetNotFound:
# Tạo mới
logger.info(f"📝 Tạo Google Sheet mới: {SHEET_NAME}")
spreadsheet = client.create(SHEET_NAME)
# Share sheet cho owner account
spreadsheet.share("anhvuhoang2k2@gmail.com", perm_type="user", role="writer")
logger.info(f"✅ Đã share sheet cho anhvuhoang2k2@gmail.com")
return spreadsheet
def setup_worksheet(spreadsheet: gspread.Spreadsheet, test_week: str) -> gspread.Worksheet:
"""Setup worksheet với headers và câu hỏi test."""
# Tạo hoặc lấy worksheet cho tuần test này
worksheet_title = test_week
try:
worksheet = spreadsheet.worksheet(worksheet_title)
logger.info(f"📋 Đã tìm thấy worksheet: {worksheet_title}")
except gspread.WorksheetNotFound:
worksheet = spreadsheet.add_worksheet(title=worksheet_title, rows=100, cols=6)
logger.info(f"📋 Tạo worksheet mới: {worksheet_title}")
# Setup headers
headers = [
"STT",
"Câu hỏi test",
"Tuần test & Lần test",
"Câu trả lời của chatbot",
"Thời gian response (ms)",
"Trạng thái",
]
# Check nếu headers đã có
existing = worksheet.row_values(1)
if not existing or existing[0] != "STT":
worksheet.update("A1:F1", [headers])
# Format header row - bold
worksheet.format("A1:F1", {
"textFormat": {"bold": True},
"backgroundColor": {"red": 0.2, "green": 0.6, "blue": 0.9},
"horizontalAlignment": "CENTER",
})
logger.info("✅ Đã setup headers")
return worksheet
def fill_questions(worksheet: gspread.Worksheet, questions: list[str], test_label: str) -> int:
"""Ghi danh sách câu hỏi vào sheet. Returns starting row."""
# Tìm row trống tiếp theo
all_values = worksheet.get_all_values()
start_row = len(all_values) + 1
# Chuẩn bị data
rows_data = []
for i, question in enumerate(questions, 1):
rows_data.append([
start_row - 1 + i - 1, # STT
question, # Câu hỏi
test_label, # Tuần test & Lần test
"", # Câu trả lời (sẽ fill sau)
"", # Thời gian
"⏳ Đang chờ...", # Trạng thái
])
# Batch update
cell_range = f"A{start_row}:F{start_row + len(questions) - 1}"
worksheet.update(cell_range, rows_data)
logger.info(f"✅ Đã ghi {len(questions)} câu hỏi (từ row {start_row})")
return start_row
# ==============================================================================
# CHATBOT API FUNCTIONS
# ==============================================================================
def call_chatbot(api_url: str, question: str) -> dict:
"""
Call chatbot API và trả về response.
Returns:
dict với keys: ai_response, response_time_ms, status, error
"""
url = f"{api_url}{CHAT_ENDPOINT}"
payload = {
"user_query": question,
"images": [],
}
try:
start_time = time.time()
response = requests.post(
url,
json=payload,
headers={"Content-Type": "application/json"},
timeout=REQUEST_TIMEOUT,
)
elapsed_ms = int((time.time() - start_time) * 1000)
if response.status_code == 200:
data = response.json()
ai_response = data.get("ai_response", "")
return {
"ai_response": ai_response,
"response_time_ms": elapsed_ms,
"status": "✅ OK",
"error": None,
}
else:
return {
"ai_response": f"HTTP {response.status_code}: {response.text[:200]}",
"response_time_ms": elapsed_ms,
"status": f"❌ HTTP {response.status_code}",
"error": response.text[:200],
}
except requests.exceptions.Timeout:
return {
"ai_response": "TIMEOUT - Quá thời gian chờ",
"response_time_ms": REQUEST_TIMEOUT * 1000,
"status": "⏰ Timeout",
"error": "Request timeout",
}
except requests.exceptions.ConnectionError:
return {
"ai_response": f"CONNECTION ERROR - Không thể kết nối tới {url}",
"response_time_ms": 0,
"status": "🔴 Connection Error",
"error": f"Cannot connect to {url}",
}
except Exception as e:
return {
"ai_response": f"ERROR: {str(e)[:200]}",
"response_time_ms": 0,
"status": f"❌ Error",
"error": str(e)[:200],
}
# ==============================================================================
# MAIN TEST RUNNER
# ==============================================================================
def run_tests(api_url: str, test_week: str, test_round: int) -> None:
"""Chạy toàn bộ test flow."""
test_label = f"{test_week} - Lần {test_round}"
logger.info("=" * 60)
logger.info(f"🚀 BẮT ĐẦU TEST: {test_label}")
logger.info(f" API URL: {api_url}")
logger.info(f" Số câu hỏi: {len(TEST_QUESTIONS)}")
logger.info("=" * 60)
# Step 1: Setup Google Sheets
logger.info("\n📊 [Step 1] Khởi tạo Google Sheets...")
client = get_gspread_client()
spreadsheet = create_or_get_sheet(client)
worksheet = setup_worksheet(spreadsheet, test_week)
# Step 2: Fill câu hỏi
logger.info("\n📝 [Step 2] Ghi câu hỏi test...")
start_row = fill_questions(worksheet, TEST_QUESTIONS, test_label)
# Step 3: Call chatbot API & update kết quả
logger.info(f"\n🤖 [Step 3] Bắt đầu test {len(TEST_QUESTIONS)} câu hỏi...\n")
success_count = 0
total_time = 0
for i, question in enumerate(TEST_QUESTIONS):
row_idx = start_row + i
logger.info(f" [{i+1}/{len(TEST_QUESTIONS)}] Testing: '{question[:50]}...'")
# Call API
result = call_chatbot(api_url, question)
# Truncate response for the sheet (max 5000 chars)
ai_response = result["ai_response"]
if len(ai_response) > 5000:
ai_response = ai_response[:5000] + "... (truncated)"
# Update sheet
worksheet.update(f"D{row_idx}", [[ai_response]])
worksheet.update(f"E{row_idx}", [[result["response_time_ms"]]])
worksheet.update(f"F{row_idx}", [[result["status"]]])
# Stats
if "OK" in result["status"]:
success_count += 1
total_time += result["response_time_ms"]
logger.info(f" → {result['status']} ({result['response_time_ms']}ms)")
# Rate limit: đợi 1 giây giữa các request
if i < len(TEST_QUESTIONS) - 1:
time.sleep(1)
# Step 4: Summary
logger.info("\n" + "=" * 60)
logger.info("📊 KẾT QUẢ TEST:")
logger.info(f" ✅ Thành công: {success_count}/{len(TEST_QUESTIONS)}")
logger.info(f" ❌ Thất bại: {len(TEST_QUESTIONS) - success_count}/{len(TEST_QUESTIONS)}")
logger.info(f" ⏱️ Tổng thời gian: {total_time}ms")
logger.info(f" 📈 Trung bình: {total_time // len(TEST_QUESTIONS)}ms/câu")
logger.info(f"\n 📄 Sheet URL: {spreadsheet.url}")
logger.info("=" * 60)
# ==============================================================================
# CLI ENTRY POINT
# ==============================================================================
def main():
parser = argparse.ArgumentParser(description="Canifa Chatbot Automated Testing")
parser.add_argument(
"--api-url",
default=DEFAULT_API_URL,
help=f"Base URL of chatbot API (default: {DEFAULT_API_URL})",
)
parser.add_argument(
"--week",
default=None,
help="Test week label (default: auto-calculated from current date)",
)
parser.add_argument(
"--round",
type=int,
default=1,
help="Test round number (default: 1)",
)
args = parser.parse_args()
# Auto-calculate test week
if args.week is None:
now = datetime.now()
week_num = now.isocalendar()[1]
args.week = f"Tuần {week_num} ({now.strftime('%m/%Y')})"
run_tests(
api_url=args.api_url,
test_week=args.week,
test_round=args.round,
)
if __name__ == "__main__":
main()
"""
🔬 CANIFA Chatbot - Bottleneck Profiler
========================================
Script đo bottleneck end-to-end cho chatbot API.
Đo:
1. API response time (tổng)
2. Time-to-first-token (TTFT) cho streaming
3. Tool execution latency
4. Memory & CPU usage
5. Async event loop lag
Dùng:
python tests/profiler_bottleneck.py # Chạy mặc định
python tests/profiler_bottleneck.py --queries 5 --url http://localhost:8001
python tests/profiler_bottleneck.py --profile cprofile # Deep profile với cProfile
python tests/profiler_bottleneck.py --profile pyinstrument # Flame graph
"""
import argparse
import asyncio
import json
import logging
import os
import platform
import statistics
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
# Add project root
sys.path.insert(0, str(Path(__file__).parent.parent))
try:
import httpx
except ImportError:
print("❌ Cần cài httpx: pip install httpx")
sys.exit(1)
try:
import psutil
except ImportError:
psutil = None
print("⚠️ psutil không có, bỏ qua CPU/Memory monitoring")
# =====================================================================
# CONFIG
# =====================================================================
DEFAULT_URL = "http://localhost:5000"
DEFAULT_QUERIES = [
"Có áo polo nam màu xanh navy không?",
"Tìm váy liền cho bé gái 5 tuổi",
"Gợi ý outfit đi biển mùa hè",
"Áo khoác gió nam size L giá dưới 500k",
"Quần jeans nữ ống rộng màu đen",
]
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("profiler")
# =====================================================================
# DATA MODELS
# =====================================================================
@dataclass
class QueryResult:
query: str
total_time: float = 0.0
ttft: float = 0.0 # Time to first token
token_count: int = 0
status_code: int = 0
error: str = ""
has_products: bool = False
memory_before_mb: float = 0.0
memory_after_mb: float = 0.0
cpu_percent: float = 0.0
chunks_received: int = 0
@dataclass
class ProfileReport:
results: list[QueryResult] = field(default_factory=list)
total_duration: float = 0.0
system_info: dict = field(default_factory=dict)
@property
def avg_response_time(self) -> float:
times = [r.total_time for r in self.results if r.status_code == 200]
return statistics.mean(times) if times else 0
@property
def p50_response_time(self) -> float:
times = sorted([r.total_time for r in self.results if r.status_code == 200])
return times[len(times) // 2] if times else 0
@property
def p95_response_time(self) -> float:
times = sorted([r.total_time for r in self.results if r.status_code == 200])
idx = int(len(times) * 0.95)
return times[min(idx, len(times) - 1)] if times else 0
@property
def avg_ttft(self) -> float:
times = [r.ttft for r in self.results if r.ttft > 0]
return statistics.mean(times) if times else 0
# =====================================================================
# PROFILER CORE
# =====================================================================
class BottleneckProfiler:
"""Profiler cho CANIFA Chatbot API."""
def __init__(self, base_url: str, conversation_id: str = "profiler_test"):
self.base_url = base_url.rstrip("/")
self.conversation_id = conversation_id
self.process = psutil.Process() if psutil else None
def _get_memory_mb(self) -> float:
if self.process:
return self.process.memory_info().rss / (1024 * 1024)
return 0
def _get_cpu_percent(self) -> float:
if self.process:
return self.process.cpu_percent(interval=0.1)
return 0
async def profile_streaming_query(self, query: str, client: httpx.AsyncClient) -> QueryResult:
"""Profile một query với streaming response."""
result = QueryResult(query=query)
result.memory_before_mb = self._get_memory_mb()
payload = {
"user_query": query,
"user_id": "profiler_bot",
}
start = time.perf_counter()
first_token_time = None
try:
async with client.stream(
"POST",
f"{self.base_url}/api/agent/chat-dev",
json=payload,
timeout=120.0,
) as response:
result.status_code = response.status_code
if response.status_code != 200:
result.error = f"HTTP {response.status_code}"
result.total_time = time.perf_counter() - start
return result
full_content = ""
async for chunk in response.aiter_text():
if first_token_time is None and chunk.strip():
first_token_time = time.perf_counter()
result.ttft = first_token_time - start
result.chunks_received += 1
full_content += chunk
result.total_time = time.perf_counter() - start
# Check for product IDs in response
result.has_products = "product_ids" in full_content.lower() or "sku" in full_content.lower()
result.token_count = len(full_content.split())
except httpx.TimeoutException:
result.error = "TIMEOUT (>120s)"
result.total_time = time.perf_counter() - start
except Exception as e:
result.error = str(e)
result.total_time = time.perf_counter() - start
result.memory_after_mb = self._get_memory_mb()
result.cpu_percent = self._get_cpu_percent()
return result
async def profile_non_streaming_query(self, query: str, client: httpx.AsyncClient) -> QueryResult:
"""Profile một query với non-streaming response (fallback)."""
result = QueryResult(query=query)
result.memory_before_mb = self._get_memory_mb()
payload = {
"user_query": query,
"user_id": "profiler_bot",
}
start = time.perf_counter()
try:
response = await client.post(
f"{self.base_url}/api/agent/chat-dev",
json=payload,
timeout=120.0,
)
result.total_time = time.perf_counter() - start
result.status_code = response.status_code
result.ttft = result.total_time # Non-streaming: TTFT = total
if response.status_code == 200:
body = response.text
result.has_products = "product_ids" in body.lower() or "sku" in body.lower()
result.token_count = len(body.split())
except httpx.TimeoutException:
result.error = "TIMEOUT (>120s)"
result.total_time = time.perf_counter() - start
except Exception as e:
result.error = str(e)
result.total_time = time.perf_counter() - start
result.memory_after_mb = self._get_memory_mb()
result.cpu_percent = self._get_cpu_percent()
return result
async def run(self, queries: list[str], use_streaming: bool = True) -> ProfileReport:
"""Chạy profiling cho danh sách queries."""
report = ProfileReport()
report.system_info = self._collect_system_info()
overall_start = time.perf_counter()
async with httpx.AsyncClient() as client:
# Health check
try:
health = await client.get(f"{self.base_url}/health", timeout=5.0)
if health.status_code != 200:
logger.error(f"❌ Server không healthy: {health.status_code}")
return report
logger.info(f"✅ Server healthy: {self.base_url}")
except Exception as e:
logger.error(f"❌ Không kết nối được server: {e}")
return report
# Run queries sequentially
for i, query in enumerate(queries, 1):
logger.info(f"\n{'='*60}")
logger.info(f"📝 Query {i}/{len(queries)}: {query}")
logger.info(f"{'='*60}")
if use_streaming:
result = await self.profile_streaming_query(query, client)
else:
result = await self.profile_non_streaming_query(query, client)
report.results.append(result)
# Log kết quả ngay
status = "✅" if not result.error else "❌"
logger.info(f"{status} Total: {result.total_time:.2f}s | "
f"TTFT: {result.ttft:.2f}s | "
f"Chunks: {result.chunks_received} | "
f"Tokens: ~{result.token_count}")
if result.error:
logger.info(f" ⚠️ Error: {result.error}")
if psutil:
mem_delta = result.memory_after_mb - result.memory_before_mb
logger.info(f" 💾 Memory: {result.memory_after_mb:.1f}MB "
f"(Δ{mem_delta:+.1f}MB) | CPU: {result.cpu_percent:.1f}%")
# Pause between queries so the server is not overloaded
if i < len(queries):
await asyncio.sleep(2)
report.total_duration = time.perf_counter() - overall_start
return report
def _collect_system_info(self) -> dict:
info = {
"platform": platform.platform(),
"python": platform.python_version(),
"cpu_count": os.cpu_count(),
}
if psutil:
vm = psutil.virtual_memory()
info["total_ram_gb"] = round(vm.total / (1024**3), 1)
info["available_ram_gb"] = round(vm.available / (1024**3), 1)
return info
# =====================================================================
# EVENT LOOP LAG DETECTOR
# =====================================================================
class EventLoopLagMonitor:
"""Đo async event loop lag - phát hiện blocking code."""
def __init__(self, threshold_ms: float = 100):
self.threshold_ms = threshold_ms
self.lags: list[float] = []
self._running = False
async def start(self):
self._running = True
while self._running:
t1 = time.perf_counter()
await asyncio.sleep(0.01) # 10ms expected
t2 = time.perf_counter()
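# Any elapsed time beyond the requested 10 ms means something blocked the event loop.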
lag_ms = (t2 - t1 - 0.01) * 1000
if lag_ms > self.threshold_ms:
self.lags.append(lag_ms)
logger.warning(f"⚡ Event loop lag: {lag_ms:.0f}ms (threshold: {self.threshold_ms}ms)")
def stop(self):
self._running = False
@property
def summary(self) -> str:
if not self.lags:
return "✅ Không phát hiện event loop lag"
return (
f"⚠️ {len(self.lags)} lần lag > {self.threshold_ms}ms | "
f"Max: {max(self.lags):.0f}ms | Avg: {statistics.mean(self.lags):.0f}ms"
)
# =====================================================================
# REPORT GENERATOR
# =====================================================================
def print_report(report: ProfileReport):
"""In báo cáo bottleneck đẹp."""
print("\n" + "=" * 70)
print("🔬 BÁO CÁO BOTTLENECK PROFILING - CANIFA CHATBOT")
print("=" * 70)
# System info
si = report.system_info
print(f"\n📦 System: {si.get('platform', 'N/A')}")
print(f" Python: {si.get('python', 'N/A')} | CPUs: {si.get('cpu_count', 'N/A')}")
if "total_ram_gb" in si:
print(f" RAM: {si['available_ram_gb']}GB free / {si['total_ram_gb']}GB total")
# Per-query results
print(f"\n{'─'*70}")
print(f"{'Query':<35} {'Total':>8} {'TTFT':>8} {'Chunks':>8} {'Status':>8}")
print(f"{'─'*70}")
for r in report.results:
status = "✅" if not r.error else "❌"
q = r.query[:33] + ".." if len(r.query) > 33 else r.query
print(f"{q:<35} {r.total_time:>7.2f}s {r.ttft:>7.2f}s {r.chunks_received:>7} {status:>8}")
# Summary stats
print(f"\n{'='*70}")
print("📊 TỔNG KẾT")
print(f"{'='*70}")
successful = [r for r in report.results if r.status_code == 200]
failed = [r for r in report.results if r.error]
print(f"\n Queries: {len(report.results)} total | {len(successful)} thành công | {len(failed)} lỗi")
print(f" Tổng thời gian: {report.total_duration:.1f}s")
if successful:
times = [r.total_time for r in successful]
print(f"\n ⏱️ Response Time:")
print(f" Average: {report.avg_response_time:.2f}s")
print(f" P50: {report.p50_response_time:.2f}s")
print(f" P95: {report.p95_response_time:.2f}s")
print(f" Min: {min(times):.2f}s")
print(f" Max: {max(times):.2f}s")
ttfts = [r.ttft for r in successful if r.ttft > 0]
if ttfts:
print(f"\n 🚀 Time-to-First-Token (TTFT):")
print(f" Average: {report.avg_ttft:.2f}s")
print(f" Min: {min(ttfts):.2f}s")
print(f" Max: {max(ttfts):.2f}s")
if psutil:
mems = [r.memory_after_mb for r in successful]
cpus = [r.cpu_percent for r in successful if r.cpu_percent > 0]
print(f"\n 💾 Resource Usage:")
print(f" Memory: {min(mems):.0f} - {max(mems):.0f} MB")
if cpus:
print(f" CPU: {min(cpus):.0f} - {max(cpus):.0f}%")
# Bottleneck analysis
print(f"\n{'='*70}")
print("🎯 PHÂN TÍCH BOTTLENECK")
print(f"{'='*70}")
if report.avg_response_time > 15:
print("\n 🔴 CRITICAL: Response time > 15s")
print(" → Kiểm tra: LLM model latency, tool execution, network")
elif report.avg_response_time > 10:
print("\n 🟡 WARNING: Response time 10-15s")
print(" → Bình thường cho multi-tool + LLM generation")
print(" → Tối ưu: cache warming, embedding local, smaller model")
elif report.avg_response_time > 5:
print("\n 🟢 GOOD: Response time 5-10s")
print(" → Tốt cho agentic flow có tool calls")
else:
print("\n ⭐ EXCELLENT: Response time < 5s")
if report.avg_ttft > 8:
print("\n 🔴 TTFT > 8s: User phải đợi quá lâu")
print(" → Xem xét: streaming optimization, early return pattern")
elif report.avg_ttft > 4:
print("\n 🟡 TTFT 4-8s: Chấp nhận được nhưng có thể cải thiện")
else:
print("\n 🟢 TTFT < 4s: Tốt!")
# Recommendations
print(f"\n{'='*70}")
print("💡 KHUYẾN NGHỊ PYTHON PROFILING TOOLS")
print(f"{'='*70}")
print("""
📦 Đã test xong E2E bottleneck. Để đi sâu hơn, dùng các tools sau:
1. py-spy (Sampling profiler - KHÔNG cần sửa code)
pip install py-spy
py-spy top --pid <PID> # Real-time CPU profiling
py-spy record -o flame.svg -- python server.py # Flame graph
2. pyinstrument (Call-stack profiler - SIÊU DỄ DÙNG)
pip install pyinstrument
# Wrap trong code:
from pyinstrument import Profiler
profiler = Profiler(async_mode="enabled")
profiler.start()
# ... code ...
profiler.stop()
profiler.print() # Hoặc profiler.output_html()
3. yappi (Async-aware profiler - TỐT NHẤT cho asyncio)
pip install yappi
yappi.set_clock_type("wall") # Real time, không chỉ CPU
yappi.start()
# ... run queries ...
yappi.stop()
yappi.get_func_stats().print_all()
4. scalene (AI-powered profiler - CPU + Memory + GPU)
pip install scalene
scalene server.py # Tự analyze bottleneck bằng AI!
5. memray (Memory profiler - từ Bloomberg)
pip install memray
memray run server.py
memray flamegraph output.bin # Memory flame graph
6. OpenTelemetry (Distributed tracing - ĐÃ CÓ TRONG requirements.txt)
→ Đã có opentelemetry-sdk, chỉ cần enable traces!
""")
print("=" * 70)
print("✅ Profiling hoàn tất!")
print("=" * 70)
# =====================================================================
# MAIN
# =====================================================================
async def main():
parser = argparse.ArgumentParser(description="CANIFA Chatbot Bottleneck Profiler")
parser.add_argument("--url", default=DEFAULT_URL, help="Base URL of the API")
parser.add_argument("--queries", type=int, default=5, help="Number of queries to test")
parser.add_argument("--no-stream", action="store_true", help="Disable streaming")
parser.add_argument("--custom-query", type=str, help="Test with a custom query")
parser.add_argument("--profile", choices=["cprofile", "pyinstrument", "yappi"],
help="Enable deep profiling mode")
parser.add_argument("--monitor-lag", action="store_true", help="Monitor event loop lag")
parser.add_argument("--output", type=str, help="Save JSON report to file")
args = parser.parse_args()
# Build query list
if args.custom_query:
queries = [args.custom_query]
else:
queries = DEFAULT_QUERIES[: args.queries]
print("\n" + "=" * 60)
print("🔬 CANIFA CHATBOT - BOTTLENECK PROFILER")
print("=" * 60)
print(f"🌐 Target: {args.url}")
print(f"📝 Queries: {len(queries)}")
print(f"📡 Mode: {'Non-streaming' if args.no_stream else 'Streaming'}")
if args.profile:
print(f"🔍 Profiler: {args.profile}")
print(f"{'='*60}\n")
profiler = BottleneckProfiler(args.url)
# Optional event loop lag monitor
lag_monitor = None
lag_task = None
if args.monitor_lag:
lag_monitor = EventLoopLagMonitor(threshold_ms=100)
lag_task = asyncio.create_task(lag_monitor.start())
# Optional deep profiling
deep_profiler = None
if args.profile == "pyinstrument":
try:
from pyinstrument import Profiler as PyProfiler
deep_profiler = PyProfiler(async_mode="enabled")
deep_profiler.start()
logger.info("🔍 pyinstrument profiler started")
except ImportError:
logger.warning("⚠️ pyinstrument chưa cài: pip install pyinstrument")
elif args.profile == "yappi":
try:
import yappi
yappi.set_clock_type("wall")
yappi.start()
logger.info("🔍 yappi profiler started")
except ImportError:
logger.warning("⚠️ yappi chưa cài: pip install yappi")
# Run profiling
report = await profiler.run(queries, use_streaming=not args.no_stream)
# Stop monitors
if lag_monitor:
lag_monitor.stop()
if lag_task:
lag_task.cancel()
# Stop deep profilers
if args.profile == "pyinstrument" and deep_profiler:
deep_profiler.stop()
print("\n" + "=" * 60)
print("🔍 PYINSTRUMENT RESULTS")
print("=" * 60)
deep_profiler.print()
# Save HTML report
html = deep_profiler.output_html()
with open("profiler_report.html", "w") as f:
f.write(html)
print("📄 HTML report saved: profiler_report.html")
elif args.profile == "yappi":
try:
import yappi
yappi.stop()
print("\n" + "=" * 60)
print("🔍 YAPPI TOP 30 FUNCTIONS (by total time)")
print("=" * 60)
stats = yappi.get_func_stats()
stats.sort("ttot", "desc")
stats.print_all(
columns={
0: ("name", 60),
1: ("ncall", 8),
2: ("ttot", 10),
3: ("tsub", 10),
4: ("tavg", 10),
},
out=sys.stdout,
)
except ImportError:
pass
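elif args.profile == "cprofile" and deep_profiler:
    # Companion to the cProfile branch above: dump the hottest functions by cumulative time.
    import pstats
    deep_profiler.disable()
    print("\n" + "=" * 60)
    print("🔍 CPROFILE TOP 30 FUNCTIONS (by cumulative time)")
    print("=" * 60)
    pstats.Stats(deep_profiler).sort_stats("cumulative").print_stats(30)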
# Print report
print_report(report)
# Event loop lag summary
if lag_monitor:
print(f"\n⚡ Event Loop Lag: {lag_monitor.summary}")
# Save JSON output
if args.output:
output_data = {
"system": report.system_info,
"summary": {
"total_queries": len(report.results),
"avg_response_time": report.avg_response_time,
"p50": report.p50_response_time,
"p95": report.p95_response_time,
"avg_ttft": report.avg_ttft,
"total_duration": report.total_duration,
},
"results": [
{
"query": r.query,
"total_time": r.total_time,
"ttft": r.ttft,
"tokens": r.token_count,
"chunks": r.chunks_received,
"status": r.status_code,
"error": r.error,
"has_products": r.has_products,
"memory_mb": r.memory_after_mb,
"cpu_percent": r.cpu_percent,
}
for r in report.results
],
}
with open(args.output, "w", encoding="utf-8") as f:
json.dump(output_data, f, indent=2, ensure_ascii=False)
print(f"\n📄 JSON report saved: {args.output}")
if __name__ == "__main__":
if platform.system() == "Windows":
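# Likely for compatibility: the default Proactor loop on Windows lacks add_reader/add_writer, which some async libraries need.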
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())
"""
🔬 CANIFA Chatbot - STRESS PROFILER (Clear Cache + Multi-User)
===============================================================
Test bottleneck kỹ càng:
1. Clear Redis cache (cold start thật)
2. Test 1 user tuần tự (baseline)
3. Test multi-user đồng thời (concurrent)
4. So sánh Cold vs Warm performance
Dùng:
python tests/profiler_stress.py # Full test (clear cache + 1 user + 3 users)
python tests/profiler_stress.py --users 5 # 5 users đồng thời
python tests/profiler_stress.py --skip-clear # Không clear cache
python tests/profiler_stress.py --warm-only # Chỉ test warm (có cache)
"""
import argparse
import asyncio
import json
import logging
import os
import platform
import statistics
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
import httpx
try:
import redis.asyncio as aioredis
except ImportError:
aioredis = None
print("⚠️ redis package không có, skip cache clear")
try:
import psutil
except ImportError:
psutil = None
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("stress_profiler")
# =====================================================================
# CONFIG
# =====================================================================
API_URL = "http://localhost:5000"
CHAT_ENDPOINT = "/api/agent/chat-dev"
# Redis config (from config.py defaults)
REDIS_HOST = os.getenv("REDIS_CACHE_URL", "172.16.2.192")
REDIS_PORT = int(os.getenv("REDIS_CACHE_PORT", "6379"))
REDIS_DB = int(os.getenv("REDIS_CACHE_DB", "2"))
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", None)
REDIS_USERNAME = os.getenv("REDIS_USERNAME", None)
TEST_QUERIES = [
"Có áo polo nam màu xanh navy không?",
"Tìm váy liền cho bé gái 5 tuổi",
"Gợi ý outfit đi biển mùa hè",
"Áo khoác gió nam size L giá dưới 500k",
"Quần jeans nữ ống rộng màu đen",
]
# =====================================================================
# DATA
# =====================================================================
@dataclass
class QueryMetric:
query: str
user_id: str
total_time: float = 0.0
status_code: int = 0
error: str = ""
has_products: bool = False
tokens: int = 0
phase: str = "" # "cold" or "warm"
@dataclass
class PhaseResult:
phase: str
metrics: list[QueryMetric] = field(default_factory=list)
@property
def success_times(self):
return [m.total_time for m in self.metrics if m.status_code == 200]
@property
def avg(self):
t = self.success_times
return statistics.mean(t) if t else 0
@property
def p50(self):
t = sorted(self.success_times)
return t[len(t) // 2] if t else 0
@property
def p95(self):
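# Simple index-based P95; with only a handful of samples this is effectively the max.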
t = sorted(self.success_times)
return t[int(len(t) * 0.95)] if t else 0
@property
def min_t(self):
t = self.success_times
return min(t) if t else 0
@property
def max_t(self):
t = self.success_times
return max(t) if t else 0
@property
def success_count(self):
return sum(1 for m in self.metrics if m.status_code == 200)
@property
def error_count(self):
return sum(1 for m in self.metrics if m.error)
# =====================================================================
# CACHE MANAGEMENT
# =====================================================================
async def clear_redis_cache() -> dict:
"""Clear toàn bộ Redis cache (resp_cache + emb_cache)."""
if not aioredis:
return {"status": "skipped", "reason": "redis package not installed"}
try:
conn_kwargs = {
"host": REDIS_HOST,
"port": REDIS_PORT,
"db": REDIS_DB,
"decode_responses": True,
"socket_connect_timeout": 5,
}
if REDIS_PASSWORD:
conn_kwargs["password"] = REDIS_PASSWORD
if REDIS_USERNAME:
conn_kwargs["username"] = REDIS_USERNAME
client = aioredis.Redis(**conn_kwargs)
await client.ping()
# Count keys before
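# KEYS is O(N) and blocks Redis; fine for a one-off test utility, prefer SCAN in production code.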
resp_keys = await client.keys("resp_cache:*")
emb_keys = await client.keys("emb_cache:*")
counts = {
"resp_cache_before": len(resp_keys),
"emb_cache_before": len(emb_keys),
}
# Delete all cache keys (but keep other keys like prompt_version)
deleted = 0
if resp_keys:
deleted += await client.delete(*resp_keys)
if emb_keys:
deleted += await client.delete(*emb_keys)
counts["deleted"] = deleted
# Verify
resp_after = await client.keys("resp_cache:*")
emb_after = await client.keys("emb_cache:*")
counts["resp_cache_after"] = len(resp_after)
counts["emb_cache_after"] = len(emb_after)
await client.aclose()
return {"status": "cleared", **counts}
except Exception as e:
return {"status": "error", "error": str(e)}
async def get_cache_stats() -> dict:
"""Lấy thông tin cache hiện tại."""
if not aioredis:
return {"status": "unavailable"}
try:
conn_kwargs = {
"host": REDIS_HOST,
"port": REDIS_PORT,
"db": REDIS_DB,
"decode_responses": True,
"socket_connect_timeout": 5,
}
if REDIS_PASSWORD:
conn_kwargs["password"] = REDIS_PASSWORD
if REDIS_USERNAME:
conn_kwargs["username"] = REDIS_USERNAME
client = aioredis.Redis(**conn_kwargs)
await client.ping()
resp_keys = await client.keys("resp_cache:*")
emb_keys = await client.keys("emb_cache:*")
await client.aclose()
return {
"resp_cache_keys": len(resp_keys),
"emb_cache_keys": len(emb_keys),
}
except Exception as e:
return {"error": str(e)}
# =====================================================================
# QUERY RUNNER
# =====================================================================
async def run_query(client: httpx.AsyncClient, query: str, user_id: str, phase: str) -> QueryMetric:
"""Chạy 1 query, đo thời gian."""
metric = QueryMetric(query=query, user_id=user_id, phase=phase)
payload = {"user_query": query, "user_id": user_id}
start = time.perf_counter()
try:
response = await client.post(
f"{API_URL}{CHAT_ENDPOINT}",
json=payload,
timeout=180.0,
)
metric.total_time = time.perf_counter() - start
metric.status_code = response.status_code
if response.status_code == 200:
body = response.text
metric.has_products = "product_ids" in body.lower()
metric.tokens = len(body.split())
except httpx.TimeoutException:
metric.total_time = time.perf_counter() - start
metric.error = "TIMEOUT (>180s)"
except Exception as e:
metric.total_time = time.perf_counter() - start
metric.error = str(e)
return metric
# =====================================================================
# TEST PHASES
# =====================================================================
async def run_sequential(queries: list[str], user_id: str, phase: str) -> PhaseResult:
"""Chạy queries tuần tự - 1 user."""
result = PhaseResult(phase=phase)
async with httpx.AsyncClient() as client:
for i, query in enumerate(queries, 1):
logger.info(f" [{phase.upper()}] {i}/{len(queries)}: {query[:40]}...")
metric = await run_query(client, query, user_id, phase)
result.metrics.append(metric)
status = "✅" if not metric.error else "❌"
logger.info(f" {status} {metric.total_time:.2f}s | Products: {metric.has_products}")
# Pause 1s between queries
if i < len(queries):
await asyncio.sleep(1)
return result
async def run_concurrent(queries: list[str], num_users: int, phase: str) -> PhaseResult:
"""Chạy queries đồng thời - nhiều users."""
result = PhaseResult(phase=phase)
async def user_worker(user_idx: int, query: str):
user_id = f"stress_user_{user_idx}"
async with httpx.AsyncClient() as client:
metric = await run_query(client, query, user_id, phase)
return metric
# Each user sends one query at the same time
tasks = []
for i in range(num_users):
query = queries[i % len(queries)]
tasks.append(user_worker(i + 1, query))
logger.info(f" [{phase.upper()}] Launching {num_users} concurrent users...")
start = time.perf_counter()
metrics = await asyncio.gather(*tasks, return_exceptions=True)
total = time.perf_counter() - start
logger.info(f" [{phase.upper()}] All {num_users} users finished in {total:.1f}s")
for m in metrics:
if isinstance(m, Exception):
result.metrics.append(QueryMetric(
query="error", user_id="unknown", error=str(m), phase=phase
))
else:
result.metrics.append(m)
return result
# =====================================================================
# REPORT
# =====================================================================
def print_phase_table(phase: PhaseResult):
"""In bảng kết quả cho 1 phase."""
print(f"\n {'User':<15} {'Query':<35} {'Time':>8} {'Status':>8}")
print(f" {'─'*68}")
for m in phase.metrics:
q = m.query[:33] + ".." if len(m.query) > 33 else m.query
status = "✅" if not m.error else f"❌ {m.error[:10]}"
print(f" {m.user_id:<15} {q:<35} {m.total_time:>7.2f}s {status:>8}")
def print_comparison(phases: list[PhaseResult]):
"""So sánh giữa các phases."""
print(f"\n {'Phase':<25} {'Avg':>8} {'P50':>8} {'P95':>8} {'Min':>8} {'Max':>8} {'OK':>5} {'Err':>5}")
print(f" {'─'*78}")
for p in phases:
if p.success_times:
print(
f" {p.phase:<25} {p.avg:>7.2f}s {p.p50:>7.2f}s {p.p95:>7.2f}s "
f"{p.min_t:>7.2f}s {p.max_t:>7.2f}s {p.success_count:>5} {p.error_count:>5}"
)
else:
print(f" {p.phase:<25} {'N/A':>8} {'N/A':>8} {'N/A':>8} {'N/A':>8} {'N/A':>8} {0:>5} {p.error_count:>5}")
def print_full_report(phases: list[PhaseResult], cache_before: dict, cache_after: dict):
"""In báo cáo tổng hợp."""
print("\n" + "=" * 75)
print("🔬 BÁO CÁO STRESS TEST - CANIFA CHATBOT")
print("=" * 75)
# System
print(f"\n📦 System: {platform.platform()}")
print(f" Python: {platform.python_version()} | CPUs: {os.cpu_count()}")
if psutil:
vm = psutil.virtual_memory()
print(f" RAM: {vm.available / (1024**3):.1f}GB free / {vm.total / (1024**3):.1f}GB total")
# Cache info
print(f"\n📦 Redis Cache:")
print(f" Before: resp={cache_before.get('resp_cache_keys', '?')}, emb={cache_before.get('emb_cache_keys', '?')}")
print(f" After: resp={cache_after.get('resp_cache_keys', '?')}, emb={cache_after.get('emb_cache_keys', '?')}")
# Per-phase details
for phase in phases:
print(f"\n{'─'*75}")
print(f"📊 Phase: {phase.phase}")
print_phase_table(phase)
# Comparison
print(f"\n{'='*75}")
print("📊 SO SÁNH CÁC PHASE")
print(f"{'='*75}")
print_comparison(phases)
# Bottleneck analysis
print(f"\n{'='*75}")
print("🎯 PHÂN TÍCH BOTTLENECK")
print(f"{'='*75}")
cold = next((p for p in phases if "cold" in p.phase.lower()), None)
warm = next((p for p in phases if "warm" in p.phase.lower()), None)
concurrent = next((p for p in phases if "concurrent" in p.phase.lower()), None)
if cold and warm and cold.success_times and warm.success_times:
speedup = cold.avg / warm.avg if warm.avg > 0 else 0
cache_impact = cold.avg - warm.avg
print(f"\n 🧊 Cold (no cache) Avg: {cold.avg:.2f}s")
print(f" 🔥 Warm (cached) Avg: {warm.avg:.2f}s")
print(f" ⚡ Cache speedup: {speedup:.1f}x ({cache_impact:.1f}s faster)")
if cache_impact > 3:
print(f" 🔴 Cache impact LỚN ({cache_impact:.1f}s): Embedding API là bottleneck chính!")
print(f" → Khuyến nghị: Cache warming, local embedding model")
elif cache_impact > 1:
print(f" 🟡 Cache impact VỪA ({cache_impact:.1f}s): Có cải thiện đáng kể")
else:
print(f" 🟢 Cache impact NHỎ ({cache_impact:.1f}s): Bottleneck ở chỗ khác (LLM hoặc DB)")
if concurrent and concurrent.success_times:
baseline_avg = warm.avg if (warm and warm.success_times) else (cold.avg if cold and cold.success_times else 0)
if baseline_avg > 0:
slowdown = concurrent.avg / baseline_avg
print(f"\n 👥 Concurrent ({len(concurrent.metrics)} users) Avg: {concurrent.avg:.2f}s")
print(f" 👤 Single user Avg: {baseline_avg:.2f}s")
print(f" 📈 Concurrency penalty: {slowdown:.1f}x")
if slowdown > 2:
print(f" 🔴 CRITICAL: {slowdown:.1f}x chậm hơn khi multi-user!")
print(f" → Có thể do: Connection pool exhaustion, GIL, rate limit LLM API")
elif slowdown > 1.3:
print(f" 🟡 Moderate penalty: Hệ thống chịu tải OK nhưng chậm hơn")
else:
print(f" 🟢 Minimal penalty: Hệ thống scale tốt!")
# Overall verdict
all_times = []
for p in phases:
all_times.extend(p.success_times)
if all_times:
overall_avg = statistics.mean(all_times)
print(f"\n 📊 Overall Avg: {overall_avg:.2f}s across {len(all_times)} queries")
if overall_avg > 15:
print(" 🔴 VERDICT: CHẬM - Cần tối ưu urgently")
elif overall_avg > 10:
print(" 🟡 VERDICT: CHẤP NHẬN ĐƯỢC - Multi-tool + LLM flow")
elif overall_avg > 5:
print(" 🟢 VERDICT: TỐT cho agentic chatbot")
else:
print(" ⭐ VERDICT: XUẤT SẮC!")
print("\n" + "=" * 75)
# =====================================================================
# MAIN
# =====================================================================
async def main():
parser = argparse.ArgumentParser(description="CANIFA Chatbot Stress Profiler")
parser.add_argument("--users", type=int, default=3, help="Concurrent users (default: 3)")
parser.add_argument("--queries", type=int, default=5, help="Queries per phase (default: 5)")
parser.add_argument("--skip-clear", action="store_true", help="Skip cache clearing")
parser.add_argument("--warm-only", action="store_true", help="Only test warm (skip cold)")
parser.add_argument("--output", type=str, help="Save JSON report")
args = parser.parse_args()
queries = TEST_QUERIES[:args.queries]
print("\n" + "=" * 60)
print("🔬 CANIFA CHATBOT - STRESS PROFILER")
print("=" * 60)
print(f"🌐 Target: {API_URL}")
print(f"📝 Queries/phase: {len(queries)}")
print(f"👥 Concurrent: {args.users} users")
print(f"🧹 Clear cache: {'No' if args.skip_clear else 'Yes'}")
print(f"{'='*60}\n")
# Health check
async with httpx.AsyncClient() as client:
try:
r = await client.get(f"{API_URL}/health", timeout=5.0)
if r.status_code != 200:
logger.error(f"❌ Server unhealthy: {r.status_code}")
return
logger.info(f"✅ Server healthy: {API_URL}")
except Exception as e:
logger.error(f"❌ Không kết nối được: {e}")
return
phases = []
# ── Phase 0: Cache before ──
cache_before = await get_cache_stats()
logger.info(f"📦 Cache trước test: {cache_before}")
# ── Phase 1: COLD TEST (clear cache first) ──
if not args.warm_only:
if not args.skip_clear:
logger.info("\n🧹 CLEARING CACHE...")
clear_result = await clear_redis_cache()
logger.info(f" Result: {clear_result}")
await asyncio.sleep(1)
logger.info("\n🧊 PHASE 1: COLD TEST (1 user, no cache)")
cold_result = await run_sequential(queries, "cold_tester", "🧊 Cold (1 user)")
phases.append(cold_result)
await asyncio.sleep(3) # Let system settle
# ── Phase 2: WARM TEST (cache populated from Phase 1) ──
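# Reuses the same query set, so cache entries written in Phase 1 (or earlier runs) should now be hits.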
logger.info("\n🔥 PHASE 2: WARM TEST (1 user, with cache)")
warm_result = await run_sequential(queries, "warm_tester", "🔥 Warm (1 user)")
phases.append(warm_result)
await asyncio.sleep(3)
# ── Phase 3: CONCURRENT TEST ──
logger.info(f"\n👥 PHASE 3: CONCURRENT TEST ({args.users} users)")
concurrent_result = await run_concurrent(queries, args.users, f"👥 Concurrent ({args.users} users)")
phases.append(concurrent_result)
# ── Cache after ──
cache_after = await get_cache_stats()
# ── Report ──
print_full_report(phases, cache_before, cache_after)
# ── Save JSON ──
if args.output:
output_data = {
"cache_before": cache_before,
"cache_after": cache_after,
"phases": [
{
"phase": p.phase,
"avg": p.avg,
"p50": p.p50,
"p95": p.p95,
"min": p.min_t,
"max": p.max_t,
"success": p.success_count,
"errors": p.error_count,
"queries": [
{
"query": m.query,
"user": m.user_id,
"time": round(m.total_time, 3),
"status": m.status_code,
"error": m.error,
"products": m.has_products,
}
for m in p.metrics
],
}
for p in phases
],
}
with open(args.output, "w", encoding="utf-8") as f:
json.dump(output_data, f, indent=2, ensure_ascii=False)
print(f"\n📄 JSON report: {args.output}")
print("\n✅ Stress test hoàn tất!\n")
if __name__ == "__main__":
if platform.system() == "Windows":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())
"""
Create a Google Sheet using the Sheets API v4 directly (not gspread).
This avoids Drive quota issues by creating the spreadsheet through the Sheets API.
"""
import json
from pathlib import Path
from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build
CREDENTIALS_FILE = Path(__file__).parent / "google_credentials.json"
SCOPES = [
"https://www.googleapis.com/auth/spreadsheets",
"https://www.googleapis.com/auth/drive",
]
TEST_QUESTIONS = [
"Tìm cho mình chân váy màu đỏ",
"Tìm quần màu đỏ",
"Tìm áo polo nam",
"Tìm áo khoác nữ mùa đông",
"Mình muốn mua đồ đi biển, gợi ý cho mình",
"Cho mình xem áo sơ mi đi làm",
"Gợi ý outfit đi dự tiệc",
"Áo size S giá dưới 500k",
"Có khuyến mãi gì không?",
"Cách đặt hàng online",
"Cửa hàng nào gần nhất ở Hà Nội",
"Xin chào",
"Cảm ơn bạn",
"Tìm sản phẩm abc123 không tồn tại",
]
def main():
creds = Credentials.from_service_account_file(str(CREDENTIALS_FILE), scopes=SCOPES)
sheets_service = build("sheets", "v4", credentials=creds)
drive_service = build("drive", "v3", credentials=creds)
# Create spreadsheet via Sheets API
spreadsheet_body = {
"properties": {"title": "Canifa Chatbot Test Results"},
"sheets": [{
"properties": {"title": "Test Questions"},
}]
}
print("📝 Creating spreadsheet via Sheets API...")
result = sheets_service.spreadsheets().create(body=spreadsheet_body).execute()
sheet_id = result["spreadsheetId"]
sheet_url = result["spreadsheetUrl"]
print(f"✅ Created! ID: {sheet_id}")
print(f"📊 URL: {sheet_url}")
# Write headers + data
headers = ["STT", "Câu hỏi test", "Câu trả lời", "Thời gian (ms)", "Trạng thái"]
values = [headers]
for i, q in enumerate(TEST_QUESTIONS, 1):
values.append([i, q, "", "", "⏳ Đang chờ..."])
sheets_service.spreadsheets().values().update(
spreadsheetId=sheet_id,
range="Test Questions!A1:E15",
valueInputOption="RAW",
body={"values": values}
).execute()
print(f"✅ Wrote {len(TEST_QUESTIONS)} questions")
# Share with anyone (link)
try:
drive_service.permissions().create(
fileId=sheet_id,
body={"type": "anyone", "role": "writer"},
fields="id"
).execute()
print("✅ Shared as 'anyone with link can edit'")
except Exception as e:
print(f"⚠️ Could not share: {e}")
# Save sheet info
info = {"sheet_url": sheet_url, "sheet_id": sheet_id}
info_path = Path(__file__).parent / "sheet_info.json"
info_path.write_text(json.dumps(info, indent=2))
print(f"💾 Saved to {info_path}")
if __name__ == "__main__":
main()