example app: print "starting" message after boinc_init, so that it appears in stdferr file

Also remove old score-based sched code
This commit is contained in:
David Anderson 2013-12-10 14:00:31 -08:00
parent 2882ca2c94
commit 6d4999767f
3 changed files with 19 additions and 339 deletions

View File

@ -155,6 +155,14 @@ int main(int argc, char **argv) {
if (strstr(argv[i], "trickle_up")) trickle_up = true;
if (strstr(argv[i], "trickle_down")) trickle_down = true;
}
retval = boinc_init();
if (retval) {
fprintf(stderr, "%s boinc_init returned %d\n",
boinc_msg_prefix(buf, sizeof(buf)), retval
);
exit(retval);
}
fprintf(stderr, "%s app started; CPU time %f, flags:%s%s%s%s%s%s%s\n",
boinc_msg_prefix(buf, sizeof(buf)),
cpu_time,
@ -167,14 +175,6 @@ int main(int argc, char **argv) {
trickle_down?" trickle_down":""
);
retval = boinc_init();
if (retval) {
fprintf(stderr, "%s boinc_init returned %d\n",
boinc_msg_prefix(buf, sizeof(buf)), retval
);
exit(retval);
}
// open the input file (resolve logical name first)
//
boinc_resolve_filename(INPUT_FILENAME, input_path, sizeof(input_path));

View File

@ -15,7 +15,12 @@
// You should have received a copy of the GNU Lesser General Public License
// along with BOINC. If not, see <http://www.gnu.org/licenses/>.
// Matchmaker scheduling code
// job dispatch using a score-based approach:
// - scan the job array, assigning a score to each job and building a list
// (the score reflect a variety of factors).
// - sort the list
// - send jobs in order of decreasing score until request is satisfied
// - do the above separately for each resource type
#include <algorithm>
@ -35,8 +40,8 @@
#include "sched_score.h"
#ifdef NEW_SCORE
// given the host's estimated speed, determine its size class
//
static int get_size_class(APP& app, double es) {
for (int i=0; i<app.n_size_classes-1; i++) {
if (es < app.size_class_quantiles[i]) return i;
@ -222,7 +227,8 @@ void send_work_score_type(int rt) {
wu_result.state = g_pid;
// It passed fast checks.
// Release sema and to slow checks
// Release sema and do slow checks
//
unlock_sema();
sema_locked = false;
@ -279,289 +285,3 @@ void send_work_score() {
}
}
}
#else
// reread result from DB, make sure it's still unsent
// TODO: from here to add_result_to_reply()
// (which updates the DB record) should be a transaction
//
int read_sendable_result(SCHED_DB_RESULT& result) {
int retval = result.lookup_id(result.id);
if (retval) {
log_messages.printf(MSG_CRITICAL,
"[RESULT#%u] result.lookup_id() failed %s\n",
result.id, boincerror(retval)
);
return ERR_NOT_FOUND;
}
if (result.server_state != RESULT_SERVER_STATE_UNSENT) {
log_messages.printf(MSG_NORMAL,
"[RESULT#%u] expected to be unsent; instead, state is %d\n",
result.id, result.server_state
);
return ERR_BAD_RESULT_STATE;
}
return 0;
}
// TODO: use slow_check()
//
bool wu_is_infeasible_slow(
WU_RESULT& wu_result, SCHEDULER_REQUEST&, SCHEDULER_REPLY&
) {
char buf[256];
int retval;
int n;
SCHED_DB_RESULT result;
// Don't send if we've already sent a result of this WU to this user.
//
if (config.one_result_per_user_per_wu) {
sprintf(buf,
"where workunitid=%d and userid=%d",
wu_result.workunit.id, g_reply->user.id
);
retval = result.count(n, buf);
if (retval) {
log_messages.printf(MSG_CRITICAL,
"send_work: can't get result count (%s)\n", boincerror(retval)
);
return true;
} else {
if (n>0) {
if (config.debug_send) {
log_messages.printf(MSG_NORMAL,
"[send] send_work: user %d already has %d result(s) for WU %d\n",
g_reply->user.id, n, wu_result.workunit.id
);
}
return true;
}
}
} else if (config.one_result_per_host_per_wu) {
// Don't send if we've already sent a result
// of this WU to this host.
// We only have to check this
// if we don't send one result per user.
//
sprintf(buf,
"where workunitid=%d and hostid=%d",
wu_result.workunit.id, g_reply->host.id
);
retval = result.count(n, buf);
if (retval) {
log_messages.printf(MSG_CRITICAL,
"send_work: can't get result count (%s)\n", boincerror(retval)
);
return true;
} else {
if (n>0) {
if (config.debug_send) {
log_messages.printf(MSG_NORMAL,
"[send] send_work: host %d already has %d result(s) for WU %d\n",
g_reply->host.id, n, wu_result.workunit.id
);
}
return true;
}
}
}
APP* app = ssp->lookup_app(wu_result.workunit.appid);
WORKUNIT wu = wu_result.workunit;
if (app_hr_type(*app)) {
if (already_sent_to_different_hr_class(wu, *app)) {
if (config.debug_send) {
log_messages.printf(MSG_NORMAL,
"[send] [HOST#%d] [WU#%u %s] WU is infeasible (assigned to different platform)\n",
g_reply->host.id, wu.id, wu.name
);
}
// Mark the workunit as infeasible.
// This ensures that jobs already assigned to a platform
// are processed first.
//
wu_result.infeasible_count++;
return true;
}
}
return false;
}
double JOB_SET::lowest_score() {
if (jobs.empty()) return 0;
return jobs.back().score;
}
// add the given job, and remove lowest-score jobs that
// - are in excess of work request
// - are in excess of per-request or per-day limits
// - cause the disk limit to be exceeded
//
void JOB_SET::add_job(JOB& job) {
while (!jobs.empty()) {
JOB& worst_job = jobs.back();
if (est_time + job.est_time - worst_job.est_time > work_req) {
est_time -= worst_job.est_time;
disk_usage -= worst_job.disk_usage;
jobs.pop_back();
ssp->wu_results[worst_job.index].state = WR_STATE_PRESENT;
} else {
break;
}
}
while (!jobs.empty()) {
JOB& worst_job = jobs.back();
if (disk_usage + job.disk_usage > disk_limit) {
est_time -= worst_job.est_time;
disk_usage -= worst_job.disk_usage;
jobs.pop_back();
ssp->wu_results[worst_job.index].state = WR_STATE_PRESENT;
} else {
break;
}
}
if ((int)jobs.size() == max_jobs) {
JOB& worst_job = jobs.back();
jobs.pop_back();
ssp->wu_results[worst_job.index].state = WR_STATE_PRESENT;
}
std::list<JOB>::iterator i = jobs.begin();
while (i != jobs.end()) {
if (i->score < job.score) {
jobs.insert(i, job);
break;
}
i++;
}
if (i == jobs.end()) {
jobs.push_back(job);
}
est_time += job.est_time;
disk_usage += job.disk_usage;
if (config.debug_send) {
log_messages.printf(MSG_NORMAL,
"[send] added job to set. est_time %.2f disk_usage %.2fGB\n",
est_time, disk_usage/GIGA
);
}
}
// return the disk usage of jobs above the given score
//
double JOB_SET::higher_score_disk_usage(double v) {
double sum = 0;
std::list<JOB>::iterator i = jobs.begin();
while (i != jobs.end()) {
if (i->score < v) break;
sum += i->disk_usage;
i++;
}
return sum;
}
void JOB_SET::send() {
WORKUNIT wu;
SCHED_DB_RESULT result;
int retval;
std::list<JOB>::iterator i = jobs.begin();
while (i != jobs.end()) {
JOB& job = *(i++);
WU_RESULT wu_result = ssp->wu_results[job.index];
ssp->wu_results[job.index].state = WR_STATE_EMPTY;
wu = wu_result.workunit;
result.id = wu_result.resultid;
retval = read_sendable_result(result);
if (!retval) {
add_result_to_reply(result, wu, job.bavp, false);
}
}
}
void send_work_score() {
int i, slots_locked=0, slots_nonempty=0;
JOB_SET jobs;
int min_slots = config.mm_min_slots;
if (!min_slots) min_slots = ssp->max_wu_results/2;
int max_slots = config.mm_max_slots;
if (!max_slots) max_slots = ssp->max_wu_results;
int max_locked = 10;
lock_sema();
i = rand() % ssp->max_wu_results;
// scan through the job cache, maintaining a JOB_SET of jobs
// that we can send to this client, ordered by score.
//
for (int slots_scanned=0; slots_scanned<max_slots; slots_scanned++) {
i = (i+1) % ssp->max_wu_results;
WU_RESULT& wu_result = ssp->wu_results[i];
switch (wu_result.state) {
case WR_STATE_EMPTY:
continue;
case WR_STATE_PRESENT:
slots_nonempty++;
break;
default:
slots_nonempty++;
if (wu_result.state == g_pid) break;
slots_locked++;
continue;
}
JOB job;
job.index = i;
// get score for this job, and skip it if it fails quick check.
// NOTE: the EDF check done in get_score()
// includes only in-progress jobs.
//
if (!job.get_score()) {
continue;
}
if (config.debug_send) {
log_messages.printf(MSG_NORMAL,
"[send] score for %s: %f\n", wu_result.workunit.name, job.score
);
}
if (job.score > jobs.lowest_score() || !jobs.request_satisfied()) {
ssp->wu_results[i].state = g_pid;
unlock_sema();
if (wu_is_infeasible_slow(wu_result, *g_request, *g_reply)) {
// if we can't use this job, put it back in pool
//
lock_sema();
ssp->wu_results[i].state = WR_STATE_PRESENT;
continue;
}
lock_sema();
jobs.add_job(job);
}
if (jobs.request_satisfied() && slots_scanned>=min_slots) break;
}
if (slots_nonempty) {
g_wreq->no_jobs_available = false;
} else {
log_messages.printf(MSG_CRITICAL,
"Job cache is empty - check feeder\n"
);
}
// TODO: trim jobs from tail of list until we pass the EDF check
//
jobs.send();
unlock_sema();
if (slots_locked > max_locked) {
log_messages.printf(MSG_CRITICAL,
"Found too many locked slots (%d>%d) - increase array size\n",
slots_locked, max_locked
);
}
}
#endif

View File

@ -15,9 +15,8 @@
// You should have received a copy of the GNU Lesser General Public License
// along with BOINC. If not, see <http://www.gnu.org/licenses/>.
#define NEW_SCORE
// job dispatch using a score-based approach. See sched_scure.cpp
#ifdef NEW_SCORE
struct JOB {
int index;
int result_id;
@ -28,43 +27,4 @@ struct JOB {
bool get_score(WU_RESULT&);
};
#else
#include <list>
struct JOB {
int index;
double score;
double est_time;
double disk_usage;
APP* app;
BEST_APP_VERSION* bavp;
bool get_score();
};
struct JOB_SET {
double work_req;
double est_time;
double disk_usage;
double disk_limit;
int max_jobs;
std::list<JOB> jobs; // sorted high to low
JOB_SET() {
work_req = g_request->work_req_seconds;
est_time = 0;
disk_usage = 0;
disk_limit = g_wreq->disk_available;
max_jobs = g_wreq->max_jobs_per_rpc;
}
void add_job(JOB&);
double higher_score_disk_usage(double);
double lowest_score();
inline bool request_satisfied() {
return est_time >= work_req;
}
void send();
};
#endif
extern void send_work_score();