from __future__ import print_function
import numpy as np
from six.moves import xrange
import itertools
from . import cosmology
from collections import OrderedDict
"""
To test:
import spectrum
spec = spectrum.Spectrum('sample_sdss.txt')
spec.plotter(xmin = 6400, xmax = 6800)
spec.specfit(guesses = [20, 6718.29, 5, 100, 6564.614, 20, 50, 6585.27, 20, 20, 6732.67, 5, 50, 6549.86, 5])
spec.measure()
"""
cm_per_mpc = 3.08568e+24
[docs]class Measurements(object):
def __init__(self, Spectrum, z=None, d=None, fluxnorm=None,
miscline=None, misctol=10., ignore=None, derive=True, debug=False,
restframe=False, ptol=2, sort=False):
"""
This can be called after a fit is run. It will inherit the specfit
object and derive as much as it can from modelpars. Just do:
spec.measure(z, xunits, fluxnorm)
Notes: If z (redshift) or d (distance) are present, we can compute
integrated line luminosities rather than just fluxes. Provide distance
in cm.
Only works with Gaussians. To generalize:
1. make sure we manipulate modelpars correctly, i.e. read in
entries corresponding to wavelength/frequency/whatever correctly.
Parameters
----------
z: float or None
redshift
d: float or None
distance in cm (used for luminosities)
fluxnorm: bool
Normalize the fluxes?
miscline: dictionary
miscline = [{'name': H_alpha', 'wavelength': 6565}]
misctol: tolerance (in Angstroms) for identifying an unmatched line
to the line(s) we specify in miscline dictionary.
sort: bool
Sort the entries in order of observed wavelength (or velocity or
frequency)
"""
self.debug = debug
self.restframe = restframe
# Inherit specfit object
self.specfit = Spectrum.specfit
self.speclines = Spectrum.speclines
# Bit of a hack - help identifying unmatched lines
self.miscline = miscline
self.misctol = misctol
# Flux units in case we are interested in line luminosities or just having real flux units
if fluxnorm is not None:
self.fluxnorm = fluxnorm
else:
self.fluxnorm = 1
# This is where we'll keep our results
self.lines = OrderedDict()
# Read in observed wavelengths
tmp1 = np.reshape(self.specfit.modelpars, (int(len(self.specfit.modelpars) / 3), 3))
tmp2 = np.reshape(self.specfit.modelerrs, (int(len(self.specfit.modelerrs) / 3), 3))
if ignore is not None:
tmp1 = np.delete(tmp1, ignore, 0)
tmp2 = np.delete(tmp2, ignore, 0)
# each tmp1 contains amplitude,wavelength,width
# (Assumes gaussians)
wavelengths = tmp1[:,1]
# sort by wavelength
if sort:
order = np.argsort(wavelengths)
self.obspos = wavelengths[order]
else:
order = np.arange(wavelengths.size)
self.obspos = wavelengths
self.Nlines = wavelengths.size
# Read in modelpars and modelerrs, re-organize so they are 2D arrays sorted by ascending wavelength
self.modelpars = np.zeros_like(tmp1)
self.modelerrs = np.zeros_like(tmp2)
for i, element in enumerate(order):
self.modelpars[i] = tmp1[element]
self.modelerrs[i] = tmp2[element]
# Read in appropriate list of reference wavelengths/frequencies/whatever
self.reflines = self.speclines.optical.get_optical_lines()
self.refpos = self.reflines['xarr']
self.refname = self.reflines['name']
# Redshift reference lines if restframe = True
if self.restframe and z is not None:
self.refpos *= (1.0 + z)
# If distance or redshift has been provided, we can compute luminosities from fluxes
if d is not None:
self.d = d
else:
self.d = None
if z is not None:
self.cosmology = cosmology.Cosmology()
self.d = self.cosmology.LuminosityDistance(z) * cm_per_mpc
self.unmatched = self.identify_by_position(ptol=ptol)
#if np.sum(unmatched) >= 2:
# self.identify_by_spacing(unmatched)
if derive:
self.derive()
[docs] def identify_by_position(self, ptol):
"""
Match observed lines to nearest reference line. Don't use spacing at all.
ptol = tolerance (in angstroms) to accept positional match
"""
if not hasattr(self, 'lines'):
self.lines = OrderedDict()
# Fill lines dictionary
unmatched = np.zeros_like(self.obspos)
for i, pos in enumerate(self.obspos):
# Check miscline directory for match
matched = False
if self.miscline is not None:
for line in self.miscline:
if abs(pos - line['wavelength']) > ptol:
continue
matched = True
name = line['name']
break
if not matched:
diff = np.abs(pos - self.refpos)
loc = np.argmin(diff)
if diff[loc] <= ptol:
matched = True
name = self.refname[loc]
if name in self.lines.keys():
name += '_1'
num = int(name[-1])
while name in self.lines.keys():
num += 1
name = '%s_%i' % (self.refname[loc], num)
if matched:
self.lines[name] = {}
self.lines[name]['modelpars'] = list(self.modelpars[i])
self.lines[name]['modelerrs'] = list(self.modelerrs[i])
else:
name = 'unknown_1'
num = 1
while name in self.lines.keys():
num += 1
name = 'unknown_%i' % num
self.lines[name] = {}
self.lines[name]['modelpars'] = list(self.modelpars[i])
self.lines[name]['modelerrs'] = list(self.modelerrs[i])
unmatched[i] = 1
return unmatched
[docs] def identify_by_spacing(self):
"""
Determine identity of lines in self.modelpars. Fill entries of self.lines dictionary.
Note: This method will be infinitely slow for more than 10 or so lines.
"""
if self.unmatched is None:
self.unmatched = np.ones_like(self.obspos)
# Remove lines that were already identified
obspos = self.obspos[self.unmatched == 1]
# Spacing between observed lines (odiff) and reference lines (rdiff)
self.odiff = np.abs(np.diff(obspos))
self.rdiff = np.abs(np.diff(self.refpos))
# Don't try to identify lines with separations smaller than the smallest
# separation in our reference library
self.rdmin = 0.99 * min(self.rdiff)
# If lines have multiple components (i.e. spacing much closer than ref lines),
# delete them from ID list.
if np.any(self.odiff) < self.rdmin:
where = np.ravel(np.argwhere(self.odiff < self.rdmin))
odiff = np.delete(self.odiff, where)
multi = True
else:
where = 0
odiff = self.odiff
multi = False
refpos = self.refpos
refname = self.refname
# Don't include elements of reference array that are far away from the observed lines (speeds things up)
condition = (refpos >= 0.99 * min(self.obspos)) & (refpos <= 1.01 * max(self.obspos))
refpos = refpos[condition]
refname = refname[condition]
if len(refpos) == 0:
print('WARNING: No reference lines in this wavelength regime.')
elif len(refpos) < self.Nlines:
print('WARNING: More observed lines than reference lines in this band.')
# Construct all possible (N-element) combos of reference lines
combos = itertools.combinations(refpos, min(self.Nlines, len(refpos)))
# List to keep track of line identification. Each entry is (cost, (line1, line2, line3,...))
self.IDresults = []
for i, combo in enumerate(combos):
rdiff = np.diff(combo)
if len(odiff) == len(rdiff):
result = (np.sum(np.abs(odiff - rdiff)), combo)
self.IDresults.append(result)
else: # If more/less observed lines than reference lines, try excluding observed lines one at a time
if len(odiff) > len(rdiff):
subcombos = itertools.combinations(odiff, len(rdiff))
for subcombo in subcombos:
result = (np.sum(np.abs(subcombo - rdiff)), combo)
self.IDresults.append(result)
else:
subcombos = itertools.combinations(rdiff, len(odiff))
for subcombo in subcombos:
result = (np.sum(np.abs(odiff - subcombo)), combo)
self.IDresults.append(result)
# Pick best solution
best = np.argmin(zip(*self.IDresults)[0]) # Location of best solution
ALLloc = [] # x-values of best fit lines in reference dictionary
# Determine indices of matched reference lines
for element in self.IDresults[best][1]:
ALLloc.append(np.argmin(np.abs(refpos - element)))
# Fill lines dictionary
for i, element in enumerate(ALLloc):
line = refname[element]
self.lines[line] = {}
loc = np.argmin(np.abs(self.obspos - refpos[element]))
self.lines[line]['modelpars'] = list(self.modelpars[loc])
self.lines[line]['modelerrs'] = list(self.modelerrs[loc])
# Track down odd lines (i.e. broad components of lines already identified)
# This won't yet work for lines that are truly unidentified
if len(ALLloc) < self.Nlines:
# Figure out which modelpars/errs that belong to lines that were already identified
mpars = self.modelpars.copy()
merrs = self.modelerrs.copy()
for line in self.lines:
wavelengths = zip(*mpars)[1]
i = np.argmin(np.abs(zip(*mpars)[1] - self.lines[line]['modelpars'][1]))
mpars = np.delete(mpars, i, 0)
merrs = np.delete(merrs, i, 0)
# Loop over unmatched modelpars/errs, find name of unmatched line, extend corresponding dict entry
if self.miscline is None:
for i, x in enumerate(zip(*mpars)[1]):
self.lines['unknown%i' % i] = {}
self.lines['unknown%i' % i]['modelpars'] = mpars[i]
self.lines['unknown%i' % i]['modelerrs'] = merrs[i]
# If we've know a-priori which lines the unmatched lines are likely to be, use that information
else:
print(self.miscline)
for i, miscline in enumerate(self.miscline):
for j, x in enumerate(zip(*mpars)[1]):
if abs(x - miscline['wavelength']) < self.misctol:
name = miscline['name']
else:
name = 'unknown%i' % j
self.lines[name] = {}
self.lines[name]['modelpars'] = mpars[j]
self.lines[name]['modelerrs'] = merrs[j]
self.separate()
[docs] def derive(self):
"""
Calculate luminosity and FWHM for all spectral lines.
"""
for line in self.lines.keys():
if self.debug:
print("Computing parameters for line %s" % line)
self.lines[line]['fwhm'] = self.compute_fwhm(self.lines[line]['modelpars'])
self.lines[line]['flux'] = self.compute_flux(self.lines[line]['modelpars'])
self.lines[line]['amp'] = self.compute_amplitude(self.lines[line]['modelpars'])
self.lines[line]['pos'] = self.lines[line]['modelpars'][1]
if self.d is not None:
self.lines[line]['lum'] = self.compute_luminosity(self.lines[line]['modelpars'])
[docs] def separate(self):
"""
For multicomponent lines, separate into broad and narrow components (assume only one of components is narrow).
"""
for key in self.lines.keys():
modpars = self.lines[key]['modelpars']
moderrs = self.lines[key]['modelerrs']
if len(modpars) > 3:
modpars2d = np.reshape(modpars, (len(modpars) / 3, 3))
moderrs2d = np.reshape(moderrs, (len(moderrs) / 3, 3))
sigma = zip(*modpars2d)[2]
minsigma = min(np.abs(sigma))
i_narrow = list(np.abs(sigma)).index(minsigma)
else:
continue
self.lines["{0}_N".format(key)] = {}
self.lines["{0}_N".format(key)]['modelpars'] = []
self.lines["{0}_N".format(key)]['modelerrs'] = []
self.lines["{0}_B".format(key)] = {}
self.lines["{0}_B".format(key)]['modelpars'] = []
self.lines["{0}_B".format(key)]['modelerrs'] = []
for i, arr in enumerate(modpars2d):
if i == i_narrow:
self.lines["{0}_N".format(key)]['modelpars'] = arr
self.lines["{0}_N".format(key)]['modelerrs'] = moderrs2d[i]
else:
self.lines["{0}_B".format(key)]['modelpars'].extend(arr)
self.lines["{0}_B".format(key)]['modelerrs'].extend(moderrs2d[i])
[docs] def compute_flux(self, pars):
"""
Calculate integrated flux of emission line. Works for multi-component fits too. Unnormalized.
"""
flux = 0
niter = (len(pars) / 3)
assert niter == int(niter)
for i in xrange(int(niter)):
flux += np.sqrt(2. * np.pi) * pars[3 * i] * abs(pars[2 + 3 * i])
return flux * self.fluxnorm
[docs] def compute_amplitude(self, pars):
"""
Calculate amplitude of emission line. Should be easy - add multiple components if they exist.
Currently assumes multiple components have the same centroid.
"""
amp = 0
niter = (len(pars) / 3)
for i in xrange(int(niter)):
amp += pars[3 * i]
return amp * self.fluxnorm
[docs] def compute_luminosity(self, pars):
"""
Determine luminosity of line (need distance and flux units).
"""
lum = 0
niter = (len(pars) / 3)
for i in xrange(int(niter)):
lum += self.compute_flux(pars) * 4. * np.pi * self.d**2
return lum
[docs] def compute_fwhm(self, pars):
"""
Determine full-width at half maximum for multi-component fit numerically, or analytically if line
has only a single component. Uses bisection technique for the former with absolute tolerance of 1e-4.
"""
if len(pars) == 3:
return 2. * np.sqrt(2. * np.log(2.)) * abs(pars[2])
else:
atol = 1e-4
niter = (len(pars) / 3)
pars2d = np.reshape(pars, (int(niter), 3))
start = list(zip(*pars2d))[1][0] # start at central wavelength of first component
# If the centroids are exactly the same for all components, we know the peak, and peak position
if np.allclose(zip(*pars2d)[1], atol):
fmax = np.sum(zip(*pars2d)[0])
# Otherwise, we have to figure out where the multicomponent peak is
else:
f = lambda x: self.specfit.fitter.slope(x)
xfmax = self.bisection(f, start)
fmax = self.specfit.fitter.n_modelfunc(pars)(np.array([xfmax, xfmax]))[0]
hmax = 0.5 * fmax
# current height relative to half max - we want to minimize this function. Could be asymmetric.
f = lambda x: self.specfit.fitter.n_modelfunc(pars)(np.array([x])) - hmax
xhmax1 = self.bisection(f, start)
xhmax2 = self.bisection(f, start + (start - xhmax1))
return abs(xhmax2 - xhmax1)
[docs] def bisection(self, f, x_guess):
"""
Find root of function using bisection method. Absolute tolerance of 1e-4 is being used.
"""
x1, x2 = self.bracket_root(f, x_guess)
# Narrow bracketed range with bisection until tolerance is met
while abs(x2 - x1) > 1e-4:
midpt = np.mean([x1, x2])
fmid = f(midpt)
if np.sign(fmid) < 0: x1 = midpt
else: x2 = midpt
if fmid == 0.0: break
return x2
[docs] def bracket_root(self, f, x_guess, atol = 1e-4):
"""
Bracket root by finding points where function goes from positive to negative.
"""
f1 = f(x_guess)
f2 = f(x_guess + 1)
df = f2 - f1
# Determine whether increasing or decreasing x_guess will lead us to zero
if (f1 > 0 and df < 0) or (f1 < 0 and df > 0): sign = 1
else: sign = -1
# Find root bracketing points
xpre = x_guess
xnow = x_guess + sign
fpre = f1
fnow = f(xnow)
while (np.sign(fnow) == np.sign(fpre)):
xpre = xnow
xnow += sign * 0.1
fpre = f(xpre)
fnow = f(xnow)
x1 = min(xnow, xpre)
x2 = max(xnow, xpre)
if not np.all([np.sign(fpre), np.sign(fnow)]):
x1 -= 1e-4
x2 += 1e-4
return x1, x2
[docs] def to_tex(self):
"""
Write out fit results to tex format.
"""
pass