back end: add keyword-based component to job scheduling score.

- add DB field for storing job keywords: workunit.keywords
    add this to various DB parse/write functions
- add --keywords option to create_work for specifying job keywords
- add <keyword_sched> option in config.xml for enabling keyword score
    (it's disabled by default).
    If set, increment score for "yes" keyword matches,
    and disallow jobs with "no" matches
- in scheduler, add array job_keywords_array for parsed versions
    of job keywords (vector<int>)

also:
- use symbols instead of numbers for slow_check() return values
- parse unused fields in req message to remove unparsed-XML warnings
This commit is contained in:
David Anderson 2017-07-22 00:48:38 -07:00
parent 6fc116012b
commit 20d07be2b8
20 changed files with 132 additions and 43 deletions

View File

@ -901,7 +901,8 @@ void DB_WORKUNIT::db_print(char* buf){
"fileset_id=%lu, "
"app_version_id=%ld, "
"transitioner_flags=%d, "
"size_class=%d ",
"size_class=%d, "
"keywords='%s' ",
create_time, appid,
name, xml_doc, batch,
rsc_fpops_est, rsc_fpops_bound, rsc_memory_bound, rsc_disk_bound,
@ -921,7 +922,8 @@ void DB_WORKUNIT::db_print(char* buf){
fileset_id,
app_version_id,
transitioner_flags,
size_class
size_class,
keywords
);
}
@ -944,7 +946,8 @@ void DB_WORKUNIT::db_print_values(char* buf) {
"%lu, "
"%ld, "
"%d, "
"%d)",
"%d, "
"'%s')",
create_time, appid,
name, xml_doc, batch,
rsc_fpops_est, rsc_fpops_bound,
@ -965,7 +968,8 @@ void DB_WORKUNIT::db_print_values(char* buf) {
fileset_id,
app_version_id,
transitioner_flags,
size_class
size_class,
keywords
);
}
@ -1005,6 +1009,7 @@ void DB_WORKUNIT::db_parse(MYSQL_ROW &r) {
app_version_id = atol(r[i++]);
transitioner_flags = atoi(r[i++]);
size_class = atoi(r[i++]);
strcpy2(keywords, r[i++]);
}
void DB_CREDITED_JOB::db_print(char* buf){
@ -2020,6 +2025,7 @@ void WORK_ITEM::parse(MYSQL_ROW& r) {
wu.app_version_id = atol(r[i++]);
wu.transitioner_flags = atoi(r[i++]);
wu.size_class = atoi(r[i++]);
strcpy2(wu.keywords, r[i++]);
}
int DB_WORK_ITEM::enumerate(

View File

@ -18,7 +18,14 @@
#ifndef _BOINC_DB_
#define _BOINC_DB_
// Structures corresponding to database records.
// Structures passed to and from DB queries.
//
// Mostly these correspond to DB tables, and inherit structs
// defined in boinc_db_types.h
// But some of them - TRANSITIONER_ITEM, STATE_COUNTS, SCHED_RESULT_ITEM, etc. -
// combine the info from multiple tables (from joins)
// or have subsets of table data.
//
// Some of these types have counterparts in client/types.h,
// but don't be deceived - client and server have different variants.

View File

@ -480,9 +480,9 @@ struct WORKUNIT {
int size_class;
// -1 means none; encode this here so that transitioner
// doesn't have to look up app
char keywords[256];
// keywords, as space-separated integers
// the following not used in the DB
char app_name[256];
void clear();
WORKUNIT(){clear();}
};

View File

@ -256,6 +256,7 @@ create table workunit (
app_version_id integer not null,
transitioner_flags tinyint not null,
size_class smallint not null default -1,
keywords varchar(254) not null,
primary key (id)
) engine=InnoDB;

View File

@ -1049,6 +1049,12 @@ function update_6_13_2017() {
");
}
// DB update of 7/21/2017:
// add the workunit.keywords column, which holds a job's keywords
// as a string of space-separated integers.
//
function update_7_21_2017() {
    $query = "alter table workunit
add column keywords varchar(254) not null
";
    do_query($query);
}
// Updates are done automatically if you use "upgrade".
//
// If you need to do updates manually,
@ -1098,6 +1104,7 @@ $db_updates = array (
array(27015, "update_2_17_2017"),
array(27016, "update_3_17_2017"),
array(27017, "update_6_13_2017"),
array(27018, "update_7_21_2017"),
);
?>

View File

@ -18,7 +18,6 @@
// utility functions for keywords
#include <stdio.h>
#include <algorithm>
#include "parse.h"
#include "keyword.h"
@ -54,15 +53,13 @@ void USER_KEYWORDS::write(FILE* f) {
fprintf(f, "</user_keywords>\n");
}
double keyword_score(USER_KEYWORDS& user_keywords, JOB_KEYWORDS& job_keywords) {
double score = 0;
for (unsigned int i=0; i<job_keywords.ids.size(); i++) {
int jk = job_keywords.ids[i];
if (std::find(user_keywords.yes.begin(), user_keywords.yes.end(), jk) != user_keywords.yes.end()) {
score += 1;
} else if (std::find(user_keywords.no.begin(), user_keywords.no.end(), jk) != user_keywords.no.end()) {
return -1;
}
// Parse a space-separated list of integer keyword IDs
// and append each one to ids (existing entries are kept).
//
// buf: NUL-terminated string; it is modified in place,
//      because strtok() overwrites the separators with NULs.
// Tokens are converted with atoi(), so a non-numeric token yields 0.
// An empty or all-space string adds nothing.
//
void JOB_KEYWORDS::parse_str(char* buf) {
    for (char* tok = strtok(buf, " "); tok; tok = strtok(NULL, " ")) {
        ids.push_back(atoi(tok));
    }
}

View File

@ -17,6 +17,9 @@
// utility classes for keywords
#ifndef BOINC_KEYWORD_H
#define BOINC_KEYWORD_H
#include <vector>
#include "parse.h"
@ -24,12 +27,12 @@ struct USER_KEYWORDS {
std::vector<int> yes;
std::vector<int> no;
int parse(XML_PARSER&);
void clear() {
inline void clear() {
yes.clear();
no.clear();
}
void write(FILE*);
bool empty() {
inline bool empty() {
return yes.empty() && no.empty();
}
};
@ -38,6 +41,12 @@ struct JOB_KEYWORDS {
std::vector<int> ids;
void parse_str(char*);
// parse space-separated list
inline bool empty() {
return ids.empty();
}
inline void clear() {
ids.clear();
}
};
extern double keyword_score(USER_KEYWORDS&, JOB_KEYWORDS&);
#endif

View File

@ -42,6 +42,10 @@ extern char* precision_time_to_string(double);
extern void secs_to_hmsf(double, char*);
extern std::string timediff_format(double);
// Return true if the C string p is empty,
// i.e. its first character is the NUL terminator.
// p must point to a valid NUL-terminated string.
// Takes const char* (the string is only read),
// so string literals and const buffers are accepted
// as well as mutable char arrays.
//
inline bool empty(const char* p) {
    return *p == '\0';
}
inline bool ends_with(std::string const& s, std::string const& suffix) {
return
s.size()>=suffix.size() &&

View File

@ -167,6 +167,7 @@ cgi_sources = \
sched_customize.cpp \
sched_files.cpp \
sched_hr.cpp \
sched_keyword.cpp \
sched_limit.cpp \
sched_locality.cpp \
sched_main.cpp \

View File

@ -375,9 +375,10 @@ int wu_is_infeasible_fast(
// Do checks that require DB access for whether we can send this job,
// and return:
// 0 if OK to send
// 1 if can't send to this host
// 2 if can't send to ANY host
// CHECK_OK if OK to send
// CHECK_NO_HOST if can't send to this host
// CHECK_NO_ANY if can't send to ANY host
// e.g. WU error mask is nonzero
//
int slow_check(
WU_RESULT& wu_result, // the job cache entry.
@ -402,7 +403,7 @@ int slow_check(
log_messages.printf(MSG_CRITICAL,
"send_work: can't get result count (%s)\n", boincerror(retval)
);
return 1;
return CHECK_NO_HOST;
} else {
if (n>0) {
if (config.debug_send_job) {
@ -411,7 +412,7 @@ int slow_check(
g_reply->user.id, n, wu.id
);
}
return 1;
return CHECK_NO_HOST;
}
}
} else if (config.one_result_per_host_per_wu) {
@ -426,7 +427,7 @@ int slow_check(
log_messages.printf(MSG_CRITICAL,
"send_work: can't get result count (%s)\n", boincerror(retval)
);
return 1;
return CHECK_NO_HOST;
} else {
if (n>0) {
if (config.debug_send_job) {
@ -435,7 +436,7 @@ int slow_check(
g_reply->host.id, n, wu.id
);
}
return 1;
return CHECK_NO_HOST;
}
}
}
@ -454,13 +455,13 @@ int slow_check(
log_messages.printf(MSG_CRITICAL,
"can't get fields for [WU#%lu]: %s\n", db_wu.id, boincerror(retval)
);
return 1;
return CHECK_NO_HOST;
}
// check wu.error_mask
//
if (vals[2] != 0) {
return 2;
return CHECK_NO_ANY;
}
if (app_hr_type(*app)) {
@ -477,7 +478,7 @@ int slow_check(
// are processed first.
//
wu_result.infeasible_count++;
return 1;
return CHECK_NO_HOST;
}
}
if (app->homogeneous_app_version) {
@ -491,11 +492,11 @@ int slow_check(
);
}
wu_result.infeasible_count++;
return 1;
return CHECK_NO_HOST;
}
}
}
return 0;
return CHECK_OK;
}
// Check for pathological conditions that mean

View File

@ -22,6 +22,8 @@
#include "sched_shmem.h"
#include "sched_types.h"
// return values from wu_is_infeasible_fast()
//
#define INFEASIBLE_MEM 1
#define INFEASIBLE_DISK 2
#define INFEASIBLE_CPU 3
@ -42,7 +44,15 @@ extern int wu_is_infeasible_fast(
int res_server_state, int res_priority, double res_report_deadline,
APP&, BEST_APP_VERSION&
);
// return values from slow_check()
//
#define CHECK_OK 0
    // OK to send the job
#define CHECK_NO_HOST 1
    // can't send the job to this host
#define CHECK_NO_ANY 2
    // can't send the job to ANY host
    // (e.g. WU error mask is nonzero)
extern int slow_check(WU_RESULT&, APP*, BEST_APP_VERSION*);
extern bool result_still_sendable(DB_RESULT& result, WORKUNIT& wu);
extern bool app_not_selected(int appid);

View File

@ -298,6 +298,7 @@ int SCHED_CONFIG::parse(FILE* f) {
if (xp.parse_double("version_select_random_factor", version_select_random_factor)) continue;
if (xp.parse_double("maintenance_delay", maintenance_delay)) continue;
if (xp.parse_bool("credit_by_app", credit_by_app)) continue;
if (xp.parse_bool("keyword_sched", keyword_sched)) continue;
//////////// SCHEDULER LOG FLAGS /////////
@ -308,6 +309,7 @@ int SCHED_CONFIG::parse(FILE* f) {
if (xp.parse_bool("debug_edf_sim_workload", debug_edf_sim_workload)) continue;
if (xp.parse_bool("debug_fcgi", debug_fcgi)) continue;
if (xp.parse_bool("debug_handle_results", debug_handle_results)) continue;
if (xp.parse_bool("debug_keyword", debug_keyword)) continue;
if (xp.parse_bool("debug_locality", debug_locality)) continue;
if (xp.parse_bool("debug_locality_lite", debug_locality_lite)) continue;
if (xp.parse_bool("debug_prefs", debug_prefs)) continue;

View File

@ -180,6 +180,8 @@ struct SCHED_CONFIG {
// to calculate projected_flops when choosing version.
bool credit_by_app;
// store per-app credit info in credit_user and credit_team
bool keyword_sched;
// score jobs based on keywords
// time intervals
double maintenance_delay;
@ -194,6 +196,7 @@ struct SCHED_CONFIG {
bool debug_fcgi;
bool debug_client_files; // stuff related to sticky files on client
bool debug_handle_results;
bool debug_keyword;
bool debug_locality; // locality scheduling
bool debug_locality_lite; // locality scheduling Lite
bool debug_prefs;

View File

@ -16,23 +16,32 @@
// along with BOINC. If not, see <http://www.gnu.org/licenses/>.
// code related to keyword-based job scoring
//
// A job's keywords are stored in workunit.keywords as a char string.
// We don't want to parse that every time we score the job,
// so we maintain an array of JOB_KEYWORDS paralleling the job array.
#include <algorithm>
#include <iterator>
#include "sched_main.h"
#include "keyword.h"
JOB_KEYWORDS *job_keywords_array;
// compute the score increment for the given job and user keywords
// (or -1 if the keywords are incompatible)
//
double keyword_score_aux(
USER_KEYWORDS& user_keywords, JOB_KEYWORDS& job_keywords
USER_KEYWORDS& uks, JOB_KEYWORDS& jks
) {
double score = 0;
for (unsigned int i=0; i<job_keywords.ids.size(); i++) {
int jk = job_keywords.ids[i];
if (std::find(user_keywords.yes.begin(), user_keywords.yes.end(), jk) != user_keywords.yes.end()) {
for (unsigned int i=0; i<jks.ids.size(); i++) {
int jk = jks.ids[i];
if (std::find(uks.yes.begin(), uks.yes.end(), jk) != uks.yes.end()) {
score += 1;
} else if (std::find(user_keywords.no.begin(), user_keywords.no.end(), jk) != user_keywords.no.end()) {
} else if (std::find(uks.no.begin(), uks.no.end(), jk) != uks.no.end()) {
return -1;
}
}
@ -76,6 +85,8 @@ void keyword_sched_remove_job(int i) {
job_keywords_array[i].clear();
}
// called at CGI start to initialize job keyword array
//
// Allocates job_keywords_array with ssp->max_wu_results entries,
// so that an entry can be indexed by the same index
// used for the shared-memory job array.
// NOTE(review): the scheduler startup code appears to call this
// only when config.keyword_sched is set; otherwise the array is
// never allocated, so code that indexes job_keywords_array
// should check that flag first - confirm at all call sites.
//
void keyword_sched_init() {
job_keywords_array = new JOB_KEYWORDS[ssp->max_wu_results];
}

View File

@ -18,7 +18,11 @@
#ifndef BOINC_SCHED_KEYWORD_H
#define BOINC_SCHED_KEYWORD_H
// see sched_keyword.cpp
extern double keyword_score(int);
extern void keyword_sched_remove_job(int index);
extern void keyword_sched_init();
#endif

View File

@ -61,6 +61,7 @@
#include "handle_request.h"
#include "sched_config.h"
#include "sched_files.h"
#include "sched_keyword.h"
#include "sched_msgs.h"
#include "sched_types.h"
#include "sched_util.h"
@ -543,6 +544,9 @@ int main(int argc, char** argv) {
send_message("Server error: can't attach shared memory", config.maintenance_delay);
goto done;
}
if (config.keyword_sched) {
keyword_sched_init();
}
if (strlen(config.debug_req_reply_dir)) {
struct stat statbuf;

View File

@ -31,6 +31,7 @@
#include "sched_check.h"
#include "sched_config.h"
#include "sched_hr.h"
#include "sched_keyword.h"
#include "sched_main.h"
#include "sched_msgs.h"
#include "sched_send.h"
@ -54,7 +55,8 @@ static int get_size_class(APP& app, double es) {
// Also do some initial screening,
// and return false if can't send the job to host
//
bool JOB::get_score(WU_RESULT& wu_result) {
bool JOB::get_score(int array_index) {
WU_RESULT& wu_result = ssp->wu_results[array_index];
score = 0;
if (!app->beta && wu_result.need_reliable) {
@ -130,6 +132,14 @@ bool JOB::get_score(WU_RESULT& wu_result) {
score += wu_result.res_priority;
if (config.keyword_sched) {
double x = keyword_score(array_index);
if (x < 0) {
return false;
}
score += x;
}
if (config.debug_send_job) {
log_messages.printf(MSG_NORMAL,
"[send_job]: score %f for result %lu\n", score, wu_result.resultid
@ -215,7 +225,7 @@ void send_work_score_type(int rt) {
job.index = i;
job.result_id = wu_result.resultid;
if (!job.get_score(wu_result)) {
if (!job.get_score(i)) {
if (config.debug_send_job) {
log_messages.printf(MSG_NORMAL,
"[send_job] [RESULT#%lu] get_score() returned false\n",
@ -298,11 +308,14 @@ void send_work_score_type(int rt) {
sema_locked = false;
switch (slow_check(wu_result, job.app, job.bavp)) {
case 1:
case CHECK_NO_HOST:
wu_result.state = WR_STATE_PRESENT;
break;
case 2:
case CHECK_NO_ANY:
wu_result.state = WR_STATE_EMPTY;
if (config.keyword_sched) {
keyword_sched_remove_job(job.index);
}
break;
default:
// slow_check() refreshes fields of wu_result.workunit;
@ -315,6 +328,7 @@ void send_work_score_type(int rt) {
// (since otherwise feeder might overwrite it)
//
wu_result.state = WR_STATE_EMPTY;
keyword_sched_remove_job(job.index);
// reread result from DB, make sure it's still unsent
// TODO: from here to end of add_result_to_reply()

View File

@ -24,7 +24,7 @@ struct JOB {
APP* app;
BEST_APP_VERSION* bavp;
bool get_score(WU_RESULT&);
bool get_score(int);
};
extern void send_work_score();

View File

@ -1373,6 +1373,7 @@ int HOST::parse(XML_PARSER& xp) {
p_ncpus = 1;
double dtemp;
string stemp;
int x;
while (!xp.get_tag()) {
if (xp.match_tag("/host_info")) return 0;
if (xp.parse_int("timezone", timezone)) continue;
@ -1404,6 +1405,10 @@ int HOST::parse(XML_PARSER& xp) {
continue;
}
// unused fields
//
if (xp.parse_int("n_usable_coprocs", x)) continue;
// parse deprecated fields to avoid error messages
//
if (xp.parse_double("p_calculated", dtemp)) continue;

View File

@ -62,6 +62,7 @@ void usage() {
" [ -d n ]\n"
" [ --delay_bound x ]\n"
" [ --hr_class n ]\n"
" [ --keywords 'n1 n2 ...' ]\n"
" [ --max_error_results n ]\n"
" [ --max_success_results n ]\n"
" [ --max_total_results n ]\n"
@ -336,6 +337,8 @@ int main(int argc, char** argv) {
verbose = true;
} else if (arg(argv, i, "continue_on_error")) {
continue_on_error = true;
} else if (arg(argv, i, "keywords")) {
strcpy(jd.wu.keywords, argv[++i]);
} else {
if (!strncmp("-", argv[i], 1)) {
fprintf(stderr, "create_work: bad argument '%s'\n", argv[i]);