Main Page | Namespace List | Alphabetical List | Class List | Directories | File List | Class Members | File Members

SpikeStreamArchiver.cpp

Go to the documentation of this file.
00001 /***************************************************************************
00002  *   SpikeStream Archiver                                                  *
00003  *   Copyright (C) 2007 by David Gamez                                     *
00004  *   david@davidgamez.eu                                                   *
00005  *   Version 0.1                                                           *
00006  *                                                                         *
00007  *   This program is free software; you can redistribute it and/or modify  *
00008  *   it under the terms of the GNU General Public License as published by  *
00009  *   the Free Software Foundation; either version 2 of the License, or     *
00010  *   (at your option) any later version.                                   *
00011  *                                                                         *
00012  *   This program is distributed in the hope that it will be useful,       *
00013  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00014  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
00015  *   GNU General Public License for more details.                          *
00016  *                                                                         *
00017  *   You should have received a copy of the GNU General Public License     *
00018  *   along with this program; if not, write to the                         *
00019  *   Free Software Foundation, Inc.,                                       *
00020  *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.             *
00021  ***************************************************************************/
00022 
00023 //SpikeStream includes
00024 #include "Debug.h"
00025 #include "PVMMessages.h"
00026 #include "SpikeStreamArchiver.h"
00027 #include "GlobalVariables.h"
00028 #include "Utilities.h"
00029 
00030 //Other includes
00031 #include <ctime>
00032 #include <stdlib.h>
00033 #include "pvm3.h"
00034 #include <iostream>
00035 using namespace std;
00036 using namespace mysqlpp;
00037 
00038 
00039 //Declare static variables
00040 int SpikeStreamArchiver::parentTaskID = 0;
00041 bool SpikeStreamArchiver::errorState = false;
00042 
00043 
00044 /*! Constructor */
00045 SpikeStreamArchiver::SpikeStreamArchiver(int argc, char **argv){
00046         //Get task id and parent task id
00047         myTaskID = pvm_mytid();
00048         parentTaskID = pvm_parent();
00049 
00050         //Initialise variables
00051         networkDataStored = false;
00052 
00053         //Set up direct routing
00054         pvm_setopt( PvmRoute, PvmRouteDirect );
00055 
00056         //Initialize array for receiving spikes
00057         unpackArray = new unsigned int[MAX_NUMBER_OF_SPIKES];
00058 
00059         /* Read arguments from array. 
00060                 These include host, username and password for database and NeuronGrpID
00061                 Use options: -h [hostname] -d [database] -u [username]  -p [password] 
00062                 with spaces in between to avoid strict ordering. argv[0] is the executable name, so start
00063                 from 1. */
00064         char *neuralNetworkHost, *neuralNetworkUser, *neuralNetworkPassword, *neuralNetworkDatabase;
00065         char *archiveHost, *archiveUser, *archivePassword, *archiveDatabase;
00066 
00067         for(int i=1; i<argc; i += 2){
00068                 if(argv[i][0] == '-'){//This line contains an option
00069                         if(Utilities::cStringEquals(argv[i], "-nnh", 4)){//Extract neural network host information
00070                                 neuralNetworkHost = argv[i+1];
00071                         }
00072                         else if(Utilities::cStringEquals(argv[i], "-nnu", 4)) {//Extract nerual network user information
00073                                 neuralNetworkUser = argv[i+1];
00074                         }
00075                         else if(Utilities::cStringEquals(argv[i], "-nnp", 4)){// Extract password information
00076                                 neuralNetworkPassword = argv[i+1];
00077                         }
00078                         else if(Utilities::cStringEquals(argv[i], "-nnd", 4)){// Extract database information
00079                                 neuralNetworkDatabase = argv[i+1];
00080                         }
00081                         else if(Utilities::cStringEquals(argv[i], "-ah", 4)){//Extract neural network host information
00082                                 archiveHost = argv[i+1];
00083                         }
00084                         else if(Utilities::cStringEquals(argv[i], "-au", 4)) {//Extract nerual network user information
00085                                 archiveUser = argv[i+1];
00086                         }
00087                         else if(Utilities::cStringEquals(argv[i], "-ap", 4)){// Extract password information
00088                                 archivePassword = argv[i+1];
00089                         }
00090                         else if(Utilities::cStringEquals(argv[i], "-ad", 4)){// Extract database information
00091                                 archiveDatabase = argv[i+1];
00092                         }
00093                         else if(Utilities::cStringEquals(argv[i], "-an", 4)){// Extract archive name
00094                                 // Copy archive name
00095                                 Utilities::safeCStringCopy(archiveName, argv[i+1], MAX_DATABASE_NAME_LENGTH);
00096                         }
00097                         else if(Utilities::cStringEquals(argv[i], "-at", 4)){
00098                                 if(Utilities::cStringEquals(argv[i+1], "Neurons", 30))
00099                                         archiveType = FIRING_NEURON_ARCHIVE;
00100                                 else if(Utilities::cStringEquals(argv[i+1], "Spikes", 20))
00101                                         archiveType = SPIKE_ARCHIVE;
00102                                 else{
00103                                         cerr<<"SpikeStreamArchiver: ARCHIVE TYPE NOT RECOGNIZED: \""<<argv[i+1]<<"\". SETTING TO ARCHIVE FIRING NEURONS"<<endl;
00104                                         archiveType = FIRING_NEURON_ARCHIVE;
00105                                 }
00106                         }
00107                 }
00108                 else{
00109                         cerr<<"SpikeStreamArchiver: Invalid command line arguments!"<<endl;
00110                         return;
00111                 }
00112         }
00113 
00114         //Output database parameters to check them
00115         #ifdef COMMAND_LINE_PARAMETERS_DEBUG
00116                 cout<<"Neural network database parameters: "<<neuralNetworkHost<<", "<<neuralNetworkUser<<", "<<neuralNetworkPassword<<", "<<neuralNetworkDatabase<<endl;
00117                 cout<<"Archive database parameters: "<<archiveHost<<", "<<archiveUser<<", "<<archivePassword<<", "<<archiveDatabase<<endl;
00118         #endif//COMMAND_LINE_PARAMETERS_DEBUG
00119 
00120         //Create a new neural network database interface
00121         neuralNetworkDBInterface = new DBInterface(neuralNetworkHost, neuralNetworkUser, neuralNetworkPassword, neuralNetworkDatabase);
00122         if(!neuralNetworkDBInterface->connectToDatabase(true)){//Use exceptions
00123                 systemError("FAILED TO CONNECT TO THE NEURAL NETWORK DATABASE");//This sets up the error state, so will only respond to exit message now
00124         }
00125 
00126         //Create a new archive database interface
00127         archiveDBInterface = new DBInterface(archiveHost, archiveUser, archivePassword, archiveDatabase);
00128         if(!archiveDBInterface->connectToDatabase(true)){//Use exceptions
00129                 systemError("FAILED TO CONNECT TO NEURAL ARCHIVE DATABASE");//This sets up the error state, so will only respond to exit message now
00130         }
00131 
00132         /* Load up a list connecting neuron group ids to task ids. The neuron archiver
00133                 should be started after all the simulation tasks have started so all the task
00134                 IDs should be in the database */
00135         loadTaskIDs();
00136 
00137         /* Store an XML file recording the essential details of the neural network. 
00138                 This is to enable the archive to be played back even after the network has changed */
00139         archiveNeuralNetwork();
00140 
00141         /*Send confirmation message to parent that archive has started. */
00142         sendMessage(parentTaskID, ARCHIVE_TASK_STARTED_MSG, simulationStartTime);
00143 
00144         //Give user feedback about initialization of task
00145         cout<<"Archiver task successfully started with taskID "<<myTaskID<<endl;
00146 
00147         //Start main run method of task
00148         stop = false;
00149 
00150         run();
00151 }
00152 
00153 
00154 /*! Destructor */
00155 SpikeStreamArchiver::~SpikeStreamArchiver(){
00156         #ifdef MEMORY_DEBUG
00157                 cout<<"DELETING NEURON ARCHIVER. TASK ID: "<<myTaskID<<endl;
00158         #endif//MEMORY_DEBUG
00159 
00160         //Delete any classes allocated from the heap
00161         delete archiveDBInterface;
00162         delete neuralNetworkDBInterface;
00163 
00164         //Delete unpack array
00165         delete [] unpackArray;
00166 }
00167 
00168 
00169 /*! Main run method */
00170 void SpikeStreamArchiver::run(){
00171         while(!stop){
00172                 //Blocking receive - waiting for message from other tasks
00173                 int bufID = pvm_recv(-1, -1);
00174                 if(bufID < 0){
00175                         pvm_perror("SpikeStreamArchiver: MESSAGE RECEIVE ERROR");
00176                         systemError("SpikeStreamArchiver: MESSAGE RECEIVE ERROR");
00177                 }
00178                 else if (bufID > 0){
00179                         int bytes, msgtag, senderTID;
00180                         int info = pvm_bufinfo(bufID, &bytes, &msgtag, &senderTID);//Get info about message
00181                         if(info < 0){
00182                                 pvm_perror("SpikeStreamArchiver: PROBLEM GETTING BUFFER INFO");
00183                                 systemError("SpikeStreamArchiver: PROBLEM GETTING BUFFER INFO");
00184                         }
00185                         else if(errorState){//When there has been an error, task only responds to an exit msg
00186                                 if(msgtag == EXIT_MSG){
00187                                         #ifdef MESSAGE_DEBUG
00188                                                 cout<<"Task "<<myTaskID<<": EXIT_MSG "<<bytes<<" bytes received from "<<senderTID<<endl;
00189                                         #endif//MESSAGE_DEBUG
00190                                         stop = true;//Exits the archiver from the run loop
00191                                 }
00192                                 else{
00193                                         systemError("Task is in error state and will no longer carry out any operations.\nTermination of the simulation is recommended.");
00194                                 }
00195                         }
00196                         else{
00197                                 switch(msgtag){
00198                                         case (SPIKE_LIST_MSG):
00199                                                 #ifdef MESSAGE_DEBUG
00200                                                         cout<<"Task "<<myTaskID<<": SPIKE_LIST_MSG "<<bytes<<" bytes received from "<<senderTID<<endl;
00201                                                 #endif//MESSAGE_DEBUG
00202                                                 processSpikeList(senderTID);
00203                                         break;
00204                                         case (FIRING_NEURON_LIST_MSG):
00205                                                 #ifdef MESSAGE_DEBUG
00206                                                         cout<<"Task "<<myTaskID<<": FIRING_NEURON_LIST_MSG "<<bytes<<" bytes received from "<<senderTID<<endl;
00207                                                 #endif//MESSAGE_DEBUG
00208                                                 processFiringNeuronList(senderTID);
00209                                         break;
00210                                         case (START_ARCHIVING_MSG):
00211                                                 #ifdef MESSAGE_DEBUG
00212                                                         cout<<"Task "<<myTaskID<<": START_ARCHIVING_MSG "<<bytes<<" bytes received from "<<senderTID<<endl;
00213                                                 #endif//MESSAGE_DEBUG
00214                                                 startArchiving();
00215                                         break;
00216                                         case (STOP_ARCHIVING_MSG):
00217                                                 #ifdef MESSAGE_DEBUG
00218                                                         cout<<"Task "<<myTaskID<<": STOP_ARCHIVING_MSG "<<bytes<<" bytes received from "<<senderTID<<endl;
00219                                                 #endif//MESSAGE_DEBUG
00220                                                 stopArchiving();
00221                                         break;
00222                                         case (EXIT_MSG):
00223                                                 #ifdef MESSAGE_DEBUG
00224                                                         cout<<"Task "<<myTaskID<<": EXIT_MSG "<<bytes<<" bytes received from "<<senderTID<<endl;
00225                                                 #endif//MESSAGE_DEBUG
00226                                                 stop = true;//Exits the archiver from the run loop
00227                                         break;
00228                                         default:
00229                                                 cout<<"Task "<<myTaskID<<"; UNRECOGNIZED MESSAGE "<<bytes<<" bytes received from "<<senderTID<<endl;
00230                                 }
00231                         }
00232                 }
00233         }
00234 
00235         //Do any remaining tasks prior to exiting
00236         cleanUpArchiver();
00237 }
00238 
00239 
00240 //---------------------------------------------------------------------------------
00241 //-------------------------- PUBLIC STATIC METHODS --------------------------------
00242 //---------------------------------------------------------------------------------
00243 
00244 /*! Writes an error message to the standard output and sends a message to the parent task 
00245         with the error message. Also puts archiver into error state, in which it only responds
00246         to exit message so that SimulationManager can shut simulation down cleanly.*/
00247 void SpikeStreamArchiver::systemError(const char *message){
00248         errorState = true;//Put the program into error state when it stops operating and waits for exit message.
00249         cerr<<"SpikeStreamArchiver: "<<message<<endl;
00250         sendMessage(parentTaskID, ERROR_MSG, message);
00251 }
00252 
00253 
00254 /*! Writes an error message to the standard output and sends a message to the parent task 
00255         with the error message. Also puts archiver into error state, in which it only responds
00256         to exit message so that SimulationManager can shut simulation down cleanly.*/
00257 void SpikeStreamArchiver::systemError(const char *message, int messageData1){
00258         errorState = true;//Put the program into error state when it stops operating and waits for exit message.
00259         ostringstream tempStr;
00260         tempStr<<message<<messageData1;
00261         cerr<<"SpikeStreamArchiver: "<<message<<messageData1<<endl;
00262         sendMessage(parentTaskID, ERROR_MSG, tempStr.str().data());
00263 }
00264 
00265 
00266 /*! Writes an error message to the standard output and sends a message to the parent task 
00267         with the error message. Also puts archiver into error state, in which it only responds
00268         to exit message so that SimulationManager can shut simulation down cleanly.*/
00269 void SpikeStreamArchiver::systemError(const string &message){
00270         errorState = true;//Put the program into error state when it stops operating and waits for exit message.
00271         cerr<<"SpikeStreamArchiver: "<<message<<endl;
00272         sendMessage(parentTaskID, ERROR_MSG, message.data());
00273 }
00274 
00275 
00276 //---------------------------------------------------------------------------------
00277 //------------------------------ PRIVATE METHODS ----------------------------------
00278 //---------------------------------------------------------------------------------
00279 
00280 /*! Store an XML file recording the essential details of the neural network. 
00281         This is to enable the archive to be played back even after the network has changed 
00282         For the moment just store the neuron groups */
00283 bool SpikeStreamArchiver::archiveNeuralNetwork(){
00284         //Get the time in seconds at which this is being archived
00285         simulationStartTime = time(NULL);
00286 
00287         string xmlString = "<?xml version=\"1.0\" encoding=\"ISO-8859-1\"?>";
00288         xmlString += "<neural_network>";
00289 
00290         try{
00291                 Query networkQuery = neuralNetworkDBInterface->getQuery();
00292                 networkQuery.reset();
00293                 networkQuery<<"SELECT * FROM NeuronGroups";
00294                 Result result = networkQuery.store();
00295                 for(Result::iterator iter = result.begin(); iter != result.end(); ++iter){
00296                         Row row(*iter);
00297         
00298                         //First get start neuron group ID
00299                         unsigned int neuronGrpID = Utilities::getUInt(row["NeuronGrpID"]);
00300                         networkQuery.reset();
00301                         networkQuery<<"SELECT MIN(NeuronID) FROM Neurons WHERE NeuronGrpID = "<<neuronGrpID;
00302                         Result startNeuronRes = networkQuery.store();
00303                         Row startNeuronRow(*startNeuronRes.begin());//Should only be one row
00304         
00305                         xmlString += "<neuron_group id=\"";
00306                         xmlString +=(std::string)row["NeuronGrpID"];
00307                         xmlString += "\">";
00308         
00309                         xmlString += "<name>";
00310                         xmlString += (std::string)row["Name"];
00311                         xmlString += "</name>";
00312         
00313                         xmlString += "<start_neuron_id>";
00314                         xmlString += (std::string)startNeuronRow["MIN(NeuronID)"];
00315                         xmlString += "</start_neuron_id>";
00316         
00317                         xmlString += "<width>";
00318                         xmlString += (std::string)row["Width"];
00319                         xmlString += "</width>";
00320         
00321                         xmlString += "<length>";
00322                         xmlString += (std::string)row["Length"];
00323                         xmlString += "</length>";
00324         
00325                         xmlString += "<location>";
00326                         xmlString += (std::string)row["X"];
00327                         xmlString += ",";
00328                         xmlString += (std::string)row["Y"];
00329                         xmlString += ",";
00330                         xmlString += (std::string)row["Z"];
00331                         xmlString += "</location>";
00332                 
00333                         xmlString += "<spacing>";
00334                         xmlString += (std::string)row["Spacing"];
00335                         xmlString += "</spacing>";
00336         
00337                         xmlString += "<neuron_type>";
00338                         xmlString += (std::string)row["NeuronType"];
00339                         xmlString += "</neuron_type>";
00340         
00341                         xmlString += "</neuron_group>";
00342                 }
00343         
00344                 //Finish off XML string
00345                 xmlString += "</neural_network>";
00346 
00347                 //Write XML model to database   
00348                 Query archiveQuery = archiveDBInterface->getQuery();
00349                 archiveQuery.reset();
00350                 ostringstream strbuf;
00351         strbuf <<"INSERT INTO NetworkModels(StartTime, ArchiveType, Name, NeuronGroups) VALUES ( "<<simulationStartTime<<", "<<archiveType<<", \""<<archiveName<<"\", \""<<mysqlpp::escape<<xmlString<<"\" )";
00352         archiveQuery.exec(strbuf.str());
00353         }
00354         catch (const BadQuery& er) {// Handle any query errors
00355                 ostringstream errorStrStream;
00356                 errorStrStream<<"Bad query when archiving neural network: \""<<er.what()<<"\"";
00357                 systemError(errorStrStream.str());
00358                 return false;
00359         }
00360         catch (const Exception& er) {// Catch-all for any other MySQL++ exceptions
00361                 ostringstream errorStrStream;
00362                 errorStrStream<<"Exception thrown archiving neural network: \""<<er.what()<<"\"";
00363                 systemError(errorStrStream.str());
00364                 return false;
00365         }
00366         catch(std::exception& er){// Catch-all for any other exceptions
00367                 ostringstream errorStrStream;
00368                 errorStrStream<<"Exception thrown archiving neural network: \""<<er.what()<<"\"";
00369                 systemError(errorStrStream.str());
00370                 return false;
00371         }
00372 
00373         //Everything ok if we have reached this point
00374         return true;
00375 }
00376 
00377 
00378 /*! Called just before exiting run method to do any final tasks, such as storing spike lists
00379         or deleting network model if no spike lists have been stored. */
00380 void SpikeStreamArchiver::cleanUpArchiver(){
00381         cout<<"Cleaning up archiver"<<endl;
00382         //Archive any half completed timeStepHolders
00383         for(map<unsigned int, TimeStepHolder>::iterator iter = timeStepMap.begin(); iter != timeStepMap.end(); ++iter){
00384                 //Finish off xml string
00385                 iter->second.xmlString += "</network_pattern>";
00386 
00387                 /* Store string in database. */
00388                 storeNetworkData(iter->first, iter->second.xmlString);
00389         }
00390 
00391         //Delete network model if no data has been stored for it
00392         if(!networkDataStored){
00393                 try{
00394                         Query archiveQuery = archiveDBInterface->getQuery();
00395                         archiveQuery.reset();
00396                         archiveQuery<<"DELETE FROM NetworkModels WHERE StartTime = "<<simulationStartTime;
00397                         archiveQuery.execute();
00398                 }
00399                 catch (const BadQuery& er) {// Handle any query errors
00400                         ostringstream errorStrStream;
00401                         errorStrStream<<"Bad query when deleting empty network models: \""<<er.what()<<"\"";
00402                         systemError(errorStrStream.str());
00403                 }
00404                 catch (const Exception& er) {// Catch-all for any other MySQL++ exceptions
00405                         ostringstream errorStrStream;
00406                         errorStrStream<<"Exception thrown deleting empty network models: \""<<er.what()<<"\"";
00407                         systemError(errorStrStream.str());
00408                 }
00409                 catch(std::exception& er){// Catch-all for any other exceptions
00410                         ostringstream errorStrStream;
00411                         errorStrStream<<"Exception thrown deleting empty network models: \""<<er.what()<<"\"";
00412                         systemError(errorStrStream.str());
00413                 }
00414         }
00415 
00416         //Inform main application that clean up is complete
00417         sendMessage(parentTaskID, TASK_EXITED_MSG);
00418         cout<<"SpikeStreamArchiver: Finished storing data, now exiting pvm and invoking destructor"<<endl;
00419 
00420         //Exit from pvm, destructor should be automatically invoked
00421         pvm_exit();
00422 }
00423 
00424 
00425 /*! Converts an unsigned integer to a string. */
00426 string SpikeStreamArchiver::getString(unsigned int uInt){
00427         stringstream tempStringStr;
00428         tempStringStr<<uInt;
00429         return tempStringStr.str();
00430 }
00431 
00432 
00433 /*! Fills the map connecting NeuronGrpIDs and TaskIDs. */
00434 bool SpikeStreamArchiver::loadTaskIDs(){
00435         spikeMessageTotal = 0;
00436         try{
00437                 Query networkQuery = neuralNetworkDBInterface->getQuery();
00438                 networkQuery.reset();
00439                 networkQuery<<"SELECT NeuronGrpID, TaskID FROM NeuronGroups";
00440                 Result taskResult = networkQuery.store();
00441                 for(Result::iterator iter = taskResult.begin(); iter != taskResult.end(); ++iter){
00442                         Row taskRow(*iter);
00443                         int tempTaskID = Utilities::getInt(taskRow["TaskID"]);
00444                         unsigned int tempNeuronGrpID = Utilities::getUInt(taskRow["NeuronGrpID"]);
00445                         
00446                         // Add entry storing the link between taskID and neuronGrpID
00447                         taskToNeuronGrpMap[tempTaskID] = tempNeuronGrpID;
00448         
00449                         // Add taskID to spike message total to track whether all spike messages have arrived
00450                         spikeMessageTotal += tempTaskID;
00451                 }
00452         
00453                 /* Work through the neuron groups and load up the start neuron ID for each task. 
00454                         This is used to decompress the messages. */
00455                 for(map<int, unsigned int>::iterator iter = taskToNeuronGrpMap.begin(); iter != taskToNeuronGrpMap.end(); ++iter){
00456                         networkQuery.reset();
00457                         networkQuery<<"SELECT MIN(NeuronID) FROM Neurons WHERE NeuronGrpID = "<<iter->second;
00458                         Result minNeurIDRes = networkQuery.store();
00459                         Row minNeurIDRow(*minNeurIDRes.begin());//Should be only one row
00460                         startNeurIDTaskMap[iter->first] = Utilities::getUInt((std::string)minNeurIDRow["MIN(NeuronID)"]);
00461                 }
00462         }
00463         catch (const BadQuery& er) {// Handle any query errors
00464                 ostringstream errorStrStream;
00465                 errorStrStream<<"Bad query when loading task IDs: \""<<er.what()<<"\"";
00466                 systemError(errorStrStream.str());
00467                 return false;
00468         }
00469         catch (const Exception& er) {// Catch-all for any other MySQL++ exceptions
00470                 ostringstream errorStrStream;
00471                 errorStrStream<<"Exception thrown loading task IDs: \""<<er.what()<<"\"";
00472                 systemError(errorStrStream.str());
00473                 return false;
00474         }
00475         catch(std::exception& er){// Catch-all for any other exceptions
00476                 ostringstream errorStrStream;
00477                 errorStrStream<<"Exception thrown loading task IDs: \""<<er.what()<<"\"";
00478                 systemError(errorStrStream.str());
00479                 return false;
00480         }
00481 
00482         //If we have reached this point everything should be ok
00483         return true;
00484 }
00485 
00486 
00487 /*! Processes a list of firing neurons from the task simulating a neuron group. 
00488         Need to receive all the neuron groups, which is checked using task ids.
00489         For each neuron group add the firing neurons to the archive. */
00490 void SpikeStreamArchiver::processFiringNeuronList(int senderTID){
00491         //Create a string stream for writing neuron ids to
00492         stringstream firingNeuronStringStr;
00493 
00494         //Get the time step of the message
00495         int info = pvm_upkuint(&messageTimeStep, 1, 1);
00496         #ifdef PVM_DEBUG
00497                 if(info < 0){
00498                         cerr<<"SpikeStreamArchiver: ERROR EXTRACTING MESSAGE TIME STEP; TASK ID = "<<pvm_mytid()<<"; messageTimeStep: "<<messageTimeStep<<endl;
00499                         return;
00500                 }
00501         #endif//PVM_DEBUG
00502 
00503         //See if we already have a timeStepHolder for this time step
00504         if(timeStepMap.find(messageTimeStep) != timeStepMap.end()){
00505                 //Have found one so increase spike message count for map
00506                 timeStepMap[messageTimeStep].spikeMessageCount += senderTID;
00507         }
00508         /* Do not have a timeStepHolder for this map, so create one and 
00509                 initialise spikeMessageCount and xmlString */
00510         else{
00511                 //Initialise spikeMessageCount
00512                 timeStepMap[messageTimeStep].spikeMessageCount = senderTID;
00513 
00514                 //Start creating the XML file
00515                 timeStepMap[messageTimeStep].xmlString = "<?xml version=\"1.0\" encoding=\"ISO-8859-1\"?>";
00516                 timeStepMap[messageTimeStep].xmlString += "<network_pattern>";
00517         }
00518 
00519         // Extract the number of neurons in the message
00520         info = pvm_upkuint(&numberOfSpikes, 1, 1);
00521         #ifdef PVM_DEBUG
00522                 if(info < 0){
00523                         cerr<<"SpikeStreamArchiver: ERROR NUMBER OF FIRING NEURONS FROM MESSAGE; TASK ID = "<<pvm_mytid()<<"; numberOfSpikes: "<<numberOfSpikes<<endl;
00524                         return;
00525                 }
00526         #endif//PVM_DEBUG
00527 
00528         //Unpack the list of firing neurons. No problem with duplicates here.
00529         info = pvm_upkuint(unpackArray, numberOfSpikes, 1);
00530         #ifdef PVM_DEBUG
00531                 if(info < 0){
00532                         cerr<<"SpikeStreamArchiver: ERROR UNPACKING FIRING NEURON IDS FROM MESSAGE; TASK ID = "<<pvm_mytid()<<"; numberOfSpikes: "<<numberOfSpikes<<endl;
00533                         return;
00534                 }
00535         #endif//PVM_DEBUG
00536 
00537         //Now add list of firing neurons to string stream
00538         for(int i=0; i<numberOfSpikes; ++i){
00539                 firingNeuronStringStr<<unpackArray[i];
00540                 if(i != numberOfSpikes - 1)//don't add comma for the last number
00541                         firingNeuronStringStr<<",";
00542         }
00543 
00544         #ifdef FIRING_NEURON_DEBUG
00545                 cout<<"SpikeStreamArchiver: Number of firing neurons processed: "<<numberOfSpikes<<" from sender: "<<senderTID<<" at timeStep: "<<messageTimeStep<<endl;
00546         #endif//FIRING_NEURON_DEBUG
00547 
00548         //Add string with firing neuron information to xml string
00549         timeStepMap[messageTimeStep].xmlString += "<neuron_group id=\"";
00550         timeStepMap[messageTimeStep].xmlString += getString(taskToNeuronGrpMap[senderTID]);
00551         timeStepMap[messageTimeStep].xmlString += "\">";
00552         timeStepMap[messageTimeStep].xmlString += firingNeuronStringStr.str();//firingNeuronString;
00553         timeStepMap[messageTimeStep].xmlString += "</neuron_group>";
00554 
00555         //Check the time step map to see if any of them is complete
00556         for(map<unsigned int, TimeStepHolder>::iterator iter = timeStepMap.begin(); iter != timeStepMap.end(); ++iter){
00557                 if(iter->second.spikeMessageCount == spikeMessageTotal){//All messages have been received for this time step
00558                         
00559                         //Finish off xml string
00560                         iter->second.xmlString += "</network_pattern>";
00561 
00562                         //Store string in database
00563                         if(storeNetworkData(iter->first, iter->second.xmlString)){
00564                                 //Remove timeStepHolder from map if it has been successfully stored
00565                                 timeStepMap.erase(iter);
00566                         }
00567 
00568                         //Should only be one timeStepHolder completed for each time step so break here
00569                         break;
00570                 }
00571         }
00572 }
00573 
00574 
00575 /*! Processes a list of spikes from the task simulating the neuron group that is being monitored. 
00576         Need to receive all the neuron groups. This is checked using task ids.
00577         For each neuron group add the firing neurons to the archive. */
00578 void SpikeStreamArchiver::processSpikeList(int senderTID){
00579                 //Get the time step of the message
00580         int info = pvm_upkuint(&messageTimeStep, 1, 1);
00581         #ifdef PVM_DEBUG
00582                 if(info < 0){
00583                         cerr<<"SpikeStreamArchiver: ERROR EXTRACTING MESSAGE TIME STEP; TASK ID = "<<pvm_mytid()<<"; messageTimeStep: "<<messageTimeStep<<endl;
00584                         return;
00585                 }
00586         #endif//PVM_DEBUG
00587 
00588         //See if we already have a timeStepHolder for this time step
00589         if(timeStepMap.find(messageTimeStep) != timeStepMap.end()){
00590                 //Have found one so increase spike message count for map
00591                 timeStepMap[messageTimeStep].spikeMessageCount += senderTID;
00592         }
00593 
00594         /* Do not have a timeStepHolder for this map, so create one and 
00595                 initialise spikeMessageCount, unix time and xmlString */
00596         else{
00597                 //Initialise spikeMessageCount
00598                 timeStepMap[messageTimeStep].spikeMessageCount = senderTID;
00599 
00600                 //Start creating the XML file
00601                 timeStepMap[messageTimeStep].xmlString = "<?xml version=\"1.0\" encoding=\"ISO-8859-1\"?>";
00602                 timeStepMap[messageTimeStep].xmlString += "<network_pattern>";
00603         }
00604 
00605         // Extract the number of spikes in the message
00606         info = pvm_upkuint(&numberOfSpikes, 1, 1);
00607         #ifdef PVM_DEBUG
00608                 if(info < 0){
00609                         cerr<<"NetworkMonitor: ERROR NUMBER OF FIRING NEURONS FROM MESSAGE; TASK ID = "<<pvm_mytid()<<"; numberOfSpikes: "<<numberOfSpikes<<endl;
00610                         return;
00611                 }
00612         #endif//PVM_DEBUG
00613 
00614         //Unpack the spikes into the firingNeuronMap to filter out duplicates
00615         firingNeuronMap.clear();
00616 
00617         //Unpack the list of spikes 
00618         info = pvm_upkuint(unpackArray, numberOfSpikes, 1);//Unpack from and to neuron ids as two shorts compressed into an integer
00619         #ifdef PVM_DEBUG
00620                 if(info < 0){
00621                         systemError("NetworkMonitor: ERROR UNPACKING UNSIGNED INT FROM MESSAGE. NUMBER OF SPIKES = ", numberOfSpikes);
00622                         return;
00623                 }
00624         #endif//PVM_DEBUG
00625         
00626         //Process the spikes to get the from ids
00627         for(unsigned int i=0; i<numberOfSpikes; ++i){
00628                 /* Add the from neuron ID to the from key.
00629                         Only need the from neuron id because we are monitoring spikes from a layer*/
00630                 unpkFromNeurID = (unsigned short) unpackArray[i];
00631                 unpkFromNeurID += startNeurIDTaskMap[senderTID];;
00632 
00633                 //Add it to the firing neuron map
00634                 firingNeuronMap [ unpkFromNeurID ] = true;
00635         }
00636 
00637 
00638         //Now add list of firing neurons to string buffer
00639         stringstream firingNeuronStringStr;
00640         bool firstTime = true;
00641         for(map<unsigned int, bool>::iterator iter = firingNeuronMap.begin(); iter != firingNeuronMap.end(); ++iter){
00642                 if(!firstTime){
00643                         firingNeuronStringStr<<",";
00644                 }
00645                 firingNeuronStringStr<<iter->first;
00646                 firstTime = false;
00647         }
00648 
00649         //Add string with firing neuron information to xml string
00650         timeStepMap[messageTimeStep].xmlString += "<neuron_group id=\"";
00651         timeStepMap[messageTimeStep].xmlString += getString(taskToNeuronGrpMap[senderTID]);
00652         timeStepMap[messageTimeStep].xmlString += "\">";
00653         timeStepMap[messageTimeStep].xmlString += firingNeuronStringStr.str();
00654         timeStepMap[messageTimeStep].xmlString += "</neuron_group>";
00655 
00656 
00657         //Check the time step map to see if any of them is complete
00658         for(map<unsigned int, TimeStepHolder>::iterator iter = timeStepMap.begin(); iter != timeStepMap.end(); ++iter){
00659                 if(iter->second.spikeMessageCount == spikeMessageTotal){//All messages have been received for this time step
00660                         
00661                         //Finish off xml string
00662                         iter->second.xmlString += "</network_pattern>";
00663 
00664                         //Store string in database
00665                         if(storeNetworkData(iter->first, iter->second.xmlString)){
00666                                 //Remove timeStepHolder from map if it has been successfully stored
00667                                 timeStepMap.erase(iter);
00668                         }
00669 
00670                         //Should only be one timeStepHolder completed for each time step so break here
00671                         break;
00672                 }
00673         }
00674 }
00675 
00676 /*! Sends a message without any contents. */
00677 bool SpikeStreamArchiver::sendMessage(int taskID, int msgtag){
00678         //Initialise the buffer
00679         int info = pvm_initsend(PvmDataDefault);
00680         #ifdef PVM_DEBUG
00681                 if(info < 0){
00682                         cout<<"SpikeStreamArchiver: Init send error: "<<info<<" with task "<<taskID<<" and msgtag: "<<msgtag<<endl;
00683                         return false;
00684                 }
00685         #endif//PVM_DEBUG
00686 
00687         //Send the message
00688         info = pvm_send(taskID, msgtag);
00689         #ifdef PVM_DEBUG
00690                 if(info < 0){
00691                         cout<<"SpikeStreamArchiver: Send error: "<<info<<" with task "<<taskID<<" and msgtag: "<<msgtag<<endl;
00692                         return false;
00693                 }
00694         #endif//PVM_DEBUG
00695         return true;
00696 }
00697 
00698 
00699 /*! Sends a message with a single integer. */
00700 bool SpikeStreamArchiver::sendMessage(int taskID, int msgtag, unsigned int msgInteger){
00701         //Initialise buffer
00702         int info = pvm_initsend(PvmDataDefault);
00703         #ifdef PVM_DEBUG
00704                 if(info < 0){
00705                         cout<<"SpikeStreamArchiver: Init send error: "<<info<<" with task "<<taskID<<" and msgtag: "<<msgtag<<endl;
00706                         return false;
00707                 }
00708         #endif//PVM_DEBUG
00709 
00710         //Pack the integer
00711         info = pvm_pkuint(&msgInteger, 1, 1);
00712         #ifdef PVM_DEBUG
00713                 if(info < 0){
00714                         cerr<<"SpikeStreamArchiver: ERROR PACKING INTEGER INTO MESSAGE"<<endl;
00715                         return false;
00716                 }
00717         #endif//PVM_DEBUG
00718         
00719         //Send the message
00720         info = pvm_send(taskID, msgtag);
00721         #ifdef PVM_DEBUG
00722                 if(info < 0){
00723                         cout<<"SpikeStreamArchiver: Send error: "<<info<<" with task "<<taskID<<" and msgtag: "<<msgtag<<endl;
00724                         return false;
00725                 }
00726         #endif//PVM_DEBUG
00727         return true;
00728 }
00729 
00730 
00731 /*! Sends a message containing a char string. */
00732 bool SpikeStreamArchiver::sendMessage(int taskID, int msgtag, const char* charArray){
00733         //First find the length of the char array (+1 so that other end does not have to add 1 for null character)
00734         unsigned int arrayLength = strlen(charArray) + 1;
00735 
00736         //Initialise the buffer
00737         int info = pvm_initsend(PvmDataDefault);
00738         #ifdef PVM_DEBUG
00739                 if(info<0){
00740                         cerr<<"SpikeStreamArchiver: Init send error: tag: "<<msgtag<<" to: "<<taskID<<" containing: "<<charArray<<endl;
00741                         return false;
00742                 }
00743         #endif//PVM_DEBUG
00744 
00745         //Pack the length of the char array
00746         info = pvm_pkuint(&arrayLength, 1, 1);
00747         #ifdef PVM_DEBUG
00748                 if(info < 0){
00749                         cerr<<"SpikeStreamArchiver: ERROR PACKING MESSAGE LENGTH INTO MESSAGE"<<endl;
00750                         return false;
00751                 }
00752         #endif //PVM_DEBUG
00753 
00754         //Pack the char array
00755         info = pvm_pkstr((char*)charArray);
00756         #ifdef PVM_DEBUG
00757                 if(info < 0){
00758                         cerr<<"SpikeStreamArchiver: ERROR PACKING  CHAR* INTO MESSAGE"<<endl;
00759                         return false;
00760                 }
00761         #endif //PVM_DEBUG
00762 
00763         //Send message
00764         info = pvm_send(taskID, msgtag);
00765         #ifdef PVM_DEBUG
00766                 if(info<0){
00767                         cerr<<"NeuronSimulation: Message send error: tag: "<<msgtag<<" to: "<<taskID<<" containing: "<<charArray<<endl;
00768                         return false;
00769                 }
00770         #endif //PVM_DEBUG
00771         return true;
00772 }
00773 
00774 
00775 /*! Sends a message to all simulation tasks requesting them to start sending spike messages. */
00776 void SpikeStreamArchiver::startArchiving(){
00777         for(map<int, unsigned int>::iterator iter = taskToNeuronGrpMap.begin(); iter != taskToNeuronGrpMap.end(); ++iter){
00778                 //Send a different message depending on the archive type
00779                 if(archiveType == FIRING_NEURON_ARCHIVE)
00780                         sendMessage(iter->first, REQUEST_FIRING_NEURON_DATA_MSG);
00781                 else if (archiveType == SPIKE_ARCHIVE)
00782                         sendMessage(iter->first, REQUEST_SPIKE_DATA_MSG);
00783         }
00784 }
00785 
00786 
00787 /*! Sends a message to all simulation taks requesting them to stop sending spike messages. */
00788 void SpikeStreamArchiver::stopArchiving(){
00789         for(map<int, unsigned int>::iterator iter = taskToNeuronGrpMap.begin(); iter != taskToNeuronGrpMap.end(); ++iter){
00790                 if(archiveType == FIRING_NEURON_ARCHIVE)
00791                         sendMessage(iter->first, CANCEL_FIRING_NEURON_DATA_MSG);
00792                 else if (archiveType == SPIKE_ARCHIVE)
00793                         sendMessage(iter->first, CANCEL_SPIKE_DATA_MSG);
00794         }
00795 }
00796 
00797 
00798 /*! Stores the network data in the database. */
00799 bool SpikeStreamArchiver::storeNetworkData(unsigned int timeStep, string xmlString){
00800         try{
00801                 Query archiveQuery = archiveDBInterface->getQuery();
00802                 archiveQuery.reset();
00803                 ostringstream strbuf;
00804                 strbuf <<"INSERT INTO NetworkData VALUES ( "<<simulationStartTime<<", "<<timeStep<<", \""<<mysqlpp::escape<<xmlString<<"\" )";
00805                 //Only preview when using xpvm, otherwise this could crash.
00806                 bool queryResult = archiveQuery.exec(strbuf.str());
00807                 if(!queryResult){
00808                         systemError("SpikeStreamArchiver: UNSUCCESSFUL MYSQLPP QUERY STORING NETWORK DATA");
00809                         return false;
00810                 }
00811 
00812                 /* Record that network data has been stored. 
00813                         Model file will be deleted if no network data is stored for it */
00814                 networkDataStored = true;
00815         }
00816         catch (const BadQuery& er) {// Handle any query errors
00817                 ostringstream errorStrStream;
00818                 errorStrStream<<"Bad query when storing network data: \""<<er.what()<<"\"";
00819                 systemError(errorStrStream.str());
00820                 return false;
00821         }
00822         catch (const Exception& er) {// Catch-all for any other MySQL++ exceptions
00823                 ostringstream errorStrStream;
00824                 errorStrStream<<"Exception thrown storing network data: \""<<er.what()<<"\"";
00825                 systemError(errorStrStream.str());
00826                 return false;
00827         }
00828         catch(std::exception& er){// Catch-all for any other exceptions
00829                 ostringstream errorStrStream;
00830                 errorStrStream<<"Exception thrown storing network data: \""<<er.what()<<"\"";
00831                 systemError(errorStrStream.str());
00832                 return false;
00833         }
00834 
00835         //If we have reached this point everything should be ok
00836         return true;
00837 }
00838 
00839 
00840 
00841 

Generated on Mon Sep 3 22:09:16 2007 for SpikeStream Archiver by  doxygen 1.4.4