// This file is part of BOINC. // http://boinc.berkeley.edu // Copyright (C) 2013 University of California // // BOINC is free software; you can redistribute it and/or modify it // under the terms of the GNU Lesser General Public License // as published by the Free Software Foundation, // either version 3 of the License, or (at your option) any later version. // // BOINC is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // See the GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with BOINC. If not, see . // BOINC GAHP (Grid ASCII Helper Protocol) daemon // Notes: // - This is currently Unix-only (mostly because of its use of pthreads) // but with some work it could be made to run on Windows #include #include #include #include #include #include #include #include #include #include "md5_file.h" #include "parse.h" #include "remote_submit.h" using std::map; using std::pair; using std::set; using std::string; using std::vector; char project_url[256]; char authenticator[256]; char response_prefix[256]; // The following are used to implement "asynchronous mode", // in which we send "R" when a command has completed. // Synchronization is needed in case 2 commands complete around the same time. // bool wrote_r = false; bool async_mode = false; #define BPRINTF(fmt, ...) \ printf( "%s" fmt, response_prefix, ##__VA_ARGS__ ); \ bool debug_mode = false; // if set, handle commands synchronously rather than // handling them in separate threads struct SUBMIT_REQ { char batch_name[256]; char app_name[256]; vector jobs; int batch_id; }; // represents a command. 
// A COMMAND is created by handle_command() for every command line that is
// not handled inline.  In debug mode it is processed and deleted right away;
// otherwise it is appended to "commands", handed to a new thread,
// and deleted when a RESULTS command reports its output.
// The object owns "in" and "out": the destructor free()s both.
//
struct COMMAND {
    int id;
        // printed in front of the output by RESULTS;
        // not set by the constructor
    char cmd[256];
    char* in;
        // the input, in a malloc'd buffer
    char* out;
        // if NULL the command is in progress;
        // otherwise the output in a malloc'd buffer
    SUBMIT_REQ submit_req;
    FETCH_OUTPUT_REQ fetch_output_req;
    // NOTE(review): the element types of the next two vectors are missing in
    // this copy of the file (text between '<' and '>' appears to have been
    // stripped); presumably strings - confirm against the upstream source.
    vector abort_job_names;
    vector batch_names;
    char batch_name[256];
    double lease_end_time;
    double min_mod_time;

    // _in must be a malloc'd buffer (or NULL): the destructor free()s it.
    COMMAND(char* _in) {
        in = _in;
        out = NULL;
    }
    ~COMMAND() {
        if (in) free(in);
        if (out) free(out);
    }
    int parse_command();
    int parse_submit(char*);
    int parse_query_batches(char*);
    int parse_fetch_output(char*);
    int parse_abort_jobs(char*);
    int parse_retire_batch(char*);
    int parse_set_lease(char*);
};

// Commands accepted in non-debug mode and not yet reported by RESULTS.
// NOTE(review): the element type is missing in this copy of the file;
// the uses below ("COMMAND *c2 = *i", "commands.push_back(cp)")
// show that the elements are COMMAND*.
//
vector commands;

// Fill in f.md5 and f.nbytes for the file at "path" using md5_file().
// Returns whatever md5_file() returns; callers treat nonzero as failure.
//
int compute_md5(string path, LOCAL_FILE& f) {
    return md5_file(path.c_str(), f.md5, f.nbytes);
}

// Return a copy of str in which every space, backslash, CR and LF
// is preceded by a backslash.
// The result points into a function-local static string:
// it is overwritten by the next call,
// and concurrent calls from several threads share that one buffer.
//
const char *escape_str(const string &str) {
    static string out;
    out = "";
    for (const char *ptr = str.c_str(); *ptr; ptr++) {
        switch ( *ptr ) {
        case ' ':
        case '\\':
        case '\r':
        case '\n':
            out += '\\';
            // deliberate fall-through: the character itself is appended too
        default:
            out += *ptr;
        }
    }
    return out.c_str();
}

// Get a list of the input files used by the batch.
// Get their MD5s.
// See if they're already on the server.
// If not, upload them.
//
// Returns 0 on success, or the nonzero value returned by
// compute_md5() / query_files() on failure.
//
// NOTE(review): this function is damaged in this copy of the file:
// template arguments are missing, and two stretches of code
// (marked below) have been lost.  It cannot compile as it stands.
//
int process_input_files(SUBMIT_REQ& req, string& error_msg) {
    unsigned int i, j;
    int retval;
    char buf[1024];
    map local_files;
        // maps local path to info about file

    // get the set of unique source paths
    //
    set unique_paths;
    // NOTE(review): the rest of this loop header, the loop body, and the
    // start of the iterator declaration that followed are missing.
    for (i=0; i::iterator iter = unique_paths.begin();
    // compute the MD5 of each unique path
    while (iter != unique_paths.end()) {
        string s = *iter;
        LOCAL_FILE lf;
        retval = compute_md5(s, lf);
        if (retval) return retval;
        local_files.insert(std::pair(s, lf));
        iter++;
    }

    // ask the server which files it doesn't already have.
    //
    map::iterator map_iter;
    map_iter = local_files.begin();
    vector md5s, paths;
    vector absent_files;
    // paths[k] and md5s[k] describe the same file
    while (map_iter != local_files.end()) {
        LOCAL_FILE lf = map_iter->second;
        paths.push_back(map_iter->first);
        md5s.push_back(lf.md5);
        map_iter++;
    }
    retval = query_files(
        project_url, authenticator, md5s, req.batch_id, absent_files, error_msg
    );
    if (retval) return retval;

    // upload the missing files.
    //
    vector upload_md5s, upload_paths;
    // NOTE(review): everything from the middle of this loop header up to the
    // lookup of infile.src_path is missing (the upload itself and the
    // headers of the loops that the two closing braces below belong to).
    for (unsigned int i=0; i::iterator iter = local_files.find(infile.src_path);
            // the name stored for the file is derived from its MD5
            LOCAL_FILE& lf = iter->second;
            sprintf(infile.physical_name, "jf_%s", lf.md5);
        }
    }
    return 0;
}

// parse the text coming from Condor
//
// NOTE(review): strtok_r() returns NULL when a token is missing, and that
// NULL is passed straight to strcpy()/atoi() here; the copies into the
// 256-byte batch_name/app_name buffers are also unbounded.
// NOTE(review): the function is truncated in this copy of the file in the
// middle of the loop header below; the text that followed (including,
// apparently, other functions) is lost.
//
int COMMAND::parse_submit(char* p) {
    strcpy(submit_req.batch_name, strtok_r(NULL, " ", &p));
    strcpy(submit_req.app_name, strtok_r(NULL, " ", &p));
    int njobs = atoi(strtok_r(NULL, " ", &p));
    for (int i=0; i
// NOTE(review): the comment that described the argument format was mostly
// lost; only the fragments "<#files>" and "..." survive.
// As parsed below, the space-separated arguments are:
// job name, directory, stderr file name, "ALL" or "SOME", number of files.
//
// Returns -1 if one of the first four arguments is missing
// or the fourth is neither "ALL" nor "SOME".
//
int COMMAND::parse_fetch_output(char* p) {
    char* q = strtok_r(NULL, " ", &p);
    if (!q) return -1;
    // NOTE(review): unbounded copy; the size of job_name is declared elsewhere
    strcpy(fetch_output_req.job_name, q);
    q = strtok_r(NULL, " ", &p);
    if (!q) return -1;
    // NOTE(review): unbounded copy; the size of dir is declared elsewhere
    strcpy(fetch_output_req.dir, q);
    q = strtok_r(NULL, " ", &p);
    if (!q) return -1;
    fetch_output_req.stderr_filename = string(q);
    q = strtok_r(NULL, " ", &p);
    if (!q) return -1;
    if (!strcmp(q, "ALL")) {
        fetch_output_req.fetch_all = true;
    } else if (!strcmp(q, "SOME")) {
        fetch_output_req.fetch_all = false;
    } else {
        return -1;
    }
    // NOTE(review): unlike the tokens above, this one is not checked for
    // NULL before being passed to atoi()
    int nfiles = atoi(strtok_r(NULL, " ", &p));
    // NOTE(review): the file is truncated in the middle of this loop header.
    // What follows is the tail of a different function - presumably
    // n_results(), which handle_command() calls - that counts the entries
    // of "commands" whose output is ready (out != NULL).
    // The declaration of n is in the lost part.
    for (int i=0; i::iterator i;
    for (i = commands.begin(); i != commands.end(); i++) {
        COMMAND *c2 = *i;
        if (c2->out) {
            n++;
        }
    }
    return n;
}

// p is a malloc'ed buffer
//
// Handle one command line.
// Takes ownership of p: it is either freed before returning
// or handed to a COMMAND, whose destructor frees it.
// Returns 0, or -1 if a worker thread could not be created.
//
int handle_command(char* p) {
    char cmd[256];
    int id;
    cmd[0] = '\0';
    // NOTE(review): "%s" has no field width, so a first word of 256 or more
    // characters overflows cmd; "%255s" would bound it.
    sscanf(p, "%s", cmd);
    if (!strcasecmp(cmd, "VERSION")) {
        print_version(false);
    } else if (!strcasecmp(cmd, "COMMANDS")) {
        BPRINTF("S ASYNC_MODE_OFF ASYNC_MODE_ON BOINC_ABORT_JOBS BOINC_FETCH_OUTPUT BOINC_PING BOINC_QUERY_BATCHES BOINC_RETIRE_BATCH BOINC_SELECT_PROJECT BOINC_SET_LEASE BOINC_SUBMIT COMMANDS QUIT RESULTS VERSION\n");
    } else if (!strcasecmp(cmd, "RESPONSE_PREFIX")) {
        flockfile(stdout);
        // the acknowledgement still carries the old prefix
        BPRINTF("S\n");
        // NOTE(review): unbounded copy into the 256-byte response_prefix;
        // and if the line is exactly "RESPONSE_PREFIX" (no argument),
        // the source pointer is one byte past the terminating NUL.
        strcpy(response_prefix, p+strlen("RESPONSE_PREFIX "));
        funlockfile(stdout);
    } else if (!strcasecmp(cmd, "ASYNC_MODE_ON")) {
        flockfile(stdout);
        BPRINTF("S\n");
        async_mode = true;
        funlockfile(stdout);
    } else if (!strcasecmp(cmd, "ASYNC_MODE_OFF")) {
        flockfile(stdout);
        BPRINTF("S\n");
        async_mode = false;
        funlockfile(stdout);
    } else if (!strcasecmp(cmd, "QUIT")) {
        exit(0);
    } else if (!strcasecmp(cmd, "RESULTS")) {
        // Report and discard every command whose output is ready.
        // stdout stays locked for the whole report.
        flockfile(stdout);
        int cnt = n_results();
        BPRINTF("S %d\n", cnt);
        // NOTE(review): the template argument of "vector" is missing here
        vector::iterator i = commands.begin();
        int j = 0;
        while (i != commands.end() && j < cnt) {
            COMMAND *c2 = *i;
            if (c2->out) {
                BPRINTF("%d %s\n", c2->id, c2->out);
                delete c2;
                // erase() returns the iterator following the removed element
                i = commands.erase(i);
                j++;
            } else {
                i++;
            }
        }
        wrote_r = false;
        funlockfile(stdout);
    } else if (!strcasecmp(cmd, "BOINC_SELECT_PROJECT")) {
        // NOTE(review): no field widths, so long arguments overflow
        // the 256-byte cmd, project_url and authenticator buffers
        int n = sscanf(p, "%s %s %s", cmd, project_url, authenticator);
        if (n ==3) {
            BPRINTF("S\n");
        } else {
            BPRINTF("E\n");
        }
    } else {
        // asynchronous commands go here
        //
        COMMAND *cp = new COMMAND(p);
        // cp owns the buffer now; this makes the free(p) below a no-op
        p = NULL;
        int retval = cp->parse_command();
        if (retval) {
            BPRINTF("E\n");
            delete cp;
            return 0;
        }
        if (debug_mode) {
            handle_command_aux(cp);
            BPRINTF("result: %s\n", cp->out);
            delete cp;
        } else {
            // NOTE(review): printf rather than BPRINTF, so this reply is
            // written without response_prefix - confirm that is intended
            printf("S\n");
            commands.push_back(cp);
            pthread_t thread_handle;
            pthread_attr_t thread_attrs;
            pthread_attr_init(&thread_attrs);
            pthread_attr_setstacksize(&thread_attrs, 256*1024);
            // this retval shadows the one declared above
            int retval = pthread_create(
                &thread_handle, &thread_attrs, &handle_command_aux, cp
            );
            // NOTE(review): on failure "S" has already been sent and cp
            // stays in "commands" with out == NULL, so RESULTS never
            // reports or deletes it.  thread_attrs is not destroyed,
            // and nothing visible here joins or detaches the thread.
            if (retval) {
                fprintf(stderr, "can't create thread\n");
                return -1;
            }
        }
    }
    free(p);
    return 0;
}

// read a line from stdin (possibly very long).
// Return it in a malloc'd buffer // char* get_cmd() { static const int buf_inc = 16384; char* p = (char*)malloc(buf_inc); if (!p) return NULL; int len = 0; int buf_size = buf_inc; while (1) { char c = fgetc(stdin); if (c == EOF) { return NULL; } if (c == '\n') { p[len] = 0; if ( len > 0 && p[len-1] == '\r' ) { p[len-1] = 0; } return p; } p[len++] = c; if (len == buf_size) { p = (char*)realloc(p, len+buf_inc); buf_size += buf_inc; } } } void read_config() { FILE* f = fopen("config.txt", "r"); if (!f) { fprintf(stderr, "no config.txt\n"); return; } fgets(project_url, 256, f); strip_whitespace(project_url); fgets(authenticator, 256, f); strip_whitespace(authenticator); fclose(f); if (!strlen(project_url)) { fprintf(stderr, "no project URL given\n"); exit(1); } if (!strlen(authenticator)) { fprintf(stderr, "no authenticator given\n"); exit(1); } } int main() { read_config(); strcpy(response_prefix, ""); print_version(true); fflush(stdout); while (1) { char* p = get_cmd(); if (p == NULL) break; handle_command(p); fflush(stdout); } }