catalog: models cache in Sheets — AI picks from real list, no SKU hallucination

NEW MODULE app/catalog.py:
- refresh_catalog(cats, sources, per_brand, delay) — runs parsers for seed brand+category pairs
- list_catalog(cat, tier, brand) — reads from Sheets
- list_for_ai(cats, tiers) — compact text for AI prompt context
- SEED_BRANDS_BY_TIER + CATEGORY_QUERIES — 22 brands × 8 cats = 176 combos
- Saves top-2 relevant results per (brand × cat), filters by brand presence in title
- Dedup by title hash within (cat, brand) bucket

SHEETS:
- ensure_sheet(name, headers) — auto-creates Catalog tab on first refresh
- Schema: id, category, brand, tier, model_name, search_query, price_min/max, image_url, source, url, last_seen_at

ENDPOINTS:
- POST /api/catalog/refresh?cat=X&per_brand=N — manual refresh (1 cat ~2-5 min)
- GET /api/catalog/list?cat=&tier=&brand= — read with filters
- GET /api/catalog/preview_ai?cats=fridge — debug what AI receives

AI PROMPT:
- Rule #0: if catalog passed in user prompt — MUST select only from there
- _build_catalog_context: filters by checklist.budget_preset → tier subset
  (luxe→premium, premium→premium+middle, middle→middle, budget→middle+budget)

_handle_podbor:
- Loads catalog subset, appends to user_prompt as 'ДОСТУПНЫЙ КАТАЛОГ МОДЕЛЕЙ'
- AI 'выбирай ТОЛЬКО из этого списка' rule reinforced

NEXT: trigger refresh manually for 1 category (~3 min), then real podbor test
to verify AI uses catalog models instead of hallucinating SKUs
This commit is contained in:
wasrusgen 2026-05-12 06:32:39 +03:00
parent 1a57374020
commit 9e652c4a34
4 changed files with 338 additions and 1 deletions

View File

@ -161,6 +161,10 @@ SYSTEM_PROMPT_PICKER = (
' "next_steps": ["рекомендации для менеджера: что уточнить с клиентом, что проверить на замере"]\n'
"}\n\n"
"═══ КРИТИЧНО ═══\n"
"0. **КАТАЛОГ МОДЕЛЕЙ**: если в user-prompt передан раздел `ДОСТУПНЫЙ КАТАЛОГ МОДЕЛЕЙ` — \n"
" ты ОБЯЗАН выбирать модели ТОЛЬКО оттуда. Каждая модель из каталога — реальный артикул, \n"
" подтверждённый парсером маркетплейса. Не из каталога = не использовать.\n"
" Если каталог не передан (или пустой по нужным категориям) — действуй на основе своих знаний с правилами ниже.\n"
"1. **Реальные модели**: артикулы должны существовать в природе:\n"
" - Haier C4F744CMG, Haier HRF-541DM7RU (холодильники)\n"
" - Bosch Serie 4 KGN39NW00R ⚠, Liebherr CNd 5223 ⚠\n"

229
backend-py/app/catalog.py Normal file
View File

@ -0,0 +1,229 @@
"""Кэш каталога моделей в Google Sheets.
Зачем: AI должен выбирать модели из РЕАЛЬНОГО списка (собранного парсерами),
а не выдумывать артикулы. Раз в неделю обновляем каталог запуском
парсеров по seed-комбинациям бренд+категория.
Использование:
- POST /api/catalog/refresh?cat=fridge обновить одну категорию
- POST /api/catalog/refresh обновить все 8 категорий (медленно)
- GET /api/catalog/list?cat=fridge прочитать каталог одной категории
- В _handle_podbor: catalog.list_for_ai(...) для AI prompt
"""
from __future__ import annotations
import logging
import time
import uuid
from datetime import datetime, timezone
from typing import Any
from . import sheets
from . import parsers
log = logging.getLogger("zov.catalog")
SHEET_NAME = "Catalog"
HEADERS = [
"id", "category", "brand", "tier", "model_name",
"search_query", "price_min_rub", "price_max_rub",
"image_url", "source", "url", "last_seen_at",
]
# Бренды по тирам — реалии РФ 2026
SEED_BRANDS_BY_TIER = {
"premium": ["Miele", "Liebherr", "Gaggenau", "V-Zug", "Asko", "Smeg"],
"middle": ["Bosch", "Siemens", "NEFF", "Haier", "Samsung", "LG", "Electrolux", "AEG"],
"budget": ["Kuppersberg", "Maunfeld", "Weissgauff", "Korting",
"Hansa", "Beko", "Gorenje", "Hisense"],
}
# Категории и поисковое слово на русском
CATEGORY_QUERIES = {
"fridge": "холодильник",
"hob": "варочная панель",
"oven": "духовой шкаф",
"dw": "посудомоечная машина",
"hood": "вытяжка",
"microwave": "микроволновая печь",
"coffee": "кофемашина",
"washer": "стиральная машина",
}
def refresh_catalog(categories: list[str] | None = None,
sources: tuple = ("yamarket", "wb", "citilink"),
per_brand: int = 2,
delay_sec: float = 1.0) -> dict[str, Any]:
"""Запускает парсеры для каждого (brand × category) комбо, сохраняет результаты в Sheets.
Args:
categories: список ключей категорий (если None все 8)
sources: какие парсеры использовать
per_brand: сколько результатов сохранять на (brand × category)
delay_sec: пауза между запросами к парсерам (не нагружать)
Returns:
dict со статистикой: {total_added, by_category, errors}
"""
if categories is None:
categories = list(CATEGORY_QUERIES.keys())
# Гарантируем что лист есть
sheets.ensure_sheet(SHEET_NAME, HEADERS)
total_added = 0
by_category: dict[str, int] = {}
errors: list[str] = []
for cat in categories:
cat_label = CATEGORY_QUERIES.get(cat)
if not cat_label:
errors.append(f"unknown category: {cat}")
continue
added_cat = 0
for tier, brands in SEED_BRANDS_BY_TIER.items():
for brand in brands:
query = f"{brand} {cat_label}"
log.info("Catalog refresh: %s · %s · %r", cat, brand, query)
try:
enriched = parsers.enrich_one(query, sources=sources)
except Exception as e:
err = f"{cat}/{brand}: {e}"
log.warning("enrich_one failed: %s", err)
errors.append(err)
if delay_sec > 0:
time.sleep(delay_sec)
continue
items_added = _save_results(cat, brand, tier, query, enriched, per_brand)
added_cat += items_added
total_added += items_added
if delay_sec > 0:
time.sleep(delay_sec)
by_category[cat] = added_cat
log.info("Catalog refresh: category %s%d items added", cat, added_cat)
return {
"ok": True,
"total_added": total_added,
"by_category": by_category,
"errors": errors[:10], # первые 10 ошибок
}
def _save_results(cat: str, brand: str, tier: str, query: str,
enriched: dict, max_items: int) -> int:
"""Сохраняет до max_items релевантных результатов из enriched."""
if not enriched:
return 0
saved = 0
seen_titles = set()
sources_priority = ["yamarket", "wb", "citilink", "ozon", "dns"]
for src in sources_priority:
if saved >= max_items:
break
item = enriched.get(src)
if not item or not item.get("title"):
continue
# Фильтр релевантности: бренд должен упоминаться в названии или specs.brand
title = (item.get("title") or "").lower()
item_brand = (item.get("specs") or {}).get("brand", "").lower()
if brand.lower() not in title and brand.lower() not in item_brand:
continue
# Дедуп по title в рамках одного (cat, brand)
title_key = item["title"][:100].lower().strip()
if title_key in seen_titles:
continue
seen_titles.add(title_key)
try:
sheets.append_row(SHEET_NAME, [
_short_id(),
cat,
brand,
tier,
item["title"][:250],
query,
item.get("price_min_rub") or "",
item.get("price_max_rub") or "",
item.get("image_url") or "",
src,
item.get("url") or "",
_now_iso(),
])
saved += 1
except Exception as e:
log.warning("Failed to save row: %s", e)
return saved
def list_catalog(category: str | None = None, tier: str | None = None,
brand: str | None = None, limit: int = 200) -> list[dict[str, Any]]:
"""Читает каталог из Sheets с опциональными фильтрами."""
try:
ws = sheets.sheet(SHEET_NAME)
rows = ws.get_all_values()
except Exception as e:
log.warning("Cannot read Catalog sheet: %s", e)
return []
if not rows or len(rows) < 2:
return []
headers = rows[0]
out: list[dict[str, Any]] = []
for r in rows[1:]:
row = dict(zip(headers, r + [""] * (len(headers) - len(r))))
if category and row.get("category") != category:
continue
if tier and row.get("tier") != tier:
continue
if brand and row.get("brand", "").lower() != brand.lower():
continue
out.append(row)
if len(out) >= limit:
break
return out
def list_for_ai(categories: list[str], tiers: list[str] | None = None,
limit_per_cat: int = 30) -> str:
"""Формирует короткий текст-каталог для AI prompt.
Пример вывода:
fridge candidates (Haier, Bosch , ...):
- Haier C2F619CFU1 [middle] · ~44 800
- Haier C4F744CMG [middle] · ~79 990
- Bosch Serie 4 KGN39NW00R [middle] · ~85 000
...
"""
lines = []
for cat in categories:
items = list_catalog(category=cat, limit=limit_per_cat * 3) # с запасом
if tiers:
items = [i for i in items if i.get("tier") in tiers]
items = items[:limit_per_cat]
if not items:
continue
lines.append(f"\n{cat} ({len(items)} моделей):")
for it in items:
price = it.get("price_min_rub") or ""
price_str = f" · ~{price}" if price else ""
lines.append(f" - {it.get('brand', '')} {it.get('model_name', '')} [{it.get('tier', '')}]{price_str}")
return "\n".join(lines).strip()
def _short_id() -> str:
return uuid.uuid4().hex[:13]
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()

View File

@ -11,7 +11,7 @@ from fastapi.responses import JSONResponse
from .config import get_config
from .auth import verify_init_data
from . import sheets, ai, telegram as tg, proxy_pool
from . import sheets, ai, telegram as tg, proxy_pool, catalog
from . import parsers
from .parsers import dns as parser_dns, wb as parser_wb, ozon as parser_ozon, yamarket as parser_ym, citilink as parser_cl
@ -224,6 +224,55 @@ async def api_proxy_status():
return proxy_pool.pool_status()
@app.post("/api/catalog/refresh")
def api_catalog_refresh(cat: str = "", per_brand: int = 2, delay: float = 1.0):
"""Запускает парсинг каталога (медленно — несколько минут на категорию).
Параметры:
cat: одна категория (fridge|hob|oven|dw|hood|microwave|coffee|washer)
или пусто = все 8 (очень долго)
per_brand: сколько моделей сохранять на (brand × category) default 2
delay: задержка между запросами к парсерам, сек default 1.0
"""
categories = [cat] if cat else None
try:
result = catalog.refresh_catalog(
categories=categories,
per_brand=max(1, min(per_brand, 5)),
delay_sec=max(0.0, min(delay, 10.0)),
)
return result
except Exception as e:
log.exception("catalog refresh failed")
return {"ok": False, "error": str(e)}
@app.get("/api/catalog/list")
def api_catalog_list(cat: str = "", tier: str = "", brand: str = "", limit: int = 100):
"""Читает каталог моделей из Sheets с фильтрами."""
items = catalog.list_catalog(
category=cat or None,
tier=tier or None,
brand=brand or None,
limit=min(limit, 500),
)
return {
"ok": True,
"filters": {"category": cat, "tier": tier, "brand": brand},
"count": len(items),
"items": items,
}
@app.get("/api/catalog/preview_ai")
def api_catalog_preview_ai(cats: str = "fridge", tiers: str = ""):
"""Превью того, что AI получит в prompt (для отладки)."""
cat_list = [c.strip() for c in cats.split(",") if c.strip()]
tier_list = [t.strip() for t in tiers.split(",") if t.strip()] or None
text = catalog.list_for_ai(cat_list, tiers=tier_list, limit_per_cat=30)
return {"text": text, "length_chars": len(text)}
# =================================================================
# Handlers
# =================================================================
@ -354,10 +403,20 @@ def _handle_podbor(body: dict[str, Any]) -> dict[str, Any]:
"", "", 0, False, "new", 0,
])
# Загружаем релевантный каталог моделей и передаём AI как «доступный пул»
catalog_text = _build_catalog_context(checklist)
user_prompt = (
f"Подбери технику для следующего клиента:\n\n"
f"{json.dumps({'client': {'name': client_name}, 'checklist': checklist}, ensure_ascii=False, indent=2)}"
)
if catalog_text:
user_prompt += (
"\n\n═══ ДОСТУПНЫЙ КАТАЛОГ МОДЕЛЕЙ (выбирай ТОЛЬКО из этого списка) ═══\n"
+ catalog_text
+ "\n\nВАЖНО: если модель не из этого списка — НЕ возвращай её. "
"Каталог собран парсерами с реальных маркетплейсов РФ — это гарантия что артикул существует."
)
ai_result = ai.call_ai(user_prompt)
# Обогащение моделей данными с маркетплейсов (WB / Я.Маркет / OZON / DNS)
@ -382,6 +441,33 @@ def _handle_podbor(body: dict[str, Any]) -> dict[str, Any]:
return {"ok": True, "id": lead_id, "summary": summary_text, "ai": ai_result.get("json")}
def _build_catalog_context(checklist: dict[str, Any]) -> str:
"""Готовит компактный текст-каталог для AI prompt.
Берёт только релевантные категории + тиры (по budget_preset).
"""
cats = checklist.get("categories") or []
if not cats:
return ""
# Маппинг бюджет-пресета → тиры каталога
bp = checklist.get("budget_preset") or ""
tier_map = {
"luxe": ["premium"],
"premium": ["premium", "middle"],
"middle": ["middle"],
"budget": ["middle", "budget"], # средний и ниже
"exact": None,
}
tiers = tier_map.get(bp) # None = все тиры
try:
return catalog.list_for_ai(cats, tiers=tiers, limit_per_cat=25)
except Exception as e:
log.warning("Cannot build catalog context: %s", e)
return ""
def _enrich_ai_marketplaces(ai_result: dict[str, Any]) -> None:
"""Обогащает каждую модель из ai_result['json']['by_category'] данными
с маркетплейсов (WB / Я.Маркет / OZON / DNS). Если PROXY6_TOKEN не задан

View File

@ -38,6 +38,24 @@ def sheet(name: str) -> gspread.Worksheet:
return book.worksheet(name)
def ensure_sheet(name: str, headers: list[str]) -> gspread.Worksheet:
"""Создаёт лист с заголовками если он не существует. Иначе возвращает существующий."""
_, book = _client_book()
try:
ws = book.worksheet(name)
try:
first = ws.row_values(1)
except Exception:
first = []
if not first:
ws.update("A1", [headers])
return ws
except gspread.exceptions.WorksheetNotFound:
ws = book.add_worksheet(title=name, rows=2000, cols=max(20, len(headers)))
ws.append_row(headers, value_input_option="USER_ENTERED")
return ws
def append_row(name: str, row: list[Any]) -> None:
sheet(name).append_row(row, value_input_option="USER_ENTERED")