Main Page | Namespace List | Alphabetical List | Class List | Directories | File List | Class Members | File Members

TCPSynchronizedClient.cpp

Go to the documentation of this file.
00001 /***************************************************************************
00002  *   SpikeStream Simulation                                                *
00003  *   Copyright (C) 2007 by David Gamez                                     *
00004  *   david@davidgamez.eu                                                   *
00005  *   Version 0.1                                                           *
00006  *                                                                         *
00007  *   This program is free software; you can redistribute it and/or modify  *
00008  *   it under the terms of the GNU General Public License as published by  *
00009  *   the Free Software Foundation; either version 2 of the License, or     *
00010  *   (at your option) any later version.                                   *
00011  *                                                                         *
00012  *   This program is distributed in the hope that it will be useful,       *
00013  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00014  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
00015  *   GNU General Public License for more details.                          *
00016  *                                                                         *
00017  *   You should have received a copy of the GNU General Public License     *
00018  *   along with this program; if not, write to the                         *
00019  *   Free Software Foundation, Inc.,                                       *
00020  *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.             *
00021  ***************************************************************************/
00022 
00023 //Reentrant needs to be defined for threads
00024 #ifndef _REENTRANT
00025         #define _REENTRANT
00026 #endif
00027 
00028 //SpikeStream includes
00029 #include "TCPSynchronizedClient.h"
00030 #include "SpikeStreamSimulation.h"
00031 #include "DeviceMessages.h"
00032 #include "PerformanceTimer.h"
00033 
00034 /* Thread includes. Note that this needs to come after the 
00035         SpikeStream includes to avoid strange error about int and braces */
00036 #include <pthread.h>
00037 
00038 
00039 /*! Size of the array used to store received spikes.
00040         This has enough space for more than 5000 spikes, each of which takes 
00041         12 bytes to store. */
00042 #define SPIKE_BUFFER_SIZE 60000
00043 
00044 
00045 /*! Size of the receive buffer used to unpack spikes from the network. */
00046 #define MESSAGE_BUFFER_SIZE 1048576
00047 
00048 
00049 /*! Function that is called in a separate thread to connect to the socket. */
00050 void* startTCPClientConnectThread(void* tmpTcpSyncClient){
00051         //Connect to the external device.
00052         TCPSynchronizedClient* tcpSyncClient = (TCPSynchronizedClient*) tmpTcpSyncClient;//Local copy of reference to class for clarity
00053     if (connect(tcpSyncClient->socketHandle, (struct sockaddr *) &tcpSyncClient->externalDevAddr, sizeof(tcpSyncClient->externalDevAddr)) < 0){
00054         SpikeStreamSimulation::systemError("TCPSynchronizedClient: CONNECTION TO SERVER FAILED");
00055         }
00056         else{//Successful connection to socket
00057                 tcpSyncClient->socketConnected = true;
00058         }
00059 }
00060 
00061 
00062 /*! Constructor. */
00063 TCPSynchronizedClient::TCPSynchronizedClient(unsigned int neurGrpWidth, unsigned int neurGrpLength){
00064         //Store details about neuron group
00065         neuronGrpWidth = neurGrpWidth;
00066         neuronGrpLength = neurGrpLength;
00067         
00068         //Initialise variables
00069         socketOpen = false;
00070         twoByteCoordinates = false;
00071 
00072         //Set up array to hold spikes
00073         spikeCount = 0;
00074         spikeBuffer = new unsigned int[SPIKE_BUFFER_SIZE];
00075 
00076         //Set up message buffer
00077         msgBuffer = new unsigned char[MESSAGE_BUFFER_SIZE];
00078 
00079         //Set up array holding confirmation that device data has been received
00080         requestDataMsgArray[0] = SPIKESTREAM_DATA_ACK_MSG;
00081 }
00082 
00083 
00084 /*! Destructor. */
00085 TCPSynchronizedClient::~TCPSynchronizedClient(){
00086         #ifdef MEMORY_DEBUG
00087                 cout<<"DESTROYING TCP SYNCHRONIZATION CLIENT"<<endl;
00088         #endif//MEMORY_DEBUG
00089 
00090         //Close socket
00091         closeDevice();
00092 
00093         //Free memory for thread spike buffer
00094         delete [] spikeBuffer;
00095         delete [] msgBuffer;
00096 }
00097 
00098 
00099 //---------------------------------------------------------------------------
00100 //----------------------------- PUBLIC METHODS ------------------------------
00101 //---------------------------------------------------------------------------
00102 
00103 /*! Closes the socket. */
00104 bool TCPSynchronizedClient::closeDevice(){
00105         if(socketOpen){
00106                 close(socketHandle);
00107                 socketOpen = false;
00108         }
00109         return true;
00110 }
00111 
00112 
00113 /*! Returns true if the socket is open. */
00114 bool TCPSynchronizedClient::deviceOpen(){
00115         return socketOpen;
00116 }
00117 
00118 
00119 /*! Waits for a message containing data from the external device and unpacks the data. */
00120 bool TCPSynchronizedClient::fetchData(){
00121         /* Check that socket is still open. An error in the receive will set socketOpen
00122                 to false without attempting to close the socket */
00123         if(!socketOpen)
00124                 return false;
00125 
00126         //Initialise spike count
00127         spikeCount = 0;
00128 
00129         #ifdef RECEIVE_DEVICE_DATA_DEBUG
00130                 cout<<"TCPSynchronizedClient: Waiting for data ..."<<endl;
00131         #endif//RECEIVE_DEVICE_DATA_DEBUG
00132 
00133     /* Receive the data back from the server */
00134         //Receive the first four bytes specifying the number of spikes in the message
00135         //Receive these 1 at a time since am not sure if they will all come at once
00136         unsigned char messageSizeArray[1];
00137         unsigned int numberOfMsgSpikes = 0;
00138     unsigned int totalBytesReceived = 0, bytesReceived = 0;
00139     while (totalBytesReceived < 4){
00140 
00141         /* Receive up to the buffer size (minus 1 to leave space for
00142            a null terminator) bytes from the sender */
00143         if ((bytesReceived = recv(socketHandle, messageSizeArray, 1, 0)) <= 0){
00144                         SpikeStreamSimulation::systemError("TCPSynchronizedClient: ERROR RECEIVNG SIZE OF MESSAGE");
00145                         socketOpen = false;
00146                         return false;
00147                 }
00148 
00149                 //Assemble the count of the number of message spikes in the message
00150                 unsigned int tmpPartMsgSpikeCount = messageSizeArray[0];
00151                 numberOfMsgSpikes += tmpPartMsgSpikeCount << 8*totalBytesReceived;//Shift to correct position in the number
00152 
00153         // Keep track of total bytes 
00154         totalBytesReceived += bytesReceived;   
00155     }
00156 
00157         #ifdef RECEIVE_DEVICE_DATA_DEBUG
00158                 cout<<"TCPSynchronizedClient: Data received. Number of spikes in message = "<<numberOfMsgSpikes<<endl;
00159         #endif//RECEIVE_DEVICE_DATA_DEBUG
00160 
00161         /* Now receive the rest of the message
00162                 Each receive call may contain all of the message or only part of it */
00163         unsigned int msgBufferSize = 4 * numberOfMsgSpikes;
00164         unsigned char *msgBufferPtr = msgBuffer;
00165         totalBytesReceived = 0;
00166         bytesReceived = 0;
00167     while (totalBytesReceived < msgBufferSize ){
00168                 //Receive all or part of the spike data
00169                 if ((bytesReceived = recv(socketHandle, msgBufferPtr, msgBufferSize - totalBytesReceived, 0)) <= 0){
00170                         SpikeStreamSimulation::systemError("TCPSynchronizedClient: ERROR WHEN RECEIVING MAIN MESSAGE DATA");
00171                         socketOpen = false;
00172                         return false;
00173                 }
00174 
00175                 //Add up number of bytes received
00176                 totalBytesReceived += bytesReceived;
00177 
00178                 //Increase the pointer to the message buffer array so that it points to empty part of it
00179                 msgBufferPtr += bytesReceived;
00180         }
00181 
00182         //Print out the message buffer for debug
00183         #ifdef PRINT_RECEIVE_DEVICE_DATA_MSG
00184                 printMessageBuffer(msgBuffer, msgBufferSize);
00185         #endif//PRINT_RECEIVE_DEVICE_DATA_MSG
00186 
00187         //Unpack spikes
00188         for(unsigned int i=0; i < msgBufferSize; i += 4){
00189                 if(twoByteCoordinates){//The X and Y positions of each spike are encoded by two bytes.
00190                         //Stuff for two byte conversion
00191                         unsigned int xVal = msgBuffer[i+1] & 255;
00192                         xVal <<= 8;
00193                         xVal += msgBuffer[i] & 255;
00194         
00195                         unsigned int yVal = msgBuffer[i+3] & 255;
00196                         yVal <<= 8;
00197                         yVal += msgBuffer[i + 2] & 255;
00198 
00199                         //Check X and Y are within the range of this neuron group
00200                         if(xVal > neuronGrpWidth){
00201                                 SpikeStreamSimulation::systemError("TCPSynchronizedClient: X POSITION OUT OF RANGE ", xVal);
00202                                 //Leave socket open since this is ok, but return false because of data error
00203                                 return false;
00204                         }
00205                         if(yVal > neuronGrpLength){
00206                                 SpikeStreamSimulation::systemError("TCPSynchronizedClient: Y POSITION OUT OF RANGE ", yVal);
00207                                 //Leave socket open since this is ok, but return false because of data error
00208                                 return false;
00209                         }
00210         
00211                         //Store X and Y position in spike buffer
00212                         spikeBuffer[spikeCount * 3] =  xVal;//X position
00213                         spikeBuffer[spikeCount * 3 + 1] = yVal; // Y position
00214         
00215                         //Set delay even if we are not using it.
00216                         spikeBuffer[spikeCount * 3 + 2] = 0;
00217                 }
00218                 else{//The X and Y positions of each spike are encoded by a single byte
00219                         //Check X and Y are within the range of this neuron group
00220                         if(msgBuffer[i] > neuronGrpWidth){
00221                                 SpikeStreamSimulation::systemError("TCPSynchronizedClient: X POSITION OUT OF RANGE ", msgBuffer[i]);
00222                                 //Leave socket open since this is ok, but return false because of data error
00223                                 return false;
00224                         }
00225                         if(msgBuffer[i+1] > neuronGrpLength){
00226                                 SpikeStreamSimulation::systemError("TCPSynchronizedClient: Y POSITION OUT OF RANGE ", msgBuffer[i+1]);
00227                                 //Leave socket open since this is ok, but return false because of data error
00228                                 return false;
00229                         }
00230 
00231                         //Store X and Y position in spike buffer
00232                         spikeBuffer[spikeCount * 3] =  msgBuffer[i];//X position
00233                         spikeBuffer[spikeCount * 3 + 1] = msgBuffer[i+1]; // Y position
00234 
00235                         /*Next two bytes are the timestamp.
00236                                 This is in units of 0.01 ms
00237                                 MSB is second position  */
00238                         unsigned int time = msgBuffer[i+3];//MSB
00239                         time <<= 8;
00240                         time += msgBuffer[i+2];//LSB
00241         
00242                         //FIXME COME UP WITH SOMETHING BETTER HERE LINKED TO SIMULATION TIME OR SOMETHING
00243                         unsigned int delay = time / 10;//Convert to 0.1 millisec sections
00244                         if(delay > NUMBER_OF_DELAY_VALUES)
00245                                 spikeBuffer[spikeCount * 3 + 2] = 0;
00246                         else
00247                                 spikeBuffer[spikeCount * 3 + 2] = delay;
00248                 }
00249 
00250                 //Increase the record of the number of spikes in buffer
00251                 ++spikeCount;
00252         }
00253 
00254         #ifdef RECEIVE_DEVICE_DATA_DEBUG
00255                 cout<<"TCPSynchronizedClient: Spikes unpacked into SpikeBuffer"<<endl;
00256         #endif//RECEIVE_DEVICE_DATA_DEBUG
00257 
00258 
00259         /* Send a message requesting data to the device */
00260     if (send(socketHandle, requestDataMsgArray, 1, 0) != 1){
00261         SpikeStreamSimulation::systemError("TCPSynchronizedClient: SENDING OF REQUEST_DEVICE_DATA_MSG TO SERVER FAILED");
00262                 socketOpen = false;
00263                 return false;
00264         }
00265 
00266         //cout<<"-------------- DONE ----------------"<<endl;
00267 
00268         //If we have reached this point everything should be ok
00269         return true;
00270 }
00271 
00272 
00273 /*! Opens the TCP socket.
00274         The connection to the socket is handled by a separate thread so that a failed connection can be
00275         timed out.
00276         Adapated from: http://cs.baylor.edu/~donahoo/practical/CSockets/textcode.html. */
00277 bool TCPSynchronizedClient::openSocket(string externalDeviceIPAddr, int externalDevicePort){
00278         cout<<"TCPSynchronizedClient: Connecting to "<<externalDeviceIPAddr<<" on port "<<externalDevicePort<<endl;
00279 
00280     /* Create a reliable, stream socket using TCP */
00281         socketHandle = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
00282     if (socketHandle < 0){
00283         SpikeStreamSimulation::systemError("TCPSynchronizedClient: ERROR CREATING SOCKET");
00284                 return false;
00285         }
00286 
00287     /* Construct the server address structure */
00288     memset(&externalDevAddr, 0, sizeof(externalDevAddr));     /* Zero out structure */
00289     externalDevAddr.sin_family      = AF_INET;             /* Internet address family */
00290     externalDevAddr.sin_addr.s_addr = inet_addr(externalDeviceIPAddr.data());   /* Server IP address */
00291     externalDevAddr.sin_port        = htons(externalDevicePort); /* Server port */
00292 
00293 
00294     /* Establish the connection to the server. Do this as a separate thread so it can be timed out.*/
00295         //Socket connected is used to check whether we have the connection established
00296         socketConnected = false;
00297 
00298         //Declare thread to make connection
00299         pthread_t connectThread;
00300         int res = pthread_create(&connectThread, NULL, startTCPClientConnectThread, (void*)this);
00301         if(res != 0){
00302                 SpikeStreamSimulation::systemError("TCPSynchronizedClient: ERROR CREATING CONNECTION THREAD");
00303                 return false;
00304         }
00305 
00306         //Wait until connection has been established or until we have timed out
00307         int timeoutCount = 0;
00308         while(!socketConnected){
00309 
00310                 //Sleep while waiting for thread to connect to socket.
00311                 sleep(1);
00312 
00313                 //Check to see if we have been sleeping for too long
00314                 ++timeoutCount;
00315                 if(timeoutCount >= 5){
00316                         //No point in waiting for the thread to rejoin since it is locked. Just display error and return false.
00317                         SpikeStreamSimulation::systemError("TCPSynchronizedClient: Timeout while waiting for thread to connect to device");
00318                         return false;
00319                 }
00320         }
00321 
00322 
00323         //If we have got this far, socket is open
00324         socketOpen = true;
00325         return true;
00326 }
00327 
00328 
00329 /*! Vision data is potentially larger than 255, so need to use two bytes to encode
00330         the coordinates within a vision map, which replaces the timing information. */
00331 void TCPSynchronizedClient::setTwoByteCoords(bool twoBtCds){
00332         twoByteCoordinates = twoBtCds;
00333 }
00334 
00335 
00336 //-------------------------------------------------------------------------------
00337 //---------------------------- PRIVATE METHODS ----------------------------------
00338 //-------------------------------------------------------------------------------
00339 
00340 /*! Prints out the message buffer received from the external device
00341         for debugging. */
00342 void TCPSynchronizedClient::printMessageBuffer(unsigned char* msgBuffer, unsigned int msgBufSize){
00343         cout<<"-------------------------- START MESSAGE BUFFER ----------------------------"<<endl;
00344         for(unsigned int i=0; i<msgBufSize; i+=4)
00345                 printf("%d, %d, %d, %d\n", msgBuffer[i], msgBuffer[i+1], msgBuffer[i+2], msgBuffer[i+3]);
00346         cout<<"------------------------ END MESSAGE BUFFER ----------------------------"<<endl<<endl;
00347 }
00348 
00349 

Generated on Mon Sep 3 22:24:34 2007 for SpikeStream Simulation by  doxygen 1.4.4