Commit 2c568c1d authored by Vũ Hoàng Anh's avatar Vũ Hoàng Anh

feat: per-message SSE streaming, localStorage persistence, resume from...

feat: per-message SSE streaming, localStorage persistence, resume from incomplete, fix ASGI middleware buffering
parent 2fc434b5
...@@ -184,27 +184,39 @@ async def run_test_batch(req: TestRunRequest): ...@@ -184,27 +184,39 @@ async def run_test_batch(req: TestRunRequest):
elapsed = time.time() - start_time elapsed = time.time() - start_time
total_time += elapsed total_time += elapsed
msg_result = None
if response.status_code == 200: if response.status_code == 200:
data = response.json() data = response.json()
version_responses.append({ msg_result = {
"message": message, "message": message,
"ai_response": data.get("ai_response", ""), "ai_response": data.get("ai_response", ""),
"product_ids": data.get("product_ids", []), "product_ids": data.get("product_ids", []),
"response_time": round(elapsed, 2), "response_time": round(elapsed, 2),
}) }
else: else:
version_responses.append({ msg_result = {
"message": message, "message": message,
"ai_response": f"[ERROR {response.status_code}] {response.text[:200]}", "ai_response": f"[ERROR {response.status_code}] {response.text[:200]}",
"product_ids": [], "product_ids": [],
"response_time": round(elapsed, 2), "response_time": round(elapsed, 2),
}
version_responses.append(msg_result)
# Stream per-message update immediately
yield _sse_event("message_done", {
"conv_id": conv_id,
"version": version,
"msg_index": msg_idx,
"ai_response": msg_result["ai_response"],
"response_time": msg_result["response_time"],
}) })
# Delay between messages # Delay between messages
if msg_idx < len(messages) - 1 and req.delay_ms > 0: if msg_idx < len(messages) - 1 and req.delay_ms > 0:
await asyncio.sleep(req.delay_ms / 1000) await asyncio.sleep(req.delay_ms / 1000)
# Store result # Store result for conv×version
result = { result = {
"conv_id": conv_id, "conv_id": conv_id,
"version": version, "version": version,
...@@ -219,7 +231,7 @@ async def run_test_batch(req: TestRunRequest): ...@@ -219,7 +231,7 @@ async def run_test_batch(req: TestRunRequest):
completed += 1 completed += 1
_test_results[test_id]["completed"] = completed _test_results[test_id]["completed"] = completed
# Stream progress event # Stream conv×version completion
yield _sse_event("progress", { yield _sse_event("progress", {
"result_key": result_key, "result_key": result_key,
"conv_id": conv_id, "conv_id": conv_id,
......
...@@ -13,7 +13,6 @@ from typing import TYPE_CHECKING ...@@ -13,7 +13,6 @@ from typing import TYPE_CHECKING
from fastapi import Request from fastapi import Request
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from common.canifa_api import extract_user_id_from_canifa_response, verify_canifa_token from common.canifa_api import extract_user_id_from_canifa_response, verify_canifa_token
from common.message_limit import message_limit_service from common.message_limit import message_limit_service
...@@ -58,9 +57,12 @@ RATE_LIMITED_PATHS = [ ...@@ -58,9 +57,12 @@ RATE_LIMITED_PATHS = [
] ]
class CanifaAuthMiddleware(BaseHTTPMiddleware): class CanifaAuthMiddleware:
""" """
Canifa Authentication + Rate Limit Middleware Canifa Authentication + Rate Limit Middleware (Pure ASGI)
Uses raw ASGI instead of BaseHTTPMiddleware to avoid response buffering
which breaks SSE streaming.
Flow: Flow:
1. Frontend gửi request với Authorization: Bearer <canifa_token> 1. Frontend gửi request với Authorization: Bearer <canifa_token>
...@@ -70,17 +72,27 @@ class CanifaAuthMiddleware(BaseHTTPMiddleware): ...@@ -70,17 +72,27 @@ class CanifaAuthMiddleware(BaseHTTPMiddleware):
5. Routes lấy trực tiếp từ request.state 5. Routes lấy trực tiếp từ request.state
""" """
async def dispatch(self, request: Request, call_next: Callable): def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
request = Request(scope, receive, send)
path = request.url.path path = request.url.path
method = request.method method = request.method
# ✅ Allow OPTIONS requests (CORS preflight) # ✅ Allow OPTIONS requests (CORS preflight)
if method == "OPTIONS": if method == "OPTIONS":
return await call_next(request) await self.app(scope, receive, send)
return
# Skip public endpoints # Skip public endpoints — pass through without wrapping (preserves SSE streaming)
if path in PUBLIC_PATHS or any(path.startswith(prefix) for prefix in PUBLIC_PATH_PREFIXES): if path in PUBLIC_PATHS or any(path.startswith(prefix) for prefix in PUBLIC_PATH_PREFIXES):
return await call_next(request) await self.app(scope, receive, send)
return
# ===================================================================== # =====================================================================
# STEP 1: AUTHENTICATION & IDENTITY # STEP 1: AUTHENTICATION & IDENTITY
...@@ -112,7 +124,13 @@ class CanifaAuthMiddleware(BaseHTTPMiddleware): ...@@ -112,7 +124,13 @@ class CanifaAuthMiddleware(BaseHTTPMiddleware):
if method in ["POST", "PUT", "PATCH"]: if method in ["POST", "PUT", "PATCH"]:
try: try:
body_bytes = await request.body() body_bytes = await request.body()
request._receive = lambda: {"type": "http.request", "body": body_bytes} scope["_body"] = body_bytes # cache body for re-read
async def receive_with_body():
return {"type": "http.request", "body": body_bytes}
receive = receive_with_body
if body_bytes: if body_bytes:
try: try:
data = json.loads(body_bytes) data = json.loads(body_bytes)
...@@ -150,7 +168,7 @@ class CanifaAuthMiddleware(BaseHTTPMiddleware): ...@@ -150,7 +168,7 @@ class CanifaAuthMiddleware(BaseHTTPMiddleware):
if not can_send: if not can_send:
logger.warning(f"⚠️ Rate Limit: {identity_key} | {limit_info['used']}/{limit_info['limit']}") logger.warning(f"⚠️ Rate Limit: {identity_key} | {limit_info['used']}/{limit_info['limit']}")
return JSONResponse( response = JSONResponse(
status_code=429, status_code=429,
content={ content={
"status": "error", "status": "error",
...@@ -165,10 +183,12 @@ class CanifaAuthMiddleware(BaseHTTPMiddleware): ...@@ -165,10 +183,12 @@ class CanifaAuthMiddleware(BaseHTTPMiddleware):
}, },
}, },
) )
await response(scope, receive, send)
return
except Exception as e: except Exception as e:
logger.error(f"❌ Rate Limit Check Error: {e}") logger.error(f"❌ Rate Limit Check Error: {e}")
return await call_next(request) await self.app(scope, receive, send)
# ============================================================================= # =============================================================================
......
...@@ -784,6 +784,8 @@ ...@@ -784,6 +784,8 @@
let isRunning = false; let isRunning = false;
let abortController = null; let abortController = null;
const LS_KEY = 'canifa_test_editor_data'; const LS_KEY = 'canifa_test_editor_data';
const LS_RESULTS_KEY = 'canifa_test_results';
const LS_NUMVERSIONS_KEY = 'canifa_test_numversions';
// ─── Tab Switching ─── // ─── Tab Switching ───
function switchTab(tabName, btn) { function switchTab(tabName, btn) {
...@@ -882,11 +884,34 @@ ...@@ -882,11 +884,34 @@
document.addEventListener('DOMContentLoaded', () => { document.addEventListener('DOMContentLoaded', () => {
const restored = loadEditorFromLS(); const restored = loadEditorFromLS();
if (!restored) { if (!restored) {
// Add 3 default empty rows
addEditorRow('1', ''); addEditorRow('1', '');
addEditorRow('1', ''); addEditorRow('1', '');
addEditorRow('2', ''); addEditorRow('2', '');
} }
// Restore saved numVersions
const savedVersions = localStorage.getItem(LS_NUMVERSIONS_KEY);
if (savedVersions) document.getElementById('numVersions').value = savedVersions;
// Auto-build table if editor has data, then restore saved results
if (restored) {
const rows = getEditorRows();
if (rows.length > 0) {
const conversations = {};
let totalMsgs = 0;
for (const r of rows) {
if (!conversations[r.convId]) conversations[r.convId] = [];
conversations[r.convId].push(r.message);
totalMsgs++;
}
parsedConversations = conversations;
buildTable({
conversations,
total_conversations: Object.keys(conversations).length,
total_messages: totalMsgs
});
}
}
}); });
// ─── File Upload ─── // ─── File Upload ───
...@@ -1004,9 +1029,32 @@ ...@@ -1004,9 +1029,32 @@
}); });
} }
// Reset results // Restore any saved results into the table
testResults = {}; restoreResultsToTable();
document.getElementById('exportBtn').style.display = 'none';
// Show export if there are saved results
document.getElementById('exportBtn').style.display = Object.keys(testResults).length > 0 ? 'inline-flex' : 'none';
}
// ─── localStorage: Save/Restore Results ───
function saveResultsToLS() {
try {
localStorage.setItem(LS_RESULTS_KEY, JSON.stringify(testResults));
} catch (e) { /* quota exceeded, ignore */ }
}
function restoreResultsToTable() {
try {
const saved = JSON.parse(localStorage.getItem(LS_RESULTS_KEY));
if (saved && Object.keys(saved).length > 0) {
testResults = saved;
for (const [key, data] of Object.entries(saved)) {
if (data.responses) {
updateResponseCell(data);
}
}
}
} catch (e) { /* ignore parse errors */ }
} }
// ─── Run Test ─── // ─── Run Test ───
...@@ -1018,32 +1066,62 @@ ...@@ -1018,32 +1066,62 @@
if (!endpointUrl) { alert('Please enter endpoint URL'); return; } if (!endpointUrl) { alert('Please enter endpoint URL'); return; }
if (Object.keys(parsedConversations).length === 0) { alert('Please upload test data first'); return; } if (Object.keys(parsedConversations).length === 0) { alert('Please upload test data first'); return; }
// Save numVersions to localStorage
localStorage.setItem(LS_NUMVERSIONS_KEY, numVersions);
// Determine which conv_ids are fully completed (all versions done)
const incompleteConvIds = [];
for (const convId of Object.keys(parsedConversations)) {
let allDone = true;
for (let v = 1; v <= numVersions; v++) {
const key = `${convId}_v${v}`;
if (!testResults[key] || !testResults[key].responses || testResults[key].responses.length === 0) {
allDone = false;
break;
}
}
if (!allDone) incompleteConvIds.push(convId);
}
if (incompleteConvIds.length === 0) {
alert('All conversations already have results! Click Clear to reset.');
return;
}
// Reset // Reset
isRunning = true; isRunning = true;
abortController = new AbortController(); abortController = new AbortController();
testResults = {}; const alreadyCompleted = Object.keys(parsedConversations).length * numVersions - incompleteConvIds.length * numVersions;
completedTasks = 0; completedTasks = alreadyCompleted;
totalTasks = Object.keys(parsedConversations).length * numVersions; totalTasks = Object.keys(parsedConversations).length * numVersions;
// Reset response cells to waiting // Only reset cells for incomplete conversations
for (const convId of Object.keys(parsedConversations)) { for (const convId of incompleteConvIds) {
const msgs = parsedConversations[convId]; const msgs = parsedConversations[convId];
for (let mi = 0; mi < msgs.length; mi++) { for (let mi = 0; mi < msgs.length; mi++) {
for (let v = 1; v <= numVersions; v++) { for (let v = 1; v <= numVersions; v++) {
const key = `${convId}_v${v}`;
if (!testResults[key] || !testResults[key].responses || testResults[key].responses.length === 0) {
const respCell = document.getElementById(`resp-${convId}-msg${mi}-v${v}`); const respCell = document.getElementById(`resp-${convId}-msg${mi}-v${v}`);
if (respCell) { respCell.innerHTML = '<span style="color:#ffc107">⏳</span>'; respCell.classList.add('waiting'); } if (respCell) { respCell.innerHTML = '<span style="color:#ffc107">⏳</span>'; respCell.classList.add('waiting'); }
} }
} }
} }
}
// Show progress // Show progress
document.getElementById('progressCard').style.display = 'block'; document.getElementById('progressCard').style.display = 'block';
document.getElementById('runAllBtn').disabled = true; document.getElementById('runAllBtn').disabled = true;
document.getElementById('stopBtn').style.display = 'inline-flex'; document.getElementById('stopBtn').style.display = 'inline-flex';
document.getElementById('progressBar').style.width = '0%';
document.getElementById('logPanel').innerHTML = ''; document.getElementById('logPanel').innerHTML = '';
document.getElementById('exportBtn').style.display = 'none'; document.getElementById('exportBtn').style.display = 'none';
// Set initial progress (may not be 0 if resuming)
updateProgress(completedTasks, totalTasks);
if (alreadyCompleted > 0) {
addLog(`⏩ Skipping ${alreadyCompleted} already completed tasks, resuming...`, 'info');
}
// SSE request // SSE request
try { try {
const response = await fetch('/api/test/run', { const response = await fetch('/api/test/run', {
...@@ -1054,6 +1132,7 @@ ...@@ -1054,6 +1132,7 @@
conversations: parsedConversations, conversations: parsedConversations,
num_versions: numVersions, num_versions: numVersions,
delay_ms: delayMs, delay_ms: delayMs,
selected_conv_ids: incompleteConvIds,
}), }),
signal: abortController.signal, signal: abortController.signal,
}); });
...@@ -1112,17 +1191,22 @@ ...@@ -1112,17 +1191,22 @@
addLog(`🚀 Test started — ${data.total_conversations} convs × ${data.num_versions} versions = ${data.total_tasks} tasks`, 'info'); addLog(`🚀 Test started — ${data.total_conversations} convs × ${data.num_versions} versions = ${data.total_tasks} tasks`, 'info');
break; break;
case 'message_done':
// Update single cell immediately
updateSingleCell(data.conv_id, data.version, data.msg_index, data.ai_response, data.response_time);
break;
case 'progress': case 'progress':
completedTasks = data.completed; completedTasks++;
updateProgress(data.completed, totalTasks); updateProgress(completedTasks, totalTasks);
testResults[data.result_key] = data; testResults[data.result_key] = data;
updateResponseCell(data); saveResultsToLS();
addLog(`✅ Conv #${data.conv_id} v${data.version}${data.total_time}s — 🔑 ${data.device_id}`, 'success'); addLog(`✅ Conv #${data.conv_id} v${data.version}${data.total_time}s — 🔑 ${data.device_id}`, 'success');
break; break;
case 'error': case 'error':
completedTasks = data.completed; completedTasks++;
updateProgress(data.completed, totalTasks); updateProgress(completedTasks, totalTasks);
updateResponseCellError(data); updateResponseCellError(data);
addLog(`❌ Conv #${data.conv_id} v${data.version}${data.error}`, 'error'); addLog(`❌ Conv #${data.conv_id} v${data.version}${data.error}`, 'error');
break; break;
...@@ -1151,6 +1235,14 @@ ...@@ -1151,6 +1235,14 @@
} }
} }
function updateSingleCell(convId, version, msgIndex, aiResponse, responseTime) {
const cell = document.getElementById(`resp-${convId}-msg${msgIndex}-v${version}`);
if (cell) {
cell.classList.remove('waiting');
cell.innerHTML = `<div class="response-cell"><div>${escapeHtml(aiResponse || '')}</div><div style="margin-top:4px;font-size:11px;color:#888">${responseTime || 0}s</div></div>`;
}
}
function updateResponseCellError(data) { function updateResponseCellError(data) {
const convId = data.conv_id; const convId = data.conv_id;
const v = data.version; const v = data.version;
...@@ -1260,6 +1352,8 @@ ...@@ -1260,6 +1352,8 @@
addEditorRow('1', ''); addEditorRow('1', '');
addEditorRow('2', ''); addEditorRow('2', '');
localStorage.removeItem(LS_KEY); localStorage.removeItem(LS_KEY);
localStorage.removeItem(LS_RESULTS_KEY);
localStorage.removeItem(LS_NUMVERSIONS_KEY);
// Also clear server-side results // Also clear server-side results
fetch('/api/test/results/clear', { method: 'DELETE' }) fetch('/api/test/results/clear', { method: 'DELETE' })
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment