// This file is part of BOINC.
// http://boinc.berkeley.edu
// Copyright (C) 2008 University of California
//
// BOINC is free software; you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License
// as published by the Free Software Foundation,
// either version 3 of the License, or (at your option) any later version.
//
// BOINC is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with BOINC.  If not, see <http://www.gnu.org/licenses/>.

// wrapper.C
// wrapper program - lets you use non-BOINC apps with BOINC
//
// Handles:
// - suspend/resume/quit/abort
// - reporting CPU time
// - loss of heartbeat from core client
// - checkpointing
//      (at the level of task; or potentially within task)
//
// See http://boinc.berkeley.edu/trac/wiki/WrapperApp for details
// Contributor: Andrew J.
Younge (ajy4490@umiacs.umd.edu) #ifndef _WIN32 #include "config.h" #endif #include #include #include #ifdef _WIN32 #include "boinc_win.h" #include "win_util.h" #else #ifdef HAVE_SYS_WAIT_H #include #endif #include #include #ifdef HAVE_SYS_TIME_H #include #endif #ifdef HAVE_SYS_RESOURCE_H #include #endif #include #endif #include "boinc_api.h" #include "boinc_zip.h" #include "diagnostics.h" #include "error_numbers.h" #include "filesys.h" #include "parse.h" #include "proc_control.h" #include "procinfo.h" #include "str_util.h" #include "str_replace.h" #include "util.h" #include "regexp.h" //#define DEBUG #if 1 #define debug_msg(x) #else inline void debug_msg(const char* x) { fprintf(stderr, "%s\n", x); } #endif #define JOB_FILENAME "job.xml" #define CHECKPOINT_FILENAME "wrapper_checkpoint.txt" #define POLL_PERIOD 1.0 using std::vector; using std::string; int nthreads = 1; struct TASK { string application; string exec_dir; // optional execution directory; // macro-substituted for $PROJECT_DIR and $NTHREADS vector vsetenv; // vector of strings for environment variables // macro-substituted string stdin_filename; string stdout_filename; string stderr_filename; string checkpoint_filename; // name of task's checkpoint file, if any string fraction_done_filename; // name of file where app will write its fraction done string command_line; // macro-substituted double weight; // contribution of this task to overall fraction done bool is_daemon; bool append_cmdline_args; bool multi_process; // dynamic stuff follows double current_cpu_time; // most recently measure CPU time of this task double final_cpu_time; // final CPU time of this task double starting_cpu; // how much CPU time was used by tasks before this one bool suspended; #ifdef _WIN32 HANDLE pid_handle; DWORD pid; HANDLE thread_handle; struct _stat last_stat; // mod time of checkpoint file #else int pid; struct stat last_stat; double start_rusage; // getrusage() CPU time at start of task #endif bool stat_first; int 
parse(XML_PARSER&); bool poll(int& status); int run(int argc, char** argv); void kill(); void stop(); void resume(); double cpu_time(); inline bool has_checkpointed() { bool changed = false; if (checkpoint_filename.size() == 0) return false; struct stat new_stat; int retval = stat(checkpoint_filename.c_str(), &new_stat); if (retval) return false; if (!stat_first && new_stat.st_mtime != last_stat.st_mtime) { changed = true; } stat_first = false; last_stat.st_mtime = new_stat.st_mtime; return changed; } inline double fraction_done() { if (fraction_done_filename.size() == 0) return 0; FILE* f = fopen(fraction_done_filename.c_str(), "r"); if (!f) return 0; // read the last line of the file // fseek(f, -32, SEEK_END); double temp, frac = 0; while (!feof(f)) { char buf[256]; char* p = fgets(buf, 256, f); if (p == NULL) break; int n = sscanf(buf, "%lf", &temp); if (n == 1) frac = temp; } fclose(f); if (frac < 0) return 0; if (frac > 1) return 1; return frac; } #ifdef _WIN32 // Windows uses a "null-terminated sequence of null-terminated strings" // to represent env vars. // I guess arg/argv didn't cut it for them. 
// void set_up_env_vars(char** env_vars, const int nvars) { int bufsize = 0; int len = 0; for (int j = 0; j < nvars; j++) { bufsize += (1 + vsetenv[j].length()); } bufsize++; // add a final byte for array null ptr *env_vars = new char[bufsize]; memset(*env_vars, 0, sizeof(char) * bufsize); char* p = *env_vars; // copy each env string to a buffer for the process for (vector::iterator it = vsetenv.begin(); it != vsetenv.end() && len < bufsize-1; it++ ) { strncpy(p, it->c_str(), it->length()); len = strlen(p); p += len + 1; // move pointer ahead } } #else void set_up_env_vars(char*** env_vars, const int nvars) { *env_vars = new char*[nvars+1]; // need one more than the # of vars, for a NULL ptr at the end memset(*env_vars, 0x00, sizeof(char*) * (nvars+1)); // get all environment vars for this task for (int i = 0; i < nvars; i++) { (*env_vars)[i] = const_cast(vsetenv[i].c_str()); } } #endif }; vector tasks; vector daemons; vector unzip_filenames; string zip_filename; vector zip_patterns; APP_INIT_DATA aid; // replace s1 with s2 // void str_replace_all(char* buf, const char* s1, const char* s2) { char buf2[64000]; while (1) { char* p = strstr(buf, s1); if (!p) break; strcpy(buf2, p+strlen(s1)); strcpy(p, s2); strcat(p, buf2); } } // macro-substitute strings from job.xml // $PROJECT_DIR -> project directory // $NTHREADS --> --nthreads arg if present, else 1 // void macro_substitute(char* buf) { const char* pd = strlen(aid.project_dir)?aid.project_dir:"."; str_replace_all(buf, "$PROJECT_DIR", pd); char nt[256]; sprintf(nt, "%d", nthreads); str_replace_all(buf, "$NTHREADS", nt); } // make a list of files in the slot directory, // and write to "initial_file_list" // void get_initial_file_list() { char fname[256]; vector initial_files; DIRREF d = dir_open("."); while (!dir_scan(fname, d, sizeof(fname))) { initial_files.push_back(fname); } dir_close(d); FILE* f = fopen("initial_file_list_temp", "w"); for (unsigned int i=0; i& files) { char buf[256]; FILE* f = 
fopen("initial_file_list", "r"); if (!f) return; while (fgets(buf, sizeof(buf), f)) { strip_whitespace(buf); files.push_back(string(buf)); } fclose(f); } // if any zipped input files are present, unzip and remove them // void do_unzip_inputs() { for (unsigned int i=0; i& v) { for (unsigned int i=0; i initial_files; char fname[256]; read_initial_file_list(initial_files); DIRREF d = dir_open("."); while (!dir_scan(fname, d, sizeof(fname))) { string filename = string(fname); if (in_vector(filename, initial_files)) continue; for (unsigned int i=0; i daemon_pids; for (unsigned int i=0; i 0) { set_up_env_vars(&env_vars, nvars); } BOOL success; if (ends_with((string)app_path, ".bat")) { char cmd[1024]; sprintf(cmd, "cmd.exe /c %s", command.c_str()); success = CreateProcess( "cmd.exe", (LPSTR)cmd, NULL, NULL, TRUE, // bInheritHandles CREATE_NO_WINDOW|IDLE_PRIORITY_CLASS, (LPVOID) env_vars, exec_dir.empty()?NULL:exec_dir.c_str(), &startup_info, &process_info ); } else { success = CreateProcess( app_path, (LPSTR)command.c_str(), NULL, NULL, TRUE, // bInheritHandles CREATE_NO_WINDOW|IDLE_PRIORITY_CLASS, (LPVOID) env_vars, exec_dir.empty()?NULL:exec_dir.c_str(), &startup_info, &process_info ); } if (!success) { char error_msg[1024]; windows_format_error_string(GetLastError(), error_msg, sizeof(error_msg)); fprintf(stderr, "can't run app: %s\n", error_msg); fprintf(stderr, "Error: app_path is '%s'\n", app_path); fprintf(stderr, "Error: command is '%s'\n", command.c_str()); fprintf(stderr, "Error: exec_dir is '%s'\n", exec_dir.c_str()); if (env_vars) delete [] env_vars; return ERR_EXEC; } if (env_vars) delete [] env_vars; pid_handle = process_info.hProcess; pid = process_info.dwProcessId; thread_handle = process_info.hThread; SetThreadPriority(thread_handle, THREAD_PRIORITY_IDLE); #else int retval; char* argv[256]; char arglist[4096]; FILE* stdout_file; FILE* stdin_file; FILE* stderr_file; struct rusage ru; getrusage(RUSAGE_CHILDREN, &ru); start_rusage = 
(float)ru.ru_utime.tv_sec + ((float)ru.ru_utime.tv_usec)/1e+6; pid = fork(); if (pid == -1) { perror("fork(): "); return ERR_FORK; } if (pid == 0) { // we're in the child process here // // open stdout, stdin if file names are given // NOTE: if the application is restartable, // we should deal with atomicity somehow // if (stdout_filename != "") { boinc_resolve_filename_s(stdout_filename.c_str(), stdout_path); stdout_file = freopen(stdout_path.c_str(), "a", stdout); if (!stdout_file) { fprintf(stderr, "Can't open %s for stdout; exiting\n", stdout_path.c_str()); return ERR_FOPEN; } } if (stdin_filename != "") { boinc_resolve_filename_s(stdin_filename.c_str(), stdin_path); stdin_file = freopen(stdin_path.c_str(), "r", stdin); if (!stdin_file) { fprintf(stderr, "Can't open %s for stdin; exiting\n", stdin_path.c_str()); return ERR_FOPEN; } } if (stderr_filename != "") { boinc_resolve_filename_s(stderr_filename.c_str(), stderr_path); stderr_file = freopen(stderr_path.c_str(), "a", stderr); if (!stderr_file) { fprintf(stderr, "Can't open %s for stderr; exiting\n", stderr_path.c_str()); return ERR_FOPEN; } } // construct argv // TODO: use malloc instead of stack var // argv[0] = app_path; strlcpy(arglist, command_line.c_str(), sizeof(arglist)); parse_command_line(arglist, argv+1); setpriority(PRIO_PROCESS, 0, PROCESS_IDLE_PRIORITY); if (!exec_dir.empty()) { retval = chdir(exec_dir.c_str()); if (!retval) { fprintf(stderr, "chdir() to %s failed\n", exec_dir.c_str()); exit(1); } } // setup environment variables (if any) // const int nvars = vsetenv.size(); char** env_vars = NULL; if (nvars > 0) { set_up_env_vars(&env_vars, nvars); retval = execve(app_path, argv, env_vars); } else { retval = execv(app_path, argv); } perror("execv() failed: "); exit(ERR_EXEC); } // pid = 0 i.e. 
child proc of the fork #endif suspended = false; return 0; } bool TASK::poll(int& status) { #ifdef _WIN32 unsigned long exit_code; if (GetExitCodeProcess(pid_handle, &exit_code)) { if (exit_code != STILL_ACTIVE) { status = exit_code; final_cpu_time = current_cpu_time; #ifdef DEBUG fprintf(stderr, "process exited; current CPU %f final CPU %f\n", current_cpu_time, final_cpu_time ); #endif return true; } } #else int wpid; struct rusage ru; wpid = waitpid(pid, &status, WNOHANG); if (wpid) { getrusage(RUSAGE_CHILDREN, &ru); final_cpu_time = (float)ru.ru_utime.tv_sec + ((float)ru.ru_utime.tv_usec)/1e+6; final_cpu_time -= start_rusage; #ifdef DEBUG fprintf(stderr, "process exited; current CPU %f final CPU %f\n", current_cpu_time, final_cpu_time ); #endif if (final_cpu_time < current_cpu_time) { final_cpu_time = current_cpu_time; } return true; } #endif return false; } // kill this task (gracefully if possible) and any other subprocesses // void TASK::kill() { kill_daemons(); #ifdef _WIN32 kill_descendants(); #else kill_descendants(pid); #endif } void TASK::stop() { if (multi_process) { suspend_or_resume_descendants(false); } else { suspend_or_resume_process(pid, false); } suspended = true; } void TASK::resume() { if (multi_process) { suspend_or_resume_descendants(true); } else { suspend_or_resume_process(pid, true); } suspended = false; } // Get the CPU time of the app while it's running. // This totals the CPU time of all the descendant processes, // so it shouldn't be called too frequently. // double TASK::cpu_time() { double x = process_tree_cpu_time(pid); // if the process has exited, the above could return zero. // So update carefully. 
// if (x > current_cpu_time) { current_cpu_time = x; } return current_cpu_time; } void poll_boinc_messages(TASK& task) { BOINC_STATUS status; boinc_get_status(&status); //fprintf(stderr, "wrapper: polling\n"); if (status.no_heartbeat) { debug_msg("wrapper: kill"); task.kill(); exit(0); } if (status.quit_request) { debug_msg("wrapper: quit"); task.kill(); exit(0); } if (status.abort_request) { debug_msg("wrapper: abort"); task.kill(); exit(0); } if (status.suspended) { if (!task.suspended) { debug_msg("wrapper: suspend"); task.stop(); } } else { if (task.suspended) { debug_msg("wrapper: resume"); task.resume(); } } } // Support for multiple tasks. // We keep a checkpoint file that says how many tasks we've completed // and how much CPU time has been used so far // void write_checkpoint(int ntasks_completed, double cpu) { boinc_begin_critical_section(); FILE* f = fopen(CHECKPOINT_FILENAME, "w"); if (!f) return; fprintf(f, "%d %f\n", ntasks_completed, cpu); fclose(f); boinc_checkpoint_completed(); } int read_checkpoint(int& ntasks_completed, double& cpu) { int nt; double c; ntasks_completed = 0; cpu = 0; FILE* f = fopen(CHECKPOINT_FILENAME, "r"); if (!f) return ERR_FOPEN; int n = fscanf(f, "%d %lf", &nt, &c); fclose(f); if (n != 2) return 0; ntasks_completed = nt; cpu = c; return 0; } int main(int argc, char** argv) { BOINC_OPTIONS options; int retval, ntasks_completed; unsigned int i; double total_weight=0, weight_completed=0; double checkpoint_cpu_time; // total CPU time at last checkpoint #ifdef _WIN32 SetPriorityClass(GetCurrentProcess(), NORMAL_PRIORITY_CLASS); #endif for (int j=1; j (int)tasks.size()) { fprintf(stderr, "Checkpoint file: ntasks_completed too large: %d > %d\n", ntasks_completed, (int)tasks.size() ); boinc_finish(1); } for (i=0; i