2021-05-24 04:24:18 +00:00
|
|
|
package job
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2022-07-13 06:30:54 +00:00
|
|
|
"runtime/debug"
|
2021-05-24 04:24:18 +00:00
|
|
|
"sync"
|
|
|
|
"time"
|
2022-07-13 06:30:54 +00:00
|
|
|
|
|
|
|
"github.com/stashapp/stash/pkg/logger"
|
2023-02-15 23:07:52 +00:00
|
|
|
"github.com/stashapp/stash/pkg/utils"
|
2021-05-24 04:24:18 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
const maxGraveyardSize = 10
|
2021-10-14 23:39:48 +00:00
|
|
|
const defaultThrottleLimit = 100 * time.Millisecond
|
2021-05-24 04:24:18 +00:00
|
|
|
|
|
|
|
// Manager maintains a queue of jobs. Jobs are executed one at a time.
|
|
|
|
type Manager struct {
|
|
|
|
queue []*Job
|
|
|
|
graveyard []*Job
|
|
|
|
|
|
|
|
mutex sync.Mutex
|
|
|
|
notEmpty *sync.Cond
|
|
|
|
stop chan struct{}
|
|
|
|
|
|
|
|
lastID int
|
|
|
|
|
|
|
|
subscriptions []*ManagerSubscription
|
|
|
|
updateThrottleLimit time.Duration
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewManager initialises and returns a new Manager.
|
|
|
|
func NewManager() *Manager {
|
|
|
|
ret := &Manager{
|
|
|
|
stop: make(chan struct{}),
|
|
|
|
updateThrottleLimit: defaultThrottleLimit,
|
|
|
|
}
|
|
|
|
|
|
|
|
ret.notEmpty = sync.NewCond(&ret.mutex)
|
|
|
|
|
|
|
|
go ret.dispatcher()
|
|
|
|
|
|
|
|
return ret
|
|
|
|
}
|
|
|
|
|
|
|
|
// Stop is used to stop the dispatcher thread. Once Stop is called, no
|
|
|
|
// more Jobs will be processed.
|
|
|
|
func (m *Manager) Stop() {
|
|
|
|
m.CancelAll()
|
|
|
|
close(m.stop)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Add queues a job.
|
2021-06-11 07:24:58 +00:00
|
|
|
func (m *Manager) Add(ctx context.Context, description string, e JobExec) int {
|
2021-05-24 04:24:18 +00:00
|
|
|
m.mutex.Lock()
|
|
|
|
defer m.mutex.Unlock()
|
|
|
|
|
|
|
|
t := time.Now()
|
|
|
|
|
|
|
|
j := Job{
|
|
|
|
ID: m.nextID(),
|
|
|
|
Status: StatusReady,
|
|
|
|
Description: description,
|
|
|
|
AddTime: t,
|
|
|
|
exec: e,
|
2021-06-11 07:24:58 +00:00
|
|
|
outerCtx: ctx,
|
2021-05-24 04:24:18 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
m.queue = append(m.queue, &j)
|
|
|
|
|
|
|
|
if len(m.queue) == 1 {
|
|
|
|
// notify that there is now a job in the queue
|
|
|
|
m.notEmpty.Broadcast()
|
|
|
|
}
|
|
|
|
|
|
|
|
m.notifyNewJob(&j)
|
|
|
|
|
|
|
|
return j.ID
|
|
|
|
}
|
|
|
|
|
|
|
|
// Start adds a job and starts it immediately, concurrently with any other
|
|
|
|
// jobs.
|
2021-06-11 07:24:58 +00:00
|
|
|
func (m *Manager) Start(ctx context.Context, description string, e JobExec) int {
|
2021-05-24 04:24:18 +00:00
|
|
|
m.mutex.Lock()
|
|
|
|
defer m.mutex.Unlock()
|
|
|
|
|
|
|
|
t := time.Now()
|
|
|
|
|
|
|
|
j := Job{
|
|
|
|
ID: m.nextID(),
|
|
|
|
Status: StatusReady,
|
|
|
|
Description: description,
|
|
|
|
AddTime: t,
|
|
|
|
exec: e,
|
2021-06-11 07:24:58 +00:00
|
|
|
outerCtx: ctx,
|
2021-05-24 04:24:18 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
m.queue = append(m.queue, &j)
|
|
|
|
|
Hoist context, enable errchkjson (#2488)
* Make the script scraper context-aware
Connect the context to the command execution. This means command
execution can be aborted if the context is canceled. The context is
usually bound to user-interaction, i.e., a scraper operation issued
by the user. Hence, it seems correct to abort a command if the user
aborts.
* Enable errchkjson
Some json marshal calls are *safe* in that they can never fail. This is
conditional on the types of the the data being encoded. errchkjson finds
those calls which are unsafe, and also not checked for errors.
Add logging warnings to the place where unsafe encodings might happen.
This can help uncover usage bugs early in stash if they are tripped,
making debugging easier.
While here, keep the checker enabled in the linter to capture future
uses of json marshalling.
* Pass the context for zip file scanning.
* Pass the context in scanning
* Pass context, replace context.TODO()
Where applicable, pass the context down toward the lower functions in
the call stack. Replace uses of context.TODO() with the passed context.
This makes the code more context-aware, and you can rely on aborting
contexts to clean up subsystems to a far greater extent now.
I've left the cases where there is a context in a struct. My gut feeling
is that they have solutions that are nice, but they require more deep
thinking to unveil how to handle it.
* Remove context from task-structs
As a rule, contexts are better passed explicitly to functions than they
are passed implicitly via structs. In the case of tasks, we already
have a valid context in scope when creating the struct, so remove ctx
from the struct and use the scoped context instead.
With this change it is clear that the scanning functions are under a
context, and the task-starting caller has jurisdiction over the context
and its lifetime. A reader of the code don't have to figure out where
the context are coming from anymore.
While here, connect context.TODO() to the newly scoped context in most
of the scan code.
* Remove context from autotag struct too
* Make more context-passing explicit
In all of these cases, there is an applicable context which is close
in the call-tree. Hook up to this context.
* Simplify context passing in manager
The managers context handling generally wants to use an outer context
if applicable. However, the code doesn't pass it explicitly, but stores
it in a struct. Pull out the context from the struct and use it to
explicitly pass it.
At a later point in time, we probably want to handle this by handing
over the job to a different (program-lifetime) context for background
jobs, but this will do for a start.
2022-04-15 01:34:53 +00:00
|
|
|
m.dispatch(ctx, &j)
|
2021-05-24 04:24:18 +00:00
|
|
|
|
|
|
|
return j.ID
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Manager) notifyNewJob(j *Job) {
|
|
|
|
// assumes lock held
|
|
|
|
for _, s := range m.subscriptions {
|
|
|
|
// don't block if channel is full
|
|
|
|
select {
|
|
|
|
case s.newJob <- *j:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Manager) nextID() int {
|
2021-06-03 23:21:17 +00:00
|
|
|
m.lastID++
|
2021-05-24 04:24:18 +00:00
|
|
|
return m.lastID
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Manager) getReadyJob() *Job {
|
|
|
|
// assumes lock held
|
|
|
|
for _, j := range m.queue {
|
|
|
|
if j.Status == StatusReady {
|
|
|
|
return j
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Manager) dispatcher() {
|
|
|
|
m.mutex.Lock()
|
|
|
|
|
|
|
|
for {
|
|
|
|
// wait until we have something to process
|
|
|
|
j := m.getReadyJob()
|
|
|
|
|
|
|
|
for j == nil {
|
|
|
|
m.notEmpty.Wait()
|
|
|
|
|
|
|
|
// it's possible that we have been stopped - check here
|
|
|
|
select {
|
|
|
|
case <-m.stop:
|
|
|
|
m.mutex.Unlock()
|
|
|
|
return
|
|
|
|
default:
|
|
|
|
// keep going
|
|
|
|
j = m.getReadyJob()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
Hoist context, enable errchkjson (#2488)
* Make the script scraper context-aware
Connect the context to the command execution. This means command
execution can be aborted if the context is canceled. The context is
usually bound to user-interaction, i.e., a scraper operation issued
by the user. Hence, it seems correct to abort a command if the user
aborts.
* Enable errchkjson
Some json marshal calls are *safe* in that they can never fail. This is
conditional on the types of the the data being encoded. errchkjson finds
those calls which are unsafe, and also not checked for errors.
Add logging warnings to the place where unsafe encodings might happen.
This can help uncover usage bugs early in stash if they are tripped,
making debugging easier.
While here, keep the checker enabled in the linter to capture future
uses of json marshalling.
* Pass the context for zip file scanning.
* Pass the context in scanning
* Pass context, replace context.TODO()
Where applicable, pass the context down toward the lower functions in
the call stack. Replace uses of context.TODO() with the passed context.
This makes the code more context-aware, and you can rely on aborting
contexts to clean up subsystems to a far greater extent now.
I've left the cases where there is a context in a struct. My gut feeling
is that they have solutions that are nice, but they require more deep
thinking to unveil how to handle it.
* Remove context from task-structs
As a rule, contexts are better passed explicitly to functions than they
are passed implicitly via structs. In the case of tasks, we already
have a valid context in scope when creating the struct, so remove ctx
from the struct and use the scoped context instead.
With this change it is clear that the scanning functions are under a
context, and the task-starting caller has jurisdiction over the context
and its lifetime. A reader of the code don't have to figure out where
the context are coming from anymore.
While here, connect context.TODO() to the newly scoped context in most
of the scan code.
* Remove context from autotag struct too
* Make more context-passing explicit
In all of these cases, there is an applicable context which is close
in the call-tree. Hook up to this context.
* Simplify context passing in manager
The managers context handling generally wants to use an outer context
if applicable. However, the code doesn't pass it explicitly, but stores
it in a struct. Pull out the context from the struct and use it to
explicitly pass it.
At a later point in time, we probably want to handle this by handing
over the job to a different (program-lifetime) context for background
jobs, but this will do for a start.
2022-04-15 01:34:53 +00:00
|
|
|
done := m.dispatch(j.outerCtx, j)
|
2021-05-24 04:24:18 +00:00
|
|
|
|
|
|
|
// unlock the mutex and wait for the job to finish
|
|
|
|
m.mutex.Unlock()
|
|
|
|
<-done
|
|
|
|
m.mutex.Lock()
|
|
|
|
|
|
|
|
// remove the job from the queue
|
|
|
|
m.removeJob(j)
|
|
|
|
|
|
|
|
// process next job
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Manager) newProgress(j *Job) *Progress {
|
|
|
|
return &Progress{
|
|
|
|
updater: &updater{
|
|
|
|
m: m,
|
|
|
|
job: j,
|
|
|
|
},
|
|
|
|
percent: ProgressIndefinite,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
Hoist context, enable errchkjson (#2488)
* Make the script scraper context-aware
Connect the context to the command execution. This means command
execution can be aborted if the context is canceled. The context is
usually bound to user-interaction, i.e., a scraper operation issued
by the user. Hence, it seems correct to abort a command if the user
aborts.
* Enable errchkjson
Some json marshal calls are *safe* in that they can never fail. This is
conditional on the types of the the data being encoded. errchkjson finds
those calls which are unsafe, and also not checked for errors.
Add logging warnings to the place where unsafe encodings might happen.
This can help uncover usage bugs early in stash if they are tripped,
making debugging easier.
While here, keep the checker enabled in the linter to capture future
uses of json marshalling.
* Pass the context for zip file scanning.
* Pass the context in scanning
* Pass context, replace context.TODO()
Where applicable, pass the context down toward the lower functions in
the call stack. Replace uses of context.TODO() with the passed context.
This makes the code more context-aware, and you can rely on aborting
contexts to clean up subsystems to a far greater extent now.
I've left the cases where there is a context in a struct. My gut feeling
is that they have solutions that are nice, but they require more deep
thinking to unveil how to handle it.
* Remove context from task-structs
As a rule, contexts are better passed explicitly to functions than they
are passed implicitly via structs. In the case of tasks, we already
have a valid context in scope when creating the struct, so remove ctx
from the struct and use the scoped context instead.
With this change it is clear that the scanning functions are under a
context, and the task-starting caller has jurisdiction over the context
and its lifetime. A reader of the code don't have to figure out where
the context are coming from anymore.
While here, connect context.TODO() to the newly scoped context in most
of the scan code.
* Remove context from autotag struct too
* Make more context-passing explicit
In all of these cases, there is an applicable context which is close
in the call-tree. Hook up to this context.
* Simplify context passing in manager
The managers context handling generally wants to use an outer context
if applicable. However, the code doesn't pass it explicitly, but stores
it in a struct. Pull out the context from the struct and use it to
explicitly pass it.
At a later point in time, we probably want to handle this by handing
over the job to a different (program-lifetime) context for background
jobs, but this will do for a start.
2022-04-15 01:34:53 +00:00
|
|
|
func (m *Manager) dispatch(ctx context.Context, j *Job) (done chan struct{}) {
|
2021-05-24 04:24:18 +00:00
|
|
|
// assumes lock held
|
|
|
|
t := time.Now()
|
|
|
|
j.StartTime = &t
|
|
|
|
j.Status = StatusRunning
|
|
|
|
|
2023-02-15 23:07:52 +00:00
|
|
|
ctx, cancelFunc := context.WithCancel(utils.ValueOnlyContext{Context: ctx})
|
2021-05-24 04:24:18 +00:00
|
|
|
j.cancelFunc = cancelFunc
|
|
|
|
|
|
|
|
done = make(chan struct{})
|
2022-07-13 06:30:54 +00:00
|
|
|
go m.executeJob(ctx, j, done)
|
2021-05-24 04:24:18 +00:00
|
|
|
|
|
|
|
m.notifyJobUpdate(j)
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2022-07-13 06:30:54 +00:00
|
|
|
func (m *Manager) executeJob(ctx context.Context, j *Job, done chan struct{}) {
|
|
|
|
defer close(done)
|
|
|
|
defer m.onJobFinish(j)
|
|
|
|
defer func() {
|
|
|
|
if p := recover(); p != nil {
|
|
|
|
// a panic occurred, log and mark the job as failed
|
|
|
|
logger.Errorf("panic in job %d - %s: %v", j.ID, j.Description, p)
|
|
|
|
logger.Error(string(debug.Stack()))
|
|
|
|
|
|
|
|
m.mutex.Lock()
|
|
|
|
defer m.mutex.Unlock()
|
|
|
|
j.Status = StatusFailed
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
progress := m.newProgress(j)
|
2024-03-14 00:06:23 +00:00
|
|
|
if err := j.exec.Execute(ctx, progress); err != nil {
|
|
|
|
logger.Errorf("task failed due to error: %v", err)
|
|
|
|
j.error(err)
|
|
|
|
}
|
2022-07-13 06:30:54 +00:00
|
|
|
}
|
|
|
|
|
2021-05-24 04:24:18 +00:00
|
|
|
func (m *Manager) onJobFinish(job *Job) {
|
|
|
|
m.mutex.Lock()
|
|
|
|
defer m.mutex.Unlock()
|
|
|
|
|
|
|
|
if job.Status == StatusStopping {
|
|
|
|
job.Status = StatusCancelled
|
2022-07-13 06:30:54 +00:00
|
|
|
} else if job.Status != StatusFailed {
|
2021-05-24 04:24:18 +00:00
|
|
|
job.Status = StatusFinished
|
|
|
|
}
|
|
|
|
t := time.Now()
|
|
|
|
job.EndTime = &t
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Manager) removeJob(job *Job) {
|
|
|
|
// assumes lock held
|
|
|
|
index, _ := m.getJob(m.queue, job.ID)
|
|
|
|
if index == -1 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// clear any subtasks
|
|
|
|
job.Details = nil
|
|
|
|
|
|
|
|
m.queue = append(m.queue[:index], m.queue[index+1:]...)
|
|
|
|
|
|
|
|
m.graveyard = append(m.graveyard, job)
|
|
|
|
if len(m.graveyard) > maxGraveyardSize {
|
|
|
|
m.graveyard = m.graveyard[1:]
|
|
|
|
}
|
|
|
|
|
|
|
|
// notify job removed
|
|
|
|
for _, s := range m.subscriptions {
|
|
|
|
// don't block if channel is full
|
|
|
|
select {
|
|
|
|
case s.removedJob <- *job:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Manager) getJob(list []*Job, id int) (index int, job *Job) {
|
|
|
|
// assumes lock held
|
|
|
|
for i, j := range list {
|
|
|
|
if j.ID == id {
|
|
|
|
index = i
|
|
|
|
job = j
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return -1, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// CancelJob cancels the job with the provided id. Jobs that have been started
|
|
|
|
// are notified that they are stopping. Jobs that have not yet started are
|
|
|
|
// removed from the queue. If no job exists with the provided id, then there is
|
|
|
|
// no effect. Likewise, if the job is already cancelled, there is no effect.
|
|
|
|
func (m *Manager) CancelJob(id int) {
|
|
|
|
m.mutex.Lock()
|
|
|
|
defer m.mutex.Unlock()
|
|
|
|
|
|
|
|
_, j := m.getJob(m.queue, id)
|
|
|
|
if j != nil {
|
|
|
|
j.cancel()
|
|
|
|
|
|
|
|
if j.Status == StatusCancelled {
|
|
|
|
// remove from the queue
|
|
|
|
m.removeJob(j)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// CancelAll cancels all of the jobs in the queue. This is the same as
|
|
|
|
// calling CancelJob on all jobs in the queue.
|
|
|
|
func (m *Manager) CancelAll() {
|
|
|
|
m.mutex.Lock()
|
|
|
|
defer m.mutex.Unlock()
|
|
|
|
|
|
|
|
// call cancel on all
|
|
|
|
for _, j := range m.queue {
|
|
|
|
j.cancel()
|
|
|
|
|
|
|
|
if j.Status == StatusCancelled {
|
|
|
|
// add to graveyard
|
|
|
|
m.removeJob(j)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetJob returns a copy of the Job for the provided id. Returns nil if the job
|
|
|
|
// does not exist.
|
|
|
|
func (m *Manager) GetJob(id int) *Job {
|
|
|
|
m.mutex.Lock()
|
|
|
|
defer m.mutex.Unlock()
|
|
|
|
|
|
|
|
// get from the queue or graveyard
|
|
|
|
_, j := m.getJob(append(m.queue, m.graveyard...), id)
|
|
|
|
if j != nil {
|
|
|
|
// make a copy of the job and return the pointer
|
|
|
|
jCopy := *j
|
|
|
|
return &jCopy
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetQueue returns a copy of the current job queue.
|
|
|
|
func (m *Manager) GetQueue() []Job {
|
|
|
|
m.mutex.Lock()
|
|
|
|
defer m.mutex.Unlock()
|
|
|
|
|
|
|
|
var ret []Job
|
|
|
|
|
|
|
|
for _, j := range m.queue {
|
|
|
|
jCopy := *j
|
|
|
|
ret = append(ret, jCopy)
|
|
|
|
}
|
|
|
|
|
|
|
|
return ret
|
|
|
|
}
|
|
|
|
|
|
|
|
// Subscribe subscribes to changes to jobs in the manager queue.
|
|
|
|
func (m *Manager) Subscribe(ctx context.Context) *ManagerSubscription {
|
|
|
|
m.mutex.Lock()
|
|
|
|
defer m.mutex.Unlock()
|
|
|
|
|
|
|
|
ret := newSubscription()
|
|
|
|
|
|
|
|
m.subscriptions = append(m.subscriptions, ret)
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
<-ctx.Done()
|
|
|
|
m.mutex.Lock()
|
|
|
|
defer m.mutex.Unlock()
|
|
|
|
|
|
|
|
ret.close()
|
|
|
|
|
|
|
|
// remove from the list
|
|
|
|
for i, s := range m.subscriptions {
|
|
|
|
if s == ret {
|
|
|
|
m.subscriptions = append(m.subscriptions[:i], m.subscriptions[i+1:]...)
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
return ret
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Manager) notifyJobUpdate(j *Job) {
|
|
|
|
// don't update if job is finished or cancelled - these are handled
|
|
|
|
// by removeJob
|
|
|
|
if j.Status == StatusCancelled || j.Status == StatusFinished {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// assumes lock held
|
|
|
|
for _, s := range m.subscriptions {
|
|
|
|
// don't block if channel is full
|
|
|
|
select {
|
|
|
|
case s.updatedJob <- *j:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type updater struct {
|
|
|
|
m *Manager
|
|
|
|
job *Job
|
|
|
|
lastUpdate time.Time
|
|
|
|
updateTimer *time.Timer
|
|
|
|
}
|
|
|
|
|
|
|
|
func (u *updater) notifyUpdate() {
|
|
|
|
// assumes lock held
|
|
|
|
u.m.notifyJobUpdate(u.job)
|
|
|
|
u.lastUpdate = time.Now()
|
|
|
|
u.updateTimer = nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (u *updater) updateProgress(progress float64, details []string) {
|
|
|
|
u.m.mutex.Lock()
|
|
|
|
defer u.m.mutex.Unlock()
|
|
|
|
|
|
|
|
u.job.Progress = progress
|
|
|
|
u.job.Details = details
|
|
|
|
|
|
|
|
if time.Since(u.lastUpdate) < u.m.updateThrottleLimit {
|
|
|
|
if u.updateTimer == nil {
|
|
|
|
u.updateTimer = time.AfterFunc(u.m.updateThrottleLimit-time.Since(u.lastUpdate), func() {
|
|
|
|
u.m.mutex.Lock()
|
|
|
|
defer u.m.mutex.Unlock()
|
|
|
|
|
|
|
|
u.notifyUpdate()
|
|
|
|
})
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
u.notifyUpdate()
|
|
|
|
}
|
|
|
|
}
|