changes to support the job processing cookbook

- script_assimilator: add batch_id option
- add/fix comments
- demo_submit_batch, sample_assimilate.py: finish
-
This commit is contained in:
David Anderson 2024-01-12 00:58:14 -08:00
parent 95adf93241
commit 4539eef345
10 changed files with 223 additions and 78 deletions

View File

@ -0,0 +1,12 @@
#! /usr/bin/env python3
# check that 2 files are identical
import sys
def read(path):
    """Return the entire contents of the file at ``path`` as bytes.

    The file is read in binary mode so that the comparison done by the
    caller is byte-for-byte: no newline translation takes place and
    files that are not valid text can still be read.
    """
    with open(path, 'rb') as f:
        return f.read()


def main(argv):
    """Compare the files named by ``argv[1]`` and ``argv[2]``.

    Returns the process exit status: 0 if the two files have identical
    contents, 1 if they differ or if fewer than two paths were given.
    Any arguments after the second path are ignored.
    """
    if len(argv) < 3:
        prog = argv[0] if argv else 'compare'
        print('usage: %s file1 file2' % prog, file=sys.stderr)
        return 1
    return 0 if read(argv[1]) == read(argv[2]) else 1


# Guarded so that importing this file does not run the comparison.
if __name__ == '__main__':
    sys.exit(main(sys.argv))

12
samples/worker/validate_init.py Executable file
View File

@ -0,0 +1,12 @@
#! /usr/bin/env python3
# check that a file is uppercase
import sys
def is_uc(path):
    """Return True if the text in the file at ``path`` is all uppercase.

    "Uppercase" means the text is unchanged by ``str.upper()``, so
    digits, punctuation, whitespace and an empty file all count as
    uppercase.
    """
    with open(path) as f:
        text = f.read()
    return text == text.upper()


def main(argv):
    """Check the file named by ``argv[1]``.

    Returns the process exit status: 0 if the file is uppercase,
    1 if it is not or if no path was given.
    """
    if len(argv) < 2:
        prog = argv[0] if argv else 'validate_init.py'
        print('usage: %s file' % prog, file=sys.stderr)
        return 1
    return 0 if is_uc(argv[1]) else 1


# Guarded so that importing this file does not run the check.
if __name__ == '__main__':
    sys.exit(main(sys.argv))

View File

@ -30,6 +30,7 @@
// wu_id workunit ID
// result_id ID of the canonical result
// runtime runtime of the canonical result
// batch_id the job's batch ID
//
// if no args are specified, the script is invoked as
// scriptname wu_id files
@ -111,6 +112,9 @@ int assimilate_handler(
} else if (s == "runtime") {
sprintf(buf, " %f", canonical_result.elapsed_time);
strcat(cmd, buf);
} else if (s == "batch_id") {
sprintf(buf, " %d", wu.batch);
strcat(cmd, buf);
}
}
} else {

View File

@ -21,17 +21,19 @@
// cmdline args to this program:
// --init_script "scriptname arg1 ... argn"
// --compare_script "scriptname arg1 ... argn"
//
// You must specify at least one.
// where
// scriptname is the name of a script (in bin/)
// argi are keywords (see below) representing
// args to be passed to the script.
// You must specify at least one script.
//
// The init script checks the validity of a result,
// e.g. that the output files have the proper format.
// It returns zero if the files are valid
// It exits zero if the files are valid
//
// The compare script compares two results.
// If returns zero if the output files are equivalent.
// It exits zero if the output files are equivalent.
//
// arg1 ... argn represent cmdline args to be passed to the scripts.
// The options for init_script are:
//
// files list of paths of output files of the result
@ -39,6 +41,7 @@
// runtime task runtime
//
// Additional options for compare_script, for the second result:
//
// files2 list of paths of output files
// result_id2 result ID
// runtime2 task runtime
@ -204,4 +207,3 @@ int compare_results(RESULT& r1, void*, RESULT const& r2, void*, bool& match) {
// Cleanup hook for a result: takes no action and always returns 0.
// Both parameters are unnamed because they are not used.
int cleanup_result(RESULT const&, void*) {
return 0;
}

View File

@ -16,7 +16,7 @@
// along with BOINC. If not, see <http://www.gnu.org/licenses/>.
// Command-line program for creating jobs (workunits).
// Used directly for local job submission;
// Use directly for local job submission;
// run from PHP script for remote job submission.
//
// see https://github.com/BOINC/boinc/wiki/JobSubmission
@ -25,6 +25,8 @@
// - to create a single job, with everything passed on the cmdline
// - to create multiple jobs, where per-job info is passed via stdin,
// one line per job
//
// The input files must already be staged (i.e. in the download hierarchy).
#include "config.h"

75
tools/demo_submit_batch Normal file
View File

@ -0,0 +1,75 @@
#! /usr/bin/env python3
# usage: demo_submit_batch user_id app_name dir
# submit a batch of jobs
# assumptions:
# - you have an app that takes 1 input file
# - there's a directory of input files
#
# The job names will be of the form
# appname__batchid__*
# so that assimilators can know where to put output files
import os, sys, time, subprocess
def _run(cmd, what, **kwargs):
    """Run ``cmd`` with output captured and return the CompletedProcess.

    ``what`` names the step for the error message. Extra keyword
    arguments are passed through to ``subprocess.run``.

    Raises RuntimeError if the command exits non-zero. The message
    includes both stdout and stderr, since either may hold the reason
    for the failure.
    """
    ret = subprocess.run(cmd, capture_output=True, **kwargs)
    if ret.returncode:
        raise RuntimeError(
            '%s failed (%d): %s %s'
            % (what, ret.returncode, ret.stdout, ret.stderr)
        )
    return ret


def main(argv):
    """Submit one job per file in a directory, as a single batch.

    ``argv`` is ``[prog, user_id, app_name, dir]``.  The steps are:
    list the input files, create the batch record, stage the input
    files, create the jobs, then mark the batch as in progress.

    Exits with status 1 (after printing usage) if the argument count is
    wrong.  Raises an exception if ``dir`` contains anything other than
    regular files, contains no files at all, or if any of the invoked
    programs exits non-zero.
    """
    if len(argv) != 4:
        print('Usage: demo_submit_batch user_id app_name dir')
        sys.exit(1)
    user_id = int(argv[1])
    app_name = argv[2]
    input_dir = argv[3]

    # get list of input files; the 'with' closes the directory iterator
    files = []
    with os.scandir(input_dir) as entries:
        for entry in entries:
            if not entry.is_file():
                raise Exception('not a file: %s' % entry.path)
            files.append(entry.name)
    if not files:
        # don't create a batch record that would contain no jobs
        raise Exception('no input files in %s' % input_dir)

    # make the batch record; create_batch's stdout is parsed as the batch ID
    ret = _run(
        [
            'bin/create_batch',
            '--app_name', app_name,
            '--user_id', str(user_id),
            '--njobs', str(len(files)),
            '--name', '%s__%d' % (app_name, int(time.time())),
        ],
        'create_batch',
    )
    batch_id = int(ret.stdout)

    # stage the input files
    _run(['bin/stage_file', '--copy', input_dir], 'stage_file')

    # create the jobs: one input file name per line on stdin
    _run(
        [
            'bin/create_work',
            '--appname', app_name,
            '--batch', str(batch_id),
            '--stdin',
        ],
        'create_work',
        input='\n'.join(files),
        encoding='ascii',
    )

    # mark the batch as in progress
    _run(['bin/create_work', '--enable', str(batch_id)], 'enable batch')

    print('%d jobs submitted.  Batch ID %d' % (len(files), batch_id))


# Guarded so that importing this file does not submit anything.
if __name__ == '__main__':
    main(sys.argv)

View File

@ -19,6 +19,8 @@
// fill in the workunit's XML document (wu.xml_doc)
// by scanning the input template, macro-substituting the input files,
// and putting in the command line element and additional XML
//
// Called (only) in create_work.cpp
#include <stdio.h>
#include <string>
@ -138,21 +140,23 @@ static void write_md5_info(
}
// generate a <file_info> element for workunit XML doc,
// based on the input template and list of variable files
// based on a <file_info> in an input template and list of input files
//
// Inputs:
// xp: parser for input template
// var_infiles: list of files descs passed to create_work (i.e. var files)
// in:
// xp: parser for input template, pointing after <file_info>
// var_infiles: list of file descs passed to create_work
// (the input template may also contain 'constant' input files
// that aren't passed to create_work)
//
// in/out:
// nfiles_parsed: increment if not constant file
//
// Outputs:
// infiles: vector (appended to) of all input files
// out:
// out: append the <file_info> element for the WU XML doc
// infiles: vector (appended to) of all input files
//
// Actions:
// as an input file for create_work.
// out: append the <file_info> element for the WU XML doc
// If not a constant file:
// increment nfiles_parsed
// generate .md5 file if needed
//
static int process_file_info(
XML_PARSER& xp, SCHED_CONFIG& config_loc,

30
tools/sample_assimilate.py Executable file
View File

@ -0,0 +1,30 @@
#! /usr/bin/env python3
# invoked either as
#   sample_assimilate.py batch_id outfile_path1 ...
# or
#   sample_assimilate.py --error error_code wu_name batch_id
#
# in the 1st case, move the output files from the upload hierarchy
# to sample_results/batch_id/
# in the 2nd case, append a line of the form
#   wu_name error_code
# to sample_results/batch_id/errors

import os
import shutil
import sys


def _batch_dir(batch_id):
    """Create (if needed) and return the results directory of a batch.

    The path is relative to the current working directory.
    """
    outdir = os.path.join('sample_results', str(batch_id))
    os.makedirs(outdir, exist_ok=True)
    return outdir


def record_error(error_code, wu_name, batch_id):
    """Append a ``wu_name error_code`` line to the batch's errors file."""
    with open(os.path.join(_batch_dir(batch_id), 'errors'), 'a') as f:
        f.write('%s %s\n' % (wu_name, error_code))


def move_outputs(batch_id, outfile_paths):
    """Move every output file into the batch's results directory.

    Each file keeps its base name.  Files are moved with shutil rather
    than through a shell command, so paths containing spaces or shell
    metacharacters are handled literally.
    """
    outdir = _batch_dir(batch_id)
    for outfile_path in outfile_paths:
        fname = os.path.basename(outfile_path)
        shutil.move(outfile_path, os.path.join(outdir, fname))


def main(argv):
    """Dispatch on the command line; return the process exit status.

    Returns 0 on success and 1 (after printing usage to stderr) if the
    arguments do not match either invocation form.
    """
    if len(argv) >= 2 and argv[1] == '--error':
        if len(argv) < 5:
            print(
                'usage: sample_assimilate.py --error error_code wu_name batch_id',
                file=sys.stderr,
            )
            return 1
        record_error(argv[2], argv[3], argv[4])
        return 0
    if len(argv) < 2:
        print(
            'usage: sample_assimilate.py batch_id outfile_path1 ...',
            file=sys.stderr,
        )
        return 1
    move_outputs(argv[1], argv[2:])
    return 0


# Guarded so that importing this file does not move or write anything.
if __name__ == '__main__':
    sys.exit(main(sys.argv))

View File

@ -20,6 +20,8 @@
// Stage an input file: namely,
// - move or copy it to the download hierarchy
// - compute its md5
// Note: this isn't necessary; it's done by create_work
// but we may as well do it here.
// - make a gzipped version if needed
//
// Usage (from project dir):
@ -30,8 +32,7 @@
// --gzip Make a gzipped version of the file.
// Use this if you specify <gzip> in the <file_info>
// --copy Copy the file (default is to move it)
//
// -- verbose
// --verbose verbose output
//
// path The file to be staged.
// If it's a directory, stage all the files in that dir
@ -96,10 +97,9 @@ function stage_file($path) {
break;
case -1:
error_exit("
There is already a file in your project's download directory with that name,
$path: There is already a file in your project's download hierarchy with that name,
but with different contents.
This is not allowed by BOINC, which requires that files be immutable.
Please use a different file name.
");
break;
}

View File

@ -15,11 +15,9 @@
// You should have received a copy of the GNU Lesser General Public License
// along with BOINC. If not, see <http://www.gnu.org/licenses/>.
#include "stage_file_native.h"
#include "filesys.h"
#include "error_numbers.h"
#include "sched_util_basic.h"
#include "md5_file.h"
// Stage an input file
// Native (maybe more efficient) version of 'stage_file';
// same cmdline args
#include <zlib.h>
#include <cstdio>
@ -29,7 +27,16 @@
#include <sstream>
#include <fstream>
static int create_md5_file(const char* file_path, const char* md5_file_path, bool verbose) {
#include "filesys.h"
#include "error_numbers.h"
#include "sched_util_basic.h"
#include "md5_file.h"
#include "stage_file_native.h"
static int create_md5_file(
const char* file_path, const char* md5_file_path, bool verbose
) {
char md5_file_hash[MD5_LEN], path[MAXPATHLEN];
FILE* md5_filep;
int retval;
@ -56,15 +63,12 @@ static int create_md5_file(const char* file_path, const char* md5_file_path, boo
int stage_file(
char* file_path,
SCHED_CONFIG& config,
bool gzip,
bool copy,
bool verbose
) {
char dl_hier_path[MAXPATHLEN], gz_path[MAXPATHLEN];
char md5_file_path[MAXPATHLEN], md5_file_hash[MD5_LEN];
char* file_name;
double nbytes;
int retval;
if (!boinc_file_exists(file_path)) {
@ -86,47 +90,47 @@ int stage_file(
}
switch (check_download_file(file_path, dl_hier_path)) {
case 0:
if (verbose) {
fprintf(stdout, "file %s has already been staged\n", file_path);
}
break;
case 1:
retval = create_md5_file(file_path, dl_hier_path, verbose);
if (retval) {
fprintf(stdout, "failed to create md5 file: %s\n", boincerror(retval));
return retval;
}
break;
case 2:
if (copy) {
retval = boinc_copy(file_path, dl_hier_path);
} else {
retval = boinc_rename(file_path, dl_hier_path);
}
if (retval) {
fprintf(stdout, "failed to copy or move file: %s\n", boincerror(retval));
return retval;
}
retval = create_md5_file(dl_hier_path, dl_hier_path, verbose);
if (retval) {
fprintf(stdout, "failed to create md5 file: %s\n", boincerror(retval));
return retval;
}
break;
case -1:
fprintf(stderr,
"There is already a file in your project's download directory with that name,\n"
"but with different contents. This is not allowed by BOINC, which requires that\n"
"files be immutable. Please use a different file name.\n"
);
return -1;
case -2:
fprintf(stderr, "check_download_file: file operation failed - %s\n", strerror(errno));
return -1;
default:
fprintf(stderr, "check_download_file: unknown return code %d\n", retval);
return -1;
case 0:
if (verbose) {
fprintf(stdout, "file %s has already been staged\n", file_path);
}
break;
case 1:
retval = create_md5_file(file_path, dl_hier_path, verbose);
if (retval) {
fprintf(stdout, "failed to create md5 file: %s\n", boincerror(retval));
return retval;
}
break;
case 2:
if (copy) {
retval = boinc_copy(file_path, dl_hier_path);
} else {
retval = boinc_rename(file_path, dl_hier_path);
}
if (retval) {
fprintf(stdout, "failed to copy or move file: %s\n", boincerror(retval));
return retval;
}
retval = create_md5_file(dl_hier_path, dl_hier_path, verbose);
if (retval) {
fprintf(stdout, "failed to create md5 file: %s\n", boincerror(retval));
return retval;
}
break;
case -1:
fprintf(stderr,
"There is already a file in your project's download directory with that name,\n"
"but with different contents. This is not allowed by BOINC, which requires that\n"
"files be immutable. Please use a different file name.\n"
);
return -1;
case -2:
fprintf(stderr, "check_download_file: file operation failed - %s\n", strerror(errno));
return -1;
default:
fprintf(stderr, "check_download_file: unknown return code %d\n", retval);
return -1;
}
if (gzip) {
@ -173,14 +177,16 @@ void usage(int exit_code) {
void run_stage_file(
char* file_path,
SCHED_CONFIG& config,
bool gzip,
bool copy,
bool verbose
) {
int retval = stage_file(file_path, config, gzip, copy, verbose);
int retval = stage_file(file_path, gzip, copy, verbose);
if (retval) {
fprintf(stderr, "stage_file failed for file %s: %s\n", file_path, boincerror(retval));
fprintf(stderr,
"stage_file failed for file %s: %s\n",
file_path, boincerror(retval)
);
exit(1);
}
}
@ -235,11 +241,9 @@ int main(int argc, char** argv) {
if (!is_file(file_path)) {
continue;
}
run_stage_file(file_path, config, gzip, copy, verbose);
run_stage_file(file_path, gzip, copy, verbose);
}
} else {
run_stage_file(path, config, gzip, copy, verbose);
run_stage_file(path, gzip, copy, verbose);
}
return 0;
}