Implement messaging and subresult upload

git-svn-id: svn+ssh://cvs.lpds.sztaki.hu/var/lib/svn/szdg/dcapi/trunk@467 a7169a2c-3604-0410-bc95-c702d8d87f7a
This commit is contained in:
gombasg 2006-04-13 16:04:19 +00:00 committed by Adam Visegradi
parent 9f555d48ce
commit 9299432e12
3 changed files with 310 additions and 27 deletions

View File

@ -1,3 +1,11 @@
/*
* client.c - Client side of the BOINC DC-API backend
*
* Authors:
* Gábor Gombás <gombasg@sztaki.hu>
*
* Copyright (c) 2006 MTA SZTAKI
*/
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
@ -25,6 +33,19 @@
#include "common_defs.h"
/********************************************************************
* Constants
*/
typedef enum {
STATE_NORMAL,
STATE_SUSPEND,
STATE_FINISH
} client_event_state;
#define LAST_CKPT_FILE "dc_lastckpt"
/********************************************************************
* Global variables
*/
@ -37,8 +58,13 @@ static char *last_complete_ckpt;
static char active_ckpt[PATH_MAX];
/* Number of subresults already sent */
static int subresult_cnt;
/* Maximum number of allowed subresults */
static int max_subresults;
/* Name of the current work unit */
static char wu_name[256];
/* State machine for suspend */
static client_event_state client_state;
/********************************************************************
* Common API functions
@ -54,18 +80,19 @@ int DC_getMaxMessageSize(void)
int DC_getMaxSubresults(void)
{
struct dirent *d;
int nsubs;
DIR *dir;
if (max_subresults)
return max_subresults - subresult_cnt;
dir = opendir(".");
if (!dir)
return 0;
nsubs = 0;
while ((d = readdir(dir)))
if (!strncasecmp(d->d_name, SUBRESULT_PFX, strlen(SUBRESULT_PFX)))
nsubs++;
max_subresults++;
closedir(dir);
return nsubs;
return max_subresults;
}
DC_GridCapabilities DC_getGridCapabilities(void)
@ -90,9 +117,20 @@ int DC_init(void)
if (boinc_init())
return DC_ERR_INTERNAL;
if (!boinc_resolve_filename(CKPT_LABEL_IN, path, sizeof(path)))
/* Check if we are starting from a checkpoint file */
if ((f = boinc_fopen(LAST_CKPT_FILE, "r")))
{
fgets(path, sizeof(path), f);
fclose(f);
if (strlen(path))
last_complete_ckpt = strdup(path);
}
/* If the application did not generate a checkpoint file before, check
* if the master sent one */
else if (!boinc_resolve_filename(CKPT_LABEL_IN, path, sizeof(path)))
last_complete_ckpt = strdup(path);
/* Extract the WU name from init_data.xml */
ret = stat("init_data.xml", &st);
if (ret)
return DC_ERR_INTERNAL;
@ -111,6 +149,18 @@ int DC_init(void)
if (!ret)
return DC_ERR_INTERNAL;
/* Parse the config file if the master sent one */
buf = DC_resolveFileName(DC_FILE_IN, DC_CONFIG_FILE);
if (buf && access(buf, R_OK))
{
ret = _DC_parseCfg(buf);
if (ret)
return ret;
}
free(buf);
DC_log(LOG_DEBUG, "DC-API initializef for work unit %s", wu_name);
return 0;
}
@ -148,14 +198,31 @@ char *DC_resolveFileName(DC_FileType type, const char *logicalFileName)
int DC_sendResult(const char *logicalFileName, const char *path,
DC_FileMode mode)
{
char label[32], new_path[PATH_MAX], msg[PATH_MAX];
std::string str;
char label[32], new_path[PATH_MAX], msg[PATH_MAX], *p;
int ret;
if (!DC_getMaxSubresults())
{
DC_log(LOG_ERR, "No more subresults are allowed to be sent");
return DC_ERR_NOTIMPL;
}
/* We have to use the subresult labels that were defined by the
* master for doing the upload */
snprintf(label, sizeof(label), "%s%d", SUBRESULT_PFX, ++subresult_cnt);
if (boinc_resolve_filename(label, new_path, sizeof(new_path)))
return DC_ERR_INTERNAL;
/* We need the file name that will appear on the server */
p = strrchr(new_path, PATH_SEPARATOR[0]);
if (!p)
{
DC_log(LOG_ERR, "Failed to determine the on-server file name "
"of the subresult");
return DC_ERR_INTERNAL;
}
p++;
switch (mode)
{
case DC_FILE_REGULAR:
@ -187,7 +254,7 @@ int DC_sendResult(const char *logicalFileName, const char *path,
}
/* Stupid C++ */
str = label;
std::string str = label;
ret = boinc_upload_file(str);
if (ret)
{
@ -195,7 +262,8 @@ int DC_sendResult(const char *logicalFileName, const char *path,
return DC_ERR_INTERNAL;
}
snprintf(msg, sizeof(msg), "%s upload %s", DCAPI_MSG_PFX, label);
snprintf(msg, sizeof(msg), "%s:%s:%s:%s", DCAPI_MSG_PFX, DC_MSG_UPLOAD,
p, logicalFileName);
DC_sendMessage(msg);
/* If we had to copy volatile files, delete the original only when
@ -207,8 +275,20 @@ int DC_sendResult(const char *logicalFileName, const char *path,
int DC_sendMessage(const char *message)
{
std::string xml, msg;
int ret;
msg = message;
xml_escape(msg, xml);
xml = "<message>" + xml + "</message>";
/* We use the WU's name as the variety */
return boinc_send_trickle_up(wu_name, (char *)message);
ret = boinc_send_trickle_up(wu_name, (char *)xml.c_str());
if (ret)
{
DC_log(LOG_ERR, "Failed to send trickle-up message");
return DC_ERR_INTERNAL;
}
return 0;
}
static DC_Event *new_event(DC_EventType type)
@ -222,14 +302,26 @@ static DC_Event *new_event(DC_EventType type)
static DC_Event *handle_special_msg(const char *message)
{
/* XXX Implement it */
if (!strcmp(message, DC_MSG_CANCEL))
{
client_state = STATE_FINISH;
return new_event(DC_EVENT_FINISH);
}
if (!strcmp(message, DC_MSG_SUSPEND))
{
client_state = STATE_SUSPEND;
return new_event(DC_EVENT_DO_CHECKPOINT);
}
DC_log(LOG_WARNING, "Received unknown control message %s", message);
return NULL;
}
DC_Event *DC_checkEvent(void)
{
DC_Event *event;
char *buf;
char *buf, *msg;
int ret;
/* Check for checkpoint requests */
@ -248,10 +340,26 @@ DC_Event *DC_checkEvent(void)
ret = boinc_receive_trickle_down(buf, MAX_MESSAGE_SIZE);
if (ret)
{
if (!strncmp(buf, DCAPI_MSG_PFX, strlen(DCAPI_MSG_PFX)))
msg = (char *)malloc(MAX_MESSAGE_SIZE);
if (!msg)
{
event = handle_special_msg(buf);
free(buf);
return NULL;
}
ret = parse_str(buf, "<message>", msg, MAX_MESSAGE_SIZE);
if (!ret)
{
DC_log(LOG_WARNING, "Failed to parse message %s", buf);
free(buf);
free(msg);
return NULL;
}
if (!strncmp(msg, DCAPI_MSG_PFX, strlen(DCAPI_MSG_PFX)))
{
event = handle_special_msg(msg +
strlen(DCAPI_MSG_PFX) + 1);
free(msg);
return event;
}
@ -261,6 +369,9 @@ DC_Event *DC_checkEvent(void)
}
free(buf);
if (client_state == STATE_FINISH)
return new_event(DC_EVENT_FINISH);
return NULL;
}
@ -281,10 +392,30 @@ void DC_destroyEvent(DC_Event *event)
void DC_checkpointMade(const char *filename)
{
/* XXX Compare filename to active_ckpt and warn on mismatch */
FILE *f;
if (strcmp(filename, active_ckpt))
{
DC_log(LOG_ERR, "DC_checkpointMade: bad checkpoint file %s "
"(expected %s)", filename, active_ckpt);
return;
}
/* Remember which was the last completed checkpoint */
f = boinc_fopen(LAST_CKPT_FILE, "w");
if (f)
{
fprintf(f, "%s", filename);
fclose(f);
}
unlink(last_complete_ckpt);
free(last_complete_ckpt);
last_complete_ckpt = strdup(filename);
boinc_checkpoint_completed();
if (client_state == STATE_SUSPEND)
client_state = STATE_FINISH;
}
void DC_fractionDone(double fraction)
@ -294,7 +425,30 @@ void DC_fractionDone(double fraction)
void DC_finish(int exitcode)
{
char path[PATH_MAX];
int ret;
/* Rename/copy the checkpoint file to the label CKPT_LABEL_OUT
* so it will be uploaded together with the result(s) */
if (last_complete_ckpt &&
!boinc_resolve_filename(CKPT_LABEL_OUT, path, sizeof(path)))
{
ret = link(last_complete_ckpt, path);
if (ret)
_DC_copyFile(last_complete_ckpt, path);
}
/* Delete files that we have created */
if (last_complete_ckpt)
unlink(last_complete_ckpt);
if (strlen(active_ckpt))
unlink(active_ckpt);
unlink(LAST_CKPT_FILE);
boinc_finish(exitcode);
/* We should never get here, but boinc_finish() is not marked
* with "noreturn" so this avoids a GCC warning */
_exit(exitcode);
}
/********************************************************************

View File

@ -1,4 +1,11 @@
/* Definitions common for both the server and client side */
/*
* common_defs.h - Definitions common for both the server and client side
*
* Authors:
* Gábor Gombás <gombasg@sztaki.hu>
*
* Copyright (c) 2006 MTA SZTAKI
*/
#ifndef COMMON_DEFS_H
#define COMMON_DEFS_H
@ -13,15 +20,19 @@ extern "C" {
#define CKPT_LABEL_IN "dc_checkpoint.in"
#define CKPT_LABEL_OUT "dc_checkpoint.out"
/* Name of the client-side config. file */
#define CLIENTCONF_LABEL "dc_client.conf"
/* Maximum allowed message length */
#define MAX_MESSAGE_SIZE 16384
/* Prefix for internal messages between the client-side and master-side DC-API */
#define DCAPI_MSG_PFX "__dcapi__"
/* Internal message that a subresult has been uploaded */
#define DC_MSG_UPLOAD "UPLOAD"
/* Internal message telling that the client should suspend */
#define DC_MSG_SUSPEND "SUSPEND"
/* Internal message for cancelling the client computation */
#define DC_MSG_CANCEL "CANCEL"
#ifdef __cplusplus
}
#endif

View File

@ -1,3 +1,11 @@
/*
* events.C - BOINC event handling
*
* Authors:
* Gábor Gombás <gombasg@sztaki.hu>
*
* Copyright (c) 2006 MTA SZTAKI
*/
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
@ -10,23 +18,20 @@
#include <boinc_db.h>
#include <sched_util.h>
#include <sched_msgs.h>
#include <parse.h>
int DC_processEvents(int timeout)
static int process_results(void)
{
DC_Result *result;
DB_WORKUNIT wu;
char *query;
if (!_dc_resultcb || !_dc_subresultcb || !_dc_messagecb)
{
DC_log(LOG_ERR, "DC_processEvents: callbacks are not set up");
return DC_ERR_CONFIG;
}
int done;
/* XXX Check LIMIT value */
query = g_strdup_printf("WHERE name LIKE '%s\\_%%' "
"AND assimilate_state = %d LIMIT 100", project_uuid_str,
ASSIMILATE_READY);
done = 0;
while (!wu.enumerate(query))
{
DB_RESULT canonical_result;
@ -53,10 +58,123 @@ int DC_processEvents(int timeout)
continue;
_dc_resultcb(result->wu, result);
_DC_destroyResult(result);
done++;
}
g_free(query);
return 0;
return done;
}
static void handle_special_msg(DC_Workunit *wu, const char *msg)
{
if (!strncmp(msg, DC_MSG_UPLOAD, strlen(DC_MSG_UPLOAD)))
{
char *p, *q, *subresult_name, *client_label, path[PATH_MAX];
p = strchr(msg, ':');
if (!p)
return;
q = strchr(p + 1, ':');
if (!q)
return;
subresult_name = g_strndup(p + 1, (q - p) - 1);
client_label = g_strdup(q + 1);
if (strchr(subresult_name, G_DIR_SEPARATOR))
{
DC_log(LOG_ERR, "Client sent insecure subresult name, "
"ignoring");
g_free(subresult_name);
g_free(client_label);
return;
}
dir_hier_path(subresult_name, _DC_getUploadDir(),
_DC_getUldlDirFanout(), path, FALSE);
g_free(subresult_name);
_dc_subresultcb(wu, client_label, path);
g_free(client_label);
}
else
DC_log(LOG_WARNING, "Received unknown control message %s",
msg);
}
static int process_messages(void)
{
DB_MSG_FROM_HOST msg;
char *query, *buf;
DC_Workunit *wu;
int done, ret;
query = g_strdup_printf("WHERE variety LIKE '%s\\_'", project_uuid_str);
buf = (char *)g_malloc(MAX_MESSAGE_SIZE);
done = 0;
while (!msg.enumerate(query))
{
wu = _DC_getWUByName(msg.variety);
if (!wu)
{
DC_log(LOG_WARNING, "Received message for unknown "
"WU %s", msg.variety);
goto handled;
}
ret = parse_str(msg.xml, "<message>", buf, MAX_MESSAGE_SIZE);
if (!ret)
{
DC_log(LOG_WARNING, "Failed to parse message %s",
msg.xml);
goto handled;
}
if (!strncmp(buf, DCAPI_MSG_PFX, strlen(DCAPI_MSG_PFX)))
handle_special_msg(wu, buf + strlen(DCAPI_MSG_PFX) + 1);
else
_dc_messagecb(wu, buf);
done++;
handled:
msg.handled = true;
msg.update();
}
g_free(query);
g_free(buf);
return done;
}
int DC_processEvents(int timeout)
{
time_t end, now;
int done;
if (!_dc_resultcb || !_dc_subresultcb || !_dc_messagecb)
{
DC_log(LOG_ERR, "DC_processEvents: callbacks are not set up");
return DC_ERR_CONFIG;
}
end = time(NULL) + timeout;
done = 0;
while (1)
{
done += process_results();
done += process_messages();
if (done)
break;
now = time(NULL);
if (now >= end)
break;
/* Be nice to the database and sleep for a long time */
if (end - now < 15)
sleep(end - now);
else
sleep(15);
}
return done ? 0 : DC_ERR_TIMEOUT;
}
/* Look for a single result that matches the filter */