diff --git a/backend-py/app/ai.py b/backend-py/app/ai.py index f7a5cb4..f2e9836 100644 --- a/backend-py/app/ai.py +++ b/backend-py/app/ai.py @@ -161,6 +161,10 @@ SYSTEM_PROMPT_PICKER = ( ' "next_steps": ["рекомендации для менеджера: что уточнить с клиентом, что проверить на замере"]\n' "}\n\n" "═══ КРИТИЧНО ═══\n" + "0. **КАТАЛОГ МОДЕЛЕЙ**: если в user-prompt передан раздел `ДОСТУПНЫЙ КАТАЛОГ МОДЕЛЕЙ` — \n" + " ты ОБЯЗАН выбирать модели ТОЛЬКО оттуда. Каждая модель из каталога — реальный артикул, \n" + " подтверждённый парсером маркетплейса. Не из каталога = не использовать.\n" + " Если каталог не передан (или пустой по нужным категориям) — действуй на основе своих знаний с правилами ниже.\n" "1. **Реальные модели**: артикулы должны существовать в природе:\n" " - Haier C4F744CMG, Haier HRF-541DM7RU (холодильники)\n" " - Bosch Serie 4 KGN39NW00R ⚠, Liebherr CNd 5223 ⚠\n" diff --git a/backend-py/app/catalog.py b/backend-py/app/catalog.py new file mode 100644 index 0000000..06e0fcf --- /dev/null +++ b/backend-py/app/catalog.py @@ -0,0 +1,229 @@ +"""Кэш каталога моделей в Google Sheets. + +Зачем: AI должен выбирать модели из РЕАЛЬНОГО списка (собранного парсерами), +а не выдумывать артикулы. Раз в неделю обновляем каталог запуском +парсеров по seed-комбинациям бренд+категория. + +Использование: +- POST /api/catalog/refresh?cat=fridge — обновить одну категорию +- POST /api/catalog/refresh — обновить все 8 категорий (медленно) +- GET /api/catalog/list?cat=fridge — прочитать каталог одной категории +- В _handle_podbor: catalog.list_for_ai(...) для AI prompt +""" +from __future__ import annotations +import logging +import time +import uuid +from datetime import datetime, timezone +from typing import Any + +from . import sheets +from . import parsers + +log = logging.getLogger("zov.catalog") + +SHEET_NAME = "Catalog" +HEADERS = [ + "id", "category", "brand", "tier", "model_name", + "search_query", "price_min_rub", "price_max_rub", + "image_url", "source", "url", "last_seen_at", +] + +# Бренды по тирам — реалии РФ 2026 +SEED_BRANDS_BY_TIER = { + "premium": ["Miele", "Liebherr", "Gaggenau", "V-Zug", "Asko", "Smeg"], + "middle": ["Bosch", "Siemens", "NEFF", "Haier", "Samsung", "LG", "Electrolux", "AEG"], + "budget": ["Kuppersberg", "Maunfeld", "Weissgauff", "Korting", + "Hansa", "Beko", "Gorenje", "Hisense"], +} + +# Категории и поисковое слово на русском +CATEGORY_QUERIES = { + "fridge": "холодильник", + "hob": "варочная панель", + "oven": "духовой шкаф", + "dw": "посудомоечная машина", + "hood": "вытяжка", + "microwave": "микроволновая печь", + "coffee": "кофемашина", + "washer": "стиральная машина", +} + + +def refresh_catalog(categories: list[str] | None = None, + sources: tuple = ("yamarket", "wb", "citilink"), + per_brand: int = 2, + delay_sec: float = 1.0) -> dict[str, Any]: + """Запускает парсеры для каждого (brand × category) комбо, сохраняет результаты в Sheets. + + Args: + categories: список ключей категорий (если None — все 8) + sources: какие парсеры использовать + per_brand: сколько результатов сохранять на (brand × category) + delay_sec: пауза между запросами к парсерам (не нагружать) + + Returns: + dict со статистикой: {total_added, by_category, errors} + """ + if categories is None: + categories = list(CATEGORY_QUERIES.keys()) + + # Гарантируем что лист есть + sheets.ensure_sheet(SHEET_NAME, HEADERS) + + total_added = 0 + by_category: dict[str, int] = {} + errors: list[str] = [] + + for cat in categories: + cat_label = CATEGORY_QUERIES.get(cat) + if not cat_label: + errors.append(f"unknown category: {cat}") + continue + + added_cat = 0 + for tier, brands in SEED_BRANDS_BY_TIER.items(): + for brand in brands: + query = f"{brand} {cat_label}" + log.info("Catalog refresh: %s · %s · %r", cat, brand, query) + try: + enriched = parsers.enrich_one(query, sources=sources) + except Exception as e: + err = f"{cat}/{brand}: {e}" + log.warning("enrich_one failed: %s", err) + errors.append(err) + if delay_sec > 0: + time.sleep(delay_sec) + continue + + items_added = _save_results(cat, brand, tier, query, enriched, per_brand) + added_cat += items_added + total_added += items_added + + if delay_sec > 0: + time.sleep(delay_sec) + + by_category[cat] = added_cat + log.info("Catalog refresh: category %s — %d items added", cat, added_cat) + + return { + "ok": True, + "total_added": total_added, + "by_category": by_category, + "errors": errors[:10], # первые 10 ошибок + } + + +def _save_results(cat: str, brand: str, tier: str, query: str, + enriched: dict, max_items: int) -> int: + """Сохраняет до max_items релевантных результатов из enriched.""" + if not enriched: + return 0 + + saved = 0 + seen_titles = set() + sources_priority = ["yamarket", "wb", "citilink", "ozon", "dns"] + + for src in sources_priority: + if saved >= max_items: + break + item = enriched.get(src) + if not item or not item.get("title"): + continue + + # Фильтр релевантности: бренд должен упоминаться в названии или specs.brand + title = (item.get("title") or "").lower() + item_brand = (item.get("specs") or {}).get("brand", "").lower() + if brand.lower() not in title and brand.lower() not in item_brand: + continue + + # Дедуп по title в рамках одного (cat, brand) + title_key = item["title"][:100].lower().strip() + if title_key in seen_titles: + continue + seen_titles.add(title_key) + + try: + sheets.append_row(SHEET_NAME, [ + _short_id(), + cat, + brand, + tier, + item["title"][:250], + query, + item.get("price_min_rub") or "", + item.get("price_max_rub") or "", + item.get("image_url") or "", + src, + item.get("url") or "", + _now_iso(), + ]) + saved += 1 + except Exception as e: + log.warning("Failed to save row: %s", e) + + return saved + + +def list_catalog(category: str | None = None, tier: str | None = None, + brand: str | None = None, limit: int = 200) -> list[dict[str, Any]]: + """Читает каталог из Sheets с опциональными фильтрами.""" + try: + ws = sheets.sheet(SHEET_NAME) + rows = ws.get_all_values() + except Exception as e: + log.warning("Cannot read Catalog sheet: %s", e) + return [] + + if not rows or len(rows) < 2: + return [] + + headers = rows[0] + out: list[dict[str, Any]] = [] + for r in rows[1:]: + row = dict(zip(headers, r + [""] * (len(headers) - len(r)))) + if category and row.get("category") != category: + continue + if tier and row.get("tier") != tier: + continue + if brand and row.get("brand", "").lower() != brand.lower(): + continue + out.append(row) + if len(out) >= limit: + break + return out + + +def list_for_ai(categories: list[str], tiers: list[str] | None = None, + limit_per_cat: int = 30) -> str: + """Формирует короткий текст-каталог для AI prompt. + + Пример вывода: + fridge candidates (Haier, Bosch ⚠, ...): + - Haier C2F619CFU1 [middle] · ~44 800 ₽ + - Haier C4F744CMG [middle] · ~79 990 ₽ + - Bosch Serie 4 KGN39NW00R ⚠ [middle] · ~85 000 ₽ + ... + """ + lines = [] + for cat in categories: + items = list_catalog(category=cat, limit=limit_per_cat * 3) # с запасом + if tiers: + items = [i for i in items if i.get("tier") in tiers] + items = items[:limit_per_cat] + if not items: + continue + lines.append(f"\n{cat} ({len(items)} моделей):") + for it in items: + price = it.get("price_min_rub") or "" + price_str = f" · ~{price} ₽" if price else "" + lines.append(f" - {it.get('brand', '')} {it.get('model_name', '')} [{it.get('tier', '')}]{price_str}") + return "\n".join(lines).strip() + + +def _short_id() -> str: + return uuid.uuid4().hex[:13] + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() diff --git a/backend-py/app/main.py b/backend-py/app/main.py index ca8ef35..0229e4e 100644 --- a/backend-py/app/main.py +++ b/backend-py/app/main.py @@ -11,7 +11,7 @@ from fastapi.responses import JSONResponse from .config import get_config from .auth import verify_init_data -from . import sheets, ai, telegram as tg, proxy_pool +from . import sheets, ai, telegram as tg, proxy_pool, catalog from . import parsers from .parsers import dns as parser_dns, wb as parser_wb, ozon as parser_ozon, yamarket as parser_ym, citilink as parser_cl @@ -224,6 +224,55 @@ async def api_proxy_status(): return proxy_pool.pool_status() +@app.post("/api/catalog/refresh") +def api_catalog_refresh(cat: str = "", per_brand: int = 2, delay: float = 1.0): + """Запускает парсинг каталога (медленно — несколько минут на категорию). + + Параметры: + cat: одна категория (fridge|hob|oven|dw|hood|microwave|coffee|washer) + или пусто = все 8 (очень долго) + per_brand: сколько моделей сохранять на (brand × category) — default 2 + delay: задержка между запросами к парсерам, сек — default 1.0 + """ + categories = [cat] if cat else None + try: + result = catalog.refresh_catalog( + categories=categories, + per_brand=max(1, min(per_brand, 5)), + delay_sec=max(0.0, min(delay, 10.0)), + ) + return result + except Exception as e: + log.exception("catalog refresh failed") + return {"ok": False, "error": str(e)} + + +@app.get("/api/catalog/list") +def api_catalog_list(cat: str = "", tier: str = "", brand: str = "", limit: int = 100): + """Читает каталог моделей из Sheets с фильтрами.""" + items = catalog.list_catalog( + category=cat or None, + tier=tier or None, + brand=brand or None, + limit=min(limit, 500), + ) + return { + "ok": True, + "filters": {"category": cat, "tier": tier, "brand": brand}, + "count": len(items), + "items": items, + } + + +@app.get("/api/catalog/preview_ai") +def api_catalog_preview_ai(cats: str = "fridge", tiers: str = ""): + """Превью того, что AI получит в prompt (для отладки).""" + cat_list = [c.strip() for c in cats.split(",") if c.strip()] + tier_list = [t.strip() for t in tiers.split(",") if t.strip()] or None + text = catalog.list_for_ai(cat_list, tiers=tier_list, limit_per_cat=30) + return {"text": text, "length_chars": len(text)} + + # ================================================================= # Handlers # ================================================================= @@ -354,10 +403,20 @@ def _handle_podbor(body: dict[str, Any]) -> dict[str, Any]: "", "", 0, False, "new", 0, ]) + # Загружаем релевантный каталог моделей и передаём AI как «доступный пул» + catalog_text = _build_catalog_context(checklist) + user_prompt = ( f"Подбери технику для следующего клиента:\n\n" f"{json.dumps({'client': {'name': client_name}, 'checklist': checklist}, ensure_ascii=False, indent=2)}" ) + if catalog_text: + user_prompt += ( + "\n\n═══ ДОСТУПНЫЙ КАТАЛОГ МОДЕЛЕЙ (выбирай ТОЛЬКО из этого списка) ═══\n" + + catalog_text + + "\n\nВАЖНО: если модель не из этого списка — НЕ возвращай её. " + "Каталог собран парсерами с реальных маркетплейсов РФ — это гарантия что артикул существует." + ) ai_result = ai.call_ai(user_prompt) # Обогащение моделей данными с маркетплейсов (WB / Я.Маркет / OZON / DNS) @@ -382,6 +441,33 @@ def _handle_podbor(body: dict[str, Any]) -> dict[str, Any]: return {"ok": True, "id": lead_id, "summary": summary_text, "ai": ai_result.get("json")} +def _build_catalog_context(checklist: dict[str, Any]) -> str: + """Готовит компактный текст-каталог для AI prompt. + + Берёт только релевантные категории + тиры (по budget_preset). + """ + cats = checklist.get("categories") or [] + if not cats: + return "" + + # Маппинг бюджет-пресета → тиры каталога + bp = checklist.get("budget_preset") or "" + tier_map = { + "luxe": ["premium"], + "premium": ["premium", "middle"], + "middle": ["middle"], + "budget": ["middle", "budget"], # средний и ниже + "exact": None, + } + tiers = tier_map.get(bp) # None = все тиры + + try: + return catalog.list_for_ai(cats, tiers=tiers, limit_per_cat=25) + except Exception as e: + log.warning("Cannot build catalog context: %s", e) + return "" + + def _enrich_ai_marketplaces(ai_result: dict[str, Any]) -> None: """Обогащает каждую модель из ai_result['json']['by_category'] данными с маркетплейсов (WB / Я.Маркет / OZON / DNS). Если PROXY6_TOKEN не задан — diff --git a/backend-py/app/sheets.py b/backend-py/app/sheets.py index a7bc467..ebbfd0d 100644 --- a/backend-py/app/sheets.py +++ b/backend-py/app/sheets.py @@ -38,6 +38,24 @@ def sheet(name: str) -> gspread.Worksheet: return book.worksheet(name) +def ensure_sheet(name: str, headers: list[str]) -> gspread.Worksheet: + """Создаёт лист с заголовками если он не существует. Иначе возвращает существующий.""" + _, book = _client_book() + try: + ws = book.worksheet(name) + try: + first = ws.row_values(1) + except Exception: + first = [] + if not first: + ws.update("A1", [headers]) + return ws + except gspread.exceptions.WorksheetNotFound: + ws = book.add_worksheet(title=name, rows=2000, cols=max(20, len(headers))) + ws.append_row(headers, value_input_option="USER_ENTERED") + return ws + + def append_row(name: str, row: list[Any]) -> None: sheet(name).append_row(row, value_input_option="USER_ENTERED")