Main Page | Namespace List | Alphabetical List | Class List | Directories | File List | Class Members | File Members

UDPSynchronizedClient.cpp

Go to the documentation of this file.
00001 /***************************************************************************
00002  *   SpikeStream Simulation                                                *
00003  *   Copyright (C) 2007 by David Gamez                                     *
00004  *   david@davidgamez.eu                                                   *
00005  *   Version 0.1                                                           *
00006  *                                                                         *
00007  *   This program is free software; you can redistribute it and/or modify  *
00008  *   it under the terms of the GNU General Public License as published by  *
00009  *   the Free Software Foundation; either version 2 of the License, or     *
00010  *   (at your option) any later version.                                   *
00011  *                                                                         *
00012  *   This program is distributed in the hope that it will be useful,       *
00013  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00014  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
00015  *   GNU General Public License for more details.                          *
00016  *                                                                         *
00017  *   You should have received a copy of the GNU General Public License     *
00018  *   along with this program; if not, write to the                         *
00019  *   Free Software Foundation, Inc.,                                       *
00020  *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.             *
00021  ***************************************************************************/
00022 
00023 //Reentrant needs to be defined for threads
00024 #ifndef _REENTRANT
00025         #define _REENTRANT
00026 #endif
00027 
00028 //SpikeStream includes
00029 #include "UDPSynchronizedClient.h"
00030 #include "SpikeStreamSimulation.h"
00031 
00032 /* Thread includes. Note that this needs to come after the 
00033         SpikeStream includes to avoid strange error about int and braces */
00034 #include <pthread.h>
00035 
00036 //Other includes
00037 #include <errno.h>
00038 #include <math.h>
00039 
00040 
00041 /*! Size of the receive buffer used to unpack spikes from the network. */
00042 #define MESSAGE_BUFFER_SIZE 1048576
00043 
00044 /*! Number of unsigned int elements in the array used to store received spikes.
00045         Each spike occupies three elements (X, Y and delay) and run() limits the
00046         array to SPIKE_BUFFER_SIZE / 12 = 5000 buffered spikes. */
00047 #define SPIKE_BUFFER_SIZE 60000
00048 
00049 /*! Step counter is a 15 bit number, so it wraps round at the value defined here (one more than its maximum value). */
00050 #define STEP_COUNTER_MAX 32768
00051 
00052 /*! Maximum time that we will slow down to accommodate external device 
00053         when no messages have been received. */
00054 #define MAX_EXTERNAL_COMPUTE_TIME_US 500000
00055 
00056 
00057 /*! Have to define mutex here because of define that is used to 
00058         initialise it. */
00059 pthread_mutex_t threadMutex = PTHREAD_MUTEX_INITIALIZER;
00060 
00061 
00062 /*! Function that thread for this class calls when it starts running
00063         This function calls the run method of the class. */
00064 void* startThreadFunction(void* udpSyncClient){
00065         ((UDPSynchronizedClient*)udpSyncClient)->run();
00066 }
00067 
00068 
00069 /*! Constructor. */
00070 UDPSynchronizedClient::UDPSynchronizedClient(unsigned int neurGrpWidth, unsigned int neurGrpLength){
00071         //Store details about neuron group
00072         neuronGrpWidth = neurGrpWidth;
00073         neuronGrpLength = neurGrpLength;
00074 
00075         //Initialise variables
00076         socketOpen = false;
00077         threadRunning = false;
00078         externalSyncDelay = false;
00079         externalComputeTime_us = 0;
00080         timeStepsNoMessagesCount = 0;
00081 
00082         //Set up array to hold spikes
00083         spikeCount = 0;
00084         spikeBuffer = new unsigned int[SPIKE_BUFFER_SIZE];
00085 
00086         //Set up buffer to receive messages from the network
00087         messageBuffer = new unsigned char[MESSAGE_BUFFER_SIZE];
00088 }
00089 
00090 
00091 /*! Destructor. */
00092 UDPSynchronizedClient::~UDPSynchronizedClient(){
00093         #ifdef MEMORY_DEBUG
00094                 cout<<"DESTROYING UDP SYNCHRONIZATION CLIENT"<<endl;
00095         #endif//MEMORY_DEBUG
00096 
00097         //Close socket and terminate thread if necessary
00098         closeDevice();
00099 
00100         //Free memory for spike buffer and message buffer
00101         delete [] spikeBuffer;
00102         delete [] messageBuffer;
00103 }
00104 
00105 
00106 //-----------------------------------------------------------------------------
00107 //--------------------------- PUBLIC METHODS ----------------------------------
00108 //-----------------------------------------------------------------------------
00109 
00110 /*! Returns true if the thread is still in its run method and in the state of 
00111         receiving UDP data. */
00112 bool UDPSynchronizedClient::clientThreadRunning(){
00113         return threadRunning;
00114 }
00115 
00116 
00117 /*! Closes the socket and/or stops the thread. */
00118 bool UDPSynchronizedClient::closeDevice(){
00119         //Create bool to record close errors
00120         bool closeOk = true;
00121 
00122         //Stop the run method
00123         threadRunning = false;
00124 
00125         //Wait for thread to rejoin
00126         void *thread_result;
00127         int joinResult = pthread_join(clientThread, &thread_result);
00128         if(joinResult != 0){
00129                 SpikeStreamSimulation::systemError("UDPSynchronizedClient: Thread join failed");
00130                 //Better not to return here so that socket can be closed before returning false
00131                 closeOk = false;                
00132         }
00133         else{
00134                 SpikeStreamSimulation::systemInfo("UDPSynchronizedClient: Thread successfully rejoined");
00135         }
00136 
00137         //Close the socket
00138         close(socketHandle);
00139         socketOpen = false;
00140 
00141         //Return the outcome
00142         return closeOk;
00143 }
00144 
00145 
00146 /*! Returns true if socket is open or thread is running. */
00147 bool UDPSynchronizedClient::deviceOpen(){
00148         if(socketOpen || threadRunning)
00149                 return true;
00150         return false;
00151 }
00152 
00153 
00154 /*! This method is called at each time step to get the current 
00155         external compute time. It increases a counter each time it is
00156         called which is used to increase the external compute time
00157         in the absence of any messages from the other processes,
00158         up to a maximum. This prevents the external compute time
00159         remaining at a low value when there is no external input. */
00160 unsigned int UDPSynchronizedClient::getExternalComputeTime_us(){
00161         //Increase the record of time steps without messages
00162         ++timeStepsNoMessagesCount;
00163         if(timeStepsNoMessagesCount <= 10)
00164                 return externalComputeTime_us;//We are roughly in sync give or take a few lost messages
00165 
00166         //Have not received a message for more than 10 time steps, so want to slow down a bit
00167         unsigned int newExternalComputeTime_us = timeStepsNoMessagesCount * externalComputeTime_us;
00168         if(newExternalComputeTime_us < MAX_EXTERNAL_COMPUTE_TIME_US)//Don't want to stop completely
00169                 return newExternalComputeTime_us;
00170         return MAX_EXTERNAL_COMPUTE_TIME_US;
00171 }
00172 
00173 
00174 /*! Returns true if the external device is delaying itself. */
00175 bool UDPSynchronizedClient::getExternalSyncDelay(){
00176         return externalSyncDelay;
00177 }
00178 
00179 
00180 /*! Used by external class to lock this thread's mutex and prevent it
00181         accessing shared data. */
00182 void UDPSynchronizedClient::lockMutex(){
00183         pthread_mutex_lock( &threadMutex );
00184 }
00185 
00186 
00187 /*! Opens a socket to receive spikes
00188         Adapted from http://ntrg.cs.tcd.ie/undergrad/4ba2/multicast/antony/index.html. */
00189 bool UDPSynchronizedClient::openSocket(string groupAddress, int port){
00190     struct ip_mreq multicastRequest;
00191     struct sockaddr_in multicastAddr; /* Multicast Address */
00192 
00193     /* Create a best-effort datagram socket using UDP */
00194     if ((socketHandle = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0){
00195                 SpikeStreamSimulation::systemError("UDPSynchronizedClient: ERROR IN socket");
00196                 return false;
00197         }
00198 
00199     /* Allow multiple sockets to use the same PORT number */
00200         unsigned int multipleListeners = 1; 
00201     if (setsockopt(socketHandle, SOL_SOCKET, SO_REUSEADDR, &multipleListeners, sizeof(multipleListeners)) < 0) {
00202                 SpikeStreamSimulation::systemError("UDPSynchronizedClient: ERROR Reusing ADDR failed");
00203                 return false;
00204         }
00205 
00206         // Set up socket to timeout after half a second so that it can be shut down easily
00207         struct timeval tmout;
00208         tmout.tv_sec = 0;
00209         tmout.tv_usec = 500000;
00210     if (setsockopt(socketHandle, SOL_SOCKET, SO_RCVTIMEO, &tmout, sizeof(tmout)) < 0) {
00211                 SpikeStreamSimulation::systemError("UDPSynchronizedClient: ERROR SETTING SOCKET TIMEOUT ");
00212                 return false;
00213         }
00214 
00215     /* Construct bind structure */
00216     memset(&multicastAddr, 0, sizeof(multicastAddr));   /* Zero out structure */
00217     multicastAddr.sin_family = AF_INET;                 /* Internet address family */
00218     multicastAddr.sin_addr.s_addr = htonl(INADDR_ANY);  /* Any incoming interface */
00219     multicastAddr.sin_port = htons(port);      /* Multicast port */
00220 
00221     /* Bind to the multicast port */
00222         //FIXME IS THIS REALLY NECESSARY?
00223     if (bind(socketHandle, (struct sockaddr *) &multicastAddr, sizeof(multicastAddr)) < 0){
00224                 SpikeStreamSimulation::systemError("UDPSynchronizedClient: ERROR IN bind");
00225                 return false;
00226         }
00227 
00228     /* Specify the multicast group */
00229     multicastRequest.imr_multiaddr.s_addr = inet_addr(groupAddress.data());
00230 
00231     /* Accept multicast from any interface */
00232     multicastRequest.imr_interface.s_addr = htonl(INADDR_ANY);
00233 
00234     /* Join the multicast address */
00235     if (setsockopt(socketHandle, IPPROTO_IP, IP_ADD_MEMBERSHIP, (void *) &multicastRequest, sizeof(multicastRequest)) < 0){
00236                 SpikeStreamSimulation::systemError("UDPSynchronizedClient: ERROR JOINING MULTICAST");
00237                 return false;
00238         }
00239 
00240         //Socket has been successfully opened
00241         socketOpen = true;
00242         return true;
00243 }
00244 
00245 
00246 /*! Run method for this class, called by the thread that is started. */
00247 void UDPSynchronizedClient::run(){
00248         //Only start run method if the socket is open
00249         if(!socketOpen){
00250                 threadRunning = false;
00251                 return;
00252         }
00253 
00254         //Set up variables for use in this method
00255         unsigned int interMessageTime_us = 0, externalStepCtr = 0, oldExternalStepCtr = 0;
00256         timeval currentTimeStruct, oldTimeStruct;
00257         oldTimeStruct.tv_sec = 0;
00258         oldTimeStruct.tv_usec = 0;
00259 
00260         bool firstMessage = true;
00261         unsigned int addressLength = sizeof(socketAddress);
00262 
00263         cout<<"UDPSynchronizedClient: Thread entering main loop"<<endl;
00264 
00265         //Main run loop
00266         while(threadRunning){
00267                 //Receive from network
00268                 int numBytes= recvfrom(socketHandle, messageBuffer, MESSAGE_BUFFER_SIZE, 0, (struct sockaddr *) &(socketAddress), &addressLength);
00269                 if(numBytes < 2) {//Minimum of two bytes in this synchronization method
00270                         if(errno == EAGAIN || errno == EWOULDBLOCK){
00271                                 #ifdef RECEIVE_DEVICE_DATA_DEBUG
00272                                         SpikeStreamSimulation::systemInfo("UDPSynchronizedClient thread: Socket timed out");
00273                                 #endif//RECEIVE_DEVICE_DATA_DEBUG
00274                         }
00275                         else{
00276                                 SpikeStreamSimulation::systemError("UDPSynchronizedClient: ERROR IN recvfrom EXITING RUN METHOD.");
00277                                 threadRunning = false;
00278                         }
00279                 }
00280                 else{//Greater than or equal to 2 bytes received
00281                         //Reset the counter recording time steps without messages
00282                         timeStepsNoMessagesCount = 0;
00283 
00284                         #ifdef RECEIVE_DEVICE_DATA_DEBUG
00285                                 SpikeStreamSimulation::systemInfo("UDPSynchronizedClient: Message received");
00286                         #endif//RECEIVE_DEVICE_DATA_DEBUG
00287 
00288                         //Lock mutex so that device manager cannot change or read buffer whilst it is being filled with spikes
00289                         pthread_mutex_lock( &threadMutex );
00290 
00291                         //Record time at which message arrived
00292                         gettimeofday(&currentTimeStruct, NULL);
00293                         if(firstMessage)//This is the first message that has been received
00294                                 oldTimeStruct = currentTimeStruct;
00295 
00296                         //Run a check on the packet. Should be a multiple of 4 bytes + 2 synchronization bytes
00297                         if( (numBytes - 2) % 4 != 0){
00298                                 SpikeStreamSimulation::systemError("UDPSynchronizedClient: INCORRECT NUMBER OF BYTES IN SPIKE PACKET ", numBytes);
00299                                 threadRunning = false;
00300                                 return;
00301                         }
00302 
00303                         //Each event is 4 bytes long and starts with the X and Y address and time difference
00304                         //Each packet starts with two bytes containing the synchronization information
00305                         //Check to see if this number of spikes will fit into buffer
00306                         if(spikeCount + (( numBytes - 2 ) / 4 ) > ( SPIKE_BUFFER_SIZE / 12 )){//Each spike is 12 bytes with X, Y and time delay
00307                                 cout<<"UDPSynchronizedClient: Shared memory for spike buffer is full. Resetting spike count. SpikeCount = "<<spikeCount<<"; numBytes/6 = "<<(numBytes/6)<<"; Max spikes = "<<((SPIKE_BUFFER_SIZE - 4) / 8)<<endl;
00308                                 spikeCount = 0;
00309                         }
00310 
00311                         #ifdef RECEIVE_DEVICE_DATA_DEBUG
00312                                 cout<<"UDPSynchronizedClient: Message contains "<<numBytes<<" bytes"<<endl;
00313                         #endif//RECEIVE_DEVICE_DATA_DEBUG
00314 
00315                         //Read synchronization information
00316                         //Read delay flag. This is the last bit of the second byte
00317                         if(messageBuffer[0] & 1)
00318                                 externalSyncDelay = true;
00319                         else
00320                                 externalSyncDelay = false;
00321 
00322                         //Read in step counter. First position is the LSB + delay, second position is the MSB
00323                         externalStepCtr = (unsigned char)messageBuffer[1];//Load MSB
00324                         externalStepCtr <<= 8;
00325                         externalStepCtr += (unsigned char)messageBuffer[0];//Add LSB + delay flag
00326                         externalStepCtr >>= 1;//Get rid of delay flag
00327 
00328                         //If first time, old external step ctr has to be set the same
00329                         if(firstMessage){
00330                                 oldExternalStepCtr = externalStepCtr;
00331                                 firstMessage = false;
00332                         }
00333 
00334                         //Work out counter change and throw away out of sequence messages
00335                         int stepCtrChange = 0;
00336                         if(externalStepCtr > oldExternalStepCtr)
00337                                 stepCtrChange = externalStepCtr - oldExternalStepCtr;
00338                         else if(oldExternalStepCtr - externalStepCtr > 30000)//Counter has probably gone round
00339                                 stepCtrChange = STEP_COUNTER_MAX - oldExternalStepCtr +  externalStepCtr;
00340                         oldExternalStepCtr = externalStepCtr;//Have extracted the change, so record old value
00341 
00342                         if(stepCtrChange > 0){//External device has increased its step
00343                                 //Calculate time between messages
00344                                 interMessageTime_us = 1000000 * (currentTimeStruct.tv_sec - oldTimeStruct.tv_sec) + ( currentTimeStruct.tv_usec - oldTimeStruct.tv_usec);
00345                                 oldTimeStruct = currentTimeStruct;
00346 
00347                                 //Calculate time of external time step
00348                                 externalComputeTime_us = (unsigned int)rint((double)interMessageTime_us / (double)stepCtrChange);
00349 
00350                                 //Read spikes from message, starting from the third byte
00351                                 for(int i=2; i < numBytes; i += 4){
00352 
00353                                         //Check X and Y are within the range of this neuron group
00354                                         if(messageBuffer[i] > neuronGrpWidth){
00355                                                 SpikeStreamSimulation::systemError("UDPSynchronizedClient: X POSITION OUT OF RANGE", messageBuffer[i]);
00356                                                 threadRunning = false;
00357                                                 return;
00358                                         }
00359                                         if(messageBuffer[i+1] > neuronGrpLength){
00360                                                 SpikeStreamSimulation::systemError("UDPSynchronizedClient: Y POSITION OUT OF RANGE", messageBuffer[i+1]);
00361                                                 threadRunning = false;
00362                                                 return;
00363                                         }
00364 
00365                                         //Store X and Y in the spike buffer
00366                                         spikeBuffer[spikeCount * 3] = messageBuffer[i];//X position
00367                                         spikeBuffer[spikeCount * 3 + 1] = messageBuffer[i+1]; // Y position
00368         
00369                                         /*Next two bytes are the timestamp.
00370                                                 This is in units of 0.01 ms
00371                                                 MSB is second position  */
00372                                         unsigned int time = messageBuffer[i+3];//MSB
00373                                         time <<= 8;
00374                                         time += messageBuffer[i+2];//LSB
00375 
00376                                         //FIXME COME UP WITH SOMETHING BETTER HERE LINKED TO SIMULATION TIME OR SOMETHING
00377                                         unsigned int delay = time / 10;//Convert to 0.1 millisec sections
00378                                         if(delay > NUMBER_OF_DELAY_VALUES)
00379                                                 spikeBuffer[spikeCount * 3 + 2] = 0;
00380                                         else
00381                                                 spikeBuffer[spikeCount * 3 + 2] = delay;
00382         
00383                                         //printf("Delay = %d \n", spikeBuffer[(*spikeCount) * 3 + 2]);
00384         
00385                                         //Increase the record of the number of spikes in buffer
00386                                         ++spikeCount;
00387                                 }
00388                         }
00389 
00390                         //Unlock mutex so that device manager can load spikes
00391                         pthread_mutex_unlock( &threadMutex );
00392                 }
00393         }
00394         SpikeStreamSimulation::systemInfo("UDPSynchronizedClient: Exiting spike client thread");
00395 }
00396 
00397 
00398 /*! Starts the thread running and listening for messages. */
00399 void UDPSynchronizedClient::start(){
00400         //Set running to true for main while loop
00401         threadRunning = true;
00402 
00403         //Start thread with reference to this class so that it can store spikes
00404         int res = pthread_create(&clientThread, NULL, startThreadFunction, (void*)this);
00405         if(res != 0){
00406                 SpikeStreamSimulation::systemError("UDPSynchronizedClient: ERROR CREATING CLIENT THREAD");
00407         }
00408 }
00409 
00410 
00411 /*! Stops the thread from running. */
00412 void UDPSynchronizedClient::stop(){
00413         threadRunning = false;
00414 }
00415 
00416 
00417 /*! Used by external applications to unlock the thread's mutex. */
00418 void UDPSynchronizedClient::unlockMutex(){
00419         pthread_mutex_unlock( &threadMutex );
00420 }
00421 
00422 

Generated on Mon Sep 3 22:24:34 2007 for SpikeStream Simulation by  doxygen 1.4.4