// Berkeley Open Infrastructure for Network Computing // http://boinc.berkeley.edu // Copyright (C) 2005 University of California // // This is free software; you can redistribute it and/or // modify it under the terms of the GNU Lesser General Public // License as published by the Free Software Foundation; // either version 2.1 of the License, or (at your option) any later version. // // This software is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // See the GNU Lesser General Public License for more details. // // To view the GNU Lesser General Public License visit // http://www.gnu.org/copyleft/lesser.html // or write to the Free Software Foundation, Inc., // 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA // Feeder: create a shared memory segment containing DB info, // including an array of work items (results/workunits to send). // // Usage: feeder [ options ] // [ -d x ] debug level x // [ -random_order ] order by "random" field of result // [ -priority_order ] order by decreasing "priority" field of result // [ -priority_order_create_time ] // order by priority, then by increasing WU create time // [ -mod n i ] handle only results with (id mod n) == i // [ -wmod n i ] handle only workunits with (id mod n) == i // recommended if using HR with multiple schedulers // [ -sleep_interval x ] sleep x seconds if nothing to do // [ -allapps ] interleave results from all applications uniformly // [ -purge_stale x ] remove work items from the shared memory segment // that have been there for longer then x minutes // but haven't been assigned // // The feeder tries to keep the work array filled. // It maintains a DB enumerator (DB_WORK_ITEM). // scan_work_array() scans the work array. // looking for empty slots and trying to fill them in. // The enumeration may return results already in the array. // So, for each result, we scan the entire array to make sure // it's not there already (can this be streamlined?) // // The length of the enum (max and actual) and the number of empty // slots may differ; either one may be larger. // New jobs may arrive (from the transitioner at any time). // So we use the following policies: // // - Restart the enum at most once during a given array scan // - If a scan doesn't add anything (i.e. array is full, or nothing in DB) // sleep for N seconds // - If an enumerated job was already in the array, // stop the scan and sleep for N seconds // - Otherwise immediately start another scan // If -allapps is used: // - there are separate DB enumerators for each app // - the work array is interleaved by application, based on their weights. // slot_to_app[] maps slot (i.e. work array index) to app index. // app_count[] is the number of slots per app // (approximately proportional to its weight) // Homogeneous redundancy (HR): // If HR is used, jobs can either be "uncommitted" // (can send to any HR class) // or "committed" (can send only to one HR class). // The feeder tries to maintain a ratio of committed to uncommitted // (generally 50/50) and, of committed jobs, ratios between HR classes // (proportional to the total RAC of hosts in that class). // This is to maximize the likelihood of having work for an average host. // // If you use different HR types between apps, you must use -allapps. // Otherwise we wouldn't know how many slots to reserve for each HR type. // // It's OK to use HR for some apps and not others. // Trigger files: // The feeder program periodically checks for two trigger files: // // stop_server: destroy shmem and exit // leave trigger file there (for other daemons) // reread_db: update DB contents in existing shmem // delete trigger file // If you get an "Invalid argument" error when trying to run the feeder, // it is likely that you aren't able to allocate enough shared memory. // Either increase the maximum shared memory segment size in the kernel // configuration, or decrease the MAX_PLATFORMS, MAX_APPS // MAX_APP_VERSIONS, and MAX_WU_RESULTS in sched_shmem.h #include "config.h" #include #include #include #include #include #include #include #include #include #include #include using std::vector; #include "boinc_db.h" #include "shmem.h" #include "error_numbers.h" #include "synch.h" #include "util.h" #include "str_util.h" #include "sched_config.h" #include "sched_shmem.h" #include "sched_util.h" #include "sched_msgs.h" #include "hr_info.h" #define DEFAULT_SLEEP_INTERVAL 5 #define REREAD_DB_FILENAME "../reread_db" #define ENUM_FIRST_PASS 0 #define ENUM_SECOND_PASS 1 #define ENUM_OVER 2 SCHED_SHMEM* ssp; key_t sema_key; const char* order_clause=""; char mod_select_clause[256]; double sleep_interval = DEFAULT_SLEEP_INTERVAL; bool all_apps = false; int purge_stale_time = 0; int num_work_items = MAX_WU_RESULTS; int enum_limit = MAX_WU_RESULTS*2; int *enum_sizes; // if -allapps, the enum size per app; else not used int *app_indices; // if -allapps, this maps slot number to app index. // otherwise it's all zero int napps; // if -allapps, the number of apps // otherwise one HR_INFO hr_info; bool using_hr; // true iff any app is using HR // put this here (instead of hr_info.C) so that FCGI compile // won't choke on fscanf() // int PERF_INFO::read_file() { FILE* f = fopen(PERF_INFO_FILENAME, "r"); if (!f) return ERR_FOPEN; int n = fscanf(f, "%f %f", &host_fpops_mean, &host_fpops_stdev ); fclose(f); return 0; } void cleanup_shmem() { ssp->ready = false; detach_shmem((void*)ssp); destroy_shmem(config.shmem_key); } int check_reread_trigger() { FILE* f; f = fopen(REREAD_DB_FILENAME, "r"); if (f) { fclose(f); log_messages.printf(MSG_NORMAL, "Found trigger file %s; re-scanning database tables.\n", REREAD_DB_FILENAME ); ssp->init(num_work_items); ssp->scan_tables(); int retval = unlink(REREAD_DB_FILENAME); if (retval) { // if we can't remove trigger file, exit to avoid infinite loop // log_messages.printf(MSG_CRITICAL, "Can't unlink trigger file; exiting\n" ); } log_messages.printf(MSG_NORMAL, "Done re-scanning: trigger file removed.\n" ); } return 0; } // Count the # of slots used by HR classes. // This is done at the start of each array scan, // and doesn't reflect slots that have been emptied out by the scheduler // void hr_count_slots() { int i, j; for (i=1; imax_wu_results; i++) { int app_index = app_indices[i]; int hrt = ssp->apps[app_index].homogeneous_redundancy; if (!hrt) continue; WU_RESULT& wu_result = ssp->wu_results[i]; if (wu_result.state == WR_STATE_PRESENT) { int hrc = wu_result.workunit.hr_class; if (hrc < 0 || hrc >= hr_nclasses[hrt]) { log_messages.printf(MSG_CRITICAL, "HR class %d is out of range\n", hrc ); continue; } hr_info.cur_slots[hrt][hrc]++; } } } // Enumerate jobs from DB until find one that is not already in the work array. // If find one, return true. // If reach end of enum for second time on this array scan, return false // static bool get_job_from_db( DB_WORK_ITEM& wi, // if -allapps, array of enumerators; else just one int app_index, // if using -allapps, the app index int& enum_phase, int& ncollisions ) { bool collision; int retval, j, enum_size; char select_clause[256]; if (all_apps) { sprintf(select_clause, "%s and r1.appid=%d", mod_select_clause, ssp->apps[app_index].id ); enum_size = enum_sizes[app_index]; } else { strcpy(select_clause, mod_select_clause); enum_size = enum_limit; } int hrt = ssp->apps[app_index].homogeneous_redundancy; while (1) { if (hrt) { retval = wi.enumerate_all(enum_size, select_clause); } else { retval = wi.enumerate(enum_size, select_clause, order_clause); } if (retval) { if (retval != ERR_DB_NOT_FOUND) { // If DB server dies, exit; // so /start (run from crontab) will restart us eventually. // log_messages.printf(MSG_CRITICAL, "DB connection lost, exiting\n" ); exit(0); } // we've reach the end of the result set // switch (enum_phase) { case ENUM_FIRST_PASS: enum_phase = ENUM_SECOND_PASS; ncollisions = 0; // disregard collisions - maybe we'll find new jobs break; case ENUM_SECOND_PASS: enum_phase = ENUM_OVER; return false; } log_messages.printf(MSG_NORMAL, "restarted enumeration for appid %d\n", ssp->apps[app_index].id ); } else { // Check for invalid application ID // if (!ssp->lookup_app(wi.wu.appid)) { log_messages.printf(MSG_CRITICAL, "result [RESULT#%d] has bad appid %d; clean up your DB!\n", wi.res_id, wi.wu.appid ); exit(1); } // Check for collision (i.e. this result already is in the array) // collision = false; for (j=0; jmax_wu_results; j++) { if (ssp->wu_results[j].state != WR_STATE_EMPTY && ssp->wu_results[j].resultid == wi.res_id) { // If the result is already in shared mem, // and another instance of the WU has been sent, // bump the infeasible count to encourage // it to get sent more quickly // if (ssp->wu_results[j].infeasible_count == 0) { if (wi.wu.hr_class > 0) { ssp->wu_results[j].infeasible_count++; } } ncollisions++; collision = true; log_messages.printf(MSG_DEBUG, "result [RESULT#%d] already in array\n", wi.res_id ); break; } } if (collision) { continue; } // if using HR, check whether we've exceeded quota for this class // if (hrt) { if (!hr_info.accept(hrt, wi.wu.hr_class)) { log_messages.printf(MSG_DEBUG, "rejecting [RESULT#%d] because HR class %d/%d over quota\n", wi.res_id, hrt, wi.wu.hr_class ); continue; } } return true; } } return false; // never reached } // This function decides the interleaving used for -allapps. // Inputs: // n (number of weights) // k (length of vector) // a set of weights w(0)..w(n-1) // Outputs: // a vector v(0)..v(k-1) with values 0..n-1, // where each value occurs with the given weight, // and values are interleaved as much as possible. // a vector count(0)..count(n-1) saying how many times // each value occurs in v // void weighted_interleave(double* weights, int n, int k, int* v, int* count) { double *x = (double*) calloc(n, sizeof(double)); int i; for (i=0; i x[best]) { best = j; } } v[i] = best; x[best] -= 1/weights[best]; count[best]++; } free(x); } // update the job size statistics fields of array entries // static void update_stats() { int i, n=0; double sum=0, sum_sqr=0; for (i=0; imax_wu_results; i++) { WU_RESULT& wu_result = ssp->wu_results[i]; if (wu_result.state != WR_STATE_PRESENT) continue; n++; double e = wu_result.workunit.rsc_fpops_est; sum += e; sum_sqr += e*e; } double mean = sum/n; double stdev = sqrt((sum_sqr - sum*mean)/n); for (i=0; imax_wu_results; i++) { WU_RESULT& wu_result = ssp->wu_results[i]; if (wu_result.state != WR_STATE_PRESENT) continue; double e = wu_result.workunit.rsc_fpops_est; double diff = e - mean; wu_result.fpops_size = diff/stdev; } } // Make one pass through the work array, filling in empty slots. // Return true if we filled in any. // static bool scan_work_array(vector &work_items) { int i; bool found; int enum_phase[napps]; int app_index; int nadditions=0, ncollisions=0; for (i=0; imax_wu_results; i++) { app_index = app_indices[i]; if (enum_phase[app_index] == ENUM_OVER) continue; DB_WORK_ITEM& wi = work_items[app_index]; WU_RESULT& wu_result = ssp->wu_results[i]; switch (wu_result.state) { case WR_STATE_PRESENT: if (purge_stale_time && wu_result.time_added_to_shared_memory < (time(0) - purge_stale_time)) { wu_result.state = WR_STATE_EMPTY; log_messages.printf(MSG_NORMAL, "remove result [RESULT#%d] from slot %d because it is stale\n", wu_result.resultid, i ); // fall through, refill this array slot } else { break; } case WR_STATE_EMPTY: found = get_job_from_db( wi, app_index, enum_phase[app_index], ncollisions ); if (found) { log_messages.printf(MSG_NORMAL, "adding result [RESULT#%d] in slot %d\n", wi.res_id, i ); wu_result.resultid = wi.res_id; wu_result.result_priority = wi.res_priority; wu_result.workunit = wi.wu; wu_result.state = WR_STATE_PRESENT; // If the workunit has already been allocated to a certain // OS then it should be assigned quickly, // so we set its infeasible_count to 1 // if (wi.wu.hr_class > 0) { wu_result.infeasible_count = 1; } else { wu_result.infeasible_count = 0; } // If using the reliable mechanism, then set the results for // workunits older then the specificed time // as needing a reliable host // wu_result.need_reliable = 0; if (config.reliable_on_priority && wu_result.result_priority >= config.reliable_on_priority) { wu_result.need_reliable = true; } wu_result.time_added_to_shared_memory = time(0); nadditions++; } break; default: // here the state is a PID; see if it's still alive // int pid = wu_result.state; struct stat s; char buf[256]; sprintf(buf, "/proc/%d", pid); if (stat(buf, &s)) { wu_result.state = WR_STATE_PRESENT; log_messages.printf(MSG_NORMAL, "Result reserved by non-existent process PID %d; resetting\n", pid ); } } } log_messages.printf(MSG_DEBUG, "Added %d results to array\n", nadditions); if (ncollisions) { log_messages.printf(MSG_DEBUG, "%d results already in array\n", ncollisions ); return false; } if (nadditions == 0) { return false; } return true; } void feeder_loop() { vector work_items; for (int i=0; iready = true; if (!action) { log_messages.printf(MSG_DEBUG, "No action; sleeping %.2f sec\n", sleep_interval ); boinc_sleep(sleep_interval); } else { if (config.job_size_matching) { update_stats(); } } fflush(stdout); check_stop_daemons(); check_reread_trigger(); } } // see if we're using HR, and if so initialize the necessary data structures // void hr_init() { int i, retval; bool apps_differ = false; bool some_app_uses_hr = false; int hrt, hr_type0 = ssp->apps[0].homogeneous_redundancy; using_hr = false; for (i=0; inapps; i++) { hrt = ssp->apps[i].homogeneous_redundancy; if (hrt <0 || hrt >= HR_NTYPES) { log_messages.printf(MSG_CRITICAL, "HR type %d out of range for app %d\n", hrt, i ); exit(1); } if (hrt) some_app_uses_hr = true; if (hrt != hr_type0) apps_differ = true; } if (config.homogeneous_redundancy) { log_messages.printf(MSG_NORMAL, "config HR is %d\n", config.homogeneous_redundancy ); hrt = config.homogeneous_redundancy; if (hrt < 0 || hrt >= HR_NTYPES) { log_messages.printf(MSG_CRITICAL, "Main HR type %d out of range\n", hrt ); exit(1); } if (some_app_uses_hr) { log_messages.printf(MSG_CRITICAL, "You can specify HR at global or app level, but not both\n" ); exit(1); } for (i=0; inapps; i++) { ssp->apps[i].homogeneous_redundancy = config.homogeneous_redundancy; ssp->apps[i].weight = 1; } } else { if (some_app_uses_hr) { if (apps_differ && !all_apps) { log_messages.printf(MSG_CRITICAL, "You must use -allapps if apps have different HR\n" ); exit(1); } } else { return; // HR not being used } } using_hr = true; hr_info.init(); retval = hr_info.read_file(); if (retval) { log_messages.printf(MSG_CRITICAL, "Can't read HR info file: %d\n", retval ); exit(1); } // find the weight for each HR type // for (i=0; inapps; i++) { int hrt = ssp->apps[i].homogeneous_redundancy; hr_info.type_weights[hrt] += ssp->apps[i].weight; hr_info.type_being_used[hrt] = true; } // compute the slot allocations for HR classes // hr_info.allocate(ssp->max_wu_results); hr_info.show(stderr); } // write a summary of feeder state to stderr // void show_state(int) { ssp->show(stderr); hr_info.show(stderr); } int main(int argc, char** argv) { int i, retval; void* p; char path[256]; unlink(REREAD_DB_FILENAME); retval = config.parse_file(".."); if (retval) { log_messages.printf(MSG_CRITICAL, "Can't parse ../config.xml: %s\n", boincerror(retval) ); exit(1); } for (i=1; iinit(num_work_items); atexit(cleanup_shmem); install_stop_signal_handler(); retval = boinc_db.open( config.db_name, config.db_host, config.db_user, config.db_passwd ); if (retval) { log_messages.printf(MSG_CRITICAL, "boinc_db.open: %d; %s\n", retval, boinc_db.error_string() ); exit(1); } retval = boinc_db.set_isolation_level(READ_UNCOMMITTED); if (retval) { log_messages.printf(MSG_CRITICAL, "boinc_db.set_isolation_level: %d; %s\n", retval, boinc_db.error_string() ); } ssp->scan_tables(); log_messages.printf(MSG_NORMAL, "read " "%d platforms, " "%d apps, " "%d app_versions, " "%d assignments\n", ssp->nplatforms, ssp->napps, ssp->napp_versions, ssp->nassignments ); log_messages.printf(MSG_NORMAL, "Using %d job slots\n", ssp->max_wu_results ); app_indices = (int*) calloc(ssp->max_wu_results, sizeof(int)); // If all_apps is set, make an array saying which array slot // is associated with which app // if (all_apps) { napps = ssp->napps; enum_sizes = (int*) calloc(ssp->napps, sizeof(int)); double* weights = (double*) calloc(ssp->napps, sizeof(double)); int* counts = (int*) calloc(ssp->napps, sizeof(int)); if (ssp->app_weights == 0) { for (i=0; inapps; i++) { ssp->apps[i].weight = 1; } ssp->app_weights = ssp->napps; } for (i=0; inapps; i++) { weights[i] = ssp->apps[i].weight; } for (i=0; inapps; i++) { enum_sizes[i] = (int) floor(0.5 + enum_limit*(weights[i])/(ssp->app_weights)); } weighted_interleave( weights, ssp->napps, ssp->max_wu_results, app_indices, counts ); } else { napps = 1; } hr_init(); if (config.job_size_matching) { retval = ssp->perf_info.read_file(); if (retval) { log_messages.printf(MSG_CRITICAL, "can't read perf_info file; run census\n" ); exit(1); } } signal(SIGUSR1, show_state); feeder_loop(); } const char *BOINC_RCSID_57c87aa242 = "$Id$";