Main Page | Class Hierarchy | Class List | File List | Class Members | File Members

mdp_psim.h

Go to the documentation of this file.
00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
#include "cstdio"
#include "cstdlib"
#include "string"
#include "iostream"
#include "string"
#include "vector"
#include "map"
#include <cerrno>
#include <ctime>
#include <new>
#include <sys/file.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <unistd.h>
00025 using namespace std;
00026 
00051 class mdp_psim {
00052 private:
00053  
00054   // Typed Constants
00055   const static int PROCESS_COUNT_MIN= 1;  // minimum number of processes
00056   const static int PROCESS_COUNT_MAX= 128;// maximum number of processes
00057   const static int CONN_LIST_IGNORE = -1; // connections that cannot occur
00058   const static int PROCESS_PARENT   = 0;  // The parent process ID number
00059   const static int COMM_RECV = 0;      // socket array indicator for reading
00060   const static int COMM_SEND= 1;  // socket array indicator for writing
00061   const static int COMM_TIMEOUT_DEFAULT = 86400;  // 1 day default
00062   
00063   // common enum values for logging routines
00064   enum enumBegEnd       { LOG_BEGIN, LOG_END };
00065   enum enumSendRecv     { LOG_SR_SEND, LOG_SR_RECV };
00066   enum enumSendRecvStep { LOG_SR_START, LOG_SR_FAIL, LOG_SR_SUCCESS };
00067   
00068   // Set this to true if you want verbose testing description
00069   
00070   // Class variables
  // Console verbosity: log() echoes a message to stdout when this value is
  // greater than or equal to the message's level.
  int     _verbatim;              // 0 for no output, 1 for some, 2 more
  // Number of simulated processes (parent included).
  int     _processCount;          // Holds the number of processes
  // Path of the log file; an empty string disables file logging.
  string  _logFileName;           // filename of the log file
  // True only while _logfileFD refers to a successfully opened log file.
  bool    _doLogging;             // do logging or not?
  // Despite the name this is a stdio stream (FILE*), not a raw descriptor.
  FILE*   _logfileFD;             // stream handle for the logging file
  // Logical ID: PROCESS_PARENT (0) in the parent, i in the i-th forked child.
  int     _processID;             // process ID of "this" process
  
  // _processCount*_processCount socket pairs; the pair used for traffic
  // from process s to process d lives at index _processCount*s + d.
  int   (*_socketFD)[2];          // array to hold all of the sockets
  // Seconds; compared against elapsed wall-clock time in recv_buffer.
  int _commTimeout;               // defaults to COMM_TIMEOUT_DEFAULT
  
  // Array of _processCount maps, indexed by source process ID.  Each map
  // keeps, by tag, payloads that arrived while a different tag was awaited.
  map< string, vector<char> >* _hash;  
  // Hash Map to hold out of sequence (send/receive) data
00083 
00084   // ******************************************************************* 
00085   // ***         Private Method: psim_begin                          *** 
00086   // ***                                                             *** 
00087   // ***  Used by the constructor ONLY                               *** 
00088   // ******************************************************************* 
00089 
00090   void psim_begin(int processCount, string logFileName, int verbatim) {
00091     _processCount=processCount;
00092     _logFileName=logFileName;
00093     _verbatim=verbatim;
00094 
00095     open_log();
00096     if ((processCount<PROCESS_COUNT_MIN) || (processCount<PROCESS_COUNT_MIN)) {
00097       log("PSIM ERROR: Invalid number of processes");
00098       throw string("PSIM ERROR: Invalid number of processes");
00099     }
00100     
00101     initialize(processCount);
00102     create_sockets();
00103     fork_processes();
00104     
00105     char buffer[256];
00106     sprintf(buffer, "process %i of %i created with pid=%i",
00107             _processID, _processCount, getpid());
00108     log(buffer,1);
00109     
00110   }
00111 
00112   // ******************************************************************* 
00113   // ***         Private Method: psim_end                            *** 
00114   // ***                                                             *** 
00115   // ***  Used by the destructor ONLY                                ***
00116   // ***                                                             *** 
00117   // ******************************************************************* 
00118 
00119   void psim_end() {
00120     for (int source=0; source<_processCount; source++) {
00121       for (int dest=0; dest<_processCount; dest++) {
00122         close(_socketFD[_processCount*source+dest][COMM_SEND]);
00123         close(_socketFD[_processCount*source+dest][COMM_RECV]);
00124       }
00125     }      
00126     if (_socketFD != NULL)
00127       delete [] _socketFD;
00128     _socketFD = NULL;
00129     
00130     // only delete the _hash if it still exists
00131     if (_hash != NULL) {
00132       delete [] _hash;
00133       _hash = NULL;
00134     }
00135     
00136     char buffer[256];
00137     sprintf(buffer, "process %i terminating", _processID);
00138     log(buffer,1);
00139 
00140     close_log();
00141   }
00142 
00143   // ******************************************************************* 
00144   // ***         Private Method: initialize                          *** 
00145   // ***                                                             *** 
00146   // ***  Used by the constructor, this method sets up values and    *** 
00147   // ***  some of the needed resources.                              *** 
00148   // ***                                                             *** 
00149   // ******************************************************************* 
00150 
00151   void initialize(int processCount) {
00152     _processCount = processCount;    
00153     _processID = PROCESS_PARENT;    
00154     _commTimeout = COMM_TIMEOUT_DEFAULT;
00155     
00156     _hash = new map< string, vector<char> >[_processCount];    
00157     if (_hash == NULL) {
00158       log("PSIM ERROR: failure to allocate hash");
00159       throw string("PSIM ERROR: failure to allocate hash");
00160     }
00161     _socketFD = new int[_processCount*_processCount][2];    
00162     if (_socketFD == NULL) {
00163       log("PSIM ERROR: failed to create socket array");    
00164       throw string("PSIM ERROR: failed to create socket array");    
00165     }
00166   }
00167   
00168   // ******************************************************************* 
00169   // ***         Private Method: create_sockets                      *** 
00170   // ***                                                             *** 
00171   // ***  Opens all of the sockets necessary for communication and   *** 
00172   // ***  inserts them into an array.                                *** 
00173   // ***                                                             *** 
00174   // ******************************************************************* 
00175 
00176   void create_sockets() { 
00177     for (int source=0; source<_processCount; source++) {
00178       for (int dest=0; dest<_processCount; dest++) {
00179         if (socketpair(AF_LOCAL, SOCK_STREAM, 0,
00180                        _socketFD[_processCount*source+dest]) < 0) {
00181           log("PSIM ERROR: socketpair");
00182           throw string("PSIM ERROR: socketpair");
00183         }
00184       }
00185     }      
00186     char buffer[256];    
00187     for (int source=0; source<_processCount; source++) 
00188       for (int dest=0; dest<_processCount; dest++) {
00189         sprintf(buffer,"_socketFD[%i*%i+%i]={%i,%i}",
00190                 source,_processCount,dest,
00191                 _socketFD[_processCount*source+dest][COMM_SEND],
00192                 _socketFD[_processCount*source+dest][COMM_RECV]);
00193         log(buffer);
00194       }
00195   }
00196 
00197   // ******************************************************************* 
00198   // ***         Private Method: fork_processes                      *** 
00199   // ***                                                             *** 
00200   // ***                                                             *** 
00201   // ******************************************************************* 
00202   void fork_processes() {
00203     _processID=0;
00204     for (int i=1; i<_processCount; i++) {
00205       int pid = fork();
00206       
00207       if (pid == -1) {
00208         log("PSIM ERROR: fork");
00209         throw("PSIM ERROR: fork");
00210       } else if (pid == 0) { // Child Process
00211         _processID = i;
00212         break;
00213       }
00214     }
00215   }    
00216 
00217   // ******************************************************************* 
00218   // ***         Private Method: check_process_id                    *** 
00219   // ***                                                             *** 
00220   // ***  Verifies that the destination process ID is valid. This    *** 
00221   // ***  is done before data is sent or received.                   *** 
00222   // ***                                                             *** 
00223   // ******************************************************************* 
00224     
00225   void check_process_id(int processID) {
00226     
00227     if ((processID == _processID) || 
00228         (processID < PROCESS_PARENT) ||
00229         (processID >= _processCount)) {
00230       
00231       char buffer[256];
00232       sprintf(buffer, "PSIM ERROR: process %i referencing %i.",
00233               _processID, processID);
00234       log(buffer);
00235       throw string( buffer );
00236     }
00237   }
00238   
00239   // ******************************************************************* 
00240   // ***         Private Method: open_log                            *** 
00241   // ***                                                             *** 
00242   // ***  This method initializes the process log and sets it up for *** 
00243   // ***  appending messages.                                        *** 
00244   // ***                                                             *** 
00245   // ******************************************************************* 
00246   
00247   void open_log() {
00248     _doLogging = false;
00249     if (_logFileName.length()==0) {      
00250       return;
00251     }
00252     
00253     // open and reset file
00254     if ((_logfileFD = fopen(_logFileName.c_str(), "w")) == NULL) {
00255       log("PSIM ERROR: unable to create logfile");
00256       throw string("PSIM ERROR: unable to create logfile");
00257     }
00258     // close the log file
00259     close_log();
00260     
00261     // reopen the log file in append mode
00262     if ((_logfileFD = fopen(_logFileName.c_str(), "a")) == NULL) {
00263       log("PSIM ERROR: unable to open logfile");
00264       throw string("PSIM ERROR: unable to open logfile");
00265     }
00266     _doLogging=true;
00267     
00268   }
00269   
00270   
00271   // ******************************************************************* 
00272   // ***         Private Method: close_log                           *** 
00273   // ***                                                             *** 
00274   // ***  Closes the log file.                                       *** 
00275   // ***                                                             *** 
00276   // ******************************************************************* 
00277   
00278   void close_log() {
00279     if (_doLogging)
00280       fclose(_logfileFD);
00281     
00282   }
00283   
00284   
00285   // ******************************************************************* 
00286   // ***         Private Method: logSendRecv                         *** 
00287   // ***                                                             *** 
00288   // ***  Centralizes the repetitive task of logging the steps       *** 
00289   // ***  during send and receive.                                   *** 
00290   // ***                                                             *** 
00291   // ******************************************************************* 
00292   
00293   void logSendRecv(int sourcedestProcessID,
00294                    string tag,
00295                    enumSendRecv method, 
00296                    enumSendRecvStep step){
00297     
00298     char buffer[256];
00299     const char cmdSendRecv[2][8] = { "send", "recv" };
00300     const char stepSendRecv[3][12] = { "starting...", "failed!", "success." };
00301     
00302     sprintf(buffer, "%i %s(%i,%s) %s", 
00303             _processID, cmdSendRecv[method], 
00304             sourcedestProcessID, tag.c_str(), stepSendRecv[step]);
00305     log(buffer);
00306   }
00307   
00308   // ******************************************************************* 
00309   // ***         Private Method: get_source_index                    *** 
00310   // ***                                                             *** 
00311   // ******************************************************************* 
00312   
00313   int get_source_index(int source) {
00314     check_process_id(source);
00315     return _processCount*source+_processID;
00316   }
00317 
00318   // ******************************************************************* 
00319   // ***         Private Method: detDestIndex                        *** 
00320   // ***                                                             *** 
00321   // ******************************************************************* 
00322   
00323   int get_dest_index(int dest) {
00324     check_process_id(dest);
00325     return _processCount*_processID+dest;
00326   }
00327   
00328   // ******************************************************************* 
00329   // ***         Private Method: send_buffer                         *** 
00330   // ***                                                             *** 
00331   // ***  Handles the sending of binary data.                        *** 
00332   // ***                                                             *** 
00333   // ******************************************************************* 
00334 
00335   void send_buffer(int destProcessID, 
00336                    const void* pdataToSend, long dataSize) {
00337     
00338     int destIndex = get_dest_index(destProcessID);
00339     if (write(_socketFD[destIndex][COMM_SEND], pdataToSend, dataSize)
00340         != dataSize) {
00341       log("PSIM ERROR: failure to write to socket");
00342       throw string("PSIM ERROR: failure to write to socket");
00343     }
00344   }
00345   
00346   
00347   // ******************************************************************* 
00348   // ***         Private Method: send_binary                         *** 
00349   // ***                                                             *** 
00350   // ***  Sends a data tag and a vector of chars (as binary data).   *** 
00351   // ***                                                             *** 
00352   // ******************************************************************* 
00353   
00354   void send_binary(int destProcessID, 
00355                    const string& tag, 
00356                    const vector<char>& data) {
00357     
00358     int tagSize = tag.size();
00359     int dataSize = data.size();
00360     send_buffer(destProcessID, &tagSize, sizeof(tagSize));    
00361     send_buffer(destProcessID, tag.c_str(), tagSize);    
00362     send_buffer(destProcessID, &dataSize, sizeof(dataSize));
00363     send_buffer(destProcessID, &data[0], dataSize);
00364   }
00365   
00366   
00367   // ******************************************************************* 
00368   // ***         Private Method: recv_buffer                         *** 
00369   // ***                                                             *** 
00370   // ***  Handles the receiving of binary data through the sockets.  *** 
00371   // ***                                                             *** 
00372   // ******************************************************************* 
00373   
00374   void recv_buffer(int sourceProcessID, 
00375                    void* pdataToReceive, long dataSize) {
00376     long bytes = 0;
00377     long t0=time(NULL);
00378     int sourceIndex = get_source_index(sourceProcessID);
00379    
00380     // set up blocking read
00381     do{
00382       bytes+=read(_socketFD[sourceIndex][COMM_RECV],
00383                   (char*) pdataToReceive+bytes,dataSize-bytes);
00384       if(time(NULL)-t0>_commTimeout) {
00385         log("PSIM ERROR: timeout error in readin from socket");
00386         throw string("PSIM ERROR: timeout error in readin from socket");
00387       }
00388     } while (bytes<dataSize);
00389   }
00390   
00391   
00392   // ******************************************************************* 
00393   // ***         Private Method: recv_binary                         *** 
00394   // ***                                                             *** 
00395   // ***  Receives data utilizing a data tag to make sure that the   *** 
00396   // ***  data coming in is what was expected.                       *** 
00397   // ***                                                             *** 
00398   // ******************************************************************* 
00399   
00400   void recv_binary(int sourceProcessID, 
00401                    const string& tag, 
00402                    vector<char>& data ) {
00403     
00404     static vector<char> dataTmp;
00405     static string tagReceived;
00406     int size;
00407     
00408     map< string, vector<char> >::iterator itr;
00409     
00410     while (true) {
00411       itr = _hash[sourceProcessID].find(tag);
00412       
00413       if (itr != _hash[sourceProcessID].end()) { // Found?
00414         data = _hash[sourceProcessID][tag];
00415         _hash[sourceProcessID].erase(itr);
00416         break;
00417       } else {
00418         recv_buffer(sourceProcessID, &size, sizeof(size));
00419         char* buffer= new char[size+1];
00420         recv_buffer(sourceProcessID, buffer, size);
00421         buffer[size] = 0;
00422         tagReceived = buffer;
00423         delete buffer;
00424 
00425         if (tagReceived == tag) {
00426           recv_buffer(sourceProcessID, &size, sizeof(size));
00427           data.resize(size);      
00428           recv_buffer(sourceProcessID, &data[0], size);
00429           break;
00430         } else {
00431           recv_buffer(sourceProcessID, &size, sizeof(size));
00432           dataTmp.resize(size);
00433           recv_buffer(sourceProcessID, &dataTmp[0], size);
00434           _hash[sourceProcessID][tagReceived] = dataTmp;
00435         }
00436       }
00437     }
00438   }
00439   
00440   
00441   // ******************************************************************* 
00442   // ***         Private Method: doBegEndLog                         *** 
00443   // ***                                                             *** 
00444   // ***  Centralized log method used by some of the public methods  *** 
00445   // ***  to send a standardized message to the log at the beginning *** 
00446   // ***  and the end of the routine.                                *** 
00447   // ***                                                             *** 
00448   // ******************************************************************* 
00449   
00450   void doBegEndLog(string method, enumBegEnd begEnd) {
00451     char buffer[256];
00452     char* be;
00453     
00454     if (begEnd == LOG_BEGIN) be = "BEGIN";
00455     else  be = "END";
00456     
00457     sprintf(buffer, "%i %s [%s]", _processID, be, method.c_str());
00458     log(buffer);
00459   }
00460     
00461 public:
00462   
00463   // ******************************************************************* 
00464   // ******************************************************************* 
00465   // ***                                                             *** 
00466   // ***                P U B L I C   M E T H O D S                  *** 
00467   // ***                                                             *** 
00468   // ******************************************************************* 
00469   // ******************************************************************* 
00470   
00471   
00472   // ******************************************************************* 
00473   // ***               Constructor: mdp_psim                         *** 
00474   // ***                                                             *** 
00475   // ***  Provide the number of processes to create and the name of  *** 
00476   // ***  the logfile if desired and "" if no logfile is needed.     *** 
00477   // ***                                                             *** 
00478   // ******************************************************************* 
00479   
  // Creates the simulator with an explicit process count.
  //   processCount  number of processes to run (parent included)
  //   logFileName   log file path; "" disables file logging
  //   verbatim      console verbosity threshold passed on to log()
  // All work is delegated to psim_begin, which forks the child processes,
  // so code following the constructor runs in every process.
  mdp_psim(int processCount, string logFileName=".psim.log", int verbatim=0) {
    psim_begin(processCount, logFileName, verbatim);
  }
00483   
00484   mdp_psim(int argc, char** argv) {
00485     int processCount=parse_argv_nprocs(argc,argv);
00486     string logFileName=parse_argv_logfile(argc,argv);
00487     int verbatim=parse_argv_verbatim(argc,argv);
00488     psim_begin(processCount, logFileName, verbatim);
00489   }
00490   
00491   
00492   // ******************************************************************* 
00493   // ***               Destructor: ~mdp_psim                         *** 
00494   // ***                                                             *** 
00495   // ***  Deallocates space that was created within the process,     *** 
00496   // ***  releases sockets, closes the log, etc.                       *** 
00497   // ***                                                             *** 
00498   // ******************************************************************* 
00499   
  // Releases everything through psim_end: closes the sockets, frees the
  // socket table and message store, writes a termination log line and
  // closes the log.
  virtual ~mdp_psim() {
    psim_end();
  }
00503   
00504   
00505   // ******************************************************************* 
00506   // ***         Public Method: log                                  *** 
00507   // ***                                                             *** 
00508   // ***  Accepts a string and appends the message to the common     *** 
00509   // ***  log file.  Note: the file is opened in append mode, so     *** 
00510   // ***  writes always land at the end of the file no matter how    *** 
00511   // ***  many processes share it.  The write is also wrapped in     *** 
00512   // ***  an exclusive flock() on the log file.                      *** 
00513   // ***                                                             *** 
00514   // ******************************************************************* 
00515   
00516   void log(string message,int level=2) {    
00517     if (_doLogging) {
00518       int fd=fileno(_logfileFD);
00519       flock(fd,LOCK_EX);      
00520       fwrite("PSIM LOG: ", 10, 1, _logfileFD);
00521       fwrite(message.c_str(), message.length(), 1, _logfileFD);
00522       fwrite("\n", 1, 1, _logfileFD);
00523       // Clear out the file buffer
00524       fflush(_logfileFD);
00525       flock(fd,LOCK_UN);
00526     }
00527     if(_verbatim>=level) {
00528       cout << "PSIM LOG: " << message << endl;
00529       cout.flush();
00530     }
00531   }
00532   
00533   
00534   // ******************************************************************* 
00535   // ***         Public Method: id                                   *** 
00536   // ***                                                             *** 
00537   // ***  Returns an integer identifying which process is currently  *** 
00538   // ***  executing.                                                 *** 
00539   // ***                                                             *** 
00540   // ******************************************************************* 
00541   
  // Returns this process's logical ID: 0 in the parent, i in the i-th
  // forked child (not the operating-system pid).
  int id() {
    return _processID;
  }
00545   
00546   // ******************************************************************* 
00547   // ***         Public Method: nprocs                               *** 
00548   // ***                                                             *** 
00549   // ***  Returns an integer identifying the current number of       *** 
00550   // ***  active processes.                                          *** 
00551   // ***                                                             *** 
00552   // ******************************************************************* 
00553   
  // Returns the process count the simulator was created with; the value
  // is stored at construction and returned unchanged here.
  int nprocs() {
    return _processCount;
  }
00557   
00558   
00559   // ******************************************************************* 
00560   // ***         Public Method: setCommTimeout                       *** 
00561   // ***                                                             *** 
00562   // ***  Sets the number of seconds that a process will wait to     *** 
00563   // ***  receive data from another process before throwing an       *** 
00564   // ***  exception.                                                 *** 
00565   // ***                                                             *** 
00566   // ******************************************************************* 
00567   
  // Sets the receive timeout, in seconds, that recv_buffer compares
  // against elapsed wall-clock time.
  // NOTE(review): the unsigned argument is stored in an int member, so a
  // value above INT_MAX would not be kept as given -- confirm callers stay
  // below that.
  void setCommTimeout(unsigned int commTimeout) {
    _commTimeout = commTimeout;
  }
00571   
00572   
00573   
00574   
00575   // ******************************************************************* 
00576   // ***         Public Method: send                                 *** 
00577   // ***                                                             *** 
00578   // ***  This asynchronous method sends the data referenced by      *** 
00579   // ***  "dataToSend" to "destProcessID".  The size of the data     *** 
00580   // ***  is obtained by looking at the type "T".                    *** 
00581   // ***                                                             *** 
00582   // ******************************************************************* 
00583   
00584   template<class T>
00585   void send(int destProcessID, string dataTag, T &dataToSend) {
00586     logSendRecv(destProcessID, dataTag,LOG_SR_SEND, LOG_SR_START);  
00587     vector<char> data(sizeof(T));
00588     for(unsigned long k=0; k<sizeof(T); k++) 
00589       data[k]=((char*)&dataToSend)[k];
00590     send_binary(destProcessID, dataTag, data);    
00591     // cout << _processID << "->" << destProcessID << " " << dataTag << " " << dataToSend << endl;
00592     logSendRecv(destProcessID, dataTag, LOG_SR_SEND, LOG_SR_SUCCESS);
00593   }
00594   
00595   
00596   // ******************************************************************* 
00597   // ***         Public Method: send                                 *** 
00598   // ***                                                             *** 
00599   // ***  This asynchronous method sends the data at location        *** 
00600   // ***  "pdataToSend" to "destProcessID".  The size of the data    *** 
00601   // ***  being sent is provided in the integer: "dataSize".         *** 
00602   // ***                                                             *** 
00603   // ******************************************************************* 
00604   
00605   template<class T>
00606   void send(int destProcessID, string dataTag, 
00607             T *pdataToSend, long dataSize) {
00608     logSendRecv(destProcessID, dataTag, LOG_SR_SEND, LOG_SR_START);  
00609     vector<char> data(sizeof(T)*dataSize);
00610     for(long k=0; k<data.size(); k++) 
00611       data[k]=((char*)pdataToSend)[k];
00612     send_binary(destProcessID, dataTag, data);    
00613     logSendRecv(destProcessID, dataTag,LOG_SR_SEND, LOG_SR_SUCCESS);
00614   }
00615 
00616   // ******************************************************************* 
00617   // ***         Public Method: recv                                 *** 
00618   // ***                                                             *** 
00619   // ***  This synchronous "blocking" method retrieves the data sent *** 
00620   // ***  by "sourceProcessID".  The size of the data expected is    *** 
00621   // ***  obtained from the type "T".                                *** 
00622   // ***                                                             *** 
00623   // ******************************************************************* 
00624   
00625   template<class T>
00626   void recv(int sourceProcessID, string dataTag, T &dataToReceive) {
00627     logSendRecv(sourceProcessID, dataTag,LOG_SR_RECV, LOG_SR_START);    
00628     vector<char> data;
00629     recv_binary(sourceProcessID, dataTag, data);    
00630     if(data.size()!=sizeof(T)) {
00631       log("PSIM ERROR: recv invalid data)");
00632       throw string("PSIM ERROR: recv invalid data)");
00633     };
00634     for(unsigned long k=0; k<sizeof(T); k++) 
00635       ((char*)&dataToReceive)[k]=data[k];
00636     // cout << _processID << "<-" << sourceProcessID << " " << dataTag << " " << dataToReceive << endl;
00637     logSendRecv(sourceProcessID, dataTag, LOG_SR_RECV, LOG_SR_SUCCESS);   
00638   }
00639   
00640   
00641   // ******************************************************************* 
00642   // ***         Public Method: recv                                 *** 
00643   // ***                                                             *** 
00644   // ***  This synchronous "blocking" method retrieves the data sent *** 
00645   // ***  by "sourceProcessID".  "dataSize" elements of type "T"     *** 
00646   // ***  are copied to location "pdataToReceive".                   *** 
00647   // ***                                                             *** 
00648   // ******************************************************************* 
00649 
00650   template<class T>
00651   void recv(int sourceProcessID, string dataTag, 
00652             T *pdataToReceive, long dataSize) {
00653     logSendRecv(sourceProcessID, dataTag, LOG_SR_RECV, LOG_SR_START);    
00654     vector<char> data;
00655     recv_binary(sourceProcessID, dataTag, data);    
00656     if(data.size()!=sizeof(T)*dataSize) {
00657       log("PSIM ERROR: recv invalid data size");
00658       throw string("PSIM ERROR: recv invalid data size");
00659     }
00660     for(long k=0; k<data.size(); k++) 
00661       ((char*)pdataToReceive)[k]=data[k];
00662     logSendRecv(sourceProcessID, dataTag, LOG_SR_RECV, LOG_SR_SUCCESS);   
00663   }
00664   
00665   // ******************************************************************* 
00666   // ***         Public Method: broadcast                            *** 
00667   // ***                                                             *** 
00668   // ***  Allows broadcasting data to all processes.                 *** 
00669   // ***  sourceProcessID sends data (data) to all of the other      *** 
00670   // ***  processes who receive the data through the same variable). *** 
00671   // ***                                                             *** 
00672   // ******************************************************************* 
00673   
00674   template<class T>
00675   void broadcast(int sourceProcessID, T &data) {
00676     static string tag = "BROADCAST:0";
00677     doBegEndLog(tag, LOG_BEGIN);    
00678     if (_processID == sourceProcessID) {
00679       for (int i=0; i<_processCount; i++) {
00680         if (i != sourceProcessID)
00681           send(i, tag, data);
00682       }
00683     } else {
00684       recv(sourceProcessID, tag, data);
00685     }    
00686     if(tag=="BROADCAST:0") tag="BROADCAST:1"; else tag="BROADCAST:0";
00687     doBegEndLog(tag, LOG_END);
00688   }
00689 
00690   template<class T>
00691     void broadcast(int sourceProcessID, T *data, int dataSize) {
00692     static string tag = "BROADCASTV:0";
00693     doBegEndLog(tag, LOG_BEGIN);    
00694     if (_processID == sourceProcessID) {
00695       for (int i=0; i<_processCount; i++) {
00696         if (i != sourceProcessID)
00697           send(i, tag, data, dataSize);
00698       }
00699     } else {
00700       recv(sourceProcessID, tag, data, dataSize);
00701     }    
00702     if(tag=="BROADCASTV:0") tag="BROADCASTV:1"; else tag="BROADCASTV:0";
00703     doBegEndLog(tag, LOG_END);
00704   }
00705   
00706 
00707   // ******************************************************************* 
00708   // ***         Public Method: collect                              *** 
00709   // ***                                                             *** 
00710   // *** Every process sends its data to process dest, which builds  *** 
00711   // *** a list of the values indexed by process ID.  Only dest gets *** 
00712   // *** the filled list.  This method is used to implement global   *** 
00713   // *** sum and some other global operations.                       *** 
00714   // ***                                                             *** 
00715   // ******************************************************************* 
00716   
00717   template<class T>
00718   vector<T> collect(int dest, T &data) {
00719     static string tag="COLLECT";
00720     vector<T> dataList;
00721     T dataToReceive;
00722     dataList.resize(_processCount);
00723     doBegEndLog(tag, LOG_BEGIN);
00724     
00725     if (_processID != dest) {
00726       send(dest, tag, data);
00727     } else {
00728       dataList[dest]=data;
00729       
00730       for (int i=0; i<_processCount; i++) {
00731         if(i!=dest) {
00732           recv(i, tag, dataToReceive);
00733           dataList[i]=dataToReceive;
00734         }
00735       }
00736     }
00737     if(tag=="COLLECT:0") tag="COLLECT:1"; else tag="COLLECT:0";
00738     doBegEndLog(tag, LOG_END);    
00739     return dataList;
00740   }
00741 
00742   // ******************************************************************* 
00743   // ***         Public Method: combine                              *** 
00744   // ***                                                             *** 
00745   // *** All parallel processes construct a list of the data passed  *** 
00746   // *** by each process.  The list is broadcast and returned by     *** 
00747   // *** each processor.  This method is used to implement global    *** 
00748   // *** sum and some other global operations.                       *** 
00749   // ***                                                             *** 
00750   // ******************************************************************* 
00751 
00752   template<class T>
00753   vector<T> combine(T &data) {
00754     vector<T> dataList=collect(PROCESS_PARENT,data); 
00755     cout << id() << " size=" << dataList.size() << endl;
00756     
00757     broadcast(PROCESS_PARENT, &dataList[0], dataList.size());    
00758     cout << id() << " list=" << dataList[0] << dataList[1]<< dataList[2]<< endl;
00759     return dataList;
00760   }
00761   
00762   // ******************************************************************* 
00763   // ***         Public Method: barrier                              *** 
00764   // ***                                                             *** 
00765   // ***  Initiates a blocking point so that the processes pause     *** 
00766   // ***  until ALL processes have reached the barrier.              *** 
00767   // ***                                                             *** 
00768   // ******************************************************************* 
00769   
00770   void barrier() {
00771     int dummy;
00772     broadcast(PROCESS_PARENT,dummy);
00773     collect(PROCESS_PARENT,dummy);
00774   }
00775     
00776   // ******************************************************************* 
00777   // ***         Public Method: add                                  *** 
00778   // ***                                                             *** 
00779   // *** All parallel processes sum their data in parallel.  The sum *** 
00780   // *** is returned.                                                *** 
00781   // ***                                                             *** 
00782   // ******************************************************************* 
00783   
00784   template<class T>
00785   T add(T &item) {
00786     vector<T> dataList;
00787     T total=0;
00788     dataList=collect(PROCESS_PARENT,item);
00789     if(_processID==PROCESS_PARENT) 
00790       for (int i=0; i<dataList.size(); i++) {
00791         total += dataList[i];   
00792       }
00793     broadcast(PROCESS_PARENT,total);
00794     return total;
00795   }
00796 
00797   // ******************************************************************* 
00798   // ***         Public Method: auxiliary functions                  *** 
00799   // *** Examples:                                                   *** 
00800   // *** a.out -PSIM_NPROCS=2           (parallel processes)         *** 
00801   // *** a.out -PSIM_LOGFILE=./test.log (log into test.log)          *** 
00802   // *** a.out -PSIM_VERBATIM=1         (show all communications)    *** 
00803   // ***                                                             *** 
00804   // ******************************************************************* 
00805 
// Extracts the process count from a "-PSIM_NPROCS=<n>" command-line option.
//
// argc, argv - the arguments as passed to main(); argv[0] is ignored
//
// Returns the value of the first matching option, parsed with "%i" (so
// decimal, 0x-prefixed hex and 0-prefixed octal are accepted).  Returns 1
// when the option is absent or its value cannot be parsed.
//
// The prefix test uses std::string::compare rather than strncmp, so nothing
// from <cstring> (which this file never includes) is needed, and the prefix
// length is derived from the literal instead of being repeated by hand.
static int parse_argv_nprocs(int argc, char** argv) {
  static const char prefix[] = "-PSIM_NPROCS=";
  const std::string::size_type prefixLen = sizeof(prefix) - 1;
  int n=1;
  for(int i=1; i<argc; i++) {
    if(!argv[i]) continue;            // std::string must not be built from a null pointer
    const std::string arg(argv[i]);
    if(arg.compare(0, prefixLen, prefix)==0) {
      // On a failed conversion sscanf leaves n at its default.
      std::sscanf(arg.c_str()+prefixLen,"%i",&n);
      break;
    }
  }
  return n;
}
00815 
// Extracts the log file name from a "-PSIM_LOGFILE=<path>" command-line
// option.
//
// argc, argv - the arguments as passed to main(); argv[0] is ignored
//
// Returns everything after the '=' of the first matching option, or an
// empty string when the option is absent.
//
// The prefix test uses std::string::compare rather than strncmp, so nothing
// from <cstring> (which this file never includes) is needed, and the prefix
// length is derived from the literal instead of being repeated by hand.
static std::string parse_argv_logfile(int argc, char** argv) {
  static const char prefix[] = "-PSIM_LOGFILE=";
  const std::string::size_type prefixLen = sizeof(prefix) - 1;
  for(int i=1; i<argc; i++) {
    if(!argv[i]) continue;            // std::string must not be built from a null pointer
    const std::string arg(argv[i]);
    if(arg.compare(0, prefixLen, prefix)==0) {
      return arg.substr(prefixLen);
    }
  }
  return std::string("");
}
00823 
// Extracts the verbosity level from a "-PSIM_VERBATIM=<n>" command-line
// option.
//
// argc, argv - the arguments as passed to main(); argv[0] is ignored
//
// Returns the value of the first matching option, parsed with "%i" (so
// decimal, 0x-prefixed hex and 0-prefixed octal are accepted).  Returns 1
// when the option is absent or its value cannot be parsed.
//
// The prefix test uses std::string::compare rather than strncmp, so nothing
// from <cstring> (which this file never includes) is needed, and the prefix
// length is derived from the literal instead of being repeated by hand.
static int parse_argv_verbatim(int argc, char** argv) {
  static const char prefix[] = "-PSIM_VERBATIM=";
  const std::string::size_type prefixLen = sizeof(prefix) - 1;
  int n=1;
  for(int i=1; i<argc; i++) {
    if(!argv[i]) continue;            // std::string must not be built from a null pointer
    const std::string arg(argv[i]);
    if(arg.compare(0, prefixLen, prefix)==0) {
      // On a failed conversion sscanf leaves n at its default.
      std::sscanf(arg.c_str()+prefixLen,"%i",&n);
      break;
    }
  }
  return n;
}
00833 };
00834 
00835 

Generated on Sun Feb 27 15:12:21 2005 by  doxygen 1.4.1