support_eddl.h
Go to the documentation of this file.
1 /*
2 * ECVL - European Computer Vision Library
3 * Version: 1.0.3
4 * copyright (c) 2021, Università degli Studi di Modena e Reggio Emilia (UNIMORE), AImageLab
5 * Authors:
6 * Costantino Grana (costantino.grana@unimore.it)
7 * Federico Bolelli (federico.bolelli@unimore.it)
8 * Michele Cancilla (michele.cancilla@unimore.it)
9 * Laura Canalini (laura.canalini@unimore.it)
10 * Stefano Allegretti (stefano.allegretti@unimore.it)
11 * All rights reserved.
12 */
13 
14 #ifndef ECVL_SUPPORT_EDDL_H_
15 #define ECVL_SUPPORT_EDDL_H_
16 
17 #include "ecvl/augmentations.h"
18 #include "ecvl/core/filesystem.h"
19 #include "ecvl/core/image.h"
20 #include "ecvl/dataset_parser.h"
21 
22 #include <eddl/apis/eddl.h>
23 
24 #include <algorithm>
25 #include <condition_variable>
26 #include <mutex>
27 #include <queue>
28 #include <thread>
29 #include <tuple>
30 
31 namespace ecvl
32 {
33 #define ECVL_ERROR_AUG_DOES_NOT_EXIST throw std::runtime_error(ECVL_ERROR_MSG "Augmentation for this split does not exist");
34 #define ECVL_ERROR_WORKERS_LESS_THAN_ONE throw std::runtime_error(ECVL_ERROR_MSG "Dataset workers must be at least one");
35 
// Convert an ECVL Image into an EDDL Tensor.
45 void ImageToTensor(const Image& img, Tensor*& t);
46 
// Overload taking an extra `offset`; its meaning is not visible in this listing
// (presumably a position inside an already allocated tensor — TODO confirm against the implementation).
57 void ImageToTensor(const Image& img, Tensor*& t, const int& offset);
58 
// Convert an EDDL Tensor into an ECVL Image.
71 void TensorToImage(const Tensor* t, Image& img);
72 
// Convert an EDDL Tensor into an ECVL View.
85 void TensorToView(const Tensor* t, View<DataType::float32>& v);
86 
96 {
97  std::vector<std::shared_ptr<Augmentation>> augs_; // one slot per split; Apply() indexes this vector by split
98 public:
// Build from a vector of augmentations. The shared_ptr elements are copied as they are,
// so the augmentation objects are shared with the caller's vector.
99  DatasetAugmentations(const std::vector<std::shared_ptr<Augmentation>>& augs) : augs_(augs) {}
100 
101  // Copy constructor: makes a deep copy of the Augmentations (each non-null entry is cloned, null entries stay null)
102  DatasetAugmentations(const DatasetAugmentations& other)
103  {
104  for (const auto& a : other.augs_) {
105  augs_.emplace_back(a ? a->Clone() : nullptr);
106  }
107  }
108 
109  // Getters: YAGNI
110 
// Apply the augmentation stored for split index `split` to `img` (and `gt`).
// Returns true when an augmentation exists for that split and has been applied,
// false when the slot for that split is null.
// Throws std::runtime_error when `split` is outside the augmentations vector.
111  bool Apply(const int split, Image& img, const Image& gt = Image())
112  {
113  // at() throws std::out_of_range when there is no slot for this split
114  try {
115  if (augs_.at(split)) {
116  augs_[split]->Apply(img, gt);
117  return true;
118  }
119  return false;
120  }
121  catch (const std::out_of_range&) {
122  ECVL_ERROR_AUG_DOES_NOT_EXIST
123  }
124  }
125 
// Overload taking a SplitType; forwards to the index-based Apply and returns its result.
126  bool Apply(SplitType st, Image& img, const Image& gt = Image())
127  {
128  return Apply(+st, img, gt); // unary + turns the SplitType into the split index (operator defined outside this listing — TODO confirm)
129  }
130 
// True when no augmentation slot is stored at all (the vector itself is empty).
131  bool IsEmpty() const
132  {
133  return augs_.empty();
134  }
135 };
136 
144 {
145  std::condition_variable cond_notempty_; // Pop waits on this; Push signals it after inserting
146  std::condition_variable cond_notfull_; // Push waits on this; Pop signals it once the queue drops below threshold_
147  mutable std::mutex mutex_; // guards cpq_; mutable so the const queries can lock it
148  std::queue<std::tuple<Sample, Tensor*, Tensor*>> cpq_; // queued (sample, image tensor, label tensor) entries
149  unsigned max_size_; // Push blocks while cpq_.size() >= max_size_
150  unsigned threshold_; // Pop wakes the producers when cpq_.size() < threshold_
152 public:
// Capacity only: the threshold defaults to half of the capacity (max_size_ is declared before threshold_, so it is already initialized here).
157  ProducersConsumerQueue(unsigned mxsz) : max_size_(mxsz), threshold_(max_size_ / 2) {}
// Explicit capacity and threshold.
162  ProducersConsumerQueue(unsigned mxsz, unsigned thresh) : max_size_(mxsz), threshold_(thresh) {}
163 
// Push a sample in the queue.
// Blocks the calling thread while the queue holds max_size_ or more entries, then stores
// the (sample, image, label) tuple and wakes one thread waiting in Pop.
172  void Push(const Sample& sample, Tensor* const image, Tensor* const label)
173  {
174  std::unique_lock<std::mutex> lock(mutex_);
175  cond_notfull_.wait(lock, [this]() { return cpq_.size() < max_size_; }); // the lock is released while waiting
176  cpq_.push(std::make_tuple(sample, image, label)); // the sample is copied; the tensor pointers are stored as they are
177  cond_notempty_.notify_one();
178  }
179 
// Free threads locked on a push operation.
// Wakes every thread waiting on cond_notfull_; each of them re-evaluates its wait
// predicate, so a producer only proceeds if the queue actually has room.
188  void FreeLockedOnPush()
189  {
190  cond_notfull_.notify_all();
191  }
192 
// Pop a sample from the queue.
// Blocks while the queue is empty, then copies the front entry into the three output
// parameters and removes it from the queue.
202  void Pop(Sample& sample, Tensor*& image, Tensor*& label)
203  {
204  std::unique_lock<std::mutex> lock(mutex_);
205  cond_notempty_.wait(lock, [this]() { return !cpq_.empty(); });
206  std::tie(sample, image, label) = cpq_.front();
207  cpq_.pop();
208  if (cpq_.size() < threshold_) { // producers are woken only once the queue has drained below the threshold
209  cond_notfull_.notify_all();
210  }
211  }
212 
// Check if the queue is full, i.e. it holds at least max_size_ entries (read under the lock).
217  bool IsFull() const
218  {
219  std::unique_lock<std::mutex> lock(mutex_);
220  return cpq_.size() >= max_size_;
221  }
222 
// Check if the queue is empty (read under the lock).
227  bool IsEmpty() const
228  {
229  std::unique_lock<std::mutex> lock(mutex_);
230  return cpq_.empty();
231  }
232 
// Return the current number of entries in the queue (read under the lock).
237  size_t Length() const
238  {
239  std::unique_lock<std::mutex> lock(mutex_);
240  return cpq_.size();
241  }
242 
// Set the maximum size of the queue and, optionally, the threshold.
// thresh == -1 selects max_size / 2. Both int arguments are converted to the unsigned members.
// NOTE(review): unlike the other members this does not take mutex_, while Push and Pop read
// max_size_ and threshold_ under the lock — presumably only called while no producer or
// consumer is running; confirm with the callers.
248  void SetSize(int max_size, int thresh = -1)
249  {
250  max_size_ = max_size;
251  threshold_ = thresh != -1 ? thresh : max_size / 2;
252  }
253 
// Empty the queue while holding the lock. No condition variable is notified here.
254  void Clear()
255  {
256  std::unique_lock<std::mutex> lock(mutex_);
257 
258  // Remove residual samples. NOTE(review): entries are only popped; the Tensor* they carry are not deleted here — confirm who owns them
259  while (!cpq_.empty()) {
260  cpq_.pop();
261  }
262  }
263 };
264 
272 {
273 public:
274  int counter_; // index of the sample currently used by the thread
275  int min_, max_; // indices of the samples managed by the thread, in the interval [min_, max_)
277  ThreadCounters(int min, int max) : counter_{ min }, min_{ min }, max_{ max } {} // counter starts at min
278  ThreadCounters(int counter, int min, int max) : counter_{ counter }, min_{ min }, max_{ max } {} // counter starts at an explicit position
279  void Reset() { counter_ = min_; } // reset the thread counter to its minimum value
280 };
281 
288 class DLDataset : public Dataset
289 {
290  const unsigned processor_count_ = std::max(1u, std::thread::hardware_concurrency()); // hardware_concurrency() may return 0 when the value cannot be determined; clamp to 1 so std::min(num_workers, processor_count_) never yields zero workers
291 protected:
292  unsigned num_workers_; // number of parallel workers
293  std::vector<std::vector<ThreadCounters>> splits_tc_; // for each split, one ThreadCounters per thread
294  std::vector<std::thread> producers_; // threads representing the samples producers
295  bool active_ = false; // whether the threads have already been launched or not
296  std::mutex active_mutex_; // mutex for the active_ variable
297  static std::default_random_engine re_; // engine used for random number generation
303  void InitTC(int split_index); // set which sample indices are managed by each thread of the given split
304 
307  {
308  switch (task_) {
310  tensors_shape_ = std::make_pair<std::vector<int>, std::vector<int>>(
312  { batch_size_, vsize(classes_) });
313  break;
314  case Task::segmentation:
315  tensors_shape_ = std::make_pair<std::vector<int>, std::vector<int>>(
318  break;
319  }
320  }
321 
322 public:
324  int n_channels_gt_ = -1; // number of channels of the ground truth images; stays -1 until it is set
325  std::vector<int> resize_dims_; // dimensions (HxW) to which Dataset images must be resized
327  std::vector<int> current_batch_; // number of batches already loaded for each split
332  std::pair< std::vector<int>, std::vector<int>> tensors_shape_; // shape of sample and label tensors
// Build a DLDataset from the dataset file `filename`.
// Parameters, as they are used below:
//  - batch_size: stored in batch_size_ and used for the per-split batch bookkeeping;
//  - augs: copied into augs_;
//  - ctype / ctype_gt: color types used to load the images / the ground truth;
//  - num_workers: capped at processor_count_;
//  - queue_ratio_size: the queue capacity is batch_size * queue_ratio_size * workers;
//  - drop_last: per-split-name flag, applied only when it has exactly one entry per split;
//  - verify: forwarded to the Dataset base constructor.
345  DLDataset(const filesystem::path& filename,
346  const int batch_size,
347  const DatasetAugmentations& augs,
348  const ColorType ctype = ColorType::RGB,
349  const ColorType ctype_gt = ColorType::GRAY,
350  const unsigned num_workers = 1,
351  const double queue_ratio_size = 1.,
352  const std::unordered_map<std::string, bool>& drop_last = std::unordered_map<std::string, bool>{},
353  bool verify = false) :
354 
355  Dataset{ filename, verify },
356  batch_size_{ batch_size },
357  augs_(augs),
358  num_workers_{ std::min(num_workers, processor_count_) },
359  ctype_{ ctype },
360  ctype_gt_{ ctype_gt },
361  queue_{ static_cast<unsigned>(batch_size * queue_ratio_size * std::min(num_workers, processor_count_)) } // recomputes the capped worker count instead of reading num_workers_
362  {
363  // resize current_batch_ to the number of splits and initialize it with 0
364  current_batch_.resize(split_.size(), 0);
365 
366  // Initialize n_channels_ from the first sample. NOTE(review): samples_[0] is read without checking that samples_ is non-empty — confirm the Dataset base guarantees it
367  Image tmp = samples_[0].LoadImage(ctype);
368  n_channels_ = tmp.Channels();
369 
370  if (!split_.empty()) {
371  current_split_ = 0;
372  // Initialize resize_dims_ after the augmentations have been applied to the first image
373  if (augs_.IsEmpty()) {
374  std::cout << ECVL_WARNING_MSG << "Augmentations are empty!" << std::endl;
375  }
376  else {
377  while (!augs_.Apply(current_split_, tmp)) { // look for the first split that has an augmentation; NOTE(review): if none has one, the index runs past the last slot and termination depends on Apply's out-of-range handling
378  ++current_split_;
379  }
380  }
381  auto y = tmp.channels_.find('y'); // positions of the 'y' and 'x' axes inside the channels_ descriptor
382  auto x = tmp.channels_.find('x');
383  assert(y != std::string::npos && x != std::string::npos);
384  resize_dims_.insert(resize_dims_.begin(), { tmp.dims_[y],tmp.dims_[x] });
385 
386  // Initialize n_channels_gt_ if exists
387  if (samples_[0].label_path_ != nullopt) {
388  n_channels_gt_ = samples_[0].LoadImage(ctype_gt_, true).Channels();
389  }
390  }
391  else {
392  std::cout << ECVL_WARNING_MSG << "Missing splits in the dataset file." << std::endl;
393  }
394 
395  // Set drop_last parameter for each split
396  if (!drop_last.empty()) {
397  if (drop_last.size() == vsize(split_)) {
398  for (int i = 0; i < vsize(split_); ++i) {
399  split_[i].drop_last_ = drop_last.at(split_[i].split_name_); // at() throws std::out_of_range if a split name is missing from the map
400  }
401  }
402  else {
403  std::cout << ECVL_WARNING_MSG << "drop_last is not empty but the provided size is different from the size of the splits. The default value 'false' is set for all the splits" << std::endl;
404  }
405  }
406 
407  // Initialize num_batches, last_batch and the ThreadCounters for each split
408  auto s_index = 0;
409  splits_tc_ = std::vector<std::vector<ThreadCounters>>(vsize(split_));
410  for (auto& s : split_) {
411  s.SetNumBatches(batch_size_);
412  s.SetLastBatch(batch_size_);
413 
414  InitTC(s_index);
415  ++s_index;
416  }
417 
418  SetTensorsShape();
419  }
420 
427  void ResetBatch(const ecvl::any& split = -1, bool shuffle = false); // reset the batch counter and optionally shuffle the sample indices of the specified split
428 
433  void ResetAllBatches(bool shuffle = false); // reset the batch counter of each split and optionally shuffle the sample indices (within each split)
434 
440  void LoadBatch(Tensor*& images, Tensor*& labels); // load a batch into the images and labels tensors
441 
446  void LoadBatch(Tensor*& images); // overload that takes only the images tensor
447 
452  static void SetSplitSeed(unsigned seed) { re_.seed(seed); } // seed the shared random engine re_
453 
459  void SetBatchSize(int bs); // set a new batch size inside the dataset
460 
467  virtual void ProduceImageLabel(DatasetAugmentations& augs, Sample& elem); // load a sample and its label, and push them to the producers-consumer queue
468 
475  void ThreadFunc(int thread_index); // function called when the threads are spawned
476 
481  std::tuple<std::vector<Sample>, std::shared_ptr<Tensor>, std::shared_ptr<Tensor>> GetBatch(); // pop batch_size samples from the queue and copy them into EDDL tensors
482 
487  void Start(int split_index = -1); // spawn the num_workers threads
488 
490  void Stop(); // join all the threads
491 
496  auto GetQueueSize() const { return queue_.Length(); }; // current size of the producers-consumer queue
497 
502  void SetAugmentations(const DatasetAugmentations& da); // set the dataset augmentations
503 
510  const int GetNumBatches(const ecvl::any& split = -1); // get the number of batches of the specified split
511 
517  void ToTensorPlane(const std::vector<int>& label, Tensor*& tensor); // convert the sample labels into a one-hot encoded tensor and copy it to the batch tensor
518 
// Change the number of workers.
// Throws std::runtime_error when num_workers is 0. Otherwise the value is capped at
// processor_count_ and the thread counters of every split are rebuilt for the new count.
523  void SetWorkers(const unsigned num_workers)
524  {
525  if (num_workers < 1) { // the previous `< 0` test could never be true for an unsigned value
526  ECVL_ERROR_WORKERS_LESS_THAN_ONE
527  }
528 
529  num_workers_ = std::min(num_workers, processor_count_);
530  for (int i = 0; i < vsize(split_); ++i) {
531  InitTC(i);
532  }
533  }
534 
// Change the number of channels of the images (and of the ground truth, default 1)
// and refresh the internal tensor shapes by calling SetTensorsShape().
541  void SetNumChannels(const int n_channels, const int n_channels_gt = 1)
542  {
543  n_channels_ = n_channels;
544  n_channels_gt_ = n_channels_gt;
545  SetTensorsShape();
546  }
547 };
548 
559 Image MakeGrid(Tensor*& t, int cols = 8, bool normalize = false); // make a grid of images from an EDDL Tensor; defaults: 8 columns, no normalization
560 
564 } // namespace ecvl
565 
566 #endif // ECVL_SUPPORT_EDDL_H_
ProducersConsumerQueue(unsigned mxsz, unsigned thresh)
Definition: support_eddl.h:162
Dataset Augmentations.
Definition: support_eddl.h:95
std::vector< int > resize_dims_
Dimensions (HxW) to which Dataset images must be resized.
Definition: support_eddl.h:325
std::vector< int > current_batch_
Number of batches already loaded for each split.
Definition: support_eddl.h:327
std::vector< Split > split_
Splits of the Dataset. See Split.
Image class.
Definition: image.h:66
int batch_size_
Size of each dataset mini batch.
Definition: support_eddl.h:326
int n_channels_
Number of channels of the images.
Definition: support_eddl.h:323
void SetWorkers(const unsigned num_workers)
Change the number of workers.
Definition: support_eddl.h:523
void Pop(Sample &sample, Tensor *&image, Tensor *&label)
Pop a sample from the queue.
Definition: support_eddl.h:202
Class representing the thread counters.
Definition: support_eddl.h:271
std::pair< std::vector< int >, std::vector< int > > tensors_shape_
Shape of sample and label tensors.
Definition: support_eddl.h:332
int max_
Indices of samples managed by the thread in the interval [min_, max_).
Definition: support_eddl.h:275
Class that manages the producers-consumer queue of samples. The queue stores pairs of image and label...
Definition: support_eddl.h:143
void TensorToImage(const Tensor *t, Image &img)
Convert an EDDL Tensor into an ECVL Image.
void ToTensorPlane(const std::vector< int > &label, Tensor *&tensor)
Convert the sample labels into a one-hot encoded tensor and copy it to the batch tensor.
std::vector< std::vector< ThreadCounters > > splits_tc_
Each dataset split has its own vector of threads, each of which has its counters: <counter,...
Definition: support_eddl.h:293
int vsize(const std::vector< T > &v)
Definition: image.h:35
void FreeLockedOnPush()
Free threads locked on a push operation.
Definition: support_eddl.h:188
std::vector< std::string > classes_
Vector with all the classes available in the Dataset.
int n_channels_gt_
Number of channels of the ground truth images.
Definition: support_eddl.h:324
ColorType
Enum class representing the ECVL supported color spaces.
Definition: image.h:44
void SetBatchSize(int bs)
Set a new batch size inside the dataset.
virtual void ProduceImageLabel(DatasetAugmentations &augs, Sample &elem)
Load a sample and its label, and push them to the producers-consumer queue.
unsigned num_workers_
Number of parallel workers.
Definition: support_eddl.h:292
void InitTC(int split_index)
Set which are the indices of the samples managed by each thread.
void TensorToView(const Tensor *t, View< DataType::float32 > &v)
Convert an EDDL Tensor into an ECVL View.
void ImageToTensor(const Image &img, Tensor *&t)
Convert an ECVL Image into an EDDL Tensor.
size_t Length() const
Calculate the current size of the queue.
Definition: support_eddl.h:237
void SetNumChannels(const int n_channels, const int n_channels_gt=1)
Change the number of channels of the Image produced by ECVL and update the internal EDDL tensors shap...
Definition: support_eddl.h:541
bool active_
Whether the threads have already been launched or not.
Definition: support_eddl.h:295
DatasetAugmentations(const DatasetAugmentations &other)
Definition: support_eddl.h:102
std::mutex active_mutex_
Mutex for active_ variable.
Definition: support_eddl.h:296
void SetTensorsShape()
Set internal EDDL tensors shape.
Definition: support_eddl.h:306
Definition: any.h:69
static std::default_random_engine re_
Engine used for random number generation.
Definition: support_eddl.h:297
void SetSize(int max_size, int thresh=-1)
Set the maximum size of the queue and, optionally, the threshold below which the production of samples restarts.
Definition: support_eddl.h:248
int current_split_
Current split from which images are loaded.
ColorType ctype_
ecvl::ColorType of the Dataset images.
Definition: support_eddl.h:328
Task task_
Task of the dataset.
DatasetAugmentations(const std::vector< std::shared_ptr< Augmentation >> &augs)
Definition: support_eddl.h:99
int counter_
Index of the sample currently used by the thread.
Definition: support_eddl.h:274
const int GetNumBatches(const ecvl::any &split=-1)
Get the number of batches of the specified split.
ProducersConsumerQueue(unsigned mxsz)
Definition: support_eddl.h:157
#define ECVL_ERROR_WORKERS_LESS_THAN_ONE
Definition: support_eddl.h:34
void Stop()
Join all the threads.
void ResetBatch(const ecvl::any &split=-1, bool shuffle=false)
Reset the batch counter and optionally shuffle samples indices of the specified split.
Sample image in a dataset.
void SetAugmentations(const DatasetAugmentations &da)
Set the dataset augmentations.
void LoadBatch(Tensor *&images, Tensor *&labels)
Load a batch into images and labels tensor.
DLDataset(const filesystem::path &filename, const int batch_size, const DatasetAugmentations &augs, const ColorType ctype=ColorType::RGB, const ColorType ctype_gt=ColorType::GRAY, const unsigned num_workers=1, const double queue_ratio_size=1., const std::unordered_map< std::string, bool > &drop_last=std::unordered_map< std::string, bool >{}, bool verify=false)
Definition: support_eddl.h:345
bool Apply(const int split, Image &img, const Image &gt=Image())
Definition: support_eddl.h:111
ThreadCounters(int min, int max)
Definition: support_eddl.h:277
ColorType ctype_gt_
ecvl::ColorType of the Dataset ground truth images.
Definition: support_eddl.h:329
bool Apply(SplitType st, Image &img, const Image &gt=Image())
Definition: support_eddl.h:126
ProducersConsumerQueue queue_
Producers-consumer queue of the dataset.
Definition: support_eddl.h:331
auto GetQueueSize() const
Get the current size of the producers-consumer queue of the dataset.
Definition: support_eddl.h:496
std::experimental::any any
Definition: any.h:71
#define ECVL_WARNING_MSG
void Reset()
Reset the thread counter to its minimum value.
Definition: support_eddl.h:279
DeepHealth Dataset.
ThreadCounters(int counter, int min, int max)
Definition: support_eddl.h:278
DatasetAugmentations augs_
ecvl::DatasetAugmentations to be applied to the Dataset images (and ground truth if exist) for each s...
Definition: support_eddl.h:330
bool IsFull() const
Check if the queue is full.
Definition: support_eddl.h:217
bool IsEmpty() const
Check if the queue is empty.
Definition: support_eddl.h:227
static void SetSplitSeed(unsigned seed)
Set a fixed seed for the randomly generated values. Useful to reproduce experiments with the same shuffling...
Definition: support_eddl.h:452
SplitType
Enum class representing the Dataset supported splits.
void Push(const Sample &sample, Tensor *const image, Tensor *const label)
Push a sample in the queue.
Definition: support_eddl.h:172
void Start(int split_index=-1)
Spawn num_workers threads.
std::vector< Sample > samples_
Vector containing all the Dataset samples. See Sample.
DeepHealth Deep Learning Dataset.
Definition: support_eddl.h:288
#define ECVL_ERROR_AUG_DOES_NOT_EXIST
Definition: support_eddl.h:33
void ThreadFunc(int thread_index)
Function called when the threads are spawned.
std::tuple< std::vector< Sample >, std::shared_ptr< Tensor >, std::shared_ptr< Tensor > > GetBatch()
Pop batch_size samples from the queue and copy them into EDDL tensors.
Image MakeGrid(Tensor *&t, int cols=8, bool normalize=false)
Make a grid of images from an EDDL Tensor.
void ResetAllBatches(bool shuffle=false)
Reset the batch counter of each split and optionally shuffle samples indices (within each split).
std::vector< std::thread > producers_
Vector of threads representing the samples producers.
Definition: support_eddl.h:294