Make migration an asynchronous task (#4666)

* Add failed state and error to Job
* Move migration code
* Add websocket monitor
* Make migrate a job managed task
This commit is contained in:
WithoutPants 2024-03-14 11:06:23 +11:00 committed by GitHub
parent fa172c2dfd
commit e5929389b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
36 changed files with 693 additions and 304 deletions

View File

@ -226,7 +226,9 @@ type Query {
type Mutation {
setup(input: SetupInput!): Boolean!
migrate(input: MigrateInput!): Boolean!
"Migrates the schema to the required version. Returns the job ID"
migrate(input: MigrateInput!): ID!
sceneCreate(input: SceneCreateInput!): Scene
sceneUpdate(input: SceneUpdateInput!): Scene

View File

@ -4,6 +4,7 @@ enum JobStatus {
FINISHED
STOPPING
CANCELLED
FAILED
}
type Job {
@ -15,6 +16,7 @@ type Job {
startTime: Time
endTime: Time
addTime: Time!
error: String
}
input FindJobInput {

View File

@ -22,11 +22,6 @@ func (r *mutationResolver) Setup(ctx context.Context, input manager.SetupInput)
return err == nil, err
}
// Migrate runs the database schema migration synchronously, reporting
// success as a boolean alongside any error encountered.
func (r *mutationResolver) Migrate(ctx context.Context, input manager.MigrateInput) (bool, error) {
	if err := manager.GetInstance().Migrate(ctx, input); err != nil {
		return false, err
	}
	return true, nil
}
func (r *mutationResolver) ConfigureGeneral(ctx context.Context, input ConfigGeneralInput) (*ConfigGeneralResult, error) {
c := config.GetInstance()

View File

@ -38,3 +38,16 @@ func (r *mutationResolver) MigrateBlobs(ctx context.Context, input MigrateBlobsI
return strconv.Itoa(jobID), nil
}
// Migrate queues the schema migration as a managed background job and
// returns the job's ID as a string.
func (r *mutationResolver) Migrate(ctx context.Context, input manager.MigrateInput) (string, error) {
	instance := manager.GetInstance()

	migrateJob := &task.MigrateJob{
		BackupPath: input.BackupPath,
		Config:     instance.Config,
		Database:   instance.Database,
	}

	id := instance.JobManager.Add(ctx, "Migrating database...", migrateJob)
	return strconv.Itoa(id), nil
}

View File

@ -41,6 +41,7 @@ func jobToJobModel(j job.Job) *Job {
StartTime: j.StartTime,
EndTime: j.EndTime,
AddTime: j.AddTime,
Error: j.Error,
}
if j.Progress != -1 {

View File

@ -303,52 +303,6 @@ func (s *Manager) validateFFmpeg() error {
return nil
}
// Migrate backs up the database, runs all pending schema migrations, and
// restores from the backup if migration fails. If no explicit backup path
// is supplied, the temporary backup is deleted after a successful migration.
func (s *Manager) Migrate(ctx context.Context, input MigrateInput) error {
	database := s.Database

	// always backup so that we can roll back to the previous version if
	// migration fails
	backupPath := input.BackupPath
	if backupPath == "" {
		backupPath = database.DatabaseBackupPath(s.Config.GetBackupDirectoryPath())
	} else {
		// check if backup path is a filename or path
		// filename goes into backup directory, path is kept as is
		filename := filepath.Base(backupPath)
		if backupPath == filename {
			backupPath = filepath.Join(s.Config.GetBackupDirectoryPathOrDefault(), filename)
		}
	}

	// perform database backup
	if err := database.Backup(backupPath); err != nil {
		// wrap with %w so callers can unwrap the underlying error
		return fmt.Errorf("error backing up database: %w", err)
	}

	if err := database.RunMigrations(); err != nil {
		errStr := fmt.Sprintf("error performing migration: %s", err)

		// roll back to the backed up version
		restoreErr := database.RestoreFromBackup(backupPath)
		if restoreErr != nil {
			errStr = fmt.Sprintf("ERROR: unable to restore database from backup after migration failure: %s\n%s", restoreErr.Error(), errStr)
		} else {
			errStr = "An error occurred migrating the database to the latest schema version. The backup database file was automatically renamed to restore the database.\n" + errStr
		}

		return errors.New(errStr)
	}

	// if no backup path was provided, then delete the created backup
	if input.BackupPath == "" {
		if err := os.Remove(backupPath); err != nil {
			logger.Warnf("error removing unwanted database backup (%s): %s", backupPath, err.Error())
		}
	}

	return nil
}
func (s *Manager) BackupDatabase(download bool) (string, string, error) {
var backupPath string
var backupName string

View File

@ -136,7 +136,7 @@ func (s *Manager) Import(ctx context.Context) (int, error) {
return 0, errors.New("metadata path must be set in config")
}
j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) {
j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) error {
task := ImportTask{
repository: s.Repository,
resetter: s.Database,
@ -147,6 +147,9 @@ func (s *Manager) Import(ctx context.Context) (int, error) {
fileNamingAlgorithm: config.GetVideoFileNamingAlgorithm(),
}
task.Start(ctx)
// TODO - return error from task
return nil
})
return s.JobManager.Add(ctx, "Importing...", j), nil
@ -159,7 +162,7 @@ func (s *Manager) Export(ctx context.Context) (int, error) {
return 0, errors.New("metadata path must be set in config")
}
j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) {
j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) error {
var wg sync.WaitGroup
wg.Add(1)
task := ExportTask{
@ -168,6 +171,8 @@ func (s *Manager) Export(ctx context.Context) (int, error) {
fileNamingAlgorithm: config.GetVideoFileNamingAlgorithm(),
}
task.Start(ctx, &wg)
// TODO - return error from task
return nil
})
return s.JobManager.Add(ctx, "Exporting...", j), nil
@ -177,9 +182,11 @@ func (s *Manager) RunSingleTask(ctx context.Context, t Task) int {
var wg sync.WaitGroup
wg.Add(1)
j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) {
j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) error {
t.Start(ctx)
wg.Done()
defer wg.Done()
// TODO - return error from task
return nil
})
return s.JobManager.Add(ctx, t.GetDescription(), j)
@ -215,11 +222,10 @@ func (s *Manager) generateScreenshot(ctx context.Context, sceneId string, at *fl
logger.Warnf("failure generating screenshot: %v", err)
}
j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) {
j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) error {
sceneIdInt, err := strconv.Atoi(sceneId)
if err != nil {
logger.Errorf("Error parsing scene id %s: %v", sceneId, err)
return
return fmt.Errorf("error parsing scene id %s: %w", sceneId, err)
}
var scene *models.Scene
@ -234,8 +240,7 @@ func (s *Manager) generateScreenshot(ctx context.Context, sceneId string, at *fl
return scene.LoadPrimaryFile(ctx, s.Repository.File)
}); err != nil {
logger.Errorf("error finding scene for screenshot generation: %v", err)
return
return fmt.Errorf("error finding scene for screenshot generation: %w", err)
}
task := GenerateCoverTask{
@ -248,6 +253,9 @@ func (s *Manager) generateScreenshot(ctx context.Context, sceneId string, at *fl
task.Start(ctx)
logger.Infof("Generate screenshot finished")
// TODO - return error from task
return nil
})
return s.JobManager.Add(ctx, fmt.Sprintf("Generating screenshot for scene id %s", sceneId), j)
@ -309,7 +317,7 @@ func (s *Manager) OptimiseDatabase(ctx context.Context) int {
}
func (s *Manager) MigrateHash(ctx context.Context) int {
j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) {
j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) error {
fileNamingAlgo := config.GetInstance().GetVideoFileNamingAlgorithm()
logger.Infof("Migrating generated files for %s naming hash", fileNamingAlgo.String())
@ -319,8 +327,7 @@ func (s *Manager) MigrateHash(ctx context.Context) int {
scenes, err = s.Repository.Scene.All(ctx)
return err
}); err != nil {
logger.Errorf("failed to fetch list of scenes for migration: %s", err.Error())
return
return fmt.Errorf("failed to fetch list of scenes for migration: %w", err)
}
var wg sync.WaitGroup
@ -331,7 +338,7 @@ func (s *Manager) MigrateHash(ctx context.Context) int {
progress.Increment()
if job.IsCancelled(ctx) {
logger.Info("Stopping due to user request")
return
return nil
}
if scene == nil {
@ -351,6 +358,7 @@ func (s *Manager) MigrateHash(ctx context.Context) int {
}
logger.Info("Finished migrating")
return nil
})
return s.JobManager.Add(ctx, "Migrating scene hashes...", j)
@ -381,13 +389,12 @@ type StashBoxBatchTagInput struct {
}
func (s *Manager) StashBoxBatchPerformerTag(ctx context.Context, input StashBoxBatchTagInput) int {
j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) {
j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) error {
logger.Infof("Initiating stash-box batch performer tag")
boxes := config.GetInstance().GetStashBoxes()
if input.Endpoint < 0 || input.Endpoint >= len(boxes) {
logger.Error(fmt.Errorf("invalid stash_box_index %d", input.Endpoint))
return
return fmt.Errorf("invalid stash_box_index %d", input.Endpoint)
}
box := boxes[input.Endpoint]
@ -435,7 +442,7 @@ func (s *Manager) StashBoxBatchPerformerTag(ctx context.Context, input StashBoxB
}
return nil
}); err != nil {
logger.Error(err.Error())
return err
}
} else if len(input.Names) > 0 || len(input.PerformerNames) > 0 {
// The user is batch adding performers
@ -493,13 +500,12 @@ func (s *Manager) StashBoxBatchPerformerTag(ctx context.Context, input StashBoxB
}
return nil
}); err != nil {
logger.Error(err.Error())
return
return err
}
}
if len(tasks) == 0 {
return
return nil
}
progress.SetTotal(len(tasks))
@ -513,19 +519,20 @@ func (s *Manager) StashBoxBatchPerformerTag(ctx context.Context, input StashBoxB
progress.Increment()
}
return nil
})
return s.JobManager.Add(ctx, "Batch stash-box performer tag...", j)
}
func (s *Manager) StashBoxBatchStudioTag(ctx context.Context, input StashBoxBatchTagInput) int {
j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) {
j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) error {
logger.Infof("Initiating stash-box batch studio tag")
boxes := config.GetInstance().GetStashBoxes()
if input.Endpoint < 0 || input.Endpoint >= len(boxes) {
logger.Error(fmt.Errorf("invalid stash_box_index %d", input.Endpoint))
return
return fmt.Errorf("invalid stash_box_index %d", input.Endpoint)
}
box := boxes[input.Endpoint]
@ -620,13 +627,12 @@ func (s *Manager) StashBoxBatchStudioTag(ctx context.Context, input StashBoxBatc
}
return nil
}); err != nil {
logger.Error(err.Error())
return
return err
}
}
if len(tasks) == 0 {
return
return nil
}
progress.SetTotal(len(tasks))
@ -640,6 +646,8 @@ func (s *Manager) StashBoxBatchStudioTag(ctx context.Context, input StashBoxBatc
progress.Increment()
}
return nil
})
return s.JobManager.Add(ctx, "Batch stash-box studio tag...", j)

View File

@ -106,17 +106,15 @@ func (j *CleanGeneratedJob) logError(err error) {
}
}
func (j *CleanGeneratedJob) Execute(ctx context.Context, progress *job.Progress) {
func (j *CleanGeneratedJob) Execute(ctx context.Context, progress *job.Progress) error {
j.tasksComplete = 0
if !j.BlobsStorageType.IsValid() {
logger.Errorf("invalid blobs storage type: %s", j.BlobsStorageType)
return
return fmt.Errorf("invalid blobs storage type: %s", j.BlobsStorageType)
}
if !j.VideoFileNamingAlgorithm.IsValid() {
logger.Errorf("invalid video file naming algorithm: %s", j.VideoFileNamingAlgorithm)
return
return fmt.Errorf("invalid video file naming algorithm: %s", j.VideoFileNamingAlgorithm)
}
if j.Options.DryRun {
@ -183,10 +181,11 @@ func (j *CleanGeneratedJob) Execute(ctx context.Context, progress *job.Progress)
if job.IsCancelled(ctx) {
logger.Info("Stopping due to user request")
return
return nil
}
logger.Infof("Finished cleaning generated files")
return nil
}
func (j *CleanGeneratedJob) setTaskProgress(taskProgress float64, progress *job.Progress) {

View File

@ -0,0 +1,153 @@
package task
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"github.com/stashapp/stash/pkg/job"
"github.com/stashapp/stash/pkg/logger"
"github.com/stashapp/stash/pkg/sqlite"
)
// migrateJobConfig is the subset of configuration the migrate job needs:
// where to place database backups.
type migrateJobConfig interface {
	// GetBackupDirectoryPath returns the configured backup directory.
	GetBackupDirectoryPath() string
	// GetBackupDirectoryPathOrDefault returns the configured backup
	// directory, falling back to a default when unset.
	GetBackupDirectoryPathOrDefault() string
}
// MigrateJob is a job-managed task that migrates the database schema to the
// required version, backing up the database first so a failed migration can
// be rolled back.
type MigrateJob struct {
	// BackupPath is the user-supplied backup destination; when empty, a
	// path is derived from the configured backup directory and the backup
	// is removed after a successful migration.
	BackupPath string
	Config     migrateJobConfig
	Database   *sqlite.Database
}
// Execute runs the migration job: it counts the required schema steps,
// backs up the database, applies the migrations step by step, and rolls
// back to the backup if anything fails. Implements job.JobExec.
func (s *MigrateJob) Execute(ctx context.Context, progress *job.Progress) error {
	required, err := s.required()
	if err != nil {
		return err
	}

	if required == 0 {
		logger.Infof("database is already at the latest schema version")
		return nil
	}

	// set the number of tasks = required steps + optimise
	progress.SetTotal(int(required + 1))

	database := s.Database

	// always backup so that we can roll back to the previous version if
	// migration fails
	backupPath := s.BackupPath
	if backupPath == "" {
		backupPath = database.DatabaseBackupPath(s.Config.GetBackupDirectoryPath())
	} else {
		// check if backup path is a filename or path
		// filename goes into backup directory, path is kept as is
		filename := filepath.Base(backupPath)
		if backupPath == filename {
			backupPath = filepath.Join(s.Config.GetBackupDirectoryPathOrDefault(), filename)
		}
	}

	// perform database backup
	if err := database.Backup(backupPath); err != nil {
		// wrap with %w so callers can unwrap the underlying error
		return fmt.Errorf("error backing up database: %w", err)
	}

	if err := s.runMigrations(ctx, progress); err != nil {
		errStr := fmt.Sprintf("error performing migration: %s", err)

		// roll back to the backed up version
		restoreErr := database.RestoreFromBackup(backupPath)
		if restoreErr != nil {
			errStr = fmt.Sprintf("ERROR: unable to restore database from backup after migration failure: %s\n%s", restoreErr.Error(), errStr)
		} else {
			errStr = "An error occurred migrating the database to the latest schema version. The backup database file was automatically renamed to restore the database.\n" + errStr
		}

		return errors.New(errStr)
	}

	// if no backup path was provided, then delete the created backup
	if s.BackupPath == "" {
		if err := os.Remove(backupPath); err != nil {
			logger.Warnf("error removing unwanted database backup (%s): %s", backupPath, err.Error())
		}
	}

	return nil
}
// required reports how many migration steps are needed to bring the
// database schema up to the version this build expects.
func (s *MigrateJob) required() (uint, error) {
	m, err := sqlite.NewMigrator(s.Database)
	if err != nil {
		return 0, err
	}
	defer m.Close()

	current := m.CurrentSchemaVersion()
	target := m.RequiredSchemaVersion()

	// shouldn't happen
	if current > target {
		return 0, nil
	}

	return target - current, nil
}
// runMigrations applies each pending schema migration one step at a time,
// updating job progress after each step, then reinitialises and optimises
// the database.
func (s *MigrateJob) runMigrations(ctx context.Context, progress *job.Progress) error {
	database := s.Database

	m, err := sqlite.NewMigrator(database)
	if err != nil {
		return err
	}
	defer m.Close()

	for {
		currentSchemaVersion := m.CurrentSchemaVersion()
		targetSchemaVersion := m.RequiredSchemaVersion()

		if currentSchemaVersion >= targetSchemaVersion {
			break
		}

		var err error
		progress.ExecuteTask(fmt.Sprintf("Migrating database to schema version %d", currentSchemaVersion+1), func() {
			err = m.RunMigration(ctx, currentSchemaVersion+1)
		})
		if err != nil {
			// wrap with %w so the underlying cause can be unwrapped
			return fmt.Errorf("error running migration for schema %d: %w", currentSchemaVersion+1, err)
		}

		progress.Increment()
	}

	// reinitialise the database
	if err := database.ReInitialise(); err != nil {
		return fmt.Errorf("error reinitialising database: %w", err)
	}

	// optimise the database
	progress.ExecuteTask("Optimising database", func() {
		err = database.Optimise(ctx)
	})
	if err != nil {
		return fmt.Errorf("error optimising database: %w", err)
	}
	progress.Increment()

	return nil
}

View File

@ -26,7 +26,7 @@ type MigrateBlobsJob struct {
DeleteOld bool
}
func (j *MigrateBlobsJob) Execute(ctx context.Context, progress *job.Progress) {
func (j *MigrateBlobsJob) Execute(ctx context.Context, progress *job.Progress) error {
var (
count int
err error
@ -37,13 +37,12 @@ func (j *MigrateBlobsJob) Execute(ctx context.Context, progress *job.Progress) {
})
if err != nil {
logger.Errorf("Error counting blobs: %s", err.Error())
return
return fmt.Errorf("error counting blobs: %w", err)
}
if count == 0 {
logger.Infof("No blobs to migrate")
return
return nil
}
logger.Infof("Migrating %d blobs", count)
@ -54,12 +53,11 @@ func (j *MigrateBlobsJob) Execute(ctx context.Context, progress *job.Progress) {
if job.IsCancelled(ctx) {
logger.Info("Cancelled migrating blobs")
return
return nil
}
if err != nil {
logger.Errorf("Error migrating blobs: %v", err)
return
return fmt.Errorf("error migrating blobs: %w", err)
}
// run a vacuum to reclaim space
@ -71,6 +69,7 @@ func (j *MigrateBlobsJob) Execute(ctx context.Context, progress *job.Progress) {
})
logger.Infof("Finished migrating blobs")
return nil
}
func (j *MigrateBlobsJob) countBlobs(ctx context.Context) (int, error) {

View File

@ -3,6 +3,7 @@ package task
import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
@ -21,7 +22,7 @@ type MigrateSceneScreenshotsJob struct {
TxnManager txn.Manager
}
func (j *MigrateSceneScreenshotsJob) Execute(ctx context.Context, progress *job.Progress) {
func (j *MigrateSceneScreenshotsJob) Execute(ctx context.Context, progress *job.Progress) error {
var err error
progress.ExecuteTask("Counting files", func() {
var count int
@ -30,8 +31,7 @@ func (j *MigrateSceneScreenshotsJob) Execute(ctx context.Context, progress *job.
})
if err != nil {
logger.Errorf("Error counting files: %s", err.Error())
return
return fmt.Errorf("error counting files: %w", err)
}
progress.ExecuteTask("Migrating files", func() {
@ -40,15 +40,15 @@ func (j *MigrateSceneScreenshotsJob) Execute(ctx context.Context, progress *job.
if job.IsCancelled(ctx) {
logger.Info("Cancelled migrating scene screenshots")
return
return nil
}
if err != nil {
logger.Errorf("Error migrating scene screenshots: %v", err)
return
return fmt.Errorf("error migrating scene screenshots: %w", err)
}
logger.Infof("Finished migrating scene screenshots")
return nil
}
func (j *MigrateSceneScreenshotsJob) countFiles(ctx context.Context) (int, error) {

View File

@ -30,13 +30,13 @@ type InstallPackagesJob struct {
Packages []*models.PackageSpecInput
}
func (j *InstallPackagesJob) Execute(ctx context.Context, progress *job.Progress) {
func (j *InstallPackagesJob) Execute(ctx context.Context, progress *job.Progress) error {
progress.SetTotal(len(j.Packages))
for _, p := range j.Packages {
if job.IsCancelled(ctx) {
logger.Info("Cancelled installing packages")
return
return nil
}
logger.Infof("Installing package %s", p.ID)
@ -53,6 +53,7 @@ func (j *InstallPackagesJob) Execute(ctx context.Context, progress *job.Progress
}
logger.Infof("Finished installing packages")
return nil
}
type UpdatePackagesJob struct {
@ -60,13 +61,12 @@ type UpdatePackagesJob struct {
Packages []*models.PackageSpecInput
}
func (j *UpdatePackagesJob) Execute(ctx context.Context, progress *job.Progress) {
func (j *UpdatePackagesJob) Execute(ctx context.Context, progress *job.Progress) error {
// if no packages are specified, update all
if len(j.Packages) == 0 {
installed, err := j.PackageManager.InstalledStatus(ctx)
if err != nil {
logger.Errorf("Error getting installed packages: %v", err)
return
return fmt.Errorf("error getting installed packages: %w", err)
}
for _, p := range installed {
@ -84,7 +84,7 @@ func (j *UpdatePackagesJob) Execute(ctx context.Context, progress *job.Progress)
for _, p := range j.Packages {
if job.IsCancelled(ctx) {
logger.Info("Cancelled updating packages")
return
return nil
}
logger.Infof("Updating package %s", p.ID)
@ -101,6 +101,7 @@ func (j *UpdatePackagesJob) Execute(ctx context.Context, progress *job.Progress)
}
logger.Infof("Finished updating packages")
return nil
}
type UninstallPackagesJob struct {
@ -108,13 +109,13 @@ type UninstallPackagesJob struct {
Packages []*models.PackageSpecInput
}
func (j *UninstallPackagesJob) Execute(ctx context.Context, progress *job.Progress) {
func (j *UninstallPackagesJob) Execute(ctx context.Context, progress *job.Progress) error {
progress.SetTotal(len(j.Packages))
for _, p := range j.Packages {
if job.IsCancelled(ctx) {
logger.Info("Cancelled installing packages")
return
return nil
}
logger.Infof("Uninstalling package %s", p.ID)
@ -131,4 +132,5 @@ func (j *UninstallPackagesJob) Execute(ctx context.Context, progress *job.Progre
}
logger.Infof("Finished uninstalling packages")
return nil
}

View File

@ -25,7 +25,7 @@ type autoTagJob struct {
cache match.Cache
}
func (j *autoTagJob) Execute(ctx context.Context, progress *job.Progress) {
func (j *autoTagJob) Execute(ctx context.Context, progress *job.Progress) error {
begin := time.Now()
input := j.input
@ -38,6 +38,7 @@ func (j *autoTagJob) Execute(ctx context.Context, progress *job.Progress) {
}
logger.Infof("Finished auto-tag after %s", time.Since(begin).String())
return nil
}
func (j *autoTagJob) isFileBasedAutoTag(input AutoTagMetadataInput) bool {

View File

@ -32,7 +32,7 @@ type cleanJob struct {
scanSubs *subscriptionManager
}
func (j *cleanJob) Execute(ctx context.Context, progress *job.Progress) {
func (j *cleanJob) Execute(ctx context.Context, progress *job.Progress) error {
logger.Infof("Starting cleaning of tracked files")
start := time.Now()
if j.input.DryRun {
@ -47,7 +47,7 @@ func (j *cleanJob) Execute(ctx context.Context, progress *job.Progress) {
if job.IsCancelled(ctx) {
logger.Info("Stopping due to user request")
return
return nil
}
j.cleanEmptyGalleries(ctx)
@ -55,6 +55,7 @@ func (j *cleanJob) Execute(ctx context.Context, progress *job.Progress) {
j.scanSubs.notify()
elapsed := time.Since(start)
logger.Info(fmt.Sprintf("Finished Cleaning (%s)", elapsed))
return nil
}
func (j *cleanJob) cleanEmptyGalleries(ctx context.Context) {

View File

@ -80,7 +80,7 @@ type totalsGenerate struct {
tasks int
}
func (j *GenerateJob) Execute(ctx context.Context, progress *job.Progress) {
func (j *GenerateJob) Execute(ctx context.Context, progress *job.Progress) error {
var scenes []*models.Scene
var err error
var markers []*models.SceneMarker
@ -223,11 +223,12 @@ func (j *GenerateJob) Execute(ctx context.Context, progress *job.Progress) {
if job.IsCancelled(ctx) {
logger.Info("Stopping due to user request")
return
return nil
}
elapsed := time.Since(start)
logger.Info(fmt.Sprintf("Generate finished (%s)", elapsed))
return nil
}
func (j *GenerateJob) queueTasks(ctx context.Context, g *generate.Generator, queue chan<- Task) {

View File

@ -34,18 +34,17 @@ func CreateIdentifyJob(input identify.Options) *IdentifyJob {
}
}
func (j *IdentifyJob) Execute(ctx context.Context, progress *job.Progress) {
func (j *IdentifyJob) Execute(ctx context.Context, progress *job.Progress) error {
j.progress = progress
// if no sources provided - just return
if len(j.input.Sources) == 0 {
return
return nil
}
sources, err := j.getSources()
if err != nil {
logger.Error(err)
return
return err
}
// if scene ids provided, use those
@ -84,8 +83,10 @@ func (j *IdentifyJob) Execute(ctx context.Context, progress *job.Progress) {
return nil
}); err != nil {
logger.Errorf("Error encountered while identifying scenes: %v", err)
return fmt.Errorf("error encountered while identifying scenes: %w", err)
}
return nil
}
func (j *IdentifyJob) identifyAllScenes(ctx context.Context, sources []identify.ScraperSource) error {

View File

@ -2,6 +2,7 @@ package manager
import (
"context"
"fmt"
"time"
"github.com/stashapp/stash/pkg/job"
@ -17,7 +18,7 @@ type OptimiseDatabaseJob struct {
Optimiser Optimiser
}
func (j *OptimiseDatabaseJob) Execute(ctx context.Context, progress *job.Progress) {
func (j *OptimiseDatabaseJob) Execute(ctx context.Context, progress *job.Progress) error {
logger.Info("Optimising database")
progress.SetTotal(2)
@ -31,11 +32,10 @@ func (j *OptimiseDatabaseJob) Execute(ctx context.Context, progress *job.Progres
})
if job.IsCancelled(ctx) {
logger.Info("Stopping due to user request")
return
return nil
}
if err != nil {
logger.Errorf("Error analyzing database: %v", err)
return
return fmt.Errorf("Error analyzing database: %w", err)
}
progress.ExecuteTask("Vacuuming database", func() {
@ -44,13 +44,13 @@ func (j *OptimiseDatabaseJob) Execute(ctx context.Context, progress *job.Progres
})
if job.IsCancelled(ctx) {
logger.Info("Stopping due to user request")
return
return nil
}
if err != nil {
logger.Errorf("Error vacuuming database: %v", err)
return
return fmt.Errorf("error vacuuming database: %w", err)
}
elapsed := time.Since(start)
logger.Infof("Finished optimising database after %s", elapsed)
return nil
}

View File

@ -16,18 +16,16 @@ func (s *Manager) RunPluginTask(
description *string,
args plugin.OperationInput,
) int {
j := job.MakeJobExec(func(jobCtx context.Context, progress *job.Progress) {
j := job.MakeJobExec(func(jobCtx context.Context, progress *job.Progress) error {
pluginProgress := make(chan float64)
task, err := s.PluginCache.CreateTask(ctx, pluginID, taskName, args, pluginProgress)
if err != nil {
logger.Errorf("Error creating plugin task: %s", err.Error())
return
return fmt.Errorf("Error creating plugin task: %w", err)
}
err = task.Start()
if err != nil {
logger.Errorf("Error running plugin task: %s", err.Error())
return
return fmt.Errorf("Error running plugin task: %w", err)
}
done := make(chan bool)
@ -50,14 +48,14 @@ func (s *Manager) RunPluginTask(
for {
select {
case <-done:
return
return nil
case p := <-pluginProgress:
progress.SetPercent(p)
case <-jobCtx.Done():
if err := task.Stop(); err != nil {
logger.Errorf("Error stopping plugin operation: %s", err.Error())
}
return
return nil
}
}
})

View File

@ -34,12 +34,12 @@ type ScanJob struct {
subscriptions *subscriptionManager
}
func (j *ScanJob) Execute(ctx context.Context, progress *job.Progress) {
func (j *ScanJob) Execute(ctx context.Context, progress *job.Progress) error {
input := j.input
if job.IsCancelled(ctx) {
logger.Info("Stopping due to user request")
return
return nil
}
sp := getScanPaths(input.Paths)
@ -74,13 +74,14 @@ func (j *ScanJob) Execute(ctx context.Context, progress *job.Progress) {
if job.IsCancelled(ctx) {
logger.Info("Stopping due to user request")
return
return nil
}
elapsed := time.Since(start)
logger.Info(fmt.Sprintf("Scan finished (%s)", elapsed))
j.subscriptions.notify()
return nil
}
type extensionConfig struct {

View File

@ -5,22 +5,24 @@ import (
"time"
)
type JobExecFn func(ctx context.Context, progress *Progress) error
// JobExec represents the implementation of a Job to be executed.
type JobExec interface {
Execute(ctx context.Context, progress *Progress)
Execute(ctx context.Context, progress *Progress) error
}
type jobExecImpl struct {
fn func(ctx context.Context, progress *Progress)
fn JobExecFn
}
func (j *jobExecImpl) Execute(ctx context.Context, progress *Progress) {
j.fn(ctx, progress)
func (j *jobExecImpl) Execute(ctx context.Context, progress *Progress) error {
return j.fn(ctx, progress)
}
// MakeJobExec returns a simple JobExec implementation using the provided
// function.
func MakeJobExec(fn func(ctx context.Context, progress *Progress)) JobExec {
func MakeJobExec(fn JobExecFn) JobExec {
return &jobExecImpl{
fn: fn,
}
@ -56,6 +58,7 @@ type Job struct {
StartTime *time.Time
EndTime *time.Time
AddTime time.Time
Error *string
outerCtx context.Context
exec JobExec
@ -87,6 +90,12 @@ func (j *Job) cancel() {
}
}
func (j *Job) error(err error) {
errStr := err.Error()
j.Error = &errStr
j.Status = StatusFailed
}
// IsCancelled returns true if cancel has been called on the context.
func IsCancelled(ctx context.Context) bool {
select {

View File

@ -206,7 +206,10 @@ func (m *Manager) executeJob(ctx context.Context, j *Job, done chan struct{}) {
}()
progress := m.newProgress(j)
j.exec.Execute(ctx, progress)
if err := j.exec.Execute(ctx, progress); err != nil {
logger.Errorf("task failed due to error: %v", err)
j.error(err)
}
}
func (m *Manager) onJobFinish(job *Job) {

View File

@ -24,7 +24,7 @@ func newTestExec(finish chan struct{}) *testExec {
}
}
func (e *testExec) Execute(ctx context.Context, p *Progress) {
func (e *testExec) Execute(ctx context.Context, p *Progress) error {
e.progress = p
close(e.started)
@ -38,6 +38,8 @@ func (e *testExec) Execute(ctx context.Context, p *Progress) {
// fall through
}
}
return nil
}
func TestAdd(t *testing.T) {

View File

@ -10,9 +10,6 @@ import (
"path/filepath"
"time"
"github.com/golang-migrate/migrate/v4"
sqlite3mig "github.com/golang-migrate/migrate/v4/database/sqlite3"
"github.com/golang-migrate/migrate/v4/source/iofs"
"github.com/jmoiron/sqlx"
"github.com/stashapp/stash/pkg/fsutil"
@ -144,7 +141,7 @@ func (db *Database) Open(dbPath string) error {
if databaseSchemaVersion == 0 {
// new database, just run the migrations
if err := db.RunMigrations(); err != nil {
if err := db.RunAllMigrations(); err != nil {
return fmt.Errorf("error running initial schema migrations: %w", err)
}
} else {
@ -312,11 +309,6 @@ func (db *Database) RestoreFromBackup(backupPath string) error {
return os.Rename(backupPath, db.dbPath)
}
// Migrate the database
func (db *Database) needsMigration() bool {
return db.schemaVersion != appSchemaVersion
}
func (db *Database) AppSchemaVersion() uint {
return appSchemaVersion
}
@ -349,100 +341,6 @@ func (db *Database) Version() uint {
return db.schemaVersion
}
func (db *Database) getMigrate() (*migrate.Migrate, error) {
migrations, err := iofs.New(migrationsBox, "migrations")
if err != nil {
return nil, err
}
const disableForeignKeys = true
conn, err := db.open(disableForeignKeys)
if err != nil {
return nil, err
}
driver, err := sqlite3mig.WithInstance(conn.DB, &sqlite3mig.Config{})
if err != nil {
return nil, err
}
// use sqlite3Driver so that migration has access to durationToTinyInt
return migrate.NewWithInstance(
"iofs",
migrations,
db.dbPath,
driver,
)
}
func (db *Database) getDatabaseSchemaVersion() (uint, error) {
m, err := db.getMigrate()
if err != nil {
return 0, err
}
defer m.Close()
ret, _, _ := m.Version()
return ret, nil
}
// Migrate the database
func (db *Database) RunMigrations() error {
ctx := context.Background()
m, err := db.getMigrate()
if err != nil {
return err
}
defer m.Close()
databaseSchemaVersion, _, _ := m.Version()
stepNumber := appSchemaVersion - databaseSchemaVersion
if stepNumber != 0 {
logger.Infof("Migrating database from version %d to %d", databaseSchemaVersion, appSchemaVersion)
// run each migration individually, and run custom migrations as needed
var i uint = 1
for ; i <= stepNumber; i++ {
newVersion := databaseSchemaVersion + i
// run pre migrations as needed
if err := db.runCustomMigrations(ctx, preMigrations[newVersion]); err != nil {
return fmt.Errorf("running pre migrations for schema version %d: %w", newVersion, err)
}
err = m.Steps(1)
if err != nil {
// migration failed
return err
}
// run post migrations as needed
if err := db.runCustomMigrations(ctx, postMigrations[newVersion]); err != nil {
return fmt.Errorf("running post migrations for schema version %d: %w", newVersion, err)
}
}
}
// update the schema version
db.schemaVersion, _, _ = m.Version()
// re-initialise the database
const disableForeignKeys = false
db.db, err = db.open(disableForeignKeys)
if err != nil {
return fmt.Errorf("re-initializing the database: %w", err)
}
// optimize database after migration
err = db.Optimise(ctx)
if err != nil {
logger.Warnf("error while performing post-migration optimisation: %v", err)
}
return nil
}
func (db *Database) Optimise(ctx context.Context) error {
logger.Info("Optimising database")
@ -524,28 +422,3 @@ func (db *Database) QuerySQL(ctx context.Context, query string, args []interface
return cols, ret, nil
}
func (db *Database) runCustomMigrations(ctx context.Context, fns []customMigrationFunc) error {
for _, fn := range fns {
if err := db.runCustomMigration(ctx, fn); err != nil {
return err
}
}
return nil
}
func (db *Database) runCustomMigration(ctx context.Context, fn customMigrationFunc) error {
const disableForeignKeys = false
d, err := db.open(disableForeignKeys)
if err != nil {
return err
}
defer d.Close()
if err := fn(ctx, d); err != nil {
return err
}
return nil
}

188
pkg/sqlite/migrate.go Normal file
View File

@ -0,0 +1,188 @@
package sqlite
import (
"context"
"fmt"
"github.com/golang-migrate/migrate/v4"
sqlite3mig "github.com/golang-migrate/migrate/v4/database/sqlite3"
"github.com/golang-migrate/migrate/v4/source/iofs"
"github.com/stashapp/stash/pkg/logger"
)
// needsMigration reports whether the open database's schema version differs
// from the schema version this build expects.
func (db *Database) needsMigration() bool {
	return db.schemaVersion != appSchemaVersion
}
// Migrator wraps a migrate.Migrate instance bound to a Database, providing
// step-by-step schema migration. Callers must Close it when finished.
type Migrator struct {
	db *Database
	// m holds the underlying golang-migrate instance; nil after Close.
	m *migrate.Migrate
}
// NewMigrator creates a Migrator bound to the given database. The caller
// must Close the returned Migrator when finished with it.
func NewMigrator(db *Database) (*Migrator, error) {
	ret := &Migrator{db: db}

	instance, err := ret.getMigrate()
	if err != nil {
		return ret, err
	}

	ret.m = instance
	return ret, nil
}
// Close releases the underlying migrate instance. It is safe to call more
// than once.
func (m *Migrator) Close() {
	if m.m == nil {
		return
	}
	m.m.Close()
	m.m = nil
}
// CurrentSchemaVersion returns the schema version currently recorded in the
// database. Errors from the underlying Version call are ignored.
func (m *Migrator) CurrentSchemaVersion() uint {
	databaseSchemaVersion, _, _ := m.m.Version()
	return databaseSchemaVersion
}
// RequiredSchemaVersion returns the schema version that this build of the
// application expects the database to be at.
func (m *Migrator) RequiredSchemaVersion() uint {
	return appSchemaVersion
}
// getMigrate builds a migrate.Migrate instance backed by the embedded
// migration scripts and a connection to this database.
//
// Foreign keys are disabled on the migration connection so that schema
// restructuring is not blocked by FK enforcement.
func (m *Migrator) getMigrate() (*migrate.Migrate, error) {
	migrations, err := iofs.New(migrationsBox, "migrations")
	if err != nil {
		return nil, err
	}

	const disableForeignKeys = true
	conn, err := m.db.open(disableForeignKeys)
	if err != nil {
		return nil, err
	}

	driver, err := sqlite3mig.WithInstance(conn.DB, &sqlite3mig.Config{})
	if err != nil {
		// don't leak the connection if the driver could not be created
		// (the original left conn open on this path)
		conn.Close()
		return nil, err
	}

	// use sqlite3Driver so that migration has access to durationToTinyInt
	return migrate.NewWithInstance(
		"iofs",
		migrations,
		m.db.dbPath,
		driver,
	)
}
// RunMigration migrates the schema up a single step to newVersion, running
// any registered pre- and post-migration custom functions for that version.
// newVersion must be exactly one greater than the current schema version.
// On success the database's cached schema version is updated.
func (m *Migrator) RunMigration(ctx context.Context, newVersion uint) error {
	databaseSchemaVersion, _, _ := m.m.Version()

	if newVersion != databaseSchemaVersion+1 {
		return fmt.Errorf("invalid migration version %d, expected %d", newVersion, databaseSchemaVersion+1)
	}

	// run pre migrations as needed
	if err := m.runCustomMigrations(ctx, preMigrations[newVersion]); err != nil {
		return fmt.Errorf("running pre migrations for schema version %d: %w", newVersion, err)
	}

	if err := m.m.Steps(1); err != nil {
		// wrap with the target version so the failing step can be located,
		// consistent with the pre/post migration errors above and below
		return fmt.Errorf("running migration for schema version %d: %w", newVersion, err)
	}

	// run post migrations as needed
	if err := m.runCustomMigrations(ctx, postMigrations[newVersion]); err != nil {
		return fmt.Errorf("running post migrations for schema version %d: %w", newVersion, err)
	}

	// update the cached schema version
	m.db.schemaVersion, _, _ = m.m.Version()

	return nil
}
// runCustomMigrations executes the given custom migration functions in
// order, stopping at the first failure.
func (m *Migrator) runCustomMigrations(ctx context.Context, fns []customMigrationFunc) error {
	for i := range fns {
		if err := m.runCustomMigration(ctx, fns[i]); err != nil {
			return err
		}
	}

	return nil
}
// runCustomMigration opens a fresh connection (foreign keys disabled) and
// runs a single custom migration function against it.
func (m *Migrator) runCustomMigration(ctx context.Context, fn customMigrationFunc) error {
	const disableForeignKeys = false
	conn, err := m.db.open(disableForeignKeys)
	if err != nil {
		return err
	}
	defer conn.Close()

	return fn(ctx, conn)
}
// getDatabaseSchemaVersion reads the schema version currently recorded in
// the database, without migrating anything.
func (db *Database) getDatabaseSchemaVersion() (uint, error) {
	m, err := NewMigrator(db)
	if err != nil {
		return 0, err
	}
	defer m.Close()

	// use the accessor rather than reaching into m.m directly, consistent
	// with the rest of the Migrator API
	return m.CurrentSchemaVersion(), nil
}
// ReInitialise re-opens the database connection with foreign keys enabled,
// typically after a migration has closed or replaced it.
func (db *Database) ReInitialise() error {
	const disableForeignKeys = false

	var err error
	if db.db, err = db.open(disableForeignKeys); err != nil {
		return fmt.Errorf("re-initializing the database: %w", err)
	}

	return nil
}
// RunAllMigrations runs all migrations to bring the database up to the
// current schema version, then re-opens the database and runs a
// best-effort optimisation pass.
func (db *Database) RunAllMigrations() error {
	ctx := context.Background()

	m, err := NewMigrator(db)
	if err != nil {
		return err
	}
	defer m.Close()

	databaseSchemaVersion, _, _ := m.m.Version()

	// guard against unsigned underflow: if the database was created by a
	// newer build, appSchemaVersion - databaseSchemaVersion would wrap
	// around and the loop below would attempt a huge number of steps
	if databaseSchemaVersion > appSchemaVersion {
		return fmt.Errorf("database schema version %d is newer than the supported version %d", databaseSchemaVersion, appSchemaVersion)
	}

	stepNumber := appSchemaVersion - databaseSchemaVersion
	if stepNumber != 0 {
		logger.Infof("Migrating database from version %d to %d", databaseSchemaVersion, appSchemaVersion)

		// run each migration individually, and run custom migrations as needed
		var i uint = 1
		for ; i <= stepNumber; i++ {
			newVersion := databaseSchemaVersion + i
			if err := m.RunMigration(ctx, newVersion); err != nil {
				return err
			}
		}
	}

	// re-initialise the database; reuse ReInitialise rather than
	// duplicating the open/wrap logic inline
	if err := db.ReInitialise(); err != nil {
		return err
	}

	// optimize database after migration; failure here is non-fatal
	if err := db.Optimise(ctx); err != nil {
		logger.Warnf("error while performing post-migration optimisation: %v", err)
	}

	return nil
}

View File

@ -7,4 +7,5 @@ fragment JobData on Job {
startTime
endTime
addTime
error
}

View File

@ -7,6 +7,7 @@ subscription JobsSubscribe {
subTasks
description
progress
error
}
}
}

View File

@ -49,6 +49,7 @@ import { PluginRoutes } from "./plugins";
// import plugin_api to run code
import "./pluginApi";
import { ConnectionMonitor } from "./ConnectionMonitor";
const Performers = lazyComponent(
() => import("./components/Performers/Performers")
@ -369,6 +370,7 @@ export const App: React.FC = () => {
>
{maybeRenderReleaseNotes()}
<ToastProvider>
<ConnectionMonitor />
<Suspense fallback={<LoadingIndicator />}>
<LightboxProvider>
<ManualProvider>

View File

@ -0,0 +1,34 @@
import { useEffect, useState } from "react";
import { getWSClient, useWSState } from "./core/StashService";
import { useToast } from "./hooks/Toast";
import { useIntl } from "react-intl";
// Watches the websocket client's connection state and surfaces toasts when
// the initial connection fails or a dropped connection comes back.
// Renders nothing.
export const ConnectionMonitor: React.FC = () => {
  const Toast = useToast();
  const intl = useIntl();

  const { state } = useWSState(getWSClient());
  const [prevState, setPrevState] = useState<typeof state>(state);

  useEffect(() => {
    const connectFailed = prevState === "connecting" && state === "error";
    const reconnected = prevState === "error" && state === "connected";

    if (connectFailed) {
      Toast.error(
        intl.formatMessage({
          id: "connection_monitor.websocket_connection_failed",
        })
      );
    } else if (reconnected) {
      Toast.success(
        intl.formatMessage({
          id: "connection_monitor.websocket_connection_reestablished",
        })
      );
    }

    setPrevState(state);
  }, [state, prevState, Toast, intl]);

  return null;
};

View File

@ -12,6 +12,7 @@ import {
faBan,
faCheck,
faCircle,
faCircleExclamation,
faCog,
faHourglassStart,
faTimes,
@ -19,7 +20,7 @@ import {
type JobFragment = Pick<
GQL.Job,
"id" | "status" | "subTasks" | "description" | "progress"
"id" | "status" | "subTasks" | "description" | "progress" | "error"
>;
interface IJob {
@ -37,6 +38,7 @@ const Task: React.FC<IJob> = ({ job }) => {
useEffect(() => {
if (
job.status === GQL.JobStatus.Cancelled ||
job.status === GQL.JobStatus.Failed ||
job.status === GQL.JobStatus.Finished
) {
// fade out around 10 seconds
@ -71,6 +73,8 @@ const Task: React.FC<IJob> = ({ job }) => {
return "finished";
case GQL.JobStatus.Cancelled:
return "cancelled";
case GQL.JobStatus.Failed:
return "failed";
}
}
@ -95,6 +99,9 @@ const Task: React.FC<IJob> = ({ job }) => {
case GQL.JobStatus.Cancelled:
icon = faBan;
break;
case GQL.JobStatus.Failed:
icon = faCircleExclamation;
break;
}
return <Icon icon={icon} className={`fa-fw ${iconClass}`} />;
@ -134,6 +141,10 @@ const Task: React.FC<IJob> = ({ job }) => {
</div>
);
}
if (job.status === GQL.JobStatus.Failed && job.error) {
return <div className="job-error">{job.error}</div>;
}
}
return (

View File

@ -296,6 +296,10 @@
color: $success;
}
.failed .fa-icon {
color: $danger;
}
.ready .fa-icon {
color: $warning;
}
@ -304,6 +308,10 @@
.finished {
color: $text-muted;
}
.job-error {
color: $danger;
}
}
#temp-enable-duration .duration-control:disabled {

View File

@ -1,22 +1,42 @@
import React, { useEffect, useMemo, useState } from "react";
import { Button, Card, Container, Form } from "react-bootstrap";
import { Button, Card, Container, Form, ProgressBar } from "react-bootstrap";
import { useIntl, FormattedMessage } from "react-intl";
import { useHistory } from "react-router-dom";
import * as GQL from "src/core/generated-graphql";
import { useSystemStatus, mutateMigrate } from "src/core/StashService";
import {
useSystemStatus,
mutateMigrate,
postMigrate,
} from "src/core/StashService";
import { migrationNotes } from "src/docs/en/MigrationNotes";
import { ExternalLink } from "../Shared/ExternalLink";
import { LoadingIndicator } from "../Shared/LoadingIndicator";
import { MarkdownPage } from "../Shared/MarkdownPage";
import { useMonitorJob } from "src/utils/job";
export const Migrate: React.FC = () => {
const intl = useIntl();
const history = useHistory();
const { data: systemStatus, loading } = useSystemStatus();
const [backupPath, setBackupPath] = useState<string | undefined>();
const [migrateLoading, setMigrateLoading] = useState(false);
const [migrateError, setMigrateError] = useState("");
const intl = useIntl();
const history = useHistory();
const [jobID, setJobID] = useState<string | undefined>();
const { job } = useMonitorJob(jobID, (finishedJob) => {
setJobID(undefined);
setMigrateLoading(false);
if (finishedJob?.error) {
setMigrateError(finishedJob.error);
} else {
postMigrate();
history.push("/");
}
});
// if database path includes path separators, then this is passed through
// to the migration path. Extract the base name of the database file.
@ -94,10 +114,32 @@ export const Migrate: React.FC = () => {
}
if (migrateLoading) {
const progress =
job && job.progress !== undefined && job.progress !== null
? job.progress * 100
: undefined;
return (
<LoadingIndicator
message={intl.formatMessage({ id: "setup.migrate.migrating_database" })}
/>
<div className="migrate-loading-status">
<h4>
<LoadingIndicator inline small message="" />
<span>
<FormattedMessage id="setup.migrate.migrating_database" />
</span>
</h4>
{progress !== undefined && (
<ProgressBar
animated
now={progress}
label={`${progress.toFixed(0)}%`}
/>
)}
{job?.subTasks?.map((subTask, i) => (
<div key={i}>
<p>{subTask}</p>
</div>
))}
</div>
);
}
@ -113,11 +155,13 @@ export const Migrate: React.FC = () => {
try {
setMigrateLoading(true);
setMigrateError("");
await mutateMigrate({
// migrate now uses the job manager
const ret = await mutateMigrate({
backupPath: backupPath ?? "",
});
history.push("/");
setJobID(ret.data?.migrate);
} catch (e) {
if (e instanceof Error) setMigrateError(e.message ?? e.toString());
setMigrateLoading(false);

View File

@ -7,3 +7,20 @@
padding: 16px;
}
}
// Full-width, vertically centered layout for the database migration
// progress screen (spinner, heading, progress bar, sub-task list).
.migrate-loading-status {
  align-items: center;
  display: flex;
  flex-direction: column;
  height: 70vh;
  justify-content: center;
  width: 100%;

  .progress {
    // keep the progress bar from spanning the whole viewport
    width: 60%;
  }

  h4 span {
    // gap between the inline spinner and the heading text
    margin-left: 0.5rem;
  }
}

View File

@ -15,10 +15,36 @@ import { ListFilterModel } from "../models/list-filter/filter";
import * as GQL from "./generated-graphql";
import { createClient } from "./createClient";
import { Client } from "graphql-ws";
import { useEffect, useState } from "react";
const { client } = createClient();
const { client, wsClient, cache: clientCache } = createClient();
// Returns the shared Apollo client instance used for all GraphQL operations.
export const getClient = () => client;
// Returns the shared graphql-ws client; used to monitor websocket
// connection state (see useWSState).
export const getWSClient = () => wsClient;
// React hook tracking the connection state of a graphql-ws client.
// Starts as "connecting" and transitions on the client's
// "connected" / "error" events; listeners are disposed on unmount.
export function useWSState(ws: Client) {
  const [state, setState] = useState<"connecting" | "connected" | "error">(
    "connecting"
  );

  useEffect(() => {
    const disposers = [
      ws.on("connected", () => setState("connected")),
      ws.on("error", () => setState("error")),
    ];

    return () => {
      for (const dispose of disposers) {
        dispose();
      }
    };
  }, [ws]);

  return { state };
}
// Evicts cached results for the given queries.
// Will also call a cache GC afterwards.
@ -2382,13 +2408,14 @@ export const mutateMigrate = (input: GQL.MigrateInput) =>
client.mutate<GQL.MigrateMutation>({
mutation: GQL.MigrateDocument,
variables: { input },
update(cache, result) {
if (!result.data?.migrate) return;
evictQueries(cache, setupMutationImpactedQueries);
},
});
// Migrate now runs asynchronously as a job, so the Apollo cache cannot be
// evicted in the mutation's update callback. Callers must invoke this once
// the migrate job completes successfully to refresh setup-impacted queries.
export function postMigrate() {
  evictQueries(clientCache, setupMutationImpactedQueries);
}
/// Packages
// Acts like GQL.useInstalledScraperPackagesStatusQuery if loadUpgrades is true,

View File

@ -144,15 +144,15 @@ export const createClient = () => {
const httpLink = createUploadLink({ uri: url.toString() });
const wsLink = new GraphQLWsLink(
createWSClient({
url: wsUrl.toString(),
retryAttempts: Infinity,
shouldRetry() {
return true;
},
})
);
const wsClient = createWSClient({
url: wsUrl.toString(),
retryAttempts: Infinity,
shouldRetry() {
return true;
},
});
const wsLink = new GraphQLWsLink(wsClient);
const errorLink = onError(({ networkError }) => {
// handle graphql unauthorized error
@ -211,5 +211,6 @@ Please disable it on the server and refresh the page.`);
return {
cache,
client,
wsClient,
};
};

View File

@ -776,6 +776,10 @@
}
},
"configuration": "Configuration",
"connection_monitor": {
"websocket_connection_failed": "Unable to make websocket connection: see browser console for details",
"websocket_connection_reestablished": "Websocket connection re-established"
},
"countables": {
"files": "{count, plural, one {File} other {Files}}",
"galleries": "{count, plural, one {Gallery} other {Galleries}}",

View File

@ -1,24 +1,36 @@
import { useEffect, useState } from "react";
import { getWSClient, useWSState } from "src/core/StashService";
import {
Job,
JobStatus,
JobStatusUpdateType,
useJobQueueQuery,
useFindJobQuery,
useJobsSubscribeSubscription,
} from "src/core/generated-graphql";
export type JobFragment = Pick<
Job,
"id" | "status" | "subTasks" | "description" | "progress"
"id" | "status" | "subTasks" | "description" | "progress" | "error"
>;
export const useMonitorJob = (
jobID: string | undefined | null,
onComplete?: () => void
onComplete?: (job?: JobFragment) => void
) => {
const { state } = useWSState(getWSClient());
const jobsSubscribe = useJobsSubscribeSubscription({
skip: !jobID,
});
const { data: jobData, loading } = useJobQueueQuery({
const {
data: jobData,
loading,
startPolling,
stopPolling,
} = useFindJobQuery({
variables: {
input: { id: jobID ?? "" },
},
fetchPolicy: "network-only",
skip: !jobID,
});
@ -34,19 +46,26 @@ export const useMonitorJob = (
return;
}
const j = jobData?.jobQueue?.find((jj) => jj.id === jobID);
const j = jobData?.findJob;
if (j) {
setJob(j);
if (
j.status === JobStatus.Finished ||
j.status === JobStatus.Failed ||
j.status === JobStatus.Cancelled
) {
setJob(undefined);
onComplete?.(j);
}
} else {
// must've already finished
setJob(undefined);
if (onComplete) {
onComplete();
}
onComplete?.();
}
}, [jobID, jobData, loading, onComplete]);
// monitor batch operation
// monitor job
useEffect(() => {
if (!jobID) {
return;
@ -65,11 +84,25 @@ export const useMonitorJob = (
setJob(event.job);
} else {
setJob(undefined);
if (onComplete) {
onComplete();
}
onComplete?.(event.job);
}
}, [jobsSubscribe, jobID, onComplete]);
// it's possible that the websocket connection isn't present
// in that case, we'll just poll the server
useEffect(() => {
if (!jobID) {
stopPolling();
return;
}
if (state === "connected") {
stopPolling();
} else {
const defaultPollInterval = 1000;
startPolling(defaultPollInterval);
}
}, [jobID, state, startPolling, stopPolling]);
return { job };
};