00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "TaskHolder.h"
00025 #include "Debug.h"
00026 #include "PVMMessages.h"
00027 #include "SimulationClock.h"
00028 #include "SpikeStreamSimulation.h"
00029
00030
00031 #include "pvm3.h"
00032 #include <iostream>
00033 using namespace std;
00034
00035
00036
00037 TaskHolder::TaskHolder(int thisTID, int destTID){
00038
00039 thisTaskID = thisTID;
00040 primaryDestinationID = destTID;
00041
00042
00043 numberOfTasks = 1;
00044
00045
00046
00047 destinationTaskIDs = new int[1];
00048 destinationTaskIDs[0] = primaryDestinationID;
00049
00050
00051 bufferCounter = 0;
00052
00053
00054 for(int i=0; i<NUMBER_OF_DELAY_VALUES; ++i)
00055 messageSpikeCount[i] = 0;
00056 }
00057
00058
00059
00060 TaskHolder::~TaskHolder(){
00061 #ifdef MEMORY_DEBUG
00062 cout<<"DELETING TASK HOLDER. TID: "<<thisTaskID<<endl;
00063 #endif//MEMORY_DEBUG
00064
00065
00066 if(numberOfTasks > 0)
00067 delete [] destinationTaskIDs;
00068 }
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078 void TaskHolder::addReceivingTask(int newTaskID){
00079
00080 for (int i=0; i<numberOfTasks; ++i){
00081 if (destinationTaskIDs[i] == newTaskID)
00082 return;
00083 }
00084
00085
00086 int *oldArray = destinationTaskIDs;
00087
00088
00089 destinationTaskIDs = new int[numberOfTasks + 1];
00090
00091
00092 for(int i=0; i<numberOfTasks; i++)
00093 destinationTaskIDs[i] = oldArray[i];
00094
00095
00096 destinationTaskIDs[numberOfTasks] = newTaskID;
00097
00098
00099 if(numberOfTasks > 0)
00100 delete [] oldArray;
00101
00102
00103 ++numberOfTasks;
00104 }
00105
00106
00107
00108 void TaskHolder::printBuffers(){
00109 cout<<"TaskHolder for task "<<thisTaskID<<"; Buffer counter = "<<bufferCounter<<"; Printing buffers.............."<<endl;
00110 for(int i=0; i<NUMBER_OF_DELAY_VALUES; ++i){
00111 if(messageSpikeCount[i] > 0){
00112 cout<<"Buffer "<<i<<": Size: "<<messageSpikeCount[i]<<"; "<<endl;
00113 for(vector<ConnectionHolder*>::iterator iter = spikeMessageBuffer[i].begin(); iter != spikeMessageBuffer[i].end(); ++iter){
00114 cout<<"\tConnection Holder. Delay="<<(*iter)->delay<<"; Size="<<(*iter)->numConnIDs;
00115
00116
00117 for(int j=0; j<(*iter)->numConnIDs * 2; j += 2)
00118 cout<<" from "<<(*iter)->connIDArray[j]<<" to "<<(*iter)->connIDArray[j+1]<<";";
00119 cout<<endl;
00120 }
00121 }
00122 }
00123 }
00124
00125
00126
00127 void TaskHolder::removeReceivingTask(int taskID){
00128
00129
00130 if(taskID == primaryDestinationID){
00131 cerr<<"TaskHolder.cpp: CANNOT REMOVE PRIMARY TASK. TRYING TO REMOVE: "<<taskID<<endl;
00132 return;
00133 }
00134
00135
00136 bool taskIDFound = false;
00137 for(int i=0; i<numberOfTasks; ++i)
00138 if(destinationTaskIDs[i] == taskID)
00139 taskIDFound = true;
00140
00141 if(!taskIDFound){
00142 cerr<<"TaskHolder: TRYING TO REMOVE RECEIVING TASK. TASK ID CANNOT BE FOUND. TID: "<<thisTaskID<<endl;
00143 return;
00144 }
00145
00146
00147 int *oldArray = destinationTaskIDs;
00148
00149
00150 destinationTaskIDs = new int[numberOfTasks - 1];
00151
00152
00153 int counter = 0;
00154 for(int i=0; i<numberOfTasks; ++i)
00155 if(destinationTaskIDs[i] != taskID){
00156 destinationTaskIDs[counter] = oldArray[i];
00157 ++counter;
00158 }
00159
00160
00161 delete [] oldArray;
00162 --numberOfTasks;
00163 }
00164
00165
00166
00167 bool TaskHolder::sendSpikeMessages(){
00168
00169 int info = pvm_initsend( PvmDataDefault );
00170 #ifdef PVM_DEBUG
00171 if(info < 0){
00172 cerr<<"TaskHolder: ERROR INITIALISING SEND BUFFER; TASK ID = "<<thisTaskID<<endl;
00173 return false;
00174 }
00175 #endif//PVM_DEBUG
00176
00177
00178
00179
00180
00181
00182
00183 unsigned int messageTimeStep = SpikeStreamSimulation::simulationClock->getTimeStep();
00184 info = pvm_pkuint(&messageTimeStep, 1, 1);
00185 #ifdef PVM_DEBUG
00186 if(info < 0){
00187 cerr<<"TaskHolder: ERROR PACKING TIME STEP INTO MESSAGE; TASK ID = "<<thisTaskID<<endl;
00188 return false;
00189 }
00190 #endif//PVM_DEBUG
00191
00192
00193
00194
00195
00196 info = pvm_pkuint(&messageSpikeCount[bufferCounter], 1, 1);
00197 #ifdef PVM_DEBUG
00198 if(info < 0){
00199 cerr<<"TaskHolder: ERROR PACKING SPIKE COUNT INTO MESSAGE; TASK ID = "<<thisTaskID<<endl;
00200 return false;
00201 }
00202 #endif//PVM_DEBUG
00203
00204
00205 #ifdef RECORD_STATISTICS
00206 SpikeStreamSimulation::statistics.spikeTotal += messageSpikeCount[bufferCounter];
00207 #endif//RECORD_STATISTICS
00208
00209
00210 for(vector<ConnectionHolder*>::iterator iter = spikeMessageBuffer[bufferCounter].begin(); iter != spikeMessageBuffer[bufferCounter].end(); ++iter){
00211 info = pvm_pkuint((unsigned int*)(*iter)->connIDArray, (*iter)->numConnIDs, 1);
00212 #ifdef PVM_DEBUG
00213 if(info < 0){
00214 cerr<<"TaskHolder: ERROR PACKING SPIKES INTO MESSAGE; TASK ID = "<<thisTaskID<<endl;
00215 return false;
00216 }
00217 #endif//PVM_DEBUG
00218 }
00219
00220
00221
00222
00223
00224 #ifdef TASK_DEBUG
00225 cout<<"TaskHolder: Sending message to other tasks Destination task IDS (number of tasks = "<<numberOfTasks<<"): "<<endl;
00226 for(int i=0; i<numberOfTasks; i++)
00227 cout<<"\t"<<destinationTaskIDs[i]<<endl;
00228 #endif//TASK_DEBUG
00229
00230 #ifdef SPIKE_DEBUG
00231 cout<<"TaskHolder: Sending spike message to "<<primaryDestinationID<<"; Message time step = "<<messageTimeStep<<"; Number of spikes = "<<messageSpikeCount[bufferCounter]<<endl;
00232 #endif//SPIKE_DEBUG
00233
00234
00235 info = pvm_mcast( destinationTaskIDs, numberOfTasks, SPIKE_LIST_MSG);
00236 #ifdef PVM_DEBUG
00237 if(info < 0){
00238 cerr<<"TaskHolder: ERROR MULTICASTING MESSAGE; TASK ID = "<<thisTaskID<<endl;
00239 return false;
00240 }
00241 #endif//PVM_DEBUG
00242
00243
00244
00245
00246
00247 if(primaryDestinationID == thisTaskID){
00248 info = pvm_send(primaryDestinationID, SPIKE_LIST_MSG);
00249 #ifdef PVM_DEBUG
00250 if(info < 0){
00251 cerr<<"TaskHolder: ERROR SENDING MESSAGE TO SELF; FROM TASK ID = "<<thisTaskID<<" TO TASK ID = "<<primaryDestinationID<<endl;
00252 return false;
00253 }
00254 #endif//PVM_DEBUG
00255 }
00256
00257
00258 messageSpikeCount[bufferCounter] = 0;
00259 spikeMessageBuffer[bufferCounter].clear();
00260
00261
00262 ++bufferCounter;
00263 bufferCounter %= NUMBER_OF_DELAY_VALUES;
00264
00265 return true;
00266 }
00267
00268
00269
00270
00271
00272 void TaskHolder::setMaxBufferSize(int maxBSize){
00273 maxBufferSize = maxBSize;
00274 for(int i=0; i<NUMBER_OF_DELAY_VALUES; i++){
00275
00276 spikeMessageBuffer[i].reserve(maxBufferSize);
00277 }
00278 }
00279
00280