jess package

Subpackages

Submodules

jess.JESS_filters module

The repository for all my filters

class jess.JESS_filters.FilterMaskResult(dynamic_spectra: numpy.ndarray, mask: numpy.ndarray, percent_masked: numpy.float64)

Bases: tuple

dynamic_spectra - Dynamic Spectra with RFI filtered mask - Boolean mask percent_masked - The percent masked

dynamic_spectra: numpy.ndarray

Alias for field number 0

mask: numpy.ndarray

Alias for field number 1

percent_masked: numpy.float64

Alias for field number 2

class jess.JESS_filters.FilterResult(dynamic_spectra: numpy.ndarray, percent_masked: numpy.float64)

Bases: tuple

dynamic_spectra - Dynamic Spectra with RFI filtered percent_masked - The percent masked

dynamic_spectra: numpy.ndarray

Alias for field number 0

percent_masked: numpy.float64

Alias for field number 1

jess.JESS_filters.anderson_calculate_values(yr_file, window=64, time_median_kernel=0)

Run a Anderson Darling test on a Fits/Filterbank

Args:

yr_file: Your object

window: window size for the test

time_median_kernel: remove baseline by subtracting a running median

of time_median_kernel length long. Default is no subtraction

returns:

array of anderson darling values for each window.

jess.JESS_filters.autocorrelation_calculate_values(yr_file, window=64, time_median_kernel=0)

Calculate Autocorrelation for blocks of length window.

Args:

yr_file: The Your object for a FITS/fil file to process

window: window length in number of time samples

time_median_kernel: Detrend in time by subtracting a smoothed running

median, smoothed bt time_median_kernel

bandpass_kernel: bandpass kernel to smooth bandpass

Returns:

Autocorrelation values for each block in the file

Notes:

Not sure what the best test for autocorrelation.

https://arxiv.org/pdf/2108.12434.pdf uses the absolute magnitude of of a one sample delay.

The Durbin-Watson statistic also uses the first lag https://en.wikipedia.org/wiki/Durbin%E2%80%93Watson_statistic

I’ve tried some other tests,
  • the sum of the absolute values of all the correlations

  • (std - iqr) to look for outliers

  • the absolute value of the first lag

jess.JESS_filters.central_limit_masker(test_values: numpy.ndarray, window: int, sigma: float = 5, chans_per_subband: int = 512, remove_lower=True) numpy.ndarray

Uses the central limit theorem to look for outliers in each subband. When looking at a large amount of values, the central limit theorem says the values should start to be Guassian distributed. We can use this to flag outliers.

Using subbands we can take into account changes in sensitively across the band/cavity filters.

Args:

test_values: the values from a statistical test

window: the window size of the test

sigma: sigma at which to flag values

num_subbands: number of subbands

return:

bool array with outliers get set as true, this will be the same size as the data.

notes:

see “Spectral Kurtosis-Based RFI Mitigation for CHIME” https://arxiv.org/abs/1808.10365

and

“High cadence kurtosis based RFI excision for CHIME” https://open.library.ubc.ca/soa/cIRcle/collections/ubctheses/24/items/1.0394838?o=5

jess.JESS_filters.dagostino_calculate_values(yr_file, window=64, time_median_kernel=0)

Run a D’Agostino test on a Fits/Filterbank

Args:

yr_file: Your object

window: window size for the test

time_median_kernel: remove baseline by subtracting a running median

of time_median_kernel length long. Default is no subtraction

returns:

array of D’Agostino values for each window.

jess.JESS_filters.entropy_calculate_values(yr_file, window=64, time_median_kernel=0)

Calculate Shannon Entropy for blocks of length window.

Args:

yr_file: The Your object for a FITS/fil file to process

window: window length in number of time samples

time_median_kernel: Detrend in time by subtracting a smoothed running

median, smoothed bt time_median_kernel

bandpass_kernel: bandpass kernel to smooth bandpass

Returns:

Shannon values for each block in the file

jess.JESS_filters.fft_mad(dynamic_spectra: numpy.ndarray, chans_per_subband: int = 256, sigma: float = 3, time_median_size: int = 7, chans_per_fit: int = 50, fitter: object = <function poly_fitter>, bad_chans: typing.Optional[numpy.ndarray] = None, return_same_dtype: bool = True) numpy.ndarray

Takes the real FFT of the dynamic spectra along the time axis (a FFT for each channel). Then take the absolute value, this gives the magnitude of the power @ each frequency.

Then run the MAD filter along the freqency axis, this looks for outliers in the in the spectra. Narrow band RFI will only be in a few channels, add will be flagged.

This mask is then used to flag the complex FFT by setting the flagged points to zero. The first row is excluded because this is the powers for each channel. This could be zero, but it has so effect, and keeping it at its current value keeps the bandpass smooth.

The masked FFT is then inverse real fft back. Data is clip to min/max for the given input data type and returned as that data type.

Args:
dynamic_spectra: spectra block with time on the vertical axis,

and freq on the horizontal

chans_per_subband: number of frequency samples to calculate MAD

sigma: cutoff sigma

time_median_size: the length of the median filter to run in time

chans_per_fit: polynomial/spline knots per channel to fit the bandpass

fitter: which fitter to use, see jess.fitters for options

bad_chans: list of bad channels - these have all information

removed except for the power

return_same_dtype: return the same data type as given

return_mask: return the bool mask of flagged frequencies

Returns:

Dynamic Spectrum with narrow band perodic RFI removed.

(optional) bool mask of frequencies where bad=True

See:

jess.JESS_filters.filter_weights(dynamic_spectra: numpy.ndarray, metric: typing.Callable = <function median>, bandpass_smooth_length: int = 50, cut_sigma: float = 0.6666666666666666, smooth_sigma: int = 30) numpy.ndarray

Makes weights based on low values of some meteric. This is designed to ignore bandpass filters or tapers at the end of the bandpass.

Args:
dynamic_spectra - 2D dynamic spectra with time on the

vertical axis

metric - The statistic to sample.

bandpass_smooth_length - length of the median filter to

smooth the bandpass

sigma_cut - Cut values below (standard deviation)*(sigma cut)

smooth_sigma - Gaussian filter smoothing sigma. If =0, return

the mask where True=good channels

Returns:

Bandpass weights for sections of spectra with low values. 0 where the value is below the threshold and 1 elsewhere, with a Gaussian taper.

jess.JESS_filters.iterative_mad(dynamic_spectra: numpy.ndarray, factor: int, sigma: float = 4, time_median_size: int = 64, chans_per_subband: int = 256, flatten_to: int = 64, **filter_weight_args) numpy.ndarray

Interatively clean a chunk of dynamic spectra. If filter_weight_args is not None, masks channels that are unlickely to have signal. MAD and MAD FFT are run. Then the channels are decimated by factor and MAD is run again, decreasing chans_per_subband by factor. This is repeated until factor channels, when the mean is taken

Args:
dynamic_spectra: a dynamic spectra with time on the vertical axis,

and freq on the horizontal

factor - Factor to reduce each iteration

chans_per_subband: Number of channels to calculate the MAD

sigma: sigma which to reject outliers

flatten_to: the median of the output data

time_median_size: the length of the median filter to run in time

filter_weight_args: Passed to filter_weights, if None no

filter of channels.

Returns:

Cleaned Time Series

jess.JESS_filters.jarque_bera_calculate_values(yr_file, window=64, time_median_kernel=0)

Calculate the Jaque Bera statistic. Use robust ways to normalize the blocks.

Args:

yr_file: The Your object for a FITS/fil file to process

window: window length in number of time samples

time_median_kernel: Detrend in time by subtracting a smoothed running

median, smoothed bt time_median_kernel

bandpass_kernel: bandpass kernel to smooth bandpass

Returns:

Jarque Bera statistic

jess.JESS_filters.kurtosis_calculate_values(yr_file, window=64, time_median_kernel=0)

Calculate Kurtosis for blocks of length window.

Args:

yr_file: The Your object for a FITS/fil file to process

window: window length in number of time samples

time_median_kernel: Detrend in time by subtracting a smoothed running

median, smoothed bt time_median_kernel

bandpass_kernel: bandpass kernel to smooth bandpass

Returns:

Kurtosis

jess.JESS_filters.lilliefors_calculate_values(yr_file, window=64, time_median_kernel=0)

Calculate the Lilliefors statistic. Use robust ways to normalize the blocks.

Args:

yr_file: The Your object for a FITS/fil file to process

window: window length in number of time samples

time_median_kernel: Detrend in time by subtracting a smoothed running

median, smoothed bt time_median_kernel

bandpass_kernel: bandpass kernel to smooth bandpass

Returns:

Lilliefors statistic

jess.JESS_filters.mad_spectra(dynamic_spectra: numpy.ndarray, chans_per_subband: int = 256, sigma: float = 3, chans_per_fit: int = 50, fitter: object = <function poly_fitter>, return_same_dtype: bool = True) numpy.ndarray

Calculates Median Absolute Deviations along the spectral axis (i.e. for each time sample across all channels)

Args:

dynamic_spectra: dynamic spectra with time on the vertical axis, and freq on the horizontal

chans_per_subband: number of channels to calculate the MAD

sigma: cutoff sigma

chans_per_fit: polynomial/spline knots per channel to fit the bandpass

fitter: which fitter to use, see jess.fitters

return_same_dtype: return the same data type as given

return_mask: return the mask

Returns:

Dynamic Spectrum with values clipped

See:

https://github.com/rohinijoshi06/mad-filter-gpu

Notes:

mad_spectra_flat has better excision performance, you should consider that unless you need to keep the bandpass shape

jess.JESS_filters.mad_spectra_flat(dynamic_spectra: numpy.ndarray, chans_per_subband: int = 256, sigma: float = 3, flatten_to: int = 64, time_median_size: int = 0, return_same_dtype: bool = True, no_time_detrend: bool = False) numpy.ndarray

Calculates Median Absolute Deviations along the spectral axis (i.e. for each time sample across all channels). This flattens the data by subtracting the rolling median of median of time and frequencies. It then calculates the Median Absolute Deviation for every frame channels. Outliers are removed based on the assumption of Gaussian data. The dynamic spectra is then detrended again, masking the outliers. This process is then repeated again. The data is returned centerned around flatten_to with removed points set as flatten_to.

Args:
dynamic_spectra: a dynamic spectra with time on the vertical axis,

and freq on the horizontal

chans_per_subband: number of channels to calculate the MAD

sigma: sigma which to reject outliers

flatten_to: the median of the output data

time_median_size: the length of the median filter to run in time

return_same_dtype: return the same data type as given

no_time_detrend: Don’t deterend in time, useful fo low dm

where pulse>%50 of the channel

Returns:

Dynamic Spectrum with values clipped

See:

https://github.com/rohinijoshi06/mad-filter-gpu

Kendrick Smith’s talks about CHIME FRB

Note:

This has better performance than spectral_mad, you should probably use this one.

jess.JESS_filters.mad_time(gulp: numpy.ndarray, sigma: float = 6, frame: int = 128, return_values: bool = False) numpy.ndarray

Calculates the spectral Kurtosis along the time axis

Args:

gulp: the dynamic spectum to be analyzed

` sigma on to cut kurtosis values

frame: number of time samples to calculate the kurtosis

apply_mask: Apply the mask to the data, replacing bad values with zeros

Returns:

Mask based on bad iqr sections

optional: apply mask as replace with zeros

jess.JESS_filters.mad_time_cutter(gulp, frame=256, sigma=10)

Calculates Median Absolute Deviations along the time axis

Args:

frame: number of time samples to calculate the kurtosis

sigma: cutoff sigma

Returns:

Dynamic Spectrum with values clipped

jess.JESS_filters.run_filter(file: str, filter_name: str, window: int = 64, time_median_kernel: int = 0)

Runs filter on a file

jess.JESS_filters.sad_spectra(gulp, frame=128, window=65, sigma=3, clip=True)

Calculates Savgol Absolute Deviations along the spectral axis

Args:

frame: number of time samples to calculate the SAD

sigma: cutoff sigma

Returns:

Dynamic Spectrum with values clipped

jess.JESS_filters.sad_time(gulp, frame=128, window=65, sigma=3, clip=True)

Calculates Savgol Absolute Deviations along the time axis

Args:

frame: number of time samples to calculate the SAD

sigma: cutoff sigma

Returns:

Dynamic Spectrum with values clipped

jess.JESS_filters.skew_calculate_values(yr_file, window=64, time_median_kernel=0)

Calculate Skew for blocks of length window.

Args:

yr_file: The Your object for a FITS/fil file to process

window: window length in number of time samples

time_median_kernel: Detrend in time by subtracting a smoothed running

median, smoothed bt time_median_kernel

bandpass_kernel: bandpass kernel to smooth bandpass

Returns:

Skew values for each block in the file

jess.JESS_filters.std_iqr_calculate_values(yr_file: object, window: int = 64, time_median_kernel: int = 0, bandpass_kernel: int = 7) numpy.ndarray

Take the difference between Standard Deviation and Inter Quatile Range (scaled to give STD) This is the difference between a robust measure and non-robust measure of scale. For Gaussian data then should agree, with outliers, this will differ.

Args:

yr_file: The Your object for a FITS/fil file to process

window: window length in number of time samples

time_median_kernel: Detrend in time by subtracting a smoothed running

median, smoothed bt time_median_kernel

bandpass_kernel: bandpass kernel to smooth bandpass

Returns:

Distances for each block

Notes:

Kevin’s idea

jess.JESS_filters.sum_threasthold_aprls(dynamic_spectra: numpy.ndarray) numpy.ndarray

Computes a mask to cover the RFI in a data set based on ArPLS-ST.

Args:

dynamic_spectra - Array containing the signal and RFI

eta_i - List of sensitivities

Returns:

2D mask where True = RFI

Note:

From http://zmtt.bao.ac.cn/GPPS/RFI/ Cite: https://ui.adsabs.harvard.edu/abs/2021MNRAS.500.2969Z

jess.JESS_filters.sum_threshold(dynamic_spectra: numpy.ndarray, mask: Optional[numpy.ndarray] = None, eta_i: Union[List[float], Tuple[float]] = (0.5, 0.55, 0.62, 0.75, 1), chi_1: float = 35000, normalize_standing_waves: bool = True, suppress_dilation: bool = False, sm_kwargs: Optional[Dict] = None, di_kwargs: Optional[Dict] = None, max_pixels: int = 8) numpy.ndarray

Computes a mask to cover the RFI in a data set.

Args:

dynamic_spectra - Array containing the signal and RFI

chi_1 - First threshold

eta_i - List of sensitivities

max_pixels - Controls the max iteration and chi_1

KWArgs:

normalize_standing_waves - Whether to normalize standing waves

suppress_dilation - If true, mask dilation is suppressed

plotting - True if statistics plot should be displayed

sm_kwargs - Smoothing key words

di_kwargs - dilation key words

Returns:

mask - the mask covering the identified RFI

Note:

From https://cosmo-gitlab.phys.ethz.ch/cosmo_public/seek/-/blob/master/seek/mitigation/sum_threshold.py Cite https://arxiv.org/abs/1607.07443

jess.JESS_filters.zero_dm(dynamic_spectra: numpy.ndarray, bandpass: typing.Optional[numpy.ndarray] = None, return_same_dtype: bool = True, intermediate_dtype: type = <class 'numpy.float32'>) jess.JESS_filters.FilterResult

Mask-safe zero-dm subtraction

args:
dynamic_spectra: The data you want to zero-dm, expects times samples

on the vertical axis. Accepts numpy.ma.arrays.

bandpass - Use if a large file is broken up into pieces.

Be careful about how you use this with masks.

intermediate_dtype - The data type to do the calculations

return_same_dtype: return the same data type as given

returns:

dynamic spectra with a (more) uniform zero time series

note:

This should masked values. I am mainly conserned with bad data being spread out ny the filter, and this ignores masked values when calculating time series and bandpass

example:

yr = Your(“some.fil”) dynamic_spectra = yr.get_data(744000, 2 ** 14)

mask = np.zeros(yr.your_header.nchans, dtype=bool) mask[0:100] = True # mask the first hundred channels

dynamic_spectra = np.ma.array(dynamic_spectra,

mask=np.broadcast_to(dynamic_spectra.shape))

cleaned = zero_dm(dynamic_spectra)

from:

“An interference removal technique for radio pulsar searches” R.P Eatough 2009

see:

https://github.com/SixByNine/sigproc/blob/28ba4f4539d41a8722c6ed194fa66e87bf4610fc/src/zerodm.c#L195

https://sourceforge.net/p/heimdall-astro/code/ci/master/tree/Pipeline/clean_filterbank_rfi.cu

https://github.com/scottransom/presto/blob/de2cf58262190d35fb37dbebf8308a6e29d72adf/src/zerodm.c

https://github.com/thepetabyteproject/your/blob/1f4b39326835e6bb87e0003318b433dc1455a137/your/writer.py#L232

https://sigpyproc3.readthedocs.io/en/latest/_modules/sigpyproc/Filterbank.html#Filterbank.removeZeroDM

jess.JESS_filters.zero_dm_fft(dynamic_spectra: numpy.ndarray, bandpass: Optional[numpy.ndarray] = None, modes_to_zero: int = 2, return_same_dtype: bool = True) jess.JESS_filters.FilterResult

This removes low frequency components from each spectra. This extends 0-DM subtraction. 0-DM subtraction as described in Eatough 2009, involves subtraction of the mean of each spectra from each spectra, makeing the zero-DM time series contant. This is effective in removing broadband RFI that has no structure.

This is very effective for moderate bandwidths and low dynamic ranges. As bandwidths increase, we can see zero-DM RFI that only extends through part of the band. Increases in dynamic range allow for zero-DM RFI to have spectral structure, either intrinsically or the result of the receiving chain.

This attempts to corret for these problems with the subtraction method. This removes the zero Fourier term (the total power), equivalent to the subtraction method. It also can remove higher order terms, removing slowing signals across the band.

You need to be careful about how many terms you remove. We will start to to remove more components of the pulse. When this happens is determined by the fraction of the band that contains the pulse. The larger the pulse, the lower the Fourier components.

args:
dynamic_spectra - The dynamic spectra you want to clean. Time axis

must be verticals

bandpass - Bandpass to add. We subtract off the DC component, we

must add it back to safely write the data as unsigned ints if no bandpass is given, this will use the bandpass from the dynamic spectra given, this can cause jumps if you are processing multiple chunks.

modes_to_zero - The number of modes to filter, starting at the lowest mode

return_same_dtype: return the same data type as given

returns:

dynamic spectra with low frequency modes filtered, same data type as given

notes:

See jess.filters.zero_dm Docstring for other implementations of subtraction 0-dm filters.

jess.JESS_filters_cupy module

This contains cupy versions of some of JESS_filters

class jess.JESS_filters_cupy.FilterMaskResult(dynamic_spectra: numpy.ndarray, mask: cupy._core.core.ndarray, percent_masked: numpy.float64)

Bases: tuple

dynamic_spectra - Dynamic Spectra with RFI filtered mask - Boolean mask percent_masked - The percent masked

dynamic_spectra: numpy.ndarray

Alias for field number 0

mask: cupy._core.core.ndarray

Alias for field number 1

percent_masked: numpy.float64

Alias for field number 2

class jess.JESS_filters_cupy.FilterResult(dynamic_spectra: cupy._core.core.ndarray, percent_masked: numpy.float64)

Bases: tuple

dynamic_spectra - Dynamic Spectra with RFI filtered percent_masked - The percent masked

dynamic_spectra: cupy._core.core.ndarray

Alias for field number 0

percent_masked: numpy.float64

Alias for field number 1

jess.JESS_filters_cupy.fft_mad(dynamic_spectra: cupy._core.core.ndarray, chans_per_subband: int = 256, sigma: float = 3, time_median_size: int = 7, chans_per_fit: int = 50, fitter: typing.Callable = <function poly_fitter>, bad_chans: typing.Optional[numpy.ndarray] = None, return_same_dtype: bool = True) cupy._core.core.ndarray

Takes the real FFT of the dynamic spectra along the time axis (a FFT for each channel). Then take the absolute value, this gives the magnitude of the power @ each frequency.

Then run the MAD filter along the freqency axis, this looks for outliers in the in the spectra. Narrow band RFI will only be in a few channels, add will be flagged.

This mask is then used to flag the complex FFT by setting the flagged points to zero. The first row is excluded because this is the powers for each channel. This could be zero, but it has so effect, and keeping it at its current value keeps the bandpass smooth.

The masked FFT is then inverse real fft back. Data is clip to min/max for the given input data type and returned as that data type.

Args:
dynamic_spectra: spectra block with time on the vertical axis,

and freq on the horizontal

chans_per_subband: number of frequency samples to calculate MAD

time_median_size: the length of the median filter to run in time

sigma: cutoff sigma

chans_per_fit: polynomial/spline knots per channel to fit the bandpass

fitter: which fitter to use, see jess.fitters for options

bad_chans: list of bad channels - these have all information

removed except for the power

return_same_dtype: return the same data type as given

return_mask: return the bool mask of flagged frequencies

Returns:

Dynamic Spectrum with narrow band perodic RFI removed.

(optional) bool mask of frequencies where bad=True

See:

Note:

This provides a 1% difference in masks from the CPU version. This results in a 0.1% higher standard deviation of the zero dm time series. This seems negligible, this version provides 2x speed up on a GTX 1030 over 24 threads of X5675.

jess.JESS_filters_cupy.filter_weights(dynamic_spectra: numpy.ndarray, metric: typing.Callable = <function median>, bandpass_smooth_length: int = 50, cut_sigma: float = 0.6666666666666666, smooth_sigma: int = 30) numpy.ndarray

Makes weights based on low values of some meteric. This is designed to ignore bandpass filters or tapers at the end of the bandpass.

Args:
dynamic_spectra - 2D dynamic spectra with time on the

vertical axis

metric - The statistic to sample.

bandpass_smooth_length - length of the median filter to

smooth the bandpass

sigma_cut - Cut values below (standard deviation)*(sigma cut)

smooth_sigma - Gaussian filter smoothing sigma. If =0, return

the mask where True=good channels

Returns:

Bandpass weights for sections of spectra with low values. 0 where the value is below the threshold and 1 elsewhere, with a Gaussian taper.

jess.JESS_filters_cupy.iterative_mad(dynamic_spectra: numpy.ndarray, factor: int, sigma: float = 4, time_median_size: int = 64, chans_per_subband: int = 256, flatten_to: int = 64, **filter_weight_args) jess.JESS_filters_cupy.FilterResult

Interatively clean a chunk of dynamic spectra. If filter_weight_args is not None, masks channels that are unlikely to have signal. MAD and MAD FFT are run. Then the channels are decimated by factor and MAD is run again, decreasing chans_per_subband by factor. This is repeated until factor channels, when the mean is taken

Args:
dynamic_spectra: a dynamic spectra with time on the vertical axis,

and freq on the horizontal

factor - Factor to reduce each iteration

chans_per_subband: Number of channels to calculate the MAD

sigma: sigma which to reject outliers

flatten_to: the median of the output data

time_median_size: the length of the median filter to run in time

filter_weight_args: Passed to filter_weights, if None no

filter of channels.

Returns:

Cleaned Time Series

jess.JESS_filters_cupy.mad_spectra(dynamic_spectra: cupy._core.core.ndarray, chans_per_subband: int = 256, sigma: float = 3, chans_per_fit: int = 50, return_same_dtype: bool = True) cupy._core.core.ndarray

Calculates Median Absolute Deviations along the spectral axis (i.e. for each time sample across all channels)

Args:
dynamic_spectra: spectra with time on the vertical axis,

and freq on the horizontal

frame: number of frequency samples to calculate the MAD

sigma: cutoff sigma

poly_order: polynomial order to fit for the bandpass

return_same_dtype: return the same data type as given

Returns:

Dynamic Spectrum with values clipped

Notes:

This version differs from the cpu version. This uses nans, while cpu version uses np.ma mask, excision performance is about the same. See the cpu docstring for references.

mask = True, for good values

You should use spectral_mad_flat (which has better RFI removal) unless you really need to preserve the bandpass.

jess.JESS_filters_cupy.mad_spectra_flat(dynamic_spectra: cupy._core.core.ndarray, chans_per_subband: int = 256, sigma: float = 3, flatten_to: int = 64, time_median_size: int = 0, return_same_dtype: bool = True, no_time_detrend: bool = False) cupy._core.core.ndarray

Calculates Median Absolute Deviations along the spectral axis (i.e. for each time sample across all channels). This flattens the data by subtracting the rolling median of median of time and frequencies. It then calculates the Median Absolute Deviation for every frame channels. Outliers are removed based on the assumption of Gaussian data. The dynamic spectra is then detrended again, masking the outliers. This process is then repeated again. The data is returned centerned around flatten_to with removed points set as flatten_to.

Args:
dynamic_spectra: a dynamic spectra with time on the vertical axis,

and freq on the horizontal

chans_per_subband: number of channels to calculate the MAD

sigma: sigma which to reject outliers

flatten_to: the median of the output data

time_median_size: the length of the median filter to run in time

return_mask: return the mask where True=masked_values

return_same_dtype: return the same data type as given

no_time_detrend: Don’t deterend in time, useful fo low dm

where pulse>%50 of the channel

Returns:

Dynamic Spectrum with values clipped

See:

https://github.com/rohinijoshi06/mad-filter-gpu

Kendrick Smith’s talks about CHIME FRB

Note:

This has better performance than spectral_mad, you should probably use this one.

jess.JESS_filters_cupy.zero_dm(dynamic_spectra: cupy._core.core.ndarray, bandpass: typing.Optional[cupy._core.core.ndarray] = None, return_same_dtype: bool = True, intermediate_dtype: type = <class 'numpy.float32'>) jess.JESS_filters_cupy.FilterResult

Mask-safe zero-dm subtraction

args:
dynamic_spectra: The data you want to zero-dm, expects times samples

on the vertical axis. Accepts numpy.ma.arrays.

bandpass - Use if a large file is broken up into pieces.

Be careful about how you use this with masks.

intermediate_dtype - The data type to do the calculations

return_same_dtype: return the same data type as given

returns:

dynamic spectra with a (more) uniform zero time series

note:

This should masked values. I am mainly conserned with bad data being spread out ny the filter, and this ignores masked values when calculating time series and bandpass

The CPU version of this uses np.ma, this isn’t available form cupy, so I don’t use it here. This doesn’t seem when writing this.

example:

yr = Your(“some.fil”) dynamic_spectra = yr.get_data(744000, 2 ** 14)

mask = np.zeros(yr.your_header.nchans, dtype=bool) mask[0:100] = True # mask the first hundred channels

dynamic_spectra = np.ma.array(dynamic_spectra,

mask=np.broadcast_to(dynamic_spectra.shape))

cleaned = zero_dm(dynamic_spectra)

from:

“An interference removal technique for radio pulsar searches” R.P Eatough 2009

see:

https://github.com/SixByNine/sigproc/blob/28ba4f4539d41a8722c6ed194fa66e87bf4610fc/src/zerodm.c#L195

https://sourceforge.net/p/heimdall-astro/code/ci/master/tree/Pipeline/clean_filterbank_rfi.cu

https://github.com/scottransom/presto/blob/de2cf58262190d35fb37dbebf8308a6e29d72adf/src/zerodm.c

https://github.com/thepetabyteproject/your/blob/1f4b39326835e6bb87e0003318b433dc1455a137/your/writer.py#L232

https://sigpyproc3.readthedocs.io/en/latest/_modules/sigpyproc/Filterbank.html#Filterbank.removeZeroDM

jess.JESS_filters_cupy.zero_dm_fft(dynamic_spectra: cupy._core.core.ndarray, bandpass: Optional[cupy._core.core.ndarray] = None, modes_to_zero: int = 2, return_same_dtype: bool = True) jess.JESS_filters_cupy.FilterResult

This removes low frequency components from each spectra. This extends 0-DM subtraction. 0-DM subtraction as described in Eatough 2009, involves subtraction of the mean of each spectra from each spectra, makeing the zero-DM time series contant. This is effective in removing broadband RFI that has no structure.

This is very effective for moderate bandwidths and low dynamic ranges. As bandwidths increase, we can see zero-DM RFI that only extends through part of the band. Increases in dynamic range allow for zero-DM RFI to have spectral structure, either intrinsically or the result of the receiving chain.

This attempts to corret for these problems with the subtraction method. This removes the zero Fourier term (the total power), equivalent to the subtraction method. It also can remove higher order terms, removing slowing signals across the band.

You need to be careful about how many terms you remove. We will start to to remove more components of the pulse. When this happens is determined by the fraction of the band that contains the pulse. The larger the pulse, the lower the Fourier components.

args:
dynamic_spectra - The dynamic spectra you want to clean. Time axis

must be vertical

bandpass - Bandpass to add. We subtract off the DC component, we

must add it back to safely write the data as unsigned ints if no bandpass is given, this will use the bandpass from the dynamic spectra given, this can cause jumps if you are processing multiple chunks.

modes_to_zero - The number of modes to filter.

return_same_dtype: return the same data type as given

returns:

dynamic spectra with low frequency modes filtered, same data type as given

notes:

See jess.filters.zero_dm Docstring for other implementations of subtraction 0-dm filters.

jess.JESS_filters_generic module

Generic (cupy/numpy) filters.

This is a test of writing generic backend filters. It relies on the user giving the correct ndarray.

If this is successful, we should merge JESS_fiters and JESS_fiters_cupy into here.

class jess.JESS_filters_generic.FilterMaskResult(dynamic_spectra: cupy._core.core.ndarray, mask: cupy._core.core.ndarray, percent_masked: numpy.float64)

Bases: tuple

dynamic_spectra - Dynamic Spectra with RFI filtered mask - Boolean mask percent_masked - The percent masked

dynamic_spectra: cupy._core.core.ndarray

Alias for field number 0

mask: cupy._core.core.ndarray

Alias for field number 1

percent_masked: numpy.float64

Alias for field number 2

jess.JESS_filters_generic.jarque_bera(dynamic_spectra: cupy._core.core.ndarray, samples_per_block: int = 4096, sigma: float = 4, detrend: typing.Optional[typing.Tuple] = (<function median_fitter>, 40), winsorize_args: typing.Optional[typing.Tuple] = (5, 40), nan_policy: typing.Optional[str] = None) cupy._core.core.ndarray

Jarque-Bera Gaussianity test, this uses a combination of Kurtosis and Skew. We calculate Jarque-Bera along the time axis in blocks of samples_per_block. This is balanced if the number of samples is not evenly divisible. The Jarque-Bera statstic is Chi-Squared distributed with two degrees of freedom. This makes our Gaussian outlier flagging remove more data than expected. To combate this we take the squareroot of the Jarque Statistic, this makes the distrabution more Gaussian and the flagging work better.

Args:

dynamic_spectra - Section spectra time on the vertical axis

samples_per_block - Time samples for each channel block

detrend - Detrend Kurtosis and Skew values (fitter, chans_per_fit).

If None, no detrend

winsorize_args - Winsorize the second moments. See scipy_cupy.stats.winsorize

If None, no winorization.

nan_policy - How to propagate nans. If None, does not check for nans.

Returns:

bool Mask with True=bad data

jess.JESS_filters_generic.kurtosis_and_skew(dynamic_spectra: cupy._core.core.ndarray, samples_per_block: int = 4096, sigma: float = 4, detrend: typing.Optional[typing.Tuple] = (<function median_fitter>, 40), winsorize_args: typing.Optional[typing.Tuple] = (5, 40), nan_policy: typing.Optional[str] = None) cupy._core.core.ndarray

Gaussainity test using Kurtosis and Skew. We calculate Kurtosis and skew along the time axis in blocks of samples_per_block. This is balanced if the number of samples is not evenly divisible. We then use the central limit theorem to flag outlying samples in Kurtosis and Skew individually. These masks are then added together.

Args:

dynamic_spectra - Section spectra time on the vertical axis

samples_per_block - Time samples for each channel block

detrend - Detrend Kurtosis and Skew values (fitter, chans_per_fit).

If None, no detrend

winsorize_args - Winsorize the second moments. See scipy_cupy.stats.winsorize

If None, no winorization.

nan_policy - How to propagate nans. If None, does not check for nans.

Returns:

bool Mask with True=bad data

Notes:

Flagging based on https://www.worldscientific.com/doi/10.1142/S225117171940004X

jess.JESS_filters_generic.mad_spectra_flat(dynamic_spectra: cupy._core.core.ndarray, chans_per_subband: int = 256, sigma: float = 4, flatten_to: int = 64, time_median_size: int = 0, mask_chans: bool = False, return_same_dtype: bool = True, no_time_detrend: bool = False) cupy._core.core.ndarray

Calculates Median Absolute Deviations along the spectral axis (i.e. for each time sample across all channels). This flattens the data by subtracting the rolling median of median of time and frequencies. It then calculates the Median Absolute Deviation for every frame channels. Outliers are removed based on the assumption of Gaussian data. The dynamic spectra is then detrended again, masking the outliers. This process is then repeated again. The data is returned centerned around flatten_to with removed points set as flatten_to.

Args:
dynamic_spectra: a dynamic spectra with time on the vertical axis,

and freq on the horizontal

chans_per_subband: number of channels to calculate the MAD

sigma: sigma which to reject outliers

flatten_to: the median of the output data

time_median_size: the length of the median filter to run in time

mask_chan - Mask bad channels based on differing mean and median

seems to be a good test for non-linearity

return_same_dtype: return the same data type as given

no_time_detrend: Don’t deterend in time, useful fo low dm

where pulse>%50 of the channel

Returns:

Dynamic Spectrum with values clipped

See:

https://github.com/rohinijoshi06/mad-filter-gpu

Kendrick Smith’s talks about CHIME FRB

Note:

This has better performance than spectral_mad, you should probably use this one.

jess.calculators module

The repository for all calculators

jess.calculators.accumulate(data_array: numpy.ndarray, factor: int, axis: int) numpy.ndarray

Reduce the data along an axis by taking the mean of a ‘factor’ of rows along the axis

args:

data_array: array of data to be reduces

factor: the factor to reduce the dimension by

axis: axis to operate on

returns:

array with axis reduced by factor

jess.calculators.autocorrelate(data: numpy.ndarray, axis: int = - 1) numpy.ndarray

Auto correlation along an axis

Args:

data: data to find the autocorrelaton

axis: axis to find the autocorrlation, -1, 0, 1 available

Returns:

Auto correlation along an axis

Notes:

Uses mean to flatten, if complex structure, should do a better detrend

jess.calculators.balance_chans_per_subband(num_chans: int, chans_per_subband: int) Tuple[int, numpy.ndarray]

Balance chan_per_subband when they are not evenly dividable

Args:

num_chans: total number of channels

num_subbands: number of subbands

Return:

[number of sections, array with start and stops of each of the subband]

jess.calculators.closest_larger_factor(num: int, factor: int) int

Find the closest factor that is larger than a number.

args:

num: The number of to find the largest factor

factor: Factor to divide by

returns:

Closest factor of factor larger than num

jess.calculators.decimate(dynamic_spectra: numpy.ndarray, time_factor: typing.Optional[int] = None, freq_factor: typing.Optional[int] = None, backend: typing.Callable = <function decimate>) numpy.ndarray

Makes decimates along either/both time and frequency axes. Flattens data along frequency before freqency decimation. Fattens again in frequency before returning.

args:

dynamic_spectra: dynamic spectra with time on the ventricle axis

time_factor: factor to reduce time sampling by

freq_factor: factor to reduce freqency channels

backend: backend to use to reduce the dimension, default is

signal.decimate, consider using jess.calculator.mean

returns:

Flattened in frequency dynamic spectra, reduced in time and/or freqency

jess.calculators.divide_range(length: int, num_sections: int) numpy.ndarray

Divide range as evenly as possible.

Args:

length: length of array

num_sections: number of sections to divide array

Return:

array with start and stops of each of the subsections

Note:

Adapted from numpy.lib.shape_base.array_split and subject to the numpy license

jess.calculators.flattner_median(dynamic_spectra: numpy.ndarray, flatten_to: int = 64, kernel_size: int = 0, return_same_dtype: bool = False, return_time_series: bool = False, intermediate_dtype: type = <class 'numpy.float32'>) Union[numpy.ndarray, Tuple]

This flattens the dynamic spectra by subtracting the medians of the time series and then the medians of the of bandpass. Then add flatten_to to all the pixels so that the data can be keep as the same data type.

args:

dynamic_spectra: The dynamic spectra you want to flatten

flatten_to: The number to set as the baseline

kernel_size: The size of the median filter to run over the medians

0,1 => nothing is applied

intermediate_dtype: the data type of the intermediate calculation

return_same_dtype: return the same data type as dynamic_spectra

return_time_series: return the time series difference from flatten_to

median_time_series - flatten_to

returns:

Dynamic spectra flattened in frequency and time (optional) time series median

jess.calculators.flattner_mix(dynamic_spectra: numpy.ndarray, flatten_to: int = 64, kernel_size: int = 0, return_same_dtype: bool = False, return_time_series: bool = False, intermediate_dtype: type = <class 'numpy.float32'>) Union[numpy.ndarray, Tuple]

This flattens the dynamic spectra by subtracting the medians of the time series and then the medians of the of bandpass. Then add flatten_to to all the pixels so that the data can be keep as the same data type.

This uses medians subtraction on the time series. This is less agressive and leaved the mean subtraction for the zero-dm.

Mean subtraction across the spectrum allows for smoother transition between blocks.

args:

dynamic_spectra: The dynamic spectra you want to flatten

flatten_to: The number to set as the baseline

0,1 => nothing is applied

kernel_size: The size of the median filter to run over the medians

return_same_dtype: return the same data type as given, else it will

at least np.int64

return_time_series: return the time series median differences from flatten_to

median_time_series - flatten_to

returns:

Dynamic spectra flattened in frequency and time (optional) time series medians

jess.calculators.guassian_noise_adder(standard_deviations: numpy.ndarray) float

Add Gaussian noise from multiple channels to estimate the time series

Args:

standard_deviation: Standard Deviations along the bandpass

Returns:

time series standard deviation

Notes:

https://en.wikipedia.org/wiki/Sum_of_normally_distributed_random_variables#Independent_random_variables

Variances add at the sum of squares, we are interested in Standard deviation, so talk the square root.

We are taking the mean, do divide by the number of channels (=number of standard deviations)

jess.calculators.highpass_window(window_length: int) numpy.ndarray

Calculates the coefficients to multiply the Fourier components to make a highpass filter.

Args:

window_length: the length of the half window

Returns:

Half of an inverted blackman window, will bw window_length long

jess.calculators.mean(data_array: numpy.ndarray, factor: int, axis: int, pad: str = 'median') numpy.ndarray

Reduce the data along an axis by taking the mean of a ‘factor’ of rows along the axis

args:

data_array: array of data to be reduces

factor: the factor to reduce the dimension by

axis: axis to operate on

pad: method to pad if axis is not divisible. If None

will not pad

returns:

array with axis reduced by factor

jess.calculators.median_abs_deviation_med(x: numpy.ndarray, axis: int = 0, center: object = <function median>, scale: float = 1.0, nan_policy: str = 'propagate')

Compute the median absolute deviation of the data along the given axis. The median absolute deviation (MAD, [1]_) computes the median over the absolute deviations from the median. It is a measure of dispersion similar to the standard deviation but more robust to outliers 2. The MAD of an empty array is np.nan. .. versionadded:: 1.5.0 Parameters ———- x : array_like

Input array or object that can be converted to an array.

axisint or None, optional

Axis along which the range is computed. Default is 0. If None, compute the MAD over the entire array.

centercallable, optional

A function that will return the central value. The default is to use np.median. Any user defined function used will need to have the function signature func(arr, axis).

scalescalar or str, optional

The numerical value of scale will be divided out of the final result. The default is 1.0. The string “normal” is also accepted, and results in scale being the inverse of the standard normal quantile function at 0.75, which is approximately 0.67449. Array-like scale is also allowed, as long as it broadcasts correctly to the output such that out / scale is a valid operation. The output dimensions depend on the input array, x, and the axis argument.

nan_policy{‘propagate’, ‘raise’, ‘omit’}, optional

Defines how to handle when input contains nan. The following options are available (default is ‘propagate’): * ‘propagate’: returns nan * ‘raise’: throws an error * ‘omit’: performs the calculations ignoring nan values

madscalar or ndarray

If axis=None, a scalar is returned. If the input contains integers or floats of smaller precision than np.float64, then the output data-type is np.float64. Otherwise, the output data-type is the same as that of the input.

center : The centers of each array See Also ——– numpy.std, numpy.var, numpy.median, scipy.stats.iqr, scipy.stats.tmean, scipy.stats.tstd, scipy.stats.tvar Notes —– Modifed from scipy.stats.median_abs_devation

The center argument only affects the calculation of the central value around which the MAD is calculated. That is, passing in center=np.mean will calculate the MAD around the mean - it will not calculate the mean absolute deviation. The input array may contain inf, but if center returns inf, the corresponding MAD for that data will be nan. References ———- .. [1] “Median absolute deviation”,

2

“Robust measures of scale”, https://en.wikipedia.org/wiki/Robust_measures_of_scale

When comparing the behavior of median_abs_deviation with np.std, the latter is affected when we change a single value of an array to have an outlier value while the MAD hardly changes: >>> from scipy import stats >>> x = stats.norm.rvs(size=100, scale=1, random_state=123456) >>> x.std() 0.9973906394005013 >>> stats.median_abs_deviation(x) 0.82832610097857 >>> x[0] = 345.6 >>> x.std() 34.42304872314415 >>> stats.median_abs_deviation(x) 0.8323442311590675 Axis handling example: >>> x = np.array([[10, 7, 4], [3, 2, 1]]) >>> x array([[10, 7, 4],

[ 3, 2, 1]])

>>> stats.median_abs_deviation(x)
array([3.5, 2.5, 1.5])
>>> stats.median_abs_deviation(x, axis=None)
2.0
Scale normal example:
>>> x = stats.norm.rvs(size=1000000, scale=2, random_state=123456)
>>> stats.median_abs_deviation(x)
1.3487398527041636
>>> stats.median_abs_deviation(x, scale='normal')
1.9996446978061115
jess.calculators.noise_calculator(yr_obj: your.your.Your, num_samples: int = 1000, len_block: int = 128, detrend: bool = True, kernel_size: int = 17) Tuple[numpy.ndarray, numpy.float64]

Calculate the ideal noise of a file

  1. Get num_samples random start samples of len_block

  2. Optionally detrend the data in time to remove power level trends

  3. Use a robust measure to estimate the noise in each channel Here we use IQR that is then scared

  4. Optionally use a Median filter to filter channels that are contaminated at at the 25th to 75th level

  5. Add up these standard deviations to find the time series standard deviations

Args:

yr_object: your object for the file to process

num_samples: number of samples to take

detrend: detrend along the time axis before finding the dispersion

kernel_size: the kernel size for the median filter. If 0 or 1

no filter is applied

Returns:

Ideal standard deviation of time series, Real standard deviation of the time series

Notes:

detrend would be useful if there are power level changes in the data. This will cause the Standard deviation to be to high. This will remove random trends as well, so this could lead to underestimating the noise level

IQR and MAD do a reasonable job at estimating the dispersion of a channel as long as the channel is not filled with RFI. If there is RFI power at all the percentiles this will cause the IQR to be too high, so we use a median filter to remove these channels. By using smallish blocks, we hope to avoid trends that can’t be detrened linearly

By calculating all the channels independently, then adding the standard deviations we should avoid correlated (zero dm) noise that shows up across off the channels

Calculate the standard deviation of the zero dm time series by collapsing all the channels for each random block. Then take the median of all these standard deviations.

jess.calculators.pad_along_axis(array: numpy.ndarray, new_length: int, axis: int = 0, mode: str = 'median', location: str = 'middle')

Pad along an axis.

args:

array: Array to pad

new_length: New length of the axis

axis: Axis to be padded

mode: mode to pad, see numpy.pad

location: Location of the pad. Options are

[end, start, middle]

return:

Array padded along axis

Based on https://stackoverflow.com/a/49766444

jess.calculators.preprocess(data: numpy.ndarray, central_value_calc: str = 'mean', disperion_calc: str = 'std') Tuple[numpy.ndarray, numpy.ndarray]

Preprocess the array for later statistical tests

Args:

data: 2D dynamic spectra to process

central_value_calc: The method to calculate the central value,

dispersion_calc: The method to calculate the dispersion

Returns:

data preprocessed along axis=0, axis=1

jess.calculators.shannon_entropy(data: numpy.ndarray, axis: int = 0) numpy.ndarray

Calculates the Shannon Entropy along a given axis.

Return entropy in natural units.

Args:

data: 2D Array to calculate entropy

axis: axis to calculate entropy

Returns:

Shannon Entropy along an axis

jess.calculators.to_dtype(data: numpy.ndarray, dtype: numpy.dtype) numpy.ndarray

Takes a chunk of data and changes it to a given data type.

Args:

data: Array that you want to convert

dtype: The output data type

Returns:

data converted to dtype

jess.calculators_cupy module

The repository for all calculators

jess.calculators_cupy.closest_larger_factor(num: int, factor: int) int

Find the closest factor that is larger than a number.

args:

num: The number of to find the largest factor

factor: Factor to divide by

returns:

Closest factor of factor larger than num

jess.calculators_cupy.decimate(dynamic_spectra: cupy._core.core.ndarray, time_factor: typing.Optional[int] = None, freq_factor: typing.Optional[int] = None, backend: typing.Callable = <function mean>) cupy._core.core.ndarray

Makes decimates along either/both time and frequency axes. Flattens data along frequency before freqency decimation. Fattens again in frequency before returning.

args:

dynamic_spectra: dynamic spectra with time on the ventricle axis

time_factor: factor to reduce time sampling by

freq_factor: factor to reduce freqency channels

returns:

Flattened in frequency dynamic spectra, reduced in time and/or freqency

notes:

Always uses jess.calculator_cupy.mean to add data

jess.calculators_cupy.flattner_median(dynamic_spectra: cupy._core.core.ndarray, flatten_to: int = 64, kernel_size: int = 0, return_same_dtype: bool = False, return_time_series: bool = False, intermediate_dtype: type = <class 'numpy.float32'>) Union[cupy._core.core.ndarray, Tuple]

This flattens the dynamic spectra by subtracting the medians of the time series and then the medians of the of bandpass. Then add flatten_to to all the pixels so that the data can be keep as the same data type.

args:

dynamic_spectra: The dynamic spectra you want to flatten

flatten_to: The number to set as the baseline

kernel_size: The size of the median filter to run over the medians

0,1 => nothing is applied

intermediate_dtype: the data type of the intermediate calculation

return_same_dtype: return the same data type as dynamic_spectra

return_time_series: return the time series difference from flatten_to

median_time_series - flatten_to

returns:

Dynamic spectra flattened in frequency and time (optional) time series medians

jess.calculators_cupy.flattner_mix(dynamic_spectra: cupy._core.core.ndarray, flatten_to: int = 64, kernel_size: int = 0, return_same_dtype: bool = False, return_time_series: bool = False, intermediate_dtype: type = <class 'numpy.float32'>) Union[cupy._core.core.ndarray, Tuple]

This flattens the dynamic spectra by subtracting the medians of the time series and then the medians of the of bandpass. Then add flatten_to to all the pixels so that the data can be keep as the same data type.

This uses medians subtraction on the time series. This is less agressive and leaved the mean subtraction for the zero-dm.

Mean subtraction across the spectrum allows for smoother transition between blocks.

args:

dynamic_spectra: The dynamic spectra you want to flatten

flatten_to: The number to set as the baseline

kernel_size: The size of the median filter to run over the medians

0,1 => nothing is applied

return_same_dtype: return the same data type as given, else it will

at least np.int64

return_time_series: return the time series median differences from flatten_to

median_time_series - flatten_to

returns:

Dynamic spectra flattened in frequency and time (optional) time series medians

jess.calculators_cupy.mean(data_array: cupy._core.core.ndarray, factor: int, axis: int, pad: str = 'median') cupy._core.core.ndarray

Reduce the data along an axis by taking the mean of a ‘factor’ of rows along the axis

args:

data_array: array of data to be reduces

factor: the factor to reduce the dimension by

axis: axis to operate on

pad: method to pad if axis is not divisible. If None

will not pad

returns:

array with axis reduced by factor

jess.calculators_cupy.pad_along_axis(array: cupy._core.core.ndarray, new_length: int, axis: int = 0, mode: str = 'median', location: str = 'middle')

Pad along an axis.

args:

array: Array to pad

new_length: New length of the axis

axis: Axis to be padded

mode: mode to pad, see numpy.pad

location: Location of the pad. Options are

[end, start, middle]

return:

Array padded along axis

Based on https://stackoverflow.com/a/49766444

jess.calculators_cupy.to_dtype(data: cupy._core.core.ndarray, dtype: numpy.dtype) cupy._core.core.ndarray

Takes a chunk of data and changes it to a given data type.

Args:

data: Array that you want to convert

dtype: The output data type

Returns:

data converted to dtype

jess.channel_masks module

Contains Utilities to make channel masks

jess.channel_masks.channel_masker(dynamic_spectra: numpy.ndarray, test: str, sigma: float = 4.0, fitter: str = 'median_fitter', chans_per_fit: int = 30, flagger: str = 'z_score_flagger', flag_above: bool = True, flag_below: bool = True, eps: float = 0.9, min_clust_frac: float = 0.01, show_plots: bool = False) numpy.ndarray

Does the given statistical test on a given data array. Then a curve is fitted to the resulting test-bandpass, this removes large effects of the receiver. Z-scores are calculated and channels that decide x-score*sigma are flagged.

The mask of the outlying channels is then saved.

One should be careful if there are only a few channels, z-score will be a bad way to look for outliers.

args:

file - dynamic spectra, the 2D chunk of data to process, time is on y-axis

test - Statistical test you preform on each channel,

option are from stat_test and are Measures of scale: [98-2, 91-9, 90-10, 75-25, mad, stand-dev] Guassianity: [anderson-darling, d’angostino, jarque-bera,

kurtosis, lilliefors, shapio-wilk, skew]

Entropy: [shannon-entropy] Central Value: [mean, midhing, trimean]

If you give a list of two tests, you will get the first one subtracted from the second. For example (stand-dev, mad) will be a test the difference between the two.

You can your rfi_viewer.py to see how your data looks at each one of these tests

sigma - This routine calculates z values over all channels, this is the

sigma at which to flag channels

start - Sample at which to start (default: beginning of file)

nspectra - the number of spectra to process

(default: 65536, the default heimdall gulp)

fitter - Fitter used to fit bandpass effects

chans_per_fit - the number of channels per fitting point, see fitters.py

If zero, don’t do bandpass subtraction. If two tests are given, smooth the second test. (default: 30)

flagger: The flagger to remove outlying points,

[z_score_flagger, dbscan_flagger]

flag_upper - flag values above median+sigma*standard dev (Only z-score)

flag_lower - flag values below median - sigma*standard dev (Only z-score)

eps - dbscan eps (dbscan only)

min_clust_frac - fraction of channels for the minimum cluster size (dbscan only)

show_plot - show the fit and threshold plots, used to check if data is

well behaved (default: false)

returns:

mask - ndarray[bools], where True is a bad channel

Example:

yr_obj = Your(‘some_data.fil’) dynamic_spectra = yr_obj.get_data(0, 65536) mask = channel_masker(dynamic_spectra, which_test=”mean”)

jess.channel_masks.dbscan_flagger(test_values: numpy.ndarray, chans: Optional[numpy.ndarray] = None, eps: float = 0.3, min_clust_frac: float = 0.14, show_plot: bool = False) numpy.ndarray

Use DBScan to look for outliers

args:
test_values: Values from some test for each channel,

expects bandpass effects to be subtracted off

chans: list of chan numbers

eps: DBSCAN eps

min_clust_fraction: minimum fraction of channels for a DBSCAN cluster

show_plot: show diagnostic plot of cluster

return:

Bool mask, True=Bad channel

Example:

yr = Your(“some.fil”) dynamic_spectra = yr.get_data(7000, 2 ** 17) test_values = stat_test(dynamic_spectra, test) fit = fitter(test_values, chans_per_fit=chans_per_fit) flat = test_values - fit mask = dbscan_flagger(flat)

jess.channel_masks.stat_test(data: numpy.ndarray, which_test: str) numpy.ndarray

Runs the statistical tests Should have the same tests as rfi_viewer.py Test:

Measures of scale: 98-2, 91-9, 90-10, 75-25, mad, stand-dev Gaussianity: anderson-darling, d’angostino, jarque-bera,

lilliefors, kurtosis, shapiro-wilk, skew

Central Tendency: mean, midhing, trimean Information: shannopn-entropy

MAD and IQR report the scaled version, to make comparison tests easier

jess.channel_masks.z_score_flagger(flat_bandpass: numpy.ndarray, flag_above: bool = True, flag_below: bool = True, sigma: float = 6.0, measure_of_scale: typing.Callable = functools.partial(<function median_abs_deviation>, scale='normal'), show_plots: bool = False) numpy.ndarray

Flags points based on z-score

args:

flat_banpass - Results of some statistical test with the banpass effects removed

flag_above - Flag values with a z-scores above the threshold

flag_below - Flag values with a z-score below the threshold

sigma - the standard deviation to flag points

show_plots - Show diagnostic plots

returns:

Bool mask where True = bad data

Example:

yr = Your(“some.fil”) dynamic_spectra = yr.get_data(7000, 2 ** 17) test_values = stat_test(dynamic_spectra, test) fit = fitter(test_values, chans_per_fit=chans_per_fit) flat = test_values - fit mask = z_score_flagger(flat)

jess.dispersion module

Dispersion Utilities

dedisperse

jess.dispersion.calc_dispersion_delays(dm: float, chan_freqs: numpy.ndarray) numpy.ndarray

Calculates dispersion delays at an input DM and a frequency array.

Args:

dm (float): DM to calculate the delay chan_freqs (float): Frequencies

Returns:

delays (float): dispersion delays at each frequency channel (in seconds)

jess.dispersion.dedisperse(data: numpy.ndarray, dm: float, tsamp: float, chan_freqs: Optional[numpy.ndarray] = None, delays: Optional[numpy.ndarray] = None) numpy.ndarray

Dedisperse a chunk of data..

Note:

Our method rolls the data around while dedispersing it.

Args:

data: data to dedisperse dm (float): The DM to dedisperse the data at. chan_freqs (float): frequencies tsamp (float): sampling time in seconds delays (float): dispersion delays for each channel (in seconds)

Returns:

dedispersed (float): Dedispersed data

jess.dispersion.delay_lost(dm: float, chan_freqs: numpy.ndarray, tsamp: float) numpy.int64

Calculates the maximum dispersion delay in number of samples

jess.dispersion.fdmt(dynamic_spectra: numpy.ndarray, f_min: float, f_max: float, max_dt: int, dtype: numpy.dtype) numpy.ndarray

This function implements the FDMT algorithm. Note that the bandpass must be flipped, and time on the horizontal

Args:

dynamic_spectra - Dynamic Spectra to dedisperse

f_min,f_max are the base-band begin and end frequencies.

The frequencies should be entered in MHz

max_dt - the maximal delay (in time bins) of the maximal dispersion.

Appears in the paper as N_{Delta} A typical input is maxDT = N_f

dtype - a valid numpy dtype.

reccomended: either int32, or int64.

Returns:

The dispersion measure transform of the Input matrix. he output dimensions are [Input.shape[1],maxDT]

For details, see algorithm 1 in Zackay & Ofek (2014) See the Lincense in _fdmt_utils.py

jess.dispersion.fdmt_fft(dynamic_spectra: numpy.ndarray, f_min: float, f_max: float, max_dt: int, dtype: numpy.dtype) numpy.ndarray

This function implements the FDMT-FFT algorithm.

Args:

dynamic_spectra -

f_min,f_max - are the base-band begin and end frequencies.

he frequencies can be entered in both MHz and GHz, units are factored out in all uses.

maxDT - The maximal delay (in time bins) of the maximal dispersion.

Appears in the paper as N_{Delta} A typical input is maxDT = N_f

dtype - To naively use FFT, one must use floating point types.

Due to casting, use either complex64 or complex128.

Returns:

The dispersion measure transform of the Input matrix. The output dimensions are [Input.shape[1],maxDT]

For details, see algorithm 2 in Zackay & Ofek (2014) See the Lincense in _fdmt_utils.py

jess.dispersion_cupy module

Dispersion Utilities using cupy

jess.dispersion_cupy.calc_dispersion_delays(dm: float, chan_freqs: cupy._core.core.ndarray) cupy._core.core.ndarray

Calculates dispersion delays at an input DM and a frequency array.

Args:

dm (float): DM to calculate the delay chan_freqs (float): Frequencies

Returns:

delays (float): dispersion delays at each frequency channel (in seconds)

jess.dispersion_cupy.dedisperse(data: cupy._core.core.ndarray, dm: float, tsamp: float, chan_freqs: Optional[cupy._core.core.ndarray] = None, delays: Optional[cupy._core.core.ndarray] = None) cupy._core.core.ndarray

Dedisperse a chunk of data..

Note:

Our method rolls the data around while dedispersing it.

Args:

data: data to dedisperse dm (float): The DM to dedisperse the data at. chan_freqs (float): frequencies tsamp (float): sampling time in seconds delays (float): dispersion delays for each channel (in seconds)

Returns:

dedispersed (float): Dedispersed data

jess.dispersion_cupy.delay_lost(dm: float, chan_freqs: cupy._core.core.ndarray, tsamp: float) numpy.int64

Calculates the maximum dispersion delay in number of samples

jess.fitters module

A somewhat robust way to fit bandpasses

cheb_fitter() Fits Chebyshev polynomials, does to fits to be robust.

bandpass_fitter() Fits polynomials twice to get a robust fit.

bspline_fitt() Fits bsplines using the Huber Regressor as a loss function

bspline_fitter() Fits bsplines using the Huber Regressor as a loss function, fits twice to be even more robust

class jess.fitters.SplineTransformer(knots, degree=3, periodic=False)

Bases: sklearn.base.TransformerMixin

The bspline transformer class

fit(x, y=None)

This gives sklearn a fit function

static get_empty_bsplines(num_knots: int, num_chans: int, degree: int = 3, periodic: bool = False) list

Creates the empty bsplines that will be used then fit by the Huber Regressor

transform(x)

Transform the bsplines

jess.fitters.arpls_fitter(bandpass: numpy.ndarray, lam: float = 10000.0, ratio: float = 0.05, itermax: int = 10, dtype: object = <class 'numpy.float32'>) numpy.ndarray

Baseline correction using asymmetrically reweighted penalized least squares smoothing Sung-June Baek, Aaron Park, Young-Jin Ahna and Jaebum Choo, Analyst, 2015, 140, 250 (2015)

Abstract

Baseline correction methods based on penalized least squares are successfully applied to various spectral analyses. The methods change the weights iteratively by estimating a baseline. If a signal is below a previously fitted baseline, large weight is given. On the other hand, no weight or small weight is given when a signal is above a fitted baseline as it could be assumed to be a part of the peak. As noise is distributed above the baseline as well as below the baseline, however, it is desirable to give the same or similar weights in either case. For the purpose, we propose a new weighting scheme based on the generalized logistic function. The proposed method estimates the noise level iteratively and adjusts the weights correspondingly. According to the experimental results with simulated spectra and measured Raman spectra, the proposed method outperforms the existing methods for baseline correction and peak height estimation.

This was first used for radio astronomy in Radio frequency interference mitigation based on the asymmetrically reweighted penalized least squares and SumThreshold method (2021) http://zmtt.bao.ac.cn/GPPS/RFI/

Args:

Bandpass: the bandpass to fit

lam: parameter that can be adjusted by user. The larger lambda is,

the smoother the resulting background, z

ratio: weighting deviations: 0 < ratio < 1,

smaller values allow less negative values

itermax: number of iterations to perform

dtype: data type to preform the matrix opterations

Output:

Fit to bandpass

jess.fitters.bspline_fit(bandpass: numpy.ndarray, channels: Optional[numpy.ndarray] = None, chans_per_knot: int = 100, max_inter=250) numpy.ndarray

This fits a bsplines using the Huber regressor.

The Huber Regressor is a robust fitting function.

Inspired by https://gist.github.com/MMesch/35d7833a3daa4a9e8ca9c6953cbe21d4

Args:

Bandpass: the bandpass to fit

chans_per_knot: number of channels per spline knot

max_iter: The maximum number of iterations

Returns:

Fit to bandpass

Example:

yr = Your(input_file) data = yr.get_data(0, 8192) bandpass = np.median(section, axis=0) fit = bspline_fit(bandpass)

jess.fitters.bspline_fitter(bandpass: numpy.ndarray, channels: Optional[numpy.ndarray] = None, chans_per_fit: int = 100) numpy.ndarray

This fits a bsplines using the Huber regressor.

The Huber Regressor is a robust fitting function.

This wraps bspline_fit, # running it twice to help it futher reject outliers (Not implemented)

Args:

Bandpass: the bandpass to fit

chans_per_fit: number of channels per spline knot

Returns:

Fit to bandpass

Example:

yr = Your(input_file) data = yr.get_data(0, 8192) bandpass = np.median(section, axis=0) fit = bspline_fitter(bandpass)

Notes:

I’ve attempted to make this even more robbust by running it once, flagging data and running it again. However I can’t get model.predict() to work on the full channel set when it is trained with flagged channels.

jess.fitters.cheb_fitter(bandpass: numpy.ndarray, channels: Optional[numpy.ndarray] = None, chans_per_fit: int = 100, mask_sigma: float = 6) numpy.ndarray

Fits bandpasses by Chebyshev fitting the bandpass, looking for channels that are far from this fit, excluding these channels and refitting the bandpass This works well for bandpasses with sine/cosine like features.

Args:

bandpass: the bandpass to fit

chans_per_fit: number of channels for each polynomial order

mask_sigma: standard deviation at which to mask outlying channels

Returns:

Fit to bandpass

Example:

yr = Your(input_file) data = yr.get_data(0, 8192) bandpass = np.median(section, axis=0) fit = cheb_fitter(bandpass)

jess.fitters.get_fitter(fitter: str) Callable

Get the fitter object for a given string

Args:

fitter: string with the selection of bspline_fitter, cheb_fitter, median_fitter, or poly_fitter

return:

corresponding fitter object

jess.fitters.median_fitter(bandpass: numpy.ndarray, chans_per_fit: int = 19, interations: int = 1) numpy.ndarray

Uses a median filter to fit for the bandpass shape

Args:

bandpass: ndarray to fit

chans_per_fit: Number of channels to run the median filter over

iterations: Number of iterations to run the median filter. Must be

greater that one.

Returns:

Fit to bandpass

Notes:

The idea to run this multiple times is from GSL. A recursive median filter might be worth investigating. See https://www.gnu.org/software/gsl/doc/html/filter.html

Example:

yr = Your(input_file) data = yr.get_data(0, 8192) bandpass = np.median(section, axis=0) fit = median_fitter(bandpass)

jess.fitters.poly_fitter(bandpass: numpy.ndarray, channels: Optional[numpy.ndarray] = None, chans_per_fit: int = 200, mask_sigma: float = 6) numpy.ndarray

Fits bandpasses by polyfitting the bandpass, looking for channels that are far from this fit, excluding these channels and refitting the bandpass

Args:

bandpass: the bandpass to fit

channels: list of channel numbers, if None, will create a list starting at zero

chans_per_fit: Number of channels per polynomial

mask_sigma: standard deviation at which to mask outlying channels

Returns:

Fit to bandpass

Example:

yr = Your(input_file) data = yr.get_data(0, 8192) bandpass = np.median(section, axis=0) fit = poly_fitter(bandpass)

jess.fitters_cupy module

A somewhat robust way to fit bandpasses - cupy edition

jess.fitters_cupy.arpls_fitter(bandpass: cupy._core.core.ndarray, lam: float = 30000.0, ratio: float = 0.05, itermax: int = 10, dtype: object = <class 'numpy.float32'>) cupy._core.core.ndarray

Baseline correction using asymmetrically reweighted penalized least squares smoothing Sung-June Baek, Aaron Park, Young-Jin Ahna and Jaebum Choo, Analyst, 2015, 140, 250 (2015)

Abstract

Baseline correction methods based on penalized least squares are successfully applied to various spectral analyses. The methods change the weights iteratively by estimating a baseline. If a signal is below a previously fitted baseline, large weight is given. On the other hand, no weight or small weight is given when a signal is above a fitted baseline as it could be assumed to be a part of the peak. As noise is distributed above the baseline as well as below the baseline, however, it is desirable to give the same or similar weights in either case. For the purpose, we propose a new weighting scheme based on the generalized logistic function. The proposed method estimates the noise level iteratively and adjusts the weights correspondingly. According to the experimental results with simulated spectra and measured Raman spectra, the proposed method outperforms the existing methods for baseline correction and peak height estimation.

This was first used for radio astronomy in Radio frequency interference mitigation based on the asymmetrically reweighted penalized least squares and SumThreshold method (2021) http://zmtt.bao.ac.cn/GPPS/RFI/

Args:

Bandpass: the bandpass to fit

lam: parameter that can be adjusted by user. The larger lambda is,

the smoother the resulting background, z

ratio: weighting deviations: 0 < ratio < 1,

smaller values allow less negative values

itermax: number of iterations to perform

dtype: data type to preform the matrix opterations

Output:

Fit to bandpass

Note:

The Cholesky decomosition on cupy (9.5.0) is slightly off (at e-9 level) this causes the changes amount of smoothness. Upping lam to 3e4 makes the outputs the same.

jess.fitters_cupy.median_fitter(bandpass: cupy._core.core.ndarray, chans_per_fit: int = 19, interations: int = 1) cupy._core.core.ndarray

Uses a median filter to fit for the bandpass shape. Note: does inplace

Args:

bandpass: ndarray to fit

chans_per_fit: Number of channels to run the median filter over

iterations: Number of iterations to run the median filter. Must be

greater that one.

Returns:

Fit to bandpass

Notes:

The idea to run this multiple times is from GSL. A recursive median filter might be worth investigating. See https://www.gnu.org/software/gsl/doc/html/filter.html

Example:

yr = Your(input_file) data = yr.get_data(0, 8192) bandpass = np.median(section, axis=0) fit = median_fitter(bandpass)

jess.fitters_cupy.poly_fitter(bandpass: cupy._core.core.ndarray, channels: Optional[cupy._core.core.ndarray] = None, chans_per_fit: int = 200, mask_sigma: float = 6) cupy._core.core.ndarray

Fits bandpasses by polyfitting the bandpass, looking for channels that are far from this fit, excluding these channels and refitting the bandpass

Args:

bandpass: the bandpass (or any array) that you want to fit a polynomial to.

channels: list of channel numbers, if None, will create a list starting at zero

bandpass: the bandpass to fit

polyorder: order of polynomial to fit

mask_sigma: standard deviation at which to mask outlying channels

Returns:

Fit to bandpass

Example:

yr = Your(input_file) data = cp.asarray(yr.get_data(0, 8192)) bandpass = cp.median(section, axis=0) fit = poly_fitter(bandpass) plt.plot(fit.get())

Module contents