feat(import): parse bookings in the background with a progress widget

Parsing a booking can take a while on a CPU host, so don't hold the
upload modal open for it. The async import endpoint returns a job id
right away; the parse runs server-side (one at a time per user) and
pushes progress over the user's WebSocket, and a small widget in the
bottom corner tracks it while the user keeps navigating and editing.
A finished job opens the per-item review from the widget.
This commit is contained in:
Maurice
2026-06-25 23:56:21 +02:00
parent c92c6bc07c
commit 628830011d
10 changed files with 452 additions and 354 deletions
@@ -1,6 +1,7 @@
import {
Controller,
Post,
Get,
Body,
Param,
Headers,
@@ -15,6 +16,7 @@ import type { User } from '../../types';
import { JwtAuthGuard } from '../auth/jwt-auth.guard';
import { CurrentUser } from '../auth/current-user.decorator';
import { BookingImportService } from './booking-import.service';
import { ImportJobsService } from './import-jobs.service';
import { bookingImportModeSchema } from '@trek/shared';
import type { BookingImportPreviewItem, BookingImportPreviewResponse, BookingImportConfirmResponse, BookingImportMode } from '@trek/shared';
@@ -30,7 +32,10 @@ const UPLOAD = {
@Controller('api/trips/:tripId/reservations/import')
@UseGuards(JwtAuthGuard)
export class BookingImportController {
constructor(private readonly bookingImport: BookingImportService) {}
constructor(
private readonly bookingImport: BookingImportService,
private readonly importJobs: ImportJobsService,
) {}
private requireTrip(tripId: string, user: User) {
const trip = this.bookingImport.verifyTripAccess(tripId, user.id);
@@ -44,6 +49,31 @@ export class BookingImportController {
}
}
/** Shared validation for both the sync and async import endpoints; returns the parsed mode. */
private validateImport(tripId: string, user: User, files: Express.Multer.File[] | undefined, rawMode?: string): BookingImportMode {
const trip = this.requireTrip(tripId, user);
this.requireEdit(trip, user);
const modeResult = bookingImportModeSchema.safeParse(rawMode ?? 'no-ai');
if (!modeResult.success) throw new HttpException({ error: 'Invalid mode' }, 400);
const mode = modeResult.data;
if (mode === 'force-ai' && !this.bookingImport.aiAvailable(user.id)) {
throw new HttpException({ error: 'AI parsing is not configured' }, 409);
}
if (mode === 'no-ai' && !this.bookingImport.isAvailable()) {
throw new HttpException({ error: 'KItinerary extractor is not available on this server' }, 503);
}
if (!files || files.length === 0) throw new HttpException({ error: 'No files uploaded' }, 400);
for (const f of files) {
const ext = f.originalname.toLowerCase().slice(f.originalname.lastIndexOf('.'));
if (!ACCEPTED_EXTS.has(ext)) {
throw new HttpException({ error: `Unsupported file type: ${f.originalname}. Accepted: EML, PDF, PKPass, HTML, TXT` }, 400);
}
}
return mode;
}
/**
* POST /api/trips/:tripId/reservations/import/booking
* Accepts up to 5 booking confirmation files (EML, PDF, PKPass, HTML, TXT).
@@ -56,39 +86,41 @@ export class BookingImportController {
@Param('tripId') tripId: string,
@UploadedFiles() files: Express.Multer.File[] | undefined,
@Body('mode') rawMode?: string,
) {
const trip = this.requireTrip(tripId, user);
this.requireEdit(trip, user);
): Promise<BookingImportPreviewResponse> {
const mode = this.validateImport(tripId, user, files, rawMode);
return this.bookingImport.preview(files!, mode, user.id);
}
const modeResult = bookingImportModeSchema.safeParse(rawMode ?? 'no-ai');
if (!modeResult.success) {
throw new HttpException({ error: 'Invalid mode' }, 400);
}
const mode: BookingImportMode = modeResult.data;
/**
* POST /api/trips/:tripId/reservations/import/booking/async
* Same input as /booking, but returns a job id immediately and parses in the
* background. Progress + completion are pushed over the user's WebSocket
* (import:progress / import:done / import:error). Lets the upload modal close at
* once and a background widget track the work while the user keeps navigating.
*/
@Post('booking/async')
@UseInterceptors(FilesInterceptor('files', MAX_FILES, UPLOAD))
async previewAsync(
@CurrentUser() user: User,
@Param('tripId') tripId: string,
@UploadedFiles() files: Express.Multer.File[] | undefined,
@Body('mode') rawMode?: string,
): Promise<{ jobId: string }> {
const mode = this.validateImport(tripId, user, files, rawMode);
const jobId = this.importJobs.start(tripId, files!, mode, user.id);
return { jobId };
}
// Forcing AI requires it to be configured; otherwise surface a clear 4xx.
if (mode === 'force-ai' && !this.bookingImport.aiAvailable(user.id)) {
throw new HttpException({ error: 'AI parsing is not configured' }, 409);
}
// For the kitinerary-only path, keep the existing 503 contract.
if (mode === 'no-ai' && !this.bookingImport.isAvailable()) {
throw new HttpException({ error: 'KItinerary extractor is not available on this server' }, 503);
}
if (!files || files.length === 0) {
throw new HttpException({ error: 'No files uploaded' }, 400);
}
// Validate extensions
for (const f of files) {
const ext = f.originalname.toLowerCase().slice(f.originalname.lastIndexOf('.'));
if (!ACCEPTED_EXTS.has(ext)) {
throw new HttpException({ error: `Unsupported file type: ${f.originalname}. Accepted: EML, PDF, PKPass, HTML, TXT` }, 400);
}
}
const result: BookingImportPreviewResponse = await this.bookingImport.preview(files, mode, user.id);
return result;
/**
* GET /api/trips/:tripId/reservations/import/jobs/:jobId
* Poll a background import job — recovery path for a client that missed the
* WebSocket push (navigation, reconnect). 404 once the job has expired.
*/
@Get('jobs/:jobId')
async jobStatus(@CurrentUser() user: User, @Param('jobId') jobId: string) {
const job = this.importJobs.get(jobId, user.id);
if (!job) throw new HttpException({ error: 'Job not found' }, 404);
return { status: job.status, done: job.done, total: job.total, result: job.result, error: job.error };
}
/**
@@ -1,6 +1,7 @@
import { Module } from '@nestjs/common';
import { BookingImportController } from './booking-import.controller';
import { BookingImportService } from './booking-import.service';
import { ImportJobsService } from './import-jobs.service';
import { KitineraryExtractorService } from './kitinerary-extractor.service';
import { FeaturesController } from './features.controller';
import { LlmParseModule } from '../llm-parse/llm-parse.module';
@@ -8,6 +9,6 @@ import { LlmParseModule } from '../llm-parse/llm-parse.module';
@Module({
imports: [LlmParseModule],
controllers: [BookingImportController, FeaturesController],
providers: [BookingImportService, KitineraryExtractorService],
providers: [BookingImportService, KitineraryExtractorService, ImportJobsService],
})
export class BookingImportModule {}
@@ -65,6 +65,7 @@ export class BookingImportService {
files: Express.Multer.File[],
mode: BookingImportMode,
userId: number,
onProgress?: (done: number, total: number, fileName: string) => void,
): Promise<BookingImportPreviewResponse> {
const kitineraryAvailable = this.extractor.isAvailable();
const aiAvailable = this.llmParse.isAvailable(userId);
@@ -76,6 +77,7 @@ export class BookingImportService {
const allWarnings: string[] = [];
const fileReports: BookingImportFileReport[] = [];
let processed = 0;
for (const file of files) {
let kiItems: KiReservation[] = [];
let aiUsed = false;
@@ -102,14 +104,16 @@ export class BookingImportService {
if (kiItems.length === 0) {
allWarnings.push(`${file.originalname}: no reservations found`);
continue;
} else {
const { items, warnings } = mapReservations(kiItems, file.originalname);
// LLM extraction is less certain than kitinerary — always flag for review.
if (aiUsed) for (const it of items) it.needs_review = true;
allItems.push(...items);
allWarnings.push(...warnings);
}
const { items, warnings } = mapReservations(kiItems, file.originalname);
// LLM extraction is less certain than kitinerary — always flag for review.
if (aiUsed) for (const it of items) it.needs_review = true;
allItems.push(...items);
allWarnings.push(...warnings);
// Report per-file progress so a background import can drive a live widget.
onProgress?.(++processed, files.length, file.originalname);
}
return { items: allItems, warnings: allWarnings, files: fileReports };
@@ -0,0 +1,85 @@
import { Injectable } from '@nestjs/common';
import { randomUUID } from 'node:crypto';
import { broadcastToUser } from '../../websocket';
import { BookingImportService } from './booking-import.service';
import type { BookingImportMode, BookingImportPreviewResponse } from '@trek/shared';
type JobStatus = 'running' | 'done' | 'error';
interface ImportJob {
id: string;
tripId: string;
userId: number;
status: JobStatus;
done: number;
total: number;
result?: BookingImportPreviewResponse;
error?: string;
createdAt: number;
}
// Keep a finished job around briefly so a client that missed the WebSocket push
// (navigation, reconnect) can still GET its result.
const JOB_TTL_MS = 10 * 60_000;
/**
* Runs a booking-import parse OFF the request: the controller returns a job id
* immediately, the parse continues here, and progress/completion are pushed to the
* user's sockets via `broadcastToUser` (which reaches them on ANY page, not just the
* trip room). This is what lets the upload modal close at once and a background widget
* track the work while the user keeps navigating. The actual parsing is the same
* `BookingImportService.preview` the synchronous endpoint uses.
*/
@Injectable()
export class ImportJobsService {
private readonly jobs = new Map<string, ImportJob>();
/** Tail of each user's job chain — parses run one at a time per user, not all at once. */
private readonly chains = new Map<number, Promise<void>>();
constructor(private readonly bookingImport: BookingImportService) {}
/** Create a job and queue it behind the user's other parses; returns the job id at once. */
start(tripId: string, files: Express.Multer.File[], mode: BookingImportMode, userId: number): string {
const id = randomUUID();
const job: ImportJob = { id, tripId, userId, status: 'running', done: 0, total: files.length, createdAt: Date.now() };
this.jobs.set(id, job);
// Chain onto the user's previous parse so they run sequentially (one CPU-heavy
// inference at a time), while the request returns immediately.
const prev = this.chains.get(userId) ?? Promise.resolve();
const next = prev.then(() => this.run(job, files, mode)).catch(() => {});
this.chains.set(userId, next);
void next.finally(() => {
if (this.chains.get(userId) === next) this.chains.delete(userId);
});
return id;
}
get(id: string, userId: number): ImportJob | undefined {
const job = this.jobs.get(id);
return job && job.userId === userId ? job : undefined;
}
private async run(job: ImportJob, files: Express.Multer.File[], mode: BookingImportMode): Promise<void> {
this.push(job, 'import:progress', { status: 'running', done: 0, total: job.total });
try {
const result = await this.bookingImport.preview(files, mode, job.userId, (done, total, fileName) => {
job.done = done;
this.push(job, 'import:progress', { status: 'running', done, total, fileName });
});
job.status = 'done';
job.result = result;
this.push(job, 'import:done', { result });
} catch (err) {
job.status = 'error';
job.error = err instanceof Error ? err.message : String(err);
this.push(job, 'import:error', { message: job.error });
} finally {
const id = job.id;
setTimeout(() => this.jobs.delete(id), JOB_TTL_MS).unref?.();
}
}
private push(job: ImportJob, type: string, payload: Record<string, unknown>): void {
broadcastToUser(job.userId, { type, jobId: job.id, tripId: job.tripId, ...payload });
}
}