Commit f057ad1e authored by Vũ Hoàng Anh's avatar Vũ Hoàng Anh

feat: complete Langfuse integration with propagate_attributes for user_id tracking

- Removed OpenTelemetry (TracerProvider, OTLPSpanExporter, LoggingInstrumentor, FastAPIInstrumentor)
- Implemented Langfuse v3.11.0 with CallbackHandler for LLM tracing
- Added langfuse_trace_context() with propagate_attributes() for proper user_id filtering
- Fixed user_id to appear in Langfuse User ID filter (not just metadata)
- Added session_id and tags propagation for trace organization
- Updated controller.py to wrap graph execution in langfuse_trace_context
- Verified traces send to self-hosted Langfuse at http://172.16.2.207:3009
- Configuration: LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_BASE_URL in .env

GIAI_DOAN_1 COMPLETE: LLM observability via Langfuse SDK
Next: GIAI_DOAN_2 - Container monitoring stack (cAdvisor + Prometheus + Grafana)
parent 8fccd372
FROM grafana/k6:latest
# Copy K6 test script
COPY backend/hehe/k6-chatbot-test.js /scripts/chatbot-test.js
# Default command - chạy K6 test
CMD ["run", "--out", "json=/tmp/results.json", "/scripts/chatbot-test.js"]
"""
Fashion Q&A Agent Controller
Switched to LangSmith for tracing (configured via environment variables).
Langfuse will auto-trace via LangChain integration (no code changes needed).
"""
import json
......@@ -12,6 +12,7 @@ from langchain_core.messages import AIMessage, HumanMessage, ToolMessage
from langchain_core.runnables import RunnableConfig
from common.conversation_manager import ConversationManager, get_conversation_manager
from common.langfuse_client import get_callback_handler, langfuse_trace_context
from common.llm_factory import create_llm
from config import DEFAULT_MODEL
......@@ -31,9 +32,10 @@ async def chat_controller(
) -> dict:
"""
Controller main logic for non-streaming chat requests.
1. Initialize resources (LLM, tools, graph, conversation manager)
Langfuse will automatically trace all LangChain operations.
"""
logger.info(f"▶️ Starting chat_controller with model: {model_name} for user: {user_id}")
config = get_config()
config.model_name = model_name
......@@ -59,6 +61,8 @@ async def chat_controller(
)
try:
# 🔥 Wrap graph execution với langfuse_trace_context để set user_id cho tất cả observations
with langfuse_trace_context(user_id=user_id, session_id=user_id):
# TỐI ƯU: Chạy Graph
result = await graph.ainvoke(initial_state, config=exec_config)
......@@ -160,8 +164,16 @@ def _prepare_execution_context(query: str, user_id: str, history: list, images:
"ai_response": None,
}
run_id = str(uuid.uuid4())
# Metadata for LangSmith (TẮT TẠM VÌ RATE LIMIT)
# metadata = {"user_id": user_id, "run_id": run_id}
# Metadata for LangChain (tags for logging/filtering)
metadata = {
"run_id": run_id,
"tags": "chatbot,production",
}
# 🔥 CallbackHandler - sẽ được wrap trong langfuse_trace_context để set user_id
# Per Langfuse docs: propagate_attributes() handles user_id propagation
langfuse_handler = get_callback_handler()
exec_config = RunnableConfig(
configurable={
......@@ -170,7 +182,8 @@ def _prepare_execution_context(query: str, user_id: str, history: list, images:
"run_id": run_id,
},
run_id=run_id,
# metadata=metadata, # Attach metadata for LangSmith
metadata=metadata,
callbacks=[langfuse_handler] if langfuse_handler else [],
)
return initial_state, exec_config
......
This diff is collapsed.
This diff is collapsed.
"""
CANIFA Data Retrieval Tool - Tối giản cho Agentic Workflow.
Hỗ trợ Hybrid Search: Semantic (Vector) + Metadata Filter.
"""
import asyncio
import json
import logging
import time
from decimal import Decimal
from langchain_core.tools import tool
from pydantic import BaseModel, Field
from agent.tools.product_search_helpers import build_starrocks_query
from common.starrocks_connection import StarRocksConnection
# from langsmith import traceable
logger = logging.getLogger(__name__)
class DecimalEncoder(json.JSONEncoder):
"""Xử lý kiểu Decimal từ Database khi convert sang JSON."""
def default(self, obj):
if isinstance(obj, Decimal):
return float(obj)
return super().default(obj)
class SearchItem(BaseModel):
"""Cấu trúc một mục tìm kiếm đơn lẻ trong Multi-Search."""
query: str = Field(
...,
description="Câu hỏi/mục đích tự do của user (đi chơi, dự tiệc, phỏng vấn,...) - dùng cho Semantic Search",
)
keywords: str | None = Field(
..., description="Từ khóa sản phẩm cụ thể (áo polo, quần jean,...) - dùng cho LIKE search"
)
magento_ref_code: str | None = Field(
..., description="Mã sản phẩm hoặc mã màu/SKU (Ví dụ: 8TS24W001 hoặc 8TS24W001-SK010)."
)
product_line_vn: str | None = Field(..., description="Dòng sản phẩm (Áo phông, Quần short,...)")
gender_by_product: str | None = Field(..., description="Giới tính: male, female")
age_by_product: str | None = Field(..., description="Độ tuổi: adult, kids, baby, others")
master_color: str | None = Field(..., description="Màu sắc chính (Đen/ Black, Trắng/ White,...)")
material_group: str | None = Field(
...,
description="Nhóm chất liệu. BẮT BUỘC dùng đúng: 'Yarn - Sợi', 'Knit - Dệt Kim', 'Woven - Dệt Thoi', 'Knit/Woven - Dệt Kim/Dệt Thoi'.",
)
season: str | None = Field(..., description="Mùa (Spring Summer, Autumn Winter)")
style: str | None = Field(..., description="Phong cách (Basic Update, Fashion,...)")
fitting: str | None = Field(..., description="Form dáng (Regular, Slim, Loose,...)")
form_neckline: str | None = Field(..., description="Kiểu cổ (Crew Neck, V-neck,...)")
form_sleeve: str | None = Field(..., description="Kiểu tay (Short Sleeve, Long Sleeve,...)")
price_min: float | None = Field(..., description="Giá thấp nhất")
price_max: float | None = Field(..., description="Giá cao nhất")
action: str = Field(..., description="Hành động: 'search' (tìm kiếm) hoặc 'visual_search' (phân tích ảnh)")
class MultiSearchParams(BaseModel):
"""Tham số cho Parallel Multi-Search."""
searches: list[SearchItem] = Field(..., description="Danh sách các truy vấn tìm kiếm chạy song song")
@tool(args_schema=MultiSearchParams)
# @traceable(run_type="tool", name="data_retrieval_tool")
async def data_retrieval_tool(searches: list[SearchItem]) -> str:
"""
Siêu công cụ tìm kiếm sản phẩm CANIFA - Hỗ trợ Parallel Multi-Search (Chạy song song nhiều query).
💡 ĐIỂM ĐẶC BIỆT:
Công cụ này cho phép thực hiện NHIỀU truy vấn tìm kiếm CÙNG LÚC.
Hãy dùng nó khi cần SO SÁNH sản phẩm hoặc tìm trọn bộ OUTFIT (mix & match).
⚠️ QUAN TRỌNG - KHI NÀO DÙNG GÌ:
1️⃣ DÙNG 'query' (Semantic Search - BUỘC PHẢI CÓ):
- Áp dụng cho mọi lượt search để cung cấp bối cảnh (context).
- Ví dụ: "áo thun nam đi biển", "quần tây công sở", "đồ cho bé màu xanh"...
2️⃣ DÙNG METADATA FILTERS (Exact/Partial Match):
- Khi khách nói rõ THUỘC TÍNH: Màu sắc, giá, giới tính, độ tuổi, mã sản phẩm.
- **QUY TẮC MÃ SẢN PHẨM:** Mọi loại mã (VD: `8TS...` hoặc `8TS...-SK...`) → Điền vào `magento_ref_code`.
- **QUY TẮC CHẤT LIÊU (material_group):** Chỉ dùng: `Yarn - Sợi`, `Knit - Dệt Kim`, `Woven - Dệt Thoi`, `Knit/Woven - Dệt Kim/Dệt Thoi`.
📝 VÍ DỤ CHI TIẾT (Single Search):
- Example 1: searches=[{"query": "áo polo nam giá dưới 400k", "keywords": "áo polo", "gender_by_product": "male", "price_max": 400000}]
- Example 2: searches=[{"query": "sản phẩm mã 8TS24W001", "magento_ref_code": "8TS24W001"}]
🚀 VÍ DỤ CẤP CAO (Multi-Search Parallel):
- Example 3 - So sánh: "So sánh áo thun nam đen và áo sơ mi trắng dưới 500k"
Tool Call: searches=[
{"query": "áo thun nam màu đen dưới 500k", "keywords": "áo thun", "master_color": "Đen", "gender_by_product": "male", "price_max": 500000},
{"query": "áo sơ mi nam trắng dưới 500k", "keywords": "áo sơ mi", "master_color": "Trắng", "gender_by_product": "male", "price_max": 500000}
]
- Example 4 - Phối đồ: "Tìm cho mình một cái quần jean và một cái áo khoác để đi chơi"
Tool Call: searches=[
{"query": "quần jean đi chơi năng động", "keywords": "quần jean"},
{"query": "áo khoác đi chơi năng động", "keywords": "áo khoác"}
]
- Example 5 - Cả gia đình: "Tìm áo phông màu xanh cho bố, mẹ và bé trai"
Tool Call: searches=[
{"query": "áo phông nam người lớn màu xanh", "keywords": "áo phông", "master_color": "Xanh", "gender_by_product": "male", "age_by_product": "adult"},
{"query": "áo phông nữ người lớn màu xanh", "keywords": "áo phông", "master_color": "Xanh", "gender_by_product": "female", "age_by_product": "adult"},
{"query": "áo phông bé trai màu xanh", "keywords": "áo phông", "master_color": "Xanh", "gender_by_product": "male", "age_by_product": "others"}
]
"""
logger.info("🔧 [DEBUG] data_retrieval_tool STARTED")
try:
logger.info("🔧 [DEBUG] Creating StarRocksConnection instance")
db = StarRocksConnection()
logger.info("🔧 [DEBUG] StarRocksConnection created successfully")
# 0. Log input parameters (Đúng ý bro)
logger.info(f"📥 [Tool Input] data_retrieval_tool received {len(searches)} items:")
for idx, item in enumerate(searches):
logger.info(f" 🔹 Item [{idx}]: {item.dict(exclude_none=True)}")
# 1. Tạo tasks chạy song song (Parallel)
logger.info("🔧 [DEBUG] Creating parallel tasks")
tasks = []
for item in searches:
tasks.append(_execute_single_search(db, item))
logger.info(f"🚀 [Parallel Search] Executing {len(searches)} queries simultaneously...")
logger.info("🔧 [DEBUG] About to call asyncio.gather()")
results = await asyncio.gather(*tasks)
logger.info(f"🔧 [DEBUG] asyncio.gather() completed with {len(results)} results")
# 2. Tổng hợp kết quả
combined_results = []
for i, products in enumerate(results):
combined_results.append(
{
"search_index": i,
"search_criteria": searches[i].dict(exclude_none=True),
"count": len(products),
"products": products,
}
)
return json.dumps({"status": "success", "results": combined_results}, ensure_ascii=False, cls=DecimalEncoder)
except Exception as e:
logger.error(f"Error in Multi-Search data_retrieval_tool: {e}")
return json.dumps({"status": "error", "message": str(e)})
async def _execute_single_search(db: StarRocksConnection, item: SearchItem) -> list[dict]:
"""Thực thi một search query đơn lẻ (Async)."""
try:
logger.info(f"🔧 [DEBUG] _execute_single_search STARTED for query: {item.query[:50] if item.query else 'None'}")
# ⏱️ Timer: Build query (bao gồm embedding nếu có)
query_build_start = time.time()
logger.info("🔧 [DEBUG] Calling build_starrocks_query()")
sql = await build_starrocks_query(item)
query_build_time = (time.time() - query_build_start) * 1000 # Convert to ms
logger.info(f"🔧 [DEBUG] SQL query built, length: {len(sql)}")
logger.info(f"⏱️ [TIMER] Query Build Time (bao gồm embedding): {query_build_time:.2f}ms")
# ⏱️ Timer: Execute DB query
db_start = time.time()
logger.info("🔧 [DEBUG] Calling db.execute_query_async()")
products = await db.execute_query_async(sql)
db_time = (time.time() - db_start) * 1000 # Convert to ms
logger.info(f"🔧 [DEBUG] Query executed, got {len(products)} products")
logger.info(f"⏱️ [TIMER] DB Query Execution Time: {db_time:.2f}ms")
logger.info(f"⏱️ [TIMER] Total Time (Build + DB): {query_build_time + db_time:.2f}ms")
return _format_product_results(products)
except Exception as e:
logger.error(f"Single search error for item {item}: {e}")
return []
def _format_product_results(products: list[dict]) -> list[dict]:
"""Lọc và format kết quả trả về cho Agent."""
allowed_fields = {
"internal_ref_code",
"description_text_full",
}
return [{k: v for k, v in p.items() if k in allowed_fields} for p in products[:5]]
This diff is collapsed.
......@@ -7,12 +7,14 @@ Router chỉ chứa định nghĩa API, logic nằm ở controller.
import logging
from fastapi import APIRouter, BackgroundTasks, HTTPException
from opentelemetry import trace
from agent.controller import chat_controller
from agent.models import QueryRequest
from config import DEFAULT_MODEL
logger = logging.getLogger(__name__)
tracer = trace.get_tracer(__name__)
router = APIRouter()
......@@ -25,6 +27,14 @@ async def fashion_qa_chat(req: QueryRequest, background_tasks: BackgroundTasks):
logger.info(f"📥 [Incoming Query - NonStream] User: {user_id} | Query: {req.user_query}")
# Get current span để add logs VÀO JAEGER UI
span = trace.get_current_span()
span.set_attribute("user.id", user_id)
span.set_attribute("chat.user_query", req.user_query)
span.add_event(
"📥 User query received", attributes={"user_id": user_id, "query": req.user_query, "timestamp": "incoming"}
)
try:
# Gọi controller để xử lý logic (Non-streaming)
result = await chat_controller(
......@@ -35,7 +45,22 @@ async def fashion_qa_chat(req: QueryRequest, background_tasks: BackgroundTasks):
images=req.images,
)
# Log chi tiết response
logger.info(f"📤 [Outgoing Response - NonStream] User: {user_id}")
logger.info(f"💬 AI Response: {result['ai_response']}")
logger.info(f"🛍️ Product IDs: {result.get('product_ids', [])}")
# Add to span (hiển thị trong Jaeger UI)
span.set_attribute("chat.ai_response", result["ai_response"][:200]) # Giới hạn 200 ký tự
span.set_attribute("chat.product_count", len(result.get("product_ids", [])))
span.add_event(
"💬 AI response generated",
attributes={
"ai_response_preview": result["ai_response"][:100],
"product_count": len(result.get("product_ids", [])),
"product_ids": str(result.get("product_ids", [])[:5]), # First 5 IDs
},
)
return {
"status": "success",
......@@ -45,56 +70,3 @@ async def fashion_qa_chat(req: QueryRequest, background_tasks: BackgroundTasks):
except Exception as e:
logger.error(f"Error in fashion_qa_chat: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
# ====================== FASHION Q&A CHAT API ======================
# @router.post("/stream/chat", summary="Fashion Q&A Chat with Streaming Response")
# async def fashion_qa_chat_stream(req: QueryRequest, request: Request):
# """
# Endpoint duy nhất cho việc chat với Fashion Agent.
# """
# # Trích xuất user_id từ request (auth middleware)
# user_id = getattr(request.state, "user_id", None) or req.user_id or "default_user"
# logger.info(f"📥 [Incoming Query] User: {user_id} | Query: {req.query}")
# try:
# # Gọi controller để xử lý logic và nhận generator stream
# # Note: Vì chat_controller có decorator @observe(), cần await để unwrap
# generator: AsyncGenerator[str, None] = chat_controller(
# query=req.query,
# user_id=user_id,
# model_name=DEFAULT_MODEL,
# conversation_id=req.conversation_id,
# images=req.images,
# )
# async def logging_generator(gen: AsyncGenerator[str, None]):
# full_response_log = ""
# first_chunk = True
# try:
# async for chunk in gen:
# if first_chunk:
# logger.info("🚀 [Stream Started] First chunk received")
# first_chunk = False
# full_response_log += chunk
# yield chunk
# except Exception as e:
# logger.error(f"❌ [Stream Error] {e}")
# yield f"data: {json.dumps({'error': str(e)})}\n\n"
# logger.info(f"📤 [Outgoing Response Stream Finished] Total Chunks Length: {len(full_response_log)}")
# return StreamingResponse(
# logging_generator(generator),
# media_type="text/event-stream",
# headers={
# "Cache-Control": "no-cache",
# "Connection": "keep-alive",
# "X-Accel-Buffering": "no",
# },
# )
# except Exception as e:
# logger.error(f"Error in fashion_qa_chat: {e}", exc_info=True)
# raise HTTPException(status_code=500, detail=str(e)) from e
from fastapi import APIRouter, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import List, Optional
import time
import random
router = APIRouter()
# --- MODELS ---
class MockQueryRequest(BaseModel):
user_query: str
user_id: Optional[str] = "test_user"
session_id: Optional[str] = None
class MockDBRequest(BaseModel):
vector: Optional[List[float]] = None
top_k: int = 10
# --- MOCK DATA ---
MOCK_PRODUCTS = [
{
"internal_ref_code": "8TE24W001",
"description_text_full": "Áo phông nam cotton thoáng mát, form regular fit.",
"sale_price": 199000.0,
"original_price": 299000.0,
"discount_amount": 100000.0,
"max_score": 0.89
},
{
"internal_ref_code": "8JE24W002",
"description_text_full": "Quần Jeans nam ống đứng, chất liệu denim co giãn.",
"sale_price": 399000.0,
"original_price": 499000.0,
"discount_amount": 100000.0,
"max_score": 0.85
},
{
"internal_ref_code": "8SK24W003",
"description_text_full": "Váy liền thân nữ dáng xòe, họa tiết hoa nhí.",
"sale_price": 350000.0,
"original_price": 450000.0,
"discount_amount": 100000.0,
"max_score": 0.82
}
]
# --- ENDPOINTS ---
@router.post("/mock/agent/chat", summary="Mock LLM Chat Endpoint (Fast)")
async def mock_chat(req: MockQueryRequest):
"""
Giả lập phản hồi của Chatbot (bỏ qua LLM & DB thật).
Dùng để test tải (Load Test) xem server chịu được bao nhiêu request/s.
"""
# Simulate slight processing time (10ms - 50ms)
time.sleep(random.uniform(0.01, 0.05))
return {
"status": "success",
"user_query": req.user_query,
"ai_response": {
"content": f"Đây là câu trả lời giả lập cho: '{req.user_query}'. Hệ thống đang hoạt động tốt!",
"role": "assistant"
},
"product_ids": ["8TE24W001", "8JE24W002"],
"processing_time": "0.03s"
}
@router.post("/mock/db/search", summary="Mock StarRocks Vector Search")
async def mock_db_search(req: MockDBRequest):
"""
Giả lập truy vấn Vector Database (StarRocks).
Input: Vector embedding (nếu có).
Output: Danh sách sản phẩm mock giống cấu trúc SQL thật.
"""
# Simulate DB Latency (e.g., 50ms - 100ms)
time.sleep(random.uniform(0.05, 0.1))
# Hardcoded vector from user request (Just for checking format)
# [0.01643567718565464, -0.0008633101242594421, ...]
return {
"status": "success",
"total_hits": 1340,
"results": MOCK_PRODUCTS, # Return existing mock data
"query_vector_dim": len(req.vector) if req.vector else 0,
"note": "This is MOCK data from memory, no actual DB connection."
}
"""
Simple Langfuse Client Wrapper
Minimal setup using langfuse.langchain module
With async-aware batch export using asyncio
With propagate_attributes for proper user_id tracking
"""
import asyncio
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from langfuse import Langfuse, get_client
from langfuse import Langfuse, get_client, propagate_attributes
from langfuse.langchain import CallbackHandler
from config import (
......@@ -81,27 +82,60 @@ async def async_flush_langfuse():
logger.warning(f"⚠️ Async flush failed: {e}")
def get_callback_handler(trace_id: str | None = None, **trace_kwargs) -> CallbackHandler | None:
def get_callback_handler(
trace_id: str | None = None,
user_id: str | None = None,
session_id: str | None = None,
tags: list[str] | None = None,
**trace_kwargs,
) -> CallbackHandler | None:
"""
Get CallbackHandler with unique trace context.
Args:
trace_id: Optional unique trace ID. If not provided, Langfuse will auto-generate.
**trace_kwargs: Additional trace attributes (user_id, session_id, tags, etc.)
trace_id: Optional unique trace ID
user_id: User ID for grouping traces by user (NOT set here - use propagate_attributes instead)
session_id: Session ID for grouping traces by session/conversation
tags: List of tags for filtering traces
**trace_kwargs: Additional trace attributes
Returns:
CallbackHandler instance configured for a new trace
CallbackHandler instance + propagate_attributes context manager
Note:
Per Langfuse docs: use propagate_attributes(user_id=...) context manager
to properly set user_id across all observations in the trace.
This makes user_id appear as a filterable field in Langfuse UI.
"""
try:
# Create handler with optional trace context
# According to Langfuse docs v3: Each CallbackHandler() call creates a NEW trace
handler = CallbackHandler()
# If trace metadata provided, we'll rely on runtime config to set them
# (trace_id, user_id, session_id should be passed via config metadata)
if not _langfuse_client:
logger.warning("⚠️ Langfuse client not initialized")
return None
logger.debug(f"✅ Langfuse CallbackHandler created (trace_id={trace_id or 'auto'})")
handler = CallbackHandler()
logger.debug("✅ Langfuse CallbackHandler created")
return handler
except Exception as e:
logger.warning(f"⚠️ CallbackHandler error: {e}")
return None
@contextmanager
def langfuse_trace_context(user_id: str | None = None, session_id: str | None = None, tags: list[str] | None = None):
"""
Context manager to propagate user_id, session_id, tags to all observations.
Usage:
with langfuse_trace_context(user_id="user_123", session_id="session_456"):
# All observations created here will have these attributes
await invoke_chain()
"""
attrs = {}
if user_id:
attrs["user_id"] = user_id
if session_id:
attrs["session_id"] = session_id
# Tags are set via metadata, not propagate_attributes
with propagate_attributes(**attrs):
yield
......@@ -37,6 +37,11 @@ __all__ = [
"MONGODB_DB_NAME",
"MONGODB_URI",
"OPENAI_API_KEY",
"OTEL_EXPORTER_JAEGER_AGENT_HOST",
"OTEL_EXPORTER_JAEGER_AGENT_PORT",
"OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES",
"OTEL_SERVICE_NAME",
"OTEL_TRACES_EXPORTER",
"PORT",
"REDIS_HOST",
"REDIS_PASSWORD",
......@@ -117,3 +122,9 @@ STARROCKS_DB: str | None = os.getenv("STARROCKS_DB")
# Placeholder for backward compatibility if needed
AI_MODEL_NAME = DEFAULT_MODEL
# ====================== OPENTELEMETRY CONFIGURATION ======================
OTEL_EXPORTER_JAEGER_AGENT_HOST = os.getenv("OTEL_EXPORTER_JAEGER_AGENT_HOST")
OTEL_EXPORTER_JAEGER_AGENT_PORT = os.getenv("OTEL_EXPORTER_JAEGER_AGENT_PORT")
OTEL_SERVICE_NAME = os.getenv("OTEL_SERVICE_NAME")
OTEL_TRACES_EXPORTER = os.getenv("OTEL_TRACES_EXPORTER")
OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES = os.getenv("OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES")
import asyncio
import platform
# Fix Windows ProactorEventLoop issue với psycopg
if platform.system() == "Windows":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from psycopg_pool import AsyncConnectionPool
async def test():
pool = AsyncConnectionPool("", open=False)
saver = AsyncPostgresSaver(pool)
print(f"Attributes with 'pool': {[a for a in dir(saver) if 'pool' in a.lower()]}")
await pool.close()
if __name__ == "__main__":
asyncio.run(test())
import asyncio
import logging
from common.starrocks_connection import StarRocksConnection
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def main():
try:
db = StarRocksConnection()
print("🔌 Connecting to StarRocks...")
# Get Create Table Statement
table_name = "shared_source.magento_product_dimension_with_text_embedding"
res = await db.execute_query_async(f"SHOW CREATE TABLE {table_name}")
if res:
print("\n=== RAW RESULT KEYS ===")
print(res[0].keys())
print(res[0])
else:
print("❌ Could not get table info.")
except Exception as e:
print(f"❌ Error: {e}")
if __name__ == "__main__":
asyncio.run(main())
import asyncio
import os
import sys
# Ensure backend directory is in python path
current_dir = os.path.dirname(os.path.abspath(__file__))
if current_dir not in sys.path:
sys.path.append(current_dir)
from agent.tools.product_search_helpers import build_starrocks_query
class Params:
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
async def main():
# Params for "tìm cho tao áo side m phù hợp để đi chơi"
text = "tìm cho tao áo side m phù hợp để đi chơi"
# Mocking what the extraction layer might produce
params = Params(
query=text,
size_scale="M",
keywords="áo side m đi chơi"
)
print(f"Generating query for: {text}")
try:
sql = await build_starrocks_query(params)
print("Successfully generated query.")
print("Check d:\\cnf\\chatbot_canifa\\backend\\embedding.txt")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
asyncio.run(main())
This diff is collapsed.
This diff is collapsed.
Oke bro, tao hiểu rõ ý bro rồi! Để tao tóm gọn lại toàn bộ kế hoạch bằng tiếng Việt cho bro nghe rõ ràng nhé.
Hiểu đúng ý của bro
Bro đang muốn chia làm hai giai đoạn rõ ràng. Giai đoạn một là sửa code chatbot, bỏ phần OpenTelemetry instrumentation hiện tại đi và chuyển sang dùng Langfuse SDK thuần túy để trace LLM calls. Giai đoạn hai là setup infrastructure monitoring hoàn toàn độc lập, dùng cAdvisor để thu thập container metrics từ Docker, kết hợp với Prometheus và Grafana để visualize.
GIAI ĐOẠN 1: Chuyển sang Langfuse SDK thuần túy
Bước đầu tiên là bro cần cleanup code hiện tại. Hiện tại trong server.py của bro đang có đống code setup OpenTelemetry với TracerProvider, OTLPSpanExporter, LoggingInstrumentor. Tất cả những thứ này sẽ được remove sạch. Bro sẽ giữ lại file server đơn giản, chỉ focus vào khởi động FastAPI app mà thôi.
Bước tiếp theo là enable Langfuse đầy đủ. Hiện tại trong code của bro, Langfuse đang bị comment out ở nhiều chỗ với note "TẮT TẠM - Tránh rate limit". Bro cần uncomment tất cả những đoạn này. Cụ thể trong file langfuse_client.py đã có sẵn hàm initialize_langfuse() và get_callback_handler() rồi, bro chỉ cần gọi nó khi app startup.
Trong controller.py, bro cần sửa hàm _prepare_execution_context() để attach Langfuse CallbackHandler vào RunnableConfig. Thay vì dùng OpenTelemetry span để track, bro sẽ dùng Langfuse callback để tự động capture tất cả LangChain runs, bao gồm LLM calls, tool calls, và toàn bộ conversation flow.
Điều quan trọng là bro cần set metadata cho Langfuse trace. Khi tạo CallbackHandler, bro nên truyền vào user_id, session_id, và các custom tags để sau này filter dễ dàng trong Langfuse dashboard. Ví dụ như trace_id có thể là conversation_id, user_id để group theo user, tags để đánh dấu production hay staging environment.
Cuối cùng là test để đảm bảo Langfuse đang hoạt động. Bro gửi vài requests test, sau đó vào Langfuse dashboard kiểm tra xem có traces xuất hiện không, có đủ thông tin về LLM model, tokens, latency không. Nếu thấy đủ data rồi thì giai đoạn một hoàn tất.
GIAI ĐOẠN 2: Setup Container Monitoring với cAdvisor
Giai đoạn này hoàn toàn độc lập với code, bro chỉ cần làm việc với Docker và configuration files.
Đầu tiên bro cần tạo file docker-compose.monitoring.yml riêng cho monitoring stack. File này sẽ define ba services: cAdvisor, Prometheus, và Grafana. Lý do tách riêng là để bro có thể bật tắt monitoring stack độc lập mà không ảnh hưởng đến chatbot service chính.
Service cAdvisor trong docker-compose sẽ mount nhiều thứ từ host vào container. Bro cần mount /var/run/docker.sock để cAdvisor đọc được Docker daemon, mount /sys để đọc cgroups metrics, mount /var/lib/docker để đọc thông tin containers. Expose port 8080 để Prometheus có thể scrape metrics. Quan trọng là cAdvisor container phải chạy với privileged mode hoặc ít nhất có quyền đọc được những đường dẫn system này.
Tiếp theo là config Prometheus. Bro tạo file prometheus.yml để define scrape configs. Trong đó bro add một job tên là cadvisor với target là cadvisor:8080, scrape interval khoảng 15 giây là hợp lý. Prometheus sẽ tự động pull metrics từ endpoint này theo chu kỳ đã set.
Service Prometheus trong docker-compose sẽ mount file prometheus.yml vào /etc/prometheus/prometheus.yml, và mount volume để persist data vào /prometheus. Expose port 9090 để bro có thể access Prometheus UI kiểm tra targets đang healthy không.
Cuối cùng là Grafana. Service Grafana đơn giản nhất, chỉ cần expose port 3000 và mount volume để lưu dashboards. Khi start lần đầu, bro login vào Grafana với admin/admin, sau đó add Prometheus làm datasource với URL là http://prometheus:9090. Tiếp theo là import dashboard, bro có thể dùng dashboard có sẵn từ Grafana community như dashboard ID 193 cho Docker monitoring, hoặc tự tạo dashboard custom theo nhu cầu.
Sau khi setup xong, bro chạy docker-compose -f docker-compose.monitoring.yml up -d để start monitoring stack. Kiểm tra cAdvisor tại localhost:8080 xem có metrics không, kiểm tra Prometheus tại localhost:9090 xem đã scrape được từ cAdvisor chưa, và cuối cùng vào Grafana tại localhost:3000 để xem dashboard có data không.
Lưu ý quan trọng
Điều bro cần nhớ là hai hệ thống này hoạt động song song và hoàn toàn độc lập. Langfuse track application metrics như LLM calls, tokens, conversations. cAdvisor track infrastructure metrics như CPU, RAM, network của containers. Chúng không giao nhau, không conflict, và bro sẽ có một cái nhìn toàn diện về cả application lẫn infrastructure.
Một điểm nữa là khi stress test, bro sẽ mở hai dashboard cùng lúc. Langfuse dashboard để xem throughput của LLM calls, average latency, token consumption, cost estimation. Grafana dashboard để xem container có bị overload không, memory có tăng đột biến không, CPU có spike không. Kết hợp hai nguồn data này bro sẽ biết chính xác bottleneck nằm ở đâu, là do LLM API slow hay do container thiếu resources.
Tóm lại
Giai đoạn một là refactor code để dùng Langfuse SDK thay OpenTelemetry, focus vào LLM observability. Giai đoạn hai là setup Docker monitoring stack với cAdvisor, Prometheus, Grafana, hoàn toàn không động vào code. Hai giai đoạn này theo thứ tự, làm xong một mới qua hai, và cuối cùng bro có một hệ thống observability hoàn chỉnh cho cả application và infrastructure.
Rõ chưa bro? Tao có thiếu sót chi tiết nào không?
\ No newline at end of file
This diff is collapsed.
......@@ -65,6 +65,7 @@ openai==2.13.0
opentelemetry-api==1.39.1
opentelemetry-exporter-otlp-proto-common==1.39.1
opentelemetry-exporter-otlp-proto-http==1.39.1
opentelemetry-instrumentation-logging==0.50b1
opentelemetry-proto==1.39.1
opentelemetry-sdk==1.39.1
opentelemetry-semantic-conventions==0.60b1
......
......@@ -6,5 +6,3 @@ uvicorn server:app --host 0.0.0.0 --port 5000 --reload
uvicorn server:app --host 0.0.0.0 --port 5000
import asyncio
import os # Có os để mount static
import os
import platform
# ==========================================
# 🛑 QUAN TRỌNG: FIX LỖI WINDOWS Ở ĐÂY 🛑
# Phải chạy dòng này TRƯỚC KHI import bất kỳ thư viện nào khác
# ==========================================
if platform.system() == "Windows":
print("🔧 Windows detected: Applying SelectorEventLoopPolicy globally...")
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
# Sau khi fix xong mới import tiếp
import logging
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles # Import cái này để mount HTML
from fastapi.staticfiles import StaticFiles
# Updated APIs (Import sau cùng để DB nhận cấu hình fix ở trên)
from api.chatbot_route import router as chatbot_router
from api.conservation_route import router as conservation_router
from common.langfuse_client import initialize_langfuse
from config import PORT
# Configure Logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", handlers=[logging.StreamHandler()]
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[logging.StreamHandler()],
)
logger = logging.getLogger(__name__)
# ==========================================
# 🔥 LANGFUSE INITIALIZATION
# ==========================================
if initialize_langfuse():
logger.info("✅ Langfuse initialized successfully")
else:
logger.warning("⚠️ Langfuse initialization failed or keys missing")
app = FastAPI(
title="Contract AI Service",
description="API for Contract AI Service",
version="1.0.0",
)
print("✅ Clerk Authentication middleware DISABLED (for testing)")
logger.info("✅ Clerk Authentication middleware DISABLED (for testing)")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
......@@ -45,9 +50,18 @@ app.add_middleware(
allow_headers=["*"],
)
app.include_router(conservation_router)
app.include_router(conservation_router)
app.include_router(chatbot_router)
#
# --- MOCK API FOR LOAD TESTING ---
try:
from api.mock_api_route import router as mock_router
app.include_router(mock_router)
print("✅ Mock API Router mounted at /mock")
except ImportError:
print("⚠️ Mock Router not found, skipping...")
# ==========================================
# 🟢 ĐOẠN MOUNT STATIC HTML CỦA BRO ĐÂY 🟢
......@@ -65,9 +79,9 @@ except Exception as e:
from fastapi.responses import RedirectResponse
@app.get("/")
async def root():
# Tự động nhảy sang trang Chatbot luôn
return RedirectResponse(url="/static/index.html")
......
......@@ -441,7 +441,7 @@
<div class="header">
<h2>🤖 Canifa AI Chat</h2>
<div class="config-area">
<input type="text" id="userId" placeholder="User ID" value="test_user_001">
<input type="text" id="userId" placeholder="User ID" value="test_user_009">
<button onclick="loadHistory(true)">↻ History</button>
<button onclick="clearUI()" style="background: #d32f2f;">✗ Clear UI</button>
</div>
......@@ -682,7 +682,7 @@
const img = document.createElement('img');
img.src = product.thumbnail_image_url || 'https://via.placeholder.com/200';
img.alt = product.name;
img.onerror = function() { this.src = 'https://via.placeholder.com/200?text=No+Image'; };
img.onerror = function () { this.src = 'https://via.placeholder.com/200?text=No+Image'; };
card.appendChild(img);
// Product body
......
version: '3.8'
services:
n8n:
image: n8nio/n8n:latest
container_name: n8n_local
ports:
- "5678:5678"
volumes:
- n8n_data:/home/node/.n8n
environment:
- N8N_HOST=localhost
- N8N_PORT=5678
- N8N_PROTOCOL=http
- WEBHOOK_URL=http://localhost:5678/
- GENERIC_TIMEZONE=Asia/Ho_Chi_Minh
restart: unless-stopped
volumes:
n8n_data:
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment