import numpy as np
import astropy.units as u
import keras
from keras.utils import Sequence, to_categorical
from dl1_data_handler.reader import ProcessType
[docs]
class DLDataLoader(Sequence):
"""
Generates batches for Keras application.
DLDataLoader is a data loader class that inherits from ``~keras.utils.Sequence``.
It is designed to handle and load data for deep learning models in a batch-wise manner.
Attributes:
-----------
data_reader : DLDataReader
An instance of DLDataReader to read the input data.
indices : list
List of indices to specify the data to be loaded.
tasks : list
List of tasks to be performed on the data to properly set up the labels.
batch_size : int
Size of the batch to load the data.
random_seed : int, optional
Whether to shuffle the data after each epoch with a provided random seed.
Methods:
--------
__len__():
Returns the number of batches per epoch.
on_epoch_end():
Updates indices after each epoch if random seed is provided.
__getitem__(index):
Generates one batch of data using _get_mono_item(index) or _get_stereo_item(index).
_get_mono_item(index):
Generates one batch of monoscopic data.
_get_stereo_item(index):
Generates one batch of stereoscopic data.
"""
def __init__(
self,
DLDataReader,
indices,
tasks,
batch_size=64,
random_seed=None,
sort_by_intensity=False,
stack_telescope_images=False,
**kwargs,
):
super().__init__(**kwargs)
"Initialization"
self.DLDataReader = DLDataReader
self.indices = indices
self.tasks = tasks
self.batch_size = batch_size
self.random_seed = random_seed
self.on_epoch_end()
self.stack_telescope_images = stack_telescope_images
self.sort_by_intensity = sort_by_intensity
# Set the input shape based on the mode of the DLDataReader
if self.DLDataReader.__class__.__name__ != "DLFeatureVectorReader":
if self.DLDataReader.mode == "mono":
self.input_shape = self.DLDataReader.input_shape
elif self.DLDataReader.mode == "stereo":
self.input_shape = self.DLDataReader.input_shape[
list(self.DLDataReader.selected_telescopes)[0]
]
# Reshape inputs into proper dimensions
# for the stereo analysis with stacked images
if self.stack_telescope_images:
self.input_shape = (
self.input_shape[1],
self.input_shape[2],
self.input_shape[0] * self.input_shape[3],
)
def __len__(self):
"""
Returns the number of batches per epoch.
This method calculates the number of batches required to cover the entire dataset
based on the batch size.
Returns:
--------
int
Number of batches per epoch.
"""
return int(np.floor(len(self.indices) / self.batch_size))
[docs]
def on_epoch_end(self):
"""
Updates indices after each epoch. If a random seed is provided, the indices are shuffled.
This method is called at the end of each epoch to ensure that the data is shuffled
if the shuffle attribute is set to True. This helps in improving the training process
by providing the model with a different order of data in each epoch.
"""
if self.random_seed is not None:
np.random.seed(self.random_seed)
np.random.shuffle(self.indices)
def __getitem__(self, index):
"""
Generate one batch of data and retrieve the features and labels.
This method is called to generate one batch of monoscopic and stereoscopic data based on
the index provided. It calls either _get_mono_item(batch) or _get_stereo_item(batch)
based on the mode of the DLDataReader.
Parameters:
-----------
index : int
Index of the batch to generate.
Returns:
--------
tuple
A tuple containing the input data as features and the corresponding labels.
"""
# Generate indices of the batch
batch_indices = self.indices[
index * self.batch_size : (index + 1) * self.batch_size
]
features, labels = None, None
if self.DLDataReader.mode == "mono":
batch = self.DLDataReader.generate_mono_batch(batch_indices)
features, labels = self._get_mono_item(batch)
elif self.DLDataReader.mode == "stereo":
batch = self.DLDataReader.generate_stereo_batch(batch_indices)
features, labels = self._get_stereo_item(batch)
return features, labels
def _get_mono_item(self, batch):
"""
Retrieve the features and labels for one batch of monoscopic data.
This method is called to retrieve the features and labels for one batch of
monoscopic data. The labels are set up based on the tasks specified.
Parameters:
-----------
batch : astropy.table.Table
A table containing the data for the batch.
Returns:
--------
tuple
A tuple containing the input data as features and the corresponding labels.
"""
# Retrieve the telescope images and store in the features dictionary
labels = {}
features = batch["features"].data
if "type" in self.tasks:
labels["type"] = to_categorical(
batch["true_shower_primary_class"].data,
num_classes=2,
)
# Temp fix till keras support class weights for multiple outputs or I wrote custom loss
# https://github.com/keras-team/keras/issues/11735
if len(self.tasks) == 1:
labels = to_categorical(
batch["true_shower_primary_class"].data,
num_classes=2,
)
if "energy" in self.tasks:
labels["energy"] = batch["log_true_energy"].data
if "skydirection" in self.tasks:
labels["skydirection"] = np.stack(
(
batch["fov_lon"].data,
batch["fov_lat"].data,
),
axis=1,
)
if "cameradirection" in self.tasks:
labels["cameradirection"] = np.stack(
(
batch["cam_coord_offset_x"].data,
batch["cam_coord_offset_y"].data,
),
axis=1,
)
return features, labels
def _get_stereo_item(self, batch):
"""
Retrieve the features and labels for one batch of stereoscopic data.
This method is called to retrieve the features and labels for one batch of
stereoscopic data. The original batch is grouped to retrieve the telescope
data for each event and then the telescope images or waveforms are stored
by the hillas intensity or stacked if required. Feature vectors can also
be retrieved if available for ``telescope``- and ``subarray``level. The
labels are set up based on the tasks specified.
Parameters:
-----------
batch : astropy.table.Table
A table containing the data for the batch.
Returns:
--------
tuple
A tuple containing the input data as features and the corresponding labels.
"""
labels = {}
if self.DLDataReader.process_type == ProcessType.Simulation:
batch_grouped = batch.group_by(
["obs_id", "event_id", "tel_type_id", "true_shower_primary_class"]
)
elif self.DLDataReader.process_type == ProcessType.Observation:
batch_grouped = batch.group_by(["obs_id", "event_id", "tel_type_id"])
features, mono_feature_vectors, stereo_feature_vectors = [], [], []
true_shower_primary_class = []
log_true_energy = []
fov_lon, fov_lat, angular_separation = [], [], []
cam_coord_offset_x, cam_coord_offset_y, cam_coord_distance = [], [], []
for group_element in batch_grouped.groups:
if "features" in batch.colnames:
if self.sort_by_intensity:
# Sort images by the hillas intensity in a given batch if requested
group_element.sort(["hillas_intensity"], reverse=True)
# Stack the telescope images for stereo analysis
if self.stack_telescope_images:
# Retrieve the telescope images
plain_features = group_element["features"].data
# Stack the telescope images along the last axis
stacked_features = np.concatenate(
[plain_features[i] for i in range(plain_features.shape[0])],
axis=-1,
)
# Append the stacked images to the features list
# shape: (batch_size, image_shape, image_shape, n_channels * n_tel)
features.append(stacked_features)
else:
# Append the plain images to the features list
# shape: (batch_size, n_tel, image_shape, image_shape, n_channels)
features.append(group_element["features"].data)
# Retrieve the feature vectors
if "mono_feature_vectors" in batch.colnames:
mono_feature_vectors.append(group_element["mono_feature_vectors"].data)
if "stereo_feature_vectors" in batch.colnames:
stereo_feature_vectors.append(
group_element["stereo_feature_vectors"].data
)
# Retrieve the labels for the tasks
# FIXME: This won't work for divergent pointing directions
if "type" in self.tasks:
true_shower_primary_class.append(
group_element["true_shower_primary_class"].data[0]
)
if "energy" in self.tasks:
log_true_energy.append(group_element["log_true_energy"].data[0])
if "skydirection" in self.tasks:
fov_lon.append(group_element["fov_lon"].data[0])
fov_lat.append(
group_element["fov_lat"].data[0]
)
if "cameradirection" in self.tasks:
cam_coord_offset_x.append(group_element["cam_coord_offset_x"].data)
cam_coord_offset_y.append(
group_element["cam_coord_offset_y"].data
)
# Store the labels in the labels dictionary
if "type" in self.tasks:
labels["type"] = to_categorical(
np.array(true_shower_primary_class),
num_classes=2,
)
# Temp fix till keras support class weights for multiple outputs or I wrote custom loss
# https://github.com/keras-team/keras/issues/11735
if len(self.tasks) == 1:
labels = to_categorical(
np.array(true_shower_primary_class),
num_classes=2,
)
if "energy" in self.tasks:
labels["energy"] = np.array(log_true_energy)
if "skydirection" in self.tasks:
labels["skydirection"] = np.stack(
(
np.array(fov_lon),
np.array(fov_lat),
),
axis=1,
)
if "cameradirection" in self.tasks:
labels["cameradirection"] = np.stack(
(
np.array(cam_coord_offset_x),
np.array(cam_coord_offset_y),
),
axis=1,
)
# Store the fatures in the features dictionary
if "features" in batch.colnames:
features = np.array(features)
# TDOO: Add support for both feature vectors
if "mono_feature_vectors" in batch.colnames:
features = np.array(mono_feature_vectors)
if "stereo_feature_vectors" in batch.colnames:
features = np.array(stereo_feature_vectors)
return features, labels