gpt_service
1. Назначение🔗
gpt_service — микросервис, обеспечивающий асинхронный доступ к OpenAI Assistant API (/v1/assistants/...) для получения ответов от ChatGPT. Является промежуточным слоем между интеграционными сервисами (WhatsApp, VK, Widget и др.) и OpenAI, позволяя централизованно управлять обращениями к ассистенту и сохранять контекст диалога.
Основные функции:🔗
- Принимает входящие сообщения от интеграционных сервисов по REST API.
- Обеспечивает отправку сообщений в OpenAI Assistant API.
- Отдаёт ответ от ChatGPT в виде одного сообщения, либо через стриминг (
StreamingResponse) в случае длинных или составных ответов. - Отслеживает статус
thread, чтобы при необходимости получить дополнительные ответы (в течение нескольких секунд). -
Формирует финальный ответ в стандартизированном виде, включая:
-
текст ответа,
- имя модели,
- количество использованных токенов,
- признак завершённости (
final).
Работа с памятью (thread):🔗
-
Хранит
thread_id, полученный от OpenAI, в собственной базе PostgreSQL, привязывая его к: -
user_id(внутренний идентификатор пользователя), sender(уникальный ID отправителя — например,group_idиз VK илиprofile_idиз WhatsApp),service(название вызвавшего микросервиса),-
дате создания.
-
Если диалог уже существует — используется сохранённый
thread_id, что позволяет OpenAI сохранять контекст диалога и не начинать его заново.
Этот сервис создаёт изолированный и унифицированный доступ ко всем AI-взаимодействиям в системе, избавляя интеграционные слои от необходимости напрямую обращаться к OpenAI и повторно реализовывать логику управления thread/session.
Дополнительные возможности:🔗
- Если в процессе обработки сообщения ассистент вызывает функцию (
tool_call) —gpt_serviceсамостоятельно обращается кtools_service, передаёт параметры вызова и возвращает результат в OpenAI, продолжая диалог. - Это позволяет реализовывать сценарии, в которых ChatGPT действует как посредник между пользователем и другими микросервисами, вызывая, например, оплату, получение баланса, создание ссылки и т.д.
2. Архитектура и компоненты🔗
gpt_service реализован на базе FastAPI с асинхронной обработкой запросов. Он предоставляет REST-интерфейс для получения ответов от ChatGPT через OpenAI Assistant API, а также управляет жизненным циклом thread (контекста диалога).
Используемые технологии:🔗
- FastAPI — HTTP-интерфейс
- Async SQLAlchemy — ORM
- PostgreSQL — локальная БД для хранения
thread_id - alembic — миграции с автогенерацией (
revision --autogenerate) - OpenAI Python SDK (async) — взаимодействие с Assistant API
- StreamingResponse — для стриминга ответов от ChatGPT
- pythonjsonlogger — структурированное логирование
Поток данных:🔗
-
Вход: интеграционные сервисы (VK, WhatsApp, Widget и т.д.) отправляют запрос через HTTP
POSTна эндпоинты:/send_message/streaming_send_message
-
Обработка:
- Проверяется наличие
thread_idпо пареuser_id + sender. - Если нет — создаётся новый
thread, сохраняется в БД. - Сообщение отправляется в OpenAI Assistant API.
- При наличии
tool_callsот GPT — вызываетсяtools_service, результаты возвращаются в OpenAI. - Генерируется стриминг-ответ через SSE (
text/event-stream) или StreamingResponse (application/json).
- Проверяется наличие
-
Выход:
- Ответ пользователя
- Модель (
openai_model) - Токены (
input_tokens,output_tokens) - Признак завершённости (
final) - Использованные функции (если были)
Работа с внешними системами:🔗
- OpenAI — основной обработчик диалогов
- tools_service — вызывается при наличии
tool_callот ChatGPT
Особенности:🔗
- Поддержка нескольких
tool_callв одном ответе - Автоматическое повторное выполнение
runс передачей результата инструментов - Обработка ошибок OpenAI: логируются и возвращаются клиенту с кодом 500
- Ошибки
tools_serviceтакже не блокируют работу: ChatGPT получает сообщение об ошибке вызова
Хранилище:🔗
Локальная таблица users, структура:
| Поле | Назначение |
|---|---|
user_id | Уникальный ID пользователя |
sender | Название интеграционного сервиса |
thread_id | ID контекста диалога в OpenAI |
created_at | Дата и время создания |
(user_id, sender) — уникальная пара, по которой находится thread.
3. Основные модули и директории🔗
Исходный код микросервиса расположен в директории app/. Проект логически разбит на подмодули:
app/
├── main.py # Точка входа — FastAPI-приложение
├── schemas.py # Pydantic-схемы запроса
├── config.py # Загрузка переменных окружения
├── database.py # Настройка подключения к PostgreSQL
├── dependencies.py # Зависимости FastAPI (сессии и т.д.)
├── logger.py # Конфигурация логгера (JSON + fallback)
├── migration/ # Миграции Alembic
│
├── users/
│ └── models.py # SQLAlchemy-модель пользователя + thread
│
├── utils/
│ ├── crud.py # Работа с БД (create_user и др.)
│ ├── gpt_client.py # Класс GPT — оболочка над OpenAI API
│ ├── execute_tools.py # Вызов микросервиса tools_service
│ ├── process_gpt.py # Обработка обычных запросов
│ ├── streaming_gpt_process.py # Обработка стриминга чатгпт
│ ├── clear_answer.py # Удаление мусора из ответа (ссылки на файлы где берёт инфу)
│ ├── safe_wait.py # Повторные попытки и ожидание завершения run
Дополнительно:🔗
logs/— директория, куда выводятся логи, если включено логирование в файл.env,.env.example— переменные окружения и примерalembic.ini— конфигурация alembicdocker-compose.yml— развёртывание и настройки контейнеров
Структура проекта отражает разделение ответственности: низкоуровневая работа с GPT, вызовы внешних функций и поточная обработка вынесены в отдельные модули, что облегчает поддержку и масштабирование.
4. Переменные окружения🔗
Обязательные и необязательные переменные🔗
Обязательные:🔗
| Переменная | Назначение |
|---|---|
POSTGRES_USER | PostgreSQL: логин |
POSTGRES_PASSWORD | PostgreSQL: пароль |
POSTGRES_DB | Имя базы данных |
POSTGRES_PORT | Порт PostgreSQL |
POSTGRES_HOST | Хост PostgreSQL |
TOOLS_CALL_DOMAIN | Адрес микросервиса tools_service |
Опциональные (имеют значения по умолчанию):🔗
| Переменная | По умолчанию | Назначение |
|---|---|---|
LOG_LEVEL | DEBUG | Уровень логов |
USE_JSON_ONLY | false | JSON-лог только в консоль |
USE_TEXT_FILE_LOG | true | Включение текстового лога в файл |
LOG_JSON_PATH | logs/json_app.log | Путь до JSON-лога |
LOG_FILE_PATH | logs/app.log | Путь до текстового лога |
MAX_LOG_SIZE_MB | 5 | Максимальный размер лог-файла (в МБ) |
UVICORN_ERROR_LOG_PATH | logs/uvicorn_error.log | Путь до логов ошибок uvicorn |
5. Взаимодействие с другими сервисами🔗
Микросервис gpt_service принимает HTTP-запросы от сервисов интеграций — vk_service, whatsapp_service, widget_service — и возвращает ответ от Assistant API OpenAI. Также при необходимости он вызывает tools_service, если ассистент инициирует вызов встроенной функции.
Запросы🔗
Сервис принимает два варианта взаимодействия:
- POST
/send_message— обычный ответ (формально тоже стриминговый, на тот случай если ассистент выдаст несколько ответов). - POST
/streaming_send_message— стриминговый ответ (SSE), при котором клиент получает чанки в режиме реального времени.
Обработка: обычный ответ (/send_message)🔗
-
Сначала проверяется наличие
thread_idв БД (таблицаusers), поuser_idиsender.- Если найден — используется.
- Если нет — создаётся новый Thread, создаётся Run, пользователь добавляется в БД.
-
Далее:
- В Thread добавляется сообщение.
- Запускается Run.
- Асинхронно проверяется его завершение (до 60 сек, повторно при вызове функций).
- Если статус
Thread-requires_action, вызываетсяtools_service(для всех запрошенных функций), результат передаётся в OpenAI. - После завершения Run вытаскивается текст ответа (
pretty_print).
-
Результат:
{
"answer": "Привет! Я могу помочь с разными задачами.",
"final": false,
"request_id": "abc-123",
"input_tokens": 45,
"output_tokens": 78,
"openai_model": "gpt-4-1106-preview"
}
-
Через несколько секунд повторно проверяется статус Run:
-
Если
status = completed, то возвращается:
Заметка
Хотя возврат оформлен как StreamingResponse, фактически клиент получает одно сообщение, но все же может отправиться еще ответы, если ассистент их выдал, на этот случай и реализован StreamingResponse.
Обработка: стриминговый ответ (/streaming_send_message)🔗
- Потоковая генерация ответа с использованием OpenAI
stream=True. - Ассистент может по ходу запроса вызвать
tools_service, результат передаётся обратно в OpenAI. -
На выходе клиент получает:
-
Чанки:
- Финальный блок:
{
"type": "final",
"functions": [],
"data": "Привет! Я могу помочь с разными задачами.",
"request_id": "abc-123",
"input_tokens": 45,
"output_tokens": 78,
"openai_model": "gpt-4-1106-preview"
}
Особенности:🔗
gpt_serviceсамостоятельно управляет созданием и продолжениемThread, обеспечивая "память" диалога.- Запоминает
sender, чтобы не было коллизий идентификаторов между разными платформами. - Может вызывать
tools_serviceдля обработки assistant tools. - Возвращает подробную статистику:
input_tokens,output_tokens,openai_model,functions.
6. Обработка данных и логика🔗
Входные данные🔗
Сервис получает POST-запрос на один из эндпоинтов (/send_message или /streaming_send_message) с телом формата MessageRequest:
{
"assistant_key": "asst_123",
"open_ai_key": "sk-...",
"user_id": "abc123",
"message": [{"type": "text", "text": "Привет"}],
"sender": "whatsapp",
"request_id": "req-001"
}
Основная логика🔗
-
Поиск или создание треда:
- Ищется запись в БД по
user_idиsender. - Если не найдено — создаётся новый Thread в OpenAI, затем сохраняется в таблицу
users. - Если найден — используется сохранённый
thread_id.
- Ищется запись в БД по
-
Отправка сообщения:
- Создаётся сообщение в Thread (OpenAI API).
- Запускается
Run.
-
Ожидание завершения:
- Через
safe_wait_on_runилиstream_with_tools(в зависимости от режима). - Если
requires_action, вызываются функции черезtools_service, и результат передаётся в OpenAI.
- Через
-
Извлечение ответа:
- Извлекается либо весь ответ целиком (
pretty_print), либо поступает в чанках (при стриминге). - Удаляются ссылки на файлы.
- Извлекается либо весь ответ целиком (
-
Финальная стадия:
-
Возвращается JSON с результатом + финальный блок с меткой
final: trueилиtype: final.
Ошибки🔗
- Если Run не завершился за 60 секунд, выбрасывается
TimeoutError. - Если статус Run =
failed— возвращается сообщение об ошибке. - Все ошибки логируются через
log.error(...).
Ответ клиенту🔗
- В обычном режиме: JSON-строки, по одной на каждый блок ответа (
StreamingResponse). - В стриминге: Server-Sent Events (SSE), каждая часть ответа отдаётся по мере генерации.
Переходим к следующему разделу.
7. API эндпоинты🔗
Сервис предоставляет два основных HTTP-эндпоинта для взаимодействия с OpenAI Assistant API. Оба эндпоинта используют схему MessageRequest.
POST /send_message🔗
Отправка сообщения в чат-бота и получение ответа. Используется там где не возможно реализовать потоковый вывод (т.е. везде кроме виджета).
Request:
{
"assistant_key": "asst_123",
"open_ai_key": "sk-...",
"user_id": "abc123",
"message": [{"type": "text", "text": "Привет"}],
"sender": "whatsapp",
"request_id": "req-001"
}
Response (streamed line-by-line, text/event-stream):
{
"answer": "Привет! Чем могу помочь?",
"final": false,
"request_id": "req-001",
"input_tokens": 17,
"output_tokens": 23,
"openai_model": "gpt-4"
}
Последний блок:
Ответ формируется по частям и возвращается через
StreamingResponse, каждая строка — JSON.
POST /streaming_send_message🔗
Реализация полной потоковой передачи данных (SSE), включая вызов tools_service, если ассистент инициирует ToolCall.
Request:
Такая же схема, как и для /send_message.
Response:
text/event-stream- Каждое событие отдаётся как:
- Финальный блок:
data: {
"type": "final",
"functions": ["get_weather"],
"data": "Температура в Москве 22°C",
"request_id": "req-001",
"input_tokens": 20,
"output_tokens": 30,
"openai_model": "gpt-4"
}\n\n
Примеры сообщений🔗
Message — список словарей, формат:
Формат схемы запроса (MessageRequest)🔗
class MessageRequest(BaseModel):
assistant_key: str
open_ai_key: str
user_id: str
message: list[dict]
sender: str
request_id: str
8. Логирование и мониторинг🔗
gpt_service использует единый модуль логирования, применяемый во всех микросервисах проекта. Он реализован на базе стандартного logging с расширениями через pythonjsonlogger.
Так как везде используется один и тот же метод логирования, смотри его в документации data_service