From a97556bdfdb658aae2a80a88ab834e9fda50de64 Mon Sep 17 00:00:00 2001 From: David Anderson Date: Fri, 22 Jun 2007 23:48:37 +0000 Subject: [PATCH] - feeder: added a new enumerator of DB_WORK_ITEM that, on successive calls, scans through ALL the sendable jobs satisfying the select clause (it does this by ID order, so there's no order clause) This is used for HR, so that if a job has been committed to an HR class, we eventually get it. With extremely minimal testing, the new HR stuff seems to work. db/ boinc_db.C,h sched/ feeder.C sample_work_generator.C server_types.C svn path=/trunk/boinc/; revision=12988 --- checkin_notes | 17 ++++++++++++ db/boinc_db.C | 50 ++++++++++++++++++++++++++++++++++- db/boinc_db.h | 7 +++++ sched/feeder.C | 17 +++++++----- sched/sample_work_generator.C | 1 + sched/server_types.C | 1 + 6 files changed, 86 insertions(+), 7 deletions(-) diff --git a/checkin_notes b/checkin_notes index c98767f458..91b95038db 100755 --- a/checkin_notes +++ b/checkin_notes @@ -6595,3 +6595,20 @@ David 22 June 2007 prefs.inc client/ app_control.C + +David 22 June 2007 + - feeder: added a new enumerator of DB_WORK_ITEM that, + on successive calls, scans through ALL the sendable + jobs satisfying the select clause + (it does this by ID order, so there's no order clause) + This is used for HR, so that if a job has been committed + to an HR class, we eventually get it. + + With extremely minimal testing, the new HR stuff seems to work. + + db/ + boinc_db.C,h + sched/ + feeder.C + sample_work_generator.C + server_types.C diff --git a/db/boinc_db.C b/db/boinc_db.C index 3e35585035..8e373ec510 100644 --- a/db/boinc_db.C +++ b/db/boinc_db.C @@ -95,7 +95,10 @@ DB_TRANSITIONER_ITEM_SET::DB_TRANSITIONER_ITEM_SET(DB_CONN* dc) : DB_VALIDATOR_ITEM_SET::DB_VALIDATOR_ITEM_SET(DB_CONN* dc) : DB_BASE_SPECIAL(dc?dc:&boinc_db){} DB_WORK_ITEM::DB_WORK_ITEM(DB_CONN* dc) : - DB_BASE_SPECIAL(dc?dc:&boinc_db){} + DB_BASE_SPECIAL(dc?dc:&boinc_db +){ + start_id = 0; +} DB_IN_PROGRESS_RESULT::DB_IN_PROGRESS_RESULT(DB_CONN* dc) : DB_BASE_SPECIAL(dc?dc:&boinc_db){} DB_SCHED_RESULT_ITEM_SET::DB_SCHED_RESULT_ITEM_SET(DB_CONN* dc) : @@ -1330,6 +1333,51 @@ int DB_WORK_ITEM::enumerate( return 0; } +int DB_WORK_ITEM::enumerate_all( + int limit, const char* select_clause +) { + char query[MAX_QUERY_LEN]; + int retval; + MYSQL_ROW row; + if (!cursor.active) { + sprintf(query, + "select high_priority r2.id, r2.priority, workunit.* from result r1, result r2, workunit " + " where r1.server_state=%d and r2.id=r1.id and r1.workunitid=workunit.id and r1.id>%d " + " %s " + "limit %d", + RESULT_SERVER_STATE_UNSENT, + start_id, + select_clause, + limit + ); + retval = db->do_query(query); + if (retval) return mysql_errno(db->mysql); + cursor.rp = mysql_store_result(db->mysql); + if (!cursor.rp) return mysql_errno(db->mysql); + + // if query gets no rows, start over in ID space + // + if (mysql_num_rows(cursor.rp) == 0) { + mysql_free_result(cursor.rp); + start_id = 0; + return ERR_DB_NOT_FOUND; + } + cursor.active = true; + } + row = mysql_fetch_row(cursor.rp); + if (!row) { + mysql_free_result(cursor.rp); + cursor.active = false; + retval = mysql_errno(db->mysql); + if (retval) return retval; + return ERR_DB_NOT_FOUND; + } else { + parse(row); + start_id = res_id; + } + return 0; +} + void IN_PROGRESS_RESULT::parse(MYSQL_ROW& r) { int i=0; diff --git a/db/boinc_db.h b/db/boinc_db.h index 019982ac37..863f3623ce 100755 --- a/db/boinc_db.h +++ b/db/boinc_db.h @@ -680,12 +680,19 @@ struct WORK_ITEM { }; class DB_WORK_ITEM : public WORK_ITEM, public DB_BASE_SPECIAL { + int start_id; + // when enumerate_all is used, keeps track of which ID to start from public: DB_WORK_ITEM(DB_CONN* p=0); int enumerate( int limit, const char* select_clause, const char* order_clause ); // used by feeder + int enumerate_all( + int limit, const char* select_clause + ); + // used by feeder when HR is used. + // Successive calls cycle through all results. int read_result(); // used by scheduler to read result server state int update(); diff --git a/sched/feeder.C b/sched/feeder.C index 852e1ffa84..c8d5d76af5 100644 --- a/sched/feeder.C +++ b/sched/feeder.C @@ -224,9 +224,14 @@ static bool get_job_from_db( strcpy(select_clause, mod_select_clause); enum_size = enum_limit; } + int hrt = ssp->apps[app_index].homogeneous_redundancy; while (1) { - retval = wi.enumerate(enum_size, select_clause, order_clause); + if (hrt) { + retval = wi.enumerate_all(enum_size, select_clause); + } else { + retval = wi.enumerate(enum_size, select_clause, order_clause); + } if (retval) { printf("Reached end of enum for app %d\n", app_index); // we've reach the end of the result set @@ -287,7 +292,6 @@ static bool get_job_from_db( // if using HR, check whether we've exceeded quota for this class // - int hrt = ssp->apps[app_index].homogeneous_redundancy; if (hrt) { if (!hr_info.accept(hrt, wi.wu.hr_class)) { log_messages.printf( @@ -375,7 +379,6 @@ static bool scan_work_array(vector &work_items) { break; } case WR_STATE_EMPTY: - printf("doing slot %d app %d\n", i, app_index); found = get_job_from_db( wi, app_index, enum_phase[app_index], ncollisions ); @@ -635,9 +638,8 @@ int main(int argc, char** argv) { } ssp->scan_tables(); - log_messages.printf( - SCHED_MSG_LOG::MSG_NORMAL, - "feeder: read " + log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL, + "read " "%d platforms, " "%d apps, " "%d app_versions\n", @@ -645,6 +647,9 @@ int main(int argc, char** argv) { ssp->napps, ssp->napp_versions ); + log_messages.printf(SCHED_MSG_LOG::MSG_NORMAL, + "Using %d job slots\n", ssp->max_wu_results + ); app_indices = (int*) calloc(ssp->max_wu_results, sizeof(int)); diff --git a/sched/sample_work_generator.C b/sched/sample_work_generator.C index 0bfa8dd0d5..e06b0733e6 100644 --- a/sched/sample_work_generator.C +++ b/sched/sample_work_generator.C @@ -33,6 +33,7 @@ #include "error_numbers.h" #include "backend_lib.h" #include "parse.h" +#include "util.h" #include "sched_config.h" #include "sched_util.h" diff --git a/sched/server_types.C b/sched/server_types.C index a4be2770c2..1de402fe1f 100644 --- a/sched/server_types.C +++ b/sched/server_types.C @@ -25,6 +25,7 @@ using namespace std; #include "parse.h" #include "error_numbers.h" #include "str_util.h" +#include "util.h" #include "main.h" #include "sched_util.h" #include "sched_msgs.h"