backend: Playwright + Chromium for JS-rendered sites (Я.Маркет, OZON fallback)

DOCKERFILE:
- + Chromium system deps (libnss3, libxkbcommon0, libgbm1, libgtk-3-0, etc.)
- + RUN python -m playwright install chromium (~150MB)
- + ENV PLAYWRIGHT_BROWSERS_PATH

REQUIREMENTS:
- + playwright >= 1.45

PARSERS:
- new playwright_engine.py — singleton browser, isolated context per request,
  blocks images/fonts/media/CSS to save memory, waits for a selector
  (or, when no selector is given, a fixed delay for JS hydration)
- yamarket.py — rewritten to use Playwright (Я.Маркет is React SPA)
- ozon.py — Playwright fallback when composer-api returns challenge (403)
- wb.py — exponential backoff on 429, still uses direct HTTP (JSON API, no JS needed)

STRATEGY (Hybrid Path C):
- Я.Маркет: Playwright (rendering JS)
- OZON: composer-api first, Playwright fallback
- WB: direct HTTP with backoff (JSON API, fast)
- DNS: kept but lower priority (Qrator hard to crack)
- No more proxy needed for primary path

DEPLOY: removed PROXY_STATIC_LIST from .env, expect ~5min for first build (Chromium download)
This commit is contained in:
wasrusgen 2026-05-11 13:25:05 +03:00
parent 3ee5275ea0
commit d5f290bd0a
6 changed files with 374 additions and 193 deletions

View File

@ -1,9 +1,16 @@
FROM python:3.12-slim
# НУЦ Минцифры root CA — для GigaChat SSL.
# Скачиваем актуальный bundle на этапе сборки и добавляем в системный trust store.
# + системные пакеты для Playwright/Chromium (рендеринг JS-сайтов).
RUN apt-get update \
&& apt-get install -y --no-install-recommends ca-certificates curl \
&& apt-get install -y --no-install-recommends \
ca-certificates curl \
# Chromium dependencies for Playwright
libnss3 libnspr4 libatk-bridge2.0-0 libatk1.0-0 libcups2 \
libxkbcommon0 libxcomposite1 libxdamage1 libxfixes3 libxrandr2 \
libgbm1 libgtk-3-0 libasound2 libpango-1.0-0 libcairo2 \
libdbus-1-3 libdrm2 libxshmfence1 \
fonts-liberation fonts-noto-color-emoji \
&& curl -fsSL -o /usr/local/share/ca-certificates/russian_trusted_root_ca.crt \
https://gu-st.ru/content/Other/doc/russian_trusted_root_ca.cer \
&& curl -fsSL -o /usr/local/share/ca-certificates/russian_trusted_sub_ca.crt \
@ -16,6 +23,9 @@ WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Скачиваем только Chromium (без firefox/webkit) — ~150MB
RUN python -m playwright install chromium
COPY app /app/app
# httpx по умолчанию использует certifi → принудительно указываем системный bundle,
@ -23,6 +33,9 @@ COPY app /app/app
ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt
ENV REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
# Playwright кэш-каталог браузеров
ENV PLAYWRIGHT_BROWSERS_PATH=/root/.cache/ms-playwright
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--proxy-headers", "--forwarded-allow-ips=*"]

View File

@ -12,8 +12,10 @@ from typing import Any
from urllib.parse import quote_plus
import httpx
from bs4 import BeautifulSoup
from .. import proxy_pool
from . import playwright_engine
log = logging.getLogger("zov.parser.ozon")
@ -34,38 +36,110 @@ _HEADERS = {
_PRICE_RE = re.compile(r"([\d\s]+)\s*₽")
def search_ozon(query: str, limit: int = 3, timeout: float = 15.0,
max_retries: int = 2) -> list[dict[str, Any]]:
"""Поиск товара в OZON через composer-api."""
def search_ozon(query: str, limit: int = 3, timeout: float = 30.0,
max_retries: int = 1, use_playwright: bool = True) -> list[dict[str, Any]]:
"""Поиск товара в OZON.
Сначала пробуем composer-api JSON (быстро), при challenge Playwright (медленно но точно).
"""
# Путь 1: быстрый composer-api
url_param = f"/search/?text={quote_plus(query)}&from_global=true"
params = {"url": url_param}
for attempt in range(max_retries + 1):
try:
with proxy_pool.proxied_client(timeout=timeout, headers=_HEADERS,
follow_redirects=False) as client:
resp = client.get(_API_URL, params=params)
except httpx.HTTPError as e:
log.warning("OZON request failed (attempt %d): %s", attempt + 1, e)
continue
if resp.status_code in (301, 302, 307, 308):
log.info("OZON redirect %s, rotating proxy", resp.status_code)
continue
if resp.status_code != 200:
log.warning("OZON returned status=%s", resp.status_code)
continue
if resp.status_code == 200:
try:
data = resp.json()
except Exception as e:
log.warning("OZON JSON parse failed: %s", e)
return _extract_products(resp.json(), limit=limit)
except Exception:
pass
log.debug("OZON composer-api attempt %d: status=%s", attempt + 1, resp.status_code)
except httpx.HTTPError as e:
log.debug("OZON composer-api err: %s", e)
# Путь 2: Playwright (рендерим обычную HTML-страницу поиска)
if not use_playwright:
return []
log.info("OZON falling back to Playwright for query=%r", query)
page_url = f"{_BASE_URL}/search/?text={quote_plus(query)}"
html = playwright_engine.fetch_page(
page_url,
wait_selector="a[href*='/product/'], [data-widget='searchResultsV2']",
wait_ms=3500,
timeout_ms=int(timeout * 1000),
)
if not html:
return []
return _parse_html_via_dom(html, limit=limit)
def _parse_html_via_dom(html: str, limit: int) -> list[dict[str, Any]]:
    """Extract product cards from a browser-rendered OZON search page.

    Fallback used when the JSON API is unavailable: every ``/product/`` link
    is treated as a candidate card, and title, price and image are read from
    the link itself or from its nearest enclosing ``<div>``.

    Args:
        html: Rendered HTML of the search results page.
        limit: Maximum number of products to return.

    Returns:
        Up to ``limit`` product dicts. Only ``title``, ``url``, ``image_url``
        and ``price_min_rub`` can be filled from the DOM; the remaining
        fields are always ``None`` / empty.
    """
    soup = BeautifulSoup(html, "html.parser")
    seen: set[str] = set()
    results: list[dict[str, Any]] = []

    for link in soup.select("a[href*='/product/']"):
        if len(results) >= limit:
            break

        href = link.get("href") or ""
        url = href if href.startswith("http") else f"{_BASE_URL}{href}"
        # Drop tracking parameters so that the image link and the title link
        # of one card (which may differ only in the query string) dedupe.
        url = url.split("?")[0]
        if url in seen:
            continue

        # Nearest enclosing <div> is taken as the card; fall back to the link.
        card = link.find_parent("div") or link

        # Prefer the link's own text; a link that wraps only an image has
        # none, in which case the first <span> of the card is used.
        title = link.get_text(strip=True)
        if not title:
            span = card.select_one("span")
            title = span.get_text(strip=True) if span is not None else ""
        if len(title) < 5:
            # Not marked as seen: a later link to the same product may
            # still carry a usable title.
            continue
        seen.add(url)

        # Price is searched in the text of the enclosing <div>, if any.
        # (find_parent() has no ``recursive`` option: extra keyword arguments
        # are attribute filters, so passing it would never match.)
        price = None
        if card is not link:
            m = _PRICE_RE.search(card.get_text(" ", strip=True))
            if m:
                # Strip every kind of whitespace: rendered prices commonly
                # use non-breaking or thin spaces as thousands separators.
                price = _try_int("".join(m.group(1).split()))

        # First image inside the card, ignoring inline data: placeholders.
        img = None
        img_el = card.find("img")
        if img_el:
            src = img_el.get("src") or ""
            if src.startswith("//"):
                src = "https:" + src
            if src and "data:image" not in src:
                img = src

        results.append({
            "title": title[:200],
            "url": url,
            "image_url": img,
            "price_min_rub": price,
            "price_max_rub": None,
            "rating": None,
            "reviews_count": None,
            "stores_count": None,
            "specs": {},
            "source": "ozon",
        })
    return results
def _try_int(v: Any) -> int | None:
if v is None:
return None
try:
return int(float(str(v).replace(" ", "").replace(",", ".")))
except (ValueError, TypeError):
return None
def _extract_products(data: dict, limit: int) -> list[dict[str, Any]]:

View File

@ -0,0 +1,129 @@
"""Singleton Playwright + Chromium для парсинга JS-сайтов.
Использование:
from .playwright_engine import fetch_page
html = fetch_page("https://market.yandex.ru/search?text=Bosch+KGN39")
Зачем синглтон: запуск Chromium ~2-3 сек. Держим один экземпляр, открываем
изолированный контекст (cookies/storage) на каждый запрос.
"""
from __future__ import annotations
import logging
import threading
from typing import Optional
log = logging.getLogger("zov.parser.playwright")
_lock = threading.Lock()
_playwright = None
_browser = None
def _get_browser():
    """Return the shared Chromium browser, launching it on first use.

    The browser and its Playwright driver are kept in module globals and
    created under ``_lock``. If the cached browser is no longer connected,
    the old driver is stopped and a new browser is launched.

    Returns:
        The connected Playwright browser, or ``None`` when Playwright is not
        installed or Chromium cannot be started.
    """
    global _playwright, _browser
    with _lock:
        if _browser is not None and _browser.is_connected():
            return _browser

        try:
            from playwright.sync_api import sync_playwright
        except ImportError as e:
            log.error("Playwright not installed: %s", e)
            return None

        # The cached browser is gone (crashed/disconnected): stop its driver
        # before starting another one, otherwise the driver process leaks.
        if _playwright is not None:
            try:
                _playwright.stop()
            except Exception as e:
                log.debug("Stopping stale Playwright driver failed: %s", e)
            _playwright = None
            _browser = None

        try:
            _playwright = sync_playwright().start()
            _browser = _playwright.chromium.launch(
                headless=True,
                args=[
                    # Presumably required because the container runs as root.
                    "--no-sandbox",
                    "--disable-blink-features=AutomationControlled",
                    # Do not rely on /dev/shm, which is small in containers.
                    "--disable-dev-shm-usage",
                    "--disable-gpu",
                ],
            )
            log.info("Playwright Chromium started")
            return _browser
        except Exception as e:
            log.error("Failed to start Playwright: %s", e)
            # The driver may have started even though the launch failed;
            # stop it so that it does not outlive this failed attempt.
            if _playwright is not None:
                try:
                    _playwright.stop()
                except Exception as stop_err:
                    log.debug("Stopping Playwright driver failed: %s", stop_err)
            _playwright = None
            _browser = None
            return None
def fetch_page(url: str, wait_selector: Optional[str] = None,
               wait_ms: int = 3000, timeout_ms: int = 25000,
               user_agent: Optional[str] = None) -> Optional[str]:
    """Render ``url`` in headless Chromium and return the resulting HTML.

    Every call opens a fresh browser context on the shared browser and closes
    it afterwards. Requests for images, fonts, media and stylesheets are
    aborted to save time and memory.

    Args:
        url: Page to open.
        wait_selector: CSS selector to wait for after navigation. The wait
            lasts at most ``wait_ms + 5000`` ms; if the selector never
            appears, the current page content is returned anyway.
        wait_ms: Fixed delay after navigation (for JS hydration) when
            ``wait_selector`` is not given; otherwise only the base of the
            selector timeout.
        timeout_ms: Navigation timeout passed to ``page.goto``.
        user_agent: User-Agent override; defaults to a desktop Chrome string.

    Returns:
        The page HTML, or ``None`` when the browser is unavailable or the
        navigation fails.
    """
    # NOTE(review): Playwright's sync API objects are documented as not
    # thread-safe; confirm this is never called concurrently from several
    # worker threads against the shared browser.
    browser = _get_browser()
    if not browser:
        return None

    ctx = None
    page = None
    try:
        ctx = browser.new_context(
            user_agent=user_agent or (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36"
            ),
            viewport={"width": 1280, "height": 800},
            locale="ru-RU",
            extra_http_headers={
                "Accept-Language": "ru-RU,ru;q=0.9,en;q=0.8",
            },
        )
        page = ctx.new_page()

        # Abort heavy resources; only the DOM is needed.
        def _route(route):
            if route.request.resource_type in ("image", "font", "media", "stylesheet"):
                return route.abort()
            return route.continue_()

        page.route("**/*", _route)

        page.goto(url, timeout=timeout_ms, wait_until="domcontentloaded")

        if wait_selector:
            try:
                page.wait_for_selector(wait_selector, timeout=wait_ms + 5000)
            except Exception:
                # Best effort: return whatever has rendered so far.
                log.debug("wait_selector %s not found, continuing", wait_selector)
        else:
            page.wait_for_timeout(wait_ms)

        return page.content()
    except Exception as e:
        log.warning("fetch_page failed for %s: %s", url, e)
        return None
    finally:
        # Cleanup is best effort, but must not swallow KeyboardInterrupt /
        # SystemExit the way a bare ``except:`` would.
        for label, handle in (("page", page), ("context", ctx)):
            if not handle:
                continue
            try:
                handle.close()
            except Exception as e:
                log.debug("fetch_page: closing %s failed: %s", label, e)
def shutdown():
    """Close the shared browser and stop the Playwright driver.

    Intended to be called when the application stops. Does nothing for
    whichever of the two was never started; errors during teardown are
    logged and otherwise ignored, and both globals are reset to ``None``.
    """
    global _playwright, _browser
    with _lock:
        if _browser:
            try:
                _browser.close()
            except Exception as e:
                # Best effort; ``except Exception`` (not a bare except) keeps
                # KeyboardInterrupt / SystemExit propagating.
                log.debug("Closing Playwright browser failed: %s", e)
            _browser = None
        if _playwright:
            try:
                _playwright.stop()
            except Exception as e:
                log.debug("Stopping Playwright driver failed: %s", e)
            _playwright = None

View File

@ -40,38 +40,44 @@ _HEADERS = {
def search_wb(query: str, limit: int = 3, timeout: float = 12.0,
max_retries: int = 2) -> list[dict[str, Any]]:
"""WB через прямой JSON API. Делает экспоненциальный backoff при 429."""
import time
params = {**_DEFAULT_PARAMS, "query": query}
backoff = 2.0
for attempt in range(max_retries + 1):
try:
# Используем прямое подключение (без прокси) — WB лимитирует per-IP,
# но 1 запрос/несколько секунд проходит
with proxy_pool.proxied_client(timeout=timeout, headers=_HEADERS) as client:
resp = client.get(_SEARCH_URL, params=params)
except httpx.HTTPError as e:
log.warning("WB request failed (attempt %d): %s", attempt + 1, e)
time.sleep(backoff)
backoff *= 2
continue
if resp.status_code == 429:
log.warning("WB rate-limited on attempt %d, rotating proxy", attempt + 1)
log.warning("WB rate-limited on attempt %d, sleeping %.1fs", attempt + 1, backoff)
time.sleep(backoff)
backoff *= 2
continue
if resp.status_code != 200:
log.warning("WB returned status=%s", resp.status_code)
continue
return []
try:
data = resp.json()
except Exception as e:
log.warning("WB JSON parse failed: %s", e)
continue
return []
products = (data.get("data") or {}).get("products") or []
if not products:
log.info("WB no products for query=%r", query)
return []
results: list[dict[str, Any]] = []
for p in products[:limit]:
results.append(_build_item(p))
return results
return [_build_item(p) for p in products[:limit]]
log.warning("WB gave up after %d attempts for query=%r", max_retries + 1, query)
return []

View File

@ -1,162 +1,171 @@
"""Парсер Я.Маркета — HTML страница поиска.
"""Парсер Я.Маркета — через Playwright (рендер JS).
Я.Маркет защищён Qrator. Через резидентный РФ-IP + правильные заголовки
+ cookies на сессию обычно проходит. Без прокси 401.
Я.Маркет SPA на React, товары подгружаются через XHR после первой загрузки.
Простой HTTP-запрос не вернёт каталог. Поэтому используем headless Chromium.
Из HTML вытаскиваем JSON, который Я.Маркет встраивает в <script type="application/json">.
Ждём пока в DOM появятся карточки `[data-zone-name="snippet-card"]` или
`a[href*="/product--"]`, потом извлекаем данные.
"""
from __future__ import annotations
import json
import logging
import re
from typing import Any
from urllib.parse import quote_plus
import httpx
from bs4 import BeautifulSoup
from .. import proxy_pool
from . import playwright_engine
log = logging.getLogger("zov.parser.yamarket")
_BASE_URL = "https://market.yandex.ru"
_SEARCH_URL = "https://market.yandex.ru/search"
_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "ru-RU,ru;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
"Upgrade-Insecure-Requests": "1",
}
_PRICE_RE = re.compile(r"(\d[\d\s]*)\s*₽")
_PRICE_RE = re.compile(r"([\d\s]+)\s*₽")
def search_yamarket(query: str, limit: int = 3, timeout: float = 20.0,
max_retries: int = 2) -> list[dict[str, Any]]:
"""Поиск товара в Я.Маркете. Возвращает топ-N с ценами и кол-вом магазинов."""
params = {"text": query, "cvredirect": "2"}
def search_yamarket(query: str, limit: int = 3, timeout: float = 30.0,
max_retries: int = 1) -> list[dict[str, Any]]:
"""Поиск товара в Я.Маркете через headless Chromium."""
url = f"{_BASE_URL}/search?text={quote_plus(query)}"
html = None
for attempt in range(max_retries + 1):
try:
with proxy_pool.proxied_client(timeout=timeout, headers=_HEADERS,
follow_redirects=True) as client:
resp = client.get(_SEARCH_URL, params=params)
except httpx.HTTPError as e:
log.warning("YaMarket request failed (attempt %d): %s", attempt + 1, e)
continue
html = playwright_engine.fetch_page(
url,
# Ждём появления товарных ссылок или контейнера выдачи
wait_selector="a[href*='/product--'], [data-auto='SerpItem'], [data-zone-name='snippet-card']",
wait_ms=3500,
timeout_ms=int(timeout * 1000),
)
if html:
break
if resp.status_code != 200:
log.warning("YaMarket status=%s on attempt %d", resp.status_code, attempt + 1)
continue
text = resp.text
if "qrator" in text.lower() or "showcaptcha" in text.lower():
log.warning("YaMarket Qrator/captcha on attempt %d, rotating proxy", attempt + 1)
continue
return _parse_html(text, limit=limit)
log.warning("YaMarket gave up after %d attempts for query=%r", max_retries + 1, query)
if not html:
log.warning("YaMarket: no HTML for query=%r", query)
return []
if "showcaptcha" in html.lower() or "qrator" in html.lower()[:5000]:
log.warning("YaMarket: Qrator/captcha for query=%r", query)
return []
return _parse_html(html, limit=limit)
def _parse_html(html: str, limit: int) -> list[dict[str, Any]]:
soup = BeautifulSoup(html, "html.parser")
results: list[dict[str, Any]] = []
# Я.Маркет встраивает данные в JSON внутри скриптов
for script in soup.find_all("script", type="application/json"):
data = _try_json(script.string or "")
if not data:
continue
# Структуры разные; ищем массив с offers/products
items = _find_products(data)
for it in items:
if len(results) >= limit:
break
item = _build_item(it)
if item:
results.append(item)
if len(results) >= limit:
break
# Основной селектор — товарные карточки на странице поиска
candidates = (
soup.select("[data-auto='SerpItem']")
or soup.select("[data-zone-name='snippet-card']")
or soup.select("article[data-baobab-name='card']")
or soup.select("article:has(a[href*='/product--'])")
)
# Резервный путь — карточки прямо в HTML
if not results:
cards = soup.select("[data-zone-name='snippet-card'], [data-baobab-name='card']")
for card in cards:
for card in candidates:
if len(results) >= limit:
break
item = _extract_html_card(card)
item = _extract_card(card)
if item:
results.append(item)
# Резерв — собрать по найденным ссылкам product--
if not results:
seen = set()
for a in soup.select("a[href*='/product--']")[:limit * 2]:
href = a.get("href") or ""
if href in seen:
continue
seen.add(href)
# Берём родительский article как карточку
card = a.find_parent("article") or a.find_parent("div")
if card:
item = _extract_card(card)
if item:
results.append(item)
if len(results) >= limit:
break
return results
def _find_products(data: Any, _depth: int = 0) -> list[dict]:
"""Рекурсивно ищем массив товаров в JSON Я.Маркета."""
if _depth > 8:
return []
if isinstance(data, list):
# Эвристика: список объектов с offers/price/title
if data and isinstance(data[0], dict) and (
data[0].get("offers") or data[0].get("prices") or data[0].get("titles")
):
return data
for item in data:
found = _find_products(item, _depth + 1)
if found:
return found
elif isinstance(data, dict):
for v in data.values():
found = _find_products(v, _depth + 1)
if found:
return found
return []
def _extract_card(card) -> dict[str, Any] | None:
"""Достаём заголовок, ссылку, цену, рейтинг, отзывы, фото, кол-во магазинов."""
link_el = (
card.select_one("a[href*='/product--']")
or card.select_one("a[data-baobab-name='title']")
)
if not link_el:
return None
href = link_el.get("href") or ""
url = href if href.startswith("http") else f"{_BASE_URL}{href}"
def _build_item(p: dict) -> dict[str, Any] | None:
title_obj = p.get("titles") or {}
title = (title_obj.get("raw") if isinstance(title_obj, dict) else "") or p.get("title", "")
title_el = (
card.select_one("[data-zone-name='title'] span")
or card.select_one("h3 span")
or card.select_one("[data-auto='snippet-title']")
or link_el
)
title = title_el.get_text(strip=True) if title_el else (link_el.get_text(strip=True))
if not title:
return None
url_obj = p.get("url") or p.get("urls", {}).get("encrypted", "")
url = url_obj if isinstance(url_obj, str) else ""
if url and url.startswith("/"):
url = f"{_BASE_URL}{url}"
pic = ""
pictures = p.get("pictures") or []
if pictures and isinstance(pictures, list):
pic_obj = pictures[0]
if isinstance(pic_obj, dict):
pic = pic_obj.get("original", {}).get("url") or pic_obj.get("url") or ""
# Цена + кол-во магазинов
prices = p.get("prices") or p.get("offers") or {}
# Цена
price_min = price_max = None
stores = None
if isinstance(prices, dict):
price_min = _try_int(prices.get("min", {}).get("value") if isinstance(prices.get("min"), dict) else prices.get("min"))
price_max = _try_int(prices.get("max", {}).get("value") if isinstance(prices.get("max"), dict) else prices.get("max"))
stores = _try_int(prices.get("count") or prices.get("offersCount"))
price_el = (
card.select_one("[data-auto='snippet-price-current']")
or card.select_one("[data-auto='price-value']")
or card.select_one("[class*='Price']")
)
if price_el:
m = _PRICE_RE.search(price_el.get_text(" ", strip=True))
if m:
price_min = _try_int(m.group(1).replace(" ", "").replace(" ", ""))
rating = _try_float((p.get("rating") or {}).get("value") if isinstance(p.get("rating"), dict) else p.get("rating"))
reviews = _try_int((p.get("reviews") or {}).get("count") if isinstance(p.get("reviews"), dict) else p.get("reviews"))
# Картинка
img_url = None
img_el = card.select_one("img[src], img[srcset]")
if img_el:
src = img_el.get("src") or img_el.get("data-src") or ""
# Иногда src — заглушка 1x1px, основное в srcset
if "data:image" in src or not src:
srcset = img_el.get("srcset") or ""
if srcset:
src = srcset.split(",")[0].strip().split(" ")[0]
if src.startswith("//"):
src = "https:" + src
if src:
img_url = src
# Рейтинг
rating = None
rating_el = card.select_one("[data-auto='snippet-rating'], [class*='Rating'] span")
if rating_el:
rt = rating_el.get_text(strip=True)
m = re.search(r"\d[.,]\d", rt)
if m:
rating = _try_float(m.group(0))
# Отзывы
reviews = None
reviews_el = card.select_one("[data-auto='snippet-feedback'], a[href*='/reviews']")
if reviews_el:
m = re.search(r"\d[\d\s]*", reviews_el.get_text(" ", strip=True))
if m:
reviews = _try_int(m.group(0).replace(" ", ""))
# Кол-во магазинов / предложений
stores = None
stores_el = card.select_one("[data-auto='offer-count'], a[href*='/offers']")
if stores_el:
m = re.search(r"\d+", stores_el.get_text(" ", strip=True))
if m:
stores = int(m.group(0))
return {
"title": re.sub(r"<[^>]+>", "", title).strip(),
"title": title,
"url": url,
"image_url": pic,
"image_url": img_url,
"price_min_rub": price_min,
"price_max_rub": price_max if price_max and price_max != price_min else None,
"rating": rating,
@ -167,50 +176,6 @@ def _build_item(p: dict) -> dict[str, Any] | None:
}
def _extract_html_card(card) -> dict[str, Any] | None:
"""Резервный парсинг HTML-карточки если JSON не нашёлся."""
title_el = card.select_one("[data-zone-name='title'] span, h3, [class*='Title']")
if not title_el:
return None
title = title_el.get_text(strip=True)
price_el = card.select_one("[data-auto='snippet-price-current'], [class*='Price']")
price = None
if price_el:
m = _PRICE_RE.search(price_el.get_text(" ", strip=True))
if m:
price = _try_int(m.group(1).replace(" ", ""))
img_el = card.select_one("img[srcset], img[src]")
img_url = ""
if img_el:
src = img_el.get("src") or img_el.get("data-src") or ""
if src.startswith("//"):
src = "https:" + src
img_url = src
link_el = card.select_one("a[href*='/product--'], a[data-baobab-name='title']")
url = ""
if link_el:
href = link_el.get("href") or ""
url = href if href.startswith("http") else f"{_BASE_URL}{href}"
if not title:
return None
return {
"title": title,
"url": url,
"image_url": img_url,
"price_min_rub": price,
"price_max_rub": None,
"rating": None,
"reviews_count": None,
"stores_count": None,
"specs": {},
"source": "yamarket",
}
def _try_int(v: Any) -> int | None:
if v is None:
return None
@ -227,10 +192,3 @@ def _try_float(v: Any) -> float | None:
return float(str(v).replace(" ", "").replace(",", "."))
except (ValueError, TypeError):
return None
def _try_json(s: str) -> Any:
try:
return json.loads(s)
except (ValueError, TypeError):
return None

View File

@ -7,3 +7,4 @@ google-auth>=2.30.0
python-dotenv>=1.0.0
beautifulsoup4>=4.12.0
lxml>=5.2.0
playwright>=1.45.0