source: trunk/src/libsrc/winnt/transport.c @ 2899

Revision 2899, 51.9 KB checked in by paulf, 13 years ago (diff)

fixed yet another time_t casting issue in tport_buferror()

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1
2/*
3 *   THIS FILE IS UNDER RCS - DO NOT MODIFY UNLESS YOU HAVE
4 *   CHECKED IT OUT USING THE COMMAND CHECKOUT.
5 *
6 *    $Id$
7 *
8 *    Revision history:
9 *     $Log$
10 *     Revision 1.7  2007/03/29 13:46:40  paulf
11 *     fixed yet another time_t casting issue in tport_buferror()
12 *
13 *     Revision 1.6  2006/10/05 14:49:51  stefan
14 *     commented out unused extern in tport_syserror
15 *
16 *     Revision 1.5  2005/04/15 17:31:27  dietz
17 *     Modified tport_bufthr() to look for termination flag in both the
18 *     public and private rings.
19 *
20 *     Revision 1.4  2004/03/22 21:15:59  kohler
21 *     Added: #include <stdlib.h>
22 *
23 *     Revision 1.3  2001/05/04 23:43:54  dietz
24 *     Changed flag arg of tport_putflag from short to int to handle
25 *     processids properly.
26 *
27 *     Revision 1.2  2000/06/02 18:19:48  dietz
28 *     Fixed tport_putmsg,tport_copyto to always release semaphore before returning
29 *
30 *     Revision 1.1  2000/02/14 18:53:30  lucky
31 *     Initial revision
32 *
33 *
34 */
35
36/********************************************************************/
37/*                                                        6/2000    */
38/*                           transport.c                            */
39/*                                                                  */
40/*   Transport layer functions to access shared memory regions.     */
41/*                                                                  */
42/*   written by Lynn Dietz, Will Kohler with inspiration from       */
43/*       Carl Johnson, Alex Bittenbinder, Barbara Bogaert           */
44/*                                                                  */
45/********************************************************************/
46
47/* ***** Notes on development, delete when appropriate
481. Change the quotes for the transport.h and earthworm.h includes
49   when these are moved to the appropriately pathed included directory.
50*/
51#include <windows.h>
52#include <sys/types.h>
53#include <stdio.h>
54#include <stdlib.h>
55#include <errno.h>
56#include <string.h>
57#include <time.h>
58#include <process.h>
59#include <transport.h>
60
61static short Put_Init=1;           /* initialization flag */
62static short Get_Init=1;           /* initialization flag */
63static short Copyfrom_Init=1;      /* initialization flag */
64static short Copyto_Init  =1;      /* initialization flag */
65
66/* These functions are for internal use by transport functions only
67   ****************************************************************/
68void  tport_syserr  ( char *, long );
69void  tport_buferror( short, char * );
70
71/* These statements and variables are required by the functions of
72   the input-buffering thread
73   ***************************************************************/
74#include "earthworm.h"
75volatile SHM_INFO *PubRegion;      /* transport public ring      */
76volatile SHM_INFO *BufRegion;      /* pointer to private ring    */
77volatile MSG_LOGO *Getlogo;        /* array of logos to copy     */
78volatile short     Nget;           /* number of logos in getlogo */
79volatile unsigned  MaxMsgSize;     /* size of message buffer     */
80volatile char     *Message;        /* message buffer             */
81static unsigned char MyModuleId;   /* module id of main thread   */
82unsigned char      MyInstid;       /* instid of main thread      */
83unsigned char      TypeError;      /* type for error messages    */
84
85/******************** function tport_create *************************/
86/*         Create a shared memory region & its semaphore,           */
87/*           attach to it and initialize header values.             */
88/********************************************************************/
89void tport_create( SHM_INFO *region,   /* info structure for memory region  */
90                   long      nbytes,   /* size of shared memory region      */
91                   long      memkey )  /* key to shared memory region       */
92{
93   SHM_HEAD       *shm;       /* pointer to start of memory region */
94   HANDLE         hshare;     // Handle to memory shared file
95   HANDLE         hmutex;     // Handle to mutex object
96   char           share[20];  // Shared file name from memkey
97   char           mutex[20];  // Mutex name
98   int            err;        // Error code from GetLastError()
99
100/**** Create shared memory region ****/
101   sprintf(share, "SHR_%ld", memkey);
102   hshare = CreateFileMapping(
103      (HANDLE)0xFFFFFFFF,  // Request memory file (swap space)
104      NULL,                // Security attributes
105      PAGE_READWRITE,      // Access restrictions
106      0,                   // High order size (for very large mappings)
107      nbytes,              // Low order size
108      share);              // Name of file for other processes
109   if ( hshare == NULL )
110   {
111      err = GetLastError();
112      tport_syserr( "CreateFileMapping", err);
113   }
114
115/**** Attach to shared memory region ****/
116   shm = (SHM_HEAD *) MapViewOfFile(
117      hshare,              // File object to map
118      FILE_MAP_WRITE,      // Access desired
119      0,                   // High-order 32 bits of file offset
120      0,                   // Low-order 32 bits of file offset
121      nbytes);             // Number of bytes to map
122   if ( shm == NULL )
123   {
124      err = GetLastError();
125      tport_syserr( "MapViewOfFile", err );
126   }
127
128/**** Initialize shared memory region header ****/
129   shm->nbytes = nbytes;
130   shm->keymax = nbytes - sizeof(SHM_HEAD);
131   shm->keyin  = sizeof(SHM_HEAD);
132   shm->keyold = shm->keyin;
133   shm->flag   = 0;
134
135/**** Make semaphore for this shared memory region & set semval = SHM_FREE ****/
136   sprintf(mutex, "MTX_%ld", memkey);
137   hmutex = CreateMutex(
138      NULL,                // Security attributes
139      FALSE,               // Initial ownership
140      mutex);              // Name of mutex (derived from memkey)
141   if ( hmutex == NULL )
142   {
143      err = GetLastError();
144      tport_syserr( "CreateMutex", err);
145   }
146
147/**** set values in the shared memory information structure ****/
148   region->addr = shm;
149   region->hShare = hshare;
150   region->hMutex = hmutex;
151   region->key  = memkey;
152}
153
154
155/******************** function tport_destroy *************************/
156/*                Destroy a shared memory region.                    */
157/*********************************************************************/
158
159void tport_destroy( SHM_INFO *region )
160{
161   int err;
162
163/***** Set kill flag, give other attached programs time to terminate ****/
164
165   tport_putflag( region, TERMINATE );
166   Sleep( 1000 );
167
168/***** Detach from shared memory region *****/
169   if(!UnmapViewOfFile( region->addr )) {
170      err = GetLastError();
171      tport_syserr( "UnmapViewOfFile", err);
172   }
173
174/***** Destroy semaphore set for shared memory region *****/
175   if(!CloseHandle(region->hMutex)) {
176      err = GetLastError();
177      tport_syserr( "CloseHandle (mutex)", err);
178   }
179
180
181/***** Destroy shared memory region *****/
182   if(!CloseHandle(region->hShare)) {
183      err = GetLastError();
184      tport_syserr( "CloseHandle (share)", err);
185   }
186}
187
188/******************** function tport_attach *************************/
189/*            Map to an existing shared memory region.              */
190/********************************************************************/
191
192void tport_attach( SHM_INFO *region,   /* info structure for memory region  */
193                   long      memkey )  /* key to shared memory region       */
194{
195   SHM_HEAD       *shm;       /* pointer to start of memory region */
196   HANDLE         hshare;     // Handle to memory shared file
197   HANDLE         hmutex;     // Handle to mutex object
198   char           share[20];  // Shared file name from memkey
199   char           mutex[20];  // Mutex name
200   int            err;        // Error code from GetLastError()
201
202/**** Create shared memory region ****/
203   sprintf(share, "SHR_%ld", memkey);
204   hshare = OpenFileMapping(
205       FILE_MAP_WRITE,
206       TRUE,
207       share);
208   if ( hshare == NULL )
209   {
210      err = GetLastError();
211      tport_syserr( "OpenFileMapping", err);
212   }
213
214/**** Attach to shared memory region ****/
215   shm = (SHM_HEAD *) MapViewOfFile(
216      hshare,              // File object to map
217      FILE_MAP_WRITE,      // Access desired
218      0,                   // High-order 32 bits of file offset
219      0,                   // Low-order 32 bits of file offset
220      0);                  // Number of bytes to map
221   if ( shm == NULL )
222   {
223      err = GetLastError();
224      tport_syserr( "MapViewOfFile", err );
225   }
226
227/**** Make semaphore for this shared memory region & set semval = SHM_FREE ****/
228   sprintf(mutex, "MTX_%ld", memkey);
229   hmutex = CreateMutex(
230      NULL,                // Security attributes
231      FALSE,               // Initial ownership
232      mutex);              // Name of mutex (derived from memkey)
233   if ( hmutex == NULL )
234   {
235      err = GetLastError();
236      tport_syserr( "CreateMutex", err);
237   }
238
239/**** set values in the shared memory information structure ****/
240   region->addr = shm;
241   region->hShare = hshare;
242   region->hMutex = hmutex;
243   region->key  = memkey;
244}
245
246/******************** function tport_detach **************************/
247/*                Detach from a shared memory region.                */
248/*********************************************************************/
249
250void tport_detach( SHM_INFO *region )
251{
252   int err;
253
254/***** Detach from shared memory region *****/
255   if(!UnmapViewOfFile( region->addr )) {
256      err = GetLastError();
257      tport_syserr( "UnmapViewOfFile", err);
258   }
259
260/***** Destroy semaphore set for shared memory region *****/
261   if(!CloseHandle(region->hMutex)) {
262      err = GetLastError();
263      tport_syserr( "CloseHandle (mutex)", err);
264   }
265
266
267/***** Destroy shared memory region *****/
268   if(!CloseHandle(region->hShare)) {
269      err = GetLastError();
270      tport_syserr( "CloseHandle (share)", err);
271   }
272}
273
274
275
276/*********************** function tport_putmsg ***********************/
277/*            Put a message into a shared memory region.             */
278/*            Assigns a transport-layer sequence number.             */
279/*********************************************************************/
280
281int tport_putmsg( SHM_INFO *region,    /* info structure for memory region    */
282                  MSG_LOGO *putlogo,   /* type, module, instid of incoming msg */
283                  long      length,    /* size of incoming message            */
284                  char     *msg )      /* pointer to incoming message         */
285{
286   volatile static MSG_TRACK  trak[NTRACK_PUT];   /* sequence number keeper   */
287   volatile static int        nlogo;              /* # of logos seen so far   */
288   int                        it;                 /* index into trak          */
289   SHM_HEAD         *shm;              /* pointer to start of memory region   */
290   char             *ring;             /* pointer to ring part of memory      */
291   unsigned long     ir;               /* index into memory ring              */
292   long              nfill;            /* # bytes from ir to ring's last-byte */
293   long              nwrap;            /* # bytes to wrap to front of ring    */
294   TPORT_HEAD        hd;               /* transport layer header to put       */
295   char             *h;                /* pointer to transport layer header   */
296   TPORT_HEAD        old;              /* transport header of oldest msg      */
297   char             *o;                /* pointer to oldest transport header  */
298   int j;
299   int retval = PUT_OK;                /* return value for this function      */
300
301/**** First time around, init the sequence counters, semaphore controls ****/
302
303   if (Put_Init)
304   {
305       nlogo    = 0;
306
307       for( j=0 ; j < NTRACK_PUT ; j++ )
308       {
309          trak[j].memkey      = 0;
310          trak[j].logo.type   = 0;
311          trak[j].logo.mod    = 0;
312          trak[j].logo.instid = 0;
313          trak[j].seq         = 0;
314          trak[j].keyout      = 0;
315       }
316
317       Put_Init = 0;
318   }
319
320/**** Set up pointers for shared memory, etc. ****/
321
322   shm  = region->addr;
323   ring = (char *) shm + sizeof(SHM_HEAD);
324   h    = (char *) (&hd);
325   o    = (char *) (&old);
326
327/**** First, see if the incoming message will fit in the memory region ****/
328
329   if ( length + sizeof(TPORT_HEAD) > shm->keymax )
330   {
331      fprintf( stdout,
332              "ERROR: tport_putmsg; message too large (%ld) for Region %ld\n",
333               length, region->key);
334      return( PUT_TOOBIG );
335   }
336
337/**** Change semaphore; let others know you're using tracking structure & memory  ****/
338
339   WaitForSingleObject(region->hMutex, INFINITE);
340
341/**** Next, find incoming logo in list of combinations already seen ****/
342
343   for( it=0 ; it < nlogo ; it++ )
344   {
345      if ( region->key     != trak[it].memkey     )  continue;
346      if ( putlogo->type   != trak[it].logo.type  )  continue;
347      if ( putlogo->mod    != trak[it].logo.mod   )  continue;
348      if ( putlogo->instid != trak[it].logo.instid ) continue;
349      goto build_header;
350   }
351
352/**** Incoming logo is a new combination; store it, if there's room ****/
353
354   if ( nlogo == NTRACK_PUT )
355   {
356      fprintf( stdout,
357              "ERROR: tport_putmsg; exceeded NTRACK_PUT, msg not sent\n");
358      retval = PUT_NOTRACK;
359      goto release_semaphore;
360   }
361   it = nlogo;
362   trak[it].memkey =  region->key;
363   trak[it].logo   = *putlogo;
364   nlogo++;
365
366/**** Store everything you need in the transport header ****/
367
368build_header:
369   hd.start = FIRST_BYTE;
370   hd.size  = length;
371   hd.logo  = trak[it].logo;
372   hd.seq   = trak[it].seq++;
373
374/**** In shared memory, see if keyin will wrap; if so, reset keyin and keyold ****/
375
376   if ( shm->keyin + sizeof(TPORT_HEAD) + length  <  shm->keyold )
377   {
378       shm->keyin  = shm->keyin  % shm->keymax;
379       shm->keyold = shm->keyold % shm->keymax;
380       if ( shm->keyin <= shm->keyold ) shm->keyin += shm->keymax;
381     /*fprintf( stdout,
382               "NOTICE: tport_putmsg; keyin wrapped & reset; Region %ld\n",
383                region->key );*/
384   }
385
386/**** Then see if there's enough room for new message in shared memory ****/
387/****      If not, "delete" oldest messages until there's room         ****/
388
389   while( shm->keyin + sizeof(TPORT_HEAD) + length - shm->keyold > shm->keymax )
390   {
391      ir = shm->keyold % shm->keymax;
392      if ( ring[ir] != FIRST_BYTE )
393      {
394          fprintf( stdout,
395                  "ERROR: tport_putmsg; keyold not at FIRST_BYTE, Region %ld\n",
396                   region->key );
397          retval = TPORT_FATAL;
398          goto release_semaphore;
399      }
400      for ( j=0 ; j < sizeof(TPORT_HEAD) ; j++ )
401      {
402         if ( ir >= shm->keymax )   ir -= shm->keymax;
403         o[j] = ring[ir++];
404      }
405      shm->keyold += sizeof(TPORT_HEAD) + old.size;
406   }
407
408/**** Now copy transport header into shared memory by chunks... ****/
409
410   ir = shm->keyin % shm->keymax;
411   nwrap = ir + sizeof(TPORT_HEAD) - shm->keymax;
412   if ( nwrap <= 0 )
413   {
414         memcpy( (void *) &ring[ir], (void *) h, sizeof(TPORT_HEAD) );
415   }
416   else
417   {
418         nfill = sizeof(TPORT_HEAD) - nwrap;
419         memcpy( (void *) &ring[ir], (void *) &h[0],     nfill );
420         memcpy( (void *) &ring[0],  (void *) &h[nfill], nwrap );
421   }
422   ir += sizeof(TPORT_HEAD);
423   if ( ir >= shm->keymax )  ir -= shm->keymax;
424
425/**** ...and copy message into shared memory by chunks ****/
426
427   nwrap = ir + length - shm->keymax;
428   if ( nwrap <= 0 )
429   {
430         memcpy( (void *) &ring[ir], (void *) msg, length );
431   }
432   else
433   {
434         nfill = length - nwrap;
435         memcpy( (void *) &ring[ir], (void *) &msg[0],     nfill );
436         memcpy( (void *) &ring[0],  (void *) &msg[nfill], nwrap );
437   }
438   shm->keyin += sizeof(TPORT_HEAD) + length;
439
440/**** Finished with shared memory, let others know via semaphore ****/
441
442release_semaphore:
443   ReleaseMutex(region->hMutex);
444
445   if( retval == TPORT_FATAL ) exit( 1 );
446   return( retval );
447}
448
449
450/*********************** function tport_getmsg ***********************/
451/*                 Get a message out of shared memory.               */
452/*********************************************************************/
453
454int tport_getmsg( SHM_INFO  *region,   /* info structure for memory region  */
455                  MSG_LOGO  *getlogo,  /* requested logo(s)                 */
456                  short      nget,     /* number of logos in getlogo        */
457                  MSG_LOGO  *logo,     /* logo of retrieved msg             */
458                  long      *length,   /* size of retrieved message         */
459                  char      *msg,      /* retrieved message                 */
460                  long       maxsize ) /* max length for retrieved message  */
461{
462   static MSG_TRACK  trak[NTRACK_GET]; /* sequence #, outpointer keeper     */
463   static int        nlogo;            /* # modid,type,instid combos so far */
464   int               it;               /* index into trak                   */
465   SHM_HEAD         *shm;              /* pointer to start of memory region */
466   char             *ring;             /* pointer to ring part of memory    */
467   TPORT_HEAD       *tmphd;            /* temp pointer into shared memory   */
468   unsigned long     ir;               /* index into the ring               */
469   long              nfill;            /* bytes from ir to ring's last-byte */
470   long              nwrap;            /* bytes to grab from front of ring  */
471   TPORT_HEAD        hd;               /* transport header from memory      */
472   char             *h;                /* pointer to transport layer header */
473   int               ih;               /* index into the transport header   */
474   unsigned long     keyin;            /* in-pointer to shared memory       */
475   unsigned long     keyold;           /* oldest complete message in memory */
476   unsigned long     keyget;           /* pointer at which to start search  */
477   int               status = GET_OK;  /* how did retrieval go?             */
478   int               trakked;          /* flag for trakking list entries    */
479   int               i,j;
480
481/**** Get the pointers set up ****/
482
483   shm  = region->addr;
484   ring = (char *) shm + sizeof(SHM_HEAD);
485   h    = (char *) (&hd);
486
487/**** First time around, initialize sequence counters, outpointers ****/
488
489   if (Get_Init)
490   {
491       nlogo = 0;
492
493       for( i=0 ; i < NTRACK_GET ; i++ )
494       {
495          trak[i].memkey      = 0;
496          trak[i].logo.type   = 0;
497          trak[i].logo.mod    = 0;
498          trak[i].logo.instid = 0;
499          trak[i].seq         = 0;
500          trak[i].keyout      = 0;
501          trak[i].active      = 0; /*960618:ldd*/
502       }
503       Get_Init = 0;
504   }
505
506/**** make sure all requested logos are entered in tracking list ****/
507
508   for ( j=0 ; j < nget ; j++ )  /* for all requested message logos */
509   {
510       trakked = 0;  /* assume it's not being trakked */
511       for( it=0 ; it < nlogo ; it++ )  /* for all logos we're tracking */
512       {
513          if( region->key       != trak[it].memkey      ) continue;
514          if( getlogo[j].type   != trak[it].logo.type   ) continue;
515          if( getlogo[j].mod    != trak[it].logo.mod    ) continue;
516          if( getlogo[j].instid != trak[it].logo.instid ) continue;
517          trakked = 1;  /* found it in the trakking list! */
518          break;
519       }
520       if( trakked ) continue;
521    /* Make an entry in trak for this logo; if there's room */
522       if ( nlogo < NTRACK_GET )
523       {
524          it = nlogo;
525          trak[it].memkey = region->key;
526          trak[it].logo   = getlogo[j];
527          nlogo++;
528       }
529   }
530
531/**** find latest starting index to look for any of the requested logos ****/
532
533findkey:
534
535   keyget = shm->keyold;
536
537   for ( it=0 ; it < nlogo ; it++ )  /* for all message logos we're tracking */
538   {
539       if ( trak[it].memkey != region->key ) continue;
540       for ( j=0 ; j < nget ; j++ )  /* for all requested message logos */
541       {
542          if((getlogo[j].type   == trak[it].logo.type   || getlogo[j].type==WILD  ) &&
543             (getlogo[j].mod    == trak[it].logo.mod    || getlogo[j].mod==WILD   ) &&
544             (getlogo[j].instid == trak[it].logo.instid || getlogo[j].instid==WILD) )
545          {
546             if ( trak[it].keyout > keyget )  keyget = trak[it].keyout;
547          }
548       }
549    }
550    keyin = shm->keyin;
551
552/**** See if keyin and keyold were wrapped and reset by tport_putmsg; ****/
553/****       If so, reset trak[xx].keyout and go back to findkey       ****/
554
555   if ( keyget > keyin )
556   {
557      keyold = shm->keyold;
558      for ( it=0 ; it < nlogo ; it++ )
559      {
560         if( trak[it].memkey == region->key )
561         {
562          /* reset keyout */
563/*DEBUG*/    /*printf("tport_getmsg: Pre-reset:  keyout=%10u    keyold=%10u  keyin=%10u\n",
564                     trak[it].keyout, keyold, keyin );*/
565             trak[it].keyout = trak[it].keyout % shm->keymax;
566/*DEBUG*/    /*printf("tport_getmsg:  Intermed:  keyout=%10u    keyold=%10u  keyin=%10u\n",
567                     trak[it].keyout, keyold, keyin );*/
568
569          /* make sure new keyout points to keyin or to a msg's first-byte; */
570          /* if not, we've been lapped, so set keyout to keyold             */
571             ir    = trak[it].keyout;
572             tmphd = (TPORT_HEAD *) &ring[ir];
573             if ( trak[it].keyout == keyin   ||
574                  (keyin-trak[it].keyout)%shm->keymax == 0 )
575             {
576/*DEBUG*/       /*printf("tport_getmsg:  Intermed:  keyout=%10u  same as keyin\n",
577                       trak[it].keyout );*/
578                trak[it].keyout = keyin;
579             }
580             else if( tmphd->start != FIRST_BYTE )
581             {
582/*DEBUG*/       /*printf("tport_getmsg:  Intermed:  keyout=%10u  does not point to FIRST_BYTE\n",
583                        trak[it].keyout );*/
584                trak[it].keyout = keyold;
585             }
586
587          /* else, make sure keyout's value is between keyold and keyin */
588             else if ( trak[it].keyout < keyold )
589             {
590                do {
591                    trak[it].keyout += shm->keymax;
592                } while ( trak[it].keyout < keyold );
593             }
594/*DEBUG*/    /*printf("tport_getmsg:     Reset:  keyout=%10u    keyold=%10u  keyin=%10u\n",
595                     trak[it].keyout, keyold, keyin );*/
596         }
597      }
598    /*fprintf( stdout,
599          "NOTICE: tport_getmsg; keyin wrapped, keyout(s) reset; Region %ld\n",
600           region->key );*/
601
602      goto findkey;
603   }
604
605
606/**** Find next message from requested type, module, instid ****/
607
608nextmsg:
609
610   while ( keyget < keyin )
611   {
612   /* make sure you haven't been lapped by tport_putmsg */
613       if ( keyget < shm->keyold ) keyget = shm->keyold;
614
615   /* load next header; make sure you weren't lapped */
616       ir = keyget % shm->keymax;
617       for ( ih=0 ; ih < sizeof(TPORT_HEAD) ; ih++ )
618       {
619          if ( ir >= shm->keymax )  ir -= shm->keymax;
620          h[ih] = ring[ir++];
621       }
622       if ( keyget < shm->keyold ) continue;  /*added 960612:ldd*/
623
624   /* make sure it starts at beginning of a header */
625       if ( hd.start != FIRST_BYTE )
626       {
627          fprintf( stdout,
628                  "ERROR: tport_getmsg; keyget not at FIRST_BYTE, Region %ld\n",
629                   region->key );
630          exit( 1 );
631       }
632       keyget += sizeof(TPORT_HEAD) + hd.size;
633
634   /* see if this msg matches any requested type */
635       for ( j=0 ; j < nget ; j++ )
636       {
637          if((getlogo[j].type   == hd.logo.type   || getlogo[j].type == WILD) &&
638             (getlogo[j].mod    == hd.logo.mod    || getlogo[j].mod  == WILD) &&
639             (getlogo[j].instid == hd.logo.instid || getlogo[j].instid == WILD) )
640          {
641
642/**** Found a message of requested logo; retrieve it! ****/
643        /* complain if retreived msg is too big */
644             if ( hd.size > maxsize )
645             {
646               *logo   = hd.logo;
647               *length = hd.size;
648                status = GET_TOOBIG;
649                goto trackit;    /*changed 960612:ldd*/
650             }
651        /* copy message by chunks to caller's address */
652             nwrap = ir + hd.size - shm->keymax;
653             if ( nwrap <= 0 )
654             {
655                memcpy( (void *) msg, (void *) &ring[ir], hd.size );
656             }
657             else
658             {
659                nfill = hd.size - nwrap;
660                memcpy( (void *) &msg[0],     (void *) &ring[ir], nfill );
661                memcpy( (void *) &msg[nfill], (void *) &ring[0],  nwrap );
662             }
663        /* see if we got run over by tport_putmsg while copying msg */
664        /* if we did, go back and try to get a msg cleanly          */
665             keyold = shm->keyold;
666             if ( keyold >= keyget )
667             {
668                keyget = keyold;
669                goto nextmsg;
670             }
671
672        /* set other returned variables */
673            *logo   = hd.logo;
674            *length = hd.size;
675
676trackit:
677        /* find msg logo in tracked list */
678             for ( it=0 ; it < nlogo ; it++ )
679             {
680                if ( region->key    != trak[it].memkey      )  continue;
681                if ( hd.logo.type   != trak[it].logo.type   )  continue;
682                if ( hd.logo.mod    != trak[it].logo.mod    )  continue;
683                if ( hd.logo.instid != trak[it].logo.instid )  continue;
684                /* activate sequence tracking if 1st msg */
685                if ( !trak[it].active )
686                {
687                    trak[it].seq    = hd.seq;
688                    trak[it].active = 1;
689                }
690                goto sequence;
691             }
692        /* new logo, track it if there's room */
693             if ( nlogo == NTRACK_GET )
694             {
695                fprintf( stdout,
696                     "ERROR: tport_getmsg; exceeded NTRACK_GET\n");
697                if( status != GET_TOOBIG ) status = GET_NOTRACK; /*changed 960612:ldd*/
698                goto wrapup;
699             }
700             it = nlogo;
701             trak[it].memkey = region->key;
702             trak[it].logo   = hd.logo;
703             trak[it].seq    = hd.seq;
704             trak[it].active = 1;      /*960618:ldd*/
705             nlogo++;
706
707sequence:
708        /* check if sequence #'s match; update sequence # */
709             if ( status == GET_TOOBIG   )  goto wrapup; /*added 960612:ldd*/
710             if ( hd.seq != trak[it].seq )
711             {
712                status = GET_MISS;
713                trak[it].seq = hd.seq;
714             }
715             trak[it].seq++;
716
717        /* Ok, we're finished grabbing this one */
718             goto wrapup;
719
720          } /* end if of logo & getlogo match */
721       }    /* end for over getlogo */
722   }        /* end while over ring */
723
724/**** If you got here, there were no messages of requested logo(s) ****/
725
726   status = GET_NONE;
727
728/**** update outpointer (->msg after retrieved one) for all requested logos ****/
729
730wrapup:
731   for ( it=0 ; it < nlogo ; it++ )  /* for all message logos we're tracking */
732   {
733       if ( trak[it].memkey != region->key ) continue;
734       for ( j=0 ; j < nget ; j++ )  /* for all requested message logos */
735       {
736          if((getlogo[j].type   == trak[it].logo.type   || getlogo[j].type==WILD) &&
737             (getlogo[j].mod    == trak[it].logo.mod    || getlogo[j].mod==WILD)  &&
738             (getlogo[j].instid == trak[it].logo.instid || getlogo[j].instid==WILD) )
739          {
740             trak[it].keyout = keyget;
741          }
742       }
743    }
744
745   return( status );
746
747}
748
749
750/********************* function tport_putflag ************************/
751/*           Puts the kill flag into a shared memory region.         */
752/*********************************************************************/
753
754void tport_putflag( SHM_INFO *region,  /* shared memory info structure     */
755                    int       flag )   /* tells attached processes to exit */
756{
757   SHM_HEAD  *shm;
758
759   shm = region->addr;
760   shm->flag = flag;
761   return;
762}
763
764
765
766/*********************** function tport_getflag **********************/
767/*         Returns the kill flag from a shared memory region.        */
768/*********************************************************************/
769
770int tport_getflag( SHM_INFO *region )
771
772{
773   SHM_HEAD  *shm;
774
775   shm = region->addr;
776   return( (int)shm->flag );
777}
778
779
780/************************** tport_bufthr ****************************/
781/*     Thread to buffer input from one transport ring to another.   */
782/********************************************************************/
783void tport_bufthr( void *dummy )
784{
785   char          errnote[150];
786   MSG_LOGO      logo;
787   long          msgsize;
788   unsigned char msgseq;
789   int           res1, res2;
790   int           gotmsg;
791   HANDLE myHandle = GetCurrentThread();
792
793/* Reset my own thread priority
794   ****************************/
795   if ( SetThreadPriority( myHandle, THREAD_PRIORITY_TIME_CRITICAL ) == 0 )
796   {
797      printf( "Error setting buffer thread priority: %d\n", GetLastError() );
798      exit( -1 );
799   }
800
801/* Flush all existing messages from the public memory region
802   *********************************************************/
803   while( tport_copyfrom((SHM_INFO *) PubRegion, (MSG_LOGO *) Getlogo,
804                          Nget, &logo, &msgsize, (char *) Message,
805                          MaxMsgSize, &msgseq )  !=  GET_NONE  );
806
807   while ( 1 )
808   {
809      Sleep( 500 );
810
811/* If a terminate flag is found, copy it to the private ring.
812   Then, terminate this thread.
813   *********************************************************/
814      if ( tport_getflag( (SHM_INFO *) PubRegion ) == TERMINATE ||
815           tport_getflag( (SHM_INFO *) BufRegion ) == TERMINATE   )
816      {
817         tport_putflag( (SHM_INFO *) BufRegion, TERMINATE );
818         _endthread();
819      }
820
821/* Try to copy a message from the public memory region
822   ***************************************************/
823      do
824      {
825          res1 = tport_copyfrom((SHM_INFO *) PubRegion, (MSG_LOGO *) Getlogo,
826                                Nget, &logo, &msgsize, (char *) Message,
827                                MaxMsgSize, &msgseq );
828          gotmsg = 1;
829
830/* Handle return values
831   ********************/
832          switch ( res1 )
833          {
834          case GET_MISS_LAPPED:
835                sprintf( errnote,
836                        "tport_bufthr: Missed msg(s)  c%d m%d t%d  Overwritten, region:%ld.",
837                         (int) logo.instid, (int) logo.mod, (int) logo.type,
838                         PubRegion->key );
839                tport_buferror( ERR_LAPPED, errnote );
840                break;
841          case GET_MISS_SEQGAP:
842                sprintf( errnote,
843                        "tport_bufthr: Missed msg(s)  c%d m%d t%d  Sequence gap, region:%ld.",
844                         (int) logo.instid, (int) logo.mod, (int) logo.type,
845                         PubRegion->key );
846                tport_buferror( ERR_SEQGAP, errnote );
847                break;
848          case GET_NOTRACK:
849                sprintf( errnote,
850                        "tport_bufthr: Logo c%d m%d t%d not tracked; NTRACK_GET exceeded.",
851                        (int) logo.instid, (int) logo.mod, (int) logo.type );
852                tport_buferror( ERR_UNTRACKED, errnote );
853          case GET_OK:
854                break;
855          case GET_TOOBIG:
856                sprintf( errnote,
857                        "tport_bufthr: msg[%ld] c%d m%d t%d seq%d too big; skipped in region:%ld.",
858                         msgsize, (int) logo.instid, (int) logo.mod,
859                         (int) logo.type, (int) msgseq, PubRegion->key );
860                tport_buferror( ERR_OVERFLOW, errnote );
861          case GET_NONE:
862                gotmsg = 0;
863                break;
864          }
865
866/* If you did get a message, copy it to private ring
867   *************************************************/
868          if ( gotmsg )
869          {
870              res2 = tport_copyto( (SHM_INFO *) BufRegion, &logo,
871                                   msgsize, (char *) Message, msgseq );
872              switch (res2)
873              {
874              case PUT_TOOBIG:
875                 sprintf( errnote,
876                     "tport_bufthr: msg[%ld] (c%d m%d t%d) too big for Region:%ld.",
877                      msgsize, (int) logo.instid, (int) logo.mod, (int) logo.type,
878                      BufRegion->key );
879                 tport_buferror( ERR_OVERFLOW, errnote );
880              case PUT_OK:
881                 break;
882              }
883          }
884      } while ( res1 != GET_NONE );
885   }
886}
887
888
889/************************** tport_buffer ****************************/
890/*       Function to initialize the input buffering thread          */
891/********************************************************************/
892int tport_buffer( SHM_INFO  *region1,      /* transport ring             */
893                  SHM_INFO  *region2,      /* private ring               */
894                  MSG_LOGO  *getlogo,      /* array of logos to copy     */
895                  short      nget,         /* number of logos in getlogo */
896                  unsigned   maxMsgSize,   /* size of message buffer     */
897                  unsigned char module,    /* module id of main thread   */
898                  unsigned char instid )   /* instid id of main thread   */
899{
900   unsigned long thread_id;            /* Thread id of the buffer thread */
901
902/* Allocate message buffer
903   ***********************/
904   Message = (char *) malloc( maxMsgSize );
905   if ( Message == NULL )
906   {
907      fprintf( stdout, "tport_buffer: Error allocating message buffer\n" );
908      return -1;
909   }
910
911/* Copy function arguments to global variables
912   *******************************************/
913   PubRegion   = region1;
914   BufRegion   = region2;
915   Getlogo     = getlogo;
916   Nget        = nget;
917   MaxMsgSize  = maxMsgSize;
918   MyModuleId  = module;
919   MyInstid    = instid;
920
921/* Lookup message type for error messages
922   **************************************/
923   if ( GetType( "TYPE_ERROR", &TypeError ) != 0 )
924   {
925      fprintf( stderr,
926              "tport_buffer: Invalid message type <TYPE_ERROR>\n" );
927      return -1;
928   }
929
930/* Start the input buffer thread
931   *****************************/
932   thread_id = _beginthread( tport_bufthr, 0, NULL );
933
934   if ( thread_id == -1 )                /* Couldn't create thread */
935   {
936      fprintf( stderr, "tport_buffer: Can't start the buffer thread." );
937      return -1;
938   }
939   return 0;
940}
941
942
943/********************** function tport_copyfrom *********************/
944/*      get a message out of public shared memory; save the         */
945/*     sequence number from the transport layer, with the intent    */
946/*       of copying it to a private (buffering) memory ring         */
947/********************************************************************/
948
949int tport_copyfrom( SHM_INFO  *region,   /* info structure for memory region */
950                    MSG_LOGO  *getlogo,  /* requested logo(s)                */
951                    short      nget,     /* number of logos in getlogo       */
952                    MSG_LOGO  *logo,     /* logo of retrieved message        */
953                    long      *length,   /* size of retrieved message        */
954                    char      *msg,      /* retrieved message                */
955                    long       maxsize,  /* max length for retrieved message */
956                    unsigned char *seq ) /* TPORT_HEAD seq# of retrieved msg */
957{
958   static MSG_TRACK  trak[NTRACK_GET]; /* sequence #, outpointer keeper     */
959   static int        nlogo;            /* # modid,type,instid combos so far */
960   int               it;               /* index into trak                   */
961   SHM_HEAD         *shm;              /* pointer to start of memory region */
962   char             *ring;             /* pointer to ring part of memory    */
963   TPORT_HEAD       *tmphd;            /* temp pointer into shared memory   */
964   unsigned long     ir;               /* index into the ring               */
965   long              nfill;            /* bytes from ir to ring's last-byte */
966   long              nwrap;            /* bytes to grab from front of ring  */
967   TPORT_HEAD        hd;               /* transport header from memory      */
968   char             *h;                /* pointer to transport layer header */
969   int               ih;               /* index into the transport header   */
970   unsigned long     keyin;            /* in-pointer to shared memory       */
971   unsigned long     keyold;           /* oldest complete message in memory */
972   unsigned long     keyget;           /* pointer at which to start search  */
973   int               status = GET_OK;  /* how did retrieval go?             */
974   int               lapped = 0;       /* = 1 if memory ring has been over- */
975                                       /* written since last tport_copyfrom */
976   int               trakked;          /* flag for trakking list entries    */
977   int               i,j;
978
979/**** Get the pointers set up ****/
980
981   shm  = region->addr;
982   ring = (char *) shm + sizeof(SHM_HEAD);
983   h    = (char *) (&hd);
984
985/**** First time around, initialize sequence counters, outpointers ****/
986
987   if (Copyfrom_Init)
988   {
989       nlogo = 0;
990
991       for( i=0 ; i < NTRACK_GET ; i++ )
992       {
993          trak[i].memkey      = 0;
994          trak[i].logo.type   = 0;
995          trak[i].logo.mod    = 0;
996          trak[i].logo.instid = 0;
997          trak[i].seq         = 0;
998          trak[i].keyout      = 0;
999          trak[i].active      = 0; /*960618:ldd*/
1000       }
1001       Copyfrom_Init = 0;
1002   }
1003
1004/**** make sure all requested logos are entered in tracking list ****/
1005
1006   for ( j=0 ; j < nget ; j++ )  /* for all requested message logos */
1007   {
1008       trakked = 0;  /* assume it's not being trakked */
1009       for( it=0 ; it < nlogo ; it++ )  /* for all logos we're tracking */
1010       {
1011          if( region->key       != trak[it].memkey      ) continue;
1012          if( getlogo[j].type   != trak[it].logo.type   ) continue;
1013          if( getlogo[j].mod    != trak[it].logo.mod    ) continue;
1014          if( getlogo[j].instid != trak[it].logo.instid ) continue;
1015          trakked = 1;  /* found it in the trakking list! */
1016          break;
1017       }
1018       if( trakked ) continue;
1019    /* Make an entry in trak for this logo; if there's room */
1020       if ( nlogo < NTRACK_GET )
1021       {
1022          it = nlogo;
1023          trak[it].memkey = region->key;
1024          trak[it].logo   = getlogo[j];
1025          nlogo++;
1026       }
1027   }
1028
1029/**** find latest starting index to look for any of the requested logos ****/
1030
1031findkey:
1032
1033   keyget = 0;
1034
1035   for ( it=0 ; it < nlogo ; it++ )  /* for all message logos we're tracking */
1036   {
1037       if ( trak[it].memkey != region->key ) continue;
1038       for ( j=0 ; j < nget ; j++ )  /* for all requested message logos */
1039       {
1040          if((getlogo[j].type   == trak[it].logo.type   || getlogo[j].type==WILD) &&
1041             (getlogo[j].mod    == trak[it].logo.mod    || getlogo[j].mod==WILD)  &&
1042             (getlogo[j].instid == trak[it].logo.instid || getlogo[j].instid==WILD) )
1043          {
1044             if ( trak[it].keyout > keyget )  keyget = trak[it].keyout;
1045          }
1046       }
1047   }
1048
1049/**** make sure you haven't been lapped by tport_copyto or tport_putmsg ****/
1050   if ( keyget < shm->keyold ) {
1051      keyget = shm->keyold;
1052      lapped = 1;
1053   }
1054
1055/**** See if keyin and keyold were wrapped and reset by tport_putmsg; ****/
1056/****       If so, reset trak[xx].keyout and go back to findkey       ****/
1057
1058   keyin = shm->keyin;
1059   if ( keyget > keyin )
1060   {
1061      keyold = shm->keyold;
1062      for ( it=0 ; it < nlogo ; it++ )
1063      {
1064         if( trak[it].memkey == region->key )
1065         {
1066          /* reset keyout */
1067/*DEBUG*/    /*printf("tport_copyfrom: Pre-reset:  keyout=%10u    keyold=%10u  keyin=%10u\n",
1068                     trak[it].keyout, keyold, keyin );*/
1069             trak[it].keyout = trak[it].keyout % shm->keymax;
1070/*DEBUG*/    /*printf("tport_copyfrom:  Intermed:  keyout=%10u    keyold=%10u  keyin=%10u\n",
1071                     trak[it].keyout, keyold, keyin );*/
1072
1073          /* make sure new keyout points to keyin or to a msg's first-byte; */
1074          /* if not, we've been lapped, so set keyout to keyold             */
1075             ir    = trak[it].keyout;
1076             tmphd = (TPORT_HEAD *) &ring[ir];
1077             if ( trak[it].keyout == keyin   ||
1078                  (keyin-trak[it].keyout)%shm->keymax == 0 )
1079             {
1080/*DEBUG*/       /*printf("tport_copyfrom:  Intermed:  keyout=%10u  same as keyin\n",
1081                        trak[it].keyout );*/
1082                trak[it].keyout = keyin;
1083             }
1084             else if( tmphd->start != FIRST_BYTE )
1085             {
1086/*DEBUG*/       /*printf("tport_copyfrom:  Intermed:  keyout=%10u  does not point to FIRST_BYTE\n",
1087                        trak[it].keyout );*/
1088                trak[it].keyout = keyold;
1089                lapped = 1;
1090             }
1091
1092          /* else, make sure keyout's value is between keyold and keyin */
1093             else if ( trak[it].keyout < keyold )
1094             {
1095                do {
1096                    trak[it].keyout += shm->keymax;
1097                } while ( trak[it].keyout < keyold );
1098             }
1099/*DEBUG*/    /*printf("tport_copyfrom:     Reset:  keyout=%10u    keyold=%10u  keyin=%10u\n",
1100                     trak[it].keyout, keyold, keyin );*/
1101         }
1102      }
1103    /*fprintf( stdout,
1104          "NOTICE: tport_copyfrom; keyin wrapped, keyout(s) reset; Region %ld\n",
1105           region->key );*/
1106
1107      goto findkey;
1108   }
1109
1110
1111/**** Find next message from requested type, module, instid ****/
1112
1113nextmsg:
1114
1115   while ( keyget < keyin )
1116   {
1117   /* make sure you haven't been lapped by tport_copyto or tport_putmsg */
1118       if ( keyget < shm->keyold ) {
1119          keyget = shm->keyold;
1120          lapped = 1;
1121       }
1122
1123   /* load next header; make sure you weren't lapped */
1124       ir = keyget % shm->keymax;
1125       for ( ih=0 ; ih < sizeof(TPORT_HEAD) ; ih++ )
1126       {
1127          if ( ir >= shm->keymax )  ir -= shm->keymax;
1128          h[ih] = ring[ir++];
1129       }
1130       if ( keyget < shm->keyold ) continue;  /*added 960612:ldd*/
1131
1132   /* make sure it starts at beginning of a header */
1133       if ( hd.start != FIRST_BYTE )
1134       {
1135          fprintf( stdout,
1136                  "ERROR: tport_copyfrom; keyget not at FIRST_BYTE, Region %ld\n",
1137                   region->key );
1138          exit( 1 );
1139       }
1140       keyget += sizeof(TPORT_HEAD) + hd.size;
1141
1142   /* see if this msg matches any requested type */
1143       for ( j=0 ; j < nget ; j++ )
1144       {
1145          if((getlogo[j].type   == hd.logo.type   || getlogo[j].type == WILD) &&
1146             (getlogo[j].mod    == hd.logo.mod    || getlogo[j].mod  == WILD) &&
1147             (getlogo[j].instid == hd.logo.instid || getlogo[j].instid == WILD) )
1148          {
1149
1150/**** Found a message of requested logo; retrieve it! ****/
1151        /* complain if retreived msg is too big */
1152             if ( hd.size > maxsize )
1153             {
1154               *logo   = hd.logo;
1155               *length = hd.size;
1156               *seq    = hd.seq;
1157                status = GET_TOOBIG;
1158                goto trackit;    /*changed 960612:ldd*/
1159             }
1160        /* copy message by chunks to caller's address */
1161             nwrap = ir + hd.size - shm->keymax;
1162             if ( nwrap <= 0 )
1163             {
1164                memcpy( (void *) msg, (void *) &ring[ir], hd.size );
1165             }
1166             else
1167             {
1168                nfill = hd.size - nwrap;
1169                memcpy( (void *) &msg[0],     (void *) &ring[ir], nfill );
1170                memcpy( (void *) &msg[nfill], (void *) &ring[0],  nwrap );
1171             }
1172        /* see if we got lapped by tport_copyto or tport_putmsg while copying msg */
1173        /* if we did, go back and try to get a msg cleanly          */
1174             keyold = shm->keyold;
1175             if ( keyold >= keyget )
1176             {
1177                keyget = keyold;
1178                lapped = 1;
1179                goto nextmsg;
1180             }
1181
1182        /* set other returned variables */
1183            *logo   = hd.logo;
1184            *length = hd.size;
1185            *seq    = hd.seq;
1186
1187trackit:
1188        /* find logo in tracked list */
1189             for ( it=0 ; it < nlogo ; it++ )
1190             {
1191                if ( region->key    != trak[it].memkey      )  continue;
1192                if ( hd.logo.type   != trak[it].logo.type   )  continue;
1193                if ( hd.logo.mod    != trak[it].logo.mod    )  continue;
1194                if ( hd.logo.instid != trak[it].logo.instid )  continue;
1195                /* activate sequence tracking if 1st msg */
1196                if ( !trak[it].active )
1197                {
1198                    trak[it].seq    = hd.seq;
1199                    trak[it].active = 1;
1200                }
1201                goto sequence;
1202             }
1203        /* new logo, track it if there's room */
1204             if ( nlogo == NTRACK_GET )
1205             {
1206                fprintf( stdout,
1207                     "ERROR: tport_copyfrom; exceeded NTRACK_GET\n");
1208                if( status != GET_TOOBIG ) status = GET_NOTRACK; /*changed 960612:ldd*/
1209                goto wrapup;
1210             }
1211             it = nlogo;
1212             trak[it].memkey = region->key;
1213             trak[it].logo   = hd.logo;
1214             trak[it].seq    = hd.seq;
1215             trak[it].active = 1;      /*960618:ldd*/
1216             nlogo++;
1217
1218sequence:
1219        /* check if sequence #'s match; update sequence # */
1220             if ( status == GET_TOOBIG   )  goto wrapup; /*added 960612:ldd*/
1221             if ( hd.seq != trak[it].seq )
1222             {
1223                if (lapped)  status = GET_MISS_LAPPED;
1224                else         status = GET_MISS_SEQGAP;
1225                trak[it].seq = hd.seq;
1226             }
1227             trak[it].seq++;
1228
1229        /* Ok, we're finished grabbing this one */
1230             goto wrapup;
1231
1232          } /* end if of logo & getlogo match */
1233       }    /* end for over getlogo */
1234   }        /* end while over ring */
1235
1236/**** If you got here, there were no messages of requested logo(s) ****/
1237
1238   status = GET_NONE;
1239
1240/**** update outpointer (->msg after retrieved one) for all requested logos ****/
1241
1242wrapup:
1243   for ( it=0 ; it < nlogo ; it++ )  /* for all message logos we're tracking */
1244   {
1245       if ( trak[it].memkey != region->key ) continue;
1246       for ( j=0 ; j < nget ; j++ )  /* for all requested message logos */
1247       {
1248          if((getlogo[j].type   == trak[it].logo.type   || getlogo[j].type==WILD) &&
1249             (getlogo[j].mod    == trak[it].logo.mod    || getlogo[j].mod==WILD)  &&
1250             (getlogo[j].instid == trak[it].logo.instid || getlogo[j].instid==WILD) )
1251          {
1252             trak[it].keyout = keyget;
1253          }
1254       }
1255    }
1256
1257   return( status );
1258
1259}
1260
1261
1262/*********************** function tport_copyto ***********************/
1263/*           Puts a message into a shared memory region.             */
1264/*    Preserves the sequence number (passed as argument) as the      */
1265/*                transport layer sequence number                    */
1266/*********************************************************************/
1267
1268int tport_copyto( SHM_INFO    *region,  /* info structure for memory region     */
1269                  MSG_LOGO    *putlogo, /* type, module, instid of incoming msg */
1270                  long         length,  /* size of incoming message             */
1271                  char        *msg,     /* pointer to incoming message          */
1272                  unsigned char seq )   /* preserve as sequence# in TPORT_HEAD  */
1273{
1274   SHM_HEAD         *shm;              /* pointer to start of memory region   */
1275   char             *ring;             /* pointer to ring part of memory      */
1276   unsigned long     ir;               /* index into memory ring              */
1277   long              nfill;            /* # bytes from ir to ring's last-byte */
1278   long              nwrap;            /* # bytes to wrap to front of ring    */
1279   TPORT_HEAD        hd;               /* transport layer header to put       */
1280   char             *h;                /* pointer to transport layer header   */
1281   TPORT_HEAD        old;              /* transport header of oldest msg      */
1282   char             *o;                /* pointer to oldest transport header  */
1283   int j;
1284   int retval = PUT_OK;                /* return value for this function      */
1285
1286/**** First time around, initialize semaphore controls ****/
1287
1288   if (Copyto_Init)
1289   {
1290       Copyto_Init  = 0;
1291   }
1292
1293/**** Set up pointers for shared memory, etc. ****/
1294
1295   shm  = region->addr;
1296   ring = (char *) shm + sizeof(SHM_HEAD);
1297   h    = (char *) (&hd);
1298   o    = (char *) (&old);
1299
1300/**** First, see if the incoming message will fit in the memory region ****/
1301
1302   if ( length + sizeof(TPORT_HEAD) > shm->keymax )
1303   {
1304      fprintf( stdout,
1305              "ERROR: tport_copyto; message too large (%ld) for Region %ld\n",
1306               length, region->key);
1307      return( PUT_TOOBIG );
1308   }
1309
1310/**** Change semaphore to let others know you're using memory ****/
1311
1312   WaitForSingleObject( region->hMutex, INFINITE );
1313
1314/**** Store everything you need in the transport header ****/
1315
1316   hd.start = FIRST_BYTE;
1317   hd.size  = length;
1318   hd.logo  = *putlogo;
1319   hd.seq   = seq;
1320
1321/**** First see if keyin will wrap; if so, reset both keyin and keyold ****/
1322
1323   if ( shm->keyin + sizeof(TPORT_HEAD) + length  <  shm->keyold )
1324   {
1325       shm->keyin  = shm->keyin  % shm->keymax;
1326       shm->keyold = shm->keyold % shm->keymax;
1327       if ( shm->keyin <= shm->keyold ) shm->keyin += shm->keymax;
1328     /*fprintf( stdout,
1329               "NOTICE: tport_copyto; keyin wrapped & reset; Region %ld\n",
1330                region->key );*/
1331   }
1332
1333/**** Then see if there's enough room for new message in shared memory ****/
1334/****      If not, "delete" oldest messages until there's room         ****/
1335
1336   while( shm->keyin + sizeof(TPORT_HEAD) + length - shm->keyold > shm->keymax )
1337   {
1338      ir = shm->keyold % shm->keymax;
1339      if ( ring[ir] != FIRST_BYTE )
1340      {
1341          fprintf( stdout,
1342                  "ERROR: tport_copyto; keyold not at FIRST_BYTE, Region %ld\n",
1343                   region->key );
1344          retval = TPORT_FATAL;
1345          goto release_semaphore;
1346      }
1347      for ( j=0 ; j < sizeof(TPORT_HEAD) ; j++ )
1348      {
1349         if ( ir >= shm->keymax )   ir -= shm->keymax;
1350         o[j] = ring[ir++];
1351      }
1352      shm->keyold += sizeof(TPORT_HEAD) + old.size;
1353   }
1354
1355/**** Now copy transport header into shared memory by chunks... ****/
1356
1357   ir = shm->keyin % shm->keymax;
1358   nwrap = ir + sizeof(TPORT_HEAD) - shm->keymax;
1359   if ( nwrap <= 0 )
1360   {
1361         memcpy( (void *) &ring[ir], (void *) h, sizeof(TPORT_HEAD) );
1362   }
1363   else
1364   {
1365         nfill = sizeof(TPORT_HEAD) - nwrap;
1366         memcpy( (void *) &ring[ir], (void *) &h[0],     nfill );
1367         memcpy( (void *) &ring[0],  (void *) &h[nfill], nwrap );
1368   }
1369   ir += sizeof(TPORT_HEAD);
1370   if ( ir >= shm->keymax )  ir -= shm->keymax;
1371
1372/**** ...and copy message into shared memory by chunks ****/
1373
1374   nwrap = ir + length - shm->keymax;
1375   if ( nwrap <= 0 )
1376   {
1377         memcpy( (void *) &ring[ir], (void *) msg, length );
1378   }
1379   else
1380   {
1381         nfill = length - nwrap;
1382         memcpy( (void *) &ring[ir], (void *) &msg[0],     nfill );
1383         memcpy( (void *) &ring[0],  (void *) &msg[nfill], nwrap );
1384   }
1385   shm->keyin += sizeof(TPORT_HEAD) + length;
1386
1387/**** Finished with shared memory, let others know via semaphore ****/
1388
1389release_semaphore:
1390   ReleaseMutex(region->hMutex);
1391
1392   if( retval == TPORT_FATAL ) exit( 1 );
1393   return( retval );
1394}
1395
1396
1397/************************* tport_buferror ***************************/
1398/*  Build an error message and put it in the public memory region   */
1399/********************************************************************/
1400void tport_buferror( short  ierr,       /* 2-byte error word       */
1401                     char  *note  )     /* string describing error */
1402{
1403        MSG_LOGO    logo;
1404        char        msg[256];
1405        long        size;
1406        time_t      t;
1407
1408        logo.instid = MyInstid;
1409        logo.mod    = MyModuleId;
1410        logo.type   = TypeError;
1411
1412        time( &t );
1413        sprintf( msg, "%ld %hd %s\n", (long) t, ierr, note );
1414        size = strlen( msg );   /* don't include the null byte in the message */
1415
1416        if ( tport_putmsg( (SHM_INFO *) PubRegion, &logo, size, msg ) != PUT_OK )
1417        {
1418            printf("tport_bufthr:  Error sending error:%hd for module:%d.\n",
1419                    ierr, MyModuleId );
1420        }
1421        return;
1422}
1423
1424
1425/************************ function tport_syserr **********************/
1426/*                 Print a system error and terminate.               */
1427/*********************************************************************/
1428
1429void tport_syserr( char *msg,   /* message to print (which routine had an error) */
1430                   long  key )  /* identifies which memory region had the error  */
1431{
1432   extern int   sys_nerr;
1433/*   extern char *sys_errlist[];*/
1434
1435   long err = GetLastError();   /* Override with per thread err */
1436
1437   fprintf( stdout, "ERROR: %s (%d", msg, err );
1438   fprintf( stdout, "; %s) Region: %ld\n", strerror(err), key);
1439
1440/*   if ( err > 0 && err < sys_nerr )
1441      fprintf( stdout,"; %s) Region: %ld\n", sys_errlist[err], key );
1442   else
1443      fprintf( stdout, ") Region: %ld\n", key ); */
1444
1445   exit( 1 );
1446}
1447
Note: See TracBrowser for help on using the repository browser.