// This file is part of BOINC.
// http://boinc.berkeley.edu
// Copyright (C) 2008 University of California
//
// BOINC is free software; you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License
// as published by the Free Software Foundation,
// either version 3 of the License, or (at your option) any later version.
//
// BOINC is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with BOINC.  If not, see <http://www.gnu.org/licenses/>.

// wrapper.C
// wrapper program - lets you use non-BOINC apps with BOINC
//
// Handles:
// - suspend/resume/quit/abort
// - reporting CPU time
// - loss of heartbeat from core client
// - checkpointing
//      (at the level of task; or potentially within task)
//
// See http://boinc.berkeley.edu/trac/wiki/WrapperApp for details
// Contributor: Andrew J.
Younge (ajy4490@umiacs.umd.edu) #include #include #include #ifdef _WIN32 #include "boinc_win.h" #include "win_util.h" #else #include #include #include #include #endif #include "procinfo.h" #include "boinc_api.h" #include "diagnostics.h" #include "filesys.h" #include "parse.h" #include "str_util.h" #include "str_replace.h" #include "util.h" #include "error_numbers.h" #define JOB_FILENAME "job.xml" #define CHECKPOINT_FILENAME "wrapper_checkpoint.txt" #define POLL_PERIOD 1.0 using std::vector; using std::string; struct TASK { string application; string exec_dir; // optional execution directory; macro-substituted for $PROJECT_DIR vector vsetenv; // vector of strings for environment variables string stdin_filename; string stdout_filename; string stderr_filename; string checkpoint_filename; // name of task's checkpoint file, if any string fraction_done_filename; // name of file where app will write its fraction done string command_line; double weight; // contribution of this task to overall fraction done bool is_daemon; bool append_cmdline_args; // dynamic stuff follows double final_cpu_time; double starting_cpu; // how much CPU time was used by tasks before this in the job file bool suspended; double wall_cpu_time; // for estimating CPU time on Win98/ME and Mac #ifdef _WIN32 HANDLE pid_handle; DWORD pid; HANDLE thread_handle; struct _stat last_stat; // mod time of checkpoint file #else int pid; struct stat last_stat; #endif bool stat_first; void macro_substitute(char* buf, const int len); int parse(XML_PARSER&); bool poll(int& status); int run(int argc, char** argv); void kill(); void stop(); void resume(); double cpu_time(); inline bool has_checkpointed() { bool changed = false; if (checkpoint_filename.size() == 0) return false; struct stat new_stat; int retval = stat(checkpoint_filename.c_str(), &new_stat); if (retval) return false; if (!stat_first && new_stat.st_mtime != last_stat.st_mtime) { changed = true; } stat_first = false; last_stat.st_mtime = 
new_stat.st_mtime; return changed; } inline double fraction_done() { if (fraction_done_filename.size() == 0) return 0; FILE* f = fopen(fraction_done_filename.c_str(), "r"); if (!f) return 0; double frac; int n = fscanf(f, "%lf", &frac); fclose(f); if (n != 1) return 0; if (frac < 0) return 0; if (frac > 1) return 1; return frac; } #ifdef _WIN32 // Windows uses a "null-terminated sequence of null-terminated strings" // to represent env vars. // I guess arg/argv didn't cut it for them. // void set_up_env_vars(char** env_vars, const int nvars) { int bufsize = 0; int len = 0; for (int j = 0; j < nvars; j++) { bufsize += (1 + vsetenv[j].length()); } bufsize++; // add a final byte for array null ptr *env_vars = new char[bufsize]; memset(*env_vars, 0, sizeof(char) * bufsize); char* p = *env_vars; // copy each env string to a buffer for the process for (vector::iterator it = vsetenv.begin(); it != vsetenv.end() && len < bufsize-1; it++ ) { strncpy(p, it->c_str(), it->length()); len = strlen(p); p += len + 1; // move pointer ahead } } #else void set_up_env_vars(char*** env_vars, const int nvars) { *env_vars = new char*[nvars+1]; // need one more than the # of vars, for a NULL ptr at the end memset(*env_vars, 0x00, sizeof(char*) * (nvars+1)); // get all environment vars for this task for (int i = 0; i < nvars; i++) { (*env_vars)[i] = (char*) vsetenv[i].c_str(); } } #endif }; vector tasks; vector daemons; APP_INIT_DATA aid; // macro replacement in wrapper strings from job.xml // for example PROJECT_DIR can be replaced in exec_dir and environment variables // void TASK::macro_substitute(char* buf, const int iLen = 8192) { char* buf2 = new char[iLen]; while (1) { char* p = strstr(buf, "$PROJECT_DIR"); if (!p) break; strcpy(buf2, p+strlen("$PROJECT_DIR")); if (strlen(aid.project_dir) > 0) { strcpy(p, aid.project_dir); } else { strcpy(p, "."); } strcat(p, buf2); } delete [] buf2; } int TASK::parse(XML_PARSER& xp) { char tag[1024], buf[8192]; bool is_tag; weight = 1; 
final_cpu_time = 0; stat_first = true; pid = 0; is_daemon = false; append_cmdline_args = false; while (!xp.get(tag, sizeof(tag), is_tag)) { if (!is_tag) { fprintf(stderr, "%s TASK::parse(): unexpected text %s\n", boinc_msg_prefix(buf, sizeof(buf)), tag ); continue; } if (!strcmp(tag, "/task")) { return 0; } else if (xp.parse_string(tag, "application", application)) continue; else if (xp.parse_str(tag, "exec_dir", buf, sizeof(buf))) { macro_substitute(buf, 8192); exec_dir = buf; continue; } else if (xp.parse_str(tag, "setenv", buf, sizeof(buf))) { macro_substitute(buf, 8192); vsetenv.push_back(buf); continue; } else if (xp.parse_string(tag, "stdin_filename", stdin_filename)) continue; else if (xp.parse_string(tag, "stdout_filename", stdout_filename)) continue; else if (xp.parse_string(tag, "stderr_filename", stderr_filename)) continue; else if (xp.parse_str(tag, "command_line", buf, sizeof(buf))) { macro_substitute(buf, 8192); command_line = buf; continue; } else if (xp.parse_string(tag, "checkpoint_filename", checkpoint_filename)) continue; else if (xp.parse_string(tag, "fraction_done_filename", fraction_done_filename)) continue; else if (xp.parse_double(tag, "weight", weight)) continue; else if (xp.parse_bool(tag, "daemon", is_daemon)) continue; else if (xp.parse_bool(tag, "append_cmdline_args", append_cmdline_args)) continue; } return ERR_XML_PARSE; } int parse_job_file() { MIOFILE mf; char tag[1024], buf[256], buf2[256]; bool is_tag; boinc_resolve_filename(JOB_FILENAME, buf, 1024); FILE* f = boinc_fopen(buf, "r"); if (!f) { fprintf(stderr, "%s can't open job file %s\n", boinc_msg_prefix(buf2, sizeof(buf2)), buf ); return ERR_FOPEN; } mf.init_file(f); XML_PARSER xp(&mf); if (!xp.parse_start("job_desc")) return ERR_XML_PARSE; while (!xp.get(tag, sizeof(tag), is_tag)) { if (!is_tag) { fprintf(stderr, "%s unexpected text in job.xml: %s\n", boinc_msg_prefix(buf2, sizeof(buf2)), tag ); continue; } if (!strcmp(tag, "/job_desc")) { fclose(f); return 0; } if 
(!strcmp(tag, "task")) {
            // a task that fails to parse is dropped without a message
            TASK task;
            int retval = task.parse(xp);
            if (!retval) {
                if (task.is_daemon) {
                    daemons.push_back(task);
                } else {
                    tasks.push_back(task);
                }
            }
            continue;
        } else {
            fprintf(stderr,
                "%s unexpected tag in job.xml: %s\n",
                boinc_msg_prefix(buf2, sizeof(buf2)), tag
            );
        }
    }
    // input ended before </job_desc>
    fclose(f);
    return ERR_XML_PARSE;
}

int start_daemons(int argc, char** argv) {
    // NOTE(review): the next line is damaged. The original text between the
    // first "i" and "0) {" was lost in extraction (everything from a '<' up
    // to a later '>' appears to have been stripped). That span presumably
    // held the rest of start_daemons(), a function using "daemon_pids", and
    // the beginning of the Windows branch of the function that starts a
    // task's process - restore it from the upstream file.
    for (unsigned int i=0; i daemon_pids; for (unsigned int i=0; i 0) {
        set_up_env_vars(&env_vars, nvars);
    }
    if (!CreateProcess(
        app_path,
        (LPSTR)command.c_str(),
        NULL,
        NULL,
        TRUE,        // bInheritHandles
        CREATE_NO_WINDOW|IDLE_PRIORITY_CLASS,
        (LPVOID) env_vars,
        exec_dir.empty()?NULL:exec_dir.c_str(),
        &startup_info,
        &process_info
    )) {
        char error_msg[1024];
        windows_error_string(error_msg, sizeof(error_msg));
        fprintf(stderr, "can't run app: %s\n", error_msg);
        if (env_vars) delete [] env_vars;
        return ERR_EXEC;
    }
    // the environment block is released right after CreateProcess() returns
    if (env_vars) delete [] env_vars;
    pid_handle = process_info.hProcess;
    pid = process_info.dwProcessId;
    thread_handle = process_info.hThread;
    SetThreadPriority(thread_handle, THREAD_PRIORITY_IDLE);
#else
    int retval, argc;
    char progname[256];
    char* argv[256];
    char arglist[4096];
    FILE* stdout_file;
    FILE* stdin_file;
    FILE* stderr_file;

    pid = fork();
    if (pid == -1) {
        perror("fork(): ");
        return ERR_FORK;
    }
    if (pid == 0) {
        // we're in the child process here
        //
        // open stdout, stdin if file names are given
        // NOTE: if the application is restartable,
        // we should deal with atomicity somehow
        //
        // NOTE(review): the "return ERR_FOPEN" statements below execute in
        // the forked child, so the child returns to this function's caller
        // instead of exiting - confirm whether exit() was intended.
        //
        if (stdout_filename != "") {
            boinc_resolve_filename_s(stdout_filename.c_str(), stdout_path);
            stdout_file = freopen(stdout_path.c_str(), "a", stdout);
            if (!stdout_file) return ERR_FOPEN;
        }
        if (stdin_filename != "") {
            boinc_resolve_filename_s(stdin_filename.c_str(), stdin_path);
            stdin_file = freopen(stdin_path.c_str(), "r", stdin);
            if (!stdin_file) return ERR_FOPEN;
        }
        if (stderr_filename != "") {
            boinc_resolve_filename_s(stderr_filename.c_str(), stderr_path);
            stderr_file = freopen(stderr_path.c_str(), "a", stderr);
            if (!stderr_file)
return ERR_FOPEN; } // construct argv // TODO: use malloc instead of stack var // argv[0] = app_path; strlcpy(arglist, command_line.c_str(), sizeof(arglist)); argc = parse_command_line(arglist, argv+1); setpriority(PRIO_PROCESS, 0, PROCESS_IDLE_PRIORITY); if (!exec_dir.empty()) { int retval = chdir(exec_dir.c_str()); #if 0 fprintf(stderr, "%s change to directory for task: %s\n", retval ? "Failed to" : "Successful", exec_dir.c_str() ); #endif } // setup environment variables (if any) // const int nvars = vsetenv.size(); char** env_vars = NULL; if (nvars > 0) { set_up_env_vars(&env_vars, nvars); retval = execve(app_path, argv, env_vars); } else { retval = execv(app_path, argv); } perror("execv() failed: "); exit(ERR_EXEC); } // pid = 0 i.e. child proc of the fork #endif wall_cpu_time = 0; suspended = false; return 0; } bool TASK::poll(int& status) { if (!suspended) wall_cpu_time += POLL_PERIOD; #ifdef _WIN32 unsigned long exit_code; if (GetExitCodeProcess(pid_handle, &exit_code)) { if (exit_code != STILL_ACTIVE) { status = exit_code; final_cpu_time = cpu_time(); return true; } } #else int wpid, stat; struct rusage ru; wpid = wait4(pid, &status, WNOHANG, &ru); if (wpid) { final_cpu_time = (float)ru.ru_utime.tv_sec + ((float)ru.ru_utime.tv_usec)/1e+6; return true; } #endif return false; } // kill this task (gracefully if possible) and any other subprocesses // void TASK::kill() { kill_daemons(); #ifdef _WIN32 // on Win, just kill all our descendants // vector descendants; get_descendants(GetCurrentProcessId(), descendants); kill_all(descendants); #else // on Unix, ask main process nicely. 
// it descendants still exist after 10 sec, use the nuclear option // vector descendants; get_descendants(getpid(), descendants); ::kill(pid, SIGTERM); for (int i=0; i<10; i++) { if (!any_process_exists(descendants)) { return; } sleep(1); } kill_all(descendants); // kill any processes that might have been created // in the last 10 secs get_descendants(getpid(), descendants); kill_all(descendants); #endif } void TASK::stop() { #ifdef _WIN32 suspend_or_resume_threads(pid, 0, false); #else ::kill(pid, SIGSTOP); #endif suspended = true; } void TASK::resume() { #ifdef _WIN32 suspend_or_resume_threads(pid, 0, true); #else ::kill(pid, SIGCONT); #endif suspended = false; } double TASK::cpu_time() { #ifdef _WIN32 double x; int retval = boinc_process_cpu_time(pid_handle, x); if (retval) return wall_cpu_time; return x; #elif defined(__APPLE__) // There's no easy way to get another process's CPU time in Mac OS X // return wall_cpu_time; #else return linux_cpu_time(pid); #endif } void poll_boinc_messages(TASK& task) { BOINC_STATUS status; boinc_get_status(&status); if (status.no_heartbeat) { task.kill(); exit(0); } if (status.quit_request) { task.kill(); exit(0); } if (status.abort_request) { task.kill(); exit(0); } if (status.suspended) { if (!task.suspended) { task.stop(); } } else { if (task.suspended) { task.resume(); } } } void send_status_message( TASK& task, double frac_done, double checkpoint_cpu_time ) { double current_cpu_time = task.starting_cpu + task.cpu_time(); boinc_report_app_status( current_cpu_time, checkpoint_cpu_time, frac_done ); } // Support for multiple tasks. 
// We keep a checkpoint file that says how many tasks we've completed // and how much CPU time has been used so far // void write_checkpoint(int ntasks_completed, double cpu) { FILE* f = fopen(CHECKPOINT_FILENAME, "w"); if (!f) return; fprintf(f, "%d %f\n", ntasks_completed, cpu); fclose(f); } void read_checkpoint(int& ntasks_completed, double& cpu) { int nt; double c; ntasks_completed = 0; cpu = 0; FILE* f = fopen(CHECKPOINT_FILENAME, "r"); if (!f) return; int n = fscanf(f, "%d %lf", &nt, &c); fclose(f); if (n != 2) return; ntasks_completed = nt; cpu = c; } int main(int argc, char** argv) { BOINC_OPTIONS options; int retval, ntasks_completed; unsigned int i; double total_weight=0, weight_completed=0; double checkpoint_cpu_time; // overall CPU time at last checkpoint memset(&options, 0, sizeof(options)); options.main_program = true; options.check_heartbeat = true; options.handle_process_control = true; boinc_init_options(&options); fprintf(stderr, "wrapper: starting\n"); boinc_get_init_data(aid); retval = parse_job_file(); if (retval) { fprintf(stderr, "can't parse job file: %d\n", retval); boinc_finish(retval); } read_checkpoint(ntasks_completed, checkpoint_cpu_time); if (ntasks_completed > (int)tasks.size()) { fprintf(stderr, "Checkpoint file: ntasks_completed too large: %d > %d\n", ntasks_completed, (int)tasks.size() ); boinc_finish(1); } for (i=0; i