- Server: add "job assignment" feature.

Lets you assign a WU to a particular host,
    to one or all hosts belonging to a user or team, or to all hosts.
    See http://boinc.berkeley.edu/trac/wiki/AssignedWork
    Disabled unless you include <enable_assignment> in config.xml
    Uses a new DB table.
    Tested but only a little.
- Server: code cleanup; moved result-handling to a new file,
    and removed the PLATFORM_LIST arg to everything
    (put it in SCHEDULER_REQUEST instead)

svn path=/trunk/boinc/; revision=14767
This commit is contained in:
David Anderson 2008-02-21 00:47:50 +00:00
parent fb46f06881
commit 54519a4ee1
36 changed files with 1032 additions and 468 deletions

View File

@ -774,6 +774,8 @@ struct GRAPHICS_APP {
static GRAPHICS_APP ga_win(false), ga_full(true);
static bool have_graphics_app;
// The following is for backwards compatibility with version 5 clients.
//
static inline void handle_graphics_messages() {
static char graphics_app_path[1024];
char buf[MSG_CHANNEL_SIZE];
@ -785,6 +787,9 @@ static inline void handle_graphics_messages() {
GRAPHICS_APP_FILENAME, graphics_app_path,
sizeof(graphics_app_path)
);
// if the above returns "graphics_app", there was no link file,
// so there's no graphics app
//
if (!strcmp(graphics_app_path, GRAPHICS_APP_FILENAME)) {
have_graphics_app = false;
} else {

View File

@ -1476,3 +1476,40 @@ Charlie Feb 20 2008
client/
app_graphics.C
David Feb 20 2008
- Server: add "job assignment" feature.
Lets you assign a WU to a particular host,
to one or all hosts belonging to a user or team, or to all hosts.
See http://boinc.berkeley.edu/trac/wiki/AssignedWork
Disabled unless you include <enable_assignment> in config.xml
Uses a new DB table.
Tested but only a little.
- Server: code cleanup; moved result-handling to a new file,
and removed the PLATFORM_LIST arg to everything
(put it in SCHEDULER_REQUEST instead)
api/
boinc_api.C
db/
boinc_db.C,h
schema.sql
html/
inc/
db_ops.inc
ops/
assign.php (new)
db_update.php
sched/
Makefile.am
feeder.C
file_upload_handler.C
handle_request.C
sched_assign.C,h (new)
sched_result.C,h (new)
sched_*
transitioner.C
tools/
Makefile.am
backend_lib.C,h
create_work.C

View File

@ -64,6 +64,7 @@ void WORKUNIT::clear() {memset(this, 0, sizeof(*this));}
void CREDITED_JOB::clear() {memset(this, 0, sizeof(*this));}
void MSG_FROM_HOST::clear() {memset(this, 0, sizeof(*this));}
void MSG_TO_HOST::clear() {memset(this, 0, sizeof(*this));}
void ASSIGNMENT::clear() {memset(this, 0, sizeof(*this));}
void TRANSITIONER_ITEM::clear() {memset(this, 0, sizeof(*this));}
void VALIDATOR_ITEM::clear() {memset(this, 0, sizeof(*this));}
void SCHED_RESULT_ITEM::clear() {memset(this, 0, sizeof(*this));}
@ -90,6 +91,8 @@ DB_MSG_FROM_HOST::DB_MSG_FROM_HOST(DB_CONN* dc) :
DB_BASE("msg_from_host", dc?dc:&boinc_db){}
DB_MSG_TO_HOST::DB_MSG_TO_HOST(DB_CONN* dc) :
DB_BASE("msg_to_host", dc?dc:&boinc_db){}
DB_ASSIGNMENT::DB_ASSIGNMENT(DB_CONN* dc) :
DB_BASE("assignment", dc?dc:&boinc_db){}
DB_TRANSITIONER_ITEM_SET::DB_TRANSITIONER_ITEM_SET(DB_CONN* dc) :
DB_BASE_SPECIAL(dc?dc:&boinc_db){}
DB_VALIDATOR_ITEM_SET::DB_VALIDATOR_ITEM_SET(DB_CONN* dc) :
@ -114,6 +117,7 @@ int DB_WORKUNIT::get_id() {return id;}
int DB_RESULT::get_id() {return id;}
int DB_MSG_FROM_HOST::get_id() {return id;}
int DB_MSG_TO_HOST::get_id() {return id;}
int DB_ASSIGNMENT::get_id() {return id;}
void DB_PLATFORM::db_print(char* buf){
sprintf(buf,
@ -873,6 +877,36 @@ void DB_MSG_TO_HOST::db_parse(MYSQL_ROW& r) {
strcpy2(xml, r[i++]);
}
void DB_ASSIGNMENT::db_print(char* buf) {
sprintf(buf,
"create_time=%d, "
"target_id=%d, "
"target_type=%d, "
"multi=%d, "
"workunitid=%d, "
"resultid=%d",
create_time,
target_id,
target_type,
multi,
workunitid,
resultid
);
}
void DB_ASSIGNMENT::db_parse(MYSQL_ROW& r) {
int i=0;
clear();
id = atoi(r[i++]);
create_time = atoi(r[i++]);
target_id = atoi(r[i++]);
target_type = atoi(r[i++]);
multi = atoi(r[i++]);
workunitid = atoi(r[i++]);
resultid = atoi(r[i++]);
}
void TRANSITIONER_ITEM::parse(MYSQL_ROW& r) {
int i=0;
clear();
@ -1595,6 +1629,7 @@ int DB_SCHED_RESULT_ITEM_SET::update_workunits() {
);
for (i=0; i<results.size(); i++) {
if (results[i].id == 0) continue; // skip non-updated results
if (strstr(results[i].name, "asgn")) continue; // skip assigned jobs
if (!first) strcat(query, ",");
first = false;
sprintf(buf, "%d", results[i].workunitid);

View File

@ -430,6 +430,12 @@ struct CREDITED_JOB {
// The result arrived after the canonical result's files were deleted,
// so we can't determine if it's valid
// values for ASSIGNMENT.target_type
#define ASSIGN_NONE 0
#define ASSIGN_HOST 1
#define ASSIGN_USER 2
#define ASSIGN_TEAM 3
struct RESULT {
int id;
int create_time;
@ -499,6 +505,17 @@ struct MSG_TO_HOST {
void clear();
};
struct ASSIGNMENT {
int id;
int create_time;
int target_id;
int target_type;
int multi;
int workunitid;
int resultid;
void clear();
};
struct TRANSITIONER_ITEM {
int id; // WARNING: this is the WU ID
char name[256];
@ -636,6 +653,14 @@ public:
void db_parse(MYSQL_ROW &row);
};
class DB_ASSIGNMENT : public DB_BASE, public ASSIGNMENT {
public:
DB_ASSIGNMENT(DB_CONN* p=0);
int get_id();
void db_print(char*);
void db_parse(MYSQL_ROW& row);
};
// The transitioner uses this to get (WU, result) pairs efficiently.
// Each call to enumerate() returns a list of the pairs for a single WU
//

View File

@ -48,6 +48,5 @@ create table bossa_job_inst (
create table bossa_user (
user_id integer not null,
info text
-- Info about skill.
-- May depend on app; may be scalar or something else
-- Project-dependent info about user's ability and performance.
);

View File

@ -246,6 +246,8 @@ create table result (
primary key (id)
) engine=InnoDB;
-- the following are used to implement trickle messages
create table msg_from_host (
id integer not null auto_increment,
create_time integer not null,
@ -266,16 +268,22 @@ create table msg_to_host (
primary key (id)
) engine=InnoDB;
create table workseq (
-- An assignment of a WU to a specific host, user, or team, or to all hosts
--
create table assignment (
id integer not null auto_increment,
create_time integer not null,
state integer not null,
hostid integer not null,
wuid_last_done integer not null,
wuid_last_sent integer not null,
workseqid_master integer not null,
target_id integer not null,
-- ID of target entity (see below)
target_type integer not null,
-- 0=none, 1=host, 2=user, 3=team
multi tinyint not null,
-- 0=single host, 1=all hosts in set
workunitid integer not null,
resultid integer not null,
-- if not multi, the result
primary key (id)
) engine=InnoDB;
) engine = InnoDB;
-- EVERYTHING FROM HERE ON IS USED ONLY FROM PHP,
-- SO NOT IN BOINC_DB.H ETC.

View File

@ -1,5 +1,11 @@
<?php
$cvs_version_tracker[]="\$Id$"; //Generated automatically - do not edit
class BoincAssignment {
static function enum($where_clause) {
$db = BoincDb::get();
return $db->enum('assignment', 'BoincAssignment', $where_clause);
}
}
define("NVALIDATE_STATES", 6);
@ -1386,4 +1392,5 @@ function host_name_by_id($hostid) {
}
}
$cvs_version_tracker[]="\$Id$"; //Generated automatically - do not edit
?>

60
html/ops/assign.php Normal file
View File

@ -0,0 +1,60 @@
<?php
require_once("../inc/util.inc");
require_once("../inc/util_ops.inc");
require_once("../inc/db_ops.inc");
function show_assign($asgn) {
$when = time_str($asgn->create_time);
switch ($asgn->target_type) {
case 0:
$x = "All hosts";
break;
case 1:
$x = "<a href=db_action.php?table=host&id=$asgn->id>Host $asgn->target_id</a>";
break;
case 2:
if ($asgn->multi) {
$y = "All hosts belonging to ";
} else {
$y = "One host belonging to ";
}
$x = "$y<a href=db_action.php?table=user&id=$asgn->target_id>Host $asgn->target_id</a>";
break;
case 3:
if ($asgn->multi) {
$y = "All hosts belonging to ";
} else {
$y = "One host belonging to ";
}
$x = "$y<a href=db_action.php?table=team&id=$asgn->target_id>Team $asgn->target_id</a>";
break;
}
echo "<tr>
<td>$asgn->id (created $when)</td>
<td>$x</td>
<td><a href=db_action.php?table=workunit&id=$asgn->workunitid>$asgn->workunitid</a></td>
<td><a href=db_action.php?table=result&id=$asgn->resultid>$asgn->resultid</a></td>
</tr>
";
}
function show_assigns() {
admin_page_head("Assignments");
$asgns = BoincAssignment::enum();
start_table();
table_header("Assignment ID/time", "target", "workunit", "result");
foreach ($asgns as $asgn) {
show_assign($asgn);
}
end_table();
admin_page_tail();
}
$action = get_str('action', true);
switch ($action) {
default:
show_assigns();
}
?>

View File

@ -541,11 +541,25 @@ function update_12_28_2007() {
");
}
function update_2_18_2008() {
do_query("create table assignment (
id integer not null auto_increment,
create_time integer not null,
target_id integer not null,
target_type integer not null,
multi tinyint not null,
workunitid integer not null,
resultid integer not null,
primary key (id)
) engine = InnoDB
");
}
// modify the following to call the function you want.
// Make sure you do all needed functions, in order.
// (Look at your DB structure using "explain" queries to see
// which ones you need).
//update_11_14_2007();
//update_2_18_2008();
?>

View File

@ -312,17 +312,13 @@ void get_sandbox_account_token() {
password_str = r_base64_decode(encoded_password_str);
if (string::npos != encoded_username_str.find('\\')) {
domainname_str =
encoded_username_str.substr(
0,
encoded_username_str.find('\\')
);
username_str =
encoded_username_str.substr(
encoded_username_str.rfind(_T('\\')) + 1,
encoded_username_str.length() - encoded_username_str.rfind(_T('\\')) - 1
);
domainname_str = encoded_username_str.substr(
0, encoded_username_str.find('\\')
);
username_str = encoded_username_str.substr(
encoded_username_str.rfind(_T('\\')) + 1,
encoded_username_str.length() - encoded_username_str.rfind(_T('\\')) - 1
);
retval = LogonUserEx(
username_str.c_str(),
domainname_str.c_str(),
@ -337,7 +333,6 @@ void get_sandbox_account_token() {
);
} else {
username_str = encoded_username_str;
retval = LogonUserEx(
username_str.c_str(),
NULL,

View File

@ -71,10 +71,12 @@ cgi_SOURCES = \
hr_info.C \
main.C \
sched_array.C \
sched_assign.C \
sched_hr.C \
sched_resend.C \
sched_send.C \
sched_locality.C \
sched_result.C \
sched_send.C \
sched_timezone.C \
server_types.C \
time_stats_log.C \
@ -184,17 +186,19 @@ fcgi_SOURCES = \
hr.C \
hr_info.C \
main.C \
sched_send.C \
sched_resend.C \
sched_array.C \
sched_hr.C \
server_types.C \
sched_shmem.C \
sched_util.C \
sched_assign.C \
sched_config.C \
sched_msgs.C \
sched_hr.C \
sched_locality.C \
sched_msgs.C \
sched_resend.C \
sched_result.C \
sched_send.C \
sched_shmem.C \
sched_timezone.C \
sched_util.C \
server_types.C \
time_stats_log.C \
edf_sim.C \
../db/boinc_db.C \

View File

@ -659,10 +659,12 @@ int main(int argc, char** argv) {
"read "
"%d platforms, "
"%d apps, "
"%d app_versions\n",
"%d app_versions, "
"%d assignments\n",
ssp->nplatforms,
ssp->napps,
ssp->napp_versions
ssp->napp_versions,
ssp->nassignments
);
log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,
"Using %d job slots\n", ssp->max_wu_results

View File

@ -553,7 +553,7 @@ int handle_request(FILE* in, R_RSA_PUBLIC_KEY& key) {
}
if (!did_something) {
log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL, "handle_request: no command\n");
return return_error(ERR_PERMANENT, "no command");
return return_error(ERR_TRANSIENT, "no command");
}
log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG, "elapsed time %f seconds\n", elapsed_wallclock_time());

View File

@ -52,6 +52,7 @@ using namespace std;
#include "sched_send.h"
#include "sched_config.h"
#include "sched_locality.h"
#include "sched_result.h"
#include "time_stats_log.h"
#ifdef _USING_FCGI_
@ -766,296 +767,6 @@ int handle_global_prefs(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
return 0;
}
// handle completed results
//
int handle_results(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
DB_SCHED_RESULT_ITEM_SET result_handler;
SCHED_RESULT_ITEM* srip;
unsigned int i;
int retval;
RESULT* rp;
bool changed_host=false;
if (sreq.results.size() == 0) return 0;
// copy reported results to a separate vector, "result_handler",
// initially with only the "name" field present
//
for (i=0; i<sreq.results.size(); i++) {
result_handler.add_result(sreq.results[i].name);
}
// read results from database into "result_handler".
// Quantities that must be read from the DB are those
// where srip (see below) appears as an rval.
// These are: id, name, server_state, received_time, hostid, validate_state.
// Quantities that must be written to the DB are those for
// which srip appears as an lval. These are:
// hostid, teamid, received_time, client_state, cpu_time, exit_status,
// app_version_num, claimed_credit, server_state, stderr_out,
// xml_doc_out, outcome, validate_state
//
retval = result_handler.enumerate();
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[HOST#%d] Batch query failed\n",
reply.host.id
);
}
// loop over results reported by client
//
// A note about acks: we send an ack for result received if either
// 1) there's some problem with it (wrong state, host, not in DB) or
// 2) we update it successfully.
// In other words, the only time we don't ack a result is when
// it looks OK but the update failed.
//
for (i=0; i<sreq.results.size(); i++) {
rp = &sreq.results[i];
retval = result_handler.lookup_result(rp->name, &srip);
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[HOST#%d] [RESULT#? %s] can't find result\n",
reply.host.id, rp->name
);
reply.result_acks.push_back(std::string(rp->name));
continue;
}
log_messages.printf(
SCHED_MSG_LOG::MSG_NORMAL, "[HOST#%d] [RESULT#%d %s] got result (DB: server_state=%d outcome=%d client_state=%d validate_state=%d delete_state=%d)\n",
reply.host.id, srip->id, srip->name, srip->server_state, srip->outcome, srip->client_state, srip->validate_state, srip->file_delete_state
);
// Do various sanity checks.
// If one of them fails, set srip->id = 0,
// which suppresses the DB update later on
//
// If result has server_state OVER
// if outcome NO_REPLY accept it (it's just late).
// else ignore it
//
if (srip->server_state == RESULT_SERVER_STATE_OVER) {
const char *dont_replace_result = NULL;
switch (srip->outcome) {
case RESULT_OUTCOME_INIT:
// should never happen!
dont_replace_result = "this result was never sent";
break;
case RESULT_OUTCOME_SUCCESS:
// don't replace a successful result!
dont_replace_result = "result already reported as success";
break;
case RESULT_OUTCOME_COULDNT_SEND:
// should never happen!
dont_replace_result = "this result couldn't be sent";
break;
case RESULT_OUTCOME_CLIENT_ERROR:
// should never happen!
dont_replace_result = "result already reported as error";
break;
case RESULT_OUTCOME_CLIENT_DETACHED:
case RESULT_OUTCOME_NO_REPLY:
// result is late in arriving, but keep it anyhow
break;
case RESULT_OUTCOME_DIDNT_NEED:
// should never happen
dont_replace_result = "this result wasn't sent (not needed)";
break;
case RESULT_OUTCOME_VALIDATE_ERROR:
// we already passed through the validator, so
// don't keep the new result
dont_replace_result = "result already reported, validate error";
break;
default:
dont_replace_result = "server logic bug; please alert BOINC developers";
break;
}
if (dont_replace_result) {
char buf[256];
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[HOST#%d] [RESULT#%d %s] result already over [outcome=%d validate_state=%d]: %s\n",
reply.host.id, srip->id, srip->name, srip->outcome, srip->validate_state, dont_replace_result
);
sprintf(buf, "Completed result %s refused: %s", srip->name, dont_replace_result);
USER_MESSAGE um(buf, "high");
reply.insert_message(um);
srip->id = 0;
reply.result_acks.push_back(std::string(rp->name));
continue;
}
}
if (srip->server_state == RESULT_SERVER_STATE_UNSENT) {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[HOST#%d] [RESULT#%d %s] got unexpected result: server state is %d\n",
reply.host.id, srip->id, srip->name, srip->server_state
);
srip->id = 0;
reply.result_acks.push_back(std::string(rp->name));
continue;
}
if (srip->received_time) {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[HOST#%d] [RESULT#%d %s] got result twice\n",
reply.host.id, srip->id, srip->name
);
srip->id = 0;
reply.result_acks.push_back(std::string(rp->name));
continue;
}
if (srip->hostid != reply.host.id) {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[HOST#%d] [RESULT#%d %s] got result from wrong host; expected [HOST#%d]\n",
reply.host.id, srip->id, srip->name, srip->hostid
);
DB_HOST result_host;
retval = result_host.lookup_id(srip->hostid);
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[RESULT#%d %s] Can't lookup [HOST#%d]\n",
srip->id, srip->name, srip->hostid
);
srip->id = 0;
reply.result_acks.push_back(std::string(rp->name));
continue;
} else if (result_host.userid != reply.host.userid) {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[USER#%d] [HOST#%d] [RESULT#%d %s] Not even the same user; expected [USER#%d]\n",
reply.host.userid, reply.host.id, srip->id, srip->name, result_host.userid
);
srip->id = 0;
reply.result_acks.push_back(std::string(rp->name));
continue;
} else {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[HOST#%d] [RESULT#%d %s] Allowing result because same USER#%d\n",
reply.host.id, srip->id, srip->name, reply.host.userid
);
changed_host = true;
}
} // hostids do not match
// Modify the in-memory copy obtained from the DB earlier.
// If we found a problem above,
// we have continued and skipped this modify
//
srip->hostid = reply.host.id;
srip->teamid = reply.user.teamid;
srip->received_time = time(0);
srip->client_state = rp->client_state;
srip->cpu_time = rp->cpu_time;
// check for impossible CPU time
//
double elapsed_time = srip->received_time - srip->sent_time;
if (elapsed_time < 0) {
log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,
"[HOST#%d] [RESULT#%d] inconsistent sent/received times\n", srip->hostid, srip->id
);
} else {
if (srip->cpu_time > elapsed_time) {
log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,
"[HOST#%d] [RESULT#%d] excessive CPU time: reported %f > elapsed %f%s\n",
srip->hostid, srip->id, srip->cpu_time, elapsed_time, changed_host?" [OK: HOST changed]":""
);
if (!changed_host) srip->cpu_time = elapsed_time;
}
}
srip->exit_status = rp->exit_status;
srip->app_version_num = rp->app_version_num;
if (rp->fpops_cumulative || rp->intops_cumulative) {
srip->claimed_credit = fpops_to_credit(rp->fpops_cumulative, rp->intops_cumulative);
} else if (rp->fpops_per_cpu_sec || rp->intops_per_cpu_sec) {
srip->claimed_credit = fpops_to_credit(
rp->fpops_per_cpu_sec*srip->cpu_time,
rp->intops_per_cpu_sec*srip->cpu_time
);
} else {
srip->claimed_credit = srip->cpu_time * reply.host.claimed_credit_per_cpu_sec;
}
log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,
"cpu %f cpcs %f, cc %f\n", srip->cpu_time, reply.host.claimed_credit_per_cpu_sec, srip->claimed_credit
);
srip->server_state = RESULT_SERVER_STATE_OVER;
strlcpy(srip->stderr_out, rp->stderr_out, sizeof(srip->stderr_out));
strlcpy(srip->xml_doc_out, rp->xml_doc_out, sizeof(srip->xml_doc_out));
// look for exit status and app version in stderr_out
// (historical - can be deleted at some point)
//
parse_int(srip->stderr_out, "<exit_status>", srip->exit_status);
parse_int(srip->stderr_out, "<app_version>", srip->app_version_num);
if ((srip->client_state == RESULT_FILES_UPLOADED) && (srip->exit_status == 0)) {
srip->outcome = RESULT_OUTCOME_SUCCESS;
log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,
"[RESULT#%d %s]: setting outcome SUCCESS\n",
srip->id, srip->name
);
reply.got_good_result();
} else {
log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,
"[RESULT#%d %s]: client_state %d exit_status %d; setting outcome ERROR\n",
srip->id, srip->name, srip->client_state, srip->exit_status
);
srip->outcome = RESULT_OUTCOME_CLIENT_ERROR;
srip->validate_state = VALIDATE_STATE_INVALID;
reply.got_bad_result();
}
} // loop over all incoming results
// Update the result records
// (skip items that we previously marked to skip)
//
for (i=0; i<result_handler.results.size(); i++) {
SCHED_RESULT_ITEM& sri = result_handler.results[i];
if (sri.id == 0) continue;
retval = result_handler.update_result(sri);
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[HOST#%d] [RESULT#%d %s] can't update result: %s\n",
reply.host.id, sri.id, sri.name, boinc_db.error_string()
);
} else {
reply.result_acks.push_back(std::string(sri.name));
}
}
// set transition_time for the results' WUs
//
retval = result_handler.update_workunits();
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[HOST#%d] can't update WUs: %d\n",
reply.host.id, retval
);
}
return 0;
}
// if the client has an old code sign public key,
// send it the new one, with a signature based on the old one.
// If they don't have a code sign key, send them one
@ -1309,7 +1020,6 @@ void process_request(
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, SCHED_SHMEM& ss,
char* code_sign_key
) {
PLATFORM_LIST platforms;
PLATFORM* platform;
int retval;
double last_rpc_time;
@ -1341,7 +1051,7 @@ void process_request(
warn_user_if_core_client_upgrade_scheduled(sreq, reply);
}
if (config.locality_scheduling) {
if (config.locality_scheduling || config.enable_assignment) {
have_no_work = false;
} else {
lock_sema();
@ -1460,15 +1170,16 @@ void process_request(
write_time_stats_log(reply);
}
fprintf(stderr, "FOO\n");
// look up the client's platform(s) in the DB
//
platform = ss.lookup_platform(sreq.platform.name);
if (platform) platforms.list.push_back(platform);
if (platform) sreq.platforms.list.push_back(platform);
for (i=0; i<sreq.alt_platforms.size(); i++) {
platform = ss.lookup_platform(sreq.alt_platforms[i].name);
if (platform) platforms.list.push_back(platform);
if (platform) sreq.platforms.list.push_back(platform);
}
if (platforms.list.size() == 0) {
if (sreq.platforms.list.size() == 0) {
sprintf(buf, "platform '%s' not found", sreq.platform.name);
USER_MESSAGE um(buf, "low");
reply.insert_message(um);
@ -1487,7 +1198,7 @@ void process_request(
reply.wreq.nresults_on_host = sreq.other_results.size();
if (sreq.have_other_results_list) {
if (config.resend_lost_results) {
if (resend_lost_work(sreq, reply, platforms, ss)) {
if (resend_lost_work(sreq, reply, ss)) {
ok_to_send_work = false;
}
}
@ -1496,6 +1207,7 @@ void process_request(
}
}
fprintf(stderr, "FOO 1\n");
// if last RPC was within config.min_sendwork_interval, don't send work
//
if (!have_no_work && ok_to_send_work && sreq.work_req_seconds > 0) {
@ -1519,8 +1231,9 @@ void process_request(
reply.set_delay(1.01*config.min_sendwork_interval);
}
}
fprintf(stderr, "FOO 2 %d\n", ok_to_send_work);
if (ok_to_send_work) {
send_work(sreq, reply, platforms, ss);
send_work(sreq, reply, ss);
}
}
@ -1554,10 +1267,11 @@ void handle_request(
if (sreq.parse(fin) == 0){
log_messages.printf(
SCHED_MSG_LOG::MSG_NORMAL,
"Handling request: IP %s, auth %s, host %d, platform %s, version %d.%d.%d\n",
"Handling request: IP %s, auth %s, host %d, platform %s, version %d.%d.%d, work req %d sec\n",
get_remote_addr(), sreq.authenticator, sreq.hostid, sreq.platform.name,
sreq.core_client_major_version, sreq.core_client_minor_version,
sreq.core_client_release
sreq.core_client_release,
(int)sreq.work_req_seconds
);
process_request(sreq, sreply, ss, code_sign_key);

View File

@ -44,8 +44,7 @@
// send only results that were previously infeasible for some host
//
void scan_work_array(
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM_LIST& platforms,
SCHED_SHMEM& ss
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, SCHED_SHMEM& ss
) {
int i, j, retval, n, rnd_off;
WORKUNIT wu;
@ -125,7 +124,7 @@ void scan_work_array(
// Find the app and app_version for the client's platform.
// If none, treat the WU as infeasible
//
if (anonymous(platforms.list[0])) {
if (anonymous(sreq.platforms.list[0])) {
app = ss.lookup_app(wu.appid);
found = sreq.has_version(*app);
if (!found) {
@ -133,7 +132,7 @@ void scan_work_array(
}
avp = NULL;
} else {
found = find_app_version(reply.wreq, wu, platforms, ss, app, avp);
found = find_app_version(sreq, reply.wreq, wu, ss, app, avp);
if (!found) {
continue;
}
@ -264,7 +263,7 @@ void scan_work_array(
}
retval = add_result_to_reply(
result, wu, sreq, reply, platforms, app, avp
result, wu, sreq, reply, app, avp
);
// add_result_to_reply() fails only in fairly pathological cases -

View File

@ -1,3 +1,3 @@
extern void scan_work_array(
SCHEDULER_REQUEST&, SCHEDULER_REPLY&, PLATFORM_LIST&, SCHED_SHMEM&
SCHEDULER_REQUEST&, SCHEDULER_REPLY&, SCHED_SHMEM&
);

169
sched/sched_assign.C Normal file
View File

@ -0,0 +1,169 @@
// Berkeley Open Infrastructure for Network Computing
// http://boinc.berkeley.edu
// Copyright (C) 2008 University of California
//
// This is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation;
// either version 2.1 of the License, or (at your option) any later version.
//
// This software is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU Lesser General Public License for more details.
//
// To view the GNU Lesser General Public License visit
// http://www.gnu.org/copyleft/lesser.html
// or write to the Free Software Foundation, Inc.,
// 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#include "boinc_db.h"
#include "crypt.h"
#include "backend_lib.h"
#include "error_numbers.h"
#include "server_types.h"
#include "main.h"
#include "sched_msgs.h"
#include "sched_send.h"
#include "sched_assign.h"
static int send_assigned_job(
ASSIGNMENT& asg, SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply
) {
int retval;
DB_WORKUNIT wu;
char rtfpath[256], suffix[256], path[256], buf[256];
static bool first=true;
static int seqno=0;
static R_RSA_PRIVATE_KEY key;
APP* app;
APP_VERSION* avp;
if (first) {
first = false;
sprintf(path, "%s/upload_private", config.key_dir);
retval = read_key_file(path, key);
if (retval) {
log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL, "can't read key\n");
return -1;
}
}
retval = wu.lookup_id(asg.workunitid);
if (retval) {
log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL, "ERROR: WU NOT FOUND\n");
return retval;
}
app = ssp->lookup_app(wu.appid);
if (!app) {
log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL, "ERROR: APP NOT FOUND\n");
return ERR_NOT_FOUND;
}
bool found = find_app_version(request, reply.wreq, wu, *ssp, app, avp);
if (!found) {
log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL, "ERROR: APP VERSION NOT FOUND\n");
return ERR_NOT_FOUND;
}
sprintf(rtfpath, "../%s", wu.result_template_file);
sprintf(suffix, "%d_%d_%d", getpid(), time(0), seqno++);
retval = create_result(wu, rtfpath, suffix, key, config, 0, 0);
if (retval) {
log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL,
"[WU#%d %s] create_result() %d\n", wu.id, wu.name, retval
);
return retval;
}
int result_id = boinc_db.insert_id();
DB_RESULT result;
retval = result.lookup_id(result_id);
add_result_to_reply(result, wu, request, reply, app, avp);
// if this is a one-job assignment, fill in assignment.resultid
// so that it doesn't get sent again
//
if (!asg.multi) {
DB_ASSIGNMENT db_asg;
db_asg.id = asg.id;
sprintf(buf, "resultid=%d", result_id);
retval = db_asg.update_field(buf);
if (retval) {
log_messages.printf(SCHED_MSG_LOG::MSG_CRITICAL, "ERROR: ASGN UPDATE\n");
return retval;
}
asg.resultid = result_id;
}
log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,
"[WU#%d] [RESULT#%d] [HOST#%d] send assignment %d\n",
wu.id, result_id, reply.host.id
);
return 0;
}
// Send this host any jobs assigned to it, or to its user/team
// Return true iff we sent anything
//
bool send_assigned_jobs(SCHEDULER_REQUEST& request, SCHEDULER_REPLY& reply) {
DB_RESULT result;
int retval;
char buf[256];
bool sent_something = false;
for (int i=0; i<ssp->nassignments; i++) {
ASSIGNMENT& asg = ssp->assignments[i];
// see if this assignment applies to this host
//
if (asg.resultid) continue;
switch (asg.target_type) {
case ASSIGN_NONE:
sprintf(buf, "hostid=%d and workunitid=%d",
reply.host.id, asg.workunitid
);
retval = result.lookup(buf);
if (retval == ERR_NOT_FOUND) {
retval = send_assigned_job(asg, request, reply);
if (!retval) sent_something = true;
}
break;
case ASSIGN_HOST:
if (reply.host.id != asg.target_id) continue;
sprintf(buf, "workunitid=%d", asg.workunitid);
retval = result.lookup(buf);
if (retval == ERR_NOT_FOUND) {
retval = send_assigned_job(asg, request, reply);
if (!retval) sent_something = true;
}
break;
case ASSIGN_USER:
if (reply.user.id != asg.target_id) continue;
if (asg.multi) {
sprintf(buf, "workunitid=%d and hostid=%d", asg.workunitid, reply.host.id);
} else {
sprintf(buf, "workunitid=%d", asg.workunitid);
}
retval = result.lookup(buf);
if (retval == ERR_NOT_FOUND) {
retval = send_assigned_job(asg, request, reply);
if (!retval) sent_something = true;
}
break;
case ASSIGN_TEAM:
if (reply.team.id != asg.target_id) continue;
if (asg.multi) {
sprintf(buf, "workunitid=%d and hostid=%d", asg.workunitid, reply.host.id);
} else {
sprintf(buf, "workunitid=%d", asg.workunitid);
}
retval = result.lookup(buf);
if (retval == ERR_NOT_FOUND) {
retval = send_assigned_job(asg, request, reply);
if (!retval) sent_something = true;
}
break;
}
}
return sent_something;
}

20
sched/sched_assign.h Normal file
View File

@ -0,0 +1,20 @@
// Berkeley Open Infrastructure for Network Computing
// http://boinc.berkeley.edu
// Copyright (C) 2008 University of California
//
// This is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation;
// either version 2.1 of the License, or (at your option) any later version.
//
// This software is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU Lesser General Public License for more details.
//
// To view the GNU Lesser General Public License visit
// http://www.gnu.org/copyleft/lesser.html
// or write to the Free Software Foundation, Inc.,
// 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
extern bool send_assigned_jobs(SCHEDULER_REQUEST&, SCHEDULER_REPLY&);

View File

@ -140,6 +140,7 @@ int SCHED_CONFIG::parse(FILE* f) {
if (xp.parse_str(tag, "httpd_user", httpd_user, sizeof(httpd_user))) continue;
if (xp.parse_int(tag, "file_deletion_strategy", file_deletion_strategy)) continue;
if (xp.parse_bool(tag, "request_time_stats_log", request_time_stats_log)) continue;
if (xp.parse_bool(tag, "enable_assignment", enable_assignment)) continue;
// don't complain about unparsed XML;
// there are lots of tags the scheduler doesn't know about

View File

@ -110,6 +110,7 @@ public:
int file_deletion_strategy;
// select method of automatically deleting files from host
bool request_time_stats_log;
bool enable_assignment;
int parse(FILE*);
int parse_file(const char* dir=".");

View File

@ -277,8 +277,7 @@ int decrement_disk_space_locality(
//
static int possibly_send_result(
DB_RESULT& result,
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM_LIST& platforms,
SCHED_SHMEM& ss
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, SCHED_SHMEM& ss
) {
DB_WORKUNIT wu;
DB_RESULT result2;
@ -290,11 +289,9 @@ static int possibly_send_result(
retval = wu.lookup_id(result.workunitid);
if (retval) return ERR_DB_NOT_FOUND;
retval = get_app_version(
wu, app, avp, sreq, reply, platforms, ss
);
retval = get_app_version(wu, app, avp, sreq, reply, ss);
if (retval==ERR_NO_APP_VERSION && anonymous(platforms.list[0])) {
if (retval==ERR_NO_APP_VERSION && anonymous(sreq.platforms.list[0])) {
char help_msg_buf[512];
sprintf(help_msg_buf, "To get more %s work, finish current work, stop BOINC, remove app_info.xml file, and restart.", config.long_name);
USER_MESSAGE um(help_msg_buf, "high");
@ -319,7 +316,7 @@ static int possibly_send_result(
if (count > 0) return ERR_WU_USER_RULE;
}
return add_result_to_reply(result, wu, sreq, reply, platforms, app, avp);
return add_result_to_reply(result, wu, sreq, reply, app, avp);
}
// returns true if the work generator can not make more work for this
@ -497,7 +494,7 @@ static void flag_for_possible_removal(char* filename) {
static int send_results_for_file(
char* filename,
int& nsent,
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM_LIST& platforms,
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply,
SCHED_SHMEM& ss,
bool /*in_working_set*/
) {
@ -652,9 +649,7 @@ static int send_results_for_file(
// we found an unsent result, so try sending it.
// This *should* always work.
//
retval_send = possibly_send_result(
result, sreq, reply, platforms, ss
);
retval_send = possibly_send_result(result, sreq, reply, ss);
boinc_db.commit_transaction();
// if no app version or not enough resources, give up completely
@ -717,7 +712,7 @@ static int send_results_for_file(
// min_resultname = R.filename;
//
static int send_new_file_work_deterministic_seeded(
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM_LIST& platforms,
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply,
SCHED_SHMEM& ss, int& nsent, const char *start_f, const char *end_f
) {
DB_RESULT result;
@ -758,7 +753,7 @@ static int send_new_file_work_deterministic_seeded(
);
retval = send_results_for_file(
filename, nsent, sreq, reply, platforms, ss, false
filename, nsent, sreq, reply, ss, false
);
if (retval==ERR_NO_APP_VERSION || retval==ERR_INSUFFICIENT_RESOURCE) return retval;
@ -795,8 +790,7 @@ static bool is_host_slow(SCHEDULER_REQUEST& sreq) {
// if it has not sent any new work.
//
static int send_new_file_work_deterministic(
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM_LIST& platforms,
SCHED_SHMEM& ss
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, SCHED_SHMEM& ss
) {
char start_filename[256];
int getfile_retval, nsent=0;
@ -810,7 +804,7 @@ static int send_new_file_work_deterministic(
// start deterministic search with randomly chosen filename, go to
// lexical maximum
send_new_file_work_deterministic_seeded(sreq, reply, platforms, ss, nsent, start_filename, NULL);
send_new_file_work_deterministic_seeded(sreq, reply, ss, nsent, start_filename, NULL);
if (nsent) {
return 0;
}
@ -819,7 +813,7 @@ static int send_new_file_work_deterministic(
// filename, continue to randomly choosen one
if (!getfile_retval && reply.work_needed(true)) {
send_new_file_work_deterministic_seeded(
sreq, reply, platforms, ss, nsent, "", start_filename
sreq, reply, ss, nsent, "", start_filename
);
if (nsent) {
return 0;
@ -831,8 +825,7 @@ static int send_new_file_work_deterministic(
static int send_new_file_work_working_set(
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM_LIST& platforms,
SCHED_SHMEM& ss
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, SCHED_SHMEM& ss
) {
char filename[256];
int retval, nsent;
@ -845,21 +838,21 @@ static int send_new_file_work_working_set(
);
return send_results_for_file(
filename, nsent, sreq, reply, platforms, ss, true
filename, nsent, sreq, reply, ss, true
);
}
// prototype
static int send_old_work(
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM_LIST& platforms,
SCHED_SHMEM& ss, int t_min, int t_max);
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply,
SCHED_SHMEM& ss, int t_min, int t_max
);
// The host doesn't have any files for which work is available.
// Pick new file to send. Returns nonzero if no work is available.
//
static int send_new_file_work(
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM_LIST& platforms,
SCHED_SHMEM& ss
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, SCHED_SHMEM& ss
) {
while (reply.work_needed(true)) {
@ -880,7 +873,7 @@ static int send_new_file_work(
"send_new_file_work(): try to send old work\n"
);
retval_sow=send_old_work(sreq, reply, platforms, ss, start, end);
retval_sow=send_old_work(sreq, reply, ss, start, end);
if (retval_sow==ERR_NO_APP_VERSION || retval_sow==ERR_INSUFFICIENT_RESOURCE) return retval_sow;
@ -890,7 +883,7 @@ static int send_new_file_work(
"send_new_file_work(%d): try to send from working set\n", retry
);
retry++;
retval_snfwws=send_new_file_work_working_set(sreq, reply, platforms, ss);
retval_snfwws=send_new_file_work_working_set(sreq, reply, ss);
if (retval_snfwws==ERR_NO_APP_VERSION || retval_snfwws==ERR_INSUFFICIENT_RESOURCE) return retval_snfwws;
}
@ -899,7 +892,7 @@ static int send_new_file_work(
log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,
"send_new_file_work(): try deterministic method\n"
);
if (send_new_file_work_deterministic(sreq, reply, platforms, ss)) {
if (send_new_file_work_deterministic(sreq, reply, ss)) {
// if no work remains at all,
// we learn it here and return nonzero.
//
@ -919,7 +912,7 @@ static int send_new_file_work(
// t_min=INT_MIN if you wish to leave off the left constraint.
//
static int send_old_work(
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM_LIST& platforms,
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply,
SCHED_SHMEM& ss, int t_min, int t_max
) {
char buf[1024], filename[256];
@ -947,7 +940,7 @@ static int send_old_work(
retval = result.lookup(buf);
if (!retval) {
retval = possibly_send_result(result, sreq, reply, platforms, ss);
retval = possibly_send_result(result, sreq, reply, ss);
boinc_db.commit_transaction();
if (!retval) {
double age=(now-result.create_time)/3600.0;
@ -957,7 +950,7 @@ static int send_old_work(
extract_retval=extract_filename(result.name, filename);
if (!extract_retval) {
send_results_for_file(
filename, nsent, sreq, reply, platforms, ss, false
filename, nsent, sreq, reply, ss, false
);
} else {
// David, is this right? Is this the only place in
@ -1005,8 +998,7 @@ bool file_info_order(const FILE_INFO& fi1, const FILE_INFO& fi2) {
}
void send_work_locality(
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM_LIST& platforms,
SCHED_SHMEM& ss
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, SCHED_SHMEM& ss
) {
int i, nsent, nfiles, j;
@ -1092,7 +1084,7 @@ void send_work_locality(
//
if (config.locality_scheduling_send_timeout && sreq.host.n_bwdown>100000) {
int until=time(0)-config.locality_scheduling_send_timeout;
int retval_sow=send_old_work(sreq, reply, platforms, ss, INT_MIN, until);
int retval_sow=send_old_work(sreq, reply, ss, INT_MIN, until);
if (retval_sow==ERR_NO_APP_VERSION || retval_sow==ERR_INSUFFICIENT_RESOURCE) return;
}
@ -1105,7 +1097,7 @@ void send_work_locality(
if (!reply.work_needed(true)) break;
FILE_INFO& fi = sreq.file_infos[k];
retval_srff=send_results_for_file(
fi.name, nsent, sreq, reply, platforms, ss, false
fi.name, nsent, sreq, reply, ss, false
);
if (retval_srff==ERR_NO_APP_VERSION || retval_srff==ERR_INSUFFICIENT_RESOURCE) return;
@ -1142,7 +1134,7 @@ void send_work_locality(
// send new files if needed
//
if (reply.work_needed(true)) {
send_new_file_work(sreq, reply, platforms, ss);
send_new_file_work(sreq, reply, ss);
}
}

View File

@ -18,7 +18,7 @@
// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
extern void send_work_locality(
SCHEDULER_REQUEST&, SCHEDULER_REPLY&, PLATFORM_LIST&, SCHED_SHMEM&
SCHEDULER_REQUEST&, SCHEDULER_REPLY&, SCHED_SHMEM&
);
extern int decrement_disk_space_locality(

View File

@ -95,8 +95,7 @@ static int possibly_give_result_new_deadline(
// Return true if there were any such jobs
//
bool resend_lost_work(
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply,
PLATFORM_LIST& platforms, SCHED_SHMEM& ss
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, SCHED_SHMEM& ss
) {
DB_RESULT result;
std::vector<DB_RESULT>results;
@ -144,9 +143,7 @@ bool resend_lost_work(
reply.wreq.core_client_version =
sreq.core_client_major_version*100 + sreq.core_client_minor_version;
retval = get_app_version(
wu, app, avp, sreq, reply, platforms, ss
);
retval = get_app_version(wu, app, avp, sreq, reply, ss);
if (retval) {
log_messages.printf( SCHED_MSG_LOG::MSG_CRITICAL,
"[HOST#%d] no app version [RESULT#%d]\n",
@ -196,7 +193,7 @@ bool resend_lost_work(
reply.insert_message(um);
} else {
retval = add_result_to_reply(
result, wu, sreq, reply, platforms, app, avp
result, wu, sreq, reply, app, avp
);
if (retval) {
log_messages.printf( SCHED_MSG_LOG::MSG_CRITICAL,

View File

@ -1,4 +1,4 @@
extern bool resend_lost_work(
SCHEDULER_REQUEST&, SCHEDULER_REPLY&, PLATFORM_LIST&, SCHED_SHMEM&
SCHEDULER_REQUEST&, SCHEDULER_REPLY&, SCHED_SHMEM&
);

316
sched/sched_result.C Normal file
View File

@ -0,0 +1,316 @@
// Berkeley Open Infrastructure for Network Computing
// http://boinc.berkeley.edu
// Copyright (C) 2008 University of California
//
// This is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation;
// either version 2.1 of the License, or (at your option) any later version.
//
// This software is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU Lesser General Public License for more details.
//
// To view the GNU Lesser General Public License visit
// http://www.gnu.org/copyleft/lesser.html
// or write to the Free Software Foundation, Inc.,
// 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#include "boinc_db.h"
#include "str_util.h"
#include "parse.h"
#include "server_types.h"
#include "sched_msgs.h"
#include "sched_util.h"
#include "sched_result.h"
// handle completed results
//
int handle_results(SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply) {
DB_SCHED_RESULT_ITEM_SET result_handler;
SCHED_RESULT_ITEM* srip;
unsigned int i;
int retval;
RESULT* rp;
bool changed_host=false;
if (sreq.results.size() == 0) return 0;
// copy reported results to a separate vector, "result_handler",
// initially with only the "name" field present
//
for (i=0; i<sreq.results.size(); i++) {
result_handler.add_result(sreq.results[i].name);
}
// read results from database into "result_handler".
// Quantities that must be read from the DB are those
// where srip (see below) appears as an rval.
// These are: id, name, server_state, received_time, hostid, validate_state.
// Quantities that must be written to the DB are those for
// which srip appears as an lval. These are:
// hostid, teamid, received_time, client_state, cpu_time, exit_status,
// app_version_num, claimed_credit, server_state, stderr_out,
// xml_doc_out, outcome, validate_state
//
retval = result_handler.enumerate();
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[HOST#%d] Batch query failed\n",
reply.host.id
);
}
// loop over results reported by client
//
// A note about acks: we send an ack for result received if either
// 1) there's some problem with it (wrong state, host, not in DB) or
// 2) we update it successfully.
// In other words, the only time we don't ack a result is when
// it looks OK but the update failed.
//
for (i=0; i<sreq.results.size(); i++) {
rp = &sreq.results[i];
retval = result_handler.lookup_result(rp->name, &srip);
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[HOST#%d] [RESULT#? %s] can't find result\n",
reply.host.id, rp->name
);
reply.result_acks.push_back(std::string(rp->name));
continue;
}
log_messages.printf(
SCHED_MSG_LOG::MSG_NORMAL, "[HOST#%d] [RESULT#%d %s] got result (DB: server_state=%d outcome=%d client_state=%d validate_state=%d delete_state=%d)\n",
reply.host.id, srip->id, srip->name, srip->server_state, srip->outcome, srip->client_state, srip->validate_state, srip->file_delete_state
);
// Do various sanity checks.
// If one of them fails, set srip->id = 0,
// which suppresses the DB update later on
//
// If result has server_state OVER
// if outcome NO_REPLY accept it (it's just late).
// else ignore it
//
if (srip->server_state == RESULT_SERVER_STATE_OVER) {
const char *dont_replace_result = NULL;
switch (srip->outcome) {
case RESULT_OUTCOME_INIT:
// should never happen!
dont_replace_result = "this result was never sent";
break;
case RESULT_OUTCOME_SUCCESS:
// don't replace a successful result!
dont_replace_result = "result already reported as success";
break;
case RESULT_OUTCOME_COULDNT_SEND:
// should never happen!
dont_replace_result = "this result couldn't be sent";
break;
case RESULT_OUTCOME_CLIENT_ERROR:
// should never happen!
dont_replace_result = "result already reported as error";
break;
case RESULT_OUTCOME_CLIENT_DETACHED:
case RESULT_OUTCOME_NO_REPLY:
// result is late in arriving, but keep it anyhow
break;
case RESULT_OUTCOME_DIDNT_NEED:
// should never happen
dont_replace_result = "this result wasn't sent (not needed)";
break;
case RESULT_OUTCOME_VALIDATE_ERROR:
// we already passed through the validator, so
// don't keep the new result
dont_replace_result = "result already reported, validate error";
break;
default:
dont_replace_result = "server logic bug; please alert BOINC developers";
break;
}
if (dont_replace_result) {
char buf[256];
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[HOST#%d] [RESULT#%d %s] result already over [outcome=%d validate_state=%d]: %s\n",
reply.host.id, srip->id, srip->name, srip->outcome, srip->validate_state, dont_replace_result
);
sprintf(buf, "Completed result %s refused: %s", srip->name, dont_replace_result);
USER_MESSAGE um(buf, "high");
reply.insert_message(um);
srip->id = 0;
reply.result_acks.push_back(std::string(rp->name));
continue;
}
}
if (srip->server_state == RESULT_SERVER_STATE_UNSENT) {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[HOST#%d] [RESULT#%d %s] got unexpected result: server state is %d\n",
reply.host.id, srip->id, srip->name, srip->server_state
);
srip->id = 0;
reply.result_acks.push_back(std::string(rp->name));
continue;
}
if (srip->received_time) {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[HOST#%d] [RESULT#%d %s] got result twice\n",
reply.host.id, srip->id, srip->name
);
srip->id = 0;
reply.result_acks.push_back(std::string(rp->name));
continue;
}
if (srip->hostid != reply.host.id) {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[HOST#%d] [RESULT#%d %s] got result from wrong host; expected [HOST#%d]\n",
reply.host.id, srip->id, srip->name, srip->hostid
);
DB_HOST result_host;
retval = result_host.lookup_id(srip->hostid);
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[RESULT#%d %s] Can't lookup [HOST#%d]\n",
srip->id, srip->name, srip->hostid
);
srip->id = 0;
reply.result_acks.push_back(std::string(rp->name));
continue;
} else if (result_host.userid != reply.host.userid) {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[USER#%d] [HOST#%d] [RESULT#%d %s] Not even the same user; expected [USER#%d]\n",
reply.host.userid, reply.host.id, srip->id, srip->name, result_host.userid
);
srip->id = 0;
reply.result_acks.push_back(std::string(rp->name));
continue;
} else {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[HOST#%d] [RESULT#%d %s] Allowing result because same USER#%d\n",
reply.host.id, srip->id, srip->name, reply.host.userid
);
changed_host = true;
}
} // hostids do not match
// Modify the in-memory copy obtained from the DB earlier.
// If we found a problem above,
// we have continued and skipped this modify
//
srip->hostid = reply.host.id;
srip->teamid = reply.user.teamid;
srip->received_time = time(0);
srip->client_state = rp->client_state;
srip->cpu_time = rp->cpu_time;
// check for impossible CPU time
//
double elapsed_time = srip->received_time - srip->sent_time;
if (elapsed_time < 0) {
log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,
"[HOST#%d] [RESULT#%d] inconsistent sent/received times\n", srip->hostid, srip->id
);
} else {
if (srip->cpu_time > elapsed_time) {
log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,
"[HOST#%d] [RESULT#%d] excessive CPU time: reported %f > elapsed %f%s\n",
srip->hostid, srip->id, srip->cpu_time, elapsed_time, changed_host?" [OK: HOST changed]":""
);
if (!changed_host) srip->cpu_time = elapsed_time;
}
}
srip->exit_status = rp->exit_status;
srip->app_version_num = rp->app_version_num;
if (rp->fpops_cumulative || rp->intops_cumulative) {
srip->claimed_credit = fpops_to_credit(rp->fpops_cumulative, rp->intops_cumulative);
} else if (rp->fpops_per_cpu_sec || rp->intops_per_cpu_sec) {
srip->claimed_credit = fpops_to_credit(
rp->fpops_per_cpu_sec*srip->cpu_time,
rp->intops_per_cpu_sec*srip->cpu_time
);
} else {
srip->claimed_credit = srip->cpu_time * reply.host.claimed_credit_per_cpu_sec;
}
log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,
"cpu %f cpcs %f, cc %f\n", srip->cpu_time, reply.host.claimed_credit_per_cpu_sec, srip->claimed_credit
);
srip->server_state = RESULT_SERVER_STATE_OVER;
strlcpy(srip->stderr_out, rp->stderr_out, sizeof(srip->stderr_out));
strlcpy(srip->xml_doc_out, rp->xml_doc_out, sizeof(srip->xml_doc_out));
// look for exit status and app version in stderr_out
// (historical - can be deleted at some point)
//
parse_int(srip->stderr_out, "<exit_status>", srip->exit_status);
parse_int(srip->stderr_out, "<app_version>", srip->app_version_num);
if ((srip->client_state == RESULT_FILES_UPLOADED) && (srip->exit_status == 0)) {
srip->outcome = RESULT_OUTCOME_SUCCESS;
log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,
"[RESULT#%d %s]: setting outcome SUCCESS\n",
srip->id, srip->name
);
reply.got_good_result();
} else {
log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG,
"[RESULT#%d %s]: client_state %d exit_status %d; setting outcome ERROR\n",
srip->id, srip->name, srip->client_state, srip->exit_status
);
srip->outcome = RESULT_OUTCOME_CLIENT_ERROR;
srip->validate_state = VALIDATE_STATE_INVALID;
reply.got_bad_result();
}
} // loop over all incoming results
// Update the result records
// (skip items that we previously marked to skip)
//
for (i=0; i<result_handler.results.size(); i++) {
SCHED_RESULT_ITEM& sri = result_handler.results[i];
if (sri.id == 0) continue;
retval = result_handler.update_result(sri);
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[HOST#%d] [RESULT#%d %s] can't update result: %s\n",
reply.host.id, sri.id, sri.name, boinc_db.error_string()
);
} else {
reply.result_acks.push_back(std::string(sri.name));
}
}
// set transition_time for the results' WUs
//
retval = result_handler.update_workunits();
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[HOST#%d] can't update WUs: %d\n",
reply.host.id, retval
);
}
return 0;
}

20
sched/sched_result.h Normal file
View File

@ -0,0 +1,20 @@
// Berkeley Open Infrastructure for Network Computing
// http://boinc.berkeley.edu
// Copyright (C) 2008 University of California
//
// This is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation;
// either version 2.1 of the License, or (at your option) any later version.
//
// This software is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU Lesser General Public License for more details.
//
// To view the GNU Lesser General Public License visit
// http://www.gnu.org/copyleft/lesser.html
// or write to the Free Software Foundation, Inc.,
// 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
extern int handle_results(SCHEDULER_REQUEST&, SCHEDULER_REPLY&);

View File

@ -47,6 +47,7 @@ using namespace std;
#include "hr.h"
#include "sched_locality.h"
#include "sched_timezone.h"
#include "sched_assign.h"
#include "sched_send.h"
@ -98,11 +99,11 @@ bool SCHEDULER_REQUEST::has_version(APP& app) {
//
int get_app_version(
WORKUNIT& wu, APP* &app, APP_VERSION* &avp,
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM_LIST& platforms,
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply,
SCHED_SHMEM& ss
) {
bool found;
if (anonymous(platforms.list[0])) {
if (anonymous(sreq.platforms.list[0])) {
app = ss.lookup_app(wu.appid);
found = sreq.has_version(*app);
if (!found) {
@ -113,7 +114,7 @@ int get_app_version(
}
avp = NULL;
} else {
found = find_app_version(reply.wreq, wu, platforms, ss, app, avp);
found = find_app_version(sreq, reply.wreq, wu, ss, app, avp);
if (!found) {
log_messages.printf(SCHED_MSG_LOG::MSG_DEBUG, "Didn't find app version\n");
return ERR_NO_APP_VERSION;
@ -550,7 +551,7 @@ int insert_wu_tags(WORKUNIT& wu, APP& app) {
// return false if none
//
bool find_app_version(
WORK_REQ& wreq, WORKUNIT& wu, PLATFORM_LIST& platforms, SCHED_SHMEM& ss,
SCHEDULER_REQUEST& sreq, WORK_REQ& wreq, WORKUNIT& wu, SCHED_SHMEM& ss,
APP*& app, APP_VERSION*& avp
) {
app = ss.lookup_app(wu.appid);
@ -561,15 +562,15 @@ bool find_app_version(
return false;
}
unsigned int i;
for (i=0; i<platforms.list.size(); i++) {
PLATFORM* p = platforms.list[i];
for (i=0; i<sreq.platforms.list.size(); i++) {
PLATFORM* p = sreq.platforms.list[i];
avp = ss.lookup_app_version(app->id, p->id, app->min_version);
if (avp) return true;
}
log_messages.printf(
SCHED_MSG_LOG::MSG_DEBUG,
"no app version available: APP#%d PLATFORM#%d min_version %d\n",
app->id, platforms.list[0]->id, app->min_version
app->id, sreq.platforms.list[0]->id, app->min_version
);
wreq.no_app_version = true;
return false;
@ -593,12 +594,10 @@ bool app_core_compatible(WORK_REQ& wreq, APP_VERSION& av) {
}
// add the given workunit to a reply.
// look up its app, and make sure there's a version for this platform.
// Add the app and app_version to the reply also.
//
int add_wu_to_reply(
WORKUNIT& wu, SCHEDULER_REPLY& reply, PLATFORM_LIST& ,
APP* app, APP_VERSION* avp
WORKUNIT& wu, SCHEDULER_REPLY& reply, APP* app, APP_VERSION* avp
) {
int retval;
WORKUNIT wu2, wu3;
@ -772,14 +771,13 @@ void SCHEDULER_REPLY::got_bad_result() {
int add_result_to_reply(
DB_RESULT& result, WORKUNIT& wu, SCHEDULER_REQUEST& request,
SCHEDULER_REPLY& reply, PLATFORM_LIST& platforms,
APP* app, APP_VERSION* avp
SCHEDULER_REPLY& reply, APP* app, APP_VERSION* avp
) {
int retval;
double wu_seconds_filled;
bool resent_result = false;
retval = add_wu_to_reply(wu, reply, platforms, app, avp);
retval = add_wu_to_reply(wu, reply, app, avp);
if (retval) return retval;
// in the scheduling locality case,
@ -927,8 +925,7 @@ int add_result_to_reply(
}
void send_work(
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, PLATFORM_LIST& platforms,
SCHED_SHMEM& ss
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply, SCHED_SHMEM& ss
) {
char helpful[512];
@ -945,7 +942,7 @@ void send_work(
reply.wreq.beta_only = false;
log_messages.printf(
SCHED_MSG_LOG::MSG_NORMAL,
SCHED_MSG_LOG::MSG_DEBUG,
"[HOST#%d] got request for %f seconds of work; available disk %f GB\n",
reply.host.id, sreq.work_req_seconds, reply.wreq.disk_available/1e9
);
@ -960,6 +957,12 @@ void send_work(
reply.wreq.seconds_to_fill = MIN_SECONDS_TO_SEND;
}
if (config.enable_assignment) {
if (send_assigned_jobs(sreq, reply)) {
return;
}
}
if (config.workload_sim && sreq.have_other_results_list) {
init_ip_results(
sreq.global_prefs.work_buf_min(), reply.host.p_ncpus, sreq.ip_results
@ -968,14 +971,14 @@ void send_work(
if (config.locality_scheduling) {
reply.wreq.infeasible_only = false;
send_work_locality(sreq, reply, platforms, ss);
send_work_locality(sreq, reply, ss);
} else {
// give top priority to results that require a 'reliable host'
//
if (reply.wreq.host_info.reliable) {
reply.wreq.reliable_only = true;
reply.wreq.infeasible_only = false;
scan_work_array(sreq, reply, platforms, ss);
scan_work_array(sreq, reply, ss);
}
reply.wreq.reliable_only = false;
@ -990,17 +993,17 @@ void send_work(
"[HOST#%d] will accept beta work. Scanning for beta work.\n",
reply.host.id
);
scan_work_array(sreq, reply, platforms, ss);
scan_work_array(sreq, reply, ss);
}
reply.wreq.beta_only = false;
// give next priority to results that were infeasible for some other host
//
reply.wreq.infeasible_only = true;
scan_work_array(sreq, reply, platforms, ss);
scan_work_array(sreq, reply, ss);
reply.wreq.infeasible_only = false;
scan_work_array(sreq, reply, platforms, ss);
scan_work_array(sreq, reply, ss);
}
log_messages.printf(

View File

@ -20,22 +20,22 @@
extern int get_app_version(
WORKUNIT& wu, APP* &app, APP_VERSION* &avp,
SCHEDULER_REQUEST& sreq, SCHEDULER_REPLY& reply,
PLATFORM_LIST& platforms, SCHED_SHMEM& ss
SCHED_SHMEM& ss
);
extern void send_work(
SCHEDULER_REQUEST&, SCHEDULER_REPLY&, PLATFORM_LIST&, SCHED_SHMEM&
SCHEDULER_REQUEST&, SCHEDULER_REPLY&, SCHED_SHMEM&
);
extern int add_result_to_reply(
DB_RESULT& result, WORKUNIT& wu, SCHEDULER_REQUEST&, SCHEDULER_REPLY&,
PLATFORM_LIST&, APP* app, APP_VERSION* avp
APP* app, APP_VERSION* avp
);
extern bool anonymous(PLATFORM*);
extern bool find_app_version(
WORK_REQ& wreq, WORKUNIT& wu, PLATFORM_LIST& platform, SCHED_SHMEM& ss,
SCHEDULER_REQUEST&, WORK_REQ& wreq, WORKUNIT& wu, SCHED_SHMEM& ss,
APP*& app, APP_VERSION*& avp
);

View File

@ -43,10 +43,12 @@ void SCHED_SHMEM::init(int nwu_results) {
platform_size = sizeof(PLATFORM);
app_size = sizeof(APP);
app_version_size = sizeof(APP_VERSION);
assignment_size = sizeof(ASSIGNMENT);
wu_result_size = sizeof(WU_RESULT);
max_platforms = MAX_PLATFORMS;
max_apps = MAX_APPS;
max_app_versions = MAX_APP_VERSIONS;
max_assignments = MAX_ASSIGNMENTS;
max_wu_results = nwu_results;
}
@ -61,10 +63,12 @@ int SCHED_SHMEM::verify() {
if (platform_size != sizeof(PLATFORM)) return error_return("platform");
if (app_size != sizeof(APP)) return error_return("app");
if (app_version_size != sizeof(APP_VERSION)) return error_return("app_version");
if (assignment_size != sizeof(ASSIGNMENT)) return error_return("assignment");
if (wu_result_size != sizeof(WU_RESULT)) return error_return("wu_result");
if (max_platforms != MAX_PLATFORMS) return error_return("max platform");
if (max_apps != MAX_APPS) return error_return("max apps");
if (max_app_versions != MAX_APP_VERSIONS) return error_return("max app version");
if (max_app_versions != MAX_APP_VERSIONS) return error_return("max app versions");
if (max_assignments != MAX_ASSIGNMENTS) return error_return("max assignments");
return 0;
}
@ -84,6 +88,7 @@ int SCHED_SHMEM::scan_tables() {
DB_PLATFORM platform;
DB_APP app;
DB_APP_VERSION app_version;
DB_ASSIGNMENT assignment;
int n;
n = 0;
@ -126,6 +131,15 @@ int SCHED_SHMEM::scan_tables() {
}
napp_versions = n;
n = 0;
while (!assignment.enumerate()) {
assignments[n++] = assignment;
if (n == MAX_ASSIGNMENTS) {
overflow("assignments", "MAX_ASSIGNMENTS");
}
}
nassignments = n;
return 0;
}

View File

@ -35,6 +35,7 @@
#define MAX_PLATFORMS 50
#define MAX_APPS 10
#define MAX_APP_VERSIONS 50
#define MAX_ASSIGNMENTS 10
// Default number of work items in shared mem.
// You can configure this in config.xml (<shmem_work_items>)
@ -71,20 +72,22 @@ struct SCHED_SHMEM {
int platform_size; // sizeof(PLATFORM)
int app_size; // sizeof(APP)
int app_version_size; // sizeof(APP_VERSION)
int assignment_size; // sizeof(ASSIGNMENT))
int wu_result_size; // sizeof(WU_RESULT)
int nplatforms;
int napps;
double app_weights;
int napp_versions;
int ncore_versions;
int nassignments;
int max_platforms;
int max_apps;
int max_app_versions;
int max_core_versions;
int max_assignments;
int max_wu_results;
PLATFORM platforms[MAX_PLATFORMS];
APP apps[MAX_APPS];
APP_VERSION app_versions[MAX_APP_VERSIONS];
ASSIGNMENT assignments[MAX_ASSIGNMENTS];
WU_RESULT wu_results[0];
void init(int nwu_results);

View File

@ -182,6 +182,7 @@ struct SCHEDULER_REQUEST {
char authenticator[256];
CLIENT_PLATFORM platform;
std::vector<CLIENT_PLATFORM> alt_platforms;
PLATFORM_LIST platforms;
char cross_project_id[256];
int hostid; // zero if first RPC
int core_client_major_version;

View File

@ -99,6 +99,13 @@ int handle_wu(
bool all_over_and_validated, have_new_result_to_validate, do_delete;
unsigned int i;
TRANSITIONER_ITEM& wu_item = items[0];
TRANSITIONER_ITEM wu_item_original = wu_item;
if (config.enable_assignment && strstr(wu_item.name, "asgn")) {
return 0;
}
// count up the number of results in various states,
// and check for timed-out results
//
@ -114,9 +121,6 @@ int handle_wu(
have_new_result_to_validate = false;
int rs, max_result_suffix = -1;
TRANSITIONER_ITEM& wu_item = items[0];
TRANSITIONER_ITEM wu_item_original = wu_item;
// Scan the WU's results, and find the canonical result if there is one
//
canonical_result_index = -1;
@ -356,18 +360,18 @@ int handle_wu(
char rtfpath[256];
sprintf(rtfpath, "../%s", wu_item.result_template_file);
int priority_increase = 0;
if ( nover && config.reliable_priority_on_over ) {
priority_increase = priority_increase + config.reliable_priority_on_over;
if (nover && config.reliable_priority_on_over) {
priority_increase += config.reliable_priority_on_over;
} else if (nover && !nerrors && config.reliable_priority_on_over_except_error) {
priority_increase = priority_increase + config.reliable_priority_on_over_except_error;
priority_increase += config.reliable_priority_on_over_except_error;
}
retval = create_result(
retval = create_result_ti(
wu_item, rtfpath, suffix, key, config, value_buf, priority_increase
);
if (retval) {
log_messages.printf(
SCHED_MSG_LOG::MSG_CRITICAL,
"[WU#%d %s] create_result() %d\n",
"[WU#%d %s] create_result_ti() %d\n",
wu_item.id, wu_item.name, retval
);
return retval;

View File

@ -2,7 +2,7 @@ include $(top_srcdir)/Makefile.incl
bin_PROGRAMS = create_work sign_executable dir_hier_path dir_hier_move
EXTRA_DIST = make_project add xadd update_versions dbcheck_files_exist upgrade makelog.sh cleanlogs.sh vote_monitor
EXTRA_DIST = make_project xadd update_versions dbcheck_files_exist upgrade makelog.sh cleanlogs.sh vote_monitor
# TODO: use libboinc for these:

View File

@ -158,7 +158,9 @@ static void write_md5_info(
return;
}
// process WU template
// fill in the workunit's XML document (wu.xml_doc)
// by scanning the WU template, macro-substituting the input files,
// and putting in the command line element and additional XML
//
static int process_wu_template(
WORKUNIT& wu,
@ -176,6 +178,7 @@ static int process_wu_template(
double nbytes;
char open_name[256];
bool found=false;
int nfiles_parsed = 0;
out = "";
for (p=strtok(tmplate, "\n"); p; p=strtok(0, "\n")) {
@ -202,6 +205,7 @@ static int process_wu_template(
);
return ERR_XML_PARSE;
}
nfiles_parsed++;
if (generated_locally) {
sprintf(buf,
" <name>%s</name>\n"
@ -345,7 +349,14 @@ static int process_wu_template(
}
}
if (!found) {
fprintf(stderr, "create_work: bad WU template - no <workunit>\n");
fprintf(stderr, "process_wu_template: bad WU template - no <workunit>\n");
return -1;
}
if (nfiles_parsed != ninfiles) {
fprintf(stderr,
"process_wu_template: %d input files listed, but template has %d\n",
ninfiles, nfiles_parsed
);
return -1;
}
if (out.size() > sizeof(wu.xml_doc)-1) {
@ -361,7 +372,7 @@ static int process_wu_template(
// initialize an about-to-be-created result, given its WU
//
static void initialize_result(DB_RESULT& result, TRANSITIONER_ITEM& wu) {
static void initialize_result(DB_RESULT& result, WORKUNIT& wu) {
result.id = 0;
result.create_time = time(0);
result.workunitid = wu.id;
@ -384,18 +395,48 @@ static void initialize_result(DB_RESULT& result, TRANSITIONER_ITEM& wu) {
result.batch = wu.batch;
}
// Create a new result for the given WU.
// This is called ONLY from the transitioner
//
int create_result(
TRANSITIONER_ITEM& wu,
int create_result_ti(
TRANSITIONER_ITEM& ti,
char* result_template_filename,
char* result_name_suffix,
R_RSA_PRIVATE_KEY& key,
SCHED_CONFIG& config,
char* query_string,
int priority_increase
// if nonzero, write value list here; else do insert
int priority_increase
) {
WORKUNIT wu;
// copy relevant fields from TRANSITIONER_ITEM to WORKUNIT
//
strcpy(wu.name, ti.name);
wu.id = ti.id;
wu.appid = ti.appid;
wu.priority = ti.priority;
wu.batch = ti.batch;
return create_result(
wu,
result_template_filename,
result_name_suffix,
key,
config,
query_string,
priority_increase
);
}
// Create a new result for the given WU.
// This is called ONLY from the transitioner
//
int create_result(
WORKUNIT& wu,
char* result_template_filename,
char* result_name_suffix,
R_RSA_PRIVATE_KEY& key,
SCHED_CONFIG& config,
char* query_string,
// if nonzero, write value list here; else do insert
int priority_increase
) {
DB_RESULT result;
char base_outfile_name[256];
@ -562,7 +603,11 @@ int create_work(
fprintf(stderr, "no max_success_results given; can't create job\n");
return ERR_NO_OPTION;
}
wu.transition_time = time(0);
if (strstr(wu.name, "asgn")) {
wu.transition_time = INT_MAX;
} else {
wu.transition_time = time(0);
}
retval = wu.insert();
if (retval) {
fprintf(stderr, "create_work: workunit.insert() %d\n", retval);

View File

@ -40,6 +40,16 @@ extern int read_filename(const char* path, char* buf, int len);
extern void initialize_result(DB_RESULT&, DB_WORKUNIT&);
extern int create_result(
WORKUNIT&,
char* result_template_filename,
char* suffix,
R_RSA_PRIVATE_KEY& key,
SCHED_CONFIG& config,
char* query_string=0,
int priority_increase=0
);
extern int create_result_ti(
TRANSITIONER_ITEM&,
char* result_template_filename,
char* suffix,

View File

@ -24,24 +24,30 @@
// and there must be a valid config.xml file there
//
// create_work
// -appname name
// -wu_name name
// -wu_template filename relative to project root; usually in templates/
// -result_template filename relative to project root; usually in templates/
// [ -config_dir path ]
// [ -batch n ]
// --appname name
// --wu_name name
// --wu_template filename relative to project root; usually in templates/
// --result_template filename relative to project root; usually in templates/
// [ --config_dir path ]
// [ --batch n ]
// the following can be supplied in WU template; see defaults below
// [ -rsc_fpops_est n ]
// [ -rsc_fpops_bound n ]
// [ -rsc_memory_bound n ]
// [ -rsc_disk_bound n ]
// [ -delay_bound x ]
// [ -min_quorum x ]
// [ -target_nresults x ]
// [ -max_error_results x ]
// [ -max_total_results x ]
// [ -max_success_results x ]
// [ -additional_xml x ]
// [ --rsc_fpops_est n ]
// [ --rsc_fpops_bound n ]
// [ --rsc_memory_bound n ]
// [ --rsc_disk_bound n ]
// [ --delay_bound x ]
// [ --min_quorum x ]
// [ --target_nresults x ]
// [ --max_error_results x ]
// [ --max_total_results x ]
// [ --max_success_results x ]
// [ --additional_xml x ]
// [ --assign_all ]
// [ --assign_host ID ]
// [ --assign_user_one ID ]
// [ --assign_user_all ID ]
// [ --assign_team_one ID ]
// [ --assign_team_all ID ]
// infile1 infile2 ...
//
@ -56,6 +62,15 @@
#include "backend_lib.h"
#include "sched_config.h"
bool arg(const char** argv, int i, const char* name) {
char buf[256];
sprintf(buf, "-%s", name);
if (!strcmp(argv[i], buf)) return true;
sprintf(buf, "--%s", name);
if (!strcmp(argv[i], buf)) return true;
return false;
}
int main(int argc, const char** argv) {
DB_APP app;
DB_WORKUNIT wu;
@ -70,6 +85,10 @@ int main(int argc, const char** argv) {
char buf[256];
char additional_xml[256];
SCHED_CONFIG config;
bool assign_flag = false;
bool assign_multi = false;
int assign_id = 0;
int assign_type;
strcpy(result_template_file, "");
strcpy(app.name, "");
@ -94,50 +113,75 @@ int main(int argc, const char** argv) {
wu.delay_bound = 100000;
while (i < argc) {
if (!strcmp(argv[i], "-appname")) {
if (arg(argv, i, "appname")) {
strcpy(app.name, argv[++i]);
} else if (!strcmp(argv[i], "-wu_name")) {
} else if (arg(argv, i, "wu_name")) {
strcpy(wu.name, argv[++i]);
} else if (!strcmp(argv[i], "-wu_template")) {
} else if (arg(argv, i, "wu_template")) {
strcpy(wu_template_file, argv[++i]);
} else if (!strcmp(argv[i], "-result_template")) {
} else if (arg(argv, i, "result_template")) {
strcpy(result_template_file, argv[++i]);
} else if (!strcmp(argv[i], "-batch")) {
} else if (arg(argv, i, "batch")) {
wu.batch = atoi(argv[++i]);
} else if (!strcmp(argv[i], "-config_dir")) {
} else if (arg(argv, i, "config_dir")) {
config_dir = argv[++i];
} else if (!strcmp(argv[i], "-batch")) {
} else if (arg(argv, i, "batch")) {
wu.batch = atoi(argv[++i]);
} else if (!strcmp(argv[i], "-priority")) {
} else if (arg(argv, i, "priority")) {
wu.priority = atoi(argv[++i]);
} else if (!strcmp(argv[i], "-rsc_fpops_est")) {
} else if (arg(argv, i, "rsc_fpops_est")) {
wu.rsc_fpops_est = atof(argv[++i]);
} else if (!strcmp(argv[i], "-rsc_fpops_bound")) {
} else if (arg(argv, i, "rsc_fpops_bound")) {
wu.rsc_fpops_bound = atof(argv[++i]);
} else if (!strcmp(argv[i], "-rsc_memory_bound")) {
} else if (arg(argv, i, "rsc_memory_bound")) {
wu.rsc_memory_bound = atof(argv[++i]);
} else if (!strcmp(argv[i], "-rsc_disk_bound")) {
} else if (arg(argv, i, "rsc_disk_bound")) {
wu.rsc_disk_bound = atof(argv[++i]);
} else if (!strcmp(argv[i], "-delay_bound")) {
} else if (arg(argv, i, "delay_bound")) {
wu.delay_bound = atoi(argv[++i]);
} else if (!strcmp(argv[i], "-min_quorum")) {
} else if (arg(argv, i, "min_quorum")) {
wu.min_quorum = atoi(argv[++i]);
} else if (!strcmp(argv[i], "-target_nresults")) {
} else if (arg(argv, i, "target_nresults")) {
wu.target_nresults = atoi(argv[++i]);
} else if (!strcmp(argv[i], "-max_error_results")) {
} else if (arg(argv, i, "max_error_results")) {
wu.max_error_results = atoi(argv[++i]);
} else if (!strcmp(argv[i], "-max_total_results")) {
} else if (arg(argv, i, "max_total_results")) {
wu.max_total_results = atoi(argv[++i]);
} else if (!strcmp(argv[i], "-max_success_results")) {
} else if (arg(argv, i, "max_success_results")) {
wu.max_success_results = atoi(argv[++i]);
} else if (!strcmp(argv[i], "-opaque")) {
} else if (arg(argv, i, "opaque")) {
wu.opaque = atoi(argv[++i]);
} else if (!strcmp(argv[i], "-command_line")) {
} else if (arg(argv, i, "command_line")) {
command_line= argv[++i];
} else if (!strcmp(argv[i], "-additional_xml")) {
} else if (arg(argv, i, "additional_xml")) {
strcpy(additional_xml, argv[++i]);
} else if (arg(argv, i, "assign_all")) {
assign_flag = true;
assign_type = ASSIGN_NONE;
} else if (arg(argv, i, "assign_host")) {
assign_flag = true;
assign_type = ASSIGN_HOST;
assign_id = atoi(argv[++i]);
} else if (arg(argv, i, "assign_user_one")) {
assign_flag = true;
assign_type = ASSIGN_USER;
assign_id = atoi(argv[++i]);
} else if (arg(argv, i, "assign_user_all")) {
assign_flag = true;
assign_type = ASSIGN_USER;
assign_multi = true;
assign_id = atoi(argv[++i]);
} else if (arg(argv, i, "assign_team_one")) {
assign_flag = true;
assign_type = ASSIGN_TEAM;
assign_id = atoi(argv[++i]);
} else if (arg(argv, i, "assign_team_all")) {
assign_flag = true;
assign_type = ASSIGN_TEAM;
assign_multi = true;
assign_id = atoi(argv[++i]);
} else {
if (!strncmp("-",argv[i],1)) {
if (!strncmp("-", argv[i], 1)) {
fprintf(stderr, "create_work: bad argument '%s'\n", argv[i]);
exit(1);
}
@ -158,6 +202,12 @@ int main(int argc, const char** argv) {
#undef CHKARG
#undef CHKARG_STR
if (assign_flag) {
if (!strstr(wu.name, "asgn")) {
fprintf(stderr, "Assigned WU names must contain 'asgn'\n");
exit(1);
}
}
retval = config.parse_file(config_dir);
if (retval) {
fprintf(stderr, "Can't parse config file: %d\n", retval);
@ -207,6 +257,20 @@ int main(int argc, const char** argv) {
fprintf(stderr, "create_work: %d\n", retval);
exit(1);
}
if (assign_flag) {
DB_ASSIGNMENT assignment;
assignment.clear();
assignment.create_time = time(0);
assignment.target_id = assign_id;
assignment.target_type = assign_type;
assignment.multi = assign_multi;
assignment.workunitid = wu.id;
retval = assignment.insert();
if (retval) {
fprintf(stderr, "assignment.insert() failed: %d\n", retval);
exit(1);
}
}
boinc_db.close();
}