- feeder: added a new enumerator of DB_WORK_ITEM that,

on successive calls, scans through ALL the sendable
    jobs satisfying the select clause
    (it does this by ID order, so there's no order clause)
    This is used for HR, so that if a job has been committed
    to an HR class, we eventually get it.

    With extremely minimal testing, the new HR stuff seems to work.

db/
    boinc_db.C,h
sched/
    feeder.C
    sample_work_generator.C
    server_types.C

svn path=/trunk/boinc/; revision=12988
This commit is contained in:
David Anderson 2007-06-22 23:48:37 +00:00
parent 0a19dddf18
commit a97556bdfd
6 changed files with 86 additions and 7 deletions

View File

@ -6595,3 +6595,20 @@ David 22 June 2007
prefs.inc
client/
app_control.C
David 22 June 2007
- feeder: added a new enumerator of DB_WORK_ITEM that,
on successive calls, scans through ALL the sendable
jobs satisfying the select clause
(it does this by ID order, so there's no order clause)
This is used for HR, so that if a job has been committed
to an HR class, we eventually get it.
With extremely minimal testing, the new HR stuff seems to work.
db/
boinc_db.C,h
sched/
feeder.C
sample_work_generator.C
server_types.C

View File

@ -95,7 +95,10 @@ DB_TRANSITIONER_ITEM_SET::DB_TRANSITIONER_ITEM_SET(DB_CONN* dc) :
DB_VALIDATOR_ITEM_SET::DB_VALIDATOR_ITEM_SET(DB_CONN* dc) :
DB_BASE_SPECIAL(dc?dc:&boinc_db){}
DB_WORK_ITEM::DB_WORK_ITEM(DB_CONN* dc) :
DB_BASE_SPECIAL(dc?dc:&boinc_db){}
DB_BASE_SPECIAL(dc?dc:&boinc_db
){
start_id = 0;
}
DB_IN_PROGRESS_RESULT::DB_IN_PROGRESS_RESULT(DB_CONN* dc) :
DB_BASE_SPECIAL(dc?dc:&boinc_db){}
DB_SCHED_RESULT_ITEM_SET::DB_SCHED_RESULT_ITEM_SET(DB_CONN* dc) :
@ -1330,6 +1333,51 @@ int DB_WORK_ITEM::enumerate(
return 0;
}
int DB_WORK_ITEM::enumerate_all(
int limit, const char* select_clause
) {
char query[MAX_QUERY_LEN];
int retval;
MYSQL_ROW row;
if (!cursor.active) {
sprintf(query,
"select high_priority r2.id, r2.priority, workunit.* from result r1, result r2, workunit "
" where r1.server_state=%d and r2.id=r1.id and r1.workunitid=workunit.id and r1.id>%d "
" %s "
"limit %d",
RESULT_SERVER_STATE_UNSENT,
start_id,
select_clause,
limit
);
retval = db->do_query(query);
if (retval) return mysql_errno(db->mysql);
cursor.rp = mysql_store_result(db->mysql);
if (!cursor.rp) return mysql_errno(db->mysql);
// if query gets no rows, start over in ID space
//
if (mysql_num_rows(cursor.rp) == 0) {
mysql_free_result(cursor.rp);
start_id = 0;
return ERR_DB_NOT_FOUND;
}
cursor.active = true;
}
row = mysql_fetch_row(cursor.rp);
if (!row) {
mysql_free_result(cursor.rp);
cursor.active = false;
retval = mysql_errno(db->mysql);
if (retval) return retval;
return ERR_DB_NOT_FOUND;
} else {
parse(row);
start_id = res_id;
}
return 0;
}
void IN_PROGRESS_RESULT::parse(MYSQL_ROW& r) {
int i=0;

View File

@ -680,12 +680,19 @@ struct WORK_ITEM {
};
class DB_WORK_ITEM : public WORK_ITEM, public DB_BASE_SPECIAL {
int start_id;
// when enumerate_all is used, keeps track of which ID to start from
public:
DB_WORK_ITEM(DB_CONN* p=0);
int enumerate(
int limit, const char* select_clause, const char* order_clause
);
// used by feeder
int enumerate_all(
int limit, const char* select_clause
);
// used by feeder when HR is used.
// Successive calls cycle through all results.
int read_result();
// used by scheduler to read result server state
int update();

View File

@ -224,9 +224,14 @@ static bool get_job_from_db(
strcpy(select_clause, mod_select_clause);
enum_size = enum_limit;
}
int hrt = ssp->apps[app_index].homogeneous_redundancy;
while (1) {
retval = wi.enumerate(enum_size, select_clause, order_clause);
if (hrt) {
retval = wi.enumerate_all(enum_size, select_clause);
} else {
retval = wi.enumerate(enum_size, select_clause, order_clause);
}
if (retval) {
printf("Reached end of enum for app %d\n", app_index);
// we've reach the end of the result set
@ -287,7 +292,6 @@ static bool get_job_from_db(
// if using HR, check whether we've exceeded quota for this class
//
int hrt = ssp->apps[app_index].homogeneous_redundancy;
if (hrt) {
if (!hr_info.accept(hrt, wi.wu.hr_class)) {
log_messages.printf(
@ -375,7 +379,6 @@ static bool scan_work_array(vector<DB_WORK_ITEM> &work_items) {
break;
}
case WR_STATE_EMPTY:
printf("doing slot %d app %d\n", i, app_index);
found = get_job_from_db(
wi, app_index, enum_phase[app_index], ncollisions
);
@ -635,9 +638,8 @@ int main(int argc, char** argv) {
}
ssp->scan_tables();
log_messages.printf(
SCHED_MSG_LOG::MSG_NORMAL,
"feeder: read "
log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,
"read "
"%d platforms, "
"%d apps, "
"%d app_versions\n",
@ -645,6 +647,9 @@ int main(int argc, char** argv) {
ssp->napps,
ssp->napp_versions
);
log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL,
"Using %d job slots\n", ssp->max_wu_results
);
app_indices = (int*) calloc(ssp->max_wu_results, sizeof(int));

View File

@ -33,6 +33,7 @@
#include "error_numbers.h"
#include "backend_lib.h"
#include "parse.h"
#include "util.h"
#include "sched_config.h"
#include "sched_util.h"

View File

@ -25,6 +25,7 @@ using namespace std;
#include "parse.h"
#include "error_numbers.h"
#include "str_util.h"
#include "util.h"
#include "main.h"
#include "sched_util.h"
#include "sched_msgs.h"