// The contents of this file are subject to the BOINC Public License // Version 1.0 (the "License"); you may not use this file except in // compliance with the License. You may obtain a copy of the License at // http://boinc.berkeley.edu/license_1.0.txt // // Software distributed under the License is distributed on an "AS IS" // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the // License for the specific language governing rights and limitations // under the License. // // The Original Code is the Berkeley Open Infrastructure for Network Computing. // // The Initial Developer of the Original Code is the SETI@home project. // Portions created by the SETI@home project are Copyright (C) 2002 // University of California at Berkeley. All Rights Reserved. // // Contributor(s): // // monitoring and process control of running apps #include "cpp.h" #ifdef _WIN32 #include "boinc_win.h" #else #include #include #if HAVE_SYS_IPC_H #include #endif #if HAVE_SYS_RESOURCE_H #include #endif #if HAVE_SYS_SIGNAL_H #include #endif #if HAVE_SYS_WAIT_H #include #endif #endif using std::vector; #include "filesys.h" #include "error_numbers.h" #include "util.h" #include "parse.h" #include "shmem.h" #include "client_msgs.h" #include "client_state.h" #include "file_names.h" #include "app.h" bool ACTIVE_TASK::process_exists() { if (state == PROCESS_EXECUTING) return true; if (state == PROCESS_SUSPENDED) return true; if (state == PROCESS_ABORT_PENDING) return true; return false; } // Send a quit message. // int ACTIVE_TASK::request_exit() { if (!app_client_shm.shm) return 1; process_control_queue.msg_queue_send( "", app_client_shm.shm->process_control_request ); return 0; } // send a kill signal. // This is not caught by the process // int ACTIVE_TASK::kill_task() { #ifdef _WIN32 return !TerminateProcess(pid_handle, -1); #else return kill(pid, SIGKILL); #endif } #if !defined(HAVE_WAIT4) && defined(HAVE_WAIT3) #include struct proc_info_t { int status; rusage r; proc_info_t() {}; proc_info_t(int s, const rusage &ru); }; proc_info_t::proc_info_t(int s, const rusage &ru) : status(s), r(ru) {} pid_t wait4(pid_t pid, int *statusp, int options, struct rusage *rusagep) { static std::map proc_info; pid_t tmp_pid=0; if (!pid) { return wait3(statusp,options,rusagep); } else { if (proc_info.find(pid) == proc_info.end()) { do { tmp_pid=wait3(statusp,options,rusagep); if ((tmp_pid>0) && (tmp_pid != pid)) { proc_info[tmp_pid]=proc_info_t(*statusp,*rusagep); if (!(options && WNOHANG)) { tmp_pid=0; } } else { return pid; } } while (!tmp_pid); } else { *statusp=proc_info[pid].status; *rusagep=proc_info[pid].r; proc_info.erase(pid); return pid; } } } #endif // We have sent a quit request to the process; see if it's exited. // This is called when the core client exits, // or when a project is detached or reset // bool ACTIVE_TASK::has_task_exited() { bool exited = false; if (!process_exists()) return true; #ifdef _WIN32 unsigned long exit_code; if (GetExitCodeProcess(pid_handle, &exit_code)) { if (exit_code != STILL_ACTIVE) { exited = true; } } #else int my_pid, stat; struct rusage rs; my_pid = wait4(pid, &stat, WNOHANG, &rs); if (my_pid == pid) { exited = true; } #endif if (exited) { state = PROCESS_EXITED; } return exited; } // preempt this task // called from the CLIENT_STATE::schedule_cpus() // if quit_task is true always do this by quitting (we're low on swap space) // int ACTIVE_TASK::preempt(bool quit_task) { int retval; if (quit_task) { retval = request_exit(); pending_suspend_via_quit = true; } else { retval = suspend(); } scheduler_state = CPU_SCHED_PREEMPTED; msg_printf(result->project, MSG_INFO, "Pausing result %s (%s)", result->name, (quit_task ? "removed from memory" : "left in memory") ); return 0; } static void limbo_message(ACTIVE_TASK& at) { msg_printf(at.result->project, MSG_INFO, "Result %s exited with zero status but no 'finished' file", at.result->name ); msg_printf(at.result->project, MSG_INFO, "If this happens repeatedly you may need to reset the project." ); } // deal with a process that has exited, for whatever reason // (including preemption) // #ifdef _WIN32 bool ACTIVE_TASK::handle_exited_app(unsigned long exit_code) { get_app_status_msg(); get_trickle_up_msg(); result->final_cpu_time = checkpoint_cpu_time; if (state == PROCESS_ABORT_PENDING) { state = PROCESS_ABORTED; } else { state = PROCESS_EXITED; exit_status = exit_code; if (exit_code) { char szError[1024]; gstate.report_result_error( *result, "%s - exit code %d (0x%x)", windows_format_error_string(exit_code, szError, sizeof(szError)), exit_code, exit_code ); } else { if (pending_suspend_via_quit) { pending_suspend_via_quit = false; state = PROCESS_UNINITIALIZED; close_process_handles(); return true; } if (!finish_file_present()) { scheduler_state = CPU_SCHED_PREEMPTED; state = PROCESS_UNINITIALIZED; close_process_handles(); limbo_message(*this); return true; } } result->exit_status = exit_status; } if (app_client_shm.shm) { detach_shmem(shm_handle, app_client_shm.shm); app_client_shm.shm = NULL; } read_stderr_file(); clean_out_dir(slot_dir); return true; } #else bool ACTIVE_TASK::handle_exited_app(int stat) { SCOPE_MSG_LOG scope_messages(log_messages, CLIENT_MSG_LOG::DEBUG_TASK); get_app_status_msg(); get_trickle_up_msg(); result->final_cpu_time = checkpoint_cpu_time; if (state == PROCESS_ABORT_PENDING) { state = PROCESS_ABORTED; } else { if (WIFEXITED(stat)) { state = PROCESS_EXITED; exit_status = WEXITSTATUS(stat); if (exit_status) { gstate.report_result_error( *result, "process exited with code %d (0x%x)", exit_status, exit_status ); } else { // check for cases where an app exits // without it being done from core client's point of view; // in these cases, don't clean out slot dir // if (pending_suspend_via_quit) { pending_suspend_via_quit = false; state = PROCESS_UNINITIALIZED; // destroy shm, since restarting app will re-create it // detach_and_destroy_shmem(); return true; } if (!finish_file_present()) { // The process looks like it exited normally // but there's no "finish file". // Assume it was externally killed, // and just leave it there // (assume user is about to exit core client) // #if 0 state = PROCESS_IN_LIMBO; #else scheduler_state = CPU_SCHED_PREEMPTED; state = PROCESS_UNINITIALIZED; detach_and_destroy_shmem(); #endif limbo_message(*this); return true; } } result->exit_status = exit_status; scope_messages.printf( "ACTIVE_TASK::handle_exited_app(): process exited: status %d\n", exit_status ); } else if (WIFSIGNALED(stat)) { int got_signal = WTERMSIG(stat); // if the process was externally killed, allow it to restart. // switch(got_signal) { case SIGHUP: case SIGINT: case SIGQUIT: case SIGKILL: case SIGTERM: case SIGSTOP: state = PROCESS_IN_LIMBO; limbo_message(*this); return true; } exit_status = stat; result->exit_status = exit_status; state = PROCESS_WAS_SIGNALED; signal = got_signal; gstate.report_result_error( *result, "process got signal %d", signal ); scope_messages.printf("ACTIVE_TASK::handle_exited_app(): process got signal %d\n", signal); } else { state = PROCESS_EXIT_UNKNOWN; result->state = PROCESS_EXIT_UNKNOWN; } } read_stderr_file(); clean_out_dir(slot_dir); return true; } #endif bool ACTIVE_TASK::finish_file_present() { char path[256]; sprintf(path, "%s%s%s", slot_dir, PATH_SEPARATOR, BOINC_FINISH_CALLED_FILE); return boinc_file_exists(path); } void ACTIVE_TASK_SET::send_trickle_downs() { unsigned int i; ACTIVE_TASK* atp; bool sent; for (i=0; iprocess_exists()) continue; if (atp->have_trickle_down) { if (!atp->app_client_shm.shm) continue; sent = atp->app_client_shm.shm->trickle_down.send_msg("\n"); if (sent) atp->have_trickle_down = false; } } } void ACTIVE_TASK_SET::send_heartbeats() { unsigned int i; ACTIVE_TASK* atp; for (i=0; iprocess_exists()) continue; if (!atp->app_client_shm.shm) continue; atp->app_client_shm.shm->heartbeat.send_msg("\n"); } } void ACTIVE_TASK_SET::process_control_poll() { unsigned int i; ACTIVE_TASK* atp; for (i=0; iprocess_exists()) continue; if (!atp->app_client_shm.shm) continue; atp->process_control_queue.msg_queue_poll( atp->app_client_shm.shm->process_control_request ); } } // See if any processes have exited // bool ACTIVE_TASK_SET::check_app_exited() { ACTIVE_TASK* atp; bool found = false; SCOPE_MSG_LOG scope_messages(log_messages, CLIENT_MSG_LOG::DEBUG_TASK); #ifdef _WIN32 unsigned long exit_code; unsigned int i; for (i=0; iprocess_exists()) continue; if (GetExitCodeProcess(atp->pid_handle, &exit_code)) { if (exit_code != STILL_ACTIVE) { scope_messages.printf("ACTIVE_TASK_SET::check_app_exited(): Process exited with code %d\n", exit_code); found = true; atp->handle_exited_app(exit_code); } } } #else int pid; int stat; struct rusage rs; if ((pid = wait4(0, &stat, WNOHANG, &rs)) > 0) { scope_messages.printf("ACTIVE_TASK_SET::check_app_exited(): process %d is done\n", pid); atp = lookup_pid(pid); if (!atp) { msg_printf(NULL, MSG_ERROR, "ACTIVE_TASK_SET::check_app_exited(): pid %d not found\n", pid); return false; } atp->handle_exited_app(stat); found = true; } #endif if (found) gstate.must_schedule_cpus = true; return found; } // if an app has exceeded its maximum CPU time, abort it // bool ACTIVE_TASK::check_max_cpu_exceeded() { if (current_cpu_time > max_cpu_time) { msg_printf(result->project, MSG_INFO, "Aborting result %s: exceeded CPU time limit %f\n", result->name, max_cpu_time ); abort_task("Maximum CPU time exceeded"); return true; } return false; } // if an app has exceeded its maximum disk usage, abort it // bool ACTIVE_TASK::check_max_disk_exceeded() { double disk_usage; int retval; // don't do disk check too often // retval = current_disk_usage(disk_usage); if (retval) { msg_printf(0, MSG_ERROR, "Can't get application disk usage: %d", retval); } else { if (disk_usage > max_disk_usage) { msg_printf( result->project, MSG_INFO, "Aborting result %s: exceeded disk limit: %f > %f\n", result->name, disk_usage, max_disk_usage ); abort_task("Maximum disk usage exceeded"); return true; } } return false; } #if 0 // if an app has exceeded its maximum allowed memory, abort it // bool ACTIVE_TASK::check_max_mem_exceeded() { // TODO: calculate working set size elsewhere if (working_set_size > max_mem_usage || working_set_size/1048576 > gstate.global_prefs.max_memory_mbytes) { msg_printf( result->project, MSG_INFO, "Aborting result %s: exceeded memory limit %f\n", result->name, min(max_mem_usage, gstate.global_prefs.max_memory_mbytes*1048576) ); abort_task("Maximum memory usage exceeded"); return true; } return false; } #endif bool ACTIVE_TASK::check_max_mem_exceeded() { if (max_mem_usage != 0 && resident_set_size*1024 > max_mem_usage) { msg_printf( result->project, MSG_INFO, "Aborting result %s: exceeded memory limit %f\n", result->name, max_mem_usage ); abort_task("Maximum memory usage exceeded"); return true; } return false; } bool ACTIVE_TASK_SET::vm_limit_exceeded(double vm_limit) { unsigned int i; ACTIVE_TASK *atp; double total_vm_usage = 0; for (i=0; iprocess_exists()) continue; total_vm_usage += atp->vm_size; } return (total_vm_usage > vm_limit); } // Check if any of the active tasks have exceeded their // resource limits on disk, CPU time or memory // bool ACTIVE_TASK_SET::check_rsc_limits_exceeded() { unsigned int j; ACTIVE_TASK *atp; static time_t last_disk_check_time = 0; for (j=0;jstate != PROCESS_EXECUTING) continue; if (atp->check_max_cpu_exceeded()) return true; else if (atp->check_max_mem_exceeded()) return true; else if (time(0)>last_disk_check_time + gstate.global_prefs.disk_interval) { last_disk_check_time = time(0); if (atp->check_max_disk_exceeded()) return true; } } return false; } // If process is running, send it a kill signal // This is done when app has exceeded CPU, disk, or mem limits // int ACTIVE_TASK::abort_task(char* msg) { if (state == PROCESS_EXECUTING || state == PROCESS_SUSPENDED) { state = PROCESS_ABORT_PENDING; kill_task(); } else { state = PROCESS_ABORTED; } gstate.report_result_error(*result, msg); return 0; } // check for the stderr file, copy to result record // bool ACTIVE_TASK::read_stderr_file() { char stderr_file[MAX_BLOB_LEN]; char path[256]; int n; sprintf(path, "%s%s%s", slot_dir, PATH_SEPARATOR, STDERR_FILE); if (boinc_file_exists(path)) { FILE* f = fopen(path, "r"); n = fread(stderr_file, 1, sizeof(stderr_file)-1, f); fclose(f); if (n < 0) return false; stderr_file[n] = '\0'; result->stderr_out += "\n"; result->stderr_out += stderr_file; const char* stderr_txt_close = "\n\n"; // truncate stderr output to 64KB; // it's unlikely that more than that will be useful // result->stderr_out = result->stderr_out.substr( 0, MAX_BLOB_LEN-1-strlen(stderr_txt_close) ); result->stderr_out += stderr_txt_close; return true; } return false; } // tell a running app to reread project preferences. // This is called when project prefs change, // or when a user file has finished downloading. // int ACTIVE_TASK::request_reread_prefs() { int retval; link_user_files(); retval = write_app_init_file(); if (retval) return retval; if (!app_client_shm.shm) return 0; app_client_shm.shm->graphics_request.send_msg( xml_graphics_modes[MODE_REREAD_PREFS] ); return 0; } // tell all running apps of a project to reread prefs // void ACTIVE_TASK_SET::request_reread_prefs(PROJECT* project) { unsigned int i; ACTIVE_TASK* atp; for (i=0; iresult->project != project) continue; if (!atp->process_exists()) continue; atp->request_reread_prefs(); } } // send quit signal to all tasks in the project // (or all tasks, if proj==0). // If they don't exit in 5 seconds, // send them a kill signal and wait up to 5 more seconds to exit. // This is called when the core client exits, // or when a project is detached or reset // int ACTIVE_TASK_SET::exit_tasks(PROJECT* proj) { request_tasks_exit(proj); // Wait 5 seconds for them to exit normally; if they don't then kill them // if (wait_for_exit(5, proj)) { kill_tasks(proj); } wait_for_exit(5, proj); // get final checkpoint_cpu_times // get_msgs(); return 0; } // Wait up to wait_time seconds for processes to exit // If proj is zero, wait for all processes, else that project's // NOTE: it's bad form to sleep, but it would be complex to avoid it here // int ACTIVE_TASK_SET::wait_for_exit(double wait_time, PROJECT* proj) { bool all_exited; unsigned int i,n; ACTIVE_TASK *atp; for (i=0; i<10; i++) { all_exited = true; for (n=0; nwup->project != proj) continue; if (!atp->has_task_exited()) { all_exited = false; break; } } if (all_exited) return 0; boinc_sleep(wait_time/10.0); } return ERR_NOT_EXITED; } int ACTIVE_TASK_SET::abort_project(PROJECT* project) { vector::iterator task_iter; ACTIVE_TASK* atp; exit_tasks(project); task_iter = active_tasks.begin(); while (task_iter != active_tasks.end()) { atp = *task_iter; if (atp->result->project == project) { task_iter = active_tasks.erase(task_iter); delete atp; } else { task_iter++; } } return 0; } // Find the ACTIVE_TASK in the current set with the matching PID // ACTIVE_TASK* ACTIVE_TASK_SET::lookup_pid(int pid) { unsigned int i; ACTIVE_TASK* atp; for (i=0; ipid == pid) return atp; } return NULL; } // Find the ACTIVE_TASK in the current set with the matching result // ACTIVE_TASK* ACTIVE_TASK_SET::lookup_result(RESULT* result) { unsigned int i; ACTIVE_TASK* atp; for (i=0; iresult == result) { return atp; } } return NULL; } // suspend all currently running tasks // called only from CLIENT_STATE::suspend_activities(), // e.g. because on batteries, time of day, benchmarking, etc. // void ACTIVE_TASK_SET::suspend_all(bool leave_apps_in_memory) { unsigned int i; ACTIVE_TASK* atp; for (i=0; istate != PROCESS_EXECUTING) continue; if (leave_apps_in_memory) { atp->suspend(); } else { atp->request_exit(); atp->pending_suspend_via_quit = true; } } } // resume all currently running tasks // void ACTIVE_TASK_SET::unsuspend_all() { unsigned int i; ACTIVE_TASK* atp; for (i=0; ischeduler_state != CPU_SCHED_SCHEDULED) continue; if (atp->state == PROCESS_UNINITIALIZED) { if (atp->start(false)) { msg_printf( atp->wup->project, MSG_ERROR, "ACTIVE_TASK_SET::unsuspend_all(): could not restart active_task" ); } } else if (atp->state == PROCESS_SUSPENDED) { atp->unsuspend(); } } } // Check to see if any tasks are running // called if benchmarking and waiting for suspends to happen // bool ACTIVE_TASK_SET::is_task_executing() { unsigned int i; ACTIVE_TASK* atp; for (i=0; istate == PROCESS_EXECUTING) { return true; } } return false; } // Send quit signal to all app processes // This is called when the core client exits, // or when a project is detached or reset // void ACTIVE_TASK_SET::request_tasks_exit(PROJECT* proj) { unsigned int i; ACTIVE_TASK *atp; for (i=0; iwup->project != proj) continue; if (!atp->process_exists()) continue; atp->request_exit(); } } // Send kill signal to all app processes // Don't wait for them to exit // void ACTIVE_TASK_SET::kill_tasks(PROJECT* proj) { unsigned int i; ACTIVE_TASK *atp; for (i=0; iwup->project != proj) continue; if (!atp->process_exists()) continue; atp->kill_task(); } } // suspend a task // int ACTIVE_TASK::suspend() { if (!app_client_shm.shm) return 0; process_control_queue.msg_queue_send( "", app_client_shm.shm->process_control_request ); state = PROCESS_SUSPENDED; return 0; } // resume a suspended task // int ACTIVE_TASK::unsuspend() { if (!app_client_shm.shm) return 0; process_control_queue.msg_queue_send( "", app_client_shm.shm->process_control_request ); state = PROCESS_EXECUTING; return 0; } // See if the app has placed a new message in shared mem // (with CPU done, frac done etc.) // If so parse it and return true. // bool ACTIVE_TASK::get_app_status_msg() { char msg_buf[MSG_CHANNEL_SIZE]; bool found = false; if (!app_client_shm.shm) return false; if (app_client_shm.shm->app_status.get_msg(msg_buf)) { fraction_done = current_cpu_time = checkpoint_cpu_time = 0.0; parse_double(msg_buf, "", fraction_done); parse_double(msg_buf, "", current_cpu_time); parse_double(msg_buf, "", checkpoint_cpu_time); parse_double(msg_buf, "", vm_size); parse_double(msg_buf, "", resident_set_size); found = true; } return found; } bool ACTIVE_TASK::get_trickle_up_msg() { char msg_buf[MSG_CHANNEL_SIZE]; bool found = false; int retval; if (!app_client_shm.shm) return false; if (app_client_shm.shm->trickle_up.get_msg(msg_buf)) { if (match_tag(msg_buf, "")) { retval = move_trickle_file(); if (!retval) { wup->project->sched_rpc_pending = true; } } found = true; } return found; } // check for msgs from active tasks. // Return true if any of them has changed its checkpoint_cpu_time // (since in that case we need to write state file) // bool ACTIVE_TASK_SET::get_msgs() { unsigned int i; ACTIVE_TASK *atp; double old_time; bool action = false; for (i=0; iprocess_exists()) continue; old_time = atp->checkpoint_cpu_time; if (atp->get_app_status_msg()) { //atp->estimate_frac_rate_of_change(dtime()); if (old_time != atp->checkpoint_cpu_time) { action = true; } } atp->get_trickle_up_msg(); } return action; }