zov-tech/backend-py/app/proxy_pool.py
wasrusgen 811bed31a4 backend: proxy_pool supports PROXY_LIST_FILE + format auto-conversion
- New env: PROXY_LIST_FILE — path to file with one proxy per line
- _normalize_proxy_entry accepts: http://user:pass@host:port, host:port:user:pass (Proxys.io format), host:port
- _load_from_file reads file, dedup with static list
- /api/proxy_status returns file_path, file_loaded count, sample (first 3 masked)
2026-05-11 15:52:02 +03:00

176 lines
6.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Proxy6.net pool — динамическая загрузка купленных прокси, ротация.
Конфиг:
PROXY6_TOKEN — API-ключ Proxy6 (https://proxy6.net/user/developers)
Если пусто — прокси не используется (прямые HTTP-запросы).
Использование:
from . import proxy_pool
with proxy_pool.proxied_client(timeout=15) as client:
r = client.get(url, headers=headers)
"""
from __future__ import annotations
import logging
import random
import threading
import time
from typing import Optional
import httpx
from .config import get_config
log = logging.getLogger("zov.proxy")
_API_URL = "https://proxy6.net/api"
_POOL_TTL_SEC = 600 # обновляем пул каждые 10 минут
_lock = threading.Lock()
_pool: list[str] = [] # ["http://user:pass@host:port", ...]
_pool_loaded_at: float = 0.0
def _normalize_proxy_entry(p: str) -> str | None:
"""Принимает строку в любом из форматов:
- http://user:pass@host:port
- socks5://user:pass@host:port
- host:port:user:pass (формат Proxys.io)
- host:port
и возвращает unified URL.
"""
p = p.strip()
if not p:
return None
if "://" in p:
return p
# host:port:user:pass или host:port
parts = p.split(":")
if len(parts) == 4:
host, port, user, pwd = parts
return f"http://{user}:{pwd}@{host}:{port}"
if len(parts) == 2:
return f"http://{parts[0]}:{parts[1]}"
return None
def _parse_static_list(raw: str) -> list[str]:
"""Парсит PROXY_STATIC_LIST — строка с прокси через запятую/перевод строки."""
if not raw:
return []
parts = raw.replace("\n", ",").split(",")
return [u for u in (_normalize_proxy_entry(p) for p in parts) if u]
def _load_from_file(path: str) -> list[str]:
"""Загружает прокси из файла. Каждая строка — один прокси в любом формате."""
if not path:
return []
try:
with open(path, "r", encoding="utf-8") as f:
lines = f.readlines()
except (OSError, IOError) as e:
log.warning("Failed to read PROXY_LIST_FILE=%s: %s", path, e)
return []
return [u for u in (_normalize_proxy_entry(line) for line in lines) if u]
def _load_pool(force: bool = False) -> list[str]:
"""Загружает прокси: сначала статический список из ENV, потом дополняет из Proxy6 API.
Кэшируется на _POOL_TTL_SEC."""
global _pool, _pool_loaded_at
with _lock:
now = time.time()
if not force and _pool and now - _pool_loaded_at < _POOL_TTL_SEC:
return _pool
cfg = get_config()
proxies: list[str] = []
# 1a) Из файла (для больших списков типа Proxys.io 999 IP)
file_proxies = _load_from_file(cfg.proxy_list_file)
if file_proxies:
proxies.extend(file_proxies)
log.info("Proxy file %s: %d entries", cfg.proxy_list_file, len(file_proxies))
# 1b) Статический список из ENV (для одиночных IP без файла)
static = _parse_static_list(cfg.proxy_static_list)
if static:
# Дедуп
new_items = [s for s in static if s not in proxies]
proxies.extend(new_items)
log.info("Static proxy list: +%d entries (total %d)", len(new_items), len(proxies))
# 2) Динамический пул из Proxy6 API (если есть токен)
if cfg.proxy6_token:
try:
with httpx.Client(timeout=10.0) as client:
r = client.get(f"{_API_URL}/{cfg.proxy6_token}/getproxy",
params={"state": "active"})
data = r.json()
if data.get("status") == "yes":
for _, p in (data.get("list") or {}).items():
if str(p.get("active")) != "1":
continue
proto = (p.get("type") or "http").lower()
if proto == "socks":
proto = "socks5"
host = p.get("host") or p.get("ip")
port = p.get("port")
user = p.get("user")
pwd = p.get("pass")
if not (host and port and user and pwd):
continue
url = f"{proto}://{user}:{pwd}@{host}:{port}"
if url not in proxies:
proxies.append(url)
log.info("Proxy6 API: total pool now %d proxies", len(proxies))
else:
log.warning("Proxy6 API returned status=%s error=%s",
data.get("status"), data.get("error"))
except Exception as e:
log.warning("Proxy6 API request failed: %s", e)
_pool = proxies
_pool_loaded_at = now
if not _pool:
log.info("Proxy pool is empty — parsers will use direct HTTP")
return _pool
def get_random_proxy() -> Optional[str]:
"""Возвращает случайный прокси из пула, или None если пул пуст."""
pool = _load_pool()
if not pool:
return None
return random.choice(pool)
def proxied_client(timeout: float = 15.0, **client_kwargs) -> httpx.Client:
"""httpx.Client с рандомным прокси из пула (или прямой если пул пуст)."""
proxy = get_random_proxy()
if proxy:
return httpx.Client(proxy=proxy, timeout=timeout, **client_kwargs)
return httpx.Client(timeout=timeout, **client_kwargs)
def pool_status() -> dict:
"""Для диагностики — текущее состояние пула."""
pool = _load_pool()
cfg = get_config()
# Маскируем пароли в URL для diagnostic
masked = []
for p in pool:
try:
import re as _re
masked.append(_re.sub(r"://([^:]+):([^@]+)@", r"://\1:***@", p))
except Exception:
masked.append("***")
return {
"count": len(pool),
"loaded_age_sec": int(time.time() - _pool_loaded_at) if _pool_loaded_at else None,
"token_configured": bool(cfg.proxy6_token),
"static_list_size": len(_parse_static_list(cfg.proxy_static_list)),
"file_path": cfg.proxy_list_file,
"file_loaded": len(_load_from_file(cfg.proxy_list_file)) if cfg.proxy_list_file else 0,
"sample": masked[:3], # первые 3 для проверки формата
}