vk_service/whatsapp_service
1. Назначение🔗
Микросервисы whatsapp_service и vk_service отвечают за приём и обработку входящих сообщений от пользователей с платформ WAPPI (WhatsApp) и VK соответственно.
Они реализуют следующий процесс:
- Принимают входящие вебхуки от внешней платформы.
-
Проверяют:
- Активен ли профиль (через
data_service). - Не находится ли пользователь в списке игнорируемых (по локальной БД).
- Не является ли отправитель GPT или менеджером.
- Активен ли профиль (через
-
Сохраняют сообщение в Redis в очередь
gpt_queue. -
Если нет активного процесса обработки (по ключу
gpt_lock):- Ставят техническую блокировку (
gpt_lock) на данного пользователя. - Запускают сбор сообщений в фоновом режиме (
asyncio.create_task(...)).
- Ставят техническую блокировку (
-
В течение времени, указанного в профиле:
- Собираются входящие сообщения.
-
По окончании таймера:
- Извлекается до 9 сообщений.
Больше 10 сообщений API OpenAI не принимает в 1 запросе
- Добавляется информационное сообщение (
"Юзер {имя} написал сообщения, дата {дата}, день недели {день}"). - Формируется payload OpenAI (Assistant API).
- Отправляется в
gpt_serviceпо HTTP со стримингом. - Полученный ответ отправляется пользователю в VK или WhatsApp.
- В
data_serviceотправляются данные о расходе токенов.
-
Если после ответа остались ещё сообщения — цикл повторяется.
- В случае ошибок GPT пользователь добавляется в список игнорируемых.
Сервис логирует ключевые действия: входящие данные, ход обработки, ошибки, расход токенов. Весь процесс управления сообщениями построен так, чтобы на одного пользователя одновременно выполнялся только один процесс обработки.
Этот раздел объеденяет несколько микросервисов так как у них идентичная архитектура и даже код, различия разве что в обработке вебхуков и отправке сообщение по API сервисов
2. Архитектура и компоненты🔗
Микросервисы whatsapp_service и vk_service реализованы на FastAPI. Каждый сервис принимает вебхуки от соответствующей платформы, обрабатывает сообщения, обращается к другим внутренним сервисам и отправляет ответ пользователю.
Используемые технологии🔗
- FastAPI — REST API и маршрутизация
- Redis — кэш профилей, блокировок и сообщений
- PostgreSQL + SQLAlchemy — локальное хранилище и работа с ORM
- Alembic — миграции БД
- httpx (async) — вызовы к другим микросервисам
- Асинхронный код (asyncio) — запуск фоновых задач без Celery
Взаимодействие с другими сервисами🔗
-
Внешние:
-
Приём вебхуков от:
- WAPPI (WhatsApp)
- VK API
-
Отправка сообщений пользователям через API соответствующей платформы
-
-
Внутренние:
-
gpt_service:- отправка сообщений GPT (HTTP + SSE)
- получение ответа и токен-статистики
-
data_service:- получение данных о профиле (по
profile_id) - отправка информации о GPT-запросах для учёта и биллинга
- получение данных о профиле (по
-
Кэш и временные структуры (Redis)🔗
| Назначение | Префикс ключа | TTL |
|---|---|---|
| Данные профиля | cached_vk_ / cached_whatsapp_ | Лучше уточнять в коде |
| ФИО пользователя (VK) | vk_user_full_name | |
| Техническая блокировка | vk_gpt_lock / whatsapp_gpt_lock | |
| Очередь сообщений | gpt_queue:{id} |
Очистка кэша осуществляется автоматически через TTL.
Асинхронная обработка🔗
- Сбор сообщений и их отправка в GPT выполняется фоново через
asyncio.create_task(...). - Использование блокировки (
gpt_lock) гарантирует, что одновременно будет выполняться не более одного процесса обработки на одного пользователя. - Celery или другие очереди не используются.
3. Основные модули и директории🔗
Исходный код микросервисов расположен в директориях app/. Оба проекта логически разделены на подмодули с идентичной архитектурой: конфигурация, взаимодействие с базой данных, логика Redis, работа с внешними API и интеграция с gpt_service.
Ниже — две отдельные структуры в виде дерева: одна для whatsapp_service, вторая для vk_service.
Структура whatsapp_service🔗
WhatsappBackend/
├── .env # Зависимости
├── .env.example # Пример файла зависимостей
├── alembic.ini # Конфиг Alembic
├── app/ # Основная папка проекта
│ ├── config.py # Извлечение данных из env и создание клиентов подключения к базам данных
│ ├── database.py # Фабрика сессий и клиенты подключения к БД
│ ├── dependencies.py # Зависимости SQLAlchemy
│ ├── logger.py # Самописный логгер
│ ├── main.py
│ ├── migration/
│ ├── models/
│ │ └── users.py # Модель юзера для игнора
│ └── utils/
│ ├── check_bot_data.py # Запрос данных о профиле
│ ├── crud.py
│ ├── date_time.py # Получение текущего времени
│ ├── caption_message.py # Формирование пояснительного сообщения для чатгпт
│ ├── gpt/
│ │ ├── gpt_service_api.py
│ │ └── send_tokens.py # Отправка данных в data_service для создани траназкции о расходах
│ ├── redis/
│ │ ├── add_message.py
│ │ ├── cache_data.py
│ │ ├── check_batch.py # Проверка готовности отправки следующей пачки сообщений
│ │ ├── crud_lock_keys.py # Блокировка обработки сообщений юзера
│ │ └── crud_messages.py
│ ├── wappi/
│ │ └── api.py
│ └── webhook/
│ ├── extract_message.py # Извлечение данных из вебхука Wappi
│ ├── handle_webhook.py # Первичная обработка вебхука
│ └── process_webhook.py # Основная обработка вебхука
├── docker-compose.yml
├── logs/
├── readme.md
├── requirements.txt
Структура vk_service🔗
├── .env # Зависимости
├── .env.example # Пример файла зависимостей
├── alembic.ini # Конфиг Alembic
├── app/ # Основная папка проекта
│ ├── config.py # Извлечение данных из env и создание клиентов подключения к базам данных
│ ├── database.py # Фабрика сессий и клиенты подключения к БД
│ ├── dependencies.py # Зависимости SQLAlchemy
│ ├── logger.py # Самописный логгер
│ ├── main.py
│ ├── migration/
│ ├── models/
│ │ └── users.py # Модель юзера для игнора
│ └── utils/
│ ├── check_bot_data.py # Запрос данных о профиле
│ ├── crud.py
│ ├── check_bot_data.py # Запрос данных о профиле
│ ├── gpt/
│ │ ├── gpt_service_api.py
│ │ └── send_tokens.py # Отправка данных в data_service для создани траназкции о расходах
│ ├── redis/
│ │ ├── add_message.py
│ │ ├── cache_data.py
│ │ ├── cache_user_full_name.py # Кэширование имени юзера (т.к. в вк его запрашивают отдельно)
│ │ ├── check_batch.py # Проверка готовности отправки следующей пачки сообщений
│ │ ├── crud_lock_keys.py # Блокировка обработки сообщений юзера
│ │ └── crud_message.py
│ ├── vk.py
│ └── webhook/
│ ├── check_payload.py # Проверка вебхука на payload чтобы понять написал ли это сообщение бот
│ ├── extract_message.py # Извлечение данных из вебхука Wappi
│ ├── handle_webhook.py # Первичная обработка вебхука
│ ├── message_wrapper.py # Перевод вебхука в удобный объект
│ └── process_webhook.py # Основная обработка вебхука
│ └── vk_api.py
├── docker-compose.yml
├── logs/
├── readme.md
├── requirements.txt
Ключевые отличия между сервисами заключаются в интеграционном уровне — в whatsapp_service используется WAPPI, в vk_service — VK API, что отражается в структуре webhook/ и вспомогательных утилитах.
4. Переменные окружения🔗
Оба микросервиса используют одинаковый набор переменных окружения для подключения к PostgreSQL, Redis, взаимодействия с внутренними сервисами и логгирования. Ниже — полный список.
Обязательные и необязательные переменные🔗
Обязательные:🔗
| Переменная | Назначение |
|---|---|
POSTGRES_USER | Пользователь PostgreSQL |
POSTGRES_PASSWORD | Пароль PostgreSQL |
POSTGRES_DB | Название базы данных |
POSTGRES_PORT | Порт PostgreSQL |
POSTGRES_HOST | Хост PostgreSQL |
REDIS_HOST | Хост Redis |
REDIS_PORT | Порт Redis |
REDIS_DB | Номер используемой базы Redis |
CREDENTIALS_DOMAIN | URL сервиса data_service |
GPT_SERVICE_DOMAIN | URL сервиса gpt_service |
TOKENS_ADD_URL | Относительный путь для отправки токенов |
Опциональные (имеют значения по умолчанию):🔗
| Переменная | По умолчанию | Назначение |
|---|---|---|
LOG_LEVEL | DEBUG | Уровень логов |
USE_JSON_ONLY | false | JSON-лог только в консоль |
USE_TEXT_FILE_LOG | true | Включение текстового лога в файл |
LOG_JSON_PATH | logs/json_app.log | Путь до JSON-лога |
LOG_FILE_PATH | logs/app.log | Путь до текстового лога |
MAX_LOG_SIZE_MB | 5 | Максимальный размер лог-файла (в МБ) |
UVICORN_ERROR_LOG_PATH | logs/uvicorn_error.log | Путь до логов ошибок uvicorn |
5. Взаимодействие с другими сервисами🔗
Микросервисы whatsapp_service и vk_service взаимодействуют с рядом внешних и внутренних сервисов. Ниже описано, как устроены эти связи.
gpt_service🔗
Служит прокси между микросервисом и OpenAI API. Принимает входящие сообщения и возвращает стриминг-ответ от модели.
- Метод:
POST - URL:
${GPT_SERVICE_DOMAIN}/send_message - Формат:
httpx.AsyncClient(...).stream(...) - Payload:
{
"assistant_key": "...",
"open_ai_key": "...",
"user_id": "...",
"message": "...",
"request_id": "...",
"sender": "..."
}
answer— текст ответаinput_tokens,output_tokens,openai_modelfinal: true— завершение
На основании ответа данные отправляются пользователю, а usage передаётся в data_service.
data_service🔗
Используется для получения профиля и записи токенов после GPT-запроса.
Получение профиля🔗
- Метод:
GET - URL:
${CREDENTIALS_DOMAIN}/profile/{profile_id} - Назначение: Получение ключей, имени, параметров (таймер и т.д.)
Учёт токенов🔗
- Метод:
POST - URL:
${CREDENTIALS_DOMAIN}${TOKENS_ADD_URL} - Payload:
{
"account_identifier": "profile_id",
"input_tokens": 1234,
"output_tokens": 567,
"service": "VK" | "WhatsApp",
"openai_model": "gpt-4.0",
"request_id": "..."
}
Ответ содержит tx_id, balance, которые кешируются в Redis.
Внешние API платформ🔗
VK API🔗
Отправка сообщений🔗
- URL:
https://api.vk.ru/method/messages.send - Метод:
POST -
Параметры:
-
user_id,message,random_id,payload,v - Заголовок:
Authorization: Bearer <токен из профиля>
Запрос имени пользователя🔗
- URL:
https://api.vk.ru/method/users.get - Метод:
GET -
Параметры:
-
user_ids,fields=first_name,last_name,v - Заголовок:
Authorization: Bearer <токен из профиля>
Используется для кэширования имени пользователя при необходимости отобразить в ответе.
WAPPI API (WhatsApp)🔗
Отправка сообщений🔗
- URL:
https://wappi.pro/api/sync/message/send - Метод:
POST -
Заголовок:
-
Authorization: <токен из профиля> Content-Type: application/json-
Параметры запроса:
-
profile_id— ID интеграции в WAPPI - Тело запроса:
Если токен не получен или данные профиля не загружены, отправка прерывается с логированием ошибки.
6. Обработка данных и логика🔗
Микросервисы whatsapp_service и vk_service реализуют единый сценарий асинхронной обработки входящих сообщений с использованием Redis, блокировок и стримингового общения с GPT.
Этап 1 — Приём и первичная фильтрация🔗
- Вебхук поступает от платформы (VK / WAPPI) и разбирается на составные части, логируется и отправляется дальше.
-
Выполняются проверки:
- Тип события (например,
outgoing_message_apiпропускается), - Отправитель — не бот и не наш API,
- Пользователь не в игноре (через локальную БД),
- В VK: дополнительно обрабатывается
confirmation-вебхук.
- Тип события (например,
-
Если всё валидно — сообщение разбирается и помещается в Redis-очередь (
gpt_queue).
Этап 2 — Контроль блокировки и запуск обработки🔗
Перед началом сбора сообщений проверяется наличие блокировки (gpt_lock). Если её нет:
- Устанавливается Redis-блокировка,
Блокировка нужна чтобы на 1 юзера был только 1 процесс обработки. Сама блокировка это просто поле в Redis с айди юзера
- Запускается
asyncio.create_task(...)с отложенной обработкой (schedule_gpt_message_handling).
Таймер (wait_time) извлекается из профиля и реализуется через asyncio.sleep(...).
Этап 3 — Сбор сообщений и отправка в GPT🔗
Потому что OpenAI API не пустит больше 10
- Из Redis извлекаются последние 9 сообщений.
- В начало добавляется пояснительное сообщение:
- Все сообщения передаются в
gpt_service(черезsend_to_fastapi_gpt(...)) в стриминговом режиме.
Если GPT возвращает ответ — он:
- логгируется,
- отправляется пользователю (через VK API или WAPPI),
- создаётся запись о расходе токенов в
data_service, - обновляется кэш баланса в Redis.
Обработка ошибок🔗
Если во время отправки в GPT возникает ошибка (любой тип), пользователь:
- автоматически вносится в список игнорируемых (
add_ignored_number(...)), - получает сообщение о передаче разговора менеджеру (через платформу),
- дальнейшая обработка прерывается.
Исключения логгируются, но не пробрасываются — стабильность сохраняется.
Этап 4 — Продолжение или завершение🔗
После ответа GPT:
- Если в очереди остались сообщения — запускается новый цикл, с учётом интервала ожидания
wait_time(черезcheck_batch_ready(...)). - Если очередь пуста — Redis-блокировка снимается.
Это гарантирует, что на одного пользователя может работать только один процесс одновременно.
Тогда поступим просто и по делу — если у микросервисов только один внутренний эндпоинт для приёма вебхуков, и они не предоставляют внешнего API, это так и надо отразить в разделе. Вот как можно оформить:
7. API эндпоинты🔗
Микросервисы whatsapp_service и vk_service не предоставляют внешнего API. Единственные доступные публичные эндпоинты — это точки входа для приёма вебхуков от соответствующих платформ.
POST /webhook🔗
Назначение:🔗
Принимает входящие события от VK API или WAPPI (в зависимости от сервиса). После валидации и проверки запускает асинхронную обработку.
Пример запроса (VK и WAPPI):🔗
Это вы можете найти в документации WAPPI и документации VK
В VK также поддерживается
type: confirmation, при котором возвращается специальный токен который вносит пользователь (он его берёт в настройках группы).
8. Логирование и мониторинг🔗
whatsapp_service и vk_service используют единый модуль логирования, применяемый во всех микросервисах проекта. Он реализован на базе стандартного logging с расширениями через pythonjsonlogger.
Так как везде используется один и тот же метод логирования, смотри его в документации data_service