mirror of https://github.com/BOINC/boinc.git
*** empty log message ***
svn path=/trunk/boinc/; revision=5258
commit 4a0fb78aa6
parent 896d0b8a46
checkin_notes
@@ -23597,3 +23597,17 @@ David 31 Jan 2005
     handle_request.C
     sched_send.C
     server_types.C,h
+
+David 31 Jan 2005
+    - Made WORK_REQ into a member of SCHEDULER_REPLY.
+      This makes it available in handle_request()
+      in case you want to send user messages based on it.
+      Also reduces the number of structs that have to be
+      passed around everywhere.
+    - Fixed formatting in sched_locality.C.
+      Bruce: please use 4-space indentation, no tab chars
+
+    sched/
+        sched_locality.C,h
+        sched_send.C,h
+        server_types.C,y
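A minimal sketch of the change described above, for orientation only: it is
not the actual BOINC code. The member and method names are taken from the
server_types.h/.C and sched_send.C hunks below; the one-field HOST struct and
the shortened work_needed() body are assumptions made solely to keep the
sketch self-contained.

    #include <string.h>

    struct WORK_REQ {
        double seconds_to_fill;
        double disk_available;
        int nresults;
        bool daily_result_quota_exceeded;
        // ... the real struct has many more flags
    };

    struct HOST {
        int nresults_today;   // stand-in; the real HOST has many more fields
    };

    struct SCHEDULER_REPLY {
        WORK_REQ wreq;        // the work-request summary now lives in the reply
        HOST host;

        SCHEDULER_REPLY() {
            // mirrors the new constructor line added in server_types.C below
            memset(&wreq, 0, sizeof(wreq));
        }

        // replaces bool WORK_REQ::work_needed(SCHEDULER_REPLY&)
        bool work_needed() {
            if (wreq.seconds_to_fill <= 0) return false;
            if (wreq.disk_available <= 0) return false;
            return true;
        }
    };

Call sites drop the separate WORK_REQ& parameter and read reply.wreq instead,
so "if (!wreq.work_needed(reply)) break;" becomes
"if (!reply.work_needed()) break;", and handle_request() can inspect
reply.wreq afterwards to generate user messages.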
sched/sched_locality.C
@@ -52,7 +52,7 @@ static int extract_filename(char* in, char* out) {
 static int get_app_version(
     WORKUNIT& wu, APP* &app, APP_VERSION* &avp,
     SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform,
-    WORK_REQ& wreq, SCHED_SHMEM& ss
+    SCHED_SHMEM& ss
 ) {
     bool found;
     if (anonymous(platform)) {
@@ -63,14 +63,14 @@ static int get_app_version(
         }
         avp = NULL;
     } else {
-        found = find_app_version(wreq, wu, platform, ss, app, avp);
+        found = find_app_version(reply.wreq, wu, platform, ss, app, avp);
         if (!found) {
             return ERR_NO_APP_VERSION;
         }

         // see if the core client is too old.
         //
-        if (!app_core_compatible(wreq, *avp)) {
+        if (!app_core_compatible(reply.wreq, *avp)) {
             return ERR_NO_APP_VERSION;
         }
     }
@@ -85,7 +85,7 @@ static int get_app_version(
 static int possibly_send_result(
     DB_RESULT& result,
     SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform,
-    WORK_REQ& wreq, SCHED_SHMEM& ss
+    SCHED_SHMEM& ss
 ) {
     DB_WORKUNIT wu;
     DB_RESULT result2;
@@ -104,33 +104,34 @@ static int possibly_send_result(
     }

     retval = get_app_version(
-        wu, app, avp,
-        sreq, reply, platform, wreq, ss
+        wu, app, avp, sreq, reply, platform, ss
     );
     if (retval) return retval;

-    return add_result_to_reply(result, wu, reply, platform, wreq, app, avp);
+    return add_result_to_reply(result, wu, reply, platform, app, avp);
 }

 // returns true if the work generator can not make more work for this
 // file, false if it can.
 //
-bool work_generation_over(char *filename) {
+static bool work_generation_over(char *filename) {
     char fullpath[512];
     sprintf(fullpath, "../locality_scheduling/no_work_available/%s", filename);
     return boinc_file_exists(fullpath);
 }

 // returns zero on success, nonzero if didn't touch file
 //
 int touch_file(char *path) {
     FILE *fp;

-    if (boinc_file_exists(path))
+    if (boinc_file_exists(path)) {
         return 0;
+    }

     if ((fp=fopen(path, "w"))) {
-        fclose(fp);
-        return 0;
+        fclose(fp);
+        return 0;
     }

     return -1;
@@ -154,7 +155,7 @@ int make_more_work_for_file(char* filename) {
     );
     return -1;
 }
-
+
 // open and touch a file in the need_work/
 // directory as a way of indicating that we need work for this file.
 // If this operation fails, don't worry or tarry!
@@ -223,7 +224,7 @@ error_exit:
     return 1;
 }

-void flag_for_possible_removal(char* filename) {
+static void flag_for_possible_removal(char* filename) {
     char path[256];
     sprintf(path, "../locality_scheduling/working_set_removal/%s", filename);
     touch_file(path);
@@ -239,7 +240,7 @@ static int send_results_for_file(
     char* filename,
     int& nsent,
     SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform,
-    WORK_REQ& wreq, SCHED_SHMEM& ss,
+    SCHED_SHMEM& ss,
     bool in_working_set
 ) {
     DB_RESULT result, prev_result;
@@ -256,7 +257,7 @@ static int send_results_for_file(
     );
 #else
     sprintf(buf, "where userid=%d and name>'%s__' and name<'%s__~'",
-        reply.user.id, filename, filename
+        reply.user.id, filename, filename
     );
 #endif
     retval_max = result.max_id(maxid, buf);
@@ -271,7 +272,7 @@ static int send_results_for_file(
     for (i=0; i<100; i++) { // avoid infinite loop
         int query_retval;

-        if (!wreq.work_needed(reply)) break;
+        if (!reply.work_needed()) break;

         log_messages.printf(SCHED_MSG_LOG::DEBUG,
             "in_send_results_for_file(%s, %d) prev_result.id=%d\n", filename, i, prev_result.id
@@ -316,75 +317,76 @@ static int send_results_for_file(
         query_retval = result.lookup(query);

         if (query_retval) {
-            int make_work_retval;
-
-            // no unsent results are available for this file
-            //
-            boinc_db.commit_transaction();
+            int make_work_retval;
+
+            // no unsent results are available for this file
+            //
+            boinc_db.commit_transaction();

-            // see if no more work remains to be made for this file,
-            // or if an attempt to make more work fails.
-            //
-            make_work_retval=make_more_work_for_file(filename);
-            log_messages.printf(SCHED_MSG_LOG::DEBUG,
-                "make_more_work_for_file(%s, %d)=%d\n", filename, i, make_work_retval
+            // see if no more work remains to be made for this file,
+            // or if an attempt to make more work fails.
+            //
+            make_work_retval=make_more_work_for_file(filename);
+            log_messages.printf(SCHED_MSG_LOG::DEBUG,
+                "make_more_work_for_file(%s, %d)=%d\n", filename, i, make_work_retval
             );

-            if (make_work_retval) {
-                // can't make any more work for this file
-
-                if (config.one_result_per_user_per_wu) {
+            if (make_work_retval) {
+                // can't make any more work for this file
+
+                if (config.one_result_per_user_per_wu) {

-                    // do an EXPENSIVE db query
-                    char query[256];
+                    // do an EXPENSIVE db query
+                    char query[256];
 #ifdef USE_REGEXP
-                    sprintf(query,
-                        "where server_state=%d and name like '%s__%%' limit 1",
-                        RESULT_SERVER_STATE_UNSENT, filename
-                    );
+                    sprintf(query,
+                        "where server_state=%d and name like '%s__%%' limit 1",
+                        RESULT_SERVER_STATE_UNSENT, filename
+                    );
 #else
-                    sprintf(query,
-                        "where server_state=%d and name>'%s__' and name<'%s__~' limit 1",
-                        RESULT_SERVER_STATE_UNSENT, filename, filename
-                    );
+                    sprintf(query,
+                        "where server_state=%d and name>'%s__' and name<'%s__~' limit 1",
+                        RESULT_SERVER_STATE_UNSENT, filename, filename
+                    );
 #endif

-                    // re-using result -- do I need to clear it?
-                    if (!result.lookup(query)) {
-                        // some results remain -- but they are not suitable
-                        // for us because they must be for a WU that we have
-                        // already looked at.
-                        break;
-                    }
-                } // config.one_result_per_user_per_wu
+                    // re-using result -- do I need to clear it?
+                    if (!result.lookup(query)) {
+                        // some results remain -- but they are not suitable
+                        // for us because they must be for a WU that we have
+                        // already looked at.
+                        break;
+                    }
+                } // config.one_result_per_user_per_wu

-                // arrive here if and only if there exist no further
-                // unsent results for this file.
-                flag_for_possible_removal(filename);
-                log_messages.printf(SCHED_MSG_LOG::DEBUG,
-                    "No remaining work for file %s (%d), flagging for removal\n", filename, i
-                );
-                break;
-            } // make_work_retval
+                // arrive here if and only if there exist no further
+                // unsent results for this file.
+                flag_for_possible_removal(filename);
+                log_messages.printf(SCHED_MSG_LOG::DEBUG,
+                    "No remaining work for file %s (%d), flagging for removal\n", filename, i
+                );
+                break;
+            } // make_work_retval

-            // if the user has not configured us to wait and try
-            // again, we are finished.
-            //
-            if (!config.locality_scheduling_wait_period)
-                break;
+            // if the user has not configured us to wait and try
+            // again, we are finished.
+            //
+            if (!config.locality_scheduling_wait_period) {
+                break;
+            }

-            // wait a bit and try again to find a suitable unsent result
-            sleep(config.locality_scheduling_wait_period);
-
-        } // query_retval
-        else {
-            int retval_send;
+            // wait a bit and try again to find a suitable unsent result
+            sleep(config.locality_scheduling_wait_period);
+
+        } // query_retval
+        else {
+            int retval_send;

-            // we found an unsent result, so try sending it. This
-            // *should* always work.
-            //
+            // we found an unsent result, so try sending it. This
+            // *should* always work.
+            //
             retval_send = possibly_send_result(
-                result, sreq, reply, platform, wreq, ss
+                result, sreq, reply, platform, ss
             );
             boinc_db.commit_transaction();

@@ -395,22 +397,22 @@ static int send_results_for_file(
             // if we couldn't send it for other reason, something's wacky;
             // print a message, but keep on looking.

-            // David, this is NOT wacky. Consider the following
-            // scenario: WU A has result 1 and WU B has result 2.
-            // These are both sent to a host. Some time later, result
-            // 1 fails and the transitioner creates a new result,
-            // result 3 for WU A. Then the host requests a new
-            // result. The maximum result already sent to the host is
-            // 2. The next unsent result (sorted by ID) is #3. But
-            // since it is for WU A, and since the host has already
-            // gotten a result for WU A, it's infeasible. So I think
-            // this is only wacky if !one_wu_per_result_per_host.
-            if (!retval_send) {
-                nsent++;
-            } else if (!config.one_result_per_user_per_wu) {
-                log_messages.printf(SCHED_MSG_LOG::CRITICAL,
-                    "Database inconsistency? possibly_send_result(%d) failed for [RESULT#%d], returning %d\n",
-                    i, result.id, retval_send
+            // David, this is NOT wacky. Consider the following
+            // scenario: WU A has result 1 and WU B has result 2.
+            // These are both sent to a host. Some time later, result
+            // 1 fails and the transitioner creates a new result,
+            // result 3 for WU A. Then the host requests a new
+            // result. The maximum result already sent to the host is
+            // 2. The next unsent result (sorted by ID) is #3. But
+            // since it is for WU A, and since the host has already
+            // gotten a result for WU A, it's infeasible. So I think
+            // this is only wacky if !one_wu_per_result_per_host.
+            if (!retval_send) {
+                nsent++;
+            } else if (!config.one_result_per_user_per_wu) {
+                log_messages.printf(SCHED_MSG_LOG::CRITICAL,
+                    "Database inconsistency? possibly_send_result(%d) failed for [RESULT#%d], returning %d\n",
+                    i, result.id, retval_send
                 );
             }

@@ -439,7 +441,7 @@ static int send_results_for_file(
 //
 static int send_new_file_work_deterministic_seeded(
     SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform,
-    WORK_REQ& wreq, SCHED_SHMEM& ss, int& nsent, char *start_f, char *end_f
+    SCHED_SHMEM& ss, int& nsent, char *start_f, char *end_f
 ) {
     DB_RESULT result;
     char filename[256], min_resultname[256], query[1024];
@@ -451,9 +453,9 @@ static int send_new_file_work_deterministic_seeded(
     strcpy(min_resultname, start_f);
     while (1) {

-        // are we done with the search yet?
-        if (end_f && strcmp(min_resultname, end_f)>=0)
-            break;
+        // are we done with the search yet?
+        if (end_f && strcmp(min_resultname, end_f)>=0)
+            break;

         sprintf(query,
             "where server_state=%d and name>'%s' order by name limit 1",
@@ -464,12 +466,12 @@ static int send_new_file_work_deterministic_seeded(
         retval = extract_filename(result.name, filename);
         if (retval) return retval; // not locality scheduled, now what???

-        log_messages.printf(SCHED_MSG_LOG::DEBUG,
+        log_messages.printf(SCHED_MSG_LOG::DEBUG,
            "send_new_file_work_deterministic will try filename %s\n", filename
         );

         retval = send_results_for_file(
-            filename, nsent, sreq, reply, platform, wreq, ss, false
+            filename, nsent, sreq, reply, platform, ss, false
         );
         if (nsent>0) break;
         // construct a name which is lexically greater than the name of any result
@@ -485,36 +487,39 @@ static int send_new_file_work_deterministic_seeded(
 //
 static int send_new_file_work_deterministic(
     SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform,
-    WORK_REQ& wreq, SCHED_SHMEM& ss
+    SCHED_SHMEM& ss
 ) {
-    char start_filename[256];
-    int getfile_retval, nsent=0;
+    char start_filename[256];
+    int getfile_retval, nsent=0;

-    // get random filename as starting point for deterministic search
-    if ((getfile_retval = get_working_set_filename(start_filename)))
-        strcpy(start_filename, "");
+    // get random filename as starting point for deterministic search
+    if ((getfile_retval = get_working_set_filename(start_filename))) {
+        strcpy(start_filename, "");
+    }

-    // start deterministic search with randomly chosen filename, go to
-    // lexical maximum
-    send_new_file_work_deterministic_seeded(sreq, reply, platform, wreq, ss, nsent, start_filename, NULL);
-    if (nsent)
-        return 0;
+    // start deterministic search with randomly chosen filename, go to
+    // lexical maximum
+    send_new_file_work_deterministic_seeded(sreq, reply, platform, ss, nsent, start_filename, NULL);
+    if (nsent) {
+        return 0;
+    }

-    // continue deterministic search at lexically first possible
-    // filename, continue to randomly choosen one
-    if (!getfile_retval && wreq.work_needed(reply)) {
-        send_new_file_work_deterministic_seeded(sreq, reply, platform, wreq, ss, nsent, "", start_filename);
-        if (nsent)
-            return 0;
-    }
+    // continue deterministic search at lexically first possible
+    // filename, continue to randomly choosen one
+    if (!getfile_retval && reply.work_needed()) {
+        send_new_file_work_deterministic_seeded(sreq, reply, platform, ss, nsent, "", start_filename);
+        if (nsent) {
+            return 0;
+        }
+    }

-    return 1;
+    return 1;
 }


 static int send_new_file_work_working_set(
     SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform,
-    WORK_REQ& wreq, SCHED_SHMEM& ss
+    SCHED_SHMEM& ss
 ) {
     char filename[256];
     int retval, nsent;
@@ -527,48 +532,52 @@ static int send_new_file_work_working_set(
     );

     return send_results_for_file(
-        filename, nsent, sreq, reply, platform, wreq, ss, true
+        filename, nsent, sreq, reply, platform, ss, true
     );
 }

 // prototype
 static int send_old_work(
     SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform,
-    WORK_REQ& wreq, SCHED_SHMEM& ss, int timeout);
+    SCHED_SHMEM& ss, int timeout);

 // The host doesn't have any files for which work is available.
 // Pick new file to send. Returns nonzero if no work is available.
 //
 static int send_new_file_work(
     SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform,
-    WORK_REQ& wreq, SCHED_SHMEM& ss
+    SCHED_SHMEM& ss
 ) {

-    while (wreq.work_needed(reply)) {
-        int random_time=6*3600+rand()%(6*3600);
+    while (reply.work_needed()) {
+        int random_time=6*3600+rand()%(6*3600);

-        // send work that's been hanging around the queue for more than 6
-        // to 12 hours
-        log_messages.printf(SCHED_MSG_LOG::DEBUG,
-            "send_new_file_work() trying to send results created > %.1f hours ago\n", ((double)random_time)/3600.0);
-        send_old_work(sreq, reply, platform, wreq, ss, random_time);
+        // send work that's been hanging around the queue for more than 6
+        // to 12 hours
+        log_messages.printf(SCHED_MSG_LOG::DEBUG,
+            "send_new_file_work() trying to send results created > %.1f hours ago\n", ((double)random_time)/3600.0);
+        send_old_work(sreq, reply, platform, ss, random_time);

-        if (wreq.work_needed(reply)) {
-            log_messages.printf(SCHED_MSG_LOG::DEBUG,
-                "send_new_file_work() trying to send from working set\n");
-            send_new_file_work_working_set(sreq, reply, platform, wreq, ss);
-        }
+        if (reply.work_needed()) {
+            log_messages.printf(SCHED_MSG_LOG::DEBUG,
+                "send_new_file_work() trying to send from working set\n"
+            );
+            send_new_file_work_working_set(sreq, reply, platform, ss);
+        }

-        if (wreq.work_needed(reply)) {
-            log_messages.printf(SCHED_MSG_LOG::DEBUG,
-                "send_new_file_work() trying deterministic method\n");
-            if (send_new_file_work_deterministic(sreq, reply, platform, wreq, ss)) {
-                // if no work remains at all, we learn it here and return nonzero.
-                return 1;
-            }
+        if (reply.work_needed()) {
+            log_messages.printf(SCHED_MSG_LOG::DEBUG,
+                "send_new_file_work() trying deterministic method\n"
+            );
+            if (send_new_file_work_deterministic(sreq, reply, platform, ss)) {
+                // if no work remains at all,
+                // we learn it here and return nonzero.
+                //
+                return 1;
+            }
         }
     }
-    return 0;
+    return 0;
 }

@@ -577,7 +586,7 @@ static int send_new_file_work(
 // system?
 static int send_old_work(
     SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform,
-    WORK_REQ& wreq, SCHED_SHMEM& ss, int timeout
+    SCHED_SHMEM& ss, int timeout
 ) {
     char buf[1024], filename[256];
     int retval, extract_retval, nsent;
@@ -590,29 +599,25 @@ static int send_old_work(
     );
     retval = result.lookup(buf);
     if (!retval) {
-        retval = possibly_send_result(
-            result, sreq, reply, platform, wreq, ss
-        );
+        retval = possibly_send_result(result, sreq, reply, platform, ss);
         boinc_db.commit_transaction();
         if (!retval) {
-            log_messages.printf(SCHED_MSG_LOG::DEBUG,
-                "send_old_work(%s) send laggard result [RESULT#%d]\n", result.name, result.id
-            );
-            extract_retval=extract_filename(result.name, filename);
+            log_messages.printf(SCHED_MSG_LOG::DEBUG,
+                "send_old_work(%s) send laggard result [RESULT#%d]\n", result.name, result.id
+            );
+            extract_retval=extract_filename(result.name, filename);
             if (!extract_retval) {
                 send_results_for_file(
-                    filename, nsent,
-                    sreq, reply, platform, wreq, ss, false
+                    filename, nsent, sreq, reply, platform, ss, false
                 );
-            }
-            else {
-                // David, is this right? Is this the only place in
-                // the locality scheduler that non-locality work //
-                // gets done?
-                log_messages.printf(SCHED_MSG_LOG::DEBUG,
-                    "Note: sent NON-LOCALITY result %s\n", result.name
-                );
-            }
+            } else {
+                // David, is this right? Is this the only place in
+                // the locality scheduler that non-locality work //
+                // gets done?
+                log_messages.printf(SCHED_MSG_LOG::DEBUG,
+                    "Note: sent NON-LOCALITY result %s\n", result.name
+                );
+            }
         }

     } else {
@@ -626,7 +631,7 @@ static int send_old_work(

 void send_work_locality(
     SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform,
-    WORK_REQ& wreq, SCHED_SHMEM& ss
+    SCHED_SHMEM& ss
 ) {
     int i;
     int nsent, nfiles, j, k;
@@ -649,17 +654,17 @@ void send_work_locality(
     // send old work if there is any
     //
     if (config.locality_scheduling_send_timeout) {
-        send_old_work(sreq, reply, platform, wreq, ss, config.locality_scheduling_send_timeout);
+        send_old_work(sreq, reply, platform, ss, config.locality_scheduling_send_timeout);
     }

     // send work for existing files
     //
     for (i=0; i<(int)sreq.file_infos.size(); i++) {
         k = (i+j)%nfiles;
-        if (!wreq.work_needed(reply)) break;
+        if (!reply.work_needed()) break;
         FILE_INFO& fi = sreq.file_infos[k];
         send_results_for_file(
-            fi.name, nsent, sreq, reply, platform, wreq, ss, false
+            fi.name, nsent, sreq, reply, platform, ss, false
         );

         // if we couldn't send any work for this file, tell client to delete it
@@ -675,8 +680,8 @@ void send_work_locality(

     // send new files if needed
     //
-    if (wreq.work_needed(reply)) {
-        send_new_file_work(sreq, reply, platform, wreq, ss);
+    if (reply.work_needed()) {
+        send_new_file_work(sreq, reply, platform, ss);
     }
 }

sched/sched_locality.h
@@ -19,5 +19,5 @@

 extern void send_work_locality(
     SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform,
-    WORK_REQ& wreq, SCHED_SHMEM& ss
+    SCHED_SHMEM& ss
 );
sched/sched_send.C
@@ -302,7 +302,7 @@ bool app_core_compatible(WORK_REQ& wreq, APP_VERSION& av) {
 //
 int add_wu_to_reply(
     WORKUNIT& wu, SCHEDULER_REPLY& reply, PLATFORM& platform,
-    WORK_REQ& wreq, APP* app, APP_VERSION* avp
+    APP* app, APP_VERSION* avp
 ) {
     int retval;
     WORKUNIT wu2;
@@ -459,13 +459,13 @@ void unlock_sema() {
     unlock_semaphore(sema_key);
 }

-bool WORK_REQ::work_needed(SCHEDULER_REPLY& reply) {
-    if (seconds_to_fill <= 0) return false;
-    if (disk_available <= 0) return false;
-    if (nresults >= config.max_wus_to_send) return false;
+bool SCHEDULER_REPLY::work_needed() {
+    if (wreq.seconds_to_fill <= 0) return false;
+    if (wreq.disk_available <= 0) return false;
+    if (wreq.nresults >= config.max_wus_to_send) return false;
     if (config.daily_result_quota) {
-        if (reply.host.nresults_today >= config.daily_result_quota) {
-            daily_result_quota_exceeded = true;
+        if (host.nresults_today >= config.daily_result_quota) {
+            wreq.daily_result_quota_exceeded = true;
             return false;
         }
     }
@@ -474,15 +474,15 @@ bool WORK_REQ::work_needed(SCHEDULER_REPLY& reply) {

 int add_result_to_reply(
     DB_RESULT& result, WORKUNIT& wu, SCHEDULER_REPLY& reply, PLATFORM& platform,
-    WORK_REQ& wreq, APP* app, APP_VERSION* avp
+    APP* app, APP_VERSION* avp
 ) {
     int retval;
     double wu_seconds_filled;

-    retval = add_wu_to_reply(wu, reply, platform, wreq, app, avp);
+    retval = add_wu_to_reply(wu, reply, platform, app, avp);
     if (retval) return retval;

-    wreq.disk_available -= wu.rsc_disk_bound;
+    reply.wreq.disk_available -= wu.rsc_disk_bound;

     // update the result in DB
     //
@@ -525,8 +525,8 @@ int add_result_to_reply(
         );
     }
     reply.insert_result(result);
-    wreq.seconds_to_fill -= wu_seconds_filled;
-    wreq.nresults++;
+    reply.wreq.seconds_to_fill -= wu_seconds_filled;
+    reply.wreq.nresults++;
     reply.host.nresults_today++;
     return 0;
 }
@@ -536,7 +536,6 @@ int add_result_to_reply(
 // previously infeasible for some host
 //
 static void scan_work_array(
-    WORK_REQ& wreq,
     SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform,
     SCHED_SHMEM& ss
 ) {
@@ -548,14 +547,14 @@ static void scan_work_array(
     APP_VERSION* avp;
     bool found;

-    if (wreq.disk_available < 0) wreq.insufficient_disk = true;
+    if (reply.wreq.disk_available < 0) reply.wreq.insufficient_disk = true;

     lock_sema();

     rnd_off = rand() % ss.nwu_results;
     for (j=0; j<ss.nwu_results; j++) {
         i = (j+rnd_off) % ss.nwu_results;
-        if (!wreq.work_needed(reply)) break;
+        if (!reply.work_needed()) break;

         WU_RESULT& wu_result = ss.wu_results[i];

@@ -567,12 +566,12 @@ static void scan_work_array(
             continue;
         }

-        if (wreq.infeasible_only && (wu_result.infeasible_count==0)) {
+        if (reply.wreq.infeasible_only && (wu_result.infeasible_count==0)) {
             continue;
         }

-        if (wu_result.workunit.rsc_disk_bound > wreq.disk_available) {
-            wreq.insufficient_disk = true;
+        if (wu_result.workunit.rsc_disk_bound > reply.wreq.disk_available) {
+            reply.wreq.insufficient_disk = true;
             wu_result.infeasible_count++;
             continue;
         }
@@ -589,7 +588,7 @@ static void scan_work_array(
         //
         wu = wu_result.workunit;
         if (!wu_is_feasible(
-            wu, reply.host, wreq, sreq.resource_share_fraction,
+            wu, reply.host, reply.wreq, sreq.resource_share_fraction,
             sreq.estimated_delay
         )) {
             log_messages.printf(
@@ -611,7 +610,7 @@ static void scan_work_array(
             }
             avp = NULL;
         } else {
-            found = find_app_version(wreq, wu, platform, ss, app, avp);
+            found = find_app_version(reply.wreq, wu, platform, ss, app, avp);
             if (!found) {
                 wu_result.infeasible_count++;
                 continue;
@@ -621,7 +620,7 @@ static void scan_work_array(
             // don't bump the infeasible count because this
             // isn't the result's fault
             //
-            if (!app_core_compatible(wreq, *avp)) {
+            if (!app_core_compatible(reply.wreq, *avp)) {
                 continue;
             }
         }
@@ -663,7 +662,7 @@ static void scan_work_array(
         //
         if (config.homogeneous_redundancy || app->homogeneous_redundancy) {
             if (already_sent_to_different_platform(
-                sreq, wu_result.workunit, wreq
+                sreq, wu_result.workunit, reply.wreq
             )) {
                 goto dont_send;
             }
@@ -706,7 +705,7 @@ static void scan_work_array(
         //

         retval = add_result_to_reply(
-            result, wu, reply, platform, wreq, app, avp
+            result, wu, reply, platform, app, avp
         );
         if (!retval) goto done;

@@ -725,55 +724,53 @@ int send_work(
     SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform,
     SCHED_SHMEM& ss
 ) {
-    WORK_REQ wreq;
-
-    memset(&wreq, 0, sizeof(wreq));
     // ROMW: Reverting back to older implementation until all clients are 4.x
     // or higher.
 #if 1
-    wreq.disk_available = max_allowable_disk(sreq);
+    reply.wreq.disk_available = max_allowable_disk(sreq);
 #else
-    wreq.disk_available = sreq.project_disk_free;
+    reply.wreq.disk_available = sreq.project_disk_free;
 #endif
-    wreq.insufficient_disk = false;
-    wreq.insufficient_mem = false;
-    wreq.insufficient_speed = false;
-    wreq.no_app_version = false;
-    wreq.homogeneous_redundancy_reject = false;
-    wreq.daily_result_quota_exceeded = false;
-    wreq.core_client_version = sreq.core_client_major_version*100
+    reply.wreq.insufficient_disk = false;
+    reply.wreq.insufficient_mem = false;
+    reply.wreq.insufficient_speed = false;
+    reply.wreq.no_app_version = false;
+    reply.wreq.homogeneous_redundancy_reject = false;
+    reply.wreq.daily_result_quota_exceeded = false;
+    reply.wreq.core_client_version = sreq.core_client_major_version*100
         + sreq.core_client_minor_version;
-    wreq.nresults = 0;
+    reply.wreq.nresults = 0;

     log_messages.printf(
         SCHED_MSG_LOG::NORMAL,
         "[HOST#%d] got request for %f seconds of work; available disk %f GB\n",
-        reply.host.id, sreq.work_req_seconds, wreq.disk_available/1e9
+        reply.host.id, sreq.work_req_seconds, reply.wreq.disk_available/1e9
     );

     if (sreq.work_req_seconds <= 0) return 0;

-    wreq.seconds_to_fill = sreq.work_req_seconds;
-    if (wreq.seconds_to_fill > MAX_SECONDS_TO_SEND) {
-        wreq.seconds_to_fill = MAX_SECONDS_TO_SEND;
+    reply.wreq.seconds_to_fill = sreq.work_req_seconds;
+    if (reply.wreq.seconds_to_fill > MAX_SECONDS_TO_SEND) {
+        reply.wreq.seconds_to_fill = MAX_SECONDS_TO_SEND;
     }
-    if (wreq.seconds_to_fill < MIN_SECONDS_TO_SEND) {
-        wreq.seconds_to_fill = MIN_SECONDS_TO_SEND;
+    if (reply.wreq.seconds_to_fill < MIN_SECONDS_TO_SEND) {
+        reply.wreq.seconds_to_fill = MIN_SECONDS_TO_SEND;
     }

     if (config.locality_scheduling) {
-        wreq.infeasible_only = false;
-        send_work_locality(sreq, reply, platform, wreq, ss);
-        if (wreq.disk_available < 0)
-            wreq.insufficient_disk = true;
+        reply.wreq.infeasible_only = false;
+        send_work_locality(sreq, reply, platform, ss);
+        if (reply.wreq.disk_available < 0) {
+            reply.wreq.insufficient_disk = true;
+        }
     } else {
         // give priority to results that were infeasible for some other host
         //
-        wreq.infeasible_only = true;
-        scan_work_array(wreq, sreq, reply, platform, ss);
+        reply.wreq.infeasible_only = true;
+        scan_work_array(sreq, reply, platform, ss);

-        wreq.infeasible_only = false;
-        scan_work_array(wreq, sreq, reply, platform, ss);
+        reply.wreq.infeasible_only = false;
+        scan_work_array(sreq, reply, platform, ss);
     }

 #if 0
@@ -790,47 +787,47 @@ int send_work(

     log_messages.printf(
         SCHED_MSG_LOG::NORMAL, "[HOST#%d] Sent %d results\n",
-        reply.host.id, wreq.nresults
+        reply.host.id, reply.wreq.nresults
     );

-    if (wreq.nresults == 0) {
+    if (reply.wreq.nresults == 0) {
         reply.request_delay = 3600;
         USER_MESSAGE um("No work available", "high");
         reply.insert_message(um);
-        if (wreq.no_app_version) {
+        if (reply.wreq.no_app_version) {
             USER_MESSAGE um("(there was work for other platforms)", "high");
             reply.insert_message(um);
             reply.request_delay = 3600*24;
         }
-        if (wreq.insufficient_disk) {
+        if (reply.wreq.insufficient_disk) {
             USER_MESSAGE um(
                 "(there was work but you don't have enough disk space allocated)",
                 "high"
             );
             reply.insert_message(um);
         }
-        if (wreq.insufficient_mem) {
+        if (reply.wreq.insufficient_mem) {
             USER_MESSAGE um(
                 "(there was work but your computer doesn't have enough memory)",
                 "high"
             );
             reply.insert_message(um);
         }
-        if (wreq.insufficient_speed) {
+        if (reply.wreq.insufficient_speed) {
             USER_MESSAGE um(
                 "(there was work but your computer would not finish it before it is due",
                 "high"
             );
             reply.insert_message(um);
         }
-        if (wreq.homogeneous_redundancy_reject) {
+        if (reply.wreq.homogeneous_redundancy_reject) {
             USER_MESSAGE um(
                 "(there was work but it was committed to other platforms",
                 "high"
             );
             reply.insert_message(um);
         }
-        if (wreq.outdated_core) {
+        if (reply.wreq.outdated_core) {
             USER_MESSAGE um(
                 " (your core client is out of date - please upgrade)",
                 "high"
@@ -842,7 +839,7 @@ int send_work(
                 "Not sending work because core client is outdated\n"
             );
         }
-        if (wreq.daily_result_quota_exceeded) {
+        if (reply.wreq.daily_result_quota_exceeded) {
             USER_MESSAGE um("(daily quota exceeded)", "high");
             reply.insert_message(um);
             log_messages.printf(
sched/sched_send.h
@@ -23,7 +23,7 @@ extern int send_work(

 extern int add_result_to_reply(
     DB_RESULT& result, WORKUNIT& wu, SCHEDULER_REPLY& reply, PLATFORM&,
-    WORK_REQ& wreq, APP* app, APP_VERSION* avp
+    APP* app, APP_VERSION* avp
 );

 extern bool anonymous(PLATFORM&);
sched/server_types.C
@@ -298,6 +298,7 @@ int MSG_FROM_HOST_DESC::parse(FILE* fin) {
 }

 SCHEDULER_REPLY::SCHEDULER_REPLY() {
+    memset(&wreq, 0, sizeof(wreq));
     request_delay = 0;
     hostid = 0;
     send_global_prefs = false;
sched/server_types.h
@@ -27,7 +27,7 @@
 #include "result_state.h"
 #include "md5_file.h"

-// summary of a client's request for work
+// summary of a client's request for work, and our response to it
 //
 struct WORK_REQ {
     bool infeasible_only;
@@ -47,7 +47,6 @@ struct WORK_REQ {
     bool outdated_core;
     bool daily_result_quota_exceeded;

-    bool work_needed(struct SCHEDULER_REPLY&);
     void update_for_result(double seconds_filled);
 };

@@ -147,6 +146,7 @@ struct USER_MESSAGE {
 // you must do it in the constructor. Nothing is zeroed by default.
 //
 struct SCHEDULER_REPLY {
+    WORK_REQ wreq;
     int request_delay; // don't request again until this time elapses
     std::vector<USER_MESSAGE> messages;
     int hostid;
@@ -181,6 +181,7 @@ struct SCHEDULER_REPLY {
     void insert_workunit_unique(WORKUNIT&);
     void insert_result(RESULT&);
     void insert_message(USER_MESSAGE&);
+    bool work_needed();
 };

 #endif