Source code for pyspeckit.spectrum.readers.fits_reader

from __future__ import print_function
try:
    import astropy.io.fits as pyfits
except ImportError:
    import pyfits
import numpy.ma as ma
import numpy as np
from six import operator

from .. import units
from . import make_axis
from ...specwarnings import warn

[docs]def open_1d_fits(filename, hdu=0, **kwargs): """ Grabs all the relevant pieces of a simple FITS-compliant 1d spectrum Inputs: wcstype - the suffix on the WCS type to get to velocity/frequency/whatever specnum - Which # spectrum, along the y-axis, is the data? errspecnum - Which # spectrum, along the y-axis, is the error spectrum? """ # try to open as an HDU... if all((hasattr(filename,k) for k in ('data','header','_header'))): f = [filename] hdu = 0 elif isinstance(filename,pyfits.HDUList): f = filename else: f = pyfits.open(filename, ignore_missing_end=True) return open_1d_pyfits(f[hdu],**kwargs)
[docs]def open_1d_pyfits(pyfits_hdu, specnum=0, wcstype='', specaxis="1", errspecnum=None, autofix=True, scale_keyword=None, scale_action=operator.truediv, verbose=False, apnum=0, **kwargs): """ This is open_1d_fits but for a pyfits_hdu so you don't necessarily have to open a fits file """ # force things that will be treated as strings to be strings # this is primarily to avoid problems with variables being passed as unicode wcstype = str(wcstype) specaxis = str(specaxis) hdr = pyfits_hdu._header if autofix: for card in hdr.cards: try: if verbose: card.verify('fix') else: card.verify('silentfix') except pyfits.VerifyError: del hdr[card.keyword] data = pyfits_hdu.data with np.errstate(invalid='ignore'): # silently turn signalling nans into quiet nans data[np.isnan(data)] = np.nan # search for the correct axis (may be 1 or 3, unlikely to be 2 or others) # 1 = 1D spectrum # 3 = "3D" spectrum with a single x,y point (e.g., JCMT smurf/makecube) if hdr.get('NAXIS') > 1: for ii in range(1,hdr.get('NAXIS')+1): ctype = hdr.get('CTYPE%i'%ii) if ctype in units.xtype_dict: specaxis="%i" % ii if hdr.get('NAXIS') == 2: if hdr.get('WAT0_001') is not None: if 'multispec' in hdr.get('WAT0_001'): # treat as an Echelle spectrum from IRAF warn(""" This looks like an Echelle spectrum. You may want to load it using pyspeckit.wrappers.load_IRAF_multispec. The file will still be successfully read if you continue, but the plotting and fitting packages will run into errors.""") return read_echelle(pyfits_hdu) if isinstance(specnum,list): # allow averaging of multiple spectra (this should be modified # - each spectrum should be a Spectrum instance) spec = ma.array(data[specnum,:]).mean(axis=0) elif isinstance(specnum,int): spec = ma.array(data[specnum,:]).squeeze() else: raise TypeError( "Specnum is of wrong type (not a list of integers or an integer)." + " Type: %s" % str(type(specnum))) if errspecnum is not None: # SDSS supplies weights, not errors. if hdr.get('TELESCOP') == 'SDSS 2.5-M': errspec = 1. / np.sqrt(ma.array(data[errspecnum,:]).squeeze()) else: errspec = ma.array(data[errspecnum,:]).squeeze() else: errspec = spec*0 # set error spectrum to zero if it's not in the data elif hdr.get('NAXIS') > 2: if hdr.get('BANDID2'): # this is an IRAF .ms.fits file with a 'background' in the 3rd dimension spec = ma.array(data[specnum,apnum,:]).squeeze() else: for ii in range(1,hdr.get('NAXIS')+1): # only fail if extra axes have more than one row if hdr.get('NAXIS%i' % ii) > 1 and (ii != int(specaxis)): raise ValueError("Too many axes for open_1d_fits") spec = ma.array(data).squeeze() if errspecnum is None: errspec = spec*0 # set error spectrum to zero if it's not in the data else: spec = ma.array(data).squeeze() if errspecnum is None: errspec = spec*0 # set error spectrum to zero if it's not in the data if scale_keyword is not None: try: print(("Found SCALE keyword %s. Using %s to scale it" % (scale_keyword,scale_action))) scaleval = hdr[scale_keyword] spec = scale_action(spec,scaleval) errspec = scale_action(errspec,scaleval) except (ValueError, KeyError) as e: pass xarr = None if hdr.get('ORIGIN') == 'CLASS-Grenoble': # Use the CLASS FITS definition (which is non-standard) # http://iram.fr/IRAMFR/GILDAS/doc/html/class-html/node84.html # F(n) = RESTFREQ + CRVALi + ( n - CRPIXi ) * CDELTi if verbose: print("Loading a CLASS .fits spectrum") # there is no reason to assume CDELT is wrong dv = -1*hdr.get('CDELT1') dv = hdr.get('CDELT1') if hdr.get('RESTFREQ'): v0 = hdr.get('RESTFREQ') + hdr.get('CRVAL1') elif hdr.get('RESTF'): v0 = hdr.get('RESTF') + hdr.get('CRVAL1') else: warn("CLASS file does not have RESTF or RESTFREQ") p3 = hdr.get('CRPIX1') elif hdr.get(str('CD%s_%s%s' % (specaxis,specaxis,wcstype))): dv = hdr['CD%s_%s%s' % (specaxis,specaxis,wcstype)] v0 = hdr['CRVAL%s%s' % (specaxis,wcstype)] p3 = hdr['CRPIX%s%s' % (specaxis,wcstype)] hdr['CDELT%s' % specaxis] = dv if verbose: print(("Using the FITS CD matrix. PIX=%f VAL=%f DELT=%f" % (p3,v0,dv))) elif hdr.get(str('CDELT%s%s' % (specaxis,wcstype))): if hdr.get(str('PC{0}_{0}'.format(specaxis))): dv = hdr['CDELT%s%s' % (specaxis,wcstype)] * hdr.get(str('PC{0}_{0}'.format(specaxis))) else: dv = hdr['CDELT%s%s' % (specaxis,wcstype)] v0 = hdr['CRVAL%s%s' % (specaxis,wcstype)] p3 = hdr['CRPIX%s%s' % (specaxis,wcstype)] if verbose: print(("Using the FITS CDELT value. PIX=%f VAL=%f DELT=%f" % (p3,v0,dv))) elif len(data.shape) > 1: if verbose: print("No CDELT or CD in header. Assuming 2D input with 1st line representing the spectral axis.") # try assuming first axis is X axis if hdr.get('CUNIT%s%s' % (specaxis,wcstype)): xarr = data[0,:] spec = data[1,:] if data.shape[0] > 2: errspec = data[2,:] else: raise TypeError("Don't know what type of FITS file you've input; "+ "its header is not FITS compliant and it doesn't look like it "+ "was written by pyspeckit.") # Deal with logarithmic wavelength binning if necessary if xarr is None: if hdr.get('WFITTYPE') == 'LOG-LINEAR': xconv = lambda v: 10**((v-p3+1)*dv+v0) xarr = xconv(np.arange(len(spec))) else: xconv = lambda v: ((v-p3+1)*dv+v0) xarr = xconv(np.arange(len(spec))) # need to do something with this... restfreq = hdr.get('RESTFREQ') if restfreq is None: restfreq= hdr.get('RESTFRQ') XAxis = make_axis(xarr,hdr,wcstype=wcstype,specaxis=specaxis,**kwargs) return spec,errspec,XAxis,hdr
def _get_WATS(hdr): """ Get the WATS from an IRAF Echelle header """ WAT1_dict = dict( [s.split('=') for s in hdr.get("WAT1_001").split()] ) # hdr.get does not preserve whitespace, but whitespace is ESSENTIAL here! WAT_string = "" ii = 0 while (hdr.get("WAT2_%03i" % (ii+1))) is not None: WAT_string += (hdr.get("WAT2_%03i" % (ii+1)) + " "*(68-len(hdr.get("WAT2_%03i" % (ii+1))))) ii += 1 WAT_list = WAT_string.split("spec") if "multi" in WAT_list[0]: WAT_list.pop(0) if ' ' in WAT_list: WAT_list.remove(' ') if '' in WAT_list: WAT_list.remove('') specaxdict = dict( [ (int(s.split("=")[0]),s.split("=")[1]) for s in WAT_list ] ) return WAT1_dict,specaxdict def make_linear_axis(hdr, axsplit, WAT1_dict): num,beam,dtype,crval,cdelt,naxis,z,aplow,aphigh = axsplit[:9] # this is a hack for cropped spectra... #print "header naxis: %i, WAT naxis: %i" % (hdr['NAXIS1'], int(naxis)) if hdr['NAXIS1'] != int(naxis): naxis = hdr['NAXIS1'] crpix = hdr.get('CRPIX1') warn("Treating as cropped echelle spectrum.") else: crpix = 0 if len(axsplit) > 9: functions = axsplit[9:] warn("Found but did not use functions %s" % str(functions)) if int(dtype) == 0: # Linear dispersion (eq 2, p.5 from Valdez, linked above) xax = ((float(crval) + float(cdelt) * (np.arange(int(naxis)) + 1 - crpix)) / (1.+float(z))) else: raise ValueError("Unrecognized LINEAR dispersion in IRAF Echelle specification") headerkws = {'CRPIX1':1, 'CRVAL1':crval, 'CDELT1':cdelt,'NAXIS1':naxis, 'NAXIS':1, 'REDSHIFT':z, 'CTYPE1':'wavelength', 'CUNIT1':WAT1_dict['units']} return xax, naxis, headerkws def make_multispec_axis(hdr, axsplit, WAT1_dict): num,beam,dtype,crval,cdelt,naxis,z,aplow,aphigh = axsplit[:9] # this is a hack for cropped spectra... #print "header naxis: %i, WAT naxis: %i" % (hdr['NAXIS1'], int(naxis)) if hdr['NAXIS1'] != int(naxis): crpix = int(naxis) - hdr['NAXIS'] naxis = hdr['NAXIS1'] warn("Treating as cropped echelle spectrum.") else: crpix = 0 if len(axsplit) > 9: functions = axsplit[9:] warn("Found but did not use functions %s" % str(functions)) if int(dtype) == 0: # Linear dispersion (eq 11, p.9 from Valdez, linked above) xax = (float(crval) + float(cdelt) * (np.arange(int(naxis)))) / (1.+float(z)) elif int(dtype) == 1: # Log-linear dispersion (eq 12, p.9 from Valdez, linked above) xax = 10.**(float(crval) + float(cdelt) * (np.arange(int(naxis)))) / (1.+float(z)) # elif int(dtype) == 2: # Non-linear dispersion # elif int(dtype) == -1: # Data is not dispersion coords else: raise ValueError("Unrecognized MULTISPE dispersion in IRAF Echelle specification") headerkws = {'CRPIX1':1, 'CRVAL1':crval, 'CDELT1':cdelt, 'NAXIS1':naxis, 'NAXIS':1, 'REDSHIFT':z, 'CTYPE1':'wavelength', 'CUNIT1':WAT1_dict['units']} return xax, naxis, headerkws
[docs]def read_echelle(pyfits_hdu): """ Read an IRAF Echelle spectrum http://iraf.noao.edu/iraf/ftp/iraf/docs/specwcs.ps.Z """ hdr = pyfits_hdu.header WAT1_dict, specaxdict = _get_WATS(hdr) x_axes = [] for specnum, axstring in specaxdict.items(): axsplit = axstring.replace('"','').split() if specnum != int(axsplit[0]): raise ValueError("Mismatch in IRAF Echelle specification") num,beam,dtype,crval,cdelt,naxis,z,aplow,aphigh = axsplit[:9] if hdr['CTYPE1'] == 'LINEAR': xax,naxis,headerkws = make_linear_axis(hdr, axsplit, WAT1_dict) elif hdr['CTYPE1'] == 'MULTISPE': xax,naxis,headerkws = make_multispec_axis(hdr, axsplit, WAT1_dict) cards = [pyfits.Card(k,v) for (k,v) in headerkws.items()] header = pyfits.Header(cards) xarr = make_axis(xax,header) x_axes.append(xarr) data = pyfits_hdu.data with np.errstate(invalid='ignore'): data[np.isnan(data)] = np.nan return data, np.zeros_like(data), units.EchelleAxes(x_axes), hdr