From 2f91cd6b5edc32d01d66f5ff8a7da9a45ade2bf9 Mon Sep 17 00:00:00 2001 From: David Anderson Date: Wed, 12 Mar 2014 00:03:17 -0700 Subject: [PATCH] scheduler: add support for jobs targeted at hosts and teams Also: add code to db_purge to delete assignment records for completed WUs --- sched/db_purge.cpp | 8 ++++++ sched/sched_assign.cpp | 63 ++++++++++++++++++++++++++---------------- 2 files changed, 47 insertions(+), 24 deletions(-) diff --git a/sched/db_purge.cpp b/sched/db_purge.cpp index e3672f893c..3ef517392b 100644 --- a/sched/db_purge.cpp +++ b/sched/db_purge.cpp @@ -567,6 +567,14 @@ bool do_pass() { ); exit(6); } + if (config.enable_assignment) { + DB_ASSIGNMENT asg; + sprintf(buf, "where workunitid=%d", wu.id); + retval = asg.lookup(buf); + if (!retval) { + asg.delete_from_db(); + } + } } log_messages.printf(MSG_DEBUG, "Purged workunit [%d] from database\n", wu.id diff --git a/sched/sched_assign.cpp b/sched/sched_assign.cpp index a21c6283ae..cb811ea9bd 100644 --- a/sched/sched_assign.cpp +++ b/sched/sched_assign.cpp @@ -113,7 +113,7 @@ static int send_assigned_job(ASSIGNMENT& asg) { return 0; } -// Send this host any "multi" assigned jobs. +// Send this host any broadcase jobs. // Return true iff we sent anything // bool send_broadcast_jobs() { @@ -127,7 +127,7 @@ bool send_broadcast_jobs() { if (config.debug_assignment) { log_messages.printf(MSG_NORMAL, - "[assign] processing multi assignment type %d\n", + "[assign] processing broadcast type %d\n", asg.target_type ); } @@ -169,22 +169,37 @@ bool send_broadcast_jobs() { return sent_something; } -// send targeted jobs +// Send targeted jobs of a given type. +// NOTE: there may be an atomicity problem in the following. +// Ideally it should be in a transaction. // -bool send_targeted_jobs() { +bool send_jobs(int assign_type) { DB_ASSIGNMENT asg; DB_RESULT result; DB_WORKUNIT wu; - bool sent_something = false; int retval; + bool sent_something = false; + char query[256], buf[256]; - // for now, only look for user assignments - // - char buf[256]; - sprintf(buf, "where target_type=%d and target_id=%d and multi=0", - ASSIGN_USER, g_reply->user.id - ); - while (!asg.enumerate(buf)) { + switch (assign_type) { + case ASSIGN_USER: + sprintf(query, "where target_type=%d and target_id=%d and multi=0", + ASSIGN_USER, g_reply->user.id + ); + break; + case ASSIGN_HOST: + sprintf(query, "where target_type=%d and target_id=%d and multi=0", + ASSIGN_HOST, g_reply->host.id + ); + break; + case ASSIGN_TEAM: + sprintf(query, "where target_type=%d and target_id=%d and multi=0", + ASSIGN_TEAM, g_reply->team.id + ); + break; + } + + while (!asg.enumerate(query)) { if (!work_needed(false)) continue; // if the WU doesn't exist, delete the assignment record. @@ -194,6 +209,7 @@ bool send_targeted_jobs() { asg.delete_from_db(); continue; } + // don't send if WU is validation pending or completed, // or has transition pending // @@ -201,7 +217,7 @@ bool send_targeted_jobs() { if (wu.canonical_resultid) continue; if (wu.transition_time < time(0)) continue; - // don't send if we already sent one to this host + // don't send if we already sent an instance to this host // sprintf(buf, "where workunitid=%d and hostid=%d", asg.workunitid, @@ -210,17 +226,6 @@ bool send_targeted_jobs() { retval = result.lookup(buf); if (retval != ERR_DB_NOT_FOUND) continue; - // don't send if there's already one in progress to this user - // - sprintf(buf, - "where workunitid=%d and userid=%d and server_state=%d", - asg.workunitid, - g_reply->user.id, - RESULT_SERVER_STATE_IN_PROGRESS - ); - retval = result.lookup(buf); - if (retval != ERR_DB_NOT_FOUND) continue; - // OK, send the job // retval = send_assigned_job(asg); @@ -241,3 +246,13 @@ bool send_targeted_jobs() { } return sent_something; } + +// send targeted jobs +// +bool send_targeted_jobs() { + bool sent_something = false; + sent_something |= send_jobs(ASSIGN_USER); + sent_something |= send_jobs(ASSIGN_HOST); + sent_something |= send_jobs(ASSIGN_TEAM); + return sent_something; +}