Commit 2fc434b5 authored by Vũ Hoàng Anh's avatar Vũ Hoàng Anh

chore: simplify Docker - single Dockerfile + compose on port 5004 with hot...

chore: simplify Docker - single Dockerfile + compose on port 5004 with hot reload, remove dev/prod split
parent ac6be50f
FROM python:3.11-slim
WORKDIR /app
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 5004
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "5004", "--reload"]
# ============================================================
# DOCKERFILE.DEV - Local Development (Hot Reload + Cache)
# ============================================================
# Sử dụng Python 3.11 Slim để tối ưu dung lượng
FROM python:3.11-slim
# Thiết lập thư mục làm việc
WORKDIR /app
# Thiết lập biến môi trường
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV ENV=development
# Copy requirements.txt trước để tận dụng Docker cache
COPY requirements.txt .
# Cài đặt thư viện Python (Docker layer cache)
RUN pip install -r requirements.txt && pip install watchdog[watchmedo]
# Copy toàn bộ source code vào image
COPY . .
# Expose port 5000
EXPOSE 5000
# Health check (optional)
HEALTHCHECK --interval=10s --timeout=5s --start-period=5s --retries=2 \
CMD python -c "import requests; requests.get('http://localhost:5000/docs')" || exit 1
CMD ["gunicorn", "--workers", "4", "--worker-class", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:5000", "--timeout", "120", "--reload", "server:app"]
FROM python:3.11-slim
WORKDIR /app
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV ENV=development
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
# Expose port 5000 (Port chạy server)
EXPOSE 5000
# Tự động tính số worker = (Số Core * 2) + 1 để tận dụng tối đa CPU
CMD gunicorn server:app --workers $(( 2 * $(nproc) + 1 )) --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:5000 --timeout 60
"""
Fashion Q&A Agent Router
FastAPI endpoints cho Fashion Q&A Agent service.
Router chỉ chứa định nghĩa API, logic nằm ở controller.
Note: Rate limit check đã được xử lý trong middleware (CanifaAuthMiddleware)
"""
import logging
from fastapi import APIRouter, BackgroundTasks, Request
from fastapi.responses import JSONResponse
from agent.controller import chat_controller
from agent.models import QueryRequest
from common.cache import redis_cache
from common.message_limit import message_limit_service
from common.rate_limit import rate_limit_service
from config import DEFAULT_MODEL
logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/api/agent/chat", summary="Fashion Q&A Chat (Non-streaming)")
@rate_limit_service.limiter.limit("20/minute")
async def fashion_qa_chat(request: Request, req: QueryRequest, background_tasks: BackgroundTasks):
"""
Endpoint chat không stream - trả về response JSON đầy đủ một lần.
Note: Rate limit đã được check trong middleware.
"""
user_id = getattr(request.state, "user_id", None)
device_id = getattr(request.state, "device_id", "unknown")
is_authenticated = getattr(request.state, "is_authenticated", False)
identity_id = user_id if is_authenticated else device_id
logger.info(f"📥 [Incoming Query - NonStream] User: {identity_id} | Query: {req.user_query}")
try:
# Call controller để xử lý chat
result = await chat_controller(
query=req.user_query,
background_tasks=background_tasks,
model_name=DEFAULT_MODEL,
images=req.images,
identity_key=str(identity_id), # Key lưu history
is_authenticated=is_authenticated, # Pass auth status for Langfuse metadata
device_id=device_id, # Luôn truyền device_id để lưu vào Langfuse metadata
)
logger.info(f"💬 AI Response: {result['ai_response']}")
logger.info(f"🛍️ Product IDs: {result.get('product_ids', [])}")
# Increment message count SAU KHI chat thành công
usage_info = await message_limit_service.increment(
identity_key=identity_id,
is_authenticated=is_authenticated,
)
return {
"status": "success",
"ai_response": result["ai_response"],
"product_ids": result.get("product_ids", []),
"limit_info": {
"limit": usage_info["limit"],
"used": usage_info["used"],
"remaining": usage_info["remaining"],
"reset_seconds": usage_info.get("reset_seconds"),
},
}
except Exception as e:
logger.error(f"Error in fashion_qa_chat: {e}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"status": "error",
"error_code": "SYSTEM_ERROR",
"message": "Oops 😥 Hiện Canifa-AI chưa thể xử lý yêu cầu của bạn ngay lúc này, vui lòng quay lại trong giây lát.",
},
)
@router.post("/api/agent/chat-dev", summary="Fashion Q&A Chat (Dev - includes user_insight)")
@rate_limit_service.limiter.limit("50/minute")
async def fashion_qa_chat_dev(request: Request, req: QueryRequest, background_tasks: BackgroundTasks):
"""
Endpoint chat dành cho DEV - trả về đầy đủ user_insight.
Note: Rate limit đã được check trong middleware.
"""
user_id = getattr(request.state, "user_id", None)
device_id = getattr(request.state, "device_id", "unknown")
is_authenticated = getattr(request.state, "is_authenticated", False)
identity_id = user_id if is_authenticated else device_id
logger.info(f"📥 [Incoming Query - Dev] User: {identity_id} | Query: {req.user_query}")
try:
# DEV MODE: Return ai_response + products immediately
result = await chat_controller(
query=req.user_query,
background_tasks=background_tasks,
model_name=DEFAULT_MODEL,
images=req.images,
identity_key=str(identity_id),
return_user_insight=False,
is_authenticated=is_authenticated, # Pass auth status for Langfuse metadata
device_id=device_id, # Luôn truyền device_id để lưu vào Langfuse metadata
)
usage_info = await message_limit_service.increment(
identity_key=identity_id,
is_authenticated=is_authenticated,
)
logger.warning(f"🔍 [DEBUG] usage_info = {usage_info}")
return {
"status": "success",
"ai_response": result["ai_response"],
"product_ids": result.get("product_ids", []),
"insight_status": "success" if result.get("user_insight") else "pending",
"user_insight": result.get("user_insight"),
"limit_info": {
"limit": usage_info["limit"],
"used": usage_info["used"],
"remaining": usage_info["remaining"],
"reset_seconds": usage_info.get("reset_seconds"),
},
}
except Exception as e:
logger.error(f"Error in fashion_qa_chat_dev: {e}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"status": "error",
"error_code": "SYSTEM_ERROR",
"message": "Oops 😥 Hiện Canifa-AI chưa thể xử lý yêu cầu của bạn ngay lúc này, vui lòng quay lại trong giây lát.",
},
)
@router.get("/api/agent/user-insight", summary="Get latest user_insight (Dev)")
@rate_limit_service.limiter.limit("120/minute")
async def get_user_insight(request: Request):
"""
Polling endpoint for dev UI to fetch latest user_insight from Redis.
"""
user_id = getattr(request.state, "user_id", None)
device_id = getattr(request.state, "device_id", "unknown")
is_authenticated = getattr(request.state, "is_authenticated", False)
identity_id = user_id if is_authenticated else device_id
try:
client = redis_cache.get_client()
if not client:
return {"status": "pending", "user_insight": None}
insight_key = f"identity_key_insight:{identity_id}"
user_insight = await client.get(insight_key)
if user_insight:
return {"status": "success", "user_insight": user_insight}
return {"status": "pending", "user_insight": None}
except Exception as e:
logger.error(f"Error in get_user_insight: {e}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"status": "error",
"error_code": "SYSTEM_ERROR",
"message": "Không thể tải user_insight lúc này.",
},
)
import asyncio
import json
import logging
import time
from fastapi import APIRouter, BackgroundTasks, HTTPException
from pydantic import BaseModel
from agent.tools.data_retrieval_tool import SearchItem, data_retrieval_tool
from agent.mock_controller import mock_chat_controller
logger = logging.getLogger(__name__)
router = APIRouter()
# --- HELPERS ---
async def retry_with_backoff(coro_fn, max_retries=3, backoff_factor=2):
"""Retry async function with exponential backoff"""
for attempt in range(max_retries):
try:
return await coro_fn()
except Exception as e:
if attempt == max_retries - 1:
raise
wait_time = backoff_factor**attempt
logger.warning(f"⚠️ Attempt {attempt + 1} failed: {e!s}, retrying in {wait_time}s...")
await asyncio.sleep(wait_time)
# --- MODELS ---
class MockQueryRequest(BaseModel):
user_query: str
user_id: str | None = "test_user"
session_id: str | None = None
images: list[str] | None = None
class MockDBRequest(BaseModel):
query: str | None = None
magento_ref_code: str | None = None
price_min: float | None = None
price_max: float | None = None
top_k: int = 10
class MockRetrieverRequest(BaseModel):
user_query: str
price_min: float | None = None
price_max: float | None = None
magento_ref_code: str | None = None
user_id: str | None = "test_user"
session_id: str | None = None
# --- MOCK LLM RESPONSES (không gọi OpenAI) ---
MOCK_AI_RESPONSES = [
"Dựa trên tìm kiếm của bạn, tôi tìm thấy các sản phẩm phù hợp với nhu cầu của bạn. Những mặt hàng này có chất lượng tốt và giá cả phải chăng.",
"Tôi gợi ý cho bạn những sản phẩm sau. Chúng đều là những lựa chọn phổ biến và nhận được đánh giá cao từ khách hàng.",
"Dựa trên tiêu chí tìm kiếm của bạn, đây là những sản phẩm tốt nhất mà tôi có thể giới thiệu.",
"Những sản phẩm này hoàn toàn phù hợp với yêu cầu của bạn. Hãy xem chi tiết để chọn sản phẩm yêu thích nhất.",
"Tôi đã tìm được các mặt hàng tuyệt vời cho bạn. Hãy kiểm tra chúng để tìm ra lựa chọn tốt nhất.",
]
# --- ENDPOINTS ---
@router.post("/api/mock/agent/chat", summary="Mock Agent Chat (Real Tools + Fake LLM)")
async def mock_chat(req: MockQueryRequest, background_tasks: BackgroundTasks):
"""
Mock Agent Chat using mock_chat_controller:
- ✅ Real embedding + vector search (data_retrieval_tool THẬT)
- ✅ Real products from StarRocks
- ❌ Fake LLM response (no OpenAI cost)
- Perfect for stress testing + end-to-end testing
"""
try:
logger.info(f"🚀 [Mock Agent Chat] Starting with query: {req.user_query}")
result = await mock_chat_controller(
query=req.user_query,
user_id=req.user_id or "test_user",
background_tasks=background_tasks,
images=req.images,
)
return {
"status": "success",
"user_query": req.user_query,
"user_id": req.user_id,
"session_id": req.session_id,
**result, # Include status, ai_response, product_ids, etc.
}
except Exception as e:
logger.error(f"❌ Error in mock agent chat: {e!s}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Mock Agent Chat Error: {e!s}")
@router.post("/api/mock/db/search", summary="Real Data Retrieval Tool (Agent Tool)")
async def mock_db_search(req: MockDBRequest):
"""
Dùng `data_retrieval_tool` THẬT từ Agent:
- Nếu có magento_ref_code → CODE SEARCH (không cần embedding)
- Nếu có query → HYDE SEMANTIC SEARCH (embedding + vector search)
- Lọc theo giá nếu có price_min/price_max
- Trả về sản phẩm thực từ StarRocks
Format input giống SearchItem của agent tool.
"""
try:
logger.info("📍 Data Retrieval Tool called")
start_time = time.time()
# Xây dựng SearchItem từ request - include all required fields
search_item = SearchItem(
query=req.query or "sản phẩm",
magento_ref_code=req.magento_ref_code,
price_min=req.price_min,
price_max=req.price_max,
action="search",
# Metadata fields - all required with None default
gender_by_product=None,
age_by_product=None,
product_name=None,
style=None,
master_color=None,
season=None,
material_group=None,
fitting=None,
form_neckline=None,
form_sleeve=None,
)
logger.info(f"🔧 Search params: {search_item.dict(exclude_none=True)}")
# Gọi data_retrieval_tool THẬT với retry
result_json = await retry_with_backoff(
lambda: data_retrieval_tool.ainvoke({"searches": [search_item]}), max_retries=3
)
result = json.loads(result_json)
elapsed_time = time.time() - start_time
logger.info(f"✅ Data Retrieval completed in {elapsed_time:.3f}s")
return {
"status": result.get("status", "success"),
"search_params": search_item.dict(exclude_none=True),
"total_results": len(result.get("results", [{}])[0].get("products", [])),
"products": result.get("results", [{}])[0].get("products", []),
"processing_time_ms": round(elapsed_time * 1000, 2),
"raw_result": result,
}
except Exception as e:
logger.error(f"❌ Error in DB search: {e!s}", exc_info=True)
raise HTTPException(status_code=500, detail=f"DB Search Error: {e!s}")
@router.post("/api/mock/retrieverdb", summary="Real Embedding + Real DB Vector Search")
@router.post("/api/mock/retriverdb", summary="Real Embedding + Real DB Vector Search (Legacy)")
async def mock_retriever_db(req: MockRetrieverRequest):
"""
API thực tế để test Retriever + DB Search (dùng agent tool):
- Lấy query từ user
- Embedding THẬT (gọi OpenAI embedding trong tool)
- Vector search THẬT trong StarRocks
- Trả về kết quả sản phẩm thực (bỏ qua LLM)
Dùng để test performance của embedding + vector search riêng biệt.
"""
try:
logger.info(f"📍 Retriever DB started: {req.user_query}")
start_time = time.time()
# Xây dựng SearchItem từ request - include all required fields
search_item = SearchItem(
query=req.user_query,
magento_ref_code=req.magento_ref_code,
price_min=req.price_min,
price_max=req.price_max,
action="search",
# Metadata fields - all required with None default
gender_by_product=None,
age_by_product=None,
product_name=None,
style=None,
master_color=None,
season=None,
material_group=None,
fitting=None,
form_neckline=None,
form_sleeve=None,
)
logger.info(f"🔧 Retriever params: {search_item.dict(exclude_none=True)}")
# Gọi data_retrieval_tool THẬT (embedding + vector search) với retry
result_json = await retry_with_backoff(
lambda: data_retrieval_tool.ainvoke({"searches": [search_item]}), max_retries=3
)
result = json.loads(result_json)
elapsed_time = time.time() - start_time
logger.info(f"✅ Retriever completed in {elapsed_time:.3f}s")
# Parse kết quả
search_results = result.get("results", [{}])[0]
products = search_results.get("products", [])
return {
"status": result.get("status", "success"),
"user_query": req.user_query,
"user_id": req.user_id,
"session_id": req.session_id,
"search_params": search_item.dict(exclude_none=True),
"total_results": len(products),
"products": products,
"processing_time_ms": round(elapsed_time * 1000, 2),
}
except Exception as e:
logger.error(f"❌ Error in retriever DB: {e!s}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Retriever DB Error: {e!s}")
from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel
import os
import re
from agent.graph import reset_graph
from common.cache import bump_prompt_version
router = APIRouter()
PROMPT_FILE_PATH = os.path.join(os.path.dirname(__file__), "../agent/system_prompt.txt")
# Allowed variables in prompt (single braces OK for these)
ALLOWED_VARIABLES = {"date_str"}
class PromptUpdateRequest(BaseModel):
content: str
def validate_prompt_braces(content: str) -> tuple[bool, list[str]]:
"""
Validate that all braces in prompt are properly escaped.
Returns (is_valid, list of problematic patterns)
"""
# Find all {word} patterns
single_brace_pattern = re.findall(r'\{([^{}]+)\}', content)
# Filter out allowed variables
problematic = [
var for var in single_brace_pattern
if var.strip() not in ALLOWED_VARIABLES and not var.startswith('{')
]
return len(problematic) == 0, problematic
from common.rate_limit import rate_limit_service
@router.get("/api/agent/system-prompt")
async def get_system_prompt_content(request: Request):
"""Get current system prompt content"""
try:
if os.path.exists(PROMPT_FILE_PATH):
with open(PROMPT_FILE_PATH, "r", encoding="utf-8") as f:
content = f.read()
return {"status": "success", "content": content}
else:
return {"status": "error", "message": "Prompt file not found"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/agent/system-prompt")
@rate_limit_service.limiter.limit("10/minute")
async def update_system_prompt_content(request: Request, body: PromptUpdateRequest):
"""Update system prompt content"""
try:
# Validate braces
is_valid, problematic = validate_prompt_braces(body.content)
if not is_valid:
# Return warning but still allow save
warning = (
f"⚠️ Phát hiện {{...}} chưa escape: {problematic[:3]}... "
f"Nếu đây là JSON, hãy dùng {{{{ }}}} thay vì {{ }}. "
f"Prompt vẫn được lưu nhưng có thể gây lỗi khi chat."
)
else:
warning = None
# 1. Update file
with open(PROMPT_FILE_PATH, "w", encoding="utf-8") as f:
f.write(body.content)
# 2. Bump prompt version in Redis (ALL workers will detect this)
new_version = await bump_prompt_version()
# 3. Reset local worker's Graph Singleton (immediate effect for this worker)
reset_graph()
response = {
"status": "success",
"message": f"System prompt updated. Version: {new_version}. All workers will reload on next request.",
"prompt_version": new_version
}
if warning:
response["warning"] = warning
return response
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
......@@ -33,7 +33,7 @@ _test_results: dict[str, Any] = {}
# Models
# ─────────────────────────────────────────────────────────
class TestRunRequest(BaseModel):
endpoint_url: str = "http://localhost:5004/api/agent/chat-dev"
endpoint_url: str = "http://172.16.2.207:5000/api/agent/chat-dev"
conversations: dict[str, list[str]] # conv_id -> [messages]
num_versions: int = 3
delay_ms: int = 1000
......@@ -157,7 +157,7 @@ async def run_test_batch(req: TestRunRequest):
messages = conversations[conv_id]
for version in range(1, req.num_versions + 1):
device_id = f"test-conv{conv_id}-v{version}"
device_id = f"test-c{conv_id}-v{version}-{uuid.uuid4().hex[:6]}"
result_key = f"{conv_id}_v{version}"
try:
......@@ -168,10 +168,11 @@ async def run_test_batch(req: TestRunRequest):
start_time = time.time()
# Send to target endpoint with fake device_id
tagged_message = f"{message} [v{version}]"
response = await client.post(
req.endpoint_url,
json={
"user_query": message,
"user_query": tagged_message,
"device_id": device_id,
},
headers={
......@@ -227,7 +228,7 @@ async def run_test_batch(req: TestRunRequest):
"completed": completed,
"total_tasks": total_tasks,
"total_time": result["total_time"],
"final_response": result["final_response"][:300],
"responses": version_responses,
"num_messages": len(messages),
})
......@@ -291,6 +292,17 @@ async def list_test_results():
}
# ─────────────────────────────────────────────────────────
# 4. Clear Results
# ─────────────────────────────────────────────────────────
@router.delete("/api/test/results/clear", summary="Clear all test results")
async def clear_test_results():
"""Clear all stored test results from memory."""
count = len(_test_results)
_test_results.clear()
return {"status": "success", "cleared": count, "message": f"Cleared {count} test result(s)"}
# ─────────────────────────────────────────────────────────
# Helper
# ─────────────────────────────────────────────────────────
......
from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel
from agent.graph import reset_graph
from agent.prompt_utils import list_tool_prompts, read_tool_prompt, write_tool_prompt
router = APIRouter()
class ToolPromptUpdateRequest(BaseModel):
content: str
@router.get("/api/agent/tool-prompts")
async def get_tool_prompts_list(request: Request):
"""List all available tool prompt files."""
try:
files = list_tool_prompts()
return {"status": "success", "files": files}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/agent/tool-prompts/{filename}")
async def get_tool_prompt_content(filename: str, request: Request):
"""Get content of a specific tool prompt file."""
try:
content = read_tool_prompt(filename)
if not content:
# Try appending .txt if not present
if not filename.endswith(".txt"):
content = read_tool_prompt(filename + ".txt")
return {"status": "success", "content": content}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/api/agent/tool-prompts/{filename}")
async def update_tool_prompt_content(filename: str, request: Request, body: ToolPromptUpdateRequest):
"""Update content of a tool prompt file and reset graph."""
try:
# Ensure filename is safe (basic check)
if ".." in filename or "/" in filename or "\\" in filename:
raise HTTPException(status_code=400, detail="Invalid filename")
success = write_tool_prompt(filename, body.content)
if not success:
raise HTTPException(status_code=500, detail="Failed to write file")
# Reset Graph to reload tools with new prompts
reset_graph()
return {"status": "success", "message": f"Tool prompt {filename} updated successfully. Graph reloaded."}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
services:
# --- Backend Service ---
backend:
build:
context: .
dockerfile: Dockerfile.prod
container_name: canifa_backend
env_file: .env
ports:
- "5000:5000"
volumes:
- .:/app
environment:
- PORT=5000
restart: unless-stopped
deploy:
resources:
limits:
memory: 8g
networks:
- backend_network
logging:
driver: "json-file"
options:
tag: "{{.Name}}"
networks:
backend_network:
driver: bridge
ipam:
driver: default
config:
- subnet: "172.24.0.0/16"
gateway: "172.24.0.1"
services:
# --- Backend Service ---
backend:
build: .
container_name: canifa_backend
env_file: .env
ports:
- "5000:5000"
volumes:
- .:/app
environment:
- PORT=5000
restart: unless-stopped
deploy:
resources:
limits:
memory: 8g
networks:
- backend_network
logging:
driver: "json-file"
options:
tag: "{{.Name}}"
networks:
backend_network:
driver: bridge
ipam:
driver: default
config:
- subnet: "172.24.0.0/16"
gateway: "172.24.0.1"
services:
backend:
build: .
container_name: canifa_backend
env_file: .env
ports:
- "5004:5004"
volumes:
- .:/app
restart: unless-stopped
deploy:
resources:
limits:
memory: 8g
logging:
driver: "json-file"
options:
max-size: "50m"
max-file: "3"
"""
CANIFA Fashion Agent - Terminal Tester
Test agent với full flow: LLM + Tools + MongoDB Memory (No Postgres Checkpoint)
"""
import asyncio
import platform
import sys
from pathlib import Path
# Add parent dir to sys.path
sys.path.insert(0, str(Path(__file__).parent))
from langchain_core.messages import AIMessage, HumanMessage
# from langfuse import get_client, observe, propagate_attributes # TẮT TẠM
# from langfuse.langchain import CallbackHandler # TẮT TẠM
from agent.graph import build_graph
from agent.models import get_config
async def main():
import logging
# Tắt log rác của LangChain để terminal sạch sẽ
logging.getLogger("langchain").setLevel(logging.ERROR)
logging.getLogger("langgraph").setLevel(logging.ERROR)
logging.getLogger("langfuse").setLevel(logging.ERROR) # TẮT Langfuse logs
# 📝 Enable INFO logs for Data Retrieval & Embedding tools (Timing & Debug)
logging.basicConfig(level=logging.INFO, format="%(message)s") # Bật INFO để thấy tool calls
logging.getLogger("agent.tools.data_retrieval_tool").setLevel(logging.INFO)
logging.getLogger("agent.tools.product_search_helpers").setLevel(logging.INFO)
print("\n" + "=" * 50)
print("🚀 CANIFA FASHION AGENT - TERMINAL TESTER (MEM V2)")
print("=" * 50)
try:
# 1. Init Langfuse (Monitor tool) - TẮT TẠM ĐỂ TRÁNH RATE LIMIT
# from langfuse import Langfuse
# Langfuse() # Initialize singleton
# 2. Build Graph
config = get_config()
graph = build_graph(config)
print("✅ System: Graph & MongoDB Memory Ready!")
except Exception as e:
print(f"❌ System Error: {e}")
import traceback
traceback.print_exc()
return
print("\n💬 CiCi: 'Em chào anh/chị ạ! Em là CiCi, stylist riêng của mình đây. Anh/chị cần em tư vấn gì không ạ?'")
print("(Gõ 'q' để thoát, 'image' để giả lập gửi ảnh)")
conversation_id = "test_terminal_session_v2"
user_id = "tester_01"
while True:
query = input("\n[User]: ").strip()
if query.lower() in ["exit", "q"]:
break
if not query:
continue
# Giả lập gửi ảnh
if query.lower() == "image":
print("📸 [System]: Đã giả lập gửi 1 ảnh (base64).")
query = "Mẫu này chất liệu gì vậy em?"
print(f"[User]: {query}")
# ⚙️ STREAMING MODE - Bật/Tắt
ENABLE_STREAMING = True
print("⏳ CiCi is thinking...")
# 🎯 Call wrapped function - each call = 1 trace
await run_single_query(
graph=graph,
query=query,
conversation_id=conversation_id,
user_id=user_id,
enable_streaming=ENABLE_STREAMING,
)
print("\n👋 CiCi: 'Cảm ơn anh/chị đã ghé thăm CANIFA. Hẹn gặp lại nhé!'")
# @observe() # TẮT TẠM - Tránh Langfuse rate limit
async def run_single_query(graph, query: str, conversation_id: str, user_id: str, enable_streaming: bool = True):
"""Run single query - Langfuse DISABLED"""
import logging
logger = logging.getLogger(__name__)
# Load History từ MongoDB
history = []
current_human_msg = HumanMessage(content=query)
input_state = {
"messages": [current_human_msg],
"history": history,
"user_id": user_id,
}
# TẮT Langfuse callback
# langfuse_handler = CallbackHandler()
config_runnable = {
"configurable": {"conversation_id": conversation_id, "user_id": user_id, "transient_images": []},
# "callbacks": [langfuse_handler], # TẮT
}
final_ai_message = None
ai_content = ""
try:
# Chạy Stream
if enable_streaming:
print("\n👸 CiCi: ", end="", flush=True)
async for event in graph.astream(input_state, config=config_runnable, stream_mode="values"):
if "messages" in event:
msg = event["messages"][-1]
# 🔍 LOG TOOL CALLS
if hasattr(msg, "tool_calls") and msg.tool_calls:
logger.info(f"\n🛠️ TOOL CALLED: {[tc['name'] for tc in msg.tool_calls]}")
if isinstance(msg, AIMessage) and msg.content:
final_ai_message = msg
# STREAMING MODE: In từng đoạn content
if enable_streaming and msg.content != ai_content:
new_content = msg.content[len(ai_content) :]
print(new_content, end="", flush=True)
ai_content = msg.content
# Nếu không stream, in toàn bộ
if not enable_streaming and final_ai_message:
print(f"\n👸 CiCi: {final_ai_message.content}")
else:
print() # Xuống dòng
if final_ai_message:
# Lưu History mới
new_history = [*history, current_human_msg, final_ai_message]
# TẮT Langfuse update
# langfuse = get_client()
# langfuse.update_current_trace(...)
print("[System]: ✅ Response complete")
except Exception as e:
print(f"\n❌ Error during execution: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
if platform.system() == "Windows":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())
# ============================================================
# CANIFA BACKEND - Quick Reference
# ============================================================
# === LOCAL DEV (port 5000) ===
.\.venv\Scripts\activate
uvicorn server:app --host 0.0.0.0 --port 5000 --reload
uvicorn server:app --host 0.0.0.0 --port 5000
docker restart chatbot-backend
docker restart chatbot-backend && docker logs -f chatbot-backend
docker logs -f chatbot-backend
# === DOCKER (port 5004, hot reload) ===
docker compose up -d --build
docker restart canifa_backend
docker logs -f canifa_backend
docker restart canifa_backend && docker logs -f canifa_backend
docker compose down
sudo docker compose -f docker-compose.prod.yml up -d --build
Get-NetTCPConnection -LocalPort 5000 | ForEach-Object { Stop-Process -Id $_.OwningProcess -Force }
# === KILL PORT ===
taskkill /F /IM python.exe
netstat -ano | findstr :5000 | ForEach-Object { $_.Split()[-1] } | Sort-Object -Unique | ForEach-Object { taskkill /F /PID $_ }
\ No newline at end of file
netstat -ano | findstr :5004 | ForEach-Object { $_.Split()[-1] } | Sort-Object -Unique | ForEach-Object { taskkill /F /PID $_ }
\ No newline at end of file
......@@ -8,14 +8,10 @@ from fastapi import FastAPI
from fastapi.responses import FileResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from api.chatbot_route import router as chatbot_router
from api.check_history_route import router as check_history_router
from api.conservation_route import router as conservation_router
from api.mock_api_route import router as mock_router
from api.prompt_route import router as prompt_router
from api.stock_route import router as stock_router
from api.test_conversation_route import router as test_conversation_router
from api.tool_prompt_route import router as tool_prompt_router
from common.cache import redis_cache
from common.middleware import middleware_manager
from config import PORT
......@@ -56,7 +52,7 @@ async def startup_event():
@app.get("/")
async def root():
return RedirectResponse(url="/static/index.html")
return RedirectResponse(url="/static/test_conversation.html")
@app.get("/health")
......@@ -90,10 +86,6 @@ middleware_manager.setup(
# api include
app.include_router(conservation_router)
app.include_router(check_history_router)
app.include_router(chatbot_router)
app.include_router(prompt_router)
app.include_router(tool_prompt_router) # Register new router
app.include_router(mock_router)
app.include_router(stock_router)
app.include_router(test_conversation_router)
......@@ -103,14 +95,14 @@ if __name__ == "__main__":
print("🚀 Contract AI Service Starting...")
print("=" * 60)
print(f"📡 REST API: http://localhost:{PORT}")
print(f"📡 Test Chatbot: http://localhost:{PORT}/static/index.html")
print(f"📡 Test Tool: http://localhost:{PORT}/static/test_conversation.html")
print(f"📚 API Docs: http://localhost:{PORT}/docs")
print("=" * 60)
ENABLE_RELOAD = False
print(f"⚠️ Hot reload: {ENABLE_RELOAD}")
reload_dirs = ["common", "api", "agent"]
reload_dirs = ["common", "api"]
if ENABLE_RELOAD:
os.environ["PYTHONUNBUFFERED"] = "1"
......
This diff is collapsed.
This diff is collapsed.
message_content,conversation_id_test
Cho mình xem áo khoác nam,1
Có size L không?,1
Mình muốn mua váy đỏ,2
Giá bao nhiêu vậy?,2
Có khuyến mãi không?,2
Tư vấn quần jean nữ,3
This diff is collapsed.
This diff is collapsed.
conversation_id_test,message_content
1,Xin chào
1,Tôi muốn tìm áo khoác nữ
2,Hi
2,Có áo phao nam không?
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment