client: improve work fetch in presence of max concurrent

Re-enable work buffering in the presence of max concurrent constraints.
See https://boinc.berkeley.edu/trac/wiki/WorkFetchMaxConcurrent
David Anderson 2019-03-25 21:43:30 -07:00
parent fbed9f3290
commit 81a880c74d
6 changed files with 78 additions and 3 deletions
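
For orientation, here is a minimal standalone sketch (not part of the commit; the numbers and names are illustrative) of why work buffering needs a per-project notion of shortfall once max concurrent limits are in play: the host-wide shortfall can be much larger than what a capped project can actually run, so requesting against it over-fetches.

#include <algorithm>
#include <cstdio>

// Toy comparison, assuming a 4-CPU host with a 3-hour work buffer that is
// otherwise empty, and a project whose app_config.xml caps it at 2 concurrent
// jobs.  The host-wide shortfall says "ask for 12 instance-hours"; the capped
// project can only ever use 2 instances at once, so 6 instance-hours is the
// useful request size.  The hunks below add exactly this per-project quantity
// ("MC shortfall") to the round-robin simulation.
int main() {
    double ninstances = 4;      // CPUs on the host
    double buf_hours = 3;       // length of the work buffer
    double max_concurrent = 2;  // the project's concurrency cap

    double host_shortfall = ninstances * buf_hours;                          // 12
    double mc_shortfall = std::min(max_concurrent, ninstances) * buf_hours;  // 6

    printf("host-wide shortfall:      %.0f instance-hours\n", host_shortfall);
    printf("per-project MC shortfall: %.0f instance-hours\n", mc_shortfall);
    return 0;
}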

View File

@@ -99,6 +99,8 @@ int APP_CONFIGS::config_app_versions(PROJECT* p, bool show_warnings) {
return 0;
}
// clear app- and project-level counters to enforce max concurrent limits
//
void max_concurrent_init() {
for (unsigned int i=0; i<gstate.apps.size(); i++) {
gstate.apps[i]->app_n_concurrent = 0;
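
The hunk above is truncated after the app-level loop. As a hedged illustration only (toy types; an assumption about the general shape of counter-based enforcement, not code from this commit), this is the kind of check such counters make possible during scheduling and in the RR simulation changed below:

// Toy sketch of counter-based max_concurrent enforcement (assumed shape):
// a job is passed over once its app already has as many running or
// simulated instances as the configured cap.
struct ToyApp {
    int max_concurrent;    // 0 means "no limit", as in app_config.xml
    int app_n_concurrent;  // per-app counter, cleared by max_concurrent_init()
};

bool toy_app_at_limit(const ToyApp& app) {
    return app.max_concurrent && app.app_n_concurrent >= app.max_concurrent;
}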

View File

@@ -99,6 +99,7 @@ struct RR_SIM {
active.push_back(rp);
rsc_work_fetch[0].sim_nused += rp->avp->avg_ncpus;
p->rsc_pwf[0].sim_nused += rp->avp->avg_ncpus;
int rt = rp->avp->gpu_usage.rsc_type;
if (rt) {
rsc_work_fetch[rt].sim_nused += rp->avp->gpu_usage.usage;
@@ -118,7 +119,15 @@ struct RR_SIM {
#endif
}
}
if (have_max_concurrent) {
max_concurrent_inc(rp);
if (p->rsc_pwf[0].sim_nused > p->rsc_pwf[0].max_nused) {
p->rsc_pwf[0].max_nused = p->rsc_pwf[0].sim_nused;
}
if (rt && p->rsc_pwf[rt].sim_nused > p->rsc_pwf[rt].max_nused) {
p->rsc_pwf[rt].max_nused = p->rsc_pwf[rt].sim_nused;
}
}
}
void init_pending_lists();
@@ -236,6 +245,7 @@ void RR_SIM::pick_jobs_to_run(double reltime) {
for (unsigned int i=0; i<gstate.projects.size(); i++) {
PROJECT* p = gstate.projects[i];
p->pwf.rec_temp_save = p->pwf.rec_temp;
p->pwf.at_max_concurrent_limit = false;
}
rsc_work_fetch[0].sim_nused = 0;
@@ -254,11 +264,11 @@ void RR_SIM::pick_jobs_to_run(double reltime) {
//
for (unsigned int i=0; i<gstate.projects.size(); i++) {
PROJECT* p = gstate.projects[i];
if (p->pwf.at_max_concurrent_limit) continue;
RSC_PROJECT_WORK_FETCH& rsc_pwf = p->rsc_pwf[rt];
if (rsc_pwf.pending.size() == 0) continue;
rsc_pwf.pending_iter = rsc_pwf.pending.begin();
rsc_pwf.sim_nused = 0;
rsc_pwf.max_nused = 0;
p->pwf.rec_temp = p->pwf.rec;
p->compute_sched_priority();
project_heap.push_back(p);
@@ -418,6 +428,30 @@ static void handle_missed_deadline(RESULT* rpbest, double diff, double ar) {
}
}
// update "MC shortfall" for projects with max concurrent restrictions
//
static void mc_update_stats(double sim_now, double dt, double buf_end) {
for (unsigned int i=0; i<gstate.projects.size(); i++) {
PROJECT* p = gstate.projects[i];
if (!p->app_configs.project_has_mc) continue;
for (int rt=0; rt<coprocs.n_rsc; rt++) {
RSC_PROJECT_WORK_FETCH& rsc_pwf = p->rsc_pwf[rt];
RSC_WORK_FETCH& rwf = rsc_work_fetch[rt];
double x = rsc_pwf.max_nused - rsc_pwf.sim_nused;
x = std::min(x, rwf.ninstances - rwf.sim_nused);
if (x > 1e-6 && sim_now < buf_end) {
double dt2;
if (sim_now + dt > buf_end) {
dt2 = buf_end - sim_now;
} else {
dt2 = dt;
}
rsc_pwf.mc_shortfall += x*dt2;
}
}
}
}
// do a round_robin simulation,
// for either CPU scheduling (to find deadline misses)
// or work fetch (to compute idleness and shortfall)
@@ -557,9 +591,14 @@ void RR_SIM::simulate() {
}
}
// update shortfall and saturated time for each resource
//
for (int i=0; i<coprocs.n_rsc; i++) {
rsc_work_fetch[i].update_stats(sim_now, delta_t, buf_end);
}
if (have_max_concurrent) {
mc_update_stats(sim_now, delta_t, buf_end);
}
// update project REC
//
@@ -612,6 +651,9 @@ void RR_SIM::simulate() {
for (int i=0; i<coprocs.n_rsc; i++) {
rsc_work_fetch[i].update_stats(sim_now, d_time, buf_end);
}
if (have_max_concurrent) {
mc_update_stats(sim_now, d_time, buf_end);
}
}
}
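
As a self-contained illustration of what mc_update_stats() accumulates above (toy values for a single project and resource; the scenario is hypothetical, only the field names come from the commit):

#include <algorithm>
#include <cstdio>

// Mirrors the per-step MC-shortfall update: the deficit is how far below its
// own simulated peak (max_nused) the project currently sits, capped by the
// instances still free on the host, and only time before the end of the work
// buffer counts.
int main() {
    double max_nused = 2;       // peak instances the project held in the sim
    double ninstances = 4;      // host instances of this resource
    double buf_end = 3 * 3600;  // 3-hour work buffer, in seconds
    double mc_shortfall = 0;

    struct Step { double sim_now, dt, proj_nused, host_nused; };
    Step steps[] = {
        {0.0,    3600, 2, 4},   // project at its cap, host saturated: no deficit
        {3600.0, 3600, 0, 1},   // project idle, 3 host instances free: deficit 2
        {7200.0, 3600, 0, 0},   // project idle, host idle: deficit 2
    };
    for (const Step& s : steps) {
        double x = max_nused - s.proj_nused;         // rsc_pwf.max_nused - rsc_pwf.sim_nused
        x = std::min(x, ninstances - s.host_nused);  // capped by free host instances
        if (x > 1e-6 && s.sim_now < buf_end) {
            double dt2 = std::min(s.dt, buf_end - s.sim_now);
            mc_shortfall += x * dt2;
        }
    }
    // 2*3600 + 2*3600 = 14400 instance-seconds of work worth requesting
    printf("mc_shortfall = %.0f instance-seconds\n", mc_shortfall);
    return 0;
}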

View File

@@ -76,6 +76,7 @@ void RSC_PROJECT_WORK_FETCH::rr_init() {
sim_nused = 0;
nused_total = 0;
deadlines_missed = 0;
mc_shortfall = 0;
}
void RSC_PROJECT_WORK_FETCH::resource_backoff(PROJECT* p, const char* name) {
@@ -177,6 +178,9 @@ void RSC_WORK_FETCH::rr_init() {
sim_used_instances = 0;
}
// update shortfall and saturated time for a given resource;
// called at each time step in RR sim
//
void RSC_WORK_FETCH::update_stats(double sim_now, double dt, double buf_end) {
double idle = ninstances - sim_nused;
if (idle > 1e-6 && sim_now < buf_end) {
@@ -222,6 +226,15 @@ void RSC_WORK_FETCH::set_request(PROJECT* p) {
}
RSC_PROJECT_WORK_FETCH& w = project_state(p);
double non_excl_inst = ninstances - w.ncoprocs_excluded;
// if this project has max concurrent,
// use the project-specific "MC shortfall" instead of global shortfall
//
if (p->app_configs.project_has_mc) {
RSC_PROJECT_WORK_FETCH& rsc_pwf = p->rsc_pwf[rsc_type];
shortfall = rsc_pwf.mc_shortfall;
}
if (shortfall) {
if (wacky_dcf(p)) {
// if project's DCF is too big or small,

View File

@@ -93,6 +93,7 @@ struct RSC_PROJECT_WORK_FETCH {
int n_runnable_jobs;
double sim_nused;
// # of instances used at this point in the simulation
// Used for GPU exclusion logic
double nused_total; // sum of instances over all runnable jobs
int ncoprocs_excluded;
// number of excluded instances
@@ -114,6 +115,13 @@ struct RSC_PROJECT_WORK_FETCH {
// If zero, it's OK to ask this project for this type of work.
// If nonzero, the reason why it's not OK
// stuff for max concurrent logic
//
double max_nused;
// max # instances used so far in simulation.
double mc_shortfall;
// project's shortfall for this resource, given MC limits
RSC_PROJECT_WORK_FETCH() {
backoff_time = 0;
backoff_interval = 0;
@@ -218,6 +226,7 @@ struct RSC_WORK_FETCH {
double nidle_now;
// # idle instances now (at the beginning of RR sim)
double sim_nused;
// # instances used at this point in RR sim
COPROC_INSTANCE_BITMAP sim_used_instances;
// bitmap of instances used in simulation,
// taking into account GPU exclusions

View File

@@ -729,7 +729,9 @@ int APP_CONFIG::parse(XML_PARSER& xp, MSG_VEC& mv, LOG_FLAGS& log_flags) {
if (xp.match_tag("/app")) return 0;
if (xp.parse_str("name", name, 256)) continue;
if (xp.parse_int("max_concurrent", max_concurrent)) {
if (max_concurrent) have_max_concurrent = true;
if (max_concurrent) {
have_max_concurrent = true;
}
continue;
}
if (xp.match_tag("gpu_versions")) {
@@ -800,6 +802,9 @@ int APP_CONFIGS::parse(XML_PARSER& xp, MSG_VEC& mv, LOG_FLAGS& log_flags) {
int retval = ac.parse(xp, mv, log_flags);
if (retval) return retval;
app_configs.push_back(ac);
if (ac.max_concurrent) {
project_has_mc = true;
}
continue;
}
if (xp.match_tag("app_version")) {
@@ -812,6 +817,7 @@ int APP_CONFIGS::parse(XML_PARSER& xp, MSG_VEC& mv, LOG_FLAGS& log_flags) {
if (xp.parse_int("project_max_concurrent", n)) {
if (n >= 0) {
have_max_concurrent = true;
project_has_mc = true;
project_max_concurrent = n;
}
continue;
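
For reference, the tags parsed above come from a project's app_config.xml. A minimal example that would set both flags (the app name is a placeholder):

<app_config>
    <app>
        <name>example_app</name>
        <max_concurrent>2</max_concurrent>               <!-- sets have_max_concurrent and project_has_mc -->
    </app>
    <project_max_concurrent>4</project_max_concurrent>  <!-- also sets project_has_mc -->
</app_config>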

View File

@@ -240,6 +240,8 @@ struct APP_CONFIGS {
std::vector<APP_CONFIG> app_configs;
std::vector<APP_VERSION_CONFIG> app_version_configs;
int project_max_concurrent;
bool project_has_mc;
// this project has an app- or project-level max concurrent restriction
bool report_results_immediately;
int parse(XML_PARSER&, MSG_VEC&, LOG_FLAGS&);
@@ -250,6 +252,7 @@ struct APP_CONFIGS {
app_configs.clear();
app_version_configs.clear();
project_max_concurrent = 0;
project_has_mc = false;
report_results_immediately = false;
}
};