00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #ifndef _REENTRANT
00025 #define _REENTRANT
00026 #endif
00027
00028
00029 #include "TCPSynchronizedClient.h"
00030 #include "SpikeStreamSimulation.h"
00031 #include "DeviceMessages.h"
00032 #include "PerformanceTimer.h"
00033
00034
00035
00036 #include <pthread.h>
00037
00038
00039
00040
00041
//Number of unsigned int entries allocated for spikeBuffer in the constructor.
//fetchData() writes three consecutive entries into spikeBuffer for every spike it unpacks.
00042 #define SPIKE_BUFFER_SIZE 60000
00043
00044
00045
//Number of bytes allocated for msgBuffer in the constructor.
//fetchData() receives the body of each message (four bytes per spike) into msgBuffer.
00046 #define MESSAGE_BUFFER_SIZE 1048576
00047
00048
00049
00050 void* startTCPClientConnectThread(void* tmpTcpSyncClient){
00051
00052 TCPSynchronizedClient* tcpSyncClient = (TCPSynchronizedClient*) tmpTcpSyncClient;
00053 if (connect(tcpSyncClient->socketHandle, (struct sockaddr *) &tcpSyncClient->externalDevAddr, sizeof(tcpSyncClient->externalDevAddr)) < 0){
00054 SpikeStreamSimulation::systemError("TCPSynchronizedClient: CONNECTION TO SERVER FAILED");
00055 }
00056 else{
00057 tcpSyncClient->socketConnected = true;
00058 }
00059 }
00060
00061
00062
00063 TCPSynchronizedClient::TCPSynchronizedClient(unsigned int neurGrpWidth, unsigned int neurGrpLength){
00064
00065 neuronGrpWidth = neurGrpWidth;
00066 neuronGrpLength = neurGrpLength;
00067
00068
00069 socketOpen = false;
00070 twoByteCoordinates = false;
00071
00072
00073 spikeCount = 0;
00074 spikeBuffer = new unsigned int[SPIKE_BUFFER_SIZE];
00075
00076
00077 msgBuffer = new unsigned char[MESSAGE_BUFFER_SIZE];
00078
00079
00080 requestDataMsgArray[0] = SPIKESTREAM_DATA_ACK_MSG;
00081 }
00082
00083
00084
00085 TCPSynchronizedClient::~TCPSynchronizedClient(){
00086 #ifdef MEMORY_DEBUG
00087 cout<<"DESTROYING TCP SYNCHRONIZATION CLIENT"<<endl;
00088 #endif//MEMORY_DEBUG
00089
00090
00091 closeDevice();
00092
00093
00094 delete [] spikeBuffer;
00095 delete [] msgBuffer;
00096 }
00097
00098
00099
00100
00101
00102
00103
00104 bool TCPSynchronizedClient::closeDevice(){
00105 if(socketOpen){
00106 close(socketHandle);
00107 socketOpen = false;
00108 }
00109 return true;
00110 }
00111
00112
00113
00114 bool TCPSynchronizedClient::deviceOpen(){
00115 return socketOpen;
00116 }
00117
00118
00119
00120 bool TCPSynchronizedClient::fetchData(){
00121
00122
00123 if(!socketOpen)
00124 return false;
00125
00126
00127 spikeCount = 0;
00128
00129 #ifdef RECEIVE_DEVICE_DATA_DEBUG
00130 cout<<"TCPSynchronizedClient: Waiting for data ..."<<endl;
00131 #endif//RECEIVE_DEVICE_DATA_DEBUG
00132
00133
00134
00135
00136 unsigned char messageSizeArray[1];
00137 unsigned int numberOfMsgSpikes = 0;
00138 unsigned int totalBytesReceived = 0, bytesReceived = 0;
00139 while (totalBytesReceived < 4){
00140
00141
00142
00143 if ((bytesReceived = recv(socketHandle, messageSizeArray, 1, 0)) <= 0){
00144 SpikeStreamSimulation::systemError("TCPSynchronizedClient: ERROR RECEIVNG SIZE OF MESSAGE");
00145 socketOpen = false;
00146 return false;
00147 }
00148
00149
00150 unsigned int tmpPartMsgSpikeCount = messageSizeArray[0];
00151 numberOfMsgSpikes += tmpPartMsgSpikeCount << 8*totalBytesReceived;
00152
00153
00154 totalBytesReceived += bytesReceived;
00155 }
00156
00157 #ifdef RECEIVE_DEVICE_DATA_DEBUG
00158 cout<<"TCPSynchronizedClient: Data received. Number of spikes in message = "<<numberOfMsgSpikes<<endl;
00159 #endif//RECEIVE_DEVICE_DATA_DEBUG
00160
00161
00162
00163 unsigned int msgBufferSize = 4 * numberOfMsgSpikes;
00164 unsigned char *msgBufferPtr = msgBuffer;
00165 totalBytesReceived = 0;
00166 bytesReceived = 0;
00167 while (totalBytesReceived < msgBufferSize ){
00168
00169 if ((bytesReceived = recv(socketHandle, msgBufferPtr, msgBufferSize - totalBytesReceived, 0)) <= 0){
00170 SpikeStreamSimulation::systemError("TCPSynchronizedClient: ERROR WHEN RECEIVING MAIN MESSAGE DATA");
00171 socketOpen = false;
00172 return false;
00173 }
00174
00175
00176 totalBytesReceived += bytesReceived;
00177
00178
00179 msgBufferPtr += bytesReceived;
00180 }
00181
00182
00183 #ifdef PRINT_RECEIVE_DEVICE_DATA_MSG
00184 printMessageBuffer(msgBuffer, msgBufferSize);
00185 #endif//PRINT_RECEIVE_DEVICE_DATA_MSG
00186
00187
00188 for(unsigned int i=0; i < msgBufferSize; i += 4){
00189 if(twoByteCoordinates){
00190
00191 unsigned int xVal = msgBuffer[i+1] & 255;
00192 xVal <<= 8;
00193 xVal += msgBuffer[i] & 255;
00194
00195 unsigned int yVal = msgBuffer[i+3] & 255;
00196 yVal <<= 8;
00197 yVal += msgBuffer[i + 2] & 255;
00198
00199
00200 if(xVal > neuronGrpWidth){
00201 SpikeStreamSimulation::systemError("TCPSynchronizedClient: X POSITION OUT OF RANGE ", xVal);
00202
00203 return false;
00204 }
00205 if(yVal > neuronGrpLength){
00206 SpikeStreamSimulation::systemError("TCPSynchronizedClient: Y POSITION OUT OF RANGE ", yVal);
00207
00208 return false;
00209 }
00210
00211
00212 spikeBuffer[spikeCount * 3] = xVal;
00213 spikeBuffer[spikeCount * 3 + 1] = yVal;
00214
00215
00216 spikeBuffer[spikeCount * 3 + 2] = 0;
00217 }
00218 else{
00219
00220 if(msgBuffer[i] > neuronGrpWidth){
00221 SpikeStreamSimulation::systemError("TCPSynchronizedClient: X POSITION OUT OF RANGE ", msgBuffer[i]);
00222
00223 return false;
00224 }
00225 if(msgBuffer[i+1] > neuronGrpLength){
00226 SpikeStreamSimulation::systemError("TCPSynchronizedClient: Y POSITION OUT OF RANGE ", msgBuffer[i+1]);
00227
00228 return false;
00229 }
00230
00231
00232 spikeBuffer[spikeCount * 3] = msgBuffer[i];
00233 spikeBuffer[spikeCount * 3 + 1] = msgBuffer[i+1];
00234
00235
00236
00237
00238 unsigned int time = msgBuffer[i+3];
00239 time <<= 8;
00240 time += msgBuffer[i+2];
00241
00242
00243 unsigned int delay = time / 10;
00244 if(delay > NUMBER_OF_DELAY_VALUES)
00245 spikeBuffer[spikeCount * 3 + 2] = 0;
00246 else
00247 spikeBuffer[spikeCount * 3 + 2] = delay;
00248 }
00249
00250
00251 ++spikeCount;
00252 }
00253
00254 #ifdef RECEIVE_DEVICE_DATA_DEBUG
00255 cout<<"TCPSynchronizedClient: Spikes unpacked into SpikeBuffer"<<endl;
00256 #endif//RECEIVE_DEVICE_DATA_DEBUG
00257
00258
00259
00260 if (send(socketHandle, requestDataMsgArray, 1, 0) != 1){
00261 SpikeStreamSimulation::systemError("TCPSynchronizedClient: SENDING OF REQUEST_DEVICE_DATA_MSG TO SERVER FAILED");
00262 socketOpen = false;
00263 return false;
00264 }
00265
00266
00267
00268
00269 return true;
00270 }
00271
00272
00273
00274
00275
00276
00277 bool TCPSynchronizedClient::openSocket(string externalDeviceIPAddr, int externalDevicePort){
00278 cout<<"TCPSynchronizedClient: Connecting to "<<externalDeviceIPAddr<<" on port "<<externalDevicePort<<endl;
00279
00280
00281 socketHandle = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
00282 if (socketHandle < 0){
00283 SpikeStreamSimulation::systemError("TCPSynchronizedClient: ERROR CREATING SOCKET");
00284 return false;
00285 }
00286
00287
00288 memset(&externalDevAddr, 0, sizeof(externalDevAddr));
00289 externalDevAddr.sin_family = AF_INET;
00290 externalDevAddr.sin_addr.s_addr = inet_addr(externalDeviceIPAddr.data());
00291 externalDevAddr.sin_port = htons(externalDevicePort);
00292
00293
00294
00295
00296 socketConnected = false;
00297
00298
00299 pthread_t connectThread;
00300 int res = pthread_create(&connectThread, NULL, startTCPClientConnectThread, (void*)this);
00301 if(res != 0){
00302 SpikeStreamSimulation::systemError("TCPSynchronizedClient: ERROR CREATING CONNECTION THREAD");
00303 return false;
00304 }
00305
00306
00307 int timeoutCount = 0;
00308 while(!socketConnected){
00309
00310
00311 sleep(1);
00312
00313
00314 ++timeoutCount;
00315 if(timeoutCount >= 5){
00316
00317 SpikeStreamSimulation::systemError("TCPSynchronizedClient: Timeout while waiting for thread to connect to device");
00318 return false;
00319 }
00320 }
00321
00322
00323
00324 socketOpen = true;
00325 return true;
00326 }
00327
00328
00329
00330
00331 void TCPSynchronizedClient::setTwoByteCoords(bool twoBtCds){
00332 twoByteCoordinates = twoBtCds;
00333 }
00334
00335
00336
00337
00338
00339
00340
00341
00342 void TCPSynchronizedClient::printMessageBuffer(unsigned char* msgBuffer, unsigned int msgBufSize){
00343 cout<<"-------------------------- START MESSAGE BUFFER ----------------------------"<<endl;
00344 for(unsigned int i=0; i<msgBufSize; i+=4)
00345 printf("%d, %d, %d, %d\n", msgBuffer[i], msgBuffer[i+1], msgBuffer[i+2], msgBuffer[i+3]);
00346 cout<<"------------------------ END MESSAGE BUFFER ----------------------------"<<endl<<endl;
00347 }
00348
00349