CPU scheduler

svn path=/trunk/boinc/; revision=9336
This commit is contained in:
David Anderson 2006-01-27 22:55:05 +00:00
parent 803f4728f8
commit ba6a479fb4
8 changed files with 163 additions and 34 deletions

View File

@ -1095,3 +1095,14 @@ David 27 Jan 2006
client/
cs_apps.C
David 27 Jan 2006
- core client: various scheduler fixes
(from John McLeod)
client/
client_state.C,h
client_types.C,h
cs_prefs.C
cs_scheduler.C
cs_statefile.C

View File

@ -125,6 +125,8 @@ CLIENT_STATE::CLIENT_STATE() {
have_tentative_project = false;
new_version_check_time = 0;
detach_console = false;
rr_results_fail_count = rr_last_results_fail_count = 0;
}
#if 0
@ -1445,5 +1447,13 @@ double calculate_exponential_backoff( int n, double MIN, double MAX) {
return rand_range(MIN, rmax);
}
// Heuristic: does this host have only sporadic/unreliable network access?
// True if the user restricts network hours, requires confirmation before
// connecting, or the network was suspended at some point recently
// (within the last week, or within the work-buffer window).
bool CLIENT_STATE::network_is_intermittent() const
{
    const double since_unsuspended = now - network_last_unsuspended;
    return global_prefs.net_start_hour != global_prefs.net_end_hour
        || global_prefs.confirm_before_connecting
        || since_unsuspended < SECONDS_PER_DAY * 7
        || since_unsuspended < global_prefs.work_buf_min_days * SECONDS_PER_DAY;
}
const char *BOINC_RCSID_e836980ee1 = "$Id$";

View File

@ -81,6 +81,7 @@ public:
std::vector<APP_VERSION*> app_versions;
std::vector<WORKUNIT*> workunits;
std::vector<RESULT*> results;
std::vector<CPU> cpus;
NET_XFER_SET* net_xfers;
PERS_FILE_XFER_SET* pers_file_xfers;
@ -150,6 +151,7 @@ public:
// Don't do CPU. See check_suspend_activities for logic
bool network_suspended;
// Don't do network. See check_suspend_network for logic
double network_last_unsuspended;
bool executing_as_daemon;
// true if --daemon is on the commandline
// this means we are running as a daemon on unix,
@ -186,6 +188,8 @@ private:
double total_cpu_time_this_period;
bool work_fetch_no_new_work;
bool cpu_earliest_deadline_first;
long rr_last_results_fail_count;
long rr_results_fail_count;
// --------------- acct_mgr.C:
public:
@ -231,6 +235,7 @@ public:
bool want_network();
void network_available();
bool no_gui_rpc;
bool network_id_intermittent() const;
private:
int link_app(PROJECT*, APP*);
int link_file_info(PROJECT*, FILE_INFO*);
@ -288,8 +293,8 @@ private:
int choose_version_num(char*, SCHEDULER_REPLY&);
int app_finished(ACTIVE_TASK&);
void assign_results_to_projects();
bool schedule_largest_debt_project(double expected_pay_off);
bool schedule_earliest_deadline_result();
bool schedule_largest_debt_project(double expected_pay_off, int cpu_index);
bool schedule_earliest_deadline_result(int cpu_index);
bool start_apps();
bool schedule_cpus();
bool handle_finished_apps();
@ -338,11 +343,13 @@ private:
// --------------- cs_scheduler.C:
public:
double work_needed_secs();
void force_reschedule_all_cpus();
PROJECT* next_project_master_pending();
PROJECT* next_project_need_work();
int make_scheduler_request(PROJECT*);
int handle_scheduler_reply(PROJECT*, char* scheduler_url, int& nresults);
int compute_work_requests();
bool network_is_intermittent() const;
SCHEDULER_OP* scheduler_op;
void scale_duration_correction_factors(double);
private:
@ -359,7 +366,7 @@ private:
bool no_work_for_a_cpu();
int proj_min_results(PROJECT*, double);
bool round_robin_misses_deadline(double, double);
bool rr_misses_deadline(double, double);
bool rr_misses_deadline(double, double, bool);
bool edf_misses_deadline(double);
void set_scheduler_modes();
void generate_new_host_cpid();

View File

@ -99,6 +99,7 @@ void PROJECT::init() {
work_request = 0;
work_request_urgency = WORK_FETCH_DONT_NEED;
duration_correction_factor = 1;
deadline_problem_count = 0;
}
// parse project fields from client_state.xml
@ -1476,6 +1477,18 @@ double RESULT::estimated_cpu_time_remaining() {
return estimated_cpu_time();
}
// The time by which computation of this result must actually finish,
// i.e. the report deadline minus scheduling slack:
// one CPU-scheduling period (which some users set to days) plus one
// day of safety margin, plus — on intermittently connected hosts —
// the maximum expected gap between network connections.
double RESULT::computation_deadline() {
    // slack for task-switch granularity plus one day of margin
    double deadline = report_deadline -
        (gstate.global_prefs.cpu_scheduling_period_minutes*60 +
        SECONDS_PER_DAY);
    if (::gstate.network_is_intermittent()) {
        // allow for time between Internet connections
        deadline -= gstate.global_prefs.work_buf_min_days*SECONDS_PER_DAY;
    }
    return deadline;
}
// The given result has just completed successfully.
// Update the correction factor used to predict
// completion time for this project's results
@ -1534,4 +1547,23 @@ FILE_INFO* RESULT::lookup_file_logical(const char* lname) {
return 0;
}
// Construct an idle CPU slot: no result assigned yet, flagged so the
// scheduler assigns one on its next pass.
CPU::CPU()
{
    sched_last_time = 0;
    must_schedule = true;   // a brand-new CPU must be scheduled right away
    result = NULL;
}
// When a CPU slot goes away, ask the scheduler to redistribute results.
// NOTE(review): CPU objects are held by value in std::vector<CPU>
// (client_state.h) with default copy semantics, so this destructor also
// runs on every vector reallocation/copy, firing spurious reschedule
// requests — confirm this is intended (rule-of-three concern).
CPU::~CPU()
{
    gstate.request_schedule_cpus("Reduction in CPUs");
}
// Bind result r to this CPU slot and record when the assignment was made.
void CPU::schedule_result(RESULT * r)
{
    sched_last_time = gstate.now;   // timestamp; order of the two stores is immaterial
    result = r;
}
const char *BOINC_RCSID_b81ff9a584 = "$Id$";

View File

@ -241,6 +241,7 @@ public:
// fields used by CPU scheduler and work fetch
// everything from here on applies only to CPU intensive projects
int deadline_problem_count;
bool contactable();
// not suspended and not deferred and not no more work
bool runnable();
@ -399,6 +400,7 @@ struct RESULT {
char name[256];
char wu_name[256];
double report_deadline;
double computation_deadline();
std::vector<FILE_REF> output_files;
bool ready_to_report;
// we're ready to report this result to the server;
@ -466,7 +468,20 @@ struct RESULT {
bool already_selected;
// used to keep cpu scheduler from scheduling a result twice
// transient; used only within schedule_cpus()
bool deadline_problem;
};
// A per-processor scheduling slot: tracks which RESULT is currently
// assigned to this CPU and when that assignment last changed.
struct CPU {
public:
    CPU();
    // NOTE(review): the destructor requests a global CPU reschedule;
    // since CPU is stored by value in std::vector<CPU> with default
    // copy semantics, vector reallocation also invokes it — confirm
    // this is intended.
    ~CPU();
    // assign a result to this CPU and stamp the assignment time
    void schedule_result(RESULT * result);
    double sched_last_time;   // gstate.now when a result was last scheduled here
    bool must_schedule;       // true => this CPU needs a (new) result assigned
    RESULT *result;           // currently assigned result, or NULL if idle
private:
};
#endif

View File

@ -237,6 +237,7 @@ int CLIENT_STATE::suspend_network(int reason) {
// Resume network activity after a suspension.
// Records the un-suspension time in network_last_unsuspended, which
// network_is_intermittent() uses to judge connection reliability.
// Always returns 0.
int CLIENT_STATE::resume_network() {
    msg_printf(NULL, MSG_INFO, "Resuming network activity");
    network_last_unsuspended = now;
    return 0;
}

View File

@ -442,7 +442,7 @@ PROJECT* CLIENT_STATE::find_project_with_overdue_results() {
if (have_sporadic_connection) {
return p;
}
if (gstate.now > r->report_deadline - REPORT_DEADLINE_CUSHION) {
if (gstate.now > r->computation_deadline()) {
return p;
}
if (gstate.now > r->completed_time + global_prefs.work_buf_min_days*SECONDS_PER_DAY) {
@ -537,7 +537,9 @@ int CLIENT_STATE::compute_work_requests() {
overall_work_fetch_urgency = WORK_FETCH_NEED_IMMEDIATELY;
} else if (global_work_need > 0) {
scope_messages.printf("compute_work_requests(): global work needed is greater than zero\n");
overall_work_fetch_urgency = WORK_FETCH_NEED;
// Intermittently connected hosts need to keep the queue full while they are connected;
// always-connected hosts need not be as aggressive about keeping the queue full.
overall_work_fetch_urgency = network_is_intermittent() ? WORK_FETCH_NEED_IMMEDIATELY : WORK_FETCH_NEED;
} else {
overall_work_fetch_urgency = WORK_FETCH_OK;
}
@ -615,7 +617,7 @@ int CLIENT_STATE::compute_work_requests() {
p->work_request = max(0.0,
//(2*work_min_period - estimated_time_to_starvation)
(work_min_period - estimated_time_to_starvation)
* ncpus
* avg_proc_rate() * p->resource_share / prrs
);
} else if (overall_work_fetch_urgency > WORK_FETCH_OK) {
@ -634,6 +636,13 @@ int CLIENT_STATE::compute_work_requests() {
"CLIENT_STATE::compute_work_requests(): client work need: %f sec, urgency %d\n",
global_work_need, overall_work_fetch_urgency
);
// Make certain that there is only one project with a positive work request.
PROJECT * next_proj = next_project_need_work();
for (i = 0; i < projects.size(); ++i)
{
if (projects[i] != next_proj) projects[i]->work_request = 0.0;
}
return 0;
}
@ -1052,8 +1061,8 @@ bool CLIENT_STATE::should_get_work() {
// if the CPU started this time period overloaded,
// let it process for a while to get out of the CPU overload state.
//
if (!work_fetch_no_new_work) {
// unless there is an override of some sort.
if (!work_fetch_no_new_work || must_schedule_cpus) {
set_scheduler_modes();
}
@ -1102,13 +1111,14 @@ bool CLIENT_STATE::no_work_for_a_cpu() {
// return true if round-robin scheduling will miss a deadline
//
bool CLIENT_STATE::rr_misses_deadline(double per_cpu_proc_rate, double rrs) {
bool CLIENT_STATE::rr_misses_deadline(double per_cpu_proc_rate, double rrs, bool current_work_only) {
PROJECT* p, *pbest;
RESULT* rp, *rpbest;
vector<RESULT*> active;
unsigned int i;
double x;
vector<RESULT*>::iterator it;
bool deadline_missed = false;
SCOPE_MSG_LOG scope_messages(log_messages, CLIENT_MSG_LOG::DEBUG_SCHED_CPU);
@ -1121,12 +1131,16 @@ bool CLIENT_STATE::rr_misses_deadline(double per_cpu_proc_rate, double rrs) {
p->pending.clear();
}
rr_results_fail_count = 0;
for (i=0; i<results.size(); i++) {
rp = results[i];
if (rp->aborted_via_gui) continue;
if (!rp->runnable()) continue;
if (rp->aborted_via_gui) continue;
if (rp->project->non_cpu_intensive) continue;
rp->rrsim_cpu_left = rp->estimated_cpu_time_remaining();
rp->deadline_problem = false;
p = rp->project;
if (p->active.size() < (unsigned int)ncpus) {
active.push_back(rp);
@ -1160,12 +1174,17 @@ bool CLIENT_STATE::rr_misses_deadline(double per_cpu_proc_rate, double rrs) {
// "rpbest" is first result to finish. Does it miss its deadline?
//
double diff = sim_now + rpbest->rrsim_finish_delay - rpbest->report_deadline;
double diff = sim_now + rpbest->rrsim_finish_delay - rpbest->computation_deadline();
if (diff > 0) {
scope_messages.printf(
"rr_sim: result %s misses deadline by %f\n", rpbest->name, diff
);
return true;
if (current_work_only) {
scope_messages.printf(
"rr_sim: result %s misses deadline by %f\n", rpbest->name, diff
);
rpbest->deadline_problem = true;
deadline_missed = true;
++rr_results_fail_count;
}
else return true;
}
// remove *rpbest from active set,
@ -1219,8 +1238,8 @@ bool CLIENT_STATE::rr_misses_deadline(double per_cpu_proc_rate, double rrs) {
sim_now += rpbest->rrsim_finish_delay;
}
scope_messages.printf( "rr_sim: deadlines met\n");
return false;
if (!deadline_missed) scope_messages.printf( "rr_sim: deadlines met\n");
return deadline_missed;
}
#if 0
@ -1332,20 +1351,19 @@ bool CLIENT_STATE::edf_misses_deadline(double per_cpu_proc_rate) {
// and print a message if we're changing their value
//
void CLIENT_STATE::set_scheduler_modes() {
RESULT* rp;
unsigned int i;
bool should_not_fetch_work = false;
bool use_earliest_deadline_first = false;
bool result_has_deadline_problem = false;
double total_proc_rate = avg_proc_rate();
double per_cpu_proc_rate = total_proc_rate/ncpus;
SCOPE_MSG_LOG scope_messages(log_messages, CLIENT_MSG_LOG::DEBUG_SCHED_CPU);
double rrs = runnable_resource_share();
if (rr_misses_deadline(per_cpu_proc_rate, rrs)) {
if (rr_misses_deadline(per_cpu_proc_rate, rrs, true)) {
// if round robin would miss a deadline, use EDF
//
use_earliest_deadline_first = true;
result_has_deadline_problem = true;
if (!no_work_for_a_cpu()) {
should_not_fetch_work = true;
}
@ -1356,12 +1374,13 @@ void CLIENT_STATE::set_scheduler_modes() {
PROJECT* p = next_project_need_work();
if (p && !p->runnable()) {
rrs += p->resource_share;
if (rr_misses_deadline(per_cpu_proc_rate, rrs)) {
if (rr_misses_deadline(per_cpu_proc_rate, rrs, false)) {
should_not_fetch_work = true;
}
}
}
#if 0
for (i=0; i<results.size(); i++) {
rp = results[i];
if (rp->computing_done()) continue;
@ -1370,7 +1389,8 @@ void CLIENT_STATE::set_scheduler_modes() {
// Is the nearest deadline within a day?
//
if (rp->report_deadline - gstate.now < 60 * 60 * 24) {
use_earliest_deadline_first = true;
result_has_deadline_problem = true;
rp->deadline_problem = true;
scope_messages.printf(
"set_scheduler_modes(): Less than 1 day until deadline.\n"
);
@ -1379,13 +1399,20 @@ void CLIENT_STATE::set_scheduler_modes() {
// is there a deadline < twice the users connect period?
//
if (rp->report_deadline - gstate.now < global_prefs.work_buf_min_days * SECONDS_PER_DAY * 2) {
use_earliest_deadline_first = true;
result_has_deadline_problem = true;
rp->deadline_problem = true;
scope_messages.printf(
"set_scheduler_modes(): Deadline is before reconnect time.\n"
);
}
}
#endif
for (i = 0; i < projects.size(); ++i) projects[i]->deadline_problem_count = 0;
for (i = 0; i < results.size(); ++i) {
if (results[i]->deadline_problem) ++results[i]->project->deadline_problem_count;
}
// display only when the policy changes to avoid once per second
//
if (work_fetch_no_new_work && !should_not_fetch_work) {
@ -1400,32 +1427,48 @@ void CLIENT_STATE::set_scheduler_modes() {
);
}
if (cpu_earliest_deadline_first && !use_earliest_deadline_first) {
if (cpu_earliest_deadline_first && !result_has_deadline_problem) {
msg_printf(NULL, MSG_INFO,
"Resuming round-robin CPU scheduling."
);
}
if (!cpu_earliest_deadline_first && use_earliest_deadline_first) {
if (!cpu_earliest_deadline_first && result_has_deadline_problem) {
msg_printf(NULL, MSG_INFO,
"Using earliest-deadline-first scheduling because computer is overcommitted."
"Using critical-deadline-first scheduling because computer is overcommitted."
);
}
if (rr_results_fail_count > rr_last_results_fail_count) {
force_reschedule_all_cpus();
}
rr_last_results_fail_count = rr_results_fail_count;
work_fetch_no_new_work = should_not_fetch_work;
cpu_earliest_deadline_first = use_earliest_deadline_first;
cpu_earliest_deadline_first = result_has_deadline_problem;
}
// Estimate how many additional seconds of work are needed to keep every
// CPU busy for global_prefs.work_buf_min_days.
//
// Results are distributed greedily, each to the currently least-loaded
// CPU, so that a single long result (e.g. one CPDN result with 30 days
// remaining) cannot by itself satisfy the need of a multi-CPU host.
//
// Returns the sum of the per-CPU shortfalls (>= 0).
double CLIENT_STATE::work_needed_secs() {
    std::vector<double> work_for_cpu(ncpus, 0.0);
    for (unsigned int i = 0; i < results.size(); ++i) {
        if (results[i]->project->non_cpu_intensive) continue;
        // assign this result to the least-loaded CPU so far
        int best_cpu = 0;
        for (int j = 1; j < ncpus; ++j) {
            if (work_for_cpu[j] < work_for_cpu[best_cpu]) best_cpu = j;
        }
        work_for_cpu[best_cpu] += results[i]->estimated_cpu_time_remaining();
    }
    // per-CPU target: the buffered fraction of this host's processing rate
    double per_cpu_target =
        global_prefs.work_buf_min_days*SECONDS_PER_DAY*avg_proc_rate()/ncpus;
    double total_work_need = 0;
    for (int i = 0; i < ncpus; ++i) {
        double shortfall = per_cpu_target - work_for_cpu[i];
        // BUGFIX: accumulate only positive shortfalls. The previous code
        // did "if (x < 0) total_work_need += x;", which sums surpluses
        // (negative values) and so could never report a positive need,
        // contradicting the earlier max(0, shortfall) semantics.
        if (shortfall > 0) total_work_need += shortfall;
    }
    return total_work_need;
}
void CLIENT_STATE::scale_duration_correction_factors(double factor) {
@ -1436,6 +1479,13 @@ void CLIENT_STATE::scale_duration_correction_factors(double factor) {
}
}
// Mark every CPU slot so the scheduler re-assigns its result on the
// next scheduling pass.
void CLIENT_STATE::force_reschedule_all_cpus()
{
    for (unsigned int i = 0; i < cpus.size(); ++i) {
        cpus[i].must_schedule = true;
    }
}
// Choose a new host CPID.
// Do scheduler RPCs to all projects to propagate the CPID
//

View File

@ -309,6 +309,7 @@ int CLIENT_STATE::parse_state_file() {
continue;
} else if (parse_int(buf, "<user_network_request>", user_network_request)) {
continue;
} else if (parse_double(buf, "<network_last_unsuspended>", network_last_unsuspended)) {
} else if (parse_int(buf, "<core_client_major_version>", old_major_version)) {
} else if (parse_int(buf, "<core_client_minor_version>", old_minor_version)) {
} else if (parse_int(buf, "<core_client_release>", old_release)) {
@ -425,7 +426,8 @@ int CLIENT_STATE::write_state(MIOFILE& f) {
"<core_client_minor_version>%d</core_client_minor_version>\n"
"<core_client_release>%d</core_client_release>\n"
"<user_run_request>%d</user_run_request>\n"
"<user_network_request>%d</user_network_request>\n"
"<user_network_request>%f</user_network_request>\n"
"<network_last_unsuspended>%d</network_last_unsuspended>\n"
"%s"
"<new_version_check_time>%f</new_version_check_time>\n",
platform_name,
@ -434,6 +436,7 @@ int CLIENT_STATE::write_state(MIOFILE& f) {
core_client_release,
user_run_request,
user_network_request,
network_last_unsuspended,
cpu_benchmarks_pending?"<cpu_benchmarks_pending/>\n":"",
new_version_check_time
);