Commit 94484304 authored by Vũ Hoàng Anh's avatar Vũ Hoàng Anh

update

parent f9b10ebe
""" """
Fashion Q&A Agent Package Fashion Q&A Agent Package
""" """
from .graph import build_graph from .models import AgentConfig, AgentState, get_config
from .models import AgentConfig, AgentState, get_config
__all__ = [ def build_graph(*args, **kwargs):
"AgentConfig", from .graph import build_graph as _build_graph
"AgentState",
return _build_graph(*args, **kwargs)
__all__ = [
"AgentConfig",
"AgentState",
"build_graph", "build_graph",
"get_config", "get_config",
] ]
This diff is collapsed.
...@@ -7,8 +7,14 @@ import json ...@@ -7,8 +7,14 @@ import json
import logging import logging
import re import re
import time import time
from fastapi import BackgroundTasks
from langfuse import Langfuse, get_client as get_langfuse
from common.cache import redis_cache from common.cache import redis_cache
from common.conversation_manager import get_conversation_manager
from common.langfuse_client import async_flush_langfuse, get_callback_handler
from agent.helper import handle_post_chat_async, extract_product_ids
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -150,8 +156,6 @@ def extract_products_from_state(final_state_snapshot: dict) -> list[dict]: ...@@ -150,8 +156,6 @@ def extract_products_from_state(final_state_snapshot: dict) -> list[dict]:
Returns: Returns:
list[dict]: List of product objects with name, price, sale_price, etc. list[dict]: List of product objects with name, price, sale_price, etc.
""" """
from .helper import extract_product_ids
enriched_products = [] enriched_products = []
try: try:
...@@ -176,3 +180,69 @@ def extract_products_from_state(final_state_snapshot: dict) -> list[dict]: ...@@ -176,3 +180,69 @@ def extract_products_from_state(final_state_snapshot: dict) -> list[dict]:
logger.error(f"❌ Error extracting products from state: {e}") logger.error(f"❌ Error extracting products from state: {e}")
return enriched_products return enriched_products
async def check_response_cache(
identity_key: str,
query: str,
background_tasks: BackgroundTasks,
model_name: str,
is_authenticated: bool,
device_id: str | None = None,
) -> dict | None:
"""
Helper to handle the cache layer logic: Redis lookup, Insight merge, Langfuse tracing, and background tasks.
Returns a dict response if HIT, otherwise None.
"""
# 1. Check Redis for cached response
cached_response = await redis_cache.get_response(user_id=identity_key, query=query)
if not cached_response:
return None
logger.info(f"🚀 [Cache Layer] HIT | key={identity_key}")
# 2. Merge User Insight (if any)
cached_insight = await load_user_insight_from_redis(identity_key)
if cached_insight:
cached_response = {**cached_response, "user_insight": cached_insight}
# 3. Log Langfuse Trace (Cache Hit)
try:
langfuse = get_langfuse()
if langfuse:
trace_id = Langfuse.create_trace_id()
session_id = f"{identity_key}-cache"
tags = ["chatbot", "cache-hit", "user:authenticated" if is_authenticated else "user:anonymous"]
langfuse.trace(
id=trace_id,
name="chat-request-cache-hit",
user_id=identity_key,
session_id=session_id,
tags=tags,
input={"query": query, "user_id": identity_key},
output={
"ai_response": (cached_response.get("ai_response", "") or "")[:500],
"product_count": len(cached_response.get("product_ids", [])),
"source": "cache",
},
metadata={
"device_id": device_id,
"customer_id": identity_key if is_authenticated else None,
"model": model_name,
"cache_hit": True,
},
)
except Exception:
logger.debug("Failed to log cache hit trace", exc_info=True)
# 4. Background Task (Persist conversation history for cache hit)
memory = await get_conversation_manager()
background_tasks.add_task(
handle_post_chat_async,
memory=memory,
identity_key=identity_key,
human_query=query,
ai_response=cached_response,
)
return {**cached_response, "cached": True}
This diff is collapsed.
...@@ -2,7 +2,7 @@ from typing import Annotated, Any, TypedDict ...@@ -2,7 +2,7 @@ from typing import Annotated, Any, TypedDict
from langchain_core.messages import BaseMessage from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages from langgraph.graph.message import add_messages
from pydantic import BaseModel from pydantic import BaseModel, Field
import config as global_config import config as global_config
...@@ -56,3 +56,71 @@ def get_config() -> AgentConfig: ...@@ -56,3 +56,71 @@ def get_config() -> AgentConfig:
langfuse_secret_key=global_config.LANGFUSE_SECRET_KEY, langfuse_secret_key=global_config.LANGFUSE_SECRET_KEY,
langfuse_base_url=global_config.LANGFUSE_BASE_URL, langfuse_base_url=global_config.LANGFUSE_BASE_URL,
) )
# ==============================================================================
# STYLIST PRO MODELS (Architecture 2.0)
# ==============================================================================
class RawSearchArgs(BaseModel):
"""Lane 1: Trích xuất từ khóa gốc, giữ nguyên ý định khách, sửa lỗi typo."""
raw_text: str = Field(default="", description="Core keywords from user message")
class StructuredSearchArgs(BaseModel):
"""Lane 2: Suy luận bộ lọc có cấu trúc từ ngữ cảnh."""
product_line_vn: list[str] = Field(default_factory=list, description="Áo phông, Quần jean...")
gender_target: str | None = Field(default=None, description="men/women/boy/girl/unisex")
age_group: str | None = Field(default=None, description="adult/kid")
master_color: str | None = Field(default=None, description="Màu sắc suy luận")
tags: list[str] = Field(default_factory=list)
keywords: list[str] = Field(default_factory=list)
price_min: int | None = Field(default=None)
price_max: int | None = Field(default=None)
discount_min: int | None = Field(default=None)
discovery_mode: str | None = Field(default=None)
class ClassifierSearchArgs(BaseModel):
"""Tổng hợp 2 Lane Search."""
literal: RawSearchArgs = Field(default_factory=RawSearchArgs)
inferred: StructuredSearchArgs = Field(default_factory=StructuredSearchArgs)
magento_ref_code: str | None = Field(default=None)
class StylistProInsight(BaseModel):
"""Bộ nhớ khách hàng - 12 trường cốt lõi."""
USER: str | None = Field(default="Chưa rõ")
TARGET: str | None = Field(default="Chưa rõ")
GOAL: str | None = Field(default="Chưa rõ")
CONSTRAINTS: str | None = Field(default="")
STAGE: str | None = Field(default="BROWSE")
STAGE_NUM: int = Field(default=1)
TONE: str | None = Field(default="Friendly")
BEHAVIORAL_HINTS: list[str] = Field(default_factory=list)
LATEST_PRODUCT_INTEREST: str | None = Field(default="")
LAST_ACTION: str | None = Field(default="")
SUMMARY_HISTORY: str | None = Field(default="")
class ClassifierOutput(BaseModel):
"""Kết quả từ node Classifier."""
reasoning: str
tool_name: str | None = None
search_args: ClassifierSearchArgs | None = None
tool_args: dict[str, Any] | None = None
ai_response: str | None = None
product_ids: list[str] = Field(default_factory=list)
class StylistOutput(BaseModel):
"""Kết quả từ node Stylist."""
ai_response: str
product_ids: list[str] = Field(default_factory=list)
user_insight: StylistProInsight
class StylistProState(TypedDict):
"""Trạng thái luồng chạy của Stylist Pro."""
messages: Annotated[list[BaseMessage], add_messages]
user_insight: str | None
tool_name_used: str | None
tool_result: str | None
updated_insight: dict[str, Any] | None
lead_stage: dict[str, Any] | None
stage_injection: str | None
product_ids: list[str]
early_exit: bool | None
diagnostics: Annotated[list[dict[str, Any]], lambda x, y: x + y]
""" """
Agent Nodes Package Agent Nodes Package (Architecture 2.0)
""" """
from .agent import agent_node from .classifier_node import classifier_node
from .stylist_node import stylist_node
__all__ = ["agent_node"] from .models_node import StylistProState
__all__ = [
"classifier_node",
"stylist_node",
"StylistProState"
]
import json
import logging
import time
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.runnables import RunnableConfig
from common.llm_factory import create_llm
from agent.nodes.models_node import StylistProState, ClassifierOutput
from agent.nodes.utils import _extract_text, _format_history, parse_llm_json
from agent.prompt_module.stylist_pro_prompts import STYLIST_PRO_CLASSIFIER_PROMPT
logger = logging.getLogger(__name__)
async def classifier_node(state: StylistProState, config: RunnableConfig):
"""
Classifier Node: Định tuyến intent và thực thi Tool inline.
Sử dụng data_retrieval_tool làm công cụ tìm kiếm chính.
"""
llm = create_llm(model_name=config.get("configurable", {}).get("model_name"), streaming=False)
messages = state["messages"]
user_query = _extract_text(messages[-1].content)
user_insight = state.get("user_insight") or "{}"
history_text = _format_history(messages[:-1])
# Context preparation
products_context = ""
try:
if isinstance(user_insight, str) and user_insight.startswith("{"):
_ins = json.loads(user_insight)
latest = _ins.get("LATEST_PRODUCT_INTEREST", "")
summary = _ins.get("SUMMARY_HISTORY", "")
if latest: products_context += f"\n=== SP ĐÃ GIỚI THIỆU ===\n{latest}\n"
if summary: products_context += f"\n=== TÓM TẮT HỘI THOẠI ===\n{summary}\n"
except Exception: pass
context_block = (
f"=== TIN NHẮN HIỆN TẠI ===\n{user_query}\n\n"
f"=== USER INSIGHT ===\n{user_insight}\n\n"
f"{products_context}"
f"=== LỊCH SỬ ===\n{history_text}"
)
prompt = [
SystemMessage(content=STYLIST_PRO_CLASSIFIER_PROMPT),
HumanMessage(content=context_block),
]
start_time = time.time()
response = await llm.ainvoke(prompt, config=config)
elapsed = (time.time() - start_time) * 1000
data = parse_llm_json(response.content)
# --- PHÒNG THỦ: Dọn dẹp dữ liệu trước khi nạp vào Pydantic ---
if "search_args" in data and "inferred" in data["search_args"]:
inferred = data["search_args"]["inferred"]
# Các trường mong đợi String/Int nhưng hay bị AI trả về List rỗng []
str_fields = ["gender_target", "age_group", "master_color", "price_min", "price_max"]
for field in str_fields:
if field in inferred and (isinstance(inferred[field], list) or isinstance(inferred[field], dict)):
if isinstance(inferred[field], list):
inferred[field] = inferred[field][0] if len(inferred[field]) > 0 else None
else:
inferred[field] = None
# Xử lý product_line_vn (đảm bảo luôn là List)
if "product_line_vn" in inferred and isinstance(inferred["product_line_vn"], str):
inferred["product_line_vn"] = [inferred["product_line_vn"]]
output = ClassifierOutput(**data)
# Tool Registry
tool_name = output.tool_name
tool_result = None
diagnostics = [{
"step": "classifier",
"label": f"🎯 Classifier -> {tool_name or 'Early Exit'}",
"content": output.reasoning,
"elapsed_ms": round(elapsed)
}]
# Prepare tool_args
tool_args = {}
if tool_name:
# Case A: Search Tools (Data Retrieval) - UPDATED FOR DUAL-LANE
if (tool_name == "lead_search_tool" or tool_name == "data_retrieval_tool") and output.search_args:
inf = output.search_args.inferred
tool_args = {
"searches": [
{
"literal": {"raw_text": output.search_args.literal.raw_text},
"inferred": {
"product_line_vn": inf.product_line_vn or [],
"gender_by_product": inf.gender_target,
"age_by_product": inf.age_group,
"master_color": inf.master_color,
"price_min": inf.price_min,
"price_max": inf.price_max,
"discount_min": 0,
"discount_max": 100,
"discovery_mode": None
}
}
]
}
# Case B: Store Search (Ensure 'location' exists)
elif tool_name == "canifa_store_search":
loc = (output.tool_args or {}).get("location") or user_query
tool_args = {"location": loc}
# Case C: Knowledge Search (Ensure 'query' exists)
elif tool_name == "canifa_knowledge_search":
q = (output.tool_args or {}).get("query") or user_query
tool_args = {"query": q}
# Case D: Other Specific Tools
elif output.tool_args:
tool_args = output.tool_args
return {
"tool_name_used": tool_name,
"tool_args": tool_args,
"early_exit": tool_name is None and output.ai_response is not None,
"product_ids": output.product_ids,
"messages": [AIMessage(content=output.ai_response)] if output.ai_response else [],
"diagnostics": diagnostics
}
from typing import Annotated, Any, TypedDict, Optional, List
from pydantic import BaseModel, Field
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
# ==============================================================================
# SEARCH MODELS
# ==============================================================================
class LiteralSearchArgs(BaseModel):
raw_text: str = ""
class InferredSearchArgs(BaseModel):
product_line_vn: List[str] = Field(default_factory=list)
gender_target: Optional[str] = None
age_group: Optional[str] = None
master_color: Optional[str] = None
tags: List[str] = Field(default_factory=list)
keywords: List[str] = Field(default_factory=list)
price_min: Optional[int] = None
price_max: Optional[int] = None
class ClassifierSearchArgs(BaseModel):
literal: LiteralSearchArgs = Field(default_factory=LiteralSearchArgs)
inferred: InferredSearchArgs = Field(default_factory=InferredSearchArgs)
magento_ref_code: Optional[str] = None
# ==============================================================================
# NODE OUTPUT MODELS
# ==============================================================================
class ClassifierOutput(BaseModel):
reasoning: str
tool_name: Optional[str] = None
search_args: Optional[ClassifierSearchArgs] = None
tool_args: Optional[dict] = None
ai_response: Optional[str] = None
product_ids: List[str] = Field(default_factory=list)
class StylistProInsight(BaseModel):
USER: Optional[str] = "Chưa rõ"
TARGET: Optional[str] = "Chưa rõ"
GOAL: Optional[str] = "Chưa rõ"
CONSTRAINTS: Optional[str] = None
STAGE: Optional[str] = "BROWSE"
STAGE_NUM: int = 1
TONE: Optional[str] = "Friendly"
LATEST_PRODUCT_INTEREST: Optional[str] = ""
SUMMARY_HISTORY: Optional[str] = ""
BEHAVIORAL_HINTS: List[str] = Field(default_factory=list)
class StylistOutput(BaseModel):
ai_response: str
product_ids: List[str] = Field(default_factory=list)
user_insight: StylistProInsight
# ==============================================================================
# STATE DEFINITION
# ==============================================================================
class StylistProState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
user_insight: Optional[str]
tool_name_used: Optional[str]
tool_args: Optional[dict] # Added for 3-node flow
tool_result: Optional[str]
updated_insight: Optional[dict]
lead_stage: Optional[dict]
stage_injection: Optional[str]
product_ids: List[str]
early_exit: Optional[bool]
diagnostics: Annotated[list[dict[str, Any]], lambda x, y: x + y]
import json
import logging
import time
from typing import Dict, Any
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.runnables import RunnableConfig
from common.llm_factory import create_llm
from agent.nodes.models_node import StylistProState, StylistOutput
from agent.nodes.utils import _extract_skus_from_text, _dedupe_preserve_order, _format_tool_result, parse_llm_json
from agent.prompt_module.stylist_pro_prompts import STYLIST_PRO_RESPONDER_PROMPT, format_stylist_pro_injection
logger = logging.getLogger(__name__)
async def stylist_node(state: StylistProState, config: RunnableConfig):
"""
Stylist Node: Sinh câu trả lời tư vấn và cập nhật Insight khách hàng.
"""
llm = create_llm(model_name=config.get("configurable", {}).get("model_name"), streaming=False)
insight_dict = json.loads(state.get("user_insight") or "{}")
injection = format_stylist_pro_injection(insight_dict)
system_content = STYLIST_PRO_RESPONDER_PROMPT.format(stage_injection=injection)
messages = [SystemMessage(content=system_content)]
# History context (3 turns)
human_count = 0
recent = []
for msg in reversed(state["messages"][:-1]):
if isinstance(msg, HumanMessage):
recent.insert(0, msg)
human_count += 1
elif isinstance(msg, AIMessage) and not (hasattr(msg, "tool_calls") and msg.tool_calls):
recent.insert(0, msg)
if human_count >= 3: break
messages.extend(recent)
messages.append(state["messages"][-1])
# Tool context
tool_context = _format_tool_result(state.get("tool_result"))
if tool_context:
messages.append(HumanMessage(content=f"=== KẾT QUẢ TRA CỨU ===\n{tool_context}"))
# Extra instructions
messages.append(HumanMessage(content="REMINDER: Output ONLY JSON. List ALL SKUs in product_ids. Ensure Top+Bottom for combos."))
start_time = time.time()
response = await llm.ainvoke(messages, config=config)
elapsed = (time.time() - start_time) * 1000
data = parse_llm_json(response.content)
output = StylistOutput(**data)
# Enrichment & Deduplication
text_skus = _extract_skus_from_text(output.ai_response)
final_product_ids = _dedupe_preserve_order((output.product_ids or []) + text_skus)
# Insight mapping
insight_raw = output.user_insight
insight_dict = insight_raw.model_dump() if hasattr(insight_raw, 'model_dump') else insight_raw
lead_stage = {
"stage": insight_dict.get("STAGE_NUM", 1),
"stage_name": insight_dict.get("STAGE", "BROWSE"),
"tone_directive": insight_dict.get("TONE", "Friendly"),
"behavioral_hints": insight_dict.get("BEHAVIORAL_HINTS", []),
}
diag = {
"step": "stylist",
"label": "💬 Stylist Response",
"content": output.ai_response[:200],
"elapsed_ms": round(elapsed)
}
return {
"updated_insight": insight_dict,
"lead_stage": lead_stage,
"stage_injection": format_stylist_pro_injection(insight_dict),
"product_ids": final_product_ids,
"messages": [AIMessage(content=output.ai_response)],
"diagnostics": [diag]
}
import json
import logging
import time
from langchain_core.runnables import RunnableConfig
from agent.nodes.models_node import StylistProState
from agent.tools.data_retrieval_tool import data_retrieval_tool
logger = logging.getLogger(__name__)
def _get_tool_registry():
from agent.tools.brand_knowledge_tool import canifa_knowledge_search
from agent.tools.check_is_stock import check_is_stock
from agent.tools.promotion_canifa_tool import canifa_get_promotions
from agent.tools.store_search_tool import canifa_store_search
return {
"lead_search_tool": data_retrieval_tool,
"data_retrieval_tool": data_retrieval_tool,
"canifa_knowledge_search": canifa_knowledge_search,
"canifa_get_promotions": canifa_get_promotions,
"canifa_store_search": canifa_store_search,
"check_is_stock": check_is_stock,
}
async def tools_node(state: StylistProState, config: RunnableConfig):
"""
Tools Node: Thực thi Tool dựa trên tool_name_used và tool_args từ Classifier.
"""
tool_name = state.get("tool_name_used")
tool_args = state.get("tool_args") or {}
tool_result = None
diagnostics = []
if not tool_name:
return {"tool_result": None, "diagnostics": []}
tool_registry = _get_tool_registry()
tool_fn = tool_registry.get(tool_name)
if tool_fn:
t_start = time.time()
try:
logger.info(f"🛠️ [ToolsNode] Executing '{tool_name}' with args: {tool_args}")
# Execute Tool
tool_result = await tool_fn.ainvoke(tool_args)
tool_elapsed = (time.time() - t_start) * 1000
diagnostics.append({
"step": "tool_result",
"label": f"📦 {tool_name}",
"content": f"Success: {str(tool_result)[:100]}...",
"raw_json": tool_result,
"elapsed_ms": round(tool_elapsed)
})
logger.info(f"✅ [ToolsNode] '{tool_name}' finished in {round(tool_elapsed)}ms")
except Exception as e:
logger.error(f"❌ [ToolsNode] Execution Error in '{tool_name}': {e}")
tool_result = json.dumps({"status": "error", "message": str(e)})
diagnostics.append({
"step": "tool_error",
"label": f"❌ {tool_name} Error",
"content": str(e),
"elapsed_ms": 0
})
else:
logger.warning(f"⚠️ [ToolsNode] Unknown tool: '{tool_name}'")
return {
"tool_result": tool_result,
"diagnostics": diagnostics
}
import json
import logging
import re
from typing import Dict, Any, List, Optional
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
logger = logging.getLogger(__name__)
def _extract_text(content) -> str:
"""Extract plain text từ msg.content."""
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
parts.append(item.get("text", ""))
elif isinstance(item, str):
parts.append(item)
return "\n".join(parts) if parts else str(content)
return str(content or "")
_SKU_REGEX = re.compile(r"\b\d[A-Z0-9]{5,}(?:-[A-Z0-9]{2,}){0,2}\b", re.IGNORECASE)
def _dedupe_preserve_order(items: list[str]) -> list[str]:
seen: set[str] = set()
out: list[str] = []
for raw in items:
val = str(raw or "").upper().strip()
if not val or val in seen:
continue
seen.add(val)
out.append(val)
return out
def _extract_skus_from_text(text: str | None) -> list[str]:
if not text:
return []
matches = _SKU_REGEX.findall(text)
return _dedupe_preserve_order([m for m in matches if m])
def _format_history(messages: list[BaseMessage], max_turns: int = 10) -> str:
"""Format recent messages cho context."""
lines = []
human_count = 0
for msg in reversed(messages):
if isinstance(msg, HumanMessage):
lines.insert(0, f"Khách: {_extract_text(msg.content)[:500]}")
human_count += 1
elif isinstance(msg, AIMessage) and not (hasattr(msg, "tool_calls") and msg.tool_calls):
lines.insert(0, f"Bot: {_extract_text(msg.content)[:500]}")
if human_count >= max_turns:
break
return "\n".join(lines) if lines else "Chưa có lịch sử."
def _format_tool_result(tool_result: str | None) -> str:
"""Format kết quả từ tool thành text xịn cho Stylist."""
if not tool_result:
return ""
try:
parsed = json.loads(tool_result)
if parsed.get("status") == "success":
products = parsed.get("products", [])
if products:
dedup = {}
for p in products:
sku = str(p.get("sku_code") or p.get("sku") or p.get("internal_ref_code") or "").upper().strip()
if sku and sku not in dedup:
dedup[sku] = p
unique_products = list(dedup.values())
lines = [f"✅ Tìm thấy {len(unique_products)} sản phẩm:"]
for p in unique_products[:8]:
price = p.get("price", 0)
discount = p.get("discount", "")
line = f"- [{p.get('sku', '')}] {p.get('name', '')} | Giá: {price:,}đ"
if discount:
line += f" ({discount})"
if p.get("sizes"):
line += f" | Sizes: {p['sizes']}"
desc = (p.get("description") or "").strip()
if desc:
line += f"\n 📝 {desc[:200]}"
outfit = p.get("outfit_recommendations")
if outfit and isinstance(outfit, list):
outfit_lines = []
for o in outfit[:3]:
outfit_lines.append(
f"{o.get('role', 'match')}: {o.get('match_product_name')} ({o.get('match_product_code')})"
)
if outfit_lines:
line += f"\n 👗 outfit_recommendations: {', '.join(outfit_lines)}"
lines.append(line)
return "\n".join(lines)
return parsed.get("message", "Không tìm thấy sản phẩm phù hợp.")
content = parsed.get("content") or parsed.get("message") or str(parsed)
return str(content)[:800]
except Exception:
return (tool_result or "")[:500]
def parse_llm_json(text: str) -> Dict[str, Any]:
"""Senior Parser: Cleanly extract JSON from LLM response."""
text = text.strip()
if text.startswith('```json'): text = text[7:]
if text.startswith('```'): text = text[3:]
if text.endswith('```'): text = text[:-3]
text = text.strip()
start = text.find("{")
end = text.rfind("}")
if start != -1 and end != -1:
text = text[start : end + 1]
try:
return json.loads(text)
except json.JSONDecodeError:
logger.error(f"❌ Failed to parse JSON: {text[:200]}")
raise
"""
Push ALL prompts (modules + tools) to PRODUCTION Langfuse.
Usage: py push_prod.py
"""
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
sys.stdout.reconfigure(encoding="utf-8")
# ---- PRODUCTION Langfuse credentials ----
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-bd0c7202-a7f4-4399-a36e-65ebe2e35104"
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-2bb249b1-77e3-4309-8954-312b8fb2fff9"
os.environ["LANGFUSE_BASE_URL"] = "http://172.16.2.207:3009"
from langfuse import Langfuse
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
PROMPT_DIR = os.path.dirname(BASE_DIR) # prompt_module/ (parent of push_production.py/)
TOOL_DIR = os.path.join(os.path.dirname(PROMPT_DIR), "tool_prompts")
# ---- Prompt modules ----
PROMPT_MODULES = [
("02_rules.txt", "canifa-02-rules", ["canifa", "system-core"]),
("03_context.txt", "canifa-03-context", ["canifa", "system-core"]),
("04a_sales_core.txt", "canifa-04a-sales-core", ["canifa", "system-sales"]),
("04b_sales_thaomai.txt", "canifa-04b-sales-thaomai", ["canifa", "system-sales"]),
("04c_sales_upsell.txt", "canifa-04c-sales-upsell", ["canifa", "system-sales"]),
("04d_sales_urgency.txt", "canifa-04d-sales-urgency", ["canifa", "system-sales"]),
("05_tool_routing.txt", "canifa-05-tool-routing", ["canifa", "system-core"]),
("05b_tool_results.txt", "canifa-05b-tool-results", ["canifa", "system-core"]),
("05c_comparison.txt", "canifa-05c-comparison", ["canifa", "system-core"]),
("06_user_insight.txt", "canifa-06-user-insight", ["canifa", "system-core"]),
("07_output_format.txt", "canifa-07-output-format", ["canifa", "system-core"]),
]
CORE_FILE = "01_core.txt"
CORE_PROMPT_NAME = "canifa-stylist-system-prompt"
SEASON_PROMPT_NAME = "canifa-08-season"
DEFAULT_SEASON_CONTENT = """## HƯỚNG DẪN TƯ VẤN THEO MÙA / EVENT
**Thời điểm hiện tại:** Tháng 3/2026 — Mùa Xuân, chuyển giao Đông → Hè
**Ưu tiên sản phẩm mùa này:**
- Áo khoác nhẹ, cardigan (trời se lạnh buổi sáng/tối)
- Áo phông, áo thun (ban ngày ấm)
- Sơ mi dài tay (đi làm)
- Quần jeans, quần kaki (đa năng)
**Khi khách hỏi chung chung ("có gì hot?", "gợi ý đi"):**
→ Ưu tiên giới thiệu sản phẩm phù hợp thời tiết hiện tại
→ Nhắc sale/khuyến mãi nếu có
**Event đang diễn ra:**
- (Marketing cập nhật event tại đây)
"""
# ---- Tool prompts ----
TOOL_PROMPTS = [
("data_retrieval_tool.txt", "canifa-tool-data-retrieval", ["canifa", "tool-prompt"]),
("brand_knowledge_tool.txt", "canifa-tool-brand-knowledge", ["canifa", "tool-prompt"]),
("check_is_stock.txt", "canifa-tool-check-is-stock", ["canifa", "tool-prompt"]),
("store_search_tool.txt", "canifa-tool-store-search", ["canifa", "tool-prompt"]),
("promotion_canifa_tool.txt", "canifa-tool-promotion", ["canifa", "tool-prompt"]),
]
def read_file(directory: str, filename: str) -> str:
with open(os.path.join(directory, filename), "r", encoding="utf-8") as f:
return f.read()
def main():
print("🚀 PUSH ALL TO PRODUCTION LANGFUSE")
print(f" URL: {os.environ['LANGFUSE_BASE_URL']}")
print()
lf = Langfuse()
# ── Step 1: Prompt modules ──
print("=" * 60)
print("STEP 1: Prompt Modules")
print("=" * 60)
for filename, name, tags in PROMPT_MODULES:
content = read_file(PROMPT_DIR, filename)
lf.create_prompt(name=name, prompt=content, labels=["production"], tags=tags, type="text")
print(f" ✅ {filename:30s} → {name} ({len(content):,} chars)")
# Season
lf.create_prompt(name=SEASON_PROMPT_NAME, prompt=DEFAULT_SEASON_CONTENT,
labels=["production"], tags=["canifa", "system-addon"], type="text")
print(f" ✅ {'(season)':30s} → {SEASON_PROMPT_NAME}")
# Core (with composable references)
core_content = read_file(PROMPT_DIR, CORE_FILE)
refs = "\n".join(f"@@@langfusePrompt:name={n}|label=production@@@" for _, n, _ in PROMPT_MODULES)
refs += f"\n@@@langfusePrompt:name={SEASON_PROMPT_NAME}|label=production@@@"
composed = core_content + "\n" + refs + "\n"
lf.create_prompt(name=CORE_PROMPT_NAME, prompt=composed,
labels=["production"], tags=["canifa", "system-prompt"], type="text")
print(f" ✅ {'01_core.txt (composed)':30s} → {CORE_PROMPT_NAME}")
# ── Step 2: Tool prompts ──
print("\n" + "=" * 60)
print("STEP 2: Tool Prompts")
print("=" * 60)
for filename, name, tags in TOOL_PROMPTS:
filepath = os.path.join(TOOL_DIR, filename)
if not os.path.exists(filepath):
print(f" ⚠️ SKIP {filename} — not found")
continue
content = read_file(TOOL_DIR, filename)
lf.create_prompt(name=name, prompt=content, labels=["production"], tags=tags, type="text")
print(f" ✅ {filename:35s} → {name} ({len(content):,} chars)")
# ── Step 3: Verify ──
print("\n" + "=" * 60)
print("STEP 3: Verification")
print("=" * 60)
prompt = lf.get_prompt(CORE_PROMPT_NAME, label="production", cache_ttl_seconds=0)
print(f" Assembled prompt: {len(prompt.prompt):,} chars")
checks = [
("01 Core", "C-stylist"),
("02 Rules", "QUY TẮC TRUNG THỰC"),
("03 Context", "CONTEXT AWARENESS"),
("04a Sales", "PHONG CÁCH TƯ VẤN"),
("05 Tool Route", "KHI NÀO GỌI TOOL"),
("05b Results", "XỬ LÝ KẾT QUẢ TOOL"),
("05c Compare", "SO SÁNH"),
("06 Insight", "USER INSIGHT 2.0"),
("07 Output", "FORMAT ĐẦU RA"),
("08 Season", "HƯỚNG DẪN TƯ VẤN THEO MÙA"),
]
all_ok = True
for label, keyword in checks:
found = keyword in prompt.prompt
print(f" {'✅' if found else '❌'} {label}: '{keyword}'")
if not found:
all_ok = False
for filename, name, _ in TOOL_PROMPTS:
if not os.path.exists(os.path.join(TOOL_DIR, filename)):
continue
try:
p = lf.get_prompt(name, label="production", cache_ttl_seconds=0)
print(f" ✅ {name} ({len(p.prompt):,} chars)")
except Exception as e:
print(f" ❌ {name} — {e}")
all_ok = False
lf.flush()
print(f"\n{'🎉 ALL DONE!' if all_ok else '⚠️ Done with warnings'}")
if __name__ == "__main__":
main()
"""Push all prompts (system + tools) to Langfuse Production"""
import os
import sys
import subprocess
# --- 1. SET ENVIRONMENT VARIABLES FOR PRODUCTION ---
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-b20df146-732b-47cb-9669-47983906eb93"
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-611d32d6-e8c7-4e06-b5b5-c3486513a4a2"
os.environ["LANGFUSE_BASE_URL"] = "http://172.16.2.207:3009"
# We must import Langfuse AFTER setting os.environ, or initialize explicitly
from langfuse import Langfuse
def push_tools(lf):
print("\n" + "="*60)
print("🚀 PUSHING TOOL PROMPTS TO PRODUCTION")
print("="*60)
TOOL_PROMPTS_DIR = r"d:\cnf\chatbot_canifa\backend\agent\tool_prompts"
# Matching the exact names used in backend/agent/prompt_utils.py
TOOL_FILES = [
("brand_knowledge_tool.txt", "canifa-tool-brand-knowledge", ["canifa", "tool-prompt"]),
("check_is_stock.txt", "canifa-tool-check-stock", ["canifa", "tool-prompt"]),
("data_retrieval_tool.txt", "canifa-tool-data-retrieval", ["canifa", "tool-prompt"]),
("promotion_canifa_tool.txt","canifa-tool-promotion", ["canifa", "tool-prompt"]),
("store_search_tool.txt", "canifa-tool-store-search", ["canifa", "tool-prompt"]),
]
for filename, langfuse_name, tags in TOOL_FILES:
path = os.path.join(TOOL_PROMPTS_DIR, filename)
with open(path, "r", encoding="utf-8") as f:
content = f.read()
lf.create_prompt(
name=langfuse_name,
prompt=content,
labels=["production"],
tags=tags,
type="text",
)
print(f" ✅ {filename:30s} → {langfuse_name} ({len(content):,} chars)")
lf.flush()
print("✅ Tool prompts pushed successfully!\n")
def push_system_prompts():
print("="*60)
print("🚀 PUSHING SYSTEM PROMPTS TO PRODUCTION")
print("="*60)
# We can just call the existing push_modules_to_langfuse.py script
# since we already exported the env vars, it will pick up the prod credentials.
script_dir = r"d:\cnf\chatbot_canifa\backend\agent\prompt_module"
script_path = os.path.join(script_dir, "push_modules_to_langfuse.py")
env = os.environ.copy()
subprocess.run([sys.executable, script_path], cwd=script_dir, env=env, check=True)
if __name__ == "__main__":
sys.stdout.reconfigure(encoding="utf-8")
print("🌟 DEPLOYING TO C-STYLIST PRODUCTION 🌟")
print(f"Target: {os.environ.get('LANGFUSE_BASE_URL')}")
# 1. Push System Prompts
push_system_prompts()
# 2. Push Tool Prompts
lf = Langfuse()
push_tools(lf)
print("🎉 ALL DEPLOYMENTS TO PRODUCTION FINISHED SUCCESSFULLY!")
"""
Prompts for StylistPro Agent (Architecture 2.0).
Combining legacy business logic with modern XML structural tagging for maximum LLM performance.
"""
# ==============================================================================
# 1. CLASSIFIER PROMPT (The Router)
# ==============================================================================
STYLIST_PRO_CLASSIFIER_PROMPT = """\
<system_role>
Bạn là AI Classifier (Router) cấp cao của CANIFA — thương hiệu thời trang gia đình hàng đầu Việt Nam.
Nhiệm vụ duy nhất: Phân tích intent khách hàng và quyết định lộ trình xử lý (Chọn Tool).
</system_role>
<tool_inventory>
1. `lead_search_tool`: Tìm kiếm sản phẩm (Sử dụng cho 90% trường hợp tìm đồ, combo, giá, màu).
2. `check_is_stock`: Kiểm tra size, tồn kho, giá cụ thể của 1 SKU.
3. `canifa_store_search`: Tìm địa chỉ cửa hàng.
4. `canifa_get_promotions`: Tra cứu các chương trình khuyến mãi hiện có.
5. `canifa_knowledge_search`: Tra cứu kiến thức thương hiệu, chính sách đổi trả, chất liệu.
</tool_inventory>
<search_philosophy_dual_lane>
Khi dùng `lead_search_tool`, bạn BẮT BUỘC phân tách:
- **Lane 1 (Literal)**: Trích xuất raw_text. Bỏ từ thừa (tìm, mua, cái...), sửa lỗi typo (nghọ nghiêng -> ngọ nghiêng).
- **Lane 2 (Inferred)**: Suy luận SQL Filters (product_line_vn, gender_target, age_group, tags, keywords).
</search_philosophy_dual_lane>
<vietnam_holidays_mapping>
- 30/4 & 2/9: master_color="Màu đỏ", tags=["Màu đỏ", "Đi chơi / dạo phố"], keywords=["cờ đỏ sao vàng"].
- Tết Nguyên Đán: master_color="Màu đỏ/Màu vàng", keywords=["Tết", "may mắn", "du xuân"], product_line_vn=["Váy liền", "Áo dài"].
- Valentine (14/2): master_color="Màu hồng", keywords=["Valentine", "lãng mạn"].
</vietnam_holidays_mapping>
<mapping_logic_rules>
1. Thời tiết: "Trời mưa" -> Áo khoác gió, trượt nước. "Nắng nóng" -> Áo chống nắng, UV.
2. Dịp: "Đi tiệc" -> Váy liền, Áo kiểu, Sang trọng. "Đi làm" -> Áo sơ mi, Polo, Quần Khaki (Adult).
3. Đồ lót: "Sịp/chip" -> Quần lót, Áo lót. (Tool tự tìm gender, bạn chỉ cần set product_line).
4. Combo: Khách hỏi "đồ", "set", "phối" -> BẮT BUỘC gọi lead_search_tool. RESET product_line cũ từ insight.
</mapping_logic_rules>
<inheritance_rules>
- Kế thừa Giới tính/Đối tượng từ <user_insight> (VD: Target là Nam -> gender_target="men").
- RESET hoàn toàn product_line_vn theo câu hỏi mới nhất.
</inheritance_rules>
<guardrails>
Nghiêm túc chặn các chủ đề không liên quan (Off-topic):
1. Chính trị, tôn giáo, phân biệt chủng tộc, bạo lực, nội dung 18+.
2. So sánh trực diện hoặc tâng bốc đối thủ (Uniqlo, Ivy Moda, Yody, v.v.).
3. Các câu hỏi linh tinh không liên quan đến thời trang, phong cách sống, hoặc Canifa (Ví dụ: "Giá vàng", "Cách nấu phở", "Review phim", "Code Python").
4. Khi gặp off-topic: Set `tool_name: null`, `ai_response`: "Dạ, hiện tại em là trợ lý thời trang của Canifa nên em chỉ có thể hỗ trợ mình về phối đồ và sản phẩm thôi ạ. Anh/Chị cần em tư vấn bộ đồ nào cho hôm nay không ạ?"
</guardrails>
<output_format>
Trả về DUY NHẤT JSON:
{
"reasoning": "giải thích ngắn gọn tại sao chọn tool này hoặc tại sao chặn",
"tool_name": "tên_tool" hoặc null,
"search_args": {
"literal": {"raw_text": "..."},
"inferred": {"product_line_vn": [...], "gender_target": "...", "tags": [...], "keywords": [...], ...}
},
"ai_response": "chào hỏi xã giao HOẶC từ chối off-topic (chỉ dùng khi tool_name=null)",
"product_ids": []
}
</output_format>
"""
# ==============================================================================
# 2. STYLIST PROMPT (The Expert)
# ==============================================================================
STYLIST_PRO_RESPONDER_PROMPT = """\
<system_role>
Bạn là Chuyên gia Thời trang (Stylist Pro) của CANIFA. Bạn tư vấn dựa trên dữ liệu sản phẩm thực và triết lý thời trang của hãng.
</system_role>
<styling_philosophy>
1. GỢI Ý NGAY, KHÔNG HỎI LẠI: Luôn đưa ra lựa chọn. Nếu chưa biết giới tính, gợi ý cả 2 phương án.
2. COMBO / OUTFIT (CRITICAL):
- Nếu khách hỏi "đồ", "set", "phối", "mặc gì" -> PHẢI tạo outfit hoàn chỉnh (Áo + Quần/Váy).
- Sử dụng `outfit_recommendations` từ tool result để bốc đúng món phối.
- BẮT BUỘC nhét SKU món phối vào `product_ids`.
3. KIỂM TRA SỰ PHÙ HỢP: Đừng gợi ý đồ đông cho mùa hè, đừng gợi ý đồ đi làm là áo hoạt hình.
4. KHÔNG TRẦN TÌNH THUẬT TOÁN: Đừng nói "Hệ thống trả về...", hãy nói "Canifa đang có sẵn...".
</styling_philosophy>
<user_memory_update>
Cập nhật 12 trường Insight:
- STAGE: BROWSE (tìm kiếm) -> COMPARE (hỏi sâu tính năng/giá) -> DECIDE (chốt đơn/hỏi size).
- LATEST_PRODUCT_INTEREST: Ghi rõ Tên + SKU sản phẩm vừa tư vấn chính.
- SUMMARY_HISTORY: Tóm tắt các bước đã tư vấn.
</user_memory_update>
<formatting_rules>
- SKU sản phẩm phải bọc trong ngoặc tròn, ví dụ: (6TS25S001).
- MỌI SKU nhắc đến trong text BẮT BUỘC phải nằm trong mảng product_ids.
- Tối đa 250 từ. Không dán URL.
</formatting_rules>
<output_format>
Trả về DUY NHẤT JSON:
{{
"ai_response": "nội dung tư vấn chuyên nghiệp",
"product_ids": ["SKU1", "SKU2", ...],
"user_insight": {{ ...updated 12 fields... }}
}}
</output_format>
<context_injection>
{stage_injection}
</context_injection>
"""
def format_stylist_pro_injection(insight: dict) -> str:
"""Format InsightJSON dict thành đoạn XML context cho prompt."""
template = """
<user_memory>
- User Type: {user}
- Target: {target}
- Goal: {goal}
- Constraints: {constraints}
- Stage: {stage} (Level {stage_num})
- Tone: {tone}
- Latest Interest: {latest}
- History Summary: {summary}
</user_memory>
"""
return template.format(
user=insight.get("USER", "Chưa rõ"),
target=insight.get("TARGET", "Chưa rõ"),
goal=insight.get("GOAL", "Chưa rõ"),
constraints=insight.get("CONSTRAINTS") or "Không",
stage=insight.get("STAGE", "BROWSE"),
stage_num=insight.get("STAGE_NUM", 1),
tone=insight.get("TONE", "Friendly"),
latest=insight.get("LATEST_PRODUCT_INTEREST") or "Chưa có",
summary=insight.get("SUMMARY_HISTORY") or "Chưa có"
)
import logging
import json
import time
from typing import Dict, Any, List
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage
from langchain_core.runnables import RunnableConfig
# New Modular Imports
from backend.agent.nodes.models_node import StylistProState
from backend.agent.nodes.classifier_node import classifier_node
from backend.agent.nodes.stylist_node import stylist_node
from backend.agent.nodes.utils import _extract_text
# StarRocks connection for Enrichment
from agent.lead_stage_agent.product_search_engine import SELECT_COLUMNS, TABLE_NAME
from common.starrocks_connection import get_db_connection
logger = logging.getLogger(__name__)
def route_after_classifier(state: StylistProState) -> str:
"""Điều hướng dựa trên early_exit."""
if state.get("early_exit"):
return END
return "stylist"
class StylistProGraph:
"""
Architect Stylist Graph (Version 2.0).
Modular Architecture: Classifier -> Stylist -> END
"""
def __init__(self):
self._compiled = None
def build(self):
if self._compiled:
return self._compiled
workflow = StateGraph(StylistProState)
workflow.add_node("classifier", classifier_node)
workflow.add_node("stylist", stylist_node)
workflow.set_entry_point("classifier")
workflow.add_conditional_edges(
"classifier",
route_after_classifier,
{
"stylist": "stylist",
END: END
}
)
workflow.add_edge("stylist", END)
self._compiled = workflow.compile()
logger.info("✅ StylistProGraph compiled (Modular)")
return self._compiled
async def chat(
self,
user_message: str,
user_insight: str | None = None,
history: List[BaseMessage] | None = None,
config: RunnableConfig | None = None,
) -> Dict[str, Any]:
"""
Main entry point with StarRocks Product Enrichment.
"""
start_time = time.time()
graph = self.build()
messages = history.copy() if history else []
messages.append(HumanMessage(content=user_message))
initial_state = {
"messages": messages,
"user_insight": user_insight,
"tool_result": None,
"tool_name_used": None,
"updated_insight": None,
"lead_stage": None,
"stage_injection": None,
"product_ids": [],
"early_exit": False,
"diagnostics": [],
}
result = await graph.ainvoke(initial_state, config=config)
elapsed_ms = (time.time() - start_time) * 1000
# Extract AI Response
ai_response = ""
for msg in reversed(result["messages"]):
if isinstance(msg, AIMessage):
ai_response = _extract_text(msg.content)
break
# --- PRODUCT ENRICHMENT ---
final_products = []
ai_product_ids = result.get("product_ids", [])
tool_result_raw = result.get("tool_result")
product_dict = {}
if tool_result_raw:
try:
parsed = json.loads(tool_result_raw)
if parsed.get("status") == "success":
all_products = parsed.get("products", [])
for p in all_products:
sku = str(p.get("magento_ref_code") or p.get("sku") or "").upper().strip()
if sku:
if "sku" not in p: p["sku"] = sku
product_dict[sku] = p
recs = p.get("outfit_recommendations", [])
if isinstance(recs, list):
for r in recs:
r_sku = str(r.get("match_product_code") or r.get("sku") or "").upper().strip()
if r_sku:
if "sku" not in r: r["sku"] = r_sku
product_dict[r_sku] = r
except Exception: pass
# Enrich missing IDs via StarRocks
wanted_ids = [pid.upper().strip() for pid in ai_product_ids if pid]
missing_ids = [pid for pid in wanted_ids if pid not in product_dict or not product_dict[pid].get("image")]
if missing_ids:
try:
db = get_db_connection()
skus = list(dict.fromkeys(missing_ids))
base_skus = list(dict.fromkeys([s.split("-")[0] for s in skus]))
keys = list(dict.fromkeys(skus + base_skus))
placeholders = ",".join(["%s"] * len(keys))
sql = f"SELECT {SELECT_COLUMNS} FROM {TABLE_NAME} WHERE UPPER(magento_ref_code) IN ({placeholders}) OR UPPER(internal_ref_code) IN ({placeholders}) LIMIT 200"
rows = await db.execute_query_async(sql, params=tuple(keys + keys))
for r in rows or []:
card = {
"sku": (r.get("magento_ref_code") or r.get("internal_ref_code") or "").strip(),
"name": r.get("product_name", ""),
"price": int(r.get("sale_price") or 0),
"original_price": int(r.get("original_price") or 0),
"image": r.get("product_image_url_thumbnail", ""),
"url": r.get("product_web_url", ""),
"sizes": r.get("size_scale", ""),
"gender": r.get("gender_by_product", ""),
"product_line": r.get("product_line_vn", ""),
}
product_dict[card["sku"].upper()] = card
except Exception as e:
logger.warning(f"⚠️ Enrichment failed: {e}")
seen = set()
for pid in wanted_ids:
if pid in product_dict and pid not in seen:
final_products.append(product_dict[pid])
seen.add(pid)
return {
"response": ai_response,
"elapsed_ms": round(elapsed_ms, 2),
"lead_stage": result.get("lead_stage"),
"updated_insight": result.get("updated_insight"),
"product_ids": ai_product_ids,
"products": final_products,
"pipeline": result.get("diagnostics", [])
}
# Singleton
stylist_pro_graph = StylistProGraph()
""" """
Tools Package Tools package exports.
Export tool và factory function """
"""
from .data_retrieval_tool import data_retrieval_tool
from .data_retrieval_tool import data_retrieval_tool
from .get_tools import get_all_tools
from .promotion_canifa_tool import canifa_get_promotions def get_all_tools():
from .get_tools import get_all_tools as _get_all_tools
__all__ = ["data_retrieval_tool", "get_all_tools", "canifa_get_promotions"]
return _get_all_tools()
def __getattr__(name: str):
if name == "canifa_get_promotions":
from .promotion_canifa_tool import canifa_get_promotions
return canifa_get_promotions
raise AttributeError(name)
__all__ = ["data_retrieval_tool", "get_all_tools", "canifa_get_promotions"]
...@@ -21,20 +21,19 @@ class KnowledgeSearchInput(BaseModel): ...@@ -21,20 +21,19 @@ class KnowledgeSearchInput(BaseModel):
@tool("canifa_knowledge_search", args_schema=KnowledgeSearchInput) @tool("canifa_knowledge_search", args_schema=KnowledgeSearchInput)
async def canifa_knowledge_search(query: str) -> str: async def canifa_knowledge_search(query: str) -> str:
""" """
(Placeholder docstring - Actual prompt is loaded from file)
Tra cứu thông tin thương hiệu Canifa (chính sách, KHTT, bảng size...). Tra cứu thông tin thương hiệu Canifa (chính sách, KHTT, bảng size...).
KHÔNG dùng để tìm cửa hàng - dùng canifa_store_search. KHÔNG dùng để tìm cửa hàng - dùng canifa_store_search.
""" """
logger.info(f"[Semantic Search] Brand Knowledge query: {query}") logger.info(f"[Semantic Search] Brand Knowledge query: {query}")
try: try:
# 1. Tạo embedding cho câu hỏi (cached 24h trong Redis) # 1. Tạo embedding cho câu hỏi
query_vector = await create_embedding_async(query) query_vector = await create_embedding_async(query)
if not query_vector: if not query_vector:
return "Xin lỗi, tôi gặp sự cố khi xử lý thông tin. Vui lòng thử lại sau." return "Xin lỗi, tôi gặp sự cố khi xử lý thông tin. Vui lòng thử lại sau."
v_str = json.dumps(query_vector) v_str = json.dumps(query_vector)
# 2. SINGLE QUERY: Tìm top K chunks → lấy full document theo title (1 round-trip) # 2. Tìm kiếm vector trên StarRocks
sql = f""" sql = f"""
WITH top_chunks AS ( WITH top_chunks AS (
SELECT CAST(json_query(metadata, '$.title') AS VARCHAR) AS title SELECT CAST(json_query(metadata, '$.title') AS VARCHAR) AS title
...@@ -53,46 +52,25 @@ async def canifa_knowledge_search(query: str) -> str: ...@@ -53,46 +52,25 @@ async def canifa_knowledge_search(query: str) -> str:
results = await sr.execute_query_async(sql) results = await sr.execute_query_async(sql)
if not results: if not results:
logger.warning(f"No knowledge data found in DB for query: {query}") return "Hiện tại tôi chưa tìm thấy thông tin chính xác về nội dung này trong hệ thống kiến thức của Canifa."
return "Hiện tại tôi chưa tìm thấy thông tin chính xác về nội dung này trong hệ thống kiến thức của Canifa. Bạn có thể liên hệ hotline 1800 6061 để được hỗ trợ trực tiếp."
# 3. Phân nhóm nội dung theo title
docs_by_title: dict[str, list[str]] = {} docs_by_title: dict[str, list[str]] = {}
for res in results: for res in results:
try: meta_obj = json.loads(res.get("metadata", "{}")) if isinstance(res.get("metadata"), str) else res.get("metadata", {})
meta_obj = ( title = meta_obj.get("title") if isinstance(meta_obj, dict) else None
json.loads(res.get("metadata", "{}")) content = res.get("content", "").strip()
if isinstance(res.get("metadata"), str) if title and content:
else res.get("metadata", {}) docs_by_title.setdefault(title, []).append(content)
)
title = meta_obj.get("title") if isinstance(meta_obj, dict) else None
content = res.get("content", "").strip()
if title and content:
docs_by_title.setdefault(title, []).append(content)
except Exception as e:
logger.debug(f"Skip invalid row while grouping by title: {e}")
if not docs_by_title:
# Fallback: trả raw content nếu không parse được title
knowledge_texts = [res.get("content", "") for res in results]
return "\n\n---\n\n".join(knowledge_texts)
# 4. Tổng hợp kết quả
final_blocks = [] final_blocks = []
for title, contents in docs_by_title.items(): for title, contents in docs_by_title.items():
if contents: full_text = "\n\n".join(contents)
full_text = "\n\n".join(contents) final_blocks.append(f"=== TÀI LIỆU: {title} ===\n{full_text}")
final_blocks.append(f"=== TÀI LIỆU: {title} ===\n{full_text}")
final_response = "\n\n".join(final_blocks) return "\n\n".join(final_blocks)
logger.info(f"Parent Document Retrieval complete. Loaded full context for {len(final_blocks)} topics.")
return final_response
except Exception as e: except Exception as e:
logger.error(f"Error in canifa_knowledge_search: {e}") logger.error(f"Error in canifa_knowledge_search: {e}")
return "Tôi đang gặp khó khăn khi truy cập kho kiến thức. Bạn muốn hỏi về sản phẩm gì khác không?" return "Tôi đang gặp khó khăn khi truy cập kho kiến thức."
canifa_knowledge_search.__doc__ = read_tool_prompt("brand_knowledge_tool") or canifa_knowledge_search.__doc__ canifa_knowledge_search.__doc__ = read_tool_prompt("brand_knowledge_tool") or canifa_knowledge_search.__doc__
This diff is collapsed.
""" """
Tools Factory Tools Factory.
Return all tools for the agent. """
"""
from langchain_core.tools import Tool
from langchain_core.tools import Tool
from .brand_knowledge_tool import canifa_knowledge_search def get_all_tools() -> list[Tool]:
from .data_retrieval_tool import data_retrieval_tool from .brand_knowledge_tool import canifa_knowledge_search
from .promotion_canifa_tool import canifa_get_promotions from .check_is_stock import check_is_stock
from .check_is_stock import check_is_stock from .data_retrieval_tool import data_retrieval_tool
from .store_search_tool import canifa_store_search from .promotion_canifa_tool import canifa_get_promotions
from .store_search_tool import canifa_store_search
def get_all_tools() -> list[Tool]: return [
"""Return toàn bộ list tools cho Agent""" data_retrieval_tool,
return [data_retrieval_tool, canifa_knowledge_search, canifa_get_promotions, check_is_stock, canifa_store_search] canifa_knowledge_search,
canifa_get_promotions,
check_is_stock,
canifa_store_search,
]
{
"sku": "6KS25W005", ✅ Mã sản phẩm
"color_code": "SE008", ✅ Mã màu
"name": "Chân váy nữ dáng A",
"color": "Be/ Beige", ✅ Tên màu
"price": 399000,
"sale_price": 279300,
"url": "https://canifa.com/chan-vay-nu-6ks25w005?color=SE008&utm_source=chatbot&utm_medium=rsa&utm_campaign=ver1",
"thumbnail_image_url": "https://..."
}
\ No newline at end of file
This diff is collapsed.
...@@ -12,7 +12,7 @@ logger = logging.getLogger(__name__) ...@@ -12,7 +12,7 @@ logger = logging.getLogger(__name__)
class PromotionInput(BaseModel): class PromotionInput(BaseModel):
check_date: str | None = Field( check_date: str | None = Field(
..., default=None,
description="Ngày cần kiểm tra khuyến mãi (định dạng YYYY-MM-DD). Nếu muốn kiểm tra ngày hiện tại thì truyền null." description="Ngày cần kiểm tra khuyến mãi (định dạng YYYY-MM-DD). Nếu muốn kiểm tra ngày hiện tại thì truyền null."
) )
model_config = {"extra": "forbid"} model_config = {"extra": "forbid"}
...@@ -22,19 +22,12 @@ class PromotionInput(BaseModel): ...@@ -22,19 +22,12 @@ class PromotionInput(BaseModel):
async def canifa_get_promotions(check_date: str = None) -> str: async def canifa_get_promotions(check_date: str = None) -> str:
""" """
Tra cứu danh sách các chương trình khuyến mãi (CTKM) đang diễn ra theo ngày. Tra cứu danh sách các chương trình khuyến mãi (CTKM) đang diễn ra theo ngày.
Sử dụng tool này khi khách hàng hỏi về:
- "Hôm nay có khuyến mãi gì không?"
- "Đang có chương trình gì hot?"
- "Ngày mai có giảm giá không?"
- "Danh sách mã giảm giá hiện tại."
Trả về: Tên chương trình, mô tả chi tiết, thời gian áp dụng.
""" """
target_date = check_date target_date = check_date
if not target_date: if not target_date:
target_date = datetime.now().strftime("%Y-%m-%d") target_date = datetime.now().strftime("%Y-%m-%d")
logger.info(f"🎁 [Promotion Search] Checking for date: {target_date}") logger.info(f"🎁 [Promotion Search] Checking for date: {target_date} (StarRocks)")
try: try:
sql = f""" sql = f"""
...@@ -64,9 +57,8 @@ async def canifa_get_promotions(check_date: str = None) -> str: ...@@ -64,9 +57,8 @@ async def canifa_get_promotions(check_date: str = None) -> str:
results = await sr.execute_query_async(sql) results = await sr.execute_query_async(sql)
if not results: if not results:
return f"Hiện tại (ngày {target_date}) không có chương trình khuyến mãi nào đang diễn ra trên hệ thống." return f"Hiện tại (ngày {target_date}) không có chương trình khuyến mãi nào đang diễn ra."
# Map channel values to Vietnamese labels
channel_labels = { channel_labels = {
"only_online": "Online (Web/App)", "only_online": "Online (Web/App)",
"both": "Online + Cửa hàng", "both": "Online + Cửa hàng",
...@@ -83,8 +75,6 @@ async def canifa_get_promotions(check_date: str = None) -> str: ...@@ -83,8 +75,6 @@ async def canifa_get_promotions(check_date: str = None) -> str:
t_date = res.get("to_date", "") t_date = res.get("to_date", "")
channel = res.get("applied_channel", "unknown") channel = res.get("applied_channel", "unknown")
channel_label = channel_labels.get(channel, channel) channel_label = channel_labels.get(channel, channel)
# Use description_full if available and longer, else description
content = desc_full if desc_full and len(str(desc_full)) > len(str(desc)) else desc content = desc_full if desc_full and len(str(desc_full)) > len(str(desc)) else desc
lines.append( lines.append(
...@@ -95,21 +85,10 @@ async def canifa_get_promotions(check_date: str = None) -> str: ...@@ -95,21 +85,10 @@ async def canifa_get_promotions(check_date: str = None) -> str:
f"Thời gian: {f_date} đến {t_date}\n" f"Thời gian: {f_date} đến {t_date}\n"
) )
lines.append(
"LƯU Ý QUAN TRỌNG VỀ KÊNH ÁP DỤNG:\n"
"- Mặc định: ƯU TIÊN trình bày các CTKM 'Online (Web/App)' và 'Online + Cửa hàng' TRƯỚC.\n"
"- Chỉ trình bày CTKM 'Chỉ tại cửa hàng' khi khách hỏi CỤ THỂ về ưu đãi tại cửa hàng/offline.\n"
"- Trình bày NỘI DUNG ưu đãi chi tiết cho khách. "
"Nếu nội dung chỉ ghi địa điểm áp dụng mà KHÔNG ghi cụ thể giảm bao nhiêu %, "
"hãy trình bày tên chương trình + thời gian + nội dung có sẵn, "
"rồi hướng dẫn khách liên hệ hotline 1800 6061 hoặc vào canifa.com để xem chi tiết ưu đãi."
)
return "\n".join(lines) return "\n".join(lines)
except Exception as e: except Exception as e:
logger.error(f"❌ Error in canifa_get_promotions: {e}") logger.error(f"❌ Error in canifa_get_promotions: {e}")
return "Xin lỗi, tôi không thể lấy danh sách khuyến mãi lúc này." return "Xin lỗi, tôi không thể lấy danh sách khuyến mãi lúc này."
# Load dynamic docstring
canifa_get_promotions.__doc__ = read_tool_prompt("promotion_canifa_tool") or canifa_get_promotions.__doc__ canifa_get_promotions.__doc__ = read_tool_prompt("promotion_canifa_tool") or canifa_get_promotions.__doc__
This diff is collapsed.
...@@ -20,28 +20,23 @@ class StoreSearchInput(BaseModel): ...@@ -20,28 +20,23 @@ class StoreSearchInput(BaseModel):
async def canifa_store_search(location: str) -> str: async def canifa_store_search(location: str) -> str:
""" """
Tìm kiếm cửa hàng CANIFA theo địa điểm/khu vực. Tìm kiếm cửa hàng CANIFA theo địa điểm/khu vực.
Sử dụng khi khách hàng hỏi về cửa hàng tại một khu vực cụ thể.
""" """
logger.info(f"🏪 [Store Search] Location: {location}") logger.info(f"🏪 [Store Search] Location: {location} (StarRocks)")
try: try:
sr = get_db_connection() sr = get_db_connection()
# Clean location: bỏ prefix generic
clean = location.lower().strip() clean = location.lower().strip()
for prefix in ["quận ", "huyện ", "tỉnh ", "thành phố ", "tp. ", "tp "]: for prefix in ["quận ", "huyện ", "tỉnh ", "thành phố ", "tp. ", "tp "]:
clean = clean.replace(prefix, "") clean = clean.replace(prefix, "")
clean = clean.strip() clean = clean.strip()
# Tách thành tokens, deduplicate (giữ thứ tự)
# VD: "hà đông, hà nội" → ["hà", "đông", "nội"]
tokens = list(dict.fromkeys( tokens = list(dict.fromkeys(
t for t in clean.replace(',', ' ').split() if t.strip() t for t in clean.replace(',', ' ').split() if t.strip()
)) ))
if not tokens: if not tokens:
return "Vui lòng cho em biết khu vực bạn muốn tìm cửa hàng CANIFA (ví dụ: Hoàng Mai, Cầu Giấy, Đà Nẵng...)." return "Vui lòng cho em biết khu vực bạn muốn tìm cửa hàng CANIFA."
# Search trên concat tất cả cột địa chỉ
text_col = "LOWER(concat_ws(' ', store_name, address, city, state))" text_col = "LOWER(concat_ws(' ', store_name, address, city, state))"
def _build_sql(where_clause: str) -> str: def _build_sql(where_clause: str) -> str:
...@@ -54,39 +49,12 @@ async def canifa_store_search(location: str) -> str: ...@@ -54,39 +49,12 @@ async def canifa_store_search(location: str) -> str:
LIMIT 20 LIMIT 20
""" """
# ═══════════════════════════════════════════════
# Step 1: AND tất cả tokens (strict match)
# "hà đông hà nội" → tokens ["hà","đông","nội"] → AND → 5 stores ✓
# ═══════════════════════════════════════════════
and_conds = [f"{text_col} LIKE '%{tk}%'" for tk in tokens] and_conds = [f"{text_col} LIKE '%{tk}%'" for tk in tokens]
results = await sr.execute_query_async(_build_sql(' AND '.join(and_conds))) results = await sr.execute_query_async(_build_sql(' AND '.join(and_conds)))
# ═══════════════════════════════════════════════
# Step 2: Fallback — Reverse LIKE
# Dùng chính DB làm từ điển địa danh:
# Kiểm tra tên quận/huyện/tỉnh nào trong DB XUẤT HIỆN trong input user
# "hà đông cầu giấy" chứa "hà đông" (city) + "cầu giấy" (city) → lấy CẢ 2
# ═══════════════════════════════════════════════
if not results and len(tokens) >= 2:
# Strip prefix khỏi city: "Quận Hà Đông" → "hà đông"
city_stripped = """LOWER(TRIM(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(
city, 'Quận ', ''), 'Huyện ', ''), 'Thành phố ', ''), 'Thị xã ', ''), 'TP. ', '')))"""
state_lower = "LOWER(TRIM(state))"
fallback_where = f"""
(LOCATE({city_stripped}, '{clean}') > 0 AND LENGTH({city_stripped}) > 1)
OR
(LOCATE({state_lower}, '{clean}') > 0 AND LENGTH({state_lower}) > 1)
"""
results = await sr.execute_query_async(_build_sql(fallback_where))
logger.info(f"📊 Store search: reverse-LIKE fallback for '{clean}'")
logger.info(f"📊 Store search: {len(results)} stores found for '{location}'")
if not results: if not results:
return f"Không tìm thấy cửa hàng CANIFA tại khu vực '{location}'. Khách hàng có thể liên hệ hotline 1800 6061 để được hỗ trợ tìm cửa hàng gần nhất." return f"Không tìm thấy cửa hàng CANIFA tại khu vực '{location}'."
# Format kết quả rõ ràng cho LLM
lines = [] lines = []
for r in results: for r in results:
name = r.get("store_name", "") name = r.get("store_name", "")
...@@ -98,22 +66,11 @@ async def canifa_store_search(location: str) -> str: ...@@ -98,22 +66,11 @@ async def canifa_store_search(location: str) -> str:
t_open = r.get("time_open_today", "") t_open = r.get("time_open_today", "")
t_close = r.get("time_close_today", "") t_close = r.get("time_close_today", "")
store_info = f"🏪 {name}\n" store_info = f"🏪 {name}\n 📍 Địa chỉ: {addr}, {city}, {state}\n 📞 ĐT: {phone}\n 🕐 Lịch: {schedule} ({t_open}-{t_close})"
store_info += f" 📍 Địa chỉ: {addr}"
if city:
store_info += f", {city}"
if state:
store_info += f", {state}"
store_info += f"\n 📞 ĐT: {phone}"
store_info += f"\n 🕐 Lịch: {schedule}"
if t_open and t_close:
store_info += f" (Hôm nay: {t_open}-{t_close})"
lines.append(store_info) lines.append(store_info)
return f"Tìm thấy {len(results)} cửa hàng CANIFA tại khu vực '{location}':\n\n" + "\n\n".join(lines) return f"Tìm thấy {len(results)} cửa hàng CANIFA tại '{location}':\n\n" + "\n\n".join(lines)
except Exception as e: except Exception as e:
logger.error(f"❌ Error in canifa_store_search: {e}") logger.error(f"❌ Error in canifa_store_search: {e}")
return "Tôi đang gặp khó khăn khi tìm kiếm cửa hàng. Bạn có thể liên hệ hotline 1800 6061 để được hỗ trợ." return "Tôi đang gặp khó khăn khi tìm kiếm cửa hàng."
from .search_engine import SearchEngine
from .db_connector import DBConnector
from .stock_provider import enrich_with_stock
from .product_mapping import resolve_product_line, get_related_lines
from .pattern_detector import HardPatternDetector
from .size_message_builder import build_size_message
__all__ = [
"SearchEngine",
"DBConnector",
"enrich_with_stock",
"resolve_product_line",
"get_related_lines",
"HardPatternDetector",
"build_size_message"
]
import logging
import re
import sqlite3
import aiosqlite
from typing import Optional
from common.starrocks_connection import get_db_connection
from common.sqlite_db import sqlite_db
from common.constants import SQLITE_DB_PATH
logger = logging.getLogger(__name__)
def translate_query(query: str) -> str:
"""
Dịch câu SQL đặc thù của StarRocks sang dạng mà SQLite hiểu được.
Mượn logic từ sqlite_mock.py của bro.
"""
# 1. Thay thế Placeholder (%s -> ?)
q = re.sub(r'(?<!%)%s(?!%)', '?', query)
# 2. Thay thế cấu trúc của StarRocks (table name)
# test_db.magento_... → sr__test_db__magento_...
q = re.sub(
r'(?:test_db\.)?`?magento_product_dimension_with_text_embedding`?',
r'sr__test_db__magento_product_dimension_with_text_embedding',
q
)
# 3. Vá các ngữ pháp (Dialect)
q = re.sub(r'ANY_VALUE\s*\(', r'MAX(', q, flags=re.IGNORECASE)
q = re.sub(r'\bIF\s*\(', r'IIF(', q, flags=re.IGNORECASE)
q = re.sub(r'NULLS\s+LAST', '', q, flags=re.IGNORECASE)
q = re.sub(r'\bILIKE\b', 'LIKE', q, flags=re.IGNORECASE)
q = re.sub(r'\bTRUE\b', '1', q, flags=re.IGNORECASE)
q = re.sub(r'\bFALSE\b', '0', q, flags=re.IGNORECASE)
# 4. FIND_IN_SET(%s, REPLACE(col, '|', ',')) > 0 → INSTR fallback
# SQLite không hỗ trợ FIND_IN_SET, thay bằng INSTR
q = re.sub(
r"FIND_IN_SET\s*\(\s*(\?)\s*,\s*REPLACE\s*\(\s*(\w+)\s*,\s*'[|]'\s*,\s*','\s*\)\s*\)\s*>\s*0",
r"('|' || \2 || '|') LIKE ('%|' || \1 || '|%')",
q, flags=re.IGNORECASE
)
logger.debug(f"📝 [SQLite Translated] {q[:200]}")
return q
class DBConnector:
"""
Bộ điều phối DB: StarRocks vs SQLite.
"""
def __init__(self, use_sqlite: bool = False):
self.use_sqlite = use_sqlite
async def execute_query(self, query: str, params: tuple = ()) -> list[dict]:
if self.use_sqlite:
t_query = translate_query(query)
logger.info(f"📁 [DBConnector] Using SQLite Mock | Query: {t_query[:100]}...")
try:
return await sqlite_db.fetch_all(t_query, params)
except Exception as e:
logger.error(f"❌ [DBConnector] SQLite Error: {e}")
return []
else:
logger.info(f"🚀 [DBConnector] Using StarRocks Production")
db = get_db_connection()
if not db:
logger.error("❌ [DBConnector] StarRocks Connection Failed!")
return []
try:
return await db.execute_query_async(query, params)
except Exception as e:
logger.error(f"❌ [DBConnector] StarRocks Error: {e}. Falling back to SQLite...")
# Tự động cứu bồ
t_query = translate_query(query)
return await sqlite_db.fetch_all(t_query, params)
import re
from typing import Optional, Any
from dataclasses import dataclass
@dataclass
class PatternMatch:
pattern_type: str # "cheaper_than", "similar_to", "check_size", ...
reference_sku: Optional[str] = None
reference_product_line: Optional[str] = None
reference_price: Optional[float] = None
reference_color: Optional[str] = None
reference_size: Optional[str] = None
target_price_max: Optional[float] = None
target_price_min: Optional[float] = None
target_size: Optional[str] = None
target_color: Optional[str] = None
confidence: float = 0.0
raw_match: str = ""
class HardPatternDetector:
CHEAPER_PATTERNS = [r"(?:rẻ|giá)\s+(?:hơn|thấp hơn|xuống hơn)", r"có.*rẻ.*hơn"]
EXPENSIVE_PATTERNS = [r"(?:đắt|giá)\s+(?:hơn|cao hơn)", r"có.*đắt.*hơn"]
SIMILAR_PATTERNS = [r"(?:giống|tương tự|như|cùng)\s+(?:như|với)?\s+(?:cái|chiếc)?\s*(?:đó|trước|ban\s*nãy|cạnh|là)?"]
SIZE_CHECK_PATTERNS = [r"(?:còn|có)\s+size\s+([A-Z0-9]+)"]
COLOR_CHECK_PATTERNS = [r"(?:màu\s+)?(\w+)\s+(?:còn|có)"]
STOCK_CHECK_PATTERNS = [r"(?:còn|có)\s+(?:hàng|kho)\s+(?:không|ko|kh)"]
SKU_PATTERN = r"\b([A-Z0-9]{4,}-[A-Z0-9]{3,}(?:-[A-Z0-9]{3,})?)\b"
def detect(self, query: str, context_products: list[dict] = None) -> PatternMatch:
query_lower = query.lower().strip()
match = PatternMatch(pattern_type="unknown")
if any(re.search(p, query_lower) for p in self.CHEAPER_PATTERNS):
match.pattern_type = "cheaper_than"
match.confidence = 0.9
elif any(re.search(p, query_lower) for p in self.EXPENSIVE_PATTERNS):
match.pattern_type = "more_expensive_than"
match.confidence = 0.9
elif any(re.search(p, query_lower) for p in self.SIMILAR_PATTERNS):
match.pattern_type = "similar_to"
match.confidence = 0.8
elif sm := re.search(self.SIZE_CHECK_PATTERNS[0], query_lower):
match.pattern_type = "check_size"
match.target_size = sm.group(1).upper()
# Populate reference info if available
if context_products and match.pattern_type != "unknown":
last_p = context_products[-1]
match.reference_sku = last_p.get("code") or last_p.get("magento_ref_code")
match.reference_price = float(last_p.get("price") or 0)
match.reference_product_line = last_p.get("product_line_vn")
if match.pattern_type == "cheaper_than":
match.target_price_max = match.reference_price * 0.95
elif match.pattern_type == "more_expensive_than":
match.target_price_min = match.reference_price * 1.05
return match
"""
Product Line Mapping - Đồng bộ từ Lead Stage
"""
PRODUCT_LINE_MAP: dict[str, list[str]] = {
"Áo Sơ mi": ["áo sơ mi", "áo công sở", "áo đi làm", "sơ mi", "sơmi", "áo sơmi"],
"Áo Polo": ["áo polo", "áo cổ bẻ", "polo"],
"Áo phông": ["áo phông", "áo thun", "áo thun ngắn tay", "áo cổ v", "áo cổ tym", "áo cộc tay"],
"Áo nỉ có mũ": ["áo nỉ có mũ", "áo hoodie", "hoodie"],
"Áo nỉ": ["áo nỉ", "áo sweater", "sweater"],
"Áo mặc nhà": ["áo mặc nhà", "áo ngủ", "áo ở nhà"],
"Áo lót": ["áo lót", "áo ngực", "áo quây", "áo lót nữ", "áo lót nam", "áo lót trẻ em"],
"Áo len gilet": ["áo len gilet", "áo gile len"],
"Áo len": ["áo len", "áo len dài tay"],
"Áo kiểu": ["áo kiểu", "áo điệu", "áo nữ tính"],
"Áo khoác sợi": ["áo khoác sợi"],
"Áo khoác nỉ không mũ": ["áo khoác nỉ không mũ", "áo khoác sweater"],
"Áo khoác nỉ có mũ": ["áo khoác nỉ có mũ", "áo khoác hoodie", "áo khoác nỉ"],
"Áo khoác lông vũ": ["áo khoác lông vũ", "áo phao lông vũ", "áo lông vũ"],
"Áo khoác gió": ["áo khoác gió", "áo gió", "áo khoác mỏng"],
"Áo khoác gilet chần bông": ["áo khoác gilet chần bông", "áo khoác gilet trần bông", "áo gilet chần bông", "áo gilet trần bông"],
"Áo khoác gilet": ["áo khoác gilet", "áo gile", "gile"],
"Áo khoác dạ": ["áo khoác dạ", "áo dạ"],
"Áo khoác dáng ngắn": ["áo khoác dáng ngắn", "áo khoác croptop"],
"Áo khoác chống nắng": ["áo khoác chống nắng", "áo chống nắng"],
"Áo khoác chần bông": ["áo khoác chần bông", "áo khoác trần bông", "áo chần bông", "áo trần bông", "áo phao"],
"Áo khoác": ["áo khoác", "áo ấm", "áo rét"],
"Áo giữ nhiệt": ["áo giữ nhiệt", "áo tản nhiệt", "áo heattech"],
"Áo bra active": ["áo bra active", "áo bra", "bra", "áo tập", "áo thể thao"],
"Áo Body": ["áo body", "áo croptop", "croptop", "baby tee", "áo lửng", "áo dáng ngắn", "áo ôm"],
"Áo ba lỗ": ["áo ba lỗ", "áo sát nách", "tanktop", "tank top", "áo dây", "áo 2 dây", "áo hai dây"],
"Váy liền": ["váy liền", "đầm", "váy công sở", "đầm công sở", "váy liền thân", "đầm suông", "váy dài", "váy body"],
"Chân váy": ["chân váy", "váy maxi", "váy midi", "chân váy dài", "chân váy chữ a", "chân váy công sở", "váy ngắn"],
"Quần giả váy": ["quần giả váy", "quần váy", "skort"],
"Quần soóc": ["quần soóc", "quần đùi", "quần short", "quần lửng", "quần ngố", "short", "quần đùi nam", "quần đùi nữ"],
"Quần nỉ": ["quần nỉ", "quần jogger", "quần ống bo chun", "jogger", "quần thể thao"],
"Quần mặc nhà": ["quần mặc nhà", "quần ngủ", "quần đùi mặc nhà"],
"Quần lót đùi": ["quần lót đùi", "quần sịp đùi", "quần boxer", "boxer", "sịp đùi", "quần xì đùi"],
"Quần lót tam giác": ["quần lót tam giác", "quần sịp tam giác", "quần brief", "brief", "sịp tam giác", "quần xì tam giác"],
"Quần lót": ["quần lót", "quần chip", "quần sịp", "quần trong", "quần nhỏ", "quần xơ lít", "quần xì", "sịp", "chip", "đồ lót"],
"Quần leggings mặc nhà": ["quần leggings mặc nhà", "quần legging mặc nhà"],
"Quần leggings": ["quần leggings", "leggings", "quần legging", "legging", "quần thun ôm"],
"Quần Khaki": ["quần khaki", "quần âu", "quần vải", "quần tây", "quần công sở", "quần đi làm", "quần âu nam", "quần âu nữ", "quần kaki"],
"Quần jean": ["quần jean", "quần bò", "quần jeans", "denim", "jeans", "bò", "jean", "quần dzin"],
"Quần giữ nhiệt": ["quần giữ nhiệt", "quần heattech"],
"Quần dài": ["quần dài", "quần suông", "quần ống rộng", "quần ống suông", "quần lưng thun"],
"Quần culottes": ["quần culottes", "culottes", "quần lửng ống rộng"],
"Quần Body": ["quần body", "quần ôm"],
"Pyjama": ["pyjama", "pajama", "đồ pijama"],
"Mũ": ["mũ", "nón", "phụ kiện Canifa"],
"Tất": ["tất", "vớ", "bao chân", "vớ chân", "tất chân"],
"Quần tất": ["quần tất", "quần vớ", "tất quần"],
"Mũ thể thao": ["mũ thể thao", "mũ snapback", "mũ lưỡi trai", "cap"],
"Khẩu trang": ["khẩu trang", "mask", "mặt nạ vải"],
"Túi xách": ["túi xách", "túi"],
}
SYNONYM_TO_DB: dict[str, str] = {}
for db_value, synonyms in PRODUCT_LINE_MAP.items():
for syn in synonyms:
SYNONYM_TO_DB[syn.lower()] = db_value
RELATED_LINES: dict[str, list[str]] = {
"Áo bra active": ["Áo lót"],
"Áo lót": ["Áo bra active"],
"Quần lót": ["Quần lót đùi", "Quần lót tam giác"],
"Quần lót đùi": ["Quần lót", "Quần lót tam giác"],
"Quần lót tam giác": ["Quần lót", "Quần lót đùi"],
}
def get_related_lines(product_line: str) -> list[str]:
return [product_line] + RELATED_LINES.get(product_line, [])
def resolve_product_line(raw_value: str) -> list[str]:
parts = [p.strip() for p in raw_value.split("/") if p.strip()]
resolved = []
for part in parts:
mapped = SYNONYM_TO_DB.get(part.lower())
if mapped:
resolved.append(mapped)
else:
resolved.append(part)
return resolved
This diff is collapsed.
import logging
logger = logging.getLogger(__name__)
SIZE_MAPPING = {
"NU": {
"XS": "Cao 1m47-1m53, Nặng 38-43kg",
"S": "Cao 1m50-1m55, Nặng 41-46kg",
"M": "Cao 1m55-1m63, Nặng 47-52kg",
"L": "Cao 1m60-1m65, Nặng 53-58kg",
"XL": "Cao 1m62-1m66, Nặng 59-64kg",
},
"NAM": {
"S": "Cao 1m62-1m68, Nặng 57-62kg",
"M": "Cao 1m69-1m73, Nặng 63-67kg",
"L": "Cao 1m71-1m75, Nặng 68-72kg",
"XL": "Cao 1m73-1m77, Nặng 73-77kg",
"XXL": "Cao 1m75-1m79, Nặng 78-82kg",
},
"QUAN_NU": {
"26": "Vòng eo 65cm, Vòng mông 79-87cm",
"27": "Vòng eo 67.5cm, Vòng mông 81-89cm",
"28": "Vòng eo 70cm, Vòng mông 84-92cm",
"29": "Vòng eo 72.5cm, Vòng mông 86-94cm",
"30": "Vòng eo 75cm, Vòng mông 89-97cm",
},
"QUAN_NAM": {
"29": "Vòng eo 79.5cm, Vòng mông 96.5cm",
"30": "Vòng eo 82cm, Vòng mông 99cm",
"31": "Vòng eo 84.5cm, Vòng mông 101.5cm",
"32": "Vòng eo 87cm, Vòng mông 104cm",
"33": "Vòng eo 89cm, Vòng mông 106.5cm",
},
"TRE_EM": {
"90": "Dành cho bé 2Y, Cao 90cm, 10-13kg",
"100": "Dành cho bé 3-4Y, Cao 100cm, 14-17kg",
"110": "Dành cho bé 4-5Y, Cao 107-113cm, 18-23kg",
"120": "Dành cho bé 6-7Y, Cao 120cm, 24-29kg",
"130": "Dành cho bé 8Y, Cao 130cm, 29-33kg",
"140": "Dành cho bé 9-11Y, Cao 137-145cm, 33-39kg",
"150": "Dành cho bé 11-12Y, Cao 150cm, 39-45kg",
"160": "Dành cho bé 13-14Y, Cao 160cm, 45-52kg",
},
"UNISEX": {
"XXS": "Cao 1m55-1m63, Nặng 47-52kg",
"XS": "Cao 1m60-1m65, Nặng 53-58kg",
"S": "Cao 1m62-1m68, Nặng 57-62kg",
"M": "Cao 1m69-1m73, Nặng 63-67kg",
"L": "Cao 1m71-1m75, Nặng 68-72kg",
"XL": "Cao 1m73-1m77, Nặng 73-77kg",
"XXL": "Cao 1m75-1m79, Nặng 79-82kg",
}
}
def determine_table_key(gender: str, product_line: str) -> str:
gender = (gender or "").lower().strip()
product_line = (product_line or "").lower().strip()
is_jeans_or_khaki = any(x in product_line for x in ["jean", "khaki", "kaki", "quần âu", "quần tây"])
is_bottom = "quần" in product_line
if gender in ["boy", "girl", "kid", "bé trai", "bé gái", "trẻ em"]: return "TRE_EM"
if gender == "unisex": return "UNISEX"
if is_bottom and is_jeans_or_khaki:
if gender in ["women", "nu", "nữ", "female"]: return "QUAN_NU"
elif gender in ["men", "nam", "male"]: return "QUAN_NAM"
if gender in ["women", "nu", "nữ", "female"]: return "NU"
elif gender in ["men", "nam", "male"]: return "NAM"
return ""
def build_size_message(gender: str, product_line: str, sizes: list[str], description: str = "") -> str:
if not sizes: return ""
table_key = determine_table_key(gender, product_line)
if not table_key: return ""
mapping = SIZE_MAPPING.get(table_key, {})
size_lines = [f"Size {s.strip().upper()} ({mapping[s.strip().upper()]})" for s in sizes if str(s).strip().upper() in mapping]
if not size_lines: return ""
obj_str = {"NAM":"Sản phẩm Nam", "NU":"Sản phẩm Nữ", "TRE_EM":"Sản phẩm Trẻ em", "UNISEX":"Sản phẩm Unisex", "QUAN_NU":"Quần Nữ", "QUAN_NAM":"Quần Nam"}.get(table_key, "Sản phẩm")
base_msg = f"{obj_str}. Các size ĐANG CÒN HÀNG: {'; '.join(size_lines)}."
desc_lower = (description or "").lower()
if any(x in desc_lower for x in ["dáng ôm", "slim fit", "body"]):
base_msg += "\n\n🔥 Mẹo: Form ôm, nếu thích thoải mái nên nhích lên 1 size."
elif any(x in desc_lower for x in ["oversize", "rộng rãi"]):
base_msg += "\n\n🔥 Mẹo: Form rộng, nên lấy đúng size để lên dáng thụng đẹp."
return base_msg
import time
import httpx
import logging
logger = logging.getLogger(__name__)
CANIFA_STOCK_API = "https://canifa.com/v1/middleware/stock_get_stock_list_parent"
_STOCK_TIMEOUT = 2.0 # seconds
async def fetch_stock_batch(base_codes: list[str]) -> dict[str, list[dict]]:
"""
Gọi API Canifa để lấy thông tin tồn kho cho một danh sách SKU.
"""
if not base_codes:
return {}
sku_string = ",".join(base_codes)
url = f"{CANIFA_STOCK_API}?skus={sku_string}"
t0 = time.time()
try:
async with httpx.AsyncClient(timeout=_STOCK_TIMEOUT) as client:
resp = await client.get(url)
resp.raise_for_status()
data = resp.json()
elapsed = round((time.time() - t0) * 1000)
stock_map: dict[str, list[dict]] = {}
results = data.get("result", [])
if isinstance(results, list):
for item in results:
sku_full = item.get("sku", "")
qty = item.get("qty", 0) or 0
if not sku_full:
continue
# SKU thường có định dạng BASE-COLOR-SIZE
parts = sku_full.rsplit("-", 1)
if len(parts) == 2:
color_code = parts[0]
size = parts[1]
status = "còn hàng" if qty > 0 else "hết hàng"
stock_map.setdefault(color_code, []).append({
"size": size, "qty": qty, "status": status
})
logger.info(f"📦 [StockAPI] {len(base_codes)} SKUs -> {len(stock_map)} colors in {elapsed}ms")
return stock_map
except Exception as e:
logger.warning(f"⚠️ [StockAPI] Error or Timeout: {e}")
return {}
async def enrich_with_stock(products: list[dict]) -> list[dict]:
"""
Lọc danh sách sản phẩm, chỉ giữ lại những món còn hàng.
"""
if not products:
return []
# Lấy danh sách base code duy nhất (ví dụ: 8AT23S001)
base_codes = list({p.get("internal_ref_code", "") for p in products if p.get("internal_ref_code")})
if not base_codes:
return products
stock_map = await fetch_stock_batch(base_codes)
if not stock_map:
return products # Nếu API lỗi thì cứ trả về hết để AI tư vấn
in_stock = []
for p in products:
# product_color_code là mã màu cụ thể (ví dụ: 8AT23S001-SB001)
color_code = p.get("product_color_code", "")
if color_code in stock_map:
sizes_detail = stock_map[color_code]
total_qty = sum(s["qty"] for s in sizes_detail)
if total_qty > 0:
p["_stock_detail"] = sizes_detail
p["_total_qty"] = total_qty
in_stock.append(p)
# Nếu lọc xong mà trắng tay thì trả về 5 thằng đầu tiên để AI còn có cái mà nói
return in_stock if in_stock else products[:5]
"""
constants.py — Single Source of Truth cho toàn bộ hằng số dùng chung.
QUY TẮC:
- Hằng số viết HOA toàn bộ (UPPERCASE).
- Đặt ngay dưới phần import.
- Mọi file khác import từ đây, KHÔNG hardcode lại.
"""
import os
# ═══════════════════════════════════════════════════════════════
# DATABASE PATHS
# ═══════════════════════════════════════════════════════════════
_BACKEND_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
_PROJECT_ROOT = os.path.dirname(_BACKEND_DIR) # D:\cnf\chatbot_canifa
SQLITE_DB_PATH: str = os.path.join(_PROJECT_ROOT, "preference", "common", "database", "canifa_ai_dump.sqlite")
# ═══════════════════════════════════════════════════════════════
# SQLITE TABLE NAMES (mapping schema.table → SQLite table name)
# ═══════════════════════════════════════════════════════════════
TABLE_OUTFIT_RULES = "pg__dashboard_canifa__chatbot_fashion_rules"
TABLE_FASHION_RULES = "pg__dashboard_canifa__chatbot_fashion_rules"
TABLE_OUTFIT_MATCHES = "pg__dashboard_canifa__ai_outfit_product_matches"
TABLE_PRODUCTS = "sr__test_db__magento_product_dimension_with_text_embedding"
# ═══════════════════════════════════════════════════════════════
# GENDER MAPPING (Vietnamese / English → DB key)
# ═══════════════════════════════════════════════════════════════
SQLITE_GENDER_MAP: dict[str, str] = {
"nu": "women",
"nam": "men",
"be_gai": "girl",
"be_trai": "boy",
"unisex": "unisex",
}
# ═══════════════════════════════════════════════════════════════
# KID DETECTION KEYWORDS
# ═══════════════════════════════════════════════════════════════
KID_SAFE_KEYWORDS: list[str] = [
"kid", "trẻ em", "bé trai", "bé gái", "be_gai", "be_trai", "tre_em",
]
def normalize_gender(gender: str) -> str:
"""Chuẩn hoá chuỗi gender về key nội bộ (nu/nam/be_gai/be_trai/unisex)."""
g = (gender or "").lower().strip()
if g in ("nu", "nữ", "women", "female", "woman"):
return "nu"
if g in ("nam", "men", "male", "man"):
return "nam"
if any(k in g for k in ["bé gái", "be_gai", "girl", "be gai"]):
return "be_gai"
if any(k in g for k in ["bé trai", "be_trai", "boy", "be trai"]):
return "be_trai"
return "unisex"
...@@ -41,7 +41,7 @@ class ConversationManager: ...@@ -41,7 +41,7 @@ class ConversationManager:
"""Create the chat history table if it doesn't exist""" """Create the chat history table if it doesn't exist"""
try: try:
pool = await self._get_pool() pool = await self._get_pool()
async with pool.connection() as conn: async with pool.connection(timeout=2.0) as conn:
async with conn.cursor() as cursor: async with conn.cursor() as cursor:
# Set timezone to Vietnam for this session # Set timezone to Vietnam for this session
await cursor.execute("SET timezone = 'Asia/Ho_Chi_Minh'") await cursor.execute("SET timezone = 'Asia/Ho_Chi_Minh'")
...@@ -81,7 +81,7 @@ class ConversationManager: ...@@ -81,7 +81,7 @@ class ConversationManager:
vietnam_tz = timezone(timedelta(hours=7)) vietnam_tz = timezone(timedelta(hours=7))
timestamp = datetime.now(vietnam_tz) timestamp = datetime.now(vietnam_tz)
# Transaction block: atomic insert # Transaction block: atomic insert
async with pool.connection() as conn: async with pool.connection(timeout=2.0) as conn:
async with conn.cursor() as cursor: async with conn.cursor() as cursor:
# Set timezone to Vietnam for this session # Set timezone to Vietnam for this session
await cursor.execute("SET timezone = 'Asia/Ho_Chi_Minh'") await cursor.execute("SET timezone = 'Asia/Ho_Chi_Minh'")
...@@ -175,7 +175,7 @@ class ConversationManager: ...@@ -175,7 +175,7 @@ class ConversationManager:
final_query = sql.SQL(" ").join(query_parts) final_query = sql.SQL(" ").join(query_parts)
pool = await self._get_pool() pool = await self._get_pool()
async with pool.connection() as conn, conn.cursor() as cursor: async with pool.connection(timeout=2.0) as conn, conn.cursor() as cursor:
# Set timezone to Vietnam for this session # Set timezone to Vietnam for this session
await cursor.execute("SET timezone = 'Asia/Ho_Chi_Minh'") await cursor.execute("SET timezone = 'Asia/Ho_Chi_Minh'")
...@@ -232,7 +232,7 @@ class ConversationManager: ...@@ -232,7 +232,7 @@ class ConversationManager:
end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=999999) end_of_day = now.replace(hour=23, minute=59, second=59, microsecond=999999)
pool = await self._get_pool() pool = await self._get_pool()
async with pool.connection() as conn: async with pool.connection(timeout=2.0) as conn:
async with conn.cursor() as cursor: async with conn.cursor() as cursor:
query = sql.SQL(""" query = sql.SQL("""
UPDATE {table} UPDATE {table}
...@@ -255,7 +255,7 @@ class ConversationManager: ...@@ -255,7 +255,7 @@ class ConversationManager:
"""Clear all chat history for an identity""" """Clear all chat history for an identity"""
try: try:
pool = await self._get_pool() pool = await self._get_pool()
async with pool.connection() as conn: async with pool.connection(timeout=2.0) as conn:
async with conn.cursor() as cursor: async with conn.cursor() as cursor:
query = sql.SQL("DELETE FROM {table} WHERE identity_key = %s").format( query = sql.SQL("DELETE FROM {table} WHERE identity_key = %s").format(
table=sql.Identifier(self.table_name) table=sql.Identifier(self.table_name)
......
import logging
import os
from contextlib import asynccontextmanager
from typing import Optional
import aiosqlite
logger = logging.getLogger(__name__)
class SQLiteDBManager:
"""
Quản lý kết nối tới file SQLite database (canifa_ai_dump.sqlite).
Được thiết kế theo Singleton pattern để dùng thống nhất trong toàn bộ App FastAPI.
Đã được chuyển đổi sang aiosqlite (bất đồng bộ).
"""
_instance = None
_db_path = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(SQLiteDBManager, cls).__new__(cls)
from common.constants import SQLITE_DB_PATH
cls._instance._db_path = SQLITE_DB_PATH
return cls._instance
@property
def db_path(self) -> str:
return self._db_path
@db_path.setter
def db_path(self, path: str):
self._db_path = path
async def get_connection(self) -> aiosqlite.Connection:
"""
Khởi tạo và trả về 1 kết nối aiosqlite.
"""
if not os.path.exists(self._db_path):
logger.warning(f"[SQLite] Cảnh báo: File database không tồn tại tại {self._db_path}")
conn = await aiosqlite.connect(self._db_path)
# Row factory giúp lấy dữ liệu dưới dạng Dictionary
conn.row_factory = aiosqlite.Row
return conn
@asynccontextmanager
async def session(self):
"""
Context manager để tự động đóng Connection sau khi dùng xong.
Sử dụng:
async with sqlite_db.session() as conn:
await conn.execute(...)
"""
conn = await self.get_connection()
try:
yield conn
finally:
await conn.close()
async def fetch_all(self, query: str, params: tuple = ()) -> list[dict]:
"""Hàm tiện ích: Lọc toàn bộ danh sách, trả về list chứa các dict."""
async with self.session() as conn:
async with conn.execute(query, params) as cursor:
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def fetch_one(self, query: str, params: tuple = ()) -> Optional[dict]:
"""Hàm tiện ích: Lấy duy nhất 1 item đầu tiên, trả về dict."""
async with self.session() as conn:
async with conn.execute(query, params) as cursor:
row = await cursor.fetchone()
return dict(row) if row else None
async def execute(self, query: str, params: tuple = ()) -> int:
"""Hàm tiện ích: Chạy các truy vấn INSERT/UPDATE/DELETE và trả về số dòng bị ảnh hưởng."""
async with self.session() as conn:
async with conn.execute(query, params) as cursor:
await conn.commit()
return cursor.rowcount
# Export một Singleton instance để các file khác import vào dùng luôn
# Cách dùng: from common.sqlite_db import sqlite_db
sqlite_db = SQLiteDBManager()
"""
sqlite_mock.py — Postgres Connection Interceptor (SQLite Fallback Layer)
KIẾN TRÚC:
- Module này giả lập các Psycopg2/Psycopg3 Connection/Cursor cho môi trường dev.
- MockCursor.execute() là SYNCHRONOUS (dùng sqlite3 trực tiếp).
- Lý do KHÔNG dùng aiosqlite ở đây: UltraDescriptionDB, DescFieldConfig và các
class helper khác gọi cursor.execute() theo cách ĐỒNG BỘ (classmethods thường).
Thay đổi sang async sẽ phá vỡ >24 method trong ultra_desc_db.py và nhiều hơn nữa.
LUỒNG ĐÚng:
- async API routes (FastAPI) dùng `sqlite_db.py` (aiosqlite) TRỰc TIẾP.
- Các helper class (UltraDescriptionDB, v.v.) gọi qua `pool_wrapper.get_pooled_connection_compat()`
→ trả về MockConnection → MockCursor SYNC.
"""
import logging
import re
import sqlite3
from contextlib import contextmanager
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# DB Path (trỏ đến cùng file với sqlite_db.py)
# ---------------------------------------------------------------------------
from common.constants import SQLITE_DB_PATH
def _get_sync_conn() -> sqlite3.Connection:
"""Trả về kết nối sqlite3 ĐỒNG BỘ. Dùng cho MockCursor."""
conn = sqlite3.connect(SQLITE_DB_PATH)
conn.row_factory = sqlite3.Row
return conn
# ---------------------------------------------------------------------------
# SQL Dialect Translator: Postgres/StarRocks → SQLite
# ---------------------------------------------------------------------------
def translate_query(query: str) -> str:
"""
Dịch câu SQL đặc thù của Postgres/StarRocks sang dạng mà SQLite hiểu được.
"""
# 1. Thay thế Placeholder của Postgres (%s) thành của SQLite (?)
q = re.sub(r'(?<!%)%s(?!%)', '?', query)
# 2. Thay thế Postgres Schema (dashboard_canifa)
q = re.sub(r'"dashboard_canifa"\."([a-zA-Z0-9_]+)"', r'pg__dashboard_canifa__\1', q)
q = re.sub(r'dashboard_canifa\.([a-zA-Z0-9_]+)', r'pg__dashboard_canifa__\1', q)
# Thay thế Postgres Schema (canifa_chat)
q = re.sub(r'"canifa_chat"\."([a-zA-Z0-9_]+)"', r'pg__canifa_chat__\1', q)
q = re.sub(r'canifa_chat\.([a-zA-Z0-9_]+)', r'pg__canifa_chat__\1', q)
# Thay thế Postgres Schema (public)
q = re.sub(r'"public"\."([a-zA-Z0-9_]+)"', r'pg__public__\1', q)
q = re.sub(r'public\.([a-zA-Z0-9_]+)', r'pg__public__\1', q)
# 3. Thay thế cấu trúc của StarRocks
q = re.sub(
r'([a-zA-Z0-9_]+\.)?`?magento_product_dimension_with_text_embedding`?',
r'sr__test_db__magento_product_dimension_with_text_embedding',
q
)
# 4. Vá các ngữ pháp (Dialect) bị lệch pha
# ANY_VALUE(col) -> MAX(col)
q = re.sub(r'ANY_VALUE\s*\(', r'MAX(', q, flags=re.IGNORECASE)
# MAX_BY(col, score) -> MAX(col)
q = re.sub(r'MAX_BY\s*\(\s*([^,]+)\s*,\s*[^)]+\)', r'MAX(\1)', q, flags=re.IGNORECASE)
# IF(cond, true, false) -> IIF(cond, true, false)
q = re.sub(r'\bIF\s*\(', r'IIF(', q, flags=re.IGNORECASE)
# 5. Handle StarRocks vector similarity (dummy value)
q = re.sub(
r'approx_cosine_similarity\s*\(\s*vector\s*,\s*\[[^\]]+\]\s*\)',
r'0.5',
q, flags=re.IGNORECASE
)
# 6. Strip ORDER BY ... NULLS LAST
q = re.sub(r'NULLS\s+LAST', '', q, flags=re.IGNORECASE)
# 7. Strip PostgreSQL type casts (::jsonb, ::text, ::int, etc.)
q = re.sub(r'::[a-zA-Z_]+(?:\[\])?', '', q)
# 8. NOW() → CURRENT_TIMESTAMP
q = re.sub(r'\bNOW\s*\(\s*\)', 'CURRENT_TIMESTAMP', q, flags=re.IGNORECASE)
# 9. ILIKE → LIKE
q = re.sub(r'\bILIKE\b', 'LIKE', q, flags=re.IGNORECASE)
# 10. TRUE/FALSE literals → 1/0
q = re.sub(r'\bTRUE\b', '1', q, flags=re.IGNORECASE)
q = re.sub(r'\bFALSE\b', '0', q, flags=re.IGNORECASE)
# 11. ON CONFLICT DO NOTHING (SQLite supports this, keep as-is)
# 12. RETURNING id → SQLite không hỗ trợ RETURNING. Strip nó.
q = re.sub(r'\s+RETURNING\s+\S+', '', q, flags=re.IGNORECASE)
return q
# ---------------------------------------------------------------------------
# MockCursor — SYNCHRONOUS, dùng sqlite3 trực tiếp
# ---------------------------------------------------------------------------
class MockRow(tuple):
"""
Giả lập psycopg2 Row / aiomysql DictCursor kết hợp.
Hỗ trợ truy cập qua chỉ mục (row[0]) và qua tên cột (row['id']).
"""
def __new__(cls, cols, values):
return super().__new__(cls, values)
def __init__(self, cols, values):
self._dict = dict(zip(cols, values))
def __getitem__(self, key):
if isinstance(key, str):
return self._dict[key]
return super().__getitem__(key)
def keys(self):
return self._dict.keys()
def get(self, key, default=None):
return self._dict.get(key, default)
class MockCursor:
"""
Giả lập Psycopg2/3 Cursor, chạy ĐỒNG BỘ trên SQLite.
KHÔNG chuyển sang async để tránh phá vỡ:
- UltraDescriptionDB (24+ sync classmethods)
- DescFieldConfig (7+ sync classmethods)
- auth/models.py, agent/sql_agent/persistence.py, v.v.
"""
def __init__(self):
self._rows: list = []
self.rowcount: int = 0
self.description: list = []
self._last_insert_id: int | None = None
def execute(self, query: str, params: tuple = None):
"""Chạy truy vấn SQL — ĐỒNG BỘ, sqlite3 trực tiếp."""
if params is None:
params = ()
tq = translate_query(query)
logger.debug("[SQLITE MOCK] Query: %.120s", tq)
conn = _get_sync_conn()
try:
cur = conn.cursor()
cur.execute(tq, params)
is_select = tq.strip().upper().startswith("SELECT")
is_insert = tq.strip().upper().startswith("INSERT")
if is_select:
fetched = cur.fetchall()
if cur.description:
cols = [d[0] for d in cur.description]
self._rows = [MockRow(cols, row) for row in fetched]
self.description = [(d[0],) for d in cur.description]
else:
self._rows = []
self.rowcount = len(self._rows)
else:
conn.commit()
self.rowcount = cur.rowcount
if is_insert:
self._last_insert_id = cur.lastrowid
# Giả lập RETURNING id → trả về lastrowid dưới dạng hàng đầu tiên
self._rows = [MockRow(["id"], [cur.lastrowid])] if cur.lastrowid else []
else:
self._rows = []
except Exception as e:
logger.error("[SQLITE MOCK] Error: %s | Query: %.200s", e, tq)
self._rows = []
self.rowcount = 0
finally:
conn.close()
def executemany(self, query: str, params_list):
"""Chạy nhiều tham số — ĐỒNG BỘ."""
tq = translate_query(query)
conn = _get_sync_conn()
try:
cur = conn.cursor()
cur.executemany(tq, params_list)
conn.commit()
self.rowcount = cur.rowcount
self._rows = []
except Exception as e:
logger.error("[SQLITE MOCK] executemany error: %s", e)
finally:
conn.close()
def fetchall(self) -> list:
return self._rows
def fetchone(self) -> dict | tuple | None:
if self._rows:
return self._rows[0]
return None
def close(self):
pass
# Hỗ trợ `with cursor:` (sync context manager)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
# ---------------------------------------------------------------------------
# MockConnection
# ---------------------------------------------------------------------------
class MockConnection:
"""
Giả lập psycopg2/3 Connection.
Trả về MockCursor đồng bộ khi gọi .cursor().
"""
def cursor(self) -> MockCursor:
return MockCursor()
def commit(self):
pass # Commit được thực hiện trong MockCursor.execute()
def rollback(self):
pass
def close(self):
pass
@contextmanager
def transaction(self):
yield self
# ---------------------------------------------------------------------------
# Factory functions — dùng bởi db_pool.py và pool_wrapper.py
# ---------------------------------------------------------------------------
@contextmanager
def get_mock_pg_conn():
"""
Context manager: mock kết nối PostgreSQL → SQLite.
Dùng bởi db_pool.CanifaDbPool.get_conn() khi USE_LOCAL_SQLITE=true.
"""
logger.info("🛡️ [SQLITE MOCK] Đánh chặn Postgres connection → SQLite local")
yield MockConnection()
def get_mock_pg_conn_compat() -> MockConnection:
"""
Trực tiếp trả về MockConnection (không context manager).
Dùng bởi pool_wrapper.get_pooled_connection_compat() khi USE_LOCAL_SQLITE=true.
"""
logger.info("🛡️ [SQLITE MOCK COMPAT] Đánh chặn Postgres connection → SQLite local")
return MockConnection()
import requests
import json
import time
BASE_URL = "http://localhost:5000/api/agent/chat-dev"
def test_query(query, description):
print(f"\n{'='*60}")
print(f"TEST: {description}")
print(f"QUERY: '{query}'")
print(f"{'='*60}")
payload = {
"user_query": query,
}
try:
start_time = time.time()
response = requests.post(BASE_URL, json=payload, timeout=60)
elapsed = time.time() - start_time
if response.status_code == 200:
data = response.json()
print(f"✅ Success ({elapsed:.2f}s)")
print(f"🤖 Response: {data.get('ai_response')}")
p_ids = data.get('product_ids', [])
print(f"📦 Product IDs ({len(p_ids)}): {', '.join(p_ids[:5])}{'...' if len(p_ids)>5 else ''}")
# Check diagnostics if available (some versions return it)
if "diagnostics" in data:
for d in data["diagnostics"]:
print(f" - {d.get('label')}: {d.get('elapsed_ms')}ms")
else:
print(f"❌ Error {response.status_code}: {response.text}")
except Exception as e:
print(f"⚠️ Connection failed: {e}")
if __name__ == "__main__":
print("🚀 Starting Canifa AI Stylist API Verification...")
# 1. Test Product Search (SQLite Mock Logic)
test_query("Tìm áo phông cho bé trai", "Product Search (SQLite Mock)")
# 2. Test Promotion (StarRocks Production)
test_query("Hôm nay có khuyến mãi gì hot không?", "Promotion Search (StarRocks)")
# 3. Test Knowledge (StarRocks Production)
test_query("Chính sách đổi trả của Canifa thế nào?", "Knowledge Search (StarRocks)")
# 4. Test Store (StarRocks Production)
test_query("Tìm cửa hàng ở Cầu Giấy", "Store Search (StarRocks)")
print("\n✅ Verification sequence completed.")
...@@ -122,4 +122,5 @@ if __name__ == "__main__": ...@@ -122,4 +122,5 @@ if __name__ == "__main__":
reload=ENABLE_RELOAD, reload=ENABLE_RELOAD,
reload_dirs=reload_dirs, reload_dirs=reload_dirs,
log_level="info", log_level="info",
loop="asyncio" if platform.system() == "Windows" else "auto"
) )
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment