Commit 80ba8458 authored by Hoanganhvu123's avatar Hoanganhvu123

feat: complete Phase 2 and Phase 3 (Local Vision) inside worktree

parent 915afed2
...@@ -2,20 +2,21 @@ import json ...@@ -2,20 +2,21 @@ import json
import logging import logging
from common.llm_factory import create_llm from common.llm_factory import create_llm
from config import DEFAULT_MODEL from config import DEFAULT_MODEL
from common.sqlite_db import sqlite_db
from common.constants import TABLE_FASHION_RULES
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def generate_optimizer_prompt(feedbacks: list, current_rules: str) -> str: def generate_optimizer_prompt(feedbacks: list, current_rules: str) -> str:
""" """
Hàm này tạo Prompt Module chuẩn để nhồi cho con Gemini. Tạo prompt phân tích Feedback xấu và đề xuất Rule mới.
Định nghĩa vai trò là Chuyên gia Fashion QC Agent.
""" """
feedback_text = "" feedback_text = ""
for idx, fb in enumerate(feedbacks): for idx, fb in enumerate(feedbacks):
query = fb.get('query', '') or fb.get('user_query', '') query = fb.get('query', '') or fb.get('user_query', '')
response = fb.get('ai_response', '') or fb.get('bot_suggestion', '') response = fb.get('ai_response', '') or fb.get('bot_suggestion', '')
note = fb.get('feedback_text', '') or fb.get('feedback_note', '') note = fb.get('feedback_text', '') or fb.get('feedback_note', '')
feedback_text += f"{idx+1}. User hỏi: '{query}' -> Bot trả lời: '{response}' -> User chửi: '{note}'\n" feedback_text += f"{idx+1}. User: '{query}' -> Bot: '{response}' -> Feedback chê: '{note}'\n"
prompt = f""" prompt = f"""
You are an expert AI Fashion Quality Assurance (QA) Agent. You are an expert AI Fashion Quality Assurance (QA) Agent.
...@@ -24,7 +25,7 @@ def generate_optimizer_prompt(feedbacks: list, current_rules: str) -> str: ...@@ -24,7 +25,7 @@ def generate_optimizer_prompt(feedbacks: list, current_rules: str) -> str:
=========== EXPLICIT INSTRUCTIONS =========== =========== EXPLICIT INSTRUCTIONS ===========
1. Check what the users are angry about in the feedback. 1. Check what the users are angry about in the feedback.
2. Review the CURRENT RULES. 2. Review the CURRENT RULES.
3. Output JSON with fields: 'rule_type' (string), 'new_rule' (string), 'reasoning' (string), 'confidence' (float 0.0-1.0). 3. Output JSON with fields: 'rule_type' (string, e.g., 'size_guide', 'policy', 'tone'), 'new_rule' (string), 'reasoning' (string), 'confidence' (float 0.0-1.0).
4. Provide the FINAL COMPLETE JSON EXACTLY. 4. Provide the FINAL COMPLETE JSON EXACTLY.
=========== BAD FEEDBACK LOGS =========== =========== BAD FEEDBACK LOGS ===========
...@@ -35,33 +36,53 @@ def generate_optimizer_prompt(feedbacks: list, current_rules: str) -> str: ...@@ -35,33 +36,53 @@ def generate_optimizer_prompt(feedbacks: list, current_rules: str) -> str:
""" """
return prompt return prompt
async def persist_new_rule(rule_data: dict) -> bool:
    """Persistent-memory hook: save a newly learned rule to the database.

    Mirrors the Hermes-Agent learning-loop mechanism. Best-effort: any
    failure is logged and reported as False rather than raised.
    """
    try:
        insert_sql = f"""
        INSERT INTO {TABLE_FASHION_RULES} (rule_type, rule_content, created_by, status)
        VALUES (?, ?, ?, 'active')
        """
        category = rule_data.get('rule_type', 'general')
        rule_content = f"{rule_data.get('new_rule', '')} (Lý do: {rule_data.get('reasoning', '')})"
        params = (category, rule_content, "feedback_learning_loop")
        await sqlite_db.execute(insert_sql, params)
        logger.info(f"Đã lưu Rule mới vào não bộ: {rule_content[:50]}...")
        return True
    except Exception as e:
        logger.error(f"Lỗi khi lưu Persistent Memory: {e}")
        return False
async def run_optimization_logic(feedbacks: list, current_rules_str: str) -> str: async def run_optimization_logic(feedbacks: list, current_rules_str: str) -> str:
""" """
Lõi chạy suy luận trực tiếp bằng LLM. Lõi chạy suy luận trực tiếp bằng LLM và ghi nhớ.
""" """
logger.info("Chạy Agent Learning Loop phân tích Rules vs Feedbacks bằng LLM...") logger.info("Chạy Agent Learning Loop phân tích Rules vs Feedbacks bằng LLM...")
prompt = generate_optimizer_prompt(feedbacks, current_rules_str) prompt = generate_optimizer_prompt(feedbacks, current_rules_str)
llm = create_llm(DEFAULT_MODEL, json_mode=True) llm = create_llm(DEFAULT_MODEL, json_mode=True)
messages = [ messages = [{"role": "system", "content": "You output JSON strictly."}, {"role": "user", "content": prompt}]
{"role": "system", "content": "You output JSON strictly."},
{"role": "user", "content": prompt}
]
try: try:
response = await llm.ainvoke(messages) response = await llm.ainvoke(messages)
content = response.content content = response.content
# Remove any markdown formatting if present
if content.startswith("```json"):
content = content.replace("```json", "").replace("```", "").strip()
data = json.loads(content) data = json.loads(content)
# Validation # Validation
if len(data.get("new_rule", "")) < 10: if len(data.get("new_rule", "")) < 10 or data.get("confidence", 0.0) < 0.3:
logger.warning("Rejected rule from LLM: rule too short") logger.warning("Rejected rule from LLM: confidence too low or rule too short")
return "{}"
if data.get("confidence", 0.0) < 0.3:
logger.warning("Rejected rule from LLM: confidence too low")
return "{}" return "{}"
# Hook: Persistent Memory
await persist_new_rule(data)
return json.dumps(data, ensure_ascii=False, indent=4) return json.dumps(data, ensure_ascii=False, indent=4)
except Exception as e: except Exception as e:
logger.error(f"Failed to parse optimization logic output: {e}") logger.error(f"Failed to parse optimization logic output: {e}")
......
import logging
from agent.image_search_agent.vision_model import vision_model
logger = logging.getLogger(__name__)
async def handle_visual_search(image_path: str) -> dict:
    """
    Main entry point for the Visual Search pipeline.

    1. Run the local CPU vision model on the image to extract tags.
    2. Turn the extracted tags into a text search query.
    3. Return the structured query for the Lead Agent / direct SQL stage.
    """
    logger.info(f"Đang thực hiện Visual Search cho ảnh: {image_path}")

    # Step 1: CPU-based image analysis.
    analysis_result = vision_model.analyze_image(image_path)
    if "error" in analysis_result:
        logger.error(f"Visual Search thất bại: {analysis_result['error']}")
        return {"success": False, "error": analysis_result["error"]}

    feats = analysis_result.get("features", {})

    # Step 2: build the query intent, skipping any "unknown" feature.
    pieces = []
    cat = feats.get("category", "")
    if cat and cat != "unknown":
        pieces.append(cat)
    color = feats.get("color", "")
    if color and color != "unknown":
        pieces.append(f"màu {color}")
    style = feats.get("style", "")
    if style and style != "unknown":
        pieces.append(f"phong cách {style}")

    generated_query = " ".join(pieces) if pieces else "sản phẩm thời trang"
    logger.info(f"Visual Search sinh ra query: '{generated_query}' dựa trên ảnh.")

    # POC version: return the intent query so it can be fed straight into the
    # Lead Search Agent (or the Split Query Flow) to query the StarRocks DB.
    return {
        "success": True,
        "raw_features": feats,
        "search_query": generated_query,
        "confidence": analysis_result.get("confidence", 0.0)
    }
...@@ -114,18 +114,36 @@ class ImageSearchGraph: ...@@ -114,18 +114,36 @@ class ImageSearchGraph:
if history: if history:
messages.extend(history) messages.extend(history)
# Xử lý multimodal (ảnh + text) from agent.image_search_agent.vision_model import vision_model
import tempfile
import base64
import os
# Xử lý multimodal (ảnh + text) bằng Local CPU Model
if images and len(images) > 0: if images and len(images) > 0:
text_content = user_message + "\n\n[📸 Xin hãy mô tả rõ ràng, cụ thể các đặc điểm của trang phục trong ảnh để tìm kiếm món đồ tương tự.]" extracted_features_text = ""
multimodal_content = [{"type": "text", "text": text_content}] for idx, img_b64 in enumerate(images):
for img in images: # Lưu base64 ra file tạm để model đọc
image_url = img if img.startswith("data:") or img.startswith("http") else f"data:image/jpeg;base64,{img}" img_data = img_b64.split(",")[-1] if "," in img_b64 else img_b64
multimodal_content.append({ try:
"type": "image_url", with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
"image_url": {"url": image_url, "detail": "auto"} tmp.write(base64.b64decode(img_data))
}) tmp_path = tmp.name
messages.append(HumanMessage(content=multimodal_content))
logger.info(f"📸 [IMAGE SEARCH] Injected {len(images)} image(s)") # Phân tích bằng Local CPU Model
analysis = vision_model.analyze_image(tmp_path)
os.unlink(tmp_path)
if analysis.get("success"):
feats = analysis["features"]
extracted_features_text += f"Ảnh {idx+1} [Local Vision AI]: Danh mục: {feats.get('category')}, Màu: {feats.get('color')}, Phong cách: {feats.get('style')}\n"
extracted_features_text += f"Raw tags: {', '.join(feats.get('raw_labels', []))}\n"
except Exception as e:
logger.error(f"Lỗi đọc ảnh cho Local Vision Model: {e}")
text_content = user_message + "\n\n[Hệ thống Vision Model Local đã phân tích ảnh tải lên và trích xuất các đặc điểm sau:]\n" + extracted_features_text + "\n[Hãy dựa vào các đặc điểm trên để tìm kiếm món đồ tương tự.]"
messages.append(HumanMessage(content=text_content))
logger.info(f"📸 [IMAGE SEARCH] Injected {len(images)} image(s) using Local CPU Vision Model")
else: else:
messages.append(HumanMessage(content=user_message)) messages.append(HumanMessage(content=user_message))
......
import logging
from PIL import Image
import torch
import json
logger = logging.getLogger(__name__)
class LocalVisionModel:
    """
    Ultra-lightweight local vision model that runs strictly on CPU.

    Uses google/vit-base-patch16-224 (~340MB) to extract coarse tags from an
    image; if the weights are not cached, transformers downloads them to the
    HF cache. Implemented as a singleton so the pipeline loads once per process.
    """
    _instance = None    # singleton instance
    _classifier = None  # transformers image-classification pipeline (None if init failed)

    def __new__(cls):
        # Lazy singleton: create and initialize the shared instance on first use.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._init_model()
        return cls._instance

    def _init_model(self):
        """Create the CPU-only classification pipeline; on failure leave it None."""
        try:
            from transformers import pipeline
            logger.info("Khởi tạo Local Vision Model (google/vit-base-patch16-224) trên CPU...")
            # device=-1 forces CPU execution.
            self._classifier = pipeline(
                task="image-classification",
                model="google/vit-base-patch16-224",
                device=-1
            )
            logger.info("Đã khởi tạo thành công Local Vision Model.")
        except ImportError:
            logger.error("Chưa cài đặt thư viện 'transformers' và 'torch'. Vui lòng chạy: pip install transformers torch torchvision")
            self._classifier = None
        except Exception as e:
            logger.error(f"Lỗi khi khởi tạo Vision Model: {e}" if False else f"Lỗi khởi tạo Vision Model: {e}")
            self._classifier = None

    def analyze_image(self, image_path: str) -> dict:
        """
        Classify the image at `image_path` and map it to fashion tags.

        Returns:
            dict: on success {"success": True, "features": {...}, "confidence": float};
            on failure {"success": False, "error": str}. The explicit
            "success": False on error paths (added fix) lets all callers check
            result.get("success") uniformly; existing `"error" in result`
            checks keep working.
        """
        if not self._classifier:
            return {"success": False, "error": "Vision model chưa sẵn sàng. Vui lòng kiểm tra logs."}
        try:
            # 1. Load the image, normalized to RGB for the model.
            img = Image.open(image_path).convert("RGB")
            # 2. Run the classifier.
            #    e.g. [{'score': 0.8, 'label': 'jean, blue jean, denim'}, ...]
            predictions = self._classifier(img)
            top_tags = [p['label'] for p in predictions[:3]]
            # 3. Map ImageNet labels to fashion categories (very basic, POC only).
            #    A fashion-specific model such as fashion-clip would be better.
            extracted_features = {
                "raw_labels": top_tags,
                "category": "unknown",
                "color": "unknown",
                "style": "casual"
            }
            # Simple heuristic mapping ("shirt" also matches "t-shirt").
            tags_str = " ".join(top_tags).lower()
            if "shirt" in tags_str or "jersey" in tags_str:
                extracted_features["category"] = "áo thun"
            elif "jean" in tags_str or "denim" in tags_str:
                extracted_features["category"] = "quần jeans"
                extracted_features["color"] = "xanh"
            elif "jacket" in tags_str or "coat" in tags_str:
                extracted_features["category"] = "áo khoác"
            if "red" in tags_str:
                extracted_features["color"] = "đỏ"
            if "black" in tags_str:
                extracted_features["color"] = "đen"
            if "white" in tags_str:
                extracted_features["color"] = "trắng"
            return {
                "success": True,
                "features": extracted_features,
                "confidence": predictions[0]['score']
            }
        except Exception as e:
            logger.error(f"Lỗi khi phân tích ảnh: {e}")
            return {"success": False, "error": str(e)}

# Shared singleton instance used across the app.
vision_model = LocalVisionModel()
This diff is collapsed.
"""
Inline Edit Agent Graph — LangGraph StateGraph for report section editing.
┌── simple_edit ──→ rewrite → END (rewrite/shorten/fix — no SQL needed)
think ──┤
└── agent_edit ──→ query_data → rewrite_with_data → END (enrich with real data)
Used by api/report_html_route.py via `run_inline_agent()`.
"""
import json
import logging
import re
from typing import Any, TypedDict
from langgraph.graph import END, START, StateGraph
from agent.report_agent.core import call_llm, execute_tools_parallel, parse_json, summarize_results
from agent.report_agent.prompts.inline_prompt import AGENT_SECTION_PROMPT, AGENT_WRITER_PROMPT, INLINE_EDIT_PROMPT
logger = logging.getLogger(__name__)
# ─── State ───────────────────────────────────────────────────────────
class InlineState(TypedDict):
    """State dict threaded through the inline-edit LangGraph nodes."""
    # --- Input (supplied by run_inline_agent) ---
    selected_text: str
    action: str  # rewrite | enrich | shorten | fix | agent_rewrite
    context: str  # surrounding report text (nodes use at most the first 500 chars)
    model: str  # LLM model identifier passed to call_llm
    codex_token: str | None
    openai_key: str | None
    # --- Internal (filled by think_node / query_node) ---
    needs_data: bool  # think_node verdict: run SQL tools before rewriting?
    tools_to_run: list[dict]  # SQL tool specs returned by the planning LLM
    data_summary: str  # summarized query results produced by query_node
    thinking: str  # planning LLM's reasoning text (not consumed downstream here)
    # --- Output (read back by run_inline_agent) ---
    new_text: str
    explanation: str
    error: str | None
# ─── Nodes ───────────────────────────────────────────────────────────
async def think_node(state: InlineState) -> dict:
    """Decide whether the edit is a plain rewrite or needs real data first."""
    # Simple edits (rewrite/shorten/fix) never require SQL.
    if state["action"] != "agent_rewrite":
        return {"needs_data": False, "tools_to_run": [], "thinking": ""}

    # Agent rewrite: ask the planning LLM which SQL queries would enrich it.
    planning_prompt = (
        f"Section text: \"{state['selected_text']}\"\n"
        f"Surrounding context: {state['context'][:500]}\n\n"
        f"Generate SQL queries to fetch data for enriching this section.\n"
        f"Return JSON only."
    )
    raw_plan = await call_llm(
        AGENT_SECTION_PROMPT, planning_prompt, state["model"],
        codex_token=state.get("codex_token"),
        openai_key=state.get("openai_key"),
        json_mode=True,
    )
    plan = parse_json(raw_plan)
    tool_specs = plan.get("tools", [])
    skipped = plan.get("action") == "skip"
    return {
        "needs_data": bool(tool_specs) and not skipped,
        "tools_to_run": tool_specs,
        "thinking": plan.get("thinking", ""),
    }
async def query_node(state: InlineState) -> dict:
    """Run the planned SQL tools and condense their output into a summary."""
    specs = state.get("tools_to_run", [])
    if not specs:
        return {"data_summary": ""}

    outcomes = await execute_tools_parallel(specs)
    collected: dict[str, Any] = {}
    for i, (spec, outcome) in enumerate(zip(specs, outcomes)):
        if isinstance(outcome, Exception):
            # A failed tool becomes an error record instead of aborting the run.
            outcome = {"error": str(outcome)[:200], "data": []}
        collected[f"{spec.get('name', 'q')}_{i}"] = outcome
    return {"data_summary": summarize_results(collected)}
async def simple_rewrite_node(state: InlineState) -> dict:
    """Simple rewrite without data: rewrite/shorten/fix.

    Calls the LLM with the selected text and requested action, then extracts
    the {"new_text", "explanation"} JSON object from the reply. Falls back to
    the raw reply both when no JSON is found AND when the brace-matched span
    is malformed JSON (previously json.loads would raise and crash the node).
    """
    user_input = (
        f"Selected text: \"{state['selected_text']}\"\n"
        f"Action: {state['action']}\n"
        f"Surrounding context: {state['context'][:500]}\n\n"
        f"Return JSON only."
    )
    raw = await call_llm(
        INLINE_EDIT_PROMPT, user_input, state["model"],
        codex_token=state.get("codex_token"),
        openai_key=state.get("openai_key"),
    )
    json_match = re.search(r'\{[\s\S]*\}', raw)
    if json_match:
        try:
            parsed = json.loads(json_match.group())
        except json.JSONDecodeError:
            # Braces present but content is not valid JSON — degrade gracefully.
            parsed = None
        if parsed is not None:
            return {
                "new_text": parsed.get("new_text", raw.strip()),
                "explanation": parsed.get("explanation", "AI đã chỉnh sửa văn bản"),
            }
    return {"new_text": raw.strip(), "explanation": "AI đã chỉnh sửa văn bản"}
async def rewrite_with_data_node(state: InlineState) -> dict:
    """Rewrite the section using real data from SQL queries.

    Returns the original text untouched when the query stage produced no
    usable data. Otherwise asks the writer LLM to weave the data in and
    extracts its {"new_text", "explanation"} JSON reply, degrading to the raw
    reply when the brace-matched span is malformed JSON (previously
    json.loads would raise and crash the node).
    """
    data_summary = state.get("data_summary", "")
    if not data_summary.strip() or "no data" in data_summary.lower():
        return {
            "new_text": state["selected_text"],
            "explanation": "Không có dữ liệu mới để bổ sung",
        }
    write_input = (
        f"Original section:\n\"{state['selected_text']}\"\n\n"
        f"New data from queries:\n{data_summary}\n\n"
        f"Rewrite this section incorporating the new data. Return JSON only."
    )
    write_raw = await call_llm(
        AGENT_WRITER_PROMPT, write_input, state["model"],
        codex_token=state.get("codex_token"),
        openai_key=state.get("openai_key"),
    )
    json_match = re.search(r'\{[\s\S]*\}', write_raw)
    if json_match:
        try:
            parsed = json.loads(json_match.group())
        except json.JSONDecodeError:
            parsed = None  # malformed JSON inside braces — fall through to raw text
        if parsed is not None:
            return {
                "new_text": parsed.get("new_text", write_raw.strip()),
                "explanation": parsed.get("explanation", "AI đã bổ sung dữ liệu mới"),
            }
    return {"new_text": write_raw.strip(), "explanation": "AI đã bổ sung dữ liệu mới"}
# ─── Routing Functions ──────────────────────────────────────────────
def route_after_think(state: InlineState) -> str:
    """Route out of `think`: fetch data first when the plan requires it."""
    return "query" if state.get("needs_data") else "simple_rewrite"
# ─── Build Graph ─────────────────────────────────────────────────────
def build_inline_graph() -> StateGraph:
    """Assemble and compile the inline edit agent graph."""
    graph = StateGraph(InlineState)

    # Register every node once.
    for node_name, node_fn in (
        ("think", think_node),
        ("query", query_node),
        ("simple_rewrite", simple_rewrite_node),
        ("rewrite_with_data", rewrite_with_data_node),
    ):
        graph.add_node(node_name, node_fn)

    # Wiring: think branches; both paths terminate at END.
    graph.add_edge(START, "think")
    graph.add_conditional_edges("think", route_after_think, ["query", "simple_rewrite"])
    graph.add_edge("query", "rewrite_with_data")
    graph.add_edge("rewrite_with_data", END)
    graph.add_edge("simple_rewrite", END)
    return graph.compile()


# Compiled graph instance shared by run_inline_agent.
inline_graph = build_inline_graph()
# ─── Public API ──────────────────────────────────────────────────────
async def run_inline_agent(
    *,
    selected_text: str,
    action: str = "rewrite",
    context: str = "",
    model: str = "codex/gpt-5.3-codex",
    codex_token: str | None = None,
    openai_key: str | None = None,
) -> dict:
    """
    Run the inline edit agent and return the result.
    Returns: {"new_text": str, "explanation": str} or {"error": str}
    """
    state: InlineState = {
        # Caller-supplied inputs.
        "selected_text": selected_text,
        "action": action,
        "context": context,
        "model": model,
        "codex_token": codex_token,
        "openai_key": openai_key,
        # Internal bookkeeping defaults.
        "needs_data": False,
        "tools_to_run": [],
        "data_summary": "",
        "thinking": "",
        # Output defaults.
        "new_text": "",
        "explanation": "",
        "error": None,
    }
    try:
        final = await inline_graph.ainvoke(state)
    except Exception as e:
        logger.error("Inline agent error: %s", e)
        return {"error": str(e)}
    return {
        "new_text": final.get("new_text", selected_text),
        "explanation": final.get("explanation", ""),
    }
import asyncio
import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from agent.report_agent.report_queue import enqueue_report_task
logger = logging.getLogger(__name__)
async def automated_daily_sales_report():
    """Cron-triggered job: enqueue the daily sales report for generation."""
    logger.info("Triggering automated daily sales report...")
    question = "Tổng hợp báo cáo doanh thu bán hàng ngày hôm qua. So sánh với cùng kỳ tuần trước. Phân tích các sản phẩm bán chạy nhất."

    # Fire-and-forget: push into the report queue, generation happens elsewhere.
    task_id = await enqueue_report_task(
        question=question,
        model="codex/gpt-5.3-codex",
        conversation_id="automated-cron-job",
        parent_report_id=None,
        current_html=None,
        codex_token=None,  # handled by fallback to system env
        openai_key=None,
    )
    if not task_id:
        logger.error("Failed to enqueue automated report task.")
        return
    logger.info(f"Successfully enqueued automated report task: {task_id}")
def start_scheduler():
    """
    Initialize and start the APScheduler for the Report Agent.

    Called from main() or the FastAPI lifespan hook.

    Returns:
        AsyncIOScheduler: the running scheduler instance, so the caller can
        keep a reference and call `.shutdown()` on app teardown. (Previously
        the instance was discarded, leaving no handle to stop the job cleanly.)
    """
    scheduler = AsyncIOScheduler()
    # Run every day at 07:00 Ho Chi Minh time.
    scheduler.add_job(
        automated_daily_sales_report,
        CronTrigger(hour=7, minute=0, timezone="Asia/Ho_Chi_Minh"),
        id="daily_sales_report",
        name="Automated Daily Sales Report",
        replace_existing=True
    )
    scheduler.start()
    logger.info("Report Agent Scheduler started. Automated reports will run daily at 07:00 AM.")
    return scheduler
if __name__ == "__main__":
    # Manual test entry point: enqueue one report immediately
    # (does not start the cron scheduler).
    logging.basicConfig(level=logging.INFO)
    asyncio.run(automated_daily_sales_report())
hermes-agent-repo @ 3800972d
Subproject commit 3800972dd05eabed8d75bfc4c0f5d532d85dafe2
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment