source: trunk/src/libsrc/util/xfrm.c @ 7106

Revision 7106, 48.1 KB checked in by baker, 10 months ago (diff)

fix 'void' function returning a value on Windows

  • Property svn:eol-style set to native
Line 
1/*
2 *   THIS FILE IS UNDER RCS - DO NOT MODIFY UNLESS YOU HAVE
3 *   CHECKED IT OUT USING THE COMMAND CHECKOUT.
4 *
5 *    $Id: xfrm.c,v 1.1 2009/12/16 12:00:00 scott Exp $
6 *
7 *    Revision history:
8 *     $Log: xfrm.c,v $
9 *
10 *     Revision 1.1  2009/12/16 12:00:00 scott
11 *     Initial revision
12 *
13 *
14 */
15
16/*
17 * xfrm.c: 
18 *
19 *
20 *  Initial version:
21 *     
22 */
23
24#include <stdio.h>
25#include <stdlib.h>
26#include <string.h>
27#include <sys/types.h>
28#include <time.h>
29#ifdef _WINNT
30#include <windows.h>
31#define mutex_t HANDLE
32#else
33#ifdef _SOLARIS
34#ifndef _LINUX
35#include <synch.h>      /* mutex's                                      */
36#endif
37#endif
38#endif
39
40#include "earthworm.h"
41#include "kom.h"
42#include "swap.h"
43#include "trheadconv.h" /* trheadconv()                                 */
44#include "xfrm.h"
45
46#define SCNL_INC     100      /* how many more are allocated each     */
47                                /*   time we run out                    */
48#define NUM_BASE_CMDS 12
49#define CMD_ID_GETSCNL       8
50#define CMD_ID_GETWAVESFROM 11
51
52static char cnull = '\0';
53static int  maxSta = 0;     /* number of scnls allocated   */
54static char **BaseCmd=NULL; /* array of command definitions */
55static char **ModCmds;      /* array of module-specific command defs */
56static int  CmdCount;       /* number of command definitions */
57static void **ParamTarget=NULL; /* array of addrs for value of matching commands */
58static void **ModParamTargets; /* ParamTargets for ModCmds */
59/* length of message buffer */
60static long MsgBufLen = (MAX_TRACEBUF_SIZ + sizeof (TP_PREFIX));
61
62
63static void InitializeParameters( XFRMWORLD* pXfrm )
64{
65  int i;
66
67  /*    Initialize members of the XFRMWORLD structure                    */
68  pXfrm->completionFcn          = NULL;
69
70  /*    Initialize members of the xfrmEWH structure                      */
71  pXfrm->xfrmEWH->myInstId = 0;
72  pXfrm->xfrmEWH->myModId  = 0;
73  pXfrm->xfrmEWH->typeError     = 0;
74  pXfrm->xfrmEWH->typeHeartbeat = 0;
75  pXfrm->xfrmEWH->ringInKey     = 0l;
76  pXfrm->xfrmEWH->ringOutKey    = 0l;
77  pXfrm->xfrmEWH->typeTrace     = 0;
78  pXfrm->xfrmEWH->typeTrace2    = 0;
79
80  /*    Initialize members of the xfrmParam structure                    */
81  for( i=0; i<MAX_LOGO; i++ ) {
82     pXfrm->xfrmParam->readInstName[i][0] = '\0';
83     pXfrm->xfrmParam->readModName[i][0] = '\0';
84     pXfrm->xfrmParam->readTypeName[i][0] = '\0';
85  }
86  pXfrm->xfrmParam->ringIn[0]      = '\0';
87  pXfrm->xfrmParam->ringOut[0]     = '\0';
88  pXfrm->xfrmParam->nlogo          = 0;
89  pXfrm->xfrmParam->heartbeatInt   = 15;
90  pXfrm->xfrmParam->debug          = 0;
91  pXfrm->xfrmParam->logSwitch      = 1;
92  pXfrm->xfrmParam->cleanStart     = 1;
93  pXfrm->xfrmParam->testMode       = 0;
94  pXfrm->xfrmParam->maxGap         = 1.5;
95  pXfrm->xfrmParam->getAllSCNLs    = 0;
96
97  InitializeXfrmParameters();
98 
99  return;
100}
101
102
103static int IsWild( char *str )
104{
105   if( strcmp( str, "*" ) == 0 ) return 1;
106   return 0;
107}
108
109/*
110 * Determines if the station name is valid.
111 * In:  pxfrm pointer to the Xfrm World structure
112 *      outputFlag true if output, false if input
113 *      str the station name
114 * Returns non-zero if valid, 0 if invalid
115 */
116static int IsValidSta(XFRMWORLD* pXfrm, int outputFlag, char *str) {
117    if( !str  ||  strlen(str) >= TRACE2_STA_LEN  ||  IsWild(str) ) {
118        if( !str ) str = &cnull;
119        logit ("e", "%s: Invalid %s station code: %s "
120                    "in GetSCNL cmd; exiting!\n",
121                    (outputFlag ? "output" : "input"),
122                    pXfrm->mod_name, str);
123        return 0;
124    }
125    return 1;
126}
127
128/*
129 * Determines if the channel name is valid.
130 * In:  pxfrm pointer to the Xfrm World structure
131 *      outputFlag true if output, false if input
132 *      str the channel name
133 * Returns non-zero if valid, 0 if invalid
134 */
135static int IsValidChan(XFRMWORLD* pXfrm, int outputFlag, char *str) {
136    if( !str  ||  strlen(str) >= TRACE2_CHAN_LEN  ||  IsWild(str)) {
137        if( !str ) str = &cnull;
138        logit ("e", "%s: Invalid %s channel code: %s "
139                    "in GetSCNL cmd; exiting!\n",
140                                (outputFlag ? "output" : "input"),
141                                pXfrm->mod_name, str);
142        return 0;
143    }
144    return 1;
145}
146
147/*
148 * Determines if the network name is valid.
149 * In:  pxfrm pointer to the Xfrm World structure
150 *      outputFlag true if output, false if input
151 *      str the network name
152 * Returns non-zero if valid, 0 if invalid
153 */
154static int IsValidNet(XFRMWORLD* pXfrm, int outputFlag, char *str) {
155    if( !str  ||  strlen(str) >= TRACE2_NET_LEN  ||  IsWild(str)) {
156        if( !str ) str = &cnull;
157        logit ("e", "%s: Invalid %s network code: %s "
158                    "in GetSCNL cmd; exiting!\n",
159                    (outputFlag ? "output" : "input"),
160                    pXfrm->mod_name, str);
161        return 0;
162    }
163    return 1;
164}
165
166/*
167 * Determines if the location name is valid.
168 * In:  pxfrm pointer to the Xfrm World structure
169 *      outputFlag true if output, false if input
170 *      str the location name
171 * Returns non-zero if valid, 0 if invalid
172 */
173static int IsValidLoc(XFRMWORLD* pXfrm, int outputFlag, char *str) {
174    if( !str  ||  strlen(str) >= TRACE2_LOC_LEN  ||  IsWild(str)) {
175        if( !str ) str = &cnull;
176        logit ("e", "%s: Invalid %s location code: %s "
177                    "in GetSCNL cmd; exiting!\n",
178                    (outputFlag ? "output" : "input"),
179                    pXfrm->mod_name, str);
180        return 0;
181    }
182    return 1;
183}
184
185/*
186 * Set the output SCNL
187 * In:  newSCNL the new SCNL
188 *      outSCNL the output SCNL or NULL if none
189 */
190static void SetOutSCNL(XFRMSCNL *newSCNL, SCNL *outSCNL) {
191
192  if ( outSCNL == NULL )
193    return;
194
195  strncpy( newSCNL->outSta,  outSCNL->sta,  TRACE2_STA_LEN );
196  newSCNL->outSta[TRACE2_STA_LEN - 1]   = '\0';
197  strncpy( newSCNL->outChan, outSCNL->chan, TRACE2_CHAN_LEN );
198  newSCNL->outChan[TRACE2_CHAN_LEN - 1] = '\0';
199  strncpy( newSCNL->outNet,  outSCNL->net,  TRACE2_NET_LEN );
200  newSCNL->outNet[TRACE2_NET_LEN - 1]   = '\0';
201  strncpy( newSCNL->outLoc,  outSCNL->loc,  TRACE2_LOC_LEN );
202  newSCNL->outLoc[TRACE2_LOC_LEN - 1]   = '\0';
203
204}
205
206/*
207 * Allocate the message buffer.
208 * Returns the message buffer
209 */
210static char* AllocateMsgBuf() {
211        return (char *) malloc ((size_t) MsgBufLen);
212}
213
214/*
215 * Get the XFRMSCNL for the station
216 * In:  pxfrm pointer to the Xfrm World structure
217 *      jSta the station index
218 * Returns the XFRMSCNL
219 */
220static XFRMSCNL* GetXFRMSCNLForSta(XFRMWORLD* pXfrm, int jSta) {
221        return (XFRMSCNL *)(pXfrm->scnls + jSta*(pXfrm->scnlRecSize));
222}
223
224/*
225 * Get the input message buffer for the station
226 * In:  pxfrm pointer to the Xfrm World structure
227 *      jSta the station index
228 * Returns the input message buffer
229 */
230static char* GetInMsgBufForSta(XFRMWORLD* pXfrm, int jSta) {
231        return GetXFRMSCNLForSta(pXfrm, jSta)->inMsgBuf;
232}
233
234/*
235 * Get the input buffer for the message buffer.
236 * In:  msgBuf the message buffer
237 * Returns the input buffer
238 */
239TracePacket* GetInBufForMsgBuf(char* msgBuf) {
240        return (TracePacket *)(msgBuf + sizeof(TP_PREFIX));
241}
242
243/*
244 * Get the input buffer for the station.
245 * In:  pxfrm pointer to the Xfrm World structure
246 *      jSta the station index
247 * Returns the input buffer
248 */
249TracePacket* GetInBufForSta(XFRMWORLD* pXfrm, int jSta) {
250        char* inMsgBuf = GetInMsgBufForSta(pXfrm, jSta);
251        return (TracePacket *)(inMsgBuf + sizeof(TP_PREFIX));
252}
253
254/*
255 * Get the wave bytes for the buffer.
256 * In:  buf the buffer
257 * Returns the wave bytes.
258 */
259char* GetWave(TracePacket* buf) {
260        return buf->msg + WAVE_BYTES_OFFSET;
261}
262
263/*
264 * Get the data size.
265 * In:  buf the buffer
266 * Return the data size.
267 */
268int GetDataSize(TracePacket* buf) {
269        return buf->trh2.datatype[1]-'0';
270}
271
272/*
273 * Compare the SCNLs.
274 * In:  scnl1 the first scnl
275 *      scnl2 the second scnl
276 * Return 0 if the SCNLs match, non-zero otherwise.
277 */
278int BaseCompareSCNL( XFRMSCNL *scnl1, XFRMSCNL *scnl2 ) {
279
280  TracePacket *buf1, *buf2;
281
282  /* Both SCNLs must exist */
283  if ( ( scnl1 == NULL ) || ( scnl2 ==NULL ) )
284    return 1;
285
286  /* Both must contain valid tracebufs; see BaseFilterXfrm() */
287  /*   status = -1, no data yet                              */
288  /*   status =  1, data is bad                              */
289  /*   status =  0, data is good                             */
290  if ( ( scnl1->status != 0 ) || ( scnl2->status != 0 ) )
291    return 2;
292
293  /* Both must have the same end time */
294  /* inEndtime = 0 is used in many places to mean no data */
295  if ( ( scnl1->inEndtime == 0.0 ) || ( scnl2->inEndtime == 0.0 ) ||
296       ( scnl1->inEndtime != scnl2->inEndtime ) )
297    return 3;
298
299  /* Both must have the same sample rate */
300  if ( scnl1->samprate != scnl2->samprate )
301    return 4;
302
303  /* Both must have the same data size (they have been converted to native type) */
304  if ( scnl1->datatype[1] != scnl2->datatype[1] )
305    return 5;
306
307  /* Both must have the same number of samples */
308  buf1 = GetInBufForMsgBuf( scnl1->inMsgBuf );
309  buf2 = GetInBufForMsgBuf( scnl2->inMsgBuf );
310  if ( buf1->trh2.nsamp != buf2->trh2.nsamp )
311    return 6;
312
313  return 0;
314
315}
316
317/*
318 * Compare the SCNL to the SCNL in the XFRMSCNL.
319 * In:  scnl the SCNL
320 *      xfrmSCNL the XFRMSCNL
321 * Returns 0 if the SCNL matches, non-zero otherwise
322 */
323static int CmpSCNL(SCNL *scnl, XFRMSCNL* xfrmSCNL) {
324        if (strcmp(xfrmSCNL->inSta, scnl->sta) != 0) {
325                return 1;
326        }
327        if (strcmp(xfrmSCNL->inChan, scnl->chan) != 0) {
328                return 2;
329        }
330        if (strcmp(xfrmSCNL->inNet, scnl->net) != 0) {
331                return 3;
332        }
333        if (strcmp(xfrmSCNL->inLoc, scnl->loc) != 0) {
334                return 4;
335        }
336        return 0;
337}
338
339/*
340 * Get the XFRMSCNL for the specified SCNL
341 * In:  pxfrm pointer to the Xfrm World structure
342 *      scnl the SCNL
343 * Out: jSta the station index
344 * Returns the XFRMSCNL or NULL if none
345 */
346static XFRMSCNL* GetXFRMSCNL(XFRMWORLD* pXfrm, SCNL *scnl, int *jSta) {
347        XFRMSCNL* xfrmSCNL = NULL;
348
349        for (*jSta = 0; *jSta < pXfrm->nSCNL; (*jSta)++) {
350                xfrmSCNL = GetXFRMSCNLForSta(pXfrm, *jSta);
351                if (CmpSCNL(scnl, xfrmSCNL) == 0) {
352                        return xfrmSCNL;
353                }
354        }
355        return NULL;
356}
357
358/*
359 * Add the input and output SCNL
360 * In:  pxfrm pointer to the Xfrm World structure
361 *      inSCNL the input SCNL
362 *      outSCNL the output SCNL or NULL if none
363 * Out: jSta the station index
364 * Returns 0 if add was successful, non-zero if error
365 */
366int AddSCNL( XFRMWORLD* pXfrm, SCNL *inSCNL, SCNL *outSCNL, int *jSta ) {
367
368  char *scnls;
369  int nSCNL;
370  XFRMSCNL *newSCNL;
371
372  /* Determine if NSCL already exists */
373  newSCNL = GetXFRMSCNL( pXfrm, inSCNL, jSta );
374  if ( newSCNL != NULL ) {
375    SetOutSCNL( newSCNL, outSCNL );
376    if ( pXfrm->xfrmParam->debug )
377      logit( "", "%s: sta[%d] outSCNL=<%s.%s.%s.%s>\n",
378                 pXfrm->mod_name, *jSta,
379                 newSCNL->outSta, newSCNL->outChan, newSCNL->outNet, newSCNL->outLoc );
380    return 0;
381  }
382
383  if ( pXfrm->nSCNL >= maxSta ) {
384    /* Need to allocate more */
385    maxSta += SCNL_INC;
386    if ( pXfrm->scnlRecSize < sizeof( XFRMSCNL ) ) {
387      logit( "e", "%s: invalid size of SCNL info record (%d); exiting!\n",
388                  pXfrm->mod_name, pXfrm->scnlRecSize );
389      return 1;
390    }
391    pXfrm->scnls = realloc( pXfrm->scnls, maxSta * pXfrm->scnlRecSize );
392    if ( pXfrm->scnls == NULL ) {
393      logit( "e", "%s: realloc for SCNL list failed; exiting!\n", pXfrm->mod_name );
394      return 2;
395    }
396  }
397  scnls = pXfrm->scnls;
398  nSCNL = pXfrm->nSCNL;
399  newSCNL = memset( (XFRMSCNL *) ( scnls + nSCNL * pXfrm->scnlRecSize ),
400                    0, pXfrm->scnlRecSize );
401  *jSta = nSCNL;
402
403  strncpy( newSCNL->inSta,  inSCNL->sta,  TRACE2_STA_LEN );
404  newSCNL->inSta[TRACE2_STA_LEN - 1]   = '\0';
405  strncpy( newSCNL->inChan, inSCNL->chan, TRACE2_CHAN_LEN );
406  newSCNL->inChan[TRACE2_CHAN_LEN - 1] = '\0';
407  strncpy( newSCNL->inNet,  inSCNL->net,  TRACE2_NET_LEN );
408  newSCNL->inNet[TRACE2_NET_LEN - 1]   = '\0';
409  strncpy( newSCNL->inLoc,  inSCNL->loc,  TRACE2_LOC_LEN );
410  newSCNL->inLoc[TRACE2_LOC_LEN - 1]   = '\0';
411  if ( pXfrm->xfrmParam->debug )
412    logit( "", "%s: sta[%d] SCNL=<%s.%s.%s.%s> (uninitialized)\n",
413               pXfrm->mod_name, *jSta,
414               newSCNL->inSta, newSCNL->inChan, newSCNL->inNet, newSCNL->inLoc );
415
416  SetOutSCNL( newSCNL, outSCNL );
417  if ( pXfrm->xfrmParam->debug )
418    if ( outSCNL != NULL )
419      logit( "", "%s: sta[%d] outSCNL=<%s.%s.%s.%s>\n",
420                 pXfrm->mod_name, *jSta,
421                 newSCNL->outSta, newSCNL->outChan, newSCNL->outNet, newSCNL->outLoc );
422
423  newSCNL->status = -1; /* uninitialized */
424  newSCNL->prevSta = -1;
425  newSCNL->nextSta = -1;
426  newSCNL->conversionFactor = TRACE2_NO_CONVERSION_FACTOR;
427  pXfrm->nSCNL++;
428  return 0;
429
430}
431
432/*
433 * Reads the SCNL
434 * In:  pxfrm pointer to the Xfrm World structure
435 *      outputFlag true if output, false if input
436 *      locFlag non-zero if location is provided, 0 otherwise
437 * Out: scnl the SCNL
438 * Returns 0 if read was successful, non-zero if error
439 */
440int ReadSCNL(XFRMWORLD* pXfrm, int outputFlag, int locFlag, SCNL *scnl) {
441        scnl->sta = k_str(); /* read station code */
442        if (!IsValidSta(pXfrm, outputFlag, scnl->sta)) {
443                return 1;
444        }
445        scnl->chan = k_str(); /* read channel code */
446        if (!IsValidChan(pXfrm, outputFlag, scnl->chan)) {
447                return 2;
448        }
449        scnl->net = k_str(); /* read network code */
450        if (!IsValidNet(pXfrm, outputFlag, scnl->net)) {
451                return 3;
452        }
453
454    if (locFlag) {  /* read input location code */
455        scnl->loc = k_str(); /* read location code */
456        if (!IsValidLoc(pXfrm, outputFlag, scnl->loc))
457        {
458                return 4;
459        }
460    }
461    else {      /* use default blank location code */
462        scnl->loc = LOC_NULL_STRING;
463    }
464    return 0;
465}
466
467static int BaseProcessXfrmCommand( XFRMWORLD* pXfrm, int cmd_id, char *com ) {
468  switch ( cmd_id ) {
469  case CMD_ID_GETSCNL: /* GetSCNL or GetSCN */
470  {
471        SCNL inSCNL1, inSCNL2;
472        SCNL outSCNL;
473        XFRMSCNL *xfrmSCNL;
474        double conversionFactor;
475        int jSta1, jSta2;
476        int locFlag = strcmp(com,"GetSCN");
477        int outputFlag = 0;  /* input SCNL */
478        /* exit if error reading the input SCNL */
479        if (ReadSCNL(pXfrm, outputFlag, locFlag, &inSCNL1) != 0) {
480                return -2;
481        }
482        if (pXfrm->nInSCNL > 1) {
483                if (ReadSCNL(pXfrm, outputFlag, locFlag, &inSCNL2) != 0) {
484                        return -2;
485                }
486        }
487        outputFlag = 1;  /* output SCNL */
488        /* exit if error reading the output SCNL */
489        if (ReadSCNL(pXfrm, outputFlag, locFlag, &outSCNL) != 0) {
490                return -2;
491        }
492        /* exit if adding the input and output SCNL */
493    if (AddSCNL(pXfrm, &inSCNL1, &outSCNL, &jSta1) != 0) {
494        return -2;
495    }
496
497        /* read conversion factor if present */
498        conversionFactor = k_val();
499        if (!k_err()) { /* if conversion factor present */
500                xfrmSCNL = GetXFRMSCNLForSta(pXfrm, jSta1);
501                xfrmSCNL->conversionFactor = conversionFactor;
502        }
503
504    if (pXfrm->nInSCNL > 1) {
505        if (AddSCNL(pXfrm, &inSCNL2, NULL, &jSta2) != 0) {
506                return -2;
507        }
508        xfrmSCNL = GetXFRMSCNLForSta(pXfrm, jSta1);
509        if (xfrmSCNL == NULL) {
510                return -1;
511        }
512        xfrmSCNL->nextSta = jSta2;
513        if ( pXfrm->xfrmParam->debug )
514            logit( "", "%s: sta[%d] nextSta=%d\n", pXfrm->mod_name, jSta1, jSta2 );
515        xfrmSCNL = GetXFRMSCNLForSta(pXfrm, jSta2);
516        if (xfrmSCNL == NULL) {
517                return -1;
518        }
519        xfrmSCNL->prevSta = jSta1;
520        if ( pXfrm->xfrmParam->debug )
521            logit( "", "%s: sta[%d] prevSta=%d\n", pXfrm->mod_name, jSta2, jSta1 );
522    }
523    break;
524  }
525  case CMD_ID_GETWAVESFROM: /* GetWavesFrom */
526  {
527        char *str;
528        int ilogo = pXfrm->xfrmParam->nlogo;
529        if(ilogo+1 >= MAX_LOGO) 
530    {
531       logit( "e","%s: Too many GetWavesFrom commands; "
532              "max=%d; exiting!\n", pXfrm->mod_name, MAX_LOGO );
533       return -2;
534    }
535    if((str = k_str()) != NULL) 
536      strcpy(pXfrm->xfrmParam->readInstName[ilogo], str);
537
538    if((str = k_str()) != NULL) 
539      strcpy(pXfrm->xfrmParam->readModName[ilogo], str);
540
541    if((str = k_str()) != NULL)
542    {
543      if( strcmp(str,"TYPE_TRACEBUF" ) != 0 &&
544          strcmp(str,"TYPE_TRACEBUF2") != 0    ) 
545      {
546         logit( "e","%s: GetWavesFrom: invalid msg type: %s "
547                    "(must be TYPE_TRACEBUF or TYPE_TRACEBUF2); "
548                    "exiting!\n", pXfrm->mod_name, str );
549         return -2;
550      }
551      strcpy(pXfrm->xfrmParam->readTypeName[ilogo], str);
552    }
553    pXfrm->xfrmParam->nlogo++;
554    break;
555  }
556  default:
557    return ProcessXfrmCommand( cmd_id, com );
558  }
559  return cmd_id;
560}
561
562/*      Function: BaseReadXfrmConfig                                            */
563static int BaseReadXfrmConfig (char *configfile, XFRMWORLD* pXfrm )
564{
565  char                  *init=NULL;
566  /* init flags, one byte for each required command */
567  int                   nmiss;
568  /* number of required commands that were missed   */
569  char                  *com;
570  char                  *str;
571  int                   nfiles;
572  int                   success;
573  int                   i;
574  /* Assume failure (for goto CONFIG_FAILURE) */
575  int                   ret=EW_FAILURE;
576
577  BaseCmd = calloc( sizeof(char*), CmdCount );
578  ParamTarget = calloc( sizeof(void*), CmdCount );
579  init = malloc( CmdCount );
580  if ( BaseCmd==NULL || ParamTarget==NULL || init==NULL) {
581    logit ("e",
582             "%s: Allocation failure reading config file %s; exiting!\n",
583             pXfrm->mod_name, configfile);
584    goto CONFIG_FAILURE;
585  }
586  BaseCmd[0] = "RSMyModId";
587  ParamTarget[0] = &(pXfrm->xfrmParam->myModName);
588  BaseCmd[1] = "RSInRing";
589  ParamTarget[1] = &(pXfrm->xfrmParam->ringIn);
590  BaseCmd[2] = "RSOutRing";
591  ParamTarget[2] = &(pXfrm->xfrmParam->ringOut);
592  BaseCmd[3] = "RIHeartBeatInterval";
593  ParamTarget[3] = &(pXfrm->xfrmParam->heartbeatInt);
594  BaseCmd[4] = "RSLogFile";
595  ParamTarget[4] = &(pXfrm->xfrmParam->logSwitch);
596  BaseCmd[5] = "RVMaxGap";
597  ParamTarget[5] = &(pXfrm->xfrmParam->maxGap);
598  BaseCmd[6] = "OFTestMode";
599  ParamTarget[6] = &(pXfrm->xfrmParam->testMode);
600  BaseCmd[7] = "OFDebug";
601  ParamTarget[7] = &(pXfrm->xfrmParam->debug);
602  BaseCmd[CMD_ID_GETSCNL] = "RXGetSCNL";
603  BaseCmd[9] = "OFProcessRejected";
604  ParamTarget[9] = &(pXfrm->xfrmParam->processRejected);
605  BaseCmd[10] = "OFGetAllSCNLs";
606  ParamTarget[10] = &(pXfrm->xfrmParam->getAllSCNLs);
607  BaseCmd[CMD_ID_GETWAVESFROM] = "RXGetWavesFrom";
608
609  for ( i=NUM_BASE_CMDS; i<CmdCount; i++ ) {
610    BaseCmd[i] = ModCmds[i-NUM_BASE_CMDS];
611    ParamTarget[i] = ModParamTargets[i-NUM_BASE_CMDS];
612  }
613
614  /* Set to zero one init flag for each required command */
615  for (i = 0; i < CmdCount; i++)
616    init[i] = 0;
617
618  /* Open the main configuration file
619   **********************************/
620  nfiles = k_open (configfile); 
621  if (nfiles == 0) {
622    logit ("e",
623             "%s: Error opening command file <%s>; exiting!\n",
624             pXfrm->mod_name, configfile);
625    goto CONFIG_FAILURE;
626  }
627
628  /* Process all command files
629   ***************************/
630  while (nfiles > 0)   /* While there are command files open */
631  {
632    while (k_rd ())        /* Read next line from active file  */
633    { 
634      com = k_str ();         /* Get the first token from line */
635
636    /* Ignore blank lines & comments
637     *******************************/
638      if (!com) continue;
639      if (com[0] == '#') continue;
640
641      /* Open a nested configuration file */
642      if (com[0] == '@') 
643      {
644        success = nfiles + 1;
645        nfiles  = k_open (&com[1]);
646        if (nfiles != success) {
647          logit ("e",
648                   "%s: Error opening command file <%s>; exiting!\n",
649                   pXfrm->mod_name, &com[1]);
650          goto CONFIG_FAILURE;
651        }
652        continue;
653      }
654
655      if ( strcmp( com, "GetSCN" ) == 0 ) /* Set com to text of GetSCNL */
656        com = BaseCmd[CMD_ID_GETSCNL]+2;
657     
658      ret = -1;
659      for ( i=0; i<CmdCount; i++ )
660      {
661        if ( strcmp( com, BaseCmd[i]+2 ) == 0 ) /*(k_its (BaseCmd[i]+2)) */
662        {
663          char type = BaseCmd[i][1];
664          char kind = BaseCmd[i][0];
665          switch ( type ) {
666          case 'S': /* String */
667            if ((str = k_str ()) != NULL )
668            {
669              strcpy( ParamTarget[i], str );
670            } else {
671              kind = 0;
672            }
673            break;
674          case 'I': /* Integer */
675            *((int*)(ParamTarget[i])) = k_long();
676            break;
677          case 'V': /* Value ? */
678            *((double*)(ParamTarget[i])) = k_val();
679            break;
680          case 'F': /* Flag */
681            *((int*)(ParamTarget[i])) = 1;
682          default: /* Unknown; user-defined? */
683            ret = BaseProcessXfrmCommand( pXfrm, i, com );
684            if ( ret >= 0 )
685              i = ret;
686            break;
687          }
688          break;
689        }
690      }
691      if ( i >= CmdCount )
692      {
693        ret = BaseProcessXfrmCommand( pXfrm, -1, com );
694      } else {
695        ret = i;
696      }
697      if ( ret >= 0 )
698      { 
699        if ( pXfrm->xfrmParam->debug )
700          logit("","%s: command '%s' accepted\n",pXfrm->mod_name, com);
701        init[ret] = 1;
702      } else if ( ret == -1 ) {
703        logit ("e", "%s: <%s> Unknown command in <%s>.\n", 
704               pXfrm->mod_name, com, configfile);
705        continue;
706      } else if ( ret < -1 ) {
707          logit ("e",
708                   "%s: Bad <%s> command in <%s>; exiting!\n",
709                   pXfrm->mod_name, com, configfile);
710          goto CONFIG_FAILURE;
711      }
712
713   /* See if there were any errors processing the command */
714      if ( k_err() ) {
715        logit ("e",
716                 "%s: Bad <%s> command in <%s>; exiting!\n",
717                 pXfrm->mod_name, com, configfile);
718        goto CONFIG_FAILURE;
719      }
720
721    } /** end while k_rd() **/
722
723    nfiles = k_close ();
724
725  } /** end while nfiles **/
726
727/* After all files are closed, check init flags for missed commands */
728  nmiss = 0;
729  for ( i = 0; i < CmdCount; i++ )
730    if ( BaseCmd[i][0]=='R' && !init[i] )
731      nmiss++;
732
733  if ( nmiss > 0 ) {
734    logit ("e", "%s: ERROR, no ", pXfrm->mod_name);
735    for(i = 0; i < CmdCount; i++) if (!init[i]) logit("e", "<%s> ", BaseCmd[i]+2 );
736
737    logit ("e", "command%s in <%s>; exiting!\n", (nmiss==1 ? "" : "s"), configfile);
738    goto CONFIG_FAILURE;
739  }
740
741  ret = ReadXfrmConfig( init );
742
743/* ret is initialized to EW_FAILURE */
744CONFIG_FAILURE:
745  free( BaseCmd );
746  free( ParamTarget );
747  free( init );
748  return ret;
749}
750
751static int BaseReadXfrmEWH( XFRMWORLD *pXfrm )
752{
753  int i;
754
755  if ( GetLocalInst( &(pXfrm->xfrmEWH->myInstId)) != 0 )
756  {
757    logit("e", "%s: Error getting myInstId for %d.\n", 
758          pXfrm->mod_name, (int) pXfrm->xfrmEWH->myInstId);
759    return EW_FAILURE;
760  }
761 
762  if ( GetModId( pXfrm->xfrmParam->myModName, &(pXfrm->xfrmEWH->myModId)) != 0 )
763  {
764    logit("e", "%s: Error getting myModId for %s.\n",
765           pXfrm->mod_name, pXfrm->xfrmParam->myModName );
766    return EW_FAILURE;
767  }
768
769  for (i=0; i<pXfrm->xfrmParam->nlogo; i++ ) {
770    if ( GetInst( pXfrm->xfrmParam->readInstName[i], &(pXfrm->xfrmEWH->readInstId[i])) != 0)
771    {
772      logit("e", "%s: Error getting readInstId for %s.\n",
773             pXfrm->mod_name, pXfrm->xfrmParam->readInstName[i] );
774      return EW_FAILURE;
775    }
776 
777    if ( GetModId( pXfrm->xfrmParam->readModName[i], &(pXfrm->xfrmEWH->readModId[i])) != 0 )
778    {
779      logit("e", "%s: Error getting readModName for %s.\n",
780             pXfrm->mod_name, pXfrm->xfrmParam->readModName[i] );
781      return EW_FAILURE;
782    }
783
784    if ( GetType( pXfrm->xfrmParam->readTypeName[i], &(pXfrm->xfrmEWH->readMsgType[i])) != 0 )
785    {
786      logit("e", "%s: Error getting readMsgType for %s.\n",
787             pXfrm->mod_name, pXfrm->xfrmParam->readTypeName[i] );
788      return EW_FAILURE;
789    }
790  }
791
792  /* Look up keys to shared memory regions */
793  if ((pXfrm->xfrmEWH->ringInKey = GetKey (pXfrm->xfrmParam->ringIn)) == -1) 
794  {
795    logit("e", "%s:  Invalid input ring name <%s>; exiting!\n", 
796           pXfrm->mod_name, pXfrm->xfrmParam->ringIn);
797    return EW_FAILURE;
798  }
799
800  if ((pXfrm->xfrmEWH->ringOutKey = GetKey (pXfrm->xfrmParam->ringOut) ) == -1) 
801  {
802    logit("e", "%s:  Invalid output ring name <%s>; exiting!\n", 
803          pXfrm->mod_name, pXfrm->xfrmParam->ringOut);
804    return EW_FAILURE;
805  }
806
807  /* Look up message types of interest */
808  if (GetType ("TYPE_HEARTBEAT", &(pXfrm->xfrmEWH->typeHeartbeat)) != 0) 
809  {
810    logit("e", "%s: Invalid message type <TYPE_HEARTBEAT>; exiting!\n", 
811            pXfrm->mod_name);
812    return EW_FAILURE;
813  }
814
815  if (GetType ("TYPE_ERROR", &(pXfrm->xfrmEWH->typeError)) != 0) 
816  {
817    logit("e", "%s: Invalid message type <TYPE_ERROR>; exiting!\n", 
818            pXfrm->mod_name);
819    return EW_FAILURE;
820  } 
821
822  if (GetType ("TYPE_TRACEBUF", &(pXfrm->xfrmEWH->typeTrace)) != 0) 
823  {
824    logit("e", "decimate: Invalid message type <TYPE_TRACEBUF>; exiting!\n");
825    return EW_FAILURE;
826  }
827
828  if (GetType ("TYPE_TRACEBUF2", &(pXfrm->xfrmEWH->typeTrace2)) != 0) 
829  {
830    logit("e", "decimate: Invalid message type <TYPE_TRACEBUF2>; exiting!\n");
831    return EW_FAILURE;
832  }
833
834  return ReadXfrmEWH();
835
836} 
837
838static int BaseConfigureXfrm(XFRMWORLD *pXfrm, char **argv)
839{
840  /* Set initial values of WORLD structure */
841  InitializeParameters( pXfrm );
842
843  /* Initialize name of log-file & open it 
844   ***************************************/
845  logit_init (argv[1], 0, MAXMESSAGELEN, 1);
846
847  /* Read config file and configure the decimator */
848  if (BaseReadXfrmConfig(argv[1], pXfrm) != EW_SUCCESS)
849  {
850    logit("e", "%s: failed reading config file <%s>\n", pXfrm->mod_name, argv[1]);
851    return EW_FAILURE;
852  }
853  logit ("" , "Read command file <%s>\n", argv[1]);
854
855  /* Look up important info from earthworm.h tables
856   ************************************************/
857  if (BaseReadXfrmEWH(pXfrm) != EW_SUCCESS)
858  {
859    logit("e", "%s: Call to ReadEWH failed \n", pXfrm->mod_name );
860    return EW_FAILURE;
861  }
862
863  /* Reinitialize logit to the desired logging level
864   ***********************************************/
865  logit_init (argv[1], 0, MAXMESSAGELEN, pXfrm->xfrmParam->logSwitch);
866 
867  /* Get our process ID
868   **********************/
869  if ((pXfrm->MyPid = getpid ()) == -1)
870  {
871    logit ("e", "%s: Call to getpid failed. Exiting.\n", pXfrm->mod_name);
872    return (EW_FAILURE);
873  }
874
875  return ConfigureXfrm();
876}
877
878int matchXfrmSCNL( TracePacket* pPkt, unsigned char msgtype, XFRMWORLD* pXfrm )
879{
880  int   i;
881
882/* Look for match in TYPE_TRACEBUF2 packet
883 *****************************************/
884  if( msgtype == pXfrm->xfrmEWH->typeTrace2 )
885  {
886     /* This should NEVER happen;
887        scblRecSize had to have been set to allocate SCNL records */
888     if ( pXfrm->xfrmParam->getAllSCNLs == 0 && pXfrm->scnlRecSize < sizeof(XFRMSCNL) )
889     {
890       logit ("e", "%s: invalid size of SCNL info record (%d); exiting!\n", pXfrm->mod_name, pXfrm->scnlRecSize );
891       return -3;
892     }
893
894     for(i = 0; i < pXfrm->nSCNL; i++ )
895     {
896       XFRMSCNL *testSCNL = (XFRMSCNL *)(pXfrm->scnls + i*(pXfrm->scnlRecSize));
897       /* try to match explicitly */
898       if ((strcmp(pPkt->trh2.sta,  testSCNL->inSta)  == 0) &&
899           (strcmp(pPkt->trh2.chan, testSCNL->inChan) == 0) &&
900           (strcmp(pPkt->trh2.net,  testSCNL->inNet)  == 0) &&
901           (strcmp(pPkt->trh2.loc,  testSCNL->inLoc)  == 0)    )
902         return( i );
903     }
904     
905     if ( pXfrm->xfrmParam->getAllSCNLs ) {
906        /* Try to add new SCNL, then "find" it if successful */
907        SCNL newSCNL;
908        memset( &newSCNL, 0, sizeof(newSCNL) );
909
910        strncpy( newSCNL.sta,  pPkt->trh2.sta,  TRACE2_STA_LEN );
911        newSCNL.sta[TRACE2_STA_LEN - 1]   = '\0';
912        strncpy( newSCNL.chan, pPkt->trh2.chan, TRACE2_CHAN_LEN );
913        newSCNL.chan[TRACE2_CHAN_LEN - 1] = '\0';
914        strncpy( newSCNL.net,  pPkt->trh2.net,  TRACE2_NET_LEN );
915        newSCNL.net[TRACE2_NET_LEN - 1]   = '\0';
916        strncpy( newSCNL.loc,  pPkt->trh2.loc,  TRACE2_LOC_LEN );
917        newSCNL.loc[TRACE2_LOC_LEN - 1]   = '\0';
918        if ( AddSCNL( pXfrm, &newSCNL, &newSCNL, &i) == 0 )
919                return matchXfrmSCNL( pPkt, msgtype, pXfrm );
920     }
921     return -1;  /* no match in SCNL for TYPE_TRACEBUF2 */
922  }
923
924/* Look for match in TYPE_TRACEBUF packet
925 ****************************************/
926  else if( msgtype == pXfrm->xfrmEWH->typeTrace )
927  {
928     /* This should NEVER happen;
929        scblRecSize had to have been set to allocate SCNL records */
930     if ( pXfrm->xfrmParam->getAllSCNLs == 0 && pXfrm->scnlRecSize < sizeof(XFRMSCNL) )
931     {
932       logit ("e", "%s: invalid size of SCNL info record (%d); exiting!\n", pXfrm->mod_name, pXfrm->scnlRecSize );
933       return -3;
934     }
935
936     for(i = 0; i < pXfrm->nSCNL; i++ )
937     {
938       XFRMSCNL *testSCNL = (XFRMSCNL *)(pXfrm->scnls + i*(pXfrm->scnlRecSize));
939       /* try to match explicitly */
940       if ((strcmp(pPkt->trh.sta,   testSCNL->inSta)  == 0) &&
941           (strcmp(pPkt->trh.chan,  testSCNL->inChan) == 0) &&
942           (strcmp(pPkt->trh.net,   testSCNL->inNet)  == 0) &&
943           (strcmp(LOC_NULL_STRING, testSCNL->inLoc)  == 0)    )
944         return( i );
945     }
946     if ( pXfrm->xfrmParam->getAllSCNLs ) {
947        /* Try to add new SCNL, then "find" it if successful */
948        SCNL newSCNL;
949        memset( &newSCNL, 0, sizeof(newSCNL) );
950       
951        strncpy( newSCNL.sta,  pPkt->trh2.sta,  TRACE2_STA_LEN );
952        newSCNL.sta[TRACE2_STA_LEN - 1]   = '\0';
953        strncpy( newSCNL.chan, pPkt->trh2.chan, TRACE2_CHAN_LEN );
954        newSCNL.chan[TRACE2_CHAN_LEN - 1] = '\0';
955        strncpy( newSCNL.net,  pPkt->trh2.net,  TRACE2_NET_LEN );
956        newSCNL.net[TRACE2_NET_LEN - 1]   = '\0';
957        strncpy( newSCNL.loc,  LOC_NULL_STRING, TRACE2_LOC_LEN );
958        newSCNL.loc[TRACE2_LOC_LEN - 1]   = '\0';
959        if ( AddSCNL( pXfrm, &newSCNL, &newSCNL, &i) == 0 )
960                return matchXfrmSCNL( pPkt, msgtype, pXfrm );
961     }
962     return( -1 );  /* no match in SCN for TYPE_TRACEBUF */
963  }
964
965/* Unknown Message Type
966 **********************/
967  if ( pXfrm->xfrmParam->debug ) 
968    logit("", "%s: Unknown message type %d\n", pXfrm->mod_name, msgtype);
969 
970  return( -2 );
971}
972
973
974/*      Function: SubtusReport                                          */
975void StatusReport( XFRMWORLD *pXfrm, unsigned char type, short code, 
976                   char* message )
977{
978  char          outMsg[MAXMESSAGELEN];  /* The outgoing message.        */
979  time_t        msgTime;        /* Time of the message.                 */
980
981  /*  Get the time of the message                                     */
982  time( &msgTime );
983
984  /*  Build & process the message based on the type                   */
985    if ( pXfrm->xfrmEWH->typeHeartbeat == type )
986    {
987      sprintf( outMsg, "%ld %ld\n", (long) msgTime, (long) pXfrm->MyPid );
988     
989      /*Write the message to the output region                          */
990      if ( tport_putmsg( &(pXfrm->regionOut), &(pXfrm->hrtLogo), 
991                         (long) strlen( outMsg ), outMsg ) != PUT_OK )
992      {
993        /*     Log an error message                                    */
994        logit( "t", "%s: Failed to send a heartbeat message (%d).\n",
995               pXfrm->mod_name, code );
996      }
997    }
998    else
999    {
1000      if ( message ) {
1001        sprintf( outMsg, "%ld %hd %s\n", (long) msgTime, code, message );
1002        logit("t","Error:%d (%s)\n", code, message );
1003      }
1004      else {
1005        sprintf( outMsg, "%ld %hd\n", (long) msgTime, code );
1006        logit("t","Error:%d (No description)\n", code );
1007      }
1008
1009      /*Write the message to the output region                         */
1010      if ( tport_putmsg( &(pXfrm->regionOut), &(pXfrm->errLogo), 
1011                         (long) strlen( outMsg ), outMsg ) != PUT_OK )
1012      {
1013        /*     Log an error message                                    */
1014        logit( "t", "%s: Failed to send an error message (%d).\n",
1015               pXfrm->mod_name, code );
1016      }
1017     
1018    }
1019}
1020
1021/*      Function: BaseXfrmThread                                            */
1022thr_ret BaseXfrmThread (void* args) {
1023  XFRMWORLD     *World = ( XFRMWORLD *) args;
1024  char          *inMsgBuf;
1025  int            jSta;
1026  XFRMSCNL      *pSCNL;
1027  DATA           peekData;
1028  int            ret;
1029  MSG_LOGO       reclogo;       /* logo of retrieved message     */
1030  TracePacket   *inBuf;       /* pointer to raw trace message  */
1031  TracePacket   *outBuf;        /* pointer to debiased trace message  */
1032
1033  if (!World->useInBufPerSCNL) {
1034          inMsgBuf = AllocateMsgBuf();
1035  } else {
1036          inMsgBuf = NULL;
1037  }
1038
1039  /* set up input buffer for each SCNL */
1040  for(jSta = 0; jSta < World->nSCNL; jSta++ )
1041  {
1042          pSCNL = GetXFRMSCNLForSta(World, jSta);
1043          if (inMsgBuf != NULL) {
1044                  pSCNL->inMsgBuf = inMsgBuf;
1045          } else {
1046                  pSCNL->inMsgBuf = AllocateMsgBuf();
1047                  if (pSCNL->inMsgBuf == NULL)
1048                  {
1049                    logit ("e", "%s: Cannot allocate input buffer\n", World->mod_name);
1050                    World->XfrmStatus = -1;
1051                    KillSelfThread();
1052                  }
1053          }
1054  }
1055
1056  outBuf    = (TracePacket *) AllocateMsgBuf();
1057  if (outBuf == NULL)
1058  {
1059    logit ("e", "%s: Cannot allocate output buffer\n", World->mod_name);
1060    World->XfrmStatus = -1;
1061    KillSelfThread();
1062  }
1063
1064  /* Tell the main thread we're feeling ok */
1065  World->XfrmStatus = 0;
1066
1067  while (1)
1068  {
1069    /* Get top message from the MsgQueue */
1070    RequestMutex ();
1071    ret = -1;
1072    peekData = peekNextElement(&(World->MsgQueue));
1073    if (peekData != NULL)
1074    {
1075        /* Extract the SCNL number; recall, it was pasted as an int on the front
1076         * of the message by the main thread */
1077        jSta = *((int*) peekData);
1078        inMsgBuf = GetInMsgBufForSta(World, jSta);
1079        inBuf  = GetInBufForMsgBuf(inMsgBuf);
1080
1081        ret = dequeue (&(World->MsgQueue), inMsgBuf, &MsgBufLen, &reclogo);
1082    }
1083    ReleaseMutex_ew ();
1084   
1085    if (ret < 0)
1086    {                                 /* empty queue */
1087      sleep_ew (500);
1088      continue;
1089    }
1090
1091    if ( World->completionFcn != NULL && MsgBufLen==0 )
1092    {
1093      logit("","%s: thread knows to terminate\n", World->mod_name);
1094      World->completionFcn( 1 );
1095      return THR_NULL_RET;
1096    }
1097   
1098    if (FilterXfrm( World, inBuf, jSta, reclogo.type, outBuf ) ==
1099        EW_FAILURE)
1100    {
1101      logit("t", "%s: error from FilterXfrm; exiting\n", World->mod_name);
1102      World->XfrmStatus = -1;
1103      KillSelfThread();
1104      return THR_NULL_RET; /* should never reach here */
1105    }
1106
1107  } /* while (1) - message dequeuing process */
1108  return THR_NULL_RET; /* should never reach here unless there is a break in the continuous loop */
1109
1110}
1111
1112int BaseXfrmResetSCNL( XFRMSCNL *pSCNL, TracePacket *inBuf, XFRMWORLD* pWorld ) {
1113
1114  pSCNL->inEndtime = 0.0;    /* used to identify gaps */
1115 
1116  /* Save params that mustn't change over span of packets */
1117  strcpy( pSCNL->datatype, inBuf->trh2.datatype );
1118  pSCNL->samprate = inBuf->trh2.samprate;
1119 
1120  return XfrmResetSCNL( pSCNL, inBuf, pWorld );
1121
1122}
1123
1124int BaseFilterXfrm (XFRMWORLD* pDb, TracePacket *inBuf, int jSta, 
1125                    unsigned char msgtype, TracePacket *outBuf)
1126{
1127  XFRMSCNL *pSCNL;
1128  int ret, datasize;
1129 
1130  pSCNL = (XFRMSCNL*)(pDb->scnls + (jSta * pDb->scnlRecSize));
1131 
1132  /* Create SCNL info field if need be */
1133  if ( pSCNL->status < 0 ) 
1134  {
1135    ret = XfrmInitSCNL( pDb, pSCNL ); 
1136    if ( ret != EW_SUCCESS )
1137    {
1138      return ret;
1139    }
1140    pSCNL->status = 0;
1141    if ( pDb->xfrmParam->debug )
1142      logit( "", "%s: sta[%d] status=0 (initialized)\n", pDb->mod_name, jSta );
1143  }
1144  else if ( pSCNL->status )
1145  {
1146    return EW_WARNING;
1147  }
1148
1149  /* If it's tracebuf, make it look like a tracebuf2 message */
1150  if( msgtype == pDb->xfrmEWH->typeTrace ) TrHeadConv( &(inBuf->trh) );
1151 
1152  if (pDb->xfrmParam->debug)
1153    logit("t", "%s: enter filter with <%s.%s.%s.%s> start: %lf\n",
1154          pDb->mod_name,
1155          inBuf->trh2.sta, inBuf->trh2.chan, inBuf->trh2.net, inBuf->trh2.loc,
1156          inBuf->trh2.starttime );
1157 
1158  /* Check for useable data types: we only handle shorts and longs for now */
1159  if ( (inBuf->trh2.datatype[0] != 's' && inBuf->trh2.datatype[0] != 'i') ||
1160        (inBuf->trh2.datatype[1] != '2' && inBuf->trh2.datatype[1] != '4') )
1161  {
1162    logit("t","%s: unusable datatype <%s> from <%s.%s.%s.%s>; skipping\n",
1163          pDb->mod_name,
1164          inBuf->trh2.datatype,
1165          inBuf->trh2.sta, inBuf->trh2.chan, inBuf->trh2.net, inBuf->trh2.loc);
1166    return EW_WARNING;
1167  }
1168  else
1169  {
1170    datasize = inBuf->trh2.datatype[1]-'0';
1171  }
1172
1173  /* If we have previous data, check for data gap */
1174  if ( pSCNL->inEndtime != 0.0 )
1175  {
1176    if ( (inBuf->trh2.starttime - pSCNL->inEndtime) * inBuf->trh2.samprate > 
1177         pDb->xfrmParam->maxGap )
1178    {
1179      logit("t","%s: gap in data for <%s.%s.%s.%s>:\n\tlast end: %lf this start: %lf; resetting\n",
1180            pDb->mod_name,
1181            pSCNL->inSta, pSCNL->inChan, pSCNL->inNet, pSCNL->inLoc, 
1182            pSCNL->inEndtime, inBuf->trh2.starttime);
1183      if ( BaseXfrmResetSCNL( (XFRMSCNL*)pSCNL, inBuf, (XFRMWORLD*)pDb ) != EW_SUCCESS )
1184      {
1185        pSCNL->status = 1;
1186        return EW_FAILURE;
1187      }
1188    }
1189    else if (inBuf->trh2.starttime < pSCNL->inEndtime)
1190    {
1191      logit("t","%s: overlapping times for <%s.%s.%s.%s>:\n"
1192            "\tlast end: %lf this start: %lf; resetting\n",
1193            pDb->mod_name,
1194            pSCNL->inSta, pSCNL->inChan, pSCNL->inNet, pSCNL->inLoc, 
1195            pSCNL->inEndtime, inBuf->trh2.starttime);
1196      if ( BaseXfrmResetSCNL( (XFRMSCNL*)pSCNL, inBuf, (XFRMWORLD*)pDb ) != EW_SUCCESS )
1197      {
1198        pSCNL->status = 1;
1199        return EW_FAILURE;
1200      }
1201    }
1202    else if ( inBuf->trh2.samprate != pSCNL->samprate ) 
1203    {
1204      logit("et","%s: sample rate changed in <%s.%s.%s.%s> at %lf from %lf to %lf; exiting.\n",
1205            pDb->mod_name,
1206            pSCNL->inSta, pSCNL->inChan, pSCNL->inNet, pSCNL->inLoc, inBuf->trh2.starttime,
1207            pSCNL->samprate, inBuf->trh2.samprate);
1208      pSCNL->status = 1;
1209      return EW_FAILURE;
1210    }
1211    else if ( strcmp( inBuf->trh2.datatype, pSCNL->datatype ) != 0 ) 
1212    {
1213      logit("et","%s: data type changed in <%s.%s.%s.%s> at %lf from %s to %s; exiting.\n",
1214            pDb->mod_name,
1215            pSCNL->inSta, pSCNL->inChan, pSCNL->inNet, pSCNL->inLoc, inBuf->trh2.starttime,
1216            pSCNL->datatype, inBuf->trh2.datatype);
1217      pSCNL->status = 1;
1218      return EW_FAILURE;
1219    }
1220
1221  }
1222  else
1223  {
1224    BaseXfrmResetSCNL( (XFRMSCNL*)pSCNL, inBuf, (XFRMWORLD*)pDb );
1225  }
1226 
1227  pSCNL->inEndtime = inBuf->trh2.endtime;
1228   
1229  return EW_SUCCESS;
1230}
1231
1232int XfrmWritePacket( XFRMWORLD* pDb, XFRMSCNL* pSCNL, unsigned char msgtype, 
1233        TracePacket* outBuf, int outBufLen )
1234{
1235  pDb->trcLogo.type = msgtype;
1236  pDb->trcLogo.mod = pDb->xfrmEWH->myModId;
1237  pDb->trcLogo.instid = pDb->xfrmEWH->myInstId;
1238  outBuf->trh2.version[0] = TRACE2_VERSION0;
1239  outBuf->trh2.version[1] = TRACE2_VERSION1;
1240  strcpy( outBuf->trh2.sta, pSCNL->outSta );
1241  strcpy( outBuf->trh2.net, pSCNL->outNet );
1242  strcpy( outBuf->trh2.chan, pSCNL->outChan );
1243  strcpy( outBuf->trh2.loc, pSCNL->outLoc );
1244  if (pSCNL->conversionFactor != TRACE2_NO_CONVERSION_FACTOR) {
1245          outBuf->trh2x.version[1] = TRACE2_VERSION11;
1246          outBuf->trh2x.x.v21.conversion_factor = pSCNL->conversionFactor;
1247  }
1248  if (tport_putmsg (&(pDb->regionOut), &(pDb->trcLogo), outBufLen, 
1249                    outBuf->msg) != PUT_OK)
1250  {
1251    logit ("t","%s: Error sending type:%d message.\n", pDb->mod_name, msgtype );
1252    return EW_FAILURE;
1253  }
1254   
1255  return EW_SUCCESS;
1256}
1257
1258int main (int argc, char **argv) 
1259{
1260  XFRMWORLD *World;                /* Our main data structure              */
1261  time_t    timeNow;               /* current time                         */ 
1262  time_t    timeLastBeat;          /* time last heartbeat was sent         */
1263  long      sizeMsg;               /* size of retrieved message            */
1264  SHM_INFO  regionIn;              /* Input shared memory region info.     */
1265  MSG_LOGO  logoWave[MAX_LOGO];    /* Logo(s) of requested waveforms.      */
1266  int       ilogo;                 /* working counter                      */
1267  MSG_LOGO  logoMsg;               /* logo of retrieved message            */
1268  unsigned  tidXfrmor;             /* Transformer thread id                */
1269  char     *inBuf;                 /* Pointer to the input message buffer. */
1270  int       inBufLen;              /* Size of input message buffer         */
1271  TracePacket  *TracePkt;
1272  int       ret;
1273  char      msgText[MAXMESSAGELEN];/* string for log/error messages        */
1274
1275  /* Allocate a WORLD
1276   *****************************************************/
1277  CmdCount = NUM_BASE_CMDS;
1278  SetupXfrm( &World, &ModCmds, &CmdCount, &ModParamTargets );
1279  World->scnls = NULL;
1280  World->nSCNL = 0;
1281
1282  /* Check command line arguments
1283   ******************************/
1284  if (argc != 2)
1285  {
1286    fprintf (stderr, "Usage: %s <configfile>\n", World->mod_name);
1287    if ( World->version_magic == VERSION_MAGIC )
1288                fprintf( stderr, "Version %s\n", World->version );
1289    FreeXfrmWorld( World );
1290    exit (EW_FAILURE);
1291  }
1292 
1293  /* Read config file and configure the decimator */
1294  if (BaseConfigureXfrm(World, argv) != EW_SUCCESS)
1295  {
1296    logit("e", "%s: configure() failed \n", World->mod_name);
1297    FreeXfrmWorld( World );
1298    exit (EW_FAILURE);
1299  }
1300
1301  if ( World->xfrmParam->testMode )
1302  {
1303    logit("e", "%s terminating normally for test mode\n", World->mod_name);
1304    FreeXfrmWorld( World );
1305    exit (EW_SUCCESS);
1306  }
1307 
1308  /* We will put the SCNL index in front of the trace message, so we  *
1309   * don't have to look up the SCNL again at the other end of the queue. */
1310  inBufLen = MAX_TRACEBUF_SIZ + sizeof( double );
1311  if ( ! ( inBuf = (char *) malloc( (size_t) inBufLen ) ) )
1312  {
1313    logit( "e", "%s: Memory allocation failed - initial message buffer!\n",
1314        argv[0] );
1315    FreeXfrmWorld( World );
1316    exit( EW_FAILURE );
1317  }
1318  TracePkt = (TracePacket *) (inBuf + sizeof(TP_PREFIX));
1319 
1320  /* Attach to Input shared memory ring
1321   ************************************/
1322
1323  tport_attach (&regionIn, World->xfrmEWH->ringInKey);
1324  if (World->xfrmParam->debug) {
1325    logit ("", "%s: Attached to public memory region %s: %ld\n", World->mod_name,
1326           World->xfrmParam->ringIn, World->xfrmEWH->ringInKey);
1327  }
1328
1329  /* Attach to Output shared memory ring
1330   *************************************/
1331  if (World->xfrmEWH->ringOutKey == World->xfrmEWH->ringInKey) {
1332    World->regionOut = regionIn;
1333  } else {
1334    tport_attach (&(World->regionOut), World->xfrmEWH->ringOutKey);
1335    if (World->xfrmParam->debug)
1336      logit ("", "%s: Attached to public memory region %s: %ld\n", World->mod_name,
1337             World->xfrmParam->ringOut, World->xfrmEWH->ringOutKey);
1338  }
1339
1340 /* Specify logos of incoming waveforms
1341  *************************************/
1342  for( ilogo=0; ilogo<World->xfrmParam->nlogo; ilogo++ ) {
1343    logoWave[ilogo].instid = World->xfrmEWH->readInstId[ilogo];
1344    logoWave[ilogo].mod    = World->xfrmEWH->readModId[ilogo];
1345    logoWave[ilogo].type   = World->xfrmEWH->readMsgType[ilogo];
1346    if ( World->xfrmParam->debug ) 
1347      logit("", "%s: Logo[%d] = %d,%d,%d\n", World->mod_name, ilogo,
1348          logoWave[ilogo].instid, logoWave[ilogo].mod, logoWave[ilogo].type); 
1349  }
1350
1351  /* Specify logos of outgoing messages
1352   ************************************/
1353  World->hrtLogo.instid = World->xfrmEWH->myInstId;
1354  World->hrtLogo.mod    = World->xfrmEWH->myModId;
1355  World->hrtLogo.type   = World->xfrmEWH->typeHeartbeat;
1356  if ( World->xfrmParam->debug ) 
1357    logit("", "%s: hrtLogo = %d,%d,%d\n", World->mod_name,
1358        World->hrtLogo.instid, World->hrtLogo.mod, World->hrtLogo.type); 
1359
1360  World->errLogo.instid = World->xfrmEWH->myInstId;
1361  World->errLogo.mod    = World->xfrmEWH->myModId;
1362  World->errLogo.type   = World->xfrmEWH->typeError;
1363  if ( World->xfrmParam->debug ) 
1364    logit("", "%s: errLogo = %d,%d,%d\n", World->mod_name,
1365        World->errLogo.instid, World->errLogo.mod, World->errLogo.type); 
1366 
1367  SpecifyXfrmLogos();
1368
1369  /* Force a heartbeat to be issued in first pass thru main loop  */
1370  timeLastBeat = time (&timeNow) - World->xfrmParam->heartbeatInt - 1;
1371
1372  /* Flush the incoming transport ring */
1373  while (tport_getmsg (&regionIn, logoWave, (short)World->xfrmParam->nlogo, &logoMsg,
1374                       &sizeMsg, inBuf, inBufLen) != GET_NONE);
1375
1376  /* Create MsgQueue mutex */
1377  CreateMutex_ew();
1378
1379  /* Allocate the message Queue */
1380  initqueue (&(World->MsgQueue), QUEUE_SIZE, inBufLen);
1381 
1382
1383  /* Start decimator thread which will read messages from   *
1384   * the Queue, decimate them and write them to the OutRing */
1385  if (StartThreadWithArg (XfrmThread, (void *) World, (unsigned) 
1386                          THREAD_STACK, &tidXfrmor) == -1)
1387  {
1388    logit( "e", 
1389           "%s: Error starting thread.  Exiting.\n", World->mod_name);
1390    tport_detach (&regionIn);
1391   
1392    if (World->xfrmEWH->ringOutKey != World->xfrmEWH->ringInKey) {
1393       tport_detach (&(World->regionOut)); 
1394    }
1395    free( inBuf );
1396    FreeXfrmWorld();
1397    exit( EW_FAILURE );
1398  }
1399
1400  World->XfrmStatus = 0; /*assume the best*/
1401
1402  if ( World->completionFcn != NULL )
1403  {
1404    ret = World->completionFcn( -1 );
1405    if ( ret != EW_SUCCESS )
1406    {
1407      logit("e", "%s: Problem with completionFcn(-1) ret=%d\n", World->mod_name, ret );
1408    }
1409  }
1410
1411/*--------------------- setup done; start main loop -------------------------*/
1412  if ( World->xfrmParam->debug )
1413    logit("","%s: starting main loop\n", World->mod_name);
1414  while (tport_getflag (&regionIn) != TERMINATE  &&
1415         tport_getflag (&regionIn) != World->MyPid )
1416  {
1417    /* send module's heartbeat */
1418    if (time (&timeNow) - timeLastBeat >= World->xfrmParam->heartbeatInt) 
1419    {
1420      timeLastBeat = timeNow;
1421      StatusReport (World, World->xfrmEWH->typeHeartbeat, 0, ""); 
1422    }
1423
1424    if (World->XfrmStatus < 0)
1425    {
1426      logit ("t", 
1427             "%s: thread died. Exiting\n", World->mod_name);
1428      exit (EW_FAILURE);
1429    }
1430
1431    ret = tport_getmsg (&regionIn, logoWave, (short)World->xfrmParam->nlogo, 
1432                        &logoMsg, &sizeMsg, TracePkt->msg, MAX_TRACEBUF_SIZ);
1433
1434    /* Check return code; report errors */
1435    if (ret != GET_OK)
1436    {
1437      if (ret == GET_TOOBIG)
1438      {
1439        sprintf (msgText, "msg[%ld] i%d m%d t%d too long for target",
1440                 sizeMsg, (int) logoMsg.instid,
1441                 (int) logoMsg.mod, (int)logoMsg.type);
1442        StatusReport (World, World->xfrmEWH->typeError, ERR_TOOBIG, msgText);
1443        continue;
1444      }
1445      else if (ret == GET_MISS)
1446      {
1447        sprintf (msgText, "missed msg(s) i%d m%d t%d in %s",
1448                 (int) logoMsg.instid, (int) logoMsg.mod, 
1449                 (int)logoMsg.type, World->xfrmParam->ringIn);
1450        StatusReport (World, World->xfrmEWH->typeError, ERR_MISSMSG, msgText);
1451      }
1452      else if (ret == GET_NOTRACK)
1453      {
1454        sprintf (msgText, "no tracking for logo i%d m%d t%d in %s",
1455                 (int) logoMsg.instid, (int) logoMsg.mod, 
1456                 (int)logoMsg.type, World->xfrmParam->ringIn);
1457        StatusReport (World, World->xfrmEWH->typeError, ERR_NOTRACK, msgText);
1458      }
1459      else if (ret == GET_NONE)
1460      {
1461        sleep_ew(500);
1462        continue;
1463      }
1464      else if (World->xfrmParam->debug)
1465        logit( "", "%s: Unexpected return from tport_getsg: %d\n", 
1466            World->mod_name, ret );
1467    }
1468
1469    if ((ret = matchXfrmSCNL( TracePkt, logoMsg.type, World )) < -1 )
1470    {
1471      logit ("t", "%s: Call to matchSCNL failed with %d; exiting.\n", 
1472        World->mod_name, ret);
1473      FreeXfrmWorld();
1474      exit (EW_FAILURE);
1475    }
1476    else if ( ret == -1 )
1477    {
1478      /* Not an SCNL we want */
1479      if ( World->xfrmParam->processRejected ) 
1480      {
1481        if ( ProcessXfrmRejected( TracePkt, logoMsg, inBuf ) == EW_FAILURE )
1482        {
1483          logit ("t","%s: Error processing rejected packet; type:%d message.\n", 
1484                World->mod_name, logoMsg.type );
1485          FreeXfrmWorld();
1486          exit (EW_FAILURE);
1487        }
1488      }
1489      continue;
1490    }
1491   
1492    /* stick the SCNL number as an int at the front of the message */
1493    *((int*)inBuf) = ret; 
1494
1495    /* If necessary, swap bytes in the wave message */
1496    if( logoMsg.type == World->xfrmEWH->typeTrace2 )
1497        ret = WaveMsg2MakeLocal( &(TracePkt->trh2) );
1498    else if( logoMsg.type == World->xfrmEWH->typeTrace ) 
1499        ret = WaveMsgMakeLocal( &(TracePkt->trh) );
1500
1501    ret = PreprocessXfrmBuffer( TracePkt, logoMsg, inBuf );
1502
1503    if ( ret < -1 )
1504    {
1505      FreeXfrmWorld();
1506      exit (EW_FAILURE);
1507    }
1508    if ( ret == -1 )
1509    {
1510      continue;
1511    }
1512
1513    /* Queue retrieved msg */
1514    RequestMutex ();
1515    ret = enqueue (&(World->MsgQueue), inBuf, sizeMsg + sizeof(TP_PREFIX), logoMsg); 
1516    ReleaseMutex_ew ();
1517
1518    if (ret != 0)
1519    {
1520      if (ret == -1)
1521      {
1522        sprintf (msgText, 
1523                 "Message too large for queue; Lost message.");
1524        StatusReport (World, World->xfrmEWH->typeError, ERR_QUEUE, msgText);
1525        continue;
1526      }
1527      if (ret == -3) 
1528      {
1529        sprintf (msgText, "Queue full. Old messages lost.");
1530        StatusReport (World, World->xfrmEWH->typeError, ERR_QUEUE, msgText);
1531        continue;
1532      }
1533      logit("", "%s: Problem w/ queue: %d\n", World->mod_name, ret );
1534    } /* problem from enqueue */
1535
1536  } /* wait until TERMINATE is raised  */ 
1537
1538  if ( World->completionFcn != NULL ) /* send empty packet to signal termination */
1539  {
1540    RequestMutex (); 
1541    ret = enqueue (&(World->MsgQueue), inBuf, 0, logoMsg); 
1542    ReleaseMutex_ew ();
1543    ret = World->completionFcn( 0 );
1544    if ( ret != EW_SUCCESS )
1545    {
1546      logit("e", "%s: Problem with completionFcn(0) ret=%d\n", World->mod_name, ret );
1547    }
1548  }
1549
1550  /* Termination has been requested */
1551  tport_detach (&regionIn);
1552  if (World->xfrmEWH->ringOutKey != World->xfrmEWH->ringInKey) {
1553    tport_detach (&(World->regionOut));
1554  }
1555  free( inBuf );
1556  FreeXfrmWorld();
1557  logit ("t", "Termination requested; exiting!\n" );
1558  exit (EW_SUCCESS);
1559
1560}
Note: See TracBrowser for help on using the repository browser.