"""Модуль «Подбор техники — цикл согласования».
Цикл:
1. Клиент заполняет brief (анкету пожеланий) → status=brief
2. Менеджер видит brief, создаёт/дополняет подборку → status=draft
3. Менеджер отправляет клиенту → status=sent
4. Клиент голосует (✅/❌) + оставляет комментарий → status=reviewed
5. Менеджер фиксирует итог → status=done
Google Sheets: лист «Proposals».
"""
from __future__ import annotations
import json
import uuid
import logging
from datetime import datetime, timezone
from typing import Any
import httpx
from . import sheets
from .auth import verify_init_data
from .config import get_config
log = logging.getLogger("zov.proposals")
# ---------------------------------------------------------------------------
# Sheet setup
# ---------------------------------------------------------------------------
PROPOSALS_HEADERS = [
"id", "client_key", "client_tg_id", "manager_tg_id",
"status", # brief | draft | sent | reviewed | done | archived
"brief_json", # JSON объект с анкетой клиента
"positions_json", # JSON массив категорий с вариантами
"client_comment", # общий текстовый комментарий клиента
"manager_comment", # финальная заметка менеджера
"created_at", "sent_at", "reviewed_at", "archived_at",
]
ACTIVE_STATUSES = {"brief", "draft", "sent", "reviewed"}
def ensure_sheet() -> None:
sheets.ensure_sheet("Proposals", PROPOSALS_HEADERS)
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
def _short_id() -> str:
return uuid.uuid4().hex[:13]
def _parse_json(raw: str, default: Any) -> Any:
try:
return json.loads(raw) if raw and raw.strip() else default
except Exception:
return default
def _row_to_dict(headers: list[str], row: list) -> dict[str, Any]:
d = dict(zip(headers, row + [""] * (len(headers) - len(row))))
d["brief_json"] = _parse_json(d.get("brief_json", ""), {})
d["positions_json"] = _parse_json(d.get("positions_json", ""), [])
return d
def _get_all(ws) -> tuple[list[str], list[dict]]:
"""Возвращает (headers, list_of_dicts), пропуская пустые строки."""
rows = ws.get_all_values()
if not rows or len(rows) < 1:
return [], []
headers = rows[0]
return headers, [_row_to_dict(headers, r) for r in rows[1:] if any(r)]
def _find_row_num(ws, proposal_id: str) -> int | None:
"""Номер строки в листе (1-based, с учётом заголовка) или None."""
rows = ws.get_all_values()
if not rows:
return None
try:
id_col = rows[0].index("id")
except ValueError:
return None
for i, row in enumerate(rows[1:], start=2):
if len(row) > id_col and row[id_col] == proposal_id:
return i
return None
def _update_field(ws, row_num: int, headers: list[str], field: str, value: Any) -> None:
try:
col = headers.index(field) + 1
ws.update_cell(row_num, col, value)
except Exception as e:
log.warning("_update_field %s: %s", field, e)
def _update_fields(ws, row_num: int, headers: list[str], updates: dict[str, Any]) -> None:
for field, value in updates.items():
_update_field(ws, row_num, headers, field, value)
def _tg_notify(chat_id: str, text: str) -> None:
"""Отправляет сообщение через Telegram Bot API (sync, fire-and-forget)."""
cfg = get_config()
if not cfg.bot_token or not chat_id:
return
try:
url = f"https://api.telegram.org/bot{cfg.bot_token}/sendMessage"
httpx.post(url, json={
"chat_id": chat_id,
"text": text,
"parse_mode": "HTML",
}, timeout=8)
except Exception as e:
log.warning("tg_notify to %s failed: %s", chat_id, e)
def _auth(body: dict) -> tuple[str | None, dict | None]:
"""Парсит initData → (tg_id_str, None) или (None, error_dict)."""
cfg = get_config()
auth = verify_init_data(body.get("initData") or "", cfg.bot_token)
if not auth or not auth.get("user"):
unsafe = body.get("initDataUnsafe") or {}
if isinstance(unsafe, dict) and unsafe.get("user", {}).get("id"):
auth = {"user": unsafe["user"]}
else:
return None, {"error": "invalid_init_data"}
return str(auth["user"]["id"]), None
def _brief_summary(brief: dict) -> str:
"""Краткое текстовое описание анкеты для уведомления."""
lines = []
hob_labels = {"induction": "Индукция", "gas": "Газ", "electric": "Эл-во", "none": "—"}
if brief.get("hob"):
lines.append(f"Варочная: {hob_labels.get(brief['hob'], brief['hob'])}")
if brief.get("oven"):
lines.append("Духовка: нужна")
dw = brief.get("dishwasher")
if dw and dw != "none":
lines.append(f"Посудомойка: {dw} см")
hood_labels = {"builtin": "Встройка", "dome": "Купол", "none": "—"}
if brief.get("hood") and brief["hood"] != "none":
lines.append(f"Вытяжка: {hood_labels.get(brief['hood'], brief['hood'])}")
if brief.get("budget"):
lines.append(f"Бюджет: {int(brief['budget']):,} ₽".replace(",", " "))
if brief.get("notes"):
lines.append(f"Пожелания: {brief['notes'][:120]}")
return "\n".join(lines) if lines else "Анкета заполнена"
# ---------------------------------------------------------------------------
# API handlers
# ---------------------------------------------------------------------------
def handle_brief(body: dict) -> dict:
"""Клиент сохраняет анкету пожеланий. Создаёт или обновляет Proposal со status=brief.
Уведомляет менеджера."""
tg_id, err = _auth(body)
if err:
return err
cfg = get_config()
brief: dict = {
"hob": body.get("hob", ""),
"oven": body.get("oven", ""),
"dishwasher": body.get("dishwasher", ""),
"hood": body.get("hood", ""),
"fridge": body.get("fridge", ""),
"microwave": body.get("microwave", ""),
"budget": body.get("budget", ""),
"notes": str(body.get("notes", "") or "").strip(),
}
client_name = str(body.get("client_name", "") or "").strip()
client_key = client_name.lower() if client_name else f"tg_{tg_id}"
# Менеджер из карточки клиента
manager_tg_id = str(cfg.admin_tg_id) if cfg.admin_tg_id else ""
cl_row = sheets.find_row("Clients", "client_key", client_key)
if cl_row:
manager_tg_id = str(cl_row.get("manager_tg_id", "") or manager_tg_id)
if not manager_tg_id:
# Fallback — попробуем найти по tg_id
cl_row2 = sheets.find_row("Clients", "client_tg_id", tg_id)
if cl_row2:
manager_tg_id = str(cl_row2.get("manager_tg_id", "") or manager_tg_id)
client_key = cl_row2.get("client_key", client_key)
ensure_sheet()
ws = sheets.sheet("Proposals")
headers, all_dicts = _get_all(ws)
# Ищем существующий активный proposal этого клиента
existing = next(
(d for d in all_dicts
if (str(d.get("client_tg_id")) == tg_id or d.get("client_key") == client_key)
and d.get("status") in ACTIVE_STATUSES
and not d.get("archived_at")),
None,
)
if existing:
proposal_id = existing["id"]
row_num = _find_row_num(ws, proposal_id)
if row_num:
_update_fields(ws, row_num, headers, {
"brief_json": json.dumps(brief, ensure_ascii=False),
"status": "brief",
"client_tg_id": tg_id,
})
else:
proposal_id = _short_id()
row = [
proposal_id, client_key, tg_id, manager_tg_id,
"brief",
json.dumps(brief, ensure_ascii=False),
"[]", "", "",
_now(), "", "", "",
]
sheets.append_row("Proposals", row)
# Уведомление менеджеру
if manager_tg_id:
name_tag = f"{client_name}" if client_name else f"клиент (tg {tg_id})"
_tg_notify(manager_tg_id,
f"📋 {name_tag} заполнил анкету на подбор техники:\n\n"
f"{_brief_summary(brief)}\n\n"
f"Откройте карточку клиента, чтобы начать подбор.")
return {"ok": True, "proposal_id": proposal_id}
def handle_create(body: dict) -> dict:
"""Менеджер вручную создаёт Proposal для клиента (status=draft)."""
tg_id, err = _auth(body)
if err:
return err
user = sheets.find_user(tg_id)
if not user or not sheets.has_role(user, "manager"):
return {"error": "only_manager"}
client_key = str(body.get("client_key", "") or "").strip().lower()
if not client_key:
return {"error": "client_key required"}
client_tg_id = str(body.get("client_tg_id", "") or "")
ensure_sheet()
ws = sheets.sheet("Proposals")
headers, all_dicts = _get_all(ws)
# Проверяем, нет ли уже активного
existing = next(
(d for d in all_dicts
if d.get("client_key") == client_key
and d.get("status") in ACTIVE_STATUSES
and not d.get("archived_at")),
None,
)
if existing:
return {"ok": True, "proposal_id": existing["id"], "existing": True}
proposal_id = _short_id()
row = [
proposal_id, client_key, client_tg_id, tg_id,
"draft", "{}", "[]", "", "",
_now(), "", "", "",
]
sheets.append_row("Proposals", row)
return {"ok": True, "proposal_id": proposal_id}
def handle_upsert_variant(body: dict) -> dict:
"""Менеджер добавляет или обновляет вариант в категории.
body: {proposal_id, category, category_label, variant: {id?, model, url, price,
image_url?, manager_comment?}}
"""
tg_id, err = _auth(body)
if err:
return err
user = sheets.find_user(tg_id)
if not user or not sheets.has_role(user, "manager"):
return {"error": "only_manager"}
proposal_id = body.get("proposal_id", "")
category = body.get("category", "").strip()
cat_label = body.get("category_label", category)
variant_in = body.get("variant", {}) or {}
if not proposal_id or not category:
return {"error": "proposal_id and category required"}
ensure_sheet()
ws = sheets.sheet("Proposals")
headers, _ = _get_all(ws)
row_num = _find_row_num(ws, proposal_id)
if not row_num:
return {"error": "proposal_not_found"}
rows = ws.get_all_values()
rd = _row_to_dict(headers, rows[row_num - 1])
# Статус должен позволять редактирование
if rd.get("status") not in ("brief", "draft", "reviewed"):
return {"error": "cannot_edit_in_this_status", "status": rd.get("status")}
positions: list[dict] = rd.get("positions_json") or []
# Найти или создать категорию
cat = next((p for p in positions if p.get("category") == category), None)
if cat is None:
cat = {"category": category, "label": cat_label, "variants": [], "client_comment": ""}
positions.append(cat)
else:
cat["label"] = cat_label # обновляем label если изменился
variant_id = str(variant_in.get("id") or _short_id())
variant = {
"id": variant_id,
"model": str(variant_in.get("model", "") or "").strip(),
"url": str(variant_in.get("url", "") or "").strip(),
"price": variant_in.get("price", ""),
"image_url": str(variant_in.get("image_url", "") or "").strip(),
"source": str(variant_in.get("source", "") or "").strip(),
"manager_comment": str(variant_in.get("manager_comment", "") or "").strip(),
"client_vote": None,
}
# Обновляем если уже есть, иначе добавляем
existing_v = next((v for v in cat["variants"] if v.get("id") == variant_id), None)
if existing_v:
existing_v.update({k: v for k, v in variant.items() if k != "client_vote"})
else:
cat["variants"].append(variant)
_update_fields(ws, row_num, headers, {
"positions_json": json.dumps(positions, ensure_ascii=False),
"status": "draft",
})
return {"ok": True, "variant_id": variant_id}
def handle_remove_variant(body: dict) -> dict:
"""Менеджер удаляет вариант или целую категорию."""
tg_id, err = _auth(body)
if err:
return err
user = sheets.find_user(tg_id)
if not user or not sheets.has_role(user, "manager"):
return {"error": "only_manager"}
proposal_id = body.get("proposal_id", "")
category = body.get("category", "").strip()
variant_id = body.get("variant_id", "").strip() # пусто = удалить всю категорию
ensure_sheet()
ws = sheets.sheet("Proposals")
headers, _ = _get_all(ws)
row_num = _find_row_num(ws, proposal_id)
if not row_num:
return {"error": "proposal_not_found"}
rows = ws.get_all_values()
rd = _row_to_dict(headers, rows[row_num - 1])
positions: list[dict] = rd.get("positions_json") or []
if variant_id:
cat = next((p for p in positions if p.get("category") == category), None)
if cat:
cat["variants"] = [v for v in cat["variants"] if v.get("id") != variant_id]
if not cat["variants"]:
positions = [p for p in positions if p.get("category") != category]
else:
positions = [p for p in positions if p.get("category") != category]
_update_field(ws, row_num, headers, "positions_json",
json.dumps(positions, ensure_ascii=False))
return {"ok": True}
def handle_send(body: dict) -> dict:
"""Менеджер отправляет подборку клиенту. status → sent.
Уведомляет клиента в бот."""
tg_id, err = _auth(body)
if err:
return err
user = sheets.find_user(tg_id)
if not user or not sheets.has_role(user, "manager"):
return {"error": "only_manager"}
proposal_id = body.get("proposal_id", "")
ensure_sheet()
ws = sheets.sheet("Proposals")
headers, _ = _get_all(ws)
row_num = _find_row_num(ws, proposal_id)
if not row_num:
return {"error": "proposal_not_found"}
rows = ws.get_all_values()
rd = _row_to_dict(headers, rows[row_num - 1])
positions: list[dict] = rd.get("positions_json") or []
if not positions or not any(p.get("variants") for p in positions):
return {"error": "no_variants_yet"}
_update_fields(ws, row_num, headers, {"status": "sent", "sent_at": _now()})
client_tg_id = rd.get("client_tg_id", "")
manager_name = str(user.get("full_name", "") or "Менеджер")
n_pos = len(positions)
_tg_notify(client_tg_id,
f"🛍 {manager_name} подобрал технику для вашей кухни!\n\n"
f"В подборке {n_pos} {'категория' if n_pos == 1 else 'категории' if 2 <= n_pos <= 4 else 'категорий'}. "
f"Откройте приложение, чтобы посмотреть варианты и выбрать подходящие.")
return {"ok": True}
def handle_list(body: dict) -> dict:
"""Список proposals.
Менеджер: все по своим клиентам.
Клиент: только свои (по tg_id).
"""
tg_id, err = _auth(body)
if err:
return err
user = sheets.find_user(tg_id)
is_manager = bool(user and sheets.has_role(user, "manager"))
ensure_sheet()
ws = sheets.sheet("Proposals")
headers, all_dicts = _get_all(ws)
out = []
for d in all_dicts:
if d.get("archived_at"):
continue
if is_manager:
if str(d.get("manager_tg_id")) != tg_id:
continue
else:
if str(d.get("client_tg_id")) != tg_id:
continue
# Краткая сводка без полного positions_json
positions = d.get("positions_json") or []
out.append({
"id": d.get("id"),
"client_key": d.get("client_key"),
"client_tg_id": d.get("client_tg_id"),
"status": d.get("status"),
"created_at": d.get("created_at"),
"sent_at": d.get("sent_at"),
"reviewed_at": d.get("reviewed_at"),
"brief": d.get("brief_json") or {},
"n_categories": len(positions),
"n_variants": sum(len(p.get("variants", [])) for p in positions),
})
out.sort(key=lambda x: x.get("created_at") or "", reverse=True)
return {"ok": True, "proposals": out, "is_manager": is_manager}
def handle_detail(body: dict) -> dict:
"""Полная карточка proposal — доступна и менеджеру, и клиенту."""
tg_id, err = _auth(body)
if err:
return err
proposal_id = body.get("proposal_id", "")
ensure_sheet()
ws = sheets.sheet("Proposals")
headers, all_dicts = _get_all(ws)
d = next((x for x in all_dicts if x.get("id") == proposal_id), None)
if not d:
return {"error": "not_found"}
user = sheets.find_user(tg_id)
is_manager = bool(user and sheets.has_role(user, "manager"))
# Клиент видит только своё
if not is_manager and str(d.get("client_tg_id")) != tg_id:
return {"error": "forbidden"}
return {
"ok": True,
"proposal": {
"id": d.get("id"),
"client_key": d.get("client_key"),
"client_tg_id": d.get("client_tg_id"),
"manager_tg_id": d.get("manager_tg_id"),
"status": d.get("status"),
"brief": d.get("brief_json") or {},
"positions": d.get("positions_json") or [],
"client_comment": d.get("client_comment", ""),
"manager_comment": d.get("manager_comment", ""),
"created_at": d.get("created_at"),
"sent_at": d.get("sent_at"),
"reviewed_at": d.get("reviewed_at"),
},
"is_manager": is_manager,
}
def handle_vote(body: dict) -> dict:
"""Клиент голосует за вариант (✅ yes / ❌ no / null — снять голос).
body: {proposal_id, category, variant_id, vote: 'yes'|'no'|null}
"""
tg_id, err = _auth(body)
if err:
return err
proposal_id = body.get("proposal_id", "")
category = body.get("category", "").strip()
variant_id = body.get("variant_id", "").strip()
vote = body.get("vote") # 'yes' | 'no' | None
if vote not in ("yes", "no", None):
return {"error": "vote must be yes | no | null"}
ensure_sheet()
ws = sheets.sheet("Proposals")
headers, _ = _get_all(ws)
row_num = _find_row_num(ws, proposal_id)
if not row_num:
return {"error": "proposal_not_found"}
rows = ws.get_all_values()
rd = _row_to_dict(headers, rows[row_num - 1])
# Клиент голосует только в своём
if str(rd.get("client_tg_id")) != tg_id:
return {"error": "forbidden"}
if rd.get("status") not in ("sent", "reviewed"):
return {"error": "voting_not_open"}
positions: list[dict] = rd.get("positions_json") or []
cat = next((p for p in positions if p.get("category") == category), None)
if not cat:
return {"error": "category_not_found"}
variant = next((v for v in cat.get("variants", []) if v.get("id") == variant_id), None)
if not variant:
return {"error": "variant_not_found"}
variant["client_vote"] = vote
_update_field(ws, row_num, headers, "positions_json",
json.dumps(positions, ensure_ascii=False))
return {"ok": True}
def handle_client_submit(body: dict) -> dict:
"""Клиент отправляет итоговый комментарий → status=reviewed.
Уведомляет менеджера."""
tg_id, err = _auth(body)
if err:
return err
proposal_id = body.get("proposal_id", "")
comment = str(body.get("comment", "") or "").strip()
ensure_sheet()
ws = sheets.sheet("Proposals")
headers, _ = _get_all(ws)
row_num = _find_row_num(ws, proposal_id)
if not row_num:
return {"error": "proposal_not_found"}
rows = ws.get_all_values()
rd = _row_to_dict(headers, rows[row_num - 1])
if str(rd.get("client_tg_id")) != tg_id:
return {"error": "forbidden"}
_update_fields(ws, row_num, headers, {
"status": "reviewed",
"client_comment": comment,
"reviewed_at": _now(),
})
# Сводка голосов
positions: list[dict] = rd.get("positions_json") or []
vote_lines = []
for cat in positions:
for v in cat.get("variants", []):
vote = v.get("client_vote")
if vote == "yes":
vote_lines.append(f"✅ {cat.get('label', cat.get('category'))}: {v.get('model', '—')}")
elif vote == "no":
vote_lines.append(f"❌ {cat.get('label', cat.get('category'))}: {v.get('model', '—')}")
vote_text = "\n".join(vote_lines) if vote_lines else "Голосов нет"
manager_tg_id = rd.get("manager_tg_id", "")
client_key = rd.get("client_key", "клиент")
_tg_notify(manager_tg_id,
f"📬 {client_key.title()} ответил на подборку техники:\n\n"
f"{vote_text}"
+ (f"\n\n💬 Комментарий: {comment}" if comment else ""))
return {"ok": True}