Source code for pyrk.db.database

# Licensed under a 3-clause BSD style license - see LICENSE
import tables as tb
from db import descriptions as desc

import contextlib
import sys
import six


@contextlib.contextmanager
[docs]def nostderr(): """This context manager catches standard error output. It is used mostly to catch the unnecessary fileclosed warnings from pytables. """ savestderr = sys.stderr class Devnull(object): def write(self, _): pass def flush(self): pass sys.stderr = Devnull() try: yield finally: sys.stderr = savestderr
[docs]class Database(object): """The Database class handles operations on the pyrk simulation backend and provides utilities for interacting with it. """ def __init__(self, filepath='pyrk.h5', mode='w', title='PyRKDatabase' ): """Creates an hdf5 database for simulation information :param filepath: the location of the h5 file. e.g. 'pyrk.h5' :type filepath: str :param mode: mode for file opening :type mode: str (a, w, and r are supported) :param title: The title of the database :type title: str """ self.recorders = [] self.tablehandles = {} self.mode = mode self.title = title self.filepath = filepath self.h5file = tb.File(filename=self.filepath, title=self.title, mode=self.mode) self.groups = self.set_up_groups() self.tables = self.set_up_tables() self.make_groups() self.make_tables()
[docs] def add_group(self, groupname, grouptitle, path_to_group='/'): """Creates a new group in the file :param groupname: name of the group to add :type groupname: str :param grouptitle: metadata to store in plain english, a title :type grouptitle: str :param path_to_group: the database path, starts with '/' :type path_to_group: str """ self.open_db() group = self.group_exists(path_to_group, groupname) if group is False: group = self.h5file.create_group(path_to_group, groupname, grouptitle) return group
[docs] def add_table(self, groupname, tablename, description, tabletitle): """Creates a new table All groupnames must be directly under root :param groupname: name of the group to add :type groupname: str :param tablename: name of the table to add :type tablename: str :param description: metadata for the table :type description: str :param tabletitle: metadata to store in plain english, a title :type tabletitle: str """ self.open_db() p = self.get_tablepath(groupname, tablename) self.tablehandles[p] = self.h5file.create_table('/'+groupname, tablename, description, tabletitle) return self.tablehandles[p]
[docs] def add_row(self, table, row_dict): """Adds a row to the table and flushes the table :param table: handle to the table where the row will reside :type tablename: pytables Table object :param row_dict: metadata to store in plain english, a title :type row_dict: dictionary of row keys and values """ self.open_db() for k, v in six.iteritems(row_dict): table.row[k] = v table.row.append() table.flush()
[docs] def group_exists(self, path_to_group, groupname): """Checks whether the group exsts, with that name, at that path :param groupname: name of the group to add :type groupname: str :param path_to_group: the database path, starts with '/' :type path_to_group: str :returns: returns the group :rtype: pytables Group object """ self.open_db() try: group = self.h5file.get_node(path_to_group, name=groupname) except tb.NoSuchNodeError: group = False return group
[docs] def open_db(self): """Returns a handle to the open db""" # if it is not open, open it. if self.h5file.isopen is True: return self.h5file else: self.h5file = tb.open_file(filename=self.filepath, mode='a') assert(self.h5file.isopen) return self.h5file
[docs] def close_db(self): """Closes all currently open handles to the database.""" with nostderr(): tb.file._open_files.close_all()
[docs] def record_all(self): """For each row sent by current recorders, add the row. """ for i in self.recorders: t = i[0] r = i[1] self.add_row(t, r())
[docs] def delete_db(self): """If the database exists, delete it""" import os.path os.remove(self.filepath)
[docs] def make_groups(self): """For each group in groups, add group to the db. """ for g in self.groups: self.add_group(groupname=g['groupname'], grouptitle=g['grouptitle'], path_to_group=g['path'])
[docs] def make_tables(self): """For each table in tables, add table to the db. """ for t in self.tables: self.add_table(groupname=t['groupname'], tablename=t['tablename'], description=t['description'], tabletitle=t['tabletitle'])
[docs] def set_up_groups(self): """We know what groups need to exist for a PyRK simulation. This is their info. :returns: groups that define the simulation in PyRK """ groups = [] groups.append({'groupname': 'th', 'grouptitle': 'TH', 'path': '/'}) groups.append({'groupname': 'neutronics', 'grouptitle': 'Neutronics', 'path': '/'}) groups.append({'groupname': 'metadata', 'grouptitle': 'Simulation Metadata', 'path': '/'}) return groups
[docs] def set_up_tables(self): """We know what tables need to exist for a PyRK simulation. This is their info. :returns: tables that define the simulation in PyRK """ tables = [] tables.append({'groupname': 'metadata', 'tablename': 'sim_info', 'description': desc.SimInfoRow, 'tabletitle': 'Simulation Information'}) tables.append({'groupname': 'metadata', 'tablename': 'sim_timeseries', 'description': desc.SimTimeseriesRow, 'tabletitle': 'Simulation Power Data'}) tables.append({'groupname': 'th', 'tablename': 'th_params', 'description': desc.ThMetadataRow, 'tabletitle': 'TH Component Parameters'}) tables.append({'groupname': 'th', 'tablename': 'th_timeseries', 'description': desc.ThTimeseriesRow, 'tabletitle': 'TH Timeseries'}) tables.append({'groupname': 'neutronics', 'tablename': 'neutronics_timeseries', 'description': desc.NeutronicsTimeseriesRow, 'tabletitle': 'Neutronics Timeseries'}) tables.append({'groupname': 'neutronics', 'tablename': 'neutronics_params', 'description': desc.NeutronicsParamsRow, 'tabletitle': 'Neutronics Metadata'}) tables.append({'groupname': 'neutronics', 'tablename': 'zetas', 'description': desc.ZetasTimestepRow, 'tabletitle': 'Neutron Precursor Concentrations'}) tables.append({'groupname': 'neutronics', 'tablename': 'omegas', 'description': desc.OmegasTimestepRow, 'tabletitle': 'Decay Heat Fractions'}) return tables
[docs] def register_recorder(self, groupname, tablename, recorder, timeseries=False): """Register an entity that wants to represent itself in the Database :param groupname: name of the group to add :type groupname: str :param path_to_group: the database path, starts with '/' :type path_to_group: str :param recorder: a function pointer that returns a table row :type recorder: function object :param timeseries: should this be recorded each timestep? :type timeseries: bool """ self.open_db() tab = self.get_table(groupname, tablename) if timeseries is False: self.add_row(tab, recorder()) else: self.recorders.append((tab, recorder))
[docs] def get_tablepath(self, groupname, tablename): """Compiles the string for a table within a group :param groupname: name of the group :type groupname: str :param tablename: name of the table in the group :type tablename: str :returns: the path to the table in the group :rtype: str """ return '/'+groupname+'/'+tablename
[docs] def get_table(self, groupname, tablename): """Returns the table handle for a table within a group :param groupname: name of the group :type groupname: str :param tablename: name of the table in the group :type tablename: str :returns: the path to the table in the group :rtype: str """ self.open_db() p = self.get_tablepath(groupname, tablename) try: return self.tablehandles[p] except KeyError: msg = "table path " + p + " not found among table handles." raise KeyError(msg)