- scheduler: sweeping changes to the way job runtimes are estimated:

see http://boinc.berkeley.edu/trac/wiki/RuntimeEstimation


svn path=/trunk/boinc/; revision=21153
This commit is contained in:
David Anderson 2010-04-08 23:14:47 +00:00
parent 8d7081de05
commit 1d765245ed
10 changed files with 234 additions and 130 deletions

View File

@ -2661,3 +2661,14 @@ David 7 Apr 2010
sched_types.cpp
sched_version.cpp
sched_send.cpp
David 7 Apr 2010
- scheduler: sweeping changes to the way job runtimes are estimated:
see http://boinc.berkeley.edu/trac/wiki/RuntimeEstimation
sched/
credit.cpp,h
sched_send.cpp
sched_shmem.cpp,h
sched_types.cpp,h
sched_version.cpp,h

View File

@ -388,13 +388,6 @@ int grant_credit(
#define PFC_MODE_APPROX 1
// PFC was crudely approximated
#define MIN_HOST_SAMPLES 10
// use host scaling only if have this many samples for host
#define MIN_VERSION_SAMPLES 100
// update a version's scale only if it has this many samples
#define COBBLESTONE_SCALE 200/86400e9
// multiply normalized PFC by this to get Cobblestones
// used in the computation of AV scale factors
//
struct RSC_INFO {
@ -994,73 +987,6 @@ int assign_credit_set(
return 0;
}
// called from scheduler to update the host_scale_time field
// of host_app_version records for which we're sending jobs,
// and for which scale_probation is set.
// If the record is not there, create it.
//
// ssp: scheduler shared memory (unused here; kept for interface stability)
// results: the results being sent to the host in this reply
// hostid: DB ID of the requesting host
// Returns 0 always; DB errors are tolerated (see notes below).
//
int update_host_scale_times(
    SCHED_SHMEM* ssp, vector<RESULT>& results, int hostid
) {
    vector<DB_HOST_APP_VERSION> havs;
    unsigned int i, j;

    // Pass 1: group results by generalized app version ID;
    // each group's host_scale_time is the latest report deadline seen.
    //
    for (i=0; i<results.size(); i++) {
        RESULT& r = results[i];
        int gavid = generalized_app_version_id(r.app_version_id, r.appid);
        bool found=false;
        for (j=0; j<havs.size(); j++) {
            DB_HOST_APP_VERSION& hav = havs[j];
            if (hav.app_version_id == gavid) {
                found = true;
                if (r.report_deadline > hav.host_scale_time) {
                    hav.host_scale_time = r.report_deadline;
                }
            }
        }
        if (!found) {
            DB_HOST_APP_VERSION hav;
            hav.host_id = hostid;
            hav.app_version_id = gavid;
            hav.host_scale_time = r.report_deadline;
            havs.push_back(hav);
        }
    }

    // Pass 2: write each record to the DB.
    //
    for (i=0; i<havs.size(); i++) {
        char query[256], clause[512];
        DB_HOST_APP_VERSION& hav = havs[i];
        hav.scale_probation = true;

        // host_app_version record may not exist; try to create it first.
        // If insert() fails the record presumably already exists,
        // so fall through to an UPDATE instead.
        //
        int retval = hav.insert();
        if (retval) {
            if (config.debug_credit) {
                log_messages.printf(MSG_NORMAL,
                    "[credit] updating host scale time for (%d, %d)\n",
                    hav.host_id, hav.app_version_id
                );
            }
            sprintf(query, "host_scale_time=%f", hav.host_scale_time);
            sprintf(clause,
                "host_id=%d and app_version_id=%d and scale_probation<>0",
                hav.host_id, hav.app_version_id
            );
            retval = hav.update_fields_noid(query, clause);
            // the above will fail if scale_probation is zero. not an error
        } else {
            if (config.debug_credit) {
                log_messages.printf(MSG_NORMAL,
                    "[credit] created host_app_version record (%d, %d)\n",
                    hav.host_id, hav.app_version_id
                );
            }
        }
    }
    return 0;
}
// A job has errored or timed out; put (host/app_version) on probation
// Called from transitioner
//

View File

@ -26,6 +26,13 @@
#define ERROR_RATE_INIT 0.1
// the initial error rate of a host or app version
#define MIN_HOST_SAMPLES 10
// use host scaling only if have this many samples for host
#define MIN_VERSION_SAMPLES 100
// update a version's scale only if it has this many samples
#define COBBLESTONE_SCALE 200/86400e9
// multiply normalized PFC by this to get Cobblestones
extern void compute_credit_rating(HOST&);
extern double credit_multiplier(int, time_t);
extern double fpops_to_credit(double fpops, double intops);
@ -44,10 +51,6 @@ extern int assign_credit_set(
double max_granted_credit, double& credit
);
extern int update_host_scale_times(
struct SCHED_SHMEM*, std::vector<RESULT>& results, int hostid
);
// if the result was anonymous platform,
// make a "pseudo ID" that combines the app ID and the resource type
//

View File

@ -50,6 +50,7 @@
#include "sched_timezone.h"
#include "sched_assign.h"
#include "sched_customize.h"
#include "sched_version.h"
#include "sched_send.h"
@ -274,7 +275,7 @@ static inline void get_running_frac() {
rf = g_reply->host.active_frac * g_reply->host.on_frac;
}
// clamp running_frac and DCF to a reasonable range
// clamp running_frac to a reasonable range
//
if (rf > 1) {
if (config.debug_send) {
@ -290,28 +291,6 @@ static inline void get_running_frac() {
g_wreq->running_frac = rf;
}
// Copy the host's duration correction factor into the work request,
// clamped to [0.1, 10].  A DCF of exactly zero (never reported by
// the client) defaults to 1.
//
static inline void get_dcf() {
    double factor = g_reply->host.duration_correction_factor;
    if (factor == 0) {
        factor = 1;
    } else if (factor < 0.1) {
        if (config.debug_send) {
            log_messages.printf(MSG_NORMAL,
                "[send] DCF=%f; setting to 0.1\n", factor
            );
        }
        factor = 0.1;
    } else if (factor > 10) {
        if (config.debug_send) {
            log_messages.printf(MSG_NORMAL,
                "[send] DCF=%f; setting to 10\n", factor
            );
        }
        factor = 10;
    }
    g_wreq->dcf = factor;
}
// estimate the amount of real time to complete this WU,
// taking into account active_frac etc.
// Note: don't factor in resource_share_fraction.
@ -320,9 +299,6 @@ static inline void get_dcf() {
double estimate_duration(WORKUNIT& wu, BEST_APP_VERSION& bav) {
double edu = estimate_duration_unscaled(wu, bav);
double ed = edu/g_wreq->running_frac;
if (!config.ignore_dcf) {
ed *= g_wreq->dcf;
}
if (config.debug_send) {
log_messages.printf(MSG_NORMAL,
"[send] est. duration for WU %d: unscaled %.2f scaled %.2f\n",
@ -865,6 +841,13 @@ static int add_wu_to_reply(
// modify the WU's xml_doc; add <name>, <rsc_*> etc.
//
wu2 = wu; // make copy since we're going to modify its XML field
// adjust FPOPS figures for anonymous platform
//
if (bavp->cavp) {
wu2.rsc_fpops_est *= bavp->cavp->rsc_fpops_scale;
wu2.rsc_fpops_bound *= bavp->cavp->rsc_fpops_scale;
}
retval = insert_wu_tags(wu2, *app);
if (retval) {
log_messages.printf(MSG_CRITICAL, "insert_wu_tags failed %d\n", retval);
@ -1040,12 +1023,7 @@ inline static int get_app_version_id(BEST_APP_VERSION* bavp) {
if (bavp->avp) {
return bavp->avp->id;
} else {
if (bavp->cavp->host_usage.ncudas) {
return ANON_PLATFORM_NVIDIA;
} else if (bavp->cavp->host_usage.natis) {
return ANON_PLATFORM_ATI;
}
return ANON_PLATFORM_CPU;
return bavp->cavp->host_usage.resource_type();
}
}
@ -1477,7 +1455,6 @@ void send_work_setup() {
g_wreq->disk_available = max_allowable_disk();
get_mem_sizes();
get_running_frac();
get_dcf();
g_wreq->get_job_limits();
g_wreq->seconds_to_fill = clamp_req_sec(g_request->work_req_seconds);
@ -1542,10 +1519,9 @@ void send_work_setup() {
(int)g_request->global_prefs.work_buf_min()
);
log_messages.printf(MSG_NORMAL,
"[send] active_frac %f on_frac %f DCF %f\n",
"[send] active_frac %f on_frac %f\n",
g_reply->host.active_frac,
g_reply->host.on_frac,
g_reply->host.duration_correction_factor
g_reply->host.on_frac
);
if (g_wreq->anonymous_platform) {
log_messages.printf(MSG_NORMAL,
@ -1562,6 +1538,100 @@ void send_work_setup() {
}
}
static void read_host_app_versions() {
DB_HOST_APP_VERSION hav;
char clause[256];
sprintf(clause, "where host_id=%d", g_reply->host.id);
while (!hav.enumerate(clause)) {
g_wreq->host_app_versions.push_back(hav);
}
}
// update the host_scale_time field of host_app_version records
// for which we're sending jobs, and for which scale_probation is set.
// If the record is not there, create it.
//
// results: the results being sent to the host in this reply
// hostid: DB ID of the requesting host
// Returns 0 always; DB failures are logged at MSG_CRITICAL
// but not propagated.
//
int update_host_scale_times(vector<RESULT>& results, int hostid) {
    vector<DB_HOST_APP_VERSION> havs;
    unsigned int i, j;
    int retval;

    // make a list of the host_app_versions and compute scale times
    // (one entry per generalized app version ID;
    // host_scale_time ends up as the latest report deadline seen)
    //
    for (i=0; i<results.size(); i++) {
        RESULT& r = results[i];
        int gavid = generalized_app_version_id(r.app_version_id, r.appid);
        bool found=false;
        for (j=0; j<havs.size(); j++) {
            DB_HOST_APP_VERSION& hav = havs[j];
            if (hav.app_version_id == gavid) {
                found = true;
                if (r.report_deadline > hav.host_scale_time) {
                    hav.host_scale_time = r.report_deadline;
                }
            }
        }
        if (!found) {
            DB_HOST_APP_VERSION hav;
            hav.host_id = hostid;
            hav.app_version_id = gavid;
            hav.host_scale_time = r.report_deadline;
            havs.push_back(hav);
        }
    }
    char query[256], clause[512];
    // go through the list
    //
    for (i=0; i<havs.size(); i++) {
        DB_HOST_APP_VERSION& hav = havs[i];
        // does it already exist in the DB?
        // (consult the in-memory cache filled by read_host_app_versions())
        //
        HOST_APP_VERSION* havp = get_host_app_version(hav.app_version_id);
        if (havp) {
            // only touch records currently on probation
            //
            if (havp->scale_probation) {
                // NOTE(review): host_scale_time written via %f;
                // presumably precision suffices for the column type - confirm
                sprintf(query, "host_scale_time=%f", hav.host_scale_time);
                sprintf(clause,
                    "host_id=%d and app_version_id=%d",
                    hav.host_id, hav.app_version_id
                );
                retval = hav.update_fields_noid(query, clause);
                if (retval) {
                    log_messages.printf(MSG_CRITICAL,
                        "hav.update_fields_noid(): %d\n", retval
                    );
                } else {
                    if (config.debug_credit) {
                        log_messages.printf(MSG_NORMAL,
                            "[credit] updating host scale time for (%d, %d)\n",
                            hav.host_id, hav.app_version_id
                        );
                    }
                }
            }
        } else {
            // no record yet: create one, starting on probation
            //
            hav.scale_probation = true;
            retval = hav.insert();
            if (retval) {
                log_messages.printf(MSG_CRITICAL,
                    "hav.insert(): %d\n", retval
                );
            } else {
                if (config.debug_credit) {
                    log_messages.printf(MSG_NORMAL,
                        "[credit] created host_app_version record (%d, %d)\n",
                        hav.host_id, hav.app_version_id
                    );
                }
            }
        }
    }
    return 0;
}
void send_work() {
int retval;
@ -1577,6 +1647,11 @@ void send_work() {
return;
}
read_host_app_versions();
if (g_wreq->anonymous_platform) {
estimate_flops_anon_platform();
}
get_host_info();
get_prefs_info();
@ -1642,7 +1717,7 @@ void send_work() {
}
done:
retval = update_host_scale_times(ssp, g_reply->results, g_reply->host.id);
retval = update_host_scale_times(g_reply->results, g_reply->host.id);
if (retval) {
log_messages.printf(MSG_CRITICAL,
"update_host_scale_times() failed: %d\n", retval

View File

@ -216,6 +216,13 @@ APP* SCHED_SHMEM::lookup_app(int id) {
return 0;
}
// Linear scan of the shared-memory app table by name.
// Returns NULL if no app matches.
//
APP* SCHED_SHMEM::lookup_app_name(char* name) {
    for (int i=0; i<napps; i++) {
        APP* ap = &apps[i];
        if (strcmp(name, ap->name) == 0) {
            return ap;
        }
    }
    return NULL;
}
// find an app version for a given platform
//
APP_VERSION* SCHED_SHMEM::lookup_app_version(int appid, int platformid) {

View File

@ -109,6 +109,7 @@ struct SCHED_SHMEM {
#endif
APP* lookup_app(int);
APP* lookup_app_name(char*);
APP_VERSION* lookup_app_version(int appid, int platform);
PLATFORM* lookup_platform_id(int);
PLATFORM* lookup_platform(char*);

View File

@ -61,7 +61,11 @@ int CLIENT_APP_VERSION::parse(FILE* f) {
memset(this, 0, sizeof(CLIENT_APP_VERSION));
while (fgets(buf, sizeof(buf), f)) {
if (match_tag(buf, "</app_version>")) return 0;
if (match_tag(buf, "</app_version>")) {
app = ssp->lookup_app_name(app_name);
if (!app) return ERR_NOT_FOUND;
return 0;
}
if (parse_str(buf, "<app_name>", app_name, 256)) continue;
if (parse_str(buf, "<platform>", platform, 256)) continue;
if (parse_str(buf, "<plan_class>", plan_class, 256)) continue;
@ -232,8 +236,14 @@ const char* SCHEDULER_REQUEST::parse(FILE* fin) {
if (match_tag(buf, "</app_versions>")) break;
if (match_tag(buf, "<app_version>")) {
CLIENT_APP_VERSION cav;
cav.parse(fin);
client_app_versions.push_back(cav);
retval = cav.parse(fin);
if (retval) {
g_reply->insert_message(
"Invalid app version description", "high"
);
} else {
client_app_versions.push_back(cav);
}
}
}
continue;

View File

@ -88,7 +88,14 @@ struct HOST_USAGE {
if (flops <= 0) flops = 1e9;
strcpy(cmdline, "");
}
~HOST_USAGE(){}
inline int resource_type() {
    // Map this usage description to the matching anonymous-platform
    // pseudo app-version ID; GPU kinds take precedence over CPU.
    if (ncudas) return ANON_PLATFORM_NVIDIA;
    if (natis) return ANON_PLATFORM_ATI;
    return ANON_PLATFORM_CPU;
}
};
// summary of a client's request for work, and our response to it
@ -160,7 +167,6 @@ struct WORK_REQ {
double disk_available;
double ram, usable_ram;
double running_frac;
double dcf;
int njobs_sent;
// The following keep track of the "easiest" job that was rejected
@ -257,6 +263,12 @@ struct CLIENT_APP_VERSION {
int version_num;
char plan_class[256];
HOST_USAGE host_usage;
double rsc_fpops_scale;
// multiply wu.rsc_fpops_est and rsc_fpops_limit
// by this amount when send to client,
// to reflect the discrepancy between how fast the client
// thinks the app is versus how fast we think it is
APP* app;
int parse(FILE*);
};

View File

@ -22,6 +22,7 @@
#include "sched_config.h"
#include "sched_customize.h"
#include "sched_types.h"
#include "credit.h"
#include "sched_version.h"
@ -80,7 +81,7 @@ CLIENT_APP_VERSION* get_app_version_anonymous(APP& app) {
for (i=0; i<g_request->client_app_versions.size(); i++) {
CLIENT_APP_VERSION& cav = g_request->client_app_versions[i];
if (strcmp(cav.app_name, app.name)) {
if (cav.app->id != app.id) {
continue;
}
if (cav.version_num < app.min_version) {
@ -116,6 +117,69 @@ CLIENT_APP_VERSION* get_app_version_anonymous(APP& app) {
return best;
}
// Look up the cached host_app_version record for the given
// (generalized) app version ID; NULL if this host has none.
//
HOST_APP_VERSION* get_host_app_version(int avid) {
    size_t n = g_wreq->host_app_versions.size();
    for (size_t i=0; i<n; i++) {
        HOST_APP_VERSION* havp = &g_wreq->host_app_versions[i];
        if (havp->app_version_id == avid) {
            return havp;
        }
    }
    return NULL;
}
// called at start of send_work().
// Estimate FLOPS of anon platform versions,
// and compute scaling factor for wu.rsc_fpops
//
void estimate_flops_anon_platform() {
    for (unsigned int i=0; i<g_request->client_app_versions.size(); i++) {
        CLIENT_APP_VERSION& cav = g_request->client_app_versions[i];
        cav.rsc_fpops_scale = 1;

        // a version that claims no resources at all is assumed
        // to use one CPU
        //
        bool no_resources = (cav.host_usage.avg_ncpus == 0)
            && (cav.host_usage.ncudas == 0)
            && (cav.host_usage.natis == 0)
        ;
        if (no_resources) {
            cav.host_usage.avg_ncpus = 1;
        }

        // current clients fill in host_usage.flops with peak FLOPS
        // if it's missing from app_info.xml;
        // for older clients that leave it zero,
        // fall back to the host's benchmark FLOPS (assume 1 CPU)
        //
        if (cav.host_usage.flops == 0) {
            cav.host_usage.flops = g_reply->host.p_fpops;
        }

        // At this point host_usage.flops is filled in with something.
        // If we have enough history for this (host, app version),
        // use the measured rate instead, and remember the ratio so
        // the WU's rsc_fpops figures can be scaled to match.
        //
        int gavid = generalized_app_version_id(
            cav.host_usage.resource_type(), cav.app->id
        );
        HOST_APP_VERSION* havp = get_host_app_version(gavid);
        if (havp && havp->et.n > MIN_HOST_SAMPLES) {
            double measured_flops = 1./havp->et.get_avg();
            cav.rsc_fpops_scale = cav.host_usage.flops/measured_flops;
            cav.host_usage.flops = measured_flops;
        }
    }
}
// if we have enough statistics to estimate the app version's
// actual FLOPS on this host, do so.
// Otherwise fall back to the version's global PFC scale, if any.
//
void estimate_flops(HOST_USAGE& hu, APP_VERSION& av) {
    HOST_APP_VERSION* record = get_host_app_version(av.id);
    bool have_host_stats = record && (record->et.n > MIN_HOST_SAMPLES);
    if (have_host_stats) {
        // reciprocal of the mean normalized elapsed time
        // gives an effective FLOPS rate for this host
        hu.flops = 1./record->et.get_avg();
    } else if (av.pfc_scale) {
        hu.flops *= av.pfc_scale;
    }
}
// return BEST_APP_VERSION for the given host, or NULL if none
//
//
@ -212,15 +276,6 @@ BEST_APP_VERSION* get_app_version(WORKUNIT& wu, bool check_req) {
}
bav.host_usage = cavp->host_usage;
// if client didn't tell us about the app version,
// assume it uses 1 CPU
//
if (bav.host_usage.flops == 0) {
bav.host_usage.flops = g_reply->host.p_fpops;
}
if (bav.host_usage.avg_ncpus == 0 && bav.host_usage.ncudas == 0 && bav.host_usage.natis == 0) {
bav.host_usage.avg_ncpus = 1;
}
bav.cavp = cavp;
}
g_wreq->best_app_versions.push_back(bav);
@ -302,6 +357,8 @@ BEST_APP_VERSION* get_app_version(WORKUNIT& wu, bool check_req) {
continue;
}
estimate_flops(host_usage, av);
// pick the fastest version
//
if (host_usage.flops > bav.host_usage.flops) {

View File

@ -16,3 +16,5 @@
// along with BOINC. If not, see <http://www.gnu.org/licenses/>.
extern BEST_APP_VERSION* get_app_version(WORKUNIT&, bool check_req=false);
extern void estimate_flops_anon_platform();
extern HOST_APP_VERSION* get_host_app_version(int avid);