Архитектура системы
Архитектура системы
Архитектурный стиль
Проект организован по принципам слоистой и частично hexagonal architecture. В каждом сервисе можно выделить:
core- доменные интерфейсы, перечисления, use cases или сервисы бизнес-логики.infrastructure- реализации хранилищ, брокеров, облачного storage, детекторов, тренеров и DI.presentation- HTTP-роутеры и Pydantic-схемы в API-сервисе.tests- модульные тесты сервисов, репозиториев, фабрик и use cases.
Зависимости в основном направлены внутрь: доменный слой описывает интерфейсы, инфраструктура их реализует, а presentation-слой вызывает сервисы через DI.
Компоненты
schemion-api
FastAPI-приложение. Основные функции:
- регистрация, вход и обновление JWT-токенов;
- создание и чтение датасетов, моделей и задач;
- загрузка файлов в MinIO;
- публикация сообщений в Bobber;
- выдача presigned URL для скачивания файлов;
- SSE-подписка на обновления задачи;
- административная панель на базе
sqladmin; - in-memory кэширование списков и отдельных объектов.
Маршруты подключаются в schemion-api/app/main.py:
/auth/datasets/models/tasks/admin
schemion-training
Фоновый Python-процесс, который запускается как python -m app.main. Он подписывается на training_queue через BobberClient. При получении сообщения воркер:
- открывает сессию PostgreSQL;
- создает репозитории задач, моделей и датасетов;
- загружает базовые веса модели из MinIO;
- загружает ZIP-датасет из MinIO и извлекает YAML-конфигурацию;
- создает тренер через
DetectorTrainerFactory; - запускает обучение;
- сохраняет обученную модель и метрики в MinIO;
- обновляет задачу в PostgreSQL;
- создает новую пользовательскую модель, связанную с базовой моделью и датасетом.
schemion-inference
Фоновый Python-процесс, который запускается как python -m app.infrastructure.main. Он подписывается на inference_queue. При получении сообщения воркер:
- загружает входное изображение из MinIO;
- загружает веса модели;
- создает детектор через
DetectorFactory; - разбивает изображение на тайлы;
- выполняет предсказание на каждом тайле;
- сдвигает координаты bounding boxes обратно в систему координат исходного изображения;
- сохраняет JSON-результат в MinIO;
- обновляет задачу в PostgreSQL.
Bobber
Bobber используется как брокер сообщений. API публикует JSON-сообщения в очереди:
inference_queuetraining_queue
Воркеры подписываются на соответствующие очереди и обрабатывают сообщения синхронно внутри callback.
PostgreSQL
PostgreSQL хранит метаданные: пользователей, роли, разрешения, датасеты, модели и задачи. Файлы в базе не хранятся; вместо этого сохраняются объектные пути MinIO.
MinIO
MinIO хранит:
- ZIP-архивы датасетов;
- входные изображения или PDF для инференса;
- веса моделей;
- JSON-метрики обучения;
- JSON-результаты инференса.
Поток создания задачи инференса
- Клиент вызывает
POST /tasks/create/inferenceсmodel_idи файлом. - API проверяет существование модели.
- API валидирует MIME-тип файла:
image/jpeg,image/pngилиapplication/pdf. - Файл сохраняется в бакет
schemas-images. - В таблице
tasksсоздается задача со статусомqueued. - API публикует сообщение в
inference_queue. - Inference-воркер меняет статус на
running. - Воркер выполняет предсказание и сохраняет результат в бакет
inference-results. - Воркер записывает
output_pathи переводит задачу вsucceeded. - При ошибке воркер записывает
error_msgи статусfailed.
Поток создания задачи обучения
- Клиент вызывает
POST /tasks/create/trainingсmodel_id,dataset_id,image_size,num_epochsиname. - API проверяет существование датасета.
- В таблице
tasksсоздается задача со статусомqueued. - API публикует сообщение в
training_queue. - Training-воркер загружает базовую модель и датасет.
- Воркер проверяет, что базовая модель является системной (
is_system=True). - Создается тренер: YOLO или Faster R-CNN.
- Выполняется обучение.
- Новые веса сохраняются в бакет
models. - Метрики сохраняются в бакет
metrics. - Создается новая запись в
modelsсis_system=False,base_model_idиdataset_id. - Задача переводится в
succeededилиfailed.
Статусы задач
Задача может находиться в одном из состояний:
queued- задача создана и ожидает обработки воркером.running- воркер начал обработку.succeeded- обработка завершена успешно.failed- обработка завершилась ошибкой, причина хранится вerror_msg.
Синхронизация результата с клиентом
Клиент может получать состояние задачи двумя способами:
- периодический REST-запрос
GET /tasks/{task_id}; - SSE-подписка
GET /tasks/subscribe/{task_id}.
SSE-эндпоинт раз в секунду читает задачу и отправляет событие task_update, пока статус не станет succeeded или failed.