Please note that active projects have migrated to https://github.com/fawkesrobotics.

d3616855f665c4b811de4f32954754732e61e699
[fawkes.git] / src / plugins / robot-memory / robot_memory.cpp
1 /***************************************************************************
2  *  robot_memory.cpp - Class for storing and querying information in the RobotMemory
3  *    
4  *  Created: Aug 23, 2016 1:34:32 PM 2016
5  *  Copyright  2016  Frederik Zwilling
6  *             2017 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8
9 /*  This program is free software; you can redistribute it and/or modify
10  *  it under the terms of the GNU General Public License as published by
11  *  the Free Software Foundation; either version 2 of the License, or
12  *  (at your option) any later version.
13  *
14  *  This program is distributed in the hope that it will be useful,
15  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *  GNU Library General Public License for more details.
18  *
19  *  Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21
22 #include "robot_memory.h"
23
24 #include <core/threading/mutex.h>
25 #include <core/threading/mutex_locker.h>
26 #include <utils/misc/string_conversions.h>
27 #include <utils/misc/string_split.h>
28 #include <utils/system/hostinfo.h>
29
30 #include <string>
31 #include <chrono>
32 #include <thread>
33
34 // from MongoDB
35 #include <mongo/client/dbclient.h>
36
37 using namespace mongo;
38 using namespace fawkes;
39
40 /** @class RobotMemory "robot_memory.h"
41  * Access to the robot memory based on mongodb.
42  * Using this class, you can query/insert/remove/update information in
43  * the robot memory.  Furthermore, you can register trigger to get
44  * notified when something was changed in the robot memory matching
45  * your query and you can access computables, which are on demand
46  * computed information, by registering the computables and then
47  * querying as if the information would already be in the database.
48  * @author Frederik Zwilling
49  */
50
51 /**
52  * Robot Memory Constructor with objects of the thread
53  * @param config Fawkes config
54  * @param logger Fawkes logger
55  * @param clock Fawkes clock
56  * @param mongo_connection_manager MongoDBConnCreator to create client connections to the shared and local db
57  * @param blackboard Fawkes blackboard
58  */
59 RobotMemory::RobotMemory(fawkes::Configuration* config, fawkes::Logger* logger,
60    fawkes::Clock* clock, fawkes::MongoDBConnCreator* mongo_connection_manager,
61    fawkes::BlackBoard* blackboard)
62 {
63   mutex_ = new Mutex();
64   config_ = config;
65   logger_ = logger;
66   clock_ = clock;
67   mongo_connection_manager_ = mongo_connection_manager;
68   blackboard_ = blackboard;
69   debug_ = false;
70 }
71
72 RobotMemory::~RobotMemory()
73 {
74   mongo_connection_manager_->delete_client(mongodb_client_local_);
75   mongo_connection_manager_->delete_client(mongodb_client_distributed_);
76   delete mutex_;
77   delete trigger_manager_;
78   blackboard_->close(rm_if_);
79 }
80
81 void RobotMemory::init()
82 {
83   //load config values
84   log("Started RobotMemory");
85   default_collection_ = "robmem.test";
86   try {
87     default_collection_ = config_->get_string("/plugins/robot-memory/default-collection");
88   } catch (Exception &e) {}
89   try {
90     debug_ = config_->get_bool("/plugins/robot-memory/more-debug-output");
91   } catch (Exception &e) {}
92   database_name_ = "robmem";
93   try {
94     database_name_ = config_->get_string("/plugins/robot-memory/database");
95   } catch (Exception &e) {}
96   distributed_dbs_ = config_->get_strings("/plugins/robot-memory/distributed-db-names");
97   cfg_startup_grace_period_ = 10;
98   try {
99           cfg_startup_grace_period_ = config_->get_uint("/plugins/robot-memory/startup-grace-period");
100   } catch (Exception &e) {} // ignored, use default
101
102   cfg_coord_database_ = config_->get_string("/plugins/robot-memory/coordination/database");
103   cfg_coord_mutex_collection_ = config_->get_string("/plugins/robot-memory/coordination/mutex-collection");
104   cfg_coord_mutex_collection_ = cfg_coord_database_ + "." + cfg_coord_mutex_collection_;
105
106   using namespace std::chrono_literals;
107   
108   //initiate mongodb connections:
109   log("Connect to local mongod");
110   unsigned int startup_tries = 0;
111   for (; startup_tries < cfg_startup_grace_period_ * 2; ++startup_tries) {
112           try {
113                   mongodb_client_local_ = mongo_connection_manager_->create_client("robot-memory-local");
114                   break;
115           } catch (fawkes::Exception &e) {
116                   logger_->log_info(name_, "Waiting");
117                   std::this_thread::sleep_for(500ms);
118           }
119   }
120
121   if (config_->exists("/plugins/mongodb/clients/robot-memory-distributed/enabled") &&
122       config_->get_bool("/plugins/mongodb/clients/robot-memory-distributed/enabled"))
123   {
124           distributed_ = true;
125     log("Connect to distributed mongod");
126     for (; startup_tries < cfg_startup_grace_period_ * 2; ++startup_tries) {
127             try {
128                     mongodb_client_distributed_ = mongo_connection_manager_->create_client("robot-memory-distributed");
129                     break;
130             } catch (fawkes::Exception &e) {
131                     logger_->log_info(name_, "Waiting");
132                     std::this_thread::sleep_for(500ms);
133             }
134     }
135   }
136
137   //init blackboard interface
138   rm_if_ = blackboard_->open_for_writing<RobotMemoryInterface>(config_->get_string("/plugins/robot-memory/interface-name").c_str());
139   rm_if_->set_error("");
140   rm_if_->set_result("");
141   rm_if_->write();
142
143   //Setup event trigger and computables manager
144   trigger_manager_ = new EventTriggerManager(logger_, config_, mongo_connection_manager_);
145   computables_manager_ = new ComputablesManager(logger_, config_, this, clock_);
146
147   log_deb("Initialized RobotMemory");
148 }
149
150 void RobotMemory::loop()
151 {
152   trigger_manager_->check_events();
153   computables_manager_->cleanup_computed_docs();
154 }
155
156 /**
157  * Query information from the robot memory.
158  * @param query The query returned documents have to match (essentially a BSONObj)
159  * @param collection The database and collection to query as string (e.g. robmem.worldmodel)
160  * @return Cursor to get the documents from, NULL for invalid query
161  */
162 QResCursor RobotMemory::query(Query query, std::string collection)
163 {
164   check_collection_name(collection);
165   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
166   log_deb(std::string("Executing Query "+ query.toString() +" on collection "+collection));
167
168   //check if computation on demand is necessary and execute Computables
169   computables_manager_->check_and_compute(query, collection);
170
171   //lock (mongo_client not thread safe)
172   MutexLocker lock(mutex_);
173
174   //set read preference of query to nearest to read from the local replica set member first
175   query.readPref(ReadPreference_Nearest, BSONArray());
176
177   //actually execute query
178   QResCursor cursor;
179   try{
180     cursor = mongodb_client->query(collection, query);
181   } catch (DBException &e) {
182     std::string error = std::string("Error for query ")
183       + query.toString() + "\n Exception: " + e.toString();
184     log(error, "error");
185     return NULL;
186   }
187   return cursor;
188 }
189
190 /**
191  * Aggregation call on the robot memory.
192  * @param pipeline Series of commands defining the aggregation
193  * @param collection The database and collection to query as string (e.g. robmem.worldmodel)
194  * @return Result object
195  */
196 mongo::BSONObj
197 RobotMemory::aggregate(std::vector<mongo::BSONObj> pipeline, std::string collection)
198 {
199   check_collection_name(collection);
200   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
201   log_deb(std::string("Executing Aggregation on collection "+collection));
202
203   //TODO: check if computation on demand is necessary and execute Computables
204   // that might be complicated because you need to build a query to check against from the fields mentioned in the different parts of the pipeline
205   // A possible solution might be forcing the user to define the $match oject seperately and using it as query to check computables
206
207   //lock (mongo_client not thread safe)
208   MutexLocker lock(mutex_);
209
210   //actually execute aggregation as command (in more modern mongo-cxx versions there should be an easier way with a proper aggregate function)
211   BSONObj res;
212   //get db and collection name
213   size_t point_pos = collection.find(".");
214   if(point_pos == collection.npos)
215   {
216     logger_->log_error(name_, "Collection %s needs to start with 'dbname.'", collection.c_str());
217     return fromjson("{}");
218   }
219   std::string db = collection.substr(0, point_pos);
220   std::string col = collection.substr(point_pos+1);
221   try{
222     mongodb_client->runCommand(db, BSON("aggregate" << col  << "pipeline" << pipeline), res);
223   } catch (DBException &e) {
224     std::string error = std::string("Error for aggregation ")
225       + "\n Exception: " + e.toString();
226     log(error, "error");
227     return fromjson("{}");
228   }
229   return res;
230 }
231
232 /**
233  * Inserts a document into the robot memory
234  * @param obj The document as BSONObj
235  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
236  * @return 1: Success 0: Error
237  */
238 int RobotMemory::insert(mongo::BSONObj obj, std::string collection)
239 {
240   check_collection_name(collection);
241   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
242
243   log_deb(std::string("Inserting "+ obj.toString() + " into collection " + collection));
244
245   //lock (mongo_client not thread safe)
246   MutexLocker lock(mutex_);
247
248   //actually execute insert
249   try{
250     mongodb_client->insert(collection, obj);
251   } catch (DBException &e) {
252     std::string error = "Error for insert " + obj.toString()
253         + "\n Exception: " + e.toString();
254     log_deb(error, "error");
255     return 0;
256   }
257   //return success
258   return 1;
259 }
260
261 /** Create an index on a collection.
262  * @param obj The keys document as BSONObj
263  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
264  * @param unique true to create unique index
265  * @return 1: Success 0: Error
266  */
267 int
268 RobotMemory::create_index(mongo::BSONObj obj, std::string collection, bool unique)
269 {
270   check_collection_name(collection);
271   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
272
273   log_deb(std::string("Creating index "+ obj.toString() + " on collection " + collection));
274
275   //lock (mongo_client not thread safe)
276   MutexLocker lock(mutex_);
277
278   //actually execute insert
279   try{
280           mongodb_client->createIndex(collection, mongo::IndexSpec().addKeys(obj).unique(unique));
281   } catch (DBException &e) {
282           std::string error = "Error when creating index " + obj.toString()
283                   + "\n Exception: " + e.toString();
284           log_deb(error, "error");
285           return 0;
286   }
287   //return success
288   return 1;
289 }
290
291
292 /**
293  * Inserts all document of a vector into the robot memory
294  * @param v_obj The vector of BSONObj document
295  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
296  * @return 1: Success 0: Error
297  */
298 int RobotMemory::insert(std::vector<mongo::BSONObj> v_obj, std::string collection)
299 {
300   check_collection_name(collection);
301   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
302
303   std::string insert_string = "[";
304   for(BSONObj obj : v_obj)
305   {
306     insert_string += obj.toString() + ",\n";
307   }
308   insert_string += "]";
309
310   log_deb(std::string("Inserting vector of documents " + insert_string+  " into collection " + collection));
311
312   //lock (mongo_client not thread safe)
313   MutexLocker lock(mutex_);
314
315   //actually execute insert
316   try{
317     mongodb_client->insert(collection, v_obj);
318   } catch (DBException &e) {
319     std::string error = "Error for insert " + insert_string
320         + "\n Exception: " + e.toString();
321     log_deb(error, "error");
322     return 0;
323   }
324   //return success
325   return 1;
326 }
327
328 /**
329  * Inserts a document into the robot memory
330  * @param obj_str The document as json string
331  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
332  * @return 1: Success 0: Error
333  */
334 int RobotMemory::insert(std::string obj_str, std::string collection)
335 {
336   return insert(fromjson(obj_str), collection);
337 }
338
339 /**
340  * Updates documents in the robot memory
341  * @param query The query defining which documents to update
342  * @param update What to change in these documents
343  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
344  * @param upsert Should the update document be inserted if the query returns no documents?
345  * @return 1: Success 0: Error
346  */
347 int RobotMemory::update(mongo::Query query, mongo::BSONObj update, std::string collection, bool upsert)
348 {
349   check_collection_name(collection);
350   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
351   log_deb(std::string("Executing Update "+update.toString()+" for query "+query.toString()+" on collection "+ collection));
352
353   //lock (mongo_client not thread safe)
354   MutexLocker lock(mutex_);
355
356   //actually execute update
357   try{
358     mongodb_client->update(collection, query, update, upsert);
359   } catch (DBException &e) {
360     log_deb(std::string("Error for update "+update.toString()+" for query "+query.toString()+"\n Exception: "+e.toString()), "error");
361     return 0;
362   }
363   //return success
364   return 1;
365 }
366
367 /**
368  * Updates documents in the robot memory
369  * @param query The query defining which documents to update
370  * @param update_str What to change in these documents as json string
371  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
372  * @param upsert Should the update document be inserted if the query returns no documents?
373  * @return 1: Success 0: Error
374  */
375 int RobotMemory::update(mongo::Query query, std::string update_str, std::string collection, bool upsert)
376 {
377   return update(query, fromjson(update_str), collection, upsert);
378 }
379
380
381 /** Atomically update and retrieve document.
382  * @param filter The filter defining the document to update.
383  * If multiple match takes the first one.
384  * @param update Update statement. May only contain update operators!
385  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
386  * @param upsert Should the update document be inserted if the query returns no documents?
387  * @param return_new return the document before (false) or after (true) the update has been applied?
388  * @return document, depending on @p return_new either before or after the udpate has been applied.
389  */
390 mongo::BSONObj
391 RobotMemory::find_one_and_update(const mongo::BSONObj& filter, const mongo::BSONObj& update,
392                                  std::string collection, bool upsert, bool return_new)
393 {
394   check_collection_name(collection);
395   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
396
397   log_deb(std::string("Executing findOneAndUpdate "+update.toString()+
398                       " for filter "+filter.toString()+" on collection "+ collection));
399
400   MutexLocker lock(mutex_);
401
402   try{
403           return mongodb_client->findAndModify(collection, filter, update, upsert, return_new);
404   } catch (DBException &e) {
405           std::string error = "Error for update "+update.toString()+" for query "+
406                   filter.toString()+"\n Exception: "+e.toString();
407           log_deb(error, "error");
408     BSONObjBuilder b;
409     b.append("error", error);
410     return b.obj();
411   }
412 }
413
414 /**
415  * Remove documents from the robot memory
416  * @param query Which documents to remove
417  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
418  * @return 1: Success 0: Error
419  */
420 int RobotMemory::remove(mongo::Query query, std::string collection)
421 {
422   check_collection_name(collection);
423   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
424   log_deb(std::string("Executing Remove "+query.toString()+" on collection "+collection));
425
426   //lock (mongo_client not thread safe)
427   MutexLocker lock(mutex_);
428
429   //actually execute remove
430   try{
431     mongodb_client->remove(collection, query);
432   } catch (DBException &e) {
433     log_deb(std::string("Error for query "+query.toString()+"\n Exception: "+e.toString()), "error");
434     return 0;
435   }
436   //return success
437   return 1;
438 }
439
440 /**
441  * Performs a MapReduce operation on the robot memory (https://docs.mongodb.com/manual/core/map-reduce/)
442  * @param query Which documents to use for the map step
443  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
444  * @param js_map_fun Map function in JavaScript as string
445  * @param js_reduce_fun Reduce function in JavaScript as string
446  * @return BSONObj containing the result
447  */
448 mongo::BSONObj RobotMemory::mapreduce(mongo::Query query, std::string collection, std::string js_map_fun, std::string js_reduce_fun)
449 {
450   check_collection_name(collection);
451   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
452   MutexLocker lock(mutex_);
453   log_deb(std::string("Executing MapReduce "+query.toString()+" on collection "+collection+
454     " map: " + js_map_fun + " reduce: " + js_reduce_fun));
455   return mongodb_client->mapreduce(collection, js_map_fun, js_reduce_fun, query);
456 }
457
458 /**
459  * Performs an aggregation operation on the robot memory (https://docs.mongodb.com/v3.2/reference/method/db.collection.aggregate/)
460  * @param pipeline A sequence of data aggregation operations or stages. See the https://docs.mongodb.com/v3.2/reference/operator/aggregation-pipeline/ for details
461  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
462  * @return Cursor to get the documents from, NULL for invalid pipeline
463  */
464 QResCursor RobotMemory::aggregate(mongo::BSONObj pipeline, std::string collection)
465 {
466   check_collection_name(collection);
467   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
468   MutexLocker lock(mutex_);
469   log_deb(std::string("Executing Aggregation pipeline: "+pipeline.toString() +" on collection "+collection));
470
471   QResCursor cursor;
472   try{
473     cursor = mongodb_client->aggregate(collection, pipeline);
474   } catch (DBException &e) {
475     std::string error = std::string("Error for query ")
476       + pipeline.toString() + "\n Exception: " + e.toString();
477     log(error, "error");
478     return NULL;
479   }
480   return cursor;
481 }
482
483 /**
484  * Drop (= remove) a whole collection and all documents inside it
485  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
486  * @return 1: Success 0: Error
487  */
488 int RobotMemory::drop_collection(std::string collection)
489 {
490   check_collection_name(collection);
491   mongo::DBClientBase* mongodb_client = get_mongodb_client(collection);
492   MutexLocker lock(mutex_);
493   log_deb("Dropping collection " + collection);
494   return mongodb_client->dropCollection(collection);
495 }
496
497 /**
498  * Remove the whole database of the robot memory and all documents inside
499  * @return 1: Success 0: Error
500  */
501 int RobotMemory::clear_memory()
502 {
503   //lock (mongo_client not thread safe)
504   MutexLocker lock(mutex_);
505
506   log_deb("Clearing whole robot memory");
507   mongodb_client_local_->dropDatabase(database_name_);
508   return 1;
509 }
510
511 /**
512  * Restore a previously dumped collection from a directory
513  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
514  * @param directory Directory of the dump
515  * @return 1: Success 0: Error
516  */
517 int RobotMemory::restore_collection(std::string collection, std::string directory)
518 {
519   check_collection_name(collection);
520   drop_collection(collection);
521
522   //lock (mongo_client not thread safe)
523    MutexLocker lock(mutex_);
524
525   //resolve path to restore
526   if(collection.find(".") == std::string::npos)
527   {
528     log(std::string("Unable to restore collection" + collection), "error");
529     log(std::string("Specify collection like 'db.collection'"), "error");
530     return 0;
531   }
532   std::string path = StringConversions::resolve_path(directory) + "/"
533       + collection.replace(collection.find("."),1,"/") + ".bson";
534   log_deb(std::string("Restore collection " + collection + " from " + path), "warn");
535
536   //call mongorestore from folder with initial restores
537   std::string command = "/usr/bin/mongorestore --dir " + path
538       + " --host=127.0.0.1 --quiet";
539   log_deb(std::string("Restore command: " + command), "warn");
540   FILE *bash_output = popen(command.c_str(), "r");
541
542   //check if output is ok
543   if(!bash_output)
544   {
545     log(std::string("Unable to restore collection" + collection), "error");
546     return 0;
547   }
548   std::string output_string = "";
549   char buffer[100];
550   while (!feof(bash_output) )
551   {
552     if (fgets(buffer, 100, bash_output) == NULL)
553     {
554       break;
555     }
556     output_string += buffer;
557   }
558   pclose(bash_output);
559   if(output_string.find("Failed") != std::string::npos)
560   {
561     log(std::string("Unable to restore collection" + collection), "error");
562     log_deb(output_string, "error");
563     return 0;
564   }
565   return 1;
566 }
567
568 /**
569  * Dump (= save) a collection to the filesystem to restore it later
570  * @param collection The database and collection to use as string (e.g. robmem.worldmodel)
571  * @param directory Directory to dump the collection to
572  * @return 1: Success 0: Error
573  */
574 int RobotMemory::dump_collection(std::string collection, std::string directory)
575 {
576   check_collection_name(collection);
577
578   //lock (mongo_client not thread safe)
579    MutexLocker lock(mutex_);
580
581   //resolve path to dump to
582   if(collection.find(".") == std::string::npos)
583   {
584     log(std::string("Unable to dump collection" + collection), "error");
585     log(std::string("Specify collection like 'db.collection'"), "error");
586     return 0;
587   }
588   std::string path = StringConversions::resolve_path(directory);
589   log_deb(std::string("Dump collection " + collection + " into " + path), "warn");
590
591   //call mongorestore from folder with initial restores
592   std::vector<std::string> split = str_split(collection, '.');
593   std::string command = "/usr/bin/mongodump --out=" + path + " --db=" + split[0]
594     + " --collection=" + split[1] + " --host=127.0.0.1 --quiet";
595   log_deb(std::string("Dump command: " + command), "warn");
596   FILE *bash_output = popen(command.c_str(), "r");
597   //check if output is ok
598   if(!bash_output)
599   {
600     log(std::string("Unable to dump collection" + collection), "error");
601     return 0;
602   }
603   std::string output_string = "";
604   char buffer[100];
605   while (!feof(bash_output) )
606   {
607     if (fgets(buffer, 100, bash_output) == NULL)
608     {
609       break;
610     }
611     output_string += buffer;
612   }
613   pclose(bash_output);
614   if(output_string.find("Failed") != std::string::npos)
615   {
616     log(std::string("Unable to dump collection" + collection), "error");
617     log_deb(output_string, "error");
618     return 0;
619   }
620   return 1;
621 }
622
623 void
624 RobotMemory::log(std::string what, std::string info)
625 {
626   if(!info.compare("error"))
627       logger_->log_error(name_, "%s", what.c_str());
628   else if(!info.compare("warn"))
629     logger_->log_warn(name_, "%s", what.c_str());
630   else if(!info.compare("debug"))
631     logger_->log_debug(name_, "%s", what.c_str());
632   else
633     logger_->log_info(name_, "%s", what.c_str());
634 }
635
636 void
637 RobotMemory::log_deb(std::string what, std::string level)
638 {
639   if(debug_)
640     log(what, level);
641 }
642
643 void
644 RobotMemory::log_deb(mongo::Query query, std::string what, std::string level)
645 {
646   if(debug_)
647     log(query, what, level);
648 }
649
650 void
651 RobotMemory::log(mongo::Query query, std::string what, std::string level)
652 {
653   std::string output = what
654     + "\nFilter: " + query.getFilter().toString()
655     + "\nModifiers: " + query.getModifiers().toString()
656     + "\nSort: " + query.getSort().toString()
657     + "\nHint: " + query.getHint().toString()
658     + "\nReadPref: " + query.getReadPref().toString();
659   log(output, level);
660 }
661
662 void
663 RobotMemory::log_deb(mongo::BSONObj obj, std::string what, std::string level)
664 {
665   log(obj, what, level);
666 }
667
668 void
669 RobotMemory::log(mongo::BSONObj obj, std::string what, std::string level)
670 {
671   std::string output = what
672     + "\nObject: " + obj.toString();
673   log(output, level);
674 }
675
676 void
677 RobotMemory::set_fields(mongo::BSONObj &obj, std::string what)
678 {
679   BSONObjBuilder b;
680   b.appendElements(obj);
681   b.appendElements(fromjson(what));
682   //override
683   obj = b.obj();
684 }
685
686 void
687 RobotMemory::set_fields(mongo::Query &q, std::string what)
688 {
689   BSONObjBuilder b;
690   b.appendElements(q.getFilter());
691   b.appendElements(fromjson(what));
692
693   //the following is not yet kept in the query:
694   // + "\nFilter: " + query.getFilter().toString()
695   // + "\nModifiers: " + query.getModifiers().toString()
696   // + "\nSort: " + query.getSort().toString()
697   // + "\nHint: " + query.getHint().toString()
698   // + "\nReadPref: " + query.getReadPref().toString();
699
700   //override
701   q = Query(b.obj());
702 }
703
704 void
705 RobotMemory::remove_field(mongo::Query &q, std::string what)
706 {
707   BSONObjBuilder b;
708   b.appendElements(q.getFilter().removeField(what));
709
710   //the following is not yet kept in the query:
711   // + "\nFilter: " + query.getFilter().toString()
712   // + "\nModifiers: " + query.getModifiers().toString()
713   // + "\nSort: " + query.getSort().toString()
714   // + "\nHint: " + query.getHint().toString()
715   // + "\nReadPref: " + query.getReadPref().toString();
716
717   //override
718   q = Query(b.obj());
719 }
720
721 /**
722  * Check if collection name is valid and correct it if necessary
723  */
724 void
725 RobotMemory::check_collection_name(std::string &collection)
726 {
727   if(collection == "")
728   {
729       collection = default_collection_;
730   }
731   if(database_name_ != "robmem" && collection.find("robmem.") == 0)
732   {
733     //change used database name (e.g. for the case of multiple simulated dababases)
734     collection.replace(0, 6, database_name_);
735   }
736 }
737
738 /**
739  * Get the mongodb client associated with the collection (eighter the local or distributed one)
740  */
741 mongo::DBClientBase*
742 RobotMemory::get_mongodb_client(std::string &collection)
743 {
744   if(!distributed_)
745   {
746       return mongodb_client_local_;
747   }
748   //get db name of collection
749   size_t point_pos = collection.find(".");
750   if(point_pos == collection.npos)
751   {
752     logger_->log_error(name_, "Collection %s needs to start with 'dbname.'", collection.c_str());
753     return mongodb_client_local_;
754   }
755   std::string db = collection.substr(0, point_pos);
756   if(std::find(distributed_dbs_.begin(), distributed_dbs_.end(), db) != distributed_dbs_.end())
757   {
758     return mongodb_client_distributed_;
759   }
760   return mongodb_client_local_;
761 }
762
763 /**
764  * Remove a previously registered trigger
765  * @param trigger Pointer to the trigger to remove
766  */
767 void RobotMemory::remove_trigger(EventTrigger* trigger)
768 {
769   trigger_manager_->remove_trigger(trigger);
770 }
771
772 /**
773  * Remove previously registered computable
774  * @param computable The computable to remove
775  */
776 void RobotMemory::remove_computable(Computable* computable)
777 {
778   computables_manager_->remove_computable(computable);
779 }
780
781 /** Explicitly create a mutex.
782  * This is an optional step, a mutex is also created automatically when trying
783  * to acquire the lock for the first time. Adding it explicitly may increase
784  * visibility, e.g., in the database. Use it for mutexes which are locked
785  * only very infrequently.
786  * @param name mutex name
787  * @return true if operation was successful, false on failure
788  */
789 bool
790 RobotMemory::mutex_create(const std::string& name)
791 {
792         mongo::DBClientInterface *client =
793                 distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
794         mongo::BSONObjBuilder insert_doc;
795         insert_doc.append("$currentDate", BSON("lock-time" << true));
796         insert_doc.append("_id", name);
797         insert_doc.append("locked", false);
798         try {
799                 client->insert(cfg_coord_mutex_collection_, insert_doc.obj(),
800                                0, &mongo::WriteConcern::majority);
801                 return true;
802         } catch (mongo::DBException &e) {
803                 logger_->log_info(name_, "Failed to create mutex %s: %s", name.c_str(), e.what());
804                 return false;
805         }
806 }
807
808 /** Destroy a mutex.
809  * The mutex is erased from the database. This is done disregarding it's current
810  * lock state.
811  * @param name mutex name
812  * @return true if operation was successful, false on failure
813  */
814 bool
815 RobotMemory::mutex_destroy(const std::string& name)
816 {
817         mongo::DBClientInterface *client =
818                 distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
819         mongo::BSONObj destroy_doc{BSON("_id" << name)};
820         try {
821                 client->remove(cfg_coord_mutex_collection_, destroy_doc,
822                                true, &mongo::WriteConcern::majority);
823                 return true;
824         } catch (mongo::DBException &e) {
825                 logger_->log_info(name_, "Failed to destroy mutex %s: %s", name.c_str(), e.what());
826                 return false;
827         }
828 }
829
830 /** Try to acquire a lock for a mutex.
831  * This will access the database and atomically find and update (or
832  * insert) a mutex lock. If the mutex has not been created it is added
833  * automatically. If the lock cannot be acquired the function also
834  * returns immediately. There is no blocked waiting for the lock.
835  * @param name mutex name
836  * @param identity string to set as lock-holder
837  * @param force true to force acquisition of the lock, i.e., even if
838  * the lock has already been acquired take ownership (steal the lock).
839  * @return true if operation was successful, false on failure
840  */
841 bool
842 RobotMemory::mutex_try_lock(const std::string& name,
843                             std::string identity, bool force)
844 {
845         mongo::DBClientBase *client =
846                 distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
847
848         if (identity.empty()) {
849                 HostInfo host_info;
850                 identity = host_info.name();
851         }
852
853         // here we can add an $or to implement lock timeouts
854         mongo::BSONObjBuilder filter_doc;
855         filter_doc.append("_id", name);
856         if (! force) {
857                 filter_doc.append("locked", false);
858         }
859
860         mongo::BSONObjBuilder update_doc;
861         update_doc.append("$currentDate", BSON("lock-time" << true));
862         mongo::BSONObjBuilder update_set;
863         update_set.append("locked", true);
864         update_set.append("locked-by", identity);
865         update_doc.append("$set", update_set.obj());
866
867         try {
868                 BSONObj new_doc =
869                         client->findAndModify(cfg_coord_mutex_collection_,
870                                               filter_doc.obj(), update_doc.obj(),
871                                               /* upsert */ true, /* return new */ true,
872                                               /* sort */ BSONObj(), /* fields */ BSONObj(),
873                                               &mongo::WriteConcern::majority);
874
875                 return (new_doc.getField("locked-by").String() == identity &&
876                         new_doc.getField("locked").Bool());
877
878         } catch (mongo::OperationException &e) {
879                 //if (e.obj()["code"].numberInt() != 11000) {
880                 // 11000: Duplicate key exception, occurs if we do not become leader, all fine
881                 return false;
882         }
883 }
884
885 /** Try to acquire a lock for a mutex.
886  * This will access the database and atomically find and update (or
887  * insert) a mutex lock. If the mutex has not been created it is added
888  * automatically. If the lock cannot be acquired the function also
889  * returns immediately. There is no blocked waiting for the lock.
890  * @param name mutex name
891  * @param force true to force acquisition of the lock, i.e., even if
892  * the lock has already been acquired take ownership (steal the lock).
893  * @return true if operation was successful, false on failure
894  */
895 bool
896 RobotMemory::mutex_try_lock(const std::string& name, bool force)
897 {
898         return mutex_try_lock(name, "", force);
899 }
900
901 /** Release lock on mutex.
902  * @param name mutex name
903  * @param identity string to set as lock-holder
904  * @return true if operation was successful, false on failure
905  */
906 bool
907 RobotMemory::mutex_unlock(const std::string& name,
908                           std::string identity)
909 {
910         mongo::DBClientBase *client =
911                 distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
912
913         if (identity.empty()) {
914                 HostInfo host_info;
915                 identity = host_info.name();
916         }
917
918         // here we can add an $or to implement lock timeouts
919         mongo::BSONObj filter_doc{BSON("_id" << name << "locked-by" << identity)};
920
921         mongo::BSONObj update_doc{BSON("$set" << BSON("locked" << false) <<
922                                        "$unset" << BSON("locked-by" << true <<
923                                                         "lock-time" << true))};
924
925         try {
926                 BSONObj new_doc =
927                         client->findAndModify(cfg_coord_mutex_collection_,
928                                               filter_doc, update_doc,
929                                               /* upsert */ true, /* return new */ true,
930                                               /* sort */ BSONObj(), /* fields */ BSONObj(),
931                                               &mongo::WriteConcern::majority);
932
933                 return true;
934         } catch (mongo::OperationException &e) {
935                 return false;
936         }
937 }
938
939
940 /** Renew a mutex.
941  * Renewing means updating the lock timestamp to the current time to
942  * avoid expiration. Note that the lock must currently be held by
943  * the given identity.
944  * @param name mutex name
945  * @param identity string to set as lock-holder (defaults to hostname
946  * if empty)
947  * @return true if operation was successful, false on failure
948  */
949 bool
950 RobotMemory::mutex_renew_lock(const std::string& name,
951                               std::string identity)
952 {
953         mongo::DBClientBase *client =
954                 distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
955
956         if (identity.empty()) {
957                 HostInfo host_info;
958                 identity = host_info.name();
959         }
960
961         // here we can add an $or to implement lock timeouts
962         mongo::BSONObj filter_doc{BSON("_id" << name <<
963                                        "locked" << true <<
964                                        "locked-by" << identity)};
965
966         // we set all data, even the data which is not actually modified, to
967         // make it easier to process the update in triggers.
968         mongo::BSONObjBuilder update_doc;
969         update_doc.append("$currentDate", BSON("lock-time" << true));
970         mongo::BSONObjBuilder update_set;
971         update_set.append("locked", true);
972         update_set.append("locked-by", identity);
973         update_doc.append("$set", update_set.obj());
974
975         try {
976                 BSONObj new_doc =
977                         client->findAndModify(cfg_coord_mutex_collection_,
978                                               filter_doc, update_doc.obj(),
979                                               /* upsert */ false, /* return new */ true,
980                                               /* sort */ BSONObj(), /* fields */ BSONObj(),
981                                               &mongo::WriteConcern::majority);
982
983                 return true;
984         } catch (mongo::OperationException &e) {
985                 logger_->log_warn(name_, "Renewing lock on mutex %s failed: %s",
986                                   name.c_str(), e.what());
987                 return false;
988         }
989 }
990
991
992 /** Setup time-to-live index for mutexes.
993  * Setting up a time-to-live index for mutexes enables automatic
994  * expiration through the database. Note, however, that the documents
995  * are expired only every 60 seconds. This has two consequences:
996  * - max_age_sec lower than 60 seconds cannot be achieved
997  * - locks may be held for up to just below 60 seconds longer than
998  *   configured, i.e., if the mutex had not yet expired when the
999  *   background tasks runs.
1000  * @param max_age_sec maximum age of locks in seconds
1001  * @return true if operation was successful, false on failure
1002  */
1003 bool
1004 RobotMemory::mutex_setup_ttl(float max_age_sec)
1005 {
1006   MutexLocker lock(mutex_);
1007
1008         mongo::DBClientBase *client =
1009                 distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1010
1011         BSONObj keys = BSON("lock-time" << true);
1012
1013         try {
1014                 client->createIndex(cfg_coord_mutex_collection_, mongo::IndexSpec()
1015                                     .addKeys(keys)
1016                                     .expireAfterSeconds(max_age_sec));
1017   } catch (DBException &e) {
1018                 logger_->log_warn(name_, "Creating TTL index failed: %s", e.what());
1019           return false;
1020   }
1021   return true;
1022 }
1023
1024 /** Expire old locks on mutexes.
1025  * This will update the database and set all mutexes to unlocked for
1026  * which the lock-time is older than the given maximum age.
1027  * @param max_age_sec maximum age of locks in seconds
1028  * @return true if operation was successful, false on failure
1029  */
1030 bool
1031 RobotMemory::mutex_expire_locks(float max_age_sec)
1032 {
1033         mongo::DBClientBase *client =
1034                 distributed_ ? mongodb_client_distributed_ : mongodb_client_local_;
1035
1036         using std::chrono::milliseconds;
1037         using std::chrono::high_resolution_clock;
1038         using std::chrono::time_point;
1039         using std::chrono::time_point_cast;
1040
1041         auto max_age_ms = milliseconds(static_cast<unsigned long int>(std::floor(max_age_sec*1000)));
1042         time_point<high_resolution_clock, milliseconds> expire_before =
1043                 time_point_cast<milliseconds>(high_resolution_clock::now()) - max_age_ms;
1044         mongo::Date_t   expire_before_mdb(expire_before.time_since_epoch().count());
1045
1046         // here we can add an $or to implement lock timeouts
1047         mongo::BSONObj filter_doc{BSON("locked" << true <<
1048                                        "lock-time" << mongo::LT << expire_before_mdb)};
1049
1050         try {
1051                 client->remove(cfg_coord_mutex_collection_, filter_doc,
1052                                true, &mongo::WriteConcern::majority);
1053
1054                 return true;
1055         } catch (mongo::OperationException &e) {
1056                 return false;
1057         }
1058 }