Main Page | Class Hierarchy | Class List | File List | Class Members | File Members

mdp_communicator.h

Go to the documentation of this file.
00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 #ifdef PARALLEL
00014 #include "mpi.h"
00015      
00016 typedef MPI_Request mdp_request;
00017 
00018 class mdp_communicator : public mdp_log {
00019  private:  
00020   MPI_Comm communicator;
00021   double   mytime; // total time
00022   int      wormholes_open;
00023   int      my_id;
00024   int      my_nproc;
00025  public:
00026   double   comm_time;  // time spent in communications
00027   void open_wormholes(int argc, char** argv, 
00028                       MPI_Comm communicator_=MPI_COMM_WORLD) {
00029     if(wormholes_open) return;
00030     communicator=communicator_;
00031     MPI_Init(&argc, &argv);
00032     MPI_Comm_rank(communicator, &(my_id));
00033     MPI_Comm_size(communicator, &(my_nproc));
00034     print=true;
00035     for(int i=0; i<nproc(); i++) {
00036       if(me()==i) 
00037         (*this) << "MPI PROCESS " << me() 
00038                 << " of " << nproc() 
00039                 << " STARTING ON " << getname() << "\n";
00040       barrier();
00041     }
00042     if(me()==0) print=true; else print=false;
00043 
00044     begin_function("PROGRAM");
00045     begin_function("open_wormholes");
00046     (*this) << 
00047       "<head>\n"
00048       "*************************************************\n"
00049       "* Starting [Matrix Distributed Processing]      *\n"
00050       "* created by Massimo Di Pierro                  *\n"
00051       "* copyrighted by www.metacryption.com           *\n"
00052       "*************************************************\n";
00053       "</head>\n";
00054 
00055 
00056     reset_time();
00057     wormholes_open=TRUE; 
00058 
00059     double a,b;
00060     getcpuusage(a,b);
00061     end_function("open_wormholes");
00062   }
00063   int tag(int i, int j) {
00064     return i*nproc()+j;
00065   }
00066   inline const int me() {
00067     return my_id;
00068   }
00069   inline const int nproc() {
00070     return my_nproc;
00071   }
00072   void print_stats() {
00073 #ifndef NO_POSIX
00074     int i;
00075     double *a=new double[nproc()];
00076     double *b=new double[nproc()];
00077     double *c=new double[nproc()];
00078     for(i=0; i<nproc(); i++) a[i]=b[i]=c[i]=0; 
00079     getcpuusage(a[me()],b[me()]);
00080     c[me()]=100.0*comm_time/(time());
00081     add(a,nproc());
00082     add(b,nproc());
00083     add(c,nproc());
00084     char buffer[256];
00085     for(i=0; i<nproc(); i++) {
00086       sprintf(buffer,
00087               "* Process %i stats: CPU=%.2f%% PROCESS=%.2f%% COMM=%.2f%%\n",
00088               i, a[i],b[i],c[i]);
00089       (*this) << buffer;
00090     }
00091     (*this) << "* (above numbers make no sense under windows)\n";
00092     delete[] a;
00093     delete[] b;
00094     delete[] c;
00095 #endif
00096   }
00097   void close_wormholes() {
00098     begin_function("close_wormholes");
00099     (*this) << 
00100       "<foot>\n"
00101       "*************************************************\n"
00102       "* Ending [Matrix Distributed Processing]        *\n";
00103     print_stats();
00104     (*this) << 
00105       "*************************************************\n";
00106       "</foot>\n";
00107     (*this) << "PROCESS " << me() << " ENDING AFTER " << time() << " sec.\n";
00108     wormholes_open=FALSE;
00109     MPI_Finalize();
00110     end_function("close_wormholes");
00111     end_function("PROGRAM");
00112   }
00113   void abort() {
00114     MPI_Abort(communicator,1);
00115   }
00116   mdp_communicator() {
00117     wormholes_open=FALSE;
00118   }
00119   virtual ~mdp_communicator() {
00120   }
00121   template<class T>
00122     void put(T &obj, int destination) {
00123     mdp_request r;
00124     MPI_Isend(&obj, sizeof(T)/sizeof(char), MPI_CHAR, destination, 
00125              tag(me(),destination), communicator, &r);
00126     wait(r);
00127   }
00128   template<class T>
00129     void put(T &obj, int destination, mdp_request &r) {
00130     MPI_Isend(&obj, sizeof(T)/sizeof(char), MPI_CHAR, destination, 
00131              tag(me(),destination), communicator, &r);
00132   }
00133   template<class T>
00134     void get(T &obj, int source) {
00135     MPI_Status  status;
00136     MPI_Recv(&obj, sizeof(T)/sizeof(char), MPI_CHAR, source, 
00137              tag(source,me()), communicator, &status);     
00138   }
00139   template<class T>
00140     void put(T* objptr, long length, int destination) {
00141     mdp_request r;
00142     MPI_Isend(objptr, length*sizeof(T)/sizeof(char), MPI_CHAR, destination, 
00143               tag(me(),destination), communicator, &r);
00144     wait(r);
00145   }
00146   template<class T>
00147     void put(T* objptr, long length, int destination, mdp_request &r) {
00148     if(MPI_Isend(objptr, length*sizeof(T)/sizeof(char), MPI_CHAR, destination, 
00149                  tag(me(),destination), communicator, &r)!=MPI_SUCCESS)
00150       error("Failure to send");
00151   }
00152   template<class T>
00153     void get(T* objptr, long length, int source) {
00154     MPI_Status  status;
00155     if(MPI_Recv(objptr, length*sizeof(T)/sizeof(char), MPI_CHAR, source, 
00156              tag(source,me()), communicator, &status)!=MPI_SUCCESS) 
00157       error("Failure to receive");
00158   }
00159   void wait(mdp_request &r) {
00160     MPI_Status status;
00161     MPI_Wait(&r, &status);
00162   }
00163   void wait(mdp_request *r, int length) {
00164     MPI_Status status;
00165     MPI_Waitall(length, r, &status);
00166   }
00167   // Compatibility functions
00168   // obj1 is input, obj2 is output
00169   void add(float &obj1, float &obj2) {
00170     MPI_Allreduce(&obj1, &obj2, 1, MPI_FLOAT, MPI_SUM, communicator);
00171   }
00172   void add(float* obj1, float *obj2, long length) {
00173     MPI_Allreduce(obj1, obj2, length, MPI_FLOAT, MPI_SUM, communicator);
00174   }
00175   void add(double &obj1, double &obj2) {
00176     MPI_Allreduce(&obj1, &obj2, 1, MPI_DOUBLE, MPI_SUM, communicator);
00177   }
00178   void add(double* obj1, double *obj2, long length) {
00179     MPI_Allreduce(obj1, obj2, length, MPI_DOUBLE, MPI_SUM, communicator);
00180   }
00181   void add(int &obj1) {
00182     int obj2=0;
00183     MPI_Allreduce(&obj1, &obj2, 1, MPI_INT, MPI_SUM, communicator);
00184     obj1=obj2;
00185   }
00186   void add(long &obj1) {
00187     long obj2=0;
00188     MPI_Allreduce(&obj1, &obj2, 1, MPI_LONG, MPI_SUM, communicator);
00189     obj1=obj2;
00190   }
00191   void add(float &obj1) {
00192     float obj2=0;
00193     MPI_Allreduce(&obj1, &obj2, 1, MPI_FLOAT, MPI_SUM, communicator);
00194     obj1=obj2;
00195   }
00196   void add(double &obj1) {
00197     double obj2=0;
00198     MPI_Allreduce(&obj1, &obj2, 1, MPI_DOUBLE, MPI_SUM, communicator);
00199     obj1=obj2;
00200   }
00201   void add(int *obj1, long length) {
00202     long i;
00203     int *obj2=new int[length];
00204     for(i=0; i<length; i++) obj2[i]=0;
00205     MPI_Allreduce(obj1, obj2, length, MPI_INT, MPI_SUM, communicator);
00206     for(i=0; i<length; i++) obj1[i]=obj2[i];
00207     delete[] obj2;
00208   }
00209   void add(long *obj1, long length) {
00210     long i;
00211     long *obj2=new long[length];
00212     for(i=0; i<length; i++) obj2[i]=0;
00213     MPI_Allreduce(obj1, obj2, length, MPI_LONG, MPI_SUM, communicator);
00214     for(i=0; i<length; i++) obj1[i]=obj2[i];
00215     delete[] obj2;
00216   }
00217   void add(float *obj1, long length) {
00218     long i;
00219     float *obj2=new float[length];
00220     for(i=0; i<length; i++) obj2[i]=0;
00221     MPI_Allreduce(obj1, obj2, length, MPI_FLOAT, MPI_SUM, communicator);
00222     for(i=0; i<length; i++) obj1[i]=obj2[i];
00223     delete[] obj2;
00224   }
00225   void add(double *obj1, long length) {
00226     long i;
00227     double *obj2=new double[length];
00228     for(i=0; i<length; i++) obj2[i]=0;
00229     MPI_Allreduce(obj1, obj2, length, MPI_DOUBLE, MPI_SUM, communicator);
00230     for(i=0; i<length; i++) obj1[i]=obj2[i];
00231     delete[] obj2;
00232   }
00233   void add(mdp_complex &obj1) {
00234     mdp_complex obj2=0;
00235 #if !defined(USE_DOUBLE_PRECISION)
00236     MPI_Allreduce(&obj1, &obj2, 2, MPI_FLOAT, MPI_SUM, communicator);
00237 #else
00238     MPI_Allreduce(&obj1, &obj2, 2, MPI_DOUBLE, MPI_SUM, communicator);
00239 #endif
00240     obj1=obj2;
00241   }
00242   void add(mdp_complex *obj1, long length) {
00243     long i;
00244     mdp_complex *obj2=new mdp_complex[length];
00245     for(i=0; i<length; i++) obj2[i]=0;
00246 #if !defined(USE_DOUBLE_PRECISION)
00247     MPI_Allreduce(obj1, obj2, 2*length, MPI_FLOAT, MPI_SUM, communicator);
00248 #else
00249     MPI_Allreduce(obj1, obj2, 2*length, MPI_DOUBLE, MPI_SUM, communicator);
00250 #endif
00251     for(i=0; i<length; i++) obj1[i]=obj2[i];
00252     delete[] obj2;
00253   }
00254   void add(mdp_matrix &a) {
00255     add(a.address(), a.rowmax()*a.colmax());
00256   }
00257   void add(mdp_matrix *a, long length) {
00258     long i;
00259     for(i=0; i<length; i++) add(a[i]);
00260   }
00261 
00262   void barrier() {
00263     MPI_Barrier(communicator);
00264   }
00265   template<class T>
00266     void broadcast(T &obj, int p) {
00267     MPI_Bcast(&obj, sizeof(T)/sizeof(char), MPI_CHAR, p, communicator); 
00268   }
00269   template<class T>
00270     void broadcast(T* obj, long length, int p) {
00271     MPI_Bcast(obj, length*sizeof(T)/sizeof(char), MPI_CHAR, p,communicator); 
00272   }
00273   void reset_time() {
00274     mytime=MPI_Wtime();
00275     comm_time=0;
00276   }
00277   double time() {
00278     return MPI_Wtime()-mytime;
00279   }
00280 };
00281 
00282 #else
00283 #include "time.h"
00284 typedef int mdp_request;
00285 
00298 class mdp_communicator : public mdp_log {
00299  private:
00300 #ifndef NO_POSIX
00301   mdp_psim *nodes;
00302 #endif
00303   int    communicator;
00304   int    wormholes_open;
00305   double  mytime; // total time
00306   double MPI_Wtime() {
00307     return walltime();
00308   }
00309   int    my_id;
00310   int    my_nproc;
00311  public:
00312   double  comm_time;  
00313   mdp_communicator() {
00314     wormholes_open=FALSE;
00315   }
00316   template<class T>
00317     void put(T &obj, int destination) {
00318 #ifndef NO_POSIX
00319     nodes->send(destination,"",obj);
00320 #endif
00321   }
00322   template<class T>
00323     void put(T &obj, int destination, mdp_request &r) {
00324 #ifndef NO_POSIX
00325     nodes->send(destination,"",obj);
00326 #endif
00327   }
00328   template<class T>
00329     void get(T &obj, int source) {
00330 #ifndef NO_POSIX
00331     nodes->recv(source,"",obj);
00332 #endif
00333   }
00334   template<class T>
00335     void put(T* objptr, long length, int destination) {
00336 #ifndef NO_POSIX
00337     nodes->send(destination,"",objptr, length);
00338 #endif
00339   }
00340   template<class T>
00341     void put(T* objptr, long length, int destination, mdp_request &r) {
00342 #ifndef NO_POSIX
00343     nodes->send(destination,"",objptr, length);
00344 #endif
00345   }
00346   template<class T>
00347     void get(T* objptr, long length, int source) {
00348 #ifndef NO_POSIX
00349     nodes->recv(source,"",objptr,length);
00350 #endif
00351   }
00352   // ////////////////////////////////
00353   // Compatibility functions
00354   // obj1 is input, obj2 is output
00355   void add(float &obj1, float &obj2) {
00356 #ifndef NO_POSIX
00357     obj2=nodes->add(obj1);
00358 #endif
00359   }
00360   void add(float* obj1, float *obj2, long length) {
00361 #ifndef NO_POSIX
00362     for(int i=0; i<length; i++) {
00363       obj2[i]=nodes->add(obj1[i]);
00364     }
00365 #endif
00366   }
00367   void add(double &obj1, double &obj2) {
00368 #ifndef NO_POSIX
00369     obj2=nodes->add(obj1);
00370 #endif
00371   }
00372   void add(double* obj1, double *obj2, long length) {
00373 #ifndef NO_POSIX
00374     for(int i=0; i<length; i++) {
00375       obj2[i]=nodes->add(obj1[i]);
00376     }
00377 #endif
00378   }
00379   // end compatilibility functions
00380   // /////////////////////////////////
00381   void add(int &obj1) {
00382 #ifndef NO_POSIX
00383     obj1=nodes->add(obj1);
00384 #endif
00385   }
00386   void add(long &obj1) {
00387 #ifndef NO_POSIX
00388     obj1=nodes->add(obj1);
00389 #endif
00390   }
00391   void add(float &obj1) {
00392 #ifndef NO_POSIX
00393     obj1=nodes->add(obj1);
00394 #endif
00395   }
00396   void add(double &obj1) {
00397 #ifndef NO_POSIX
00398     obj1=nodes->add(obj1);
00399 #endif
00400   }
00401   void add(int *obj1, long length) {
00402 #ifndef NO_POSIX
00403     for(int i=0; i<length; i++) {
00404       obj1[i]=nodes->add(obj1[i]);
00405     }
00406 #endif
00407   }
00408   void add(long *obj1, long length) {
00409 #ifndef NO_POSIX
00410     for(long i=0; i<length; i++) obj1[i]=nodes->add(obj1[i]);
00411 #endif
00412   }
00413   void add(float *obj1, long length) {
00414 #ifndef NO_POSIX
00415     for(long i=0; i<length; i++) obj1[i]=nodes->add(obj1[i]);
00416 #endif
00417   }
00418   void add(double *obj1, long length) {
00419 #ifndef NO_POSIX
00420     for(long i=0; i<length; i++) obj1[i]=nodes->add(obj1[i]);
00421 #endif
00422   }
00423   void add(mdp_complex &obj1) {
00424 #ifndef NO_POSIX
00425     obj1=nodes->add(obj1);
00426 #endif
00427   }
00428   void add(mdp_complex *obj1, long length) {
00429 #ifndef NO_POSIX
00430     for(long i=0; i<length; i++) obj1[i]=nodes->add(obj1[i]);
00431 #endif
00432   }
00433   void add(mdp_matrix &a) {
00434 #ifndef NO_POSIX
00435     for(long i=0; i<a.size(); i++) 
00436       a.address()[i]=nodes->add(a.address()[i]);
00437 #endif
00438   }
00439   void add(mdp_matrix *a, long length) {
00440 #ifndef NO_POSIX
00441     for(long j=0; j<length; j++)
00442       for(long i=0; i<a[j].size(); i++) 
00443         a[j].address()[i]=nodes->add(a[j].address()[i]);
00444 #endif
00445   }
00446   template<class T>
00447     void broadcast(T &obj, int p) {
00448 #ifndef NO_POSIX
00449     nodes->broadcast(p,obj);
00450 #endif
00451   }
00452   template<class T>
00453     void broadcast(T* obj, long length, int p) {
00454 #ifndef NO_POSIX
00455     nodes->broadcast(p,obj,length);
00456 #endif
00457   }
00458   void wait(mdp_request &r) {}
00459   void wait(mdp_request *r, int length) {}
00460   inline const int me() {
00461     return my_id;
00462   }
00463   inline const int nproc() {
00464     return my_nproc;
00465   }
00466   void barrier() {}
00467   int tag(int i, int j) {
00468     return i*nproc()+j;
00469   }
00470   void reset_time() {
00471     mytime=MPI_Wtime();
00472     comm_time=0;
00473   }
00475   double time() { 
00476     return MPI_Wtime()-mytime;
00477   }
00480   void open_wormholes(int argc, char** argv) {
00481     if(wormholes_open) return;
00482 
00483 #ifndef NO_POSIX      
00484     nodes=new mdp_psim(argc, argv);
00485     my_id=nodes->id();
00486     my_nproc=nodes->nprocs();
00487 #else
00488     my_id=0;
00489     my_nproc=1;
00490 #endif
00491     if(me()==0) print=true; else print=false;
00492     begin_function("PROGRAM");
00493     begin_function("open_wormholes");
00494     (*this) << 
00495       "<head>\n"
00496       "*************************************************\n"
00497       "* Starting [mdp_matrix Distributed Processing]      *\n"
00498       "* created by Massimo Di Pierro                  *\n"
00499       "* copyrighted by www.metacryption.com           *\n"
00500       "*************************************************\n";
00501       "</head>\n";
00502     reset_time();
00503     double a,b;
00504     getcpuusage(a,b);
00505     wormholes_open=TRUE;
00506     end_function("open_wormholes");
00507   }
00509   void print_stats() {
00510 #ifndef NO_POSIX
00511     int i;
00512     double *a=new double[nproc()];
00513     double *b=new double[nproc()];
00514     double *c=new double[nproc()];
00515     for(i=0; i<nproc(); i++) a[i]=b[i]=c[i]=0; 
00516     getcpuusage(a[me()],b[me()]);
00517     c[me()]=100.0*comm_time/(time());
00518     add(a,nproc());
00519     add(b,nproc());
00520     add(c,nproc());
00521     char buffer[256];
00522     for(i=0; i<nproc(); i++) {
00523       sprintf(buffer,
00524               "* Process %i stats: CPU=%.2f%% PROCESS=%.2f%% COMM=%.2f%%\n",
00525               i, a[i],b[i],c[i]);
00526       (*this) << buffer;
00527     }
00528     (*this) << "* (above numbers make no sense under windows)\n";
00529     delete[] a;
00530     delete[] b;
00531     delete[] c;
00532 #endif
00533   }
00535   void close_wormholes() {
00536     begin_function("close_wormholes");
00537     (*this) << 
00538       "<foot>\n"
00539       "*************************************************\n"
00540       "* Ending [mdp_matrix Distributed Processing]        *\n";
00541     print_stats();
00542     (*this) << 
00543       "*************************************************\n";
00544       "</foot>\n";
00545     (*this) << "PROCESS " << me() << " ENDING AFTER " << time() << " sec.\n";
00546     wormholes_open=FALSE;
00547     end_function("close_wormholes");
00548     end_function("PROGRAM");
00549 #ifndef NO_POSIX
00550     if(nodes) delete nodes;
00551 #endif
00552   }
00554   void abort() {
00555     (*this) << "calling abort...";
00556     exit(-1);
00557   }
00558 };
00559 
00560 #endif
00561 
00563 mdp_communicator mdp; 
00564 
00566 mdp_communicator& mpi=mdp; 
00567 
00568 void _mpi_error_message(string a, string b, int c) {
00569   mpi.error_message(a,b,c);
00570 }
00571 
00573 inline void begin_function(string s) {
00574   mpi.begin_function(s);
00575 }
00576 
00578 inline void end_function(string s) {
00579   mpi.end_function(s);
00580 }
00581 

Generated on Sun Feb 27 15:12:20 2005 by  doxygen 1.4.1