// The contents of this file are subject to the Mozilla Public License // Version 1.0 (the "License"); you may not use this file except in // compliance with the License. You may obtain a copy of the License at // http://www.mozilla.org/MPL/ // // Software distributed under the License is distributed on an "AS IS" // basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the // License for the specific language governing rights and limitations // under the License. // // The Original Code is the Berkeley Open Infrastructure for Network Computing. // // The Initial Developer of the Original Code is the SETI@home project. // Portions created by the SETI@home project are Copyright (C) 2002 // University of California at Berkeley. All Rights Reserved. // // Contributor(s): // // This file contains high-level logic for communicating with // scheduling servers, // and for merging the result of a scheduler RPC into the client state // Note: code for actually doing a scheduler RPC is in scheduler_op.C #include #include #include #include "windows_cpp.h" #include "crypt.h" #include "error_numbers.h" #include "file_names.h" #include "filesys.h" #include "parse.h" #include "log_flags.h" #include "message.h" #include "scheduler_op.h" #include "client_state.h" // quantities like avg CPU time decay by a factor of e every week // #define SECONDS_IN_DAY (3600*24) #define EXP_DECAY_RATE (1./(SECONDS_IN_DAY*7)) // estimate the days of work remaining // double CLIENT_STATE::current_water_days() { unsigned int i; RESULT* rp; double seconds_remaining=0; for (i=0; istate > RESULT_COMPUTE_DONE) continue; // TODO: subtract time already finished for WUs in progress seconds_remaining += rp->wup->seconds_to_complete; } return (seconds_remaining * SECONDS_IN_DAY); } // seconds of work needed to come up to high-water mark // double CLIENT_STATE::work_needed_secs() { double x = current_water_days(); if (x > prefs.high_water_days) return 0; return (prefs.high_water_days - x)*86400; } 
// update exponentially-averaged CPU times of all projects // void CLIENT_STATE::update_avg_cpu(PROJECT* p) { time_t now = time(0); double deltat = now - p->exp_avg_mod_time; if (deltat > 0) { if (p->exp_avg_cpu != 0) { p->exp_avg_cpu *= exp(deltat*EXP_DECAY_RATE); } p->exp_avg_mod_time = now; } } // find a project that needs its master file parsed // PROJECT* CLIENT_STATE::next_project_master_pending() { unsigned int i; PROJECT* p; time_t now = time(0); for (i=0; imin_rpc_time > now ) continue; if (p->master_url_fetch_pending) { return p; } } return 0; } // return the next project after "old", in debt order, // that is eligible for a scheduler RPC // PROJECT* CLIENT_STATE::next_project(PROJECT* old) { PROJECT* p, *pbest; int best = 999; time_t now = time(0); unsigned int i; pbest = 0; for (i=0; imin_rpc_time > now ) continue; if (old && p->debt_order <= old->debt_order) continue; if (p->debt_order < best) { pbest = p; best = p->debt_order; } } return pbest; } // Compute the "resource debt" of each project. // This is used to determine what project we will focus on next, // based on the user-specified resource share. // TODO: this counts only CPU time. Should reflect disk/network usage too. // void CLIENT_STATE::compute_resource_debts() { unsigned int i, j; PROJECT* p, *pbest; double best; for (i=0; iexp_avg_cpu == 0) { p->resource_debt = p->resource_share; } else { p->resource_debt = p->resource_share/p->exp_avg_cpu; } p->debt_order = -1; } // put in decreasing order. Should use qsort or some stdlib thang // for (i=0; idebt_order >= 0) continue; if (p->resource_debt > best) { best = p->resource_debt; pbest = p; } } pbest->debt_order = i; } } // Prepare the scheduler request. 
This writes the request in XML to a // file (SCHED_OP_REQUEST_FILE) which is later sent to the scheduling // server // int CLIENT_STATE::make_scheduler_request(PROJECT* p, double work_req) { FILE* f = fopen(SCHED_OP_REQUEST_FILE, "wb"); unsigned int i; RESULT* rp; if (!f) return ERR_FOPEN; fprintf(f, "\n" " %s\n" " %d\n" " %d\n" " %s\n" " %d\n" " %f\n", p->authenticator, p->hostid, p->rpc_seqno, platform_name, core_client_version, work_req ); if (p->code_sign_key) { fprintf(f, "\n%s\n", p->code_sign_key); } FILE* fprefs = fopen(PREFS_FILE_NAME, "r"); if (!fprefs) return ERR_FOPEN; copy_stream(fprefs, f); fclose(fprefs); time_stats.write(f, true); net_stats.write(f, true); host_info.write(f); for (i=0; iproject == p && rp->state == RESULT_READY_TO_ACK) { rp->write(f, true); } } fprintf(f, "\n"); fclose(f); return 0; } // find a project with results that are overdue to report, // and which we're allowed to contact. // PROJECT* CLIENT_STATE::find_project_with_overdue_results() { unsigned int i; RESULT* r; time_t now = time(0); for (i=0; istate == RESULT_READY_TO_ACK) { if (r->project->min_rpc_time < now) { return r->project; } } } return 0; } // return true if we're allowed to do a scheduler RPC to at least one project // bool CLIENT_STATE::some_project_rpc_ok() { unsigned int i; time_t now = time(0); for (i=0; imin_rpc_time < now) return true; } return false; } // called from the client's polling loop. 
// initiate scheduler RPC activity if needed and possible
//
// Returns true if this call started a scheduler operation, or if an
// operation that was in progress has become idle; false otherwise.
//
bool CLIENT_STATE::scheduler_rpc_poll() {
    double work_secs;
    PROJECT* p;
    bool action=false, below_low_water;

    switch(scheduler_op->state) {
    case SCHEDULER_OP_STATE_IDLE:
        // Nothing in progress: decide whether to start an RPC.
        below_low_water = (current_water_days() <= prefs.low_water_days);
        if (below_low_water && some_project_rpc_ok()) {
            // Low on work and at least one project may be contacted:
            // rank the projects and start fetching work.
            compute_resource_debts();
            scheduler_op->init_get_work();
            action = true;
        } else {
            // Otherwise report finished results if some project has any.
            p = find_project_with_overdue_results();
            if (p) {
                compute_resource_debts();
                // Ask for work in the same RPC only if this project is
                // first in debt order.
                if (p->debt_order == 0) {
                    work_secs = work_needed_secs();
                } else {
                    work_secs = 0;
                }
                scheduler_op->init_return_results(p, work_secs);
                action = true;
            }
        }
        break;
    default:
        // An operation is in progress: advance it, and report activity
        // if it has just finished.
        scheduler_op->poll();
        if (scheduler_op->state == SCHEDULER_OP_STATE_IDLE) {
            action = true;
        }
        break;
    }
    return action;
}

// Parse the reply from a scheduler
//
// project: project the reply belongs to; updated in place.
// scheduler_url: written into the preferences file when the reply carries
//     newer preferences.
//
// Reads SCHED_OP_RESULT_FILE into a SCHEDULER_REPLY, copies account and
// credit fields to the project, applies any request delay and host ID,
// stores newer preferences, handles a new code-signing key, and merges
// the reply's entities into the client state.
//
// NOTE(review): several spans of this function are truncated in the text
// under review (see the notes below). The code is reproduced as seen and
// will not compile until those spans are restored from the authoritative
// copy.
//
void CLIENT_STATE::handle_scheduler_reply(
    PROJECT* project, char* scheduler_url
) {
    SCHEDULER_REPLY sr;
    FILE* f;
    int retval;
    unsigned int i;
    bool signature_valid;

    contacted_sched_server = true;
    if (log_flags.sched_op_debug) {
        // Echo the raw reply to stdout.
        // NOTE(review): the fopen() result is used without a null check.
        f = fopen(SCHED_OP_RESULT_FILE, "r");
        printf("------------- SCHEDULER REPLY ----------\n");
        copy_stream(f, stdout);
        fclose(f);
        printf("------------- END ----------\n");
    }

    // NOTE(review): the fopen() result is used without a null check, and
    // the value returned by sr.parse() is never examined.
    f = fopen(SCHED_OP_RESULT_FILE, "r");
    retval = sr.parse(f);
    fclose(f);

    // Only overwrite the stored names when the reply supplies non-empty ones.
    // NOTE(review): strcpy() here is unbounded; destination sizes are not
    // visible from this file.
    if (strlen(sr.project_name)) {
        strcpy(project->project_name, sr.project_name);
    }
    if (strlen(sr.user_name)) {
        strcpy(project->user_name, sr.user_name);
    }
    project->total_credit = sr.total_credit;
    project->expavg_credit = sr.expavg_credit;
    if (strlen(sr.message)) {
        show_message(sr.message, sr.message_priority);
    }

    // A nonzero delay pushes back the earliest time of the next RPC.
    if (sr.request_delay) {
        project->min_rpc_time = time(0) + sr.request_delay;
    }

    // A nonzero host ID in the reply replaces ours and resets the
    // RPC sequence number.
    if (sr.hostid) {
        project->hostid = sr.hostid;
        project->rpc_seqno = 0;
    }

    // if the scheduler reply includes preferences
    // that are newer than what we have on disk, write them to disk
    //
    if (sr.prefs_mod_time > prefs.mod_time) {
        // NOTE(review): the fopen() result is used without a null check.
        // NOTE(review): the XML element names appear to be missing from the
        // literals below in the text under review.
        f = fopen(PREFS_FILE_NAME, "w");
        fprintf(f,
            "\n"
            " %d\n"
            " %s\n"
            " %s\n",
            sr.prefs_mod_time,
            project->master_url,
            scheduler_url
        );
        fputs(sr.prefs_xml, f);
        fprintf(f,
            "\n"
        );
        fclose(f);
        // Re-read preferences from the file just written.
        prefs.parse_file();
    }

    // if the scheduler reply includes a code-signing key,
    // accept it if we don't already have one from the project.
    // Otherwise verify its signature, using the key we already have.
    //
    if (sr.code_sign_key) {
        if (!project->code_sign_key) {
            project->code_sign_key = strdup(sr.code_sign_key);
        } else {
            if (sr.code_sign_key_signature) {
                retval = verify_string2(
                    sr.code_sign_key, sr.code_sign_key_signature,
                    project->code_sign_key, signature_valid
                );
                // Replace the stored key only if verification ran without
                // error and reported a valid signature.
                if (!retval && signature_valid) {
                    free(project->code_sign_key);
                    project->code_sign_key = strdup(sr.code_sign_key);
                } else {
                    fprintf(stdout,
                        "New code signing key from %s doesn't validate\n",
                        project->project_name
                    );
                }
            } else {
                fprintf(stdout, "Missing code sign key signature\n");
            }
        }
    }

    // copy new entities to client state
    //
    // NOTE(review): the text from the first loop header up to the workunit
    // version assignment is missing in the text under review; one or more
    // whole loops may have been lost here. Restore from the authoritative
    // copy.
    for (i=0; iversion_num = latest_version_num(wup->app_name);
            retval = link_workunit(project, wup);
            if (!retval) {
                workunits.push_back(wup);
            }
        }
    }
    // NOTE(review): loop header and body truncated here as well; only the
    // assignment of RESULT_NEW survives.
    for (i=0; istate = RESULT_NEW;
        }
    }

    // update records for ack'ed results
    //
    // NOTE(review): loop header truncated; from what remains, a result that
    // is found is marked RESULT_SERVER_ACK, and an ack that can't be
    // matched is reported on stderr.
    for (i=0; istate = RESULT_SERVER_ACK;
        } else {
            fprintf(stderr,
                "ERROR: got ack for result %s, can't find\n",
                sr.result_acks[i].name
            );
        }
    }
    set_client_state_dirty("handle_scheduler_reply");
    if (log_flags.state_debug) {
        printf("State after handle_scheduler_reply():\n");
        print_counts();
    }
}