Commit 6e5f501c authored by Vũ Hoàng Anh

update: new tools for brand knowledge

parent 01f2f0ab
@@ -50,14 +50,14 @@ async def chat_controller(
     history_dicts = await memory.get_chat_history(user_id, limit=20)
     history = []
     for h in reversed(history_dicts):
         msg_cls = HumanMessage if h["is_human"] else AIMessage
         history.append(msg_cls(content=h["message"]))
     initial_state, exec_config = _prepare_execution_context(
         query=query, user_id=user_id, history=history, images=images
     )
     try:
         # OPTIMIZATION: run the graph
         result = await graph.ainvoke(initial_state, config=exec_config)
@@ -69,15 +69,27 @@ async def chat_controller(
         ai_raw_content = result.get("ai_response").content if result.get("ai_response") else ""
         logger.info(f"💾 [RAW AI OUTPUT]:\n{ai_raw_content}")
-        # Parse the JSON just once, only to pull explicit IDs from the AI (if any)
+        # Parse the JSON to pull the text response and product_ids from the AI
+        ai_text_response = ai_raw_content
         try:
-            # Because json_mode=True, OpenAI emits raw JSON - no clunky regex needed
+            # Because json_mode=True, OpenAI emits raw JSON
             ai_json = json.loads(ai_raw_content)
+            # Extract the text response from the JSON
+            ai_text_response = ai_json.get("ai_response", ai_raw_content)
+            # Merge product_ids from the AI JSON (if any) - do NOT use set(), dicts are unhashable
             explicit_ids = ai_json.get("product_ids", [])
-            if explicit_ids:
-                all_product_ids = list(set(all_product_ids + explicit_ids))
-        except (json.JSONDecodeError, Exception):
+            if explicit_ids and isinstance(explicit_ids, list):
+                # Merge and deduplicate by SKU
+                seen_skus = {p["sku"] for p in all_product_ids if "sku" in p}
+                for product in explicit_ids:
+                    if isinstance(product, dict) and product.get("sku") not in seen_skus:
+                        all_product_ids.append(product)
+                        seen_skus.add(product.get("sku"))
+        except Exception as e:
             # If the AI returned plain text (rare in JSON mode), just ignore it
+            logger.warning(f"Could not parse AI response as JSON: {e}")
             pass
         # BACKGROUND TASK: persist the history quickly
@@ -86,12 +98,12 @@ async def chat_controller(
             memory=memory,
             user_id=user_id,
             human_query=query,
-            ai_msg=AIMessage(content=ai_raw_content),
+            ai_msg=AIMessage(content=ai_text_response),
         )
         return {
-            "ai_response": ai_raw_content,
-            "product_ids": all_product_ids,
+            "ai_response": ai_text_response,  # text only, not the raw JSON
+            "product_ids": all_product_ids,  # array of product objects
         }
     except Exception as e:
@@ -99,12 +111,13 @@ async def chat_controller(
         raise
-def _extract_product_ids(messages: list) -> list[str]:
+def _extract_product_ids(messages: list) -> list[dict]:
     """
-    Extract product internal_ref_code from tool messages (data_retrieval_tool results).
-    Returns list of unique product IDs.
+    Extract full product info from tool messages (data_retrieval_tool results).
+    Returns a list of product objects with: sku, name, price, sale_price, url, thumbnail_image_url.
     """
-    product_ids = []
+    products = []
+    seen_skus = set()
     for msg in messages:
         if isinstance(msg, ToolMessage):
@@ -115,14 +128,25 @@ def _extract_product_ids(messages: list) -> list[str]:
                 # Check if the tool returned products
                 if tool_result.get("status") == "success" and "products" in tool_result:
                     for product in tool_result["products"]:
-                        product_id = product.get("internal_ref_code")
-                        if product_id and product_id not in product_ids:
-                            product_ids.append(product_id)
+                        sku = product.get("internal_ref_code")
+                        if sku and sku not in seen_skus:
+                            seen_skus.add(sku)
+                            # Extract the full product info
+                            product_obj = {
+                                "sku": sku,
+                                "name": product.get("magento_product_name", ""),
+                                "price": product.get("price_vnd", 0),
+                                "sale_price": product.get("sale_price_vnd"),  # null when not on sale
+                                "url": product.get("magento_url_key", ""),
+                                "thumbnail_image_url": product.get("thumbnail_image_url", ""),
+                            }
+                            products.append(product_obj)
             except (json.JSONDecodeError, KeyError, TypeError) as e:
-                logger.debug(f"Could not parse tool message for product IDs: {e}")
+                logger.debug(f"Could not parse tool message for products: {e}")
                 continue
-    return product_ids
+    return products
 def _prepare_execution_context(query: str, user_id: str, history: list, images: list | None):
@@ -136,8 +160,8 @@ def _prepare_execution_context(query: str, user_id: str, history: list, images:
         "ai_response": None,
     }
     run_id = str(uuid.uuid4())
-    # Metadata for LangSmith
-    metadata = {"user_id": user_id, "run_id": run_id}
+    # Metadata for LangSmith (temporarily disabled due to rate limits)
+    # metadata = {"user_id": user_id, "run_id": run_id}
     exec_config = RunnableConfig(
         configurable={
@@ -146,7 +170,7 @@ def _prepare_execution_context(query: str, user_id: str, history: list, images:
             "run_id": run_id,
         },
         run_id=run_id,
-        metadata=metadata,  # Attach metadata for LangSmith
+        # metadata=metadata,  # Attach metadata for LangSmith
     )
     return initial_state, exec_config
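For illustration, a hypothetical response from the updated controller (values invented; the product fields mirror the _extract_product_ids mapping above):

# Hypothetical /api/agent/chat response under the new contract (values invented)
{
    "ai_response": "Dạ em tìm được một số mẫu áo phù hợp cho anh/chị ạ.",
    "product_ids": [
        {
            "sku": "6TS24W007",      # from internal_ref_code
            "name": "Áo phông nam",  # from magento_product_name
            "price": 299000,         # from price_vnd
            "sale_price": None,      # from sale_price_vnd; None when not on sale
            "url": "ao-phong-nam",   # from magento_url_key
            "thumbnail_image_url": "https://canifa.com/img/500/750/resize/6/t/6ts24w007-sk010-m-1-u.webp",
        }
    ],
}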
import logging
from langchain_core.tools import tool
from pydantic import BaseModel, Field
from common.embedding_service import create_embedding_async
from common.starrocks_connection import StarRocksConnection
logger = logging.getLogger(__name__)
class KnowledgeSearchInput(BaseModel):
query: str = Field(
description="The customer's question or non-product information need (e.g. find a store, ask about a policy, look up the size chart...)"
)
@tool("canifa_knowledge_search", args_schema=KnowledgeSearchInput)
async def canifa_knowledge_search(query: str) -> str:
"""
Tra cứu TOÀN BỘ thông tin về thương hiệu và dịch vụ của Canifa.
Sử dụng tool này khi khách hàng hỏi về:
1. THƯƠNG HIỆU & GIỚI THIỆU: Lịch sử hình thành, giá trị cốt lõi, sứ mệnh.
2. HỆ THỐNG CỬA HÀNG: Tìm địa chỉ, số điện thoại, giờ mở cửa các cửa hàng tại các tỉnh thành (Hà Nội, HCM, Đà Nẵng, v.v.).
3. CHÍNH SÁCH BÁN HÀNG: Quy định đổi trả, bảo hành, chính sách vận chuyển, phí ship.
4. KHÁCH HÀNG THÂN THIẾT (KHTT): Điều kiện đăng ký thành viên, các hạng thẻ (Green, Silver, Gold, Diamond), quyền lợi tích điểm, thẻ quà tặng.
5. HỖ TRỢ & FAQ: Giải đáp thắc mắc thường gặp, chính sách bảo mật, thông tin liên hệ văn phòng, tuyển dụng.
6. TRA CỨU SIZE (BẢNG KÍCH CỠ): Hướng dẫn chọn size chuẩn cho nam, nữ, trẻ em dựa trên chiều cao, cân nặng.
Ví dụ các câu hỏi phù hợp:
- 'Canifa ở Cầu Giấy địa chỉ ở đâu?'
- 'Chính sách đổi trả hàng trong bao nhiêu ngày?'
- 'Làm sao để lên hạng thẻ Gold?'
- 'Cho mình xem bảng size áo nam.'
- 'Phí vận chuyển đi tỉnh là bao nhiêu?'
- 'Canifa thành lập năm nào?'
"""
logger.info(f"🔍 [Semantic Search] Brand Knowledge query: {query}")
try:
# 1. Create an embedding for the query (defaults to 1536 dimensions)
query_vector = await create_embedding_async(query)
if not query_vector:
return "Xin lỗi, tôi gặp sự cố khi xử lý thông tin. Vui lòng thử lại sau."
v_str = "[" + ",".join(str(v) for v in query_vector) + "]"
# 2. Query StarRocks for the top-4 most relevant rows (no score threshold)
sql = f"""
SELECT
content,
metadata
FROM shared_source.chatbot_rsa_knowledge
ORDER BY approx_cosine_similarity(embedding, {v_str}) DESC
LIMIT 4
"""
sr = StarRocksConnection()
results = await sr.execute_query_async(sql)
if not results:
logger.warning(f"⚠️ No knowledge data found in DB for query: {query}")
return "Hiện tại tôi chưa tìm thấy thông tin chính xác về nội dung này trong hệ thống kiến thức của Canifa. Bạn có thể liên hệ hotline 1800 6061 để được hỗ trợ trực tiếp."
# 3. Aggregate the results
knowledge_texts = []
for i, res in enumerate(results):
content = res.get("content", "")
knowledge_texts.append(content)
# Log the retrieved data (content only)
logger.info(f"📄 [Knowledge Chunk {i + 1}]: {content[:200]}...")
final_response = "\n\n---\n\n".join(knowledge_texts)
logger.info(f"✅ Found {len(results)} relevant knowledge chunks.")
return final_response
except Exception as e:
logger.error(f"❌ Error in canifa_knowledge_search: {e}")
return "Tôi đang gặp khó khăn khi truy cập kho kiến thức. Bạn muốn hỏi về sản phẩm gì khác không?"
@@ -5,13 +5,14 @@
 from langchain_core.tools import Tool
+from .brand_knowledge_tool import canifa_knowledge_search
 from .customer_info_tool import collect_customer_info
 from .data_retrieval_tool import data_retrieval_tool
 def get_retrieval_tools() -> list[Tool]:
     """Tools that only read/query data (cacheable)"""
-    return [data_retrieval_tool]
+    return [data_retrieval_tool, canifa_knowledge_search]
 def get_collection_tools() -> list[Tool]:
@@ -16,7 +16,7 @@ logger = logging.getLogger(__name__)
 router = APIRouter()
-@router.post("/chat", summary="Fashion Q&A Chat (Non-streaming)")
+@router.post("/api/agent/chat", summary="Fashion Q&A Chat (Non-streaming)")
 async def fashion_qa_chat(req: QueryRequest, background_tasks: BackgroundTasks):
     """
     Non-streaming chat endpoint - returns the full JSON response in one shot.
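A quick smoke-test sketch against the renamed route (request fields mirror the locustfile further down; port 5000 as in run.sh):

import requests

resp = requests.post(
    "http://localhost:5000/api/agent/chat",
    json={"message": "tìm áo phông nam", "user_id": "user_1234", "thread_id": "thread_1"},
    timeout=30,
)
body = resp.json()
print(body["ai_response"])  # plain text, no raw JSON
print(body["product_ids"])  # list of product objects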
@@ -23,7 +23,7 @@ class ChatHistoryResponse(BaseModel):
     next_cursor: int | None = None
-@router.get("/history/{user_id}", summary="Get Chat History by User ID", response_model=ChatHistoryResponse)
+@router.get("/api/history/{user_id}", summary="Get Chat History by User ID", response_model=ChatHistoryResponse)
 async def get_chat_history(user_id: str, limit: int | None = 50, before_id: int | None = None):
     """
     Fetch the user's chat history from the Postgres database.
"""
Test API Routes - Tất cả endpoints cho testing (isolated)
KHÔNG ĐỘNG VÀO chatbot_route.py chính!
"""
import asyncio
import logging
import random
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from agent.models import QueryRequest
from common.load_test_manager import get_load_test_manager
router = APIRouter(prefix="/test", tags=["Testing & Load Test"])
logger = logging.getLogger(__name__)
# ==================== MOCK CHAT ENDPOINT ====================
@router.post("/chat-mock", summary="Mock Chat API (for Load Testing)")
async def mock_chat(req: QueryRequest):
"""
Endpoint MOCK để test performance KHÔNG tốn tiền OpenAI.
Trả về response giả lập với latency ngẫu nhiên.
⚠️ CHỈ DÙNG CHO LOAD TESTING!
"""
# Giả lập latency của real API (100-500ms)
await asyncio.sleep(random.uniform(0.1, 0.5))
# Mock responses
mock_responses = [
"Dạ em đã tìm được một số mẫu áo sơ mi nam đẹp cho anh/chị ạ. Anh/chị có thể xem các sản phẩm sau đây.",
"Em xin gợi ý một số mẫu áo thun nam phù hợp với yêu cầu của anh/chị.",
"Dạ, em có tìm thấy một số mẫu quần jean nam trong khoảng giá anh/chị yêu cầu ạ.",
"Em xin giới thiệu các mẫu áo khoác nam đang có khuyến mãi tốt ạ.",
"Anh/chị có thể tham khảo các mẫu giày thể thao nam đang được ưa chuộng nhất.",
]
# Mock product IDs
mock_product_ids = [
f"MOCK_PROD_{random.randint(1000, 9999)}"
for _ in range(random.randint(2, 5))
]
return {
"status": "success",
"ai_response": random.choice(mock_responses),
"product_ids": mock_product_ids,
"_mock": True, # Flag để biết đây là mock response
"_latency_ms": random.randint(100, 500)
}
@router.post("/db-search", summary="DB Search Mock (Test StarRocks Performance)")
async def mock_db_search(req: QueryRequest):
"""
Endpoint để test PERFORMANCE của StarRocks DB query.
Hỗ trợ Multi-Search (Parallel).
"""
from agent.tools.data_retrieval_tool import data_retrieval_tool
try:
# Mock Multi-Search call (Parallel)
tool_result = await data_retrieval_tool.ainvoke({
"searches": [
{
"keywords": "áo sơ mi",
"gender_by_product": "male",
"price_max": 500000
},
{
"keywords": "quần jean",
"gender_by_product": "male",
"price_max": 800000
}
]
})
# Parse result
import json
result_data = json.loads(tool_result)
# Collect all product IDs from all search results
all_product_ids = []
if result_data.get("status") == "success":
for res in result_data.get("results", []):
ids = [p.get("internal_ref_code", "") for p in res.get("products", [])]
all_product_ids.extend(ids)
return {
"status": "success",
"ai_response": "Kết quả Multi-Search Parallel từ DB",
"product_ids": list(set(all_product_ids)),
"_db_test": True,
"_queries_count": len(result_data.get("results", [])),
"_total_products": len(all_product_ids)
}
except Exception as e:
logger.error(f"DB multi-search error: {e}")
return {
"status": "error",
"ai_response": f"Lỗi: {str(e)}",
"product_ids": [],
"_error": str(e)
}
# ==================== LOAD TEST CONTROL ====================
class StartTestRequest(BaseModel):
"""Request body for starting a load test"""
target_url: str = Field(default="http://localhost:5000", description="Base URL of the target")
num_users: int = Field(default=10, ge=1, le=1000, description="Number of concurrent users")
spawn_rate: int = Field(default=2, ge=1, le=100, description="User spawn rate (users/second)")
duration_seconds: int = Field(default=60, ge=10, le=600, description="Test duration (seconds)")
test_type: str = Field(default="chat_mock", description="chat_mock | chat_real | history")
@router.post("/loadtest/start", summary="Bắt đầu Load Test")
async def start_load_test(req: StartTestRequest):
"""
Bắt đầu load test với config được chỉ định.
**test_type options:**
- `chat_mock`: Test mock chat API (KHÔNG tốn tiền) ⭐ Khuyên dùng
- `chat_real`: Test real chat API (TỐN TIỀN OpenAI!)
- `history`: Test history API (không tốn tiền LLM)
"""
try:
manager = get_load_test_manager()
config_dict = req.model_dump()
result = manager.start_test(config_dict)
if "error" in result:
raise HTTPException(status_code=400, detail=result["error"])
return {
"status": "success",
"message": "Load test started",
"data": result
}
except Exception as e:
logger.error(f"Error starting load test: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/loadtest/stop", summary="Dừng Load Test")
async def stop_load_test():
"""Dừng load test đang chạy"""
try:
manager = get_load_test_manager()
result = manager.stop_test()
if "error" in result:
raise HTTPException(status_code=400, detail=result["error"])
return {
"status": "success",
"message": "Load test stopped",
"data": result
}
except Exception as e:
logger.error(f"Error stopping load test: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/loadtest/metrics", summary="Lấy Metrics Realtime")
async def get_load_test_metrics():
"""
Lấy metrics realtime của load test.
Frontend poll endpoint này mỗi 2 giây.
"""
try:
manager = get_load_test_manager()
metrics = manager.get_metrics()
return {
"status": "success",
"data": metrics
}
except Exception as e:
logger.error(f"Error getting metrics: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/loadtest/status", summary="Check Test Status")
async def get_load_test_status():
"""Check xem load test có đang chạy không"""
try:
manager = get_load_test_manager()
return {
"status": "success",
"data": {
"is_running": manager.is_running(),
"current_status": manager.status
}
}
except Exception as e:
logger.error(f"Error getting status: {e}")
raise HTTPException(status_code=500, detail=str(e))
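A driver sketch for these control endpoints (paths include the router's /test prefix; host and port assumed from run.sh):

import time

import requests

BASE = "http://localhost:5000"

# Kick off a mock-chat load test (chat_mock spends no OpenAI credits)
requests.post(
    f"{BASE}/test/loadtest/start",
    json={"num_users": 20, "spawn_rate": 5, "duration_seconds": 60, "test_type": "chat_mock"},
)

# Poll metrics every 2 seconds, the same way the frontend does
for _ in range(5):
    print(requests.get(f"{BASE}/test/loadtest/metrics").json()["data"])
    time.sleep(2)

requests.post(f"{BASE}/test/loadtest/stop")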
@@ -130,25 +130,45 @@ class StarRocksConnection:
                 charset="utf8mb4",
                 cursorclass=aiomysql.DictCursor,
                 minsize=10,  # Keep 10 connections warm from the start (fast for prod)
-                maxsize=50,  # Cap at 50 connections (handles thousands of users, safe on Windows)
+                maxsize=50,  # Nudged up to 50 (balances throughput and memory)
                 connect_timeout=10,
             )
         return StarRocksConnection._shared_pool
     async def execute_query_async(self, query: str, params: tuple | None = None) -> list[dict[str, Any]]:
         """
-        Execute query asynchronously using aiomysql pool
+        Execute a query asynchronously using the aiomysql pool, with retry logic.
         """
-        pool = await self.get_pool()
-        logger.info("🚀 Executing Async Query.")
-        async with pool.acquire() as conn, conn.cursor() as cursor:
-            await cursor.execute(query, params)
-            results = await cursor.fetchall()
-            logger.info(f"📊 Async Query successful, returned {len(results)} rows")
-            # aiomysql returns tuples or dicts depending on cursor.
-            # Since we asked for DictCursor, results are dicts.
-            return [dict(row) for row in results]
+        max_retries = 3
+        last_error = None
+        for attempt in range(max_retries):
+            try:
+                pool = await self.get_pool()
+                # logger.info(f"🚀 Executing Async Query (Attempt {attempt+1}).")
+                async with pool.acquire() as conn, conn.cursor() as cursor:
+                    await cursor.execute(query, params)
+                    results = await cursor.fetchall()
+                    # logger.info(f"📊 Async Query successful, returned {len(results)} rows")
+                    return [dict(row) for row in results]
+            except Exception as e:
+                last_error = e
+                logger.warning(f"⚠️ StarRocks DB Error (Attempt {attempt+1}/{max_retries}): {e}")
+                if "Memory of process exceed limit" in str(e):
+                    # StarRocks OOM: back off briefly, then retry
+                    await asyncio.sleep(0.5 * (attempt + 1))
+                    continue
+                elif "Disconnected" in str(e) or "Lost connection" in str(e):
+                    # Connection lost: the pool may be stale, retry right away
+                    continue
+                else:
+                    # Anything else (syntax errors, ...) is raised immediately
+                    raise
+        logger.error(f"❌ Failed after {max_retries} attempts: {last_error}")
+        raise last_error
     def close(self):
         """Explicitly close if needed (e.g. app shutdown)"""
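A minimal caller sketch for the retrying query path (the query itself is illustrative):

import asyncio

from common.starrocks_connection import StarRocksConnection

async def demo():
    db = StarRocksConnection()
    # OOM and disconnect errors are retried up to 3 times inside execute_query_async;
    # anything else (e.g. a syntax error) raises immediately.
    rows = await db.execute_query_async("SELECT 1 AS ok")
    print(rows)  # -> [{'ok': 1}]

asyncio.run(demo())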
@@ -83,11 +83,15 @@ LANGFUSE_SECRET_KEY: str | None = os.getenv("LANGFUSE_SECRET_KEY")
 LANGFUSE_PUBLIC_KEY: str | None = os.getenv("LANGFUSE_PUBLIC_KEY")
 LANGFUSE_BASE_URL: str | None = os.getenv("LANGFUSE_BASE_URL", "https://cloud.langfuse.com")
-# ====================== LANGSMITH CONFIGURATION ======================
-LANGSMITH_TRACING = os.getenv("LANGSMITH_TRACING", "false")
-LANGSMITH_ENDPOINT = os.getenv("LANGSMITH_ENDPOINT", "https://api.smith.langchain.com")
-LANGSMITH_API_KEY = os.getenv("LANGSMITH_API_KEY")
-LANGSMITH_PROJECT = os.getenv("LANGSMITH_PROJECT")
+# ====================== LANGSMITH CONFIGURATION (DISABLED DUE TO RATE LIMITS) ======================
+# LANGSMITH_TRACING = os.getenv("LANGSMITH_TRACING", "false")
+# LANGSMITH_ENDPOINT = os.getenv("LANGSMITH_ENDPOINT", "https://api.smith.langchain.com")
+# LANGSMITH_API_KEY = os.getenv("LANGSMITH_API_KEY")
+# LANGSMITH_PROJECT = os.getenv("LANGSMITH_PROJECT")
+LANGSMITH_TRACING = "false"
+LANGSMITH_ENDPOINT = None
+LANGSMITH_API_KEY = None
+LANGSMITH_PROJECT = None
 # ====================== CLERK AUTHENTICATION ======================
 CLERK_SECRET_KEY: str | None = os.getenv("CLERK_SECRET_KEY")
"""
Script tự động thêm context (tên bảng + subsection) vào tất cả size entries trong tonghop.txt
Ví dụ: "Size 92 (2Y):" -> "Size 92 (2Y) - BẢNG SIZE CHUNG CHO UNISEX - TRẺ EM (Dải size lẻ):"
"""
import re
def add_context_to_sizes(input_file, output_file):
with open(input_file, encoding="utf-8") as f:
lines = f.readlines()
result = []
current_table = None  # Current table name
current_subsection = None  # Current subsection ("Dải size lẻ", "Dải size chẵn", ...)
for line in lines:
stripped = line.strip()
# Detect a table header (starts with BẢNG or QUẦN)
if stripped.startswith("BẢNG SIZE") or stripped.startswith("QUẦN"):
current_table = stripped
current_subsection = None  # Reset the subsection when a new table starts
result.append(line)
continue
# Detect a subsection ("Dải size lẻ" / "Dải size chẵn")
if "Dải size" in stripped or stripped.startswith("Dải"):
current_subsection = stripped.rstrip(":")
result.append(line)
continue
# Detect a size line (catches every pattern: Size XS:, Size 92 (2Y):, Size 26 (XS):)
size_match = re.match(r"^(Size\s+[A-Z0-9]+(?:\s*\([^)]+\))?):(.*)$", stripped)
if size_match and current_table:
size_part = size_match.group(1)  # "Size 92 (2Y)" or "Size XS"
rest = size_match.group(2)  # The remainder after the colon
# Build the context string
context_parts = [current_table]
if current_subsection:
context_parts.append(f"({current_subsection})")
context = " - ".join(context_parts)
# Create the new line with the context
new_line = f"{size_part} - {context}:{rest}\n"
result.append(new_line)
continue
# Keep all other lines unchanged
result.append(line)
# Write the output file
with open(output_file, "w", encoding="utf-8") as f:
f.writelines(result)
print("✅ Đã thêm context vào tất cả size entries!")
print(f"📝 File output: {output_file}")
if __name__ == "__main__":
input_path = r"d:\cnf\chatbot_canifa\backend\datadb\tonghop.txt"
output_path = r"d:\cnf\chatbot_canifa\backend\datadb\tonghop_with_context.txt"
add_context_to_sizes(input_path, output_path)
print("\n🔍 Preview 10 dòng đầu của file mới:")
with open(output_path, encoding="utf-8") as f:
for i, line in enumerate(f):
if 1160 <= i < 1170:  # Region that contains size entries
print(line.rstrip())
import os
import json
import pymysql
from openai import OpenAI
import time
# ==========================================
# 🔐 CONFIGURATION (credentials redacted - load secrets from the environment, never hardcode them)
# ==========================================
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
SR_HOST = "172.16.2.100"
SR_PORT = 9030
SR_USER = "anhvh"
SR_PASS = os.environ["SR_PASS"]
SR_DB = "shared_source"
# Parameters
CHUNK_SIZE = 500
CHUNK_OVERLAP = 50
EMBEDDING_MODEL = "text-embedding-3-small" # 1536 dimensions
client = OpenAI(api_key=OPENAI_API_KEY)
def get_embedding(text):
"""Lấy vector 1536 chiều từ OpenAI"""
try:
text = text.replace("\n", " ")
return client.embeddings.create(input=[text], model=EMBEDDING_MODEL).data[0].embedding
except Exception as e:
print(f"❌ Lỗi Embedding: {e}")
return None
def connect_starrocks():
return pymysql.connect(
host=SR_HOST,
port=SR_PORT,
user=SR_USER,
password=SR_PASS,
database=SR_DB,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
def chunk_text(text, size=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
"""Chia nhỏ văn bản với overlap"""
chunks = []
start = 0
while start < len(text):
end = start + size
chunks.append(text[start:end])
start += size - overlap
return chunks
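# For example, with the defaults (size=500, overlap=50) each chunk starts 450
# characters after the previous one, so chunk_text("x" * 1200) returns three
# chunks of lengths [500, 500, 300].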
def ingest():
input_file = r"d:\cnf\chatbot_canifa\backend\datadb\tonghop.txt"
if not os.path.exists(input_file):
print(f"❌ Không tìm thấy file: {input_file}")
return
print(f"📖 Đang đọc file {input_file}...")
with open(input_file, "r", encoding="utf-8") as f:
full_content = f.read()
# Split the data into the pseudo-FILE sections inside tonghop.txt
sections = full_content.split("================================================================================")
db = connect_starrocks()
cursor = db.cursor()
total_chunks = 0
record_id = int(time.time())  # Base ID
for section in sections:
if not section.strip(): continue
# Grab the section title if present
lines = section.strip().split("\n")
title = "Canifa Knowledge"
if "FILE:" in lines[0]:
title = lines[0].replace("FILE:", "").strip()
content = "\n".join(lines[1:])
else:
content = section
print(f"🚀 Đang xử lý section: {title}")
chunks = chunk_text(content)
for i, chunk in enumerate(chunks):
if len(chunk.strip()) < 20: continue  # Skip chunks that are too short
vector = get_embedding(chunk)
if not vector: continue
metadata = {
"title": title,
"chunk_idx": i,
"source": "tonghop.txt",
"timestamp": time.time()
}
sql = "INSERT INTO shared_source.canifa_knowledge (id, content, metadata, embedding) VALUES (%s, %s, %s, %s)"
try:
cursor.execute(sql, (record_id, chunk, json.dumps(metadata, ensure_ascii=False), str(vector)))
record_id += 1
total_chunks += 1
if total_chunks % 10 == 0:
db.commit()
print(f"✅ Đã nạp {total_chunks} chunks...")
except Exception as e:
print(f"❌ Lỗi SQL: {e}")
db.commit()
db.close()
print(f"🎊 HOÀN THÀNH! Tổng cộng đã nạp {total_chunks} vào StarRocks.")
if __name__ == "__main__":
ingest()
import asyncio
import logging
from common.starrocks_connection import StarRocksConnection
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def main():
try:
db = StarRocksConnection()
print("🔌 Connecting to StarRocks...")
# Get Create Table Statement
table_name = "shared_source.magento_product_dimension_with_text_embedding"
res = await db.execute_query_async(f"SHOW CREATE TABLE {table_name}")
if res:
print("\n=== RAW RESULT KEYS ===")
print(res[0].keys())
print(res[0])
else:
print("❌ Could not get table info.")
except Exception as e:
print(f"❌ Error: {e}")
if __name__ == "__main__":
asyncio.run(main())
import asyncio
import os
import sys
# Ensure backend directory is in python path
current_dir = os.path.dirname(os.path.abspath(__file__))
if current_dir not in sys.path:
sys.path.append(current_dir)
from agent.tools.product_search_helpers import build_starrocks_query
class Params:
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
async def main():
# Params for "tìm cho tao áo side m phù hợp để đi chơi"
text = "tìm cho tao áo side m phù hợp để đi chơi"
# Mocking what the extraction layer might produce
params = Params(
query=text,
size_scale="M",
keywords="áo side m đi chơi"
)
print(f"Generating query for: {text}")
try:
sql = await build_starrocks_query(params)
print("Successfully generated query.")
print("Check d:\\cnf\\chatbot_canifa\\backend\\embedding.txt")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
asyncio.run(main())
Count,Message,Traceback,Nodes
Method,Name,Error,Occurrences
POST,/api/agent/chat [MAIN],CatchResponseError('Status: 404'),80
POST,/api/agent/chat [MAIN],CatchResponseError('Status: 0'),7
POST,/test/db-search [DB_ONLY],HTTPError('404 Client Error: Not Found for url: /test/db-search [DB_ONLY]'),231
Type,Name,Request Count,Failure Count,Median Response Time,Average Response Time,Min Response Time,Max Response Time,Average Content Size,Requests/s,Failures/s,50%,66%,75%,80%,90%,95%,98%,99%,99.9%,99.99%,100%
POST,/api/agent/chat [MAIN],87,87,1200.0,1715.7884011494702,1.9360000733286142,6251.831999979913,20.229885057471265,3.7430531309093538,3.7430531309093538,1200,1500,2600,2600,4300,6000,6200,6300,6300,6300,6300
GET,/health,6,0,760.0,1024.0447499866907,86.62169997114688,2807.93920008,17220.0,0.25814159523512786,0.0,770,770,1500,1500,2800,2800,2800,2800,2800,2800,2800
POST,/test/db-search [DB_ONLY],231,231,800.0,1095.216632898276,2.817099913954735,7268.948999932036,22.0,9.938451416552422,9.938451416552422,800,1200,1300,1500,2600,3100,5100,5800,7300,7300,7300
,Aggregated,324,318,890.0,1260.533646911808,1.9360000733286142,7268.948999932036,340.0061728395062,13.939646142696905,13.681504547461776,890,1300,1500,2100,2600,4200,6000,6200,7300,7300,7300
Timestamp,User Count,Type,Name,Requests/s,Failures/s,50%,66%,75%,80%,90%,95%,98%,99%,99.9%,99.99%,100%,Total Request Count,Total Failure Count,Total Median Response Time,Total Average Response Time,Total Min Response Time,Total Max Response Time,Total Average Content Size
1767665177,0,,Aggregated,0.000000,0.000000,N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A,0,0,0,0.0,0,0,0
1767665178,10,,Aggregated,0.000000,0.000000,N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A,0,0,0,0.0,0,0,0
1767665179,20,,Aggregated,0.000000,0.000000,N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A,0,0,0,0.0,0,0,0
1767665180,30,,Aggregated,0.000000,0.000000,2600,2600,2600,2600,2600,2800,2800,2800,2800,2800,2800,12,11,2600.0,2172.7166083583143,13.209500000812113,2807.93920008,1455.1666666666667
1767665181,40,,Aggregated,0.000000,0.000000,2600,2600,2600,2800,3000,3100,3200,3200,3200,3200,3200,23,22,2600.0,2127.1039913256614,13.209500000812113,3222.1904000034556,769.7391304347826
1767665182,50,,Aggregated,3.333333,3.000000,2600,2600,2600,2600,2900,3100,3400,3400,3400,3400,3400,41,40,2600.0,1878.9734097606524,13.209500000812113,3379.774900036864,441.4634146341463
1767665183,50,,Aggregated,4.000000,3.750000,2600,2600,2600,2600,2900,3100,3200,3400,3400,3400,3400,51,50,2600.0,1680.0740803934304,13.209500000812113,3379.774900036864,359.2156862745098
1767665185,50,,Aggregated,6.000000,5.800000,2400,2600,2600,2600,3100,3500,4100,4200,4200,4200,4200,68,67,1100.0,1646.725695587092,13.209500000812113,4216.733000008389,274.9117647058824
1767665186,50,,Aggregated,7.285714,7.142857,1400,2600,2600,2600,3400,4200,5200,5600,5600,5600,5600,80,79,1400.0,1743.8070987467654,13.209500000812113,5642.961099976674,236.975
1767665187,50,,Aggregated,8.500000,8.375000,1200,2600,2600,2600,3500,5000,5200,5600,6100,6100,6100,106,105,1200.0,1724.8162594327334,13.209500000812113,6071.023600059561,184.24528301886792
1767665188,50,,Aggregated,8.777778,8.666667,1100,2400,2600,2600,5000,6000,6300,7200,7300,7300,7300,137,136,1100.0,1771.4287751791046,13.209500000812113,7268.948999932036,147.53284671532847
1767665190,50,,Aggregated,13.000000,12.900000,850,1200,2600,2600,3600,5600,6200,7200,7300,7300,7300,169,166,850.0,1466.0903857917574,2.817099913954735,7268.948999932036,327.2899408284024
1767665191,50,,Aggregated,13.900000,13.800000,830,1200,2400,2600,3200,5200,6200,7200,7300,7300,7300,191,188,830.0,1378.066231930599,2.817099913954735,7268.948999932036,292.1256544502618
1767665192,50,,Aggregated,13.900000,13.800000,870,1200,1500,2600,3100,5200,6100,6300,7300,7300,7300,203,200,870.0,1357.2945172349082,2.817099913954735,7268.948999932036,275.83251231527095
1767665193,50,,Aggregated,16.600000,16.400000,950,1200,1500,2600,3000,5200,6100,6300,7300,7300,7300,217,214,950.0,1361.8354663539308,2.817099913954735,7268.948999932036,259.4562211981567
1767665194,50,,Aggregated,17.300000,17.100000,1100,1400,2100,2500,2900,5000,6100,6300,7300,7300,7300,249,246,1100.0,1419.1991526060308,2.817099913954735,7268.948999932036,228.93975903614458
1767665196,50,,Aggregated,16.600000,16.400000,1100,1400,1900,2500,2900,5000,6100,6300,7300,7300,7300,258,252,1000.0,1382.0461077478446,1.9360000733286142,7268.948999932036,421.69767441860466
1767665197,50,,Aggregated,18.600000,18.100000,950,1300,1600,2400,2600,4200,6100,6300,7300,7300,7300,283,277,950.0,1327.5479494664792,1.9360000733286142,7268.948999932036,386.3886925795053
1767665199,50,,Aggregated,17.600000,17.100000,840,1200,1500,2300,2600,4100,6000,6200,7300,7300,7300,308,302,830.0,1242.4250762961055,1.9360000733286142,7268.948999932036,356.7402597402597
1767665200,50,,Aggregated,16.000000,15.500000,890,1300,1500,2100,2600,4200,6000,6200,7300,7300,7300,324,318,890.0,1260.533646911808,1.9360000733286142,7268.948999932036,340.0061728395062
"""
🎯 Production-like Performance Testing with Locust
Test thực tế các endpoint QUAN TRỌNG của chatbot để tìm bottleneck.
CÁCH CHẠY:
1. Start API server:
cd backend
python run.py
2. Run Locust với file này:
locust -f locust/locustfile_production.py --host=http://localhost:8000
3. Mở browser: http://localhost:8089
- Number of users: 50 (tăng dần để test scalability)
- Spawn rate: 10 users/second
- Host: http://localhost:8000
📊 METRICS CẦN QUAN TÂM:
- Response Time (P50, P95, P99): Thời gian phản hồi
- RPS (Requests per second): Throughput
- Failure Rate: Tỷ lệ lỗi
"""
import random
import time
from locust import HttpUser, between, events, task
# ============================================================
# TEST DATA - simulates real user queries
# ============================================================
CHAT_QUERIES = [
# Simple product searches (fast)
"tìm áo phông nam",
"quần jean nữ",
"váy liền",
# Complex searches with filters (slower)
"áo khoác nam màu đen giá dưới 500k",
"quần short nữ mùa hè",
# Knowledge queries (test RAG knowledge base)
"Canifa có cửa hàng ở Hà Nội không?",
"chính sách đổi trả",
"bảng size áo nam",
# Conversational (test LLM reasoning)
"xin chào",
"cảm ơn bạn",
]
VISUAL_SEARCH_IMAGES = [
"https://canifa.com/img/500/750/resize/6/t/6ts24w007-sk010-m-1-u.webp",
"https://canifa.com/img/500/750/resize/6/a/6aw24w006-sb492-thumb-2.webp",
]
# ============================================================
# CUSTOM METRICS - Track specific bottlenecks
# ============================================================
chat_response_times = []
db_query_times = []
llm_call_times = []
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
"""Print summary khi test xong."""
print("\n" + "=" * 60)
print("📊 PERFORMANCE TEST SUMMARY")
print("=" * 60)
if chat_response_times:
avg_chat = sum(chat_response_times) / len(chat_response_times)
print(f"💬 Chat Response Time (avg): {avg_chat:.2f}ms")
print(f" P50: {sorted(chat_response_times)[len(chat_response_times) // 2]:.2f}ms")
print(f" P95: {sorted(chat_response_times)[int(len(chat_response_times) * 0.95)]:.2f}ms")
print("\n🎯 RECOMMENDED ACTIONS:")
print("1. Check response times > 2000ms")
print("2. Identify which endpoint has highest latency")
print("3. Use profiling tools (below) for deep dive")
print("=" * 60 + "\n")
# ============================================================
# USER BEHAVIORS
# ============================================================
class ChatbotUser(HttpUser):
"""Mô phỏng user sử dụng chatbot."""
wait_time = between(2, 5) # User thường đợi 2-5s giữa các câu hỏi
def on_start(self):
"""Initialize user - giống như user mở chat lần đầu."""
self.user_id = f"user_{random.randint(1000, 9999)}"
self.thread_id = f"thread_{int(time.time() * 1000)}_{random.randint(0, 999)}"
@task(10)
def chat_message(self):
"""
Task quan trọng nhất - test FULL chatbot flow.
Weight: 10 (chiếm 10/12 = 83% traffic)
"""
query = random.choice(CHAT_QUERIES)
start_time = time.time()
with self.client.post(
"/api/agent/chat",
json={"message": query, "user_id": self.user_id, "thread_id": self.thread_id},
catch_response=True,
name="/api/agent/chat [MAIN]",
) as response:
duration = (time.time() - start_time) * 1000 # ms
chat_response_times.append(duration)
if response.status_code == 200:
response.success()
else:
response.failure(f"Status: {response.status_code}")
@task(1)
def health_check(self):
"""
Health check endpoint.
Weight: 1 (8% traffic)
"""
self.client.get("/", name="/health")
@task(1)
def visual_search(self):
"""
Test visual search, if available.
Weight: 1 (8% traffic)
"""
img_url = random.choice(VISUAL_SEARCH_IMAGES)
self.client.post(
"/api/recommend/image",
json={"image_url": img_url},
name="/api/recommend/image [VISUAL]",
)
class DatabaseStressUser(HttpUser):
"""
Dedicated user class for stressing ONLY the database path.
Run this test on its own to isolate DB performance.
"""
wait_time = between(8, 20)  # Simulates real users: 8-20s between searches
@task
def db_direct_query(self):
"""Test trực tiếp StarRocks query speed."""
self.client.post(
"/test/db-search",
json={"query": random.choice(["áo phông", "quần jean", "váy"]), "limit": 10},
name="/test/db-search [DB_ONLY]",
)
# ============================================================
# SHAPE CONFIGURATION - Simulate real traffic pattern
# ============================================================
class WebsiteUserShape:
"""
Mô phỏng traffic pattern thực tế:
- Sáng sớm: ít user
- Trưa + tối: peak hours
- Đêm khuya: vắng
CHỈ DÙNG KHI MUỐN TEST THEO PATTERN, KHÔNG DÙNG CHO TEST NHANH.
"""
pass # Implement nếu cần
if __name__ == "__main__":
print("🚀 Starting Locust Performance Test...")
print("📖 Read the docstring for instructions!")
{
"mcpServers": {
"dbeaver": {
"command": "dbeaver-mcp-server",
"env": {
"DBEAVER_DEBUG": "false",
"DBEAVER_TIMEOUT": "30000"
}
},
"sequential-thinking": {
"command": "npx",
"args": [
"-y",
"@modelcontextprotocol/server-sequential-thinking"
],
"env": {}
},
"docs-langchain": {
"serverUrl": "https://docs.langchain.com/mcp"
},
"playwright": {
"command": "npx",
"args": [
"@playwright/mcp@latest"
]
},
"locust-canifa": {
"command": "python",
"args": [
"d:\\cnf\\chatbot_canifa\\backend\\locust\\locust_mcp_server.py"
],
"env": {
"LOCUST_HOST": "http://localhost:8000",
"LOCUST_USERS": "50",
"LOCUST_SPAWN_RATE": "10",
"LOCUST_RUN_TIME": "30s"
}
}
}
}
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
# Run from the d:\cnf\chatbot_canifa\backend\hehe directory

# 1. Start the server (4 workers)
python api_server_perf.py

# 2. Run the Locust test (500 users)
locust -f locustfile_production.py --host=http://localhost:8000 --headless -u 500 -r 20 --run-time 1m --html report.html

# 3. Command to kill the process if it hangs
Stop-Process -Name python -Force
\ No newline at end of file
locust -f locustfile.py --host=http://localhost:8000
\ No newline at end of file
@@ -56,8 +56,6 @@ langgraph-checkpoint-postgres==3.0.2
 langgraph-prebuilt==1.0.5
 langgraph-sdk==0.3.0
 langsmith==0.5.0
-locust==2.42.6
-locust-cloud==1.30.0
 loguru==0.7.3
 MarkupSafe==3.0.3
 msgpack==1.1.2
@@ -4,4 +4,7 @@
 uvicorn server:app --host 0.0.0.0 --port 5000 --reload
 uvicorn server:app --host 0.0.0.0 --port 5000
\ No newline at end of file