Added scheduler RPC retry policy

svn path=/trunk/boinc/; revision=192
This commit is contained in:
David Anderson 2002-07-15 05:34:32 +00:00
parent a479ec72f7
commit 83196d447f
19 changed files with 695 additions and 121 deletions

View File

@ -1080,3 +1080,52 @@ Michael Gary July 12, 2002
test/
prefs3.xml
test_rsc.php
David A July 14 2002
- Added reasonable policies for making scheduler RPCs. Summary:
- results now have a "report deadline".
This may be useful for various purposes; for now,
we use it to trigger scheduler RPCs to report results
- The client now tries all a project's scheduler URLs in turn.
- Projects have a "min_rpc_time", the earliest time to
attempt another RPC (replaces next_request_time)
- We maintain an RPC failure count for each project.
If RPCs to all its URLs fail, we increment the count
and set its min_rpc_time using exponential backoff
(see the sketch below).
- If a project keeps getting RPC failures,
the client periodically rereads and parses its master URL file,
in case its set of scheduler URLs has changed
- When the client has a completed result past its deadline,
it attempts to report it to that project.
- When the client's estimated work falls below the low-water mark,
it ranks projects according to their "resource debt",
then attempts RPCs to projects in that order
until the estimated work is above the high-water mark.
NOTE: only the simplest case has been tested.
We need to write test scripts for many other cases.
NOTE: currently a result's report deadline is now+1000.
We need to add a mechanism for setting it.
- The CLIENT_STATE is now kept in a global variable instead of
a local variable of main(),
because SCHEDULER_OP needs access to it.
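
For reference, a minimal standalone sketch of the backoff computation
(constant and function names here are illustrative; the real logic is
SCHEDULER_OP::set_min_rpc_time in scheduler_op.C):

#include <time.h>

// Illustrative constants mirroring RETRY_BASE_PERIOD and RETRY_CAP
// in scheduler_op.h.
const int retry_base_period = 100;   // seconds
const int retry_cap = 10;            // cap on the exponent

// After n consecutive failed RPCs to a project, don't contact it
// again before now + retry_base_period * 2^min(n, retry_cap).
time_t next_allowed_rpc_time(int nrpc_failures) {
    int n = nrpc_failures < retry_cap ? nrpc_failures : retry_cap;
    long delay = retry_base_period;
    for (int i = 0; i < n; i++) delay *= 2;
    return time(0) + delay;
}
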
client/
client_state.C,h
client_types.C,h
cs_scheduler.C
main.C
scheduler_op.C,h
db/
db.h
db_mysql.h
schema.sql
doc/
file_xfer_policy.html (new)
index.html
result.html
rpc_policy.html (new)
html_user/
db.inc
tools/
backend_lib.C
create_work.C

View File

@ -138,7 +138,7 @@ bool CLIENT_STATE::do_something() {
action |= file_xfers->poll();
action |= active_tasks.poll();
action |= active_tasks.poll_time();
action |= get_work();
action |= scheduler_rpc_poll();
action |= garbage_collect();
action |= start_apps();
action |= handle_running_apps();
@ -173,14 +173,6 @@ int CLIENT_STATE::parse_state_file() {
p2 = lookup_project(project->master_url);
if (p2) {
p2->copy_state_fields(*project);
p2->scheduler_urls = project->scheduler_urls;
p2->project_name = project->project_name;
p2->user_name = project->user_name;
p2->rpc_seqno = project->rpc_seqno;
p2->hostid = project->hostid;
p2->next_request_time = project->next_request_time;
p2->exp_avg_cpu = project->exp_avg_cpu;
p2->exp_avg_mod_time = project->exp_avg_mod_time;
} else {
fprintf(stderr,
"Project %s found in state file but not prefs.\n",

View File

@ -17,6 +17,9 @@
// Contributor(s):
//
#ifndef _CLIENT_STATE_
#define _CLIENT_STATE_
#include <vector>
#include "app.h"
@ -55,7 +58,6 @@ private:
NET_XFER_SET* net_xfers;
HTTP_OP_SET* http_ops;
FILE_XFER_SET* file_xfers;
SCHEDULER_OP* scheduler_op;
ACTIVE_TASK_SET active_tasks;
HOST_INFO host_info;
PREFS* prefs;
@ -65,7 +67,6 @@ private:
bool client_state_dirty;
bool exit_when_idle;
bool run_time_test;
bool contacted_sched_server;
bool activities_suspended;
int exit_after;
@ -79,12 +80,8 @@ private:
int link_workunit(PROJECT*, WORKUNIT*);
int link_result(PROJECT*, RESULT*);
int check_suspend_activities();
bool need_work();
void update_avg_cpu(PROJECT*);
PROJECT* choose_project();
int make_project_dirs();
int make_slot_dirs();
bool get_work();
bool input_files_available(RESULT*);
int app_finished(ACTIVE_TASK&);
bool start_apps();
@ -92,8 +89,23 @@ private:
bool start_file_xfers();
void print_counts();
bool garbage_collect();
int make_scheduler_request(PROJECT*, int);
void handle_scheduler_reply(SCHEDULER_OP&);
// stuff related to scheduler RPCs
//
SCHEDULER_OP* scheduler_op;
bool contacted_sched_server;
void compute_resource_debts();
public:
PROJECT* next_project(PROJECT*);
PROJECT* next_project_master_pending();
double work_needed_secs();
int make_scheduler_request(PROJECT*, double);
void handle_scheduler_reply(PROJECT*, char* scheduler_url);
private:
PROJECT* find_project_with_overdue_results();
bool some_project_rpc_ok();
bool scheduler_rpc_poll();
void update_avg_cpu(PROJECT*);
double estimate_duration(WORKUNIT*);
double current_water_days();
@ -109,3 +121,5 @@ public:
};
extern CLIENT_STATE gstate;
#endif

View File

@ -27,12 +27,13 @@
#include "client_types.h"
PROJECT::PROJECT() {
project_specific_prefs = 0;
code_sign_key = 0;
}
PROJECT::~PROJECT() {
if (project_specific_prefs) {
free(project_specific_prefs);
}
if (project_specific_prefs) free(project_specific_prefs);
if (code_sign_key) free(code_sign_key);
}
// parse project fields from prefs.xml
@ -73,10 +74,11 @@ int PROJECT::parse_state(FILE* in) {
}
strcpy(project_name, "");
strcpy(user_name, "");
next_request_time = 0;
resource_share = 1;
exp_avg_cpu = 0;
exp_avg_mod_time = 0;
min_rpc_time = 0;
nrpc_failures = 0;
while (fgets(buf, 256, in)) {
if (match_tag(buf, "</project>")) return 0;
else if (parse_str(buf, "<scheduler_url>", string.text)) {
@ -88,13 +90,14 @@ int PROJECT::parse_state(FILE* in) {
else if (parse_str(buf, "<user_name>", user_name)) continue;
else if (parse_int(buf, "<rpc_seqno>", rpc_seqno)) continue;
else if (parse_int(buf, "<hostid>", hostid)) continue;
else if (parse_int(buf, "<next_request_time>", next_request_time)) continue;
else if (parse_double(buf, "<exp_avg_cpu>", exp_avg_cpu)) continue;
else if (parse_int(buf, "<exp_avg_mod_time>", exp_avg_mod_time)) continue;
else if (match_tag(buf, "<code_sign_key>")) {
dup_element_contents(in, "</code_sign_key>", &code_sign_key);
//fprintf(stderr, "code_sign_key: %s\n", code_sign_key);
}
else if (parse_int(buf, "<nrpc_failures>", nrpc_failures)) continue;
else if (parse_int(buf, "<min_rpc_time>", min_rpc_time)) continue;
else fprintf(stderr, "PROJECT::parse_state(): unrecognized: %s\n", buf);
}
return ERR_XML_PARSE;
@ -121,17 +124,19 @@ int PROJECT::write_state(FILE* out) {
" <user_name>%s</user_name>\n"
" <rpc_seqno>%d</rpc_seqno>\n"
" <hostid>%d</hostid>\n"
" <next_request_time>%d</next_request_time>\n"
" <exp_avg_cpu>%f</exp_avg_cpu>\n"
" <exp_avg_mod_time>%d</exp_avg_mod_time>\n",
" <exp_avg_mod_time>%d</exp_avg_mod_time>\n"
" <nrpc_failures>%d</nrpc_failures>\n"
" <min_rpc_time>%d</min_rpc_time>\n",
master_url,
project_name,
user_name,
rpc_seqno,
hostid,
next_request_time,
exp_avg_cpu,
exp_avg_mod_time
exp_avg_mod_time,
nrpc_failures,
min_rpc_time
);
if (code_sign_key) {
fprintf(out,
@ -144,15 +149,19 @@ int PROJECT::write_state(FILE* out) {
return 0;
}
// copy fields from "p" into "this" that are stored in client_state.xml
//
void PROJECT::copy_state_fields(PROJECT& p) {
scheduler_urls = p.scheduler_urls;
project_name = p.project_name;
user_name = p.user_name;
rpc_seqno = p.rpc_seqno;
hostid = p.hostid;
next_request_time = p.next_request_time;
exp_avg_cpu = p.exp_avg_cpu;
exp_avg_mod_time = p.exp_avg_mod_time;
code_sign_key = strdup(p.code_sign_key);
nrpc_failures = p.nrpc_failures;
min_rpc_time = p.min_rpc_time;
}
void PROJECT::copy_prefs_fields(PROJECT& p) {
@ -481,6 +490,7 @@ int RESULT::parse_ack(FILE* in) {
void RESULT::clear() {
strcpy(name, "");
strcpy(wu_name, "");
report_deadline = 0;
output_files.clear();
is_active = false;
is_compute_done = false;
@ -506,6 +516,7 @@ int RESULT::parse_server(FILE* in) {
if (match_tag(buf, "</result>")) return 0;
if (parse_str(buf, "<name>", name)) continue;
if (parse_str(buf, "<wu_name>", wu_name)) continue;
if (parse_int(buf, "<report_deadline>", report_deadline)) continue;
if (match_tag(buf, "<file_ref>")) {
file_ref.parse(in);
output_files.push_back(file_ref);
@ -529,6 +540,7 @@ int RESULT::parse_state(FILE* in) {
if (match_tag(buf, "</result>")) return 0;
if (parse_str(buf, "<name>", name)) continue;
if (parse_str(buf, "<wu_name>", wu_name)) continue;
if (parse_int(buf, "<report_deadline>", report_deadline)) continue;
if (match_tag(buf, "<file_ref>")) {
file_ref.parse(in);
output_files.push_back(file_ref);
@ -579,8 +591,10 @@ int RESULT::write(FILE* out, bool to_server) {
}
if (!to_server) {
fprintf(out,
" <wu_name>%s</wu_name>\n",
wu_name
" <wu_name>%s</wu_name>\n"
" <report_deadline>%d</report_deadline>\n",
wu_name,
report_deadline
);
for (i=0; i<output_files.size(); i++) {
output_files[i].write(out);

View File

@ -60,10 +60,20 @@ public:
char user_name[256];
int rpc_seqno;
int hostid;
int next_request_time; // don't contact server until this time
double exp_avg_cpu; // exponentially weighted cpu time
double exp_avg_cpu; // exponentially weighted CPU time
int exp_avg_mod_time; // last time average was changed
char* code_sign_key;
int nrpc_failures; // # of consecutive times we've failed to
// contact all scheduling servers
int min_rpc_time; // earliest time to contact any server
// of this project (or zero)
// the following items are transient; not saved in state file
double resource_debt; // How much CPU time we owe this project
// (arbitrary scale)
int debt_order; // 0 == largest debt
bool master_url_fetch_pending;
// need to fetch and parse the master URL
PROJECT();
~PROJECT();
@ -164,6 +174,7 @@ struct WORKUNIT {
struct RESULT {
char name[256];
char wu_name[256];
int report_deadline;
vector<FILE_REF> output_files;
bool is_active; // an app is currently running for this
bool is_compute_done; // computation finished

View File

@ -18,10 +18,8 @@
//
// This file contains high-level logic for communicating with
// scheduling servers:
// - what project to ask for work
// - how much work to ask for
// - merging the result of a scheduler RPC into the client state
// scheduling servers,
// and for merging the result of a scheduler RPC into the client state
// Note: code for actually doing a scheduler RPC is elsewhere,
// namely scheduler_op.C
@ -45,40 +43,41 @@
#include "client_state.h"
// quantities like avg CPU time decay by a factor of e every week
//
#define EXP_DECAY_RATE (1./(3600*24*7))
#define SECONDS_IN_DAY 86400
//estimates the number of days of work remaining
// estimate the days of work remaining
//
double CLIENT_STATE::current_water_days() {
unsigned int i;
RESULT* rp;
double seconds_remaining=0;
for (i=0; i<results.size(); i++) {
RESULT* rp = results[i];
rp = results[i];
if (rp->is_compute_done) continue;
if (rp->cpu_time > 0)
if (rp->cpu_time > 0) {
seconds_remaining += (rp->wup->seconds_to_complete - rp->cpu_time);
else
} else {
seconds_remaining += rp->wup->seconds_to_complete;
}
}
return (seconds_remaining / SECONDS_IN_DAY);
}
bool CLIENT_STATE::need_work() {
double temp;
if(prefs->high_water_days < prefs->low_water_days) {
temp = prefs->high_water_days;
prefs->high_water_days = prefs->low_water_days;
prefs->low_water_days = temp;
}
return (current_water_days() <= prefs->low_water_days);
// seconds of work needed to come up to high-water mark
//
double CLIENT_STATE::work_needed_secs() {
double x = current_water_days();
if (x > prefs->high_water_days) return 0;
return (prefs->high_water_days - x)*86400;
}
// update exponentially-averaged CPU times of all projects
//
void CLIENT_STATE::update_avg_cpu(PROJECT* p) {
int now = time(0);
if(p==NULL) {
fprintf(stderr, "error: CLIENT_STATE.update_avg_cpu: unexpected NULL pointer p\n");
}
time_t now = time(0);
double deltat = now - p->exp_avg_mod_time;
if (deltat > 0) {
if (p->exp_avg_cpu != 0) {
@ -88,6 +87,45 @@ void CLIENT_STATE::update_avg_cpu(PROJECT* p) {
}
}
// find a project that needs its master file parsed
//
PROJECT* CLIENT_STATE::next_project_master_pending() {
unsigned int i;
PROJECT* p;
time_t now = time(0);
for (i=0; i<projects.size(); i++) {
p = projects[i];
if (p->min_rpc_time > now ) continue;
if (p->master_url_fetch_pending) {
return p;
}
}
return 0;
}
// return the next project after "old", in debt order,
// that is eligible for a scheduler RPC
//
PROJECT* CLIENT_STATE::next_project(PROJECT* old) {
PROJECT* p, *pbest;
int best = 999;
time_t now = time(0);
unsigned int i;
pbest = 0;
for (i=0; i<projects.size(); i++) {
p = projects[i];
if (p->min_rpc_time > now ) continue;
if (old && p->debt_order <= old->debt_order) continue;
if (p->debt_order < best) {
pbest = p;
best = p->debt_order;
}
}
return pbest;
}
#if 0
// choose a project to ask for work
//
PROJECT* CLIENT_STATE::choose_project() {
@ -118,8 +156,41 @@ PROJECT* CLIENT_STATE::choose_project() {
}
return bestp;
}
#endif
int CLIENT_STATE::make_scheduler_request(PROJECT* p, int work_req) {
void CLIENT_STATE::compute_resource_debts() {
unsigned int i, j;
PROJECT* p, *pbest;
double best;
for (i=0; i<projects.size(); i++) {
p = projects[i];
update_avg_cpu(p);
if (p->exp_avg_cpu == 0) {
p->resource_debt = p->resource_share;
} else {
p->resource_debt = p->resource_share/p->exp_avg_cpu;
}
p->debt_order = -1;
}
// put in decreasing order. Should use qsort or some stdlib thang
//
for (i=0; i<projects.size(); i++) {
best = -2;
for (j=0; j<projects.size(); j++) {
p = projects[j];
if (p->debt_order >= 0) continue;
if (p->resource_debt > best) {
best = p->resource_debt;
pbest = p;
}
}
pbest->debt_order = i;
}
}
int CLIENT_STATE::make_scheduler_request(PROJECT* p, double work_req) {
FILE* f = fopen(SCHED_OP_REQUEST_FILE, "wb");
unsigned int i;
RESULT* rp;
@ -139,7 +210,7 @@ int CLIENT_STATE::make_scheduler_request(PROJECT* p, int work_req) {
" <rpc_seqno>%d</rpc_seqno>\n"
" <platform_name>%s</platform_name>\n"
" <core_client_version>%d</core_client_version>\n"
" <work_req_seconds>%d</work_req_seconds>\n",
" <work_req_seconds>%f</work_req_seconds>\n",
p->authenticator,
p->hostid,
p->rpc_seqno,
@ -152,6 +223,7 @@ int CLIENT_STATE::make_scheduler_request(PROJECT* p, int work_req) {
}
FILE* fprefs = fopen(PREFS_FILE_NAME, "r");
if (!fprefs) return ERR_FOPEN;
copy_stream(fprefs, f);
fclose(fprefs);
@ -171,6 +243,77 @@ int CLIENT_STATE::make_scheduler_request(PROJECT* p, int work_req) {
return 0;
}
// find a project with results that are overdue to report,
// and which we're allowed to contact.
//
PROJECT* CLIENT_STATE::find_project_with_overdue_results() {
unsigned int i;
RESULT* r;
time_t now = time(0);
for (i=0; i<results.size(); i++) {
r = results[i];
if (r->is_compute_done && r->is_upload_done() && !r->is_server_ack) {
if (r->project->min_rpc_time < now) {
return r->project;
}
}
}
return 0;
}
// return true if we're allowed to do a scheduler RPC to at least one project
//
bool CLIENT_STATE::some_project_rpc_ok() {
unsigned int i;
time_t now = time(0);
for (i=0; i<projects.size(); i++) {
if (projects[i]->min_rpc_time < now) return true;
}
return false;
}
// called from the client's polling loop.
// initiate scheduler RPC activity if needed and possible
//
bool CLIENT_STATE::scheduler_rpc_poll() {
double work_secs;
PROJECT* p;
bool action = false, below_low_water;
switch(scheduler_op->state) {
case SCHEDULER_OP_STATE_IDLE:
below_low_water = (current_water_days() <= prefs->low_water_days);
if (below_low_water && some_project_rpc_ok()) {
compute_resource_debts();
scheduler_op->init_get_work();
action = true;
} else {
p = find_project_with_overdue_results();
if (p) {
compute_resource_debts();
if (p->debt_order == 0) {
work_secs = work_needed_secs();
} else {
work_secs = 0;
}
scheduler_op->init_return_results(p, work_secs);
action = true;
}
}
break;
default:
scheduler_op->poll();
if (scheduler_op->state == SCHEDULER_OP_STATE_IDLE) {
action = true;
}
break;
}
return action;
}
#if 0
// manage the task of maintaining an adequate supply of work.
//
bool CLIENT_STATE::get_work() {
@ -226,6 +369,7 @@ bool CLIENT_STATE::get_work() {
}
return action;
}
#endif
// see whether a new preferences set, obtained from
// the given project, looks "reasonable".
@ -236,21 +380,19 @@ bool PREFS::looks_reasonable(PROJECT& project) {
return false;
}
void CLIENT_STATE::handle_scheduler_reply(SCHEDULER_OP& sched_op) {
void CLIENT_STATE::handle_scheduler_reply(
PROJECT* project, char* scheduler_url
) {
SCHEDULER_REPLY sr;
FILE* f;
int retval;
unsigned int i;
char prefs_backup[256];
PROJECT *project, *pp, *sp;
PROJECT *pp, *sp;
PREFS* new_prefs;
bool signature_valid;
project = sched_op.project;
contacted_sched_server = true;
if (log_flags.sched_ops) {
printf("Got reply from scheduler %s\n", sched_op.scheduler_url);
}
if (log_flags.sched_op_debug) {
f = fopen(SCHED_OP_RESULT_FILE, "r");
printf("------------- SCHEDULER REPLY ----------\n");
@ -267,7 +409,7 @@ void CLIENT_STATE::handle_scheduler_reply(SCHEDULER_OP& sched_op) {
}
if (sr.request_delay) {
project->next_request_time = time(0) + sr.request_delay;
project->min_rpc_time = time(0) + sr.request_delay;
}
if (sr.hostid) {
project->hostid = sr.hostid;
@ -298,7 +440,7 @@ void CLIENT_STATE::handle_scheduler_reply(SCHEDULER_OP& sched_op) {
" <from_scheduler>%s</from_scheduler>\n",
sr.prefs_mod_time,
project->master_url,
sched_op.scheduler_url
scheduler_url
);
fputs(sr.prefs_xml, f);
fprintf(f,

View File

@ -69,7 +69,6 @@ int initialize_prefs() {
}
int main(int argc, char** argv) {
CLIENT_STATE cs;
PREFS* prefs;
FILE* f;
int retval;
@ -97,23 +96,23 @@ int main(int argc, char** argv) {
}
}
cs.init(prefs);
cs.parse_cmdline(argc, argv);
if(cs.run_time_tests()) {
cs.time_tests();
gstate.init(prefs);
gstate.parse_cmdline(argc, argv);
if(gstate.run_time_tests()) {
gstate.time_tests();
}
cs.restart_tasks();
gstate.restart_tasks();
while (1) {
if (!cs.do_something()) {
if (!gstate.do_something()) {
if (log_flags.time_debug) printf("SLEEP 1 SECOND\n");
fflush(stdout);
boinc_sleep(1);
}
if (cs.time_to_exit()) {
if (gstate.time_to_exit()) {
printf("time to exit\n");
break;
}
}
cs.exit_tasks();
gstate.exit_tasks();
return 0;
}

View File

@ -19,6 +19,7 @@
#include <stdio.h>
#include "client_state.h"
#include "client_types.h"
#include "error_numbers.h"
#include "file_names.h"
@ -35,12 +36,82 @@ SCHEDULER_OP::SCHEDULER_OP(HTTP_OP_SET* h) {
http_ops = h;
}
// try to get enough work to bring us up to high-water mark
//
int SCHEDULER_OP::init_get_work() {
double ns = gstate.work_needed_secs();
must_get_work = true;
project = gstate.next_project(0);
if (project) {
init_op_project(ns);
}
return 0;
}
// report results for a particular project.
// also get work from that project if below high-water mark
//
int SCHEDULER_OP::init_return_results(PROJECT* p, double ns) {
must_get_work = false;
project = p;
return init_op_project(ns);
}
// try to initiate an RPC to the current project.
// If there are multiple schedulers, start with the first one
//
int SCHEDULER_OP::init_op_project(double ns) {
int retval;
if (log_flags.sched_op_debug) {
printf("init_op_project: starting op for %s\n", project->master_url);
}
// if project has no schedulers, skip everything else
// and just get its master file.
//
if (project->scheduler_urls.size() == 0) {
init_master_fetch(project);
return 0;
}
url_index = 0;
retval = gstate.make_scheduler_request(project, ns);
if (retval) {
fprintf(stderr, "make_scheduler_request: %d\n", retval);
return retval;
}
return start_rpc();
}
// Set a project's min RPC time to something in the future,
// based on exponential backoff
// TODO: integrate with other backoff sources
//
int SCHEDULER_OP::set_min_rpc_time(PROJECT* p) {
int x = RETRY_BASE_PERIOD;
int i;
int n = p->nrpc_failures;
if (n > RETRY_CAP) n = RETRY_CAP;
for (i=0; i<n; i++) x *= 2;
p->min_rpc_time = time(0) + x;
if (log_flags.sched_op_debug) {
printf(
"setting min RPC time for %s to %d seconds from now\n",
p->master_url, x
);
}
return 0;
}
// low-level routine to initiate an RPC
//
int SCHEDULER_OP::start_rpc() {
FILE *f;
int retval;
// TODO: try scheduler URLs other than the first one
//
strcpy(scheduler_url, project->scheduler_urls[0].text);
strcpy(scheduler_url, project->scheduler_urls[url_index].text);
if (log_flags.sched_ops) {
printf("Sending request to scheduler: %s\n", scheduler_url);
}
@ -51,35 +122,42 @@ int SCHEDULER_OP::start_rpc() {
printf("--------- END ---------\n");
fclose(f);
}
http_op.init_post(
retval = http_op.init_post(
scheduler_url, SCHED_OP_REQUEST_FILE,
SCHED_OP_RESULT_FILE
);
http_ops->insert(&http_op);
if (retval) return retval;
retval = http_ops->insert(&http_op);
if (retval) return retval;
project->rpc_seqno++;
state = SCHEDULER_OP_STATE_RPC;
return 0;
}
int SCHEDULER_OP::start_op(PROJECT* p) {
if(p==NULL) {
fprintf(stderr, "error: SCHEDULER_OP.start_op: unexpected NULL pointer p\n");
// initiate a fetch of a project's master URL file
//
int SCHEDULER_OP::init_master_fetch(PROJECT* p) {
int retval;
if (p==NULL) {
fprintf(stderr, "error: SCHEDULER_OP.init_master_fetch: NULL pointer p\n");
return ERR_NULL;
}
project = p;
if (project->scheduler_urls.size() == 0) {
http_op.init_get(project->master_url, MASTER_FILE_NAME);
http_ops->insert(&http_op);
state = SCHEDULER_OP_STATE_GET_MASTER;
} else {
start_rpc();
if (log_flags.sched_op_debug) {
printf("Fetching master file for %s\n", project->master_url);
}
retval = http_op.init_get(project->master_url, MASTER_FILE_NAME);
if (retval) return retval;
retval = http_ops->insert(&http_op);
if (retval) return retval;
state = SCHEDULER_OP_STATE_GET_MASTER;
return 0;
}
// parse a master file.
//
int SCHEDULER_OP::parse_master_file() {
int SCHEDULER_OP::parse_master_file(vector<STRING256> &urls) {
char buf[256];
STRING256 str;
FILE* f;
@ -92,45 +170,171 @@ int SCHEDULER_OP::parse_master_file() {
project->scheduler_urls.clear();
while (fgets(buf, 256, f)) {
if (parse_str(buf, "<scheduler>", str.text)) {
project->scheduler_urls.push_back(str);
urls.push_back(str);
}
}
if (log_flags.sched_op_debug) {
printf("Parsed master file; got %d scheduler URLs\n", urls.size());
}
return 0;
}
// A master file has just been read.
// transfer scheduler urls to project.
// Return true if any of them is new
//
bool SCHEDULER_OP::update_urls(PROJECT& project, vector<STRING256> &urls) {
unsigned int i, j;
bool found, any_new;
any_new = false;
for (i=0; i<urls.size(); i++) {
found = false;
for (j=0; j<project.scheduler_urls.size(); j++) {
if (!strcmp(urls[i].text, project.scheduler_urls[j].text)) {
found = true;
break;
}
}
if (!found) any_new = true;
}
project.scheduler_urls.clear();
for (i=0; i<urls.size(); i++) {
project.scheduler_urls.push_back(urls[i]);
}
return any_new;
}
// poll routine. If an operation is in progress, check for completion
//
int SCHEDULER_OP::poll() {
int retval;
vector<STRING256> urls;
bool changed, scheduler_op_done;
switch(state) {
case SCHEDULER_OP_STATE_GET_MASTER:
// here we're fetching the master file for a project
//
if (http_op.http_op_state == HTTP_STATE_DONE) {
project->master_url_fetch_pending = false;
http_ops->remove(&http_op);
if (http_op.http_op_retval) {
state = SCHEDULER_OP_STATE_DONE;
scheduler_op_retval = http_op.http_op_retval;
} else {
if (http_op.http_op_retval == 0) {
if (log_flags.sched_op_debug) {
printf(
"Got master file from %s; parsing\n",
project->master_url
);
}
retval = parse_master_file();
if (retval) {
state = SCHEDULER_OP_STATE_DONE;
scheduler_op_retval = retval;
retval = parse_master_file(urls);
if (retval == 0) {
changed = update_urls(*project, urls);
if (changed) {
project->min_rpc_time = 0;
project->nrpc_failures = 0;
}
} else {
start_rpc();
// master file parse failed. treat like RPC error
//
project->nrpc_failures++;
set_min_rpc_time(project);
if (log_flags.sched_op_debug) {
printf("Master file parse failed\n");
}
}
} else {
// fetch of master file failed. Treat like RPC error
//
project->nrpc_failures++;
set_min_rpc_time(project);
if (log_flags.sched_op_debug) {
printf("Master file fetch failed\n");
}
}
project = gstate.next_project_master_pending();
if (project) {
init_master_fetch(project);
} else {
state = SCHEDULER_OP_STATE_IDLE;
if (log_flags.sched_op_debug) {
printf("Scheduler_op: return to idle state\n");
}
}
}
break;
case SCHEDULER_OP_STATE_RPC:
// here we're doing a scheduler RPC to some project
//
scheduler_op_done = false;
if (http_op.http_op_state == HTTP_STATE_DONE) {
state = SCHEDULER_OP_STATE_DONE;
scheduler_op_retval = http_op.http_op_retval;
http_ops->remove(&http_op);
if (http_op.http_op_retval) {
if (log_flags.sched_op_debug) {
printf(
"scheduler RPC to %s failed\n",
project->scheduler_urls[url_index].text
);
}
url_index++;
if (url_index < project->scheduler_urls.size()) {
start_rpc();
} else {
project->nrpc_failures++;
if ((project->nrpc_failures % MASTER_FETCH_PERIOD) == 0) {
project->master_url_fetch_pending = true;
}
set_min_rpc_time(project);
if (must_get_work) {
project = gstate.next_project(project);
if (project) {
init_op_project(gstate.work_needed_secs());
} else {
scheduler_op_done = true;
}
} else {
scheduler_op_done = true;
}
}
} else {
if (log_flags.sched_op_debug) {
printf(
"scheduler RPC to %s succeeded\n",
project->scheduler_urls[url_index].text
);
}
project->nrpc_failures = 0;
gstate.handle_scheduler_reply(project, scheduler_url);
if (must_get_work) {
double x = gstate.work_needed_secs();
if (x > 0) {
project = gstate.next_project(project);
if (project) {
init_op_project(x);
} else {
scheduler_op_done = true;
}
} else {
scheduler_op_done = true;
}
}
}
}
if (scheduler_op_done) {
project = gstate.next_project_master_pending();
if (project) {
init_master_fetch(project);
} else {
state = SCHEDULER_OP_STATE_IDLE;
if (log_flags.sched_op_debug) {
printf("Scheduler_op: return to idle state\n");
}
}
}
break;
default:
break;
}
return 0;
}
@ -148,7 +352,7 @@ SCHEDULER_REPLY::~SCHEDULER_REPLY() {
}
int SCHEDULER_REPLY::parse(FILE* in) {
char buf[256];
char buf[256], *p;
int retval;
if(in==NULL) {
fprintf(stderr, "error: SCHEDULER_REPLY.parse: unexpected NULL pointer in\n");
@ -163,7 +367,7 @@ int SCHEDULER_REPLY::parse(FILE* in) {
code_sign_key = 0;
code_sign_key_signature = 0;
fgets(buf, 256, in);
p = fgets(buf, 256, in);
if (!match_tag(buf, "<scheduler_reply>")) {
fprintf(stderr, "SCHEDULER_REPLY::parse(): bad first tag %s\n", buf);
return ERR_XML_PARSE;

View File

@ -20,36 +20,54 @@
#ifndef _SCHEDULER_OP_
#define _SCHEDULER_OP_
// Logic for communicating with a scheduling server.
// If we don't yet have the addresses a scheduling server,
// we have to get the file from the project's master URL
// and parse if for <scheduler> elements
// TODO: try alternate scheduling servers;
// implement backoff and give-up policies
// SCHEDULER_OP encapsulates the policy and mechanism
// for communicating with scheduling servers.
// It is implemented as a finite-state machine.
// It is active in one of two modes:
// get_work: the client wants to get work, and possibly to
// return results as a side-effect
// return_results: the client wants to return results, and possibly
// to get work as a side-effect
//
#include "client_types.h"
#include "http.h"
#include "prefs.h"
// constants related to scheduler RPC policy
#define MASTER_FETCH_PERIOD 10
// fetch and parse master URL if nrpc_failures is a multiple of this
#define RETRY_BASE_PERIOD 100
// after failure, back off 2^nrpc_failures times this times random
#define RETRY_CAP 10
// cap on nrpc_failures in the above formula
#define SCHEDULER_OP_STATE_IDLE 0
#define SCHEDULER_OP_STATE_GET_MASTER 1
#define SCHEDULER_OP_STATE_RPC 2
#define SCHEDULER_OP_STATE_DONE 3
struct SCHEDULER_OP {
int state;
int scheduler_op_retval;
HTTP_OP http_op;
HTTP_OP_SET* http_ops;
PROJECT* project;
PROJECT* project; // project we're currently contacting
char scheduler_url[256];
bool must_get_work; // true iff in get_work mode
unsigned int url_index; // index within project's URL list
SCHEDULER_OP(HTTP_OP_SET*);
int poll();
int init_get_work();
int init_return_results(PROJECT*, double nsecs);
int init_op_project(double ns);
int init_master_fetch(PROJECT*);
int set_min_rpc_time(PROJECT*);
bool update_urls(PROJECT& project, vector<STRING256> &urls);
int start_op(PROJECT*);
int start_rpc();
int parse_master_file();
int parse_master_file(vector<STRING256>&);
};
struct SCHEDULER_REPLY {

View File

@ -220,6 +220,7 @@ struct RESULT {
int workunitid;
int state;
int hostid;
unsigned int report_deadline;
unsigned int sent_time;
unsigned int received_time;
char name[256];

View File

@ -196,12 +196,12 @@ void struct_to_str(void* vp, char* q, int type) {
rp = (RESULT*)vp;
sprintf(q,
"id=%d, create_time=%d, workunitid=%d, state=%d, "
"hostid=%d, sent_time=%d, received_time=%d, "
"hostid=%d, report_deadline=%d, sent_time=%d, received_time=%d, "
"name='%s', exit_status=%d, cpu_time=%f, "
"xml_doc_in='%s', xml_doc_out='%s', stderr_out='%s', "
"batch=%d, project_state=%d, validated=%d",
rp->id, rp->create_time, rp->workunitid, rp->state,
rp->hostid, rp->sent_time, rp->received_time,
rp->hostid, rp->report_deadline, rp->sent_time, rp->received_time,
rp->name, rp->exit_status, rp->cpu_time,
rp->xml_doc_in, rp->xml_doc_out, rp->stderr_out,
rp->batch, rp->project_state, rp->validated
@ -347,6 +347,7 @@ void row_to_struct(MYSQL_ROW& r, void* vp, int type) {
rp->workunitid = atoi(r[i++]);
rp->state = atoi(r[i++]);
rp->hostid = atoi(r[i++]);
rp->report_deadline = atoi(r[i++]);
rp->sent_time = atoi(r[i++]);
rp->received_time = atoi(r[i++]);
strcpy(rp->name, r[i++]);

View File

@ -132,6 +132,7 @@ create table result (
workunitid integer not null,
state integer not null,
hostid integer not null,
report_deadline integer not null,
sent_time integer not null,
received_time integer not null,
name varchar(254) not null,

View File

@ -0,0 +1,3 @@
<h2>Core client: network scheduling and retry policy</h2>
<p>

View File

@ -61,7 +61,8 @@
<li><a href=client_fsm.html>Core client: FSM structure</a>
<li><a href=client_data.html>Core client: data structures</a>
<li><a href=client_logic.html>Core client: logic</a>
<li><a href=client_network.html>Core client: network scheduling and retry policy</a>
<li><a href=rpc_policy.html>Scheduler RPC timing and retry policies</a>
<li><a href=file_xfer_policy.html>File transfer timing and retry policies</a>
<li><a href=client_debug.html>Core client: debugging</a>
</ul>

View File

@ -8,9 +8,17 @@ The attributes of a result include:
<ul>
<li> The name of the result (unique across all results in the project).
<li> The name of the associated workunit.
<li> The time when the completed result
should be reported to a scheduling server.
This is assigned by the project, and is used by clients
to prioritize operations and to initiate scheduler RPCs.
There is no guarantee that the result will actually
be reported by this time.
<li> An XML document listing the names of its output files; see below.
<li> An XML document giving the sizes and checksums of its output files
(this is filled in after the result is completed).
(filled in after the result is completed).
<li> The stderr output of the result.
<li> The host that computed the result.
<li> The times when the result was dispatched and received.

doc/rpc_policy.html Normal file
View File

@ -0,0 +1,121 @@
<h2>Scheduler RPC timing and retry policies</h2>
<p>
Each scheduler RPC reports results, gets work, or both.
The client's <b>scheduler RPC policy</b> has several components:
when to make a scheduler RPC,
which project to contact,
which scheduling server for that project,
how much work to ask for,
and what to do if the RPC fails.
<p>
The scheduler RPC policy has the following goals:
<ul>
<li> Make as few scheduler RPCs as possible.
<li> Use random exponential backoff if a project's scheduling
servers are down.
This avoids an RPC storm when the servers come back up.
<li> Eventually re-read a project's master URL file
in case its set of schedulers changes.
<li> Report results before or soon after their deadlines.
</ul>
<h3>Resource debt</h3>
<p>
The client maintains an exponentially-averaged sum
of the CPU time it has devoted to each project.
The constant EXP_DECAY_RATE determines the decay rate
(currently a factor of e every week).
<p>
Each project is assigned a <b>resource debt</b>, computed as
<p>
resource_debt = resource_share / exp_avg_cpu
<p>
Resource debt is a measure of how much work the client owes the project,
and in general the project with the greatest resource debt
is the one from which work should be requested.
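<p>
For example (with illustrative numbers): if projects A and B have
resource shares 100 and 50, and exponentially-averaged CPU times of
2000 and 500 seconds, their resource debts are 0.05 and 0.1,
so B is the first candidate for a work request.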
<h3>Minimum RPC time</h3>
<p>
The client maintains a <b>minimum RPC time</b> for each project.
This is the earliest time at which a scheduling RPC should be done
to that project (if zero, an RPC can be done immediately).
The minimum RPC time can be set for various reasons:
<ul>
<li> Because of a request from the project,
i.e. a &lt;request_delay> element in a scheduler reply message.
<li> Because RPCs to all of the project's scheduling servers have failed.
An exponential backoff policy is used (see the numbers below this list).
<li> Because one of the project's computations has failed
(the application crashed, or a file upload or download failed).
An exponential backoff policy is used
to prevent a cycle of rapid failures.
</ul>
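<p>
With the constants in this commit (RETRY_BASE_PERIOD = 100 seconds,
RETRY_CAP = 10), the nth consecutive failure sets the minimum RPC time
to roughly now + 100 * 2^min(n, 10) seconds,
so the delay is capped at 102,400 seconds, roughly 28 hours.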
<h3>Scheduler RPC sessions</h3>
<p>
Communication with schedulers is organized into <b>sessions</b>,
each of which may involve many RPCs.
There are two types of sessions:
<ul>
<li><b>Get-work</b> sessions, whose goal is to get a certain amount of work.
Results may be reported as a side-effect.
<li><b>Report-result</b> sessions, whose goal is to report results.
Work may be fetched as a side-effect.
</ul>
The internal logic of scheduler sessions is encapsulated
in the class SCHEDULER_OP.
This is implemented as a state machine,
but its logic, expressed as a process, might look like this:
<pre>
get_work_session() {
while estimated work < high water mark
P = project with greatest debt and min_rpc_time < now
for each scheduler URL of P
attempt an RPC to that URL
if no error break
if some RPC succeeded
P.nrpc_failures = 0
else
P.nrpc_failures++
P.min_rpc_time = exponential_backoff(P.nrpc_failures)
if P.nrpc_failures mod MASTER_FETCH_PERIOD = 0
P.fetch_master_flag = true
for each project P with P.fetch_master_flag set
read and parse master file
if error
P.nrpc_failures++
P.min_rpc_time = exponential_backoff(P.nrpc_failures)
if got any new scheduler urls
P.nrpc_failures = 0
P.min_rpc_time = 0
}
report_result_session(project P) {
for each scheduler URL of P
attempt an RPC to that URL
if no error break
if some RPC succeeded
P.nrpc_failures = 0
else
P.nrpc_failures++;
P.min_rpc_time = exponential_backoff(P.nrpc_failures)
}
</pre>
The logic for initiating scheduler sessions is expressed
in the following poll function:
<pre>
if a scheduler RPC session is not active
if estimated work is less than low-water mark
start a get-work session
else if some project P has overdue results
start a report-result session for P;
if P is the project with the greatest resource debt,
the RPC request should ask for enough work to bring us up
to the high-water mark
</pre>

View File

@ -125,6 +125,7 @@ function show_result($result) {
row("state", res_state_string($result->state));
row("host ID", $result->hostid);
row("sent", time_str($result->sent_time));
row("report deadline", time_str($result->report_deadline));
row("received", time_str($result->received_time));
row("exit status", $result->exit_status);
row("CPU time", $result->cpu_time);

View File

@ -138,6 +138,8 @@ int create_result(
FILE* result_template_file, *tempfile;
assert(result_template_filename!=NULL);
memset(&r, 0, sizeof(r));
r.report_deadline = time(0) + 1000;
// TODO: pass this in
r.create_time = time(0);
r.workunitid = wu.id;
r.state = RESULT_STATE_UNSENT;

View File

@ -139,14 +139,6 @@ int main(int argc, char** argv) {
exit(1);
}
#if 0
retval = read_file(result_template_file, result_template);
if (retval) {
fprintf(stderr, "can't open result template\n");
exit(1);
}
#endif
if (wu.dynamic_results) {
strcpy(app.result_xml_template, result_template);
retval = db_app_update(app);