Source code for diskmap.diskmap

"""
Module with mapping functionalities for protoplanetary disks.
"""

import math
import warnings

from typing import List, Optional, Tuple, Union, Callable

import numpy as np
import numpy.ma as ma

from astropy.io import fits
from astropy.stats import sigma_clip
from scipy.interpolate import griddata, interp1d
from scipy.ndimage import gaussian_filter, median_filter
from typeguard import typechecked


[docs] class DiskMap: """ Class for mapping a surface layer of a protoplanetary disk. """ @typechecked def __init__(self, fitsfile: Union[str, np.ndarray], pixscale: float, inclination: float, pos_angle: float, distance: float, image_type: str = 'polarized') -> None: """ Parameters ---------- fitsfile : str, np.ndarray Name of the FITS file with the scattered light image. Alternatively, a 2D ``numpy`` array with the image can be directly provided. pixscale : float Pixel scale of the image (arcsec per pixel). inclination : float Inclination of the disk (deg). The convention is such that the near side of the disk is on the right side of the image when using an inclination between 0 and 90 deg and using a `pos_angle` of 0 deg. The near and far side of the disk mapping can be exchanged by using a minus sign for the inclination. To be certain about using the correct near and far side, it is best to check the `_radius.fits` file with a somewhat large inclination. The near side will show more strongly compressed radii compared to the far side of the disk. pos_angle : float Position angle of the disk (deg). Defined in counterclockwise direction with respect to the vertical axis (i.e. east of north). distance : float Distance between observer and star (pc). image_type : str Image type ('polarized' or 'total'). This parameter affects the output that will be stored. For example, the conversion from polarized to total intensity phase function is only done with `image_type='polarized'`. Returns ------- NoneType None """ if isinstance(fitsfile, str): self.image = fits.getdata(fitsfile) else: self.image = fitsfile self.image = np.nan_to_num(self.image) if self.image.ndim == 3: warnings.warn( "The FITS file contains a 3D data cube so using the first image." ) self.image = self.image[ 0, ] elif self.image.ndim != 2: raise ValueError("DiskMap requires a 2D image.") if self.image.shape[0] != self.image.shape[1]: raise ValueError("The dimensions of the image should have the same size.") if self.image.dtype == np.float32 or self.image.dtype == np.dtype('>f4'): warnings.warn( "The FITS file data is of type float32, this will be converted to float64" ) self.image = self.image.astype(np.float64) if self.image.dtype != np.float64 and self.image.dtype != np.dtype('>f8'): raise ValueError( f"The FITS file data should be either of type float32 or float64" ) if image_type not in ["polarized", "total"]: raise ValueError( "The argument of 'image_type' should be set " "to 'polarized' or 'total'." ) self.pixscale = pixscale # (arcsec) self.incl = math.radians(inclination) # (rad) self.pos_ang = math.radians(pos_angle) # (rad) self.distance = distance # (pc) self.image_type = image_type self.grid = 501 # should be odd self.radius = None self.azimuth = None self.opening = None self.scatter = None self.im_deproj = None self.im_scaled = None self.stokes_i = None self.phase = None # sum_before = np.sum(self.image) # im_scale = rescale(image=np.asarray(self.image, dtype=np.float64), # scale=(10., 10.), # order=5, # mode='reflect', # anti_aliasing=True, # multichannel=False) # sum_after = np.sum(im_scale) # self.image = im_scale * (sum_before / sum_after) # self.pixscale /= 10. self.npix = self.image.shape[0]
[docs] @typechecked def map_disk( self, power_law: Tuple[float, float, float], radius: Tuple[float, float, int] = (1.0, 500.0, 100), surface: str = "power-law", height_func: Optional[Callable[[np.ndarray],np.ndarray]] = None, filename: Optional[str] = None, ) -> None: """ Function for mapping a scattered light image to a height profile (i.e. height of the scattering surface as function of radius in the disk midplane) that is either parameterized with a power-law function or read from an input file (for example returned by a radiative transfer code). Parameters ---------- power_law : tuple(float, float, float) The height of the scattering surface is a power-law function, :math:`h(r) = a + b*(r/1\\,\\mathrm{au})^c`, with :math:`a`, :math:`b`, :math:`r`, and :math:`h(r)` in au. The argument of ``power_law`` should be provided as ``(a, b, c)``. Set all values to zero for the mapping a geometrically flat disk, in which case only the inclination is used for the deprojection. radius : tuple(float, float, int) Radius points that are sampled, provided as (r_in, r_out, n_r), with ``r_in`` and ``r_out`` in au. The outer radius should be set large enough such that a radius is sampled for each pixel in the field of view. To check if any NaNs are present, have a look at the `_radius.fits` output. surface : str Parameterization type for the disk surface ('power-law' or 'function' or 'file'). height_func : callable, None Function that returns the height of the scattering surface as a function of radius. The radii and returned height must be in au. Only used if surface='function'. filename : str, None Filename which contains the radius in au (first column) and the height of the disk surface in au (second column). Returns ------- NoneType None """ if surface == "power-law": # Power-law disk height @typechecked def power_law_height( x_power: np.ndarray, a_power: float, b_power: float, c_power: float ) -> np.ndarray: return a_power + b_power * x_power ** c_power # midplane radius (au) disk_radius = np.linspace(radius[0], radius[1], radius[2]) # disk height (au) disk_height = power_law_height( disk_radius, power_law[0], power_law[1], power_law[2] ) # opening angle (rad) disk_opening = np.arctan2(disk_height, disk_radius) elif surface == "function": if height_func is None: raise ValueError("If using surface=='function', you must specify height_func") # midplane radius (au) disk_radius = np.linspace(radius[0], radius[1], radius[2]) # disk height (au) disk_height = height_func(disk_radius) # opening angle (rad) disk_opening = np.arctan2(disk_height, disk_radius) elif surface == "file": # Read disk height from ASCII file data = np.loadtxt(filename) # midplane radius (au) disk_radius = np.linspace(radius[0], radius[1], radius[2]) # disk height (au) height_interp = interp1d(data[:, 0], data[:, 1]) disk_height = height_interp(disk_radius) # opening angle (rad) disk_opening = np.arctan2(disk_height, disk_radius) # (au) # Project disk height to image plane disk_phi = np.linspace(0.0, 359.0, 360) # (deg) disk_phi = np.radians(disk_phi) # (rad) x_im = [] y_im = [] r_im = [] o_im = [] s_im = [] p_im = [] for i, r_item in enumerate(disk_radius): for j, p_item in enumerate(disk_phi): x_tmp = r_item * np.sin(p_item) y_tmp = disk_height[i] * math.sin(self.incl) - r_item * np.cos( p_item ) * math.cos(self.incl) x_rot = x_tmp * math.cos(math.pi - self.pos_ang) - y_tmp * math.sin( math.pi - self.pos_ang ) y_rot = x_tmp * math.sin(math.pi - self.pos_ang) + y_tmp * math.cos( math.pi - self.pos_ang ) x_im.append(x_rot) y_im.append(y_rot) r_im.append(math.sqrt(r_item ** 2 + disk_height[i] ** 2)) p_im.append(p_item) o_im.append(disk_opening[i]) ang_tmp = math.pi / 2.0 + disk_opening[i] par1 = ( math.sin(ang_tmp) * math.cos(math.pi + p_item) * math.sin(self.incl) ) par2 = math.cos(ang_tmp) * math.cos(self.incl) s_im.append(math.pi - math.acos(par1 + par2)) # Sort image plane points along x-axis x_index = np.argsort(x_im) y_sort = np.zeros(len(y_im)) r_sort = np.zeros(len(y_im)) p_sort = np.zeros(len(y_im)) o_sort = np.zeros(len(y_im)) s_sort = np.zeros(len(y_im)) for i in range(len(y_im)): y_sort[i] = y_im[x_index[i]] r_sort[i] = r_im[x_index[i]] p_sort[i] = p_im[x_index[i]] o_sort[i] = o_im[x_index[i]] s_sort[i] = s_im[x_index[i]] grid_xy = np.zeros((self.grid ** 2, 2)) count = 0 for i in range(-(self.grid - 1) // 2, (self.grid - 1) // 2 + 1): for j in range(-(self.grid - 1) // 2, (self.grid - 1) // 2 + 1): grid_xy[count, 0] = float(i) grid_xy[count, 1] = float(j) count += 1 image_xy = np.zeros((len(x_im), 2)) for i, _ in enumerate(x_im): image_xy[i, 0] = x_im[i] image_xy[i, 1] = y_im[i] # Interpolate image plane if self.npix % 2 == 0: x_grid = np.linspace(-self.npix / 2 + 0.5, self.npix / 2 - 0.5, self.npix) y_grid = np.linspace(-self.npix / 2 + 0.5, self.npix / 2 - 0.5, self.npix) elif self.npix % 2 == 1: x_grid = np.linspace(-(self.npix - 1) / 2, (self.npix - 1) / 2, self.npix) y_grid = np.linspace(-(self.npix - 1) / 2, (self.npix - 1) / 2, self.npix) x_grid *= self.pixscale * self.distance # (au) y_grid *= self.pixscale * self.distance # (au) grid = np.zeros((self.npix ** 2, 2)) count = 0 for i in range(self.npix): for j in range(self.npix): grid[count, 0] = x_grid[i] grid[count, 1] = x_grid[j] count += 1 fit_radius = griddata(image_xy, r_im, grid, method="linear") fit_azimuth = griddata(image_xy, p_im, grid, method="linear") fit_opening = griddata(image_xy, o_im, grid, method="linear") fit_scatter = griddata(image_xy, s_im, grid, method="linear") self.radius = np.zeros((self.npix, self.npix)) self.azimuth = np.zeros((self.npix, self.npix)) self.opening = np.zeros((self.npix, self.npix)) self.scatter = np.zeros((self.npix, self.npix)) count = 0 for i in range(self.npix): for j in range(self.npix): self.radius[i, j] = fit_radius[count] self.azimuth[i, j] = fit_azimuth[count] self.opening[i, j] = fit_opening[count] self.scatter[i, j] = fit_scatter[count] count += 1
[docs] @typechecked def sort_disk(self) -> Tuple[List[np.float64], np.ndarray]: """ Function for creating a list with pixel values and creating a 2D array with the x and y pixel coordinates. Returns ------- NoneType None """ # Create lists with x and y coordinates and pixel values x_disk = [] y_disk = [] im_disk = [] for i in range(self.npix): for j in range(self.npix): x_tmp = self.radius[i, j] * np.cos(self.azimuth[i, j]) y_tmp = self.radius[i, j] * np.sin(self.azimuth[i, j]) x_disk.append( x_tmp * math.cos(math.pi / 2.0 - self.pos_ang) - y_tmp * math.sin(math.pi / 2.0 - self.pos_ang) ) y_disk.append( x_tmp * math.sin(math.pi / 2.0 - self.pos_ang) + y_tmp * math.cos(math.pi / 2.0 - self.pos_ang) ) im_disk.append(self.image[i, j]) # Sort disk plane points along x-axis x_index = np.argsort(x_disk) y_sort = np.zeros(len(y_disk)) im_sort = np.zeros(len(y_disk)) for i in range(len(y_disk)): y_sort[i] = y_disk[x_index[i]] im_sort[i] = im_disk[x_index[i]] # count = 0 # # grid_xy = np.zeros((self.grid**2, 2)) # # for i in range(-(self.grid-1)//2, (self.grid-1)//2+1): # for j in range(-(self.grid-1)//2, (self.grid-1)//2+1): # grid_xy[count, 0] = float(i) # grid_xy[count, 1] = float(j) # # count += 1 image_xy = np.zeros((len(y_disk), 2)) for i, _ in enumerate(y_disk): image_xy[i, 0] = x_disk[i] image_xy[i, 1] = y_disk[i] return im_disk, image_xy
[docs] @typechecked def deproject_disk(self) -> None: """ Function for deprojecting a disk surface based on the mapping of :meth:`~diskmap.diskmap.DiskMap.map_disk`. Returns ------- NoneType None """ if self.radius is None or self.azimuth is None: raise ValueError( "The disk has not been mapped yet with the 'map_disk' method." ) im_disk, image_xy = self.sort_disk() # Interpolate disk plane if self.npix % 2 == 0: x_grid = np.linspace(-self.npix / 2 + 0.5, self.npix / 2 - 0.5, self.npix) y_grid = np.linspace(-self.npix / 2 + 0.5, self.npix / 2 - 0.5, self.npix) elif self.npix % 2 == 1: x_grid = np.linspace(-(self.npix - 1) / 2, (self.npix - 1) / 2, self.npix) y_grid = np.linspace(-(self.npix - 1) / 2, (self.npix - 1) / 2, self.npix) x_grid *= self.pixscale * self.distance # (au) y_grid *= self.pixscale * self.distance # (au) grid = np.zeros((self.npix ** 2, 2)) count = 0 for i in range(self.npix): for j in range(self.npix): grid[count, 0] = x_grid[i] grid[count, 1] = x_grid[j] count += 1 try: fit_im = griddata(image_xy, im_disk, grid, method="linear") except ValueError: raise RuntimeError( "The radius sampling should cover the " "complete field of view of the image. Try " "increasing the outer 'radius' value in " "'map_disk' and have a look at the " "'_radius.fits' output to check for NaNs." ) self.im_deproj = np.zeros((self.npix, self.npix)) count = 0 for i in range(self.npix): for j in range(self.npix): self.im_deproj[i, j] = fit_im[count] count += 1
[docs] @typechecked def r2_scaling(self, r_max: float, mask_planet: Optional[Tuple[int, int, float, float]] = None) -> None: """ Function for correcting a scattered light image for the r^2 decrease of the stellar irradiation of the disk surface. Parameters ---------- r_max : float Maximum disk radius (au) for the r^2-scaling. Beyond this distance, a constant r^2-scaling is applied of value ``r_max``. mask_planet : tuple(int, int, float, float), None Mask for a planet such that it will not be scaled. The tuple should have the following format: ``(x_planet, y_planet, r_mask, scaling)``. Here, ``x_planet`` and ``y_planet`` are the central pixel of the planet position, ``r_mask`` is the size (in pixels) of the planet signal, and ``scaling`` is the scaling factor of the planet flux. This parameter is a bit experimental and typically not used by setting the argument to ``None``. Returns ------- NoneType None """ if self.radius is None: raise ValueError("Please run 'map_disk' before using 'r2_scaling'.") if mask_planet is not None: x_grid = np.linspace(-mask_planet[0], self.npix-mask_planet[0], self.npix) y_grid = np.linspace(-mask_planet[1], self.npix-mask_planet[1], self.npix) xx_grid, yy_grid = np.meshgrid(x_grid, y_grid) rr_grid = np.sqrt(xx_grid**2 + yy_grid**2) # self.low_snr = np.zeros((self.npix, self.npix)) # # for i in range(3, self.npix-3): # for j in range(3, self.npix-3): # aperture = np.nansum(self.image[i-2:i+3, j-2:j+3]) # # if aperture > 90. and j < 157: # self.low_snr[i, j] = np.nan # else: # self.low_snr[i, j] = self.image[i, j].copy() # # std_low = np.nanstd(self.low_snr) # # for i in range(3, self.npix-3): # for j in range(3, self.npix-3): # if not np.isnan(self.low_snr[i, j]) and np.abs(self.image[i, j]) > 3.*std_low: # select = self.low_snr[i-3:i+4, j-3:j+4].copy() # select[3, 3] = np.nan # # self.image[i, j] = np.nanmedian(select) # self.low_snr = sigma_clip(self.low_snr, 5., maxiters=10) # for k in range(2): # for i in range(3, self.npix-3): # for j in range(3, self.npix-3): # if not np.isnan(self.low_snr[i, j]): # select = self.low_snr[i-3:i+4, j-3:j+4].copy() # select[3, 3] = np.nan # # if self.image[i, j] > 0.5*np.nanstd(select): # self.image[i, j] = np.nanmedian(select) # for i in range(3, self.npix-3): # for j in range(3, self.npix-3): # if not np.isnan(self.high_snr[i, j]): # self.high_snr[i, j] = self.low_snr[i, j] # # elif type(self.low_snr[i, j]) is ma.core.MaskedConstant: # # self.high_snr[i, j] = np.nansum(self.low_snr[i-2:i+3, j-2:j+3]) # # else: # # self.high_snr[i, j] = self.image[i, j] # else: # self.high_snr[i, j] = self.low_snr[i, j].copy() # for k in range(2): # for i in range(3, self.npix-3): # for j in range(3, self.npix-3): # if np.isnan(self.filt[i, j]): # select = self.image[i-2:i+3, j-2:j+3].copy() # select[2, 2] = np.nan # # if np.abs(self.image[i, j]) > 1.5*np.nanstd(select): # self.image[i, j] = np.nanmedian(select) # self.filt[i, j] = median_filter(self.image, 2) # self.image = self.high_snr.copy() self.im_scaled = np.zeros((self.npix, self.npix)) for i in range(self.npix): for j in range(self.npix): if self.radius[i, j] < r_max: if mask_planet is None or rr_grid[i, j] > mask_planet[2]: self.im_scaled[i, j] = self.radius[i, j]**2 * self.image[i, j] else: self.im_scaled[i, j] = mask_planet[3] * self.image[i, j] else: if mask_planet is None or rr_grid[i, j] > mask_planet[2]: self.im_scaled[i, j] = r_max**2 * self.image[i, j] else: self.im_scaled[i, j] = mask_planet[3] * self.image[i, j]
[docs] @typechecked def total_intensity(self, pol_max: float = 1.0) -> None: """ Function for estimating the (stellar irradiation corrected) total intensity image when ``fitsfile`` contains a polarized light image and ``image_type='polarized'``. A bell-shaped degree of polarized is assumed and effects of multiple scattering are ignored. Parameters ---------- pol_max : float The peak of the bell-shaped degree of polarization, which effectively normalizes the estimated total intensity image. Returns ------- NoneType None """ if self.image_type != "polarized": raise ValueError( "The 'total_intensity' method should only be " "used if the input image is a polarized light " "image (i.e. image_type='polarized')." ) if self.scatter is None or self.im_scaled is None: raise ValueError("Please run 'map_disk' before using 'total_intensity'.") alpha = np.cos(self.scatter) deg_pol = -pol_max * (alpha ** 2 - 1.0) / (alpha ** 2 + 1.0) self.stokes_i = self.im_scaled / deg_pol
[docs] @typechecked def phase_function(self, radius: Tuple[float, float], n_phase: int): """ Function for extracting the phase function. If ``image_type='polarized'``, the polarized phase function is extracted and the total intensity phase function is estimated by assuming a bell-shaped degree of polarization. If ``image_type='polarized'``, the total intensity phase function is extracted. The extracting is done on the r$^2$-scaled pixel values such that the phase function is not biased by irradiation effects. The phase functions are have been normalized by their maximum value. Parameters ---------- radius : tuple(float, float) Inner and outer radius (au) between which pixels are selected for estimating the phase function. n_phase : int Number of sampling points for the phase function between 0 and 180 deg. Returns ------- NoneType None """ # Phase function is normalizedf so pol_max has not effect pol_max = 1.0 if self.radius is None or self.scatter is None or self.im_scaled is None: raise ValueError( "Please run 'map_disk' and 'r2_scaling' " "before using 'phase_function'." ) scat_select = [] im_select = [] for i in range(self.npix): for j in range(self.npix): if self.radius[i, j] > radius[0] and self.radius[i, j] < radius[1]: scat_select.append(math.degrees(self.scatter[i, j])) # use im_scaled to correct for differences across the # selected radius im_select.append(self.im_scaled[i, j]) phase_bins = np.linspace(0.0, 180.0, num=n_phase) bin_index = np.digitize(scat_select, phase_bins) im_bins = [] scat_bins = [] for i in range(n_phase): im_bins.append([]) scat_bins.append([]) for i, _ in enumerate(im_select): im_bins[bin_index[i] - 1].append(im_select[i]) scat_bins[bin_index[i] - 1].append(scat_select[i]) angle = [] pol_flux = [] pol_error = [] for i in range(n_phase): if len(im_bins[i]) > 0: angle.append(np.nanmean(scat_bins[i])) pol_flux.append(np.nanmean(im_bins[i])) pol_error.append(np.nanstd(im_bins[i]) / math.sqrt(len(im_bins[i]))) if self.image_type == "polarized": # Degree of polarization alpha = np.cos(np.array(angle) * np.pi / 180.0) deg_pol = -pol_max * (alpha * alpha - 1.0) / (alpha * alpha + 1.0) tot_flux = pol_flux / deg_pol tot_error = pol_error / deg_pol # Normalization flux_norm = max(pol_flux) pol_flux = np.array(pol_flux) / flux_norm pol_error = np.array(pol_error) / flux_norm flux_norm = max(tot_flux) tot_flux = np.array(tot_flux) / flux_norm tot_error = np.array(tot_error) / flux_norm self.phase = np.column_stack( [angle, pol_flux, pol_error, tot_flux, tot_error] ) else: self.phase = np.column_stack([angle, pol_flux, pol_error])
[docs] @typechecked def write_output(self, filename: str) -> None: """ Function for writing the available results to FITS files. Parameters ---------- filename : str Filename start that is used for all the output file. Returns ------- NoneType None """ if self.radius is not None: fits.writeto(f"{filename}_radius.fits", self.radius, overwrite=True) if self.scatter is not None: fits.writeto( f"{filename}_scat_angle.fits", np.degrees(self.scatter), overwrite=True ) if self.im_deproj is not None: fits.writeto(f"{filename}_deprojected.fits", self.im_deproj, overwrite=True) if self.im_scaled is not None: fits.writeto(f"{filename}_r2_scaled.fits", self.im_scaled, overwrite=True) if self.stokes_i is not None: fits.writeto( f"{filename}_total_intensity.fits", self.stokes_i, overwrite=True ) if self.phase is not None: if self.image_type == "polarized": header = ( "Scattering angle (deg) - Normalized polarized " + "flux - Error - Normalized total flux - Error" ) else: header = "Scattering angle (deg) - Normalized total flux - Error" np.savetxt(f"{filename}_phase_function.dat", self.phase, header=header)