- scheduler: add a general method for excluding hosts from job distribution.

config.xml has optional <ban_os> and <ban_cpu> elements,
    which contain regular expressions matched against
    os_name\tos_version and p_vendor\tp_model.
    If a host matches either one, it's not sent jobs.
- scheduler: fix bug in job assignment
- scheduler: initial (incompleted, commented-out) code for
    matchmaker scheduling
- server programs: declare "SCHED_CONFIG config" in sched_config.C;
    remove declarations of it from all other .C files
    (because I added a vector to it, I can no longer use memset
    to initialize it to zero; instead, it must be a global variable,
    not an automatic)

svn path=/trunk/boinc/; revision=14783
This commit is contained in:
David Anderson 2008-02-25 18:05:04 +00:00
parent 53274d8b24
commit a09e19b8dc
27 changed files with 355 additions and 54 deletions

View File

@ -1589,3 +1589,20 @@ David Feb 23 2008
client/
scheduler_op.C
David Feb 25 2008
- scheduler: add a general method for excluding hosts from job distribution.
config.xml has optional <ban_os> and <ban_cpu> elements,
which contain regular expressions matched against
os_name\tos_version and p_vendor\tp_model.
If a host matches either one, it's not sent jobs.
- scheduler: fix bug in job assignment
- scheduler: initial (incompleted, commented-out) code for
matchmaker scheduling
- server programs: declare "SCHED_CONFIG config" in sched_config.C;
remove declarations of it from all other .C files
(because I added a vector to it, I can no longer use memset
to initialize it to zero; instead, it must be a global variable,
not an automatic)
sched/*.C

View File

@ -45,7 +45,6 @@ using std::vector;
#define LOCKFILE "assimilator.out"
#define PIDFILE "assimilator.pid"
SCHED_CONFIG config;
bool update_db = true;
bool noinsert = false;

View File

@ -44,7 +44,6 @@ void show_help() {
int main(int argc, char** argv) {
HR_INFO hri;
int retval;
SCHED_CONFIG config;
for (int i=0; i<argc; i++) {
if (!strcmp(argv[i], "--help") || !strcmp(argv[i], "-h")) {

View File

@ -765,7 +765,6 @@ void show_help() {
}
int main(int argc, char** argv) {
SCHED_CONFIG config;
int retval, i;
DUMP_SPEC spec;
char* db_host = 0;

View File

@ -92,7 +92,6 @@ using namespace std;
#define COMPRESSION_GZIP 1
#define COMPRESSION_ZIP 2
SCHED_CONFIG config;
FILE *wu_stream=NULL;
FILE *re_stream=NULL;
FILE *wu_index_stream=NULL;

View File

@ -41,8 +41,6 @@
#include "sched_config.h"
#include "sched_util.h"
SCHED_CONFIG config;
int delete_host_file(int host_id, const char* file_name) {
DB_MSG_TO_HOST mth;
int retval;

View File

@ -123,7 +123,6 @@ using std::vector;
#define ENUM_SECOND_PASS 1
#define ENUM_OVER 2
SCHED_CONFIG config;
SCHED_SHMEM* ssp;
key_t sema_key;
const char* order_clause="";

View File

@ -104,8 +104,6 @@ using namespace std;
#define SLEEP_INTERVAL 5
#define RESULTS_PER_WU 4 // an estimate of redundancy
SCHED_CONFIG config;
int id_modulus=0, id_remainder=0;
bool dont_retry_errors = false;
bool dont_delete_antiques = false;

View File

@ -47,8 +47,6 @@
#endif
#include "sched_msgs.h"
SCHED_CONFIG config;
#define ERR_TRANSIENT true
#define ERR_PERMANENT false

View File

@ -43,8 +43,6 @@
#include "sched_util.h"
#include "md5_file.h"
SCHED_CONFIG config;
void init_xfer_result(DB_RESULT& result) {
result.id = 0;
result.create_time = time(0);

View File

@ -871,25 +871,23 @@ void warn_user_if_core_client_upgrade_scheduled(
bool unacceptable_os(
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply
) {
if (config.no_darwin_6) {
log_messages.printf(MSG_DEBUG,
"OS version %s %s\n",
sreq.host.os_name, sreq.host.os_version
);
unsigned int i;
char buf[1024];
if (!strcmp(sreq.host.os_name, "Darwin") &&
(!strncmp(sreq.host.os_version, "5.", 2) ||
!strncmp(sreq.host.os_version, "6.", 2)
)
) {
for (i=0; i<config.ban_os.size(); i++) {
regex_t& re = config.ban_os[i];
strcpy(buf, sreq.host.os_name);
strcat(buf, "\t");
strcat(buf, sreq.host.os_version);
if (!regexec(&re, buf, 0, NULL, 0)) {
log_messages.printf(MSG_NORMAL,
"Unacceptable OS %s %s\n",
sreq.host.os_name, sreq.host.os_version
);
USER_MESSAGE um(
"Project only supports MacOS Darwin versions 7.X and above",
"low"
sprintf(buf, "This project doesn't support OS type %s %s",
sreq.host.os_name, sreq.host.os_version
);
USER_MESSAGE um(buf, "low");
reply.insert_message(um);
reply.set_delay(DELAY_UNACCEPTABLE_OS);
return true;
@ -901,13 +899,23 @@ bool unacceptable_os(
bool unacceptable_cpu(
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply
) {
if (config.no_amd_k6) {
if (strstr(sreq.host.p_vendor, "AMD") && strstr(sreq.host.p_model, "Family 5 Model 8 Stepping 0")) {
unsigned int i;
char buf[1024];
for (i=0; i<config.ban_cpu.size(); i++) {
regex_t& re = config.ban_cpu[i];
strcpy(buf, sreq.host.p_vendor);
strcat(buf, "\t");
strcat(buf, sreq.host.p_model);
if (!regexec(&re, buf, 0, NULL, 0)) {
log_messages.printf(MSG_NORMAL,
"Unacceptable CPU %s %s\n",
sreq.host.p_vendor, sreq.host.p_model
);
USER_MESSAGE um("Project doesn't support AMD K6 CPUs", "low");
sprintf(buf, "This project doesn't support CPU type %s %s",
sreq.host.p_vendor, sreq.host.p_model
);
USER_MESSAGE um(buf, "low");
reply.insert_message(um);
reply.set_delay(DELAY_UNACCEPTABLE_OS);
return true;

View File

@ -73,7 +73,6 @@ using namespace std;
#define REPLY_FILE_PREFIX "../boinc_reply/"
bool use_files = false; // use disk files for req/reply msgs (for debugging)
SCHED_CONFIG config;
GUI_URLS gui_urls;
PROJECT_FILES project_files;
key_t sema_key;

View File

@ -146,7 +146,6 @@ void make_new_wu(DB_WORKUNIT& original_wu, char* starting_xml, int start_time) {
}
void make_work(vector<string> &wu_names) {
SCHED_CONFIG config;
int retval, start_time=time(0);
char keypath[256];
char buf[LARGE_BLOB_SIZE];

View File

@ -40,7 +40,6 @@ using namespace std;
#include "sched_util.h"
#include "sched_msgs.h"
SCHED_CONFIG config;
char app_name[256];
extern int handle_message(MSG_FROM_HOST&);

View File

@ -38,8 +38,6 @@
#include "sched_config.h"
#include "sched_util.h"
SCHED_CONFIG config;
int request_file_list(int host_id) {
DB_MSG_TO_HOST mth;
int retval;

View File

@ -51,7 +51,6 @@ char* wu_template;
DB_APP app;
int start_time;
int seqno;
SCHED_CONFIG config;
// create one new job
//

View File

@ -119,7 +119,7 @@ bool send_assigned_jobs(SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply) {
if (asg.resultid) continue;
switch (asg.target_type) {
case ASSIGN_NONE:
sprintf(buf, "hostid=%d and workunitid=%d",
sprintf(buf, "where hostid=%d and workunitid=%d",
reply.host.id, asg.workunitid
);
retval = result.lookup(buf);
@ -130,7 +130,7 @@ bool send_assigned_jobs(SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply) {
break;
case ASSIGN_HOST:
if (reply.host.id != asg.target_id) continue;
sprintf(buf, "workunitid=%d", asg.workunitid);
sprintf(buf, "where workunitid=%d", asg.workunitid);
retval = result.lookup(buf);
if (retval == ERR_NOT_FOUND) {
retval = send_assigned_job(asg, request, reply);
@ -140,9 +140,9 @@ bool send_assigned_jobs(SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply) {
case ASSIGN_USER:
if (reply.user.id != asg.target_id) continue;
if (asg.multi) {
sprintf(buf, "workunitid=%d and hostid=%d", asg.workunitid, reply.host.id);
sprintf(buf, "where workunitid=%d and hostid=%d", asg.workunitid, reply.host.id);
} else {
sprintf(buf, "workunitid=%d", asg.workunitid);
sprintf(buf, "where workunitid=%d", asg.workunitid);
}
retval = result.lookup(buf);
if (retval == ERR_NOT_FOUND) {
@ -153,9 +153,9 @@ bool send_assigned_jobs(SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply) {
case ASSIGN_TEAM:
if (reply.team.id != asg.target_id) continue;
if (asg.multi) {
sprintf(buf, "workunitid=%d and hostid=%d", asg.workunitid, reply.host.id);
sprintf(buf, "where workunitid=%d and hostid=%d", asg.workunitid, reply.host.id);
} else {
sprintf(buf, "workunitid=%d", asg.workunitid);
sprintf(buf, "where workunitid=%d", asg.workunitid);
}
retval = result.lookup(buf);
if (retval == ERR_NOT_FOUND) {

View File

@ -36,18 +36,21 @@
const char* CONFIG_FILE = "config.xml";
SCHED_CONFIG config;
const int MAX_NCPUS = 8;
// max multiplier for daily_result_quota and max_wus_in_progress;
// need to change as multicore processors expand
int SCHED_CONFIG::parse(FILE* f) {
char tag[1024], temp[1024];
char tag[1024], buf[256];
bool is_tag;
MIOFILE mf;
XML_PARSER xp(&mf);
int retval;
regex_t re;
mf.init_file(f);
memset(this, 0, sizeof(SCHED_CONFIG));
max_wus_to_send = 10;
default_disk_max_used_gb = 100.;
default_disk_max_used_pct = 50.;
@ -139,13 +142,43 @@ int SCHED_CONFIG::parse(FILE* f) {
if (xp.parse_bool(tag, "ended", ended)) continue;
if (xp.parse_int(tag, "shmem_work_items", shmem_work_items)) continue;
if (xp.parse_int(tag, "feeder_query_size", feeder_query_size)) continue;
if (xp.parse_bool(tag, "no_darwin_6", no_darwin_6)) continue;
if (xp.parse_bool(tag, "no_amd_k6", no_amd_k6)) continue;
if (xp.parse_bool(tag, "no_darwin_6", no_darwin_6)) {
if (no_darwin_6) {
regcomp(&re, ".*Darwin.*\t.*(5\\.|6\\.).*", REG_EXTENDED|REG_NOSUB);
ban_os.push_back(re);
}
continue;
}
if (xp.parse_bool(tag, "no_amd_k6", no_amd_k6)) {
if (no_amd_k6) {
regcomp(&re, ".*AMD.*\t.*Family 5 Model 8 Stepping 0.*", REG_EXTENDED|REG_NOSUB);
ban_cpu.push_back(re);
}
continue;
}
if (xp.parse_str(tag, "httpd_user", httpd_user, sizeof(httpd_user))) continue;
if (xp.parse_int(tag, "file_deletion_strategy", file_deletion_strategy)) continue;
if (xp.parse_bool(tag, "request_time_stats_log", request_time_stats_log)) continue;
if (xp.parse_bool(tag, "enable_assignment", enable_assignment)) continue;
if (xp.parse_int(tag, "max_ncpus", max_ncpus)) continue;
if (xp.parse_str(tag, "ban_os", buf, sizeof(buf))) {
retval = regcomp(&re, buf, REG_EXTENDED|REG_NOSUB);
if (retval) {
log_messages.printf(MSG_CRITICAL, "BAD REGEXP: %s\n", buf);
} else {
ban_os.push_back(re);
}
continue;
}
if (xp.parse_str(tag, "ban_cpu", buf, sizeof(buf))) {
retval = regcomp(&re, buf, REG_EXTENDED|REG_NOSUB);
if (retval) {
log_messages.printf(MSG_CRITICAL, "BAD REGEXP: %s\n", buf);
} else {
ban_cpu.push_back(re);
}
continue;
}
// don't complain about unparsed XML;
// there are lots of tags the scheduler doesn't know about

View File

@ -20,6 +20,10 @@
#ifndef _SCHED_CONFIG_
#define _SCHED_CONFIG_
#include "regex.h"
#include <vector>
using std::vector;
// parsed version of server configuration file
//
class SCHED_CONFIG {
@ -112,6 +116,8 @@ public:
bool request_time_stats_log;
bool enable_assignment;
int max_ncpus;
vector<regex_t> ban_os;
vector<regex_t> ban_cpu;
int parse(FILE*);
int parse_file(const char* dir=".");
@ -120,6 +126,8 @@ public:
int download_path(const char*, char*);
};
extern SCHED_CONFIG config;
// get the project's home directory
// (assumed to be the parent of the CWD)
//

View File

@ -22,6 +22,7 @@
#include "config.h"
#include <vector>
#include <list>
#include <string>
#include <ctime>
#include <cstdio>
@ -57,6 +58,8 @@ using namespace std;
#define FCGI_ToFILE(x) (x)
#endif
void send_work_matchmaker(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply);
const char* infeasible_string(int code) {
switch (code) {
case INFEASIBLE_MEM: return "Not enough memory";
@ -953,6 +956,9 @@ void send_work(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
reply.wreq.infeasible_only = false;
send_work_locality(sreq, reply);
} else {
#if 0
send_work_matchmaker(sreq, reply);
#else
// give top priority to results that require a 'reliable host'
//
if (reply.wreq.host_info.reliable) {
@ -983,6 +989,7 @@ void send_work(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
reply.wreq.infeasible_only = false;
scan_work_array(sreq, reply);
#endif
}
log_messages.printf(MSG_NORMAL,
@ -1127,4 +1134,260 @@ void send_work(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
}
}
#if 0
// compute a "value" for sending this WU to this host.
// return 0 if the WU is infeasible
//
double value(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, WU_RESULT& wu_result) {
bool found;
APP* app;
APP_VERSION* avp;
WORKUNIT wu;
int retval;
wu = wu_result.workunit;
// Find the app and app_version for the client's platform.
//
if (anonymous(sreq.platforms.list[0])) {
app = ssp->lookup_app(wu.appid);
found = sreq.has_version(*app);
if (!found) return 0;
avp = NULL;
} else {
found = find_app_version(sreq, reply.wreq, wu, app, avp);
if (!found) return 0;
// see if the core client is too old.
// don't bump the infeasible count because this
// isn't the result's fault
//
if (!app_core_compatible(reply.wreq, *avp)) return 0;
}
if (app == NULL) return 0; // this should never happen
retval = wu_is_infeasible(wu, sreq, reply, *app);
if (retval) {
log_messages.printf(MSG_DEBUG,
"[HOST#%d] [WU#%d %s] WU is infeasible: %s\n",
reply.host.id, wu.id, wu.name, infeasible_string(retval)
);
return 0;
}
double val = 1;
if (app->beta) {
if (reply.wreq.host_info.allow_beta_work) {
val += 1;
} else {
return 0;
}
} else {
if (reply.wreq.host_info.reliable && (wu_result.need_reliable)) {
val += 1;
}
}
if (wu_result.infeasible_count) {
val += 1;
}
return val;
}
bool wu_is_infeasible_slow(WU_RESULT& wu_result, SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
char buf[256];
int retval;
int n;
DB_RESULT result;
// Don't send if we've already sent a result of this WU to this user.
//
if (config.one_result_per_user_per_wu) {
sprintf(buf,
"where workunitid=%d and userid=%d",
wu_result.workunit.id, reply.user.id
);
retval = result.count(n, buf);
if (retval) {
log_messages.printf(MSG_CRITICAL,
"send_work: can't get result count (%d)\n", retval
);
return true;
} else {
if (n>0) {
log_messages.printf(MSG_DEBUG,
"send_work: user %d already has %d result(s) for WU %d\n",
reply.user.id, n, wu_result.workunit.id
);
return true;
}
}
} else if (config.one_result_per_host_per_wu) {
// Don't send if we've already sent a result
// of this WU to this host.
// We only have to check this
// if we don't send one result per user.
//
sprintf(buf,
"where workunitid=%d and hostid=%d",
wu_result.workunit.id, reply.host.id
);
retval = result.count(n, buf);
if (retval) {
log_messages.printf(MSG_CRITICAL,
"send_work: can't get result count (%d)\n", retval
);
return true;
} else {
if (n>0) {
log_messages.printf(MSG_DEBUG,
"send_work: host %d already has %d result(s) for WU %d\n",
reply.host.id, n, wu_result.workunit.id
);
return true;
}
}
}
APP* app = ssp->lookup_app(wu_result.workunit.appid);
WORKUNIT wu = wu_result.workunit;
if (app_hr_type(*app)) {
if (already_sent_to_different_platform_careful(
sreq, reply.wreq, wu, *app
)) {
log_messages.printf(MSG_DEBUG,
"[HOST#%d] [WU#%d %s] WU is infeasible (assigned to different platform)\n",
reply.host.id, wu.id, wu.name
);
// Mark the workunit as infeasible.
// This ensures that jobs already assigned to a platform
// are processed first.
//
wu_result.infeasible_count++;
return true;
}
}
return false;
}
struct JOB{
int index;
double value;
double est_time;
double disk_usage;
};
struct JOB_SET {
double work_req;
double est_time;
double disk_usage;
double disk_limit;
std::list<JOB> jobs; // sorted
void add_job(JOB&);
double higher_value_disk_usage(double);
};
// add the given job, and remove lowest-value jobs
// that are in excess of work request
// or that cause the disk limit to be exceeded
//
void JOB_SET::add_job(JOB& job) {
while (!jobs.empty()) {
JOB& worst_job = jobs.back();
if (est_time + job.est_time - worst_job.est_time > work_req) {
est_time -= worst_job.est_time;
disk_usage -= worst_job.disk_usage;
jobs.pop_back();
}
}
while (!jobs.empty()) {
JOB& worst_job = jobs.back();
if (disk_usage + job.disk_usage > disk_limit) {
est_time -= worst_job.est_time;
disk_usage -= worst_job.disk_usage;
jobs.pop_back();
}
}
list<JOB>::iterator i = jobs.begin();
while (i != jobs.end()) {
if (i->value < job.value) {
jobs.insert(i, job);
break;
}
i++;
}
if (i == jobs.end()) {
jobs.push_back(job);
}
est_time += job.est_time;
disk_usage += job.disk_usage;
}
// return the disk usage of jobs above the given value
//
double JOB_SET::higher_value_disk_usage(double v) {
double sum = 0;
list<JOB>::iterator i = jobs.begin();
while (i != jobs.end()) {
if (i->value < v) break;
sum += i->disk_usage;
i++;
}
return sum;
}
void send_work_matchmaker(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
int i, slots_scanned=0, slots_locked=0;
JOB_SET jobs;
int min_slots = 20;
int max_slots = 50;
int max_locked = 10;
int pid = getpid();
lock_sema();
i = rand() % ssp->max_wu_results;
while (1) {
i = (i+1) % ssp->max_wu_results;
slots_scanned++;
if (slots_scanned >= max_slots) break;
WU_RESULT& wu_result = ssp->wu_results[i];
switch (wu_result.state) {
case WR_STATE_EMPTY:
continue;
case WR_STATE_PRESENT:
break;
default:
slots_locked++;
continue;
}
double v = value(sreq, reply, ssp->wu_results[i]);
if (v > jobs.lowest_value) {
ssp->wu_results[i] = pid;
unlock_sema();
if (wu_is_infeasible_slow(wu_result, sreq, reply)) {
lock_sema();
ssp->wu_results[i] = WR_STATE_EMPTY;
continue;
}
lock_sema();
jobs->add(i);
}
if (jobs->request_satisfied && slots_scanned>=MIN_SLOTS) break;
}
jobs->send();
unlock_sema();
if (slots_locked > max_locked) {
log_messages.printf(MSG_CRITICAL,
"Found too many locked slots (%d>%d) - increase array size",
slots_locked, max_locked
);
}
}
#endif
const char *BOINC_RCSID_32dcd335e7 = "$Id$";

View File

@ -42,8 +42,6 @@
#include "sched_config.h"
#include "sched_util.h"
SCHED_CONFIG config;
void init_xfer_result(DB_RESULT& result) {
result.id = 0;
result.create_time = time(0);

View File

@ -32,7 +32,6 @@ int main() {
SCHED_SHMEM* ssp;
int retval;
void* p;
SCHED_CONFIG config;
retval = config.parse_file(".");
if (retval) {

View File

@ -52,7 +52,6 @@ using namespace std;
#endif
int startup_time;
SCHED_CONFIG config;
R_RSA_PRIVATE_KEY key;
int mod_n, mod_i;
bool do_mod = false;

View File

@ -41,7 +41,6 @@ using namespace std;
#include "sched_util.h"
#include "sched_msgs.h"
SCHED_CONFIG config;
char variety[256];
extern int handle_trickle(MSG_FROM_HOST&);

View File

@ -151,7 +151,6 @@ int update_teams() {
}
int main(int argc, char** argv) {
SCHED_CONFIG config;
int retval, i;
bool do_update_teams = false, do_update_users = false;
bool do_update_hosts = false;

View File

@ -74,7 +74,6 @@ extern int check_pair(
RESULT & new_result, RESULT const& canonical_result, bool& retry
);
SCHED_CONFIG config;
char app_name[256];
int wu_id_modulus=0;
int wu_id_remainder=0;

View File

@ -38,8 +38,6 @@ bool repair = false;
// wu_checker
// See whether input files that should be present, are
SCHED_CONFIG config;
// get the path a WU's input file
//
int get_file_path(WORKUNIT& wu, char* path) {