- client emulator: parse <max_concurrent> in <app> in client_state.xml.
    This gives you a way to simulate the effects of app_config.xml.
- client: piggyback requests for resources even if we're backed off from them
- client: change resource backoff logic.
    Old: if we requested work and didn't get any,
        back off from the resources for which we requested work.
    New: for each resource type T:
        if we requested work for T and didn't get any, back off from T.
        Also, don't back off if we're already backed off
            (i.e. if this is a piggyback request).
        Also, only back off if the RPC was due to an automatic
            and potentially rapid source
            (namely: work fetch, result report, trickle up).
        A sketch of this decision appears after this list.
- client: fix small work fetch bug
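
A self-contained sketch of the new per-resource backoff decision, condensed from the WORK_FETCH::handle_reply() changes in the diff below. The RpcReason/RscState/should_back_off names are illustrative stand-ins, not the client's real types:

    // Sketch: when should a resource type be backed off after a scheduler reply?
    enum RpcReason {
        REASON_USER_REQ,        // user- or project-requested RPC: never back off
        REASON_RESULTS_DUE,     // the automatic, potentially rapid reasons:
        REASON_NEED_WORK,       //   result report, work fetch,
        REASON_TRICKLE_UP       //   and trickle-up messages
    };

    struct RscState {
        bool requested_work;    // we asked for jobs of this type in the RPC
        bool got_work;          // the reply contained at least one such job
        bool backed_off;        // already backed off (i.e. a piggyback request)
    };

    bool should_back_off(const RscState& r, RpcReason reason) {
        if (!r.requested_work || r.got_work) return false;
        if (r.backed_off) return false;     // don't penalize piggyback requests
        switch (reason) {
        case REASON_RESULTS_DUE:
        case REASON_NEED_WORK:
        case REASON_TRICKLE_UP:
            return true;                    // automatic and potentially rapid
        default:
            return false;
        }
    }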
David Anderson 2013-04-04 10:25:56 -07:00
parent 0eb9551084
commit 330a25893f
5 changed files with 60 additions and 38 deletions

View File

@@ -143,6 +143,10 @@ int APP::parse(XML_PARSER& xp) {
             fpops.parse(xp, "/fpops");
             continue;
         }
+        if (xp.parse_int("max_concurrent", max_concurrent)) {
+            if (max_concurrent) have_max_concurrent = true;
+            continue;
+        }
         if (xp.match_tag("checkpoint_period")) {
             checkpoint_period.parse(xp, "/checkpoint_period");
             continue;
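
For reference, this is the element the parser now accepts: a hypothetical fragment of an <app> element in client_state.xml (the app name and the rest of the element's contents are illustrative):

    <app>
        <name>uppercase</name>
        <!-- run at most 2 jobs of this app at once; 0 or absent = no limit -->
        <max_concurrent>2</max_concurrent>
    </app>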

View File

@@ -247,6 +247,7 @@ struct APP {
     int max_concurrent;
         // Limit on # of concurrent jobs of this app; 0 if none
        // Specified in app_config.xml
+        // Can also specify in client_state.xml (for client emulator)
     int n_concurrent;
         // temp during job scheduling, to enforce max_concurrent
     int non_excluded_instances[MAX_RSC];

View File

@@ -1168,12 +1168,17 @@ void show_app(APP* app) {
     fprintf(summary_file,
         " app %s\n"
         " job params: fpops_est %.0fG fpops mean %.0fG std_dev %.0fG\n"
-        " latency %.2f weight %.2f\n",
+        " latency %.2f weight %.2f",
         app->name, app->fpops_est/1e9,
         app->fpops.mean/1e9, app->fpops.std_dev/1e9,
         app->latency_bound,
         app->weight
     );
+    if (app->max_concurrent) {
+        fprintf(summary_file, " max_concurrent %d\n", app->max_concurrent);
+    } else {
+        fprintf(summary_file, "\n");
+    }
     for (unsigned int i=0; i<gstate.app_versions.size(); i++) {
         APP_VERSION* avp = gstate.app_versions[i];
         if (avp->app != app) continue;

View File

@@ -41,7 +41,7 @@ using std::vector;
 RSC_WORK_FETCH rsc_work_fetch[MAX_RSC];
 WORK_FETCH work_fetch;

-inline bool dont_fetch(PROJECT* p, int rsc_type) {
+static inline bool dont_fetch(PROJECT* p, int rsc_type) {
     if (p->no_rsc_pref[rsc_type]) return true;
     if (p->no_rsc_config[rsc_type]) return true;
     if (p->no_rsc_apps[rsc_type]) return true;
@@ -187,7 +187,6 @@ static bool wacky_dcf(PROJECT* p) {
 // don't request anything if project is backed off.
 //
 void RSC_WORK_FETCH::set_request(PROJECT* p) {
-    if (dont_fetch(p, rsc_type)) return;

     // if backup project, fetch 1 job per idle instance
     //
@@ -202,8 +201,6 @@ void RSC_WORK_FETCH::set_request(PROJECT* p) {
         return;
     }
     RSC_PROJECT_WORK_FETCH& w = project_state(p);
-    if (!w.may_have_work) return;
-    if (w.anon_skip) return;
     double non_excl_inst = ninstances - w.ncoprocs_excluded;
     if (shortfall) {
         if (wacky_dcf(p)) {
@@ -476,7 +473,6 @@ void WORK_FETCH::piggyback_work_request(PROJECT* p) {

     bool check_higher_priority_projects = true;
     if (p->sched_rpc_pending && config.fetch_on_update) {
         check_higher_priority_projects = false;
-        return;
     }
     setup();
@@ -508,7 +504,7 @@ void WORK_FETCH::piggyback_work_request(PROJECT* p) {
                 DEBUG(msg_printf(p, MSG_INFO, "piggyback: %s can't fetch work", p2->project_name);)
                 continue;
             }
-            if (rwf.can_fetch(p2)) {
+            if (rwf.can_fetch(p2) && !rwf.backed_off(p2)) {
                 DEBUG(msg_printf(p, MSG_INFO, "piggyback: better proj %s", p2->project_name);)
                 break;
             }
@@ -544,6 +540,16 @@ static bool higher_priority(PROJECT *p1, PROJECT *p2) {
     return (p1->sched_priority > p2->sched_priority);
 }

+// check resource-level backoff
+//
+bool RSC_WORK_FETCH::backed_off(PROJECT* p) {
+    if (project_state(p).backoff_time > gstate.now) {
+        DEBUG(msg_printf(p, MSG_INFO, "skip: backoff");)
+        return true;
+    }
+    return false;
+}
+
 // a variety of checks for whether we should ask this project
 // for work of this type
 //
@@ -556,12 +562,7 @@ bool RSC_WORK_FETCH::can_fetch(PROJECT *p) {
         return false;
     }

-    // check resource-level backoff
-    //
-    if (project_state(p).backoff_time > gstate.now) {
-        DEBUG(msg_printf(p, MSG_INFO, "skip: backoff");)
-        return false;
-    }
+    RSC_PROJECT_WORK_FETCH& rpwf = project_state(p);

     // if project has zero resource share,
     // only fetch work if a device is idle
@@ -585,16 +586,15 @@ bool RSC_WORK_FETCH::can_fetch(PROJECT *p) {
     // TODO: THIS IS FAIRLY CRUDE. Making it smarter would require
     // computing shortfall etc. on a per-project basis
     //
-    int nexcl = p->rsc_pwf[rsc_type].ncoprocs_excluded;
+    int nexcl = rpwf.ncoprocs_excluded;
     if (rsc_type && nexcl) {
         int n_not_excluded = ninstances - nexcl;
-        if (p->rsc_pwf[rsc_type].queue_est > (gstate.work_buf_min() * n_not_excluded)/ninstances) {
+        if (rpwf.queue_est > (gstate.work_buf_min() * n_not_excluded)/ninstances) {
             DEBUG(msg_printf(p, MSG_INFO, "skip: too much work");)
             return false;
         }
     }
-    RSC_PROJECT_WORK_FETCH& rpwf = project_state(p);
     if (rpwf.anon_skip) {
         DEBUG(msg_printf(p, MSG_INFO, "skip: anon");)
         return false;
@@ -690,7 +690,7 @@ PROJECT* WORK_FETCH::choose_project() {
         for (int i=0; i<coprocs.n_rsc; i++) {
             if (i && !gpus_usable) continue;
             RSC_WORK_FETCH& rwf = rsc_work_fetch[i];
-            if (rwf.can_fetch(p)) {
+            if (rwf.can_fetch(p) && !rwf.backed_off(p)) {
                 if (!rwf.found_project) {
                     rwf.found_project = p;
                 }
@@ -719,17 +719,21 @@ PROJECT* WORK_FETCH::choose_project() {
             if (i && !gpus_usable) continue;
             RSC_WORK_FETCH& rwf = rsc_work_fetch[i];
             bool buffer_low;
+            DEBUG(msg_printf(p, MSG_INFO, "checking %s", rsc_name(i));)
             if (i == rsc_index) {
                 buffer_low = (rwf.saturated_time < gstate.work_buf_min());
             } else {
                 if (rwf.found_project && rwf.found_project != p) {
+                    DEBUG(msg_printf(p, MSG_INFO, "%s not high prio proj", rsc_name(i));)
                     continue;
                 }
                 buffer_low = (rwf.saturated_time < gstate.work_buf_total());
                 if (!buffer_low && !rwf.uses_starved_excluded_instances(p)) {
+                    DEBUG(msg_printf(p, MSG_INFO, "%s don't need", rsc_name(i));)
                     continue;
                 }
-                if (!rsc_work_fetch[i].can_fetch(p)) {
+                if (!rwf.can_fetch(p)) {
+                    DEBUG(msg_printf(p, MSG_INFO, "%s can't fetch", rsc_name(i));)
                     continue;
                 }
             }
@@ -848,34 +852,41 @@ void WORK_FETCH::write_request(FILE* f, PROJECT* p) {
 void WORK_FETCH::handle_reply(
     PROJECT* p, SCHEDULER_REPLY*, vector<RESULT*> new_results
 ) {
-    bool got_rsc[MAX_RSC];
+    bool got_work[MAX_RSC];
+    bool requested_work[MAX_RSC];
     for (int i=0; i<coprocs.n_rsc; i++) {
-        got_rsc[i] = false;
+        got_work[i] = false;
+        requested_work[i] = (rsc_work_fetch[i].req_secs > 0);
     }
+    for (unsigned int i=0; i<new_results.size(); i++) {
+        RESULT* rp = new_results[i];
+        got_work[rp->avp->gpu_usage.rsc_type] = true;
+    }
-    // if didn't get any jobs, back off on requested resource types
-    //
-    if (!new_results.size()) {
-        // but not if RPC was requested by project
+    for (int i=0; i<coprocs.n_rsc; i++) {
+        // back off on a resource type if
+        // - we asked for jobs
+        // - we didn't get any
+        // - we're not currently backed off for that type
+        //   (i.e. don't back off because of a piggyback request)
+        // - the RPC was done for a reason that is automatic
+        //   and potentially frequent
+        //
-        if (p->sched_rpc_pending != RPC_REASON_PROJECT_REQ) {
-            for (int i=0; i<coprocs.n_rsc; i++) {
-                if (rsc_work_fetch[i].req_secs) {
+        if (requested_work[i] && !got_work[i]) {
+            if (p->rsc_pwf[i].backoff_time < gstate.now) {
+                switch (p->sched_rpc_pending) {
+                case RPC_REASON_RESULTS_DUE:
+                case RPC_REASON_NEED_WORK:
+                case RPC_REASON_TRICKLE_UP:
                     p->rsc_pwf[i].resource_backoff(p, rsc_name(i));
                 }
             }
         }
-        return;
-    }
-    // if we did get jobs, clear backoff on resource types
-    //
-    for (unsigned int i=0; i<new_results.size(); i++) {
-        RESULT* rp = new_results[i];
-        got_rsc[rp->avp->gpu_usage.rsc_type] = true;
-    }
-    for (int i=0; i<coprocs.n_rsc; i++) {
-        if (got_rsc[i]) p->rsc_pwf[i].clear_backoff();
+        // if we did get jobs, clear backoff
+        //
+        if (got_work[i]) {
+            p->rsc_pwf[i].clear_backoff();
+        }
     }
 }
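
To summarize the refactor in this file: the resource-level backoff test moved out of can_fetch() into the separate backed_off() method, so each caller decides whether backoff should block a request. A simplified model of the two call sites, using stand-in types rather than the real RSC_WORK_FETCH/PROJECT API:

    #include <vector>

    struct Project { double backoff_time; };

    struct RscFetch {
        double now;
        // eligibility checks other than backoff (prefs, queue size, etc.)
        bool can_fetch(const Project&) const { return true; }
        bool backed_off(const Project& p) const { return p.backoff_time > now; }
    };

    // choose_project(): a regular work request still respects backoff.
    bool eligible_regular(const RscFetch& rwf, const Project& p) {
        return rwf.can_fetch(p) && !rwf.backed_off(p);
    }

    // piggyback_work_request(): we're contacting the project anyway, so ask
    // even if backed off; backoff only disqualifies competing projects.
    bool eligible_piggyback(
        const RscFetch& rwf, const Project& p,
        const std::vector<Project*>& higher_priority_projects
    ) {
        for (const Project* p2: higher_priority_projects) {
            // a better project could fetch this resource; don't piggyback
            if (rwf.can_fetch(*p2) && !rwf.backed_off(*p2)) return false;
        }
        return rwf.can_fetch(p);    // note: no backed_off(p) test here
    }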

View File

@@ -253,6 +253,7 @@ struct RSC_WORK_FETCH {
     void set_request_excluded(PROJECT*);
     bool may_have_work(PROJECT*);
     bool can_fetch(PROJECT*);
+    bool backed_off(PROJECT*);
     bool uses_starved_excluded_instances(PROJECT*);
     RSC_WORK_FETCH() {
         rsc_type = 0;