# Source code for nmrml2isa.nmrml
# coding: utf-8
from __future__ import (
print_function,
absolute_import,
unicode_literals,
)
import os
import json
import six
import collections
import pronto
from . import __version__, __author__, __email__
from .utils import etree
class nmrMLmeta(object):
    """Collect metadata from one nmrML document into ``self.meta``.

    On construction the file is parsed, the XML namespace / root layout is
    detected (``_build_env``), the nmrCV ontology is attached
    (``_load_ontology``) and every ``instrument``/``acquisition``/... parser
    is run in turn.  Results are exposed through the ``meta`` mapping and the
    ``meta_json``, ``meta_isa`` and ``isa_json`` properties.

    Args:
        in_file: path to an nmrML file, or a file object exposing ``.name``.
        cached_onto: optional already-loaded ontology object used in place of
            parsing the bundled ``nmrCV.owl``.
    """

    # XPath templates; ``{root}`` is filled in from ``self.env`` and the
    # ``s:`` prefix is resolved through ``self.ns``.
    _namespaced_xpaths = {
        'instruments': '{root}/s:instrumentConfigurationList/s:instrumentConfiguration',
        'software': '{root}/s:softwareList/s:software',
        'acquisition': '{root}/s:acquisition/s:acquisition1D/s:acquisitionParameterSet',
        'source_file': '{root}/s:sourceFileList/s:sourceFile',
        'contacts': '{root}/s:contactList/s:contact',
        'processing': '{root}/s:dataProcessingList/s:dataProcessing/s:processingMethod',
        'spectrum': '{root}/s:spectrum/s:spectrum1D',
        'probehead': '{root}/s:instrumentConfigurationList/s:instrumentConfiguration/s:userParam',
        'pulse_sequence': '{root}/s:acquisition/s:acquisition1D/s:acquisitionParameterSet/s:pulseSequence/s:userParam',
    }

    # Same templates with the ``s:`` prefix removed, for documents that
    # declare no namespace.  ``dict.items`` works on both Python 2 and 3.
    _raw_xpaths = {
        key: path.replace('s:', '')
        for key, path in _namespaced_xpaths.items()
    }

    # Divisors used by ``_convert_magnetic_field`` to turn a frequency into
    # tesla, keyed by the CHEBI accession of the acquisition nucleus.
    gyromagnetic_table = {
        'CHEBI_49637': 42.576,  # 1H
        'CHEBI_29237': 6.536,   # 2H
        'CHEBI_36928': 10.705,  # 13C
        'CHEBI_36938': 3.077,   # 14N
        'CHEBI_36934': -4.316,  # 15N
        'CHEBI_33819': -5.772,  # 17O
        'CHEBI_37971': 17.235,  # 31P
    }

    nmrcv = None

    def __init__(self, in_file, cached_onto=None):
        # setup lxml parsing
        self.in_file = in_file
        parser = etree.XMLParser()
        self.tree = etree.parse(in_file, parser=parser)

        self._build_env()
        self._load_ontology(cached_onto)

        self.meta = collections.OrderedDict()

        # File objects expose their path through ``.name``; plain paths are
        # used directly.  Keeping a string here (rather than the handle
        # itself) keeps ``meta`` JSON-serialisable.
        source_name = getattr(in_file, 'name', in_file)
        self.sample = os.path.splitext(os.path.basename(source_name))[0]

        self.meta['Derived Spectral Data File'] = {'value': source_name}
        self.meta['Sample Name'] = {'value': self.sample}
        self.meta['NMR Assay Name'] = {'value': self.sample}
        self.meta['Free Induction Decay Data File'] = {'value': "{}.zip".format(self.sample)}

        # Start parsing
        self.instrument()
        self.acquisition()
        self.source_file()
        self.contacts()
        self.processing()
        self.spectrum()
        self.probehead()
        self.pulse_sequence()

        # The field conversion compares raw accessions, so it must run
        # before accessions are rewritten into URLs.
        self._convert_magnetic_field()
        self._urllize(self.meta)

        if 'contacts' in self.meta:
            self.meta['study_contacts'] = self.meta['contacts']['entry_list']
            del self.meta['contacts']

    def _load_ontology(self, cached_onto):
        """Attach the nmrCV ontology to the instance if none is set yet.

        ``cached_onto`` is used when given; otherwise ``nmrCV.owl`` located
        next to this module is loaded with pronto.
        """
        if self.nmrcv is not None:
            return
        if cached_onto is not None:
            self.nmrcv = cached_onto
        else:
            owl_path = os.path.join(
                os.path.dirname(os.path.abspath(__file__)), 'nmrCV.owl'
            )
            self.nmrcv = pronto.Ontology(owl_path, False)

    @staticmethod
    def _cv_to_term(cv):
        """Return the name/ref/accession triple of a CV element as a dict."""
        return {
            'name': cv.attrib['name'],
            'ref': cv.attrib['cvRef'],
            'accession': cv.attrib['accession'],
        }

    def instrument(self):
        """Parses the instrument model, manufacturer and software"""
        instrument = self.tree.find(self.xpaths['instruments'].format(**self.env), self.ns)
        if instrument is None:
            # No instrument configuration in this document: nothing to parse.
            return

        for cv in instrument.iterfind('./{cvParam}'.format(**self.env), self.ns):
            accession = cv.attrib['accession']

            if accession in self.nmrcv['NMR:1000031'].rchildren():
                self.meta['Instrument'] = self._cv_to_term(cv)
                # The manufacturer is the first NMR:1400255 descendant whose
                # name is a prefix of the instrument name.
                manufacturer = next(
                    (x for x in self.nmrcv['NMR:1400255'].rchildren()
                     if cv.attrib['name'].startswith(x.name)),
                    None,
                )
                if manufacturer is not None:
                    self.meta['Instrument manufacturer'] = {
                        'name': manufacturer.name,
                        'accession': manufacturer.id,
                        'ref': 'NMRCV',
                    }

            # PROBE
            elif accession in self.nmrcv['NMR:1400014'].rchildren():
                self.meta['NMR Probe'] = self._cv_to_term(cv)

            # AUTOSAMPLER
            elif accession in self.nmrcv['NMR:1000234'].rchildren():
                self.meta['Autosample'] = self._cv_to_term(cv)

        soft_ref = instrument.find('s:softwareRef', self.ns)
        if soft_ref is not None:
            soft, softv = self.software(soft_ref.attrib['ref'])
            if soft is not None:
                self.meta['Instrument software'] = soft
            if softv is not None:
                self.meta['Instrument software version'] = {'value': softv}

    def software(self, soft_ref):
        """Parses software information

        Arguments:
            soft_ref (str): value matched against the ``id`` attribute of
                the software elements.

        Returns:
            software (dict or None)
            software version (str or None)
        """
        for soft in self.tree.iterfind(self.xpaths['software'].format(**self.env), self.ns):
            if soft.attrib['id'] == soft_ref:
                return self._cv_to_term(soft), soft.attrib.get('version')
        return None, None

    def acquisition(self):
        """Parses scan counts and the acquisition parameter children."""
        acquisition = self.tree.find(self.xpaths['acquisition'].format(**self.env), self.ns)
        if acquisition is None:
            return

        self.meta['Number of transients'] = {'value': int(acquisition.attrib['numberOfScans'])}
        self.meta['Number of steady state scans'] = {'value': int(acquisition.attrib['numberOfSteadyStateScans'])}

        # child path (relative to the parameter set) -> key in ``self.meta``
        terms = {
            's:sampleAcquisitionTemperature': 'Temperature',
            's:sampleContainer': 'NMR tube type',
            's:spinningRate': 'Spinning Rate',
            's:relaxationDelay': 'Relaxation Delay',
            's:pulseSequence': 'Pulse sequence',
            's:DirectDimensionParameterSet/s:acquisitionNucleus': 'Acquisition Nucleus',
            's:DirectDimensionParameterSet/s:decouplingNucleus': 'Decoupling Nucleus',
            's:DirectDimensionParameterSet/s:effectiveExcitationField': 'Magnetic field strength',
            's:DirectDimensionParameterSet/s:sweepWidth': 'Sweep Width',
            's:DirectDimensionParameterSet/s:pulseWidth': 'Pulse Width',
            's:DirectDimensionParameterSet/s:irradiationFrequency': 'Irradiation Frequency',
            's:DirectDimensionParameterSet/s:samplingStrategy': 'Sampling Strategy',
        }
        self.read_children(acquisition, terms)

    def source_file(self):
        """Records the location and CV annotations of known source files.

        Only source files whose ``name`` attribute is one of the keys of
        ``names`` below are considered.
        """
        source_files = self.tree.iterfind(self.xpaths['source_file'].format(**self.env), self.ns)

        def descends_from(parent_id):
            # The ontology lookup is deferred until the hook is called.
            return lambda cv: cv.attrib['accession'] in self.nmrcv[parent_id].rchildren()

        hooked_terms = [
            {'hook': descends_from('NMR:1400285'), 'name': 'Format'},
            {'hook': descends_from('NMR:1400119'), 'name': 'Type'},
            {'hook': descends_from('NMR:1400122'), 'name': 'Type'},
            {'hook': descends_from('NMR:1002006'), 'name': 'Type'},
            {'hook': descends_from('NMR:1400123'), 'name': 'Type'},
            {'hook': lambda cv: cv.attrib['accession'] == 'NMR:1000319', 'name': 'Type'},
        ]

        names = {
            'fid': 'Free Induction Decay Data',
            'pulseprogram': 'Pulse Sequence Data',
            'acqus': 'Acquisition Parameter Data',
            'procs': 'Processing Parameter Data',
            '1r': '1r Data',
        }

        for source in source_files:
            if source.attrib['name'] not in names:
                continue
            name = names[source.attrib['name']]
            # Keep only the part of the location that follows the sample
            # name, re-prefixed with the sample name.
            self.meta[name + ' File'] = {
                'value': self.sample + source.attrib['location'].split(self.sample)[-1]
            }
            self._parse_cv(source, hooked_terms, name)

    def contacts(self):
        """Collects contacts into ``self.meta['contacts']['entry_list']``.

        ``fullname`` is split on single spaces: one part is a last name, two
        parts are first/last, three parts are first/middle/last, and anything
        longer is kept whole as the first name.
        """
        contacts = self.tree.iterfind(self.xpaths['contacts'].format(**self.env), self.ns)
        self.meta['contacts'] = {'entry_list': []}

        for contact in contacts:
            fullname = contact.attrib['fullname']
            parts = fullname.split(' ', 3)

            if len(parts) == 1:
                first_name, mid, last_name = '', '', parts[0]
            elif len(parts) == 2:
                first_name, mid, last_name = parts[0], '', parts[1]
            elif len(parts) == 3:
                first_name, mid, last_name = parts
            else:
                first_name, mid, last_name = fullname, '', ''

            self.meta['contacts']['entry_list'].append({
                'first_name': first_name,
                'mid': mid,
                'last_name': last_name,
                'mail': contact.attrib.get('email', ''),
            })

    def processing(self):
        """Parses the processing software and its data transformation terms."""
        processing = self.tree.find(self.xpaths['processing'].format(**self.env), self.ns)
        if processing is None:
            return

        soft, softv = self.software(processing.attrib['softwareRef'])
        self.meta['Data Transformation software'] = soft
        if softv is not None:
            self.meta['Data Transformation software version'] = {'value': softv}

        self.meta['Data Transformation Name'] = {
            'entry_list': [
                self._cv_to_term(data_transformation)
                for data_transformation in processing.iterfind(self.env['cvParam'], self.ns)
            ]
        }

    def spectrum(self):
        """Parses the number of data points and the spectrum children."""
        spectrum = self.tree.find(self.xpaths['spectrum'].format(**self.env), self.ns)
        if spectrum is None:
            return

        self.meta['Number of data points'] = {'value': int(spectrum.attrib['numberOfDataPoints'])}

        # child path (relative to the spectrum) -> key in ``self.meta``
        terms = {
            's:xAxis': 'X axis range',
            's:yAxisType': 'Y axis type',
            's:processingParameterSet/s:postAcquisitionSolventSuppressionMethod/': 'Post Acquisition Solvent Supression Method',
            's:processingParameterSet/s:calibrationCompound/': 'Calibration Compound',
            's:processingParameterSet/s:dataTransformationMethod/': 'Spectrum transformation method',
            's:firstDimensionProcessingParameterSet/s:zeroOrderPhaseCorrection': 'Zero Value Phase Correction',
            's:firstDimensionProcessingParameterSet/s:firstOrderPhaseCorrection': 'First Order Phase Correction',
            's:firstDimensionProcessingParameterSet/s:calibrationReferenceShift': 'Calibration Reference Shift',
            's:firstDimensionProcessingParameterSet/s:spectralDenoisingMethod': 'Spectral Denoising Method',
            's:firstDimensionProcessingParameterSet/s:windowFunction/s:windowFunctionMethod': 'Window Function Method',
            's:firstDimensionProcessingParameterSet/s:windowFunction/s:windowFunctionMethodParameter': 'Window Function Parameter',
            's:firstDimensionProcessingParameterSet/s:baselineCorrectionMethod': 'Baseline Correction Method',
        }
        self.read_children(spectrum, terms)

    def read_children(self, node, terms):
        """Extracts the first match of each path in ``terms`` into ``meta``.

        Arguments:
            node: element the paths are resolved against.
            terms (dict): maps a child path to the ``self.meta`` key to fill.

        A key that is absent or empty receives the extracted dict directly.
        A second value for the same key turns the entry into
        ``{'entry_list': [first, second]}``; further values are appended.
        """
        for childpath, name in terms.items():
            child = node.find(childpath, self.ns)
            if child is None:
                continue

            extract = self._children_extract(child)
            if not extract:
                continue

            current = self.meta.get(name)
            if not current:
                self.meta[name] = extract.copy()
            elif 'entry_list' not in current:
                # The check must look at this entry, not at the whole meta
                # mapping, or a third value would wrap the list again.
                self.meta[name] = {'entry_list': [current, extract.copy()]}
            else:
                current['entry_list'].append(extract.copy())

    def probehead(self):
        """Extracts the userParam ProbeHead if no CV term was found before."""
        if 'NMR Probe' in self.meta:
            return
        probehead = self.tree.find(self.xpaths['probehead'].format(**self.env), self.ns)
        if probehead is not None:
            self.meta['NMR Probe'] = {'name': probehead.attrib['value'], 'ref': '', 'accession': ''}

    def pulse_sequence(self):
        """Extracts the userParam Pulse sequence if no CV term was found before."""
        if self.meta.get('Pulse sequence'):
            return
        pulse_sequence = self.tree.find(self.xpaths['pulse_sequence'].format(**self.env), self.ns)
        if pulse_sequence is not None:
            self.meta['Pulse sequence'] = {'name': pulse_sequence.attrib['value'], 'ref': '', 'accession': ''}

    def _children_extract(self, child):
        """Builds a metadata dict from the attributes of ``child``.

        Returns a dict with any of the keys ``value``, ``name``, ``unit``,
        ``ref`` and ``accession``; it is empty when ``child`` carries none
        of the recognised attributes.
        """
        attrib = child.attrib
        _dict = {}

        if 'value' in attrib:
            # Numeric values are stored as floats, anything else verbatim.
            try:
                _dict['value'] = float(attrib['value'])
            except ValueError:
                _dict['value'] = attrib['value']

        if 'startValue' in attrib and 'endValue' in attrib:
            # Ranges are rendered low-high whatever the order in the file.
            bounds = sorted((float(attrib['startValue']), float(attrib['endValue'])))
            _dict['value'] = "{}-{}".format(bounds[0], bounds[1])

        if 'name' in attrib:
            _dict['name'] = attrib['name']

        if 'unitName' in attrib:
            _dict['unit'] = {
                'name': attrib['unitName'],
                'ref': attrib['unitCvRef'],
                'accession': attrib['unitAccession'],
            }

        if 'cvRef' in attrib:
            _dict['ref'] = attrib['cvRef']
            _dict['accession'] = attrib['accession']

        return _dict

    def _convert_magnetic_field(self):
        """Convert magnetic field value from MHz to tesla.

        The conversion only happens when the field's unit accession is
        ``UO_0000325`` and the acquisition nucleus has an entry in
        ``gyromagnetic_table``; accessions written with ``:`` instead of
        ``_`` are accepted too.  Incomplete or non-numeric entries are left
        untouched instead of raising.
        """
        field = self.meta.get('Magnetic field strength')
        nucleus = self.meta.get('Acquisition Nucleus')
        if not isinstance(field, dict) or not isinstance(nucleus, dict):
            return

        unit = field.get('unit')
        if not isinstance(unit, dict):
            return
        if (unit.get('accession') or '').replace(':', '_') != 'UO_0000325':
            return

        ratio = self.gyromagnetic_table.get(
            (nucleus.get('accession') or '').replace(':', '_')
        )
        if ratio is None:
            return

        try:
            mhz = float(field['value'])
        except (KeyError, TypeError, ValueError):
            return

        self.meta['Magnetic field strength'] = {
            'value': "{:.3f}".format(mhz / ratio),
            'unit': {'name': 'tesla', 'ref': 'UO', 'accession': 'UO_0000228'},
        }

    @classmethod
    def _urllize(cls, starting_point):
        """Rewrites every ``accession`` below ``starting_point`` in place.

        Nested dicts and dicts inside lists are visited recursively; list
        elements that are not dicts are skipped.
        """
        for k, v in starting_point.items():
            if isinstance(v, dict):
                for key, param in v.items():
                    if isinstance(param, dict):
                        cls._urllize(param)
                    elif isinstance(param, list):
                        for element in param:
                            if isinstance(element, dict):
                                cls._urllize(element)
                    elif key == 'accession':
                        if 'http' not in param:
                            # Replacing the value of an existing key is safe
                            # while iterating over the dict.
                            v[key] = cls._urllize_name(param)
            elif k == 'accession':
                starting_point[k] = cls._urllize_name(v)

    @staticmethod
    def _urllize_name(accession):
        """Maps an accession to a URL based on its prefix.

        Accessions matching none of the known prefixes are returned as is.
        """
        if accession.startswith('NMR'):
            return 'http://nmrML.org/nmrCV#{}'.format(accession)
        elif accession.startswith(('UO', 'CHEBI')):
            return 'http://purl.obolibrary.org/obo/{}'.format(accession.replace(':', '_'))
        elif accession.startswith('C'):
            return 'http://ncicb.nci.nih.gov/xml/owl/EVS/Thesaurus.owl#{}'.format(accession.replace(':', '_'))
        return accession

    def _parse_cv(self, node, terms, name):
        """Stores the CV children of ``node`` accepted by a hook in ``terms``.

        Arguments:
            node: element whose CV children are inspected.
            terms (list): dicts with a ``hook`` predicate and a ``name``.
            name (str): prefix of the ``self.meta`` key, joined to the term
                name with a space.
        """
        for cv in node.iterfind('./{cvParam}'.format(**self.env), self.ns):
            for term in terms:
                if not term['hook'](cv):
                    continue
                key = ' '.join([name, term['name']])
                self.meta[key] = self._cv_to_term(cv)
                if 'unitName' in cv.attrib:
                    self.meta[key]['unit'] = {
                        'name': cv.attrib['unitName'],
                        'ref': cv.attrib['unitCvRef'],
                        'accession': cv.attrib['unitAccession'],
                    }

    def _build_env(self):
        """Detects namespace, root path and CV tag name of the document.

        Sets ``self.ns`` (prefix map with the document namespace under
        ``s``), ``self.xpaths`` (namespaced or raw templates) and
        ``self.env`` with the keys ``root`` and ``cvParam``.
        """
        root = self.tree.getroot()
        try:
            # proper method to get namespace through nsmap (lxml)
            self.ns = root.nsmap
            self.ns['s'] = self.ns.get(None, '')
            self.ns.pop(None, None)
        except AttributeError:
            # 'hacked' method to get namespace through root tag (xml.etree)
            if root.tag.startswith('{'):
                self.ns = {'s': root.tag[1:].split('}')[0]}
            else:
                self.ns = {'s': ''}

        namespaced = self.ns['s'] != ''
        self.xpaths = self._namespaced_xpaths if namespaced else self._raw_xpaths

        self.env = {}
        if self.tree.find('./s:nmrML', self.ns) is None:
            self.env['root'] = '.'
        else:
            # The nmrML element is nested one level down: anchor every
            # template on it (without this branch 'root' was never set and
            # the formatting below raised KeyError).
            self.env['root'] = './s:nmrML' if namespaced else './nmrML'

        # Documents use either cvTerm or cvParam elements; probe for cvTerm.
        if self.tree.find('{root}/s:instrumentConfigurationList'
                          '/s:instrumentConfiguration/s:cvTerm'.format(**self.env), self.ns) is None:
            self.env['cvParam'] = 's:cvParam'
        else:
            self.env['cvParam'] = 's:cvTerm'

    @property
    def meta_json(self):
        """The ``meta`` mapping serialised as indented, key-sorted JSON."""
        return json.dumps(self.meta, indent=4, sort_keys=True)

    @property
    def meta_isa(self):
        """Copy of ``meta`` with most keys wrapped as ``Parameter Value[...]``.

        Keys listed in ``keep`` are copied unchanged.
        """
        keep = ["data transformation", "data transformation software version", "data transformation software",
                "term_source", "Raw Spectral Data File", "MS Assay Name", "Derived Spectral Data File", "Sample Name",
                "Acquisition Parameter Data File", "Free Induction Decay Data File", 'contacts']

        meta_isa = collections.OrderedDict()
        for meta_name in self.meta:
            if meta_name in keep:
                meta_isa[meta_name] = self.meta[meta_name]
            else:
                meta_isa["Parameter Value[" + meta_name + "]"] = self.meta[meta_name]
        return meta_isa

    @property
    def isa_json(self):
        """The ``meta_isa`` mapping serialised as indented, key-sorted JSON."""
        return json.dumps(self.meta_isa, indent=4, sort_keys=True)
# Script entry point: parse the nmrML file named by the first command-line
# argument and print the extracted metadata as JSON.
if __name__ == '__main__':
    import sys
    parsed = nmrMLmeta(sys.argv[1])
    print(parsed.meta_json)