*** empty log message ***

svn path=/trunk/boinc/; revision=5070
This commit is contained in:
David Anderson 2005-01-11 05:18:34 +00:00
parent 8addf5b6f9
commit 71fec1defe
4 changed files with 118 additions and 82 deletions

View File

@ -22471,3 +22471,28 @@ David 10 Jan 2005
client/
client_types.h
cs_scheduler.C
David 10 Jan 2005
- locality scheduling changes:
Changed the algorithms a bit to avoid lots of DB queries
Invariant: for a given file/user pair,
results are sent in order of increasing ID
send_results_for_file():
1) of results for this file sent to this user,
find the one R with largest ID
2) find unsent result R2 for this file with next ID greater than R,
and (if one result per use per WU) different workunitid
3) if need more results, set R = R2 and go to 2)
send_new_file_work():
min_id = 0
while need more work
find unsent result R with id>min_id of least ID
min_id = R.id
(Note: this is necessary to preserve the invariant)
Changed make_more_work_for_file() to return zero for success
(BOINC convention)
db/
db_base.C,h
sched/
sched_locality.C

View File

@ -281,6 +281,12 @@ int DB_BASE::count(int& n, char* clause) {
return get_integer(query, n);
}
int DB_BASE::max_id(int& n, char* clause) {
char query[MAX_QUERY_LEN];
sprintf(query, "select max(id) from %s %s", table_name, clause);
return get_integer(query, n);
}
int DB_BASE::sum(double& x, char* field, char* clause) {
char query[MAX_QUERY_LEN];

View File

@ -89,6 +89,7 @@ public:
int enumerate(char* clause="", bool use_use_result=false);
int end_enumerate();
int count(int&, char* clause="");
int max_id(int&, char* clause="");
int sum(double&, char* field, char* clause="");
int get_double(char* query, double&);
int get_integer(char* query, int&);

View File

@ -116,45 +116,45 @@ static int possibly_send_result(
}
// Check with the WU generator to see if we can make some
// more WU for this file. Returns zero if can't make more work.
// Returns nonzero if it *might* have made more work (no way to
// be sure if it suceeded).
// more WU for this file.
// Returns nonzero if can't make more work.
// Returns zero if it *might* have made more work
// (no way to be sure if it suceeded).
//
int made_more_work_for_file(char* filename) {
int make_more_work_for_file(char* filename) {
char fullpath[512];
sprintf(fullpath, "../locality_scheduling/no_work_available/%s", filename);
FILE *fp=fopen(fullpath, "r");
if (fp) {
// since we found this file, it means that no work
// remains for this WU. So give up trying to interact
// with the WU generator.
// since we found this file, it means that no work remains for this WU.
// So give up trying to interact with the WU generator.
fclose(fp);
log_messages.printf(
SCHED_MSG_LOG::DEBUG,
"found %s indicating no work remaining for file %s\n", fullpath, filename
);
} else {
// open and touch a file in the need_work/
// directory as a way of indicating that we need work for this file.
// If this operation fails, don't worry or tarry!
//
sprintf(fullpath, "../locality_scheduling/need_work/%s", filename);
FILE *fp2=fopen(fullpath, "w");
if (fp2) {
fclose(fp2);
log_messages.printf(
SCHED_MSG_LOG::DEBUG,
"touching %s: need work for file %s\n", fullpath, filename
);
return 1;
} else {
log_messages.printf(
SCHED_MSG_LOG::CRITICAL,
"unable to touch %s to indicate need work for file %s\n", fullpath, filename
);
}
return -1;
}
// open and touch a file in the need_work/
// directory as a way of indicating that we need work for this file.
// If this operation fails, don't worry or tarry!
//
sprintf(fullpath, "../locality_scheduling/need_work/%s", filename);
FILE *fp2=fopen(fullpath, "w");
if (!fp2) {
log_messages.printf(
SCHED_MSG_LOG::CRITICAL,
"unable to touch %s to indicate need work for file %s\n", fullpath, filename
);
return -1;
}
fclose(fp2);
log_messages.printf(
SCHED_MSG_LOG::DEBUG,
"touching %s: need work for file %s\n", fullpath, filename
);
return 0;
}
@ -167,51 +167,60 @@ static int send_results_for_file(
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform,
WORK_REQ& wreq, SCHED_SHMEM& ss
) {
DB_RESULT result;
int lookup_retval=0, send_retval=0, lastid=0, i;
unsigned int j;
std::vector<int> excluded_wus;
DB_RESULT result, prev_result;
int retval, lookup_retval=0, send_retval=0, i, maxid;
char buf[256], query[65000];
// find largest ID of results already sent to this user for this file
//
sprintf(buf, "where userid=%d and name like '%s__%%'",
reply.user.id, filename
);
retval = result.max_id(maxid, buf);
if (retval) return retval;
// and look up that result
//
retval = prev_result.lookup_id(maxid);
if (retval) return retval;
nsent = 0;
for (i=0; i<100; i++) { // avoid infinite loop
if (!wreq.work_needed(reply)) break;
boinc_db.start_transaction();
// Look for results which match file 'filename'
sprintf(query,
"where name like '%s__%%' and server_state=%d",
filename, RESULT_SERVER_STATE_UNSENT
);
for (j=0; j<excluded_wus.size(); j++) {
sprintf(buf, " and workunitid<>%d ", excluded_wus[j]);
strcat(query, buf);
// find unsent result with next larger ID than previous largest ID
//
if (config.one_result_per_user_per_wu) {
// if one result per user per WU, insist on different WUID too
//
sprintf(query,
"where name like '%s__%%' and id>%d and workunitid<>%d and server_state=%d order by id limit 1 ",
filename, prev_result.id, prev_result.workunitid, RESULT_SERVER_STATE_UNSENT
);
} else {
sprintf(query,
"where name like '%s__%%' and id>%d and server_state=%d order by id limit 1 ",
filename, prev_result.id, RESULT_SERVER_STATE_UNSENT
);
}
strcat(query, " limit 1");
lookup_retval = result.lookup(query);
// if we see the same result twice, bail (avoid spinning)
//
if (!lookup_retval && (result.id == lastid)) lookup_retval = -1;
if (!lookup_retval) {
// We found a matching result.
// Probably we will get one of these,
// although for example if we already have a
// result for the same workunit and the administrator has
// set one_result_per_wu then we won't get one of these.
//
lastid = result.id;
send_retval = possibly_send_result(
result,
sreq, reply, platform, wreq, ss
result, sreq, reply, platform, wreq, ss
);
if (config.one_result_per_user_per_wu) {
excluded_wus.push_back(result.workunitid);
}
if (!send_retval) {
// if we couldn't send it, something's wacky;
// print a message, but keep on looking
//
if (send_retval) {
log_messages.printf(SCHED_MSG_LOG::NORMAL, "possibly_send_result(): %d\n", send_retval);
} else {
nsent++;
}
prev_result = result;
}
boinc_db.commit_transaction();
@ -232,27 +241,17 @@ static void send_new_file_work(
unsigned int j;
DB_RESULT result;
char filename[256];
std::vector<int> excluded_wus;
char buf[256], query[65000];
for (i=0; i<100; i++) { // avoid infinite loop
if (!wreq.work_needed(reply)) break;
boinc_db.start_transaction();
sprintf(query,
"where server_state=%d",
RESULT_SERVER_STATE_UNSENT
"where server_state=%d and id>%d order by id limit 1",
RESULT_SERVER_STATE_UNSENT, lastid
);
for (j=0; j<excluded_wus.size(); j++) {
sprintf(buf, " and workunitid<>%d", excluded_wus[j]);
strcat(query, buf);
}
strcat(query, " limit 1");
lookup_retval = result.lookup(query);
// if we see the same result twice, bail (avoid spinning)
//
if (!lookup_retval && (result.id == lastid)) lookup_retval = -1;
send_retval=0;
if (!lookup_retval) {
lastid = result.id;
@ -260,21 +259,23 @@ static void send_new_file_work(
result,
sreq, reply, platform, wreq, ss
);
log_messages.printf(SCHED_MSG_LOG::DEBUG, "possibly_send_result() gives retval=%d\n", send_retval);
if (config.one_result_per_user_per_wu) {
excluded_wus.push_back(result.workunitid);
}
log_messages.printf(SCHED_MSG_LOG::DEBUG,
"possibly_send_result(): %d\n", send_retval
);
}
boinc_db.commit_transaction();
log_messages.printf(SCHED_MSG_LOG::DEBUG, "lastid=%d last_wuid=%d\n", lastid, last_wuid);
log_messages.printf(SCHED_MSG_LOG::DEBUG,
"lastid=%d last_wuid=%d\n", lastid, last_wuid
);
if (lookup_retval) break;
lastid = result.id;
if (send_retval) continue;
// try to send more result w/ same file
// try to send more results w/ same file
//
retval = extract_filename(result.name, filename);
if (retval) {
@ -287,9 +288,12 @@ static void send_new_file_work(
send_results_for_file(
filename, nsent, sreq, reply, platform, wreq, ss
);
// if no result for that file, ask work generator to make more
//
if (!nsent && config.locality_scheduling_wait_period) {
// sleep a bit and try again
if (made_more_work_for_file(filename)) {
retval = make_more_work_for_file(filename);
if (!retval) {
sleep(config.locality_scheduling_wait_period);
send_results_for_file(
filename, nsent, sreq, reply, platform, wreq, ss
@ -319,8 +323,8 @@ void send_work_locality(
);
if (!nsent && config.locality_scheduling_wait_period) {
// sleep a bit and try again
if (made_more_work_for_file(fi.name)) {
retval = make_more_work_for_file(fi.name);
if (!retval) {
sleep(config.locality_scheduling_wait_period);
send_results_for_file(
fi.name, nsent, sreq, reply, platform, wreq, ss
@ -328,8 +332,8 @@ void send_work_locality(
}
}
// if we couldn't send any work for this file, tell client
// to delete it
// if we couldn't send any work for this file,
// tell client to delete it
//
if (nsent == 0) {
reply.file_deletes.push_back(fi);