stash/internal/manager/task/migrate_blobs.go

130 lines
2.8 KiB
Go
Raw Normal View History

package task
import (
"context"
"fmt"
"github.com/stashapp/stash/pkg/job"
"github.com/stashapp/stash/pkg/logger"
"github.com/stashapp/stash/pkg/txn"
)
// BlobStoreMigrator is the set of blob store operations that MigrateBlobsJob
// needs in order to count, page through and migrate blobs.
type BlobStoreMigrator interface {
// Count returns a blob count; the job uses it as the progress total and
// treats zero as "nothing to migrate".
// NOTE(review): presumably this counts only blobs that still need
// migrating - confirm against the implementation.
Count(ctx context.Context) (int, error)
// FindBlobs returns a batch of blob checksums. The job passes the batch
// size as n and the last checksum it processed as lastChecksum (empty for
// the first batch), and stops when an empty batch comes back.
// NOTE(review): presumably results are ordered by checksum and start after
// lastChecksum - confirm against the implementation.
FindBlobs(ctx context.Context, n uint, lastChecksum string) ([]string, error)
// MigrateBlob migrates the blob identified by checksum. The job forwards
// its DeleteOld setting as deleteOld.
// NOTE(review): deleteOld presumably removes the blob from its previous
// storage location once migrated - confirm against the implementation.
MigrateBlob(ctx context.Context, checksum string, deleteOld bool) error
}
// Vacuumer is implemented by anything that can vacuum the database.
// MigrateBlobsJob calls it once after a successful migration to reclaim space.
type Vacuumer interface {
// Vacuum performs the vacuum operation and reports any failure.
Vacuum(ctx context.Context) error
}
// MigrateBlobsJob is a job that migrates every blob reported by BlobStore and
// then vacuums the database.
type MigrateBlobsJob struct {
// TxnManager supplies the transactions wrapped around each blob store call.
TxnManager txn.Manager
// BlobStore performs the counting, listing and migrating of blobs.
BlobStore BlobStoreMigrator
// Vacuumer is invoked once the migration has completed without error.
Vacuumer Vacuumer
// DeleteOld is passed through unchanged to BlobStore.MigrateBlob for every
// blob.
DeleteOld bool
}
// Execute runs the migration job in three reported stages: counting the
// blobs, migrating them, and vacuuming the database.
//
// Failures are logged rather than returned. The job stops early when counting
// fails, when there is nothing to migrate, when the job is cancelled, or when
// the migration stage reports an error. A vacuum failure is logged but does
// not prevent the final "finished" message.
func (j *MigrateBlobsJob) Execute(ctx context.Context, progress *job.Progress) {
	var (
		total   int
		taskErr error
	)

	// The results are read straight after ExecuteTask returns, so this relies
	// on ExecuteTask having run the callback by then.
	progress.ExecuteTask("Counting blobs", func() {
		total, taskErr = j.countBlobs(ctx)
		progress.SetTotal(total)
	})

	switch {
	case taskErr != nil:
		logger.Errorf("Error counting blobs: %s", taskErr.Error())
		return
	case total == 0:
		logger.Infof("No blobs to migrate")
		return
	}

	logger.Infof("Migrating %d blobs", total)

	migrateTitle := fmt.Sprintf("Migrating %d blobs", total)
	progress.ExecuteTask(migrateTitle, func() {
		taskErr = j.migrateBlobs(ctx, progress)
	})

	// Cancellation is checked before the error so that a cancelled run is
	// reported as cancelled rather than as a failure.
	switch {
	case job.IsCancelled(ctx):
		logger.Info("Cancelled migrating blobs")
		return
	case taskErr != nil:
		logger.Errorf("Error migrating blobs: %v", taskErr)
		return
	}

	// run a vacuum to reclaim space
	progress.ExecuteTask("Vacuuming database", func() {
		if vacuumErr := j.Vacuumer.Vacuum(ctx); vacuumErr != nil {
			logger.Errorf("Error vacuuming database: %v", vacuumErr)
		}
	})

	logger.Infof("Finished migrating blobs")
}
// countBlobs asks the blob store for its blob count, running the query
// through txn.WithReadTxn. It returns 0 together with the error if the
// transaction or the count fails.
func (j *MigrateBlobsJob) countBlobs(ctx context.Context) (int, error) {
	total := 0

	// The callback stores the result in total so it outlives the transaction.
	countFn := func(ctx context.Context) error {
		n, err := j.BlobStore.Count(ctx)
		total = n
		return err
	}

	if err := txn.WithReadTxn(ctx, j.TxnManager, countFn); err != nil {
		return 0, err
	}

	return total, nil
}
// migrateBlobs walks the blob store batch by batch and migrates each blob in
// its own transaction, incrementing progress once per blob.
//
// A failure to migrate a single blob is logged and the loop moves on to the
// next one. The function returns an error only when fetching a batch fails.
// It returns nil when cancellation is noticed between blobs, and the (possibly
// nil) fetch error when the loop ends after a fetch.
func (j *MigrateBlobsJob) migrateBlobs(ctx context.Context, progress *job.Progress) error {
	// cursor is the last checksum handled; it is passed to getBatch to
	// request the next batch.
	cursor := ""

	for {
		batch, err := j.getBatch(ctx, cursor)
		if err != nil || len(batch) == 0 || ctx.Err() != nil {
			return err
		}

		for _, checksum := range batch {
			if ctx.Err() != nil {
				return nil
			}

			// Advance the cursor before migrating so a failing blob is not
			// requested again.
			cursor = checksum

			progress.ExecuteTask("Migrating blob "+checksum, func() {
				defer progress.Increment()

				migrateErr := txn.WithTxn(ctx, j.TxnManager, func(ctx context.Context) error {
					return j.BlobStore.MigrateBlob(ctx, checksum, j.DeleteOld)
				})
				if migrateErr != nil {
					logger.Errorf("Error migrating blob %s: %v", checksum, migrateErr)
				}
			})
		}
	}
}
// getBatch requests the next batch of blob checksums from the blob store,
// asking for at most batchSize entries and passing lastChecksum as the paging
// cursor. The lookup runs through txn.WithReadTxn, and whatever the blob store
// returned is handed back together with the transaction's error.
func (j *MigrateBlobsJob) getBatch(ctx context.Context, lastChecksum string) ([]string, error) {
	const batchSize = 1000

	var checksums []string
	fetch := func(ctx context.Context) (err error) {
		checksums, err = j.BlobStore.FindBlobs(ctx, batchSize, lastChecksum)
		return err
	}

	err := txn.WithReadTxn(ctx, j.TxnManager, fetch)
	return checksums, err
}