scheduler: add support for jobs targeted at hosts and teams

Also: add code to db_purge to delete assignment records for completed WUs
This commit is contained in:
David Anderson 2014-03-12 00:03:17 -07:00
parent 888c1a1e39
commit 2f91cd6b5e
2 changed files with 47 additions and 24 deletions

View File

@ -567,6 +567,14 @@ bool do_pass() {
);
exit(6);
}
if (config.enable_assignment) {
DB_ASSIGNMENT asg;
sprintf(buf, "where workunitid=%d", wu.id);
retval = asg.lookup(buf);
if (!retval) {
asg.delete_from_db();
}
}
}
log_messages.printf(MSG_DEBUG,
"Purged workunit [%d] from database\n", wu.id

View File

@ -113,7 +113,7 @@ static int send_assigned_job(ASSIGNMENT& asg) {
return 0;
}
// Send this host any "multi" assigned jobs.
// Send this host any broadcase jobs.
// Return true iff we sent anything
//
bool send_broadcast_jobs() {
@ -127,7 +127,7 @@ bool send_broadcast_jobs() {
if (config.debug_assignment) {
log_messages.printf(MSG_NORMAL,
"[assign] processing multi assignment type %d\n",
"[assign] processing broadcast type %d\n",
asg.target_type
);
}
@ -169,22 +169,37 @@ bool send_broadcast_jobs() {
return sent_something;
}
// send targeted jobs
// Send targeted jobs of a given type.
// NOTE: there may be an atomicity problem in the following.
// Ideally it should be in a transaction.
//
bool send_targeted_jobs() {
bool send_jobs(int assign_type) {
DB_ASSIGNMENT asg;
DB_RESULT result;
DB_WORKUNIT wu;
bool sent_something = false;
int retval;
bool sent_something = false;
char query[256], buf[256];
// for now, only look for user assignments
//
char buf[256];
sprintf(buf, "where target_type=%d and target_id=%d and multi=0",
ASSIGN_USER, g_reply->user.id
);
while (!asg.enumerate(buf)) {
switch (assign_type) {
case ASSIGN_USER:
sprintf(query, "where target_type=%d and target_id=%d and multi=0",
ASSIGN_USER, g_reply->user.id
);
break;
case ASSIGN_HOST:
sprintf(query, "where target_type=%d and target_id=%d and multi=0",
ASSIGN_HOST, g_reply->host.id
);
break;
case ASSIGN_TEAM:
sprintf(query, "where target_type=%d and target_id=%d and multi=0",
ASSIGN_TEAM, g_reply->team.id
);
break;
}
while (!asg.enumerate(query)) {
if (!work_needed(false)) continue;
// if the WU doesn't exist, delete the assignment record.
@ -194,6 +209,7 @@ bool send_targeted_jobs() {
asg.delete_from_db();
continue;
}
// don't send if WU is validation pending or completed,
// or has transition pending
//
@ -201,7 +217,7 @@ bool send_targeted_jobs() {
if (wu.canonical_resultid) continue;
if (wu.transition_time < time(0)) continue;
// don't send if we already sent one to this host
// don't send if we already sent an instance to this host
//
sprintf(buf, "where workunitid=%d and hostid=%d",
asg.workunitid,
@ -210,17 +226,6 @@ bool send_targeted_jobs() {
retval = result.lookup(buf);
if (retval != ERR_DB_NOT_FOUND) continue;
// don't send if there's already one in progress to this user
//
sprintf(buf,
"where workunitid=%d and userid=%d and server_state=%d",
asg.workunitid,
g_reply->user.id,
RESULT_SERVER_STATE_IN_PROGRESS
);
retval = result.lookup(buf);
if (retval != ERR_DB_NOT_FOUND) continue;
// OK, send the job
//
retval = send_assigned_job(asg);
@ -241,3 +246,13 @@ bool send_targeted_jobs() {
}
return sent_something;
}
// send targeted jobs
//
bool send_targeted_jobs() {
bool sent_something = false;
sent_something |= send_jobs(ASSIGN_USER);
sent_something |= send_jobs(ASSIGN_HOST);
sent_something |= send_jobs(ASSIGN_TEAM);
return sent_something;
}