Source code for wormpose.machine_learning.synthetic_data_generator

Generates synthetic data using multiprocessing

import os
import pickle
import time
from multiprocessing import Process, Value
from typing import Type, Callable, Generator, Optional

import numpy as np

from wormpose.dataset import Dataset
from wormpose.machine_learning.generic_file_writer import GenericFileWriter
from wormpose.pose.centerline import flip_theta

class _TemplatesChunkData(object):
    def __init__(self, num_total_templates):

        self.frames = None
        self.measurements = None
        self.skeletons = None
        self.video_names = None

        self._num_total = num_total_templates
        self._cur_index = 0

    def _init_chunk_arr(self, arr):
        arr = np.asarray(arr)
        return np.empty((self._num_total,) + arr.shape[1:], dtype=arr.dtype)

    def add_data(self, features, frames, template_indexes, video_name: str):

        # sort indexes for faster frames reading
        template_indexes = np.sort(template_indexes)

        if self._cur_index == 0:
            # init all chunk arrays
            # frames images may not have all the same size, so preallocate a simple python list instead
            self.frames = [None] * self._num_total
            self.skeletons = self._init_chunk_arr(features.skeletons)
            self.measurements = self._init_chunk_arr(features.measurements)
            self.video_names = self._init_chunk_arr(video_name)

        for i in range(len(template_indexes)):
            template_index = template_indexes[i]

            self.frames[self._cur_index] = frames[template_index]
            self.skeletons[self._cur_index] = features.skeletons[template_index]
            self.measurements[self._cur_index] = features.measurements[template_index]
            self.video_names[self._cur_index] = video_name

            self._cur_index = self._cur_index + 1

    def write_to_file(self, filename: str):
        with open(filename, "wb") as f:
            pickle.dump(self, f)

def _write_to_file(
    out_filename: str,
    num_samples: int,
    template_filename: str,
    postures_generation_fn: Callable[[], Generator],
    writer: Type[GenericFileWriter],
    random_seed: Optional[int],
    from wormpose.images.synthetic import SyntheticDataset

    seed = int.from_bytes(os.urandom(4), byteorder="little") if random_seed is None else random_seed

    with open(template_filename, "rb") as templates_f:
        templates = pickle.load(templates_f)

    worm_measurements = {}
    for video_name in np.unique(templates.video_names):
        indexes = np.where(templates.video_names == video_name)[0]
        measurements = templates.measurements[indexes]
        average_measurements = np.empty((measurements.shape[1:]), dtype=measurements.dtype)
        for name in measurements.dtype.names:
            average_measurements[0][name] = np.nanmean(measurements[name])
        worm_measurements[video_name] = average_measurements

    synthetic_dataset = SyntheticDataset(**synthetic_dataset_args)

    # preallocate all the template indexes to create the synthetic images (random)
    template_indexes = np.random.randint(len(templates.frames), size=num_samples)

    # preallocate all the choices for the position of the head (random)
    headtail_choice = np.random.choice([0, 1], size=num_samples)

    # preallocate image buffer to draw the synthetic image on
    image_data = np.empty(synthetic_dataset.output_image_shape, dtype=np.uint8)

    postures_gen = postures_generation_fn()
    with writer(out_filename) as synth_data_writer:

        for index, (cur_template_index, cur_headtail_choice) in enumerate(zip(template_indexes, headtail_choice)):
            theta = next(postures_gen)
            label_data = [theta, flip_theta(theta)]
            video_name = templates.video_names[cur_template_index]
            template_frame = templates.frames[cur_template_index]
            template_skeleton = templates.skeletons[cur_template_index]
            template_measurements = worm_measurements[video_name]
            progress_counter.value = index + 1

[docs]class SyntheticDataGenerator(object): """ This generator makes synthetic data by distributing the work to several processes: Each process will write one file containing the synthetic data and its label We use a "GenericFileWriter" to be able to write the files in different formats (TFrecord, pickle etc...) """ _MAX_TEMPLATE_FRAMES = 1000 def __init__( self, num_process: int, temp_dir: str, dataset: Dataset, postures_generation_fn: Callable[[], Generator], enable_random_augmentations: bool, writer: Type[GenericFileWriter], random_seed: Optional[int], ): """ :param num_process: How many processes to distribute the work: there will be one file written per process :param temp_dir: Where to store temporary files :param dataset: A WormPose dataset :param postures_generation_fn: A function generating worm postures (angles) :param enable_random_augmentations: If true, the synthetic images will have random augmentations: translation, scale, blur :param writer: object responsible of writing the data to files :param random_seed: Seed for reproducibility """ self.num_process = num_process self.temp_dir = temp_dir self.dataset = dataset self.postures_generation_fn = postures_generation_fn self.writer = writer self.random_seed = random_seed self.synthetic_dataset_args = dict( frame_preprocessing=dataset.frame_preprocessing, enable_random_augmentations=enable_random_augmentations, output_image_shape=dataset.image_shape, ) self.video_names = dataset.video_names self._all_template_indexes = [dataset.features_dataset[v].labelled_indexes for v in self.video_names] if np.sum([len(x) for x in self._all_template_indexes]) == 0: raise RuntimeError("Can't create training data because couldn't find any labelled frame in the dataset.") def _split_data(self, num_samples_per_process): """ Split a subset of template frames and associated features from different videos randomly to several processes """ for process_index in range(self.num_process): num_samples = num_samples_per_process[process_index] if num_samples == 0: continue num_templates = min(num_samples, self._MAX_TEMPLATE_FRAMES) projected_num_templates_per_video = max(num_templates // len(self.video_names), 1) actual_num_templates_per_video = [ min(projected_num_templates_per_video, len(x)) for x in self._all_template_indexes ] num_total_templates = np.sum(actual_num_templates_per_video) templates_chunk_data = _TemplatesChunkData(num_total_templates=num_total_templates) for video_name, cur_video_template_indexes, cur_video_num_templates in zip( self.video_names, self._all_template_indexes, actual_num_templates_per_video, ): # Select randomly some template indexes for each video template_indexes = np.random.choice( cur_video_template_indexes, size=cur_video_num_templates, replace=False, ) # Fill in the templates data chunk for this video with as frames: templates_chunk_data.add_data( features=self.dataset.features_dataset[video_name], frames=frames, template_indexes=template_indexes, video_name=video_name, ) template_filename = os.path.join(self.temp_dir, f"template_frames_{process_index:02d}.pkl") templates_chunk_data.write_to_file(template_filename) yield template_filename
[docs] def generate(self, file_pattern: str, num_samples: int): """ Start the generation of files containing synthetic data :param file_pattern: Filepath of the files to generate with an "index" variable ex: "path_to/file_{index}.tfrecord" :param num_samples: How many images (total) to generate, they will be split in several files """ if num_samples <= 0: return 1 # nothing to do when num_samples is <= 0, progress 100% progress_counters = [Value("i", 0) for _ in range(self.num_process)] num_samples_per_process = [num_samples // self.num_process] * self.num_process num_samples_per_process[0] += num_samples % self.num_process workers = [] for process_index, template_filename in enumerate(self._split_data(num_samples_per_process)): seed = self.random_seed + process_index if self.random_seed is not None else None out_filename = file_pattern.format(index=process_index) proc = Process( target=_write_to_file, args=( out_filename, num_samples_per_process[process_index], template_filename, self.postures_generation_fn, progress_counters[process_index], self.writer, self.synthetic_dataset_args, seed, ), ) proc.start() workers.append(proc) while True: num_entries_written = np.sum([x.value for x in progress_counters]) yield float(num_entries_written) / num_samples if num_entries_written >= num_samples: break time.sleep(0.5) for proc in workers: proc.join()