zov-tech/backend-py/app/sheets.py
wasrusgen d859e9791c roles: multi-role foundation (manager / client / measurer / assembler)
Users.role теперь хранит CSV-список ролей: 'manager,measurer'.
Парсим, добавляем, отзываем — все через sheets.parse_roles / grant_role /
revoke_role / list_users_with_role. Старые однострочные значения работают
как раньше (legacy compat).

Backend:
- /api/me возвращает roles[] (массив), role (главная для legacy-UI),
  plus capabilities {measurer, assembler} для staff
- /api/grant_role (admin-only) — добавить/отозвать роль
- /api/staff_list (manager-only) — список сотрудников по роли
  (будет использоваться в dropdown «выбрать замерщика»)
- При role=staff отдаём отдельный кабинет; если у юзера нет measurer/
  assembler — возвращаем error=no_staff_role

Bot:
- /start — 3-я reply-кнопка [🔧 Я сотрудник]. При тапе MiniApp получает
  ?role=staff и решает кабинет по capabilities.
- /whoami — сотрудник присылает свой Telegram ID, пересылает куратору
  чтобы тот выдал роль через /api/grant_role.

MiniApp:
- renderStaff() — заглушка кабинета сотрудника с шапкой (имя, аватар,
  список ролей) и пустым inbox («Пока пусто»). Если есть measurer —
  быстрая кнопка «Сделать новый замер».
- При error=no_staff_role — экран с инструкцией как получить роль.
- CSS .staff-head / .staff-no-role.

Cache bust v=20260513f.
2026-05-12 19:14:39 +03:00

328 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Тонкая обёртка над Google Sheets через gspread + service account."""
from __future__ import annotations
import threading
from datetime import datetime, timedelta, timezone
from typing import Any
import gspread
from google.oauth2.service_account import Credentials
from .config import get_config
_SCOPES = ["https://www.googleapis.com/auth/spreadsheets"]
_lock = threading.Lock()
_client: gspread.Client | None = None
_book: gspread.Spreadsheet | None = None
def _client_book() -> tuple[gspread.Client, gspread.Spreadsheet]:
global _client, _book
with _lock:
# Если предыдущая попытка частично инициализировалась (auth прошёл, open_by_key упал) —
# _client есть, _book нет. Нужно повторить open_by_key.
if _client is None or _book is None:
try:
cfg = get_config()
if _client is None:
creds = Credentials.from_service_account_file(cfg.google_credentials_path, scopes=_SCOPES)
_client = gspread.authorize(creds)
_book = _client.open_by_key(cfg.sheet_id)
except Exception:
_client = None
_book = None
raise
return _client, _book # type: ignore
def sheet(name: str) -> gspread.Worksheet:
_, book = _client_book()
return book.worksheet(name)
def ensure_sheet(name: str, headers: list[str]) -> gspread.Worksheet:
"""Создаёт лист с заголовками если он не существует. Иначе возвращает существующий."""
_, book = _client_book()
try:
ws = book.worksheet(name)
try:
first = ws.row_values(1)
except Exception:
first = []
if not first:
ws.update("A1", [headers])
return ws
except gspread.exceptions.WorksheetNotFound:
ws = book.add_worksheet(title=name, rows=2000, cols=max(20, len(headers)))
ws.append_row(headers, value_input_option="USER_ENTERED")
return ws
def append_row(name: str, row: list[Any]) -> None:
sheet(name).append_row(row, value_input_option="USER_ENTERED")
def find_row(sheet_name: str, key_col: str, key_val: Any) -> dict[str, Any] | None:
"""Линейный поиск по колонке-ключу. Возвращает строку как dict или None."""
s = sheet(sheet_name)
rows = s.get_all_values()
if not rows:
return None
headers = rows[0]
if key_col not in headers:
return None
idx = headers.index(key_col)
for r in rows[1:]:
if len(r) > idx and str(r[idx]).strip() == str(key_val).strip():
return dict(zip(headers, r + [""] * (len(headers) - len(r))))
return None
def update_cell_by_key(sheet_name: str, key_col: str, key_val: Any, target_col: str, new_val: Any) -> bool:
s = sheet(sheet_name)
rows = s.get_all_values()
if not rows:
return False
headers = rows[0]
if key_col not in headers or target_col not in headers:
return False
key_idx = headers.index(key_col)
target_idx = headers.index(target_col)
for i, r in enumerate(rows[1:], start=2):
if len(r) > key_idx and str(r[key_idx]).strip() == str(key_val).strip():
s.update_cell(i, target_idx + 1, new_val)
return True
return False
def get_setting(key: str) -> str | None:
row = find_row("Settings", "key", key)
return (row or {}).get("value")
# === Доменные хелперы ===
def find_user(tg_id: int) -> dict[str, Any] | None:
if not tg_id:
return None
row = find_row("Users", "tg_id", tg_id)
if not row:
return None
full_name = (f"{row.get('first_name', '')} {row.get('last_name', '')}".strip()
or row.get("tg_username", ""))
roles = parse_roles(row.get("role", ""))
return {**row, "full_name": full_name, "roles": roles}
# ---- Multi-role helpers ----
VALID_ROLES = {"manager", "client", "measurer", "assembler"}
def parse_roles(role_str: str) -> list[str]:
"""Парсит CSV-роли: 'manager,measurer' → ['manager', 'measurer'].
Старые однострочные значения тоже работают: 'manager' → ['manager']."""
if not role_str:
return []
parts = [p.strip() for p in str(role_str).split(",") if p.strip()]
return [p for p in parts if p in VALID_ROLES]
def has_role(user: dict[str, Any] | None, role: str) -> bool:
if not user:
return False
return role in parse_roles(user.get("role", ""))
def primary_role(user: dict[str, Any] | None) -> str:
"""Первая (главная) роль для legacy-кода: manager > measurer > assembler > client."""
if not user:
return ""
roles = parse_roles(user.get("role", ""))
for r in ("manager", "measurer", "assembler", "client"):
if r in roles:
return r
return roles[0] if roles else ""
def grant_role(tg_id: int, role: str) -> bool:
"""Добавляет роль пользователю (если её ещё нет). Возвращает True если что-то изменилось."""
if role not in VALID_ROLES:
return False
user = find_user(tg_id)
if not user:
return False
current = parse_roles(user.get("role", ""))
if role in current:
return False
current.append(role)
return update_cell_by_key("Users", "tg_id", tg_id, "role", ",".join(current))
def revoke_role(tg_id: int, role: str) -> bool:
user = find_user(tg_id)
if not user:
return False
current = parse_roles(user.get("role", ""))
if role not in current:
return False
current.remove(role)
new_val = ",".join(current) if current else "client" # fallback роль
return update_cell_by_key("Users", "tg_id", tg_id, "role", new_val)
def list_users_with_role(role: str) -> list[dict[str, Any]]:
"""Все пользователи, у которых есть указанная роль (для dropdown «выбрать замерщика»)."""
s = sheet("Users")
rows = s.get_all_values()
if not rows:
return []
headers = rows[0]
out: list[dict[str, Any]] = []
for r in rows[1:]:
row = dict(zip(headers, r + [""] * (len(headers) - len(r))))
if role in parse_roles(row.get("role", "")):
full_name = (f"{row.get('first_name', '')} {row.get('last_name', '')}".strip()
or row.get("tg_username", ""))
out.append({
"tg_id": row.get("tg_id"),
"full_name": full_name,
"tg_username": row.get("tg_username", ""),
"roles": parse_roles(row.get("role", "")),
})
return out
def get_or_create_user(tg_user: dict[str, Any], start_param: str | None,
explicit_role: str | None = None) -> dict[str, Any]:
cfg = get_config()
tg_id = tg_user["id"]
admin_id = cfg.admin_tg_id
existing = find_user(tg_id)
now_str = _now_str()
if existing:
update_cell_by_key("Users", "tg_id", tg_id, "last_seen_at", now_str)
# Админ всегда имеет роль manager (могут быть и другие)
if tg_id == admin_id and not has_role(existing, "manager"):
grant_role(tg_id, "manager")
ensure_admin_manager(tg_user)
existing["roles"] = parse_roles((find_user(tg_id) or {}).get("role", ""))
# explicit_role из query (?role=manager|client|staff) — не перетираем уже выданные роли,
# только добавляем если человек впервые открыл эту секцию
elif explicit_role and explicit_role in VALID_ROLES and not has_role(existing, explicit_role):
# client/manager — стандартные роли любой может получить через выбор в боте
if explicit_role in ("manager", "client"):
grant_role(tg_id, explicit_role)
existing["roles"] = parse_roles((find_user(tg_id) or {}).get("role", ""))
return existing
# Новый пользователь
role = "client"
invite_code = ""
if tg_id == admin_id:
role = "manager"
elif explicit_role in ("manager", "client"):
role = explicit_role
elif start_param and start_param.startswith("client_inv_"):
role = "client"
invite_code = start_param
append_row("Users", [
tg_id,
tg_user.get("username", ""),
tg_user.get("first_name", ""),
tg_user.get("last_name", ""),
role, # хранится как CSV; для новых = одна роль
now_str,
now_str,
invite_code,
])
if tg_id == admin_id:
ensure_admin_manager(tg_user)
return find_user(tg_id) or {}
def ensure_admin_manager(tg_user: dict[str, Any]) -> None:
tg_id = tg_user["id"]
if find_row("Managers", "tg_id", tg_id):
return
full_name = (f"{tg_user.get('first_name', '')} {tg_user.get('last_name', '')}".strip()
or tg_user.get("username", "") or str(tg_id))
append_row("Managers", [
tg_id, full_name, "vasrusgen@gmail.com", "",
"ЗОВ — куратор сети", "Санкт-Петербург",
True, "active", "", "", 0, 0, 0, "MGR_ADMIN",
])
def get_manager_profile(tg_id: int) -> dict[str, Any] | None:
cfg = get_config()
row = find_row("Managers", "tg_id", tg_id)
if not row:
return None
is_zov = str(row.get("is_zov_employee", "")).lower() in ("true", "1", "да", "yes")
last_order = _parse_date(row.get("last_order_date"))
active_period = int(get_setting("ACTIVE_PERIOD_DAYS") or cfg.active_period_days)
grace_period = int(get_setting("GRACE_PERIOD_DAYS") or cfg.grace_period_days)
active_until = None
status = "lapsed"
if is_zov:
status = "active"
elif last_order:
active_until = last_order + timedelta(days=active_period)
grace_until = active_until + timedelta(days=grace_period)
now = datetime.now(timezone.utc).astimezone()
if last_order.tzinfo is None:
now = now.replace(tzinfo=None)
if now <= active_until:
status = "active"
elif now <= grace_until:
status = "grace"
else:
status = "lapsed"
return {
**row,
"is_zov_employee": is_zov,
"active_until": active_until,
"status": status,
}
def get_client_profile(tg_id: int) -> dict[str, Any] | None:
return find_row("Clients", "tg_id", tg_id)
def log_event(event: str, tg_id: int | None, payload: dict[str, Any] | None = None) -> None:
import json
try:
append_row("Logs", [
_now_str(), event, tg_id or "",
json.dumps(payload, ensure_ascii=False) if payload else "",
])
except Exception:
pass
def _now_str() -> str:
"""ISO-формат для записи в Sheet (gspread не принимает datetime)."""
return datetime.now(timezone.utc).astimezone().isoformat()
def _parse_date(v: Any) -> datetime | None:
if not v:
return None
if isinstance(v, datetime):
return v
s = str(v).strip()
if not s:
return None
for fmt in ("%Y-%m-%d", "%d.%m.%Y", "%Y-%m-%dT%H:%M:%S", "%d.%m.%Y %H:%M:%S"):
try:
return datetime.strptime(s, fmt)
except ValueError:
continue
return None