// This file is part of BOINC.
// http://boinc.berkeley.edu
// Copyright (C) 2008 University of California
//
// BOINC is free software; you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License
// as published by the Free Software Foundation,
// either version 3 of the License, or (at your option) any later version.
//
// BOINC is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with BOINC. If not, see .
// Feeder: create a shared memory segment containing DB info,
// including an array of work items (results/workunits to send).
//
// Usage: feeder [ options ]
// [ -d x ] debug level x
// [ --allapps ] interleave results from all applications uniformly
// [ --by_batch ] interleave results from all batches uniformly
// [ --random_order ] order by "random" field of result
// [ --priority_order ] order by decreasing "priority" field of result
// [ --priority_asc ] order by increasing "priority" field of result
// [ --priority_order_create_time ]
// order by priority, then by increasing WU create time
// [ --mod n i ] handle only results with (id mod n) == i
// [ --wmod n i ] handle only workunits with (id mod n) == i
// recommended if using HR with multiple schedulers
// [ --sleep_interval x ] sleep x seconds if nothing to do
// [ --appids a1{,a2} ] get work only for appids a1,...
// (comma-separated list)
// [ --purge_stale x ] remove work items from the shared memory segment
// that have been there for longer then x minutes
// but haven't been assigned
//
// The feeder tries to keep the work array filled.
// It maintains a DB enumerator (DB_WORK_ITEM).
// scan_work_array() scans the work array.
// looking for empty slots and trying to fill them in.
// The enumeration may return results already in the array.
// So, for each result, we scan the entire array to make sure
// it's not there already (can this be streamlined?)
//
// The length of the enum (max and actual) and the number of empty
// slots may differ; either one may be larger.
// New jobs may arrive (from the transitioner at any time).
// So we use the following policies:
//
// - Restart the enum at most once during a given array scan
// - If a scan doesn't add anything (i.e. array is full, or nothing in DB)
// sleep for N seconds
// - If an enumerated job was already in the array,
// stop the scan and sleep for N seconds
// - Otherwise immediately start another scan
// If --allapps is used:
// - there are separate DB enumerators for each app
// - the work array is interleaved by application, based on their weights.
// slot_to_app[] maps slot (i.e. work array index) to app index.
// app_count[] is the number of slots per app
// (approximately proportional to its weight)
// Homogeneous redundancy (HR):
// If HR is used, jobs can either be "uncommitted"
// (can send to any HR class)
// or "committed" (can send only to one HR class).
// The feeder tries to maintain a ratio of committed to uncommitted
// (generally 50/50) and, of committed jobs, ratios between HR classes
// (proportional to the total RAC of hosts in that class).
// This is to maximize the likelihood of having work for an average host.
//
// If you use different HR types between apps, you must use --allapps.
// Otherwise we wouldn't know how many slots to reserve for each HR type.
//
// It's OK to use HR for some apps and not others.
// Trigger files:
// The feeder program periodically checks for two trigger files:
//
// stop_server: destroy shmem and exit
// leave trigger file there (for other daemons)
// reread_db: update DB contents in existing shmem
// delete trigger file
// If you get an "Invalid argument" error when trying to run the feeder,
// it is likely that you aren't able to allocate enough shared memory.
// Either increase the maximum shared memory segment size in the kernel
// configuration, or decrease the MAX_PLATFORMS, MAX_APPS
// MAX_APP_VERSIONS, and MAX_WU_RESULTS in sched_shmem.h
#include "config.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
using std::vector;
#include "boinc_db.h"
#include "error_numbers.h"
#include "filesys.h"
#include "shmem.h"
#include "str_util.h"
#include "svn_version.h"
#include "synch.h"
#include "util.h"
#include "version.h"
#include "credit.h"
#include "sched_config.h"
#include "sched_shmem.h"
#include "sched_util.h"
#include "sched_msgs.h"
#include "hr_info.h"
#ifdef GCL_SIMULATOR
#include "gcl_simulator.h"
#endif
#define DEFAULT_SLEEP_INTERVAL 5
#define AV_UPDATE_PERIOD 600
#define REREAD_DB_FILENAME "reread_db"
#define ENUM_FIRST_PASS 0
#define ENUM_SECOND_PASS 1
#define ENUM_OVER 2
SCHED_SHMEM* ssp;
key_t sema_key;
const char* order_clause="";
char mod_select_clause[256];
int sleep_interval = DEFAULT_SLEEP_INTERVAL;
bool all_apps = false;
int purge_stale_time = 0;
int num_work_items = MAX_WU_RESULTS;
int enum_limit = MAX_WU_RESULTS*2;
// The following defined if --allapps:
int *enum_sizes;
// the enum size per app; else not used
int *app_indices;
// maps slot number to app index, else all zero
int napps;
// number of apps, else one
HR_INFO hr_info;
bool using_hr;
// true iff any app is using HR
bool is_main_feeder = true;
// false if using --mod or --wmod and this one isn't 0
void signal_handler(int) {
log_messages.printf(MSG_NORMAL, "Signaled by simulator\n");
return;
}
void cleanup_shmem() {
ssp->ready = false;
detach_shmem((void*)ssp);
destroy_shmem(config.shmem_key);
}
int check_reread_trigger() {
FILE* f;
f = fopen(config.project_path(REREAD_DB_FILENAME), "r");
if (f) {
fclose(f);
log_messages.printf(MSG_NORMAL,
"Found trigger file %s; re-scanning database tables.\n",
REREAD_DB_FILENAME
);
ssp->init(num_work_items);
ssp->scan_tables();
ssp->perf_info.get_from_db();
int retval = unlink(config.project_path(REREAD_DB_FILENAME));
if (retval) {
// if we can't remove trigger file, exit to avoid infinite loop
//
log_messages.printf(MSG_CRITICAL,
"Can't unlink trigger file; exiting\n"
);
}
log_messages.printf(MSG_NORMAL,
"Done re-scanning: trigger file removed.\n"
);
}
return 0;
}
// Count the # of slots used by HR classes.
// This is done at the start of each array scan,
// and doesn't reflect slots that have been emptied out by the scheduler
//
void hr_count_slots() {
int i, j;
for (i=1; imax_wu_results; i++) {
int app_index = app_indices[i];
int hrt = ssp->apps[app_index].homogeneous_redundancy;
if (!hrt) continue;
WU_RESULT& wu_result = ssp->wu_results[i];
if (wu_result.state == WR_STATE_PRESENT) {
int hrc = wu_result.workunit.hr_class;
if (hrc < 0 || hrc >= hr_nclasses[hrt]) {
log_messages.printf(MSG_CRITICAL,
"HR class %d is out of range\n", hrc
);
continue;
}
hr_info.cur_slots[hrt][hrc]++;
}
}
}
// Enumerate jobs from DB until find one that is not already in the work array.
// If find one, return true.
// If reach end of enum for second time on this array scan, return false
//
static bool get_job_from_db(
DB_WORK_ITEM& wi, // enumerator to get job from
int app_index, // if using --allapps, the app index
int& enum_phase,
int& ncollisions
) {
bool collision;
int retval, j, enum_size;
char select_clause[256];
if (all_apps) {
sprintf(select_clause, "%s and r1.appid=%d",
mod_select_clause, ssp->apps[app_index].id
);
enum_size = enum_sizes[app_index];
} else {
safe_strcpy(select_clause, mod_select_clause);
enum_size = enum_limit;
}
int hrt = ssp->apps[app_index].homogeneous_redundancy;
while (1) {
if (hrt && config.hr_allocate_slots) {
retval = wi.enumerate_all(enum_size, select_clause);
} else {
retval = wi.enumerate(enum_size, select_clause, order_clause);
}
if (retval) {
if (retval != ERR_DB_NOT_FOUND) {
// If DB server dies, exit;
// so /start (run from crontab) will restart us eventually.
//
log_messages.printf(MSG_CRITICAL,
"DB connection lost, exiting\n"
);
exit(0);
}
// we've reach the end of the result set
//
switch (enum_phase) {
case ENUM_FIRST_PASS:
enum_phase = ENUM_SECOND_PASS;
ncollisions = 0;
// disregard collisions - maybe we'll find new jobs
break;
case ENUM_SECOND_PASS:
enum_phase = ENUM_OVER;
return false;
}
log_messages.printf(MSG_NORMAL,
"restarted enumeration for appid %d\n",
ssp->apps[app_index].id
);
} else {
// Check for invalid application ID
//
if (!ssp->lookup_app(wi.wu.appid)) {
#if 0
log_messages.printf(MSG_CRITICAL,
"result [RESULT#%u] has bad appid %d; clean up your DB!\n",
wi.res_id, wi.wu.appid
);
#endif
continue;
}
// if the WU had an error, mark result as DIDNT_NEED
//
if (wi.wu.error_mask) {
char buf[256];
DB_RESULT result;
result.id = wi.res_id;
sprintf(buf, "server_state=%d, outcome=%d",
RESULT_SERVER_STATE_OVER,
RESULT_OUTCOME_DIDNT_NEED
);
result.update_field(buf);
log_messages.printf(MSG_NORMAL,
"[RESULT#%u] WU had error, marking as DIDNT_NEED\n",
wi.res_id
);
continue;
}
// Check for collision (i.e. this result already is in the array)
//
collision = false;
for (j=0; jmax_wu_results; j++) {
if (ssp->wu_results[j].state != WR_STATE_EMPTY && ssp->wu_results[j].resultid == wi.res_id) {
// If the result is already in shared mem,
// and another instance of the WU has been sent,
// bump the infeasible count to encourage
// it to get sent more quickly
//
if (ssp->wu_results[j].infeasible_count == 0) {
if (wi.wu.hr_class > 0) {
ssp->wu_results[j].infeasible_count++;
}
}
ncollisions++;
collision = true;
log_messages.printf(MSG_DEBUG,
"result [RESULT#%u] already in array\n", wi.res_id
);
break;
}
}
if (collision) {
continue;
}
// if using HR, check whether we've exceeded quota for this class
//
if (hrt && config.hr_allocate_slots) {
if (!hr_info.accept(hrt, wi.wu.hr_class)) {
log_messages.printf(MSG_DEBUG,
"rejecting [RESULT#%u] because HR class %d/%d over quota\n",
wi.res_id, hrt, wi.wu.hr_class
);
continue;
}
}
return true;
}
}
return false; // never reached
}
// This function decides the interleaving used for --allapps.
// Inputs:
// n (number of weights)
// k (length of vector)
// a set of weights w(0)..w(n-1)
// Outputs:
// a vector v(0)..v(k-1) with values 0..n-1,
// where each value occurs with the given weight,
// and values are interleaved as much as possible.
// a vector count(0)..count(n-1) saying how many times
// each value occurs in v
//
void weighted_interleave(double* weights, int n, int k, int* v, int* count) {
double *x = (double*) calloc(n, sizeof(double));
int i;
for (i=0; i x[best]) {
best = j;
}
}
v[i] = best;
x[best] -= 1/weights[best];
count[best]++;
}
free(x);
}
// update the job size statistics fields of array entries
//
static void update_job_stats() {
int i, n=0;
double sum=0, sum_sqr=0;
for (i=0; imax_wu_results; i++) {
WU_RESULT& wu_result = ssp->wu_results[i];
if (wu_result.state != WR_STATE_PRESENT) continue;
n++;
double e = wu_result.workunit.rsc_fpops_est;
sum += e;
sum_sqr += e*e;
}
double mean = sum/n;
double stdev = sqrt((sum_sqr - sum*mean)/n);
for (i=0; imax_wu_results; i++) {
WU_RESULT& wu_result = ssp->wu_results[i];
if (wu_result.state != WR_STATE_PRESENT) continue;
double e = wu_result.workunit.rsc_fpops_est;
double diff = e - mean;
wu_result.fpops_size = diff/stdev;
}
}
// We're purging this item because it's been in shared mem too long.
// In general it will get added again soon.
// But if it's committed to an HR class,
// it could be because it got sent to a rare host.
// Un-commit it by zeroing out the WU's hr class,
// and incrementing target_nresults
//
static void purge_stale(WU_RESULT& wu_result) {
DB_WORKUNIT wu;
wu.id = wu_result.workunit.id;
if (wu_result.workunit.hr_class) {
char buf[256];
sprintf(buf,
"hr_class=0, target_nresults=target_nresults+1, transition_time=%ld",
time(0)
);
wu.update_field(buf);
}
}
// Make one pass through the work array, filling in empty slots.
// Return true if we filled in any.
//
static bool scan_work_array(vector &work_items) {
int i;
bool found;
int enum_phase[napps];
int app_index;
int nadditions=0, ncollisions=0;
for (i=0; imax_wu_results; i++) {
app_index = app_indices[i];
DB_WORK_ITEM& wi = work_items[app_index];
WU_RESULT& wu_result = ssp->wu_results[i];
switch (wu_result.state) {
case WR_STATE_PRESENT:
if (purge_stale_time && wu_result.time_added_to_shared_memory < (time(0) - purge_stale_time)) {
log_messages.printf(MSG_NORMAL,
"remove result [RESULT#%u] from slot %d because it is stale\n",
wu_result.resultid, i
);
purge_stale(wu_result);
wu_result.state = WR_STATE_EMPTY;
// fall through, refill this array slot
} else {
break;
}
case WR_STATE_EMPTY:
if (enum_phase[app_index] == ENUM_OVER) continue;
found = get_job_from_db(
wi, app_index, enum_phase[app_index], ncollisions
);
if (found) {
log_messages.printf(MSG_NORMAL,
"adding result [RESULT#%u] in slot %d\n",
wi.res_id, i
);
wu_result.resultid = wi.res_id;
wu_result.res_priority = wi.res_priority;
wu_result.res_server_state = wi.res_server_state;
wu_result.res_report_deadline = wi.res_report_deadline;
wu_result.workunit = wi.wu;
wu_result.state = WR_STATE_PRESENT;
// If the workunit has already been allocated to a certain
// OS then it should be assigned quickly,
// so we set its infeasible_count to 1
//
if (wi.wu.hr_class > 0) {
wu_result.infeasible_count = 1;
} else {
wu_result.infeasible_count = 0;
}
// set the need_reliable flag if needed
//
wu_result.need_reliable = false;
if (config.reliable_on_priority && wu_result.res_priority >= config.reliable_on_priority) {
wu_result.need_reliable = true;
}
wu_result.time_added_to_shared_memory = time(0);
nadditions++;
}
break;
default:
// here the state is a PID; see if it's still alive
//
int pid = wu_result.state;
struct stat s;
char buf[256];
sprintf(buf, "/proc/%d", pid);
log_messages.printf(MSG_NORMAL, "checking pid %d\n", pid);
if (stat(buf, &s)) {
wu_result.state = WR_STATE_PRESENT;
log_messages.printf(MSG_NORMAL,
"Result reserved by non-existent process PID %d; resetting\n",
pid
);
}
}
}
log_messages.printf(MSG_DEBUG, "Added %d results to array\n", nadditions);
if (ncollisions) {
log_messages.printf(MSG_DEBUG,
"%d results already in array\n", ncollisions
);
return false;
}
if (nadditions == 0) {
return false;
}
return true;
}
void feeder_loop() {
vector work_items;
double next_av_update_time=0;
// may need one enumeration per app; create vector
//
for (int i=0; iready = true;
if (!action) {
#ifdef GCL_SIMULATOR
continue_simulation("feeder");
log_messages.printf(MSG_DEBUG, "Waiting for signal\n");
signal(SIGUSR2, simulator_signal_handler);
pause();
#else
log_messages.printf(MSG_DEBUG,
"No action; sleeping %d sec\n", sleep_interval
);
daemon_sleep(sleep_interval);
#endif
} else {
if (config.job_size_matching) {
update_job_stats();
}
}
double now = dtime();
if (is_main_feeder && now > next_av_update_time) {
int retval = update_av_scales(ssp);
if (retval) {
log_messages.printf(MSG_CRITICAL,
"update_av_scales failed: %s\n", boincerror(retval)
);
exit(1);
}
next_av_update_time = now + AV_UPDATE_PERIOD;
}
fflush(stdout);
check_stop_daemons();
check_reread_trigger();
}
}
// see if we're using HR, and if so initialize the necessary data structures
//
void hr_init() {
int i, retval;
bool apps_differ = false;
bool some_app_uses_hr = false;
int hrt, hr_type0 = ssp->apps[0].homogeneous_redundancy;
using_hr = false;
for (i=0; inapps; i++) {
hrt = ssp->apps[i].homogeneous_redundancy;
if (hrt <0 || hrt >= HR_NTYPES) {
log_messages.printf(MSG_CRITICAL,
"HR type %d out of range for app %d\n", hrt, i
);
exit(1);
}
if (hrt) some_app_uses_hr = true;
if (hrt != hr_type0) apps_differ = true;
}
if (config.homogeneous_redundancy) {
log_messages.printf(MSG_NORMAL,
"config HR is %d\n", config.homogeneous_redundancy
);
hrt = config.homogeneous_redundancy;
if (hrt < 0 || hrt >= HR_NTYPES) {
log_messages.printf(MSG_CRITICAL,
"Main HR type %d out of range\n", hrt
);
exit(1);
}
if (some_app_uses_hr) {
log_messages.printf(MSG_CRITICAL,
"You can specify HR at global or app level, but not both\n"
);
exit(1);
}
for (i=0; inapps; i++) {
ssp->apps[i].homogeneous_redundancy = config.homogeneous_redundancy;
ssp->apps[i].weight = 1;
}
} else {
if (some_app_uses_hr) {
if (apps_differ && !all_apps) {
log_messages.printf(MSG_CRITICAL,
"You must use --allapps if apps have different HR\n"
);
exit(1);
}
} else {
return; // HR not being used
}
}
using_hr = true;
if (config.hr_allocate_slots) {
hr_info.init();
retval = hr_info.read_file();
if (retval) {
log_messages.printf(MSG_CRITICAL,
"Can't read HR info file: %s\n", boincerror(retval)
);
exit(1);
}
// find the weight for each HR type
//
for (i=0; inapps; i++) {
hrt = ssp->apps[i].homogeneous_redundancy;
hr_info.type_weights[hrt] += ssp->apps[i].weight;
hr_info.type_being_used[hrt] = true;
}
// compute the slot allocations for HR classes
//
hr_info.allocate(ssp->max_wu_results);
hr_info.show(stderr);
}
}
// write a summary of feeder state to stderr
//
void show_state(int) {
ssp->show(stderr);
if (config.hr_allocate_slots) {
hr_info.show(stderr);
}
}
void show_version() {
log_messages.printf(MSG_NORMAL, "%s\n", SVN_VERSION);
}
void usage(char *name) {
fprintf(stderr,
"%s creates a shared memory segment containing DB info,\n"
"including an array of work items (results/workunits to send).\n\n"
"Usage: %s [OPTION]...\n\n"
"Options:\n"
" [ -d X | --debug_level X] Set log verbosity to X (1..4)\n"
" [ --allapps ] Interleave results from all applications uniformly.\n"
" [ --random_order ] order by \"random\" field of result\n"
" [ --priority_asc ] order by increasing \"priority\" field of result\n"
" [ --priority_order ] order by decreasing \"priority\" field of result\n"
" [ --priority_order_create_time ] order by priority, then by increasing WU create time\n"
" [ --purge_stale x ] remove work items from the shared memory segment after x secs\n"
" that have been there for longer then x minutes\n"
" but haven't been assigned\n"
" [ --appids a1{,a2} ] get work only for appids a1,... (comma-separated list)\n"
" [ --mod n i ] handle only results with (id mod n) == i\n"
" [ --wmod n i ] handle only workunits with (id mod n) == i\n"
" [ --sleep_interval x ] sleep x seconds if nothing to do\n"
" [ -h | --help ] Shows this help text.\n"
" [ -v | --version ] Shows version information.\n",
name, name
);
}
int main(int argc, char** argv) {
int i, retval;
void* p;
char path[MAXPATHLEN], order_buf[1024];
for (i=1; iinit(num_work_items);
atexit(cleanup_shmem);
install_stop_signal_handler();
retval = boinc_db.open(
config.db_name, config.db_host, config.db_user, config.db_passwd
);
if (retval) {
log_messages.printf(MSG_CRITICAL,
"boinc_db.open: %d; %s\n", retval, boinc_db.error_string()
);
exit(1);
}
retval = boinc_db.set_isolation_level(READ_UNCOMMITTED);
if (retval) {
log_messages.printf(MSG_CRITICAL,
"boinc_db.set_isolation_level: %d; %s\n", retval, boinc_db.error_string()
);
}
ssp->scan_tables();
log_messages.printf(MSG_NORMAL,
"read "
"%d platforms, "
"%d apps, "
"%d app_versions, "
"%d assignments\n",
ssp->nplatforms,
ssp->napps,
ssp->napp_versions,
ssp->nassignments
);
log_messages.printf(MSG_NORMAL,
"Using %d job slots\n", ssp->max_wu_results
);
app_indices = (int*) calloc(ssp->max_wu_results, sizeof(int));
// If all_apps is set, make an array saying which array slot
// is associated with which app
//
if (all_apps) {
napps = ssp->napps;
enum_sizes = (int*) calloc(ssp->napps, sizeof(int));
double* weights = (double*) calloc(ssp->napps, sizeof(double));
int* counts = (int*) calloc(ssp->napps, sizeof(int));
if (ssp->app_weight_sum == 0) {
for (i=0; inapps; i++) {
ssp->apps[i].weight = 1;
}
ssp->app_weight_sum = ssp->napps;
}
for (i=0; inapps; i++) {
weights[i] = ssp->apps[i].weight;
}
for (i=0; inapps; i++) {
enum_sizes[i] = (int) floor(0.5 + enum_limit*(weights[i])/(ssp->app_weight_sum));
}
weighted_interleave(
weights, ssp->napps, ssp->max_wu_results, app_indices, counts
);
free(weights);
free(counts);
} else {
napps = 1;
}
hr_init();
if (using_hr && strlen(order_clause)) {
log_messages.printf(MSG_CRITICAL,
"Note: ordering options will not apply to apps for which homogeneous redundancy is used\n"
);
}
retval = ssp->perf_info.get_from_db();
if (retval) {
log_messages.printf(MSG_CRITICAL,
"PERF_INFO::get_from_db(): %d\n", retval
);
}
signal(SIGUSR1, show_state);
feeder_loop();
}
const char *BOINC_RCSID_57c87aa242 = "$Id$";