Autotooled dcapi with 4.x/5.x Boinc compatibility and clgr support included

git-svn-id: svn+ssh://cvs.lpds.sztaki.hu/var/lib/svn/szdg/dcapi/trunk@145 a7169a2c-3604-0410-bc95-c702d8d87f7a
This commit is contained in:
gombasg 2005-12-06 15:36:31 +00:00 committed by Adam Visegradi
parent d3e8698e8a
commit 00f8d22a05
34 changed files with 2493 additions and 333 deletions

13
dcapi/Makefile.am Normal file
View File

@ -0,0 +1,13 @@
SUBDIRS = include common
if WITH_BOINC
SUBDIRS += boinc
endif
if WITH_CLGR
SUBDIRS += clgr
endif
if WITH_LOCAL
SUBDIRS += local
endif

View File

@ -1,27 +0,0 @@
#CC=cc -n32 -g -Xcpluscomm
#CC=gcc-3.4 -g
#CPP=g++-3.4 -g
CC=gcc -g
CPP=g++ -g
AR=ar
RANLIB=ranlib
# User configurable items
BOINC=../../boinc
MYSQL_INC=/sw/include/mysql
INC=-I../include -I/usr/local/include
BOINC_INC=-I${BOINC}/tools -I${BOINC}/sched -I${BOINC}/db -I${BOINC}/lib -I${MYSQL_INC}
all:
${CC} -c -Wall dc.c cfg.c logger.c result.c ${INC}
$(CPP) -c -Wall validate_util.C ${INC} ${BOINC_INC}
${CPP} -c -Wall wu.C assimilator.C ${INC} ${BOINC_INC}
${AR} rc libdc-boinc-local.a dc.o cfg.o logger.o result.o wu.o assimilator.o validate_util.o
${RANLIB} libdc-boinc-local.a
clean:
rm -rf core *~ *.o libdc-boinc-local.a

18
dcapi/boinc/Makefile.am Normal file
View File

@ -0,0 +1,18 @@
lib_LTLIBRARIES = libdc-boinc.la
AM_CPPFLAGS = -I$(top_srcdir)/common $(BOINC_CPPFLAGS) $(MYSQL_CPPFLAGS)
libdc_boinc_la_SOURCES = \
assimilator.C \
assimilator.h \
dc.c \
result.c \
result.h \
validate_util.C \
validate_util.h \
wu.C \
wu.h
libdc_boinc_la_LIBADD = \
../common/libdc-common.la \
$(BOINC_LIBS) \
$(MYSQL_LIBS)

View File

@ -1,27 +0,0 @@
#define DC_CFG_OK 0
#define DC_CFG_FILENOTEXISTS 1
/** Parse the config file and store name=value pairs in memory
*
*/
int dc_cfg_parse(const char *cfgfile);
/** Get a value for a given name
*
*/
char * dc_cfg_get(char *name);
#define DC_CFG_OK 0
#define DC_CFG_FILENOTEXISTS 1
#define DC_CFG_OUTOFMEM 2
/** Parse the config file and store name=value pairs in memory
*
*/
int dc_cfg_parse(const char *cfgfile);
/** Get a value for a given name
*
*/
char * dc_cfg_get(char *name);

View File

@ -1,100 +0,0 @@
#include <stdio.h>
#include <stdarg.h>
#include <sys/syslog.h>
#include <errno.h>
#include <stdlib.h>
#include <time.h>
#include <string.h>
#include "cfg.h"
#include "dc.h"
static int loglevel = -1;
static FILE *logfile;
static const char *levels[] =
{
[LOG_DEBUG] = "Debug",
[LOG_INFO] = "Info",
[LOG_NOTICE] = "Notice",
[LOG_WARNING] = "Warning",
[LOG_ERR] = "Error"
};
static void init_log(void)
{
char *val;
/* Default level */
loglevel = LOG_NOTICE;
val = dc_cfg_get("LogLevel");
if (val)
{
if (val[0] >= '0' && val[0] <= '9')
loglevel = atoi(val);
else
{
int i;
for (i = 0; i < (int)sizeof(levels) / sizeof(levels[0]); i++)
{
if (levels[i] && !strcasecmp(levels[i], val))
{
loglevel = i;
break;
}
}
}
}
val = dc_cfg_get("LogFile");
if (val)
{
logfile = fopen(val, "a");
if (!logfile)
{
fprintf(stderr, "Failed to open the log file %s: %s",
val, strerror(errno));
exit(1);
}
}
else
logfile = stdout;
}
void DC_log(int level, const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
DC_vlog(level, fmt, ap);
va_end(ap);
}
void DC_vlog(int level, const char *fmt, va_list ap)
{
const char *levstr;
char timebuf[32];
struct tm *tm;
time_t now;
if (loglevel < 0)
init_log();
if (level > loglevel)
return;
if (level >= 0 && level < sizeof(levels) / sizeof(levels[0]) && levels[level])
levstr = levels[level];
else
levstr = "Unknown";
now = time(NULL);
tm = localtime(&now);
strftime(timebuf, sizeof(timebuf), "%Y-%m-%d %H:%M:%S", tm);
fprintf(logfile, "%s [%s] ", timebuf, levstr);
vfprintf(logfile, fmt, ap);
fprintf(logfile, "\n");
fflush(logfile);
}

View File

@ -34,6 +34,11 @@
#include <sched_msgs.h>
#include <validate_util.h>
// Compatibility with Boinc 4.x
#if BOINC_VERSION == 4
#define MSG_CRITICAL CRITICAL
#endif
using std::vector;
using std::string;
@ -47,10 +52,15 @@ int get_output_file_path(RESULT const& result, string& path_str) {
if (!parse_str(result.xml_doc_out, "<name>", buf, sizeof(buf))) {
return ERR_XML_PARSE;
}
dir_hier_path(buf, config.upload_dir, config.uldl_dir_fanout, path, true);
if (!boinc_file_exists(path)) {
dir_hier_path(buf, config.upload_dir, config.uldl_dir_fanout, false, path);
}
#if BOINC_VERSION == 4
dir_hier_path(buf, config.upload_dir, config.uldl_dir_fanout, true, path);
if (!boinc_file_exists(path))
dir_hier_path(buf, config.upload_dir, config.uldl_dir_fanout, false, path);
#else
dir_hier_path(buf, config.upload_dir, config.uldl_dir_fanout, path);
#endif
path_str = path;
return 0;
}

View File

@ -11,11 +11,13 @@
#include <string.h>
#include <errno.h>
//#include <uuid/uuid.h>
#include <sched_config.h>
#include "dc.h"
#include "wu.h"
#include "result.h"
extern SCHED_CONFIG config;
/* PRIVATE */
@ -177,7 +179,11 @@ int dc_wu_setInput(DC_Workunit wu, const char *url, const char* localfilename)
// snprintf(downloadpath, 256, "%s/download/%s", dc_projectRootDir, downloadfilename);
snprintf(download_dir, 256, "%s/download", dc_projectRootDir);
dir_hier_path(downloadfilename, download_dir, 1024, downloadpath, true);
#if BOINC_VERSION == 4
dir_hier_path(downloadfilename, download_dir, config.uldl_dir_fanout, true, downloadpath, true);
#else
dir_hier_path(downloadfilename, download_dir, config.uldl_dir_fanout, downloadpath, true);
#endif
snprintf(syscmd, 1024, "cp %s %s", url, downloadpath);
DC_log(LOG_DEBUG, "system command: '%s'", syscmd);

21
dcapi/clgr/Makefile.am Normal file
View File

@ -0,0 +1,21 @@
lib_LTLIBRARIES = libdc-clgr.la
AM_CPPFLAGS = -I$(top_srcdir)/common
libdc_clgr_la_SOURCES = \
dc.c \
dc_client.c \
defines.h \
get_out.c \
get_out.h \
remove.c \
remove.h \
result.c \
result.h \
status.c \
status.h \
submit.c \
submit.h \
wu.C \
wu.h
libdc_clgr_la_LIBADD = ../common/libdc-common.la

261
dcapi/clgr/dc.c Normal file
View File

@ -0,0 +1,261 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include "dc.h"
#include "cfg.h"
#include "wu.h"
//#include "assimilator.h"
#include "result.h"
static char *projectname; /* project name under boinc */
static char *appname; /* name of the application, used here in assimilation */
static char *workdir; /* a working dir for this application */
static char *executabledir; /* the executable file, with path */
//static char *boincprojectrootdir; /* where projects are stored, e.g. /usr/boinc/projects */
//static char *wutemplate; /* path and file name */
//static char *resulttemplate; /* path and file name */
//static char *uploadkeyfile; /* upload private key file */
static char yesno[2][4] = {"no", "yes"};
static char *sleepinterval;
/* CONSTRAINTS
1. One application client is supported (because of assimilation)
Client name is defined for each WU separately in the API
At DC_init, this one client name should be given as application_name.
2. One workunit and one result templates are supported, given in the config file.
*/
int DC_init(const char *project_name, const char *application_name, const char *configfile)
{
char buf[512]; //, syscmd[2048];
time_t ltime;
projectname = strdup(project_name);
appname = strdup(application_name);
if (dc_cfg_parse(configfile) != DC_CFG_OK) return DC_ERROR;
workdir = dc_cfg_get("WorkingDirectory");
executabledir = dc_cfg_get("Executabledir");
//boincprojectrootdir = dc_cfg_get("BoincProjectRootDirectory");
//wutemplate = dc_cfg_get("WUTemplatePath");
//resulttemplate = dc_cfg_get("ResultTemplatePath");
//uploadkeyfile = dc_cfg_get("UploadPrivateKey");
sleepinterval = dc_cfg_get("CheckResultSleepInterval");
if (workdir == NULL) {
DC_log(LOG_ERR,
"Working directory cannot be determined from the config file");
return DC_ERROR;
}
if (executabledir == NULL) {
DC_log(LOG_ERR,
"Executable file cannot be determined from the config file");
return DC_ERROR;
}
/*
if (boincprojectrootdir == NULL) {
DC_log(LOG_ERR,
"BOINC project root directory cannot be determined from the config file");
return DC_ERROR;
}
if (wutemplate == NULL) {
DC_log(LOG_ERR,
"Template file for Workunit cannot be determined from the config file");
return DC_ERROR;
}
if (resulttemplate == NULL) {
DC_log(LOG_ERR,
"Template file for Result cannot be determined from the config file");
return DC_ERROR;
}
if (uploadkeyfile == NULL) {
DC_log(LOG_ERR,
"Upload private key file cannot be determined from the config file");
return DC_ERROR;
}
*/
if (sleepinterval == NULL) {
DC_log(LOG_ERR,
"Check for Result sleeping interval cannot be determined from the config file\nDefault value is 5 sec.");
strcpy(sleepinterval,"5");
//return DC_ERROR;
}
DC_log(LOG_DEBUG, "init: WorkingDir=%s", workdir);
DC_log(LOG_DEBUG, "init: Executabledir=%s", executabledir);
// DC_log(LOG_DEBUG, "init: Boinc Project Root Dir=%s", boincprojectrootdir);
// DC_log(LOG_DEBUG, "init: Template WU=%s", wutemplate);
// DC_log(LOG_DEBUG, "init: Template Result=%s", resulttemplate);
// DC_log(LOG_DEBUG, "init: Upload private key file=%s", uploadkeyfile);
DC_log(LOG_DEBUG, "init: Check Result sleeping interval=%s", sleepinterval);
/* Check config.xml in project's directory */
/* snprintf(buf, 512, "%s/%s/config.xml", boincprojectrootdir, project_name);
if (access(buf, R_OK)) {
DC_log(LOG_ERR,
"Invalid project name, not valid configuration found: %s, error = %d %s",
buf, errno, strerror(errno));
return DC_ERROR;
}
*/
/* Copy template files into project's template/ dir. */
/* snprintf(buf, 512, "%s/%s/templates", boincprojectrootdir, project_name);
snprintf(syscmd, 1024, "cp %s %s", resulttemplate, buf);
if (system(syscmd) == -1) {
DC_log(LOG_ERR,
"Cannot copy result template files into project's template/ dir.\n"
"Command '%s' failed.", syscmd);
return DC_ERROR;
}
*/
// snprintf(buf, 512, "%s/%s", boincprojectrootdir, project_name);
// dc_wu_init(buf, uploadkeyfile); /* init WU manager */
snprintf(buf, 512, "%s_%s_%ld", project_name, appname, time(&ltime));
dc_wu_init(workdir, buf, executabledir);
return DC_OK;
}
DC_Workunit DC_createWU (const char *application_client,
const char *arguments)
{
// char dir[256];
/* char inpf[256], syscmd[1024]; */
/* Create working directory for WU
<workdir>/<wu name>
*/
/*
snprintf(dir, 256, "%s/%s/%d", workdir, projectname, wundx);
DC_log(LOG_DEBUG, "Create WU in %s", dir);
if (mkdir(dir, S_IRWXU+S_IRGRP)) {
if (errno != 17) { // 17=directory exists
DC_log(LOG_ERR,
"Cannot create directory %s: %s",
dir, strerror(errno));
return -1;
}
else {
// errno 17: File exists
// Here should be checked if
// - it is a directory and
// - we can read/write into it
//
}
}
*/
return dc_wu_create(application_client, arguments);
}
int DC_setInput (DC_Workunit wu, char * URL, char * localFileName)
{
return dc_wu_setInput(wu, URL, localFileName);
}
int DC_setPriority (DC_Workunit wu, int priority)
{
return dc_wu_setPriority(wu, priority);
}
int DC_submitWU (DC_Workunit wu)
{
//char boincRootDir[256];
//snprintf(boincRootDir, 512, "%s/%s", boincprojectrootdir, projectname);
//return dc_wu_createBoincWU(wu, uploadkeyfile, boincRootDir);
return dc_wu_submitWU(wu);
}
int DC_cancelWU (DC_Workunit wu)
{
printf("DC_cancelWU is not implemented yet");
return 1;
}
int DC_destroyWU(DC_Workunit wu)
{
return dc_wu_destroy(wu);
}
int DC_suspendWU(DC_Workunit wu)
{
return dc_wu_suspend(wu);
}
int DC_resubmitWU(DC_Workunit wu)
{
return dc_wu_resubmit(wu);
}
int DC_checkForResult(int timeout,
void (*cb_assimilate_result)(DC_Result result)
)
{
int retval;
int ex = 0;
int sleeptime = atoi(sleepinterval);
time_t tstart = time(NULL);
// static int initialized = 0;
//char buf[512];
if (sleeptime < 1) sleeptime = 1;
DC_log(LOG_INFO, "Check for results, blocking=%s, sleep_interval=%d",
yesno[timeout>=0], sleeptime);
/*
if (!initialized) {
snprintf(buf, 512, "%s/%s", boincprojectrootdir, projectname);
dc_assimilator_init(buf, appname); // init assimilator
initialized = 1;
}
*/
while (!ex) {
ex = 1;
DC_log(LOG_DEBUG, "Call dc_assimilator_dopass()");
//retval = dc_assimilator_dopass(cb_assimilate_result);
retval = dc_wu_checkForResult(cb_assimilate_result);
if (retval == 0 && timeout == 0) /* no result and waiting forever */
ex = 0;
if (retval == 0 && /* no result found yet */
timeout > 0 && /* blocking mode */
time(NULL) < tstart+timeout /* timeout (>0) not exceeded */
) ex = 0;
if (!ex) sleep(sleeptime);
}
if (retval == -1) return DC_ERROR;
else return DC_OK;
}

66
dcapi/clgr/dc_client.c Normal file
View File

@ -0,0 +1,66 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include "dc_client.h"
#include "dc.h"
static time_t result_time = 0;
static time_t ckpt_time = 0;
static char statfilename[] = "job_out_stats.dat";
/* Print out result_time and ckpt_time in the format "%d %d"
* into the file 'statfilename'
* ClusterGrid infrastructure takes care of delivering information
* up to the information system so that job status query results in
* subresult_time = <time>
* ckpt_time = <time>
*/
static int dc_client_writeout(void)
{
FILE *out;
int retval = DC_OK;
if ( (out = fopen(statfilename, "w")) != NULL) {
fprintf(out, "%d %d\n", (int) result_time, (int)ckpt_time);
fclose(out);
}
else {
DC_log(LOG_ERR, "dc-client: Cannot open file %s", statfilename);
retval = DC_ERROR;
}
return retval;
}
/** Notify master (somehow) that a subresult is provided
*/
int DC_SendResult(char **files, // output files
int nfiles // number of output files
)
{
result_time = time(NULL);
return dc_client_writeout();
}
/** Notify master (somehow) that a user-level checkpoint is provided
*/
int DC_CheckpointMade(void)
{
ckpt_time = time(NULL);
return dc_client_writeout();
}
/** Finalize client API if needed.
*/
void DC_Finish(void)
{
}

34
dcapi/clgr/defines.h Normal file
View File

@ -0,0 +1,34 @@
#ifndef __DEFINES_H_
#define __DEFINES_H_
#ifdef __cplusplus
extern "C" {
#endif
/* Maximum number of input files */
#define MAX_INFILES 4
/* Maximum number of output files */
#define MAX_OUTFILES 4
/* States of the work unit */
#define STATE_INVALID 0 // don't used space in the wu_table
#define STATE_CREATED 1 // used wu_table part, but is already doesn't submitted
#define STATE_SUBMITTED 2 // a submitted work unit
#define STATE_RUNNING 3 // when the work unit is stated to running on a host
#define STATE_FINISHED 4 // the work unit is finished so we can get output files, create result, and delete work unit
#define STATE_UNKNOWN 5 // don't know what is the state of the selected wu, especially before ask_state function call
#define STATE_UNDEFINED 6 // if the ask_status function cannot realize the status of the given work unit
#define STATE_SUSPENDED 7 // when the master suspended an already submitted work unit
/* parameters for getout function */
#define GETOUT_FINISHED 0 // the work unit was finished so we can delete it from the info system
#define GETOUT_NOTFINISHED 1 // the work unit wasn't finished, only create subresults
#ifdef __cplusplus
}
#endif
#endif /* __DEFINES_H_ */

212
dcapi/clgr/get_out.c Normal file
View File

@ -0,0 +1,212 @@
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<sys/stat.h>
#include<sys/types.h>
#include<unistd.h>
#include "defines.h"
#include "dc.h"
char *extract_id(const char *wu_name);
static char *locate_path()
{
char buf[1024];
getcwd(buf, sizeof(buf));
return strdup(buf);
}
static int set_path_normal(const char *path)
{
if (chdir(path) != 0) return DC_ERROR;
//else DC_log(LOG_DEBUG," *** Actual path has been set to '%s' ", path);
return DC_OK;
}
int exit_code(char *filename)
{
FILE *log;
char line[512];
char *tmp1, *tmp2;
int exitcode = 0;
//int i;
log = fopen(filename, "r");
if(log != NULL)
{
while(!feof(log))
{
fgets(line, sizeof(line), log);
tmp1 = strdup(line);
tmp1 = strtok(tmp1, " ");
// search for a line that contains 'terminated.'
while((tmp1 = strtok(NULL, " ")) != NULL)
{
if(strcmp(tmp1, "terminated.") == 0)
{
// if found, than read the line coming after
fgets(line, sizeof(line), log);
tmp1 = strdup(line);
tmp1 = strtok(tmp1, " ");
tmp2 = NULL;
while((tmp1 = strtok(NULL, " ")) != NULL)
{
tmp2 = strdup(tmp1);
}
exitcode = atoi(tmp2);
DC_log(LOG_DEBUG,"GETOUT: ***exitcode: %d, log: %s;", exitcode, tmp2);
}
}
}
fclose(log);
}
return exitcode;
}
int get_out(const char *wu_name, const char *work_dir,
char *output_dir[1], char *result_name[1],
char *std_out[1], char *std_err[1], char *sys_log[1], int *exitcode, int IsFinished)
{
char *pwd;
int i;
char *id;
char outputdir[512];
char file[128];
FILE *exist;
FILE *clgr_getout;
char command[512];
char result[512];
//int space_counter = 0;
DC_log(LOG_DEBUG,"GET_OUT: started...");
pwd = locate_path();
set_path_normal(work_dir);
if(IsFinished == GETOUT_FINISHED)
/* SATATE=FINISHED, get_out results and delete from clustergrid */
snprintf(command, 512, "clgr_getout -t -d -i %s", wu_name);
else
/* SATATE!=FINISHED, get_out results BUT DO NOT delete from the clustergrid */
snprintf(command, 512, "clgr_getout -t -i %s", wu_name);
clgr_getout = popen(command, "r");
if(clgr_getout == NULL)
{
DC_log(LOG_ERR,"Cannot make 'clgr_getout' command.");
return DC_ERROR;
}else
{
fgets(result, sizeof(result), clgr_getout);
pclose(clgr_getout);
}
/* search for 'ok' after the third space, string 'result' is NOT modified */
id = strdup(result);
id = strtok(result, " ");
id = strtok(NULL, " ");
id = strtok(NULL, " ");
id = strtok(NULL, " ");
//DC_log(LOG_DEBUG,"***%s***",result);
//DC_log(LOG_DEBUG,"***%s***",id);
if(!strcmp(id, "ok"))
{
DC_log(LOG_ERR,"Clgr_getout error message: '%s'", result);
return DC_ERROR;
}
DC_log(LOG_DEBUG,"%s job finished.",wu_name);
////////////////////////////////////////
i = 0;
id = (char *)extract_id(wu_name);
result_name[0] = strdup(wu_name);
DC_log(LOG_DEBUG,"***result_name: %s", result_name[0]);
sprintf(outputdir, "%s/output", work_dir);
set_path_normal(pwd);
if(chdir(outputdir) != 0)
{
DC_log(LOG_ERR,"Cannot move into %s dir.",outputdir);
return DC_ERROR;
}
output_dir[0] = outputdir;
DC_log(LOG_DEBUG,"***output_dir: %s",output_dir[0]);
/* .out file */
sprintf(file, "%s.out", id);
exist = fopen(file, "r");
if(exist != NULL)
{
std_out[0] = strdup(file);
fclose(exist);
}
else
{
std_out[0] = NULL;
}
/* .err file */
exist = NULL;
sprintf(file, "%s.err", id);
exist = fopen(file, "r");
if(exist != NULL)
{
std_err[0] = strdup(file);
fclose(exist);
}
else
{
std_err[0] = NULL;
}
/* .log file */
exist = NULL;
sprintf(file, "%s.log", id);
exist = fopen(file, "r");
if(exist != NULL)
{
sys_log[0] = strdup(file);
fclose(exist);
*exitcode = exit_code(file);
}
else
{
sys_log[0] = NULL;
}
set_path_normal(pwd);
free(pwd);
return 0;
}
char *extract_id(const char *wu_name)
{
int i;
char* id;
/* id is the last 36 characters of the wu_name */
id = (char *)strrchr(wu_name, '\0');
for(i=0;i<36;i++)
{
id--;
}
return id;
}

34
dcapi/clgr/get_out.h Normal file
View File

@ -0,0 +1,34 @@
#ifndef __GET_OUT_H_
#define __GET_OUT_H_
#ifdef __cplusplus
extern "C" {
#endif
/**
* Function: get_out() - retreives the output of a completed job
*
* Arguments:
* wu_name:
* work_dir:
* output_dir:
* result_name:
* outfiles[]:
* num_of_outfiles:
*
* Output:
* get_out() returns 0 on success and fills out its arguments
*
*
**/
int get_out(const char *wu_name, const char *work_dir, char *output_dir[1], char *result_name[1],
char *std_out[1], char *std_err[1], char *sys_log[1], int *exitcode, int IsFinished);
//char *extract_id(const char *wu_name);
#ifdef __cplusplus
}
#endif
#endif /* __GET_OUT_H_ */

53
dcapi/clgr/remove.c Normal file
View File

@ -0,0 +1,53 @@
#include<string.h>
#include<stdlib.h>
#include<stdio.h>
#include "defines.h"
#include "dc.h"
int remove_wu(char *wuname, char *workdir)
{
FILE *f;
char syscmd[512], buf[512];
char *id;
buf[0] = 0;
DC_log(LOG_DEBUG,"REMOVE: %s work unit is under removing.", wuname);
sprintf(syscmd, "clgr_rm -i %s", wuname);
f = popen(syscmd, "r");
if(!feof(f))
{
fgets(buf, sizeof(buf), f);
pclose(f);
}
else
{
DC_log(LOG_ERR,"REMOVE: Cannot accomplish clgr_rm system-command!");
return DC_ERROR;
}
if (strlen(buf) == 0)
{
DC_log(LOG_ERR,"REMOVE: Cannot accomplish clgr_rm system-command!");
return DC_ERROR;
}
id = strdup(buf);
id = strtok(buf, " ");
id = strtok(NULL, " ");
id = strtok(NULL, " ");
id = strtok(NULL, " ");
if (!strcmp(id, "ok"))
{
DC_log(LOG_ERR,"REMOVE: clgr_rm error message: '%s'", id);
return DC_ERROR;
}
DC_log(LOG_DEBUG,"REMOVE: %s workunit was removed.", wuname);
return DC_OK;
}

18
dcapi/clgr/remove.h Normal file
View File

@ -0,0 +1,18 @@
#ifndef __REMOVE_H_
#define __REMOVE_H_
#ifdef __cplusplus
extern "C" {
#endif
/* Function: Removes a job from the info system.
* Arguments:
* - wuname: the name of the selected job in the info system*/
int remove_wu(char *wuname, char *workdir);
#ifdef __cplusplus
}
#endif
#endif /* __RMOVE_H_ */

69
dcapi/clgr/result.c Normal file
View File

@ -0,0 +1,69 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "dc.h"
#include "result.h"
#include "wu.h"
DC_Result dc_result_create(char *name, char *wuname, char *dir, char *std_out, char *std_err, char *sys_log, int exitcode)
{
dc_result *result;
result = (dc_result *) malloc (sizeof(dc_result));
result->name = strdup(name);
result->wu = dc_wu_findByName(wuname);
result->outfiles_dir = strdup(dir);
if (std_out != NULL) result->std_out = strdup(std_out);
else result->std_out = NULL;
if (std_err != NULL) result->std_err = strdup(std_err);
else result->std_err = NULL;
if (sys_log != NULL) result->sys_log = strdup(sys_log);
else result->sys_log = NULL;
result->exitcode = exitcode;
return (DC_Result) result;
}
/*
int dc_result_addOutputFile(DC_Result result, char *filename)
{
dc_result *dcr = (dc_result *) result;
if (dcr->noutfiles >= MAX_OUTFILES-1) {
DC_log( LOG_ERR, "Too many output files for one result!");
return DC_ERROR;
}
dcr->outfiles[dcr->noutfiles] = strdup(filename);
dcr->noutfiles++;
return DC_OK;
}
*/
void dc_result_free(DC_Result result)
{
dc_result *res;
if (result != NULL) {
res = (dc_result *) result;
if (res->name != NULL) {
free(res->name);
res->name = NULL;
}
if (res->outfiles_dir != NULL) {
free(res->outfiles_dir);
res->outfiles_dir = NULL;
}
if (res->std_out != NULL) {
free(res->std_out);
res->std_out = NULL;
}
if (res->std_err != NULL) {
free(res->std_err);
res->std_err = NULL;
}
if (res->sys_log != NULL) {
free(res->sys_log);
res->sys_log = NULL;
}
free(res);
}
}

19
dcapi/clgr/result.h Normal file
View File

@ -0,0 +1,19 @@
#ifndef __RESULT_H_
#define __RESULT_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "dc.h"
DC_Result dc_result_create(char *name, char *wuname, char *dir, char *std_out, char *std_err, char *sys_log, int exitcode);
//int dc_result_addOutputFile(DC_Result result, char *filename);
void dc_result_free(DC_Result result);
#ifdef __cplusplus
}
#endif
#endif /* __RESULT_H_ */

135
dcapi/clgr/status.c Normal file
View File

@ -0,0 +1,135 @@
#include<stdio.h>
#include<string.h>
#include<time.h>
#include<stdlib.h>
#include"defines.h"
#include"dc.h"
int ask_status(const char *wu_name, int *status, time_t *subresult_time, time_t *ckpt_time)
{
FILE *clgr_status;
char command[512];
char result[512];
char variable[512];
char *value;
char clgr_string[32];
int len; //n_of_lines = 0;
int back = STATE_UNKNOWN;
//int i;
DC_log(LOG_DEBUG,"STATUS: wu_name: '%s'", wu_name);
sprintf(command, "clgr_status -l -i %s", wu_name);
clgr_status = popen(command, "r");
/********************/
/* reset variables */
/******************/
*subresult_time = *ckpt_time = 0;
/*********************************/
/* read through the status info */
/*******************************/
while(!feof(clgr_status))
{
//n_of_lines++;
fgets(result, sizeof(result), clgr_status);
if ((strchr(result,'=')) == NULL) break;
//DC_log(LOG_DEBUG,"%s", result);
/*****************************/
/* retreive variable string */
/***************************/
len = strcspn(result, " =");
if( len < 512)
{
snprintf(variable, len+1, "%s", result);
//DC_log(LOG_DEBUG,"STATUS: variable_name: '%s'", variable);
}
else {
DC_log(LOG_ERR, "status.c: status field longer than 512 chars");
return DC_ERROR;
}
if(!strcmp(variable, "status"))
{
/***************************/
/* retreive STATUS string */
/*************************/
/*value = result;*/
value = (char *)strchr(result, '=');
value++;
value++;
len = strcspn(value, "\n");
if (len > 31) len = 31;
snprintf(clgr_string, len+1, "%s", value);
DC_log(LOG_DEBUG,"STATUS: retreived_status: '%s'", clgr_string);
/*********************/
/* set return state */
/*******************/
if(strcmp(clgr_string, "DONE") == 0)
back = STATE_FINISHED;
else if(strcmp(clgr_string, "RUNNING") == 0)
back = STATE_RUNNING;
else if(strcmp(clgr_string, "QUEUED_ACTIVE") == 0)
back = STATE_SUBMITTED;
else
back = STATE_UNDEFINED;
//DC_log(LOG_DEBUG,"STATUS: status retreived successfully!");
}
else if(!strcmp(variable, "subresult_time"))
{
/***********************************/
/* retreive SUBRESULT_TIME string */
/*********************************/
value = result;
value = (char *)strchr(result, '=');
value++;
value++;
len = strcspn(value, "\n");
if (len > 31) len = 31;
snprintf(clgr_string, len+1, "%s", value);
*subresult_time = (time_t) atoi(clgr_string);
DC_log(LOG_DEBUG,"STATUS: subresult time (%d) retreived successfully!", subresult_time);
}
else if(!strcmp(variable, "ckpt_time"))
{
/***********************************/
/* retreive CHEKPOINT_TIME string */
/*********************************/
value = result;
value = (char *)strchr(result, '=');
value++;
value++;
len = strcspn(value, "\n");
if (len > 31) len = 31;
snprintf(clgr_string, len+1, "%s", value);
*ckpt_time = (time_t) atoi(clgr_string);
DC_log(LOG_DEBUG,"STATUS: checkpoint time (%d) retreived successfully!", ckpt_time);
}
}
pclose(clgr_status);
// If the CLGR system doesn't found target wu.
/* if (n_of_lines == 1){
DC_log(LOG_ERR,"STATUS: No status found for %s workunit.", wu_name);
*status = STATE_UNKNOWN;
return DC_ERROR;
}
*/
*status = back;
DC_log(LOG_DEBUG, "STATUS: status retreived successfully!");
return DC_OK;
}

37
dcapi/clgr/status.h Normal file
View File

@ -0,0 +1,37 @@
#ifndef __STATUS_H_
#define __STATUS_H_
#ifdef __cplusplus
extern "C" {
#endif
/**
* Function: ask_status(const char *wu_name, int *status,
* time_t *subresult_time, time_t *ckpt_time) -
* retreives the status info of a submitted job identified by
* the Work Unit Name (wu_name) of the job
* In addition it also retreives
* - Subresult Time (subresult_time) and
* - Chekpoint Time (ckpt_time)
* of the job, if any, or returns NULL if missing.
*
*
* Argumets:
* char *wu_name: work_unit name
* int *status: return argument, status of the job
* int *subresult_time: return argument, subresult time info
* int *ckpt_time: return argument, chekpoint time info
*
* Output:
* Submit() returns zero upon success and the collected infos can
* be retreived from it's return arguments
*
**/
int ask_status(const char *wu_name, int *status, time_t *subresult_time, time_t *ckpt_time);
#ifdef __cplusplus
}
#endif
#endif /* __STATUS_H_ */

341
dcapi/clgr/submit.c Normal file
View File

@ -0,0 +1,341 @@
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
//#ifndef MAX_INFILES
#include "defines.h"
//#endif
#include "dc.h"
static int create_work_dir(const char *work_dir_base, const char *workdir_id, const char *work_dir_num);
static int create_jobdir_structure();
static int create_submit_file(const char *job_name, const char *executable, const char *args);
static int copy_executables(const char *executable_dir, const char *work_dir);
static int copy_inputfiles(int num_of_infiles,const char *infiles[MAX_INFILES],
const char *localfilenames[MAX_INFILES], const char *work_dir);
static char *job_submit(const char *work_dir);
static char *retreive_id(const char *result);
static char *locate_path();
static int set_path_normal(const char *path);
static char *locate_path()
{
char buf[1024];
getcwd(buf, sizeof(buf));
return strdup(buf);
}
static int set_path_normal(const char *path)
{
if (chdir(path) != 0) return DC_ERROR;
//else DC_log(LOG_DEBUG," *** Actual path has been set to '%s'", path);
return DC_OK;
}
char *submit(const char *work_dir_base, const char *work_dir_id, const char *work_dir_num,
const char *executable_dir, const char *executable, const char *job_name,
const char *args, int num_of_infiles,
const char *infiles[MAX_INFILES], const char *localfilenames[MAX_INFILES])
{
char *pwd;
char *id;
char work_dir[512];
sprintf(work_dir, "%s/%s/%s", work_dir_base, work_dir_id, work_dir_num);
//DC_log(LOG_DEBUG,"function_submit_called_OK");
pwd = locate_path();
/* create workdir */
if(create_work_dir(work_dir_base, work_dir_id, work_dir_num) != DC_OK)
{
free(pwd);
return NULL;
}
//DC_log(LOG_DEBUG,"'%s' working directory created!", work_dir);
/***************************/
/* create jobdir structure */
/***************************/
if(create_jobdir_structure() != DC_OK)
{
free(pwd);
return NULL;
}
//DC_log(LOG_DEBUG,"Jobdir structure created!");
/**********************/
/* create submit file */
/**********************/
if(create_submit_file(job_name, executable, args) != DC_OK)
{
free(pwd);
return NULL;
}
//DC_log(LOG_DEBUG,"Submit file created!");
/***************************/
/* copy executables in bin */
/***************************/
if(copy_executables(executable_dir, work_dir) != DC_OK)
{
free(pwd);
return NULL;
}
//DC_log(LOG_DEBUG,"Executable file(s) copied into '%s/bin'!", work_dir);
/********************/
/* copy input files */
/********************/
if(copy_inputfiles(num_of_infiles, infiles,
localfilenames, work_dir) != DC_OK)
{
free(pwd);
return NULL;
}
//DC_log(LOG_DEBUG,"Input file(s) copied into '%s/input'!", work_dir);
id = job_submit(work_dir);
//DC_log(LOG_DEBUG,"'%s' job submitted!", id);
if(set_path_normal(pwd) == DC_ERROR)
{
free(pwd);
return NULL;
}
free(pwd);
return id;
}
int resubmit(char *workdir, char *wu_name[1])
{
char *pwd;
char *id;
//DC_log(LOG_DEBUG,"SUBMIT: workdir: '%s'", workdir);
pwd = locate_path();
//DC_log(LOG_DEBUG,"SUBMIT: locate path: '%s'", pwd);
id = job_submit(workdir);
//DC_log(LOG_DEBUG,"SUBMIT: wuname: '%s'", id);
if(set_path_normal(pwd) == DC_ERROR)
{
free(pwd);
return DC_ERROR;
}
free(pwd);
wu_name[0] = strdup(id);
return DC_OK;
}
int create_work_dir(const char *work_dir_base, const char *work_dir_id, const char *work_dir_num)
{
char path[128];
if(mkdir(work_dir_base, (S_IRWXU|S_IRWXG|S_IRWXO)) != 0)
{
//return -1;
}
sprintf(path, "%s", work_dir_base);
if(chdir(path) != 0)
{
DC_log(LOG_ERR,"Cannot move into '%s' directory!", work_dir_base);
return DC_ERROR;
}
if(mkdir(work_dir_id, (S_IRWXU|S_IRWXG|S_IRWXO)) != 0)
{
//return -3;
}
sprintf(path, "./%s", work_dir_id);
if(chdir(path) != 0)
{
DC_log(LOG_ERR,"Cannot move into '%s/%s' directory!", work_dir_base, work_dir_id);
return DC_ERROR;
}
if(mkdir(work_dir_num, (S_IRWXU|S_IRWXG|S_IRWXO)) != 0)
{
DC_log(LOG_ERR,"The following directory already exists : '%s/%s/%s'!",
work_dir_base, work_dir_id, work_dir_num);
return DC_ERROR;
}
sprintf(path, "./%s", work_dir_num);
if(chdir(path) != 0)
{
DC_log(LOG_ERR,"Cannot move into '%s/%s/%s' directory!",
work_dir_base, work_dir_id, work_dir_num);
return DC_ERROR;
}
return DC_OK;
}
int create_jobdir_structure()
{
if (mkdir("bin", (S_IRWXU|S_IRWXG|S_IRWXO) ) != 0)
{
DC_log(LOG_ERR,"Cannot create 'bin' directory!");
return DC_ERROR;
}
if (mkdir("input", (S_IRWXU|S_IRWXG|S_IRWXO)) != 0)
{
DC_log(LOG_ERR,"Cannot create 'input' directory!");
return DC_ERROR;
}
if (mkdir("output", (S_IRWXU|S_IRWXG|S_IRWXO)) != 0)
{
DC_log(LOG_ERR,"Cannot create 'output' directory!");
return DC_ERROR;
}
if (mkdir("tmp", (S_IRWXU|S_IRWXG|S_IRWXO)) != 0)
{
DC_log(LOG_ERR,"Cannot create 'tmp' directory!");
return DC_ERROR;
}
return DC_OK;
}
int create_submit_file(const char *job_name, const char *executable,
const char *args)
{
FILE *f;
f = fopen("submit","w");
if(f != NULL)
{
/* jobname */
fprintf(f, "[");
fprintf(f, job_name);
fprintf(f, "]");
fprintf(f, "\n");
/* executable */
fprintf(f, "executable = ");
fprintf(f, executable);
fprintf(f, "\n");
/* arguments */
fprintf(f, "arguments = ");
fprintf(f, args);
fprintf(f, "\n");
/* arg type */
fprintf(f, "type = seq");
/* These (3) lines make the job run on sztaki.testlab */
//fprintf(f, "\n");
//fprintf(f, "resource_id = https://n0.sztaki-testlab.grid/\n");
//fprintf(f, "mercury = true");
fclose(f);
return DC_OK;
}else{
return DC_ERROR;
}
}
int copy_executables(const char *executable_dir, const char *work_dir)
{
//FILE *cp_cmd;
char command[2024];
chdir("../../..");
snprintf(command, 2024, "cp -r %s/* %s/bin/", executable_dir, work_dir);
if (system(command) == -1) return DC_ERROR;
return DC_OK;
}
int copy_inputfiles(int num_of_infiles, const char *infiles[MAX_INFILES],
const char *localfilenames[MAX_INFILES], const char *work_dir)
{
int i;
char command[2024];
//FILE *cp_cmd;
for(i=0; i < num_of_infiles; i++)
{
snprintf(command, 2024, "cp %s %s/input/%s", infiles[i], work_dir, localfilenames[i]);
if (system(command) == -1) return DC_ERROR;
}
return DC_OK;
}
char *job_submit(const char *work_dir)
{
FILE *clgr_submit;
char *command = "clgr_submit .";
char result[100];
char *id;
if(chdir(work_dir) != 0)
{
DC_log(LOG_ERR,"job_submit: cannot move into '%s' directory!", work_dir);
return NULL;
}
clgr_submit = popen(command, "r");
if(clgr_submit == NULL)
{
id = NULL;
}else
{
fgets(result, sizeof(result), clgr_submit);
pclose(clgr_submit);
/* retreive identifier string */
id = retreive_id(result);
}
//DC_log(LOG_DEBUG,"%s job submitted!", id);
return id;
}
char *retreive_id(const char *result)
{
char *ok;
int len;
char *res;
char *id;
ok = strstr(result, "ok");
if(ok == NULL)
{
id = NULL;
}else{
res = (char *)strchr(result, ' ');
res++;
len = strcspn(res, " ");
id = (char *) malloc(len*sizeof(char));
if(id != NULL)
snprintf(id, len+1, "%s", res);
}
return id;
}

60
dcapi/clgr/submit.h Normal file
View File

@ -0,0 +1,60 @@
#ifndef __SUBMIT_H_
#define __SUBMIT_H_
#ifdef __cplusplus
extern "C" {
#endif
//#define MAX_INFILES 4
/**
* Function: Submit() - creates directory and file structure for
* the job to be submitted and than submits
* the job in the ClusterGrid for processing
*
* Arguments:
* work_dir_base - the base directory for the job (eg.: wrokdir)
* work_dir_id - the identifier for the job (eg.: )
* work_dir_num - number of the job (eg.: 1)
* executable_dir - function gets the executables for the job from this directory (eg.: ./executable)
* executable - the name of the executable of the job (eg.: worker)
* job_name - name of the job (eg.: test_job)
* args - arguments of the job
* arg_type - type of the arguments
* num_of_infiles - number of the input files the executable processes
* infiles[] - input files with path
* localfilenames - input filenames gets renamed for these names
*
* Output:
* Submit() returns a job identifier string upon success
* or returns NULL upon faliure
*
* Remarks:
* Submit uses function retreive_id() to extract the job identifier from the string
* that is returned by clgr_submit command and it reserves memory for the id with a malloc
* To free the memory allocated for the id is the job of the caller of Submit()
*
**/
char *submit(const char *work_dir_base, const char *work_dir_id, const char *work_dir_num,
const char *executable_dir, const char *executale, const char *job_name,
const char *args, int num_of_infiles,
const char *infiles[MAX_INFILES], const char *localfilenames[MAX_INFILES]);
int resubmit(char *workdir, char *wu_name[1]);
/*
int create_work_dir(const char *work_dir_base, const char *workdir_id, const char *work_dir_num);
int create_jobdir_structure();
int create_submit_file(const char *job_name, const char *executable, const char *args, const char *arg_type);
int copy_executables(const char *executable_dir, const char *work_dir);
int copy_inputfiles(int num_of_infiles, char* infiles[MAX_INFILES],
char* localfilenames[MAX_INFILES], const char *work_dir);
char *job_submit(const char *work_dir);
char *retreive_id(const char *result);
*/
#ifdef __cplusplus
}
#endif
#endif /* __SUBMIT_H_ */

881
dcapi/clgr/wu.C Normal file
View File

@ -0,0 +1,881 @@
/* Include BOINC Headers */
//#include "rsaeuro.h"
//#include "crypt.h"
//#include "backend_lib.h"
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include "dc.h"
#include "wu.h"
#include "result.h"
#include "defines.h"
#include "submit.h"
#include "status.h"
#include "get_out.h"
#include "remove.h"
/* PRIVATE */
/* maximum number of work units */
#define MAX_N_WU 1024
static char *state_strings[5] = {"invalid", "created", "submitted",
"running", "finished"};
//#define MAX_N_WU 1024
typedef struct {
char *wuname;
char *clientname;
char *arguments;
char *workdir;
char *infiles[MAX_INFILES];
char *localfilenames[MAX_INFILES];
int ninfiles;
int state;
int priority;
time_t subresult_time;
time_t ckpt_time;
} dc_wu;
static dc_wu wutable[MAX_N_WU];
static int wu_max, wu_sum;
//static int dc_wu_findByName(char *wuname);
static int dc_wu_findByState(int state, int from);
static int dc_wu_isWUValid(DC_Workunit wu);
static int dc_wu_getWUIndex(DC_Workunit wu);
dc_wu* dc_wu_getWU(DC_Workunit wu);
//int dc_wu_getWUbyName(const char *wuname);
static char *dc_projectRootDir;
static char *dc_exec_dir;
static char *dc_workdir_id;
//static char *dc_uploadPrivateFile;
int dc_wu_isWUValid(DC_Workunit wu)
{
if ((wu < 0) || (wu >= MAX_N_WU)) return DC_ERROR;
else if (wutable[wu].state == STATE_INVALID) return DC_ERROR;
return DC_OK;
}
dc_wu* dc_wu_getWU(DC_Workunit wu)
{
return (&wutable[wu]);
}
int dc_wu_getWUIndex(DC_Workunit wu)
{
return (int)(wu);
}
/* Public */
int dc_wu_findByName(char *wuname)
{
int ndx = 0;
int ex = 0;
while (ndx < MAX_N_WU && !ex) {
if (wutable[ndx].wuname != NULL &&
!strcmp(wutable[ndx].wuname, wuname)) ex = 1;
else {
ndx++;
}
}
return ndx;
}
void dc_wu_init(const char *projectroot, const char *workdir_id, const char *exec_dir) //, const char *uploadkeyfile)
{
int i;
for (i = 0; i < MAX_N_WU; i++) {
wutable[i].wuname = NULL;
wutable[i].clientname = NULL;
wutable[i].arguments = NULL;
wutable[i].workdir = NULL;
// wutable[i].wutemplate = NULL;
// wutable[i].resulttemplate = NULL;
wutable[i].ninfiles = 0;
wutable[i].state = STATE_INVALID;
wutable[i].priority = 100;
wutable[i].subresult_time = 0;
wutable[i].ckpt_time = 0;
}
wu_max = wu_sum = 0;
dc_projectRootDir = strdup(projectroot);
dc_workdir_id = strdup(workdir_id);
dc_exec_dir = strdup(exec_dir);
//dc_uploadPrivateFile = strdup(uploadkeyfile);
}
DC_Workunit dc_wu_create(const char *clientname,
const char *arguments)
{
int ndx = 0;
int ex = 0;
/* Look for an empty slow in dc_wu table */
while (ndx < MAX_N_WU && !ex) {
if (wutable[ndx].state == STATE_INVALID) ex = 1;
else ndx++;
}
if (ndx == MAX_N_WU) {
DC_log(LOG_ERR, "Too many work units (%d) at once",
MAX_N_WU);
return 0;
}
if (ndx > wu_max) wu_max = ndx;
wu_sum++;
/* Fill in information */
wutable[ndx].clientname = strdup(clientname);
wutable[ndx].arguments = strdup(arguments);
//wutable[ndx].workdir = strdup(workdir);
//wutable[ndx].wutemplate = strdup(wutemplate);
//wutable[ndx].resulttemplate = strdup(resulttemplate);
wutable[ndx].state = STATE_CREATED;
return (DC_Workunit) ndx;
}
void dc_wu_getFileName_fromPath(const char *path, char *fn)
{
/* memory for fn should be allocated before calling this function! */
char *lastdelim;
lastdelim = strrchr( path, '/');
if (lastdelim == NULL)
strcpy(fn, (char *) path);
else if (strlen(lastdelim) == 0)
fn[0]='\0';
else
strcpy(fn, lastdelim+1);
}
int dc_wu_setInput(DC_Workunit wu, const char *url, const char* localfilename)
{
dc_wu* dcwu;
//char downloadpath[256], syscmd[1024];
char filename[256]; //, downloadfilename[256];
if (dc_wu_isWUValid(wu) == DC_ERROR) return DC_ERROR;
dcwu = dc_wu_getWU(wu);
if (dcwu->ninfiles >= MAX_INFILES-1) {
DC_log( LOG_ERR, "Too many input files for one wu!");
return DC_ERROR;
}
/* Copy input files (extended by _'wuname') into boinc download directory */
dc_wu_getFileName_fromPath(url, filename);
if (filename == NULL) {
DC_log(LOG_ERR, "File name is not given in URL '%s'", url);
}
// snprintf(downloadfilename, 256, "%s_%s", filename, dcwu->wuname);
// snprintf(downloadpath, 256, "%s/download/%s", dc_projectRootDir, downloadfilename);
// snprintf(syscmd, 1024, "cp %s %s", url, downloadpath);
// if (system(syscmd) == -1) return DC_ERROR;
dcwu->infiles[dcwu->ninfiles] = strdup(filename);
dcwu->localfilenames[dcwu->ninfiles] = strdup(localfilename);
// DC_log( LOG_DEBUG, "Input file '%s' added to wu '%s' and copied into %s",
// dcwu->infiles[dcwu->ninfiles], dc->.wuname, downloadpath);
DC_log( LOG_DEBUG, "Input file parameters added to wu:");
DC_log( LOG_DEBUG, "Input file: %s, Local name: %s;",dcwu->infiles[dcwu->ninfiles],dcwu->localfilenames[dcwu->ninfiles]);
dcwu->ninfiles++;
return DC_OK;
}
int dc_wu_setPriority(DC_Workunit wu, int priority)
{
dc_wu* dcwu;
if (dc_wu_isWUValid(wu) != DC_OK) return DC_ERROR;
dcwu = dc_wu_getWU(wu);
dcwu->priority = priority;
return DC_OK;
}
int dc_wu_destroy(DC_Workunit wu)
{
dc_wu* dcwu;
if (dc_wu_isWUValid(wu) != DC_OK) return DC_ERROR;
dcwu = dc_wu_getWU(wu);
if ((dcwu->state == STATE_SUBMITTED) || (dcwu->state == STATE_RUNNING) || (dcwu->state == STATE_FINISHED))
if (remove_wu(dcwu->wuname, dcwu->workdir) != DC_OK )
{
DC_log(LOG_ERR,"Cannot remove %s work unit from the info system.", dcwu->wuname);
return DC_ERROR;
}
if (dcwu->wuname != NULL) free(dcwu->wuname);
if (dcwu->clientname != NULL) free(dcwu->clientname);
if (dcwu->arguments != NULL) free(dcwu->arguments);
if (dcwu->workdir != NULL) free(dcwu->workdir);
// if (dcwu->wutemplate != NULL) free(dcwu->wutemplate);
// if (dcwu->resulttemplate != NULL) free(dcwu->resulttemplate);
dcwu->state = STATE_INVALID;
return DC_OK;
}
int dc_wu_submitWU(DC_Workunit wu)
{
dc_wu* dcwu;
//char *workdir_base;
//char *workdir_id;
char buf[512];
char *workdir_num;
char *executable_dir, *jobname,
*executable, *arguments;
// char *inputfiles[MAX_INFILES], *local_files[MAX_INFILES];
int n_of_infiles, wundx, i; //actual_infile = 0;
const char *input_files[MAX_INFILES], *local_files[MAX_INFILES];
if (dc_wu_isWUValid(wu) != DC_OK) return DC_ERROR;
dcwu = dc_wu_getWU(wu);
wundx = dc_wu_getWUIndex(wu) + 1;
// working_dir = strdup("%s/%d",dc_projectRootDir, wundx);
// snprintf(working_dir, 512, "%s/%d", dc_projectRootDir, wundx);
// working_dir = strdup(dc_projectRootDir);
snprintf(buf, 512, "%d", wundx);
workdir_num = strdup(buf);
executable_dir = strdup(dc_exec_dir);
n_of_infiles = dcwu->ninfiles;
jobname = strdup("Clgr-test");
executable = strdup(dcwu->clientname);
arguments = strdup(dcwu->arguments);
//type = strdup("batch");
snprintf(buf, 512, "%s/%s/%s", dc_projectRootDir, dc_workdir_id, workdir_num);
dcwu->workdir = strdup(buf);
DC_log(LOG_DEBUG,"Workunit_directory: '%s'", dcwu->workdir);
/*
if (dcwu->ninfiles != 0)
{
input_files = (char **) malloc (sizeof(dcwu->infiles[actual_infile]));
local_files = (char **) malloc (sizeof(dcwu->localfilenames[actual_infile]));
local_files[actual_infile] = strdup(dcwu->localfilenames[actual_infile]);
input_files[actual_infile] = strdup(dcwu->infiles[actual_infile]);
actual_infile++;
while (actual_infile < dcwu->ninfiles)
{
input_files = (char **) realloc (input_files, sizeof(input_files) + sizeof(dcwu->infiles[actual_infile]));
input_files[actual_infile] = strdup(dcwu->infiles[actual_infile]);
local_files = (char **) realloc (local_files, sizeof(local_files) + sizeof(dcwu->localfilenames[actual_infile]));
local_files[actual_infile] = strdup(dcwu->localfilenames[actual_infile]);
actual_infile++;
}
}
*/
for (i = 0; i < MAX_INFILES; i++)
{
if (dcwu->infiles[i] != NULL)
input_files[i] = strdup(dcwu->infiles[i]);
else input_files[i] = NULL;
if (dcwu->localfilenames[i] != NULL)
local_files[i] = strdup(dcwu->localfilenames[i]);
else local_files[i] = NULL;
}
/*
DC_log(LOG_DEBUG,"** Workdir_base : %s", dc_projectRootDir);
DC_log(LOG_DEBUG,"** Workdir_id : %s", dc_workdir_id);
DC_log(LOG_DEBUG,"** Workdir_num : %s", workdir_num);
DC_log(LOG_DEBUG,"** Executable Directory : %s",executable_dir);
DC_log(LOG_DEBUG,"** Executable : %s", executable);
DC_log(LOG_DEBUG,"** Jobname : %s",jobname);
DC_log(LOG_DEBUG,"** Arguments : %s", arguments);
DC_log(LOG_DEBUG,"** Type : %s", type);
DC_log(LOG_DEBUG,"** Number of input files : %d",n_of_infiles);
if (n_of_infiles != 0) DC_log(LOG_DEBUG,"** Input File(s):");
for (i = 0; i < n_of_infiles; i++)
DC_log(LOG_DEBUG," - %s", dcwu->infiles[i]);
if (n_of_infiles != 0) DC_log(LOG_DEBUG,"** Local Filename(s):");
for (i = 0; i < n_of_infiles; i++)
DC_log(LOG_DEBUG," - %s", dcwu->localfilenames[i]);
*/
dcwu->wuname = submit(dc_projectRootDir, dc_workdir_id, workdir_num,
executable_dir, executable, jobname, arguments,
n_of_infiles, input_files, local_files);
DC_log(LOG_DEBUG,"The '%s' workunit is submitted", dcwu->wuname);
dcwu->state = STATE_SUBMITTED;
return DC_OK;
}
int dc_wu_suspend(DC_Workunit wu)
{
dc_wu *dcwu;
if (dc_wu_isWUValid(wu) != DC_OK) return DC_ERROR;
dcwu = dc_wu_getWU(wu);
if ((dcwu->state == STATE_SUBMITTED) || (dcwu->state == STATE_RUNNING) || (dcwu->state == STATE_FINISHED))
{
dcwu->state = STATE_SUSPENDED;
return remove_wu(dcwu->wuname, dcwu->workdir);
}
DC_log(LOG_ERR,"%s work unit is not submitted, so can't be suspended!");
return DC_ERROR;
}
int dc_wu_resubmit(DC_Workunit wu)
{
dc_wu *dcwu;
char *id[1];
if (dc_wu_isWUValid(wu) != DC_OK) return DC_ERROR;
dcwu = dc_wu_getWU(wu);
if (dcwu->state != STATE_SUSPENDED)
{
DC_log(LOG_ERR,"%s work unit is not a suspended work unit, so it can't be resubmitted.", dcwu->wuname);
return DC_ERROR;
}
DC_log(LOG_DEBUG,"The '%s' workunit is under Re-submittion...", dcwu->wuname);
if (resubmit(dcwu->workdir, id) != DC_OK)
{
DC_log(LOG_ERR,"Cannot Re-submitting target workunit.");
return DC_ERROR;
}
dcwu->wuname = strdup(id[0]);
DC_log(LOG_DEBUG,"The Re-submittion is finished. The new name of the workunit is '%s'", dcwu->wuname);
dcwu->state = STATE_SUBMITTED;
return DC_OK;
}
/*
int dc_wu_createBoincWU(int wundx, char *uploadkeyfile, char *boincRootDir)
{
// char infile_dir[256], download_url[256], upload_url[256];/
char wutempl[4096];
char resulttempl_relativepath[256];
char buf[512];
//R_RSA_PRIVATE_KEY key;
//DB_WORKUNIT bwu;
//DB_APP app;
char db_name[256], db_passwd[256],db_user[256],db_host[256];
//SCHED_CONFIG config;
int retval;
if ((wundx >= MAX_N_WU) || (wundx < 0)) return DC_ERROR;
DC_log(LOG_DEBUG, "Create a wu for boinc based on info:\n"
"wuname = '%s', clientname = '%s', arguments = '%s', workdir = '%s' "
"wutemplate = '%s', resulttemplate = '%s'",
wutable[wundx].wuname, wutable[wundx].clientname, wutable[wundx].arguments, wutable[wundx].workdir,
wutable[wundx].wutemplate, wutable[wundx].resulttemplate);
DC_log(LOG_DEBUG, "\t1. Read upload private key from: %s", uploadkeyfile);
// retval = read_key_file(uploadkeyfile, key);
// if (retval) {
// DC_log(LOG_ERR, "wu.c: can't read key: %d", retval);
// return DC_ERROR;
// }
DC_log(LOG_DEBUG, "\t2. Read wu template file: %s", wutable[wundx].wutemplate);
retval = read_filename(wutable[wundx].wutemplate, wutempl, 4096);
if (retval) {
DC_log(LOG_ERR, "wu.c: can't read template file: %d", retval);
return DC_ERROR;
} // else {
//DC_log(LOG_DEBUG, "WU Templ='%s'", wutempl);
//}
DC_log(LOG_DEBUG, "\t3. Determine result template relative path");
dc_wu_getFileName_fromPath(wutable[wundx].resulttemplate, buf);
snprintf(resulttempl_relativepath, 256, "templates/%s", buf);
DC_log(LOG_DEBUG, "\t %s -> %s", wutable[wundx].resulttemplate, resulttempl_relativepath);
DC_log(LOG_DEBUG, "\t3. Parse boinc project config file");
retval = config.parse_file(boincRootDir);
if (retval) {
DC_log(LOG_ERR, "Can't parse config file: %d", retval);
return DC_ERROR;
} else {
strcpy(db_name, config.db_name);
strcpy(db_passwd, config.db_passwd);
strcpy(db_user, config.db_user);
strcpy(db_host, config.db_host);
}
strcpy(app.name, wutable[wundx].clientname);
DC_log(LOG_DEBUG,
"\t4. Init MySQL connection and get appid for client %s\n"
"\t DB name=%s, host=%s, user=%s",
app.name, db_name, db_host, db_user);
retval = boinc_db.open(db_name, db_host, db_user, db_passwd);
if (retval) {
DC_log(LOG_ERR, "wu.c: error opening database: %d", retval );
return DC_ERROR;
}
sprintf(buf, "where name='%s'", app.name);
retval = app.lookup(buf);
if (retval) {
DC_log(LOG_ERR, "wu.c: app not found in database");
return DC_ERROR;
} else {
DC_log(LOG_DEBUG, "\t Application id in database = %d", app.id);
}
DC_log(LOG_DEBUG, "\t5. Create Boinc workunit");
//
//snprintf(infile_dir, 256, "%s/download", dc_projectRootDir);
//snprintf(download_url, 256, "http://n24.hpcc.sztaki.hu/proba1/download");
//snprintf(upload_url, 256, "http://n24.hpcc.sztaki.hu/proba1/upload");
//
strncpy(bwu.name, wutable[wundx].wuname, 256);
bwu.appid = app.id;
bwu.batch = 1;
bwu.rsc_fpops_est = 200000 ;
bwu.rsc_fpops_bound = 2000000000 ;
bwu.rsc_memory_bound = 2000000;
bwu.rsc_disk_bound = 2000000;
bwu.delay_bound = 720000;
DC_log(LOG_DEBUG,
"\t Call Boinc create_work with parameters:\n"
"\t appid = %d, wutempl = '%50.50s...'\n"
"\t resulttemplate = '%s' and '%s'\n"
"\t # of inputfiles = %d, key.mod = '%40.40s'...",
bwu.appid, wutempl, resulttempl_relativepath, wutable[wundx].resulttemplate,
wutable[wundx].ninfiles, key.modulus);
DC_log(LOG_DEBUG, "\t *** This is the 1. checkpoint !!! ***");
retval = create_work(
bwu,
wutempl, // WU template, contents, not path
resulttempl_relativepath, // Result template filename, relative to project root
wutable[wundx].resulttemplate, // Result template, absolute path,
const_cast<const char **>(wutable[wundx].infiles), // array of input file names
wutable[wundx].ninfiles,
key, // upload authentication key
config // data from config.xml
);
DC_log(LOG_DEBUG, "\t *** This is the 2. checkpoint !!! ***");
if (retval) {
DC_log(LOG_ERR, "wu.c, cannot create workunit: %d", retval);
return DC_ERROR;
}
boinc_db.close();
return DC_OK;
}
*/
int dc_wu_reCreate(DC_Workunit wu)
{
dc_wu *dcwu;
char *id[1];
if (dc_wu_isWUValid(wu) != DC_OK) return DC_ERROR;
dcwu = dc_wu_getWU(wu);
DC_log(LOG_DEBUG,"Re-creating %s workunit.", dcwu->wuname);
DC_log(LOG_DEBUG,"Deleting it from the info system...");
if (remove_wu(dcwu->wuname, dcwu->workdir) != DC_OK)
{
DC_log(LOG_ERR,"Cannot delete the workunit under Re-creation, from the info system.");
}
DC_log(LOG_DEBUG,"Re-submitting target workunit...");
if (resubmit(dcwu->workdir, id) != DC_OK)
{
DC_log(LOG_ERR,"Cannot Re-submit workunit.");
return DC_ERROR;
}
dcwu->wuname = strdup(id[0]);
DC_log(LOG_DEBUG,"Receation was successful. The new name of the workunit is '%s'", dcwu->wuname);
dcwu->state = STATE_SUBMITTED;
return DC_OK;
}
int dc_wu_checkForResult(void (*cb_assimilate_result)(DC_Result result))
{
static int actual_wu = MAX_N_WU - 1;
int last_unchecked_wu = 0;
static int counter1 = 0;
//int wustatus;
//char *result_name = NULL;
//char *result_std_out = NULL;
//char *result_std_err = NULL;
//char *result_std_log = NULL;
//int n_of_outfiles;
char *outputdir[1];
char *result_name[1];
char *std_out[1];
char *std_err[1];
char *sys_log[1];
int exitcode = 0;
//int i, n_of_outfiles;
//char *outfiles[MAX_OUTFILES];
//static bool did_something;
time_t dummy1, dummy2;
last_unchecked_wu = actual_wu;
actual_wu++;
if (actual_wu >= MAX_N_WU) actual_wu = 0;
counter1++;
DC_log(LOG_DEBUG,"Check for result %d. running.", counter1);
//DC_log(LOG_DEBUG," ");
/*
if (counter1 == 3)
{
wutable[0].wuname = strdup("https://testentry.niif.grid/clgr-broker/jobs/2206820f-b038-41ac-9e28-88dceae5b16a");
wutable[0].workdir = strdup("./workdir/clusterGrid_ClgrProba1_1110378262/1");
wutable[1].wuname = strdup("https://testentry.niif.grid/clgr-broker/jobs/e2fee82b-e7a9-439f-a072-fb0262d7b071");
wutable[1].workdir = strdup("./workdir/clusterGrid_ClgrProba1_1110378262/2");
}
*/
//did_something = false;
//for (actual_wu = 0; actual_wu < MAX_N_WU; actual_wu++)
while (actual_wu != last_unchecked_wu)
{
if ((wutable[actual_wu].state == STATE_SUBMITTED) || (wutable[actual_wu].state == STATE_RUNNING))
{
//wutable[actual_wu].state = ask_status(wutable[actual_wu].wuname);
wutable[actual_wu].state = STATE_UNKNOWN;
if (ask_status(wutable[actual_wu].wuname, &(wutable[actual_wu].state), &(dummy1), &(dummy2)) != DC_OK)
{
//DC_log(LOG_ERR,"Error in %s work unit's status.",wutable[actual_wu].wuname);
//return -1;
DC_log(LOG_ERR,"Error, while asking status of %s workunit.",wutable[actual_wu].wuname);
if (dc_wu_reCreate(actual_wu) != DC_OK)
{
return -1;
}
}
if (wutable[actual_wu].state == STATE_UNDEFINED)
{
DC_log(LOG_ERR,"%s work unit's state is undefined.", wutable[actual_wu].wuname);
/*
DC_Result dcresult = dc_result_create("error_result", wutable[actual_wu].wuname,
"./", NULL, NULL, NULL, exitcode);
dcresult->status = result_failed;
cb_assimilate_result(dcresult);
return 1;
*/
if (dc_wu_reCreate(actual_wu) != DC_OK)
{
return -1;
}
}
if (wutable[actual_wu].state == STATE_UNKNOWN)
{
DC_log(LOG_ERR,"%s work unit's state is unknown.", wutable[actual_wu].wuname);
/*
DC_Result dcresult = dc_result_create("error_result", wutable[actual_wu].wuname,
"./", NULL, NULL, NULL, exitcode);
dcresult->status = result_failed;
cb_assimilate_result(dcresult);
return 1;
*/
if (dc_wu_reCreate(actual_wu) != DC_OK)
{
return -1;
}
}
//DC_log(LOG_DEBUG,"*%s,\n %s,\n %i", wutable[actual_wu].wuname, wutable[actual_wu].workdir, wutable[actual_wu].state);
if (wutable[actual_wu].state == STATE_FINISHED)
{
//did_something = true;
/*DC_log(LOG_DEBUG,"*** workdir: %s, outputdir: %s, result_name: %s, n_of_files: %d",
wutable[actual_wu].workdir, outputdir[0], result_name[0], n_of_outfiles);*/
if (get_out(wutable[actual_wu].wuname, wutable[actual_wu].workdir, outputdir,
result_name, std_out, std_err, sys_log, &exitcode, GETOUT_FINISHED) != DC_OK)
{
DC_log(LOG_ERR,"Cannot get output of %s work unit.",wutable[actual_wu].wuname);
/*
DC_Result dcresult = dc_result_create("error_result", wutable[actual_wu].wuname,
"./", NULL, NULL, NULL, exitcode);
dcresult->status = result_failed;
cb_assimilate_result(dcresult);
return 1;
*/
if (dc_wu_reCreate(actual_wu) != DC_OK)
{
return -1;
}
}
DC_log(LOG_DEBUG,"The exitcode was: %d", exitcode);
//I think this is the point where the exitcode should be mined from the log fil
//but correct me if I am wrong
//DC_log(LOG_DEBUG,"***%s, %i", wutable[actual_wu].wuname, wutable[actual_wu].state);
/*DC_log(LOG_DEBUG,"*** workdir: %s, outputdir: %s, result_name: %s, n_of_files: %d",
wutable[actual_wu].workdir, outputdir[0], result_name[0]);
*/
DC_log(LOG_DEBUG,"Result creating: %s result, to %s wu, with %s output_dir, FILES: %s, %s, %s;",
result_name[0], wutable[actual_wu].wuname, outputdir[0], std_out[0], std_err[0], sys_log[0]);
DC_Result dcresult = dc_result_create(result_name[0], wutable[actual_wu].wuname, outputdir[0],
std_out[0], std_err[0], sys_log[0], exitcode);
/*for (i = 0; i < n_of_outfiles; i++)
{
//DC_log(LOG_DEBUG,"%s/%s added to %s result.", outputdir[0], outfiles[i], result_name[0]);
dc_result_addOutputFile(dcresult, outfiles[i]);
}*/
dcresult->status = DC_RESULT_FINAL;
cb_assimilate_result(dcresult);
return 1;
}else
/**********************/
/* GET_OUT_SUBRESULT */
/********************/
if (((dummy1 - wutable[actual_wu].subresult_time) != 0) && ((dummy2 - wutable[actual_wu].ckpt_time) != 0))
{
wutable[actual_wu].subresult_time = dummy1;
wutable[actual_wu].ckpt_time = dummy2;
DC_log(LOG_DEBUG,"WU: subresult detected.");
/*DC_log(LOG_DEBUG,"*** work unit state: %s, subresult_time: %d, ckpt_time: %d",
wutable[actual_wu].state, dummy1, dummy2);*/
if (get_out(wutable[actual_wu].wuname, wutable[actual_wu].workdir, outputdir,
result_name, std_out, std_err, sys_log, &exitcode, GETOUT_NOTFINISHED) != DC_OK)
{
DC_log(LOG_ERR,"Cannot get output of %s work unit.",wutable[actual_wu].wuname);
/*
DC_Result dcresult = dc_result_create("error_result", wutable[actual_wu].wuname,
"./", NULL, NULL, NULL, exitcode);
dcresult->status = result_failed;
cb_assimilate_result(dcresult);
return 1;
*/
if (dc_wu_reCreate(actual_wu) != DC_OK)
{
return -1;
}
}
//DC_log(LOG_DEBUG,"***%s, %i", wutable[actual_wu].wuname, wutable[actual_wu].state);
/*DC_log(LOG_DEBUG,"*** workdir: %s, outputdir: %s, result_name: %s FILES: %s,%s,%s;",
wutable[actual_wu].workdir, outputdir[0], result_name[0], std_out[0], std_err[0], sys_log[0]);
*/
DC_log(LOG_DEBUG,"Result creating: %s result, to %s wu, with %s output_dir, FILES: %s,%s,%s;",
result_name[0], wutable[actual_wu].wuname, outputdir[0], std_out[0], std_err[0], sys_log[0]);
DC_Result dcresult = dc_result_create(result_name[0], wutable[actual_wu].wuname, outputdir[0],
std_out[0], std_err[0], sys_log[0], exitcode);
/*for (i = 0; i < n_of_outfiles; i++)
{
DC_log(LOG_DEBUG,"%s/%s added to %s result.", outputdir[0], outfiles[i], result_name[0]);
dc_result_addOutputFile(dcresult, outfiles[i]);
}*/
dcresult->status = DC_RESULT_SUB;
cb_assimilate_result(dcresult);
return 1;
}
//else DC_log(LOG_DEBUG,"%d; %d ", dummy1, dummy2);
}
actual_wu++;
if (actual_wu >= MAX_N_WU) actual_wu = 0;
}
//result = create_result();
//if (did_something) return 1;
return 0;
}
#ifdef NEVERDEFINED
int dc_wu_wuCreated(char *wuname, char *workdir)
{
int ndx = 0;
int ex = 0;
while (ndx < MAX_N_WU && !ex) {
if (wutable[ndx].state == STATE_INVALID) ex = 1;
else ndx++;
}
if (ndx == MAX_N_WU) {
DC_log(LOG_ERR, "Too many work units (%d) at once",
MAX_N_WU);
return DC_ERROR;
}
if (ndx > wu_max) wu_max = ndx;
wu_sum++;
wutable[ndx].name = strdup(wuname);
wutable[ndx].workdir = strdup(workdir);
wutable[ndx].state = STATE_CREATED;
return DC_OK;
}
int dc_wu_wuSubmitted(char *wuname)
{
int ndx = dc_rm_findByName(wuname);
if ((ndx >= MAX_N_WU) || (wundx < 0)){
DC_log(LOG_ERR,
"Work unit (%s) not created before submission",
wuname);
return DC_ERROR;
}
wutable[ndx].state = STATE_SUBMITTED;
return DC_OK;
}
int dc_wu_checkForResult(DC_Result **result)
{
char cmd[256];
char name[256];
int retval;
int ndx = 0;
int ex = 0;
ndx = dc_wu_findByState(STATE_SUBMITTED, ndx);
while ((ndx < MAX_N_WU) && (ndx >= 0)) {
/* check here somehow if a WU is completed */
if (retval == 1) { /* process finished (not exists) */
wutable[ndx].state = STATE_FINISHED;
return DC_RM_RESULTEXISTS;
}
else if (retval < 0) { /* error */
return DC_ERROR;
}
ndx = dc_wu_findByState(STATE_SUBMITTED, ndx+1);
}
return DC_RM_NORESULT;
}
#endif
void dc_wu_log(void)
{
int i;
DC_log(LOG_DEBUG, "----------------------------------------");
DC_log(LOG_DEBUG, "DC has the following info about WUs");
DC_log(LOG_DEBUG, "Number of workunits = %d",
wu_sum);
DC_log(LOG_DEBUG, "Table info: wu_max = %d, wu_sum = %d",
wu_max, wu_sum);
DC_log(LOG_DEBUG, "Table list:");
DC_log(LOG_DEBUG, "Name State WorkDir");
for (i = 0; i <= wu_max; i++) {
DC_log(LOG_DEBUG, "%s\t%s\t%s",
wutable[i].wuname,
state_strings[wutable[i].state],
wutable[i].workdir);
}
DC_log(LOG_DEBUG, "-----------------------------------------");
}
static int dc_wu_findByState(int state, int from)
{
int ndx = from;
int ex = 0;
while (ndx < MAX_N_WU && !ex) {
if (wutable[ndx].state == state) ex = 1;
else ndx++;
}
return ndx;
}
/*
int dc_wu_getWUbyName(const char *wuname)
{
char *wu_name;
strcpy (wu_name, wuname);
static int ndx = dc_wu_findByName(wu_name);
if (ndx == MAX_N_WU)
{
return NULL;
}else{
return (ndx);
}
}
*/

55
dcapi/clgr/wu.h Normal file
View File

@ -0,0 +1,55 @@
#ifndef __WU_H_
#define __WU_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "dc.h"
/** init workunit module.
* 'projectroot' is the path to the directory where the
* actual project is installed e.g. /usr/boinc/projects/proba.
* 'uploadkeyfile' is the upload private key file for the
* installed boinc project, e.g. /usr/boinc/keys/upload_private
*/
//void dc_wu_init(const char *projectroot, const char *uploadkeyfile);
void dc_wu_init(const char *projectroot,const char *workdir_id, const char* exec_dir);
/** add new wu which has been created */
int dc_wu_create(const char *clientname,
const char *arguments);
/** add input files to a wu */
int dc_wu_setInput(DC_Workunit wu, const char *url, const char* localfilename);
/** set priority of a wu */
int dc_wu_setPriority(DC_Workunit wu, int priority);
/** Create the wu within Boinc (i.e. submit from application into Boinc)
* uploadkeyfile: upload_private key
* boincRootDir: the actual projects dir. config.xml should be there
*/
//int dc_wu_createBoincWU(DC_Workunit wu, char *uploadkeyfile, char *boincRootDir);
int dc_wu_submitWU(DC_Workunit wu);
int dc_wu_destroy(DC_Workunit wu);
int dc_wu_suspend(DC_Workunit wu);
int dc_wu_resubmit(DC_Workunit wu);
/* Return the workunit index from the WUtable that matches the name */
int dc_wu_findByName(char *wuname);
void dc_wu_log(void); /* print internal data */
/* */
int dc_wu_checkForResult(void (*cb_assimilate_result)(DC_Result result));
#ifdef __cplusplus
}
#endif
#endif /* __WU_H_ */

6
dcapi/common/Makefile.am Normal file
View File

@ -0,0 +1,6 @@
noinst_LTLIBRARIES = libdc-common.la
libdc_common_la_SOURCES = \
cfg.c \
cfg.h \
logger.c

View File

@ -1,6 +1,6 @@
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include "cfg.h"
@ -9,7 +9,7 @@
/* Private functions */
static char * dc_cfg_cutLeadingWhiteSpace(char *line);
void dc_cfg_cutTrailingWhiteSpace(char *line);
static int tokenise( char *line, char *separators, char *tokens[], int max );
static int tokenise(char *line, char *separators, char *tokens[], int max);
typedef struct {
char *name;
@ -29,10 +29,8 @@ int dc_cfg_parse(const char *cfgfile)
char *tokens[MAX_TOKEN];
int tokenCount;
DC_log(LOG_DEBUG, "cfg: parse config file %s\n", cfgfile);
if ((cfg = fopen(cfgfile, "r")) == NULL) {
DC_log(LOG_ERR, "Config file %s cannot be opened\nErrno=%d %s",
fprintf(stderr, "Config file %s cannot be opened\nErrno=%d %s",
cfgfile, errno, strerror(errno));
return DC_CFG_FILENOTEXISTS;
}
@ -43,13 +41,13 @@ int dc_cfg_parse(const char *cfgfile)
if (str[0] == '\0' || str[0] == '\n') continue; /* empty */
if (str[0] == '#') continue; /* comment */
tokenCount = tokenise( line, "=\n", tokens, MAX_TOKEN );
tokenCount = tokenise(line, "=\n", tokens, MAX_TOKEN);
if (tokenCount == 2) {
NameValuePair *tmp;
tmp = realloc(pairs, (n_pairs + 1) * sizeof(*tmp));
if (!tmp) {
DC_log(LOG_ERR, "Out of memory while parsing config file\n");
fprintf(stderr, "Out of memory while parsing config file\n");
return DC_CFG_OUTOFMEM;
}
pairs = tmp;
@ -58,7 +56,7 @@ int dc_cfg_parse(const char *cfgfile)
n_pairs++;
}
else {
DC_log(LOG_WARNING,
fprintf(stderr,
"Cannot understand in config file %s the line %s",
cfgfile, line);
}
@ -66,7 +64,7 @@ int dc_cfg_parse(const char *cfgfile)
return DC_CFG_OK;
}
char * dc_cfg_get(char *name)
char *dc_cfg_get(char *name)
{
/* Return the value of the name=value pair */
int i = 0;
@ -83,7 +81,7 @@ char * dc_cfg_get(char *name)
return NULL;
}
static char * dc_cfg_cutLeadingWhiteSpace(char *line)
static char *dc_cfg_cutLeadingWhiteSpace(char *line)
{
/* go until white spaces are in front of the string */
char *str = line;
@ -107,7 +105,7 @@ void dc_cfg_cutTrailingWhiteSpace(char *line)
}
/* break string into tokens and save tokens in arrays */
static int tokenise( char *line, char *separators, char *tokens[], int max )
static int tokenise(char *line, char *separators, char *tokens[], int max)
{
int i = 0;
char *pch;

View File

@ -1,6 +1,7 @@
#define DC_CFG_OK 0
#define DC_CFG_FILENOTEXISTS 1
#define DC_CFG_OUTOFMEM 2
/** Parse the config file and store name=value pairs in memory
*

View File

@ -1,10 +1,10 @@
#include <stdio.h>
#include <stdarg.h>
#include <sys/syslog.h>
#include <errno.h>
#include <strings.h>
#include <stdlib.h>
#include <stdarg.h>
#include <stdio.h>
#include <errno.h>
#include <time.h>
#include <string.h>
#include "cfg.h"
#include "dc.h"

92
dcapi/configure.ac Normal file
View File

@ -0,0 +1,92 @@
AC_INIT([Distributed Computing API], [0.1], [podhorszki@sztaki.hu], [dcapi])
AC_CONFIG_HEADERS([include/config.h])
AC_CONFIG_AUX_DIR([scripts])
AC_CONFIG_SRCDIR([include/dc.h])
AM_INIT_AUTOMAKE([foreign])
AC_PROG_CC
AC_DISABLE_SHARED
AC_PROG_LIBTOOL
LIBTOOL="$LIBTOOL --silent"
AC_ARG_WITH([boinc], AS_HELP_STRING([--with-boinc@<:@=DIR@:>@],
[Use BOINC (installed in DIR)]),, [with_boinc=auto])
no_boinc=
if test "$with_boinc" = no; then
no_boinc=yes
fi
if test "$no_boinc" != yes; then
case "$with_boinc" in
yes|auto)
BOINC_INCLUDES="/usr/include/BOINC"
;;
*)
BOINC_INCLUDES="$with_boinc"
;;
esac
AC_PATH_PROG([MYSQL_CONFIG], [mysql_config])
if test "$MYSQL_CONFIG" = ""; then
no_boinc=yes
fi
fi
if test "$no_boinc" != yes; then
BOINC_CPPFLAGS="-I$BOINC_INCLUDES -I$BOINC_INCLUDES/db -I$BOINC_INCLUDES/lib -I$BOINC_INCLUDES/sched -I$BOINC_INCLUDES/tools"
if test -d "$BOINC_INCLUDES/RSAEuro/source"; then
BOINC_CPPFLAGS="$BOINC_CPPFLAGS -I$BOINC_INCLUDES/RSAEuro/source"
else
if test -d "$BOINC_INCLUDES/RSAEuro"; then
BOINC_CPPFLAGS="$BOINC_CPPFLAGS -I$BOINC_INCLUDES/RSAEuro"
fi
fi
BOINC_LIBS="-lboinc -lboinc_api -lsched -lboinc_zip"
AC_CHECK_LIB([rsaeuro], [RSAPrivateEncrypt],
[BOINC_LIBS="$BOINC_LIBS -lrsaeuro"],
[BOINC_LIBS="$BOINC_LIBS -lcrypto"])
MYSQL_CPPFLAGS=`$MYSQL_CONFIG --cflags`
MYSQL_LIBS=`$MYSQL_CONFIG --libs | sed -e 's,-L/usr/lib , ,'`
AC_SUBST([BOINC_CPPFLAGS])
AC_SUBST([BOINC_LIBS])
AC_SUBST([MYSQL_CPPFLAGS])
AC_SUBST([MYSQL_LIBS])
fi
AC_LANG([C++])
AH_TEMPLATE([BOINC_VERSION], [BOINC major version])
if test "$no_boinc" != yes; then
save_CPPFLAGS="$CPPFLAGS"
CPPFLAGS="$CPPFLAGS $BOINC_CPPFLAGS"
AC_CACHE_CHECK([for BOINC version], [dc_cv_boinc_version], [
AC_COMPILE_IFELSE([AC_LANG_PROGRAM([#include <sched_msgs.h>],
[SCHED_MSG_LOG::Kind level = SCHED_MSG_LOG::CRITICAL;])],
[dc_cv_boinc_version=4],
[dc_cv_boinc_version=5])
])
CPPFLAGS="$save_CPPFLAGS"
AC_DEFINE([BOINC_VERSION], [$dc_cv_boinc_version])
fi
AC_ARG_WITH([clgr], AS_HELP_STRING([--with-clgr@<:@=DIR@:>@],
[Use ClusterGrid (installed in DIR)]),, [with_clgr=yes])
AC_ARG_ENABLE([local], AS_HELP_STRING([--enable-local],
[Build local DC-API]),, [enable_local=yes])
AM_CONDITIONAL([WITH_BOINC], [test "$with_boinc" != no])
AM_CONDITIONAL([WITH_CLGR], [test "$with_clgr" != no])
AM_CONDITIONAL([WITH_LOCAL], [test "$enable_local" == yes])
AC_CONFIG_FILES([Makefile \
include/Makefile \
common/Makefile \
boinc/Makefile \
local/Makefile \
clgr/Makefile])
AC_OUTPUT

View File

@ -0,0 +1 @@
include_HEADERS = dc.h dc_client.h

View File

@ -14,9 +14,17 @@ extern "C" {
#include <sys/syslog.h>
#include <stdarg.h>
#define DC_OK 0
#define DC_ERROR 1
enum {
DC_OK,
DC_ERROR
};
typedef enum {
DC_RESULT_ACCEPT,
DC_RESULT_INVALID,
DC_RESULT_FINAL,
DC_RESULT_SUB
} DC_ResultStatus;
/** The first function to be invoked by the main application before using DC
*
@ -51,6 +59,13 @@ typedef struct {
char *outfiles_dir; // directory of output files
char *outfiles[MAX_OUTFILES]; // output files
int noutfiles; // number of output files
/* clgr-specific fields */
char *std_out;
char *std_err;
char *sys_log;
int exitcode;
DC_ResultStatus status;
} dc_result;
/* Result.
@ -139,9 +154,6 @@ int DC_checkForResult(int timeout,
);
#define DC_RESULT_ACCEPT 0
#define DC_RESULT_INVALID 1
/* Callback functions
These functions should be provided by the application.

View File

@ -6,8 +6,9 @@
#ifndef __DCCLIENT_H_
#define __DCCLIENT_H_
#define DC_OK 0
#define DC_ERROR 1
#ifdef __cplusplus
extern "C" {
#endif
/** Send a (partial) result back to the master.
* Parameters:
@ -38,4 +39,8 @@ int DC_ContinueWork(void);
*/
void DC_Finish(void);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -1,17 +0,0 @@
#CC=cc -n32 -g -Xcpluscomm
CC=gcc -g
AR=ar
INC=-I../include
LIBDC=libdc-local
all:
${CC} -c dc.c cfg.c logger.c rm.c ${INC}
${AR} rc ${LIBDC}.a dc.o cfg.o logger.o rm.o
clean:
rm -rf core *~ *.o ${LIBDC}.a

8
dcapi/local/Makefile.am Normal file
View File

@ -0,0 +1,8 @@
lib_LTLIBRARIES = libdc-local.la
AM_CPPFLAGS = -I$(top_srcdir)/common
libdc_local_la_SOURCES = \
dc.c \
rm.c
libdc_local_la_LIBADD = ../common/libdc-common.la

View File

@ -1,135 +0,0 @@
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include "dc.h"
#include "cfg.h"
/* Private functions */
static char * dc_cfg_cutLeadingWhiteSpace(char *line);
void dc_cfg_cutTrailingWhiteSpace(char *line);
static int tokenise( char *line, char *separatrors, char *tokens[], int max );
typedef struct {
char *name;
char *value;
} NameValuePair;
#define MAX_N_PAIRS 100
static NameValuePair pairs[MAX_N_PAIRS];
static int n_pairs;
/* Public functions */
int dc_cfg_parse(const char *cfgfile)
{
FILE *cfg;
char line[1024];
char *str;
# define MAX_TOKEN 3
char *tokens[MAX_TOKEN];
int tokenCount;
/* This function cannot use logging functions because the logger
gets values from the config file */
n_pairs = 0;
/*DC_log(LOG_INFO,"cfg: parse config file %s\n", cfgfile);*/
if ((cfg = fopen(cfgfile, "r")) == NULL) {
fprintf(stderr,"Config file %s cannot be opened\nErrno=%d %s",
cfgfile, errno, strerror(errno));
return DC_CFG_FILENOTEXISTS;
}
while (fgets(line, 1024, cfg) != NULL) {
str = dc_cfg_cutLeadingWhiteSpace(line);
if (str[0] == '\0' || str[0] == '\n') continue; /* empty */
if (str[0] == '#') continue; /* comment */
tokenCount = tokenise( line, "=\n", tokens, MAX_TOKEN );
if (tokenCount == 2) {
if (n_pairs < MAX_N_PAIRS) {
pairs[n_pairs].name = strdup(tokens[0]);
pairs[n_pairs].value = strdup(tokens[1]);
/*DC_log(LOG_DEBUG,"cfg: name=value pair: [%s]=[%s]\n",
pairs[n_pairs].name, pairs[n_pairs].value);*/
n_pairs++;
}
else {
fprintf(stderr, "Too many name=value pairs in config file\n");
}
}
else {
fprintf(stderr,
"Cannot understand in config file %s the line%n%s",
cfgfile, line);
}
}
return DC_CFG_OK;
}
char * dc_cfg_get(char *name)
{
/* Return the value of the name=value pair */
int i = 0;
if (name == NULL) return NULL;
while (i < n_pairs) {
if (pairs[i].name != NULL &&
!strcmp(pairs[i].name, name))
return pairs[i].value;
i++;
}
return NULL;
}
static char * dc_cfg_cutLeadingWhiteSpace(char *line)
{
/* go until white spaces are in front of the string */
char *str = line;
while (str[0] == ' ' || str[0] == '\t') str++;
return str;
}
void dc_cfg_cutTrailingWhiteSpace(char *line)
{
/* go until white spaces are at the end of the string */
char *str = line;
int len = strlen(line);
if (len > 0) {
str = line+len-1;
while (len > 0 &&
(str[0] == ' ' || str[0] == '\t')) {
str[0]='\0';
str--;
}
}
}
/* break string into tokens and save tokens in arrays */
static int tokenise( char *line, char *separators, char *tokens[], int max )
{
int i = 0;
char *pch;
pch = strtok( line, separators ); /* get 1st token addr */
while( pch != NULL ) /* repeat for each token */
{
if ( i < max )
{
/* strip leading and trailing spaces and
save token in array
*/
tokens[i] = dc_cfg_cutLeadingWhiteSpace(pch);
dc_cfg_cutTrailingWhiteSpace(tokens[i]);
}
pch = strtok( NULL, separators ); /* next token address */
i++;
}
return( i );
}