Remember the mode of input files

Do file copy in the library itself
Fix result template generation


git-svn-id: svn+ssh://cvs.lpds.sztaki.hu/var/lib/svn/szdg/dcapi/trunk@390 a7169a2c-3604-0410-bc95-c702d8d87f7a
This commit is contained in:
gombasg 2006-03-23 12:15:51 +00:00 committed by Adam Visegradi
parent 91f1589138
commit 84e7e61bb4
2 changed files with 141 additions and 57 deletions

View File

@ -10,6 +10,8 @@ extern "C" {
#include <uuid/uuid.h>
#include <glib.h>
#include "common_defs.h"
/********************************************************************
* Constants
*/
@ -53,7 +55,7 @@ struct _DC_Workunit
DC_WUState state;
char *workdir;
/* Input file definitions. Elements are of type char * */
/* Input file definitions. Elements are of type DC_LogicalFile */
GList *input_files;
int num_inputs;

View File

@ -7,6 +7,7 @@
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
#include <glib.h>
#include <sched_util.h>
@ -113,13 +114,12 @@ void DC_destroyWU(DC_Workunit *wu)
while (wu->input_files)
{
char *path = get_workdir_path(wu,
(const char *)wu->input_files->data, FILE_IN);
unlink(path);
g_free(path);
g_free(wu->input_files->data);
DC_PhysicalFile *file = (DC_PhysicalFile *)wu->input_files->data;
unlink(file->path);
wu->input_files = g_list_delete_link(wu->input_files,
wu->input_files);
_DC_destroyPhysicalFile(file);
}
while (wu->output_files)
@ -176,6 +176,7 @@ void DC_destroyWU(DC_Workunit *wu)
g_free(wu);
}
/* Check if the logical name is not already registered */
static int check_logical_name(DC_Workunit *wu, const char *logicalFileName)
{
GList *l;
@ -188,7 +189,9 @@ static int check_logical_name(DC_Workunit *wu, const char *logicalFileName)
}
for (l = wu->input_files; l; l = l->next)
{
if (!strcmp((char *)l->data, logicalFileName))
DC_PhysicalFile *file = (DC_PhysicalFile *)l->data;
if (!strcmp(file->label, logicalFileName))
{
DC_log(LOG_ERR, "File %s is already registered as an "
"input file", logicalFileName);
@ -207,9 +210,77 @@ static int check_logical_name(DC_Workunit *wu, const char *logicalFileName)
return 0;
}
#define COPY_BUFSIZE 65536
static int copy_file(const char *src, const char *dst)
{
int sfd, dfd;
ssize_t ret;
char *buf;
buf = (char *)g_malloc(COPY_BUFSIZE);
sfd = open(src, O_RDONLY);
if (sfd == -1)
{
DC_log(LOG_ERR, "Failed to open %s for copying: %s", src,
strerror(errno));
g_free(buf);
return -1;
}
dfd = open(dst, O_WRONLY | O_CREAT | O_TRUNC);
if (dfd == -1)
{
DC_log(LOG_ERR, "Failed to create %s: %s", dst, strerror(errno));
g_free(buf);
close(sfd);
return -1;
}
while ((ret = read(sfd, buf, COPY_BUFSIZE)) > 0)
{
char *ptr = buf;
while (ret)
{
ssize_t ret2 = write(dfd, ptr, ret);
if (ret2 < 0)
{
DC_log(LOG_ERR, "Error writing to %s: %s", dst,
strerror(errno));
close(sfd);
close(dfd);
unlink(dst);
g_free(buf);
return -1;
}
ret -= ret2;
ptr += ret2;
}
}
if (ret < 0)
{
DC_log(LOG_ERR, "Error reading from %s: %s", src, strerror(errno));
close(sfd);
close(dfd);
g_free(buf);
unlink(dst);
return -1;
}
g_free(buf);
close(sfd);
if (close(dfd))
{
DC_log(LOG_ERR, "Error writing to %s: %s", dst, strerror(errno));
unlink(dst);
return -1;
}
return 0;
}
int DC_addWUInput(DC_Workunit *wu, const char *logicalFileName, const char *URL,
DC_FileMode fileMode)
{
DC_PhysicalFile *file;
char *workpath;
int ret;
@ -222,66 +293,47 @@ int DC_addWUInput(DC_Workunit *wu, const char *logicalFileName, const char *URL,
* does not exceed the max. number of file slots */
workpath = get_workdir_path(wu, logicalFileName, FILE_IN);
file = _DC_createPhysicalFile(logicalFileName, workpath);
g_free(workpath);
if (!file)
return DC_ERR_INTERNAL;
switch (fileMode)
{
case DC_FILE_REGULAR:
{
GString *cmd;
char *quoted;
cmd = g_string_new("/bin/cp -a ");
quoted = g_shell_quote(URL);
g_string_append(cmd, quoted);
g_free(quoted);
g_string_append_c(cmd, ' ');
quoted = g_shell_quote(workpath);
g_string_append(cmd, quoted);
g_free(quoted);
/* XXX Re-implement this in C to save the overhead
* of shell invocation */
ret = system(cmd->str);
ret = copy_file(URL, file->path);
if (ret)
{
DC_log(LOG_ERR, "Executing \"%s\" failed: %s",
cmd->str, strerror(errno));
g_string_free(cmd, TRUE);
g_free(workpath);
_DC_destroyPhysicalFile(file);
return DC_ERR_BADPARAM; /* XXX */
}
g_string_free(cmd, TRUE);
break;
}
case DC_FILE_PERSISTENT:
ret = link(URL, workpath);
ret = link(URL, file->path);
if (ret)
{
DC_log(LOG_ERR, "Failed to link %s to %s: %s",
URL, workpath, strerror(errno));
g_free(workpath);
URL, file->path, strerror(errno));
_DC_destroyPhysicalFile(file);
return DC_ERR_BADPARAM; /* XXX */
}
/* XXX Further optimization: remember that the file was
* persistent and add <sticky/> and <report_on_rpc/>
* tags to the <file_info> description in the WU
* template */
/* Remember the file mode */
file->mode = DC_FILE_PERSISTENT;
break;
case DC_FILE_VOLATILE:
ret = rename(URL, workpath);
ret = rename(URL, file->path);
if (ret)
{
DC_log(LOG_ERR, "Failed to rename %s to %s: %s",
URL, workpath, strerror(errno));
g_free(workpath);
URL, file->path, strerror(errno));
_DC_destroyPhysicalFile(file);
return DC_ERR_BADPARAM; /* XXX */
}
break;
}
wu->input_files = g_list_append(wu->input_files,
g_strdup(logicalFileName));
wu->input_files = g_list_append(wu->input_files, file);
wu->num_inputs++;
g_free(workpath);
return 0;
}
@ -303,10 +355,19 @@ int DC_addWUOutput(DC_Workunit *wu, const char *logicalFileName)
return 0;
}
/* Returns the unique per-wu file name in the BOINC download hierarchy,
* without path components */
static char *get_input_download_name(DC_Workunit *wu, const char *label)
{
return g_strdup_printf("%s_%s", label, wu->uuid_str);
}
/* Returns the full path of the input file in the BOINC download hierarchy */
static char *get_input_download_path(DC_Workunit *wu, const char *label)
{
char path[PATH_MAX], *filename;
filename = g_strdup_printf("%s_%s", label, wu->uuid_str);
filename = get_input_download_name(wu, label);
dir_hier_path(filename, _DC_getDownloadDir(), _DC_getUldlDirFanout(),
path, TRUE);
g_free(filename);
@ -316,30 +377,35 @@ static char *get_input_download_path(DC_Workunit *wu, const char *label)
/* This function installs input files to the Boinc download directory */
static int install_input_files(DC_Workunit *wu)
{
char *src, *dest;
char *dest;
GList *l;
int ret;
for (l = wu->input_files; l; l = l->next)
{
src = get_workdir_path(wu, (char *)l->data, FILE_IN);
dest = get_input_download_path(wu, (char *)l->data);
ret = link(src, dest);
DC_PhysicalFile *file = (DC_PhysicalFile *)l->data;
/* This also creates the directory if needed */
dest = get_input_download_path(wu, file->label);
/* By creating the hard link ourselves we prevent BOINC from
* copying the file later */
ret = link(file->path, dest);
if (ret)
{
DC_log(LOG_ERR, "Failed to install file %s to the "
"Boinc download directory at %s", src, dest);
g_free(src);
"Boinc download directory at %s", file->path,
dest);
g_free(dest);
return DC_ERR_INTERNAL;
}
g_free(src);
g_free(dest);
}
/* During resume, we have to install the checkpoint file as well */
if (wu->ckpt_name)
{
char *src;
src = get_workdir_path(wu, wu->ckpt_name, FILE_CKPT);
dest = get_input_download_path(wu, wu->ckpt_name);
ret = link(src, dest);
@ -376,11 +442,13 @@ static char *generate_wu_template(DC_Workunit *wu)
for (i = 0, l = wu->input_files; l && i < wu->num_inputs;
l = l->next, i++)
{
DC_PhysicalFile *file = (DC_PhysicalFile *)l->data;
g_string_append(tmpl, "\t<file_ref>\n");
g_string_append_printf(tmpl,
"\t\t<file_number>%d</file_number>\n", i);
g_string_append_printf(tmpl,
"\t\t<open_name>%s</open_name>\n", (char *)l->data);
"\t\t<open_name>%s</open_name>\n", file->label);
g_string_append(tmpl, "\t</file_ref>\n");
}
@ -470,8 +538,8 @@ static char *generate_result_template(DC_Workunit *wu)
{
fprintf(tmpl, "\t<file_ref>\n");
fprintf(tmpl, "\t\t<file_name><OUTFILE_%d/></name>\n", i);
fprintf(tmpl, "\t\t<open_name>_dc_subresult_%d</open_name>\n", /* XXX check name */
i - wu->num_outputs);
fprintf(tmpl, "\t\t<open_name>%s%d</open_name>\n",
SUBRESULT_PFX, i - wu->num_outputs);
fprintf(tmpl, "\t</file_ref>\n");
i++;
}
@ -479,7 +547,7 @@ static char *generate_result_template(DC_Workunit *wu)
/* The checkpoint template */
fprintf(tmpl, "\t<file_ref>\n");
fprintf(tmpl, "\t\t<file_name><OUTFILE_%d/></name>\n", i);
fprintf(tmpl, "\t\t<open_name>_dc_checkpoint</open_name>\n"); /* XXX check name */
fprintf(tmpl, "\t\t<open_name>%s</open_name>\n", CKPT_LABEL);
fprintf(tmpl, "\t</file_ref>\n");
fprintf(tmpl, "</result>\n");
@ -510,8 +578,8 @@ int DC_submitWU(DC_Workunit *wu)
{
char **infiles, *result_path;
char *wu_template, *result_template_file;
int i, ret, ninputs;
DB_WORKUNIT bwu;
int i, ret;
GList *l;
ret = install_input_files(wu);
@ -537,10 +605,22 @@ int DC_submitWU(DC_Workunit *wu)
result_path = g_strdup_printf("%s%c%s", _DC_getCfgStr(CFG_PROJECTROOT),
G_DIR_SEPARATOR, result_template_file);
infiles = g_new0(char *, wu->num_inputs + 1);
/* Create the input file name array as required by create_work() */
ninputs = wu->num_inputs;
if (wu->ckpt_name)
ninputs++;
infiles = g_new0(char *, ninputs + 1);
for (l = wu->input_files, i = 0; l && i < wu->num_inputs;
l = l->next, i++)
infiles[i] = g_strdup((char *)l->data);
{
DC_PhysicalFile *file = (DC_PhysicalFile *)l->data;
infiles[i] = get_input_download_name(wu, file->label);
}
if (wu->ckpt_name)
infiles[i++] = get_input_download_name(wu, wu->ckpt_name);
/* Terminator so we can use g_strfreev() later */
infiles[i] = NULL;
ret = create_work(bwu, wu_template, result_template_file, result_path,
const_cast<const char **>(infiles), wu->num_inputs,
@ -556,5 +636,7 @@ int DC_submitWU(DC_Workunit *wu)
return DC_ERR_DATABASE;
}
wu->state = DC_WU_RUNNING;
return 0;
}