Please note that active projects have migrated to https://github.com/fawkesrobotics.

robot-memory: fix include circular dependency
[fawkes.git] / src / plugins / robot-memory / robot_memory.cpp
1 /***************************************************************************
2  *  robot_memory.cpp - Class for storing and querying information in the RobotMemory
3  *    
4  *  Created: Aug 23, 2016 1:34:32 PM 2016
5  *  Copyright  2016  Frederik Zwilling
6  *             2017 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8
9 /*  This program is free software; you can redistribute it and/or modify
10  *  it under the terms of the GNU General Public License as published by
11  *  the Free Software Foundation; either version 2 of the License, or
12  *  (at your option) any later version.
13  *
14  *  This program is distributed in the hope that it will be useful,
15  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *  GNU Library General Public License for more details.
18  *
19  *  Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21
22 #include "robot_memory.h"
23
24 #include <core/threading/mutex.h>
25 #include <core/threading/mutex_locker.h>
26 #include <interfaces/RobotMemoryInterface.h>
27 #include <utils/misc/string_conversions.h>
28 #include <utils/misc/string_split.h>
29 #include <utils/system/hostinfo.h>
30
31 #include <string>
32 #include <chrono>
33 #include <thread>
34
35 // from MongoDB
36 #include <mongo/client/dbclient.h>
37
38 using namespace mongo;
39 using namespace fawkes;
40
41 /** @class RobotMemory "robot_memory.h"
42  * Access to the robot memory based on mongodb.
43  * Using this class, you can query/insert/remove/update information in
44  * the robot memory.  Furthermore, you can register trigger to get
45  * notified when something was changed in the robot memory matching
46  * your query and you can access computables, which are on demand
47  * computed information, by registering the computables and then
48  * querying as if the information would already be in the database.
49  * @author Frederik Zwilling
50  */
51
52 /**
53  * Robot Memory Constructor with objects of the thread
54  * @param config Fawkes config
55  * @param logger Fawkes logger
56  * @param clock Fawkes clock
57  * @param mongo_connection_manager MongoDBConnCreator to create client connections to the shared and local db
58  * @param blackboard Fawkes blackboard
59  */
60 RobotMemory::RobotMemory(fawkes::Configuration* config, fawkes::Logger* logger,
61    fawkes::Clock* clock, fawkes::MongoDBConnCreator* mongo_connection_manager,
62    fawkes::BlackBoard* blackboard)
63 {
64   mutex_ = new Mutex();
65   config_ = config;
66   logger_ = logger;
67   clock_ = clock;
68   mongo_connection_manager_ = mongo_connection_manager;
69   blackboard_ = blackboard;
70   debug_ = false;
71 }
72
73 RobotMemory::~RobotMemory()
74 {
75   mongo_connection_manager_->delete_client(mongodb_client_local_);
76   mongo_connection_manager_->delete_client(mongodb_client_distributed_);
77   delete mutex_;
78   delete trigger_manager_;
79   blackboard_->close(rm_if_);
80 }
81
82 void RobotMemory::init()
83 {
84   //load config values
85   log("Started RobotMemory");
86   default_collection_ = "robmem.test";
87   try {
88     default_collection_ = config_->get_string("/plugins/robot-memory/default-collection");
89   } catch (Exception &e) {}
90   try {
91     debug_ = config_->get_bool("/plugins/robot-memory/more-debug-output");
92   } catch (Exception &e) {}
93   database_name_ = "robmem";
94   try {
95     database_name_ = config_->get_string("/plugins/robot-memory/database");
96   } catch (Exception &e) {}
97   distributed_dbs_ = config_->get_strings("/plugins/robot-memory/distributed-db-names");
98   cfg_startup_grace_period_ = 10;
99   try {
100           cfg_startup_grace_period_ = config_->get_uint("/plugins/robot-memory/startup-grace-period");
101   } catch (Exception &e) {} // ignored, use default
102
103   cfg_coord_database_ = config_->get_string("/plugins/robot-memory/coordination/database");
104   cfg_coord_mutex_collection_ = config_->get_string("/plugins/robot-memory/coordination/mutex-collection");
105   cfg_coord_mutex_collection_ = cfg_coord_database_ + "." + cfg_coord_mutex_collection_;
106
107   using namespace std::chrono_literals;
108   
109   //initiate mongodb connections:
110   log("Connect to local mongod");
111   unsigned int startup_tries = 0;
112   for (; startup_tries < cfg_startup_grace_period_ * 2; ++startup_tries) {
113           try {
114                   mongodb_client_local_ = mongo_connection_manager_->create_client("robot-memory-local");
115                   break;
116           } catch (fawkes::Exception &e) {
117                   logger_->log_info(name_, "Waiting");
118                   std::this_thread::sleep_for(500ms);
119           }
120   }
121
122   if (config_->exists("/plugins/mongodb/clients/robot-memory-distributed/enabled") &&
123       config_->get_bool("/plugins/mongodb/clients/robot-memory-distributed/enabled"))
124   {
125           distributed_ = true;
126     log("Connect to distributed mongod");
127     for (; startup_tries < cfg_startup_grace_period_ * 2; ++startup_tries) {
128             try {
129                     mongodb_client_distributed_ = mongo_connection_manager_->create_client("robot-memory-distributed");
130                     break;
131             } catch (fawkes::Exception &e) {
132                     logger_->log_info(name_, "Waiting");
133                     std::this_thread::sleep_for(500ms);
134             }
135     }
136   }
137
138   //init blackboard interface
139   rm_if_ = blackboard_->open_for_writing<RobotMemoryInterface>(config_->get_string("/plugins/robot-memory/interface-name").c_str());
140   rm_if_->set_error("");
141   rm_if_->set_result("");
142   rm_if_->write();
143
144   //Setup event trigger and computables manager
145   trigger_manager_ = new EventTriggerManager(logger_, config_, mongo_connection_manager_);
146   computables_manager_ = new ComputablesManager(logger_, config_, this, clock_);
147
148   log_deb("Initialized RobotMemory");
149 }
150
151 void RobotMemory::loop()
152 {
153   trigger_manager_->check_events();
154   computables_manager_->cleanup_computed_docs();
155 }
156
157 /**
158  * Query information from the robot memory.
159  * @param query The query returned documents have to match (essentially a BSONObj)
160  * @param collection The database and collection to query as string (e.g. robmem.worldmodel)
161  * @return Cursor to get the documents from, NULL for invalid query
162  */
163 QResCursor RobotMemory::query(Query query, std::string collection)
164 {
165   check_collection_name(collection);
166   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
167   log_deb(std::string("Executing Query "+ query.toString() +" on collection "+collection));
168
169   //check if computation on demand is necessary and execute Computables
170   computables_manager_->check_and_compute(query, collection);
171
172   //lock (mongo_client not thread safe)
173   MutexLocker lock(mutex_);
174
175   //set read preference of query to nearest to read from the local replica set member first
176   query.readPref(ReadPreference_Nearest, BSONArray());
177
178   //actually execute query
179   QResCursor cursor;
180   try{
181     cursor = mongodb_client->query(collection, query);
182   } catch (DBException &e) {
183     std::string error = std::string("Error for query ")
184       + query.toString() + "\n Exception: " + e.toString();
185     log(error, "error");
186     return NULL;
187   }
188   return cursor;
189 }
190
191 /**
192  * Aggregation call on the robot memory.
193  * @param pipeline Series of commands defining the aggregation
194  * @param collection The database and collection to query as string (e.g. robmem.worldmodel)
195  * @return Result object
196  */
197 mongo::BSONObj
198 RobotMemory::aggregate(std::vector<mongo::BSONObj> pipeline, std::string collection)
199 {
200   check_collection_name(collection);
201   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
202   log_deb(std::string("Executing Aggregation on collection "+collection));
203
204   //TODO: check if computation on demand is necessary and execute Computables
205   // that might be complicated because you need to build a query to check against from the fields mentioned in the different parts of the pipeline
206   // A possible solution might be forcing the user to define the $match oject seperately and using it as query to check computables
207
208   //lock (mongo_client not thread safe)
209   MutexLocker lock(mutex_);
210
211   //actually execute aggregation as command (in more modern mongo-cxx versions there should be an easier way with a proper aggregate function)
212   BSONObj res;
213   //get db and collection name
214   size_t point_pos = collection.find(".");
215   if(point_pos == collection.npos)
216   {
217     logger_->log_error(name_, "Collection %s needs to start with 'dbname.'", collection.c_str());
218     return fromjson("{}");
219   }
220   std::string db = collection.substr(0, point_pos);
221   std::string col = collection.substr(point_pos+1);
222   try{
223     mongodb_client->runCommand(db, BSON("aggregate" << col  << "pipeline" << pipeline), res);
224   } catch (DBException &e) {
225     std::string error = std::string("Error for aggregation ")
226       + "\n Exception: " + e.toString();
227     log(error, "error");
228     return fromjson("{}");
229   }
230   return res;
231 }
232
233 /**
234  * Inserts a document into the robot memory
235  * @param obj The document as BSONObj
236  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
237  * @return 1: Success 0: Error
238  */
239 int RobotMemory::insert(mongo::BSONObj obj, std::string collection)
240 {
241   check_collection_name(collection);
242   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
243
244   log_deb(std::string("Inserting "+ obj.toString() + " into collection " + collection));
245
246   //lock (mongo_client not thread safe)
247   MutexLocker lock(mutex_);
248
249   //actually execute insert
250   try{
251     mongodb_client->insert(collection, obj);
252   } catch (DBException &e) {
253     std::string error = "Error for insert " + obj.toString()
254         + "\n Exception: " + e.toString();
255     log_deb(error, "error");
256     return 0;
257   }
258   //return success
259   return 1;
260 }
261
262 /** Create an index on a collection.
263  * @param obj The keys document as BSONObj
264  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
265  * @param unique true to create unique index
266  * @return 1: Success 0: Error
267  */
268 int
269 RobotMemory::create_index(mongo::BSONObj obj, std::string collection, bool unique)
270 {
271   check_collection_name(collection);
272   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
273
274   log_deb(std::string("Creating index "+ obj.toString() + " on collection " + collection));
275
276   //lock (mongo_client not thread safe)
277   MutexLocker lock(mutex_);
278
279   //actually execute insert
280   try{
281           mongodb_client->createIndex(collection, mongo::IndexSpec().addKeys(obj).unique(unique));
282   } catch (DBException &e) {
283           std::string error = "Error when creating index " + obj.toString()
284                   + "\n Exception: " + e.toString();
285           log_deb(error, "error");
286           return 0;
287   }
288   //return success
289   return 1;
290 }
291
292
293 /**
294  * Inserts all document of a vector into the robot memory
295  * @param v_obj The vector of BSONObj document
296  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
297  * @return 1: Success 0: Error
298  */
299 int RobotMemory::insert(std::vector<mongo::BSONObj> v_obj, std::string collection)
300 {
301   check_collection_name(collection);
302   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
303
304   std::string insert_string = "[";
305   for(BSONObj obj : v_obj)
306   {
307     insert_string += obj.toString() + ",\n";
308   }
309   insert_string += "]";
310
311   log_deb(std::string("Inserting vector of documents " + insert_string+  " into collection " + collection));
312
313   //lock (mongo_client not thread safe)
314   MutexLocker lock(mutex_);
315
316   //actually execute insert
317   try{
318     mongodb_client->insert(collection, v_obj);
319   } catch (DBException &e) {
320     std::string error = "Error for insert " + insert_string
321         + "\n Exception: " + e.toString();
322     log_deb(error, "error");
323     return 0;
324   }
325   //return success
326   return 1;
327 }
328
329 /**
330  * Inserts a document into the robot memory
331  * @param obj_str The document as json string
332  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
333  * @return 1: Success 0: Error
334  */
335 int RobotMemory::insert(std::string obj_str, std::string collection)
336 {
337   return insert(fromjson(obj_str), collection);
338 }
339
340 /**
341  * Updates documents in the robot memory
342  * @param query The query defining which documents to update
343  * @param update What to change in these documents
344  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
345  * @param upsert Should the update document be inserted if the query returns no documents?
346  * @return 1: Success 0: Error
347  */
348 int RobotMemory::update(mongo::Query query, mongo::BSONObj update, std::string collection, bool upsert)
349 {
350   check_collection_name(collection);
351   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
352   log_deb(std::string("Executing Update "+update.toString()+" for query "+query.toString()+" on collection "+ collection));
353
354   //lock (mongo_client not thread safe)
355   MutexLocker lock(mutex_);
356
357   //actually execute update
358   try{
359     mongodb_client->update(collection, query, update, upsert);
360   } catch (DBException &e) {
361     log_deb(std::string("Error for update "+update.toString()+" for query "+query.toString()+"\n Exception: "+e.toString()), "error");
362     return 0;
363   }
364   //return success
365   return 1;
366 }
367
368 /**
369  * Updates documents in the robot memory
370  * @param query The query defining which documents to update
371  * @param update_str What to change in these documents as json string
372  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
373  * @param upsert Should the update document be inserted if the query returns no documents?
374  * @return 1: Success 0: Error
375  */
376 int RobotMemory::update(mongo::Query query, std::string update_str, std::string collection, bool upsert)
377 {
378   return update(query, fromjson(update_str), collection, upsert);
379 }
380
381
382 /** Atomically update and retrieve document.
383  * @param filter The filter defining the document to update.
384  * If multiple match takes the first one.
385  * @param update Update statement. May only contain update operators!
386  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
387  * @param upsert Should the update document be inserted if the query returns no documents?
388  * @param return_new return the document before (false) or after (true) the update has been applied?
389  * @return document, depending on @p return_new either before or after the udpate has been applied.
390  */
391 mongo::BSONObj
392 RobotMemory::find_one_and_update(const mongo::BSONObj& filter, const mongo::BSONObj& update,
393                                  std::string collection, bool upsert, bool return_new)
394 {
395   check_collection_name(collection);
396   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
397
398   log_deb(std::string("Executing findOneAndUpdate "+update.toString()+
399                       " for filter "+filter.toString()+" on collection "+ collection));
400
401   MutexLocker lock(mutex_);
402
403   try{
404           return mongodb_client->findAndModify(collection, filter, update, upsert, return_new);
405   } catch (DBException &e) {
406           std::string error = "Error for update "+update.toString()+" for query "+
407                   filter.toString()+"\n Exception: "+e.toString();
408           log_deb(error, "error");
409     BSONObjBuilder b;
410     b.append("error", error);
411     return b.obj();
412   }
413 }
414
415 /**
416  * Remove documents from the robot memory
417  * @param query Which documents to remove
418  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
419  * @return 1: Success 0: Error
420  */
421 int RobotMemory::remove(mongo::Query query, std::string collection)
422 {
423   check_collection_name(collection);
424   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
425   log_deb(std::string("Executing Remove "+query.toString()+" on collection "+collection));
426
427   //lock (mongo_client not thread safe)
428   MutexLocker lock(mutex_);
429
430   //actually execute remove
431   try{
432     mongodb_client->remove(collection, query);
433   } catch (DBException &e) {
434     log_deb(std::string("Error for query "+query.toString()+"\n Exception: "+e.toString()), "error");
435     return 0;
436   }
437   //return success
438   return 1;
439 }
440
441 /**
442  * Performs a MapReduce operation on the robot memory (https://docs.mongodb.com/manual/core/map-reduce/)
443  * @param query Which documents to use for the map step
444  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
445  * @param js_map_fun Map function in JavaScript as string
446  * @param js_reduce_fun Reduce function in JavaScript as string
447  * @return BSONObj containing the result
448  */
449 mongo::BSONObj RobotMemory::mapreduce(mongo::Query query, std::string collection, std::string js_map_fun, std::string js_reduce_fun)
450 {
451   check_collection_name(collection);
452   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
453   MutexLocker lock(mutex_);
454   log_deb(std::string("Executing MapReduce "+query.toString()+" on collection "+collection+
455     " map: " + js_map_fun + " reduce: " + js_reduce_fun));
456   return mongodb_client->mapreduce(collection, js_map_fun, js_reduce_fun, query);
457 }
458
459 /**
460  * Performs an aggregation operation on the robot memory (https://docs.mongodb.com/v3.2/reference/method/db.collection.aggregate/)
461  * @param pipeline A sequence of data aggregation operations or stages. See the https://docs.mongodb.com/v3.2/reference/operator/aggregation-pipeline/ for details
462  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
463  * @return Cursor to get the documents from, NULL for invalid pipeline
464  */
465 QResCursor RobotMemory::aggregate(mongo::BSONObj pipeline, std::string collection)
466 {
467   check_collection_name(collection);
468   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
469   MutexLocker lock(mutex_);
470   log_deb(std::string("Executing Aggregation pipeline: "+pipeline.toString() +" on collection "+collection));
471
472   QResCursor cursor;
473   try{
474     cursor = mongodb_client->aggregate(collection, pipeline);
475   } catch (DBException &e) {
476     std::string error = std::string("Error for query ")
477       + pipeline.toString() + "\n Exception: " + e.toString();
478     log(error, "error");
479     return NULL;
480   }
481   return cursor;
482 }
483
484 /**
485  * Drop (= remove) a whole collection and all documents inside it
486  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
487  * @return 1: Success 0: Error
488  */
489 int RobotMemory::drop_collection(std::string collection)
490 {
491   check_collection_name(collection);
492   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
493   MutexLocker lock(mutex_);
494   log_deb("Dropping collection " + collection);
495   return mongodb_client->dropCollection(collection);
496 }
497
498 /**
499  * Remove the whole database of the robot memory and all documents inside
500  * @return 1: Success 0: Error
501  */
502 int RobotMemory::clear_memory()
503 {
504   //lock (mongo_client not thread safe)
505   MutexLocker lock(mutex_);
506
507   log_deb("Clearing whole robot memory");
508   mongodb_client_local_->dropDatabase(database_name_);
509   return 1;
510 }
511
512 /**
513  * Restore a previously dumped collection from a directory
514  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
515  * @param directory Directory of the dump
516  * @return 1: Success 0: Error
517  */
518 int RobotMemory::restore_collection(std::string collection, std::string directory)
519 {
520   check_collection_name(collection);
521   drop_collection(collection);
522
523   //lock (mongo_client not thread safe)
524    MutexLocker lock(mutex_);
525
526   //resolve path to restore
527   if(collection.find(".") == std::string::npos)
528   {
529     log(std::string("Unable to restore collection" + collection), "error");
530     log(std::string("Specify collection like 'db.collection'"), "error");
531     return 0;
532   }
533   std::string path = StringConversions::resolve_path(directory) + "/"
534       + collection.replace(collection.find("."),1,"/") + ".bson";
535   log_deb(std::string("Restore collection " + collection + " from " + path), "warn");
536
537   //call mongorestore from folder with initial restores
538   std::string command = "/usr/bin/mongorestore --dir " + path
539       + " --host=127.0.0.1 --quiet";
540   log_deb(std::string("Restore command: " + command), "warn");
541   FILE *bash_output = popen(command.c_str(), "r");
542
543   //check if output is ok
544   if(!bash_output)
545   {
546     log(std::string("Unable to restore collection" + collection), "error");
547     return 0;
548   }
549   std::string output_string = "";
550   char buffer[100];
551   while (!feof(bash_output) )
552   {
553     if (fgets(buffer, 100, bash_output) == NULL)
554     {
555       break;
556     }
557     output_string += buffer;
558   }
559   pclose(bash_output);
560   if(output_string.find("Failed") != std::string::npos)
561   {
562     log(std::string("Unable to restore collection" + collection), "error");
563     log_deb(output_string, "error");
564     return 0;
565   }
566   return 1;
567 }
568
569 /**
570  * Dump (= save) a collection to the filesystem to restore it later
571  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
572  * @param directory Directory to dump the collection to
573  * @return 1: Success 0: Error
574  */
575 int RobotMemory::dump_collection(std::string collection, std::string directory)
576 {
577   check_collection_name(collection);
578
579   //lock (mongo_client not thread safe)
580    MutexLocker lock(mutex_);
581
582   //resolve path to dump to
583   if(collection.find(".") == std::string::npos)
584   {
585     log(std::string("Unable to dump collection" + collection), "error");
586     log(std::string("Specify collection like 'db.collection'"), "error");
587     return 0;
588   }
589   std::string path = StringConversions::resolve_path(directory);
590   log_deb(std::string("Dump collection " + collection + " into " + path), "warn");
591
592   //call mongorestore from folder with initial restores
593   std::vector<std::string> split = str_split(collection, '.');
594   std::string command = "/usr/bin/mongodump --out=" + path + " --db=" + split[0]
595     + " --collection=" + split[1] + " --host=127.0.0.1 --quiet";
596   log_deb(std::string("Dump command: " + command), "warn");
597   FILE *bash_output = popen(command.c_str(), "r");
598   //check if output is ok
599   if(!bash_output)
600   {
601     log(std::string("Unable to dump collection" + collection), "error");
602     return 0;
603   }
604   std::string output_string = "";
605   char buffer[100];
606   while (!feof(bash_output) )
607   {
608     if (fgets(buffer, 100, bash_output) == NULL)
609     {
610       break;
611     }
612     output_string += buffer;
613   }
614   pclose(bash_output);
615   if(output_string.find("Failed") != std::string::npos)
616   {
617     log(std::string("Unable to dump collection" + collection), "error");
618     log_deb(output_string, "error");
619     return 0;
620   }
621   return 1;
622 }
623
624 void
625 RobotMemory::log(std::string what, std::string info)
626 {
627   if(!info.compare("error"))
628       logger_->log_error(name_, "%s", what.c_str());
629   else if(!info.compare("warn"))
630     logger_->log_warn(name_, "%s", what.c_str());
631   else if(!info.compare("debug"))
632     logger_->log_debug(name_, "%s", what.c_str());
633   else
634     logger_->log_info(name_, "%s", what.c_str());
635 }
636
637 void
638 RobotMemory::log_deb(std::string what, std::string level)
639 {
640   if(debug_)
641     log(what, level);
642 }
643
644 void
645 RobotMemory::log_deb(mongo::Query query, std::string what, std::string level)
646 {
647   if(debug_)
648     log(query, what, level);
649 }
650
651 void
652 RobotMemory::log(mongo::Query query, std::string what, std::string level)
653 {
654   std::string output = what
655     + "\nFilter: " + query.getFilter().toString()
656     + "\nModifiers: " + query.getModifiers().toString()
657     + "\nSort: " + query.getSort().toString()
658     + "\nHint: " + query.getHint().toString()
659     + "\nReadPref: " + query.getReadPref().toString();
660   log(output, level);
661 }
662
663 void
664 RobotMemory::log_deb(mongo::BSONObj obj, std::string what, std::string level)
665 {
666   log(obj, what, level);
667 }
668
669 void
670 RobotMemory::log(mongo::BSONObj obj, std::string what, std::string level)
671 {
672   std::string output = what
673     + "\nObject: " + obj.toString();
674   log(output, level);
675 }
676
677 void
678 RobotMemory::set_fields(mongo::BSONObj &obj, std::string what)
679 {
680   BSONObjBuilder b;
681   b.appendElements(obj);
682   b.appendElements(fromjson(what));
683   //override
684   obj = b.obj();
685 }
686
687 void
688 RobotMemory::set_fields(mongo::Query &q, std::string what)
689 {
690   BSONObjBuilder b;
691   b.appendElements(q.getFilter());
692   b.appendElements(fromjson(what));
693
694   //the following is not yet kept in the query:
695   // + "\nFilter: " + query.getFilter().toString()
696   // + "\nModifiers: " + query.getModifiers().toString()
697   // + "\nSort: " + query.getSort().toString()
698   // + "\nHint: " + query.getHint().toString()
699   // + "\nReadPref: " + query.getReadPref().toString();
700
701   //override
702   q = Query(b.obj());
703 }
704
705 void
706 RobotMemory::remove_field(mongo::Query &q, std::string what)
707 {
708   BSONObjBuilder b;
709   b.appendElements(q.getFilter().removeField(what));
710
711   //the following is not yet kept in the query:
712   // + "\nFilter: " + query.getFilter().toString()
713   // + "\nModifiers: " + query.getModifiers().toString()
714   // + "\nSort: " + query.getSort().toString()
715   // + "\nHint: " + query.getHint().toString()
716   // + "\nReadPref: " + query.getReadPref().toString();
717
718   //override
719   q = Query(b.obj());
720 }
721
722 /**
723  * Check if collection name is valid and correct it if necessary
724  */
725 void
726 RobotMemory::check_collection_name(std::string &collection)
727 {
728   if(collection == "")
729   {
730       collection = default_collection_;
731   }
732   if(database_name_ != "robmem" && collection.find("robmem.") == 0)
733   {
734     //change used database name (e.g. for the case of multiple simulated dababases)
735     collection.replace(0, 6, database_name_);
736   }
737 }
738
739 /**
740  * Get the mongodb client associated with the collection (eighter the local or distributed one)
741  */
742 mongo::DBClientBase*
743 RobotMemory::get_mongodb_client(std::string &collection)
744 {
745   if(!distributed_)
746   {
747       return mongodb_client_local_;
748   }
749   //get db name of collection
750   size_t point_pos = collection.find(".");
751   if(point_pos == collection.npos)
752   {
753     logger_->log_error(name_, "Collection %s needs to start with 'dbname.'", collection.c_str());
754     return mongodb_client_local_;
755   }
756   std::string db = collection.substr(0, point_pos);
757   if(std::find(distributed_dbs_.begin(), distributed_dbs_.end(), db) != distributed_dbs_.end())
758   {
759     return mongodb_client_distributed_;
760   }
761   return mongodb_client_local_;
762 }
763
764 /**
765  * Remove a previously registered trigger
766  * @param trigger Pointer to the trigger to remove
767  */
768 void RobotMemory::remove_trigger(EventTrigger* trigger)
769 {
770   trigger_manager_->remove_trigger(trigger);
771 }
772
773 /**
774  * Remove previously registered computable
775  * @param computable The computable to remove
776  */
777 void RobotMemory::remove_computable(Computable* computable)
778 {
779   computables_manager_->remove_computable(computable);
780 }
781
782 /** Explicitly create a mutex.
783  * This is an optional step, a mutex is also created automatically when trying
784  * to acquire the lock for the first time. Adding it explicitly may increase
785  * visibility, e.g., in the database. Use it for mutexes which are locked
786  * only very infrequently.
787  * @param name mutex name
788  * @return true if operation was successful, false on failure
789  */
790 bool
791 RobotMemory::mutex_create(const std::string& name)
792 {
793         mongo::DBClientInterface *client =
794                 distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
795         mongo::BSONObjBuilder insert_doc;
796         insert_doc.append("$currentDate", BSON("lock-time" << true));
797         insert_doc.append("_id", name);
798         insert_doc.append("locked", false);
799         try {
800                 client->insert(cfg_coord_mutex_collection_, insert_doc.obj(),
801                                0, &mongo::WriteConcern::majority);
802                 return true;
803         } catch (mongo::DBException &e) {
804                 logger_->log_info(name_, "Failed to create mutex %s: %s", name.c_str(), e.what());
805                 return false;
806         }
807 }
808
809 /** Destroy a mutex.
810  * The mutex is erased from the database. This is done disregarding it's current
811  * lock state.
812  * @param name mutex name
813  * @return true if operation was successful, false on failure
814  */
815 bool
816 RobotMemory::mutex_destroy(const std::string& name)
817 {
818         mongo::DBClientInterface *client =
819                 distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
820         mongo::BSONObj destroy_doc{BSON("_id" << name)};
821         try {
822                 client->remove(cfg_coord_mutex_collection_, destroy_doc,
823                                true, &mongo::WriteConcern::majority);
824                 return true;
825         } catch (mongo::DBException &e) {
826                 logger_->log_info(name_, "Failed to destroy mutex %s: %s", name.c_str(), e.what());
827                 return false;
828         }
829 }
830
831 /** Try to acquire a lock for a mutex.
832  * This will access the database and atomically find and update (or
833  * insert) a mutex lock. If the mutex has not been created it is added
834  * automatically. If the lock cannot be acquired the function also
835  * returns immediately. There is no blocked waiting for the lock.
836  * @param name mutex name
837  * @param identity string to set as lock-holder
838  * @param force true to force acquisition of the lock, i.e., even if
839  * the lock has already been acquired take ownership (steal the lock).
840  * @return true if operation was successful, false on failure
841  */
842 bool
843 RobotMemory::mutex_try_lock(const std::string& name,
844                             std::string identity, bool force)
845 {
846         mongo::DBClientBase *client =
847                 distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
848
849         if (identity.empty()) {
850                 HostInfo host_info;
851                 identity = host_info.name();
852         }
853
854         // here we can add an $or to implement lock timeouts
855         mongo::BSONObjBuilder filter_doc;
856         filter_doc.append("_id", name);
857         if (! force) {
858                 filter_doc.append("locked", false);
859         }
860
861         mongo::BSONObjBuilder update_doc;
862         update_doc.append("$currentDate", BSON("lock-time" << true));
863         mongo::BSONObjBuilder update_set;
864         update_set.append("locked", true);
865         update_set.append("locked-by", identity);
866         update_doc.append("$set", update_set.obj());
867
868         try {
869                 BSONObj new_doc =
870                         client->findAndModify(cfg_coord_mutex_collection_,
871                                               filter_doc.obj(), update_doc.obj(),
872                                               /* upsert */ true, /* return new */ true,
873                                               /* sort */ BSONObj(), /* fields */ BSONObj(),
874                                               &mongo::WriteConcern::majority);
875
876                 return (new_doc.getField("locked-by").String() == identity &&
877                         new_doc.getField("locked").Bool());
878
879         } catch (mongo::OperationException &e) {
880                 //if (e.obj()["code"].numberInt() != 11000) {
881                 // 11000: Duplicate key exception, occurs if we do not become leader, all fine
882                 return false;
883         }
884 }
885
886 /** Try to acquire a lock for a mutex.
887  * This will access the database and atomically find and update (or
888  * insert) a mutex lock. If the mutex has not been created it is added
889  * automatically. If the lock cannot be acquired the function also
890  * returns immediately. There is no blocked waiting for the lock.
891  * @param name mutex name
892  * @param force true to force acquisition of the lock, i.e., even if
893  * the lock has already been acquired take ownership (steal the lock).
894  * @return true if operation was successful, false on failure
895  */
896 bool
897 RobotMemory::mutex_try_lock(const std::string& name, bool force)
898 {
899         return mutex_try_lock(name, "", force);
900 }
901
902 /** Release lock on mutex.
903  * @param name mutex name
904  * @param identity string to set as lock-holder
905  * @return true if operation was successful, false on failure
906  */
907 bool
908 RobotMemory::mutex_unlock(const std::string& name,
909                           std::string identity)
910 {
911         mongo::DBClientBase *client =
912                 distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
913
914         if (identity.empty()) {
915                 HostInfo host_info;
916                 identity = host_info.name();
917         }
918
919         // here we can add an $or to implement lock timeouts
920         mongo::BSONObj filter_doc{BSON("_id" << name << "locked-by" << identity)};
921
922         mongo::BSONObj update_doc{BSON("$set" << BSON("locked" << false) <<
923                                        "$unset" << BSON("locked-by" << true <<
924                                                         "lock-time" << true))};
925
926         try {
927                 BSONObj new_doc =
928                         client->findAndModify(cfg_coord_mutex_collection_,
929                                               filter_doc, update_doc,
930                                               /* upsert */ true, /* return new */ true,
931                                               /* sort */ BSONObj(), /* fields */ BSONObj(),
932                                               &mongo::WriteConcern::majority);
933
934                 return true;
935         } catch (mongo::OperationException &e) {
936                 return false;
937         }
938 }
939
940
941 /** Renew a mutex.
942  * Renewing means updating the lock timestamp to the current time to
943  * avoid expiration. Note that the lock must currently be held by
944  * the given identity.
945  * @param name mutex name
946  * @param identity string to set as lock-holder (defaults to hostname
947  * if empty)
948  * @return true if operation was successful, false on failure
949  */
950 bool
951 RobotMemory::mutex_renew_lock(const std::string& name,
952                               std::string identity)
953 {
954         mongo::DBClientBase *client =
955                 distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
956
957         if (identity.empty()) {
958                 HostInfo host_info;
959                 identity = host_info.name();
960         }
961
962         // here we can add an $or to implement lock timeouts
963         mongo::BSONObj filter_doc{BSON("_id" << name <<
964                                        "locked" << true <<
965                                        "locked-by" << identity)};
966
967         // we set all data, even the data which is not actually modified, to
968         // make it easier to process the update in triggers.
969         mongo::BSONObjBuilder update_doc;
970         update_doc.append("$currentDate", BSON("lock-time" << true));
971         mongo::BSONObjBuilder update_set;
972         update_set.append("locked", true);
973         update_set.append("locked-by", identity);
974         update_doc.append("$set", update_set.obj());
975
976         try {
977                 BSONObj new_doc =
978                         client->findAndModify(cfg_coord_mutex_collection_,
979                                               filter_doc, update_doc.obj(),
980                                               /* upsert */ false, /* return new */ true,
981                                               /* sort */ BSONObj(), /* fields */ BSONObj(),
982                                               &mongo::WriteConcern::majority);
983
984                 return true;
985         } catch (mongo::OperationException &e) {
986                 logger_->log_warn(name_, "Renewing lock on mutex %s failed: %s",
987                                   name.c_str(), e.what());
988                 return false;
989         }
990 }
991
992
993 /** Setup time-to-live index for mutexes.
994  * Setting up a time-to-live index for mutexes enables automatic
995  * expiration through the database. Note, however, that the documents
996  * are expired only every 60 seconds. This has two consequences:
997  * - max_age_sec lower than 60 seconds cannot be achieved
998  * - locks may be held for up to just below 60 seconds longer than
999  *   configured, i.e., if the mutex had not yet expired when the
1000  *   background tasks runs.
1001  * @param max_age_sec maximum age of locks in seconds
1002  * @return true if operation was successful, false on failure
1003  */
1004 bool
1005 RobotMemory::mutex_setup_ttl(float max_age_sec)
1006 {
1007   MutexLocker lock(mutex_);
1008
1009         mongo::DBClientBase *client =
1010                 distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1011
1012         BSONObj keys = BSON("lock-time" << true);
1013
1014         try {
1015                 client->createIndex(cfg_coord_mutex_collection_, mongo::IndexSpec()
1016                                     .addKeys(keys)
1017                                     .expireAfterSeconds(max_age_sec));
1018   } catch (DBException &e) {
1019                 logger_->log_warn(name_, "Creating TTL index failed: %s", e.what());
1020           return false;
1021   }
1022   return true;
1023 }
1024
1025 /** Expire old locks on mutexes.
1026  * This will update the database and set all mutexes to unlocked for
1027  * which the lock-time is older than the given maximum age.
1028  * @param max_age_sec maximum age of locks in seconds
1029  * @return true if operation was successful, false on failure
1030  */
1031 bool
1032 RobotMemory::mutex_expire_locks(float max_age_sec)
1033 {
1034         mongo::DBClientBase *client =
1035                 distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1036
1037         using std::chrono::milliseconds;
1038         using std::chrono::high_resolution_clock;
1039         using std::chrono::time_point;
1040         using std::chrono::time_point_cast;
1041
1042         auto max_age_ms = milliseconds(static_cast<unsigned long int>(std::floor(max_age_sec*1000)));
1043         time_point<high_resolution_clock, milliseconds> expire_before =
1044                 time_point_cast<milliseconds>(high_resolution_clock::now()) - max_age_ms;
1045         mongo::Date_t   expire_before_mdb(expire_before.time_since_epoch().count());
1046
1047         // here we can add an $or to implement lock timeouts
1048         mongo::BSONObj filter_doc{BSON("locked" << true <<
1049                                        "lock-time" << mongo::LT << expire_before_mdb)};
1050
1051         try {
1052                 client->remove(cfg_coord_mutex_collection_, filter_doc,
1053                                true, &mongo::WriteConcern::majority);
1054
1055                 return true;
1056         } catch (mongo::OperationException &e) {
1057                 return false;
1058         }
1059 }