Main Page   Class Hierarchy   Data Structures   File List   Data Fields   Globals  

multiscatter3_pvm.C

Go to the documentation of this file.
00001 
00002 #include "sigma3.h"
00003 #include "pvm_scatt.h"
00004 
00005 #ifdef HAS_PVM
00006 
00007 // Note: ALL the PVM code for sigma3 is contained in this file.
00008 
00009 #include "pvm3.h"
00010 
00011 //----------------------------------------------------------------------------
00012 
00013 // Externally visible functions (defined at end):
00014 
00015 void initialize_processors(int ntask, int debug);
00016 void terminate_processors(int debug);
00017 int multiscatter3(scatter_profile & prof, sigma_out & out,
00018                   real rho_sq_min, real rho_sq_max, int rho_zone,
00019                   real dt_snap, real snap_cube_size,
00020                   real cpu_time_check, real cpu_init, real &cpu_save,
00021                   int& scatt_total, real& cpu_total, stat_fp acc_stats,
00022                   int debug, int scatter_summary_flag);
00023 
00024 //----------------------------------------------------------------------------
00025 
00026 #define RECV_TIMEOUT    3600            // time out on receive after 1 hour
00027 
00028 // Global variables (note capitalization):
00029 
00030 static char Exe[128];                   // slave process name
00031 
00032 #define NTASK_MAX       16              // maximum number of tasks to start
00033 
00034 static int  Tid[NTASK_MAX];             // TID of each task
00035 static int  Nscatt[NTASK_MAX];          // # scatterings performed by each task
00036 static real Cpu[NTASK_MAX];             // total CPU time consumed by each task
00037 static char Name[NTASK_MAX][128];       // host name of each task
00038 
00039 static int  Random_seed[NTASK_MAX];     // last random seed used for each task
00040 static initial_state3 Init[NTASK_MAX];  // last init sent to each task
00041 
00042 static int  N_task_init = 0;            // task # runs from 0 to N_task_init-1
00043 
00044 //----------------------------------------------------------------------------
00045 
00046 local int task_num(int task_id)
00047 {
00048     for (int task = 0; task < N_task_init; task++)
00049         if (Tid[task] == task_id) return task;
00050     return -1;
00051 }
00052 
00053 local char* host_name(int task_id)
00054 {
00055     int task = task_num(task_id);
00056     if (task >= 0)
00057         return Name[task];
00058     else
00059         return NULL;
00060 }
00061 
00062 local void initialize_task(int task, int reinit, int io)
00063 {
00064     if (reinit) {
00065 
00066       // Start a new task on a specific processor.
00067 
00068       if ( pvm_spawn(Exe, (char**)0, 1,
00069                      Name[task], 1, Tid+task) != 1) {
00070           cerr << "Unable to re-spawn task #" << task << endl;
00071           exit(1);
00072       }
00073 
00074     } else {
00075 
00076         // Start a brand new task on any available processor.
00077       
00078       if ( pvm_spawn(Exe, (char**)0, 33, ".", 1, Tid+task) != 1) {
00079           cerr << "Unable to spawn task #" << task << endl;
00080           exit(1);
00081       }
00082 
00083     }
00084 
00085     if (pvm_recv(Tid[task], HANDSHAKE_MSG) < 0) {
00086         cerr << "Error getting new ID for task #" << task << endl;
00087         exit(1);
00088     }
00089 
00090     pvm_upkstr(Name[task]);
00091 
00092     if (io) {
00093         cerr << (reinit ? "Rei" : "I");
00094         cerr << "nitialized PVM task #" << task
00095              <<",  TID = " << Tid[task]
00096              << ",  node = " << Name[task] << endl << flush;
00097     }
00098 
00099     // Seek notification on task exit (including fatal error).
00100 
00101     pvm_notify(PvmTaskExit, FAILURE_MSG, 1, Tid+task);
00102 
00103 }
00104 
00105 local void initialize_scattering(scatter_profile & prof,
00106                                  real rho_sq_min, real rho_sq_max,
00107                                  int tid, int scatter_summary,
00108                                  real cpu_time_check,
00109                                  real dt_snap, real snap_cube_size)
00110 {
00111     int task = task_num(tid);
00112     single_scatter_init(prof, rho_sq_min, rho_sq_max,
00113                         Init[task], Random_seed[task],
00114                         scatter_summary, dt_snap, snap_cube_size);
00115 
00116     // Pack the data.
00117 
00118     pvm_initsend(PvmDataRaw);
00119 
00120     pvm_pkint(Random_seed+task, 1, 1);
00121     pack(Init[task]);
00122 
00123     pvm_pkdouble(&cpu_time_check, 1, 1);
00124     pvm_pkdouble(&dt_snap, 1, 1);
00125     pvm_pkdouble(&snap_cube_size, 1, 1);
00126 
00127     // Send the data as message SEND_DATA_MSG.
00128 
00129     pvm_send(tid, SEND_DATA_MSG);
00130 
00131     if (scatter_summary > 0)
00132         cerr << "task " << task << ":  sent data to " << tid
00133              << " (" << host_name(tid) << ")\n" << flush;
00134     
00135 }
00136 
00137 local void closeout_tasks(scatter_profile & prof,
00138                           sigma_out & out, int rho_zone,
00139                           real dt_snap, real snap_cube_size,
00140                           stat_fp acc_stats, int debug,
00141                           int n_sent, int & n_complete,
00142                           intermediate_state3 & inter,
00143                           final_state3 & final)
00144 {
00145     if (n_sent < out.trials_per_zone) {
00146 
00147         terminate_processors(debug);
00148         pvm_exit();
00149         cerr << "Unable to continue -- cannot identify failed task\n";
00150         err_exit("multiscatter3: calculation aborted.");
00151 
00152     } else {
00153 
00154         // Close out all remaining scatterings and add the
00155         // appropriate number of errors into the statistics.
00156 
00157         // Note: only the structure elements listed here are
00158         // necessary in the case of an error.
00159 
00160         inter.descriptor = unknown_intermediate;
00161         inter.n_osc = 0;
00162 
00163         final.descriptor = error;
00164         final.time = 0;
00165         final.error = 0;
00166         final.n_steps = 0;
00167 
00168         cerr << n_sent - n_complete << " uncompleted tasks:\n";
00169 
00170         for (int task = 0; task < N_task_init; task++)
00171 
00172             if (Random_seed[task] > 0) {
00173 
00174                 summarize_scattering_initial(Init[task],
00175                                              Random_seed[task],
00176                                              dt_snap, snap_cube_size);
00177 
00178                 // Close out this task by accumulating a (fake)
00179                 // error status in the statistics.
00180 
00181                 single_scatter_stats(prof, Init[task], inter, final,
00182                                      rho_zone,
00183                                      out, acc_stats, 0);
00184 
00185                 Random_seed[task] = 0;
00186                 n_complete++;
00187 
00188                 // The task still hasn't completed, so it will not be
00189                 // usable for further scatterings.  Kill and restart it here.
00190 
00191                 // Keep the old task number, cpu, and nscatt.
00192                 // Modify only the TID.
00193 
00194                 // Note that the TID will apparently be garbled after
00195                 // we kill the task, so we can't verify the source of a
00196                 // FAILURE_MSG signal.  Solve this problem by identifying
00197                 // the notification with a different message ID.
00198 
00199                 pvm_notify(PvmTaskExit, KILL_CONFIRM, 1, Tid+task);
00200 
00201                 if (pvm_kill(Tid[task]) < 0) {
00202 
00203                     cerr << "Unable to kill task #" << task << endl;
00204                     exit(1);
00205 
00206                 } else {
00207 
00208                     cerr << "Task #" << task << " ("
00209                          << Tid[task] << ", node " << Name[task]
00210                          << ") killed..." << flush;
00211 
00212                 }
00213 
00214                 // Wait for confirmation of the kill.
00215 
00216                 struct timeval timeout;
00217                 timeout.tv_sec = 600;           // Give up after 10 minutes
00218                 int bufid;
00219 
00220                 // Return values for bufid:     < 0 ==> error
00221                 //                              = 0 ==> timeout?      <--- ???
00222                 //                              > 0 ==> success
00223 
00224                 if ( (bufid = pvm_trecv(-1, KILL_CONFIRM, &timeout)) <= 0 ) {
00225 
00226                     if (bufid < 0)
00227                         cerr << "error receiving confirmation.\n";
00228                     else
00229                         cerr << "timeout receiving confirmation.\n";
00230 
00231                     exit(1);
00232 
00233                 } else {
00234 
00235                     int tid = -1;
00236 
00237                     if (pvm_upkint(&tid, 1, 1) < 0) {
00238                         cerr << "error unpacking confirmation message.\n";
00239                         exit(1);
00240                     } else
00241                         cerr << "confirmation received from TID "
00242                              << tid << ".\n";
00243 
00244                 }
00245 
00246                 // Wait for 10 us before continuing.
00247 
00248                 starlab_wait(10);
00249 
00250                 // Flush spurious messages (apparently coming from
00251                 // deleted tasks).
00252 
00253                 int n_flush = 0;
00254                 while ((bufid = pvm_probe(-1, FAILURE_MSG)) > 0) {
00255 
00256                     pvm_recv(-1, FAILURE_MSG);
00257 
00258                     int tid = -1;
00259                     if (pvm_upkint(&tid, 1, 1) == 0) {
00260                         if (tid != Tid[task]) {
00261                             cerr << "..." << tid;
00262                             n_flush++;
00263                         }
00264                     }
00265 
00266                 }
00267 
00268                 if (n_flush > 0)
00269                     cerr << ":  " << n_flush << " unexpected failure message"
00270                          << (n_flush > 1 ? "s" : "")
00271                          << " flushed\n";
00272 
00273                 // Start up a new task.
00274 
00275                 initialize_task(task, 1, 1);
00276 
00277             }
00278 
00279         if (n_complete != n_sent)
00280             cerr << "warning: " << n_sent - n_complete
00281                 << " tasks unaccounted for...\n";
00282     }
00283 }
00284 
00285 //----------------------------------------------------------------------------
00286 
00287 // Externally visible functions:
00288 
00289 void initialize_processors(int ntask, int debug)
00290 {
00291     // Initialize up to NTASK_MAX scattering processes.
00292     // Should really determine the number of processors available.
00293 
00294     N_task_init = min(NTASK_MAX, ntask);
00295 
00296     if (getenv("PVM_ROOT") == NULL) err_exit("PVM not available"); // Run time
00297                                                                    // check...
00298 
00299     strcpy(Exe, getenv("STARLAB_PATH"));        // ASSUME that these
00300     if (Exe[0] == '\0')                         // environment variables
00301         strcpy(Exe, getenv("PWD"));             // are defined
00302     else
00303         strcat(Exe, "/bin");
00304     strcat(Exe, "/scatter3_slave.pvm");
00305 
00306     // cerr << "slave executable name is " << Exe << endl;
00307 
00308     for (int task = 0; task < N_task_init; task++) {
00309 
00310         initialize_task(task, 0, 0);
00311 
00312         Cpu[task] = 0;
00313         Nscatt[task] = 0;
00314 
00315     }
00316 
00317     // On exit, we have established N_task_init processes, which are now
00318     // waiting for data.
00319 
00320     cerr << N_task_init << " PVM slave processes initialized\n";
00321 
00322     if (debug) {
00323         cerr << "\nhost names:";
00324         for (int i = 0; i < N_task_init; i += 4) {
00325             for (int task = i; task < min(i+4, N_task_init); task++)
00326                 cerr << "  " << Name[task];
00327             cerr << endl;
00328             if (i < N_task_init - 4) cerr << "           ";
00329         }
00330 
00331         cerr << "\nhost TIDs:";
00332         for (int i = 0; i < N_task_init; i += 4) {
00333             for (int task = i; task < min(i+4, N_task_init); task++)
00334                 cerr << "  " << Tid[task];
00335             cerr << endl;
00336             if (i < N_task_init - 4) cerr << "          ";
00337         }
00338     }
00339 
00340 }
00341 
00342 void terminate_processors(int debug)
00343 {
00344     for (int task = 0; task < N_task_init; task++) {
00345 
00346         pvm_initsend(PvmDataRaw);
00347 
00348         // Send message TERMINATE_MSG to terminate.
00349 
00350         if (pvm_send(Tid[task], TERMINATE_MSG) < 0)
00351             cerr << "Error sending termination message to task = "
00352                  << Tid[task] << "  (" << Name[task] << ")\n";
00353 
00354     }
00355 
00356     cerr << N_task_init << " PVM slave processes terminated\n";
00357 
00358     if (debug) {
00359 
00360         // Print use and load information.
00361 
00362         cerr << "\nscatterings per task:  ";
00363         for (int i = 0; i < N_task_init; i += 4) {
00364             for (int task = i; task < min(i+4, N_task_init); task++)
00365                 cerr << "  " << Nscatt[task];
00366             cerr << endl;
00367             if (i < N_task_init-4) cerr << "                       ";
00368         }
00369 
00370         cerr << "\nCPU %load distribution:";
00371         int p = cerr.precision(4);              // nonstandard precision
00372 
00373         real total = 0;
00374         for (int task = 0; task < N_task_init; task++) total += Cpu[task];
00375         for (int i = 0; i < N_task_init; i += 4) {
00376             for (int task = i; task < min(i+4, N_task_init); task++)
00377                 cerr << "  " << 100*Cpu[task]/total;
00378             cerr << endl;
00379             if (i < N_task_init-4) cerr << "                       ";
00380         }
00381         cerr.precision(p);
00382     }
00383 
00384     N_task_init = 0;
00385     pvm_exit();
00386 }
00387 
00388 // multiscatter3_pvm: Perform a specified number of scattering experiments in
00389 //                    a given impact parameter range, accumulating statistics
00390 //                    as we go.  Return the total number of hits in this range.
00391 //
00392 //                    Distribute the task over several processors using PVM.
00393 //
00394 // NOTE that this function is called multiscatter3, the same as the non-PVM
00395 // version.  It is designed to replace that version in its entirety...
00396 
00397 int multiscatter3(scatter_profile & prof, sigma_out & out,
00398                   real rho_sq_min, real rho_sq_max, int rho_zone,
00399                   real dt_snap, real snap_cube_size,
00400                   real cpu_time_check, real cpu_init, real &cpu_save,
00401                   int& scatt_total, real& cpu_total, stat_fp acc_stats,
00402                   int debug, int scatter_summary_flag)
00403 {
00404     int n_sent = 0, n_complete = 0;
00405     int total_hits = 0;
00406 
00407     if (N_task_init <= 0) err_exit("PVM not initialized.");
00408 
00409     // Start N_task_init scatterings.
00410 
00411     for (int task = 0; task < N_task_init; task++) {
00412 
00413         // Get the next scattering setup.
00414 
00415         // The function single_scatter_init will randomize the angles
00416         // in the same way as the standalone tool scatter3.
00417 
00418         initialize_scattering(prof, rho_sq_min, rho_sq_max, Tid[task],
00419                               scatter_summary_flag, cpu_time_check,
00420                               dt_snap, snap_cube_size);
00421         n_sent++;
00422     }
00423 
00424     // Now wait for data to come back and send new data as needed.
00425 
00426     int single_result;
00427     real cpu_scatter;
00428 
00429     while (n_complete < out.trials_per_zone) {
00430 
00431         int n_rand;
00432         initial_state3 init;
00433         intermediate_state3 inter;
00434         final_state3 final;
00435 
00436         int scatter_summary = scatter_summary_flag;
00437 
00438         struct timeval timeout;
00439         timeout.tv_sec = RECV_TIMEOUT;
00440         int bufid, length, msgid, tid_r;
00441 
00442         int task;
00443 
00444         // Return values for bufid:     < 0 ==> error
00445         //                              = 0 ==> timeout?        <--- ???
00446         //                              > 0 ==> success
00447 
00448         if ( (bufid = pvm_trecv(-1, -1, &timeout)) <= 0 ) {
00449 
00450             // A timeout or a system error has occurred.  Go straight to
00451             // the output phase if all data have been sent (i.e. don't wait
00452             // any longer for the remaining data to return).  Otherwise,
00453             // quit with a error.
00454 
00455             if (bufid < 0)
00456                 cerr << "\nError receiving data...\n";
00457             else
00458                 cerr << "\nTimeout receiving data...\n";
00459 
00460             // We cannot identify the failed task -- do the best we can.
00461 
00462             closeout_tasks(prof, out, rho_zone,
00463                            dt_snap, snap_cube_size,
00464                            acc_stats, debug,
00465                            n_sent, n_complete, inter, final);
00466             break;
00467 
00468         }
00469 
00470         length = msgid = tid_r = -1;
00471         int status = pvm_bufinfo(bufid, &length, &msgid, &tid_r);
00472         bool pvm_error = (status < 0 || length <= 0 || msgid < 0 || tid_r < 0);
00473 
00474         if (pvm_error) {
00475 
00476             cerr << "\nUnknown or illegal message received...\n";
00477 
00478             // Basic diagnostics:
00479 
00480             PRL(bufid);
00481             PRL(status);
00482             PRL(length);
00483             PRL(msgid);
00484             PRL(tid_r);
00485 
00486             PRL(n_sent);
00487             PRL(n_complete);
00488 
00489             closeout_tasks(prof, out, rho_zone,
00490                            dt_snap, snap_cube_size,
00491                            acc_stats, debug,
00492                            n_sent, n_complete, inter, final);
00493             break;
00494 
00495         }
00496 
00497         if (msgid == RETURN_DATA_MSG || msgid == FAILURE_MSG) {
00498 
00499             // Check for the failure of a child process.
00500 
00501             if (msgid == FAILURE_MSG) {
00502 
00503                 // A slave process has unexpectedly exited.  Print a message
00504                 // and start a new process.
00505 
00506                 pvm_upkint(&tid_r, 1, 1);
00507 
00508                 cerr << "\nTask " << tid_r << " (" << host_name(tid_r)
00509                      << ") has terminated prematurely.\n";
00510 
00511                 task = task_num(tid_r);
00512 
00513                 // Recover the initial conditions for use below.
00514 
00515                 n_rand = Random_seed[task];
00516                 init = Init[task];
00517 
00518                 // Start a new slave process.  (Keep old task number,
00519                 // cpu, and nscatt.  Modify only TID.)
00520 
00521                 initialize_task(task, 1, 1);
00522 
00523                 // Do NOT start a new scattering.  Instead, create a fake
00524                 // "error" return state.  (Only the structure elements
00525                 // listed here are necessary in the case of an error.)
00526 
00527                 inter.descriptor = unknown_intermediate;
00528                 inter.n_osc = 0;
00529 
00530                 final.descriptor = error;
00531                 final.time = 0;
00532                 final.error = 0;
00533                 final.n_steps = 0;
00534 
00535                 single_result = 0;
00536                 cpu_scatter = 0;
00537 
00538                 pvm_error = true;
00539 
00540             } else {
00541 
00542                 // Normal completion of a slave subtask.
00543 
00544                 task = task_num(tid_r);
00545 
00546                 if (scatter_summary > 0)
00547                     cerr << "\nreceived data from TID = " << tid_r
00548                          << " (" << Name[task] << ")\n" << flush;
00549 
00550                 pvm_upkint(&n_rand, 1, 1);
00551 
00552                 // Note: n_rand should be Random_seed[task],
00553                 //       init should be Init[task].
00554 
00555                 unpack(init);
00556                 unpack(inter);
00557                 unpack(final);
00558 
00559                 pvm_upkint(&single_result, 1, 1);
00560                 pvm_upkdouble(&cpu_scatter, 1, 1);
00561 
00562             }
00563 
00564             n_complete++;
00565             total_hits += single_result;
00566             scatt_total++;
00567             cpu_total += cpu_scatter;
00568 
00569             Nscatt[task]++;
00570             Cpu[task] += cpu_scatter;
00571 
00572             // Diagnostics:
00573 
00574             if (abs(debug) > 2) {
00575                 int p = cerr.precision(STD_PRECISION);
00576                 cerr << "single_scatter: rho_max^2 = " << rho_sq_max
00577                      << " ; returning with n_hit = " << single_result
00578                      << " and n_hit_tot = " << out.n_hit_tot + single_result
00579                      << endl;
00580                 cerr.precision(p);
00581             }
00582 
00583             if (final.descriptor == error) {
00584 
00585                 // Print out enough information to repeat the calculation.
00586 
00587                 summarize_scattering_initial(init, n_rand,
00588                                              dt_snap, snap_cube_size);
00589                 scatter_summary = 2;
00590                 if (pvm_error) cerr << "final state unknown\n";
00591 
00592             }
00593 
00594             if (!pvm_error && scatter_summary > 0)
00595                 summarize_scattering_final(inter, final,
00596                                            scatter_summary, cpu_scatter);
00597 
00598             // Start a new process, if necessary.
00599 
00600             if (n_sent < out.trials_per_zone) {
00601 
00602                 initialize_scattering(prof, rho_sq_min, rho_sq_max, tid_r,
00603                                       scatter_summary_flag, cpu_time_check,
00604                                       dt_snap, snap_cube_size);
00605                 n_sent++;
00606 
00607             } else {
00608 
00609                 // Don't terminate any processes here -- handled elsewhere.
00610                 // Do set Random_seed = 0 to signify that this task is done.
00611 
00612                 Random_seed[task] = 0;
00613             }
00614 
00615         } else {
00616 
00617             cerr << "unexpected PVM message ID = "
00618                  << msgid << " received" << endl;
00619 
00620             err_exit("multiscatter3: fatal error");
00621 
00622         }
00623 
00624         // Accumulate statistics on the result.
00625 
00626         single_scatter_stats(prof, init, inter, final, rho_zone,
00627                              out, acc_stats, single_result);
00628 
00629     }
00630 
00631     return total_hits;
00632 
00633 }
00634 
00635 #endif

Generated at Sun Feb 24 09:57:10 2002 for STARLAB by doxygen1.2.6 written by Dimitri van Heesch, © 1997-2001