Commit c2eaaeb7 authored by Hoanganhvu123

Initial commit: Batch Testing Tool for Chatbot Canifa

# Environment variables
.env
.env.local
.env.*.local
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual Environment
venv/
ENV/
env/
.venv
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
Thumbs.db
# Logs
*.log
logs/
# Temporary files
*.tmp
*.temp
temp/
tmp/
# Excel files (test files)
*.xlsx
*.xls
!example_questions.xlsx
# Test results
results/
output/
# Database
*.db
*.sqlite
*.sqlite3
# Batch Testing Tool - Chatbot Canifa
A tool for batch-testing the chatbot against an Excel file of questions, with support for running each question multiple times and collecting detailed metrics.
## 🚀 Features
- ✅ Upload an Excel file containing a list of questions
- ✅ Run each question N times (configurable)
- ✅ Track detailed metrics: response, product IDs, latency, cost, tokens
- ✅ Langfuse integration for extra metrics (optional)
- ✅ Real-time progress tracking
- ✅ Excel export with 3 sheets: Summary, Results, Aggregated
- ✅ Clean HTML UI
## 📋 Requirements
- Python 3.10+
- A running Chatbot Canifa API
- Langfuse (optional, for additional metrics)
## 🔧 Installation
1. **Clone and install dependencies:**
```bash
cd C:\tool_test_chatbot_canifa
pip install -r requirements.txt
```
2. **Configure the `.env` file:**
```env
# Chatbot API
CHATBOT_API_URL=http://localhost:8000
CHATBOT_API_ENDPOINT=/api/agent/chat
# Tool settings
TOOL_PORT=5001
MAX_CONCURRENT_REQUESTS=5
REQUEST_TIMEOUT=60
# Langfuse (optional)
LANGFUSE_BASE_URL=https://cloud.langfuse.com
LANGFUSE_SECRET_KEY=your_secret_key_here
```
3. **Start the server:**
```bash
python app.py
```
4. **Open a browser:**
```
http://localhost:5001
```
## 📝 Usage
1. **Prepare the Excel file:**
- Create an Excel file with a column containing the questions (default: "Câu hỏi")
- Other columns are allowed and will be preserved in the output
2. **Upload and test:**
- Open http://localhost:5001
- Select the Excel file
- Enter the name of the question column (default: "Câu hỏi")
- Choose how many times to test each question (1-10)
- Click "Bắt đầu Test"
3. **Monitor progress:**
- Watch the real-time progress bar
- See counts of processed, successful, and failed tests
4. **Download the results:**
- When the run finishes, click "Download Excel Results"
- The Excel file has 3 sheets:
  - **Summary**: Overall metrics
  - **Results**: Details of each individual test
  - **Aggregated**: Metrics aggregated per question
## 📊 Tracked metrics
### From the API response:
- AI response
- Product IDs
- Status (success/error)
- Product count
### From Langfuse (if enabled):
- Cost (USD)
- Latency (ms)
- Input/output/total tokens
- Model name
### Aggregated metrics:
- Average cost/latency
- Min/max latency
- Success rate
- Response consistency (High/Medium/Low)
- Average product count
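The aggregated latency figures are plain `statistics`-module calls over the per-attempt measurements. A minimal sketch with made-up numbers (the latencies here are hypothetical, not from a real run):

```python
import statistics

# Hypothetical per-attempt latencies (ms) for one question tested 3 times
latencies_ms = [812.4, 905.1, 779.8]

avg = round(statistics.mean(latencies_ms), 2)
lo, hi = min(latencies_ms), max(latencies_ms)
print(avg, lo, hi)  # 832.43 779.8 905.1
```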
## 🏗️ Code structure
```
tool_test_chatbot_canifa/
├── app.py                 # Main FastAPI app
├── config.py              # Config loaded from .env
├── requirements.txt       # Dependencies
├── api/
│   └── routes.py          # API endpoints
├── services/
│   ├── api_client.py      # Chatbot API client
│   ├── langfuse_client.py # Langfuse client (optional)
│   └── batch_processor.py # Batch processing logic
├── utils/
│   └── excel_handler.py   # Excel read/write
└── templates/
    └── index.html         # HTML frontend
```
## 🔌 API Endpoints
- `POST /api/batch-test/upload` - Upload an Excel file and start a test run
- `GET /api/batch-test/progress/{task_id}` - Get progress
- `GET /api/batch-test/download/{task_id}` - Download the results as Excel
- `GET /api/batch-test/health` - Health check
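The intended client flow is: POST the file to the upload endpoint, keep the returned `task_id`, then poll the progress endpoint until `status` leaves `processing`. A sketch of that polling loop with the HTTP call stubbed out by a fake fetcher, so the loop logic is visible on its own — swap the lambda for a real GET against `/api/batch-test/progress/{task_id}`:

```python
import time
from typing import Any, Callable

def poll_until_done(fetch_progress: Callable[[], dict[str, Any]],
                    interval_s: float = 0.0,
                    max_polls: int = 100) -> dict[str, Any]:
    """Poll a progress fetcher until the task completes or errors out."""
    for _ in range(max_polls):
        progress = fetch_progress()
        if progress.get("status") in ("completed", "error"):
            return progress
        time.sleep(interval_s)
    raise TimeoutError("batch test did not finish in time")

# Stub standing in for GET /api/batch-test/progress/{task_id}
_states = iter([
    {"status": "processing", "processed": 3, "total": 10},
    {"status": "processing", "processed": 9, "total": 10},
    {"status": "completed", "processed": 10, "total": 10, "excel_ready": True},
])

final = poll_until_done(lambda: next(_states))
print(final["status"])  # completed
```

Once `excel_ready` appears in the payload, the download endpoint will serve the results file.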
## 📝 Notes
- This tool calls the chatbot-canifa API; no code changes are needed in the chatbot itself
- Langfuse tracing happens automatically inside the chatbot; the tool only fetches additional metrics (optional)
- Progress is kept in memory, so restarting the server loses it
- In production, use Redis or a database to persist progress
## 🐛 Troubleshooting
1. **Cannot connect to the chatbot API:**
- Check `CHATBOT_API_URL` in `.env`
- Make sure the chatbot server is running
2. **Cannot read the Excel file:**
- Check the name of the question column
- Make sure the file is .xlsx or .xls
3. **Langfuse metrics are missing:**
- Check `LANGFUSE_SECRET_KEY` in `.env`
- A delay may be needed for Langfuse to sync
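For the first item, a quick TCP probe can separate "server down" from "tool misconfigured" before digging further. A stdlib-only sketch — the example URL matches the `CHATBOT_API_URL` default from `.env`:

```python
import socket
from urllib.parse import urlparse

def parse_host_port(api_url: str) -> tuple[str, int]:
    """Extract host and port from a URL, defaulting the port by scheme."""
    parsed = urlparse(api_url)
    host = parsed.hostname or "localhost"
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    return host, port

def api_reachable(api_url: str, timeout_s: float = 2.0) -> bool:
    """True if a plain TCP connection to the API host/port succeeds."""
    try:
        with socket.create_connection(parse_host_port(api_url), timeout=timeout_s):
            return True
    except OSError:
        return False

print(parse_host_port("http://localhost:8000"))  # ('localhost', 8000)
```

If the probe fails, fix the network or the URL first; if it succeeds but requests still error, the problem is at the HTTP layer (endpoint path, payload, timeouts).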
# API package
"""
FastAPI routes cho batch testing tool
"""
import logging
import uuid
from typing import Any
from fastapi import APIRouter, BackgroundTasks, File, Form, HTTPException, UploadFile
from fastapi.responses import FileResponse, JSONResponse
from services.batch_processor import BatchProcessor
from utils.excel_handler import read_excel, create_results_excel
logger = logging.getLogger(__name__)
router = APIRouter()
# In-memory storage cho progress (trong production nên dùng Redis hoặc DB)
progress_store: dict[str, dict[str, Any]] = {}
@router.post("/api/batch-test/upload", summary="Upload Excel và bắt đầu batch test")
async def upload_and_test(
file: UploadFile = File(...),
num_tests: int = Form(1),
question_column: str = Form("Câu hỏi"),
):
"""
Upload Excel file và bắt đầu batch testing
Args:
file: Excel file
num_tests: Số lần test mỗi câu hỏi
question_column: Tên cột chứa câu hỏi
Returns:
Task ID để track progress
"""
try:
# Validate file
if not file.filename or not file.filename.endswith((".xlsx", ".xls")):
raise HTTPException(status_code=400, detail="File phải là Excel (.xlsx hoặc .xls)")
# Đọc file
file_content = await file.read()
questions = read_excel(file_content, question_column=question_column)
if not questions:
raise HTTPException(status_code=400, detail="Không tìm thấy câu hỏi nào trong file")
# Tạo task ID
task_id = str(uuid.uuid4())
# Initialize progress
progress_store[task_id] = {
"status": "processing",
"processed": 0,
"total": len(questions) * num_tests,
"current_question": 0,
"current_attempt": 0,
"successful": 0,
"failed": 0,
}
# Start batch processing (async)
processor = BatchProcessor()
async def process_task():
try:
def progress_callback(progress: dict[str, Any]):
progress_store[task_id].update(progress)
result = await processor.process_batch(
questions=questions,
num_tests_per_question=num_tests,
progress_callback=progress_callback,
)
# Tạo Excel output
excel_output = create_results_excel(
summary_data=result["summary"],
detailed_results=result["detailed_results"],
aggregated_results=result["aggregated_results"],
)
# Lưu kết quả
progress_store[task_id].update(
{
"status": "completed",
"result": result,
"excel_output": excel_output,
}
)
except Exception as e:
logger.error(f"Error processing batch: {e}", exc_info=True)
progress_store[task_id].update(
{
"status": "error",
"error_message": str(e),
}
)
# Run async task
import asyncio
asyncio.create_task(process_task())
return JSONResponse(
{
"task_id": task_id,
"message": "Batch testing đã bắt đầu",
"total_questions": len(questions),
"num_tests_per_question": num_tests,
"total_tests": len(questions) * num_tests,
}
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in upload_and_test: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/batch-test/progress/{task_id}", summary="Lấy progress của batch test")
async def get_progress(task_id: str):
"""
Lấy progress của batch test
Args:
task_id: Task ID từ upload endpoint
Returns:
Progress info
"""
if task_id not in progress_store:
raise HTTPException(status_code=404, detail="Task không tồn tại")
progress = progress_store[task_id].copy()
# Không trả về excel_output trong progress (quá lớn)
if "excel_output" in progress:
progress["excel_ready"] = True
del progress["excel_output"]
return JSONResponse(progress)
@router.get("/api/batch-test/download/{task_id}", summary="Download Excel kết quả")
async def download_results(task_id: str, background_tasks: BackgroundTasks):
"""
Download Excel file kết quả
Args:
task_id: Task ID từ upload endpoint
background_tasks: Background tasks để xóa file sau khi download
Returns:
Excel file
"""
import os
import tempfile
if task_id not in progress_store:
raise HTTPException(status_code=404, detail="Task không tồn tại")
task_data = progress_store[task_id]
if task_data["status"] != "completed":
raise HTTPException(status_code=400, detail="Task chưa hoàn thành")
if "excel_output" not in task_data:
raise HTTPException(status_code=500, detail="Excel output không tồn tại")
# Tạo temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx") as tmp_file:
tmp_file.write(task_data["excel_output"])
tmp_path = tmp_file.name
# Xóa file sau khi download
background_tasks.add_task(os.unlink, tmp_path)
return FileResponse(
tmp_path,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
filename=f"batch_test_results_{task_id}.xlsx",
)
@router.get("/api/batch-test/health", summary="Health check")
async def health_check():
"""Health check endpoint"""
return JSONResponse({"status": "ok", "message": "Batch testing tool is running"})
"""
Main FastAPI application cho Batch Testing Tool
"""
import logging
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from api.routes import router
from config import TOOL_PORT
# Setup logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# Create FastAPI app
app = FastAPI(
title="Batch Testing Tool - Chatbot Canifa",
description="Tool để test batch chatbot với Excel file",
version="1.0.0",
)
# Include routes
app.include_router(router)
@app.get("/", response_class=HTMLResponse)
async def root():
"""Serve HTML frontend"""
try:
with open("templates/index.html", "r", encoding="utf-8") as f:
return HTMLResponse(content=f.read())
except FileNotFoundError:
return HTMLResponse(
content="<h1>Error: templates/index.html not found</h1>",
status_code=500,
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"app:app",
host="0.0.0.0",
port=TOOL_PORT,
reload=True,
log_level="info",
)
"""
Config file cho Supabase và các environment variables
Lấy giá trị từ file .env qua os.getenv
"""
import os
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Export all config variables for type checking
__all__ = [
"AI_MODEL_NAME",
"AI_SUPABASE_KEY",
"AI_SUPABASE_URL",
"CHECKPOINT_POSTGRES_SCHEMA",
"CHECKPOINT_POSTGRES_URL",
"CLERK_SECRET_KEY",
"CONV_DATABASE_URL",
"CONV_SUPABASE_KEY",
"CONV_SUPABASE_URL",
"DEFAULT_MODEL",
"FIRECRAWL_API_KEY",
"GOOGLE_API_KEY",
"GROQ_API_KEY",
"JWT_ALGORITHM",
"JWT_SECRET",
"LANGFUSE_BASE_URL",
"LANGFUSE_PUBLIC_KEY",
"LANGFUSE_SECRET_KEY",
"LANGSMITH_API_KEY",
"LANGSMITH_ENDPOINT",
"LANGSMITH_PROJECT",
"LANGSMITH_TRACING",
"MONGODB_DB_NAME",
"MONGODB_URI",
"OPENAI_API_KEY",
"OTEL_EXPORTER_JAEGER_AGENT_HOST",
"OTEL_EXPORTER_JAEGER_AGENT_PORT",
"OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES",
"OTEL_SERVICE_NAME",
"OTEL_TRACES_EXPORTER",
"PORT",
"REDIS_HOST",
"REDIS_PASSWORD",
"REDIS_PORT",
"REDIS_USERNAME",
"STARROCKS_DB",
"STARROCKS_HOST",
"STARROCKS_PASSWORD",
"STARROCKS_PORT",
"STARROCKS_USER",
"USE_MONGO_CONVERSATION",
]
# ====================== SUPABASE CONFIGURATION ======================
AI_SUPABASE_URL: str | None = os.getenv("AI_SUPABASE_URL")
AI_SUPABASE_KEY: str | None = os.getenv("AI_SUPABASE_KEY")
CONV_SUPABASE_URL: str | None = os.getenv("CONV_SUPABASE_URL")
CONV_SUPABASE_KEY: str | None = os.getenv("CONV_SUPABASE_KEY")
# ====================== REDIS CONFIGURATION ======================
REDIS_HOST: str | None = os.getenv("REDIS_HOST")
REDIS_PORT: int = int(os.getenv("REDIS_PORT", "6379"))
REDIS_PASSWORD: str | None = os.getenv("REDIS_PASSWORD")
REDIS_USERNAME: str | None = os.getenv("REDIS_USERNAME")
# ====================== AI API KEYS & MODELS ======================
OPENAI_API_KEY: str | None = os.getenv("OPENAI_API_KEY")
GOOGLE_API_KEY: str | None = os.getenv("GOOGLE_API_KEY")
GROQ_API_KEY: str | None = os.getenv("GROQ_API_KEY")
DEFAULT_MODEL: str = os.getenv("DEFAULT_MODEL", "gpt-5-nano")
# DEFAULT_MODEL: str = os.getenv("DEFAULT_MODEL")
# ====================== JWT CONFIGURATION ======================
JWT_SECRET: str | None = os.getenv("JWT_SECRET")
JWT_ALGORITHM: str | None = os.getenv("JWT_ALGORITHM")
# ====================== SERVER CONFIG ======================
PORT: int = int(os.getenv("PORT", "5000"))
FIRECRAWL_API_KEY: str | None = os.getenv("FIRECRAWL_API_KEY")
# ====================== LANGFUSE CONFIGURATION (DEPRECATED) ======================
LANGFUSE_SECRET_KEY: str | None = os.getenv("LANGFUSE_SECRET_KEY")
LANGFUSE_PUBLIC_KEY: str | None = os.getenv("LANGFUSE_PUBLIC_KEY")
LANGFUSE_BASE_URL: str | None = os.getenv("LANGFUSE_BASE_URL", "https://cloud.langfuse.com")
# ====================== LANGSMITH CONFIGURATION (DISABLED DUE TO RATE LIMITS) ======================
# LANGSMITH_TRACING = os.getenv("LANGSMITH_TRACING", "false")
# LANGSMITH_ENDPOINT = os.getenv("LANGSMITH_ENDPOINT", "https://api.smith.langchain.com")
# LANGSMITH_API_KEY = os.getenv("LANGSMITH_API_KEY")
# LANGSMITH_PROJECT = os.getenv("LANGSMITH_PROJECT")
LANGSMITH_TRACING = "false"
LANGSMITH_ENDPOINT = None
LANGSMITH_API_KEY = None
LANGSMITH_PROJECT = None
# ====================== CLERK AUTHENTICATION ======================
CLERK_SECRET_KEY: str | None = os.getenv("CLERK_SECRET_KEY")
# ====================== DATABASE CONNECTION ======================
CONV_DATABASE_URL: str | None = os.getenv("CONV_DATABASE_URL")
# ====================== MONGO CONFIGURATION ======================
MONGODB_URI: str | None = os.getenv("MONGODB_URI", "mongodb://localhost:27017")
MONGODB_DB_NAME: str | None = os.getenv("MONGODB_DB_NAME", "ai_law")
USE_MONGO_CONVERSATION: bool = os.getenv("USE_MONGO_CONVERSATION", "true").lower() == "true"
# ====================== CANIFA INTERNAL POSTGRES ======================
CHECKPOINT_POSTGRES_URL: str | None = os.getenv("CHECKPOINT_POSTGRES_URL")
CHECKPOINT_POSTGRES_SCHEMA: str = os.getenv("CHECKPOINT_POSTGRES_SCHEMA", "canifa_chat")
# ====================== STARROCKS DATA LAKE ======================
STARROCKS_HOST: str | None = os.getenv("STARROCKS_HOST")
STARROCKS_PORT: int = int(os.getenv("STARROCKS_PORT", "9030"))
STARROCKS_USER: str | None = os.getenv("STARROCKS_USER")
STARROCKS_PASSWORD: str | None = os.getenv("STARROCKS_PASSWORD")
STARROCKS_DB: str | None = os.getenv("STARROCKS_DB")
# Placeholder for backward compatibility if needed
AI_MODEL_NAME = DEFAULT_MODEL
# ====================== OPENTELEMETRY CONFIGURATION ======================
OTEL_EXPORTER_JAEGER_AGENT_HOST = os.getenv("OTEL_EXPORTER_JAEGER_AGENT_HOST")
OTEL_EXPORTER_JAEGER_AGENT_PORT = os.getenv("OTEL_EXPORTER_JAEGER_AGENT_PORT")
OTEL_SERVICE_NAME = os.getenv("OTEL_SERVICE_NAME")
OTEL_TRACES_EXPORTER = os.getenv("OTEL_TRACES_EXPORTER")
OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES = os.getenv("OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES")
# ====================== BATCH TESTING TOOL CONFIGURATION ======================
CHATBOT_API_URL: str = os.getenv("CHATBOT_API_URL", "http://localhost:8000")
CHATBOT_API_ENDPOINT: str = os.getenv("CHATBOT_API_ENDPOINT", "/api/agent/chat")
TOOL_PORT: int = int(os.getenv("TOOL_PORT", "5001"))
MAX_CONCURRENT_REQUESTS: int = int(os.getenv("MAX_CONCURRENT_REQUESTS", "5"))
REQUEST_TIMEOUT: int = int(os.getenv("REQUEST_TIMEOUT", "60"))
"""
Script để tạo file Excel mẫu cho batch testing
"""
from utils.excel_handler import _get_pandas
# Lazy load pandas
pd = _get_pandas()
# Tạo dữ liệu mẫu
data = {
"Câu hỏi": [
"Áo sơ mi trắng size M giá bao nhiêu?",
"Quần jean nam có màu gì?",
"Giày thể thao nữ size 38",
"Áo khoác mùa đông có size nào?",
"Túi xách da có màu đen không?",
],
"Category": [
"Áo",
"Quần",
"Giày",
"Áo khoác",
"Phụ kiện",
],
"Expected_Products": [
"Áo sơ mi",
"Quần jean",
"Giày thể thao",
"Áo khoác",
"Túi xách",
],
}
# Tạo DataFrame
df = pd.DataFrame(data)
# Lưu vào Excel
df.to_excel("example_questions.xlsx", index=False)
print("✅ Đã tạo file example_questions.xlsx")
print("\nFormat file Excel:")
print(df.to_string())
fastapi==0.104.1
uvicorn[standard]==0.24.0
python-multipart==0.0.6
httpx==0.25.2
pandas==2.1.3
openpyxl==3.1.2
python-dotenv==1.0.0
# Services package
"""
API Client để gọi chatbot-canifa API
"""
import asyncio
import logging
import time
from typing import Any
import httpx
from config import CHATBOT_API_ENDPOINT, CHATBOT_API_URL, REQUEST_TIMEOUT
logger = logging.getLogger(__name__)
class ChatbotAPIClient:
"""Client để gọi chatbot API"""
def __init__(self):
self.base_url = CHATBOT_API_URL
self.endpoint = CHATBOT_API_ENDPOINT
self.timeout = REQUEST_TIMEOUT
async def chat(
self,
query: str,
user_id: str | None = None,
test_id: str | None = None,
test_attempt: int | None = None,
) -> dict[str, Any]:
"""
Gọi chatbot API và trả về response với metrics
Args:
query: Câu hỏi
user_id: User ID (optional)
test_id: Test ID để track (optional)
test_attempt: Số lần test (1, 2, 3...) (optional)
Returns:
Dict chứa response và metrics:
{
"status": "success" | "error",
"ai_response": str,
"product_ids": list,
"latency_ms": float,
"timestamp": str,
"error_message": str | None
}
"""
if not user_id:
user_id = f"batch_test_{test_id}_{test_attempt}" if test_id else "batch_test_user"
url = f"{self.base_url}{self.endpoint}"
payload = {
"user_query": query,
"user_id": user_id,
}
start_time = time.time()
error_message = None
status = "error"
ai_response = ""
product_ids = []
try:
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.post(url, json=payload)
latency_ms = (time.time() - start_time) * 1000
if response.status_code == 200:
data = response.json()
status = data.get("status", "error")
ai_response = data.get("ai_response", "")
product_ids = data.get("product_ids", [])
else:
error_message = f"API returned status {response.status_code}: {response.text}"
logger.error(error_message)
except httpx.TimeoutException:
latency_ms = (time.time() - start_time) * 1000
error_message = f"Request timeout after {self.timeout}s"
logger.error(error_message)
except Exception as e:
latency_ms = (time.time() - start_time) * 1000
error_message = str(e)
logger.error(f"Error calling chatbot API: {e}", exc_info=True)
return {
"status": status,
"ai_response": ai_response,
"product_ids": product_ids,
"latency_ms": round(latency_ms, 2),
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"error_message": error_message,
}
async def batch_chat(
self,
queries: list[str],
user_id_prefix: str = "batch_test",
max_concurrent: int = 5,
) -> list[dict[str, Any]]:
"""
Gọi API cho nhiều queries song song
Args:
queries: List câu hỏi
user_id_prefix: Prefix cho user_id
max_concurrent: Số requests đồng thời tối đa
Returns:
List kết quả theo thứ tự queries
"""
semaphore = asyncio.Semaphore(max_concurrent)
async def chat_with_semaphore(index: int, query: str):
async with semaphore:
return await self.chat(query, user_id=f"{user_id_prefix}_{index}")
tasks = [chat_with_semaphore(i, query) for i, query in enumerate(queries)]
results = await asyncio.gather(*tasks)
return results
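`batch_chat` above bounds its fan-out with an `asyncio.Semaphore` while `asyncio.gather` preserves input order. The same pattern in isolation, with a stub coroutine in place of the HTTP call and a counter to confirm the concurrency cap is respected:

```python
import asyncio

async def bounded_gather(items, worker, max_concurrent=5):
    """Run worker(item) for each item, at most max_concurrent at a time,
    preserving input order in the results. Also reports peak concurrency."""
    sem = asyncio.Semaphore(max_concurrent)
    running = 0
    peak = 0

    async def guarded(item):
        nonlocal running, peak
        async with sem:
            running += 1
            peak = max(peak, running)
            try:
                return await worker(item)
            finally:
                running -= 1

    results = await asyncio.gather(*(guarded(i) for i in items))
    return results, peak

async def fake_chat(q):
    await asyncio.sleep(0.01)  # stands in for the HTTP round trip
    return f"answer to {q}"

results, peak = asyncio.run(
    bounded_gather([f"q{i}" for i in range(10)], fake_chat, max_concurrent=3)
)
print(len(results), peak)  # 10 results; peak never exceeds 3
```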
"""
Batch Processor để xử lý batch testing
"""
import asyncio
import logging
import statistics
from datetime import datetime
from typing import Any, Callable
from services.api_client import ChatbotAPIClient
from services.langfuse_client import LangfuseClient
logger = logging.getLogger(__name__)
class BatchProcessor:
"""Processor để xử lý batch testing"""
def __init__(self, max_concurrent: int = 5):
self.api_client = ChatbotAPIClient()
self.langfuse_client = LangfuseClient()
self.max_concurrent = max_concurrent
async def process_batch(
self,
questions: list[dict[str, Any]],
num_tests_per_question: int = 1,
progress_callback: Callable[[dict[str, Any]], None] | None = None,
) -> dict[str, Any]:
"""
Xử lý batch testing cho danh sách câu hỏi
Args:
questions: List câu hỏi [{"id": 1, "question": "...", "row_data": {...}}, ...]
num_tests_per_question: Số lần test mỗi câu hỏi
progress_callback: Callback để update progress (optional)
Returns:
Dict chứa:
- summary: Tổng kết
- detailed_results: Chi tiết từng lần test
- aggregated_results: Tổng hợp theo câu hỏi
"""
total_questions = len(questions)
total_tests = total_questions * num_tests_per_question
detailed_results = []
aggregated_results = []
start_time = datetime.now()
processed = 0
successful = 0
failed = 0
total_cost = 0.0
# Process từng câu hỏi
for question_data in questions:
question_id = question_data["id"]
question = question_data["question"]
row_data = question_data.get("row_data", {})
# Test câu hỏi này N lần
test_results = []
for test_attempt in range(1, num_tests_per_question + 1):
test_id = f"q{question_id}_t{test_attempt}"
# Gọi API
result = await self.api_client.chat(
query=question,
user_id=f"batch_test_{question_id}",
test_id=str(question_id),
test_attempt=test_attempt,
)
# Thêm thông tin vào result
result["question_id"] = question_id
result["question"] = question
result["test_attempt"] = test_attempt
result["product_count"] = len(result.get("product_ids", []))
# Lấy thêm metrics từ Langfuse (optional)
if self.langfuse_client.enabled:
# Tìm trace từ Langfuse (có thể cần delay để Langfuse sync)
await asyncio.sleep(1) # Đợi Langfuse sync
langfuse_metrics = await self.langfuse_client.get_trace_metrics(
trace_id=test_id
)
if langfuse_metrics:
result.update(langfuse_metrics)
test_results.append(result)
detailed_results.append(result)
# Update counters
processed += 1
if result["status"] == "success":
successful += 1
else:
failed += 1
# Update progress
if progress_callback:
progress_callback(
{
"processed": processed,
"total": total_tests,
"current_question": question_id,
"current_attempt": test_attempt,
"successful": successful,
"failed": failed,
}
)
# Tính toán aggregated metrics cho câu hỏi này
aggregated = self._aggregate_test_results(question_id, question, test_results, row_data)
aggregated_results.append(aggregated)
# Update total cost
total_cost += aggregated.get("avg_cost", 0.0) * num_tests_per_question
# Tính tổng kết
end_time = datetime.now()
duration_seconds = (end_time - start_time).total_seconds()
summary = {
"total_questions": total_questions,
"num_tests_per_question": num_tests_per_question,
"total_tests": total_tests,
"successful": successful,
"failed": failed,
"success_rate": round((successful / total_tests * 100) if total_tests > 0 else 0, 2),
"total_cost_usd": round(total_cost, 4),
"avg_cost_per_test": round(total_cost / total_tests if total_tests > 0 else 0, 4),
"duration_seconds": round(duration_seconds, 2),
"start_time": start_time.strftime("%Y-%m-%d %H:%M:%S"),
"end_time": end_time.strftime("%Y-%m-%d %H:%M:%S"),
}
return {
"summary": summary,
"detailed_results": detailed_results,
"aggregated_results": aggregated_results,
}
def _aggregate_test_results(
self,
question_id: int,
question: str,
test_results: list[dict[str, Any]],
row_data: dict[str, Any],
) -> dict[str, Any]:
"""
Tính toán aggregated metrics cho một câu hỏi sau N lần test
Args:
question_id: ID câu hỏi
question: Nội dung câu hỏi
test_results: List kết quả từ N lần test
row_data: Dữ liệu gốc từ Excel
Returns:
Dict aggregated metrics
"""
successful_results = [r for r in test_results if r["status"] == "success"]
success_count = len(successful_results)
total_count = len(test_results)
# Tính average metrics
latencies = [r["latency_ms"] for r in test_results if r.get("latency_ms")]
costs = [r.get("cost", 0) for r in test_results if r.get("cost")]
product_counts = [r.get("product_count", 0) for r in test_results]
# Response consistency (so sánh các responses)
responses = [r.get("ai_response", "") for r in successful_results]
consistency = self._calculate_consistency(responses)
aggregated = {
"question_id": question_id,
"question": question,
"total_tests": total_count,
"successful_tests": success_count,
"failed_tests": total_count - success_count,
"success_rate": round((success_count / total_count * 100) if total_count > 0 else 0, 2),
"avg_latency_ms": round(statistics.mean(latencies), 2) if latencies else 0,
"min_latency_ms": round(min(latencies), 2) if latencies else 0,
"max_latency_ms": round(max(latencies), 2) if latencies else 0,
"avg_cost_usd": round(statistics.mean(costs), 4) if costs else 0,
"total_cost_usd": round(sum(costs), 4),
"avg_product_count": round(statistics.mean(product_counts), 2) if product_counts else 0,
"response_consistency": consistency,
"sample_response": successful_results[0].get("ai_response", "") if successful_results else "",
}
# Thêm các cột gốc từ Excel
for key, value in row_data.items():
if key not in aggregated:
aggregated[f"original_{key}"] = value
return aggregated
def _calculate_consistency(self, responses: list[str]) -> str:
"""
Tính toán độ nhất quán của responses
Args:
responses: List các responses
Returns:
"High" | "Medium" | "Low"
"""
if len(responses) <= 1:
return "N/A"
# So sánh độ dài
lengths = [len(r) for r in responses]
length_variance = statistics.variance(lengths) if len(lengths) > 1 else 0
# So sánh nội dung (simple similarity)
if len(responses) == 2:
similarity = self._simple_similarity(responses[0], responses[1])
else:
# Tính average similarity
similarities = []
for i in range(len(responses)):
for j in range(i + 1, len(responses)):
similarities.append(self._simple_similarity(responses[i], responses[j]))
similarity = statistics.mean(similarities) if similarities else 0
# Đánh giá
if similarity > 0.8 and length_variance < 100:
return "High"
elif similarity > 0.5:
return "Medium"
else:
return "Low"
def _simple_similarity(self, text1: str, text2: str) -> float:
"""Tính similarity đơn giản giữa 2 texts"""
if not text1 or not text2:
return 0.0
# Simple word overlap
words1 = set(text1.lower().split())
words2 = set(text2.lower().split())
if not words1 or not words2:
return 0.0
intersection = len(words1 & words2)
union = len(words1 | words2)
return intersection / union if union > 0 else 0.0
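`_simple_similarity` above is the Jaccard index over word sets, and `_calculate_consistency` maps it to bands (> 0.8 → High, > 0.5 → Medium, else Low). A standalone check of how one pair of hypothetical responses would score:

```python
def jaccard(text1: str, text2: str) -> float:
    """Word-set overlap divided by word-set union (same formula as _simple_similarity)."""
    w1, w2 = set(text1.lower().split()), set(text2.lower().split())
    if not w1 or not w2:
        return 0.0
    return len(w1 & w2) / len(w1 | w2)

# Two made-up responses differing in one word out of six:
# intersection = 5 words, union = 7 words -> 5/7 ~ 0.714, the Medium band
print(jaccard("ao so mi trang size M", "ao so mi xanh size M"))
```

Note this ignores word order and frequency, which is why the processor also checks length variance before awarding "High".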
"""
Langfuse Client để lấy metrics từ Langfuse API (optional)
"""
import logging
from typing import Any
import httpx
from config import LANGFUSE_BASE_URL, LANGFUSE_SECRET_KEY
logger = logging.getLogger(__name__)
class LangfuseClient:
"""Client để lấy metrics từ Langfuse"""
def __init__(self):
self.base_url = LANGFUSE_BASE_URL or "https://cloud.langfuse.com"
self.secret_key = LANGFUSE_SECRET_KEY
self.enabled = bool(self.secret_key)
async def get_trace_metrics(self, trace_id: str) -> dict[str, Any] | None:
"""
Lấy metrics từ Langfuse trace
Args:
trace_id: Langfuse trace ID
Returns:
Dict metrics hoặc None nếu không lấy được
"""
if not self.enabled:
return None
try:
url = f"{self.base_url}/api/public/traces/{trace_id}"
headers = {"Authorization": f"Bearer {self.secret_key}"}
async with httpx.AsyncClient(timeout=10) as client:
response = await client.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
latency = data.get("latency")
return {
"cost": data.get("totalCost"),
"latency_ms": float(latency * 1000) if latency else None,
"input_tokens": data.get("inputTokens"),
"output_tokens": data.get("outputTokens"),
"total_tokens": data.get("totalTokens"),
"model": data.get("model"),
}
else:
logger.warning(f"Failed to get trace {trace_id}: {response.status_code}")
return None
except Exception as e:
logger.warning(f"Error getting Langfuse metrics: {e}")
return None
async def search_traces(
self,
user_id: str | None = None,
session_id: str | None = None,
limit: int = 10,
) -> list[dict[str, Any]]:
"""
Tìm traces từ Langfuse theo user_id hoặc session_id
Args:
user_id: User ID để filter
session_id: Session ID để filter
limit: Số lượng traces tối đa
Returns:
List traces
"""
if not self.enabled:
return []
try:
url = f"{self.base_url}/api/public/traces"
headers = {"Authorization": f"Bearer {self.secret_key}"}
params: dict[str, Any] = {"limit": limit}
if user_id:
params["userId"] = user_id
if session_id:
params["sessionId"] = session_id
async with httpx.AsyncClient(timeout=10) as client:
response = await client.get(url, headers=headers, params=params)
if response.status_code == 200:
data = response.json()
return data.get("data", [])
else:
logger.warning(f"Failed to search traces: {response.status_code}")
return []
except Exception as e:
logger.warning(f"Error searching Langfuse traces: {e}")
return []
# Utils package
"""
Excel Handler để đọc và ghi Excel files
"""
import logging
from io import BytesIO
from typing import Any
logger = logging.getLogger(__name__)
# Lazy loading - chỉ import pandas khi cần
_pandas = None
def _get_pandas():
"""Lazy load pandas"""
global _pandas
if _pandas is None:
import pandas as pd
_pandas = pd
return _pandas
def read_excel(file_content: bytes, question_column: str = "Câu hỏi") -> list[dict[str, Any]]:
"""
Đọc Excel file và extract câu hỏi
Args:
file_content: File content dạng bytes
question_column: Tên cột chứa câu hỏi
Returns:
List dicts: [{"id": 1, "question": "...", "row_data": {...}}, ...]
"""
try:
pd = _get_pandas()
df = pd.read_excel(BytesIO(file_content))
# Tìm cột câu hỏi (case-insensitive)
question_col = None
for col in df.columns:
if question_column.lower() in col.lower() or "question" in col.lower() or "query" in col.lower():
question_col = col
break
if question_col is None:
raise ValueError(f"Không tìm thấy cột '{question_column}' trong file Excel")
results = []
for idx, row in df.iterrows():
question = str(row[question_col]).strip()
if not question or question.lower() in ["nan", "none", ""]:
continue
# Lưu toàn bộ row data để giữ nguyên các cột khác
row_data = row.to_dict()
# Convert idx to int (pandas index can be various types)
row_id = int(idx) + 1 if isinstance(idx, (int, float)) else len(results) + 1
results.append(
{
"id": row_id, # 1-based index
"question": question,
"row_data": row_data,
}
)
logger.info(f"Đọc được {len(results)} câu hỏi từ Excel")
return results
except Exception as e:
logger.error(f"Error reading Excel: {e}", exc_info=True)
raise
def create_results_excel(
summary_data: dict[str, Any],
detailed_results: list[dict[str, Any]],
aggregated_results: list[dict[str, Any]],
) -> bytes:
"""
Tạo Excel file với 3 sheets: Summary, Results, Aggregated
Args:
summary_data: Dict tổng kết
detailed_results: List kết quả chi tiết từng lần test
aggregated_results: List kết quả tổng hợp theo câu hỏi
Returns:
Excel file dạng bytes
"""
try:
pd = _get_pandas()
output = BytesIO()
# Type ignore for BytesIO - pandas accepts it at runtime
with pd.ExcelWriter(output, engine="openpyxl") as writer: # type: ignore
# Sheet 1: Summary
summary_df = pd.DataFrame([summary_data])
summary_df.to_excel(writer, sheet_name="Summary", index=False)
# Sheet 2: Results (chi tiết từng lần test)
if detailed_results:
results_df = pd.DataFrame(detailed_results)
results_df.to_excel(writer, sheet_name="Results", index=False)
# Sheet 3: Aggregated (tổng hợp theo câu hỏi)
if aggregated_results:
aggregated_df = pd.DataFrame(aggregated_results)
aggregated_df.to_excel(writer, sheet_name="Aggregated", index=False)
output.seek(0)
return output.getvalue()
except Exception as e:
logger.error(f"Error creating Excel: {e}", exc_info=True)
raise
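`read_excel` skips cells that are blank or that pandas stringifies as `NaN`/`None`. The same filter in plain Python (no pandas needed) shows which rows survive; note this sketch assigns sequential 1-based ids, whereas `read_excel` itself prefers the original row index when it is numeric:

```python
from typing import Any

def filter_questions(cells: list[Any]) -> list[dict[str, Any]]:
    """Mirror read_excel's row filter: drop empty / 'nan' / 'none' cells,
    then assign 1-based ids to the rows that remain."""
    results = []
    for cell in cells:
        question = str(cell).strip()
        if not question or question.lower() in ("nan", "none", ""):
            continue
        results.append({"id": len(results) + 1, "question": question})
    return results

rows = [
    "Áo sơ mi trắng giá bao nhiêu?",  # kept
    float("nan"),                      # str() -> "nan", dropped
    "  ",                              # blank after strip, dropped
    None,                              # str() -> "None", dropped
    "Quần jean nam có màu gì?",        # kept
]
print(filter_questions(rows))
```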