Перейти к содержанию

vk_service/whatsapp_service

1. Назначение🔗

Микросервисы whatsapp_service и vk_service отвечают за приём и обработку входящих сообщений от пользователей с платформ WAPPI (WhatsApp) и VK соответственно.

Они реализуют следующий процесс:

  • Принимают входящие вебхуки от внешней платформы.
  • Проверяют:

    • Активен ли профиль (через data_service).
    • Не находится ли пользователь в списке игнорируемых (по локальной БД).
    • Не является ли отправитель GPT или менеджером.
  • Сохраняют сообщение в Redis в очередь gpt_queue.

  • Если нет активного процесса обработки (по ключу gpt_lock):

    • Ставят техническую блокировку (gpt_lock) на данного пользователя.
    • Запускают сбор сообщений в фоновом режиме (asyncio.create_task(...)).
  • В течение времени, указанного в профиле:

    • Собираются входящие сообщения.
    • По окончании таймера:

      • Извлекается до 9 сообщений.

      Больше 10 сообщений API OpenAI не принимает в 1 запросе

      • Добавляется информационное сообщение ("Юзер {имя} написал сообщения, дата {дата}, день недели {день}").
      • Формируется payload OpenAI (Assistant API).
      • Отправляется в gpt_service по HTTP со стримингом.
      • Полученный ответ отправляется пользователю в VK или WhatsApp.
      • В data_service отправляются данные о расходе токенов.
  • Если после ответа остались ещё сообщения — цикл повторяется.

  • В случае ошибок GPT пользователь добавляется в список игнорируемых.

Сервис логирует ключевые действия: входящие данные, ход обработки, ошибки, расход токенов. Весь процесс управления сообщениями построен так, чтобы на одного пользователя одновременно выполнялся только один процесс обработки.

Этот раздел объеденяет несколько микросервисов так как у них идентичная архитектура и даже код, различия разве что в обработке вебхуков и отправке сообщение по API сервисов


2. Архитектура и компоненты🔗

Микросервисы whatsapp_service и vk_service реализованы на FastAPI. Каждый сервис принимает вебхуки от соответствующей платформы, обрабатывает сообщения, обращается к другим внутренним сервисам и отправляет ответ пользователю.

Используемые технологии🔗

  • FastAPI — REST API и маршрутизация
  • Redis — кэш профилей, блокировок и сообщений
  • PostgreSQL + SQLAlchemy — локальное хранилище и работа с ORM
  • Alembic — миграции БД
  • httpx (async) — вызовы к другим микросервисам
  • Асинхронный код (asyncio) — запуск фоновых задач без Celery

Взаимодействие с другими сервисами🔗

  • Внешние:

    • Приём вебхуков от:

      • WAPPI (WhatsApp)
      • VK API
    • Отправка сообщений пользователям через API соответствующей платформы

  • Внутренние:

    • gpt_service:

      • отправка сообщений GPT (HTTP + SSE)
      • получение ответа и токен-статистики
    • data_service:

      • получение данных о профиле (по profile_id)
      • отправка информации о GPT-запросах для учёта и биллинга

Кэш и временные структуры (Redis)🔗

Назначение Префикс ключа TTL
Данные профиля cached_vk_ / cached_whatsapp_ Лучше уточнять в коде
ФИО пользователя (VK) vk_user_full_name
Техническая блокировка vk_gpt_lock / whatsapp_gpt_lock
Очередь сообщений gpt_queue:{id}

Очистка кэша осуществляется автоматически через TTL.

Асинхронная обработка🔗

  • Сбор сообщений и их отправка в GPT выполняется фоново через asyncio.create_task(...).
  • Использование блокировки (gpt_lock) гарантирует, что одновременно будет выполняться не более одного процесса обработки на одного пользователя.
  • Celery или другие очереди не используются.

3. Основные модули и директории🔗

Исходный код микросервисов расположен в директориях app/. Оба проекта логически разделены на подмодули с идентичной архитектурой: конфигурация, взаимодействие с базой данных, логика Redis, работа с внешними API и интеграция с gpt_service.

Ниже — две отдельные структуры в виде дерева: одна для whatsapp_service, вторая для vk_service.


Структура whatsapp_service🔗

WhatsappBackend/
├── .env # Зависимости
├── .env.example # Пример файла зависимостей
├── alembic.ini # Конфиг Alembic
├── app/ # Основная папка проекта
│   ├── config.py # Извлечение данных из env и создание клиентов подключения к базам данных
│   ├── database.py # Фабрика сессий и клиенты подключения к БД
│   ├── dependencies.py # Зависимости SQLAlchemy
│   ├── logger.py # Самописный логгер
│   ├── main.py 
│   ├── migration/
│   ├── models/ 
│   │   └── users.py # Модель юзера для игнора
│   └── utils/
│       ├── check_bot_data.py # Запрос данных о профиле
│       ├── crud.py 
│       ├── date_time.py # Получение текущего времени
│       ├── caption_message.py # Формирование пояснительного сообщения для чатгпт
│       ├── gpt/
│       │   ├── gpt_service_api.py 
│       │   └── send_tokens.py # Отправка данных в data_service для создани траназкции о расходах
│       ├── redis/
│       │   ├── add_message.py 
│       │   ├── cache_data.py
│       │   ├── check_batch.py # Проверка готовности отправки следующей пачки сообщений
│       │   ├── crud_lock_keys.py # Блокировка обработки сообщений юзера
│       │   └── crud_messages.py
│       ├── wappi/
│       │   └── api.py
│       └── webhook/
│           ├── extract_message.py # Извлечение данных из вебхука Wappi
│           ├── handle_webhook.py # Первичная обработка вебхука
│           └── process_webhook.py # Основная обработка вебхука
├── docker-compose.yml
├── logs/
├── readme.md
├── requirements.txt

Структура vk_service🔗

├── .env # Зависимости
├── .env.example # Пример файла зависимостей
├── alembic.ini # Конфиг Alembic
├── app/ # Основная папка проекта
│   ├── config.py # Извлечение данных из env и создание клиентов подключения к базам данных
│   ├── database.py # Фабрика сессий и клиенты подключения к БД
│   ├── dependencies.py # Зависимости SQLAlchemy
│   ├── logger.py # Самописный логгер
│   ├── main.py 
│   ├── migration/
│   ├── models/ 
│   │   └── users.py # Модель юзера для игнора
│   └── utils/
│       ├── check_bot_data.py # Запрос данных о профиле
│       ├── crud.py
│       ├── check_bot_data.py # Запрос данных о профиле
│       ├── gpt/
│       │   ├── gpt_service_api.py 
│       │   └── send_tokens.py # Отправка данных в data_service для создани траназкции о расходах
│       ├── redis/
│       │   ├── add_message.py
│       │   ├── cache_data.py
│       │   ├── cache_user_full_name.py # Кэширование имени юзера (т.к. в вк его запрашивают отдельно)
│       │   ├── check_batch.py # Проверка готовности отправки следующей пачки сообщений
│       │   ├── crud_lock_keys.py # Блокировка обработки сообщений юзера
│       │   └── crud_message.py
│       ├── vk.py
│       └── webhook/
│           ├── check_payload.py # Проверка вебхука на payload чтобы понять написал ли это сообщение бот
│           ├── extract_message.py # Извлечение данных из вебхука Wappi
│           ├── handle_webhook.py # Первичная обработка вебхука
│           ├── message_wrapper.py # Перевод вебхука в удобный объект
│           └── process_webhook.py # Основная обработка вебхука
│           └── vk_api.py
├── docker-compose.yml
├── logs/
├── readme.md
├── requirements.txt

Ключевые отличия между сервисами заключаются в интеграционном уровне — в whatsapp_service используется WAPPI, в vk_service — VK API, что отражается в структуре webhook/ и вспомогательных утилитах.


4. Переменные окружения🔗

Оба микросервиса используют одинаковый набор переменных окружения для подключения к PostgreSQL, Redis, взаимодействия с внутренними сервисами и логгирования. Ниже — полный список.

Обязательные и необязательные переменные🔗

Обязательные:🔗

Переменная Назначение
POSTGRES_USER Пользователь PostgreSQL
POSTGRES_PASSWORD Пароль PostgreSQL
POSTGRES_DB Название базы данных
POSTGRES_PORT Порт PostgreSQL
POSTGRES_HOST Хост PostgreSQL
REDIS_HOST Хост Redis
REDIS_PORT Порт Redis
REDIS_DB Номер используемой базы Redis
CREDENTIALS_DOMAIN URL сервиса data_service
GPT_SERVICE_DOMAIN URL сервиса gpt_service
TOKENS_ADD_URL Относительный путь для отправки токенов

Опциональные (имеют значения по умолчанию):🔗

Переменная По умолчанию Назначение
LOG_LEVEL DEBUG Уровень логов
USE_JSON_ONLY false JSON-лог только в консоль
USE_TEXT_FILE_LOG true Включение текстового лога в файл
LOG_JSON_PATH logs/json_app.log Путь до JSON-лога
LOG_FILE_PATH logs/app.log Путь до текстового лога
MAX_LOG_SIZE_MB 5 Максимальный размер лог-файла (в МБ)
UVICORN_ERROR_LOG_PATH logs/uvicorn_error.log Путь до логов ошибок uvicorn

5. Взаимодействие с другими сервисами🔗

Микросервисы whatsapp_service и vk_service взаимодействуют с рядом внешних и внутренних сервисов. Ниже описано, как устроены эти связи.


gpt_service🔗

Служит прокси между микросервисом и OpenAI API. Принимает входящие сообщения и возвращает стриминг-ответ от модели.

  • Метод: POST
  • URL: ${GPT_SERVICE_DOMAIN}/send_message
  • Формат: httpx.AsyncClient(...).stream(...)
  • Payload:

{
  "assistant_key": "...",
  "open_ai_key": "...",
  "user_id": "...",
  "message": "...",
  "request_id": "...",
  "sender": "..."
}
* Ответ (возможно несколько):

  • answer — текст ответа
  • input_tokens, output_tokens, openai_model
  • final: true — завершение

На основании ответа данные отправляются пользователю, а usage передаётся в data_service.


data_service🔗

Используется для получения профиля и записи токенов после GPT-запроса.

Получение профиля🔗

  • Метод: GET
  • URL: ${CREDENTIALS_DOMAIN}/profile/{profile_id}
  • Назначение: Получение ключей, имени, параметров (таймер и т.д.)

Учёт токенов🔗

  • Метод: POST
  • URL: ${CREDENTIALS_DOMAIN}${TOKENS_ADD_URL}
  • Payload:
{
  "account_identifier": "profile_id",
  "input_tokens": 1234,
  "output_tokens": 567,
  "service": "VK" | "WhatsApp",
  "openai_model": "gpt-4.0",
  "request_id": "..."
}

Ответ содержит tx_id, balance, которые кешируются в Redis.


Внешние API платформ🔗

VK API🔗

Отправка сообщений🔗
  • URL: https://api.vk.ru/method/messages.send
  • Метод: POST
  • Параметры:

  • user_id, message, random_id, payload, v

  • Заголовок: Authorization: Bearer <токен из профиля>
Запрос имени пользователя🔗
  • URL: https://api.vk.ru/method/users.get
  • Метод: GET
  • Параметры:

  • user_ids, fields=first_name,last_name, v

  • Заголовок: Authorization: Bearer <токен из профиля>

Используется для кэширования имени пользователя при необходимости отобразить в ответе.


WAPPI API (WhatsApp)🔗

Отправка сообщений🔗
  • URL: https://wappi.pro/api/sync/message/send
  • Метод: POST
  • Заголовок:

  • Authorization: <токен из профиля>

  • Content-Type: application/json
  • Параметры запроса:

  • profile_id — ID интеграции в WAPPI

  • Тело запроса:
{
  "body": "Привет!",
  "recipient": "+79991112233"
}

Если токен не получен или данные профиля не загружены, отправка прерывается с логированием ошибки.


6. Обработка данных и логика🔗

Микросервисы whatsapp_service и vk_service реализуют единый сценарий асинхронной обработки входящих сообщений с использованием Redis, блокировок и стримингового общения с GPT.


Этап 1 — Приём и первичная фильтрация🔗

  1. Вебхук поступает от платформы (VK / WAPPI) и разбирается на составные части, логируется и отправляется дальше.
  2. Выполняются проверки:

    • Тип события (например, outgoing_message_api пропускается),
    • Отправитель — не бот и не наш API,
    • Пользователь не в игноре (через локальную БД),
    • В VK: дополнительно обрабатывается confirmation-вебхук.
  3. Если всё валидно — сообщение разбирается и помещается в Redis-очередь (gpt_queue).


Этап 2 — Контроль блокировки и запуск обработки🔗

Перед началом сбора сообщений проверяется наличие блокировки (gpt_lock). Если её нет:

  • Устанавливается Redis-блокировка,

Блокировка нужна чтобы на 1 юзера был только 1 процесс обработки. Сама блокировка это просто поле в Redis с айди юзера

  • Запускается asyncio.create_task(...) с отложенной обработкой (schedule_gpt_message_handling).

Таймер (wait_time) извлекается из профиля и реализуется через asyncio.sleep(...).


Этап 3 — Сбор сообщений и отправка в GPT🔗

Потому что OpenAI API не пустит больше 10

  1. Из Redis извлекаются последние 9 сообщений.
  2. В начало добавляется пояснительное сообщение:
    Юзер {имя} написал сообщения, время отправки {дата}, {день недели}
    
  3. Все сообщения передаются в gpt_service (через send_to_fastapi_gpt(...)) в стриминговом режиме.

Если GPT возвращает ответ — он:

  • логгируется,
  • отправляется пользователю (через VK API или WAPPI),
  • создаётся запись о расходе токенов в data_service,
  • обновляется кэш баланса в Redis.

Обработка ошибок🔗

Если во время отправки в GPT возникает ошибка (любой тип), пользователь:

  • автоматически вносится в список игнорируемых (add_ignored_number(...)),
  • получает сообщение о передаче разговора менеджеру (через платформу),
  • дальнейшая обработка прерывается.

Исключения логгируются, но не пробрасываются — стабильность сохраняется.


Этап 4 — Продолжение или завершение🔗

После ответа GPT:

  • Если в очереди остались сообщения — запускается новый цикл, с учётом интервала ожидания wait_time (через check_batch_ready(...)).
  • Если очередь пуста — Redis-блокировка снимается.

Это гарантирует, что на одного пользователя может работать только один процесс одновременно.


Тогда поступим просто и по делу — если у микросервисов только один внутренний эндпоинт для приёма вебхуков, и они не предоставляют внешнего API, это так и надо отразить в разделе. Вот как можно оформить:


7. API эндпоинты🔗

Микросервисы whatsapp_service и vk_service не предоставляют внешнего API. Единственные доступные публичные эндпоинты — это точки входа для приёма вебхуков от соответствующих платформ.


POST /webhook🔗

Назначение:🔗

Принимает входящие события от VK API или WAPPI (в зависимости от сервиса). После валидации и проверки запускает асинхронную обработку.

Пример запроса (VK и WAPPI):🔗

Это вы можете найти в документации WAPPI и документации VK

В VK также поддерживается type: confirmation, при котором возвращается специальный токен который вносит пользователь (он его берёт в настройках группы).


8. Логирование и мониторинг🔗

whatsapp_service и vk_service используют единый модуль логирования, применяемый во всех микросервисах проекта. Он реализован на базе стандартного logging с расширениями через pythonjsonlogger.

Так как везде используется один и тот же метод логирования, смотри его в документации data_service