Source code for pidibble.pdbparse

# Author: Cameron F. Abrams <cfa22@drexel.edu>
"""

.. module:: pdbparse
   :synopsis: Defines the PDBParser class
   
.. moduleauthor: Cameron F. Abrams, <cfa22@drexel.edu>

"""
import importlib.metadata
import json
import logging
import os
import urllib.request
import yaml
from typing import Callable

import numpy as np

from mmcif.io.IoAdapterCore import IoAdapterCore
from pathlib import Path

from . import resources
from .baseparsers import ListParsers, ListParser, str2int_sig, safe_float
from .baserecord import BaseRecordParser
from .pdbrecord import PDBRecord, PDBRecordDict, PDBRecordList
from .mmcif_parse import MMCIF_Parser
from .hex import str2atomSerial, hex_reset

logger = logging.getLogger(__name__)
__version__ = importlib.metadata.version("pidibble")

[docs] class PDBParser: """ A class for parsing PDB files and extracting structured data. This class handles fetching PDB files, reading them, and parsing their contents into structured records based on predefined formats. Attributes ---------- parsed : PDBRecordDict A dictionary containing parsed records, where keys are record types and values are :class:`.pdbrecord.PDBRecord` instances or lists of instances. This dictionary is populated after parsing the PDB or mmCIF file. mappers : dict A dictionary of mappers for parsing different data types, including custom formats and delimiters. pdb_lines : list A list of lines read from the PDB file. Empty if no input file is provided. cif_data : dict A dictionary containing the parsed mmCIF data. Empty if no input file is provided. """ def __init__(self, input_format: str = 'PDB', overwrite: bool = False, source_db: str = None, source_id: str = None, filepath: str | Path = None, mappers: dict[str, Callable] = {'HxInteger':str2atomSerial, 'Integer':str2int_sig, 'String':str, 'Float':safe_float}, comment_chars: list[str] = ['#'], pdb_format_file: str ='pdb_format.yaml', mmcif_format_file: str = 'mmcif_format.yaml', **kwargs): logger.debug(f'Pidibble v. {__version__}') self.input_format = input_format self.overwrite = overwrite self.source_db = source_db self.source_id = source_id self.filepath = Path(filepath) if filepath else None self.mappers = mappers self.mappers.update(ListParsers) self.comment_chars = comment_chars self.pdb_lines = [] self.cif_data = {} self.parsed = PDBRecordDict() self.pdb_format_file = pdb_format_file if not os.path.isfile(self.pdb_format_file): # if pdb_format_file is not a file in the CWD, assume it is a relative path to the resources directory # this is useful for testing self.pdb_format_file = os.path.join(os.path.dirname(resources.__file__), pdb_format_file) self.mmcif_format_file = mmcif_format_file if not os.path.isfile(self.mmcif_format_file): # if mmcif_format_file is not a file in the CWD, assume it is a relative path to the resources directory # this is useful for testing self.mmcif_format_file = os.path.join(os.path.dirname(resources.__file__), mmcif_format_file) if os.path.exists(self.pdb_format_file): with open(self.pdb_format_file, 'r') as f: self.pdb_format_dict = yaml.safe_load(f) logger.debug(f'Pidibble uses the installed config file {self.pdb_format_file}') else: raise FileNotFoundError(f'{self.pdb_format_file} not found, either locally ({os.getcwd()}) or in resources ({os.path.dirname(resources.__file__)})') if os.path.exists(self.mmcif_format_file): with open(self.mmcif_format_file,'r') as f: self.mmcif_format_dict=yaml.safe_load(f) logger.debug(f'Pidibble uses the installed config file {self.mmcif_format_file}') else: raise FileNotFoundError(f'{self.mmcif_format_file} not found, either locally ({os.getcwd()}) or in resources ({os.path.dirname(resources.__file__)})') # update mappers with delimiters and custom formats delimiter_dict = self.pdb_format_dict.get('delimiters', {}) for map,d in delimiter_dict.items(): if not map in self.mappers: self.mappers[map] = ListParser(d).parse cformat_dict = self.pdb_format_dict.get('custom_formats', {}) for cname,cformat in cformat_dict.items(): if not cname in self.mappers: self.mappers[cname] = BaseRecordParser(cformat, self.mappers).parse
[docs] def fetch(self): """ Fetch the PDB file based on the provided PDB code or AlphaFold ID. This method checks if the PDB code or AlphaFold ID is provided, constructs the appropriate file path, and attempts to download the file from the PDB or AlphaFold API. Returns ------- bool True if the file was successfully fetched, False otherwise. """ assert self.source_db is not None or self.filepath is not None, f'source_db {self.source_db} or filepath {self.filepath} must be specified for fetch()' if self.source_db is not None and self.source_id is None: raise ValueError(f'You must specify a source ID code for source_db {self.source_db}') if self.source_db is not None and self.source_db not in ['rcsb', 'alphafold', 'opm']: raise ValueError(f'Source db {self.source_db} is not recognized.') if self.filepath is not None: if not self.filepath.exists(): raise FileNotFoundError(f'{self.filepath.name} not found.') return True match self.source_db: case 'rcsb': if self.input_format == 'PDB': self.filepath = Path(f'{self.source_id}.pdb') elif self.input_format == 'mmCIF': self.filepath = Path(f'{self.source_id}.cif') else: logger.warning(f'Input format {self.input_format} not recognized; using PDB') self.filepath = Path(f'{self.source_id}.pdb') BASE_URL = self.pdb_format_dict['BASE_URL'] target_url = os.path.join(BASE_URL, self.filepath.name) if not self.filepath.exists() or self.overwrite: try: urllib.request.urlretrieve(target_url, self.filepath.name) except: logger.warning(f'Could not fetch {self.filepath.name} from {self.source_db}') return False return True case 'alphafold': self.filepath = Path(f'{self.source_id}.pdb') BASE_URL = self.pdb_format_dict['ALPHAFOLD_API_URL'] target_url = os.path.join(BASE_URL, self.source_id) try: urllib.request.urlretrieve(target_url + r'?key=' + self.pdb_format_dict['ALPHAFOLD_API_KEY'], f'{self.source_id}.json') except: logger.warning(f'Could not fetch metadata for entry with accession code {self.source_id} from AlphaFold') return False with open(f'{self.source_id}.json') as f: result = json.load(f) try: urllib.request.urlretrieve(result[0]['pdbUrl'], self.filepath.name) except: logger.warning(f'Could not retrieve {result[0]["pdbUrl"]}') return False return True case 'opm': self.filepath = Path(f'{self.source_id}.pdb') BASE_URL = self.pdb_format_dict['OPM_URL'] target_url = os.path.join(BASE_URL, self.filepath.name) if not self.filepath.exists() or self.overwrite: try: urllib.request.urlretrieve(target_url, self.filepath.name) except: logger.warning(f'Could not fetch {self.filepath.name} from {self.source_db}') return False logger.warning(f'Stripping blanks and END lines from OPM pdb') badlines = self.filepath.read_text().split('\n') with open(self.filepath.name, 'w') as f_base: with open(f'{self.filepath.stem}-dum.pdb', 'w') as f_dum: f = f_base for line in badlines: sline = line.strip() if not sline.startswith('END') and len(sline) > 0: f.write(sline+'\n') if sline.startswith('END') and f is f_base: f = f_dum logger.debug(f'Generated {self.filepath.name} and {self.filepath.stem}-dum.pdb') return True case '_': logger.debug(f'Source db {self.source_db} is not recognized.') return False
[docs] def read_PDB(self): """ Read the PDB file and store its lines in :attr:`PDBParser.pdb_lines`. This method opens the PDB file, reads its contents, and splits it into lines. If the last line is empty, it removes it from the list of lines. """ with open(self.filepath, 'r') as f: self.pdb_lines = f.read().split('\n') if self.pdb_lines[-1] == '': self.pdb_lines = self.pdb_lines[:-1]
[docs] def read_mmCIF(self): """ Read the mmCIF file and store its data in :attr:`PDBParser.cif_data`. This method uses the :class:`mmcif.io.IoAdapterCore.IoAdapterCore` to read the mmCIF file and store the data in :attr:`PDBParser.cif_data`. """ io = IoAdapterCore() l_dc = io.readFile(self.filepath) self.cif_data = l_dc[0]
[docs] def read(self): """ Read the PDB or mmCIF file based on the input format. This method checks the input format and calls the appropriate read method. """ if self.input_format == 'mmCIF': self.read_mmCIF() else: self.read_PDB()
[docs] def parse_base(self): """ Parse the base records from the PDB or mmCIF file. This method initializes the parsing process based on the input format. If the input format is mmCIF, it uses the :class:`.mmcif_parse.MMCIF_Parser` to parse the mmCIF data. If the input format is PDB, it uses the :class:`.pdbrecord.PDBRecord` class to parse the PDB lines. """ if self.input_format == 'mmCIF': self.parse_mmCIF() else: self.parse_PDB()
[docs] def parse_mmCIF(self): """ Parse the mmCIF data and generate a dictionary of :class:`.pdbrecord.PDBRecord` instances. This method uses the :class:`.mmcif_parse.MMCIF_Parser` to parse the mmCIF data and store the parsed records in :attr:`PDBParser.parsed`. """ mmcif_parser = MMCIF_Parser(self.mmcif_format_dict,self.pdb_format_dict['record_formats'],self.cif_data) self.parsed = mmcif_parser.parse()
[docs] def parse_PDB(self): """ Parse the PDB lines and generate a dictionary of :class:`.pdbrecord.PDBRecord` instances. This method iterates through the PDB lines, identifies the record type based on the first character, and creates a new :class:`.pdbrecord.PDBRecord` instance for each record. It handles different record types, including continuation records and grouped records. """ hex_reset() record_formats=self.pdb_format_dict['record_formats'] key='' record_format={} group_open_record=None for i,pdbrecord_line in enumerate(self.pdb_lines): tc=pdbrecord_line[0] if tc in self.comment_chars: continue pdbrecord_line+=' '*(80-len(pdbrecord_line)) base_key=pdbrecord_line[:6].strip() assert base_key in record_formats,f'{base_key} is not found in among the available record formats' base_record_format=record_formats[base_key] record_type=base_record_format['type'] new_record=PDBRecord.newrecord(base_key,pdbrecord_line,base_record_format,self.mappers) key=new_record.key record_format=new_record.format if record_type in [1,2,6]: if not key in self.parsed: self.parsed[key]=new_record else: # this must be a continuation record assert record_type!=1,f'{key} may not have continuation records' root_record=self.parsed[key] root_record.continue_record(new_record,record_format,all_fields=('REMARK' in key)) elif record_type in [3,4,5]: if not key in self.parsed: # this is necessarily the first occurance of a record with this key, but since there can be multiple instances this must be a list of records if 'groupuntil' in record_format: group_open_record=new_record logger.debug(f'opening group {group_open_record.serial} until {group_open_record.format["groupuntil"]}') if group_open_record!=None and key==group_open_record.format['groupuntil']: logger.debug(f'closing group {group_open_record.serial}') group_open_record=None if 'groupby' in record_format: tok=new_record.format['groupby'].split('.') if group_open_record!=None: if tok[0]==group_open_record.key: groupid=getattr(group_open_record,tok[1]) setattr(new_record,group_open_record.key.lower(),groupid) self.parsed[key]=PDBRecordList([new_record]) else: # this is either # (a) a continuation record of a given key.(determinants) # or # (b) a new set of (determinants) on this key # note (b) is only option if there are no determinants # first, look for key.(determinants) root_record=None if 'determinants' in record_format: nrd=[new_record.__dict__[k] for k in record_format['determinants']] for r in self.parsed[key]: td=[r.__dict__[k] for k in record_format['determinants']] if nrd==td: root_record=r break if root_record: # case (a) assert root_record.continuation<new_record.continuation,f'continuation parsing error {record_type}' root_record.continue_record(new_record,record_format) else: # case (b) if 'groupuntil' in record_format: group_open_record=new_record logger.debug(f'opening group {group_open_record.serial} until {group_open_record.format["groupuntil"]}') if group_open_record!=None and key==group_open_record.format['groupuntil']: logger.debug(f'closing group {group_open_record.serial}') group_open_record=None if 'groupby' in record_format: tok=new_record.format['groupby'].split('.') if group_open_record!=None: if tok[0]==group_open_record.key: groupid=getattr(group_open_record,tok[1]) setattr(new_record,group_open_record.key.lower(),groupid) self.parsed[key].append(new_record)
[docs] def post_process(self): """ Post-process the parsed records to handle embedded records, tokens, and tables. This method checks if the input format is mmCIF and processes the records accordingly. If the input format is PDB, it processes the records to handle embedded records, tokens, and tables. """ if self.input_format!='mmCIF': self.parse_embedded_records() self.parse_tokens() self.parse_tables()
# def parse_models(self): # n_models=self.parsed.get('NUMMDL',1) # for i in range(n_models): # self.parsed['MODEL'][i+1]={} # # in progress
[docs] def parse_embedded_records(self): """ Parse embedded records within the parsed records. This method iterates through the parsed records and checks if any record has embedded records. If an embedded record is found, it calls the :meth:`.pdbrecord.PDBRecord.parse_embedded` method to parse the embedded records. It updates the :attr:`PDBParser.parsed` dictionary with the new parsed records. """ new_parsed_records={} for key,p in self.parsed.items(): if type(p)==PDBRecord: rf=p.format if 'embedded_records' in rf: new_parsed_records.update(p.parse_embedded(self.pdb_format_dict['record_formats'],self.mappers)) elif type(p)==PDBRecordList: for q in p: rf=q.format if 'embedded_records' in rf: new_parsed_records.update(q.parse_embedded(self.pdb_format_dict['record_formats'],self.mappers)) self.parsed.update(new_parsed_records)
[docs] def parse_tokens(self): """ Parse tokens within the parsed records. This method iterates through the parsed records and checks if any record has token formats. If a token format is found, it calls the :meth:`.pdbrecord.PDBRecord.parse_tokens` method to parse the tokens. It updates the :attr:`PDBParser.parsed` dictionary with the new parsed records. """ for key,p in self.parsed.items(): if type(p)==PDBRecord: rf=p.format if 'token_formats' in rf: p.parse_tokens(self.mappers) elif type(p)==list: for q in p: rf=q.format if 'token_formats' in rf: q.parse_tokens(self.mappers)
[docs] def parse_tables(self): """ Parse tables within the parsed records. This method iterates through the parsed records and checks if any record has table formats. If a table format is found, it calls the :meth:`.pdbrecord.PDBRecord.parse_tables` method to parse the tables. It updates the :attr:`PDBParser.parsed` dictionary with the new parsed records. """ for key,p in self.parsed.items(): if type(p)==PDBRecordList: continue # don't expect to read a table from a multiple-record entry rf=p.format if 'tables' in rf: p.parse_tables(self.mappers)
[docs] def parse(self): """ Parse the PDB or mmCIF file and generate a dictionary of :class:`.pdbrecord.PDBRecord` instances. This method first fetches the PDB or mmCIF file based on the provided PDB code or AlphaFold ID. It then reads the file and parses its contents into structured records. If the input format is mmCIF, it uses the :class:`.mmcif_parse.MMCIF_Parser` to parse the mmCIF data. If the input format is PDB, it uses the :class:`.pdbrecord.PDBRecord` class to parse the PDB lines. Returns ------- self : PDBParser The instance of :class:`.pdbrecord.PDBRecord` containing the parsed records. """ if self.fetch(): self.read() self.parse_base() self.post_process() else: logger.warning(f'No data.') return self
[docs] def get_symm_ops(rec:PDBRecord): """ Extract the symmetry operations from a PDB record. This function processes the symmetry operations from a PDB record and returns the transformation matrix and translation vector. Parameters ---------- rec : :class:`.pdbrecord.PDBRecord` The PDBRecord instance containing the symmetry operations. Returns ------- M : :class:`numpy.ndarray` The 3x3 transformation matrix. T : :class:`numpy.ndarray` The 3x1 translation vector. """ M = np.identity(3) T = np.array([0., 0., 0.]) if not (hasattr(rec, 'row') and hasattr(rec, 'coordinate')): raise ValueError('Invalid PDBRecord: missing row or coordinate attributes') assert len(rec.row) == 3, f'a transformation matrix record should not have more than 3 rows' for c, r in zip(rec.coordinate, rec.row): row = c - 1 M[row][0] = r.m1 M[row][1] = r.m2 M[row][2] = r.m3 T[row] = r.t return M, T