Merge pull request #1401 from BOINC/assimilator_enhancements

Daemons: enhance assimilator framework
This commit is contained in:
Christian Beer 2015-10-30 14:34:43 +01:00
commit 6cf316daf5
6 changed files with 127 additions and 59 deletions

View File

@ -1,6 +1,6 @@
// This file is part of BOINC.
// http://boinc.berkeley.edu
// Copyright (C) 2008 University of California
// Copyright (C) 2015 University of California
//
// BOINC is free software; you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License
@ -34,7 +34,5 @@ extern int assimilate_handler(
RESULT& // the canonical instance
);
extern int g_argc;
extern char** g_argv;
extern char* results_prefix;
extern char* transcripts_prefix;
extern int assimilate_handler_init(int argc, char** argv);
extern void assimilate_handler_usage();

View File

@ -1,6 +1,6 @@
// This file is part of BOINC.
// http://boinc.berkeley.edu
// Copyright (C) 2014 University of California
// Copyright (C) 2015 University of California
//
// BOINC is free software; you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License
@ -46,16 +46,11 @@ using std::vector;
#define SLEEP_INTERVAL 10
bool update_db = true;
bool noinsert = false;
int wu_id_modulus=0, wu_id_remainder=0;
int sleep_interval = SLEEP_INTERVAL;
int one_pass_N_WU=0;
int g_argc;
char** g_argv;
char* results_prefix = NULL;
char* transcripts_prefix = NULL;
void usage(char** argv) {
void usage(char* name) {
fprintf(stderr,
"This program is an 'assimilator'; it handles completed jobs.\n"
"Normally it is run as a daemon from config.xml.\n"
@ -70,13 +65,13 @@ void usage(char** argv) {
" [--one_pass] Do one DB enumeration, then exit\n"
" [--one_pass_N_WU N] Process at most N jobs\n"
" [-d | --debug_level N] Set verbosity level (1 to 4)\n"
" [--dont_update_db] Don't update DB (for testing)\n"
" [--noinsert] Don't insert records in app-specific DB\n"
" [--dont_update_db] Don't update BOINC DB (for testing)\n"
" [-h | --help] Show this\n"
" [-v | --version] Show version information\n",
argv[0]
" [-v | --version] Show version information\n"
"\n",
name
);
exit(0);
assimilate_handler_usage();
}
// assimilate all WUs that need it
@ -100,7 +95,7 @@ bool do_pass(APP& app) {
}
sprintf(buf,
"where appid=%lu and assimilate_state=%d %s limit %d",
"where appid=%ld and assimilate_state=%d %s limit %d",
app.id, ASSIMILATE_READY, mod_clause,
one_pass_N_WU ? one_pass_N_WU : 1000
);
@ -127,7 +122,7 @@ bool do_pass(APP& app) {
"[%s] assimilating WU %lu; state=%d\n", wu.name, wu.id, wu.assimilate_state
);
sprintf(buf, "where workunitid=%lu", wu.id);
sprintf(buf, "where workunitid=%ld", wu.id);
canonical_result.clear();
bool found = false;
while (1) {
@ -206,6 +201,13 @@ bool do_pass(APP& app) {
return did_something;
}
void missing_argument(char* name, char* arg) {
log_messages.printf(MSG_CRITICAL,
"%s requires an argument\n\n", arg
);
usage(name);
}
int main(int argc, char** argv) {
int retval;
bool one_pass = false;
@ -215,22 +217,38 @@ int main(int argc, char** argv) {
strcpy(app.name, "");
check_stop_daemons();
g_argc = argc;
g_argv = argv;
int j=1;
for (i=1; i<argc; i++) {
if (is_arg(argv[i], "one_pass_N_WU")) {
one_pass_N_WU = atoi(argv[++i]);
if (!argv[++i]) {
missing_argument(argv[0], argv[--i]);
exit(1);
}
one_pass_N_WU = atoi(argv[i]);
one_pass = true;
} else if (is_arg(argv[i], "sleep_interval")) {
sleep_interval = atoi(argv[++i]);
if (!argv[++i]) {
missing_argument(argv[0], argv[--i]);
exit(1);
}
sleep_interval = atoi(argv[i]);
} else if (is_arg(argv[i], "one_pass")) {
one_pass = true;
} else if (is_arg(argv[i], "d") || is_arg(argv[i], "debug_level")) {
int dl = atoi(argv[++i]);
if (!argv[++i]) {
missing_argument(argv[0], argv[--i]);
exit(1);
}
int dl = atoi(argv[i]);
log_messages.set_debug_level(dl);
if (dl ==4) g_print_queries = true;
} else if (is_arg(argv[i], "app")) {
safe_strcpy(app.name, argv[++i]);
if (!argv[++i]) {
missing_argument(argv[0], argv[--i]);
exit(1);
}
safe_strcpy(app.name, argv[i]);
} else if (is_arg(argv[i], "dont_update_db")) {
// This option is for testing your assimilator. When set,
// it ensures that the assimilator does not actually modify
@ -238,31 +256,32 @@ int main(int argc, char** argv) {
// your assimilator over and over again without affecting
// your project.
update_db = false;
} else if (is_arg(argv[i], "noinsert")) {
// This option is also for testing and is used to
// prevent the inserting of results into the *backend*
// (as opposed to the boinc) DB.
noinsert = true;
} else if (is_arg(argv[i], "mod")) {
wu_id_modulus = atoi(argv[++i]);
wu_id_remainder = atoi(argv[++i]);
if (!argv[++i]) {
missing_argument(argv[0], argv[--i]);
exit(1);
}
wu_id_modulus = atoi(argv[i]);
if (!argv[++i]) {
missing_argument(argv[0], argv[--i]);
exit(1);
}
wu_id_remainder = atoi(argv[i]);
} else if (is_arg(argv[i], "help") || is_arg(argv[i], "h")) {
usage(argv);
usage(argv[0]);
exit(0);
} else if (is_arg(argv[i], "v") || is_arg(argv[i], "version")) {
printf("%s\n", SVN_VERSION);
exit(0);
} else if (is_arg(argv[i], "results_prefix")) {
results_prefix=argv[++i];
} else if (is_arg(argv[i], "transcripts_prefix")) {
transcripts_prefix=argv[++i];
} else {
// project-specific part might parse extra args
//log_messages.printf(MSG_CRITICAL, "Unrecognized arg: %s\n", argv[i]);
// unknown arg - pass to handler
argv[j++] = argv[i];
}
}
if (!strlen(app.name)) {
usage(argv);
usage(argv[0]);
exit(1);
}
if (wu_id_modulus) {
@ -280,19 +299,24 @@ int main(int argc, char** argv) {
exit(1);
}
log_messages.printf(MSG_NORMAL, "Starting\n");
retval = boinc_db.open(config.db_name, config.db_host, config.db_user, config.db_passwd);
if (retval) {
log_messages.printf(MSG_CRITICAL, "Can't open DB\n");
log_messages.printf(MSG_CRITICAL, "boinc_db.open failed: %s\n", boincerror(retval));
exit(1);
}
sprintf(buf, "where name='%s'", app.name);
retval = app.lookup(buf);
if (retval) {
log_messages.printf(MSG_CRITICAL, "Can't find app\n");
log_messages.printf(MSG_CRITICAL, "Can't find app: %s\n", app.name);
exit(1);
}
argv[j] = 0;
retval = assimilate_handler_init(j, argv);
if (retval) exit(1);
log_messages.printf(MSG_NORMAL, "Starting assimilator handler\n");
install_stop_signal_handler();
do {
if (!do_pass(app)) {

View File

@ -1,6 +1,6 @@
// This file is part of BOINC.
// http://boinc.berkeley.edu
// Copyright (C) 2008 University of California
// Copyright (C) 2015 University of California
//
// BOINC is free software; you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License
@ -29,6 +29,7 @@
#include "sched_msgs.h"
#include "validate_util.h"
#include "sched_config.h"
#include "assimilate_handler.h"
using std::vector;
using std::string;
@ -44,6 +45,19 @@ int write_error(char* p) {
return 0;
}
int assimilate_handler_init(int argc, char** argv) {
// handle project specific arguments here
return 0;
}
void assimilate_handler_usage() {
// describe the project specific arguments here
//fprintf(stderr,
// " Custom options:\n"
// " [--project_option X] a project specific option\n"
//);
}
int assimilate_handler(
WORKUNIT& wu, vector<RESULT>& /*results*/, RESULT& canonical_result
) {

View File

@ -1,6 +1,6 @@
// This file is part of BOINC.
// http://boinc.berkeley.edu
// Copyright (C) 2008 University of California
// Copyright (C) 2015 University of California
//
// BOINC is free software; you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License
@ -31,6 +31,19 @@
using std::vector;
using std::string;
int assimilate_handler_init(int argc, char** argv) {
// handle project specific arguments here
return 0;
}
void assimilate_handler_usage() {
// describe the project specific arguments here
//fprintf(stderr,
// " Custom options:\n"
// " [--project_option X] a project specific option\n"
//);
}
int assimilate_handler(
WORKUNIT& wu, vector<RESULT>& /*results*/, RESULT& canonical_result
) {

View File

@ -1,6 +1,6 @@
// This file is part of BOINC.
// http://boinc.berkeley.edu
// Copyright (C) 2014 University of California
// Copyright (C) 2015 University of California
//
// BOINC is free software; you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License
@ -47,6 +47,7 @@
#include "boinc_db.h"
#include "error_numbers.h"
#include "sched_msgs.h"
#include "sched_util.h"
#include "validate_util.h"
#include "validator.h"
#include "sched_config.h"
@ -54,13 +55,13 @@
using std::vector;
using std::string;
bool first = true;
vector<string> script;
void parse_cmdline() {
for (int i=1; i<g_argc; i++) {
if (!strcmp(g_argv[i], "--script")) {
script = split(g_argv[++i], ' ');
int assimilate_handler_init(int argc, char** argv) {
// handle project specific arguments here
for (int i=1; i<argc; i++) {
if (is_arg(argv[i], "script")) {
script = split(argv[++i], ' ');
if (script.size() == 1) {
script.push_back("wu_id");
script.push_back("files");
@ -71,8 +72,18 @@ void parse_cmdline() {
log_messages.printf(MSG_CRITICAL,
"script name missing from command line\n"
);
exit(1);
return 1;
}
return 0;
}
void assimilate_handler_usage() {
// describe the project specific arguments here
fprintf(stderr,
" Custom options:\n"
" --script \"X\" call script to assimilate job\n"
" see comment in script_assimilator.cpp for details\n"
);
}
int assimilate_handler(
@ -82,11 +93,6 @@ int assimilate_handler(
char cmd[4096], buf[256];
unsigned int i, j;
if (first) {
parse_cmdline();
first = false;
}
if (wu.canonical_resultid) {
sprintf(cmd, "../bin/%s", script[0].c_str());
vector<string> paths;

View File

@ -1,6 +1,6 @@
// This file is part of BOINC.
// http://boinc.berkeley.edu
// Copyright (C) 2008 University of California
// Copyright (C) 2015 University of California
//
// BOINC is free software; you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License
@ -39,6 +39,19 @@
using std::vector;
using std::string;
int assimilate_handler_init(int argc, char** argv) {
// handle project specific arguments here
return 0;
}
void assimilate_handler_usage() {
// describe the project specific arguments here
//fprintf(stderr,
// " Custom options:\n"
// " [--project_option X] a project specific option\n"
//);
}
int assimilate_handler(
WORKUNIT& wu, vector<RESULT>& /*results*/, RESULT& canonical_result
) {