From ce21de044da60e3989f6662a55d850e393aa711f Mon Sep 17 00:00:00 2001 From: David Anderson Date: Fri, 10 Sep 2004 23:03:13 +0000 Subject: [PATCH] *** empty log message *** svn path=/trunk/boinc/; revision=4185 --- sched/sched_locality.C | 219 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 219 insertions(+) create mode 100644 sched/sched_locality.C diff --git a/sched/sched_locality.C b/sched/sched_locality.C new file mode 100644 index 0000000000..568bc874f7 --- /dev/null +++ b/sched/sched_locality.C @@ -0,0 +1,219 @@ +// "Locality scheduling": a scheduling discipline in which +// there are large sticky input files, +// and many WUs share a single input file. +// If a host already has an input file, +// we try to send him another result that uses that file. +// +// The rules for using locality scheduling: +// - the input files must have names such that +// no name is a substring of another +// - result names must be of the form FILENAME__xxx + + +#include + +#include "boinc_db.h" + +#include "main.h" +#include "server_types.h" +#include "sched_shmem.h" +#include "sched_send.h" +#include "sched_msgs.h" +#include "sched_locality.h" + +static int extract_filename(char* in, char* out) { + strcpy(out, in); + char* p = strstr(out, "__"); + if (!p) return -1; + *p = 0; + return 0; +} + +// Find the app and app_version for the client's platform. +// +static int get_app_version( + WORKUNIT& wu, APP* &app, APP_VERSION* &avp, + SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform, + WORK_REQ& wreq, SCHED_SHMEM& ss +) { + bool found; + if (anonymous(platform)) { + app = ss.lookup_app(wu.appid); + found = sreq.has_version(*app); + if (!found) { + return -1; + } + avp = NULL; + } else { + found = find_app_version(wreq, wu, platform, ss, app, avp); + if (!found) { + return -1; + } + + // see if the core client is too old. + // don't bump the infeasible count because this + // isn't the result's fault + // + if (!app_core_compatible(wreq, *avp)) { + return -1; + } + } + return 0; +} + +// possibly send the client this result +// +static int possibly_send_result( + DB_RESULT& result, + SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform, + WORK_REQ& wreq, SCHED_SHMEM& ss +) { + DB_WORKUNIT wu; + DB_RESULT result2; + int retval, count; + char buf[256]; + APP* app; + APP_VERSION* avp; + + if (config.one_result_per_user_per_wu) { + retval = wu.lookup_id(result.workunitid); + if (retval) return retval; + sprintf(buf, "where userid=%d and workunitid=%d", reply.user.id, wu.id); + retval = result2.count(count, buf); + if (retval) return retval; + if (count > 0) return -1; + } + + retval = get_app_version( + wu, app, avp, + sreq, reply, platform, wreq, ss + ); + if (retval) return retval; + + retval = add_result_to_reply(result, wu, reply, platform, wreq, app, avp); + if (retval) return retval; + return 0; +} + +// The client has (or soon will have) the given file. +// Try to send it more results that use the same file +// +static int send_results_for_file( + char* filename, + int& nsent, + SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform, + WORK_REQ& wreq, SCHED_SHMEM& ss +) { + int lastid = 0; + DB_RESULT result; + int retval = 0; + char buf[256]; + + nsent = 0; + while (1) { + if (!wreq.work_needed(reply)) break; + boinc_db.start_transaction(); + sprintf(buf, + "where name like '%s%%' and server_state=%d and id>%d limit 1", + filename, RESULT_SERVER_STATE_UNSENT, lastid + ); + retval = result.lookup(buf); + if (!retval) { + lastid = result.id; + if (possibly_send_result( + result, + sreq, reply, platform, wreq, ss + )) { + nsent++; + } + } + boinc_db.commit_transaction(); + if (retval) break; + } + return 0; +} + +// The client doesn't have any files for which work is available. +// Pick new file(s) to send. +// +static void send_new_file_work( + SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform, + WORK_REQ& wreq, SCHED_SHMEM& ss +) { + int retval, lastid=0, nsent; + DB_RESULT result; + char filename[256]; + char buf[256]; + + while (1) { + if (!wreq.work_needed(reply)) break; + boinc_db.start_transaction(); + sprintf(buf, + "where server_state=%d and id>%d limit 1", + RESULT_SERVER_STATE_UNSENT, lastid + ); + retval = result.lookup(buf); + if (!retval) { + lastid = result.id; + possibly_send_result( + result, + sreq, reply, platform, wreq, ss + ); + } + boinc_db.commit_transaction(); + if (retval) break; + + // try to send more result w/ same file + // + retval = extract_filename(result.name, filename); + if (retval) { + log_messages.printf( + SCHED_MSG_LOG::CRITICAL, + "bad filename: %s", result.name + ); + continue; + } + send_results_for_file( + filename, nsent, + sreq, reply, platform, wreq, ss + ); + } +} + +void send_work_locality( + SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM& platform, + WORK_REQ& wreq, SCHED_SHMEM& ss +) { + unsigned int i; + int retval, nsent; + + if (sreq.file_infos.size() == 0) { + send_new_file_work( + sreq, reply, platform, wreq, ss + ); + } else { + for (i=0; i