From 83196d447f4640ecf213695607a513ce80571864 Mon Sep 17 00:00:00 2001 From: David Anderson Date: Mon, 15 Jul 2002 05:34:32 +0000 Subject: [PATCH] Added scheduler RPC retry policy svn path=/trunk/boinc/; revision=192 --- checkin_notes | 49 +++++++ client/client_state.C | 10 +- client/client_state.h | 30 +++-- client/client_types.C | 38 ++++-- client/client_types.h | 15 ++- client/cs_scheduler.C | 202 ++++++++++++++++++++++++----- client/main.C | 17 ++- client/scheduler_op.C | 262 +++++++++++++++++++++++++++++++++----- client/scheduler_op.h | 38 ++++-- db/db.h | 1 + db/db_mysql.C | 5 +- db/schema.sql | 1 + doc/file_xfer_policy.html | 3 + doc/index.html | 3 +- doc/result.html | 10 +- doc/rpc_policy.html | 121 ++++++++++++++++++ html/user/db.inc | 1 + tools/backend_lib.C | 2 + tools/create_work.C | 8 -- 19 files changed, 695 insertions(+), 121 deletions(-) create mode 100644 doc/file_xfer_policy.html create mode 100644 doc/rpc_policy.html diff --git a/checkin_notes b/checkin_notes index 29e387f61a..9d56edbe90 100755 --- a/checkin_notes +++ b/checkin_notes @@ -1080,3 +1080,52 @@ Michael Gary July 12, 2002 test/ prefs3.xml test_rsc.php + +David A July 14 2002 + - Added reasonable policies for making scheduler RPCs. Summary: + - results now have a "report deadline". + This may be useful for various purposes; for now, + we use it to trigger scheduler RPCs to report results + - The client now tries all a project's scheduler URLs in turn. + - Projects have a "min_rpc_time", the earliest time to + attempt another RPC (replaces next_request_time) + - We maintain an RPC failure count for each project. + If RPCs to all its URLs fail, we increment the count + and set its min_rpc_time according to an exponential backoff + - If a project is getting repeated RPC failures, + reread and parse its master URL file every so often, + in case the set of scheduler URLs has changed + - When the client has a completed result past its deadline, + it attempts to report it to that project. 
+ - When the client's estimated work falls below low water, + it ranks projects according to their "resource deficit", + then attempts RPCs to project in that order + until the estimated work is above high water. + NOTE: only the simplest case has been tested. + We need to write test scripts for many other cases. + NOTE: currently a result's report deadline is now+1000. + We need to add a mechanism for setting it. + - The CLIENT_STATE is now kept in a global variable instead of + a local var of main(). + This is because SCHEDULER_OP needs to get at it. + + client/ + client_state.C,h + client_types.C,h + cs_scheduler.C + main.C + scheduler_op.C,h + db/ + db.h + db_mysql.h + schema.sql + doc/ + file_xfer_policy.html (new) + index.html + result.html + rpc_policy.html (new) + html_user/ + db.inc + tools/ + backend_lib.C + create_work.C diff --git a/client/client_state.C b/client/client_state.C index 537033c95e..de42273798 100644 --- a/client/client_state.C +++ b/client/client_state.C @@ -138,7 +138,7 @@ bool CLIENT_STATE::do_something() { action |= file_xfers->poll(); action |= active_tasks.poll(); action |= active_tasks.poll_time(); - action |= get_work(); + action |= scheduler_rpc_poll(); action |= garbage_collect(); action |= start_apps(); action |= handle_running_apps(); @@ -173,14 +173,6 @@ int CLIENT_STATE::parse_state_file() { p2 = lookup_project(project->master_url); if (p2) { p2->copy_state_fields(*project); - p2->scheduler_urls = project->scheduler_urls; - p2->project_name = project->project_name; - p2->user_name = project->user_name; - p2->rpc_seqno = project->rpc_seqno; - p2->hostid = project->hostid; - p2->next_request_time = project->next_request_time; - p2->exp_avg_cpu = project->exp_avg_cpu; - p2->exp_avg_mod_time = project->exp_avg_mod_time; } else { fprintf(stderr, "Project %s found in state file but not prefs.\n", diff --git a/client/client_state.h b/client/client_state.h index bb3e5a645a..8e50cbe631 100644 --- a/client/client_state.h +++ 
b/client/client_state.h @@ -17,6 +17,9 @@ // Contributor(s): // +#ifndef _CLIENT_STATE_ +#define _CLIENT_STATE_ + #include #include "app.h" @@ -55,7 +58,6 @@ private: NET_XFER_SET* net_xfers; HTTP_OP_SET* http_ops; FILE_XFER_SET* file_xfers; - SCHEDULER_OP* scheduler_op; ACTIVE_TASK_SET active_tasks; HOST_INFO host_info; PREFS* prefs; @@ -65,7 +67,6 @@ private: bool client_state_dirty; bool exit_when_idle; bool run_time_test; - bool contacted_sched_server; bool activities_suspended; int exit_after; @@ -79,12 +80,8 @@ private: int link_workunit(PROJECT*, WORKUNIT*); int link_result(PROJECT*, RESULT*); int check_suspend_activities(); - bool need_work(); - void update_avg_cpu(PROJECT*); - PROJECT* choose_project(); int make_project_dirs(); int make_slot_dirs(); - bool get_work(); bool input_files_available(RESULT*); int app_finished(ACTIVE_TASK&); bool start_apps(); @@ -92,8 +89,23 @@ private: bool start_file_xfers(); void print_counts(); bool garbage_collect(); - int make_scheduler_request(PROJECT*, int); - void handle_scheduler_reply(SCHEDULER_OP&); + + // stuff related to scheduler RPCs + // + SCHEDULER_OP* scheduler_op; + bool contacted_sched_server; + void compute_resource_debts(); +public: + PROJECT* next_project(PROJECT*); + PROJECT* next_project_master_pending(); + double work_needed_secs(); + int make_scheduler_request(PROJECT*, double); + void handle_scheduler_reply(PROJECT*, char* scheduler_url); +private: + PROJECT* find_project_with_overdue_results(); + bool some_project_rpc_ok(); + bool scheduler_rpc_poll(); + void update_avg_cpu(PROJECT*); double estimate_duration(WORKUNIT*); double current_water_days(); @@ -109,3 +121,5 @@ public: }; extern CLIENT_STATE gstate; + +#endif diff --git a/client/client_types.C b/client/client_types.C index 16afb4a10c..207f7907b1 100644 --- a/client/client_types.C +++ b/client/client_types.C @@ -27,12 +27,13 @@ #include "client_types.h" PROJECT::PROJECT() { + project_specific_prefs = 0; + code_sign_key = 0; } 
PROJECT::~PROJECT() { - if (project_specific_prefs) { - free(project_specific_prefs); - } + if (project_specific_prefs) free(project_specific_prefs); + if (code_sign_key) free(code_sign_key); } // parse project fields from prefs.xml @@ -73,10 +74,11 @@ int PROJECT::parse_state(FILE* in) { } strcpy(project_name, ""); strcpy(user_name, ""); - next_request_time = 0; resource_share = 1; exp_avg_cpu = 0; exp_avg_mod_time = 0; + min_rpc_time = 0; + nrpc_failures = 0; while (fgets(buf, 256, in)) { if (match_tag(buf, "")) return 0; else if (parse_str(buf, "", string.text)) { @@ -88,13 +90,14 @@ int PROJECT::parse_state(FILE* in) { else if (parse_str(buf, "", user_name)) continue; else if (parse_int(buf, "", rpc_seqno)) continue; else if (parse_int(buf, "", hostid)) continue; - else if (parse_int(buf, "", next_request_time)) continue; else if (parse_double(buf, "", exp_avg_cpu)) continue; else if (parse_int(buf, "", exp_avg_mod_time)) continue; else if (match_tag(buf, "")) { dup_element_contents(in, "", &code_sign_key); //fprintf(stderr, "code_sign_key: %s\n", code_sign_key); } + else if (parse_int(buf, "", nrpc_failures)) continue; + else if (parse_int(buf, "", min_rpc_time)) continue; else fprintf(stderr, "PROJECT::parse_state(): unrecognized: %s\n", buf); } return ERR_XML_PARSE; @@ -121,17 +124,19 @@ int PROJECT::write_state(FILE* out) { " %s\n" " %d\n" " %d\n" - " %d\n" " %f\n" - " %d\n", + " %d\n" + " %d\n" + " %d\n", master_url, project_name, user_name, rpc_seqno, hostid, - next_request_time, exp_avg_cpu, - exp_avg_mod_time + exp_avg_mod_time, + nrpc_failures, + min_rpc_time ); if (code_sign_key) { fprintf(out, @@ -144,15 +149,19 @@ int PROJECT::write_state(FILE* out) { return 0; } +// copy fields from "p" into "this" that are stored in client_state.xml +// void PROJECT::copy_state_fields(PROJECT& p) { scheduler_urls = p.scheduler_urls; project_name = p.project_name; user_name = p.user_name; rpc_seqno = p.rpc_seqno; hostid = p.hostid; - next_request_time = 
p.next_request_time; exp_avg_cpu = p.exp_avg_cpu; exp_avg_mod_time = p.exp_avg_mod_time; + code_sign_key = strdup(p.code_sign_key); + nrpc_failures = p.nrpc_failures; + min_rpc_time = p.min_rpc_time; } void PROJECT::copy_prefs_fields(PROJECT& p) { @@ -481,6 +490,7 @@ int RESULT::parse_ack(FILE* in) { void RESULT::clear() { strcpy(name, ""); strcpy(wu_name, ""); + report_deadline = 0; output_files.clear(); is_active = false; is_compute_done = false; @@ -506,6 +516,7 @@ int RESULT::parse_server(FILE* in) { if (match_tag(buf, "")) return 0; if (parse_str(buf, "", name)) continue; if (parse_str(buf, "", wu_name)) continue; + if (parse_int(buf, "", report_deadline)) continue; if (match_tag(buf, "")) { file_ref.parse(in); output_files.push_back(file_ref); @@ -529,6 +540,7 @@ int RESULT::parse_state(FILE* in) { if (match_tag(buf, "")) return 0; if (parse_str(buf, "", name)) continue; if (parse_str(buf, "", wu_name)) continue; + if (parse_int(buf, "", report_deadline)) continue; if (match_tag(buf, "")) { file_ref.parse(in); output_files.push_back(file_ref); @@ -579,8 +591,10 @@ int RESULT::write(FILE* out, bool to_server) { } if (!to_server) { fprintf(out, - " %s\n", - wu_name + " %s\n" + " %d\n", + wu_name, + report_deadline ); for (i=0; i output_files; bool is_active; // an app is currently running for this bool is_compute_done; // computation finished diff --git a/client/cs_scheduler.C b/client/cs_scheduler.C index de3e22b43c..9d462d4853 100644 --- a/client/cs_scheduler.C +++ b/client/cs_scheduler.C @@ -18,10 +18,8 @@ // // This file contains high-level logic for communicating with -// scheduling servers: -// - what project to ask for work -// - how much work to ask for -// - merging the result of a scheduler RPC into the client state +// scheduling servers, +// and for merging the result of a scheduler RPC into the client state // Note: code for actually doing a scheduler RPC is elsewhere, // namely scheduler_op.C @@ -45,40 +43,41 @@ #include "client_state.h" // 
quantities like avg CPU time decay by a factor of e every week +// #define EXP_DECAY_RATE (1./(3600*24*7)) #define SECONDS_IN_DAY 86400 -//estimates the number of days of work remaining +// estimate the days of work remaining // double CLIENT_STATE::current_water_days() { unsigned int i; + RESULT* rp; double seconds_remaining=0; + for (i=0; iis_compute_done) continue; - if (rp->cpu_time > 0) + if (rp->cpu_time > 0) { seconds_remaining += (rp->wup->seconds_to_complete - rp->cpu_time); - else + } else { seconds_remaining += rp->wup->seconds_to_complete; + } } return (seconds_remaining * SECONDS_IN_DAY); } -bool CLIENT_STATE::need_work() { - double temp; - if(prefs->high_water_days < prefs->low_water_days) { - temp = prefs->high_water_days; - prefs->high_water_days = prefs->low_water_days; - prefs->low_water_days = temp; - } - return (current_water_days() <= prefs->low_water_days); +// seconds of work needed to come up to high-water mark +// +double CLIENT_STATE::work_needed_secs() { + double x = current_water_days(); + if (x > prefs->high_water_days) return 0; + return (prefs->high_water_days - x)*86400; } +// update exponentially-averaged CPU times of all projects +// void CLIENT_STATE::update_avg_cpu(PROJECT* p) { - int now = time(0); - if(p==NULL) { - fprintf(stderr, "error: CLIENT_STATE.update_avg_cpu: unexpected NULL pointer p\n"); - } + time_t now = time(0); double deltat = now - p->exp_avg_mod_time; if (deltat > 0) { if (p->exp_avg_cpu != 0) { @@ -88,6 +87,45 @@ void CLIENT_STATE::update_avg_cpu(PROJECT* p) { } } +// find a project that needs its master file parsed +// +PROJECT* CLIENT_STATE::next_project_master_pending() { + unsigned int i; + PROJECT* p; + time_t now = time(0); + + for (i=0; imin_rpc_time > now ) continue; + if (p->master_url_fetch_pending) { + return p; + } + } + return 0; +} + +// return the next project after "old", in debt order, +// that is eligible for a scheduler RPC +// +PROJECT* CLIENT_STATE::next_project(PROJECT* old) { + PROJECT* 
p, *pbest; + int best = 999; + time_t now = time(0); + unsigned int i; + pbest = 0; + for (i=0; imin_rpc_time > now ) continue; + if (old && p->debt_order <= old->debt_order) continue; + if (p->debt_order < best) { + pbest = p; + best = p->debt_order; + } + } + return pbest; +} + +#if 0 // choose a project to ask for work // PROJECT* CLIENT_STATE::choose_project() { @@ -118,8 +156,41 @@ PROJECT* CLIENT_STATE::choose_project() { } return bestp; } +#endif -int CLIENT_STATE::make_scheduler_request(PROJECT* p, int work_req) { +void CLIENT_STATE::compute_resource_debts() { + unsigned int i, j; + PROJECT* p, *pbest; + double best; + + for (i=0; iexp_avg_cpu == 0) { + p->resource_debt = p->resource_share; + } else { + p->resource_debt = p->resource_share/p->exp_avg_cpu; + } + p->debt_order = -1; + } + + // put in decreasing order. Should use qsort or some stdlib thang + // + for (i=0; idebt_order >= 0) continue; + if (p->resource_debt > best) { + best = p->resource_debt; + pbest = p; + } + } + pbest->debt_order = i; + } +} + +int CLIENT_STATE::make_scheduler_request(PROJECT* p, double work_req) { FILE* f = fopen(SCHED_OP_REQUEST_FILE, "wb"); unsigned int i; RESULT* rp; @@ -139,7 +210,7 @@ int CLIENT_STATE::make_scheduler_request(PROJECT* p, int work_req) { " %d\n" " %s\n" " %d\n" - " %d\n", + " %f\n", p->authenticator, p->hostid, p->rpc_seqno, @@ -152,6 +223,7 @@ int CLIENT_STATE::make_scheduler_request(PROJECT* p, int work_req) { } FILE* fprefs = fopen(PREFS_FILE_NAME, "r"); + if (!fprefs) return ERR_FOPEN; copy_stream(fprefs, f); fclose(fprefs); @@ -171,6 +243,77 @@ int CLIENT_STATE::make_scheduler_request(PROJECT* p, int work_req) { return 0; } +// find a project with results that are overdue to report, +// and which we're allowed to contact. 
+// +PROJECT* CLIENT_STATE::find_project_with_overdue_results() { + unsigned int i; + RESULT* r; + time_t now = time(0); + + for (i=0; iis_compute_done && r->is_upload_done() && !r->is_server_ack) { + if (r->project->min_rpc_time < now) { + return r->project; + } + } + } + return 0; +} + +// return true if we're allowed to do a scheduler RPC to at least one project +// +bool CLIENT_STATE::some_project_rpc_ok() { + unsigned int i; + time_t now = time(0); + + for (i=0; imin_rpc_time < now) return true; + } + return false; +} + +// called from the client's polling loop. +// initiate scheduler RPC activity if needed and possible +// +bool CLIENT_STATE::scheduler_rpc_poll() { + double work_secs; + PROJECT* p; + bool action, below_low_water; + + switch(scheduler_op->state) { + case SCHEDULER_OP_STATE_IDLE: + below_low_water = (current_water_days() <= prefs->low_water_days); + if (below_low_water && some_project_rpc_ok()) { + compute_resource_debts(); + scheduler_op->init_get_work(); + action = true; + } else { + p = find_project_with_overdue_results(); + if (p) { + compute_resource_debts(); + if (p->debt_order == 0) { + work_secs = work_needed_secs(); + } else { + work_secs = 0; + } + scheduler_op->init_return_results(p, work_secs); + action = true; + } + } + break; + default: + scheduler_op->poll(); + if (scheduler_op->state == SCHEDULER_OP_STATE_IDLE) { + action = true; + } + break; + } + return action; +} + +#if 0 // manage the task of maintaining an adequate supply of work. // bool CLIENT_STATE::get_work() { @@ -226,6 +369,7 @@ bool CLIENT_STATE::get_work() { } return action; } +#endif // see whether a new preferences set, obtained from // the given project, looks "reasonable". 
@@ -236,21 +380,19 @@ bool PREFS::looks_reasonable(PROJECT& project) { return false; } -void CLIENT_STATE::handle_scheduler_reply(SCHEDULER_OP& sched_op) { +void CLIENT_STATE::handle_scheduler_reply( + PROJECT* project, char* scheduler_url +) { SCHEDULER_REPLY sr; FILE* f; int retval; unsigned int i; char prefs_backup[256]; - PROJECT *project, *pp, *sp; + PROJECT *pp, *sp; PREFS* new_prefs; bool signature_valid; - project = sched_op.project; contacted_sched_server = true; - if (log_flags.sched_ops) { - printf("Got reply from scheduler %s\n", sched_op.scheduler_url); - } if (log_flags.sched_op_debug) { f = fopen(SCHED_OP_RESULT_FILE, "r"); printf("------------- SCHEDULER REPLY ----------\n"); @@ -267,7 +409,7 @@ void CLIENT_STATE::handle_scheduler_reply(SCHEDULER_OP& sched_op) { } if (sr.request_delay) { - project->next_request_time = time(0) + sr.request_delay; + project->min_rpc_time = time(0) + sr.request_delay; } if (sr.hostid) { project->hostid = sr.hostid; @@ -298,7 +440,7 @@ void CLIENT_STATE::handle_scheduler_reply(SCHEDULER_OP& sched_op) { " %s\n", sr.prefs_mod_time, project->master_url, - sched_op.scheduler_url + scheduler_url ); fputs(sr.prefs_xml, f); fprintf(f, diff --git a/client/main.C b/client/main.C index 3599cf5990..867686c49c 100644 --- a/client/main.C +++ b/client/main.C @@ -69,7 +69,6 @@ int initialize_prefs() { } int main(int argc, char** argv) { - CLIENT_STATE cs; PREFS* prefs; FILE* f; int retval; @@ -97,23 +96,23 @@ int main(int argc, char** argv) { } } - cs.init(prefs); - cs.parse_cmdline(argc, argv); - if(cs.run_time_tests()) { - cs.time_tests(); + gstate.init(prefs); + gstate.parse_cmdline(argc, argv); + if(gstate.run_time_tests()) { + gstate.time_tests(); } - cs.restart_tasks(); + gstate.restart_tasks(); while (1) { - if (!cs.do_something()) { + if (!gstate.do_something()) { if (log_flags.time_debug) printf("SLEEP 1 SECOND\n"); fflush(stdout); boinc_sleep(1); } - if (cs.time_to_exit()) { + if (gstate.time_to_exit()) { printf("time to 
exit\n"); break; } } - cs.exit_tasks(); + gstate.exit_tasks(); return 0; } diff --git a/client/scheduler_op.C b/client/scheduler_op.C index 84cc5db25d..a37d39d910 100644 --- a/client/scheduler_op.C +++ b/client/scheduler_op.C @@ -19,6 +19,7 @@ #include +#include "client_state.h" #include "client_types.h" #include "error_numbers.h" #include "file_names.h" @@ -35,12 +36,82 @@ SCHEDULER_OP::SCHEDULER_OP(HTTP_OP_SET* h) { http_ops = h; } +// try to get enough work to bring us up to high-water mark +// +int SCHEDULER_OP::init_get_work() { + double ns = gstate.work_needed_secs(); + + must_get_work = true; + project = gstate.next_project(0); + if (project) { + init_op_project(ns); + } + return 0; +} + +// report results for a particular project. +// also get work from that project if below high-water mark +// +int SCHEDULER_OP::init_return_results(PROJECT* p, double ns) { + must_get_work = false; + project = p; + return init_op_project(ns); +} + +// try to initiate an RPC to the current project. +// If there are multiple schedulers, start with the first one +// +int SCHEDULER_OP::init_op_project(double ns) { + int retval; + + if (log_flags.sched_op_debug) { + printf("init_op_project: starting op for %s\n", project->master_url); + } + + // if project has no schedulers, skip everything else + // and just get its master file. 
+ // + if (project->scheduler_urls.size() == 0) { + init_master_fetch(project); + return 0; + } + url_index = 0; + retval = gstate.make_scheduler_request(project, ns); + if (retval) { + fprintf(stderr, "make_scheduler_request: %d\n", retval); + return retval; + } + return start_rpc(); +} + +// Set a project's min RPC time to something in the future, +// based on exponential backoff +// TODO: integrate with other backoff sources +// +int SCHEDULER_OP::set_min_rpc_time(PROJECT* p) { + int x = RETRY_BASE_PERIOD; + int i; + + int n = p->nrpc_failures; + if (n > RETRY_CAP) n = RETRY_CAP; + for (i=0; imin_rpc_time = time(0) + x; + if (log_flags.sched_op_debug) { + printf( + "setting min RPC time for %s to %d seconds from now\n", + p->master_url, x + ); + } + return 0; +} + +// low-level routine to initiate an RPC +// int SCHEDULER_OP::start_rpc() { FILE *f; + int retval; - // TODO: try scheduler URLs other than the first one - // - strcpy(scheduler_url, project->scheduler_urls[0].text); + strcpy(scheduler_url, project->scheduler_urls[url_index].text); if (log_flags.sched_ops) { printf("Sending request to scheduler: %s\n", scheduler_url); } @@ -51,35 +122,42 @@ int SCHEDULER_OP::start_rpc() { printf("--------- END ---------\n"); fclose(f); } - http_op.init_post( + retval = http_op.init_post( scheduler_url, SCHED_OP_REQUEST_FILE, SCHED_OP_RESULT_FILE ); - http_ops->insert(&http_op); + if (retval) return retval; + retval = http_ops->insert(&http_op); + if (retval) return retval; project->rpc_seqno++; state = SCHEDULER_OP_STATE_RPC; return 0; } -int SCHEDULER_OP::start_op(PROJECT* p) { - if(p==NULL) { - fprintf(stderr, "error: SCHEDULER_OP.start_op: unexpected NULL pointer p\n"); +// initiate a fetch of a project's master URL file +// +int SCHEDULER_OP::init_master_fetch(PROJECT* p) { + int retval; + + if (p==NULL) { + fprintf(stderr, "error: SCHEDULER_OP.init_master_fetch: NULL pointer p\n"); return ERR_NULL; } project = p; - if (project->scheduler_urls.size() == 0) { - 
http_op.init_get(project->master_url, MASTER_FILE_NAME); - http_ops->insert(&http_op); - state = SCHEDULER_OP_STATE_GET_MASTER; - } else { - start_rpc(); + if (log_flags.sched_op_debug) { + printf("Fetching master file for %s\n", project->master_url); } + retval = http_op.init_get(project->master_url, MASTER_FILE_NAME); + if (retval) return retval; + retval = http_ops->insert(&http_op); + if (retval) return retval; + state = SCHEDULER_OP_STATE_GET_MASTER; return 0; } // parse a master file. // -int SCHEDULER_OP::parse_master_file() { +int SCHEDULER_OP::parse_master_file(vector &urls) { char buf[256]; STRING256 str; FILE* f; @@ -92,45 +170,171 @@ int SCHEDULER_OP::parse_master_file() { project->scheduler_urls.clear(); while (fgets(buf, 256, f)) { if (parse_str(buf, "", str.text)) { - project->scheduler_urls.push_back(str); + urls.push_back(str); } } + if (log_flags.sched_op_debug) { + printf("Parsed master file; got %d scheduler URLs\n", urls.size()); + } return 0; } +// A master file has just been read. +// transfer scheduler urls to project. 
+// Return true if any of them is new +// +bool SCHEDULER_OP::update_urls(PROJECT& project, vector &urls) { + unsigned int i, j; + bool found, any_new; + + any_new = false; + for (i=0; i urls; + bool changed, scheduler_op_done; switch(state) { case SCHEDULER_OP_STATE_GET_MASTER: + // here we're fetching the master file for a project + // if (http_op.http_op_state == HTTP_STATE_DONE) { + project->master_url_fetch_pending = false; http_ops->remove(&http_op); - if (http_op.http_op_retval) { - state = SCHEDULER_OP_STATE_DONE; - scheduler_op_retval = http_op.http_op_retval; - } else { + if (http_op.http_op_retval == 0) { if (log_flags.sched_op_debug) { printf( "Got master file from %s; parsing\n", project->master_url ); } - retval = parse_master_file(); - if (retval) { - state = SCHEDULER_OP_STATE_DONE; - scheduler_op_retval = retval; + retval = parse_master_file(urls); + if (retval == 0) { + changed = update_urls(*project, urls); + if (changed) { + project->min_rpc_time = 0; + project->nrpc_failures = 0; + } } else { - start_rpc(); + // master file parse failed. treat like RPC error + // + project->nrpc_failures++; + set_min_rpc_time(project); + if (log_flags.sched_op_debug) { + printf("Master file parse failed\n"); + } + } + } else { + // fetch of master file failed. 
Treat like RPC error + // + project->nrpc_failures++; + set_min_rpc_time(project); + if (log_flags.sched_op_debug) { + printf("Master file fetch failed\n"); + } + } + project = gstate.next_project_master_pending(); + if (project) { + init_master_fetch(project); + } else { + state = SCHEDULER_OP_STATE_IDLE; + if (log_flags.sched_op_debug) { + printf("Scheduler_op: return to idle state\n"); } } } break; case SCHEDULER_OP_STATE_RPC: + // here we're doing a scheduler RPC to some project + // + scheduler_op_done = false; if (http_op.http_op_state == HTTP_STATE_DONE) { - state = SCHEDULER_OP_STATE_DONE; - scheduler_op_retval = http_op.http_op_retval; http_ops->remove(&http_op); + if (http_op.http_op_retval) { + if (log_flags.sched_op_debug) { + printf( + "scheduler RPC to %s failed\n", + project->scheduler_urls[url_index].text + ); + } + url_index++; + if (url_index < project->scheduler_urls.size()) { + start_rpc(); + } else { + project->nrpc_failures++; + if ((project->nrpc_failures % MASTER_FETCH_PERIOD) == 0) { + project->master_url_fetch_pending = true; + } + set_min_rpc_time(project); + if (must_get_work) { + project = gstate.next_project(project); + if (project) { + init_op_project(gstate.work_needed_secs()); + } else { + scheduler_op_done = true; + } + } else { + scheduler_op_done = true; + } + } + } else { + if (log_flags.sched_op_debug) { + printf( + "scheduler RPC to %s succeeded\n", + project->scheduler_urls[url_index].text + ); + } + project->nrpc_failures = 0; + gstate.handle_scheduler_reply(project, scheduler_url); + if (must_get_work) { + double x = gstate.work_needed_secs(); + if (x > 0) { + project = gstate.next_project(project); + if (project) { + init_op_project(x); + } else { + scheduler_op_done = true; + } + } else { + scheduler_op_done = true; + } + } + } } + if (scheduler_op_done) { + project = gstate.next_project_master_pending(); + if (project) { + init_master_fetch(project); + } else { + state = SCHEDULER_OP_STATE_IDLE; + if 
(log_flags.sched_op_debug) { + printf("Scheduler_op: return to idle state\n"); + } + } + } + break; + default: + break; } return 0; } @@ -148,7 +352,7 @@ SCHEDULER_REPLY::~SCHEDULER_REPLY() { } int SCHEDULER_REPLY::parse(FILE* in) { - char buf[256]; + char buf[256], *p; int retval; if(in==NULL) { fprintf(stderr, "error: SCHEDULER_REPLY.parse: unexpected NULL pointer in\n"); @@ -163,7 +367,7 @@ int SCHEDULER_REPLY::parse(FILE* in) { code_sign_key = 0; code_sign_key_signature = 0; - fgets(buf, 256, in); + p = fgets(buf, 256, in); if (!match_tag(buf, "")) { fprintf(stderr, "SCHEDULER_REPLY::parse(): bad first tag %s\n", buf); return ERR_XML_PARSE; diff --git a/client/scheduler_op.h b/client/scheduler_op.h index 2abe7202dc..5594d29847 100644 --- a/client/scheduler_op.h +++ b/client/scheduler_op.h @@ -20,36 +20,54 @@ #ifndef _SCHEDULER_OP_ #define _SCHEDULER_OP_ -// Logic for communicating with a scheduling server. -// If we don't yet have the addresses a scheduling server, -// we have to get the file from the project's master URL -// and parse if for elements - -// TODO: try alternate scheduling servers; -// implement backoff and give-up policies +// SCHEDULER_OP encapsulates the policy and mechanism +// for communicating with scheduling servers. +// It is implemented as a finite-state machine. 
+// It is active in one of two modes: +// get_work: the client wants to get work, and possibly to +// return results as a side-effect +// return_results: the client wants to return results, and possibly +// to get work as a side-effect +// #include "client_types.h" #include "http.h" #include "prefs.h" +// constants related to scheduler RPC policy + +#define MASTER_FETCH_PERIOD 10 + // fetch and parse master URL if nrpc_failures is a multiple of this +#define RETRY_BASE_PERIOD 100 + // after failure, back off 2^nrpc_failures times this times random +#define RETRY_CAP 10 + // cap on nrpc_failures in the above formula + #define SCHEDULER_OP_STATE_IDLE 0 #define SCHEDULER_OP_STATE_GET_MASTER 1 #define SCHEDULER_OP_STATE_RPC 2 -#define SCHEDULER_OP_STATE_DONE 3 struct SCHEDULER_OP { int state; int scheduler_op_retval; HTTP_OP http_op; HTTP_OP_SET* http_ops; - PROJECT* project; + PROJECT* project; // project we're currently contacting char scheduler_url[256]; + bool must_get_work; // true iff in get_work mode + unsigned int url_index; // index within project's URL list SCHEDULER_OP(HTTP_OP_SET*); int poll(); + int init_get_work(); + int init_return_results(PROJECT*, double nsecs); + int init_op_project(double ns); + int init_master_fetch(PROJECT*); + int set_min_rpc_time(PROJECT*); + bool update_urls(PROJECT& project, vector &urls); int start_op(PROJECT*); int start_rpc(); - int parse_master_file(); + int parse_master_file(vector&); }; struct SCHEDULER_REPLY { diff --git a/db/db.h b/db/db.h index cff1c9026d..98f3cf7467 100644 --- a/db/db.h +++ b/db/db.h @@ -220,6 +220,7 @@ struct RESULT { int workunitid; int state; int hostid; + unsigned int report_deadline; unsigned int sent_time; unsigned int received_time; char name[256]; diff --git a/db/db_mysql.C b/db/db_mysql.C index 29e609f581..a3561e8f1d 100644 --- a/db/db_mysql.C +++ b/db/db_mysql.C @@ -196,12 +196,12 @@ void struct_to_str(void* vp, char* q, int type) { rp = (RESULT*)vp; sprintf(q, "id=%d, create_time=%d, 
workunitid=%d, state=%d, " - "hostid=%d, sent_time=%d, received_time=%d, " + "hostid=%d, report_deadline=%d, sent_time=%d, received_time=%d, " "name='%s', exit_status=%d, cpu_time=%f, " "xml_doc_in='%s', xml_doc_out='%s', stderr_out='%s', " "batch=%d, project_state=%d, validated=%d", rp->id, rp->create_time, rp->workunitid, rp->state, - rp->hostid, rp->sent_time, rp->received_time, + rp->hostid, rp->report_deadline, rp->sent_time, rp->received_time, rp->name, rp->exit_status, rp->cpu_time, rp->xml_doc_in, rp->xml_doc_out, rp->stderr_out, rp->batch, rp->project_state, rp->validated @@ -347,6 +347,7 @@ void row_to_struct(MYSQL_ROW& r, void* vp, int type) { rp->workunitid = atoi(r[i++]); rp->state = atoi(r[i++]); rp->hostid = atoi(r[i++]); + rp->report_deadline = atoi(r[i++]); rp->sent_time = atoi(r[i++]); rp->received_time = atoi(r[i++]); strcpy(rp->name, r[i++]); diff --git a/db/schema.sql b/db/schema.sql index de26b51b56..a36055c809 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -132,6 +132,7 @@ create table result ( workunitid integer not null, state integer not null, hostid integer not null, + report_deadline integer not null, sent_time integer not null, received_time integer not null, name varchar(254) not null, diff --git a/doc/file_xfer_policy.html b/doc/file_xfer_policy.html new file mode 100644 index 0000000000..95f610f66e --- /dev/null +++ b/doc/file_xfer_policy.html @@ -0,0 +1,3 @@ +

Core client: network scheduling and retry policy

+ +

diff --git a/doc/index.html b/doc/index.html index 8333bf3508..e8517013ca 100644 --- a/doc/index.html +++ b/doc/index.html @@ -61,7 +61,8 @@

  • Core client: FSM structure
  • Core client: data structures
  • Core client: logic -
  • Core client: network scheduling and retry policy +
  • Scheduler RPC timing and retry policies +
  • File transfer timing and retry policies
  • Core client: debugging diff --git a/doc/result.html b/doc/result.html index 900290cd85..2281384e54 100644 --- a/doc/result.html +++ b/doc/result.html @@ -8,9 +8,17 @@ The attributes of a result include:
    • The name of the result (unique across all results in the project).
    • The name of the associated workunit. +
    • The time when the completed result +should be reported to a scheduling server. +This is assigned by the project, and +is used by clients to prioritize operations, +and to initiate scheduler RPCs. +There is no guarantee that the result will actually +be reported by this time. +
    • An XML document listing the names of its output files; see below.
    • An XML document giving the sizes and checksums of its output files -(this is filled in after the result is completed). +(filled in after the result is completed).
    • The stderr output of the result.
    • The host that computed the result.
    • The times when the result was dispatched and received. diff --git a/doc/rpc_policy.html b/doc/rpc_policy.html new file mode 100644 index 0000000000..e94ca20528 --- /dev/null +++ b/doc/rpc_policy.html @@ -0,0 +1,121 @@ +

      Scheduler RPC timing and retry policies

      + +

      +Each scheduler RPC reports results, gets work, or both. +The client's scheduler RPC policy has several components: +when to make a scheduler RPC, +which project to contact, +which scheduling server for that project, +how much work to ask for, +and what to do if the RPC fails. +

      +The scheduler RPC policy has the following goals: +

        +
      • Make as few scheduler RPCs as possible. +
      • Use random exponential backoff if a project's scheduling +servers are down. +This avoids an RPC storm when the servers come back up. +
      • Eventually re-read a project's master URL file +in case its set of schedulers changes. +
      • Report results before or soon after their deadlines. +
      + + +

      Resource debt

      +

      +The client maintains an exponentially-averaged sum +of the CPU time it has devoted to each project. +The constant EXP_DECAY_RATE determines the decay rate +(currently a factor of e every week). +

      +Each project is assigned a resource debt, computed as +

      +resource_debt = resource_share / exp_avg_cpu +

      +Resource debt is a measure of how much work the client owes the project, +and in general the project with the greatest resource debt +is the one from which work should be requested. + +

      Minimum RPC time

      +

      +

      +The client maintains a minimum RPC time for each project. +This is the earliest time at which a scheduling RPC should be done +to that project (if zero, an RPC can be done immediately). +The minimum RPC time can be set for various reasons: + +

        +
      • Because of a request from the project, +i.e. a <request_delay> element in a scheduler reply message. +
      • Because RPCs to all of the project's schedulers have failed. +An exponential backoff policy is used. +
      • Because one of the project's computations has failed +(the application crashed, or a file upload or download failed). +An exponential backoff policy is used +to prevent a cycle of rapid failures. +
      + +

      Scheduler RPC sessions

      +

      +Communication with schedulers is organized into sessions, +each of which may involve many RPCs. +There are two types of sessions: +

        +
      • Get-work sessions, whose goal is to get a certain amount of work. +Results may be reported as a side-effect. +
      • Report-result sessions, whose goal is to report results. +Work may be fetched as a side-effect. +
      +The internal logic of scheduler sessions is encapsulated +in the class SCHEDULER_OP. +This is implemented as a state machine, +but its logic expressed as a process might look like: +
      +get_work_session() {
      +    while estimated work < high water mark
      +        P = project with greatest debt and min_rpc_time < now
      +        for each scheduler URL of P
      +            attempt an RPC to that URL
      +            if no error break
      +        if some RPC succeeded
      +            P.nrpc_failures = 0
      +        else
      +            P.nrpc_failures++
      +            P.min_rpc_time = exponential_backoff(P.nrpc_failures)
      +            if P.nrpc_failures mod MASTER_FETCH_PERIOD = 0
      +                P.fetch_master_flag = true
      +    for each project P with P.fetch_master_flag set
      +        read and parse master file
      +        if error
      +            P.nrpc_failures++
      +            P.min_rpc_time = exponential_backoff(P.nrpc_failures)
      +        if got any new scheduler urls
      +            P.nrpc_failures = 0
      +            P.min_rpc_time = 0
      +}
      +
      +report_result_session(project P) {
      +    for each scheduler URL of project
      +        attempt an RPC to that URL
      +        if no error break
      +    if some RPC succeeded
      +        P.nrpc_failures = 0
      +    else
      +        P.nrpc_failures++;
      +        P.min_rpc_time = exponential_backoff(P.nrpc_failures)
      +}
      +
      + + +The logic for initiating scheduler sessions is expressed +in the following poll function: +
      +if a scheduler RPC session is not active
      +    if estimated work is less than low-water mark
      +        start a get-work session
      +    else if some project P has overdue results
      +        start a report-result session for P;
      +        if P is the project with greatest resource debt,
      +        the RPC request should ask for enough work to bring us up
      +        to the high-water mark
      +
      diff --git a/html/user/db.inc b/html/user/db.inc index d0f0fb49fe..58e940d358 100644 --- a/html/user/db.inc +++ b/html/user/db.inc @@ -125,6 +125,7 @@ function show_result($result) { row("state", res_state_string($result->state)); row("host ID", $result->hostid); row("sent", time_str($result->sent_time)); + row("report deadline", time_str($result->report_deadline)); row("received", time_str($result->received_time)); row("exit status", $result->exit_status); row("CPU time", $result->cpu_time); diff --git a/tools/backend_lib.C b/tools/backend_lib.C index ee0ea7c607..4db281794c 100644 --- a/tools/backend_lib.C +++ b/tools/backend_lib.C @@ -138,6 +138,8 @@ int create_result( FILE* result_template_file, *tempfile; assert(result_template_filename!=NULL); memset(&r, 0, sizeof(r)); + r.report_deadline = time(0) + 1000; + // TODO: pass this in r.create_time = time(0); r.workunitid = wu.id; r.state = RESULT_STATE_UNSENT; diff --git a/tools/create_work.C b/tools/create_work.C index c6d5f4e53f..d5d86022e9 100644 --- a/tools/create_work.C +++ b/tools/create_work.C @@ -139,14 +139,6 @@ int main(int argc, char** argv) { exit(1); } -#if 0 - retval = read_file(result_template_file, result_template); - if (retval) { - fprintf(stderr, "can't open result template\n"); - exit(1); - } -#endif - if (wu.dynamic_results) { strcpy(app.result_xml_template, result_template); retval = db_app_update(app);