Перейти к содержанию

gpt_service

1. Назначение🔗

gpt_service — микросервис, обеспечивающий асинхронный доступ к OpenAI Assistant API (/v1/assistants/...) для получения ответов от ChatGPT. Является промежуточным слоем между интеграционными сервисами (WhatsApp, VK, Widget и др.) и OpenAI, позволяя централизованно управлять обращениями к ассистенту и сохранять контекст диалога.

Основные функции:🔗

  • Принимает входящие сообщения от интеграционных сервисов по REST API.
  • Обеспечивает отправку сообщений в OpenAI Assistant API.
  • Отдаёт ответ от ChatGPT в виде одного сообщения, либо через стриминг (StreamingResponse) в случае длинных или составных ответов.
  • Отслеживает статус thread, чтобы при необходимости получить дополнительные ответы (в течение нескольких секунд).
  • Формирует финальный ответ в стандартизированном виде, включая:

  • текст ответа,

  • имя модели,
  • количество использованных токенов,
  • признак завершённости (final).

Работа с памятью (thread):🔗

  • Хранит thread_id, полученный от OpenAI, в собственной базе PostgreSQL, привязывая его к:

  • user_id (внутренний идентификатор пользователя),

  • sender (уникальный ID отправителя — например, group_id из VK или profile_id из WhatsApp),
  • service (название вызвавшего микросервиса),
  • дате создания.

  • Если диалог уже существует — используется сохранённый thread_id, что позволяет OpenAI сохранять контекст диалога и не начинать его заново.

Этот сервис создаёт изолированный и унифицированный доступ ко всем AI-взаимодействиям в системе, избавляя интеграционные слои от необходимости напрямую обращаться к OpenAI и повторно реализовывать логику управления thread/session.

Дополнительные возможности:🔗

  • Если в процессе обработки сообщения ассистент вызывает функцию (tool_call) — gpt_service самостоятельно обращается к tools_service, передаёт параметры вызова и возвращает результат в OpenAI, продолжая диалог.
  • Это позволяет реализовывать сценарии, в которых ChatGPT действует как посредник между пользователем и другими микросервисами, вызывая, например, оплату, получение баланса, создание ссылки и т.д.

2. Архитектура и компоненты🔗

gpt_service реализован на базе FastAPI с асинхронной обработкой запросов. Он предоставляет REST-интерфейс для получения ответов от ChatGPT через OpenAI Assistant API, а также управляет жизненным циклом thread (контекста диалога).

Используемые технологии:🔗

  • FastAPI — HTTP-интерфейс
  • Async SQLAlchemy — ORM
  • PostgreSQL — локальная БД для хранения thread_id
  • alembic — миграции с автогенерацией (revision --autogenerate)
  • OpenAI Python SDK (async) — взаимодействие с Assistant API
  • StreamingResponse — для стриминга ответов от ChatGPT
  • pythonjsonlogger — структурированное логирование

Поток данных:🔗

  • Вход: интеграционные сервисы (VK, WhatsApp, Widget и т.д.) отправляют запрос через HTTP POST на эндпоинты:

    • /send_message
    • /streaming_send_message
  • Обработка:

    • Проверяется наличие thread_id по паре user_id + sender.
    • Если нет — создаётся новый thread, сохраняется в БД.
    • Сообщение отправляется в OpenAI Assistant API.
    • При наличии tool_calls от GPT — вызывается tools_service, результаты возвращаются в OpenAI.
    • Генерируется стриминг-ответ через SSE (text/event-stream) или StreamingResponse (application/json).
  • Выход:

    • Ответ пользователя
    • Модель (openai_model)
    • Токены (input_tokens, output_tokens)
    • Признак завершённости (final)
    • Использованные функции (если были)

Работа с внешними системами:🔗

  • OpenAI — основной обработчик диалогов
  • tools_service — вызывается при наличии tool_call от ChatGPT

Особенности:🔗

  • Поддержка нескольких tool_call в одном ответе
  • Автоматическое повторное выполнение run с передачей результата инструментов
  • Обработка ошибок OpenAI: логируются и возвращаются клиенту с кодом 500
  • Ошибки tools_service также не блокируют работу: ChatGPT получает сообщение об ошибке вызова

Хранилище:🔗

Локальная таблица users, структура:

Поле Назначение
user_id Уникальный ID пользователя
sender Название интеграционного сервиса
thread_id ID контекста диалога в OpenAI
created_at Дата и время создания

(user_id, sender) — уникальная пара, по которой находится thread.


3. Основные модули и директории🔗

Исходный код микросервиса расположен в директории app/. Проект логически разбит на подмодули:

app/
├── main.py                 # Точка входа — FastAPI-приложение
├── schemas.py              # Pydantic-схемы запроса
├── config.py               # Загрузка переменных окружения
├── database.py             # Настройка подключения к PostgreSQL
├── dependencies.py         # Зависимости FastAPI (сессии и т.д.)
├── logger.py               # Конфигурация логгера (JSON + fallback)
├── migration/              # Миграции Alembic
├── users/
│   └── models.py           # SQLAlchemy-модель пользователя + thread
├── utils/
│   ├── crud.py                     # Работа с БД (create_user и др.)
│   ├── gpt_client.py              # Класс GPT — оболочка над OpenAI API
│   ├── execute_tools.py           # Вызов микросервиса tools_service
│   ├── process_gpt.py             # Обработка обычных запросов
│   ├── streaming_gpt_process.py  # Обработка стриминга чатгпт
│   ├── clear_answer.py           # Удаление мусора из ответа (ссылки на файлы где берёт инфу)
│   ├── safe_wait.py              # Повторные попытки и ожидание завершения run

Дополнительно:🔗

  • logs/ — директория, куда выводятся логи, если включено логирование в файл
  • .env, .env.example — переменные окружения и пример
  • alembic.ini — конфигурация alembic
  • docker-compose.yml — развёртывание и настройки контейнеров

Структура проекта отражает разделение ответственности: низкоуровневая работа с GPT, вызовы внешних функций и поточная обработка вынесены в отдельные модули, что облегчает поддержку и масштабирование.


4. Переменные окружения🔗

Обязательные и необязательные переменные🔗

Обязательные:🔗

Переменная Назначение
POSTGRES_USER PostgreSQL: логин
POSTGRES_PASSWORD PostgreSQL: пароль
POSTGRES_DB Имя базы данных
POSTGRES_PORT Порт PostgreSQL
POSTGRES_HOST Хост PostgreSQL
TOOLS_CALL_DOMAIN Адрес микросервиса tools_service

Опциональные (имеют значения по умолчанию):🔗

Переменная По умолчанию Назначение
LOG_LEVEL DEBUG Уровень логов
USE_JSON_ONLY false JSON-лог только в консоль
USE_TEXT_FILE_LOG true Включение текстового лога в файл
LOG_JSON_PATH logs/json_app.log Путь до JSON-лога
LOG_FILE_PATH logs/app.log Путь до текстового лога
MAX_LOG_SIZE_MB 5 Максимальный размер лог-файла (в МБ)
UVICORN_ERROR_LOG_PATH logs/uvicorn_error.log Путь до логов ошибок uvicorn

5. Взаимодействие с другими сервисами🔗

Микросервис gpt_service принимает HTTP-запросы от сервисов интеграций — vk_service, whatsapp_service, widget_service — и возвращает ответ от Assistant API OpenAI. Также при необходимости он вызывает tools_service, если ассистент инициирует вызов встроенной функции.


Запросы🔗

Сервис принимает два варианта взаимодействия:

  1. POST /send_message — обычный ответ (формально тоже стриминговый, на тот случай если ассистент выдаст несколько ответов).
  2. POST /streaming_send_message — стриминговый ответ (SSE), при котором клиент получает чанки в режиме реального времени.

Обработка: обычный ответ (/send_message)🔗

  1. Сначала проверяется наличие thread_id в БД (таблица users), по user_id и sender.

    • Если найден — используется.
    • Если нет — создаётся новый Thread, создаётся Run, пользователь добавляется в БД.
  2. Далее:

    • В Thread добавляется сообщение.
    • Запускается Run.
    • Асинхронно проверяется его завершение (до 60 сек, повторно при вызове функций).
    • Если статус Thread - requires_action, вызывается tools_service (для всех запрошенных функций), результат передаётся в OpenAI.
    • После завершения Run вытаскивается текст ответа (pretty_print).
  3. Результат:

{
  "answer": "Привет! Я могу помочь с разными задачами.",
  "final": false,
  "request_id": "abc-123",
  "input_tokens": 45,
  "output_tokens": 78,
  "openai_model": "gpt-4-1106-preview"
}
  1. Через несколько секунд повторно проверяется статус Run:

  2. Если status = completed, то возвращается:

{
  "final": true,
  "request_id": "abc-123"
}

Заметка

Хотя возврат оформлен как StreamingResponse, фактически клиент получает одно сообщение, но все же может отправиться еще ответы, если ассистент их выдал, на этот случай и реализован StreamingResponse.


Обработка: стриминговый ответ (/streaming_send_message)🔗

  1. Потоковая генерация ответа с использованием OpenAI stream=True.
  2. Ассистент может по ходу запроса вызвать tools_service, результат передаётся обратно в OpenAI.
  3. На выходе клиент получает:

  4. Чанки:

{"type": "chunk", "data": "Привет! Я м"}
  • Финальный блок:
{
  "type": "final",
  "functions": [],
  "data": "Привет! Я могу помочь с разными задачами.",
  "request_id": "abc-123",
  "input_tokens": 45,
  "output_tokens": 78,
  "openai_model": "gpt-4-1106-preview"
}

Особенности:🔗

  • gpt_service самостоятельно управляет созданием и продолжением Thread, обеспечивая "память" диалога.
  • Запоминает sender, чтобы не было коллизий идентификаторов между разными платформами.
  • Может вызывать tools_service для обработки assistant tools.
  • Возвращает подробную статистику: input_tokens, output_tokens, openai_model, functions.

6. Обработка данных и логика🔗

Входные данные🔗

Сервис получает POST-запрос на один из эндпоинтов (/send_message или /streaming_send_message) с телом формата MessageRequest:

{
  "assistant_key": "asst_123",
  "open_ai_key": "sk-...",
  "user_id": "abc123",
  "message": [{"type": "text", "text": "Привет"}],
  "sender": "whatsapp",
  "request_id": "req-001"
}

Основная логика🔗

  1. Поиск или создание треда:

    • Ищется запись в БД по user_id и sender.
    • Если не найдено — создаётся новый Thread в OpenAI, затем сохраняется в таблицу users.
    • Если найден — используется сохранённый thread_id.
  2. Отправка сообщения:

    • Создаётся сообщение в Thread (OpenAI API).
    • Запускается Run.
  3. Ожидание завершения:

    • Через safe_wait_on_run или stream_with_tools (в зависимости от режима).
    • Если requires_action, вызываются функции через tools_service, и результат передаётся в OpenAI.
  4. Извлечение ответа:

    • Извлекается либо весь ответ целиком (pretty_print), либо поступает в чанках (при стриминге).
    • Удаляются ссылки на файлы.
  5. Финальная стадия:

  6. Возвращается JSON с результатом + финальный блок с меткой final: true или type: final.


Ошибки🔗

  • Если Run не завершился за 60 секунд, выбрасывается TimeoutError.
  • Если статус Run = failed — возвращается сообщение об ошибке.
  • Все ошибки логируются через log.error(...).

Ответ клиенту🔗

  • В обычном режиме: JSON-строки, по одной на каждый блок ответа (StreamingResponse).
  • В стриминге: Server-Sent Events (SSE), каждая часть ответа отдаётся по мере генерации.

Переходим к следующему разделу.


7. API эндпоинты🔗

Сервис предоставляет два основных HTTP-эндпоинта для взаимодействия с OpenAI Assistant API. Оба эндпоинта используют схему MessageRequest.

POST /send_message🔗

Отправка сообщения в чат-бота и получение ответа. Используется там где не возможно реализовать потоковый вывод (т.е. везде кроме виджета).

Request:

{
  "assistant_key": "asst_123",
  "open_ai_key": "sk-...",
  "user_id": "abc123",
  "message": [{"type": "text", "text": "Привет"}],
  "sender": "whatsapp",
  "request_id": "req-001"
}

Response (streamed line-by-line, text/event-stream):

{
  "answer": "Привет! Чем могу помочь?",
  "final": false,
  "request_id": "req-001",
  "input_tokens": 17,
  "output_tokens": 23,
  "openai_model": "gpt-4"
}

Последний блок:

{
  "final": true,
  "request_id": "req-001"
}

Ответ формируется по частям и возвращается через StreamingResponse, каждая строка — JSON.


POST /streaming_send_message🔗

Реализация полной потоковой передачи данных (SSE), включая вызов tools_service, если ассистент инициирует ToolCall.

Request:

Такая же схема, как и для /send_message.

Response:

  • text/event-stream
  • Каждое событие отдаётся как:
data: {"type": "chunk", "data": "часть текста"}\n\n
  • Финальный блок:
data: {
  "type": "final",
  "functions": ["get_weather"],
  "data": "Температура в Москве 22°C",
  "request_id": "req-001",
  "input_tokens": 20,
  "output_tokens": 30,
  "openai_model": "gpt-4"
}\n\n

Примеры сообщений🔗

Message — список словарей, формат:

[
  {
    "type": "text",
    "text": "Какая погода в Москве?"
  }
]

Формат схемы запроса (MessageRequest)🔗

class MessageRequest(BaseModel):
    assistant_key: str
    open_ai_key: str
    user_id: str
    message: list[dict]
    sender: str
    request_id: str

8. Логирование и мониторинг🔗

gpt_service использует единый модуль логирования, применяемый во всех микросервисах проекта. Он реализован на базе стандартного logging с расширениями через pythonjsonlogger.

Так как везде используется один и тот же метод логирования, смотри его в документации data_service