Commit c05cc0ce authored by Vũ Hoàng Anh's avatar Vũ Hoàng Anh

feat: refactor product data retrieval - Fix URL parsing in...

feat: refactor product data retrieval - Fix URL parsing in description_text_full - Update system_prompt to require product_ids in responses - Remove graph singleton for hot reload support - Improve product enrichment logic
parent 312e2567
......@@ -129,10 +129,10 @@ class CANIFAGraph:
_instance: list[CANIFAGraph | None] = [None]
def build_graph(config: AgentConfig | None = None, llm: BaseChatModel | None = None, tools: list | None = None) -> Any:
    """Build and compile a fresh CANIFA graph.

    A new ``CANIFAGraph`` instance is created on every call (no singleton
    caching) so that prompt/config changes are picked up during hot reload.

    Args:
        config: Optional agent configuration passed through to ``CANIFAGraph``.
        llm: Optional chat model passed through to ``CANIFAGraph``.
        tools: Optional tool list passed through to ``CANIFAGraph``.

    Returns:
        The compiled graph produced by ``CANIFAGraph.build()``.
    """
    # Always create a new instance to pick up prompt changes during hot reload.
    instance = CANIFAGraph(config, llm, tools)
    return instance.build()
def get_graph_manager(
......
......@@ -31,20 +31,31 @@ def extract_product_ids(messages: list) -> list[dict]:
# Tool result is JSON string
tool_result = json.loads(msg.content)
# Check if tool returned products
if tool_result.get("status") == "success" and "products" in tool_result:
for product in tool_result["products"]:
sku = product.get("internal_ref_code")
# Check if tool returned products (new format with "results" wrapper)
if tool_result.get("status") == "success":
# Handle both direct "products" and nested "results" format
product_list = []
if "results" in tool_result:
# New format: {"results": [{"products": [...]}]}
for result_item in tool_result["results"]:
product_list.extend(result_item.get("products", []))
elif "products" in tool_result:
# Legacy format: {"products": [...]}
product_list = tool_result["products"]
for product in product_list:
sku = product.get("sku") or product.get("internal_ref_code")
if sku and sku not in seen_skus:
seen_skus.add(sku)
# Extract full product info
# Extract full product info (already parsed by tool)
product_obj = {
"sku": sku,
"name": product.get("magento_product_name", ""),
"price": product.get("price_vnd", 0),
"sale_price": product.get("sale_price_vnd"), # null nếu không sale
"url": product.get("magento_url_key", ""),
"name": product.get("name", ""),
"price": product.get("price", 0),
"sale_price": product.get("sale_price"),
"url": product.get("url", ""),
"thumbnail_image_url": product.get("thumbnail_image_url", ""),
}
products.append(product_obj)
......@@ -55,32 +66,53 @@ def extract_product_ids(messages: list) -> list[dict]:
return products
def parse_ai_response(ai_raw_content: str, all_products: list) -> tuple[str, list]:
    """Parse the LLM output and map returned SKUs to enriched product data.

    Flow:
        - The LLM may return JSON: ``{"ai_response": "...", "product_ids": ["SKU1", ...]}``
        - ``all_products`` holds products enriched from tool messages.
        - Returned SKUs are mapped back to their enriched product dicts.

    Args:
        ai_raw_content: Raw content string from the AI response.
        all_products: Products extracted from tool messages (already enriched).

    Returns:
        tuple: ``(ai_text_response, final_products)``. Falls back to the raw
        content and the full product list when parsing or mapping fails.
    """
    ai_text_response = ai_raw_content
    final_products = all_products  # Default: return every product from the tools

    try:
        # The LLM output may be a JSON string; plain text raises and falls through.
        ai_json = json.loads(ai_raw_content)
        # Guard: json.loads can yield non-dict values (list, int, str, ...);
        # calling .get() on those would raise an uncaught AttributeError.
        if isinstance(ai_json, dict):
            ai_text_response = ai_json.get("ai_response", ai_raw_content)
            explicit_skus = ai_json.get("product_ids", [])
            if explicit_skus and isinstance(explicit_skus, list):
                # Build a SKU -> enriched-product lookup from tool results.
                product_lookup = {p["sku"]: p for p in all_products if p.get("sku")}
                mapped_products = []
                for sku in explicit_skus:
                    if isinstance(sku, str) and sku in product_lookup:
                        mapped_products.append(sku and product_lookup[sku])
                    elif isinstance(sku, dict):
                        # Legacy: the LLM may return full product dicts — keep as-is.
                        mapped_products.append(sku)
                # Only override when at least one SKU mapped; otherwise keep
                # the full enriched list.
                if mapped_products:
                    final_products = mapped_products
    except (json.JSONDecodeError, TypeError):
        pass

    return ai_text_response, final_products
def prepare_execution_context(query: str, user_id: str, history: list, images: list | None):
......
This diff is collapsed.
......@@ -200,21 +200,60 @@ async def _execute_single_search(db, item: SearchItem, query_vector: list[float]
def _format_product_results(products: list[dict]) -> list[dict]:
    """Filter and format DB results for the Agent.

    Parses ``description_text_full`` into structured fields (name, URLs) via
    ``_parse_description_text`` and returns a capped, flattened list.

    Args:
        products: Raw product rows; each may carry ``internal_ref_code``,
            prices, score fields, and a ``description_text_full`` blob.

    Returns:
        At most 15 formatted product dicts with sku/name/price/url fields.
    """
    max_items = 15  # Cap results to keep the Agent context small
    formatted: list[dict] = []
    for p in products[:max_items]:
        desc_full = p.get("description_text_full", "")
        # Extract product_name / web URL / thumbnail URL from the text blob.
        parsed = _parse_description_text(desc_full)
        formatted.append(
            {
                "sku": p.get("internal_ref_code"),
                "name": parsed.get("product_name", ""),
                # "price" mirrors original_price; both keys kept for consumers.
                "price": p.get("original_price"),
                "sale_price": p.get("sale_price"),
                "original_price": p.get("original_price"),
                "url": parsed.get("product_web_url", ""),
                "thumbnail_image_url": parsed.get("product_image_url_thumbnail", ""),
                "discount_amount": p.get("discount_amount"),
                "max_score": p.get("max_score"),
            }
        )
    return formatted
def _parse_description_text(desc: str) -> dict:
"""
Parse description_text_full thành dict các field.
Format: "product_name: X. master_color: Y. product_web_url: https://canifa.com/... ..."
"""
import re
result = {}
if not desc:
return result
# Extract product_name: từ đầu đến ". master_color:" hoặc ". product_image_url:"
name_match = re.search(r"product_name:\s*(.+?)\.(?:\s+master_color:|$)", desc)
if name_match:
result["product_name"] = name_match.group(1).strip()
# Extract product_image_url_thumbnail: từ field name đến ". product_web_url:"
thumb_match = re.search(r"product_image_url_thumbnail:\s*(https?://[^\s]+?)\.(?:\s+product_web_url:|$)", desc)
if thumb_match:
result["product_image_url_thumbnail"] = thumb_match.group(1).strip()
# Extract product_web_url: từ field name đến ". description_text:"
url_match = re.search(r"product_web_url:\s*(https?://[^\s]+?)\.(?:\s+description_text:|$)", desc)
if url_match:
result["product_web_url"] = url_match.group(1).strip()
# Extract master_color: từ field name đến ". product_image_url:"
color_match = re.search(r"master_color:\s*(.+?)\.(?:\s+product_image_url:|$)", desc)
if color_match:
result["master_color"] = color_match.group(1).strip()
return result
......@@ -176,61 +176,3 @@ async def build_starrocks_query(params, query_vector: list[float] | None = None)
"""
return sql
# ============================================================
# TEMPORARILY COMMENTED OUT - save_query_to_log
# ============================================================
# async def save_query_to_log(sql: str):
# """Lưu query full vào file hyde_pure_query.txt."""
# import os
# log_path = r"D:\cnf\chatbot_canifa\backend\logs\hyde_pure_query.txt"
# try:
# log_dir = os.path.dirname(log_path)
# if not os.path.exists(log_dir):
# os.makedirs(log_dir)
# with open(log_path, "w", encoding="utf-8") as f:
# f.write(sql)
# print(f"💾 Full Query saved to: {log_path}")
# except Exception as e:
# print(f"Save query log failed: {e}")
# ============================================================
# TEMPORARILY COMMENTED OUT - save_preview_to_log
# ============================================================
# async def save_preview_to_log(search_query: str, products: list[dict]):
# """Lưu kết quả DB trả về vào db_preview.txt (Format đẹp cho AI)."""
# import os
# preview_path = r"D:\cnf\chatbot_canifa\backend\logs\db_preview.txt"
# try:
# log_dir = os.path.dirname(preview_path)
# if not os.path.exists(log_dir):
# os.makedirs(log_dir)
#
# with open(preview_path, "a", encoding="utf-8") as f:
# f.write(f"\n{'='*60}\n")
# f.write(f"⏰ TIME: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
# f.write(f"🔍 SEARCH: {search_query}\n")
# f.write(f"📊 RESULTS COUNT: {len(products)}\n")
# f.write(f"{'-'*60}\n")
#
# if not products:
# f.write("❌ NO PRODUCTS FOUND\n")
# else:
# for idx, p in enumerate(products[:5], 1):
# code = p.get("internal_ref_code", "N/A")
# sale = p.get("sale_price", "N/A")
# orig = p.get("original_price", "N/A")
# disc = p.get("discount_amount", "0")
# score = p.get("max_score", p.get("similarity_score", "N/A"))
# desc = p.get("description_text_full", "No Description")
#
# f.write(f"{idx}. [{code}] Score: {score}\n")
# f.write(f" 💰 Price: {sale} (Orig: {orig}, Disc: {disc}%)\n")
# f.write(f" 📝 Desc: {desc}\n")
#
# f.write(f"{'='*60}\n")
# print(f"💾 DB Preview (Results) saved to: {preview_path}")
# except Exception as e:
# print(f"Save preview log failed: {e}")
......@@ -15,12 +15,9 @@ from config import (
logger = logging.getLogger(__name__)
# ====================== CACHE CONFIGURATION ======================
# Layer 1: Response Cache (Short TTL to keep stock/price safe)
DEFAULT_RESPONSE_TTL = 300 # 5 minutes, in seconds — short so stale stock/price data expires quickly
RESPONSE_KEY_PREFIX = "resp_cache:" # key namespace for cached responses (presumably Redis keys — confirm against cache client)
# Layer 2: Embedding Cache (Long TTL since vectors are static)
EMBEDDING_CACHE_TTL = 86400 # 24 hours, in seconds — embeddings of identical text do not change
EMBEDDING_KEY_PREFIX = "emb_cache:" # key namespace for cached embedding vectors
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment