00001
#include "sigma3.h"
#include "pvm_scatt.h"

#include <stdio.h>
00004
00005 #ifdef HAS_PVM
00006
00007
00008
00009 #include "pvm3.h"
00010
00011
00012
00013
00014
00015 void initialize_processors(int ntask, int debug);
00016 void terminate_processors(int debug);
00017 int multiscatter3(scatter_profile & prof, sigma_out & out,
00018 real rho_sq_min, real rho_sq_max, int rho_zone,
00019 real dt_snap, real snap_cube_size,
00020 real cpu_time_check, real cpu_init, real &cpu_save,
00021 int& scatt_total, real& cpu_total, stat_fp acc_stats,
00022 int debug, int scatter_summary_flag);
00023
00024
00025
00026 #define RECV_TIMEOUT 3600 // time out on receive after 1 hour
00027
00028
00029
00030 static char Exe[128];
00031
00032 #define NTASK_MAX 16 // maximum number of tasks to start
00033
00034 static int Tid[NTASK_MAX];
00035 static int Nscatt[NTASK_MAX];
00036 static real Cpu[NTASK_MAX];
00037 static char Name[NTASK_MAX][128];
00038
00039 static int Random_seed[NTASK_MAX];
00040 static initial_state3 Init[NTASK_MAX];
00041
00042 static int N_task_init = 0;
00043
00044
00045
00046 local int task_num(int task_id)
00047 {
00048 for (int task = 0; task < N_task_init; task++)
00049 if (Tid[task] == task_id) return task;
00050 return -1;
00051 }
00052
00053 local char* host_name(int task_id)
00054 {
00055 int task = task_num(task_id);
00056 if (task >= 0)
00057 return Name[task];
00058 else
00059 return NULL;
00060 }
00061
00062 local void initialize_task(int task, int reinit, int io)
00063 {
00064 if (reinit) {
00065
00066
00067
00068 if ( pvm_spawn(Exe, (char**)0, 1,
00069 Name[task], 1, Tid+task) != 1) {
00070 cerr << "Unable to re-spawn task #" << task << endl;
00071 exit(1);
00072 }
00073
00074 } else {
00075
00076
00077
00078 if ( pvm_spawn(Exe, (char**)0, 33, ".", 1, Tid+task) != 1) {
00079 cerr << "Unable to spawn task #" << task << endl;
00080 exit(1);
00081 }
00082
00083 }
00084
00085 if (pvm_recv(Tid[task], HANDSHAKE_MSG) < 0) {
00086 cerr << "Error getting new ID for task #" << task << endl;
00087 exit(1);
00088 }
00089
00090 pvm_upkstr(Name[task]);
00091
00092 if (io) {
00093 cerr << (reinit ? "Rei" : "I");
00094 cerr << "nitialized PVM task #" << task
00095 <<", TID = " << Tid[task]
00096 << ", node = " << Name[task] << endl << flush;
00097 }
00098
00099
00100
00101 pvm_notify(PvmTaskExit, FAILURE_MSG, 1, Tid+task);
00102
00103 }
00104
00105 local void initialize_scattering(scatter_profile & prof,
00106 real rho_sq_min, real rho_sq_max,
00107 int tid, int scatter_summary,
00108 real cpu_time_check,
00109 real dt_snap, real snap_cube_size)
00110 {
00111 int task = task_num(tid);
00112 single_scatter_init(prof, rho_sq_min, rho_sq_max,
00113 Init[task], Random_seed[task],
00114 scatter_summary, dt_snap, snap_cube_size);
00115
00116
00117
00118 pvm_initsend(PvmDataRaw);
00119
00120 pvm_pkint(Random_seed+task, 1, 1);
00121 pack(Init[task]);
00122
00123 pvm_pkdouble(&cpu_time_check, 1, 1);
00124 pvm_pkdouble(&dt_snap, 1, 1);
00125 pvm_pkdouble(&snap_cube_size, 1, 1);
00126
00127
00128
00129 pvm_send(tid, SEND_DATA_MSG);
00130
00131 if (scatter_summary > 0)
00132 cerr << "task " << task << ": sent data to " << tid
00133 << " (" << host_name(tid) << ")\n" << flush;
00134
00135 }
00136
00137 local void closeout_tasks(scatter_profile & prof,
00138 sigma_out & out, int rho_zone,
00139 real dt_snap, real snap_cube_size,
00140 stat_fp acc_stats, int debug,
00141 int n_sent, int & n_complete,
00142 intermediate_state3 & inter,
00143 final_state3 & final)
00144 {
00145 if (n_sent < out.trials_per_zone) {
00146
00147 terminate_processors(debug);
00148 pvm_exit();
00149 cerr << "Unable to continue -- cannot identify failed task\n";
00150 err_exit("multiscatter3: calculation aborted.");
00151
00152 } else {
00153
00154
00155
00156
00157
00158
00159
00160 inter.descriptor = unknown_intermediate;
00161 inter.n_osc = 0;
00162
00163 final.descriptor = error;
00164 final.time = 0;
00165 final.error = 0;
00166 final.n_steps = 0;
00167
00168 cerr << n_sent - n_complete << " uncompleted tasks:\n";
00169
00170 for (int task = 0; task < N_task_init; task++)
00171
00172 if (Random_seed[task] > 0) {
00173
00174 summarize_scattering_initial(Init[task],
00175 Random_seed[task],
00176 dt_snap, snap_cube_size);
00177
00178
00179
00180
00181 single_scatter_stats(prof, Init[task], inter, final,
00182 rho_zone,
00183 out, acc_stats, 0);
00184
00185 Random_seed[task] = 0;
00186 n_complete++;
00187
00188
00189
00190
00191
00192
00193
00194
00195
00196
00197
00198
00199 pvm_notify(PvmTaskExit, KILL_CONFIRM, 1, Tid+task);
00200
00201 if (pvm_kill(Tid[task]) < 0) {
00202
00203 cerr << "Unable to kill task #" << task << endl;
00204 exit(1);
00205
00206 } else {
00207
00208 cerr << "Task #" << task << " ("
00209 << Tid[task] << ", node " << Name[task]
00210 << ") killed..." << flush;
00211
00212 }
00213
00214
00215
00216 struct timeval timeout;
00217 timeout.tv_sec = 600;
00218 int bufid;
00219
00220
00221
00222
00223
00224 if ( (bufid = pvm_trecv(-1, KILL_CONFIRM, &timeout)) <= 0 ) {
00225
00226 if (bufid < 0)
00227 cerr << "error receiving confirmation.\n";
00228 else
00229 cerr << "timeout receiving confirmation.\n";
00230
00231 exit(1);
00232
00233 } else {
00234
00235 int tid = -1;
00236
00237 if (pvm_upkint(&tid, 1, 1) < 0) {
00238 cerr << "error unpacking confirmation message.\n";
00239 exit(1);
00240 } else
00241 cerr << "confirmation received from TID "
00242 << tid << ".\n";
00243
00244 }
00245
00246
00247
00248 starlab_wait(10);
00249
00250
00251
00252
00253 int n_flush = 0;
00254 while ((bufid = pvm_probe(-1, FAILURE_MSG)) > 0) {
00255
00256 pvm_recv(-1, FAILURE_MSG);
00257
00258 int tid = -1;
00259 if (pvm_upkint(&tid, 1, 1) == 0) {
00260 if (tid != Tid[task]) {
00261 cerr << "..." << tid;
00262 n_flush++;
00263 }
00264 }
00265
00266 }
00267
00268 if (n_flush > 0)
00269 cerr << ": " << n_flush << " unexpected failure message"
00270 << (n_flush > 1 ? "s" : "")
00271 << " flushed\n";
00272
00273
00274
00275 initialize_task(task, 1, 1);
00276
00277 }
00278
00279 if (n_complete != n_sent)
00280 cerr << "warning: " << n_sent - n_complete
00281 << " tasks unaccounted for...\n";
00282 }
00283 }
00284
00285
00286
00287
00288
00289 void initialize_processors(int ntask, int debug)
00290 {
00291
00292
00293
00294 N_task_init = min(NTASK_MAX, ntask);
00295
00296 if (getenv("PVM_ROOT") == NULL) err_exit("PVM not available");
00297
00298
00299 strcpy(Exe, getenv("STARLAB_PATH"));
00300 if (Exe[0] == '\0')
00301 strcpy(Exe, getenv("PWD"));
00302 else
00303 strcat(Exe, "/bin");
00304 strcat(Exe, "/scatter3_slave.pvm");
00305
00306
00307
00308 for (int task = 0; task < N_task_init; task++) {
00309
00310 initialize_task(task, 0, 0);
00311
00312 Cpu[task] = 0;
00313 Nscatt[task] = 0;
00314
00315 }
00316
00317
00318
00319
00320 cerr << N_task_init << " PVM slave processes initialized\n";
00321
00322 if (debug) {
00323 cerr << "\nhost names:";
00324 for (int i = 0; i < N_task_init; i += 4) {
00325 for (int task = i; task < min(i+4, N_task_init); task++)
00326 cerr << " " << Name[task];
00327 cerr << endl;
00328 if (i < N_task_init - 4) cerr << " ";
00329 }
00330
00331 cerr << "\nhost TIDs:";
00332 for (int i = 0; i < N_task_init; i += 4) {
00333 for (int task = i; task < min(i+4, N_task_init); task++)
00334 cerr << " " << Tid[task];
00335 cerr << endl;
00336 if (i < N_task_init - 4) cerr << " ";
00337 }
00338 }
00339
00340 }
00341
00342 void terminate_processors(int debug)
00343 {
00344 for (int task = 0; task < N_task_init; task++) {
00345
00346 pvm_initsend(PvmDataRaw);
00347
00348
00349
00350 if (pvm_send(Tid[task], TERMINATE_MSG) < 0)
00351 cerr << "Error sending termination message to task = "
00352 << Tid[task] << " (" << Name[task] << ")\n";
00353
00354 }
00355
00356 cerr << N_task_init << " PVM slave processes terminated\n";
00357
00358 if (debug) {
00359
00360
00361
00362 cerr << "\nscatterings per task: ";
00363 for (int i = 0; i < N_task_init; i += 4) {
00364 for (int task = i; task < min(i+4, N_task_init); task++)
00365 cerr << " " << Nscatt[task];
00366 cerr << endl;
00367 if (i < N_task_init-4) cerr << " ";
00368 }
00369
00370 cerr << "\nCPU %load distribution:";
00371 int p = cerr.precision(4);
00372
00373 real total = 0;
00374 for (int task = 0; task < N_task_init; task++) total += Cpu[task];
00375 for (int i = 0; i < N_task_init; i += 4) {
00376 for (int task = i; task < min(i+4, N_task_init); task++)
00377 cerr << " " << 100*Cpu[task]/total;
00378 cerr << endl;
00379 if (i < N_task_init-4) cerr << " ";
00380 }
00381 cerr.precision(p);
00382 }
00383
00384 N_task_init = 0;
00385 pvm_exit();
00386 }
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397 int multiscatter3(scatter_profile & prof, sigma_out & out,
00398 real rho_sq_min, real rho_sq_max, int rho_zone,
00399 real dt_snap, real snap_cube_size,
00400 real cpu_time_check, real cpu_init, real &cpu_save,
00401 int& scatt_total, real& cpu_total, stat_fp acc_stats,
00402 int debug, int scatter_summary_flag)
00403 {
00404 int n_sent = 0, n_complete = 0;
00405 int total_hits = 0;
00406
00407 if (N_task_init <= 0) err_exit("PVM not initialized.");
00408
00409
00410
00411 for (int task = 0; task < N_task_init; task++) {
00412
00413
00414
00415
00416
00417
00418 initialize_scattering(prof, rho_sq_min, rho_sq_max, Tid[task],
00419 scatter_summary_flag, cpu_time_check,
00420 dt_snap, snap_cube_size);
00421 n_sent++;
00422 }
00423
00424
00425
00426 int single_result;
00427 real cpu_scatter;
00428
00429 while (n_complete < out.trials_per_zone) {
00430
00431 int n_rand;
00432 initial_state3 init;
00433 intermediate_state3 inter;
00434 final_state3 final;
00435
00436 int scatter_summary = scatter_summary_flag;
00437
00438 struct timeval timeout;
00439 timeout.tv_sec = RECV_TIMEOUT;
00440 int bufid, length, msgid, tid_r;
00441
00442 int task;
00443
00444
00445
00446
00447
00448 if ( (bufid = pvm_trecv(-1, -1, &timeout)) <= 0 ) {
00449
00450
00451
00452
00453
00454
00455 if (bufid < 0)
00456 cerr << "\nError receiving data...\n";
00457 else
00458 cerr << "\nTimeout receiving data...\n";
00459
00460
00461
00462 closeout_tasks(prof, out, rho_zone,
00463 dt_snap, snap_cube_size,
00464 acc_stats, debug,
00465 n_sent, n_complete, inter, final);
00466 break;
00467
00468 }
00469
00470 length = msgid = tid_r = -1;
00471 int status = pvm_bufinfo(bufid, &length, &msgid, &tid_r);
00472 bool pvm_error = (status < 0 || length <= 0 || msgid < 0 || tid_r < 0);
00473
00474 if (pvm_error) {
00475
00476 cerr << "\nUnknown or illegal message received...\n";
00477
00478
00479
00480 PRL(bufid);
00481 PRL(status);
00482 PRL(length);
00483 PRL(msgid);
00484 PRL(tid_r);
00485
00486 PRL(n_sent);
00487 PRL(n_complete);
00488
00489 closeout_tasks(prof, out, rho_zone,
00490 dt_snap, snap_cube_size,
00491 acc_stats, debug,
00492 n_sent, n_complete, inter, final);
00493 break;
00494
00495 }
00496
00497 if (msgid == RETURN_DATA_MSG || msgid == FAILURE_MSG) {
00498
00499
00500
00501 if (msgid == FAILURE_MSG) {
00502
00503
00504
00505
00506 pvm_upkint(&tid_r, 1, 1);
00507
00508 cerr << "\nTask " << tid_r << " (" << host_name(tid_r)
00509 << ") has terminated prematurely.\n";
00510
00511 task = task_num(tid_r);
00512
00513
00514
00515 n_rand = Random_seed[task];
00516 init = Init[task];
00517
00518
00519
00520
00521 initialize_task(task, 1, 1);
00522
00523
00524
00525
00526
00527 inter.descriptor = unknown_intermediate;
00528 inter.n_osc = 0;
00529
00530 final.descriptor = error;
00531 final.time = 0;
00532 final.error = 0;
00533 final.n_steps = 0;
00534
00535 single_result = 0;
00536 cpu_scatter = 0;
00537
00538 pvm_error = true;
00539
00540 } else {
00541
00542
00543
00544 task = task_num(tid_r);
00545
00546 if (scatter_summary > 0)
00547 cerr << "\nreceived data from TID = " << tid_r
00548 << " (" << Name[task] << ")\n" << flush;
00549
00550 pvm_upkint(&n_rand, 1, 1);
00551
00552
00553
00554
00555 unpack(init);
00556 unpack(inter);
00557 unpack(final);
00558
00559 pvm_upkint(&single_result, 1, 1);
00560 pvm_upkdouble(&cpu_scatter, 1, 1);
00561
00562 }
00563
00564 n_complete++;
00565 total_hits += single_result;
00566 scatt_total++;
00567 cpu_total += cpu_scatter;
00568
00569 Nscatt[task]++;
00570 Cpu[task] += cpu_scatter;
00571
00572
00573
00574 if (abs(debug) > 2) {
00575 int p = cerr.precision(STD_PRECISION);
00576 cerr << "single_scatter: rho_max^2 = " << rho_sq_max
00577 << " ; returning with n_hit = " << single_result
00578 << " and n_hit_tot = " << out.n_hit_tot + single_result
00579 << endl;
00580 cerr.precision(p);
00581 }
00582
00583 if (final.descriptor == error) {
00584
00585
00586
00587 summarize_scattering_initial(init, n_rand,
00588 dt_snap, snap_cube_size);
00589 scatter_summary = 2;
00590 if (pvm_error) cerr << "final state unknown\n";
00591
00592 }
00593
00594 if (!pvm_error && scatter_summary > 0)
00595 summarize_scattering_final(inter, final,
00596 scatter_summary, cpu_scatter);
00597
00598
00599
00600 if (n_sent < out.trials_per_zone) {
00601
00602 initialize_scattering(prof, rho_sq_min, rho_sq_max, tid_r,
00603 scatter_summary_flag, cpu_time_check,
00604 dt_snap, snap_cube_size);
00605 n_sent++;
00606
00607 } else {
00608
00609
00610
00611
00612 Random_seed[task] = 0;
00613 }
00614
00615 } else {
00616
00617 cerr << "unexpected PVM message ID = "
00618 << msgid << " received" << endl;
00619
00620 err_exit("multiscatter3: fatal error");
00621
00622 }
00623
00624
00625
00626 single_scatter_stats(prof, init, inter, final, rho_zone,
00627 out, acc_stats, single_result);
00628
00629 }
00630
00631 return total_hits;
00632
00633 }
00634
00635 #endif