source: trunk/src/libsrc/util/xfrm.c @ 7513

Revision 7513, 47.8 KB checked in by baker, 6 months ago (diff)

cleanup warning: variable set but not used [-Wunused-but-set-variable]

  • Property svn:eol-style set to native
Line 
1/*
2 * xfrm.c: 
3 *
4 *
5 *  Initial version:
6 *     
7 */
8
9#include <stdio.h>
10#include <stdlib.h>
11#include <string.h>
12#include <sys/types.h>
13#include <time.h>
14#ifdef _WINNT
15#include <windows.h>
16#define mutex_t HANDLE
17#else
18#ifdef _SOLARIS
19#ifndef _LINUX
20#include <synch.h>      /* mutex's                                      */
21#endif
22#endif
23#endif
24
25#include "earthworm.h"
26#include "kom.h"
27#include "swap.h"
28#include "trheadconv.h" /* trheadconv()                                 */
29#include "xfrm.h"
30
31#define SCNL_INC     100      /* how many more are allocated each     */
32                                /*   time we run out                    */
33#define NUM_BASE_CMDS 12
34#define CMD_ID_GETSCNL       8
35#define CMD_ID_GETWAVESFROM 11
36
37static char cnull = '\0';
38static int  maxSta = 0;     /* number of scnls allocated   */
39static char **BaseCmd=NULL; /* array of command definitions */
40static char **ModCmds;      /* array of module-specific command defs */
41static int  CmdCount;       /* number of command definitions */
42static void **ParamTarget=NULL; /* array of addrs for value of matching commands */
43static void **ModParamTargets; /* ParamTargets for ModCmds */
44/* length of message buffer */
45static long MsgBufLen = (MAX_TRACEBUF_SIZ + sizeof (TP_PREFIX));
46
47
48static void InitializeParameters( XFRMWORLD* pXfrm )
49{
50  int i;
51
52  /*    Initialize members of the XFRMWORLD structure                    */
53  pXfrm->completionFcn          = NULL;
54
55  /*    Initialize members of the xfrmEWH structure                      */
56  pXfrm->xfrmEWH->myInstId = 0;
57  pXfrm->xfrmEWH->myModId  = 0;
58  pXfrm->xfrmEWH->typeError     = 0;
59  pXfrm->xfrmEWH->typeHeartbeat = 0;
60  pXfrm->xfrmEWH->ringInKey     = 0l;
61  pXfrm->xfrmEWH->ringOutKey    = 0l;
62  pXfrm->xfrmEWH->typeTrace     = 0;
63  pXfrm->xfrmEWH->typeTrace2    = 0;
64
65  /*    Initialize members of the xfrmParam structure                    */
66  for( i=0; i<MAX_LOGO; i++ ) {
67     pXfrm->xfrmParam->readInstName[i][0] = '\0';
68     pXfrm->xfrmParam->readModName[i][0] = '\0';
69     pXfrm->xfrmParam->readTypeName[i][0] = '\0';
70  }
71  pXfrm->xfrmParam->ringIn[0]      = '\0';
72  pXfrm->xfrmParam->ringOut[0]     = '\0';
73  pXfrm->xfrmParam->nlogo          = 0;
74  pXfrm->xfrmParam->heartbeatInt   = 15;
75  pXfrm->xfrmParam->debug          = 0;
76  pXfrm->xfrmParam->logSwitch      = 1;
77  pXfrm->xfrmParam->cleanStart     = 1;
78  pXfrm->xfrmParam->testMode       = 0;
79  pXfrm->xfrmParam->maxGap         = 1.5;
80  pXfrm->xfrmParam->getAllSCNLs    = 0;
81
82  InitializeXfrmParameters();
83 
84  return;
85}
86
87
88static int IsWild( char *str )
89{
90   if( strcmp( str, "*" ) == 0 ) return 1;
91   return 0;
92}
93
94/*
95 * Determines if the station name is valid.
96 * In:  pxfrm pointer to the Xfrm World structure
97 *      outputFlag true if output, false if input
98 *      str the station name
99 * Returns non-zero if valid, 0 if invalid
100 */
101static int IsValidSta(XFRMWORLD* pXfrm, int outputFlag, char *str) {
102    if( !str  ||  strlen(str) >= TRACE2_STA_LEN  ||  IsWild(str) ) {
103        if( !str ) str = &cnull;
104        logit ("e", "%s: Invalid %s station code: %s "
105                    "in GetSCNL cmd; exiting!\n",
106                    (outputFlag ? "output" : "input"),
107                    pXfrm->mod_name, str);
108        return 0;
109    }
110    return 1;
111}
112
113/*
114 * Determines if the channel name is valid.
115 * In:  pxfrm pointer to the Xfrm World structure
116 *      outputFlag true if output, false if input
117 *      str the channel name
118 * Returns non-zero if valid, 0 if invalid
119 */
120static int IsValidChan(XFRMWORLD* pXfrm, int outputFlag, char *str) {
121    if( !str  ||  strlen(str) >= TRACE2_CHAN_LEN  ||  IsWild(str)) {
122        if( !str ) str = &cnull;
123        logit ("e", "%s: Invalid %s channel code: %s "
124                    "in GetSCNL cmd; exiting!\n",
125                                (outputFlag ? "output" : "input"),
126                                pXfrm->mod_name, str);
127        return 0;
128    }
129    return 1;
130}
131
132/*
133 * Determines if the network name is valid.
134 * In:  pxfrm pointer to the Xfrm World structure
135 *      outputFlag true if output, false if input
136 *      str the network name
137 * Returns non-zero if valid, 0 if invalid
138 */
139static int IsValidNet(XFRMWORLD* pXfrm, int outputFlag, char *str) {
140    if( !str  ||  strlen(str) >= TRACE2_NET_LEN  ||  IsWild(str)) {
141        if( !str ) str = &cnull;
142        logit ("e", "%s: Invalid %s network code: %s "
143                    "in GetSCNL cmd; exiting!\n",
144                    (outputFlag ? "output" : "input"),
145                    pXfrm->mod_name, str);
146        return 0;
147    }
148    return 1;
149}
150
151/*
152 * Determines if the location name is valid.
153 * In:  pxfrm pointer to the Xfrm World structure
154 *      outputFlag true if output, false if input
155 *      str the location name
156 * Returns non-zero if valid, 0 if invalid
157 */
158static int IsValidLoc(XFRMWORLD* pXfrm, int outputFlag, char *str) {
159    if( !str  ||  strlen(str) >= TRACE2_LOC_LEN  ||  IsWild(str)) {
160        if( !str ) str = &cnull;
161        logit ("e", "%s: Invalid %s location code: %s "
162                    "in GetSCNL cmd; exiting!\n",
163                    (outputFlag ? "output" : "input"),
164                    pXfrm->mod_name, str);
165        return 0;
166    }
167    return 1;
168}
169
170/*
171 * Set the output SCNL
172 * In:  newSCNL the new SCNL
173 *      outSCNL the output SCNL or NULL if none
174 */
175static void SetOutSCNL(XFRMSCNL *newSCNL, SCNL *outSCNL) {
176
177  if ( outSCNL == NULL )
178    return;
179
180  strncpy( newSCNL->outSta,  outSCNL->sta,  TRACE2_STA_LEN );
181  newSCNL->outSta[TRACE2_STA_LEN - 1]   = '\0';
182  strncpy( newSCNL->outChan, outSCNL->chan, TRACE2_CHAN_LEN );
183  newSCNL->outChan[TRACE2_CHAN_LEN - 1] = '\0';
184  strncpy( newSCNL->outNet,  outSCNL->net,  TRACE2_NET_LEN );
185  newSCNL->outNet[TRACE2_NET_LEN - 1]   = '\0';
186  strncpy( newSCNL->outLoc,  outSCNL->loc,  TRACE2_LOC_LEN );
187  newSCNL->outLoc[TRACE2_LOC_LEN - 1]   = '\0';
188
189}
190
191/*
192 * Allocate the message buffer.
193 * Returns the message buffer
194 */
195static char* AllocateMsgBuf() {
196        return (char *) malloc ((size_t) MsgBufLen);
197}
198
199/*
200 * Get the XFRMSCNL for the station
201 * In:  pxfrm pointer to the Xfrm World structure
202 *      jSta the station index
203 * Returns the XFRMSCNL
204 */
205static XFRMSCNL* GetXFRMSCNLForSta(XFRMWORLD* pXfrm, int jSta) {
206        return (XFRMSCNL *)(pXfrm->scnls + jSta*(pXfrm->scnlRecSize));
207}
208
209/*
210 * Get the input message buffer for the station
211 * In:  pxfrm pointer to the Xfrm World structure
212 *      jSta the station index
213 * Returns the input message buffer
214 */
215static char* GetInMsgBufForSta(XFRMWORLD* pXfrm, int jSta) {
216        return GetXFRMSCNLForSta(pXfrm, jSta)->inMsgBuf;
217}
218
219/*
220 * Get the input buffer for the message buffer.
221 * In:  msgBuf the message buffer
222 * Returns the input buffer
223 */
224TracePacket* GetInBufForMsgBuf(char* msgBuf) {
225        return (TracePacket *)(msgBuf + sizeof(TP_PREFIX));
226}
227
228/*
229 * Get the input buffer for the station.
230 * In:  pxfrm pointer to the Xfrm World structure
231 *      jSta the station index
232 * Returns the input buffer
233 */
234TracePacket* GetInBufForSta(XFRMWORLD* pXfrm, int jSta) {
235        char* inMsgBuf = GetInMsgBufForSta(pXfrm, jSta);
236        return (TracePacket *)(inMsgBuf + sizeof(TP_PREFIX));
237}
238
239/*
240 * Get the wave bytes for the buffer.
241 * In:  buf the buffer
242 * Returns the wave bytes.
243 */
244char* GetWave(TracePacket* buf) {
245        return buf->msg + WAVE_BYTES_OFFSET;
246}
247
248/*
249 * Get the data size.
250 * In:  buf the buffer
251 * Return the data size.
252 */
253int GetDataSize(TracePacket* buf) {
254        return buf->trh2.datatype[1]-'0';
255}
256
257/*
258 * Compare the SCNLs.
259 * In:  scnl1 the first scnl
260 *      scnl2 the second scnl
261 * Return 0 if the SCNLs match, non-zero otherwise.
262 */
263int BaseCompareSCNL( XFRMSCNL *scnl1, XFRMSCNL *scnl2 ) {
264
265  TracePacket *buf1, *buf2;
266
267  /* Both SCNLs must exist */
268  if ( ( scnl1 == NULL ) || ( scnl2 ==NULL ) )
269    return 1;
270
271  /* Both must contain valid tracebufs; see BaseFilterXfrm() */
272  /*   status = -1, no data yet                              */
273  /*   status =  1, data is bad                              */
274  /*   status =  0, data is good                             */
275  if ( ( scnl1->status != 0 ) || ( scnl2->status != 0 ) )
276    return 2;
277
278  /* Both must have the same end time */
279  /* inEndtime = 0 is used in many places to mean no data */
280  if ( ( scnl1->inEndtime == 0.0 ) || ( scnl2->inEndtime == 0.0 ) ||
281       ( scnl1->inEndtime != scnl2->inEndtime ) )
282    return 3;
283
284  /* Both must have the same sample rate */
285  if ( scnl1->samprate != scnl2->samprate )
286    return 4;
287
288  /* Both must have the same data size (they have been converted to native type) */
289  if ( scnl1->datatype[1] != scnl2->datatype[1] )
290    return 5;
291
292  /* Both must have the same number of samples */
293  buf1 = GetInBufForMsgBuf( scnl1->inMsgBuf );
294  buf2 = GetInBufForMsgBuf( scnl2->inMsgBuf );
295  if ( buf1->trh2.nsamp != buf2->trh2.nsamp )
296    return 6;
297
298  return 0;
299
300}
301
302/*
303 * Compare the SCNL to the SCNL in the XFRMSCNL.
304 * In:  scnl the SCNL
305 *      xfrmSCNL the XFRMSCNL
306 * Returns 0 if the SCNL matches, non-zero otherwise
307 */
308static int CmpSCNL(SCNL *scnl, XFRMSCNL* xfrmSCNL) {
309        if (strcmp(xfrmSCNL->inSta, scnl->sta) != 0) {
310                return 1;
311        }
312        if (strcmp(xfrmSCNL->inChan, scnl->chan) != 0) {
313                return 2;
314        }
315        if (strcmp(xfrmSCNL->inNet, scnl->net) != 0) {
316                return 3;
317        }
318        if (strcmp(xfrmSCNL->inLoc, scnl->loc) != 0) {
319                return 4;
320        }
321        return 0;
322}
323
324/*
325 * Get the XFRMSCNL for the specified SCNL
326 * In:  pxfrm pointer to the Xfrm World structure
327 *      scnl the SCNL
328 * Out: jSta the station index
329 * Returns the XFRMSCNL or NULL if none
330 */
331static XFRMSCNL* GetXFRMSCNL(XFRMWORLD* pXfrm, SCNL *scnl, int *jSta) {
332        XFRMSCNL* xfrmSCNL = NULL;
333
334        for (*jSta = 0; *jSta < pXfrm->nSCNL; (*jSta)++) {
335                xfrmSCNL = GetXFRMSCNLForSta(pXfrm, *jSta);
336                if (CmpSCNL(scnl, xfrmSCNL) == 0) {
337                        return xfrmSCNL;
338                }
339        }
340        return NULL;
341}
342
343/*
344 * Add the input and output SCNL
345 * In:  pxfrm pointer to the Xfrm World structure
346 *      inSCNL the input SCNL
347 *      outSCNL the output SCNL or NULL if none
348 * Out: jSta the station index
349 * Returns 0 if add was successful, non-zero if error
350 */
351int AddSCNL( XFRMWORLD* pXfrm, SCNL *inSCNL, SCNL *outSCNL, int *jSta ) {
352
353  char *scnls;
354  int nSCNL;
355  XFRMSCNL *newSCNL;
356
357  /* Determine if NSCL already exists */
358  newSCNL = GetXFRMSCNL( pXfrm, inSCNL, jSta );
359  if ( newSCNL != NULL ) {
360    SetOutSCNL( newSCNL, outSCNL );
361    if ( pXfrm->xfrmParam->debug )
362      logit( "", "%s: sta[%d] outSCNL=<%s.%s.%s.%s>\n",
363                 pXfrm->mod_name, *jSta,
364                 newSCNL->outSta, newSCNL->outChan, newSCNL->outNet, newSCNL->outLoc );
365    return 0;
366  }
367
368  if ( pXfrm->nSCNL >= maxSta ) {
369    /* Need to allocate more */
370    maxSta += SCNL_INC;
371    if ( pXfrm->scnlRecSize < sizeof( XFRMSCNL ) ) {
372      logit( "e", "%s: invalid size of SCNL info record (%d); exiting!\n",
373                  pXfrm->mod_name, pXfrm->scnlRecSize );
374      return 1;
375    }
376    pXfrm->scnls = realloc( pXfrm->scnls, maxSta * pXfrm->scnlRecSize );
377    if ( pXfrm->scnls == NULL ) {
378      logit( "e", "%s: realloc for SCNL list failed; exiting!\n", pXfrm->mod_name );
379      return 2;
380    }
381  }
382  scnls = pXfrm->scnls;
383  nSCNL = pXfrm->nSCNL;
384  newSCNL = memset( (XFRMSCNL *) ( scnls + nSCNL * pXfrm->scnlRecSize ),
385                    0, pXfrm->scnlRecSize );
386  *jSta = nSCNL;
387
388  strncpy( newSCNL->inSta,  inSCNL->sta,  TRACE2_STA_LEN );
389  newSCNL->inSta[TRACE2_STA_LEN - 1]   = '\0';
390  strncpy( newSCNL->inChan, inSCNL->chan, TRACE2_CHAN_LEN );
391  newSCNL->inChan[TRACE2_CHAN_LEN - 1] = '\0';
392  strncpy( newSCNL->inNet,  inSCNL->net,  TRACE2_NET_LEN );
393  newSCNL->inNet[TRACE2_NET_LEN - 1]   = '\0';
394  strncpy( newSCNL->inLoc,  inSCNL->loc,  TRACE2_LOC_LEN );
395  newSCNL->inLoc[TRACE2_LOC_LEN - 1]   = '\0';
396  if ( pXfrm->xfrmParam->debug )
397    logit( "", "%s: sta[%d] SCNL=<%s.%s.%s.%s> (uninitialized)\n",
398               pXfrm->mod_name, *jSta,
399               newSCNL->inSta, newSCNL->inChan, newSCNL->inNet, newSCNL->inLoc );
400
401  SetOutSCNL( newSCNL, outSCNL );
402  if ( pXfrm->xfrmParam->debug )
403    if ( outSCNL != NULL )
404      logit( "", "%s: sta[%d] outSCNL=<%s.%s.%s.%s>\n",
405                 pXfrm->mod_name, *jSta,
406                 newSCNL->outSta, newSCNL->outChan, newSCNL->outNet, newSCNL->outLoc );
407
408  newSCNL->status = -1; /* uninitialized */
409  newSCNL->prevSta = -1;
410  newSCNL->nextSta = -1;
411  newSCNL->conversionFactor = TRACE2_NO_CONVERSION_FACTOR;
412  pXfrm->nSCNL++;
413  return 0;
414
415}
416
417/*
418 * Reads the SCNL
419 * In:  pxfrm pointer to the Xfrm World structure
420 *      outputFlag true if output, false if input
421 *      locFlag non-zero if location is provided, 0 otherwise
422 * Out: scnl the SCNL
423 * Returns 0 if read was successful, non-zero if error
424 */
425int ReadSCNL(XFRMWORLD* pXfrm, int outputFlag, int locFlag, SCNL *scnl) {
426        scnl->sta = k_str(); /* read station code */
427        if (!IsValidSta(pXfrm, outputFlag, scnl->sta)) {
428                return 1;
429        }
430        scnl->chan = k_str(); /* read channel code */
431        if (!IsValidChan(pXfrm, outputFlag, scnl->chan)) {
432                return 2;
433        }
434        scnl->net = k_str(); /* read network code */
435        if (!IsValidNet(pXfrm, outputFlag, scnl->net)) {
436                return 3;
437        }
438
439    if (locFlag) {  /* read input location code */
440        scnl->loc = k_str(); /* read location code */
441        if (!IsValidLoc(pXfrm, outputFlag, scnl->loc))
442        {
443                return 4;
444        }
445    }
446    else {      /* use default blank location code */
447        scnl->loc = LOC_NULL_STRING;
448    }
449    return 0;
450}
451
452static int BaseProcessXfrmCommand( XFRMWORLD* pXfrm, int cmd_id, char *com ) {
453  switch ( cmd_id ) {
454  case CMD_ID_GETSCNL: /* GetSCNL or GetSCN */
455  {
456        SCNL inSCNL1, inSCNL2;
457        SCNL outSCNL;
458        XFRMSCNL *xfrmSCNL;
459        double conversionFactor;
460        int jSta1, jSta2;
461        int locFlag = strcmp(com,"GetSCN");
462        int outputFlag = 0;  /* input SCNL */
463        /* exit if error reading the input SCNL */
464        if (ReadSCNL(pXfrm, outputFlag, locFlag, &inSCNL1) != 0) {
465                return -2;
466        }
467        if (pXfrm->nInSCNL > 1) {
468                if (ReadSCNL(pXfrm, outputFlag, locFlag, &inSCNL2) != 0) {
469                        return -2;
470                }
471        }
472        outputFlag = 1;  /* output SCNL */
473        /* exit if error reading the output SCNL */
474        if (ReadSCNL(pXfrm, outputFlag, locFlag, &outSCNL) != 0) {
475                return -2;
476        }
477        /* exit if adding the input and output SCNL */
478    if (AddSCNL(pXfrm, &inSCNL1, &outSCNL, &jSta1) != 0) {
479        return -2;
480    }
481
482        /* read conversion factor if present */
483        conversionFactor = k_val();
484        if (!k_err()) { /* if conversion factor present */
485                xfrmSCNL = GetXFRMSCNLForSta(pXfrm, jSta1);
486                xfrmSCNL->conversionFactor = conversionFactor;
487        }
488
489    if (pXfrm->nInSCNL > 1) {
490        if (AddSCNL(pXfrm, &inSCNL2, NULL, &jSta2) != 0) {
491                return -2;
492        }
493        xfrmSCNL = GetXFRMSCNLForSta(pXfrm, jSta1);
494        if (xfrmSCNL == NULL) {
495                return -1;
496        }
497        xfrmSCNL->nextSta = jSta2;
498        if ( pXfrm->xfrmParam->debug )
499            logit( "", "%s: sta[%d] nextSta=%d\n", pXfrm->mod_name, jSta1, jSta2 );
500        xfrmSCNL = GetXFRMSCNLForSta(pXfrm, jSta2);
501        if (xfrmSCNL == NULL) {
502                return -1;
503        }
504        xfrmSCNL->prevSta = jSta1;
505        if ( pXfrm->xfrmParam->debug )
506            logit( "", "%s: sta[%d] prevSta=%d\n", pXfrm->mod_name, jSta2, jSta1 );
507    }
508    break;
509  }
510  case CMD_ID_GETWAVESFROM: /* GetWavesFrom */
511  {
512        char *str;
513        int ilogo = pXfrm->xfrmParam->nlogo;
514        if(ilogo+1 >= MAX_LOGO) 
515    {
516       logit( "e","%s: Too many GetWavesFrom commands; "
517              "max=%d; exiting!\n", pXfrm->mod_name, MAX_LOGO );
518       return -2;
519    }
520    if((str = k_str()) != NULL) 
521      strcpy(pXfrm->xfrmParam->readInstName[ilogo], str);
522
523    if((str = k_str()) != NULL) 
524      strcpy(pXfrm->xfrmParam->readModName[ilogo], str);
525
526    if((str = k_str()) != NULL)
527    {
528      if( strcmp(str,"TYPE_TRACEBUF" ) != 0 &&
529          strcmp(str,"TYPE_TRACEBUF2") != 0    ) 
530      {
531         logit( "e","%s: GetWavesFrom: invalid msg type: %s "
532                    "(must be TYPE_TRACEBUF or TYPE_TRACEBUF2); "
533                    "exiting!\n", pXfrm->mod_name, str );
534         return -2;
535      }
536      strcpy(pXfrm->xfrmParam->readTypeName[ilogo], str);
537    }
538    pXfrm->xfrmParam->nlogo++;
539    break;
540  }
541  default:
542    return ProcessXfrmCommand( cmd_id, com );
543  }
544  return cmd_id;
545}
546
547/*      Function: BaseReadXfrmConfig                                            */
548static int BaseReadXfrmConfig (char *configfile, XFRMWORLD* pXfrm )
549{
550  char                  *init=NULL;
551  /* init flags, one byte for each required command */
552  int                   nmiss;
553  /* number of required commands that were missed   */
554  char                  *com;
555  char                  *str;
556  int                   nfiles;
557  int                   success;
558  int                   i;
559  /* Assume failure (for goto CONFIG_FAILURE) */
560  int                   ret=EW_FAILURE;
561
562  BaseCmd = calloc( sizeof(char*), CmdCount );
563  ParamTarget = calloc( sizeof(void*), CmdCount );
564  init = malloc( CmdCount );
565  if ( BaseCmd==NULL || ParamTarget==NULL || init==NULL) {
566    logit ("e",
567             "%s: Allocation failure reading config file %s; exiting!\n",
568             pXfrm->mod_name, configfile);
569    goto CONFIG_FAILURE;
570  }
571  BaseCmd[0] = "RSMyModId";
572  ParamTarget[0] = &(pXfrm->xfrmParam->myModName);
573  BaseCmd[1] = "RSInRing";
574  ParamTarget[1] = &(pXfrm->xfrmParam->ringIn);
575  BaseCmd[2] = "RSOutRing";
576  ParamTarget[2] = &(pXfrm->xfrmParam->ringOut);
577  BaseCmd[3] = "RIHeartBeatInterval";
578  ParamTarget[3] = &(pXfrm->xfrmParam->heartbeatInt);
579  BaseCmd[4] = "RSLogFile";
580  ParamTarget[4] = &(pXfrm->xfrmParam->logSwitch);
581  BaseCmd[5] = "RVMaxGap";
582  ParamTarget[5] = &(pXfrm->xfrmParam->maxGap);
583  BaseCmd[6] = "OFTestMode";
584  ParamTarget[6] = &(pXfrm->xfrmParam->testMode);
585  BaseCmd[7] = "OFDebug";
586  ParamTarget[7] = &(pXfrm->xfrmParam->debug);
587  BaseCmd[CMD_ID_GETSCNL] = "RXGetSCNL";
588  BaseCmd[9] = "OFProcessRejected";
589  ParamTarget[9] = &(pXfrm->xfrmParam->processRejected);
590  BaseCmd[10] = "OFGetAllSCNLs";
591  ParamTarget[10] = &(pXfrm->xfrmParam->getAllSCNLs);
592  BaseCmd[CMD_ID_GETWAVESFROM] = "RXGetWavesFrom";
593
594  for ( i=NUM_BASE_CMDS; i<CmdCount; i++ ) {
595    BaseCmd[i] = ModCmds[i-NUM_BASE_CMDS];
596    ParamTarget[i] = ModParamTargets[i-NUM_BASE_CMDS];
597  }
598
599  /* Set to zero one init flag for each required command */
600  for (i = 0; i < CmdCount; i++)
601    init[i] = 0;
602
603  /* Open the main configuration file
604   **********************************/
605  nfiles = k_open (configfile); 
606  if (nfiles == 0) {
607    logit ("e",
608             "%s: Error opening command file <%s>; exiting!\n",
609             pXfrm->mod_name, configfile);
610    goto CONFIG_FAILURE;
611  }
612
613  /* Process all command files
614   ***************************/
615  while (nfiles > 0)   /* While there are command files open */
616  {
617    while (k_rd ())        /* Read next line from active file  */
618    { 
619      com = k_str ();         /* Get the first token from line */
620
621    /* Ignore blank lines & comments
622     *******************************/
623      if (!com) continue;
624      if (com[0] == '#') continue;
625
626      /* Open a nested configuration file */
627      if (com[0] == '@') 
628      {
629        success = nfiles + 1;
630        nfiles  = k_open (&com[1]);
631        if (nfiles != success) {
632          logit ("e",
633                   "%s: Error opening command file <%s>; exiting!\n",
634                   pXfrm->mod_name, &com[1]);
635          goto CONFIG_FAILURE;
636        }
637        continue;
638      }
639
640      if ( strcmp( com, "GetSCN" ) == 0 ) /* Set com to text of GetSCNL */
641        com = BaseCmd[CMD_ID_GETSCNL]+2;
642     
643      ret = -1;
644      for ( i=0; i<CmdCount; i++ )
645      {
646        if ( strcmp( com, BaseCmd[i]+2 ) == 0 ) /*(k_its (BaseCmd[i]+2)) */
647        {
648          char type = BaseCmd[i][1];
649          /* char kind = BaseCmd[i][0]; */
650          switch ( type ) {
651          case 'S': /* String */
652            if ((str = k_str ()) != NULL )
653            {
654              strcpy( ParamTarget[i], str );
655/*          } else {    */
656/*            kind = 0; */
657            }
658            break;
659          case 'I': /* Integer */
660            *((int*)(ParamTarget[i])) = k_long();
661            break;
662          case 'V': /* Value ? */
663            *((double*)(ParamTarget[i])) = k_val();
664            break;
665          case 'F': /* Flag */
666            *((int*)(ParamTarget[i])) = 1;
667          default: /* Unknown; user-defined? */
668            ret = BaseProcessXfrmCommand( pXfrm, i, com );
669            if ( ret >= 0 )
670              i = ret;
671            break;
672          }
673          break;
674        }
675      }
676      if ( i >= CmdCount )
677      {
678        ret = BaseProcessXfrmCommand( pXfrm, -1, com );
679      } else {
680        ret = i;
681      }
682      if ( ret >= 0 )
683      { 
684        if ( pXfrm->xfrmParam->debug )
685          logit("","%s: command '%s' accepted\n",pXfrm->mod_name, com);
686        init[ret] = 1;
687      } else if ( ret == -1 ) {
688        logit ("e", "%s: <%s> Unknown command in <%s>.\n", 
689               pXfrm->mod_name, com, configfile);
690        continue;
691      } else if ( ret < -1 ) {
692          logit ("e",
693                   "%s: Bad <%s> command in <%s>; exiting!\n",
694                   pXfrm->mod_name, com, configfile);
695          goto CONFIG_FAILURE;
696      }
697
698   /* See if there were any errors processing the command */
699      if ( k_err() ) {
700        logit ("e",
701                 "%s: Bad <%s> command in <%s>; exiting!\n",
702                 pXfrm->mod_name, com, configfile);
703        goto CONFIG_FAILURE;
704      }
705
706    } /** end while k_rd() **/
707
708    nfiles = k_close ();
709
710  } /** end while nfiles **/
711
712/* After all files are closed, check init flags for missed commands */
713  nmiss = 0;
714  for ( i = 0; i < CmdCount; i++ )
715    if ( BaseCmd[i][0]=='R' && !init[i] )
716      nmiss++;
717
718  if ( nmiss > 0 ) {
719    logit ("e", "%s: ERROR, no ", pXfrm->mod_name);
720    for(i = 0; i < CmdCount; i++) if (!init[i]) logit("e", "<%s> ", BaseCmd[i]+2 );
721
722    logit ("e", "command%s in <%s>; exiting!\n", (nmiss==1 ? "" : "s"), configfile);
723    goto CONFIG_FAILURE;
724  }
725
726  ret = ReadXfrmConfig( init );
727
728/* ret is initialized to EW_FAILURE */
729CONFIG_FAILURE:
730  free( BaseCmd );
731  free( ParamTarget );
732  free( init );
733  return ret;
734}
735
736static int BaseReadXfrmEWH( XFRMWORLD *pXfrm )
737{
738  int i;
739
740  if ( GetLocalInst( &(pXfrm->xfrmEWH->myInstId)) != 0 )
741  {
742    logit("e", "%s: Error getting myInstId for %d.\n", 
743          pXfrm->mod_name, (int) pXfrm->xfrmEWH->myInstId);
744    return EW_FAILURE;
745  }
746 
747  if ( GetModId( pXfrm->xfrmParam->myModName, &(pXfrm->xfrmEWH->myModId)) != 0 )
748  {
749    logit("e", "%s: Error getting myModId for %s.\n",
750           pXfrm->mod_name, pXfrm->xfrmParam->myModName );
751    return EW_FAILURE;
752  }
753
754  for (i=0; i<pXfrm->xfrmParam->nlogo; i++ ) {
755    if ( GetInst( pXfrm->xfrmParam->readInstName[i], &(pXfrm->xfrmEWH->readInstId[i])) != 0)
756    {
757      logit("e", "%s: Error getting readInstId for %s.\n",
758             pXfrm->mod_name, pXfrm->xfrmParam->readInstName[i] );
759      return EW_FAILURE;
760    }
761 
762    if ( GetModId( pXfrm->xfrmParam->readModName[i], &(pXfrm->xfrmEWH->readModId[i])) != 0 )
763    {
764      logit("e", "%s: Error getting readModName for %s.\n",
765             pXfrm->mod_name, pXfrm->xfrmParam->readModName[i] );
766      return EW_FAILURE;
767    }
768
769    if ( GetType( pXfrm->xfrmParam->readTypeName[i], &(pXfrm->xfrmEWH->readMsgType[i])) != 0 )
770    {
771      logit("e", "%s: Error getting readMsgType for %s.\n",
772             pXfrm->mod_name, pXfrm->xfrmParam->readTypeName[i] );
773      return EW_FAILURE;
774    }
775  }
776
777  /* Look up keys to shared memory regions */
778  if ((pXfrm->xfrmEWH->ringInKey = GetKey (pXfrm->xfrmParam->ringIn)) == -1) 
779  {
780    logit("e", "%s:  Invalid input ring name <%s>; exiting!\n", 
781           pXfrm->mod_name, pXfrm->xfrmParam->ringIn);
782    return EW_FAILURE;
783  }
784
785  if ((pXfrm->xfrmEWH->ringOutKey = GetKey (pXfrm->xfrmParam->ringOut) ) == -1) 
786  {
787    logit("e", "%s:  Invalid output ring name <%s>; exiting!\n", 
788          pXfrm->mod_name, pXfrm->xfrmParam->ringOut);
789    return EW_FAILURE;
790  }
791
792  /* Look up message types of interest */
793  if (GetType ("TYPE_HEARTBEAT", &(pXfrm->xfrmEWH->typeHeartbeat)) != 0) 
794  {
795    logit("e", "%s: Invalid message type <TYPE_HEARTBEAT>; exiting!\n", 
796            pXfrm->mod_name);
797    return EW_FAILURE;
798  }
799
800  if (GetType ("TYPE_ERROR", &(pXfrm->xfrmEWH->typeError)) != 0) 
801  {
802    logit("e", "%s: Invalid message type <TYPE_ERROR>; exiting!\n", 
803            pXfrm->mod_name);
804    return EW_FAILURE;
805  } 
806
807  if (GetType ("TYPE_TRACEBUF", &(pXfrm->xfrmEWH->typeTrace)) != 0) 
808  {
809    logit("e", "decimate: Invalid message type <TYPE_TRACEBUF>; exiting!\n");
810    return EW_FAILURE;
811  }
812
813  if (GetType ("TYPE_TRACEBUF2", &(pXfrm->xfrmEWH->typeTrace2)) != 0) 
814  {
815    logit("e", "decimate: Invalid message type <TYPE_TRACEBUF2>; exiting!\n");
816    return EW_FAILURE;
817  }
818
819  return ReadXfrmEWH();
820
821} 
822
823static int BaseConfigureXfrm(XFRMWORLD *pXfrm, char **argv)
824{
825  /* Set initial values of WORLD structure */
826  InitializeParameters( pXfrm );
827
828  /* Initialize name of log-file & open it 
829   ***************************************/
830  logit_init (argv[1], 0, MAXMESSAGELEN, 1);
831
832  /* Read config file and configure the decimator */
833  if (BaseReadXfrmConfig(argv[1], pXfrm) != EW_SUCCESS)
834  {
835    logit("e", "%s: failed reading config file <%s>\n", pXfrm->mod_name, argv[1]);
836    return EW_FAILURE;
837  }
838  logit ("" , "Read command file <%s>\n", argv[1]);
839
840  /* Look up important info from earthworm.h tables
841   ************************************************/
842  if (BaseReadXfrmEWH(pXfrm) != EW_SUCCESS)
843  {
844    logit("e", "%s: Call to ReadEWH failed \n", pXfrm->mod_name );
845    return EW_FAILURE;
846  }
847
848  /* Reinitialize logit to the desired logging level
849   ***********************************************/
850  logit_init (argv[1], 0, MAXMESSAGELEN, pXfrm->xfrmParam->logSwitch);
851 
852  /* Get our process ID
853   **********************/
854  if ((pXfrm->MyPid = getpid ()) == -1)
855  {
856    logit ("e", "%s: Call to getpid failed. Exiting.\n", pXfrm->mod_name);
857    return (EW_FAILURE);
858  }
859
860  return ConfigureXfrm();
861}
862
863int matchXfrmSCNL( TracePacket* pPkt, unsigned char msgtype, XFRMWORLD* pXfrm )
864{
865  int   i;
866
867/* Look for match in TYPE_TRACEBUF2 packet
868 *****************************************/
869  if( msgtype == pXfrm->xfrmEWH->typeTrace2 )
870  {
871     /* This should NEVER happen;
872        scblRecSize had to have been set to allocate SCNL records */
873     if ( pXfrm->xfrmParam->getAllSCNLs == 0 && pXfrm->scnlRecSize < sizeof(XFRMSCNL) )
874     {
875       logit ("e", "%s: invalid size of SCNL info record (%d); exiting!\n", pXfrm->mod_name, pXfrm->scnlRecSize );
876       return -3;
877     }
878
879     for(i = 0; i < pXfrm->nSCNL; i++ )
880     {
881       XFRMSCNL *testSCNL = (XFRMSCNL *)(pXfrm->scnls + i*(pXfrm->scnlRecSize));
882       /* try to match explicitly */
883       if ((strcmp(pPkt->trh2.sta,  testSCNL->inSta)  == 0) &&
884           (strcmp(pPkt->trh2.chan, testSCNL->inChan) == 0) &&
885           (strcmp(pPkt->trh2.net,  testSCNL->inNet)  == 0) &&
886           (strcmp(pPkt->trh2.loc,  testSCNL->inLoc)  == 0)    )
887         return( i );
888     }
889     
890     if ( pXfrm->xfrmParam->getAllSCNLs ) {
891        /* Try to add new SCNL, then "find" it if successful */
892        SCNL newSCNL;
893        memset( &newSCNL, 0, sizeof(newSCNL) );
894
895        strncpy( newSCNL.sta,  pPkt->trh2.sta,  TRACE2_STA_LEN );
896        newSCNL.sta[TRACE2_STA_LEN - 1]   = '\0';
897        strncpy( newSCNL.chan, pPkt->trh2.chan, TRACE2_CHAN_LEN );
898        newSCNL.chan[TRACE2_CHAN_LEN - 1] = '\0';
899        strncpy( newSCNL.net,  pPkt->trh2.net,  TRACE2_NET_LEN );
900        newSCNL.net[TRACE2_NET_LEN - 1]   = '\0';
901        strncpy( newSCNL.loc,  pPkt->trh2.loc,  TRACE2_LOC_LEN );
902        newSCNL.loc[TRACE2_LOC_LEN - 1]   = '\0';
903        if ( AddSCNL( pXfrm, &newSCNL, &newSCNL, &i) == 0 )
904                return matchXfrmSCNL( pPkt, msgtype, pXfrm );
905     }
906     return -1;  /* no match in SCNL for TYPE_TRACEBUF2 */
907  }
908
909/* Look for match in TYPE_TRACEBUF packet
910 ****************************************/
911  else if( msgtype == pXfrm->xfrmEWH->typeTrace )
912  {
913     /* This should NEVER happen;
914        scblRecSize had to have been set to allocate SCNL records */
915     if ( pXfrm->xfrmParam->getAllSCNLs == 0 && pXfrm->scnlRecSize < sizeof(XFRMSCNL) )
916     {
917       logit ("e", "%s: invalid size of SCNL info record (%d); exiting!\n", pXfrm->mod_name, pXfrm->scnlRecSize );
918       return -3;
919     }
920
921     for(i = 0; i < pXfrm->nSCNL; i++ )
922     {
923       XFRMSCNL *testSCNL = (XFRMSCNL *)(pXfrm->scnls + i*(pXfrm->scnlRecSize));
924       /* try to match explicitly */
925       if ((strcmp(pPkt->trh.sta,   testSCNL->inSta)  == 0) &&
926           (strcmp(pPkt->trh.chan,  testSCNL->inChan) == 0) &&
927           (strcmp(pPkt->trh.net,   testSCNL->inNet)  == 0) &&
928           (strcmp(LOC_NULL_STRING, testSCNL->inLoc)  == 0)    )
929         return( i );
930     }
931     if ( pXfrm->xfrmParam->getAllSCNLs ) {
932        /* Try to add new SCNL, then "find" it if successful */
933        SCNL newSCNL;
934        memset( &newSCNL, 0, sizeof(newSCNL) );
935       
936        strncpy( newSCNL.sta,  pPkt->trh2.sta,  TRACE2_STA_LEN );
937        newSCNL.sta[TRACE2_STA_LEN - 1]   = '\0';
938        strncpy( newSCNL.chan, pPkt->trh2.chan, TRACE2_CHAN_LEN );
939        newSCNL.chan[TRACE2_CHAN_LEN - 1] = '\0';
940        strncpy( newSCNL.net,  pPkt->trh2.net,  TRACE2_NET_LEN );
941        newSCNL.net[TRACE2_NET_LEN - 1]   = '\0';
942        strncpy( newSCNL.loc,  LOC_NULL_STRING, TRACE2_LOC_LEN );
943        newSCNL.loc[TRACE2_LOC_LEN - 1]   = '\0';
944        if ( AddSCNL( pXfrm, &newSCNL, &newSCNL, &i) == 0 )
945                return matchXfrmSCNL( pPkt, msgtype, pXfrm );
946     }
947     return( -1 );  /* no match in SCN for TYPE_TRACEBUF */
948  }
949
950/* Unknown Message Type
951 **********************/
952  if ( pXfrm->xfrmParam->debug ) 
953    logit("", "%s: Unknown message type %d\n", pXfrm->mod_name, msgtype);
954 
955  return( -2 );
956}
957
958
959/*      Function: SubtusReport                                          */
960void StatusReport( XFRMWORLD *pXfrm, unsigned char type, short code, 
961                   char* message )
962{
963  char          outMsg[MAXMESSAGELEN];  /* The outgoing message.        */
964  time_t        msgTime;        /* Time of the message.                 */
965
966  /*  Get the time of the message                                     */
967  time( &msgTime );
968
969  /*  Build & process the message based on the type                   */
970    if ( pXfrm->xfrmEWH->typeHeartbeat == type )
971    {
972      sprintf( outMsg, "%ld %ld\n", (long) msgTime, (long) pXfrm->MyPid );
973     
974      /*Write the message to the output region                          */
975      if ( tport_putmsg( &(pXfrm->regionOut), &(pXfrm->hrtLogo), 
976                         (long) strlen( outMsg ), outMsg ) != PUT_OK )
977      {
978        /*     Log an error message                                    */
979        logit( "t", "%s: Failed to send a heartbeat message (%d).\n",
980               pXfrm->mod_name, code );
981      }
982    }
983    else
984    {
985      if ( message ) {
986        sprintf( outMsg, "%ld %hd %s\n", (long) msgTime, code, message );
987        logit("t","Error:%d (%s)\n", code, message );
988      }
989      else {
990        sprintf( outMsg, "%ld %hd\n", (long) msgTime, code );
991        logit("t","Error:%d (No description)\n", code );
992      }
993
994      /*Write the message to the output region                         */
995      if ( tport_putmsg( &(pXfrm->regionOut), &(pXfrm->errLogo), 
996                         (long) strlen( outMsg ), outMsg ) != PUT_OK )
997      {
998        /*     Log an error message                                    */
999        logit( "t", "%s: Failed to send an error message (%d).\n",
1000               pXfrm->mod_name, code );
1001      }
1002     
1003    }
1004}
1005
1006/*      Function: BaseXfrmThread                                            */
1007thr_ret BaseXfrmThread (void* args) {
1008  XFRMWORLD     *World = ( XFRMWORLD *) args;
1009  char          *inMsgBuf;
1010  int            jSta;
1011  XFRMSCNL      *pSCNL;
1012  DATA           peekData;
1013  int            ret;
1014  MSG_LOGO       reclogo;       /* logo of retrieved message     */
1015  TracePacket   *inBuf;       /* pointer to raw trace message  */
1016  TracePacket   *outBuf;        /* pointer to debiased trace message  */
1017
1018  if (!World->useInBufPerSCNL) {
1019          inMsgBuf = AllocateMsgBuf();
1020  } else {
1021          inMsgBuf = NULL;
1022  }
1023
1024  /* set up input buffer for each SCNL */
1025  for(jSta = 0; jSta < World->nSCNL; jSta++ )
1026  {
1027          pSCNL = GetXFRMSCNLForSta(World, jSta);
1028          if (inMsgBuf != NULL) {
1029                  pSCNL->inMsgBuf = inMsgBuf;
1030          } else {
1031                  pSCNL->inMsgBuf = AllocateMsgBuf();
1032                  if (pSCNL->inMsgBuf == NULL)
1033                  {
1034                    logit ("e", "%s: Cannot allocate input buffer\n", World->mod_name);
1035                    World->XfrmStatus = -1;
1036                    KillSelfThread();
1037                  }
1038          }
1039  }
1040
1041  outBuf    = (TracePacket *) AllocateMsgBuf();
1042  if (outBuf == NULL)
1043  {
1044    logit ("e", "%s: Cannot allocate output buffer\n", World->mod_name);
1045    World->XfrmStatus = -1;
1046    KillSelfThread();
1047  }
1048
1049  /* Tell the main thread we're feeling ok */
1050  World->XfrmStatus = 0;
1051
1052  while (1)
1053  {
1054    /* Get top message from the MsgQueue */
1055    RequestMutex ();
1056    ret = -1;
1057    peekData = peekNextElement(&(World->MsgQueue));
1058    if (peekData != NULL)
1059    {
1060        /* Extract the SCNL number; recall, it was pasted as an int on the front
1061         * of the message by the main thread */
1062        jSta = *((int*) peekData);
1063        inMsgBuf = GetInMsgBufForSta(World, jSta);
1064        inBuf  = GetInBufForMsgBuf(inMsgBuf);
1065
1066        ret = dequeue (&(World->MsgQueue), inMsgBuf, &MsgBufLen, &reclogo);
1067    }
1068    ReleaseMutex_ew ();
1069   
1070    if (ret < 0)
1071    {                                 /* empty queue */
1072      sleep_ew (500);
1073      continue;
1074    }
1075
1076    if ( World->completionFcn != NULL && MsgBufLen==0 )
1077    {
1078      logit("","%s: thread knows to terminate\n", World->mod_name);
1079      World->completionFcn( 1 );
1080      return THR_NULL_RET;
1081    }
1082   
1083    if (FilterXfrm( World, inBuf, jSta, reclogo.type, outBuf ) ==
1084        EW_FAILURE)
1085    {
1086      logit("t", "%s: error from FilterXfrm; exiting\n", World->mod_name);
1087      World->XfrmStatus = -1;
1088      KillSelfThread();
1089      return THR_NULL_RET; /* should never reach here */
1090    }
1091
1092  } /* while (1) - message dequeuing process */
1093  return THR_NULL_RET; /* should never reach here unless there is a break in the continuous loop */
1094
1095}
1096
1097int BaseXfrmResetSCNL( XFRMSCNL *pSCNL, TracePacket *inBuf, XFRMWORLD* pWorld ) {
1098
1099  pSCNL->inEndtime = 0.0;    /* used to identify gaps */
1100 
1101  /* Save params that mustn't change over span of packets */
1102  strcpy( pSCNL->datatype, inBuf->trh2.datatype );
1103  pSCNL->samprate = inBuf->trh2.samprate;
1104 
1105  return XfrmResetSCNL( pSCNL, inBuf, pWorld );
1106
1107}
1108
1109int BaseFilterXfrm (XFRMWORLD* pDb, TracePacket *inBuf, int jSta, 
1110                    unsigned char msgtype, TracePacket *outBuf)
1111{
1112  XFRMSCNL *pSCNL;
1113  int ret;
1114 
1115  pSCNL = (XFRMSCNL*)(pDb->scnls + (jSta * pDb->scnlRecSize));
1116 
1117  /* Create SCNL info field if need be */
1118  if ( pSCNL->status < 0 ) 
1119  {
1120    ret = XfrmInitSCNL( pDb, pSCNL ); 
1121    if ( ret != EW_SUCCESS )
1122    {
1123      return ret;
1124    }
1125    pSCNL->status = 0;
1126    if ( pDb->xfrmParam->debug )
1127      logit( "", "%s: sta[%d] status=0 (initialized)\n", pDb->mod_name, jSta );
1128  }
1129  else if ( pSCNL->status )
1130  {
1131    return EW_WARNING;
1132  }
1133
1134  /* If it's tracebuf, make it look like a tracebuf2 message */
1135  if( msgtype == pDb->xfrmEWH->typeTrace ) TrHeadConv( &(inBuf->trh) );
1136 
1137  if (pDb->xfrmParam->debug)
1138    logit("t", "%s: enter filter with <%s.%s.%s.%s> start: %lf\n",
1139          pDb->mod_name,
1140          inBuf->trh2.sta, inBuf->trh2.chan, inBuf->trh2.net, inBuf->trh2.loc,
1141          inBuf->trh2.starttime );
1142 
1143  /* Check for useable data types: we only handle shorts and longs for now */
1144  if ( (inBuf->trh2.datatype[0] != 's' && inBuf->trh2.datatype[0] != 'i') ||
1145       (inBuf->trh2.datatype[1] != '2' && inBuf->trh2.datatype[1] != '4') )
1146  {
1147    logit("t","%s: unusable datatype <%s> from <%s.%s.%s.%s>; skipping\n",
1148          pDb->mod_name,
1149          inBuf->trh2.datatype,
1150          inBuf->trh2.sta, inBuf->trh2.chan, inBuf->trh2.net, inBuf->trh2.loc);
1151    return EW_WARNING;
1152  }
1153
1154  /* If we have previous data, check for data gap */
1155  if ( pSCNL->inEndtime != 0.0 )
1156  {
1157    if ( (inBuf->trh2.starttime - pSCNL->inEndtime) * inBuf->trh2.samprate > 
1158         pDb->xfrmParam->maxGap )
1159    {
1160      logit("t","%s: gap in data for <%s.%s.%s.%s>:\n\tlast end: %lf this start: %lf; resetting\n",
1161            pDb->mod_name,
1162            pSCNL->inSta, pSCNL->inChan, pSCNL->inNet, pSCNL->inLoc, 
1163            pSCNL->inEndtime, inBuf->trh2.starttime);
1164      if ( BaseXfrmResetSCNL( (XFRMSCNL*)pSCNL, inBuf, (XFRMWORLD*)pDb ) != EW_SUCCESS )
1165      {
1166        pSCNL->status = 1;
1167        return EW_FAILURE;
1168      }
1169    }
1170    else if (inBuf->trh2.starttime < pSCNL->inEndtime)
1171    {
1172      logit("t","%s: overlapping times for <%s.%s.%s.%s>:\n"
1173            "\tlast end: %lf this start: %lf; resetting\n",
1174            pDb->mod_name,
1175            pSCNL->inSta, pSCNL->inChan, pSCNL->inNet, pSCNL->inLoc, 
1176            pSCNL->inEndtime, inBuf->trh2.starttime);
1177      if ( BaseXfrmResetSCNL( (XFRMSCNL*)pSCNL, inBuf, (XFRMWORLD*)pDb ) != EW_SUCCESS )
1178      {
1179        pSCNL->status = 1;
1180        return EW_FAILURE;
1181      }
1182    }
1183    else if ( inBuf->trh2.samprate != pSCNL->samprate ) 
1184    {
1185      logit("et","%s: sample rate changed in <%s.%s.%s.%s> at %lf from %lf to %lf; exiting.\n",
1186            pDb->mod_name,
1187            pSCNL->inSta, pSCNL->inChan, pSCNL->inNet, pSCNL->inLoc, inBuf->trh2.starttime,
1188            pSCNL->samprate, inBuf->trh2.samprate);
1189      pSCNL->status = 1;
1190      return EW_FAILURE;
1191    }
1192    else if ( strcmp( inBuf->trh2.datatype, pSCNL->datatype ) != 0 ) 
1193    {
1194      logit("et","%s: data type changed in <%s.%s.%s.%s> at %lf from %s to %s; exiting.\n",
1195            pDb->mod_name,
1196            pSCNL->inSta, pSCNL->inChan, pSCNL->inNet, pSCNL->inLoc, inBuf->trh2.starttime,
1197            pSCNL->datatype, inBuf->trh2.datatype);
1198      pSCNL->status = 1;
1199      return EW_FAILURE;
1200    }
1201
1202  }
1203  else
1204  {
1205    BaseXfrmResetSCNL( (XFRMSCNL*)pSCNL, inBuf, (XFRMWORLD*)pDb );
1206  }
1207 
1208  pSCNL->inEndtime = inBuf->trh2.endtime;
1209   
1210  return EW_SUCCESS;
1211}
1212
1213int XfrmWritePacket( XFRMWORLD* pDb, XFRMSCNL* pSCNL, unsigned char msgtype, 
1214        TracePacket* outBuf, int outBufLen )
1215{
1216  pDb->trcLogo.type = msgtype;
1217  pDb->trcLogo.mod = pDb->xfrmEWH->myModId;
1218  pDb->trcLogo.instid = pDb->xfrmEWH->myInstId;
1219  outBuf->trh2.version[0] = TRACE2_VERSION0;
1220  outBuf->trh2.version[1] = TRACE2_VERSION1;
1221  strcpy( outBuf->trh2.sta, pSCNL->outSta );
1222  strcpy( outBuf->trh2.net, pSCNL->outNet );
1223  strcpy( outBuf->trh2.chan, pSCNL->outChan );
1224  strcpy( outBuf->trh2.loc, pSCNL->outLoc );
1225  if (pSCNL->conversionFactor != TRACE2_NO_CONVERSION_FACTOR) {
1226          outBuf->trh2x.version[1] = TRACE2_VERSION11;
1227          outBuf->trh2x.x.v21.conversion_factor = pSCNL->conversionFactor;
1228  }
1229  if (tport_putmsg (&(pDb->regionOut), &(pDb->trcLogo), outBufLen, 
1230                    outBuf->msg) != PUT_OK)
1231  {
1232    logit ("t","%s: Error sending type:%d message.\n", pDb->mod_name, msgtype );
1233    return EW_FAILURE;
1234  }
1235   
1236  return EW_SUCCESS;
1237}
1238
1239int main (int argc, char **argv) 
1240{
1241  XFRMWORLD *World;                /* Our main data structure              */
1242  time_t    timeNow;               /* current time                         */ 
1243  time_t    timeLastBeat;          /* time last heartbeat was sent         */
1244  long      sizeMsg;               /* size of retrieved message            */
1245  SHM_INFO  regionIn;              /* Input shared memory region info.     */
1246  MSG_LOGO  logoWave[MAX_LOGO];    /* Logo(s) of requested waveforms.      */
1247  int       ilogo;                 /* working counter                      */
1248  MSG_LOGO  logoMsg;               /* logo of retrieved message            */
1249  unsigned  tidXfrmor;             /* Transformer thread id                */
1250  char     *inBuf;                 /* Pointer to the input message buffer. */
1251  int       inBufLen;              /* Size of input message buffer         */
1252  TracePacket  *TracePkt;
1253  int       ret;
1254  char      msgText[MAXMESSAGELEN];/* string for log/error messages        */
1255
1256  /* Allocate a WORLD
1257   *****************************************************/
1258  CmdCount = NUM_BASE_CMDS;
1259  SetupXfrm( &World, &ModCmds, &CmdCount, &ModParamTargets );
1260  World->scnls = NULL;
1261  World->nSCNL = 0;
1262
1263  /* Check command line arguments
1264   ******************************/
1265  if (argc != 2)
1266  {
1267    fprintf (stderr, "Usage: %s <configfile>\n", World->mod_name);
1268    if ( World->version_magic == VERSION_MAGIC )
1269                fprintf( stderr, "Version %s\n", World->version );
1270    FreeXfrmWorld( World );
1271    exit (EW_FAILURE);
1272  }
1273 
1274  /* Read config file and configure the decimator */
1275  if (BaseConfigureXfrm(World, argv) != EW_SUCCESS)
1276  {
1277    logit("e", "%s: configure() failed \n", World->mod_name);
1278    FreeXfrmWorld( World );
1279    exit (EW_FAILURE);
1280  }
1281
1282  if ( World->xfrmParam->testMode )
1283  {
1284    logit("e", "%s terminating normally for test mode\n", World->mod_name);
1285    FreeXfrmWorld( World );
1286    exit (EW_SUCCESS);
1287  }
1288 
1289  /* We will put the SCNL index in front of the trace message, so we  *
1290   * don't have to look up the SCNL again at the other end of the queue. */
1291  inBufLen = MAX_TRACEBUF_SIZ + sizeof( double );
1292  if ( ! ( inBuf = (char *) malloc( (size_t) inBufLen ) ) )
1293  {
1294    logit( "e", "%s: Memory allocation failed - initial message buffer!\n",
1295        argv[0] );
1296    FreeXfrmWorld( World );
1297    exit( EW_FAILURE );
1298  }
1299  TracePkt = (TracePacket *) (inBuf + sizeof(TP_PREFIX));
1300 
1301  /* Attach to Input shared memory ring
1302   ************************************/
1303
1304  tport_attach (&regionIn, World->xfrmEWH->ringInKey);
1305  if (World->xfrmParam->debug) {
1306    logit ("", "%s: Attached to public memory region %s: %ld\n", World->mod_name,
1307           World->xfrmParam->ringIn, World->xfrmEWH->ringInKey);
1308  }
1309
1310  /* Attach to Output shared memory ring
1311   *************************************/
1312  if (World->xfrmEWH->ringOutKey == World->xfrmEWH->ringInKey) {
1313    World->regionOut = regionIn;
1314  } else {
1315    tport_attach (&(World->regionOut), World->xfrmEWH->ringOutKey);
1316    if (World->xfrmParam->debug)
1317      logit ("", "%s: Attached to public memory region %s: %ld\n", World->mod_name,
1318             World->xfrmParam->ringOut, World->xfrmEWH->ringOutKey);
1319  }
1320
1321 /* Specify logos of incoming waveforms
1322  *************************************/
1323  for( ilogo=0; ilogo<World->xfrmParam->nlogo; ilogo++ ) {
1324    logoWave[ilogo].instid = World->xfrmEWH->readInstId[ilogo];
1325    logoWave[ilogo].mod    = World->xfrmEWH->readModId[ilogo];
1326    logoWave[ilogo].type   = World->xfrmEWH->readMsgType[ilogo];
1327    if ( World->xfrmParam->debug ) 
1328      logit("", "%s: Logo[%d] = %d,%d,%d\n", World->mod_name, ilogo,
1329          logoWave[ilogo].instid, logoWave[ilogo].mod, logoWave[ilogo].type); 
1330  }
1331
1332  /* Specify logos of outgoing messages
1333   ************************************/
1334  World->hrtLogo.instid = World->xfrmEWH->myInstId;
1335  World->hrtLogo.mod    = World->xfrmEWH->myModId;
1336  World->hrtLogo.type   = World->xfrmEWH->typeHeartbeat;
1337  if ( World->xfrmParam->debug ) 
1338    logit("", "%s: hrtLogo = %d,%d,%d\n", World->mod_name,
1339        World->hrtLogo.instid, World->hrtLogo.mod, World->hrtLogo.type); 
1340
1341  World->errLogo.instid = World->xfrmEWH->myInstId;
1342  World->errLogo.mod    = World->xfrmEWH->myModId;
1343  World->errLogo.type   = World->xfrmEWH->typeError;
1344  if ( World->xfrmParam->debug ) 
1345    logit("", "%s: errLogo = %d,%d,%d\n", World->mod_name,
1346        World->errLogo.instid, World->errLogo.mod, World->errLogo.type); 
1347 
1348  SpecifyXfrmLogos();
1349
1350  /* Force a heartbeat to be issued in first pass thru main loop  */
1351  timeLastBeat = time (&timeNow) - World->xfrmParam->heartbeatInt - 1;
1352
1353  /* Flush the incoming transport ring */
1354  while (tport_getmsg (&regionIn, logoWave, (short)World->xfrmParam->nlogo, &logoMsg,
1355                       &sizeMsg, inBuf, inBufLen) != GET_NONE);
1356
1357  /* Create MsgQueue mutex */
1358  CreateMutex_ew();
1359
1360  /* Allocate the message Queue */
1361  initqueue (&(World->MsgQueue), QUEUE_SIZE, inBufLen);
1362 
1363
1364  /* Start decimator thread which will read messages from   *
1365   * the Queue, decimate them and write them to the OutRing */
1366  if (StartThreadWithArg (XfrmThread, (void *) World, (unsigned) 
1367                          THREAD_STACK, &tidXfrmor) == -1)
1368  {
1369    logit( "e", 
1370           "%s: Error starting thread.  Exiting.\n", World->mod_name);
1371    tport_detach (&regionIn);
1372   
1373    if (World->xfrmEWH->ringOutKey != World->xfrmEWH->ringInKey) {
1374       tport_detach (&(World->regionOut)); 
1375    }
1376    free( inBuf );
1377    FreeXfrmWorld();
1378    exit( EW_FAILURE );
1379  }
1380
1381  World->XfrmStatus = 0; /*assume the best*/
1382
1383  if ( World->completionFcn != NULL )
1384  {
1385    ret = World->completionFcn( -1 );
1386    if ( ret != EW_SUCCESS )
1387    {
1388      logit("e", "%s: Problem with completionFcn(-1) ret=%d\n", World->mod_name, ret );
1389    }
1390  }
1391
1392/*--------------------- setup done; start main loop -------------------------*/
1393  if ( World->xfrmParam->debug )
1394    logit("","%s: starting main loop\n", World->mod_name);
1395  while (tport_getflag (&regionIn) != TERMINATE  &&
1396         tport_getflag (&regionIn) != World->MyPid )
1397  {
1398    /* send module's heartbeat */
1399    if (time (&timeNow) - timeLastBeat >= World->xfrmParam->heartbeatInt) 
1400    {
1401      timeLastBeat = timeNow;
1402      StatusReport (World, World->xfrmEWH->typeHeartbeat, 0, ""); 
1403    }
1404
1405    if (World->XfrmStatus < 0)
1406    {
1407      logit ("t", 
1408             "%s: thread died. Exiting\n", World->mod_name);
1409      exit (EW_FAILURE);
1410    }
1411
1412    ret = tport_getmsg (&regionIn, logoWave, (short)World->xfrmParam->nlogo, 
1413                        &logoMsg, &sizeMsg, TracePkt->msg, MAX_TRACEBUF_SIZ);
1414
1415    /* Check return code; report errors */
1416    if (ret != GET_OK)
1417    {
1418      if (ret == GET_TOOBIG)
1419      {
1420        sprintf (msgText, "msg[%ld] i%d m%d t%d too long for target",
1421                 sizeMsg, (int) logoMsg.instid,
1422                 (int) logoMsg.mod, (int)logoMsg.type);
1423        StatusReport (World, World->xfrmEWH->typeError, ERR_TOOBIG, msgText);
1424        continue;
1425      }
1426      else if (ret == GET_MISS)
1427      {
1428        sprintf (msgText, "missed msg(s) i%d m%d t%d in %s",
1429                 (int) logoMsg.instid, (int) logoMsg.mod, 
1430                 (int)logoMsg.type, World->xfrmParam->ringIn);
1431        StatusReport (World, World->xfrmEWH->typeError, ERR_MISSMSG, msgText);
1432      }
1433      else if (ret == GET_NOTRACK)
1434      {
1435        sprintf (msgText, "no tracking for logo i%d m%d t%d in %s",
1436                 (int) logoMsg.instid, (int) logoMsg.mod, 
1437                 (int)logoMsg.type, World->xfrmParam->ringIn);
1438        StatusReport (World, World->xfrmEWH->typeError, ERR_NOTRACK, msgText);
1439      }
1440      else if (ret == GET_NONE)
1441      {
1442        sleep_ew(500);
1443        continue;
1444      }
1445      else if (World->xfrmParam->debug)
1446        logit( "", "%s: Unexpected return from tport_getsg: %d\n", 
1447            World->mod_name, ret );
1448    }
1449
1450    if ((ret = matchXfrmSCNL( TracePkt, logoMsg.type, World )) < -1 )
1451    {
1452      logit ("t", "%s: Call to matchSCNL failed with %d; exiting.\n", 
1453        World->mod_name, ret);
1454      FreeXfrmWorld();
1455      exit (EW_FAILURE);
1456    }
1457    else if ( ret == -1 )
1458    {
1459      /* Not an SCNL we want */
1460      if ( World->xfrmParam->processRejected ) 
1461      {
1462        if ( ProcessXfrmRejected( TracePkt, logoMsg, inBuf ) == EW_FAILURE )
1463        {
1464          logit ("t","%s: Error processing rejected packet; type:%d message.\n", 
1465                World->mod_name, logoMsg.type );
1466          FreeXfrmWorld();
1467          exit (EW_FAILURE);
1468        }
1469      }
1470      continue;
1471    }
1472   
1473    /* stick the SCNL number as an int at the front of the message */
1474    *((int*)inBuf) = ret; 
1475
1476    /* If necessary, swap bytes in the wave message */
1477    if( logoMsg.type == World->xfrmEWH->typeTrace2 )
1478        ret = WaveMsg2MakeLocal( &(TracePkt->trh2) );
1479    else if( logoMsg.type == World->xfrmEWH->typeTrace ) 
1480        ret = WaveMsgMakeLocal( &(TracePkt->trh) );
1481
1482    ret = PreprocessXfrmBuffer( TracePkt, logoMsg, inBuf );
1483
1484    if ( ret < -1 )
1485    {
1486      FreeXfrmWorld();
1487      exit (EW_FAILURE);
1488    }
1489    if ( ret == -1 )
1490    {
1491      continue;
1492    }
1493
1494    /* Queue retrieved msg */
1495    RequestMutex ();
1496    ret = enqueue (&(World->MsgQueue), inBuf, sizeMsg + sizeof(TP_PREFIX), logoMsg); 
1497    ReleaseMutex_ew ();
1498
1499    if (ret != 0)
1500    {
1501      if (ret == -1)
1502      {
1503        sprintf (msgText, 
1504                 "Message too large for queue; Lost message.");
1505        StatusReport (World, World->xfrmEWH->typeError, ERR_QUEUE, msgText);
1506        continue;
1507      }
1508      if (ret == -3) 
1509      {
1510        sprintf (msgText, "Queue full. Old messages lost.");
1511        StatusReport (World, World->xfrmEWH->typeError, ERR_QUEUE, msgText);
1512        continue;
1513      }
1514      logit("", "%s: Problem w/ queue: %d\n", World->mod_name, ret );
1515    } /* problem from enqueue */
1516
1517  } /* wait until TERMINATE is raised  */ 
1518
1519  if ( World->completionFcn != NULL ) /* send empty packet to signal termination */
1520  {
1521    RequestMutex (); 
1522    ret = enqueue (&(World->MsgQueue), inBuf, 0, logoMsg); 
1523    ReleaseMutex_ew ();
1524    ret = World->completionFcn( 0 );
1525    if ( ret != EW_SUCCESS )
1526    {
1527      logit("e", "%s: Problem with completionFcn(0) ret=%d\n", World->mod_name, ret );
1528    }
1529  }
1530
1531  /* Termination has been requested */
1532  tport_detach (&regionIn);
1533  if (World->xfrmEWH->ringOutKey != World->xfrmEWH->ringInKey) {
1534    tport_detach (&(World->regionOut));
1535  }
1536  free( inBuf );
1537  FreeXfrmWorld();
1538  logit ("t", "Termination requested; exiting!\n" );
1539  exit (EW_SUCCESS);
1540
1541}
Note: See TracBrowser for help on using the repository browser.