From 130d6ed4f02611c5e40b7e5d3e0a6bdc24f648e4 Mon Sep 17 00:00:00 2001 From: David Anderson Date: Mon, 30 Jan 2012 22:39:13 +0000 Subject: [PATCH] - server: revamp the "assigned job" mechanism. This now supports two main use cases: 1) there's a job that you want to run once on all hosts, present and future (or all hosts belonging to a user, or to a team). The job is never transitioned, validated, or assimilated. 2) There's a normal job for which you want to use only hosts belonging to a specific user (e.g. cluster or cloud hosts). This restriction can be made either when the job is created, or on the fly, e.g. as part of a scheme for accelerating batch completion. For the latter purposes we now provide a function restrict_wu_to_user(DB_WORKUNIT&, int userid); The job goes through the standard transitioner/validator/assimilator path. These cases are enabled by config flags respectively. Assignment of type 2) are no longer stored in shared mem, so there is no limit on their number. There is no longer a rule that assigned job names must contain "asgn". NOTE: this requires a database update. svn path=/trunk/boinc/; revision=25169 --- checkin_notes | 47 ++++++++++++++++ db/boinc_db.cpp | 19 ++++--- db/boinc_db.h | 14 ++++- db/constraints.sql | 3 + db/schema.sql | 4 +- doc/projects.php | 5 +- html/ops/db_update.php | 12 ++++ lib/common_defs.h | 1 - sched/sched_assign.cpp | 125 +++++++++++++++++++++++++++-------------- sched/sched_assign.h | 1 + sched/sched_config.cpp | 1 + sched/sched_config.h | 1 + sched/sched_send.cpp | 11 ++++ sched/sched_shmem.cpp | 2 +- sched/sched_util.cpp | 42 ++++++++++++++ sched/sched_util.h | 2 + sched/transitioner.cpp | 25 +-------- tools/backend_lib.cpp | 2 +- tools/create_work.cpp | 8 --- 19 files changed, 240 insertions(+), 85 deletions(-) diff --git a/checkin_notes b/checkin_notes index cccc01423d..ec46c97100 100644 --- a/checkin_notes +++ b/checkin_notes @@ -1141,3 +1141,50 @@ David 30 Jan 2012 clientgui/ DlgAdvPreferencesBase.cpp + +David 30 Jan 2012 + - server: revamp the "assigned job" mechanism. + This now supports two main use cases: + 1) there's a job that you want to run once on all hosts, + present and future + (or all hosts belonging to a user, or to a team). + The job is never transitioned, validated, or assimilated. + 2) There's a normal job for which you want to use only + hosts belonging to a specific user (e.g. cluster or cloud hosts). + This restriction can be made either when the job is created, + or on the fly, + e.g. as part of a scheme for accelerating batch completion. + For the latter purposes we now provide a function + restrict_wu_to_user(DB_WORKUNIT&, int userid); + + The job goes through the standard + transitioner/validator/assimilator path. + + These cases are enabled by config flags + + + respectively. + + Assignment of type 2) are no longer stored in shared mem, + so there is no limit on their number. + + There is no longer a rule that assigned job names must contain "asgn". + + NOTE: this requires a database update. + + db/ + boinc_db.cpp,h + constraints.sql + schema.sql + sched/ + sched_util.cpp,h + sched_shmem.cpp + transitioner.cpp + sched_assign.cpp + tools/ + backend_lib.cpp + create_work.cpp + html/ops/ + db_update.php + lib/ + common_defs.h diff --git a/db/boinc_db.cpp b/db/boinc_db.cpp index 5051e3769c..c2efaeff20 100644 --- a/db/boinc_db.cpp +++ b/db/boinc_db.cpp @@ -822,7 +822,8 @@ void DB_WORKUNIT::db_print(char* buf){ "priority=%d, " "rsc_bandwidth_bound=%.15e, " "fileset_id=%d, " - "app_version_id=%d ", + "app_version_id=%d, " + "transitioner_flags=%d ", create_time, appid, name, xml_doc, batch, rsc_fpops_est, rsc_fpops_bound, rsc_memory_bound, rsc_disk_bound, @@ -840,7 +841,8 @@ void DB_WORKUNIT::db_print(char* buf){ priority, rsc_bandwidth_bound, fileset_id, - app_version_id + app_version_id, + transitioner_flags ); } @@ -878,6 +880,7 @@ void DB_WORKUNIT::db_parse(MYSQL_ROW &r) { rsc_bandwidth_bound = atof(r[i++]); fileset_id = atoi(r[i++]); app_version_id = atoi(r[i++]); + transitioner_flags = atoi(r[i++]); } void DB_CREDITED_JOB::db_print(char* buf){ @@ -1084,7 +1087,7 @@ void DB_ASSIGNMENT::db_print(char* buf) { target_type, multi, workunitid, - resultid + _resultid ); } @@ -1097,7 +1100,7 @@ void DB_ASSIGNMENT::db_parse(MYSQL_ROW& r) { target_type = atoi(r[i++]); multi = atoi(r[i++]); workunitid = atoi(r[i++]); - resultid = atoi(r[i++]); + _resultid = atoi(r[i++]); } int DB_HOST_APP_VERSION::update_scheduler(DB_HOST_APP_VERSION& orig) { @@ -1290,6 +1293,7 @@ void TRANSITIONER_ITEM::parse(MYSQL_ROW& r) { hr_class = atoi(r[i++]); batch = atoi(r[i++]); app_version_id = atoi(r[i++]); + transitioner_flags = atoi(r[i++]); // use safe_atoi() from here on cuz they might not be there // @@ -1349,6 +1353,7 @@ int DB_TRANSITIONER_ITEM_SET::enumerate( " wu.hr_class, " " wu.batch, " " wu.app_version_id, " + " wu.transitioner_flags, " " res.id, " " res.name, " " res.report_deadline, " @@ -1364,10 +1369,10 @@ int DB_TRANSITIONER_ITEM_SET::enumerate( " workunit AS wu " " LEFT JOIN result AS res ON wu.id = res.workunitid " "WHERE " - " wu.transition_time < %d %s " + " wu.transition_time < %d %s and transitioner_flags<>%d" "LIMIT " " %d ", - transition_time, mod_clause, nresult_limit + transition_time, mod_clause, TRANSITION_NONE, nresult_limit ); retval = db->do_query(query); @@ -2029,8 +2034,6 @@ int DB_SCHED_RESULT_ITEM_SET::update_workunits() { for (i=0; ia complete list of projects.

-Note: if your computer is equipped with a Graphics Processing Unit +Projects have different requirements such as memory size; +a partial summary is here. +

+If your computer is equipped with a Graphics Processing Unit (GPU), you may be able to use it to compute faster. "; diff --git a/html/ops/db_update.php b/html/ops/db_update.php index 22c77880c1..b02f617a45 100755 --- a/html/ops/db_update.php +++ b/html/ops/db_update.php @@ -796,6 +796,17 @@ function update_9_20_2011() { add manage tinyint not null "); } + +function_update_1_30_2012() { + do_query(" + alter table workunit + add transitioner_flags tinyint not null + "); + do_query( + "add index asgn_target(target_type, target_id)" + ); +} + // Updates are done automatically if you use "upgrade". // // If you need to do updates manually, @@ -819,6 +830,7 @@ $db_updates = array ( array(24137, "update_9_6_2011"), array(24225, "update_9_15_2011"), array(24248, "update_9_20_2011"), + array(25169, "update_1_30_2012"), ); diff --git a/lib/common_defs.h b/lib/common_defs.h index 82a8dd37ba..ec728ed96d 100644 --- a/lib/common_defs.h +++ b/lib/common_defs.h @@ -179,7 +179,6 @@ struct VERSION_INFO { #define CLIENT_AUTH_FILENAME "client_auth.xml" #define LOCK_FILE_NAME "lockfile" #define GRAPHICS_APP_FILENAME "graphics_app" -#define ASSIGNED_WU_STR "asgn" #define GUI_RPC_PASSWD_FILE "gui_rpc_auth.cfg" #define SS_CONFIG_FILE "ss_config.xml" diff --git a/sched/sched_assign.cpp b/sched/sched_assign.cpp index a2d954b432..ce15a83454 100644 --- a/sched/sched_assign.cpp +++ b/sched/sched_assign.cpp @@ -41,10 +41,12 @@ #include "sched_assign.h" +// send a job for the given assignment +// static int send_assigned_job(ASSIGNMENT& asg) { int retval; DB_WORKUNIT wu; - char suffix[256], path[256], buf[256]; + char suffix[256], path[256]; const char *rtfpath; static bool first=true; static int seqno=0; @@ -93,22 +95,6 @@ static int send_assigned_job(ASSIGNMENT& asg) { retval = result.lookup_id(result_id); add_result_to_reply(result, wu, bavp, false); - // if this is a one-job assignment, fill in assignment.resultid - // so that it doesn't get sent again - // - if (!asg.multi && asg.target_type!=ASSIGN_NONE) { - DB_ASSIGNMENT db_asg; - db_asg.id = asg.id; - sprintf(buf, "resultid=%u", result_id); - retval = db_asg.update_field(buf); - if (retval) { - log_messages.printf(MSG_CRITICAL, - "assign update failed: %s\n", boincerror(retval) - ); - return retval; - } - asg.resultid = result_id; - } if (config.debug_assignment) { log_messages.printf(MSG_NORMAL, "[assign] [WU#%d] [RESULT#%d] [HOST#%d] send assignment %d\n", @@ -118,27 +104,26 @@ static int send_assigned_job(ASSIGNMENT& asg) { return 0; } -// Send this host any jobs assigned to it, or to its user/team +// Send this host any "multi" assigned jobs. // Return true iff we sent anything // -bool send_assigned_jobs() { +bool send_assigned_jobs_multi() { DB_RESULT result; int retval; char buf[256]; bool sent_something = false; for (int i=0; inassignments; i++) { - if (!work_needed(false)) break; ASSIGNMENT& asg = ssp->assignments[i]; if (config.debug_assignment) { log_messages.printf(MSG_NORMAL, - "[assign] processing assignment type %d\n", asg.target_type + "[assign] processing multi assignment type %d\n", + asg.target_type ); } // see if this assignment applies to this host // - if (asg.resultid) continue; switch (asg.target_type) { case ASSIGN_NONE: sprintf(buf, "where hostid=%d and workunitid=%d", @@ -150,22 +135,11 @@ bool send_assigned_jobs() { if (!retval) sent_something = true; } break; - case ASSIGN_HOST: - if (g_reply->host.id != asg.target_id) continue; - sprintf(buf, "where workunitid=%d", asg.workunitid); - retval = result.lookup(buf); - if (retval == ERR_DB_NOT_FOUND) { - retval = send_assigned_job(asg); - if (!retval) sent_something = true; - } - break; case ASSIGN_USER: if (g_reply->user.id != asg.target_id) continue; - if (asg.multi) { - sprintf(buf, "where workunitid=%d and hostid=%d", asg.workunitid, g_reply->host.id); - } else { - sprintf(buf, "where workunitid=%d", asg.workunitid); - } + sprintf(buf, "where workunitid=%d and hostid=%d", + asg.workunitid, g_reply->host.id + ); retval = result.lookup(buf); if (retval == ERR_DB_NOT_FOUND) { retval = send_assigned_job(asg); @@ -174,11 +148,7 @@ bool send_assigned_jobs() { break; case ASSIGN_TEAM: if (g_reply->team.id != asg.target_id) continue; - if (asg.multi) { - sprintf(buf, "where workunitid=%d and hostid=%d", asg.workunitid, g_reply->host.id); - } else { - sprintf(buf, "where workunitid=%d", asg.workunitid); - } + sprintf(buf, "where workunitid=%d and hostid=%d", asg.workunitid, g_reply->host.id); retval = result.lookup(buf); if (retval == ERR_DB_NOT_FOUND) { retval = send_assigned_job(asg); @@ -189,3 +159,76 @@ bool send_assigned_jobs() { } return sent_something; } + +// send non-multi assigned jobs +// +bool send_assigned_jobs() { + DB_ASSIGNMENT asg; + DB_RESULT result; + DB_WORKUNIT wu; + bool sent_something = false; + int retval; + + // for now, only look for user assignments + // + char buf[256]; + sprintf(buf, "target_type=%d and target_id=%d and multi=0", + ASSIGN_USER, g_reply->user.id + ); + while (asg.enumerate(buf)) { + if (!work_needed(false)) continue; + + // if the WU doesn't exist, delete the assignment record. + // + retval = wu.lookup_id(asg.workunitid); + if (retval) { + asg.delete_from_db(); + continue; + } + // don't send if WU is validation pending or completed, + // or has transition pending + // + if (wu.need_validate) continue; + if (wu.canonical_resultid) continue; + if (wu.transition_time < time(0)) continue; + + // don't send if we already sent one to this host + // + sprintf(buf, "where workunitid=%d and hostid=%d", + asg.workunitid, + g_request->host.id + ); + retval = result.lookup(buf); + if (retval != ERR_DB_NOT_FOUND) continue; + + // don't send if there's already one in progress to this user + // + sprintf(buf, + "where workunitid=%d and userid=%d and server_state=%d", + asg.workunitid, + g_reply->user.id, + RESULT_SERVER_STATE_IN_PROGRESS + ); + retval = result.lookup(buf); + if (retval != ERR_DB_NOT_FOUND) continue; + + // OK, send the job + // + retval = send_assigned_job(asg); + if (retval) continue; + + sent_something = true; + + // update the WU's transition time to time out this job + // + retval = wu.lookup_id(asg.workunitid); + if (retval) continue; + int new_tt = time(0) + wu.delay_bound; + if (new_tt < wu.transition_time) { + char buf2[256]; + sprintf(buf2, "transition_time=%d", new_tt); + wu.update_field(buf2); + } + } + return sent_something; +} diff --git a/sched/sched_assign.h b/sched/sched_assign.h index 3ad1519fea..ed61507889 100644 --- a/sched/sched_assign.h +++ b/sched/sched_assign.h @@ -16,3 +16,4 @@ // along with BOINC. If not, see . extern bool send_assigned_jobs(); +extern bool send_assigned_jobs_multi(); diff --git a/sched/sched_config.cpp b/sched/sched_config.cpp index afc04b5338..1ef48fe520 100644 --- a/sched/sched_config.cpp +++ b/sched/sched_config.cpp @@ -173,6 +173,7 @@ int SCHED_CONFIG::parse(FILE* f) { if (xp.parse_int("feeder_query_size", feeder_query_size)) continue; if (xp.parse_str("httpd_user", httpd_user, sizeof(httpd_user))) continue; if (xp.parse_bool("enable_assignment", enable_assignment)) continue; + if (xp.parse_bool("enable_assignment_multi", enable_assignment_multi)) continue; if (xp.parse_bool("job_size_matching", job_size_matching)) continue; if (xp.parse_bool("dont_send_jobs", dont_send_jobs)) continue; diff --git a/sched/sched_config.h b/sched/sched_config.h index 434ca4d1ff..83733b043c 100644 --- a/sched/sched_config.h +++ b/sched/sched_config.h @@ -95,6 +95,7 @@ struct SCHED_CONFIG { char httpd_user[256]; // user name under which web server runs (default: apache) bool enable_assignment; + bool enable_assignment_multi; bool job_size_matching; bool dont_send_jobs; diff --git a/sched/sched_send.cpp b/sched/sched_send.cpp index 7fc47972ab..d5114fa5ad 100644 --- a/sched/sched_send.cpp +++ b/sched/sched_send.cpp @@ -1846,6 +1846,17 @@ void send_work() { } } + if (config.enable_assignment_multi) { + if (send_assigned_jobs_multi()) { + if (config.debug_assignment) { + log_messages.printf(MSG_NORMAL, + "[assign] [HOST#%d] sent assigned jobs\n", g_reply->host.id + ); + } + goto done; + } + } + if (config.workload_sim && g_request->have_other_results_list) { init_ip_results( g_request->global_prefs.work_buf_min(), diff --git a/sched/sched_shmem.cpp b/sched/sched_shmem.cpp index 7ce7d02caf..1eda6e3e51 100644 --- a/sched/sched_shmem.cpp +++ b/sched/sched_shmem.cpp @@ -184,7 +184,7 @@ int SCHED_SHMEM::scan_tables() { } n = 0; - while (!assignment.enumerate()) { + while (!assignment.enumerate("multi <> 0")) { assignments[n++] = assignment; if (n == MAX_ASSIGNMENTS) { overflow("assignments", "MAX_ASSIGNMENTS"); diff --git a/sched/sched_util.cpp b/sched/sched_util.cpp index 833422642f..d0211ea329 100644 --- a/sched/sched_util.cpp +++ b/sched/sched_util.cpp @@ -302,6 +302,48 @@ bool app_plan_uses_gpu(const char* plan_class) { return false; } +// Arrange that further results for this workunit +// will be sent only to hosts with the given user ID. +// This could be used, for example, so that late workunits +// are sent only to cloud or cluster resources +// +int restrict_wu_to_user(DB_WORKUNIT& wu, int userid) { + DB_RESULT result; + DB_ASSIGNMENT asg; + char buf[256]; + int retval; + + // mark unsent results as DIDNT_NEED + // + sprintf(buf, "workunitid=%d and server_state=%d", + wu.id, RESULT_SERVER_STATE_UNSENT + ); + while (result.enumerate(buf)) { + char buf2[256]; + sprintf(buf2, "server_state=%d, outcome=%d", + RESULT_SERVER_STATE_OVER, + RESULT_OUTCOME_DIDNT_NEED + ); + result.update_field(buf2); + } + + // mark the WU as TRANSITION_NO_NEW_RESULTS + // + sprintf(buf, "transitioner_flags=%d", TRANSITION_NO_NEW_RESULTS); + retval = wu.update_field(buf); + if (retval) return retval; + + // create an assignment record + // + asg.clear(); + asg.create_time = time(0); + asg.target_id = userid; + asg.target_type = ASSIGN_USER; + asg.multi = 0; + asg.workunitid = wu.id; + retval = asg.insert(); + return retval; +} #ifdef GCL_SIMULATOR diff --git a/sched/sched_util.h b/sched/sched_util.h index 08f2fc5296..b458837fbe 100644 --- a/sched/sched_util.h +++ b/sched/sched_util.h @@ -91,6 +91,8 @@ extern bool is_arg(const char*, const char*); extern bool app_plan_uses_gpu(const char* plan_class); +extern int restrict_wu_to_user(DB_WORKUNIT& wu, int userid); + #ifdef GCL_SIMULATOR extern void simulator_signal_handler(int signum); extern void continue_simulation(const char *daemonname); diff --git a/sched/transitioner.cpp b/sched/transitioner.cpp index 9415852ca4..9c97516ffa 100644 --- a/sched/transitioner.cpp +++ b/sched/transitioner.cpp @@ -157,27 +157,6 @@ int handle_wu( TRANSITIONER_ITEM& wu_item = items[0]; TRANSITIONER_ITEM wu_item_original = wu_item; - // "assigned" WUs aren't supposed to be handled by the transitioner. - // If we get one, it's an error - // - if (config.enable_assignment && strstr(wu_item.name, ASSIGNED_WU_STR)) { - DB_WORKUNIT wu; - char buf[256]; - - wu.id = wu_item.id; - log_messages.printf(MSG_CRITICAL, - "Assigned WU %d unexpectedly found by transitioner\n", wu.id - ); - sprintf(buf, "transition_time=%d", INT_MAX); - retval = wu.update_field(buf); - if (retval) { - log_messages.printf(MSG_CRITICAL, - "update_field failed: %s\n", boincerror(retval) - ); - } - return 0; - } - // count up the number of results in various states, // and check for timed-out results // @@ -450,7 +429,9 @@ int handle_wu( // std::string values; char value_buf[MAX_QUERY_LEN]; - if (n_new_results_needed > 0) { + if (wu_item.transitioner_flags != TRANSITION_NO_NEW_RESULTS + && n_new_results_needed > 0 + ) { log_messages.printf( MSG_NORMAL, "[WU#%d %s] Generating %d more results (%d target - %d unsent - %d in progress - %d success)\n", diff --git a/tools/backend_lib.cpp b/tools/backend_lib.cpp index 21b1aad6f1..ad5dbfc7b7 100644 --- a/tools/backend_lib.cpp +++ b/tools/backend_lib.cpp @@ -309,7 +309,7 @@ int create_work( fprintf(stderr, "target_nresults > max_success_results; can't create job\n"); return ERR_INVALID_PARAM; } - if (strstr(wu.name, ASSIGNED_WU_STR)) { + if (wu.transitioner_flags & TRANSITION_NONE) { wu.transition_time = INT_MAX; } else { wu.transition_time = time(0); diff --git a/tools/create_work.cpp b/tools/create_work.cpp index 5a1b3a6b71..15c12603d7 100644 --- a/tools/create_work.cpp +++ b/tools/create_work.cpp @@ -225,14 +225,6 @@ int main(int argc, const char** argv) { sprintf(result_template_file, "templates/%s_out", app.name); } - if (assign_flag) { - if (!strstr(wu.name, ASSIGNED_WU_STR)) { - fprintf(stderr, - "Assigned WU names must contain '%s'\n", ASSIGNED_WU_STR - ); - exit(1); - } - } retval = config.parse_file(config_dir); if (retval) { fprintf(stderr, "Can't parse config file: %d\n", retval);