Add support for multi-size apps

See http://boinc.berkeley.edu/trac/wiki/MultiSize
The components of this include:
- DB changes:
    add size_class to workunit and result
    n_size_classes to app; >1 means multi-size
- size_regulator daemon program: changes result states
    from INACTIVE to UNSENT carefully
- size_census program; writes quantile info in flat files
- transitioner: when creating results for multi-size apps,
    set server state to INACTIVE
- sched shmem (feeder): read quantile info from flat files,
    store in shared memory
- scheduler (score-based scheduling): for multi-size apps,
    add component to score function for size class.
- show_shmem: show result size class
- make_work (and other callers of count_unsent_results()):
    count both INACTIVE and UNSENT
- create_work: add --size_class cmdline option

Also:
- if we get MySQL errors in upgrade, don't rewrite db_revision
This commit is contained in:
David Anderson 2013-04-25 00:27:35 -07:00
parent 99258dcecb
commit 0c430ce1fa
16 changed files with 234 additions and 32 deletions

View File

@ -63,8 +63,14 @@ void APP_VERSION::clear() {memset(this, 0, sizeof(*this));}
void USER::clear() {memset(this, 0, sizeof(*this));}
void TEAM::clear() {memset(this, 0, sizeof(*this));}
void HOST::clear() {memset(this, 0, sizeof(*this));}
void RESULT::clear() {memset(this, 0, sizeof(*this));}
void WORKUNIT::clear() {memset(this, 0, sizeof(*this));}
void RESULT::clear() {
memset(this, 0, sizeof(*this));
size_class = -1;
}
void WORKUNIT::clear() {
memset(this, 0, sizeof(*this));
size_class = -1;
}
void CREDITED_JOB::clear() {memset(this, 0, sizeof(*this));}
void MSG_FROM_HOST::clear() {memset(this, 0, sizeof(*this));}
void MSG_TO_HOST::clear() {memset(this, 0, sizeof(*this));}
@ -208,7 +214,8 @@ void DB_APP::db_print(char* buf){
"host_scale_check=%d, "
"homogeneous_app_version=%d, "
"non_cpu_intensive=%d, "
"locality_scheduling=%d ",
"locality_scheduling=%d, "
"n_size_classes=%d ",
create_time,
name,
min_version,
@ -222,7 +229,8 @@ void DB_APP::db_print(char* buf){
host_scale_check?1:0,
homogeneous_app_version?1:0,
non_cpu_intensive?1:0,
locality_scheduling
locality_scheduling,
n_size_classes
);
}
@ -244,6 +252,7 @@ void DB_APP::db_parse(MYSQL_ROW &r) {
homogeneous_app_version = (atoi(r[i++]) != 0);
non_cpu_intensive = (atoi(r[i++]) != 0);
locality_scheduling = atoi(r[i++]);
n_size_classes = atoi(r[i++]);
}
void DB_APP_VERSION::db_print(char* buf){
@ -832,7 +841,8 @@ void DB_WORKUNIT::db_print(char* buf){
"rsc_bandwidth_bound=%.15e, "
"fileset_id=%d, "
"app_version_id=%d, "
"transitioner_flags=%d ",
"transitioner_flags=%d, "
"size_class=%d ",
create_time, appid,
name, xml_doc, batch,
rsc_fpops_est, rsc_fpops_bound, rsc_memory_bound, rsc_disk_bound,
@ -851,7 +861,8 @@ void DB_WORKUNIT::db_print(char* buf){
rsc_bandwidth_bound,
fileset_id,
app_version_id,
transitioner_flags
transitioner_flags,
size_class
);
}
@ -890,6 +901,7 @@ void DB_WORKUNIT::db_parse(MYSQL_ROW &r) {
fileset_id = atoi(r[i++]);
app_version_id = atoi(r[i++]);
transitioner_flags = atoi(r[i++]);
size_class = atoi(r[i++]);
}
void DB_CREDITED_JOB::db_print(char* buf){
@ -921,7 +933,7 @@ void DB_RESULT::db_print(char* buf){
"claimed_credit=%.15e, granted_credit=%.15e, opaque=%.15e, random=%d, "
"app_version_num=%d, appid=%d, exit_status=%d, teamid=%d, "
"priority=%d, mod_time=null, elapsed_time=%.15e, flops_estimate=%.15e, "
"app_version_id=%d, runtime_outlier=%d",
"app_version_id=%d, runtime_outlier=%d, size_class=%d",
create_time, workunitid,
server_state, outcome, client_state,
hostid, userid,
@ -933,7 +945,8 @@ void DB_RESULT::db_print(char* buf){
app_version_num, appid, exit_status, teamid,
priority, elapsed_time, flops_estimate,
app_version_id,
runtime_outlier?1:0
runtime_outlier?1:0,
size_class
);
UNESCAPE(xml_doc_out);
UNESCAPE(stderr_out);
@ -954,7 +967,7 @@ void DB_RESULT::db_print_values(char* buf){
"'%s', '%s', '%s', "
"%d, %d, %d, "
"%.15e, %.15e, %.15e, %d, "
"%d, %d, %d, %d, %d, null, 0, 0, 0, 0)",
"%d, %d, %d, %d, %d, null, 0, 0, 0, 0, %d)",
create_time, workunitid,
server_state, outcome, client_state,
hostid, userid,
@ -963,7 +976,7 @@ void DB_RESULT::db_print_values(char* buf){
xml_doc_in, xml_doc_out, stderr_out,
batch, file_delete_state, validate_state,
claimed_credit, granted_credit, opaque, random,
app_version_num, appid, exit_status, teamid, priority
app_version_num, appid, exit_status, teamid, priority, size_class
);
UNESCAPE(xml_doc_out);
UNESCAPE(stderr_out);
@ -1031,6 +1044,58 @@ void DB_RESULT::db_parse(MYSQL_ROW &r) {
flops_estimate = atof(r[i++]);
app_version_id = atoi(r[i++]);
runtime_outlier = (atoi(r[i++]) != 0);
size_class = atoi(r[i++]);
}
// Count the UNSENT results of an app, broken down by size class.
//
// app: the app whose results are counted; app.id selects the rows
//     and app.n_size_classes bounds the valid size classes.
// unsent_count: output array.  Entries [0, app.n_size_classes) are zeroed
//     and then filled in with the per-class counts,
//     so the caller must supply at least app.n_size_classes entries.
//
// Returns 0 on success,
// the MySQL error number if the query fails or its result can't be stored,
// or -1 if a row has a size class >= app.n_size_classes.
// Rows with a negative size class (-1 means "none") are skipped:
// they belong to no class and have no slot in unsent_count.
//
int DB_RESULT::get_unsent_counts(APP& app, int* unsent_count) {
    char query[1024];

    for (int i=0; i<app.n_size_classes; i++) {
        unsent_count[i] = 0;
    }

    snprintf(query, sizeof(query),
        "select size_class, count(size_class) from result where appid=%d and server_state=%d group by size_class",
        app.id, RESULT_SERVER_STATE_UNSENT
    );
    int retval = db->do_query(query);
    if (retval) return mysql_errno(db->mysql);
    MYSQL_RES* rp = mysql_store_result(db->mysql);
    if (!rp) return mysql_errno(db->mysql);

    while (1) {
        MYSQL_ROW row = mysql_fetch_row(rp);
        if (!row) break;
        int sc = atoi(row[0]);
        int count = atoi(row[1]);
        if (sc < 0) {
            // result with no size class; indexing unsent_count with it
            // would write before the start of the array
            //
            continue;
        }
        if (sc >= app.n_size_classes) {
            fprintf(stderr, "size class %d too large\n", sc);
            retval = -1;
            break;
        }
        unsent_count[sc] = count;
    }

    // free the result set on both the normal and the error exit
    //
    mysql_free_result(rp);
    return retval;
}
// Change up to n results of the given app and size class
// from INACTIVE to UNSENT.
//
// app: the app whose results are updated (selected by app.id).
// size_class: only results with this size class are changed.
// n: maximum number of results to change (the SQL "limit").
// order_clause: SQL text inserted verbatim between the where-conditions
//     and the limit.  It is not escaped here.
//     NOTE(review): callers presumably pass only program-generated text
//     (e.g. an "order by ..." clause) - confirm.
// nchanged: on success, set to the number of rows the update affected;
//     left untouched on failure.
//
// Returns 0 on success, -1 if the query doesn't fit in the query buffer,
// or the MySQL error number if the update fails.
//
int DB_RESULT::make_unsent(
    APP& app, int size_class, int n, const char* order_clause, int& nchanged
) {
    char query[1024];

    // order_clause has caller-determined length, so format with a bound
    // and refuse to run a truncated (and therefore different) query
    //
    int len = snprintf(query, sizeof(query),
        "update result set server_state=%d where appid=%d and server_state=%d and size_class=%d %s limit %d",
        RESULT_SERVER_STATE_UNSENT,
        app.id,
        RESULT_SERVER_STATE_INACTIVE,
        size_class,
        order_clause,
        n
    );
    if (len < 0 || len >= static_cast<int>(sizeof(query))) {
        fprintf(stderr, "make_unsent: query too long\n");
        return -1;
    }

    int retval = db->do_query(query);
    if (retval) return mysql_errno(db->mysql);
    nchanged = db->affected_rows();
    return 0;
}
void DB_MSG_FROM_HOST::db_print(char* buf) {
@ -1332,6 +1397,7 @@ void TRANSITIONER_ITEM::parse(MYSQL_ROW& r) {
batch = atoi(r[i++]);
app_version_id = atoi(r[i++]);
transitioner_flags = atoi(r[i++]);
size_class = atoi(r[i++]);
// use safe_atoi() from here on cuz they might not be there
//
@ -1392,6 +1458,7 @@ int DB_TRANSITIONER_ITEM_SET::enumerate(
" wu.batch, "
" wu.app_version_id, "
" wu.transitioner_flags, "
" wu.size_class, "
" res.id, "
" res.name, "
" res.report_deadline, "
@ -1770,6 +1837,8 @@ void WORK_ITEM::parse(MYSQL_ROW& r) {
wu.rsc_bandwidth_bound = atof(r[i++]);
wu.fileset_id = atoi(r[i++]);
wu.app_version_id = atoi(r[i++]);
wu.transitioner_flags = atoi(r[i++]);
wu.size_class = atoi(r[i++]);
}
int DB_WORK_ITEM::enumerate(

View File

@ -55,6 +55,7 @@ struct TRANSITIONER_ITEM {
int batch;
int app_version_id;
int transitioner_flags;
int size_class;
int res_id; // This is the RESULT ID
char res_name[256];
int res_report_deadline;
@ -182,6 +183,10 @@ public:
void db_print_values(char*);
void db_parse(MYSQL_ROW &row);
void operator=(RESULT& r) {RESULT::operator=(r);}
int get_unsent_counts(APP&, int* unsent);
int make_unsent(
APP&, int size_class, int n, const char* order_clause, int& nchanged
);
};
class DB_WORKUNIT : public DB_BASE, public WORKUNIT {

View File

@ -52,6 +52,8 @@ struct PLATFORM {
#define LOCALITY_SCHED_NONE 0
#define LOCALITY_SCHED_LITE 1
#define MAX_SIZE_CLASSES 10
// An application.
//
struct APP {
@ -77,12 +79,15 @@ struct APP {
bool non_cpu_intensive;
int locality_scheduling;
// type of locality scheduling used by this app (see above)
int n_size_classes;
// for multi-size apps, number of size classes
int write(FILE*);
void clear();
// not in DB:
bool have_job;
double size_class_quantiles[MAX_SIZE_CLASSES];
};
// A version of an application.
@ -449,6 +454,9 @@ struct WORKUNIT {
// which version this job is committed to (0 if none)
int transitioner_flags;
// bitmask; see values above
int size_class;
// -1 means none; encode this here so that transitioner
// doesn't have to look up app
// the following not used in the DB
char app_name[256];
@ -470,7 +478,7 @@ struct CREDITED_JOB {
// values of result.server_state
// see html/inc/common_defs.inc
//
//#define RESULT_SERVER_STATE_INACTIVE 1
#define RESULT_SERVER_STATE_INACTIVE 1
#define RESULT_SERVER_STATE_UNSENT 2
#define RESULT_SERVER_STATE_IN_PROGRESS 4
#define RESULT_SERVER_STATE_OVER 5
@ -576,6 +584,8 @@ struct RESULT {
bool runtime_outlier;
// the validator tagged this as having an unusual elapsed time;
// don't include it in PFC or elapsed time statistics.
int size_class;
// -1 means none
void clear();
};

View File

@ -54,6 +54,7 @@ create table app (
homogeneous_app_version tinyint not null default 0,
non_cpu_intensive tinyint not null default 0,
locality_scheduling integer not null default 0,
n_size_classes smallint not null default 0,
primary key (id)
) engine=InnoDB;
@ -246,6 +247,7 @@ create table workunit (
fileset_id integer not null,
app_version_id integer not null,
transitioner_flags tinyint not null,
size_class smallint not null default -1,
primary key (id)
) engine=InnoDB;
@ -283,6 +285,7 @@ create table result (
flops_estimate double not null,
app_version_id integer not null,
runtime_outlier tinyint not null,
size_class smallint not null default -1,
primary key (id)
) engine=InnoDB;

View File

@ -317,8 +317,10 @@ function show_user_summary_public($user) {
row2(tra("Country"), $user->country);
// don't show URL if user has no recent credit (spam suppression)
//
if (strlen($user->url) && $user->expavg_credit > 1) {
row2(tra("URL"), "<a href=\"http://$user->url\">http://$user->url</a>");
if (strlen($user->url)) {
if (no_computing() || $user->expavg_credit > 1) {
row2(tra("URL"), "<a href=\"http://$user->url\">http://$user->url</a>");
}
}
if (!no_computing()) {
show_credit($user);

View File

@ -843,6 +843,12 @@ function update_11_25_2012() {
");
}
// Schema update for multi-size apps:
// adds app.n_size_classes, workunit.size_class and result.size_class.
//
function update_4_26_2013() {
    $queries = array(
        "alter table app add n_size_classes smallint not null default 0",
        "alter table workunit add size_class smallint not null default -1",
        "alter table result add size_class smallint not null default -1",
    );
    foreach ($queries as $query) {
        do_query($query);
    }
}
// Updates are done automatically if you use "upgrade".
//
// If you need to do updates manually,
@ -853,6 +859,10 @@ function update_11_25_2012() {
//update_3_17_2010();
// in the following, the first element is a version number.
// This used to be the Subversion version#,
// but with Git we just use sequential integers
//
$db_updates = array (
array(18490, "update_6_16_2009"),
array(19001, "update_9_3_2009"),
@ -871,6 +881,7 @@ $db_updates = array (
array(26060, "update_8_24_2012"),
array(26062, "update_8_26_2012"),
array(27000, "update_11_25_2012"),
array(27001, "update_4_26_2013"),
);
?>

View File

@ -44,15 +44,15 @@ function do_app($app) {
$result = $db->do_query($query);
$a = array();
while ($x = mysql_fetch_object($result)) {
$a[] = $x->et_avg * $x->on_frac * $x->active_frac;
$a[] = (1/$x->et_avg) * $x->on_frac * $x->active_frac;
}
mysql_free_result($result);
sort($a);
$n = count($a);
$f = fopen("../../size_census_".$app->name, "w");
for ($i=1; $i<$app->n_size_classes; $i++) {
$k = (int)(($i*n)/$app->n_size_classes);
fprintf($f, "%f\n", $a[$k]);
$k = (int)(($i*$n)/$app->n_size_classes);
fprintf($f, "%e\n", $a[$k]);
}
fclose($f);
}

View File

@ -62,6 +62,18 @@ foreach($updates as $update) {
list($rev, $func) = $update;
echo "performing update $func\n";
call_user_func($func);
$e = mysql_error();
if ($e) {
echo "\nWARNING: database upgrade failed.
MySQL error message: $e
Please find the update queries in html/ops/db_update.php
and perform them manually.
When done, edit PROJECT_DIR/db_revision so that it contains the line
$rev
";
break;
}
file_put_contents("../../db_revision", $rev);
}
echo "All done.\n";

View File

@ -404,6 +404,7 @@ sys.path.insert(0, os.path.join('%s', 'py'))
'sample_work_generator',
'show_shmem',
'single_job_assimilator',
'size_regulator',
'transitioner',
'transitioner_catchup.php',
'update_stats'

View File

@ -115,6 +115,7 @@ sched_PROGRAMS = \
sample_trivial_validator \
sample_work_generator \
single_job_assimilator \
size_regulator \
transitioner \
trickle_credit \
trickle_echo \
@ -257,6 +258,9 @@ make_work_LDADD = $(SERVERLIBS)
transitioner_SOURCES = transitioner.cpp
transitioner_LDADD = $(SERVERLIBS)
size_regulator_SOURCES = size_regulator.cpp
size_regulator_LDADD = $(SERVERLIBS)
message_handler_SOURCES = message_handler.cpp
message_handler_LDADD = $(SERVERLIBS)

View File

@ -37,6 +37,13 @@
#ifdef NEW_SCORE
// Map a speed value to one of the app's size classes.
//
// Walks app.size_class_quantiles in index order and returns the index
// of the first entry that es is strictly less than.
// If es is below none of the first n_size_classes-1 entries,
// returns the last class, n_size_classes-1.
//
static int get_size_class(APP& app, double es) {
    const int last_class = app.n_size_classes - 1;
    int sc = 0;
    while (sc < last_class) {
        if (es < app.size_class_quantiles[sc]) {
            return sc;
        }
        ++sc;
    }
    return last_class;
}
// Assign a score to this job,
// representing the value of sending the job to this host.
// Also do some initial screening,
@ -86,6 +93,29 @@ bool JOB::get_score(WU_RESULT& wu_result) {
}
}
if (app->n_size_classes > 1) {
double effective_speed = bavp->host_usage.projected_flops * g_reply->host.on_frac * g_reply->host.active_frac;
int target_size = get_size_class(*app, effective_speed);
if (config.debug_send) {
log_messages.printf(MSG_NORMAL,
"[send] size: host %d job %d speed %f\n",
target_size, wu_result.workunit.size_class, effective_speed
);
}
if (target_size == wu_result.workunit.size_class) {
score += 5;
} else if (target_size < wu_result.workunit.size_class) {
score -= 2;
} else {
score -= 1;
}
}
if (config.debug_send) {
log_messages.printf(MSG_NORMAL,
"[send]: job score %f\n", score
);
}
return true;
}

View File

@ -25,6 +25,7 @@
#include <cstring>
#include <string>
#include <vector>
#include <sys/param.h>
using std::vector;
@ -98,8 +99,7 @@ int SCHED_SHMEM::scan_tables() {
int i, j, n;
n = 0;
while (!platform.enumerate()) {
if (platform.deprecated) continue;
while (!platform.enumerate("where deprecated=0")) {
platforms[n++] = platform;
if (n == MAX_PLATFORMS) {
overflow("platforms", "MAX_PLATFORMS");
@ -109,9 +109,7 @@ int SCHED_SHMEM::scan_tables() {
n = 0;
app_weight_sum = 0;
while (!app.enumerate()) {
if (app.deprecated) continue;
apps[n++] = app;
while (!app.enumerate("where deprecated=0")) {
if (n == MAX_APPS) {
overflow("apps", "MAX_APPS");
}
@ -126,6 +124,30 @@ int SCHED_SHMEM::scan_tables() {
have_nci_app = true;
app.non_cpu_intensive = true;
}
if (app.n_size_classes > 1) {
char path[MAXPATHLEN];
sprintf(path, "../size_census_%s", app.name);
FILE* f = fopen(path, "r");
if (!f) {
log_messages.printf(MSG_CRITICAL,
"Missing size census file for app %s\n", app.name
);
return ERR_FOPEN;
}
for (int i=0; i<app.n_size_classes-1; i++) {
char buf[256];
char* p = fgets(buf, 256, f);
if (!p) {
log_messages.printf(MSG_CRITICAL,
"Size census file for app %s is too short\n", app.name
);
return ERR_XML_PARSE; // whatever
}
app.size_class_quantiles[i] = atof(buf);
}
fclose(f);
}
apps[n++] = app;
}
napps = n;
@ -341,7 +363,7 @@ void SCHED_SHMEM::show(FILE* f) {
"HR class",
"priority",
"in shmem",
"size (stdev)",
"size class",
"need reliable",
"inf count"
);
@ -356,7 +378,7 @@ void SCHED_SHMEM::show(FILE* f) {
appname = app?app->name:"missing";
delta_t = dtime() - wu_result.time_added_to_shared_memory;
fprintf(f,
"%4d %12.12s %10d %10d %10d %8d %10d %7ds %12f %12s %9d\n",
"%4d %12.12s %10d %10d %10d %8d %10d %7ds %9d %12s %9d\n",
i,
appname,
wu_result.workunit.id,
@ -365,7 +387,7 @@ void SCHED_SHMEM::show(FILE* f) {
wu_result.workunit.hr_class,
wu_result.res_priority,
delta_t,
wu_result.fpops_size,
wu_result.workunit.size_class,
wu_result.need_reliable?"yes":"no",
wu_result.infeasible_count
);

View File

@ -276,11 +276,11 @@ int count_workunits(int& n, const char* query) {
int count_unsent_results(int& n, int appid) {
char buf[256];
if (appid) {
sprintf(buf, "where server_state=%d and appid=%d ",
sprintf(buf, "where server_state<=%d and appid=%d ",
RESULT_SERVER_STATE_UNSENT, appid
);
} else {
sprintf(buf, "where server_state=%d", RESULT_SERVER_STATE_UNSENT);
sprintf(buf, "where server_state<=%d", RESULT_SERVER_STATE_UNSENT);
}
return count_results(buf, n);

View File

@ -47,10 +47,17 @@ int do_pass(bool& action) {
if (retval) return retval;
action = false;
for (int i=0; i<app.n_size_classes; i++) {
log_messages.printf(MSG_NORMAL, "%d unsent for class %d\n", unsent[i], i);
if (unsent[i] < lo) {
int n = hi - unsent[i], nchanged;
log_messages.printf(MSG_NORMAL,
"releasing %d jobs of size class %d\n", n, i
);
retval = result.make_unsent(app, i, n, order_clause, nchanged);
if (retval) return retval;
log_messages.printf(MSG_NORMAL,
"%d jobs released\n", nchanged
);
if (nchanged == n) {
action = true;
}
@ -70,6 +77,10 @@ int main(int argc, char** argv) {
lo = atoi(argv[++i]);
} else if (!strcmp(argv[i], "--hi")) {
hi = atoi(argv[++i]);
} else if (!strcmp(argv[i], "-d")) {
log_messages.set_debug_level(atoi(argv[++i]));
} else if (!strcmp(argv[i], "--debug_leveld")) {
log_messages.set_debug_level(atoi(argv[++i]));
} else if (!strcmp(argv[i], "--sleep_time")) {
sleep_time = atoi(argv[++i]);
} else if (!strcmp(argv[i], "--random_order")) {
@ -88,6 +99,8 @@ int main(int argc, char** argv) {
usage();
}
log_messages.printf(MSG_NORMAL, "Starting\n");
retval = config.parse_file();
if (retval) {
log_messages.printf(MSG_CRITICAL,
@ -96,7 +109,15 @@ int main(int argc, char** argv) {
exit(1);
}
log_messages.printf(MSG_NORMAL, "Starting\n");
retval = boinc_db.open(
config.db_name, config.db_host, config.db_user, config.db_passwd
);
if (retval) {
log_messages.printf(MSG_CRITICAL,
"boinc_db.open: %d; %s\n", retval, boinc_db.error_string()
);
exit(1);
}
sprintf(buf, "where name='%s'", app_name);
if (app.lookup(buf)) {
@ -104,7 +125,7 @@ int main(int argc, char** argv) {
exit(1);
}
if (app.n_size_classes < 2) {
log_messages.printf(MSG_CRITICAL, "app is not multi-size\n");
log_messages.printf(MSG_CRITICAL, "app '%s' is not multi-size\n", app_name);
exit(1);
}
while (1) {
@ -116,6 +137,9 @@ int main(int argc, char** argv) {
);
exit(1);
}
if (!action) daemon_sleep(sleep_time);
if (!action) {
log_messages.printf(MSG_NORMAL, "sleeping\n");
daemon_sleep(sleep_time);
}
}
}

View File

@ -82,7 +82,12 @@ static void initialize_result(DB_RESULT& result, WORKUNIT& wu) {
result.id = 0;
result.create_time = time(0);
result.workunitid = wu.id;
result.server_state = RESULT_SERVER_STATE_UNSENT;
result.size_class = wu.size_class;
if (result.size_class < 0) {
result.server_state = RESULT_SERVER_STATE_UNSENT;
} else {
result.server_state = RESULT_SERVER_STATE_INACTIVE;
}
result.hostid = 0;
result.report_deadline = 0;
result.sent_time = 0;
@ -120,6 +125,7 @@ int create_result_ti(
wu.appid = ti.appid;
wu.priority = ti.priority;
wu.batch = ti.batch;
wu.size_class = ti.size_class;
return create_result(
wu,
result_template_filename,
@ -528,7 +534,7 @@ int cancel_jobs(int min_id, int max_id) {
sprintf(set_clause, "server_state=%d, outcome=%d",
RESULT_SERVER_STATE_OVER, RESULT_OUTCOME_DIDNT_NEED
);
sprintf(where_clause, "server_state=%d and workunitid >=%d and workunitid<= %d",
sprintf(where_clause, "server_state<=%d and workunitid >=%d and workunitid<= %d",
RESULT_SERVER_STATE_UNSENT, min_id, max_id
);
retval = result.update_fields_noid(set_clause, where_clause);
@ -555,7 +561,7 @@ int cancel_job(DB_WORKUNIT& wu) {
sprintf(set_clause, "server_state=%d, outcome=%d",
RESULT_SERVER_STATE_OVER, RESULT_OUTCOME_DIDNT_NEED
);
sprintf(where_clause, "server_state=%d and workunitid=%d",
sprintf(where_clause, "server_state<=%d and workunitid=%d",
RESULT_SERVER_STATE_UNSENT, wu.id
);
retval = result.update_fields_noid(set_clause, where_clause);

View File

@ -62,6 +62,7 @@ void usage() {
" [ --rsc_fpops_est x ]\n"
" [ --rsc_fpops_bound x ]\n"
" [ --rsc_memory_bound x ]\n"
" [ --size_class n ]\n"
" [ --target_host ID ]\n"
" [ --target_nresults n ]\n"
" [ --target_team ID ]\n"
@ -154,6 +155,8 @@ int main(int argc, const char** argv) {
wu.rsc_fpops_bound = atof(argv[++i]);
} else if (arg(argv, i, "rsc_memory_bound")) {
wu.rsc_memory_bound = atof(argv[++i]);
} else if (arg(argv, i, "size_class")) {
wu.size_class = atoi(argv[++i]);
} else if (arg(argv, i, "rsc_disk_bound")) {
wu.rsc_disk_bound = atof(argv[++i]);
} else if (arg(argv, i, "delay_bound")) {