From 4539eef345d9f94cfafe9e8b2af900ae7c810378 Mon Sep 17 00:00:00 2001 From: David Anderson Date: Fri, 12 Jan 2024 00:58:14 -0800 Subject: [PATCH] changes to support the job processing cookbook - script_assimilator: add batch_id option - add/fix comments - demo_submit_batch, sample_assimilate.py: finish - --- samples/worker/validate_compare.py | 12 +++ samples/worker/validate_init.py | 12 +++ sched/script_assimilator.cpp | 4 + sched/script_validator.cpp | 14 ++-- tools/create_work.cpp | 4 +- tools/demo_submit_batch | 75 ++++++++++++++++++ tools/process_input_template.cpp | 24 +++--- tools/sample_assimilate.py | 30 ++++++++ tools/stage_file | 8 +- tools/stage_file_native.cpp | 118 +++++++++++++++-------------- 10 files changed, 223 insertions(+), 78 deletions(-) create mode 100755 samples/worker/validate_compare.py create mode 100755 samples/worker/validate_init.py create mode 100644 tools/demo_submit_batch create mode 100755 tools/sample_assimilate.py diff --git a/samples/worker/validate_compare.py b/samples/worker/validate_compare.py new file mode 100755 index 0000000000..a1f4fa2ee5 --- /dev/null +++ b/samples/worker/validate_compare.py @@ -0,0 +1,12 @@ +#! /usr/bin/env python3 + +# check that 2 files are identical + +import sys + +def read(path): + with open(path) as f: + data = f.read() + return data + +exit(0 if read(sys.argv[1])==read(sys.argv[2]) else 1) diff --git a/samples/worker/validate_init.py b/samples/worker/validate_init.py new file mode 100755 index 0000000000..470986d17a --- /dev/null +++ b/samples/worker/validate_init.py @@ -0,0 +1,12 @@ +#! /usr/bin/env python3 + +# check that a file is uppercase + +import sys + +def is_uc(path): + with open(path) as f: + data = f.read() + return data == data.upper() + +exit(0 if is_uc(sys.argv[1]) else 1) diff --git a/sched/script_assimilator.cpp b/sched/script_assimilator.cpp index f41783f884..6e42184806 100644 --- a/sched/script_assimilator.cpp +++ b/sched/script_assimilator.cpp @@ -30,6 +30,7 @@ // wu_id workunit ID // result_id ID of the canonical result // runtime runtime of the canonical result +// batch_id the job's batch ID // // if no args are specified, the script is invoked as // scriptname wu_id files @@ -111,6 +112,9 @@ int assimilate_handler( } else if (s == "runtime") { sprintf(buf, " %f", canonical_result.elapsed_time); strcat(cmd, buf); + } else if (s == "batch_id") { + sprintf(buf, " %d", wu.batch); + strcat(cmd, buf); } } } else { diff --git a/sched/script_validator.cpp b/sched/script_validator.cpp index 154a87cb88..df0ac52f57 100644 --- a/sched/script_validator.cpp +++ b/sched/script_validator.cpp @@ -21,17 +21,19 @@ // cmdline args to this program: // --init_script "scriptname arg1 ... argn" // --compare_script "scriptname arg1 ... argn" -// -// You must specify at least one. +// where +// scriptname is the name of a script (in bin/) +// argi are keywords (see below) representing +// args to be passed to the script. +// You must specify at least one script. // // The init script checks the validity of a result, // e.g. that the output files have the proper format. -// It returns zero if the files are valid +// It exits zero if the files are valid // // The compare script compares two results. -// If returns zero if the output files are equivalent. +// If exits zero if the output files are equivalent. // -// arg1 ... argn represent cmdline args to be passed to the scripts. // The options for init_script are: // // files list of paths of output files of the result @@ -39,6 +41,7 @@ // runtime task runtime // // Additional options for compare_script, for the second result: +// // files2 list of paths of output files // result_id2 result ID // runtime2 task runtime @@ -204,4 +207,3 @@ int compare_results(RESULT& r1, void*, RESULT const& r2, void*, bool& match) { int cleanup_result(RESULT const&, void*) { return 0; } - diff --git a/tools/create_work.cpp b/tools/create_work.cpp index c4e851b71c..eb58f2b575 100644 --- a/tools/create_work.cpp +++ b/tools/create_work.cpp @@ -16,7 +16,7 @@ // along with BOINC. If not, see . // Command-line program for creating jobs (workunits). -// Used directly for local job submission; +// Use directly for local job submission; // run from PHP script for remote job submission. // // see https://github.com/BOINC/boinc/wiki/JobSubmission @@ -25,6 +25,8 @@ // - to create a single job, with everything passed on the cmdline // - to create multiple jobs, where per-job info is passed via stdin, // one line per job +// +// The input files must already be staged (i.e. in the download hierarchy). #include "config.h" diff --git a/tools/demo_submit_batch b/tools/demo_submit_batch new file mode 100644 index 0000000000..adc0f0a4b0 --- /dev/null +++ b/tools/demo_submit_batch @@ -0,0 +1,75 @@ +#! /usr/bin/env python3 + +# usage: demo_submit_batch user_id app_name dir +# submit a batch of jobs +# assumptions: +# - you have an app that takes 1 input file +# - there's a directory of input files +# +# The job names will be of the form +# appname__batchid__* +# so that assimilators can know where to put output files + +import os, sys, time, subprocess + +def main(argv): + if len(argv) != 4: + print('Usage: demo_submit_batch user_id app_name dir') + sys.exit(1) + + user_id = int(argv[1]) + app_name = argv[2] + dir = argv[3] + + # get list of input files + + files = [] + for entry in os.scandir(dir): + if not entry.is_file(): + raise Exception('not file') + files.append(entry.name) + + # make the batch record + + cmd = [ + 'bin/create_batch', + '--app_name', app_name, + '--user_id', str(user_id), + '--njobs', str(len(files)), + '--name', '%s__%d'%(app_name, int(time.time())) + ] + ret = subprocess.run(cmd, capture_output=True) + if ret.returncode: + raise Exception('create_batch failed (%d): %s'%(ret.returncode, ret.stdout)) + batch_id = int(ret.stdout) + + # stage the input files + + cmd = ['bin/stage_file', '--copy', dir] + ret = subprocess.run(cmd, capture_output=True) + if ret.returncode: + raise Exception('stage_file failed (%d): %s'%(ret.returncode, ret.stdout)) + + # create the jobs + + fstr = '\n'.join(files) + cmd = [ + 'bin/create_work', + '--appname', app_name, + '--batch', str(batch_id), + '--stdin' + ] + ret = subprocess.run(cmd, input=fstr, capture_output=True, encoding='ascii') + if ret.returncode: + raise Exception('create_work failed (%d): %s'%(ret.returncode, ret.stdout)) + + # mark the batch as in progress + + cmd = ['bin/create_work', '--enable', str(batch_id)] + ret = subprocess.run(cmd, capture_output=True) + if ret.returncode: + raise Exception('enable batch failed (%d): %s'%(ret.returncode, ret.stdout)) + + print('%d jobs submitted. Batch ID %d'%(len(files), batch_id)) + +main(sys.argv) diff --git a/tools/process_input_template.cpp b/tools/process_input_template.cpp index 14ac1f8c75..d38f265b07 100644 --- a/tools/process_input_template.cpp +++ b/tools/process_input_template.cpp @@ -19,6 +19,8 @@ // fill in the workunit's XML document (wu.xml_doc) // by scanning the input template, macro-substituting the input files, // and putting in the command line element and additional XML +// +// Called (only) in create_work.cpp #include #include @@ -138,21 +140,23 @@ static void write_md5_info( } // generate a element for workunit XML doc, -// based on the input template and list of variable files +// based on a in an input template and list of input files // -// Inputs: -// xp: parser for input template -// var_infiles: list of files descs passed to create_work (i.e. var files) +// in: +// xp: parser for input template, pointing after +// var_infiles: list of file descs passed to create_work +// (the input template may also contain 'constant' input files +// that aren't passed to create_work) +// +// in/out: +// nfiles_parsed: increment if not constant file // -// Outputs: -// infiles: vector (appended to) of all input files // out: +// out: append the element for the WU XML doc +// infiles: vector (appended to) of all input files // // Actions: -// as an input file for create_work. -// out: append the element for the WU XML doc -// If not a constant file: -// increment nfiles_parsed +// generate .md5 file if needed // static int process_file_info( XML_PARSER& xp, SCHED_CONFIG& config_loc, diff --git a/tools/sample_assimilate.py b/tools/sample_assimilate.py new file mode 100755 index 0000000000..f0a01fe118 --- /dev/null +++ b/tools/sample_assimilate.py @@ -0,0 +1,30 @@ +#! /usr/bin/env python3 + +# invoked either as +# sample_assimilate.py batch_id outfile_path1 ... +# or +# sample_assimilator.py --error error_code wu_name batch_id +# +# in the 1st case, move the output files from the upload hierarchy +# to sample_results/batch_id/ +# in the 2nd case, append a line of the form +# wu_name error_code +# to samples_results/batch_id/errors + +import sys, os + +if sys.argv[1] == '--error': + error_code = sys.argv[2] + wu_name = sys.argv[3] + batch_id = sys.argv[4] + outdir = 'sample_results/%s'%(batch_id) + os.system('mkdir -p %s'%(outdir)) + with f as open('%s/errors'%(outdir), 'a'): + f.write('%s %s\n'%(wu_name, error_code)) +else: + batch_id = sys.argv[1] + outfile_path = sys.argv[2] + fname = os.path.basename(outfile_path) + outdir = 'sample_results/%s'%(batch_id) + os.system('mkdir -p %s'%(outdir)) + os.system('mv %s %s/%s'%(outfile_path, outdir, fname)) diff --git a/tools/stage_file b/tools/stage_file index 65f6377d9a..e46110b1d9 100755 --- a/tools/stage_file +++ b/tools/stage_file @@ -20,6 +20,8 @@ // Stage an input file: namely, // - move or copy it to the download hierarchy // - compute its md5 +// Note: this isn't necessary; it's done by create_work +// but we may as well do it here. // - make a gzipped version if needed // // Usage (from project dir): @@ -30,8 +32,7 @@ // --gzip Make a gzipped version of the file. // Use this if you specify in the // --copy Copy the file (default is to move it) -// -// -- verbose +// --verbose verbose output // // path The file to be staged. // If it's a directory, stage all the files in that dir @@ -96,10 +97,9 @@ function stage_file($path) { break; case -1: error_exit(" -There is already a file in your project's download directory with that name, +$path: There is already a file in your project's download hierarchy with that name, but with different contents. This is not allowed by BOINC, which requires that files be immutable. -Please use a different file name. "); break; } diff --git a/tools/stage_file_native.cpp b/tools/stage_file_native.cpp index a137b745fe..eed0325489 100644 --- a/tools/stage_file_native.cpp +++ b/tools/stage_file_native.cpp @@ -15,11 +15,9 @@ // You should have received a copy of the GNU Lesser General Public License // along with BOINC. If not, see . -#include "stage_file_native.h" -#include "filesys.h" -#include "error_numbers.h" -#include "sched_util_basic.h" -#include "md5_file.h" +// Stage an input file +// Native (maybe more efficient) version of 'stage_file'; +// same cmdline args #include #include @@ -29,7 +27,16 @@ #include #include -static int create_md5_file(const char* file_path, const char* md5_file_path, bool verbose) { +#include "filesys.h" +#include "error_numbers.h" +#include "sched_util_basic.h" +#include "md5_file.h" + +#include "stage_file_native.h" + +static int create_md5_file( + const char* file_path, const char* md5_file_path, bool verbose +) { char md5_file_hash[MD5_LEN], path[MAXPATHLEN]; FILE* md5_filep; int retval; @@ -56,15 +63,12 @@ static int create_md5_file(const char* file_path, const char* md5_file_path, boo int stage_file( char* file_path, - SCHED_CONFIG& config, bool gzip, bool copy, bool verbose ) { char dl_hier_path[MAXPATHLEN], gz_path[MAXPATHLEN]; - char md5_file_path[MAXPATHLEN], md5_file_hash[MD5_LEN]; char* file_name; - double nbytes; int retval; if (!boinc_file_exists(file_path)) { @@ -86,47 +90,47 @@ int stage_file( } switch (check_download_file(file_path, dl_hier_path)) { - case 0: - if (verbose) { - fprintf(stdout, "file %s has already been staged\n", file_path); - } - break; - case 1: - retval = create_md5_file(file_path, dl_hier_path, verbose); - if (retval) { - fprintf(stdout, "failed to create md5 file: %s\n", boincerror(retval)); - return retval; - } - break; - case 2: - if (copy) { - retval = boinc_copy(file_path, dl_hier_path); - } else { - retval = boinc_rename(file_path, dl_hier_path); - } - if (retval) { - fprintf(stdout, "failed to copy or move file: %s\n", boincerror(retval)); - return retval; - } - retval = create_md5_file(dl_hier_path, dl_hier_path, verbose); - if (retval) { - fprintf(stdout, "failed to create md5 file: %s\n", boincerror(retval)); - return retval; - } - break; - case -1: - fprintf(stderr, - "There is already a file in your project's download directory with that name,\n" - "but with different contents. This is not allowed by BOINC, which requires that\n" - "files be immutable. Please use a different file name.\n" - ); - return -1; - case -2: - fprintf(stderr, "check_download_file: file operation failed - %s\n", strerror(errno)); - return -1; - default: - fprintf(stderr, "check_download_file: unknown return code %d\n", retval); - return -1; + case 0: + if (verbose) { + fprintf(stdout, "file %s has already been staged\n", file_path); + } + break; + case 1: + retval = create_md5_file(file_path, dl_hier_path, verbose); + if (retval) { + fprintf(stdout, "failed to create md5 file: %s\n", boincerror(retval)); + return retval; + } + break; + case 2: + if (copy) { + retval = boinc_copy(file_path, dl_hier_path); + } else { + retval = boinc_rename(file_path, dl_hier_path); + } + if (retval) { + fprintf(stdout, "failed to copy or move file: %s\n", boincerror(retval)); + return retval; + } + retval = create_md5_file(dl_hier_path, dl_hier_path, verbose); + if (retval) { + fprintf(stdout, "failed to create md5 file: %s\n", boincerror(retval)); + return retval; + } + break; + case -1: + fprintf(stderr, + "There is already a file in your project's download directory with that name,\n" + "but with different contents. This is not allowed by BOINC, which requires that\n" + "files be immutable. Please use a different file name.\n" + ); + return -1; + case -2: + fprintf(stderr, "check_download_file: file operation failed - %s\n", strerror(errno)); + return -1; + default: + fprintf(stderr, "check_download_file: unknown return code %d\n", retval); + return -1; } if (gzip) { @@ -173,14 +177,16 @@ void usage(int exit_code) { void run_stage_file( char* file_path, - SCHED_CONFIG& config, bool gzip, bool copy, bool verbose ) { - int retval = stage_file(file_path, config, gzip, copy, verbose); + int retval = stage_file(file_path, gzip, copy, verbose); if (retval) { - fprintf(stderr, "stage_file failed for file %s: %s\n", file_path, boincerror(retval)); + fprintf(stderr, + "stage_file failed for file %s: %s\n", + file_path, boincerror(retval) + ); exit(1); } } @@ -235,11 +241,9 @@ int main(int argc, char** argv) { if (!is_file(file_path)) { continue; } - run_stage_file(file_path, config, gzip, copy, verbose); + run_stage_file(file_path, gzip, copy, verbose); } } else { - run_stage_file(path, config, gzip, copy, verbose); + run_stage_file(path, gzip, copy, verbose); } - - return 0; }