Source code for typhon.physics.units.em

# -*- coding: utf-8 -*-

"""Module for anything related to the electromagnetic spectrum.

This module imports typhon.physics.units.common and therefore
has a soft dependency on the pint units library.
"""

# Any commits made to this module between 2015-05-01 and 2017-03-01
# by Gerrit Holl are developed for the EC project “Fidelity and
# Uncertainty in Climate Data Records from Earth Observations (FIDUCEO)”.
# Grant agreement: 638822
# 
# All those contributions are dual-licensed under the MIT license for use
# in typhon, and the GNU General Public License version 3.

import warnings
import logging

import numpy
import scipy.interpolate

import numexpr
import pint
import xarray


from typhon import config
from typhon.constants import (h, k, c)
from typhon.physics.units.common import (ureg, radiance_units)
from typhon.physics.units.tools import UnitsAwareDataArray as UADA


logger = logging.getLogger(__name__)


__all__ = [
    'FwmuMixin',
    'SRF',
    'planck_f',
    'specrad_wavenumber2frequency',
    'specrad_frequency_to_planck_bt',
    ]


[docs]class FwmuMixin: """Mixing for frequency/wavelength/wavenumber neutrality Best to use pint ureg quantities at all times. """ _frequency = None _wavenumber = None _wavelength = None @property def frequency(self): return self._frequency @frequency.setter def frequency(self, value): try: self._frequency = value.to(ureg.Hz, "sp") except AttributeError: value = value * ureg.Hz self._frequency = value self._wavenumber = value.to(1 / ureg.centimeter, "sp") self._wavelength = value.to(ureg.metre, "sp") @property def wavenumber(self): return self._wavenumber @wavenumber.setter def wavenumber(self, value): try: self._wavenumber = value.to(1 / ureg.centimeter, "sp") except AttributeError: value = value * 1 / ureg.centimeter self._wavenumber = value self._frequency = value.to(ureg.Hz, "sp") self._wavelength = value.to(ureg.metre, "sp") @property def wavelength(self): return self._wavelength @wavelength.setter def wavelength(self, value): try: self._wavelength = value.to(ureg.metre, "sp") except AttributeError: value = value * ureg.meter self._wavelength = value self._frequency = value.to(ureg.hertz, "sp") self._wavenumber = value.to(1 / ureg.centimeter, "sp")
[docs]class SRF(FwmuMixin): """Respresents a spectral response function TODO: representation of uncertainties """ T_lookup_table = numpy.arange(0, 500.01, 0.05) * ureg.K lookup_table = None L_to_T = None
[docs] def __init__(self, f, W): """Initialise SRF object. You can either initialise an SRF from scratch, or use the classmethod `fromArtsXML` to read it from a file. A toy example on initiating it from scratch: >>> from typhon.physics.units.common import ureg >>> from typhon.physics.units.em import SRF >>> srf = SRF(ureg.Quantity(numpy.array([200, 200.1, 200.2, 200.3, 200.4, 200.5]), 'GHz'), numpy.array([0, 0.5, 1, 1, 0.5, 0])) >>> R_300 = srf.blackbody_radiance(ureg.Quantity(300, 'K')) >>> print(R_300) [ 3.63716781e-15] watt / hertz / meter ** 2 / steradian >>> print(R_300.to("K", "radiance", srf=srf)) [ 300.] kelvin You can also pass in other spectroscopic units (wavenumber, wavelength) that will be converted internally to frequency: >>> srf = SRF(ureg.Quantity(numpy.array([10.8, 10.9, 11.0, 11.1, 11.2, 11.3]), 'um'), numpy.array([0, 0.5, 1, 1, 0.5, 0])) >>> R_300 = srf.blackbody_radiance(ureg.Quantity(numpy.atleast_1d(250), 'K')) >>> print(R_300) [ 1.61922509e-12] watt / hertz / meter ** 2 / steradian >>> print(R_300.to("cm * mW / m**2 / sr", "radiance")) [ 48.54314703] centimeter * milliwatt / meter ** 2 / steradian >>> print(R_300.to("K", "radiance", srf=srf)) [ 300.] kelvin :param ndarray f: Array of frequencies. Can be either a pure ndarray, which will be assumed to be in Hz, or a ureg quantity. :param ndarray W: Array of associated weights. """ try: self.frequency = f.to("Hz", "sp") except AttributeError: self.frequency = ureg.Quantity(f, "Hz") self.W = W
def __repr__(self): """Get string representation. Uses centroid. """ cf = self.centroid() if cf.to("Hz", "sp").m > 3e12: s = cf.to(ureg.um, "sp") else: s = cf.to(ureg.GHz, "sp") return "<{:s}: {:.4~}>".format(self.__class__.__name__, s)
[docs] @classmethod def fromArtsXML(cls, sat, instr, ch): """Read SRF from ArtsXML files. Requires that in the TYPHONRC configuration file, the fields `srf_backend_f` and `srf_backend_response` in the section corresponding to instrument `instr` are defined to point to the respective files in ArtsXML format. Within those definitions, {sat:s} will be substituted with the satellite name. For example, in typhonrc, one might have: [hirs] srf_backend_response = /path/to/{sat}_HIRS.backend_channel_response.xml srf_backend_f = /path/to/{sat}_HIRS.f_backend.xml so that we can do: >>> srf = SRF.fromArtsXML("NOAA15", "hirs", 12) >>> R_300 = srf.blackbody_radiance(ureg.Quantity(atleast_1d(250), 'K')) >>> print(R_300) [ 2.13002925e-13] watt / hertz / meter ** 2 / steradian >>> print(R_300.to("cm * mW / m**2 / sr", "radiance")) [ 6.38566704] centimeter * milliwatt / meter ** 2 / steradian Arguments: sat [str] Satellite name, such as 'NOAA15' instr [str] Instrument name, such as 'hirs'. ch [int] Channel number (start counting at 1). """ from pyarts import xml cf = config.conf[instr] centres = xml.load( cf["srf_backend_f"].format(sat=sat)) gfs = xml.load( cf["srf_backend_response"].format(sat=sat)) freq = gfs[ch - 1].grids[0] + centres[ch - 1] response = gfs[ch - 1].data return cls(freq, response)
[docs] @classmethod def fromRTTOV(cls, sat, instr, ch): """Read SRF from RTTOV format files. Requires that in the TYPHONRC configuration file, the fields 'srf_rttov' in the section corresponding to instrument `instr` are defined to point to the respective files in RTTOV format. Within those definitions, {sat:s} will be substituted with the satellite name and {ch:>02d} with the channel number. For example, in typhonrc, we might have: [hirs] srf_rttov = /path/to/rtcoef_{sat:s}_hirs-shifted_src_ch{ch:>02d}.txt Arguments: sat [str] Satellite name, such as 'noaa-15' instr [str] Instrument name, such as 'hirs'. ch [int] Channel number (start counting at 1). """ cf = config.conf[instr] M = numpy.loadtxt(cf["srf_rttov"].format(sat=sat, ch=ch), skiprows=4) wn = ureg.Quantity(M[:, 0], 1/ureg.cm) W = M[:, 1] return cls(wn, W)
[docs] def centroid(self): """Calculate centre frequency """ return numpy.average( self.frequency, weights=self.W) * self.frequency.units
[docs] def blackbody_radiance(self, T, spectral=True): """Calculate integrated radiance for blackbody at temperature T :param T: Temperature [K]. This can be either a python number, or a numpy ndarray, on a ureg quantity encompassing either. :param spectral: Parameter to control whether to return spectral radiance or radiance. See self.integrate_radiances for details. Returns quantity ndarray with blackbody radiance in desired unit. Note that this is an ndarray with dimension (1,) even if you passin a scalar. """ try: T = T.to("K") except AttributeError: T = ureg.Quantity(T, "K") T = ureg.Quantity(numpy.atleast_1d(T), T.u) # fails if T is multidimensional shp = T.shape return self.integrate_radiances( self.frequency, planck_f( self.frequency[numpy.newaxis, :], T.reshape((-1,))[:, numpy.newaxis]), spectral=spectral).reshape(shp)
[docs] def make_lookup_table(self): """Construct lookup table radiance <-> BT To convert a channel radiance to a brightness temperature, applying the (inverse) Planck function is not correct, because the Planck function applies to monochromatic radiances only. Instead, to convert from radiance (in W m^-2 sr^-1 Hz^-1) to brightness temperature, we use a lookup table. This lookup table is constructed by considering blackbodies at a range of temperatures, then calculating the channel radiance. This table can then be used to get a mapping from radiance to brightness temperature. This method does not return anything, but fill self.lookup_table. """ self.lookup_table = numpy.zeros( shape=(2, self.T_lookup_table.size), dtype=numpy.float64) self.lookup_table[0, :] = self.T_lookup_table self.lookup_table[1, :] = self.blackbody_radiance(self.T_lookup_table) self.L_to_T = scipy.interpolate.interp1d(self.lookup_table[1, :], self.lookup_table[0, :], kind='linear', bounds_error=False, fill_value=(0, 2000))
[docs] def integrate_radiances(self, f, L, spectral=True): """From a spectrum of radiances and a SRF, calculate channel (spectral) radiance The spectral response function may not be specified on the same grid as the spectrum of radiances. Therefore, this function interpolates the spectral response function onto the grid of the radiances. This is less bad than the reverse, because a spectral response function tends to be more smooth than a spectrum. **Approximations:** * Interpolation of spectral response function onto frequency grid on which radiances are specified. :param ndarray f: Frequencies for spectral radiances [Hz] :param ndarray L: Spectral radiances [W m^-2 sr^-1 Hz^-1]. Can be in radiance units of various kinds. Make sure this is consistent with the spectral response function. Innermost dimension must correspond to frequencies. :param bool spectral: If true, return spectral radiance [W m^-2 sr^-1 Hz^-1]. If false, return radiance [W m^-2 sr^-1]. Defaults to True. :returns: Channel (spectral) radiance according to 'spectral' """ # Interpolate onto common frequency grid. The spectral response # function is more smooth so less harmed by interpolation, so I # interpolate the SRF. fnc = scipy.interpolate.interp1d( self.frequency, self.W, bounds_error=False, fill_value=0.0) #w_on_L_grid = fnc(f) * (1 / ureg.Hz) w_on_L_grid = ureg.Quantity(fnc(f), ureg.dimensionless)# * (1 / ureg.Hz) df = ureg.Quantity(numpy.diff(f), f.u) w1p = w_on_L_grid[1:] L1p = L[:, 1:] # ch_BT = (w_on_L_grid * L_f).sum(-1) / (w_on_L_grid.sum()) # due to numexpr limitation, do sum seperately # and due to numexpr bug, explicitly consider zero-dim case # see https://github.com/pydata/numexpr/issues/229 if L.shape[0] == 0: ch_rad = numpy.empty(dtype="f8", shape=L.shape[:-1]) else: # ch_rad = numexpr.evaluate("sum(w_on_L_grid * L, {:d})".format( ch_rad = numexpr.evaluate("sum(w1p * L1p * df, {:d})".format( L.ndim - 1)) ch_rad = ureg.Quantity(ch_rad, w1p.u * L1p.u * df.u) if spectral: return ch_rad / (w1p*df).sum() else: return ch_rad
[docs] def channel_radiance2bt(self, L): """Convert channel radiance to brightness temperature Using the lookup table, convert channel radiance to brightness temperature. Will construct lookup table on first call. Typhon also registers a pint context “radiance” which can be used to convert between radiance units and brightness temperature (even though this is a different quantity), for example, by using L.to("K", "radiance", srf=srf) :param L: Radiance [W m^-2 sr^-1 Hz^-1] or compatible """ if self.lookup_table is None: self.make_lookup_table() return self.L_to_T( L.to(ureg.W / (ureg.m**2 * ureg.sr * ureg.Hz), "radiance")) * ureg.K
[docs] def estimate_band_coefficients(self, sat=None, instr=None, ch=None, include_shift=True): """Estimate band coefficients for fast/explicit BT calculations In some circumstances, a fully integrated SRF may be more expensive than needed. We can then choose an effective wavelength lambda_c along with coefficients alpha, beta such that instead of integrating, we estimate R = B(lambda*, T*), with T* = alpha + beta · T_B and lambda* a wavelength which may be close to the centroid lambda_c (but there is no guarantee). Such an approximation eliminates the explicit use of an integral which can make analysis easier. Returns: alpha (float): Offset in approximation for T* beta (float): Slope in approximation for T* lambda_eff (float): Effective wavelength delta_alpha (float): Uncertainty in alpha delta_beta (float): Uncertainty in beta delta_lambda_eff (float): Uncertainty in lambda_eff """ warnings.warn("Obtaining band coefficients from file", UserWarning) srcfile = config.conf[instr]["band_file"].format(sat=sat) if include_shift: rxp = r"(.{5,6})_ch(\d\d?)_shift([+-]\d+)pm\.nc\s+([\d.]+)\s+(-?[\de\-.]+)\s+([\d.]+)" dtp = [("satname", "S6"), ("channel", "u1"), ("shift", "i2"), ("centre", "f4"), ("alpha", "f4"), ("beta", "f4")] M = numpy.fromregex(srcfile, rxp, dtp).reshape(19, 7) dims = ("channel", "shiftno") ds = xarray.Dataset( {"centre": (dims, M["centre"]), "alpha": (dims, M["alpha"]), "beta": (dims, M["beta"]), "shift": (dims, M["shift"])}, coords = {"channel": M["channel"][:, 0]}) else: rxp = r"(.{5,6})_ch(\d\d?)_rttov\.nc\s+([\d.]+)\s+(-?[\de\-.]+)\s+([\d.]+)" dtp = [("satname", "S6"), ("channel", "u1"), ("centre", "f4"), ("alpha", "f4"), ("beta", "f4")] M = numpy.fromregex(srcfile, rxp, dtp) dims = ("channel",) ds = xarray.Dataset( {"centre": (dims, M["centre"]), "alpha": (dims, M["alpha"]), "beta": (dims, M["beta"])}, coords = {"channel": M["channel"]}) ds = ds.sel(channel=ch) if include_shift: ds0 = ds.sel(shiftno=0) # varies 1.1 – 15.2 nm depending on channel else: ds0 = ds lambda_c = UADA(ds0["centre"], attrs={"units": "1/cm"}) alpha = UADA(ds0["alpha"], attrs={"units": "K"}) beta = UADA(ds0["beta"], attrs={"units": "1"}) if include_shift: delta_ds = ds.sel(shiftno=1) - ds0 delta_lambda_c = abs(UADA(delta_ds["centre"], attrs={"units": "1/cm"})) delta_alpha = abs(UADA(delta_ds["alpha"], attrs={"units": "K"})) delta_beta = abs(UADA(delta_ds["beta"], attrs={"units": "1"})) else: # workaround for https://github.com/pydata/xarray/issues/2542 # by passing to UADA delta_alpha = UADA(xarray.zeros_like(alpha)) delta_alpha.attrs["units"] = "K" delta_beta = UADA(xarray.zeros_like(alpha)) delta_beta.attrs["units"] = "1" delta_lambda_c = UADA(xarray.zeros_like(alpha)) delta_lambda_c.attrs["units"] = "1/cm" return (alpha, beta, lambda_c, delta_alpha, delta_beta, delta_lambda_c)
# Methods returning new SRFs with some changes
[docs] def shift(self, amount): """Get new SRF, shifted by <amount> Return a new SRF, shifted by <amount> Hz. The shape of the SRF is retained. Arguments: Quantity amount: Distance to shift SRF """ return self.__class__(self.frequency.to( amount.u, "sp") + amount, self.W)
[docs] def as_dataarray(self, coordinate): """Return xarray.DataArray object. Coordinate can be "wavelength" (which will be in m), "frequency" (which will be in Hz), or "wavenumber" (which will be in 1/cm). """ return xarray.DataArray(self.W, dims=(coordinate,), coords={coordinate: getattr(self, coordinate)}, name="SRF")
_specrad_freq = ureg.W / (ureg.m**2 * ureg.sr * ureg.Hz)
[docs]def planck_f(f, T): """Planck law expressed in frequency. If more than 10⁵ resulting radiances, uses numexpr. :param f: Frequency. Quantity in [Hz] :param T: Temperature. Quantity in [K] """ # try: # f = f.astype(numpy.float64) # except AttributeError: # pass if (f.size * T.size) > 1e5: return numexpr.evaluate("(2 * h * f**3) / (c**2) * " "1 / (exp((h*f)/(k*T)) - 1)") * ( radiance_units["si"]) return ((2 * ureg.h * f**3) / (ureg.c ** 2) * 1 / (numpy.exp(((ureg.h * f) / (ureg.k * T)).to("1")) - 1)).to( ureg.W / (ureg.m**2 * ureg.sr * ureg.Hz))
[docs]def specrad_wavenumber2frequency(specrad_wavenum): """Convert spectral radiance from per wavenumber to per frequency :param specrad_wavenum: Spectral radiance per wavenumber [W·sr^{-1}·m^{-2}·{m^{-1}}^{-1}] :returns: Spectral radiance per frequency [W⋅sr−1⋅m−2⋅Hz−1] """ if not isinstance(specrad_wavenum, pint.quantity._Quantity): specrad_wavenum = specrad_wavenum * ureg.W / ( ureg.m**2 * ureg.sr * (1 / ureg.m)) return (specrad_wavenum / ureg.c).to(ureg.W / ( ureg.m**2 * ureg.sr * ureg.Hz))
[docs]def specrad_frequency_to_planck_bt(L, f): """Convert spectral radiance per frequency to brightness temperature This function converts monochromatic spectral radiance per frequency to Planck brightness temperature. This is calculated by inverting the Planck function. Note that this function is NOT correct to estimate polychromatic brightness temperatures such as channel brightness temperatures. For this, you need the spectral response function — see the SRF class. :param L: Spectral radiance [W m^-2 sr^-1 Hz^-1] :param f: Corresponding frequency [Hz] :returns: Planck brightness temperature [K]. """ # f needs to be double to prevent overflow f = numpy.asarray(f).astype(numpy.float64) if L.size > 1500000: logger.debug("Doing actual BT conversion: {:,} spectra * {:,} " "frequencies = {:,} radiances".format( L.size // L.shape[-1], f.size, L.size)) # BT = (h * f) / (k * numpy.log((2*h*f**3)/(L * c**2) + 1)) BT = numexpr.evaluate("(h * f) / (k * log((2*h*f**3)/(L * c**2) + 1))") BT = numpy.ma.masked_invalid(BT) if L.size > 1500000: logger.debug("(done)") return ureg.Quantity(BT, ureg.K)