mirror of https://github.com/stashapp/stash.git
415 lines
7.9 KiB
Go
415 lines
7.9 KiB
Go
package job
|
|
|
|
import (
|
|
"context"
|
|
"runtime/debug"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/stashapp/stash/pkg/logger"
|
|
"github.com/stashapp/stash/pkg/utils"
|
|
)
|
|
|
|
const maxGraveyardSize = 10
|
|
const defaultThrottleLimit = 100 * time.Millisecond
|
|
|
|
// Manager maintains a queue of jobs. Jobs are executed one at a time.
|
|
type Manager struct {
|
|
queue []*Job
|
|
graveyard []*Job
|
|
|
|
mutex sync.Mutex
|
|
notEmpty *sync.Cond
|
|
stop chan struct{}
|
|
|
|
lastID int
|
|
|
|
subscriptions []*ManagerSubscription
|
|
updateThrottleLimit time.Duration
|
|
}
|
|
|
|
// NewManager initialises and returns a new Manager.
|
|
func NewManager() *Manager {
|
|
ret := &Manager{
|
|
stop: make(chan struct{}),
|
|
updateThrottleLimit: defaultThrottleLimit,
|
|
}
|
|
|
|
ret.notEmpty = sync.NewCond(&ret.mutex)
|
|
|
|
go ret.dispatcher()
|
|
|
|
return ret
|
|
}
|
|
|
|
// Stop is used to stop the dispatcher thread. Once Stop is called, no
|
|
// more Jobs will be processed.
|
|
func (m *Manager) Stop() {
|
|
m.CancelAll()
|
|
close(m.stop)
|
|
}
|
|
|
|
// Add queues a job.
|
|
func (m *Manager) Add(ctx context.Context, description string, e JobExec) int {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
t := time.Now()
|
|
|
|
j := Job{
|
|
ID: m.nextID(),
|
|
Status: StatusReady,
|
|
Description: description,
|
|
AddTime: t,
|
|
exec: e,
|
|
outerCtx: ctx,
|
|
}
|
|
|
|
m.queue = append(m.queue, &j)
|
|
|
|
if len(m.queue) == 1 {
|
|
// notify that there is now a job in the queue
|
|
m.notEmpty.Broadcast()
|
|
}
|
|
|
|
m.notifyNewJob(&j)
|
|
|
|
return j.ID
|
|
}
|
|
|
|
// Start adds a job and starts it immediately, concurrently with any other
|
|
// jobs.
|
|
func (m *Manager) Start(ctx context.Context, description string, e JobExec) int {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
t := time.Now()
|
|
|
|
j := Job{
|
|
ID: m.nextID(),
|
|
Status: StatusReady,
|
|
Description: description,
|
|
AddTime: t,
|
|
exec: e,
|
|
outerCtx: ctx,
|
|
}
|
|
|
|
m.queue = append(m.queue, &j)
|
|
|
|
m.dispatch(ctx, &j)
|
|
|
|
return j.ID
|
|
}
|
|
|
|
func (m *Manager) notifyNewJob(j *Job) {
|
|
// assumes lock held
|
|
for _, s := range m.subscriptions {
|
|
// don't block if channel is full
|
|
select {
|
|
case s.newJob <- *j:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) nextID() int {
|
|
m.lastID++
|
|
return m.lastID
|
|
}
|
|
|
|
func (m *Manager) getReadyJob() *Job {
|
|
// assumes lock held
|
|
for _, j := range m.queue {
|
|
if j.Status == StatusReady {
|
|
return j
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *Manager) dispatcher() {
|
|
m.mutex.Lock()
|
|
|
|
for {
|
|
// wait until we have something to process
|
|
j := m.getReadyJob()
|
|
|
|
for j == nil {
|
|
m.notEmpty.Wait()
|
|
|
|
// it's possible that we have been stopped - check here
|
|
select {
|
|
case <-m.stop:
|
|
m.mutex.Unlock()
|
|
return
|
|
default:
|
|
// keep going
|
|
j = m.getReadyJob()
|
|
}
|
|
}
|
|
|
|
done := m.dispatch(j.outerCtx, j)
|
|
|
|
// unlock the mutex and wait for the job to finish
|
|
m.mutex.Unlock()
|
|
<-done
|
|
m.mutex.Lock()
|
|
|
|
// remove the job from the queue
|
|
m.removeJob(j)
|
|
|
|
// process next job
|
|
}
|
|
}
|
|
|
|
func (m *Manager) newProgress(j *Job) *Progress {
|
|
return &Progress{
|
|
updater: &updater{
|
|
m: m,
|
|
job: j,
|
|
},
|
|
percent: ProgressIndefinite,
|
|
}
|
|
}
|
|
|
|
func (m *Manager) dispatch(ctx context.Context, j *Job) (done chan struct{}) {
|
|
// assumes lock held
|
|
t := time.Now()
|
|
j.StartTime = &t
|
|
j.Status = StatusRunning
|
|
|
|
ctx, cancelFunc := context.WithCancel(utils.ValueOnlyContext{Context: ctx})
|
|
j.cancelFunc = cancelFunc
|
|
|
|
done = make(chan struct{})
|
|
go m.executeJob(ctx, j, done)
|
|
|
|
m.notifyJobUpdate(j)
|
|
|
|
return
|
|
}
|
|
|
|
func (m *Manager) executeJob(ctx context.Context, j *Job, done chan struct{}) {
|
|
defer close(done)
|
|
defer m.onJobFinish(j)
|
|
defer func() {
|
|
if p := recover(); p != nil {
|
|
// a panic occurred, log and mark the job as failed
|
|
logger.Errorf("panic in job %d - %s: %v", j.ID, j.Description, p)
|
|
logger.Error(string(debug.Stack()))
|
|
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
j.Status = StatusFailed
|
|
}
|
|
}()
|
|
|
|
progress := m.newProgress(j)
|
|
if err := j.exec.Execute(ctx, progress); err != nil {
|
|
logger.Errorf("task failed due to error: %v", err)
|
|
j.error(err)
|
|
}
|
|
}
|
|
|
|
func (m *Manager) onJobFinish(job *Job) {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
if job.Status == StatusStopping {
|
|
job.Status = StatusCancelled
|
|
} else if job.Status != StatusFailed {
|
|
job.Status = StatusFinished
|
|
}
|
|
t := time.Now()
|
|
job.EndTime = &t
|
|
}
|
|
|
|
func (m *Manager) removeJob(job *Job) {
|
|
// assumes lock held
|
|
index, _ := m.getJob(m.queue, job.ID)
|
|
if index == -1 {
|
|
return
|
|
}
|
|
|
|
// clear any subtasks
|
|
job.Details = nil
|
|
|
|
m.queue = append(m.queue[:index], m.queue[index+1:]...)
|
|
|
|
m.graveyard = append(m.graveyard, job)
|
|
if len(m.graveyard) > maxGraveyardSize {
|
|
m.graveyard = m.graveyard[1:]
|
|
}
|
|
|
|
// notify job removed
|
|
for _, s := range m.subscriptions {
|
|
// don't block if channel is full
|
|
select {
|
|
case s.removedJob <- *job:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) getJob(list []*Job, id int) (index int, job *Job) {
|
|
// assumes lock held
|
|
for i, j := range list {
|
|
if j.ID == id {
|
|
index = i
|
|
job = j
|
|
return
|
|
}
|
|
}
|
|
|
|
return -1, nil
|
|
}
|
|
|
|
// CancelJob cancels the job with the provided id. Jobs that have been started
|
|
// are notified that they are stopping. Jobs that have not yet started are
|
|
// removed from the queue. If no job exists with the provided id, then there is
|
|
// no effect. Likewise, if the job is already cancelled, there is no effect.
|
|
func (m *Manager) CancelJob(id int) {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
_, j := m.getJob(m.queue, id)
|
|
if j != nil {
|
|
j.cancel()
|
|
|
|
if j.Status == StatusCancelled {
|
|
// remove from the queue
|
|
m.removeJob(j)
|
|
}
|
|
}
|
|
}
|
|
|
|
// CancelAll cancels all of the jobs in the queue. This is the same as
|
|
// calling CancelJob on all jobs in the queue.
|
|
func (m *Manager) CancelAll() {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
// call cancel on all
|
|
for _, j := range m.queue {
|
|
j.cancel()
|
|
|
|
if j.Status == StatusCancelled {
|
|
// add to graveyard
|
|
m.removeJob(j)
|
|
}
|
|
}
|
|
}
|
|
|
|
// GetJob returns a copy of the Job for the provided id. Returns nil if the job
|
|
// does not exist.
|
|
func (m *Manager) GetJob(id int) *Job {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
// get from the queue or graveyard
|
|
_, j := m.getJob(append(m.queue, m.graveyard...), id)
|
|
if j != nil {
|
|
// make a copy of the job and return the pointer
|
|
jCopy := *j
|
|
return &jCopy
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetQueue returns a copy of the current job queue.
|
|
func (m *Manager) GetQueue() []Job {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
var ret []Job
|
|
|
|
for _, j := range m.queue {
|
|
jCopy := *j
|
|
ret = append(ret, jCopy)
|
|
}
|
|
|
|
return ret
|
|
}
|
|
|
|
// Subscribe subscribes to changes to jobs in the manager queue.
|
|
func (m *Manager) Subscribe(ctx context.Context) *ManagerSubscription {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
ret := newSubscription()
|
|
|
|
m.subscriptions = append(m.subscriptions, ret)
|
|
|
|
go func() {
|
|
<-ctx.Done()
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
ret.close()
|
|
|
|
// remove from the list
|
|
for i, s := range m.subscriptions {
|
|
if s == ret {
|
|
m.subscriptions = append(m.subscriptions[:i], m.subscriptions[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
}()
|
|
|
|
return ret
|
|
}
|
|
|
|
func (m *Manager) notifyJobUpdate(j *Job) {
|
|
// don't update if job is finished or cancelled - these are handled
|
|
// by removeJob
|
|
if j.Status == StatusCancelled || j.Status == StatusFinished {
|
|
return
|
|
}
|
|
|
|
// assumes lock held
|
|
for _, s := range m.subscriptions {
|
|
// don't block if channel is full
|
|
select {
|
|
case s.updatedJob <- *j:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
type updater struct {
|
|
m *Manager
|
|
job *Job
|
|
lastUpdate time.Time
|
|
updateTimer *time.Timer
|
|
}
|
|
|
|
func (u *updater) notifyUpdate() {
|
|
// assumes lock held
|
|
u.m.notifyJobUpdate(u.job)
|
|
u.lastUpdate = time.Now()
|
|
u.updateTimer = nil
|
|
}
|
|
|
|
func (u *updater) updateProgress(progress float64, details []string) {
|
|
u.m.mutex.Lock()
|
|
defer u.m.mutex.Unlock()
|
|
|
|
u.job.Progress = progress
|
|
u.job.Details = details
|
|
|
|
if time.Since(u.lastUpdate) < u.m.updateThrottleLimit {
|
|
if u.updateTimer == nil {
|
|
u.updateTimer = time.AfterFunc(u.m.updateThrottleLimit-time.Since(u.lastUpdate), func() {
|
|
u.m.mutex.Lock()
|
|
defer u.m.mutex.Unlock()
|
|
|
|
u.notifyUpdate()
|
|
})
|
|
}
|
|
} else {
|
|
u.notifyUpdate()
|
|
}
|
|
}
|