Commit fce3daf8 authored by Bot's avatar Bot

Merge branch 'main' of gitlab.hdtex.group:anhvh/tool-test-chatbot-canifa

parents 6c99fc15 a4083b51
# Environment variables
.env
.env.local
.env.*.local
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual Environment
venv/
ENV/
env/
.venv
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
Thumbs.db
# Logs
*.log
logs/
# Temporary files
*.tmp
*.temp
temp/
tmp/
# Excel files (test files)
*.xlsx
*.xls
!example_questions.xlsx
# Test results
results/
output/
# Database
*.db
*.sqlite
*.sqlite3
# Batch Testing Tool - Chatbot Canifa
Tool để test batch chatbot với file Excel, hỗ trợ test nhiều lần mỗi câu hỏi và thu thập metrics chi tiết.
## 🚀 Tính năng
- ✅ Upload file Excel với danh sách câu hỏi
- ✅ Test mỗi câu hỏi N lần (configurable)
- ✅ Track metrics chi tiết: response, product IDs, latency, cost, tokens
- ✅ Tích hợp Langfuse để lấy metrics (optional)
- ✅ Real-time progress tracking
- ✅ Export Excel với 3 sheets: Summary, Results, Aggregated
- ✅ Beautiful HTML UI
## 📋 Yêu cầu
- Python 3.10+
- Chatbot Canifa API đang chạy
- Langfuse (optional, để lấy thêm metrics)
## 🔧 Cài đặt
1. **Clone và cài đặt dependencies:**
```bash
cd C:\tool_test_chatbot_canifa
pip install -r requirements.txt
```
2. **Cấu hình `.env` file:**
```env
# Chatbot API
CHATBOT_API_URL=http://localhost:8000
CHATBOT_API_ENDPOINT=/api/agent/chat
# Tool settings
TOOL_PORT=5001
MAX_CONCURRENT_REQUESTS=5
REQUEST_TIMEOUT=60
# Langfuse (optional)
LANGFUSE_BASE_URL=https://cloud.langfuse.com
LANGFUSE_SECRET_KEY=your_secret_key_here
```
3. **Chạy server:**
```bash
python app.py
```
4. **Mở browser:**
```
http://localhost:5001
```
## 📝 Cách sử dụng
1. **Chuẩn bị file Excel:**
- Tạo file Excel với cột chứa câu hỏi (mặc định: "Câu hỏi")
- Có thể có thêm các cột khác (sẽ được giữ nguyên trong output)
2. **Upload và test:**
- Mở http://localhost:5001
- Chọn file Excel
- Nhập tên cột chứa câu hỏi (mặc định: "Câu hỏi")
- Chọn số lần test mỗi câu hỏi (1-10)
- Click "Bắt đầu Test"
3. **Theo dõi progress:**
- Xem real-time progress bar
- Xem số lượng đã xử lý, thành công, lỗi
4. **Download kết quả:**
- Sau khi hoàn thành, click "Download Excel Results"
- File Excel có 3 sheets:
- **Summary**: Tổng kết metrics
- **Results**: Chi tiết từng lần test
- **Aggregated**: Tổng hợp theo câu hỏi
## 📊 Metrics được track
### Từ API Response:
- AI Response
- Product IDs
- Status (success/error)
- Product count
### Từ Langfuse (nếu enabled):
- Cost (USD)
- Latency (ms)
- Input/Output/Total Tokens
- Model name
### Aggregated Metrics:
- Average cost/latency
- Min/Max latency
- Success rate
- Response consistency (High/Medium/Low)
- Average product count
## 🏗️ Cấu trúc code
```
tool_test_chatbot_canifa/
├── app.py # Main FastAPI app
├── config.py # Config từ .env
├── requirements.txt # Dependencies
├── api/
│ └── routes.py # API endpoints
├── services/
│ ├── api_client.py # Chatbot API client
│ ├── langfuse_client.py # Langfuse client (optional)
│ └── batch_processor.py # Batch processing logic
├── utils/
│ └── excel_handler.py # Excel read/write
└── templates/
└── index.html # HTML frontend
```
## 🔌 API Endpoints
- `POST /api/batch-test/upload` - Upload Excel và bắt đầu test
- `GET /api/batch-test/progress/{task_id}` - Lấy progress
- `GET /api/batch-test/download/{task_id}` - Download Excel kết quả
- `GET /api/batch-test/health` - Health check
## 📝 Notes
- Tool này gọi API chatbot-canifa, không cần code thay đổi ở chatbot
- Langfuse tự động track trong chatbot, tool chỉ lấy thêm metrics (optional)
- Progress được lưu trong memory, restart server sẽ mất progress
- Trong production, nên dùng Redis hoặc DB để lưu progress
## 🐛 Troubleshooting
1. **Không kết nối được chatbot API:**
- Kiểm tra `CHATBOT_API_URL` trong `.env`
- Đảm bảo chatbot server đang chạy
2. **Không đọc được Excel:**
- Kiểm tra tên cột chứa câu hỏi
- Đảm bảo file là .xlsx hoặc .xls
3. **Langfuse metrics không có:**
- Kiểm tra `LANGFUSE_SECRET_KEY` trong `.env`
- Có thể cần delay để Langfuse sync
# API package
"""
FastAPI routes cho batch testing tool
"""
import logging
import uuid
from typing import Any
from fastapi import APIRouter, BackgroundTasks, File, Form, HTTPException, UploadFile
from fastapi.responses import FileResponse, JSONResponse
from services.batch_processor import BatchProcessor
from utils.excel_handler import create_results_excel, read_excel
logger = logging.getLogger(__name__)
router = APIRouter()
# In-memory storage cho progress (trong production nên dùng Redis hoặc DB)
progress_store: dict[str, dict[str, Any]] = {}
@router.post("/api/batch-test/upload", summary="Upload Excel và bắt đầu batch test")
async def upload_and_test(
file: UploadFile = File(...),
num_tests: int = Form(1),
question_column: str = Form("Câu hỏi"),
api_url: str = Form("http://localhost:8000"),
):
"""
Upload Excel file và bắt đầu batch testing
Args:
file: Excel file
num_tests: Số lần test mỗi câu hỏi
question_column: Tên cột chứa câu hỏi
Returns:
Task ID để track progress
"""
try:
# Validate file
if not file.filename or not file.filename.endswith((".xlsx", ".xls")):
raise HTTPException(
status_code=400, detail="File phải là Excel (.xlsx hoặc .xls)"
)
# Đọc file
file_content = await file.read()
questions = read_excel(file_content, question_column=question_column)
if not questions:
raise HTTPException(
status_code=400, detail="Không tìm thấy câu hỏi nào trong file"
)
# Tạo task ID
task_id = str(uuid.uuid4())
# Initialize progress
progress_store[task_id] = {
"status": "processing",
"processed": 0,
"total": len(questions) * num_tests,
"current_question": 0,
"current_attempt": 0,
"successful": 0,
"failed": 0,
}
# Start batch processing (async)
processor = BatchProcessor(api_url=api_url)
async def process_task():
try:
def progress_callback(progress: dict[str, Any]):
progress_store[task_id].update(progress)
result = await processor.process_batch(
questions=questions,
num_tests_per_question=num_tests,
progress_callback=progress_callback,
)
# Tạo Excel output
excel_output = create_results_excel(
summary_data=result["summary"],
detailed_results=result["detailed_results"],
aggregated_results=result["aggregated_results"],
)
# Lưu kết quả
progress_store[task_id].update(
{
"status": "completed",
"result": result,
"excel_output": excel_output,
}
)
except Exception as e:
logger.error(f"Error processing batch: {e}", exc_info=True)
progress_store[task_id].update(
{
"status": "error",
"error_message": str(e),
}
)
# Run async task
import asyncio
asyncio.create_task(process_task())
return JSONResponse(
{
"task_id": task_id,
"message": "Batch testing đã bắt đầu",
"total_questions": len(questions),
"num_tests_per_question": num_tests,
"total_tests": len(questions) * num_tests,
}
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in upload_and_test: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@router.get("/api/batch-test/progress/{task_id}", summary="Lấy progress của batch test")
async def get_progress(task_id: str):
"""
Lấy progress của batch test
Args:
task_id: Task ID từ upload endpoint
Returns:
Progress info
"""
if task_id not in progress_store:
raise HTTPException(status_code=404, detail="Task không tồn tại")
progress = progress_store[task_id].copy()
# Không trả về excel_output trong progress (quá lớn)
if "excel_output" in progress:
progress["excel_ready"] = True
del progress["excel_output"]
return JSONResponse(progress)
@router.get("/api/batch-test/download/{task_id}", summary="Download Excel kết quả")
async def download_results(task_id: str, background_tasks: BackgroundTasks):
"""
Download Excel file kết quả
Args:
task_id: Task ID từ upload endpoint
background_tasks: Background tasks để xóa file sau khi download
Returns:
Excel file
"""
import os
import tempfile
if task_id not in progress_store:
raise HTTPException(status_code=404, detail="Task không tồn tại")
task_data = progress_store[task_id]
if task_data["status"] != "completed":
raise HTTPException(status_code=400, detail="Task chưa hoàn thành")
if "excel_output" not in task_data:
raise HTTPException(status_code=500, detail="Excel output không tồn tại")
# Tạo temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx") as tmp_file:
tmp_file.write(task_data["excel_output"])
tmp_path = tmp_file.name
# Xóa file sau khi download
background_tasks.add_task(os.unlink, tmp_path)
return FileResponse(
tmp_path,
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
filename=f"batch_test_results_{task_id}.xlsx",
)
@router.get("/api/batch-test/health", summary="Health check")
async def health_check():
"""Health check endpoint"""
return JSONResponse({"status": "ok", "message": "Batch testing tool is running"})
"""
Main FastAPI application cho Batch Testing Tool
"""
import logging
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from api.routes import router
from config import TOOL_PORT
# Setup logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# Create FastAPI app
app = FastAPI(
title="Batch Testing Tool - Chatbot Canifa",
description="Tool để test batch chatbot với Excel file",
version="1.0.0",
)
# Include routes
app.include_router(router)
@app.get("/", response_class=HTMLResponse)
async def root():
"""Serve HTML frontend"""
try:
with open("templates/index.html", "r", encoding="utf-8") as f:
return HTMLResponse(content=f.read())
except FileNotFoundError:
return HTMLResponse(
content="<h1>Error: templates/index.html not found</h1>",
status_code=500,
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"app:app",
host="0.0.0.0",
port=TOOL_PORT,
reload=True,
log_level="info",
)
"""
Config file cho Supabase và các environment variables
Lấy giá trị từ file .env qua os.getenv
"""
import os
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Export all config variables for type checking
__all__ = [
"AI_MODEL_NAME",
"AI_SUPABASE_KEY",
"AI_SUPABASE_URL",
"CHECKPOINT_POSTGRES_SCHEMA",
"CHECKPOINT_POSTGRES_URL",
"CLERK_SECRET_KEY",
"CONV_DATABASE_URL",
"CONV_SUPABASE_KEY",
"CONV_SUPABASE_URL",
"DEFAULT_MODEL",
"FIRECRAWL_API_KEY",
"GOOGLE_API_KEY",
"GROQ_API_KEY",
"JWT_ALGORITHM",
"JWT_SECRET",
"LANGFUSE_BASE_URL",
"LANGFUSE_PUBLIC_KEY",
"LANGFUSE_SECRET_KEY",
"LANGSMITH_API_KEY",
"LANGSMITH_ENDPOINT",
"LANGSMITH_PROJECT",
"LANGSMITH_TRACING",
"MONGODB_DB_NAME",
"MONGODB_URI",
"OPENAI_API_KEY",
"OTEL_EXPORTER_JAEGER_AGENT_HOST",
"OTEL_EXPORTER_JAEGER_AGENT_PORT",
"OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES",
"OTEL_SERVICE_NAME",
"OTEL_TRACES_EXPORTER",
"PORT",
"REDIS_HOST",
"REDIS_PASSWORD",
"REDIS_PORT",
"REDIS_USERNAME",
"STARROCKS_DB",
"STARROCKS_HOST",
"STARROCKS_PASSWORD",
"STARROCKS_PORT",
"STARROCKS_USER",
"USE_MONGO_CONVERSATION",
]
# ====================== SUPABASE CONFIGURATION ======================
AI_SUPABASE_URL: str | None = os.getenv("AI_SUPABASE_URL")
AI_SUPABASE_KEY: str | None = os.getenv("AI_SUPABASE_KEY")
CONV_SUPABASE_URL: str | None = os.getenv("CONV_SUPABASE_URL")
CONV_SUPABASE_KEY: str | None = os.getenv("CONV_SUPABASE_KEY")
# ====================== REDIS CONFIGURATION ======================
REDIS_HOST: str | None = os.getenv("REDIS_HOST")
REDIS_PORT: int = int(os.getenv("REDIS_PORT", "6379"))
REDIS_PASSWORD: str | None = os.getenv("REDIS_PASSWORD")
REDIS_USERNAME: str | None = os.getenv("REDIS_USERNAME")
# ====================== AI API KEYS & MODELS ======================
OPENAI_API_KEY: str | None = os.getenv("OPENAI_API_KEY")
GOOGLE_API_KEY: str | None = os.getenv("GOOGLE_API_KEY")
GROQ_API_KEY: str | None = os.getenv("GROQ_API_KEY")
DEFAULT_MODEL: str = os.getenv("DEFAULT_MODEL", "gpt-5-nano")
# DEFAULT_MODEL: str = os.getenv("DEFAULT_MODEL")
# ====================== JWT CONFIGURATION ======================
JWT_SECRET: str | None = os.getenv("JWT_SECRET")
JWT_ALGORITHM: str | None = os.getenv("JWT_ALGORITHM")
# ====================== SERVER CONFIG ======================
# Lấy PORT từ environment variable, mặc định 5002
# Có thể thay đổi bằng cách set PORT=xxxx trong .env hoặc system env
PORT: int = int(os.getenv("PORT", "5002"))
FIRECRAWL_API_KEY: str | None = os.getenv("FIRECRAWL_API_KEY")
# ====================== LANGFUSE CONFIGURATION (DEPRECATED) ======================
LANGFUSE_SECRET_KEY: str | None = os.getenv("LANGFUSE_SECRET_KEY")
LANGFUSE_PUBLIC_KEY: str | None = os.getenv("LANGFUSE_PUBLIC_KEY")
LANGFUSE_BASE_URL: str | None = os.getenv(
"LANGFUSE_BASE_URL", "https://cloud.langfuse.com"
)
# ====================== LANGSMITH CONFIGURATION (TẮT VÌ RATE LIMIT) ======================
# LANGSMITH_TRACING = os.getenv("LANGSMITH_TRACING", "false")
# LANGSMITH_ENDPOINT = os.getenv("LANGSMITH_ENDPOINT", "https://api.smith.langchain.com")
# LANGSMITH_API_KEY = os.getenv("LANGSMITH_API_KEY")
# LANGSMITH_PROJECT = os.getenv("LANGSMITH_PROJECT")
LANGSMITH_TRACING = "false"
LANGSMITH_ENDPOINT = None
LANGSMITH_API_KEY = None
LANGSMITH_PROJECT = None
# ====================== CLERK AUTHENTICATION ======================
CLERK_SECRET_KEY: str | None = os.getenv("CLERK_SECRET_KEY")
# ====================== DATABASE CONNECTION ======================
CONV_DATABASE_URL: str | None = os.getenv("CONV_DATABASE_URL")
# ====================== MONGO CONFIGURATION ======================
MONGODB_URI: str | None = os.getenv("MONGODB_URI", "mongodb://localhost:27017")
MONGODB_DB_NAME: str | None = os.getenv("MONGODB_DB_NAME", "ai_law")
USE_MONGO_CONVERSATION: bool = (
os.getenv("USE_MONGO_CONVERSATION", "true").lower() == "true"
)
# ====================== CANIFA INTERNAL POSTGRES ======================
CHECKPOINT_POSTGRES_URL: str | None = os.getenv("CHECKPOINT_POSTGRES_URL")
CHECKPOINT_POSTGRES_SCHEMA: str = os.getenv("CHECKPOINT_POSTGRES_SCHEMA", "canifa_chat")
# ====================== STARROCKS DATA LAKE ======================
STARROCKS_HOST: str | None = os.getenv("STARROCKS_HOST")
STARROCKS_PORT: int = int(os.getenv("STARROCKS_PORT", "9030"))
STARROCKS_USER: str | None = os.getenv("STARROCKS_USER")
STARROCKS_PASSWORD: str | None = os.getenv("STARROCKS_PASSWORD")
STARROCKS_DB: str | None = os.getenv("STARROCKS_DB")
# Placeholder for backward compatibility if needed
AI_MODEL_NAME = DEFAULT_MODEL
# ====================== OPENTELEMETRY CONFIGURATION ======================
OTEL_EXPORTER_JAEGER_AGENT_HOST = os.getenv("OTEL_EXPORTER_JAEGER_AGENT_HOST")
OTEL_EXPORTER_JAEGER_AGENT_PORT = os.getenv("OTEL_EXPORTER_JAEGER_AGENT_PORT")
OTEL_SERVICE_NAME = os.getenv("OTEL_SERVICE_NAME")
OTEL_TRACES_EXPORTER = os.getenv("OTEL_TRACES_EXPORTER")
OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES = os.getenv(
"OTEL_EXPORTER_JAEGER_AGENT_SPLIT_OVERSIZED_BATCHES"
)
# ====================== BATCH TESTING TOOL CONFIGURATION ======================
CHATBOT_API_URL: str = os.getenv("CHATBOT_API_URL", "http://localhost:8000")
CHATBOT_API_ENDPOINT: str = os.getenv("CHATBOT_API_ENDPOINT", "/api/agent/chat")
TOOL_PORT: int = int(os.getenv("TOOL_PORT", "5002"))
MAX_CONCURRENT_REQUESTS: int = int(os.getenv("MAX_CONCURRENT_REQUESTS", "5"))
REQUEST_TIMEOUT: int = int(os.getenv("REQUEST_TIMEOUT", "60"))
"""
Script để tạo file Excel mẫu cho batch testing
"""
from utils.excel_handler import _get_pandas
# Lazy load pandas
pd = _get_pandas()
# Tạo dữ liệu mẫu
data = {
"Câu hỏi": [
"Áo sơ mi trắng size M giá bao nhiêu?",
"Quần jean nam có màu gì?",
"Giày thể thao nữ size 38",
"Áo khoác mùa đông có size nào?",
"Túi xách da có màu đen không?",
],
"Category": [
"Áo",
"Quần",
"Giày",
"Áo khoác",
"Phụ kiện",
],
"Expected_Products": [
"Áo sơ mi",
"Quần jean",
"Giày thể thao",
"Áo khoác",
"Túi xách",
],
}
# Tạo DataFrame
df = pd.DataFrame(data)
# Lưu vào Excel
df.to_excel("example_questions.xlsx", index=False)
print("✅ Đã tạo file example_questions.xlsx")
print("\nFormat file Excel:")
print(df.to_string())
.\.venv\Scripts\activate
\ No newline at end of file
# Services package
"""
API Client để gọi chatbot-canifa API
"""
import asyncio
import logging
import time
from typing import Any
import httpx
from config import CHATBOT_API_ENDPOINT, CHATBOT_API_URL, REQUEST_TIMEOUT
logger = logging.getLogger(__name__)
class ChatbotAPIClient:
"""Client để gọi chatbot API"""
def __init__(self, api_url: str | None = None):
self.base_url = api_url or CHATBOT_API_URL
self.endpoint = CHATBOT_API_ENDPOINT
self.timeout = REQUEST_TIMEOUT
async def chat(
self,
query: str,
user_id: str | None = None,
test_id: str | None = None,
test_attempt: int | None = None,
) -> dict[str, Any]:
"""
Gọi chatbot API và trả về response với metrics
Args:
query: Câu hỏi
user_id: User ID (optional)
test_id: Test ID để track (optional)
test_attempt: Số lần test (1, 2, 3...) (optional)
Returns:
Dict chứa response và metrics:
{
"status": "success" | "error",
"ai_response": str,
"product_ids": list,
"latency_ms": float,
"timestamp": str,
"error_message": str | None
}
"""
if not user_id:
user_id = (
f"batch_test_{test_id}_{test_attempt}" if test_id else "batch_test_user"
)
# Nếu api_url đã chứa endpoint (kết thúc bằng /chat), dùng trực tiếp
# Nếu không, thêm endpoint vào
if self.base_url.endswith("/chat") or self.base_url.endswith("/api/agent/chat"):
url = self.base_url
else:
url = f"{self.base_url}{self.endpoint}"
payload = {
"user_query": query,
"user_id": user_id,
}
start_time = time.time()
error_message = None
status = "error"
ai_response = ""
product_ids = []
try:
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.post(url, json=payload)
latency_ms = (time.time() - start_time) * 1000
if response.status_code == 200:
data = response.json()
status = data.get("status", "error")
ai_response = data.get("ai_response", "")
product_ids = data.get("product_ids", [])
else:
error_message = (
f"API returned status {response.status_code}: {response.text}"
)
logger.error(error_message)
except httpx.TimeoutException:
latency_ms = (time.time() - start_time) * 1000
error_message = f"Request timeout after {self.timeout}s"
logger.error(error_message)
except Exception as e:
latency_ms = (time.time() - start_time) * 1000
error_message = str(e)
logger.error(f"Error calling chatbot API: {e}", exc_info=True)
return {
"status": status,
"ai_response": ai_response,
"product_ids": product_ids,
"latency_ms": round(latency_ms, 2),
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"error_message": error_message,
}
async def batch_chat(
self,
queries: list[str],
user_id_prefix: str = "batch_test",
max_concurrent: int = 5,
) -> list[dict[str, Any]]:
"""
Gọi API cho nhiều queries song song
Args:
queries: List câu hỏi
user_id_prefix: Prefix cho user_id
max_concurrent: Số requests đồng thời tối đa
Returns:
List kết quả theo thứ tự queries
"""
semaphore = asyncio.Semaphore(max_concurrent)
async def chat_with_semaphore(index: int, query: str):
async with semaphore:
return await self.chat(query, user_id=f"{user_id_prefix}_{index}")
tasks = [chat_with_semaphore(i, query) for i, query in enumerate(queries)]
results = await asyncio.gather(*tasks)
return results
This diff is collapsed.
"""
Langfuse Client để lấy metrics từ Langfuse API (optional)
"""
import logging
from typing import Any
import httpx
from config import LANGFUSE_BASE_URL, LANGFUSE_SECRET_KEY
logger = logging.getLogger(__name__)
class LangfuseClient:
"""Client để lấy metrics từ Langfuse"""
def __init__(self):
self.base_url = LANGFUSE_BASE_URL or "https://cloud.langfuse.com"
self.secret_key = LANGFUSE_SECRET_KEY
self.enabled = bool(self.secret_key)
async def get_trace_metrics(self, trace_id: str) -> dict[str, Any] | None:
"""
Lấy metrics từ Langfuse trace
Args:
trace_id: Langfuse trace ID
Returns:
Dict metrics hoặc None nếu không lấy được
"""
if not self.enabled:
return None
try:
url = f"{self.base_url}/api/public/traces/{trace_id}"
headers = {"Authorization": f"Bearer {self.secret_key}"}
async with httpx.AsyncClient(timeout=10) as client:
response = await client.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
latency = data.get("latency")
return {
"cost": data.get("totalCost"),
"latency_ms": float(latency * 1000) if latency else None,
"input_tokens": data.get("inputTokens"),
"output_tokens": data.get("outputTokens"),
"total_tokens": data.get("totalTokens"),
"model": data.get("model"),
}
else:
logger.warning(f"Failed to get trace {trace_id}: {response.status_code}")
return None
except Exception as e:
logger.warning(f"Error getting Langfuse metrics: {e}")
return None
async def search_traces(
self,
user_id: str | None = None,
session_id: str | None = None,
limit: int = 10,
) -> list[dict[str, Any]]:
"""
Tìm traces từ Langfuse theo user_id hoặc session_id
Args:
user_id: User ID để filter
session_id: Session ID để filter
limit: Số lượng traces tối đa
Returns:
List traces
"""
if not self.enabled:
return []
try:
url = f"{self.base_url}/api/public/traces"
headers = {"Authorization": f"Bearer {self.secret_key}"}
params: dict[str, Any] = {"limit": limit}
if user_id:
params["userId"] = user_id
if session_id:
params["sessionId"] = session_id
async with httpx.AsyncClient(timeout=10) as client:
response = await client.get(url, headers=headers, params=params)
if response.status_code == 200:
data = response.json()
return data.get("data", [])
else:
logger.warning(f"Failed to search traces: {response.status_code}")
return []
except Exception as e:
logger.warning(f"Error searching Langfuse traces: {e}")
return []
This diff is collapsed.
# Utils package
"""
Excel Handler để đọc và ghi Excel files
"""
import logging
from io import BytesIO
from typing import Any
logger = logging.getLogger(__name__)
# Lazy loading - chỉ import pandas khi cần
_pandas = None
def _get_pandas():
"""Lazy load pandas"""
global _pandas
if _pandas is None:
import pandas as pd
_pandas = pd
return _pandas
def read_excel(
file_content: bytes, question_column: str = "Câu hỏi"
) -> list[dict[str, Any]]:
"""
Đọc Excel file và extract câu hỏi
Args:
file_content: File content dạng bytes
question_column: Tên cột chứa câu hỏi
Returns:
List dicts: [{"id": 1, "question": "...", "row_data": {...}}, ...]
"""
try:
pd = _get_pandas()
df = pd.read_excel(BytesIO(file_content))
# Tìm cột câu hỏi (case-insensitive)
question_col = None
for col in df.columns:
if (
question_column.lower() in col.lower()
or "question" in col.lower()
or "query" in col.lower()
):
question_col = col
break
# Nếu chỉ có 1 cột trong file, mặc định dùng cột đó làm question
if question_col is None:
if df.shape[1] == 1:
question_col = df.columns[0]
else:
raise ValueError(
f"Không tìm thấy cột '{question_column}' trong file Excel"
)
results = []
for idx, row in df.iterrows():
question = str(row[question_col]).strip()
if not question or question.lower() in ["nan", "none", ""]:
continue
# Lưu toàn bộ row data để giữ nguyên các cột khác
row_data = row.to_dict()
# Convert idx to int (pandas index can be various types)
row_id = int(idx) + 1 if isinstance(idx, (int, float)) else len(results) + 1
results.append(
{
"id": row_id, # 1-based index
"question": question,
"row_data": row_data,
}
)
logger.info(f"Đọc được {len(results)} câu hỏi từ Excel")
return results
except Exception as e:
logger.error(f"Error reading Excel: {e}", exc_info=True)
raise
def create_results_excel(
summary_data: dict[str, Any],
detailed_results: list[dict[str, Any]],
aggregated_results: list[dict[str, Any]],
) -> bytes:
"""
Tạo Excel file với 1 sheet duy nhất:
- Cột Question (câu hỏi)
- Cột Answer1, Answer2, ... (theo số lần test)
Args:
summary_data: Dict tổng kết
detailed_results: List kết quả chi tiết từng lần test
aggregated_results: List kết quả tổng hợp theo câu hỏi
Returns:
Excel file dạng bytes
"""
try:
pd = _get_pandas()
output = BytesIO()
# Lấy số lần test từ summary
num_tests = int(summary_data.get("num_tests_per_question", 1))
# Xây dựng dữ liệu cho sheet: mỗi row là 1 câu hỏi + các answers
sheet_data: list[dict[str, Any]] = []
for agg in aggregated_results:
row: dict[str, Any] = {"Question": agg.get("question", "")}
question_id = agg.get("question_id")
for test_num in range(1, num_tests + 1):
# Tìm kết quả của lần test này trong detailed_results
result = next(
(
r
for r in detailed_results
if r.get("question_id") == question_id
and r.get("test_attempt") == test_num
),
None,
)
answer = result.get("ai_response", "") if result else ""
row[f"Answer {test_num}"] = answer
sheet_data.append(row)
# Tạo DataFrame và ghi ra Excel
results_df = pd.DataFrame(sheet_data)
with pd.ExcelWriter(output, engine="openpyxl") as writer: # type: ignore
results_df.to_excel(writer, sheet_name="Results", index=False)
# Format column width
worksheet = writer.sheets["Results"]
for column_cells in worksheet.columns:
max_length = 0
column_letter = column_cells[0].column_letter
for cell in column_cells:
try:
if cell.value:
max_length = max(max_length, len(str(cell.value)))
except Exception:
pass
adjusted_width = min(max_length + 2, 50)
worksheet.column_dimensions[column_letter].width = adjusted_width
output.seek(0)
return output.getvalue()
except Exception as e:
logger.error(f"Error creating Excel: {e}", exc_info=True)
raise
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment