From dd5cfb52c0bba6f11ea24394fc466b628c3e93be Mon Sep 17 00:00:00 2001 From: David Anderson Date: Sat, 28 Jan 2006 00:17:26 +0000 Subject: [PATCH] CPU scheduler svn path=/trunk/boinc/; revision=9338 --- api/boinc_api_fortran.C | 19 ++-- checkin_notes | 13 +++ client/client_state.C | 8 ++ client/client_state.h | 24 +++-- client/client_types.C | 3 +- client/client_types.h | 29 ++++++ client/cs_apps.C | 200 ++++++++++++++++++++++++++++++++++++++++ client/cs_scheduler.C | 110 ++-------------------- 8 files changed, 286 insertions(+), 120 deletions(-) diff --git a/api/boinc_api_fortran.C b/api/boinc_api_fortran.C index ceb2b4cd69..dec3bb1cbb 100644 --- a/api/boinc_api_fortran.C +++ b/api/boinc_api_fortran.C @@ -116,17 +116,16 @@ void boinc_calling_thread_cpu_time_(double* d) { boinc_calling_thread_cpu_time(*d); } -void boinc_zip_( - int zipmode, const char* zipfile, const char* unzippath, - int zipfile_len, int unzippath_len +void boinc_zip_(int* zipmode, const char* zipfile, + const char* path, int zipfile_len, int path_len ) { - //zipmode = 0 to unzip, 1 to zip - //zipfile, unzippath = FORTRAN variables of type CHARACTER - STRING_FROM_FORTRAN zipfileff(zipfile, zipfile_len); - STRING_FROM_FORTRAN unzippathff(unzippath, unzippath_len); - zipfileff.strip_whitespace(); - unzippathff.strip_whitespace(); - boinc_zip(zipmode, zipfileff.c_str(), unzippathff.c_str()); + //zipmode = 0 to unzip or 1 to zip. FORTRAN variable of type INTEGER. + //zipfile, path = FORTRAN variables of type CHARACTER. + STRING_FROM_FORTRAN zipfileff(zipfile, zipfile_len); + STRING_FROM_FORTRAN pathff(path, path_len); + zipfileff.strip_whitespace(); + pathff.strip_whitespace(); + boinc_zip(*zipmode,zipfileff.c_str(),pathff.c_str()); } } // extern "C" diff --git a/checkin_notes b/checkin_notes index a1be246f71..3017c322d1 100755 --- a/checkin_notes +++ b/checkin_notes @@ -1106,3 +1106,16 @@ David 27 Jan 2006 cs_prefs.C cs_scheduler.C cs_statefile.C + +David 27 Jan 2006 + - core client: initial checkin of new CPU scheduling code + (in a very incomplete state). + To enable it, define NEW_CPU_SCHED in a couple of .h files + + api/ + boinc_api_fortran.C + client/ + client_state.C,h + client_types.C,h + cs_apps.C + cs_scheduler.C diff --git a/client/client_state.C b/client/client_state.C index 7944711de7..dd8c9de2a9 100644 --- a/client/client_state.C +++ b/client/client_state.C @@ -457,6 +457,9 @@ bool CLIENT_STATE::poll_slow_events() { check_suspend_activities(suspend_reason); +#ifdef NEW_CPU_SCHED + cpu_scheduler.make_schedule(); +#else // Restart tasks on startup. // Do this here (rather than CLIENT_STATE::init()) // so that if we do benchmark on startup, @@ -468,6 +471,7 @@ bool CLIENT_STATE::poll_slow_events() { restart_tasks(); tasks_restarted = true; } +#endif // suspend or resume activities (but only if already did startup) // @@ -542,9 +546,13 @@ bool CLIENT_STATE::poll_slow_events() { POLL_ACTION(handle_pers_file_xfers , handle_pers_file_xfers ); } POLL_ACTION(handle_finished_apps , handle_finished_apps ); +#ifdef NEW_CPU_SCHED + cpu_scheduler.enforce(); +#else if (!tasks_suspended) { POLL_ACTION(schedule_cpus , schedule_cpus ); } +#endif if (!network_suspended) { POLL_ACTION(scheduler_rpc , scheduler_rpc_poll ); } diff --git a/client/client_state.h b/client/client_state.h index 5be603b0d6..204cc3faa6 100644 --- a/client/client_state.h +++ b/client/client_state.h @@ -20,6 +20,8 @@ #ifndef _CLIENT_STATE_ #define _CLIENT_STATE_ +//#define NEW_CPU_SCHED + #ifndef _WIN32 #include #include @@ -70,6 +72,14 @@ enum SUSPEND_REASON { SUSPEND_REASON_DISK_SIZE = 32 }; +#ifdef NEW_CPU_SCHED +struct CPU_SCHEDULER { + void do_accounting(); + void make_schedule(); + void enforce(); +}; +#endif + // CLIENT_STATE encapsulates the global variables of the core client. // If you add anything here, initialize it in the constructor // @@ -81,7 +91,11 @@ public: std::vector app_versions; std::vector workunits; std::vector results; +#ifdef NEW_CPU_SCHED + CPU_SCHEDULER cpu_scheduler; +#else std::vector cpus; +#endif NET_XFER_SET* net_xfers; PERS_FILE_XFER_SET* pers_file_xfers; @@ -292,13 +306,16 @@ private: int choose_version_num(char*, SCHEDULER_REPLY&); int app_finished(ACTIVE_TASK&); +#ifndef NEW_CPU_SCHED void assign_results_to_projects(); bool schedule_largest_debt_project(double expected_pay_off, int cpu_index); bool schedule_earliest_deadline_result(int cpu_index); - bool start_apps(); bool schedule_cpus(); +#endif + bool start_apps(); bool handle_finished_apps(); void handle_file_xfer_apps(); +public: int schedule_result(RESULT*); // --------------- cs_benchmark.C: @@ -455,9 +472,4 @@ extern double calculate_exponential_backoff( int n, double MIN, double MAX ); -#ifdef NEW_CPU_SCHED -class CPU_SCHEDULER { -}; -#endif - #endif diff --git a/client/client_types.C b/client/client_types.C index e3d1e61cfc..7fdce03381 100644 --- a/client/client_types.C +++ b/client/client_types.C @@ -1547,6 +1547,7 @@ FILE_INFO* RESULT::lookup_file_logical(const char* lname) { return 0; } +#ifndef NEW_CPU_SCHED CPU::CPU() : sched_last_time(0), @@ -1565,5 +1566,5 @@ void CPU::schedule_result(RESULT * r) result = r; sched_last_time = gstate.now; } - +#endif const char *BOINC_RCSID_b81ff9a584 = "$Id$"; diff --git a/client/client_types.h b/client/client_types.h index 071553a523..7a21513a55 100644 --- a/client/client_types.h +++ b/client/client_types.h @@ -22,6 +22,8 @@ // client_state.C (to cross-link objects) // +//#define NEW_CPU_SCHED + #ifndef _CLIENT_TYPES_ #define _CLIENT_TYPES_ @@ -53,6 +55,10 @@ #define FILE_NOT_PRESENT 0 #define FILE_PRESENT 1 +#ifdef NEW_CPU_SCHED + class PRESULT; +#endif + class FILE_INFO { public: char name[256]; @@ -297,7 +303,17 @@ public: // temporary used when scanning projects #ifdef NEW_CPU_SCHED + double cpu_share; + double working_cpu_share; + double needed_cpu_share; + bool in_emergency; + double emergency_budget; + double emergency_resource_share; + double emergency_eligible; + void get_ordered_runnable_results(std::vector&); void compute_cpu_share_needed(); + bool has_emergency(); + void allocate(double); #endif // vars related to file-transfer backoff @@ -469,8 +485,19 @@ struct RESULT { // used to keep cpu scheduler from scheduling a result twice // transient; used only within schedule_cpus() bool deadline_problem; +#ifdef NEW_CPU_SCHED + double cpu_share; +#endif }; +#ifdef NEW_CPU_SCHED +struct PRESULT { + RESULT* p; +}; +inline bool operator<(const PRESULT& a, const PRESULT& b) { + return a.p->report_deadline < b.p->report_deadline; +} +#else struct CPU { public: CPU(); @@ -483,5 +510,7 @@ public: RESULT *result; private: }; +#endif + #endif diff --git a/client/cs_apps.C b/client/cs_apps.C index 373c24b8a6..4a623f73bd 100644 --- a/client/cs_apps.C +++ b/client/cs_apps.C @@ -56,9 +56,11 @@ using std::vector; int CLIENT_STATE::quit_activities() { int retval; +#ifndef NEW_CPU_SCHED // calculate long-term debts (for state file) // adjust_debts(); +#endif retval = active_tasks.exit_tasks(); if (retval) { @@ -225,6 +227,8 @@ bool CLIENT_STATE::input_files_available(RESULT* rp) { } +#ifndef NEW_CPU_SCHED + // Choose a "best" runnable result for each project // // Values are returned in project->next_runnable_result @@ -302,6 +306,8 @@ void CLIENT_STATE::assign_results_to_projects() { } } +#endif + // if there's not an active task for the result, make one // int CLIENT_STATE::schedule_result(RESULT* rp) { @@ -317,6 +323,8 @@ int CLIENT_STATE::schedule_result(RESULT* rp) { return 0; } +#ifndef NEW_CPU_SCHED + // Schedule an active task for the project with the largest anticipated debt // among those that have a runnable result. // Return true iff a task was scheduled. @@ -378,6 +386,7 @@ bool CLIENT_STATE::schedule_earliest_deadline_result(int cpu_index) { --best_result->project->deadline_problem_count; return true; } +#endif // find total resource shares of all projects // @@ -417,6 +426,8 @@ double CLIENT_STATE::potentially_runnable_resource_share() { return x; } +#ifndef NEW_CPU_SCHED + // adjust project debts (short, long-term) // void CLIENT_STATE::adjust_debts() { @@ -693,12 +704,14 @@ bool CLIENT_STATE::schedule_cpus() { return true; } + // This is called when the client is initialized. // Try to restart any tasks that were running when we last shut down. // int CLIENT_STATE::restart_tasks() { return active_tasks.restart_tasks(ncpus); } +#endif void CLIENT_STATE::set_ncpus() { // int ncpus; @@ -708,8 +721,10 @@ void CLIENT_STATE::set_ncpus() { ncpus = 1; } if (ncpus > global_prefs.max_cpus) ncpus = global_prefs.max_cpus; +#ifndef NEW_CPU_SCHED if (ncpus != cpus.size()) cpus.resize(ncpus); +#endif } inline double force_fraction(double f) { @@ -774,4 +789,189 @@ void CLIENT_STATE::request_schedule_cpus(const char* where) { msg_printf(0, MSG_INFO, "Rescheduling CPU: %s", where); } +#ifdef NEW_CPU_SCHED + +void PROJECT::get_ordered_runnable_results(vector& presults) { + presults.clear(); + for (unsigned int i=0; iproject != this) continue; + if (!rp->runnable()) continue; + PRESULT pr; + pr.p = rp; + presults.push_back(pr); + } + sort(presults.begin(), presults.end()); +} + +// compute the immediate CPU shares needed for each result, +// and for the project, to meet all deadlines +// and finish past-due work as quickly as possible +// +void PROJECT::compute_cpu_share_needed() { + cpu_share = 0; + double time = gstate.now, dt, wcn, x; + vector presults; + unsigned int i; + + get_ordered_runnable_results(presults); + for (i=0; ireport_deadline < gstate.now) { + cpu_share += 1; + r->cpu_share = 1; + } else { + dt = r->report_deadline - time; + if (dt) { + wcn = r->estimated_cpu_time_remaining(); + double cs = wcn/dt; + if (cs > cpu_share) { + double x = (wcn + cpu_share*(time-gstate.now))/(r->report_deadline-gstate.now); + r->cpu_share = cpu_share - x; + cpu_share = x; + } else { + r->cpu_share = 0; + } + } else { + x = wcn/(time-gstate.now)-cpu_share; + cpu_share += x; + r->cpu_share = x; + } + } + time = r->report_deadline; + } + needed_cpu_share = cpu_share; +} + +bool PROJECT::has_emergency() { + return needed_cpu_share > working_cpu_share; +} + +// given a CPU share, choose results to run, +// and assign their CPU shares +// +void PROJECT::allocate(double cpu_share_left) { + vector presults; + + get_ordered_runnable_results(presults); + for (unsigned i=0; icpu_share) { + gstate.schedule_result(r); + if (r->cpu_share > cpu_share_left) { + r->cpu_share = cpu_share_left; + } + cpu_share_left -= r->cpu_share; + if (cpu_share_left == 0) break; + } + } +} + +#define EMERGENCY_LIMIT 86400*7 + +void CPU_SCHEDULER::do_accounting() { + unsigned int i; + static double last_time; + double dt = gstate.now - last_time; + last_time = gstate.now; + + for (i=0; iin_emergency) { + p->emergency_budget -= dt*p->emergency_resource_share; + if (p->emergency_budget < 0) { + p->emergency_eligible = false; + } + } else { + p->emergency_budget += dt; + if (p->emergency_budget > EMERGENCY_LIMIT) { + p->emergency_eligible = true; + p->emergency_budget = EMERGENCY_LIMIT; + } + } + } +} + +// Called whenever something happens that could change the schedule; +// Also called periodically (every few hours) +// in case results in a schedule are going slower or faster +// than expected (which could change the schedule) +// +void CPU_SCHEDULER::make_schedule() { + bool have_emergency = false; + double non_emergency_rs = 0, cs, cpu_share_left; + PROJECT* p; + unsigned int i; + + do_accounting(); + + for (i=0; icompute_cpu_share_needed(); + if (p->has_emergency() && p->emergency_eligible) { + have_emergency = true; + } else { + non_emergency_rs += p->working_cpu_share; + } + } + + cpu_share_left = gstate.ncpus; + if (have_emergency) { + for (i=0; ihas_emergency() && p->emergency_eligible) { + p->allocate(cpu_share_left); + if (cpu_share_left <= 0) break; + } + } + if (cpu_share_left) { + for (i=0; ihas_emergency() || p->emergency_eligible) { + cs = cpu_share_left*p->working_cpu_share/non_emergency_rs; + p->allocate(cs); + } + } + } + } else { + for (i=0; iworking_cpu_share/non_emergency_rs; + p->allocate(cs); + } + } +} + +// called every ~10 seconds to time-slice among results in a schedule +// Handle checkpoint detection here. +// make_schedule() has already been called. +// +void CPU_SCHEDULER::enforce() { + static bool first = true; + + if (first) { + //for results R in schedule by decreasing STD + // start R + first = false; + return; + } + + do_accounting(); + + //for each running task T not in schedule + // T.abort +} + +#endif + const char *BOINC_RCSID_7bf63ad771 = "$Id$"; diff --git a/client/cs_scheduler.C b/client/cs_scheduler.C index fc6348bad8..7e6c054f39 100644 --- a/client/cs_scheduler.C +++ b/client/cs_scheduler.C @@ -1059,12 +1059,14 @@ bool CLIENT_STATE::should_get_work() { return true; } +#ifndef NEW_CPU_SCHED // if the CPU started this time period overloaded, // let it process for a while to get out of the CPU overload state. // unless there is an override of some sort. if (!work_fetch_no_new_work || must_schedule_cpus) { set_scheduler_modes(); } +#endif return !work_fetch_no_new_work; } @@ -1242,107 +1244,7 @@ bool CLIENT_STATE::rr_misses_deadline(double per_cpu_proc_rate, double rrs, bool return deadline_missed; } -#if 0 -// simulate weighted round-robin scheduling, -// and see if any result misses its deadline. -// -bool CLIENT_STATE::round_robin_misses_deadline( - double per_cpu_proc_rate, double rrs -) { - std::vector booked_to; - int k; - unsigned int i, j; - RESULT* rp; - - SCOPE_MSG_LOG scope_messages(log_messages, CLIENT_MSG_LOG::DEBUG_SCHED_CPU); - - for (k=0; krunnable()) continue; - double project_proc_rate; - if (rrs) { - project_proc_rate = per_cpu_proc_rate * (p->resource_share/rrs); - } else { - project_proc_rate = per_cpu_proc_rate; // TODO - fix - } - for (j=0; jproject != p) continue; - if (!rp->runnable()) continue; - double first = booked_to[0]; - int ifirst = 0; - for (k=1; kestimated_cpu_time_remaining()/project_proc_rate; - scope_messages.printf("set_scheduler_modes() result %s: est %f, deadline %f\n", - rp->name, booked_to[ifirst], rp->report_deadline - ); - if (booked_to[ifirst] > rp->report_deadline) { - return true; - } - } - } - return false; -} -#endif - -#if 0 -// Simulate what will happen if we do EDF schedule starting now. -// Go through non-done results in EDF order, -// keeping track in "booked_to" of how long each CPU is occupied -// -bool CLIENT_STATE::edf_misses_deadline(double per_cpu_proc_rate) { - std::map::iterator it; - std::map results_by_deadline; - std::vector booked_to; - unsigned int i; - int j; - RESULT* rp; - - for (j=0; jcomputing_done()) continue; - if (rp->project->non_cpu_intensive) continue; - results_by_deadline[rp->report_deadline] = rp; - } - - for ( - it = results_by_deadline.begin(); - it != results_by_deadline.end(); - it++ - ) { - rp = (*it).second; - - // find the CPU that will be free first - // - double lowest_book = booked_to[0]; - int lowest_booked_cpu = 0; - for (j=1; jestimated_cpu_time_remaining() - /(per_cpu_proc_rate*CPU_PESSIMISM_FACTOR); - if (booked_to[lowest_booked_cpu] > rp->report_deadline) { - return true; - } - } - return false; -} -#endif +#ifndef NEW_CPU_SCHED // Decide on modes for work-fetch and CPU sched policies. // Namely, set the variables @@ -1447,6 +1349,7 @@ void CLIENT_STATE::set_scheduler_modes() { work_fetch_no_new_work = should_not_fetch_work; cpu_earliest_deadline_first = result_has_deadline_problem; } +#endif double CLIENT_STATE::work_needed_secs() { // Note that one CPDN result with 30 days remaining will not keep 4 CPUs busy today. @@ -1479,12 +1382,13 @@ void CLIENT_STATE::scale_duration_correction_factors(double factor) { } } -void CLIENT_STATE::force_reschedule_all_cpus() -{ +#ifndef NEW_CPU_SCHED +void CLIENT_STATE::force_reschedule_all_cpus() { for (std::vector::iterator it = cpus.begin(); it != cpus.end(); ++it) { (*it).must_schedule = true; } } +#endif // Choose a new host CPID. // Do scheduler RPCs to all projects to propagate the CPID