Commit d116f990 authored by Vũ Hoàng Anh's avatar Vũ Hoàng Anh

chore: sync latest changes

parent 995e7d53
......@@ -17,6 +17,7 @@ from agent.nodes.utils import _extract_text
# StarRocks connection for Enrichment
from common.starrocks_connection import get_db_connection
from config import USE_LOCAL_SQLITE
logger = logging.getLogger(__name__)
......@@ -25,7 +26,8 @@ TABLE_NAME = "shared_source.magento_product_dimension_with_text_embedding"
SELECT_COLUMNS = """
magento_ref_code, internal_ref_code, product_name,
sale_price, original_price, product_image_url_thumbnail,
product_web_url, size_scale, gender_by_product, product_line_vn
product_web_url, size_scale, gender_by_product, product_line_vn,
product_color_code, product_color_name
"""
def route_after_classifier(state: StylistProState) -> str:
......@@ -144,6 +146,8 @@ class CANIFAGraph:
sku = str(p.get("magento_ref_code") or p.get("sku") or p.get("sku_code") or "").upper().strip()
if sku:
if "sku" not in p: p["sku"] = sku
if "color_code" not in p: p["color_code"] = p.get("product_color_code", "")
if "color_name" not in p: p["color_name"] = p.get("product_color_name", "")
product_dict[sku] = p
# Also map outfit recommendations
......@@ -156,16 +160,22 @@ class CANIFAGraph:
product_dict[r_sku] = r
except Exception: pass
# 2. Enrich missing or incomplete product data via StarRocks
# 2. Enrich missing or incomplete product data via StarRocks (or SQLite fallback)
wanted_ids = [pid.upper().strip() for pid in ai_product_ids if pid]
missing_ids = [pid for pid in wanted_ids if pid not in product_dict or not product_dict[pid].get("image")]
if missing_ids:
skus = list(dict.fromkeys(missing_ids))
base_skus = list(dict.fromkeys([s.split("-")[0] for s in skus]))
keys = list(dict.fromkeys(skus + base_skus))
try:
if USE_LOCAL_SQLITE:
raise Exception("StarRocks skipped due to USE_LOCAL_SQLITE=True")
db = get_db_connection()
skus = list(dict.fromkeys(missing_ids))
base_skus = list(dict.fromkeys([s.split("-")[0] for s in skus]))
keys = list(dict.fromkeys(skus + base_skus))
if not db:
raise Exception("StarRocks connection is None")
placeholders = ",".join(["%s"] * len(keys))
sql = f"""
......@@ -188,10 +198,40 @@ class CANIFAGraph:
"sizes": r.get("size_scale", ""),
"gender": r.get("gender_by_product", ""),
"product_line": r.get("product_line_vn", ""),
"color_code": r.get("product_color_code", ""),
"color_name": r.get("product_color_name", ""),
}
product_dict[card["sku"].upper()] = card
except Exception as e:
logger.warning(f"⚠️ StarRocks Enrichment failed: {e}")
logger.warning(f"⚠️ StarRocks Enrichment failed: {e}. Falling back to SQLite...")
from common.sqlite_db import sqlite_db
placeholders = ",".join(["?"] * len(keys))
sql = f"""
SELECT {SELECT_COLUMNS}
FROM sr__test_db__magento_product_dimension_with_text_embedding
WHERE UPPER(magento_ref_code) IN ({placeholders})
OR UPPER(internal_ref_code) IN ({placeholders})
LIMIT 200
"""
try:
rows = await sqlite_db.fetch_all(sql, params=tuple(keys + keys))
for r in rows or []:
card = {
"sku": (r.get("magento_ref_code") or r.get("internal_ref_code") or "").strip(),
"name": r.get("product_name", ""),
"price": int(r.get("sale_price") or 0),
"original_price": int(r.get("original_price") or 0),
"image": r.get("product_image_url_thumbnail", ""),
"url": r.get("product_web_url", ""),
"sizes": r.get("size_scale", ""),
"gender": r.get("gender_by_product", ""),
"product_line": r.get("product_line_vn", ""),
"color_code": r.get("product_color_code", ""),
"color_name": r.get("product_color_name", ""),
}
product_dict[card["sku"].upper()] = card
except Exception as sqlite_e:
logger.error(f"❌ SQLite Enrichment fallback failed: {sqlite_e}")
# 3. Assemble final products in the order requested by AI
seen = set()
......
## 8. GỢI Ý SẢN PHẨM TƯƠNG TỰ & PHỐI ĐỒ (TỪ DATA TOOL)
### 8.1. TỔNG QUAN — 2 CỘT DỮ LIỆU TRONG KẾT QUẢ TOOL
Khi `data_retrieval_tool` trả về sản phẩm, mỗi SP có 2 trường pre-computed:
| Trường | Mục đích | Khi nào dùng |
|--------|----------|--------------|
| `similar_items` | Danh sách SP tương tự (cùng loại, form, phong cách) | **MẶC ĐỊNH** — ưu tiên giới thiệu NGAY |
| `outfit_recommendations` | Danh sách SP phối đồ (quần phối áo, phụ kiện) | **CHỈ KHI** khách CHỦ ĐỘNG hỏi phối đồ |
---
### 8.2. ⚡ ƯU TIÊN SỐ 1: `similar_items` (TỰ ĐỘNG GIỚI THIỆU)
**TRIGGER:** Khi giới thiệu bất kỳ SP nào cho khách, nếu SP đó có `similar_items` → **KHÉO LÉO nhắc thêm 1-2 mẫu tương tự** để tăng lựa chọn.
**CÁCH DÙNG:**
1. Đọc trường `similar_items` từ kết quả tool (đã parse thành list[dict])
2. Mỗi item có: `magento_ref_code`, `product_name`
3. Chọn 1-2 mẫu phù hợp nhất để gợi ý thêm
4. **ĐƯA MÃ SKU CỦA SIMILAR ITEMS VÀO `product_ids`** để frontend render card
**VÍ DỤ THỰC TẾ:**
```
Khách: "Tìm quần nỉ cho bé gái"
Tool trả về: Quần nỉ bé gái (1BP24C002) + similar_items chứa [1BP25W003, 1BP25C002, ...]
✅ ĐÚNG (gợi ý tự nhiên):
"Mình tìm được mẫu quần nỉ bé gái này đang HOT luôn bạn ơi! 🔥
Ngoài ra nhà mình còn mẫu quần nỉ dáng suông cũng rất oke cho bé,
dáng thoải mái hơn nếu bé thích mặc rộng. Bạn xem cả 2 mẫu bên dưới nhé!
Thích mẫu nào mình check size cho! 😊"
→ product_ids: ["1BP24C002", "1BP25W003"]
❌ SAI (bỏ qua similar_items):
"Mình tìm được quần nỉ bé gái rồi, xem bên dưới nhé!"
→ Chỉ show 1 SP, bỏ phí data similar_items = MẤT cơ hội bán thêm!
```
**QUY TẮC:**
- Gợi ý similar_items **TỰ NHIÊN**, như "ngoài ra còn mẫu này cũng đẹp", "có thêm mẫu tương tự"
- **KHÔNG liệt kê dài dòng** — chỉ pick 1-2 mẫu hay nhất
- **KHÔNG nhắc mã SKU** trong ai_response — chỉ đưa vào product_ids
- Nếu similar_items RỖNG → bỏ qua, giới thiệu SP chính bình thường
---
### 8.3. 🎯 ƯU TIÊN SỐ 2: `outfit_recommendations` (CHỈ KHI KHÁCH HỎI PHỐI ĐỒ)
**TRIGGER:** CHỈ kích hoạt khi khách CHỦ ĐỘNG hỏi:
- "Phối với quần gì?"
- "Mặc với áo nào cho đẹp?"
- "Gợi ý combo/set/outfit?"
- "Kết hợp với gì?"
- "Mặc đi X nên phối sao?"
- "Tìm nguyên bộ cho mình"
**CÁCH DÙNG:**
1. Đọc trường `outfit_recommendations` từ SP khách đang xem/quan tâm
2. Mỗi item có: `match_product_code`, `match_product_name`, `role` (top/bottom/outerwear/accessory), `reason`
3. Lọc theo `role` phù hợp câu hỏi (hỏi quần → lấy role=bottom, hỏi áo → role=top)
4. Sử dụng trường `reason` để giải thích TẠI SAO phối đẹp
5. **ĐƯA MÃ SKU CỦA OUTFIT ITEMS VÀO `product_ids`** để frontend render card
**VÍ DỤ THỰC TẾ:**
```
Context: Khách vừa xem Áo phông Boxy (3TS26S018-SR079)
Khách: "Áo này phối với quần gì cho bé?"
Tool data từ outfit_recommendations:
[{"match_product_code":"1BP24C002","match_product_name":"Quần nỉ bé gái","role":"bottom","reason":"Quần nỉ phối áo phông — set năng động cho bé đi chơi."}]
✅ ĐÚNG (dùng reason + chọn hộ):
"Áo phông Boxy này phối với quần nỉ dáng jogger là chuẩn bài luôn bạn ơi! 💯
Set này năng động lắm, bé mặc đi chơi hay đi học đều oke.
Quần cạp chun thoải mái cho bé vận động cả ngày!
Bạn xem combo bên dưới nhé 😊"
→ product_ids: ["3TS26S018-SR079", "1BP24C002"]
❌ SAI (không dùng outfit data, tự bịa):
"Áo này phối với quần khaki hoặc quần jeans đều đẹp bạn nhé!"
→ BỊA! Phải dùng đúng data outfit_recommendations từ tool!
```
**⚠️ QUY TẮC QUAN TRỌNG:**
- **CẤM TỰ BỊA** gợi ý phối đồ — PHẢI dùng đúng data từ `outfit_recommendations`
- Nếu `outfit_recommendations` RỖNG → **GỌI `data_retrieval_tool`** để search SP bổ sung (giữ nguyên logic hiện tại ở mục 5.2)
- **Dùng trường `reason`** để giải thích — đây là lý do phối đồ đã được pre-computed, KHÔNG TỰ NGHĨ lý do mới
- Khi gợi ý outfit, **PHẢI thể hiện QUAN ĐIỂM** — "Mình vote combo này!", KHÔNG nói "tùy bạn"
---
### 8.4. 🔄 LUỒNG PIVOTING (CHUYỂN GỐC LIÊN TỤC)
Khi khách hỏi thêm về một SP đã gợi ý (từ similar_items HOẶC outfit_recommendations), AI phải **chuyển gốc sang SP mới đó** để tiếp tục tư vấn.
**VÍ DỤ CHUỖI PIVOTING:**
```
Lượt 1: Khách hỏi "Tìm áo cho bé"
→ Tool trả về Áo A + similar_items [Áo B, Áo C]
→ Bot giới thiệu Áo A + gợi ý thêm Áo B
Lượt 2: Khách nói "Mẫu áo thứ 2 (Áo B) phối quần gì?"
→ CHUYỂN GỐC sang Áo B
→ Đọc outfit_recommendations của Áo B → Quần D
→ Bot gợi ý Quần D
Lượt 3: Khách nói "Có quần nào tương tự kiểu đó không?"
→ CHUYỂN GỐC sang Quần D
→ Đọc similar_items của Quần D → [Quần E, Quần F]
→ Bot gợi ý Quần E, Quần F
```
**NGUYÊN TẮC PIVOTING:**
- Mỗi lượt chat, xác định SP GỐC hiện tại từ context ([LATEST_PRODUCT_INTEREST] trong user_insight)
- Khi khách hỏi về SP đã gợi ý → GỌI `data_retrieval_tool` với `magento_ref_code` của SP đó
- Dùng `similar_items` hoặc `outfit_recommendations` của SP MỚI, KHÔNG dùng của SP cũ
- Mạch hội thoại KHÔNG BAO GIỜ bị cụt — luôn có data để tiếp tục
---
### 8.5. TÓM TẮT QUY TẮC
| Tình huống | Dùng trường nào | Hành động |
|------------|-----------------|-----------|
| Giới thiệu SP lần đầu | `similar_items` | Tự động gợi ý thêm 1-2 mẫu tương tự |
| Khách hỏi "phối gì?" / "combo?" | `outfit_recommendations` | Lấy SP phối theo role + reason |
| Khách hỏi "có mẫu khác tương tự?" | `similar_items` | Lấy danh sách SP tương tự |
| Khách quan tâm SP đã gợi ý | Chuyển gốc | Gọi tool lấy data SP mới → tiếp tục |
| Data rỗng (cả 2 trường) | Fallback | Gọi `data_retrieval_tool` search bình thường |
---
......@@ -27,6 +27,7 @@ SUB_MODULES = [
("04c_sales_upsell.txt", "canifa-04c-sales-upsell", ["canifa", "system-sales"]),
("04d_sales_urgency.txt", "canifa-04d-sales-urgency", ["canifa", "system-sales"]),
("05_tool_routing.txt", "canifa-05-tool-routing", ["canifa", "system-core"]),
("05d_outfit_similar.txt", "canifa-05d-outfit-similar", ["canifa", "system-core"]),
("06_user_insight.txt", "canifa-06-user-insight", ["canifa", "system-core"]),
("07_output_format.txt", "canifa-07-output-format", ["canifa", "system-core"]),
]
......@@ -142,6 +143,7 @@ def verify(lf: Langfuse):
("04c Upsell", "UPSELL & CROSS-SELL"),
("04d Urgency", "URGENCY"),
("05 Tool Routing", "KHI NÀO GỌI TOOL"),
("05d Outfit/Similar", "GỢI Ý SẢN PHẨM TƯƠNG TỰ"),
("06 User Insight", "USER INSIGHT 2.0"),
("07 Output Format", "FORMAT ĐẦU RA"),
("08 Season", "HƯỚNG DẪN TƯ VẤN THEO MÙA"),
......
......@@ -87,13 +87,21 @@ Bạn là Chuyên gia Thời trang (Stylist Pro) của CANIFA. Bạn tư vấn d
</system_role>
<styling_philosophy>
1. GỢI Ý NGAY, KHÔNG HỎI LẠI: Luôn đưa ra lựa chọn. Nếu chưa biết giới tính, gợi ý cả 2 phương án.
2. COMBO / OUTFIT (CRITICAL):
- Nếu khách hỏi "đồ", "set", "phối", "mặc gì" -> PHẢI tạo outfit hoàn chỉnh (Áo + Quần/Váy).
- Sử dụng `outfit_recommendations` từ tool result để bốc đúng món phối.
- BẮT BUỘC nhét SKU món phối vào `product_ids`.
3. KIỂM TRA SỰ PHÙ HỢP: Đừng gợi ý đồ đông cho mùa hè, đừng gợi ý đồ đi làm là áo hoạt hình.
4. KHÔNG TRẦN TÌNH THUẬT TOÁN: Đừng nói "Hệ thống trả về...", hãy nói "Canifa đang có sẵn...".
1. CHIẾN THUẬT SIMILAR ITEMS (AUTO):
- Mặc định khi giới thiệu 1 sản phẩm chính, LUÔN gợi ý kèm 1-2 sản phẩm tương tự lấy từ mảng `similar_items` (nếu có).
- Mục đích: Tránh bế tắc, cho khách thêm lựa chọn (VD: "Ngoài ra, Canifa còn có mẫu [SKU tương tự] cực kỳ hợp dáng...").
2. CHIẾN THUẬT OUTFIT (ON-DEMAND):
- CHỈ tung đồ phối khi khách có ý định rõ ràng (hỏi "phối", "mix", "mặc với gì", "combo", "set"). KHÔNG tự động phối đồ nếu khách chỉ tìm áo/quần đơn lẻ.
- Khi phối đồ: BẮT BUỘC lấy sản phẩm từ mảng `outfit_recommendations`. Dùng `role` và `reason` trong dữ liệu để giải thích thuyết phục (VD: "Với màu Xanh của áo, bạn nên phối với quần [SKU] vì...").
3. NGUYÊN TẮC KIM CHỈ NAM (PIVOTING):
- Nếu khách đang xem SKU A nhưng lại hỏi sang SKU B (từ gợi ý similar/outfit), bạn phải đổi gốc, LẤY SKU B LÀM KIM CHỈ NAM MỚI.
- Các gợi ý tiếp theo phải xoay quanh metadata của SKU mới này.
4. GỢI Ý NGAY & KHÔNG TRẦN TÌNH:
- Gợi ý luôn, không hỏi ngược lại trừ phi cực kỳ cần thiết.
- Không được nói "Theo hệ thống...", "Dữ liệu trả về...", hãy nói một cách tự nhiên như 1 stylist.
</styling_philosophy>
<user_memory_update>
......@@ -104,11 +112,23 @@ Cập nhật 12 trường Insight:
</user_memory_update>
<formatting_rules>
- VĂN PHONG NGẮN GỌN, TRỰC DIỆN: Cắt bỏ mọi lời lẽ dông dài, giải thích lan man. Đi thẳng vào việc giới thiệu sản phẩm.
- TỐI ĐA HÓA SỰ LỰA CHỌN (CHỐT DEAL): Rất khuyến khích đưa ra 3-5 lựa chọn sản phẩm để khách có nhiều cơ hội chốt đơn. TUY NHIÊN, mỗi sản phẩm chỉ mô tả bằng 1 câu cực ngắn (Tên + SKU + 1 ưu điểm).
- TRÌNH BÀY COMBO/OUTFIT GỌN GÀNG: Tuyệt đối không viết tổ hợp lộn xộn kiểu "Áo A + Quần B/C/D". Nếu giới thiệu 1 áo và 4 quần, chỉ cần nói ngắn gọn: "Gợi ý phối đồ: Áo polo [SKU] kết hợp với bất kỳ mẫu quần nào ở trên đều cực kỳ tôn dáng".
- SKU sản phẩm phải bọc trong ngoặc tròn, ví dụ: (6TS25S001).
- MỌI SKU nhắc đến trong text BẮT BUỘC phải nằm trong mảng product_ids.
- Tối đa 250 từ. Không dán URL.
- MỌI SKU nhắc đến trong text BẮT BUỘC phải nằm trong mảng product_ids. Không dán URL.
</formatting_rules>
<example_response>
Dạ Canifa gợi ý 4 mẫu quần kaki nam dáng ôm (slim fit) vừa tôn dáng vừa thoải mái cho anh:
1. Quần khaki nam dáng ôm (8BK25A004): Lên form khá sát nhưng co giãn tốt, hợp đi làm/đi chơi.
2. Quần khaki nam cạp chun (8BK26A001): Cạp thun ẩn, ôm gọn nhưng rất dễ mặc.
3. Quần khaki nam regular (8BK25W001): Dáng ôm vừa phải, ít kén dáng người.
4. Quần khaki nam cạp trơn (8BK23A002): Bản giá tối ưu, giữ tinh thần lịch sự.
💡 Gợi ý phối đồ: Để có outfit hoàn hảo, anh có thể kết hợp Áo polo nam (8TP26A003) với bất kỳ chiếc quần kaki nào ở trên đều cực kỳ hợp và thanh lịch.
</example_response>
<output_format>
Trả về DUY NHẤT JSON:
{{
......
......@@ -13,6 +13,8 @@ from .tool_module import SearchEngine as LegacySearchEngine
logger = logging.getLogger(__name__)
USE_LOCAL_SQLITE = True
PROJECT_ROOT = Path(__file__).resolve().parents[3]
if str(PROJECT_ROOT) not in sys.path:
sys.path.append(str(PROJECT_ROOT))
......@@ -20,7 +22,6 @@ if str(PROJECT_ROOT) not in sys.path:
# Preference module is deprecated; use internal SearchEngine only.
ProductSearchEngine = None
class LiteralSearch(BaseModel):
model_config = {"extra": "ignore"}
......@@ -36,7 +37,18 @@ class InferredSearch(BaseModel):
gender_target: Optional[str] = Field(default=None, description="Alias cu cua gender_by_product.")
age_group: Optional[str] = Field(default=None, description="Alias cu cua age_by_product.")
master_color: Optional[str] = Field(default=None, description="Mau sac.")
tags: List[str] = Field(default_factory=list, description="Intent tags: occ/style/fit/weather/function.")
tags: List[str] = Field(
default_factory=list,
description=(
"AI suy luận ý định khách -> chọn từ 4 TRỤC CỐ ĐỊNH (BẮT BUỘC có prefix!): "
"Trục 1 (occ:): occ:di_lam, occ:di_choi, occ:di_tiec, occ:di_hoc, occ:mac_nha, occ:the_thao, occ:di_bien, occ:du_lich, occ:da_ngoai, occ:di_ngu. "
"Trục 2a (wthr:): wthr:mua_he, wthr:mua_dong, wthr:giao_mua, wthr:troi_mua, wthr:troi_nang. "
"Trục 2b (func:): func:thoang_mat, func:giu_am, func:tham_hut, func:nhanh_kho, func:chong_uv, func:can_gio. "
"Trục 3 (style:): style:thanh_lich, style:nang_dong, style:basic, style:ca_tinh, style:de_thuong, style:tre_trung, style:toi_gian, style:smart_casual. "
"Trục 4 (fit:): fit:oversize, fit:slim, fit:regular, fit:wide_leg, fit:cropped, fit:relaxed. "
"KHÔNG tự nghĩ tag mới! PHẢI giữ prefix! Tối đa 3."
)
)
keywords: List[str] = Field(default_factory=list, description="Tu khoa bo tro cho search.")
price_min: Optional[int] = Field(default=None, description="Gia thap nhat.")
price_max: Optional[int] = Field(default=None, description="Gia cao nhat.")
......@@ -314,7 +326,12 @@ async def data_retrieval_tool(
"""
configurable = config.get("configurable", {}) if config else {}
db_source = configurable.get("db_source", "starrocks")
use_sqlite = db_source == "sqlite"
if USE_LOCAL_SQLITE:
use_sqlite = True
else:
use_sqlite = (db_source == "sqlite")
shared_user_insight = user_insight or configurable.get("user_insight")
per_search_results: List[Dict[str, Any]] = []
......
......@@ -9,6 +9,7 @@ from pydantic import BaseModel, Field
# Core module imports từ local tool_module
from .db_connector import DBConnector
from .tags_mapping import TAG_TO_BITMAP_COL, TAG_TEXT_MAPPING
from .product_mapping import get_related_lines, resolve_product_line
from .pattern_detector import HardPatternDetector
from .size_message_builder import build_size_message
......@@ -229,6 +230,49 @@ def _build_fixed_clauses(inf: InferredSearch, params: list) -> list[str]:
return clauses
def _build_tag_clauses(tags: list[str], params: list) -> str:
if not tags: return ""
bitmap_by_col: dict[str, list[str]] = {}
text_terms: list[str] = []
for tag in tags:
tag_lower = tag.strip().lower()
if tag_lower in TAG_TO_BITMAP_COL:
col, val = TAG_TO_BITMAP_COL[tag_lower]
bitmap_by_col.setdefault(col, []).append(val)
elif tag_lower in TAG_TEXT_MAPPING:
text_terms.append(TAG_TEXT_MAPPING[tag_lower])
else:
# Fallback for hallucinated tags
clean_tag = tag_lower.replace("occ:", "").replace("style:", "").replace("fit:", "").replace("wthr:", "").replace("func:", "")
text_terms.append(clean_tag.replace("_", " "))
all_clauses = []
for col, values in bitmap_by_col.items():
if len(values) == 1:
params.append(values[0])
all_clauses.append(f"{col} = %s")
else:
placeholders = ", ".join(["%s"] * len(values))
params.extend(values)
all_clauses.append(f"{col} IN ({placeholders})")
if text_terms:
term_clauses = []
for term in text_terms:
term = term.strip()
if not term: continue
params.append(f"%{term}%")
params.append(f"%{term}%")
params.append(f"%{term}%")
term_clauses.append(f"(LOWER(description_text) LIKE %s OR LOWER(product_name) LIKE %s OR LOWER(tags) LIKE %s)")
if term_clauses:
all_clauses.append(f"({' OR '.join(term_clauses)})")
if all_clauses:
return f"({' OR '.join(all_clauses)})"
return ""
def _build_exclusion_clauses(keywords: list, params: list) -> list[str]:
clauses = []
......@@ -279,9 +323,10 @@ class SearchEngine:
# Lane 2: Inferred Structured Search
params_inf = []
fixed_inf = _build_fixed_clauses(inf, params_inf)
tag_search_clause = _build_tag_clauses(inf.tags, params_inf)
ex_params = []
exclusions = _build_exclusion_clauses(inf.keywords, ex_params)
sql_inf = _build_full_query(fixed_inf, None, exclusions)
sql_inf = _build_full_query(fixed_inf, tag_search_clause, exclusions)
inferred_products = await self.db.execute_query(sql_inf, tuple(params_inf + ex_params))
# Merge & Dedup (Ưu tiên Inferred trước để chuẩn phân loại)
......@@ -347,13 +392,56 @@ class SearchEngine:
return products
@staticmethod
def _parse_outfit_recommendations(products: list) -> list:
def _parse_outfit_recommendations(products: list, inf=None) -> list:
"""Parse outfit_recommendations JSON string from One Big Table into list[dict]."""
# Determine occasion from inferred tags
occasion_context = ""
if inf and hasattr(inf, "tags") and inf.tags:
from agent.tools.tool_module.tags_mapping import TAG_TEXT_MAPPING
occasions = [TAG_TEXT_MAPPING.get(t) for t in inf.tags if t.startswith("occ:") and TAG_TEXT_MAPPING.get(t)]
if occasions:
occasion_context = f" Phù hợp mặc đi {', '.join(occasions)}."
for p in products:
raw = p.get("outfit_recommendations")
# Fallback to product DB tags if no inferred occasion
prod_occ = occasion_context
if not prod_occ:
prod_tags_raw = p.get("tags")
if prod_tags_raw:
try:
import json
if isinstance(prod_tags_raw, str):
try:
t_list = json.loads(prod_tags_raw)
except:
t_list = [t.strip() for t in prod_tags_raw.split(",")]
elif isinstance(prod_tags_raw, list):
t_list = prod_tags_raw
else:
t_list = []
found_occ = [t for t in t_list if any(o in str(t).lower() for o in ["công sở", "đi làm", "đi chơi", "dạo phố", "mặc nhà", "mặc ngủ", "thể thao", "đi tiệc"])]
if found_occ:
prod_occ = f" Rất hợp để {found_occ[0].lower()}."
except Exception:
pass
if raw and isinstance(raw, str):
try:
p["outfit_recommendations"] = json.loads(raw)
import json
parsed = json.loads(raw)
# Giới hạn max 5 outfit để tránh hàng trăm SP
parsed = parsed[:5]
# Thêm context dịp mặc vào reason
if prod_occ:
for outfit in parsed:
if "reason" in outfit and prod_occ not in outfit["reason"]:
outfit["reason"] += prod_occ
p["outfit_recommendations"] = parsed
except (json.JSONDecodeError, TypeError):
p["outfit_recommendations"] = []
elif not raw:
......@@ -402,7 +490,7 @@ class SearchEngine:
if products:
products = self._parse_similar_items(products)
products = self._parse_outfit_recommendations(products)
products = self._parse_outfit_recommendations(products, inf)
products = self._parse_description_data_full(products)
for p in products:
raw_size = p.get("size_scale", "")
......
"""
Centralized mapping for AI Stylist tags to Database constraints.
This file maintains the exact mapping between LLM generated prefix tags (e.g., 'occ:di_lam')
and their corresponding SQLite/StarRocks database queries (either BITMAP columns or LIKE strings).
"""
TAG_TO_BITMAP_COL: dict[str, tuple[str, str]] = {
# Style
"style:thanh_lich": ("style", "Feminine"),
"style:nang_dong": ("style", "Dynamic"),
"style:basic": ("style", "Basic"),
"style:ca_tinh": ("style", "Street"),
"style:de_thuong": ("style", "Cute"),
"style:tre_trung": ("style", "Trend"),
"style:toi_gian": ("style", "Essential"),
"style:smart_casual":("style", "Smart Casual"),
# Fit
"fit:oversize": ("fitting", "Oversize"),
"fit:slim": ("fitting", "Slim"),
"fit:regular": ("fitting", "Regular"),
"fit:wide_leg": ("fitting", "Relax"),
"fit:cropped": ("fitting", "Boxy"),
"fit:relaxed": ("fitting", "Relax"),
# Weather / Season
"wthr:mua_he": ("season_sale", "Summer"),
"wthr:mua_dong": ("season_sale", "Winter"),
"wthr:giao_mua": ("season_sale", "Basic"),
# Occasion mapped to Bitmap (e.g., season_sale or style)
"occ:di_bien": ("season_sale", "Summer"),
"occ:du_lich": ("season_sale", "Summer"),
"occ:the_thao": ("style", "Athleisure"),
}
TAG_TEXT_MAPPING: dict[str, str] = {
# Occasion
"occ:di_lam": "công sở",
"occ:di_choi": "dạo phố",
"occ:di_tiec": "tiệc",
"occ:di_hoc": "đi học",
"occ:mac_nha": "mặc ngủ",
"occ:da_ngoai": "dã ngoại",
"occ:di_ngu": "mặc ngủ",
"occ:hang_ngay": "hàng ngày",
# Weather
"wthr:troi_mua": "trời mưa",
"wthr:troi_nang": "trời nắng",
# Function
"func:thoang_mat": "thoáng mát",
"func:giu_am": "giữ ấm",
"func:tham_hut": "thấm hút",
"func:nhanh_kho": "nhanh khô",
"func:chong_uv": "chống uv",
"func:can_gio": "cản gió",
}
This source diff could not be displayed because it is too large. You can view the blob instead.
import asyncio
import json
import logging
from datetime import datetime, timedelta, timezone
from typing import Any
import psycopg
from psycopg import sql
from psycopg_pool import AsyncConnectionPool
from config import CHECKPOINT_POSTGRES_URL
logger = logging.getLogger(__name__)
class ConversationManager:
def __init__(
self,
connection_url: str = CHECKPOINT_POSTGRES_URL,
table_name: str = "langgraph_chat_histories",
):
self.connection_url = connection_url
self.table_name = table_name
self._pool: AsyncConnectionPool | None = None
async def _get_pool(self) -> AsyncConnectionPool:
"""Get or create async connection pool."""
if self._pool is None:
self._pool = AsyncConnectionPool(
self.connection_url,
min_size=1,
max_size=20,
max_lifetime=600, # Recycle connections every 10 mins
max_idle=300, # Close idle connections after 5 mins
open=False,
)
await self._pool.open()
return self._pool
async def initialize_table(self):
"""Create the chat history table if it doesn't exist"""
try:
pool = await self._get_pool()
async with pool.connection(timeout=2.0) as conn:
async with conn.cursor() as cursor:
# Set timezone to Vietnam for this session
await cursor.execute("SET timezone = 'Asia/Ho_Chi_Minh'")
create_table_query = sql.SQL("""
CREATE TABLE IF NOT EXISTS {table} (
id SERIAL PRIMARY KEY,
identity_key VARCHAR(255) NOT NULL,
message TEXT NOT NULL,
is_human BOOLEAN NOT NULL,
timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
)
""").format(table=sql.Identifier(self.table_name))
await cursor.execute(create_table_query)
create_index_query = sql.SQL("""
CREATE INDEX IF NOT EXISTS {index_name}
ON {table} (identity_key, timestamp)
""").format(
index_name=sql.Identifier(f"idx_{self.table_name}_identity_timestamp"),
table=sql.Identifier(self.table_name),
)
await cursor.execute(create_index_query)
await conn.commit()
logger.info(f"Table {self.table_name} initialized successfully")
except Exception as e:
logger.error(f"Error initializing table: {e}")
raise
async def save_conversation_turn(self, identity_key: str, human_message: str, ai_message: str):
"""Save both human and AI messages in a single atomic transaction with retry logic."""
max_retries = 3
for attempt in range(max_retries):
try:
pool = await self._get_pool()
# Use Vietnam timezone for consistent timestamp
vietnam_tz = timezone(timedelta(hours=7))
timestamp = datetime.now(vietnam_tz)
# Transaction block: atomic insert
async with pool.connection(timeout=2.0) as conn:
async with conn.cursor() as cursor:
# Set timezone to Vietnam for this session
await cursor.execute("SET timezone = 'Asia/Ho_Chi_Minh'")
insert_query = sql.SQL("""
INSERT INTO {table} (identity_key, message, is_human, timestamp)
VALUES (%s, %s, %s, %s), (%s, %s, %s, %s)
""").format(table=sql.Identifier(self.table_name))
await cursor.execute(
insert_query,
(
identity_key,
human_message,
True,
timestamp,
identity_key,
ai_message,
False,
timestamp,
),
)
await conn.commit()
logger.debug(f"Saved conversation turn for identity_key {identity_key}")
return # Success
except psycopg.OperationalError as e:
logger.warning(f"Database connection error (attempt {attempt + 1}/{max_retries}): {e}")
if attempt == max_retries - 1:
logger.error(f"Failed to save conversation after {max_retries} attempts: {e}")
raise
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"Failed to save conversation for identity_key {identity_key}: {e}", exc_info=True)
raise
async def get_chat_history(
self,
identity_key: str,
limit: int | None = None,
before_id: int | None = None,
include_product_ids: bool = True,
skip_date_filter: bool = False,
) -> list[dict[str, Any]]:
"""
Retrieve chat history for an identity (user_id or device_id) using cursor-based pagination.
Args:
skip_date_filter: True to get all history, False to get only today's messages
include_product_ids: True for API (frontend needs product cards),
False for AI context (only text needed)
"""
try:
if skip_date_filter:
# Get all history without date filter
base_query = sql.SQL("""
SELECT message, is_human, timestamp, id
FROM {table}
WHERE identity_key = %s
""").format(table=sql.Identifier(self.table_name))
params = [identity_key]
else:
# Get today's date range in Vietnam timezone (UTC+7)
vietnam_tz = timezone(timedelta(hours=7))
now = datetime.now(vietnam_tz)
start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0)
end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=999999)
base_query = sql.SQL("""
SELECT message, is_human, timestamp, id
FROM {table}
WHERE identity_key = %s
AND timestamp >= %s AND timestamp <= %s
""").format(table=sql.Identifier(self.table_name))
params = [identity_key, start_of_day, end_of_day]
query_parts = [base_query]
if before_id:
query_parts.append(sql.SQL("AND id < %s"))
params.append(before_id)
query_parts.append(sql.SQL("ORDER BY id DESC"))
if limit:
query_parts.append(sql.SQL("LIMIT %s"))
params.append(limit)
final_query = sql.SQL(" ").join(query_parts)
pool = await self._get_pool()
async with pool.connection(timeout=2.0) as conn, conn.cursor() as cursor:
# Set timezone to Vietnam for this session
await cursor.execute("SET timezone = 'Asia/Ho_Chi_Minh'")
await cursor.execute(final_query, tuple(params))
results = await cursor.fetchall()
history = []
for row in results:
message_content = row[0]
is_human = row[1]
entry = {
"is_human": is_human,
"timestamp": row[2],
"id": row[3],
}
if is_human:
entry["message"] = message_content
else:
try:
parsed = json.loads(message_content)
entry["message"] = parsed.get("ai_response", message_content)
if include_product_ids:
entry["product_ids"] = parsed.get("product_ids", [])
except (json.JSONDecodeError, TypeError):
# Fallback nếu không phải JSON (data cũ)
entry["message"] = message_content
if include_product_ids:
entry["product_ids"] = []
history.append(entry)
return history
except Exception as e:
logger.error(f"Error retrieving chat history: {e}")
return []
async def archive_history(self, identity_key: str) -> str:
"""
Archive current chat history for identity_key by renaming it in the DB.
Only archives messages from TODAY (which are the visible ones).
Returns the new archived key.
"""
try:
timestamp_suffix = datetime.now().strftime("%Y%m%d_%H%M%S")
# Format: user123_archived_20231027_103045
new_key = f"{identity_key}_archived_{timestamp_suffix}"
# Optimize: Use Range Query
now = datetime.now().astimezone() # Ensure Timezone Aware (e.g. +07:00)
start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0)
end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=999999)
pool = await self._get_pool()
async with pool.connection(timeout=2.0) as conn:
async with conn.cursor() as cursor:
query = sql.SQL("""
UPDATE {table}
SET identity_key = %s
WHERE identity_key = %s
AND timestamp >= %s AND timestamp <= %s
""").format(table=sql.Identifier(self.table_name))
await cursor.execute(query, (new_key, identity_key, start_of_day, end_of_day))
await conn.commit()
logger.info(f"Archived history for {identity_key} to {new_key}")
return new_key
except Exception as e:
logger.error(f"Error archiving history: {e}")
raise
async def clear_history(self, identity_key: str):
"""Clear all chat history for an identity"""
try:
pool = await self._get_pool()
async with pool.connection(timeout=2.0) as conn:
async with conn.cursor() as cursor:
query = sql.SQL("DELETE FROM {table} WHERE identity_key = %s").format(
table=sql.Identifier(self.table_name)
)
await cursor.execute(query, (identity_key,))
await conn.commit()
logger.info(f"Cleared chat history for identity_key {identity_key}")
except Exception as e:
logger.error(f"Error clearing chat history: {e}")
async def get_user_count(self) -> int:
"""Get total number of unique identities"""
try:
pool = await self._get_pool()
async with pool.connection() as conn, conn.cursor() as cursor:
query = sql.SQL("SELECT COUNT(DISTINCT identity_key) FROM {table}").format(
table=sql.Identifier(self.table_name)
)
await cursor.execute(query)
result = await cursor.fetchone()
return result[0] if result else 0
except Exception as e:
logger.error(f"Error getting user count: {e}")
return 0
async def get_message_count_today(self, identity_key: str) -> int:
"""
Đếm số tin nhắn của identity trong ngày hôm nay (cho rate limiting).
Chỉ đếm human messages (is_human = true).
"""
try:
# Optimize: Use Range Query
now = datetime.now()
start_of_day = datetime(now.year, now.month, now.day, 0, 0, 0)
end_of_day = datetime(now.year, now.month, now.day, 23, 59, 59, 999999)
pool = await self._get_pool()
async with pool.connection() as conn, conn.cursor() as cursor:
query = sql.SQL("""
SELECT COUNT(*) FROM {table}
WHERE identity_key = %s
AND is_human = true
AND timestamp >= %s AND timestamp <= %s
""").format(table=sql.Identifier(self.table_name))
await cursor.execute(
query,
(identity_key, start_of_day, end_of_day),
)
result = await cursor.fetchone()
return result[0] if result else 0
except Exception as e:
logger.error(f"Error counting messages for {identity_key}: {e}")
return 0
async def close(self):
"""Close the connection pool"""
if self._pool:
await self._pool.close()
# --- Singleton ---
_instance: ConversationManager | None = None
async def get_conversation_manager() -> ConversationManager:
"""Get or create async ConversationManager singleton"""
global _instance
if _instance is None:
_instance = ConversationManager()
await _instance.initialize_table()
return _instance
import asyncio
import json
import logging
from datetime import datetime, timedelta, timezone
from typing import Any
import psycopg
from psycopg import sql
from psycopg_pool import AsyncConnectionPool
from config import CHECKPOINT_POSTGRES_URL, CONV_DATABASE_URL
logger = logging.getLogger(__name__)
class ConversationManager:
def __init__(
self,
connection_url: str = None,
table_name: str = "langgraph_chat_histories",
):
if connection_url is None:
connection_url = CONV_DATABASE_URL or CHECKPOINT_POSTGRES_URL
self.connection_url = connection_url
self.table_name = table_name
self._pool: AsyncConnectionPool | None = None
async def _get_pool(self) -> AsyncConnectionPool:
"""Get or create async connection pool."""
if self._pool is None:
self._pool = AsyncConnectionPool(
self.connection_url,
min_size=1,
max_size=20,
max_lifetime=600, # Recycle connections every 10 mins
max_idle=300, # Close idle connections after 5 mins
open=False,
)
await self._pool.open()
return self._pool
async def initialize_table(self):
"""Create the chat history table if it doesn't exist"""
try:
pool = await self._get_pool()
async with pool.connection(timeout=2.0) as conn:
async with conn.cursor() as cursor:
# Set timezone to Vietnam for this session
await cursor.execute("SET timezone = 'Asia/Ho_Chi_Minh'")
create_table_query = sql.SQL("""
CREATE TABLE IF NOT EXISTS {table} (
id SERIAL PRIMARY KEY,
identity_key VARCHAR(255) NOT NULL,
message TEXT NOT NULL,
is_human BOOLEAN NOT NULL,
timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
)
""").format(table=sql.Identifier(self.table_name))
await cursor.execute(create_table_query)
create_index_query = sql.SQL("""
CREATE INDEX IF NOT EXISTS {index_name}
ON {table} (identity_key, timestamp)
""").format(
index_name=sql.Identifier(f"idx_{self.table_name}_identity_timestamp"),
table=sql.Identifier(self.table_name),
)
await cursor.execute(create_index_query)
await conn.commit()
logger.info(f"Table {self.table_name} initialized successfully")
except Exception as e:
logger.error(f"Error initializing table: {e}")
# Do not raise so that the rest of the application can continue (graceful degradation)
pass
async def save_conversation_turn(self, identity_key: str, human_message: str, ai_message: str):
"""Save both human and AI messages in a single atomic transaction with retry logic."""
max_retries = 3
for attempt in range(max_retries):
try:
pool = await self._get_pool()
# Use Vietnam timezone for consistent timestamp
vietnam_tz = timezone(timedelta(hours=7))
timestamp = datetime.now(vietnam_tz)
# Transaction block: atomic insert
async with pool.connection(timeout=2.0) as conn:
async with conn.cursor() as cursor:
# Set timezone to Vietnam for this session
await cursor.execute("SET timezone = 'Asia/Ho_Chi_Minh'")
insert_query = sql.SQL("""
INSERT INTO {table} (identity_key, message, is_human, timestamp)
VALUES (%s, %s, %s, %s), (%s, %s, %s, %s)
""").format(table=sql.Identifier(self.table_name))
await cursor.execute(
insert_query,
(
identity_key,
human_message,
True,
timestamp,
identity_key,
ai_message,
False,
timestamp,
),
)
await conn.commit()
logger.debug(f"Saved conversation turn for identity_key {identity_key}")
return # Success
except psycopg.OperationalError as e:
logger.warning(f"Database connection error (attempt {attempt + 1}/{max_retries}): {e}")
if attempt == max_retries - 1:
logger.error(f"Failed to save conversation after {max_retries} attempts: {e}")
return None
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"Failed to save conversation for identity_key {identity_key}: {e}", exc_info=True)
return None
async def get_chat_history(
self,
identity_key: str,
limit: int | None = None,
before_id: int | None = None,
include_product_ids: bool = True,
skip_date_filter: bool = False,
) -> list[dict[str, Any]]:
"""
Retrieve chat history for an identity (user_id or device_id) using cursor-based pagination.
Args:
skip_date_filter: True to get all history, False to get only today's messages
include_product_ids: True for API (frontend needs product cards),
False for AI context (only text needed)
"""
try:
if skip_date_filter:
# Get all history without date filter
base_query = sql.SQL("""
SELECT message, is_human, timestamp, id
FROM {table}
WHERE identity_key = %s
""").format(table=sql.Identifier(self.table_name))
params = [identity_key]
else:
# Get today's date range in Vietnam timezone (UTC+7)
vietnam_tz = timezone(timedelta(hours=7))
now = datetime.now(vietnam_tz)
start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0)
end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=999999)
base_query = sql.SQL("""
SELECT message, is_human, timestamp, id
FROM {table}
WHERE identity_key = %s
AND timestamp >= %s AND timestamp <= %s
""").format(table=sql.Identifier(self.table_name))
params = [identity_key, start_of_day, end_of_day]
query_parts = [base_query]
if before_id:
query_parts.append(sql.SQL("AND id < %s"))
params.append(before_id)
query_parts.append(sql.SQL("ORDER BY id DESC"))
if limit:
query_parts.append(sql.SQL("LIMIT %s"))
params.append(limit)
final_query = sql.SQL(" ").join(query_parts)
pool = await self._get_pool()
async with pool.connection(timeout=2.0) as conn, conn.cursor() as cursor:
# Set timezone to Vietnam for this session
await cursor.execute("SET timezone = 'Asia/Ho_Chi_Minh'")
await cursor.execute(final_query, tuple(params))
results = await cursor.fetchall()
history = []
for row in results:
message_content = row[0]
is_human = row[1]
entry = {
"is_human": is_human,
"timestamp": row[2],
"id": row[3],
}
if is_human:
entry["message"] = message_content
else:
try:
parsed = json.loads(message_content)
entry["message"] = parsed.get("ai_response", message_content)
if include_product_ids:
entry["product_ids"] = parsed.get("product_ids", [])
except (json.JSONDecodeError, TypeError):
# Fallback nếu không phải JSON (data cũ)
entry["message"] = message_content
if include_product_ids:
entry["product_ids"] = []
history.append(entry)
return history
except Exception as e:
logger.error(f"Error retrieving chat history: {e}")
return []
async def archive_history(self, identity_key: str) -> str:
"""
Archive current chat history for identity_key by renaming it in the DB.
Only archives messages from TODAY (which are the visible ones).
Returns the new archived key.
"""
try:
timestamp_suffix = datetime.now().strftime("%Y%m%d_%H%M%S")
# Format: user123_archived_20231027_103045
new_key = f"{identity_key}_archived_{timestamp_suffix}"
# Optimize: Use Range Query
now = datetime.now().astimezone() # Ensure Timezone Aware (e.g. +07:00)
start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0)
end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=999999)
pool = await self._get_pool()
async with pool.connection(timeout=2.0) as conn:
async with conn.cursor() as cursor:
query = sql.SQL("""
UPDATE {table}
SET identity_key = %s
WHERE identity_key = %s
AND timestamp >= %s AND timestamp <= %s
""").format(table=sql.Identifier(self.table_name))
await cursor.execute(query, (new_key, identity_key, start_of_day, end_of_day))
await conn.commit()
logger.info(f"Archived history for {identity_key} to {new_key}")
return new_key
except Exception as e:
logger.error(f"Error archiving history: {e}")
raise
async def clear_history(self, identity_key: str):
"""Clear all chat history for an identity"""
try:
pool = await self._get_pool()
async with pool.connection(timeout=2.0) as conn:
async with conn.cursor() as cursor:
query = sql.SQL("DELETE FROM {table} WHERE identity_key = %s").format(
table=sql.Identifier(self.table_name)
)
await cursor.execute(query, (identity_key,))
await conn.commit()
logger.info(f"Cleared chat history for identity_key {identity_key}")
except Exception as e:
logger.error(f"Error clearing chat history: {e}")
async def get_user_count(self) -> int:
"""Get total number of unique identities"""
try:
pool = await self._get_pool()
async with pool.connection() as conn, conn.cursor() as cursor:
query = sql.SQL("SELECT COUNT(DISTINCT identity_key) FROM {table}").format(
table=sql.Identifier(self.table_name)
)
await cursor.execute(query)
result = await cursor.fetchone()
return result[0] if result else 0
except Exception as e:
logger.error(f"Error getting user count: {e}")
return 0
async def get_message_count_today(self, identity_key: str) -> int:
"""
Đếm số tin nhắn của identity trong ngày hôm nay (cho rate limiting).
Chỉ đếm human messages (is_human = true).
"""
try:
# Optimize: Use Range Query
now = datetime.now()
start_of_day = datetime(now.year, now.month, now.day, 0, 0, 0)
end_of_day = datetime(now.year, now.month, now.day, 23, 59, 59, 999999)
pool = await self._get_pool()
async with pool.connection() as conn, conn.cursor() as cursor:
query = sql.SQL("""
SELECT COUNT(*) FROM {table}
WHERE identity_key = %s
AND is_human = true
AND timestamp >= %s AND timestamp <= %s
""").format(table=sql.Identifier(self.table_name))
await cursor.execute(
query,
(identity_key, start_of_day, end_of_day),
)
result = await cursor.fetchone()
return result[0] if result else 0
except Exception as e:
logger.error(f"Error counting messages for {identity_key}: {e}")
return 0
async def close(self):
"""Close the connection pool"""
if self._pool:
await self._pool.close()
# --- Singleton ---
_instance: ConversationManager | None = None
async def get_conversation_manager() -> ConversationManager:
"""Get or create async ConversationManager singleton"""
global _instance
if _instance is None:
_instance = ConversationManager()
await _instance.initialize_table()
return _instance
......@@ -132,6 +132,7 @@ STARROCKS_PORT: int = int(os.getenv("STARROCKS_PORT", "9030"))
STARROCKS_USER: str | None = os.getenv("STARROCKS_USER")
STARROCKS_PASSWORD: str | None = os.getenv("STARROCKS_PASSWORD")
STARROCKS_DB: str | None = os.getenv("STARROCKS_DB")
USE_LOCAL_SQLITE: bool = os.getenv("USE_LOCAL_SQLITE", "false").lower() == "true"
# Placeholder for backward compatibility if needed
AI_MODEL_NAME = DEFAULT_MODEL
......
[('Chân váy', 'di_lam', 'outerwear'), ('Chân váy', 'di_lam', 'outerwear'), ('Găng tay chống nắng', 'di_lam', 'outerwear'), ('Khăn', 'di_lam', 'outerwear'), ('Khẩu trang', 'di_lam', 'outerwear'), ('Mũ', 'di_lam', 'outerwear'), ('Quần Body', 'di_lam', 'outerwear'), ('Quần Khaki', 'di_lam', 'outerwear'), ('Quần Khaki', 'di_lam', 'outerwear'), ('Quần Khaki', 'di_lam', 'outerwear'), ('Quần dài', 'di_lam', 'outerwear'), ('Quần dài', 'di_lam', 'outerwear'), ('Quần dài', 'di_lam', 'outerwear'), ('Quần dài', 'di_lam', 'outerwear'), ('Quần dài', 'di_lam', 'outerwear'), ('Quần dài', 'di_lam', 'outerwear'), ('Quần dài', 'di_lam', 'outerwear'), ('Quần giữ nhiệt', 'di_lam', 'outerwear'), ('Quần giữ nhiệt', 'di_lam', 'outerwear'), ('Quần jean', 'di_lam', 'outerwear'), ('Quần jean', 'di_lam', 'outerwear'), ('Quần jean', 'di_lam', 'outerwear'), ('Quần jean', 'di_lam', 'outerwear'), ('Quần jean', 'di_lam', 'outerwear'), ('Quần jean', 'di_lam', 'outerwear'), ('Quần leggings', 'di_lam', 'outerwear'), ('Quần nỉ', 'di_lam', 'outerwear'), ('Quần nỉ', 'di_lam', 'outerwear'), ('Quần nỉ', 'di_lam', 'outerwear'), ('Quần nỉ', 'di_lam', 'outerwear'), ('Quần nỉ', 'di_lam', 'outerwear'), ('Túi xách', 'di_lam', 'outerwear'), ('Túi xách', 'di_lam', 'outerwear'), ('Váy liền', 'di_lam', 'outerwear'), ('Váy liền', 'di_lam', 'outerwear'), ('Váy liền', 'di_lam', 'outerwear'), ('Váy liền', 'di_lam', 'outerwear'), ('Váy liền', 'di_lam', 'outerwear'), ('Áo Body', 'di_lam', 'outerwear'), ('Áo Body', 'di_lam', 'outerwear'), ('Áo Body', 'di_lam', 'outerwear'), ('Áo Body', 'di_lam', 'outerwear'), ('Áo Body', 'di_lam', 'outerwear'), ('Áo Sơ mi', 'di_lam', 'outerwear'), ('Áo Sơ mi', 'di_lam', 'outerwear'), ('Áo Sơ mi', 'di_lam', 'outerwear'), ('Áo Sơ mi', 'di_lam', 'outerwear'), ('Áo Sơ mi', 'di_lam', 'outerwear'), ('Áo Sơ mi', 'di_lam', 'outerwear'), ('Áo Sơ mi', 'di_lam', 'outerwear'), ('Áo Sơ mi', 'di_lam', 'outerwear'), ('Áo ba lỗ', 'di_lam', 'outerwear'), ('Áo ba lỗ', 'di_lam', 'outerwear'), ('Áo ba lỗ', 'di_lam', 'outerwear'), ('Áo ba lỗ', 'di_lam', 'outerwear'), ('Áo giữ nhiệt', 'di_lam', 'outerwear'), ('Áo giữ nhiệt', 'di_lam', 'outerwear'), ('Áo giữ nhiệt', 'di_lam', 'outerwear'), ('Áo giữ nhiệt', 'di_lam', 'outerwear'), ('Áo giữ nhiệt', 'di_lam', 'outerwear'), ('Áo giữ nhiệt', 'di_lam', 'outerwear'), ('Áo hai dây', 'di_lam', 'outerwear'), ('Áo kiểu', 'di_lam', 'outerwear'), ('Áo len', 'di_lam', 'outerwear'), ('Áo len', 'di_lam', 'outerwear'), ('Áo nỉ', 'di_lam', 'outerwear'), ('Áo nỉ', 'di_lam', 'outerwear'), ('Áo nỉ', 'di_lam', 'outerwear'), ('Áo nỉ', 'di_lam', 'outerwear'), ('Áo nỉ', 'di_lam', 'outerwear'), ('Áo nỉ có mũ', 'di_lam', 'outerwear'), ('Áo nỉ có mũ', 'di_lam', 'outerwear'), ('Áo nỉ có mũ', 'di_lam', 'outerwear'), ('Áo nỉ có mũ', 'di_lam', 'outerwear'), ('Áo phông', 'di_lam', 'outerwear'), ('Áo phông', 'di_lam', 'outerwear'), ('Áo phông', 'di_lam', 'outerwear'), ('Áo phông', 'di_lam', 'outerwear'), ('Áo phông', 'di_lam', 'outerwear'), ('Áo phông', 'di_lam', 'outerwear'), ('Áo phông', 'di_lam', 'outerwear'), ('Áo phông', 'di_lam', 'outerwear'), ('Áo phông', 'di_lam', 'outerwear'), ('Áo phông', 'di_lam', 'outerwear'), ('Áo phông', 'di_lam', 'outerwear'), ('Áo phông', 'di_lam', 'outerwear'), ('Áo phông', 'di_lam', 'outerwear')]
\ No newline at end of file
B----------------
......@@ -37,18 +37,10 @@ def test_query(query, description):
print(f"⚠️ Connection failed: {e}")
if __name__ == "__main__":
print("🚀 Starting Canifa AI Stylist API Verification...")
print("🚀 Testing Canifa AI Stylist Tags Logic...")
# 1. Test Product Search (SQLite Mock Logic)
test_query("Tìm áo phông cho bé trai", "Product Search (SQLite Mock)")
# 2. Test Promotion (StarRocks Production)
test_query("Hôm nay có khuyến mãi gì hot không?", "Promotion Search (StarRocks)")
# 3. Test Knowledge (StarRocks Production)
test_query("Chính sách đổi trả của Canifa thế nào?", "Knowledge Search (StarRocks)")
# 4. Test Store (StarRocks Production)
test_query("Tìm cửa hàng ở Cầu Giấy", "Store Search (StarRocks)")
# Test tags map
test_query("Tìm quần âu nam mặc đi làm công sở", "Test Tag: Đi làm công sở")
test_query("áo thun mặc ở nhà cho nữ", "Test Tag: Ở nhà / mặc ngủ")
print("\n✅ Verification sequence completed.")
import requests
BASE_URL = "http://localhost:5005/api/agent/chat-dev"
def test_query(query, description):
print(f"\nTEST: {description}")
print(f"QUERY: '{query}'")
try:
response = requests.post(BASE_URL, json={"user_query": query}, timeout=60)
print("Status code:", response.status_code)
except Exception as e:
print(f"Connection failed: {e}")
if __name__ == "__main__":
test_query("Tìm quần âu nam mặc đi làm công sở", "Test Tag")
import sys
import asyncio
sys.path.append(r"d:\cnf\chatbot_canifa\backend")
from agent.tools.tool_module.search_engine import SearchEngine
from agent.tools.data_retrieval_tool import InferredSearch
async def main():
se = SearchEngine(use_sqlite=True)
inf = InferredSearch(tags=["occ:di_lam"], keywords=[])
res = await se._dual_lane_search("", inf)
print("Found:", len(res[0]) if res and isinstance(res, tuple) else res)
asyncio.run(main())
......@@ -199,6 +199,9 @@ Always respond warmly in Vietnamese.</textarea>
const price = isObj ? (p.price || p.sale_price || 0) : 0;
const oldPrice = isObj ? ((p.original_price && p.price && p.original_price > p.price) ? p.original_price : null) : null;
const url = isObj ? (p.url ? (p.url.startsWith('http') ? p.url : `https://canifa.com/${p.url}`) : '#') : '#';
const colorName = isObj ? (p.color_name || '') : '';
const colorDisplay = colorName ? `<div class="p-color" title="${colorName}" style="display:inline-block; font-size:11px; padding:2px 6px; background:#f5f5f5; color:#555; border-radius:4px; margin-bottom:4px; margin-top:4px;">${colorName}</div>` : '';
const card = document.createElement('a');
card.className = 'p-card';
......@@ -210,6 +213,7 @@ Always respond warmly in Vietnamese.</textarea>
</div>
<div class="p-meta">
<div class="p-name" title="${name}">${name}</div>
${colorDisplay}
<div class="p-price-row">
<span class="p-price">${price.toLocaleString('vi-VN')}đ</span>
${oldPrice ? `<span class="p-old">${oldPrice.toLocaleString('vi-VN')}đ</span>` : ''}
......
"""
🧪 Test Suite: Outfit Recommendations & Similar Items Flow
============================================================
Tests 3 chiến thuật chính:
1. Similar Items (Tự động gợi ý khi giới thiệu SP)
2. Outfit Recommendations (Chỉ khi khách hỏi phối đồ)
3. Pivoting (Chuyển gốc liên tục qua nhiều lượt chat)
Usage:
py tests/test_outfit_similar_flow.py
py tests/test_outfit_similar_flow.py --base-url http://localhost:5001
"""
import argparse
import asyncio
import json
import logging
import sys
import time
from dataclasses import dataclass, field
from typing import Optional
import httpx
# ============================================================
# CONFIG
# ============================================================
DEFAULT_BASE_URL = "http://localhost:5001"
CHAT_ENDPOINT = "/api/agent/chat-dev"
TIMEOUT_SECONDS = 120 # LLM can be slow
# Force UTF-8 output on Windows
if sys.platform == "win32":
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)
@dataclass
class TestResult:
name: str
query: str
passed: bool
ai_response: str = ""
product_ids: list = field(default_factory=list)
duration_sec: float = 0.0
error: Optional[str] = None
notes: str = ""
# ============================================================
# TEST SCENARIOS
# ============================================================
# Each scenario is a dict with:
# name: Human-readable test name
# query: The user query to send
# expect: What to check in the response
# - has_products: bool (expect product_ids non-empty)
# - min_products: int (minimum product count)
# - response_contains: list[str] (keywords in ai_response)
# - response_not_contains: list[str] (anti-keywords)
TEST_SCENARIOS = [
# =========================================================
# GROUP 1: Basic Product Search (Baseline — No outfit logic)
# =========================================================
{
"name": "G1.01 — Tìm áo phông nam cơ bản",
"query": "Tìm áo phông nam đi chơi",
"expect": {"has_products": True, "min_products": 1},
},
{
"name": "G1.02 — Tìm quần cho bé gái",
"query": "Quần cho bé gái 6 tuổi",
"expect": {"has_products": True, "min_products": 1},
},
{
"name": "G1.03 — Tìm váy nữ đi tiệc",
"query": "Tìm váy đi tiệc cho nữ",
"expect": {"has_products": True, "min_products": 1},
},
# =========================================================
# GROUP 2: Similar Items (AI nên tự động gợi ý thêm)
# =========================================================
{
"name": "G2.01 — Tìm bằng SKU → expect similar mention",
"query": "Cho xem mẫu 1BP24C002",
"expect": {"has_products": True},
},
{
"name": "G2.02 — Tìm tất trẻ em → expect similar items",
"query": "Tìm tất cho bé",
"expect": {"has_products": True, "min_products": 1},
},
{
"name": "G2.03 — Có mẫu áo tương tự không?",
"query": "Có mẫu áo phông nào tương tự mẫu 3TS26S018-SR079 không?",
"expect": {"has_products": True},
},
# =========================================================
# GROUP 3: Outfit Recommendations (CHỈ khi hỏi phối đồ)
# =========================================================
{
"name": "G3.01 — Hỏi phối quần cho áo",
"query": "Áo phông 3TS26S018-SR079 phối với quần gì cho bé?",
"expect": {"has_products": True},
},
{
"name": "G3.02 — Hỏi phối áo cho quần",
"query": "Quần nỉ 1BP24C002 mặc với áo gì đẹp?",
"expect": {"has_products": True},
},
{
"name": "G3.03 — Hỏi combo/set outfit",
"query": "Gợi ý nguyên bộ cho bé gái đi học",
"expect": {"has_products": True, "min_products": 1},
},
{
"name": "G3.04 — Hỏi phụ kiện phối",
"query": "Phối phụ kiện gì cho mẫu 1BS25S008-SK010?",
"expect": {"has_products": True},
},
# =========================================================
# GROUP 4: Cross-sell / Upsell tự nhiên
# =========================================================
{
"name": "G4.01 — Khách chốt 1 món → AI upsell tự nhiên",
"query": "Ok lấy cái quần 1BP24C002 đi, còn gì hay không?",
"expect": {"has_products": True},
},
{
"name": "G4.02 — Hỏi thêm combo tiết kiệm",
"query": "Mua thêm gì nữa để được giảm giá?",
"expect": {"has_products": False}, # Expect promotions redirect, not products
},
# =========================================================
# GROUP 5: Phối đồ theo dịp (AI tự suy luận)
# =========================================================
{
"name": "G5.01 — Đồ đi biển cho nam",
"query": "Đồ đi biển cho nam, thoáng mát",
"expect": {"has_products": True, "min_products": 1},
},
{
"name": "G5.02 — Đồ đi làm cho nữ",
"query": "Gợi ý đồ đi làm công sở cho nữ 28 tuổi",
"expect": {"has_products": True, "min_products": 1},
},
{
"name": "G5.03 — Đồ đi chơi cuối tuần",
"query": "Cuối tuần đi chơi với bạn gái mặc gì cho đẹp?",
"expect": {"has_products": True, "min_products": 1},
},
# =========================================================
# GROUP 6: Edge Cases & Negative Tests
# =========================================================
{
"name": "G6.01 — Chào hỏi (không gọi tool)",
"query": "Xin chào shop",
"expect": {"has_products": False},
},
{
"name": "G6.02 — Hỏi chính sách (redirect knowledge)",
"query": "Chính sách đổi trả như thế nào?",
"expect": {"has_products": False},
},
{
"name": "G6.03 — Hỏi cửa hàng",
"query": "Có cửa hàng Canifa ở Hải Phòng không?",
"expect": {"has_products": False},
},
{
"name": "G6.04 — SKU không tồn tại",
"query": "Cho xem mẫu ZZZZ99999",
"expect": {"has_products": False},
},
# =========================================================
# GROUP 7: Multi-turn Pivoting Simulation
# =========================================================
{
"name": "G7.01 — Lượt 1: Tìm áo gốc",
"query": "Tìm áo phông cho bé trai",
"expect": {"has_products": True, "min_products": 1},
},
{
"name": "G7.02 — Lượt 2: Hỏi phối quần (pivot)",
"query": "Áo đó phối quần gì cho bé đi học?",
"expect": {"has_products": True},
},
{
"name": "G7.03 — Lượt 3: Hỏi SP tương tự (pivot tiếp)",
"query": "Có quần nào form rộng hơn không?",
"expect": {"has_products": True},
},
# =========================================================
# GROUP 8: Price-sensitive Queries
# =========================================================
{
"name": "G8.01 — Tìm theo giá",
"query": "Áo phông nam dưới 200k",
"expect": {"has_products": True},
},
{
"name": "G8.02 — Khách nói đắt",
"query": "Đắt quá, có mẫu nào rẻ hơn tương tự không?",
"expect": {"has_products": True},
},
# =========================================================
# GROUP 9: Size & Stock Queries
# =========================================================
{
"name": "G9.01 — Hỏi size có sẵn",
"query": "Mẫu 3TS26S018-SR079 có size gì?",
"expect": {"has_products": True},
},
{
"name": "G9.02 — Tư vấn size theo cân nặng",
"query": "60kg cao 1m65 mặc size gì?",
"expect": {"has_products": False},
},
# =========================================================
# GROUP 10: Stress / Combo Queries
# =========================================================
{
"name": "G10.01 — Mua cho cả nhà",
"query": "Tư vấn đồ cho gia đình 4 người: bố mẹ và 2 con nhỏ, budget 2 triệu",
"expect": {"has_products": True, "min_products": 1},
},
{
"name": "G10.02 — Hỏi nhiều mã cùng lúc",
"query": "So sánh 3TS26S018-SR079 và 1BP24C002 cái nào đáng mua hơn?",
"expect": {"has_products": True, "min_products": 1},
},
]
# ============================================================
# TEST RUNNER
# ============================================================
async def run_single_test(
client: httpx.AsyncClient,
scenario: dict,
base_url: str,
) -> TestResult:
name = scenario["name"]
query = scenario["query"]
expect = scenario.get("expect", {})
result = TestResult(name=name, query=query, passed=False)
try:
start = time.time()
resp = await client.post(
f"{base_url}{CHAT_ENDPOINT}",
json={"user_query": query},
timeout=TIMEOUT_SECONDS,
)
result.duration_sec = round(time.time() - start, 2)
if resp.status_code != 200:
result.error = f"HTTP {resp.status_code}: {resp.text[:200]}"
return result
data = resp.json()
result.ai_response = data.get("ai_response", "")[:500]
result.product_ids = data.get("product_ids", [])
# Evaluate expectations
passed = True
notes_parts = []
if "has_products" in expect:
has = len(result.product_ids) > 0
if has != expect["has_products"]:
passed = False
notes_parts.append(
f"products: expected {'yes' if expect['has_products'] else 'no'}, got {'yes' if has else 'no'} ({len(result.product_ids)})"
)
else:
notes_parts.append(f"products: {len(result.product_ids)} ✓")
if "min_products" in expect:
if len(result.product_ids) < expect["min_products"]:
passed = False
notes_parts.append(
f"min_products: expected >={expect['min_products']}, got {len(result.product_ids)}"
)
if "response_contains" in expect:
for keyword in expect["response_contains"]:
if keyword.lower() not in result.ai_response.lower():
passed = False
notes_parts.append(f"missing keyword: '{keyword}'")
if "response_not_contains" in expect:
for keyword in expect["response_not_contains"]:
if keyword.lower() in result.ai_response.lower():
passed = False
notes_parts.append(f"unwanted keyword found: '{keyword}'")
result.passed = passed
result.notes = " | ".join(notes_parts)
except httpx.TimeoutException:
result.error = f"Timeout after {TIMEOUT_SECONDS}s"
except httpx.ConnectError:
result.error = "Connection refused — is the server running?"
except Exception as e:
result.error = str(e)[:200]
return result
async def run_all_tests(base_url: str, sequential: bool = True):
logger.info(f"🚀 Starting test suite against {base_url}")
logger.info(f"📋 Total scenarios: {len(TEST_SCENARIOS)}")
logger.info("=" * 70)
results: list[TestResult] = []
async with httpx.AsyncClient() as client:
# Quick health check
try:
health = await client.get(f"{base_url}/health", timeout=5)
if health.status_code != 200:
logger.error(f"❌ Health check failed: {health.status_code}")
return
logger.info("✅ Health check passed\n")
except Exception as e:
logger.error(f"❌ Cannot connect to {base_url}: {e}")
logger.error(" Make sure the server is running!")
return
for i, scenario in enumerate(TEST_SCENARIOS, 1):
logger.info(f"[{i:02d}/{len(TEST_SCENARIOS)}] {scenario['name']}")
logger.info(f" Query: \"{scenario['query']}\"")
result = await run_single_test(client, scenario, base_url)
icon = "✅" if result.passed else ("⚠️" if result.error else "❌")
logger.info(f" {icon} {'PASS' if result.passed else 'FAIL'} ({result.duration_sec}s)")
if result.error:
logger.info(f" Error: {result.error}")
if result.notes:
logger.info(f" Notes: {result.notes}")
if result.product_ids:
logger.info(f" Products: {result.product_ids[:5]}{'...' if len(result.product_ids) > 5 else ''}")
logger.info(f" Response: {result.ai_response[:150]}...")
logger.info("")
results.append(result)
# Small delay between tests to avoid hammering
if sequential and i < len(TEST_SCENARIOS):
await asyncio.sleep(1)
# Summary
logger.info("=" * 70)
logger.info("📊 TEST SUMMARY")
logger.info("=" * 70)
passed = sum(1 for r in results if r.passed)
failed = sum(1 for r in results if not r.passed and not r.error)
errored = sum(1 for r in results if r.error)
total_time = sum(r.duration_sec for r in results)
logger.info(f" ✅ Passed: {passed}/{len(results)}")
logger.info(f" ❌ Failed: {failed}/{len(results)}")
logger.info(f" ⚠️ Errors: {errored}/{len(results)}")
logger.info(f" ⏱️ Total: {total_time:.1f}s")
logger.info(f" ⏱️ Avg: {total_time/len(results):.1f}s per test")
if failed + errored > 0:
logger.info("\n❌ FAILURES:")
for r in results:
if not r.passed:
logger.info(f" • {r.name}: {r.error or r.notes}")
# Save results to JSON
output_path = "tests/test_results_outfit_flow.json"
output_data = {
"summary": {
"total": len(results),
"passed": passed,
"failed": failed,
"errored": errored,
"total_time_sec": round(total_time, 1),
},
"results": [
{
"name": r.name,
"query": r.query,
"passed": r.passed,
"duration_sec": r.duration_sec,
"product_count": len(r.product_ids),
"product_ids": r.product_ids[:10],
"error": r.error,
"notes": r.notes,
"ai_response_preview": r.ai_response[:300],
}
for r in results
],
}
try:
with open(output_path, "w", encoding="utf-8") as f:
json.dump(output_data, f, ensure_ascii=False, indent=2)
logger.info(f"\n📄 Results saved to {output_path}")
except Exception as e:
logger.warning(f"Could not save results: {e}")
def main():
parser = argparse.ArgumentParser(description="Test Outfit & Similar Items Flow")
parser.add_argument(
"--base-url",
default=DEFAULT_BASE_URL,
help=f"Base URL of the chatbot API (default: {DEFAULT_BASE_URL})",
)
parser.add_argument(
"--parallel",
action="store_true",
help="Run tests in parallel (faster but may cause issues)",
)
args = parser.parse_args()
asyncio.run(run_all_tests(args.base_url, sequential=not args.parallel))
if __name__ == "__main__":
main()
This source diff could not be displayed because it is too large. You can view the blob instead.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment