diff --git a/checkin_notes b/checkin_notes index d4552f1f74..f4e62a4e1d 100644 --- a/checkin_notes +++ b/checkin_notes @@ -1589,3 +1589,20 @@ David Feb 23 2008 client/ scheduler_op.C + +David Feb 25 2008 + - scheduler: add a general method for excluding hosts from job distribution. + config.xml has optional and elements, + which contain regular expressions matched against + os_name\tos_version and p_vendor\tp_model. + If a host matches either one, it's not sent jobs. + - scheduler: fix bug in job assignment + - scheduler: initial (incompleted, commented-out) code for + matchmaker scheduling + - server programs: declare "SCHED_CONFIG config" in sched_config.C; + remove declarations of it from all other .C files + (because I added a vector to it, I can no longer use memset + to initialize it to zero; instead, it must be a global variable, + not an automatic) + + sched/*.C diff --git a/sched/assimilator.C b/sched/assimilator.C index 4b46ac56b5..378deccb11 100644 --- a/sched/assimilator.C +++ b/sched/assimilator.C @@ -45,7 +45,6 @@ using std::vector; #define LOCKFILE "assimilator.out" #define PIDFILE "assimilator.pid" -SCHED_CONFIG config; bool update_db = true; bool noinsert = false; diff --git a/sched/census.C b/sched/census.C index 2fe6d0144d..4f744c4af7 100644 --- a/sched/census.C +++ b/sched/census.C @@ -44,7 +44,6 @@ void show_help() { int main(int argc, char** argv) { HR_INFO hri; int retval; - SCHED_CONFIG config; for (int i=0; i &wu_names) { - SCHED_CONFIG config; int retval, start_time=time(0); char keypath[256]; char buf[LARGE_BLOB_SIZE]; diff --git a/sched/message_handler.C b/sched/message_handler.C index 5d4c232bc1..4c9ca090dc 100644 --- a/sched/message_handler.C +++ b/sched/message_handler.C @@ -40,7 +40,6 @@ using namespace std; #include "sched_util.h" #include "sched_msgs.h" -SCHED_CONFIG config; char app_name[256]; extern int handle_message(MSG_FROM_HOST&); diff --git a/sched/request_file_list.C b/sched/request_file_list.C index dee35b2f3a..658ca09c31 100644 --- a/sched/request_file_list.C +++ b/sched/request_file_list.C @@ -38,8 +38,6 @@ #include "sched_config.h" #include "sched_util.h" -SCHED_CONFIG config; - int request_file_list(int host_id) { DB_MSG_TO_HOST mth; int retval; diff --git a/sched/sample_work_generator.C b/sched/sample_work_generator.C index e1c310cef9..a7eb12a80c 100644 --- a/sched/sample_work_generator.C +++ b/sched/sample_work_generator.C @@ -51,7 +51,6 @@ char* wu_template; DB_APP app; int start_time; int seqno; -SCHED_CONFIG config; // create one new job // diff --git a/sched/sched_assign.C b/sched/sched_assign.C index 502281e556..3126ba8818 100644 --- a/sched/sched_assign.C +++ b/sched/sched_assign.C @@ -119,7 +119,7 @@ bool send_assigned_jobs(SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply) { if (asg.resultid) continue; switch (asg.target_type) { case ASSIGN_NONE: - sprintf(buf, "hostid=%d and workunitid=%d", + sprintf(buf, "where hostid=%d and workunitid=%d", reply.host.id, asg.workunitid ); retval = result.lookup(buf); @@ -130,7 +130,7 @@ bool send_assigned_jobs(SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply) { break; case ASSIGN_HOST: if (reply.host.id != asg.target_id) continue; - sprintf(buf, "workunitid=%d", asg.workunitid); + sprintf(buf, "where workunitid=%d", asg.workunitid); retval = result.lookup(buf); if (retval == ERR_NOT_FOUND) { retval = send_assigned_job(asg, request, reply); @@ -140,9 +140,9 @@ bool send_assigned_jobs(SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply) { case ASSIGN_USER: if (reply.user.id != asg.target_id) continue; if (asg.multi) { - sprintf(buf, "workunitid=%d and hostid=%d", asg.workunitid, reply.host.id); + sprintf(buf, "where workunitid=%d and hostid=%d", asg.workunitid, reply.host.id); } else { - sprintf(buf, "workunitid=%d", asg.workunitid); + sprintf(buf, "where workunitid=%d", asg.workunitid); } retval = result.lookup(buf); if (retval == ERR_NOT_FOUND) { @@ -153,9 +153,9 @@ bool send_assigned_jobs(SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply) { case ASSIGN_TEAM: if (reply.team.id != asg.target_id) continue; if (asg.multi) { - sprintf(buf, "workunitid=%d and hostid=%d", asg.workunitid, reply.host.id); + sprintf(buf, "where workunitid=%d and hostid=%d", asg.workunitid, reply.host.id); } else { - sprintf(buf, "workunitid=%d", asg.workunitid); + sprintf(buf, "where workunitid=%d", asg.workunitid); } retval = result.lookup(buf); if (retval == ERR_NOT_FOUND) { diff --git a/sched/sched_config.C b/sched/sched_config.C index 4498033b93..0cf1322c05 100644 --- a/sched/sched_config.C +++ b/sched/sched_config.C @@ -36,18 +36,21 @@ const char* CONFIG_FILE = "config.xml"; +SCHED_CONFIG config; + const int MAX_NCPUS = 8; // max multiplier for daily_result_quota and max_wus_in_progress; // need to change as multicore processors expand int SCHED_CONFIG::parse(FILE* f) { - char tag[1024], temp[1024]; + char tag[1024], buf[256]; bool is_tag; MIOFILE mf; XML_PARSER xp(&mf); + int retval; + regex_t re; mf.init_file(f); - memset(this, 0, sizeof(SCHED_CONFIG)); max_wus_to_send = 10; default_disk_max_used_gb = 100.; default_disk_max_used_pct = 50.; @@ -139,13 +142,43 @@ int SCHED_CONFIG::parse(FILE* f) { if (xp.parse_bool(tag, "ended", ended)) continue; if (xp.parse_int(tag, "shmem_work_items", shmem_work_items)) continue; if (xp.parse_int(tag, "feeder_query_size", feeder_query_size)) continue; - if (xp.parse_bool(tag, "no_darwin_6", no_darwin_6)) continue; - if (xp.parse_bool(tag, "no_amd_k6", no_amd_k6)) continue; + if (xp.parse_bool(tag, "no_darwin_6", no_darwin_6)) { + if (no_darwin_6) { + regcomp(&re, ".*Darwin.*\t.*(5\\.|6\\.).*", REG_EXTENDED|REG_NOSUB); + ban_os.push_back(re); + } + continue; + } + if (xp.parse_bool(tag, "no_amd_k6", no_amd_k6)) { + if (no_amd_k6) { + regcomp(&re, ".*AMD.*\t.*Family 5 Model 8 Stepping 0.*", REG_EXTENDED|REG_NOSUB); + ban_cpu.push_back(re); + } + continue; + } if (xp.parse_str(tag, "httpd_user", httpd_user, sizeof(httpd_user))) continue; if (xp.parse_int(tag, "file_deletion_strategy", file_deletion_strategy)) continue; if (xp.parse_bool(tag, "request_time_stats_log", request_time_stats_log)) continue; if (xp.parse_bool(tag, "enable_assignment", enable_assignment)) continue; if (xp.parse_int(tag, "max_ncpus", max_ncpus)) continue; + if (xp.parse_str(tag, "ban_os", buf, sizeof(buf))) { + retval = regcomp(&re, buf, REG_EXTENDED|REG_NOSUB); + if (retval) { + log_messages.printf(MSG_CRITICAL, "BAD REGEXP: %s\n", buf); + } else { + ban_os.push_back(re); + } + continue; + } + if (xp.parse_str(tag, "ban_cpu", buf, sizeof(buf))) { + retval = regcomp(&re, buf, REG_EXTENDED|REG_NOSUB); + if (retval) { + log_messages.printf(MSG_CRITICAL, "BAD REGEXP: %s\n", buf); + } else { + ban_cpu.push_back(re); + } + continue; + } // don't complain about unparsed XML; // there are lots of tags the scheduler doesn't know about diff --git a/sched/sched_config.h b/sched/sched_config.h index 620a32eda6..ff1ed0879c 100644 --- a/sched/sched_config.h +++ b/sched/sched_config.h @@ -20,6 +20,10 @@ #ifndef _SCHED_CONFIG_ #define _SCHED_CONFIG_ +#include "regex.h" +#include +using std::vector; + // parsed version of server configuration file // class SCHED_CONFIG { @@ -112,6 +116,8 @@ public: bool request_time_stats_log; bool enable_assignment; int max_ncpus; + vector ban_os; + vector ban_cpu; int parse(FILE*); int parse_file(const char* dir="."); @@ -120,6 +126,8 @@ public: int download_path(const char*, char*); }; +extern SCHED_CONFIG config; + // get the project's home directory // (assumed to be the parent of the CWD) // diff --git a/sched/sched_send.C b/sched/sched_send.C index c87dbb96b9..6b03a4af0f 100644 --- a/sched/sched_send.C +++ b/sched/sched_send.C @@ -22,6 +22,7 @@ #include "config.h" #include +#include #include #include #include @@ -57,6 +58,8 @@ using namespace std; #define FCGI_ToFILE(x) (x) #endif +void send_work_matchmaker(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply); + const char* infeasible_string(int code) { switch (code) { case INFEASIBLE_MEM: return "Not enough memory"; @@ -953,6 +956,9 @@ void send_work(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) { reply.wreq.infeasible_only = false; send_work_locality(sreq, reply); } else { +#if 0 + send_work_matchmaker(sreq, reply); +#else // give top priority to results that require a 'reliable host' // if (reply.wreq.host_info.reliable) { @@ -983,6 +989,7 @@ void send_work(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) { reply.wreq.infeasible_only = false; scan_work_array(sreq, reply); +#endif } log_messages.printf(MSG_NORMAL, @@ -1127,4 +1134,260 @@ void send_work(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) { } } +#if 0 + +// compute a "value" for sending this WU to this host. +// return 0 if the WU is infeasible +// +double value(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, WU_RESULT& wu_result) { + bool found; + APP* app; + APP_VERSION* avp; + WORKUNIT wu; + int retval; + + wu = wu_result.workunit; + + // Find the app and app_version for the client's platform. + // + if (anonymous(sreq.platforms.list[0])) { + app = ssp->lookup_app(wu.appid); + found = sreq.has_version(*app); + if (!found) return 0; + avp = NULL; + } else { + found = find_app_version(sreq, reply.wreq, wu, app, avp); + if (!found) return 0; + + // see if the core client is too old. + // don't bump the infeasible count because this + // isn't the result's fault + // + if (!app_core_compatible(reply.wreq, *avp)) return 0; + } + if (app == NULL) return 0; // this should never happen + + retval = wu_is_infeasible(wu, sreq, reply, *app); + if (retval) { + log_messages.printf(MSG_DEBUG, + "[HOST#%d] [WU#%d %s] WU is infeasible: %s\n", + reply.host.id, wu.id, wu.name, infeasible_string(retval) + ); + return 0; + } + + double val = 1; + if (app->beta) { + if (reply.wreq.host_info.allow_beta_work) { + val += 1; + } else { + return 0; + } + } else { + if (reply.wreq.host_info.reliable && (wu_result.need_reliable)) { + val += 1; + } + } + + if (wu_result.infeasible_count) { + val += 1; + } + return val; +} + +bool wu_is_infeasible_slow(WU_RESULT& wu_result, SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) { + char buf[256]; + int retval; + int n; + DB_RESULT result; + + // Don't send if we've already sent a result of this WU to this user. + // + if (config.one_result_per_user_per_wu) { + sprintf(buf, + "where workunitid=%d and userid=%d", + wu_result.workunit.id, reply.user.id + ); + retval = result.count(n, buf); + if (retval) { + log_messages.printf(MSG_CRITICAL, + "send_work: can't get result count (%d)\n", retval + ); + return true; + } else { + if (n>0) { + log_messages.printf(MSG_DEBUG, + "send_work: user %d already has %d result(s) for WU %d\n", + reply.user.id, n, wu_result.workunit.id + ); + return true; + } + } + } else if (config.one_result_per_host_per_wu) { + // Don't send if we've already sent a result + // of this WU to this host. + // We only have to check this + // if we don't send one result per user. + // + sprintf(buf, + "where workunitid=%d and hostid=%d", + wu_result.workunit.id, reply.host.id + ); + retval = result.count(n, buf); + if (retval) { + log_messages.printf(MSG_CRITICAL, + "send_work: can't get result count (%d)\n", retval + ); + return true; + } else { + if (n>0) { + log_messages.printf(MSG_DEBUG, + "send_work: host %d already has %d result(s) for WU %d\n", + reply.host.id, n, wu_result.workunit.id + ); + return true; + } + } + } + + APP* app = ssp->lookup_app(wu_result.workunit.appid); + WORKUNIT wu = wu_result.workunit; + if (app_hr_type(*app)) { + if (already_sent_to_different_platform_careful( + sreq, reply.wreq, wu, *app + )) { + log_messages.printf(MSG_DEBUG, + "[HOST#%d] [WU#%d %s] WU is infeasible (assigned to different platform)\n", + reply.host.id, wu.id, wu.name + ); + // Mark the workunit as infeasible. + // This ensures that jobs already assigned to a platform + // are processed first. + // + wu_result.infeasible_count++; + return true; + } + } + return false; +} + +struct JOB{ + int index; + double value; + double est_time; + double disk_usage; +}; + +struct JOB_SET { + double work_req; + double est_time; + double disk_usage; + double disk_limit; + std::list jobs; // sorted + + void add_job(JOB&); + double higher_value_disk_usage(double); +}; + +// add the given job, and remove lowest-value jobs +// that are in excess of work request +// or that cause the disk limit to be exceeded +// +void JOB_SET::add_job(JOB& job) { + while (!jobs.empty()) { + JOB& worst_job = jobs.back(); + if (est_time + job.est_time - worst_job.est_time > work_req) { + est_time -= worst_job.est_time; + disk_usage -= worst_job.disk_usage; + jobs.pop_back(); + } + } + while (!jobs.empty()) { + JOB& worst_job = jobs.back(); + if (disk_usage + job.disk_usage > disk_limit) { + est_time -= worst_job.est_time; + disk_usage -= worst_job.disk_usage; + jobs.pop_back(); + } + } + list::iterator i = jobs.begin(); + while (i != jobs.end()) { + if (i->value < job.value) { + jobs.insert(i, job); + break; + } + i++; + } + if (i == jobs.end()) { + jobs.push_back(job); + } + est_time += job.est_time; + disk_usage += job.disk_usage; +} + +// return the disk usage of jobs above the given value +// +double JOB_SET::higher_value_disk_usage(double v) { + double sum = 0; + list::iterator i = jobs.begin(); + while (i != jobs.end()) { + if (i->value < v) break; + sum += i->disk_usage; + i++; + } + return sum; +} + +void send_work_matchmaker(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) { + int i, slots_scanned=0, slots_locked=0; + JOB_SET jobs; + int min_slots = 20; + int max_slots = 50; + int max_locked = 10; + int pid = getpid(); + + lock_sema(); + i = rand() % ssp->max_wu_results; + while (1) { + i = (i+1) % ssp->max_wu_results; + slots_scanned++; + if (slots_scanned >= max_slots) break; + WU_RESULT& wu_result = ssp->wu_results[i]; + switch (wu_result.state) { + case WR_STATE_EMPTY: + continue; + case WR_STATE_PRESENT: + break; + default: + slots_locked++; + continue; + } + + double v = value(sreq, reply, ssp->wu_results[i]); + if (v > jobs.lowest_value) { + ssp->wu_results[i] = pid; + unlock_sema(); + if (wu_is_infeasible_slow(wu_result, sreq, reply)) { + lock_sema(); + ssp->wu_results[i] = WR_STATE_EMPTY; + continue; + } + lock_sema(); + jobs->add(i); + } + + if (jobs->request_satisfied && slots_scanned>=MIN_SLOTS) break; + } + + jobs->send(); + unlock_sema(); + if (slots_locked > max_locked) { + log_messages.printf(MSG_CRITICAL, + "Found too many locked slots (%d>%d) - increase array size", + slots_locked, max_locked + ); + } +} +#endif + const char *BOINC_RCSID_32dcd335e7 = "$Id$"; diff --git a/sched/send_file.C b/sched/send_file.C index 14a034df82..f012ba7d67 100644 --- a/sched/send_file.C +++ b/sched/send_file.C @@ -42,8 +42,6 @@ #include "sched_config.h" #include "sched_util.h" -SCHED_CONFIG config; - void init_xfer_result(DB_RESULT& result) { result.id = 0; result.create_time = time(0); diff --git a/sched/show_shmem.C b/sched/show_shmem.C index 662aba7df7..825f843d2d 100644 --- a/sched/show_shmem.C +++ b/sched/show_shmem.C @@ -32,7 +32,6 @@ int main() { SCHED_SHMEM* ssp; int retval; void* p; - SCHED_CONFIG config; retval = config.parse_file("."); if (retval) { diff --git a/sched/transitioner.C b/sched/transitioner.C index c2ecd929da..68d0089a1e 100644 --- a/sched/transitioner.C +++ b/sched/transitioner.C @@ -52,7 +52,6 @@ using namespace std; #endif int startup_time; -SCHED_CONFIG config; R_RSA_PRIVATE_KEY key; int mod_n, mod_i; bool do_mod = false; diff --git a/sched/trickle_handler.C b/sched/trickle_handler.C index 2551916d01..32b55b3ad1 100644 --- a/sched/trickle_handler.C +++ b/sched/trickle_handler.C @@ -41,7 +41,6 @@ using namespace std; #include "sched_util.h" #include "sched_msgs.h" -SCHED_CONFIG config; char variety[256]; extern int handle_trickle(MSG_FROM_HOST&); diff --git a/sched/update_stats.C b/sched/update_stats.C index 7c97fa8362..d2c7bc9891 100644 --- a/sched/update_stats.C +++ b/sched/update_stats.C @@ -151,7 +151,6 @@ int update_teams() { } int main(int argc, char** argv) { - SCHED_CONFIG config; int retval, i; bool do_update_teams = false, do_update_users = false; bool do_update_hosts = false; diff --git a/sched/validator.C b/sched/validator.C index 8d9c9500f4..5e83eabf43 100644 --- a/sched/validator.C +++ b/sched/validator.C @@ -74,7 +74,6 @@ extern int check_pair( RESULT & new_result, RESULT const& canonical_result, bool& retry ); -SCHED_CONFIG config; char app_name[256]; int wu_id_modulus=0; int wu_id_remainder=0; diff --git a/sched/wu_check.C b/sched/wu_check.C index a974cb4ff0..b8251d0289 100644 --- a/sched/wu_check.C +++ b/sched/wu_check.C @@ -38,8 +38,6 @@ bool repair = false; // wu_checker // See whether input files that should be present, are -SCHED_CONFIG config; - // get the path a WU's input file // int get_file_path(WORKUNIT& wu, char* path) {