Condor: implement asynchronous mode in BOINC GAHP; from Jaime

This commit is contained in:
David Anderson 2013-08-09 13:41:58 -07:00
parent 53f97f9cf1
commit 37e822fe3a
1 changed files with 46 additions and 18 deletions

View File

@ -45,6 +45,19 @@ char project_url[256];
char authenticator[256];
char response_prefix[256];
// The following are used to implement "asynchronous mode",
// in which we send "R" when a command has completed.
// Synchronization is needed in case 2 commands complete around the same time.
//
pthread_mutex_t io_lock = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
bool wrote_r = false;
bool async_mode = false;
#define BPRINTF(fmt, ...) \
pthread_mutex_lock(&io_lock); \
printf( "%s" fmt, response_prefix, ##__VA_ARGS__ ); \
pthread_mutex_unlock(&io_lock);
bool debug_mode = false;
// if set, handle commands synchronously rather than
// handling them in separate threads
@ -549,6 +562,13 @@ void* handle_command_aux(void* q) {
} else {
c.out = strdup("Unknown command");
}
pthread_mutex_lock(&io_lock);
if ( async_mode && !wrote_r ) {
BPRINTF("R\n");
fflush(stdout);
wrote_r = true;
}
pthread_mutex_unlock(&io_lock);
return NULL;
}
@ -580,8 +600,9 @@ int COMMAND::parse_command() {
return retval;
}
void print_version() {
printf("$GahpVersion: 1.0 %s BOINC\\ GAHP $\n", __DATE__);
void print_version(bool startup) {
BPRINTF("%s$GahpVersion: 1.0 %s BOINC\\ GAHP $\n", startup ? "" : "S ",
__DATE__);
}
int n_results() {
@ -604,38 +625,48 @@ int handle_command(char* p) {
sscanf(p, "%s", cmd);
if (!strcasecmp(cmd, "VERSION")) {
printf("S ");
print_version();
print_version(false);
} else if (!strcasecmp(cmd, "COMMANDS")) {
printf("S ASYNC_MODE_OFF ASYNC_MODE_ON BOINC_ABORT_JOBS BOINC_FETCH_OUTPUT BOINC_PING BOINC_QUERY_BATCHES BOINC_RETIRE_BATCH BOINC_SELECT_PROJECT BOINC_SUBMIT COMMANDS QUIT RESULTS VERSION\n");
BPRINTF("S ASYNC_MODE_OFF ASYNC_MODE_ON BOINC_ABORT_JOBS BOINC_FETCH_OUTPUT BOINC_PING BOINC_QUERY_BATCHES BOINC_RETIRE_BATCH BOINC_SELECT_PROJECT BOINC_SUBMIT COMMANDS QUIT RESULTS VERSION\n");
} else if (!strcasecmp(cmd, "RESPONSE_PREFIX")) {
printf("S\n");
pthread_mutex_lock(&io_lock);
BPRINTF("S\n");
strcpy(response_prefix, p+strlen("RESPONSE_PREFIX "));
pthread_mutex_unlock(&io_lock);
} else if (!strcasecmp(cmd, "ASYNC_MODE_ON")) {
printf("S\n");
pthread_mutex_lock(&io_lock);
BPRINTF("S\n");
async_mode = true;
pthread_mutex_unlock(&io_lock);
} else if (!strcasecmp(cmd, "ASYNC_MODE_OFF")) {
printf("S\n");
pthread_mutex_lock(&io_lock);
BPRINTF("S\n");
async_mode = false;
pthread_mutex_unlock(&io_lock);
} else if (!strcasecmp(cmd, "QUIT")) {
exit(0);
} else if (!strcasecmp(cmd, "RESULTS")) {
printf("S %d\n", n_results());
pthread_mutex_lock(&io_lock);
BPRINTF("S %d\n", n_results());
vector<COMMAND*>::iterator i = commands.begin();
while (i != commands.end()) {
COMMAND *c2 = *i;
if (c2->out) {
printf("%s%d %s\n", response_prefix, c2->id, c2->out);
BPRINTF("%d %s\n", c2->id, c2->out);
delete c2;
i = commands.erase(i);
} else {
i++;
}
}
wrote_r = false;
pthread_mutex_unlock(&io_lock);
} else if (!strcasecmp(cmd, "BOINC_SELECT_PROJECT")) {
int n = sscanf(p, "%s %s %s", cmd, project_url, authenticator);
if (n ==3) {
printf("S\n");
BPRINTF("S\n");
} else {
printf("E\n");
BPRINTF("E\n");
}
} else {
// asynchronous commands go here
@ -644,13 +675,13 @@ int handle_command(char* p) {
p = NULL;
int retval = cp->parse_command();
if (retval) {
printf("E\n");
BPRINTF("E\n");
delete cp;
return 0;
}
if (debug_mode) {
handle_command_aux(cp);
printf("result: %s\n", cp->out);
BPRINTF("result: %s\n", cp->out);
delete cp;
} else {
printf("S\n");
@ -722,14 +753,11 @@ void read_config() {
int main() {
read_config();
strcpy(response_prefix, "");
print_version();
print_version(true);
fflush(stdout);
while (1) {
char* p = get_cmd();
if (p == NULL) break;
if (strlen(response_prefix)) {
printf("%s", response_prefix);
}
handle_command(p);
fflush(stdout);
}