source: trunk/src/libsrc/util/priority_queue.c @ 7513

Revision 7513, 17.8 KB checked in by baker, 6 months ago (diff)

cleanup warning: variable set but not used [-Wunused-but-set-variable]

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
/*
** priority_queue.c
**
** This implements a priority transfer queue.
**
** Priority levels are defined in the .h file.
**
** The actual sorting is of the pointers in the array queue->sorted.
** Bear in mind these are pointers to the PRI_QUEUE_ENTRY elements
** stored in queue->entries; the entries themselves are never moved.
**
** The queue must first be initialized to a specified size
** using the init_pri_queue() function.  After that, the queue
** is ready for use through add_item(), peek_next_item() and
** pop_next_item().  (pop_next_item() always returns the next item
** of the highest priority.)
**
** SEE priority_queue.h for FUNCTION DESCRIPTIONS and RETURN CODES.
**
** CAUTION: This queue manages its own mutex, and thus the function
**          calls do not need to be wrapped in one by the caller.
*/
23#include <stdio.h>
24#include <stdlib.h>
25#include <string.h>
26
27#include "platform.h"
28#include "earthworm.h"
29#include "earthworm_complex_funcs.h"
30#include "priority_queue.h"
31
32/* #define LOG_DEBUG 1  */
33
34#ifdef LOG_DEBUG
35/* #define DEBUG_DETAILS 1 */
36#include <stdio.h>
37#endif
38
/* Private shift-direction codes used by add_item().
**
** The value is added directly to the loop index while rotating the
** sorted pointer array, so DOWN/UP must stay -1/+1 and NONE must stay 0.
** Keeping them here keeps the array shifting in a single location.
*/
#define EW_PRI_SHIFT_DOWN (-1)
#define EW_PRI_SHIFT_NONE   0
#define EW_PRI_SHIFT_UP     1
45
46
47/*********************************************************
48**
49**********************************************************/
50int init_pri_queue( PRI_QUEUE   * p_queue
51                  , unsigned long p_max_items
52                  , unsigned long p_max_item_size )
53{
54   int r_status = EW_PRI_RETNORMAL;
55
56   if ( p_queue == NULL )
57   {
58      r_status = EW_PRI_RETQNULL;   
59   }
60   else
61   {
62      /*
63      ** Initialize structure in case deallocation needed later
64      */
65      p_queue->queuesize = 0;
66      p_queue->itemsused = 0;
67      p_queue->sorted    = NULL;
68      p_queue->entries   = NULL;
69      p_queue->data      = NULL;
70   }
71
72   if ( p_max_items < 1 )
73   {
74      r_status = EW_PRI_RETPARAM;
75   }
76
77   if ( r_status == 0 )
78   {
79      unsigned long _idx;
80
81      p_queue->queuesize = p_max_items;
82      p_queue->itemmaxsize = p_max_item_size;
83
84      /*
85      ** Allocate space for the sorted pointers to the object storage
86      */
87      if ( r_status == EW_PRI_RETNORMAL )
88      {
89         if ( ( p_queue->sorted = (PRI_QUEUE_ENTRY **) malloc( sizeof(PRI_QUEUE_ENTRY*) * p_max_items) ) ==  NULL )
90         {
91            r_status = EW_PRI_RETMALLOC;
92         }
93      }
94
95      /*
96      ** Allocate space for the object storage
97      */
98      if ( r_status == EW_PRI_RETNORMAL )
99      {
100         if ( ( p_queue->entries = (PRI_QUEUE_ENTRY *) malloc( sizeof(PRI_QUEUE_ENTRY) * p_max_items) ) ==  NULL )
101         {
102            r_status = EW_PRI_RETMALLOC;
103         }
104      }
105
106
107      /*
108      ** Allocate space for the object storage
109      */
110      if ( r_status == EW_PRI_RETNORMAL )
111      {
112         if ( ( p_queue->data = (char *) malloc(sizeof(char) * p_max_item_size * p_max_items) ) ==  NULL )
113         {
114            r_status = EW_PRI_RETMALLOC;
115         }
116      }
117     
118
119      /*
120      ** Initialize the priority containers,
121      ** Load the container pointers into the sorted list
122      */
123      if ( r_status == EW_PRI_RETNORMAL )
124      {
125         PRI_QUEUE_ENTRY  * _entry  = p_queue->entries;
126         PRI_QUEUE_ENTRY ** _sorted = p_queue->sorted;
127         for ( _idx = 0 ; _idx < p_max_items ; _idx++, _entry++, _sorted++ ) 
128         {
129            /* set priority to indicate no valid data */
130            _entry->pri = EW_PRIORITY_NONE;
131            _entry->length = 0;
132            /* assign data storage location to queue entry */
133            _entry->data = (char *)(p_queue->data + (_idx * p_max_item_size));
134            /* grab storage pointer for sorted array */
135            *_sorted = _entry;
136         }
137      }
138
139
140      /*
141      ** Initialize priority index locations to point to the first (zero)
142      ** location in the sorted array
143      */
144      if ( r_status == EW_PRI_RETNORMAL )
145      {
146         for ( _idx = 0 ; _idx < EW_PRIORITY_COUNT ; _idx++ ) {
147            p_queue->insert_indices[_idx] = 0;
148         }
149      }
150      CreateSpecificMutex( &(p_queue->lock) );
151   }
152
153
154   if ( r_status != EW_PRI_RETNORMAL )
155   {
156      release_pri_queue( p_queue );
157   }
158#ifdef LOG_DEBUG
159   else
160   {
161      char dbgstr[120];
162            sprintf( dbgstr
163                   , "DEBUG init: %d  %d  %d\n"
164                   , p_queue->sorted, p_queue->entries, p_queue->data
165                   );
166            logit( "t" 
167                 , dbgstr
168                 , "export_pri"
169                 , "MOD_EXPORT_SCN"
170                 );
171   }
172#endif
173   return r_status;
174}
175
176
177/*********************************************************
178**
179**********************************************************/
180void release_pri_queue( PRI_QUEUE * p_queue )
181{
182   p_queue->queuesize = 0;
183   p_queue->itemsused = 0;
184
185   CloseSpecificMutex( &p_queue->lock );
186
187   if ( p_queue->sorted != NULL )
188   {
189      free( p_queue->sorted );
190      p_queue->sorted = NULL;
191   }
192
193   if ( p_queue->entries != NULL )
194   {
195      free( p_queue->entries );
196      p_queue->entries = NULL;
197   }
198
199   if ( p_queue->data != NULL )
200   {
201      free( p_queue->data );
202      p_queue->data = NULL;
203   }
204}
205
206/*********************************************************
207**
208**********************************************************/
209int getNumOfElementsInQueue( PRI_QUEUE * p_queue )
210{
211   if ( p_queue == NULL )
212   {
213      return 0;
214   }
215   return p_queue->itemsused;
216}
217
218/*********************************************************
219**
220**********************************************************/
221int add_item( PRI_QUEUE * p_queue
222            , EW_PRIORITY p_priority
223            , MSG_LOGO    p_logo
224            , long        p_size
225            , PRI_DATA    p_data
226            )
227{
228#ifdef LOG_DEBUG
229   char dbgstr[120];
230#endif
231
232   int r_status = EW_PRI_RETNORMAL;
233
234   if ( p_queue == NULL )
235   {
236      return( EW_PRI_RETQNULL );
237   }
238
239   if ( p_queue->queuesize < 1 )
240   {
241      return( EW_PRI_RETNOTREADY );
242   }
243
244   RequestSpecificMutex( &p_queue->lock );
245
246   if ( p_size < 0 || p_queue->itemmaxsize < p_size )
247   {
248      return ( EW_PRI_RETMSGSIZE );
249   }
250   else
251   {
252/*    EW_PRIORITY _usePri = p_priority; */
253
254      long _queuesize = p_queue->queuesize
255         , _ins_index = p_queue->queuesize - 1 /* array location to insert new object */
256         , _src_index   /* array location to obtain container for insertion */
257         ;
258      PRI_QUEUE_ENTRY * _wrk_pointer;  /* item to be shifted */
259      int _shift_direction = EW_PRI_SHIFT_NONE
260         , _shift_stt
261         , _shift_end
262         , _idx             /* work index for loops */
263         ;
264      int _doSwap  = 0  /* swap old object for new */
265      , _updateIdx = 0  /* update the priority insert location indices */
266      ;
267
268      if ( p_priority < EW_PRIORITY_MIN || EW_PRIORITY_MAX < p_priority )
269      {
270         /* priority out of range, fall back to default */
271/*       _usePri = EW_PRIORITY_DEF; */
272         /* keep this return status unless worse one arises */
273         r_status = EW_PRI_RETBADPRI;
274      }
275
276
277      if ( p_queue->insert_indices[p_priority] < _queuesize )
278      {
279         /*
280         ** The insert location is within the array
281         */
282         _ins_index = p_queue->insert_indices[p_priority];
283
284
285         /* Check state of item at insert location */
286         _wrk_pointer = p_queue->sorted[_ins_index];
287
288         if (   _wrk_pointer->pri == 0           /* container at insert location unused */
289             || _ins_index == (_queuesize - 1) /* insert location is last position in the array */
290            )
291         {
292            /*
293            ** Use the item at the insert location as the source container
294            **
295            ** _shift_direction = 0;
296            */
297            _doSwap = 1;
298            _updateIdx = 1;
299            if ( p_queue->itemsused < _queuesize )
300            {
301               p_queue->itemsused++;  /* increment number of items used */
302            }
303         }
304         else
305         {  /*
306            ** the insert location is within the array space
307            ** the item at the insert location is used
308            ** the insert location is not the last position in the array
309            */
310           
311            /* check if the array is full */
312            if ( p_queue->itemsused < _queuesize )
313            {
314               /*
315               ** The array is not full, therefore there is at least one
316               ** unused item in the array.
317               **
318               ** Shift items down from insert point to first unused item
319               */
320               _src_index = p_queue->itemsused++;  /* increment number of items used */
321
322               /* GET THE SOURCE CONTAINER */
323               _wrk_pointer = p_queue->sorted[_src_index];
324
325               /* SHIFT ITEMS DOWN */
326               _shift_end = _ins_index;
327               _shift_stt = _src_index;
328               _shift_direction = EW_PRI_SHIFT_DOWN;
329
330               _doSwap = 1;
331               _updateIdx = 1;
332            }
333            else
334            {
335               /*
336               ** The array is full.
337               **
338               ** Since we've already ascertained that the insert point is within the array
339               ** we know that the item to be inserted is less than the highest priority
340               ** stored in the array because there is at least one item of a higher priority
341               ** following the items at the priority of the new item.
342               **
343               ** Get the priority of the last item in the array, use the earliest item
344               ** of that priority to make space for the new item.
345               ** This equates to shifting items from the insert location down to the
346               ** insert location of the priority before the last (the first item of
347               ** the last priority in the array).
348               */
349               _src_index = p_queue->insert_indices[ p_queue->sorted[_queuesize - 1]->pri - 1 ];
350
351               /* GET THE SOURCE CONTAINER */
352               _wrk_pointer = p_queue->sorted[_src_index];
353
354               /* SHIFT ITEMS DOWN */
355               _shift_end = _ins_index;
356               _shift_stt = _src_index;
357               _shift_direction = EW_PRI_SHIFT_DOWN;
358
359               _doSwap = 1;
360               _updateIdx = 1;
361
362               r_status = EW_PRI_RETDROP;
363            }
364         }
365      }  /*  insert_indices[p_priority] < _queuesize  */
366      else
367      {  /*  insert_indices[p_priority] == _queuesize
368         **
369         ** Insert index for the priority of the new item is past the end of
370         ** the array.  (The array is full.)
371         **
372         ** Therefore, this item can only be inserted if this item has the
373         ** same priority as the last item in the array and the priority
374         ** of the last item in the array is not of the minimum priority.
375         **
376         ** In such a case, will drop the earliest item of the same priority,
377         ** shift all other of the priority up, and tack the new item on at the end.
378         **
379         ** (That way, if the bandwidth opens up again and no others of this
380         ** priority are dropped, we can send a continuous stream of this
381         ** priority from the earliest point at which there was sufficient
382         ** bandwidth).
383         */
384         if (   p_queue->sorted[_queuesize - 1]->pri != EW_PRIORITY_MIN
385             && p_queue->insert_indices[p_priority - 1] < _queuesize
386            )
387         {
388            /*
389            ** The insert point for the priority immediately prior to this one
390            ** is before the end of the array, indicating that this item is of
391            ** the same priority as the last priority in the array.
392            */
393            /* GET THE SOURCE CONTAINER */
394            _wrk_pointer = p_queue->sorted[ p_queue->insert_indices[p_priority - 1] ];
395
396            /* SHIFT ITEMS UP */
397            _shift_stt = p_queue->insert_indices[p_priority - 1];
398            _shift_end = _queuesize - 1;
399            _shift_direction = EW_PRI_SHIFT_UP;
400
401            _doSwap = 1;
402            _updateIdx = 1;
403
404            r_status = EW_PRI_RETDROP;
405
406         }
407         else
408         {
409            r_status = EW_PRI_RETREJECT;
410         }
411      }
412
413
414      if ( _doSwap == 1 )
415      {
416
417         _wrk_pointer->pri           = p_priority;
418         (_wrk_pointer->logo).type   = p_logo.type;
419         (_wrk_pointer->logo).mod    = p_logo.mod;
420         (_wrk_pointer->logo).instid = p_logo.instid;
421         _wrk_pointer->length        = p_size;
422         /*
423         ** copy message text
424         **/
425         if ( _wrk_pointer->data != NULL ) /* avoid invalid write at termination */
426         {
427            memcpy( _wrk_pointer->data, p_data, p_size );
428         }
429      }
430
431
432      if ( _shift_direction != EW_PRI_SHIFT_NONE && p_queue->sorted != NULL )
433      {
434         PRI_QUEUE_ENTRY * _temp = p_queue->sorted[_shift_stt];
435
436#ifdef DEBUG_DETAILS
437         sprintf( dbgstr
438                , "DEBUG add_item() shifting %d -> %d (%d)\n"
439                , _shift_stt, _shift_end, _shift_direction
440                );
441         logit( "t" 
442              , dbgstr
443              , "export_pri"
444              , "MOD_EXPORT_SCN"
445              );
446#endif
447         for ( _idx = _shift_stt
448             ; ( _shift_direction == EW_PRI_SHIFT_UP ? _idx < _shift_end : _idx > _shift_end )
449             ; _idx += _shift_direction
450             )
451         {
452#ifdef DEBUG_DETAILS
453            sprintf( dbgstr
454                   , "DEBUG shifting [%d] = [%d]\n"
455                   , _idx, _idx + _shift_direction
456                   );
457            logit( "t" 
458                 , dbgstr
459                 , "export_pri"
460                 , "MOD_EXPORT_SCN"
461                 );
462#endif
463            p_queue->sorted[ _idx ] = p_queue->sorted[ _idx + _shift_direction ];
464         }
465         p_queue->sorted[_shift_end] = _temp;
466      }
467
468
469      if ( _updateIdx == 1 )
470      {
471         /* Adjust insert indices */
472         for ( _idx = p_priority ; _idx < EW_PRIORITY_COUNT ; _idx++ )
473         {
474            if ( ( p_queue->insert_indices[_idx] + 1 ) <= _queuesize )
475            {
476               p_queue->insert_indices[_idx] = p_queue->insert_indices[_idx] + 1;
477            }
478         }
479      }
480   }
481
482   ReleaseSpecificMutex( &p_queue->lock );
483
484   return r_status;
485}
486
487
488/*********************************************************
489**
490**********************************************************/
491int peek_next_item( PRI_QUEUE   * p_queue
492                  , MSG_LOGO    * p_logoptr
493                  , EW_PRIORITY * p_priptr
494                  )
495{
496   int r_status = EW_PRI_NOITEM;  /* no items in queue */
497
498   if ( p_queue == NULL )
499   {
500      return( EW_PRI_RETQNULL );
501   }
502
503   if ( p_queue->queuesize < 1  )
504   {
505      return( EW_PRI_RETNOTREADY );
506   }
507
508   RequestSpecificMutex( &p_queue->lock );
509
510   if ( p_queue->sorted != NULL )
511   {
512      PRI_QUEUE_ENTRY * _wrk_pointer;  /* item to be shifted */
513      /*
514      ** Check the priority of the object referenced by the
515      ** first pointer in the array.
516      **
517      ** If the priority is none, then there are no items in the list
518      */
519      _wrk_pointer = p_queue->sorted[0];
520
521      if ( _wrk_pointer->pri != EW_PRIORITY_NONE )
522      { 
523         /* copy the logo */
524         p_logoptr->type   = (_wrk_pointer->logo).type;
525         p_logoptr->mod    = (_wrk_pointer->logo).mod ;
526         p_logoptr->instid = (_wrk_pointer->logo).instid;
527 
528         *p_priptr  = _wrk_pointer->pri;  /* message priority */
529
530         r_status = EW_PRI_RETNORMAL;
531      }
532   }
533
534   ReleaseSpecificMutex( &p_queue->lock );
535
536   return r_status;
537}
538
539
540/*********************************************************
541**
542**********************************************************/
543int pop_next_item( PRI_QUEUE * p_queue
544                 , MSG_LOGO  * p_logoptr
545                 , long      * p_sizeptr
546                 , PRI_DATA    p_data
547                 )
548{
549#ifdef LOG_DEBUG
550   char dbgstr[120];
551#endif
552   
553   int r_status = EW_PRI_NOITEM;  /* no items in queue */
554
555   PRI_QUEUE_ENTRY * _wrk_pointer;  /* item to be shifted */
556   unsigned long _idx
557               , _sz
558               ;
559
560   if ( p_queue == NULL )
561   {
562      return( EW_PRI_RETQNULL );
563   }
564
565   if ( p_queue->queuesize < 1  )
566   {
567      return( EW_PRI_RETNOTREADY );
568   }
569
570   RequestSpecificMutex( &(p_queue->lock) );
571
572
573   /*
574   ** Check the priority of the object referenced by the
575   ** first pointer in the array.
576   **
577   ** If the priority is none, then there are no items in the list
578   */
579   _wrk_pointer = p_queue->sorted[0];
580
581   if ( _wrk_pointer->pri != EW_PRIORITY_NONE )
582   { 
583      /* copy the logo */
584      p_logoptr->type   = (_wrk_pointer->logo).type;
585      p_logoptr->mod    = (_wrk_pointer->logo).mod ;
586      p_logoptr->instid = (_wrk_pointer->logo).instid;
587 
588      *p_sizeptr  = _wrk_pointer->length;  /* message length */
589
590      /*
591      ** copy text of message to caller's memory
592      */
593      memcpy( p_data, _wrk_pointer->data, (size_t)(_wrk_pointer->length) );
594
595      /* shift pointers to the other queue entries up */
596      for ( _idx = 0, _sz = p_queue->queuesize - 1 ; _idx < _sz ; _idx++ )
597      {
598         p_queue->sorted[_idx] = p_queue->sorted[_idx + 1];
599      }
600
601      /* clear the queue entry item */
602      _wrk_pointer->pri    = EW_PRIORITY_NONE;
603      _wrk_pointer->length = 0;
604
605      /*
606      ** Put the pointer to the now empty queue entry
607      ** at the end of the sorted array
608      */
609      p_queue->sorted[p_queue->queuesize - 1] = _wrk_pointer;
610
611      /* update the insert indices */
612      for ( _idx = 0 ; _idx < EW_PRIORITY_COUNT ; _idx++ )
613      {
614         if ( 0 <= ( p_queue->insert_indices[_idx] - 1 ) )
615         {
616            p_queue->insert_indices[_idx] = p_queue->insert_indices[_idx] - 1;
617         }
618      }
619
620      /* update the used count */
621      (p_queue->itemsused)--;
622     
623      r_status = EW_PRI_RETNORMAL;
624   }
625
626#ifdef DEBUG_DETAILS
627   sprintf( dbgstr
628          , "%%s(%%s): DEBUG pop_next_item() releasing mutex; q state: %d of %d\n"
629          , p_queue->itemsused
630          , p_queue->queuesize
631          );
632   logit( "t" 
633        , dbgstr
634        , "export_pri"
635        , "MOD_EXPORT_SCN"
636        );
637#endif
638
639   ReleaseSpecificMutex( &p_queue->lock );
640
641   return r_status;
642}
Note: See TracBrowser for help on using the repository browser.