Commit 906e5cc3 authored by Vũ Hoàng Anh's avatar Vũ Hoàng Anh

perf: QA Gatekeeper audit fixes for backend concurrency and thread-safety

parent 24d87940
...@@ -269,7 +269,7 @@ def _get_colors_by_group(group: str, color_groups_map: dict, exclude: str | None ...@@ -269,7 +269,7 @@ def _get_colors_by_group(group: str, color_groups_map: dict, exclude: str | None
@router.get("/{code}") @router.get("/{code}")
async def get_fashion_matches(code: str): def get_fashion_matches(code: str):
from worker.stylist_engine import StylistEngine from worker.stylist_engine import StylistEngine
engine = StylistEngine() engine = StylistEngine()
...@@ -426,7 +426,7 @@ async def color_logic(req: ColorLogicRequest): ...@@ -426,7 +426,7 @@ async def color_logic(req: ColorLogicRequest):
@router.post("/outfit-suggest") @router.post("/outfit-suggest")
async def outfit_suggest(req: OutfitSuggestRequest): def outfit_suggest(req: OutfitSuggestRequest):
try: try:
rules = _load_rules() rules = _load_rules()
color_keys = rules.get("color_keys", {}) color_keys = rules.get("color_keys", {})
...@@ -531,7 +531,7 @@ class ScoreTestRequest(BaseModel): ...@@ -531,7 +531,7 @@ class ScoreTestRequest(BaseModel):
@router.post("/score-test") @router.post("/score-test")
async def score_test(req: ScoreTestRequest): def score_test(req: ScoreTestRequest):
try: try:
from worker.stylist_engine import StylistEngine from worker.stylist_engine import StylistEngine
...@@ -569,7 +569,7 @@ async def score_test(req: ScoreTestRequest): ...@@ -569,7 +569,7 @@ async def score_test(req: ScoreTestRequest):
@router.get("/audit/tag-coverage") @router.get("/audit/tag-coverage")
async def audit_tag_coverage(limit: int = Query(300, ge=20, le=2000), q: str = ""): def audit_tag_coverage(limit: int = Query(300, ge=20, le=2000), q: str = ""):
"""Audit product tags vs rules coverage for data QA.""" """Audit product tags vs rules coverage for data QA."""
try: try:
from worker.stylist_engine import StylistEngine from worker.stylist_engine import StylistEngine
...@@ -745,7 +745,7 @@ async def audit_tag_coverage(limit: int = Query(300, ge=20, le=2000), q: str = " ...@@ -745,7 +745,7 @@ async def audit_tag_coverage(limit: int = Query(300, ge=20, le=2000), q: str = "
# --- Rules Framework HTML View --- # --- Rules Framework HTML View ---
@router.get("/rules/view") @router.get("/rules/view")
async def rules_view(): def rules_view():
from collections import defaultdict from collections import defaultdict
from fastapi.responses import HTMLResponse from fastapi.responses import HTMLResponse
......
...@@ -6,6 +6,7 @@ Based on chatbot-rsa pattern ...@@ -6,6 +6,7 @@ Based on chatbot-rsa pattern
import asyncio import asyncio
import logging import logging
import os import os
import threading
from typing import Any from typing import Any
import aiomysql import aiomysql
...@@ -47,8 +48,8 @@ get_db_connection = _manager.get_connection ...@@ -47,8 +48,8 @@ get_db_connection = _manager.get_connection
class StarRocksConnection: class StarRocksConnection:
# Shared connection (Singleton-like behavior) for all instances # Use thread-local storage to ensure thread-safety across FastAPI sync workers
_shared_conn = None _thread_local = threading.local()
def __init__( def __init__(
self, self,
...@@ -71,16 +72,17 @@ class StarRocksConnection: ...@@ -71,16 +72,17 @@ class StarRocksConnection:
""" """
Establish or reuse persistent connection. Establish or reuse persistent connection.
""" """
# 1. Try to reuse existing shared connection # 1. Try to reuse existing thread-local connection
if StarRocksConnection._shared_conn and StarRocksConnection._shared_conn.open: conn = getattr(StarRocksConnection._thread_local, 'conn', None)
if conn and conn.open:
try: try:
# Ping to check if alive, reconnect if needed # Ping to check if alive, reconnect if needed
StarRocksConnection._shared_conn.ping(reconnect=True) conn.ping(reconnect=True)
self.conn = StarRocksConnection._shared_conn self.conn = conn
return self.conn return self.conn
except Exception as e: except Exception as e:
logger.warning(f"⚠️ Connection lost, reconnecting: {e}") logger.warning(f"⚠️ Connection lost, reconnecting: {e}")
StarRocksConnection._shared_conn = None StarRocksConnection._thread_local.conn = None
# 2. Create new connection if needed # 2. Create new connection if needed
print(f" [DB] 🔌 Đang kết nối StarRocks (New Session): {self.host}:{self.port}...") print(f" [DB] 🔌 Đang kết nối StarRocks (New Session): {self.host}:{self.port}...")
...@@ -101,8 +103,8 @@ class StarRocksConnection: ...@@ -101,8 +103,8 @@ class StarRocksConnection:
print(" [DB] ✅ Kết nối thành công.") print(" [DB] ✅ Kết nối thành công.")
logger.info("✅ Connected to StarRocks") logger.info("✅ Connected to StarRocks")
# Save to class variable # Save to thread-local variable
StarRocksConnection._shared_conn = new_conn StarRocksConnection._thread_local.conn = new_conn
self.conn = new_conn self.conn = new_conn
except Exception as e: except Exception as e:
...@@ -136,7 +138,7 @@ class StarRocksConnection: ...@@ -136,7 +138,7 @@ class StarRocksConnection:
print(f" [DB] ❌ Lỗi truy vấn: {e!s}") print(f" [DB] ❌ Lỗi truy vấn: {e!s}")
logger.error(f"❌ StarRocks query error: {e}") logger.error(f"❌ StarRocks query error: {e}")
# Incase of query error due to connection, invalidate it # Incase of query error due to connection, invalidate it
StarRocksConnection._shared_conn = None StarRocksConnection._thread_local.conn = None
raise raise
# FINALLY BLOCK REMOVED: Do NOT close connection # FINALLY BLOCK REMOVED: Do NOT close connection
...@@ -264,7 +266,8 @@ class StarRocksConnection: ...@@ -264,7 +266,8 @@ class StarRocksConnection:
def close(self): def close(self):
"""Explicitly close if needed (e.g. app shutdown)""" """Explicitly close if needed (e.g. app shutdown)"""
if StarRocksConnection._shared_conn and StarRocksConnection._shared_conn.open: conn = getattr(StarRocksConnection._thread_local, 'conn', None)
StarRocksConnection._shared_conn.close() if conn and conn.open:
StarRocksConnection._shared_conn = None conn.close()
StarRocksConnection._thread_local.conn = None
self.conn = None self.conn = None
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment