Source code for diskmap.diskmap

"""
Module with mapping functionalities for protoplanetary disks.
"""

import math
import warnings

import numpy as np
# import numpy.ma as ma

from astropy.io import fits
# from astropy.stats import sigma_clip
from beartype import beartype, typing
from scipy.interpolate import griddata, interp1d
# from scipy.ndimage import median_filter


[docs] class DiskMap: """ Class for mapping a surface layer of a protoplanetary disk. """ @beartype def __init__( self, fitsfile: typing.Union[str, np.ndarray], pixscale: float, inclination: float, pos_angle: float, distance: float, image_type: str = "polarized", ) -> None: """ Parameters ---------- fitsfile : str, np.ndarray Name of the FITS file with the scattered light image. Alternatively, a 2D ``numpy`` array with the image can be directly provided. pixscale : float Pixel scale of the image (arcsec per pixel). inclination : float Inclination of the disk (deg). The convention is such that the near side of the disk is on the right side of the image when using an inclination between 0 and 90 deg and using a `pos_angle` of 0 deg. The near and far side of the disk mapping can be exchanged by using a minus sign for the inclination. To be certain about using the correct near and far side, it is best to check the `_radius.fits` file with a somewhat large inclination. The near side will show more strongly compressed radii compared to the far side of the disk. pos_angle : float Position angle of the disk (deg). Defined in counterclockwise direction with respect to the vertical axis (i.e. east of north). distance : float Distance between observer and star (pc). image_type : str Image type ('polarized' or 'total'). This parameter affects the output that will be stored. For example, the conversion from polarized to total intensity phase function is only done with `image_type='polarized'`. Returns ------- NoneType None """ if isinstance(fitsfile, str): self.image = fits.getdata(fitsfile) else: self.image = fitsfile self.image = np.nan_to_num(self.image) if self.image.ndim == 3: warnings.warn( "The FITS file contains a 3D data cube so using the first image." ) self.image = self.image[0,] elif self.image.ndim != 2: raise ValueError("DiskMap requires a 2D image.") if self.image.shape[0] != self.image.shape[1]: raise ValueError("The dimensions of the image should have the same size.") if self.image.dtype == np.float32 or self.image.dtype == np.dtype(">f4"): warnings.warn( "The FITS file data is of type float32, " "his will be converted to float64" ) self.image = self.image.astype(np.float64) if self.image.dtype != np.float64 and self.image.dtype != np.dtype(">f8"): raise ValueError( "The FITS file data should be " "either of type float32 or float64" ) if image_type not in ["polarized", "total"]: raise ValueError( "The argument of 'image_type' should be set " "to 'polarized' or 'total'." ) self.pixscale = pixscale # (arcsec) self.incl = math.radians(inclination) # (rad) self.pos_ang = math.radians(pos_angle) # (rad) self.distance = distance # (pc) self.image_type = image_type self.grid = 501 # should be odd self.radius = None self.azimuth = None self.opening = None self.scatter = None self.im_deproj = None self.im_scaled = None self.stokes_i = None self.phase = None # sum_before = np.sum(self.image) # im_scale = rescale(image=np.asarray(self.image, dtype=np.float64), # scale=(10., 10.), # order=5, # mode='reflect', # anti_aliasing=True, # multichannel=False) # sum_after = np.sum(im_scale) # self.image = im_scale * (sum_before / sum_after) # self.pixscale /= 10. self.npix = self.image.shape[0]
[docs] @beartype def map_disk( self, power_law: typing.Tuple[float, float, float], radius: typing.Tuple[float, float, int] = (1.0, 500.0, 100), surface: str = "power-law", height_func: typing.Optional[typing.Callable[[np.ndarray], np.ndarray]] = None, filename: typing.Optional[str] = None, ) -> None: """ Function for mapping a scattered light image to a height profile (i.e. height of the scattering surface as function of radius in the disk midplane) that is either parameterized with a power-law function or read from an input file (for example returned by a radiative transfer code). Parameters ---------- power_law : tuple(float, float, float) The height of the scattering surface is a power-law function, :math:`h(r) = a + b*(r/1\\,\\mathrm{au})^c`, with :math:`a`, :math:`b`, :math:`r`, and :math:`h(r)` in au. The argument of ``power_law`` should be provided as ``(a, b, c)``. Set all values to zero for the mapping a geometrically flat disk, in which case only the inclination is used for the deprojection. radius : tuple(float, float, int) Radius points that are sampled, provided as (r_in, r_out, n_r), with ``r_in`` and ``r_out`` in au. The outer radius should be set large enough such that a radius is sampled for each pixel in the field of view. To check if any NaNs are present, have a look at the `_radius.fits` output. surface : str Parameterization type for the disk surface ('power-law' or 'function' or 'file'). height_func : callable, None Function that returns the height of the scattering surface as a function of radius. The radii and returned height must be in au. Only used if surface='function'. filename : str, None Filename which contains the radius in au (first column) and the height of the disk surface in au (second column). Returns ------- NoneType None """ if surface == "power-law": # Power-law disk height @beartype def power_law_height( x_power: np.ndarray, a_power: float, b_power: float, c_power: float ) -> np.ndarray: return a_power + b_power * x_power**c_power # midplane radius (au) disk_radius = np.linspace(radius[0], radius[1], radius[2]) # disk height (au) disk_height = power_law_height( disk_radius, power_law[0], power_law[1], power_law[2] ) # opening angle (rad) disk_opening = np.arctan2(disk_height, disk_radius) elif surface == "function": if height_func is None: raise ValueError( "If using surface=='function', you must specify height_func" ) # midplane radius (au) disk_radius = np.linspace(radius[0], radius[1], radius[2]) # disk height (au) disk_height = height_func(disk_radius) # opening angle (rad) disk_opening = np.arctan2(disk_height, disk_radius) elif surface == "file": # Read disk height from ASCII file data = np.loadtxt(filename) # midplane radius (au) disk_radius = np.linspace(radius[0], radius[1], radius[2]) # disk height (au) height_interp = interp1d(data[:, 0], data[:, 1]) disk_height = height_interp(disk_radius) # opening angle (rad) disk_opening = np.arctan2(disk_height, disk_radius) # (au) # Project disk height to image plane disk_phi = np.linspace(0.0, 359.0, 360) # (deg) disk_phi = np.radians(disk_phi) # (rad) x_im = [] y_im = [] r_im = [] o_im = [] s_im = [] p_im = [] for i, r_item in enumerate(disk_radius): for j, p_item in enumerate(disk_phi): x_tmp = r_item * np.sin(p_item) y_tmp = disk_height[i] * math.sin(self.incl) - r_item * np.cos( p_item ) * math.cos(self.incl) x_rot = x_tmp * math.cos(math.pi - self.pos_ang) - y_tmp * math.sin( math.pi - self.pos_ang ) y_rot = x_tmp * math.sin(math.pi - self.pos_ang) + y_tmp * math.cos( math.pi - self.pos_ang ) x_im.append(x_rot) y_im.append(y_rot) r_im.append(math.sqrt(r_item**2 + disk_height[i] ** 2)) p_im.append(p_item) o_im.append(disk_opening[i]) ang_tmp = math.pi / 2.0 + disk_opening[i] par1 = ( math.sin(ang_tmp) * math.cos(math.pi + p_item) * math.sin(self.incl) ) par2 = math.cos(ang_tmp) * math.cos(self.incl) s_im.append(math.pi - math.acos(par1 + par2)) # Sort image plane points along x-axis x_index = np.argsort(x_im) y_sort = np.zeros(len(y_im)) r_sort = np.zeros(len(y_im)) p_sort = np.zeros(len(y_im)) o_sort = np.zeros(len(y_im)) s_sort = np.zeros(len(y_im)) for i in range(len(y_im)): y_sort[i] = y_im[x_index[i]] r_sort[i] = r_im[x_index[i]] p_sort[i] = p_im[x_index[i]] o_sort[i] = o_im[x_index[i]] s_sort[i] = s_im[x_index[i]] grid_xy = np.zeros((self.grid**2, 2)) count = 0 for i in range(-(self.grid - 1) // 2, (self.grid - 1) // 2 + 1): for j in range(-(self.grid - 1) // 2, (self.grid - 1) // 2 + 1): grid_xy[count, 0] = float(i) grid_xy[count, 1] = float(j) count += 1 image_xy = np.zeros((len(x_im), 2)) for i, _ in enumerate(x_im): image_xy[i, 0] = x_im[i] image_xy[i, 1] = y_im[i] # Interpolate image plane if self.npix % 2 == 0: x_grid = np.linspace(-self.npix / 2 + 0.5, self.npix / 2 - 0.5, self.npix) y_grid = np.linspace(-self.npix / 2 + 0.5, self.npix / 2 - 0.5, self.npix) else: x_grid = np.linspace(-(self.npix - 1) / 2, (self.npix - 1) / 2, self.npix) y_grid = np.linspace(-(self.npix - 1) / 2, (self.npix - 1) / 2, self.npix) x_grid *= self.pixscale * self.distance # (au) y_grid *= self.pixscale * self.distance # (au) grid = np.zeros((self.npix**2, 2)) count = 0 for i in range(self.npix): for j in range(self.npix): grid[count, 0] = x_grid[i] grid[count, 1] = x_grid[j] count += 1 fit_radius = griddata(image_xy, r_im, grid, method="linear") fit_azimuth = griddata(image_xy, p_im, grid, method="linear") fit_opening = griddata(image_xy, o_im, grid, method="linear") fit_scatter = griddata(image_xy, s_im, grid, method="linear") self.radius = np.zeros((self.npix, self.npix)) self.azimuth = np.zeros((self.npix, self.npix)) self.opening = np.zeros((self.npix, self.npix)) self.scatter = np.zeros((self.npix, self.npix)) count = 0 for i in range(self.npix): for j in range(self.npix): self.radius[i, j] = fit_radius[count] self.azimuth[i, j] = fit_azimuth[count] self.opening[i, j] = fit_opening[count] self.scatter[i, j] = fit_scatter[count] count += 1
[docs] @beartype def sort_disk(self) -> typing.Tuple[typing.List[np.float64], np.ndarray]: """ Function for creating a list with pixel values and creating a 2D array with the x and y pixel coordinates. Returns ------- NoneType None """ # Create lists with x and y coordinates and pixel values x_disk = [] y_disk = [] im_disk = [] for i in range(self.npix): for j in range(self.npix): x_tmp = self.radius[i, j] * np.cos(self.azimuth[i, j]) y_tmp = self.radius[i, j] * np.sin(self.azimuth[i, j]) x_disk.append( x_tmp * math.cos(math.pi / 2.0 - self.pos_ang) - y_tmp * math.sin(math.pi / 2.0 - self.pos_ang) ) y_disk.append( x_tmp * math.sin(math.pi / 2.0 - self.pos_ang) + y_tmp * math.cos(math.pi / 2.0 - self.pos_ang) ) im_disk.append(self.image[i, j]) # Sort disk plane points along x-axis x_index = np.argsort(x_disk) y_sort = np.zeros(len(y_disk)) im_sort = np.zeros(len(y_disk)) for i in range(len(y_disk)): y_sort[i] = y_disk[x_index[i]] im_sort[i] = im_disk[x_index[i]] # count = 0 # # grid_xy = np.zeros((self.grid**2, 2)) # # for i in range(-(self.grid-1)//2, (self.grid-1)//2+1): # for j in range(-(self.grid-1)//2, (self.grid-1)//2+1): # grid_xy[count, 0] = float(i) # grid_xy[count, 1] = float(j) # # count += 1 image_xy = np.zeros((len(y_disk), 2)) for i, _ in enumerate(y_disk): image_xy[i, 0] = x_disk[i] image_xy[i, 1] = y_disk[i] return im_disk, image_xy
[docs] @beartype def deproject_disk(self) -> None: """ Function for deprojecting a disk surface based on the mapping of :meth:`~diskmap.diskmap.DiskMap.map_disk`. Returns ------- NoneType None """ if self.radius is None or self.azimuth is None: raise ValueError( "The disk has not been mapped yet with the 'map_disk' method." ) im_disk, image_xy = self.sort_disk() # Interpolate disk plane if self.npix % 2 == 0: x_grid = np.linspace(-self.npix / 2 + 0.5, self.npix / 2 - 0.5, self.npix) y_grid = np.linspace(-self.npix / 2 + 0.5, self.npix / 2 - 0.5, self.npix) else: x_grid = np.linspace(-(self.npix - 1) / 2, (self.npix - 1) / 2, self.npix) y_grid = np.linspace(-(self.npix - 1) / 2, (self.npix - 1) / 2, self.npix) x_grid *= self.pixscale * self.distance # (au) y_grid *= self.pixscale * self.distance # (au) grid = np.zeros((self.npix**2, 2)) count = 0 for i in range(self.npix): for j in range(self.npix): grid[count, 0] = x_grid[i] grid[count, 1] = x_grid[j] count += 1 try: fit_im = griddata(image_xy, im_disk, grid, method="linear") except ValueError as exc: raise RuntimeError( "The radius sampling should cover the " "complete field of view of the image. Try " "increasing the outer 'radius' value in " "'map_disk' and have a look at the " "'_radius.fits' output to check for NaNs." ) from exc self.im_deproj = np.zeros((self.npix, self.npix)) count = 0 for i in range(self.npix): for j in range(self.npix): self.im_deproj[i, j] = fit_im[count] count += 1
[docs] @beartype def r2_scaling( self, r_max: float, mask_planet: typing.Optional[typing.Tuple[int, int, float, float]] = None, ) -> None: """ Function for correcting a scattered light image for the r^2 decrease of the stellar irradiation of the disk surface. Parameters ---------- r_max : float Maximum disk radius (au) for the r^2-scaling. Beyond this distance, a constant r^2-scaling is applied of value ``r_max``. mask_planet : tuple(int, int, float, float), None Mask for a planet such that it will not be scaled. The tuple should have the following format: ``(x_planet, y_planet, r_mask, scaling)``. Here, ``x_planet`` and ``y_planet`` are the central pixel of the planet position, ``r_mask`` is the size (in pixels) of the planet signal, and ``scaling`` is the scaling factor of the planet flux. This parameter is a bit experimental and typically not used by setting the argument to ``None``. Returns ------- NoneType None """ if self.radius is None: raise ValueError("Please run 'map_disk' before using 'r2_scaling'.") if mask_planet is not None: x_grid = np.linspace(-mask_planet[0], self.npix - mask_planet[0], self.npix) y_grid = np.linspace(-mask_planet[1], self.npix - mask_planet[1], self.npix) xx_grid, yy_grid = np.meshgrid(x_grid, y_grid) rr_grid = np.sqrt(xx_grid**2 + yy_grid**2) # self.low_snr = np.zeros((self.npix, self.npix)) # # for i in range(3, self.npix-3): # for j in range(3, self.npix-3): # aperture = np.nansum(self.image[i-2:i+3, j-2:j+3]) # # if aperture > 90. and j < 157: # self.low_snr[i, j] = np.nan # else: # self.low_snr[i, j] = self.image[i, j].copy() # # std_low = np.nanstd(self.low_snr) # # for i in range(3, self.npix-3): # for j in range(3, self.npix-3): # if not np.isnan(self.low_snr[i, j]) and np.abs(self.image[i, j]) > 3.*std_low: # select = self.low_snr[i-3:i+4, j-3:j+4].copy() # select[3, 3] = np.nan # # self.image[i, j] = np.nanmedian(select) # self.low_snr = sigma_clip(self.low_snr, 5., maxiters=10) # for k in range(2): # for i in range(3, self.npix-3): # for j in range(3, self.npix-3): # if not np.isnan(self.low_snr[i, j]): # select = self.low_snr[i-3:i+4, j-3:j+4].copy() # select[3, 3] = np.nan # # if self.image[i, j] > 0.5*np.nanstd(select): # self.image[i, j] = np.nanmedian(select) # for i in range(3, self.npix-3): # for j in range(3, self.npix-3): # if not np.isnan(self.high_snr[i, j]): # self.high_snr[i, j] = self.low_snr[i, j] # # elif type(self.low_snr[i, j]) is ma.core.MaskedConstant: # # self.high_snr[i, j] = np.nansum(self.low_snr[i-2:i+3, j-2:j+3]) # # else: # # self.high_snr[i, j] = self.image[i, j] # else: # self.high_snr[i, j] = self.low_snr[i, j].copy() # for k in range(2): # for i in range(3, self.npix-3): # for j in range(3, self.npix-3): # if np.isnan(self.filt[i, j]): # select = self.image[i-2:i+3, j-2:j+3].copy() # select[2, 2] = np.nan # # if np.abs(self.image[i, j]) > 1.5*np.nanstd(select): # self.image[i, j] = np.nanmedian(select) # self.filt[i, j] = median_filter(self.image, 2) # self.image = self.high_snr.copy() self.im_scaled = np.zeros((self.npix, self.npix)) for i in range(self.npix): for j in range(self.npix): if self.radius[i, j] < r_max: if mask_planet is None or rr_grid[i, j] > mask_planet[2]: self.im_scaled[i, j] = self.radius[i, j] ** 2 * self.image[i, j] else: self.im_scaled[i, j] = mask_planet[3] * self.image[i, j] else: if mask_planet is None or rr_grid[i, j] > mask_planet[2]: self.im_scaled[i, j] = r_max**2 * self.image[i, j] else: self.im_scaled[i, j] = mask_planet[3] * self.image[i, j]
[docs] @beartype def total_intensity(self, pol_max: float = 1.0) -> None: """ Function for estimating the (stellar irradiation corrected) total intensity image when ``fitsfile`` contains a polarized light image and ``image_type='polarized'``. A bell-shaped degree of polarized is assumed and effects of multiple scattering are ignored. Parameters ---------- pol_max : float The peak of the bell-shaped degree of polarization, which effectively normalizes the estimated total intensity image. Returns ------- NoneType None """ if self.image_type != "polarized": raise ValueError( "The 'total_intensity' method should only be " "used if the input image is a polarized light " "image (i.e. image_type='polarized')." ) if self.scatter is None or self.im_scaled is None: raise ValueError("Please run 'map_disk' before using 'total_intensity'.") alpha = np.cos(self.scatter) deg_pol = -pol_max * (alpha**2 - 1.0) / (alpha**2 + 1.0) self.stokes_i = self.im_scaled / deg_pol
[docs] @beartype def phase_function(self, radius: typing.Tuple[float, float], n_phase: int): """ Function for extracting the phase function. If ``image_type='polarized'``, the polarized phase function is extracted and the total intensity phase function is estimated by assuming a bell-shaped degree of polarization. If ``image_type='polarized'``, the total intensity phase function is extracted. The extracting is done on the r$^2$-scaled pixel values such that the phase function is not biased by irradiation effects. The phase functions are have been normalized by their maximum value. Parameters ---------- radius : tuple(float, float) Inner and outer radius (au) between which pixels are selected for estimating the phase function. n_phase : int Number of sampling points for the phase function between 0 and 180 deg. Returns ------- NoneType None """ # Phase function is normalizedf so pol_max has not effect pol_max = 1.0 if self.radius is None or self.scatter is None or self.im_scaled is None: raise ValueError( "Please run 'map_disk' and 'r2_scaling' " "before using 'phase_function'." ) scat_select = [] im_select = [] for i in range(self.npix): for j in range(self.npix): if self.radius[i, j] > radius[0] and self.radius[i, j] < radius[1]: scat_select.append(math.degrees(self.scatter[i, j])) # use im_scaled to correct for differences across the # selected radius im_select.append(self.im_scaled[i, j]) phase_bins = np.linspace(0.0, 180.0, num=n_phase) bin_index = np.digitize(scat_select, phase_bins) im_bins = [] scat_bins = [] for i in range(n_phase): im_bins.append([]) scat_bins.append([]) for i, im_item in enumerate(im_select): im_bins[bin_index[i] - 1].append(im_item) scat_bins[bin_index[i] - 1].append(scat_select[i]) angle = [] pol_flux = [] pol_error = [] for i in range(n_phase): if len(im_bins[i]) > 0: angle.append(np.nanmean(scat_bins[i])) pol_flux.append(np.nanmean(im_bins[i])) pol_error.append(np.nanstd(im_bins[i]) / math.sqrt(len(im_bins[i]))) if self.image_type == "polarized": # Degree of polarization alpha = np.cos(np.array(angle) * np.pi / 180.0) deg_pol = -pol_max * (alpha * alpha - 1.0) / (alpha * alpha + 1.0) tot_flux = pol_flux / deg_pol tot_error = pol_error / deg_pol # Normalization flux_norm = max(pol_flux) pol_flux = np.array(pol_flux) / flux_norm pol_error = np.array(pol_error) / flux_norm flux_norm = max(tot_flux) tot_flux = np.array(tot_flux) / flux_norm tot_error = np.array(tot_error) / flux_norm self.phase = np.column_stack( [angle, pol_flux, pol_error, tot_flux, tot_error] ) else: self.phase = np.column_stack([angle, pol_flux, pol_error])
[docs] @beartype def write_output(self, filename: str) -> None: """ Function for writing the available results to FITS files. Parameters ---------- filename : str Filename start that is used for all the output file. Returns ------- NoneType None """ if self.radius is not None: fits.writeto(f"{filename}_radius.fits", self.radius, overwrite=True) if self.scatter is not None: fits.writeto( f"{filename}_scat_angle.fits", np.degrees(self.scatter), overwrite=True ) if self.im_deproj is not None: fits.writeto(f"{filename}_deprojected.fits", self.im_deproj, overwrite=True) if self.im_scaled is not None: fits.writeto(f"{filename}_r2_scaled.fits", self.im_scaled, overwrite=True) if self.stokes_i is not None: fits.writeto( f"{filename}_total_intensity.fits", self.stokes_i, overwrite=True ) if self.phase is not None: if self.image_type == "polarized": header = ( "Scattering angle (deg) - Normalized polarized " + "flux - Error - Normalized total flux - Error" ) else: header = "Scattering angle (deg) - Normalized total flux - Error" np.savetxt(f"{filename}_phase_function.dat", self.phase, header=header)