# Author: Cameron F. Abrams <cfa22@drexel.edu>
"""
.. module:: pdbrecord
   :synopsis: defines the PDBRecord class

.. moduleauthor:: Cameron F. Abrams <cfa22@drexel.edu>
"""
from __future__ import annotations
from collections import UserList, UserDict
from enum import Enum, auto
from .baserecord import BaseRecord, BaseRecordParser
from .baseparsers import StringParser
import logging
logger = logging.getLogger(__name__)
[docs]
class tokengroup:
    """
    A labelled collection of tokens, each stored as an instance attribute.

    The group is created from its first token. When ``determinant`` is true
    the label is ``'<tokname>.<tokval>'``; otherwise it is just ``'<tokname>'``.
    """

    def __init__(self, tokname, tokval, determinant=True):
        label = f'{tokname}.{tokval}' if determinant else f'{tokname}'
        self.label = label
        # The label is assigned first, so a token literally named 'label'
        # overwrites it.
        self.add_token(tokname, tokval)

    def add_token(self, tokname, tokval):
        """
        Store a token on this group.

        Parameters
        ----------
        tokname : str
            The name of the token; becomes the attribute name.
        tokval : str
            The value of the token.
        """
        vars(self)[tokname] = tokval
[docs]
class PDBRecord(BaseRecord):
    """
    A class representing a PDB record, inheriting from :class:`.baserecord.BaseRecord`.
    It provides methods for parsing and handling PDB records, including embedded records and tokens.
    """

    # Class-level default for the continuation field.
    continuation = '0'

    @classmethod
    def base_parse(cls, current_key, pdbrecordline: str, current_format: dict, typemap: dict):
        """
        Parse a PDB record line based on the provided format and type mapping.

        The line is parsed with a ``StringParser`` built from the format's
        ``fields`` and ``allowed`` entries. Fields named under ``concatenate``
        are then gathered into list-valued fields. If the format declares
        ``subrecords``, the value of the ``branchon`` field selects a subrecord
        format and the same line is re-parsed with it (recursively), the key
        becoming ``'<current_key>.<branch value>'``.

        Parameters
        ----------
        current_key : str
            The key for the current record being parsed.
        pdbrecordline : str
            The line from the PDB file to be parsed.
        current_format : dict
            The format dictionary defining the structure of the PDB record.
        typemap : dict
            A dictionary mapping field names to their types.

        Returns
        -------
        tuple
            A tuple containing the parsed input dictionary, the current key, and the current format
            (those of the innermost subrecord when subrecords were followed).

        Raises
        ------
        AssertionError
            If a concatenation source field is missing, or the subrecords
            specification is incomplete or lacks a required format.
        """
        local_record_format = current_format.copy()
        fields = local_record_format.get('fields', {})
        subrecords = local_record_format.get('subrecords', {})
        allowed_values = local_record_format.get('allowed', {})
        concats = local_record_format.get('concatenate', {})
        input_dict = StringParser(fields, typemap, allowed=allowed_values).parse(pdbrecordline)
        for cfield, subf in concats.items():
            # NOTE(review): the source this was recovered from had lost its
            # indentation; the gathering loop is assumed to run only when the
            # concatenated field is newly created -- confirm against upstream.
            if cfield not in input_dict:
                input_dict[cfield] = []
                for f in subf:
                    assert f in input_dict, f'{current_key} specifies a field for concatenation ({f}) that is not found'
                    if input_dict[f]:
                        input_dict[cfield].append(input_dict[f])
        if subrecords:
            assert 'formats' in subrecords, f'{current_key} is missing formats from its subrecords specification'
            assert 'branchon' in subrecords, f'{current_key} is missing specification of base key from its subrecords specification'
            branch_field = subrecords['branchon']
            assert branch_field in input_dict, f'{current_key} specifies a base record that is not found'
            branch_value = input_dict[branch_field]
            required = subrecords.get('required', True)
            if required or branch_value in subrecords['formats']:
                assert branch_value in subrecords['formats'], f'Key "{current_key}" is missing specification of a required subrecord format for field "{branch_field}" value "{branch_value}" from its subrecords specification'
                subrecord_format = subrecords['formats'][branch_value]
                new_key = f'{current_key}.{branch_value}'
                input_dict, current_key, current_format = PDBRecord.base_parse(new_key, pdbrecordline, subrecord_format, typemap)
        return input_dict, current_key, current_format

    @classmethod
    def newrecord(cls, base_key: str, pdbrecordline: str, record_format: dict, typemap: dict):
        """
        Create a new PDBRecord instance from a PDB record line and its format.

        The line is right-padded with spaces to 80 characters, parsed with
        :meth:`base_parse`, and the resulting dictionary is used to construct
        the instance. The instance's ``key`` and ``format`` attributes are set
        to the key and format returned by :meth:`base_parse`.

        Parameters
        ----------
        base_key : str
            The base key for the PDB record.
        pdbrecordline : str
            The line from the PDB file to be parsed.
        record_format : dict
            The format dictionary defining the structure of the PDB record.
        typemap : dict
            A dictionary mapping field names to their types.

        Returns
        -------
        :class:`PDBRecord`
            A new instance of the PDBRecord class containing the parsed data.
        """
        # Pad in one step instead of appending one space per loop iteration.
        pdbrecordline = pdbrecordline.ljust(80)
        input_dict, current_key, current_format = PDBRecord.base_parse(base_key, pdbrecordline, record_format, typemap)
        # A format may name another field that carries the continuation value.
        continuation_custom_fieldname = current_format.get('continuation', None)
        if continuation_custom_fieldname:
            input_dict['continuation'] = str(input_dict[continuation_custom_fieldname])
        if input_dict.get('continuation', '') == '':
            input_dict['continuation'] = '0'
        inst = cls(input_dict)
        inst.key = current_key
        inst.format = current_format
        return inst

    def __repr__(self):
        return f'<PDBRecord {self.__dict__}>'

    def get_token(self, key):
        """
        Retrieve a token value from the PDBRecord instance based on the provided key.

        Parameters
        ----------
        key : str
            The key for the token to retrieve.

        Returns
        -------
        object or dict or None
            ``None`` if the record has no ``tokengroups`` attribute. If exactly
            one token group holds ``key``, that token's value. Otherwise a dict
            mapping token-group label to value (empty when no group holds
            ``key``).
        """
        if not hasattr(self, 'tokengroups'):
            return None
        values = {}
        for groups in self.tokengroups.values():
            for label, group in groups.items():
                if key in group.__dict__:
                    values[label] = group.__dict__[key]
        if len(values) == 1:
            return next(iter(values.values()))
        return values

    def continue_record(self, other, record_format, **kwargs):
        """
        Continue a PDBRecord instance with another PDBRecord instance.

        For every continuing field, the value held by ``other`` is merged into
        this record: two strings are joined with a single space; a string
        followed by a list becomes a list headed by the string; a list is
        extended (or appended to, for a non-list value); any other pair of
        values becomes a two-element list.

        Parameters
        ----------
        other : PDBRecord
            The other PDBRecord instance to merge with.
        record_format : dict
            The format dictionary defining the structure of the PDB record.
            Its ``continues`` entry, if present, names the continuing fields.
        kwargs : dict, optional
            Additional keyword arguments.
            all_fields : bool, optional
                If True and the format has no ``continues`` entry, every field
                in ``record_format['fields']`` is continued. If False
                (default), no field is continued in that case.

        Returns
        -------
        None
            This method modifies the current instance in place.
        """
        all_fields = kwargs.get('all_fields', False)
        continuing_fields = record_format.get('continues', record_format['fields'].keys() if all_fields else {})
        logger.debug(f'{self.key} {continuing_fields}')
        mine, theirs = self.__dict__, other.__dict__
        for cfield in continuing_fields:
            current, incoming = mine[cfield], theirs[cfield]
            if isinstance(current, str):
                if isinstance(incoming, str):
                    mine[cfield] = current + ' ' + incoming
                elif isinstance(incoming, list):
                    mine[cfield] = [current, *incoming]
                # any other incoming type leaves the string untouched
            elif isinstance(current, list):
                if isinstance(incoming, list):
                    current.extend(incoming)
                else:
                    # An empty list has no element type to compare against.
                    assert not current or type(current[0]) is type(incoming)
                    current.append(incoming)
            else:
                mine[cfield] = [current, incoming]

    def parse_tokens(self, typemap):
        """
        Parse tokens from the PDBRecord instance based on the record format.

        Does nothing unless the record format has a ``token_formats`` entry.
        For each attribute named there, every ``'name: value'`` string in that
        attribute's list is matched against the attribute's token
        specification, converted with the type named by the specification, and
        filed into a :class:`tokengroup`.

        Parameters
        ----------
        typemap : dict
            A dictionary mapping type names to the callables used to convert token values.

        Returns
        -------
        None
            This method modifies the PDBRecord instance in place, adding a `tokengroups` attribute
            that contains the parsed tokens grouped by their labels.

        Raises
        ------
        AssertionError
            If a token-bearing attribute is not a list, or more than one
            determinant is declared for it.
        """
        record_format = self.format
        if 'token_formats' not in record_format:
            return
        attr_w_tokens = record_format['token_formats']
        logger.debug(f'{self.key} {list(attr_w_tokens.keys())}')
        self.tokengroups = {}  # one dict of tokengroups per attribute in attr_w_tokens
        for a, spec in attr_w_tokens.items():
            lines = self.__dict__[a]  # expect to be a list
            assert isinstance(lines, list), f'Invalid type {type(lines)} for {lines} for token parsing; expecting a list of token-strings'
            tdict = spec['tokens']
            determinants = spec.get('determinants', [])
            assert len(determinants) in [0, 1], f'Token group for field {a} of {self.key} may not have more than one determinant'
            logger.debug(f'token names {list(tdict.keys())} determinants {determinants}')
            self.tokengroups[a] = {}
            current_tokengroup = None
            for i, pt in enumerate(lines):
                toks = [x.strip() for x in pt.split(':')]
                if len(toks) != 2:  # this is not a token-bearing string
                    logger.debug(f'ignoring tokenstring: {toks}')
                    continue
                tokname, tokvalue = toks
                logger.debug(f'Found {tokname} : {tokvalue}')
                tokkey = None
                if tokname in tdict:
                    tokkey = tokname
                else:
                    # The name may be an alias declared under a spec's 'key';
                    # every spec is checked, so the last match wins.
                    for k, v in tdict.items():
                        if 'key' in v:
                            logger.debug(f'comparing {tokname} to {v["key"]}')
                            if tokname == v['key']:
                                tokkey = k
                if not tokkey:
                    logger.debug(f'Ignoring token {tokname} in record {self.key}')
                    continue
                typ = typemap[tdict[tokkey]['type']]
                multiline = tdict[tokkey].get('multiline', False)
                tokvalue = typ(tokvalue)
                if multiline:
                    # Absorb following lines up to the first empty string.
                    # A separate index is used on purpose: the absorbed lines
                    # are still visited by the enclosing loop afterwards.
                    j = i + 1
                    while j < len(lines) and lines[j] != '':
                        tokvalue += ' ' + lines[j].strip()
                        j += 1
                if tokkey in determinants:
                    detrank = determinants.index(tokkey)
                    if detrank == 0:
                        logger.debug(f'new det tokgroup {tokkey} {tokvalue}')
                        new_tokengroup = tokengroup(tokkey, tokvalue)
                        self.tokengroups[a][new_tokengroup.label] = new_tokengroup
                        current_tokengroup = new_tokengroup
                    else:
                        assert False, 'should never have a detrank>0'
                else:  # assume we are adding tokens to the last group
                    if not current_tokengroup:
                        # we have not encountered the determinant token
                        # so we assume there is not one; this group is not
                        # made current, so each such token gets its own group
                        logger.debug(f'new nondet tokgroup {tokkey} {tokvalue}')
                        new_tokengroup = tokengroup(tokkey, tokvalue, determinant=False)
                        self.tokengroups[a][new_tokengroup.label] = new_tokengroup
                    else:
                        current_tokengroup.add_token(tokkey, tokvalue)

    class _EmbedState(Enum):
        SEARCHING = auto()    # waiting for the signal line
        PRE_CAPTURE = auto()  # signal seen; skipping lines or gathering tokens
        CAPTURING = auto()    # recording embedded records until blank line

    def _setup_embed_context(self, ename, espec, format_dict, typemap):
        """
        Validate one embed spec and build its parsers.

        Parameters
        ----------
        ename : str
            Name of the embed specification (used in error messages).
        espec : dict
            The embed specification; must contain ``from``, ``signal``,
            ``value`` and ``record_format``.
        format_dict : dict
            Formats by name, consulted when ``record_format`` is a string.
        typemap : dict
            Type mapping handed to every parser that is built.

        Returns
        -------
        dict
            Context with keys ``sigparse``, ``idxparse``, ``terparse``,
            ``embedfmt``, ``skiplines``, ``tokenize``, ``headers``,
            ``tokenparser`` and ``headertokenparser``.

        Raises
        ------
        AssertionError
            If the spec references a missing base field, lacks ``signal`` or
            ``value``, or has no usable record format.
        """
        assert espec['from'] in self.__dict__, \
            f'Record {self.key} references an invalid base field [{espec["from"]}] from which to extract embeds'
        assert 'signal' in espec, \
            f'Record {self.key} has an embed spec {ename} for which no signal is specified'
        assert 'value' in espec, \
            f'Record {self.key} has an embed spec {ename} for which no value for signal {espec["signal"]} is specified'
        idxparse = None
        if 'record_index' in espec:
            idxparse = BaseRecordParser({'record_index': espec['record_index']}, typemap).parse
        if isinstance(espec['record_format'], str):
            # a string names a format to look up in format_dict
            embedfmt = format_dict.get(espec['record_format'], {})
            assert embedfmt != {}, \
                f'Record {self.key} contains an embedded_records specification with an invalid record format [{espec["record_format"]}]'
        else:
            assert isinstance(espec['record_format'], dict), \
                f'Record {self.key} has an embed spec {ename} for which no format is specified'
            embedfmt = espec['record_format']
        tokenize = espec.get('tokenize', {})
        headers = espec.get('headers', {})
        tokenparser = headertokenparser = None
        if tokenize:
            tokenparser = BaseRecordParser({'token': tokenize['from']}, typemap).parse
        # NOTE(review): indentation was lost in the recovered source; the
        # header parser is assumed to be built independently of `tokenize`.
        if headers:
            headertokenparser = BaseRecordParser(
                {k: v['format'] for k, v in headers['formats'].items()}, typemap
            ).parse
        return {
            'sigparse': BaseRecordParser({'signal': espec['signal']}, typemap).parse,
            'idxparse': idxparse,
            'terparse': BaseRecordParser({'blank': ['String', [12, 80]]}, typemap).parse,
            'embedfmt': embedfmt,
            'skiplines': espec.get('skiplines', 0),
            'tokenize': tokenize,
            'headers': headers,
            'tokenparser': tokenparser,
            'headertokenparser': headertokenparser,
        }

    def parse_embedded(self, format_dict, typemap):
        """
        Parse embedded records within the PDBRecord instance based on the record format.

        For each entry under the format's ``embedded_records``, the lines of
        the field named by the entry's ``from`` are scanned with a three-state
        machine: search for the signal line, optionally skip lines and gather
        tokens/headers, then capture records until a blank line is met.

        Parameters
        ----------
        format_dict : dict
            A dictionary mapping field names to their formats.
        typemap : dict
            A dictionary mapping field names to their types.

        Returns
        -------
        dict or None
            ``None`` if the record format has no ``embedded_records`` entry;
            otherwise the dictionary that ``capture_record`` filled with the
            captured records, keyed by record key.
        """
        logger.debug('Parsing embedded')
        new_records = {}
        record_format = self.format
        if 'embedded_records' not in record_format:
            return
        base_key = self.key
        for ename, espec in record_format.get('embedded_records', {}).items():
            logger.debug(f'Embedded {ename}')
            ctx = self._setup_embed_context(ename, espec, format_dict, typemap)
            embedfrom = espec['from']
            token_hold = {}
            header_hold = []
            embedkey = base_key
            lskip = 0
            current_division = 0
            state = self._EmbedState.SEARCHING
            for record in self.__dict__[embedfrom]:
                if state == self._EmbedState.SEARCHING:
                    sigrec = ctx['sigparse'](record)
                    if sigrec.signal != espec['value']:
                        logger.debug(f'Ignoring {record}')
                        continue
                    idx = None if not ctx['idxparse'] else ctx['idxparse'](record).record_index
                    embedkey = f'{base_key}.{ename}' + (str(idx) if idx else '')
                    if not ctx['skiplines'] and not ctx['tokenize'] and not ctx['headers']:
                        state = self._EmbedState.CAPTURING
                    else:
                        state = self._EmbedState.PRE_CAPTURE
                elif state == self._EmbedState.PRE_CAPTURE:
                    if ctx['skiplines']:
                        logger.debug(f'Skipping {record}')
                        lskip += 1
                        if lskip == ctx['skiplines']:
                            state = self._EmbedState.CAPTURING
                        continue
                    logger.debug(f'Parsing "{record}"')
                    if ctx['tokenize']:
                        # header_or_token is resolved at module scope
                        is_ht = header_or_token(record, ctx['tokenize']['d'], ctx['headers'],
                                                ctx['tokenparser'], ctx['headertokenparser'],
                                                token_hold, header_hold)
                        if is_ht:
                            continue
                    # first non-token line transitions to CAPTURING
                    state = self._EmbedState.CAPTURING
                    new_div = capture_record(record, ctx['embedfmt'], typemap, embedkey,
                                             ctx['headers'], header_hold, token_hold,
                                             current_division, new_records)
                    if new_div:
                        current_division += 1
                        logger.debug(f'First capture into division {current_division}')
                elif state == self._EmbedState.CAPTURING:
                    if ctx['terparse'](record).blank == '':
                        logger.debug(f'Terminate embed capture for {embedkey} from record {record}')
                        break
                    logger.debug(f'Parsing "{record}"')
                    if ctx['tokenize']:
                        is_ht = header_or_token(record, ctx['tokenize']['d'], ctx['headers'],
                                                ctx['tokenparser'], ctx['headertokenparser'],
                                                token_hold, header_hold)
                        if is_ht:
                            continue
                    new_div = capture_record(record, ctx['embedfmt'], typemap, embedkey,
                                             ctx['headers'], header_hold, token_hold,
                                             current_division, new_records)
                    if new_div:
                        current_division += 1
        logger.debug(f'embed rec new keys {new_records}')
        return new_records

    def parse_tables(self, typemap):
        """
        Parse tables from the PDBRecord instance based on the record format.

        Each entry under the format's ``tables`` is located by scanning the
        lines of the field named by the entry's ``from`` for a line whose
        parsed ``signal`` equals the entry's ``value``. After optionally
        skipping ``skiplines`` lines, rows are parsed until a line with an
        empty signal is met; rows whose fields are all empty strings are
        dropped. Scanning for the next table resumes after the terminating
        line of the previous one.

        If the record format has no ``tables`` entry, ``tables`` is set to an
        empty dictionary and nothing else happens.

        Parameters
        ----------
        typemap : dict
            A dictionary mapping field names to their types.

        Returns
        -------
        None. This method modifies the PDBRecord instance in place, adding a `tables` attribute
        that contains the parsed tables, where each table is a list of parsed rows.
        """
        fmt = self.format
        self.tables = {}
        scanbegin = 0
        # .get() keeps this consistent with parse_tokens/parse_embedded, which
        # also tolerate a format lacking their section.
        for tname, table in fmt.get('tables', {}).items():
            logger.debug(f'{self.key} will acquire a table {tname} from line {scanbegin}')
            sigparser = BaseRecordParser({'signal': table['signal']}, typemap).parse
            sigval = table['value']
            skiplines = table.get('skiplines', 0)
            rowparser = BaseRecordParser(table['fields'], typemap).parse
            self.tables[tname] = []
            scanfield = table['from']
            triggered = False
            capturing = False
            lskip = 0
            lines = self.__dict__[scanfield]
            for i in range(scanbegin, len(lines)):
                line = lines[i]
                if not triggered and sigparser(line).signal == sigval:
                    # this is a signal-line
                    triggered = True
                    if not skiplines:
                        capturing = True
                elif triggered and not capturing:
                    if skiplines:
                        lskip += 1
                        if lskip == skiplines:
                            capturing = True
                elif capturing:
                    if sigparser(line).signal == '':
                        logger.debug(f'Terminate table {tname}')
                        scanbegin = i + 1
                        break
                    parsedrow = rowparser(line)
                    if not all(x == '' for x in parsedrow.__dict__.values()):
                        self.tables[tname].append(parsedrow)
[docs]
class PDBRecordList(UserList):
    """
    A class representing a list of PDBRecord instances, inheriting from UserList.

    Every item entering the list (at construction, by assignment, append,
    insert, extend or concatenation) must be a ``PDBRecord``; otherwise a
    ``TypeError`` is raised and the list is left unchanged.

    Iterable arguments are materialised into a list once, before validation,
    so that one-shot iterables such as generators are not exhausted by the
    validation pass and then silently contribute nothing.
    """

    def __init__(self, initlist=None):
        items = [] if initlist is None else list(initlist)
        self._validate_all(items)
        super().__init__(items)

    def _validate(self, item):
        """Raise ``TypeError`` unless ``item`` is a ``PDBRecord``."""
        if not isinstance(item, PDBRecord):
            raise TypeError(f"All items must be instances of PDBRecord, got {type(item)}")

    def _validate_all(self, iterable):
        """Validate every element of ``iterable`` (which is consumed)."""
        for item in iterable:
            self._validate(item)

    def __setitem__(self, index, item):
        # Support slice assignment
        if isinstance(index, slice):
            item = list(item)
            self._validate_all(item)
        else:
            self._validate(item)
        super().__setitem__(index, item)

    def append(self, item):
        """Append a validated ``PDBRecord``."""
        self._validate(item)
        super().append(item)

    def insert(self, index, item):
        """Insert a validated ``PDBRecord`` before ``index``."""
        self._validate(item)
        super().insert(index, item)

    def extend(self, other):
        """Extend the list with the validated items of ``other``."""
        items = list(other)
        self._validate_all(items)
        super().extend(items)

    def __add__(self, other):
        items = list(other)
        self._validate_all(items)
        return PDBRecordList(self.data + items)

    def __iadd__(self, other):
        items = list(other)
        self._validate_all(items)
        return super().__iadd__(items)
[docs]
class PDBRecordDict(UserDict):
    """
    A dictionary whose values must be ``PDBRecord`` or ``PDBRecordList``
    instances, built on ``UserDict``.

    Every write path (construction, item assignment, ``update`` and
    ``setdefault``) goes through ``__setitem__``, which raises ``TypeError``
    for any other value type.
    """

    def __init__(self, *args, **kwargs):
        super().__init__()
        self.update(*args, **kwargs)

    def _validate(self, value):
        """Raise ``TypeError`` unless ``value`` is an accepted record type."""
        if isinstance(value, (PDBRecord, PDBRecordList)):
            return
        raise TypeError(f"Values must be PDBRecord or PDBRecordList, got {type(value)}")

    def __setitem__(self, key, value):
        self._validate(value)
        super().__setitem__(key, value)

    def update(self, *args, **kwargs):
        """Merge the given mapping/pairs/keywords, validating each value."""
        incoming = dict(*args, **kwargs)
        for key, value in incoming.items():
            self[key] = value  # routed through __setitem__ for validation

    def setdefault(self, key, default=None):
        """Return ``self[key]``, first storing the validated ``default`` if absent."""
        if key in self:
            return self[key]
        self[key] = default  # routed through __setitem__ for validation
        return self[key]
[docs]
def gather_token(k, v, hold=None):
    """
    Gather a token into a holder dictionary.

    The first value seen for a key is stored as-is. A second value turns the
    entry into a two-element list, and later values are appended to that list.
    Note that if the stored value is already a list, further values are
    appended to it directly.

    Parameters
    ----------
    k : str
        The key for the token.
    v : str
        The value of the token.
    hold : dict, optional
        A dictionary to hold the tokens; it is modified in place. If omitted,
        a new empty dictionary is created.

    Returns
    -------
    dict
        The holder dictionary (the one passed in, or the newly created one).
    """
    if hold is None:
        hold = {}
    if k not in hold:
        hold[k] = v
    elif isinstance(hold[k], list):
        hold[k].append(v)
    else:
        hold[k] = [hold[k], v]
    return hold
[docs]
def capture_record(rec, fmt, typemap, key, hdrs, hh, th, divno, rh):
    """
    Parse one embedded record line and file it in the record holder.

    A new ``PDBRecord`` is built from ``rec``. If the holder already has a
    record under the resulting key, the new record is merged into it as a
    continuation; otherwise it is stored as a new entry. Any held headers and
    tokens are attached to the new record and the holds are emptied in place.

    Parameters
    ----------
    rec : str
        The record line to capture.
    fmt : dict
        The format dictionary defining the structure of the PDB record.
    typemap : dict
        A dictionary mapping field names to their types.
    key : str
        The key for the current record being captured.
    hdrs : dict
        A dictionary containing header formats and their specifications;
        its ``divlabel`` entry is used to build division keys.
    hh : list
        Held header values; emptied once attached to the record.
    th : dict
        Held tokens; emptied once attached to the record.
    divno : int
        The current division number.
    rh : dict
        A dictionary to hold the records, where keys are record keys and values are PDBRecord instances.

    Returns
    -------
    bool
        True if a new division was detected, False otherwise.
    """
    # Held headers mean this record opens the next division.
    starts_division = bool(hh)
    if starts_division:
        divno += 1
        logger.debug(f'Capture inherits a header hold; currdivno {divno}')
    # if we are not holding headers, we still could encounter a new division
    # of the data if the record's divnumber is not the current divnumber
    target_key = f'{key}.{hdrs["divlabel"]}{divno}' if hdrs else key
    logger.debug(f'record to {target_key}')
    new_record = PDBRecord.newrecord(target_key, rec, fmt, typemap)
    if hasattr(new_record, 'divnumber'):
        record_div = new_record.divnumber
        if record_div == divno + 1:
            logger.debug(f'New division detected {divno} -> {record_div}')
            starts_division = True
        # The record's own division number decides its final key.
        divlabel = hdrs.get('divlabel', '')
        new_record.key = f'{key}.{divlabel}{record_div}'
    record_key = new_record.key
    record_format = new_record.format
    if hh:
        new_record.header = hh.copy()
        hh.clear()
    if th:
        new_record.tokens = th.copy()
        th.clear()
    logger.debug(f'new record has key {record_key}')
    if record_key in rh:
        logger.debug(f'continuing record for {record_key}')
        root_record = rh[record_key]
        root_record.continue_record(new_record, record_format)
        if hasattr(new_record, 'tokens'):
            if hasattr(root_record, 'tokens'):
                root_record.tokens.update(new_record.tokens)
            else:
                root_record.tokens = new_record.tokens
    else:
        logger.debug(f'new record for {record_key}')
        rh[record_key] = new_record
    return starts_division