diff --git a/dcapi/boinc/client.C b/dcapi/boinc/client.C
index c3c367519d..bf8fef46bc 100644
--- a/dcapi/boinc/client.C
+++ b/dcapi/boinc/client.C
@@ -1,3 +1,11 @@
+/*
+ * client.C - Client side of the BOINC DC-API backend
+ *
+ * Authors:
+ *	Gábor Gombás
+ *
+ * Copyright (c) 2006 MTA SZTAKI
+ */
 #ifdef HAVE_CONFIG_H
 #include 
 #endif
@@ -25,6 +33,19 @@
 
 #include "common_defs.h"
 
+/********************************************************************
+ * Constants
+ */
+
+typedef enum {
+	STATE_NORMAL,
+	STATE_SUSPEND,
+	STATE_FINISH
+} client_event_state;
+
+#define LAST_CKPT_FILE		"dc_lastckpt"
+
+
 /********************************************************************
  * Global variables
  */
@@ -37,8 +58,13 @@ static char *last_complete_ckpt;
 static char active_ckpt[PATH_MAX];
 /* Number of subresults already sent */
 static int subresult_cnt;
+/* Maximum number of allowed subresults */
+static int max_subresults;
 /* Name of the current work unit */
 static char wu_name[256];
+/* State machine for suspend */
+static client_event_state client_state;
+
 
 /********************************************************************
  * Common API functions
@@ -54,18 +80,19 @@ int DC_getMaxMessageSize(void)
 int DC_getMaxSubresults(void)
 {
 	struct dirent *d;
-	int nsubs;
 	DIR *dir;
 
+	if (max_subresults)
+		return max_subresults - subresult_cnt;
+
 	dir = opendir(".");
 	if (!dir)
 		return 0;
-	nsubs = 0;
 	while ((d = readdir(dir)))
 		if (!strncasecmp(d->d_name, SUBRESULT_PFX, strlen(SUBRESULT_PFX)))
-			nsubs++;
+			max_subresults++;
 	closedir(dir);
-	return nsubs;
+	return max_subresults;
 }
 
 DC_GridCapabilities DC_getGridCapabilities(void)
@@ -90,9 +117,20 @@ int DC_init(void)
 	if (boinc_init())
 		return DC_ERR_INTERNAL;
 
-	if (!boinc_resolve_filename(CKPT_LABEL_IN, path, sizeof(path)))
+	/* Check if we are starting from a checkpoint file */
+	if ((f = boinc_fopen(LAST_CKPT_FILE, "r")))
+	{
+		/* fgets() does not fill path on EOF/error, so check its result */
+		if (fgets(path, sizeof(path), f) && strlen(path))
+			last_complete_ckpt = strdup(path);
+		fclose(f);
+	}
+	/* If the application did not generate a checkpoint file before, check
+	 * if the master sent one */
+	else if (!boinc_resolve_filename(CKPT_LABEL_IN, path, sizeof(path)))
 		last_complete_ckpt = strdup(path);
 
+	/* Extract the WU name from init_data.xml */
 	ret = stat("init_data.xml", &st);
 	if (ret)
 		return DC_ERR_INTERNAL;
@@ -111,6 +149,18 @@ int DC_init(void)
 	if (!ret)
 		return DC_ERR_INTERNAL;
 
+	/* Parse the config file if the master sent one (access() returns 0) */
+	buf = DC_resolveFileName(DC_FILE_IN, DC_CONFIG_FILE);
+	if (buf && !access(buf, R_OK))
+		ret = _DC_parseCfg(buf);
+	else
+		ret = 0;
+	free(buf);
+	if (ret)
+		return ret;
+
+	DC_log(LOG_DEBUG, "DC-API initialized for work unit %s", wu_name);
+
 	return 0;
 }
 
@@ -148,14 +198,31 @@ char *DC_resolveFileName(DC_FileType type, const char *logicalFileName)
 int DC_sendResult(const char *logicalFileName, const char *path,
 	DC_FileMode mode)
 {
-	char label[32], new_path[PATH_MAX], msg[PATH_MAX];
-	std::string str;
+	char label[32], new_path[PATH_MAX], msg[PATH_MAX], *p;
 	int ret;
 
+	if (!DC_getMaxSubresults())
+	{
+		DC_log(LOG_ERR, "No more subresults are allowed to be sent");
+		return DC_ERR_NOTIMPL;
+	}
+
+	/* We have to use the subresult labels that were defined by the
+	 * master for doing the upload */
 	snprintf(label, sizeof(label), "%s%d", SUBRESULT_PFX, ++subresult_cnt);
 	if (boinc_resolve_filename(label, new_path, sizeof(new_path)))
 		return DC_ERR_INTERNAL;
 
+	/* We need the file name that will appear on the server */
+	p = strrchr(new_path, PATH_SEPARATOR[0]);
+	if (!p)
+	{
+		DC_log(LOG_ERR, "Failed to determine the on-server file name "
+			"of the subresult");
+		return DC_ERR_INTERNAL;
+	}
+	p++;
+
 	switch (mode)
 	{
 		case DC_FILE_REGULAR:
@@ -187,7 +254,7 @@ int DC_sendResult(const char *logicalFileName, const char *path,
 	}
 
 	/* Stupid C++ */
-	str = label;
+	std::string str = label;
 	ret = boinc_upload_file(str);
 	if (ret)
 	{
@@ -195,7 +262,8 @@ int DC_sendResult(const char *logicalFileName, const char *path,
 		return DC_ERR_INTERNAL;
 	}
 
-	snprintf(msg, sizeof(msg), "%s upload %s", DCAPI_MSG_PFX, label);
+	snprintf(msg, sizeof(msg), "%s:%s:%s:%s", DCAPI_MSG_PFX, DC_MSG_UPLOAD,
+		p, logicalFileName);
 	DC_sendMessage(msg);
 
 	/* If we had to copy volatile files, delete the original only when
@@ -207,8 +275,20 @@ int DC_sendResult(const char *logicalFileName, const char *path,
 
 int DC_sendMessage(const char *message)
 {
+	std::string xml, msg;
+	int ret;
+
+	msg = message;
+	xml_escape(msg, xml);
+	xml = "" + xml + "";
 	/* We use the WU's name as the variety */
-	return boinc_send_trickle_up(wu_name, (char *)message);
+	ret = boinc_send_trickle_up(wu_name, (char *)xml.c_str());
+	if (ret)
+	{
+		DC_log(LOG_ERR, "Failed to send trickle-up message");
+		return DC_ERR_INTERNAL;
+	}
+	return 0;
 }
 
 static DC_Event *new_event(DC_EventType type)
@@ -222,14 +302,26 @@ static DC_Event *new_event(DC_EventType type)
 
 static DC_Event *handle_special_msg(const char *message)
 {
-	/* XXX Implement it */
+	if (!strcmp(message, DC_MSG_CANCEL))
+	{
+		client_state = STATE_FINISH;
+		return new_event(DC_EVENT_FINISH);
+	}
+	if (!strcmp(message, DC_MSG_SUSPEND))
+	{
+		client_state = STATE_SUSPEND;
+		return new_event(DC_EVENT_DO_CHECKPOINT);
+	}
+
+	DC_log(LOG_WARNING, "Received unknown control message %s", message);
+
 	return NULL;
 }
 
 DC_Event *DC_checkEvent(void)
 {
 	DC_Event *event;
-	char *buf;
+	char *buf, *msg;
 	int ret;
 
 	/* Check for checkpoint requests */
@@ -248,10 +340,27 @@ DC_Event *DC_checkEvent(void)
 	ret = boinc_receive_trickle_down(buf, MAX_MESSAGE_SIZE);
 	if (ret)
 	{
-		if (!strncmp(buf, DCAPI_MSG_PFX, strlen(DCAPI_MSG_PFX)))
+		msg = (char *)malloc(MAX_MESSAGE_SIZE);
+		if (!msg)
 		{
-			event = handle_special_msg(buf);
 			free(buf);
+			return NULL;
+		}
+		ret = parse_str(buf, "", msg, MAX_MESSAGE_SIZE);
+		if (!ret)
+		{
+			DC_log(LOG_WARNING, "Failed to parse message %s", buf);
+			free(buf);
+			free(msg);
+			return NULL;
+		}
+
+		if (!strncmp(msg, DCAPI_MSG_PFX, strlen(DCAPI_MSG_PFX)))
+		{
+			event = handle_special_msg(msg +
+				strlen(DCAPI_MSG_PFX) + 1);
+			free(buf);
+			free(msg);
 			return event;
 		}
 
@@ -261,6 +370,9 @@ DC_Event *DC_checkEvent(void)
 	}
 	free(buf);
 
+	if (client_state == STATE_FINISH)
+		return new_event(DC_EVENT_FINISH);
+
 	return NULL;
 }
 
@@ -281,10 +393,32 @@ void DC_destroyEvent(DC_Event *event)
 
 void DC_checkpointMade(const char *filename)
 {
-	/* XXX Compare filename to active_ckpt and warn on mismatch */
+	FILE *f;
+
+	if (strcmp(filename, active_ckpt))
+	{
+		DC_log(LOG_ERR, "DC_checkpointMade: bad checkpoint file %s "
+			"(expected %s)", filename, active_ckpt);
+		return;
+	}
+
+	/* Remember which was the last completed checkpoint */
+	f = boinc_fopen(LAST_CKPT_FILE, "w");
+	if (f)
+	{
+		fprintf(f, "%s", filename);
+		fclose(f);
+	}
+
+	/* Never unlink NULL, nor the checkpoint that was just completed */
+	if (last_complete_ckpt && strcmp(last_complete_ckpt, filename))
+		unlink(last_complete_ckpt);
 	free(last_complete_ckpt);
 	last_complete_ckpt = strdup(filename);
 	boinc_checkpoint_completed();
+
+	if (client_state == STATE_SUSPEND)
+		client_state = STATE_FINISH;
 }
 
 void DC_fractionDone(double fraction)
@@ -294,7 +428,30 @@ void DC_fractionDone(double fraction)
 
 void DC_finish(int exitcode)
 {
+	char path[PATH_MAX];
+	int ret;
+
+	/* Rename/copy the checkpoint file to the label CKPT_LABEL_OUT
+	 * so it will be uploaded together with the result(s) */
+	if (last_complete_ckpt &&
+		!boinc_resolve_filename(CKPT_LABEL_OUT, path, sizeof(path)))
+	{
+		ret = link(last_complete_ckpt, path);
+		if (ret)
+			_DC_copyFile(last_complete_ckpt, path);
+	}
+
+	/* Delete files that we have created */
+	if (last_complete_ckpt)
+		unlink(last_complete_ckpt);
+	if (strlen(active_ckpt))
+		unlink(active_ckpt);
+	unlink(LAST_CKPT_FILE);
+
 	boinc_finish(exitcode);
+	/* We should never get here, but boinc_finish() is not marked
+	 * with "noreturn" so this avoids a GCC warning */
+	_exit(exitcode);
 }
 
 /********************************************************************
diff --git a/dcapi/boinc/common_defs.h b/dcapi/boinc/common_defs.h
index c23671a3ef..ce9d6dfa1d 100644
--- a/dcapi/boinc/common_defs.h
+++ b/dcapi/boinc/common_defs.h
@@ -1,4 +1,11 @@
-/* Definitions common for both the server and client side */
+/*
+ * common_defs.h - Definitions common for both the server and client side
+ *
+ * Authors:
+ *	Gábor Gombás
+ *
+ * Copyright (c) 2006 MTA SZTAKI
+ */
 #ifndef COMMON_DEFS_H
 #define COMMON_DEFS_H
 
@@ -13,15 +20,19 @@ extern "C" {
 #define CKPT_LABEL_IN		"dc_checkpoint.in"
 #define CKPT_LABEL_OUT		"dc_checkpoint.out"
 
-/* Name of the client-side config. file */
-#define CLIENTCONF_LABEL	"dc_client.conf"
-
 /* Maximum allowed message length */
 #define MAX_MESSAGE_SIZE	16384
 
 /* Prefix for internal messages between the client-side and master-side DC-API */
 #define DCAPI_MSG_PFX		"__dcapi__"
 
+/* Internal message that a subresult has been uploaded */
+#define DC_MSG_UPLOAD		"UPLOAD"
+/* Internal message telling that the client should suspend */
+#define DC_MSG_SUSPEND		"SUSPEND"
+/* Internal message for cancelling the client computation */
+#define DC_MSG_CANCEL		"CANCEL"
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/dcapi/boinc/events.C b/dcapi/boinc/events.C
index c68387523c..0c4d9f2be3 100644
--- a/dcapi/boinc/events.C
+++ b/dcapi/boinc/events.C
@@ -1,3 +1,11 @@
+/*
+ * events.C - BOINC event handling
+ *
+ * Authors:
+ *	Gábor Gombás
+ *
+ * Copyright (c) 2006 MTA SZTAKI
+ */
 #ifdef HAVE_CONFIG_H
 #include 
 #endif
@@ -10,23 +18,20 @@
 #include 
 #include 
 #include 
+#include 
 
-int DC_processEvents(int timeout)
+static int process_results(void)
 {
 	DC_Result *result;
 	DB_WORKUNIT wu;
 	char *query;
-
-	if (!_dc_resultcb || !_dc_subresultcb || !_dc_messagecb)
-	{
-		DC_log(LOG_ERR, "DC_processEvents: callbacks are not set up");
-		return DC_ERR_CONFIG;
-	}
+	int done;
 
 	/* XXX Check LIMIT value */
 	query = g_strdup_printf("WHERE name LIKE '%s\\_%%' "
 		"AND assimilate_state = %d LIMIT 100", project_uuid_str,
 		ASSIMILATE_READY);
+	done = 0;
 	while (!wu.enumerate(query))
 	{
 		DB_RESULT canonical_result;
@@ -53,10 +58,131 @@ int DC_processEvents(int timeout)
 			continue;
 		_dc_resultcb(result->wu, result);
 		_DC_destroyResult(result);
+		done++;
 	}
 	g_free(query);
 
-	return 0;
+	return done;
+}
+
+/* Handle an internal DC-API control message; msg points just past the
+ * DCAPI_MSG_PFX prefix and its separator */
+static void handle_special_msg(DC_Workunit *wu, const char *msg)
+{
+	if (!strncmp(msg, DC_MSG_UPLOAD, strlen(DC_MSG_UPLOAD)))
+	{
+		const char *p, *q;
+		char *subresult_name, *client_label, path[PATH_MAX];
+
+		p = strchr(msg, ':');
+		if (!p)
+			return;
+		q = strchr(p + 1, ':');
+		if (!q)
+			return;
+
+		subresult_name = g_strndup(p + 1, (q - p) - 1);
+		client_label = g_strdup(q + 1);
+
+		if (strchr(subresult_name, G_DIR_SEPARATOR))
+		{
+			DC_log(LOG_ERR, "Client sent insecure subresult name, "
+				"ignoring");
+			g_free(subresult_name);
+			g_free(client_label);
+			return;
+		}
+
+		dir_hier_path(subresult_name, _DC_getUploadDir(),
+			_DC_getUldlDirFanout(), path, FALSE);
+		g_free(subresult_name);
+		_dc_subresultcb(wu, client_label, path);
+		g_free(client_label);
+	}
+	else
+		DC_log(LOG_WARNING, "Received unknown control message %s",
+			msg);
+}
+
+/* Dispatch the not yet handled messages sent by the clients of this
+ * project; returns the number of messages delivered */
+static int process_messages(void)
+{
+	DB_MSG_FROM_HOST msg;
+	char *query, *buf;
+	DC_Workunit *wu;
+	int done, ret;
+
+	query = g_strdup_printf("WHERE variety LIKE '%s\\_%%' AND handled = 0",
+		project_uuid_str);
+	buf = (char *)g_malloc(MAX_MESSAGE_SIZE);
+	done = 0;
+	while (!msg.enumerate(query))
+	{
+		wu = _DC_getWUByName(msg.variety);
+		if (!wu)
+		{
+			DC_log(LOG_WARNING, "Received message for unknown "
+				"WU %s", msg.variety);
+			goto handled;
+		}
+
+		ret = parse_str(msg.xml, "", buf, MAX_MESSAGE_SIZE);
+		if (!ret)
+		{
+			DC_log(LOG_WARNING, "Failed to parse message %s",
+				msg.xml);
+			goto handled;
+		}
+
+		if (!strncmp(buf, DCAPI_MSG_PFX, strlen(DCAPI_MSG_PFX)))
+			handle_special_msg(wu, buf + strlen(DCAPI_MSG_PFX) + 1);
+		else
+			_dc_messagecb(wu, buf);
+		done++;
+handled:
+		msg.handled = true;
+		msg.update();
+	}
+	g_free(query);
+	g_free(buf);
+	return done;
+}
+
+/* Poll for results and messages until at least one event was processed
+ * or timeout seconds have passed */
+int DC_processEvents(int timeout)
+{
+	time_t end, now;
+	int done;
+
+	if (!_dc_resultcb || !_dc_subresultcb || !_dc_messagecb)
+	{
+		DC_log(LOG_ERR, "DC_processEvents: callbacks are not set up");
+		return DC_ERR_CONFIG;
+	}
+
+	end = time(NULL) + timeout;
+	done = 0;
+	while (1)
+	{
+		done += process_results();
+		done += process_messages();
+		if (done)
+			break;
+
+		now = time(NULL);
+		if (now >= end)
+			break;
+
+		/* Be nice to the database and sleep for a long time */
+		if (end - now < 15)
+			sleep(end - now);
+		else
+			sleep(15);
+	}
+
+	return done ? 0 : DC_ERR_TIMEOUT;
 }
 
 /* Look for a single result that matches the filter */