mirror of https://github.com/BOINC/boinc.git
parent
8f86ee9578
commit
a8b47cd62e
|
@ -124,7 +124,7 @@ int CLIENT_STATE::init() {
|
|||
parse_state_file();
|
||||
|
||||
if (log_flags.state_debug) {
|
||||
print_counts();
|
||||
print_summary();
|
||||
}
|
||||
|
||||
// Run the time tests and host information check if needed
|
||||
|
@ -332,20 +332,23 @@ static void print_log(char* p) {
|
|||
}
|
||||
}
|
||||
|
||||
// do_something is where all the action happens. This is part of the
|
||||
// finite state machine abstraction of the client. Each of the key
|
||||
// elements of the client is given a chance to perform work here.
|
||||
// return true if something happened
|
||||
// TODO: handle errors passed back up to here?
|
||||
// do_something is where all the action happens.
|
||||
// Each of the client's finite-state machine layers is polled,
|
||||
// possibly triggering state transitions.
|
||||
// Returns true if something happened
|
||||
//
|
||||
bool CLIENT_STATE::do_something() {
|
||||
int nbytes=0;
|
||||
bool action = false, x;
|
||||
|
||||
if(check_time_tests() == TIME_TESTS_RUNNING) return action;
|
||||
if (check_time_tests() == TIME_TESTS_RUNNING) return action;
|
||||
|
||||
check_suspend_activities();
|
||||
if (!activities_suspended) {
|
||||
|
||||
print_log("Polling; active layers:\n");
|
||||
if (activities_suspended) {
|
||||
print_log("None (suspended)\n");
|
||||
} else {
|
||||
// Call these functions in bottom to top order with
|
||||
// respect to the FSM hierarchy
|
||||
|
||||
|
@ -354,10 +357,10 @@ bool CLIENT_STATE::do_something() {
|
|||
if (nbytes) { max_bytes -= nbytes; action=true; print_log("net_xfers\n"); }
|
||||
|
||||
x = http_ops->poll();
|
||||
if (x) {action=true; print_log("http_ops::poll\n"); }
|
||||
if (x) {action=true; print_log("http_ops\n"); }
|
||||
|
||||
x = file_xfers->poll();
|
||||
if (x) {action=true; print_log("file_xfers::poll\n"); }
|
||||
if (x) {action=true; print_log("file_xfers\n"); }
|
||||
|
||||
x = active_tasks.poll();
|
||||
if (x) {action=true; print_log("active_tasks::poll\n"); }
|
||||
|
@ -366,13 +369,13 @@ bool CLIENT_STATE::do_something() {
|
|||
if (x) {action=true; print_log("active_tasks::poll_time\n"); }
|
||||
|
||||
x = scheduler_rpc_poll();
|
||||
if (x) {action=true; print_log("scheduler_rpc_poll\n"); }
|
||||
if (x) {action=true; print_log("scheduler_rpc\n"); }
|
||||
|
||||
x = start_apps();
|
||||
if (x) {action=true; print_log("start_apps\n"); }
|
||||
|
||||
x = pers_xfers->poll();
|
||||
if (x) {action=true; print_log("pers_xfers->poll\n"); }
|
||||
if (x) {action=true; print_log("pers_xfers\n"); }
|
||||
|
||||
x = handle_running_apps();
|
||||
if (x) {action=true; print_log("handle_running_apps\n"); }
|
||||
|
@ -386,11 +389,11 @@ bool CLIENT_STATE::do_something() {
|
|||
x = update_results();
|
||||
if (x) {action=true; print_log("update_results\n"); }
|
||||
|
||||
if(write_state_file_if_needed())
|
||||
{
|
||||
fprintf(stderr, "CLIENT_STATE::do_something(): could not write state file");
|
||||
}
|
||||
if (write_state_file_if_needed()) {
|
||||
fprintf(stderr, "CLIENT_STATE::do_something(): could not write state file");
|
||||
}
|
||||
}
|
||||
print_log("End poll\n");
|
||||
if (!action) {
|
||||
time_stats.update(true, !activities_suspended);
|
||||
max_bytes = max_transfer_rate;
|
||||
|
@ -818,28 +821,47 @@ int CLIENT_STATE::latest_version_num(char* app_name) {
|
|||
if (avp->version_num < best) continue;
|
||||
best = avp->version_num;
|
||||
}
|
||||
if (best < 0) fprintf(stderr, "CLIENT_STATE::latest_version_num: no version\n");
|
||||
if (best < 0) {
|
||||
fprintf(stderr, "CLIENT_STATE::latest_version_num: no version\n");
|
||||
}
|
||||
return best;
|
||||
}
|
||||
|
||||
// Print debugging information about how many projects/files/etc
|
||||
// are currently in the client state record
|
||||
//
|
||||
void CLIENT_STATE::print_counts() {
|
||||
if (log_flags.state_debug) {
|
||||
printf(
|
||||
"Client state file:\n"
|
||||
"%d projects\n"
|
||||
"%d file_infos\n"
|
||||
"%d app_versions\n"
|
||||
"%d workunits\n"
|
||||
"%d results\n",
|
||||
(int)projects.size(),
|
||||
(int)file_infos.size(),
|
||||
(int)app_versions.size(),
|
||||
(int)workunits.size(),
|
||||
(int)results.size()
|
||||
);
|
||||
void CLIENT_STATE::print_summary() {
|
||||
unsigned int i;
|
||||
if (!log_flags.state_debug) return;
|
||||
|
||||
printf("Client state summary:\n");
|
||||
printf(" %d projects\n", (int)projects.size());
|
||||
for (i=0; i<projects.size(); i++) {
|
||||
printf(" %s\n", projects[i]->master_url);
|
||||
}
|
||||
printf(" %d file_infos\n", (int)file_infos.size());
|
||||
for (i=0; i<file_infos.size(); i++) {
|
||||
printf(" %s status:%d %s\n", file_infos[i]->name, file_infos[i]->status, file_infos[i]->pers_file_xfer?"active":"inactive");
|
||||
}
|
||||
printf(" %d app_versions\n", (int)app_versions.size());
|
||||
for (i=0; i<app_versions.size(); i++) {
|
||||
printf(" %s %d\n", app_versions[i]->app_name, app_versions[i]->version_num);
|
||||
}
|
||||
printf(" %d workunits\n", (int)workunits.size());
|
||||
for (i=0; i<workunits.size(); i++) {
|
||||
printf(" %s\n", workunits[i]->name);
|
||||
}
|
||||
printf(" %d results\n", (int)results.size());
|
||||
for (i=0; i<results.size(); i++) {
|
||||
printf(" %s state:%d\n", results[i]->name, results[i]->state);
|
||||
}
|
||||
printf(" %d persistent file xfers\n", (int)pers_xfers->pers_file_xfers.size());
|
||||
for (i=0; i<pers_xfers->pers_file_xfers.size(); i++) {
|
||||
printf(" %s\n", pers_xfers->pers_file_xfers[i]->fip->name);
|
||||
}
|
||||
printf(" %d active tasks\n", (int)active_tasks.active_tasks.size());
|
||||
for (i=0; i<active_tasks.active_tasks.size(); i++) {
|
||||
printf(" %s\n", active_tasks.active_tasks[i]->result->name);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -962,7 +984,10 @@ bool CLIENT_STATE::garbage_collect() {
|
|||
|
||||
// TODO: delete obsolete APP_VERSIONs
|
||||
|
||||
if (log_flags.state_debug && action) printf("garbage_collect\n");
|
||||
if (action && log_flags.state_debug) {
|
||||
print_summary();
|
||||
}
|
||||
|
||||
return action;
|
||||
}
|
||||
|
||||
|
|
|
@ -126,7 +126,7 @@ private:
|
|||
bool start_apps();
|
||||
bool handle_running_apps();
|
||||
bool handle_pers_file_xfers();
|
||||
void print_counts();
|
||||
void print_summary();
|
||||
bool garbage_collect();
|
||||
bool update_results();
|
||||
|
||||
|
|
|
@ -59,7 +59,7 @@ int CLIENT_STATE::cleanup_and_exit() {
|
|||
int CLIENT_STATE::exit_tasks() {
|
||||
active_tasks.exit_tasks();
|
||||
|
||||
// for now just kill them
|
||||
// for now just kill them
|
||||
unsigned int i;
|
||||
ACTIVE_TASK *atp;
|
||||
for (i=0; i<active_tasks.active_tasks.size(); i++) {
|
||||
|
@ -121,7 +121,7 @@ bool CLIENT_STATE::handle_running_apps() {
|
|||
);
|
||||
}
|
||||
app_finished(*atp);
|
||||
active_tasks.remove(atp);
|
||||
active_tasks.remove(atp);
|
||||
delete atp;
|
||||
set_client_state_dirty("handle_running_apps");
|
||||
action = true;
|
||||
|
@ -170,7 +170,7 @@ bool CLIENT_STATE::start_apps() {
|
|||
if (log_flags.task_debug) {
|
||||
printf("start_apps(): all slots full\n");
|
||||
}
|
||||
return false;
|
||||
return action;
|
||||
}
|
||||
rp = results[i];
|
||||
|
||||
|
@ -188,12 +188,18 @@ bool CLIENT_STATE::start_apps() {
|
|||
atp->slot = open_slot;
|
||||
atp->init(rp);
|
||||
retval = active_tasks.insert(atp);
|
||||
//couldn't start process
|
||||
if(retval) {
|
||||
atp->state = PROCESS_COULDNT_START;
|
||||
atp->result->active_task_state = PROCESS_COULDNT_START;
|
||||
report_project_error(*(atp->result),retval,"Couldn't start the app for this result.\n",CLIENT_COMPUTING);
|
||||
}
|
||||
|
||||
// couldn't start process
|
||||
//
|
||||
if (retval) {
|
||||
atp->state = PROCESS_COULDNT_START;
|
||||
atp->result->active_task_state = PROCESS_COULDNT_START;
|
||||
report_project_error(
|
||||
*(atp->result),retval,
|
||||
"Couldn't start the app for this result.\n",
|
||||
CLIENT_COMPUTING
|
||||
);
|
||||
}
|
||||
action = true;
|
||||
set_client_state_dirty("start_apps");
|
||||
app_started = time(0);
|
||||
|
|
|
@ -462,6 +462,8 @@ int CLIENT_STATE::handle_scheduler_reply(
|
|||
if (!retval) results.push_back(rp);
|
||||
rp->state = RESULT_NEW;
|
||||
nresults++;
|
||||
} else {
|
||||
fprintf(stderr, "ERROR: already have result %s\n", sr.results[i].name);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -484,7 +486,7 @@ int CLIENT_STATE::handle_scheduler_reply(
|
|||
set_client_state_dirty("handle_scheduler_reply");
|
||||
if (log_flags.state_debug) {
|
||||
printf("State after handle_scheduler_reply():\n");
|
||||
print_counts();
|
||||
print_summary();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -42,11 +42,16 @@ bool do_pass(APP app) {
|
|||
RESULT result;
|
||||
bool did_something = false;
|
||||
int retval;
|
||||
char buf[MAX_BLOB_SIZE];
|
||||
|
||||
wu.appid = app.id;
|
||||
wu.assimilate_state = ASSIMILATE_READY;
|
||||
while (!db_workunit_enum_app_assimilate_state(wu)) {
|
||||
did_something = true;
|
||||
|
||||
sprintf(buf, "Assimilating WU %s, assim state\n", wu.name, wu.assimilate_state);
|
||||
write_log(buf);
|
||||
|
||||
switch(wu.main_state) {
|
||||
case WU_MAIN_STATE_INIT:
|
||||
write_log("ERROR; WU shouldn't be in init state\n");
|
||||
|
@ -61,7 +66,9 @@ bool do_pass(APP app) {
|
|||
write_log("can't get canonical result\n");
|
||||
break;
|
||||
}
|
||||
printf("canonical result for WU %s:\n%s", wu.name, result.xml_doc_out);
|
||||
sprintf(buf, "canonical result for WU %s:\n%s", wu.name, result.xml_doc_out);
|
||||
write_log(buf);
|
||||
|
||||
result.file_delete_state = FILE_DELETE_READY;
|
||||
db_result_update(result);
|
||||
break;
|
||||
|
|
|
@ -105,13 +105,14 @@ int check_triggers(SCHED_SHMEM* ssp) {
|
|||
// - We must avoid excessive re-enumeration,
|
||||
// especially when the number of results is less than the array size.
|
||||
// Crude approach: if a "collision" (as above) occurred on
|
||||
// a pass through the array, wait a long time (60 sec)
|
||||
// a pass through the array, wait a long time (5 sec)
|
||||
//
|
||||
void feeder_loop(SCHED_SHMEM* ssp) {
|
||||
int i, j, nadditions, ncollisions, retval;
|
||||
RESULT result;
|
||||
WORKUNIT wu;
|
||||
bool no_wus, collision, restarted_enum;
|
||||
char buf[256];
|
||||
|
||||
while (1) {
|
||||
nadditions = 0;
|
||||
|
@ -120,6 +121,7 @@ void feeder_loop(SCHED_SHMEM* ssp) {
|
|||
restarted_enum = false;
|
||||
for (i=0; i<ssp->nwu_results; i++) {
|
||||
if (!ssp->wu_results[i].present) {
|
||||
try_again:
|
||||
result.server_state = RESULT_SERVER_STATE_UNSENT;
|
||||
retval = db_result_enum_server_state(result, RESULTS_PER_ENUM);
|
||||
if (retval) {
|
||||
|
@ -128,7 +130,7 @@ void feeder_loop(SCHED_SHMEM* ssp) {
|
|||
// there's no point in doing it again.
|
||||
//
|
||||
if (restarted_enum) {
|
||||
printf("feeder: already restarted enum\n");
|
||||
write_log("already restarted enum on this pass\n");
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -137,13 +139,18 @@ void feeder_loop(SCHED_SHMEM* ssp) {
|
|||
restarted_enum = true;
|
||||
result.server_state = RESULT_SERVER_STATE_UNSENT;
|
||||
retval = db_result_enum_server_state(result, RESULTS_PER_ENUM);
|
||||
printf("feeder: restarting enumeration: %d\n", retval);
|
||||
write_log("restarting enumeration\n");
|
||||
if (retval) {
|
||||
printf("feeder: enumeration returned nothing\n");
|
||||
write_log("enumeration restart returned nothing\n");
|
||||
no_wus = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (result.server_state != RESULT_SERVER_STATE_UNSENT) {
|
||||
sprintf(buf, "RESULT STATE CHANGED: %s\n", result.name);
|
||||
write_log(buf);
|
||||
goto try_again;
|
||||
}
|
||||
collision = false;
|
||||
for (j=0; j<ssp->nwu_results; j++) {
|
||||
if (ssp->wu_results[j].present
|
||||
|
@ -155,10 +162,12 @@ void feeder_loop(SCHED_SHMEM* ssp) {
|
|||
}
|
||||
}
|
||||
if (!collision) {
|
||||
printf("feeder: adding result %d in slot %d\n", result.id, i);
|
||||
sprintf(buf, "adding result %d in slot %d\n", result.id, i);
|
||||
write_log(buf);
|
||||
retval = db_workunit(result.workunitid, wu);
|
||||
if (retval) {
|
||||
printf("feeder: can't read workunit %d: %d\n", result.workunitid, retval);
|
||||
sprintf(buf, "can't read workunit %d: %d\n", result.workunitid, retval);
|
||||
write_log(buf);
|
||||
continue;
|
||||
}
|
||||
ssp->wu_results[i].result = result;
|
||||
|
@ -169,17 +178,18 @@ void feeder_loop(SCHED_SHMEM* ssp) {
|
|||
}
|
||||
}
|
||||
if (nadditions == 0) {
|
||||
printf("feeder: no results added\n");
|
||||
write_log("no results added\n");
|
||||
sleep(1);
|
||||
} else {
|
||||
printf("feeder: added %d results to array\n", nadditions);
|
||||
sprintf(buf, "added %d results to array\n", nadditions);
|
||||
write_log(buf);
|
||||
}
|
||||
if (no_wus) {
|
||||
printf("feeder: no results available\n");
|
||||
write_log("feeder: no results available\n");
|
||||
sleep(5);
|
||||
}
|
||||
if (ncollisions) {
|
||||
printf("feeder: some results already in array - sleeping\n");
|
||||
write_log("feeder: some results already in array - sleeping\n");
|
||||
sleep(5);
|
||||
}
|
||||
fflush(stdout);
|
||||
|
|
|
@ -252,7 +252,7 @@ int update_host_record(SCHEDULER_REQUEST& sreq, HOST& host) {
|
|||
|
||||
retval = db_host_update(host);
|
||||
if (retval) {
|
||||
sprintf(buf, "db_host_update: %d\n", retval);
|
||||
sprintf(buf, "db_host_update() failed: %d\n", retval);
|
||||
write_log(buf);
|
||||
}
|
||||
return 0;
|
||||
|
@ -313,65 +313,71 @@ int handle_results(
|
|||
//
|
||||
reply.result_acks.push_back(*rp);
|
||||
|
||||
sprintf(buf, "got ack for result %s\n", rp->name);
|
||||
write_log(buf);
|
||||
|
||||
strncpy(result.name, rp->name, sizeof(result.name));
|
||||
retval = db_result_lookup_name(result);
|
||||
if (retval) {
|
||||
printf("can't find result %s\n", rp->name);
|
||||
sprintf(buf, "can't find result %s\n", rp->name);
|
||||
write_log(buf);
|
||||
continue;
|
||||
}
|
||||
if (result.server_state != RESULT_SERVER_STATE_IN_PROGRESS) {
|
||||
sprintf(buf,
|
||||
"got unexpected result for %s: server state is %d\n",
|
||||
rp->name, result.server_state
|
||||
);
|
||||
write_log(buf);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (result.hostid != sreq.hostid) {
|
||||
sprintf(buf,
|
||||
"got result from wrong host: %d %d\n",
|
||||
result.hostid, sreq.hostid
|
||||
);
|
||||
write_log(buf);
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO: handle error returns
|
||||
//
|
||||
result.hostid = reply.host.id;
|
||||
result.received_time = time(0);
|
||||
result.client_state = rp->client_state;
|
||||
result.cpu_time = rp->cpu_time;
|
||||
result.claimed_credit = result.cpu_time * host.credit_per_cpu_sec;
|
||||
result.validate_state = VALIDATE_STATE_NEED_CHECK;
|
||||
if (result.client_state != CLIENT_DONE) {
|
||||
result.validate_state = VALIDATE_STATE_INVALID;
|
||||
//so we won't try to validate this result anymore
|
||||
result.server_state = RESULT_SERVER_STATE_ERROR;
|
||||
} else {
|
||||
if (result.server_state != RESULT_SERVER_STATE_IN_PROGRESS) {
|
||||
sprintf(buf,
|
||||
"got unexpected result for %s: server state is %d\n",
|
||||
rp->name, result.server_state
|
||||
);
|
||||
write_log(buf);
|
||||
continue;
|
||||
}
|
||||
result.server_state = RESULT_SERVER_STATE_DONE;
|
||||
}
|
||||
|
||||
if (result.hostid != sreq.hostid) {
|
||||
sprintf(buf,
|
||||
"got result from wrong host: %d %d\n",
|
||||
result.hostid, sreq.hostid
|
||||
);
|
||||
write_log(buf);
|
||||
continue;
|
||||
}
|
||||
|
||||
strncpy(result.stderr_out, rp->stderr_out, sizeof(result.stderr_out));
|
||||
strncpy(result.xml_doc_out, rp->xml_doc_out, sizeof(result.xml_doc_out));
|
||||
retval = db_result_update(result);
|
||||
if (retval) {
|
||||
sprintf(buf, "can't update result %d\n", result.id);
|
||||
write_log(buf);
|
||||
}
|
||||
|
||||
// TODO: handle error returns
|
||||
//
|
||||
result.hostid = reply.host.id;
|
||||
result.received_time = time(0);
|
||||
result.client_state = rp->client_state;
|
||||
result.cpu_time = rp->cpu_time;
|
||||
result.claimed_credit = result.cpu_time * host.credit_per_cpu_sec;
|
||||
result.validate_state = VALIDATE_STATE_NEED_CHECK;
|
||||
if (result.client_state != CLIENT_DONE) {
|
||||
result.validate_state = VALIDATE_STATE_INVALID;
|
||||
//so we won't try to validate this result anymore
|
||||
result.server_state = RESULT_SERVER_STATE_ERROR;
|
||||
} else {
|
||||
result.server_state = RESULT_SERVER_STATE_DONE;
|
||||
}
|
||||
|
||||
|
||||
strncpy(result.stderr_out, rp->stderr_out, sizeof(result.stderr_out));
|
||||
strncpy(result.xml_doc_out, rp->xml_doc_out, sizeof(result.xml_doc_out));
|
||||
retval = db_result_update(result);
|
||||
retval = db_workunit(result.workunitid, wu);
|
||||
if (retval) {
|
||||
sprintf(buf,
|
||||
"can't find WU %d for result %d\n",
|
||||
result.workunitid, result.id
|
||||
);
|
||||
write_log(buf);
|
||||
} else {
|
||||
wu.need_validate = 1;
|
||||
retval = db_workunit_update(wu);
|
||||
if (retval) {
|
||||
fprintf(stderr, "can't update result %d\n", result.id);
|
||||
}
|
||||
|
||||
retval = db_workunit(result.workunitid, wu);
|
||||
if (retval) {
|
||||
fprintf(stderr,
|
||||
"can't find WU %d for result %d\n",
|
||||
result.workunitid, result.id
|
||||
);
|
||||
} else {
|
||||
wu.need_validate = 1;
|
||||
retval = db_workunit_update(wu);
|
||||
if (retval) {
|
||||
write_log("Can't update WU\n");
|
||||
}
|
||||
write_log("Can't update WU\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -398,6 +404,10 @@ int send_work(
|
|||
int i, retval, nresults = 0, seconds_to_fill;
|
||||
WORKUNIT wu;
|
||||
RESULT result, result_copy;
|
||||
char buf[256];
|
||||
|
||||
sprintf(buf, "got request for %d seconds of work\n", sreq.work_req_seconds);
|
||||
write_log(buf);
|
||||
|
||||
seconds_to_fill = sreq.work_req_seconds;
|
||||
if (seconds_to_fill > MAX_SECONDS_TO_SEND) {
|
||||
|
@ -456,6 +466,9 @@ int send_work(
|
|||
if (nresults == MAX_WUS_TO_SEND) break;
|
||||
}
|
||||
|
||||
sprintf(buf, "sending %d results\n", nresults);
|
||||
write_log(buf);
|
||||
|
||||
if (nresults == 0) {
|
||||
strcpy(reply.message, "no work available");
|
||||
strcpy(reply.message_priority, "low");
|
||||
|
@ -569,6 +582,7 @@ void handle_request(
|
|||
SCHEDULER_REQUEST sreq;
|
||||
SCHEDULER_REPLY sreply;
|
||||
|
||||
write_log("Handling request\n");
|
||||
memset(&sreq, 0, sizeof(sreq));
|
||||
sreq.parse(fin);
|
||||
process_request(sreq, sreply, ss, code_sign_key);
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
<file_xfer/>
|
||||
<sched_ops/>
|
||||
<state_debug/>
|
||||
<poll_debug/>
|
||||
<task_debug/>
|
||||
<file_xfer_debug/>
|
||||
<sched_op_debug/>
|
||||
|
|
Loading…
Reference in New Issue