// The contents of this file are subject to the Mozilla Public License // Version 1.0 (the "License"); you may not use this file except in // compliance with the License. You may obtain a copy of the License at // http://www.mozilla.org/MPL/ // // Software distributed under the License is distributed on an "AS IS" // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the // License for the specific language governing rights and limitations // under the License. // // The Original Code is the Berkeley Open Infrastructure for Network Computing. // // The Initial Developer of the Original Code is the SETI@home project. // Portions created by the SETI@home project are Copyright (C) 2002 // University of California at Berkeley. All Rights Reserved. // // Contributor(s): // // This file contains high-level logic for communicating with // scheduling servers, // and for merging the result of a scheduler RPC into the client state // Note: code for actually doing a scheduler RPC is in scheduler_op.C #include #include #include #include "crypt.h" #include "error_numbers.h" #include "file_names.h" #include "parse.h" #include "log_flags.h" #include "message.h" #include "scheduler_op.h" #include "client_state.h" // quantities like avg CPU time decay by a factor of e every week // #define EXP_DECAY_RATE (1./(3600*24*7)) #define SECONDS_IN_DAY 86400 // Decide whether to start a new file transfer // bool CLIENT_STATE::start_new_file_xfer() { // **** this should do a little more than this return true; } // estimate the days of work remaining // double CLIENT_STATE::current_water_days() { unsigned int i; RESULT* rp; double seconds_remaining=0; for (i=0; istate > RESULT_COMPUTE_DONE) continue; // TODO: subtract time already finished for WUs in progress seconds_remaining += rp->wup->seconds_to_complete; } return (seconds_remaining * SECONDS_IN_DAY); } // seconds of work needed to come up to high-water mark // double CLIENT_STATE::work_needed_secs() { double x = current_water_days(); if (x > prefs->high_water_days) return 0; return (prefs->high_water_days - x)*86400; } // update exponentially-averaged CPU times of all projects // void CLIENT_STATE::update_avg_cpu(PROJECT* p) { time_t now = time(0); double deltat = now - p->exp_avg_mod_time; if (deltat > 0) { if (p->exp_avg_cpu != 0) { p->exp_avg_cpu *= exp(deltat*EXP_DECAY_RATE); } p->exp_avg_mod_time = now; } } // find a project that needs its master file parsed // PROJECT* CLIENT_STATE::next_project_master_pending() { unsigned int i; PROJECT* p; time_t now = time(0); for (i=0; imin_rpc_time > now ) continue; if (p->master_url_fetch_pending) { return p; } } return 0; } // return the next project after "old", in debt order, // that is eligible for a scheduler RPC // PROJECT* CLIENT_STATE::next_project(PROJECT* old) { PROJECT* p, *pbest; int best = 999; time_t now = time(0); unsigned int i; pbest = 0; for (i=0; imin_rpc_time > now ) continue; if (old && p->debt_order <= old->debt_order) continue; if (p->debt_order < best) { pbest = p; best = p->debt_order; } } return pbest; } #if 0 // choose a project to ask for work // PROJECT* CLIENT_STATE::choose_project() { PROJECT* p, *bestp; unsigned int i; double best_ratio, ratio; // update the average CPU times of all projects // for (i=0; inext_request_time) continue; if (p->exp_avg_cpu == 0) return p; ratio = p->resource_share/p->exp_avg_cpu; if (ratio >= best_ratio) { best_ratio = ratio; bestp = p; } } return bestp; } #endif // Compute the "resource debt" of each project. This is used // to determine what project we will focus on next, based on // the user specified resource share // void CLIENT_STATE::compute_resource_debts() { unsigned int i, j; PROJECT* p, *pbest; double best; for (i=0; iexp_avg_cpu == 0) { p->resource_debt = p->resource_share; } else { p->resource_debt = p->resource_share/p->exp_avg_cpu; } p->debt_order = -1; } // put in decreasing order. Should use qsort or some stdlib thang // for (i=0; idebt_order >= 0) continue; if (p->resource_debt > best) { best = p->resource_debt; pbest = p; } } pbest->debt_order = i; } } // Prepare the scheduler request. This writes the request in XML to a // file (SCHED_OP_REQUEST_FILE) which is later sent to the scheduling // server // int CLIENT_STATE::make_scheduler_request(PROJECT* p, double work_req) { FILE* f = fopen(SCHED_OP_REQUEST_FILE, "wb"); unsigned int i; RESULT* rp; if (!f) return ERR_FOPEN; fprintf(f, "\n" " %s\n" " %d\n" " %d\n" " %s\n" " %d\n" " %f\n", p->authenticator, p->hostid, p->rpc_seqno, platform_name, version, work_req ); if (p->code_sign_key) { fprintf(f, "\n%s\n", p->code_sign_key); } FILE* fprefs = fopen(PREFS_FILE_NAME, "r"); if (!fprefs) return ERR_FOPEN; copy_stream(fprefs, f); fclose(fprefs); time_stats.write(f, true); net_stats.write(f, true); host_info.write(f); for (i=0; iproject == p && rp->state == RESULT_READY_TO_ACK) { rp->write(f, true); } } fprintf(f, "\n"); fclose(f); return 0; } // find a project with results that are overdue to report, // and which we're allowed to contact. // PROJECT* CLIENT_STATE::find_project_with_overdue_results() { unsigned int i; RESULT* r; time_t now = time(0); for (i=0; istate == RESULT_READY_TO_ACK) { if (r->project->min_rpc_time < now) { return r->project; } } } return 0; } // return true if we're allowed to do a scheduler RPC to at least one project // bool CLIENT_STATE::some_project_rpc_ok() { unsigned int i; time_t now = time(0); for (i=0; imin_rpc_time < now) return true; } return false; } // called from the client's polling loop. // initiate scheduler RPC activity if needed and possible // bool CLIENT_STATE::scheduler_rpc_poll() { double work_secs; PROJECT* p; bool action, below_low_water; switch(scheduler_op->state) { case SCHEDULER_OP_STATE_IDLE: below_low_water = (current_water_days() <= prefs->low_water_days); if (below_low_water && some_project_rpc_ok()) { compute_resource_debts(); scheduler_op->init_get_work(); action = true; } else { p = find_project_with_overdue_results(); if (p) { compute_resource_debts(); if (p->debt_order == 0) { work_secs = work_needed_secs(); } else { work_secs = 0; } scheduler_op->init_return_results(p, work_secs); action = true; } } break; default: scheduler_op->poll(); if (scheduler_op->state == SCHEDULER_OP_STATE_IDLE) { action = true; } break; } return action; } #if 0 // manage the task of maintaining an adequate supply of work. // bool CLIENT_STATE::get_work() { PROJECT* project; int retval, work_secs; bool action=false; if (need_work()) { switch(scheduler_op->state) { case SCHEDULER_OP_STATE_IDLE: // if no scheduler request pending, start one // project = choose_project(); if (!project) { if (log_flags.sched_op_debug) { printf("all projects temporarily backed off\n"); } return false; } work_secs = (int) (prefs->high_water_days - current_water_days())*86400; retval = make_scheduler_request(project, work_secs); if (retval) { fprintf(stderr, "make_scheduler_request: %d\n", retval); break; } scheduler_op->start_op(project); action = true; break; default: retval = scheduler_op->poll(); if (scheduler_op->state == SCHEDULER_OP_STATE_DONE) { scheduler_op->state = SCHEDULER_OP_STATE_IDLE; action = true; if (scheduler_op->scheduler_op_retval) { fprintf(stderr, "scheduler RPC to %s failed: %d\n", scheduler_op->project->master_url, scheduler_op->scheduler_op_retval ); if (log_flags.sched_ops) { printf("HTTP work request failed: %d\n", retval); } break; } else { handle_scheduler_reply(*scheduler_op); set_client_state_dirty(); } } break; } } return action; } #endif // see whether a new preferences set, obtained from the given // project, looks "reasonable". This is to prevent accidental or // intentional preferences corruption // Currently this is primitive: just make sure there's at least 1 project // TODO: figure out what else to look for here bool PREFS::looks_reasonable(PROJECT& project) { if (projects.size() > 0) return true; return false; } // Parse the reply from a scheduler and configure internal structures // appropriately // void CLIENT_STATE::handle_scheduler_reply( PROJECT* project, char* scheduler_url ) { SCHEDULER_REPLY sr; FILE* f; int retval; unsigned int i; char prefs_backup[256]; PROJECT *pp, *sp; PREFS* new_prefs; bool signature_valid; contacted_sched_server = true; if (log_flags.sched_op_debug) { f = fopen(SCHED_OP_RESULT_FILE, "r"); printf("------------- SCHEDULER REPLY ----------\n"); copy_stream(f, stdout); fclose(f); printf("------------- END ----------\n"); } f = fopen(SCHED_OP_RESULT_FILE, "r"); retval = sr.parse(f); if (strlen(sr.message)) { show_message(sr.message, sr.message_priority); } if (sr.request_delay) { project->min_rpc_time = time(0) + sr.request_delay; } if (sr.hostid) { project->hostid = sr.hostid; project->rpc_seqno = 0; } // if the scheduler reply includes preferences // that are newer than what we have on disk, then // - verify that the new prefs look reasonable; // they should include at least one project. // - rename the current prefs file // - copy new preferences to prefs.xml // - copy any new projects into the CLIENT_STATE structure // - update the preferences info of projects already in CLIENT_STATE // // There may be projects in CLIENT_STATE that are not in the new prefs; // i.e. the user has dropped out of these projects. // We'll continue with any ongoing work for these projects, // but they'll be dropped the next time the client starts up. // if (sr.prefs_mod_time > prefs->mod_time) { f = fopen(PREFS_TEMP_FILE_NAME, "w"); fprintf(f, "\n" " %d\n" " %s\n" " %s\n", sr.prefs_mod_time, project->master_url, scheduler_url ); fputs(sr.prefs_xml, f); fprintf(f, "\n" ); fclose(f); f = fopen(PREFS_TEMP_FILE_NAME, "r"); new_prefs = new PREFS; new_prefs->parse(f); fclose(f); if (new_prefs->looks_reasonable(*project)) { make_prefs_backup_name(*prefs, prefs_backup); rename(PREFS_FILE_NAME, prefs_backup); rename(PREFS_TEMP_FILE_NAME, PREFS_FILE_NAME); for (i=0; iprojects.size(); i++) { pp = new_prefs->projects[i]; sp = lookup_project(pp->master_url); if (sp) { sp->copy_prefs_fields(*pp); } else { projects.push_back(pp); } } delete prefs; prefs = new_prefs; } else { fprintf(stdout, "New preferences don't look reasonable, ignoring\n"); } } // if the scheduler reply includes a code-signing key, // accept it if we don't already have one from the project. // Otherwise verify its signature, using the key we already have. // if (sr.code_sign_key) { if (!project->code_sign_key) { project->code_sign_key = strdup(sr.code_sign_key); } else { if (sr.code_sign_key_signature) { retval = verify_string2( sr.code_sign_key, sr.code_sign_key_signature, project->code_sign_key, signature_valid ); if (!retval && signature_valid) { free(project->code_sign_key); project->code_sign_key = strdup(sr.code_sign_key); } else { fprintf(stdout, "New code signing key from %s doesn't validate\n", project->project_name ); } } else { fprintf(stdout, "Missing code sign key signature\n"); } } } // copy new entities to client state // for (i=0; iversion_num = latest_version_num(wup->app_name); retval = link_workunit(project, wup); if (!retval) { workunits.push_back(wup); } } } for (i=0; istate = RESULT_NEW; } } // update records for ack'ed results // for (i=0; istate = RESULT_SERVER_ACK; } else { fprintf(stderr, "ERROR: got ack for result %s, can't find\n", sr.result_acks[i].name ); } } if (log_flags.state_debug) { printf("State after handle_scheduler_reply():\n"); print_counts(); } }