thread_pool_common.h (Halide 20.0.0 runtime source listing)
1 #define EXTENDED_DEBUG 0
2 
3 #if EXTENDED_DEBUG
4 // This code is currently set up for Linux debugging. Switch to using pthread_self on e.g. Mac OS X.
5 extern "C" int syscall(int);
6 
7 namespace {
8 int gettid() {
9 #ifdef BITS_32
10  return syscall(224);
11 #else
12  return syscall(186);
13 #endif
14 }
15 } // namespace
16 
17 // clang-format off
18 #define log_message(stuff) do { print(nullptr) << gettid() << ": " << stuff << "\n"; } while (0)
19 // clang-format on
20 
21 #else
22 
23 // clang-format off
24 #define log_message(stuff) do { /*nothing*/ } while (0)
25 // clang-format on
26 
27 #endif
28 
29 namespace Halide {
30 namespace Runtime {
31 namespace Internal {
32 
33 // A condition variable, augmented with a bit of spinning on an atomic counter
34 // before going to sleep for real. This helps reduce overhead at the end of a
35 // parallel for loop when idle worker threads are waiting for other threads to
36 // finish so that the next parallel for loop can begin.
37 struct halide_cond_with_spinning {
38  halide_cond cond;
39  uintptr_t counter;
40 
41  void wait(halide_mutex *mutex) {
42  // First spin for a bit, checking the counter for another thread to bump
43  // it.
44  uintptr_t initial;
45  Synchronization::atomic_load_relaxed(&counter, &initial);
46  halide_mutex_unlock(mutex);
47  for (int spin = 0; spin < 40; spin++) {
48  halide_thread_yield();
49  uintptr_t current;
50  Synchronization::atomic_load_relaxed(&counter, &current);
51  if (current != initial) {
52  halide_mutex_lock(mutex);
53  return;
54  }
55  }
56 
57  // Give up on spinning and relock the mutex preparing to sleep for real.
58  halide_mutex_lock(mutex);
59 
60  // Check one final time with the lock held. This guarantees we won't
61  // miss an increment of the counter because it is only ever incremented
62  // with the lock held.
63  uintptr_t current;
64  Synchronization::atomic_load_relaxed(&counter, &current);
65  if (current != initial) {
66  return;
67  }
68 
69  halide_cond_wait(&cond, mutex);
70  }
71 
72  void broadcast() {
73  // Release any spinning waiters
74  Synchronization::atomic_fetch_add_acquire_release(&counter, (uintptr_t)1);
75 
76  // Release any sleeping waiters
77  halide_cond_broadcast(&cond);
78  }
79 
80  // Note that this cond var variant doesn't have signal(), because it always
81  // wakes all spinning waiters.
82 };
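// Example (illustrative sketch, not part of thread_pool_common.h): how wait()
// and broadcast() are meant to be paired. Both sides hold the same mutex, so
// the final counter check in wait() cannot miss an increment. The names
// "state_mutex", "item_ready", "items" and both functions are hypothetical.
halide_mutex state_mutex;
halide_cond_with_spinning item_ready;
int items = 0;

void consume_one() {
    halide_mutex_lock(&state_mutex);
    while (items == 0) {
        item_ready.wait(&state_mutex);  // spins briefly, then sleeps on the cond var
    }
    items--;
    halide_mutex_unlock(&state_mutex);
}

void produce_one() {
    halide_mutex_lock(&state_mutex);
    items++;
    item_ready.broadcast();  // bumps the spin counter, then wakes any sleepers
    halide_mutex_unlock(&state_mutex);
}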
83 
84 struct work {
85  halide_parallel_task_t task;
86 
87  // If we come in to the task system via do_par_for we just have a
88  // halide_task_t, not a halide_loop_task_t.
89  halide_task_t task_fn;
90 
91  work *next_job;
92  work *siblings;
93  int sibling_count;
94  work *parent_job;
95  int threads_reserved;
96 
97  void *user_context;
98  int active_workers;
99  int exit_status;
100  int next_semaphore;
101  // which condition variable is the owner sleeping on. nullptr if it isn't sleeping.
102  bool owner_is_sleeping;
103 
104  ALWAYS_INLINE bool make_runnable() {
105  for (; next_semaphore < task.num_semaphores; next_semaphore++) {
106  if (!halide_default_semaphore_try_acquire(task.semaphores[next_semaphore].semaphore,
107  task.semaphores[next_semaphore].count)) {
108  // Note that we don't release the semaphores already
109  // acquired. We never have two consumers contending
110  // over the same semaphore, so it's not helpful to do
111  // so.
112  return false;
113  }
114  }
115  // Future iterations of this task need to acquire the semaphores from scratch.
116  next_semaphore = 0;
117  return true;
118  }
119 
120  ALWAYS_INLINE bool running() const {
121  return task.extent || active_workers;
122  }
123 };
124 
125 ALWAYS_INLINE int clamp_num_threads(int threads) {
126  if (threads > MAX_THREADS) {
127  return MAX_THREADS;
128  } else if (threads < 1) {
129  return 1;
130  } else {
131  return threads;
132  }
133 }
134 
135 WEAK int default_desired_num_threads() {
136  char *threads_str = getenv("HL_NUM_THREADS");
137  if (!threads_str) {
138  // Legacy name for HL_NUM_THREADS
139  threads_str = getenv("HL_NUMTHREADS");
140  }
141  return threads_str ?
142  atoi(threads_str) :
143  halide_host_cpu_count();
144 }
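// Example (illustrative sketch, not part of thread_pool_common.h): the pool
// size can be pinned from the environment before the process starts,
//     HL_NUM_THREADS=4 ./my_pipeline
// or from code, which overrides the environment-derived default:
//     halide_set_num_threads(4);
// Passing 0 to halide_set_num_threads falls back to the default computed here.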
145 
146 // The work queue and thread pool are weak, so one big work queue is shared by all Halide functions
147 struct work_queue_t {
148  // all fields are protected by this mutex.
149  halide_mutex mutex;
150 
151  // The desired number of threads doing work (HL_NUM_THREADS).
152  int desired_threads_working;
153 
154  // All fields after this must be zero in the initial state. See assert_zeroed
155  // Field serves both to mark the offset in struct and as layout padding.
156  int zero_marker;
157 
158  // Singly linked list for job stack
159  work *jobs;
160 
161  // The number of threads created
162  int threads_created;
163 
164  // Workers sleep on one of two condition variables, to make it
165  // easier to wake up the right number if a small number of tasks
166  // are enqueued. There are A-team workers and B-team workers. The
167  // following variables track the current size and the desired size
168  // of the A team.
169  int a_team_size, target_a_team_size;
170 
171  // The condition variables that workers and owners sleep on. We
172  // may want to wake them up independently. Any code that may
173  // invalidate any of the reasons a worker or owner may have slept
174  // must signal or broadcast the appropriate condition variable.
175  halide_cond_with_spinning wake_a_team, wake_b_team, wake_owners;
176 
177  // The number of sleeping workers and owners. An over-estimate - a
178  // waking-up thread may not have decremented this yet.
179  int workers_sleeping, owners_sleeping;
180 
181  // Keep track of threads so they can be joined at shutdown
182  halide_thread *threads[MAX_THREADS];
183 
184  // Global flags indicating the threadpool should shut down, and
185  // whether the thread pool has been initialized.
186  bool shutdown, initialized;
187 
188  // The number of threads that are currently committed to possibly block
189  // via outstanding jobs queued or being actively worked on. Used to limit
190  // the number of iterations of parallel for loops that are invoked so as
191  // to prevent deadlock due to oversubscription of threads.
192  int threads_reserved;
193 
194  ALWAYS_INLINE bool running() const {
195  return !shutdown;
196  }
197 
198  // Used to check initial state is correct.
199  ALWAYS_INLINE void assert_zeroed() const {
200  // Assert that all fields except the mutex and desired threads count are zeroed.
201  const char *bytes = ((const char *)&this->zero_marker);
202  const char *limit = ((const char *)this) + sizeof(work_queue_t);
203  while (bytes < limit && *bytes == 0) {
204  bytes++;
205  }
206  halide_abort_if_false(nullptr, bytes == limit && "Logic error in thread pool work queue initialization.\n");
207  }
208 
209  // Return the work queue to initial state. Must be called while locked
210  // and queue will remain locked.
211  ALWAYS_INLINE void reset() {
212  // Ensure all fields except the mutex and desired threads count are zeroed.
213  char *bytes = ((char *)&this->zero_marker);
214  char *limit = ((char *)this) + sizeof(work_queue_t);
215  memset(bytes, 0, limit - bytes);
216  }
217 };
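// Example (illustrative sketch, not part of thread_pool_common.h): the
// zero_marker idiom generalizes to any struct whose resettable tail is plain
// old data. Fields declared after the marker can be checked or cleared as a
// single byte span; "example_state_t" and its members are hypothetical.
struct example_state_t {
    halide_mutex mutex;  // survives reset()
    int zero_marker;     // everything from here on starts (and resets to) zero
    int counter;
    void *head;

    void reset() {
        char *start = (char *)&zero_marker;
        memset(start, 0, ((char *)this + sizeof(example_state_t)) - start);
    }
};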
218 
219 WEAK work_queue_t work_queue = {};
220 
221 #if EXTENDED_DEBUG
222 
223 WEAK void print_job(work *job, const char *indent, const char *prefix = nullptr) {
224  if (prefix == nullptr) {
225  prefix = indent;
226  }
227  const char *name = job->task.name ? job->task.name : "<no name>";
228  const char *parent_name = job->parent_job ? (job->parent_job->task.name ? job->parent_job->task.name : "<no name>") : "<no parent job>";
229  log_message(prefix << name << "[" << job << "] serial: " << job->task.serial << " active_workers: " << job->active_workers << " min: " << job->task.min << " extent: " << job->task.extent << " siblings: " << job->siblings << " sibling count: " << job->sibling_count << " min_threads " << job->task.min_threads << " next_semaphore: " << job->next_semaphore << " threads_reserved: " << job->threads_reserved << " parent_job: " << parent_name << "[" << job->parent_job << "]");
230  for (int i = 0; i < job->task.num_semaphores; i++) {
231  log_message(indent << " semaphore " << (void *)job->task.semaphores[i].semaphore << " count " << job->task.semaphores[i].count << " val " << *(int *)job->task.semaphores[i].semaphore);
232  }
233 }
234 
235 WEAK void dump_job_state() {
236  log_message("Dumping job state, jobs in queue:");
237  work *job = work_queue.jobs;
238  while (job != nullptr) {
239  print_job(job, " ");
240  job = job->next_job;
241  }
242  log_message("Done dumping job state.");
243 }
244 
245 #else
246 
247 // clang-format off
248 #define print_job(job, indent, prefix) do { /*nothing*/ } while (0)
249 #define dump_job_state() do { /*nothing*/ } while (0)
250 // clang-format on
251 
252 #endif
253 
254 WEAK void worker_thread(void *);
255 
256 WEAK void worker_thread_already_locked(work *owned_job) {
257  while (owned_job ? owned_job->running() : !work_queue.shutdown) {
258  work *job = work_queue.jobs;
259  work **prev_ptr = &work_queue.jobs;
260 
261  if (owned_job) {
262  if (owned_job->exit_status != halide_error_code_success) {
263  if (owned_job->active_workers == 0) {
264  while (job != owned_job) {
265  prev_ptr = &job->next_job;
266  job = job->next_job;
267  }
268  *prev_ptr = job->next_job;
269  job->task.extent = 0;
270  continue; // So loop exit is always in the same place.
271  }
272  } else if (owned_job->parent_job && owned_job->parent_job->exit_status != halide_error_code_success) {
273  owned_job->exit_status = owned_job->parent_job->exit_status;
274  // The wakeup can likely be done only under certain conditions, but it only happens
275  // when an error has already occurred, and it seems more important to ensure reliable
276  // termination than to optimize this path.
277  work_queue.wake_owners.broadcast();
278  continue;
279  }
280  }
281 
282  dump_job_state();
283 
284  // Find a job to run, preferring things near the top of the stack.
285  while (job) {
286  print_job(job, "", "Considering job ");
287  // Only schedule tasks with enough free worker threads
288  // around to complete. They may get stolen later, but only
289  // by tasks which can themselves use them to complete
290  // work, so forward progress is made.
291  bool enough_threads;
292 
293  work *parent_job = job->parent_job;
294 
295  int threads_available;
296  if (parent_job == nullptr) {
297  // The + 1 is because work_queue.threads_created does not include the main thread.
298  threads_available = (work_queue.threads_created + 1) - work_queue.threads_reserved;
299  } else {
300  if (parent_job->active_workers == 0) {
301  threads_available = parent_job->task.min_threads - parent_job->threads_reserved;
302  } else {
303  threads_available = parent_job->active_workers * parent_job->task.min_threads - parent_job->threads_reserved;
304  }
305  }
306  enough_threads = threads_available >= job->task.min_threads;
307 
308  if (!enough_threads) {
309  log_message("Not enough threads for job " << job->task.name << " available: " << threads_available << " min_threads: " << job->task.min_threads);
310  }
311  bool can_use_this_thread_stack = !owned_job || (job->siblings == owned_job->siblings) || job->task.min_threads == 0;
312  if (!can_use_this_thread_stack) {
313  log_message("Cannot run job " << job->task.name << " on this thread.");
314  }
315  bool can_add_worker = (!job->task.serial || (job->active_workers == 0));
316  if (!can_add_worker) {
317  log_message("Cannot add worker to job " << job->task.name);
318  }
319 
320  if (enough_threads && can_use_this_thread_stack && can_add_worker) {
321  if (job->make_runnable()) {
322  break;
323  } else {
324  log_message("Cannot acquire semaphores for " << job->task.name);
325  }
326  }
327  prev_ptr = &(job->next_job);
328  job = job->next_job;
329  }
330 
331  if (!job) {
332  // There is no runnable job. Go to sleep.
333  if (owned_job) {
334  work_queue.owners_sleeping++;
335  owned_job->owner_is_sleeping = true;
336  work_queue.wake_owners.wait(&work_queue.mutex);
337  owned_job->owner_is_sleeping = false;
338  work_queue.owners_sleeping--;
339  } else {
340  work_queue.workers_sleeping++;
341  if (work_queue.a_team_size > work_queue.target_a_team_size) {
342  // Transition to B team
343  work_queue.a_team_size--;
344  work_queue.wake_b_team.wait(&work_queue.mutex);
345  work_queue.a_team_size++;
346  } else {
347  work_queue.wake_a_team.wait(&work_queue.mutex);
348  }
349  work_queue.workers_sleeping--;
350  }
351  continue;
352  }
353 
354  log_message("Working on job " << job->task.name);
355 
356  // Increment the active_worker count so that other threads
357  // are aware that this job is still in progress even
358  // though there are no outstanding tasks for it.
359  job->active_workers++;
360 
361  if (job->parent_job == nullptr) {
362  work_queue.threads_reserved += job->task.min_threads;
363  log_message("Reserved " << job->task.min_threads << " on work queue for " << job->task.name << " giving " << work_queue.threads_reserved << " of " << work_queue.threads_created + 1);
364  } else {
365  job->parent_job->threads_reserved += job->task.min_threads;
366  log_message("Reserved " << job->task.min_threads << " on " << job->parent_job->task.name << " for " << job->task.name << " giving " << job->parent_job->threads_reserved << " of " << job->parent_job->task.min_threads);
367  }
368 
369  int result = halide_error_code_success;
370 
371  if (job->task.serial) {
372  // Remove it from the stack while we work on it
373  *prev_ptr = job->next_job;
374 
375  // Release the lock and do the task.
376  halide_mutex_unlock(&work_queue.mutex);
377  int total_iters = 0;
378  int iters = 1;
379  while (result == halide_error_code_success) {
380  // Claim as many iterations as possible
381  while ((job->task.extent - total_iters) > iters &&
382  job->make_runnable()) {
383  iters++;
384  }
385  if (iters == 0) {
386  break;
387  }
388 
389  // Do them
390  result = halide_do_loop_task(job->user_context, job->task.fn,
391  job->task.min + total_iters, iters,
392  job->task.closure, job);
393  total_iters += iters;
394  iters = 0;
395  }
396  halide_mutex_lock(&work_queue.mutex);
397 
398  job->task.min += total_iters;
399  job->task.extent -= total_iters;
400 
401  // Put it back on the job stack, if it hasn't failed.
402  if (result != halide_error_code_success) {
403  job->task.extent = 0; // Force job to be finished.
404  } else if (job->task.extent > 0) {
405  job->next_job = work_queue.jobs;
406  work_queue.jobs = job;
407  }
408  } else {
409  // Claim a task from it.
410  work myjob = *job;
411  job->task.min++;
412  job->task.extent--;
413 
414  // If there were no more tasks pending for this job, remove it
415  // from the stack.
416  if (job->task.extent == 0) {
417  *prev_ptr = job->next_job;
418  }
419 
420  // Release the lock and do the task.
421  halide_mutex_unlock(&work_queue.mutex);
422  if (myjob.task_fn) {
423  result = halide_do_task(myjob.user_context, myjob.task_fn,
424  myjob.task.min, myjob.task.closure);
425  } else {
426  result = halide_do_loop_task(myjob.user_context, myjob.task.fn,
427  myjob.task.min, 1,
428  myjob.task.closure, job);
429  }
430  halide_mutex_lock(&work_queue.mutex);
431  }
432 
433  if (result != halide_error_code_success) {
434  log_message("Thread pool saw error from task: " << (int)result);
435  }
436 
437  bool wake_owners = false;
438 
439  // If this task failed, set the exit status on the job.
440  if (result != halide_error_code_success) {
441  job->exit_status = result;
442  // Mark all siblings as also failed.
443  for (int i = 0; i < job->sibling_count; i++) {
444  log_message("Marking " << job->sibling_count << " siblings ");
445  if (job->siblings[i].exit_status == halide_error_code_success) {
446  job->siblings[i].exit_status = result;
447  wake_owners |= (job->active_workers == 0 && job->siblings[i].owner_is_sleeping);
448  }
449  log_message("Done marking siblings.");
450  }
451  }
452 
453  if (job->parent_job == nullptr) {
454  work_queue.threads_reserved -= job->task.min_threads;
455  log_message("Returned " << job->task.min_threads << " to work queue for " << job->task.name << " giving " << work_queue.threads_reserved << " of " << work_queue.threads_created + 1);
456  } else {
457  job->parent_job->threads_reserved -= job->task.min_threads;
458  log_message("Returned " << job->task.min_threads << " to " << job->parent_job->task.name << " for " << job->task.name << " giving " << job->parent_job->threads_reserved << " of " << job->parent_job->task.min_threads);
459  }
460 
461  // We are no longer active on this job
462  job->active_workers--;
463 
464  log_message("Done working on job " << job->task.name);
465 
466  if (wake_owners ||
467  (job->active_workers == 0 && (job->task.extent == 0 || job->exit_status != halide_error_code_success) && job->owner_is_sleeping)) {
468  // The job is done or some owned job failed via sibling linkage. Wake up the owner.
469  work_queue.wake_owners.broadcast();
470  }
471  }
472 }
473 
474 WEAK void worker_thread(void *arg) {
475  halide_mutex_lock(&work_queue.mutex);
476  worker_thread_already_locked((work *)arg);
477  halide_mutex_unlock(&work_queue.mutex);
478 }
479 
480 WEAK void enqueue_work_already_locked(int num_jobs, work *jobs, work *task_parent) {
481  if (!work_queue.initialized) {
482  work_queue.assert_zeroed();
483 
484  // Compute the desired number of threads to use. Other code
485  // can also mess with this value, but only when the work queue
486  // is locked.
487  if (!work_queue.desired_threads_working) {
488  work_queue.desired_threads_working = default_desired_num_threads();
489  }
490  work_queue.desired_threads_working = clamp_num_threads(work_queue.desired_threads_working);
491  work_queue.initialized = true;
492  }
493 
494  // Gather some information about the work.
495 
496  // Some tasks require a minimum number of threads to make forward
497  // progress. Also assume the blocking tasks need to run concurrently.
498  int min_threads = 0;
499 
500  // Count how many workers to wake. Start at -1 because this thread
501  // will contribute.
502  int workers_to_wake = -1;
503 
504  // Could stalled owners of other tasks conceivably help with one
505  // of these jobs?
506  bool stealable_jobs = false;
507 
508  bool job_has_acquires = false;
509  bool job_may_block = false;
510  for (int i = 0; i < num_jobs; i++) {
511  if (jobs[i].task.min_threads == 0) {
512  stealable_jobs = true;
513  } else {
514  job_may_block = true;
515  min_threads += jobs[i].task.min_threads;
516  }
517  if (jobs[i].task.num_semaphores != 0) {
518  job_has_acquires = true;
519  }
520 
521  if (jobs[i].task.serial) {
522  workers_to_wake++;
523  } else {
524  workers_to_wake += jobs[i].task.extent;
525  }
526  }
527 
528  if (task_parent == nullptr) {
529  // This is here because some top-level jobs may block, but are not accounted for
530  // in any enclosing min_threads count. In order to handle extern stages and such
531  // correctly, we likely need to make the total min_threads for an invocation of
532  // a pipeline a property of the entire thing. This approach works because we use
533  // the increased min_threads count to increase the size of the thread pool. It should
534  // even be safe against reservation races because this is happening under the work
535  // queue lock and that lock will be held into running the job. However that's many
536  // lines of code from here to there and it is not guaranteed this will be the first
537  // job run.
538  if (job_has_acquires || job_may_block) {
539  log_message("enqueue_work_already_locked adding one to min_threads.");
540  min_threads += 1;
541  }
542 
543  // Spawn more threads if necessary.
544  while (work_queue.threads_created < MAX_THREADS &&
545  ((work_queue.threads_created < work_queue.desired_threads_working - 1) ||
546  (work_queue.threads_created + 1) - work_queue.threads_reserved < min_threads)) {
547  // We might need to make some new threads, if work_queue.desired_threads_working has
548  // increased, or if there aren't enough threads to complete this new task.
549  work_queue.a_team_size++;
550  work_queue.threads[work_queue.threads_created++] =
551  halide_spawn_thread(worker_thread, nullptr);
552  }
553  log_message("enqueue_work_already_locked top level job " << jobs[0].task.name << " with min_threads " << min_threads << " work_queue.threads_created " << work_queue.threads_created << " work_queue.threads_reserved " << work_queue.threads_reserved);
554  if (job_has_acquires || job_may_block) {
555  work_queue.threads_reserved++;
556  }
557  } else {
558  log_message("enqueue_work_already_locked job " << jobs[0].task.name << " with min_threads " << min_threads << " task_parent " << task_parent->task.name << " task_parent->task.min_threads " << task_parent->task.min_threads << " task_parent->threads_reserved " << task_parent->threads_reserved);
559  halide_abort_if_false(nullptr, (min_threads <= ((task_parent->task.min_threads * task_parent->active_workers) -
560  task_parent->threads_reserved)) &&
561  "Logic error: thread over commit.\n");
562  if (job_has_acquires || job_may_block) {
563  task_parent->threads_reserved++;
564  }
565  }
566 
567  // Push the jobs onto the stack.
568  for (int i = num_jobs - 1; i >= 0; i--) {
569  // We could bubble it downwards based on some heuristics, but
570  // it's not strictly necessary to do so.
571  jobs[i].next_job = work_queue.jobs;
572  jobs[i].siblings = &jobs[0];
573  jobs[i].sibling_count = num_jobs;
574  jobs[i].threads_reserved = 0;
575  work_queue.jobs = jobs + i;
576  }
577 
578  bool nested_parallelism =
579  work_queue.owners_sleeping ||
580  (work_queue.workers_sleeping < work_queue.threads_created);
581 
582  // Wake up an appropriate number of threads
583  if (nested_parallelism || workers_to_wake > work_queue.workers_sleeping) {
584  // If there's nested parallelism going on, we just wake up
585  // everyone. TODO: make this more precise.
586  work_queue.target_a_team_size = work_queue.threads_created + 1;
587  } else {
588  work_queue.target_a_team_size = workers_to_wake;
589  }
590 
591  work_queue.wake_a_team.broadcast();
592  if (work_queue.target_a_team_size > work_queue.a_team_size) {
593  work_queue.wake_b_team.broadcast();
594  if (stealable_jobs) {
595  work_queue.wake_owners.broadcast();
596  }
597  }
598 
599  if (job_has_acquires || job_may_block) {
600  if (task_parent != nullptr) {
601  task_parent->threads_reserved--;
602  } else {
603  work_queue.threads_reserved--;
604  }
605  }
606 }
607 
608 WEAK halide_do_task_t custom_do_task = halide_default_do_task;
609 WEAK halide_do_loop_task_t custom_do_loop_task = halide_default_do_loop_task;
610 WEAK halide_do_par_for_t custom_do_par_for = halide_default_do_par_for;
611 WEAK halide_do_parallel_tasks_t custom_do_parallel_tasks = halide_default_do_parallel_tasks;
612 WEAK halide_semaphore_init_t custom_semaphore_init = halide_default_semaphore_init;
613 WEAK halide_semaphore_try_acquire_t custom_semaphore_try_acquire = halide_default_semaphore_try_acquire;
614 WEAK halide_semaphore_release_t custom_semaphore_release = halide_default_semaphore_release;
615 
616 } // namespace Internal
617 } // namespace Runtime
618 } // namespace Halide
619 
620 using namespace Halide::Runtime::Internal;
621 
622 extern "C" {
623 
624 namespace {
625 WEAK __attribute__((destructor)) void halide_thread_pool_cleanup() {
626  halide_shutdown_thread_pool();
627 }
628 } // namespace
629 
630 WEAK int halide_default_do_task(void *user_context, halide_task_t f, int idx,
631  uint8_t *closure) {
632  return f(user_context, idx, closure);
633 }
634 
635 WEAK int halide_default_do_loop_task(void *user_context, halide_loop_task_t f,
636  int min, int extent, uint8_t *closure,
637  void *task_parent) {
638  return f(user_context, min, extent, closure, task_parent);
639 }
640 
641 WEAK int halide_default_do_par_for(void *user_context, halide_task_t f,
642  int min, int size, uint8_t *closure) {
643  if (size <= 0) {
644  return halide_error_code_success;
645  }
646 
647  work job;
648  job.task.fn = nullptr;
649  job.task.min = min;
650  job.task.extent = size;
651  job.task.serial = false;
652  job.task.semaphores = nullptr;
653  job.task.num_semaphores = 0;
654  job.task.closure = closure;
655  job.task.min_threads = 0;
656  job.task.name = nullptr;
657  job.task_fn = f;
658  job.user_context = user_context;
659  job.exit_status = halide_error_code_success;
660  job.active_workers = 0;
661  job.next_semaphore = 0;
662  job.owner_is_sleeping = false;
663  job.siblings = &job; // guarantees no other job points to the same siblings.
664  job.sibling_count = 0;
665  job.parent_job = nullptr;
666  halide_mutex_lock(&work_queue.mutex);
667  enqueue_work_already_locked(1, &job, nullptr);
668  worker_thread_already_locked(&job);
669  halide_mutex_unlock(&work_queue.mutex);
670  return job.exit_status;
671 }
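// Example (illustrative sketch, not part of thread_pool_common.h): calling the
// parallel for-loop entry point directly, assuming HalideRuntime.h is included.
// "fill_row", "fill_all_rows" and "rows" are hypothetical names.
int fill_row(void *user_context, int y, uint8_t *closure) {
    float *rows = (float *)closure;
    rows[y] = (float)y;  // stand-in for real per-row work
    return halide_error_code_success;  // a non-zero result stops further iterations
}

int fill_all_rows(float *rows, int height) {
    // Runs iterations y = 0 .. height-1 across the thread pool; the calling
    // thread also participates in the work.
    return halide_do_par_for(nullptr, fill_row, 0, height, (uint8_t *)rows);
}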
672 
673 WEAK int halide_default_do_parallel_tasks(void *user_context, int num_tasks,
674  struct halide_parallel_task_t *tasks,
675  void *task_parent) {
676  work *jobs = (work *)__builtin_alloca(sizeof(work) * num_tasks);
677 
678  for (int i = 0; i < num_tasks; i++) {
679  if (tasks->extent <= 0) {
680  // Skip extent zero jobs
681  num_tasks--;
682  continue;
683  }
684  jobs[i].task = *tasks++;
685  jobs[i].task_fn = nullptr;
686  jobs[i].user_context = user_context;
687  jobs[i].exit_status = halide_error_code_success;
688  jobs[i].active_workers = 0;
689  jobs[i].next_semaphore = 0;
690  jobs[i].owner_is_sleeping = false;
691  jobs[i].parent_job = (work *)task_parent;
692  }
693 
694  if (num_tasks == 0) {
695  return halide_error_code_success;
696  }
697 
698  halide_mutex_lock(&work_queue.mutex);
699  enqueue_work_already_locked(num_tasks, jobs, (work *)task_parent);
700  int exit_status = halide_error_code_success;
701  for (int i = 0; i < num_tasks; i++) {
702  // It doesn't matter what order we join the tasks in, because
703  // we'll happily assist with siblings too.
704  worker_thread_already_locked(jobs + i);
705  if (jobs[i].exit_status != halide_error_code_success) {
706  exit_status = jobs[i].exit_status;
707  }
708  }
709  halide_mutex_unlock(&work_queue.mutex);
710  return exit_status;
711 }
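// Example (illustrative sketch, not part of thread_pool_common.h): enqueueing a
// single loop task through the richer interface. Field names follow the uses of
// halide_parallel_task_t in this file; "process_chunk" and "run_chunks" are
// hypothetical names.
int process_chunk(void *user_context, int min, int extent,
                  uint8_t *closure, void *task_parent) {
    // ... iterate over [min, min + extent) ...
    return halide_error_code_success;
}

int run_chunks(void *user_context) {
    halide_parallel_task_t t = {};
    t.fn = process_chunk;
    t.closure = nullptr;
    t.name = "process_chunk";
    t.semaphores = nullptr;
    t.num_semaphores = 0;
    t.min = 0;
    t.extent = 16;
    t.serial = false;   // iterations may run concurrently
    t.min_threads = 0;  // the task never blocks, so any worker may pick it up
    return halide_do_parallel_tasks(user_context, 1, &t, /*task_parent=*/nullptr);
}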
712 
713 WEAK int halide_set_num_threads(int n) {
714  if (n < 0) {
715  halide_error(nullptr, "halide_set_num_threads: must be >= 0.");
716  }
717  // Don't make this an atomic swap - we don't want to be changing
718  // the desired number of threads while another thread is in the
719  // middle of a sequence of non-atomic operations.
720  halide_mutex_lock(&work_queue.mutex);
721  if (n == 0) {
722  n = default_desired_num_threads();
723  }
724  int old = work_queue.desired_threads_working;
725  work_queue.desired_threads_working = clamp_num_threads(n);
726  halide_mutex_unlock(&work_queue.mutex);
727  return old;
728 }
729 
730 WEAK int halide_get_num_threads() {
731  halide_mutex_lock(&work_queue.mutex);
732  int n = work_queue.desired_threads_working;
733  halide_mutex_unlock(&work_queue.mutex);
734  return n;
735 }
736 
737 WEAK void halide_shutdown_thread_pool() {
738  if (work_queue.initialized) {
739  // Wake everyone up and tell them the party's over and it's time
740  // to go home
741  halide_mutex_lock(&work_queue.mutex);
742 
743  work_queue.shutdown = true;
744  work_queue.wake_owners.broadcast();
745  work_queue.wake_a_team.broadcast();
746  work_queue.wake_b_team.broadcast();
747  halide_mutex_unlock(&work_queue.mutex);
748 
749  // Wait until they leave
750  for (int i = 0; i < work_queue.threads_created; i++) {
751  halide_join_thread(work_queue.threads[i]);
752  }
753 
754  // Tidy up
755  work_queue.reset();
756  }
757 }
758 
759 struct halide_semaphore_impl_t {
760  int value;
761 };
762 
763 WEAK int halide_default_semaphore_init(halide_semaphore_t *s, int n) {
764  halide_semaphore_impl_t *sem = (halide_semaphore_impl_t *)s;
765  Halide::Runtime::Internal::Synchronization::atomic_store_release(&sem->value, &n);
766  return n;
767 }
768 
769 WEAK int halide_default_semaphore_release(halide_semaphore_t *s, int n) {
770  halide_semaphore_impl_t *sem = (halide_semaphore_impl_t *)s;
771  int old_val = Halide::Runtime::Internal::Synchronization::atomic_fetch_add_acquire_release(&sem->value, n);
772  // TODO(abadams|zvookin): Is this correct if an acquire can be for say count of 2 and the releases are 1 each?
773  if (old_val == 0 && n != 0) { // Don't wake if nothing released.
774  // We may have just made a job runnable
775  halide_mutex_lock(&work_queue.mutex);
776  work_queue.wake_a_team.broadcast();
777  work_queue.wake_owners.broadcast();
778  halide_mutex_unlock(&work_queue.mutex);
779  }
780  return old_val + n;
781 }
782 
783 WEAK bool halide_default_semaphore_try_acquire(halide_semaphore_t *s, int n) {
784  if (n == 0) {
785  return true;
786  }
787  halide_semaphore_impl_t *sem = (halide_semaphore_impl_t *)s;
788  // Decrement and get new value
789  int expected;
790  int desired;
791  Halide::Runtime::Internal::Synchronization::atomic_load_acquire(&sem->value, &expected);
792  do {
793  desired = expected - n;
794  } while (desired >= 0 &&
795  !Halide::Runtime::Internal::Synchronization::atomic_cas_weak_relacq_relaxed(&sem->value, &expected, &desired));
796  return desired >= 0;
797 }
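// Example (illustrative sketch, not part of thread_pool_common.h): a minimal
// producer/consumer handshake on a halide_semaphore_t. try_acquire never
// blocks; it only reports whether the count could be decremented.
// "semaphore_sketch" is a hypothetical name.
void semaphore_sketch() {
    halide_semaphore_t sema;
    halide_semaphore_init(&sema, 0);     // nothing available yet
    halide_semaphore_release(&sema, 1);  // producer publishes one item
    if (halide_semaphore_try_acquire(&sema, 1)) {
        // consumer may now use the item
    }
}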
798 
799 WEAK halide_do_task_t halide_set_custom_do_task(halide_do_task_t f) {
800  halide_do_task_t result = custom_do_task;
801  custom_do_task = f;
802  return result;
803 }
804 
805 WEAK halide_do_loop_task_t halide_set_custom_do_loop_task(halide_do_loop_task_t f) {
806  halide_do_loop_task_t result = custom_do_loop_task;
807  custom_do_loop_task = f;
808  return result;
809 }
810 
811 WEAK halide_do_par_for_t halide_set_custom_do_par_for(halide_do_par_for_t f) {
812  halide_do_par_for_t result = custom_do_par_for;
813  custom_do_par_for = f;
814  return result;
815 }
816 
817 WEAK void halide_set_custom_parallel_runtime(
818  halide_do_par_for_t do_par_for,
819  halide_do_task_t do_task,
820  halide_do_loop_task_t do_loop_task,
821  halide_do_parallel_tasks_t do_parallel_tasks,
822  halide_semaphore_init_t semaphore_init,
823  halide_semaphore_try_acquire_t semaphore_try_acquire,
824  halide_semaphore_release_t semaphore_release) {
825 
826  custom_do_par_for = do_par_for;
827  custom_do_task = do_task;
828  custom_do_loop_task = do_loop_task;
829  custom_do_parallel_tasks = do_parallel_tasks;
830  custom_semaphore_init = semaphore_init;
831  custom_semaphore_try_acquire = semaphore_try_acquire;
832  custom_semaphore_release = semaphore_release;
833 }
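// Example (illustrative sketch, not part of thread_pool_common.h): replacing
// the default thread pool with a serial loop, e.g. for deterministic debugging.
// "serial_do_par_for" and "install_serial_runtime" are hypothetical names.
int serial_do_par_for(void *user_context, halide_task_t f,
                      int min, int size, uint8_t *closure) {
    for (int i = min; i < min + size; i++) {
        int result = halide_do_task(user_context, f, i, closure);
        if (result != halide_error_code_success) {
            return result;
        }
    }
    return halide_error_code_success;
}

void install_serial_runtime() {
    // Only the par_for hook is swapped; the other hooks keep their defaults.
    halide_set_custom_do_par_for(serial_do_par_for);
}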
834 
835 WEAK int halide_do_task(void *user_context, halide_task_t f, int idx,
836  uint8_t *closure) {
837  return (*custom_do_task)(user_context, f, idx, closure);
838 }
839 
840 WEAK int halide_do_par_for(void *user_context, halide_task_t f,
841  int min, int size, uint8_t *closure) {
842  return (*custom_do_par_for)(user_context, f, min, size, closure);
843 }
844 
845 WEAK int halide_do_loop_task(void *user_context, halide_loop_task_t f,
846  int min, int size, uint8_t *closure, void *task_parent) {
847  return custom_do_loop_task(user_context, f, min, size, closure, task_parent);
848 }
849 
850 WEAK int halide_do_parallel_tasks(void *user_context, int num_tasks,
851  struct halide_parallel_task_t *tasks,
852  void *task_parent) {
853  return custom_do_parallel_tasks(user_context, num_tasks, tasks, task_parent);
854 }
855 
856 WEAK int halide_semaphore_init(struct halide_semaphore_t *sema, int count) {
857  return custom_semaphore_init(sema, count);
858 }
859 
860 WEAK int halide_semaphore_release(struct halide_semaphore_t *sema, int count) {
861  return custom_semaphore_release(sema, count);
862 }
863 
864 WEAK bool halide_semaphore_try_acquire(struct halide_semaphore_t *sema, int count) {
865  return custom_semaphore_try_acquire(sema, count);
866 }
867 }