Commit 178108a8 authored by Vũ Hoàng Anh's avatar Vũ Hoàng Anh

feat: full Langfuse tracing A-Z with @observe + get_client() v3.11 API

parent 56268101
...@@ -13,11 +13,10 @@ import uuid ...@@ -13,11 +13,10 @@ import uuid
from fastapi import BackgroundTasks from fastapi import BackgroundTasks
from langchain_core.messages import AIMessage, HumanMessage from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.runnables import RunnableConfig from langchain_core.runnables import RunnableConfig
from langfuse import Langfuse, propagate_attributes from langfuse import get_client as get_langfuse, observe, propagate_attributes
from common.cache import redis_cache from common.cache import redis_cache
from common.conversation_manager import get_conversation_manager from common.conversation_manager import get_conversation_manager
from common.langfuse_client import get_callback_handler
from config import DEFAULT_MODEL, REDIS_CACHE_TURN_ON from config import DEFAULT_MODEL, REDIS_CACHE_TURN_ON
from .controller_helpers import ( from .controller_helpers import (
...@@ -32,7 +31,7 @@ from .streaming_callback import ProductIDStreamingCallback ...@@ -32,7 +31,7 @@ from .streaming_callback import ProductIDStreamingCallback
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@observe(name="chat-request")
async def chat_controller( async def chat_controller(
query: str, query: str,
identity_key: str, identity_key: str,
...@@ -113,8 +112,27 @@ async def chat_controller( ...@@ -113,8 +112,27 @@ async def chat_controller(
} }
run_id = str(uuid.uuid4()) run_id = str(uuid.uuid4())
trace_id = Langfuse.create_trace_id() session_id = f"{identity_key}-{run_id[:8]}"
langfuse_handler = get_callback_handler()
# Set Langfuse trace metadata (nests under @observe parent)
langfuse = get_langfuse()
langfuse.update_current_trace(
user_id=identity_key,
session_id=session_id,
tags=["chatbot", "user:authenticated" if is_authenticated else "user:anonymous"],
metadata={
"device_id": device_id,
"customer_id": identity_key if is_authenticated else None,
"model": model_name,
},
)
# Get handler from context → nested under same trace as @observe
try:
langfuse_handler = langfuse.get_current_langchain_handler()
except Exception:
langfuse_handler = None
logger.warning("⚠️ Could not get Langfuse langchain handler")
streaming_callback = ProductIDStreamingCallback() streaming_callback = ProductIDStreamingCallback()
...@@ -123,20 +141,14 @@ async def chat_controller( ...@@ -123,20 +141,14 @@ async def chat_controller(
"user_id": identity_key, "user_id": identity_key,
"transient_images": images or [], "transient_images": images or [],
"run_id": run_id, "run_id": run_id,
"trace_id": trace_id,
}, },
run_id=run_id, run_id=run_id,
metadata={ metadata={
"trace_id": trace_id, "session_id": session_id,
"session_id": f"{identity_key}-{run_id[:8]}",
"device_id": device_id, "device_id": device_id,
"customer_id": identity_key if is_authenticated else None,
"langfuse_tags": ["chatbot", "user:authenticated" if is_authenticated else "user:anonymous"],
}, },
callbacks=[langfuse_handler, streaming_callback] if langfuse_handler else [streaming_callback], callbacks=[langfuse_handler, streaming_callback] if langfuse_handler else [streaming_callback],
) )
session_id = f"{identity_key}-{run_id[:8]}"
logger.info("🌊 Starting LLM streaming...") logger.info("🌊 Starting LLM streaming...")
...@@ -166,40 +178,39 @@ async def chat_controller( ...@@ -166,40 +178,39 @@ async def chat_controller(
early_response = False early_response = False
with propagate_attributes(user_id=identity_key, session_id=session_id): stream_task = asyncio.create_task(consume_events())
stream_task = asyncio.create_task(consume_events())
if return_user_insight: if return_user_insight:
# Dev mode: Đợi stream xong để có đầy đủ user_insight # Dev mode: Đợi stream xong để có đầy đủ user_insight
logger.info("🔍 [DEV MODE] Waiting for full stream...") logger.info("🔍 [DEV MODE] Waiting for full stream...")
await stream_task await stream_task
ai_text_response = streaming_callback.ai_response_text ai_text_response = streaming_callback.ai_response_text
final_product_ids = streaming_callback.product_skus final_product_ids = streaming_callback.product_skus
logger.info("✅ Stream completed in dev mode") logger.info("✅ Stream completed in dev mode")
else: else:
# Prod mode: Return ngay khi có product_ids # Prod mode: Return ngay khi có product_ids
wait_task = asyncio.create_task(streaming_callback.product_found_event.wait()) wait_task = asyncio.create_task(streaming_callback.product_found_event.wait())
done, pending = await asyncio.wait( done, pending = await asyncio.wait(
[stream_task, wait_task], [stream_task, wait_task],
return_when=asyncio.FIRST_COMPLETED, return_when=asyncio.FIRST_COMPLETED,
timeout=30.0, timeout=30.0,
) )
# bắn trả về ngay khi có product_ids hoặc timeout # bắn trả về ngay khi có product_ids hoặc timeout
ai_text_response = streaming_callback.ai_response_text ai_text_response = streaming_callback.ai_response_text
final_product_ids = streaming_callback.product_skus final_product_ids = streaming_callback.product_skus
elapsed = time.time() - start_time elapsed = time.time() - start_time
if streaming_callback.product_ids_found: if streaming_callback.product_ids_found:
logger.info(f"⚡ Response ready at t={elapsed:.2f}s (early return)") logger.info(f"⚡ Response ready at t={elapsed:.2f}s (early return)")
early_response = True early_response = True
else: else:
logger.info(f"✅ Stream completed at t={elapsed:.2f}s (no products)") logger.info(f"✅ Stream completed at t={elapsed:.2f}s (no products)")
# Cleanup (never cancel stream_task; we still need it) # Cleanup (never cancel stream_task; we still need it)
if wait_task in pending: if wait_task in pending:
wait_task.cancel() wait_task.cancel()
try: try:
# Ensure stream completes to get all tool messages (skip if early response) # Ensure stream completes to get all tool messages (skip if early response)
...@@ -297,7 +308,7 @@ async def chat_controller( ...@@ -297,7 +308,7 @@ async def chat_controller(
stream_task=stream_task, stream_task=stream_task,
streaming_callback=streaming_callback, streaming_callback=streaming_callback,
identity_key=identity_key, identity_key=identity_key,
trace_id=trace_id, trace_id=langfuse.get_current_trace_id() if langfuse else run_id,
) )
# ====================== BUILD RESPONSE ====================== # ====================== BUILD RESPONSE ======================
......
...@@ -159,22 +159,26 @@ async def _execute_single_search( ...@@ -159,22 +159,26 @@ async def _execute_single_search(
(first_p.get("description_text") or "")[:100], (first_p.get("description_text") or "")[:100],
) )
from langfuse import langfuse_context from langfuse import get_client as get_langfuse
langfuse_context.update_current_observation( try:
input={ lf = get_langfuse()
"description": item.description[:200] if item.description else None, lf.update_current_observation(
"product_name": item.product_name, input={
"gender": item.gender_by_product, "description": item.description[:200] if item.description else None,
"age": item.age_by_product, "product_name": item.product_name,
"product_line_vn": item.product_line_vn, "gender": item.gender_by_product,
"magento_ref_code": item.magento_ref_code, "age": item.age_by_product,
}, "product_line_vn": item.product_line_vn,
output={ "magento_ref_code": item.magento_ref_code,
"raw_count": len(products), },
"build_ms": round(query_build_time, 2), output={
"db_ms": round(db_time, 2), "raw_count": len(products),
}, "build_ms": round(query_build_time, 2),
) "db_ms": round(db_time, 2),
},
)
except Exception:
pass # Don't break if Langfuse not available
return format_product_results(products), {"fallback_used": False} return format_product_results(products), {"fallback_used": False}
except Exception as e: except Exception as e:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment