source: trunk/src/libsrc/winnt/transport.c @ 3161

Revision 3161, 52.1 KB checked in by paulf, 13 years ago (diff)

minor change to wording of OpenFileMapping? error

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1
2/*
3 *   THIS FILE IS UNDER RCS - DO NOT MODIFY UNLESS YOU HAVE
4 *   CHECKED IT OUT USING THE COMMAND CHECKOUT.
5 *
6 *    $Id$
7 *
8 *    Revision history:
9 *     $Log$
10 *     Revision 1.8  2007/11/28 16:38:08  paulf
11 *     minor change to wording of OpenFileMapping error
12 *
13 *     Revision 1.7  2007/03/29 13:46:40  paulf
14 *     fixed yet another time_t casting issue in tport_buferror()
15 *
16 *     Revision 1.6  2006/10/05 14:49:51  stefan
17 *     commented out unused extern in tport_syserror
18 *
19 *     Revision 1.5  2005/04/15 17:31:27  dietz
20 *     Modified tport_bufthr() to look for termination flag in both the
21 *     public and private rings.
22 *
23 *     Revision 1.4  2004/03/22 21:15:59  kohler
24 *     Added: #include <stdlib.h>
25 *
26 *     Revision 1.3  2001/05/04 23:43:54  dietz
27 *     Changed flag arg of tport_putflag from short to int to handle
28 *     processids properly.
29 *
30 *     Revision 1.2  2000/06/02 18:19:48  dietz
31 *     Fixed tport_putmsg,tport_copyto to always release semaphore before returning
32 *
33 *     Revision 1.1  2000/02/14 18:53:30  lucky
34 *     Initial revision
35 *
36 *
37 */
38
39/********************************************************************/
40/*                                                        6/2000    */
41/*                           transport.c                            */
42/*                                                                  */
43/*   Transport layer functions to access shared memory regions.     */
44/*                                                                  */
45/*   written by Lynn Dietz, Will Kohler with inspiration from       */
46/*       Carl Johnson, Alex Bittenbinder, Barbara Bogaert           */
47/*                                                                  */
48/********************************************************************/
49
50/* ***** Notes on development, delete when appropriate
511. Change the quotes for the transport.h and earthworm.h includes
52   when these are moved to the appropriately pathed included directory.
53*/
54#include <windows.h>
55#include <sys/types.h>
56#include <stdio.h>
57#include <stdlib.h>
58#include <errno.h>
59#include <string.h>
60#include <time.h>
61#include <process.h>
62#include <transport.h>
63
64static short Put_Init=1;           /* initialization flag */
65static short Get_Init=1;           /* initialization flag */
66static short Copyfrom_Init=1;      /* initialization flag */
67static short Copyto_Init  =1;      /* initialization flag */
68
69/* These functions are for internal use by transport functions only
70   ****************************************************************/
71void  tport_syserr  ( char *, long );
72void  tport_buferror( short, char * );
73
74/* These statements and variables are required by the functions of
75   the input-buffering thread
76   ***************************************************************/
77#include "earthworm.h"
78volatile SHM_INFO *PubRegion;      /* transport public ring      */
79volatile SHM_INFO *BufRegion;      /* pointer to private ring    */
80volatile MSG_LOGO *Getlogo;        /* array of logos to copy     */
81volatile short     Nget;           /* number of logos in getlogo */
82volatile unsigned  MaxMsgSize;     /* size of message buffer     */
83volatile char     *Message;        /* message buffer             */
84static unsigned char MyModuleId;   /* module id of main thread   */
85unsigned char      MyInstid;       /* instid of main thread      */
86unsigned char      TypeError;      /* type for error messages    */
87
88/******************** function tport_create *************************/
89/*         Create a shared memory region & its semaphore,           */
90/*           attach to it and initialize header values.             */
91/********************************************************************/
92void tport_create( SHM_INFO *region,   /* info structure for memory region  */
93                   long      nbytes,   /* size of shared memory region      */
94                   long      memkey )  /* key to shared memory region       */
95{
96   SHM_HEAD       *shm;       /* pointer to start of memory region */
97   HANDLE         hshare;     // Handle to memory shared file
98   HANDLE         hmutex;     // Handle to mutex object
99   char           share[20];  // Shared file name from memkey
100   char           mutex[20];  // Mutex name
101   int            err;        // Error code from GetLastError()
102
103/**** Create shared memory region ****/
104   sprintf(share, "SHR_%ld", memkey);
105   hshare = CreateFileMapping(
106      (HANDLE)0xFFFFFFFF,  // Request memory file (swap space)
107      NULL,                // Security attributes
108      PAGE_READWRITE,      // Access restrictions
109      0,                   // High order size (for very large mappings)
110      nbytes,              // Low order size
111      share);              // Name of file for other processes
112   if ( hshare == NULL )
113   {
114      err = GetLastError();
115      tport_syserr( "CreateFileMapping", err);
116   }
117
118/**** Attach to shared memory region ****/
119   shm = (SHM_HEAD *) MapViewOfFile(
120      hshare,              // File object to map
121      FILE_MAP_WRITE,      // Access desired
122      0,                   // High-order 32 bits of file offset
123      0,                   // Low-order 32 bits of file offset
124      nbytes);             // Number of bytes to map
125   if ( shm == NULL )
126   {
127      err = GetLastError();
128      tport_syserr( "MapViewOfFile", err );
129   }
130
131/**** Initialize shared memory region header ****/
132   shm->nbytes = nbytes;
133   shm->keymax = nbytes - sizeof(SHM_HEAD);
134   shm->keyin  = sizeof(SHM_HEAD);
135   shm->keyold = shm->keyin;
136   shm->flag   = 0;
137
138/**** Make semaphore for this shared memory region & set semval = SHM_FREE ****/
139   sprintf(mutex, "MTX_%ld", memkey);
140   hmutex = CreateMutex(
141      NULL,                // Security attributes
142      FALSE,               // Initial ownership
143      mutex);              // Name of mutex (derived from memkey)
144   if ( hmutex == NULL )
145   {
146      err = GetLastError();
147      tport_syserr( "CreateMutex", err);
148   }
149
150/**** set values in the shared memory information structure ****/
151   region->addr = shm;
152   region->hShare = hshare;
153   region->hMutex = hmutex;
154   region->key  = memkey;
155}
156
157
158/******************** function tport_destroy *************************/
159/*                Destroy a shared memory region.                    */
160/*********************************************************************/
161
162void tport_destroy( SHM_INFO *region )
163{
164   int err;
165
166/***** Set kill flag, give other attached programs time to terminate ****/
167
168   tport_putflag( region, TERMINATE );
169   Sleep( 1000 );
170
171/***** Detach from shared memory region *****/
172   if(!UnmapViewOfFile( region->addr )) {
173      err = GetLastError();
174      tport_syserr( "UnmapViewOfFile", err);
175   }
176
177/***** Destroy semaphore set for shared memory region *****/
178   if(!CloseHandle(region->hMutex)) {
179      err = GetLastError();
180      tport_syserr( "CloseHandle (mutex)", err);
181   }
182
183
184/***** Destroy shared memory region *****/
185   if(!CloseHandle(region->hShare)) {
186      err = GetLastError();
187      tport_syserr( "CloseHandle (share)", err);
188   }
189}
190
191/******************** function tport_attach *************************/
192/*            Map to an existing shared memory region.              */
193/********************************************************************/
194
195void tport_attach( SHM_INFO *region,   /* info structure for memory region  */
196                   long      memkey )  /* key to shared memory region       */
197{
198   SHM_HEAD       *shm;       /* pointer to start of memory region */
199   HANDLE         hshare;     // Handle to memory shared file
200   HANDLE         hmutex;     // Handle to mutex object
201   char           share[20];  // Shared file name from memkey
202   char           mutex[20];  // Mutex name
203   int            err;        // Error code from GetLastError()
204
205/**** Create shared memory region ****/
206   sprintf(share, "SHR_%ld", memkey);
207   hshare = OpenFileMapping(
208       FILE_MAP_WRITE,
209       TRUE,
210       share);
211   if ( hshare == NULL )
212   {
213      err = GetLastError();
214      tport_syserr( "OpenFileMapping (Earthworm may not be running or your environment may not be set properly)", err);
215   }
216
217/**** Attach to shared memory region ****/
218   shm = (SHM_HEAD *) MapViewOfFile(
219      hshare,              // File object to map
220      FILE_MAP_WRITE,      // Access desired
221      0,                   // High-order 32 bits of file offset
222      0,                   // Low-order 32 bits of file offset
223      0);                  // Number of bytes to map
224   if ( shm == NULL )
225   {
226      err = GetLastError();
227      tport_syserr( "MapViewOfFile", err );
228   }
229
230/**** Make semaphore for this shared memory region & set semval = SHM_FREE ****/
231   sprintf(mutex, "MTX_%ld", memkey);
232   hmutex = CreateMutex(
233      NULL,                // Security attributes
234      FALSE,               // Initial ownership
235      mutex);              // Name of mutex (derived from memkey)
236   if ( hmutex == NULL )
237   {
238      err = GetLastError();
239      tport_syserr( "CreateMutex", err);
240   }
241
242/**** set values in the shared memory information structure ****/
243   region->addr = shm;
244   region->hShare = hshare;
245   region->hMutex = hmutex;
246   region->key  = memkey;
247}
248
249/******************** function tport_detach **************************/
250/*                Detach from a shared memory region.                */
251/*********************************************************************/
252
253void tport_detach( SHM_INFO *region )
254{
255   int err;
256
257/***** Detach from shared memory region *****/
258   if(!UnmapViewOfFile( region->addr )) {
259      err = GetLastError();
260      tport_syserr( "UnmapViewOfFile", err);
261   }
262
263/***** Destroy semaphore set for shared memory region *****/
264   if(!CloseHandle(region->hMutex)) {
265      err = GetLastError();
266      tport_syserr( "CloseHandle (mutex)", err);
267   }
268
269
270/***** Destroy shared memory region *****/
271   if(!CloseHandle(region->hShare)) {
272      err = GetLastError();
273      tport_syserr( "CloseHandle (share)", err);
274   }
275}
276
277
278
279/*********************** function tport_putmsg ***********************/
280/*            Put a message into a shared memory region.             */
281/*            Assigns a transport-layer sequence number.             */
282/*********************************************************************/
283
284int tport_putmsg( SHM_INFO *region,    /* info structure for memory region    */
285                  MSG_LOGO *putlogo,   /* type, module, instid of incoming msg */
286                  long      length,    /* size of incoming message            */
287                  char     *msg )      /* pointer to incoming message         */
288{
289   volatile static MSG_TRACK  trak[NTRACK_PUT];   /* sequence number keeper   */
290   volatile static int        nlogo;              /* # of logos seen so far   */
291   int                        it;                 /* index into trak          */
292   SHM_HEAD         *shm;              /* pointer to start of memory region   */
293   char             *ring;             /* pointer to ring part of memory      */
294   unsigned long     ir;               /* index into memory ring              */
295   long              nfill;            /* # bytes from ir to ring's last-byte */
296   long              nwrap;            /* # bytes to wrap to front of ring    */
297   TPORT_HEAD        hd;               /* transport layer header to put       */
298   char             *h;                /* pointer to transport layer header   */
299   TPORT_HEAD        old;              /* transport header of oldest msg      */
300   char             *o;                /* pointer to oldest transport header  */
301   int j;
302   int retval = PUT_OK;                /* return value for this function      */
303
304/**** First time around, init the sequence counters, semaphore controls ****/
305
306   if (Put_Init)
307   {
308       nlogo    = 0;
309
310       for( j=0 ; j < NTRACK_PUT ; j++ )
311       {
312          trak[j].memkey      = 0;
313          trak[j].logo.type   = 0;
314          trak[j].logo.mod    = 0;
315          trak[j].logo.instid = 0;
316          trak[j].seq         = 0;
317          trak[j].keyout      = 0;
318       }
319
320       Put_Init = 0;
321   }
322
323/**** Set up pointers for shared memory, etc. ****/
324
325   shm  = region->addr;
326   ring = (char *) shm + sizeof(SHM_HEAD);
327   h    = (char *) (&hd);
328   o    = (char *) (&old);
329
330/**** First, see if the incoming message will fit in the memory region ****/
331
332   if ( length + sizeof(TPORT_HEAD) > shm->keymax )
333   {
334      fprintf( stdout,
335              "ERROR: tport_putmsg; message too large (%ld) for Region %ld\n",
336               length, region->key);
337      return( PUT_TOOBIG );
338   }
339
340/**** Change semaphore; let others know you're using tracking structure & memory  ****/
341
342   WaitForSingleObject(region->hMutex, INFINITE);
343
344/**** Next, find incoming logo in list of combinations already seen ****/
345
346   for( it=0 ; it < nlogo ; it++ )
347   {
348      if ( region->key     != trak[it].memkey     )  continue;
349      if ( putlogo->type   != trak[it].logo.type  )  continue;
350      if ( putlogo->mod    != trak[it].logo.mod   )  continue;
351      if ( putlogo->instid != trak[it].logo.instid ) continue;
352      goto build_header;
353   }
354
355/**** Incoming logo is a new combination; store it, if there's room ****/
356
357   if ( nlogo == NTRACK_PUT )
358   {
359      fprintf( stdout,
360              "ERROR: tport_putmsg; exceeded NTRACK_PUT, msg not sent\n");
361      retval = PUT_NOTRACK;
362      goto release_semaphore;
363   }
364   it = nlogo;
365   trak[it].memkey =  region->key;
366   trak[it].logo   = *putlogo;
367   nlogo++;
368
369/**** Store everything you need in the transport header ****/
370
371build_header:
372   hd.start = FIRST_BYTE;
373   hd.size  = length;
374   hd.logo  = trak[it].logo;
375   hd.seq   = trak[it].seq++;
376
377/**** In shared memory, see if keyin will wrap; if so, reset keyin and keyold ****/
378
379   if ( shm->keyin + sizeof(TPORT_HEAD) + length  <  shm->keyold )
380   {
381       shm->keyin  = shm->keyin  % shm->keymax;
382       shm->keyold = shm->keyold % shm->keymax;
383       if ( shm->keyin <= shm->keyold ) shm->keyin += shm->keymax;
384     /*fprintf( stdout,
385               "NOTICE: tport_putmsg; keyin wrapped & reset; Region %ld\n",
386                region->key );*/
387   }
388
389/**** Then see if there's enough room for new message in shared memory ****/
390/****      If not, "delete" oldest messages until there's room         ****/
391
392   while( shm->keyin + sizeof(TPORT_HEAD) + length - shm->keyold > shm->keymax )
393   {
394      ir = shm->keyold % shm->keymax;
395      if ( ring[ir] != FIRST_BYTE )
396      {
397          fprintf( stdout,
398                  "ERROR: tport_putmsg; keyold not at FIRST_BYTE, Region %ld\n",
399                   region->key );
400          retval = TPORT_FATAL;
401          goto release_semaphore;
402      }
403      for ( j=0 ; j < sizeof(TPORT_HEAD) ; j++ )
404      {
405         if ( ir >= shm->keymax )   ir -= shm->keymax;
406         o[j] = ring[ir++];
407      }
408      shm->keyold += sizeof(TPORT_HEAD) + old.size;
409   }
410
411/**** Now copy transport header into shared memory by chunks... ****/
412
413   ir = shm->keyin % shm->keymax;
414   nwrap = ir + sizeof(TPORT_HEAD) - shm->keymax;
415   if ( nwrap <= 0 )
416   {
417         memcpy( (void *) &ring[ir], (void *) h, sizeof(TPORT_HEAD) );
418   }
419   else
420   {
421         nfill = sizeof(TPORT_HEAD) - nwrap;
422         memcpy( (void *) &ring[ir], (void *) &h[0],     nfill );
423         memcpy( (void *) &ring[0],  (void *) &h[nfill], nwrap );
424   }
425   ir += sizeof(TPORT_HEAD);
426   if ( ir >= shm->keymax )  ir -= shm->keymax;
427
428/**** ...and copy message into shared memory by chunks ****/
429
430   nwrap = ir + length - shm->keymax;
431   if ( nwrap <= 0 )
432   {
433         memcpy( (void *) &ring[ir], (void *) msg, length );
434   }
435   else
436   {
437         nfill = length - nwrap;
438         memcpy( (void *) &ring[ir], (void *) &msg[0],     nfill );
439         memcpy( (void *) &ring[0],  (void *) &msg[nfill], nwrap );
440   }
441   shm->keyin += sizeof(TPORT_HEAD) + length;
442
443/**** Finished with shared memory, let others know via semaphore ****/
444
445release_semaphore:
446   ReleaseMutex(region->hMutex);
447
448   if( retval == TPORT_FATAL ) exit( 1 );
449   return( retval );
450}
451
452
453/*********************** function tport_getmsg ***********************/
454/*                 Get a message out of shared memory.               */
455/*********************************************************************/
456
457int tport_getmsg( SHM_INFO  *region,   /* info structure for memory region  */
458                  MSG_LOGO  *getlogo,  /* requested logo(s)                 */
459                  short      nget,     /* number of logos in getlogo        */
460                  MSG_LOGO  *logo,     /* logo of retrieved msg             */
461                  long      *length,   /* size of retrieved message         */
462                  char      *msg,      /* retrieved message                 */
463                  long       maxsize ) /* max length for retrieved message  */
464{
465   static MSG_TRACK  trak[NTRACK_GET]; /* sequence #, outpointer keeper     */
466   static int        nlogo;            /* # modid,type,instid combos so far */
467   int               it;               /* index into trak                   */
468   SHM_HEAD         *shm;              /* pointer to start of memory region */
469   char             *ring;             /* pointer to ring part of memory    */
470   TPORT_HEAD       *tmphd;            /* temp pointer into shared memory   */
471   unsigned long     ir;               /* index into the ring               */
472   long              nfill;            /* bytes from ir to ring's last-byte */
473   long              nwrap;            /* bytes to grab from front of ring  */
474   TPORT_HEAD        hd;               /* transport header from memory      */
475   char             *h;                /* pointer to transport layer header */
476   int               ih;               /* index into the transport header   */
477   unsigned long     keyin;            /* in-pointer to shared memory       */
478   unsigned long     keyold;           /* oldest complete message in memory */
479   unsigned long     keyget;           /* pointer at which to start search  */
480   int               status = GET_OK;  /* how did retrieval go?             */
481   int               trakked;          /* flag for trakking list entries    */
482   int               i,j;
483
484/**** Get the pointers set up ****/
485
486   shm  = region->addr;
487   ring = (char *) shm + sizeof(SHM_HEAD);
488   h    = (char *) (&hd);
489
490/**** First time around, initialize sequence counters, outpointers ****/
491
492   if (Get_Init)
493   {
494       nlogo = 0;
495
496       for( i=0 ; i < NTRACK_GET ; i++ )
497       {
498          trak[i].memkey      = 0;
499          trak[i].logo.type   = 0;
500          trak[i].logo.mod    = 0;
501          trak[i].logo.instid = 0;
502          trak[i].seq         = 0;
503          trak[i].keyout      = 0;
504          trak[i].active      = 0; /*960618:ldd*/
505       }
506       Get_Init = 0;
507   }
508
509/**** make sure all requested logos are entered in tracking list ****/
510
511   for ( j=0 ; j < nget ; j++ )  /* for all requested message logos */
512   {
513       trakked = 0;  /* assume it's not being trakked */
514       for( it=0 ; it < nlogo ; it++ )  /* for all logos we're tracking */
515       {
516          if( region->key       != trak[it].memkey      ) continue;
517          if( getlogo[j].type   != trak[it].logo.type   ) continue;
518          if( getlogo[j].mod    != trak[it].logo.mod    ) continue;
519          if( getlogo[j].instid != trak[it].logo.instid ) continue;
520          trakked = 1;  /* found it in the trakking list! */
521          break;
522       }
523       if( trakked ) continue;
524    /* Make an entry in trak for this logo; if there's room */
525       if ( nlogo < NTRACK_GET )
526       {
527          it = nlogo;
528          trak[it].memkey = region->key;
529          trak[it].logo   = getlogo[j];
530          nlogo++;
531       }
532   }
533
534/**** find latest starting index to look for any of the requested logos ****/
535
536findkey:
537
538   keyget = shm->keyold;
539
540   for ( it=0 ; it < nlogo ; it++ )  /* for all message logos we're tracking */
541   {
542       if ( trak[it].memkey != region->key ) continue;
543       for ( j=0 ; j < nget ; j++ )  /* for all requested message logos */
544       {
545          if((getlogo[j].type   == trak[it].logo.type   || getlogo[j].type==WILD  ) &&
546             (getlogo[j].mod    == trak[it].logo.mod    || getlogo[j].mod==WILD   ) &&
547             (getlogo[j].instid == trak[it].logo.instid || getlogo[j].instid==WILD) )
548          {
549             if ( trak[it].keyout > keyget )  keyget = trak[it].keyout;
550          }
551       }
552    }
553    keyin = shm->keyin;
554
555/**** See if keyin and keyold were wrapped and reset by tport_putmsg; ****/
556/****       If so, reset trak[xx].keyout and go back to findkey       ****/
557
558   if ( keyget > keyin )
559   {
560      keyold = shm->keyold;
561      for ( it=0 ; it < nlogo ; it++ )
562      {
563         if( trak[it].memkey == region->key )
564         {
565          /* reset keyout */
566/*DEBUG*/    /*printf("tport_getmsg: Pre-reset:  keyout=%10u    keyold=%10u  keyin=%10u\n",
567                     trak[it].keyout, keyold, keyin );*/
568             trak[it].keyout = trak[it].keyout % shm->keymax;
569/*DEBUG*/    /*printf("tport_getmsg:  Intermed:  keyout=%10u    keyold=%10u  keyin=%10u\n",
570                     trak[it].keyout, keyold, keyin );*/
571
572          /* make sure new keyout points to keyin or to a msg's first-byte; */
573          /* if not, we've been lapped, so set keyout to keyold             */
574             ir    = trak[it].keyout;
575             tmphd = (TPORT_HEAD *) &ring[ir];
576             if ( trak[it].keyout == keyin   ||
577                  (keyin-trak[it].keyout)%shm->keymax == 0 )
578             {
579/*DEBUG*/       /*printf("tport_getmsg:  Intermed:  keyout=%10u  same as keyin\n",
580                       trak[it].keyout );*/
581                trak[it].keyout = keyin;
582             }
583             else if( tmphd->start != FIRST_BYTE )
584             {
585/*DEBUG*/       /*printf("tport_getmsg:  Intermed:  keyout=%10u  does not point to FIRST_BYTE\n",
586                        trak[it].keyout );*/
587                trak[it].keyout = keyold;
588             }
589
590          /* else, make sure keyout's value is between keyold and keyin */
591             else if ( trak[it].keyout < keyold )
592             {
593                do {
594                    trak[it].keyout += shm->keymax;
595                } while ( trak[it].keyout < keyold );
596             }
597/*DEBUG*/    /*printf("tport_getmsg:     Reset:  keyout=%10u    keyold=%10u  keyin=%10u\n",
598                     trak[it].keyout, keyold, keyin );*/
599         }
600      }
601    /*fprintf( stdout,
602          "NOTICE: tport_getmsg; keyin wrapped, keyout(s) reset; Region %ld\n",
603           region->key );*/
604
605      goto findkey;
606   }
607
608
609/**** Find next message from requested type, module, instid ****/
610
611nextmsg:
612
613   while ( keyget < keyin )
614   {
615   /* make sure you haven't been lapped by tport_putmsg */
616       if ( keyget < shm->keyold ) keyget = shm->keyold;
617
618   /* load next header; make sure you weren't lapped */
619       ir = keyget % shm->keymax;
620       for ( ih=0 ; ih < sizeof(TPORT_HEAD) ; ih++ )
621       {
622          if ( ir >= shm->keymax )  ir -= shm->keymax;
623          h[ih] = ring[ir++];
624       }
625       if ( keyget < shm->keyold ) continue;  /*added 960612:ldd*/
626
627   /* make sure it starts at beginning of a header */
628       if ( hd.start != FIRST_BYTE )
629       {
630          fprintf( stdout,
631                  "ERROR: tport_getmsg; keyget not at FIRST_BYTE, Region %ld\n",
632                   region->key );
633          exit( 1 );
634       }
635       keyget += sizeof(TPORT_HEAD) + hd.size;
636
637   /* see if this msg matches any requested type */
638       for ( j=0 ; j < nget ; j++ )
639       {
640          if((getlogo[j].type   == hd.logo.type   || getlogo[j].type == WILD) &&
641             (getlogo[j].mod    == hd.logo.mod    || getlogo[j].mod  == WILD) &&
642             (getlogo[j].instid == hd.logo.instid || getlogo[j].instid == WILD) )
643          {
644
645/**** Found a message of requested logo; retrieve it! ****/
646        /* complain if retreived msg is too big */
647             if ( hd.size > maxsize )
648             {
649               *logo   = hd.logo;
650               *length = hd.size;
651                status = GET_TOOBIG;
652                goto trackit;    /*changed 960612:ldd*/
653             }
654        /* copy message by chunks to caller's address */
655             nwrap = ir + hd.size - shm->keymax;
656             if ( nwrap <= 0 )
657             {
658                memcpy( (void *) msg, (void *) &ring[ir], hd.size );
659             }
660             else
661             {
662                nfill = hd.size - nwrap;
663                memcpy( (void *) &msg[0],     (void *) &ring[ir], nfill );
664                memcpy( (void *) &msg[nfill], (void *) &ring[0],  nwrap );
665             }
666        /* see if we got run over by tport_putmsg while copying msg */
667        /* if we did, go back and try to get a msg cleanly          */
668             keyold = shm->keyold;
669             if ( keyold >= keyget )
670             {
671                keyget = keyold;
672                goto nextmsg;
673             }
674
675        /* set other returned variables */
676            *logo   = hd.logo;
677            *length = hd.size;
678
679trackit:
680        /* find msg logo in tracked list */
681             for ( it=0 ; it < nlogo ; it++ )
682             {
683                if ( region->key    != trak[it].memkey      )  continue;
684                if ( hd.logo.type   != trak[it].logo.type   )  continue;
685                if ( hd.logo.mod    != trak[it].logo.mod    )  continue;
686                if ( hd.logo.instid != trak[it].logo.instid )  continue;
687                /* activate sequence tracking if 1st msg */
688                if ( !trak[it].active )
689                {
690                    trak[it].seq    = hd.seq;
691                    trak[it].active = 1;
692                }
693                goto sequence;
694             }
695        /* new logo, track it if there's room */
696             if ( nlogo == NTRACK_GET )
697             {
698                fprintf( stdout,
699                     "ERROR: tport_getmsg; exceeded NTRACK_GET\n");
700                if( status != GET_TOOBIG ) status = GET_NOTRACK; /*changed 960612:ldd*/
701                goto wrapup;
702             }
703             it = nlogo;
704             trak[it].memkey = region->key;
705             trak[it].logo   = hd.logo;
706             trak[it].seq    = hd.seq;
707             trak[it].active = 1;      /*960618:ldd*/
708             nlogo++;
709
710sequence:
711        /* check if sequence #'s match; update sequence # */
712             if ( status == GET_TOOBIG   )  goto wrapup; /*added 960612:ldd*/
713             if ( hd.seq != trak[it].seq )
714             {
715                status = GET_MISS;
716                trak[it].seq = hd.seq;
717             }
718             trak[it].seq++;
719
720        /* Ok, we're finished grabbing this one */
721             goto wrapup;
722
723          } /* end if of logo & getlogo match */
724       }    /* end for over getlogo */
725   }        /* end while over ring */
726
727/**** If you got here, there were no messages of requested logo(s) ****/
728
729   status = GET_NONE;
730
731/**** update outpointer (->msg after retrieved one) for all requested logos ****/
732
733wrapup:
734   for ( it=0 ; it < nlogo ; it++ )  /* for all message logos we're tracking */
735   {
736       if ( trak[it].memkey != region->key ) continue;
737       for ( j=0 ; j < nget ; j++ )  /* for all requested message logos */
738       {
739          if((getlogo[j].type   == trak[it].logo.type   || getlogo[j].type==WILD) &&
740             (getlogo[j].mod    == trak[it].logo.mod    || getlogo[j].mod==WILD)  &&
741             (getlogo[j].instid == trak[it].logo.instid || getlogo[j].instid==WILD) )
742          {
743             trak[it].keyout = keyget;
744          }
745       }
746    }
747
748   return( status );
749
750}
751
752
753/********************* function tport_putflag ************************/
754/*           Puts the kill flag into a shared memory region.         */
755/*********************************************************************/
756
757void tport_putflag( SHM_INFO *region,  /* shared memory info structure     */
758                    int       flag )   /* tells attached processes to exit */
759{
760   SHM_HEAD  *shm;
761
762   shm = region->addr;
763   shm->flag = flag;
764   return;
765}
766
767
768
769/*********************** function tport_getflag **********************/
770/*         Returns the kill flag from a shared memory region.        */
771/*********************************************************************/
772
773int tport_getflag( SHM_INFO *region )
774
775{
776   SHM_HEAD  *shm;
777
778   shm = region->addr;
779   return( (int)shm->flag );
780}
781
782
783/************************** tport_bufthr ****************************/
784/*     Thread to buffer input from one transport ring to another.   */
785/********************************************************************/
786void tport_bufthr( void *dummy )
787{
788   char          errnote[150];
789   MSG_LOGO      logo;
790   long          msgsize;
791   unsigned char msgseq;
792   int           res1, res2;
793   int           gotmsg;
794   HANDLE myHandle = GetCurrentThread();
795
796/* Reset my own thread priority
797   ****************************/
798   if ( SetThreadPriority( myHandle, THREAD_PRIORITY_TIME_CRITICAL ) == 0 )
799   {
800      printf( "Error setting buffer thread priority: %d\n", GetLastError() );
801      exit( -1 );
802   }
803
804/* Flush all existing messages from the public memory region
805   *********************************************************/
806   while( tport_copyfrom((SHM_INFO *) PubRegion, (MSG_LOGO *) Getlogo,
807                          Nget, &logo, &msgsize, (char *) Message,
808                          MaxMsgSize, &msgseq )  !=  GET_NONE  );
809
810   while ( 1 )
811   {
812      Sleep( 500 );
813
814/* If a terminate flag is found, copy it to the private ring.
815   Then, terminate this thread.
816   *********************************************************/
817      if ( tport_getflag( (SHM_INFO *) PubRegion ) == TERMINATE ||
818           tport_getflag( (SHM_INFO *) BufRegion ) == TERMINATE   )
819      {
820         tport_putflag( (SHM_INFO *) BufRegion, TERMINATE );
821         _endthread();
822      }
823
824/* Try to copy a message from the public memory region
825   ***************************************************/
826      do
827      {
828          res1 = tport_copyfrom((SHM_INFO *) PubRegion, (MSG_LOGO *) Getlogo,
829                                Nget, &logo, &msgsize, (char *) Message,
830                                MaxMsgSize, &msgseq );
831          gotmsg = 1;
832
833/* Handle return values
834   ********************/
835          switch ( res1 )
836          {
837          case GET_MISS_LAPPED:
838                sprintf( errnote,
839                        "tport_bufthr: Missed msg(s)  c%d m%d t%d  Overwritten, region:%ld.",
840                         (int) logo.instid, (int) logo.mod, (int) logo.type,
841                         PubRegion->key );
842                tport_buferror( ERR_LAPPED, errnote );
843                break;
844          case GET_MISS_SEQGAP:
845                sprintf( errnote,
846                        "tport_bufthr: Missed msg(s)  c%d m%d t%d  Sequence gap, region:%ld.",
847                         (int) logo.instid, (int) logo.mod, (int) logo.type,
848                         PubRegion->key );
849                tport_buferror( ERR_SEQGAP, errnote );
850                break;
851          case GET_NOTRACK:
852                sprintf( errnote,
853                        "tport_bufthr: Logo c%d m%d t%d not tracked; NTRACK_GET exceeded.",
854                        (int) logo.instid, (int) logo.mod, (int) logo.type );
855                tport_buferror( ERR_UNTRACKED, errnote );
856          case GET_OK:
857                break;
858          case GET_TOOBIG:
859                sprintf( errnote,
860                        "tport_bufthr: msg[%ld] c%d m%d t%d seq%d too big; skipped in region:%ld.",
861                         msgsize, (int) logo.instid, (int) logo.mod,
862                         (int) logo.type, (int) msgseq, PubRegion->key );
863                tport_buferror( ERR_OVERFLOW, errnote );
864          case GET_NONE:
865                gotmsg = 0;
866                break;
867          }
868
869/* If you did get a message, copy it to private ring
870   *************************************************/
871          if ( gotmsg )
872          {
873              res2 = tport_copyto( (SHM_INFO *) BufRegion, &logo,
874                                   msgsize, (char *) Message, msgseq );
875              switch (res2)
876              {
877              case PUT_TOOBIG:
878                 sprintf( errnote,
879                     "tport_bufthr: msg[%ld] (c%d m%d t%d) too big for Region:%ld.",
880                      msgsize, (int) logo.instid, (int) logo.mod, (int) logo.type,
881                      BufRegion->key );
882                 tport_buferror( ERR_OVERFLOW, errnote );
883              case PUT_OK:
884                 break;
885              }
886          }
887      } while ( res1 != GET_NONE );
888   }
889}
890
891
892/************************** tport_buffer ****************************/
893/*       Function to initialize the input buffering thread          */
894/********************************************************************/
895int tport_buffer( SHM_INFO  *region1,      /* transport ring             */
896                  SHM_INFO  *region2,      /* private ring               */
897                  MSG_LOGO  *getlogo,      /* array of logos to copy     */
898                  short      nget,         /* number of logos in getlogo */
899                  unsigned   maxMsgSize,   /* size of message buffer     */
900                  unsigned char module,    /* module id of main thread   */
901                  unsigned char instid )   /* instid id of main thread   */
902{
903   unsigned long thread_id;            /* Thread id of the buffer thread */
904
905/* Allocate message buffer
906   ***********************/
907   Message = (char *) malloc( maxMsgSize );
908   if ( Message == NULL )
909   {
910      fprintf( stdout, "tport_buffer: Error allocating message buffer\n" );
911      return -1;
912   }
913
914/* Copy function arguments to global variables
915   *******************************************/
916   PubRegion   = region1;
917   BufRegion   = region2;
918   Getlogo     = getlogo;
919   Nget        = nget;
920   MaxMsgSize  = maxMsgSize;
921   MyModuleId  = module;
922   MyInstid    = instid;
923
924/* Lookup message type for error messages
925   **************************************/
926   if ( GetType( "TYPE_ERROR", &TypeError ) != 0 )
927   {
928      fprintf( stderr,
929              "tport_buffer: Invalid message type <TYPE_ERROR>\n" );
930      return -1;
931   }
932
933/* Start the input buffer thread
934   *****************************/
935   thread_id = _beginthread( tport_bufthr, 0, NULL );
936
937   if ( thread_id == -1 )                /* Couldn't create thread */
938   {
939      fprintf( stderr, "tport_buffer: Can't start the buffer thread." );
940      return -1;
941   }
942   return 0;
943}
944
945
946/********************** function tport_copyfrom *********************/
947/*      get a message out of public shared memory; save the         */
948/*     sequence number from the transport layer, with the intent    */
949/*       of copying it to a private (buffering) memory ring         */
950/********************************************************************/
951
952int tport_copyfrom( SHM_INFO  *region,   /* info structure for memory region */
953                    MSG_LOGO  *getlogo,  /* requested logo(s)                */
954                    short      nget,     /* number of logos in getlogo       */
955                    MSG_LOGO  *logo,     /* logo of retrieved message        */
956                    long      *length,   /* size of retrieved message        */
957                    char      *msg,      /* retrieved message                */
958                    long       maxsize,  /* max length for retrieved message */
959                    unsigned char *seq ) /* TPORT_HEAD seq# of retrieved msg */
960{
961   static MSG_TRACK  trak[NTRACK_GET]; /* sequence #, outpointer keeper     */
962   static int        nlogo;            /* # modid,type,instid combos so far */
963   int               it;               /* index into trak                   */
964   SHM_HEAD         *shm;              /* pointer to start of memory region */
965   char             *ring;             /* pointer to ring part of memory    */
966   TPORT_HEAD       *tmphd;            /* temp pointer into shared memory   */
967   unsigned long     ir;               /* index into the ring               */
968   long              nfill;            /* bytes from ir to ring's last-byte */
969   long              nwrap;            /* bytes to grab from front of ring  */
970   TPORT_HEAD        hd;               /* transport header from memory      */
971   char             *h;                /* pointer to transport layer header */
972   int               ih;               /* index into the transport header   */
973   unsigned long     keyin;            /* in-pointer to shared memory       */
974   unsigned long     keyold;           /* oldest complete message in memory */
975   unsigned long     keyget;           /* pointer at which to start search  */
976   int               status = GET_OK;  /* how did retrieval go?             */
977   int               lapped = 0;       /* = 1 if memory ring has been over- */
978                                       /* written since last tport_copyfrom */
979   int               trakked;          /* flag for trakking list entries    */
980   int               i,j;
981
982/**** Get the pointers set up ****/
983
984   shm  = region->addr;
985   ring = (char *) shm + sizeof(SHM_HEAD);
986   h    = (char *) (&hd);
987
988/**** First time around, initialize sequence counters, outpointers ****/
989
990   if (Copyfrom_Init)
991   {
992       nlogo = 0;
993
994       for( i=0 ; i < NTRACK_GET ; i++ )
995       {
996          trak[i].memkey      = 0;
997          trak[i].logo.type   = 0;
998          trak[i].logo.mod    = 0;
999          trak[i].logo.instid = 0;
1000          trak[i].seq         = 0;
1001          trak[i].keyout      = 0;
1002          trak[i].active      = 0; /*960618:ldd*/
1003       }
1004       Copyfrom_Init = 0;
1005   }
1006
1007/**** make sure all requested logos are entered in tracking list ****/
1008
1009   for ( j=0 ; j < nget ; j++ )  /* for all requested message logos */
1010   {
1011       trakked = 0;  /* assume it's not being trakked */
1012       for( it=0 ; it < nlogo ; it++ )  /* for all logos we're tracking */
1013       {
1014          if( region->key       != trak[it].memkey      ) continue;
1015          if( getlogo[j].type   != trak[it].logo.type   ) continue;
1016          if( getlogo[j].mod    != trak[it].logo.mod    ) continue;
1017          if( getlogo[j].instid != trak[it].logo.instid ) continue;
1018          trakked = 1;  /* found it in the trakking list! */
1019          break;
1020       }
1021       if( trakked ) continue;
1022    /* Make an entry in trak for this logo; if there's room */
1023       if ( nlogo < NTRACK_GET )
1024       {
1025          it = nlogo;
1026          trak[it].memkey = region->key;
1027          trak[it].logo   = getlogo[j];
1028          nlogo++;
1029       }
1030   }
1031
1032/**** find latest starting index to look for any of the requested logos ****/
1033
1034findkey:
1035
1036   keyget = 0;
1037
1038   for ( it=0 ; it < nlogo ; it++ )  /* for all message logos we're tracking */
1039   {
1040       if ( trak[it].memkey != region->key ) continue;
1041       for ( j=0 ; j < nget ; j++ )  /* for all requested message logos */
1042       {
1043          if((getlogo[j].type   == trak[it].logo.type   || getlogo[j].type==WILD) &&
1044             (getlogo[j].mod    == trak[it].logo.mod    || getlogo[j].mod==WILD)  &&
1045             (getlogo[j].instid == trak[it].logo.instid || getlogo[j].instid==WILD) )
1046          {
1047             if ( trak[it].keyout > keyget )  keyget = trak[it].keyout;
1048          }
1049       }
1050   }
1051
1052/**** make sure you haven't been lapped by tport_copyto or tport_putmsg ****/
1053   if ( keyget < shm->keyold ) {
1054      keyget = shm->keyold;
1055      lapped = 1;
1056   }
1057
1058/**** See if keyin and keyold were wrapped and reset by tport_putmsg; ****/
1059/****       If so, reset trak[xx].keyout and go back to findkey       ****/
1060
1061   keyin = shm->keyin;
1062   if ( keyget > keyin )
1063   {
1064      keyold = shm->keyold;
1065      for ( it=0 ; it < nlogo ; it++ )
1066      {
1067         if( trak[it].memkey == region->key )
1068         {
1069          /* reset keyout */
1070/*DEBUG*/    /*printf("tport_copyfrom: Pre-reset:  keyout=%10u    keyold=%10u  keyin=%10u\n",
1071                     trak[it].keyout, keyold, keyin );*/
1072             trak[it].keyout = trak[it].keyout % shm->keymax;
1073/*DEBUG*/    /*printf("tport_copyfrom:  Intermed:  keyout=%10u    keyold=%10u  keyin=%10u\n",
1074                     trak[it].keyout, keyold, keyin );*/
1075
1076          /* make sure new keyout points to keyin or to a msg's first-byte; */
1077          /* if not, we've been lapped, so set keyout to keyold             */
1078             ir    = trak[it].keyout;
1079             tmphd = (TPORT_HEAD *) &ring[ir];
1080             if ( trak[it].keyout == keyin   ||
1081                  (keyin-trak[it].keyout)%shm->keymax == 0 )
1082             {
1083/*DEBUG*/       /*printf("tport_copyfrom:  Intermed:  keyout=%10u  same as keyin\n",
1084                        trak[it].keyout );*/
1085                trak[it].keyout = keyin;
1086             }
1087             else if( tmphd->start != FIRST_BYTE )
1088             {
1089/*DEBUG*/       /*printf("tport_copyfrom:  Intermed:  keyout=%10u  does not point to FIRST_BYTE\n",
1090                        trak[it].keyout );*/
1091                trak[it].keyout = keyold;
1092                lapped = 1;
1093             }
1094
1095          /* else, make sure keyout's value is between keyold and keyin */
1096             else if ( trak[it].keyout < keyold )
1097             {
1098                do {
1099                    trak[it].keyout += shm->keymax;
1100                } while ( trak[it].keyout < keyold );
1101             }
1102/*DEBUG*/    /*printf("tport_copyfrom:     Reset:  keyout=%10u    keyold=%10u  keyin=%10u\n",
1103                     trak[it].keyout, keyold, keyin );*/
1104         }
1105      }
1106    /*fprintf( stdout,
1107          "NOTICE: tport_copyfrom; keyin wrapped, keyout(s) reset; Region %ld\n",
1108           region->key );*/
1109
1110      goto findkey;
1111   }
1112
1113
1114/**** Find next message from requested type, module, instid ****/
1115
1116nextmsg:
1117
1118   while ( keyget < keyin )
1119   {
1120   /* make sure you haven't been lapped by tport_copyto or tport_putmsg */
1121       if ( keyget < shm->keyold ) {
1122          keyget = shm->keyold;
1123          lapped = 1;
1124       }
1125
1126   /* load next header; make sure you weren't lapped */
1127       ir = keyget % shm->keymax;
1128       for ( ih=0 ; ih < sizeof(TPORT_HEAD) ; ih++ )
1129       {
1130          if ( ir >= shm->keymax )  ir -= shm->keymax;
1131          h[ih] = ring[ir++];
1132       }
1133       if ( keyget < shm->keyold ) continue;  /*added 960612:ldd*/
1134
1135   /* make sure it starts at beginning of a header */
1136       if ( hd.start != FIRST_BYTE )
1137       {
1138          fprintf( stdout,
1139                  "ERROR: tport_copyfrom; keyget not at FIRST_BYTE, Region %ld\n",
1140                   region->key );
1141          exit( 1 );
1142       }
1143       keyget += sizeof(TPORT_HEAD) + hd.size;
1144
1145   /* see if this msg matches any requested type */
1146       for ( j=0 ; j < nget ; j++ )
1147       {
1148          if((getlogo[j].type   == hd.logo.type   || getlogo[j].type == WILD) &&
1149             (getlogo[j].mod    == hd.logo.mod    || getlogo[j].mod  == WILD) &&
1150             (getlogo[j].instid == hd.logo.instid || getlogo[j].instid == WILD) )
1151          {
1152
1153/**** Found a message of requested logo; retrieve it! ****/
1154        /* complain if retreived msg is too big */
1155             if ( hd.size > maxsize )
1156             {
1157               *logo   = hd.logo;
1158               *length = hd.size;
1159               *seq    = hd.seq;
1160                status = GET_TOOBIG;
1161                goto trackit;    /*changed 960612:ldd*/
1162             }
1163        /* copy message by chunks to caller's address */
1164             nwrap = ir + hd.size - shm->keymax;
1165             if ( nwrap <= 0 )
1166             {
1167                memcpy( (void *) msg, (void *) &ring[ir], hd.size );
1168             }
1169             else
1170             {
1171                nfill = hd.size - nwrap;
1172                memcpy( (void *) &msg[0],     (void *) &ring[ir], nfill );
1173                memcpy( (void *) &msg[nfill], (void *) &ring[0],  nwrap );
1174             }
1175        /* see if we got lapped by tport_copyto or tport_putmsg while copying msg */
1176        /* if we did, go back and try to get a msg cleanly          */
1177             keyold = shm->keyold;
1178             if ( keyold >= keyget )
1179             {
1180                keyget = keyold;
1181                lapped = 1;
1182                goto nextmsg;
1183             }
1184
1185        /* set other returned variables */
1186            *logo   = hd.logo;
1187            *length = hd.size;
1188            *seq    = hd.seq;
1189
1190trackit:
1191        /* find logo in tracked list */
1192             for ( it=0 ; it < nlogo ; it++ )
1193             {
1194                if ( region->key    != trak[it].memkey      )  continue;
1195                if ( hd.logo.type   != trak[it].logo.type   )  continue;
1196                if ( hd.logo.mod    != trak[it].logo.mod    )  continue;
1197                if ( hd.logo.instid != trak[it].logo.instid )  continue;
1198                /* activate sequence tracking if 1st msg */
1199                if ( !trak[it].active )
1200                {
1201                    trak[it].seq    = hd.seq;
1202                    trak[it].active = 1;
1203                }
1204                goto sequence;
1205             }
1206        /* new logo, track it if there's room */
1207             if ( nlogo == NTRACK_GET )
1208             {
1209                fprintf( stdout,
1210                     "ERROR: tport_copyfrom; exceeded NTRACK_GET\n");
1211                if( status != GET_TOOBIG ) status = GET_NOTRACK; /*changed 960612:ldd*/
1212                goto wrapup;
1213             }
1214             it = nlogo;
1215             trak[it].memkey = region->key;
1216             trak[it].logo   = hd.logo;
1217             trak[it].seq    = hd.seq;
1218             trak[it].active = 1;      /*960618:ldd*/
1219             nlogo++;
1220
1221sequence:
1222        /* check if sequence #'s match; update sequence # */
1223             if ( status == GET_TOOBIG   )  goto wrapup; /*added 960612:ldd*/
1224             if ( hd.seq != trak[it].seq )
1225             {
1226                if (lapped)  status = GET_MISS_LAPPED;
1227                else         status = GET_MISS_SEQGAP;
1228                trak[it].seq = hd.seq;
1229             }
1230             trak[it].seq++;
1231
1232        /* Ok, we're finished grabbing this one */
1233             goto wrapup;
1234
1235          } /* end if of logo & getlogo match */
1236       }    /* end for over getlogo */
1237   }        /* end while over ring */
1238
1239/**** If you got here, there were no messages of requested logo(s) ****/
1240
1241   status = GET_NONE;
1242
1243/**** update outpointer (->msg after retrieved one) for all requested logos ****/
1244
1245wrapup:
1246   for ( it=0 ; it < nlogo ; it++ )  /* for all message logos we're tracking */
1247   {
1248       if ( trak[it].memkey != region->key ) continue;
1249       for ( j=0 ; j < nget ; j++ )  /* for all requested message logos */
1250       {
1251          if((getlogo[j].type   == trak[it].logo.type   || getlogo[j].type==WILD) &&
1252             (getlogo[j].mod    == trak[it].logo.mod    || getlogo[j].mod==WILD)  &&
1253             (getlogo[j].instid == trak[it].logo.instid || getlogo[j].instid==WILD) )
1254          {
1255             trak[it].keyout = keyget;
1256          }
1257       }
1258    }
1259
1260   return( status );
1261
1262}
1263
1264
1265/*********************** function tport_copyto ***********************/
1266/*           Puts a message into a shared memory region.             */
1267/*    Preserves the sequence number (passed as argument) as the      */
1268/*                transport layer sequence number                    */
1269/*********************************************************************/
1270
1271int tport_copyto( SHM_INFO    *region,  /* info structure for memory region     */
1272                  MSG_LOGO    *putlogo, /* type, module, instid of incoming msg */
1273                  long         length,  /* size of incoming message             */
1274                  char        *msg,     /* pointer to incoming message          */
1275                  unsigned char seq )   /* preserve as sequence# in TPORT_HEAD  */
1276{
1277   SHM_HEAD         *shm;              /* pointer to start of memory region   */
1278   char             *ring;             /* pointer to ring part of memory      */
1279   unsigned long     ir;               /* index into memory ring              */
1280   long              nfill;            /* # bytes from ir to ring's last-byte */
1281   long              nwrap;            /* # bytes to wrap to front of ring    */
1282   TPORT_HEAD        hd;               /* transport layer header to put       */
1283   char             *h;                /* pointer to transport layer header   */
1284   TPORT_HEAD        old;              /* transport header of oldest msg      */
1285   char             *o;                /* pointer to oldest transport header  */
1286   int j;
1287   int retval = PUT_OK;                /* return value for this function      */
1288
1289/**** First time around, initialize semaphore controls ****/
1290
1291   if (Copyto_Init)
1292   {
1293       Copyto_Init  = 0;
1294   }
1295
1296/**** Set up pointers for shared memory, etc. ****/
1297
1298   shm  = region->addr;
1299   ring = (char *) shm + sizeof(SHM_HEAD);
1300   h    = (char *) (&hd);
1301   o    = (char *) (&old);
1302
1303/**** First, see if the incoming message will fit in the memory region ****/
1304
1305   if ( length + sizeof(TPORT_HEAD) > shm->keymax )
1306   {
1307      fprintf( stdout,
1308              "ERROR: tport_copyto; message too large (%ld) for Region %ld\n",
1309               length, region->key);
1310      return( PUT_TOOBIG );
1311   }
1312
1313/**** Change semaphore to let others know you're using memory ****/
1314
1315   WaitForSingleObject( region->hMutex, INFINITE );
1316
1317/**** Store everything you need in the transport header ****/
1318
1319   hd.start = FIRST_BYTE;
1320   hd.size  = length;
1321   hd.logo  = *putlogo;
1322   hd.seq   = seq;
1323
1324/**** First see if keyin will wrap; if so, reset both keyin and keyold ****/
1325
1326   if ( shm->keyin + sizeof(TPORT_HEAD) + length  <  shm->keyold )
1327   {
1328       shm->keyin  = shm->keyin  % shm->keymax;
1329       shm->keyold = shm->keyold % shm->keymax;
1330       if ( shm->keyin <= shm->keyold ) shm->keyin += shm->keymax;
1331     /*fprintf( stdout,
1332               "NOTICE: tport_copyto; keyin wrapped & reset; Region %ld\n",
1333                region->key );*/
1334   }
1335
1336/**** Then see if there's enough room for new message in shared memory ****/
1337/****      If not, "delete" oldest messages until there's room         ****/
1338
1339   while( shm->keyin + sizeof(TPORT_HEAD) + length - shm->keyold > shm->keymax )
1340   {
1341      ir = shm->keyold % shm->keymax;
1342      if ( ring[ir] != FIRST_BYTE )
1343      {
1344          fprintf( stdout,
1345                  "ERROR: tport_copyto; keyold not at FIRST_BYTE, Region %ld\n",
1346                   region->key );
1347          retval = TPORT_FATAL;
1348          goto release_semaphore;
1349      }
1350      for ( j=0 ; j < sizeof(TPORT_HEAD) ; j++ )
1351      {
1352         if ( ir >= shm->keymax )   ir -= shm->keymax;
1353         o[j] = ring[ir++];
1354      }
1355      shm->keyold += sizeof(TPORT_HEAD) + old.size;
1356   }
1357
1358/**** Now copy transport header into shared memory by chunks... ****/
1359
1360   ir = shm->keyin % shm->keymax;
1361   nwrap = ir + sizeof(TPORT_HEAD) - shm->keymax;
1362   if ( nwrap <= 0 )
1363   {
1364         memcpy( (void *) &ring[ir], (void *) h, sizeof(TPORT_HEAD) );
1365   }
1366   else
1367   {
1368         nfill = sizeof(TPORT_HEAD) - nwrap;
1369         memcpy( (void *) &ring[ir], (void *) &h[0],     nfill );
1370         memcpy( (void *) &ring[0],  (void *) &h[nfill], nwrap );
1371   }
1372   ir += sizeof(TPORT_HEAD);
1373   if ( ir >= shm->keymax )  ir -= shm->keymax;
1374
1375/**** ...and copy message into shared memory by chunks ****/
1376
1377   nwrap = ir + length - shm->keymax;
1378   if ( nwrap <= 0 )
1379   {
1380         memcpy( (void *) &ring[ir], (void *) msg, length );
1381   }
1382   else
1383   {
1384         nfill = length - nwrap;
1385         memcpy( (void *) &ring[ir], (void *) &msg[0],     nfill );
1386         memcpy( (void *) &ring[0],  (void *) &msg[nfill], nwrap );
1387   }
1388   shm->keyin += sizeof(TPORT_HEAD) + length;
1389
1390/**** Finished with shared memory, let others know via semaphore ****/
1391
1392release_semaphore:
1393   ReleaseMutex(region->hMutex);
1394
1395   if( retval == TPORT_FATAL ) exit( 1 );
1396   return( retval );
1397}
1398
1399
1400/************************* tport_buferror ***************************/
1401/*  Build an error message and put it in the public memory region   */
1402/********************************************************************/
1403void tport_buferror( short  ierr,       /* 2-byte error word       */
1404                     char  *note  )     /* string describing error */
1405{
1406        MSG_LOGO    logo;
1407        char        msg[256];
1408        long        size;
1409        time_t      t;
1410
1411        logo.instid = MyInstid;
1412        logo.mod    = MyModuleId;
1413        logo.type   = TypeError;
1414
1415        time( &t );
1416        sprintf( msg, "%ld %hd %s\n", (long) t, ierr, note );
1417        size = strlen( msg );   /* don't include the null byte in the message */
1418
1419        if ( tport_putmsg( (SHM_INFO *) PubRegion, &logo, size, msg ) != PUT_OK )
1420        {
1421            printf("tport_bufthr:  Error sending error:%hd for module:%d.\n",
1422                    ierr, MyModuleId );
1423        }
1424        return;
1425}
1426
1427
1428/************************ function tport_syserr **********************/
1429/*                 Print a system error and terminate.               */
1430/*********************************************************************/
1431
1432void tport_syserr( char *msg,   /* message to print (which routine had an error) */
1433                   long  key )  /* identifies which memory region had the error  */
1434{
1435   extern int   sys_nerr;
1436/*   extern char *sys_errlist[];*/
1437
1438   long err = GetLastError();   /* Override with per thread err */
1439
1440   fprintf( stdout, "ERROR (tport_ calls): %s (%d", msg, err );
1441   fprintf( stdout, "; %s) Region: %ld\n", strerror(err), key);
1442
1443/*   if ( err > 0 && err < sys_nerr )
1444      fprintf( stdout,"; %s) Region: %ld\n", sys_errlist[err], key );
1445   else
1446      fprintf( stdout, ") Region: %ld\n", key ); */
1447
1448   exit( 1 );
1449}
1450
Note: See TracBrowser for help on using the repository browser.