create_work: add --stdin option for more efficient batch creation

Previously if you wanted to create lots of jobs from a script (e.g. PHP)
you had to run create_work once per job.
With the --stdin option you run it once,
passing it a file (via stdin) with one line per job.
Each line can specify a command line and/or a set of input files.

On my server this gives a performance of about 1000 jobs per minute,
which is less than I would have expected,
but all the time is spent in doing MySQL inserts
so that's as good as we can do for now.

Also fix a bug in stage_file.
This commit is contained in:
David Anderson 2014-04-07 09:07:00 -07:00
parent 0c003b782d
commit c3cbf29af3
3 changed files with 156 additions and 98 deletions

View File

@ -1,6 +1,6 @@
// This file is part of BOINC.
// http://boinc.berkeley.edu
// Copyright (C) 2008 University of California
// Copyright (C) 2014 University of California
//
// BOINC is free software; you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License
@ -15,11 +15,9 @@
// You should have received a copy of the GNU Lesser General Public License
// along with BOINC. If not, see <http://www.gnu.org/licenses/>.
// Create a workunit.
// Input files must be in the download dir.
// See the docs for a description of WU and result template files
// This program must be run in the project's root directory
//
// Create workunit(s).
// see http://boinc.berkeley.edu/trac/wiki/JobSubmission
#include "config.h"
#include <cstdio>
@ -36,6 +34,7 @@
#include "filesys.h"
#include "sched_config.h"
#include "str_replace.h"
#include "str_util.h"
#include "util.h"
#include "backend_lib.h"
@ -66,6 +65,7 @@ void usage() {
" [ --rsc_fpops_bound x ]\n"
" [ --rsc_memory_bound x ]\n"
" [ --size_class n ]\n"
" [ --stdin ]\n"
" [ --target_host ID ]\n"
" [ --target_nresults n ]\n"
" [ --target_team ID ]\n"
@ -77,7 +77,7 @@ void usage() {
exit(1);
}
bool arg(const char** argv, int i, const char* name) {
bool arg(char** argv, int i, const char* name) {
char buf[256];
sprintf(buf, "-%s", name);
if (!strcmp(argv[i], buf)) return true;
@ -86,138 +86,175 @@ bool arg(const char** argv, int i, const char* name) {
return false;
}
int main(int argc, const char** argv) {
DB_APP app;
struct JOB_DESC {
DB_WORKUNIT wu;
int retval;
char wu_template[BLOB_SIZE];
char wu_template_file[256], result_template_file[256], result_template_path[MAXPATHLEN];
const char* command_line=NULL;
const char** infiles = NULL;
int i, ninfiles;
char result_template_file[256], result_template_path[MAXPATHLEN];
char** infiles;
char* command_line;
char additional_xml[256];
bool assign_flag;
bool assign_multi;
int assign_id;
int assign_type;
int ninfiles;
JOB_DESC() {
wu.clear();
infiles = NULL;
command_line = NULL;
assign_flag = false;
assign_multi = false;
strcpy(result_template_file, "");
strcpy(additional_xml, "");
assign_id = 0;
assign_type = ASSIGN_NONE;
ninfiles = 0;
// defaults (in case they're not in WU template)
//
wu.id = 0;
wu.min_quorum = 2;
wu.target_nresults = 2;
wu.max_error_results = 3;
wu.max_total_results = 10;
wu.max_success_results = 6;
wu.rsc_fpops_est = 3600e9;
wu.rsc_fpops_bound = 86400e9;
wu.rsc_memory_bound = 5e8;
wu.rsc_disk_bound = 1e9;
wu.rsc_bandwidth_bound = 0.0;
wu.delay_bound = 7*86400;
}
void create();
void parse_cmdline(int, char**);
};
// Parse the job-specific arguments of one --stdin input line.
//
// argc/argv: the tokens of that line (no program name; scanning starts at 0).
//
// Recognized forms:
//   - the command_line option (as matched by arg()), followed by its value
//   - the first token that does not start with '-' begins the list of
//     input files; it and every remaining token are taken as infiles.
//
// command_line and infiles are stored as pointers into argv; nothing is
// copied, so the caller must keep argv's storage alive while this
// JOB_DESC is in use.
//
// Exits with status 1 on an unrecognized option, or if the command_line
// option is the last token and so has no value.
//
void JOB_DESC::parse_cmdline(int argc, char** argv) {
    for (int i=0; i<argc; i++) {
        if (arg(argv, i, "command_line")) {
            // the option needs a value; don't read past the end of argv
            if (i + 1 >= argc) {
                fprintf(stderr,
                    "create_work: missing value for stdin argument '%s'\n",
                    argv[i]
                );
                exit(1);
            }
            command_line = argv[++i];
            continue;
        }
        if (argv[i][0] == '-') {
            fprintf(stderr, "create_work: bad stdin argument '%s'\n", argv[i]);
            exit(1);
        }
        // first non-option token: everything from here on is an input file
        infiles = argv+i;
        ninfiles = argc - i;
        break;
    }
}
int main(int argc, char** argv) {
DB_APP app;
int retval;
char wu_template_file[256];
int i;
char download_dir[256], db_name[256], db_passwd[256];
char db_user[256],db_host[256];
char buf[256];
char additional_xml[256];
char buf[4096];
JOB_DESC jd;
bool show_wu_name = true;
bool assign_flag = false;
bool assign_multi = false;
int assign_id = 0;
int assign_type = ASSIGN_NONE;
bool use_stdin = false;
strcpy(wu_template_file, "");
strcpy(result_template_file, "");
strcpy(app.name, "");
strcpy(db_passwd, "");
strcpy(additional_xml, "");
const char* config_dir = 0;
i = 1;
ninfiles = 0;
wu.clear();
// defaults (in case they're not in WU template)
wu.id = 0;
wu.min_quorum = 2;
wu.target_nresults = 2;
wu.max_error_results = 3;
wu.max_total_results = 10;
wu.max_success_results = 6;
wu.rsc_fpops_est = 3600e9;
wu.rsc_fpops_bound = 86400e9;
wu.rsc_memory_bound = 5e8;
wu.rsc_disk_bound = 1e9;
wu.rsc_bandwidth_bound = 0.0;
wu.delay_bound = 7*86400;
while (i < argc) {
if (arg(argv, i, "appname")) {
safe_strcpy(app.name, argv[++i]);
} else if (arg(argv, i, "batch")) {
wu.batch = atoi(argv[++i]);
} else if (arg(argv, i, "d")) {
int dl = atoi(argv[++i]);
log_messages.set_debug_level(dl);
if (dl ==4) g_print_queries = true;
} else if (arg(argv, i, "wu_name")) {
show_wu_name = false;
safe_strcpy(wu.name, argv[++i]);
safe_strcpy(jd.wu.name, argv[++i]);
} else if (arg(argv, i, "wu_template")) {
safe_strcpy(wu_template_file, argv[++i]);
} else if (arg(argv, i, "result_template")) {
safe_strcpy(result_template_file, argv[++i]);
safe_strcpy(jd.result_template_file, argv[++i]);
} else if (arg(argv, i, "config_dir")) {
config_dir = argv[++i];
} else if (arg(argv, i, "batch")) {
wu.batch = atoi(argv[++i]);
jd.wu.batch = atoi(argv[++i]);
} else if (arg(argv, i, "priority")) {
wu.priority = atoi(argv[++i]);
jd.wu.priority = atoi(argv[++i]);
} else if (arg(argv, i, "rsc_fpops_est")) {
wu.rsc_fpops_est = atof(argv[++i]);
jd.wu.rsc_fpops_est = atof(argv[++i]);
} else if (arg(argv, i, "rsc_fpops_bound")) {
wu.rsc_fpops_bound = atof(argv[++i]);
jd.wu.rsc_fpops_bound = atof(argv[++i]);
} else if (arg(argv, i, "rsc_memory_bound")) {
wu.rsc_memory_bound = atof(argv[++i]);
jd.wu.rsc_memory_bound = atof(argv[++i]);
} else if (arg(argv, i, "size_class")) {
wu.size_class = atoi(argv[++i]);
jd.wu.size_class = atoi(argv[++i]);
} else if (arg(argv, i, "rsc_disk_bound")) {
wu.rsc_disk_bound = atof(argv[++i]);
jd.wu.rsc_disk_bound = atof(argv[++i]);
} else if (arg(argv, i, "delay_bound")) {
wu.delay_bound = atoi(argv[++i]);
jd.wu.delay_bound = atoi(argv[++i]);
} else if (arg(argv, i, "min_quorum")) {
wu.min_quorum = atoi(argv[++i]);
jd.wu.min_quorum = atoi(argv[++i]);
} else if (arg(argv, i, "target_nresults")) {
wu.target_nresults = atoi(argv[++i]);
jd.wu.target_nresults = atoi(argv[++i]);
} else if (arg(argv, i, "max_error_results")) {
wu.max_error_results = atoi(argv[++i]);
jd.wu.max_error_results = atoi(argv[++i]);
} else if (arg(argv, i, "max_total_results")) {
wu.max_total_results = atoi(argv[++i]);
jd.wu.max_total_results = atoi(argv[++i]);
} else if (arg(argv, i, "max_success_results")) {
wu.max_success_results = atoi(argv[++i]);
jd.wu.max_success_results = atoi(argv[++i]);
} else if (arg(argv, i, "opaque")) {
wu.opaque = atoi(argv[++i]);
jd.wu.opaque = atoi(argv[++i]);
} else if (arg(argv, i, "command_line")) {
command_line= argv[++i];
jd.command_line= argv[++i];
} else if (arg(argv, i, "additional_xml")) {
strcpy(additional_xml, argv[++i]);
strcpy(jd.additional_xml, argv[++i]);
} else if (arg(argv, i, "wu_id")) {
wu.id = atoi(argv[++i]);
jd.wu.id = atoi(argv[++i]);
} else if (arg(argv, i, "broadcast")) {
assign_multi = true;
assign_flag = true;
assign_type = ASSIGN_NONE;
jd.assign_multi = true;
jd.assign_flag = true;
jd.assign_type = ASSIGN_NONE;
} else if (arg(argv, i, "broadcast_user")) {
assign_flag = true;
assign_type = ASSIGN_USER;
assign_multi = true;
assign_id = atoi(argv[++i]);
jd.assign_flag = true;
jd.assign_type = ASSIGN_USER;
jd.assign_multi = true;
jd.assign_id = atoi(argv[++i]);
} else if (arg(argv, i, "broadcast_team")) {
assign_flag = true;
assign_type = ASSIGN_TEAM;
assign_multi = true;
assign_id = atoi(argv[++i]);
jd.assign_flag = true;
jd.assign_type = ASSIGN_TEAM;
jd.assign_multi = true;
jd.assign_id = atoi(argv[++i]);
} else if (arg(argv, i, "target_host")) {
assign_flag = true;
assign_type = ASSIGN_HOST;
assign_id = atoi(argv[++i]);
jd.assign_flag = true;
jd.assign_type = ASSIGN_HOST;
jd.assign_id = atoi(argv[++i]);
} else if (arg(argv, i, "target_user")) {
assign_flag = true;
assign_type = ASSIGN_USER;
assign_id = atoi(argv[++i]);
jd.assign_flag = true;
jd.assign_type = ASSIGN_USER;
jd.assign_id = atoi(argv[++i]);
} else if (arg(argv, i, "target_team")) {
assign_flag = true;
assign_type = ASSIGN_TEAM;
assign_id = atoi(argv[++i]);
jd.assign_flag = true;
jd.assign_type = ASSIGN_TEAM;
jd.assign_id = atoi(argv[++i]);
} else if (arg(argv, i, "help")) {
usage();
exit(0);
} else if (arg(argv, i, "stdin")) {
use_stdin = true;
} else {
if (!strncmp("-", argv[i], 1)) {
fprintf(stderr, "create_work: bad argument '%s'\n", argv[i]);
exit(1);
}
infiles = argv+i;
ninfiles = argc - i;
jd.infiles = argv+i;
jd.ninfiles = argc - i;
break;
}
i++;
@ -226,14 +263,14 @@ int main(int argc, const char** argv) {
if (!strlen(app.name)) {
usage();
}
if (!strlen(wu.name)) {
sprintf(wu.name, "%s_%d_%f", app.name, getpid(), dtime());
if (!strlen(jd.wu.name)) {
sprintf(jd.wu.name, "%s_%d_%f", app.name, getpid(), dtime());
}
if (!strlen(wu_template_file)) {
sprintf(wu_template_file, "templates/%s_in", app.name);
}
if (!strlen(result_template_file)) {
sprintf(result_template_file, "templates/%s_out", app.name);
if (!strlen(jd.result_template_file)) {
sprintf(jd.result_template_file, "templates/%s_out", app.name);
}
retval = config.parse_file(config_dir);
@ -262,7 +299,9 @@ int main(int argc, const char** argv) {
exit(1);
}
retval = read_filename(wu_template_file, wu_template, sizeof(wu_template));
retval = read_filename(
wu_template_file, jd.wu_template, sizeof(jd.wu_template)
);
if (retval) {
fprintf(stderr,
"create_work: can't open input template %s\n", wu_template_file
@ -270,11 +309,34 @@ int main(int argc, const char** argv) {
exit(1);
}
wu.appid = app.id;
jd.wu.appid = app.id;
strcpy(result_template_path, "./");
strcat(result_template_path, result_template_file);
retval = create_work(
strcpy(jd.result_template_path, "./");
strcat(jd.result_template_path, jd.result_template_file);
if (use_stdin) {
int _argc;
char* _argv[100];
for (int j=0; ; j++) {
char* p = fgets(buf, sizeof(buf), stdin);
if (p == NULL) break;
JOB_DESC jd2 = jd;
_argc = parse_command_line(buf, _argv);
jd2.parse_cmdline(_argc, _argv);
sprintf(jd2.wu.name, "%s_%d", jd.wu.name, j);
jd2.create();
}
} else {
jd.create();
}
if (show_wu_name) {
printf("workunit name: %s\n", jd.wu.name);
}
boinc_db.close();
}
void JOB_DESC::create() {
char buf[256];
int retval = create_work(
wu,
wu_template,
result_template_file,
@ -288,10 +350,6 @@ int main(int argc, const char** argv) {
if (retval) {
fprintf(stderr, "create_work: %s\n", boincerror(retval));
exit(1);
} else {
if (show_wu_name) {
printf("workunit name: %s\n", wu.name);
}
}
if (assign_flag) {
DB_ASSIGNMENT assignment;
@ -317,7 +375,6 @@ int main(int argc, const char** argv) {
exit(1);
}
}
boinc_db.close();
}
const char *BOINC_RCSID_3865dbbf46 = "$Id$";

View File

@ -371,7 +371,7 @@ static int process_workunit(
}
} else if (xp.parse_string("command_line", cmdline)) {
if (command_line) {
fprintf(stderr, "Can't specify command line twice");
fprintf(stderr, "Can't specify command line twice\n");
return ERR_XML_PARSE;
}
out += "<command_line>\n";

View File

@ -85,6 +85,7 @@ $dl_md5_path = "$dl_path.md5";
// compute the file's MD5
//
$md5 = md5_file($path);
$file_size = filesize($path);
// if file is already in download dir, make sure it's the same
//
@ -117,7 +118,7 @@ Please use a different file name.
// make MD5 file if needed
//
if (!file_exists($dl_md5_path)) {
$x = $md5." ".filesize($path)."\n";
$x = $md5." ".$file_size."\n";
file_put_contents($dl_md5_path, $x);
}