source: trunk/src/seismic_processing/gmew/gmew.c @ 8019

Revision 8019, 15.5 KB checked in by kevin, 3 months ago (diff)

Added Arias Intensity and ColorPGA option

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*
2 *    Revision history:
3 *     $Log$
4 *     Revision 1.13  2009/08/26 13:28:54  paulf
5 *     fixed version output and gmew.d
6 *
7 *     Revision 1.12  2009/08/26 00:38:17  paulf
8 *     forgot to up the version number...duh
9 *
10 *     Revision 1.11  2009/08/25 19:55:12  paulf
11 *     modified logit for extraDelay to only report to log
12 *
13 *     Revision 1.10  2009/08/25 00:10:40  paulf
14 *     added extraDelay parameter go gmew
15 *
16 *     Revision 1.9  2007/02/26 13:44:40  paulf
17 *     fixed heartbeat sprintf() to cast time_t as long
18 *
19 *     Revision 1.8  2007/02/23 17:00:42  paulf
20 *     fixed long warning for time_t
21 *
22 *     Revision 1.7  2006/03/15 14:21:54  paulf
23 *     SCNL version of gmew v0.2.0
24 *
25 *     Revision 1.6  2001/07/18 20:10:35  lombard
26 *     removed more debug lines.
27 *
28 *     Revision 1.5  2001/07/18 20:00:06  lombard
29 *     Removed some debug lines that were causing problems
30 *
31 *     Revision 1.4  2001/07/18 19:18:25  lucky
32 *     *** empty log message ***
33 *
34 *     Revision 1.3  2001/06/10 21:27:36  lombard
35 *     Changed single transport ring to input and output rings.
36 *     Added ability to handle multiple getEventsFrom commands.
37 *     Fixed handling of waveservers in config file.
38 *
39 *     Revision 1.2  2001/05/09 22:33:50  dietz
40 *     Changed to shut down gracefully if the transport flag is
41 *     set to TERMINATE or pEW->myPid.
42 *
43 *     Revision 1.1  2001/03/30 19:14:25  lombard
44 *     Initial revision
45 */
46
47/*
48 * gmew.c: gmew, the earthworm ground-motion module.
49 *
50 * Pete Lombard; Earthworm Engineering; February 2001
51 */
52
53
54#define VERSION "v0.3.3 2019-05-31"
55
56#include <stdio.h>
57#include <stdlib.h>
58#include <string.h>
59
60#include "earthworm.h"
61#include "mem_circ_queue.h"
62#include "rw_strongmotionII.h"
63#include "transport.h"
64#include "time_ew.h"
65#include "read_arc.h"
66#include "gm.h"
67#include "gm_util.h"
68#include "gm_sac.h"
69#include "gm_ws.h"
70#include "../localmag/lm_misc.h"
71#include "ew_spectra_io.h"
72
73/* Error messages used by gmew */
74#define  ERR_MISSMSG       0   /* message missed in transport ring       */
75#define  ERR_TOOBIG        1   /* retreived msg too large for buffer     */
76#define  ERR_NOTRACK       2   /* msg retreived; tracking limit exceeded */
77#define  ERR_ARCFILE       3   /* error creating/writing arc file        */
78#define  ERR_QUEUE         4   
79
80#define IN_QUEUE_SIZE   100     /* Number of arc summary lines to hold  */
81#define THREAD_STACK    8192    /* How big is our thread stack          */
82
83/* Internal function prototypes */
84static void setUpThread( GMPARAMS * );
85static thr_ret StackerThread( void * );
86static void gm_status( MSG_LOGO *, short, char *, GMEW *);
87
88/* Used to parse textual date */
89static char* monthNames[] = {"Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"}; 
90
91int main(int argc, char **argv)
92{
93  static EVENT event;
94  static GMPARAMS gmParams;
95  static GMEW gmEW;
96  int result;
97  long msgLen;
98  MSG_LOGO recLogo;
99  char msg[MAX_BYTES_PER_EQ+1];
100 
101  gmParams.pEW = &gmEW;
102
103  if (Configure(&gmParams, argc, argv, &event, VERSION) < 0)
104  {
105    logit("et", "gmew: Version %s\n", VERSION);
106    logit("et", "gmew: configuration failed; exitting\n");
107    return 1;
108  }
109 
110  logit("t", "gmew: Version %s\n", VERSION);
111
112  /* Start up our message stacker thread */
113  setUpThread( &gmParams );
114 
115  /* Main loop; do this until we die */
116  while ( !gmEW.terminate )
117  {
118    /* Grap the next arc summary line from the input queue */
119    RequestSpecificMutex(&gmEW.Qmutex);
120    result = dequeue (&gmEW.msgQ, msg, &msgLen, &recLogo);
121    ReleaseSpecificMutex(&gmEW.Qmutex);
122   
123    if (result < 0) /* Nothing in queue; take a break */
124      goto Next;
125   
126    /* Parse event data from the new message */
127    if ( gmEW.ha2kLogo.type != 0 && recLogo.type == gmEW.ha2kLogo.type ) {
128        /* Process the hypoarc message */
129       
130                if ( (result = procArc( msg, &event, recLogo, gmEW.gmLogo )) < 0)
131                {
132                  logit("et", "gmew: error processing ARC message; exiting\n");
133                  break;
134                }
135       
136                if (gmParams.waitTime != 0) {
137                  logit("t", "gmew: sleeping for %d seconds before processing event\n", gmParams.waitTime);
138                  sleep_ew(1000*gmParams.waitTime);
139                }
140                 
141                /* Process the event */
142                if ( (result = getGMFromTrace(&event, &gmParams)) < 0)
143                {
144                  logit("et", "gmew: error analyzing ground motion; exiting\n");
145                  break;
146                }
147       
148                endEvent( &event, &gmParams);
149        } else if ( gmEW.amLogo.type != 0 && recLogo.type == gmEW.amLogo.type ) {
150                /* Process the activate_module message */
151                char date[100];
152                int modID, args;
153                time_t now;
154               
155                event.numSta = 0;
156                args = sscanf( msg, "%d %s", &modID, date );
157                if ( args == 1 || args == 2 ) {
158                        if ( modID != gmEW.hrtLogo.mod )
159                                continue;
160                        event.eventId[0] = 0;  /* Indicate this isn't a real event */
161                        if ( args == 2 ) {
162                                if (atoi(date) > 0) {
163                                        EWSConvertTime (date, &event.origin_time);
164                                } else {
165                                        /* handle negative times as time BEFORE NOW */
166                                        time(&now);
167                                        event.origin_time = now + atoi(date);
168                                }
169                        } else {
170                                /* handle no second argument as NOW */
171                                time(&now);
172                                event.origin_time = now;
173                        }
174                       
175                } else {
176                        logit( "w", "gmew: Improperly-formatted ACTIVATE_MODULE message read: '%s'\n",
177                                msg );
178                }
179               
180                if (gmParams.waitTime != 0) {
181                  logit("t", "gmew: sleeping for %d seconds before processing request\n", gmParams.waitTime);
182                  sleep_ew(1000*gmParams.waitTime);
183                }
184                 
185                /* Process the event */
186                if ( (result = getGMFromTrace(&event, &gmParams)) < 0)
187                {
188                  logit("et", "gmew: error processing request; exiting\n");
189                  break;
190                }
191               
192        } else if ( gmEW.threshLogo.type != 0 && recLogo.type == gmEW.threshLogo.type ) {
193            //logit("t", "gmew: saw the THRESH message: %s\n", msg );
194                /* Process the ew threshold message */
195                int args, year;
196                struct tm when_tm;
197                char month_name[20];
198               
199                event.numSta = 0;
200                args = sscanf( msg, "%*s Thresh=%*f Value=%*f Time=%*s %s %d %d:%d:%d %d", month_name, &when_tm.tm_mday, &when_tm.tm_hour, &when_tm.tm_min, &when_tm.tm_sec, &year );
201                if ( args == 6 ) {
202                    int i = 1;
203                    for ( i=0; i<12; i++ )
204                        if ( strcmp( monthNames[i], month_name ) == 0 ) {
205                            when_tm.tm_mon = i;
206                            break;
207                        }
208                        when_tm.tm_year = year - 1900;
209                        event.origin_time = timegm_ew( &when_tm );
210                        sprintf( event.eventId, "%1.0f", -event.origin_time );  /* Indicate this isn't a real event */
211                } else {
212                        logit( "w", "gmew: Improperly-formatted EWTHRESH message read: '%s'\n",
213                                msg );
214                }
215               
216                if (gmParams.waitTime != 0) {
217                  logit("t", "gmew: sleeping for %d seconds before processing request\n", gmParams.waitTime);
218                  sleep_ew(1000*gmParams.waitTime);
219                }
220                 
221                /* Process the event */
222                if ( (result = getGMFromTrace(&event, &gmParams)) < 0)
223                {
224                  logit("et", "gmew: error processing request; exiting\n");
225                  break;
226                }
227               
228        }
229    /* If all went well, don't pause; just get another message */
230    continue;
231 
232  Next:
233    sleep_ew (1000);
234    continue;
235  }
236  /* We've terminated for some reason; make sure our StackerThread knows */
237  gmEW.terminate = 1;
238  logit("et", "gmew: termination requested\n");
239
240  sleep_ew(500); /* Give death a chance...*/
241  tport_detach( &gmEW.InRegion ); 
242  if (gmEW.RingInKey != gmEW.RingOutKey)
243    tport_detach( &gmEW.OutRegion ); 
244
245  return 0;
246
247}
248
249/*
250 * setUpThread: Set up the internal queue between the StackerThread and
251 *              the main thread; start up transport, and then start
252 *              the StackerThread.
253 */
254static void setUpThread( GMPARAMS *pgmParams )
255{
256  GMEW *pEW = pgmParams->pEW;
257  ew_thread_t tidStacker;
258
259  /* Set up a queue; start the message-stacking thread to feed it */
260  initqueue(&pEW->msgQ, IN_QUEUE_SIZE, MAX_BYTES_PER_EQ+1);
261  CreateSpecificMutex(&pEW->Qmutex);
262   
263  /* look up my PID for the heartbeat message */
264  pEW->myPid = getpid();
265  if ( pEW->myPid == -1 )
266  {
267    logit( "e", "gmew: Cannot get pid. Exitting.\n");
268    exit( -1 );
269  }
270
271  /* Attach to Input/Output shared memory ring */
272  if (pEW->RingInKey != pEW->RingOutKey)
273  {
274    tport_attach( &pEW->InRegion, pEW->RingInKey );
275    tport_attach( &pEW->OutRegion, pEW->RingOutKey );
276  }
277  else
278  {
279    tport_attach( &pEW->InRegion, pEW->RingInKey );
280    pEW->OutRegion = pEW->InRegion;
281  }
282
283  logit( "", "gmew: Attached to public memory regions %ld and %ld\n", 
284         pEW->RingInKey, pEW->RingOutKey );
285
286  pEW->terminate = 0;
287  if (StartThreadWithArg (StackerThread, (void *) pgmParams, 
288                          (unsigned) THREAD_STACK, &tidStacker) == -1)
289  {
290    logit( "e", 
291           "gmew: Error starting Stacker thread.  Exitting.\n");
292    pEW->terminate = 1;
293    KillSelfThread();
294  }
295  return;
296}
297
298/*
299 * StackerThread: This is the thread that is the entrance to gmew.
300 *                It reads TYPE_HYP2KARC messages from transport,
301 *                places the first (summary) line from the message into
302 *                an internal queue, and it manages heartbeats for gmew.
303 */
304static thr_ret StackerThread( void *pgm )
305{
306  GMPARAMS *pgmParams;
307  GMEW *pEW;
308  MSG_LOGO  recLogo;          /* logo of retrieved message       */
309  static char  eqmsg[MAX_BYTES_PER_EQ];  /* array to hold event message     */
310  static char line[MAX_BYTES_PER_EQ+1], *cr;
311  static char  Text[GM_MAXTXT];    /* string for log/error messages         */
312  long recsize = 0;      /* size of retrieved message       */
313  time_t timeNow;          /* current time                    */       
314  time_t timeLastBeat;     /* time last heartbeat was sent    */
315  int result;
316  struct Hsum arcSum;
317  int flagAddToQueue = 0;
318
319  pgmParams = (GMPARAMS *)pgm; 
320  pEW = pgmParams->pEW;
321 
322  /* Flush the input buffer on startup */
323  while ( tport_getmsg( &pEW->InRegion, pEW->GetLogo, pEW->nGetLogo, 
324                        &recLogo, &recsize, eqmsg, MAX_BYTES_PER_EQ-1 ) != 
325          GET_NONE );
326 
327  /* Force a heartbeat to be issued in first pass thru main loop */
328  timeLastBeat = time(&timeNow) - pgmParams->HeartBeatInterval - 1;
329 
330  /* setup done; start main loop */
331  while( tport_getflag(&pEW->InRegion) != TERMINATE  &&
332         tport_getflag(&pEW->InRegion) != pEW->myPid  )
333  {
334    /* send gmew's heartbeat */
335    if  ( time(&timeNow) - timeLastBeat  >=  pgmParams->HeartBeatInterval ) 
336    {
337      timeLastBeat = timeNow;
338      gm_status( &pEW->hrtLogo, 0, "", pEW ); 
339    }
340   
341    /* Process all new hypoinverse archive msgs */
342    do  /* Keep doing this as long as there are msgs in the transport ring */
343    {
344      /* Get the next message from shared memory */
345      result = tport_getmsg( &pEW->InRegion, pEW->GetLogo, pEW->nGetLogo, 
346                             &recLogo, &recsize, eqmsg, MAX_BYTES_PER_EQ );
347      /* Check return code; report errors if necessary */
348      if( result != GET_OK )
349      {
350        if( result == GET_NONE ) 
351          break;
352        else if( result == GET_TOOBIG ) 
353        {
354          sprintf( Text, 
355                   "Retrieved msg[%ld] (i%u m%u t%u) too big for eqmsg[%d]",
356                   recsize, recLogo.instid, recLogo.mod, recLogo.type, 
357                   MAX_BYTES_PER_EQ-1 );
358          gm_status( &pEW->errLogo, ERR_TOOBIG, Text, pEW );
359          continue;
360        }
361        else if( result == GET_MISS ) 
362        {
363          sprintf( Text,
364                   "Missed msg(s)  i%u m%u t%u",
365                   recLogo.instid, recLogo.mod, recLogo.type);
366          gm_status( &pEW->errLogo, ERR_MISSMSG, Text, pEW );
367        }
368        else if( result == GET_NOTRACK ) 
369        {
370          sprintf( Text,
371                   "Msg received (i%u m%u t%u); transport.h NTRACK_GET exceeded",
372                   recLogo.instid, recLogo.mod, recLogo.type );
373          gm_status( &pEW->errLogo, ERR_NOTRACK, Text, pEW );
374        }
375      }
376
377      /* Copy the first (summary) line into a buffer for the queue */
378      memset(line, 0, MAX_BYTES_PER_EQ+1);
379      if ( (cr = (char *)memchr(eqmsg, '\n', MAX_BYTES_PER_EQ)) == NULL)
380        strncpy(line, eqmsg, MAX_BYTES_PER_EQ);
381      else
382        strncpy(line, eqmsg, (size_t)(cr - eqmsg));
383     
384      /* Initialize flag for adding arc message to the queue */
385      flagAddToQueue = 0;
386
387      /* Check for LookAtVersion */
388      if( pgmParams->LookAtVersion == vAll) {
389        flagAddToQueue = 1;
390      } else {
391        if (read_hyp (eqmsg, NULL, &arcSum) < 0) {
392          snprintf( Text, GM_MAXTXT - 1, "Error reading arc summary: %s\n", eqmsg );
393          logit("e", "%s", Text);
394        } else {
395          if (arcSum.version ==  pgmParams->LookAtVersion ) {
396            flagAddToQueue = 1;
397            snprintf( Text, GM_MAXTXT - 1, "Enqueued arc message %ld.%ld\n", arcSum.qid, arcSum.version );
398            logit("t", "%s", Text);
399
400          } else {
401            snprintf( Text, GM_MAXTXT - 1, "Not enqueued arc message %ld.%ld\n", arcSum.qid, arcSum.version );
402            logit("t", "%s", Text);
403          }
404        }
405      }
406
407      /* Check for adding (or not) the arc message to the queue */
408      if(flagAddToQueue) {
409        /* Queue retrieved msg */
410        RequestSpecificMutex(&pEW->Qmutex);
411        result = enqueue (&pEW->msgQ, line, MAX_BYTES_PER_EQ, recLogo); 
412        ReleaseSpecificMutex(&pEW->Qmutex);
413      } else {
414        result = GET_NONE;
415      }
416     
417      if (result != 0)
418      {
419        if (result == -1)
420        {
421          sprintf (Text, "Message too large for queue; Lost message.");
422          gm_status (&pEW->errLogo, ERR_QUEUE, Text, pEW );
423          continue;
424        }
425        if (result == -3) 
426        {
427          sprintf (Text, "Queue full. Old messages lost.");
428          gm_status (&pEW->errLogo, ERR_QUEUE, Text, pEW);
429          continue;
430        }
431      } /* problem from enqueue */
432     
433    } while( result != GET_NONE );  /*end of message-processing-loop */
434
435    /* No more msgs in transport ring; take a break */
436    sleep_ew( 1000 );  /* no more messages; wait for new ones to arrive */
437  }
438  /* end of main loop */
439
440  /* Termination has been requested; make sure our main thread knows */
441  pEW->terminate = 1;
442  KillSelfThread();
443  exit(0);  /* not really */
444}
445
446/*
447 * gm_status() builds a heartbeat or error message & puts it into
448 *             shared memory.  Writes errors to log file & screen.
449 */
450static void gm_status( MSG_LOGO *pLogo, short ierr, char *note, GMEW *pEW )
451{
452  char         msg[256];
453  long         size;
454  time_t        t;
455 
456  /* Build the message */ 
457  time( &t );
458
459  if( pLogo->type == pEW->hrtLogo.type )
460    sprintf( msg, "%ld %d\n", (long) t, (int) pEW->myPid );
461  else if( pLogo->type == pEW->errLogo.type )
462  {
463    sprintf( msg, "%ld %hd %s\n", (long) t, ierr, note);
464    logit( "et", "gmew: %s\n", msg );
465  }
466
467
468  /* Write the message to shared memory */
469  if (pLogo->type == pEW->gmLogo.type)
470  {
471    size = strlen(note);
472    if( tport_putmsg( &pEW->OutRegion, pLogo, size, note ) != PUT_OK )
473      logit("et", "gmew: Error sending LOCALMAG message\n");
474  }
475  else
476  {
477    size = strlen( msg );   /* don't include the null byte in the message */
478    if( tport_putmsg( &pEW->OutRegion, pLogo, size, msg ) != PUT_OK )
479    {
480      if( pLogo->type == pEW->hrtLogo.type ) 
481        logit("et","gmew:  Error sending heartbeat.\n" );
482      else if( pLogo->type == pEW->errLogo.type ) 
483        logit("et","gmew:  Error sending error:%d.\n", ierr );
484    }
485  }
486  return;
487}
488
489/*
490 * send_gm: send the ground-motion observations (as a StrongMotion message)
491 *          to transport.
492 *     returns: 0 on success
493 *             -1 on failure (out of space in message or transport error)
494 */
495int send_gm( SM_INFO *pSM, GMEW *pEW )
496{
497  char message[GM_SM_LEN];
498  int rc, len;
499 
500  if ( (rc = wr_strongmotionII( pSM, message, GM_SM_LEN)) < 0)
501    return rc;
502 
503  len = strlen(message);
504
505  logit("t", "Strong motion message with length=%d is being issued:\n", len);
506  logit("", "%s\n", message);
507 
508 
509  if( tport_putmsg( &pEW->OutRegion, &pEW->gmLogo, len, message ) != PUT_OK )
510  {
511    logit("et","gmew:  Error sending strong_motion message.\n" );
512    return -1;
513  }
514 
515  return 0;
516}
Note: See TracBrowser for help on using the repository browser.