backend: Proxy6 pool + parsers WB / OZON / Я.Маркет / DNS

PROXY POOL (app/proxy_pool.py):
- Loads active proxies from Proxy6.net API every 10 min
- Random rotation per request via proxied_client(timeout, headers)
- Graceful fallback to direct HTTP if PROXY6_TOKEN not set
- Config: PROXY6_TOKEN env var

PARSERS (app/parsers/):
- dns.py — refactored to use proxy_pool with retry+rotation on Qrator block
- wb.py — Wildberries JSON API (search.wb.ru), retries on 429
- ozon.py — OZON composer-api JSON (widgetStates extraction)
- yamarket.py — Я.Маркет HTML + embedded JSON parser
- __init__.py — enrich_one() fans out to all sources, aggregates min/max prices, max rating, sum reviews
- enrich_models() — batch enrich for AI by_category output

NEW DIAGNOSTIC ENDPOINTS (main.py):
- GET /api/parse_wb?q=...&limit=N
- GET /api/parse_ozon?q=...&limit=N
- GET /api/parse_yamarket?q=...&limit=N
- GET /api/parse_all?q=... — fan-out + aggregate
- GET /api/proxy_status — pool diagnostics (count, token configured, age)

PODBOR (main.py):
- _enrich_ai_with_dns -> _enrich_ai_marketplaces (uses all sources)

DEPLOY: needs PROXY6_TOKEN in /opt/zov-tech/deploy/.env on VPS, then docker compose build + up -d backend
This commit is contained in:
wasrusgen 2026-05-11 12:18:04 +03:00
parent 64edb76035
commit 82425dbd88
8 changed files with 876 additions and 41 deletions

View File

@ -19,6 +19,8 @@ class Config:
active_period_days: int active_period_days: int
grace_period_days: int grace_period_days: int
proxy6_token: str # пусто = без прокси (прямой HTTP)
def _required(name: str) -> str: def _required(name: str) -> str:
val = os.getenv(name) val = os.getenv(name)
@ -39,4 +41,5 @@ def get_config() -> Config:
gigachat_scope=os.getenv("GIGACHAT_SCOPE", "GIGACHAT_API_PERS"), gigachat_scope=os.getenv("GIGACHAT_SCOPE", "GIGACHAT_API_PERS"),
active_period_days=int(os.getenv("ACTIVE_PERIOD_DAYS", "90")), active_period_days=int(os.getenv("ACTIVE_PERIOD_DAYS", "90")),
grace_period_days=int(os.getenv("GRACE_PERIOD_DAYS", "14")), grace_period_days=int(os.getenv("GRACE_PERIOD_DAYS", "14")),
proxy6_token=os.getenv("PROXY6_TOKEN", ""),
) )

View File

@ -11,8 +11,9 @@ from fastapi.responses import JSONResponse
from .config import get_config from .config import get_config
from .auth import verify_init_data from .auth import verify_init_data
from . import sheets, ai, telegram as tg from . import sheets, ai, telegram as tg, proxy_pool
from .parsers import dns as parser_dns from . import parsers
from .parsers import dns as parser_dns, wb as parser_wb, ozon as parser_ozon, yamarket as parser_ym
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s") logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
log = logging.getLogger("zov.backend") log = logging.getLogger("zov.backend")
@ -143,7 +144,7 @@ async def api_seed_admin():
@app.get("/api/parse_dns") @app.get("/api/parse_dns")
async def api_parse_dns(q: str = "", limit: int = 1): async def api_parse_dns(q: str = "", limit: int = 1):
"""Тестовый эндпоинт парсера DNS. Пример: /api/parse_dns?q=Bosch+KGN39&limit=3""" """Тест парсера DNS."""
if not q: if not q:
return {"error": "missing_query", "hint": "use ?q=<search>"} return {"error": "missing_query", "hint": "use ?q=<search>"}
try: try:
@ -153,6 +154,57 @@ async def api_parse_dns(q: str = "", limit: int = 1):
return {"ok": False, "error": str(e), "query": q} return {"ok": False, "error": str(e), "query": q}
@app.get("/api/parse_wb")
async def api_parse_wb(q: str = "", limit: int = 3):
if not q:
return {"error": "missing_query"}
try:
results = parser_wb.search_wb(q, limit=min(max(1, limit), 10))
return {"ok": True, "query": q, "count": len(results), "results": results}
except Exception as e:
return {"ok": False, "error": str(e), "query": q}
@app.get("/api/parse_ozon")
async def api_parse_ozon(q: str = "", limit: int = 3):
if not q:
return {"error": "missing_query"}
try:
results = parser_ozon.search_ozon(q, limit=min(max(1, limit), 10))
return {"ok": True, "query": q, "count": len(results), "results": results}
except Exception as e:
return {"ok": False, "error": str(e), "query": q}
@app.get("/api/parse_yamarket")
async def api_parse_yamarket(q: str = "", limit: int = 3):
if not q:
return {"error": "missing_query"}
try:
results = parser_ym.search_yamarket(q, limit=min(max(1, limit), 10))
return {"ok": True, "query": q, "count": len(results), "results": results}
except Exception as e:
return {"ok": False, "error": str(e), "query": q}
@app.get("/api/parse_all")
async def api_parse_all(q: str = ""):
"""Спрашивает все источники и возвращает агрегированный результат."""
if not q:
return {"error": "missing_query"}
try:
data = parsers.enrich_one(q)
return {"ok": True, "query": q, "data": data}
except Exception as e:
return {"ok": False, "error": str(e), "query": q}
@app.get("/api/proxy_status")
async def api_proxy_status():
"""Диагностика: показывает текущее состояние proxy-пула."""
return proxy_pool.pool_status()
# ================================================================= # =================================================================
# Handlers # Handlers
# ================================================================= # =================================================================
@ -289,13 +341,13 @@ def _handle_podbor(body: dict[str, Any]) -> dict[str, Any]:
) )
ai_result = ai.call_ai(user_prompt) ai_result = ai.call_ai(user_prompt)
# Обогащение моделей DNS-парсингом # Обогащение моделей данными с маркетплейсов (WB / Я.Маркет / OZON / DNS)
enrich_dns = body.get("enrich", True) enrich_enabled = body.get("enrich", True)
if enrich_dns: if enrich_enabled:
try: try:
_enrich_ai_with_dns(ai_result) _enrich_ai_marketplaces(ai_result)
except Exception as e: except Exception as e:
log.warning("DNS enrich failed: %s", e) log.warning("Marketplace enrich failed: %s", e)
# Update lead row with AI response # Update lead row with AI response
sheets.update_cell_by_key("Leads", "id", lead_id, "ai_response", sheets.update_cell_by_key("Leads", "id", lead_id, "ai_response",
@ -311,8 +363,10 @@ def _handle_podbor(body: dict[str, Any]) -> dict[str, Any]:
return {"ok": True, "id": lead_id, "summary": summary_text, "ai": ai_result.get("json")} return {"ok": True, "id": lead_id, "summary": summary_text, "ai": ai_result.get("json")}
def _enrich_ai_with_dns(ai_result: dict[str, Any]) -> None: def _enrich_ai_marketplaces(ai_result: dict[str, Any]) -> None:
"""Берёт ai_result['json']['by_category'][cat]['models'] и обогащает каждую DNS-данными.""" """Обогащает каждую модель из ai_result['json']['by_category'] данными
с маркетплейсов (WB / Я.Маркет / OZON / DNS). Если PROXY6_TOKEN не задан
скорее всего вернёт пустые данные (Qrator блокирует прямые HTTP)."""
j = ai_result.get("json") j = ai_result.get("json")
if not j or not isinstance(j, dict): if not j or not isinstance(j, dict):
return return
@ -321,7 +375,7 @@ def _enrich_ai_with_dns(ai_result: dict[str, Any]) -> None:
if not isinstance(cat_data, dict): if not isinstance(cat_data, dict):
continue continue
models = cat_data.get("models") or [] models = cat_data.get("models") or []
cat_data["models"] = parser_dns.enrich_models(models, delay_sec=0.4) cat_data["models"] = parsers.enrich_models(models, delay_sec=0.4)
def _handle_test_ai() -> dict[str, Any]: def _handle_test_ai() -> dict[str, Any]:

View File

@ -1,27 +1,132 @@
"""Парсеры маркетплейсов для обогащения карточек моделей. """Парсеры маркетплейсов для обогащения карточек моделей.
Подход MVP: парсим публичные HTML-страницы напрямую с VPS (без прокси). Все парсеры используют общий proxy_pool (Proxy6.net), если PROXY6_TOKEN задан.
При обнаружении anti-bot блокировок переходим на резидентные прокси (Proxy6). Без прокси крупные маркетплейсы РФ (DNS, OZON, Я.Маркет) возвращают 401/307.
Источники: Источники:
- dns.py DNS Shop (dns-shop.ru) самый простой anti-bot, основной источник характеристик - dns.py DNS Shop (характеристики, цена одного магазина)
- yamarket.py Я.Маркет (market.yandex.ru) для сравнения цен между магазинами - wb.py Wildberries (JSON API, цена + отзывы + рейтинг)
- wildberries.py Wildberries (wildberries.ru) для отзывов и рейтингов - ozon.py OZON (composer-api JSON)
- yamarket.py Я.Маркет (HTML + встроенный JSON, сравнение цен)
Унифицированный формат результата: Унифицированный формат результата (item):
{ {
"title": str, # Название как на странице "title": str, # Название как на странице
"url": str, # Ссылка на товар "url": str, # Ссылка на товар
"image_url": str | None, # URL основного фото "image_url": str | None, # URL основного фото
"price_min_rub": int | None, # Минимальная найденная цена "price_min_rub": int | None,
"price_max_rub": int | None, # Максимальная (если есть данные по нескольким магазинам) "price_max_rub": int | None,
"rating": float | None, # 0.0 - 5.0 "rating": float | None, # 0.0 - 5.0
"reviews_count": int | None, # Кол-во отзывов "reviews_count": int | None,
"stores_count": int | None, # На скольких сайтах найдено (Я.Маркет) "stores_count": int | None, # Только Я.Маркет (сравнение)
"specs": dict[str, str], # Ключевые характеристики "specs": dict[str, str],
"source": str, # "dns" / "yamarket" / "wildberries" "source": str, # 'dns' | 'wb' | 'ozon' | 'yamarket'
} }
""" """
from .dns import search_dns from __future__ import annotations
import logging
import time
from typing import Any
__all__ = ["search_dns"] from .dns import search_dns
from .wb import search_wb
from .ozon import search_ozon
from .yamarket import search_yamarket
log = logging.getLogger("zov.parser")
__all__ = ["search_dns", "search_wb", "search_ozon", "search_yamarket",
"enrich_one", "enrich_models"]
def enrich_one(query: str, sources: tuple = ("wb", "yamarket", "ozon", "dns")) -> dict[str, Any]:
"""Спрашивает все указанные источники и объединяет лучшее в единый отчёт.
Возвращает:
{
"wb": {item dict} или None,
"ozon": {item dict} или None,
"yamarket": {item dict} или None,
"dns": {item dict} или None,
"price_min_rub": int | None, # минимум по всем источникам
"price_max_rub": int | None,
"image_url": str | None,
"rating_max": float | None,
"reviews_total": int | None,
"stores_count": int | None, # макс. из yamarket
"best_url": str | None,
}
"""
fetchers = {
"wb": lambda: _safe_first(search_wb, query),
"yamarket": lambda: _safe_first(search_yamarket, query),
"ozon": lambda: _safe_first(search_ozon, query),
"dns": lambda: _safe_first(search_dns, query),
}
items: dict[str, dict] = {}
for src in sources:
fn = fetchers.get(src)
if not fn:
continue
try:
items[src] = fn()
except Exception as e:
log.warning("Source %s failed for %r: %s", src, query, e)
items[src] = None
# Агрегация
prices = [i["price_min_rub"] for i in items.values() if i and i.get("price_min_rub")]
images = [i["image_url"] for i in items.values() if i and i.get("image_url")]
ratings = [i["rating"] for i in items.values() if i and i.get("rating")]
reviews = [i["reviews_count"] for i in items.values() if i and i.get("reviews_count")]
# Я.Маркет даёт количество магазинов
stores = None
if items.get("yamarket") and items["yamarket"].get("stores_count"):
stores = items["yamarket"]["stores_count"]
best_url = None
# Приоритет: yamarket (агрегатор) → wb → ozon → dns
for src in ("yamarket", "wb", "ozon", "dns"):
i = items.get(src)
if i and i.get("url"):
best_url = i["url"]
break
return {
**{src: items.get(src) for src in fetchers.keys()},
"price_min_rub": min(prices) if prices else None,
"price_max_rub": max(prices) if prices else None,
"image_url": images[0] if images else None,
"rating_max": max(ratings) if ratings else None,
"reviews_total": sum(reviews) if reviews else None,
"stores_count": stores,
"best_url": best_url,
}
def enrich_models(models: list[dict[str, Any]], delay_sec: float = 0.5,
sources: tuple = ("wb", "yamarket", "ozon", "dns")) -> list[dict[str, Any]]:
"""Обогащает список моделей от AI данными со всех источников."""
enriched: list[dict[str, Any]] = []
for i, m in enumerate(models):
q = m.get("search_query") or f"{m.get('brand', '')} {m.get('model', '')}".strip()
if not q:
enriched.append({**m, "enriched": None})
continue
try:
data = enrich_one(q, sources=sources)
except Exception as e:
log.warning("Enrich failed for %r: %s", q, e)
data = None
enriched.append({**m, "enriched": data})
if i < len(models) - 1 and delay_sec > 0:
time.sleep(delay_sec)
return enriched
def _safe_first(search_fn, query: str) -> dict[str, Any] | None:
"""Вызывает поиск и возвращает первый результат или None."""
results = search_fn(query, limit=1)
return results[0] if results else None

View File

@ -16,6 +16,8 @@ from urllib.parse import quote_plus
import httpx import httpx
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from .. import proxy_pool
log = logging.getLogger("zov.parser.dns") log = logging.getLogger("zov.parser.dns")
_BASE_URL = "https://www.dns-shop.ru" _BASE_URL = "https://www.dns-shop.ru"
@ -42,31 +44,41 @@ _HEADERS = {
_PRICE_RE = re.compile(r"(\d[\d\s]*)\s*₽") _PRICE_RE = re.compile(r"(\d[\d\s]*)\s*₽")
def search_dns(query: str, limit: int = 1, timeout: float = 12.0) -> list[dict[str, Any]]: def search_dns(query: str, limit: int = 1, timeout: float = 12.0,
max_retries: int = 2) -> list[dict[str, Any]]:
"""Поиск товара на DNS по строке запроса. """Поиск товара на DNS по строке запроса.
Возвращает список результатов (топ-N). Каждый элемент унифицированный Использует Proxy6-пул если PROXY6_TOKEN задан, иначе ходит напрямую.
формат (см. parsers/__init__.py). Пустой список при ошибке. DNS защищён Qrator без прокси скорее всего 401.
Возвращает список результатов (топ-N) или пустой при ошибке.
""" """
url = f"{_SEARCH_URL}?q={quote_plus(query)}" url = f"{_SEARCH_URL}?q={quote_plus(query)}"
log.info("DNS search: %s", url) log.info("DNS search: %s", url)
last_err = None
for attempt in range(max_retries + 1):
try: try:
with httpx.Client(headers=_HEADERS, timeout=timeout, follow_redirects=True) as client: with proxy_pool.proxied_client(timeout=timeout, headers=_HEADERS,
follow_redirects=True) as client:
resp = client.get(url) resp = client.get(url)
except httpx.HTTPError as e: except httpx.HTTPError as e:
log.warning("DNS request failed: %s", e) last_err = e
return [] log.warning("DNS request failed (attempt %d): %s", attempt + 1, e)
continue
if resp.status_code != 200: if resp.status_code == 200:
log.warning("DNS returned %s for query=%r", resp.status_code, query) text = resp.text
return [] if "qrator" in text.lower() or "challenge" in text.lower() or "captcha" in text.lower():
log.warning("DNS Qrator/captcha on attempt %d, rotating proxy", attempt + 1)
continue
return _parse_search_html(text, limit=limit)
if "challenge" in resp.text.lower() or "captcha" in resp.text.lower(): log.warning("DNS returned status=%s on attempt %d", resp.status_code, attempt + 1)
log.warning("DNS anti-bot challenge detected for query=%r", query)
return []
return _parse_search_html(resp.text, limit=limit) log.warning("DNS gave up after %d attempts for query=%r (last_err=%s)",
max_retries + 1, query, last_err)
return []
def _parse_search_html(html: str, limit: int) -> list[dict[str, Any]]: def _parse_search_html(html: str, limit: int) -> list[dict[str, Any]]:

View File

@ -0,0 +1,188 @@
"""Парсер OZON — через composer-api (внутренний JSON API сайта).
OZON отдаёт JSON через `/api/composer-api.bx/page/json/v2?url=/search/?text=`.
JSON содержит вложенные виджеты нас интересует `widgetStates.searchResults...`.
Без прокси возвращает 307/403. Через резидентный РФ-IP проходит.
"""
from __future__ import annotations
import logging
import re
from typing import Any
from urllib.parse import quote_plus
import httpx
from .. import proxy_pool
log = logging.getLogger("zov.parser.ozon")
_BASE_URL = "https://www.ozon.ru"
_API_URL = "https://www.ozon.ru/api/composer-api.bx/page/json/v2"
_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
"Accept": "application/json",
"Accept-Language": "ru-RU,ru;q=0.9",
"x-o3-app-name": "dweb_client",
"x-o3-app-version": "release_18.04",
"x-o3-page-type": "search",
"Referer": "https://www.ozon.ru/",
}
_PRICE_RE = re.compile(r"([\d\s]+)\s*₽")
def search_ozon(query: str, limit: int = 3, timeout: float = 15.0,
max_retries: int = 2) -> list[dict[str, Any]]:
"""Поиск товара в OZON через composer-api."""
url_param = f"/search/?text={quote_plus(query)}&from_global=true"
params = {"url": url_param}
for attempt in range(max_retries + 1):
try:
with proxy_pool.proxied_client(timeout=timeout, headers=_HEADERS,
follow_redirects=False) as client:
resp = client.get(_API_URL, params=params)
except httpx.HTTPError as e:
log.warning("OZON request failed (attempt %d): %s", attempt + 1, e)
continue
if resp.status_code in (301, 302, 307, 308):
log.info("OZON redirect %s, rotating proxy", resp.status_code)
continue
if resp.status_code != 200:
log.warning("OZON returned status=%s", resp.status_code)
continue
try:
data = resp.json()
except Exception as e:
log.warning("OZON JSON parse failed: %s", e)
continue
return _extract_products(data, limit=limit)
log.warning("OZON gave up after %d attempts for query=%r", max_retries + 1, query)
return []
def _extract_products(data: dict, limit: int) -> list[dict[str, Any]]:
"""OZON прячет данные в widgetStates — ищем все ключи с 'searchResultsV2'."""
widget_states = data.get("widgetStates") or {}
products: list[dict[str, Any]] = []
for key, raw in widget_states.items():
if "searchResultsV2" not in key and "skuGrid" not in key and "searchCategories" not in key:
continue
try:
import json as _j
w = _j.loads(raw) if isinstance(raw, str) else raw
except Exception:
continue
items = w.get("items") or w.get("products") or []
for it in items:
if len(products) >= limit:
break
item = _build_item(it)
if item:
products.append(item)
if len(products) >= limit:
break
return products
def _build_item(it: dict[str, Any]) -> dict[str, Any] | None:
"""Парсит карточку товара из OZON widget items[]."""
# Структура: { mainState: [...], action: { link: '/product/...' }, images: [...] }
sku = it.get("sku") or it.get("id")
if not sku:
return None
link = (it.get("action") or {}).get("link") or ""
url = f"{_BASE_URL}{link}" if link.startswith("/") else link
# Картинка
image_url = None
imgs = it.get("images") or it.get("tileImage") or []
if isinstance(imgs, list) and imgs:
first = imgs[0]
image_url = first if isinstance(first, str) else (first.get("image") or first.get("src"))
if not image_url:
ti = it.get("tileImage") or {}
if isinstance(ti, dict):
items = ti.get("items") or []
for x in items:
if isinstance(x, dict) and x.get("image"):
image_url = x["image"].get("link") or x["image"].get("src")
break
# Цена и название — берём из mainState текстовых атомов
title = ""
price_min = None
price_max = None
rating = None
reviews = None
for atom in (it.get("mainState") or []):
atom_id = atom.get("id") or ""
atom_type = atom.get("type") or ""
if atom_type == "textAtom":
text = ((atom.get("textAtom") or {}).get("text") or "").strip()
if "name" in atom_id.lower() and not title:
title = re.sub(r"<[^>]+>", "", text)
elif "price" in atom_id.lower():
m = _PRICE_RE.search(text)
if m and not price_min:
price_min = int(m.group(1).replace(" ", "").replace(" ", ""))
elif atom_type == "priceV2":
pv = atom.get("priceV2") or {}
for price_obj in (pv.get("price") or []):
t = (price_obj.get("text") or "").strip()
m = _PRICE_RE.search(t)
if m:
val = int(m.group(1).replace(" ", "").replace(" ", ""))
if price_min is None or val < price_min:
price_min = val
if price_max is None or val > price_max:
price_max = val
elif atom_type == "labelList":
for lbl in ((atom.get("labelList") or {}).get("items") or []):
t = (lbl.get("title") or "").strip()
# Рейтинг типа "4.7"
if re.fullmatch(r"\d\.\d", t):
rating = float(t)
# Отзывы типа "1242 отзыва"
m = re.search(r"(\d[\d\s]*)\s*(?:отзыв|оценок)", t)
if m:
reviews = int(m.group(1).replace(" ", ""))
if not title:
# Резервный фолбэк — могут быть атомы в otherState
for atom in (it.get("otherState") or []):
text = ((atom.get("textAtom") or {}).get("text") or "").strip()
if text and len(text) > 5:
title = re.sub(r"<[^>]+>", "", text)
break
if not title:
return None
return {
"title": title,
"url": url,
"image_url": image_url,
"price_min_rub": price_min,
"price_max_rub": price_max if price_max and price_max != price_min else None,
"rating": rating,
"reviews_count": reviews,
"stores_count": None,
"specs": {},
"source": "ozon",
}

View File

@ -0,0 +1,135 @@
"""Парсер Wildberries — через их JSON API.
Endpoint search.wb.ru отдаёт чистый JSON с товарами. Цены в копейках/u
(делим на 100). У товаров есть rating, feedbacks (отзывы), brand.
Цена /salePriceU/ итоговая со скидкой, /priceU/ RRP.
"""
from __future__ import annotations
import logging
from typing import Any
from urllib.parse import quote_plus
import httpx
from .. import proxy_pool
log = logging.getLogger("zov.parser.wb")
_SEARCH_URL = "https://search.wb.ru/exactmatch/ru/common/v9/search"
_DEFAULT_PARAMS = {
"TestGroup": "no_test",
"TestID": "no_test",
"appType": "1",
"curr": "rub",
"dest": "-1257786", # Москва, можно поменять
"resultset": "catalog",
"sort": "popular",
"spp": "30",
"suppressSpellcheck": "false",
}
_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
"Accept": "*/*",
"Accept-Language": "ru-RU,ru;q=0.9",
"Origin": "https://www.wildberries.ru",
"Referer": "https://www.wildberries.ru/",
}
def search_wb(query: str, limit: int = 3, timeout: float = 12.0,
max_retries: int = 2) -> list[dict[str, Any]]:
params = {**_DEFAULT_PARAMS, "query": query}
for attempt in range(max_retries + 1):
try:
with proxy_pool.proxied_client(timeout=timeout, headers=_HEADERS) as client:
resp = client.get(_SEARCH_URL, params=params)
except httpx.HTTPError as e:
log.warning("WB request failed (attempt %d): %s", attempt + 1, e)
continue
if resp.status_code == 429:
log.warning("WB rate-limited on attempt %d, rotating proxy", attempt + 1)
continue
if resp.status_code != 200:
log.warning("WB returned status=%s", resp.status_code)
continue
try:
data = resp.json()
except Exception as e:
log.warning("WB JSON parse failed: %s", e)
continue
products = (data.get("data") or {}).get("products") or []
if not products:
log.info("WB no products for query=%r", query)
return []
results: list[dict[str, Any]] = []
for p in products[:limit]:
results.append(_build_item(p))
return results
log.warning("WB gave up after %d attempts for query=%r", max_retries + 1, query)
return []
def _build_item(p: dict[str, Any]) -> dict[str, Any]:
sale_u = p.get("salePriceU") or 0
price_u = p.get("priceU") or 0
# WB цена в копейках (или /100). Старое поле было в копейках, иногда в условных единицах.
# Делим на 100 — стандартный паттерн.
price_min = (sale_u // 100) if sale_u else (price_u // 100 if price_u else None)
price_max = (price_u // 100) if price_u and price_u != sale_u else None
# Если у товара есть варианты sizes — берём минимальную цену оттуда
sizes = p.get("sizes") or []
if sizes:
size_prices = []
for s in sizes:
sp = (s.get("price") or {}).get("product") or 0
if sp:
size_prices.append(sp // 100)
if size_prices:
price_min = min(size_prices)
if len(size_prices) > 1:
price_max = max(size_prices)
pid = p.get("id")
image_url = _build_image_url(pid) if pid else None
return {
"title": p.get("name") or "",
"url": f"https://www.wildberries.ru/catalog/{pid}/detail.aspx" if pid else "",
"image_url": image_url,
"price_min_rub": price_min,
"price_max_rub": price_max if price_max and price_max != price_min else None,
"rating": p.get("reviewRating") or p.get("rating"),
"reviews_count": p.get("feedbacks"),
"stores_count": None,
"specs": {
"brand": p.get("brand", ""),
"supplier": p.get("supplier", ""),
},
"source": "wb",
}
def _build_image_url(product_id: int) -> str:
"""WB хранит фото на nm-1..20.wbbasket.ru. URL зависит от диапазона id."""
pid = int(product_id)
short = pid // 100000
# Маппинг WB корзин (упрощённый)
if pid < 144_000_000: basket = (short // 1431) + 1
elif pid < 287_000_000: basket = (short // 1431) + 1
else: basket = (short // 1431) + 1
# Безопасный fallback — basket 10 покрывает почти все ID
if basket < 1 or basket > 25:
basket = 10
bn = str(basket).zfill(2)
vol = pid // 100000
part = pid // 1000
return f"https://basket-{bn}.wbbasket.ru/vol{vol}/part{part}/{pid}/images/big/1.webp"

View File

@ -0,0 +1,236 @@
"""Парсер Я.Маркета — HTML страница поиска.
Я.Маркет защищён Qrator. Через резидентный РФ-IP + правильные заголовки
+ cookies на сессию обычно проходит. Без прокси 401.
Из HTML вытаскиваем JSON, который Я.Маркет встраивает в <script type="application/json">.
"""
from __future__ import annotations
import json
import logging
import re
from typing import Any
from urllib.parse import quote_plus
import httpx
from bs4 import BeautifulSoup
from .. import proxy_pool
log = logging.getLogger("zov.parser.yamarket")
_BASE_URL = "https://market.yandex.ru"
_SEARCH_URL = "https://market.yandex.ru/search"
_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "ru-RU,ru;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"Upgrade-Insecure-Requests": "1",
}
_PRICE_RE = re.compile(r"(\d[\d\s]*)\s*₽")
def search_yamarket(query: str, limit: int = 3, timeout: float = 20.0,
max_retries: int = 2) -> list[dict[str, Any]]:
"""Поиск товара в Я.Маркете. Возвращает топ-N с ценами и кол-вом магазинов."""
params = {"text": query, "cvredirect": "2"}
for attempt in range(max_retries + 1):
try:
with proxy_pool.proxied_client(timeout=timeout, headers=_HEADERS,
follow_redirects=True) as client:
resp = client.get(_SEARCH_URL, params=params)
except httpx.HTTPError as e:
log.warning("YaMarket request failed (attempt %d): %s", attempt + 1, e)
continue
if resp.status_code != 200:
log.warning("YaMarket status=%s on attempt %d", resp.status_code, attempt + 1)
continue
text = resp.text
if "qrator" in text.lower() or "showcaptcha" in text.lower():
log.warning("YaMarket Qrator/captcha on attempt %d, rotating proxy", attempt + 1)
continue
return _parse_html(text, limit=limit)
log.warning("YaMarket gave up after %d attempts for query=%r", max_retries + 1, query)
return []
def _parse_html(html: str, limit: int) -> list[dict[str, Any]]:
soup = BeautifulSoup(html, "html.parser")
results: list[dict[str, Any]] = []
# Я.Маркет встраивает данные в JSON внутри скриптов
for script in soup.find_all("script", type="application/json"):
data = _try_json(script.string or "")
if not data:
continue
# Структуры разные; ищем массив с offers/products
items = _find_products(data)
for it in items:
if len(results) >= limit:
break
item = _build_item(it)
if item:
results.append(item)
if len(results) >= limit:
break
# Резервный путь — карточки прямо в HTML
if not results:
cards = soup.select("[data-zone-name='snippet-card'], [data-baobab-name='card']")
for card in cards:
if len(results) >= limit:
break
item = _extract_html_card(card)
if item:
results.append(item)
return results
def _find_products(data: Any, _depth: int = 0) -> list[dict]:
"""Рекурсивно ищем массив товаров в JSON Я.Маркета."""
if _depth > 8:
return []
if isinstance(data, list):
# Эвристика: список объектов с offers/price/title
if data and isinstance(data[0], dict) and (
data[0].get("offers") or data[0].get("prices") or data[0].get("titles")
):
return data
for item in data:
found = _find_products(item, _depth + 1)
if found:
return found
elif isinstance(data, dict):
for v in data.values():
found = _find_products(v, _depth + 1)
if found:
return found
return []
def _build_item(p: dict) -> dict[str, Any] | None:
title_obj = p.get("titles") or {}
title = (title_obj.get("raw") if isinstance(title_obj, dict) else "") or p.get("title", "")
if not title:
return None
url_obj = p.get("url") or p.get("urls", {}).get("encrypted", "")
url = url_obj if isinstance(url_obj, str) else ""
if url and url.startswith("/"):
url = f"{_BASE_URL}{url}"
pic = ""
pictures = p.get("pictures") or []
if pictures and isinstance(pictures, list):
pic_obj = pictures[0]
if isinstance(pic_obj, dict):
pic = pic_obj.get("original", {}).get("url") or pic_obj.get("url") or ""
# Цена + кол-во магазинов
prices = p.get("prices") or p.get("offers") or {}
price_min = price_max = None
stores = None
if isinstance(prices, dict):
price_min = _try_int(prices.get("min", {}).get("value") if isinstance(prices.get("min"), dict) else prices.get("min"))
price_max = _try_int(prices.get("max", {}).get("value") if isinstance(prices.get("max"), dict) else prices.get("max"))
stores = _try_int(prices.get("count") or prices.get("offersCount"))
rating = _try_float((p.get("rating") or {}).get("value") if isinstance(p.get("rating"), dict) else p.get("rating"))
reviews = _try_int((p.get("reviews") or {}).get("count") if isinstance(p.get("reviews"), dict) else p.get("reviews"))
return {
"title": re.sub(r"<[^>]+>", "", title).strip(),
"url": url,
"image_url": pic,
"price_min_rub": price_min,
"price_max_rub": price_max if price_max and price_max != price_min else None,
"rating": rating,
"reviews_count": reviews,
"stores_count": stores,
"specs": {},
"source": "yamarket",
}
def _extract_html_card(card) -> dict[str, Any] | None:
"""Резервный парсинг HTML-карточки если JSON не нашёлся."""
title_el = card.select_one("[data-zone-name='title'] span, h3, [class*='Title']")
if not title_el:
return None
title = title_el.get_text(strip=True)
price_el = card.select_one("[data-auto='snippet-price-current'], [class*='Price']")
price = None
if price_el:
m = _PRICE_RE.search(price_el.get_text(" ", strip=True))
if m:
price = _try_int(m.group(1).replace(" ", ""))
img_el = card.select_one("img[srcset], img[src]")
img_url = ""
if img_el:
src = img_el.get("src") or img_el.get("data-src") or ""
if src.startswith("//"):
src = "https:" + src
img_url = src
link_el = card.select_one("a[href*='/product--'], a[data-baobab-name='title']")
url = ""
if link_el:
href = link_el.get("href") or ""
url = href if href.startswith("http") else f"{_BASE_URL}{href}"
if not title:
return None
return {
"title": title,
"url": url,
"image_url": img_url,
"price_min_rub": price,
"price_max_rub": None,
"rating": None,
"reviews_count": None,
"stores_count": None,
"specs": {},
"source": "yamarket",
}
def _try_int(v: Any) -> int | None:
if v is None:
return None
try:
return int(float(str(v).replace(" ", "").replace(",", ".")))
except (ValueError, TypeError):
return None
def _try_float(v: Any) -> float | None:
if v is None:
return None
try:
return float(str(v).replace(" ", "").replace(",", "."))
except (ValueError, TypeError):
return None
def _try_json(s: str) -> Any:
try:
return json.loads(s)
except (ValueError, TypeError):
return None

View File

@ -0,0 +1,102 @@
"""Proxy6.net pool — динамическая загрузка купленных прокси, ротация.
Конфиг:
PROXY6_TOKEN API-ключ Proxy6 (https://proxy6.net/user/developers)
Если пусто прокси не используется (прямые HTTP-запросы).
Использование:
from . import proxy_pool
with proxy_pool.proxied_client(timeout=15) as client:
r = client.get(url, headers=headers)
"""
from __future__ import annotations
import logging
import random
import threading
import time
from typing import Optional
import httpx
from .config import get_config
log = logging.getLogger("zov.proxy")
_API_URL = "https://proxy6.net/api"
_POOL_TTL_SEC = 600 # обновляем пул каждые 10 минут
_lock = threading.Lock()
_pool: list[str] = [] # ["http://user:pass@host:port", ...]
_pool_loaded_at: float = 0.0
def _load_pool(force: bool = False) -> list[str]:
"""Загружает активные прокси из Proxy6 API. Кэшируется на _POOL_TTL_SEC."""
global _pool, _pool_loaded_at
with _lock:
now = time.time()
if not force and _pool and now - _pool_loaded_at < _POOL_TTL_SEC:
return _pool
token = get_config().proxy6_token
if not token:
return _pool # без токена — пустой пул, парсеры пойдут напрямую
try:
with httpx.Client(timeout=10.0) as client:
r = client.get(f"{_API_URL}/{token}/getproxy", params={"state": "active"})
data = r.json()
except Exception as e:
log.warning("Proxy6 API request failed: %s", e)
return _pool
if data.get("status") != "yes":
log.warning("Proxy6 returned status=%s, error=%s",
data.get("status"), data.get("error"))
return _pool
proxies: list[str] = []
for _, p in (data.get("list") or {}).items():
if str(p.get("active")) != "1":
continue
proto = (p.get("type") or "http").lower()
# Proxy6 возвращает 'socks' для SOCKS5
if proto == "socks":
proto = "socks5"
host = p.get("host") or p.get("ip")
port = p.get("port")
user = p.get("user")
pwd = p.get("pass")
if not (host and port and user and pwd):
continue
proxies.append(f"{proto}://{user}:{pwd}@{host}:{port}")
_pool = proxies
_pool_loaded_at = now
log.info("Proxy6 pool loaded: %d active proxies", len(_pool))
return _pool
def get_random_proxy() -> Optional[str]:
"""Возвращает случайный прокси из пула, или None если пул пуст."""
pool = _load_pool()
if not pool:
return None
return random.choice(pool)
def proxied_client(timeout: float = 15.0, **client_kwargs) -> httpx.Client:
"""httpx.Client с рандомным прокси из пула (или прямой если пул пуст)."""
proxy = get_random_proxy()
if proxy:
return httpx.Client(proxy=proxy, timeout=timeout, **client_kwargs)
return httpx.Client(timeout=timeout, **client_kwargs)
def pool_status() -> dict:
"""Для диагностики — текущее состояние пула."""
pool = _load_pool()
return {
"count": len(pool),
"loaded_age_sec": int(time.time() - _pool_loaded_at) if _pool_loaded_at else None,
"token_configured": bool(get_config().proxy6_token),
}