remote job submission: fix bugs, improve error reporting

This commit is contained in:
David Anderson 2016-07-27 01:07:07 -07:00
parent a60f0bdb49
commit b6c0d75a8c
3 changed files with 265 additions and 119 deletions

View File

@ -83,7 +83,11 @@ function read_input_template($app, $r) {
} else {
$path = project_dir() . "/templates/$app->name"."_in";
}
return simplexml_load_file($path);
$x = simplexml_load_file($path);
if (!$x) {
xml_error(-1, "Couldn't parse input template file $path");
}
return $x;
}
function check_max_jobs_in_progress($r, $user_submit) {

View File

@ -38,6 +38,35 @@ using std::string;
//#define SHOW_REPLY
// replies can have one or more <error> elements.
// These can be either PHP Notices or Warnings (which are not fatal),
// or fatal errors.
// Fatal errors have nonzero error_num.
//
struct ERROR {
int error_num;
char error_msg[256];
char type[256];
char file[256];
char line[256];
void parse(XML_PARSER& xp) {
error_num = 0;
strcpy(error_msg, "");
strcpy(type, "");
strcpy(file, "");
strcpy(line, "");
while (!xp.get_tag()) {
if (xp.match_tag("/error")) break;
if (xp.parse_str("error_msg", error_msg, sizeof(error_msg))) continue;
if (xp.parse_int("error_num", error_num)) continue;
if (xp.parse_str("type", type, sizeof(type))) continue;
if (xp.parse_str("file", file, sizeof(file))) continue;
if (xp.parse_str("line", line, sizeof(line))) continue;
}
}
};
// do an HTTP GET request.
//
static int do_http_get(
@ -153,21 +182,28 @@ int query_files(
}
fseek(reply, 0, SEEK_SET);
int x;
retval = -1;
error_msg = "";
while (fgets(buf, 256, reply)) {
retval = 0;
MIOFILE mf;
mf.init_file(reply);
XML_PARSER xp(&mf);
while (!xp.get_tag()) {
#ifdef SHOW_REPLY
printf("query_files reply: %s", buf);
printf("query_files reply: %s\n", xp.parsed_tag);
#endif
if (strstr(buf, "absent_files")) {
retval = 0;
continue;
}
if (parse_int(buf, "<error_num>", retval)) continue;
if (parse_str(buf, "<error_msg>", error_msg)) continue;
if (parse_int(buf, "<file>", x)) {
absent_files.push_back(x);
continue;
if (xp.match_tag("absent_files")) {
while (!xp.get_tag()) {
if (xp.match_tag("/absent_files")) break;
if (xp.parse_int("file", x)) {
absent_files.push_back(x);
}
}
} else if (xp.match_tag("error")) {
ERROR error;
error.parse(xp);
if (error.error_num) {
retval = error.error_num;
error_msg = error.error_msg;
}
}
}
fclose(reply);
@ -205,17 +241,26 @@ int upload_files (
}
fseek(reply, 0, SEEK_SET);
retval = -1;
error_msg = "";
while (fgets(buf, 256, reply)) {
bool success;
MIOFILE mf;
mf.init_file(reply);
XML_PARSER xp(&mf);
while (!xp.get_tag()) {
#ifdef SHOW_REPLY
printf("upload_files reply: %s", buf);
printf("upload_files reply: %s\n", xp.parsed_tag);
#endif
if (strstr(buf, "success")) {
if (xp.parse_bool("success", success)) {
retval = 0;
continue;
}
if (parse_int(buf, "<error_num>", retval)) continue;
if (parse_str(buf, "<error_msg>", error_msg)) continue;
if (xp.match_tag("error")) {
ERROR error;
error.parse(xp);
if (error.error_num) {
retval = error.error_num;
error_msg = error.error_msg;
}
}
}
fclose(reply);
return retval;
@ -257,19 +302,26 @@ int create_batch(
char buf[256];
batch_id = 0;
fseek(reply, 0, SEEK_SET);
int error_num = 0;
error_msg = "";
while (fgets(buf, 256, reply)) {
MIOFILE mf;
mf.init_file(reply);
XML_PARSER xp(&mf);
retval = 0;
while (!xp.get_tag()) {
#ifdef SHOW_REPLY
printf("create_batch reply: %s", buf);
printf("create_batch reply: %s\n", xp.parsed_tag);
#endif
if (parse_int(buf, "<batch_id>", batch_id)) continue;
if (parse_int(buf, "<error_num>", error_num)) continue;
if (parse_str(buf, "<error_msg>", error_msg)) continue;
if (xp.parse_int("batch_id", batch_id)) continue;
if (xp.match_tag("error")) {
ERROR error;
error.parse(xp);
if (error.error_num) {
retval = error.error_num;
error_msg = error.error_msg;
}
}
}
fclose(reply);
return error_num;
return retval;
}
int estimate_batch(
@ -309,17 +361,25 @@ int estimate_batch(
}
fseek(reply, 0, SEEK_SET);
retval = -1;
error_msg = "";
while (fgets(buf, 256, reply)) {
MIOFILE mf;
mf.init_file(reply);
XML_PARSER xp(&mf);
while (!xp.get_tag()) {
#ifdef SHOW_REPLY
printf("submit_batch reply: %s", buf);
printf("estimate_batch reply: %s\n", xp.parsed_tag);
#endif
if (parse_double(buf, "<seconds>", est_makespan)) {
if (xp.parse_double("seconds", est_makespan)) {
retval = 0;
continue;
}
if (parse_int(buf, "<error_num>", retval)) continue;
if (parse_str(buf, "<error_msg>", error_msg)) continue;
if (xp.match_tag("error")) {
ERROR error;
error.parse(xp);
if (error.error_num) {
retval = error.error_num;
error_msg = error.error_msg;
}
}
}
fclose(reply);
return retval;
@ -397,18 +457,26 @@ int submit_jobs(
}
fseek(reply, 0, SEEK_SET);
retval = -1;
error_msg = "";
int temp;
while (fgets(buf, 256, reply)) {
MIOFILE mf;
mf.init_file(reply);
XML_PARSER xp(&mf);
while (!xp.get_tag()) {
#ifdef SHOW_REPLY
printf("submit_batch reply: %s", buf);
printf("submit_batch reply: %s\n", xp.parsed_tag);
#endif
if (parse_int(buf, "<batch_id>", temp)) {
if (xp.parse_int("batch_id", temp)) {
retval = 0;
continue;
}
if (parse_int(buf, "<error_num>", retval)) continue;
if (parse_str(buf, "<error_msg>", error_msg)) continue;
if (xp.match_tag("error")) {
ERROR error;
error.parse(xp);
if (error.error_num) {
retval = error.error_num;
error_msg = error.error_msg;
}
}
}
fclose(reply);
return retval;
@ -447,37 +515,45 @@ int query_batch_set(
fseek(reply, 0, SEEK_SET);
retval = -1;
qb_reply.server_time = 0;
error_msg = "";
while (fgets(buf, 256, reply)) {
MIOFILE mf;
mf.init_file(reply);
XML_PARSER xp(&mf);
while (!xp.get_tag()) {
#ifdef SHOW_REPLY
printf("query_batches reply: %s", buf);
#endif
if (strstr(buf, "jobs")) {
if (xp.match_tag("jobs")) {
retval = 0;
continue;
}
if (parse_int(buf, "<error_num>", retval)) continue;
if (parse_str(buf, "<error_msg>", error_msg)) continue;
if (parse_double(buf, "<server_time>", qb_reply.server_time)) continue;
if (parse_int(buf, "<batch_size>", batch_size)) {
if (xp.parse_double("server_time", qb_reply.server_time)) continue;
if (xp.parse_int("batch_size", batch_size)) {
qb_reply.batch_sizes.push_back(batch_size);
continue;
}
if (strstr(buf, "<job>")) {
if (xp.match_tag("job")) {
JOB_STATUS js;
while (fgets(buf, 256, reply)) {
while (!xp.get_tag()) {
#ifdef SHOW_REPLY
printf("query_batches reply: %s", buf);
#endif
if (strstr(buf, "</job>")) {
if (xp.match_tag("/job")) {
qb_reply.jobs.push_back(js);
break;
}
if (parse_str(buf, "job_name", js.job_name)) continue;
if (parse_str(buf, "status", js.status)) continue;
if (xp.parse_string("job_name", js.job_name)) continue;
if (xp.parse_string("status", js.status)) continue;
}
continue;
}
if (xp.match_tag("error")) {
ERROR error;
error.parse(xp);
if (error.error_num) {
retval = error.error_num;
error_msg = error.error_msg;
}
}
}
fclose(reply);
return retval;
@ -580,8 +656,14 @@ int query_batches(
}
continue;
}
if (xp.parse_string("error_msg", error_msg)) continue;
if (xp.parse_int("error_num", retval)) continue;
if (xp.match_tag("error")) {
ERROR error;
error.parse(xp);
if (error.error_num) {
retval = error.error_num;
error_msg = error.error_msg;
}
}
}
fclose(reply);
return retval;
@ -690,17 +772,26 @@ int abort_jobs(
}
fseek(reply, 0, SEEK_SET);
retval = -1;
error_msg = "";
while (fgets(buf, 256, reply)) {
bool success;
MIOFILE mf;
XML_PARSER xp(&mf);
mf.init_file(reply);
while (!xp.get_tag()) {
#ifdef SHOW_REPLY
printf("abort_jobs reply: %s", buf);
printf("abort_jobs reply: %s\n", xp.parsed_tag);
#endif
if (strstr(buf, "success")) {
if (xp.parse_bool("success", success)) {
retval = 0;
continue;
}
if (parse_int(buf, "<error_num>", retval)) continue;
if (parse_str(buf, "<error_msg>", error_msg)) continue;
if (xp.match_tag("error")) {
ERROR error;
error.parse(xp);
if (error.error_num) {
retval = error.error_num;
error_msg = error.error_msg;
}
}
}
fclose(reply);
return retval;
@ -736,20 +827,25 @@ int get_templates(
return retval;
}
retval = -1;
error_msg = "";
fseek(reply, 0, SEEK_SET);
while (fgets(buf, 256, reply)) {
MIOFILE mf;
XML_PARSER xp(&mf);
mf.init_file(reply);
while (!xp.get_tag()) {
#ifdef SHOW_REPLY
printf("get_templates reply: %s", buf);
printf("get_templates reply: %s\n", xp.parsed_tag);
#endif
if (parse_int(buf, "<error_num>", retval)) continue;
if (parse_str(buf, "<error_msg>", error_msg)) continue;
if (strstr(buf, "<templates>")) {
MIOFILE mf;
XML_PARSER xp(&mf);
mf.init_file(reply);
if (xp.match_tag("templates")) {
retval = td.parse(xp);
}
if (xp.match_tag("error")) {
ERROR error;
error.parse(xp);
if (error.error_num) {
retval = error.error_num;
error_msg = error.error_msg;
}
}
}
fclose(reply);
return retval;
@ -849,20 +945,25 @@ int query_completed_job(
return retval;
}
retval = -1;
error_msg = "";
fseek(reply, 0, SEEK_SET);
while (fgets(buf, 256, reply)) {
MIOFILE mf;
XML_PARSER xp(&mf);
mf.init_file(reply);
while (!xp.get_tag()) {
#ifdef SHOW_REPLY
printf("query_completed_job reply: %s", buf);
printf("query_completed_job reply: %s\n", xp.parsed_tag);
#endif
if (parse_int(buf, "<error_num>", retval)) continue;
if (parse_str(buf, "<error_msg>", error_msg)) continue;
if (strstr(buf, "<completed_job>")) {
MIOFILE mf;
XML_PARSER xp(&mf);
mf.init_file(reply);
if (xp.match_tag("completed_job")) {
retval = jd.parse(xp);
}
if (xp.match_tag("error")) {
ERROR error;
error.parse(xp);
if (error.error_num) {
retval = error.error_num;
error_msg = error.error_msg;
}
}
}
fclose(reply);
return retval;
@ -891,18 +992,27 @@ int retire_batch(
return retval;
}
retval = -1;
error_msg = "";
bool success;
fseek(reply, 0, SEEK_SET);
while (fgets(buf, 256, reply)) {
MIOFILE mf;
XML_PARSER xp(&mf);
mf.init_file(reply);
while (!xp.get_tag()) {
#ifdef SHOW_REPLY
printf("retire_batch reply: %s", buf);
printf("retire_batch reply: %s\n", xp.parsed_tag);
#endif
if (parse_int(buf, "<error_num>", retval)) continue;
if (parse_str(buf, "<error_msg>", error_msg)) continue;
if (strstr(buf, "success")) {
if (xp.parse_bool("success", success)) {
retval = 0;
continue;
}
if (xp.match_tag("error")) {
ERROR error;
error.parse(xp);
if (error.error_num) {
retval = error.error_num;
error_msg = error.error_msg;
}
}
}
fclose(reply);
return retval;
@ -934,18 +1044,28 @@ int set_expire_time(
return retval;
}
retval = -1;
bool success;
error_msg = "";
fseek(reply, 0, SEEK_SET);
while (fgets(buf, 256, reply)) {
MIOFILE mf;
XML_PARSER xp(&mf);
mf.init_file(reply);
while (!xp.get_tag()) {
#ifdef SHOW_REPLY
printf("set_expire_time reply: %s", buf);
printf("set_expire_time reply: %s\n", xp.parsed_tag);
#endif
if (parse_int(buf, "<error_num>", retval)) continue;
if (parse_str(buf, "<error_msg>", error_msg)) continue;
if (strstr(buf, "success")) {
if (xp.parse_bool("success", success)) {
retval = 0;
continue;
}
if (xp.match_tag("error")) {
ERROR error;
error.parse(xp);
if (error.error_num) {
retval = error.error_num;
error_msg = error.error_msg;
}
}
}
fclose(reply);
return retval;
@ -967,18 +1087,28 @@ int ping_server(
return retval;
}
retval = -1;
bool success;
error_msg = "";
fseek(reply, 0, SEEK_SET);
while (fgets(buf, 256, reply)) {
MIOFILE mf;
XML_PARSER xp(&mf);
mf.init_file(reply);
while (!xp.get_tag()) {
#ifdef SHOW_REPLY
printf("reply: %s\n", buf);
printf("ping_server() reply: %s\n", xp.parsed_tag);
#endif
if (parse_int(buf, "<error_num>", retval)) continue;
if (parse_str(buf, "<error_msg>", error_msg)) continue;
if (strstr(buf, "success")) {
if (xp.parse_bool("success", success)) {
retval = 0;
continue;
}
if (xp.match_tag("error")) {
ERROR error;
error.parse(xp);
if (error.error_num) {
retval = error.error_num;
error_msg = error.error_msg;
}
}
}
fclose(reply);
return retval;

View File

@ -41,6 +41,11 @@ using std::set;
using std::string;
using std::vector;
//#define DEBUG
// if set, handle commands synchronously rather than
// handling them in separate threads
// Also print more errors
extern size_t strlcpy(char*, const char*, size_t);
char project_url[256];
@ -57,10 +62,6 @@ bool async_mode = false;
#define BPRINTF(fmt, ...) \
printf( "%s" fmt, response_prefix, ##__VA_ARGS__ ); \
bool debug_mode = false;
// if set, handle commands synchronously rather than
// handling them in separate threads
struct SUBMIT_REQ {
char batch_name[256];
char app_name[256];
@ -206,13 +207,19 @@ int process_input_files(SUBMIT_REQ& req, string& error_msg) {
absent_files,
error_msg
);
if (retval) return retval;
if (retval) {
#ifdef DEBUG
printf("query_files() failed (%d): %s\n", retval, error_msg.c_str());
#endif
return retval;
}
// upload the missing files.
//
vector<string> upload_boinc_names, upload_paths;
for (unsigned int i=0; i<absent_files.size(); i++) {
int j = absent_files[i];
printf("file %d is absent\n", j);
upload_boinc_names.push_back(boinc_names[j]);
upload_paths.push_back(paths[j]);
}
@ -224,7 +231,12 @@ int process_input_files(SUBMIT_REQ& req, string& error_msg) {
req.batch_id,
error_msg
);
if (retval) return retval;
if (retval) {
#ifdef DEBUG
printf("upload_files() failed (%d): %s\n", retval, error_msg.c_str());
#endif
return retval;
}
// fill in the physical file names in the submit request
//
@ -767,25 +779,25 @@ int handle_command(char* p) {
delete cp;
return 0;
}
if (debug_mode) {
handle_command_aux(cp);
BPRINTF("result: %s\n", cp->out);
delete cp;
} else {
printf("S\n");
commands.push_back(cp);
pthread_t thread_handle;
pthread_attr_t thread_attrs;
pthread_attr_init(&thread_attrs);
pthread_attr_setstacksize(&thread_attrs, 256*1024);
int retval = pthread_create(
&thread_handle, &thread_attrs, &handle_command_aux, cp
);
if (retval) {
fprintf(stderr, "can't create thread\n");
return -1;
}
#ifdef DEBUG
handle_command_aux(cp);
BPRINTF("result: %s\n", cp->out);
delete cp;
#else
printf("S\n");
commands.push_back(cp);
pthread_t thread_handle;
pthread_attr_t thread_attrs;
pthread_attr_init(&thread_attrs);
pthread_attr_setstacksize(&thread_attrs, 256*1024);
int retval = pthread_create(
&thread_handle, &thread_attrs, &handle_command_aux, cp
);
if (retval) {
fprintf(stderr, "can't create thread\n");
return -1;
}
#endif
}
free(p);
return 0;