Перейти к содержанию

data_service

1. Назначение🔗

data_service (Микросервис данных и статистики) — это внутренний асинхронный сервис, предоставляющий доступ к основной базе данных проекта. Он необходим для взаимодействия всех асинхронных микросервисов с централизованным хранилищем данных, созданным на Django, без прямого подключения к его синхронному ORM.

Сервис выполняет две ключевые задачи:

  1. Предоставление данных: по запросу других интегарционных микросервисов (например, WhatsApp, VK и пр.) выдаёт подробную информацию о профиле пользователя. Ответ формируется в виде JSON-объекта, содержащего все данные, связанные с запрашиваемым профилем. Обработка этих данных происходит уже на стороне вызывающего сервиса.

  2. Создание транзакций: сервис принимает информацию о расходе токенов (например, после запроса к OpenAI) и сохраняет данные о транзакции, включая модель GPT, количество токенов, сумму, идентификаторы аккаунта и сервиса. В процессе также обновляется кэш баланса пользователя, и вызывающему сервису возвращается обновлённое значение.

Сервис подключается к основной базе данных PostgreSQL напрямую с помощью SQLAlchemy в асинхронном режиме (async_sessionmaker). Взаимодействие происходит через HTTP (REST API). UI и публичный доступ отсутствуют — data_service предназначен исключительно для внутренних нужд.

Зачем он нужен?

Причиной создания сервиса стала необходимость обеспечения асинхронного доступа к БД из микросервисов, минуя ограничения синхронного Django ORM. Он служит API-обёрткой над основной базой данных проекта.

Взгляд в будущее

В будущем планируется расширение функциональности: учёт пополнений, метрик, статистических агрегатов и аналитических данных. Но это не точно, может это сделаю в джанго или в отдельном микросервисе


2. Архитектура и компоненты🔗

Микросервис data_service реализован на FastAPI с использованием SQLAlchemy как ORM для асинхронного взаимодействия с основной базой данных PostgreSQL. Все модели базы данных продублированы вручную, чтобы обеспечить полноценную работу с объектами SQLAlchemy вместо прямого написания SQL-запросов. Валидация входящих и исходящих данных реализована через Pydantic-схемы.

Для логирования используется pythonjsonlogger — все логи выводятся в структурированном JSON-формате.

Компоненты:🔗

  • FastAPI — веб-фреймворк
  • SQLAlchemy (async) — асинхронная ORM
  • PostgreSQL — основная база данных проекта
  • Pydantic — схемы валидации данных
  • pythonjsonlogger — логирование

Взаимодействие с системой:🔗

data_service вызывается всеми интеграционными микросервисами (WhatsApp, VK, Web Widget и др.) через HTTP-запросы (REST API). Он реагирует на входящие запросы, обрабатывает их, и при необходимости взаимодействует с базой данных: извлекает или записывает данные.

Важно

Сервис сам не инициирует исходящие запросы и не взаимодействует с другими системами напрямую. Он выступает как API-шлюз к основной базе данных.

Кэширование и фоновые задачи:🔗

Кэширование не реализовано на уровне data_service — за это отвечают внешние микросервисы. Это позволяет уменьшить нагрузку на основной сервис и избежать повторных обращений к БД.

Тестовый скрипт

Также в сервис включён вспомогательный скрипт, запускаемый по крону, который ежедневно (в 03:00) списывает абонентскую плату с активных профилей. Этот скрипт не относится напрямую к FastAPI-приложению, но использует тот же ORM-слой и подключение к БД. (Да и просто я не знал куда его засунуть)


3. Основные модули и директории🔗

Структура проекта организована по принципу логического разделения ответственности. Все исходные файлы расположены внутри папки app/.

app/
├── main.py               # Точка входа, инициализация FastAPI-приложения
├── config.py             # Загрузка и парсинг конфигурации из .env
├── database.py           # Инициализация подключения к PostgreSQL через SQLAlchemy
├── dependencies.py       # Зависимости FastAPI (Depends)
├── logger.py             # Настройка логгера с JSON-выводом
├── utils/                # Вспомогательные утилиты
├── routers/              # HTTP-роутеры FastAPI
│   └── widget.py         # Роуты, связанные с профилями\чатами\сообщениями виджета (отдельные роутеры для него)
├── models/               # SQLAlchemy-модели (воссозданы вручную из Django)
│   ├── widget/           # Модели виджета
│   └── ...               # Прочие модели
├── schemas/              # Pydantic-схемы для валидации запросов/ответов
│   ├── base.py           # Общие схемы
│   └── widget.py         # Схемы для виджета
├── migration/            # Alembic миграции базы данных (не используется)

Дополнительно:🔗

  • daily_subscriptions/ — директория со скриптом списания ежедневной абонентской платы, запускаемым по крону. Работает независимо от FastAPI, но использует те же модели и подключение к БД.
  • logs/ — директория для хранения логов (если указана соответствующая настройка).
  • .env, .env.example — переменные окружения и пример
  • alembic.ini, docker-compose.yml, requirements.txt — конфигурационные и окруженческие файлы

Такое разделение позволяет легко масштабировать проект и переиспользовать код между сервисами и задачами.


4. Переменные окружения🔗

Микросервис использует переменные окружения, задаваемые через .env или переменные системы. Ниже — перечень ключевых параметров и их назначение:

Переменная Назначение
POSTGRES_USER Имя пользователя PostgreSQL
POSTGRES_PASSWORD Пароль пользователя PostgreSQL
POSTGRES_DB Название базы данных
POSTGRES_HOST Адрес хоста БД
POSTGRES_PORT Порт подключения к БД (обычно 5432)

Логирование и лог-файлы🔗

Переменная Назначение
LOG_LEVEL Уровень логирования (DEBUG, INFO, WARNING и т.д.)
USE_JSON_ONLY Если true — консольный лог только в формате JSON, иначе — читаемый текст
USE_TEXT_FILE_LOG Вести ли дублирующий лог в обычном текстовом виде
LOG_JSON_PATH Путь к JSON-логу (по умолчанию logs/json_app.log)
LOG_FILE_PATH Путь к читаемому текстовому логу
MAX_LOG_SIZE_MB Максимальный размер каждого лог-файла в мегабайтах (ротация по размеру)
UVICORN_ERROR_LOG_PATH Отдельный файл логов с ошибками Uvicorn (например, traceback при падении сервера)

Дополнительные🔗

Переменная Назначение
CURRENCYAPI_KEY Ключ доступа к внешнему API для мониторинга валют. В настоящее время отключён по решению заказчика.

О переменных

Все переменные, связанные с логами, используются внутри logger.py, который реализует расширенную настройку логирования, включая JSON-вывод, ротацию логов и фильтрацию ошибок. Поддерживается fallback для нестандартных объектов при сериализации.


5. Взаимодействие с другими сервисами🔗

Микросервис data_service используется как внутренний API-слой к основной базе данных и вызывается всеми интеграционными микросервисами, включая:

  • whatsapp_service
  • vk_service
  • widget_service
  • и другие, обрабатывающие внешние каналы общения

Способ взаимодействия:🔗

Взаимодействие происходит через HTTP-запросы (REST API) по внутренней сети. Запросы выполняются к FastAPI-интерфейсу, реализованному в data_service.

Типы взаимодействия:🔗

  • Чтение данных:

    • Получение информации о профиле по profile_id
    • Получение информации об аккаунте, токенах, тарифах и т.п.
  • Запись данных:

    • Создание транзакций расходов (например, после использования OpenAI API)
    • Обновление информации о пользователях (ответ от данного микросервиса)

Формат передачи:🔗

  • JSON в теле запроса
  • JSON в теле ответа

data_service изолирован от внешнего мира

data_service не инициализирует исходящих запросов, не подключается к другим API и не использует очереди. Он полностью реактивен — только принимает запросы и отвечает на них, взаимодействуя напрямую с основной БД через SQLAlchemy.


6. Обработка данных и логика🔗

data_service обрабатывает входящие запросы от интеграционных микросервисов, обеспечивая асинхронный доступ к основной базе данных проекта. Логика сервиса организована в виде FastAPI-эндпоинтов с чётко определённой схемой взаимодействия: валидация данных, работа с ORM-моделями через SQLAlchemy и возврат структурированного JSON-ответа.

Чтение данных (GET-запросы):🔗

При запросе данных (например, /credentials) сервис:

  1. Валидирует входные параметры (через query-параметры или pydantic-схемы).
  2. Выбирает соответствующую модель профиля по коду сервиса (vk, whatsapp, widget) и идентификатору аккаунта.
  3. Выполняет JOIN с нужными связями (например, user, vk_profile) с помощью selectinload для оптимизации количества SQL-запросов.
  4. Формирует ответ, используя самописный универсальный метод .to_dict() с поддержкой вложенных объектов.

Если данные не найдены или возникла ошибка на уровне SQLAlchemy — возвращается ошибка 404 или 500, при этом происходит логирование события.

Создание транзакций:🔗

Маршрут /tokens/add выполняет:

  1. Поиск сервиса (ServiceRegistry) и профиля пользователя по переданному идентификатору.
  2. Поиск используемой OpenAI-модели и её тарифов (если не найдена — стоимость считается нулевой).
  3. Определение типа транзакции (GPT_USAGE).
  4. Расчёт итоговой стоимости (amount_rub).
  5. Создание записи в Transaction и TransactionDetail, привязка к аккаунту и профилю.
  6. Обновление кэшированного баланса (balance_cached) в CustomUser.
  7. Фиксация изменений через session.commit().

Ошибки (IntegrityError, OperationalError, SQLAlchemyError) обрабатываются с возвратом 500 и логируются с подробностями (включая request_id).

Обработка виджетов:🔗

Отдельные маршруты внутри widget_router обеспечивают доступ к данным, специфичным для виджета:

  • /get_messages — получить все сообщения пользователя, отсортированные по времени.
  • /message — добавить новое сообщение в чат, создать чат при необходимости.
  • /credentials — получить параметры отображения, включая иконки и связанные профили.

Все данные записываются напрямую в БД, без дополнительной фильтрации, контроля частоты или ограничения дубликатов. Эти эндпоинты используются исключительно widget_service, как асинхронный интерфейс к данным, хранящимся в PostgreSQL.


7. API эндпоинты🔗

Все эндпоинты работают через HTTP и используют формат JSON. Авторизация не применяется (внутреннее использование). Эндпоинты разделены на две группы: общие и специфичные для виджета.


Создание транзакции GPT🔗

POST /tokens/add

Создаёт транзакцию расхода токенов и возвращает обновлённый кэш баланса пользователя.

Request:

{
  "request_id": "uuid",
  "service": "whatsapp",
  "account_identifier": "abc123",
  "openai_model": "gpt-4",
  "input_tokens": 532,
  "output_tokens": 180,
  "description": "Ответ пользователю"
}
  • service — код сервиса (из модели ServiceRegistry.code). Допустимые значения: VK, WHATSAPP, WIDGET.

  • account_identifier — уникальный ID аккаунта. В зависимости от сервиса:

    • VK: group_id
    • WHATSAPP: profile_id
    • WIDGET: name (из BaseProfile)

WIDGET можно использовать оказывается здесь, но не стоит, лучше использовать эндпоинт виджета

  • Если service или account_identifier указаны неверно — транзакция не создаётся.
  • Баланс пользователя списывается, и микросервису отправляется актуальное значение.

Response:

{
  "tx_id": 134,           // ID созданной транзакции
  "balance": 2.75         // Новый баланс после списания
}

Получение данных профиля🔗

GET /credentials

Возвращает информацию о профиле и балансе по service и account_identifier.

Query-параметры:

  • serviceVK, WHATSAPP, WIDGET
  • account_identifier — в зависимости от сервиса

Логика выбора account_identifier из полей такая же как и в прошлом эндпоинте

Response (пример):

{
  "id": 1,
  "user_id": 1,
  "service_id": 1,
  "name": "Тестовый",
  "open_ai_assistant_key": "asst_...",
  "open_ai_key": "sk-proj-...",
  "wait_time": 15,
  "is_active": false,
  "unignore_triggers": [
    "убрать"
  ],
  "started_at": null,
  "next_billing_date": "2025-05-25",
  "amount_rub": 33.333333,
  "vk_profile": {
    "base_profile_id": 3,
    "group_id": "114730803",
    "token": "vk1.a.-...",
    "return_string": "bf87cd63"
  },
  "balance_cached": 16.86
}

К json'у в конце цепляется json представление модели профиля (вк,ватсап)


7.1 Виджет-эндпоинты🔗


Получение сообщений🔗

GET /widget/get_messages

Возвращает все сообщения из чата, привязанного к user_id, отсортированные по времени.

Query-параметры:

  • user_id: строка — уникальный ID, назначаемый JavaScript-виджетом в браузере.

Response:

[
  {
    "id": 1,
    "chat_id": 1,
    "sender": "user",
    "message": "Привет, GPT!",
    "timestamp": "2025-05-28T07:46:57.149733+00:00"
  },
  {
    "id": 2,
    "chat_id": 1,
    "sender": "chatgpt",
    "message": "Привет! Как я могу помочь тебе сегодня?",
    "timestamp": "2025-05-28T07:47:04.247230+00:00"
  }
]

Добавление сообщения🔗

POST /widget/message

Добавляет новое сообщение в чат, создавая чат при необходимости.

Request:

{
  "text": "Привет",
  "sender": "user",        // или "chatgpt"
  "user_id": "123",
  "widget_id": 7           // ID BaseProfile, к которому привязан виджет
}

Response:

  • HTTP 200 OK (без тела)

Получение настроек виджета🔗

GET /widget/credentials

Возвращает конфигурацию виджета по домену: базовый профиль, ключи OpenAI, баланс, приветственный текст и др.

Query-параметры:

  • domain: строка (домен, связанный с WidgetSettings)

Response (пример):

{
  "base_profile": {
    "id": 4,
    "name": "Widget",
    "open_ai_assistant_key": "asst_...",
    "open_ai_key": "sk-...",
    "is_active": true,
    "user": {
      "balance_cached": "228.1337"
    }
  },
  "welcome_text": "welcome to the us",
  "start_hints": ["hint1", "hint2", "hint3"],
  "widget_left": true,
  "tg_token": "934554334:AACEC5i...",
  "manager_tg_id": ["123", "456"],
  "icon_url": "icon/dfffFEef22"
}

8. Логирование и мониторинг🔗

data_service использует единый модуль логирования, применяемый во всех микросервисах проекта. Он реализован на базе стандартного logging с расширениями через pythonjsonlogger.

Особенности логгера:🔗

  • Поддержка JSON-формата логов с добавлением всех переданных extra-полей
  • Опциональный человеко-читаемый лог в отдельный файл (если включено через .env)
  • Ротация логов: ограничение по размеру файла, хранение нескольких резервных копий
  • Выделенный лог-файл для ошибок uvicorn, с форматированием под traceback
  • Безопасная сериализация extra, чтобы избежать конфликта со стандартными атрибутами логов

Пример логов:🔗

{
  "asctime": "2025-06-26 19:45:00",
  "levelname": "INFO",
  "name": "app_logger",
  "message": "Транзакция успешно создана",
  "request_id": "7fa5...",
  "tx_id": 142,
  "amount": 2.36,
  "module": "transactions",
  "funcName": "create_transaction",
  "lineno": 128
}

Конфигурация через .env:🔗

Переменная Назначение
LOG_LEVEL Уровень логирования (DEBUG, INFO...)
USE_JSON_ONLY Включить JSON-лог в stdout
USE_TEXT_FILE_LOG Вести лог в читаемом текстовом виде
LOG_JSON_PATH Путь к JSON-логу
LOG_FILE_PATH Путь к читаемому логу
MAX_LOG_SIZE_MB Максимальный размер файла (в МБ)
UVICORN_ERROR_LOG_PATH Лог uvicorn-ошибок (traceback'и и сбои сервера)

Алерты и мониторинг:🔗

На текущий момент алерты и мониторинг не настроены. В будущем планируется... что нибудь =)