mirror of
https://github.com/mauriceboe/TREK.git
synced 2026-06-30 18:46:00 +00:00
feat(import): parse bookings in the background with a progress widget
Parsing a booking can take a while on a CPU host, so don't hold the upload modal open for it. The async import endpoint returns a job id right away; the parse runs server-side (one at a time per user) and pushes progress over the user's WebSocket, and a small widget in the bottom corner tracks it while the user keeps navigating and editing. A finished job opens the per-item review from the widget.
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import {
|
||||
Controller,
|
||||
Post,
|
||||
Get,
|
||||
Body,
|
||||
Param,
|
||||
Headers,
|
||||
@@ -15,6 +16,7 @@ import type { User } from '../../types';
|
||||
import { JwtAuthGuard } from '../auth/jwt-auth.guard';
|
||||
import { CurrentUser } from '../auth/current-user.decorator';
|
||||
import { BookingImportService } from './booking-import.service';
|
||||
import { ImportJobsService } from './import-jobs.service';
|
||||
import { bookingImportModeSchema } from '@trek/shared';
|
||||
import type { BookingImportPreviewItem, BookingImportPreviewResponse, BookingImportConfirmResponse, BookingImportMode } from '@trek/shared';
|
||||
|
||||
@@ -30,7 +32,10 @@ const UPLOAD = {
|
||||
@Controller('api/trips/:tripId/reservations/import')
|
||||
@UseGuards(JwtAuthGuard)
|
||||
export class BookingImportController {
|
||||
constructor(private readonly bookingImport: BookingImportService) {}
|
||||
constructor(
|
||||
private readonly bookingImport: BookingImportService,
|
||||
private readonly importJobs: ImportJobsService,
|
||||
) {}
|
||||
|
||||
private requireTrip(tripId: string, user: User) {
|
||||
const trip = this.bookingImport.verifyTripAccess(tripId, user.id);
|
||||
@@ -44,6 +49,31 @@ export class BookingImportController {
|
||||
}
|
||||
}
|
||||
|
||||
/** Shared validation for both the sync and async import endpoints; returns the parsed mode. */
|
||||
private validateImport(tripId: string, user: User, files: Express.Multer.File[] | undefined, rawMode?: string): BookingImportMode {
|
||||
const trip = this.requireTrip(tripId, user);
|
||||
this.requireEdit(trip, user);
|
||||
|
||||
const modeResult = bookingImportModeSchema.safeParse(rawMode ?? 'no-ai');
|
||||
if (!modeResult.success) throw new HttpException({ error: 'Invalid mode' }, 400);
|
||||
const mode = modeResult.data;
|
||||
|
||||
if (mode === 'force-ai' && !this.bookingImport.aiAvailable(user.id)) {
|
||||
throw new HttpException({ error: 'AI parsing is not configured' }, 409);
|
||||
}
|
||||
if (mode === 'no-ai' && !this.bookingImport.isAvailable()) {
|
||||
throw new HttpException({ error: 'KItinerary extractor is not available on this server' }, 503);
|
||||
}
|
||||
if (!files || files.length === 0) throw new HttpException({ error: 'No files uploaded' }, 400);
|
||||
for (const f of files) {
|
||||
const ext = f.originalname.toLowerCase().slice(f.originalname.lastIndexOf('.'));
|
||||
if (!ACCEPTED_EXTS.has(ext)) {
|
||||
throw new HttpException({ error: `Unsupported file type: ${f.originalname}. Accepted: EML, PDF, PKPass, HTML, TXT` }, 400);
|
||||
}
|
||||
}
|
||||
return mode;
|
||||
}
|
||||
|
||||
/**
|
||||
* POST /api/trips/:tripId/reservations/import/booking
|
||||
* Accepts up to 5 booking confirmation files (EML, PDF, PKPass, HTML, TXT).
|
||||
@@ -56,39 +86,41 @@ export class BookingImportController {
|
||||
@Param('tripId') tripId: string,
|
||||
@UploadedFiles() files: Express.Multer.File[] | undefined,
|
||||
@Body('mode') rawMode?: string,
|
||||
) {
|
||||
const trip = this.requireTrip(tripId, user);
|
||||
this.requireEdit(trip, user);
|
||||
): Promise<BookingImportPreviewResponse> {
|
||||
const mode = this.validateImport(tripId, user, files, rawMode);
|
||||
return this.bookingImport.preview(files!, mode, user.id);
|
||||
}
|
||||
|
||||
const modeResult = bookingImportModeSchema.safeParse(rawMode ?? 'no-ai');
|
||||
if (!modeResult.success) {
|
||||
throw new HttpException({ error: 'Invalid mode' }, 400);
|
||||
}
|
||||
const mode: BookingImportMode = modeResult.data;
|
||||
/**
|
||||
* POST /api/trips/:tripId/reservations/import/booking/async
|
||||
* Same input as /booking, but returns a job id immediately and parses in the
|
||||
* background. Progress + completion are pushed over the user's WebSocket
|
||||
* (import:progress / import:done / import:error). Lets the upload modal close at
|
||||
* once and a background widget track the work while the user keeps navigating.
|
||||
*/
|
||||
@Post('booking/async')
|
||||
@UseInterceptors(FilesInterceptor('files', MAX_FILES, UPLOAD))
|
||||
async previewAsync(
|
||||
@CurrentUser() user: User,
|
||||
@Param('tripId') tripId: string,
|
||||
@UploadedFiles() files: Express.Multer.File[] | undefined,
|
||||
@Body('mode') rawMode?: string,
|
||||
): Promise<{ jobId: string }> {
|
||||
const mode = this.validateImport(tripId, user, files, rawMode);
|
||||
const jobId = this.importJobs.start(tripId, files!, mode, user.id);
|
||||
return { jobId };
|
||||
}
|
||||
|
||||
// Forcing AI requires it to be configured; otherwise surface a clear 4xx.
|
||||
if (mode === 'force-ai' && !this.bookingImport.aiAvailable(user.id)) {
|
||||
throw new HttpException({ error: 'AI parsing is not configured' }, 409);
|
||||
}
|
||||
// For the kitinerary-only path, keep the existing 503 contract.
|
||||
if (mode === 'no-ai' && !this.bookingImport.isAvailable()) {
|
||||
throw new HttpException({ error: 'KItinerary extractor is not available on this server' }, 503);
|
||||
}
|
||||
|
||||
if (!files || files.length === 0) {
|
||||
throw new HttpException({ error: 'No files uploaded' }, 400);
|
||||
}
|
||||
|
||||
// Validate extensions
|
||||
for (const f of files) {
|
||||
const ext = f.originalname.toLowerCase().slice(f.originalname.lastIndexOf('.'));
|
||||
if (!ACCEPTED_EXTS.has(ext)) {
|
||||
throw new HttpException({ error: `Unsupported file type: ${f.originalname}. Accepted: EML, PDF, PKPass, HTML, TXT` }, 400);
|
||||
}
|
||||
}
|
||||
|
||||
const result: BookingImportPreviewResponse = await this.bookingImport.preview(files, mode, user.id);
|
||||
return result;
|
||||
/**
|
||||
* GET /api/trips/:tripId/reservations/import/jobs/:jobId
|
||||
* Poll a background import job — recovery path for a client that missed the
|
||||
* WebSocket push (navigation, reconnect). 404 once the job has expired.
|
||||
*/
|
||||
@Get('jobs/:jobId')
|
||||
async jobStatus(@CurrentUser() user: User, @Param('jobId') jobId: string) {
|
||||
const job = this.importJobs.get(jobId, user.id);
|
||||
if (!job) throw new HttpException({ error: 'Job not found' }, 404);
|
||||
return { status: job.status, done: job.done, total: job.total, result: job.result, error: job.error };
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { BookingImportController } from './booking-import.controller';
|
||||
import { BookingImportService } from './booking-import.service';
|
||||
import { ImportJobsService } from './import-jobs.service';
|
||||
import { KitineraryExtractorService } from './kitinerary-extractor.service';
|
||||
import { FeaturesController } from './features.controller';
|
||||
import { LlmParseModule } from '../llm-parse/llm-parse.module';
|
||||
@@ -8,6 +9,6 @@ import { LlmParseModule } from '../llm-parse/llm-parse.module';
|
||||
@Module({
|
||||
imports: [LlmParseModule],
|
||||
controllers: [BookingImportController, FeaturesController],
|
||||
providers: [BookingImportService, KitineraryExtractorService],
|
||||
providers: [BookingImportService, KitineraryExtractorService, ImportJobsService],
|
||||
})
|
||||
export class BookingImportModule {}
|
||||
|
||||
@@ -65,6 +65,7 @@ export class BookingImportService {
|
||||
files: Express.Multer.File[],
|
||||
mode: BookingImportMode,
|
||||
userId: number,
|
||||
onProgress?: (done: number, total: number, fileName: string) => void,
|
||||
): Promise<BookingImportPreviewResponse> {
|
||||
const kitineraryAvailable = this.extractor.isAvailable();
|
||||
const aiAvailable = this.llmParse.isAvailable(userId);
|
||||
@@ -76,6 +77,7 @@ export class BookingImportService {
|
||||
const allWarnings: string[] = [];
|
||||
const fileReports: BookingImportFileReport[] = [];
|
||||
|
||||
let processed = 0;
|
||||
for (const file of files) {
|
||||
let kiItems: KiReservation[] = [];
|
||||
let aiUsed = false;
|
||||
@@ -102,14 +104,16 @@ export class BookingImportService {
|
||||
|
||||
if (kiItems.length === 0) {
|
||||
allWarnings.push(`${file.originalname}: no reservations found`);
|
||||
continue;
|
||||
} else {
|
||||
const { items, warnings } = mapReservations(kiItems, file.originalname);
|
||||
// LLM extraction is less certain than kitinerary — always flag for review.
|
||||
if (aiUsed) for (const it of items) it.needs_review = true;
|
||||
allItems.push(...items);
|
||||
allWarnings.push(...warnings);
|
||||
}
|
||||
|
||||
const { items, warnings } = mapReservations(kiItems, file.originalname);
|
||||
// LLM extraction is less certain than kitinerary — always flag for review.
|
||||
if (aiUsed) for (const it of items) it.needs_review = true;
|
||||
allItems.push(...items);
|
||||
allWarnings.push(...warnings);
|
||||
// Report per-file progress so a background import can drive a live widget.
|
||||
onProgress?.(++processed, files.length, file.originalname);
|
||||
}
|
||||
|
||||
return { items: allItems, warnings: allWarnings, files: fileReports };
|
||||
|
||||
@@ -0,0 +1,85 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import { broadcastToUser } from '../../websocket';
|
||||
import { BookingImportService } from './booking-import.service';
|
||||
import type { BookingImportMode, BookingImportPreviewResponse } from '@trek/shared';
|
||||
|
||||
type JobStatus = 'running' | 'done' | 'error';
|
||||
|
||||
interface ImportJob {
|
||||
id: string;
|
||||
tripId: string;
|
||||
userId: number;
|
||||
status: JobStatus;
|
||||
done: number;
|
||||
total: number;
|
||||
result?: BookingImportPreviewResponse;
|
||||
error?: string;
|
||||
createdAt: number;
|
||||
}
|
||||
|
||||
// Keep a finished job around briefly so a client that missed the WebSocket push
|
||||
// (navigation, reconnect) can still GET its result.
|
||||
const JOB_TTL_MS = 10 * 60_000;
|
||||
|
||||
/**
|
||||
* Runs a booking-import parse OFF the request: the controller returns a job id
|
||||
* immediately, the parse continues here, and progress/completion are pushed to the
|
||||
* user's sockets via `broadcastToUser` (which reaches them on ANY page, not just the
|
||||
* trip room). This is what lets the upload modal close at once and a background widget
|
||||
* track the work while the user keeps navigating. The actual parsing is the same
|
||||
* `BookingImportService.preview` the synchronous endpoint uses.
|
||||
*/
|
||||
@Injectable()
|
||||
export class ImportJobsService {
|
||||
private readonly jobs = new Map<string, ImportJob>();
|
||||
/** Tail of each user's job chain — parses run one at a time per user, not all at once. */
|
||||
private readonly chains = new Map<number, Promise<void>>();
|
||||
|
||||
constructor(private readonly bookingImport: BookingImportService) {}
|
||||
|
||||
/** Create a job and queue it behind the user's other parses; returns the job id at once. */
|
||||
start(tripId: string, files: Express.Multer.File[], mode: BookingImportMode, userId: number): string {
|
||||
const id = randomUUID();
|
||||
const job: ImportJob = { id, tripId, userId, status: 'running', done: 0, total: files.length, createdAt: Date.now() };
|
||||
this.jobs.set(id, job);
|
||||
// Chain onto the user's previous parse so they run sequentially (one CPU-heavy
|
||||
// inference at a time), while the request returns immediately.
|
||||
const prev = this.chains.get(userId) ?? Promise.resolve();
|
||||
const next = prev.then(() => this.run(job, files, mode)).catch(() => {});
|
||||
this.chains.set(userId, next);
|
||||
void next.finally(() => {
|
||||
if (this.chains.get(userId) === next) this.chains.delete(userId);
|
||||
});
|
||||
return id;
|
||||
}
|
||||
|
||||
get(id: string, userId: number): ImportJob | undefined {
|
||||
const job = this.jobs.get(id);
|
||||
return job && job.userId === userId ? job : undefined;
|
||||
}
|
||||
|
||||
private async run(job: ImportJob, files: Express.Multer.File[], mode: BookingImportMode): Promise<void> {
|
||||
this.push(job, 'import:progress', { status: 'running', done: 0, total: job.total });
|
||||
try {
|
||||
const result = await this.bookingImport.preview(files, mode, job.userId, (done, total, fileName) => {
|
||||
job.done = done;
|
||||
this.push(job, 'import:progress', { status: 'running', done, total, fileName });
|
||||
});
|
||||
job.status = 'done';
|
||||
job.result = result;
|
||||
this.push(job, 'import:done', { result });
|
||||
} catch (err) {
|
||||
job.status = 'error';
|
||||
job.error = err instanceof Error ? err.message : String(err);
|
||||
this.push(job, 'import:error', { message: job.error });
|
||||
} finally {
|
||||
const id = job.id;
|
||||
setTimeout(() => this.jobs.delete(id), JOB_TTL_MS).unref?.();
|
||||
}
|
||||
}
|
||||
|
||||
private push(job: ImportJob, type: string, payload: Record<string, unknown>): void {
|
||||
broadcastToUser(job.userId, { type, jobId: job.id, tripId: job.tripId, ...payload });
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user