// Berkeley Open Infrastructure for Network Computing // http://boinc.berkeley.edu // Copyright (C) 2005 University of California // // This is free software; you can redistribute it and/or // modify it under the terms of the GNU Lesser General Public // License as published by the Free Software Foundation; // either version 2.1 of the License, or (at your option) any later version. // // This software is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // See the GNU Lesser General Public License for more details. // // To view the GNU Lesser General Public License visit // http://www.gnu.org/copyleft/lesser.html // or write to the Free Software Foundation, Inc., // 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA // Abstraction of a set of executing applications, // connected to I/O files in various ways. // Shouldn't depend on CLIENT_STATE. #include "cpp.h" #ifdef _WIN32 #include "boinc_win.h" #else #include "config.h" #endif #ifndef _WIN32 #if HAVE_UNISTD_H #include #endif #if HAVE_SYS_WAIT_H #include #endif #if HAVE_SYS_TIME_H #include #endif #if HAVE_SYS_RESOURCE_H #include #endif #if HAVE_SYS_TYPES_H #include #endif #if HAVE_FCNTL_H #include #endif #include #include #include #include #include #endif #include "client_state.h" #include "client_types.h" #include "error_numbers.h" #include "filesys.h" #include "file_names.h" #include "parse.h" #include "shmem.h" #include "util.h" #include "client_msgs.h" #include "procinfo.h" #include "app.h" using std::max; using std::min; ACTIVE_TASK::ACTIVE_TASK() { result = NULL; wup = NULL; app_version = NULL; pid = 0; slot = 0; _task_state = PROCESS_UNINITIALIZED; scheduler_state = CPU_SCHED_UNINITIALIZED; signal = 0; strcpy(slot_dir, ""); is_ss_app = false; graphics_mode_acked = MODE_UNSUPPORTED; graphics_mode_before_ss = MODE_HIDE_GRAPHICS; graphics_mode_ack_timeout = 0; exit_requested = false; fraction_done = 0; episode_start_cpu_time = 0; run_interval_start_wall_time = gstate.now; debt_interval_start_cpu_time = 0; checkpoint_cpu_time = 0; checkpoint_wall_time = 0; current_cpu_time = 0; have_trickle_down = false; send_upload_file_status = false; too_large = false; want_network = 0; memset(&procinfo, 0, sizeof(procinfo)); #ifdef _WIN32 pid_handle = 0; thread_handle = 0; shm_handle = 0; #endif } static const char* task_state_name(int val) { switch (val) { case PROCESS_UNINITIALIZED: return "UNINITIALIZED"; case PROCESS_EXECUTING: return "EXECUTING"; case PROCESS_SUSPENDED: return "SUSPENDED"; case PROCESS_ABORT_PENDING: return "ABORT_PENDING"; case PROCESS_EXITED: return "EXITED"; case PROCESS_WAS_SIGNALED: return "WAS_SIGNALED"; case PROCESS_EXIT_UNKNOWN: return "EXIT_UNKNOWN"; case PROCESS_ABORTED: return "ABORTED"; case PROCESS_COULDNT_START: return "COULDNT_START"; case PROCESS_QUIT_PENDING: return "QUIT_PENDING"; } return "Unknown"; } void ACTIVE_TASK::set_task_state(int val, const char* where) { _task_state = val; if (log_flags.task_debug) { msg_printf(result->project, MSG_INFO, "[task_debug] task_state=%s for %s from %s", task_state_name(val), result->name, where ); } } #ifdef _WIN32 // call this when a process has existed but will be started again // (e.g. suspend via quit, exited but no finish file). // In these cases we want to keep the shmem and events // void ACTIVE_TASK::close_process_handles() { if (pid_handle) { CloseHandle(pid_handle); pid_handle = NULL; } if (thread_handle) { CloseHandle(thread_handle); thread_handle = NULL; } } #endif // call this when a process has exited and we're not going to restart it // void ACTIVE_TASK::cleanup_task() { #ifdef _WIN32 // detach from shared mem. // This will destroy shmem seg since we're the last attachment // if (app_client_shm.shm) { detach_shmem(shm_handle, app_client_shm.shm); app_client_shm.shm = NULL; } #else int retval; if (app_client_shm.shm) { retval = detach_shmem(app_client_shm.shm); if (retval) { msg_printf(NULL, MSG_INTERNAL_ERROR, "Couldn't detach shared memory: %s", boincerror(retval) ); } retval = destroy_shmem(shmem_seg_name); if (retval) { msg_printf(NULL, MSG_INTERNAL_ERROR, "Couldn't destroy shared memory: %s", boincerror(retval) ); } app_client_shm.shm = NULL; } #endif } ACTIVE_TASK::~ACTIVE_TASK() { cleanup_task(); } int ACTIVE_TASK::init(RESULT* rp) { result = rp; wup = rp->wup; app_version = wup->avp; max_cpu_time = rp->wup->rsc_fpops_bound/gstate.host_info.p_fpops; max_disk_usage = rp->wup->rsc_disk_bound; max_mem_usage = rp->wup->rsc_memory_bound; get_slot_dir(slot, slot_dir); return 0; } #if 0 // Deallocate memory to prevent unneeded reporting of memory leaks // void ACTIVE_TASK_SET::free_mem() { vector::iterator at_iter; ACTIVE_TASK *at; at_iter = active_tasks.begin(); while (at_iter != active_tasks.end()) { at = active_tasks[0]; at_iter = active_tasks.erase(at_iter); delete at; } } #endif void ACTIVE_TASK_SET::get_memory_usage() { static double last_mem_time=0; unsigned int i; int retval; double diff = gstate.now - last_mem_time; if (diff < 10) return; last_mem_time = gstate.now; vector piv; retval = procinfo_setup(piv); if (retval) { if (log_flags.mem_usage_debug) { msg_printf(0, MSG_INTERNAL_ERROR, "[mem_usage_debug] procinfo_setup() returned %d", retval ); } return; } for (i=0; ischeduler_state == CPU_SCHED_SCHEDULED) { PROCINFO& pi = atp->procinfo; unsigned long last_page_fault_count = pi.page_fault_count; memset(&pi, 0, sizeof(pi)); pi.id = atp->pid; procinfo_app(pi, piv); pi.working_set_size_smoothed = .5*pi.working_set_size_smoothed + pi.working_set_size; int pf = pi.page_fault_count - last_page_fault_count; pi.page_fault_rate = pf/diff; if (log_flags.mem_usage_debug) { msg_printf(atp->result->project, MSG_INFO, "[mem_usage_debug] %s: RAM %.2fMB, page %.2fMB, %.2f page faults/sec, user CPU %.3f, kernel CPU %.3f", atp->result->name, pi.working_set_size/MEGA, pi.swap_size/MEGA, pi.page_fault_rate, pi.user_time, pi.kernel_time ); } } } #if 0 // the following is not useful because most OSs don't // move idle processes out of RAM, so physical memory is always full // procinfo_other(pi, piv); msg_printf(NULL, MSG_INFO, "All others: RAM %.2fMB, page %.2fMB, user %.3f, kernel %.3f", pi.working_set_size/MEGA, pi.swap_size/MEGA, pi.user_time, pi.kernel_time ); #endif } // Do periodic checks on running apps: // - get latest CPU time and % done info // - check if any has exited, and clean up // - see if any has exceeded its CPU or disk space limits, and abort it // bool ACTIVE_TASK_SET::poll() { bool action; unsigned int i; static double last_time = 0; if (gstate.now - last_time < 1.0) return false; last_time = gstate.now; action = check_app_exited(); send_heartbeats(); send_trickle_downs(); graphics_poll(); process_control_poll(); get_memory_usage(); action |= check_rsc_limits_exceeded(); action |= get_msgs(); for (i=0; itask_state() == PROCESS_ABORT_PENDING) { if (gstate.now > atp->abort_time + 5.0) { atp->kill_task(false); } } } if (action) { gstate.set_client_state_dirty("ACTIVE_TASK_SET::poll"); } return action; } // Remove an ACTIVE_TASK from the set. // Does NOT delete the ACTIVE_TASK object. // int ACTIVE_TASK_SET::remove(ACTIVE_TASK* atp) { vector::iterator iter; iter = active_tasks.begin(); while (iter != active_tasks.end()) { if (*iter == atp) { iter = active_tasks.erase(iter); return 0; } iter++; } msg_printf(NULL, MSG_INTERNAL_ERROR, "Task %s not found", atp->result->name ); return ERR_NOT_FOUND; } // There's a new trickle file. // Move it from slot dir to project dir // int ACTIVE_TASK::move_trickle_file() { char project_dir[256], new_path[256], old_path[256]; int retval; get_project_dir(result->project, project_dir); sprintf(old_path, "%s/trickle_up.xml", slot_dir); sprintf(new_path, "%s/trickle_up_%s_%d.xml", project_dir, result->name, (int)time(0) ); retval = boinc_rename(old_path, new_path); // if can't move it, remove // if (retval) { boinc_delete_file(old_path); return ERR_RENAME; } return 0; } // Returns the estimated CPU time to completion (in seconds) of this task. // Compute this as a weighted average of estimates based on // 1) the workunit's flops count // 2) the current reported CPU time and fraction done // double ACTIVE_TASK::est_cpu_time_to_completion() { if (fraction_done >= 1) return 0; double wu_est = result->estimated_cpu_time(); if (fraction_done <= 0) return wu_est; double frac_est = (current_cpu_time / fraction_done) - current_cpu_time; double fraction_left = 1-fraction_done; return fraction_done*frac_est + fraction_left*fraction_left*wu_est; } // size of output files and files in slot dir // int ACTIVE_TASK::current_disk_usage(double& size) { double x; unsigned int i; int retval; FILE_INFO* fip; char path[256]; retval = dir_size(slot_dir, size); if (retval) return retval; for (i=0; ioutput_files.size(); i++) { fip = result->output_files[i].file_info; get_pathname(fip, path); retval = file_size(path, x); if (!retval) size += x; } return 0; } // Get the next free slot // int ACTIVE_TASK_SET::get_free_slot() { unsigned int i; int j; bool found; for (j=0; ; j++) { found = false; for (i=0; islot == j) { found = true; break; } } if (!found) return j; } return ERR_NOT_FOUND; // probably never get here } bool ACTIVE_TASK_SET::slot_taken(int slot) { unsigned int i; for (i=0; islot == slot) return true; } return false; } int ACTIVE_TASK::write(MIOFILE& fout) { fout.printf( "\n" " %s\n" " %s\n" " %d\n" " %d\n" " %d\n" " %d\n" " %f\n" " %f\n" " %f\n" " %f\n" " %f\n" " %f\n" " %f\n" "%s", result->project->master_url, result->name, task_state(), app_version->version_num, slot, scheduler_state, checkpoint_cpu_time, fraction_done, current_cpu_time, procinfo.swap_size, procinfo.working_set_size, procinfo.working_set_size_smoothed, procinfo.page_fault_rate, too_large?" \n":"" ); if (supports_graphics() && !gstate.disable_graphics) { fout.printf( " \n" " %d\n", graphics_mode_acked ); } fout.printf("\n"); return 0; } int ACTIVE_TASK::parse(MIOFILE& fin) { char buf[256], result_name[256], project_master_url[256]; int app_version_num=0, n; unsigned int i; PROJECT* project; strcpy(result_name, ""); strcpy(project_master_url, ""); while (fin.fgets(buf, 256)) { if (match_tag(buf, "")) { project = gstate.lookup_project(project_master_url); if (!project) { msg_printf( NULL, MSG_INTERNAL_ERROR, "State file error: project %s not found\n", project_master_url ); return ERR_NULL; } result = gstate.lookup_result(project, result_name); if (!result) { msg_printf( project, MSG_INTERNAL_ERROR, "State file error: result %s not found\n", result_name ); return ERR_NULL; } // various sanity checks // if (result->got_server_ack || result->ready_to_report || result->state() != RESULT_FILES_DOWNLOADED ) { msg_printf(project, MSG_INTERNAL_ERROR, "State file error: result %s is in wrong state\n", result_name ); return ERR_BAD_RESULT_STATE; } wup = result->wup; app_version = gstate.lookup_app_version( result->app, app_version_num ); if (!app_version) { msg_printf( project, MSG_INTERNAL_ERROR, "State file error: application %s version %d not found\n", result->app->name, app_version_num ); return ERR_NULL; } // make sure no two active tasks are in same slot // for (i=0; islot == slot) { msg_printf(project, MSG_INTERNAL_ERROR, "State file error: two tasks in slot %d\n", slot ); return ERR_BAD_RESULT_STATE; } } return 0; } else if (parse_str(buf, "", result_name, sizeof(result_name))) continue; else if (parse_str(buf, "", project_master_url, sizeof(project_master_url))) continue; else if (parse_int(buf, "", app_version_num)) continue; else if (parse_int(buf, "", slot)) continue; else if (parse_double(buf, "", checkpoint_cpu_time)) continue; else if (parse_double(buf, "", fraction_done)) continue; else if (parse_double(buf, "", current_cpu_time)) continue; else if (parse_int(buf, "", n)) continue; else if (parse_double(buf, "", procinfo.swap_size)) continue; else if (parse_double(buf, "", procinfo.working_set_size)) continue; else if (parse_double(buf, "", procinfo.working_set_size_smoothed)) continue; else if (parse_double(buf, "", procinfo.page_fault_rate)) continue; else if (match_tag(buf, "")) continue; else if (parse_int(buf, "", n)) continue; else if (parse_int(buf, "", n)) continue; else { if (log_flags.unparsed_xml) { msg_printf(0, MSG_INFO, "[unparsed_xml] ACTIVE_TASK::parse(): unrecognized %s\n", buf ); } } } return ERR_XML_PARSE; } // Write XML information about this active task set // int ACTIVE_TASK_SET::write(MIOFILE& fout) { unsigned int i; int retval; fout.printf("\n"); for (i=0; iwrite(fout); if (retval) return retval; } fout.printf("\n"); return 0; } // Parse XML information about an active task set // int ACTIVE_TASK_SET::parse(MIOFILE& fin) { ACTIVE_TASK* atp; char buf[256]; int retval; while (fin.fgets(buf, 256)) { if (match_tag(buf, "")) return 0; else if (match_tag(buf, "")) { atp = new ACTIVE_TASK; retval = atp->parse(fin); if (!retval) { if (slot_taken(atp->slot)) { msg_printf(atp->result->project, MSG_INTERNAL_ERROR, "slot %d in use; discarding result %s", atp->slot, atp->result->name ); retval = ERR_XML_PARSE; } } if (!retval) active_tasks.push_back(atp); else delete atp; } else { if (log_flags.unparsed_xml) { msg_printf(0, MSG_INFO, "[unparsed_xml] ACTIVE_TASK_SET::parse(): unrecognized %s\n", buf ); } } } return ERR_XML_PARSE; } void MSG_QUEUE::init(char* n) { strcpy(name, n); last_block = 0; msgs.clear(); } void MSG_QUEUE::msg_queue_send(const char* msg, MSG_CHANNEL& channel) { if ((msgs.size()==0) && channel.send_msg(msg)) { if (log_flags.app_msg_send) { msg_printf(NULL, MSG_INFO, "[app_msg_send] sent %s to %s", msg, name); } last_block = 0; return; } if (log_flags.app_msg_send) { msg_printf(NULL, MSG_INFO, "[app_msg_send] deferred %s to %s", msg, name); } msgs.push_back(std::string(msg)); if (!last_block) last_block = gstate.now; } void MSG_QUEUE::msg_queue_poll(MSG_CHANNEL& channel) { if (msgs.size() > 0) { if (log_flags.app_msg_send) { msg_printf(NULL, MSG_INFO, "[app_msg_send] poll: %d msgs queued for %s:", (int)msgs.size(), name ); } if (channel.send_msg(msgs[0].c_str())) { if (log_flags.app_msg_send) { msg_printf(NULL, MSG_INFO, "[app_msg_send] poll: delayed sent %s", (msgs[0].c_str())); } msgs.erase(msgs.begin()); last_block = 0; } for (unsigned int i=0; i::iterator iter = msgs.begin(); for (int i=0; ic_str(), name ); } if (!strcmp(msg, iter->c_str())) { if (log_flags.app_msg_send) { msg_printf(NULL, MSG_INFO, "[app_msg_send] purged %s from %s", msg, name); } iter = msgs.erase(iter); return 1; } return 0; } bool MSG_QUEUE::timeout(double diff) { if (!last_block) return false; if (gstate.now - last_block > diff) { return true; } return false; } void ACTIVE_TASK_SET::report_overdue() { unsigned int i; ACTIVE_TASK* atp; for (i=0; iresult->report_deadline)/86400; if (diff > 0) { msg_printf(atp->result->project, MSG_USER_ERROR, "Task %s is %.2f days overdue.", atp->result->name, diff ); msg_printf(atp->result->project, MSG_USER_ERROR, "You may not get credit for it. Consider aborting it." ); } } } // scan the slot directory, looking for files with names // of the form boinc_ufr_X. // Then mark file X as being present (and uploadable) // int ACTIVE_TASK::handle_upload_files() { std::string filename; char buf[256], path[256]; int retval; DirScanner dirscan(slot_dir); while (dirscan.scan(filename)) { strcpy(buf, filename.c_str()); if (strstr(buf, UPLOAD_FILE_REQ_PREFIX) == buf) { char* p = buf+strlen(UPLOAD_FILE_REQ_PREFIX); FILE_INFO* fip = result->lookup_file_logical(p); if (fip) { get_pathname(fip, path); retval = md5_file(path, fip->md5_cksum, fip->nbytes); if (retval) { fip->status = retval; } else { fip->status = FILE_PRESENT; } } else { msg_printf(0, MSG_INTERNAL_ERROR, "Can't find uploadable file %s", p); } sprintf(path, "%s/%s", slot_dir, buf); boinc_delete_file(path); } } return 0; } void ACTIVE_TASK_SET::handle_upload_files() { for (unsigned int i=0; ihandle_upload_files(); } } bool ACTIVE_TASK_SET::want_network() { for (unsigned int i=0; iwant_network) return true; } return false; } void ACTIVE_TASK_SET::network_available() { for (unsigned int i=0; iwant_network) { atp->send_network_available(); } } } void ACTIVE_TASK::upload_notify_app(const FILE_INFO* fip, const FILE_REF* frp) { char path[256]; sprintf(path, "%s/%s%s", slot_dir, UPLOAD_FILE_STATUS_PREFIX, frp->open_name); FILE* f = boinc_fopen(path, "w"); if (!f) return; fprintf(f, "%d\n", fip->status); fclose(f); send_upload_file_status = true; } // a file upload has finished. // If any running apps are waiting for it, notify them // void ACTIVE_TASK_SET::upload_notify_app(FILE_INFO* fip) { for (unsigned int i=0; iresult; FILE_REF* frp = rp->lookup_file(fip); if (frp) { atp->upload_notify_app(fip, frp); } } } void ACTIVE_TASK_SET::init() { for (unsigned int i=0; iinit(atp->result); atp->scheduler_state = CPU_SCHED_PREEMPTED; } } const char *BOINC_RCSID_778b61195e = "$Id$";