Commit 311db03f authored by Vũ Hoàng Anh's avatar Vũ Hoàng Anh

Optimize embedding order in data_retrieval_tool - call embedding before DB connection

parent 8bd3d9cf
......@@ -16,7 +16,7 @@ ENV ENV=development
COPY requirements.txt .
# Install Python libraries (Docker layer cache)
RUN pip install -r requirements.txt
RUN pip install -r requirements.txt && pip install watchdog[watchmedo]
# Copy the entire source code into the image
COPY . .
......@@ -28,6 +28,4 @@ EXPOSE 5000
HEALTHCHECK --interval=10s --timeout=5s --start-period=5s --retries=2 \
CMD python -c "import requests; requests.get('http://localhost:5000/docs')" || exit 1
# Command to run the server with hot reload
# ⚡ Hot reload - automatically restarts when code changes (for local dev)
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "5000", "--reload"]
CMD ["gunicorn", "--workers", "4", "--worker-class", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:5000", "--timeout", "120", "--reload", "server:app"]
......@@ -90,16 +90,12 @@ async def data_retrieval_tool(searches: list[SearchItem]) -> str:
"""
logger.info("🔧 [DEBUG] data_retrieval_tool STARTED")
try:
logger.info("🔧 [DEBUG] Getting DB connection (singleton)")
db = get_db_connection()
logger.info("🔧 [DEBUG] DB connection retrieved successfully")
# 0. Log input parameters (exactly as requested)
# 0. Log input parameters
logger.info(f"📥 [Tool Input] data_retrieval_tool received {len(searches)} items:")
for idx, item in enumerate(searches):
logger.info(f" 🔹 Item [{idx}]: {item.dict(exclude_none=True)}")
# 1. 🚀 BATCH EMBEDDING: Gather every query into a single OpenAI call (per the agreed spec)
# 1. 🚀 BATCH EMBEDDING: Do this FIRST to take advantage of async (non-blocking)
queries_to_embed = [s.query for s in searches if s.query]
all_vectors = []
if queries_to_embed:
......@@ -108,7 +104,11 @@ async def data_retrieval_tool(searches: list[SearchItem]) -> str:
all_vectors = await create_embeddings_async(queries_to_embed)
logger.info(f"⏱️ [TIMER] Total Batch Embedding Time: {(time.time() - emb_batch_start) * 1000:.2f}ms")
# 2. Create tasks to run in parallel (Parallel Search)
# 2. Get DB connection (singleton - very fast)
logger.info("🔧 [DEBUG] Getting DB connection (singleton)")
db = get_db_connection()
logger.info("🔧 [DEBUG] DB connection retrieved successfully")
logger.info("🔧 [DEBUG] Creating parallel tasks")
tasks = []
vector_idx = 0
......
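A minimal sketch of the reordering above, assuming create_embeddings_async and get_db_connection behave as shown in the diff; run_single_search is a hypothetical stand-in for the per-item task bodies created later in the function.

import asyncio

async def reordered_flow(searches):
    # 1. Batch embedding FIRST: one awaited OpenAI call covering every query
    queries = [s.query for s in searches if s.query]
    vectors = await create_embeddings_async(queries) if queries else []  # name from the diff above
    # 2. Only then fetch the DB connection (singleton lookup, effectively instant)
    db = get_db_connection()  # name from the diff above
    # 3. Fan the per-item searches out in parallel
    tasks, vector_idx = [], 0
    for item in searches:
        vector = None
        if item.query:
            vector = vectors[vector_idx]
            vector_idx += 1
        tasks.append(run_single_search(db, item, vector))  # hypothetical helper
    return await asyncio.gather(*tasks)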
import asyncio
import json
import logging
import random
......@@ -12,6 +13,20 @@ logger = logging.getLogger(__name__)
router = APIRouter()
# --- HELPERS ---
async def retry_with_backoff(coro_fn, max_retries=3, backoff_factor=2):
"""Retry async function with exponential backoff"""
for attempt in range(max_retries):
try:
return await coro_fn()
except Exception as e:
if attempt == max_retries - 1:
raise
wait_time = backoff_factor**attempt
logger.warning(f"⚠️ Attempt {attempt + 1} failed: {e!s}, retrying in {wait_time}s...")
await asyncio.sleep(wait_time)
# --- MODELS ---
class MockQueryRequest(BaseModel):
user_query: str
......@@ -69,7 +84,7 @@ async def mock_chat(req: MockQueryRequest, background_tasks: BackgroundTasks):
query=req.user_query, magento_ref_code=None, price_min=None, price_max=None, action="search"
)
result_json = await data_retrieval_tool([search_item])
result_json = await data_retrieval_tool.ainvoke({"searches": [search_item]})
result = json.loads(result_json)
search_results = result.get("results", [{}])[0]
products = search_results.get("products", [])
......@@ -131,8 +146,10 @@ async def mock_db_search(req: MockDBRequest):
logger.info(f"🔧 Search params: {search_item.dict(exclude_none=True)}")
# Call the REAL data_retrieval_tool
result_json = await data_retrieval_tool([search_item])
# Call the REAL data_retrieval_tool with retry
result_json = await retry_with_backoff(
lambda: data_retrieval_tool.ainvoke({"searches": [search_item]}), max_retries=3
)
result = json.loads(result_json)
elapsed_time = time.time() - start_time
......@@ -178,8 +195,10 @@ async def mock_retriever_db(req: MockRetrieverRequest):
logger.info(f"🔧 Retriever params: {search_item.dict(exclude_none=True)}")
# Call the REAL data_retrieval_tool (embedding + vector search)
result_json = await data_retrieval_tool([search_item])
# Call the REAL data_retrieval_tool (embedding + vector search) with retry
result_json = await retry_with_backoff(
lambda: data_retrieval_tool.ainvoke({"searches": [search_item]}), max_retries=3
)
result = json.loads(result_json)
elapsed_time = time.time() - start_time
......
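A minimal usage sketch for the retry_with_backoff helper above. It is passed a zero-argument lambda because a coroutine object can only be awaited once, so each retry needs a freshly created coroutine; _flaky_call is a hypothetical stand-in for data_retrieval_tool.ainvoke. With backoff_factor=2 the waits between failed attempts grow as 1s, 2s, 4s, ...

import asyncio

_calls = {"n": 0}

async def _flaky_call():
    # Hypothetical coroutine that fails twice, then succeeds
    _calls["n"] += 1
    if _calls["n"] < 3:
        raise RuntimeError(f"transient failure #{_calls['n']}")
    return "ok"

async def _demo():
    # retry_with_backoff is the helper defined above
    result = await retry_with_backoff(lambda: _flaky_call(), max_retries=3)
    print(result)  # "ok" on the 3rd attempt, after waiting ~1s + 2s

# asyncio.run(_demo())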
"""
Script tự động thêm context (tên bảng + subsection) vào tất cả size entries trong tonghop.txt
Ví dụ: "Size 92 (2Y):" -> "Size 92 (2Y) - BẢNG SIZE CHUNG CHO UNISEX - TRẺ EM (Dải size lẻ):"
"""
import re
def add_context_to_sizes(input_file, output_file):
with open(input_file, encoding="utf-8") as f:
lines = f.readlines()
result = []
current_table = None  # Current table name
current_subsection = None  # Current subsection (Dải size lẻ, chẵn, ...)
for line in lines:
stripped = line.strip()
# Detect table headers (lines starting with BẢNG or QUẦN)
if stripped.startswith("BẢNG SIZE") or stripped.startswith("QUẦN"):
current_table = stripped
current_subsection = None  # Reset the subsection when moving to a new table
result.append(line)
continue
# Detect subsections (Dải size lẻ, Dải size chẵn)
if "Dải size" in stripped or stripped.startswith("Dải"):
current_subsection = stripped.rstrip(":")
result.append(line)
continue
# Detect Size lines (matches every pattern: Size XS:, Size 92 (2Y):, Size 26 (XS):)
size_match = re.match(r"^(Size\s+[A-Z0-9]+(?:\s*\([^)]+\))?):(.*)$", stripped)
if size_match and current_table:
size_part = size_match.group(1)  # "Size 92 (2Y)" or "Size XS"
rest = size_match.group(2)  # The remainder after the colon
# Build the context
context_parts = [current_table]
if current_subsection:
context_parts.append(f"({current_subsection})")
context = " - ".join(context_parts)
# Create the new line with context
new_line = f"{size_part} - {context}:{rest}\n"
result.append(new_line)
continue
# Keep all other lines unchanged
result.append(line)
# Write the output file
with open(output_file, "w", encoding="utf-8") as f:
f.writelines(result)
print("✅ Đã thêm context vào tất cả size entries!")
print(f"📝 File output: {output_file}")
if __name__ == "__main__":
input_path = r"d:\cnf\chatbot_canifa\backend\datadb\tonghop.txt"
output_path = r"d:\cnf\chatbot_canifa\backend\datadb\tonghop_with_context.txt"
add_context_to_sizes(input_path, output_path)
print("\n🔍 Preview 10 dòng đầu của file mới:")
with open(output_path, encoding="utf-8") as f:
for i, line in enumerate(f):
if 1160 <= i < 1170:  # Region containing size entries
print(line.rstrip())
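A quick sketch of the single-line rewrite the script performs; the sample line and context strings are hypothetical but mirror the docstring example above.

import re

SIZE_RE = re.compile(r"^(Size\s+[A-Z0-9]+(?:\s*\([^)]+\))?):(.*)$")

sample = "Size 92 (2Y): cao 87-92cm"
table = "BẢNG SIZE CHUNG CHO UNISEX - TRẺ EM"
subsection = "Dải size lẻ"

m = SIZE_RE.match(sample)
context = " - ".join([table, f"({subsection})"])
print(f"{m.group(1)} - {context}:{m.group(2)}")
# -> Size 92 (2Y) - BẢNG SIZE CHUNG CHO UNISEX - TRẺ EM - (Dải size lẻ): cao 87-92cm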
import json
import os
import time

import pymysql
from openai import OpenAI
# ==========================================
# 🔐 HARD KEY CONFIGURATION (As requested)
# ==========================================
OPENAI_API_KEY = "sk-proj-srJ3l3B5q1CzRezXAnaewbbRfuWzIjYHbcAdggzsa4MmtXEHaIwS1OTkMgLpMDikgh"
SR_HOST = "172.16.2.100"
SR_PORT = 9030
SR_USER = "anhvh"
SR_PASS = "v0WYGeyLRCckXotT"
SR_DB = "shared_source"
# Parameter
CHUNK_SIZE = 500
CHUNK_OVERLAP = 50
EMBEDDING_MODEL = "text-embedding-3-small" # 1536 dimensions
client = OpenAI(api_key=OPENAI_API_KEY)
def get_embedding(text):
"""Lấy vector 1536 chiều từ OpenAI"""
try:
text = text.replace("\n", " ")
return client.embeddings.create(input=[text], model=EMBEDDING_MODEL).data[0].embedding
except Exception as e:
print(f"❌ Lỗi Embedding: {e}")
return None
def connect_starrocks():
return pymysql.connect(
host=SR_HOST,
port=SR_PORT,
user=SR_USER,
password=SR_PASS,
database=SR_DB,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
def chunk_text(text, size=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
"""Chia nhỏ văn bản với overlap"""
chunks = []
start = 0
while start < len(text):
end = start + size
chunks.append(text[start:end])
start += size - overlap
return chunks
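# Quick sanity check for chunk_text() (illustrative only): with size=500 and
# overlap=50 the stride between chunk starts is 450 characters, so each chunk
# repeats the last 50 characters of the previous one. For a hypothetical
# 1200-character input:
#   chunks = chunk_text("x" * 1200, size=500, overlap=50)
#   [len(c) for c in chunks]  # -> [500, 500, 300] (starts at offsets 0, 450, 900)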
def ingest():
input_file = r"d:\cnf\chatbot_canifa\backend\datadb\tonghop.txt"
if not os.path.exists(input_file):
print(f"❌ Không tìm thấy file: {input_file}")
return
print(f"📖 Đang đọc file {input_file}...")
with open(input_file, "r", encoding="utf-8") as f:
full_content = f.read()
# Split the data into the pseudo-FILE sections inside tonghop.txt
sections = full_content.split("================================================================================")
db = connect_starrocks()
cursor = db.cursor()
total_chunks = 0
record_id = int(time.time())  # Use the timestamp as the base ID
for section in sections:
if not section.strip(): continue
# Extract the file title if present
lines = section.strip().split("\n")
title = "Canifa Knowledge"
if "FILE:" in lines[0]:
title = lines[0].replace("FILE:", "").strip()
content = "\n".join(lines[1:])
else:
content = section
print(f"🚀 Đang xử lý section: {title}")
chunks = chunk_text(content)
for i, chunk in enumerate(chunks):
if len(chunk.strip()) < 20: continue  # Skip chunks that are too short
vector = get_embedding(chunk)
if not vector: continue
metadata = {
"title": title,
"chunk_idx": i,
"source": "tonghop.txt",
"timestamp": time.time()
}
sql = "INSERT INTO shared_source.canifa_knowledge (id, content, metadata, embedding) VALUES (%s, %s, %s, %s)"
try:
cursor.execute(sql, (record_id, chunk, json.dumps(metadata, ensure_ascii=False), str(vector)))
record_id += 1
total_chunks += 1
if total_chunks % 10 == 0:
db.commit()
print(f"✅ Đã nạp {total_chunks} chunks...")
except Exception as e:
print(f"❌ Lỗi SQL: {e}")
db.commit()
db.close()
print(f"🎊 HOÀN THÀNH! Tổng cộng đã nạp {total_chunks} vào StarRocks.")
if __name__ == "__main__":
ingest()
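An optional sanity-check sketch to run after ingest(); it reuses connect_starrocks() from above and assumes only the table and columns named in the INSERT statement.

def verify_ingest():
    """Count the rows ingest() wrote and peek at one record (sketch)."""
    db = connect_starrocks()
    try:
        with db.cursor() as cursor:
            cursor.execute("SELECT COUNT(*) AS n FROM shared_source.canifa_knowledge")
            print("Rows in canifa_knowledge:", cursor.fetchone()["n"])
            cursor.execute("SELECT id, metadata FROM shared_source.canifa_knowledge LIMIT 1")
            print("Sample row:", cursor.fetchone())
    finally:
        db.close()

# verify_ingest()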
"""
StarRocks Database Connection Utility
Based on chatbot-rsa pattern
"""
import asyncio
import logging
from typing import Any
import aiomysql
import pymysql
from pymysql.cursors import DictCursor
from config import (
STARROCKS_DB,
STARROCKS_HOST,
STARROCKS_PASSWORD,
STARROCKS_PORT,
STARROCKS_USER,
)
logger = logging.getLogger(__name__)
__all__ = ["StarRocksConnection", "get_db_connection"]
class StarRocksConnectionManager:
"""
Singleton class that manages the StarRocks connection.
"""
def __init__(self):
self._connection: StarRocksConnection | None = None
def get_connection(self) -> "StarRocksConnection":
"""Lazy loading connection"""
if self._connection is None:
logger.info("🔧 [LAZY LOADING] Creating StarRocksConnection instance (first time)")
self._connection = StarRocksConnection()
return self._connection
# --- Singleton ---
_manager = StarRocksConnectionManager()
get_db_connection = _manager.get_connection
class StarRocksConnection:
# Shared connection (Singleton-like behavior) for all instances
_shared_conn = None
def __init__(
self,
host: str | None = None,
database: str | None = None,
user: str | None = None,
password: str | None = None,
port: int | None = None,
):
self.host = host or STARROCKS_HOST
self.database = database or STARROCKS_DB
self.user = user or STARROCKS_USER
self.password = password or STARROCKS_PASSWORD
self.port = port or STARROCKS_PORT
# self.conn references the shared connection
self.conn = None
logger.info(f"✅ StarRocksConnection initialized: {self.host}:{self.port}")
def connect(self):
"""
Establish or reuse persistent connection.
"""
# 1. Try to reuse existing shared connection
if StarRocksConnection._shared_conn and StarRocksConnection._shared_conn.open:
try:
# Ping to check if alive, reconnect if needed
StarRocksConnection._shared_conn.ping(reconnect=True)
self.conn = StarRocksConnection._shared_conn
return self.conn
except Exception as e:
logger.warning(f"⚠️ Connection lost, reconnecting: {e}")
StarRocksConnection._shared_conn = None
# 2. Create new connection if needed
print(f" [DB] 🔌 Đang kết nối StarRocks (New Session): {self.host}:{self.port}...")
logger.info(f"🔌 Connecting to StarRocks at {self.host}:{self.port} (DB: {self.database})...")
try:
new_conn = pymysql.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
charset="utf8mb4",
cursorclass=DictCursor,
connect_timeout=10,
read_timeout=30,
write_timeout=30,
)
print(" [DB] ✅ Kết nối thành công.")
logger.info("✅ Connected to StarRocks")
# Save to class variable
StarRocksConnection._shared_conn = new_conn
self.conn = new_conn
except Exception as e:
print(f" [DB] ❌ Lỗi kết nối: {e!s}")
logger.error(f"❌ Failed to connect to StarRocks: {e}")
raise
return self.conn
def execute_query(self, query: str, params: tuple | None = None) -> list[dict[str, Any]]:
# print(" [DB] 🚀 Bắt đầu truy vấn dữ liệu...")
# (Reduced noise in logs)
logger.info("🚀 Executing StarRocks Query (Persistent Conn).")
conn = self.connect()
try:
with conn.cursor() as cursor:
cursor.execute(query, params)
results = cursor.fetchall()
print(f" [DB] ✅ Truy vấn xong. Lấy được {len(results)} dòng.")
logger.info(f"📊 Query successful, returned {len(results)} rows")
return [dict(row) for row in results]
except Exception as e:
print(f" [DB] ❌ Lỗi truy vấn: {e!s}")
logger.error(f"❌ StarRocks query error: {e}")
# In case the query error is connection-related, invalidate the shared connection
StarRocksConnection._shared_conn = None
raise
# FINALLY BLOCK REMOVED: Do NOT close connection
# Async pool shared
_shared_pool = None
_pool_lock = asyncio.Lock()
@classmethod
async def clear_pool(cls):
"""Clear and close existing pool (force recreate fresh connections)"""
async with cls._pool_lock:
if cls._shared_pool is not None:
logger.warning("🔄 Clearing StarRocks connection pool...")
cls._shared_pool.close()
await cls._shared_pool.wait_closed()
cls._shared_pool = None
logger.info("✅ Pool cleared successfully")
async def get_pool(self):
"""
Get or create shared async connection pool (Thread-safe singleton)
"""
if StarRocksConnection._shared_pool is None:
async with StarRocksConnection._pool_lock:
# Double-check inside lock to prevent multiple pools
if StarRocksConnection._shared_pool is None:
logger.info(f"🔌 Creating Async Pool to {self.host}:{self.port}...")
StarRocksConnection._shared_pool = await aiomysql.create_pool(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
db=self.database,
charset="utf8mb4",
cursorclass=aiomysql.DictCursor,
minsize=5, # ← Reduced from 10 to 5
maxsize=30, # ← Reduced from 200 to 30 (IMPORTANT!)
connect_timeout=10,
pool_recycle=1800, # ← Reduced from 3600 to 1800 (30 minutes)
autocommit=True,
)
return StarRocksConnection._shared_pool
async def execute_query_async(self, query: str, params: tuple | None = None) -> list[dict[str, Any]]:
"""
Execute query asynchronously using aiomysql pool with Retry Logic.
"""
max_retries = 3
last_error = None
for attempt in range(max_retries):
try:
pool = await self.get_pool()
# logger.info(f"🚀 Executing Async Query (Attempt {attempt+1}).")
# Increase timeout to 30s for load tests with 300 users
conn = await asyncio.wait_for(pool.acquire(), timeout=30)
try:
async with conn.cursor() as cursor:
await cursor.execute(query, params)
results = await cursor.fetchall()
# logger.info(f"📊 Async Query successful, returned {len(results)} rows")
return [dict(row) for row in results]
finally:
pool.release(conn)
except TimeoutError as e:
last_error = e
logger.warning(f"⏱️ Pool acquire timeout (Attempt {attempt + 1}/{max_retries})")
# Timeout while acquiring a connection → pool is full; wait and retry
await asyncio.sleep(0.2 * (attempt + 1))
continue
except ConnectionAbortedError as e:
last_error = e
logger.warning(f"🔌 Connection aborted (Attempt {attempt + 1}/{max_retries}): {e}")
# Connection aborted → clear the pool and retry with fresh connections
if attempt < max_retries - 1:
await StarRocksConnection.clear_pool()
await asyncio.sleep(0.3)
continue
except Exception as e:
last_error = e
logger.warning(f"⚠️ StarRocks DB Error (Attempt {attempt + 1}/{max_retries}): {e}")
if "Memory of process exceed limit" in str(e):
# If StarRocks hits OOM, wait a bit and retry
await asyncio.sleep(0.5 * (attempt + 1))
continue
if "Disconnected" in str(e) or "Lost connection" in str(e) or "aborted" in str(e).lower():
# If the connection was lost, clear the pool and retry
if attempt < max_retries - 1:
await StarRocksConnection.clear_pool()
await asyncio.sleep(0.3)
continue
# Any other error (syntax, etc.) is raised immediately
raise
logger.error(f"❌ Failed after {max_retries} attempts: {last_error}")
raise last_error
def close(self):
"""Explicitly close if needed (e.g. app shutdown)"""
if StarRocksConnection._shared_conn and StarRocksConnection._shared_conn.open:
StarRocksConnection._shared_conn.close()
StarRocksConnection._shared_conn = None
self.conn = None
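A minimal usage sketch for the module above: get_db_connection() is the lazy singleton accessor, execute_query() goes through the shared persistent pymysql connection, and execute_query_async() goes through the aiomysql pool with the retry logic shown. The SELECT is just a placeholder query.

import asyncio

async def example_usage():
    db = get_db_connection()  # lazy singleton: every call returns the same instance
    assert db is get_db_connection()

    rows_sync = db.execute_query("SELECT 1 AS ok")               # persistent shared connection
    rows_async = await db.execute_query_async("SELECT 1 AS ok")  # pooled, with retry/backoff
    print(rows_sync, rows_async)

# asyncio.run(example_usage())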
......@@ -136,9 +136,21 @@ class StarRocksConnection:
_shared_pool = None
_pool_lock = asyncio.Lock()
@classmethod
async def clear_pool(cls):
"""Clear and close existing pool (force recreate fresh connections)"""
async with cls._pool_lock:
if cls._shared_pool is not None:
logger.warning("🔄 Clearing StarRocks connection pool...")
cls._shared_pool.close()
await cls._shared_pool.wait_closed()
cls._shared_pool = None
logger.info("✅ Pool cleared successfully")
async def get_pool(self):
"""
Get or create shared async connection pool (Thread-safe singleton)
Optimized for cosine similarity queries (~200ms)
"""
if StarRocksConnection._shared_pool is None:
async with StarRocksConnection._pool_lock:
......@@ -153,15 +165,19 @@ class StarRocksConnection:
db=self.database,
charset="utf8mb4",
cursorclass=aiomysql.DictCursor,
minsize=10, # Keep 10 connections warm immediately (very fast for Prod)
maxsize=50, # Slight bump to 50 (balances throughput and memory)
connect_timeout=10,
minsize=10, # Keep 10 connections ready for heavy queries
maxsize=80, # Enough for 300 users with ~200ms queries
connect_timeout=15, # Increased connection timeout
pool_recycle=3600, # Recycle after 1 hour
autocommit=True,
)
logger.info("✅ Pool created successfully")
return StarRocksConnection._shared_pool
async def execute_query_async(self, query: str, params: tuple | None = None) -> list[dict[str, Any]]:
"""
Execute query asynchronously using aiomysql pool with Retry Logic.
Optimized for heavy queries (cosine similarity ~200ms)
"""
max_retries = 3
last_error = None
......@@ -171,22 +187,49 @@ class StarRocksConnection:
pool = await self.get_pool()
# logger.info(f"🚀 Executing Async Query (Attempt {attempt+1}).")
async with pool.acquire() as conn, conn.cursor() as cursor:
await cursor.execute(query, params)
results = await cursor.fetchall()
# logger.info(f"📊 Async Query successful, returned {len(results)} rows")
return [dict(row) for row in results]
# Increase timeout to 90s for heavy queries (cosine similarity)
conn = await asyncio.wait_for(pool.acquire(), timeout=90)
try:
async with conn.cursor() as cursor:
await cursor.execute(query, params)
results = await cursor.fetchall()
# logger.info(f"📊 Async Query successful, returned {len(results)} rows")
return [dict(row) for row in results]
finally:
pool.release(conn)
except TimeoutError as e:
last_error = e
logger.warning(f"⏱️ Pool acquire timeout (Attempt {attempt + 1}/{max_retries})")
# Timeout while acquiring a connection → pool is full; wait longer
await asyncio.sleep(0.5 * (attempt + 1))
continue
except ConnectionAbortedError as e:
last_error = e
logger.warning(f"🔌 Connection aborted (Attempt {attempt + 1}/{max_retries}): {e}")
# Connection aborted → clear the pool and retry with fresh connections
if attempt < max_retries - 1:
await StarRocksConnection.clear_pool()
await asyncio.sleep(0.5)
continue
except Exception as e:
last_error = e
logger.warning(f"⚠️ StarRocks DB Error (Attempt {attempt + 1}/{max_retries}): {e}")
# StarRocks OOM → wait longer
if "Memory of process exceed limit" in str(e):
# If StarRocks hits OOM, wait a bit and retry
await asyncio.sleep(0.5 * (attempt + 1))
await asyncio.sleep(1.0 * (attempt + 1))
continue
if "Disconnected" in str(e) or "Lost connection" in str(e):
# If the connection was lost the pool may be stale; retry immediately
# Connection issues → clear the pool and retry
if "Disconnected" in str(e) or "Lost connection" in str(e) or "aborted" in str(e).lower():
if attempt < max_retries - 1:
await StarRocksConnection.clear_pool()
await asyncio.sleep(0.5)
continue
# Any other error (syntax, etc.) is raised immediately
raise
......
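The pool-size comments above imply a Little's-law style estimate; a back-of-envelope sketch of that arithmetic, where the think time is an assumption (the 300 users and ~200 ms query time come from the comments in the diff).

users = 300            # concurrent load-test users (from the comments above)
query_time_s = 0.2     # ~200 ms cosine-similarity query (from the comments above)
think_time_s = 1.0     # ASSUMPTION: average pause between one user's requests

arrival_rate = users / (think_time_s + query_time_s)   # ≈ 250 requests/s
in_flight = arrival_rate * query_time_s                # Little's law: ≈ 50 queries at once
print(f"~{arrival_rate:.0f} req/s -> ~{in_flight:.0f} queries in flight; maxsize=80 leaves headroom")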
......@@ -128,3 +128,4 @@ zipp==3.23.0
zope.event==6.1
zope.interface==8.1.1
zstandard==0.25.0
gunicorn==23.0.0
......@@ -6,3 +6,8 @@ uvicorn server:app --host 0.0.0.0 --port 5000 --reload
uvicorn server:app --host 0.0.0.0 --port 5000
docker restart chatbot-backend
docker restart chatbot-backend && docker logs -f chatbot-backend
docker restart chatbot-backend
\ No newline at end of file
......@@ -105,6 +105,6 @@ if __name__ == "__main__":
host="0.0.0.0",
port=PORT,
reload=ENABLE_RELOAD,
reload_dirs=reload_dirs,
# reload_dirs=reload_dirs,
log_level="info",
)
# check_server.py
import pymysql
conn = pymysql.connect(
host="172.16.2.100", port=9030, user="anhvh", password="v0WYGeyLRCckXotT", database="shared_source"
)
cursor = conn.cursor()
# Check max connections
cursor.execute("SHOW VARIABLES LIKE 'max_connections'")
print("Max Connections:", cursor.fetchone())
# Check current connections
cursor.execute("SHOW PROCESSLIST")
processes = cursor.fetchall()
print(f"Current Active Connections: {len(processes)}")
# Check slow queries
cursor.execute("SHOW VARIABLES LIKE 'long_query_time'")
print("Slow Query Threshold:", cursor.fetchone())
conn.close()
# quick_test.py
import time
import pymysql
def quick_test():
print("🔍 Testing StarRocks connection...")
print("=" * 50)
print("\n📡 MySQL Connection Latency Test:")
latencies = []
for i in range(10):
start = time.time()
try:
conn = pymysql.connect(
host="172.16.2.100",
port=9030,
user="anhvh",
password="v0WYGeyLRCckXotT",
database="shared_source",
connect_timeout=10
)
latency = (time.time() - start) * 1000
latencies.append(latency)
print(f" ✅ Attempt {i+1}: {latency:.2f}ms")
# On the first attempt, also check connection limits
if i == 0:
cursor = conn.cursor()
cursor.execute("SHOW VARIABLES LIKE 'max_connections'")
max_conn = cursor.fetchone()
cursor.execute("SHOW STATUS LIKE 'Threads_connected'")
current = cursor.fetchone()
cursor.execute("SHOW STATUS LIKE 'Max_used_connections'")
max_used = cursor.fetchone()
print(f"\n🔌 Connection Limits:")
print(f" Max Connections: {max_conn[1] if max_conn else 'N/A'}")
print(f" Current Active: {current[1] if current else 'N/A'}")
print(f" Peak Ever Used: {max_used[1] if max_used else 'N/A'}")
# Compute usage percentage
if max_conn and current:
usage = (int(current[1]) / int(max_conn[1])) * 100
print(f" Usage: {usage:.1f}%")
if usage > 80:
print(" ⚠️ WARNING: Connection pool > 80% full!")
conn.close()
except Exception as e:
print(f" ❌ Attempt {i+1} Failed: {e}")
time.sleep(0.3)
if latencies:
print(f"\n📊 Summary:")
print(f" Average: {sum(latencies)/len(latencies):.2f}ms")
print(f" Min: {min(latencies):.2f}ms")
print(f" Max: {max(latencies):.2f}ms")
if __name__ == "__main__":
quick_test()
\ No newline at end of file