// This file is part of BOINC. // http://boinc.berkeley.edu // Copyright (C) 2008 University of California // // BOINC is free software; you can redistribute it and/or modify it // under the terms of the GNU Lesser General Public License // as published by the Free Software Foundation, // either version 3 of the License, or (at your option) any later version. // // BOINC is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // See the GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with BOINC. If not, see . // monitoring and process control of running apps #include "cpp.h" #ifdef _WIN32 #include "boinc_win.h" #include "win_util.h" #ifdef _MSC_VER #define snprintf _snprintf #endif #ifndef STATUS_SUCCESS #define STATUS_SUCCESS 0x0 // may be in ntstatus.h #endif #ifndef STATUS_DLL_INIT_FAILED #define STATUS_DLL_INIT_FAILED 0xC0000142 // may be in ntstatus.h #endif #else #include "config.h" #include #include #if HAVE_SYS_IPC_H #include #endif #if HAVE_SYS_RESOURCE_H #include #endif #ifdef HAVE_CSIGNAL #include #elif defined(HAVE_SYS_SIGNAL_H) #include #elif defined(HAVE_SIGNAL_H) #include #endif #if HAVE_SYS_WAIT_H #include #endif #include #endif using std::vector; #include "filesys.h" #include "error_numbers.h" #include "util.h" #include "str_util.h" #include "parse.h" #include "shmem.h" #include "client_msgs.h" #include "client_state.h" #include "file_names.h" #include "procinfo.h" #include "sandbox.h" #include "app.h" #ifdef _WIN32 bool ACTIVE_TASK::kill_all_children() { unsigned int i,j; std::vector ps; std::vector tps; procinfo_setup(ps); PROCINFO pi; pi.id = pid; tps.push_back(pi); for (i=0; i < tps.size(); i++) { PROCINFO tp = tps[i]; for (j=0; j < ps.size(); j++) { PROCINFO p = ps[j]; if (tp.id == p.parentid) { if (TerminateProcessById(p.id)) { tps.push_back(p); } } } } return true; } #endif // Send a quit message. // int ACTIVE_TASK::request_exit() { if (!app_client_shm.shm) return 1; process_control_queue.msg_queue_send( "", app_client_shm.shm->process_control_request ); quit_time = gstate.now; return 0; } // Send an abort message. // int ACTIVE_TASK::request_abort() { if (!app_client_shm.shm) return 1; process_control_queue.msg_queue_send( "", app_client_shm.shm->process_control_request ); return 0; } // Kill the task by OS-specific means. // int ACTIVE_TASK::kill_task(bool restart) { #ifdef _WIN32 TerminateProcess(process_handle, 1); #else #ifdef SANDBOX kill_via_switcher(pid); // Also kill app directly, just to be safe // #endif kill(pid, SIGKILL); #endif cleanup_task(); if (restart) { set_task_state(PROCESS_UNINITIALIZED, "kill_task"); char buf[256]; sprintf(buf, "restarting %s", result->name); gstate.request_enforce_schedule(result->project, buf); } else { set_task_state(PROCESS_ABORTED, "kill_task"); } return 0; } // We have sent a quit request to the process; see if it's exited. // This is called when the core client exits, // or when a project is detached or reset // bool ACTIVE_TASK::has_task_exited() { bool exited = false; if (!process_exists()) return true; #ifdef _WIN32 unsigned long exit_code; if (GetExitCodeProcess(process_handle, &exit_code)) { if (exit_code != STILL_ACTIVE) { exited = true; } } #else if (waitpid(pid, 0, WNOHANG) == pid) { exited = true; } #endif if (exited) { set_task_state(PROCESS_EXITED, "has_task_exited"); cleanup_task(); } return exited; } static void limbo_message(ACTIVE_TASK& at) { #ifdef _WIN32 if (at.result->exit_status == STATUS_DLL_INIT_FAILED) { msg_printf(at.result->project, MSG_INFO, "Task %s exited with a DLL initialization error.", at.result->name ); msg_printf(at.result->project, MSG_INFO, "If this happens repeatedly you may need to reboot your computer." ); } else { #endif msg_printf(at.result->project, MSG_INFO, "Task %s exited with zero status but no 'finished' file", at.result->name ); msg_printf(at.result->project, MSG_INFO, "If this happens repeatedly you may need to reset the project." ); #ifdef _WIN32 } #endif } static void clear_backoffs(ACTIVE_TASK* atp) { int rt = atp->result->avp->rsc_type(); if (rt == RSC_TYPE_CPU) return; for (unsigned int i=0; iavp->rsc_type() == rt) { rp->schedule_backoff = 0; } } } // handle a task that exited prematurely (i.e. the job isn't done) // void ACTIVE_TASK::handle_premature_exit(bool& will_restart) { // if it exited because we sent it a quit message, don't count // if (task_state() == PROCESS_QUIT_PENDING) { set_task_state(PROCESS_UNINITIALIZED, "handle_premature_exit"); will_restart = true; return; } // otherwise keep count of exits; // restart it unless this happens 100 times w/o a checkpoint // premature_exit_count++; if (premature_exit_count > 100) { set_task_state(PROCESS_ABORTED, "handle_premature_exit"); result->exit_status = ERR_TOO_MANY_EXITS; gstate.report_result_error(*result, "too many exit(0)s"); result->set_state(RESULT_ABORTED, "handle_premature_exit"); } else { will_restart = true; limbo_message(*this); set_task_state(PROCESS_UNINITIALIZED, "handle_premature_exit"); } } // deal with a process that has exited, for whatever reason: // - completion // - crash // - preemption via quit // #ifdef _WIN32 void ACTIVE_TASK::handle_exited_app(unsigned long exit_code) { #else void ACTIVE_TASK::handle_exited_app(int stat) { #endif bool will_restart = false; if (log_flags.task_debug) { msg_printf(result->project, MSG_INFO, "[task] Process for %s exited", result->name ); } get_app_status_msg(); get_trickle_up_msg(); result->final_cpu_time = current_cpu_time; result->final_elapsed_time = elapsed_time; if (task_state() == PROCESS_ABORT_PENDING) { set_task_state(PROCESS_ABORTED, "handle_exited_app"); } else { #ifdef _WIN32 result->exit_status = exit_code; switch(exit_code) { case STATUS_SUCCESS: // if another process killed the app, it looks like exit(0). // So check for the finish file // if (finish_file_present()) { set_task_state(PROCESS_EXITED, "handle_exited_app"); break; } double x; if (temporary_exit_file_present(x)) { set_task_state(PROCESS_UNINITIALIZED, "temporary exit"); will_restart = true; result->schedule_backoff = gstate.now + x; break; } handle_premature_exit(will_restart); break; case 0xc000013a: // control-C?? case 0x40010004: // vista shutdown?? can someone explain this? case STATUS_DLL_INIT_FAILED: // This can happen because: // - The OS is shutting down, and attempting to start // any new application fails automatically. // - The OS has run out of desktop heap // - (reportedly) The computer has just come out of hibernation // handle_premature_exit(will_restart); break; default: char szError[1024]; set_task_state(PROCESS_EXITED, "handle_exited_app"); gstate.report_result_error( *result, "%s - exit code %d (0x%x)", windows_format_error_string(exit_code, szError, sizeof(szError)), exit_code, exit_code ); if (log_flags.task_debug) { msg_printf(result->project, MSG_INFO, "[task] Process for %s exited", result->name ); msg_printf(result->project, MSG_INFO, "[task] exit code %d (0x%x): %s", exit_code, exit_code, windows_format_error_string(exit_code, szError, sizeof(szError)) ); } break; } #else if (WIFEXITED(stat)) { result->exit_status = WEXITSTATUS(stat); if (result->exit_status) { set_task_state(PROCESS_EXITED, "handle_exited_app"); gstate.report_result_error( *result, "process exited with code %d (0x%x, %d)", result->exit_status, result->exit_status, (-1<<8)|result->exit_status ); } else { if (finish_file_present()) { set_task_state(PROCESS_EXITED, "handle_exited_app"); } else { handle_premature_exit(will_restart); } } if (log_flags.task_debug) { msg_printf(result->project, MSG_INFO, "[task] exit status %d\n", result->exit_status ); } } else if (WIFSIGNALED(stat)) { int got_signal = WTERMSIG(stat); if (log_flags.task_debug) { msg_printf(result->project, MSG_INFO, "[task] process got signal %d", signal ); } // if the process was externally killed, allow it to restart. // switch (got_signal) { case SIGHUP: case SIGINT: case SIGQUIT: case SIGKILL: case SIGTERM: case SIGSTOP: will_restart = true; set_task_state(PROCESS_UNINITIALIZED, "handle_exited_app"); limbo_message(*this); break; default: result->exit_status = stat; set_task_state(PROCESS_WAS_SIGNALED, "handle_exited_app"); signal = got_signal; gstate.report_result_error( *result, "process got signal %d", signal ); } } else { result->exit_status = -1; set_task_state(PROCESS_EXIT_UNKNOWN, "handle_exited_app"); gstate.report_result_error(*result, "process exit, unknown"); msg_printf(result->project, MSG_INTERNAL_ERROR, "process exited for unknown reason" ); } #endif } cleanup_task(); if (gstate.exit_after_finish) { exit(0); } if (!will_restart) { copy_output_files(); read_stderr_file(); client_clean_out_dir(slot_dir, "handle_exited_app()"); clear_backoffs(this); // clear scheduling backoffs of jobs waiting for GPU } gstate.request_schedule_cpus("application exited"); gstate.request_work_fetch("application exited"); } bool ACTIVE_TASK::finish_file_present() { char path[256]; sprintf(path, "%s/%s", slot_dir, BOINC_FINISH_CALLED_FILE); return (boinc_file_exists(path) != 0); } bool ACTIVE_TASK::temporary_exit_file_present(double& x) { char path[256]; sprintf(path, "%s/%s", slot_dir, TEMPORARY_EXIT_FILE); FILE* f = fopen(path, "r"); if (!f) return false; int y; int n = fscanf(f, "%d", &y); fclose(f); if (n != 1 || y < 0 || y > 86400) { x = 300; } else { x = y; } return true; } void ACTIVE_TASK_SET::send_trickle_downs() { unsigned int i; ACTIVE_TASK* atp; bool sent; for (i=0; iprocess_exists()) continue; if (atp->have_trickle_down) { if (!atp->app_client_shm.shm) continue; sent = atp->app_client_shm.shm->trickle_down.send_msg("\n"); if (sent) atp->have_trickle_down = false; } if (atp->send_upload_file_status) { if (!atp->app_client_shm.shm) continue; sent = atp->app_client_shm.shm->trickle_down.send_msg("\n"); if (sent) atp->send_upload_file_status = false; } } } void ACTIVE_TASK_SET::send_heartbeats() { unsigned int i; ACTIVE_TASK* atp; char buf[1024]; double ar = gstate.available_ram(); for (i=0; iprocess_exists()) continue; if (!atp->app_client_shm.shm) continue; snprintf(buf, sizeof(buf), "" "%e" "%e", atp->procinfo.working_set_size, ar ); bool sent = atp->app_client_shm.shm->heartbeat.send_msg(buf); if (log_flags.app_msg_send) { if (sent) { msg_printf(atp->result->project, MSG_INFO, "[app_msg_send] sent heartbeat to %s", atp->result->name ); } else { msg_printf(atp->result->project, MSG_INFO, "[app_msg_send] failed to send heartbeat to %s", atp->result->name ); } } } } void ACTIVE_TASK_SET::process_control_poll() { unsigned int i; ACTIVE_TASK* atp; for (i=0; iprocess_exists()) continue; if (!atp->app_client_shm.shm) continue; // if app has had the same message in its send buffer for 180 sec, // assume it's hung and restart it // if (atp->process_control_queue.timeout(180)) { if (log_flags.task_debug) { msg_printf(atp->result->project, MSG_INFO, "Restarting %s - message timeout", atp->result->name ); } atp->kill_task(true); } else { atp->process_control_queue.msg_queue_poll( atp->app_client_shm.shm->process_control_request ); } } } // See if any processes have exited // bool ACTIVE_TASK_SET::check_app_exited() { ACTIVE_TASK* atp; bool found = false; #ifdef _WIN32 unsigned long exit_code; unsigned int i; for (i=0; iprocess_exists()) continue; if (GetExitCodeProcess(atp->process_handle, &exit_code)) { if (exit_code != STILL_ACTIVE) { found = true; atp->handle_exited_app(exit_code); } } else { if (log_flags.task_debug) { char errmsg[1024]; msg_printf(atp->result->project, MSG_INFO, "[task] task %s GetExitCodeProcess() failed - %s GLE %d (0x%x)", atp->result->name, windows_format_error_string( GetLastError(), errmsg, sizeof(errmsg) ), GetLastError(), GetLastError() ); } // The process doesn't seem to be there. // Mark task as aborted so we don't check it again. // atp->set_task_state(PROCESS_ABORTED, "check_app_exited"); } } #else int pid, stat; if ((pid = waitpid(-1, &stat, WNOHANG)) > 0) { atp = lookup_pid(pid); if (!atp) { // if we're running benchmarks, exited process // is probably a benchmark process; don't show error // if (!gstate.are_cpu_benchmarks_running() && log_flags.task_debug) { msg_printf(NULL, MSG_INTERNAL_ERROR, "Process %d not found\n", pid ); } return false; } atp->handle_exited_app(stat); found = true; } #endif return found; } // if an app has exceeded its maximum disk usage, abort it // bool ACTIVE_TASK::check_max_disk_exceeded() { double disk_usage; int retval; retval = current_disk_usage(disk_usage); if (retval) { msg_printf(this->wup->project, MSG_INTERNAL_ERROR, "Can't get task disk usage: %s", boincerror(retval) ); } else { if (disk_usage > max_disk_usage) { msg_printf( result->project, MSG_INFO, "Aborting task %s: exceeded disk limit: %.2fMB > %.2fMB\n", result->name, disk_usage/MEGA, max_disk_usage/MEGA ); abort_task(ERR_RSC_LIMIT_EXCEEDED, "Maximum disk usage exceeded"); return true; } } return false; } // Check if any of the active tasks have exceeded their // resource limits on disk, CPU time or memory // bool ACTIVE_TASK_SET::check_rsc_limits_exceeded() { unsigned int i; ACTIVE_TASK *atp; static double last_disk_check_time = 0; bool do_disk_check = false; bool did_anything = false; double ram_left = gstate.available_ram(); double max_ram = gstate.max_available_ram(); // Some slot dirs have lots of files, // so only check every min(disk_interval, 300) secs // double min_interval = gstate.global_prefs.disk_interval; if (min_interval < 300) min_interval = 300; if (gstate.now > last_disk_check_time + min_interval) { do_disk_check = true; } for (i=0; itask_state() != PROCESS_EXECUTING) continue; if (!atp->result->project->non_cpu_intensive && (atp->elapsed_time > atp->max_elapsed_time)) { msg_printf(atp->result->project, MSG_INFO, "Aborting task %s: exceeded elapsed time limit %f\n", atp->result->name, atp->max_elapsed_time ); atp->abort_task(ERR_RSC_LIMIT_EXCEEDED, "Maximum elapsed time exceeded"); did_anything = true; continue; } if (atp->procinfo.working_set_size_smoothed > max_ram) { msg_printf(atp->result->project, MSG_INFO, "Aborting task %s: exceeded memory limit %.2fMB > %.2fMB\n", atp->result->name, atp->procinfo.working_set_size_smoothed/MEGA, max_ram/MEGA ); atp->abort_task(ERR_RSC_LIMIT_EXCEEDED, "Maximum memory exceeded"); did_anything = true; continue; } if (do_disk_check && atp->check_max_disk_exceeded()) { did_anything = true; continue; } ram_left -= atp->procinfo.working_set_size_smoothed; } if (ram_left < 0) { gstate.request_schedule_cpus("RAM usage limit exceeded"); } if (do_disk_check) { last_disk_check_time = gstate.now; } return did_anything; } // If process is running, send it an "abort" message, // Set a flag so that if it doesn't exit within 5 seconds, // kill it by OS-specific mechanism (e.g. KILL signal). // This is done when app has exceeded CPU, disk, or mem limits, // or when the user has requested it. // int ACTIVE_TASK::abort_task(int exit_status, const char* msg) { if (task_state() == PROCESS_EXECUTING || task_state() == PROCESS_SUSPENDED) { set_task_state(PROCESS_ABORT_PENDING, "abort_task"); abort_time = gstate.now; request_abort(); } else { set_task_state(PROCESS_ABORTED, "abort_task"); } result->exit_status = exit_status; gstate.report_result_error(*result, msg); result->set_state(RESULT_ABORTED, "abort_task"); return 0; } // check for the stderr file, copy to result record // bool ACTIVE_TASK::read_stderr_file() { std::string stderr_file; char path[256]; // truncate stderr output to the last 63KB; // it's unlikely that more than that will be useful // int max_len = 63*1024; sprintf(path, "%s/%s", slot_dir, STDERR_FILE); if (!boinc_file_exists(path)) return false; if (read_file_string(path, stderr_file, max_len, !config.stderr_head)) { return false; } result->stderr_out += "\n"; result->stderr_out += stderr_file; result->stderr_out += "\n\n"; return true; } // tell a running app to reread project preferences. // This is called when project prefs change, // or when a user file has finished downloading. // int ACTIVE_TASK::request_reread_prefs() { int retval; link_user_files(); retval = write_app_init_file(); if (retval) return retval; graphics_request_queue.msg_queue_send( xml_graphics_modes[MODE_REREAD_PREFS], app_client_shm.shm->graphics_request ); return 0; } // tell a running app to reread the app_info file // (e.g. because proxy settings have changed: this is for F@h) // int ACTIVE_TASK::request_reread_app_info() { int retval = write_app_init_file(); if (retval) return retval; process_control_queue.msg_queue_send( "", app_client_shm.shm->process_control_request ); return 0; } // tell all running apps of a project to reread prefs // void ACTIVE_TASK_SET::request_reread_prefs(PROJECT* project) { unsigned int i; ACTIVE_TASK* atp; for (i=0; iresult->project != project) continue; if (!atp->process_exists()) continue; atp->request_reread_prefs(); } } void ACTIVE_TASK_SET::request_reread_app_info() { for (unsigned int i=0; iprocess_exists()) continue; atp->request_reread_app_info(); } } // send quit signal to all tasks in the project // (or all tasks, if proj==0). // If they don't exit in 5 seconds, // send them a kill signal and wait up to 5 more seconds to exit. // This is called when the core client exits, // or when a project is detached or reset // int ACTIVE_TASK_SET::exit_tasks(PROJECT* proj) { request_tasks_exit(proj); // Wait 5 seconds for them to exit normally; if they don't then kill them // if (wait_for_exit(5, proj)) { kill_tasks(proj); } wait_for_exit(5, proj); // get final checkpoint_cpu_times // get_msgs(); gstate.request_schedule_cpus("exit_tasks"); return 0; } // Wait up to wait_time seconds for processes to exit // If proj is zero, wait for all processes, else that project's // NOTE: it's bad form to sleep, but it would be complex to avoid it here // int ACTIVE_TASK_SET::wait_for_exit(double wait_time, PROJECT* proj) { bool all_exited; unsigned int i,n; ACTIVE_TASK *atp; for (i=0; i<10; i++) { all_exited = true; for (n=0; nwup->project != proj) continue; if (!atp->has_task_exited()) { all_exited = false; break; } } if (all_exited) return 0; boinc_sleep(wait_time/10.0); } return ERR_NOT_EXITED; } int ACTIVE_TASK_SET::abort_project(PROJECT* project) { vector::iterator task_iter; ACTIVE_TASK* atp; exit_tasks(project); task_iter = active_tasks.begin(); while (task_iter != active_tasks.end()) { atp = *task_iter; if (atp->result->project == project) { task_iter = active_tasks.erase(task_iter); delete atp; } else { task_iter++; } } return 0; } // suspend all currently running tasks // called only from CLIENT_STATE::suspend_tasks(), // e.g. because on batteries, time of day, benchmarking, CPU throttle, etc. // void ACTIVE_TASK_SET::suspend_all(int reason) { for (unsigned int i=0; itask_state() != PROCESS_EXECUTING) continue; switch (reason) { case SUSPEND_REASON_CPU_THROTTLE: // if we're doing CPU throttling, don't bother suspending apps // that don't use a full CPU // if (atp->result->project->non_cpu_intensive) continue; if (atp->app_version->avg_ncpus < 1) continue; atp->preempt(REMOVE_NEVER); break; case SUSPEND_REASON_BENCHMARKS: atp->preempt(REMOVE_NEVER); break; case SUSPEND_REASON_CPU_USAGE: if (atp->result->project->non_cpu_intensive) break; // fall through default: atp->preempt(REMOVE_MAYBE_USER); } } } // resume all currently scheduled tasks // void ACTIVE_TASK_SET::unsuspend_all() { unsigned int i; ACTIVE_TASK* atp; for (i=0; ischeduler_state != CPU_SCHED_SCHEDULED) continue; if (atp->task_state() == PROCESS_UNINITIALIZED) { if (atp->start(false)) { msg_printf(atp->wup->project, MSG_INTERNAL_ERROR, "Couldn't restart task %s", atp->result->name ); } } else if (atp->task_state() == PROCESS_SUSPENDED) { atp->unsuspend(); } } } // Check to see if any tasks are running // called if benchmarking and waiting for suspends to happen // or the system needs to suspend itself so we are suspending // the applications // bool ACTIVE_TASK_SET::is_task_executing() { unsigned int i; ACTIVE_TASK* atp; for (i=0; itask_state() == PROCESS_EXECUTING) { return true; } } return false; } // Send quit message to all app processes // This is called when the core client exits, // or when a project is detached or reset // void ACTIVE_TASK_SET::request_tasks_exit(PROJECT* proj) { unsigned int i; ACTIVE_TASK *atp; for (i=0; iwup->project != proj) continue; if (!atp->process_exists()) continue; atp->request_exit(); } } // Send kill signal to all app processes // Don't wait for them to exit // void ACTIVE_TASK_SET::kill_tasks(PROJECT* proj) { unsigned int i; ACTIVE_TASK *atp; for (i=0; iwup->project != proj) continue; if (!atp->process_exists()) continue; atp->kill_task(false); } } // send a message // int ACTIVE_TASK::suspend() { if (!app_client_shm.shm) return 0; if (task_state() != PROCESS_EXECUTING) { msg_printf(result->project, MSG_INFO, "Internal error: expected process %s to be executing", result->name ); } int n = process_control_queue.msg_queue_purge(""); if (n == 0) { process_control_queue.msg_queue_send( "", app_client_shm.shm->process_control_request ); } set_task_state(PROCESS_SUSPENDED, "suspend"); return 0; } // resume a suspended task // int ACTIVE_TASK::unsuspend() { if (!app_client_shm.shm) return 0; if (task_state() != PROCESS_SUSPENDED) { msg_printf(result->project, MSG_INFO, "Internal error: expected process %s to be suspended", result->name ); } if (log_flags.cpu_sched) { msg_printf(result->project, MSG_INFO, "[cpu_sched] Resuming %s", result->name ); } int n = process_control_queue.msg_queue_purge(""); if (n == 0) { process_control_queue.msg_queue_send( "", app_client_shm.shm->process_control_request ); } set_task_state(PROCESS_EXECUTING, "unsuspend"); return 0; } void ACTIVE_TASK::send_network_available() { if (!app_client_shm.shm) return; process_control_queue.msg_queue_send( "", app_client_shm.shm->process_control_request ); return; } // See if the app has placed a new message in shared mem // (with CPU done, frac done etc.) // If so parse it and return true. // bool ACTIVE_TASK::get_app_status_msg() { char msg_buf[MSG_CHANNEL_SIZE]; double fd; if (!app_client_shm.shm) { msg_printf(result->project, MSG_INFO, "Task %s: no shared memory segment", result->name ); return false; } if (!app_client_shm.shm->app_status.get_msg(msg_buf)) { return false; } if (log_flags.app_msg_receive) { msg_printf(this->wup->project, MSG_INFO, "[app_msg_receive] got msg from slot %d: %s", slot, msg_buf ); } want_network = 0; current_cpu_time = checkpoint_cpu_time = 0.0; if (parse_double(msg_buf, "", fd)) { // fraction_done will be reported as zero // until the app's first call to boinc_fraction_done(). // So ignore zeros. // if (fd) fraction_done = fd; } parse_double(msg_buf, "", current_cpu_time); parse_double(msg_buf, "", checkpoint_cpu_time); parse_double(msg_buf, "", result->fpops_per_cpu_sec); parse_double(msg_buf, "", result->fpops_cumulative); parse_double(msg_buf, "", result->intops_per_cpu_sec); parse_double(msg_buf, "", result->intops_cumulative); parse_int(msg_buf, "", want_network); if (current_cpu_time < 0) { msg_printf(result->project, MSG_INFO, "app reporting negative CPU: %f", current_cpu_time ); current_cpu_time = 0; } if (checkpoint_cpu_time < 0) { msg_printf(result->project, MSG_INFO, "app reporting negative checkpoint CPU: %f", checkpoint_cpu_time ); checkpoint_cpu_time = 0; } return true; } bool ACTIVE_TASK::get_trickle_up_msg() { char msg_buf[MSG_CHANNEL_SIZE]; bool found = false; int retval; if (!app_client_shm.shm) return false; if (app_client_shm.shm->trickle_up.get_msg(msg_buf)) { if (match_tag(msg_buf, "")) { retval = move_trickle_file(); if (!retval) { wup->project->trickle_up_pending = true; } } if (match_tag(msg_buf, "")) { handle_upload_files(); } found = true; } return found; } // check for msgs from active tasks. // void ACTIVE_TASK_SET::get_msgs() { unsigned int i; ACTIVE_TASK *atp; double old_time; static double last_time=0; double delta_t; if (last_time) { delta_t = gstate.now - last_time; // Normally this is called every second. // If delta_t is > 10, we'll assume that a period of hibernation // or suspension happened, and treat it as zero. // If negative, must be clock reset. Ignore. // if (delta_t > 10 || delta_t < 0) { delta_t = 0; } } else { delta_t = 0; } last_time = gstate.now; for (i=0; iprocess_exists()) continue; old_time = atp->checkpoint_cpu_time; if (atp->task_state() == PROCESS_EXECUTING) { atp->elapsed_time += delta_t; } if (atp->get_app_status_msg()) { if (old_time != atp->checkpoint_cpu_time) { char buf[256]; sprintf(buf, "%s checkpointed", atp->result->name); gstate.request_enforce_schedule(atp->result->project, buf); atp->checkpoint_wall_time = gstate.now; atp->premature_exit_count = 0; atp->checkpoint_elapsed_time = atp->elapsed_time; if (log_flags.task_debug) { msg_printf(atp->wup->project, MSG_INFO, "[task] result %s checkpointed", atp->result->name ); } else if (log_flags.checkpoint_debug) { msg_printf(atp->wup->project, MSG_INFO, "[checkpoint] result %s checkpointed", atp->result->name ); } atp->write_task_state_file(); } } atp->get_trickle_up_msg(); } } // write checkpoint state to a file in the slot dir // (this avoids rewriting the state file on each checkpoint) // void ACTIVE_TASK::write_task_state_file() { char path[1024]; sprintf(path, "%s/%s", slot_dir, TASK_STATE_FILENAME); FILE* f = fopen(path, "w"); if (!f) return; fprintf(f, "\n" " %s\n" " %s\n" " %f\n" " %f\n" " %f\n" "\n", result->project->master_url, result->name, checkpoint_cpu_time, checkpoint_elapsed_time, fraction_done ); fclose(f); } // called on startup; read the task state file in case it's more recent // then the main state file // void ACTIVE_TASK::read_task_state_file() { char buf[4096], path[1024], s[1024]; sprintf(path, "%s/%s", slot_dir, TASK_STATE_FILENAME); FILE* f = fopen(path, "r"); if (!f) return; buf[0] = 0; fread(buf, 1, 4096, f); fclose(f); buf[4095] = 0; double x; // sanity checks - project and result name must match // if (!parse_str(buf, "", s, sizeof(s))) { return; } if (strcmp(s, result->project->master_url)) { return; } if (!parse_str(buf, "", s, sizeof(s))) { return; } if (strcmp(s, result->name)) { return; } if (parse_double(buf, "", x)) { if (x > checkpoint_cpu_time) { checkpoint_cpu_time = x; } } if (parse_double(buf, "", x)) { if (x > checkpoint_elapsed_time) { checkpoint_elapsed_time = x; } } }