// This file is part of BOINC. // http://boinc.berkeley.edu // Copyright (C) 2008 University of California // // BOINC is free software; you can redistribute it and/or modify it // under the terms of the GNU Lesser General Public License // as published by the Free Software Foundation, // either version 3 of the License, or (at your option) any later version. // // BOINC is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // See the GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with BOINC. If not, see . #include "config.h" #ifdef _USING_FCGI_ #include "boinc_fcgi.h" #else #include #endif #include #include #include #include #include #include #include #include #include #include #include "boinc_db.h" #include "crypt.h" #include "error_numbers.h" #include "md5_file.h" #include "parse.h" #include "str_util.h" #include "str_replace.h" #include "common_defs.h" #include "filesys.h" #include "sched_util.h" #include "util.h" #include "backend_lib.h" using std::string; static struct random_init { random_init() { srand48(getpid() + time(0)); } } random_init; int read_file(FILE* f, char* buf, int len) { int n = fread(buf, 1, len, f); buf[n] = 0; return 0; } int read_filename(const char* path, char* buf, int len) { int retval; #ifndef _USING_FCGI_ FILE* f = fopen(path, "r"); #else FCGI_FILE *f=FCGI::fopen(path, "r"); #endif if (!f) return -1; retval = read_file(f, buf, len); fclose(f); return retval; } // see checkin notes Dec 30 2004 // static bool got_md5_info( const char *path, char *md5data, double *nbytes ) { bool retval=false; // look for file named FILENAME.md5 containing md5sum and length. // If found, and newer mod time than file, read md5 sum and file // length from it. char md5name[512]; struct stat md5stat, filestat; char endline='\0'; sprintf(md5name, "%s.md5", path); // get mod times for file if (stat(path, &filestat)) return retval; // get mod time for md5 cache file if (stat(md5name, &md5stat)) return retval; // if cached md5 newer, then open it #ifndef _USING_FCGI_ FILE *fp=fopen(md5name, "r"); #else FCGI_FILE *fp=FCGI::fopen(md5name, "r"); #endif if (!fp) return retval; // read two quantities: md5 sum and length. If we can't read // these, or there is MORE stuff in the file' it's not an md5 // cache file if (3 == fscanf(fp, "%s %lf%c", md5data, nbytes, &endline) && endline=='\n' && EOF==fgetc(fp) ) retval=true; fclose(fp); // if this is one of our cached md5 files, but it's OLDER than the // data file which it supposedly corresponds to, delete it. if (retval && md5stat.st_mtime urls; bool generated_locally = false; file_number = nbytesdef = -1; md5str = urlstr = ""; out += "\n"; while (!xp.get(tag, sizeof(tag), is_tag)) { if (xp.parse_int(tag, "number", file_number)) { continue; } else if (xp.parse_bool(tag, "generated_locally", generated_locally)) { continue; } else if (xp.parse_string(tag, "url", urlstr)) { urls.push_back(urlstr); continue; } else if (xp.parse_string(tag, "md5_cksum", md5str)) { continue; } else if (xp.parse_double(tag, "nbytes", nbytesdef)) { continue; } else if (!strcmp(tag, "/file_info")) { if (nbytesdef != -1 || md5str != "" || urlstr != "") { if (nbytesdef == -1 || md5str == "" || urlstr == "") { fprintf(stderr, "All file properties must be defined " "if at least one is defined (url, md5_cksum, nbytes)!\n" ); return ERR_XML_PARSE; } } if (file_number < 0) { fprintf(stderr, "No file number found\n"); return ERR_XML_PARSE; } if (file_number >= ninfiles) { fprintf(stderr, "Too few input files given; need at least %d\n", file_number+1 ); return ERR_XML_PARSE; } nfiles_parsed++; if (generated_locally) { sprintf(buf, " %s\n" " \n" "\n", infiles[file_number] ); } else if (nbytesdef == -1) { // here if nybtes was not supplied; stage the file // dir_hier_path( infiles[file_number], config_loc.download_dir, config_loc.uldl_dir_fanout, path, true ); // if file isn't found in hierarchy, // look for it at top level and copy // if (!boinc_file_exists(path)) { sprintf(top_download_path, "%s/%s",config_loc.download_dir, infiles[file_number] ); boinc_copy(top_download_path, path); } if (!config_loc.cache_md5_info || !got_md5_info(path, md5, &nbytes)) { retval = md5_file(path, md5, nbytes); if (retval) { fprintf(stderr, "process_wu_template: md5_file %d\n", retval); return retval; } else if (config_loc.cache_md5_info) { write_md5_info(path, md5, nbytes); } } dir_hier_url( infiles[file_number], config_loc.download_url, config_loc.uldl_dir_fanout, url ); sprintf(buf, " %s\n" " %s\n" " %s\n" " %.0f\n" "\n", infiles[file_number], url, md5, nbytes ); } else { // here if nbytes etc. was supplied, // i.e the file is already staged, possibly remotely // urlstr = ""; for (unsigned int i=0; i\n"; } sprintf(buf, " %s\n" "%s" " %s\n" " %.0f\n" "\n", infiles[file_number], urlstr.c_str(), md5str.c_str(), nbytesdef ); } out += buf; break; } else { char buf2[1024]; retval = xp.element(tag, buf2, sizeof(buf2)); if (retval) return retval; out += buf2; out += "\n"; } } } else if (!strcmp(tag, "workunit")) { found = true; out += "\n"; if (command_line) { //fprintf(stderr, "appending command line: %s\n", command_line); out += "\n"; out += command_line; out += "\n\n"; } while (!xp.get(tag, sizeof(tag), is_tag)) { if (!strcmp(tag, "/workunit")) { if (additional_xml && strlen(additional_xml)) { out += additional_xml; out += "\n"; } out += "\n"; break; } else if (!strcmp(tag, "file_ref")) { out += "\n"; bool found_file_number = false, found_open_name = false; while (!xp.get(tag, sizeof(tag), is_tag)) { if (xp.parse_int(tag, "file_number", file_number)) { sprintf(buf, " %s\n", infiles[file_number] ); out += buf; found_file_number = true; continue; } else if (xp.parse_str(tag, "open_name", open_name, sizeof(open_name))) { sprintf(buf, " %s\n", open_name); out += buf; found_open_name = true; continue; } else if (!strcmp(tag, "/file_ref")) { if (!found_file_number) { fprintf(stderr, "No file number found\n"); return ERR_XML_PARSE; } if (!found_open_name) { fprintf(stderr, "No open name found\n"); return ERR_XML_PARSE; } out += "\n"; break; } else if (xp.parse_string(tag, "file_name", tmpstr)) { fprintf(stderr, " ignored in element.\n"); continue; } else { char buf2[1024]; retval = xp.element(tag, buf2, sizeof(buf2)); if (retval) return retval; out += buf2; out += "\n"; } } } else if (xp.parse_string(tag, "command_line", cmdline)) { if (command_line) { fprintf(stderr, "Can't specify command line twice"); return ERR_XML_PARSE; } out += "\n"; out += cmdline; out += "\n\n"; } else if (xp.parse_double(tag, "rsc_fpops_est", dtemp)) { if (!wu.rsc_fpops_est) { wu.rsc_fpops_est = dtemp; } continue; } else if (xp.parse_double(tag, "rsc_fpops_bound", dtemp)) { if (!wu.rsc_fpops_bound) { wu.rsc_fpops_bound = dtemp; } continue; } else if (xp.parse_double(tag, "rsc_memory_bound", dtemp)) { if (!wu.rsc_memory_bound) { wu.rsc_memory_bound = dtemp; } continue; } else if (xp.parse_double(tag, "rsc_bandwidth_bound", dtemp)) { if (!wu.rsc_bandwidth_bound) { wu.rsc_bandwidth_bound = dtemp; } continue; } else if (xp.parse_double(tag, "rsc_disk_bound", dtemp)) { if (!wu.rsc_disk_bound) { wu.rsc_disk_bound = dtemp; } continue; } else if (xp.parse_int(tag, "batch", wu.batch)) { continue; } else if (xp.parse_int(tag, "delay_bound", itemp)) { if (!wu.delay_bound) { wu.delay_bound = itemp; } continue; } else if (xp.parse_int(tag, "min_quorum", wu.min_quorum)) { continue; } else if (xp.parse_int(tag, "target_nresults", wu.target_nresults)) { continue; } else if (xp.parse_int(tag, "max_error_results", wu.max_error_results)) { continue; } else if (xp.parse_int(tag, "max_total_results", wu.max_total_results)) { continue; } else if (xp.parse_int(tag, "max_success_results", wu.max_success_results)) { continue; } else { char buf2[1024]; retval = xp.element(tag, buf2, sizeof(buf2)); if (retval) return retval; out += buf2; out += "\n"; } } } } if (!found) { fprintf(stderr, "process_wu_template: bad WU template - no \n"); return -1; } if (nfiles_parsed != ninfiles) { fprintf(stderr, "process_wu_template: %d input files listed, but template has %d\n", ninfiles, nfiles_parsed ); return -1; } if (out.size() > sizeof(wu.xml_doc)-1) { fprintf(stderr, "create_work: WU XML field is too long (%d bytes; max is %d)\n", (int)out.size(), (int)sizeof(wu.xml_doc)-1 ); return ERR_BUFFER_OVERFLOW; } //fprintf(stderr, "copying to xml_doc: %s\n", out.c_str()); strcpy(wu.xml_doc, out.c_str()); return 0; } // initialize an about-to-be-created result, given its WU // static void initialize_result(DB_RESULT& result, WORKUNIT& wu) { result.id = 0; result.create_time = time(0); result.workunitid = wu.id; result.server_state = RESULT_SERVER_STATE_UNSENT; result.hostid = 0; result.report_deadline = 0; result.sent_time = 0; result.received_time = 0; result.client_state = 0; result.cpu_time = 0; strcpy(result.xml_doc_out, ""); strcpy(result.stderr_out, ""); result.outcome = RESULT_OUTCOME_INIT; result.file_delete_state = ASSIMILATE_INIT; result.validate_state = VALIDATE_STATE_INIT; result.claimed_credit = 0; result.granted_credit = 0; result.appid = wu.appid; result.priority = wu.priority; result.batch = wu.batch; } int create_result_ti( TRANSITIONER_ITEM& ti, char* result_template_filename, char* result_name_suffix, R_RSA_PRIVATE_KEY& key, SCHED_CONFIG& config_loc, char* query_string, // if nonzero, write value list here; else do insert int priority_increase ) { WORKUNIT wu; // copy relevant fields from TRANSITIONER_ITEM to WORKUNIT // strcpy(wu.name, ti.name); wu.id = ti.id; wu.appid = ti.appid; wu.priority = ti.priority; wu.batch = ti.batch; return create_result( wu, result_template_filename, result_name_suffix, key, config_loc, query_string, priority_increase ); } // Create a new result for the given WU. // This is called from: // - the transitioner // - the scheduler (for assigned jobs) // int create_result( WORKUNIT& wu, char* result_template_filename, char* result_name_suffix, R_RSA_PRIVATE_KEY& key, SCHED_CONFIG& config_loc, char* query_string, // if nonzero, write value list here; else do insert int priority_increase ) { DB_RESULT result; char base_outfile_name[256]; char result_template[BLOB_SIZE]; int retval; result.clear(); initialize_result(result, wu); result.priority += priority_increase; sprintf(result.name, "%s_%s", wu.name, result_name_suffix); sprintf(base_outfile_name, "%s_", result.name); retval = read_filename( result_template_filename, result_template, sizeof(result_template) ); if (retval) { fprintf(stderr, "Failed to read result template file '%s': %d\n", result_template_filename, retval ); return retval; } retval = process_result_template( result_template, key, base_outfile_name, config_loc ); if (retval) { fprintf(stderr, "process_result_template() error: %d\n", retval); } if (strlen(result_template) > sizeof(result.xml_doc_in)-1) { fprintf(stderr, "result XML doc is too long: %d bytes, max is %d\n", (int)strlen(result_template), (int)sizeof(result.xml_doc_in)-1 ); return ERR_BUFFER_OVERFLOW; } strlcpy(result.xml_doc_in, result_template, sizeof(result.xml_doc_in)); result.random = lrand48(); if (query_string) { result.db_print_values(query_string); } else { retval = result.insert(); if (retval) { fprintf(stderr, "result.insert(): %d\n", retval); return retval; } } return 0; } // make sure a WU's input files are actually there // int check_files(char** infiles, int ninfiles, SCHED_CONFIG& config_loc) { int i; char path[256]; for (i=0; i sizeof(wu.result_template_file)-1) { fprintf(stderr, "result template filename is too big: %d bytes, max is %d\n", (int)strlen(result_template_filename), (int)sizeof(wu.result_template_file)-1 ); return ERR_BUFFER_OVERFLOW; } strlcpy(wu.result_template_file, result_template_filename, sizeof(wu.result_template_file)); if (wu.rsc_fpops_est == 0) { fprintf(stderr, "no rsc_fpops_est given; can't create job\n"); return ERR_NO_OPTION; } if (wu.rsc_fpops_bound == 0) { fprintf(stderr, "no rsc_fpops_bound given; can't create job\n"); return ERR_NO_OPTION; } if (wu.rsc_disk_bound == 0) { fprintf(stderr, "no rsc_disk_bound given; can't create job\n"); return ERR_NO_OPTION; } if (wu.target_nresults == 0) { fprintf(stderr, "no target_nresults given; can't create job\n"); return ERR_NO_OPTION; } if (wu.max_error_results == 0) { fprintf(stderr, "no max_error_results given; can't create job\n"); return ERR_NO_OPTION; } if (wu.max_total_results == 0) { fprintf(stderr, "no max_total_results given; can't create job\n"); return ERR_NO_OPTION; } if (wu.max_success_results == 0) { fprintf(stderr, "no max_success_results given; can't create job\n"); return ERR_NO_OPTION; } if (wu.max_success_results > wu.max_total_results) { fprintf(stderr, "max_success_results > max_total_results; can't create job\n"); return ERR_INVALID_PARAM; } if (wu.max_error_results > wu.max_total_results) { fprintf(stderr, "max_error_results > max_total_results; can't create job\n"); return ERR_INVALID_PARAM; } if (wu.target_nresults > wu.max_success_results) { fprintf(stderr, "target_nresults > max_success_results; can't create job\n"); return ERR_INVALID_PARAM; } if (strstr(wu.name, ASSIGNED_WU_STR)) { wu.transition_time = INT_MAX; } else { wu.transition_time = time(0); } if (wu.id) { retval = wu.update(); if (retval) { fprintf(stderr, "create_work: workunit.update() %d\n", retval); return retval; } } else { retval = wu.insert(); if (retval) { fprintf(stderr, "create_work: workunit.insert() %d\n", retval); return retval; } wu.id = boinc_db.insert_id(); } return 0; } // STUFF RELATED TO FILE UPLOAD/DOWNLOAD int get_file( int host_id, const char* file_name, vector urls, double max_nbytes, double report_deadline, bool generate_upload_certificate, R_RSA_PRIVATE_KEY& key ) {; char buf[8192]; DB_MSG_TO_HOST mth; int retval; mth.clear(); mth.create_time = time(0); mth.hostid = host_id; strcpy(mth.variety, "file_xfer"); mth.handled = false; sprintf(mth.xml, "\n" " file_xfer\n" "\n" "\n" " file_xfer\n" " 0\n" "\n" "\n" " %s\n" " %.0f\n", max_nbytes, file_name ); for (unsigned int i=0; i%s\n", urls[i]); strcat(mth.xml, buf); } sprintf(buf, "\n" "\n" " upload_%s\n" " file_xfer\n" "\n" "\n" " upload_%s\n" " upload_%s\n" " \n" " %s\n" " \n" " %f\n" "\n", file_name, file_name, file_name, file_name, report_deadline ); strcat(mth.xml, buf); if (generate_upload_certificate) { add_signatures(mth.xml, key); } retval = mth.insert(); if (retval) { fprintf(stderr, "msg_to_host.insert(): %s\n", boincerror(retval)); return retval; } return 0; } int put_file( int host_id, const char* file_name, vector urls, const char* md5, double nbytes, double report_deadline ) { char buf[8192]; DB_MSG_TO_HOST mth; int retval; mth.clear(); mth.create_time = time(0); mth.hostid = host_id; strcpy(mth.variety, "file_xfer"); mth.handled = false; sprintf(mth.xml, "\n" " file_xfer\n" "\n" "\n" " file_xfer\n" " 0\n" "\n" "\n" " %s\n", file_name ); for (unsigned int i=0; i%s\n", urls[i]); strcat(mth.xml, buf); } sprintf(buf, " %s\n" " %.0f\n" " \n" "\n" "\n" " download_%s\n" " file_xfer\n" " \n" " %s\n" " \n" "\n" "\n" " download_%s\n" " download_%s\n" " %f\n" "\n", md5, nbytes, file_name, file_name, file_name, file_name, report_deadline ); strcat(mth.xml, buf); retval = mth.insert(); if (retval) { fprintf(stderr, "msg_to_host.insert(): %s\n", boincerror(retval)); return retval; } return 0; } const char *BOINC_RCSID_b5f8b10eb5 = "$Id$";