source: trunk/src/libsrc/util/priority_queue.c @ 1381

Revision 1381, 17.8 KB checked in by dietz, 15 years ago (diff)

included string.h

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*
2** priority_queue.c
3**
4** This implements a priority transfer queue.
5**
6** Priority levels are defined in the .h file.
7**
8** The actual sorting is of the pointers in array queue->sorted.
9** Bear in mind these are pointers to the PRI_QUEUE elements in
10** queue->objects.
11**
12** The queue must first be initialized to a specified size
13** using the init_pri_queue function.  After that, the queue
14** is ready for use through add_item() and pop_next_item().
15** (pop_next_item always returns the next item of the highest
16** priority).
17**
18** SEE priority_queue.h for FUNCTION DESCRIPTIONS and RETURN CODES.
19**
20** CAUTION: This queue manages its own mutex, and thus the function
21**          calls do not need to be wrapped in such by the caller.
22*/
23#include <stdio.h>
24#include <stdlib.h>
25#include <string.h>
26#include <platform.h>
27#include <earthworm.h>
28#include <earthworm_complex_funcs.h>
29#include <priority_queue.h>
30
31/* #define LOG_DEBUG 1  */
32
33#ifdef LOG_DEBUG
34/* #define DEBUG_DETAILS 1 */
35#include <stdio.h>
36#endif
37
38/* Private definitions used to simplify the code by keeping the array
39** shifting in a single location.
40*/
41#define EW_PRI_SHIFT_DOWN -1
42#define EW_PRI_SHIFT_NONE  0
43#define EW_PRI_SHIFT_UP    1
44
45
46/*********************************************************
47**
48**********************************************************/
49int init_pri_queue( PRI_QUEUE   * p_queue
50                  , unsigned long p_max_items
51                  , unsigned long p_max_item_size )
52{
53   int r_status = EW_PRI_RETNORMAL;
54
55   if ( p_queue == NULL )
56   {
57      r_status = EW_PRI_RETQNULL;   
58   }
59   else
60   {
61      /*
62      ** Initialize structure in case deallocation needed later
63      */
64      p_queue->queuesize = 0;
65      p_queue->itemsused = 0;
66      p_queue->sorted    = NULL;
67      p_queue->entries   = NULL;
68      p_queue->data      = NULL;
69   }
70
71   if ( p_max_items < 1 )
72   {
73      r_status = EW_PRI_RETPARAM;
74   }
75
76   if ( r_status == 0 )
77   {
78      unsigned long _idx;
79
80      p_queue->queuesize = p_max_items;
81      p_queue->itemmaxsize = p_max_item_size;
82
83      /*
84      ** Allocate space for the sorted pointers to the object storage
85      */
86      if ( r_status == EW_PRI_RETNORMAL )
87      {
88         if ( ( p_queue->sorted = (PRI_QUEUE_ENTRY **) malloc( sizeof(PRI_QUEUE_ENTRY*) * p_max_items) ) ==  NULL )
89         {
90            r_status = EW_PRI_RETMALLOC;
91         }
92      }
93
94      /*
95      ** Allocate space for the object storage
96      */
97      if ( r_status == EW_PRI_RETNORMAL )
98      {
99         if ( ( p_queue->entries = (PRI_QUEUE_ENTRY *) malloc( sizeof(PRI_QUEUE_ENTRY) * p_max_items) ) ==  NULL )
100         {
101            r_status = EW_PRI_RETMALLOC;
102         }
103      }
104
105
106      /*
107      ** Allocate space for the object storage
108      */
109      if ( r_status == EW_PRI_RETNORMAL )
110      {
111         if ( ( p_queue->data = (char *) malloc(sizeof(char) * p_max_item_size * p_max_items) ) ==  NULL )
112         {
113            r_status = EW_PRI_RETMALLOC;
114         }
115      }
116     
117
118      /*
119      ** Initialize the priority containers,
120      ** Load the container pointers into the sorted list
121      */
122      if ( r_status == EW_PRI_RETNORMAL )
123      {
124         PRI_QUEUE_ENTRY  * _entry  = p_queue->entries;
125         PRI_QUEUE_ENTRY ** _sorted = p_queue->sorted;
126         for ( _idx = 0 ; _idx < p_max_items ; _idx++, _entry++, _sorted++ ) 
127         {
128            /* set priority to indicate no valid data */
129            _entry->pri = EW_PRIORITY_NONE;
130            _entry->length = 0;
131            /* assign data storage location to queue entry */
132            _entry->data = (char *)(p_queue->data + (_idx * p_max_item_size));
133            /* grab storage pointer for sorted array */
134            *_sorted = _entry;
135         }
136      }
137
138
139      /*
140      ** Initialize priority index locations to point to the first (zero)
141      ** location in the sorted array
142      */
143      if ( r_status == EW_PRI_RETNORMAL )
144      {
145         for ( _idx = 0 ; _idx < EW_PRIORITY_COUNT ; _idx++ ) {
146            p_queue->insert_indices[_idx] = 0;
147         }
148      }
149      CreateSpecificMutex( &(p_queue->lock) );
150   }
151
152
153   if ( r_status != EW_PRI_RETNORMAL )
154   {
155      release_pri_queue( p_queue );
156   }
157#ifdef LOG_DEBUG
158   else
159   {
160      char dbgstr[120];
161            sprintf( dbgstr
162                   , "DEBUG init: %d  %d  %d\n"
163                   , p_queue->sorted, p_queue->entries, p_queue->data
164                   );
165            logit( "t" 
166                 , dbgstr
167                 , "export_pri"
168                 , "MOD_EXPORT_SCN"
169                 );
170   }
171#endif
172   return r_status;
173}
174
175
176/*********************************************************
177**
178**********************************************************/
179void release_pri_queue( PRI_QUEUE * p_queue )
180{
181   p_queue->queuesize = 0;
182   p_queue->itemsused = 0;
183
184   CloseSpecificMutex( &p_queue->lock );
185
186   if ( p_queue->sorted != NULL )
187   {
188      free( p_queue->sorted );
189      p_queue->sorted = NULL;
190   }
191
192   if ( p_queue->entries != NULL )
193   {
194      free( p_queue->entries );
195      p_queue->entries = NULL;
196   }
197
198   if ( p_queue->data != NULL )
199   {
200      free( p_queue->data );
201      p_queue->data = NULL;
202   }
203}
204
205/*********************************************************
206**
207**********************************************************/
208int getNumOfElementsInQueue( PRI_QUEUE * p_queue )
209{
210   if ( p_queue == NULL )
211   {
212      return 0;
213   }
214   return p_queue->itemsused;
215}
216
217/*********************************************************
218**
219**********************************************************/
220int add_item( PRI_QUEUE * p_queue
221            , EW_PRIORITY p_priority
222            , MSG_LOGO    p_logo
223            , long        p_size
224            , PRI_DATA    p_data
225            )
226{
227#ifdef LOG_DEBUG
228   char dbgstr[120];
229#endif
230
231   int r_status = EW_PRI_RETNORMAL;
232
233   if ( p_queue == NULL )
234   {
235      return( EW_PRI_RETQNULL );
236   }
237
238   if ( p_queue->queuesize < 1 )
239   {
240      return( EW_PRI_RETNOTREADY );
241   }
242
243   RequestSpecificMutex( &p_queue->lock );
244
245   if ( p_size < 0 || p_queue->itemmaxsize < p_size )
246   {
247      return ( EW_PRI_RETMSGSIZE );
248   }
249   else
250   {
251      EW_PRIORITY _usePri = p_priority;
252
253      long _queuesize = p_queue->queuesize
254         , _ins_index = p_queue->queuesize - 1 /* array location to insert new object */
255         , _src_index   /* array location to obtain container for insertion */
256         ;
257      PRI_QUEUE_ENTRY * _wrk_pointer;  /* item to be shifted */
258      int _shift_direction = EW_PRI_SHIFT_NONE
259         , _shift_stt
260         , _shift_end
261         , _idx             /* work index for loops */
262         ;
263      int _doSwap  = 0  /* swap old object for new */
264      , _updateIdx = 0  /* update the priority insert location indices */
265      ;
266
267      if ( p_priority < EW_PRIORITY_MIN || EW_PRIORITY_MAX < p_priority )
268      {
269         /* priority out of range, fall back to default */
270         _usePri = EW_PRIORITY_DEF;
271         /* keep this return status unless worse one arises */
272         r_status = EW_PRI_RETBADPRI;
273      }
274
275
276      if ( p_queue->insert_indices[p_priority] < _queuesize )
277      {
278         /*
279         ** The insert location is within the array
280         */
281         _ins_index = p_queue->insert_indices[p_priority];
282
283
284         /* Check state of item at insert location */
285         _wrk_pointer = p_queue->sorted[_ins_index];
286
287         if (   _wrk_pointer->pri == 0           /* container at insert location unused */
288             || _ins_index == (_queuesize - 1) /* insert location is last position in the array */
289            )
290         {
291            /*
292            ** Use the item at the insert location as the source container
293            **
294            ** _shift_direction = 0;
295            */
296            _doSwap = 1;
297            _updateIdx = 1;
298            if ( p_queue->itemsused < _queuesize )
299            {
300               p_queue->itemsused++;  /* increment number of items used */
301            }
302         }
303         else
304         {  /*
305            ** the insert location is within the array space
306            ** the item at the insert location is used
307            ** the insert location is not the last position in the array
308            */
309           
310            /* check if the array is full */
311            if ( p_queue->itemsused < _queuesize )
312            {
313               /*
314               ** The array is not full, therefore there is at least one
315               ** unused item in the array.
316               **
317               ** Shift items down from insert point to first unused item
318               */
319               _src_index = p_queue->itemsused++;  /* increment number of items used */
320
321               /* GET THE SOURCE CONTAINER */
322               _wrk_pointer = p_queue->sorted[_src_index];
323
324               /* SHIFT ITEMS DOWN */
325               _shift_end = _ins_index;
326               _shift_stt = _src_index;
327               _shift_direction = EW_PRI_SHIFT_DOWN;
328
329               _doSwap = 1;
330               _updateIdx = 1;
331            }
332            else
333            {
334               /*
335               ** The array is full.
336               **
337               ** Since we've already ascertained that the insert point is within the array
338               ** we know that the item to be inserted is less than the highest priority
339               ** stored in the array because there is at least one item of a higher priority
340               ** following the items at the priority of the new item.
341               **
342               ** Get the priority of the last item in the array, use the earliest item
343               ** of that priority to make space for the new item.
344               ** This equates to shifting items from the insert location down to the
345               ** insert location of the priority before the last (the first item of
346               ** the last priority in the array).
347               */
348               _src_index = p_queue->insert_indices[ p_queue->sorted[_queuesize - 1]->pri - 1 ];
349
350               /* GET THE SOURCE CONTAINER */
351               _wrk_pointer = p_queue->sorted[_src_index];
352
353               /* SHIFT ITEMS DOWN */
354               _shift_end = _ins_index;
355               _shift_stt = _src_index;
356               _shift_direction = EW_PRI_SHIFT_DOWN;
357
358               _doSwap = 1;
359               _updateIdx = 1;
360
361               r_status = EW_PRI_RETDROP;
362            }
363         }
364      }  /*  insert_indices[p_priority] < _queuesize  */
365      else
366      {  /*  insert_indices[p_priority] == _queuesize
367         **
368         ** Insert index for the priority of the new item is past the end of
369         ** the array.  (The array is full.)
370         **
371         ** Therefore, this item can only be inserted if this item has the
372         ** same priority as the last item in the array and the priority
373         ** of the last item in the array is not of the minimum priority.
374         **
375         ** In such a case, will drop the earliest item of the same priority,
376         ** shift all other of the priority up, and tack the new item on at the end.
377         **
378         ** (That way, if the bandwidth opens up again and no others of this
379         ** priority are dropped, we can send a continuous stream of this
380         ** priority from the earliest point at which there was sufficient
381         ** bandwidth).
382         */
383         if (   p_queue->sorted[_queuesize - 1]->pri != EW_PRIORITY_MIN
384             && p_queue->insert_indices[p_priority - 1] < _queuesize
385            )
386         {
387            /*
388            ** The insert point for the priority immediately prior to this one
389            ** is before the end of the array, indicating that this item is of
390            ** the same priority as the last priority in the array.
391            */
392            /* GET THE SOURCE CONTAINER */
393            _wrk_pointer = p_queue->sorted[ p_queue->insert_indices[p_priority - 1] ];
394
395            /* SHIFT ITEMS UP */
396            _shift_stt = p_queue->insert_indices[p_priority - 1];
397            _shift_end = _queuesize - 1;
398            _shift_direction = EW_PRI_SHIFT_UP;
399
400            _doSwap = 1;
401            _updateIdx = 1;
402
403            r_status = EW_PRI_RETDROP;
404
405         }
406         else
407         {
408            r_status = EW_PRI_RETREJECT;
409         }
410      }
411
412
413      if ( _doSwap == 1 )
414      {
415
416         _wrk_pointer->pri           = p_priority;
417         (_wrk_pointer->logo).type   = p_logo.type;
418         (_wrk_pointer->logo).mod    = p_logo.mod;
419         (_wrk_pointer->logo).instid = p_logo.instid;
420         _wrk_pointer->length        = p_size;
421         /*
422         ** copy message text
423         **/
424         if ( _wrk_pointer->data != NULL ) /* avoid invalid write at termination */
425         {
426            memcpy( _wrk_pointer->data, p_data, p_size );
427         }
428      }
429
430
431      if ( _shift_direction != EW_PRI_SHIFT_NONE && p_queue->sorted != NULL )
432      {
433         PRI_QUEUE_ENTRY * _temp = p_queue->sorted[_shift_stt];
434
435#ifdef DEBUG_DETAILS
436         sprintf( dbgstr
437                , "DEBUG add_item() shifting %d -> %d (%d)\n"
438                , _shift_stt, _shift_end, _shift_direction
439                );
440         logit( "t" 
441              , dbgstr
442              , "export_pri"
443              , "MOD_EXPORT_SCN"
444              );
445#endif
446         for ( _idx = _shift_stt
447             ; ( _shift_direction == EW_PRI_SHIFT_UP ? _idx < _shift_end : _idx > _shift_end )
448             ; _idx += _shift_direction
449             )
450         {
451#ifdef DEBUG_DETAILS
452            sprintf( dbgstr
453                   , "DEBUG shifting [%d] = [%d]\n"
454                   , _idx, _idx + _shift_direction
455                   );
456            logit( "t" 
457                 , dbgstr
458                 , "export_pri"
459                 , "MOD_EXPORT_SCN"
460                 );
461#endif
462            p_queue->sorted[ _idx ] = p_queue->sorted[ _idx + _shift_direction ];
463         }
464         p_queue->sorted[_shift_end] = _temp;
465      }
466
467
468      if ( _updateIdx == 1 )
469      {
470         /* Adjust insert indices */
471         for ( _idx = p_priority ; _idx < EW_PRIORITY_COUNT ; _idx++ )
472         {
473            if ( ( p_queue->insert_indices[_idx] + 1 ) <= _queuesize )
474            {
475               p_queue->insert_indices[_idx] = p_queue->insert_indices[_idx] + 1;
476            }
477         }
478      }
479   }
480
481   ReleaseSpecificMutex( &p_queue->lock );
482
483   return r_status;
484}
485
486
487/*********************************************************
488**
489**********************************************************/
490int peek_next_item( PRI_QUEUE   * p_queue
491                  , MSG_LOGO    * p_logoptr
492                  , EW_PRIORITY * p_priptr
493                  )
494{
495   int r_status = EW_PRI_NOITEM;  /* no items in queue */
496
497   if ( p_queue == NULL )
498   {
499      return( EW_PRI_RETQNULL );
500   }
501
502   if ( p_queue->queuesize < 1  )
503   {
504      return( EW_PRI_RETNOTREADY );
505   }
506
507   RequestSpecificMutex( &p_queue->lock );
508
509   if ( p_queue->sorted != NULL )
510   {
511      PRI_QUEUE_ENTRY * _wrk_pointer;  /* item to be shifted */
512      /*
513      ** Check the priority of the object referenced by the
514      ** first pointer in the array.
515      **
516      ** If the priority is none, then there are no items in the list
517      */
518      _wrk_pointer = p_queue->sorted[0];
519
520      if ( _wrk_pointer->pri != EW_PRIORITY_NONE )
521      { 
522         /* copy the logo */
523         p_logoptr->type   = (_wrk_pointer->logo).type;
524         p_logoptr->mod    = (_wrk_pointer->logo).mod ;
525         p_logoptr->instid = (_wrk_pointer->logo).instid;
526 
527         *p_priptr  = _wrk_pointer->pri;  /* message priority */
528
529         r_status = EW_PRI_RETNORMAL;
530      }
531   }
532
533   ReleaseSpecificMutex( &p_queue->lock );
534
535   return r_status;
536}
537
538
539/*********************************************************
540**
541**********************************************************/
542int pop_next_item( PRI_QUEUE * p_queue
543                 , MSG_LOGO  * p_logoptr
544                 , long      * p_sizeptr
545                 , PRI_DATA    p_data
546                 )
547{
548#ifdef LOG_DEBUG
549   char dbgstr[120];
550#endif
551   
552   int r_status = EW_PRI_NOITEM;  /* no items in queue */
553
554   PRI_QUEUE_ENTRY * _wrk_pointer;  /* item to be shifted */
555   unsigned long _idx
556               , _sz
557               ;
558
559   if ( p_queue == NULL )
560   {
561      return( EW_PRI_RETQNULL );
562   }
563
564   if ( p_queue->queuesize < 1  )
565   {
566      return( EW_PRI_RETNOTREADY );
567   }
568
569   RequestSpecificMutex( &(p_queue->lock) );
570
571
572   /*
573   ** Check the priority of the object referenced by the
574   ** first pointer in the array.
575   **
576   ** If the priority is none, then there are no items in the list
577   */
578   _wrk_pointer = p_queue->sorted[0];
579
580   if ( _wrk_pointer->pri != EW_PRIORITY_NONE )
581   { 
582      /* copy the logo */
583      p_logoptr->type   = (_wrk_pointer->logo).type;
584      p_logoptr->mod    = (_wrk_pointer->logo).mod ;
585      p_logoptr->instid = (_wrk_pointer->logo).instid;
586 
587      *p_sizeptr  = _wrk_pointer->length;  /* message length */
588
589      /*
590      ** copy text of message to caller's memory
591      */
592      memcpy( p_data, _wrk_pointer->data, (size_t)(_wrk_pointer->length) );
593
594      /* shift pointers to the other queue entries up */
595      for ( _idx = 0, _sz = p_queue->queuesize - 1 ; _idx < _sz ; _idx++ )
596      {
597         p_queue->sorted[_idx] = p_queue->sorted[_idx + 1];
598      }
599
600      /* clear the queue entry item */
601      _wrk_pointer->pri    = EW_PRIORITY_NONE;
602      _wrk_pointer->length = 0;
603
604      /*
605      ** Put the pointer to the now empty queue entry
606      ** at the end of the sorted array
607      */
608      p_queue->sorted[p_queue->queuesize - 1] = _wrk_pointer;
609
610      /* update the insert indices */
611      for ( _idx = 0 ; _idx < EW_PRIORITY_COUNT ; _idx++ )
612      {
613         if ( 0 <= ( p_queue->insert_indices[_idx] - 1 ) )
614         {
615            p_queue->insert_indices[_idx] = p_queue->insert_indices[_idx] - 1;
616         }
617      }
618
619      /* update the used count */
620      (p_queue->itemsused)--;
621     
622      r_status = EW_PRI_RETNORMAL;
623   }
624
625#ifdef DEBUG_DETAILS
626   sprintf( dbgstr
627          , "%%s(%%s): DEBUG pop_next_item() releasing mutex; q state: %d of %d\n"
628          , p_queue->itemsused
629          , p_queue->queuesize
630          );
631   logit( "t" 
632        , dbgstr
633        , "export_pri"
634        , "MOD_EXPORT_SCN"
635        );
636#endif
637
638   ReleaseSpecificMutex( &p_queue->lock );
639
640   return r_status;
641}
642
643
644
645
646
647
Note: See TracBrowser for help on using the repository browser.