// This file is part of BOINC.
// http://boinc.berkeley.edu
// Copyright (C) 2012 University of California
//
// BOINC is free software; you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License
// as published by the Free Software Foundation,
// either version 3 of the License, or (at your option) any later version.
//
// BOINC is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with BOINC.  If not, see http://www.gnu.org/licenses/.

// C++ interfaces to web RPCs related to remote job submission,
// namely those described here:
// http://boinc.berkeley.edu/trac/wiki/RemoteInputFiles
// http://boinc.berkeley.edu/trac/wiki/RemoteOutputFiles
// http://boinc.berkeley.edu/trac/wiki/RemoteJobs

// NOTE(review): this copy of the file looks like it went through a filter
// that removed every run of text enclosed in angle brackets.
// Visible symptoms: the five bare include directives below,
// vector declarations with no element type,
// request and reply string literals whose XML tag names are missing,
// and several for-loop headers that break off right after the loop index.
// Each affected spot is marked with NOTE(review) and left exactly as found;
// restore the missing text from a pristine copy before building.
//
// Pattern shared by the RPC functions in this file:
// build a request as text, POST it with do_http_post() to a PHP handler
// whose URL is project_url followed by the handler file name,
// collect the reply in a file obtained from tmpfile(),
// rewind that file and scan the reply.
// NOTE(review): none of the callers check the tmpfile() result before use,
// and all request and URL text is written with sprintf
// into fixed-size buffers with no length check.

// NOTE(review): header names missing on the next five directives
// (presumably system headers; verify against the upstream file).
#include
#include
#include
#include
#include
#include "parse.h"
#include "str_util.h"
#include "url.h"
#include "remote_submit.h"

using std::vector;
using std::string;

//#define SHOW_REPLY

// do an HTTP GET request.
//
// url: the URL to fetch.
// dst_path: path of the file the response body is written to;
//   it is opened in write mode before the transfer starts.
// Returns -1 if dst_path cannot be opened or the curl handle
// cannot be created, and 0 otherwise.
// NOTE(review): a failed transfer is only reported on stderr;
// the function still returns 0 in that case.
//
static int do_http_get(
    const char* url,
    const char* dst_path
) {
    FILE* reply = fopen(dst_path, "w");
    if (!reply) return -1;
    CURL *curl = curl_easy_init();
    if (!curl) {
        fclose(reply);
        return -1;
    }
    curl_easy_setopt(curl, CURLOPT_URL, url);
    curl_easy_setopt(curl, CURLOPT_USERAGENT, "BOINC remote job submission");
    curl_easy_setopt(curl, CURLOPT_WRITEDATA, reply);
    CURLcode res = curl_easy_perform(curl);
    if (res != CURLE_OK) {
        fprintf(stderr, "CURL error: %s\n", curl_easy_strerror(res));
    }
    curl_easy_cleanup(curl);
    fclose(reply);
    return 0;
}

// send an HTTP POST request,
// with an optional set of multi-part file attachments
//
// url: the URL to post to.
// request: text sent as the form field named request.
// reply: stream passed by the callers to receive the response
//   (the code that wires it into curl is not visible in this copy).
// send_files: the attachments; element type missing in this copy.
// Returns -1 if the curl handle cannot be created.
// NOTE(review): everything after the start of the attachment loop
// is missing from this copy, including the rest of the return paths.
//
static int do_http_post(
    const char* url,
    const char* request,
    FILE* reply,
    vector send_files
) {
    CURL *curl;
    CURLcode res;
    char buf[256];
    curl = curl_easy_init();
    if (!curl) {
        return -1;
    }
    struct curl_httppost *formpost=NULL;
    struct curl_httppost *lastptr=NULL;
    struct curl_slist *headerlist=NULL;
    curl_formadd(&formpost, &lastptr,
        CURLFORM_COPYNAME, "request",
        CURLFORM_COPYCONTENTS, request,
        CURLFORM_END
    );
    for (unsigned int i=0; i
// NOTE(review): text lost here.
// The remainder of do_http_post and the beginning of the next function
// are missing.  Judging by the SHOW_REPLY message further down,
// what follows is the tail of the parameter list of query_files;
// its body uses project_url and authenticator,
// so those are presumably among the lost parameters.
//
// query_files (partially visible):
// posts a request built from the authenticator, the batch ID
// (only if nonzero) and one entry per element of md5s
// to job_file.php under project_url.
// absent_files: receives every int parsed from the matching reply lines.
// error_msg: cleared, then set if the reply carries a message string.
// Returns the do_http_post() error if nonzero.
// Otherwise the result starts at -1, becomes 0 when a reply line
// contains absent_files, and is replaced by any int
// parsed by the first parse_int call.
//
    &md5s,
    int batch_id,
    vector &absent_files,
    string& error_msg
) {
    string req_msg;
    char buf[256];
    // NOTE(review): tag text missing from the literals below
    req_msg = "\n";
    sprintf(buf, "%s\n", authenticator);
    req_msg += string(buf);
    if (batch_id) {
        sprintf(buf, "%d\n", batch_id);
        req_msg += string(buf);
    }
    // NOTE(review): loop header and start of the sprintf call lost
    for (unsigned int i=0; i%s\n", md5s[i].c_str());
        req_msg += string(buf);
    }
    req_msg += "\n";
    FILE* reply = tmpfile();
    char url[256];
    sprintf(url, "%sjob_file.php", project_url);
    // empty attachment list: this RPC uploads no files
    vector xx;
    int retval = do_http_post(url, req_msg.c_str(), reply, xx);
    if (retval) {
        fclose(reply);
        return retval;
    }
    // rewind so the reply can be read back from the start
    fseek(reply, 0, SEEK_SET);
    int x;
    retval = -1;
    error_msg = "";
    while (fgets(buf, 256, reply)) {
#ifdef SHOW_REPLY
        printf("query_files reply: %s", buf);
#endif
        if (strstr(buf, "absent_files")) {
            retval = 0;
            continue;
        }
        // NOTE(review): tag names missing from the three literals below
        if (parse_int(buf, "", retval)) continue;
        if (parse_str(buf, "", error_msg)) continue;
        if (parse_int(buf, "", x)) {
            absent_files.push_back(x);
            continue;
        }
    }
    fclose(reply);
    return retval;
}

// Upload a set of files to the project.
//
// paths: passed to do_http_post() as the attachment list.
// md5s: one request entry is written per element.
// batch_id: included in the request only if nonzero.
// error_msg: cleared, then set if the reply carries a message string.
// The request goes to job_file.php under project_url.
// Returns the do_http_post() error if nonzero.
// Otherwise the result starts at -1, becomes 0 when a reply line
// contains success, and is replaced by any int parsed from the reply.
//
int upload_files (
    const char* project_url,
    const char* authenticator,
    vector &paths,
    vector &md5s,
    int batch_id,
    string &error_msg
) {
    char buf[1024];
    // NOTE(review): tag text missing from the literals below
    string req_msg = "\n";
    sprintf(buf, "%s\n", authenticator);
    req_msg += string(buf);
    if (batch_id) {
        sprintf(buf, "%d\n", batch_id);
        req_msg += string(buf);
    }
    // NOTE(review): loop header and start of the sprintf call lost
    for (unsigned int i=0; i%s\n", md5s[i].c_str());
        req_msg += string(buf);
    }
    req_msg += "\n";
    FILE* reply = tmpfile();
    char url[256];
    sprintf(url, "%sjob_file.php", project_url);
    int retval = do_http_post(url, req_msg.c_str(), reply, paths);
    if (retval) {
        fclose(reply);
        return retval;
    }
    fseek(reply, 0, SEEK_SET);
    retval = -1;
    error_msg = "";
    // buf holds 1024 bytes but at most 256 are read per line
    while (fgets(buf, 256, reply)) {
#ifdef SHOW_REPLY
        printf("upload_files reply: %s", buf);
#endif
        if (strstr(buf, "success")) {
            retval = 0;
            continue;
        }
        // NOTE(review): tag names missing from the two literals below
        if (parse_int(buf, "", retval)) continue;
        if (parse_str(buf, "", error_msg)) continue;
    }
    fclose(reply);
    return retval;
}

// Create a batch on the server.
//
// batch_name, app_name, expire_time: written into the request.
// batch_id: set to 0, then to the int parsed from the reply, if any.
// error_msg: cleared, then set if the reply carries a message string.
// The request goes to submit_rpc_handler.php under project_url.
// Returns the do_http_post() error if nonzero;
// otherwise the error number parsed from the reply, or 0 if none was found.
//
int create_batch(
    const char* project_url,
    const char* authenticator,
    const char* batch_name,
    const char* app_name,
    double expire_time,
    int& batch_id,
    string& error_msg
) {
    char request[1024];
    char url[1024];
    // NOTE(review): tag text missing from the format string
    sprintf(request,
        "\n"
        " %s\n"
        " \n"
        " %s\n"
        " %s\n"
        " %f\n"
        " \n"
        "\n",
        authenticator,
        batch_name,
        app_name,
        expire_time
    );
    sprintf(url, "%ssubmit_rpc_handler.php", project_url);
    FILE* reply = tmpfile();
    vector x;
    int retval = do_http_post(url, request, reply, x);
    if (retval) {
        fclose(reply);
        return retval;
    }
    char buf[256];
    batch_id = 0;
    fseek(reply, 0, SEEK_SET);
    int error_num = 0;
    error_msg = "";
    while (fgets(buf, 256, reply)) {
#ifdef SHOW_REPLY
        printf("create_batch reply: %s", buf);
#endif
        // NOTE(review): tag names missing from the three literals below
        if (parse_int(buf, "", batch_id)) continue;
        if (parse_int(buf, "", error_num)) continue;
        if (parse_str(buf, "", error_msg)) continue;
    }
    fclose(reply);
    return error_num;
}

// Ask the server for an estimate related to a set of jobs.
//
// app_name: written into the request header.
// jobs: the jobs to describe; the per-job request code is
//   mostly missing from this copy.
// est_makespan: receives the double parsed from the reply.
// error_msg: cleared, then set if the reply carries a message string.
// Returns the do_http_post() error if nonzero.
// Otherwise the result starts at -1, becomes 0 when the estimate
// is parsed, and is replaced by any int parsed from the reply.
//
int estimate_batch(
    const char* project_url,
    const char* authenticator,
    char app_name[256],
    vector jobs,
    double& est_makespan,
    string& error_msg
) {
    char buf[1024], url[1024];
    // NOTE(review): tag text missing from the format string
    sprintf(buf,
        "\n"
        "%s\n"
        "\n"
        " %s\n",
        authenticator,
        app_name
    );
    string request = buf;
    // NOTE(review): most of the per-job loop is lost;
    // only the closing fragments below survive
    for (unsigned int i=0; i\n";
        }
        request += "\n";
    }
    request += "\n\n";
    sprintf(url, "%ssubmit_rpc_handler.php", project_url);
    FILE* reply = tmpfile();
    vector x;
    int retval = do_http_post(url, request.c_str(), reply, x);
    if (retval) {
        fclose(reply);
        return retval;
    }
    fseek(reply, 0, SEEK_SET);
    retval = -1;
    error_msg = "";
    while (fgets(buf, 256, reply)) {
#ifdef SHOW_REPLY
        printf("submit_batch reply: %s", buf);
#endif
        // NOTE(review): tag names missing from the three literals below
        if (parse_double(buf, "", est_makespan)) {
            retval = 0;
            continue;
        }
        if (parse_int(buf, "", retval)) continue;
        if (parse_str(buf, "", error_msg)) continue;
    }
    fclose(reply);
    return retval;
}

// Submit a set of jobs as part of an existing batch.
//
// batch_id, app_name: written into the request header.
// jobs: for each job the request carries its job_name,
//   its cmdline_args when not empty,
//   and one entry per input file containing the literal local_staged
//   and the physical_name of the file.
// error_msg: cleared, then set if the reply carries a message string.
// Returns the do_http_post() error if nonzero.
// Otherwise the result starts at -1, becomes 0 when the first
// parse_int call matches, and is replaced by any int parsed by the second.
//
int submit_jobs(
    const char* project_url,
    const char* authenticator,
    char app_name[256],
    int batch_id,
    vector jobs,
    string& error_msg
) {
    char buf[1024], url[1024];
    // NOTE(review): tag text missing from the format string
    sprintf(buf,
        "\n"
        "%s\n"
        "\n"
        " %d\n"
        " %s\n",
        authenticator,
        batch_id,
        app_name
    );
    string request = buf;
    // NOTE(review): loop header, the declaration of job
    // and the start of the sprintf call lost
    for (unsigned int i=0; i%s\n", job.job_name);
        request += buf;
        if (!job.cmdline_args.empty()) {
            request += "" + job.cmdline_args + "\n";
        }
        // NOTE(review): inner loop header, the declaration of infile
        // and the start of the sprintf call lost
        for (unsigned int j=0; j\n"
                "local_staged\n"
                "%s\n"
                "\n",
                infile.physical_name
            );
            request += buf;
        }
        request += "\n";
    }
    request += "\n\n";
    sprintf(url, "%ssubmit_rpc_handler.php", project_url);
    FILE* reply = tmpfile();
    vector x;
    int retval = do_http_post(url, request.c_str(), reply, x);
    if (retval) {
        fclose(reply);
        return retval;
    }
    fseek(reply, 0, SEEK_SET);
    retval = -1;
    error_msg = "";
    // the parsed value is discarded; a match only signals success
    int temp;
    while (fgets(buf, 256, reply)) {
#ifdef SHOW_REPLY
        printf("submit_batch reply: %s", buf);
#endif
        // NOTE(review): tag names missing from the three literals below
        if (parse_int(buf, "", temp)) {
            retval = 0;
            continue;
        }
        if (parse_int(buf, "", retval)) continue;
        if (parse_str(buf, "", error_msg)) continue;
    }
    fclose(reply);
    return retval;
}

// Query the status of the jobs in a set of batches.
//
// min_mod_time: written into the request.
// batch_names: one request entry is written per element.
// qb_reply: server_time is reset to 0 and then parsed from the reply;
//   each parsed batch size is appended to batch_sizes;
//   each job entry in the reply is appended to jobs.
// error_msg: cleared, then set if the reply carries a message string.
// Returns the do_http_post() error if nonzero.
// Otherwise the result starts at -1, becomes 0 when a reply line
// contains jobs, and is replaced by any int
// parsed by the first parse_int call.
//
int query_batch_set(
    const char* project_url,
    const char* authenticator,
    double min_mod_time,
    vector &batch_names,
    QUERY_BATCH_SET_REPLY& qb_reply,
    string& error_msg
) {
    string request;
    char url[1024], buf[256];
    int batch_size;
    // NOTE(review): tag text missing from the literals below
    request = "\n";
    sprintf(buf, "%s\n", authenticator);
    request += string(buf);
    sprintf(buf, "%f\n", min_mod_time);
    request += string(buf);
    // NOTE(review): loop header and start of the sprintf call lost
    for (unsigned int i=0; i%s\n", batch_names[i].c_str());
        request += string(buf);
    }
    request += "\n";
    sprintf(url, "%ssubmit_rpc_handler.php", project_url);
    FILE* reply = tmpfile();
    vector x;
    int retval = do_http_post(url, request.c_str(), reply, x);
    if (retval) {
        fclose(reply);
        return retval;
    }
    fseek(reply, 0, SEEK_SET);
    retval = -1;
    qb_reply.server_time = 0;
    error_msg = "";
    while (fgets(buf, 256, reply)) {
#ifdef SHOW_REPLY
        printf("query_batches reply: %s", buf);
#endif
        if (strstr(buf, "jobs")) {
            retval = 0;
            continue;
        }
        // NOTE(review): tag names missing from the four literals below
        if (parse_int(buf, "", retval)) continue;
        if (parse_str(buf, "", error_msg)) continue;
        if (parse_double(buf, "", qb_reply.server_time)) continue;
        if (parse_int(buf, "", batch_size)) {
            qb_reply.batch_sizes.push_back(batch_size);
            continue;
        }
        // NOTE(review): the search text of this strstr call and of the
        // one in the inner loop is missing; presumably they matched the
        // start and the end of one job entry
        if (strstr(buf, "")) {
            JOB_STATUS js;
            // read the lines of one job entry until its end marker
            while (fgets(buf, 256, reply)) {
#ifdef SHOW_REPLY
                printf("query_batches reply: %s", buf);
#endif
                if (strstr(buf, "")) {
                    qb_reply.jobs.push_back(js);
                    break;
                }
                if (parse_str(buf, "job_name", js.job_name)) continue;
                if (parse_str(buf, "status", js.status)) continue;
            }
            continue;
        }
    }
    fclose(reply);
    return retval;
}

// Parse one batch description from the XML parser.
// All members are zeroed first, then filled from the tags listed below.
// Returns 0 when the closing batch tag is reached,
// ERR_XML_PARSE if the input ends before that.
//
int BATCH_STATUS::parse(XML_PARSER& xp) {
    memset(this, 0, sizeof(BATCH_STATUS));
    while (!xp.get_tag()) {
        if (xp.match_tag("/batch")) {
            return 0;
        }
        if (xp.parse_int("id", id)) continue;
        if (xp.parse_str("name", name, sizeof(name))) continue;
        if (xp.parse_int("state", state)) continue;
        if (xp.parse_int("njobs", njobs)) continue;
        if (xp.parse_int("nerror_jobs", nerror_jobs)) continue;
        if (xp.parse_double("fraction_done", fraction_done)) continue;
        if (xp.parse_double("create_time", create_time)) continue;
        if (xp.parse_double("expire_time", expire_time)) continue;
        if (xp.parse_double("est_completion_time", est_completion_time)) continue;
        if (xp.parse_double("completion_time", completion_time)) continue;
        if (xp.parse_double("credit_estimate", credit_estimate)) continue;
        if (xp.parse_double("credit_canonical", credit_canonical)) continue;
    }
    return ERR_XML_PARSE;
}

// Print the batch description to stdout.
// The state goes through batch_state_string() and the four time
// fields through time_to_string(); both are defined outside this file.
//
void BATCH_STATUS::print() {
    printf("Batch %d (%s)\n"
        " state: %s\n"
        " njobs: %d\n"
        " nerror_jobs: %d\n"
        " fraction_done: %f\n",
        id, name,
        batch_state_string(state),
        njobs, nerror_jobs, fraction_done
    );
    printf(
        " create_time: %s\n",
        time_to_string(create_time)
    );
    printf(
        " expire_time: %s\n",
        time_to_string(expire_time)
    );
    printf(
        " est_completion_time: %s\n",
        time_to_string(est_completion_time)
    );
    printf(
        " completion_time: %s\n",
        time_to_string(completion_time)
    );
    printf(
        " credit_estimate: %f\n"
        " credit_canonical: %f\n",
        credit_estimate,
        credit_canonical
    );
}

// Get the list of batches visible to this authenticator.
//
// batches: every batch entry that parses successfully is appended.
// error_msg: preset to a generic parse-failure message;
//   cleared when the closing batches tag is reached,
//   or replaced by the error_msg element of the reply.
// Returns the do_http_post() error if nonzero.
// Otherwise the result starts at -1, becomes 0 when the closing
// batches tag is reached, and is replaced by any error_num in the reply.
//
int query_batches(
    const char* project_url,
    const char* authenticator,
    vector& batches,
    string &error_msg
) {
    string request;
    char url[1024], buf[256];
    // NOTE(review): tag text missing from the literals below
    request = "\n";
    sprintf(buf, "%s\n", authenticator);
    request += string(buf);
    request += "\n";
    sprintf(url, "%ssubmit_rpc_handler.php", project_url);
    FILE* reply = tmpfile();
    vector x;
    int retval = do_http_post(url, request.c_str(), reply, x);
    if (retval) {
        fclose(reply);
        return retval;
    }
    fseek(reply, 0, SEEK_SET);
    retval = -1;
    error_msg = "failed to parse Web RPC reply";
    MIOFILE mf;
    XML_PARSER xp(&mf);
    mf.init_file(reply);
    while (!xp.get_tag()) {
        if (xp.match_tag("/batches")) {
            retval = 0;
            error_msg = "";
            break;
        }
        if (xp.match_tag("batch")) {
            BATCH_STATUS bs;
            // entries that fail to parse are silently skipped
            if (!bs.parse(xp)) {
                batches.push_back(bs);
            }
            continue;
        }
        if (xp.parse_string("error_msg", error_msg)) continue;
        if (xp.parse_int("error_num", retval)) continue;
    }
    fclose(reply);
    return retval;
}

// Parse one job description from the XML parser.
// All members are zeroed first, then filled from the tags listed below.
// Returns 0 when the closing job tag is reached,
// ERR_XML_PARSE if the input ends before that.
//
int JOB_STATE::parse(XML_PARSER& xp) {
    memset(this, 0, sizeof(JOB_STATE));
    while (!xp.get_tag()) {
        if (xp.match_tag("/job")) {
            return 0;
        }
        if (xp.parse_int("id", id)) continue;
        if (xp.parse_str("name", name, sizeof(name))) continue;
        if (xp.parse_int("canonical_instance_id", canonical_instance_id)) continue;
        if (xp.parse_int("n_outfiles", n_outfiles)) continue;
    }
    return ERR_XML_PARSE;
}

// Print the job description to stdout.
//
void JOB_STATE::print() {
    printf(
        "job %d (%s)\n"
        " canonical_instance_id %d\n"
        " n_outfiles %d\n",
        id,
        name,
        canonical_instance_id,
        n_outfiles
    );
}

// Get the jobs of one batch.
//
// batch_id: used to identify the batch if nonzero.
// batch_name: used instead when batch_id is zero.
// jobs: every job entry that parses successfully is appended.
// error_msg: cleared; this function never sets it to anything else.
// Returns the do_http_post() error if nonzero;
// otherwise 0 if the closing jobs tag is reached and -1 if not.
//
int query_batch(
    const char* project_url,
    const char* authenticator,
    int batch_id,
    const char* batch_name,
    vector& jobs,
    string &error_msg
) {
    string request;
    char url[1024], buf[256];
    // NOTE(review): tag text missing from the literals below
    request = "\n";
    sprintf(buf, "%s\n", authenticator);
    request += string(buf);
    if (batch_id) {
        sprintf(buf, "%d\n", batch_id);
    } else {
        sprintf(buf, "%s\n", batch_name);
    }
    request += string(buf);
    request += "\n";
    sprintf(url, "%ssubmit_rpc_handler.php", project_url);
    FILE* reply = tmpfile();
    vector x;
    int retval = do_http_post(url, request.c_str(), reply, x);
    if (retval) {
        fclose(reply);
        return retval;
    }
    fseek(reply, 0, SEEK_SET);
    retval = -1;
    error_msg = "";
    MIOFILE mf;
    XML_PARSER xp(&mf);
    mf.init_file(reply);
    while (!xp.get_tag()) {
        if (xp.match_tag("/jobs")) {
            retval = 0;
            break;
        }
        if (xp.match_tag("job")) {
            JOB_STATE js;
            // entries that fail to parse are silently skipped
            if (!js.parse(xp)) {
                jobs.push_back(js);
            }
            continue;
        }
    }
    fclose(reply);
    return retval;
}

// Abort a set of jobs.
//
// job_names: one request entry is written per element.
// error_msg: cleared, then set if the reply carries a message string.
// Returns the do_http_post() error if nonzero.
// Otherwise the result starts at -1, becomes 0 when a reply line
// contains success, and is replaced by any int parsed from the reply.
//
int abort_jobs(
    const char* project_url,
    const char* authenticator,
    vector &job_names,
    string &error_msg
) {
    string request;
    char url[1024], buf[256];
    // NOTE(review): tag text missing from the literals below
    request = "\n";
    sprintf(buf, "%s\n", authenticator);
    request += string(buf);
    // NOTE(review): loop header and start of the sprintf call lost
    for (unsigned int i=0; i%s\n", job_names[i].c_str());
        request += string(buf);
    }
    request += "\n";
    sprintf(url, "%ssubmit_rpc_handler.php", project_url);
    FILE* reply = tmpfile();
    vector x;
    int retval = do_http_post(url, request.c_str(), reply, x);
    if (retval) {
        fclose(reply);
        return retval;
    }
    fseek(reply, 0, SEEK_SET);
    retval = -1;
    error_msg = "";
    while (fgets(buf, 256, reply)) {
#ifdef SHOW_REPLY
        printf("abort_jobs reply: %s", buf);
#endif
        if (strstr(buf, "success")) {
            retval = 0;
            continue;
        }
        // NOTE(review): tag names missing from the two literals below
        if (parse_int(buf, "", retval)) continue;
        if (parse_str(buf, "", error_msg)) continue;
    }
    fclose(reply);
    return retval;
}

// Get the input and output templates of an app or of a job.
//
// app_name: written into the request if not NULL.
// job_name: written into the request when app_name is NULL.
// td: filled by TEMPLATE_DESC::parse() from the reply.
// error_msg: cleared, then set if the reply carries a message string.
// Returns the do_http_post() error if nonzero.
// Otherwise the result starts at -1 and is replaced by any int parsed
// from the reply and by the return value of td.parse().
//
int get_templates(
    const char* project_url,
    const char* authenticator,
    const char* app_name,
    const char* job_name,
    TEMPLATE_DESC &td,
    string &error_msg
) {
    string request;
    char url[1024], buf[256];
    // NOTE(review): tag text missing from the literals below
    request = "\n";
    sprintf(buf, "%s\n", authenticator);
    request += string(buf);
    if (app_name) {
        sprintf(buf, "%s\n", app_name);
        request += string(buf);
    } else {
        sprintf(buf, "%s\n", job_name);
        request += string(buf);
    }
    request += "\n";
    sprintf(url, "%ssubmit_rpc_handler.php", project_url);
    FILE* reply = tmpfile();
    vector x;
    int retval = do_http_post(url, request.c_str(), reply, x);
    if (retval) {
        fclose(reply);
        return retval;
    }
    retval = -1;
    error_msg = "";
    fseek(reply, 0, SEEK_SET);
    while (fgets(buf, 256, reply)) {
#ifdef SHOW_REPLY
        printf("get_templates reply: %s", buf);
#endif
        // NOTE(review): tag names missing from the two literals below
        if (parse_int(buf, "", retval)) continue;
        if (parse_str(buf, "", error_msg)) continue;
        // NOTE(review): search text missing; presumably the opening tag
        // of the template section
        if (strstr(buf, "")) {
            // the XML parser is given the same stream that fgets
            // has been reading
            MIOFILE mf;
            XML_PARSER xp(&mf);
            mf.init_file(reply);
            retval = td.parse(xp);
        }
    }
    fclose(reply);
    return retval;
}

// Collect the open_name entries of the input and output templates.
// Names found inside input_template are appended to input_files,
// names found inside output_template to output_files.
// Reads until the parser reports no more tags; always returns 0.
//
int TEMPLATE_DESC::parse(XML_PARSER& xp) {
    string s;
    while (!xp.get_tag()) {
        if (xp.match_tag("input_template")) {
            while (!xp.get_tag()) {
                if (xp.match_tag("/input_template")) break;
                if (xp.parse_string("open_name", s)) {
                    input_files.push_back(s);
                }
            }
        }
        if (xp.match_tag("output_template")) {
            while (!xp.get_tag()) {
                if (xp.match_tag("/output_template")) break;
                if (xp.parse_string("open_name", s)) {
                    output_files.push_back(s);
                }
            }
        }
    }
    return 0;
}

// Parse the description of a completed job.
// The numeric members are reset to 0 first; stderr_out is not reset.
// stderr_out is passed through xml_unescape() after parsing.
// Returns 0 when the closing completed_job tag is reached,
// ERR_XML_PARSE if the input ends before that.
//
int COMPLETED_JOB_DESC::parse(XML_PARSER& xp) {
    canonical_resultid = 0;
    error_mask = 0;
    error_resultid = 0;
    exit_status = 0;
    elapsed_time = 0;
    cpu_time = 0;
    while (!xp.get_tag()) {
        if (xp.match_tag("/completed_job")) return 0;
        if (xp.parse_int("canonical_resultid", canonical_resultid)) continue;
        if (xp.parse_int("error_mask", error_mask)) continue;
        if (xp.parse_int("error_resultid", error_resultid)) continue;
        if (xp.parse_int("exit_status", exit_status)) continue;
        if (xp.parse_double("elapsed_time", elapsed_time)) continue;
        if (xp.parse_double("cpu_time", cpu_time)) continue;
        if (xp.parse_string("stderr_out", stderr_out)) {
            xml_unescape(stderr_out);
            continue;
        }
    }
    return ERR_XML_PARSE;
}

// Download one output file of a job with an HTTP GET.
//
// job_name: URL-escaped before it is placed in the query string.
// file_num: passed as the file_num query argument.
// dst_path: where the downloaded file is written.
// error_msg: cleared on success; on failure it is set to a message
//   containing the URL and the error code.
// Returns the result of do_http_get().
// NOTE(review): authenticator is placed in the URL without escaping.
//
int get_output_file(
    const char* project_url,
    const char* authenticator,
    const char* job_name,
    int file_num,
    const char* dst_path,
    string &error_msg
) {
    char url[1024], job_name_esc[1024];
    escape_url(job_name, job_name_esc, sizeof(job_name_esc));
    sprintf(url, "%sget_output.php?cmd=workunit_file&auth_str=%s&wu_name=%s&file_num=%d",
        project_url, authenticator, job_name_esc, file_num
    );
    //printf("fetching %s to %s\n", url, dst_path);
    int retval = do_http_get(url, dst_path);
    error_msg = "";
    if (retval) {
        char buf[1024];
        sprintf(buf, "couldn't fetch %s: %d", url, retval);
        error_msg = string(buf);
    }
    return retval;
}

// Get the description of a completed job.
//
// job_name: written into the request.
// jd: filled by COMPLETED_JOB_DESC::parse() from the reply.
// error_msg: cleared, then set if the reply carries a message string.
// Returns the do_http_post() error if nonzero.
// Otherwise the result starts at -1 and is replaced by any int parsed
// from the reply and by the return value of jd.parse().
//
int query_completed_job(
    const char* project_url,
    const char* authenticator,
    const char* job_name,
    COMPLETED_JOB_DESC& jd,
    string &error_msg
) {
    string request;
    char url[1024], buf[256];
    // NOTE(review): tag text missing from the literals below
    request = "\n";
    sprintf(buf, "%s\n", authenticator);
    request += string(buf);
    sprintf(buf, "%s\n", job_name);
    request += string(buf);
    request += "\n";
    sprintf(url, "%ssubmit_rpc_handler.php", project_url);
    FILE* reply = tmpfile();
    vector x;
    int retval = do_http_post(url, request.c_str(), reply, x);
    if (retval) {
        fclose(reply);
        return retval;
    }
    retval = -1;
    error_msg = "";
    fseek(reply, 0, SEEK_SET);
    while (fgets(buf, 256, reply)) {
#ifdef SHOW_REPLY
        printf("query_completed_job reply: %s", buf);
#endif
        // NOTE(review): tag names missing from the two literals below
        if (parse_int(buf, "", retval)) continue;
        if (parse_str(buf, "", error_msg)) continue;
        // NOTE(review): search text missing; presumably the opening tag
        // of the job description
        if (strstr(buf, "")) {
            // the XML parser is given the same stream that fgets
            // has been reading
            MIOFILE mf;
            XML_PARSER xp(&mf);
            mf.init_file(reply);
            retval = jd.parse(xp);
        }
    }
    fclose(reply);
    return retval;
}

// Retire a batch.
//
// batch_name: written into the request.
// error_msg: cleared, then set if the reply carries a message string.
// Returns the do_http_post() error if nonzero.
// Otherwise the result starts at -1, is replaced by any int parsed
// from the reply, and becomes 0 when a reply line contains success.
//
int retire_batch(
    const char* project_url,
    const char* authenticator,
    const char* batch_name,
    string &error_msg
) {
    string request;
    char url[1024], buf[256];
    // NOTE(review): tag text missing from the literals below
    request = "\n";
    sprintf(buf, "%s\n", authenticator);
    request += string(buf);
    sprintf(buf, "%s\n", batch_name);
    request += string(buf);
    request += "\n";
    sprintf(url, "%ssubmit_rpc_handler.php", project_url);
    FILE* reply = tmpfile();
    vector x;
    int retval = do_http_post(url, request.c_str(), reply, x);
    if (retval) {
        fclose(reply);
        return retval;
    }
    retval = -1;
    error_msg = "";
    fseek(reply, 0, SEEK_SET);
    while (fgets(buf, 256, reply)) {
#ifdef SHOW_REPLY
        printf("retire_batch reply: %s", buf);
#endif
        // NOTE(review): tag names missing from the two literals below
        if (parse_int(buf, "", retval)) continue;
        if (parse_str(buf, "", error_msg)) continue;
        if (strstr(buf, "success")) {
            retval = 0;
            continue;
        }
    }
    fclose(reply);
    return retval;
}

// Change the expiration time of a batch.
//
// batch_name, expire_time: written into the request.
// error_msg: cleared, then set if the reply carries a message string.
// Returns the do_http_post() error if nonzero.
// Otherwise the result starts at -1, is replaced by any int parsed
// from the reply, and becomes 0 when a reply line contains success.
//
int set_expire_time(
    const char* project_url,
    const char* authenticator,
    const char* batch_name,
    double expire_time,
    string &error_msg
) {
    string request;
    char url[1024], buf[256];
    // NOTE(review): tag text missing from the literals below
    request = "\n";
    sprintf(buf, "%s\n", authenticator);
    request += string(buf);
    sprintf(buf, "%s\n", batch_name);
    request += string(buf);
    sprintf(buf, "%f\n", expire_time);
    request += string(buf);
    request += "\n";
    sprintf(url, "%ssubmit_rpc_handler.php", project_url);
    FILE* reply = tmpfile();
    vector x;
    int retval = do_http_post(url, request.c_str(), reply, x);
    if (retval) {
        fclose(reply);
        return retval;
    }
    retval = -1;
    error_msg = "";
    fseek(reply, 0, SEEK_SET);
    while (fgets(buf, 256, reply)) {
#ifdef SHOW_REPLY
        printf("set_expire_time reply: %s", buf);
#endif
        // NOTE(review): tag names missing from the two literals below
        if (parse_int(buf, "", retval)) continue;
        if (parse_str(buf, "", error_msg)) continue;
        if (strstr(buf, "success")) {
            retval = 0;
            continue;
        }
    }
    fclose(reply);
    return retval;
}

// Check that the RPC handler of a project responds.
// No authenticator is sent.
//
// error_msg: cleared, then set if the reply carries a message string.
// Returns the do_http_post() error if nonzero.
// Otherwise the result starts at -1, is replaced by any int parsed
// from the reply, and becomes 0 when a reply line contains success.
//
int ping_server(
    const char* project_url,
    string &error_msg
) {
    string request;
    char url[1024], buf[256];
    // NOTE(review): the tags around the space are missing from this literal
    request = " \n";   // the space is needed
    sprintf(url, "%ssubmit_rpc_handler.php", project_url);
    FILE* reply = tmpfile();
    vector x;
    int retval = do_http_post(url, request.c_str(), reply, x);
    if (retval) {
        fclose(reply);
        return retval;
    }
    retval = -1;
    error_msg = "";
    fseek(reply, 0, SEEK_SET);
    while (fgets(buf, 256, reply)) {
#ifdef SHOW_REPLY
        printf("reply: %s\n", buf);
#endif
        // NOTE(review): tag names missing from the two literals below
        if (parse_int(buf, "", retval)) continue;
        if (parse_str(buf, "", error_msg)) continue;
        if (strstr(buf, "success")) {
            retval = 0;
            continue;
        }
    }
    fclose(reply);
    return retval;
}