Commit 6ce1554b authored by Vũ Hoàng Anh's avatar Vũ Hoàng Anh

refactor: clean up controller logs + improve stock tool prompt

parent 178108a8
This diff is collapsed.
...@@ -109,39 +109,29 @@ async def finalize_user_insight_after_stream( ...@@ -109,39 +109,29 @@ async def finalize_user_insight_after_stream(
stream_task, stream_task,
streaming_callback, streaming_callback,
identity_key: str, identity_key: str,
trace_id: str, trace_id: str = "",
) -> None: ) -> None:
""" """
Background task: Wait for stream to complete, then extract and save user_insight. Background task: Wait for stream to complete, then extract and save user_insight.
Links to main trace using trace_context for proper Langfuse tracing. NOTE: Runs outside Langfuse trace context — no need to log to Langfuse.
Args:
stream_task: The asyncio task for graph streaming
streaming_callback: ProductIDStreamingCallback instance
identity_key: User/device identifier
trace_id: Langfuse trace ID to link background task
""" """
# Clear Langfuse observation context to avoid auto-tracing in background
try: try:
from langfuse import get_client from langfuse._context import _observation_stack_context
_observation_stack_context.set([])
except Exception:
pass # Langfuse version may not have this; safe to ignore
langfuse = get_client() try:
# Link background task to same trace using trace_context
with langfuse.start_as_current_observation(
name="extract-user-insight-background", trace_context={"trace_id": trace_id}
) as span:
# Wait for stream to complete # Wait for stream to complete
await stream_task await stream_task
# Extract user insight from accumulated content # Extract user insight from accumulated content
full_content = streaming_callback.accumulated_content full_content = streaming_callback.accumulated_content
if full_content and identity_key: if full_content and identity_key:
span.update(input={"identity_key": identity_key, "content_length": len(full_content)}) await extract_and_save_user_insight(full_content, identity_key)
result = await extract_and_save_user_insight(full_content, identity_key)
span.update(output=result)
else: else:
logger.warning("⚠️ [Background] No content to extract user_insight from") logger.warning("⚠️ [Background] No content to extract user_insight from")
span.update(output={"status": "no_content"})
except Exception as exc: except Exception as exc:
logger.error(f"❌ [Background] user_insight finalize failed: {exc}") logger.error(f"❌ [Background] user_insight finalize failed: {exc}")
......
...@@ -21,7 +21,7 @@ from langgraph.types import CachePolicy ...@@ -21,7 +21,7 @@ from langgraph.types import CachePolicy
from common.llm_factory import create_llm from common.llm_factory import create_llm
from .models import AgentConfig, AgentState, get_config from .models import AgentConfig, AgentState, get_config
from .prompt import get_last_modified, get_system_prompt, get_system_prompt_template from .prompt_utils import get_system_prompt_template
from .tools.get_tools import get_all_tools, get_collection_tools from .tools.get_tools import get_all_tools, get_collection_tools
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -48,11 +48,13 @@ class CANIFAGraph: ...@@ -48,11 +48,13 @@ class CANIFAGraph:
self.retrieval_tools = self.all_tools self.retrieval_tools = self.all_tools
self.llm_with_tools = self.llm.bind_tools(self.all_tools, strict=True) self.llm_with_tools = self.llm.bind_tools(self.all_tools, strict=True)
# Lưu template với {date_str} placeholder → inject dynamic mỗi request self.cache = InMemoryCache()
self.system_prompt_template = get_system_prompt_template()
self.prompt_template = ChatPromptTemplate.from_messages( def _build_chain(self, system_prompt_template: str):
"""Build chain with dynamic system prompt (fetched from Langfuse per request)."""
prompt_template = ChatPromptTemplate.from_messages(
[ [
("system", self.system_prompt_template), ("system", system_prompt_template),
( (
"system", "system",
"===== USER INSIGHT (TỪ TURN TRƯỚC) =====\n⚡ BẮT BUỘC: Đọc [NEXT] bên dưới và THỰC HIỆN chiến lược đã lên kế hoạch!\n\n{user_insight}\n=====================================", "===== USER INSIGHT (TỪ TURN TRƯỚC) =====\n⚡ BẮT BUỘC: Đọc [NEXT] bên dưới và THỰC HIỆN chiến lược đã lên kế hoạch!\n\n{user_insight}\n=====================================",
...@@ -64,9 +66,7 @@ class CANIFAGraph: ...@@ -64,9 +66,7 @@ class CANIFAGraph:
MessagesPlaceholder(variable_name="messages"), MessagesPlaceholder(variable_name="messages"),
] ]
) )
return prompt_template | self.llm_with_tools
self.chain = self.prompt_template | self.llm_with_tools
self.cache = InMemoryCache()
async def _agent_node(self, state: AgentState, config: RunnableConfig) -> dict: async def _agent_node(self, state: AgentState, config: RunnableConfig) -> dict:
"""Agent node - Chỉ việc đổ dữ liệu riêng vào khuôn đã có sẵn.""" """Agent node - Chỉ việc đổ dữ liệu riêng vào khuôn đã có sẵn."""
...@@ -104,10 +104,12 @@ class CANIFAGraph: ...@@ -104,10 +104,12 @@ class CANIFAGraph:
or "⚠️ TRẠNG THÁI KHỞI TẠO: Chưa có User Insight từ lịch sử. Hãy bắt đầu thu thập thông tin mới (Nếu thiếu thông tin thì ghi 'Chưa rõ')." or "⚠️ TRẠNG THÁI KHỞI TẠO: Chưa có User Insight từ lịch sử. Hãy bắt đầu thu thập thông tin mới (Nếu thiếu thông tin thì ghi 'Chưa rõ')."
) )
# Inject date_str ĐỘNG mỗi request (không cache từ __init__) # Fetch prompt from Langfuse DYNAMICALLY per request (cache_ttl=0 → 30ms)
current_date_str = datetime.now().strftime("%d/%m/%Y") current_date_str = datetime.now().strftime("%d/%m/%Y")
system_prompt_template = get_system_prompt_template()
chain = self._build_chain(system_prompt_template)
response = await self.chain.ainvoke( response = await chain.ainvoke(
{ {
"date_str": current_date_str, "date_str": current_date_str,
"history": history, "history": history,
...@@ -181,34 +183,23 @@ def build_graph(config: AgentConfig | None = None, llm: BaseChatModel | None = N ...@@ -181,34 +183,23 @@ def build_graph(config: AgentConfig | None = None, llm: BaseChatModel | None = N
def get_graph_manager( def get_graph_manager(
config: AgentConfig | None = None, llm: BaseChatModel | None = None, tools: list | None = None config: AgentConfig | None = None, llm: BaseChatModel | None = None, tools: list | None = None
) -> CANIFAGraph: ) -> CANIFAGraph:
"""Get CANIFAGraph instance (Auto-rebuild if model config changes OR prompt version changed).""" """Get CANIFAGraph instance (Auto-rebuild if model config changes).
current_prompt_version = get_last_modified()
Prompt is now fetched dynamically per request from Langfuse,
so no need to rebuild graph when prompt changes.
"""
# 1. New Instance if Empty # 1. New Instance if Empty
if _instance[0] is None: if _instance[0] is None:
_instance[0] = CANIFAGraph(config, llm, tools) _instance[0] = CANIFAGraph(config, llm, tools)
_instance[0].prompt_version = current_prompt_version logger.info(f"✨ Graph Created: {_instance[0].config.model_name} (prompts from Langfuse)")
logger.info(f"✨ Graph Created: {_instance[0].config.model_name}, prompt_version={current_prompt_version}")
return _instance[0] return _instance[0]
# 2. Check for Config Changes (Model Switch OR Prompt Version Change) # 2. Check for Model Config Changes only
is_model_changed = config and config.model_name != _instance[0].config.model_name is_model_changed = config and config.model_name != _instance[0].config.model_name
is_prompt_changed = current_prompt_version != getattr(_instance[0], "prompt_version", 0)
if is_model_changed or is_prompt_changed:
change_reason = []
if is_model_changed: if is_model_changed:
change_reason.append(f"Model ({_instance[0].config.model_name}->{config.model_name})") logger.info(f"🔄 Rebuilding Graph: Model ({_instance[0].config.model_name}->{config.model_name})")
if is_prompt_changed:
change_reason.append(
f"Prompt Version ({getattr(_instance[0], 'prompt_version', 0)}->{current_prompt_version})"
)
logger.info(f"🔄 Rebuilding Graph due to: {', '.join(change_reason)}")
_instance[0] = CANIFAGraph(config, llm, tools) _instance[0] = CANIFAGraph(config, llm, tools)
_instance[0].prompt_version = current_prompt_version
return _instance[0] return _instance[0]
return _instance[0] return _instance[0]
......
"""
CiCi Fashion Consultant - System Prompt
Tư vấn thời trang CANIFA chuyên nghiệp
Version 3.4 - Simplified Summary History
Last updated: 2026-01-29 11:27
"""
import os
from datetime import datetime
PROMPT_FILE_PATH = os.path.join(os.path.dirname(__file__), "system_prompt.txt")
def get_system_prompt() -> str:
    """
    Return the CiCi Fashion Agent system prompt with today's date filled in.

    The template text comes from get_system_prompt_template(), which reads
    system_prompt.txt and falls back to a minimal built-in prompt when the
    file is missing or unreadable. Delegating keeps a single copy of the
    file-reading logic and of the fallback text, so the two functions cannot
    drift apart.

    Returns:
        str: System prompt in which every "{date_str}" placeholder has been
        replaced by the current local date formatted as DD/MM/YYYY.
    """
    date_str = datetime.now().strftime("%d/%m/%Y")
    # Plain str.replace (not str.format) so any other braces in the prompt,
    # e.g. JSON examples, are left untouched.
    return get_system_prompt_template().replace("{date_str}", date_str)
def get_system_prompt_template() -> str:
    """
    Return the system prompt template with the {date_str} placeholder intact.

    Intended for ChatPromptTemplate, which injects the date on every request.

    Returns:
        str: Contents of system_prompt.txt, or a minimal built-in template
        (still containing {date_str}) when the file is missing or unreadable.
    """
    # Default template used whenever the prompt file cannot be served.
    fallback_template = """# VAI TRÒ
Bạn là CiCi - Chuyên viên tư vấn thời trang CANIFA.
Hôm nay: {date_str}
KHÔNG BAO GIỜ BỊA ĐẶT. TRẢ LỜI NGẮN GỌN.
"""
    try:
        if not os.path.exists(PROMPT_FILE_PATH):
            return fallback_template
        with open(PROMPT_FILE_PATH, "r", encoding="utf-8") as prompt_file:
            file_text = prompt_file.read()
        return file_text
    except Exception as e:
        # Best-effort: report the problem and serve the fallback instead.
        print(f"Error reading system prompt file: {e}")
    return fallback_template
def get_last_modified() -> float:
    """Return the last-modified timestamp of system_prompt.txt.

    Returns:
        float: The file's mtime as reported by os.path.getmtime, or 0.0 when
        the file does not exist or its metadata cannot be read.
    """
    modified_at = 0.0
    try:
        if os.path.exists(PROMPT_FILE_PATH):
            modified_at = os.path.getmtime(PROMPT_FILE_PATH)
    except Exception:
        # Best-effort lookup: any failure is reported as "never modified".
        modified_at = 0.0
    return modified_at
\ No newline at end of file
"""
Prompt Utilities — ALL prompts from Langfuse.
System prompt + Tool prompts, single source of truth.
"""
import os
import logging import logging
from datetime import datetime
from langfuse import Langfuse
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Directory name for tool prompts LANGFUSE_SYSTEM_PROMPT_NAME = "canifa-stylist-system-prompt"
PROMPTS_DIR_NAME = "tool_prompts"
PROMPTS_DIR = os.path.join(os.path.dirname(__file__), PROMPTS_DIR_NAME)
def get_tool_prompt_path(filename: str) -> str: LANGFUSE_TOOL_PROMPT_MAP = {
"""Get absolute path for a tool prompt file.""" "brand_knowledge_tool": "canifa-tool-brand-knowledge",
if not filename.endswith(".txt"): "check_is_stock": "canifa-tool-check-stock",
filename += ".txt" "data_retrieval_tool": "canifa-tool-data-retrieval",
return os.path.join(PROMPTS_DIR, filename) "promotion_canifa_tool": "canifa-tool-promotion",
}
_langfuse_client: Langfuse | None = None
def _get_langfuse() -> Langfuse:
global _langfuse_client
if _langfuse_client is None:
_langfuse_client = Langfuse()
return _langfuse_client
def get_system_prompt() -> str:
"""System prompt với ngày hiện tại đã inject."""
lf = _get_langfuse()
prompt = lf.get_prompt(LANGFUSE_SYSTEM_PROMPT_NAME, label="production", cache_ttl_seconds=0)
return prompt.compile(date_str=datetime.now().strftime("%d/%m/%Y"))
def get_system_prompt_template() -> str:
"""Template chưa replace date_str — dùng cho ChatPromptTemplate."""
lf = _get_langfuse()
prompt = lf.get_prompt(LANGFUSE_SYSTEM_PROMPT_NAME, label="production", cache_ttl_seconds=0)
return prompt.prompt.replace("{{date_str}}", "{date_str}")
def read_tool_prompt(filename: str, default_prompt: str = "") -> str:
"""
Read tool prompt from file.
Returns default_prompt if file not found or empty.
"""
file_path = get_tool_prompt_path(filename)
try:
if os.path.exists(file_path):
with open(file_path, "r", encoding="utf-8") as f:
content = f.read().strip()
if content:
return content
except Exception as e:
logger.error(f"Error reading tool prompt {filename}: {e}")
def read_tool_prompt(filename: str, default_prompt: str = "") -> str:
"""Read tool prompt from Langfuse."""
name_key = filename.replace(".txt", "")
langfuse_name = LANGFUSE_TOOL_PROMPT_MAP.get(name_key)
if not langfuse_name:
logger.warning(f"⚠️ No Langfuse mapping for tool prompt '{name_key}'")
return default_prompt return default_prompt
lf = _get_langfuse()
prompt = lf.get_prompt(langfuse_name, label="production", cache_ttl_seconds=0)
return prompt.prompt
def write_tool_prompt(filename: str, content: str) -> bool: def write_tool_prompt(filename: str, content: str) -> bool:
"""Write content to tool prompt file.""" """Push tool prompt to Langfuse as new version."""
file_path = get_tool_prompt_path(filename) name_key = filename.replace(".txt", "")
try: langfuse_name = LANGFUSE_TOOL_PROMPT_MAP.get(name_key)
os.makedirs(os.path.dirname(file_path), exist_ok=True) if not langfuse_name:
with open(file_path, "w", encoding="utf-8") as f: logger.error(f"No Langfuse mapping for '{name_key}'")
f.write(content)
return True
except Exception as e:
logger.error(f"Error writing tool prompt {filename}: {e}")
return False return False
lf = _get_langfuse()
lf.create_prompt(
name=langfuse_name,
prompt=content,
labels=["production"],
tags=["canifa", "tool-prompt"],
type="text",
)
logger.info(f"✅ Tool prompt '{name_key}' pushed to Langfuse")
return True
def list_tool_prompts() -> list[str]: def list_tool_prompts() -> list[str]:
"""List all available tool prompt files.""" """List available tool prompt names."""
try: return sorted(LANGFUSE_TOOL_PROMPT_MAP.keys())
if not os.path.exists(PROMPTS_DIR):
return []
files = [f for f in os.listdir(PROMPTS_DIR) if f.endswith(".txt")]
return sorted(files)
except Exception as e:
logger.error(f"Error listing tool prompts: {e}")
return []
...@@ -2,6 +2,7 @@ Công cụ KIỂM TRA TỒN KHO sản phẩm CANIFA theo mã sản phẩm. ...@@ -2,6 +2,7 @@ Công cụ KIỂM TRA TỒN KHO sản phẩm CANIFA theo mã sản phẩm.
KHI NÀO GỌI TOOL NÀY: KHI NÀO GỌI TOOL NÀY:
- Khách hỏi "còn hàng không?", "còn size không?", "check tồn kho" - Khách hỏi "còn hàng không?", "còn size không?", "check tồn kho"
- Khách hỏi "có những size nào?", "còn những size nào?", "size chart", "số lượng từng size"
- Khách hỏi về MÃ SẢN PHẨM CỤ THỂ kèm từ khóa tồn kho (vd: "8TS24W001 còn size L không?") - Khách hỏi về MÃ SẢN PHẨM CỤ THỂ kèm từ khóa tồn kho (vd: "8TS24W001 còn size L không?")
- Khách muốn biết số lượng tồn kho của một hoặc nhiều mã sản phẩm - Khách muốn biết số lượng tồn kho của một hoặc nhiều mã sản phẩm
...@@ -30,7 +31,6 @@ CASE 3: KIỂM TRA MÃ KÈM SIZE ...@@ -30,7 +31,6 @@ CASE 3: KIỂM TRA MÃ KÈM SIZE
User: "Mã 6ST25W005-SE091 còn size M và L không?" User: "Mã 6ST25W005-SE091 còn size M và L không?"
-> Gọi check_is_stock với: -> Gọi check_is_stock với:
- skus: "6ST25W005-SE091" - skus: "6ST25W005-SE091"
- sizes: "M,L"
CASE 4: KIỂM TRA MÃ BASE (TỰ EXPAND) CASE 4: KIỂM TRA MÃ BASE (TỰ EXPAND)
User: "6ST25W005 còn màu nào và size nào?" User: "6ST25W005 còn màu nào và size nào?"
...@@ -38,8 +38,23 @@ User: "6ST25W005 còn màu nào và size nào?" ...@@ -38,8 +38,23 @@ User: "6ST25W005 còn màu nào và size nào?"
- skus: "6ST25W005" - skus: "6ST25W005"
(Tool sẽ tự động expand ra tất cả các biến thể từ DB) (Tool sẽ tự động expand ra tất cả các biến thể từ DB)
CÁCH ĐỌC KẾT QUẢ: CASE 5: HỎI CÓ NHỮNG SIZE NÀO / CÒN SIZE NÀO
User: "Áo này có những size nào?" hoặc "Sản phẩm này còn size gì?"
-> Gọi check_is_stock với mã sản phẩm (lấy từ context cuộc hội thoại)
- skus: "6ST25W005-SE091"
CÁCH ĐỌC VÀ TRÌNH BÀY KẾT QUẢ:
- stock_responses: Danh sách tồn kho từng SKU - stock_responses: Danh sách tồn kho từng SKU
- is_in_stock: true/false - còn hàng hay không - is_in_stock: true/false - còn hàng hay không
- qty: số lượng tồn kho - qty: số lượng tồn kho
- Nếu hết hàng -> Gợi ý size/màu khác còn hàng
CÁCH TRÌNH BÀY CHO KHÁCH:
1. Liệt kê RÕ RÀNG từng size kèm tình trạng:
- Size S: Còn hàng (12 sản phẩm)
- Size M: Còn hàng (8 sản phẩm)
- Size L: Hết hàng ❌
- Size XL: Còn hàng (3 sản phẩm)
2. Nếu khách hỏi "có những size nào" → Liệt kê TẤT CẢ size có trong kết quả, kèm số lượng
3. Nếu khách hỏi "còn size nào" → Chỉ liệt kê size CÒN HÀNG (qty > 0)
4. Nếu hết hàng size khách muốn → Gợi ý size/màu khác còn hàng
5. Nếu sản phẩm hoàn toàn hết hàng → Thông báo rõ và gợi ý sản phẩm tương tự
""" """
CANIFA Data Retrieval Tool - Tối giản cho Agentic Workflow. CANIFA Data Retrieval Tool - Tối giản cho Agentic Workflow.
Hỗ trợ Hybrid Search: Semantic (Vector) + Metadata Filter. Hỗ trợ Hybrid Search: Semantic (Vector) + Metadata Filter.
Tracing:
- LangChain CallbackHandler auto-traces tool call (name, input, output)
- Manual span 'db-query-search' in _execute_single_search for SQL timing & product count
(nested under parent LangChain trace via start_as_current_observation)
""" """
import asyncio import asyncio
...@@ -8,7 +13,8 @@ import json ...@@ -8,7 +13,8 @@ import json
import logging import logging
import time import time
from langfuse import observe
from langfuse import get_client as get_langfuse
from langchain_core.tools import tool from langchain_core.tools import tool
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
...@@ -103,12 +109,12 @@ class MultiSearchParams(BaseModel): ...@@ -103,12 +109,12 @@ class MultiSearchParams(BaseModel):
searches: list[SearchItem] = Field(description="Danh sách các truy vấn tìm kiếm") searches: list[SearchItem] = Field(description="Danh sách các truy vấn tìm kiếm")
@observe(name="db-query-search")
async def _execute_single_search( async def _execute_single_search(
db, item: SearchItem, query_vector: list[float] | None = None db, item: SearchItem, query_vector: list[float] | None = None
) -> tuple[list[dict], dict]: ) -> tuple[list[dict], dict]:
""" """
Thực thi một search query đơn lẻ (Async). Thực thi một search query đơn lẻ (Async).
Creates a manual 'db-query-search' span under the current Langfuse context.
Returns: Returns:
Tuple of (products, filter_info) Tuple of (products, filter_info)
...@@ -130,14 +136,15 @@ async def _execute_single_search( ...@@ -130,14 +136,15 @@ async def _execute_single_search(
logger.debug("SQL built, length=%s, build_time_ms=%.2f", len(sql), query_build_time) logger.debug("SQL built, length=%s, build_time_ms=%.2f", len(sql), query_build_time)
if not sql: if not sql:
return [] return [], {"fallback_used": False}
# Timer: execute DB query # Timer: execute DB query
db_start = time.time() db_start = time.time()
products = await db.execute_query_async(sql, params=params) products = await db.execute_query_async(sql, params=params)
db_time = (time.time() - db_start) * 1000 # Convert to ms db_time = (time.time() - db_start) * 1000 # Convert to ms
logger.info( logger.info(
"_execute_single_search done, products=%s, build_ms=%.2f, db_ms=%.2f, total_ms=%.2f", "🔍 _execute_single_search done: query=%r, products=%s, build_ms=%.2f, db_ms=%.2f, total_ms=%.2f",
short_query,
len(products), len(products),
query_build_time, query_build_time,
db_time, db_time,
...@@ -159,26 +166,44 @@ async def _execute_single_search( ...@@ -159,26 +166,44 @@ async def _execute_single_search(
(first_p.get("description_text") or "")[:100], (first_p.get("description_text") or "")[:100],
) )
from langfuse import get_client as get_langfuse # ✅ Nest 'db-query-search' span dưới chat-request trace hiện tại trên Langfuse
try: try:
lf = get_langfuse() lf = get_langfuse()
lf.update_current_observation( if lf:
with lf.start_as_current_observation(
as_type="span",
name="db-query-search",
) as span:
span.update(
input={ input={
"description": item.description[:200] if item.description else None, "description": item.description[:200] if item.description else None,
"product_name": item.product_name, "product_name": item.product_name,
"gender": item.gender_by_product, "gender": item.gender_by_product,
"age": item.age_by_product, "age": item.age_by_product,
"color": item.master_color,
"price_range": f"{item.price_min or ''}-{item.price_max or ''}",
"product_line_vn": item.product_line_vn, "product_line_vn": item.product_line_vn,
"magento_ref_code": item.magento_ref_code, "magento_ref_code": item.magento_ref_code,
"discovery_mode": item.discovery_mode,
}, },
output={ output={
"raw_count": len(products), "product_count": len(products),
"build_ms": round(query_build_time, 2), "build_ms": round(query_build_time, 2),
"db_ms": round(db_time, 2), "db_ms": round(db_time, 2),
"total_ms": round(query_build_time + db_time, 2),
"sample_products": [
{
"name": p.get("product_name", "?"),
"code": p.get("magento_ref_code", "?"),
"price": float(p["sale_price"]) if p.get("sale_price") else None,
}
for p in products[:3]
] if products else [],
}, },
metadata={"sql_length": len(sql)},
) )
except Exception: except Exception as trace_err:
pass # Don't break if Langfuse not available logger.warning("⚠️ Langfuse db-query-search span failed: %s", trace_err, exc_info=True)
return format_product_results(products), {"fallback_used": False} return format_product_results(products), {"fallback_used": False}
except Exception as e: except Exception as e:
...@@ -192,8 +217,26 @@ async def data_retrieval_tool(searches: list[SearchItem]) -> str: ...@@ -192,8 +217,26 @@ async def data_retrieval_tool(searches: list[SearchItem]) -> str:
Công cụ tìm kiếm sản phẩm CANIFA. Công cụ tìm kiếm sản phẩm CANIFA.
Hỗ trợ tìm kiếm Semantic và lọc theo Metadata. Hỗ trợ tìm kiếm Semantic và lọc theo Metadata.
""" """
tool_start = time.time()
logger.info("🔧 data_retrieval_tool called with %d items", len(searches)) logger.info("🔧 data_retrieval_tool called with %d items", len(searches))
# Log search params cho debugging
for i, s in enumerate(searches):
logger.info(
"🔧 Search[%d]: desc=%r, name=%r, gender=%s, age=%s, color=%s, "
"price=%s-%s, line=%s, code=%s, mode=%s",
i,
(s.description[:80] + "...") if s.description and len(s.description) > 80 else s.description,
s.product_name,
s.gender_by_product,
s.age_by_product,
s.master_color,
s.price_min, s.price_max,
s.product_line_vn,
s.magento_ref_code,
s.discovery_mode,
)
# Get DB Connection # Get DB Connection
db = get_db_connection() db = get_db_connection()
if not db: if not db:
...@@ -213,8 +256,6 @@ async def data_retrieval_tool(searches: list[SearchItem]) -> str: ...@@ -213,8 +256,6 @@ async def data_retrieval_tool(searches: list[SearchItem]) -> str:
if filter_info: if filter_info:
all_filter_infos.append(filter_info) all_filter_infos.append(filter_info)
# Aggregate filter info from first result for simplicity in response # Aggregate filter info from first result for simplicity in response
final_info = all_filter_infos[0] if all_filter_infos else {} final_info = all_filter_infos[0] if all_filter_infos else {}
...@@ -237,16 +278,18 @@ async def data_retrieval_tool(searches: list[SearchItem]) -> str: ...@@ -237,16 +278,18 @@ async def data_retrieval_tool(searches: list[SearchItem]) -> str:
"filter_info": final_info, "filter_info": final_info,
} }
total_ms = (time.time() - tool_start) * 1000
logger.info( logger.info(
"🎁 Final result: %d products. Fallback used: %s", "🎁 Final result: %d products, total_ms=%.2f. Fallback used: %s",
len(combined_results), len(combined_results),
total_ms,
final_info.get("fallback_used", False), final_info.get("fallback_used", False),
) )
return json.dumps(output, ensure_ascii=False, default=str) return json.dumps(output, ensure_ascii=False, default=str)
# Load dynamic docstring
# Load dynamic docstring # Load dynamic docstring
dynamic_prompt = read_tool_prompt("data_retrieval_tool") dynamic_prompt = read_tool_prompt("data_retrieval_tool")
if dynamic_prompt: if dynamic_prompt:
......
...@@ -2,7 +2,7 @@ import logging ...@@ -2,7 +2,7 @@ import logging
import os import os
import time import time
from langfuse import observe # Note: tracing handled by parent data_retrieval_tool via context manager
from common.embedding_service import create_embedding_async from common.embedding_service import create_embedding_async
...@@ -103,7 +103,6 @@ def _get_metadata_clauses(params, sql_params: list) -> list[str]: ...@@ -103,7 +103,6 @@ def _get_metadata_clauses(params, sql_params: list) -> list[str]:
return clauses return clauses
@observe(name="build-starrocks-query")
async def build_starrocks_query(params, query_vector: list[float] | None = None) -> tuple[str, list]: async def build_starrocks_query(params, query_vector: list[float] | None = None) -> tuple[str, list]:
""" """
Build SQL query với Parameterized Query để tránh SQL Injection. Build SQL query với Parameterized Query để tránh SQL Injection.
......
from fastapi import APIRouter, HTTPException, Request from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel from pydantic import BaseModel
import os
import re
from agent.graph import reset_graph
from common.cache import bump_prompt_version
router = APIRouter() from agent.prompt_utils import get_system_prompt_template, LANGFUSE_SYSTEM_PROMPT_NAME, _get_langfuse
PROMPT_FILE_PATH = os.path.join(os.path.dirname(__file__), "../agent/system_prompt.txt") router = APIRouter()
# Allowed variables in prompt (single braces OK for these)
ALLOWED_VARIABLES = {"date_str"}
class PromptUpdateRequest(BaseModel): class PromptUpdateRequest(BaseModel):
content: str content: str
def validate_prompt_braces(content: str) -> tuple[bool, list[str]]:
"""
Validate that all braces in prompt are properly escaped.
Returns (is_valid, list of problematic patterns)
"""
# Find all {word} patterns
single_brace_pattern = re.findall(r'\{([^{}]+)\}', content)
# Filter out allowed variables
problematic = [
var for var in single_brace_pattern
if var.strip() not in ALLOWED_VARIABLES and not var.startswith('{')
]
return len(problematic) == 0, problematic
from common.rate_limit import rate_limit_service from common.rate_limit import rate_limit_service
@router.get("/api/agent/system-prompt") @router.get("/api/agent/system-prompt")
async def get_system_prompt_content(request: Request): async def get_system_prompt_content(request: Request):
"""Get current system prompt content""" """Get current system prompt content from Langfuse."""
try: try:
if os.path.exists(PROMPT_FILE_PATH): return {"status": "success", "content": get_system_prompt_template()}
with open(PROMPT_FILE_PATH, "r", encoding="utf-8") as f:
content = f.read()
return {"status": "success", "content": content}
else:
return {"status": "error", "message": "Prompt file not found"}
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
...@@ -50,41 +25,24 @@ async def get_system_prompt_content(request: Request): ...@@ -50,41 +25,24 @@ async def get_system_prompt_content(request: Request):
@router.post("/api/agent/system-prompt") @router.post("/api/agent/system-prompt")
@rate_limit_service.limiter.limit("10/minute") @rate_limit_service.limiter.limit("10/minute")
async def update_system_prompt_content(request: Request, body: PromptUpdateRequest): async def update_system_prompt_content(request: Request, body: PromptUpdateRequest):
"""Update system prompt content""" """Create new system prompt version in Langfuse."""
try: try:
# Validate braces lf = _get_langfuse()
is_valid, problematic = validate_prompt_braces(body.content) content = body.content
if "{date_str}" in content and "{{date_str}}" not in content:
if not is_valid: content = content.replace("{date_str}", "{{date_str}}")
# Return warning but still allow save
warning = ( prompt = lf.create_prompt(
f"⚠️ Phát hiện {{...}} chưa escape: {problematic[:3]}... " name=LANGFUSE_SYSTEM_PROMPT_NAME,
f"Nếu đây là JSON, hãy dùng {{{{ }}}} thay vì {{ }}. " prompt=content,
f"Prompt vẫn được lưu nhưng có thể gây lỗi khi chat." labels=["production"],
tags=["canifa", "system-prompt"],
type="text",
) )
else: return {
warning = None
# 1. Update file
with open(PROMPT_FILE_PATH, "w", encoding="utf-8") as f:
f.write(body.content)
# 2. Bump prompt version in Redis (ALL workers will detect this)
new_version = await bump_prompt_version()
# 3. Reset local worker's Graph Singleton (immediate effect for this worker)
reset_graph()
response = {
"status": "success", "status": "success",
"message": f"System prompt updated. Version: {new_version}. All workers will reload on next request.", "message": f"System prompt updated. Version: {prompt.version}",
"prompt_version": new_version "prompt_version": prompt.version,
} }
if warning:
response["warning"] = warning
return response
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
...@@ -23,7 +23,7 @@ async def get_tool_prompts_list(request: Request): ...@@ -23,7 +23,7 @@ async def get_tool_prompts_list(request: Request):
@router.get("/api/agent/tool-prompts/{filename}") @router.get("/api/agent/tool-prompts/{filename}")
async def get_tool_prompt_content(filename: str, request: Request): async def get_tool_prompt_content(filename: str, request: Request):
"""Get content of a specific tool prompt file.""" """Get content of a specific tool prompt (from Langfuse, fallback to file)."""
try: try:
content = read_tool_prompt(filename) content = read_tool_prompt(filename)
if not content: if not content:
...@@ -38,7 +38,7 @@ async def get_tool_prompt_content(filename: str, request: Request): ...@@ -38,7 +38,7 @@ async def get_tool_prompt_content(filename: str, request: Request):
@router.post("/api/agent/tool-prompts/{filename}") @router.post("/api/agent/tool-prompts/{filename}")
async def update_tool_prompt_content(filename: str, request: Request, body: ToolPromptUpdateRequest): async def update_tool_prompt_content(filename: str, request: Request, body: ToolPromptUpdateRequest):
"""Update content of a tool prompt file and reset graph.""" """Update tool prompt (writes to file + syncs to Langfuse) and reset graph."""
try: try:
# Ensure filename is safe (basic check) # Ensure filename is safe (basic check)
if ".." in filename or "/" in filename or "\\" in filename: if ".." in filename or "/" in filename or "\\" in filename:
...@@ -51,7 +51,7 @@ async def update_tool_prompt_content(filename: str, request: Request, body: Tool ...@@ -51,7 +51,7 @@ async def update_tool_prompt_content(filename: str, request: Request, body: Tool
# Reset Graph to reload tools with new prompts # Reset Graph to reload tools with new prompts
reset_graph() reset_graph()
return {"status": "success", "message": f"Tool prompt {filename} updated successfully. Graph reloaded."} return {"status": "success", "message": f"Tool prompt {filename} updated (file + Langfuse). Graph reloaded."}
except HTTPException: except HTTPException:
raise raise
except Exception as e: except Exception as e:
......
"""Quick test: Gemini 2.0 Flash via Google GenAI."""
import os

from google import genai

# SECURITY: the API key used to be hard-coded on this line and was committed
# to version control. That key must be treated as leaked and revoked; the
# script now reads the key from the environment instead.
API_KEY = os.environ.get("GEMINI_API_KEY") or os.environ.get("GOOGLE_API_KEY")
if not API_KEY:
    raise SystemExit("Set GEMINI_API_KEY (or GOOGLE_API_KEY) before running this script.")

# Single source of truth for the model id so the printed label always
# matches the model that is actually called.
MODEL_NAME = "gemini-3-flash-preview"

client = genai.Client(api_key=API_KEY)
response = client.models.generate_content(
    model=MODEL_NAME,
    contents="Xin chào! Bạn là ai? Trả lời ngắn gọn bằng tiếng Việt.",
)

separator = "=" * 60
print(separator)
print(f"Model: {MODEL_NAME}")
print(separator)
print(response.text)
print(separator)
This source diff could not be displayed because it is too large. You can view the blob instead.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment