// This file is part of BOINC. // http://boinc.berkeley.edu // Copyright (C) 2013 University of California // // BOINC is free software; you can redistribute it and/or modify it // under the terms of the GNU Lesser General Public License // as published by the Free Software Foundation, // either version 3 of the License, or (at your option) any later version. // // BOINC is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // See the GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with BOINC. If not, see . // BOINC GAHP (Grid ASCII Helper Protocol) daemon // Notes: // - This is currently Unix-only (mostly because of its use of pthreads) // but with some work it could be made to run on Windows #include #include #include #include #include #include #include #include #include #include "md5_file.h" #include "parse.h" #include "remote_submit.h" #include "svn_version.h" #include "version.h" #define BOINC_GAHP_VERSION "1.0.2" using std::map; using std::pair; using std::set; using std::string; using std::vector; //#define DEBUG // if set, handle commands synchronously rather than // handling them in separate threads // Also print more errors extern size_t strlcpy(char*, const char*, size_t); char project_url[256]; char authenticator[256]; char response_prefix[256]; // The following are used to implement "asynchronous mode", // in which we send "R" when a command has completed. // Synchronization is needed in case 2 commands complete around the same time. // bool wrote_r = false; bool async_mode = false; #define BPRINTF(fmt, ...) \ printf( "%s" fmt, response_prefix, ##__VA_ARGS__ ); \ struct SUBMIT_REQ { char batch_name[256]; char app_name[256]; vector jobs; int batch_id; JOB_PARAMS job_params; char app_version_num[256]; }; struct LOCAL_FILE { char boinc_name[256]; // the MD5 followed by filename extension if any double nbytes; }; // represents a command. // struct COMMAND { int id; char cmd[256]; char* in; // the input, in a malloc'd buffer char* out; // if NULL the command is in progress; // otherwise the output in a malloc'd buffer SUBMIT_REQ submit_req; FETCH_OUTPUT_REQ fetch_output_req; vector abort_job_names; vector batch_names; char batch_name[256]; double lease_end_time; double min_mod_time; COMMAND(char* _in) { in = _in; out = NULL; } ~COMMAND() { if (in) free(in); if (out) free(out); } int parse_command(); int parse_submit(char*); int parse_query_batches(char*); int parse_fetch_output(char*); int parse_abort_jobs(char*); int parse_retire_batch(char*); int parse_set_lease(char*); }; vector commands; void filename_extension(const char* path, char* ext) { const char* p = strrchr(path, '/'); if (!p) p = path; const char* q = strrchr(p, '.'); if (q) { strcpy(ext, q); } else { strcpy(ext, ""); } } int compute_boinc_name(string path, LOCAL_FILE& f) { char md5[64], ext[256]; int retval = md5_file(path.c_str(), md5, f.nbytes); if (retval) return retval; filename_extension(path.c_str(), ext); sprintf(f.boinc_name, "%s%s", md5, ext); return 0; } const char *escape_str(const string &str) { static string out; out = ""; for (const char *ptr = str.c_str(); *ptr; ptr++) { switch ( *ptr ) { case ' ': case '\\': case '\r': case '\n': out += '\\'; default: out += *ptr; } } return out.c_str(); } // Get a list of the input files used by the batch. // Get their MD5s. // See if they're already on the server. // If not, upload them. // int process_input_files(SUBMIT_REQ& req, string& error_msg) { unsigned int i, j; int retval; char buf[1024]; map local_files; // maps local path to info about file // get the set of unique source paths // set unique_paths; for (i=0; i::iterator iter = unique_paths.begin(); while (iter != unique_paths.end()) { string s = *iter; LOCAL_FILE lf; retval = compute_boinc_name(s, lf); if (retval) return retval; local_files.insert(std::pair(s, lf)); ++iter; } // ask the server which files it doesn't already have. // map::iterator map_iter; map_iter = local_files.begin(); vector boinc_names, paths; vector absent_files; while (map_iter != local_files.end()) { LOCAL_FILE lf = map_iter->second; paths.push_back(map_iter->first); boinc_names.push_back(lf.boinc_name); ++map_iter; } retval = query_files( project_url, authenticator, boinc_names, req.batch_id, absent_files, error_msg ); if (retval) { #ifdef DEBUG printf("query_files() failed (%d): %s\n", retval, error_msg.c_str()); #endif return retval; } // upload the missing files. // vector upload_boinc_names, upload_paths; for (unsigned int i=0; i::iterator iter = local_files.find(infile.src_path); LOCAL_FILE& lf = iter->second; sprintf(infile.physical_name, "%s", lf.boinc_name); } } return 0; } // parse the text coming from Condor // int COMMAND::parse_submit(char* p) { strlcpy(submit_req.batch_name, strtok_r(NULL, " ", &p), sizeof(submit_req.batch_name)); strlcpy(submit_req.app_name, strtok_r(NULL, " ", &p), sizeof(submit_req.app_name)); int njobs = atoi(strtok_r(NULL, " ", &p)); for (int i=0; i // // <#files> // // ... // int COMMAND::parse_fetch_output(char* p) { char* q = strtok_r(NULL, " ", &p); if (!q) return -1; strlcpy(fetch_output_req.job_name, q, sizeof(fetch_output_req.job_name)); q = strtok_r(NULL, " ", &p); if (!q) return -1; strlcpy(fetch_output_req.dir, q, sizeof(fetch_output_req.dir)); q = strtok_r(NULL, " ", &p); if (!q) return -1; fetch_output_req.stderr_filename = string(q); q = strtok_r(NULL, " ", &p); if (!q) return -1; if (!strcmp(q, "ALL")) { fetch_output_req.fetch_all = true; } else if (!strcmp(q, "SOME")) { fetch_output_req.fetch_all = false; } else { return -1; } int nfiles = atoi(strtok_r(NULL, " ", &p)); for (int i=0; i/dev/null", req.dir, req.job_name); retval = system(buf); if (retval) { s = string("unzip\\ failed"); } else { unlink(path); } } } else if (req.fetch_all) { for (i=0; i= td.output_files.size()) { sprintf(buf, "too\\ many\\ output\\ files\\ specified\\ submit:%d\\ template:%d", i, td.output_files.size() ); s = string(buf); goto done; } j = i; } sprintf(path, "%s/%s", req.dir, lname); retval = get_output_file( project_url, authenticator, req.job_name, i, path, error_msg ); if (retval) { sprintf(buf, "get_output_file()\\ returned\\ %d\\ ", retval); s = string(buf) + escape_str(error_msg); break; } } } // We've fetched all required output files; now move or rename them. // Use system("mv...") rather than rename(), // since the latter doesn't work across filesystems // for (i=0; i::iterator i; for (i = commands.begin(); i != commands.end(); ++i) { COMMAND *c2 = *i; if (c2->out) { n++; } } return n; } // p is a malloc'ed buffer // int handle_command(char* p) { char cmd[256]; int id, retval; cmd[0] = '\0'; sscanf(p, "%s", cmd); if (!strcasecmp(cmd, "VERSION")) { print_version(false); } else if (!strcasecmp(cmd, "COMMANDS")) { BPRINTF("S ASYNC_MODE_OFF ASYNC_MODE_ON BOINC_ABORT_JOBS BOINC_FETCH_OUTPUT BOINC_PING BOINC_QUERY_BATCHES BOINC_RETIRE_BATCH BOINC_SELECT_PROJECT BOINC_SET_LEASE BOINC_SUBMIT COMMANDS QUIT RESULTS VERSION\n"); } else if (!strcasecmp(cmd, "RESPONSE_PREFIX")) { flockfile(stdout); BPRINTF("S\n"); strlcpy(response_prefix, p+strlen("RESPONSE_PREFIX "), sizeof(response_prefix)); funlockfile(stdout); } else if (!strcasecmp(cmd, "ASYNC_MODE_ON")) { flockfile(stdout); BPRINTF("S\n"); async_mode = true; funlockfile(stdout); } else if (!strcasecmp(cmd, "ASYNC_MODE_OFF")) { flockfile(stdout); BPRINTF("S\n"); async_mode = false; funlockfile(stdout); } else if (!strcasecmp(cmd, "QUIT")) { exit(0); } else if (!strcasecmp(cmd, "RESULTS")) { flockfile(stdout); int cnt = n_results(); BPRINTF("S %d\n", cnt); vector::iterator i = commands.begin(); int j = 0; while (i != commands.end() && j < cnt) { COMMAND *c2 = *i; if (c2->out) { BPRINTF("%d %s\n", c2->id, c2->out); delete c2; i = commands.erase(i); j++; } else { ++i; } } wrote_r = false; funlockfile(stdout); } else if (!strcasecmp(cmd, "BOINC_SELECT_PROJECT")) { int n = sscanf(p, "%s %s %s", cmd, project_url, authenticator); if (n ==3) { BPRINTF("S\n"); } else { BPRINTF("E\n"); } } else { // asynchronous commands go here // COMMAND *cp = new COMMAND(p); p = NULL; retval = cp->parse_command(); if (retval) { BPRINTF("E\n"); delete cp; return 0; } #ifdef DEBUG handle_command_aux(cp); BPRINTF("result: %s\n", cp->out); delete cp; #else printf("S\n"); commands.push_back(cp); pthread_t thread_handle; pthread_attr_t thread_attrs; pthread_attr_init(&thread_attrs); pthread_attr_setstacksize(&thread_attrs, 256*1024); retval = pthread_create( &thread_handle, &thread_attrs, &handle_command_aux, cp ); if (retval) { fprintf(stderr, "can't create thread\n"); return -1; } #endif } free(p); return 0; } // read a line from stdin (possibly very long). // Return it in a malloc'd buffer // char* get_cmd() { static const int buf_inc = 16384; char* p = (char*)malloc(buf_inc); if (!p) return NULL; int len = 0; int buf_size = buf_inc; while (1) { char c = fgetc(stdin); if (c == EOF) { free(p); return NULL; } if (c == '\n') { p[len] = 0; if ( len > 0 && p[len-1] == '\r' ) { p[len-1] = 0; } return p; } p[len++] = c; if (len == buf_size) { p = (char*)realloc(p, len+buf_inc); buf_size += buf_inc; } } } void read_config() { FILE* f = fopen("config.txt", "r"); if (!f) { fprintf(stderr, "no config.txt\n"); return; } fgets(project_url, 256, f); strip_whitespace(project_url); fgets(authenticator, 256, f); strip_whitespace(authenticator); fclose(f); if (!strlen(project_url)) { fprintf(stderr, "no project URL given\n"); exit(1); } if (!strlen(authenticator)) { fprintf(stderr, "no authenticator given\n"); exit(1); } } int main(int argc, char*argv[]) { if (argc>1) { if (!strcmp(argv[1],"--version")) { fprintf(stderr,SVN_VERSION"\n"); return 0; } } read_config(); strcpy(response_prefix, ""); print_version(true); fflush(stdout); while (1) { char* p = get_cmd(); if (p == NULL) break; handle_command(p); fflush(stdout); } return 0; }