support_eddl.h
Go to the documentation of this file.
1 /*
2 * ECVL - European Computer Vision Library
3 * Version: 1.0.0
4 * copyright (c) 2021, Università degli Studi di Modena e Reggio Emilia (UNIMORE), AImageLab
5 * Authors:
6 * Costantino Grana (costantino.grana@unimore.it)
7 * Federico Bolelli (federico.bolelli@unimore.it)
8 * Michele Cancilla (michele.cancilla@unimore.it)
9 * Laura Canalini (laura.canalini@unimore.it)
10 * Stefano Allegretti (stefano.allegretti@unimore.it)
11 * All rights reserved.
12 */
13 
14 #ifndef ECVL_SUPPORT_EDDL_H_
15 #define ECVL_SUPPORT_EDDL_H_
16 
17 #include "ecvl/augmentations.h"
18 #include "ecvl/core/filesystem.h"
19 #include "ecvl/core/image.h"
20 #include "ecvl/dataset_parser.h"
21 
22 #include <eddl/apis/eddl.h>
23 
24 #include <algorithm>
25 #include <condition_variable>
26 #include <mutex>
27 #include <queue>
28 #include <thread>
29 #include <tuple>
30 
31 namespace ecvl
32 {
// Error helpers used by this header. Each macro expands to a complete `throw` statement
// (the trailing `;` is part of the expansion), raising std::runtime_error with a fixed message
// placed right after ECVL_ERROR_MSG (defined elsewhere; presumably a string-literal prefix — confirm).
// They are preprocessor macros, so the surrounding `namespace ecvl` does not scope them.
33 #define ECVL_ERROR_AUG_DOES_NOT_EXIST throw std::runtime_error(ECVL_ERROR_MSG "Augmentation for this split does not exist");
34 #define ECVL_ERROR_WORKERS_LESS_THAN_ONE throw std::runtime_error(ECVL_ERROR_MSG "Dataset workers must be at least one");
35 
/** Convert an ECVL Image into an EDDL Tensor.
    @param img Source image (read only).
    @param t   Reference to a Tensor pointer, so the callee is able to rebind the caller's pointer.
               NOTE(review): whether the tensor is allocated here is not visible in this header — confirm in the implementation.
*/
45 void ImageToTensor(const Image& img, Tensor*& t);
46 
/** Overload of ImageToTensor taking an additional `offset`.
    NOTE(review): presumably the position inside an already existing (batch) tensor at which the image is written — confirm in the implementation.
*/
57 void ImageToTensor(const Image& img, Tensor*& t, const int& offset);
58 
/** Convert an EDDL Tensor into an ECVL Image.
    @param t   Source tensor (pointer to const).
    @param img Output image.
*/
71 void TensorToImage(const Tensor* t, Image& img);
72 
/** Convert an EDDL Tensor into an ECVL View.
    @param t Source tensor (pointer to const).
    @param v Output view; its element type is fixed to DataType::float32 by the signature.
*/
85 void TensorToView(const Tensor* t, View<DataType::float32>& v);
86 
96 {
// Augmentations of the dataset, one entry per split index (Apply looks entries up by split).
// An entry may be null: Apply tests the pointer before using it.
97  std::vector<std::shared_ptr<Augmentation>> augs_;
98 public:
/** Build the container from a vector of shared pointers.
    The vector is copied, so the Augmentation objects themselves are shared with the caller (no Clone here).
    The constructor is not `explicit`, hence a vector converts implicitly to DatasetAugmentations.
*/
99  DatasetAugmentations(const std::vector<std::shared_ptr<Augmentation>>& augs) : augs_(augs) {}
100 
101  // This makes a deep copy of the Augmentations
103  {
104  for (const auto& a : other.augs_) {
105  augs_.emplace_back(a ? a->Clone() : nullptr);
106  }
107  }
108 
109  // Getters: YAGNI
110 
111  bool Apply(const int split, Image& img, const Image& gt = Image())
112  {
113  // check if the augs for split st are provided
114  try {
115  if (augs_.at(split)) {
116  augs_[split]->Apply(img, gt);
117  return true;
118  }
119  return false;
120  }
121  catch (const std::out_of_range) {
123  }
124  }
125 
/** Overload of Apply that takes the split as a SplitType instead of an integer index.
    It converts `st` with unary `+` and forwards everything to Apply(const int, Image&, const Image&),
    returning that call's result.
    NOTE(review): relies on a unary operator+ for SplitType declared elsewhere, presumably yielding
    the integer index of the split — confirm.
*/
126  bool Apply(SplitType st, Image& img, const Image& gt = Image())
127  {
128  return Apply(+st, img, gt); // Magic + operator
129  }
130 
/** Tell whether no augmentation entry is stored at all.
    @return true if augs_ has zero elements. A container holding only null entries is NOT empty.
*/
131  bool IsEmpty() const
132  {
133  return augs_.empty();
134  }
135 };
136 
144 {
// Signalled by Push after an element is inserted; Pop waits on it while the queue is empty.
145  std::condition_variable cond_notempty_;
// Push waits on it while the queue holds max_size_ elements; Pop signals it once the size drops below threshold_.
146  std::condition_variable cond_notfull_;
// Guards cpq_. Declared mutable so that the const query methods can lock it.
147  mutable std::mutex mutex_;
// Stored elements: a Sample plus two raw Tensor pointers (image and label).
// NOTE(review): the tuple holds raw pointers, so ownership of the tensors is not expressed by the type — confirm who deletes them.
148  std::queue<std::tuple<Sample, Tensor*, Tensor*>> cpq_;
// Capacity: Push blocks while cpq_.size() is not smaller than this value.
149  unsigned max_size_;
// Producers are notified by Pop only when the size falls below this value.
150  unsigned threshold_;
152 public:
/** Build a queue with capacity `mxsz` and threshold mxsz / 2.
    threshold_ is initialized from max_size_, which is valid because max_size_ is declared before threshold_.
*/
157  ProducersConsumerQueue(unsigned mxsz) : max_size_(mxsz), threshold_(max_size_ / 2) {}
/** Build a queue with capacity `mxsz` and an explicit threshold `thresh`. */
162  ProducersConsumerQueue(unsigned mxsz, unsigned thresh) : max_size_(mxsz), threshold_(thresh) {}
163 
/** Push a sample in the queue.
    Blocks the calling thread while the queue already holds max_size_ elements.
    @param sample Sample to store (copied into the queue).
    @param image  Pointer to the image tensor; only the pointer is stored.
    @param label  Pointer to the label tensor; only the pointer is stored.
*/
172  void Push(const Sample& sample, Tensor* const image, Tensor* const label)
173  {
174  std::unique_lock<std::mutex> lock(mutex_);
// The predicate form of wait() re-checks the condition after every wake-up
175  cond_notfull_.wait(lock, [this]() { return cpq_.size() < max_size_; });
176  cpq_.push(std::make_tuple(sample, image, label));
// One new element is available: waking a single waiting consumer is enough
177  cond_notempty_.notify_one();
178  }
179 
/** Pop a sample from the queue.
    Blocks the calling thread while the queue is empty.
    @param sample Receives a copy of the stored Sample.
    @param image  Receives the stored image tensor pointer.
    @param label  Receives the stored label tensor pointer.
*/
189  void Pop(Sample& sample, Tensor*& image, Tensor*& label)
190  {
191  std::unique_lock<std::mutex> lock(mutex_);
192  cond_notempty_.wait(lock, [this]() { return !cpq_.empty(); });
// Copy the front element into the output arguments before removing it
193  std::tie(sample, image, label) = cpq_.front();
194  cpq_.pop();
// Producers are woken only once the size is below threshold_, not after every single pop
195  if (cpq_.size() < threshold_) {
196  cond_notfull_.notify_all();
197  }
198  }
199 
/** Check if the queue is full.
    @return true if the number of stored elements is at least max_size_.
    The answer is computed under mutex_, but may be outdated as soon as the lock is released.
*/
204  bool IsFull() const
205  {
206  std::unique_lock<std::mutex> lock(mutex_);
207  return cpq_.size() >= max_size_;
208  }
209 
/** Check if the queue is empty.
    @return true if no element is stored.
    The answer is computed under mutex_, but may be outdated as soon as the lock is released.
*/
214  bool IsEmpty() const
215  {
216  std::unique_lock<std::mutex> lock(mutex_);
217  return cpq_.empty();
218  }
219 
/** Calculate the current size of the queue.
    @return Number of elements stored at the moment of the call (read under mutex_).
*/
224  size_t Length() const
225  {
226  std::unique_lock<std::mutex> lock(mutex_);
227  return cpq_.size();
228  }
229 
235  void SetSize(int max_size, int thresh = -1)
236  {
237  max_size_ = max_size;
238  threshold_ = thresh != -1 ? thresh : max_size / 2;
239  }
240 
/** Remove every element still stored in the queue.
    NOTE(review): the Tensor pointers held by the removed tuples are dropped without being deleted;
    if the queue is meant to own them this leaks — confirm ownership.
    NOTE(review): no condition variable is notified here, so a producer blocked in Push on a full
    queue is not woken by Clear — confirm Clear is only called while no producer is running.
*/
241  void Clear()
242  {
243  std::unique_lock<std::mutex> lock(mutex_);
244 
245  // Remove residual samples (the tuples are popped; the tensors they point to are not deleted here)
246  while (!cpq_.empty()) {
247  cpq_.pop();
248  }
249  }
250 };
251 
259 {
260 public:
// Index of the sample currently used by the thread.
261  int counter_;
// Indices of samples managed by the thread in the interval [min_, max_).
262  int min_, max_;
/** Build the counters for the interval [min, max); the current index starts at `min`. */
264  ThreadCounters(int min, int max) : counter_{ min }, min_{ min }, max_{ max } {}
/** Build the counters for the interval [min, max) with an explicit current index `counter`.
    No check is made that `counter` lies inside the interval.
*/
265  ThreadCounters(int counter, int min, int max) : counter_{ counter }, min_{ min }, max_{ max } {}
/** Reset the thread counter to its minimum value. */
266  void Reset() { counter_ = min_; }
267 };
268 
275 class DLDataset : public Dataset
276 {
// Value of std::thread::hardware_concurrency() taken when the object is built; used as upper bound for num_workers_.
// NOTE(review): the standard allows hardware_concurrency() to return 0 when the value is not computable;
// std::min(num_workers, processor_count_) would then yield 0 workers — confirm this case is handled.
277  const unsigned processor_count_ = std::thread::hardware_concurrency();
278 protected:
// Number of batches already loaded for each split.
280  std::vector<int> current_batch_;
// Number of parallel workers.
284  unsigned num_workers_;
// Shape of sample and label tensors (first: samples, second: labels).
286  std::pair< std::vector<int>, std::vector<int>> tensors_shape_;
// One vector of ThreadCounters per dataset split; InitTC fills the entry of a given split.
287  std::vector<std::vector<ThreadCounters>> splits_tc_;
// Vector of threads representing the samples producers.
288  std::vector<std::thread> producers_;
// Whether the threads have already been launched or not.
289  bool active_ = false;
// Mutex for active_ variable.
290  std::mutex active_mutex_;
// Engine used for random number generation; static, hence shared by all DLDataset objects (see SetSplitSeed).
291  static std::default_random_engine re_;
/** Set which indices of the samples are managed by each thread.
    @param split_index Index of the split whose thread counters are (re)initialized.
*/
297  void InitTC(int split_index);
298 
301  {
302  switch (task_) {
304  tensors_shape_ = std::make_pair<std::vector<int>, std::vector<int>>(
306  { batch_size_, vsize(classes_) });
307  break;
308  case Task::segmentation:
309  tensors_shape_ = std::make_pair<std::vector<int>, std::vector<int>>(
312  break;
313  }
314  }
315 
316 public:
// Number of channels of the ground truth images. Stays -1 until the constructor loads a ground truth
// (first sample with a label path) or SetNumChannels is called.
318  int n_channels_gt_ = -1;
// Dimensions (HxW) to which Dataset images must be resized; the constructor fills it with the
// 'y' and 'x' extents of the first image after its augmentations.
319  std::vector<int> resize_dims_;
/** Build a deep learning dataset from a dataset file.
    @param filename         Path of the dataset file, forwarded to the Dataset base class.
    @param batch_size       Size of each mini batch.
    @param augs             Augmentations to apply for each split (copied into augs_).
    @param ctype            Color type used to load the images (default RGB).
    @param ctype_gt         Color type used to load the ground truth images (default GRAY).
    @param num_workers      Requested number of workers; capped to processor_count_.
    @param queue_ratio_size Multiplier of the queue capacity: capacity = batch_size * queue_ratio_size * workers,
                            truncated to unsigned.
    @param drop_last        Optional per-split drop_last flags; used only when its size equals the number of splits.
    @param verify           Forwarded to the Dataset base class.
    NOTE(review): samples_[0] is accessed without checking that samples_ is not empty — confirm the base
    class guarantees at least one sample.
    NOTE(review): num_workers == 0 is not rejected here.
*/
332  DLDataset(const filesystem::path& filename,
333  const int batch_size,
334  const DatasetAugmentations& augs,
335  const ColorType ctype = ColorType::RGB,
336  const ColorType ctype_gt = ColorType::GRAY,
337  const unsigned num_workers = 1,
338  const double queue_ratio_size = 1.,
339  const std::vector<bool>& drop_last = {},
340  bool verify = false) :
341 
// NOTE(review): members are initialized in declaration order, not in the order written below;
// the order written here appears to differ from the declaration order — harmless as long as
// no initializer reads another of these members.
342  Dataset{ filename, verify },
343  batch_size_{ batch_size },
344  augs_(augs),
345  num_workers_{ std::min(num_workers, processor_count_) },
346  ctype_{ ctype },
347  ctype_gt_{ ctype_gt },
// The capped worker count is recomputed here instead of reading num_workers_
348  queue_{ static_cast<unsigned>(batch_size * queue_ratio_size * std::min(num_workers, processor_count_)) }
349  {
350  // resize current_batch_ to the number of splits and initialize it with 0
351  current_batch_.resize(split_.size(), 0);
352 
353  // Initialize n_channels_ from the first sample, loaded with the requested color type
354  Image tmp = samples_[0].LoadImage(ctype);
355  n_channels_ = tmp.Channels();
356 
357  if (!split_.empty()) {
358  current_split_ = 0;
359  // Initialize resize_dims_ from the first image, after its augmentations have been applied
360  if (augs_.IsEmpty()) {
361  std::cout << ECVL_WARNING_MSG << "Augmentations are empty!" << std::endl;
362  }
363  else {
// Advance current_split_ to the first split whose augmentation is not null (Apply returns false for a null entry).
// As a side effect current_split_ keeps that value after construction.
// NOTE(review): if every entry is null the index runs past the end of the augmentations; Apply is
// then expected to throw — confirm.
364  while (!augs_.Apply(current_split_, tmp)) {
365  ++current_split_;
366  }
367  }
// Positions of the 'y' and 'x' axes inside the channels string of the (augmented) image
368  auto y = tmp.channels_.find('y');
369  auto x = tmp.channels_.find('x');
// Checked only in builds where assert is enabled
370  assert(y != std::string::npos && x != std::string::npos);
371  resize_dims_.insert(resize_dims_.begin(), { tmp.dims_[y],tmp.dims_[x] });
372 
373  // Initialize n_channels_gt_ if the first sample has a ground truth
374  if (samples_[0].label_path_ != nullopt) {
375  n_channels_gt_ = samples_[0].LoadImage(ctype_gt_, true).Channels();
376  }
377  }
378  else {
379  std::cout << ECVL_WARNING_MSG << "Missing splits in the dataset file." << std::endl;
380  }
381 
382  // Set drop_last parameter for each split
383  if (!drop_last.empty()) {
384  if (vsize(drop_last) == vsize(split_)) {
385  for (int i = 0; i < vsize(drop_last); ++i) {
386  split_[i].drop_last_ = drop_last[i];
387  }
388  }
389  else {
// Size mismatch: the provided flags are ignored and only a warning is printed
390  std::cout << ECVL_WARNING_MSG << "drop_last is not empty but the provided size is different from the size of the splits. The default value 'false' is set for all the splits" << std::endl;
391  }
392  }
393 
394  // Initialize num_batches, last_batch and the ThreadCounters for each split
395  auto s_index = 0;
396  splits_tc_ = std::vector<std::vector<ThreadCounters>>(vsize(split_));
397  for (auto& s : split_) {
398  s.SetNumBatches(batch_size_);
399  s.SetLastBatch(batch_size_);
400 
401  InitTC(s_index);
402  ++s_index;
403  }
404 
// Compute tensors_shape_ now that the channel counts and resize_dims_ have been set above
405  SetTensorsShape();
406  }
407 
/** Reset the batch counter and optionally shuffle samples indices of the specified split.
    @param split   Split to reset, passed as ecvl::any; the default is -1.
                   NOTE(review): -1 presumably means "the current split" — confirm in the implementation.
    @param shuffle Whether to shuffle the samples indices of the split.
*/
414  void ResetBatch(const ecvl::any& split = -1, bool shuffle = false);
415 
/** Reset the batch counter of each split and optionally shuffle samples indices (within each split). */
420  void ResetAllBatches(bool shuffle = false);
421 
/** Load a batch into images and labels tensor. */
427  void LoadBatch(Tensor*& images, Tensor*& labels);
428 
/** Overload of LoadBatch that receives only the images tensor.
    NOTE(review): presumably for datasets without labels — confirm in the implementation.
*/
433  void LoadBatch(Tensor*& images);
434 
/** Set a fixed seed for the random generated values, by seeding the static engine re_.
    The engine is static, so the seed applies to every DLDataset object.
*/
439  static void SetSplitSeed(unsigned seed) { re_.seed(seed); }
440 
/** Set a new batch size inside the dataset. */
446  void SetBatchSize(int bs);
447 
/** Load a sample and its label, and push them to the producers-consumer queue.
    Virtual: derived datasets can replace how a sample is produced.
*/
454  virtual void ProduceImageLabel(DatasetAugmentations& augs, Sample& elem);
455 
/** Function called when the threads are spawned.
    @param thread_index Index of the thread.
*/
462  void ThreadFunc(int thread_index);
463 
/** Pop batch_size samples from the queue and copy them into EDDL tensors.
    @return Tuple with the samples of the batch and two shared pointers to tensors
            (NOTE(review): presumably images first, labels second — confirm).
*/
468  std::tuple<std::vector<Sample>, std::shared_ptr<Tensor>, std::shared_ptr<Tensor>> GetBatch();
469 
/** Spawn num_workers threads.
    @param split_index Split to produce samples from; the default is -1.
                       NOTE(review): -1 presumably selects the current split — confirm.
*/
474  void Start(int split_index = -1);
475 
/** Join all the threads. */
477  void Stop();
478 
/** Get the current size of the producers-consumer queue of the dataset (forwards to queue_.Length()). */
483  auto GetQueueSize() const { return queue_.Length(); };
484 
/** Set the dataset augmentations. */
489  void SetAugmentations(const DatasetAugmentations& da);
490 
/** Get the number of batches of the specified split.
    @param split Split to query, passed as ecvl::any; the default is -1.
    The `const` on the by-value return type has no effect for callers.
*/
497  const int GetNumBatches(const ecvl::any& split = -1);
498 
/** Convert the sample labels into a one-hot encoded tensor and copy it to the batch tensor. */
504  void ToTensorPlane(const std::vector<int>& label, Tensor*& tensor);
505 
510  void SetWorkers(const unsigned num_workers)
511  {
512  if (num_workers < 0) {
514  }
515 
516  num_workers_ = std::min(num_workers, processor_count_);
517  for (int i = 0; i < vsize(split_); ++i) {
518  InitTC(i);
519  }
520  }
521 
/** Change the number of channels of the Image produced by ECVL and update the internal EDDL tensors shape.
    @param n_channels    New number of channels of the images.
    @param n_channels_gt New number of channels of the ground truth images (default 1).
    No validation is performed on the two values.
*/
528  void SetNumChannels(const int n_channels, const int n_channels_gt = 1)
529  {
530  n_channels_ = n_channels;
531  n_channels_gt_ = n_channels_gt;
// Recompute tensors_shape_ so that it reflects the new channel counts
532  SetTensorsShape();
533  }
534 };
535 
/** Make a grid of images from an EDDL Tensor.
    @param t         Input tensor, passed as reference to pointer.
    @param cols      Default 8. NOTE(review): presumably the number of images per row of the grid — confirm.
    @param normalize Default false. NOTE(review): presumably whether values are rescaled before building the grid — confirm.
    @return The grid as an ECVL Image.
*/
546 Image MakeGrid(Tensor*& t, int cols = 8, bool normalize = false);
547 
551 } // namespace ecvl
552 
553 #endif // ECVL_SUPPORT_EDDL_H_
ProducersConsumerQueue(unsigned mxsz, unsigned thresh)
Definition: support_eddl.h:162
Dataset Augmentations.
Definition: support_eddl.h:95
std::vector< int > resize_dims_
Dimensions (HxW) to which Dataset images must be resized.
Definition: support_eddl.h:319
std::vector< int > current_batch_
Number of batches already loaded for each split.
Definition: support_eddl.h:280
std::vector< Split > split_
Splits of the Dataset. See Split.
Image class.
Definition: image.h:72
int batch_size_
Size of each dataset mini batch.
Definition: support_eddl.h:279
int n_channels_
Number of channels of the images.
Definition: support_eddl.h:317
void SetWorkers(const unsigned num_workers)
Change the number of workers.
Definition: support_eddl.h:510
void Pop(Sample &sample, Tensor *&image, Tensor *&label)
Pop a sample from the queue.
Definition: support_eddl.h:189
Class representing the thread counters.
Definition: support_eddl.h:258
std::pair< std::vector< int >, std::vector< int > > tensors_shape_
Shape of sample and label tensors.
Definition: support_eddl.h:286
int max_
Indices of samples managed by the thread in the interval [min_, max_).
Definition: support_eddl.h:262
Class that manages the producers-consumer queue of samples. The queue stores pairs of image and label...
Definition: support_eddl.h:143
void TensorToImage(const Tensor *t, Image &img)
Convert an EDDL Tensor into an ECVL Image.
void ToTensorPlane(const std::vector< int > &label, Tensor *&tensor)
Convert the sample labels into a one-hot encoded tensor and copy it to the batch tensor.
std::vector< std::vector< ThreadCounters > > splits_tc_
Each dataset split has its own vector of threads, each of which has its counters: <counter,...
Definition: support_eddl.h:287
int vsize(const std::vector< T > &v)
Definition: image.h:34
std::vector< std::string > classes_
Vector with all the classes available in the Dataset.
int n_channels_gt_
Number of channels of the ground truth images.
Definition: support_eddl.h:318
ColorType
Enum class representing the ECVL supported color spaces.
Definition: image.h:50
void SetBatchSize(int bs)
Set a new batch size inside the dataset.
virtual void ProduceImageLabel(DatasetAugmentations &augs, Sample &elem)
Load a sample and its label, and push them to the producers-consumer queue.
unsigned num_workers_
Number of parallel workers.
Definition: support_eddl.h:284
void InitTC(int split_index)
Set the indices of the samples managed by each thread.
void TensorToView(const Tensor *t, View< DataType::float32 > &v)
Convert an EDDL Tensor into an ECVL View.
void ImageToTensor(const Image &img, Tensor *&t)
Convert an ECVL Image into an EDDL Tensor.
size_t Length() const
Calculate the current size of the queue.
Definition: support_eddl.h:224
void SetNumChannels(const int n_channels, const int n_channels_gt=1)
Change the number of channels of the Image produced by ECVL and update the internal EDDL tensors shap...
Definition: support_eddl.h:528
bool active_
Whether the threads have already been launched or not.
Definition: support_eddl.h:289
DatasetAugmentations(const DatasetAugmentations &other)
Definition: support_eddl.h:102
std::mutex active_mutex_
Mutex for active_ variable.
Definition: support_eddl.h:290
void SetTensorsShape()
Set internal EDDL tensors shape.
Definition: support_eddl.h:300
Definition: any.h:69
static std::default_random_engine re_
Engine used for random number generation.
Definition: support_eddl.h:291
void SetSize(int max_size, int thresh=-1)
Set the maximum size of the queue and, optionally, the threshold below which the production of samples restarts.
Definition: support_eddl.h:235
int current_split_
Current split from which images are loaded.
ColorType ctype_
ecvl::ColorType of the Dataset images.
Definition: support_eddl.h:281
Task task_
Task of the dataset.
DatasetAugmentations(const std::vector< std::shared_ptr< Augmentation >> &augs)
Definition: support_eddl.h:99
int counter_
Index of the sample currently used by the thread.
Definition: support_eddl.h:261
const int GetNumBatches(const ecvl::any &split=-1)
Get the number of batches of the specified split.
ProducersConsumerQueue(unsigned mxsz)
Definition: support_eddl.h:157
#define ECVL_ERROR_WORKERS_LESS_THAN_ONE
Definition: support_eddl.h:34
void Stop()
Join all the threads.
void ResetBatch(const ecvl::any &split=-1, bool shuffle=false)
Reset the batch counter and optionally shuffle samples indices of the specified split.
Sample image in a dataset.
void SetAugmentations(const DatasetAugmentations &da)
Set the dataset augmentations.
void LoadBatch(Tensor *&images, Tensor *&labels)
Load a batch into images and labels tensor.
bool Apply(const int split, Image &img, const Image &gt=Image())
Definition: support_eddl.h:111
ThreadCounters(int min, int max)
Definition: support_eddl.h:264
ColorType ctype_gt_
ecvl::ColorType of the Dataset ground truth images.
Definition: support_eddl.h:282
bool Apply(SplitType st, Image &img, const Image &gt=Image())
Definition: support_eddl.h:126
ProducersConsumerQueue queue_
Producers-consumer queue of the dataset.
Definition: support_eddl.h:285
auto GetQueueSize() const
Get the current size of the producers-consumer queue of the dataset.
Definition: support_eddl.h:483
std::experimental::any any
Definition: any.h:71
#define ECVL_WARNING_MSG
void Reset()
Reset the thread counter to its minimum value.
Definition: support_eddl.h:266
DeepHealth Dataset.
ThreadCounters(int counter, int min, int max)
Definition: support_eddl.h:265
DatasetAugmentations augs_
ecvl::DatasetAugmentations to be applied to the Dataset images (and ground truth if exist) for each s...
Definition: support_eddl.h:283
bool IsFull() const
Check if the queue is full.
Definition: support_eddl.h:204
bool IsEmpty() const
Check if the queue is empty.
Definition: support_eddl.h:214
static void SetSplitSeed(unsigned seed)
Set a fixed seed for the random generated values. Useful to reproduce experiments with same shuffling...
Definition: support_eddl.h:439
DLDataset(const filesystem::path &filename, const int batch_size, const DatasetAugmentations &augs, const ColorType ctype=ColorType::RGB, const ColorType ctype_gt=ColorType::GRAY, const unsigned num_workers=1, const double queue_ratio_size=1., const std::vector< bool > &drop_last={}, bool verify=false)
Definition: support_eddl.h:332
SplitType
Enum class representing the Dataset supported splits.
void Push(const Sample &sample, Tensor *const image, Tensor *const label)
Push a sample in the queue.
Definition: support_eddl.h:172
void Start(int split_index=-1)
Spawn num_workers threads.
std::vector< Sample > samples_
Vector containing all the Dataset samples. See Sample.
DeepHealth Deep Learning Dataset.
Definition: support_eddl.h:275
#define ECVL_ERROR_AUG_DOES_NOT_EXIST
Definition: support_eddl.h:33
void ThreadFunc(int thread_index)
Function called when the threads are spawned.
std::tuple< std::vector< Sample >, std::shared_ptr< Tensor >, std::shared_ptr< Tensor > > GetBatch()
Pop batch_size samples from the queue and copy them into EDDL tensors.
Image MakeGrid(Tensor *&t, int cols=8, bool normalize=false)
Make a grid of images from an EDDL Tensor.
void ResetAllBatches(bool shuffle=false)
Reset the batch counter of each split and optionally shuffle samples indices (within each split).
std::vector< std::thread > producers_
Vector of threads representing the samples producers.
Definition: support_eddl.h:288