Commit 014198f0 authored by Vũ Hoàng Anh's avatar Vũ Hoàng Anh

feat(outfit-db): add ai_outfit_set + ai_outfit_items tables with dual-backend...

feat(outfit-db): add ai_outfit_set + ai_outfit_items tables with dual-backend support (Postgres/SQLite)
parent a67917c3
"""
outfit_db.py — AI Outfit Storage Layer
────────────────────────────────────────────────────────────
Manages 2 tables:
• dashboard_canifa.ai_outfit_set (1 row per source_code + occasion_tag)
• dashboard_canifa.ai_outfit_items (N rows per set: target_code, role, rank, score)
Backend switching via env:
USE_LOCAL_SQLITE=true → SQLite (local dev / home)
USE_LOCAL_SQLITE=false → Postgres (company / production)
Usage:
from common.outfit_db import OutfitDB
# Lưu outfit từ engine
OutfitDB.save_outfit("8TS25S012", "di_choi", items=[
{"target_code": "8BJ25A002", "role": "bottom", "rank": 1, "score": 87, "reason": "..."},
{"target_code": "8JK25A001", "role": "outer", "rank": 1, "score": 80, "reason": "..."},
])
# Đọc outfit cho FE
data = OutfitDB.get_outfit("8TS25S012")
# → {"di_choi": {"bottom": [...], "outer": [...]}, "di_lam": {...}}
# Stylist pin 1 item
OutfitDB.pin_item(item_id=42, is_pinned=True)
"""
import json
import logging
import os
import sqlite3
from typing import Optional
logger = logging.getLogger(__name__)
SCHEMA = "dashboard_canifa"
SET_TABLE = f"{SCHEMA}.ai_outfit_set"
ITEMS_TABLE = f"{SCHEMA}.ai_outfit_items"
USE_SQLITE = os.getenv("USE_LOCAL_SQLITE", "false").strip().lower() == "true"
SQLITE_PATH = os.getenv(
"SQLITE_PATH",
os.path.join(os.path.dirname(__file__), "..", "database", "canifa_ai_dump.sqlite")
)
SQLITE_PATH = os.path.abspath(SQLITE_PATH)
# Alias bảng cho SQLite (không có schema prefix)
_SET = "ai_outfit_set" if USE_SQLITE else SET_TABLE
_ITEMS = "ai_outfit_items" if USE_SQLITE else ITEMS_TABLE
__all__ = ["OutfitDB"]
# ─── Connection helpers ────────────────────────────────────────────────────────
def _get_conn():
"""Return a DB connection (SQLite hoặc Postgres tuỳ env)."""
if USE_SQLITE:
conn = sqlite3.connect(SQLITE_PATH)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON")
return conn
else:
from common.pool_wrapper import get_pooled_connection_compat
return get_pooled_connection_compat()
def _fetchall_as_dicts(cur) -> list[dict]:
"""Normalize cursor rows to list[dict] for both SQLite & Postgres."""
rows = cur.fetchall()
if not rows:
return []
if isinstance(rows[0], dict):
return [dict(r) for r in rows]
cols = [d[0] for d in cur.description]
return [dict(zip(cols, r)) for r in rows]
# ─── DDL ──────────────────────────────────────────────────────────────────────
_SQLITE_DDL = """
CREATE TABLE IF NOT EXISTS ai_outfit_set (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source_code TEXT NOT NULL,
occasion_tag TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now')),
UNIQUE (source_code, occasion_tag)
);
CREATE INDEX IF NOT EXISTS idx_outfit_set_source ON ai_outfit_set(source_code);
CREATE TABLE IF NOT EXISTS ai_outfit_items (
id INTEGER PRIMARY KEY AUTOINCREMENT,
outfit_id INTEGER NOT NULL REFERENCES ai_outfit_set(id) ON DELETE CASCADE,
target_code TEXT NOT NULL,
role TEXT NOT NULL,
rank INTEGER NOT NULL DEFAULT 1,
score INTEGER DEFAULT 0,
reason TEXT DEFAULT '',
is_pinned INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_outfit_items_outfit_id ON ai_outfit_items(outfit_id);
CREATE INDEX IF NOT EXISTS idx_outfit_items_target ON ai_outfit_items(target_code);
"""
_POSTGRES_DDL = f"""
CREATE SCHEMA IF NOT EXISTS {SCHEMA};
CREATE TABLE IF NOT EXISTS {SET_TABLE} (
id SERIAL PRIMARY KEY,
source_code VARCHAR(50) NOT NULL,
occasion_tag VARCHAR(50) NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE (source_code, occasion_tag)
);
CREATE INDEX IF NOT EXISTS idx_outfit_set_source ON {SET_TABLE}(source_code);
CREATE TABLE IF NOT EXISTS {ITEMS_TABLE} (
id SERIAL PRIMARY KEY,
outfit_id INT NOT NULL REFERENCES {SET_TABLE}(id) ON DELETE CASCADE,
target_code VARCHAR(50) NOT NULL,
role VARCHAR(30) NOT NULL,
rank SMALLINT NOT NULL DEFAULT 1,
score SMALLINT DEFAULT 0,
reason TEXT DEFAULT '',
is_pinned BOOLEAN DEFAULT FALSE,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_outfit_items_outfit_id ON {ITEMS_TABLE}(outfit_id);
CREATE INDEX IF NOT EXISTS idx_outfit_items_target ON {ITEMS_TABLE}(target_code);
"""
# ─── Main class ───────────────────────────────────────────────────────────────
class OutfitDB:
"""Static helper class — manages ai_outfit_set + ai_outfit_items."""
_initialized: bool = False
@classmethod
def ensure_tables(cls) -> None:
"""Create tables if they don't exist (idempotent)."""
if cls._initialized:
return
conn = None
try:
conn = _get_conn()
if USE_SQLITE:
conn.executescript(_SQLITE_DDL)
conn.commit()
else:
cur = conn.cursor()
for stmt in _POSTGRES_DDL.split(";"):
stmt = stmt.strip()
if stmt:
cur.execute(stmt)
conn.commit()
cur.close()
cls._initialized = True
backend = "SQLite" if USE_SQLITE else "Postgres"
logger.info("[OutfitDB] ✅ Tables ready (%s): %s + %s", backend, _SET, _ITEMS)
except Exception as e:
logger.error("[OutfitDB] ensure_tables error: %s", e)
finally:
if conn:
conn.close()
# ── Write ──────────────────────────────────────────────────────────────────
@classmethod
def save_outfit(
cls,
source_code: str,
occasion_tag: str,
items: list[dict],
) -> Optional[int]:
"""
Upsert 1 outfit set và replace toàn bộ items của nó.
items: list of dicts with keys:
target_code (str), role (str), rank (int), score (int), reason (str)
Returns: outfit_id hoặc None nếu lỗi.
"""
cls.ensure_tables()
conn = None
try:
conn = _get_conn()
cur = conn.cursor()
# 1. Upsert ai_outfit_set → lấy id
if USE_SQLITE:
cur.execute(
f"""INSERT INTO {_SET} (source_code, occasion_tag, updated_at)
VALUES (?, ?, datetime('now'))
ON CONFLICT(source_code, occasion_tag)
DO UPDATE SET updated_at = datetime('now')""",
(source_code, occasion_tag),
)
cur.execute(
f"SELECT id FROM {_SET} WHERE source_code = ? AND occasion_tag = ?",
(source_code, occasion_tag),
)
else:
cur.execute(
f"""INSERT INTO {_SET} (source_code, occasion_tag, updated_at)
VALUES (%s, %s, NOW())
ON CONFLICT(source_code, occasion_tag)
DO UPDATE SET updated_at = NOW()
RETURNING id""",
(source_code, occasion_tag),
)
if cur.rowcount == 0 or cur.description is None:
# ON CONFLICT did not return — fetch manually
cur.execute(
f"SELECT id FROM {_SET} WHERE source_code = %s AND occasion_tag = %s",
(source_code, occasion_tag),
)
row = cur.fetchone()
outfit_id = row[0] if row else None
if not outfit_id:
logger.error("[OutfitDB] Could not get outfit_id for %s/%s", source_code, occasion_tag)
return None
# 2. Delete old items (non-pinned)
if USE_SQLITE:
cur.execute(f"DELETE FROM {_ITEMS} WHERE outfit_id = ? AND is_pinned = 0", (outfit_id,))
else:
cur.execute(f"DELETE FROM {_ITEMS} WHERE outfit_id = %s AND is_pinned = FALSE", (outfit_id,))
# 3. Insert new items
for item in items:
if USE_SQLITE:
cur.execute(
f"""INSERT INTO {_ITEMS} (outfit_id, target_code, role, rank, score, reason, is_pinned)
VALUES (?, ?, ?, ?, ?, ?, 0)""",
(outfit_id, item["target_code"], item["role"],
item.get("rank", 1), item.get("score", 0), item.get("reason", "")),
)
else:
cur.execute(
f"""INSERT INTO {_ITEMS} (outfit_id, target_code, role, rank, score, reason, is_pinned)
VALUES (%s, %s, %s, %s, %s, %s, FALSE)""",
(outfit_id, item["target_code"], item["role"],
item.get("rank", 1), item.get("score", 0), item.get("reason", "")),
)
conn.commit()
if not USE_SQLITE:
cur.close()
return outfit_id
except Exception as e:
if conn:
conn.rollback()
logger.error("[OutfitDB] save_outfit error %s/%s: %s", source_code, occasion_tag, e)
return None
finally:
if conn:
conn.close()
# ── Read ───────────────────────────────────────────────────────────────────
@classmethod
def get_outfit(cls, source_code: str) -> dict:
"""
Lấy toàn bộ outfit cho 1 sản phẩm.
Returns:
{
"di_choi": {
"bottom": [{"target_code": ..., "role": ..., "rank": ..., "score": ..., "reason": ..., "is_pinned": ...}],
"outer": [...],
},
"di_lam": {...},
}
"""
cls.ensure_tables()
conn = None
try:
conn = _get_conn()
cur = conn.cursor()
if USE_SQLITE:
cur.execute(
f"""SELECT s.occasion_tag, i.target_code, i.role, i.rank, i.score, i.reason, i.is_pinned, i.id
FROM {_SET} s
JOIN {_ITEMS} i ON i.outfit_id = s.id
WHERE s.source_code = ?
ORDER BY s.occasion_tag, i.role, i.rank""",
(source_code,),
)
else:
cur.execute(
f"""SELECT s.occasion_tag, i.target_code, i.role, i.rank, i.score, i.reason, i.is_pinned, i.id
FROM {_SET} s
JOIN {_ITEMS} i ON i.outfit_id = s.id
WHERE s.source_code = %s
ORDER BY s.occasion_tag, i.role, i.rank""",
(source_code,),
)
rows = cur.fetchall()
if not USE_SQLITE:
cur.close()
result: dict[str, dict[str, list]] = {}
for row in rows:
occ, tgt, role, rank, score, reason, is_pinned, item_id = row
if occ not in result:
result[occ] = {}
if role not in result[occ]:
result[occ][role] = []
result[occ][role].append({
"id": item_id,
"target_code": tgt,
"role": role,
"rank": rank,
"score": score,
"reason": reason or "",
"is_pinned": bool(is_pinned),
})
return result
except Exception as e:
logger.error("[OutfitDB] get_outfit error %s: %s", source_code, e)
return {}
finally:
if conn:
conn.close()
@classmethod
def get_outfit_by_occasion(cls, source_code: str, occasion_tag: str) -> dict[str, list]:
"""Lấy outfit cho 1 sản phẩm trong 1 dịp cụ thể. Returns {role: [items]}."""
full = cls.get_outfit(source_code)
return full.get(occasion_tag, {})
# ── Stylist Pin/Unpin ──────────────────────────────────────────────────────
@classmethod
def pin_item(cls, item_id: int, is_pinned: bool = True) -> bool:
"""Stylist ghim/bỏ ghim 1 item cụ thể."""
cls.ensure_tables()
conn = None
try:
conn = _get_conn()
cur = conn.cursor()
pin_val = 1 if (USE_SQLITE and is_pinned) else is_pinned
placeholder = "?" if USE_SQLITE else "%s"
cur.execute(
f"UPDATE {_ITEMS} SET is_pinned = {placeholder} WHERE id = {placeholder}",
(pin_val, item_id),
)
updated = cur.rowcount > 0
conn.commit()
if not USE_SQLITE:
cur.close()
return updated
except Exception as e:
logger.error("[OutfitDB] pin_item error id=%s: %s", item_id, e)
return False
finally:
if conn:
conn.close()
# ── Stats ──────────────────────────────────────────────────────────────────
@classmethod
def get_stats(cls) -> dict:
"""Số lượng set + items đang lưu."""
cls.ensure_tables()
conn = None
try:
conn = _get_conn()
cur = conn.cursor()
cur.execute(f"SELECT COUNT(*) FROM {_SET}")
sets_count = cur.fetchone()[0]
cur.execute(f"SELECT COUNT(*) FROM {_ITEMS}")
items_count = cur.fetchone()[0]
if not USE_SQLITE:
cur.close()
return {"outfit_sets": sets_count, "outfit_items": items_count}
except Exception as e:
logger.error("[OutfitDB] get_stats error: %s", e)
return {}
finally:
if conn:
conn.close()
# ─── Auto-init on import ──────────────────────────────────────────────────────
try:
OutfitDB.ensure_tables()
except Exception:
pass
"""
migrate_001_ai_outfit_tables.py
────────────────────────────────────────────────────────────
Migration: Tạo 2 bảng ai_outfit_set + ai_outfit_items
- Tự động chọn backend dựa theo env USE_LOCAL_SQLITE:
USE_LOCAL_SQLITE=true → SQLite (local dev / home)
USE_LOCAL_SQLITE=false → Postgres (company / production)
Cách chạy:
python backend/database/migrate/migrate_001_ai_outfit_tables.py
Hoặc được gọi tự động qua outfit_db.ensure_tables() khi server khởi động.
"""
import logging
import os
import sys
logger = logging.getLogger(__name__)
SCHEMA = "dashboard_canifa"
SET_TABLE = f"{SCHEMA}.ai_outfit_set"
ITEMS_TABLE = f"{SCHEMA}.ai_outfit_items"
USE_SQLITE = os.getenv("USE_LOCAL_SQLITE", "false").strip().lower() == "true"
# ─── SQLite DDL (không có schema prefix, không có TIMESTAMPTZ) ────────────────
SQLITE_DDL = """
CREATE TABLE IF NOT EXISTS ai_outfit_set (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source_code TEXT NOT NULL,
occasion_tag TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now')),
UNIQUE (source_code, occasion_tag)
);
CREATE INDEX IF NOT EXISTS idx_outfit_set_source
ON ai_outfit_set(source_code);
CREATE TABLE IF NOT EXISTS ai_outfit_items (
id INTEGER PRIMARY KEY AUTOINCREMENT,
outfit_id INTEGER NOT NULL REFERENCES ai_outfit_set(id) ON DELETE CASCADE,
target_code TEXT NOT NULL,
role TEXT NOT NULL,
rank INTEGER NOT NULL DEFAULT 1,
score INTEGER DEFAULT 0,
reason TEXT DEFAULT '',
is_pinned INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_outfit_items_outfit_id
ON ai_outfit_items(outfit_id);
CREATE INDEX IF NOT EXISTS idx_outfit_items_target
ON ai_outfit_items(target_code);
"""
# ─── Postgres DDL ─────────────────────────────────────────────────────────────
POSTGRES_DDL = f"""
CREATE SCHEMA IF NOT EXISTS {SCHEMA};
CREATE TABLE IF NOT EXISTS {SET_TABLE} (
id SERIAL PRIMARY KEY,
source_code VARCHAR(50) NOT NULL,
occasion_tag VARCHAR(50) NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE (source_code, occasion_tag)
);
CREATE INDEX IF NOT EXISTS idx_outfit_set_source
ON {SET_TABLE}(source_code);
CREATE TABLE IF NOT EXISTS {ITEMS_TABLE} (
id SERIAL PRIMARY KEY,
outfit_id INT NOT NULL REFERENCES {SET_TABLE}(id) ON DELETE CASCADE,
target_code VARCHAR(50) NOT NULL,
role VARCHAR(30) NOT NULL,
rank SMALLINT NOT NULL DEFAULT 1,
score SMALLINT DEFAULT 0,
reason TEXT DEFAULT '',
is_pinned BOOLEAN DEFAULT FALSE,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_outfit_items_outfit_id
ON {ITEMS_TABLE}(outfit_id);
CREATE INDEX IF NOT EXISTS idx_outfit_items_target
ON {ITEMS_TABLE}(target_code);
"""
def run_sqlite():
"""Tạo bảng trên SQLite local."""
sqlite_path = os.getenv(
"SQLITE_PATH",
os.path.join(os.path.dirname(__file__), "..", "canifa_ai_dump.sqlite")
)
sqlite_path = os.path.abspath(sqlite_path)
import sqlite3
conn = sqlite3.connect(sqlite_path)
try:
conn.executescript(SQLITE_DDL)
conn.commit()
logger.info("[migrate] ✅ SQLite: ai_outfit_set + ai_outfit_items created at %s", sqlite_path)
print(f"[OK] SQLite tables created: {sqlite_path}")
except Exception as e:
logger.error("[migrate] SQLite error: %s", e)
print(f"[ERROR] SQLite: {e}")
finally:
conn.close()
def run_postgres():
"""Tạo bảng trên Postgres qua pool_wrapper."""
# Thêm thư mục backend vào sys.path để import được common.*
backend_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
if backend_dir not in sys.path:
sys.path.insert(0, backend_dir)
from common.pool_wrapper import get_pooled_connection_compat
conn = None
try:
conn = get_pooled_connection_compat()
cur = conn.cursor()
# Chạy từng statement riêng để tránh lỗi "can't execute multiple commands"
for stmt in POSTGRES_DDL.split(";"):
stmt = stmt.strip()
if stmt:
cur.execute(stmt)
conn.commit()
cur.close()
logger.info("[migrate] ✅ Postgres: ai_outfit_set + ai_outfit_items created")
print("[OK] Postgres tables created: dashboard_canifa.ai_outfit_set + dashboard_canifa.ai_outfit_items")
except Exception as e:
if conn:
conn.rollback()
logger.error("[migrate] Postgres error: %s", e)
print(f"[ERROR] Postgres: {e}")
finally:
if conn:
conn.close()
def run():
"""Entry point: tự chọn backend theo USE_LOCAL_SQLITE."""
logging.basicConfig(level=logging.INFO)
if USE_SQLITE:
print("[migrate] Mode: SQLite (USE_LOCAL_SQLITE=true)")
run_sqlite()
else:
print("[migrate] Mode: Postgres (USE_LOCAL_SQLITE=false)")
run_postgres()
if __name__ == "__main__":
run()
-- ============================================================
-- AI Outfit Tables — dashboard_canifa schema
-- Author: Antigravity / Canifa AI Stylist Engine
-- Created: 2026-04-20
-- ============================================================
-- Purpose:
-- Lưu trực tiếp kết quả gợi ý outfit theo từng sản phẩm nguồn + dịp mặc.
-- Thay thế việc scan 1738 sản phẩm lúc runtime bằng 1 SELECT đơn giản.
--
-- Quan hệ:
-- ai_outfit_set (1 SP nguồn + 1 dịp)
-- └─ ai_outfit_items (N sản phẩm gợi ý, mỗi item giữ role + rank + score)
-- ============================================================
CREATE SCHEMA IF NOT EXISTS dashboard_canifa;
-- Bảng 1: Nhóm outfit theo (source_code, occasion_tag)
CREATE TABLE IF NOT EXISTS dashboard_canifa.ai_outfit_set (
id SERIAL PRIMARY KEY,
source_code VARCHAR(50) NOT NULL, -- SKU sản phẩm đang xem (magento_ref_code)
occasion_tag VARCHAR(50) NOT NULL, -- di_lam | di_choi | mac_nha | du_lich | the_thao
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE (source_code, occasion_tag)
);
CREATE INDEX IF NOT EXISTS idx_outfit_set_source
ON dashboard_canifa.ai_outfit_set(source_code);
-- Bảng 2: Chi tiết từng sản phẩm trong outfit
CREATE TABLE IF NOT EXISTS dashboard_canifa.ai_outfit_items (
id SERIAL PRIMARY KEY,
outfit_id INT NOT NULL REFERENCES dashboard_canifa.ai_outfit_set(id) ON DELETE CASCADE,
target_code VARCHAR(50) NOT NULL, -- SKU sản phẩm được gợi ý
role VARCHAR(30) NOT NULL, -- top | bottom | outer | shoes | accessory | bag
rank SMALLINT NOT NULL DEFAULT 1, -- thứ tự ưu tiên trong cùng role (1=tốt nhất)
score SMALLINT DEFAULT 0, -- 0-100
reason TEXT DEFAULT '',
is_pinned BOOLEAN DEFAULT FALSE, -- stylist tay ghim cứng
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_outfit_items_outfit_id
ON dashboard_canifa.ai_outfit_items(outfit_id);
CREATE INDEX IF NOT EXISTS idx_outfit_items_target
ON dashboard_canifa.ai_outfit_items(target_code);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment