Remote job submission: allow efficient batch query

The batch query call used by Condor (query_batch_set(), in the C++ API)
returned info about all the jobs in the set of batches,
even those that hadn't changed.
This is potentially inefficient - a query might return info
about 10,000 jobs, only a few (or none) of which have changed state
since the last call.

Solution: add a "min_mod_time" parameter to the call.
Only jobs that have changed state since that time are reported.
Also, add a "server_time" field to the return,
giving the current time on the server
(in case there's clock skew between client and server).

Also, fix some text scrambling introduced in a previous checkin;
there must have been a gremlin in my vim.
This commit is contained in:
David Anderson 2014-01-16 10:24:10 -08:00
parent be4c61140e
commit 01b78c714a
6 changed files with 36 additions and 12 deletions

View File

@ -941,6 +941,10 @@ function show_badges($is_user, $item) {
row2("Badges", $x);
}
// Return the current Unix epoch time as a float,
// with microsecond resolution (used for server_time reporting).
function dtime() {
    $now = microtime(true);
    return $now;
}
$cvs_version_tracker[]="\$Id$"; //Generated automatically - do not edit
?>

View File

@ -89,7 +89,7 @@ function check_max_jobs_in_progress($r, $user_submit) {
$n = $db->get_int($query);
if ($n === false) return;
if ($n + count($r->batch->job) > $user_submit->max_jobs_in_progress) {
xml_error(,1 "BOINC server: limit on jobs in progress exceeded");
xml_error(-1, "BOINC server: limit on jobs in progress exceeded");
}
}
@ -121,6 +121,12 @@ $fanout = parse_config(get_config(), "<uldl_dir_fanout>");
function stage_file($file) {
global $fanout;
switch ($file->mode) {
case "semilocal":
case "local":
$md5 = md5_file($file->source);
if (!$md5) {
xml_error(-1, "BOINC server: Can't get MD5 of file $file->source");
}
$name = "jf_$md5";
$path = dir_hier_path($name, "../../download", $fanout);
@ -230,12 +236,6 @@ function submit_batch($r) {
xml_error(-1, "BOINC server: no rsc_fpops_est given");
}
}
if ($x) {
$total_flops += $x;
} else {
xml_error(-1, "BOINC server: no rsc_fpops_est given");
}
}
}
$cmd = "cd ../../bin; ./adjust_user_priority --user $user->id --flops $total_flops --app $app->name";
$x = exec($cmd);
@ -390,9 +390,18 @@ function query_batch2($r) {
$batches[] = $batch;
}
$min_mod_time = (double)$r->min_mod_time;
if ($min_mod_time) {
$mod_time_clause = "and mod_time > FROM_UNIXTIME($min_mod_time)";
} else {
$mod_time_clause = "";
}
$t = dtime();
echo "<server_time>$t</server_time>\n";
echo "<jobs>\n";
foreach ($batches as $batch) {
$wus = BoincWorkunit::enum("batch = $batch->id");
$wus = BoincWorkunit::enum("batch = $batch->id $mod_time_clause");
echo " <batch_size>".count($wus)."</batch_size>\n";
foreach ($wus as $wu) {
if ($wu->canonical_resultid) {

View File

@ -397,6 +397,7 @@ int submit_jobs(
int query_batch_set(
const char* project_url,
const char* authenticator,
double min_mod_time,
vector<string> &batch_names,
QUERY_BATCH_SET_REPLY& qb_reply,
string& error_msg
@ -408,6 +409,8 @@ int query_batch_set(
request = "<query_batch2>\n";
sprintf(buf, "<authenticator>%s</authenticator>\n", authenticator);
request += string(buf);
sprintf(buf, "<min_mod_time>%f</min_mod_time>\n", min_mod_time);
request += string(buf);
for (unsigned int i=0; i<batch_names.size(); i++) {
sprintf(buf, "<batch_name>%s</batch_name>\n", batch_names[i].c_str());
request += string(buf);
@ -423,6 +426,7 @@ int query_batch_set(
}
fseek(reply, 0, SEEK_SET);
retval = -1;
qb_reply.server_time = 0;
error_msg = "";
while (fgets(buf, 256, reply)) {
#ifdef SHOW_REPLY
@ -434,6 +438,7 @@ int query_batch_set(
}
if (parse_int(buf, "<error_num>", retval)) continue;
if (parse_str(buf, "<error_msg>", error_msg)) continue;
if (parse_double(buf, "<server_time>", qb_reply.server_time)) continue;
if (parse_int(buf, "<batch_size>", batch_size)) {
qb_reply.batch_sizes.push_back(batch_size);
continue;

View File

@ -58,8 +58,9 @@ struct JOB_STATUS {
};
// Reply from query_batch_set(): per-batch job counts plus the job
// status records themselves, flattened across all queried batches.
//
// Fix: the member list contained a duplicate declaration of `jobs`,
// which is a compile error; keep a single declaration.
struct QUERY_BATCH_SET_REPLY {
    double server_time;
        // server time at start of query
        // (lets the client correct for client/server clock skew)
    vector<int> batch_sizes;
        // how many jobs in each of the queried batches
    vector<JOB_STATUS> jobs;
        // the jobs, sequentially; batch_sizes[] tells where each batch starts
};
struct OUTFILE {
@ -153,6 +154,7 @@ extern int estimate_batch(
extern int query_batch_set(
const char* project_url,
const char* authenticator,
double min_mod_time,
vector<string> &batch_names,
QUERY_BATCH_SET_REPLY& reply,
string& error_msg

View File

@ -83,6 +83,7 @@ struct COMMAND {
vector<string> batch_names;
char batch_name[256];
double lease_end_time;
double min_mod_time;
COMMAND(char* _in) {
in = _in;
@ -296,6 +297,7 @@ void handle_submit(COMMAND& c) {
}
int COMMAND::parse_query_batches(char* p) {
min_mod_time = atof(strtok_r(NULL, " ", &p));
int n = atoi(strtok_r(NULL, " ", &p));
for (int i=0; i<n; i++) {
char* q = strtok_r(NULL, " ", &p);
@ -304,18 +306,20 @@ int COMMAND::parse_query_batches(char* p) {
return 0;
}
void handle_query_batches(COMMAND&c) {
void handle_query_batches(COMMAND& c) {
QUERY_BATCH_SET_REPLY reply;
char buf[256];
string error_msg, s;
int retval = query_batch_set(
project_url, authenticator, c.batch_names, reply, error_msg
project_url, authenticator, c.min_mod_time, c.batch_names, reply, error_msg
);
if (retval) {
sprintf(buf, "error\\ querying\\ batch:\\ %d\\ ", retval);
s = string(buf) + escape_str(error_msg);
} else {
s = string("NULL");
sprintf(buf, " %f", reply.server_time);
s += string(buf);
int i = 0;
for (unsigned int j=0; j<reply.batch_sizes.size(); j++) {
int n = reply.batch_sizes[j];

View File

@ -17,7 +17,7 @@ $job_name_1 = "job1_$t";
$job_name_2 = "job2_$t";
echo "
BOINC_SUBMIT 1 $batch_name uppercase 2 $job_name_1 0 1 in1 in $job_name_2 0 1 in2 in 1
BOINC_QUERY_BATCHES 2 1 $batch_name
BOINC_QUERY_BATCHES 2 0 1 $batch_name
BOINC_ABORT_JOBS 6 $job_name_1 $job_name_2
BOINC_FETCH_OUTPUT 3 $job_name_1 . stderr_out1 ALL 1 out out1
BOINC_FETCH_OUTPUT 4 $job_name_2 . stderr_out2 ALL 1 out out2