- Remote job submission stuff for Condor.

Submit and Query are more or less working.
This commit is contained in:
David Anderson 2013-01-31 14:06:17 -08:00 committed by Oliver Bock
parent 9342fe696b
commit 46f06b9350
6 changed files with 190 additions and 23 deletions

View File

@ -869,6 +869,7 @@ function flops_to_credit($f) {
function credit_to_gflop_hours($c) { function credit_to_gflop_hours($c) {
return $c/(200/24); return $c/(200/24);
} }
function do_download($path,$name="") { function do_download($path,$name="") {
if (strcmp($name,"") == 0) { if (strcmp($name,"") == 0) {
$name=basename($path); $name=basename($path);

View File

@ -50,7 +50,6 @@ function get_output_file($instance_name, $file_num, $auth_str) {
$path = dir_hier_path($name, $upload_dir, $fanout); $path = dir_hier_path($name, $upload_dir, $fanout);
if (!is_file($path)) die("no such file $path"); if (!is_file($path)) die("no such file $path");
do_download($path); do_download($path);
} }
// get all the output files of a batch (canonical instances only) // get all the output files of a batch (canonical instances only)
@ -85,7 +84,7 @@ function get_batch_output_files($batch_id, $auth_str) {
unlink($zip_filename); unlink($zip_filename);
} }
// get all the output files of a workunit (canonical instances only) // get all the output files of a workunit (canonical instance only)
// and make a zip of all of them // and make a zip of all of them
// //
function get_wu_output_files($wu_id, $auth_str) { function get_wu_output_files($wu_id, $auth_str) {
@ -118,19 +117,18 @@ function get_wu_output_files($wu_id, $auth_str) {
unlink($zip_basename); unlink($zip_basename);
} }
$auth_str = get_str('auth_str'); $auth_str = get_str('auth_str');
$instance_name = get_str('instance_name', true); $instance_name = get_str('instance_name', true);
$batch_id = get_int('batch_id' , true);
$wu_id = get_int('wu_id');
if ($instance_name) { if ($instance_name) {
$file_num = get_int('file_num'); $file_num = get_int('file_num');
get_output_file($instance_name, $file_num, $auth_str); get_output_file($instance_name, $file_num, $auth_str);
} else { } else if ($batch_id) {
$batch_id = get_int('batch_id' , true);
if ($batch_id) {
get_batch_output_files($batch_id, $auth_str); get_batch_output_files($batch_id, $auth_str);
} else { } else if ($wu_id) {
$wu_id=get_int('wu_id');
get_wu_output_files($wu_id, $auth_str); get_wu_output_files($wu_id, $auth_str);
} } else {
echo "bad command\n";
} }
?> ?>

View File

@ -208,12 +208,15 @@ function submit_batch($r) {
$let = (double)$x; $let = (double)$x;
if ($batch_id) { if ($batch_id) {
$batch->update("logical_end_time=$let and state= ".BATCH_STATE_IN_PROGRESS); $njobs = count($jobs);
$ret = $batch->update("njobs=$njobs, logical_end_time=$let, state= ".BATCH_STATE_IN_PROGRESS);
if (!$ret) xml_error(-1, "batch->update() failed");
} else { } else {
$batch_name = (string)($r->batch->batch_name); $batch_name = (string)($r->batch->batch_name);
$batch_id = BoincBatch::insert( $batch_id = BoincBatch::insert(
"(user_id, create_time, njobs, name, app_id, logical_end_time, state) values ($user->id, $now, $njobs, '$batch_name', $app->id, $let, ".BATCH_STATE_IN_PROGRESS.")" "(user_id, create_time, njobs, name, app_id, logical_end_time, state) values ($user->id, $now, $njobs, '$batch_name', $app->id, $let, ".BATCH_STATE_IN_PROGRESS.")"
); );
if (!$batch_id) xml_error(-1, "BoincBatch::insert() failed");
} }
$i = 0; $i = 0;
foreach($jobs as $job) { foreach($jobs as $job) {
@ -278,12 +281,8 @@ function query_batch($r) {
list($user, $user_submit) = authenticate_user($r, null); list($user, $user_submit) = authenticate_user($r, null);
$batch_id = (int)($r->batch_id); $batch_id = (int)($r->batch_id);
$batch = BoincBatch::lookup_id($batch_id); $batch = BoincBatch::lookup_id($batch_id);
if (!$batch) { if (!$batch) xml_error(-1, "no such batch");
xml_error(-1, "no such batch"); if ($batch->user_id != $user->id) xml_error(-1, "not owner");
}
if ($batch->user_id != $user->id) {
xml_error(-1, "not owner");
}
$wus = BoincWorkunit::enum("batch = $batch_id"); $wus = BoincWorkunit::enum("batch = $batch_id");
$batch = get_batch_params($batch, $wus); $batch = get_batch_params($batch, $wus);
@ -301,6 +300,34 @@ function query_batch($r) {
echo "</batch>\n"; echo "</batch>\n";
} }
// variant for Condor, which doesn't care about instances
//
function query_batch_condor($r) {
list($user, $user_submit) = authenticate_user($r, null);
$batch_id = (int)($r->batch_id);
$batch = BoincBatch::lookup_id($batch_id);
if (!$batch) xml_error(-1, "no such batch");
if ($batch->user_id != $user->id) xml_error(-1, "not owner");
$wus = BoincWorkunit::enum("batch = $batch_id");
echo "<batch>\n";
foreach ($wus as $wu) {
if ($wu->canonical_resultid) {
$status = "done";
} else if ($wu->error_mask) {
$status = "error";
} else {
$status = "in progress";
}
echo
" <job>
<job_name>$wu->name</job_name>
<status>$status</status>
</job>
";
}
echo "</batch>\n";
}
function query_job($r) { function query_job($r) {
list($user, $user_submit) = authenticate_user($r, null); list($user, $user_submit) = authenticate_user($r, null);
$job_id = (int)($r->job_id); $job_id = (int)($r->job_id);
@ -413,12 +440,13 @@ switch ($r->getName()) {
case 'abort_batch': handle_abort_batch($r); break; case 'abort_batch': handle_abort_batch($r); break;
case 'estimate_batch': estimate_batch($r); break; case 'estimate_batch': estimate_batch($r); break;
case 'query_batch': query_batch($r); break; case 'query_batch': query_batch($r); break;
case 'query_batch_condor': query_batch_condor($r); break;
case 'query_batches': query_batches($r); break; case 'query_batches': query_batches($r); break;
case 'query_job': query_job($r); break; case 'query_job': query_job($r); break;
case 'retire_batch': handle_retire_batch($r); break; case 'retire_batch': handle_retire_batch($r); break;
case 'submit_batch': submit_batch($r); break; case 'submit_batch': submit_batch($r); break;
case 'create_batch': create_batch($r); break; case 'create_batch': create_batch($r); break;
default: xml_error(-1, "bad command"); default: xml_error(-1, "bad command: ".$r->getName());
} }
?> ?>

View File

@ -139,7 +139,7 @@ int process_input_files(SUBMIT_REQ& req) {
// parse the text coming from Condor // parse the text coming from Condor
// //
int parse_boinc_submit(COMMAND& c, char* p, SUBMIT_REQ& req) { int parse_submit(COMMAND& c, char* p, SUBMIT_REQ& req) {
strcpy(req.batch_name, strtok_r(NULL, " ", &p)); strcpy(req.batch_name, strtok_r(NULL, " ", &p));
strcpy(req.app_name, strtok_r(NULL, " ", &p)); strcpy(req.app_name, strtok_r(NULL, " ", &p));
int njobs = atoi(strtok_r(NULL, " ", &p)); int njobs = atoi(strtok_r(NULL, " ", &p));
@ -179,10 +179,10 @@ int parse_boinc_submit(COMMAND& c, char* p, SUBMIT_REQ& req) {
// - create batch/file associations, and upload files // - create batch/file associations, and upload files
// - create jobs // - create jobs
// //
void handle_boinc_submit(COMMAND& c, char* p) { void handle_submit(COMMAND& c, char* p) {
SUBMIT_REQ req; SUBMIT_REQ req;
int retval; int retval;
retval = parse_boinc_submit(c, p, req); retval = parse_submit(c, p, req);
if (retval) { if (retval) {
printf("error parsing request: %d\n", retval); printf("error parsing request: %d\n", retval);
return; return;
@ -207,6 +207,43 @@ void handle_boinc_submit(COMMAND& c, char* p) {
printf("success\n"); printf("success\n");
} }
void handle_query_batch(COMMAND&c, char* p) {
int batch_id = atoi(strtok_r(NULL, " ", &p));
QUERY_BATCH_REPLY reply;
query_batch(project_url, authenticator, batch_id, reply);
for (unsigned int i=0; i<reply.jobs.size(); i++) {
QUERY_BATCH_JOB &j = reply.jobs[i];
printf("job %s: status %s\n", j.job_name.c_str(), j.status.c_str());
}
}
// <job name> <dir>
// <#files>
// <dst name>
// ...
//
void handle_fetch_output(COMMAND& c, char* p) {
FETCH_OUTPUT_REQ req;
strcpy(req.job_name, strtok_r(NULL, " ", &p));
strcpy(req.dir, strtok_r(NULL, " ", &p));
req.file_names.clear();
int nfiles = atoi(strtok_r(NULL, " ", &p));
for (int i=0; i<nfiles; i++) {
char* f = strtok_r(NULL, " ", &p);
req.file_names.push_back(string(f));
}
for (int i=0; i<nfiles; i++) {
char path[1024];
sprintf(path, "%s/%s", req.dir, req.file_names[i].c_str());
int retval = get_output_file(
project_url, authenticator, req.job_name, i, path
);
if (retval) {
printf("get_output_file() returned %d\n", retval);
}
}
}
void* handle_command_aux(void* q) { void* handle_command_aux(void* q) {
COMMAND &c = *((COMMAND*)q); COMMAND &c = *((COMMAND*)q);
char *p; char *p;
@ -215,7 +252,11 @@ void* handle_command_aux(void* q) {
char* id = strtok_r(NULL, " ", &p); char* id = strtok_r(NULL, " ", &p);
printf("handling cmd %s\n", cmd); printf("handling cmd %s\n", cmd);
if (!strcmp(cmd, "BOINC_SUBMIT")) { if (!strcmp(cmd, "BOINC_SUBMIT")) {
handle_boinc_submit(c, p); handle_submit(c, p);
} else if (!strcmp(cmd, "BOINC_QUERY_BATCH")) {
handle_query_batch(c, p);
} else if (!strcmp(cmd, "BOINC_FETCH_OUTPUT")) {
handle_fetch_output(c, p);
} else { } else {
sleep(10); sleep(10);
char buf[256]; char buf[256];

View File

@ -49,3 +49,19 @@ struct SUBMIT_REQ {
// maps local path to info about file // maps local path to info about file
int batch_id; int batch_id;
}; };
struct QUERY_BATCH_JOB {
string job_name;
string status;
QUERY_BATCH_JOB(){}
};
struct QUERY_BATCH_REPLY {
vector<QUERY_BATCH_JOB> jobs;
};
struct FETCH_OUTPUT_REQ {
char job_name[256];
char dir[256];
vector<string> file_names;
};

View File

@ -28,6 +28,31 @@
using std::vector; using std::vector;
using std::string; using std::string;
// do an HTTP GET request.
//
static int do_http_get(
const char* url,
const char* dst_path
) {
FILE* reply = fopen(dst_path, "w");
if (!reply) return -1;
CURL *curl = curl_easy_init();
if (!curl) {
return -1;
}
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_USERAGENT, "BOINC Condor adapter");
curl_easy_setopt(curl, CURLOPT_WRITEDATA, reply);
CURLcode res = curl_easy_perform(curl);
if (res != CURLE_OK) {
fprintf(stderr, "CURL error: %s\n", curl_easy_strerror(res));
}
curl_easy_cleanup(curl);
return 0;
}
// send an HTTP POST request, // send an HTTP POST request,
// with an optional set of multi-part file attachments // with an optional set of multi-part file attachments
// //
@ -217,7 +242,7 @@ int create_batch(
int submit_jobs( int submit_jobs(
const char* project_url, const char* project_url,
const char* authenticator, const char* authenticator,
SUBMIT_REQ req SUBMIT_REQ &req
) { ) {
char buf[1024], url[1024]; char buf[1024], url[1024];
sprintf(buf, sprintf(buf,
@ -276,3 +301,61 @@ int submit_jobs(
fclose(reply); fclose(reply);
return retval; return retval;
} }
int query_batch(
const char* project_url,
const char* authenticator,
int batch_id,
QUERY_BATCH_REPLY& qb_reply
) {
string request;
char url[1024], buf[256];
request = "<query_batch_condor>\n";
sprintf(buf, "<batch_id>%d</batch_id>\n", batch_id);
request += string(buf);
sprintf(buf, "<authenticator>%s</authenticator>\n", authenticator);
request += string(buf);
request += "</query_batch_condor>\n";
sprintf(url, "%ssubmit_rpc_handler.php", project_url);
FILE* reply = tmpfile();
vector<string> x;
int retval = do_http_post(url, request.c_str(), reply, x);
if (retval) {
fclose(reply);
return retval;
}
fseek(reply, 0, SEEK_SET);
retval = 0;
while (fgets(buf, 256, reply)) {
printf("query_batch reply: %s", buf);
if (strstr(buf, "error")) {
retval = -1;
}
if (strstr(buf, "<job>")) {
QUERY_BATCH_JOB qbj;
while (fgets(buf, 256, reply)) {
if (strstr(buf, "</job>")) {
qb_reply.jobs.push_back(qbj);
}
if (parse_str(buf, "job_name", qbj.job_name)) continue;
if (parse_str(buf, "status", qbj.status)) continue;
}
}
}
fclose(reply);
return retval;
}
int get_output_file(
const char* project_url,
const char* authenticator,
const char* job_name,
int file_num,
const char* dst_path
) {
char url[1024];
sprintf(url, "%sget_output.php?auth_str=%s&instance_name=%s&file_num=%d",
project_url, authenticator, job_name, file_num
);
return do_http_get(url, dst_path);
}