Remote job submission: add support for per-job templates in submit requests

This supports the TACC use case,
in the jobs in a batch can use different Docker images
and different input and output file signatures,
none of which are known in advance.

Python API binding:
    - A JOB_DESC object can optionally contain wu_template and result_template
        elements, which are the templates (the actual XML) to use for that job.
        Add these to the XML request message if present.
    - Added the same capability to the PHP binding, but not C++.
    - Added and debugged test cases for both languages.

    Also, submit_batch() can take either a batch name (in which case
    the batch is created) or a batch ID
    (in which the batch was created prior to remotely staging files).

RPC handler:
    - in submit_batch(), check for jobs with templates specified
        and store them in files.
        For input templates (which are deleted after creating jobs)
        we put them in /tmp,
        and use a map so that if two templates are the same we use 1 file.
        For output templates (which have to last until all jobs are done)
        we put them in templates/tmp, with content-based filenames
        to economize.
    - When creating jobs, or generating SQL strings for multiple jobs,
        use these names as --wu_template_filename
        and --result_template_filename args to create_work
        (either cmdline args or stdin args)
    - Delete WU templates when done

create_work.cpp:
    handle per-job --wu_template and --result_template args in stdin job lines
    (the names of per-job WU and result templates).
    Maintain a map mapping WU template name to contents,
    to avoid repeatedly reading them.

    For jobs that don't specify templates, use the ones specified
    at the batch level, or the defaults.
This commit is contained in:
David Anderson 2017-01-21 00:24:11 -08:00
parent 0bc9b02640
commit 381e0caf14
7 changed files with 295 additions and 50 deletions

View File

@ -20,6 +20,10 @@
$fixed_navbar = false;
if (defined('REMOTE_JOB_SUBMISSION') && REMOTE_JOB_SUBMISSION) {
require_once("../inc/submit_db.inc");
}
////////////// NAVBAR ////////////////
// call this to start the navbar.
@ -167,7 +171,6 @@ function sample_navbar(
array(tra("Applications"), $url_prefix."apps.php"),
);
if (defined('REMOTE_JOB_SUBMISSION') && REMOTE_JOB_SUBMISSION) {
require_once("../inc/submit_db.inc");
if ($user && BoincUserSubmit::lookup_userid($user->id)) {
$x[] = array("Job submission", $url_prefix."submit.php");
}

View File

@ -77,6 +77,14 @@ function req_to_xml($req, $op) {
}
$x .= " </input_file>\n";
}
// send templates for both estimate and submit ops
//
if (isset($job->wu_template)) {
$x .= " <wu_template>\n$job->wu_template\n </wu_template>\n";
}
if (isset($job->result_template)) {
$x .= " <result_template>\n$job->result_template\n </result_template>\n";
}
$x .= " </job>
";
}
@ -318,28 +326,70 @@ function boinc_retire_batch($req) {
}
//// example usage follows
/*
if (1) {
$req = new StdClass;
$req->project = "http://isaac.ssl.berkeley.edu/test/";
$req->authenticator = "x";
if (0) {
$req->authenticator = trim(file_get_contents("test_auth"));
$req->app_name = "uppercase";
$req->batch_name = "batch_name3";
$req->jobs = array();
$job = new StdClass;
$job->input_files = array();
$f = new StdClass;
$f->mode = "remote";
$f->url = "http://isaac.ssl.berkeley.edu/validate_logic.txt";
$f->md5 = "eec5a142cea5202c9ab2e4575a8aaaa7";
$f->nbytes = 4250;
$job->input_files[] = $f;
$f = new StdClass;
$f->mode = "local";
$f->source = "foobar";
//$job->input_files[] = $f;
for ($i=10; $i<11; $i++) {
$it = "
<input_template>
<file_info>
</file_info>
<workunit>
<file_ref>
<open_name>in</open_name>
</file_ref>
<target_nresults>1</target_nresults>
<min_quorum>1</min_quorum>
<rsc_fpops_est> 60e9 </rsc_fpops_est>
<rsc_fpops_bound> 60e12 </rsc_fpops_bound>
<rsc_disk_bound>2e6</rsc_disk_bound>
<rsc_memory_bound>1e6</rsc_memory_bound>
<delay_bound>3600</delay_bound>
<credit>1</credit>
</workunit>
</input_template>
";
$ot = "
<output_template>
<file_info>
<name><OUTFILE_0/></name>
<generated_locally/>
<upload_when_present/>
<max_nbytes>5000000</max_nbytes>
<url><UPLOAD_URL/></url>
</file_info>
<result>
<file_ref>
<file_name><OUTFILE_0/></file_name>
<open_name>out</open_name>
</file_ref>
</result>
</output_template>
";
for ($i=0; $i<2; $i++) {
$job->rsc_fpops_est = $i*1e9;
$job->command_line = "--t $i";
$job->wu_template = $it;
$job->result_template = $ot;
$req->jobs[] = $job;
}
@ -378,5 +428,4 @@ if (0) {
print_r($jobs);
}
}
*/
?>

View File

@ -180,10 +180,18 @@ function stage_files(&$jobs, $template) {
}
}
// submit a list of jobs with a single create_work command.
//
function submit_jobs(
$jobs, $template, $app, $batch_id, $priority,
$result_template_file = null, $workunit_template_file = null
$result_template_file, // batch-level; can also specify per job
$workunit_template_file
) {
global $wu_templates, $result_templates;
// make a string to pass to create_work;
// one line per job
//
$x = "";
foreach($jobs as $job) {
if ($job->name) {
@ -206,6 +214,14 @@ function submit_jobs(
$x .= " $file->name";
}
}
if ($job->wu_template) {
$f = $wu_templates[$job->wu_template];
$x .= " --wu_template $f";
}
if ($job->result_template) {
$f = $result_templates[$job->result_template];
$x .= " --result_template $f";
}
$x .= "\n";
}
@ -232,6 +248,58 @@ function submit_jobs(
unlink($errfile);
}
// lists of arrays for job-level templates;
// each maps template to filename
//
$wu_templates = array();
$result_templates = array();
// The job specifies an input template.
// Check whether the template is already in our map.
// If not, write it to a temp file.
//
function make_wu_template($job) {
global $wu_templates;
if (!array_key_exists($job->wu_template, $wu_templates)) {
$f = tempnam("/tmp", "wu_template_");
//echo "writing wt $f\n";
file_put_contents($f, $job->wu_template);
$wu_templates[$job->wu_template] = $f;
} else {
//echo "dup wu template\n";
}
}
// same for output templates.
// A little different because these have to exist for life of job.
// Store them in templates/tmp/, with content-based filenames
//
function make_result_template($job) {
global $result_templates;
if (!array_key_exists($job->result_template, $result_templates)) {
$m = md5($job->result_template);
$filename = "../../templates/tmp/$m";
if (!file_exists($filename)) {
file_put_contents($filename, $job->result_template);
}
$result_templates[$job->result_template] = $filename;
} else {
//echo "dup result template\n";
}
}
// delete per-job WU templates after creating jobs.
// (we can't delete result templates)
//
function delete_wu_templates() {
global $wu_templates;
foreach ($wu_templates as $t->$f) {
unlink($f);
}
}
// convert job list from XML nodes to our own objects
//
function xml_get_jobs($r) {
$jobs = array();
foreach($r->batch->job as $j) {
@ -243,6 +311,8 @@ function xml_get_jobs($r) {
$job->target_host = (int)$j->target_host;
$job->name = (string)$j->name;
$job->rsc_fpops_est = (double)$j->rsc_fpops_est;
$job->wu_template = $j->wu_template->input_template->asXML();
$job->result_template = $j->result_template->output_template->asXML();
foreach ($j->input_file as $f) {
$file = new StdClass;
$file->mode = (string)$f->mode;
@ -256,6 +326,12 @@ function xml_get_jobs($r) {
$job->input_files[] = $file;
}
$jobs[] = $job;
if ($job->wu_template) {
make_wu_template($job);
}
if ($job->result_template) {
make_result_template($job);
}
}
return $jobs;
}
@ -350,14 +426,16 @@ function submit_batch($r) {
echo "<batch_id>$batch_id</batch_id>
</submit_batch>
";
//delete_wu_templates();
}
function create_batch($r) {
xml_start_tag("create_batch");
$app = get_submit_app((string)($r->batch->app_name));
$app = get_submit_app((string)($r->app_name));
list($user, $user_submit) = authenticate_user($r, $app);
$now = time();
$batch_name = (string)($r->batch->batch_name);
$batch_name = (string)($r->batch_name);
$batch_name = BoincDb::escape_string($batch_name);
$expire_time = (double)($r->expire_time);
$batch_id = BoincBatch::insert(
@ -744,10 +822,6 @@ estimate_batch($r);
exit;
}
if (0) {
require_once("submit_test.inc");
}
$request_log = parse_config(get_config(), "<remote_submission_log>");
if ($request_log) {
$request_log_dir = parse_config(get_config(), "<log_dir>");
@ -761,7 +835,12 @@ if ($request_log) {
}
xml_header();
$r = simplexml_load_string($_POST['request']);
if (0) {
$r = file_get_contents("submit_req.xml");
} else {
$r = $_POST['request'];
}
$r = simplexml_load_string($r);
if (!$r) {
xml_error(-1, "can't parse request message");
}

View File

@ -24,7 +24,7 @@ import xml.etree.ElementTree as ET
import requests
# you'll need to "yip install requests"
# represents an input file
# describes an input file
#
class FILE_DESC:
def __init__(self):
@ -43,22 +43,27 @@ class FILE_DESC:
xml += '</input_file>\n'
return xml
# represents a job
# describes a job
#
class JOB_DESC:
def __init__(self):
return
def to_xml(self):
xml = ('<job>\n'
'<rsc_fpops_est>%f</rsc_fpops_est>\n'
'<command_line>%s</command_line>\n'
) %(self.rsc_fpops_est, self.command_line)
xml = '<job>\n'
if hasattr(self, 'rsc_fpops'):
xml += '<rsc_fpops_est>%f</rsc_fpops_est>\n'%self.rsc_fpops_est
if hasattr(self, 'command_line'):
xml += '<command_line>%s</command_line>\n'%self.command_line
if hasattr(self, 'wu_template'):
xml += '<wu_template>\n%s\n</wu_template>\n'%self.wu_template
if hasattr(self, 'result_template'):
xml += '<result_template>\n%s\n</result_template>\n'%self.result_template
for file in self.files:
xml += file.to_xml()
xml += '</job>\n'
return xml
# represents a batch description for submit() or estimate()
# describes a batch for submit() or estimate()
#
class BATCH_DESC:
def __init__(self):
@ -69,8 +74,12 @@ class BATCH_DESC:
'<authenticator>%s</authenticator>\n'
'<batch>\n'
'<app_name>%s</app_name>\n'
'<batch_name>%s</batch_name>\n'
) %(op, self.authenticator, self.app_name, self.batch_name)
) %(op, self.authenticator, self.app_name)
if hasattr(self, 'batch_id'):
xml += '<batch_id>%s</batch_id>\n'%(self.batch_id)
elif hasattr(self, 'batch_name'):
xml += '<batch_name>%s</batch_name>\n'%(self.batch_name)
for job in self.jobs:
xml += job.to_xml()
xml += '</batch>\n</%s>\n' %(op)
@ -80,7 +89,7 @@ class CREATE_BATCH_REQ:
def __init__(self):
return
def to_xml(self):
xml = ('create_batch\n'
xml = ('<create_batch>\n'
'<authenticator>%s</authenticator>\n'
'<app_name>%s</app_name>\n'
'<batch_name>%s</batch_name>\n'
@ -95,6 +104,7 @@ class REQUEST:
return
def do_http_post(req, project_url, handler='submit_rpc_handler.php'):
print req
url = project_url + handler
params = urllib.urlencode({'request': req})
f = urllib.urlopen(url, params)
@ -121,6 +131,8 @@ def abort_jobs(req):
req_xml += '</abort_jobs>\n'
return do_http_post(req_xml, req.project)
# req is a CREATE_BATCH_REQ
#
def create_batch(req):
return do_http_post(req.to_xml(), req.project)
@ -170,7 +182,6 @@ def get_output_files(req):
auth_str = md5.new(req.authenticator+req.batch_id).digest()
return project_url+"/get_output.php?cmd=batch_files&batch_id=%s&auth_str=%s"%(req.batch_id, auth_str)
def retire_batch(req):
req_xml = ('<retire_batch>\n'
'<authenticator>%s</authenticator>\n'
@ -211,7 +222,7 @@ class UPLOAD_FILES_REQ:
xml += '</upload_files>\n'
return xml
# This actually does two RPC:
# This actually does two RPCs:
# query_files() to find what files aren't already on server
# upload_files() to upload them
#

View File

@ -224,6 +224,9 @@ int check_files(char** infiles, int ninfiles, SCHED_CONFIG& config_loc) {
return 0;
}
// variant where input files are described by a list of names,
// for use by work generators
//
int create_work(
DB_WORKUNIT& wu,
const char* _wu_template,
@ -254,6 +257,13 @@ int create_work(
);
}
// variant where input files are described by INFILE_DESCS,
// so you can have remote files etc.
//
// If query_string is present, don't actually create the job;
// instead, append to the query string.
// The caller is responsible for doing the query.
//
int create_work2(
DB_WORKUNIT& wu,
const char* _wu_template,

View File

@ -25,6 +25,7 @@
#include <cstring>
#include <ctime>
#include <string>
#include <map>
#include <sys/param.h>
#include <unistd.h>
@ -40,6 +41,7 @@
#include "backend_lib.h"
using std::string;
using std::map;
bool verbose = false;
bool continue_on_error = false;
@ -80,6 +82,7 @@ void usage() {
" [ --wu_id ID ] ID of existing workunit record (used by boinc_submit)\n"
" [ --wu_name name ] default: generate a name based on app name\n"
" [ --wu_template filename ] default: appname_in\n"
"\nSee http://boinc.berkeley.edu/trac/wiki/JobSubmission\n"
);
exit(1);
}
@ -102,10 +105,16 @@ void check_assign_id(int x) {
}
}
// describes a job.
// Also used to store batch-level info such as template names
// and assignment info
//
struct JOB_DESC {
DB_WORKUNIT wu;
char wu_template[BLOB_SIZE];
char result_template_file[256], result_template_path[MAXPATHLEN];
char wu_template_file[256];
char result_template_file[256];
char result_template_path[MAXPATHLEN];
vector <INFILE_DESC> infiles;
char* command_line;
char additional_xml[256];
@ -119,6 +128,7 @@ struct JOB_DESC {
command_line = NULL;
assign_flag = false;
assign_multi = false;
strcpy(wu_template_file, "");
strcpy(result_template_file, "");
strcpy(additional_xml, "");
assign_id = 0;
@ -152,6 +162,10 @@ void JOB_DESC::parse_cmdline(int argc, char** argv) {
command_line = argv[++i];
} else if (arg(argv, i, (char*)"wu_name")) {
safe_strcpy(wu.name, argv[++i]);
} else if (arg(argv, i, (char*)"wu_template")) {
safe_strcpy(wu_template_file, argv[++i]);
} else if (arg(argv, i, (char*)"result_template")) {
safe_strcpy(result_template_file, argv[++i]);
} else if (arg(argv, i, (char*)"remote_file")) {
INFILE_DESC id;
id.is_remote = true;
@ -182,10 +196,33 @@ void JOB_DESC::parse_cmdline(int argc, char** argv) {
}
}
// See if WU template was given for job.
// Many jobs may have the same ones.
// To avoid rereading files, cache them in a map.
// Get from cache if there, else read the file and add to cache
//
void get_wu_template(JOB_DESC& jd2) {
// the jobs may specify WU templates.
//
static map<char*, char*> wu_templates;
if (wu_templates.count(jd2.wu_template_file) == 0) {
char* p;
int retval = read_file_malloc(jd2.wu_template_file, p, 0, false);
if (retval) {
fprintf(
stderr, "Can't read WU template %s\n", jd2.wu_template_file
);
exit(1);
}
wu_templates[jd2.wu_template_file] = p;
}
strcpy(jd2.wu_template, wu_templates[jd2.wu_template_file]);
}
int main(int argc, char** argv) {
DB_APP app;
int retval;
char wu_template_file[256];
int i;
char download_dir[256], db_name[256], db_passwd[256];
char db_user[256],db_host[256];
@ -194,7 +231,6 @@ int main(int argc, char** argv) {
bool show_wu_name = true;
bool use_stdin = false;
strcpy(wu_template_file, "");
strcpy(app.name, "");
strcpy(db_passwd, "");
const char* config_dir = 0;
@ -211,7 +247,7 @@ int main(int argc, char** argv) {
show_wu_name = false;
safe_strcpy(jd.wu.name, argv[++i]);
} else if (arg(argv, i, "wu_template")) {
safe_strcpy(wu_template_file, argv[++i]);
safe_strcpy(jd.wu_template_file, argv[++i]);
} else if (arg(argv, i, "result_template")) {
safe_strcpy(jd.result_template_file, argv[++i]);
} else if (arg(argv, i, "config_dir")) {
@ -318,8 +354,8 @@ int main(int argc, char** argv) {
if (!strlen(jd.wu.name)) {
sprintf(jd.wu.name, "%s_%d_%f", app.name, getpid(), dtime());
}
if (!strlen(wu_template_file)) {
sprintf(wu_template_file, "templates/%s_in", app.name);
if (!strlen(jd.wu_template_file)) {
sprintf(jd.wu_template_file, "templates/%s_in", app.name);
}
if (!strlen(jd.result_template_file)) {
sprintf(jd.result_template_file, "templates/%s_out", app.name);
@ -352,12 +388,16 @@ int main(int argc, char** argv) {
exit(1);
}
// read the WU template file.
// this won't get used if we're creating a batch
// with job-level WU templates
//
retval = read_filename(
wu_template_file, jd.wu_template, sizeof(jd.wu_template)
jd.wu_template_file, jd.wu_template, sizeof(jd.wu_template)
);
if (retval) {
fprintf(stderr,
"create_work: can't open input template %s\n", wu_template_file
"create_work: can't open input template %s\n", jd.wu_template_file
);
exit(1);
}
@ -367,6 +407,10 @@ int main(int argc, char** argv) {
strcpy(jd.result_template_path, "./");
strcat(jd.result_template_path, jd.result_template_file);
if (use_stdin) {
// clear the WU template name so we'll recognize a job-level one
//
strcpy(jd.wu_template_file, "");
if (jd.assign_flag) {
// if we're doing assignment we can't use the bulk-query method;
// create the jobs one at a time.
@ -383,6 +427,9 @@ int main(int argc, char** argv) {
if (!strlen(jd2.wu.name)) {
sprintf(jd2.wu.name, "%s_%d", jd.wu.name, j);
}
if (strlen(jd2.wu_template_file)) {
get_wu_template(jd2);
}
jd2.create();
}
} else {
@ -410,6 +457,9 @@ int main(int argc, char** argv) {
// otherwise accumulate a SQL query so that we can
// create jobs en masse
//
if (strlen(jd2.wu_template_file)) {
get_wu_template(jd2);
}
retval = create_work2(
jd2.wu,
jd2.wu_template,

View File

@ -21,7 +21,7 @@ from submit_api import *
project_url = 'http://isaac.ssl.berkeley.edu/test/'
# read auth from a file so we don't have to including it here
# read auth from a file so we don't have to include it here
#
def get_auth():
with open("test_auth", "r") as f:
@ -31,10 +31,8 @@ def get_auth():
#
def make_batch_desc():
file = FILE_DESC()
file.mode = 'remote'
file.url = 'http://isaac.ssl.berkeley.edu/validate_logic.txt'
file.md5 = "eec5a142cea5202c9ab2e4575a8aaaa7"
file.nbytes = 4250
file.mode = 'local_staged'
file.source = 'input'
job = JOB_DESC()
job.files = [file]
@ -43,12 +41,44 @@ def make_batch_desc():
batch.project = project_url
batch.authenticator = get_auth()
batch.app_name = "uppercase"
batch.batch_name = "blah"
batch.batch_name = "blah16"
batch.jobs = []
for i in range(3):
for i in range(2):
job.rsc_fpops_est = i*1e9
job.command_line = '-i %s' %(i)
job.wu_template = """
<input_template>
<file_info>
</file_info>
<workunit>
<file_ref>
<open_name>in</open_name>
</file_ref>
<target_nresults>1</target_nresults>
<min_quorum>1</min_quorum>
<credit>2</credit>
<rsc_fpops_est> 60e9 </rsc_fpops_est>
</workunit>
</input_template>
"""
job.result_template = """
<output_template>
<file_info>
<name><OUTFILE_0/></name>
<generated_locally/>
<upload_when_present/>
<max_nbytes>4000000</max_nbytes>
<url><UPLOAD_URL/></url>
</file_info>
<result>
<file_ref>
<file_name><OUTFILE_0/></file_name>
<open_name>out</open_name>
</file_ref>
</result>
</output_template>
"""
batch.jobs.append(copy.copy(job))
return batch
@ -66,7 +96,7 @@ def test_submit_batch():
batch = make_batch_desc()
r = submit_batch(batch)
if r[0].tag == 'error':
print 'error: ', r.find('error_msg').text
print 'error: ', r[0].find('error_msg').text
return
print 'batch ID: ', r[0].text
@ -99,12 +129,25 @@ def test_query_batch():
print ' n_outfiles: ', job.find('n_outfiles').text
# ... various other fields
def test_abort_batch
def test_create_batch():
req = CREATE_BATCH_REQ()
req.project = project_url
req.authenticator = get_auth()
req.app_name = 'uppercase'
req.batch_name = 'foobar'
req.expire_time = 0
r = create_batch(req)
if r[0].tag == 'error':
print 'error: ', r[0].find('error_msg').text
return
print 'batch ID: ', r[0].text
def test_abort_batch():
req = REQUEST()
req.project = project_url
req.authenticator = get_auth()
req.batch_id = 271
r = abort_bath(req)
r = abort_batch(req)
if r[0].tag == 'error':
print 'error: ', r.find('error_msg').text
return
@ -123,4 +166,4 @@ def test_upload_files():
return
print 'success'
test_upload_files()
test_submit_batch()