From 71fec1defef198651878549ee212c55be5e74b89 Mon Sep 17 00:00:00 2001 From: David Anderson Date: Tue, 11 Jan 2005 05:18:34 +0000 Subject: [PATCH] *** empty log message *** svn path=/trunk/boinc/; revision=5070 --- checkin_notes | 25 ++++++ db/db_base.C | 6 ++ db/db_base.h | 1 + sched/sched_locality.C | 168 +++++++++++++++++++++-------------------- 4 files changed, 118 insertions(+), 82 deletions(-) diff --git a/checkin_notes b/checkin_notes index 0ba7093324..173dd721a2 100755 --- a/checkin_notes +++ b/checkin_notes @@ -22471,3 +22471,28 @@ David 10 Jan 2005 client/ client_types.h cs_scheduler.C + +David 10 Jan 2005 + - locality scheduling changes: + Changed the algorithms a bit to avoid lots of DB queries + Invariant: for a given file/user pair, + results are sent in order of increasing ID + send_results_for_file(): + 1) of results for this file sent to this user, + find the one R with largest ID + 2) find unsent result R2 for this file with next ID greater than R, + and (if one result per use per WU) different workunitid + 3) if need more results, set R = R2 and go to 2) + send_new_file_work(): + min_id = 0 + while need more work + find unsent result R with id>min_id of least ID + min_id = R.id + (Note: this is necessary to preserve the invariant) + Changed make_more_work_for_file() to return zero for success + (BOINC convention) + + db/ + db_base.C,h + sched/ + sched_locality.C diff --git a/db/db_base.C b/db/db_base.C index e6505c433a..fcb39ac026 100644 --- a/db/db_base.C +++ b/db/db_base.C @@ -281,6 +281,12 @@ int DB_BASE::count(int& n, char* clause) { return get_integer(query, n); } +int DB_BASE::max_id(int& n, char* clause) { + char query[MAX_QUERY_LEN]; + sprintf(query, "select max(id) from %s %s", table_name, clause); + return get_integer(query, n); +} + int DB_BASE::sum(double& x, char* field, char* clause) { char query[MAX_QUERY_LEN]; diff --git a/db/db_base.h b/db/db_base.h index fd0768c95f..431fcef8bf 100644 --- a/db/db_base.h +++ b/db/db_base.h @@ -89,6 +89,7 @@ public: int enumerate(char* clause="", bool use_use_result=false); int end_enumerate(); int count(int&, char* clause=""); + int max_id(int&, char* clause=""); int sum(double&, char* field, char* clause=""); int get_double(char* query, double&); int get_integer(char* query, int&); diff --git a/sched/sched_locality.C b/sched/sched_locality.C index 06edfa3e74..331d8ef5b7 100644 --- a/sched/sched_locality.C +++ b/sched/sched_locality.C @@ -116,45 +116,45 @@ static int possibly_send_result( } // Check with the WU generator to see if we can make some -// more WU for this file. Returns zero if can't make more work. -// Returns nonzero if it *might* have made more work (no way to -// be sure if it suceeded). +// more WU for this file. +// Returns nonzero if can't make more work. +// Returns zero if it *might* have made more work +// (no way to be sure if it suceeded). // -int made_more_work_for_file(char* filename) { +int make_more_work_for_file(char* filename) { char fullpath[512]; sprintf(fullpath, "../locality_scheduling/no_work_available/%s", filename); FILE *fp=fopen(fullpath, "r"); if (fp) { - // since we found this file, it means that no work - // remains for this WU. So give up trying to interact - // with the WU generator. + // since we found this file, it means that no work remains for this WU. + // So give up trying to interact with the WU generator. fclose(fp); log_messages.printf( SCHED_MSG_LOG::DEBUG, "found %s indicating no work remaining for file %s\n", fullpath, filename ); - } else { - // open and touch a file in the need_work/ - // directory as a way of indicating that we need work for this file. - // If this operation fails, don't worry or tarry! - // - sprintf(fullpath, "../locality_scheduling/need_work/%s", filename); - FILE *fp2=fopen(fullpath, "w"); - if (fp2) { - fclose(fp2); - log_messages.printf( - SCHED_MSG_LOG::DEBUG, - "touching %s: need work for file %s\n", fullpath, filename - ); - return 1; - } else { - log_messages.printf( - SCHED_MSG_LOG::CRITICAL, - "unable to touch %s to indicate need work for file %s\n", fullpath, filename - ); - } + return -1; } + + // open and touch a file in the need_work/ + // directory as a way of indicating that we need work for this file. + // If this operation fails, don't worry or tarry! + // + sprintf(fullpath, "../locality_scheduling/need_work/%s", filename); + FILE *fp2=fopen(fullpath, "w"); + if (!fp2) { + log_messages.printf( + SCHED_MSG_LOG::CRITICAL, + "unable to touch %s to indicate need work for file %s\n", fullpath, filename + ); + return -1; + } + fclose(fp2); + log_messages.printf( + SCHED_MSG_LOG::DEBUG, + "touching %s: need work for file %s\n", fullpath, filename + ); return 0; } @@ -167,51 +167,60 @@ static int send_results_for_file( SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform, WORK_REQ& wreq, SCHED_SHMEM& ss ) { - DB_RESULT result; - int lookup_retval=0, send_retval=0, lastid=0, i; - unsigned int j; - std::vector excluded_wus; + DB_RESULT result, prev_result; + int retval, lookup_retval=0, send_retval=0, i, maxid; char buf[256], query[65000]; + // find largest ID of results already sent to this user for this file + // + sprintf(buf, "where userid=%d and name like '%s__%%'", + reply.user.id, filename + ); + retval = result.max_id(maxid, buf); + if (retval) return retval; + + // and look up that result + // + retval = prev_result.lookup_id(maxid); + if (retval) return retval; + nsent = 0; for (i=0; i<100; i++) { // avoid infinite loop if (!wreq.work_needed(reply)) break; boinc_db.start_transaction(); - // Look for results which match file 'filename' - sprintf(query, - "where name like '%s__%%' and server_state=%d", - filename, RESULT_SERVER_STATE_UNSENT - ); - for (j=0; j%d ", excluded_wus[j]); - strcat(query, buf); + // find unsent result with next larger ID than previous largest ID + // + if (config.one_result_per_user_per_wu) { + + // if one result per user per WU, insist on different WUID too + // + sprintf(query, + "where name like '%s__%%' and id>%d and workunitid<>%d and server_state=%d order by id limit 1 ", + filename, prev_result.id, prev_result.workunitid, RESULT_SERVER_STATE_UNSENT + ); + } else { + sprintf(query, + "where name like '%s__%%' and id>%d and server_state=%d order by id limit 1 ", + filename, prev_result.id, RESULT_SERVER_STATE_UNSENT + ); } - strcat(query, " limit 1"); lookup_retval = result.lookup(query); - // if we see the same result twice, bail (avoid spinning) - // - if (!lookup_retval && (result.id == lastid)) lookup_retval = -1; - if (!lookup_retval) { - // We found a matching result. - // Probably we will get one of these, - // although for example if we already have a - // result for the same workunit and the administrator has - // set one_result_per_wu then we won't get one of these. - // - lastid = result.id; send_retval = possibly_send_result( - result, - sreq, reply, platform, wreq, ss + result, sreq, reply, platform, wreq, ss ); - if (config.one_result_per_user_per_wu) { - excluded_wus.push_back(result.workunitid); - } - if (!send_retval) { + + // if we couldn't send it, something's wacky; + // print a message, but keep on looking + // + if (send_retval) { + log_messages.printf(SCHED_MSG_LOG::NORMAL, "possibly_send_result(): %d\n", send_retval); + } else { nsent++; } + prev_result = result; } boinc_db.commit_transaction(); @@ -232,27 +241,17 @@ static void send_new_file_work( unsigned int j; DB_RESULT result; char filename[256]; - std::vector excluded_wus; char buf[256], query[65000]; for (i=0; i<100; i++) { // avoid infinite loop if (!wreq.work_needed(reply)) break; boinc_db.start_transaction(); sprintf(query, - "where server_state=%d", - RESULT_SERVER_STATE_UNSENT + "where server_state=%d and id>%d order by id limit 1", + RESULT_SERVER_STATE_UNSENT, lastid ); - for (j=0; j%d", excluded_wus[j]); - strcat(query, buf); - } - strcat(query, " limit 1"); lookup_retval = result.lookup(query); - // if we see the same result twice, bail (avoid spinning) - // - if (!lookup_retval && (result.id == lastid)) lookup_retval = -1; - send_retval=0; if (!lookup_retval) { lastid = result.id; @@ -260,21 +259,23 @@ static void send_new_file_work( result, sreq, reply, platform, wreq, ss ); - log_messages.printf(SCHED_MSG_LOG::DEBUG, "possibly_send_result() gives retval=%d\n", send_retval); - - if (config.one_result_per_user_per_wu) { - excluded_wus.push_back(result.workunitid); - } + log_messages.printf(SCHED_MSG_LOG::DEBUG, + "possibly_send_result(): %d\n", send_retval + ); } boinc_db.commit_transaction(); - log_messages.printf(SCHED_MSG_LOG::DEBUG, "lastid=%d last_wuid=%d\n", lastid, last_wuid); + log_messages.printf(SCHED_MSG_LOG::DEBUG, + "lastid=%d last_wuid=%d\n", lastid, last_wuid + ); if (lookup_retval) break; + lastid = result.id; + if (send_retval) continue; - // try to send more result w/ same file + // try to send more results w/ same file // retval = extract_filename(result.name, filename); if (retval) { @@ -287,9 +288,12 @@ static void send_new_file_work( send_results_for_file( filename, nsent, sreq, reply, platform, wreq, ss ); + + // if no result for that file, ask work generator to make more + // if (!nsent && config.locality_scheduling_wait_period) { - // sleep a bit and try again - if (made_more_work_for_file(filename)) { + retval = make_more_work_for_file(filename); + if (!retval) { sleep(config.locality_scheduling_wait_period); send_results_for_file( filename, nsent, sreq, reply, platform, wreq, ss @@ -319,8 +323,8 @@ void send_work_locality( ); if (!nsent && config.locality_scheduling_wait_period) { - // sleep a bit and try again - if (made_more_work_for_file(fi.name)) { + retval = make_more_work_for_file(fi.name); + if (!retval) { sleep(config.locality_scheduling_wait_period); send_results_for_file( fi.name, nsent, sreq, reply, platform, wreq, ss @@ -328,8 +332,8 @@ void send_work_locality( } } - // if we couldn't send any work for this file, tell client - // to delete it + // if we couldn't send any work for this file, + // tell client to delete it // if (nsent == 0) { reply.file_deletes.push_back(fi);