- scheduler: enforce max_wus_to_send, daily_result_quota,
  and max_wus_in_progress when using score-based scheduling

svn path=/trunk/boinc/; revision=16106
David Anderson 2008-10-01 20:58:28 +00:00
parent bb9d546a02
commit 632587c3b4
2 changed files with 53 additions and 7 deletions

checkin_notes

@@ -7920,3 +7920,10 @@ David 1 Oct 2008
     handle_request.cpp
     sched_config.cpp,h
     server_types.cpp,h
+
+David 1 Oct 2008
+    - scheduler: enforce max_wus_to_send, daily_result_quota,
+        and max_wus_in_progress when using score-based scheduling
+    sched/
+        sched_send.cpp

sched/sched_send.cpp

@@ -1451,13 +1451,32 @@ struct JOB_SET {
     double est_time;
     double disk_usage;
     double disk_limit;
+    int max_jobs;
     std::list<JOB> jobs;     // sorted high to low
-    JOB_SET(double wr, double dl) {
-        work_req = wr;
+    JOB_SET(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
+        work_req = sreq.work_req_seconds;
         est_time = 0;
         disk_usage = 0;
-        disk_limit = dl;
+        disk_limit = reply.wreq.disk_available;
+        max_jobs = config.max_wus_to_send;
+        int ncpus = effective_ncpus(reply.host), n;
+        if (config.daily_result_quota) {
+            if (reply.host.max_results_day == 0 || reply.host.max_results_day>config.daily_result_quota) {
+                reply.host.max_results_day = config.daily_result_quota;
+            }
+            reply.wreq.daily_result_quota = ncpus*reply.host.max_results_day;
+            n = reply.wreq.daily_result_quota - reply.host.nresults_today;
+            if (n < 0) n = 0;
+            if (n < max_jobs) max_jobs = n;
+        }
+        if (config.max_wus_in_progress) {
+            n = config.max_wus_in_progress*ncpus - reply.wreq.nresults_on_host;
+            if (n < 0) n = 0;
+            if (n < max_jobs) max_jobs = n;
+        }
     }
     void add_job(JOB&);
     double higher_score_disk_usage(double);
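The constructor above reduces max_jobs to the tightest of the three limits. As a rough, standalone illustration (not project code; every config value and host count below is made up for the example), the per-request cap works out like this:

// Standalone sketch of the cap computed in the new JOB_SET constructor;
// all numbers are hypothetical.
#include <algorithm>
#include <cstdio>

int main() {
    // hypothetical project config
    int max_wus_to_send = 10;      // per-request cap
    int daily_result_quota = 20;   // per-CPU, per-day cap
    int max_wus_in_progress = 4;   // per-CPU in-progress cap

    // hypothetical host state
    int ncpus = 2;                 // effective CPUs on the host
    int nresults_today = 30;       // results already sent to this host today
    int nresults_on_host = 5;      // results currently in progress on the host

    int max_jobs = max_wus_to_send;

    // daily quota: ncpus * per-host quota, minus what was already sent today
    int n = std::max(ncpus * daily_result_quota - nresults_today, 0);
    max_jobs = std::min(max_jobs, n);

    // in-progress limit: ncpus * cap, minus what is already on the host
    n = std::max(ncpus * max_wus_in_progress - nresults_on_host, 0);
    max_jobs = std::min(max_jobs, n);

    printf("max_jobs for this request: %d\n", max_jobs);   // prints 3
}

With these numbers the in-progress limit is the binding one, so at most 3 jobs would be added to the set for this request.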
@@ -1667,9 +1686,10 @@ double JOB_SET::lowest_score() {
     return jobs.back().score;
 }
 
-// add the given job, and remove lowest-score jobs
-// that are in excess of work request
-// or that cause the disk limit to be exceeded
+// add the given job, and remove lowest-score jobs that
+// - are in excess of work request
+// - are in excess of per-request or per-day limits
+// - cause the disk limit to be exceeded
 //
 void JOB_SET::add_job(JOB& job) {
     while (!jobs.empty()) {
@@ -1694,6 +1714,13 @@ void JOB_SET::add_job(JOB& job) {
             break;
         }
     }
+
+    if (jobs.size() == max_jobs) {
+        JOB& worst_job = jobs.back();
+        jobs.pop_back();
+        ssp->wu_results[worst_job.index].state = WR_STATE_PRESENT;
+    }
+
     list<JOB>::iterator i = jobs.begin();
     while (i != jobs.end()) {
         if (i->score < job.score) {
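add_job() now also evicts the lowest-score job when the set is already at max_jobs, returning its slot to the cache before the new job is inserted in sorted position. A minimal standalone sketch of that bounded, score-ordered list (generic types, not the scheduler's JOB/WU_RESULT structures):

// Standalone sketch of a bounded list kept sorted high to low by score;
// when full, the lowest-score entry is evicted before inserting.
#include <cstdio>
#include <list>

struct Job {
    int id;
    double score;
};

void add_job(std::list<Job>& jobs, size_t max_jobs, const Job& job) {
    if (jobs.size() == max_jobs) {
        // at capacity: drop the lowest-score job (the real scheduler also
        // returns the evicted slot to the cache via WR_STATE_PRESENT)
        jobs.pop_back();
    }
    // insert before the first job with a lower score,
    // keeping the list sorted high to low
    auto i = jobs.begin();
    while (i != jobs.end() && i->score >= job.score) ++i;
    jobs.insert(i, job);
}

int main() {
    std::list<Job> jobs;
    for (int id = 0; id < 6; id++) {
        add_job(jobs, 3, Job{id, id * 1.5});   // capacity of 3 jobs
    }
    for (const Job& j : jobs) {
        printf("job %d, score %.1f\n", j.id, j.score);  // jobs 5, 4, 3 remain
    }
}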
@@ -1749,7 +1776,7 @@ void JOB_SET::send(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
 
 void send_work_matchmaker(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
     int i, slots_locked=0;
-    JOB_SET jobs (sreq.work_req_seconds, reply.wreq.disk_available);
+    JOB_SET jobs (sreq, reply);
     int min_slots = config.mm_min_slots;
     if (!min_slots) min_slots = ssp->max_wu_results/2;
     int max_slots = config.mm_max_slots;
@@ -1758,6 +1785,10 @@ void send_work_matchmaker(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
     lock_sema();
     i = rand() % ssp->max_wu_results;
+
+    // scan through the job cache, maintaining a JOB_SET of jobs
+    // that we can send to this client, ordered by score.
+    //
     for (int slots_scanned=0; slots_scanned<max_slots; slots_scanned++) {
         i = (i+1) % ssp->max_wu_results;
         WU_RESULT& wu_result = ssp->wu_results[i];
@@ -1774,6 +1805,11 @@
         JOB job;
         job.index = i;
+
+        // get score for this job, and skip it if it fails quick check.
+        // NOTE: the EDF check done in get_score()
+        // includes only in-progress jobs.
+        //
         if (!job.get_score(sreq, reply)) {
             continue;
         }
@@ -1782,6 +1818,7 @@
                 "score for %s: %f\n", wu_result.workunit.name, job.score
             );
         }
+
         if (job.score > jobs.lowest_score() || !jobs.request_satisfied()) {
             ssp->wu_results[i].state = g_pid;
             unlock_sema();
@@ -1799,6 +1836,8 @@ void send_work_matchmaker(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
         if (jobs.request_satisfied() && slots_scanned>=min_slots) break;
     }
 
+    // TODO: trim jobs from tail of list until we pass the EDF check
+    //
     jobs.send(sreq, reply);
     unlock_sema();
     if (slots_locked > max_locked) {
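The new TODO suggests dropping jobs from the low-score tail of the set until the remaining jobs pass an EDF feasibility check. A hypothetical, standalone sketch of what that trimming could look like, with a toy edf_feasible() predicate standing in for the real deadline check (none of this is part of the commit):

// Hypothetical sketch of the TODO above: trim lowest-score jobs until the
// set is EDF-feasible. edf_feasible() is a simplified stand-in that ignores
// the host's existing workload.
#include <algorithm>
#include <cstdio>
#include <list>
#include <vector>

struct Job {
    double score;
    double est_runtime;   // estimated runtime, seconds
    double deadline;      // seconds from now
};

static bool edf_feasible(const std::list<Job>& jobs) {
    // run jobs in deadline order; every job must finish before its deadline
    std::vector<Job> by_deadline(jobs.begin(), jobs.end());
    std::sort(by_deadline.begin(), by_deadline.end(),
        [](const Job& a, const Job& b) { return a.deadline < b.deadline; }
    );
    double t = 0;
    for (const Job& j : by_deadline) {
        t += j.est_runtime;
        if (t > j.deadline) return false;
    }
    return true;
}

int main() {
    // jobs sorted high to low by score, as in JOB_SET
    std::list<Job> jobs = {
        {9.0, 100, 400}, {7.0, 200, 450}, {5.0, 300, 500},
    };
    while (!jobs.empty() && !edf_feasible(jobs)) {
        jobs.pop_back();    // trim from the low-score tail
    }
    printf("jobs kept after trimming: %zu\n", jobs.size());   // prints 2
}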