Main Page | Namespace List | Alphabetical List | Class List | Directories | File List | Class Members | File Members

TaskHolder.cpp

Go to the documentation of this file.
00001 /***************************************************************************
00002  *   SpikeStream Simulation                                                *
00003  *   Copyright (C) 2007 by David Gamez                                     *
00004  *   david@davidgamez.eu                                                   *
00005  *   Version 0.1                                                           *
00006  *                                                                         *
00007  *   This program is free software; you can redistribute it and/or modify  *
00008  *   it under the terms of the GNU General Public License as published by  *
00009  *   the Free Software Foundation; either version 2 of the License, or     *
00010  *   (at your option) any later version.                                   *
00011  *                                                                         *
00012  *   This program is distributed in the hope that it will be useful,       *
00013  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00014  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
00015  *   GNU General Public License for more details.                          *
00016  *                                                                         *
00017  *   You should have received a copy of the GNU General Public License     *
00018  *   along with this program; if not, write to the                         *
00019  *   Free Software Foundation, Inc.,                                       *
00020  *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.             *
00021  ***************************************************************************/
00022 
00023 //SpikeStream includes
00024 #include "TaskHolder.h"
00025 #include "Debug.h"
00026 #include "PVMMessages.h"
00027 #include "SimulationClock.h"
00028 #include "SpikeStreamSimulation.h"
00029 
00030 //Other includes
00031 #include "pvm3.h"
00032 #include <iostream>
00033 using namespace std;
00034 
00035 
00036 /*! Constructor. */
00037 TaskHolder::TaskHolder(int thisTID, int destTID){
00038         //Store this and destination task ids
00039         thisTaskID = thisTID;
00040         primaryDestinationID = destTID;
00041 
00042         //Currently there is only 1 destination task id
00043         numberOfTasks = 1;
00044         
00045         /* Set up array to hold destination tasks. 
00046                 This may increase if monitoring or archiving is needed. */
00047         destinationTaskIDs = new int[1];
00048         destinationTaskIDs[0] = primaryDestinationID;
00049 
00050         // Initialise the current timeCounter
00051         bufferCounter = 0;
00052 
00053         //Initialise the message spike count array
00054         for(int i=0; i<NUMBER_OF_DELAY_VALUES; ++i)
00055                 messageSpikeCount[i] = 0;
00056 }
00057 
00058 
00059 /*! Destructor. */
00060 TaskHolder::~TaskHolder(){
00061         #ifdef MEMORY_DEBUG
00062                 cout<<"DELETING TASK HOLDER. TID: "<<thisTaskID<<endl;
00063         #endif//MEMORY_DEBUG    
00064 
00065         //Delete dynamic arrays
00066         if(numberOfTasks > 0)
00067                 delete [] destinationTaskIDs;
00068 }
00069 
00070 
00071 //-----------------------------------------------------------------------------
00072 //----------------------------- PUBLIC METHODS --------------------------------
00073 //-----------------------------------------------------------------------------
00074 
00075 /*! Adds a receiving task to the task array.
00076         Done in this clunky way because the broadcast message function in pvm needs an
00077         integer array, not a vector, as one of its arguments. */
00078 void TaskHolder::addReceivingTask(int newTaskID){
00079         //First filter out accidental duplicates
00080         for     (int i=0; i<numberOfTasks; ++i){
00081                 if (destinationTaskIDs[i] == newTaskID)
00082                         return;
00083         }
00084 
00085         // Store a reference to the old array
00086         int *oldArray = destinationTaskIDs;
00087         
00088         //Create a new array to hold taskIDs
00089         destinationTaskIDs = new int[numberOfTasks + 1];
00090         
00091         //Copy old array into new array
00092         for(int i=0; i<numberOfTasks; i++)
00093                 destinationTaskIDs[i] = oldArray[i];
00094                 
00095         //Add new task to array
00096         destinationTaskIDs[numberOfTasks] = newTaskID;
00097         
00098         //Remove the old array 
00099         if(numberOfTasks > 0)
00100                 delete [] oldArray;
00101 
00102         //Increase the number of tasks.
00103         ++numberOfTasks;
00104 }
00105 
00106 
00107 /*! Debug method that prints out the contents of the spike message buffer. */
00108 void TaskHolder::printBuffers(){
00109         cout<<"TaskHolder for task "<<thisTaskID<<"; Buffer counter = "<<bufferCounter<<"; Printing buffers.............."<<endl;
00110         for(int i=0; i<NUMBER_OF_DELAY_VALUES; ++i){
00111                 if(messageSpikeCount[i] > 0){// Only print buffers that have data in them
00112                         cout<<"Buffer "<<i<<": Size: "<<messageSpikeCount[i]<<"; "<<endl;
00113                         for(vector<ConnectionHolder*>::iterator iter = spikeMessageBuffer[i].begin(); iter != spikeMessageBuffer[i].end(); ++iter){
00114                         cout<<"\tConnection Holder. Delay="<<(*iter)->delay<<"; Size="<<(*iter)->numConnIDs;
00115                                 /* Now print contents of each connection id. 
00116                                         Remember that each connection id spans two integers */
00117                                 for(int j=0; j<(*iter)->numConnIDs * 2; j += 2)
00118                                         cout<<" from "<<(*iter)->connIDArray[j]<<" to "<<(*iter)->connIDArray[j+1]<<";";
00119                                 cout<<endl;
00120                         }
00121                 }
00122         }
00123 }
00124 
00125 
00126 /*! Removes a receiving task from the task holder. */
00127 void TaskHolder::removeReceivingTask(int taskID){
00128         /* This method should only be called to remove additional tasks. 
00129                 Should be at least one task left */
00130         if(taskID == primaryDestinationID){
00131                 cerr<<"TaskHolder.cpp: CANNOT REMOVE PRIMARY TASK. TRYING TO REMOVE: "<<taskID<<endl;
00132                 return;
00133         }
00134         
00135         //Check that task ID is in receivingTaskIDs
00136         bool taskIDFound = false;
00137         for(int i=0; i<numberOfTasks; ++i)
00138                 if(destinationTaskIDs[i] == taskID)
00139                         taskIDFound = true;
00140         
00141         if(!taskIDFound){
00142                 cerr<<"TaskHolder: TRYING TO REMOVE RECEIVING TASK. TASK ID CANNOT BE FOUND. TID: "<<thisTaskID<<endl;
00143                 return;
00144         }
00145 
00146         // Store a reference to the old array
00147         int *oldArray = destinationTaskIDs;
00148         
00149         //Create a new array to hold taskIDs
00150         destinationTaskIDs = new int[numberOfTasks - 1];
00151         
00152         //Copy old array into new array
00153         int counter = 0;
00154         for(int i=0; i<numberOfTasks; ++i)
00155                 if(destinationTaskIDs[i] != taskID){
00156                         destinationTaskIDs[counter] = oldArray[i];
00157                         ++counter;
00158                 }
00159         
00160         //Remove the old array and decrease the number of tasks.
00161         delete [] oldArray;
00162         --numberOfTasks;
00163 }
00164 
00165 
00166 /*! Sends spike messages to other tasks and this task. */
00167 bool TaskHolder::sendSpikeMessages(){
00168         //Initialise the send buffer
00169         int info = pvm_initsend( PvmDataDefault );
00170         #ifdef PVM_DEBUG
00171                 if(info < 0){
00172                         cerr<<"TaskHolder: ERROR INITIALISING SEND BUFFER; TASK ID = "<<thisTaskID<<endl;
00173                         return false;
00174                 }
00175         #endif//PVM_DEBUG
00176 
00177 
00178          /* Work through spikeVectors 
00179                 for each connection holder pack the array using the size.
00180                 In non-debug mode run as quickly as possible without checking for error messages.*/
00181 
00182         //Add timestep to message
00183         unsigned int messageTimeStep = SpikeStreamSimulation::simulationClock->getTimeStep(); 
00184         info = pvm_pkuint(&messageTimeStep, 1, 1);
00185         #ifdef PVM_DEBUG
00186                 if(info < 0){
00187                         cerr<<"TaskHolder: ERROR PACKING TIME STEP INTO MESSAGE; TASK ID = "<<thisTaskID<<endl;
00188                         return false;
00189                 }
00190         #endif//PVM_DEBUG
00191 
00192 
00193         //Add number specifying number of spikes that will be in message
00194         //if(messageTimeStep > 200)
00195         //      cout<<"TIME: "<<messageTimeStep<<" MESSAGE SPIKE COUNT: "<<messageSpikeCount[bufferCounter]<<endl;
00196         info = pvm_pkuint(&messageSpikeCount[bufferCounter], 1, 1);
00197         #ifdef PVM_DEBUG
00198                 if(info < 0){
00199                         cerr<<"TaskHolder: ERROR PACKING SPIKE COUNT INTO MESSAGE; TASK ID = "<<thisTaskID<<endl;
00200                         return false;
00201                 }
00202         #endif//PVM_DEBUG
00203 
00204         //Add up the number of spikes being sent
00205         #ifdef RECORD_STATISTICS
00206                 SpikeStreamSimulation::statistics.spikeTotal += messageSpikeCount[bufferCounter];
00207         #endif//RECORD_STATISTICS
00208 
00209         //Pack the spikes into the message
00210         for(vector<ConnectionHolder*>::iterator iter = spikeMessageBuffer[bufferCounter].begin(); iter != spikeMessageBuffer[bufferCounter].end(); ++iter){
00211                 info = pvm_pkuint((unsigned int*)(*iter)->connIDArray, (*iter)->numConnIDs, 1);//Pack both shorts as an int otherwise pvm treats shorts as ints
00212                 #ifdef PVM_DEBUG
00213                         if(info < 0){
00214                                 cerr<<"TaskHolder: ERROR PACKING SPIKES INTO MESSAGE; TASK ID = "<<thisTaskID<<endl;
00215                                 return false;
00216                         }
00217                 #endif//PVM_DEBUG
00218         }
00219 
00220 
00221         /* Send message to other tasks. 
00222                 Might be more than 1 task ID Use multicast to send the message to all of the other task IDs */
00223         //Output debugging information if required
00224         #ifdef TASK_DEBUG
00225                 cout<<"TaskHolder: Sending message to other tasks Destination task IDS (number of tasks = "<<numberOfTasks<<"): "<<endl;
00226                 for(int i=0; i<numberOfTasks; i++)
00227                         cout<<"\t"<<destinationTaskIDs[i]<<endl;
00228         #endif//TASK_DEBUG
00229 
00230         #ifdef SPIKE_DEBUG
00231                 cout<<"TaskHolder: Sending spike message to "<<primaryDestinationID<<"; Message time step = "<<messageTimeStep<<"; Number of spikes = "<<messageSpikeCount[bufferCounter]<<endl;
00232         #endif//SPIKE_DEBUG
00233 
00234         //Multicast the message to all tasks except this task
00235         info = pvm_mcast( destinationTaskIDs, numberOfTasks, SPIKE_LIST_MSG);
00236         #ifdef PVM_DEBUG
00237                 if(info < 0){
00238                         cerr<<"TaskHolder: ERROR MULTICASTING MESSAGE; TASK ID = "<<thisTaskID<<endl;
00239                         return false;
00240                 }
00241         #endif//PVM_DEBUG
00242 
00243 
00244         /* Update neurons in this task
00245                 mcast never sends messages to this task, so need to send a separate message to update the 
00246                 neurons in this task. */
00247         if(primaryDestinationID == thisTaskID){
00248                 info = pvm_send(primaryDestinationID, SPIKE_LIST_MSG);
00249                 #ifdef PVM_DEBUG
00250                         if(info < 0){
00251                                 cerr<<"TaskHolder: ERROR SENDING MESSAGE TO SELF; FROM TASK ID = "<<thisTaskID<<" TO TASK ID = "<<primaryDestinationID<<endl;
00252                                 return false;
00253                         }
00254                 #endif//PVM_DEBUG
00255         }
00256 
00257         /* Reset anything that needs to be reset about buffer */
00258         messageSpikeCount[bufferCounter] = 0; //Reset count of spikes in buffer
00259         spikeMessageBuffer[bufferCounter].clear(); //Empty vector of ConnectionHolder*s
00260         
00261         //Increase the buffer counter and make it circular
00262         ++bufferCounter;
00263         bufferCounter %= NUMBER_OF_DELAY_VALUES;
00264         
00265         return true;
00266 }
00267 
00268 
00269 /*! Allocates memory for all the spike vectors to save incrementally increasing their size
00270         This maximum number is likely to be close to the number of neurons in the task since each
00271         of these could add a connection holder for any point in time. */
00272 void TaskHolder::setMaxBufferSize(int maxBSize){
00273         maxBufferSize = maxBSize;
00274         for(int i=0; i<NUMBER_OF_DELAY_VALUES; i++){
00275                 //Reserve space in the vector
00276                 spikeMessageBuffer[i].reserve(maxBufferSize);
00277         }
00278 }
00279 
00280 

Generated on Mon Sep 3 22:24:34 2007 for SpikeStream Simulation by  doxygen 1.4.4