source: trunk/src/seismic_processing/ewspectra/ewspectra.c @ 4203

Revision 4203, 34.5 KB checked in by scott, 10 years ago (diff)

Added activate_module & its message type; modified ewspectra to use it

Line 
1/******************************************************************************
2 *
3 *      File:                   ewspectra.c
4 *
5 *      Function:               Earthworm module for computing spectra from waveserver data
6 *
7 *      Author(s):              Scott Hunter, ISTI
8 *
9 *      Source:                 Started anew.
10 *
11 *      Notes:                 
12 *
13 *      Change History:
14 *                      4/26/11 Started source
15 *     
16 *****************************************************************************/
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <time.h>
21#include <earthworm_defs.h>
22#include <ws2ts.h>
23#include <math.h>
24#include <string.h>
25#include <kom.h>
26#include <ew_spectra.h>
27#include <transport.h>
28#include "iir.h"
29#include <ew_spectra_io.h>
30
31#define MAX_EWS_PROCS   10      /* maximum number of spectra processing definitions */
32
33#define MSGSTK_OFF    0              /* MessageStacker has not been started      */
34#define MSGSTK_ALIVE  1              /* MessageStacker alive and well            */
35#define MSGSTK_ERR   -1              /* MessageStacker encountered error quit    */
36volatile int MessageStackerStatus = MSGSTK_OFF;
37
38/* Error messages used by export
39 ***********************************/
40#define  ERR_MISSMSG       0   /* message missed in transport ring        */
41#define  ERR_TOOBIG        1   /* retreived msg too large for buffer      */
42#define  ERR_NOTRACK       2   /* msg retreived; tracking limit exceeded  */
43#define  ERR_BADMSG        3   /* message w/ bad timespan                 */
44static char  errText[256];     /* string for log/error messages           */
45
46#define backward 0
47
48char *progname;
49
50/* Things to look up in the earthworm.h tables with getutil.c functions
51 **********************************************************************/
52static long          InRingKey;     /* key of transport ring for input    */
53static long          OutRingKey;    /* key of transport ring for output   */
54static unsigned char InstId;        /* local installation id              */
55static unsigned char MyModId;       /* Module Id for this program         */
56
57static  SHM_INFO  InRegion;     /* shared memory region to use for input  */
58static  SHM_INFO  OutRegion;    /* shared memory region to use for output */
59
60MSG_LOGO  GetLogo[1];                           /* requesting module,type,instid */
61pid_t MyPid;        /* Our own pid, sent with heartbeat for restart purposes */
62       
63time_t now;        /* current time, used for timing heartbeats */
64time_t MyLastBeat;         /* time of last local (into Earthworm) hearbeat */
65
66static int     HeartBeatInt;        /* seconds between heartbeats        */
67static unsigned char TypeHeartBeat;
68static unsigned char TypeError;
69
70static char *Rawmsg = NULL;          /* "raw" retrieved msg for main thread      */
71static char *MSrawmsg = NULL;        /* MessageStacker's "raw" retrieved message */
72
73int static long    MaxMsgSize = 100;          /* max size for input msgs    */
74
75WShandle *wsh;
76
77#define THREAD_STACK 8192
78static unsigned tidStacker;          /* Thread moving messages from transport */
79
80int numPeaks = 0;
81double peakBand[2];
82
83
84
85/*****************************************************************************
86   Discrete fourier transform subroutine
87   based on the fast fourier transform algorithm
88   which computes the quantity:
89
90             n-1
91        x(j)=sum  x(k) exp( 2*pi*i*isign*k*j/n )
92             k=0
93
94   n must be a integral power of 2.
95*****************************************************************************/
96static void ah_four(double *data, int n, int isign) {
97        int ip0, ip1, ip2, ip3, i3rev;
98        int i1, i2a, i2b, i3;
99        double sinth, wstpr, wstpi, wr, wi, tempr, tempi, theta;
100
101        ip0= 2;
102        ip3= ip0*n;
103        i3rev= 1;
104        for (i3= 1; i3 <= ip3; i3+= ip0) {
105                if (i3 < i3rev) {
106                        tempr= data[i3-1];
107                        tempi= data[i3];
108                        data[i3-1]= data[i3rev-1];
109                        data[i3]= data[i3rev];
110                        data[i3rev-1]= tempr;
111                        data[i3rev]= tempi;
112                }
113                ip1= ip3/2;
114                do {
115                        if (i3rev <= ip1)
116                                break;
117                        i3rev-= ip1;
118                        ip1/= 2;
119                } while (ip1 >= ip0);
120                i3rev+= ip1;
121        }
122        ip1= ip0;
123        while (ip1 < ip3) {
124                ip2= ip1 * 2;
125                theta= 6.283185/((double) (isign*ip2/ip0));
126                sinth= (double) sin((double) (theta/2.));
127                wstpr= -2.*sinth*sinth;
128                wstpi= (double) sin((double) theta);
129                wr= 1.;
130                wi= 0.;
131                for (i1= 1; i1 <= ip1; i1+= ip0) {
132                        for (i3= i1; i3 <ip3; i3+= ip2) {
133                                i2a= i3;
134                                i2b= i2a+ip1;
135                                tempr= wr*data[i2b-1] - wi*data[i2b];
136                                tempi= wr*data[i2b] + wi*data[i2b-1];
137                                data[i2b-1]= data[i2a-1] - tempr;
138                                data[i2b]= data[i2a] - tempi;
139                                data[i2a-1]+= tempr;
140                                data[i2a]+= tempi;
141                        }
142                        tempr= wr;
143                        wr= wr*wstpr - wi*wstpi + wr;
144                        wi= wi*wstpr + tempr*wstpi + wi;
145                }
146                ip1=ip2;
147        }
148        return;
149}
150
151/*****************************************************************************
152   This subroutine takes a real time series and computes its
153   discrete fourier transform. The time series has n real points
154   and the transform has n/2+1 complex values starting with
155   frequency zero and ending at the nyquist frequency.
156
157   Parameters:
158        x       floating point data array
159        n       the number of samples, an integral power of 2
160*****************************************************************************/
161static void ah_fftr(double *x, int n) {
162        int nn, is, nm, j, i;
163        int k1j, k1i, k2j, k2i;
164        double s, fn, ex, wr, wi, wwr, wrr, wwi, a1, a2, b1, b2;
165
166        nn= n/2;
167        is= 1;
168        ah_four(x, nn, is);
169        nm= nn/2;
170        s= x[0];
171        x[0]+= x[1];
172        x[n]= s - x[1];
173        x[1]= 0.0 ;
174        x[n+1]= 0.0;
175        x[nn+1]= (-x[nn+1]);
176        fn= (double) n;
177        ex= 6.2831853/fn;
178        j= nn;
179        wr= 1.0;
180        wi= 0.0;
181        wwr= (double) cos((double) ex);
182        wwi= (double) (-sin((double) ex));
183        for (i= 2; i <= nm; i++) {
184                wrr= wr*wwr-wi*wwi;
185                wi= wr*wwi+wi*wwr;
186                wr= wrr;
187                k1j= 2*j-1;
188                k1i= 2*i-1;
189                k2j= 2*j;
190                k2i= 2*i;
191                a1= 0.5*(x[k1i-1]+x[k1j-1]);
192                a2= 0.5*(x[k2i-1]-x[k2j-1]);
193                b1= 0.5*(-x[k1i-1]+x[k1j-1]);
194                b2= 0.5*(-x[k2i-1]-x[k2j-1]);
195                s= b1;
196                b1= b1*wr+b2*wi;
197                b2= b2*wr-s*wi;
198                x[k1i-1]= a1-b2;
199                x[k2i-1]= (-a2-b1);
200                x[k1j-1]= a1+b2;
201                x[k2j-1]=  a2-b1;
202                j-= 1;
203        }
204        return;
205}
206
207int scale;                                                      /* 0=no scaling */
208int white;                                                      /* 1=scale amplitude output only by mean */
209double start, stop;                                     /* time period from config */
210int smooth;                                                     /* 0=no smoothing */
211double smooth_width;                            /* width (in secs) of smoothing window */
212char SCNL[MAX_EWS_PROCS][3][4][10];     /* SCNLs to process */
213char binary[MAX_EWS_PROCS];                     /* 1=use 2 SCNLs for processing */
214char processing_mode[MAX_EWS_PROCS];/* what processing to do:
215                                                                                p = Plain (unary)
216                                                                                d = difference (binary)
217                                                                                */
218int taperType;                                          /* what kind of tapering to do:
219                                                                                BARTLETT=1, HANNING=2,
220                                                                                PARZAN=3, BMHARRIS=4
221                                                                                */
222double taperFrac;                                       /* fraction of each end to taper */
223int num_ews_procs;                                      /* # of processing requests */
224char outpath[100];                                      /* path for output file */
225FILE *fp;                                                       /* output file pointer */
226char inRing[MAX_RING_STR];                      /* name of input ring */
227char outRing[MAX_RING_STR];                     /* name of output ring */
228int find_triggers;
229double highcutoff, lowcutoff;           /* freq for high and low cutoffs */
230int highpoles, lowpoles;                        /* # poles for high & low cutoffs (0=none) */
231
232char *taperName[5] = {"", "BARTLETT", "HANNING", "PARZAN", "BMHARRIS"};
233char *Argv0 = "ewspectra";
234
235static char    MyModName[MAX_MOD_STR];       /* speak as this module name/id      */
236int spectraOut = 0;
237
238/***********************************************************************
239 * parse_my_command () - handle config file commands not handled in ws2ts
240 *              return 0 if handled, 1 if unknown or an error
241 ***********************************************************************/
242int parse_my_command() {
243        if (k_its("Scale")) {
244                scale = 1;
245        } else if (k_its("White")) {
246                white = 1;
247        } else if (k_its("Smooth")) {
248                smooth = 1;
249                smooth_width = k_val();
250        } else if (k_its("Taper")) {
251                char *taperTypeName;
252                taperTypeName = k_str();
253                for ( taperType = 4; taperType>0; taperType-- )
254                        if ( strcmp( taperTypeName, taperName[taperType] )==0 )
255                                break;
256                if ( taperType == 0 ) {
257                        logit( "e", "Invalid type of taper: '%s'\n", taperTypeName );
258                        return 1;
259                }
260                taperFrac = k_val();
261        } else if (k_its("MyModuleId")) {
262                strcpy( MyModName, k_str() );
263                if ( GetModId( MyModName, &MyModId ) != 0 ) {
264                  logit( "e", "%s: Invalid module name <%s>; exiting!\n",
265                                   Argv0, MyModName );
266                  return 1;
267                }
268        } else if (k_its("DeconvolveSpectraSCNLs") || k_its("DiffSpectraSCNLs")
269                || k_its("PlainSpectraSCNL")) {
270                int i,j,step;
271                if ( num_ews_procs >= MAX_EWS_PROCS ) {
272                        logit( "e", "Too many processing requests; ignoring '%s'\n", k_com() );
273                        return 0;
274                }
275                step = k_its("PlainSpectraSCNL") ? 2 : 1;
276                *SCNL[num_ews_procs][2][3] = 0;
277                for ( i=0; i<3; i+=step )
278                        for ( j=0; j<4; j++ ) {
279                                char *str = k_str();
280                                if ( str == NULL )
281                                        if ( i < 2 ) {
282                                                logit( "e", "Incomplete source SCNL for %s\n", k_com() );
283                                                return 0;
284                                        } else if ( j > 0 ) {
285                                                logit( "e", "Target SCNL must be fully specified for %s\n", k_com() );
286                                                return 0;
287                                        } else
288                                                break;
289                                strcpy( SCNL[num_ews_procs][i][j], str );
290                        }
291                processing_mode[num_ews_procs] = step==2 ? 'p' : k_its("DeconvolveSpectraSCNLs") ? 'd' : 's';
292                binary[num_ews_procs] = (step == 1);
293                num_ews_procs++;
294        } else if (k_its("TimeSpan")) {
295                char *arg1s = k_str();
296                double arg1 = atof( arg1s );
297                if ( arg1 < 0 ) { 
298                        start = time(NULL) + arg1;
299                } else if ( EWSConvertTime (arg1s, &start) == EW_FAILURE ) {
300                        return 1;
301                }
302                stop = start + k_val();
303        } else if (k_its("OutFile")) {
304                strcpy( outpath, k_str() );
305        } else if (k_its("InRing")) {
306                find_triggers = 1;
307                strcpy( inRing, k_str() );
308                if ( ( InRingKey = GetKey(inRing) ) == -1 ) {
309                        logit( "e", "%s:  Invalid ring name <%s>; exiting!\n",
310                                         Argv0, inRing);
311                        return 1;
312                }
313        } else if (k_its("OutRing")) {
314                strcpy( outRing, k_str() );
315            if ( ( OutRingKey = GetKey(outRing) ) == -1 ) {
316                        logit( "e", "%s:  Invalid ring name <%s>; exiting!\n",
317                                         Argv0, outRing);
318                        return 1;
319                }
320        } else if (k_its("HighCut")) {
321                highcutoff = k_val();
322                if ( highcutoff <= 0 ) {
323                        logit( "e", "HighCut cutoff frequency must be > 0\n" );
324                        return 1;
325                }
326                highpoles = k_int();
327                if ( highpoles % 2 || highpoles < 2 || highpoles > 12 )  {
328                        logit( "e", "# poles for HighCut must be 2, 4, 6, 8, 10 or 12\n" );
329                        return 1;
330                }
331        } else if (k_its("LowCut")) {
332                lowcutoff = k_val();
333                if ( lowcutoff <= 0 ) {
334                        logit( "e", "LowCut cutoff frequency must be > 0\n" );
335                        return 1;
336                }
337                lowpoles = k_int();
338                if ( lowpoles % 2 || lowpoles < 2 || lowpoles > 12 )  {
339                        logit( "e", "# poles for LowCut must be 2, 4, 6, 8, 10 or 12\n" );
340                        return 1;
341                }
342        } else if (k_its("ReportSpectra")) {
343                spectraOut = 1;
344        } else if (k_its("ReportPeaks")) {
345                numPeaks = k_int();
346                peakBand[0] = k_val();
347                peakBand[1] = k_val();
348        } else
349                return 1;
350        return 0;
351}
352
353
354
355/***********************************************************************
356 * deconvolve_spectra () - deconvolve spectra b into a
357 *              return 0 if no problems, 1 otherwise
358 ***********************************************************************/
359int deconvolve_spectra( EW_TIME_SERIES *spec_a, EW_TIME_SERIES *spec_b ) {
360        int ndata = spec_a->dataCount * 2;
361        int i;
362        double denom, result_0, result_1;
363        double *a = spec_a->data, *b = spec_b->data;
364        for ( i=0; i<=ndata; i+=2 ) {
365                denom = b[i]*b[i] + b[i+1]*b[i+1];
366                if ( denom == 0.0 )
367                        a[i] = a[i+1] = 0.0;
368                else {
369                        result_0 = a[i]*b[i] + a[i+1]*b[i+1];
370                        result_1 = b[i]*a[i+1] - a[i]*b[i+1];
371                        a[i]   = result_0 / denom;
372                        a[i+1] = result_1 / denom;
373                }
374                       
375        }
376        return 0;
377}
378
379static double iScale( int i, double t, void* scale ) {
380        return i*(*(double*)scale);
381}
382
383/***********************************************************************
384 * smooth_spectra () - smooth the spectra timeseries
385 *              return 0 if no problems, 1 otherwise
386 ***********************************************************************/
387int smooth_spectra( EW_TIME_SERIES * ewspec, int mode ) {
388        double *specsm, even_sum, odd_sum;
389        int i, imod;
390        double delta = 1/ewspec->trh2x.samprate;
391        int smoother= smooth_width/delta;
392        int ndata = ewspec->dataCount * 2;
393        double *spec = ewspec->data;
394       
395        /* make sure that smoother is even */
396        smoother= (smoother/2)*2;
397        if ((specsm= (double *) malloc((smoother)*sizeof(double))) == NULL ) {
398                logit("et","%s: memory allocation error\n", progname);
399                return 1;
400        }
401        /* Create & fill smoothing window */
402        memcpy( specsm, spec, smoother*2*sizeof(double) );
403        even_sum = odd_sum = 0;
404        for ( i=0; i<smoother*2; i+=2) {
405                if ( mode & EWTS_MODE_FIRST )
406                        even_sum   += spec[i];
407                if ( mode & EWTS_MODE_SECOND )
408                        odd_sum += spec[i+1];
409        }
410               
411        for (i= smoother, imod=0; i <= ndata-smoother; i+= 2, imod = (imod+2)%(smoother*2) ) {
412                if ( mode & EWTS_MODE_FIRST ) {
413                        even_sum += spec[i+smoother];           /* Add next item to window sums */
414                        spec[i]   = even_sum / (smoother+1);/* Store next smoothed result */
415                        even_sum -= specsm[imod];                       /* Remove oldest value from sums... */
416                        specsm[imod]   = spec[i+smoother];      /*... and replace it with newest in window */
417                }
418                if ( mode & EWTS_MODE_SECOND ) {
419                        odd_sum  += spec[i+smoother+1];         /* Add next item to window sums */
420                        spec[i+1] = odd_sum  / (smoother+1);/* Store next smoothed result */
421                        odd_sum -= specsm[imod+1];              /* Remove oldest value from sums... */
422                        specsm[imod+1] = spec[i+smoother+1];/*... and replace it with newest in window */
423                }
424        }
425        free(specsm);
426        return 0;
427}
428
429/***********************************************************************
430 * convert_spectra () - convert complex spectra to amp/phase representation
431 *              return 0 if no problems, 1 otherwise
432 ***********************************************************************/
433int convert_spectra( EW_TIME_SERIES * ewspec ) {
434        int i, ndata = ewspec->dataCount * 2;
435        double *spec = ewspec->data;
436        double amp, phase;
437        for (i= 0; i <= ndata; i+= 2) {
438                amp= (double) sqrt((double) (spec[i]*spec[i]+spec[i+1]*spec[i+1]));
439                phase= (double) atan2(spec[i+1], spec[i]);
440                spec[i]= amp;
441                spec[i+1]= phase;
442        }
443        ewspec->dataType = EWTS_TYPE_AMPPHS;
444        return 0;
445}
446
447/* Similar thing for white */
448
449/***********************************************************************
450 * calc_spectra () - calculate the spectra for a timeseries, store in
451 *      provided timeseries
452 *              return 0 if no problems, 1 otherwise
453 ***********************************************************************/
454int calc_spectra( EW_TIME_SERIES * ewts, EW_TIME_SERIES * ewspec ) 
455{
456        double *spec;
457        int i, ndata;
458        /*double delta = 1/ewts->trh2x.samprate;*/
459       
460        *ewspec = *ewts;
461
462        ndata = ewts->dataCount;
463
464        /* get nearest power of 2 for listed data length */
465        for (i= 1; (int) pow((double) 2., (double) i) < ewts->dataCount; i++);
466        ndata= (int) pow((double) 2.,(double) (i+1));
467        ewspec->dataCount = ndata/2;
468        ewspec->dataType = EWTS_TYPE_COMPLEX;
469
470        /* allocate space for data and smoothing area */
471        if ((spec= (double *) malloc((unsigned) (ndata+2)*sizeof(double))) == NULL ) {
472                logit("et","%s: memory allocation error\n", progname);
473                return 1;
474        }
475        ewspec->data = spec;
476        memcpy( spec, ewts->data, ewts->dataCount*sizeof(double) );
477        bzero( spec+ewts->dataCount, sizeof(double)*(ndata-ewts->dataCount+2) );
478
479        ah_fftr(spec, ndata);
480       
481        return 0;
482}
483
484
485void findmax(EW_TIME_SERIES *ewspec, int top3[])
486{
487        double *arr = ewspec->data;
488        int n = ewspec->dataCount;
489        double prev = arr[0];
490        int i, j, cap, locap;
491        int goup = 1;
492        double *top3val;
493
494        double delta = 1/ewspec->trh2x.samprate;
495        double nyquist= 1.0/(2.0*delta);
496        double ndata = ewspec->dataCount - 1;
497        double scale = nyquist/ndata;
498       
499        printf( "Finding %d peaks\n", numPeaks );
500        top3val = malloc( sizeof(double) * numPeaks );
501        if ( top3val == NULL ) {
502                logit( "e", "Failed to allocate memory for peak-finding\n" );
503                exit(1);
504        }
505        for ( i=0; i<numPeaks; i++ ) {
506                top3[i] = -1;
507                top3val[i] = -1;
508        }
509
510        locap = (peakBand[0] / scale)*2;
511        if ( locap < 0 )
512                locap = 0;
513        if ( locap % 2 )
514                locap--;
515        cap = (peakBand[1] / scale)*2;
516        if ( cap % 2 )
517                cap++;
518        if ( cap > n )
519                cap = n;
520        for( i=0; i <= cap; i+=2)
521        {
522                double l,r;
523                l = (i - 2 < 0)?arr[0]:arr[i-2];
524                r = (i + 2 >= n)?arr[n-2]:arr[i+2];
525                double cur = (l + 2*arr[i] + r)/4;
526                if(goup) {
527                        if(prev > cur && i > 0) {
528                                int newidx = i-2;
529                                double newval = arr[newidx];
530                                if ( newidx >= locap && newval > top3val[numPeaks-1] ) {
531                                        for ( j=numPeaks-2; j>=0 && newval > top3val[j]; j-- ) {
532                                                top3val[j+1] = top3val[j];
533                                                top3[j+1] = top3[j];
534                                        }
535                                        top3[j+1] = newidx;
536                                        top3val[j+1] = newval;
537                                }
538                                goup = 0;
539                        }
540                } else {
541                        if(prev < cur)
542                                goup = 1;
543                }
544                prev = cur;
545        }
546        free( top3val );
547}
548
549/***********************************************************************
550 * process_timespan () - process the specified timespan for data gotten
551 *              for the SCNLs specified in config from the set of waveservers
552 *              in wsh
553 ***********************************************************************/
554void process_timespan(  WShandle *wsh, double start, double stop ) 
555{
556        int ret, i, pn;
557        EW_TIME_SERIES *ewts = NULL, *ewts2 = NULL, ewspec, ewspec2;
558        TRACE_REQ tr;
559        int *top3 = malloc( sizeof(int)*numPeaks );
560
561        tr.pBuf = NULL;
562        /* Try each of the processing requests from config */
563        for ( pn=0; pn < num_ews_procs; pn++ ) {
564       
565                /* Get the tracebufs for the first/only SCNL */
566                tr.bufLen = (stop - start)*1000000;
567                if ( tr.pBuf == NULL ) {
568                        tr.pBuf = malloc( tr.bufLen );
569                        if ( tr.pBuf == NULL ) {
570                                logit( "et", "%s: failed to allocate waveserver buffer\n", progname );
571                                continue;
572                        }
573                }
574                tr.timeout = 500;
575                tr.reqStarttime = start;
576                tr.reqEndtime = stop;           
577                ret = getTraceBufsFromWS( wsh, SCNL[pn][0][0], SCNL[pn][0][1], 
578                                                                        SCNL[pn][0][2], SCNL[pn][0][3], &tr );
579       
580                /* Convert to a timeseries */
581                if ( ret == 0 )
582                        ewts = convertTraceBufReq2WEW( &tr, 0 );
583                else 
584                        continue; /* move on to next SCNL */
585                       
586                if ( ewts == NULL )
587                        continue; /* move on to next SCNL */
588                if ( 1 ) {
589                        char date[2][20];
590                        EWSUnConvertTime( date[0], tr.actStarttime );
591                        EWSUnConvertTime( date[1], tr.actEndtime);
592                        EWSUnConvertTime( date[0], ewts->trh2x.starttime );
593                        EWSUnConvertTime( date[1], ewts->trh2x.endtime );
594                }
595
596                /* Handle the second timeseries, if present */
597                if ( binary[pn] ) {
598                        ret = getTraceBufsFromWS( wsh, SCNL[pn][1][0], SCNL[pn][1][1], 
599                                                                                SCNL[pn][1][2], SCNL[pn][1][3], &tr );
600                        if ( ret == 0 )
601                                ewts2 = convertTraceBufReq2WEW( &tr, 0 );       
602                        else {
603                                ws2ts_purge( NULL, NULL, ewts );
604                                continue;
605                        }
606                        if ( ewts2 == NULL ) {
607                                ws2ts_purge( NULL, NULL, ewts );
608                                continue;
609                        }
610                        if ( 1 ) {
611                                char date[2][20];
612                                EWSUnConvertTime( date[0], tr.actStarttime );
613                                EWSUnConvertTime( date[1], tr.actEndtime);
614                                EWSUnConvertTime( date[0], ewts2->trh2x.starttime );
615                                EWSUnConvertTime( date[1], ewts2->trh2x.endtime );
616                        }
617                }
618               
619                /* Free tracebuf space */
620                free( tr.pBuf );
621                tr.pBuf = NULL;
622               
623                /* Complete second timeseries for diff */
624                if ( processing_mode[pn] == 's' ) {
625                        demean_EWTS( *ewts );
626                        if ( taperType )
627                                taper_EWTS( *ewts, taperType, taperFrac, EWTS_MODE_BOTH );
628                        demean_EWTS( *ewts2 );
629                        if ( taperType )
630                                taper_EWTS( *ewts2, taperType, taperFrac, EWTS_MODE_BOTH );
631                       
632                        subtract_from_EWTS( *ewts, *ewts2, EWTS_MODE_BOTH );
633                               
634                        ws2ts_purge( NULL, NULL, ewts2 );
635                }               
636
637                /* Demean, filter, taper, & produce spectra */
638                demean_EWTS( *ewts );   
639                if ( highpoles | lowpoles ) {
640                        if ( 0 && taperType )
641                                taper_EWTS( *ewts, taperType, taperFrac, EWTS_MODE_BOTH );
642                        iir( ewts, lowcutoff, lowpoles, highcutoff, highpoles );
643                        demean_EWTS( *ewts );
644                }
645                if ( taperType )
646                        taper_EWTS( *ewts, taperType, taperFrac, EWTS_MODE_BOTH );
647                calc_spectra( ewts, &ewspec ); 
648                ws2ts_purge( NULL, NULL, ewts );
649               
650                if ( processing_mode[pn] == 'd' ) {
651                        demean_EWTS( *ewts2 );
652                        if ( highpoles | lowpoles ) {
653                                if ( 0 && taperType )
654                                        taper_EWTS( *ewts2, taperType, taperFrac, EWTS_MODE_BOTH );
655                                iir( ewts2, lowcutoff, lowpoles, highcutoff, highpoles );
656                                demean_EWTS( *ewts2 );
657                        }
658                        if ( taperType )
659                                taper_EWTS( *ewts2, taperType, taperFrac, EWTS_MODE_BOTH );
660                        calc_spectra( ewts2, &ewspec2 );
661                        ws2ts_purge( NULL, NULL, ewts2 );
662                       
663                        deconvolve_spectra( &ewspec, &ewspec2 );
664                        if ( ewspec2.data != NULL )
665                                free( ewspec2.data );
666                }
667               
668                convert_spectra( &ewspec );
669                if ( smooth )
670                        smooth_spectra( &ewspec, EWTS_MODE_FIRST | EWTS_MODE_SECOND );
671               
672                double delta = 1/ewspec.trh2x.samprate;
673                double nyquist= 1.0/(2.0*delta);
674                double ndata = ewspec.dataCount - 1;
675                double scale = nyquist/ndata;
676               
677                if ( OutRingKey != -1 ) {
678                        memcpy( ewspec.trh2x.sta,  SCNL[pn][2][0], TRACE2_STA_LEN );
679                        memcpy( ewspec.trh2x.net,  SCNL[pn][2][1], TRACE2_NET_LEN );
680                        memcpy( ewspec.trh2x.chan, SCNL[pn][2][2], TRACE2_CHAN_LEN );
681                        memcpy( ewspec.trh2x.loc,  SCNL[pn][2][3], TRACE2_LOC_LEN );
682                }
683               
684                MSG_LOGO logo;
685                logo.instid = InstId;
686                logo.mod    = MyModId;
687               
688                char date[2][20];
689                char buffer[2000], line[100], *specData = buffer, *peakData = buffer;
690                EWSUnConvertTime( date[0], start );
691                EWSUnConvertTime( date[1], stop );
692                sprintf( peakData, "Requested start=%15s\tend=%15s\n", date[0], date[1] );
693                peakData += strlen(peakData);
694                EWSUnConvertTime( date[0], ewspec.trh2x.starttime );
695                EWSUnConvertTime( date[1], ewspec.trh2x.endtime );
696                sprintf( peakData, "Actual    start=%15s\tend=%15s\n", 
697                        date[0], date[1] );
698                peakData += strlen(peakData);   
699                sprintf( peakData, "Sampling  nsamp=%15ld\trate=%15.3lf\n", 
700                        ewspec.dataCount, ewspec.trh2x.samprate );
701                peakData += strlen(peakData);
702                sprintf( peakData, "Source 1: %s.%s.%s.%s", 
703                        SCNL[pn][0][0], SCNL[pn][0][1], SCNL[pn][0][2], SCNL[pn][0][3] );
704                peakData += strlen(peakData);
705                if ( binary[pn] ) {
706                        sprintf( peakData, "\tSource 2: %s.%s.%s.%s\t%s\n", 
707                                SCNL[pn][1][0], SCNL[pn][1][1], SCNL[pn][1][2], SCNL[pn][1][3],
708                                processing_mode[pn] == 's' ? "(difference)" : "(deconvolution)");
709                        peakData += strlen(peakData);
710                } else 
711                        *peakData++ = '\n';
712                if ( taperType )
713                        sprintf( peakData, "Tapered %5.2lf%% (%s)\n", taperFrac*100, taperName[taperType] );
714                else
715                        sprintf( peakData, "Tapering disabled\n" );
716                peakData += strlen(peakData);
717                if ( smooth ) 
718                        sprintf( peakData, "Smoothed (%5.2lf Hz)\n", smooth_width );
719                else
720                        sprintf( peakData, "Smoothing disabled\n");                     
721                peakData += strlen(peakData);
722                if ( lowpoles )
723                        sprintf( peakData, "LowCut  (%d poles) %lf\n", lowpoles, lowcutoff );
724                else
725                        sprintf( peakData, "LowCut disabled\n" );
726                peakData += strlen(peakData);
727                if ( highpoles )
728                        sprintf( peakData, "HighCut (%d poles) %lf\n", highpoles, highcutoff );
729                else
730                        sprintf( peakData, "HighCut disabled\n" );
731                peakData += strlen(peakData);
732                specData = peakData;
733
734                if ( numPeaks > 0 ) {
735                        findmax( &ewspec, top3 );
736                       
737                        sprintf( peakData, "Peak band: %15.3lf thru %15.3lf\n", peakBand[0], peakBand[1] );
738                        peakData += strlen(peakData);
739                        for ( i=0; i<numPeaks; i++ ) {
740                                if ( top3[i] == -1 )
741                                        sprintf( peakData, "Peak %d: Not in band\n", i+1 );
742                                else
743                                        sprintf( peakData, "Peak %d: Freq = %lf, amp = %lf\n", i+1, 
744                                                top3[i]*scale/2, ewspec.data[top3[i]] );
745                                peakData += strlen( peakData );
746                        }
747
748                        if ( OutRingKey != -1 ) {
749                                printf("Writing peaks to ring\n");
750                                GetType( "TYPE_SPECTRA_PEAKS", &(logo.type) );
751                                peakData[0] = 0;
752                                if( tport_putmsg( &OutRegion, &logo, peakData-buffer, buffer ) != PUT_OK ) {
753                                   logit("et", "%s:  Error sending writing peaks.\n",
754                                                  Argv0 );
755                                }
756                        } 
757                        if ( fp == NULL && OutRingKey == -1 )
758                                logit( "w", "ewspectra: Peaks requested, but not output means specified\n" );
759                }
760 
761                if ( outpath[0] != 0 ) {
762                        fp = fopen( outpath, "a" );
763                        if ( fp == NULL )
764                                logit( "e", "Could not open output file: '%s'\n", outpath );
765                        fwrite( buffer, peakData-buffer, 1, fp );
766                }
767                       
768                if ( spectraOut ) {                     
769                        printf("ReportSpectra %lf %lf %lf %lf\n", delta, nyquist, ndata, scale);
770                        if ( fp != NULL ) {
771                                printf("Writing spectra to file\n");
772                                write_EWTS_as_spectra_to_file( &ewspec, Argv0, fp );
773                        } 
774                        if ( OutRingKey != -1 ) {
775                                write_EWTS_as_spectra_to_ring( &ewspec, Argv0, &OutRegion, &logo );
776                        }
777                        if ( fp == NULL && OutRingKey == -1 )
778                                logit( "w", "ewspectra: Spectra output requested, but not output means specified\n" );
779                }
780               
781                if ( fp != NULL ) {
782                        fclose( fp );
783                        fp = NULL;
784                }
785                       
786                if ( ewspec.data != NULL )
787                        free( ewspec.data );
788               
789        }
790       
791        /* Free tracebuf space, if still allocated */
792        if ( tr.pBuf != NULL ) 
793                free( tr.pBuf );
794
795}
796
797/***************************************************************************
798 * ewspectra_status() builds a heartbeat or error message & puts it into      *
799 *                 shared memory.  Writes errors to log file & screen.     *
800 ***************************************************************************/
801void ewspectra_status( unsigned char type, short ierr, char *note )
802{
803   MSG_LOGO    logo;
804   char        msg[256];
805   long        size;
806   time_t      t;
807
808/* Build the message
809 *******************/
810   logo.instid = InstId;
811   logo.mod    = MyModId;
812   logo.type   = type;
813
814   time( &t );
815
816   if( type == TypeHeartBeat )
817    sprintf( msg, "%ld %ld\n%c", (long) t, (long) MyPid, (char)0);
818   else if( type == TypeError )
819   {
820    sprintf( msg, "%ld %hd %s\n%c", (long) t, ierr, note, 0);
821
822    logit( "et", "%s(%s): %s\n", Argv0, MyModName, note );
823   }
824
825   size = strlen( msg );   /* don't include the null byte in the message */
826
827/* Write the message to shared memory
828 ************************************/
829   if( tport_putmsg( &InRegion, &logo, size, msg ) != PUT_OK )
830   {
831        if( type == TypeHeartBeat ) {
832           logit("et","%s(%s):  Error sending heartbeat.\n",
833                  Argv0, MyModName );
834    }
835    else if( type == TypeError ) {
836           logit("et", "%s(%s):  Error sending error:%d.\n",
837                  Argv0, MyModName, ierr );
838    }
839   }
840
841   return;
842}
843
844/********************** Message Stacking Thread *******************
845 *           Move messages from transport to memory queue         *
846 ******************************************************************/
847thr_ret MessageStacker( void *dummy )
848{
849   long          recsize;   /* size of retrieved message             */
850   MSG_LOGO      reclogo;       /* logo of retrieved message             */
851   long      filteredSize;  /* size of message after user-filtering  */
852   unsigned char filteredType;  /* type of message after filtering       */
853   int       res;
854   int       ret;
855   char      date[100], *msgName;
856   double    start, stop;
857   int modid, len, okMsg;
858
859   /* Tell the main thread we're ok
860   ********************************/
861   MessageStackerStatus = MSGSTK_ALIVE;
862
863   /* Start main export service loop for current connection
864   ********************************************************/
865   while( 1 )
866   {
867      /* Get a message from transport ring
868      ************************************/
869      res = tport_getmsg( &InRegion, GetLogo, 1,
870                          &reclogo, &recsize, MSrawmsg, MaxMsgSize-1 );
871
872      /* Wait if no messages for us
873       ****************************/
874      if( res == GET_NONE ) {sleep_ew(100); continue;}
875     
876      /* Check return code; report errors
877      ***********************************/
878      if( res != GET_OK )
879      {
880         if( res==GET_TOOBIG )
881         {
882            sprintf( errText, "msg[%ld] i%d m%d t%d too long for target",
883                            recsize, (int) reclogo.instid,
884                (int) reclogo.mod, (int)reclogo.type );
885            ewspectra_status( TypeError, ERR_TOOBIG, errText );
886            continue;
887         }
888         else if( res==GET_MISS )
889         {
890            sprintf( errText, "missed msg(s) i%d m%d t%d in %s",(int) reclogo.instid,
891                    (int) reclogo.mod, (int)reclogo.type, inRing );
892            ewspectra_status( TypeError, ERR_MISSMSG, errText );
893         }
894         else if( res==GET_NOTRACK )
895         {
896            sprintf( errText, "no tracking for logo i%d m%d t%d in %s",
897                     (int) reclogo.instid, (int) reclogo.mod, (int)reclogo.type,
898                     inRing );
899            ewspectra_status( TypeError, ERR_NOTRACK, errText );
900         }
901      }
902     
903      res = sscanf( MSrawmsg, "%d %s %d", &modid, date, &len );
904      if ( modid != MyModId )
905        continue;
906      msgName = "ACTIVATE_MODULE";
907      okMsg = (res == 3);
908          if ( !okMsg ) {
909                        sprintf( errText, "malformed %s msg i%d m%d t%d in %s",msgName,
910                                        (int) reclogo.instid,
911                                        (int) reclogo.mod, (int)reclogo.type, inRing );
912                        ewspectra_status( TypeError, ERR_BADMSG, errText );
913                        continue;
914      }
915
916      /* Process retrieved msg (res==GET_OK,GET_MISS,GET_NOTRACK)
917      ***********************************************************/
918          start = atof( date );
919          if ( start < 0 ) { 
920                start += time(NULL);
921          } else if ( EWSConvertTime (date, &start) == EW_FAILURE ) {
922                sprintf( errText, "%s message w/ bad time (%s)", msgName, date );
923                ewspectra_status( TypeError, ERR_BADMSG, errText );
924          }
925          stop = start + len;
926 
927          process_timespan( wsh, start, stop );
928
929   } /* end of while */
930
931   /* we're quitting
932   *****************/
933error:
934   MessageStackerStatus = MSGSTK_ERR; /* file a complaint to the main thread */
935   KillSelfThread(); /* main thread will restart us */
936}
937
938int main(int argc, char **argv)
939{
940/* Other variables: */
941   int           res;
942   long          recsize;   /* size of retrieved message             */
943   MSG_LOGO      reclogo;   /* logo of retrieved message             */
944       
945        /* Look up installations of interest
946        *********************************/
947        if ( GetLocalInst( &InstId ) != 0 ) {
948          fprintf( stderr,
949                          "%s: error getting local installation id; exiting!\n",
950                           Argv0 );
951          exit( -1 );
952        }
953   if ( GetType( "TYPE_HEARTBEAT", &TypeHeartBeat ) != 0 ) {
954      fprintf( stderr,
955              "%s: Invalid message type <TYPE_HEARTBEAT>; exiting!\n", Argv0 );
956      exit( -1 );
957   }
958   if ( GetType( "TYPE_ERROR", &TypeError ) != 0 ) {
959      fprintf( stderr,
960              "%s: Invalid message type <TYPE_ERROR>; exiting!\n", Argv0 );
961      exit( -1 );
962   }
963
964        if ((progname= strrchr(*argv, (int) '/')) != (char *) 0)
965                progname++;
966        else
967                progname= *argv;
968       
969    logit_init (progname, 0, 1024, 1);
970
971    /* Check command line arguments */
972    if (argc != 2) {
973                fprintf (stderr, "Usage: %s <configfile>\n", progname);
974                return EW_FAILURE;
975    }
976       
977        /* Get our own Pid for restart purposes
978        ***************************************/
979        MyPid = getpid();
980        if(MyPid == -1)
981        {
982      logit("e", "%s: Cannot get pid; exiting!\n", Argv0);
983      return(0);
984        }
985
986        /* Read-in & interpret config file */
987        scale= white= smooth= taperType= 0;
988        num_ews_procs = 0;
989        outpath[0] = 0;
990        find_triggers = 0;
991        lowpoles = highpoles = lowcutoff = highcutoff = 0;
992        InRingKey = OutRingKey = -1;
993        start = -1;
994        wsh = init_ws( argv[1], progname, parse_my_command );
995       
996        if ( wsh == NULL )
997                /* init_ws already reported the problem */
998                return EW_FAILURE;
999
1000        if ( num_ews_procs == 0 ) {
1001                logit( "e", "No SCNLs specified; exiting\n" );
1002                ws2ts_purge( wsh, NULL, NULL );
1003                exit(1);
1004        }
1005        if ( OutRingKey == -1 && outpath[0] == 0 ) {
1006                logit( "e", "No output specified; exiting\n" );
1007                ws2ts_purge( wsh, NULL, NULL );
1008                exit(1);
1009        }
1010        if ( lowcutoff > 0 && highcutoff > 0 && lowcutoff >= highcutoff ) {
1011                logit( "e", "Low Cutoff Frequency Must Be < High Cutoff Frequency.\n" );
1012                ws2ts_purge( wsh, NULL, NULL );
1013                exit(1);
1014        }
1015        if ( OutRingKey != -1 ) {
1016                int i;
1017                for ( i=num_ews_procs-1; i>=0; i-- )
1018                        if ( *SCNL[i][2][3] == 0 )
1019                                if ( outpath[0] != 0 )
1020                                        logit( "w", "No SCNL for ring messages specified for %s.%s.%s.%s; will only write to file\n",
1021                                                SCNL[i][0][0], SCNL[i][0][1], SCNL[i][0][2], SCNL[i][0][3] );
1022                                else {
1023                                        logit( "w", "No SCNL for ring messages specified for %s.%s.%s.%s; aborting this SCNL\n",
1024                                                SCNL[i][0][0], SCNL[i][0][1], SCNL[i][0][2], SCNL[i][0][3] );
1025                                        num_ews_procs--;
1026                                        if ( i < num_ews_procs ) {
1027                                                int j,k;
1028                                                for ( j=0; j<3; j+=(binary[i]?1:2) )
1029                                                        for ( k=0; j<4; k++ )
1030                                                                strcpy( SCNL[i][j][k], SCNL[num_ews_procs][j][k] );
1031                                                binary[i] = binary[num_ews_procs];
1032                                                processing_mode[i] = processing_mode[num_ews_procs];
1033                                        }
1034                                }
1035                if ( num_ews_procs == 0 ) {
1036                        logit( "e", "All SCNLs aborted; exiting\n" );
1037                        ws2ts_purge( wsh, NULL, NULL );
1038                        exit(1);
1039                }
1040               
1041        }
1042        if ( find_triggers ) {
1043                int geti = GetInst( "INST_WILDCARD", &(GetLogo[0].instid) );
1044                int getm = GetModId( "MOD_WILDCARD", &(GetLogo[0].mod) );
1045                int gett = GetType( "TYPE_ACTIVATE_MODULE", &(GetLogo[0].type) );
1046                int getMsg = 0;
1047                if ( ( MSrawmsg = (char *) malloc(MaxMsgSize) ) ==  NULL )
1048                {
1049                  logit( "e", "%s: error allocating MSrawmsg; exiting!\n",
1050                                 Argv0 );
1051                  getMsg = 1;
1052                }
1053                if ( geti || getm || gett || getMsg ) {
1054                        if ( geti )
1055                                logit( "e", "%s: INST_WILDCARD unknown; exiting!\n", Argv0 );
1056                        if ( getm )
1057                                logit( "e", "%s: MOD_WILDCARD unknown; exiting!\n", Argv0 );
1058                        if ( gett )
1059                                logit( "e", "%s: TYPE_ACTIVATE_MODULE unknown; exiting!\n", Argv0 );
1060                        ws2ts_purge( wsh, NULL, NULL );
1061                        exit(1);
1062                }
1063                tport_attach( &InRegion, InRingKey );           
1064        }
1065        if ( OutRingKey != -1 ) {
1066                tport_attach( &OutRegion, OutRingKey );
1067        }
1068       
1069        if ( start != -1 )
1070                process_timespan( wsh, start, stop );
1071
1072        /* Here's where, if InputRing is defined, we'd listen for
1073                COMPUTE_SPECTRA messages, and call process_timespan
1074                for their timespans */
1075        if ( find_triggers ) {
1076
1077           /* step over all messages from transport ring
1078           *********************************************/
1079           /* As Lynn pointed out: if we're restarted by startstop after hanging,
1080                  we should throw away any of our messages in the transport ring.
1081                  Else we could end up re-sending a previously sent message, causing
1082                  time to go backwards... */
1083           do
1084           {
1085                 res = tport_getmsg( &InRegion, GetLogo, 1,
1086                                                         &reclogo, &recsize, MSrawmsg, MaxMsgSize-1 );
1087           } while (res !=GET_NONE);
1088       
1089           /* One heartbeat to announce ourselves to statmgr
1090           ************************************************/
1091           ewspectra_status( TypeHeartBeat, 0, "" );
1092           time(&MyLastBeat);
1093       
1094       
1095           /* Start the message stacking thread if it isn't already running.
1096                ****************************************************************/
1097           if (MessageStackerStatus != MSGSTK_ALIVE )
1098           {
1099                 if ( StartThread(  MessageStacker, (unsigned)THREAD_STACK, &tidStacker ) == -1 )
1100                 {
1101                   logit( "e",
1102                                  "%s(%s): Error starting  MessageStacker thread; exiting!\n",
1103                          Argv0, MyModName );
1104                   tport_detach( &InRegion );
1105                   tport_detach( &OutRegion );
1106                   return( -1 );
1107                 }
1108                 MessageStackerStatus = MSGSTK_ALIVE;
1109           }
1110       
1111       
1112           /* Start main ringdup service loop
1113           **********************************/
1114           while( tport_getflag( &InRegion ) != TERMINATE  &&
1115                          tport_getflag( &InRegion ) != MyPid         )
1116           {
1117                 /* Beat the heart into the transport ring
1118                  ****************************************/
1119                  time(&now);
1120                  if (difftime(now,MyLastBeat) > (double)HeartBeatInt )
1121                  {
1122                          ewspectra_status( TypeHeartBeat, 0, "" );
1123                  time(&MyLastBeat);
1124                  }
1125       
1126                  /* take a brief nap; added 970624:ldd
1127                   ************************************/
1128                  sleep_ew(500);
1129           } /*end while of monitoring loop */
1130               
1131        }
1132               
1133        /* Clean up after ourselves */
1134        if ( InRingKey != -1 )
1135                tport_detach( &InRegion );
1136        if ( OutRingKey != -1 )
1137                tport_detach( &OutRegion );
1138        ws2ts_purge( wsh, NULL, NULL );
1139}
Note: See TracBrowser for help on using the repository browser.