SCIP Doxygen Documentation
 
Loading...
Searching...
No Matches
tpi_openmp.c
Go to the documentation of this file.
1/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2/* */
3/* This file is part of the program and library */
4/* SCIP --- Solving Constraint Integer Programs */
5/* */
6/* Copyright (c) 2002-2023 Zuse Institute Berlin (ZIB) */
7/* */
8/* Licensed under the Apache License, Version 2.0 (the "License"); */
9/* you may not use this file except in compliance with the License. */
10/* You may obtain a copy of the License at */
11/* */
12/* http://www.apache.org/licenses/LICENSE-2.0 */
13/* */
14/* Unless required by applicable law or agreed to in writing, software */
15/* distributed under the License is distributed on an "AS IS" BASIS, */
16/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
17/* See the License for the specific language governing permissions and */
18/* limitations under the License. */
19/* */
20/* You should have received a copy of the Apache-2.0 license */
21/* along with SCIP; see the file LICENSE. If not visit scipopt.org. */
22/* */
23/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
24
25/**@file tpi_openmp.c
26 * @ingroup TASKINTERFACE
27 * @brief the interface functions for openmp
28 * @author Stephen J. Maher
29 * @author Leona Gottwald
30 */
31
32/*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
33
34#include "tpi/tpi.h"
36
37/** A job added to the queue */
39{
40 int jobid; /**< id to identify jobs from a common process */
41 struct SCIP_Job* nextjob; /**< pointer to the next job in the queue */
42 SCIP_RETCODE (*jobfunc)(void* args);/**< pointer to the job function */
43 void* args; /**< pointer to the function arguments */
44 SCIP_RETCODE retcode; /**< return code of the job */
45};
46
47/** the thread pool job queue */
49{
50 SCIP_JOB* firstjob; /**< pointer to the first job in the queue */
51 SCIP_JOB* lastjob; /**< pointer to the last job in the queue */
52 int njobs; /**< number of jobs in the queue */
53};
55
57{
58 SCIP_JOBQUEUE jobqueue; /**< queue of unprocessed jobs */
59 SCIP_JOB** currentjobs; /**< array with slot for each thread to store the currently running job */
60 int ncurrentjobs; /**< number of currently running jobs */
61 int nthreads; /**< number of threads */
62 SCIP_JOBQUEUE finishedjobs; /**< jobqueue containing the finished jobs */
63 SCIP_LOCK lock; /**< lock to protect this structure from concurrent access */
64 SCIP_CONDITION jobfinished; /**< condition to signal if a job was finished */
65};
67
68static SCIP_JOBQUEUES* _jobqueues = NULL;
69
70
71/** create job queue */
72static
74 int nthreads, /**< the number of threads */
75 int qsize, /**< the queue size */
76 SCIP_Bool blockwhenfull /**< should the queue be blocked from new jobs when full */
77 )
78{
79 int i;
80
81 assert(nthreads >= 0);
82 assert(qsize >= 0);
83 SCIP_UNUSED( blockwhenfull );
84
85 /* allocating memory for the job queue */
86 SCIP_ALLOC( BMSallocMemory(&_jobqueues) );
87 _jobqueues->jobqueue.firstjob = NULL;
88 _jobqueues->jobqueue.lastjob = NULL;
89 _jobqueues->jobqueue.njobs = 0;
90 _jobqueues->finishedjobs.firstjob = NULL;
91 _jobqueues->finishedjobs.lastjob = NULL;
92 _jobqueues->finishedjobs.njobs = 0;
93 _jobqueues->ncurrentjobs = 0;
94
95 _jobqueues->nthreads = nthreads;
97
98 for( i = 0; i < nthreads; ++i )
99 _jobqueues->currentjobs[i] = NULL;
100
101 SCIP_CALL( SCIPtpiInitLock(&_jobqueues->lock) );
103
104 return SCIP_OKAY;
105}
106
107
108/** free job queue */
109static
111 void
112 )
113{
114 assert(_jobqueues != NULL);
115
116 SCIPtpiDestroyLock(&_jobqueues->lock);
118 BMSfreeMemoryArray(&_jobqueues->currentjobs);
119
120 BMSfreeMemory(&_jobqueues);
121
122 return SCIP_OKAY;
123}
124
125
126/** execute job */
127static
129 SCIP_JOB* job /**< the job to be executed in parallel */
130 )
131{
132 int threadnum;
133
135
136 SCIP_CALL_ABORT( SCIPtpiAcquireLock(&_jobqueues->lock) );
137 _jobqueues->currentjobs[threadnum] = job;
138 SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_jobqueues->lock) );
139
140 job->retcode = (*(job->jobfunc))(job->args);
141
142 SCIP_CALL_ABORT( SCIPtpiAcquireLock(&_jobqueues->lock) );
143 _jobqueues->ncurrentjobs--;
144 _jobqueues->currentjobs[threadnum] = NULL;
145
146 /* insert job into finished jobs */
147 if( _jobqueues->finishedjobs.njobs == 0 )
148 {
149 _jobqueues->finishedjobs.firstjob = job;
150 _jobqueues->finishedjobs.lastjob = job;
151 }
152 else
153 {
154 _jobqueues->finishedjobs.lastjob->nextjob = job;
155 _jobqueues->finishedjobs.lastjob = job;
156 }
157
158 ++_jobqueues->finishedjobs.njobs;
159
161
162 SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_jobqueues->lock) );
163}
164
165
166/** process jobs from job queue
167 *
168 * The job will only be added when the number of active jobs is equal to the number of threads.
169 * As such, there will always be number of threads + 1 tasks available for the scheduler to run.
170 */
171static
173 void
174 )
175{
176 SCIP_JOB* job;
177
178 SCIP_CALL_ABORT( SCIPtpiAcquireLock(&_jobqueues->lock) );
179
180 while( _jobqueues->ncurrentjobs == SCIPtpiGetNumThreads() )
181 {
182 SCIP_CALL_ABORT( SCIPtpiWaitCondition(&_jobqueues->jobfinished, &_jobqueues->lock) );
183 }
184
185 if( _jobqueues->jobqueue.njobs == 1 )
186 {
187 job = _jobqueues->jobqueue.firstjob;
188 _jobqueues->jobqueue.firstjob = NULL;
189 _jobqueues->jobqueue.lastjob = NULL;
190 --(_jobqueues->jobqueue.njobs);
191 }
192 else if( _jobqueues->jobqueue.njobs > 1 )
193 {
194 job = _jobqueues->jobqueue.firstjob;
195 _jobqueues->jobqueue.firstjob = job->nextjob;
196 --_jobqueues->jobqueue.njobs;
197 }
198 else
199 {
200 job = NULL;
201 }
202
203 ++(_jobqueues->ncurrentjobs);
204 SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_jobqueues->lock) );
205
206 if( job )
207 {
209 }
210}
211
212
213/** adding a job to the job queue
214 *
215 * This gives some more flexibility in the handling of new jobs.
216 * IMPORTANT: This function MUST be called from within a mutex.
217 */
218static
221 )
222{
223 /* @todo we want to work out what to do with a full job queue. Is there a problem if the limit is hit? */
224 /* @note it is important to have a queuesize. This will stop the code submitting infinitely many jobs. */
225 assert(newjob != NULL);
226
227 newjob->nextjob = NULL;
228
229 /* This function queries the current job list. This could change by other threads writing to the list. So a lock is
230 * required to ensure that the current joblist remains static. */
231 SCIP_CALL( SCIPtpiAcquireLock(&_jobqueues->lock) );
232
233 /* checking the status of the job queue */
234 if( _jobqueues->ncurrentjobs == SCIPtpiGetNumThreads() )
235 {
236 if( _jobqueues->jobqueue.njobs == 0 )
237 {
238 _jobqueues->jobqueue.firstjob = newjob;
239 _jobqueues->jobqueue.lastjob = newjob;
240 }
241 else /* it is assumed that the jobqueue is not full */
242 {
243 _jobqueues->jobqueue.lastjob->nextjob = newjob;
244 _jobqueues->jobqueue.lastjob = newjob;
245 }
246
247 _jobqueues->jobqueue.njobs++;
248
249 SCIP_CALL( SCIPtpiReleaseLock(&_jobqueues->lock) );
250
251 #pragma omp task
253 }
254 else
255 {
257
258 _jobqueues->ncurrentjobs++;
259
260 SCIP_CALL( SCIPtpiReleaseLock(&_jobqueues->lock) );
261 /* running the new job */
262 #pragma omp task firstprivate(newjob)
264 }
265
266 return SCIP_OKAY;
267}
268
269
270/** signal a condition */
272 SCIP_CONDITION* condition /**< condition to signal */
273 )
274{
275 assert( condition != NULL );
276
278
279 if( condition->_waitnum > condition->_signals )
280 ++condition->_signals;
281
283
284 return SCIP_OKAY;
285}
286
287
288/** broadcast a condition */
290 SCIP_CONDITION* condition /**< condition to broadcast */
291 )
292{
293 assert( condition != NULL );
294
296 condition->_signals = condition->_waitnum;
298
299 return SCIP_OKAY;
300}
301
302
303/** wait for a condition */
305 SCIP_CONDITION* condition, /**< condition to wait for */
306 SCIP_LOCK* lock /**< corresponding lock */
307 )
308{
309 int waitnum;
310
312
314 waitnum = ++condition->_waitnum;
315
316 ++condition->_waiters;
317
318 do
319 {
321 #pragma omp taskyield
323 }
324 while( condition->_signals < waitnum );
325
326 --condition->_waiters;
327
328 if( condition->_waiters == 0 )
329 {
330 condition->_signals = 0;
331 condition->_waitnum = 0;
332 }
333
335
337
338 return SCIP_OKAY;
339}
340
341/** returns the number of threads */
343 )
344{
345 return omp_get_num_threads();
346}
347
348/** returns the thread number */
350 )
351{
352 return omp_get_thread_num();
353}
354
355/** creates a job for parallel processing */
357 SCIP_JOB** job, /**< pointer to the job that will be created */
358 int jobid, /**< the id for the current job */
359 SCIP_RETCODE (*jobfunc)(void* args),/**< pointer to the job function */
360 void* jobarg /**< the job's argument */
361 )
362{
364
365 (*job)->jobid = jobid;
366 (*job)->jobfunc = jobfunc;
367 (*job)->args = jobarg;
368 (*job)->nextjob = NULL;
369
370 return SCIP_OKAY;
371}
372
373/** get a new job id for the new set of submitted jobs */
375 void
376 )
377{
378 static int currentjobid = 0;
379 int jobid;
380
381 #pragma omp atomic capture
382 jobid = ++currentjobid;
383
384 return jobid;
385}
386
387/** submit a job for parallel processing; the return value is a globally defined status */
389 SCIP_JOB* job, /**< pointer to the job to be submitted */
390 SCIP_SUBMITSTATUS* status /**< pointer to store the submit status */
391 )
392{
393 assert(_jobqueues != NULL);
394
395 *status = SCIP_SUBMIT_SUCCESS;
397
398 return SCIP_OKAY;
399}
400
401
402/** check whether a job with the given id is currently stored in one of the per-thread currentjobs slots; reads _jobqueues without taking the lock itself, so the caller presumably holds the job queue lock (TODO confirm for all callers) */
403static
404SCIP_Bool isJobRunning(
405 int jobid /**< job id to check */
406 )
407{
408 int i;
409
410 if( _jobqueues->ncurrentjobs > 0 ) /* nothing to scan when no job is running */
411 {
412 for( i = 0; i < _jobqueues->nthreads; ++i ) /* one currentjobs slot per thread */
413 {
414 if( _jobqueues->currentjobs[i] != NULL && _jobqueues->currentjobs[i]->jobid == jobid ) /* NULL marks an idle slot */
415 return TRUE;
416 }
417 }
418
419 return FALSE;
420}
421
422
423/** check whether a job with the given id is still in the queue of unprocessed jobs; walks the list from firstjob to lastjob and reads _jobqueues without taking the lock itself */
424static
425SCIP_Bool isJobWaiting(
426 int jobid /**< job id to check */
427 )
428{
429 if( _jobqueues->jobqueue.njobs > 0 ) /* an empty queue cannot contain the job */
430 {
432 currjob = _jobqueues->jobqueue.firstjob;
433
434 do
435 {
436 if( currjob->jobid == jobid )
437 return TRUE;
438
439 if( currjob == _jobqueues->jobqueue.lastjob ) /* stop at the tail rather than testing nextjob for NULL */
440 break;
441
442 currjob = currjob->nextjob;
443 }
444 while( TRUE ); /*lint !e506*/
445 }
446
447 return FALSE;
448}
449
450
451/** blocks until all jobs of the given jobid have finished
452 * and then returns the smallest SCIP_RETCODE of all the jobs */
454 int jobid /**< the jobid of the jobs to wait for */
455 )
456{
457 SCIP_RETCODE retcode;
458
459 retcode = SCIP_OKAY;
460 SCIP_CALL( SCIPtpiAcquireLock(&_jobqueues->lock) );
461
462 while( isJobRunning(jobid) || isJobWaiting(jobid) )
463 {
464 SCIP_CALL( SCIPtpiWaitCondition(&_jobqueues->jobfinished, &_jobqueues->lock) );
465 }
466
467 if( _jobqueues->finishedjobs.njobs > 0 )
468 {
469 SCIP_JOB* currjob = _jobqueues->finishedjobs.firstjob;
471
472 /* finding the location of the processed job in the finished jobs queue */
473 do
474 {
475 if( currjob->jobid == jobid )
476 {
477 SCIP_JOB* nextjob;
478
479 /* if the job has the right jobid collect its retcode, remove it from the finished job list, and free it */
480 retcode = MIN(retcode, currjob->retcode);
481
482 /* removing the finished job from finished jobs list */
483 if( currjob == _jobqueues->finishedjobs.firstjob )
484 _jobqueues->finishedjobs.firstjob = currjob->nextjob;
485 else
486 {
487 if( prevjob != NULL )
488 prevjob->nextjob = currjob->nextjob; /*lint !e613*/
489 }
490
491 if( currjob == _jobqueues->finishedjobs.lastjob )
492 _jobqueues->finishedjobs.lastjob = prevjob;
493
494 _jobqueues->finishedjobs.njobs--;
495
496 /* update currjob and free finished job; prevjob stays the same */
497 nextjob = currjob->nextjob;
499 currjob = nextjob;
500 }
501 else
502 {
504 currjob = prevjob->nextjob;
505 }
506 }
507 while( prevjob != _jobqueues->finishedjobs.lastjob );
508 }
509 else
510 {
511 /* given jobid was not submitted */
512 printf("err1");
513 retcode = SCIP_ERROR;
514 }
515
516 SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_jobqueues->lock) );
517
518 return retcode;
519}
520
521/** initializes tpi */
523 int nthreads, /**< the number of threads to be used */
524 int queuesize, /**< the size of the queue */
525 SCIP_Bool blockwhenfull /**< should the queue block when full */
526 )
527{
529 assert(_jobqueues == NULL);
530
531 SCIP_CALL( createJobQueue(nthreads, queuesize, blockwhenfull) );
532
533 return SCIP_OKAY;
534}
535
536/** deinitializes tpi */
538 void
539 )
540{
541 assert(_jobqueues != NULL);
542 assert(_jobqueues->finishedjobs.njobs == 0);
543 assert(_jobqueues->jobqueue.njobs == 0);
544 assert(_jobqueues->ncurrentjobs == 0);
545
547
548 return SCIP_OKAY;
549}
#define SCIP_UNUSED(x)
Definition def.h:442
#define SCIP_ALLOC(x)
Definition def.h:399
#define TRUE
Definition def.h:95
#define FALSE
Definition def.h:96
#define SCIP_CALL_ABORT(x)
Definition def.h:367
#define SCIP_CALL(x)
Definition def.h:388
return SCIP_OKAY
assert(minobj< SCIPgetCutoffbound(scip))
#define NULL
Definition lpi_spx1.cpp:161
memory allocation routines
#define BMSfreeMemory(ptr)
Definition memory.h:147
#define BMSallocMemoryArray(ptr, num)
Definition memory.h:125
#define BMSfreeMemoryArray(ptr)
Definition memory.h:149
#define BMSallocMemory(ptr)
Definition memory.h:120
SCIP_JOB * lastjob
Definition tpi_openmp.c:51
SCIP_JOB * firstjob
Definition tpi_openmp.c:50
SCIP_JOB ** currentjobs
Definition tpi_openmp.c:59
SCIP_JOBQUEUE finishedjobs
Definition tpi_openmp.c:62
SCIP_LOCK lock
Definition tpi_openmp.c:63
SCIP_JOBQUEUE jobqueue
Definition tpi_openmp.c:58
SCIP_CONDITION jobfinished
Definition tpi_openmp.c:64
SCIP_RETCODE retcode
Definition tpi_openmp.c:44
struct SCIP_Job * nextjob
Definition tpi_openmp.c:41
void * args
Definition tpi_openmp.c:43
SCIP_RETCODE(* jobfunc)(void *args)
Definition tpi_openmp.c:42
int jobid
Definition tpi_openmp.c:40
the type definitions for the SCIP parallel interface
SCIP_RETCODE SCIPtpiInitCondition(SCIP_LOCK *lock)
SCIP_RETCODE SCIPtpiAcquireLock(SCIP_LOCK *lock)
SCIP_RETCODE SCIPtpiInitLock(SCIP_LOCK *lock)
void SCIPtpiDestroyLock(SCIP_LOCK *lock)
void SCIPtpiDestroyCondition(SCIP_LOCK *lock)
SCIP_RETCODE SCIPtpiReleaseLock(SCIP_LOCK *lock)
SCIP_RETCODE SCIPtpiWaitCondition(SCIP_CONDITION *condition, SCIP_LOCK *lock)
Definition tpi_openmp.c:304
SCIP_RETCODE SCIPtpiCreateJob(SCIP_JOB **job, int jobid, SCIP_RETCODE(*jobfunc)(void *args), void *jobarg)
Definition tpi_openmp.c:356
SCIP_RETCODE SCIPtpiSignalCondition(SCIP_CONDITION *condition)
Definition tpi_openmp.c:271
static SCIP_RETCODE jobQueueAddJob(SCIP_JOB *newjob)
Definition tpi_openmp.c:219
int SCIPtpiGetNumThreads()
Definition tpi_openmp.c:342
SCIP_RETCODE SCIPtpiExit(void)
Definition tpi_openmp.c:537
static void executeJob(SCIP_JOB *job)
Definition tpi_openmp.c:128
SCIP_RETCODE SCIPtpiBroadcastCondition(SCIP_CONDITION *condition)
Definition tpi_openmp.c:289
static SCIP_RETCODE freeJobQueue(void)
Definition tpi_openmp.c:110
static void jobQueueProcessJob(void)
Definition tpi_openmp.c:172
SCIP_RETCODE SCIPtpiCollectJobs(int jobid)
Definition tpi_openmp.c:453
SCIP_RETCODE SCIPtpiSumbitJob(SCIP_JOB *job, SCIP_SUBMITSTATUS *status)
Definition tpi_openmp.c:388
static SCIP_RETCODE createJobQueue(int nthreads, int qsize, SCIP_Bool blockwhenfull)
Definition tpi_openmp.c:73
static SCIP_Bool isJobRunning(int jobid)
Definition tpi_openmp.c:404
static SCIP_Bool isJobWaiting(int jobid)
Definition tpi_openmp.c:425
int SCIPtpiGetNewJobID(void)
Definition tpi_openmp.c:374
int SCIPtpiGetThreadNum()
Definition tpi_openmp.c:349
SCIP_RETCODE SCIPtpiInit(int nthreads, int queuesize, SCIP_Bool blockwhenfull)
Definition tpi_openmp.c:522
@ SCIP_ERROR
enum SCIP_Retcode SCIP_RETCODE
enum SCIP_Submitstatus SCIP_SUBMITSTATUS
Definition type_tpi.h:54
@ SCIP_SUBMIT_SUCCESS
Definition type_tpi.h:52