- server: revamp the "assigned job" mechanism.

This now supports two main use cases:
    1) there's a job that you want to run once on all hosts,
        present and future
        (or all hosts belonging to a user, or to a team).
        The job is never transitioned, validated, or assimilated.
    2) There's a normal job for which you want to use only
        hosts belonging to a specific user (e.g. cluster or cloud hosts).
        This restriction can be made either when the job is created,
        or on the fly,
        e.g. as part of a scheme for accelerating batch completion.
        For the latter purpose we now provide a function
            restrict_wu_to_user(DB_WORKUNIT&, int userid);

        The job goes through the standard
        transitioner/validator/assimilator path.

    These cases are enabled by config flags
        <enable_assignment_multi/>
        <enable_assignment/>
    respectively.

    Assignments of type 2) are no longer stored in shared memory,
    so there is no limit on their number.

    There is no longer a rule that assigned job names must contain "asgn".

    NOTE: this requires a database update.


svn path=/trunk/boinc/; revision=25169
This commit is contained in:
David Anderson 2012-01-30 22:39:13 +00:00
parent a542bde3e4
commit 130d6ed4f0
19 changed files with 240 additions and 85 deletions

View File

@ -1141,3 +1141,50 @@ David 30 Jan 2012
clientgui/
DlgAdvPreferencesBase.cpp
David 30 Jan 2012
- server: revamp the "assigned job" mechanism.
This now supports two main use cases:
1) there's a job that you want to run once on all hosts,
present and future
(or all hosts belonging to a user, or to a team).
The job is never transitioned, validated, or assimilated.
2) There's a normal job for which you want to use only
hosts belonging to a specific user (e.g. cluster or cloud hosts).
This restriction can be made either when the job is created,
or on the fly,
e.g. as part of a scheme for accelerating batch completion.
For the latter purpose we now provide a function
restrict_wu_to_user(DB_WORKUNIT&, int userid);
The job goes through the standard
transitioner/validator/assimilator path.
These cases are enabled by config flags
<enable_assignment_multi/>
<enable_assignment/>
respectively.
Assignments of type 2) are no longer stored in shared memory,
so there is no limit on their number.
There is no longer a rule that assigned job names must contain "asgn".
NOTE: this requires a database update.
db/
boinc_db.cpp,h
constraints.sql
schema.sql
sched/
sched_util.cpp,h
sched_shmem.cpp
transitioner.cpp
sched_assign.cpp
tools/
backend_lib.cpp
create_work.cpp
html/ops/
db_update.php
lib/
common_defs.h

View File

@ -822,7 +822,8 @@ void DB_WORKUNIT::db_print(char* buf){
"priority=%d, "
"rsc_bandwidth_bound=%.15e, "
"fileset_id=%d, "
"app_version_id=%d ",
"app_version_id=%d, "
"transitioner_flags=%d ",
create_time, appid,
name, xml_doc, batch,
rsc_fpops_est, rsc_fpops_bound, rsc_memory_bound, rsc_disk_bound,
@ -840,7 +841,8 @@ void DB_WORKUNIT::db_print(char* buf){
priority,
rsc_bandwidth_bound,
fileset_id,
app_version_id
app_version_id,
transitioner_flags
);
}
@ -878,6 +880,7 @@ void DB_WORKUNIT::db_parse(MYSQL_ROW &r) {
rsc_bandwidth_bound = atof(r[i++]);
fileset_id = atoi(r[i++]);
app_version_id = atoi(r[i++]);
transitioner_flags = atoi(r[i++]);
}
void DB_CREDITED_JOB::db_print(char* buf){
@ -1084,7 +1087,7 @@ void DB_ASSIGNMENT::db_print(char* buf) {
target_type,
multi,
workunitid,
resultid
_resultid
);
}
@ -1097,7 +1100,7 @@ void DB_ASSIGNMENT::db_parse(MYSQL_ROW& r) {
target_type = atoi(r[i++]);
multi = atoi(r[i++]);
workunitid = atoi(r[i++]);
resultid = atoi(r[i++]);
_resultid = atoi(r[i++]);
}
int DB_HOST_APP_VERSION::update_scheduler(DB_HOST_APP_VERSION& orig) {
@ -1290,6 +1293,7 @@ void TRANSITIONER_ITEM::parse(MYSQL_ROW& r) {
hr_class = atoi(r[i++]);
batch = atoi(r[i++]);
app_version_id = atoi(r[i++]);
transitioner_flags = atoi(r[i++]);
// use safe_atoi() from here on cuz they might not be there
//
@ -1349,6 +1353,7 @@ int DB_TRANSITIONER_ITEM_SET::enumerate(
" wu.hr_class, "
" wu.batch, "
" wu.app_version_id, "
" wu.transitioner_flags, "
" res.id, "
" res.name, "
" res.report_deadline, "
@ -1364,10 +1369,10 @@ int DB_TRANSITIONER_ITEM_SET::enumerate(
" workunit AS wu "
" LEFT JOIN result AS res ON wu.id = res.workunitid "
"WHERE "
" wu.transition_time < %d %s "
" wu.transition_time < %d %s and transitioner_flags<>%d"
"LIMIT "
" %d ",
transition_time, mod_clause, nresult_limit
transition_time, mod_clause, TRANSITION_NONE, nresult_limit
);
retval = db->do_query(query);
@ -2029,8 +2034,6 @@ int DB_SCHED_RESULT_ITEM_SET::update_workunits() {
for (i=0; i<results.size(); i++) {
if (results[i].id == 0) continue;
// skip non-updated results
if (strstr(results[i].name, ASSIGNED_WU_STR)) continue;
// skip assigned jobs
if (!first) strcat(query, ",");
first = false;
sprintf(buf, "%d", results[i].workunitid);

View File

@ -367,6 +367,7 @@ struct HOST {
// (file delete, assimilate, and states of results, error flags)
// bit fields of error_mask
//
#define WU_ERROR_COULDNT_SEND_RESULT 1
#define WU_ERROR_TOO_MANY_ERROR_RESULTS 2
#define WU_ERROR_TOO_MANY_SUCCESS_RESULTS 4
@ -374,6 +375,13 @@ struct HOST {
#define WU_ERROR_CANCELLED 16
#define WU_ERROR_NO_CANONICAL_RESULT 32
// bit fields of transitioner_flags
//
#define TRANSITION_NONE 1
// don't transition
#define TRANSITION_NO_NEW_RESULTS 2
// transition, but don't create results
struct WORKUNIT {
int id;
int create_time;
@ -435,6 +443,8 @@ struct WORKUNIT {
int app_version_id;
// if app uses homogeneous_app_version,
// which version this job is committed to (0 if none)
int transitioner_flags;
// bitmask; see values above
// the following not used in the DB
char app_name[256];
@ -630,7 +640,8 @@ struct ASSIGNMENT {
int target_type; // none/host/user/team
int multi; // 0 = single host, 1 = all hosts in set
int workunitid;
int resultid; // if not multi, the result ID
int _resultid; // if not multi, the result ID
// deprecated
void clear();
};
@ -654,6 +665,7 @@ struct TRANSITIONER_ITEM {
int hr_class;
int batch;
int app_version_id;
int transitioner_flags;
int res_id; // This is the RESULT ID
char res_name[256];
int res_report_deadline;

View File

@ -126,3 +126,6 @@ alter table notify
alter table host_app_version
add unique hap(host_id, app_version_id);
alter table assignment
add index asgn_target(target_type, target_id);

View File

@ -237,12 +237,13 @@ create table workunit (
max_error_results integer not null,
max_total_results integer not null,
max_success_results integer not null,
result_template_file varchar(63) not null,
result_template_file varchar(63) not null,
priority integer not null,
mod_time timestamp,
rsc_bandwidth_bound double not null,
fileset_id integer not null,
app_version_id integer not null,
transitioner_flags tinyint not null,
primary key (id)
) engine=InnoDB;
@ -366,6 +367,7 @@ create table assignment (
workunitid integer not null,
resultid integer not null,
-- if not multi, the result
-- deprecated
primary key (id)
) engine = InnoDB;

View File

@ -33,7 +33,10 @@ See also
<a href=wiki/Project_list>a complete list of projects</a>.
<p>
Note: if your computer is equipped with a Graphics Processing Unit
Projects have different requirements such as memory size;
a partial summary is <a href=http://boincfaq.mundayweb.com/index.php?view=67>here</a>.
<p>
If your computer is equipped with a Graphics Processing Unit
(GPU), you may be able to
<a href=http://boinc.berkeley.edu/wiki/GPU_computing>use it to compute faster</a>.
";

View File

@ -796,6 +796,17 @@ function update_9_20_2011() {
add manage tinyint not null
");
}
// Schema update for revision 25169:
// add the workunit.transitioner_flags column,
// and add an index on assignment(target_type, target_id).
//
function update_1_30_2012() {
    do_query("
alter table workunit
add transitioner_flags tinyint not null
");
    // "add index" is a clause of "alter table"; it is not a statement
    // on its own, so the table must be named here.
    do_query(
        "alter table assignment add index asgn_target(target_type, target_id)"
    );
}
// Updates are done automatically if you use "upgrade".
//
// If you need to do updates manually,
@ -819,6 +830,7 @@ $db_updates = array (
array(24137, "update_9_6_2011"),
array(24225, "update_9_15_2011"),
array(24248, "update_9_20_2011"),
array(25169, "update_1_30_2012"),
);

View File

@ -179,7 +179,6 @@ struct VERSION_INFO {
#define CLIENT_AUTH_FILENAME "client_auth.xml"
#define LOCK_FILE_NAME "lockfile"
#define GRAPHICS_APP_FILENAME "graphics_app"
#define ASSIGNED_WU_STR "asgn"
#define GUI_RPC_PASSWD_FILE "gui_rpc_auth.cfg"
#define SS_CONFIG_FILE "ss_config.xml"

View File

@ -41,10 +41,12 @@
#include "sched_assign.h"
// send a job for the given assignment
//
static int send_assigned_job(ASSIGNMENT& asg) {
int retval;
DB_WORKUNIT wu;
char suffix[256], path[256], buf[256];
char suffix[256], path[256];
const char *rtfpath;
static bool first=true;
static int seqno=0;
@ -93,22 +95,6 @@ static int send_assigned_job(ASSIGNMENT& asg) {
retval = result.lookup_id(result_id);
add_result_to_reply(result, wu, bavp, false);
// if this is a one-job assignment, fill in assignment.resultid
// so that it doesn't get sent again
//
if (!asg.multi && asg.target_type!=ASSIGN_NONE) {
DB_ASSIGNMENT db_asg;
db_asg.id = asg.id;
sprintf(buf, "resultid=%u", result_id);
retval = db_asg.update_field(buf);
if (retval) {
log_messages.printf(MSG_CRITICAL,
"assign update failed: %s\n", boincerror(retval)
);
return retval;
}
asg.resultid = result_id;
}
if (config.debug_assignment) {
log_messages.printf(MSG_NORMAL,
"[assign] [WU#%d] [RESULT#%d] [HOST#%d] send assignment %d\n",
@ -118,27 +104,26 @@ static int send_assigned_job(ASSIGNMENT& asg) {
return 0;
}
// Send this host any jobs assigned to it, or to its user/team
// Send this host any "multi" assigned jobs.
// Return true iff we sent anything
//
bool send_assigned_jobs() {
bool send_assigned_jobs_multi() {
DB_RESULT result;
int retval;
char buf[256];
bool sent_something = false;
for (int i=0; i<ssp->nassignments; i++) {
if (!work_needed(false)) break;
ASSIGNMENT& asg = ssp->assignments[i];
if (config.debug_assignment) {
log_messages.printf(MSG_NORMAL,
"[assign] processing assignment type %d\n", asg.target_type
"[assign] processing multi assignment type %d\n",
asg.target_type
);
}
// see if this assignment applies to this host
//
if (asg.resultid) continue;
switch (asg.target_type) {
case ASSIGN_NONE:
sprintf(buf, "where hostid=%d and workunitid=%d",
@ -150,22 +135,11 @@ bool send_assigned_jobs() {
if (!retval) sent_something = true;
}
break;
case ASSIGN_HOST:
if (g_reply->host.id != asg.target_id) continue;
sprintf(buf, "where workunitid=%d", asg.workunitid);
retval = result.lookup(buf);
if (retval == ERR_DB_NOT_FOUND) {
retval = send_assigned_job(asg);
if (!retval) sent_something = true;
}
break;
case ASSIGN_USER:
if (g_reply->user.id != asg.target_id) continue;
if (asg.multi) {
sprintf(buf, "where workunitid=%d and hostid=%d", asg.workunitid, g_reply->host.id);
} else {
sprintf(buf, "where workunitid=%d", asg.workunitid);
}
sprintf(buf, "where workunitid=%d and hostid=%d",
asg.workunitid, g_reply->host.id
);
retval = result.lookup(buf);
if (retval == ERR_DB_NOT_FOUND) {
retval = send_assigned_job(asg);
@ -174,11 +148,7 @@ bool send_assigned_jobs() {
break;
case ASSIGN_TEAM:
if (g_reply->team.id != asg.target_id) continue;
if (asg.multi) {
sprintf(buf, "where workunitid=%d and hostid=%d", asg.workunitid, g_reply->host.id);
} else {
sprintf(buf, "where workunitid=%d", asg.workunitid);
}
sprintf(buf, "where workunitid=%d and hostid=%d", asg.workunitid, g_reply->host.id);
retval = result.lookup(buf);
if (retval == ERR_DB_NOT_FOUND) {
retval = send_assigned_job(asg);
@ -189,3 +159,76 @@ bool send_assigned_jobs() {
}
return sent_something;
}
// Send non-multi assigned jobs:
// jobs with an assignment record (multi=0) targeting this host's user.
// Returns true iff at least one job was sent.
//
bool send_assigned_jobs() {
    DB_ASSIGNMENT asg;
    DB_RESULT result;
    DB_WORKUNIT wu;
    bool sent_something = false;
    int retval;

    // for now, only look for user assignments.
    // The clause is written as a complete "where ..." clause,
    // the same form as the clauses passed to lookup() below.
    //
    char clause[256];
    sprintf(clause, "where target_type=%d and target_id=%d and multi=0",
        ASSIGN_USER, g_reply->user.id
    );

    // enumerate() returns zero for each row it delivers,
    // so loop while it returns zero.
    //
    while (!asg.enumerate(clause)) {
        // if no more work is needed, skip this row;
        // the loop keeps fetching rows until the enumeration ends
        //
        if (!work_needed(false)) continue;

        // if the WU doesn't exist, delete the assignment record.
        //
        retval = wu.lookup_id(asg.workunitid);
        if (retval) {
            asg.delete_from_db();
            continue;
        }

        // don't send if WU is validation pending or completed,
        // or has transition pending
        //
        if (wu.need_validate) continue;
        if (wu.canonical_resultid) continue;
        if (wu.transition_time < time(0)) continue;

        // don't send if we already sent one to this host.
        // Use a separate buffer so the enumeration clause stays intact.
        //
        char buf[256];
        sprintf(buf, "where workunitid=%d and hostid=%d",
            asg.workunitid,
            g_request->host.id
        );
        retval = result.lookup(buf);
        if (retval != ERR_DB_NOT_FOUND) continue;

        // don't send if there's already one in progress to this user
        //
        sprintf(buf,
            "where workunitid=%d and userid=%d and server_state=%d",
            asg.workunitid,
            g_reply->user.id,
            RESULT_SERVER_STATE_IN_PROGRESS
        );
        retval = result.lookup(buf);
        if (retval != ERR_DB_NOT_FOUND) continue;

        // OK, send the job
        //
        retval = send_assigned_job(asg);
        if (retval) continue;

        sent_something = true;

        // update the WU's transition time to time out this job.
        // Re-read the WU first, then move transition_time earlier
        // if this job's deadline comes before the current value.
        //
        retval = wu.lookup_id(asg.workunitid);
        if (retval) continue;
        int new_tt = time(0) + wu.delay_bound;
        if (new_tt < wu.transition_time) {
            char buf2[256];
            sprintf(buf2, "transition_time=%d", new_tt);
            wu.update_field(buf2);
        }
    }
    return sent_something;
}

View File

@ -16,3 +16,4 @@
// along with BOINC. If not, see <http://www.gnu.org/licenses/>.
extern bool send_assigned_jobs();
extern bool send_assigned_jobs_multi();

View File

@ -173,6 +173,7 @@ int SCHED_CONFIG::parse(FILE* f) {
if (xp.parse_int("feeder_query_size", feeder_query_size)) continue;
if (xp.parse_str("httpd_user", httpd_user, sizeof(httpd_user))) continue;
if (xp.parse_bool("enable_assignment", enable_assignment)) continue;
if (xp.parse_bool("enable_assignment_multi", enable_assignment_multi)) continue;
if (xp.parse_bool("job_size_matching", job_size_matching)) continue;
if (xp.parse_bool("dont_send_jobs", dont_send_jobs)) continue;

View File

@ -95,6 +95,7 @@ struct SCHED_CONFIG {
char httpd_user[256];
// user name under which web server runs (default: apache)
bool enable_assignment;
bool enable_assignment_multi;
bool job_size_matching;
bool dont_send_jobs;

View File

@ -1846,6 +1846,17 @@ void send_work() {
}
}
if (config.enable_assignment_multi) {
if (send_assigned_jobs_multi()) {
if (config.debug_assignment) {
log_messages.printf(MSG_NORMAL,
"[assign] [HOST#%d] sent assigned jobs\n", g_reply->host.id
);
}
goto done;
}
}
if (config.workload_sim && g_request->have_other_results_list) {
init_ip_results(
g_request->global_prefs.work_buf_min(),

View File

@ -184,7 +184,7 @@ int SCHED_SHMEM::scan_tables() {
}
n = 0;
while (!assignment.enumerate()) {
while (!assignment.enumerate("multi <> 0")) {
assignments[n++] = assignment;
if (n == MAX_ASSIGNMENTS) {
overflow("assignments", "MAX_ASSIGNMENTS");

View File

@ -302,6 +302,48 @@ bool app_plan_uses_gpu(const char* plan_class) {
return false;
}
// Arrange that further results for this workunit
// will be sent only to hosts with the given user ID.
// This could be used, for example, so that late workunits
// are sent only to cloud or cluster resources
//
// wu: the workunit to restrict (wu.id must be set)
// userid: ID of the user whose hosts may receive further results
// Returns zero on success, else the error code from the
// workunit update or the assignment insert.
//
int restrict_wu_to_user(DB_WORKUNIT& wu, int userid) {
    DB_RESULT result;
    DB_ASSIGNMENT asg;
    char buf[256];
    int retval;

    // mark unsent results as DIDNT_NEED.
    // The clause is a complete "where ..." clause;
    // enumerate() returns zero for each row it delivers,
    // so loop while it returns zero.
    //
    sprintf(buf, "where workunitid=%d and server_state=%d",
        wu.id, RESULT_SERVER_STATE_UNSENT
    );
    while (!result.enumerate(buf)) {
        char buf2[256];
        sprintf(buf2, "server_state=%d, outcome=%d",
            RESULT_SERVER_STATE_OVER,
            RESULT_OUTCOME_DIDNT_NEED
        );
        // the return value of this update is not checked;
        // the loop goes on to the remaining results
        //
        result.update_field(buf2);
    }

    // mark the WU as TRANSITION_NO_NEW_RESULTS
    //
    sprintf(buf, "transitioner_flags=%d", TRANSITION_NO_NEW_RESULTS);
    retval = wu.update_field(buf);
    if (retval) return retval;

    // create an assignment record
    //
    asg.clear();
    asg.create_time = time(0);
    asg.target_id = userid;
    asg.target_type = ASSIGN_USER;
    asg.multi = 0;
    asg.workunitid = wu.id;
    retval = asg.insert();
    return retval;
}
#ifdef GCL_SIMULATOR

View File

@ -91,6 +91,8 @@ extern bool is_arg(const char*, const char*);
extern bool app_plan_uses_gpu(const char* plan_class);
extern int restrict_wu_to_user(DB_WORKUNIT& wu, int userid);
#ifdef GCL_SIMULATOR
extern void simulator_signal_handler(int signum);
extern void continue_simulation(const char *daemonname);

View File

@ -157,27 +157,6 @@ int handle_wu(
TRANSITIONER_ITEM& wu_item = items[0];
TRANSITIONER_ITEM wu_item_original = wu_item;
// "assigned" WUs aren't supposed to be handled by the transitioner.
// If we get one, it's an error
//
if (config.enable_assignment && strstr(wu_item.name, ASSIGNED_WU_STR)) {
DB_WORKUNIT wu;
char buf[256];
wu.id = wu_item.id;
log_messages.printf(MSG_CRITICAL,
"Assigned WU %d unexpectedly found by transitioner\n", wu.id
);
sprintf(buf, "transition_time=%d", INT_MAX);
retval = wu.update_field(buf);
if (retval) {
log_messages.printf(MSG_CRITICAL,
"update_field failed: %s\n", boincerror(retval)
);
}
return 0;
}
// count up the number of results in various states,
// and check for timed-out results
//
@ -450,7 +429,9 @@ int handle_wu(
//
std::string values;
char value_buf[MAX_QUERY_LEN];
if (n_new_results_needed > 0) {
if (wu_item.transitioner_flags != TRANSITION_NO_NEW_RESULTS
&& n_new_results_needed > 0
) {
log_messages.printf(
MSG_NORMAL,
"[WU#%d %s] Generating %d more results (%d target - %d unsent - %d in progress - %d success)\n",

View File

@ -309,7 +309,7 @@ int create_work(
fprintf(stderr, "target_nresults > max_success_results; can't create job\n");
return ERR_INVALID_PARAM;
}
if (strstr(wu.name, ASSIGNED_WU_STR)) {
if (wu.transitioner_flags & TRANSITION_NONE) {
wu.transition_time = INT_MAX;
} else {
wu.transition_time = time(0);

View File

@ -225,14 +225,6 @@ int main(int argc, const char** argv) {
sprintf(result_template_file, "templates/%s_out", app.name);
}
if (assign_flag) {
if (!strstr(wu.name, ASSIGNED_WU_STR)) {
fprintf(stderr,
"Assigned WU names must contain '%s'\n", ASSIGNED_WU_STR
);
exit(1);
}
}
retval = config.parse_file(config_dir);
if (retval) {
fprintf(stderr, "Can't parse config file: %d\n", retval);