source: trunk/src/seismic_processing/ewspectra/ewspectra.c @ 4194

Revision 4194, 34.1 KB checked in by scott, 10 years ago (diff)

Ewspectra, compute_spectra & sniffspectra

Line 
1/******************************************************************************
2 *
3 *      File:                   ewspectra.c
4 *
5 *      Function:               Earthworm module for computing spectra from waveserver data
6 *
7 *      Author(s):              Scott Hunter, ISTI
8 *
9 *      Source:                 Started anew.
10 *
11 *      Notes:                 
12 *
13 *      Change History:
14 *                      4/26/11 Started source
15 *     
16 *****************************************************************************/
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <time.h>
21#include <earthworm_defs.h>
22#include <ws2ts.h>
23#include <math.h>
24#include <string.h>
25#include <kom.h>
26#include <ew_spectra.h>
27#include <transport.h>
28#include "iir.h"
29#include <ew_spectra_io.h>
30
31#define MAX_EWS_PROCS   10      /* maximum number of spectra processing definitions */
32
33#define MSGSTK_OFF    0              /* MessageStacker has not been started      */
34#define MSGSTK_ALIVE  1              /* MessageStacker alive and well            */
35#define MSGSTK_ERR   -1              /* MessageStacker encountered error quit    */
36volatile int MessageStackerStatus = MSGSTK_OFF;
37
38/* Error messages used by export
39 ***********************************/
40#define  ERR_MISSMSG       0   /* message missed in transport ring        */
41#define  ERR_TOOBIG        1   /* retreived msg too large for buffer      */
42#define  ERR_NOTRACK       2   /* msg retreived; tracking limit exceeded  */
43#define  ERR_BADMSG        3   /* message w/ bad timespan                 */
44static char  errText[256];     /* string for log/error messages           */
45
46#define backward 0
47
48char *progname;
49
50/* Things to look up in the earthworm.h tables with getutil.c functions
51 **********************************************************************/
52static long          InRingKey;     /* key of transport ring for input    */
53static long          OutRingKey;    /* key of transport ring for output   */
54static unsigned char InstId;        /* local installation id              */
55static unsigned char MyModId;       /* Module Id for this program         */
56
57static  SHM_INFO  InRegion;     /* shared memory region to use for input  */
58static  SHM_INFO  OutRegion;    /* shared memory region to use for output */
59
60MSG_LOGO  GetLogo;                      /* requesting module,type,instid */
61pid_t MyPid;        /* Our own pid, sent with heartbeat for restart purposes */
62       
63time_t now;        /* current time, used for timing heartbeats */
64time_t MyLastBeat;         /* time of last local (into Earthworm) hearbeat */
65
66static int     HeartBeatInt;        /* seconds between heartbeats        */
67static unsigned char TypeHeartBeat;
68static unsigned char TypeError;
69
70static char *Rawmsg = NULL;          /* "raw" retrieved msg for main thread      */
71static char *MSrawmsg = NULL;        /* MessageStacker's "raw" retrieved message */
72
73int static long    MaxMsgSize = 100;          /* max size for input msgs    */
74
75WShandle *wsh;
76
77#define THREAD_STACK 8192
78static unsigned tidStacker;          /* Thread moving messages from transport */
79
80int numPeaks = 0;
81double peakBand[2];
82
83
84
85/*****************************************************************************
86   Discrete fourier transform subroutine
87   based on the fast fourier transform algorithm
88   which computes the quantity:
89
90             n-1
91        x(j)=sum  x(k) exp( 2*pi*i*isign*k*j/n )
92             k=0
93
94   n must be a integral power of 2.
95*****************************************************************************/
96static void ah_four(double *data, int n, int isign) {
97        int ip0, ip1, ip2, ip3, i3rev;
98        int i1, i2a, i2b, i3;
99        double sinth, wstpr, wstpi, wr, wi, tempr, tempi, theta;
100
101        ip0= 2;
102        ip3= ip0*n;
103        i3rev= 1;
104        for (i3= 1; i3 <= ip3; i3+= ip0) {
105                if (i3 < i3rev) {
106                        tempr= data[i3-1];
107                        tempi= data[i3];
108                        data[i3-1]= data[i3rev-1];
109                        data[i3]= data[i3rev];
110                        data[i3rev-1]= tempr;
111                        data[i3rev]= tempi;
112                }
113                ip1= ip3/2;
114                do {
115                        if (i3rev <= ip1)
116                                break;
117                        i3rev-= ip1;
118                        ip1/= 2;
119                } while (ip1 >= ip0);
120                i3rev+= ip1;
121        }
122        ip1= ip0;
123        while (ip1 < ip3) {
124                ip2= ip1 * 2;
125                theta= 6.283185/((double) (isign*ip2/ip0));
126                sinth= (double) sin((double) (theta/2.));
127                wstpr= -2.*sinth*sinth;
128                wstpi= (double) sin((double) theta);
129                wr= 1.;
130                wi= 0.;
131                for (i1= 1; i1 <= ip1; i1+= ip0) {
132                        for (i3= i1; i3 <ip3; i3+= ip2) {
133                                i2a= i3;
134                                i2b= i2a+ip1;
135                                tempr= wr*data[i2b-1] - wi*data[i2b];
136                                tempi= wr*data[i2b] + wi*data[i2b-1];
137                                data[i2b-1]= data[i2a-1] - tempr;
138                                data[i2b]= data[i2a] - tempi;
139                                data[i2a-1]+= tempr;
140                                data[i2a]+= tempi;
141                        }
142                        tempr= wr;
143                        wr= wr*wstpr - wi*wstpi + wr;
144                        wi= wi*wstpr + tempr*wstpi + wi;
145                }
146                ip1=ip2;
147        }
148        return;
149}
150
151/*****************************************************************************
152   This subroutine takes a real time series and computes its
153   discrete fourier transform. The time series has n real points
154   and the transform has n/2+1 complex values starting with
155   frequency zero and ending at the nyquist frequency.
156
157   Parameters:
158        x       floating point data array
159        n       the number of samples, an integral power of 2
160*****************************************************************************/
161static void ah_fftr(double *x, int n) {
162        int nn, is, nm, j, i;
163        int k1j, k1i, k2j, k2i;
164        double s, fn, ex, wr, wi, wwr, wrr, wwi, a1, a2, b1, b2;
165
166        nn= n/2;
167        is= 1;
168        ah_four(x, nn, is);
169        nm= nn/2;
170        s= x[0];
171        x[0]+= x[1];
172        x[n]= s - x[1];
173        x[1]= 0.0 ;
174        x[n+1]= 0.0;
175        x[nn+1]= (-x[nn+1]);
176        fn= (double) n;
177        ex= 6.2831853/fn;
178        j= nn;
179        wr= 1.0;
180        wi= 0.0;
181        wwr= (double) cos((double) ex);
182        wwi= (double) (-sin((double) ex));
183        for (i= 2; i <= nm; i++) {
184                wrr= wr*wwr-wi*wwi;
185                wi= wr*wwi+wi*wwr;
186                wr= wrr;
187                k1j= 2*j-1;
188                k1i= 2*i-1;
189                k2j= 2*j;
190                k2i= 2*i;
191                a1= 0.5*(x[k1i-1]+x[k1j-1]);
192                a2= 0.5*(x[k2i-1]-x[k2j-1]);
193                b1= 0.5*(-x[k1i-1]+x[k1j-1]);
194                b2= 0.5*(-x[k2i-1]-x[k2j-1]);
195                s= b1;
196                b1= b1*wr+b2*wi;
197                b2= b2*wr-s*wi;
198                x[k1i-1]= a1-b2;
199                x[k2i-1]= (-a2-b1);
200                x[k1j-1]= a1+b2;
201                x[k2j-1]=  a2-b1;
202                j-= 1;
203        }
204        return;
205}
206
207int scale;                                                      /* 0=no scaling */
208int white;                                                      /* 1=scale amplitude output only by mean */
209double start, stop;                                     /* time period from config */
210int smooth;                                                     /* 0=no smoothing */
211double smooth_width;                            /* width (in secs) of smoothing window */
212char SCNL[MAX_EWS_PROCS][3][4][10];     /* SCNLs to process */
213char binary[MAX_EWS_PROCS];                     /* 1=use 2 SCNLs for processing */
214char processing_mode[MAX_EWS_PROCS];/* what processing to do:
215                                                                                p = Plain (unary)
216                                                                                d = difference (binary)
217                                                                                */
218int taperType;                                          /* what kind of tapering to do:
219                                                                                BARTLETT=1, HANNING=2,
220                                                                                PARZAN=3, BMHARRIS=4
221                                                                                */
222double taperFrac;                                       /* fraction of each end to taper */
223int num_ews_procs;                                      /* # of processing requests */
224char outpath[100];                                      /* path for output file */
225FILE *fp;                                                       /* output file pointer */
226char inRing[MAX_RING_STR];                      /* name of input ring */
227char outRing[MAX_RING_STR];                     /* name of output ring */
228int find_triggers;
229double highcutoff, lowcutoff;           /* freq for high and low cutoffs */
230int highpoles, lowpoles;                        /* # poles for high & low cutoffs (0=none) */
231
232char *taperName[5] = {"", "BARTLETT", "HANNING", "PARZAN", "BMHARRIS"};
233char *Argv0 = "ewspectra";
234
235static char    MyModName[MAX_MOD_STR];       /* speak as this module name/id      */
236int spectraOut = 0;
237
238/***********************************************************************
239 * parse_my_command () - handle config file commands not handled in ws2ts
240 *              return 0 if handled, 1 if unknown or an error
241 ***********************************************************************/
242int parse_my_command() {
243        if (k_its("Scale")) {
244                scale = 1;
245        } else if (k_its("White")) {
246                white = 1;
247        } else if (k_its("Smooth")) {
248                smooth = 1;
249                smooth_width = k_val();
250        } else if (k_its("Taper")) {
251                char *taperTypeName;
252                taperTypeName = k_str();
253                for ( taperType = 4; taperType>0; taperType-- )
254                        if ( strcmp( taperTypeName, taperName[taperType] )==0 )
255                                break;
256                if ( taperType == 0 ) {
257                        logit( "e", "Invalid type of taper: '%s'\n", taperTypeName );
258                        return 1;
259                }
260                taperFrac = k_val();
261        } else if (k_its("MyModuleId")) {
262                strcpy( MyModName, k_str() );
263                if ( GetModId( MyModName, &MyModId ) != 0 ) {
264                  logit( "e", "%s: Invalid module name <%s>; exiting!\n",
265                                   Argv0, MyModName );
266                  return 1;
267                }
268        } else if (k_its("DeconvolveSpectraSCNLs") || k_its("DiffSpectraSCNLs")
269                || k_its("PlainSpectraSCNL")) {
270                int i,j,step;
271                if ( num_ews_procs >= MAX_EWS_PROCS ) {
272                        logit( "e", "Too many processing requests; ignoring '%s'\n", k_com() );
273                        return 0;
274                }
275                step = k_its("PlainSpectraSCNL") ? 2 : 1;
276                *SCNL[num_ews_procs][2][3] = 0;
277                for ( i=0; i<3; i+=step )
278                        for ( j=0; j<4; j++ ) {
279                                char *str = k_str();
280                                if ( str == NULL )
281                                        if ( i < 2 ) {
282                                                logit( "e", "Incomplete source SCNL for %s\n", k_com() );
283                                                return 0;
284                                        } else if ( j > 0 ) {
285                                                logit( "e", "Target SCNL must be fully specified for %s\n", k_com() );
286                                                return 0;
287                                        } else
288                                                break;
289                                strcpy( SCNL[num_ews_procs][i][j], str );
290                        }
291                processing_mode[num_ews_procs] = step==2 ? 'p' : k_its("DeconvolveSpectraSCNLs") ? 'd' : 's';
292                binary[num_ews_procs] = (step == 1);
293                num_ews_procs++;
294        } else if (k_its("TimeSpan")) {
295                char *arg1s = k_str();
296                double arg1 = atof( arg1s );
297                if ( arg1 < 0 ) { 
298                        start = time(NULL) + arg1;
299                } else if ( EWSConvertTime (arg1s, &start) == EW_FAILURE ) {
300                        return 1;
301                }
302                stop = start + k_val();
303        } else if (k_its("OutFile")) {
304                strcpy( outpath, k_str() );
305        } else if (k_its("InRing")) {
306                find_triggers = 1;
307                strcpy( inRing, k_str() );
308                if ( ( InRingKey = GetKey(inRing) ) == -1 ) {
309                        logit( "e", "%s:  Invalid ring name <%s>; exiting!\n",
310                                         Argv0, inRing);
311                        return 1;
312                }
313        } else if (k_its("OutRing")) {
314                strcpy( outRing, k_str() );
315            if ( ( OutRingKey = GetKey(outRing) ) == -1 ) {
316                        logit( "e", "%s:  Invalid ring name <%s>; exiting!\n",
317                                         Argv0, outRing);
318                        return 1;
319                }
320        } else if (k_its("HighCut")) {
321                highcutoff = k_val();
322                if ( highcutoff <= 0 ) {
323                        logit( "e", "HighCut cutoff frequency must be > 0\n" );
324                        return 1;
325                }
326                highpoles = k_int();
327                if ( highpoles % 2 || highpoles < 2 || highpoles > 12 )  {
328                        logit( "e", "# poles for HighCut must be 2, 4, 6, 8, 10 or 12\n" );
329                        return 1;
330                }
331        } else if (k_its("LowCut")) {
332                lowcutoff = k_val();
333                if ( lowcutoff <= 0 ) {
334                        logit( "e", "LowCut cutoff frequency must be > 0\n" );
335                        return 1;
336                }
337                lowpoles = k_int();
338                if ( lowpoles % 2 || lowpoles < 2 || lowpoles > 12 )  {
339                        logit( "e", "# poles for LowCut must be 2, 4, 6, 8, 10 or 12\n" );
340                        return 1;
341                }
342        } else if (k_its("ReportSpectra")) {
343                spectraOut = 1;
344        } else if (k_its("ReportPeaks")) {
345                numPeaks = k_int();
346                peakBand[0] = k_val();
347                peakBand[1] = k_val();
348        } else
349                return 1;
350        return 0;
351}
352
353
354
355/***********************************************************************
356 * deconvolve_spectra () - deconvolve spectra b into a
357 *              return 0 if no problems, 1 otherwise
358 ***********************************************************************/
359int deconvolve_spectra( EW_TIME_SERIES *spec_a, EW_TIME_SERIES *spec_b ) {
360        int ndata = spec_a->dataCount * 2;
361        int i;
362        double denom, result_0, result_1;
363        double *a = spec_a->data, *b = spec_b->data;
364        for ( i=0; i<=ndata; i+=2 ) {
365                denom = b[i]*b[i] + b[i+1]*b[i+1];
366                if ( denom == 0.0 )
367                        a[i] = a[i+1] = 0.0;
368                else {
369                        result_0 = a[i]*b[i] + a[i+1]*b[i+1];
370                        result_1 = b[i]*a[i+1] - a[i]*b[i+1];
371                        a[i]   = result_0 / denom;
372                        a[i+1] = result_1 / denom;
373                }
374                       
375        }
376        return 0;
377}
378
379static double iScale( int i, double t, void* scale ) {
380        return i*(*(double*)scale);
381}
382
383/***********************************************************************
384 * smooth_spectra () - smooth the spectra timeseries
385 *              return 0 if no problems, 1 otherwise
386 ***********************************************************************/
387int smooth_spectra( EW_TIME_SERIES * ewspec, int mode ) {
388        double *specsm, even_sum, odd_sum;
389        int i, imod;
390        double delta = 1/ewspec->trh2x.samprate;
391        int smoother= smooth_width/delta;
392        int ndata = ewspec->dataCount * 2;
393        double *spec = ewspec->data;
394       
395        /* make sure that smoother is even */
396        smoother= (smoother/2)*2;
397        if ((specsm= (double *) malloc((smoother)*sizeof(double))) == NULL ) {
398                logit("et","%s: memory allocation error\n", progname);
399                return 1;
400        }
401        /* Create & fill smoothing window */
402        memcpy( specsm, spec, smoother*2*sizeof(double) );
403        even_sum = odd_sum = 0;
404        for ( i=0; i<smoother*2; i+=2) {
405                if ( mode & EWTS_MODE_FIRST )
406                        even_sum   += spec[i];
407                if ( mode & EWTS_MODE_SECOND )
408                        odd_sum += spec[i+1];
409        }
410               
411        for (i= smoother, imod=0; i <= ndata-smoother; i+= 2, imod = (imod+2)%(smoother*2) ) {
412                if ( mode & EWTS_MODE_FIRST ) {
413                        even_sum += spec[i+smoother];           /* Add next item to window sums */
414                        spec[i]   = even_sum / (smoother+1);/* Store next smoothed result */
415                        even_sum -= specsm[imod];                       /* Remove oldest value from sums... */
416                        specsm[imod]   = spec[i+smoother];      /*... and replace it with newest in window */
417                }
418                if ( mode & EWTS_MODE_SECOND ) {
419                        odd_sum  += spec[i+smoother+1];         /* Add next item to window sums */
420                        spec[i+1] = odd_sum  / (smoother+1);/* Store next smoothed result */
421                        odd_sum -= specsm[imod+1];              /* Remove oldest value from sums... */
422                        specsm[imod+1] = spec[i+smoother+1];/*... and replace it with newest in window */
423                }
424        }
425        free(specsm);
426        return 0;
427}
428
429/***********************************************************************
430 * convert_spectra () - convert complex spectra to amp/phase representation
431 *              return 0 if no problems, 1 otherwise
432 ***********************************************************************/
433int convert_spectra( EW_TIME_SERIES * ewspec ) {
434        int i, ndata = ewspec->dataCount * 2;
435        double *spec = ewspec->data;
436        double amp, phase;
437        for (i= 0; i <= ndata; i+= 2) {
438                amp= (double) sqrt((double) (spec[i]*spec[i]+spec[i+1]*spec[i+1]));
439                phase= (double) atan2(spec[i+1], spec[i]);
440                spec[i]= amp;
441                spec[i+1]= phase;
442        }
443        ewspec->dataType = EWTS_TYPE_AMPPHS;
444        return 0;
445}
446
447/* Similar thing for white */
448
449/***********************************************************************
450 * calc_spectra () - calculate the spectra for a timeseries, store in
451 *      provided timeseries
452 *              return 0 if no problems, 1 otherwise
453 ***********************************************************************/
454int calc_spectra( EW_TIME_SERIES * ewts, EW_TIME_SERIES * ewspec ) 
455{
456        double *spec;
457        int i, ndata;
458        /*double delta = 1/ewts->trh2x.samprate;*/
459       
460        *ewspec = *ewts;
461
462        ndata = ewts->dataCount;
463
464        /* get nearest power of 2 for listed data length */
465        for (i= 1; (int) pow((double) 2., (double) i) < ewts->dataCount; i++);
466        ndata= (int) pow((double) 2.,(double) (i+1));
467        ewspec->dataCount = ndata/2;
468        ewspec->dataType = EWTS_TYPE_COMPLEX;
469
470        /* allocate space for data and smoothing area */
471        if ((spec= (double *) malloc((unsigned) (ndata+2)*sizeof(double))) == NULL ) {
472                logit("et","%s: memory allocation error\n", progname);
473                return 1;
474        }
475        ewspec->data = spec;
476        memcpy( spec, ewts->data, ewts->dataCount*sizeof(double) );
477        bzero( spec+ewts->dataCount, sizeof(double)*(ndata-ewts->dataCount+2) );
478
479        ah_fftr(spec, ndata);
480       
481        return 0;
482}
483
484
485void findmax(EW_TIME_SERIES *ewspec, int top3[])
486{
487        double *arr = ewspec->data;
488        int n = ewspec->dataCount;
489        double prev = arr[0];
490        int i, j, cap, locap;
491        int goup = 1;
492        double *top3val;
493
494        double delta = 1/ewspec->trh2x.samprate;
495        double nyquist= 1.0/(2.0*delta);
496        double ndata = ewspec->dataCount - 1;
497        double scale = nyquist/ndata;
498       
499        printf( "Finding %d peaks\n", numPeaks );
500        top3val = malloc( sizeof(double) * numPeaks );
501        if ( top3val == NULL ) {
502                logit( "e", "Failed to allocate memory for peak-finding\n" );
503                exit(1);
504        }
505        for ( i=0; i<numPeaks; i++ ) {
506                top3[i] = -1;
507                top3val[i] = -1;
508        }
509
510        locap = (peakBand[0] / scale)*2;
511        if ( locap < 0 )
512                locap = 0;
513        if ( locap % 2 )
514                locap--;
515        cap = (peakBand[1] / scale)*2;
516        if ( cap % 2 )
517                cap++;
518        if ( cap > n )
519                cap = n;
520        for( i=0; i <= cap; i+=2)
521        {
522                double l,r;
523                l = (i - 2 < 0)?arr[0]:arr[i-2];
524                r = (i + 2 >= n)?arr[n-2]:arr[i+2];
525                double cur = (l + 2*arr[i] + r)/4;
526                if(goup) {
527                        if(prev > cur && i > 0) {
528                                int newidx = i-2;
529                                double newval = arr[newidx];
530                                if ( newidx >= locap && newval > top3val[numPeaks-1] ) {
531                                        for ( j=numPeaks-2; j>=0 && newval > top3val[j]; j-- ) {
532                                                top3val[j+1] = top3val[j];
533                                                top3[j+1] = top3[j];
534                                        }
535                                        top3[j+1] = newidx;
536                                        top3val[j+1] = newval;
537                                }
538                                goup = 0;
539                        }
540                } else {
541                        if(prev < cur)
542                                goup = 1;
543                }
544                prev = cur;
545        }
546        free( top3val );
547}
548
549/***********************************************************************
550 * process_timespan () - process the specified timespan for data gotten
551 *              for the SCNLs specified in config from the set of waveservers
552 *              in wsh
553 ***********************************************************************/
554void process_timespan(  WShandle *wsh, double start, double stop ) 
555{
556        int ret, i, pn;
557        EW_TIME_SERIES *ewts = NULL, *ewts2 = NULL, ewspec, ewspec2;
558        TRACE_REQ tr;
559        int *top3 = malloc( sizeof(int)*numPeaks );
560
561        tr.pBuf = NULL;
562        /* Try each of the processing requests from config */
563        for ( pn=0; pn < num_ews_procs; pn++ ) {
564       
565                /* Get the tracebufs for the first/only SCNL */
566                tr.bufLen = (stop - start)*1000000;
567                if ( tr.pBuf == NULL ) {
568                        tr.pBuf = malloc( tr.bufLen );
569                        if ( tr.pBuf == NULL ) {
570                                logit( "et", "%s: failed to allocate waveserver buffer\n", progname );
571                                continue;
572                        }
573                }
574                tr.timeout = 500;
575                tr.reqStarttime = start;
576                tr.reqEndtime = stop;           
577                ret = getTraceBufsFromWS( wsh, SCNL[pn][0][0], SCNL[pn][0][1], 
578                                                                        SCNL[pn][0][2], SCNL[pn][0][3], &tr );
579       
580                /* Convert to a timeseries */
581                if ( ret == 0 )
582                        ewts = convertTraceBufReq2WEW( &tr, 0 );
583                else 
584                        continue; /* move on to next SCNL */
585                       
586                if ( ewts == NULL )
587                        continue; /* move on to next SCNL */
588                if ( 1 ) {
589                        char date[2][20];
590                        EWSUnConvertTime( date[0], tr.actStarttime );
591                        EWSUnConvertTime( date[1], tr.actEndtime);
592                        EWSUnConvertTime( date[0], ewts->trh2x.starttime );
593                        EWSUnConvertTime( date[1], ewts->trh2x.endtime );
594                }
595
596                /* Handle the second timeseries, if present */
597                if ( binary[pn] ) {
598                        ret = getTraceBufsFromWS( wsh, SCNL[pn][1][0], SCNL[pn][1][1], 
599                                                                                SCNL[pn][1][2], SCNL[pn][1][3], &tr );
600                        if ( ret == 0 )
601                                ewts2 = convertTraceBufReq2WEW( &tr, 0 );       
602                        else {
603                                ws2ts_purge( NULL, NULL, ewts );
604                                continue;
605                        }
606                        if ( ewts2 == NULL ) {
607                                ws2ts_purge( NULL, NULL, ewts );
608                                continue;
609                        }
610                        if ( 1 ) {
611                                char date[2][20];
612                                EWSUnConvertTime( date[0], tr.actStarttime );
613                                EWSUnConvertTime( date[1], tr.actEndtime);
614                                EWSUnConvertTime( date[0], ewts2->trh2x.starttime );
615                                EWSUnConvertTime( date[1], ewts2->trh2x.endtime );
616                        }
617                }
618               
619                /* Free tracebuf space */
620                free( tr.pBuf );
621                tr.pBuf = NULL;
622               
623                /* Complete second timeseries for diff */
624                if ( processing_mode[pn] == 's' ) {
625                        demean_EWTS( *ewts );
626                        if ( taperType )
627                                taper_EWTS( *ewts, taperType, taperFrac, EWTS_MODE_BOTH );
628                        demean_EWTS( *ewts2 );
629                        if ( taperType )
630                                taper_EWTS( *ewts2, taperType, taperFrac, EWTS_MODE_BOTH );
631                       
632                        subtract_from_EWTS( *ewts, *ewts2, EWTS_MODE_BOTH );
633                               
634                        ws2ts_purge( NULL, NULL, ewts2 );
635                }               
636
637                /* Demean, filter, taper, & produce spectra */
638                demean_EWTS( *ewts );   
639                if ( highpoles | lowpoles ) {
640                        if ( 0 && taperType )
641                                taper_EWTS( *ewts, taperType, taperFrac, EWTS_MODE_BOTH );
642                        iir( ewts, lowcutoff, lowpoles, highcutoff, highpoles );
643                        demean_EWTS( *ewts );
644                }
645                if ( taperType )
646                        taper_EWTS( *ewts, taperType, taperFrac, EWTS_MODE_BOTH );
647                calc_spectra( ewts, &ewspec ); 
648                ws2ts_purge( NULL, NULL, ewts );
649               
650                if ( processing_mode[pn] == 'd' ) {
651                        demean_EWTS( *ewts2 );
652                        if ( highpoles | lowpoles ) {
653                                if ( 0 && taperType )
654                                        taper_EWTS( *ewts2, taperType, taperFrac, EWTS_MODE_BOTH );
655                                iir( ewts2, lowcutoff, lowpoles, highcutoff, highpoles );
656                                demean_EWTS( *ewts2 );
657                        }
658                        if ( taperType )
659                                taper_EWTS( *ewts2, taperType, taperFrac, EWTS_MODE_BOTH );
660                        calc_spectra( ewts2, &ewspec2 );
661                        ws2ts_purge( NULL, NULL, ewts2 );
662                       
663                        deconvolve_spectra( &ewspec, &ewspec2 );
664                        if ( ewspec2.data != NULL )
665                                free( ewspec2.data );
666                }
667               
668                convert_spectra( &ewspec );
669                if ( smooth )
670                        smooth_spectra( &ewspec, EWTS_MODE_FIRST | EWTS_MODE_SECOND );
671               
672                double delta = 1/ewspec.trh2x.samprate;
673                double nyquist= 1.0/(2.0*delta);
674                double ndata = ewspec.dataCount - 1;
675                double scale = nyquist/ndata;
676               
677                if ( OutRingKey != -1 ) {
678                        memcpy( ewspec.trh2x.sta,  SCNL[pn][2][0], TRACE2_STA_LEN );
679                        memcpy( ewspec.trh2x.net,  SCNL[pn][2][1], TRACE2_NET_LEN );
680                        memcpy( ewspec.trh2x.chan, SCNL[pn][2][2], TRACE2_CHAN_LEN );
681                        memcpy( ewspec.trh2x.loc,  SCNL[pn][2][3], TRACE2_LOC_LEN );
682                }
683               
684                MSG_LOGO logo;
685                logo.instid = InstId;
686                logo.mod    = MyModId;
687               
688                char date[2][20];
689                char buffer[2000], line[100], *specData = buffer, *peakData = buffer;
690                EWSUnConvertTime( date[0], start );
691                EWSUnConvertTime( date[1], stop );
692                sprintf( peakData, "Requested start=%15s\tend=%15s\n", date[0], date[1] );
693                peakData += strlen(peakData);
694                EWSUnConvertTime( date[0], ewspec.trh2x.starttime );
695                EWSUnConvertTime( date[1], ewspec.trh2x.endtime );
696                sprintf( peakData, "Actual    start=%15s\tend=%15s\n", 
697                        date[0], date[1] );
698                peakData += strlen(peakData);   
699                sprintf( peakData, "Sampling  nsamp=%15ld\trate=%15.3lf\n", 
700                        ewspec.dataCount, ewspec.trh2x.samprate );
701                peakData += strlen(peakData);
702                sprintf( peakData, "Source 1: %s.%s.%s.%s", 
703                        SCNL[pn][0][0], SCNL[pn][0][1], SCNL[pn][0][2], SCNL[pn][0][3] );
704                peakData += strlen(peakData);
705                if ( binary[pn] ) {
706                        sprintf( peakData, "\tSource 2: %s.%s.%s.%s\t%s\n", 
707                                SCNL[pn][1][0], SCNL[pn][1][1], SCNL[pn][1][2], SCNL[pn][1][3],
708                                processing_mode[pn] == 's' ? "(difference)" : "(deconvolution)");
709                        peakData += strlen(peakData);
710                } else 
711                        *peakData++ = '\n';
712                if ( taperType )
713                        sprintf( peakData, "Tapered %5.2lf%% (%s)\n", taperFrac*100, taperName[taperType] );
714                else
715                        sprintf( peakData, "Tapering disabled\n" );
716                peakData += strlen(peakData);
717                if ( smooth ) 
718                        sprintf( peakData, "Smoothed (%5.2lf Hz)\n", smooth_width );
719                else
720                        sprintf( peakData, "Smoothing disabled\n");                     
721                peakData += strlen(peakData);
722                if ( lowpoles )
723                        sprintf( peakData, "LowCut  (%d poles) %lf\n", lowpoles, lowcutoff );
724                else
725                        sprintf( peakData, "LowCut disabled\n" );
726                peakData += strlen(peakData);
727                if ( highpoles )
728                        sprintf( peakData, "HighCut (%d poles) %lf\n", highpoles, highcutoff );
729                else
730                        sprintf( peakData, "HighCut disabled\n" );
731                peakData += strlen(peakData);
732                specData = peakData;
733
734                if ( numPeaks > 0 ) {
735                        findmax( &ewspec, top3 );
736                       
737                        sprintf( peakData, "Peak band: %15.3lf thru %15.3lf\n", peakBand[0], peakBand[1] );
738                        peakData += strlen(peakData);
739                        for ( i=0; i<numPeaks; i++ ) {
740                                if ( top3[i] == -1 )
741                                        sprintf( peakData, "Peak %d: Not in band\n", i+1 );
742                                else
743                                        sprintf( peakData, "Peak %d: Freq = %lf, amp = %lf\n", i+1, 
744                                                top3[i]*scale/2, ewspec.data[top3[i]] );
745                                peakData += strlen( peakData );
746                        }
747
748                        if ( OutRingKey != -1 ) {
749                                printf("Writing peaks to ring\n");
750                                GetType( "TYPE_SPECTRA_PEAKS", &(logo.type) );
751                                peakData[0] = 0;
752                                if( tport_putmsg( &OutRegion, &logo, peakData-buffer, buffer ) != PUT_OK ) {
753                                   logit("et", "%s:  Error sending writing peaks.\n",
754                                                  Argv0 );
755                                }
756                        } 
757                        if ( fp == NULL && OutRingKey == -1 )
758                                logit( "w", "ewspectra: Peaks requested, but not output means specified\n" );
759                }
760 
761                if ( outpath[0] != 0 ) {
762                        fp = fopen( outpath, "a" );
763                        if ( fp == NULL )
764                                logit( "e", "Could not open output file: '%s'\n", outpath );
765                        fwrite( buffer, peakData-buffer, 1, fp );
766                }
767                       
768                if ( spectraOut ) {                     
769                        printf("ReportSpectra %lf %lf %lf %lf\n", delta, nyquist, ndata, scale);
770                        if ( fp != NULL ) {
771                                printf("Writing spectra to file\n");
772                                write_EWTS_as_spectra_to_file( &ewspec, Argv0, fp );
773                        } 
774                        if ( OutRingKey != -1 ) {
775                                write_EWTS_as_spectra_to_ring( &ewspec, Argv0, &OutRegion, &logo );
776                        }
777                        if ( fp == NULL && OutRingKey == -1 )
778                                logit( "w", "ewspectra: Spectra output requested, but not output means specified\n" );
779                }
780               
781                if ( fp != NULL ) {
782                        fclose( fp );
783                        fp = NULL;
784                }
785                       
786                if ( ewspec.data != NULL )
787                        free( ewspec.data );
788               
789        }
790       
791        /* Free tracebuf space, if still allocated */
792        if ( tr.pBuf != NULL ) 
793                free( tr.pBuf );
794
795}
796
797/***************************************************************************
798 * ewspectra_status() builds a heartbeat or error message & puts it into      *
799 *                 shared memory.  Writes errors to log file & screen.     *
800 ***************************************************************************/
801void ewspectra_status( unsigned char type, short ierr, char *note )
802{
803   MSG_LOGO    logo;
804   char        msg[256];
805   long        size;
806   time_t      t;
807
808/* Build the message
809 *******************/
810   logo.instid = InstId;
811   logo.mod    = MyModId;
812   logo.type   = type;
813
814   time( &t );
815
816   if( type == TypeHeartBeat )
817    sprintf( msg, "%ld %ld\n%c", (long) t, (long) MyPid, (char)0);
818   else if( type == TypeError )
819   {
820    sprintf( msg, "%ld %hd %s\n%c", (long) t, ierr, note, 0);
821
822    logit( "et", "%s(%s): %s\n", Argv0, MyModName, note );
823   }
824
825   size = strlen( msg );   /* don't include the null byte in the message */
826
827/* Write the message to shared memory
828 ************************************/
829   if( tport_putmsg( &InRegion, &logo, size, msg ) != PUT_OK )
830   {
831        if( type == TypeHeartBeat ) {
832           logit("et","%s(%s):  Error sending heartbeat.\n",
833                  Argv0, MyModName );
834    }
835    else if( type == TypeError ) {
836           logit("et", "%s(%s):  Error sending error:%d.\n",
837                  Argv0, MyModName, ierr );
838    }
839   }
840
841   return;
842}
843
844/********************** Message Stacking Thread *******************
845 *           Move messages from transport to memory queue         *
846 ******************************************************************/
847thr_ret MessageStacker( void *dummy )
848{
849   long          recsize;   /* size of retrieved message             */
850   MSG_LOGO      reclogo;       /* logo of retrieved message             */
851   long      filteredSize;  /* size of message after user-filtering  */
852   unsigned char filteredType;  /* type of message after filtering       */
853   int       res;
854   int       ret;
855   char      msg[100];
856   double    start, stop;
857
858   /* Tell the main thread we're ok
859   ********************************/
860   MessageStackerStatus = MSGSTK_ALIVE;
861
862   /* Start main export service loop for current connection
863   ********************************************************/
864   while( 1 )
865   {
866      /* Get a message from transport ring
867      ************************************/
868      res = tport_getmsg( &InRegion, &GetLogo, 1,
869                          &reclogo, &recsize, MSrawmsg, MaxMsgSize-1 );
870
871      /* Wait if no messages for us
872       ****************************/
873      if( res == GET_NONE ) {sleep_ew(100); continue;}
874     
875      /* Check return code; report errors
876      ***********************************/
877      if( res != GET_OK )
878      {
879         if( res==GET_TOOBIG )
880         {
881            sprintf( errText, "msg[%ld] i%d m%d t%d too long for target",
882                            recsize, (int) reclogo.instid,
883                (int) reclogo.mod, (int)reclogo.type );
884            ewspectra_status( TypeError, ERR_TOOBIG, errText );
885            continue;
886         }
887         else if( res==GET_MISS )
888         {
889            sprintf( errText, "missed msg(s) i%d m%d t%d in %s",(int) reclogo.instid,
890                    (int) reclogo.mod, (int)reclogo.type, inRing );
891            ewspectra_status( TypeError, ERR_MISSMSG, errText );
892         }
893         else if( res==GET_NOTRACK )
894         {
895            sprintf( errText, "no tracking for logo i%d m%d t%d in %s",
896                     (int) reclogo.instid, (int) reclogo.mod, (int)reclogo.type,
897                     inRing );
898            ewspectra_status( TypeError, ERR_NOTRACK, errText );
899         }
900      }
901
902      /* Process retrieved msg (res==GET_OK,GET_MISS,GET_NOTRACK)
903      ***********************************************************/
904          strncpy( msg, MSrawmsg, recsize );
905          msg[recsize] = 0;
906                start = atof( msg );
907                if ( start < 0 ) { 
908                        start += time(NULL);
909                } else if ( EWSConvertTime (msg, &start) == EW_FAILURE ) {
910            sprintf( errText, "COMPUTE_SPECTRA message w/ bad time" );
911            ewspectra_status( TypeError, ERR_BADMSG, errText );
912                }
913                stop = start + atoi( msg+14 );
914         
915                process_timespan( wsh, start, stop );
916
917   } /* end of while */
918
919   /* we're quitting
920   *****************/
921error:
922   MessageStackerStatus = MSGSTK_ERR; /* file a complaint to the main thread */
923   KillSelfThread(); /* main thread will restart us */
924}
925
926int main(int argc, char **argv)
927{
928/* Other variables: */
929   int           res;
930   long          recsize;   /* size of retrieved message             */
931   MSG_LOGO      reclogo;   /* logo of retrieved message             */
932       
933        /* Look up installations of interest
934        *********************************/
935        if ( GetLocalInst( &InstId ) != 0 ) {
936          fprintf( stderr,
937                          "%s: error getting local installation id; exiting!\n",
938                           Argv0 );
939          exit( -1 );
940        }
941   if ( GetType( "TYPE_HEARTBEAT", &TypeHeartBeat ) != 0 ) {
942      fprintf( stderr,
943              "%s: Invalid message type <TYPE_HEARTBEAT>; exiting!\n", Argv0 );
944      exit( -1 );
945   }
946   if ( GetType( "TYPE_ERROR", &TypeError ) != 0 ) {
947      fprintf( stderr,
948              "%s: Invalid message type <TYPE_ERROR>; exiting!\n", Argv0 );
949      exit( -1 );
950   }
951
952        if ((progname= strrchr(*argv, (int) '/')) != (char *) 0)
953                progname++;
954        else
955                progname= *argv;
956       
957    logit_init (progname, 0, 1024, 1);
958
959    /* Check command line arguments */
960    if (argc != 2) {
961                fprintf (stderr, "Usage: %s <configfile>\n", progname);
962                return EW_FAILURE;
963    }
964       
965        /* Get our own Pid for restart purposes
966        ***************************************/
967        MyPid = getpid();
968        if(MyPid == -1)
969        {
970      logit("e", "%s: Cannot get pid; exiting!\n", Argv0);
971      return(0);
972        }
973
974        /* Read-in & interpret config file */
975        scale= white= smooth= taperType= 0;
976        num_ews_procs = 0;
977        outpath[0] = 0;
978        find_triggers = 0;
979        lowpoles = highpoles = lowcutoff = highcutoff = 0;
980        InRingKey = OutRingKey = -1;
981        start = -1;
982        wsh = init_ws( argv[1], progname, parse_my_command );
983       
984        if ( wsh == NULL )
985                /* init_ws already reported the problem */
986                return EW_FAILURE;
987
988        if ( num_ews_procs == 0 ) {
989                logit( "e", "No SCNLs specified; exiting\n" );
990                ws2ts_purge( wsh, NULL, NULL );
991                exit(1);
992        }
993        if ( OutRingKey == -1 && outpath[0] == 0 ) {
994                logit( "e", "No output specified; exiting\n" );
995                ws2ts_purge( wsh, NULL, NULL );
996                exit(1);
997        }
998        if ( lowcutoff > 0 && highcutoff > 0 && lowcutoff >= highcutoff ) {
999                logit( "e", "Low Cutoff Frequency Must Be < High Cutoff Frequency.\n" );
1000                ws2ts_purge( wsh, NULL, NULL );
1001                exit(1);
1002        }
1003        if ( OutRingKey != -1 ) {
1004                int i;
1005                for ( i=num_ews_procs-1; i>=0; i-- )
1006                        if ( *SCNL[i][2][3] == 0 )
1007                                if ( outpath[0] != 0 )
1008                                        logit( "w", "No SCNL for ring messages specified for %s.%s.%s.%s; will only write to file\n",
1009                                                SCNL[i][0][0], SCNL[i][0][1], SCNL[i][0][2], SCNL[i][0][3] );
1010                                else {
1011                                        logit( "w", "No SCNL for ring messages specified for %s.%s.%s.%s; aborting this SCNL\n",
1012                                                SCNL[i][0][0], SCNL[i][0][1], SCNL[i][0][2], SCNL[i][0][3] );
1013                                        num_ews_procs--;
1014                                        if ( i < num_ews_procs ) {
1015                                                int j,k;
1016                                                for ( j=0; j<3; j+=(binary[i]?1:2) )
1017                                                        for ( k=0; j<4; k++ )
1018                                                                strcpy( SCNL[i][j][k], SCNL[num_ews_procs][j][k] );
1019                                                binary[i] = binary[num_ews_procs];
1020                                                processing_mode[i] = processing_mode[num_ews_procs];
1021                                        }
1022                                }
1023                if ( num_ews_procs == 0 ) {
1024                        logit( "e", "All SCNLs aborted; exiting\n" );
1025                        ws2ts_purge( wsh, NULL, NULL );
1026                        exit(1);
1027                }
1028               
1029        }
1030        if ( find_triggers ) {
1031                int geti = GetInst( "INST_WILDCARD", &(GetLogo.instid) );
1032                int getm = GetModId( "MOD_WILDCARD", &(GetLogo.mod) );
1033                int gett = GetType( "TYPE_COMPUTE_SPECTRA", &(GetLogo.type) );
1034                int getMsg = 0;
1035                if ( ( MSrawmsg = (char *) malloc(MaxMsgSize) ) ==  NULL )
1036                {
1037                  logit( "e", "%s: error allocating MSrawmsg; exiting!\n",
1038                                 Argv0 );
1039                  getMsg = 1;
1040                }
1041                if ( geti || getm || gett || getMsg ) {
1042                        if ( geti )
1043                                logit( "e", "%s: INST_WILDCARD unknown; exiting!\n", Argv0 );
1044                        if ( getm )
1045                                logit( "e", "%s: MOD_WILDCARD unknown; exiting!\n", Argv0 );
1046                        if ( geti )
1047                                logit( "e", "%s: TYPE_COMPUTE_SPECTRA unknown; exiting!\n", Argv0 );
1048                        ws2ts_purge( wsh, NULL, NULL );
1049                        exit(1);
1050                }
1051                tport_attach( &InRegion, InRingKey );           
1052        }
1053        if ( OutRingKey != -1 ) {
1054                tport_attach( &OutRegion, OutRingKey );
1055        }
1056       
1057        if ( start != -1 )
1058                process_timespan( wsh, start, stop );
1059
1060        /* Here's where, if InputRing is defined, we'd listen for
1061                COMPUTE_SPECTRA messages, and call process_timespan
1062                for their timespans */
1063        if ( find_triggers ) {
1064
1065           /* step over all messages from transport ring
1066           *********************************************/
1067           /* As Lynn pointed out: if we're restarted by startstop after hanging,
1068                  we should throw away any of our messages in the transport ring.
1069                  Else we could end up re-sending a previously sent message, causing
1070                  time to go backwards... */
1071           do
1072           {
1073                 res = tport_getmsg( &InRegion, &GetLogo, 1,
1074                                                         &reclogo, &recsize, MSrawmsg, MaxMsgSize-1 );
1075           } while (res !=GET_NONE);
1076       
1077           /* One heartbeat to announce ourselves to statmgr
1078           ************************************************/
1079           ewspectra_status( TypeHeartBeat, 0, "" );
1080           time(&MyLastBeat);
1081       
1082       
1083           /* Start the message stacking thread if it isn't already running.
1084                ****************************************************************/
1085           if (MessageStackerStatus != MSGSTK_ALIVE )
1086           {
1087                 if ( StartThread(  MessageStacker, (unsigned)THREAD_STACK, &tidStacker ) == -1 )
1088                 {
1089                   logit( "e",
1090                                  "%s(%s): Error starting  MessageStacker thread; exiting!\n",
1091                          Argv0, MyModName );
1092                   tport_detach( &InRegion );
1093                   tport_detach( &OutRegion );
1094                   return( -1 );
1095                 }
1096                 MessageStackerStatus = MSGSTK_ALIVE;
1097           }
1098       
1099       
1100           /* Start main ringdup service loop
1101           **********************************/
1102           while( tport_getflag( &InRegion ) != TERMINATE  &&
1103                          tport_getflag( &InRegion ) != MyPid         )
1104           {
1105                 /* Beat the heart into the transport ring
1106                  ****************************************/
1107                  time(&now);
1108                  if (difftime(now,MyLastBeat) > (double)HeartBeatInt )
1109                  {
1110                          ewspectra_status( TypeHeartBeat, 0, "" );
1111                  time(&MyLastBeat);
1112                  }
1113       
1114                  /* take a brief nap; added 970624:ldd
1115                   ************************************/
1116                  sleep_ew(500);
1117           } /*end while of monitoring loop */
1118               
1119        }
1120               
1121        /* Clean up after ourselves */
1122        if ( InRingKey != -1 )
1123                tport_detach( &InRegion );
1124        if ( OutRingKey != -1 )
1125                tport_detach( &OutRegion );
1126        ws2ts_purge( wsh, NULL, NULL );
1127}
Note: See TracBrowser for help on using the repository browser.