"""
========================================
Ammonia inversion transition TROT fitter
========================================
Ammonia inversion transition TROT fitter translated from Erik Rosolowsky's
https://github.com/low-sky/nh3fit
.. moduleauthor:: Adam Ginsburg <adam.g.ginsburg@gmail.com>
Module API
^^^^^^^^^^
"""
from __future__ import division
import numpy as np
from ...mpfit import mpfit
from ...spectrum.parinfo import ParinfoList,Parinfo
from . import fitter
from . import model
import matplotlib.cbook as mpcb
import copy
from astropy import log
from six import iteritems
from . import mpfit_messages
import operator
import string
import warnings
from .ammonia_constants import (line_names, freq_dict, aval_dict, ortho_dict,
TCMB,
voff_lines_dict, tau_wts_dict)
[docs]def ammonia(xarr, trot=20, tex=None, ntot=14, width=1, xoff_v=0.0, fortho=0.0,
tau=None, fillingfraction=None, return_tau=False,
return_tau_profile=False, background_tb=TCMB, verbose=False,
return_components=False, debug=False, line_names=line_names,
ignore_neg_models=False):
"""
Generate a model Ammonia spectrum based on input temperatures, column, and
gaussian parameters. The returned model will be in Kelvin (brightness
temperature) units.
Note that astropy units are not used internally for performance reasons. A
wrapped version of this module including those units would be a good idea,
as it is definitely possible to implement this with unit support and good
performance.
Parameters
----------
xarr: `pyspeckit.spectrum.units.SpectroscopicAxis`
Array of wavelength/frequency values
trot: float
The rotational temperature of the lines. This is the excitation
temperature that governs the relative populations of the rotational
states.
tex: float or None
Excitation temperature. Assumed LTE if unspecified (``None``) or if
tex>trot. This is the excitation temperature for *all* of the modeled
lines, which means we are explicitly assuming T_ex is the same for all
lines.
ntot: float
Total log column density of NH3. Can be specified as a float in the
range 5-25
width: float
Line width (Gaussian sigma) in km/s
xoff_v: float
Line offset in km/s
fortho: float
Fraction of NH3 molecules in ortho state. Default assumes all para
(fortho=0).
tau: None or float
If tau (optical depth in the 1-1 line) is specified, ntot is NOT fit
but is set to a fixed value. The optical depths of the other lines are
fixed relative to tau_oneone
fillingfraction: None or float
fillingfraction is an arbitrary scaling factor to apply to the model
return_tau: bool
Return a dictionary of the optical depths in each line instead of a
synthetic spectrum
return_tau_profile: bool
Return a dictionary of the optical depth profiles in each line, i.e.,
the optical depths that will be used in conjunction with T_ex to produce
the synthetic spectrum
return_components: bool
Return a list of arrays, one for each hyperfine component, instead of
just one array
background_tb : float
The background brightness temperature. Defaults to TCMB.
ignore_neg_models: bool
Normally if background=TCMB and the model is negative, an exception
will be raised. This parameter will simply skip that exception. Use
with extreme caution: negative models (absorption spectra against the
CMB) are not physical! You may want to allow this in some cases
because there can be numerical issues where the model goes negative
when it shouldn't.
verbose: bool
More messages
debug: bool
For debugging.
Returns
-------
spectrum: `numpy.ndarray`
Synthetic spectrum with same shape as ``xarr``
component_list: list
List of `numpy.ndarray`'s, one for each hyperfine component
tau_dict: dict
Dictionary of optical depth values for the various lines
(if ``return_tau`` is set)
"""
from .ammonia_constants import (ckms, ccms, h, kb,
Jortho, Jpara, Brot, Crot)
# Convert X-units to frequency in GHz
if xarr.unit.to_string() != 'GHz':
xarr = xarr.as_unit('GHz')
if tex is None:
log.warning("Assuming tex=trot")
tex = trot
elif isinstance(tex, dict):
for k in tex:
assert k in line_names,"{0} not in line list".format(k)
line_names = tex.keys()
elif tex > trot:
warnings.warn("tex > trot in the ammonia model. "
"This is unphysical and "
"suggests that you may need to constrain tex. See "
"ammonia_model_restricted_tex.")
if width < 0:
return np.zeros(xarr.size)*np.nan
elif width == 0:
return np.zeros(xarr.size)
from .ammonia_constants import line_name_indices, line_names as original_line_names
# recreate line_names keeping only lines with a specified tex
# using this loop instead of tex.keys() preserves the order & data type
line_names = [k for k in original_line_names if k in line_names]
if 5 <= ntot <= 25:
# allow ntot to be specified as a logarithm. This is
# safe because ntot < 1e10 gives a spectrum of all zeros, and the
# plausible range of columns is not outside the specified range
lin_ntot = 10**ntot
else:
raise ValueError("ntot, the logarithmic total column density,"
" must be in the range 5 - 25")
tau_dict = {}
"""
Column density is the free parameter. It is used in conjunction with
the full partition function to compute the optical depth in each band
"""
Zpara = (2*Jpara+1)*np.exp(-h*(Brot*Jpara*(Jpara+1)+
(Crot-Brot)*Jpara**2)/(kb*trot))
Zortho = 2*(2*Jortho+1)*np.exp(-h*(Brot*Jortho*(Jortho+1)+
(Crot-Brot)*Jortho**2)/(kb*trot))
Qpara = Zpara.sum()
Qortho = Zortho.sum()
log.debug("Partition Function: Q_ortho={0}, Q_para={1}".format(Qortho, Qpara))
for linename in line_names:
if ortho_dict[linename]:
# define variable "ortho_or_para_frac" that will be the ortho
# fraction in the case of an ortho transition or the para
# fraction for a para transition
ortho_or_parafrac = fortho
Z = Zortho
Qtot = Qortho
else:
ortho_or_parafrac = 1.0-fortho
Z = Zpara
Qtot = Qpara
# for a complete discussion of these equations, please see
# https://github.com/keflavich/pyspeckit/blob/ammonia_equations/examples/AmmoniaLevelPopulation.ipynb
# https://github.com/pyspeckit/pyspeckit/blob/master/examples/AmmoniaLevelPopulation.ipynb
# and
# http://low-sky.github.io/ammoniacolumn/
# and
# https://github.com/pyspeckit/pyspeckit/pull/136
# short variable names for readability
frq = freq_dict[linename]
partition = Z[line_name_indices[linename]]
aval = aval_dict[linename]
# Total population of the higher energy inversion transition
population_rotstate = lin_ntot * ortho_or_parafrac * partition/Qtot
if isinstance(tex, dict):
expterm = ((1-np.exp(-h*frq/(kb*tex[linename]))) /
(1+np.exp(-h*frq/(kb*tex[linename]))))
else:
expterm = ((1-np.exp(-h*frq/(kb*tex))) /
(1+np.exp(-h*frq/(kb*tex))))
fracterm = (ccms**2 * aval / (8*np.pi*frq**2))
widthterm = (ckms/(width*frq*(2*np.pi)**0.5))
tau_i = population_rotstate * fracterm * expterm * widthterm
tau_dict[linename] = tau_i
log.debug("Line {0}: tau={1}, expterm={2}, pop={3},"
" partition={4}"
.format(linename, tau_i, expterm, population_rotstate,
partition))
# allow tau(11) to be specified instead of ntot
# in the thin case, this is not needed: ntot plays no role
# this process allows you to specify tau without using the approximate equations specified
# above. It should remove ntot from the calculations anyway...
if tau is not None:
tau11_temp = tau_dict['oneone']
# re-scale all optical depths so that tau is as specified, but the relative taus
# are sest by the kinetic temperature and partition functions
for linename,t in iteritems(tau_dict):
tau_dict[linename] = t * tau/tau11_temp
if return_tau:
return tau_dict
model_spectrum = _ammonia_spectrum(xarr, tex, tau_dict, width, xoff_v,
fortho, line_names,
background_tb=background_tb,
fillingfraction=fillingfraction,
return_components=return_components,
return_tau_profile=return_tau_profile
)
if not return_tau_profile and model_spectrum.min() < 0 and background_tb == TCMB and not ignore_neg_models:
raise ValueError("Model dropped below zero. That is not possible "
" normally. Here are the input values: "+
("tex: {0} ".format(tex)) +
("trot: %f " % trot) +
("ntot: %f " % ntot) +
("width: %f " % width) +
("xoff_v: %f " % xoff_v) +
("fortho: %f " % fortho)
)
if verbose or debug:
log.info("trot: %g tex: %s ntot: %g width: %g xoff_v: %g "
"fortho: %g fillingfraction: %g" % (trot, tex, ntot, width,
xoff_v, fortho,
fillingfraction))
return model_spectrum
[docs]def cold_ammonia(xarr, tkin, **kwargs):
"""
Generate a model Ammonia spectrum based on input temperatures, column, and
gaussian parameters
Parameters
----------
xarr: `pyspeckit.spectrum.units.SpectroscopicAxis`
Array of wavelength/frequency values
tkin: float
The kinetic temperature of the lines in K. Will be converted to
rotational temperature following the scheme of Swift et al 2005
(http://esoads.eso.org/abs/2005ApJ...620..823S, eqn A6) and further
discussed in Equation 7 of Rosolowsky et al 2008
(http://adsabs.harvard.edu/abs/2008ApJS..175..509R)
"""
dT0 = 41.18 # Energy difference between (2,2) and (1,1) in K
trot = tkin * (1 + (tkin/dT0)*np.log(1 + 0.6*np.exp(-15.7/tkin)))**-1
log.debug("Cold ammonia turned T_K = {0} into T_rot = {1}".format(tkin,trot))
return ammonia(xarr, trot=trot, **kwargs)
[docs]def ammonia_thin(xarr, tkin=20, tex=None, ntot=14, width=1, xoff_v=0.0,
fortho=0.0, tau=None, return_tau=False, **kwargs):
"""
Use optical depth in the 1-1 line as a free parameter
The optical depths of the other lines are then set by the kinetic
temperature
tkin is used to compute trot assuming a 3-level system consisting of (1,1),
(2,1), and (2,2) as in Swift et al, 2005 [2005ApJ...620..823S]
"""
tau_dict = {}
tex = tkin
dT0 = 41.5 # Energy diff between (2,2) and (1,1) in K
trot = tkin/(1+tkin/dT0*np.log(1+0.6*np.exp(-15.7/tkin)))
tau_dict['oneone'] = tau
tau_dict['twotwo'] = tau*(23.722/23.694)**2*4/3.*5/3.*np.exp(-41.5/trot)
tau_dict['threethree'] = tau*(23.8701279/23.694)**2*3/2.*14./3.*np.exp(-101.1/trot)
tau_dict['fourfour'] = tau*(24.1394169/23.694)**2*8/5.*9/3.*np.exp(-177.34/trot)
line_names = tau_dict.keys()
# TODO: Raise a warning if tkin > (some value), probably 50 K, because
# the 3-level system approximation used here will break down.
if return_tau:
return tau_dict
else:
return _ammonia_spectrum(xarr, tex, tau_dict, width, xoff_v, fortho,
line_names, **kwargs)
def _ammonia_spectrum(xarr, tex, tau_dict, width, xoff_v, fortho, line_names,
background_tb=TCMB, fillingfraction=None,
return_components=False, return_tau_profile=False):
"""
Helper function: given a dictionary of ammonia optical depths,
an excitation tmeperature etc, produce the spectrum.
The default return units are brightness temperature in Kelvin. If
``return_tau_profile`` is specified, the returned "spectrum" will be
a spectrum of optical depths, not an intensity spectrum.
If ``return_components`` is specified, a list of spectra will be returned,
where each spectrum represents one of the hyperfine components of the
particular ammonia line being modeled.
"""
from .ammonia_constants import (ckms, h, kb)
# fillingfraction is an arbitrary scaling for the data
# The model will be (normal model) * fillingfraction
if fillingfraction is None:
fillingfraction = 1.0
# "runspec" means "running spectrum": it is accumulated over a loop
runspec = np.zeros(len(xarr))
if return_components:
components = []
if return_tau_profile:
tau_profile = {}
for linename in line_names:
voff_lines = np.array(voff_lines_dict[linename])
tau_wts = np.array(tau_wts_dict[linename])
lines = (1-voff_lines/ckms)*freq_dict[linename]/1e9
tau_wts = tau_wts / (tau_wts).sum()
nuwidth = np.abs(width/ckms*lines)
nuoff = xoff_v/ckms*lines
# tau array
tauprof = np.zeros(len(xarr))
for kk,nuo in enumerate(nuoff):
tauprof_ = (tau_dict[linename] * tau_wts[kk] *
np.exp(-(xarr.value+nuo-lines[kk])**2 /
(2.0*nuwidth[kk]**2)))
if return_components:
components.append(tauprof_)
tauprof += tauprof_
if return_tau_profile:
tau_profile[linename] = tauprof
T0 = (h*xarr.value*1e9/kb) # "temperature" of wavelength
if isinstance(tex, dict):
runspec = ((T0/(np.exp(T0/tex[linename])-1) -
T0/(np.exp(T0/background_tb)-1)) *
(1-np.exp(-tauprof)) * fillingfraction + runspec)
else:
runspec = ((T0/(np.exp(T0/tex)-1) -
T0/(np.exp(T0/background_tb)-1)) *
(1-np.exp(-tauprof)) * fillingfraction + runspec)
if return_components:
if isinstance(tex, dict):
term1 = [(T0/(np.exp(T0/tex[linename])-1)-T0/(np.exp(T0/background_tb)-1))
for linename in line_names]
else:
term1 = (T0/(np.exp(T0/tex)-1)-T0/(np.exp(T0/background_tb)-1))
return term1*(1-np.exp(-1*np.array(components)))
elif return_tau_profile:
return tau_profile
else:
return runspec
[docs]class ammonia_model(model.SpectralModel):
"""
The basic Ammonia (NH3) model with 6 free parameters:
Trot, Tex, ntot, width, xoff_v, and fortho
Trot is the rotational temperature. It governs the relative populations of
the rotational states, i.e., the relative strength of different transitions
Tex is the excitation temperature. It is assumed constant across all
states, which is not always a good assumption - a radiative transfer and
excitation model is required to constrain this, though.
ntot is the total column density of p-NH3 integrated over all states.
width is the linewidth (the Gaussian sigma)
xoff_v is the velocity offset / line of sight velocity
fortho is the ortho fraction (northo / (northo+npara))
"""
def __init__(self,npeaks=1,npars=6,
parnames=['trot','tex','ntot','width','xoff_v','fortho'],
**kwargs):
npeaks = self.npeaks = int(npeaks)
npars = self.npars = int(npars)
self._default_parnames = parnames
self.parnames = copy.copy(self._default_parnames)
# all fitters must have declared modelfuncs, which should take the fitted pars...
self.modelfunc = ammonia
self.n_modelfunc = self.n_ammonia
# for fitting ammonia simultaneously with a flat background
self.onepeakammonia = fitter.vheightmodel(ammonia)
#self.onepeakammoniafit = self._fourparfitter(self.onepeakammonia)
self.default_parinfo = None
self.default_parinfo, kwargs = self._make_parinfo(**kwargs)
# Remove keywords parsed by parinfo and ignored by the fitter
for kw in ('tied','partied'):
if kw in kwargs:
kwargs.pop(kw)
# enforce ammonia-specific parameter limits
for par in self.default_parinfo:
if 'tex' in par.parname.lower():
par.limited = (True,par.limited[1])
par.limits = (max(par.limits[0],TCMB), par.limits[1])
if 'trot' in par.parname.lower():
par.limited = (True,par.limited[1])
par.limits = (max(par.limits[0],TCMB), par.limits[1])
if 'width' in par.parname.lower():
par.limited = (True,par.limited[1])
par.limits = (max(par.limits[0],0), par.limits[1])
if 'fortho' in par.parname.lower():
par.limited = (True,True)
if par.limits[1] != 0:
par.limits = (max(par.limits[0],0), min(par.limits[1],1))
else:
par.limits = (max(par.limits[0],0), 1)
if 'ntot' in par.parname.lower():
par.limited = (True,par.limited[1])
par.limits = (max(par.limits[0],0), par.limits[1])
self.parinfo = copy.copy(self.default_parinfo)
self.modelfunc_kwargs = kwargs
# lower case? self.modelfunc_kwargs.update({'parnames':self.parinfo.parnames})
self.use_lmfit = kwargs.pop('use_lmfit') if 'use_lmfit' in kwargs else False
self.fitunit = 'GHz'
def __call__(self,*args,**kwargs):
return self.multinh3fit(*args,**kwargs)
[docs] def n_ammonia(self, pars=None, parnames=None, **kwargs):
"""
Returns a function that sums over N ammonia line profiles, where N is the length of
trot,tex,ntot,width,xoff_v,fortho *OR* N = len(pars) / 6
The background "height" is assumed to be zero (you must "baseline" your
spectrum before fitting)
*pars* [ list ]
a list with len(pars) = (6-nfixed)n, assuming
trot,tex,ntot,width,xoff_v,fortho repeated
*parnames* [ list ]
len(parnames) must = len(pars). parnames determine how the ammonia
function parses the arguments
"""
npeaks = self.npeaks
npars = len(self.default_parinfo)
if hasattr(pars,'values'):
# important to treat as Dictionary, since lmfit params & parinfo both have .items
parnames,parvals = zip(*pars.items())
parnames = [p.lower() for p in parnames]
parvals = [p.value for p in parvals]
elif parnames is None:
parvals = pars
parnames = self.parnames
else:
parvals = pars
if len(pars) != len(parnames):
# this should only be needed when other codes are changing the number of peaks
# during a copy, as opposed to letting them be set by a __call__
# (n_modelfuncs = n_ammonia can be called directly)
# n_modelfuncs doesn't care how many peaks there are
if len(pars) % len(parnames) == 0:
parnames = [p for ii in range(len(pars)//len(parnames)) for p in parnames]
npeaks = int(len(parvals) / npars)
log.debug("Setting npeaks={0} npars={1}".format(npeaks, npars))
else:
raise ValueError("Wrong array lengths passed to n_ammonia!")
self._components = []
def L(x):
v = np.zeros(len(x))
for jj in range(int(npeaks)):
modelkwargs = kwargs.copy()
for ii in range(int(npars)):
name = parnames[ii+jj*int(npars)].strip('0123456789').lower()
modelkwargs.update({name:parvals[ii+jj*int(npars)]})
v += self.modelfunc(x,**modelkwargs)
return v
return L
[docs] def components(self, xarr, pars, hyperfine=False,
return_hyperfine_components=False, **kwargs):
"""
Ammonia components don't follow the default, since in Galactic
astronomy the hyperfine components should be well-separated.
If you want to see the individual components overlaid, you'll need to
pass hyperfine to the plot_fit call
"""
comps=[]
for ii in range(self.npeaks):
if hyperfine or return_hyperfine_components:
modelkwargs = dict(zip(self.parnames[ii*self.npars:(ii+1)*self.npars],
pars[ii*self.npars:(ii+1)*self.npars]))
comps.append(self.modelfunc(xarr, return_components=True,
**modelkwargs))
else:
modelkwargs = dict(zip(self.parnames[ii*self.npars:(ii+1)*self.npars],
pars[ii*self.npars:(ii+1)*self.npars]))
comps.append([self.modelfunc(xarr, return_components=False,
**modelkwargs)])
modelcomponents = np.concatenate(comps)
return modelcomponents
[docs] def multinh3fit(self, xax, data, err=None,
parinfo=None,
quiet=True, shh=True,
debug=False,
maxiter=200,
use_lmfit=False,
veryverbose=False, **kwargs):
"""
Fit multiple nh3 profiles (multiple can be 1)
Parameters
----------
xax : array
x axis
data : array
y axis
npeaks : int
How many nh3 profiles to fit? Default 1 (this could supersede onedgaussfit)
err : array
error corresponding to data
params : list
Fit parameters: [trot, tex, ntot (or tau), width, offset, ortho fraction] * npeaks
If len(params) % 6 == 0, npeaks will be set to len(params) / 6.
These parameters (and the related fixed, limited, min/max, names
below) need to have length = 6*npeaks. If npeaks > 1 and length =
6, they will be replicated npeaks times, otherwise they will be
reset to defaults:
fixed : list
Is parameter fixed?
limitedmin : list
minpars : list
set lower limits on each parameter (default: width>0, Tex and trot > Tcmb)
limitedmax : list
maxpars : list
set upper limits on each parameter
parnames : list
default parameter names, important for setting kwargs in model
['trot','tex','ntot','width','xoff_v','fortho']
quiet : bool
should MPFIT output each iteration?
shh : bool
output final parameters?
Returns
-------
mpp : model parameter object
Fit parameters
model : array
The model array
errors : array
the fit errors
chi2 : float
the chi^2 value of the fit
"""
if parinfo is None:
parinfo = self.parinfo = self.make_parinfo(**kwargs)
else:
if isinstance(parinfo, ParinfoList):
if not quiet:
log.info("Using user-specified parinfo.")
self.parinfo = parinfo
else:
if not quiet:
log.info("Using something like a user-specified parinfo, but not.")
self.parinfo = ParinfoList([p if isinstance(p,Parinfo) else Parinfo(p)
for p in parinfo],
preserve_order=True)
fitfun_kwargs = dict((x,y) for (x,y) in kwargs.items()
if x not in ('npeaks', 'params', 'parnames',
'fixed', 'limitedmin', 'limitedmax',
'minpars', 'maxpars', 'tied',
'max_tem_step'))
fitfun_kwargs.update(self.modelfunc_kwargs)
if 'use_lmfit' in fitfun_kwargs:
raise KeyError("use_lmfit was specified in a location where it "
"is unacceptable")
# not used: npars = len(parinfo)/self.npeaks
self._validate_parinfo()
def mpfitfun(x,y,err):
if err is None:
def f(p,fjac=None):
return [0,(y-self.n_ammonia(pars=p,
parnames=parinfo.parnames,
**fitfun_kwargs)(x))]
else:
def f(p,fjac=None):
return [0,(y-self.n_ammonia(pars=p,
parnames=parinfo.parnames,
**fitfun_kwargs)(x))/err]
return f
if veryverbose:
log.info("GUESSES: ")
log.info(str(parinfo))
#log.info "\n".join(["%s: %s" % (p['parname'],p['value']) for p in parinfo])
if use_lmfit:
return self.lmfitter(xax, data, err=err,
parinfo=parinfo,
quiet=quiet,
debug=debug)
else:
mp = mpfit(mpfitfun(xax,data,err),
parinfo=parinfo,
maxiter=maxiter,
quiet=quiet,
debug=debug)
mpp = mp.params
if mp.perror is not None:
mpperr = mp.perror
else:
mpperr = mpp*0
chi2 = mp.fnorm
if mp.status == 0:
raise Exception(mp.errmsg)
for i,p in enumerate(mpp):
parinfo[i]['value'] = p
parinfo[i]['error'] = mpperr[i]
if not shh:
log.info("Fit status: {0}".format(mp.status))
log.info("Fit message: {0}".format(mpfit_messages[mp.status]))
log.info("Fit error message: {0}".format(mp.errmsg))
log.info("Final fit values: ")
for i,p in enumerate(mpp):
log.info(" ".join((parinfo[i]['parname'], str(p), " +/- ",
str(mpperr[i]))))
log.info(" ".join(("Chi2: ", str(mp.fnorm)," Reduced Chi2: ",
str(mp.fnorm/len(data)), " DOF:",
str(len(data)-len(mpp)))))
self.mp = mp
self.parinfo = parinfo
self.mpp = self.parinfo.values
self.mpperr = self.parinfo.errors
self.mppnames = self.parinfo.names
self.model = self.n_ammonia(pars=self.mpp, parnames=self.mppnames,
**fitfun_kwargs)(xax)
indiv_parinfo = [self.parinfo[jj*self.npars:(jj+1)*self.npars]
for jj in range(int(len(self.parinfo)/self.npars))]
modelkwargs = [dict([(p['parname'].strip("0123456789").lower(),
p['value']) for p in pi])
for pi in indiv_parinfo]
self.tau_list = [self.modelfunc(xax, return_tau=True,**mk)
for mk in modelkwargs]
return self.mpp,self.model,self.mpperr,chi2
[docs] def moments(self, Xax, data, negamp=None, veryverbose=False, **kwargs):
"""
Returns a very simple and likely incorrect guess
"""
# trot, TEX, ntot, width, center, ortho fraction
return [20,10, 15, 1.0, 0.0, 1.0]
[docs] def annotations(self):
from decimal import Decimal # for formatting
tex_key = {'trot':'T_R', 'tkin': 'T_K', 'tex':'T_{ex}', 'ntot':'N',
'fortho':'F_o', 'width':'\\sigma', 'xoff_v':'v',
'fillingfraction':'FF', 'tau':'\\tau_{1-1}',
'background_tb':'T_{BG}', 'delta':'T_R-T_{ex}'}
# small hack below: don't quantize if error > value. We want to see the values.
label_list = []
for pinfo in self.parinfo:
parname = tex_key[pinfo['parname'].strip("0123456789").lower()]
parnum = int(pinfo['parname'][-1])
if pinfo['fixed']:
formatted_value = "%s" % pinfo['value']
pm = ""
formatted_error=""
else:
formatted_value = Decimal("%g" % pinfo['value']).quantize(Decimal("%0.2g" % (min(pinfo['error'],pinfo['value']))))
pm = "$\\pm$"
formatted_error = Decimal("%g" % pinfo['error']).quantize(Decimal("%0.2g" % pinfo['error']))
label = "$%s(%i)$=%8s %s %8s" % (parname, parnum, formatted_value, pm, formatted_error)
label_list.append(label)
labels = tuple(mpcb.flatten(label_list))
return labels
def make_parinfo(self, quiet=True,
npeaks=1,
params=(20,20,14,1.0,0.0,0.5), parnames=None,
fixed=(False,False,False,False,False,False),
limitedmin=(True,True,True,True,False,True),
limitedmax=(False,False,True,False,False,True),
minpars=(TCMB,TCMB,5,0,0,0),
maxpars=(0,0,25,0,0,1),
tied=('',)*6,
max_tem_step=1.,
**kwargs
):
if not quiet:
log.info("Creating a 'parinfo' from guesses.")
self.npars = int(len(params) / npeaks)
if len(params) != npeaks and (len(params) / self.npars) > npeaks:
npeaks = len(params) / self.npars
npeaks = self.npeaks = int(npeaks)
if isinstance(params,np.ndarray):
params=params.tolist()
# this is actually a hack, even though it's decently elegant
# somehow, parnames was being changed WITHOUT being passed as a variable
# this doesn't make sense - at all - but it happened.
# (it is possible for self.parnames to have npars*npeaks elements where
# npeaks > 1 coming into this function even though only 6 pars are specified;
# _default_parnames is the workaround)
if parnames is None:
parnames = copy.copy(self._default_parnames)
partype_dict = dict(zip(['params', 'parnames', 'fixed',
'limitedmin', 'limitedmax', 'minpars',
'maxpars', 'tied'],
[params, parnames, fixed, limitedmin,
limitedmax, minpars, maxpars, tied]))
# make sure all various things are the right length; if they're
# not, fix them using the defaults
# (you can put in guesses of length 12 but leave the rest length 6;
# this code then doubles the length of everything else)
for partype,parlist in iteritems(partype_dict):
if len(parlist) != self.npars*self.npeaks:
# if you leave the defaults, or enter something that can be
# multiplied by npars to get to the right number of
# gaussians, it will just replicate
if len(parlist) == self.npars:
partype_dict[partype] *= npeaks
elif len(parlist) > self.npars:
# DANGER: THIS SHOULD NOT HAPPEN!
log.warning("WARNING! Input parameters were longer than allowed for variable {0}".format(parlist))
partype_dict[partype] = partype_dict[partype][:self.npars]
elif parlist==params: # this instance shouldn't really be possible
partype_dict[partype] = [20,20,1e10,1.0,0.0,0.5] * npeaks
elif parlist==fixed:
partype_dict[partype] = [False] * len(params)
elif parlist==limitedmax: # only fortho, fillingfraction have upper limits
partype_dict[partype] = (np.array(parnames) == 'fortho') + (np.array(parnames) == 'fillingfraction')
elif parlist==limitedmin: # no physical values can be negative except velocity
partype_dict[partype] = (np.array(parnames) != 'xoff_v')
elif parlist==minpars:
# all have minima of zero except kinetic temperature, which can't be below CMB.
# Excitation temperature technically can be, but not in this model
partype_dict[partype] = ((np.array(parnames) == 'trot') + (np.array(parnames) == 'tex')) * TCMB
elif parlist==maxpars: # fractions have upper limits of 1.0
partype_dict[partype] = ((np.array(parnames) == 'fortho') + (np.array(parnames) == 'fillingfraction')).astype('float')
elif parlist==parnames: # assumes the right number of parnames (essential)
partype_dict[partype] = list(parnames) * self.npeaks
elif parlist==tied:
partype_dict[partype] = [_increment_string_number(t, ii*self.npars)
for t in tied
for ii in range(self.npeaks)]
if len(parnames) != len(partype_dict['params']):
raise ValueError("Wrong array lengths AFTER fixing them")
# used in components. Is this just a hack?
self.parnames = partype_dict['parnames']
parinfo = [{'n':ii, 'value':partype_dict['params'][ii],
'limits':[partype_dict['minpars'][ii],partype_dict['maxpars'][ii]],
'limited':[partype_dict['limitedmin'][ii],partype_dict['limitedmax'][ii]], 'fixed':partype_dict['fixed'][ii],
'parname':partype_dict['parnames'][ii]+str(int(ii/int(self.npars))),
'tied':partype_dict['tied'][ii],
'mpmaxstep':max_tem_step*float(partype_dict['parnames'][ii] in ('tex','trot')), # must force small steps in temperature (True = 1.0)
'error': 0}
for ii in range(len(partype_dict['params']))
]
# hack: remove 'fixed' pars
#parinfo_with_fixed = parinfo
#parinfo = [p for p in parinfo_with_fixed if not p['fixed']]
#fixed_kwargs = dict((p['parname'].strip("0123456789").lower(),
# p['value'])
# for p in parinfo_with_fixed if p['fixed'])
# don't do this - it breaks the NEXT call because npars != len(parnames) self.parnames = [p['parname'] for p in parinfo]
# this is OK - not a permanent change
#parnames = [p['parname'] for p in parinfo]
# not OK self.npars = len(parinfo)/self.npeaks
parinfo = ParinfoList([Parinfo(p) for p in parinfo], preserve_order=True)
#import pdb; pdb.set_trace()
return parinfo
def _validate_parinfo(self,
must_be_limited={'trot': [True,False],
'tex': [False,False],
'ntot': [True, True],
'width': [True, False],
'xoff_v': [False, False],
'tau': [False, False],
'fortho': [True, True]},
required_limits={'trot': [0, None],
'ntot': [5, 25],
'width': [0, None],
'fortho': [0,1]}):
"""
Make sure the input parameters are all legitimate
"""
for par in self.parinfo:
limited = par.limited
parname = par.parname.strip(string.digits).lower()
mbl = must_be_limited[parname]
for a,b,ul in zip(limited, mbl, ('a lower','an upper')):
if b and not a:
raise ValueError("Parameter {0} must have {1} limit "
"but no such limit is set.".format(
parname, ul))
if parname in required_limits:
limits = par.limits
rlimits = required_limits[parname]
for a,b,op,ul in zip(limits, rlimits, (operator.lt,
operator.gt),
('a lower','an upper')):
if b is not None and op(a,b):
raise ValueError("Parameter {0} must have {1} limit "
"at least {2} but it is set to {3}."
.format(parname, ul, b, a))
[docs] def parse_3par_guesses(self, guesses):
"""
Try to convert a set of interactive guesses (peak, center, width) into
guesses appropriate to the model.
For NH3 models, we add in several extra parameters:
tex = 2.73 * peak
trot = tex * 2
fortho = 0.5
ntot = 15
ntot is set to a constant ~10^15 because this results in optical depths
near 1, so it forces the emission to be approximately significant.
trot > tex so that we're in a physical regime to begin with.
We assume tex = peak + 2.73 because most spectra are shown
background-subtracted (single dish are always that way, interferometric
data are intrinsically that way...) and otherwise the guessing will
crash if you guess a number < 2.73.
"""
gauss_npars = 3
if len(guesses) % gauss_npars != 0:
raise ValueError("Guesses passed to parse_3par_guesses must have "
"length % 3 == 0")
npeaks = len(guesses) // gauss_npars
npars = 6
new_guesses = [-1, -1, 15, -1, -1, 0.5] * npeaks
for ii in range(npeaks):
peak = guesses[ii * gauss_npars + 0]
center = guesses[ii * gauss_npars + 1]
width = guesses[ii * gauss_npars + 2]
new_guesses[ii*npars + 0] = (2.73+peak) * 2
new_guesses[ii*npars + 1] = (2.73+peak)
new_guesses[ii*npars + 3] = width
new_guesses[ii*npars + 4] = center
return new_guesses
[docs]class ammonia_model_vtau(ammonia_model):
def __init__(self,
parnames=['trot', 'tex', 'tau', 'width', 'xoff_v', 'fortho'],
**kwargs):
super(ammonia_model_vtau, self).__init__(parnames=parnames,
**kwargs)
[docs] def moments(self, Xax, data, negamp=None, veryverbose=False, **kwargs):
"""
Returns a very simple and likely incorrect guess
"""
# trot, TEX, ntot, width, center, ortho fraction
return [20, 10, 10, 1.0, 0.0, 1.0]
def _validate_parinfo(self,
must_be_limited={'trot': [True,False],
'tex': [False,False],
'tau': [True, False],
'width': [True, False],
'xoff_v': [False, False],
'fortho': [True, True]},
required_limits={'trot': [0, None],
'tex': [None,None],
'width': [0, None],
'tau': [0, None],
'xoff_v': [None,None],
'fortho': [0,1]}):
supes = super(ammonia_model_vtau, self)
supes._validate_parinfo(must_be_limited=must_be_limited,
required_limits=required_limits)
return supes
[docs] def make_parinfo(self,
params=(20,14,0.5,1.0,0.0,0.5),
fixed=(False,False,False,False,False,False),
limitedmin=(True,True,True,True,False,True),
limitedmax=(False,False,False,False,False,True),
minpars=(TCMB,TCMB,0,0,0,0),
maxpars=(0,0,0,0,0,1),
tied=('',)*6,
**kwargs
):
"""
parnames=['trot', 'tex', 'tau', 'width', 'xoff_v', 'fortho']
"""
return super(ammonia_model_vtau, self).make_parinfo(params=params,
fixed=fixed,
limitedmax=limitedmax,
limitedmin=limitedmin,
minpars=minpars,
maxpars=maxpars,
tied=tied,
**kwargs)
[docs]class ammonia_model_vtau_thin(ammonia_model_vtau):
def __init__(self,parnames=['tkin', 'tau', 'width', 'xoff_v', 'fortho'],
**kwargs):
super(ammonia_model_vtau_thin, self).__init__(parnames=parnames,
npars=5,
**kwargs)
self.modelfunc = ammonia_thin
def _validate_parinfo(self,
must_be_limited={'tkin': [True,False],
'tex': [False,False],
'ntot': [True, True],
'width': [True, False],
'xoff_v': [False, False],
'tau': [False, False],
'fortho': [True, True]},
required_limits={'tkin': [0, None],
'ntot': [5, 25],
'width': [0, None],
'fortho': [0,1]}):
supes = super(ammonia_model_vtau_thin, self)
return supes._validate_parinfo(must_be_limited=must_be_limited,
required_limits=required_limits)
[docs] def moments(self, Xax, data, negamp=None, veryverbose=False, **kwargs):
"""
Returns a very simple and likely incorrect guess
"""
# trot, tau, width, center, ortho fraction
return [20, 1, 1.0, 0.0, 1.0]
def __call__(self,*args,**kwargs):
return self.multinh3fit(*args, **kwargs)
[docs] def make_parinfo(self,
params=(20,14,1.0,0.0,0.5),
fixed=(False,False,False,False,False),
limitedmin=(True,True,True,False,True),
limitedmax=(False,False,False,False,True),
minpars=(TCMB,0,0,0,0),
maxpars=(0,0,0,0,1),
tied=('',)*5,
**kwargs
):
return super(ammonia_model_vtau_thin, self).make_parinfo(params=params,
fixed=fixed,
limitedmax=limitedmax,
limitedmin=limitedmin,
minpars=minpars,
maxpars=maxpars,
tied=tied,
**kwargs)
[docs]class ammonia_model_background(ammonia_model):
def __init__(self,**kwargs):
super(ammonia_model_background,self).__init__(npars=7,
parnames=['trot', 'tex',
'ntot',
'width',
'xoff_v',
'fortho',
'background_tb'])
[docs] def moments(self, Xax, data, negamp=None, veryverbose=False, **kwargs):
"""
Returns a very simple and likely incorrect guess
"""
# trot, TEX, ntot, width, center, ortho fraction
return [20,10, 10, 1.0, 0.0, 1.0, TCMB]
def make_parinfo(self, npeaks=1, err=None,
params=(20,20,14,1.0,0.0,0.5,TCMB), parnames=None,
fixed=(False,False,False,False,False,False,True),
limitedmin=(True,True,True,True,False,True,True),
limitedmax=(False,False,False,False,False,True,True),
minpars=(TCMB,TCMB,0,0,0,0,TCMB), parinfo=None,
maxpars=(0,0,0,0,0,1,TCMB),
tied=('',)*7,
quiet=True, shh=True,
veryverbose=False, **kwargs):
return super(ammonia_model_background,
self).make_parinfo(npeaks=npeaks, err=err, params=params,
parnames=parnames, fixed=fixed,
limitedmin=limitedmin,
limitedmax=limitedmax, minpars=minpars,
parinfo=parinfo, maxpars=maxpars,
tied=tied, quiet=quiet, shh=shh,
veryverbose=veryverbose, **kwargs)
[docs] def multinh3fit(self, xax, data, npeaks=1, err=None,
params=(20,20,14,1.0,0.0,0.5,TCMB), parnames=None,
fixed=(False,False,False,False,False,False,True),
limitedmin=(True,True,True,True,False,True,True),
limitedmax=(False,False,False,False,False,True,True),
minpars=(TCMB,TCMB,0,0,0,0,TCMB), parinfo=None,
maxpars=(0,0,0,0,0,1,TCMB),
tied=('',)*7,
quiet=True, shh=True,
veryverbose=False, **kwargs):
return super(ammonia_model_background,
self).multinh3fit(xax, data, npeaks=npeaks, err=err,
params=params, parnames=parnames,
fixed=fixed, limitedmin=limitedmin,
limitedmax=limitedmax, minpars=minpars,
parinfo=parinfo, maxpars=maxpars,
tied=tied, quiet=quiet, shh=shh,
veryverbose=veryverbose, **kwargs)
[docs]class cold_ammonia_model(ammonia_model):
def __init__(self,
parnames=['tkin', 'tex', 'ntot', 'width', 'xoff_v', 'fortho'],
**kwargs):
super(cold_ammonia_model, self).__init__(parnames=parnames, **kwargs)
self.modelfunc = cold_ammonia
def _validate_parinfo(self,
must_be_limited={'tkin': [True,False],
'tex': [False,False],
'ntot': [True, False],
'width': [True, False],
'xoff_v': [False, False],
'fortho': [True, True]},
required_limits={'tkin': [0, None],
'tex': [None,None],
'width': [0, None],
'ntot': [0, None],
'xoff_v': [None,None],
'fortho': [0,1]}):
supes = super(cold_ammonia_model, self)
return supes._validate_parinfo(must_be_limited=must_be_limited,
required_limits=required_limits)
[docs]class ammonia_model_restricted_tex(ammonia_model):
"""
Ammonia model with an explicitly restricted excitation temperature
such that tex <= trot, set by the "delta" parameter (tex = trot - delta)
with delta > 0. You can choose the ammonia funciton when you initialize
it (e.g., ``ammonia_model_restricted_tex(ammonia_func=ammonia)`` or
``ammonia_model_restricted_tex(ammonia_func=cold_ammonia)``)
"""
def __init__(self,
parnames=['trot', 'tex', 'ntot', 'width', 'xoff_v', 'fortho',
'delta'],
ammonia_func=ammonia,
**kwargs):
super(ammonia_model_restricted_tex, self).__init__(npars=7,
parnames=parnames,
**kwargs)
def ammonia_dtex(*args, **kwargs):
"""
Strip out the 'delta' keyword
"""
# for py2 compatibility, must strip out manually
delta = kwargs.pop('delta') if 'delta' in kwargs else None
try:
np.testing.assert_allclose(kwargs['trot'] - kwargs['tex'],
delta)
except AssertionError:
raise ValueError('trot - tex != delta, even though that was specified.'
f"trot={kwargs['trot']}, tex={kwargs['tex']}, delta={delta}, trot-text={kwargs['trot']-kwargs['tex']}")
return ammonia_func(*args, **kwargs)
self.modelfunc = ammonia_dtex
[docs] def n_ammonia(self, pars=None, parnames=None, **kwargs):
if parnames is not None:
for ii,pn in enumerate(parnames):
if ii % 7 == 1 and 'tex' not in pn:
raise ValueError('bad parameter names')
if ii % 7 == 6 and 'delta' not in pn:
raise ValueError('bad parameter names')
if pars is not None:
assert len(pars) % 7 == 0
for ii in range(int(len(pars)/7)):
try:
# Case A: they're param objects
# (setting the param directly can result in recursion errors)
pars[1+ii*7].value = pars[0+ii*7].value - pars[6+ii*7].value
except AttributeError:
# Case B: they're just lists of values
pars[1+ii*7] = pars[0+ii*7] - pars[6+ii*7]
supes = super(ammonia_model_restricted_tex, self)
return supes.n_ammonia(pars=pars, parnames=parnames, **kwargs)
def _validate_parinfo(self,
must_be_limited={'trot': [True,False],
'tex': [False,False],
'ntot': [True, False],
'width': [True, False],
'xoff_v': [False, False],
'fortho': [True, True],
'delta': [True, False],
},
required_limits={'trot': [0, None],
'tex': [None,None],
'width': [0, None],
'ntot': [0, None],
'xoff_v': [None,None],
'fortho': [0,1],
'delta': [0, None],
}):
supes = super(ammonia_model_restricted_tex, self)
return supes._validate_parinfo(must_be_limited=must_be_limited,
required_limits=required_limits)
[docs] def make_parinfo(self,
params=(20,20,0.5,1.0,0.0,0.5,0),
fixed=(False,False,False,False,False,False,False),
limitedmin=(True,True,True,True,False,True,True),
limitedmax=(False,False,False,False,False,True,False),
minpars=(TCMB,TCMB,0,0,0,0,0),
maxpars=(0,0,0,0,0,1,0),
tied=('','p[0]-p[6]','','','','',''),
**kwargs
):
"""
parnames=['trot', 'tex', 'ntot', 'width', 'xoff_v', 'fortho', 'delta']
'delta' is the difference between tex and trot
"""
supes = super(ammonia_model_restricted_tex, self)
return supes.make_parinfo(params=params, fixed=fixed,
limitedmax=limitedmax, limitedmin=limitedmin,
minpars=minpars, maxpars=maxpars, tied=tied,
**kwargs)
def _increment_string_number(st, count):
"""
Increment a number in a string
Expects input of the form: p[6]
"""
import re
dig = re.compile('[0-9]+')
if dig.search(st):
n = int(dig.search(st).group())
result = dig.sub(str(n+count), st)
return result
else:
return st