Commit 7659d548 authored by Vũ Hoàng Anh's avatar Vũ Hoàng Anh

feat(lead-stage): hard pattern detection for price comparison queries

- Added hard_patterns.py: detect patterns (cheaper_than, more_expensive_than, similar_to) and extract reference product
- product_search_engine.py: integrate pattern detection, auto-set price_max/min based on reference price
- lead_search_tool.py: added user_insight parameter
- graph.py: pass user_insight to tool, removed product_ids filter (always return full tool result)
- prompts.py: added family/group intent examples, emphasize product_ids must include all items
- stylist_engine.py: added occasion and rule_id to matches, save to DB properly
- Added scripts: regenerate_matches.py and test_insert_one.py

Impact:
- Query 'rẻ hơn' auto sets price_max = ref_price * 0.95
- Query 'đắt hơn' auto sets price_min = ref_price * 1.05
- UI receives complete product list from tool (not truncated by AI)
parent ab00e9ad
......@@ -8,9 +8,9 @@ Flow: User → Classifier (route tool) → [Tool Exec] → Stylist (response + i
- Stylist (NẶNG): with_structured_output(StylistOutput) → sinh response + InsightJSON
"""
import json
import logging
import re
import time
from typing import Annotated, Any, TypedDict
......@@ -40,6 +40,7 @@ logger = logging.getLogger(__name__)
# Pydantic Models
# ═══════════════════════════════════════════════
class LiteralSearchArgs(BaseModel):
model_config = {"extra": "ignore"}
raw_text: str = Field(
......@@ -68,7 +69,10 @@ class InferredSearchArgs(BaseModel):
description="Suy luan tu ngu canh: women/men/boy/girl/unisex. Null neu khong co manh moi.",
)
age_by_product: str | None = Field(default=None, description="adult | kid | others")
master_color: str | None = Field(default=None, description="Màu sắc AI SUY LUẬN từ ngữ cảnh. VD: '30/4' -> color='đỏ'. 'áo trắng' -> color='trắng'.")
master_color: str | None = Field(
default=None,
description="Màu sắc AI SUY LUẬN từ ngữ cảnh. VD: '30/4' -> color='đỏ'. 'áo trắng' -> color='trắng'.",
)
tags: list[str] = Field(
default=[],
description=(
......@@ -80,7 +84,7 @@ class InferredSearchArgs(BaseModel):
"Trục 5 (Dáng): Oversize, Slim, Regular, Wide leg, Cropped, Relaxed. "
"VD: 'đi tiệc' -> tags=['Đi tiệc']. 'mùa đông' -> tags=['Mùa đông', 'Giữ ấm']. "
"KHÔNG tự nghĩ tag mới! Tối đa 3."
)
),
)
keywords: list[str] = Field(
default=[],
......@@ -88,7 +92,7 @@ class InferredSearchArgs(BaseModel):
"Từ khóa BỔ TRỢ AI suy luận thêm (chất liệu, tính năng...). "
"VD: '30/4' -> keywords=['cờ đỏ sao vàng']. 'đi biển' -> keywords=['thoáng mát']. "
"Tối đa 2."
)
),
)
price_min: int | None = Field(default=None)
price_max: int | None = Field(default=None)
......@@ -100,6 +104,7 @@ class InferredSearchArgs(BaseModel):
class ClassifierLeadSearchArgs(BaseModel):
"""Dual-Lane Search Args — buộc AI tách bạch rõ Literal vs Inferred."""
model_config = {"extra": "ignore"}
literal: LiteralSearchArgs = Field(
......@@ -116,34 +121,46 @@ class ClassifierLeadSearchArgs(BaseModel):
class ClassifierOutput(BaseModel):
"""Classifier output — Định tuyến hoặc Trả lời Sớm."""
model_config = {"extra": "ignore"}
reasoning: str = Field(description="Ly luan goi tool hay tra loi truc tiep")
# ── TH 1: GOI TOOL ──
tool_name: str | None = Field(default=None, description="Ten tool (lead_search_tool). Bat buoc de null neu khach chi dang tam su/chao hoi/ko can DB.")
tool_name: str | None = Field(
default=None,
description="Ten tool (lead_search_tool). Bat buoc de null neu khach chi dang tam su/chao hoi/ko can DB.",
)
lead_search_args: ClassifierLeadSearchArgs | None = Field(
default=None,
description=(
"Args cho lead_search_tool. PHAI dien khi tool_name='lead_search_tool'. "
"Null neu tool_name khac hoac null."
"Args cho lead_search_tool. PHAI dien khi tool_name='lead_search_tool'. Null neu tool_name khac hoac null."
),
)
tool_args: dict | None = Field(default=None, description="Args cho cac tool khac (knowledge_search, check_is_stock, v.v.). Null neu dung lead_search_tool.")
tool_args: dict | None = Field(
default=None,
description="Args cho cac tool khac (knowledge_search, check_is_stock, v.v.). Null neu dung lead_search_tool.",
)
# ── TH 2: EARLY EXIT ──
ai_response: str | None = Field(default=None, description="Cau tra loi cho khach. CHI nha ra khi tool_name=null. Neu goi tool, bat buoc de null.")
ai_response: str | None = Field(
default=None,
description="Cau tra loi cho khach. CHI nha ra khi tool_name=null. Neu goi tool, bat buoc de null.",
)
product_ids: list[str] | None = Field(default_factory=list, description="Ma SKU (neu khach hoi trung ma).")
class InsightJSON(BaseModel):
"""Bộ nhớ khách hàng — 12 trường."""
model_config = {"extra": "ignore"}
USER: str = Field(default="Chưa rõ")
TARGET: str = Field(default="Chưa rõ")
GOAL: str = Field(default="Chưa rõ")
CONSTRAINS: str = Field(default="", description="CHỈ điền khi khách NÓI RÕ. VD: khách nói 'dưới 500k' → 'Budget: <500k'. KHÔNG TỰ BỊA!")
CONSTRAINS: str = Field(
default="", description="CHỈ điền khi khách NÓI RÕ. VD: khách nói 'dưới 500k' → 'Budget: <500k'. KHÔNG TỰ BỊA!"
)
STAGE: str = Field(default="BROWSE")
STAGE_NUM: int = Field(default=1)
TONE: str = Field(default="Friendly")
......@@ -155,10 +172,11 @@ class InsightJSON(BaseModel):
class StylistOutput(BaseModel):
"""Stylist output — NẶNG, sinh cả response + insight."""
model_config = {"extra": "ignore"}
ai_response: str = Field(description="Câu trả lời cho khách (max 200 từ)")
product_ids : list[str] = Field(default_factory=list)
product_ids: list[str] = Field(default_factory=list)
user_insight: InsightJSON = Field(description="Updated user insight sau turn này")
......@@ -166,6 +184,7 @@ class StylistOutput(BaseModel):
# State
# ═══════════════════════════════════════════════
class LeadStageState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
......@@ -190,6 +209,7 @@ class LeadStageState(TypedDict):
# Helpers
# ═══════════════════════════════════════════════
def _extract_text(content) -> str:
"""Extract plain text từ msg.content."""
if isinstance(content, str):
......@@ -205,10 +225,33 @@ def _extract_text(content) -> str:
return str(content or "")
_SKU_REGEX = re.compile(r"\b\d[A-Z0-9]{5,}(?:-[A-Z0-9]{2,}){0,2}\b", re.IGNORECASE)
def _dedupe_preserve_order(items: list[str]) -> list[str]:
seen: set[str] = set()
out: list[str] = []
for raw in items:
val = str(raw or "").upper().strip()
if not val or val in seen:
continue
seen.add(val)
out.append(val)
return out
def _extract_skus_from_text(text: str | None) -> list[str]:
if not text:
return []
matches = _SKU_REGEX.findall(text)
return _dedupe_preserve_order([m for m in matches if m])
# ═══════════════════════════════════════════════
# Graph Builder
# ═══════════════════════════════════════════════
class LeadStageGraph:
"""2-Agent: Classifier (NHẸ, route tool) → Stylist (NẶNG, response + insight)."""
......@@ -228,15 +271,11 @@ class LeadStageGraph:
# AI 1: Classifier — structured output, NHẸ (streaming=False)
_classifier_base = create_llm(model_name=self.model_name, streaming=False)
self.classifier_llm = _classifier_base.with_structured_output(
ClassifierOutput, method="function_calling"
)
self.classifier_llm = _classifier_base.with_structured_output(ClassifierOutput, method="function_calling")
# AI 2: Stylist — structured output, NẶNG (streaming=False vì structured)
_stylist_base = create_llm(model_name=self.model_name, streaming=False)
self.stylist_llm = _stylist_base.with_structured_output(
StylistOutput, method="function_calling"
)
self.stylist_llm = _stylist_base.with_structured_output(StylistOutput, method="function_calling")
self._compiled = None
logger.info(f"✅ LeadStageGraph initialized | model: {self.model_name}")
......@@ -276,7 +315,7 @@ class LeadStageGraph:
if sku and sku not in dedup:
dedup[sku] = p
unique_products = list(dedup.values())
lines = [f"✅ Tìm thấy {len(unique_products)} sản phẩm:"]
for p in unique_products[:8]:
price = p.get("price", 0)
......@@ -289,22 +328,22 @@ class LeadStageGraph:
desc = (p.get("description") or "").strip()
if desc:
line += f"\n 📝 {desc[:200]}"
outfit = p.get("outfit_recommendations")
if outfit and isinstance(outfit, list):
outfit_lines = []
for o in outfit[:3]:
outfit_lines.append(f"{o.get('role', 'match')}: {o.get('match_product_name')} ({o.get('match_product_code')})")
outfit_lines.append(
f"{o.get('role', 'match')}: {o.get('match_product_name')} ({o.get('match_product_code')})"
)
if outfit_lines:
line += f"\n 👗 outfit_recommendations: {', '.join(outfit_lines)}"
lines.append(line)
return "\n".join(lines)
else:
return parsed.get("message", "Không tìm thấy sản phẩm phù hợp.")
else:
content = parsed.get("content") or parsed.get("message") or str(parsed)
return str(content)[:800]
return parsed.get("message", "Không tìm thấy sản phẩm phù hợp.")
content = parsed.get("content") or parsed.get("message") or str(parsed)
return str(content)[:800]
except Exception:
return (tool_result or "")[:500]
......@@ -347,15 +386,15 @@ class LeadStageGraph:
start = time.time()
try:
output: ClassifierOutput = await self.classifier_llm.ainvoke(
classifier_messages, config=config
)
output: ClassifierOutput = await self.classifier_llm.ainvoke(classifier_messages, config=config)
except Exception as e:
logger.error(f"❌ Classifier error: {e}")
return {
"tool_result": None,
"tool_name_used": None,
"diagnostics": [{"step": "classifier_error", "label": "❌ Classifier", "content": str(e)[:300], "elapsed_ms": 0}],
"diagnostics": [
{"step": "classifier_error", "label": "❌ Classifier", "content": str(e)[:300], "elapsed_ms": 0}
],
}
elapsed_ms = (time.time() - start) * 1000
......@@ -373,6 +412,10 @@ class LeadStageGraph:
else:
tool_args = {}
# ── PASS USER_INSIGHT TO LEAD_SEARCH_TOOL ──
if tool_name == "lead_search_tool" and user_insight:
tool_args["user_insight"] = user_insight
# ── Log Classifier parsed output ──
logger.info(
"[CLASSIFIER] tool=%s | product_line=%s | keywords=%s | gender=%s | color=%s | reasoning=%s",
......@@ -394,7 +437,7 @@ class LeadStageGraph:
"content": output.reasoning,
"elapsed_ms": round(elapsed_ms),
}
# ── EARLY EXIT (KHÔNG GỌI TOOL) ──
if not tool_name and ai_response:
logger.info(f"⚡ Early Exit: {ai_response[:50]}...")
......@@ -438,13 +481,15 @@ class LeadStageGraph:
except Exception:
summary = (tool_result or "")[:200]
diagnostics.append({
"step": "tool_result",
"label": f"📦 {tool_name}",
"content": summary,
"raw_json": tool_result,
"elapsed_ms": tool_elapsed,
})
diagnostics.append(
{
"step": "tool_result",
"label": f"📦 {tool_name}",
"content": summary,
"raw_json": tool_result,
"elapsed_ms": tool_elapsed,
}
)
except Exception as te:
logger.error(f"❌ Tool '{tool_name}' error: {te}")
tool_result = json.dumps({"status": "error", "message": str(te)})
......@@ -468,7 +513,9 @@ class LeadStageGraph:
# Format insight cũ → inject vào system prompt
try:
old_insight = json.loads(user_insight) if isinstance(user_insight, str) and user_insight.startswith("{") else {}
old_insight = (
json.loads(user_insight) if isinstance(user_insight, str) and user_insight.startswith("{") else {}
)
except Exception:
old_insight = {}
......@@ -504,9 +551,7 @@ class LeadStageGraph:
# Tool results
tool_context = self._format_tool_result(tool_result)
if tool_context:
stylist_messages.append(
HumanMessage(content=f"=== KẾT QUẢ TRA CỨU ===\n{tool_context}")
)
stylist_messages.append(HumanMessage(content=f"=== KẾT QUẢ TRA CỨU ===\n{tool_context}"))
else:
stylist_messages.append(
HumanMessage(content="=== KHÔNG CÓ TOOL RESULT === (Khách chào hỏi hoặc câu hỏi chung)")
......@@ -515,9 +560,7 @@ class LeadStageGraph:
# ── LLM Call: Structured Output ──
start = time.time()
try:
output: StylistOutput = await self.stylist_llm.ainvoke(
stylist_messages, config=config
)
output: StylistOutput = await self.stylist_llm.ainvoke(stylist_messages, config=config)
except Exception as e:
logger.error(f"❌ Stylist structured output error: {e}")
# Fallback: trả response text thuần
......@@ -528,11 +571,17 @@ class LeadStageGraph:
"lead_stage": {"stage": 1, "stage_name": "BROWSE", "tone_directive": "Friendly"},
"stage_injection": "",
"product_ids": [],
"diagnostics": [{"step": "stylist_error", "label": "❌ Stylist Error", "content": str(e)[:300], "elapsed_ms": 0}],
"diagnostics": [
{"step": "stylist_error", "label": "❌ Stylist Error", "content": str(e)[:300], "elapsed_ms": 0}
],
}
elapsed_ms = (time.time() - start) * 1000
# Ensure product_ids covers every SKU mentioned in ai_response (deterministic safeguard)
from_text = _extract_skus_from_text(output.ai_response)
merged_product_ids = _dedupe_preserve_order((output.product_ids or []) + from_text)
# Extract insight
insight_dict = output.user_insight.model_dump()
lead_stage = {
......@@ -545,7 +594,7 @@ class LeadStageGraph:
logger.info(
f"💬 Stylist: stage={insight_dict.get('STAGE')} | goal={insight_dict.get('GOAL')} | "
f"response_len={len(output.ai_response)} | products={output.product_ids} | {elapsed_ms:.0f}ms"
f"response_len={len(output.ai_response)} | products={merged_product_ids} | {elapsed_ms:.0f}ms"
)
diag = {
......@@ -561,7 +610,7 @@ class LeadStageGraph:
"updated_insight": insight_dict,
"lead_stage": lead_stage,
"stage_injection": stage_injection,
"product_ids": output.product_ids or [],
"product_ids": merged_product_ids,
"diagnostics": [diag],
}
......@@ -586,13 +635,9 @@ class LeadStageGraph:
workflow.add_node("stylist", self._stylist_node)
workflow.set_entry_point("classifier")
workflow.add_conditional_edges(
"classifier",
self._after_classifier,
{"stylist": "stylist", END: END}
)
workflow.add_conditional_edges("classifier", self._after_classifier, {"stylist": "stylist", END: END})
workflow.add_edge("stylist", END)
self._compiled = workflow.compile()
......@@ -625,6 +670,7 @@ class LeadStageGraph:
"updated_insight": None,
"lead_stage": None,
"stage_injection": None,
"product_ids": [],
"diagnostics": [],
}
......@@ -655,25 +701,145 @@ class LeadStageGraph:
all_products = parsed.get("products", [])
except Exception:
pass
# Filter products by AI selection
if "product_ids" in result and result["product_ids"] is not None:
ai_chosen_ids = result["product_ids"]
filtered_products = []
for pid in ai_chosen_ids:
for p in all_products:
psku = str(p.get("sku_code") or p.get("sku") or p.get("internal_ref_code") or "").upper()
if psku and psku in str(pid).upper():
filtered_products.append(p)
break
# LUÔN ghi đè. Nếu Stylist chốt mảng rỗng [] thì Frontend cũng nhận list rỗng (chứ k fallback nhét bừa)
all_products = filtered_products
# ════════════════════════════════════════════════════════════════
# LỌC & FLATTEN SẢN PHẨM:
# Lấy đúng các sản phẩm AI đã chọn trả ra trong product_ids,
# bao gồm cả sản phẩm chính VÀ sản phẩm phối (nằm trong outfit_recommendations).
# ════════════════════════════════════════════════════════════════
final_products = []
ai_product_ids = result.get("product_ids", [])
if ai_product_ids and all_products:
# Tạo dictionary lưu trữ mọi sản phẩm (chính + outfit) theo dạng in hoa, không khoảng trắng
product_dict = {}
for p in all_products:
# Tìm đúng key của mã SKU cho SP chính
p_sku = (
str(
p.get("magento_ref_code")
or p.get("sku_code")
or p.get("sku")
or p.get("internal_ref_code")
or ""
)
.upper()
.strip()
)
if p_sku:
# Đảm bảo frontend có trường 'sku' để render
if "sku" not in p:
p["sku"] = p_sku
product_dict[p_sku] = p
# Duyệt luôn outfit_recommendations để gom data
outfit_recs = p.get("outfit_recommendations", [])
if isinstance(outfit_recs, list):
for rec in outfit_recs:
# Món phối thường nằm ở match_product_code hoặc magento_ref_code
rec_sku = (
str(rec.get("match_product_code") or rec.get("magento_ref_code") or rec.get("sku") or "")
.upper()
.strip()
)
if not rec_sku:
continue
# Tạo object tối thiểu để FE render được nếu chưa enrich
if "sku" not in rec:
rec["sku"] = rec_sku
if "name" not in rec:
rec["name"] = rec.get("match_product_name") or rec.get("product_name") or ""
product_dict[rec_sku] = rec
# Enrich: nếu product_ids có món phối chưa có đủ data (price/image), lookup DB theo magento_ref_code
wanted_ids = [str(x).upper().strip() for x in (ai_product_ids or []) if str(x).strip()]
missing_ids = [
pid
for pid in wanted_ids
if pid not in product_dict
or not isinstance(product_dict.get(pid), dict)
or not product_dict.get(pid, {}).get("name")
or not product_dict.get(pid, {}).get("image")
]
if missing_ids:
try:
from agent.lead_stage_agent.product_search_engine import SELECT_COLUMNS, TABLE_NAME
from common.starrocks_connection import get_db_connection
db = get_db_connection()
# Query cả magento_ref_code (full) + internal_ref_code (base)
skus = list(dict.fromkeys([pid for pid in missing_ids if pid]))
base_skus = list(dict.fromkeys([pid.split("-")[0] for pid in skus if pid]))
keys = list(dict.fromkeys([*skus, *base_skus]))
placeholders = ",".join(["%s"] * len(keys))
sql = f"""
SELECT {SELECT_COLUMNS}
FROM {TABLE_NAME}
WHERE UPPER(magento_ref_code) IN ({placeholders})
OR UPPER(internal_ref_code) IN ({placeholders})
ORDER BY quantity_sold DESC NULLS LAST
LIMIT 200
"""
rows = await db.execute_query_async(sql, params=tuple(keys + keys))
# Map kết quả
by_magento: dict[str, dict] = {}
by_internal: dict[str, dict] = {}
for r in rows or []:
mag = str(r.get("magento_ref_code") or "").upper().strip()
internal = str(r.get("internal_ref_code") or "").upper().strip()
if mag and mag not in by_magento:
by_magento[mag] = r
if internal and internal not in by_internal:
by_internal[internal] = r
def _row_to_card(row: dict) -> dict:
sale = float(row.get("sale_price") or 0)
orig = float(row.get("original_price") or 0)
has_discount = sale < orig and orig > 0
disc_pct = int(row.get("discount_percent") or 0)
sku = (row.get("magento_ref_code") or row.get("internal_ref_code") or "").strip()
return {
"sku": sku,
"name": row.get("product_name", ""),
"price": int(sale),
"original_price": int(orig),
"discount": f"-{disc_pct}%" if has_discount else None,
"color": row.get("master_color", ""),
"gender": row.get("gender_by_product", ""),
"product_line": row.get("product_line_vn", ""),
"image": row.get("product_image_url_thumbnail", ""),
"url": row.get("product_web_url", ""),
"sizes": row.get("size_scale", ""),
}
for pid in skus:
row = by_magento.get(pid) or by_internal.get(pid.split("-")[0])
if row:
product_dict[pid] = _row_to_card(row)
except Exception as e:
logger.warning(f"⚠️ Product enrich by SKU failed: {str(e)[:200]}")
# Gom đúng thứ tự AI trả ra, tránh duplicate
seen_skus = set()
for raw_pid in ai_product_ids:
pid = str(raw_pid).upper().strip()
if pid in product_dict and pid not in seen_skus:
final_products.append(product_dict[pid])
seen_skus.add(pid)
else:
# Nếu AI không trả product_ids (hoặc không có tool_result), trả về mặc định
final_products = all_products
logger.info(
"🏷️ [LEAD] query='%s' | stage=%s | products=%d | time=%.0fms",
user_message[:50],
result.get("lead_stage", {}).get("stage_name", "?") if result.get("lead_stage") else "?",
len(all_products),
len(final_products),
elapsed_ms,
)
......@@ -683,7 +849,8 @@ class LeadStageGraph:
"lead_stage": result.get("lead_stage"),
"updated_insight": result.get("updated_insight"),
"pipeline": pipeline,
"products": all_products,
"product_ids": result.get("product_ids", []),
"products": final_products,
}
......
"""
Hard Pattern Detection — Bắt các mẫu hỏi CỨNG đòi hỏi tham chiếu.
Các pattern phổ biến:
• "rẻ hơn", "giá rẻ hơn" → so sánh giá
• "đắt hơn", "cao hơn" → so sánh giá
• "tương tự", "giống như" → lấy đặc điểm SP tham chiếu
• "như cái trước", "cái kia" → refer đến SP đã nói
• "còn size...", "size còn không" → check size variant
• "còn màu...", "màu gì còn" → check color variant
• "còn hàng không" → check stock
"""
import re
from typing import TypedDict, Optional, Any
from dataclasses import dataclass
@dataclass
class PatternMatch:
"""Kết quả pattern detection."""
pattern_type: str # "cheaper_than", "similar_to", "check_size", ...
reference_sku: Optional[str] = None
reference_product_line: Optional[str] = None
reference_price: Optional[float] = None
reference_color: Optional[str] = None
reference_size: Optional[str] = None
# Target criteria (nếu có)
target_price_max: Optional[float] = None
target_price_min: Optional[float] = None
target_size: Optional[str] = None
target_color: Optional[str] = None
confidence: float = 0.0 # 0.0 - 1.0
raw_match: str = "" # Văn bản khớp
class HardPatternDetector:
"""Detect và trích xuất reference từ query user."""
# ═══════════════════════════════════════════════════════════════
# PATTERN REGEXES
# ═══════════════════════════════════════════════════════════════
# 1. GIÁ RẺ HƠN
CHEAPER_PATTERNS = [
r"(?:rẻ|giá)\s+(?:hơn|thấp hơn|xuống hơn)",
r"có\s+(?:cái)?\s+(?:nào)?\s+(?:rẻ|giá)\s+(?:hơn|thấp hơn)",
r"tìm\s+(?:cái)?\s+(?:rẻ|giá)\s+(?:hơn|thấp hơn)",
r"(?:rẻ|giá)\s+(?:hơn|thấp hơn)\s+(?:cái|chiếc)?",
r"có.*rẻ.*hơn",
]
# 2. GIÁ ĐẮT HƠN
EXPENSIVE_PATTERNS = [
r"(?:đắt|giá)\s+(?:hơn|cao hơn)",
r"có\s+(?:cái)?\s+(?:nào)?\s+(?:đắt|giá)\s+(?:hơn|cao hơn)",
r"(?:đắt|giá)\s+(?:hơn|cao hơn)\s+(?:cái|chiếc)?",
r"có.*đắt.*hơn",
]
# 3. TƯƠNG TỰ / GIỐNG NHƯ
SIMILAR_PATTERNS = [
r"(?:giống|tương tự|như|cùng)\s+(?:như|với)?\s+(?:cái|chiếc)?\s*(?:đó|trước|ban\s*nãy|cạnh|là)?",
r"còn\s+(?:cái)?\s+(?:nào|còn)?\s+(?:tương tự|giống)\s+(?:như|với)?",
r"(?:có|tìm)\s+(?:cái)?\s+(?:nào|còn)?\s+(?:giống|tương tự)",
r"giống\s+(?:như)?\s+(?:cái)?(?:đó|trước|kia)",
]
# 4. CHECK SIZE (còn size?, size còn không, có size X không)
SIZE_CHECK_PATTERNS = [
r"(?:còn|có)\s+size\s+([A-Z0-9]+)",
r"size\s+([A-Z0-9]+)\s+(?:còn|có)",
r"(?:còn|có)\s+(?:size\s+)?([A-Z0-9]+)\s+(?:không|ko|kh)",
]
# 5. CHECK MÀU (còn màu?, màu gì còn, có màu X không)
COLOR_CHECK_PATTERNS = [
r"(?:còn|có)\s+(?:màu\s+)?(\w+)\s+(?:còn|không|ko)",
r"màu\s+(\w+)\s+(?:còn|có)",
r"(?:màu\s+)?(\w+)\s+(?:còn|có)\s+(?:không|ko)",
]
# 6. CHECK TỒN KHO (còn hàng không, còn ko, có hàng ko)
STOCK_CHECK_PATTERNS = [
r"(?:còn|có)\s+(?:hàng|kho)\s+(?:không|ko|kh)",
r"(?:hàng|kho)\s+(?:còn|có)\s+(?:không|ko|kh)",
]
# 7. REFERENCE SKU (nhắc trực tiếp mã: 8TP25A002, 5TS25S021-SR079)
SKU_PATTERN = r"\b([A-Z0-9]{4,}-[A-Z0-9]{3,}(?:-[A-Z0-9]{3,})?)\b"
# 8. REFERENCE BY POSITION (cái trước, cái kia, cái đầu tiên)
POSITION_REFERENCE = {
"trước": 0, # SP trước đó (index -1)
"đầu": 0, # SP đầu tiên trong list
"đầu tiên": 0,
"kia": -1, # SP cuối/cuối cùng (gần nhất)
"cuối": -1,
"cuối cùng": -1,
"sau": -1,
}
def detect(self, query: str, context_products: list[dict[str, Any]] = None) -> PatternMatch:
"""
Detect pattern trong query user.
Args:
query: Câu hỏi của user (raw text)
context_products: List sản phẩm đã được giới thiệu trong conversation
[{"sku": "...", "product_line_vn": "...", "sale_price": ...}, ...]
Returns:
PatternMatch với đầy đủ thông tin pattern.
"""
query_lower = query.lower().strip()
match = PatternMatch(pattern_type="unknown")
# 1. Detect CHEAPER THAN
if self._match_any_pattern(query_lower, self.CHEAPER_PATTERNS):
match.pattern_type = "cheaper_than"
match.confidence = 0.9
match = self._extract_reference_info(query, context_products, match)
if match.reference_price:
match.target_price_max = match.reference_price * 0.95 # Rẻ hơn 5%
# 2. Detect EXPENSIVE THAN
elif self._match_any_pattern(query_lower, self.EXPENSIVE_PATTERNS):
match.pattern_type = "more_expensive_than"
match.confidence = 0.9
match = self._extract_reference_info(query, context_products, match)
if match.reference_price:
match.target_price_min = match.reference_price * 1.05 # Đắt hơn 5%
# 3. Detect SIMILAR TO
elif self._match_any_pattern(query_lower, self.SIMILAR_PATTERNS):
match.pattern_type = "similar_to"
match.confidence = 0.8
match = self._extract_reference_info(query, context_products, match)
# 4. Detect SIZE CHECK
elif size_match := re.search(r"(?:còn|có)\s+size\s+([A-Z0-9]+)", query_lower):
match.pattern_type = "check_size"
match.confidence = 0.95
match.target_size = size_match.group(1).upper()
match = self._extract_reference_info(query, context_products, match)
# 5. Detect COLOR CHECK
elif color_match := re.search(r"(?:màu\s+)?(\w+)\s+(?:còn|có)", query_lower):
# Filter out common stop words
color = color_match.group(1)
if color not in ["có", "còn", "ko", "không", "màu"]:
match.pattern_type = "check_color"
match.confidence = 0.9
match.target_color = color
match = self._extract_reference_info(query, context_products, match)
# 6. Detect STOCK CHECK
elif self._match_any_pattern(query_lower, self.STOCK_CHECK_PATTERNS):
match.pattern_type = "check_stock"
match.confidence = 0.9
match = self._extract_reference_info(query, context_products, match)
# 7. Detect direct SKU reference
elif sku_match := re.search(self.SKU_PATTERN, query):
match.pattern_type = "direct_sku"
match.confidence = 1.0
match.reference_sku = sku_match.group(1)
return match
def _match_any_pattern(self, text: str, patterns: list[str]) -> bool:
"""Check nếu text match bất kỳ pattern nào."""
return any(re.search(p, text) for p in patterns)
def _extract_reference_info(self, query: str, context_products: list[dict[str, Any]],
current_match: PatternMatch) -> PatternMatch:
"""
Trích xuất thông tin reference SP từ context.
Logic:
1. Nếu query có SKU cụ thể → tìm trực tiếp
2. Nếu query có tên product_line ("áo polo kia") → filter context
3. Nếu query có từ vị trí ("cái trước", "cái kia") → lấy theo index
4. Default: lấy SP gần nhất (cuối context)
"""
if not context_products:
return current_match
query_lower = query.lower()
# --- Case 1: Direct SKU trong query ---
sku_match = re.search(self.SKU_PATTERN, query)
if sku_match:
sku = sku_match.group(1)
for p in context_products:
if p.get("sku") == sku or p.get("magento_ref_code") == sku:
self._populate_match_from_product(p, current_match)
return current_match
# --- Case 2: Reference by position ---
for pos_word, idx_offset in self.POSITION_REFERENCE.items():
if pos_word in query_lower:
idx = idx_offset if idx_offset >= 0 else len(context_products) + idx_offset
if 0 <= idx < len(context_products):
self._populate_match_from_product(context_products[idx], current_match)
return current_match
# --- Case 3: Reference by product_line ---
# Từ khóa product_line phổ biến
product_keywords = [
"áo phông", "áo polo", "áo sơ mi", "áo khoác", "áo len",
"quần jean", "quần khaki", "quần âu", "quần soóc", "váy", "chân váy",
"bộ", "đồ lót", "quần lót", "áo dài", "blazer", "cardigan"
]
for kw in product_keywords:
if kw in query_lower:
# Tìm SP trong context có product_line chứa kw
for p in reversed(context_products): # Ưu tiên SP gần nhất
if kw in p.get("product_line_vn", "").lower():
self._populate_match_from_product(p, current_match)
return current_match
# --- Case 4: Default → SP gần nhất (last in context) ---
if context_products:
self._populate_match_from_product(context_products[-1], current_match)
return current_match
def _populate_match_from_product(self, product: dict[str, Any], match: PatternMatch) -> None:
"""Fill match info từ product dict."""
match.reference_sku = product.get("sku") or product.get("magento_ref_code") or product.get("internal_ref_code")
match.reference_product_line = product.get("product_line_vn") or product.get("product_line")
match.reference_price = float(product.get("sale_price") or product.get("price") or 0)
match.reference_color = product.get("master_color") or product.get("color")
match.reference_size = None # Size thường không có trong product summary
......@@ -21,6 +21,7 @@ async def lead_search_tool(
inferred: InferredSearch,
magento_ref_code: str | None = None,
reasoning: str | None = None,
user_insight: dict | None = None, # THÊM: từ graph state
) -> str:
"""
Tim kiem san pham CANIFA theo Dual-Lane Architecture.
......@@ -34,7 +35,7 @@ async def lead_search_tool(
try:
# Gọi engine để xử lý logic dual-query / cascade
result_dict = await engine.search(req, reasoning=reasoning)
result_dict = await engine.search(req, reasoning=reasoning, user_insight=user_insight)
return json.dumps(result_dict, ensure_ascii=False, default=str)
except Exception as e:
logger.error("Lead search tool error: %s", e, exc_info=True)
......
......@@ -18,6 +18,7 @@ Architecture:
import json
import logging
import os as _os
import re
import sqlite3
import time
......@@ -27,9 +28,13 @@ from pydantic import BaseModel, Field
from common.starrocks_connection import get_db_connection
from .product_mapping import get_related_lines, resolve_product_line
from .hard_patterns import HardPatternDetector
logger = logging.getLogger(__name__)
# Global hard pattern detector
_hard_pattern_detector = HardPatternDetector()
try:
from langfuse import get_client as _get_langfuse
_LANGFUSE_AVAILABLE = True
......@@ -1334,9 +1339,13 @@ class ProductSearchEngine:
return [], 3, None
async def search(self, req: LeadSearchInput, reasoning: str | None = None) -> dict:
async def search(self, req: LeadSearchInput, reasoning: str | None = None,
user_insight: dict = None) -> dict:
"""
Entry point: Dual-Lane search + enrichment pipeline.
Args:
user_insight: User insight dict (chứa LATEST_PRODUCT_INTEREST, TARGET, ...)
"""
start = time.time()
db = get_db_connection()
......@@ -1344,6 +1353,40 @@ class ProductSearchEngine:
try:
fallback_msg = None
# ★★★ PRE-SEARCH: HARD PATTERN DETECTION & ADAPTATION ★★★
# Detect nếu query có pattern "rẻ hơn", "tương tự", ...
context_products = []
if user_insight:
# Lấy LATEST_PRODUCT_INTEREST và lookup thành product dict
latest = user_insight.get("LATEST_PRODUCT_INTEREST")
if latest:
# Parse SKU từ format "Áo Polo Nam Basic (8TP25A002)" hoặc "8TP25A002"
sku = self._parse_sku_from_insight(latest)
if sku:
# Lookup product từ DB để có giá
product = await self._lookup_product_by_sku(sku, db)
if product:
context_products = [product]
pattern = _hard_pattern_detector.detect(req.literal.raw_text, context_products)
if pattern.pattern_type == "cheaper_than" and pattern.target_price_max:
# Override price_max từ pattern
orig_price_max = req.inferred.price_max
req.inferred.price_max = int(pattern.target_price_max)
logger.info(
f"[HARD PATTERN] 'Rẻ hơn' detected: ref_price={pattern.reference_price:,.0f}đ "
f"→ set price_max={req.inferred.price_max:,.0f}đ (was {orig_price_max})"
)
elif pattern.pattern_type == "more_expensive_than" and pattern.target_price_min:
orig_price_min = req.inferred.price_min
req.inferred.price_min = int(pattern.target_price_min)
logger.info(
f"[HARD PATTERN] 'Đắt hơn' detected: ref_price={pattern.reference_price:,.0f}đ "
f"→ set price_min={req.inferred.price_min:,.0f}đ (was {orig_price_min})"
)
# Các pattern khác có thể xử lý sau...
# ★ UNDERWEAR HARD OVERRIDE — drop gender/age trước khi search ★
underwear_msg = _apply_underwear_override(req)
......@@ -1435,3 +1478,26 @@ class ProductSearchEngine:
logger.error("Lead search tool error: %s", e, exc_info=True)
return {"status": "error", "message": str(e)}
# ═══════════════════════════════════════════════════════════════
# HELPER: Hard Pattern Support
# ═══════════════════════════════════════════════════════════════
def _parse_sku_from_insight(self, insight_value: str) -> str | None:
"""Parse SKU từ LATEST_PRODUCT_INTEREST."""
if not insight_value:
return None
# Format: "Áo Polo Nam Basic (8TP25A002)" hoặc "8TP25A002"
match = re.search(r"\(?([A-Z0-9]{4,}-[A-Z0-9]{3,})\)?", str(insight_value))
return match.group(1) if match else None
async def _lookup_product_by_sku(self, sku: str, db) -> dict | None:
"""Lookup product từ DB bằng SKU."""
sql = f"""
SELECT {SELECT_COLUMNS}
FROM {TABLE_NAME}
WHERE magento_ref_code = %s OR internal_ref_code = %s
LIMIT 1
"""
result = await db.execute_query_async(sql, params=(sku, sku))
return result[0] if result else None
......@@ -277,9 +277,10 @@ Tư vấn thời trang chuyên nghiệp, thân thiện, bám sát sản phẩm t
- **SINGLE-ITEM INTENT:** Khách HỎI ĐÍCH DANH MỘT TÊN SẢN PHẨM CỤ THỂ (VD: "mua áo gì", "nên chọn váy nào", "tìm quần lót dài") → CHỈ GỢI Ý NHỮNG SP CHÍNH, KHÔNG CẦN TỰ ĐỘNG PHỐI ĐỒ.
+ `product_ids`: Chỉ chứa SKU của các SP chính được gợi ý (1-3 sản phẩm).
+ Response: Tập trung vào tính năng, chất liệu, phù hợp dịp. KHÔNG bắt buộc phải bốc `outfit_recommendations`.
- **COMBO/OUTFIT INTENT:** Khách dùng TỪ CHUNG CHUNG ("tìm đồ đi làm", "quần áo mùa hè", "đồ đi biển", "set đồ", "combo", "phối", "mặc gì đi") → BẮT BUỘC CHUYỂN SANG CHẾ ĐỘ TƯ VẤN OUTFIT (TỰ ĐỘNG PHỐI TOP + BOTTOM TRỌN BỘ).
+ ⚠️ Khách hỏi "đồ", "quần áo" là muốn tìm CẢ BỘ (Set). Tuyệt đối KHÔNG ĐƯỢC chỉ trả về toàn Quần hoặc toàn Áo, toàn Váy.
- **COMBO/OUTFIT INTENT:** Khách dùng TỪ CHUNG CHUNG ("tìm đồ đi làm", "quần áo mùa hè", "đồ đi biển", "đi chợ", "đi chơi cho cả gia đình", "set đồ", "combo", "phối", "mặc gì đi") → BẮT BUỘC CHUYỂN SANG CHẾ ĐỘ TƯ VẤN OUTFIT (TỰ ĐỘNG PHỐI TOP + BOTTOM TRỌN BỘ).
+ ⚠️ Khách hỏi "đồ", "quần áo" là muốn tìm CẢ BỘ (Set). Tuyệt đối KHÔNG ĐƯỢC chỉ trả về toàn Quần hoặc toàn Áo, toàn Váy.
+ ⚠️ NẾU TOOL CHỈ TRẢ VỀ TOÀN QUẦN, BẠN PHẢI MỞ `outfit_recommendations` CỦA CÁI QUẦN ĐÓ RA VÀ BỐC LẤY 1 CÁI ÁO ("role": "top") ĐỂ GỢI Ý NGAY LẬP TỨC VỚI QUẦN ĐÓ! KHÔNG ĐƯỢC BẢO "BẠN MUỐN ÁO GÌ ĐỂ MÌNH TÌM".
+ ⚠️ KHI TOOL TRẢ VỀ NHIỀU SẢN PHẨM (2+ items) — ĐẶC BIỆT là "đi chơi/đi chợ cho cả gia đình": BẮT BUỘC đưa TẤT CẢ sản phẩm vào `product_ids` và đề cập đến tất cả trong response. KHÔNG ĐƯỢC bỏ sót bất kỳ món nào.
+ `product_ids`: Chứa SKU của các SP chính + SP phối từ `outfit_recommendations` (Bắt buộc phải có Áo + Quần/Váy phối hợp).
+ Response: Viết theo dạng OUTFIT với phối hợp logic giữa các món (TOP + BOTTOM / OUTERWEAR).
......@@ -367,20 +368,30 @@ Tool trả: Áo khoác blazer nam (8OT25W001) + `outfit_recommendations: [Quần
Bạn cho mình chiều cao cân nặng để mình check size nhé."
- `product_ids: ["8OT25W001", "8BP25S001", "8SM25S002"]` ← Bắt buộc phải nhét SKU của TẤT CẢ các món để hiện lên màn hình.
**VD5 — FAMILY/GROUP INTENT (đi chơi/đi chợ cho cả gia đình):**
Query: "tìm đồ đi chơi cho cả gia đình"
Tool trả: [Áo sơ mi bé trai (2TH25A001), Áo phông nam (8TS26S018-SK010), Váy bé gái (1DS26S003-SM090)]
- ❌ SAI: Chỉ nhắc Áo sơ mi + Váy, bỏ Áo phông nam → `product_ids: ["2TH25A001", "1DS26S003-SM090"]`
- ⚠️ Lý do: Bỏ sản phẩm quan trọng (áo phông) → UI không hiển thị → khách không thấy.
- ✅ ĐÚNG: Phải liệt kê TẤT CẢ sản phẩm tool trả về, phân nhóm rõ ràng:
"Bạn ơi, set đồ cả gia đình thì mình gợi ý theo 3 nhóm này nè:
• Bé trai: Áo sơ mi nam ngắn tay (2TH25A001) — phối Quần khaki bé trai (2BK25S002).
• Người lớn nam: Áo phông nam có hình in (8TS26S018-SK010) — phối Quần jeans nam.
• Bé gái: Váy trẻ em in hình động vật (1DS26S003-SM090).
Bạn cho mình biết bé nhà mình bao nhiêu tuổi và người lớn mặc size gì nhé?"
- `product_ids: ["2TH25A001", "8TS26S018-SK010", "1DS26S003-SM090"]` ← BAO GỒM TẤT CẢ.
### NHIỆM VỤ 2 — Chốt `product_ids` (HIỂN THỊ LÊN GIAO DIỆN):
**Cho SINGLE-ITEM INTENT:**
- `product_ids` chỉ chứa SKU của các SP chính được gợi ý (1-3 sản phẩm).
- BẤT BUỘC KHÔNG nhét các SP phối thêm vào danh sách này.
- VD: Khách hỏi "mua áo gì thoáng mát" → `product_ids: ["5TS25S001", "6TS25S002"]` (chỉ áo).
**💥 LUẬT TỐI CAO BẮT BUỘC (SỐNG CÒN):** BẤT KỲ MÃ SKU NÀO XUẤT HIỆN TRONG TEXT `ai_response` TRONG CẶP NGOẶC TRÒN (VD: `(8TH26A001-SW001)`) ĐỀU PHẢI NẰM TRONG MẢNG `product_ids`!
- TUYỆT ĐỐI KHÔNG BỎ SÓT BẤT CỨ MÃ NÀO! Kể cả khi đó là SP chính hay SP mà bạn gợi ý phối thêm. Nếu bạn nhắc tên nó, bạn PHẢI nhét mã nó vào `product_ids`.
- Nếu AI response nhắc đến 3 mã, `product_ids` PHẢI CÓ 3 phần tử. Nhắc 5 mã, `product_ids` PHẢI CÓ 5 phần tử.
**Cho COMBO/OUTFIT INTENT:**
- `product_ids` chứa SKU của TẤT CẢ sản phẩm được gợi ý (SP chính + SP phối).
- Chỉ nhét sản phẩm mà bạn đã bàn luận trong response.
- VD: Khách hỏi "combo đi biển" → `product_ids: ["6BS25S011", "6TS25S001", "6SS25S002"]` (quần + 2 áo phối).
**Cho KHÔNG PHÂN BIỆT SINGLE-ITEM HAY COMBO:**
- `product_ids` LÀ DANH SÁCH DUY NHẤT CHỨA TẤT CẢ CÁC MÃ SẢN PHẨM ĐƯỢC NHẮC TỚI TRONG CÂU TRẢ LỜI. MỌI MÃ XUẤT HIỆN ĐỀU PHẢI NẰM TRONG NÀY.
- VD: Khách hỏi áo, bạn giới thiệu 2 áo: `(8TS25C002-SE260)`, `(8TS26S018-SM200)`. Xong bạn tiện miệng bảo "có thể phối quần `(8TH26A001-SW001)`" → `product_ids` BẮT BUỘC phải là: `["8TS25C002-SE260", "8TS26S018-SM200", "8TH26A001-SW001"]`.
**LƯU Ý:**
- Nếu bạn gợi ý outfit nhưng nhân vật đó không xuất hiện trong ai_response hoặc không phù hợp → ĐỪNG nhét vào `product_ids`.
- Luôn đảm bảo `product_ids` phản ánh chính xác những gì bạn đã recommend.
- Luôn cẩn thận double-check vòng cuối: Quét qua toàn bộ nội dung `ai_response` bạn chuẩn bị nhả ra. Cứ thấy mã `(6xxxxxx)` hoặc `(8xxxxxx)`... là vứt thẳng vào array `product_ids`!
### NHIỆM VỤ 3 — Sinh `user_insight` (InsightJSON — bộ nhớ khách hàng):
Cập nhật 12 trường dưới đây sau mỗi turn. Cộng dồn thông tin.
......
"""
Regenerate AI outfit matches — clear old + run full batch.
Usage:
py backend/worker/scripts/regenerate_matches.py # run all
py backend/worker/scripts/regenerate_matches.py --limit 10 # dry-run 10 products
py backend/worker/scripts/regenerate_matches.py --execute --limit 50 # commit 50 products
"""
import argparse
import sqlite3
import sys
import time
from pathlib import Path
# Add backend to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from common.constants import SQLITE_DB_PATH
def clear_old_matches(dry_run: bool = True) -> int:
"""Xóa tất cả matches cũ. Returns số bản ghi bị xóa."""
conn = None
try:
conn = sqlite3.connect(SQLITE_DB_PATH)
cur = conn.cursor()
if dry_run:
cur.execute("SELECT COUNT(*) FROM pg__dashboard_canifa__ai_outfit_product_matches")
count = cur.fetchone()[0]
print(f"[DRY-RUN] Sẽ xóa {count} bản ghi cũ trong pg__dashboard_canifa__ai_outfit_product_matches")
return count
else:
cur.execute("DELETE FROM pg__dashboard_canifa__ai_outfit_product_matches")
deleted = cur.rowcount
conn.commit()
print(f"[EXECUTE] Đã xóa {deleted} bản ghi cũ")
return deleted
finally:
if conn:
conn.close()
def run_batch(limit: int | None = None, dry_run: bool = False):
"""Chạy StylistEngine batch."""
from worker.stylist_engine import StylistEngine, BATCH_STATE
print(f"\n{'='*60}")
print(f"🚀 CANIFA OUTFIT REGENERATION")
print(f"{'='*60}")
print(f"Mode: {'DRY-RUN' if dry_run else 'EXECUTE'}")
print(f"Limit: {limit or 'ALL'}")
print(f"{'='*60}\n")
# Clear old matches
print("🗑️ Clearing old matches...")
clear_old_matches(dry_run=dry_run)
# Get list of codes to process
print("\n📦 Loading catalog...")
engine = StylistEngine()
catalog = engine._get_catalog()
all_codes = [p["code"] for p in catalog]
if limit:
codes = all_codes[:limit]
else:
codes = all_codes
print(f" Found {len(codes)} products to process (from {len(all_codes)} total)")
# Run batch directly with codes list
print("\n🔄 Running StylistEngine batch...\n")
start = time.time()
count = engine.run_batch(codes=codes)
elapsed = time.time() - start
print(f"\n{'='*60}")
print(f"✅ Batch completed in {elapsed:.1f}s")
print(f" Processed: {count} products")
print(f" BATCH_STATE: {BATCH_STATE}")
print(f"{'='*60}\n")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--limit", type=int, default=None, help="Giới hạn số SP xử lý")
parser.add_argument("--execute", action="store_true", help="Thực sự xóa và lưu (không dry-run)")
args = parser.parse_args()
run_batch(limit=args.limit, dry_run=not args.execute)
"""
Simple test: insert matches for 1 product using only SQLite catalog.
Avoids StarRocks dependency.
"""
import sys
import io
if sys.platform == "win32":
sys.stdout.reconfigure(encoding='utf-8')
import sqlite3
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from common.constants import SQLITE_DB_PATH, TABLE_OUTFIT_RULES, TABLE_PRODUCTS
def get_catalog_from_sqlite() -> list[dict]:
"""Load catalog trực tiếp từ SQLite."""
conn = sqlite3.connect(SQLITE_DB_PATH)
conn.row_factory = sqlite3.Row
cur = conn.cursor()
query = f"""
SELECT magento_ref_code as code,
product_name as name,
master_color as color,
product_line_vn as product_line,
gender_by_product as gender,
age_by_product as age_group,
product_image_url_thumbnail as image
FROM {TABLE_PRODUCTS}
WHERE magento_ref_code IS NOT NULL
AND product_line_vn IS NOT NULL
LIMIT 100
"""
cur.execute(query)
rows = cur.fetchall()
conn.close()
catalog = []
for r in rows:
catalog.append({
"code": r["code"],
"name": r["name"],
"color": r["color"] or "",
"product_line": r["product_line"] or "",
"gender": r["gender"] or "",
"age_group": r["age_group"] or "",
"image": r["image"] or "",
"style_tags": [],
"occasion_tags": [],
"material_tags": [],
"season_tags": [],
})
return catalog
def fetch_rules_for_anchor(anchor_cat: str, gender: str = "") -> list[dict]:
"""Fetch rules from SQLite."""
from common.constants import normalize_gender, SQLITE_GENDER_MAP, TABLE_OUTFIT_RULES
conn = sqlite3.connect(SQLITE_DB_PATH)
conn.row_factory = sqlite3.Row
cur = conn.cursor()
gender_key = normalize_gender(gender)
target_gender = SQLITE_GENDER_MAP.get(gender_key, "unisex")
cur.execute(
f"""SELECT id as rule_id, occasion_tag as occasion, match_role, target_category, ai_reason
FROM {TABLE_OUTFIT_RULES}
WHERE UPPER(anchor_category) = UPPER(?)
AND (gender_target = ? OR gender_target = 'unisex' OR gender_target = 'all')""",
(anchor_cat, target_gender)
)
rows = cur.fetchall()
conn.close()
rules = []
for r in rows:
rules.append({
"rule_id": r["rule_id"],
"occ": r["occasion"],
"role": r["match_role"],
"target_cat": r["target_category"],
"ai_reason": r["ai_reason"] or "",
})
return rules
def simple_score(anchor: dict, target: dict) -> int:
"""Simplified score for testing."""
score = 50
anchor_color = anchor["color"] or ""
target_color = target["color"] or ""
# Simple color harmony
if anchor_color and target_color and anchor_color.lower() == target_color.lower():
score += 20
elif target_color and "đen" in target_color.lower():
score += 10
if target.get("is_new_product"):
score += 20
return score
def generate_reason(anchor: dict, target: dict, rule: dict) -> str:
return f"Màu {target['color']} của {target['product_line']} phối cùng {anchor['color']}. {rule['ai_reason']}"
def test_insert_one_product():
"""Test insert matches for first adult product with rules."""
catalog = get_catalog_from_sqlite()
if not catalog:
print("❌ No catalog found")
return False
# Find first ADULT anchor with rules (avoid kid anchors)
anchor = None
for p in catalog:
age = (p["age_group"] or "").lower()
gender = p["gender"] or ""
# Skip kids
if any(k in age for k in ["kid", "trẻ em", "be_trai", "be_gai"]):
continue
if any(k in gender for k in ["boy", "girl", "bé trai", "bé gái"]):
continue
rules = fetch_rules_for_anchor(p["product_line"], p["gender"])
if rules:
anchor = p
break
if not anchor:
print("❌ No adult anchor with rules found")
return False
print(f"🎯 Testing with anchor: {anchor['code']} - {anchor['product_line']} ({anchor['gender']})")
rules = fetch_rules_for_anchor(anchor["product_line"], anchor["gender"])
print(f"📜 Found {len(rules)} rules")
for r in rules:
print(f" • Rule {r['rule_id']}: {anchor['product_line']} → {r['target_cat']} [{r['occ']}]")
# Build targets by category
targets_by_cat = {}
for t in catalog:
if t["code"] == anchor["code"]:
continue
cat = t["product_line"]
if cat not in targets_by_cat:
targets_by_cat[cat] = []
targets_by_cat[cat].append(t)
print(f"\n📦 Target categories found: {list(targets_by_cat.keys())[:10]}")
# Compute matches
rows = []
for rule in rules:
target_cat = rule["target_cat"]
targets = targets_by_cat.get(target_cat, [])
print(f" 🔍 Looking for target category '{target_cat}': {len(targets)} items")
for target in targets:
# Age filter
anchor_age = (anchor["age_group"] or "").strip().lower()
target_age = (target["age_group"] or "").strip().lower()
print(f" → Candidate: {target['code']} ({target['product_line']}), age_group='{target_age}'")
if anchor_age and target_age and anchor_age != target_age:
print(f" ✗ Age filter: anchor='{anchor_age}' != target='{target_age}'")
continue
score = simple_score(anchor, target)
print(f" Score: {score}")
if score < 35:
print(f" ✗ Score too low (<35)")
continue
reason = generate_reason(anchor, target, rule)
print(f" ✓ MATCH! score={score}")
rows.append((
rule["rule_id"],
anchor["code"],
target["code"],
target["name"],
target["gender"],
target["age_group"],
rule["role"],
score,
reason,
rule["occ"],
))
for target in targets:
# Age filter
anchor_age = (anchor["age_group"] or "").strip().lower()
target_age = (target["age_group"] or "").strip().lower()
if anchor_age and target_age and anchor_age != target_age:
continue
score = simple_score(anchor, target)
reason = generate_reason(anchor, target, rule)
rows.append((
rule["rule_id"],
anchor["code"],
target["code"],
target["name"],
target["gender"],
target["age_group"],
rule["role"],
score,
reason,
rule["occ"],
))
# Insert to DB
if rows:
conn = sqlite3.connect(SQLITE_DB_PATH)
cur = conn.cursor()
cur.executemany('''
INSERT INTO pg__dashboard_canifa__ai_outfit_product_matches
(rule_id, anchor_product_code, match_product_code, match_product_name, match_gender, match_age, match_role, score, ai_reason, occasion)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', rows)
conn.commit()
conn.close()
print(f"✅ Inserted {len(rows)} matches for {anchor['code']}")
return True
else:
print("⚠️ No matches generated")
return False
if __name__ == "__main__":
test_insert_one_product()
......@@ -17,15 +17,11 @@ import sqlite3
import time
from typing import Optional
from common.constants import KID_SAFE_KEYWORDS, SQLITE_DB_PATH, SQLITE_GENDER_MAP, TABLE_OUTFIT_RULES, normalize_gender
from common.pool_wrapper import get_pooled_connection_compat
from common.starrocks_connection import get_db_connection
from common.constants import KID_SAFE_KEYWORDS, SQLITE_DB_PATH, SQLITE_GENDER_MAP, TABLE_OUTFIT_RULES, TABLE_PRODUCTS, normalize_gender
logger = logging.getLogger(__name__)
RULES_PATH = os.path.join(os.path.dirname(__file__), "fashion_rules.json")
TABLE = "dashboard_canifa.ultra_descriptions"
SR_TABLE = "shared_source.magento_product_dimension_with_text_embedding"
# ═══ Batch state (used by API for polling) ═══
......@@ -60,7 +56,7 @@ class StylistEngine:
logger.warning("[Stylist] Product not found in catalog: %s", code)
return {}
matches = self._compute_matches(source, catalog)
matches = self.compute_dynamic_rule_matches(code)
self._save_matches(code, matches)
logger.info("[Stylist] ✅ %s → %d matches across occasions", code, sum(
len(items) for occ in matches.values() for items in occ.values()
......@@ -87,11 +83,7 @@ class StylistEngine:
for code in targets:
BATCH_STATE["current_code"] = code
try:
source = next((p for p in catalog if p["code"] == code), None)
if not source:
BATCH_STATE["errors"] += 1
continue
matches = self._compute_matches(source, catalog)
matches = self.compute_dynamic_rule_matches(code)
self._save_matches(code, matches)
count += 1
BATCH_STATE["done"] += 1
......@@ -115,6 +107,7 @@ class StylistEngine:
def _compute_matches(self, source: dict, catalog: list[dict]) -> dict:
"""Compute matches for one source product across all occasions + roles based on DB rules.
Falls back to fallback_occasion_rules in JSON when DB table is empty.
NOTE: This method does NOT include rule_id (use compute_dynamic_rule_matches for DB saves).
"""
rules = self.rules
roles = rules["roles"]
......@@ -169,6 +162,8 @@ class StylistEngine:
"image": target.get("image", ""),
"score": score,
"reason": reason,
"occasion": occ,
"rule_id": None,
})
# Pick top N per occasion+role, sort by score
......@@ -260,6 +255,9 @@ class StylistEngine:
"score": score,
"reason": combined_reason,
"breakdown": breakdown,
"occasion": occ,
"rule_id": r["rule_id"],
"role": tgt_role,
})
break # Found matching rule for this occasion, move to next occasion
......@@ -278,7 +276,7 @@ class StylistEngine:
return result
def _fetch_db_rules(self, anchor_cat: str, gender: str = "") -> list[dict]:
"""Fetch full rules including occasion, match_role, target_category, ai_reason from SQLite."""
"""Fetch full rules including occasion, match_role, target_category, ai_reason, id from SQLite."""
conn = None
try:
conn = sqlite3.connect(SQLITE_DB_PATH)
......@@ -288,7 +286,7 @@ class StylistEngine:
target_gender = SQLITE_GENDER_MAP.get(gender_key, "unisex")
cur.execute(
f"""SELECT occasion_tag as occasion, match_role, target_category, ai_reason
f"""SELECT id, occasion_tag as occasion, match_role, target_category, ai_reason
FROM {TABLE_OUTFIT_RULES}
WHERE UPPER(anchor_category) = UPPER(?)
AND (gender_target = ? OR gender_target = 'unisex' OR gender_target = 'all')""",
......@@ -298,10 +296,11 @@ class StylistEngine:
rules = []
for row in cur.fetchall():
rules.append({
"occ": row[0],
"role": row[1],
"target_cat": row[2],
"ai_reason": row[3] or "",
"rule_id": row[0],
"occ": row[1],
"role": row[2],
"target_cat": row[3],
"ai_reason": row[4] or "",
})
cur.close()
return rules
......@@ -773,67 +772,48 @@ class StylistEngine:
# ──────────────────────────────────────────
def _get_catalog(self, force_reload: bool = False) -> list[dict]:
"""Load product catalog from StarRocks + description_data from Postgres."""
"""Load product catalog from SQLite only (offline mode)."""
if self._catalog and not force_reload:
return self._catalog
logger.info("[Stylist] Loading catalog...")
sr_db = get_db_connection()
# Fetch product basics from StarRocks
rows = sr_db.execute_query(f"""
SELECT
magento_ref_code AS code,
ANY_VALUE(product_name) AS name,
ANY_VALUE(master_color) AS color,
ANY_VALUE(product_line_vn) AS product_line,
ANY_VALUE(gender_by_product) AS gender,
ANY_VALUE(age_by_product) AS age_group,
ANY_VALUE(product_image_url_thumbnail) AS image
FROM {SR_TABLE}
logger.info("[Stylist] Loading catalog from SQLite...")
conn = sqlite3.connect(SQLITE_DB_PATH)
conn.row_factory = sqlite3.Row
cur = conn.cursor()
# Fetch product basics from SQLite
query = f"""
SELECT magento_ref_code as code,
product_name as name,
master_color as color,
product_line_vn as product_line,
gender_by_product as gender,
age_by_product as age_group,
product_image_url_thumbnail as image
FROM {TABLE_PRODUCTS}
WHERE magento_ref_code IS NOT NULL
AND product_line_vn IS NOT NULL
GROUP BY magento_ref_code
""")
# Fetch style/occasion tags from Postgres ultra_descriptions
pg_conn = get_pooled_connection_compat()
pg_cur = pg_conn.cursor()
pg_cur.execute(f"""
SELECT magento_ref_code, description_data
FROM {TABLE}
WHERE magento_ref_code IS NOT NULL
""")
desc_map: dict[str, dict] = {}
for r in pg_cur.fetchall():
code, dd = r
if isinstance(dd, str):
try:
dd = json.loads(dd)
except Exception:
dd = {}
if isinstance(dd, dict):
desc_map[code] = dd
pg_cur.close()
pg_conn.close()
# Build catalog with enriched tags
"""
cur.execute(query)
rows = cur.fetchall()
cur.close()
conn.close()
# Build catalog (no style/occasion tags from ultra_descriptions to avoid Postgres)
catalog = []
for row in rows:
code = row["code"]
dd = desc_map.get(code, {})
catalog.append({
"code": code,
"code": row["code"],
"name": row["name"] or "",
"color": row["color"] or "",
"product_line": row["product_line"] or "",
"gender": row["gender"] or "",
"age_group": row["age_group"] or "",
"image": row["image"] or "",
"style_tags": self._extract_tags(dd.get("phong_cach", ""), dd.get("tags", "")),
"occasion_tags": self._extract_occasion(dd.get("dip_mac", ""), dd.get("mua", "")),
"material_tags": self._extract_tags(dd.get("vat_lieu", "")),
"season_tags": self._extract_tags(dd.get("mua", "")),
"style_tags": [],
"occasion_tags": [],
"material_tags": [],
"season_tags": [],
})
self._catalog = catalog
......@@ -859,12 +839,63 @@ class StylistEngine:
# ──────────────────────────────────────────
def _save_matches(self, code: str, matches: dict) -> bool:
"""DEPRECATED: cột ai_matches trong ultra_descriptions đã bỏ.
Data phối đồ giờ lưu ở bảng ai_outfit_product_matches
(xem outfit_matcher_worker.py).
"""
logger.debug("[Stylist] _save_matches SKIPPED (deprecated) for %s", code)
return True
"""Save matches to ai_outfit_product_matches table."""
conn = None
try:
conn = sqlite3.connect(SQLITE_DB_PATH)
cur = conn.cursor()
# Get full catalog to look up target details (gender, age)
catalog = self._get_catalog()
source = next((p for p in catalog if p["code"] == code), None)
if not source:
logger.error("[Stylist] Anchor product not found: %s", code)
return False
# Build flat list of rows to insert
rows = []
for occ, occ_data in matches.items():
for role, items in occ_data.items():
for item in items:
# Skip items without rule_id (fallback path)
if item.get("rule_id") is None:
continue
# Find target product in catalog to get gender and age
target = next((p for p in catalog if p["code"] == item["code"]), None)
if not target:
continue
rows.append((
item["rule_id"],
code, # anchor_product_code
item["code"], # match_product_code
item["name"], # match_product_name
target.get("gender", ""), # match_gender
target.get("age_group", ""), # match_age
item["role"], # match_role
item["score"],
item["reason"],
item["occasion"], # occasion (NOT occasion_tag)
))
if rows:
cur.executemany('''
INSERT INTO pg__dashboard_canifa__ai_outfit_product_matches
(rule_id, anchor_product_code, match_product_code, match_product_name, match_gender, match_age, match_role, score, ai_reason, occasion)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', rows)
conn.commit()
logger.info("[Stylist] Saved %d matches for %s", len(rows), code)
else:
logger.debug("[Stylist] No matches to save for %s", code)
return True
except Exception as e:
logger.error("[Stylist] Save matches error: %s", e)
if conn:
conn.rollback()
return False
finally:
if conn:
conn.close()
# ── Public aliases for score-test API ───────
def _load_catalog(self) -> list[dict]:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment