boinc/client/app_start.C

548 lines
16 KiB
C

// Revision identification string for this file.
// "$Id$" looks like a version-control keyword placeholder
// (presumably expanded on checkout -- TODO confirm).
static volatile const char *BOINCrcsid="$Id$";
// The contents of this file are subject to the BOINC Public License
// Version 1.0 (the "License"); you may not use this file except in
// compliance with the License. You may obtain a copy of the License at
// http://boinc.berkeley.edu/license_1.0.txt
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
// License for the specific language governing rights and limitations
// under the License.
//
// The Original Code is the Berkeley Open Infrastructure for Network Computing.
//
// The Initial Developer of the Original Code is the SETI@home project.
// Portions created by the SETI@home project are Copyright (C) 2002
// University of California at Berkeley. All Rights Reserved.
//
// Contributor(s):
//
// initialization and starting of applications
#include "cpp.h"
#ifdef _WIN32
#include "boinc_win.h"
#else
#if HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#if HAVE_SYS_RESOURCE_H
#include <sys/resource.h>
#endif
#if HAVE_SYS_IPC_H
#include <sys/ipc.h>
#endif
#if HAVE_SYS_WAIT_H
#include <sys/wait.h>
#endif
#include <unistd.h>
#include <cerrno>
#endif
using std::vector;
#include "filesys.h"
#include "error_numbers.h"
#include "util.h"
#include "shmem.h"
#include "client_msgs.h"
#include "client_state.h"
#include "file_names.h"
#include "app.h"
// Priority value handed to setpriority(2) for a freshly forked application
// process (see ACTIVE_TASK::start()).
// NOTE(review): 19 is the lowest scheduling priority on typical Unix
// systems -- confirm for all target platforms.
static const int PROCESS_IDLE_PRIORITY = 19;
// Write every entry of a null-terminated argument vector to the
// DEBUG_TASK log, one "argv[i]: value" line per entry.
// Always returns 0.
//
static int debug_print_argv(char** argv) {
    log_messages.printf(CLIENT_MSG_LOG::DEBUG_TASK, "Arguments:");

    // the ++/-- pair brackets the per-argument lines
    // (presumably adjusting the log's nesting level)
    //
    ++log_messages;
    int index = 0;
    while (argv[index]) {
        log_messages.printf(
            CLIENT_MSG_LOG::DEBUG_TASK,
            "argv[%d]: %s\n", index, argv[index]
        );
        index++;
    }
    --log_messages;

    return 0;
}
int ACTIVE_TASK::link_user_files() {
PROJECT* project = wup->project;
unsigned int i;
FILE_REF fref;
FILE_INFO* fip;
char link_path[256], buf[256], file_path[256];
int retval;
for (i=0; i<project->user_files.size(); i++) {
fref = project->user_files[i];
fip = fref.file_info;
if (fip->status != FILE_PRESENT) continue;
get_pathname(fip, file_path);
sprintf(link_path, "%s%s%s", slot_dir, PATH_SEPARATOR, strlen(fref.open_name)?fref.open_name:fip->name);
sprintf(buf, "..%s..%s%s", PATH_SEPARATOR, PATH_SEPARATOR, file_path);
retval = boinc_link(buf, link_path);
if (retval) return retval;
}
return 0;
}
// make a unique key for core/app shared memory segment
//
int ACTIVE_TASK::get_shmem_seg_name() {
#ifdef _WIN32
int i = 0;
char szSharedMemoryName[256];
HANDLE hSharedMemoryHandle = 0;
for (i=0; i<1024; i++) {
sprintf(szSharedMemoryName, "%sboinc_%d", SHM_PREFIX, i);
hSharedMemoryHandle = create_shmem(szSharedMemoryName, 1024, NULL, true);
if (hSharedMemoryHandle) break;
}
if (!hSharedMemoryHandle) {
return ERR_SHMGET;
}
detach_shmem(hSharedMemoryHandle, NULL);
sprintf(szSharedMemoryName, "boinc_%d", i);
strcpy(shmem_seg_name, szSharedMemoryName);
#else
char init_data_path[256];
sprintf(init_data_path, "%s%s%s", slot_dir, PATH_SEPARATOR, INIT_DATA_FILE);
shmem_seg_name = ftok(init_data_path, slot);
#endif
return 0;
}
// Write the app init file (INIT_DATA_FILE in the slot directory).
// This is done before starting the app,
// and when project prefs have changed during app execution.
//
// Fills an APP_INIT_DATA record from the workunit, its project,
// the client state (gstate) and this task, then serializes it with
// write_init_data_file().
//
// Returns ERR_FOPEN if the file can't be opened for writing,
// otherwise the return value of write_init_data_file().
//
int ACTIVE_TASK::write_app_init_file() {
    APP_INIT_DATA aid;
    FILE *f;
    char init_data_path[256], project_dir[256], project_path[256];
    int retval;

    memset(&aid, 0, sizeof(aid));

    aid.core_version = gstate.version();
    safe_strcpy(aid.app_name, wup->app->name);
    safe_strcpy(aid.user_name, wup->project->user_name);
    safe_strcpy(aid.team_name, wup->project->team_name);

    // project_preferences points into the project's string rather
    // than owning a copy; it stays null if there are no prefs
    //
    if (wup->project->project_specific_prefs.length()) {
        aid.project_preferences = (char*)wup->project->project_specific_prefs.c_str();
    }

    // All fixed-size string fields are filled with safe_strcpy(),
    // like app_name/user_name/team_name above, so that an over-long
    // source cannot run past the end of the destination field.
    //
    get_project_dir(wup->project, project_dir);
    relative_to_absolute(project_dir, project_path);
    safe_strcpy(aid.project_dir, project_path);
    relative_to_absolute("", aid.boinc_dir);
    safe_strcpy(aid.authenticator, wup->project->authenticator);
    aid.slot = slot;
    safe_strcpy(aid.wu_name, wup->name);

    aid.user_total_credit = wup->project->user_total_credit;
    aid.user_expavg_credit = wup->project->user_expavg_credit;
    aid.host_total_credit = wup->project->host_total_credit;
    aid.host_expavg_credit = wup->project->host_expavg_credit;

    aid.checkpoint_period = gstate.global_prefs.disk_interval;
    aid.fraction_done_update_period = DEFAULT_FRACTION_DONE_UPDATE_PERIOD;
    aid.fraction_done_start = 0;
    aid.fraction_done_end = 1;
#ifdef _WIN32
    safe_strcpy(aid.shmem_seg_name, shmem_seg_name);
#else
    aid.shmem_seg_name = shmem_seg_name;
#endif

    // wu_cpu_time is the CPU time at start of session,
    // not the checkpoint CPU time
    // At the start of an episode these are equal, but not in the middle!
    //
    aid.wu_cpu_time = episode_start_cpu_time;

    sprintf(init_data_path, "%s%s%s", slot_dir, PATH_SEPARATOR, INIT_DATA_FILE);
    f = boinc_fopen(init_data_path, "w");
    if (!f) {
        msg_printf(wup->project, MSG_ERROR,
            "Failed to open core-to-app prefs file %s",
            init_data_path
        );
        return ERR_FOPEN;
    }

    aid.host_info = gstate.host_info;
    aid.global_prefs = gstate.global_prefs;

    retval = write_init_data_file(f, aid);
    fclose(f);
    return retval;
}
// Make the given file available in the slot directory,
// either as a link (default) or as a copy (if fref.copy_file is set).
//
// The slot-directory entry is named after fref.open_name,
// or after fip->name when open_name is empty.
// A link points at file_path prefixed with two ".." components;
// a copy is made from file_path itself.
//
// Returns 0 on success; otherwise logs an error for the workunit's project
// and returns the value from boinc_copy()/boinc_link().
//
static int setup_file(
    WORKUNIT* wup, FILE_INFO* fip, FILE_REF& fref,
    char* file_path, char* slot_dir
) {
    char dest[256], rel_target[256];
    int status;

    const char* dest_name = fip->name;
    if (strlen(fref.open_name)) {
        dest_name = fref.open_name;
    }
    sprintf(dest, "%s%s%s", slot_dir, PATH_SEPARATOR, dest_name);
    sprintf(rel_target, "..%s..%s%s", PATH_SEPARATOR, PATH_SEPARATOR, file_path );

    if (!fref.copy_file) {
        status = boinc_link(rel_target, dest);
        if (status) {
            msg_printf(wup->project, MSG_ERROR, "Can't link %s to %s", file_path, dest);
        }
        return status;
    }

    status = boinc_copy(file_path, dest);
    if (status) {
        msg_printf(wup->project, MSG_ERROR, "Can't copy %s to %s", file_path, dest);
    }
    return status;
}
// Start a task in a slot directory.
// This includes setting up soft links,
// passing preferences, and starting the process
//
// Current dir is top-level BOINC dir
//
// postcondition: ACTIVE_TASK::state is set correctly
//
int ACTIVE_TASK::start(bool first_time) {
char exec_name[256], file_path[256], buf[256], exec_path[256];
unsigned int i;
FILE_REF fref;
FILE_INFO* fip;
int retval;
SCOPE_MSG_LOG scope_messages(log_messages, CLIENT_MSG_LOG::DEBUG_TASK);
scope_messages.printf("ACTIVE_TASK::start(first_time=%d)\n", first_time);
if (first_time) {
checkpoint_cpu_time = 0;
}
current_cpu_time = checkpoint_cpu_time;
episode_start_cpu_time = checkpoint_cpu_time;
cpu_time_at_last_sched = checkpoint_cpu_time;
fraction_done = 0;
if (!app_client_shm.shm) {
retval = get_shmem_seg_name();
if (retval) {
msg_printf(wup->project, MSG_ERROR,
"Can't get shared memory segment name: %d", retval
);
return retval;
}
}
retval = write_app_init_file();
if (retval) return retval;
// set up applications files
//
for (i=0; i<app_version->app_files.size(); i++) {
FILE_REF fref = app_version->app_files[i];
fip = fref.file_info;
get_pathname(fip, file_path);
if (fref.main_program) {
safe_strcpy(exec_name, fip->name);
safe_strcpy(exec_path, file_path);
}
if (first_time) {
retval = setup_file(wup, fip, fref, file_path, slot_dir);
if (retval) return retval;
}
}
// set up input files
//
for (i=0; i<wup->input_files.size(); i++) {
fref = wup->input_files[i];
fip = fref.file_info;
get_pathname(fref.file_info, file_path);
if (first_time) {
retval = setup_file(wup, fip, fref, file_path, slot_dir);
if (retval) return retval;
}
}
// set up output files
//
for (i=0; i<result->output_files.size(); i++) {
fref = result->output_files[i];
fip = fref.file_info;
get_pathname(fref.file_info, file_path);
if (first_time) {
retval = setup_file(wup, fip, fref, file_path, slot_dir);
if (retval) return retval;
}
}
link_user_files();
#ifdef _WIN32
PROCESS_INFORMATION process_info;
STARTUPINFO startup_info;
char slotdirpath[256];
char cmd_line[512];
memset(&process_info, 0, sizeof(process_info));
memset(&startup_info, 0, sizeof(startup_info));
//startup_info.cb = sizeof(startup_info);
//startup_info.dwFlags = STARTF_USESHOWWINDOW;
//startup_info.wShowWindow = SW_HIDE;
if (!quitRequestEvent) {
sprintf(buf, "%s%s", QUIT_PREFIX, shmem_seg_name);
quitRequestEvent = CreateEvent(0, FALSE, FALSE, buf);
if (quitRequestEvent == NULL) return ERR_INVALID_EVENT;
}
// create core/app share mem segment if needed
//
if (!app_client_shm.shm) {
sprintf(buf, "%s%s", SHM_PREFIX, shmem_seg_name);
shm_handle = create_shmem(buf, sizeof(SHARED_MEM),
(void **)&app_client_shm.shm, false
);
if (shm_handle == NULL) return ERR_SHMGET;
}
app_client_shm.reset_msgs();
// NOTE: in Windows, stderr is redirected in boinc_init_diagnostics();
sprintf(cmd_line, "%s %s", exec_path, wup->command_line);
relative_to_absolute(slot_dir, slotdirpath);
if (!CreateProcess(exec_path,
cmd_line,
NULL,
NULL,
FALSE,
CREATE_NEW_PROCESS_GROUP|CREATE_NO_WINDOW|IDLE_PRIORITY_CLASS,
NULL,
slotdirpath,
&startup_info,
&process_info
)) {
char szError[1024];
windows_error_string(szError, sizeof(szError));
state = PROCESS_COULDNT_START;
gstate.report_result_error(*result, "CreateProcess() failed - %s", szError);
msg_printf(wup->project, MSG_ERROR, "CreateProcess() failed - %s", szError);
return ERR_EXEC;
}
pid = process_info.dwProcessId;
pid_handle = process_info.hProcess;
thread_handle = process_info.hThread;
#else
char* argv[100];
// Set up core/app shared memory seg if needed
//
if (!app_client_shm.shm) {
retval = create_shmem(
shmem_seg_name, sizeof(SHARED_MEM), (void**)&app_client_shm.shm
);
if (retval) {
msg_printf(
wup->project, MSG_ERROR, "Can't create shared mem: %d", retval
);
return retval;
}
}
app_client_shm.reset_msgs();
pid = fork();
if (pid == -1) {
state = PROCESS_COULDNT_START;
gstate.report_result_error(*result, "fork() failed: %s", strerror(errno));
msg_printf(wup->project, MSG_ERROR, "fork() failed: %s", strerror(errno));
return ERR_FORK;
}
if (pid == 0) {
// from here on we're running in a new process.
// If an error happens, exit nonzero so that the core client
// knows there was a problem.
// chdir() into the slot directory
//
retval = chdir(slot_dir);
if (retval) {
perror("chdir");
_exit(retval);
}
// hook up stderr to a specially-named file
//
freopen(STDERR_FILE, "a", stderr);
argv[0] = exec_name;
parse_command_line(wup->command_line, argv+1);
debug_print_argv(argv);
sprintf(buf, "..%s..%s%s", PATH_SEPARATOR, PATH_SEPARATOR, exec_path );
retval = execv(buf, argv);
msg_printf(wup->project, MSG_ERROR,
"execv(%s) failed: %d\n", buf, retval
);
perror("execv");
_exit(errno);
}
scope_messages.printf("ACTIVE_TASK::start(): forked process: pid %d\n", pid);
// set idle process priority
#ifdef HAVE_SETPRIORITY
if (setpriority(PRIO_PROCESS, pid, PROCESS_IDLE_PRIORITY)) {
perror("setpriority");
}
#endif
#endif
state = PROCESS_EXECUTING;
return 0;
}
// Resume the task if it was previously running; otherwise start it
//
// - PROCESS_UNINITIALIZED: start the app.  If the task has never been
//   scheduled (CPU_SCHED_UNINITIALIZED) the slot directory is created if
//   missing and cleaned out first, and start(true) is used;
//   otherwise start(false) is used.
// - PROCESS_SUSPENDED: unsuspend the app.
// - PROCESS_EXECUTING: nothing to do.
// - any other state: log an error and return 0.
//
// Returns 0 on success, else the error from start() or unsuspend().
//
// Postcondition: "state" is set correctly
//
int ACTIVE_TASK::resume_or_start() {
    // points at string literals only, hence const
    const char* str = "??";
    int retval;

    switch (state) {
    case PROCESS_UNINITIALIZED:
        if (scheduler_state == CPU_SCHED_UNINITIALIZED) {
            if (!boinc_file_exists(slot_dir)) {
                make_slot_dir(slot);
            }

            // A failure to clean the slot directory is reported
            // rather than silently dropped, but the start is still
            // attempted.
            //
            retval = clean_out_dir(slot_dir);
            if (retval) {
                msg_printf(wup->project, MSG_ERROR,
                    "resume_or_start(): couldn't clean out slot directory %s: %d",
                    slot_dir, retval
                );
            }
            retval = start(true);
            str = "Starting";
        } else {
            retval = start(false);
            str = "Restarting";
        }
        if (retval) {
            state = PROCESS_COULDNT_START;
            return retval;
        }
        break;
    case PROCESS_SUSPENDED:
        retval = unsuspend();
        if (retval) {
            msg_printf(
                wup->project,
                MSG_ERROR,
                "ACTIVE_TASK::resume_or_start(): could not unsuspend active_task"
            );
            state = PROCESS_COULDNT_START;
            return retval;
        }
        str = "Resuming";
        break;
    case PROCESS_EXECUTING:
        return 0;
    default:
        msg_printf(result->project, MSG_ERROR,
            "resume_or_start(): unexpected process state %d", state
        );
        return 0;
    }

    msg_printf(result->project, MSG_INFO,
        "%s result %s using %s version %.2f",
        str,
        result->name,
        app_version->app->name,
        app_version->version_num/100.
    );
    return 0;
}
// Restart active tasks without wiping and reinitializing slot directories
// Called at init, with max_tasks = ncpus
//
int ACTIVE_TASK_SET::restart_tasks(int max_tasks) {
vector<ACTIVE_TASK*>::iterator iter;
ACTIVE_TASK* atp;
RESULT* result;
int retval, num_tasks_started;
SCOPE_MSG_LOG scope_messages(log_messages, CLIENT_MSG_LOG::DEBUG_TASK);
num_tasks_started = 0;
iter = active_tasks.begin();
while (iter != active_tasks.end()) {
atp = *iter;
result = atp->result;
atp->init(atp->result);
get_slot_dir(atp->slot, atp->slot_dir);
if (!gstate.input_files_available(result)) {
msg_printf(atp->wup->project, MSG_ERROR, "ACTIVE_TASKS::restart_tasks(); missing files\n");
gstate.report_result_error(
*(atp->result),
"One or more missing files"
);
iter = active_tasks.erase(iter);
delete atp;
continue;
}
if (atp->scheduler_state != CPU_SCHED_SCHEDULED
|| num_tasks_started >= max_tasks
) {
msg_printf(atp->wup->project, MSG_INFO,
"Deferring computation for result %s",
atp->result->name
);
atp->scheduler_state = CPU_SCHED_PREEMPTED;
iter++;
continue;
}
msg_printf(atp->wup->project, MSG_INFO,
"Resuming computation for result %s using %s version %.2f",
atp->result->name,
atp->app_version->app->name,
atp->app_version->version_num/100.
);
retval = atp->start(false);
if (retval) {
msg_printf(atp->wup->project, MSG_ERROR, "ACTIVE_TASKS::restart_tasks(); restart failed: %d\n", retval);
gstate.report_result_error(
*(atp->result),
"Couldn't restart the app for this result: %d", retval
);
iter = active_tasks.erase(iter);
delete atp;
} else {
++num_tasks_started;
iter++;
}
}
return 0;
}