# coding: utf-8
import os
from pkg_resources import ResourceManager
try:
import cPickle as pickle
except ImportError:
import pickle
import numpy as np
import re
from . import pdbtools
from . import utils
from .sysconfig import SysConfig
INFO_DIRECTORY = SysConfig.SYSCONFIG_LOCATION + os.sep + "structure_info"
CACHE_DIRECTORY = SysConfig.SYSCONFIG_LOCATION + os.sep + "cache"
CACHE_DISTANCEMAP = CACHE_DIRECTORY + os.sep + "distancemap.dat"
[docs]class DcaException(Exception):
"""
Custom exception class used for foreseeable, DCA related errors.
"""
pass
def _get_atoms_backbone(term_phosphate=False):
"""
Get list of backbone atoms.
:param term_phosphate: add P atoms
:return: list of atoms
"""
atoms = ["P", "OP1", "OP2"] if term_phosphate else []
return atoms + ["O5'", "C5'", "C4'", "O4'", "C3'", "O3'", "C2'", "O2'", "C1'"]
[docs]def get_atoms_for_res(res, term_phosphate=False):
"""
Get list of atoms for residue.
:param res: nucleotide (A,U,G,C)
:param term_phosphate: add P atoms
:return: list of atoms
"""
atoms = _get_atoms_backbone(term_phosphate)
if res == "A":
return atoms + ["N9", "C8", "N7", "C5", "C6", "N6", "N1", "C2", "N3", "C4"]
elif res == "U":
return atoms + ["N1", "C2", "O2", "N3", "C4", "O4", "C5", "C6"]
elif res == "G":
return atoms + ["N9", "C8", "N7", "C5", "C6", "O6", "N1", "C2", "N2", "N3", "C4"]
elif res == "C":
return atoms + ["N1", "C2", "O2", "N3", "C4", "N4", "C5", "C6"]
[docs]def get_atoms_for_res_sequence(sequence):
"""
Get list of atoms for a sequence of nucleotides
:param sequence: sequence as text
:return: list of atoms
"""
atoms = []
first = True
for res in sequence:
res = res.upper()
atoms.append((res, get_atoms_for_res(res, term_phosphate=(not first))))
first = False
return atoms
[docs]def get_contact_distance_map(structure_directory=INFO_DIRECTORY, westhof_vector=None, force_rebuild=False):
"""
Returns contact distance map
The contact distance map is cached it in the user directory and updated when newer files are found.
:param structure_directory: directory to look up structure information text files
:param westhof_vector: list of factors to apply different weights to the bonding family classes (defaults to ``[1, 1, ... ]``)
:param force_rebuild: force rebuilding the distance map
"""
# default: same weight for all families
if not westhof_vector:
westhof_vector = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
nucleotides = ["A", "U", "G", "C"]
# build a dict of filenames
# if a local file is present in the user directory it will take precedence over the system wide shipped version
structure_filenames = {}
resource_manager = ResourceManager()
for nt1 in nucleotides:
for nt2 in nucleotides:
ntpair = "%s-%s" % (nt1, nt2)
local_file = structure_directory + os.sep + ntpair + ".txt"
if os.path.isfile(local_file):
structure_filenames[ntpair] = local_file
else:
structure_filenames[ntpair] = resource_manager.resource_filename(__name__, "structure_info/%s.txt" % ntpair)
# try to use a cached version of the distance map if found and recent and force_rebuild is False
if not force_rebuild:
try:
cache_ok = True
if os.path.isfile(CACHE_DISTANCEMAP):
cache_timestamp = os.path.getmtime(CACHE_DISTANCEMAP)
for d in structure_filenames.itervalues():
if os.path.getmtime(d) > cache_timestamp:
cache_ok = False
print "Contact map cache out of date. Rebuilding..."
break
if cache_ok:
with open(CACHE_DISTANCEMAP, "r") as f:
return pickle.load(f)
except (IOError, pickle.PickleError, AttributeError, EOFError, IndexError):
print "Contact map cache broken. Rebuilding..."
print "Building contact distance map:"
pdb_structure_dict = {}
distance_map = {}
for nt1 in nucleotides:
for nt2 in nucleotides:
distance_map_res_pair = {}
pdb_codes = []
residues = []
# read the structures for the 12 edge-to-edge bonding families
for line in utils.read_file_line_by_line(structure_filenames[nt1 + '-' + nt2]):
fields = line.split(" ")
if fields[0] != "-":
pdb_codes.append(fields[0].upper())
residues.append((int(fields[1]), int(fields[2])))
else:
pdb_codes.append(None)
residues.append(None)
# loop over all pdbcodes and their index in the list (0-11)
for index, pdb_code in enumerate(pdb_codes):
# skip if we don't have any entry for this family
if pdb_code is None:
continue
# download pdb if necessary
if pdb_code not in pdb_structure_dict:
pdb_structure_dict[pdb_code] = pdbtools.parse_pdb(pdb_code, pdbtools.get_pdb_by_code(pdb_code))
# extract model from pdb
model = pdb_structure_dict[pdb_code][0]
# try to find the residue contact specified. this is done by looping over all chains in the model,
# and checking if the residue is in there and is the correct nucleotide
def find_res(res, resname):
for chain in model:
try:
if chain[res].get_resname().strip() == resname:
return chain[res]
except KeyError:
pass
return None
res1 = find_res(residues[index][0], nt1)
res2 = find_res(residues[index][1], nt2)
if not res1 or not res2:
raise Exception("Could not find residue contact in pdb file: %s-%s %s %s %s" % (nt1, nt2, pdb_code, residues[index][0], residues[index][1]))
print "%s-%s %s %s %s" % (nt1, nt2, pdb_code, residues[index][0], residues[index][1])
# add all atom-atom contacts to the distance map for the current residue pair
for atom1 in res1:
for atom2 in res2:
if not (atom1.name.startswith('H') or atom2.name.startswith('H')):
contact_key = str(atom1.name) + '-' + str(atom2.name)
distance = westhof_vector[index] * (atom1 - atom2)
if contact_key not in distance_map_res_pair:
distance_map_res_pair[contact_key] = [distance]
else:
distance_map_res_pair[contact_key].append(distance)
distance_map[nt1 + nt2] = distance_map_res_pair
# save distance map in cache
utils.mkdir_p(CACHE_DIRECTORY)
with open(CACHE_DISTANCEMAP, "w") as f:
pickle.dump(distance_map, f)
return distance_map
[docs]def get_contact_distance_map_mean(distance_map, mean_cutoff=None, std_cutoff=None):
"""
Return an average distance map containing only those contacts with average distance and standard deviation satisfiying a cutoff.
:param distance_map: full distance map
:param mean_cutoff: limit for average
:param std_cutoff: limit for standard deviation
:return: average distance map
"""
mean_distance_map = {}
for res_pair, distance_map_res_pair in distance_map.iteritems():
mean_distance_map_res = {}
for atom_pair, distances_atom_pair in distance_map_res_pair.iteritems():
mean = np.asarray(distances_atom_pair).mean()
std = np.asarray(distances_atom_pair).std()
if (mean_cutoff is None or mean < mean_cutoff) and (std_cutoff is None or std < std_cutoff):
mean_distance_map_res[atom_pair] = [mean, std]
mean_distance_map[res_pair] = mean_distance_map_res
return mean_distance_map
[docs]def read_pdb_mapping_from_file(dca_prediction_filename):
"""
Read a PDB mapping from DCA file if present and return it as text
:param dca_prediction_filename: DCA input filename
:return: PDB mapping text
"""
pattern_parameter = re.compile(r"^#\s(\S+)\s+(.*)$")
for line in utils.read_file_line_by_line(dca_prediction_filename):
if line[0] == "#":
# comment / parameter line
m = pattern_parameter.match(line)
if m:
if m.group(1) == "pdb-mapping":
return m.group(2)
continue
# we only allow comments on top, so no need to check the rest of the lines:
break
return None
[docs]def parse_dca_data(dca_prediction_filename):
"""
Read a DCA file, adjust the sequence numbers to match the alignment of the PDB file, and create a list of DcaContacts
:param dca_prediction_filename: DCA input filename
:return: list of DcaContact objects
"""
print "Parsing dca file %s..." % dca_prediction_filename
# read pdb mapping from file
pdb_mapping_text = read_pdb_mapping_from_file(dca_prediction_filename)
if pdb_mapping_text is None:
print "pdb-mapping: 1-N (no pdb-mapping found in header of dca file)"
else:
print "pdb-mapping: %s" % pdb_mapping_text
# parse pdb mapping to dict
# for example 1-7,80,100-120,8-9 --> {1: 1, 2: 2, ..., 80: 8, 81: 9, ...}
try:
pdb_mapping = None if pdb_mapping_text is None else {x: i + 1 for i, x in enumerate(utils.comma_separated_ranges_to_list(pdb_mapping_text))}
except ValueError as e:
raise DcaException("Invalid pdb mapping string: %s" % e.message)
dca = []
for line in utils.read_file_line_by_line(dca_prediction_filename):
if line[0] == "#":
continue
# data line
parts = line.split(" ")
if pdb_mapping is not None:
try:
res1 = pdb_mapping[int(parts[0])]
res2 = pdb_mapping[int(parts[1])]
except KeyError:
# our dca file might contain more than what we are interested in for the current prediction.
# simply ignore all dca pairs that don't fit the mapping range, but print a warning.
print "Warning: At least one residue of contact %s not found in PDB-Mapping. Ignoring..." % parts[:2]
continue
else:
res1 = int(parts[0])
res2 = int(parts[1])
dca.append(DcaContact(res1, res2))
return dca
[docs]def get_contact_information_in_pdb_chain(dca_contact, pdb_chain, heavy_only=True):
"""
Returns distance information about a DCA contact in a realized PDB chain
Return value is a tuple of:
- Average distance: Mean distance of all atoms in the contacts.
- Minimum distance: Minimum distance between two atoms in the contact.
- Minimum pair: List of ``[atom1, atom2]`` forming the minimal contact
:param dca_contact: DcaContact object
:param pdb_chain: PDB chain structure object
:param heavy_only: Only use heavy atoms
:return: tuple ``(average_dist, minimum_dist, minimum_pair)``. In case the contact cannot be found in the PDB file ``(0, 0, None)`` is returned.
"""
try:
res1, res2 = (pdb_chain[dca_contact.res1], pdb_chain[dca_contact.res2])
except KeyError:
return 0, 0, None
# calculate average distance
average_heavy = np.linalg.norm(pdbtools.get_center_of_res(res1) - pdbtools.get_center_of_res(res2))
minimum_heavy = 9999
minimum_pair = []
for atom1 in pdbtools.filter_atoms(res1, heavy_only):
for atom2 in pdbtools.filter_atoms(res2, heavy_only):
dist = np.linalg.norm(atom1.coord - atom2.coord)
if dist < minimum_heavy:
minimum_heavy = dist
minimum_pair = [atom1, atom2]
return average_heavy, minimum_heavy, minimum_pair
[docs]def build_cst_info_from_dca_contacts(dca_data, sequence, mapping_mode, cst_function, number_dca_predictions, quiet=False):
"""
Maps DCA residue contacts to atom-atom constraints.
:param dca_data: list od DcaContacts
:param sequence: sequence as text
:param mapping_mode: atom-to-atom mapping mode to use, supported values: "minAtom" or "pOnly"
:param cst_function: rosetta function and parameters as text string
:param number_dca_predictions: maximum number of DCA predictions to use
:param quiet: reduce output verbosity
:return: list of constraint information
"""
mapping_mode = mapping_mode.lower()
if mapping_mode not in ["minAtom", "ponly"]:
raise DcaException("build_cst_info: Invalid mapping mode given: %s" % mapping_mode)
if mapping_mode == "minAtom":
# load contact map for atom-atom contacts
distance_map = get_contact_distance_map()
distance_map_mean = get_contact_distance_map_mean(distance_map, mean_cutoff=6.0, std_cutoff=3.0)
atoms = get_atoms_for_res_sequence(sequence)
cst_info = []
predictions_used = 0
for i, d in enumerate(dca_data):
if predictions_used >= number_dca_predictions:
if not quiet:
print "Limit of %d used predictions reached. Stopping..." % number_dca_predictions
break
# print some information about the dca contact
if not quiet:
print "Contact %d: %s" % (i + 1, d)
# skip contact completely?
if not d.use_contact:
if not quiet:
print " Dca contact skipped."
continue
# lookup contact
try:
res1 = atoms[d.res1 - 1]
res2 = atoms[d.res2 - 1]
contact_key = res1[0] + res2[0]
except IndexError:
# This happens only when there was no pdb-mapping specified in the dca file header.
# TODO: Maybe this whole mapping parsing thing should be worked over so that these
# two cases can be unified.
print " Warning: Residues of contact cannot be found in current sequence. Ignoring..."
continue
predictions_used += 1
if not quiet:
print " Dca contact used (%d)." % predictions_used
# build atom-atom constraints
if mapping_mode == "minAtom":
for atom1 in res1[1]:
for atom2 in res2[1]:
atom_contact_key = atom1 + '-' + atom2
if atom_contact_key in distance_map_mean[contact_key]:
distance = distance_map_mean[contact_key][atom_contact_key][0] / 10.0
if not quiet:
print "[%s, %s] %s %s %s" % (d.res1, d.res2, contact_key, atom_contact_key, distance)
cst_info.append([atom1, d.res1, atom2, d.res2, d.get_rosetta_function(cst_function)])
elif mapping_mode == "ponly":
# TODO: rosetta does not add P atoms for the first residue, so just skip those?
if d.res1 == 1 or d.res2 == 1:
continue
atom_contact_key = "P-P"
if not quiet:
print "[%s, %s] %s %s" % (d.res1, d.res2, contact_key, atom_contact_key)
cst_info.append(["P", d.res1, "P", d.res2, d.get_rosetta_function(cst_function)])
return cst_info
[docs]class DcaContact(object):
"""Class representing a DCA contact."""
[docs] def __init__(self, res1, res2, use_contact=True, weight=1):
"""
Create new DCA contact.
:param res1: number of first residue (from 1)
:param res2: number of second residue (from 1)
:param use_contact: True or False if contact is to be used
:param weight: assign a different weight to the contact (default 1)
"""
self.res1 = res1
self.res2 = res2
self.use_contact = use_contact
self.weight = weight
def __str__(self):
return "[%s, %s], use_contact=%s, weight=%f" % (self.res1, self.res2, self.use_contact, self.weight)
[docs] def get_rosetta_function(self, function="FADE -100 26 20 -2 2"):
"""
Return Rosetta function of the contact as a list while applying weight.
:param function: rosetta function as text
:return: rosetta function as a list with applied weight
"""
function = function.split()
# parse numeric arguments
# noinspection PyShadowingNames
def parse_function(function, types):
for arg, argtype in types:
try:
function[arg] = argtype(function[arg])
except ValueError:
raise DcaException("Invalid Rosetta function: Could not parse argument '%s' as '%s'" % (function[arg], argtype.__name__))
try:
if function[0] == "FADE":
parse_function(function, [(1, int), (2, int), (3, int), (4, float), (5, float)]) # TODO: make all of these floats?
return [function[0], function[1], function[2], function[3], function[4] * self.weight, function[5] * self.weight]
else:
raise DcaException("Invalid Rosetta function: '%s' is not implemented! Only 'FADE' function is recognized at this time." % function[0])
except IndexError:
raise DcaException("Invalid Rosetta function: Not enough arguments")
# DCA FILTERING
[docs]def filter_dca_data(dca_data, dca_filter_chain, quiet=False):
"""
Run list of DCA contacts through a chain of filters.
:param dca_data: list of DcaContact objects
:param dca_filter_chain: list of DcaFilter objects
:param quiet: reduce output verbosity
"""
if dca_filter_chain is None:
return dca_data
for dca_filter in dca_filter_chain:
if dca_filter is not None:
for contact in dca_data:
dca_filter.apply(contact, quiet)
[docs]class DcaFilter(object):
"""Filter base class."""
[docs] def apply(self, contact, quiet=False):
"""Apply filter to a DCA contact.
:param contact: DCA contact
:param quiet: reduce verbosity
"""
raise NotImplementedError
[docs]class DcaFilterThreshold(DcaFilter):
"""Filter to skips DCA contact if below or above a threshold."""
[docs] def __init__(self, pdb_chain, threshold, keep_below=True, mode="minimum_heavy"):
"""
Create a new threshold filter.
:param pdb_chain: PDB chain
:param threshold: threshold below or above to keep a contact
:param keep_below: True to keep below threshold, False to keep above
:param mode: What distance to compare to (average_heavy, minimum_heavy)
"""
self.pdb_chain = pdb_chain
self.threshold = threshold
self.keep_below = keep_below
self.mode = mode
def __repr__(self):
return "Threshold filter: keep %s %f (%s)" % ("below" if self.keep_below else "above", self.threshold, self.mode)
[docs] def apply(self, contact, quiet=False):
# do not touch contacts that are already disabled
if not contact.use_contact:
return
# get contact information
average_heavy, minimum_heavy, minimum_pair = get_contact_information_in_pdb_chain(contact, self.pdb_chain)
# skip if not found
if minimum_pair is None:
contact.use_contact = False
print "%s: not found --> skip"
return
# which distance should be used?
if self.mode == "average_heavy":
distance = average_heavy
else:
distance = minimum_heavy
skip = False
# set contact to disabled if filter failed
if (self.keep_below and distance >= self.threshold) or (not self.keep_below and distance <= self.threshold):
skip = True
contact.use_contact = False
if not quiet:
print "%s: %f --> %s" % (self, distance, "skip" if skip else "keep")