// This file is part of BOINC. // http://boinc.berkeley.edu // Copyright (C) 2008 University of California // // BOINC is free software; you can redistribute it and/or modify it // under the terms of the GNU Lesser General Public License // as published by the Free Software Foundation, // either version 3 of the License, or (at your option) any later version. // // BOINC is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // See the GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with BOINC. If not, see . // Abstraction of a set of executing applications, // connected to I/O files in various ways. // Shouldn't depend on CLIENT_STATE. #include "cpp.h" #ifdef _WIN32 #include "boinc_win.h" #else #include "config.h" #endif #ifndef _WIN32 #include #if HAVE_SYS_WAIT_H #include #endif #if HAVE_SYS_TIME_H #include #endif #if HAVE_SYS_RESOURCE_H #include #endif #if HAVE_SYS_TYPES_H #include #endif #if HAVE_FCNTL_H #include #endif #include #include #include #include #include #endif #include "client_state.h" #include "error_numbers.h" #include "filesys.h" #include "file_names.h" #include "parse.h" #include "shmem.h" #include "str_util.h" #include "client_msgs.h" #include "procinfo.h" #include "sandbox.h" #include "app.h" using std::max; using std::min; bool exclusive_app_running; bool exclusive_gpu_app_running; int gpu_suspend_reason; double non_boinc_cpu_usage; #define ABORT_TIMEOUT 60 // if we send app request, wait this long before killing it. // This gives it time to download symbol files (which can be several MB) // and write stack trace to stderr #define QUIT_TIMEOUT 10 // Same, for . Shorter because no stack trace is generated ACTIVE_TASK::~ACTIVE_TASK() { } // preempt this task; // called from the CLIENT_STATE::enforce_schedule() // and ACTIVE_TASK_SET::suspend_all() // int ACTIVE_TASK::preempt(int preempt_type) { int retval; bool remove=false; switch (preempt_type) { case REMOVE_NEVER: remove = false; break; case REMOVE_MAYBE_USER: case REMOVE_MAYBE_SCHED: // GPU jobs: always remove from mem, since it's tying up GPU RAM // if (result->uses_coprocs()) { remove = true; break; } // if it's never checkpointed, leave in mem // if (checkpoint_elapsed_time == 0) { remove = false; break; } // otherwise obey user prefs // remove = !gstate.global_prefs.leave_apps_in_memory; break; case REMOVE_ALWAYS: remove = true; break; } if (remove) { if (log_flags.cpu_sched) { msg_printf(result->project, MSG_INFO, "[cpu_sched] Preempting %s (removed from memory)", result->name ); } set_task_state(PROCESS_QUIT_PENDING, "preempt"); retval = request_exit(); } else { if (log_flags.cpu_sched) { msg_printf(result->project, MSG_INFO, "[cpu_sched] Preempting %s (left in memory)", result->name ); } retval = suspend(); } return 0; } #ifndef SIM ACTIVE_TASK::ACTIVE_TASK() { result = NULL; wup = NULL; app_version = NULL; pid = 0; slot = 0; _task_state = PROCESS_UNINITIALIZED; scheduler_state = CPU_SCHED_UNINITIALIZED; signal = 0; strcpy(slot_dir, ""); graphics_mode_acked = MODE_UNSUPPORTED; graphics_mode_ack_timeout = 0; fraction_done = 0; run_interval_start_wall_time = gstate.now; checkpoint_cpu_time = 0; checkpoint_wall_time = 0; current_cpu_time = 0; once_ran_edf = false; elapsed_time = 0; checkpoint_elapsed_time = 0; have_trickle_down = false; send_upload_file_status = false; too_large = false; needs_shmem = false; want_network = 0; premature_exit_count = 0; quit_time = 0; memset(&procinfo, 0, sizeof(procinfo)); #ifdef _WIN32 process_handle = NULL; shm_handle = NULL; #endif premature_exit_count = 0; } static const char* task_state_name(int val) { switch (val) { case PROCESS_UNINITIALIZED: return "UNINITIALIZED"; case PROCESS_EXECUTING: return "EXECUTING"; case PROCESS_SUSPENDED: return "SUSPENDED"; case PROCESS_ABORT_PENDING: return "ABORT_PENDING"; case PROCESS_EXITED: return "EXITED"; case PROCESS_WAS_SIGNALED: return "WAS_SIGNALED"; case PROCESS_EXIT_UNKNOWN: return "EXIT_UNKNOWN"; case PROCESS_ABORTED: return "ABORTED"; case PROCESS_COULDNT_START: return "COULDNT_START"; case PROCESS_QUIT_PENDING: return "QUIT_PENDING"; } return "Unknown"; } void ACTIVE_TASK::set_task_state(int val, const char* where) { _task_state = val; if (log_flags.task_debug) { msg_printf(result->project, MSG_INFO, "[task] task_state=%s for %s from %s", task_state_name(val), result->name, where ); } } // called when a process has exited or we've killed it // void ACTIVE_TASK::cleanup_task() { #ifdef _WIN32 if (process_handle) { CloseHandle(process_handle); process_handle = NULL; } // detach from shared mem. // This will destroy shmem seg since we're the last attachment // if (app_client_shm.shm) { detach_shmem(shm_handle, app_client_shm.shm); app_client_shm.shm = NULL; } #else int retval; if (app_client_shm.shm) { #ifndef __EMX__ if (app_version->api_major_version() >= 6) { retval = detach_shmem_mmap(app_client_shm.shm, sizeof(SHARED_MEM)); } else #endif { retval = detach_shmem(app_client_shm.shm); if (retval) { msg_printf(wup->project, MSG_INTERNAL_ERROR, "Couldn't detach shared memory: %s", boincerror(retval) ); } retval = destroy_shmem(shmem_seg_name); if (retval) { msg_printf(wup->project, MSG_INTERNAL_ERROR, "Couldn't destroy shared memory: %s", boincerror(retval) ); } } app_client_shm.shm = NULL; gstate.retry_shmem_time = 0; } #endif // clear backoff for app's resource; // this addresses the situation where the project has a // "max # jobs in progress" limit, and we're backed off because of that // if (app_version->ncudas) { result->project->cuda_pwf.clear_backoff(); } else if (app_version->natis) { result->project->ati_pwf.clear_backoff(); } else { result->project->cpu_pwf.clear_backoff(); } if (config.exit_after_finish) { exit(0); } } int ACTIVE_TASK::init(RESULT* rp) { result = rp; wup = rp->wup; app_version = rp->avp; max_elapsed_time = rp->wup->rsc_fpops_bound/rp->avp->flops; max_disk_usage = rp->wup->rsc_disk_bound; max_mem_usage = rp->wup->rsc_memory_bound; get_slot_dir(slot, slot_dir, sizeof(slot_dir)); relative_to_absolute(slot_dir, slot_path); return 0; } // Deallocate memory to prevent unneeded reporting of memory leaks // void ACTIVE_TASK_SET::free_mem() { vector::iterator at_iter; ACTIVE_TASK *at; at_iter = active_tasks.begin(); while (at_iter != active_tasks.end()) { at = active_tasks[0]; at_iter = active_tasks.erase(at_iter); delete at; } } bool app_running(vector& piv, const char* p) { for (unsigned int i=0; i piv; retval = procinfo_setup(piv); if (retval) { if (log_flags.mem_usage_debug) { msg_printf(NULL, MSG_INTERNAL_ERROR, "[mem_usage] procinfo_setup() returned %d", retval ); } return; } for (i=0; ischeduler_state == CPU_SCHED_SCHEDULED) { PROCINFO& pi = atp->procinfo; unsigned long last_page_fault_count = pi.page_fault_count; memset(&pi, 0, sizeof(pi)); pi.id = atp->pid; procinfo_app(pi, piv, atp->app_version->graphics_exec_file); pi.working_set_size_smoothed = .5*pi.working_set_size_smoothed + pi.working_set_size; int pf = pi.page_fault_count - last_page_fault_count; pi.page_fault_rate = pf/diff; if (log_flags.mem_usage_debug) { msg_printf(atp->result->project, MSG_INFO, "[mem_usage] %s: RAM %.2fMB, page %.2fMB, %.2f page faults/sec, user CPU %.3f, kernel CPU %.3f", atp->result->name, pi.working_set_size/MEGA, pi.swap_size/MEGA, pi.page_fault_rate, pi.user_time, pi.kernel_time ); } } } exclusive_app_running = false; bool old_egar = exclusive_gpu_app_running; exclusive_gpu_app_running = false; for (i=0; itask_state() == PROCESS_ABORT_PENDING) { if (gstate.now > atp->abort_time + ABORT_TIMEOUT) { atp->kill_task(false); } } if (atp->task_state() == PROCESS_QUIT_PENDING) { if (gstate.now > atp->quit_time + QUIT_TIMEOUT) { atp->kill_task(true); } } } if (action) { gstate.set_client_state_dirty("ACTIVE_TASK_SET::poll"); } return action; } // There's a new trickle file. // Move it from slot dir to project dir // int ACTIVE_TASK::move_trickle_file() { char project_dir[256], new_path[1024], old_path[1024]; int retval; get_project_dir(result->project, project_dir, sizeof(project_dir)); sprintf(old_path, "%s/trickle_up.xml", slot_dir); sprintf(new_path, "%s/trickle_up_%s_%d.xml", project_dir, result->name, (int)time(0) ); retval = boinc_rename(old_path, new_path); // if can't move it, remove // if (retval) { delete_project_owned_file(old_path, true); return ERR_RENAME; } return 0; } // size of output files and files in slot dir // int ACTIVE_TASK::current_disk_usage(double& size) { double x; unsigned int i; int retval; FILE_INFO* fip; char path[1024]; retval = dir_size(slot_dir, size); if (retval) return retval; for (i=0; ioutput_files.size(); i++) { fip = result->output_files[i].file_info; get_pathname(fip, path, sizeof(path)); retval = file_size(path, x); if (!retval) size += x; } return 0; } bool ACTIVE_TASK_SET::is_slot_in_use(int slot) { unsigned int i; for (i=0; islot == slot) { return true; } } return false; } bool ACTIVE_TASK_SET::is_slot_dir_in_use(char* dir) { char path[1024]; unsigned int i; for (i=0; islot, path, sizeof(path)); if (!strcmp(path, dir)) return true; } return false; } // Get a free slot, // and make a slot dir if needed // void ACTIVE_TASK::get_free_slot(RESULT* rp) { int j, retval; char path[1024]; for (j=0; ; j++) { if (gstate.active_tasks.is_slot_in_use(j)) continue; // make sure we can make an empty directory for this slot // get_slot_dir(j, path, sizeof(path)); if (boinc_file_exists(path)) { if (is_dir(path)) { retval = client_clean_out_dir(path, "get_free_slot()"); if (!retval) break; } } else { retval = make_slot_dir(j); if (!retval) break; } } slot = j; if (log_flags.slot_debug) { msg_printf(rp->project, MSG_INFO, "[slot] assigning slot %d to %s", j, rp->name); } } bool ACTIVE_TASK_SET::slot_taken(int slot) { unsigned int i; for (i=0; islot == slot) return true; } return false; } // is here for the benefit of 3rd-party software // that reads the client state file // int ACTIVE_TASK::write(MIOFILE& fout) { fout.printf( "\n" " %s\n" " %s\n" " %d\n" " %d\n" " %d\n" " %f\n" " %f\n" " %f\n" " %f\n" " %d\n" " %f\n" " %f\n" " %f\n" " %f\n", result->project->master_url, result->name, task_state(), app_version->version_num, slot, checkpoint_cpu_time, checkpoint_elapsed_time, fraction_done, current_cpu_time, once_ran_edf?1:0, procinfo.swap_size, procinfo.working_set_size, procinfo.working_set_size_smoothed, procinfo.page_fault_rate ); fout.printf("\n"); return 0; } int ACTIVE_TASK::write_gui(MIOFILE& fout) { fout.printf( "\n" " %d\n" " %d\n" " %d\n" " %d\n" " %d\n" " %f\n" " %f\n" " %f\n" " %f\n" " %f\n" " %f\n" " %f\n" " %f\n" "%s" "%s", task_state(), app_version->version_num, slot, pid, scheduler_state, checkpoint_cpu_time, fraction_done, current_cpu_time, elapsed_time, procinfo.swap_size, procinfo.working_set_size, procinfo.working_set_size_smoothed, procinfo.page_fault_rate, too_large?" \n":"", needs_shmem?" \n":"" ); if (strlen(app_version->graphics_exec_path)) { fout.printf( " %s\n" " %s\n", app_version->graphics_exec_path, slot_path ); } if (supports_graphics() && !gstate.disable_graphics) { fout.printf( " \n" " %d\n", graphics_mode_acked ); } fout.printf("\n"); return 0; } int ACTIVE_TASK::parse(MIOFILE& fin) { char buf[256], result_name[256], project_master_url[256]; int n, dummy; unsigned int i; PROJECT* project=0; double x; strcpy(result_name, ""); strcpy(project_master_url, ""); while (fin.fgets(buf, 256)) { if (match_tag(buf, "")) { project = gstate.lookup_project(project_master_url); if (!project) { msg_printf( NULL, MSG_INTERNAL_ERROR, "State file error: project %s not found for task\n", project_master_url ); return ERR_NULL; } result = gstate.lookup_result(project, result_name); if (!result) { msg_printf( project, MSG_INTERNAL_ERROR, "State file error: result %s not found for task\n", result_name ); return ERR_NULL; } // various sanity checks // if (result->got_server_ack || result->ready_to_report || result->state() != RESULT_FILES_DOWNLOADED ) { return ERR_BAD_RESULT_STATE; } wup = result->wup; app_version = gstate.lookup_app_version( result->app, result->platform, result->version_num, result->plan_class ); if (!app_version) { msg_printf( project, MSG_INTERNAL_ERROR, "State file error: app %s platform %s version %d not found\n", result->app->name, result->platform, result->version_num ); return ERR_NULL; } // make sure no two active tasks are in same slot // for (i=0; islot == slot) { msg_printf(project, MSG_INTERNAL_ERROR, "State file error: two tasks in slot %d\n", slot ); return ERR_BAD_RESULT_STATE; } } // for 6.2/6.4 transition // if (checkpoint_elapsed_time == 0) { elapsed_time = checkpoint_cpu_time; checkpoint_elapsed_time = elapsed_time; } return 0; } else if (parse_str(buf, "", result_name, sizeof(result_name))) continue; else if (parse_str(buf, "", project_master_url, sizeof(project_master_url))) continue; else if (parse_int(buf, "", slot)) continue; else if (parse_int(buf, "", dummy)) continue; else if (parse_double(buf, "", checkpoint_cpu_time)) continue; else if (parse_bool(buf, "once_ran_edf", once_ran_edf)) continue; else if (parse_double(buf, "", fraction_done)) continue; else if (parse_double(buf, "", checkpoint_elapsed_time)) continue; else if (parse_int(buf, "", n)) continue; else if (parse_double(buf, "", procinfo.swap_size)) continue; else if (parse_double(buf, "", procinfo.working_set_size)) continue; else if (parse_double(buf, "", procinfo.working_set_size_smoothed)) continue; else if (parse_double(buf, "", procinfo.page_fault_rate)) continue; else if (parse_double(buf, "", x)) continue; else { if (log_flags.unparsed_xml) { msg_printf(project, MSG_INFO, "[unparsed_xml] ACTIVE_TASK::parse(): unrecognized %s\n", buf ); } } } return ERR_XML_PARSE; } int ACTIVE_TASK_SET::write(MIOFILE& fout) { unsigned int i; int retval; fout.printf("\n"); for (i=0; iwrite(fout); if (retval) return retval; } fout.printf("\n"); return 0; } int ACTIVE_TASK_SET::parse(MIOFILE& fin) { ACTIVE_TASK* atp; char buf[256]; int retval; while (fin.fgets(buf, 256)) { if (match_tag(buf, "")) return 0; else if (match_tag(buf, "")) { atp = new ACTIVE_TASK; retval = atp->parse(fin); if (!retval) { if (slot_taken(atp->slot)) { msg_printf(atp->result->project, MSG_INTERNAL_ERROR, "slot %d in use; discarding result %s", atp->slot, atp->result->name ); retval = ERR_XML_PARSE; } } if (!retval) active_tasks.push_back(atp); else delete atp; } else { if (log_flags.unparsed_xml) { msg_printf(NULL, MSG_INFO, "[unparsed_xml] ACTIVE_TASK_SET::parse(): unrecognized %s\n", buf ); } } } return ERR_XML_PARSE; } void MSG_QUEUE::init(char* n) { strcpy(name, n); last_block = 0; msgs.clear(); } void MSG_QUEUE::msg_queue_send(const char* msg, MSG_CHANNEL& channel) { if ((msgs.size()==0) && channel.send_msg(msg)) { if (log_flags.app_msg_send) { msg_printf(NULL, MSG_INFO, "[app_msg_send] sent %s to %s", msg, name); } last_block = 0; return; } if (log_flags.app_msg_send) { msg_printf(NULL, MSG_INFO, "[app_msg_send] deferred %s to %s", msg, name); } msgs.push_back(std::string(msg)); if (!last_block) last_block = gstate.now; } void MSG_QUEUE::msg_queue_poll(MSG_CHANNEL& channel) { if (msgs.size() > 0) { if (log_flags.app_msg_send) { msg_printf(NULL, MSG_INFO, "[app_msg_send] poll: %d msgs queued for %s:", (int)msgs.size(), name ); } if (channel.send_msg(msgs[0].c_str())) { if (log_flags.app_msg_send) { msg_printf(NULL, MSG_INFO, "[app_msg_send] poll: delayed sent %s", (msgs[0].c_str())); } msgs.erase(msgs.begin()); last_block = 0; } for (unsigned int i=0; i::iterator iter = msgs.begin(); for (int i=0; ic_str(), name ); } if (!strcmp(msg, iter->c_str())) { if (log_flags.app_msg_send) { msg_printf(NULL, MSG_INFO, "[app_msg_send] purged %s from %s", msg, name); } iter = msgs.erase(iter); return 1; } return 0; } bool MSG_QUEUE::timeout(double diff) { if (!last_block) return false; if (gstate.now - last_block > diff) { return true; } return false; } void ACTIVE_TASK_SET::report_overdue() { unsigned int i; ACTIVE_TASK* atp; for (i=0; iresult->report_deadline)/86400; if (diff > 0) { msg_printf(atp->result->project, MSG_INFO, "Task %s is %.2f days overdue; you may not get credit for it. Consider aborting it.", atp->result->name, diff ); } } } // scan the slot directory, looking for files with names // of the form boinc_ufr_X. // Then mark file X as being present (and uploadable) // int ACTIVE_TASK::handle_upload_files() { std::string filename; char buf[256], path[1024]; int retval; DirScanner dirscan(slot_dir); while (dirscan.scan(filename)) { strcpy(buf, filename.c_str()); if (strstr(buf, UPLOAD_FILE_REQ_PREFIX) == buf) { char* p = buf+strlen(UPLOAD_FILE_REQ_PREFIX); FILE_INFO* fip = result->lookup_file_logical(p); if (fip) { get_pathname(fip, path, sizeof(path)); retval = md5_file(path, fip->md5_cksum, fip->nbytes); if (retval) { fip->status = retval; } else { fip->status = FILE_PRESENT; } } else { msg_printf(wup->project, MSG_INTERNAL_ERROR, "Can't find uploadable file %s", p ); } sprintf(path, "%s/%s", slot_dir, buf); delete_project_owned_file(path, true); // delete the link file } } return 0; } void ACTIVE_TASK_SET::handle_upload_files() { for (unsigned int i=0; ihandle_upload_files(); } } bool ACTIVE_TASK_SET::want_network() { for (unsigned int i=0; iwant_network) return true; } return false; } void ACTIVE_TASK_SET::network_available() { for (unsigned int i=0; iwant_network) { atp->send_network_available(); } } } void ACTIVE_TASK::upload_notify_app(const FILE_INFO* fip, const FILE_REF* frp) { char path[256]; sprintf(path, "%s/%s%s", slot_dir, UPLOAD_FILE_STATUS_PREFIX, frp->open_name); FILE* f = boinc_fopen(path, "w"); if (!f) return; fprintf(f, "%d\n", fip->status); fclose(f); send_upload_file_status = true; } // a file upload has finished. // If any running apps are waiting for it, notify them // void ACTIVE_TASK_SET::upload_notify_app(FILE_INFO* fip) { for (unsigned int i=0; iresult; FILE_REF* frp = rp->lookup_file(fip); if (frp) { atp->upload_notify_app(fip, frp); } } } void ACTIVE_TASK_SET::init() { for (unsigned int i=0; iinit(atp->result); atp->scheduler_state = CPU_SCHED_PREEMPTED; atp->read_task_state_file(); atp->current_cpu_time = atp->checkpoint_cpu_time; atp->elapsed_time = atp->checkpoint_elapsed_time; } } #endif