Main Page | Namespace List | Alphabetical List | Class List | Directories | File List | Class Members | File Members

TCPSynchronizedServer.cpp

Go to the documentation of this file.
00001 /***************************************************************************
00002  *   SpikeStream Simulation                                                *
00003  *   Copyright (C) 2007 by David Gamez                                     *
00004  *   david@davidgamez.eu                                                   *
00005  *   Version 0.1                                                           *
00006  *                                                                         *
00007  *   This program is free software; you can redistribute it and/or modify  *
00008  *   it under the terms of the GNU General Public License as published by  *
00009  *   the Free Software Foundation; either version 2 of the License, or     *
00010  *   (at your option) any later version.                                   *
00011  *                                                                         *
00012  *   This program is distributed in the hope that it will be useful,       *
00013  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00014  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
00015  *   GNU General Public License for more details.                          *
00016  *                                                                         *
00017  *   You should have received a copy of the GNU General Public License     *
00018  *   along with this program; if not, write to the                         *
00019  *   Free Software Foundation, Inc.,                                       *
00020  *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.             *
00021  ***************************************************************************/
00022 
00023 //Reentrant needs to be defined for threads
00024 #ifndef _REENTRANT
00025         #define _REENTRANT
00026 #endif
00027 
00028 //SpikeStream includes
00029 #include "TCPSynchronizedServer.h"
00030 #include "SpikeStreamSimulation.h"
00031 #include "DeviceMessages.h"
00032 
00033 /* Thread includes. Note that this needs to come after the 
00034         SpikeStream includes to avoid strange error about int and braces */
00035 #include <pthread.h>
00036 
00037 
00038 /*! Function that is called in a separate thread to connect to the socket. */
00039 void* startTCPServerConnectThread(void* tmpTcpSyncServer){
00040         //Connect to the external device.
00041         TCPSynchronizedServer* tcpSyncServer = (TCPSynchronizedServer*) tmpTcpSyncServer;//Local copy of reference to class for clarity
00042     if (connect(tcpSyncServer->socketHandle, (struct sockaddr *) &tcpSyncServer->socketAddress, sizeof(tcpSyncServer->socketAddress)) < 0){
00043         SpikeStreamSimulation::systemError("TCPSynchronizedServer: CONNECTION TO SERVER FAILED");
00044         }
00045         else{//Successful connection to socket
00046                 tcpSyncServer->socketConnected = true;
00047         }
00048 }
00049 
00050 
00051 /*! Constructor. */
00052 TCPSynchronizedServer::TCPSynchronizedServer(unsigned int neurGrpWidth){
00053         socketOpen = false;
00054         neuronGrpWidth = neurGrpWidth;
00055 }
00056 
00057 
00058 /*! Destructor. */
00059 TCPSynchronizedServer::~TCPSynchronizedServer(){
00060         #ifdef MEMORY_DEBUG
00061                 cout<<"DESTROYING TCP SYNCHRONIZED SERVER"<<endl;
00062         #endif//MEMORY_DEBUG
00063 
00064         //Close socket
00065         closeDevice();
00066 }
00067 
00068 
00069 //-------------------------------------------------------------------------
00070 //------------------------- PUBLIC METHODS --------------------------------
00071 //-------------------------------------------------------------------------
00072 
00073 /*! Closes the socket. */
00074 bool TCPSynchronizedServer::closeDevice(){
00075         if(socketOpen){
00076                 close(socketHandle);
00077                 socketOpen = false;
00078         }
00079         return true;
00080 }
00081 
00082 
00083 /*! Opens the socket
00084         Adapted from: http://ntrg.cs.tcd.ie/undergrad/4ba2/multicast/antony/index.html. */
00085 bool TCPSynchronizedServer::openSocket(string ipAddress, int port){
00086     /* Create a reliable, stream socket using TCP */
00087         socketHandle = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
00088     if (socketHandle < 0){
00089                 SpikeStreamSimulation::systemError("TCPSynchronizedServer: ERROR CREATING SOCKET");
00090                 return false;
00091         }
00092 
00093     /* Construct the address structure */
00094     memset(&socketAddress, 0, sizeof(socketAddress));     /* Zero out structure */
00095     socketAddress.sin_family      = AF_INET;             /* Internet address family */
00096     socketAddress.sin_addr.s_addr = inet_addr(ipAddress.data());   /* Server IP address */
00097     socketAddress.sin_port        = htons(port); /* Server port */
00098 
00099 
00100     /* Establish the connection to the server. Do this as a separate thread so it can be timed out.*/
00101         //Socket connected is used to check whether we have the connection established
00102         socketConnected = false;
00103 
00104         //Declare thread to make connection
00105         pthread_t connectThread;
00106         int res = pthread_create(&connectThread, NULL, startTCPServerConnectThread, (void*)this);
00107         if(res != 0){
00108                 SpikeStreamSimulation::systemError("TCPSynchronizedServer: ERROR CREATING CONNECTION THREAD");
00109                 return false;
00110         }
00111 
00112         //Wait until connection has been established or until we have timed out
00113         int timeoutCount = 0;
00114         while(!socketConnected){
00115 
00116                 //Sleep while waiting for thread to connect to socket.
00117                 sleep(1);
00118 
00119                 //Check to see if we have been sleeping for too long
00120                 ++timeoutCount;
00121                 if(timeoutCount >= 5){
00122                         //No point in waiting for the thread to rejoin since it is locked. Just display error and return false.
00123                         SpikeStreamSimulation::systemError("TCPSynchronizedServer: Timeout while waiting for thread to connect to device");
00124                         return false;
00125                 }
00126         }
00127 
00128         //Socket has opened successfully
00129         socketOpen = true;
00130         return true;
00131 }
00132 
00133 
00134 /*! Sends spike data to the receiving socket. */
00135 bool TCPSynchronizedServer::sendSpikeData(){
00136         /* Check that socket is still open. An error in the send will set socketOpen
00137                 to false without attempting to close the socket */
00138         if(!socketOpen)
00139                 return false;
00140 
00141         //Create array to hold the spikes. Each spike takes four bytes.
00142         int bufferSize = (neuronVectorPtr->size() * 4) + 4;//Four bytes per event + 4 bytes to record number of spikes in packet
00143         unsigned char charBuffer[ bufferSize ];//Buffer to be sent
00144 
00145         //Add the first two bytes containing the number of spikes in the message
00146         unsigned int numberOfSpikes = neuronVectorPtr->size();
00147 
00148         //Add the number of spikes in the message
00149         charBuffer[0] = (unsigned char)numberOfSpikes;//0 is the LSB
00150         numberOfSpikes >>= 8;
00151         charBuffer[1] = (unsigned char)numberOfSpikes;//1 is the next significant bit
00152         numberOfSpikes >>= 8;
00153         charBuffer[2] = (unsigned char)numberOfSpikes;//2 is the next significant bit
00154         numberOfSpikes >>= 8;
00155         charBuffer[3] = (unsigned char)numberOfSpikes;//3 is the most significant bit
00156 
00157         //Work through vector of firing neurons and fill buffer for sending over network
00158         unsigned int xPos, yPos;
00159         unsigned int bufferCounter = 4;
00160         for(vector<unsigned int>::iterator iter = neuronVectorPtr->begin(); iter != neuronVectorPtr->end(); ++iter){
00161 
00162                 /* Sort out x position. The neuron vector contains the full 32 bit neuron id,
00163                         so need to subtract the start position of the neuron group and turn it into
00164                         a coordinate. */
00165                 xPos = (*iter - startNeuronID) % neuronGrpWidth;
00166                 if(xPos > 255){
00167                         SpikeStreamSimulation::systemError("TCPSynchronizedServer: X POSITION OUT OF RANGE: ", xPos);
00168                         //Leave socket open since this is ok, but return false because of data error
00169                         return false;
00170                 }
00171                 charBuffer[bufferCounter] = (unsigned char) xPos;
00172                 ++bufferCounter;
00173 
00174                 /* Sort out y position. The neuron vector contains the full 32 bit neuron id,
00175                         so need to subtract the start position of the neuron group and turn it into
00176                         a coordinate. */
00177                 yPos = (*iter - startNeuronID) / neuronGrpWidth;
00178                 if(yPos > 255){
00179                         SpikeStreamSimulation::systemError("TCPSynchronizedServer: Y POSITION OUT OF RANGE: ", yPos);
00180                         //Leave socket open since this is ok, but return false because of data error
00181                         return false;
00182                 }
00183                 charBuffer[bufferCounter] = (unsigned char) yPos;
00184                 ++bufferCounter;
00185 
00186                 //FIXME Add time, for the moment, this is not used, so add zeros here
00187                 for(int i=0; i<2; ++i){
00188                         charBuffer[bufferCounter] = 11;
00189                         ++bufferCounter;
00190                 }
00191         }
00192 
00193         //Print buffer if require
00194         #ifdef PRINT_SEND_DEVICE_DATA_MSG
00195                 cout<<"---------------------------------------------------------------------------"<<endl;
00196                 for(int i=0; i<bufferSize; i += 4){
00197                         printf("SEND BUFFER: %d, %d, %d, %d\n", charBuffer[i], charBuffer[i+1], charBuffer[i+2], charBuffer[i+3]);
00198                 }
00199                 cout<<"---------------------------------------------------------------------------"<<endl;
00200                 cout.flush();
00201         #endif//PRINT_SEND_DEVICE_DATA_MSG
00202 
00203         //Send message to device
00204         #ifdef SEND_DEVICE_DATA_DEBUG
00205                 cout<<"TCPSynchronizedServer: Sending message size = "<<bufferSize<<endl;
00206         #endif//SEND_DEVICE_DATA_DEBUG
00207 
00208         int sendResult = send(socketHandle, charBuffer, bufferSize, 0);
00209         if (sendResult != bufferSize){
00210                 SpikeStreamSimulation::systemError("TCPSynchronizedServer: ERROR SENDING MESSAGE WITH SPIKE DATA. INCORRECT NUMBER OF BYTES SENT");
00211                 socketOpen = false;
00212                 return false;
00213         }
00214 
00215         #ifdef SEND_DEVICE_DATA_DEBUG
00216                 cout<<"TCPSynchronizedServer: Data message sent to external device"<<endl;
00217         #endif//SEND_DEVICE_DATA_DEBUG
00218 
00219         //Wait for acknowledgement of 1 byte
00220         unsigned char receiveAckArray[1];
00221         receiveAckArray[0] = 0;
00222     int bytesReceived = 0;
00223         bytesReceived = recv(socketHandle, receiveAckArray, 1, 0);
00224         if(bytesReceived != 1){
00225                 SpikeStreamSimulation::systemError("TCPSynchronizedClient: ERROR WHEN RECEIVNG DEVICE_DATA_ACK_MSG MESSAGE");
00226                 socketOpen = false;
00227                 return false;
00228         }
00229 
00230         //Check message is correct
00231         if(receiveAckArray[0] != DEVICE_DATA_ACK_MSG){
00232                 SpikeStreamSimulation::systemError("TCPSynchronizedClient: INCORRECT DATA ACKNOWLEDGEMENT MESSAGE RECEIVED", (unsigned int)receiveAckArray[0]);
00233                 //Not an error in the socket, but the data received so leave socket open, but return false to flag error
00234                 return false;
00235         }
00236         else{
00237                 #ifdef SEND_DEVICE_DATA_DEBUG
00238                         cout<<"TCPSynchronizedServer: Data acknowlegement message received"<<endl;
00239                 #endif//SEND_DEVICE_DATA_DEBUG
00240         }
00241 
00242         //If we have reached this point everything should be ok
00243         return true;
00244 }
00245 
00246 
00247 /*! Passes a reference to the vector of firing neurons held in SpikeStreamSimulation. */
00248 void TCPSynchronizedServer::setNeuronVector(vector<unsigned int> *neurVectPtr, unsigned int startNeurID){
00249         neuronVectorPtr = neurVectPtr;
00250         startNeuronID = startNeurID;
00251 }
00252 
00253 

Generated on Mon Sep 3 22:24:34 2007 for SpikeStream Simulation by  doxygen 1.4.4