- scheduler: add support for resource-specific scheduler requests:

    - parse new request message elements
        (CPU and coproc requested seconds and instances)
    - decide how many jobs to send based on these params
    - select app version based on these params
        (may send both CPU and CUDA app versions for the same app!)

svn path=/trunk/boinc/; revision=16861
David Anderson, 2009-01-10 00:43:33 +00:00
commit a9050243d6 (parent d2c6cbc7c5)
9 changed files with 162 additions and 29 deletions
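
The new elements land in the scheduler request message parsed in sched/server_types.cpp below. A minimal illustrative fragment with made-up values (the corresponding CUDA amounts arrive with the client's coproc description, handled in the lib/coproc.cpp and sched_send.cpp hunks, and are not shown here):

    <scheduler_request>
        ...
        <work_req_seconds>5000.0</work_req_seconds>    <!-- old-style request, still sent -->
        <cpu_req_secs>3600.0</cpu_req_secs>            <!-- new: CPU seconds requested -->
        <cpu_req_instances>2.0</cpu_req_instances>     <!-- new: idle CPU instances to fill -->
        ...
    </scheduler_request>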

checkin_notes

@ -77,3 +77,20 @@ David 9 Jan 2009
    client/
        time_stats.cpp

David 9 Jan 2009
    - scheduler: add support for resource-specific scheduler requests:
        - parse new request message elements
          (CPU and coproc requested seconds and instances)
        - decide how many jobs to send based on these params
        - select app version based on these params
          (may send both CPU and CUDA app versions for the same app!)
    lib/
        coproc.cpp
    sched/
        handle_request.cpp
        main.cpp,h
        sched_plan.cpp
        sched_send.cpp
        server_types.cpp,h

lib/coproc.cpp

@ -109,6 +109,7 @@ int COPROCS::parse(FILE* fin) {
int retval = cc->parse(fin);
if (!retval) {
coprocs.push_back(cc);
coproc_cuda = cc;
}
}
}

sched/handle_request.cpp

@ -1130,12 +1130,11 @@ void handle_msgs_to_host() {
static void log_request() {
log_messages.printf(MSG_NORMAL,
"Request: [USER#%d] [HOST#%d] [IP %s] client %d.%d.%d, work req %d sec\n",
"Request: [USER#%d] [HOST#%d] [IP %s] client %d.%d.%d\n",
g_reply->user.id, g_reply->host.id, get_remote_addr(),
g_request->core_client_major_version,
g_request->core_client_minor_version,
- g_request->core_client_release,
- (int)g_request->work_req_seconds
+ g_request->core_client_release
);
if (config.debug_request_details) {
log_messages.printf(MSG_DEBUG,

sched/main.cpp

@ -80,6 +80,7 @@ static bool db_opened=false;
SCHED_SHMEM* ssp = 0;
bool batch = false;
bool mark_jobs_done = false;
bool all_apps_use_hr;
static void usage(char* p) {
fprintf(stderr,
@ -323,6 +324,14 @@ void attach_to_feeder_shmem() {
exit(0);
}
}
all_apps_use_hr = true;
for (int i=0; i<ssp->napps; i++) {
if (!ssp->apps[i].homogeneous_redundancy) {
all_apps_use_hr = false;
break;
}
}
}
int main(int argc, char** argv) {

sched/main.h

@ -67,6 +67,7 @@ extern bool batch;
extern bool mark_jobs_done;
// mark jobs as successfully done immediately after send
// (for debugging/testing)
extern bool all_apps_use_hr;
extern int open_database();
extern void debug_sched(const char *trigger);

sched/sched_plan.cpp

@ -60,6 +60,12 @@ bool app_plan(SCHEDULER_REQUEST& sreq, char* plan_class, HOST_USAGE& hu) {
hu.max_ncpus = nthreads;
sprintf(hu.cmdline, "--nthreads %d", nthreads);
hu.flops = 0.95*sreq.host.p_fpops*nthreads;
if (config.debug_version_select) {
log_messages.printf(MSG_DEBUG,
"Multi-thread app estimate %.2f GFLOPS\n",
hu.flops/1e9
);
}
return true;
} else if (!strcmp(plan_class, "cuda")) {
if (g_wreq->no_gpus) {
@ -105,7 +111,7 @@ bool app_plan(SCHEDULER_REQUEST& sreq, char* plan_class, HOST_USAGE& hu) {
if (config.debug_version_select) {
log_messages.printf(MSG_DEBUG,
"CUDA app estimated %.2f GFLOPS (clock %d count %d)\n",
- hu.flops/GIGA, cp2->prop.clockRate,
+ hu.flops/1e9, cp2->prop.clockRate,
cp2->prop.multiProcessorCount
);
}

sched/sched_send.cpp

@ -74,8 +74,8 @@ const char* infeasible_string(int code) {
return "Unknown";
}
- const int MIN_SECONDS_TO_SEND = 0;
- const int MAX_SECONDS_TO_SEND = (28*SECONDS_IN_DAY);
+ const double MIN_REQ_SECS = 0;
+ const double MAX_REQ_SECS = (28*SECONDS_IN_DAY);
// return a number that
// - is the # of CPUs in EDF simulation
@ -126,6 +126,16 @@ BEST_APP_VERSION* get_app_version(WORKUNIT& wu) {
bavp = g_wreq->best_app_versions[i];
if (bavp->appid == wu.appid) {
if (!bavp->avp) return NULL;
// if we previously chose a CUDA app but don't need more CUDA work,
// reset pointer and see if there's another app
//
if (g_wreq->rsc_spec_request
&& bavp->host_usage.cuda_instances() > 0
&& !g_wreq->need_cuda()
) {
bavp = NULL;
}
return bavp;
}
}
@ -204,6 +214,25 @@ BEST_APP_VERSION* get_app_version(WORKUNIT& wu) {
} else {
host_usage.sequential_app(g_reply->host.p_fpops);
}
if (host_usage.cuda_instances()) {
if (!g_wreq->need_cuda()) {
if (config.debug_version_select) {
log_messages.printf(MSG_DEBUG,
"Don't need CUDA jobs, skipping\n"
);
}
continue;
}
} else {
if (!g_wreq->need_cpu()) {
if (config.debug_version_select) {
log_messages.printf(MSG_DEBUG,
"Don't need CPU jobs, skipping\n"
);
}
continue;
}
}
if (host_usage.flops > bavp->host_usage.flops) {
bavp->host_usage = host_usage;
bavp->avp = &av;
@ -761,7 +790,7 @@ int add_wu_to_reply(
"[HOST#%d] Sending app_version %s %d %d %s; %.2f GFLOPS\n",
g_reply->host.id, app->name,
avp2->platformid, avp2->version_num, avp2->plan_class,
- bavp->host_usage.flops/GIGA
+ bavp->host_usage.flops/1e9
);
}
}
@ -856,8 +885,6 @@ bool work_needed(bool locality_sched) {
return false;
}
}
- if (g_wreq->seconds_to_fill <= 0) return false;
- if (g_wreq->nresults >= config.max_wus_to_send) return false;
int ncpus = effective_ncpus();
@ -888,7 +915,15 @@ bool work_needed(bool locality_sched) {
return false;
}
}
- return true;
+ if (g_wreq->nresults >= config.max_wus_to_send) return false;
+ if (g_wreq->rsc_spec_request) {
+     if (g_wreq->need_cpu()) return true;
+     if (g_wreq->need_cuda()) return true;
+ } else {
+     if (g_wreq->seconds_to_fill > 0) return true;
+ }
+ return false;
}
void SCHEDULER_REPLY::got_good_result() {
@ -907,7 +942,6 @@ void SCHEDULER_REPLY::got_bad_result() {
int add_result_to_reply(DB_RESULT& result, WORKUNIT& wu, BEST_APP_VERSION* bavp) {
int retval;
- double wu_seconds_filled;
bool resent_result = false;
APP* app = ssp->lookup_app(wu.appid);
@ -995,11 +1029,11 @@ int add_result_to_reply(DB_RESULT& result, WORKUNIT& wu, BEST_APP_VERSION* bavp)
}
if (retval) return retval;
- wu_seconds_filled = estimate_duration(wu, *bavp);
+ double est_dur = estimate_duration(wu, *bavp);
if (config.debug_send) {
log_messages.printf(MSG_NORMAL,
"[HOST#%d] Sending [RESULT#%d %s] (fills %.2f seconds)\n",
g_reply->host.id, result.id, result.name, wu_seconds_filled
"[HOST#%d] Sending [RESULT#%d %s] (est. dur. %.2f seconds)\n",
g_reply->host.id, result.id, result.name, est_dur
);
}
@ -1032,8 +1066,19 @@ int add_result_to_reply(DB_RESULT& result, WORKUNIT& wu, BEST_APP_VERSION* bavp)
}
result.bavp = bavp;
g_reply->insert_result(result);
- g_wreq->seconds_to_fill -= wu_seconds_filled;
- g_request->estimated_delay += wu_seconds_filled/effective_ncpus();
+ if (g_wreq->rsc_spec_request) {
+     double cuda_instances = bavp->host_usage.cuda_instances();
+     if (cuda_instances) {
+         g_wreq->cuda_req_secs -= est_dur;
+         g_wreq->cuda_req_instances -= cuda_instances;
+     } else {
+         g_wreq->cpu_req_secs -= est_dur;
+         g_wreq->cpu_req_instances -= bavp->host_usage.avg_ncpus;
+     }
+ } else {
+     g_wreq->seconds_to_fill -= est_dur;
+ }
+ g_request->estimated_delay += est_dur;
g_wreq->nresults++;
g_wreq->nresults_on_host++;
if (!resent_result) g_reply->host.nresults_today++;
@ -1041,7 +1086,6 @@ int add_result_to_reply(DB_RESULT& result, WORKUNIT& wu, BEST_APP_VERSION* bavp)
// add this result to workload for simulation
//
if (config.workload_sim && g_request->have_other_results_list) {
- double est_dur = estimate_duration(wu, *bavp);
IP_RESULT ipr ("", time(0)+wu.delay_bound, est_dur);
g_request->ip_results.push_back(ipr);
}
@ -1353,12 +1397,35 @@ void set_trust() {
}
}
static double clamp_req_sec(double x) {
if (x < MIN_REQ_SECS) return MIN_REQ_SECS;
if (x > MAX_REQ_SECS) return MAX_REQ_SECS;
return x;
}
void send_work() {
- if (g_request->work_req_seconds <= 0) return;
+ // decipher request type, fill in WORK_REQ, and leave if no request
+ //
+ g_wreq->seconds_to_fill = clamp_req_sec(g_request->work_req_seconds);
+ g_wreq->cpu_req_secs = clamp_req_sec(g_request->cpu_req_secs);
+ g_wreq->cpu_req_instances = g_request->cpu_req_instances;
+ if (coproc_cuda) {
+     g_wreq->cuda_req_secs = clamp_req_sec(coproc_cuda->req_secs);
+     g_wreq->cuda_req_instances = coproc_cuda->req_instances;
+ }
+ if (g_wreq->cpu_req_secs || g_wreq->cuda_req_secs) {
+     g_wreq->rsc_spec_request = true;
+ } else {
+     if (g_wreq->seconds_to_fill == 0) return;
+     g_wreq->rsc_spec_request = false;
+ }
g_wreq->disk_available = max_allowable_disk();
- if (hr_unknown_platform(g_request->host)) {
+ if (all_apps_use_hr && hr_unknown_platform(g_request->host)) {
log_messages.printf(MSG_INFO,
"Not sending work because unknown HR class\n"
);
g_wreq->hr_reject_perm = true;
return;
}
@ -1369,6 +1436,18 @@ void send_work() {
set_trust();
if (config.debug_send) {
log_messages.printf(MSG_DEBUG,
"CPU: req %.2f sec, %.2f instances\n",
g_wreq->cpu_req_secs, g_wreq->cpu_req_instances
);
log_messages.printf(MSG_DEBUG,
"CUDA: req %.2f sec, %.2f instances\n",
g_wreq->cuda_req_secs, g_wreq->cuda_req_instances
);
log_messages.printf(MSG_DEBUG,
"work_req_seconds: %.2f secs\n",
g_wreq->seconds_to_fill
);
log_messages.printf(MSG_DEBUG,
"%s matchmaker scheduling; %s EDF sim\n",
config.matchmaker?"Using":"Not using",
@ -1388,14 +1467,6 @@ void send_work() {
);
}
- g_wreq->seconds_to_fill = g_request->work_req_seconds;
- if (g_wreq->seconds_to_fill > MAX_SECONDS_TO_SEND) {
-     g_wreq->seconds_to_fill = MAX_SECONDS_TO_SEND;
- }
- if (g_wreq->seconds_to_fill < MIN_SECONDS_TO_SEND) {
-     g_wreq->seconds_to_fill = MIN_SECONDS_TO_SEND;
- }
if (config.enable_assignment) {
if (send_assigned_jobs()) {
if (config.debug_assignment) {
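
To make the "decide how many jobs to send" arithmetic above concrete, here is a small standalone C++ illustration (not BOINC code): each sent job's estimated duration and instance usage are charged against the CPU or CUDA budget, as add_result_to_reply() now does, and sending stops once neither need_cpu() nor need_cuda() holds, as work_needed() now checks. Only the field and method names mirror WORK_REQ; the fixed est_dur and one-instance-per-job charging are simplifying assumptions.

    #include <cstdio>

    // simplified stand-in for the per-resource fields added to WORK_REQ
    struct WorkReq {
        double cpu_req_secs, cpu_req_instances;
        double cuda_req_secs, cuda_req_instances;
        bool need_cpu()  const { return cpu_req_secs > 0  || cpu_req_instances > 0; }
        bool need_cuda() const { return cuda_req_secs > 0 || cuda_req_instances > 0; }
    };

    int main() {
        // a client asking for 1000 CPU-seconds on 2 idle CPUs and 500 CUDA-seconds on 1 GPU
        WorkReq wreq = {1000, 2, 500, 1};
        double est_dur = 300;               // assumed estimated duration of every job
        int cpu_jobs = 0, cuda_jobs = 0;

        while (wreq.need_cuda()) {          // charge CUDA jobs against the CUDA budget
            wreq.cuda_req_secs -= est_dur;
            wreq.cuda_req_instances -= 1;   // assume each CUDA job uses one GPU
            cuda_jobs++;
        }
        while (wreq.need_cpu()) {           // then CPU jobs against the CPU budget
            wreq.cpu_req_secs -= est_dur;
            wreq.cpu_req_instances -= 1;    // assume avg_ncpus == 1
            cpu_jobs++;
        }
        printf("would send %d CUDA jobs and %d CPU jobs\n", cuda_jobs, cpu_jobs);
        return 0;
    }

With these inputs it prints "would send 2 CUDA jobs and 4 CPU jobs"; a reply that mixes both resource types is what the commit message means by possibly sending both CPU and CUDA app versions for the same app.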

sched/server_types.cpp

@ -151,6 +151,8 @@ const char* SCHEDULER_REQUEST::parse(FILE* fin) {
core_client_release = 0;
rpc_seqno = 0;
work_req_seconds = 0;
cpu_req_secs = 0;
cpu_req_instances = 0;
resource_share_fraction = 1.0;
rrs_fraction = 1.0;
prrs_fraction = 1.0;
@ -216,6 +218,8 @@ const char* SCHEDULER_REQUEST::parse(FILE* fin) {
if (parse_int(buf, "<core_client_minor_version>", core_client_minor_version)) continue;
if (parse_int(buf, "<core_client_release>", core_client_release)) continue;
if (parse_double(buf, "<work_req_seconds>", work_req_seconds)) continue;
if (parse_double(buf, "<cpu_req_secs>", cpu_req_secs)) continue;
if (parse_double(buf, "<cpu_req_instances>", cpu_req_instances)) continue;
if (parse_double(buf, "<resource_share_fraction>", resource_share_fraction)) continue;
if (parse_double(buf, "<rrs_fraction>", rrs_fraction)) continue;
if (parse_double(buf, "<prrs_fraction>", prrs_fraction)) continue;

sched/server_types.h

@ -83,6 +83,11 @@ struct HOST_USAGE {
if (flops <= 0) flops = 1e9;
strcpy(cmdline, "");
}
double cuda_instances() {
COPROC* cp = coprocs.lookup("CUDA");
if (cp) return cp->count;
return 0;
}
~HOST_USAGE(){}
};
@ -118,9 +123,27 @@ struct WORK_REQ {
bool trust;
// whether to send unreplicated jobs
// 6.7+ clients send separate requests for different resource types:
//
double cpu_req_secs; // instance-seconds requested
double cpu_req_instances; // number of idle instances, use if possible
double cuda_req_secs;
double cuda_req_instances;
inline bool need_cpu() {
return (cpu_req_secs>0) || (cpu_req_instances>0);
}
inline bool need_cuda() {
return (cuda_req_secs>0) || (cuda_req_instances>0);
}
// older clients send a single number, the requested duration of jobs
//
double seconds_to_fill;
// in "normalized CPU seconds"; see
// http://boinc.berkeley.edu/trac/wiki/ClientSched#NormalizedCPUTime
// true if new-type request
//
bool rsc_spec_request;
double disk_available;
int nresults;
@ -271,6 +294,8 @@ struct SCHEDULER_REQUEST {
int rpc_seqno;
double work_req_seconds;
// in "normalized CPU seconds" (see work_req.php)
double cpu_req_secs;
double cpu_req_instances;
double resource_share_fraction;
// this project's fraction of total resource share
double rrs_fraction;
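
Finally, the "select app version based on these params" part: in the sched_send.cpp hunk above, get_app_version() now skips an app version whose resource the client did not ask for. A standalone sketch of that filtering rule (simplified stand-in types; the real code works on HOST_USAGE/BEST_APP_VERSION and caches its choice per app):

    #include <cstdio>
    #include <vector>

    // simplified stand-ins for HOST_USAGE and the per-resource request flags
    struct HostUsage { double cuda_instances; double flops; };

    struct WorkReq {
        double cpu_req_secs, cuda_req_secs;
        bool need_cpu()  const { return cpu_req_secs > 0; }
        bool need_cuda() const { return cuda_req_secs > 0; }
    };

    // mirrors the skip logic added to get_app_version(): ignore versions whose
    // resource is not needed, keep the fastest of the rest
    const HostUsage* pick_version(const std::vector<HostUsage>& versions, const WorkReq& wreq) {
        const HostUsage* best = nullptr;
        for (const HostUsage& hu : versions) {
            bool is_cuda = hu.cuda_instances > 0;
            if (is_cuda ? !wreq.need_cuda() : !wreq.need_cpu()) continue;
            if (!best || hu.flops > best->flops) best = &hu;
        }
        return best;
    }

    int main() {
        std::vector<HostUsage> versions = {
            {0, 2e9},    // CPU app version, ~2 GFLOPS
            {1, 50e9},   // CUDA app version, ~50 GFLOPS
        };
        WorkReq cuda_only = {0, 600};
        WorkReq cpu_only  = {600, 0};
        printf("CUDA-only request -> %.0f GFLOPS version\n", pick_version(versions, cuda_only)->flops/1e9);
        printf("CPU-only request  -> %.0f GFLOPS version\n", pick_version(versions, cpu_only)->flops/1e9);
        return 0;
    }

Because the budgets are drained as results are added, the version chosen for the same app can change within a single reply, which is how a CPU and a CUDA version of the same app can end up in the same batch of sent jobs.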