Main Page | Namespace List | Alphabetical List | Class List | Directories | File List | Class Members | File Members

UDPSynchronizedServer.cpp

Go to the documentation of this file.
00001 /***************************************************************************
00002  *   SpikeStream Simulation                                                *
00003  *   Copyright (C) 2007 by David Gamez                                     *
00004  *   david@davidgamez.eu                                                   *
00005  *   Version 0.1                                                           *
00006  *                                                                         *
00007  *   This program is free software; you can redistribute it and/or modify  *
00008  *   it under the terms of the GNU General Public License as published by  *
00009  *   the Free Software Foundation; either version 2 of the License, or     *
00010  *   (at your option) any later version.                                   *
00011  *                                                                         *
00012  *   This program is distributed in the hope that it will be useful,       *
00013  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00014  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
00015  *   GNU General Public License for more details.                          *
00016  *                                                                         *
00017  *   You should have received a copy of the GNU General Public License     *
00018  *   along with this program; if not, write to the                         *
00019  *   Free Software Foundation, Inc.,                                       *
00020  *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.             *
00021  ***************************************************************************/
00022 
00023 //SpikeStream includes
00024 #include "UDPSynchronizedServer.h"
00025 #include "SpikeStreamSimulation.h"
00026 
00027 //Other includes
00028 #include <iostream>
00029 #include <mysql++.h>
00030 using namespace std;
00031 using namespace mysqlpp;
00032 
00033 
/*! The step counter is sent as a 15 bit number, so the simulation time step is
        reduced modulo 2^15 = 32768, which keeps the counter in the range 0 - 32767.
        The definition deliberately has no trailing semicolon so that the macro can
        also be used in the middle of an expression. */
#define STEP_COUNTER_MAX 32768
00036 
00037 
00038 /*! Constructor. */
00039 UDPSynchronizedServer::UDPSynchronizedServer(DBInterface* dbInter, unsigned int neurGrpWidth){
00040         //Store reference to database handling class
00041         deviceDBInterface = dbInter;
00042 
00043         //Store details about neuron group
00044         neuronGrpWidth = neurGrpWidth;
00045 
00046         //Socket starts closed
00047         socketOpen = false;
00048 }
00049 
00050 
00051 /*! Destructor. */
00052 UDPSynchronizedServer::~UDPSynchronizedServer(){
00053         #ifdef MEMORY_DEBUG
00054                 cout<<"DESTROYING UDP SYNCHRONIZED SERVER"<<endl;
00055         #endif//MEMORY_DEBUG
00056 
00057         //Close socket
00058         closeDevice();
00059 }
00060 
00061 
00062 //--------------------------------------------------------------------------
00063 //--------------------------- PUBLIC METHODS -------------------------------
00064 //--------------------------------------------------------------------------
00065 
00066 /*! Closes the socket. */
00067 bool UDPSynchronizedServer::closeDevice(){
00068         close(socketHandle);
00069         socketOpen = false;
00070         return true;
00071 }
00072 
00073 
00074 /*! Opens the UDP socket. 
00075         Adapted from: http://ntrg.cs.tcd.ie/undergrad/4ba2/multicast/antony/index.html. */
00076 bool UDPSynchronizedServer::openSocket(string address, int port){
00077         /* Create what looks like an ordinary UDP socket */
00078         socketHandle=socket(AF_INET, SOCK_DGRAM, 0);
00079         if (socketHandle < 0) {
00080                 SpikeStreamSimulation::systemError("DeviceManager: ERROR OPENING SOCKET");
00081                 return false;
00082         }
00083 
00084      /* Set up destination address */
00085      memset(&socketAddress, 0, sizeof(socketAddress));
00086      socketAddress.sin_family=AF_INET;
00087      socketAddress.sin_addr.s_addr=inet_addr(address.data());
00088      socketAddress.sin_port=htons(port);
00089 
00090         //If we have reached this point,the socket is open
00091         socketOpen = true;
00092         return true;
00093 }
00094 
00095 
00096 /*! Sends spike data to the receiving socket. */
00097 bool UDPSynchronizedServer::sendSpikeData(){
00098         /* Check that socket is still open. An error in the send will set socketOpen
00099                 to false without attempting to close the socket */
00100         if(!socketOpen)
00101                 return false;
00102 
00103         //Create array to hold the spikes. Each spike takes four bytes.
00104         int bufferSize = (neuronVectorPtr->size() * 4) + 2;//Four bytes per event + 2 bytes synchronization
00105         unsigned char charBuffer[ bufferSize ];
00106 
00107         //Add the first two bytes containing the timestep and delay flag
00108         unsigned short stepCounter = SpikeStreamSimulation::simulationClock->getTimeStep() % STEP_COUNTER_MAX;
00109 
00110         //Shift step counter left to leave space for the delay flag 
00111         stepCounter <<= 1;
00112 
00113         //Set delay flag to 1 if one of the layers in the simulation has delayed in the last time step
00114         if(simulationSynchronizationDelay()){
00115                 stepCounter += 1;
00116         }
00117 
00118         //Pack the step counter into the array
00119         charBuffer[0] = (unsigned char)stepCounter;//0 is the LSB
00120         stepCounter >>= 8;
00121         charBuffer[1] = (unsigned char)stepCounter;//1 is the MSB
00122 
00123         //Work through vector of firing neurons and fill buffer for sending over network
00124         unsigned int xPos, yPos;
00125         unsigned int bufferCounter = 2;
00126         for(vector<unsigned int>::iterator iter = neuronVectorPtr->begin(); iter != neuronVectorPtr->end(); ++iter){
00127 
00128                 /* Sort out x position. The neuron vector contains the full 32 bit neuron id,
00129                         so need to subtract the start position of the neuron group and turn it into
00130                         a coordinate. */
00131                 xPos = (*iter - startNeuronID) % neuronGrpWidth;
00132                 if(xPos > 255){
00133                         SpikeStreamSimulation::systemError("UDPSynchronizedServer: X POSITION OUT OF RANGE: ", xPos);
00134                         //Leave socket open since this is ok, but return false because of data error
00135                         return false;
00136                 }
00137                 charBuffer[bufferCounter] = (unsigned char) xPos;
00138                 ++bufferCounter;
00139 
00140                 /* Sort out y position. The neuron vector contains the full 32 bit neuron id,
00141                         so need to subtract the start position of the neuron group and turn it into
00142                         a coordinate. */
00143                 yPos = (*iter - startNeuronID) / neuronGrpWidth;
00144                 if(yPos > 255){
00145                         SpikeStreamSimulation::systemError("UDPSynchronizedServer: Y POSITION OUT OF RANGE: ", yPos);
00146                         //Leave socket open since this is ok, but return false because of data error
00147                         return false;
00148                 }
00149                 charBuffer[bufferCounter] = (unsigned char) yPos;
00150                 ++bufferCounter;
00151 
00152                 //FIXME Add time, for the moment, this is not used, so add zeros here
00153                 for(int i=0; i<2; ++i){
00154                         charBuffer[bufferCounter] = 0;
00155                         ++bufferCounter;
00156                 }
00157         }
00158 
00159         //Broadcast buffer over network
00160         #ifdef SEND_NETWORK_MSG_DEBUG
00161                 cout<<"Sending message: size of = "<<sizeof(charBuffer)<<" buffer size = "<<bufferSize<<endl;
00162         #endif//SEND_NETWORK_MESSAGE_DEBUG
00163 
00164         int sendResult = sendto(socketHandle, charBuffer, bufferSize, 0, (struct sockaddr *) &socketAddress, sizeof(socketAddress));
00165         if (sendResult < 0) {
00166                 SpikeStreamSimulation::systemError("DeviceManager: ERROR SENDING MESSAGE ");
00167                 perror("sendto");
00168                 socketOpen = false;
00169                 return false;
00170         }
00171 
00172         //If we have reached this point, everything should be ok
00173         return true;
00174 }
00175 
00176 
00177 /*! Passes a reference to the neuron vector in SpikeStreamSimulation to this class.
00178         This contains the ids of the currently firing neurons. */
00179 void UDPSynchronizedServer::setNeuronVector(vector<unsigned int> *neurVectPtr, unsigned int startNeurID){
00180         neuronVectorPtr = neurVectPtr;
00181         startNeuronID = startNeurID;
00182 }
00183 
00184 
00185 /*! Returns true if there is 1 or more entries in the synchronization delay database
00186         These entries indicate that another task is delaying itself to synchronize
00187         up with an external device. */
00188 bool UDPSynchronizedServer::simulationSynchronizationDelay(){
00189         try{
00190                 Query deviceQuery = deviceDBInterface->getQuery();
00191                 deviceQuery.reset();
00192                 deviceQuery<<"SELECT * FROM SynchronizationDelay";
00193                 Result syncRes = deviceQuery.store();
00194                 if(syncRes.size() > 0)
00195                         return true;
00196                 return false;
00197         }
00198         catch (const BadQuery& er) {// Handle any query errors
00199                 ostringstream errorStrStream;
00200                 errorStrStream<<"Bad query when checking for synchronization delay: \""<<er.what()<<"\"";
00201                 SpikeStreamSimulation::systemError(errorStrStream.str());
00202         }
00203         catch (const Exception& er) {// Catch-all for any other MySQL++ exceptions
00204                 ostringstream errorStrStream;
00205                 errorStrStream<<"Exception thrown checking for synchronization delay: \""<<er.what()<<"\"";
00206                 SpikeStreamSimulation::systemError(errorStrStream.str());
00207         }
00208         catch(std::exception& er){// Catch-all for any other exceptions
00209                 ostringstream errorStrStream;
00210                 errorStrStream<<"Exception thrown checking for synchronization delay: \""<<er.what()<<"\"";
00211                 SpikeStreamSimulation::systemError(errorStrStream.str());
00212         }
00213 }
00214 
00215 
00216 

Generated on Mon Sep 3 22:24:34 2007 for SpikeStream Simulation by  doxygen 1.4.4