From e5929389b4e6798c635a9f3d35d5348991c1aaf7 Mon Sep 17 00:00:00 2001 From: WithoutPants <53250216+WithoutPants@users.noreply.github.com> Date: Thu, 14 Mar 2024 11:06:23 +1100 Subject: [PATCH] Make migration an asynchronous task (#4666) * Add failed state and error to Job * Move migration code * Add websocket monitor * Make migrate a job managed task --- graphql/schema/schema.graphql | 4 +- graphql/schema/types/job.graphql | 2 + internal/api/resolver_mutation_configure.go | 5 - internal/api/resolver_mutation_migrate.go | 13 ++ internal/api/resolver_query_job.go | 1 + internal/manager/manager.go | 46 ----- internal/manager/manager_tasks.go | 60 +++--- internal/manager/task/clean_generated.go | 11 +- internal/manager/task/migrate.go | 153 ++++++++++++++ internal/manager/task/migrate_blobs.go | 13 +- .../manager/task/migrate_scene_screenshots.go | 12 +- internal/manager/task/packages.go | 18 +- internal/manager/task_autotag.go | 3 +- internal/manager/task_clean.go | 5 +- internal/manager/task_generate.go | 5 +- internal/manager/task_identify.go | 11 +- internal/manager/task_optimise.go | 14 +- internal/manager/task_plugin.go | 12 +- internal/manager/task_scan.go | 7 +- pkg/job/job.go | 19 +- pkg/job/manager.go | 5 +- pkg/job/manager_test.go | 4 +- pkg/sqlite/database.go | 129 +----------- pkg/sqlite/migrate.go | 188 ++++++++++++++++++ ui/v2.5/graphql/data/job.graphql | 1 + ui/v2.5/graphql/subscriptions.graphql | 1 + ui/v2.5/src/App.tsx | 2 + ui/v2.5/src/ConnectionMonitor.tsx | 34 ++++ .../components/Settings/Tasks/JobTable.tsx | 13 +- ui/v2.5/src/components/Settings/styles.scss | 8 + ui/v2.5/src/components/Setup/Migrate.tsx | 62 +++++- ui/v2.5/src/components/Setup/styles.scss | 17 ++ ui/v2.5/src/core/StashService.ts | 39 +++- ui/v2.5/src/core/createClient.ts | 19 +- ui/v2.5/src/locales/en-GB.json | 4 + ui/v2.5/src/utils/job.ts | 57 ++++-- 36 files changed, 693 insertions(+), 304 deletions(-) create mode 100644 internal/manager/task/migrate.go create mode 100644 pkg/sqlite/migrate.go create mode 100644 ui/v2.5/src/ConnectionMonitor.tsx diff --git a/graphql/schema/schema.graphql b/graphql/schema/schema.graphql index 706652d3c..f4866cb71 100644 --- a/graphql/schema/schema.graphql +++ b/graphql/schema/schema.graphql @@ -226,7 +226,9 @@ type Query { type Mutation { setup(input: SetupInput!): Boolean! - migrate(input: MigrateInput!): Boolean! + + "Migrates the schema to the required version. Returns the job ID" + migrate(input: MigrateInput!): ID! sceneCreate(input: SceneCreateInput!): Scene sceneUpdate(input: SceneUpdateInput!): Scene diff --git a/graphql/schema/types/job.graphql b/graphql/schema/types/job.graphql index ba8b09dbf..0bdd80704 100644 --- a/graphql/schema/types/job.graphql +++ b/graphql/schema/types/job.graphql @@ -4,6 +4,7 @@ enum JobStatus { FINISHED STOPPING CANCELLED + FAILED } type Job { @@ -15,6 +16,7 @@ type Job { startTime: Time endTime: Time addTime: Time! + error: String } input FindJobInput { diff --git a/internal/api/resolver_mutation_configure.go b/internal/api/resolver_mutation_configure.go index 03df50329..f5f3af47f 100644 --- a/internal/api/resolver_mutation_configure.go +++ b/internal/api/resolver_mutation_configure.go @@ -22,11 +22,6 @@ func (r *mutationResolver) Setup(ctx context.Context, input manager.SetupInput) return err == nil, err } -func (r *mutationResolver) Migrate(ctx context.Context, input manager.MigrateInput) (bool, error) { - err := manager.GetInstance().Migrate(ctx, input) - return err == nil, err -} - func (r *mutationResolver) ConfigureGeneral(ctx context.Context, input ConfigGeneralInput) (*ConfigGeneralResult, error) { c := config.GetInstance() diff --git a/internal/api/resolver_mutation_migrate.go b/internal/api/resolver_mutation_migrate.go index 26a56a1b8..083d307e9 100644 --- a/internal/api/resolver_mutation_migrate.go +++ b/internal/api/resolver_mutation_migrate.go @@ -38,3 +38,16 @@ func (r *mutationResolver) MigrateBlobs(ctx context.Context, input MigrateBlobsI return strconv.Itoa(jobID), nil } + +func (r *mutationResolver) Migrate(ctx context.Context, input manager.MigrateInput) (string, error) { + mgr := manager.GetInstance() + t := &task.MigrateJob{ + BackupPath: input.BackupPath, + Config: mgr.Config, + Database: mgr.Database, + } + + jobID := mgr.JobManager.Add(ctx, "Migrating database...", t) + + return strconv.Itoa(jobID), nil +} diff --git a/internal/api/resolver_query_job.go b/internal/api/resolver_query_job.go index aaa671013..0e1222445 100644 --- a/internal/api/resolver_query_job.go +++ b/internal/api/resolver_query_job.go @@ -41,6 +41,7 @@ func jobToJobModel(j job.Job) *Job { StartTime: j.StartTime, EndTime: j.EndTime, AddTime: j.AddTime, + Error: j.Error, } if j.Progress != -1 { diff --git a/internal/manager/manager.go b/internal/manager/manager.go index cff2b439c..19b700344 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -303,52 +303,6 @@ func (s *Manager) validateFFmpeg() error { return nil } -func (s *Manager) Migrate(ctx context.Context, input MigrateInput) error { - database := s.Database - - // always backup so that we can roll back to the previous version if - // migration fails - backupPath := input.BackupPath - if backupPath == "" { - backupPath = database.DatabaseBackupPath(s.Config.GetBackupDirectoryPath()) - } else { - // check if backup path is a filename or path - // filename goes into backup directory, path is kept as is - filename := filepath.Base(backupPath) - if backupPath == filename { - backupPath = filepath.Join(s.Config.GetBackupDirectoryPathOrDefault(), filename) - } - } - - // perform database backup - if err := database.Backup(backupPath); err != nil { - return fmt.Errorf("error backing up database: %s", err) - } - - if err := database.RunMigrations(); err != nil { - errStr := fmt.Sprintf("error performing migration: %s", err) - - // roll back to the backed up version - restoreErr := database.RestoreFromBackup(backupPath) - if restoreErr != nil { - errStr = fmt.Sprintf("ERROR: unable to restore database from backup after migration failure: %s\n%s", restoreErr.Error(), errStr) - } else { - errStr = "An error occurred migrating the database to the latest schema version. The backup database file was automatically renamed to restore the database.\n" + errStr - } - - return errors.New(errStr) - } - - // if no backup path was provided, then delete the created backup - if input.BackupPath == "" { - if err := os.Remove(backupPath); err != nil { - logger.Warnf("error removing unwanted database backup (%s): %s", backupPath, err.Error()) - } - } - - return nil -} - func (s *Manager) BackupDatabase(download bool) (string, string, error) { var backupPath string var backupName string diff --git a/internal/manager/manager_tasks.go b/internal/manager/manager_tasks.go index 9a9037d9f..dd2b9dcc2 100644 --- a/internal/manager/manager_tasks.go +++ b/internal/manager/manager_tasks.go @@ -136,7 +136,7 @@ func (s *Manager) Import(ctx context.Context) (int, error) { return 0, errors.New("metadata path must be set in config") } - j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) { + j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) error { task := ImportTask{ repository: s.Repository, resetter: s.Database, @@ -147,6 +147,9 @@ func (s *Manager) Import(ctx context.Context) (int, error) { fileNamingAlgorithm: config.GetVideoFileNamingAlgorithm(), } task.Start(ctx) + + // TODO - return error from task + return nil }) return s.JobManager.Add(ctx, "Importing...", j), nil @@ -159,7 +162,7 @@ func (s *Manager) Export(ctx context.Context) (int, error) { return 0, errors.New("metadata path must be set in config") } - j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) { + j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) error { var wg sync.WaitGroup wg.Add(1) task := ExportTask{ @@ -168,6 +171,8 @@ func (s *Manager) Export(ctx context.Context) (int, error) { fileNamingAlgorithm: config.GetVideoFileNamingAlgorithm(), } task.Start(ctx, &wg) + // TODO - return error from task + return nil }) return s.JobManager.Add(ctx, "Exporting...", j), nil @@ -177,9 +182,11 @@ func (s *Manager) RunSingleTask(ctx context.Context, t Task) int { var wg sync.WaitGroup wg.Add(1) - j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) { + j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) error { t.Start(ctx) - wg.Done() + defer wg.Done() + // TODO - return error from task + return nil }) return s.JobManager.Add(ctx, t.GetDescription(), j) @@ -215,11 +222,10 @@ func (s *Manager) generateScreenshot(ctx context.Context, sceneId string, at *fl logger.Warnf("failure generating screenshot: %v", err) } - j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) { + j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) error { sceneIdInt, err := strconv.Atoi(sceneId) if err != nil { - logger.Errorf("Error parsing scene id %s: %v", sceneId, err) - return + return fmt.Errorf("error parsing scene id %s: %w", sceneId, err) } var scene *models.Scene @@ -234,8 +240,7 @@ func (s *Manager) generateScreenshot(ctx context.Context, sceneId string, at *fl return scene.LoadPrimaryFile(ctx, s.Repository.File) }); err != nil { - logger.Errorf("error finding scene for screenshot generation: %v", err) - return + return fmt.Errorf("error finding scene for screenshot generation: %w", err) } task := GenerateCoverTask{ @@ -248,6 +253,9 @@ func (s *Manager) generateScreenshot(ctx context.Context, sceneId string, at *fl task.Start(ctx) logger.Infof("Generate screenshot finished") + + // TODO - return error from task + return nil }) return s.JobManager.Add(ctx, fmt.Sprintf("Generating screenshot for scene id %s", sceneId), j) @@ -309,7 +317,7 @@ func (s *Manager) OptimiseDatabase(ctx context.Context) int { } func (s *Manager) MigrateHash(ctx context.Context) int { - j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) { + j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) error { fileNamingAlgo := config.GetInstance().GetVideoFileNamingAlgorithm() logger.Infof("Migrating generated files for %s naming hash", fileNamingAlgo.String()) @@ -319,8 +327,7 @@ func (s *Manager) MigrateHash(ctx context.Context) int { scenes, err = s.Repository.Scene.All(ctx) return err }); err != nil { - logger.Errorf("failed to fetch list of scenes for migration: %s", err.Error()) - return + return fmt.Errorf("failed to fetch list of scenes for migration: %w", err) } var wg sync.WaitGroup @@ -331,7 +338,7 @@ func (s *Manager) MigrateHash(ctx context.Context) int { progress.Increment() if job.IsCancelled(ctx) { logger.Info("Stopping due to user request") - return + return nil } if scene == nil { @@ -351,6 +358,7 @@ func (s *Manager) MigrateHash(ctx context.Context) int { } logger.Info("Finished migrating") + return nil }) return s.JobManager.Add(ctx, "Migrating scene hashes...", j) @@ -381,13 +389,12 @@ type StashBoxBatchTagInput struct { } func (s *Manager) StashBoxBatchPerformerTag(ctx context.Context, input StashBoxBatchTagInput) int { - j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) { + j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) error { logger.Infof("Initiating stash-box batch performer tag") boxes := config.GetInstance().GetStashBoxes() if input.Endpoint < 0 || input.Endpoint >= len(boxes) { - logger.Error(fmt.Errorf("invalid stash_box_index %d", input.Endpoint)) - return + return fmt.Errorf("invalid stash_box_index %d", input.Endpoint) } box := boxes[input.Endpoint] @@ -435,7 +442,7 @@ func (s *Manager) StashBoxBatchPerformerTag(ctx context.Context, input StashBoxB } return nil }); err != nil { - logger.Error(err.Error()) + return err } } else if len(input.Names) > 0 || len(input.PerformerNames) > 0 { // The user is batch adding performers @@ -493,13 +500,12 @@ func (s *Manager) StashBoxBatchPerformerTag(ctx context.Context, input StashBoxB } return nil }); err != nil { - logger.Error(err.Error()) - return + return err } } if len(tasks) == 0 { - return + return nil } progress.SetTotal(len(tasks)) @@ -513,19 +519,20 @@ func (s *Manager) StashBoxBatchPerformerTag(ctx context.Context, input StashBoxB progress.Increment() } + + return nil }) return s.JobManager.Add(ctx, "Batch stash-box performer tag...", j) } func (s *Manager) StashBoxBatchStudioTag(ctx context.Context, input StashBoxBatchTagInput) int { - j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) { + j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) error { logger.Infof("Initiating stash-box batch studio tag") boxes := config.GetInstance().GetStashBoxes() if input.Endpoint < 0 || input.Endpoint >= len(boxes) { - logger.Error(fmt.Errorf("invalid stash_box_index %d", input.Endpoint)) - return + return fmt.Errorf("invalid stash_box_index %d", input.Endpoint) } box := boxes[input.Endpoint] @@ -620,13 +627,12 @@ func (s *Manager) StashBoxBatchStudioTag(ctx context.Context, input StashBoxBatc } return nil }); err != nil { - logger.Error(err.Error()) - return + return err } } if len(tasks) == 0 { - return + return nil } progress.SetTotal(len(tasks)) @@ -640,6 +646,8 @@ func (s *Manager) StashBoxBatchStudioTag(ctx context.Context, input StashBoxBatc progress.Increment() } + + return nil }) return s.JobManager.Add(ctx, "Batch stash-box studio tag...", j) diff --git a/internal/manager/task/clean_generated.go b/internal/manager/task/clean_generated.go index 6fbd828a0..902989046 100644 --- a/internal/manager/task/clean_generated.go +++ b/internal/manager/task/clean_generated.go @@ -106,17 +106,15 @@ func (j *CleanGeneratedJob) logError(err error) { } } -func (j *CleanGeneratedJob) Execute(ctx context.Context, progress *job.Progress) { +func (j *CleanGeneratedJob) Execute(ctx context.Context, progress *job.Progress) error { j.tasksComplete = 0 if !j.BlobsStorageType.IsValid() { - logger.Errorf("invalid blobs storage type: %s", j.BlobsStorageType) - return + return fmt.Errorf("invalid blobs storage type: %s", j.BlobsStorageType) } if !j.VideoFileNamingAlgorithm.IsValid() { - logger.Errorf("invalid video file naming algorithm: %s", j.VideoFileNamingAlgorithm) - return + return fmt.Errorf("invalid video file naming algorithm: %s", j.VideoFileNamingAlgorithm) } if j.Options.DryRun { @@ -183,10 +181,11 @@ func (j *CleanGeneratedJob) Execute(ctx context.Context, progress *job.Progress) if job.IsCancelled(ctx) { logger.Info("Stopping due to user request") - return + return nil } logger.Infof("Finished cleaning generated files") + return nil } func (j *CleanGeneratedJob) setTaskProgress(taskProgress float64, progress *job.Progress) { diff --git a/internal/manager/task/migrate.go b/internal/manager/task/migrate.go new file mode 100644 index 000000000..48ba15a26 --- /dev/null +++ b/internal/manager/task/migrate.go @@ -0,0 +1,153 @@ +package task + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + + "github.com/stashapp/stash/pkg/job" + "github.com/stashapp/stash/pkg/logger" + "github.com/stashapp/stash/pkg/sqlite" +) + +type migrateJobConfig interface { + GetBackupDirectoryPath() string + GetBackupDirectoryPathOrDefault() string +} + +type MigrateJob struct { + BackupPath string + Config migrateJobConfig + Database *sqlite.Database +} + +func (s *MigrateJob) Execute(ctx context.Context, progress *job.Progress) error { + required, err := s.required() + if err != nil { + return err + } + + if required == 0 { + logger.Infof("database is already at the latest schema version") + return nil + } + + // set the number of tasks = required steps + optimise + progress.SetTotal(int(required + 1)) + + database := s.Database + + // always backup so that we can roll back to the previous version if + // migration fails + backupPath := s.BackupPath + if backupPath == "" { + backupPath = database.DatabaseBackupPath(s.Config.GetBackupDirectoryPath()) + } else { + // check if backup path is a filename or path + // filename goes into backup directory, path is kept as is + filename := filepath.Base(backupPath) + if backupPath == filename { + backupPath = filepath.Join(s.Config.GetBackupDirectoryPathOrDefault(), filename) + } + } + + // perform database backup + if err := database.Backup(backupPath); err != nil { + return fmt.Errorf("error backing up database: %s", err) + } + + if err := s.runMigrations(ctx, progress); err != nil { + errStr := fmt.Sprintf("error performing migration: %s", err) + + // roll back to the backed up version + restoreErr := database.RestoreFromBackup(backupPath) + if restoreErr != nil { + errStr = fmt.Sprintf("ERROR: unable to restore database from backup after migration failure: %s\n%s", restoreErr.Error(), errStr) + } else { + errStr = "An error occurred migrating the database to the latest schema version. The backup database file was automatically renamed to restore the database.\n" + errStr + } + + return errors.New(errStr) + } + + // if no backup path was provided, then delete the created backup + if s.BackupPath == "" { + if err := os.Remove(backupPath); err != nil { + logger.Warnf("error removing unwanted database backup (%s): %s", backupPath, err.Error()) + } + } + + return nil +} + +func (s *MigrateJob) required() (uint, error) { + database := s.Database + + m, err := sqlite.NewMigrator(database) + if err != nil { + return 0, err + } + + defer m.Close() + + currentSchemaVersion := m.CurrentSchemaVersion() + targetSchemaVersion := m.RequiredSchemaVersion() + + if targetSchemaVersion < currentSchemaVersion { + // shouldn't happen + return 0, nil + } + + return targetSchemaVersion - currentSchemaVersion, nil +} + +func (s *MigrateJob) runMigrations(ctx context.Context, progress *job.Progress) error { + database := s.Database + + m, err := sqlite.NewMigrator(database) + if err != nil { + return err + } + + defer m.Close() + + for { + currentSchemaVersion := m.CurrentSchemaVersion() + targetSchemaVersion := m.RequiredSchemaVersion() + + if currentSchemaVersion >= targetSchemaVersion { + break + } + + var err error + progress.ExecuteTask(fmt.Sprintf("Migrating database to schema version %d", currentSchemaVersion+1), func() { + err = m.RunMigration(ctx, currentSchemaVersion+1) + }) + + if err != nil { + return fmt.Errorf("error running migration for schema %d: %s", currentSchemaVersion+1, err) + } + + progress.Increment() + } + + // reinitialise the database + if err := database.ReInitialise(); err != nil { + return fmt.Errorf("error reinitialising database: %s", err) + } + + // optimise the database + progress.ExecuteTask("Optimising database", func() { + err = database.Optimise(ctx) + }) + + if err != nil { + return fmt.Errorf("error optimising database: %s", err) + } + + progress.Increment() + + return nil +} diff --git a/internal/manager/task/migrate_blobs.go b/internal/manager/task/migrate_blobs.go index 4cda65725..c57d19ea4 100644 --- a/internal/manager/task/migrate_blobs.go +++ b/internal/manager/task/migrate_blobs.go @@ -26,7 +26,7 @@ type MigrateBlobsJob struct { DeleteOld bool } -func (j *MigrateBlobsJob) Execute(ctx context.Context, progress *job.Progress) { +func (j *MigrateBlobsJob) Execute(ctx context.Context, progress *job.Progress) error { var ( count int err error @@ -37,13 +37,12 @@ func (j *MigrateBlobsJob) Execute(ctx context.Context, progress *job.Progress) { }) if err != nil { - logger.Errorf("Error counting blobs: %s", err.Error()) - return + return fmt.Errorf("error counting blobs: %w", err) } if count == 0 { logger.Infof("No blobs to migrate") - return + return nil } logger.Infof("Migrating %d blobs", count) @@ -54,12 +53,11 @@ func (j *MigrateBlobsJob) Execute(ctx context.Context, progress *job.Progress) { if job.IsCancelled(ctx) { logger.Info("Cancelled migrating blobs") - return + return nil } if err != nil { - logger.Errorf("Error migrating blobs: %v", err) - return + return fmt.Errorf("error migrating blobs: %w", err) } // run a vacuum to reclaim space @@ -71,6 +69,7 @@ func (j *MigrateBlobsJob) Execute(ctx context.Context, progress *job.Progress) { }) logger.Infof("Finished migrating blobs") + return nil } func (j *MigrateBlobsJob) countBlobs(ctx context.Context) (int, error) { diff --git a/internal/manager/task/migrate_scene_screenshots.go b/internal/manager/task/migrate_scene_screenshots.go index bd758391f..f56713f48 100644 --- a/internal/manager/task/migrate_scene_screenshots.go +++ b/internal/manager/task/migrate_scene_screenshots.go @@ -3,6 +3,7 @@ package task import ( "context" "errors" + "fmt" "io" "os" "path/filepath" @@ -21,7 +22,7 @@ type MigrateSceneScreenshotsJob struct { TxnManager txn.Manager } -func (j *MigrateSceneScreenshotsJob) Execute(ctx context.Context, progress *job.Progress) { +func (j *MigrateSceneScreenshotsJob) Execute(ctx context.Context, progress *job.Progress) error { var err error progress.ExecuteTask("Counting files", func() { var count int @@ -30,8 +31,7 @@ func (j *MigrateSceneScreenshotsJob) Execute(ctx context.Context, progress *job. }) if err != nil { - logger.Errorf("Error counting files: %s", err.Error()) - return + return fmt.Errorf("error counting files: %w", err) } progress.ExecuteTask("Migrating files", func() { @@ -40,15 +40,15 @@ func (j *MigrateSceneScreenshotsJob) Execute(ctx context.Context, progress *job. if job.IsCancelled(ctx) { logger.Info("Cancelled migrating scene screenshots") - return + return nil } if err != nil { - logger.Errorf("Error migrating scene screenshots: %v", err) - return + return fmt.Errorf("error migrating scene screenshots: %w", err) } logger.Infof("Finished migrating scene screenshots") + return nil } func (j *MigrateSceneScreenshotsJob) countFiles(ctx context.Context) (int, error) { diff --git a/internal/manager/task/packages.go b/internal/manager/task/packages.go index af970362f..91cf5fa3b 100644 --- a/internal/manager/task/packages.go +++ b/internal/manager/task/packages.go @@ -30,13 +30,13 @@ type InstallPackagesJob struct { Packages []*models.PackageSpecInput } -func (j *InstallPackagesJob) Execute(ctx context.Context, progress *job.Progress) { +func (j *InstallPackagesJob) Execute(ctx context.Context, progress *job.Progress) error { progress.SetTotal(len(j.Packages)) for _, p := range j.Packages { if job.IsCancelled(ctx) { logger.Info("Cancelled installing packages") - return + return nil } logger.Infof("Installing package %s", p.ID) @@ -53,6 +53,7 @@ func (j *InstallPackagesJob) Execute(ctx context.Context, progress *job.Progress } logger.Infof("Finished installing packages") + return nil } type UpdatePackagesJob struct { @@ -60,13 +61,12 @@ type UpdatePackagesJob struct { Packages []*models.PackageSpecInput } -func (j *UpdatePackagesJob) Execute(ctx context.Context, progress *job.Progress) { +func (j *UpdatePackagesJob) Execute(ctx context.Context, progress *job.Progress) error { // if no packages are specified, update all if len(j.Packages) == 0 { installed, err := j.PackageManager.InstalledStatus(ctx) if err != nil { - logger.Errorf("Error getting installed packages: %v", err) - return + return fmt.Errorf("error getting installed packages: %w", err) } for _, p := range installed { @@ -84,7 +84,7 @@ func (j *UpdatePackagesJob) Execute(ctx context.Context, progress *job.Progress) for _, p := range j.Packages { if job.IsCancelled(ctx) { logger.Info("Cancelled updating packages") - return + return nil } logger.Infof("Updating package %s", p.ID) @@ -101,6 +101,7 @@ func (j *UpdatePackagesJob) Execute(ctx context.Context, progress *job.Progress) } logger.Infof("Finished updating packages") + return nil } type UninstallPackagesJob struct { @@ -108,13 +109,13 @@ type UninstallPackagesJob struct { Packages []*models.PackageSpecInput } -func (j *UninstallPackagesJob) Execute(ctx context.Context, progress *job.Progress) { +func (j *UninstallPackagesJob) Execute(ctx context.Context, progress *job.Progress) error { progress.SetTotal(len(j.Packages)) for _, p := range j.Packages { if job.IsCancelled(ctx) { logger.Info("Cancelled installing packages") - return + return nil } logger.Infof("Uninstalling package %s", p.ID) @@ -131,4 +132,5 @@ func (j *UninstallPackagesJob) Execute(ctx context.Context, progress *job.Progre } logger.Infof("Finished uninstalling packages") + return nil } diff --git a/internal/manager/task_autotag.go b/internal/manager/task_autotag.go index 6dc37d56a..e280e79f6 100644 --- a/internal/manager/task_autotag.go +++ b/internal/manager/task_autotag.go @@ -25,7 +25,7 @@ type autoTagJob struct { cache match.Cache } -func (j *autoTagJob) Execute(ctx context.Context, progress *job.Progress) { +func (j *autoTagJob) Execute(ctx context.Context, progress *job.Progress) error { begin := time.Now() input := j.input @@ -38,6 +38,7 @@ func (j *autoTagJob) Execute(ctx context.Context, progress *job.Progress) { } logger.Infof("Finished auto-tag after %s", time.Since(begin).String()) + return nil } func (j *autoTagJob) isFileBasedAutoTag(input AutoTagMetadataInput) bool { diff --git a/internal/manager/task_clean.go b/internal/manager/task_clean.go index 3b9227549..9690cf4c8 100644 --- a/internal/manager/task_clean.go +++ b/internal/manager/task_clean.go @@ -32,7 +32,7 @@ type cleanJob struct { scanSubs *subscriptionManager } -func (j *cleanJob) Execute(ctx context.Context, progress *job.Progress) { +func (j *cleanJob) Execute(ctx context.Context, progress *job.Progress) error { logger.Infof("Starting cleaning of tracked files") start := time.Now() if j.input.DryRun { @@ -47,7 +47,7 @@ func (j *cleanJob) Execute(ctx context.Context, progress *job.Progress) { if job.IsCancelled(ctx) { logger.Info("Stopping due to user request") - return + return nil } j.cleanEmptyGalleries(ctx) @@ -55,6 +55,7 @@ func (j *cleanJob) Execute(ctx context.Context, progress *job.Progress) { j.scanSubs.notify() elapsed := time.Since(start) logger.Info(fmt.Sprintf("Finished Cleaning (%s)", elapsed)) + return nil } func (j *cleanJob) cleanEmptyGalleries(ctx context.Context) { diff --git a/internal/manager/task_generate.go b/internal/manager/task_generate.go index 5d8ebdac9..c28ffe55b 100644 --- a/internal/manager/task_generate.go +++ b/internal/manager/task_generate.go @@ -80,7 +80,7 @@ type totalsGenerate struct { tasks int } -func (j *GenerateJob) Execute(ctx context.Context, progress *job.Progress) { +func (j *GenerateJob) Execute(ctx context.Context, progress *job.Progress) error { var scenes []*models.Scene var err error var markers []*models.SceneMarker @@ -223,11 +223,12 @@ func (j *GenerateJob) Execute(ctx context.Context, progress *job.Progress) { if job.IsCancelled(ctx) { logger.Info("Stopping due to user request") - return + return nil } elapsed := time.Since(start) logger.Info(fmt.Sprintf("Generate finished (%s)", elapsed)) + return nil } func (j *GenerateJob) queueTasks(ctx context.Context, g *generate.Generator, queue chan<- Task) { diff --git a/internal/manager/task_identify.go b/internal/manager/task_identify.go index 2f8bddced..65e602a66 100644 --- a/internal/manager/task_identify.go +++ b/internal/manager/task_identify.go @@ -34,18 +34,17 @@ func CreateIdentifyJob(input identify.Options) *IdentifyJob { } } -func (j *IdentifyJob) Execute(ctx context.Context, progress *job.Progress) { +func (j *IdentifyJob) Execute(ctx context.Context, progress *job.Progress) error { j.progress = progress // if no sources provided - just return if len(j.input.Sources) == 0 { - return + return nil } sources, err := j.getSources() if err != nil { - logger.Error(err) - return + return err } // if scene ids provided, use those @@ -84,8 +83,10 @@ func (j *IdentifyJob) Execute(ctx context.Context, progress *job.Progress) { return nil }); err != nil { - logger.Errorf("Error encountered while identifying scenes: %v", err) + return fmt.Errorf("error encountered while identifying scenes: %w", err) } + + return nil } func (j *IdentifyJob) identifyAllScenes(ctx context.Context, sources []identify.ScraperSource) error { diff --git a/internal/manager/task_optimise.go b/internal/manager/task_optimise.go index 2fb1794fb..9f85e961c 100644 --- a/internal/manager/task_optimise.go +++ b/internal/manager/task_optimise.go @@ -2,6 +2,7 @@ package manager import ( "context" + "fmt" "time" "github.com/stashapp/stash/pkg/job" @@ -17,7 +18,7 @@ type OptimiseDatabaseJob struct { Optimiser Optimiser } -func (j *OptimiseDatabaseJob) Execute(ctx context.Context, progress *job.Progress) { +func (j *OptimiseDatabaseJob) Execute(ctx context.Context, progress *job.Progress) error { logger.Info("Optimising database") progress.SetTotal(2) @@ -31,11 +32,10 @@ func (j *OptimiseDatabaseJob) Execute(ctx context.Context, progress *job.Progres }) if job.IsCancelled(ctx) { logger.Info("Stopping due to user request") - return + return nil } if err != nil { - logger.Errorf("Error analyzing database: %v", err) - return + return fmt.Errorf("Error analyzing database: %w", err) } progress.ExecuteTask("Vacuuming database", func() { @@ -44,13 +44,13 @@ func (j *OptimiseDatabaseJob) Execute(ctx context.Context, progress *job.Progres }) if job.IsCancelled(ctx) { logger.Info("Stopping due to user request") - return + return nil } if err != nil { - logger.Errorf("Error vacuuming database: %v", err) - return + return fmt.Errorf("error vacuuming database: %w", err) } elapsed := time.Since(start) logger.Infof("Finished optimising database after %s", elapsed) + return nil } diff --git a/internal/manager/task_plugin.go b/internal/manager/task_plugin.go index c8be8dfd5..80f38598c 100644 --- a/internal/manager/task_plugin.go +++ b/internal/manager/task_plugin.go @@ -16,18 +16,16 @@ func (s *Manager) RunPluginTask( description *string, args plugin.OperationInput, ) int { - j := job.MakeJobExec(func(jobCtx context.Context, progress *job.Progress) { + j := job.MakeJobExec(func(jobCtx context.Context, progress *job.Progress) error { pluginProgress := make(chan float64) task, err := s.PluginCache.CreateTask(ctx, pluginID, taskName, args, pluginProgress) if err != nil { - logger.Errorf("Error creating plugin task: %s", err.Error()) - return + return fmt.Errorf("Error creating plugin task: %w", err) } err = task.Start() if err != nil { - logger.Errorf("Error running plugin task: %s", err.Error()) - return + return fmt.Errorf("Error running plugin task: %w", err) } done := make(chan bool) @@ -50,14 +48,14 @@ func (s *Manager) RunPluginTask( for { select { case <-done: - return + return nil case p := <-pluginProgress: progress.SetPercent(p) case <-jobCtx.Done(): if err := task.Stop(); err != nil { logger.Errorf("Error stopping plugin operation: %s", err.Error()) } - return + return nil } } }) diff --git a/internal/manager/task_scan.go b/internal/manager/task_scan.go index f0452bfcc..9f9fa66a2 100644 --- a/internal/manager/task_scan.go +++ b/internal/manager/task_scan.go @@ -34,12 +34,12 @@ type ScanJob struct { subscriptions *subscriptionManager } -func (j *ScanJob) Execute(ctx context.Context, progress *job.Progress) { +func (j *ScanJob) Execute(ctx context.Context, progress *job.Progress) error { input := j.input if job.IsCancelled(ctx) { logger.Info("Stopping due to user request") - return + return nil } sp := getScanPaths(input.Paths) @@ -74,13 +74,14 @@ func (j *ScanJob) Execute(ctx context.Context, progress *job.Progress) { if job.IsCancelled(ctx) { logger.Info("Stopping due to user request") - return + return nil } elapsed := time.Since(start) logger.Info(fmt.Sprintf("Scan finished (%s)", elapsed)) j.subscriptions.notify() + return nil } type extensionConfig struct { diff --git a/pkg/job/job.go b/pkg/job/job.go index b3e8685f6..fa1ef3c91 100644 --- a/pkg/job/job.go +++ b/pkg/job/job.go @@ -5,22 +5,24 @@ import ( "time" ) +type JobExecFn func(ctx context.Context, progress *Progress) error + // JobExec represents the implementation of a Job to be executed. type JobExec interface { - Execute(ctx context.Context, progress *Progress) + Execute(ctx context.Context, progress *Progress) error } type jobExecImpl struct { - fn func(ctx context.Context, progress *Progress) + fn JobExecFn } -func (j *jobExecImpl) Execute(ctx context.Context, progress *Progress) { - j.fn(ctx, progress) +func (j *jobExecImpl) Execute(ctx context.Context, progress *Progress) error { + return j.fn(ctx, progress) } // MakeJobExec returns a simple JobExec implementation using the provided // function. -func MakeJobExec(fn func(ctx context.Context, progress *Progress)) JobExec { +func MakeJobExec(fn JobExecFn) JobExec { return &jobExecImpl{ fn: fn, } @@ -56,6 +58,7 @@ type Job struct { StartTime *time.Time EndTime *time.Time AddTime time.Time + Error *string outerCtx context.Context exec JobExec @@ -87,6 +90,12 @@ func (j *Job) cancel() { } } +func (j *Job) error(err error) { + errStr := err.Error() + j.Error = &errStr + j.Status = StatusFailed +} + // IsCancelled returns true if cancel has been called on the context. func IsCancelled(ctx context.Context) bool { select { diff --git a/pkg/job/manager.go b/pkg/job/manager.go index 4ad9b1880..983d88cc0 100644 --- a/pkg/job/manager.go +++ b/pkg/job/manager.go @@ -206,7 +206,10 @@ func (m *Manager) executeJob(ctx context.Context, j *Job, done chan struct{}) { }() progress := m.newProgress(j) - j.exec.Execute(ctx, progress) + if err := j.exec.Execute(ctx, progress); err != nil { + logger.Errorf("task failed due to error: %v", err) + j.error(err) + } } func (m *Manager) onJobFinish(job *Job) { diff --git a/pkg/job/manager_test.go b/pkg/job/manager_test.go index 51bb6a1f1..d7c686cb8 100644 --- a/pkg/job/manager_test.go +++ b/pkg/job/manager_test.go @@ -24,7 +24,7 @@ func newTestExec(finish chan struct{}) *testExec { } } -func (e *testExec) Execute(ctx context.Context, p *Progress) { +func (e *testExec) Execute(ctx context.Context, p *Progress) error { e.progress = p close(e.started) @@ -38,6 +38,8 @@ func (e *testExec) Execute(ctx context.Context, p *Progress) { // fall through } } + + return nil } func TestAdd(t *testing.T) { diff --git a/pkg/sqlite/database.go b/pkg/sqlite/database.go index 9c7dfdbe3..e0d6678f6 100644 --- a/pkg/sqlite/database.go +++ b/pkg/sqlite/database.go @@ -10,9 +10,6 @@ import ( "path/filepath" "time" - "github.com/golang-migrate/migrate/v4" - sqlite3mig "github.com/golang-migrate/migrate/v4/database/sqlite3" - "github.com/golang-migrate/migrate/v4/source/iofs" "github.com/jmoiron/sqlx" "github.com/stashapp/stash/pkg/fsutil" @@ -144,7 +141,7 @@ func (db *Database) Open(dbPath string) error { if databaseSchemaVersion == 0 { // new database, just run the migrations - if err := db.RunMigrations(); err != nil { + if err := db.RunAllMigrations(); err != nil { return fmt.Errorf("error running initial schema migrations: %w", err) } } else { @@ -312,11 +309,6 @@ func (db *Database) RestoreFromBackup(backupPath string) error { return os.Rename(backupPath, db.dbPath) } -// Migrate the database -func (db *Database) needsMigration() bool { - return db.schemaVersion != appSchemaVersion -} - func (db *Database) AppSchemaVersion() uint { return appSchemaVersion } @@ -349,100 +341,6 @@ func (db *Database) Version() uint { return db.schemaVersion } -func (db *Database) getMigrate() (*migrate.Migrate, error) { - migrations, err := iofs.New(migrationsBox, "migrations") - if err != nil { - return nil, err - } - - const disableForeignKeys = true - conn, err := db.open(disableForeignKeys) - if err != nil { - return nil, err - } - - driver, err := sqlite3mig.WithInstance(conn.DB, &sqlite3mig.Config{}) - if err != nil { - return nil, err - } - - // use sqlite3Driver so that migration has access to durationToTinyInt - return migrate.NewWithInstance( - "iofs", - migrations, - db.dbPath, - driver, - ) -} - -func (db *Database) getDatabaseSchemaVersion() (uint, error) { - m, err := db.getMigrate() - if err != nil { - return 0, err - } - defer m.Close() - - ret, _, _ := m.Version() - return ret, nil -} - -// Migrate the database -func (db *Database) RunMigrations() error { - ctx := context.Background() - - m, err := db.getMigrate() - if err != nil { - return err - } - defer m.Close() - - databaseSchemaVersion, _, _ := m.Version() - stepNumber := appSchemaVersion - databaseSchemaVersion - if stepNumber != 0 { - logger.Infof("Migrating database from version %d to %d", databaseSchemaVersion, appSchemaVersion) - - // run each migration individually, and run custom migrations as needed - var i uint = 1 - for ; i <= stepNumber; i++ { - newVersion := databaseSchemaVersion + i - - // run pre migrations as needed - if err := db.runCustomMigrations(ctx, preMigrations[newVersion]); err != nil { - return fmt.Errorf("running pre migrations for schema version %d: %w", newVersion, err) - } - - err = m.Steps(1) - if err != nil { - // migration failed - return err - } - - // run post migrations as needed - if err := db.runCustomMigrations(ctx, postMigrations[newVersion]); err != nil { - return fmt.Errorf("running post migrations for schema version %d: %w", newVersion, err) - } - } - } - - // update the schema version - db.schemaVersion, _, _ = m.Version() - - // re-initialise the database - const disableForeignKeys = false - db.db, err = db.open(disableForeignKeys) - if err != nil { - return fmt.Errorf("re-initializing the database: %w", err) - } - - // optimize database after migration - err = db.Optimise(ctx) - if err != nil { - logger.Warnf("error while performing post-migration optimisation: %v", err) - } - - return nil -} - func (db *Database) Optimise(ctx context.Context) error { logger.Info("Optimising database") @@ -524,28 +422,3 @@ func (db *Database) QuerySQL(ctx context.Context, query string, args []interface return cols, ret, nil } - -func (db *Database) runCustomMigrations(ctx context.Context, fns []customMigrationFunc) error { - for _, fn := range fns { - if err := db.runCustomMigration(ctx, fn); err != nil { - return err - } - } - - return nil -} - -func (db *Database) runCustomMigration(ctx context.Context, fn customMigrationFunc) error { - const disableForeignKeys = false - d, err := db.open(disableForeignKeys) - if err != nil { - return err - } - - defer d.Close() - if err := fn(ctx, d); err != nil { - return err - } - - return nil -} diff --git a/pkg/sqlite/migrate.go b/pkg/sqlite/migrate.go new file mode 100644 index 000000000..9fb36dba1 --- /dev/null +++ b/pkg/sqlite/migrate.go @@ -0,0 +1,188 @@ +package sqlite + +import ( + "context" + "fmt" + + "github.com/golang-migrate/migrate/v4" + sqlite3mig "github.com/golang-migrate/migrate/v4/database/sqlite3" + "github.com/golang-migrate/migrate/v4/source/iofs" + "github.com/stashapp/stash/pkg/logger" +) + +func (db *Database) needsMigration() bool { + return db.schemaVersion != appSchemaVersion +} + +type Migrator struct { + db *Database + m *migrate.Migrate +} + +func NewMigrator(db *Database) (*Migrator, error) { + m := &Migrator{ + db: db, + } + + var err error + m.m, err = m.getMigrate() + return m, err +} + +func (m *Migrator) Close() { + if m.m != nil { + m.m.Close() + m.m = nil + } +} + +func (m *Migrator) CurrentSchemaVersion() uint { + databaseSchemaVersion, _, _ := m.m.Version() + return databaseSchemaVersion +} + +func (m *Migrator) RequiredSchemaVersion() uint { + return appSchemaVersion +} + +func (m *Migrator) getMigrate() (*migrate.Migrate, error) { + migrations, err := iofs.New(migrationsBox, "migrations") + if err != nil { + return nil, err + } + + const disableForeignKeys = true + conn, err := m.db.open(disableForeignKeys) + if err != nil { + return nil, err + } + + driver, err := sqlite3mig.WithInstance(conn.DB, &sqlite3mig.Config{}) + if err != nil { + return nil, err + } + + // use sqlite3Driver so that migration has access to durationToTinyInt + return migrate.NewWithInstance( + "iofs", + migrations, + m.db.dbPath, + driver, + ) +} + +func (m *Migrator) RunMigration(ctx context.Context, newVersion uint) error { + databaseSchemaVersion, _, _ := m.m.Version() + + if newVersion != databaseSchemaVersion+1 { + return fmt.Errorf("invalid migration version %d, expected %d", newVersion, databaseSchemaVersion+1) + } + + // run pre migrations as needed + if err := m.runCustomMigrations(ctx, preMigrations[newVersion]); err != nil { + return fmt.Errorf("running pre migrations for schema version %d: %w", newVersion, err) + } + + if err := m.m.Steps(1); err != nil { + // migration failed + return err + } + + // run post migrations as needed + if err := m.runCustomMigrations(ctx, postMigrations[newVersion]); err != nil { + return fmt.Errorf("running post migrations for schema version %d: %w", newVersion, err) + } + + // update the schema version + m.db.schemaVersion, _, _ = m.m.Version() + + return nil +} + +func (m *Migrator) runCustomMigrations(ctx context.Context, fns []customMigrationFunc) error { + for _, fn := range fns { + if err := m.runCustomMigration(ctx, fn); err != nil { + return err + } + } + + return nil +} + +func (m *Migrator) runCustomMigration(ctx context.Context, fn customMigrationFunc) error { + const disableForeignKeys = false + d, err := m.db.open(disableForeignKeys) + if err != nil { + return err + } + + defer d.Close() + if err := fn(ctx, d); err != nil { + return err + } + + return nil +} + +func (db *Database) getDatabaseSchemaVersion() (uint, error) { + m, err := NewMigrator(db) + if err != nil { + return 0, err + } + defer m.Close() + + ret, _, _ := m.m.Version() + return ret, nil +} + +func (db *Database) ReInitialise() error { + const disableForeignKeys = false + var err error + db.db, err = db.open(disableForeignKeys) + if err != nil { + return fmt.Errorf("re-initializing the database: %w", err) + } + + return nil +} + +// RunAllMigrations runs all migrations to bring the database up to the current schema version +func (db *Database) RunAllMigrations() error { + ctx := context.Background() + + m, err := NewMigrator(db) + if err != nil { + return err + } + defer m.Close() + + databaseSchemaVersion, _, _ := m.m.Version() + stepNumber := appSchemaVersion - databaseSchemaVersion + if stepNumber != 0 { + logger.Infof("Migrating database from version %d to %d", databaseSchemaVersion, appSchemaVersion) + + // run each migration individually, and run custom migrations as needed + var i uint = 1 + for ; i <= stepNumber; i++ { + newVersion := databaseSchemaVersion + i + if err := m.RunMigration(ctx, newVersion); err != nil { + return err + } + } + } + + // re-initialise the database + const disableForeignKeys = false + db.db, err = db.open(disableForeignKeys) + if err != nil { + return fmt.Errorf("re-initializing the database: %w", err) + } + + // optimize database after migration + err = db.Optimise(ctx) + if err != nil { + logger.Warnf("error while performing post-migration optimisation: %v", err) + } + + return nil +} diff --git a/ui/v2.5/graphql/data/job.graphql b/ui/v2.5/graphql/data/job.graphql index d632bdd82..338f7522b 100644 --- a/ui/v2.5/graphql/data/job.graphql +++ b/ui/v2.5/graphql/data/job.graphql @@ -7,4 +7,5 @@ fragment JobData on Job { startTime endTime addTime + error } diff --git a/ui/v2.5/graphql/subscriptions.graphql b/ui/v2.5/graphql/subscriptions.graphql index 7510e9f4a..4c7ffaebf 100644 --- a/ui/v2.5/graphql/subscriptions.graphql +++ b/ui/v2.5/graphql/subscriptions.graphql @@ -7,6 +7,7 @@ subscription JobsSubscribe { subTasks description progress + error } } } diff --git a/ui/v2.5/src/App.tsx b/ui/v2.5/src/App.tsx index 0b63d14dd..eed4da776 100644 --- a/ui/v2.5/src/App.tsx +++ b/ui/v2.5/src/App.tsx @@ -49,6 +49,7 @@ import { PluginRoutes } from "./plugins"; // import plugin_api to run code import "./pluginApi"; +import { ConnectionMonitor } from "./ConnectionMonitor"; const Performers = lazyComponent( () => import("./components/Performers/Performers") @@ -369,6 +370,7 @@ export const App: React.FC = () => { > {maybeRenderReleaseNotes()} + }> diff --git a/ui/v2.5/src/ConnectionMonitor.tsx b/ui/v2.5/src/ConnectionMonitor.tsx new file mode 100644 index 000000000..fb733922f --- /dev/null +++ b/ui/v2.5/src/ConnectionMonitor.tsx @@ -0,0 +1,34 @@ +import { useEffect, useState } from "react"; +import { getWSClient, useWSState } from "./core/StashService"; +import { useToast } from "./hooks/Toast"; +import { useIntl } from "react-intl"; + +export const ConnectionMonitor: React.FC = () => { + const Toast = useToast(); + const intl = useIntl(); + + const { state } = useWSState(getWSClient()); + const [cachedState, setCacheState] = useState(state); + + useEffect(() => { + if (cachedState === "connecting" && state === "error") { + Toast.error( + intl.formatMessage({ + id: "connection_monitor.websocket_connection_failed", + }) + ); + } + + if (state === "connected" && cachedState === "error") { + Toast.success( + intl.formatMessage({ + id: "connection_monitor.websocket_connection_reestablished", + }) + ); + } + + setCacheState(state); + }, [state, cachedState, Toast, intl]); + + return null; +}; diff --git a/ui/v2.5/src/components/Settings/Tasks/JobTable.tsx b/ui/v2.5/src/components/Settings/Tasks/JobTable.tsx index 2d394a5e0..82ed46c85 100644 --- a/ui/v2.5/src/components/Settings/Tasks/JobTable.tsx +++ b/ui/v2.5/src/components/Settings/Tasks/JobTable.tsx @@ -12,6 +12,7 @@ import { faBan, faCheck, faCircle, + faCircleExclamation, faCog, faHourglassStart, faTimes, @@ -19,7 +20,7 @@ import { type JobFragment = Pick< GQL.Job, - "id" | "status" | "subTasks" | "description" | "progress" + "id" | "status" | "subTasks" | "description" | "progress" | "error" >; interface IJob { @@ -37,6 +38,7 @@ const Task: React.FC = ({ job }) => { useEffect(() => { if ( job.status === GQL.JobStatus.Cancelled || + job.status === GQL.JobStatus.Failed || job.status === GQL.JobStatus.Finished ) { // fade out around 10 seconds @@ -71,6 +73,8 @@ const Task: React.FC = ({ job }) => { return "finished"; case GQL.JobStatus.Cancelled: return "cancelled"; + case GQL.JobStatus.Failed: + return "failed"; } } @@ -95,6 +99,9 @@ const Task: React.FC = ({ job }) => { case GQL.JobStatus.Cancelled: icon = faBan; break; + case GQL.JobStatus.Failed: + icon = faCircleExclamation; + break; } return ; @@ -134,6 +141,10 @@ const Task: React.FC = ({ job }) => { ); } + + if (job.status === GQL.JobStatus.Failed && job.error) { + return
{job.error}
; + } } return ( diff --git a/ui/v2.5/src/components/Settings/styles.scss b/ui/v2.5/src/components/Settings/styles.scss index 46c86986f..b7899d8d1 100644 --- a/ui/v2.5/src/components/Settings/styles.scss +++ b/ui/v2.5/src/components/Settings/styles.scss @@ -296,6 +296,10 @@ color: $success; } + .failed .fa-icon { + color: $danger; + } + .ready .fa-icon { color: $warning; } @@ -304,6 +308,10 @@ .finished { color: $text-muted; } + + .job-error { + color: $danger; + } } #temp-enable-duration .duration-control:disabled { diff --git a/ui/v2.5/src/components/Setup/Migrate.tsx b/ui/v2.5/src/components/Setup/Migrate.tsx index 23fc0b520..2f8e38f55 100644 --- a/ui/v2.5/src/components/Setup/Migrate.tsx +++ b/ui/v2.5/src/components/Setup/Migrate.tsx @@ -1,22 +1,42 @@ import React, { useEffect, useMemo, useState } from "react"; -import { Button, Card, Container, Form } from "react-bootstrap"; +import { Button, Card, Container, Form, ProgressBar } from "react-bootstrap"; import { useIntl, FormattedMessage } from "react-intl"; import { useHistory } from "react-router-dom"; import * as GQL from "src/core/generated-graphql"; -import { useSystemStatus, mutateMigrate } from "src/core/StashService"; +import { + useSystemStatus, + mutateMigrate, + postMigrate, +} from "src/core/StashService"; import { migrationNotes } from "src/docs/en/MigrationNotes"; import { ExternalLink } from "../Shared/ExternalLink"; import { LoadingIndicator } from "../Shared/LoadingIndicator"; import { MarkdownPage } from "../Shared/MarkdownPage"; +import { useMonitorJob } from "src/utils/job"; export const Migrate: React.FC = () => { + const intl = useIntl(); + const history = useHistory(); + const { data: systemStatus, loading } = useSystemStatus(); + const [backupPath, setBackupPath] = useState(); const [migrateLoading, setMigrateLoading] = useState(false); const [migrateError, setMigrateError] = useState(""); - const intl = useIntl(); - const history = useHistory(); + const [jobID, setJobID] = useState(); + + const { job } = useMonitorJob(jobID, (finishedJob) => { + setJobID(undefined); + setMigrateLoading(false); + + if (finishedJob?.error) { + setMigrateError(finishedJob.error); + } else { + postMigrate(); + history.push("/"); + } + }); // if database path includes path separators, then this is passed through // to the migration path. Extract the base name of the database file. @@ -94,10 +114,32 @@ export const Migrate: React.FC = () => { } if (migrateLoading) { + const progress = + job && job.progress !== undefined && job.progress !== null + ? job.progress * 100 + : undefined; + return ( - +
+

+ + + + +

+ {progress !== undefined && ( + + )} + {job?.subTasks?.map((subTask, i) => ( +
+

{subTask}

+
+ ))} +
); } @@ -113,11 +155,13 @@ export const Migrate: React.FC = () => { try { setMigrateLoading(true); setMigrateError(""); - await mutateMigrate({ + + // migrate now uses the job manager + const ret = await mutateMigrate({ backupPath: backupPath ?? "", }); - history.push("/"); + setJobID(ret.data?.migrate); } catch (e) { if (e instanceof Error) setMigrateError(e.message ?? e.toString()); setMigrateLoading(false); diff --git a/ui/v2.5/src/components/Setup/styles.scss b/ui/v2.5/src/components/Setup/styles.scss index e10d67c62..36db2798a 100644 --- a/ui/v2.5/src/components/Setup/styles.scss +++ b/ui/v2.5/src/components/Setup/styles.scss @@ -7,3 +7,20 @@ padding: 16px; } } + +.migrate-loading-status { + align-items: center; + display: flex; + flex-direction: column; + height: 70vh; + justify-content: center; + width: 100%; + + .progress { + width: 60%; + } + + h4 span { + margin-left: 0.5rem; + } +} diff --git a/ui/v2.5/src/core/StashService.ts b/ui/v2.5/src/core/StashService.ts index 873530d9d..d6432fc95 100644 --- a/ui/v2.5/src/core/StashService.ts +++ b/ui/v2.5/src/core/StashService.ts @@ -15,10 +15,36 @@ import { ListFilterModel } from "../models/list-filter/filter"; import * as GQL from "./generated-graphql"; import { createClient } from "./createClient"; +import { Client } from "graphql-ws"; +import { useEffect, useState } from "react"; -const { client } = createClient(); +const { client, wsClient, cache: clientCache } = createClient(); export const getClient = () => client; +export const getWSClient = () => wsClient; + +export function useWSState(ws: Client) { + const [state, setState] = useState<"connecting" | "connected" | "error">( + "connecting" + ); + + useEffect(() => { + const disposeConnected = ws.on("connected", () => { + setState("connected"); + }); + + const disposeError = ws.on("error", () => { + setState("error"); + }); + + return () => { + disposeConnected(); + disposeError(); + }; + }, [ws]); + + return { state }; +} // Evicts cached results for the given queries. // Will also call a cache GC afterwards. @@ -2382,13 +2408,14 @@ export const mutateMigrate = (input: GQL.MigrateInput) => client.mutate({ mutation: GQL.MigrateDocument, variables: { input }, - update(cache, result) { - if (!result.data?.migrate) return; - - evictQueries(cache, setupMutationImpactedQueries); - }, }); +// migrate now runs asynchronously, so we need to evict queries +// once it successfully completes +export function postMigrate() { + evictQueries(clientCache, setupMutationImpactedQueries); +} + /// Packages // Acts like GQL.useInstalledScraperPackagesStatusQuery if loadUpgrades is true, diff --git a/ui/v2.5/src/core/createClient.ts b/ui/v2.5/src/core/createClient.ts index 25ec4b958..045f27676 100644 --- a/ui/v2.5/src/core/createClient.ts +++ b/ui/v2.5/src/core/createClient.ts @@ -144,15 +144,15 @@ export const createClient = () => { const httpLink = createUploadLink({ uri: url.toString() }); - const wsLink = new GraphQLWsLink( - createWSClient({ - url: wsUrl.toString(), - retryAttempts: Infinity, - shouldRetry() { - return true; - }, - }) - ); + const wsClient = createWSClient({ + url: wsUrl.toString(), + retryAttempts: Infinity, + shouldRetry() { + return true; + }, + }); + + const wsLink = new GraphQLWsLink(wsClient); const errorLink = onError(({ networkError }) => { // handle graphql unauthorized error @@ -211,5 +211,6 @@ Please disable it on the server and refresh the page.`); return { cache, client, + wsClient, }; }; diff --git a/ui/v2.5/src/locales/en-GB.json b/ui/v2.5/src/locales/en-GB.json index d13ca12da..eabbfe1ef 100644 --- a/ui/v2.5/src/locales/en-GB.json +++ b/ui/v2.5/src/locales/en-GB.json @@ -776,6 +776,10 @@ } }, "configuration": "Configuration", + "connection_monitor": { + "websocket_connection_failed": "Unable to make websocket connection: see browser console for details", + "websocket_connection_reestablished": "Websocket connection re-established" + }, "countables": { "files": "{count, plural, one {File} other {Files}}", "galleries": "{count, plural, one {Gallery} other {Galleries}}", diff --git a/ui/v2.5/src/utils/job.ts b/ui/v2.5/src/utils/job.ts index c7f8007d2..fc473e47c 100644 --- a/ui/v2.5/src/utils/job.ts +++ b/ui/v2.5/src/utils/job.ts @@ -1,24 +1,36 @@ import { useEffect, useState } from "react"; +import { getWSClient, useWSState } from "src/core/StashService"; import { Job, + JobStatus, JobStatusUpdateType, - useJobQueueQuery, + useFindJobQuery, useJobsSubscribeSubscription, } from "src/core/generated-graphql"; export type JobFragment = Pick< Job, - "id" | "status" | "subTasks" | "description" | "progress" + "id" | "status" | "subTasks" | "description" | "progress" | "error" >; export const useMonitorJob = ( jobID: string | undefined | null, - onComplete?: () => void + onComplete?: (job?: JobFragment) => void ) => { + const { state } = useWSState(getWSClient()); + const jobsSubscribe = useJobsSubscribeSubscription({ skip: !jobID, }); - const { data: jobData, loading } = useJobQueueQuery({ + const { + data: jobData, + loading, + startPolling, + stopPolling, + } = useFindJobQuery({ + variables: { + input: { id: jobID ?? "" }, + }, fetchPolicy: "network-only", skip: !jobID, }); @@ -34,19 +46,26 @@ export const useMonitorJob = ( return; } - const j = jobData?.jobQueue?.find((jj) => jj.id === jobID); + const j = jobData?.findJob; if (j) { setJob(j); + + if ( + j.status === JobStatus.Finished || + j.status === JobStatus.Failed || + j.status === JobStatus.Cancelled + ) { + setJob(undefined); + onComplete?.(j); + } } else { // must've already finished setJob(undefined); - if (onComplete) { - onComplete(); - } + onComplete?.(); } }, [jobID, jobData, loading, onComplete]); - // monitor batch operation + // monitor job useEffect(() => { if (!jobID) { return; @@ -65,11 +84,25 @@ export const useMonitorJob = ( setJob(event.job); } else { setJob(undefined); - if (onComplete) { - onComplete(); - } + onComplete?.(event.job); } }, [jobsSubscribe, jobID, onComplete]); + // it's possible that the websocket connection isn't present + // in that case, we'll just poll the server + useEffect(() => { + if (!jobID) { + stopPolling(); + return; + } + + if (state === "connected") { + stopPolling(); + } else { + const defaultPollInterval = 1000; + startPolling(defaultPollInterval); + } + }, [jobID, state, startPolling, stopPolling]); + return { job }; };