# Source code for omas.omas_mongo

'''save/load from MongoDB routines

-------
'''
import pathlib

# to start a mongodb server on the local workstation
# mongod --dbpath $DIRECTORY_WHERE_TO_STORE_DATA

from .omas_utils import *
from .omas_core import ODS


# -----------------------------
# save and load OMAS to MongoDB
# -----------------------------
def save_omas_mongo(ods, collection, database='omas', server=omas_rcparams['default_mongo_server']):
    """
    Save an ODS to MongoDB

    :param ods: OMAS data set

    :param collection: collection name in the database

    :param database: database name on the server

    :param server: server name; it is passed through `str.format` with the
        dictionary returned by `get_mongo_credentials`, so it may contain
        `{user}` and `{pass}` placeholders

    :return: unique `_id` identifier of the record (as a string)
    """

    printd('Saving OMAS data to MongoDB: collection=%s database=%s server=%s' % (collection, database, server), topic='MongoDB')

    # importing module
    from pymongo import MongoClient

    # a cheap way to encode data: round-trip the ODS through JSON, with
    # json_dumper as the `default` hook for objects json cannot serialize.
    # The result of json.loads is a brand new object, so insert_one() adding
    # an `_id` field to it cannot alter the caller's ODS.
    kw = {'indent': 0, 'separators': (',', ': '), 'sort_keys': True}
    json_string = json.dumps(ods, default=lambda x: json_dumper(x, None), **kw)
    data = json.loads(json_string)

    # connect
    client = MongoClient(server.format(**get_mongo_credentials(server, database, collection)))
    try:
        # access database, then collection, and insert record
        res = client[database][collection].insert_one(data)
    finally:
        # always release the connection pool, even if the insert fails
        client.close()

    return str(res.inserted_id)
def load_omas_mongo(
    find,
    collection,
    database='omas',
    server=omas_rcparams['default_mongo_server'],
    consistency_check=True,
    imas_version=omas_rcparams['default_imas_version'],
    limit=None,
):
    """
    Load ODSs from MongoDB

    :param find: dictionary to find data in the database; if it has an `_id`
        entry, that value is converted to a bson `ObjectId` before querying

    :param collection: collection name in the database

    :param database: database name on the server

    :param server: server name; it is passed through `str.format` with the
        dictionary returned by `get_mongo_credentials`, so it may contain
        `{user}` and `{pass}` placeholders

    :param consistency_check: verify that data is consistent with IMAS schema

    :param imas_version: imas version to use for consistency check

    :param limit: return at most `limit` number of results

    :return: dictionary of the OMAS data sets that match the find criterion,
        keyed by the `_id` of each record

    :raises TypeError: if `find` is not a dictionary
    """

    # importing module
    from pymongo import MongoClient
    from bson.objectid import ObjectId

    # allow search by _id
    if not isinstance(find, dict):
        raise TypeError('load_omas_mongo find attribute must be a dictionary')
    if '_id' in find:
        # work on a copy so that the caller's dictionary is left untouched
        find = copy.deepcopy(find)
        find['_id'] = ObjectId(find['_id'])

    printd('Loading OMAS data from MongoDB: collection=%s database=%s server=%s' % (collection, database, server), topic='MongoDB')

    # connect
    client = MongoClient(server.format(**get_mongo_credentials(server, database, collection)))

    results = {}
    try:
        # access database, then collection, and find all the matching records
        found = client[database][collection].find(find)
        if limit is not None:
            found = found.limit(limit)

        # populate ODSs; the cursor must be fully consumed before the client is closed
        for record in found:
            ods = ODS(consistency_check=consistency_check, imas_version=imas_version)
            # `_id` is used as the key of the results and is not part of the ODS data
            _id = record.pop('_id')
            ods.from_structure(record)
            results[_id] = ods
    finally:
        # always release the connection pool, even if the query or the ODS construction fails
        client.close()

    return results
def get_mongo_credentials(server='', database='', collection=''):
    """
    Users can specify their credentials in a `~/.omas/mongo_credentials` json file formatted like this:

    >> {"default": {"user": "mydefaultuser",
    >>              "pass": "mydefaultpass"},
    >>  "omasdb-xymmt.mongodb.net": {"user": "myuser1",
    >>                               "pass": "mypass1",
    >>                               "specific_database": {"user": "myuser2",
    >>                                                     "pass": "mypass2",
    >>                                                     "specific_collection": {"user": "myuser3",
    >>                                                                             "pass": "mypass3"}
    >>  }}}

    if no `~/.omas/mongo_credentials` file is found: {'user': 'omas_test', 'pass': 'omas_test'}

    if no matching server is found, the `default` is returned

    The most specific entry wins: collection over database over server.

    :param server: server to look credentials for
        * server can have `{user}:{pass}` placeholders: mongodb+srv://{user}:{pass}@omasdb.xymmt.mongodb.net

    :param database: database name in server to look credentials for

    :param collection: collection name in database to look credentials for

    :return: dictionary with 'user' and 'pass' keys
    """
    # only what follows the last `@` is used as lookup key (drops `...{user}:{pass}@`)
    server = server.split('@')[-1]

    # fallback credentials when nothing else applies
    up = {'user': 'omas_test', 'pass': 'omas_test'}

    config = {}
    # the joined parts must be relative: joining an absolute '/.omas/...' would discard the home directory
    filepath = pathlib.Path.home() / '.omas' / 'mongo_credentials'
    if filepath.exists():
        with open(filepath) as f:
            config = json.load(f)

    if 'default' in config:
        up = config['default']

    if server in config:
        up = config[server]
        # descend server -> database -> collection and stop at the first level
        # that has no dedicated (dictionary) entry; the dictionary check prevents
        # a database/collection named e.g. 'user' from selecting a plain string
        for key in (database, collection):
            nested = up.get(key) if isinstance(up, dict) else None
            if not isinstance(nested, dict):
                break
            up = nested

    return up
def through_omas_mongo(ods, method=['function', 'class_method'][1]):
    """
    Round-trip an ODS through MongoDB: save it, then load it back

    :param ods: ods

    :param method: 'function' goes through save_omas_mongo and load_omas_mongo;
        any other value goes through the ODS `save` and `load` methods

    :return: ods
    """
    # work on a copy so that saving cannot alter the ODS passed by the caller
    ods = copy.deepcopy(ods)

    # both paths use the same collection and database
    target = {'collection': 'test', 'database': 'test'}

    if method != 'function':
        record_id = ods.save('mongo', **target)
        return ODS().load('mongo', {'_id': record_id}, **target)

    record_id = save_omas_mongo(ods, **target)
    matches = load_omas_mongo({'_id': record_id}, **target)
    # exactly one record is expected to match the `_id` that was just saved
    if len(matches) != 1:
        raise Exception('through_omas_mongo failed')
    return next(iter(matches.values()))