00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "Debug.h"
00025 #include "PVMMessages.h"
00026 #include "SpikeStreamArchiver.h"
00027 #include "GlobalVariables.h"
00028 #include "Utilities.h"
00029
00030
00031 #include <ctime>
00032 #include <stdlib.h>
00033 #include "pvm3.h"
00034 #include <iostream>
00035 using namespace std;
00036 using namespace mysqlpp;
00037
00038
00039
00040 int SpikeStreamArchiver::parentTaskID = 0;
00041 bool SpikeStreamArchiver::errorState = false;
00042
00043
00044
00045 SpikeStreamArchiver::SpikeStreamArchiver(int argc, char **argv){
00046
00047 myTaskID = pvm_mytid();
00048 parentTaskID = pvm_parent();
00049
00050
00051 networkDataStored = false;
00052
00053
00054 pvm_setopt( PvmRoute, PvmRouteDirect );
00055
00056
00057 unpackArray = new unsigned int[MAX_NUMBER_OF_SPIKES];
00058
00059
00060
00061
00062
00063
00064 char *neuralNetworkHost, *neuralNetworkUser, *neuralNetworkPassword, *neuralNetworkDatabase;
00065 char *archiveHost, *archiveUser, *archivePassword, *archiveDatabase;
00066
00067 for(int i=1; i<argc; i += 2){
00068 if(argv[i][0] == '-'){
00069 if(Utilities::cStringEquals(argv[i], "-nnh", 4)){
00070 neuralNetworkHost = argv[i+1];
00071 }
00072 else if(Utilities::cStringEquals(argv[i], "-nnu", 4)) {
00073 neuralNetworkUser = argv[i+1];
00074 }
00075 else if(Utilities::cStringEquals(argv[i], "-nnp", 4)){
00076 neuralNetworkPassword = argv[i+1];
00077 }
00078 else if(Utilities::cStringEquals(argv[i], "-nnd", 4)){
00079 neuralNetworkDatabase = argv[i+1];
00080 }
00081 else if(Utilities::cStringEquals(argv[i], "-ah", 4)){
00082 archiveHost = argv[i+1];
00083 }
00084 else if(Utilities::cStringEquals(argv[i], "-au", 4)) {
00085 archiveUser = argv[i+1];
00086 }
00087 else if(Utilities::cStringEquals(argv[i], "-ap", 4)){
00088 archivePassword = argv[i+1];
00089 }
00090 else if(Utilities::cStringEquals(argv[i], "-ad", 4)){
00091 archiveDatabase = argv[i+1];
00092 }
00093 else if(Utilities::cStringEquals(argv[i], "-an", 4)){
00094
00095 Utilities::safeCStringCopy(archiveName, argv[i+1], MAX_DATABASE_NAME_LENGTH);
00096 }
00097 else if(Utilities::cStringEquals(argv[i], "-at", 4)){
00098 if(Utilities::cStringEquals(argv[i+1], "Neurons", 30))
00099 archiveType = FIRING_NEURON_ARCHIVE;
00100 else if(Utilities::cStringEquals(argv[i+1], "Spikes", 20))
00101 archiveType = SPIKE_ARCHIVE;
00102 else{
00103 cerr<<"SpikeStreamArchiver: ARCHIVE TYPE NOT RECOGNIZED: \""<<argv[i+1]<<"\". SETTING TO ARCHIVE FIRING NEURONS"<<endl;
00104 archiveType = FIRING_NEURON_ARCHIVE;
00105 }
00106 }
00107 }
00108 else{
00109 cerr<<"SpikeStreamArchiver: Invalid command line arguments!"<<endl;
00110 return;
00111 }
00112 }
00113
00114
00115 #ifdef COMMAND_LINE_PARAMETERS_DEBUG
00116 cout<<"Neural network database parameters: "<<neuralNetworkHost<<", "<<neuralNetworkUser<<", "<<neuralNetworkPassword<<", "<<neuralNetworkDatabase<<endl;
00117 cout<<"Archive database parameters: "<<archiveHost<<", "<<archiveUser<<", "<<archivePassword<<", "<<archiveDatabase<<endl;
00118 #endif//COMMAND_LINE_PARAMETERS_DEBUG
00119
00120
00121 neuralNetworkDBInterface = new DBInterface(neuralNetworkHost, neuralNetworkUser, neuralNetworkPassword, neuralNetworkDatabase);
00122 if(!neuralNetworkDBInterface->connectToDatabase(true)){
00123 systemError("FAILED TO CONNECT TO THE NEURAL NETWORK DATABASE");
00124 }
00125
00126
00127 archiveDBInterface = new DBInterface(archiveHost, archiveUser, archivePassword, archiveDatabase);
00128 if(!archiveDBInterface->connectToDatabase(true)){
00129 systemError("FAILED TO CONNECT TO NEURAL ARCHIVE DATABASE");
00130 }
00131
00132
00133
00134
00135 loadTaskIDs();
00136
00137
00138
00139 archiveNeuralNetwork();
00140
00141
00142 sendMessage(parentTaskID, ARCHIVE_TASK_STARTED_MSG, simulationStartTime);
00143
00144
00145 cout<<"Archiver task successfully started with taskID "<<myTaskID<<endl;
00146
00147
00148 stop = false;
00149
00150 run();
00151 }
00152
00153
00154
00155 SpikeStreamArchiver::~SpikeStreamArchiver(){
00156 #ifdef MEMORY_DEBUG
00157 cout<<"DELETING NEURON ARCHIVER. TASK ID: "<<myTaskID<<endl;
00158 #endif//MEMORY_DEBUG
00159
00160
00161 delete archiveDBInterface;
00162 delete neuralNetworkDBInterface;
00163
00164
00165 delete [] unpackArray;
00166 }
00167
00168
00169
00170 void SpikeStreamArchiver::run(){
00171 while(!stop){
00172
00173 int bufID = pvm_recv(-1, -1);
00174 if(bufID < 0){
00175 pvm_perror("SpikeStreamArchiver: MESSAGE RECEIVE ERROR");
00176 systemError("SpikeStreamArchiver: MESSAGE RECEIVE ERROR");
00177 }
00178 else if (bufID > 0){
00179 int bytes, msgtag, senderTID;
00180 int info = pvm_bufinfo(bufID, &bytes, &msgtag, &senderTID);
00181 if(info < 0){
00182 pvm_perror("SpikeStreamArchiver: PROBLEM GETTING BUFFER INFO");
00183 systemError("SpikeStreamArchiver: PROBLEM GETTING BUFFER INFO");
00184 }
00185 else if(errorState){
00186 if(msgtag == EXIT_MSG){
00187 #ifdef MESSAGE_DEBUG
00188 cout<<"Task "<<myTaskID<<": EXIT_MSG "<<bytes<<" bytes received from "<<senderTID<<endl;
00189 #endif//MESSAGE_DEBUG
00190 stop = true;
00191 }
00192 else{
00193 systemError("Task is in error state and will no longer carry out any operations.\nTermination of the simulation is recommended.");
00194 }
00195 }
00196 else{
00197 switch(msgtag){
00198 case (SPIKE_LIST_MSG):
00199 #ifdef MESSAGE_DEBUG
00200 cout<<"Task "<<myTaskID<<": SPIKE_LIST_MSG "<<bytes<<" bytes received from "<<senderTID<<endl;
00201 #endif//MESSAGE_DEBUG
00202 processSpikeList(senderTID);
00203 break;
00204 case (FIRING_NEURON_LIST_MSG):
00205 #ifdef MESSAGE_DEBUG
00206 cout<<"Task "<<myTaskID<<": FIRING_NEURON_LIST_MSG "<<bytes<<" bytes received from "<<senderTID<<endl;
00207 #endif//MESSAGE_DEBUG
00208 processFiringNeuronList(senderTID);
00209 break;
00210 case (START_ARCHIVING_MSG):
00211 #ifdef MESSAGE_DEBUG
00212 cout<<"Task "<<myTaskID<<": START_ARCHIVING_MSG "<<bytes<<" bytes received from "<<senderTID<<endl;
00213 #endif//MESSAGE_DEBUG
00214 startArchiving();
00215 break;
00216 case (STOP_ARCHIVING_MSG):
00217 #ifdef MESSAGE_DEBUG
00218 cout<<"Task "<<myTaskID<<": STOP_ARCHIVING_MSG "<<bytes<<" bytes received from "<<senderTID<<endl;
00219 #endif//MESSAGE_DEBUG
00220 stopArchiving();
00221 break;
00222 case (EXIT_MSG):
00223 #ifdef MESSAGE_DEBUG
00224 cout<<"Task "<<myTaskID<<": EXIT_MSG "<<bytes<<" bytes received from "<<senderTID<<endl;
00225 #endif//MESSAGE_DEBUG
00226 stop = true;
00227 break;
00228 default:
00229 cout<<"Task "<<myTaskID<<"; UNRECOGNIZED MESSAGE "<<bytes<<" bytes received from "<<senderTID<<endl;
00230 }
00231 }
00232 }
00233 }
00234
00235
00236 cleanUpArchiver();
00237 }
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247 void SpikeStreamArchiver::systemError(const char *message){
00248 errorState = true;
00249 cerr<<"SpikeStreamArchiver: "<<message<<endl;
00250 sendMessage(parentTaskID, ERROR_MSG, message);
00251 }
00252
00253
00254
00255
00256
00257 void SpikeStreamArchiver::systemError(const char *message, int messageData1){
00258 errorState = true;
00259 ostringstream tempStr;
00260 tempStr<<message<<messageData1;
00261 cerr<<"SpikeStreamArchiver: "<<message<<messageData1<<endl;
00262 sendMessage(parentTaskID, ERROR_MSG, tempStr.str().data());
00263 }
00264
00265
00266
00267
00268
00269 void SpikeStreamArchiver::systemError(const string &message){
00270 errorState = true;
00271 cerr<<"SpikeStreamArchiver: "<<message<<endl;
00272 sendMessage(parentTaskID, ERROR_MSG, message.data());
00273 }
00274
00275
00276
00277
00278
00279
00280
00281
00282
00283 bool SpikeStreamArchiver::archiveNeuralNetwork(){
00284
00285 simulationStartTime = time(NULL);
00286
00287 string xmlString = "<?xml version=\"1.0\" encoding=\"ISO-8859-1\"?>";
00288 xmlString += "<neural_network>";
00289
00290 try{
00291 Query networkQuery = neuralNetworkDBInterface->getQuery();
00292 networkQuery.reset();
00293 networkQuery<<"SELECT * FROM NeuronGroups";
00294 Result result = networkQuery.store();
00295 for(Result::iterator iter = result.begin(); iter != result.end(); ++iter){
00296 Row row(*iter);
00297
00298
00299 unsigned int neuronGrpID = Utilities::getUInt(row["NeuronGrpID"]);
00300 networkQuery.reset();
00301 networkQuery<<"SELECT MIN(NeuronID) FROM Neurons WHERE NeuronGrpID = "<<neuronGrpID;
00302 Result startNeuronRes = networkQuery.store();
00303 Row startNeuronRow(*startNeuronRes.begin());
00304
00305 xmlString += "<neuron_group id=\"";
00306 xmlString +=(std::string)row["NeuronGrpID"];
00307 xmlString += "\">";
00308
00309 xmlString += "<name>";
00310 xmlString += (std::string)row["Name"];
00311 xmlString += "</name>";
00312
00313 xmlString += "<start_neuron_id>";
00314 xmlString += (std::string)startNeuronRow["MIN(NeuronID)"];
00315 xmlString += "</start_neuron_id>";
00316
00317 xmlString += "<width>";
00318 xmlString += (std::string)row["Width"];
00319 xmlString += "</width>";
00320
00321 xmlString += "<length>";
00322 xmlString += (std::string)row["Length"];
00323 xmlString += "</length>";
00324
00325 xmlString += "<location>";
00326 xmlString += (std::string)row["X"];
00327 xmlString += ",";
00328 xmlString += (std::string)row["Y"];
00329 xmlString += ",";
00330 xmlString += (std::string)row["Z"];
00331 xmlString += "</location>";
00332
00333 xmlString += "<spacing>";
00334 xmlString += (std::string)row["Spacing"];
00335 xmlString += "</spacing>";
00336
00337 xmlString += "<neuron_type>";
00338 xmlString += (std::string)row["NeuronType"];
00339 xmlString += "</neuron_type>";
00340
00341 xmlString += "</neuron_group>";
00342 }
00343
00344
00345 xmlString += "</neural_network>";
00346
00347
00348 Query archiveQuery = archiveDBInterface->getQuery();
00349 archiveQuery.reset();
00350 ostringstream strbuf;
00351 strbuf <<"INSERT INTO NetworkModels(StartTime, ArchiveType, Name, NeuronGroups) VALUES ( "<<simulationStartTime<<", "<<archiveType<<", \""<<archiveName<<"\", \""<<mysqlpp::escape<<xmlString<<"\" )";
00352 archiveQuery.exec(strbuf.str());
00353 }
00354 catch (const BadQuery& er) {
00355 ostringstream errorStrStream;
00356 errorStrStream<<"Bad query when archiving neural network: \""<<er.what()<<"\"";
00357 systemError(errorStrStream.str());
00358 return false;
00359 }
00360 catch (const Exception& er) {
00361 ostringstream errorStrStream;
00362 errorStrStream<<"Exception thrown archiving neural network: \""<<er.what()<<"\"";
00363 systemError(errorStrStream.str());
00364 return false;
00365 }
00366 catch(std::exception& er){
00367 ostringstream errorStrStream;
00368 errorStrStream<<"Exception thrown archiving neural network: \""<<er.what()<<"\"";
00369 systemError(errorStrStream.str());
00370 return false;
00371 }
00372
00373
00374 return true;
00375 }
00376
00377
00378
00379
00380 void SpikeStreamArchiver::cleanUpArchiver(){
00381 cout<<"Cleaning up archiver"<<endl;
00382
00383 for(map<unsigned int, TimeStepHolder>::iterator iter = timeStepMap.begin(); iter != timeStepMap.end(); ++iter){
00384
00385 iter->second.xmlString += "</network_pattern>";
00386
00387
00388 storeNetworkData(iter->first, iter->second.xmlString);
00389 }
00390
00391
00392 if(!networkDataStored){
00393 try{
00394 Query archiveQuery = archiveDBInterface->getQuery();
00395 archiveQuery.reset();
00396 archiveQuery<<"DELETE FROM NetworkModels WHERE StartTime = "<<simulationStartTime;
00397 archiveQuery.execute();
00398 }
00399 catch (const BadQuery& er) {
00400 ostringstream errorStrStream;
00401 errorStrStream<<"Bad query when deleting empty network models: \""<<er.what()<<"\"";
00402 systemError(errorStrStream.str());
00403 }
00404 catch (const Exception& er) {
00405 ostringstream errorStrStream;
00406 errorStrStream<<"Exception thrown deleting empty network models: \""<<er.what()<<"\"";
00407 systemError(errorStrStream.str());
00408 }
00409 catch(std::exception& er){
00410 ostringstream errorStrStream;
00411 errorStrStream<<"Exception thrown deleting empty network models: \""<<er.what()<<"\"";
00412 systemError(errorStrStream.str());
00413 }
00414 }
00415
00416
00417 sendMessage(parentTaskID, TASK_EXITED_MSG);
00418 cout<<"SpikeStreamArchiver: Finished storing data, now exiting pvm and invoking destructor"<<endl;
00419
00420
00421 pvm_exit();
00422 }
00423
00424
00425
00426 string SpikeStreamArchiver::getString(unsigned int uInt){
00427 stringstream tempStringStr;
00428 tempStringStr<<uInt;
00429 return tempStringStr.str();
00430 }
00431
00432
00433
00434 bool SpikeStreamArchiver::loadTaskIDs(){
00435 spikeMessageTotal = 0;
00436 try{
00437 Query networkQuery = neuralNetworkDBInterface->getQuery();
00438 networkQuery.reset();
00439 networkQuery<<"SELECT NeuronGrpID, TaskID FROM NeuronGroups";
00440 Result taskResult = networkQuery.store();
00441 for(Result::iterator iter = taskResult.begin(); iter != taskResult.end(); ++iter){
00442 Row taskRow(*iter);
00443 int tempTaskID = Utilities::getInt(taskRow["TaskID"]);
00444 unsigned int tempNeuronGrpID = Utilities::getUInt(taskRow["NeuronGrpID"]);
00445
00446
00447 taskToNeuronGrpMap[tempTaskID] = tempNeuronGrpID;
00448
00449
00450 spikeMessageTotal += tempTaskID;
00451 }
00452
00453
00454
00455 for(map<int, unsigned int>::iterator iter = taskToNeuronGrpMap.begin(); iter != taskToNeuronGrpMap.end(); ++iter){
00456 networkQuery.reset();
00457 networkQuery<<"SELECT MIN(NeuronID) FROM Neurons WHERE NeuronGrpID = "<<iter->second;
00458 Result minNeurIDRes = networkQuery.store();
00459 Row minNeurIDRow(*minNeurIDRes.begin());
00460 startNeurIDTaskMap[iter->first] = Utilities::getUInt((std::string)minNeurIDRow["MIN(NeuronID)"]);
00461 }
00462 }
00463 catch (const BadQuery& er) {
00464 ostringstream errorStrStream;
00465 errorStrStream<<"Bad query when loading task IDs: \""<<er.what()<<"\"";
00466 systemError(errorStrStream.str());
00467 return false;
00468 }
00469 catch (const Exception& er) {
00470 ostringstream errorStrStream;
00471 errorStrStream<<"Exception thrown loading task IDs: \""<<er.what()<<"\"";
00472 systemError(errorStrStream.str());
00473 return false;
00474 }
00475 catch(std::exception& er){
00476 ostringstream errorStrStream;
00477 errorStrStream<<"Exception thrown loading task IDs: \""<<er.what()<<"\"";
00478 systemError(errorStrStream.str());
00479 return false;
00480 }
00481
00482
00483 return true;
00484 }
00485
00486
00487
00488
00489
00490 void SpikeStreamArchiver::processFiringNeuronList(int senderTID){
00491
00492 stringstream firingNeuronStringStr;
00493
00494
00495 int info = pvm_upkuint(&messageTimeStep, 1, 1);
00496 #ifdef PVM_DEBUG
00497 if(info < 0){
00498 cerr<<"SpikeStreamArchiver: ERROR EXTRACTING MESSAGE TIME STEP; TASK ID = "<<pvm_mytid()<<"; messageTimeStep: "<<messageTimeStep<<endl;
00499 return;
00500 }
00501 #endif//PVM_DEBUG
00502
00503
00504 if(timeStepMap.find(messageTimeStep) != timeStepMap.end()){
00505
00506 timeStepMap[messageTimeStep].spikeMessageCount += senderTID;
00507 }
00508
00509
00510 else{
00511
00512 timeStepMap[messageTimeStep].spikeMessageCount = senderTID;
00513
00514
00515 timeStepMap[messageTimeStep].xmlString = "<?xml version=\"1.0\" encoding=\"ISO-8859-1\"?>";
00516 timeStepMap[messageTimeStep].xmlString += "<network_pattern>";
00517 }
00518
00519
00520 info = pvm_upkuint(&numberOfSpikes, 1, 1);
00521 #ifdef PVM_DEBUG
00522 if(info < 0){
00523 cerr<<"SpikeStreamArchiver: ERROR NUMBER OF FIRING NEURONS FROM MESSAGE; TASK ID = "<<pvm_mytid()<<"; numberOfSpikes: "<<numberOfSpikes<<endl;
00524 return;
00525 }
00526 #endif//PVM_DEBUG
00527
00528
00529 info = pvm_upkuint(unpackArray, numberOfSpikes, 1);
00530 #ifdef PVM_DEBUG
00531 if(info < 0){
00532 cerr<<"SpikeStreamArchiver: ERROR UNPACKING FIRING NEURON IDS FROM MESSAGE; TASK ID = "<<pvm_mytid()<<"; numberOfSpikes: "<<numberOfSpikes<<endl;
00533 return;
00534 }
00535 #endif//PVM_DEBUG
00536
00537
00538 for(int i=0; i<numberOfSpikes; ++i){
00539 firingNeuronStringStr<<unpackArray[i];
00540 if(i != numberOfSpikes - 1)
00541 firingNeuronStringStr<<",";
00542 }
00543
00544 #ifdef FIRING_NEURON_DEBUG
00545 cout<<"SpikeStreamArchiver: Number of firing neurons processed: "<<numberOfSpikes<<" from sender: "<<senderTID<<" at timeStep: "<<messageTimeStep<<endl;
00546 #endif//FIRING_NEURON_DEBUG
00547
00548
00549 timeStepMap[messageTimeStep].xmlString += "<neuron_group id=\"";
00550 timeStepMap[messageTimeStep].xmlString += getString(taskToNeuronGrpMap[senderTID]);
00551 timeStepMap[messageTimeStep].xmlString += "\">";
00552 timeStepMap[messageTimeStep].xmlString += firingNeuronStringStr.str();
00553 timeStepMap[messageTimeStep].xmlString += "</neuron_group>";
00554
00555
00556 for(map<unsigned int, TimeStepHolder>::iterator iter = timeStepMap.begin(); iter != timeStepMap.end(); ++iter){
00557 if(iter->second.spikeMessageCount == spikeMessageTotal){
00558
00559
00560 iter->second.xmlString += "</network_pattern>";
00561
00562
00563 if(storeNetworkData(iter->first, iter->second.xmlString)){
00564
00565 timeStepMap.erase(iter);
00566 }
00567
00568
00569 break;
00570 }
00571 }
00572 }
00573
00574
00575
00576
00577
00578 void SpikeStreamArchiver::processSpikeList(int senderTID){
00579
00580 int info = pvm_upkuint(&messageTimeStep, 1, 1);
00581 #ifdef PVM_DEBUG
00582 if(info < 0){
00583 cerr<<"SpikeStreamArchiver: ERROR EXTRACTING MESSAGE TIME STEP; TASK ID = "<<pvm_mytid()<<"; messageTimeStep: "<<messageTimeStep<<endl;
00584 return;
00585 }
00586 #endif//PVM_DEBUG
00587
00588
00589 if(timeStepMap.find(messageTimeStep) != timeStepMap.end()){
00590
00591 timeStepMap[messageTimeStep].spikeMessageCount += senderTID;
00592 }
00593
00594
00595
00596 else{
00597
00598 timeStepMap[messageTimeStep].spikeMessageCount = senderTID;
00599
00600
00601 timeStepMap[messageTimeStep].xmlString = "<?xml version=\"1.0\" encoding=\"ISO-8859-1\"?>";
00602 timeStepMap[messageTimeStep].xmlString += "<network_pattern>";
00603 }
00604
00605
00606 info = pvm_upkuint(&numberOfSpikes, 1, 1);
00607 #ifdef PVM_DEBUG
00608 if(info < 0){
00609 cerr<<"NetworkMonitor: ERROR NUMBER OF FIRING NEURONS FROM MESSAGE; TASK ID = "<<pvm_mytid()<<"; numberOfSpikes: "<<numberOfSpikes<<endl;
00610 return;
00611 }
00612 #endif//PVM_DEBUG
00613
00614
00615 firingNeuronMap.clear();
00616
00617
00618 info = pvm_upkuint(unpackArray, numberOfSpikes, 1);
00619 #ifdef PVM_DEBUG
00620 if(info < 0){
00621 systemError("NetworkMonitor: ERROR UNPACKING UNSIGNED INT FROM MESSAGE. NUMBER OF SPIKES = ", numberOfSpikes);
00622 return;
00623 }
00624 #endif//PVM_DEBUG
00625
00626
00627 for(unsigned int i=0; i<numberOfSpikes; ++i){
00628
00629
00630 unpkFromNeurID = (unsigned short) unpackArray[i];
00631 unpkFromNeurID += startNeurIDTaskMap[senderTID];;
00632
00633
00634 firingNeuronMap [ unpkFromNeurID ] = true;
00635 }
00636
00637
00638
00639 stringstream firingNeuronStringStr;
00640 bool firstTime = true;
00641 for(map<unsigned int, bool>::iterator iter = firingNeuronMap.begin(); iter != firingNeuronMap.end(); ++iter){
00642 if(!firstTime){
00643 firingNeuronStringStr<<",";
00644 }
00645 firingNeuronStringStr<<iter->first;
00646 firstTime = false;
00647 }
00648
00649
00650 timeStepMap[messageTimeStep].xmlString += "<neuron_group id=\"";
00651 timeStepMap[messageTimeStep].xmlString += getString(taskToNeuronGrpMap[senderTID]);
00652 timeStepMap[messageTimeStep].xmlString += "\">";
00653 timeStepMap[messageTimeStep].xmlString += firingNeuronStringStr.str();
00654 timeStepMap[messageTimeStep].xmlString += "</neuron_group>";
00655
00656
00657
00658 for(map<unsigned int, TimeStepHolder>::iterator iter = timeStepMap.begin(); iter != timeStepMap.end(); ++iter){
00659 if(iter->second.spikeMessageCount == spikeMessageTotal){
00660
00661
00662 iter->second.xmlString += "</network_pattern>";
00663
00664
00665 if(storeNetworkData(iter->first, iter->second.xmlString)){
00666
00667 timeStepMap.erase(iter);
00668 }
00669
00670
00671 break;
00672 }
00673 }
00674 }
00675
00676
00677 bool SpikeStreamArchiver::sendMessage(int taskID, int msgtag){
00678
00679 int info = pvm_initsend(PvmDataDefault);
00680 #ifdef PVM_DEBUG
00681 if(info < 0){
00682 cout<<"SpikeStreamArchiver: Init send error: "<<info<<" with task "<<taskID<<" and msgtag: "<<msgtag<<endl;
00683 return false;
00684 }
00685 #endif//PVM_DEBUG
00686
00687
00688 info = pvm_send(taskID, msgtag);
00689 #ifdef PVM_DEBUG
00690 if(info < 0){
00691 cout<<"SpikeStreamArchiver: Send error: "<<info<<" with task "<<taskID<<" and msgtag: "<<msgtag<<endl;
00692 return false;
00693 }
00694 #endif//PVM_DEBUG
00695 return true;
00696 }
00697
00698
00699
00700 bool SpikeStreamArchiver::sendMessage(int taskID, int msgtag, unsigned int msgInteger){
00701
00702 int info = pvm_initsend(PvmDataDefault);
00703 #ifdef PVM_DEBUG
00704 if(info < 0){
00705 cout<<"SpikeStreamArchiver: Init send error: "<<info<<" with task "<<taskID<<" and msgtag: "<<msgtag<<endl;
00706 return false;
00707 }
00708 #endif//PVM_DEBUG
00709
00710
00711 info = pvm_pkuint(&msgInteger, 1, 1);
00712 #ifdef PVM_DEBUG
00713 if(info < 0){
00714 cerr<<"SpikeStreamArchiver: ERROR PACKING INTEGER INTO MESSAGE"<<endl;
00715 return false;
00716 }
00717 #endif//PVM_DEBUG
00718
00719
00720 info = pvm_send(taskID, msgtag);
00721 #ifdef PVM_DEBUG
00722 if(info < 0){
00723 cout<<"SpikeStreamArchiver: Send error: "<<info<<" with task "<<taskID<<" and msgtag: "<<msgtag<<endl;
00724 return false;
00725 }
00726 #endif//PVM_DEBUG
00727 return true;
00728 }
00729
00730
00731
00732 bool SpikeStreamArchiver::sendMessage(int taskID, int msgtag, const char* charArray){
00733
00734 unsigned int arrayLength = strlen(charArray) + 1;
00735
00736
00737 int info = pvm_initsend(PvmDataDefault);
00738 #ifdef PVM_DEBUG
00739 if(info<0){
00740 cerr<<"SpikeStreamArchiver: Init send error: tag: "<<msgtag<<" to: "<<taskID<<" containing: "<<charArray<<endl;
00741 return false;
00742 }
00743 #endif//PVM_DEBUG
00744
00745
00746 info = pvm_pkuint(&arrayLength, 1, 1);
00747 #ifdef PVM_DEBUG
00748 if(info < 0){
00749 cerr<<"SpikeStreamArchiver: ERROR PACKING MESSAGE LENGTH INTO MESSAGE"<<endl;
00750 return false;
00751 }
00752 #endif //PVM_DEBUG
00753
00754
00755 info = pvm_pkstr((char*)charArray);
00756 #ifdef PVM_DEBUG
00757 if(info < 0){
00758 cerr<<"SpikeStreamArchiver: ERROR PACKING CHAR* INTO MESSAGE"<<endl;
00759 return false;
00760 }
00761 #endif //PVM_DEBUG
00762
00763
00764 info = pvm_send(taskID, msgtag);
00765 #ifdef PVM_DEBUG
00766 if(info<0){
00767 cerr<<"NeuronSimulation: Message send error: tag: "<<msgtag<<" to: "<<taskID<<" containing: "<<charArray<<endl;
00768 return false;
00769 }
00770 #endif //PVM_DEBUG
00771 return true;
00772 }
00773
00774
00775
00776 void SpikeStreamArchiver::startArchiving(){
00777 for(map<int, unsigned int>::iterator iter = taskToNeuronGrpMap.begin(); iter != taskToNeuronGrpMap.end(); ++iter){
00778
00779 if(archiveType == FIRING_NEURON_ARCHIVE)
00780 sendMessage(iter->first, REQUEST_FIRING_NEURON_DATA_MSG);
00781 else if (archiveType == SPIKE_ARCHIVE)
00782 sendMessage(iter->first, REQUEST_SPIKE_DATA_MSG);
00783 }
00784 }
00785
00786
00787
00788 void SpikeStreamArchiver::stopArchiving(){
00789 for(map<int, unsigned int>::iterator iter = taskToNeuronGrpMap.begin(); iter != taskToNeuronGrpMap.end(); ++iter){
00790 if(archiveType == FIRING_NEURON_ARCHIVE)
00791 sendMessage(iter->first, CANCEL_FIRING_NEURON_DATA_MSG);
00792 else if (archiveType == SPIKE_ARCHIVE)
00793 sendMessage(iter->first, CANCEL_SPIKE_DATA_MSG);
00794 }
00795 }
00796
00797
00798
00799 bool SpikeStreamArchiver::storeNetworkData(unsigned int timeStep, string xmlString){
00800 try{
00801 Query archiveQuery = archiveDBInterface->getQuery();
00802 archiveQuery.reset();
00803 ostringstream strbuf;
00804 strbuf <<"INSERT INTO NetworkData VALUES ( "<<simulationStartTime<<", "<<timeStep<<", \""<<mysqlpp::escape<<xmlString<<"\" )";
00805
00806 bool queryResult = archiveQuery.exec(strbuf.str());
00807 if(!queryResult){
00808 systemError("SpikeStreamArchiver: UNSUCCESSFUL MYSQLPP QUERY STORING NETWORK DATA");
00809 return false;
00810 }
00811
00812
00813
00814 networkDataStored = true;
00815 }
00816 catch (const BadQuery& er) {
00817 ostringstream errorStrStream;
00818 errorStrStream<<"Bad query when storing network data: \""<<er.what()<<"\"";
00819 systemError(errorStrStream.str());
00820 return false;
00821 }
00822 catch (const Exception& er) {
00823 ostringstream errorStrStream;
00824 errorStrStream<<"Exception thrown storing network data: \""<<er.what()<<"\"";
00825 systemError(errorStrStream.str());
00826 return false;
00827 }
00828 catch(std::exception& er){
00829 ostringstream errorStrStream;
00830 errorStrStream<<"Exception thrown storing network data: \""<<er.what()<<"\"";
00831 systemError(errorStrStream.str());
00832 return false;
00833 }
00834
00835
00836 return true;
00837 }
00838
00839
00840
00841