Commit 28274420 authored by Hoanganhvu123

feat: Migrate from LangGraph to Agno framework

parent f057ad1e
""" """
Fashion Q&A Agent Package Fashion Q&A Agent Package - Agno Framework
""" """
from .graph import build_graph # Only export what's needed for Agno
from .models import AgentConfig, AgentState, get_config from .agno_agent import get_agno_agent
from .agno_controller import chat_controller
from .models import QueryRequest
__all__ = [ __all__ = [
"AgentConfig", "get_agno_agent",
"AgentState", "chat_controller",
"build_graph", "QueryRequest",
"get_config",
] ]
"""
CANIFA Agent với Agno Framework
Thay thế LangGraph bằng Agno
"""
import logging
from typing import TYPE_CHECKING, Any, cast
# Type checking imports (only used for type hints)
if TYPE_CHECKING:
from agno.agent import Agent as AgentType
from agno.db.base import BaseDb as BaseDbType
from agno.models.openai import OpenAIChat as OpenAIChatType
else:
AgentType = Any # type: ignore
BaseDbType = Any # type: ignore
OpenAIChatType = Any # type: ignore
# Runtime imports with fallback
try:
from agno.agent import Agent
from agno.db.base import BaseDb
from agno.models.openai import OpenAIChat
except ImportError:
    # Fallback if agno is not installed
Agent = None
BaseDb = Any # type: ignore
OpenAIChat = None
from common.conversation_manager import get_conversation_manager
from config import DEFAULT_MODEL, OPENAI_API_KEY
from .prompt import get_system_prompt
from .tools.agno_tools import get_agno_tools
logger = logging.getLogger(__name__)
def create_agno_model(model_name: str = DEFAULT_MODEL, json_mode: bool = False):
"""
Tạo Agno model từ config.py
"""
if OpenAIChat is None:
raise ImportError("Agno not installed. Run: pip install agno")
return OpenAIChat(
id=model_name,
api_key=OPENAI_API_KEY,
        # Agno handles json_mode itself when needed
)
async def create_agno_agent(
model_name: str = DEFAULT_MODEL,
json_mode: bool = False,
) -> AgentType: # type: ignore
"""
Tạo Agno Agent với ConversationManager (có memory)
Args:
model_name: Model name từ config.py
json_mode: Enable JSON output
Returns:
Configured Agno Agent
"""
# Tạo model từ config
model = create_agno_model(model_name, json_mode)
# Lấy tools (đã convert sang Agno format)
tools = get_agno_tools()
# Lấy system prompt
system_prompt = get_system_prompt()
# Lấy ConversationManager (đã implement BaseDb interface)
db = await get_conversation_manager()
if Agent is None:
raise ImportError("Agno not installed. Run: pip install agno")
    # Type cast: ConversationManager implements the BaseDb interface (duck typing)
    # Works at runtime because ConversationManager provides all the required methods
    db_cast = cast(BaseDbType, db)  # type: ignore[assignment]
    # Create the Agno Agent with a DB (memory enabled)
agent = Agent(
name="CANIFA Agent",
model=model,
        db=db_cast,  # Use ConversationManager (implements the BaseDb interface)
        tools=tools,
        instructions=system_prompt,  # Agno uses `instructions` instead of `system_prompt`
        add_history_to_context=True,  # Enable history
        num_history_runs=20,  # Load the 20 most recent messages
markdown=True,
)
logger.info(f"✅ Agno Agent created with model: {model_name} (WITH MEMORY)")
return agent
# Singleton instance
_agno_agent_instance: AgentType | None = None # type: ignore
async def get_agno_agent(
model_name: str = DEFAULT_MODEL,
json_mode: bool = False,
) -> AgentType: # type: ignore
"""
Get or create Agno Agent singleton (với memory)
"""
global _agno_agent_instance
if _agno_agent_instance is None:
        # Create the agent with ConversationManager-backed memory
_agno_agent_instance = await create_agno_agent(
model_name=model_name,
json_mode=json_mode,
)
return _agno_agent_instance
def reset_agno_agent():
"""Reset singleton for testing"""
global _agno_agent_instance
_agno_agent_instance = None
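
For orientation, a minimal usage sketch of the singleton API above (assumes Agno is installed and OPENAI_API_KEY is configured; the `session_id` keyword and `result.content` access mirror how agno_controller.py drives the agent):

import asyncio

from agent.agno_agent import get_agno_agent, reset_agno_agent

async def main():
    # First call builds the agent (model, tools, ConversationManager); later calls reuse it.
    agent = await get_agno_agent()
    result = agent.run("Tìm áo polo nam", session_id="user-123")
    print(result.content)
    reset_agno_agent()  # drop the singleton (intended for tests)

asyncio.run(main())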
"""
CANIFA Agent Controller với Agno Framework
"""
import json
import logging
from typing import Any
from fastapi import BackgroundTasks
from common.langfuse_client import langfuse_trace_context
from config import DEFAULT_MODEL
from .agno_agent import get_agno_agent
logger = logging.getLogger(__name__)
async def chat_controller(
query: str,
user_id: str,
background_tasks: BackgroundTasks,
model_name: str = DEFAULT_MODEL,
images: list[str] | None = None,
) -> dict:
"""
Controller với Agno Agent (có memory tự động).
Agno tự động load/save history qua ConversationManager.
"""
logger.info(f"▶️ Agno chat_controller | User: {user_id} | Model: {model_name}")
try:
agent = await get_agno_agent(model_name=model_name, json_mode=True)
with langfuse_trace_context(user_id=user_id, session_id=user_id):
            # Agno automatically loads history and saves it after responding (memory enabled)
            result = agent.run(query, session_id=user_id)
        # Extract the response text
        ai_content = str(result.content) if getattr(result, "content", None) else str(result)
        logger.info(f"💾 AI Response: {ai_content[:200]}...")
        # Parse the response and extract products
        ai_text, product_ids = _parse_agno_response(result, ai_content)
return {
"ai_response": ai_text,
"product_ids": product_ids,
}
except Exception as e:
logger.error(f"💥 Agno chat error for user {user_id}: {e}", exc_info=True)
raise
def _parse_agno_response(result: Any, ai_content: str) -> tuple[str, list[dict]]:
"""
    Parse the Agno response and extract the AI text plus product IDs.
Returns: (ai_text_response, product_ids)
"""
ai_text = ai_content
product_ids = []
    # Try to parse the response as JSON
try:
ai_json = json.loads(ai_content)
ai_text = ai_json.get("ai_response", ai_content)
product_ids = ai_json.get("product_ids", []) or []
    except Exception as e:  # JSONDecodeError, AttributeError, ...
        logger.debug(f"Response is not JSON, using raw text: {e}")
    # Extract products from the tool results
if hasattr(result, "messages"):
tool_products = _extract_products_from_messages(result.messages)
        # Merge and deduplicate
seen_skus = {p.get("sku") for p in product_ids if isinstance(p, dict) and "sku" in p}
for product in tool_products:
if isinstance(product, dict) and product.get("sku") not in seen_skus:
product_ids.append(product)
seen_skus.add(product.get("sku"))
return ai_text, product_ids
def _extract_products_from_messages(messages: list) -> list[dict]:
"""Extract products từ Agno tool messages."""
products = []
seen_skus = set()
for msg in messages:
if not (hasattr(msg, "content") and isinstance(msg.content, str)):
continue
try:
tool_result = json.loads(msg.content)
if tool_result.get("status") != "success":
continue
# Handle multi-search format
if "results" in tool_result:
for result_item in tool_result["results"]:
products.extend(_parse_products(result_item.get("products", []), seen_skus))
# Handle single search format
elif "products" in tool_result:
products.extend(_parse_products(tool_result["products"], seen_skus))
except (json.JSONDecodeError, KeyError, TypeError) as e:
logger.debug(f"Skip invalid tool message: {e}")
continue
return products
def _parse_products(products: list[dict], seen_skus: set[str]) -> list[dict]:
"""Parse và format products, skip duplicates."""
parsed = []
for product in products:
if not isinstance(product, dict):
continue
sku = product.get("internal_ref_code")
if not sku or sku in seen_skus:
continue
seen_skus.add(sku)
parsed.append({
"sku": sku,
"name": product.get("magento_product_name", ""),
"price": product.get("price_vnd", 0),
"sale_price": product.get("sale_price_vnd"),
"url": product.get("magento_url_key", ""),
"thumbnail_image_url": product.get("thumbnail_image_url", ""),
})
return parsed
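
A small sketch of the parsing flow above with a fabricated tool payload (field names are the ones the parser reads; SimpleNamespace stands in for Agno's message type):

import json
from types import SimpleNamespace

from agent.agno_controller import _extract_products_from_messages

# Fabricated single-search result shaped like a successful tool response.
payload = json.dumps({
    "status": "success",
    "products": [{
        "internal_ref_code": "8TS24W001",
        "magento_product_name": "Áo polo nam",
        "price_vnd": 399000,
        "sale_price_vnd": None,
        "magento_url_key": "ao-polo-nam",
        "thumbnail_image_url": "https://example.invalid/thumb.jpg",
    }],
})
msg = SimpleNamespace(content=payload)
print(_extract_products_from_messages([msg]))
# → [{'sku': '8TS24W001', 'name': 'Áo polo nam', 'price': 399000, ...}]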
"""
Fashion Q&A Agent Controller
Langfuse will auto-trace via LangChain integration (no code changes needed).
"""
import json
import logging
import uuid
from fastapi import BackgroundTasks
from langchain_core.messages import AIMessage, HumanMessage, ToolMessage
from langchain_core.runnables import RunnableConfig
from common.conversation_manager import ConversationManager, get_conversation_manager
from common.langfuse_client import get_callback_handler, langfuse_trace_context
from common.llm_factory import create_llm
from config import DEFAULT_MODEL
from .graph import build_graph
from .models import AgentState, get_config
from .tools.get_tools import get_all_tools
logger = logging.getLogger(__name__)
async def chat_controller(
query: str,
user_id: str,
background_tasks: BackgroundTasks,
model_name: str = DEFAULT_MODEL,
images: list[str] | None = None,
) -> dict:
"""
Controller main logic for non-streaming chat requests.
Langfuse will automatically trace all LangChain operations.
"""
logger.info(f"▶️ Starting chat_controller with model: {model_name} for user: {user_id}")
config = get_config()
config.model_name = model_name
# Enable JSON mode to ensure structured output
llm = create_llm(model_name=model_name, streaming=False, json_mode=True)
tools = get_all_tools()
graph = build_graph(config, llm=llm, tools=tools)
# Init ConversationManager (Singleton)
memory = await get_conversation_manager()
    # Load history & prepare state (the history logic is unchanged)
history_dicts = await memory.get_chat_history(user_id, limit=20)
history = []
for h in reversed(history_dicts):
msg_cls = HumanMessage if h["is_human"] else AIMessage
history.append(msg_cls(content=h["message"]))
initial_state, exec_config = _prepare_execution_context(
query=query, user_id=user_id, history=history, images=images
)
try:
        # 🔥 Wrap the graph execution in langfuse_trace_context so user_id is set on all observations
        with langfuse_trace_context(user_id=user_id, session_id=user_id):
            # Run the graph
            result = await graph.ainvoke(initial_state, config=exec_config)
        # Extract IDs from the tool messages in a single pass
        all_product_ids = _extract_product_ids(result.get("messages", []))
        # Process the AI response
ai_raw_content = result.get("ai_response").content if result.get("ai_response") else ""
logger.info(f"💾 [RAW AI OUTPUT]:\n{ai_raw_content}")
        # Parse the JSON to get the text response and product_ids from the AI
        ai_text_response = ai_raw_content
        try:
            # With json_mode=True, OpenAI emits raw JSON
            ai_json = json.loads(ai_raw_content)
            # Extract the text response from the JSON
            ai_text_response = ai_json.get("ai_response", ai_raw_content)
            # Merge product_ids from the AI JSON (if any) - do NOT use set(), dicts are unhashable
            explicit_ids = ai_json.get("product_ids", [])
            if explicit_ids and isinstance(explicit_ids, list):
                # Merge and deduplicate by SKU
                seen_skus = {p["sku"] for p in all_product_ids if "sku" in p}
                for product in explicit_ids:
                    if isinstance(product, dict) and product.get("sku") not in seen_skus:
                        all_product_ids.append(product)
                        seen_skus.add(product.get("sku"))
        except Exception as e:  # JSONDecodeError, AttributeError, ...
            # If the AI returned plain text (rare in JSON mode), keep the raw content
            logger.warning(f"Could not parse AI response as JSON: {e}")
        # Background task: persist the chat history quickly
background_tasks.add_task(
_handle_post_chat_async,
memory=memory,
user_id=user_id,
human_query=query,
ai_msg=AIMessage(content=ai_text_response),
)
return {
"ai_response": ai_text_response, # CHỈ text, không phải JSON
"product_ids": all_product_ids, # Array of product objects
}
except Exception as e:
logger.error(f"💥 Chat error for user {user_id}: {e}", exc_info=True)
raise
def _extract_product_ids(messages: list) -> list[dict]:
"""
Extract full product info from tool messages (data_retrieval_tool results).
Returns list of product objects with: sku, name, price, sale_price, url, thumbnail_image_url.
"""
products = []
seen_skus = set()
for msg in messages:
if isinstance(msg, ToolMessage):
try:
# Tool result is JSON string
tool_result = json.loads(msg.content)
# Check if tool returned products
if tool_result.get("status") == "success" and "products" in tool_result:
for product in tool_result["products"]:
sku = product.get("internal_ref_code")
if sku and sku not in seen_skus:
seen_skus.add(sku)
# Extract full product info
product_obj = {
"sku": sku,
"name": product.get("magento_product_name", ""),
"price": product.get("price_vnd", 0),
"sale_price": product.get("sale_price_vnd"), # null nếu không sale
"url": product.get("magento_url_key", ""),
"thumbnail_image_url": product.get("thumbnail_image_url", ""),
}
products.append(product_obj)
except (json.JSONDecodeError, KeyError, TypeError) as e:
logger.debug(f"Could not parse tool message for products: {e}")
continue
return products
def _prepare_execution_context(query: str, user_id: str, history: list, images: list | None):
"""Prepare initial state and execution config for the graph run."""
initial_state: AgentState = {
"user_query": HumanMessage(content=query),
"messages": [HumanMessage(content=query)],
"history": history,
"user_id": user_id,
"images_embedding": [],
"ai_response": None,
}
run_id = str(uuid.uuid4())
# Metadata for LangChain (tags for logging/filtering)
metadata = {
"run_id": run_id,
"tags": "chatbot,production",
}
# 🔥 CallbackHandler - sẽ được wrap trong langfuse_trace_context để set user_id
# Per Langfuse docs: propagate_attributes() handles user_id propagation
langfuse_handler = get_callback_handler()
exec_config = RunnableConfig(
configurable={
"user_id": user_id,
"transient_images": images or [],
"run_id": run_id,
},
run_id=run_id,
metadata=metadata,
callbacks=[langfuse_handler] if langfuse_handler else [],
)
return initial_state, exec_config
async def _handle_post_chat_async(
memory: ConversationManager, user_id: str, human_query: str, ai_msg: AIMessage | None
):
"""Save chat history in background task after response is sent."""
if ai_msg:
try:
await memory.save_conversation_turn(user_id, human_query, ai_msg.content)
logger.debug(f"Saved conversation for user {user_id}")
except Exception as e:
logger.error(f"Failed to save conversation for user {user_id}: {e}", exc_info=True)
"""
Fashion Q&A Agent Graph
LangGraph workflow with a clean architecture.
All resources (LLM, tools) are initialized in __init__.
Uses ConversationManager (Postgres) to store history instead of a checkpointer.
"""
import logging
from typing import Any
from langchain_core.language_models import BaseChatModel
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnableConfig
from langgraph.cache.memory import InMemoryCache
from langgraph.graph import END, StateGraph
from langgraph.prebuilt import ToolNode
from langgraph.types import CachePolicy
from common.llm_factory import create_llm
from .models import AgentConfig, AgentState, get_config
from .prompt import get_system_prompt
from .tools.get_tools import get_all_tools, get_collection_tools
logger = logging.getLogger(__name__)
class CANIFAGraph:
"""
Fashion Q&A Agent Graph Manager.
"""
def __init__(
self,
config: AgentConfig | None = None,
llm: BaseChatModel | None = None,
tools: list | None = None,
):
self.config = config or get_config()
self._compiled_graph: Any | None = None
self.llm: BaseChatModel = llm or create_llm(
model_name=self.config.model_name, api_key=self.config.openai_api_key, streaming=True
)
self.all_tools = tools or get_all_tools()
        self.collection_tools = get_collection_tools()  # Still needed: the tool names drive routing
self.retrieval_tools = self.all_tools
self.llm_with_tools = self.llm.bind_tools(self.all_tools, strict=True)
self.system_prompt = get_system_prompt()
self.prompt_template = ChatPromptTemplate.from_messages(
[
("system", self.system_prompt),
MessagesPlaceholder(variable_name="history"),
MessagesPlaceholder(variable_name="user_query"),
MessagesPlaceholder(variable_name="messages"),
]
)
self.chain = self.prompt_template | self.llm_with_tools
self.cache = InMemoryCache()
async def _agent_node(self, state: AgentState, config: RunnableConfig) -> dict:
"""Agent node - Chỉ việc đổ dữ liệu riêng vào khuôn đã có sẵn."""
messages = state.get("messages", [])
history = state.get("history", [])
user_query = state.get("user_query")
transient_images = config.get("configurable", {}).get("transient_images", [])
        if transient_images and messages:
            pass  # Placeholder: transient images are not attached to the messages yet
# Invoke chain with user_query, history, and messages
response = await self.chain.ainvoke({
"user_query": [user_query] if user_query else [],
"history": history,
"messages": messages
})
return {"messages": [response], "ai_response": response}
def _should_continue(self, state: AgentState) -> str:
"""Routing: tool nodes hoặc end."""
last_message = state["messages"][-1]
if not hasattr(last_message, "tool_calls") or not last_message.tool_calls:
logger.info("🏁 Agent finished")
return "end"
tool_names = [tc["name"] for tc in last_message.tool_calls]
collection_names = [t.name for t in self.collection_tools]
if any(name in collection_names for name in tool_names):
logger.info(f"🔄 → collect_tools: {tool_names}")
return "collect_tools"
logger.info(f"🔄 → retrieve_tools: {tool_names}")
return "retrieve_tools"
def build(self) -> Any:
"""Build và compile LangGraph workflow."""
if self._compiled_graph is not None:
return self._compiled_graph
workflow = StateGraph(AgentState)
# Nodes
workflow.add_node("agent", self._agent_node)
workflow.add_node("retrieve_tools", ToolNode(self.retrieval_tools), cache_policy=CachePolicy(ttl=3600))
workflow.add_node("collect_tools", ToolNode(self.collection_tools))
# Edges
workflow.set_entry_point("agent")
workflow.add_conditional_edges(
"agent",
self._should_continue,
{"retrieve_tools": "retrieve_tools", "collect_tools": "collect_tools", "end": END},
)
workflow.add_edge("retrieve_tools", "agent")
workflow.add_edge("collect_tools", "agent")
self._compiled_graph = workflow.compile(cache=self.cache) # No Checkpointer
logger.info("✅ Graph compiled (Langfuse callback will be per-run)")
return self._compiled_graph
@property
def graph(self) -> Any:
return self.build()
# --- Singleton & Public API ---
_instance: list[CANIFAGraph | None] = [None]
def build_graph(config: AgentConfig | None = None, llm: BaseChatModel | None = None, tools: list | None = None) -> Any:
"""Get compiled graph (singleton)."""
if _instance[0] is None:
_instance[0] = CANIFAGraph(config, llm, tools)
return _instance[0].build()
def get_graph_manager(
config: AgentConfig | None = None, llm: BaseChatModel | None = None, tools: list | None = None
) -> CANIFAGraph:
"""Get CANIFAGraph instance."""
if _instance[0] is None:
_instance[0] = CANIFAGraph(config, llm, tools)
return _instance[0]
def reset_graph() -> None:
"""Reset singleton for testing."""
_instance[0] = None
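
A minimal sketch of driving the compiled graph directly (state keys match _prepare_execution_context in controller.py; assumes OPENAI_API_KEY and the tool backends are configured):

import asyncio

from langchain_core.messages import HumanMessage

from agent.graph import build_graph, reset_graph

async def main():
    graph = build_graph()  # compiled once, then cached by the singleton
    state = {
        "user_query": HumanMessage(content="Tìm áo khoác nữ"),
        "messages": [HumanMessage(content="Tìm áo khoác nữ")],
        "history": [],
        "user_id": "user-123",
        "images_embedding": [],
        "ai_response": None,
    }
    result = await graph.ainvoke(state)
    print(result["ai_response"].content)
    reset_graph()  # tests only

asyncio.run(main())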
"""
Agno Tools - plain Python functions for the Agno Agent
Converted from the LangChain @tool decorator format to the Agno format
"""
from .data_retrieval_tool import data_retrieval_tool
from .brand_knowledge_tool import canifa_knowledge_search
from .customer_info_tool import collect_customer_info
def get_agno_tools():
"""
    Get the tools for the Agno Agent.
    Agno automatically converts plain Python functions into tool definitions.
Returns:
List of Python functions (Agno tools)
"""
return [
data_retrieval_tool,
canifa_knowledge_search,
collect_customer_info,
]
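
This is why the diffs below simply drop the LangChain @tool decorators and add Args sections to the docstrings: Agno is expected to derive the tool schema from the plain function itself. A hypothetical illustration (the function name and parameters are invented):

# Hypothetical tool: Agno reads the signature, type hints, and docstring
# (including the Args section) to build the tool definition.
async def example_tool(query: str, limit: int = 5) -> str:
    """
    One-line description the model sees when choosing tools.
    Args:
        query: What to search for
        limit: Maximum number of results
    """
    return f"searched {query!r} (limit={limit})"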
agent/tools/brand_knowledge_tool.py
 import logging
-from langchain_core.tools import tool
 from pydantic import BaseModel, Field

 from common.embedding_service import create_embedding_async
@@ -15,7 +14,6 @@ class KnowledgeSearchInput(BaseModel):
     )

-@tool("canifa_knowledge_search", args_schema=KnowledgeSearchInput)
 async def canifa_knowledge_search(query: str) -> str:
     """
     Look up ALL information about the Canifa brand and its services.
@@ -35,6 +33,10 @@ async def canifa_knowledge_search(query: str) -> str:
     - 'Cho mình xem bảng size áo nam.'
     - 'Phí vận chuyển đi tỉnh là bao nhiêu?'
     - 'Canifa thành lập năm nào?'
+    Args:
+        query: The customer's question or non-product information need
+            (e.g., find a store, ask about a policy, look up a size chart...)
     """
     logger.info(f"🔍 [Semantic Search] Brand Knowledge query: {query}")
...
agent/tools/customer_info_tool.py
@@ -6,16 +6,14 @@ Used to push data to the CRM or the customer storage system.
 import json
 import logging

-from langchain_core.tools import tool

 logger = logging.getLogger(__name__)

-@tool
-async def collect_customer_info(name: str, phone: str, email: str | None) -> str:
+async def collect_customer_info(name: str, phone: str, email: str | None = None) -> str:
     """
     Use this tool to record the customer's information when they want deeper consultation,
     promotions, or to register a purchase.
     Args:
         name: The customer's name
         phone: The customer's phone number
...
@@ -9,7 +9,6 @@ import logging
 import time
 from decimal import Decimal

-from langchain_core.tools import tool
 from pydantic import BaseModel, Field

 from agent.tools.product_search_helpers import build_starrocks_query, save_preview_to_log
@@ -50,8 +49,6 @@ class MultiSearchParams(BaseModel):
     searches: list[SearchItem] = Field(..., description="List of search queries to run in parallel")

-@tool(args_schema=MultiSearchParams)
-# @traceable(run_type="tool", name="data_retrieval_tool")
 async def data_retrieval_tool(searches: list[SearchItem]) -> str:
     """
     CANIFA product super-search - supports parallel multi-search (runs several queries at once).
@@ -86,6 +83,14 @@ async def data_retrieval_tool(searches: list[SearchItem]) -> str:
         {"query": "Quần jean nam slim fit năng động"},
         {"query": "Áo khoác nam thể thao trẻ trung"}
     ]
+    Args:
+        searches: List of search queries to run in parallel. Each item is a SearchItem with:
+            - query: A detailed product description (required)
+            - magento_ref_code: A specific product code (if any)
+            - price_min: Minimum price (if any)
+            - price_max: Maximum price (if any)
+            - action: 'search' or 'visual_search'
     """
     logger.info("🔧 [DEBUG] data_retrieval_tool STARTED")
     try:
...
"""
CANIFA Data Retrieval Tool - pared down for the agentic workflow.
Supports hybrid search: semantic (vector) + metadata filters.
"""
import asyncio
import json
import logging
import time
from decimal import Decimal
from langchain_core.tools import tool
from pydantic import BaseModel, Field
from agent.tools.product_search_helpers import build_starrocks_query
from common.starrocks_connection import StarRocksConnection
# from langsmith import traceable
logger = logging.getLogger(__name__)
class DecimalEncoder(json.JSONEncoder):
"""Xử lý kiểu Decimal từ Database khi convert sang JSON."""
def default(self, obj):
if isinstance(obj, Decimal):
return float(obj)
return super().default(obj)
class SearchItem(BaseModel):
    """Structure of a single search item within a multi-search."""

    query: str = Field(
        ...,
        description="The user's free-form question/intent (going out, a party, a job interview,...) - used for semantic search",
    )
    keywords: str | None = Field(
        ..., description="Specific product keywords (áo polo, quần jean,...) - used for LIKE search"
    )
    magento_ref_code: str | None = Field(
        ..., description="Product code or color/SKU code (e.g., 8TS24W001 or 8TS24W001-SK010)."
    )
    product_line_vn: str | None = Field(..., description="Product line (Áo phông, Quần short,...)")
    gender_by_product: str | None = Field(..., description="Gender: male, female")
    age_by_product: str | None = Field(..., description="Age group: adult, kids, baby, others")
    master_color: str | None = Field(..., description="Primary color (Đen/ Black, Trắng/ White,...)")
    material_group: str | None = Field(
        ...,
        description="Material group. MUST be exactly one of: 'Yarn - Sợi', 'Knit - Dệt Kim', 'Woven - Dệt Thoi', 'Knit/Woven - Dệt Kim/Dệt Thoi'.",
    )
    season: str | None = Field(..., description="Season (Spring Summer, Autumn Winter)")
    style: str | None = Field(..., description="Style (Basic Update, Fashion,...)")
    fitting: str | None = Field(..., description="Fit (Regular, Slim, Loose,...)")
    form_neckline: str | None = Field(..., description="Neckline (Crew Neck, V-neck,...)")
    form_sleeve: str | None = Field(..., description="Sleeve type (Short Sleeve, Long Sleeve,...)")
    price_min: float | None = Field(..., description="Minimum price")
    price_max: float | None = Field(..., description="Maximum price")
    action: str = Field(..., description="Action: 'search' (text search) or 'visual_search' (image analysis)")
class MultiSearchParams(BaseModel):
"""Tham số cho Parallel Multi-Search."""
searches: list[SearchItem] = Field(..., description="Danh sách các truy vấn tìm kiếm chạy song song")
@tool(args_schema=MultiSearchParams)
# @traceable(run_type="tool", name="data_retrieval_tool")
async def data_retrieval_tool(searches: list[SearchItem]) -> str:
"""
Siêu công cụ tìm kiếm sản phẩm CANIFA - Hỗ trợ Parallel Multi-Search (Chạy song song nhiều query).
💡 ĐIỂM ĐẶC BIỆT:
Công cụ này cho phép thực hiện NHIỀU truy vấn tìm kiếm CÙNG LÚC.
Hãy dùng nó khi cần SO SÁNH sản phẩm hoặc tìm trọn bộ OUTFIT (mix & match).
⚠️ QUAN TRỌNG - KHI NÀO DÙNG GÌ:
1️⃣ DÙNG 'query' (Semantic Search - BUỘC PHẢI CÓ):
- Áp dụng cho mọi lượt search để cung cấp bối cảnh (context).
- Ví dụ: "áo thun nam đi biển", "quần tây công sở", "đồ cho bé màu xanh"...
2️⃣ DÙNG METADATA FILTERS (Exact/Partial Match):
- Khi khách nói rõ THUỘC TÍNH: Màu sắc, giá, giới tính, độ tuổi, mã sản phẩm.
- **QUY TẮC MÃ SẢN PHẨM:** Mọi loại mã (VD: `8TS...` hoặc `8TS...-SK...`) → Điền vào `magento_ref_code`.
- **QUY TẮC CHẤT LIÊU (material_group):** Chỉ dùng: `Yarn - Sợi`, `Knit - Dệt Kim`, `Woven - Dệt Thoi`, `Knit/Woven - Dệt Kim/Dệt Thoi`.
📝 VÍ DỤ CHI TIẾT (Single Search):
- Example 1: searches=[{"query": "áo polo nam giá dưới 400k", "keywords": "áo polo", "gender_by_product": "male", "price_max": 400000}]
- Example 2: searches=[{"query": "sản phẩm mã 8TS24W001", "magento_ref_code": "8TS24W001"}]
🚀 VÍ DỤ CẤP CAO (Multi-Search Parallel):
- Example 3 - So sánh: "So sánh áo thun nam đen và áo sơ mi trắng dưới 500k"
Tool Call: searches=[
{"query": "áo thun nam màu đen dưới 500k", "keywords": "áo thun", "master_color": "Đen", "gender_by_product": "male", "price_max": 500000},
{"query": "áo sơ mi nam trắng dưới 500k", "keywords": "áo sơ mi", "master_color": "Trắng", "gender_by_product": "male", "price_max": 500000}
]
- Example 4 - Phối đồ: "Tìm cho mình một cái quần jean và một cái áo khoác để đi chơi"
Tool Call: searches=[
{"query": "quần jean đi chơi năng động", "keywords": "quần jean"},
{"query": "áo khoác đi chơi năng động", "keywords": "áo khoác"}
]
- Example 5 - Cả gia đình: "Tìm áo phông màu xanh cho bố, mẹ và bé trai"
Tool Call: searches=[
{"query": "áo phông nam người lớn màu xanh", "keywords": "áo phông", "master_color": "Xanh", "gender_by_product": "male", "age_by_product": "adult"},
{"query": "áo phông nữ người lớn màu xanh", "keywords": "áo phông", "master_color": "Xanh", "gender_by_product": "female", "age_by_product": "adult"},
{"query": "áo phông bé trai màu xanh", "keywords": "áo phông", "master_color": "Xanh", "gender_by_product": "male", "age_by_product": "others"}
]
"""
logger.info("🔧 [DEBUG] data_retrieval_tool STARTED")
try:
logger.info("🔧 [DEBUG] Creating StarRocksConnection instance")
db = StarRocksConnection()
logger.info("🔧 [DEBUG] StarRocksConnection created successfully")
        # 0. Log the input parameters
logger.info(f"📥 [Tool Input] data_retrieval_tool received {len(searches)} items:")
for idx, item in enumerate(searches):
logger.info(f" 🔹 Item [{idx}]: {item.dict(exclude_none=True)}")
        # 1. Create the tasks to run in parallel
        logger.info("🔧 [DEBUG] Creating parallel tasks")
        tasks = [_execute_single_search(db, item) for item in searches]
logger.info(f"🚀 [Parallel Search] Executing {len(searches)} queries simultaneously...")
logger.info("🔧 [DEBUG] About to call asyncio.gather()")
results = await asyncio.gather(*tasks)
logger.info(f"🔧 [DEBUG] asyncio.gather() completed with {len(results)} results")
        # 2. Aggregate the results
combined_results = []
for i, products in enumerate(results):
combined_results.append(
{
"search_index": i,
"search_criteria": searches[i].dict(exclude_none=True),
"count": len(products),
"products": products,
}
)
return json.dumps({"status": "success", "results": combined_results}, ensure_ascii=False, cls=DecimalEncoder)
except Exception as e:
logger.error(f"Error in Multi-Search data_retrieval_tool: {e}")
return json.dumps({"status": "error", "message": str(e)})
async def _execute_single_search(db: StarRocksConnection, item: SearchItem) -> list[dict]:
"""Thực thi một search query đơn lẻ (Async)."""
try:
logger.info(f"🔧 [DEBUG] _execute_single_search STARTED for query: {item.query[:50] if item.query else 'None'}")
        # ⏱️ Timer: build the query (includes the embedding step if any)
query_build_start = time.time()
logger.info("🔧 [DEBUG] Calling build_starrocks_query()")
sql = await build_starrocks_query(item)
query_build_time = (time.time() - query_build_start) * 1000 # Convert to ms
logger.info(f"🔧 [DEBUG] SQL query built, length: {len(sql)}")
logger.info(f"⏱️ [TIMER] Query Build Time (bao gồm embedding): {query_build_time:.2f}ms")
# ⏱️ Timer: Execute DB query
db_start = time.time()
logger.info("🔧 [DEBUG] Calling db.execute_query_async()")
products = await db.execute_query_async(sql)
db_time = (time.time() - db_start) * 1000 # Convert to ms
logger.info(f"🔧 [DEBUG] Query executed, got {len(products)} products")
logger.info(f"⏱️ [TIMER] DB Query Execution Time: {db_time:.2f}ms")
logger.info(f"⏱️ [TIMER] Total Time (Build + DB): {query_build_time + db_time:.2f}ms")
return _format_product_results(products)
except Exception as e:
logger.error(f"Single search error for item {item}: {e}")
return []
def _format_product_results(products: list[dict]) -> list[dict]:
"""Lọc và format kết quả trả về cho Agent."""
allowed_fields = {
"internal_ref_code",
"description_text_full",
}
return [{k: v for k, v in p.items() if k in allowed_fields} for p in products[:5]]
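
For reference, a sketch of the envelope this tool returns, reconstructed from the json.dumps call and _format_product_results above (the values are fabricated):

# Fabricated example of the JSON envelope returned by data_retrieval_tool.
example_result = {
    "status": "success",
    "results": [
        {
            "search_index": 0,
            "search_criteria": {"query": "áo polo nam", "action": "search"},
            "count": 1,
            "products": [
                # _format_product_results keeps only these two fields, max 5 items
                {"internal_ref_code": "8TS24W001", "description_text_full": "Áo polo nam ..."},
            ],
        },
    ],
}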
@@ -9,7 +9,7 @@ import logging
 from fastapi import APIRouter, BackgroundTasks, HTTPException
 from opentelemetry import trace

-from agent.controller import chat_controller
+from agent.agno_controller import chat_controller
 from agent.models import QueryRequest
 from config import DEFAULT_MODEL
...
""" """
Simple Langfuse Client Wrapper Langfuse Client với OpenInference instrumentation cho Agno
Minimal setup using langfuse.langchain module Tự động trace tất cả Agno calls (LLM, tools, agent runs)
With propagate_attributes for proper user_id tracking
""" """
import asyncio import asyncio
import base64
import logging import logging
import os import os
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
...@@ -19,6 +19,21 @@ from config import ( ...@@ -19,6 +19,21 @@ from config import (
LANGFUSE_SECRET_KEY, LANGFUSE_SECRET_KEY,
) )
# OpenInference imports (optional - only if available)
_OPENINFERENCE_AVAILABLE = False
AgnoInstrumentor = None # type: ignore
try:
from openinference.instrumentation.agno import AgnoInstrumentor # type: ignore[import-untyped]
from opentelemetry import trace as trace_api
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
_OPENINFERENCE_AVAILABLE = True
except ImportError:
pass
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# ⚡ Global state for async batch export # ⚡ Global state for async batch export
...@@ -31,9 +46,10 @@ _batch_lock = asyncio.Lock if hasattr(asyncio, "Lock") else None ...@@ -31,9 +46,10 @@ _batch_lock = asyncio.Lock if hasattr(asyncio, "Lock") else None
def initialize_langfuse() -> bool: def initialize_langfuse() -> bool:
""" """
1. Set environment variables 1. Setup OpenInference instrumentation cho Agno (nếu available)
2. Initialize Langfuse client 2. Configure OTLP exporter để gửi traces đến Langfuse
3. Setup thread pool for async batch export 3. Initialize Langfuse client (fallback)
4. Register shutdown handler
""" """
global _langfuse_client, _export_executor global _langfuse_client, _export_executor
...@@ -44,27 +60,95 @@ def initialize_langfuse() -> bool: ...@@ -44,27 +60,95 @@ def initialize_langfuse() -> bool:
# Set environment # Set environment
os.environ["LANGFUSE_PUBLIC_KEY"] = LANGFUSE_PUBLIC_KEY os.environ["LANGFUSE_PUBLIC_KEY"] = LANGFUSE_PUBLIC_KEY
os.environ["LANGFUSE_SECRET_KEY"] = LANGFUSE_SECRET_KEY os.environ["LANGFUSE_SECRET_KEY"] = LANGFUSE_SECRET_KEY
os.environ["LANGFUSE_BASE_URL"] = LANGFUSE_BASE_URL or "https://cloud.langfuse.com" base_url = LANGFUSE_BASE_URL or "https://cloud.langfuse.com"
os.environ["LANGFUSE_TIMEOUT"] = "10" # 10s timeout, not blocking os.environ["LANGFUSE_BASE_URL"] = base_url
os.environ["LANGFUSE_TIMEOUT"] = "10"
# Disable default flush to prevent blocking os.environ["LANGFUSE_FLUSHINTERVAL"] = "300"
os.environ["LANGFUSE_FLUSHINTERVAL"] = "300" # 5 min, very infrequent
try: try:
_langfuse_client = get_client() # ========== Setup OpenInference cho Agno ==========
_export_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="langfuse_export") global _OPENINFERENCE_AVAILABLE
if _OPENINFERENCE_AVAILABLE:
if _langfuse_client.auth_check(): try:
logger.info("✅ Langfuse Ready! (async batch export)") # Determine Langfuse OTLP endpoint
return True if "localhost" in base_url or "127.0.0.1" in base_url:
logger.error("❌ Langfuse auth failed") otlp_endpoint = f"{base_url}/api/public/otel"
return False elif "us.cloud" in base_url:
otlp_endpoint = "https://us.cloud.langfuse.com/api/public/otel"
elif "eu.cloud" in base_url:
otlp_endpoint = "https://eu.cloud.langfuse.com/api/public/otel"
else:
# Custom deployment
otlp_endpoint = f"{base_url}/api/public/otel"
# Create auth header
langfuse_auth = base64.b64encode(
f"{LANGFUSE_PUBLIC_KEY}:{LANGFUSE_SECRET_KEY}".encode()
).decode()
# Set OTLP environment variables
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = otlp_endpoint
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {langfuse_auth}"
# Configure TracerProvider
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter()))
trace_api.set_tracer_provider(tracer_provider=tracer_provider)
# Instrument Agno
if AgnoInstrumentor:
AgnoInstrumentor().instrument()
logger.info(f"✅ OpenInference instrumentation enabled for Agno")
logger.info(f" → Sending traces to: {otlp_endpoint}")
except Exception as e:
logger.warning(f"⚠️ Failed to setup OpenInference: {e}. Falling back to Langfuse SDK.")
_OPENINFERENCE_AVAILABLE = False
# ========== Fallback: Langfuse SDK ==========
if not _OPENINFERENCE_AVAILABLE:
_langfuse_client = get_client()
_export_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="langfuse_export")
# Register shutdown handler
import atexit
atexit.register(shutdown_langfuse)
logger.info(f"✅ Langfuse initialized (BASE_URL: {base_url})")
return True
except Exception as e: except Exception as e:
logger.error(f"❌ Langfuse init error: {e}") logger.error(f"❌ Langfuse init error: {e}")
return False return False
def shutdown_langfuse():
"""Shutdown Langfuse client gracefully để tránh nghẽn khi exit"""
global _langfuse_client, _export_executor
try:
if _langfuse_client:
# Flush pending traces trước khi shutdown
try:
_langfuse_client.flush()
except Exception as e:
logger.debug(f"Langfuse flush error during shutdown: {e}")
# Shutdown client (non-blocking với timeout)
try:
if hasattr(_langfuse_client, "shutdown"):
_langfuse_client.shutdown()
except Exception as e:
logger.debug(f"Langfuse shutdown error: {e}")
if _export_executor:
_export_executor.shutdown(wait=False) # Non-blocking shutdown
logger.debug("🔒 Langfuse client shutdown completed")
except Exception as e:
logger.debug(f"Error during Langfuse shutdown: {e}")
async def async_flush_langfuse(): async def async_flush_langfuse():
""" """
Async wrapper to flush Langfuse without blocking event loop. Async wrapper to flush Langfuse without blocking event loop.
......
"""
LLM Factory - OpenAI LLM creation with caching.
Manages initialization and caching of OpenAI models.
"""
import contextlib
import logging
from langchain_core.language_models import BaseChatModel
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from config import OPENAI_API_KEY
logger = logging.getLogger(__name__)
class LLMFactory:
"""Singleton factory for managing OpenAI LLM instances with caching."""
COMMON_MODELS: list[str] = [
"gpt-4o-mini",
"gpt-4o",
"gpt-5-nano",
"gpt-5-mini",
]
def __init__(self):
"""Initialize LLM factory with empty cache."""
self._cache: dict[tuple[str, bool, bool, str | None], BaseChatModel] = {}
def get_model(
self,
model_name: str,
streaming: bool = True,
json_mode: bool = False,
api_key: str | None = None,
) -> BaseChatModel:
"""
Get or create an LLM instance from cache.
Args:
            model_name: Model identifier (e.g., "gpt-4o-mini", "gpt-4o")
streaming: Enable streaming responses
json_mode: Enable JSON output format
api_key: Optional API key override
Returns:
Configured LLM instance
"""
clean_model = model_name.split("/")[-1] if "/" in model_name else model_name
cache_key = (clean_model, streaming, json_mode, api_key)
if cache_key in self._cache:
logger.debug(f"♻️ Using cached model: {clean_model}")
return self._cache[cache_key]
logger.info(f"Creating new LLM instance: {clean_model}")
return self._create_instance(clean_model, streaming, json_mode, api_key)
def _create_instance(
self,
model_name: str,
streaming: bool = False,
json_mode: bool = False,
api_key: str | None = None,
) -> BaseChatModel:
"""Create and cache a new OpenAI LLM instance."""
try:
llm = self._create_openai(model_name, streaming, json_mode, api_key)
cache_key = (model_name, streaming, json_mode, api_key)
self._cache[cache_key] = llm
return llm
except Exception as e:
logger.error(f"❌ Failed to create model {model_name}: {e}")
raise
def _create_openai(self, model_name: str, streaming: bool, json_mode: bool, api_key: str | None) -> BaseChatModel:
"""Create OpenAI model instance."""
key = api_key or OPENAI_API_KEY
if not key:
raise ValueError("OPENAI_API_KEY is required")
llm_kwargs = {
"model": model_name,
"streaming": streaming,
"api_key": key,
"temperature": 0,
}
        # If json_mode is enabled, inject it directly into the constructor kwargs
if json_mode:
llm_kwargs["model_kwargs"] = {"response_format": {"type": "json_object"}}
logger.info(f"⚙️ Initializing OpenAI in JSON mode: {model_name}")
llm = ChatOpenAI(**llm_kwargs)
logger.info(f"✅ Created OpenAI: {model_name}")
return llm
def _enable_json_mode(self, llm: BaseChatModel, model_name: str) -> BaseChatModel:
"""Enable JSON mode for the LLM."""
try:
llm = llm.bind(response_format={"type": "json_object"})
logger.debug(f"⚙️ JSON mode enabled for {model_name}")
except Exception as e:
logger.warning(f"⚠️ JSON mode not supported: {e}")
return llm
def initialize(self, skip_warmup: bool = True) -> None:
"""
Pre-initialize common models.
Args:
skip_warmup: Skip initialization if True
"""
if skip_warmup or self._cache:
return
logger.info("🔥 Warming up LLM Factory...")
for model_name in self.COMMON_MODELS:
with contextlib.suppress(Exception):
self.get_model(model_name, streaming=True)
# --- Singleton Instance & Public API ---
_factory = LLMFactory()
def create_llm(
model_name: str,
streaming: bool = True,
json_mode: bool = False,
api_key: str | None = None,
) -> BaseChatModel:
"""Create or get cached LLM instance."""
return _factory.get_model(model_name, streaming=streaming, json_mode=json_mode, api_key=api_key)
def init_llm_factory(skip_warmup: bool = True) -> None:
"""Initialize the LLM factory."""
_factory.initialize(skip_warmup)
def create_embedding_model() -> OpenAIEmbeddings:
"""Create OpenAI embeddings model."""
return OpenAIEmbeddings(model="text-embedding-3-small", api_key=OPENAI_API_KEY)
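
A minimal usage sketch of the factory's public API (the json_mode call mirrors how controller.py creates its LLM):

from common.llm_factory import create_llm, create_embedding_model

# Non-streaming, JSON-mode model for structured controller output
llm = create_llm(model_name="gpt-4o-mini", streaming=False, json_mode=True)

# Embedding model used by the semantic-search path
embedder = create_embedding_model()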
common/starrocks_connection.py
@@ -3,8 +3,8 @@ StarRocks Database Connection Utility
 Based on chatbot-rsa pattern
 """

-import logging
 import asyncio
+import logging
 from typing import Any

 import aiomysql
@@ -34,11 +34,11 @@ class StarRocksConnection:
         password: str | None = None,
         port: int | None = None,
     ):
-        self.host = host or STARROCKS_HOST
-        self.database = database or STARROCKS_DB
-        self.user = user or STARROCKS_USER
-        self.password = password or STARROCKS_PASSWORD
-        self.port = port or STARROCKS_PORT
+        self.host = host or STARROCKS_HOST or ""
+        self.database = database or STARROCKS_DB or ""
+        self.user = user or STARROCKS_USER or ""
+        self.password = password or STARROCKS_PASSWORD or ""
+        self.port = port or STARROCKS_PORT or 3306
         # self.conn references the shared connection
         self.conn = None
@@ -61,11 +61,15 @@
         print(f" [DB] 🔌 Connecting to StarRocks (new session): {self.host}:{self.port}...")
         logger.info(f"🔌 Connecting to StarRocks at {self.host}:{self.port} (DB: {self.database})...")
         try:
+            # Ensure all required parameters are present (not None)
+            if not all([self.host, self.user, self.password, self.database]):
+                raise ValueError("Missing required StarRocks connection parameters")
+
             new_conn = pymysql.connect(
                 host=self.host,
                 port=self.port,
                 user=self.user,
-                password=self.password,
+                password=self.password,  # Now guaranteed to be str, not None
                 database=self.database,
                 charset="utf8mb4",
                 cursorclass=DictCursor,
@@ -121,11 +125,15 @@
             # Double-check inside lock to prevent multiple pools
             if StarRocksConnection._shared_pool is None:
                 logger.info(f"🔌 Creating Async Pool to {self.host}:{self.port}...")
+                # Ensure all required parameters are present (not None)
+                if not all([self.host, self.user, self.password, self.database]):
+                    raise ValueError("Missing required StarRocks connection parameters")
+
                 StarRocksConnection._shared_pool = await aiomysql.create_pool(
                     host=self.host,
                     port=self.port,
                     user=self.user,
-                    password=self.password,
+                    password=self.password,  # Now guaranteed to be str, not None
                     db=self.database,
                     charset="utf8mb4",
                     cursorclass=aiomysql.DictCursor,
@@ -141,18 +149,18 @@
         """
         max_retries = 3
         last_error = None

         for attempt in range(max_retries):
             try:
                 pool = await self.get_pool()
                 # logger.info(f"🚀 Executing Async Query (Attempt {attempt+1}).")
                 async with pool.acquire() as conn, conn.cursor() as cursor:
                     await cursor.execute(query, params)
                     results = await cursor.fetchall()
                     # logger.info(f"📊 Async Query successful, returned {len(results)} rows")
                     return [dict(row) for row in results]
             except Exception as e:
                 last_error = e
                 logger.warning(f"⚠️ StarRocks DB Error (Attempt {attempt+1}/{max_retries}): {e}")
@@ -160,15 +168,16 @@
                     # If StarRocks hits OOM, wait a moment and retry
                     await asyncio.sleep(0.5 * (attempt + 1))
                     continue
-                elif "Disconnected" in str(e) or "Lost connection" in str(e):
+                if "Disconnected" in str(e) or "Lost connection" in str(e):
                     # If the connection dropped, the pool may be stale; retry immediately
                     continue
-                else:
-                    # Other errors (syntax, ...) are raised immediately
-                    raise
+                # Other errors (syntax, ...) are raised immediately
+                raise

         logger.error(f"❌ Failed after {max_retries} attempts: {last_error}")
-        raise last_error
+        if last_error:
+            raise last_error
+        raise RuntimeError("Failed to execute query after multiple attempts")

     def close(self):
         """Explicitly close if needed (e.g. app shutdown)"""
...
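
A small sketch of the async query path above (the SQL is a trivial probe; real callers pass the StarRocks query built by build_starrocks_query):

import asyncio

from common.starrocks_connection import StarRocksConnection

async def main():
    db = StarRocksConnection()  # falls back to the STARROCKS_* config values
    # Retries up to 3 times on OOM or dropped-connection errors.
    rows = await db.execute_query_async("SELECT 1 AS ok")
    print(rows)  # → [{'ok': 1}]

asyncio.run(main())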
requirements.txt
@@ -49,7 +49,9 @@ langchain==1.2.0
 langchain-core==1.2.3
 langchain-google-genai==4.1.2
 langchain-openai==1.1.6
+agno==2.3.24
 langfuse==3.11.0
+openinference-instrumentation-agno==1.0.0
 langgraph==1.0.5
 langgraph-checkpoint==3.0.1
 langgraph-checkpoint-postgres==3.0.2
...