mirror of https://github.com/BOINC/boinc.git
scheduler: finish implementation of targeted jobs
Targeted jobs are supposed to be like regular jobs (in terms of replication and error handling) except they're restricted to a particular host/user/team. Job instances are created by the scheduler (when it's contacted by a qualifying host) rather than by the transitioner. However, this wasn't fully implemented. In particular, the scheduler needs the same logic as the transitioner to decide whether to create a new instance: namely, whether wu.target_nresults > nunsent + ninprogress + nsuccess. Also, the scheduler shouldn't send an instance if the WU had an error or was cancelled.
This commit is contained in:
parent
7887e332d2
commit
3290caf0f8
|
@ -44,6 +44,61 @@
|
|||
|
||||
#include "sched_assign.h"
|
||||
|
||||
// The workunit is targeted to the host (or user or team).
|
||||
// Decide if we should actually send an instance
|
||||
//
|
||||
bool need_targeted_instance(WORKUNIT& wu, int hostid) {
|
||||
|
||||
// don't send if WU had error or was canceled
|
||||
// (db_purge will eventually delete WU and assignment records)
|
||||
//
|
||||
if (wu.error_mask) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// don't send if WU is validation pending or completed,
|
||||
// or has transition pending
|
||||
//
|
||||
if (wu.need_validate) return false;
|
||||
if (wu.canonical_resultid) return false;
|
||||
if (wu.transition_time < time(0)) return false;
|
||||
|
||||
// See if this WU needs another instance.
|
||||
// This replicates logic in the transitioner
|
||||
//
|
||||
char buf[256];
|
||||
DB_RESULT result;
|
||||
int nunsent=0, ninprogress=0, nsuccess=0;
|
||||
sprintf(buf, "where workunitid=%d", wu.id);
|
||||
while (!result.enumerate(buf)) {
|
||||
// send at most 1 instance to a given host
|
||||
//
|
||||
if (result.hostid == hostid) {
|
||||
return false;
|
||||
}
|
||||
switch (result.server_state) {
|
||||
case RESULT_SERVER_STATE_INACTIVE:
|
||||
case RESULT_SERVER_STATE_UNSENT:
|
||||
nunsent++;
|
||||
break;
|
||||
case RESULT_SERVER_STATE_IN_PROGRESS:
|
||||
ninprogress++;
|
||||
break;
|
||||
case RESULT_SERVER_STATE_OVER:
|
||||
if (result.outcome == RESULT_OUTCOME_SUCCESS
|
||||
&& result.validate_state != VALIDATE_STATE_INVALID
|
||||
) {
|
||||
nsuccess++;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
int needed = wu.target_nresults - nunsent - ninprogress - nsuccess;
|
||||
if (needed <= 0) return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// send a job for the given assignment
|
||||
//
|
||||
static int send_assigned_job(ASSIGNMENT& asg) {
|
||||
|
@ -114,7 +169,7 @@ static int send_assigned_job(ASSIGNMENT& asg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// Send this host any broadcase jobs.
|
||||
// Send this host any broadcast jobs.
|
||||
// Return true iff we sent anything
|
||||
//
|
||||
bool send_broadcast_jobs() {
|
||||
|
@ -178,9 +233,9 @@ bool send_jobs(int assign_type) {
|
|||
DB_ASSIGNMENT asg;
|
||||
DB_RESULT result;
|
||||
DB_WORKUNIT wu;
|
||||
int retval, n;
|
||||
int retval;
|
||||
bool sent_something = false;
|
||||
char query[256], buf[256];
|
||||
char query[256];
|
||||
|
||||
switch (assign_type) {
|
||||
case ASSIGN_USER:
|
||||
|
@ -211,39 +266,9 @@ bool send_jobs(int assign_type) {
|
|||
continue;
|
||||
}
|
||||
|
||||
// don't send if WU is validation pending or completed,
|
||||
// or has transition pending
|
||||
//
|
||||
if (wu.need_validate) continue;
|
||||
if (wu.canonical_resultid) continue;
|
||||
if (wu.transition_time < time(0)) continue;
|
||||
|
||||
// don't send if an instance is currently in progress
|
||||
//
|
||||
sprintf(buf,
|
||||
"where workunitid=%d and server_state=%d",
|
||||
asg.workunitid,
|
||||
RESULT_SERVER_STATE_IN_PROGRESS
|
||||
);
|
||||
retval = result.count(n, buf);
|
||||
if (retval) {
|
||||
log_messages.printf(MSG_CRITICAL,
|
||||
"result.count() failed: %s\n", boincerror(retval)
|
||||
);
|
||||
if (!need_targeted_instance(wu, g_reply->host.id)) {
|
||||
continue;
|
||||
}
|
||||
if (n>0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// don't send if we already sent an instance to this host
|
||||
//
|
||||
sprintf(buf, "where workunitid=%d and hostid=%d",
|
||||
asg.workunitid,
|
||||
g_reply->host.id
|
||||
);
|
||||
retval = result.lookup(buf);
|
||||
if (retval != ERR_DB_NOT_FOUND) continue;
|
||||
|
||||
// OK, send the job
|
||||
//
|
||||
|
|
Loading…
Reference in New Issue