00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "UDPSynchronizedServer.h"
00025 #include "SpikeStreamSimulation.h"
00026
00027
00028 #include <iostream>
00029 #include <mysql++.h>
00030 using namespace std;
00031 using namespace mysqlpp;
00032
00033
00034
00035 #define STEP_COUNTER_MAX 32768;
00036
00037
00038
00039 UDPSynchronizedServer::UDPSynchronizedServer(DBInterface* dbInter, unsigned int neurGrpWidth){
00040
00041 deviceDBInterface = dbInter;
00042
00043
00044 neuronGrpWidth = neurGrpWidth;
00045
00046
00047 socketOpen = false;
00048 }
00049
00050
00051
00052 UDPSynchronizedServer::~UDPSynchronizedServer(){
00053 #ifdef MEMORY_DEBUG
00054 cout<<"DESTROYING UDP SYNCHRONIZED SERVER"<<endl;
00055 #endif//MEMORY_DEBUG
00056
00057
00058 closeDevice();
00059 }
00060
00061
00062
00063
00064
00065
00066
00067 bool UDPSynchronizedServer::closeDevice(){
00068 close(socketHandle);
00069 socketOpen = false;
00070 return true;
00071 }
00072
00073
00074
00075
00076 bool UDPSynchronizedServer::openSocket(string address, int port){
00077
00078 socketHandle=socket(AF_INET, SOCK_DGRAM, 0);
00079 if (socketHandle < 0) {
00080 SpikeStreamSimulation::systemError("DeviceManager: ERROR OPENING SOCKET");
00081 return false;
00082 }
00083
00084
00085 memset(&socketAddress, 0, sizeof(socketAddress));
00086 socketAddress.sin_family=AF_INET;
00087 socketAddress.sin_addr.s_addr=inet_addr(address.data());
00088 socketAddress.sin_port=htons(port);
00089
00090
00091 socketOpen = true;
00092 return true;
00093 }
00094
00095
00096
00097 bool UDPSynchronizedServer::sendSpikeData(){
00098
00099
00100 if(!socketOpen)
00101 return false;
00102
00103
00104 int bufferSize = (neuronVectorPtr->size() * 4) + 2;
00105 unsigned char charBuffer[ bufferSize ];
00106
00107
00108 unsigned short stepCounter = SpikeStreamSimulation::simulationClock->getTimeStep() % STEP_COUNTER_MAX;
00109
00110
00111 stepCounter <<= 1;
00112
00113
00114 if(simulationSynchronizationDelay()){
00115 stepCounter += 1;
00116 }
00117
00118
00119 charBuffer[0] = (unsigned char)stepCounter;
00120 stepCounter >>= 8;
00121 charBuffer[1] = (unsigned char)stepCounter;
00122
00123
00124 unsigned int xPos, yPos;
00125 unsigned int bufferCounter = 2;
00126 for(vector<unsigned int>::iterator iter = neuronVectorPtr->begin(); iter != neuronVectorPtr->end(); ++iter){
00127
00128
00129
00130
00131 xPos = (*iter - startNeuronID) % neuronGrpWidth;
00132 if(xPos > 255){
00133 SpikeStreamSimulation::systemError("UDPSynchronizedServer: X POSITION OUT OF RANGE: ", xPos);
00134
00135 return false;
00136 }
00137 charBuffer[bufferCounter] = (unsigned char) xPos;
00138 ++bufferCounter;
00139
00140
00141
00142
00143 yPos = (*iter - startNeuronID) / neuronGrpWidth;
00144 if(yPos > 255){
00145 SpikeStreamSimulation::systemError("UDPSynchronizedServer: Y POSITION OUT OF RANGE: ", yPos);
00146
00147 return false;
00148 }
00149 charBuffer[bufferCounter] = (unsigned char) yPos;
00150 ++bufferCounter;
00151
00152
00153 for(int i=0; i<2; ++i){
00154 charBuffer[bufferCounter] = 0;
00155 ++bufferCounter;
00156 }
00157 }
00158
00159
00160 #ifdef SEND_NETWORK_MSG_DEBUG
00161 cout<<"Sending message: size of = "<<sizeof(charBuffer)<<" buffer size = "<<bufferSize<<endl;
00162 #endif//SEND_NETWORK_MESSAGE_DEBUG
00163
00164 int sendResult = sendto(socketHandle, charBuffer, bufferSize, 0, (struct sockaddr *) &socketAddress, sizeof(socketAddress));
00165 if (sendResult < 0) {
00166 SpikeStreamSimulation::systemError("DeviceManager: ERROR SENDING MESSAGE ");
00167 perror("sendto");
00168 socketOpen = false;
00169 return false;
00170 }
00171
00172
00173 return true;
00174 }
00175
00176
00177
00178
00179 void UDPSynchronizedServer::setNeuronVector(vector<unsigned int> *neurVectPtr, unsigned int startNeurID){
00180 neuronVectorPtr = neurVectPtr;
00181 startNeuronID = startNeurID;
00182 }
00183
00184
00185
00186
00187
00188 bool UDPSynchronizedServer::simulationSynchronizationDelay(){
00189 try{
00190 Query deviceQuery = deviceDBInterface->getQuery();
00191 deviceQuery.reset();
00192 deviceQuery<<"SELECT * FROM SynchronizationDelay";
00193 Result syncRes = deviceQuery.store();
00194 if(syncRes.size() > 0)
00195 return true;
00196 return false;
00197 }
00198 catch (const BadQuery& er) {
00199 ostringstream errorStrStream;
00200 errorStrStream<<"Bad query when checking for synchronization delay: \""<<er.what()<<"\"";
00201 SpikeStreamSimulation::systemError(errorStrStream.str());
00202 }
00203 catch (const Exception& er) {
00204 ostringstream errorStrStream;
00205 errorStrStream<<"Exception thrown checking for synchronization delay: \""<<er.what()<<"\"";
00206 SpikeStreamSimulation::systemError(errorStrStream.str());
00207 }
00208 catch(std::exception& er){
00209 ostringstream errorStrStream;
00210 errorStrStream<<"Exception thrown checking for synchronization delay: \""<<er.what()<<"\"";
00211 SpikeStreamSimulation::systemError(errorStrStream.str());
00212 }
00213 }
00214
00215
00216