- client: first whack at new work-fetch logic. Very preliminary.

svn path=/trunk/boinc/; revision=16754
David Anderson 2008-12-31 23:07:59 +00:00
parent 5859c140f3
commit 8c591e31df
22 changed files with 487 additions and 785 deletions

View File

@ -10412,3 +10412,13 @@ Rom 30 Dec 2008
/
configure.ac
version.h
David 31 Dec 2008
- client: first whack at new work-fetch logic. Very preliminary.
client/
most files
lib/
coproc.cpp,h
sched/
sched_plan.cpp
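
The per-resource work-fetch state introduced by this commit lives in the new work_fetch.{h,cpp}; the header itself is not among the hunks shown below. As a rough, non-authoritative sketch, these are the classes and members the code below refers to, reconstructed from their usage in this diff — field order, exact types, and anything not referenced in the hunks are guesses:

// Sketch only -- reconstructed from usage in this commit, not the real header.
#include <cstdio>
#include <vector>
struct PROJECT; struct ACTIVE_TASK; struct RESULT; struct RSC_WORK_FETCH;

struct RSC_PROJECT_WORK_FETCH {        // per-(project, resource) state
    double debt;                       // per-resource long-term debt
    double secs_this_debt_interval;    // instance-seconds used this interval
    double shortfall;
    double nidle_now;
    double backoff_time;
    double backoff_interval;
    void clear_perm();
    void reset_debt_accounting();
    bool debt_eligible(PROJECT*);
    bool overworked();
    void accumulate_shortfall(RSC_WORK_FETCH&, PROJECT*, double d_time, double nused);
};

struct PROJECT_WORK_FETCH {            // per-project summary
    double overall_debt;               // CPU debt plus speed-weighted CUDA debt
};

struct RSC_WORK_FETCH {                // per-resource (CPU or CUDA) state
    int rsc_type;                      // RSC_TYPE_CPU or RSC_TYPE_CUDA
    double ninstances;
    double speed;                      // estimated speed relative to one CPU (CUDA only)
    double shortfall, nidle_now;
    double total_resource_share, runnable_resource_share;
    double secs_this_debt_interval;
    RSC_PROJECT_WORK_FETCH& project_state(PROJECT*);
    bool may_have_work(PROJECT*);
    void rr_init();
    void reset_debt_accounting();
    void accumulate_shortfall(double d_time, double nused);
    void update_debts();
    PROJECT* choose_project();
    void print_state(char*);
};

struct WORK_FETCH {                    // top-level policy object
    double estimated_delay;
    void init();
    void rr_init();
    PROJECT* choose_project();
    void accumulate_inst_sec(ACTIVE_TASK*, double dt);
    void write_request(PROJECT*, FILE*);
    void handle_reply(PROJECT*, std::vector<RESULT*>);
    void set_initial_work_request(PROJECT*);
    void set_overall_debts();
    void print_state();
};

extern RSC_WORK_FETCH cpu_work_fetch, cuda_work_fetch;
extern WORK_FETCH work_fetch;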

View File

@ -766,7 +766,7 @@ int ACTIVE_TASK_SET::abort_project(PROJECT* project) {
task_iter++;
}
}
project->long_term_debt = 0;
project->clear_perm();
return 0;
}

View File

@ -108,10 +108,9 @@ static void debug_print_argv(char** argv) {
//
static void cuda_cmdline(ACTIVE_TASK* atp, char* cmdline) {
char buf[256];
COPROC* cp = gstate.coprocs.lookup("CUDA");
if (!cp) return;
if (!coproc_cuda) return;
for (int i=0; i<MAX_COPROC_INSTANCES; i++) {
if (cp->owner[i] == atp) {
if (coproc_cuda->owner[i] == atp) {
sprintf(buf, " --device %d", i);
strcat(cmdline, buf);
}
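
A minimal illustration of what the new coproc_cuda-based loop produces (hypothetical ownership, not from this commit):

// if coproc_cuda->owner[0] == atp and coproc_cuda->owner[2] == atp,
// the loop appends " --device 0 --device 2" to the app's command line.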

View File

@ -117,7 +117,6 @@ CLIENT_STATE::CLIENT_STATE():
cant_write_state_file = false;
debt_interval_start = 0;
total_wall_cpu_time_this_debt_interval = 0;
retry_shmem_time = 0;
must_schedule_cpus = true;
must_enforce_cpu_schedule = true;
@ -277,6 +276,7 @@ int CLIENT_STATE::init() {
msg_printf(NULL, MSG_INFO, "No coprocessors");
}
}
work_fetch.init();
// Check to see if we can write the state file.
//
@ -625,9 +625,6 @@ bool CLIENT_STATE::poll_slow_events() {
POLL_ACTION(enforce_schedule , enforce_schedule );
tasks_restarted = true;
}
if (!tasks_suspended && !network_suspended) {
POLL_ACTION(compute_work_requests, compute_work_requests );
}
if (!network_suspended) {
POLL_ACTION(scheduler_rpc , scheduler_rpc_poll );
}

View File

@ -245,10 +245,7 @@ private:
// --------------- cpu_sched.cpp:
private:
double debt_interval_start;
double total_wall_cpu_time_this_debt_interval;
// "wall CPU time" accumulated since last adjust_debts()
double total_cpu_time_this_debt_interval;
double cpu_shortfall;
bool work_fetch_no_new_work;
bool must_enforce_cpu_schedule;
bool must_schedule_cpus;
@ -401,8 +398,12 @@ private:
// --------------- cs_scheduler.cpp:
public:
int make_scheduler_request(PROJECT*);
int handle_scheduler_reply(PROJECT*, char* scheduler_url, int& nresults);
int handle_scheduler_reply(PROJECT*, char* scheduler_url);
SCHEDULER_OP* scheduler_op;
PROJECT* next_project_master_pending();
PROJECT* next_project_sched_rpc_pending();
PROJECT* next_project_trickle_up_pending();
PROJECT* find_project_with_overdue_results();
private:
bool contacted_sched_server;
int overall_work_fetch_urgency;
@ -458,7 +459,6 @@ public:
void free_mem();
// --------------- rr_sim.cpp:
void set_rrsim_flops(RESULT* rp, double rrs);
void rr_simulation();
void print_deadline_misses();
@ -466,11 +466,6 @@ public:
public:
int proj_min_results(PROJECT*, double);
void check_project_timeout();
PROJECT* next_project_master_pending();
PROJECT* next_project_sched_rpc_pending();
PROJECT* next_project_trickle_up_pending();
PROJECT* next_project_need_work();
PROJECT* find_project_with_overdue_results();
double overall_cpu_frac();
double time_until_work_done(PROJECT*, int, double);
bool compute_work_requests();

View File

@ -96,7 +96,8 @@ void PROJECT::init() {
non_cpu_intensive = false;
verify_files_on_app_start = false;
short_term_debt = 0;
long_term_debt = 0;
cpu_pwf.clear_perm();
cuda_pwf.clear_perm();
send_file_list = false;
send_time_stats_log = 0;
send_job_log = 0;
@ -109,10 +110,7 @@ void PROJECT::init() {
user_files.clear();
project_files.clear();
anticipated_debt = 0;
wall_cpu_time_this_debt_interval = 0;
next_runnable_result = NULL;
work_request = 0;
work_request_urgency = WORK_FETCH_DONT_NEED;
duration_correction_factor = 1;
project_files_downloaded_time = 0;
use_symlinks = false;
@ -188,7 +186,8 @@ int PROJECT::parse_state(MIOFILE& in) {
if (parse_bool(buf, "detach_when_done", detach_when_done)) continue;
if (parse_bool(buf, "ended", ended)) continue;
if (parse_double(buf, "<short_term_debt>", short_term_debt)) continue;
if (parse_double(buf, "<long_term_debt>", long_term_debt)) continue;
if (parse_double(buf, "<long_term_debt>", cpu_pwf.debt)) continue;
if (parse_double(buf, "<cuda_long_term_debt>", cuda_pwf.debt)) continue;
if (parse_double(buf, "<resource_share>", x)) continue; // not authoritative
if (parse_double(buf, "<duration_correction_factor>", duration_correction_factor)) continue;
if (parse_bool(buf, "attached_via_acct_mgr", attached_via_acct_mgr)) continue;
@ -240,6 +239,7 @@ int PROJECT::write_state(MIOFILE& out, bool gui_rpc) {
" <next_rpc_time>%f</next_rpc_time>\n"
" <short_term_debt>%f</short_term_debt>\n"
" <long_term_debt>%f</long_term_debt>\n"
" <cuda_long_term_debt>%f</cuda_long_term_debt>\n"
" <resource_share>%f</resource_share>\n"
" <duration_correction_factor>%f</duration_correction_factor>\n"
" <sched_rpc_pending>%d</sched_rpc_pending>\n"
@ -268,7 +268,8 @@ int PROJECT::write_state(MIOFILE& out, bool gui_rpc) {
min_rpc_time,
next_rpc_time,
short_term_debt,
long_term_debt,
cpu_pwf.debt,
cuda_pwf.debt,
resource_share,
duration_correction_factor,
sched_rpc_pending,
@ -350,7 +351,8 @@ void PROJECT::copy_state_fields(PROJECT& p) {
trickle_up_pending = p.trickle_up_pending;
safe_strcpy(code_sign_key, p.code_sign_key);
short_term_debt = p.short_term_debt;
long_term_debt = p.long_term_debt;
cpu_pwf = p.cpu_pwf;
cuda_pwf = p.cuda_pwf;
send_file_list = p.send_file_list;
send_time_stats_log = p.send_time_stats_log;
send_job_log = p.send_job_log;
@ -1127,6 +1129,7 @@ int APP_VERSION::parse(MIOFILE& in) {
int retval = cp->parse(in);
if (!retval) {
coprocs.coprocs.push_back(cp);
ncudas = cp->used;
} else {
msg_printf(0, MSG_INTERNAL_ERROR, "Error parsing <coproc>");
delete cp;

View File

@ -34,6 +34,7 @@
#include "cert_sig.h"
#include "hostinfo.h"
#include "coproc.h"
#include "work_fetch.h"
#include "miofile.h"
#include "rr_sim.h"
@ -299,7 +300,7 @@ public:
// everything from here on applies only to CPU intensive projects
/// not suspended and not deferred and not no more work
bool contactable();
bool can_request_work();
/// has a runnable result
bool runnable();
/// has a result in downloading state
@ -325,32 +326,24 @@ public:
/// computed over runnable projects
/// used for CPU scheduling
double short_term_debt;
/// Computed over potentially runnable projects
/// (defined for all projects, but doesn't change if
/// not potentially runnable).
/// Normalized so mean over all projects is zero
double long_term_debt;
/// expected debt by the end of the preemption period
double anticipated_debt;
/// how much "wall CPU time" has been devoted to this
/// project in the current debt interval
double wall_cpu_time_this_debt_interval;
/// the next result to run for this project
struct RESULT *next_runnable_result;
/// number of results in UPLOADING state
/// Don't start new results if this exceeds 2*ncpus.
int nuploading_results;
/// the unit is "project-normalized CPU seconds",
/// i.e. the work should take 1 CPU on this host
/// X seconds of wall-clock time to complete,
/// taking into account
/// 1) this project's fractional resource share
/// 2) on_frac and active_frac
/// see doc/sched.php
double work_request;
int work_request_urgency;
// stuff related to work fetch
//
RSC_PROJECT_WORK_FETCH cpu_pwf;
RSC_PROJECT_WORK_FETCH cuda_pwf;
PROJECT_WORK_FETCH pwf;
inline void clear_perm() {
cpu_pwf.clear_perm();
cuda_pwf.clear_perm();
}
/// # of results being returned in current scheduler op
int nresults_returned;
@ -417,6 +410,7 @@ struct APP_VERSION {
char api_version[16];
double avg_ncpus;
double max_ncpus;
double ncudas;
double flops;
/// additional cmdline args
char cmdline[256];

View File

@ -318,9 +318,11 @@ void CLIENT_STATE::reset_debt_accounting() {
unsigned int i;
for (i=0; i<projects.size(); i++) {
PROJECT* p = projects[i];
p->wall_cpu_time_this_debt_interval = 0.0;
p->cpu_pwf.reset_debt_accounting();
p->cuda_pwf.reset_debt_accounting();
}
total_wall_cpu_time_this_debt_interval = 0.0;
cpu_work_fetch.reset_debt_accounting();
cuda_work_fetch.reset_debt_accounting();
debt_interval_start = now;
}
@ -328,15 +330,14 @@ void CLIENT_STATE::reset_debt_accounting() {
//
void CLIENT_STATE::adjust_debts() {
unsigned int i;
double total_long_term_debt = 0;
double total_short_term_debt = 0;
double prrs, rrs;
int nprojects=0, nrprojects=0;
PROJECT *p;
double share_frac;
double wall_cpu_time = now - debt_interval_start;
double elapsed_time = now - debt_interval_start;
if (wall_cpu_time < 1) {
if (elapsed_time < 1) {
return;
}
@ -345,74 +346,41 @@ void CLIENT_STATE::adjust_debts() {
// Currently we don't have a way to estimate how long this was for,
// so ignore the last period and reset counters.
//
if (wall_cpu_time > global_prefs.cpu_scheduling_period()*2) {
if (elapsed_time > global_prefs.cpu_scheduling_period()*2) {
if (log_flags.debt_debug) {
msg_printf(NULL, MSG_INFO,
"[debt_debug] adjust_debt: elapsed time (%d) longer than sched period (%d). Ignoring this period.",
(int)wall_cpu_time, (int)global_prefs.cpu_scheduling_period()
(int)elapsed_time, (int)global_prefs.cpu_scheduling_period()
);
}
reset_debt_accounting();
return;
}
// Total up total and per-project "wall CPU" since last CPU reschedule.
// "Wall CPU" is the wall time during which a task was
// runnable (at the OS level).
//
// We use wall CPU for debt calculation
// (instead of reported actual CPU) for two reasons:
// 1) the process might have paged a lot, so the actual CPU
// may be a lot less than wall CPU
// 2) BOINC relies on apps to report their CPU time.
// Sometimes there are bugs and apps report zero CPU.
// It's safer not to trust them.
//
for (i=0; i<active_tasks.active_tasks.size(); i++) {
ACTIVE_TASK* atp = active_tasks.active_tasks[i];
if (atp->scheduler_state != CPU_SCHED_SCHEDULED) continue;
if (atp->wup->project->non_cpu_intensive) continue;
atp->result->project->wall_cpu_time_this_debt_interval += wall_cpu_time;
total_wall_cpu_time_this_debt_interval += wall_cpu_time;
p = atp->result->project;
if (p->non_cpu_intensive) continue;
work_fetch.accumulate_inst_sec(atp, elapsed_time);
}
// adjust long term debts
cpu_work_fetch.update_debts();
cuda_work_fetch.update_debts();
// adjust short term debts
rrs = runnable_resource_share();
prrs = potentially_runnable_resource_share();
for (i=0; i<projects.size(); i++) {
p = projects[i];
// potentially_runnable() can be false right after a result completes,
// but we still need to update its LTD.
// In this case its wall_cpu_time_this_debt_interval will be nonzero.
//
if (!(p->potentially_runnable()) && p->wall_cpu_time_this_debt_interval) {
prrs += p->resource_share;
}
}
for (i=0; i<projects.size(); i++) {
p = projects[i];
if (p->non_cpu_intensive) continue;
nprojects++;
// adjust long-term debts
//
if (p->potentially_runnable() || p->wall_cpu_time_this_debt_interval) {
share_frac = p->resource_share/prrs;
p->long_term_debt += share_frac*total_wall_cpu_time_this_debt_interval
- p->wall_cpu_time_this_debt_interval;
}
total_long_term_debt += p->long_term_debt;
// adjust short term debts
//
if (p->runnable()) {
nrprojects++;
share_frac = p->resource_share/rrs;
p->short_term_debt += share_frac*total_wall_cpu_time_this_debt_interval
- p->wall_cpu_time_this_debt_interval;
p->short_term_debt += share_frac*cpu_work_fetch.secs_this_debt_interval
- p->cpu_pwf.secs_this_debt_interval;
total_short_term_debt += p->short_term_debt;
} else {
p->short_term_debt = 0;
@ -422,16 +390,10 @@ void CLIENT_STATE::adjust_debts() {
if (nprojects==0) return;
// long-term debt:
// normalize so mean is zero,
// short-term debt:
// normalize so mean is zero, and limit abs value at MAX_STD
//
double avg_long_term_debt = total_long_term_debt / nprojects;
double avg_short_term_debt = 0;
if (nrprojects) {
avg_short_term_debt = total_short_term_debt / nrprojects;
}
double avg_short_term_debt = total_short_term_debt / nrprojects;
for (i=0; i<projects.size(); i++) {
p = projects[i];
if (p->non_cpu_intensive) continue;
@ -444,14 +406,6 @@ void CLIENT_STATE::adjust_debts() {
p->short_term_debt = -MAX_STD;
}
}
p->long_term_debt -= avg_long_term_debt;
if (log_flags.debt_debug) {
msg_printf(0, MSG_INFO,
"[debt_debug] adjust_debts(): project %s: STD %f, LTD %f",
p->project_name, p->short_term_debt, p->long_term_debt
);
}
}
reset_debt_accounting();
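
To make the new short-term debt update above concrete, a hypothetical worked example (numbers invented, not from this commit): two runnable CPU projects A and B with resource shares 100 and 300, and a debt interval in which cpu_work_fetch.secs_this_debt_interval totals 60 CPU instance-seconds, 30 attributed to each project:

// share_frac(A) = 100/400 = 0.25 -> A.short_term_debt += 0.25*60 - 30 = -15
// share_frac(B) = 300/400 = 0.75 -> B.short_term_debt += 0.75*60 - 30 = +15
// the mean delta is zero, so normalization leaves both values unchanged;
// B (under-served relative to its share) is favored at the next CPU reschedule.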
@ -1178,19 +1132,6 @@ double CLIENT_STATE::potentially_runnable_resource_share() {
return x;
}
double CLIENT_STATE::fetchable_resource_share() {
double x = 0;
for (unsigned int i=0; i<projects.size(); i++) {
PROJECT* p = projects[i];
if (p->non_cpu_intensive) continue;
if (p->long_term_debt < -global_prefs.cpu_scheduling_period()) continue;
if (p->contactable()) {
x += p->resource_share;
}
}
return x;
}
// same, but nearly runnable (could be downloading work right now)
//
double CLIENT_STATE::nearly_runnable_resource_share() {
@ -1294,6 +1235,7 @@ void CLIENT_STATE::set_ncpus() {
run_cpu_benchmarks = true;
request_schedule_cpus("Number of usable CPUs has changed");
request_work_fetch("Number of usable CPUs has changed");
work_fetch.init();
}
}

View File

@ -202,9 +202,8 @@ int CLIENT_STATE::app_finished(ACTIVE_TASK& at) {
rp->project->update_duration_correction_factor(&at);
}
double wall_cpu_time = now - debt_interval_start;
at.result->project->wall_cpu_time_this_debt_interval += wall_cpu_time;
total_wall_cpu_time_this_debt_interval += wall_cpu_time;
double elapsed_time = now - debt_interval_start;
work_fetch.accumulate_inst_sec(&at, elapsed_time);
return 0;
}
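
A rough reading of the new accounting (hypothetical numbers, based on the accumulate_inst_sec() implementation shown later in this commit): instance-seconds replace wall-CPU time.

// e.g. a task with avg_ncpus = 1 that ran for the 45 s since
// debt_interval_start adds 45 CPU instance-seconds to both
// p->cpu_pwf.secs_this_debt_interval and
// cpu_work_fetch.secs_this_debt_interval; if the host has a CUDA
// device, dt * coproc_cuda->used seconds go to the CUDA buckets.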

View File

@ -114,7 +114,6 @@ int CLIENT_STATE::make_scheduler_request(PROJECT* p) {
" <core_client_major_version>%d</core_client_major_version>\n"
" <core_client_minor_version>%d</core_client_minor_version>\n"
" <core_client_release>%d</core_client_release>\n"
" <work_req_seconds>%f</work_req_seconds>\n"
" <resource_share_fraction>%f</resource_share_fraction>\n"
" <rrs_fraction>%f</rrs_fraction>\n"
" <prrs_fraction>%f</prrs_fraction>\n"
@ -127,14 +126,14 @@ int CLIENT_STATE::make_scheduler_request(PROJECT* p) {
core_client_version.major,
core_client_version.minor,
core_client_version.release,
p->work_request,
resource_share_fraction,
rrs_fraction,
prrs_fraction,
time_until_work_done(p, proj_min_results(p, prrs)-1, prrs),
work_fetch.estimated_delay,
p->duration_correction_factor,
g_use_sandbox?1:0
);
work_fetch.write_request(p, f);
// write client capabilities
//
@ -331,6 +330,7 @@ bool CLIENT_STATE::scheduler_rpc_poll() {
PROJECT *p;
bool action=false;
static double last_time=0;
static double last_work_fetch_time = 0;
// check only every 5 sec
//
@ -366,11 +366,23 @@ bool CLIENT_STATE::scheduler_rpc_poll() {
action = true;
break;
}
if (!(exit_when_idle && contacted_sched_server)) {
scheduler_op->init_get_work();
if (scheduler_op->state != SCHEDULER_OP_STATE_IDLE) {
break;
}
// should we check work fetch? Do this infrequently.
if (exit_when_idle && contacted_sched_server) break;
if (tasks_suspended) break;
if (must_check_work_fetch) {
last_work_fetch_time = 0;
}
if (now - last_work_fetch_time < 60) return false;
last_work_fetch_time = now;
p = work_fetch.choose_project();
if (p) {
scheduler_op->init_op_project(p, RPC_REASON_NEED_WORK);
action = true;
break;
}
break;
default:
@ -385,9 +397,7 @@ bool CLIENT_STATE::scheduler_rpc_poll() {
// Handle the reply from a scheduler
//
int CLIENT_STATE::handle_scheduler_reply(
PROJECT* project, char* scheduler_url, int& nresults
) {
int CLIENT_STATE::handle_scheduler_reply(PROJECT* project, char* scheduler_url) {
SCHEDULER_REPLY sr;
FILE* f;
int retval;
@ -396,8 +406,8 @@ int CLIENT_STATE::handle_scheduler_reply(
char buf[256], filename[256];
std::string old_gui_urls = project->gui_urls;
PROJECT* p2;
vector<RESULT*>new_results;
nresults = 0;
contacted_sched_server = true;
project->last_rpc_time = now;
@ -752,8 +762,8 @@ int CLIENT_STATE::handle_scheduler_reply(
}
rp->wup->version_num = rp->version_num;
results.push_back(rp);
new_results.push_back(rp);
rp->set_state(RESULT_NEW, "handle_scheduler_reply");
nresults++;
est_duration += rp->estimated_duration(false);
}
if (log_flags.sched_op_debug) {
@ -878,15 +888,10 @@ int CLIENT_STATE::handle_scheduler_reply(
project->next_rpc_time = 0;
}
// if we asked for work and didn't get any,
// treat it as an RPC failure; back off this project
//
if (project->work_request && nresults==0) {
scheduler_op->backoff(project, "no work from project\n");
} else {
project->nrpc_failures = 0;
project->min_rpc_time = 0;
}
work_fetch.handle_reply(project, new_results);
project->nrpc_failures = 0;
project->min_rpc_time = 0;
if (sr.request_delay) {
double x = now + sr.request_delay;
@ -1029,4 +1034,13 @@ PROJECT* CLIENT_STATE::find_project_with_overdue_results() {
return 0;
}
// trigger work fetch
//
void CLIENT_STATE::request_work_fetch(const char* where) {
if (log_flags.work_fetch_debug) {
msg_printf(0, MSG_INFO, "[work_fetch_debug] Request work fetch: %s", where);
}
must_check_work_fetch = true;
}
const char *BOINC_RCSID_d35a4a7711 = "$Id$";

View File

@ -899,8 +899,8 @@ static void read_all_projects_list_file(MIOFILE& fout) {
static int set_debt(XML_PARSER& xp) {
bool is_tag;
char tag[256], url[256];
double short_term_debt = 0.0, long_term_debt = 0.0;
bool got_std=false, got_ltd=false;
double short_term_debt = 0.0, long_term_debt = 0.0, cuda_debt = 0.0;
bool got_std=false, got_ltd=false, got_cuda_debt=false;
strcpy(url, "");
while (!xp.get(tag, sizeof(tag), is_tag)) {
if (!strcmp(tag, "/project")) {
@ -909,7 +909,7 @@ static int set_debt(XML_PARSER& xp) {
PROJECT* p = gstate.lookup_project(url);
if (!p) return ERR_NOT_FOUND;
if (got_std) p->short_term_debt = short_term_debt;
if (got_ltd) p->long_term_debt = long_term_debt;
if (got_ltd) p->cpu_pwf.debt = long_term_debt;
return 0;
}
if (xp.parse_str(tag, "master_url", url, sizeof(url))) continue;
@ -921,6 +921,10 @@ static int set_debt(XML_PARSER& xp) {
got_ltd = true;
continue;
}
if (xp.parse_double(tag, "cuda_debt", cuda_debt)) {
got_cuda_debt = true;
continue;
}
if (log_flags.unparsed_xml) {
msg_printf(NULL, MSG_INFO,
"[unparsed_xml] set_debt: unrecognized %s", tag

View File

@ -6,10 +6,10 @@ OBJS = \
app.o \
client_msgs.o \
cs_apps.o \
cs_scheduler.o \
cs_scheduler.o \
cpu_sched.o \
log_flags.o \
rr_sim.o \
rr_sim.o \
sim.o \
sim_util.o \
time_stats.o \

View File

@ -24,12 +24,18 @@
#else
#include "client_state.h"
#endif
#include "coproc.h"
#include "client_msgs.h"
// this is here (rather than rr_sim.h) because its inline functions
// refer to RESULT
//
struct RR_SIM_STATUS {
std::vector<RESULT*> active;
COPROCS coprocs;
double active_ncpus;
double active_cudas;
inline bool can_run(RESULT* rp) {
return coprocs.sufficient_coprocs(
@ -47,6 +53,7 @@ struct RR_SIM_STATUS {
}
active.push_back(rp);
active_ncpus += rp->avp->avg_ncpus;
active_cudas += rp->avp->ncudas;
}
// remove *rpbest from active set,
// and adjust CPU time left for other results
@ -75,6 +82,7 @@ struct RR_SIM_STATUS {
}
}
active_ncpus -= rpbest->avp->avg_ncpus;
active_cudas -= rpbest->avp->ncudas;
}
RR_SIM_STATUS() {
@ -88,6 +96,7 @@ struct RR_SIM_STATUS {
void RR_SIM_PROJECT_STATUS::activate(RESULT* rp) {
active.push_back(rp);
active_ncpus += rp->avp->avg_ncpus;
active_cudas += rp->avp->ncudas;
}
bool RR_SIM_PROJECT_STATUS::can_run(RESULT* rp, int ncpus) {
@ -104,12 +113,13 @@ void RR_SIM_PROJECT_STATUS::remove_active(RESULT* r) {
}
}
active_ncpus -= r->avp->avg_ncpus;
active_cudas -= r->avp->ncudas;
}
// estimate the rate (FLOPS) that this job will get long-term
// with weighted round-robin scheduling
//
void CLIENT_STATE::set_rrsim_flops(RESULT* rp, double rrs) {
void set_rrsim_flops(RESULT* rp) {
// if it's a coproc job, use app version estimate
if (rp->uses_coprocs()) {
rp->rrsim_flops = rp->avp->flops;
@ -121,15 +131,16 @@ void CLIENT_STATE::set_rrsim_flops(RESULT* rp, double rrs) {
// running with other jobs of this project, ignoring other factors
//
double x = 1;
if (p->rr_sim_status.active_ncpus > ncpus) {
x = ncpus/p->rr_sim_status.active_ncpus;
if (p->rr_sim_status.active_ncpus > gstate.ncpus) {
x = gstate.ncpus/p->rr_sim_status.active_ncpus;
}
double r1 = x*rp->avp->avg_ncpus;
// if the project's total CPU usage is more than its share, scale
//
double rrs = cpu_work_fetch.runnable_resource_share;
double share_frac = rrs ? p->resource_share/rrs : 1;
double share_cpus = share_frac*ncpus;
double share_cpus = share_frac*gstate.ncpus;
double r2 = r1;
if (p->rr_sim_status.active_ncpus > share_cpus) {
r2 *= (share_cpus / p->rr_sim_status.active_ncpus);
@ -137,9 +148,9 @@ void CLIENT_STATE::set_rrsim_flops(RESULT* rp, double rrs) {
// scale by overall CPU availability
//
double r3 = r2 * overall_cpu_frac();
double r3 = r2 * gstate.overall_cpu_frac();
rp->rrsim_flops = r3 * host_info.p_fpops;
rp->rrsim_flops = r3 * gstate.host_info.p_fpops;
if (log_flags.rr_simulation) {
msg_printf(p, MSG_INFO,
"[rr_sim] set_rrsim_flops: %f (r1 %f r2 %f r3 %f)",
@ -209,10 +220,12 @@ void CLIENT_STATE::rr_simulation() {
sim_status.coprocs.clone(coprocs, false);
double ar = available_ram();
work_fetch.rr_init();
if (log_flags.rr_simulation) {
msg_printf(0, MSG_INFO,
"[rr_sim] rr_sim start: now %f work_buf_total %f ncpus %d",
now, work_buf_total(), ncpus
"[rr_sim] rr_sim start: now %f work_buf_total %f",
now, work_buf_total()
);
}
@ -242,33 +255,41 @@ void CLIENT_STATE::rr_simulation() {
rp->last_rr_sim_missed_deadline = rp->rr_sim_misses_deadline;
rp->rr_sim_misses_deadline = false;
if (rp->uses_coprocs()) {
p->rr_sim_status.has_coproc_jobs = true;
p->rr_sim_status.has_cuda_jobs = true;
} else {
p->rr_sim_status.has_cpu_jobs = true;
}
}
double trs = 0; // total resource share of CPU-int projects;
// determines CPU share (for shortfall computation)
double rrs = 0; // total resource share of nearly runnable CPU-int projects
// determines processing rate
// note the number of idle instances
//
cpu_work_fetch.nidle_now = ncpus - sim_status.active_ncpus;
if (coproc_cuda) {
cuda_work_fetch.nidle_now = coproc_cuda->count - coproc_cuda->used;
}
for (i=0; i<projects.size(); i++) {
p = projects[i];
if (p->non_cpu_intensive) continue;
if (!p->rr_sim_status.has_coproc_jobs || p->rr_sim_status.has_cpu_jobs) {
trs += p->resource_share;
if (p->rr_sim_status.has_cpu_jobs) {
cpu_work_fetch.total_resource_share += p->resource_share;
if (p->nearly_runnable()) {
rrs += p->resource_share;
cpu_work_fetch.runnable_resource_share += p->resource_share;
}
}
if (p->rr_sim_status.has_cuda_jobs) {
cuda_work_fetch.total_resource_share += p->resource_share;
if (p->nearly_runnable()) {
cuda_work_fetch.runnable_resource_share += p->resource_share;
}
}
}
double buf_end = now + work_buf_total();
// Simulation loop. Keep going until work done
// Simulation loop. Keep going until all work done
//
double sim_now = now;
cpu_shortfall = 0;
bool all_projects_have_pending = false;
while (sim_status.active.size()) {
@ -277,7 +298,7 @@ void CLIENT_STATE::rr_simulation() {
rpbest = NULL;
for (i=0; i<sim_status.active.size(); i++) {
rp = sim_status.active[i];
set_rrsim_flops(rp, rrs);
set_rrsim_flops(rp);
rp->rrsim_finish_delay = rp->rrsim_flops_left/rp->rrsim_flops;
if (!rpbest || rp->rrsim_finish_delay < rpbest->rrsim_finish_delay) {
rpbest = rp;
@ -318,66 +339,40 @@ void CLIENT_STATE::rr_simulation() {
}
}
// increment CPU shortfalls if necessary
// increment resource shortfalls
//
if (sim_now < buf_end) {
// check whether all projects have pending jobs;
// if so, we won't increment overall CPU shortfall
//
all_projects_have_pending = true;
for (i=0; i<projects.size(); i++) {
p = projects[i];
if (p->non_cpu_intensive) continue;
if (!p->rr_sim_status.pending.size()) {
all_projects_have_pending = false;
break;
}
}
double end_time = sim_now + rpbest->rrsim_finish_delay;
if (end_time > buf_end) end_time = buf_end;
double d_time = end_time - sim_now;
double nidle_cpus = ncpus - sim_status.active_ncpus;
if (nidle_cpus<0) nidle_cpus = 0;
if (nidle_cpus > 0 && !all_projects_have_pending) {
cpu_shortfall += d_time*nidle_cpus;
if (log_flags.rr_simulation) {
msg_printf(0, MSG_INFO,
"[rr_sim] new overall CPU shortfall: %f", cpu_shortfall
);
}
} else {
if (log_flags.rr_simulation) {
msg_printf(0, MSG_INFO,
"[rr_sim] no change in overall CPU shortfall: nidle %f all have pending %d",
nidle_cpus, all_projects_have_pending
);
}
}
if (sim_status.active_ncpus >= ncpus) {
work_fetch.estimated_delay = end_time - gstate.now;
}
cpu_work_fetch.accumulate_shortfall(d_time, sim_status.active_ncpus);
if (coproc_cuda) {
cuda_work_fetch.accumulate_shortfall(d_time, sim_status.active_cudas);
}
for (i=0; i<projects.size(); i++) {
p = projects[i];
if (p->non_cpu_intensive) continue;
if (p->rr_sim_status.pending.size()) continue;
double rsf = trs?p->resource_share/trs:1;
double proj_cpu_share = ncpus*rsf;
if (log_flags.rr_simulation) {
msg_printf(p, MSG_INFO,
"[rr_sim] npending %d last ncpus %f cpu share %f",
(int)p->rr_sim_status.pending.size(), p->rr_sim_status.active_ncpus, proj_cpu_share
);
}
double x = proj_cpu_share - p->rr_sim_status.active_ncpus;
if (x > 0) {
p->rr_sim_status.cpu_shortfall += d_time*x;
}
if (log_flags.rr_simulation) {
msg_printf(p, MSG_INFO,
"[rr_sim] new shortfall %f d_time %f proj_cpu_share %f lpan %f",
p->rr_sim_status.cpu_shortfall, d_time, proj_cpu_share, p->rr_sim_status.active_ncpus
);
}
RSC_PROJECT_WORK_FETCH& w = cpu_work_fetch.project_state(p);
if (w.debt_eligible(p)) {
p->cpu_pwf.accumulate_shortfall(
cpu_work_fetch, p, d_time, p->rr_sim_status.active_ncpus
);
}
if (coproc_cuda) {
RSC_PROJECT_WORK_FETCH& w = cuda_work_fetch.project_state(p);
if (w.debt_eligible(p)) {
p->cuda_pwf.accumulate_shortfall(
cuda_work_fetch, p, d_time, p->rr_sim_status.active_cudas
);
}
}
}
}
@ -402,8 +397,8 @@ void CLIENT_STATE::rr_simulation() {
// If all work done for a project, subtract that project's share
//
if (pbest->rr_sim_status.none_active()) {
if (!pbest->rr_sim_status.has_coproc_jobs || pbest->rr_sim_status.has_cpu_jobs) {
rrs -= pbest->resource_share;
if (pbest->rr_sim_status.has_cpu_jobs) {
cpu_work_fetch.runnable_resource_share -= pbest->resource_share;
}
}
@ -414,29 +409,25 @@ void CLIENT_STATE::rr_simulation() {
//
if (sim_now < buf_end) {
double d_time = buf_end - sim_now;
cpu_shortfall += d_time * ncpus;
cpu_work_fetch.accumulate_shortfall(d_time, 0);
if (coproc_cuda) {
cuda_work_fetch.accumulate_shortfall(d_time, 0);
}
for (i=0; i<projects.size(); i++) {
p = projects[i];
if (p->non_cpu_intensive) continue;
double rsf = trs?p->resource_share/trs:1;
double proj_cpu_share = ncpus*rsf;
p->rr_sim_status.cpu_shortfall += d_time*proj_cpu_share;
p->cpu_pwf.accumulate_shortfall(
cpu_work_fetch, p, d_time, 0
);
if (coproc_cuda) {
p->cuda_pwf.accumulate_shortfall(
cuda_work_fetch, p, d_time, 0
);
}
}
}
if (log_flags.rr_simulation) {
for (i=0; i<projects.size(); i++) {
p = projects[i];
if (p->non_cpu_intensive) continue;
if (p->rr_sim_status.cpu_shortfall) {
msg_printf(p, MSG_INFO,
"[rr_sim] shortfall %f\n", p->rr_sim_status.cpu_shortfall
);
}
}
msg_printf(NULL, MSG_INFO,
"[rr_sim] done; total shortfall %f\n",
cpu_shortfall
);
// call something
}
}
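
A hypothetical pass through the shortfall bookkeeping above (numbers invented): on a 4-CPU host, suppose a simulated step lasts d_time = 600 s with sim_status.active_ncpus = 3, and project P holds 25% of cpu_work_fetch.total_resource_share with 0.5 CPUs active:

// overall:    cpu_work_fetch.accumulate_shortfall(600, 3)
//             idle = ninstances - nused = 4 - 3 = 1   -> shortfall += 600
// project P:  share = 4 * 0.25 = 1 instance
//             x = share - nused = 1 - 0.5 = 0.5       -> P's shortfall += 300
// the same pattern runs for cuda_work_fetch when coproc_cuda is present.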

View File

@ -27,17 +27,16 @@ struct RR_SIM_PROJECT_STATUS {
std::vector<RESULT*>pending;
int deadlines_missed;
double active_ncpus;
double cpu_shortfall;
bool has_coproc_jobs;
double active_cudas;
bool has_cuda_jobs;
bool has_cpu_jobs;
inline void clear() {
active.clear();
pending.clear();
deadlines_missed = 0;
cpu_shortfall = 0;
active_ncpus = 0;
has_coproc_jobs = false;
has_cuda_jobs = false;
has_cpu_jobs = false;
}
void activate(RESULT* rp);

View File

@ -73,25 +73,6 @@ bool SCHEDULER_OP::check_master_fetch_start() {
return true;
}
// Try to get work from eligible project with biggest long term debt
// PRECONDITION: compute_work_requests() has been called
// to fill in PROJECT::work_request
// and CLIENT_STATE::overall_work_fetch_urgency
//
int SCHEDULER_OP::init_get_work() {
int retval;
PROJECT* p = gstate.next_project_need_work();
if (p) {
retval = init_op_project(p, RPC_REASON_NEED_WORK);
if (retval) {
return retval;
}
}
return 0;
}
// try to initiate an RPC to the given project.
// If there are multiple schedulers, start with a random one.
// User messages and backoff() is done at this level.
@ -122,7 +103,7 @@ int SCHEDULER_OP::init_op_project(PROJECT* p, int r) {
}
if (reason == RPC_REASON_INIT) {
p->work_request = 1;
work_fetch.set_initial_work_request(p);
if (!gstate.cpu_benchmarks_done()) {
gstate.cpu_benchmarks_set_defaults();
}
@ -207,17 +188,11 @@ int SCHEDULER_OP::start_rpc(PROJECT* p) {
int retval;
char request_file[1024], reply_file[1024];
// if requesting work, round up to 1 sec
//
if (p->work_request>0 && p->work_request<1) {
p->work_request = 1;
}
safe_strcpy(scheduler_url, p->get_scheduler_url(url_index, url_random));
if (log_flags.sched_ops) {
msg_printf(p, MSG_INFO,
"Sending scheduler request: %s. Requesting %.0f seconds of work, reporting %d completed tasks",
rpc_reason_string(reason), p->work_request, p->nresults_returned
rpc_reason_string(reason), p->cpu_pwf.shortfall, p->nresults_returned
);
}
@ -364,7 +339,7 @@ bool SCHEDULER_OP::update_urls(PROJECT* p, vector<std::string> &urls) {
// poll routine. If an operation is in progress, check for completion
//
bool SCHEDULER_OP::poll() {
int retval, nresults;
int retval;
vector<std::string> urls;
bool changed, scheduler_op_done;
@ -454,7 +429,7 @@ bool SCHEDULER_OP::poll() {
}
}
} else {
retval = gstate.handle_scheduler_reply(cur_proj, scheduler_url, nresults);
retval = gstate.handle_scheduler_reply(cur_proj, scheduler_url);
switch (retval) {
case 0:
break;
@ -465,7 +440,6 @@ bool SCHEDULER_OP::poll() {
backoff(cur_proj, "can't parse scheduler reply");
break;
}
cur_proj->work_request = 0; // don't ask again right away
}
cur_proj = NULL;
gstate.request_work_fetch("RPC complete");

View File

@ -73,7 +73,6 @@ public:
public:
SCHEDULER_OP(HTTP_OP_SET*);
bool poll();
int init_get_work();
int init_op_project(PROJECT*, int);
int init_master_fetch(PROJECT*);
bool check_master_fetch_start();

View File

@ -184,7 +184,7 @@ bool CLIENT_STATE::simulate_rpc(PROJECT* _p) {
}
last_time = now;
sprintf(buf, "RPC to %s; asking for %f<br>", p->project_name, p->work_request);
sprintf(buf, "RPC to %s; asking for %f<br>", p->project_name, p->cpu_pwf.shortfall);
html_msg += buf;
handle_completed_results();
@ -194,7 +194,7 @@ bool CLIENT_STATE::simulate_rpc(PROJECT* _p) {
}
bool sent_something = false;
double work_left = p->work_request;
double work_left = p->cpu_pwf.shortfall;
while (work_left > 0) {
RESULT* rp = new RESULT;
WORKUNIT* wup = new WORKUNIT;
@ -224,12 +224,11 @@ bool CLIENT_STATE::simulate_rpc(PROJECT* _p) {
work_left -= p->duration_correction_factor*wup->rsc_fpops_est/host_info.p_fpops;
}
if (p->work_request > 0 && !sent_something) {
if (p->cpu_pwf.shortfall > 0 && !sent_something) {
p->backoff();
} else {
p->nrpc_failures = 0;
}
p->work_request = 0;
request_schedule_cpus("simulate_rpc");
request_work_fetch("simulate_rpc");
return true;
@ -256,7 +255,7 @@ bool CLIENT_STATE::scheduler_rpc_poll() {
if (p) {
return simulate_rpc(p);
}
p = next_project_need_work();
p = work_fetch.choose_project();
if (p) {
return simulate_rpc(p);
}
@ -588,7 +587,6 @@ void CLIENT_STATE::simulate() {
action |= handle_finished_apps();
action |= possibly_schedule_cpus();
action |= enforce_schedule();
action |= compute_work_requests();
action |= scheduler_rpc_poll();
}
if (!action) break;

View File

@ -194,11 +194,10 @@ private:
void schedule_cpus();
bool enforce_schedule();
bool no_work_for_a_cpu();
void rr_simulation();
void set_rrsim_flops(RESULT* rp, double rrs);
void make_preemptable_task_list(vector<ACTIVE_TASK*>&, double&);
void print_deadline_misses();
public:
void rr_simulation();
std::vector <RESULT*> ordered_scheduled_results;
double retry_shmem_time;
inline double work_buf_min() {

View File

@ -15,532 +15,288 @@
// You should have received a copy of the GNU Lesser General Public License
// along with BOINC. If not, see <http://www.gnu.org/licenses/>.
// High-level logic for communicating with scheduling servers,
// and for merging the result of a scheduler RPC into the client state
// The scheduler RPC mechanism is in scheduler_op.C
#include "cpp.h"
#ifdef _WIN32
#include "boinc_win.h"
#endif
#ifndef _WIN32
#include "config.h"
#include <stdio.h>
#include <math.h>
#include <time.h>
#include <strings.h>
#include <map>
#include <set>
#endif
#include "error_numbers.h"
#include "file_names.h"
#include "filesys.h"
#include "parse.h"
#include "str_util.h"
#include "util.h"
#include "client_types.h"
#include "client_msgs.h"
#include "scheduler_op.h"
#ifdef SIM
#include "sim.h"
#else
#include "client_state.h"
#endif
using std::max;
#include "work_fetch.h"
using std::vector;
using std::string;
static const char* urgency_name(int urgency) {
switch(urgency) {
case WORK_FETCH_DONT_NEED: return "Don't need";
case WORK_FETCH_OK: return "OK";
case WORK_FETCH_NEED: return "Need";
case WORK_FETCH_NEED_IMMEDIATELY: return "Need immediately";
RSC_WORK_FETCH cuda_work_fetch;
RSC_WORK_FETCH cpu_work_fetch;
WORK_FETCH work_fetch;
RSC_PROJECT_WORK_FETCH& RSC_WORK_FETCH::project_state(PROJECT* p) {
switch(rsc_type) {
case RSC_TYPE_CPU: return p->cpu_pwf;
case RSC_TYPE_CUDA: return p->cuda_pwf;
}
return "Unknown";
}
// how many CPUs should this project occupy on average,
// based on its resource share relative to a given set
//
int CLIENT_STATE::proj_min_results(PROJECT* p, double subset_resource_share) {
if (p->non_cpu_intensive) {
return 1;
bool RSC_WORK_FETCH::may_have_work(PROJECT* p) {
RSC_PROJECT_WORK_FETCH& w = project_state(p);
return (w.backoff_time < gstate.now);
}
void RSC_WORK_FETCH::rr_init() {
shortfall = 0;
nidle_now = 0;
total_resource_share = 0;
runnable_resource_share = 0;
}
void WORK_FETCH::rr_init() {
cpu_work_fetch.rr_init();
if (coproc_cuda) {
cuda_work_fetch.rr_init();
}
if (!subset_resource_share) return 1; // TODO - fix
return (int)(ceil(ncpus*p->resource_share/subset_resource_share));
estimated_delay = 0;
}
// Return the best project to fetch work from, NULL if none
//
// Pick the one with largest (long term debt - amount of current work)
//
// PRECONDITIONS:
// - work_request_urgency and work_request set for all projects
// - CLIENT_STATE::overall_work_fetch_urgency is set
// (by previous call to compute_work_requests())
//
PROJECT* CLIENT_STATE::next_project_need_work() {
PROJECT *p, *p_prospect = NULL;
unsigned int i;
for (i=0; i<projects.size(); i++) {
p = projects[i];
if (p->work_request_urgency == WORK_FETCH_DONT_NEED) continue;
if (p->work_request == 0) continue;
if (!p->contactable()) continue;
// if we don't need work, only get work from non-cpu intensive projects.
//
if (overall_work_fetch_urgency == WORK_FETCH_DONT_NEED && !p->non_cpu_intensive) continue;
// if we don't really need work,
// and we don't really need work from this project, pass.
//
if (overall_work_fetch_urgency == WORK_FETCH_OK) {
if (p->work_request_urgency <= WORK_FETCH_OK) {
continue;
}
}
if (p_prospect) {
if (p->work_request_urgency == WORK_FETCH_OK &&
p_prospect->work_request_urgency > WORK_FETCH_OK
) {
continue;
}
if (p->long_term_debt + p->rr_sim_status.cpu_shortfall < p_prospect->long_term_debt + p_prospect->rr_sim_status.cpu_shortfall
&& !p->non_cpu_intensive
) {
continue;
}
}
p_prospect = p;
void RSC_WORK_FETCH::accumulate_shortfall(double d_time, double nused) {
double idle = ninstances - nused;
if (idle > 0) {
shortfall += idle*d_time;
}
if (p_prospect && (p_prospect->work_request <= 0)) {
p_prospect->work_request = 1.0;
if (log_flags.work_fetch_debug) {
msg_printf(0, MSG_INFO,
"[work_fetch_debug] next_project_need_work: project picked %s",
p_prospect->project_name
);
}
}
return p_prospect;
}
// the fraction of time a given CPU is working for BOINC
//
double CLIENT_STATE::overall_cpu_frac() {
double running_frac = time_stats.on_frac * time_stats.active_frac;
if (running_frac < 0.01) running_frac = 0.01;
if (running_frac > 1) running_frac = 1;
return running_frac;
}
// the expected number of CPU seconds completed by the client
// in a second of wall-clock time.
// May be > 1 on a multiprocessor.
//
double CLIENT_STATE::avg_proc_rate() {
return ncpus*overall_cpu_frac();
}
// estimate wall-clock time until the number of uncompleted results
// for project p will reach k,
// given the total resource share of a set of competing projects
//
double CLIENT_STATE::time_until_work_done(
PROJECT *p, int k, double subset_resource_share
void RSC_PROJECT_WORK_FETCH::accumulate_shortfall(
RSC_WORK_FETCH& rwf,
PROJECT* p,
double d_time,
double nused
) {
int num_results_to_skip = k;
double est = 0;
// total up the estimated time for this project's unstarted
// and partially completed results,
// omitting the last k
//
for (vector<RESULT*>::reverse_iterator iter = results.rbegin();
iter != results.rend(); iter++
) {
RESULT *rp = *iter;
if (rp->project != p
|| rp->state() > RESULT_FILES_DOWNLOADED
|| rp->ready_to_report
) continue;
if (num_results_to_skip > 0) {
--num_results_to_skip;
continue;
}
if (rp->project->non_cpu_intensive) {
// if it is a non_cpu intensive project,
// it needs only one at a time.
//
est = max(rp->estimated_time_remaining(true), work_buf_min());
} else {
est += rp->estimated_time_remaining(true);
}
}
if (log_flags.work_fetch_debug) {
msg_printf(NULL, MSG_INFO,
"[work_fetch_debug] time_until_work_done(): est %f ssr %f apr %f prs %f",
est, subset_resource_share, avg_proc_rate(), p->resource_share
);
}
if (subset_resource_share) {
double apr = avg_proc_rate()*p->resource_share/subset_resource_share;
return est/apr;
} else {
return est/avg_proc_rate(); // TODO - fix
double rsf = rwf.total_resource_share?p->resource_share/rwf.total_resource_share:1;
double share = rwf.ninstances * rsf;
double x = share - nused;
if (x > 0) {
shortfall += d_time * x;
}
}
// Top-level function for work fetch policy.
// Outputs:
// - overall_work_fetch_urgency
// - for each contactable project:
// - work_request and work_request_urgency
//
// Notes:
// - at most 1 CPU-intensive project will have a nonzero work_request
// and a work_request_urgency higher than DONT_NEED.
// This prevents projects with low LTD from getting work
// even though there was a higher LTD project that should get work.
// - all non-CPU-intensive projects that need work
// and are contactable will have a work request of 1.
//
// return false
//
bool CLIENT_STATE::compute_work_requests() {
unsigned int i;
static double last_time = 0;
PROJECT* RSC_WORK_FETCH::choose_project() {
PROJECT* pbest = NULL;
if (gstate.now - last_time >= 60) {
gstate.request_work_fetch("timer");
}
if (!must_check_work_fetch) return false;
if (log_flags.work_fetch_debug) {
msg_printf(0, MSG_INFO, "[work_fetch_debug] compute_work_requests(): start");
}
last_time = gstate.now;
must_check_work_fetch = false;
compute_nuploading_results();
adjust_debts();
#ifdef SIM
if (work_fetch_old) {
// "dumb" version for simulator only.
// for each project, compute extra work needed to bring it up to
// total_buf/relative resource share.
//
overall_work_fetch_urgency = WORK_FETCH_DONT_NEED;
double trs = total_resource_share();
double total_buf = ncpus*(work_buf_min() + work_buf_additional());
for (i=0; i<projects.size(); i++) {
PROJECT* p = projects[i];
double d = 0;
for (unsigned int j=0; j<results.size(); j++) {
RESULT* rp = results[j];
if (rp->project != p) continue;
d += rp->estimated_time_remaining(true);
}
double rrs = p->resource_share/trs;
double minq = total_buf*rrs;
if (d < minq) {
p->work_request = minq-d;
p->work_request_urgency = WORK_FETCH_NEED;
overall_work_fetch_urgency = WORK_FETCH_NEED;
} else {
p->work_request = 0;
p->work_request_urgency = WORK_FETCH_DONT_NEED;
}
}
return false;
}
#endif
rr_simulation();
// compute per-project and overall urgency
//
bool possible_deadline_miss = false;
bool project_shortfall = false;
bool non_cpu_intensive_needs_work = false;
for (i=0; i< projects.size(); i++) {
PROJECT* p = projects[i];
if (p->non_cpu_intensive) {
if (p->nearly_runnable() || !p->contactable() || p->some_result_suspended()) {
p->work_request = 0;
p->work_request_urgency = WORK_FETCH_DONT_NEED;
} else {
p->work_request = 1.0;
p->work_request_urgency = WORK_FETCH_NEED_IMMEDIATELY;
non_cpu_intensive_needs_work = true;
if (log_flags.work_fetch_debug) {
msg_printf(p, MSG_INFO,
"[work_fetch_debug] non-CPU-intensive project needs work"
);
}
return false;
}
} else {
p->work_request_urgency = WORK_FETCH_DONT_NEED;
p->work_request = 0;
if (p->rr_sim_status.deadlines_missed) {
possible_deadline_miss = true;
}
if (p->rr_sim_status.cpu_shortfall && p->long_term_debt > -global_prefs.cpu_scheduling_period()) {
project_shortfall = true;
}
}
}
if (cpu_shortfall <= 0.0 && (possible_deadline_miss || !project_shortfall)) {
overall_work_fetch_urgency = WORK_FETCH_DONT_NEED;
} else if (no_work_for_a_cpu()) {
overall_work_fetch_urgency = WORK_FETCH_NEED_IMMEDIATELY;
} else if (cpu_shortfall > 0) {
overall_work_fetch_urgency = WORK_FETCH_NEED;
} else {
overall_work_fetch_urgency = WORK_FETCH_OK;
}
if (log_flags.work_fetch_debug) {
msg_printf(0, MSG_INFO,
"[work_fetch_debug] compute_work_requests(): cpu_shortfall %f, overall urgency %s",
cpu_shortfall, urgency_name(overall_work_fetch_urgency)
);
}
if (overall_work_fetch_urgency == WORK_FETCH_DONT_NEED) {
if (non_cpu_intensive_needs_work) {
overall_work_fetch_urgency = WORK_FETCH_NEED_IMMEDIATELY;
}
return false;
}
// loop over projects, and pick one to get work from
//
double prrs = fetchable_resource_share();
PROJECT *pbest = NULL;
for (i=0; i<projects.size(); i++) {
PROJECT *p = projects[i];
// see if this project can be ruled out completely
//
for (unsigned i=0; i<gstate.projects.size(); i++) {
PROJECT* p = gstate.projects[i];
if (p->non_cpu_intensive) continue;
if (!p->rr_sim_status.cpu_shortfall && p->rr_sim_status.has_coproc_jobs) {
if (log_flags.work_fetch_debug) {
msg_printf(p, MSG_INFO,
"[work_fetch_debug] 0 shortfall and have coproc jobs; skipping"
);
}
continue;
}
if (!p->contactable()) {
if (log_flags.work_fetch_debug) {
msg_printf(p, MSG_INFO, "[work_fetch_debug] work fetch: project not contactable; skipping");
}
continue;
}
if ((p->deadlines_missed >= ncpus)
&& overall_work_fetch_urgency < WORK_FETCH_NEED
) {
if (log_flags.work_fetch_debug) {
msg_printf(p, MSG_INFO,
"[work_fetch_debug] project has %d deadline misses; skipping",
p->deadlines_missed
);
}
continue;
}
if (p->some_download_stalled()) {
if (log_flags.work_fetch_debug) {
msg_printf(p, MSG_INFO,
"[work_fetch_debug] project has stalled download; skipping"
);
}
continue;
}
if (p->some_result_suspended()) {
if (log_flags.work_fetch_debug) {
msg_printf(p, MSG_INFO, "[work_fetch_debug] project has suspended result; skipping");
}
continue;
}
if (p->overworked() && overall_work_fetch_urgency < WORK_FETCH_NEED) {
if (log_flags.work_fetch_debug) {
msg_printf(p, MSG_INFO, "[work_fetch_debug] project is overworked; skipping");
}
continue;
}
if (p->rr_sim_status.cpu_shortfall == 0.0 && overall_work_fetch_urgency < WORK_FETCH_NEED) {
if (log_flags.work_fetch_debug) {
msg_printf(p, MSG_INFO, "[work_fetch_debug] project has no shortfall; skipping");
}
continue;
}
if (p->nuploading_results > 2*ncpus) {
if (log_flags.work_fetch_debug) {
msg_printf(p, MSG_INFO,
"[work_fetch_debug] project has %d uploading results; skipping",
p->nuploading_results
);
}
continue;
}
// If the project's DCF is outside of reasonable limits,
// the project's WU FLOP estimates are not useful for predicting
// completion time.
// Switch to a simpler policy: ask for 1 sec of work if
// we don't have any.
//
if (p->duration_correction_factor < 0.02 || p->duration_correction_factor > 80.0) {
if (p->runnable()) {
if (log_flags.work_fetch_debug) {
msg_printf(p, MSG_INFO,
"[work_fetch_debug] project DCF %f out of range and have work; skipping",
p->duration_correction_factor
);
}
continue;
} else {
if (log_flags.work_fetch_debug) {
msg_printf(p, MSG_INFO,
"[work_fetch_debug] project DCF %f out of range: changing shortfall %f to 1.0",
p->duration_correction_factor, p->rr_sim_status.cpu_shortfall
);
}
p->rr_sim_status.cpu_shortfall = 1.0;
}
}
// see if this project is better than our current best
//
if (!p->can_request_work()) continue;
if (!may_have_work(p)) continue;
if (pbest) {
// avoid getting work from a project in deadline trouble
//
if (p->deadlines_missed && !pbest->deadlines_missed) {
if (log_flags.work_fetch_debug) {
msg_printf(p, MSG_INFO,
"[work_fetch_debug] project has deadline misses, %s doesn't",
pbest->get_project_name()
);
}
continue;
}
// avoid getting work from an overworked project
//
if (p->overworked() && !pbest->overworked()) {
if (log_flags.work_fetch_debug) {
msg_printf(p, MSG_INFO,
"[work_fetch_debug] project is overworked, %s isn't",
pbest->get_project_name()
);
}
continue;
}
// get work from project with highest LTD
//
if (pbest->long_term_debt + pbest->rr_sim_status.cpu_shortfall > p->long_term_debt + p->rr_sim_status.cpu_shortfall) {
if (log_flags.work_fetch_debug) {
msg_printf(p, MSG_INFO,
"[work_fetch_debug] project has less LTD than %s",
pbest->get_project_name()
);
}
if (pbest->pwf.overall_debt > p->pwf.overall_debt) {
continue;
}
}
pbest = p;
if (log_flags.work_fetch_debug) {
msg_printf(pbest, MSG_INFO, "[work_fetch_debug] best project so far");
}
return pbest;
}
void WORK_FETCH::set_overall_debts() {
for (unsigned i=0; i<gstate.projects.size(); i++) {
PROJECT* p = gstate.projects[i];
p->pwf.overall_debt = p->cpu_pwf.debt;
if (coproc_cuda) {
p->pwf.overall_debt += cuda_work_fetch.speed*p->cuda_pwf.debt;
}
}
}
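
A hypothetical overall-debt computation from set_overall_debts() above (numbers invented): with cuda_work_fetch.speed = 20 (i.e. the GPU's estimated FLOPS is 20x the host's p_fpops, per WORK_FETCH::init() later in this file), a project with CPU debt -300 and CUDA debt +100 gets:

// overall_debt = cpu_pwf.debt + speed * cuda_pwf.debt
//              = -300 + 20 * 100 = 1700
// so choose_project() treats it as heavily under-served overall.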
if (pbest) {
pbest->work_request = max(
pbest->rr_sim_status.cpu_shortfall,
cpu_shortfall * (prrs ? pbest->resource_share/prrs : 1)
void RSC_WORK_FETCH::print_state(char* name) {
msg_printf(0, MSG_INFO,
"[wfd] %s: shortfall %.2f nidle %.2f total RS %.2f runnable RS %.2f",
name,
shortfall, nidle_now,
total_resource_share, runnable_resource_share
);
for (unsigned int i=0; i<gstate.projects.size(); i++) {
PROJECT* p = gstate.projects[i];
RSC_PROJECT_WORK_FETCH& pwf = project_state(p);
msg_printf(p, MSG_INFO,
"[wfd] %s: shortfall %.2f nidle %.2f",
name, pwf.shortfall, pwf.nidle_now
);
}
}
// sanity check
//
double x = 1.01*work_buf_total()*ncpus;
// the 1.01 is for round-off error
if (pbest->work_request > x) {
msg_printf(NULL, MSG_INTERNAL_ERROR,
"Proposed work request %f bigger than max %f",
pbest->work_request, x
);
pbest->work_request = x;
void WORK_FETCH::print_state() {
msg_printf(0, MSG_INFO, "[wfd] ------- start work fetch state -------");
cpu_work_fetch.print_state("CPU");
if (coproc_cuda) {
cuda_work_fetch.print_state("CUDA");
}
for (unsigned int i=0; i<gstate.projects.size(); i++) {
PROJECT* p = gstate.projects[i];
msg_printf(p, MSG_INFO, "[wfd] overall_debt %f", p->pwf.overall_debt);
}
msg_printf(0, MSG_INFO, "[wfd] ------- end work fetch state -------");
}
static void print_req(PROJECT* p) {
msg_printf(p, MSG_INFO,
"[wfd] request: CPU (%.2f sec, %.2f) CUDA (%.2f sec, %.2f)",
p->cpu_pwf.shortfall, p->cpu_pwf.nidle_now,
p->cuda_pwf.shortfall, p->cuda_pwf.nidle_now
);
}
// choose a project to fetch work from
//
PROJECT* WORK_FETCH::choose_project() {
PROJECT* p = 0;
gstate.rr_simulation();
set_overall_debts();
// if a resource is currently idle, get work for it;
// give GPU priority over CPU
//
if (coproc_cuda && cuda_work_fetch.nidle_now) {
p = cuda_work_fetch.choose_project();
if (p) {
p->cpu_pwf.shortfall = 0;
}
if (!pbest->nearly_runnable()) {
pbest->work_request_urgency = WORK_FETCH_NEED_IMMEDIATELY;
} else if (pbest->rr_sim_status.cpu_shortfall) {
pbest->work_request_urgency = WORK_FETCH_NEED;
}
if (!p && cpu_work_fetch.nidle_now) {
p = cpu_work_fetch.choose_project();
if (p) {
p->cuda_pwf.shortfall = 0;
}
}
if (!p && coproc_cuda && cuda_work_fetch.shortfall) {
p = cuda_work_fetch.choose_project();
}
if (!p && cpu_work_fetch.shortfall) {
p = cpu_work_fetch.choose_project();
}
if (log_flags.work_fetch_debug) {
print_state();
if (p) {
print_req(p);
} else {
pbest->work_request_urgency = WORK_FETCH_OK;
msg_printf(0, MSG_INFO, "No project chosen for work fetch");
}
if (log_flags.work_fetch_debug) {
msg_printf(pbest, MSG_INFO,
"[work_fetch_debug] compute_work_requests(): work req %f, shortfall %f, urgency %s\n",
pbest->work_request, pbest->rr_sim_status.cpu_shortfall,
urgency_name(pbest->work_request_urgency)
);
}
} else if (non_cpu_intensive_needs_work) {
overall_work_fetch_urgency = WORK_FETCH_NEED_IMMEDIATELY;
}
return false;
return p;
}
// called when benchmarks change
//
void CLIENT_STATE::scale_duration_correction_factors(double factor) {
if (factor <= 0) return;
for (unsigned int i=0; i<projects.size(); i++) {
PROJECT* p = projects[i];
p->duration_correction_factor *= factor;
void WORK_FETCH::accumulate_inst_sec(ACTIVE_TASK* atp, double dt) {
APP_VERSION* avp = atp->result->avp;
PROJECT* p = atp->result->project;
double x = dt*avp->avg_ncpus;
p->cpu_pwf.secs_this_debt_interval += x;
cpu_work_fetch.secs_this_debt_interval += x;
if (coproc_cuda) {
x = dt*coproc_cuda->used;
p->cuda_pwf.secs_this_debt_interval += x;
cuda_work_fetch.secs_this_debt_interval += x;
}
if (log_flags.cpu_sched_debug) {
msg_printf(NULL, MSG_INFO,
"[cpu_sched_debug] scaling duration correction factors by %f",
factor
}
void RSC_WORK_FETCH::update_debts() {
unsigned int i;
int nprojects = 0;
double ders = 0;
PROJECT* p;
for (i=0; i<gstate.projects.size(); i++) {
p = gstate.projects[i];
RSC_PROJECT_WORK_FETCH& w = project_state(p);
if (!w.debt_eligible(p)) continue;
ders += p->resource_share;
}
double total_debt = 0;
for (i=0; i<gstate.projects.size(); i++) {
p = gstate.projects[i];
RSC_PROJECT_WORK_FETCH& w = project_state(p);
if (w.debt_eligible(p)) {
double share_frac = p->resource_share/ders;
w.debt += share_frac*secs_this_debt_interval - w.secs_this_debt_interval;
}
total_debt += w.debt;
nprojects++;
}
// normalize so mean is zero,
//
double avg_debt = total_debt / nprojects;
for (i=0; i<gstate.projects.size(); i++) {
p = gstate.projects[i];
RSC_PROJECT_WORK_FETCH& w = project_state(p);
w.debt -= avg_debt;
}
}
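
For the per-resource debt update above, a hypothetical example (numbers invented): two debt-eligible projects A and B with resource shares 200 and 600, and an interval contributing secs_this_debt_interval = 120 instance-seconds total, of which A used 90 and B used 30:

// share_frac(A) = 200/800 = 0.25 -> A.debt += 0.25*120 - 90 = -60
// share_frac(B) = 600/800 = 0.75 -> B.debt += 0.75*120 - 30 = +60
// every project's debt is then shifted by -avg_debt so the mean is zero
// (note that nprojects here counts all projects, eligible or not).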
bool RSC_PROJECT_WORK_FETCH::debt_eligible(PROJECT* p) {
if (backoff_interval > 0) return false;
if (p->suspended_via_gui) return false;
return true;
}
void WORK_FETCH::write_request(PROJECT* p, FILE* f) {
if (p->cpu_pwf.shortfall > 0 && p->cpu_pwf.shortfall < 1) {
p->cpu_pwf.shortfall = 1;
}
double work_req_seconds = p->cpu_pwf.shortfall;
fprintf(f,
" <cpu_req_seconds>%f</cpu_req_seconds>\n"
" <cpu_ninstances>%f</cpu_ninstances>\n",
p->cpu_pwf.shortfall,
p->cpu_pwf.nidle_now
);
if (coproc_cuda) {
if (p->cuda_pwf.shortfall > work_req_seconds) {
work_req_seconds = p->cuda_pwf.shortfall;
}
if (p->cuda_pwf.shortfall > 0 && p->cuda_pwf.shortfall < 1) {
p->cuda_pwf.shortfall = 1;
}
fprintf(f,
" <cuda_req_seconds>%f</cuda_req_seconds>\n"
" <cuda_ninstances>%f</cuda_ninstances>\n",
p->cuda_pwf.shortfall,
p->cuda_pwf.nidle_now
);
}
}
fprintf(f,
" <work_req_seconds>%f</work_req_seconds>\n",
work_req_seconds
);
}
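
A hypothetical request produced by write_request() above (values invented): a project with a CPU shortfall of 3600 s, a CUDA shortfall of 20000 s, and one idle GPU would get roughly:

// <cpu_req_seconds>3600</cpu_req_seconds>    <cpu_ninstances>0</cpu_ninstances>
// <cuda_req_seconds>20000</cuda_req_seconds> <cuda_ninstances>1</cuda_ninstances>
// <work_req_seconds>20000</work_req_seconds>  (the larger of the two shortfalls,
//   presumably kept for schedulers that only understand the old field)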
// Choose a new host CPID.
// If using account manager, do scheduler RPCs
// to all acct-mgr-attached projects to propagate the CPID
// we just got a scheduler reply with the given jobs.
//
void CLIENT_STATE::generate_new_host_cpid() {
host_info.generate_host_cpid();
for (unsigned int i=0; i<projects.size(); i++) {
if (projects[i]->attached_via_acct_mgr) {
projects[i]->sched_rpc_pending = RPC_REASON_ACCT_MGR_REQ;
projects[i]->set_min_rpc_time(now + 15, "Sending new host CPID");
}
void WORK_FETCH::handle_reply(PROJECT* p, vector<RESULT*> new_results) {
unsigned int i;
for (i=0; i<new_results.size(); i++) {
RESULT* rp = new_results[i];
}
}
void WORK_FETCH::set_initial_work_request(PROJECT* p) {
p->cpu_pwf.shortfall = 1;
p->cuda_pwf.shortfall = 1;
}
void WORK_FETCH::init() {
cpu_work_fetch.rsc_type = RSC_TYPE_CPU;
cpu_work_fetch.ninstances = gstate.ncpus;
if (coproc_cuda) {
cuda_work_fetch.rsc_type = RSC_TYPE_CUDA;
cuda_work_fetch.ninstances = coproc_cuda->count;
cuda_work_fetch.speed = coproc_cuda->flops_estimate()/gstate.host_info.p_fpops;
}
}
////////////////////////
void CLIENT_STATE::compute_nuploading_results() {
unsigned int i;
@ -585,7 +341,7 @@ bool PROJECT::some_result_suspended() {
return false;
}
bool PROJECT::contactable() {
bool PROJECT::can_request_work() {
if (suspended_via_gui) return false;
if (master_url_fetch_pending) return false;
if (min_rpc_time > gstate.now) return false;
@ -595,7 +351,7 @@ bool PROJECT::contactable() {
bool PROJECT::potentially_runnable() {
if (runnable()) return true;
if (contactable()) return true;
if (can_request_work()) return true;
if (downloading()) return true;
return false;
}
@ -606,8 +362,8 @@ bool PROJECT::nearly_runnable() {
return false;
}
bool PROJECT::overworked() {
return long_term_debt < -gstate.global_prefs.cpu_scheduling_period();
bool RSC_PROJECT_WORK_FETCH::overworked() {
return debt < -gstate.global_prefs.cpu_scheduling_period();
}
bool RESULT::runnable() {
@ -677,13 +433,42 @@ double ACTIVE_TASK::est_time_to_completion(bool for_work_fetch) {
return x;
}
// trigger work fetch
//
void CLIENT_STATE::request_work_fetch(const char* where) {
if (log_flags.work_fetch_debug) {
msg_printf(0, MSG_INFO, "[work_fetch_debug] Request work fetch: %s", where);
}
must_check_work_fetch = true;
// the fraction of time a given CPU is working for BOINC
//
double CLIENT_STATE::overall_cpu_frac() {
double running_frac = time_stats.on_frac * time_stats.active_frac;
if (running_frac < 0.01) running_frac = 0.01;
if (running_frac > 1) running_frac = 1;
return running_frac;
}
// called when benchmarks change
//
void CLIENT_STATE::scale_duration_correction_factors(double factor) {
if (factor <= 0) return;
for (unsigned int i=0; i<projects.size(); i++) {
PROJECT* p = projects[i];
p->duration_correction_factor *= factor;
}
if (log_flags.cpu_sched_debug) {
msg_printf(NULL, MSG_INFO,
"[cpu_sched_debug] scaling duration correction factors by %f",
factor
);
}
}
// Choose a new host CPID.
// If using account manager, do scheduler RPCs
// to all acct-mgr-attached projects to propagate the CPID
//
void CLIENT_STATE::generate_new_host_cpid() {
host_info.generate_host_cpid();
for (unsigned int i=0; i<projects.size(); i++) {
if (projects[i]->attached_via_acct_mgr) {
projects[i]->sched_rpc_pending = RPC_REASON_ACCT_MGR_REQ;
projects[i]->set_min_rpc_time(now + 15, "Sending new host CPID");
}
}
}
const char *BOINC_RCSID_d3a4a7711 = "$Id$";

View File

@ -37,6 +37,8 @@
using std::string;
using std::vector;
COPROC_CUDA* coproc_cuda;
#ifndef _USING_FCGI_
void COPROC::write_xml(MIOFILE& f) {
f.printf(
@ -188,7 +190,7 @@ string COPROC_CUDA::get(COPROCS& coprocs) {
if (cc.prop.major > 100) continue; // e.g. 9999 is an error
if (real_count) {
if (cc.flops() > cc2.flops()) {
if (cc.flops_estimate() > cc2.flops_estimate()) {
cc2 = cc;
}
s += ", ";
@ -207,6 +209,7 @@ string COPROC_CUDA::get(COPROCS& coprocs) {
ccp->count = real_count;
strcpy(ccp->type, "CUDA");
coprocs.coprocs.push_back(ccp);
coproc_cuda = ccp;
if (real_count == 1) {
return "CUDA device: "+s;
} else {

View File

@ -136,8 +136,13 @@ struct COPROC_CUDA : public COPROC {
int parse(FILE*);
// rough estimate of FLOPS
inline double flops() {
return 2500*prop.clockRate * prop.multiProcessorCount;
// The following is based on SETI@home CUDA,
// which gets 50 GFLOPS on a Quadro FX 3700,
// which has 14 MPs and a clock rate of 1.25 GHz
//
inline double flops_estimate() {
double x = (prop.clockRate * prop.multiProcessorCount)*5e10/(14*1.25e6);
return x?x:5e10;
}
};
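
As a sanity check of flops_estimate() (assuming prop.clockRate is in kHz, as CUDA reports it), plug in the calibration point from the comment:

// Quadro FX 3700: multiProcessorCount = 14, clockRate ~= 1.25e6 (kHz)
// x = (1.25e6 * 14) * 5e10 / (14 * 1.25e6) = 5e10  ->  50 GFLOPS,
// matching the SETI@home CUDA measurement the constant is based on.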
@ -150,4 +155,6 @@ struct COPROC_CELL_SPE : public COPROC {
void fake_cuda(COPROCS&, int);
extern COPROC_CUDA* coproc_cuda;
#endif

View File

@ -101,17 +101,7 @@ bool app_plan(SCHEDULER_REQUEST& sreq, char* plan_class, HOST_USAGE& hu) {
if (x > 1) x = 1;
hu.avg_ncpus = x;
hu.max_ncpus = x;
// estimate the FLOPS we're going to get from the GPU.
// The following is based on SETI@home CUDA,
// which gets 50 GFLOPS on a Quadro FX 3700,
// which has 14 MPs and a clock rate of 1.25 MHz
//
x = (double)cp2->prop.clockRate * (double)cp2->prop.multiProcessorCount;
double y = 14.*1250000.;
if (!x) x = y;
hu.flops = 5e10 * (x/y);
hu.flops = cp2->flops_estimate();
if (config.debug_version_select) {
log_messages.printf(MSG_DEBUG,
"CUDA app estimated %.2f GFLOPS (clock %d count %d)\n",