"""
Tools for interfacing with Google sheets
Compared to SQL tables, Google sheets allow for formatting and other conveniences
that make them easier to update by hand (appropriate for some columns like labels,
comments, etc.), but the API for Google sheets is relatively simple. This file
is meant to provide tools for looking up data by column header instead of column
number/letter and by shot instead of row number. That is, the user should not have
to know which row holds shot 123456 or which column holds density in order to get
the average density for shot 123456.
In this script, the purely numeric way to refer to cell 'A1' is (0, 0). Do not
allow any reference to A1 as (1, 1) to creep into this code or it will ruin
everything. Referring to ('A', 1) is fine; there's a function for interpreting
cell references, but it uses the presence of a letter in the column as a cue that
the row should be counted from 1, so it can't handle counting columns from 1.
Some packages DO use numbers (from 1) instead of indices (from 0) internally. If
one of these packages is used, its returned values must be converted immediately.
YOU WON'T LIKE WHAT HAPPENS IF YOU FAIL TO MAINTAIN THIS DISCIPLINE.
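For example, cell 'B3' is (1, 2) in the purely numeric convention used here; writing it
as (2, 3) would silently refer to C4 instead.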
"""
# OMFIT imports
try:
# framework is running
from .startup_choice import *
except ImportError as _excp:
# class is imported by itself
if (
'attempted relative import with no known parent package' in str(_excp)
or 'No module named \'omfit_classes\'' in str(_excp)
or "No module named '__main__.startup_choice'" in str(_excp)
):
from startup_choice import *
else:
raise
# Basic imports
import copy
import string
import numpy as np
import matplotlib
import re
# OMFIT class imports
from omfit_classes.omfit_mds import OMFITmdsValue
from omfit_classes.omfit_base import OMFITtmp, OMFITtree
# Import modules for interfacing with google sheets
try:
# This is an optional dependency. If it isn't installed, we can't do much except
# explain the problem.
import pygsheets
except ImportError:
pygsheets = None
setup_help = '''
How to prepare a Google Sheet for use with these tools:
-------------------------------------------------------
Most of these steps are described with screen shots in the pygsheets documentation
(use the service account version):
https://pygsheets.readthedocs.io/en/stable/authorization.html
1: Make a sheet in Google drive
2: Make a project at https://console.developers.google.com/
3: Add and enable both Google drive API and Google sheets API
4: Add credentials: create credentials for a service account. Give it the Project: editor role.
5: Add a JSON key to the service account, download it, and place it somewhere accessible to this script.
6: Note the service account's email address (the form is <SERVICE-NAME>@<PROJECT>.iam.gserviceaccount.com)
7: Share the sheet with the service account using the email address it gets (ending in iam.gserviceaccount.com)
Connection should then be possible if OMFITgoogleSheet or subclass is initialized
with the right keyfile and sheet_name.
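Example (hypothetical filenames/names; substitute your own key file and sheet):
    sheet = OMFITgoogleSheet(
        keyfile='/path/to/my-service-account-key.json',  # the JSON key from step 5
        sheet_name='MySheet',                            # the sheet shared in step 7
    )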
'''
__all__ = ['OMFITgoogleSheet', 'OMFITexperimentTable']
# List of column letters (A, B, ... Z, AA, AB, ... AZ, BA, ... ZZ)
string_ascii_uppercase = list(string.ascii_uppercase)
col_letters = [] + string_ascii_uppercase
for first_letter in string_ascii_uppercase:
for second_letter in string_ascii_uppercase:
col_letters.append(first_letter + second_letter)
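# For reference: col_letters[0] == 'A', col_letters[25] == 'Z', col_letters[26] == 'AA',
# and col_letters[-1] == 'ZZ', for a total of 26 + 26 * 26 = 702 supported columns.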
def interpret_row_col(*args):
"""
    Interprets a cell reference given as column letter and row number, or as column index and row index, and returns indices
    Either provide the column letter and row number as they appear in the Google sheet, as in args = ('A', 1) or args = ('A1',),
    or provide the column and row indices counting from 0, as in args = (0, 0).
That is, give the input the way google displays it or the way python tracks it internally. No mixing.
Large column letters, like AA, AB, CX, etc. are supported up to ZZ.
This function is useful because we are dealing with a mixture of methods that try to index arrays from 0 and
methods that try to communicate with the spreadsheet and its cell addresses. It is very convenient to hand this
thing a valid address from some source and let it figure out how to interpret it. Just make sure you DO NOT let
cell references like (1, 1) for A1 into the code. Convert them on the way in into (0, 0) or ('A', 1).
:param args: One or two input arguments.
One argument should be an address like 'A1'
Two arguments should be a column followed by a row.
        If the column is an int, both it and the row are interpreted as indices starting from 0.
If the column is in the list of recognized column letters (A, B, ... ZZ), the row is interpreted as a
row number, starting from 1.
So, args of (0, 0), ('A', 1) and 'A1' all mean the same thing.
    :return: (int, int, str)
        (column_index, row_index, address)
        The column and row indices, counting from 0, plus the spreadsheet-style address (e.g. 'A1').
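    A minimal illustration (all three calls are equivalent):
        interpret_row_col('B', 3)   # returns (1, 2, 'B3')
        interpret_row_col('B3')     # returns (1, 2, 'B3')
        interpret_row_col(1, 2)     # returns (1, 2, 'B3')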
"""
# Assign the input arguments to column, row, and address_in
if len(args) == 1:
address_in = args[0]
column, row = re.findall(r'[A-Za-z]+|\d+', address_in) # https://stackoverflow.com/a/28290501/6605826
row = int(row)
elif len(args) == 2:
address_in = None
column, row = args[0:2]
else:
        raise ValueError(f'interpret_row_col() expects one or two arguments, but got {len(args)}')
# Decide what the indices for column and row are based on what's been given
if isinstance(column, int):
column_index = column
row_index = int(row)
elif column in col_letters:
row_index = int(row) - 1
        column_index = col_letters.index(column)
else:
raise ValueError(f'Could not understand column index {column}')
# Check and then return results
address = f'{col_letters[column_index]}{row_index+1}'
if address_in is not None:
assert address_in == address, 'Did not recover the right address'
assert row_index >= 0, 'Row index must not be negative.'
return column_index, row_index, address
def list_to_array_type(list_in, type_out=None, fill_value=np.NaN, int_truncation_allowed=False):
"""
Forces a list to be an array with a specified type.
Values that can't be forced to the type are replaced with a fill_value.
Special: can be set to consider floats like 5.2 to be unrepresentable as ints instead of truncating them.
:param list_in: list
These would normally be the values read from a column or row.
If formatted values are obtained, they will be strings, even if the strings just hold numbers.
:param type_out: type
This is just the type you would like each element to be in the output.
:param fill_value: object matching type_out
Elements that can't be forced to type_out will be replaced with this.
:param int_truncation_allowed: bool
Allow floats to be cast as int by truncation, so 26.945 becomes 26 instead of fill_value
:return: array matching type_out
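    Illustrative example (values that cannot be represented as the requested type become fill_value):
        list_to_array_type(['1', '2.6', 'x'], type_out=int, fill_value=-1)
        # returns array([ 1, -1, -1]); with int_truncation_allowed=True, the '2.6' entry would become 2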
"""
numeric_types = [int, float, complex]
type_out = type_out or float
try:
type_out(fill_value)
except Exception as excp:
raise excp.__class__(f'fill_value {fill_value} is inconsistent with type_out {type_out}!')
def try_type(x):
"""Tries to force x to be type and returns it; returns fill_value on failure"""
# noinspection PyBroadException
try:
if type_out in numeric_types:
the_thing = type_out(float(x))
else:
the_thing = type_out(x)
except Exception as exc:
printd(f' failed to cast {type_out}({x}) because {exc}')
return type_out(fill_value)
else:
if (not int_truncation_allowed) and (type_out == int) and (float(x) != the_thing):
printd(f' Casting to int would change {x} to {the_thing} and is not allowed. FAIL!')
return type_out(fill_value)
else:
return the_thing
# fromiter : https://stackoverflow.com/a/28526475/6605826
return np.fromiter(map(try_type, list_in), type_out)
def operate_on_data(data, operation):
"""
Handles mean/median/etc. operations for clients such as OMFITexperimentTable(...).write_mds_data_to_table()
:param data: 1D float array
Data to process
:param operation: str
Description of operation
:return: float
Result of operation on data
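    Example (NaNs are ignored because nan-aware numpy functions are used):
        operate_on_data(np.array([1.0, 3.0, np.NaN]), 'mean')   # returns 2.0
        operate_on_data(np.array([1.0, 3.0, np.NaN]), 'max')    # returns 3.0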
"""
if operation == 'mean':
value = np.nanmean(data)
elif operation == 'median':
value = np.nanmedian(data)
elif operation == 'max':
value = np.nanmax(data)
elif operation == 'min':
value = np.nanmin(data)
else:
raise ValueError(f'Unrecognized operation passed to operate_on_data(): {operation}')
return value
def get_definition_from_docstring(theobject, return_kw=False):
"""
Reads example code from the docstring of a class or function and uses it to define some variables
:param theobject: class or function
The docstring should contain statements like '>> the_variable = the_value'.
They can be multi-line.
Each statement should be separated from others by lines not starting with >> .
:param return_kw: bool
Instead of evaluating expressions as written (e.g. var = ClassName(**kw)),
replace class or method name with dict and append _kw to var, to instead get
        var_kw = dict(**kw). This will fail if the statements in the docstring don't
        follow the form a = b(keyword1=value1, keyword2=value2, ...).
:return: dict
Keys are the variables being defined (on the left side of the first =)
Values are evaluated expressions taken from the right side of the first =.
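    Example (this is how the __main__ block at the bottom of this file recovers the test sheet settings):
        get_definition_from_docstring(OMFITexperimentTable, return_kw=True)['xtable_kw']
        # returns a dict built from the '>> xtable = OMFITexperimentTable(...)' example in that class docstring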
"""
fn_doc = theobject.__doc__.split('\n')
is_example = np.array([doc_line.strip().startswith('>> ') for doc_line in fn_doc])
exmp_changes = np.append(0, np.diff(is_example.astype(int)))
starts = np.where(exmp_changes == 1)[0]
ends = np.where(exmp_changes == -1)[0]
docstring_definitions = {}
for start, end in zip(starts, ends):
var_name = fn_doc[start].split('=')[0].strip().split('>> ')[1]
def_start = '='.join(fn_doc[start].split('=')[1:]).strip()
def_rest = [d.split('>> ')[1] for d in fn_doc[start + 1 : end]]
definition = '\n'.join([def_start] + def_rest)
if return_kw:
# noinspection PyBroadException
try:
class_or_func_name = definition.split('(')[0]
replaced_with_dict = 'dict' + definition[len(class_or_func_name) :]
docstring_definitions[var_name + '_kw'] = eval(replaced_with_dict)
except Exception:
docstring_definitions[var_name + '_kw'] = None
else:
docstring_definitions[var_name] = eval(definition)
return docstring_definitions
class OMFITgoogleSheetSetupError(ValueError):
pass
class OMFITgoogleSheetBadKeyfileError(OMFITgoogleSheetSetupError):
pass
class OMFITexperimentTableSetupError(OMFITgoogleSheetSetupError):
pass
class OMFITgoogleSheet(OMFITtree):
"""
Connects to Google sheets and provides some convenience features
* Lookup columns by column header name instead of column number
* Local caching to allow repeated read operations without too many connections to the remote sheet.
* Throttle to avoid throwing an exception due to hitting Google's rate limiter,
      or pygsheets' heavy-handed rate-limit obedience protocol.
An example sheet that is compatible with the assumptions made by this class may be found here:
https://docs.google.com/spreadsheets/d/1MJ8cFjFZ2pkt4OHWWIciWT3sM2hSADqzG78NDiqKgkU/edit?usp=sharing
A sample call that should work to start up an OMFITgoogleSheet instance is:
>> gsheet = OMFITgoogleSheet(
>> keyfile=os.sep.join([OMFITsrc, '..', 'samples', 'omfit-test-gsheet_key.json']),
>> sheet_name='OmfitDataSheetTestsheet',
>> subsheet_name='Sheet1', # Default: lookup first subsheet
>> column_header_row_idx=5, # Default: try to guess
>> units_row_idx=6, # Default: None
>> data_start_row_idx=7, # Default: header row + 1 (no units row) or + 2 (if units row specified)
>> )
This call should connect to the example sheet. This is more than an example; this is a functional call
that is read out of the docstring by the regression test and testing will fail if it doesn't work properly.
"""
def __init__(
self,
filename='',
keyfile=None,
sheet_name=None,
subsheet_name=None,
column_header_row_idx=None,
column_header_row_number=None,
units_row_idx=None,
units_row_number=None,
data_start_row_idx=None,
data_start_row_number=None,
**kw,
):
"""
:param filename: str
Not used, but apparently required when subclassing from OMFITtree.
:param keyfile: str or dict-like
Filename with path of the file with authentication information,
or dict-like object with the parsed file contents (OMFITjson should work well).
See setup_help for help setting this up.
:param sheet_name: str
Name of the Google sheets file/object/whatever to access
:param subsheet_name: str
Sheet within the sheet (the tabs at the bottom). Defaults to the first tab.
:param column_header_row_idx: int
Index (from 0) of the row with column headers. If not specified, we will try to guess for you.
Indices like this are stored internally.
:param column_header_row_number: int
Number (from 1) of the row with column headers, as it appears in the sheet.
Ignored if column_header_row_idx is specified. If neither is specified, we will try to guess for you.
This will be converted into an index (from 0) for internal use.
:param units_row_idx: int or None
Index (from 0) of the row with the units, if there is one, or None if there isn't a row for units.
:param units_row_number: int or None
Number (from 1) of the units row. See description of column_header_row_number.
:param data_start_row_idx: int
Index (from 0) of the row where data start.
Defaults to column_header_row + 1 if units_row is None or +2 if units_row is not None.
        :param data_start_row_number: int
Number (from 1) of the first row of data after the header.
See description of column_header_row_number.
:param kw: additional keywords passed to super class's __init__
"""
self.OMFITproperties = {}
super().__init__(filename=filename, **kw)
# Make a scratch space for objects that don't need to be / can't be saved
self['scratch'] = OMFITtmp()
self['scratch']['client'] = None
self['scratch']['sheet_connection'] = None
# Cache setup
self['__cache__'] = OMFITtmp()
self['cache_expiration'] = 30 # seconds
# Throttle setup
self['previous_write_times'] = np.array([])
self['throttle_interval_s'] = 100.0
self['throttle_requests_in_interval'] = 100
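        # That is, with these defaults, at most 100 requests are allowed in any rolling 100 s window;
        # obey_rate_limit() sleeps as needed to stay under this local limit.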
# Interpret input keywords
self['keyfile'] = keyfile
self['sheet_name'] = sheet_name
self['subsheet_name'] = subsheet_name
if column_header_row_idx is None and column_header_row_number is not None:
column_header_row_idx = column_header_row_number - 1
self['column_header_row'] = column_header_row_idx
if units_row_idx is None and units_row_number is not None:
units_row_idx = units_row_number - 1
self['units_row'] = units_row_idx
if data_start_row_idx is None and data_start_row_number is not None:
data_start_row_idx = data_start_row_number - 1
self['data_start_row'] = data_start_row_idx
def __tree_repr__(self):
if pygsheets is None:
return 'Import failed: pygsheets. No connection possible.', []
if isinstance(self['scratch'].get('sheet_connection', None), pygsheets.Worksheet):
connection_status = 'connected'
else:
connection_status = '(NOT connected)'
subsheet = self.get('subsheet_name', None)
if subsheet is None:
subsheet = 'TBD'
description = (
f"{self.get('sheet_name', '')}/{subsheet} {connection_status} "
f"{self['column_header_row']}|{self['units_row']}|{self['data_start_row']}"
)
return description, []
def __call__(self):
"""This seems like a useful thing to do"""
self.connect()
    def record_request(self):
"""Logs a request to the Google API so the rate can be avoided by local throttling"""
self['previous_write_times'] = np.append(self['previous_write_times'], time.time())
    def self_check(self, essential_only=False):
"""
Makes sure setup is acceptable and raises AssertionError otherwise
:param essential_only: bool
Skip checks that aren't essential to initializing the class and its connection.
This avoids spamming warnings about stuff that might get resolved later in setup.
"""
problems = []
issues = []
# noinspection PyBroadException
try:
# Essential checks
if self['keyfile'] is None:
problems += ['keyfile missing: Need filename (with path) of the key for accessing Google sheets.']
if self['sheet_name'] is None:
problems += ['sheet_name missing: Please specify the name of the sheet to access.']
# More checks
if not essential_only:
if self['column_header_row'] is None:
issues += ['column_header_row missing: required for most operations.']
except Exception as excp:
problems += [f'Encountered exception while trying to check stuff: {excp}']
# Report problems
if len(issues):
fissues = '\n'.join(issues)
print(f'Warning: there were {len(issues)} issues that may prevent full functionality:\n{fissues}')
fproblems = '\n'.join(problems)
if problems:
self.help()
assert len(problems) == 0, f'There were some setup problems:\n{fproblems}'
    def authorize_client(self):
"""
Deal with keyfile being allowed to be either a string or a dict-like
:return: pygsheets.client.Client instance
"""
if is_string(self['keyfile']):
return pygsheets.authorize(service_account_file=self['keyfile'])
elif isinstance(self['keyfile'], dict):
from google.oauth2 import service_account
clean_keyfile = dict(self['keyfile']) # Things get weird if keyfile is an OMFITjson; simplify to dict
scopes = ('https://www.googleapis.com/auth/spreadsheets', 'https://www.googleapis.com/auth/drive')
creds = service_account.Credentials.from_service_account_info(clean_keyfile, scopes=scopes)
return pygsheets.authorize(custom_credentials=creds)
else:
raise OMFITgoogleSheetBadKeyfileError(
f"Could not use keyfile of type {type(self['keyfile'])}; should be either a str or dict-like"
)
    def connect(self, attempt=0):
"""Establishes connection to google sheet"""
self.self_check(essential_only=True)
if pygsheets is None:
exc = AssertionError("Failed to import required packages: can't connect to google.")
self['scratch']['sheet_connection'] = exc
printe('Unable to connect because of missing packages')
return
print('Connecting to google sheets...')
try:
self['scratch']['client'] = self.authorize_client()
# .open() here is not the same as file.open(). Do not put in `with`.
opened_client = self['scratch']['client'].open(self['sheet_name'])
self['subsheet_name'] = self['subsheet_name'] or opened_client.worksheets()[0].title
            self['scratch']['sheet_connection'] = opened_client.worksheet_by_title(self['subsheet_name'])
except Exception as exc:
self['scratch']['sheet_connection'] = exc
if attempt < 1:
wait_time = 2.0
printw(f'First attempt to connect failed; waiting {wait_time} seconds and trying again. ({exc})')
time.sleep(wait_time)
self.connect(attempt=attempt + 1)
else:
printe(f'Failed to connect ({exc})')
self['scratch']['sheet_connection'] = exc
else:
self.record_request()
self.post_connect()
    def sheet_connection(self):
"""
Attempt to connect on demand and return a Worksheet object
:return: Worksheet instance
"""
if self['scratch'].get('sheet_connection', None) is None:
self.connect()
return self['scratch'].get('sheet_connection', None)
    def post_connect(self):
"""Fill in some handy information; usually desired after connecting"""
if self['column_header_row'] is None:
self.guess_header_row()
default_data_row = self['column_header_row'] + 1 + (self['units_row'] is not None)
self['data_start_row'] = self['data_start_row'] or default_data_row
self.get_columns()
    @staticmethod
def help():
"""Prints help"""
print(setup_help)
    def direct_write(self, value, *args):
"""
Writes data to a single cell in the remote sheet
:param value: numeric or string
This will be written to the cell
:param args: address information, such as (column, row) or address string.
Examples: ('A', 1), (0, 0), 'A1'.
See interpret_row_col() for more information about how to specify row and column.
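        Example (illustrative; writes 42 to cell B2 of the connected sheet):
            gsheet.direct_write(42, 'B', 2)   # equivalent to gsheet.direct_write(42, 1, 1) or gsheet.direct_write(42, 'B2')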
"""
from omas.omas_core import force_imas_type
column_index, row_index, address = interpret_row_col(*args)
self.sheet_connection().update_value(address, force_imas_type(value))
self.record_request()
self['__cache__']['changed'] = True
    def direct_read(self, *args, formatted=False):
"""
Reads data from a single cell in the remote sheet
:param args: passed to interpret_row_col
Provide address information here, like ('A', 1), (0, 0), or 'A1'
:param formatted: bool
Get the formatted value (floats will be rounded according to sheet display settings)
:return: str or number
If formatted = True: str
                Might contain something that can be trivially cast as int or float, but it's always in a string
If formatted = False: str or number
With formatting off, numbers may be returned
"""
column_index, row_index, address = interpret_row_col(*args)
self.obey_rate_limit()
cell = self.sheet_connection().cell(address)
self.record_request()
if formatted:
return cell.value
else:
return cell.value_unformatted
    def time_until_throttle_okay(self):
"""Checks history of previous requests and decides how long we must wait before making a new request"""
# pygsheets has a seconds_per_quota setting, but all it does is wait that long if it hits an exception.
# It won't be smart and figure out that you only have to wait N seconds to be under the rate again.
t0 = time.time()
self['scratch']['t0'] = t0
recent_requests = np.sum((t0 - self['previous_write_times']) < self['throttle_interval_s'])
if recent_requests < self['throttle_requests_in_interval']:
printd(f'Number of recent requests is only {recent_requests}, so there is no need to wait.')
wait_time = 0
else:
while np.max((t0 - self['previous_write_times']) - self['throttle_interval_s']) > 0:
# Remove old values that are past timeout
self['previous_write_times'] = self['previous_write_times'][1:]
# How long did that stuff take? I don't know, let's update t0 just in case we wasted time
t0 = time.time()
since = t0 - self['previous_write_times']
since_interval = since - self['throttle_interval_s']
possible_wait_times = np.arange(self['throttle_interval_s'] + 1)
since_interval_after_wait = since_interval[:, np.newaxis] + possible_wait_times[np.newaxis, :]
in_interval_after_wait = np.sum(since_interval_after_wait <= 0, axis=0)
acceptable = in_interval_after_wait < self['throttle_requests_in_interval']
wait_time = possible_wait_times[np.where(acceptable)[0][0]]
        self['scratch']['since'] = since
self['scratch']['since_interval'] = since_interval
self['scratch']['since_interval_after_wait'] = since_interval_after_wait
self['scratch']['in_interval_after_wait'] = in_interval_after_wait
self['scratch']['acceptable'] = acceptable
return wait_time
    def obey_rate_limit(self):
"""Waits as needed to avoid rate limit"""
wait_time = self.time_until_throttle_okay()
if wait_time > 0:
printw('Connections are being throttled to avoid an error due to too many requests to Google')
print(f'Waiting {wait_time} seconds before making another request...')
time.sleep(wait_time)
    def throttled_write(self, value, *args):
"""
Checks whether it's okay to make a request and waits if necessary to avoid an error
:param value: numeric or string
        :param args: arguments to pass to direct_write()
"""
self.obey_rate_limit()
self.direct_write(value, *args)
    def throttled_read(self, *args, **kw):
"""
Checks whether it's okay to make a request and waits if necessary to avoid an error
:param args: arguments to pass to direct_read()
:param kw: keywords to pass to direct_read()
:return: value returned from direct_read()
"""
self.obey_rate_limit()
        return self.direct_read(*args, **kw)
    def get_columns(self):
"""Gets a list of column names from the header row"""
if self['column_header_row'] is not None:
self.obey_rate_limit()
self['columns'] = self.sheet_connection().get_row(self['column_header_row'] + 1)
self.record_request()
else:
print('Cannot get columns until column_header_row is specified')
    def find_column(self, column_name, quiet=False):
"""
Finds the column index corresponding to a column name
:param column_name: str
Column header name (should be in row given by column_header_row)
:param quiet: bool
No printing
        :return: int (success), list of int (if more than one column matches), or None (failed to find column)
            Column index (from 0; not number from 1)
"""
indices = np.where(np.array(self['columns']) == column_name)[0]
if len(indices) > 1:
print(f'WARNING! More than one column matches {column_name}! They are: {indices}')
return [idx for idx in indices]
elif len(indices) == 0:
if not quiet:
print(f"Error! Column {repr(column_name)} not found in {self['columns']}")
return None
else:
return int(indices[0])
    def update_cache(self, force=True):
"""
Updates the cached representation if forced or if it seems like it's a good idea
If it's been a while since the cache was updated, do another update.
If we know there was a write operation since the last update, do another update.
If the cache is missing, grab the data and make the cache.
If force=True, do another update.
:param force: bool
Force an update, even if other indicators would lead to skipping the update
"""
dt = time.time() - self['__cache__'].get('time', 0)
update_needed = (
force
or self['__cache__'].get('changed', True)
or (dt > self['cache_expiration'])
or (self['__cache__'].get('values', None) is None)
)
if not update_needed:
return
self.obey_rate_limit()
self['__cache__']['values'] = self.sheet_connection().get_all_values(
returnas='matrix',
majdim='COLS',
include_tailing_empty=True,
include_tailing_empty_rows=True,
value_render='UNFORMATTED_VALUE',
date_time_render_option='SERIAL_NUMBER',
)
self.record_request()
self['__cache__']['time'] = time.time()
self['__cache__']['changed'] = False
    def get_column(self, column, force_update_cache=False, disable_cache=False):
"""
Gets data from a column
By default, the local cache will be checked & updated if needed and then data will be read from the cache.
Caching can be disabled, which will result in more connections to the remote sheet. Some other methods have
been programmed assuming that local caching done here will save them from otherwise inefficient layout of
calls to this function.
:param column: int or str
Column index (from 0) or column letter (like 'A', 'B', ... 'ZZ')
:param force_update_cache: bool
Force the cache to update before reading
:param disable_cache: bool
Don't go through the local cache; read the remote column directly
:return: list
Data from the column
"""
col_idx = interpret_row_col(column, 1)[0]
if disable_cache:
self.obey_rate_limit()
tmp = self.sheet_connection().get_col(col_idx + 1)
self.record_request()
return tmp
self.update_cache(force=force_update_cache)
return self['__cache__']['values'][col_idx]
    def get_column_by_name(self, column_name, **kw):
"""
Gets data from a column by name
:param column_name: str
Name of the column in the header row
:param kw: additional parameters passed to get_column
:return: list
Data from the column
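        Example (illustrative; 'Shot' is a header that exists in the linked test sheet):
            shots_column = gsheet.get_column_by_name('Shot')   # whole column, including header/units cells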
"""
return self.get_column(self.find_column(column_name), **kw)
    def get_row(self, row_idx, force_update_cache=False, disable_cache=False):
"""
Gets data from a row
By default, cached results are used after updating the local cache as needed (see get_column for details).
:param row_idx: int
Row index (from 0)
:param force_update_cache: bool
Force the cache to update before reading
:param disable_cache: bool
Don't go through the local cache; read the remote row directly
:return: array
Data from the row
"""
if disable_cache:
self.obey_rate_limit()
tmp = self.sheet_connection().get_row(row_idx + 1)
self.record_request()
return np.array(tmp)
self.update_cache(force=force_update_cache)
return np.array(self['__cache__']['values']).T[row_idx]
# End of class OMFITgoogleSheet
class OMFITexperimentTable(OMFITgoogleSheet):
"""
Extends OMFITgoogleSheet by assuming each row corresponds to a shot, allowing more features to be provided.
    This is less general (a 'Shot' column must now exist), but it may be more convenient
to look up data by shot this way. With more assumptions about the structure
of the sheet, more methods can be implemented.
Many methods go further to assume that there is a column named 'Device' and
that there are two columns that can be used to determine a time range. Parts
of the class should work on sheets without this structure, but others will
fail.
An example sheet that is compatible with the assumptions made by this class may be found here:
https://docs.google.com/spreadsheets/d/1MJ8cFjFZ2pkt4OHWWIciWT3sM2hSADqzG78NDiqKgkU/edit?usp=sharing
    A sample call that should work to start up an OMFITexperimentTable instance is:
>> xtable = OMFITexperimentTable(
>> keyfile=os.sep.join([OMFITsrc, '..', 'samples', 'omfit-test-gsheet_key.json']),
>> sheet_name='OmfitDataSheetTestsheet',
>> subsheet_name='Sheet1', # Default: lookup first subsheet
>> column_header_row_idx=5, # Default: try to guess
>> units_row_idx=6, # Default: search for units row
>> data_start_row_idx=7, # Default: find first number after the header row in the shot column
>> )
This call should connect to the example sheet. These data are used in regression tests and should be updated
    if the test sheet is changed.
"""
def __init__(self, *args, **kw):
"""
        :param args: positional arguments passed to the parent class: OMFITgoogleSheet
        :param kw: keyword arguments passed to the parent class: OMFITgoogleSheet
"""
        self.OMFITproperties = {}
data_start_row_idx = kw.get('data_start_row_idx', None)
if data_start_row_idx is None and kw.get('data_start_row_number', None) is not None:
data_start_row_idx = kw['data_start_row_number'] - 1
self.input_data_start_row_idx = data_start_row_idx
super().__init__(*args, **kw)
self.shots_raw = None
    def post_connect(self):
"""
Do some inspection on the sheet after connecting to it
"""
super().post_connect()
shot_col_idx = self.find_column('Shot')
if shot_col_idx is None:
raise OMFITexperimentTableSetupError(
f"The sheet must contain a \"Shot\" column:\n"
f"one cell in the header row must contain the exact text \"Shot\".\n"
f"Was the header row specified correctly?\n"
f"column_header_row_idx (currently {self['column_header_row']}) counts from 0,\n"
f"while column_header_row_number (currently {self['column_header_row'] + 1}) counts from 1. "
f"({self['column_header_row'] + 1} is what appears in Google's interface)"
)
self.shots_raw = self.get_column(shot_col_idx)
self['units_row'] = self['units_row'] or self.find_units_row()
self['data_start_row'] = self.input_data_start_row_idx or self.find_data_start_row()
self['shots'] = self.shots_raw[self['data_start_row'] :]
# Ensure there are no trailing blank entries in the list of shots
notblank = np.where([len(str(shot)) > 0 for shot in self['shots']])[0]
if len(notblank):
self['shots'] = self['shots'][: notblank[-1] + 1]
    def self_check(self, essential_only=False):
# Add additional checks here
super().self_check(essential_only=essential_only)
    def find_units_row(self):
"""
Recommended: put a row of units under the column headers (data won't start on the next row after headers)
Were recommendations followed? See if 1st row under headers has "units" under a
column named Shot, id, or ID. In this case, data start on the row after units.
This function is easier to implement under OMFITexperimentTable instead of
OMFITgoogleSheet because we are allowed to make additional assumptions about the
content of the table.
If there is no cell containing just 'units', 'Units', or 'UNITS', but you do
have a row for units, you should specify units_row at init.
:return: int
Index (from 0) of the row with units
"""
if self.shots_raw is None:
self.connect()
# If one of the entries under "Shot" is just 'units', this is easy. Just find it.
units_names = ['units', 'Units', 'UNITS']
for uu in units_names:
if uu in self.shots_raw:
return np.where(np.array(self.shots_raw) == uu)[0][0]
# So 'units' wasn't written under 'Shot'. Maybe under 'id' or something?
col_names = ['id', 'ID', 'Device']
for col_name in col_names:
col_idx = self.find_column(col_name, quiet=True)
if col_idx is None:
continue
self.obey_rate_limit()
raw_col_values = self.sheet_connection().get_col(col_idx + 1)
self.record_request()
for uu in units_names:
if uu in raw_col_values:
return np.where(np.array(raw_col_values) == uu)[0][0]
# Still didn't find it? Maybe a "units" row wasn't included.
return None
    def find_data_start_row(self):
"""
Determine the first row of data.
Allowing this to be different from header + 1 means meta data can be inserted under the column headers.
Since this class imposes the requirement that the "Shot" column exists & has shot numbers, we can find the
first valid number in that column, after the header row, and be safe in assuming that the data start there.
To allow for nonstandard configurations of blank cells, please specify data_start_row manually in init.
:return: int
Index (from 0) of the row where data start.
"""
if self.shots_raw is None:
self.connect()
for i, val in enumerate(self.shots_raw[self['column_header_row'] :]):
if i == 0:
continue # Just skip the header row
try:
int(val)
except ValueError:
pass
else:
return self['column_header_row'] + i
# Didn't find a value that could be cast as int after the header
return self['column_header_row'] + (self['units_row'] is not None) + 1
    def extract_column(self, column, pad_with='', force_type=None, fill_value=np.NaN, fill_if_not_found='not found'):
"""
Downloads data for a given column and returns them with units
:param column: str or int
str: Name of the column; index will be looked up
int: Index of the column, from 0. Name of the column can be looked up easily.
:param pad_with: str
Value to fill in to extend the column if it is too short.
Truncation of results to the last non-empty cell is possible, meaning columns
can have inconsistent lengths if some have more empty cells than others.
:param force_type: type [optional]
If provided, pass col_values through list_to_array_type() to force them to match force_type.
:param fill_value: object
Passed to list_to_array_type(), if used; used to fill in cells that can't be forced to force_type.
It probably would make sense if this were the same as pad_with, but it doesn't have to be.
It does have to be of type force_type, though.
:param fill_if_not_found: str
Value to fill in if data are not found, such as if column_name is None.
:return: (array, str or None)
values in the column, padded or cut to match the number of shots
replaced by `fill_if_not_found` in case of bad column specifications
replaced by `fill_value` for individual cells that can't be forced to type `force_type`
extended by `pad_with` as needed
units, if available or None otherwise
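        Example (illustrative; 'density' and 't min' are column headers in the linked test sheet):
            density, density_units = xtable.extract_column('density', force_type=float)
            tmin_values, _ = xtable.extract_column('t min', force_type=float)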
"""
if 'shots' not in self:
self.connect()
ns = len(self['shots'])
if is_string(column):
column_name = column
column_idx = self.find_column(column_name)
elif isinstance(column, (int, np.integer)):
column_idx = column
column_name = self['columns'][column_idx]
else:
raise ValueError('Invalid column specification; should be string (column_name) or int (column_index)')
        if column_name is None or column_idx is None:
return np.array([fill_if_not_found] * ns), None
col_values0 = self.get_column(column_idx)
deficit = (ns + self['data_start_row']) - len(col_values0)
if deficit > 0:
col_values0 += [pad_with] * deficit
elif deficit < 0:
col_values0 = col_values0[:deficit]
if self.get('units_row', None) is not None:
units = col_values0[self['units_row']]
else:
units = None
col_values = col_values0[self['data_start_row'] :]
# print(f'units of {column_name} are listed as {repr(units)}')
if force_type is not None:
col_values = list_to_array_type(col_values, type_out=force_type, fill_value=fill_value)
else:
col_values = np.array(col_values)
return col_values, units
    def interpret_timing(self, **signal):
"""
Interprets timing information in the signal dictionary for write_mds_data_to_table().
Either tmin and tmax or time and dt are required to be in the dictionary as keys.
See write_mds_data_to_table() documentation for more information.
:param signal: Just pass in one of the dictionaries in the list of signals.
:return: (float array, float array)
Array of start times and array of end times in ms. One entry per row.
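        Example (illustrative; the column names follow the sample in write_mds_data_to_table()):
            tmin, tmax = xtable.interpret_timing(tmin='t min', tmax='t max')   # per-shot windows read from columns
            tmin, tmax = xtable.interpret_timing(time=3000.0, dt=500.0)        # the same 2500-3500 ms window for every shot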
"""
tmin = signal.get('tmin', None)
tmax = signal.get('tmax', None)
t = signal.get('time', None)
dt = signal.get('dt', None)
if tmin is not None and tmax is not None:
if is_string(tmin):
tmin_name = tmin
tmin, _ = self.extract_column(tmin, force_type=float)
else:
tmin_name = None
tmin = np.zeros(len(self['shots'])) + float(tmin)
if is_string(tmax):
tmax_name = tmax
tmax, _ = self.extract_column(tmax, force_type=float)
else:
tmax_name = None
tmax = np.zeros(len(self['shots'])) + float(tmax)
print(f'Got timing from tmin (column {tmin_name}) and tmax (column {tmax_name})')
elif t is not None and dt is not None:
if is_string(t):
time_name = t
t, _ = self.extract_column(t, force_type=float)
else:
time_name = None
t = np.zeros(len(self['shots'])) + float(t)
if is_string(dt):
dt_name = dt
dt, _ = self.extract_column(dt, force_type=float)
else:
dt_name = None
dt = np.zeros(len(self['shots'])) + float(dt)
tmin = t - dt
tmax = t + dt
print(f'Got timing from time (column {time_name}) and dt (column {dt_name})')
else:
raise ValueError('Need a valid pair of tmin & tmax or time & dt')
return tmin, tmax
    def interpret_signal(self, i, device_servers, **signal):
"""
Interprets signal specification data for write_mds_data_to_table()
:param i: int
Row / shot index
:param device_servers: dict-like
Dictionary listing alternative servers for different devices, like {'EAST': 'EAST_US'}
:param signal: Just pass in one of the dictionaries in the list of signals.
:return: tuple containing:
server: str
tree: str or None
tdi: str
factor: float
tfactor: float
device: str
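        Example (illustrative; sample_signal_request is the dict shown in write_mds_data_to_table()'s docstring):
            server, tree, tdi, factor, tfactor, device = xtable.interpret_signal(
                0, dict(EAST='EAST_US'), **sample_signal_request
            )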
"""
devices = self.extract_column('Device')[0]
column_name = signal['column_name'] # str, required
server = device_servers.get(devices[i], devices[i])
# Device-specific TDIs and treenames are possible if these are set as dictionaries.
# Universal TDIs and treenames are easy; just don't put in a dictionary.
# treename of None means PTDATA (d3d only).
# TDI of None means re-use column_name (so you can put pointnames in as column_names and type this in once)
def get_the_thing(name, default):
info = signal.get(name, default)
if isinstance(info, dict):
thingy = info.get(devices[i], default)
else:
thingy = info
return thingy
tree = get_the_thing('treename', None)
tdi = get_the_thing('TDI', column_name)
tdi = tdi or column_name
factor = get_the_thing('factor', 1.0)
tfactor = get_the_thing('tfactor', 1.0)
return server, tree, tdi, factor, tfactor, devices[i]
    def write_mds_data_to_table(self, signals, overwrite=False, device_servers=None):
"""
Gets MDSplus data for a list of signals, performs operations on specified time ranges, & saves results.
This sample signal can be used as an instructional example or as an input for testing
>> sample_signal_request = dict(
>> column_name='density', # The sheet must have this column (happens to be a valid d3d ptname)
>> TDI=dict(EAST=r'\dfsdev'), # d3d's TDI isn't listed b/c TDI defaults to column_name
>> treename=dict(EAST='PCS_EAST'), # DIII-D isn't listed b/c default is None, which goes to PTDATA
>> factor={'DIII-D': 1e-13}, # This particular EAST pointname doesn't require a factor; defaults to 1
>> tfactor=dict(EAST=1e3), # d3d times are already in ms
>> tmin='t min', # str means read timing values from column w header exactly matching this string
>> tmax='t max',
>> )
This sample should work with the example/test sheet.
If the sheet in question had ONLY DIII-D data, the same signal request could be accomplished via:
>> simple_sample_signal_request = dict(column_name='density', factor=1e-13, tmin='t min', tmax='t max')
This sample won't work with the test sheet; it will fail on the EAST shots.
We are exploiting the shortcut that we've used a valid pointname (valid for d3d at least)
as the column header. If you want fancy column names, you have to specify TDIs.
We are also relying on the defaults working for this case.
The sample code in this docstring is interpreted and used by the regression test, so don't break it.
Separate different samples with non-example lines (that don't start with >>)
:param signals: list of dict-likes
Each item in this list should be a dict-like that contains information needed to fetch & process data.
SIGNAL SPECIFICATION
- column_name: str (hard requirement)
Name of the column within the sheet
- TDI: None, str, or dict-like (optional if default behavior (reuse column_name) is okay)
None: reuse column_name as the TDI for every row
str: use this str as the pointname/TDI for every row
dict-like: keys should be devices. Each device can have its own pointname/TDI.
                    The sheet must have a Device column. If a device is missing, it will inherit the column_name.
- treename: None, str, or dict-like (optional if default behavior (d3d ptdata) is okay)
None: PTDATA (DIII-D only)
str: use this string as the treename for every row
                dict-like: device specific treenames. The sheet must have a Device column.
- factor: float or dict-like (defaults to 1.0)
float: multiply results by this number before writing
dict-like: multiply results by a device specific number before writing (unspecified devices get 1)
- tfactor: float or dict-like (defaults to 1.0)
float: multiply times by this number to get ms
dict-like: each device gets a different factor used to put times in ms (unspecified devices get 1)
PROCESSING
- operation: str (defaults to 'mean')
Operation to perform on the gathered data. Options are:
- 'mean'
- 'median'
- 'max'
- 'min'
- tmin: float or str
Start of the time range in ms; used for processing operations like average.
Must be paired with tmax.
A usable tmin/tmax pair takes precedence over time+dt.
A float is used directly. A string triggers lookup of a column in the sheet; then every row gets
its own number determined by its entry in the specified column.
- tmax: float or str
End of the time range in ms. Must be paired with tmin.
- time: float or str
Center of a time range in ms. Ignored if tmin and tmax are supplied. Must be paired with dt.
- dt: float or str
Half-width of time window in ms. Must be paired with time.
:param overwrite: bool
Update the target cell even if it's not empty?
:param device_servers: dict-like [optional]
Provide alternative MDS servers for some devices. A common entry might be {'EAST': 'EAST_US'}
to use the 'EAST_US' (eastdata.gat.com) server with the 'EAST' device.
"""
# I'm assuming that caching in get_column() will save me from inefficient organization of read operations
# In order to be good without caching, this function should figure out all the data it needs and read them
# all up-front. That is, it should build its own temporary cache instead of relying on external caching.
# But doing that adds a lot of development work and increases the maintenance burden, so I'm going to rely
# on external caching instead.
dserv = device_servers or {}
if self.get('shots', None) is None:
self.connect()
shots = self['shots']
for signal in signals:
column_name = signal['column_name']
column_idx = self.find_column(column_name)
column_values, column_units = self.extract_column(column_name)
tmin, tmax = self.interpret_timing(**signal)
for i, shot in enumerate(shots):
if (not overwrite) and len(str(column_values[i])) > 0:
print(f'Cell {column_name} {shot} already has a value and overwrite is off; skipping')
continue
server, tree, tdi, factor, tf, _ = self.interpret_signal(i, device_servers=dserv, **signal)
mds = OMFITmdsValue(server, shot=shot, treename=tree, TDI=tdi)
if not mds.check():
print(f'Bad MDS connection for {server}#{shot} {tree} / {tdi}; skipping')
continue
sel = ((mds.dim_of(0) * tf) >= tmin[i]) & ((mds.dim_of(0) * tf) <= tmax[i])
data = mds.data()[sel] * factor
value = operate_on_data(data, signal.get('operation', 'mean'))
self.throttled_write(value, column_idx, self['data_start_row'] + i)
    def write_efit_result(self, pointnames=None, **kw):
r"""
Wrapper for write_mds_data_to_table for quickly setting up EFIT signals.
Assumes that device-specific variations on EFIT pointnames and primary trees
are easy to guess, and that you have named your columns to exactly match EFIT
pointnames in their "pure" form (no leading \).
Basically, it can build the signal dictionaries for you given a list of
pointnames and some timing instructions.
Here is a set of sample keywords that could be passed to this function:
>> sample_kw = dict(
>> pointnames=['betan'],
>> tmin='t min', # Can supply a constant float instead of a string
>> tmax='t max',
>> overwrite=True,
>> device_servers=dict(EAST='EAST_US'), # Probably only needed for EAST
>> )
These should work in xtable.write_efit_result where xtable is an
OMFITexperimentTable instance connected to a google sheet that contains columns
with headers 'betan', 't min', and 't max'
        :param pointnames: list of strings
Use names like betan, etc. This function will figure out whether you really need \betan instead.
:param kw: more settings, including signal setup customizations.
* MUST INCLUDE tmin & tmax OR time & dt!!!!!!!!! <---- don't forget to include timing data
* Optional signal customization: operation, factor
* Remaining keywords (other than those listed so far) will be passed to write_mds_data_to_table
* Do not include column_name, TDI, treename, or tfactor, as these are determined by this function.
If you need this level of customization, just use write_mds_data_to_table() directly.
"""
signal_details = {tmp: kw.pop(tmp, None) for tmp in ['tmin', 'tmax', 'time', 'dt', 'operation', 'factor']}
for sd in list(signal_details):
if signal_details[sd] is None:
signal_details.pop(sd)
signals = [
dict(
column_name=pointname,
TDI={'DIII-D': pointname, 'EAST': rf'\{pointname}'},
treename=dict(EAST='EFIT_EAST'),
tfactor=dict(EAST=1e3),
**signal_details,
)
for pointname in pointnames
]
self.write_mds_data_to_table(signals, **kw)
# End of class OMFITexperimentTable
############################################
if __name__ == '__main__':
# Basic class import test
if pygsheets is not None:
test_classes_main_header()
test_sheet_data = get_definition_from_docstring(OMFITexperimentTable, return_kw=True)['xtable_kw']
et = OMFITexperimentTable(
keyfile=os.sep.join([OMFITsrc, '..', 'samples', 'omfit-test-gsheet_key.json']),
sheet_name=test_sheet_data['sheet_name'],
column_header_row_idx=test_sheet_data['column_header_row_idx'],
units_row_idx=test_sheet_data['units_row_idx'],
data_start_row_idx=test_sheet_data['data_start_row_idx'],
)