source: trunk/src/archiving/wave_serverV/wave_serverV.c @ 3164

Revision 3164, 108.1 KB checked in by paulf, 12 years ago (diff)

sqlite stuff added

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1
2/*
3 *   THIS FILE IS UNDER RCS - DO NOT MODIFY UNLESS YOU HAVE
4 *   CHECKED IT OUT USING THE COMMAND CHECKOUT.
5 *
6 *    $Id$
7 *
8 *    Revision history:
9 *     $Log$
10 *     Revision 1.39  2007/11/30 18:40:16  paulf
11 *     sqlite stuff added
12 *
13 *     Revision 1.38  2007/05/31 16:11:44  paulf
14 *     minor change to add in version number as #define, and update ver so it is clear from the logs which is running
15 *
16 *     Revision 1.37  2007/05/29 13:42:37  paulf
17 *     patched some checks in checking data_type and also some memory alignment issues in calls to TraceBufIsValid()
18 *
19 *     Revision 1.36  2007/03/28 18:02:34  paulf
20 *     fixed flags for MACOSX and LINUX
21 *
22 *     Revision 1.35  2007/02/26 14:47:38  paulf
23 *     made sure time_t are casted to long for heartbeat sprintf()
24 *
25 *     Revision 1.34  2007/02/23 15:24:33  paulf
26 *     fixed long to time_t declaration
27 *
28 *     Revision 1.33  2007/02/19 20:45:38  stefan
29 *     tweak on last revision
30 *
31 *     Revision 1.32  2007/02/19 20:43:36  stefan
32 *     patch for gcc 3.3.6 else if defined structure per Matt.VanDeWerken@csiro.au
33 *
34 *     Revision 1.31  2006/10/23 21:20:29  paulf
35 *     updated for linux makes
36 *
37 *     Revision 1.30  2006/09/05 13:46:52  hal
38 *     added _LINUX ifdefs so that signals under linux are handled the same as under solaris
39 *
40 *     Revision 1.29  2006/07/20 16:18:29  stefan
41 *     ifdef fixes for Linux
42 *
43 *     Revision 1.28  2006/07/11 22:28:53  stefan
44 *     The problem was that the size of the tankfile is not an integer multiple
45 *     of the packet size and so occasionally there is a mess up at the end of
46 *     the file (mine were 3 in this example).  The result was that the data
47 *     requested from the server was not returned and an event file failed to
48 *     be written.
49 *     [Fix was to add an else to wave_serverV_config]
50 *     Richard R Luckett, BGS
51 *
52 *     Revision 1.27  2006/03/27 17:16:49  davek
53 *     Updated the wave_serverV version and timestamp.
54 *
55 *     Revision 1.26  2006/03/27 17:12:07  davek
56 *     Added code to check the return value from call to WaveMsg2MakeLocal(),
57 *     and reject packets which for which WaveMsg2MakeLocal() reported and error.
58 *
59 *     Changed a >= to a > comparison in the packet filtering logic that was causing
60 *     wave_serverV to reject tracebufs that only contained 1 sample.
61 *
62 *     Revision 1.25  2005/07/21 21:03:41  friberg
63 *     added in one _LINUX ifdef directive for sin_addr struct
64 *
65 *     Revision 1.24  2005/04/21 22:48:46  davidk
66 *     Updated the version timestamp.  (It had not been properly updated in some time.)
67 *     Current version supports SCNL menu protocol adjustment.
68 *
69 *     Revision 1.23  2005/04/07 15:23:48  davidk
70 *     Separated the signal handling logic for Solaris(UNIX) and Windows, due
71 *     to problems using the UNIX logic in wave servers started from a
72 *     Windows Service.
73 *     Wave server should behave the same as before except when started from
74 *     a service.  Now the service based wave server should not shutdown when
75 *     users log in/out.
76 *     Added call to WIN32 API SetConsoleCtrlHandler() to setup a windows signal
77 *     handler to handle close messages, while using the default service logic to
78 *     ignore logoff messages.
79 *
80 *     Revision 1.22  2005/03/17 17:37:39  davidk
81 *     Changes to enforce a maximum tanksize of 1GB.
82 *     Added code to enforce 1GB limits on wave_server tanks.
83 *     If TruncateTanksTo1GBAndContinue is set in the config file,
84 *     then tanks listed in the config file at >= 1GB will be truncated to
85 *     just under 1GB, and wave server operation will continue normally.
86 *     If TruncateTanksTo1GBAndContinue is not set,
87 *     wave server will issue an Error and EXIT if it encounters a tank
88 *     >= 1GB in the config file.
89 *     This change was made because a bug was discovered that limits
90 *     the safe limit of a wave server tank file to 1GB.
91 *
92 *     Revision 1.21  2004/09/14 16:23:33  davidk
93 *     Fixed bug in main() function, where bAbortMainLoop flag was never
94 *     initialized.  Initialized bAbortMainLoop to FALSE.
95 *     Update wave_serverV.c timestamp.
96 *
97 *     Revision 1.20  2004/06/10 23:14:09  lombard
98 *     Fixed logging of invalid packets
99 *     Fixed compareSCNL function.
100 *
101 *     Revision 1.19  2004/06/09 21:35:58  lombard
102 *     Added code to validate a trace_buf packet before it gets into the tanks.
103 *     Added comments about GETPIN request not working correctly.
104 *
105 *     Revision 1.18  2004/05/18 22:30:19  lombard
106 *     Modified for location code
107 *
108 *     Revision 1.17  2003/04/15 18:48:27  dhanych
109 *     replaced commented changes with just comments
110 *
111 *     Revision 1.16  2003/04/14 19:13:58  dhanych
112 *     made changes to fix multi-home bug (removed gethomebyaddr)
113 *
114 *     Revision 1.15  2001/10/03 21:17:02  patton
115 *     Made logit changes due to new logit code.
116 *     Disk logging is now hardcoded to on
117 *     during config file reading, and set
118 *     to the user's choice afterwards.
119 *     JMP 10/3/2001.
120 *
121 *     Revision 1.14  2001/08/10 21:49:09  dietz
122 *     changed tport_getmsg to tport_copyfrom so we can distinguish between
123 *     sequence gaps and truly missed msgs in the transport ring.
124 *
125 *     Revision 1.13  2001/06/29 22:23:59  lucky
126 *     Implemented multi-level debug scheme where the user can specify how detailed
127 *     (and large) the log files should be. If no Debug is specified, only
128 *     errors are reported and logged.
129 *
130 *     Revision 1.12  2001/06/18 23:43:46  alex
131 *     *** empty log message ***
132 *
133 *     Revision 1.11  2001/05/08 00:09:47  dietz
134 *     Changed to shut down gracefully if the transport flag is
135 *     set to TERMINATE or MyPid.
136 *
137 *     Revision 1.10  2001/01/24 00:01:06  dietz
138 *     *** empty log message ***
139 *
140 *     Revision 1.9  2001/01/20 02:26:39  davidk
141 *     resolved a bug where multiple confusing timestamps were issued for wave_serverV.c
142 *
143 *     Revision 1.8  2001/01/18 02:26:59  davidk
144 *     Added ability to issue status messages from server_thread.c,
145 *     which entailed the following changes:
146 *     1. Changed status message constants and vars from static to
147 *     full globals so that status messages could be issued from
148 *     other files.
149 *     2. Moved #define constants for status message types to wave_serverV.h
150 *     so that they can be used by all .c files.
151 *     3. Moved wave_serverV_status() prototype to wave_serverV.h so
152 *     that other .c files can issue status messages.
153 *
154 *     Changed the timestamp.
155 *
156 *     Revision 1.6  2001/01/08 22:06:07  davidk
157 *     Modified portion of wave_server that handles client requests,
158 *     so that it replies to client when an error occurs while handling
159 *     the request.(Before, no response was given to the client when an
160 *     error occured.)  Added flags FC,FN, and FB.  Moved the wave_server
161 *     version logging into a separate function, such as the one used in
162 *     serve_trace.c.
163 *
164 *     Revision 1.5  2000/07/24 18:46:58  lucky
165 *     Implemented global limits to module, installation, ring, and message type strings.
166 *
167 *     Revision 1.4  2000/07/08 19:01:07  lombard
168 *     Numerous bug fies from Chris Wood
169 *
170 *     Revision 1.3  2000/07/08 16:56:21  lombard
171 *     clean up logging for server thread status
172 *
173 *     Revision 1.2  2000/06/28 23:45:27  lombard
174 *     added signal handler for graceful shutdowns; numerous bug fixes
175 *     See README.changes for complete list
176 *
177 *     Revision 1.1  2000/02/14 19:58:27  lucky
178 *     Initial revision
179 *
180 *
181 */
182
183  /*
184   wave_serverV.c:
185
186*****************************************************************************
187   Warning:  This file has been victim to multiple Global replace executions,
188   changing "wave_serverIII" to "wave_serverIV" and then to "wave_serverV".
189   As a result some of the documentation will read a little funny, such as:
190   "Wave_serverV represents an upgrade to the Wave_serverV code with
191   the following mods:".  The sentence originally looked like "Wave_serverIV
192   represents an upgrade to the Wave_serverIII code with the following mods:"
193*****************************************************************************
194
195MOTIVATION
196wave_serverV is being written for the following reason:
197 Provide a means of saving indexes and tank structures to disk, such that
198 recovery from a crash becomes a simple, quick, turnkey experience, while
199 keeping the wave_server performance close to that of wave_serverV without
200 index updates.  Same low performance price, now with improved crash recovery
201 and racing stripes.  DK
202
203
204MOTIVATION:
205wave_serverV is being written for the following reasons:
206* Large storage space per trace: We have to store several days worth of stuff
207  from 600+ stations.
208* Concurrently serve a number of clients without ever overloading the system.
209* Support the new trace format, including variable sample size, rate, and
210  variable message length.
211* Support interrupted data, either short telemetry dropouts or segmented data
212  with weeks between segments.
213* Be able to survive being turned on and off: don't lose the data you had,
214  and become operational quickly after coming up. (Older wave servers took a
215  long time reading their tank files on restart)
216* Have an extensible protocol for future clients.
217
218HISTORY:
219The original wave server was written by Will in a rather spectacularly short
220time: We came in on a Monday to find Will comatose, all waste cans full of
221expresso cups, and a working wave server. The motivation was to support
222Alaska's interest in writing trace data into DataScope, and to provide a
223playback facility for testing real-time algorithms.
224
225Lynn then proceeded to enhance the thing a number of ways. Kent produced a
226variation, introducing the idea of segmenting the tank into one partition for
227each trace and feeding it demuxed data. Things then diverged, based on some
228misunderstanding of what code from Alaska could be integrated into the
229Earthworm release. This is an effort to meet Menlo Park requirements, meet
230Tsunami needs, and bring things back together. Several authors were involved
231in wave_serverV: Alex wrote the main thread (wave_serverV.c); Mac McKenzie
232wrote the parser of the client thread (server_thread.c). Eureka Young wrote
233the code to send trace data (serve_trace.c).
234
235LEFT TO DO:
2361. Separate thread to pull trace data messages from public ring. (done)
2372. Compute oldest time in tank rather than reading it.
2383. Index compaction when out of entries
2394. Security scheme to limit access to blessed clients.
2405. Maybe compute actual digitization rate rather than nominal.
241
242PROGRAM STRUCTURE:
243
244Based on wave_server.c. It deals only with TYPE_TRACEBUF2 messages. It operates
245a disk tank for each pin it's told to serve. Each tank is a circular disk
246file, an associated TANK structure, and an index (list in memory).  The idea
247is that wave_serverV can deal reasonably with fragmentary data
248"CHUNKs". Chunks are arbitrarily long sequences of TYPE_TRACEBUF2 messages
249without breaks (that is, there are no time gaps longer than 1.5 times the
250stated sampling period). Such chunks can be separated by any amount of
251time. Thus, telemetry interruptions as well as triggered event data can be
252handled.
253
254        The tank file is thought of as being divided into 'records' of
255specified length (from the parameter file).  Each write of a TYPE_TRACEBUF2
256message is placed into such a 'record'. The number of records in the tank is
257computed from the parameters in the config file.
258
259        The TANK structure holds various pointers describing the tank file and
260the associated index. This lives in memory, and is the 'living' set of
261pointers for the tank. There is one TANK structure for each tank maintained.
262Notably, included here is the current value of the insertion point into the
263tank file, and a pointer to the start of the index.
264
265        The index is a memory-based list of CHUNK descriptors. At startup, an
266array of such descriptors is allocated and initialized. The size of this array
267is read from the configuration file. Each such descriptor contains the chunk
268start time, end time, and offset into the tank file where the beginning of the
269chunk lives; (and of course, a pointer to the next element of the list){This
270is now inherited from the array form, instead of being separately maintained.
271
272
273The scheme (not implemented), is to perform index squeezing: If the number of
274data chunks should exceed the available chunk descriptors, the smallest chunks
275will be removed from the index, and thus be no longer available to
276servers. The motivation is to survive observed telemetry drop-outs with DST's,
277where a large number of very small chunks are produced for some period of
278time. The motivation is to preserve older, long chunks, and sacrifice what is
279probably useless data. Note that it is possible to recover such de-indexed
280chunks by brute force reading of the tank file.  When wave_serverV shuts down,
281either by request or due to error, it writes the TANK structure and index
282array to a companion file to the tank. The name of this companion file is
283created by appending an extension (currently ".inx") to the tank file name. On
284startup, wave_serverV reads its configuration file, which tells it what tanks
285to operate. It then sees if the specified tank files exist already. If so, an
286attempt is made to open the associated companion file, and to read the TANK
287structure and index from there. A check is made to verify that the TANK
288structure from disk is consistent with what was read from the configuration
289file. wave_serverV exits with horror if there is a mismatch.
290
291Wave_serverV Improvements (over wave_serverIV)
292With wave_serverV, the indexes are periodically written to disk at a frequency
293determined by the config file.  The TANK structures, which include the
294insertion point for each tank file, are also saved to disk periodically, at a
295frequency determined by another param in the config file.  If a crash occurs,
296then on restart, wave_server attempts to rebuild the indexes using the
297insertion point stored in the TANK structures file, and the indexes, stored in
298the index files, for each tank.  On startup, the Tank file is considered to be
299only as up to date as the insertion point that is stored on disk.  Therefore,
300the insertion point should be saved often.  The index files on disk are not
301required to rebuild indexes, but large tanks can require long index rebuild
302times, if they must be built from scratch.
303
304wave_serverV also introduces the use of redundant index and tank structures
305files.  The redundant files are used to prevent file corruption errors from
306disabling the crash recovery process.  The theory behind the redundant files
307is that if a system suddenly crashes due to a power outage, or other problem,
308and a vital file is being written, then that file could become corrupt, and
309would be unusable.  By using alternating redundant files, the recovery files
310are better protected, because only one file can be written to at a time, and
311thus only one could be corrupted by an interruption during writing.
312
313
314THREAD STRUCTURE
315
316        Wave_serverV consists of four fulltime threads: The main thread reads
317the configuration file, sets things up, and starts a thread which picks up
318messages of interest and stuffs them into a memory-based queue. It then drops
319into a working loop which picks up trace data messages from the queue and
320writes them into the appropriate tank. This thread also maintains the index
321lists and keeps the variables in the TANK structure up to date. It also starts
322a server manager thread which listens for connect requests over the
323network. When a connect request from some client is detected, this thread
324starts a copy of the server thread, and gives it the socket descriptor to the
325client. This server thread is responsible for getting client requests and
326providing the replies. This server thread quits if the client breaks the
327socket connection, or if some horrible error is detected. Otherwise the server
328thread keeps dealing with the client indefinitely. The server manager thread
329will start up to some fixed number (coded-in constant) of server threads to
330prevent overloading the machine.  The final thread is the index manager.
331Started by main, it intermittently writes the contents of the index lists and
332tank structures to disk, to enable recovery from a crash.
333
334THREAD CONCURRENCY AND TANK FILE IO:
335
336        The tank file is used by both the main writing thread and the reading
337server threads. Interlock is as follows: There is one mutex for each tank. The
338main writing thread takes it on itself to interlock each tank file operation
339and each index operation with a mutex, save the original offset of the tank
340file, do the io operation, restore the offset and release the mutex. The
341server threads must wait for, hold, and release the mutex whenever it looks at
342the tank file or the index list.  The index manager thread must also deal with
343mutexes when interacting with tank data.
344
345PROTOCOL:
346Notes:
347        * The expectation is that additional protocols will be added as
348          needed. Specifically, a 'regurgitate raw' should be defined soon to
349          provide a play-back capability.
350
351        * A value of 'nada' for fill-value indicates no fill which is not
352          supported in the first version. We don't even know how we'd handle
353          that...
354
355        * <s><c><n><l> is short-hand for site code, channel code, network and
356          location id.
357
358        * <flags> :: F | F<letter> ... <letter>
359
360    Currently Supported Flags:
361
362    R  All requested data is right of the tank (after the tank).
363    L  All requested data is left of the tank (before the tank).
364    G  All requested data is in a gap in the tank.
365    B  The Client's request was bad.  It contained incorrect syntax.
366    C  The tank from which the client requested data is corrupt.
367       (Data may or may not be available for other time intervals
368        in the tank.  WARNING!  This flag indicates that the tank
369        is corrupt, this means data from other time intervals may
370        be inaccurate)
371    N  The requested channel(tank) was not found in this wave_server.
372    U  An unknown error occurred.
373    DK 010501  Added additional flags: FB, FC, FN, FU
374
375        * <datatype> is two character code ala CSS.  Initially, we will
376          support i2, i4, s2, and s4. i for Intel; s for Sparc. 2 meaning
377          two-bytes per integer, 4 ...
378
379        * Replies are ascii, including trace data.
380
381        * the <request id> was added later to permit the wave viewer to keep
382          track of which reply belongs to which query. The idea is that the
383          client sends this as part of its request. To us it's an arbitrary
384          ascii string.  We simply echo this string as the first thing in our
385          reply. We don't care what it is. The motivation is that the
386          waveviewer would occasionally get confused as to which reply belonged
387          to which of its requests, and it would sit there, listening for a
388          reply which never came.
389
390                                *** NOTE NOTE NOTE ***
391          The request id is echoed as a fixed length 12 character string, null
392          padded on the right if required.
393
394
395Description of Requests and Responses
396
397MENU: <request id>
398        returns one line for each tank it has:
399        <request id>
400        pin#  <s><c><n><l> <starttime> <endtime>  <datatype>
401          .      .       .         .          .
402        pin#  <s><c><n><l> <starttime> <endtime>  <datatype>
403        \n
404
405MENUPIN: <request id>  <pin#>
406        returns as above, but only for specified pin number:
407        <request id> <pin#>  <s><c><n><l>  <starttime>  <endtime>  <datatype> <\n>
408
409MENUSCN: <request id>  <s><c><n><l>
410        returns as above, but for specified <s><c><n><l> name:
411        <request id> <pin#>  <s><c><n><l>  <starttime>  <endtime>  <datatype> <\n>
412
413GETPIN:  <request id> <pin#> <starttime>  <endtime> <fill-value>
414        returns trace data for specified pin and time interval. Gaps filled
415         with <fill-value>.  <request id> <pin#> <s><c><n><l> F <datatype>
416         <starttime> <sampling rate> sample(1) sample(2)... sample(nsamples)
417         <\n> {the samples are ascii}
418
419        If the requested time is older than anything in the tank, the reply
420        is: <request id> <pin#> <s><c><n><l> FL <datatype> <oldest time in tank>
421        <sampling rate> \n for the case when the requested interval is younger
422        than anything in the tank, the reply is <request id> <pin#> <s><c><n><l>
423        FR <datatype> <youngest time in tank> <sampling rate> \n
424
425        NOTE: the GETPIN request has never worked in wave_serverV. As pin
426        numbers fade into the distance, it is unlikely that this request will
427        ever be supported.
428
429GETSCN: <request id> <s><c><n><l> <starttime>  <endtime> <fill-value>
430        returns as above, but for specified scn name.
431        <request id> <pin#> <s><c><n><l> F <datatype> <starttime> <sampling-rate >
432                 sample(1) sample(2)... sample(nsamples) <\n>
433
434
435GETSCNRAW: <request id> <s><c><n><l> <starttime>  <endtime>
436        returns trace data in the original binary form in which it was put
437        into the tank. Whole messages will be supplied, so that the actual
438        starttime may be older than requested, and the end time may be younger
439        than requested. The reply is part ascii, terminated by a "\n",
440        followed by binary messages:
441
442        <request id> <pin#> <s><c><n><l> F <datatype> <starttime> <endtime>
443        <bytes of binary data to follow> \n The line above is all in
444        ascii. All below is binary, byte order as found in the tank:
445        <trace_buf msg> ... <trace_buf msg>
446
447        If the requested time is older than anything in the tank, the reply is:
448         <request id> <pin#> <s><c><n><l> FL <datatype> <oldest time in tank> \n
449
450        For the case when the requested interval is younger than anything in
451        the tank, the reply is:
452        <request id> <pin#> <s><c><n><l> FR <datatype> <youngest time in tank> \n
453
454        For the case when the requested interval falls completely in a gap,
455        the reply is:
456         <request id> <pin#> <s><c><n><l> FG <datatype> \n
457
458*/
459
460
461#include <stdio.h>
462#include <stdlib.h>
463#include <math.h>
464#include <signal.h>
465#include <string.h>
466#include <time.h>
467#include <errno.h>
468#include <data_buf.h>
469#include <trace_buf.h>
470#include <earthworm.h>
471#include <kom.h>
472#include <swap.h>
473#include <transport.h>
474#include <mem_circ_queue.h>
475#include "wave_serverV.h"
476#include "tb_packet_db.h"
477
478/* Function Prototypes
479 *********************/
480void    wave_serverV_config( char * );
481void    wave_serverV_lookup( void );
482void    LogTank( TANK* );
483int     SocketInit( int, char * );
484thr_ret SocketServe( void * );
485double  get_starttime( char *, unsigned char );
486int     ReportServerThreadStatus(void);
487void    signal_hdlr(int signum);
488int     TraceBufIsValid(TANK * pTSPtr, TRACE2_HEADER * pCurrentRecord);
489
490#define IPADDRLEN 20
491#define FILENAMELEN 80
492
493/* Global socket things
494***********************/
495int            PassiveSocket;     /* Socket descriptor; passive socket     */
496static char    ServerIPAdr[IPADDRLEN];  /* IP address of wave_server machine */
497static int     ServerPort;        /* Server port for requests & replies    */
498
499/* Pointers to things and actual allocations
500********************************************/
501TANK *  Tanks=0;                /* Pointer to the array of configured tank descriptors */
502MSG_LOGO        ServeLogo[MAX_TANKS];   /* worst possible case is each pin from a different logo */
503mutex_t         QueueMutex;             /* mutex handles. We suspect they're long integers. */
504                                                /* The last one is for the input queue */
505int             NLogos;                         /* number of distinct logos to deal with */
506
507/* Parameters to be read from a configuration file (defaults given)
508*******************************************************************/
509static char          RingName[MAX_RING_STR];         /* name of transport ring for i/o    */
510static long          RingKey;              /* key to transport ring to read from */
511static unsigned char MyModId;              /* wave_server's module id            */
512static int           LogSwitch;            /* Log-to-disk switch                 */
513double               GapThresh;            /* Threshold factor for gap declaration */
514static int           IndexUpdate;            /* NOTE(review): duplicate tentative definition -- IndexUpdate is declared again a few lines below as "Seconds between index disk updates"; the older description here ("should we write the index after each tank write") looks stale. Confirm and drop one of the two declarations. */
515int                  SocketTimeoutLength;  /* Length of Timeouts on SOCKET_ew calls */
516int                  SOCKET_ewDebug=0;     /* Set to 1 for socket debugging */
517int                  ClientTimeout = -1;   /* Time to wait for any activity
518                                              from client; -1: wait forever */
519static int           TankStructUpdate;     /* Seconds between TS disk updates */
520static int           IndexUpdate;          /* Seconds between index disk updates */
521int                  PleaseContinue=0;     /* Indicates whether to continue or not after a tank has failed. */
522int                  ReCreateBadTanks=0;   /* Indicates whether a tank should be born-again from scratch
523                                              if it is found to be bad during initialization. */
524int                  SecondsBetweenQueueErrorReports;
525                                           /* Indicates the minimum amount of time between error reports
526                                              to statmgr, related to internal message queue lapping. */
527int                  MaxServerThreads=10; /* largest number of  server threads we'll start up */
528int                  bAbortOnSingleTankFailure=TRUE;
529                                          /* set to true in config file if you wish wave_server
530                                             to continue if there is an I/O error on a single
531                                             tank file. DK 01/08/01      */
532int                  bTruncateTanksTo1GBAndContinue=FALSE;
533                                          /* set to true in config file if you wish wave_server
534                                             to truncate tanks that are listed in the config file at
535                                             over 1GB, down to 1GB and continue.  Default behavior
536                                             is to complain and exit if a >1GB tank is given.DK 2005/03/17  */
537
538int                                     bUsePacketSyncDb = 0;    /* Set to 1 in config file if we should use tb_packet_db module to
539                                                                                     store/retrieve out of sync packets ronb040307 */
540
541char                            sPacketSyncDbFile[TB_MAX_DB_NAME];      /* name of db file holding sync data ronb041307 */             
542
543int                                     bTankOverSyncDbData = 0;                /* how to resync data or which has priority. ronb041607*/
544
545int                             bPurgePacketSyncDb = 1;        /* On initialization of the db should we purge all
546                                                                  packets (1) or leave them alone (0). Note that the db will be
547                                                                  purged of records older than the tank file every nPurgeDbFreq
548                                                                  Heartbeats (this comment previously said nPurgeTankFreq; nPurgeDbFreq
549                                                                  is the only such variable declared in this file -- confirm). ronb052007 */
550
551/*  I am not sure if SocketTimeoutLength should be volatile.  It should only
552    be modified by the main thread, but it is read in other threads.  I am
553    leaving it as a normal global int, but based on the problems that Alex
554    had, I would be cautious of it.  DavidK
555*/
556
557
558/* Server thread stuff
559**********************/
560#define THREAD_STACK  8096
561static unsigned int *  tidSockSrv;       /* id of socket-serving threads */
562static unsigned int   tidServerMgr;                     /* id of dispatcher of socket-serving threads */
563ServerThreadInfoStruct * ServerThreadInfo;
564ServerThreadInfoStruct * ServerThreadInfoPrevious;
565
566thr_ret ServerThread( void*  );
567thr_ret ServerMgr( void*  );
568
569
570/* Message stacker thread stuff
571*******************************/
572thr_ret MessageStacker( void * );
573int     MessageStackerStatus;                           /* as above, but for message stacking thread */
574static unsigned int   tidStacker;
575static int     InputQueueLen;   /* max messages in input buffer */
576QUEUE OutQueue;                 /* from queue.h, queue.c; sets up linked */
577                                /*   list via malloc and free */
578static int QueueHighWaterMark,QueueLowWaterMark;  /* For polling queue depth */
579
580
581/* IndexMgr thread stuff
582************************/
583thr_ret IndexMgr( void *);
584static unsigned int tidIndexMgr;
585static mutex_t IndexTerminationMutex;
586int terminate=0;   /* Changed to global to server threads: PNL 6/20/00 */
587static int QueueReportInterval=30;
588/* Heartbeat Stuff
589******************/
590time_t        timeLastBeat;             /* system time of last heartbeat sent */
591static long   HeartBeatInt;             /* seconds between heartbeats */
592
593/* Look up from earthworm.h
594 **************************/
595unsigned char InstId;         /* local installation id      */
596unsigned char TypeHeartBeat;
597unsigned char TypeError;
598unsigned char TypeWaveform;
599
600/* Other globals
601 ***************/
602static SHM_INFO       Region;       /* Info structure for shared memory */
603volatile        int   nTanks;       /* number of tanks in operation     */
604volatile        int   Debug =0;     /* Can be set to one by an optional configuration file command */
605static volatile int   MaxMsgSiz =0; /* the largest message expected */
606pid_t                 myPid;        /* for restarts by startstop */
607
608/**************************/
609/*  Global TANK and TANKList Variables */
610TANKList * pConfigTankList;
611TANKList * pTANKList;
612TANK * pTSPtr;
613TANK * pTSTemp;
614/**************************/
615
616TBDB_STORE      *pPacketDb =NULL;       /* ponter to TBDB_STORE for temp packet storage*/
617
618unsigned int nPurgeDbFreq = 20; /* purge db of records older than oldest tank trace_buf
619                                                                every nPuregeDbFreq Heartbeat.*/
620
621static char  Text[250];        /* string for log/error messages          */
622
/* Per-source-file init hooks called once by main() at startup.
   serve_trace_c_init() is used by serve_trace.c to report its
   internal-internal-internal-version (added by davidk, due to the
   number of serve_trace.c versions being put out).  The other three
   do the same for index_util.c, wave_serverV.c and server_thread.c
   respectively.
 */
int serve_trace_c_init();

int index_util_c_init();

int wave_serverV_c_init();

int server_thread_c_init();
639
/* Put this here to avoid having to ifdef a prototype. */
#ifdef _WINNT
/***********************************************************************
 * CtrlHandler() - Windows console control handler.
 *
 *  fdwCtrlType : control event code delivered by the console.
 *
 *  For CTRL_C_EVENT, CTRL_CLOSE_EVENT and CTRL_BREAK_EVENT this calls
 *  signal_hdlr(0) and returns TRUE (event handled); every other event
 *  returns FALSE.
 *
 *  main() registers this routine through SetConsoleCtrlHandler() with a
 *  (PHANDLER_ROUTINE) cast, so it must use the WINAPI calling
 *  convention that the console subsystem calls it with.
 ***********************************************************************/
BOOL WINAPI CtrlHandler( DWORD fdwCtrlType )
{
  switch( fdwCtrlType )
  {
    /* Handle the CTRL-C, console-close and CTRL-BREAK events alike. */
    case CTRL_C_EVENT:
    case CTRL_CLOSE_EVENT:
    case CTRL_BREAK_EVENT:
      signal_hdlr(0);
      return(TRUE);

    default:
      return FALSE;
  }
}
#endif /* _WINNT */
658
659
660#define WSV_VERSION "5.1.38 - 2007-11-28 sqlite"
661
662main( int argc, char **argv )
663{
664  time_t        timeNow;        /* current system time                      */
665  int           result;
666  int           i,j;
667  char*              deQMsg;         /* for picking messages from the memory queue   */
668  char*         inMsg;          /* naked message from queue after remvoing "t"  */
669  MSG_LOGO      logo;           /* of the incoming message                      */
670  char*         scrMsg;         /* scratch pad: like figuring out who's nexting */
671  long          msgInLen;       /* length of retrieved message                  */
672  int           t;              /* running index over tanks.                    */
673  int           grabit;         /* silly little flag for extracting unique logos */
674  unsigned int  tmpOffset;
675  time_t        CurrentTime;
676  int           bAbortMainLoop;  /* flag to abort the main loop */
677  int           flag;            /* transport ring flag value */
678  int           rc;
679
680  /* Variables to handle recording of File I/O errors */
681  int iFileOffset, iFileBlockSize;
682  char * szFileTankName;
683  char szFileFunction[15];
684  int  bIssueIOStatusError = FALSE;
685  unsigned int nPurgeDbCount = 0;
686  double PurgeTime = 0.0;
687  double LastPurgeTime = 0.0;
688
689  TRACE2_HEADER *trh;   /* paulf addition 1/07/1999 mem alignment bug fix*/
690
691  TracePacket   Tpkt;   /* paulf addition 5/24/2007 another mem alignment fix!
692                                        ironic no? */
693
694  /* Catch broken socket signals
695   ******************************/
696#if defined(_SOLARIS) || defined(_LINUX) || defined(_MACOSX)
697  (void)sigignore(SIGPIPE);
698#endif
699
700  /* Not enough arguments
701   ********************/
702  if ( argc != 2 )
703  {
704    fprintf( stderr, "Usage: wave_serverV <configfile>\n" );
705    fprintf( stderr, "Version: %s\n", WSV_VERSION );
706    exit( -1 );
707  }
708
709  /* Initialize name of log-file & open it
710     force disk logging until after config
711     file is read.  JMP 10/3/2001
712    ***************************************/
713  logit_init( argv[1], 0, 256, 1 );
714
715
716  /* debug: set up periodic dump logic
717   ************************************/
718
719
720  /*
721     wave_server_V_config() needs to set the following variables
722     pConfigTankList: a pointer to a list of tank structs created
723     from the config file, including a pointer to the TANK
724     file for the waveserver.
725     PleaseContinue: a flag indicating weather waveserver should
726     go down in flames if a single tank fails, or whether it
727     should please continue until an error occurs that renders
728     all tanks useless.
729  ***********************/
730  wave_serverV_config( argv[1] );
731  wave_serverV_lookup( );
732
733  /* Reset logging to desired level
734   ***************************************/
735  logit_init( argv[1], 0, 256, LogSwitch );
736  if (Debug > 1)
737    logit( "" , "NOTE: This is the post-v5.2 patch to not write the empty tank messages\n");
738
739  logit( "" , "wave_serverV: Read command file <%s>\n", argv[1] );
740
741  /* Give serve_trace.c a chance to init.
742     this is probably just a version stamp, but
743     who knows?  Also index_util.c
744   *********************************************/
745  wave_serverV_c_init();
746  serve_trace_c_init();
747  index_util_c_init();
748  server_thread_c_init();
749
750
751/* Get process ID for heartbeat messages */
752  myPid = getpid();
753  if( myPid == -1 )
754  {
755    logit("e","wave_serverV: Cannot get pid. Exiting.\n");
756    exit (-1);
757  }
758
759  /* initialize packet db ronb 041207*/
760  if(bUsePacketSyncDb) {
761          if(tbdb_open(&pPacketDb, sPacketSyncDbFile)) {
762                 /* delete packets based on ldSyncRetentionTime config setting */
763                  if(bPurgePacketSyncDb) {
764                          tbdb_purge_all(pPacketDb);
765                  }
766          }     else {
767                  /*error opening packet db*/
768                  logit("e","wave_serverV: failed to open packet db.\n");
769          }
770   }
771 
772 /* Attach to transport ring (Must already exist)
773   *********************************************/
774  tport_attach( &Region, RingKey );
775
776  /* Right now, everything we have is in pConfigTankList, it contains all
777   * tank info and filenames from the config file, but no valid file pointers..
778   */
779     /* Start of new Initialization */
780  if((pTANKList=malloc(sizeof(TANKList))) == NULL)
781  {
782    logit("et","wave_serverV:  malloc() failed for tank list.  Exiting\n");
783    exit(-1);
784  }
785
786  /* Copy the header information from the ConfigTankList to the Tank Structure
787     file list.
788   ***********************/
789  *pTANKList=*pConfigTankList;
790
791  /* Retrieve the TANK structs from disk */
792  if(GetLatestTankStructures(pTANKList) == -1)
793  {
794    logit("et","Failed in call to GetLatestTankStructures().  Exiting\n");
795    exit( -1 );
796  }
797
798  /* We now have pTANKList, which is a list of TANK structures
799     retrieved from the TANK file(s), and pConfigTankList, a list of TANK
800     structures retrieved from the config file.  We need to merge the two.
801     Use the tanks in pTANKList to configure the tanks in the pConfigTankList
802  */
803
804  time(&CurrentTime);
805  /* For each tank in the list retrieved from disk */
806  for(pTSPtr=pTANKList->pFirstTS;
807      pTSPtr != pTANKList->pFirstTS + (pTANKList->NumOfTS);
808      /*Not EOL*/
809      pTSPtr= pTSPtr+1  /* move to next struct */)
810  {
811
812    /* Configure the tank if it's in the config list, with the info from the
813       tank struct file.  ConfigTANK() returns a ptr to the just configured
814       tank in the Config Tank List.
815    */
816    if( (pTSTemp=ConfigTANK(pTSPtr,pConfigTankList)) != (TANK *) NULL )
817    {
818      /* Tank is in the list, so open it, and Index files, update the
819         living index, and write a copy of the living index to file.
820      */
821
822      if(OpenTankFile(pTSTemp) < 0)
823      {
824        if(!ReCreateBadTanks) MarkTankAsBad(pTSTemp); else pTSTemp->isConfigured=0;
825        continue;
826      }
827      if(OpenIndexFile(pTSTemp,pConfigTankList->redundantIndexFiles,0) < 0)
828      {
829        if(!ReCreateBadTanks) MarkTankAsBad(pTSTemp); else pTSTemp->isConfigured=0;
830        continue;
831      }
832      if(BuildLIndex(pTSTemp) < 0)
833      {
834        if(!ReCreateBadTanks) MarkTankAsBad(pTSTemp); else pTSTemp->isConfigured=0;
835        continue;
836      }
837      if(WriteLIndex(pTSTemp,0,CurrentTime) < 0)
838      {
839        if(!ReCreateBadTanks) MarkTankAsBad(pTSTemp); else pTSTemp->isConfigured=0;
840        continue;
841      }
842    }
843    else
844    {
845      logit("t","Tank %s,%s,%s,%s found in %s, but not listed in config file\n",
846            pTSPtr->sta,pTSPtr->chan,pTSPtr->net,pTSPtr->loc,
847            GetRedundantFileName(pTANKList->pTSFile));
848    }
849  }  /* End for each tank from Tank structures file */
850
851  /* We don't need pTANKList anymore, since we've pilfered anything useful
852     in it while configuring pConfigTankList, our new hero */
853  if(pTANKList->pFirstTS)
854  {
855    free(pTANKList->pFirstTS);
856  }
857  free(pTANKList);
858
859  /* for each tank in the list read from the config file */
860  for(pTSPtr=pConfigTankList->pFirstTS;
861      pTSPtr != (pConfigTankList->pFirstTS) + (pConfigTankList->NumOfTS);/*!EOL*/
862      pTSPtr= pTSPtr+1  /* move to next struct */)
863  {
864    if(!pTSPtr->isConfigured)
865      /* Tank is marked as configured during BuildLIndex() */
866    {
867      if(CreateTankFile(pTSPtr) < 0)
868      {
869        MarkTankAsBad(pTSPtr);
870        continue;
871      }
872      if(OpenIndexFile(pTSPtr,pConfigTankList->redundantIndexFiles,1) < 0)
873      {
874        MarkTankAsBad(pTSPtr);
875        continue;
876      }
877      if(BuildLIndex(pTSPtr) < 0)
878      {
879        MarkTankAsBad(pTSPtr);
880        continue;
881      }
882      if(WriteLIndex(pTSPtr,0,CurrentTime) < 0)
883      {
884        MarkTankAsBad(pTSPtr);
885        continue;
886      }
887      pTSPtr->isConfigured = 1;
888    }  /* End if !Tank Already Configured (for better or worse) */
889  }  /* End for tank in configTankList */
890
891  /* Throw out the bad tanks, so that we are not carrying excess baggage.
892     If PleaseContinue is not set, and a bad tank is found,
893     RemoveBadTanksFromList() will exit() wave_server
894  */
895  if(RemoveBadTanksFromList(pConfigTankList))
896  {
897    logit("et","wave_server: RemoveBadTanksFromList() failed.  Exiting\n");
898    exit(-1);
899  }
900
901  /* Set the nTanks and Tank variables used throughout the program */
902  nTanks=pConfigTankList->NumOfTS;
903  Tanks=pConfigTankList->pFirstTS;
904
905  /* All tanks now configured
906     List of tanks is in pConfigTankList
907   **********************************/
908
909  /* Sort tanks using CRTLIB func qsort().
910     Davidk 10/5/98
911   **********************************/
912  qsort(Tanks,nTanks,sizeof(TANK), CompareTankSCNLs);
913
914  /* Turn Socket level debugging On/Off
915   **********************************/
916  setSocket_ewDebug(SOCKET_ewDebug);
917
918  /* Create one mutex for each tank
919    ********************************/
920  for(i=0;i<nTanks;i++)
921  {
922    CreateSpecificMutex(&(Tanks[i].mutex));
923  }
924
925  /* Create a mutex fot IndexMgr Termination */
926  CreateSpecificMutex(&(IndexTerminationMutex));
927
928
929  /* Allocate message buffers
930  **************************/
931  for (i=0;i<nTanks;i++)       /* find the largest record size in any tank */
932    if(Tanks[i].recSize > MaxMsgSiz) MaxMsgSiz=Tanks[i].recSize;
933
934  if(Debug > 1)
935       logit("","MaxMsgSiz: %d\n",MaxMsgSiz);
936
937  deQMsg = (char*)malloc( MaxMsgSiz+sizeof(int) );
938  if ( deQMsg == (char *) NULL )
939  {
940    logit( "e","wave_serverV: Error allocating inMsg buffer; exiting!\n" );
941    exit( -1 );
942  }
943  inMsg = deQMsg + sizeof(int);   /* recall that the tank number is pasted on the front of the
944                                     queued message by the stacker thread */
945  scrMsg = (char*)malloc( MaxMsgSiz );
946  if ( scrMsg == (char *) NULL )
947  {
948    logit( "e","wave_serverV: Error allocating scrMsg buffer; exiting!\n" );
949    exit( -1 );
950  }
951
952  /* Create a mutex for the queue
953  ******************************/
954  CreateSpecificMutex( &QueueMutex );
955
956  /* Initialize the message queue
957  *******************************/
958  initqueue ( &OutQueue, (unsigned long)InputQueueLen,(unsigned long)(MaxMsgSiz+sizeof(int)) );
959
960  /* Load array of logos to get
961    ****************************/
962  /* the deal here is to construct an array of distinct logos to be gotten. The Tank structures
963     contain logos for each tank, but many of them are likely the same. No point loading down
964     tport with all that */
965  for (i=0;i<nTanks;i++)  ServeLogo[i].instid = 0; /* clear  array */
966  ServeLogo[0]=Tanks[0].logo; /* hand load the first logo. */
967  NLogos=1;  /* Number of entries in ServeLogo */
968
969  for (i=0;i<nTanks;i++)
970  {
971    grabit=1;
972    for(j=0; j<NLogos; j++) /* look at all the logos we have so far */
973    {
974      if ( Tanks[i].logo.instid == ServeLogo[j].instid &&
975           Tanks[i].logo.mod    == ServeLogo[j].mod    &&
976           Tanks[i].logo.type   == ServeLogo[j].type     )  /* then it's old hat */
977      {
978        grabit=0;
979      }
980    }
981    if( grabit == 1)
982    {
983      ServeLogo[NLogos++]=Tanks[i].logo;
984      printf("ServeLogo: %d %d %d \n",ServeLogo[NLogos-1].instid,
985             ServeLogo[NLogos-1].mod,
986             ServeLogo[NLogos-1].type);
987    }
988  }  /* for < NLogos */
989  if(Debug > 1)
990  {
991    logit("","Acquiring: %d logos:\n",NLogos);
992    for(i=0;i<NLogos;i++)
993      logit("","Logo:  %d %d %d \n",ServeLogo[i].instid,ServeLogo[i].mod,ServeLogo[i].type);
994  }
995
996  /* Set up the signal handler so we can shut down gracefully */
997#if defined(_SOLARIS) || defined(_LINUX) || defined(_MACOSX)
998  signal(SIGINT, signal_hdlr);     /* <Ctrl-C> interrupt */
999  signal(SIGTERM, signal_hdlr);    /* program termination request */
1000 #ifdef SIGBREAK
1001  signal(SIGBREAK, signal_hdlr);   /* keyboard break */
1002 #endif  /* SIGBREAK */
1003  signal(SIGABRT, signal_hdlr);    /* abnormal termination */
1004#else
1005#ifdef _WINNT
1006  SetConsoleCtrlHandler( (PHANDLER_ROUTINE) CtrlHandler, TRUE ); 
1007#endif _WINNT
1008#endif _SOLARIS
1009
1010  /* Create a server thread info array before starting any threads.  The info array
1011     information is set in the serverthread and servermgr, but is read by the
1012     indexmgr thread.  */
1013  ServerThreadInfo=(ServerThreadInfoStruct *)
1014    malloc(MaxServerThreads * sizeof(ServerThreadInfoStruct));
1015  memset(ServerThreadInfo,0,MaxServerThreads * sizeof(ServerThreadInfoStruct));
1016  /* Create a previous array for comparisons done by ReportServerThreadStatus() */
1017  ServerThreadInfoPrevious=(ServerThreadInfoStruct *)
1018    malloc(MaxServerThreads * sizeof(ServerThreadInfoStruct));
1019  memset(ServerThreadInfoPrevious,0,MaxServerThreads * sizeof(ServerThreadInfoStruct));
1020
1021
1022  /* Start the Server Manager Thread */
1023  /***********************************/
1024  if (StartThread(ServerMgr, (unsigned)THREAD_STACK, &tidServerMgr) == -1)
1025  {
1026    logit("e", "wave_serverV: error starting ServerMgr. Exiting.\n");
1027    exit(-1);
1028  }
1029
1030
1031  /* Start the Index Manager Thread */
1032  /***********************************/
1033  if (StartThread(IndexMgr, (unsigned)THREAD_STACK, &tidIndexMgr) == -1)
1034  {
1035    logit("e", "wave_serverV: error starting IndexMgr. Exiting.\n");
1036    exit(-1);
1037  }
1038
1039
1040  /* Start the message stacking Thread */
1041  /*************************************/
1042
1043  MessageStackerStatus =-1; /* it should announce life to us */
1044  if (StartThread(MessageStacker, (unsigned)THREAD_STACK, &tidStacker) == -1)
1045  {
1046    logit("e", "wave_serverV: error starting MessageStacker. Exiting.\n");
1047    exit(-1);
1048  }
1049
1050  /* Initialize bAbortMainLoop flag to FALSE */
1051  /*******************************************/
1052  bAbortMainLoop = FALSE;
1053
1054  /* Working loop: get messages from ring, and plop into tank
1055   ***********************************************************/
1056  while(!terminate)
1057    /***********************************
1058      start of loop over messages ***/
1059  {
1060
1061    /* Get message from queue
1062     *************************/
1063    RequestSpecificMutex( &QueueMutex );
1064    result=dequeue( &OutQueue, deQMsg, &msgInLen, &logo);
1065    if(QueueLowWaterMark > OutQueue.NumOfElements)
1066      QueueLowWaterMark = OutQueue.NumOfElements;
1067    ReleaseSpecificMutex( &QueueMutex );
1068
1069    /* See if it's time to stop */
1070    flag = tport_getflag(&Region);
1071    if( flag == TERMINATE  ||  flag == myPid )  break;
1072
1073    if( result < 0 ) /* -1 means empty queue */
1074    {
1075      /* Send wave_server's heartbeat */
1076      if( time(&timeNow)-timeLastBeat >= HeartBeatInt )
1077      {
1078        timeLastBeat = timeNow;
1079        wave_serverV_status( TypeHeartBeat, 0, "" );
1080
1081                /* check for db purge if using db at all*/
1082                if(bUsePacketSyncDb && !(++nPurgeDbCount % nPurgeDbFreq)) {
1083                        /* get oldest tank record*/
1084                        PurgeTime = IndexOldest(&Tanks[0])->tStart;
1085                        for(i=1; i < nTanks; i++) {
1086                                if(IndexOldest(&Tanks[i])->tStart < PurgeTime) {
1087                                        PurgeTime = IndexOldest(&Tanks[i])->tStart;
1088                                }
1089                        }
1090                        /* if needs purging then purge*/
1091                        if(LastPurgeTime != PurgeTime) {
1092                                tbdb_purge_prior_packets(pPacketDb, PurgeTime);
1093                                LastPurgeTime = PurgeTime;
1094                        }
1095                }
1096      }
1097
1098      sleep_ew(100);
1099      continue;
1100    }
1101
1102    /* Extract the tank number
1103     *************************/
1104    /* Recall, it was pasted as an int on the front of the message by the MessageStacker thread */
1105    t = *((int*)deQMsg);
1106    if(Debug > 1)
1107       logit("","From queue: msg of %ld bytes for tank %d\n",msgInLen-sizeof(int),t);
1108
1109    msgInLen = msgInLen -sizeof(int);  /* correct message length */
1110
1111
1112    /* Swap the message to local notation AND MEMORY ALIGN!!!
1113     *************************************/
1114    memcpy((void *)&Tpkt, (void *)inMsg, msgInLen);
1115    trh = (TRACE2_HEADER *) &Tpkt;
1116
1117    /*  from here on in, only use Tpkt to refer to the msg,
1118        or trh to the header, this is mem aligned
1119        *************************************************/
1120    rc = WaveMsg2MakeLocal( trh );
1121
1122    if(rc < 0)
1123    {
1124      logit("et","WARNING: WaveMsg2MakeLocal() rejected tracebuf.  Discarding: (%s.%s.%s.%s)\n",
1125        trh->sta, trh->chan, trh->net, trh->loc);
1126      logit("e", "\t%.2f - %.2f nsamp=%d samprate=%6.2f endtime=%.2f. datatype=[%s]\n",
1127        trh->starttime, trh->endtime, trh->nsamp, trh->samprate, 
1128        IndexYoungest(&Tanks[t])->tEnd, trh->datatype);
1129      goto ReleaseMutex;
1130    }
1131
1132    /* Grab the tank file
1133     ********************/
1134    RequestSpecificMutex( &(Tanks[t].mutex) );
1135
1136    if(!TraceBufIsValid(&Tanks[t],trh))
1137    {
1138        logit("et","WARNING: Tracebuf seems invalid.  Discarding: (%s.%s.%s.%s)\n",
1139              trh->sta, trh->chan, trh->net, trh->loc);
1140        logit("e", "\t%.2f - %.2f nsamp=%d samprate=%6.2f endtime=%.2f. datatype=[%s]\n",
1141              trh->starttime, trh->endtime, trh->nsamp, trh->samprate,
1142              IndexYoungest(&Tanks[t])->tEnd, trh->datatype);
1143        goto ReleaseMutex;
1144    }
1145
1146        /* ronb 040507 - check if incoming tracebuf overlaps tank.
1147        if it does then save overlapped chunk and write portion that
1148        doesn't overlap to tank*/
1149        if((trh->starttime < IndexYoungest(&Tanks[t])->tEnd) && bUsePacketSyncDb) {
1150 
1151                /* tracebuf partially overlaps. write to db.*/
1152                logit("e", "WARNING: Trace overlaps tankdata saving to db.\n");
1153               
1154                /* currently write entire trace buffer to packet db.*/
1155                if(tbdb_put_packet(pPacketDb, &trh, inMsg, msgInLen) != TBDB_OK) {
1156                        logit("e", "ERROR: failed to write async packet to db\n");     
1157                }
1158                goto ReleaseMutex;
1159        }
1160
1161         /* Seek to insertion point
1162     ********************/
1163    if ( fseek( Tanks[t].tfp, Tanks[t].inPtOffset, SEEK_SET ) != 0 )  /* move to insertion point */
1164    {
1165      logit( "et", "wave_serverV: Error on fseek in tank [%s].\n",Tanks[t].tankName );
1166      strncpy(szFileFunction,"fseek",sizeof(szFileFunction));
1167      szFileFunction[sizeof(szFileFunction)]= 0;
1168      iFileOffset     = Tanks[t].inPtOffset;
1169      iFileBlockSize  = 0;
1170      szFileTankName  = Tanks[t].tankName;   /* don't copy just point */
1171      if(bAbortOnSingleTankFailure)
1172        goto abortAndReleaseMutex;
1173      else
1174        goto ReleaseMutex;
1175    }
1176    /* Be sure message is not too big */
1177    if ( msgInLen > Tanks[t].recSize )
1178    {
1179      sprintf( Text, "tank %s.%s.%s.%s msg too big (%d > %d).\n",
1180               Tanks[t].sta, Tanks[t].chan, Tanks[t].net, Tanks[t].loc,
1181               msgInLen, Tanks[t].recSize);
1182      wave_serverV_status(TypeError, ERR_TOOBIG, Text);
1183      goto ReleaseMutex;
1184    }
1185    /* Write message to Tank file */
1186    if ( (long) fwrite( (void*)&Tpkt, Tanks[t].recSize, 1, Tanks[t].tfp ) != 1 )
1187    {               /* NOTE: we always write a FULL record */
1188      logit( "et", "wave_serverV: Error writing to tank %s; exiting!\n",Tanks[t].tankName );
1189      strncpy(szFileFunction,"fwrite",sizeof(szFileFunction));
1190      szFileFunction[sizeof(szFileFunction)]= 0;
1191      iFileOffset     = Tanks[t].inPtOffset;
1192      iFileBlockSize  = Tanks[t].recSize;
1193      szFileTankName  = Tanks[t].tankName;   /* don't copy just point.  The original should be
1194                                                around for a while, and we aren't modifying */
1195      bIssueIOStatusError = TRUE;
1196      if(bAbortOnSingleTankFailure)
1197        goto abortAndReleaseMutex;
1198      else
1199        goto ReleaseMutex;
1200    }
1201    /* Update the insertion point
1202     *****************************/
1203    tmpOffset=Tanks[t].inPtOffset;
1204    if ( ftell(Tanks[t].tfp) != Tanks[t].inPtOffset + Tanks[t].recSize )
1205    {
1206      logit("et"," wave_serverV: bad offset %ld from ftell updating insertion point on %s\n",
1207            Tanks[t].inPtOffset,Tanks[t].tankName);
1208      logit("et"," wave_serverV:  ftell(Tanks[t].tfp): %ld\n",  ftell(Tanks[t].tfp));
1209      strncpy(szFileFunction,"ftell",sizeof(szFileFunction));
1210      szFileFunction[sizeof(szFileFunction)]= 0;
1211      iFileOffset     = Tanks[t].inPtOffset;
1212      iFileBlockSize  = Tanks[t].recSize;
1213      szFileTankName  = Tanks[t].tankName;   /* don't copy just point */
1214      if(bAbortOnSingleTankFailure)
1215        goto abortAndReleaseMutex;
1216      else
1217        goto ReleaseMutex;
1218    }
1219    Tanks[t].inPtOffset = Tanks[t].inPtOffset + Tanks[t].recSize;
1220
1221    /* Check for wrap of tank
1222     *****************************/
1223    /* Davidk 4/9/98, changed this comparison from > to >=, with the mindset
1224       that once the offset point hits the tank length, it should not write anymore
1225       records, it should instead wrap back to the beginning of the tank.  If it waits
1226       until the offset is > than the tanksize, then if everything is on the up and up,
1227       it should write 1 record passed the logical end of the tank, because the last
1228       record will be written starting at inPtOffset == tanksize, since tanksize is
1229       a multiple of record size.  Records should be written at (0,+rsz,0+2*rsz,..,(tsz/rsz)*rsz),
1230       where the last write in the sequence, (tsz/rsz)*rsz = tsz.
1231    */
1232    if ( Tanks[t].inPtOffset >= Tanks[t].tankSize )
1233    {
1234      Tanks[t].inPtOffset=0;
1235      Tanks[t].firstPass=0; /* we've been here before */
1236    }
1237
1238    if(UpdateIndex(&Tanks[t],trh,tmpOffset, TRUE))
1239    {
1240      strncpy(szFileFunction,"UpdateIndex",sizeof(szFileFunction));
1241      szFileFunction[sizeof(szFileFunction)]= 0;
1242      iFileOffset     = Tanks[t].inPtOffset;
1243      iFileBlockSize  = Tanks[t].recSize;
1244      szFileTankName  = Tanks[t].tankName;   /* don't copy just point */
1245      if(bAbortOnSingleTankFailure)
1246        goto abortAndReleaseMutex;
1247      else
1248        goto ReleaseMutex;
1249    }
1250
1251    ReleaseSpecificMutex( &(Tanks[t].mutex) );
1252    continue;
1253
1254  abortAndReleaseMutex:
1255    bAbortMainLoop=1;
1256
1257  ReleaseMutex:
1258    ReleaseSpecificMutex( &(Tanks[t].mutex) );
1259    if(bIssueIOStatusError)
1260    {
1261      IssueIOStatusError(szFileFunction,iFileOffset,iFileBlockSize,szFileTankName);
1262      bIssueIOStatusError = FALSE;
1263    }
1264
1265    if(bAbortMainLoop)
1266    {
1267      break;
1268    }
1269  } /*** end of working loop over messages ***/
1270
1271
1272
1273  /* We're here either due to some error, or we're supposed to shut down.
1274   **************************************************************************/
1275  logit ("et","wave_serverV: Shutdown initiated.\n");
1276
1277  /* Tell the other threads that it is time to shutdown */
1278  terminate=1;
1279
1280  for (i=0;i<nTanks;i++)
1281  {
1282    if (Debug > 1)
1283       logit("et","wave_serverV: Closing tank file %s.\n",Tanks[i].tankName);
1284
1285    if( fclose(Tanks[i].tfp) != 0)
1286      logit(""," Error on fclose on tank file %s\n", Tanks[i].tankName);
1287    /* End of comment */
1288  }
1289
1290  /* At startup, the IndexMgr thread grabs the IndexTerminationMutex,
1291      periodically, it checks to terminate, to see if it is 1, meaning
1292      it should shutdown.  Upon seeing the terminate flag, it writes
1293      the current indexes to disk, and then writes the TANK structures
1294      to disk.  Then it closes all file pointers, and finally releases
1295      the IndexTerminationMutex, to indicate to the main thread that it
1296      has completed.
1297   */
1298
1299  if (Debug > 2)
1300     logit("et","wave_serverV:main(): Waiting for IndexTerminationMutex\n");
1301
1302  RequestSpecificMutex(&IndexTerminationMutex);
1303
1304   /* Just to be clean, release the mutex */
1305  if (Debug > 2)
1306     logit("et","wave_serverV:main(): Got IndexTerminationMutex\n");
1307
1308  ReleaseSpecificMutex(&IndexTerminationMutex);
1309
1310  if (Debug > 2)
1311     logit("et","wave_serverV:main(): Released IndexTerminationMutex\n");
1312
1313  /* It is now OK to terminate */
1314
1315  if (Debug > 1)
1316    logit("et","wave_server: Detaching from shared memory.\n");
1317
1318 
1319  tport_detach( &Region );
1320 
1321  /* close packet db if its open. ronb041207
1322        NOTE: this should be null if bUsePacketSyncDb = 0*/
1323   if(pPacketDb) {
1324                tbdb_close(&pPacketDb);
1325                pPacketDb = NULL;
1326   }
1327
1328   logit("et","wave_serverV: Shutdown complete. Indexes saved to disk. Exiting\n");
1329  exit(0);
1330  return(0);
1331}  /* End of main() */
1332/*---------------------------------------------------------------------------*/
1333
1334
1335/********************** Message Stacking Thread *******************
1336 *           Move messages from transport to memory queue         *
1337 ******************************************************************/
1338thr_ret MessageStacker( void *dummy )
1339{
1340  static char        *msg;            /* retrieved message               */
1341  static int          res;
1342  static long         msgInLen;       /* size of retrieved message             */
1343  static MSG_LOGO     reclogo;        /* logo of retrieved message             */
1344  static int          ret;
1345  static int          t;
1346  static int             sayItOnce=0;
1347  static int          NumOfTimesQueueLapped=0;
1348  static time_t       tLastQLComplaint;
1349  static int          NumLastReportedQueueLapped=0;
1350  static TANK        *pTempTank;
1351  unsigned char       seq;
1352
1353
1354  /* Set time of last queue lapped complaint to now, that way there is an X second
1355     buffer during startup.
1356  *******************************************/
1357  time(&tLastQLComplaint);
1358
1359  /* Allocate space for input/output messages
1360   *******************************************/
1361  /* note that we allocate an int's worth at the start of the message: this
1362     will contain the tank number this message is headed for. To save doing
1363     a second set of string compares when the message is pulled from the queue */
1364  if ( ( msg = (char *) malloc(sizeof(int)+MaxMsgSiz) ) == (char *) NULL )
1365  {
1366    logit( "e", "wave_serverV MessageStacker: error allocating msg; exiting!\n" );
1367    goto error;
1368  }
1369
1370  /* Tell the main thread we're ok
1371   ********************************/
1372  MessageStackerStatus=0;
1373
1374  /* Flush the transport ring
1375   ************************/
1376  while ( tport_copyfrom( &Region, ServeLogo, (short)NLogos, &reclogo,
1377                        &msgInLen, msg+sizeof(int), MaxMsgSiz, &seq ) != GET_NONE );
1378
1379  /* Loop (almost) forever getting messages from transport
1380  ***********************************************/
1381  while( !terminate )
1382  {
1383    /* Get a message from transport ring
1384     ************************************/
1385    res = tport_copyfrom( &Region, ServeLogo, (short)NLogos, &reclogo,
1386                        &msgInLen, msg+sizeof(int), MaxMsgSiz, &seq );
1387
1388    if( res == GET_NONE ) {sleep_ew(100); continue;} /*wait if no messages for us */
1389
1390    /* Check return code; report errors
1391     ***********************************/
1392    if( res != GET_OK )
1393    {
1394      if( res==GET_TOOBIG )
1395      {
1396        if(sayItOnce==0)
1397        {
1398          sprintf( Text, "tport msg[%ld] i%d m%d t%d too big. Max is %ld",
1399                   msgInLen, (int) reclogo.instid, (int) reclogo.mod,
1400                   (int)reclogo.type, MaxMsgSiz );
1401          wave_serverV_status( TypeError, ERR_TOOBIG, Text );
1402          sayItOnce =1;
1403        }
1404        continue;
1405      }
1406      else if( res==GET_MISS_LAPPED )
1407      {
1408        sprintf( Text, "missed msgs from i%d m%d t%d sq%d in %s (overwritten)",
1409                 (int) reclogo.instid, (int) reclogo.mod, (int)reclogo.type,
1410                 (int) seq, RingName );
1411        wave_serverV_status( TypeError, ERR_MISSMSG, Text );
1412      }
1413      else if( res==GET_MISS_SEQGAP )
1414      {
1415        sprintf( Text, "saw sequence gap before i%d m%d t%d sq%d in %s",
1416                 (int) reclogo.instid, (int) reclogo.mod, (int)reclogo.type,
1417                 (int) seq, RingName );
1418        wave_serverV_status( TypeError, ERR_MISSMSG, Text );
1419      }
1420      else if( res==GET_NOTRACK )
1421      {
1422        sprintf( Text, "no tracking for logo i%d m%d t%d in %s",
1423                 (int) reclogo.instid, (int) reclogo.mod, (int)reclogo.type,
1424                 RingName );
1425        wave_serverV_status( TypeError, ERR_NOTRACK, Text );
1426      }
1427    }  /* end if res ! GET_OK */
1428
1429    /* See if we want this SCN name
1430     *******************************/
1431
1432    pTempTank=FindSCNL(Tanks,nTanks,
1433                       ((TRACE2_HEADER*)(msg+sizeof(int)))->sta,
1434                       ((TRACE2_HEADER*)(msg+sizeof(int)))->chan,
1435                       ((TRACE2_HEADER*)(msg+sizeof(int)))->net,
1436                       ((TRACE2_HEADER*)(msg+sizeof(int)))->loc);
1437    if(pTempTank)
1438    {
1439      t=(pTempTank-Tanks);  /* both are TANK *'s so sizeof(TANK) is
1440                               already factored in */
1441    }
1442    else
1443    {
1444      continue;
1445    }
1446
1447
1448    /* This message is headed for Tanks "t" */
1449    /***************************************/
1450    /* stick the tank number as an int at the front of the message */
1451    *((int*)msg) = t;
1452    RequestSpecificMutex( &QueueMutex );
1453    ret=enqueue( &OutQueue, msg, msgInLen+sizeof(int), reclogo );
1454    if(QueueHighWaterMark < OutQueue.NumOfElements)
1455      QueueHighWaterMark = OutQueue.NumOfElements;
1456    ReleaseSpecificMutex( &QueueMutex );
1457    if ( ret!= 0 )
1458    {
1459      if (ret==-2)  /* Serious: quit */
1460      {
1461        sprintf(Text,"internal queue error. Terminating.");
1462        wave_serverV_status( TypeError, ERR_QUEUE, Text );
1463        goto error;
1464      }
1465      if (ret==-1)
1466      {
1467        sprintf(Text,"queue cannot allocate memory. Lost message.");
1468        wave_serverV_status( TypeError, ERR_QUEUE, Text );
1469        continue;
1470      }
1471      if (ret==-3)
1472      {
1473        /* sprintf(Text,"Circular queue lapped. Message lost.");
1474           wave_serverV_status( TypeError, ERR_QUEUE, Text );
1475        */
1476        /* Queue is lapped too often to be logged to screen.  Log
1477           circular queue laps to logfile.  Maybe queue laps should
1478           not be logged at all.
1479        */
1480
1481        /* What we want to do is to selectively log queue laps to the screen and statmgr.
1482           The selection that we want to log is, the first occurance of queue laps, and
1483           the number of queue laps over X seconds, as long as the number is > 1.  To do
1484           this, we need to keep a running total of queue lapped messages, a timer indicating
1485           the last time we complained, and a config file value that indicates how often to
1486           complain.
1487        */
1488        NumOfTimesQueueLapped++;
1489        if(time(NULL) - tLastQLComplaint >= SecondsBetweenQueueErrorReports)
1490        {
1491          /* Complain to stat_mgr and to screen */
1492          sprintf(Text,"Circular queue lapped.  %d messages lost.",
1493                  NumOfTimesQueueLapped-NumLastReportedQueueLapped);
1494          NumLastReportedQueueLapped=NumOfTimesQueueLapped;
1495          wave_serverV_status( TypeError, ERR_QUEUE, Text );
1496          time(&tLastQLComplaint);
1497        }
1498
1499        continue;
1500      }
1501    }
1502
1503    if(Debug > 1)
1504       logit("", "queued msg len:%ld for tank %d\n",msgInLen,*((int*)msg) );
1505  }
1506  /* we're quitting
1507   *****************/
1508 error:
1509  MessageStackerStatus=-1; /* file a complaint to the main thread */
1510  KillSelfThread(); /* main thread will restart us */
1511}
1512
1513
1514/*---------------------------------------------------------------------------*/
1515/************************** ServerMgr Thread *******************************
1516                Sets up the passive socket and listens for connect requests.
1517                Starts Server threads, gives them the active sockets, and
1518                wishes them luck. Keeps track of active threads, and does not
1519                start too many threads.
1520******************************************************************************/
1521thr_ret ServerMgr( void *dummy )
1522{
1523  int                  on = 1;
1524  int                  clientLen;
1525#ifdef TO_BE_USED_IN_FUTURE
1526  char                 sysname[40];                    /* system name; filled by getsysname_ew  */
1527#endif
1528  typedef char char16[16];
1529  char16 * client_ip;  /* IP address of client from inet_ntoa   */
1530  struct sockaddr_in   skt;
1531  struct sockaddr_in * client;
1532  int                  freeThrd;
1533  int                  i;
1534
1535
1536
1537#define ACCEPTABLE_TIME_TO_WAIT_FOR_SERVER_THREAD_TO_START 15
1538#ifndef SocketSysInit_after_gethostbyaddr
1539
1540  if (Debug > 2)
1541     logit("et","wave_serverV: SocketSysInit...\n");
1542
1543  /* Initialize the socket system
1544   ******************************/
1545  SocketSysInit();
1546
1547#endif
1548
1549  if (Debug > 2)
1550    logit("", "Wave_serverV: Beginning ServerMgr.\n");
1551
1552
1553  /* Allocate data for variables based on MaxServerThreads, which is defined
1554     in the config file, and meta-initialize any data neccessary.
1555  ******************************************************************/
1556  tidSockSrv=(unsigned int *) malloc(MaxServerThreads * sizeof(int));
1557
1558  /*ActiveSocket=(int *) malloc(MaxServerThreads * sizeof(int));*/
1559  client_ip=malloc(MaxServerThreads * sizeof(char)* 16);
1560  client=(struct sockaddr_in *)malloc(MaxServerThreads * sizeof(struct sockaddr_in));
1561
1562  /* End of malloc and initialization.
1563   *******************************************************************/
1564
1565  /* Initialize all ServerThread Info
1566   **************************************/
1567  for(i=0; i< MaxServerThreads; i++)
1568  {
1569    ServerThreadInfo[i].Status=SERVER_THREAD_IDLE; /* Set status to idle */
1570    ServerThreadInfo[i].ActiveSocket=0; /* Set all Sockets to NULL */
1571    ServerThreadInfo[i].ClientsAccepted=0;
1572    ServerThreadInfo[i].ClientsProcessed=0;
1573    ServerThreadInfo[i].RequestsProcessed=0;
1574    ServerThreadInfo[i].Errors=0;
1575  }
1576
1577  /* 2003.04.14 dbh -- removed inet_addr() and gethostbyaddr() here */
1578
1579  /************************/
1580 HeavyRestart:
1581  /************************/
1582  /* Come here after something broke the passive socket. This should not happen, but it does.
1583     In that case, we're going to re-initialize the socket system, probably drop all clients,
1584     but start the service all over again.
1585  */
1586  if (Debug > 2)
1587    logit("et","wave_serverV: (Re)starting socket system\n");
1588
1589#ifdef SocketSysInit_after_gethostbyaddr
1590
1591  if (Debug > 2)
1592     logit("et","wave_serverV: SocketSysInit...\n");
1593
1594  /* Initialize the socket system
1595   ******************************/
1596  SocketSysInit();
1597
1598#endif
1599
1600  if ( ( PassiveSocket = socket_ew( PF_INET, SOCK_STREAM, 0) ) == -1 )
1601  {
1602    logit( "et", "wave_serverV: Error opening socket. Exiting.\n" );
1603    exit(-1);
1604  }
1605
1606  /* Fill in server's socket address structure
1607   *******************************************/
1608  memset( (char *) &skt, '\0', sizeof(skt) );
1609  /* 2003.04.14 dbh -- formerly used memcpy to set skt.sin_addr from gethostbyaddr() result */
1610
1611#if defined(_LINUX) || defined(_MACOSX)
1612  /* the S_un substruct is not something linux knows about */
1613  if ((int)(skt.sin_addr.s_addr = inet_addr(ServerIPAdr)) == INADDR_NONE)
1614#else
1615  if ((int)(skt.sin_addr.S_un.S_addr = inet_addr(ServerIPAdr)) == INADDR_NONE)
1616#endif
1617  {
1618      logit( "e", "wave_serverV: inet_addr failed for ServerIPAdr <%s>; exiting!\n",ServerIPAdr );
1619      exit( -1 );
1620  }
1621
1622  skt.sin_family = AF_INET;
1623  skt.sin_port   = htons( (short)ServerPort );
1624
1625  /* Allows the server to be stopped and restarted
1626   *********************************************/
1627  on=1;
1628  if(setsockopt( PassiveSocket, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(char *) )!=0)
1629  {
1630    logit( "et", "wave_serverV: Error on setsockopt. Exiting.\n" );
1631    perror("Export_generic setsockopt");
1632    closesocket_ew( PassiveSocket, SOCKET_CLOSE_IMMEDIATELY_EW);
1633    goto HeavyRestart;
1634  }
1635
1636  /* Bind socket to a name
1637   ************************/
1638  if ( bind_ew( PassiveSocket, (struct sockaddr *) &skt, sizeof(skt)) )
1639  {
1640    sprintf(Text,"wave_serverV: error binding socket; Exiting.\n");
1641    perror("wave_serverV bind error");
1642    logit("et","%s",Text );
1643    closesocket_ew( PassiveSocket, SOCKET_CLOSE_IMMEDIATELY_EW);
1644    exit (-1);
1645  }
1646  /* and start listening */
1647  if ( listen_ew( PassiveSocket, MaxServerThreads) )
1648  {
1649    logit("et","wave_serverV: socket listen error; exiting!\n" );
1650    closesocket_ew( PassiveSocket, SOCKET_CLOSE_IMMEDIATELY_EW);
1651    exit(-1);
1652  }
1653
1654  while (!terminate)
1655  {
1656    /* Prepare for connect requests
1657     *******************************/
1658    /* Find an idle thread
1659     **********************/
1660    while(1)
1661    {
1662      for(freeThrd=0; freeThrd<MaxServerThreads; freeThrd++)
1663      {
1664        if (Debug > 2)
1665          logit("","ServerMgr: ServerThreadStatus[%d]=%d\n",
1666                freeThrd,ServerThreadInfo[freeThrd].Status);
1667
1668        if (ServerThreadInfo[freeThrd].Status == SERVER_THREAD_IDLE
1669            || ServerThreadInfo[freeThrd].Status == SERVER_THREAD_COMPLETED
1670            || ServerThreadInfo[freeThrd].Status == SERVER_THREAD_ERROR)
1671          goto gotthrd;
1672        /* As long as the thread is not busy, grab it. */
1673      }
1674      logit("et","Cannot accept client: out of server threads. Waiting.\n");
1675      sleep_ew(WAIT_FOR_SERVER_THREAD); /* get in line and wait */
1676    }
1677  gotthrd:
1678
1679    /*  First Check thread state */
1680    switch(ServerThreadInfo[freeThrd].Status)
1681    {
1682    case SERVER_THREAD_COMPLETED:
1683      {
1684        /* Everything is OK, the thread has completed a previous task */
1685        ServerThreadInfo[freeThrd].Status=SERVER_THREAD_IDLE;
1686      }  /* fall through to next case... */
1687    case SERVER_THREAD_IDLE:
1688      {
1689        /* The thread is ready, break */
1690        break;
1691      }
1692    case SERVER_THREAD_ERROR:
1693      {
1694        /* The thread died on a previous task.  Do whatever cleanup
1695           that is neccessary.  Proclaim senseless thread death!
1696        */
1697        logit("et","Wave_serverV:Server thread %d was found to be dead.  Recycling.\n\t%s\n",
1698              freeThrd,"This is a VERY BAD THING!");
1699        ServerThreadInfo[freeThrd].Status=SERVER_THREAD_IDLE;
1700        break;
1701      }
1702    default:
1703      {
1704        /* The thread is still busy.  We shouldn't be here,
1705           self-destruct
1706        */
1707        logit("et","Wave_serverV:Thread %d was selected as free, "
1708              "but status shows that it is still busy.\n", freeThrd);
1709        goto HeavyRestart;
1710      }
1711    }
1712
1713    /* Accept a connection (blocking)
1714     *********************************/
1715    clientLen = sizeof( client[freeThrd] );
1716
1717    if (Debug > 1)
1718      logit( "","wave_serverV: Waiting for new connection for thread %d.\n",
1719             freeThrd );
1720
1721    if( (ServerThreadInfo[freeThrd].ActiveSocket =
1722         accept_ew( PassiveSocket, (struct sockaddr*) &client[freeThrd],
1723                    &clientLen, -1 /*SocketTimeoutLength*/) ) == INVALID_SOCKET )
1724    {
1725      logit("et","wave_serverV: error on accept. Exiting\n" );
1726      closesocket_ew( ServerThreadInfo[freeThrd].ActiveSocket,
1727                      SOCKET_CLOSE_IMMEDIATELY_EW );
1728      goto HeavyRestart;
1729    }
1730    strcpy( client_ip[freeThrd], inet_ntoa(client[freeThrd].sin_addr) );
1731
1732    if (Debug > 1)
1733      logit("et", "wave_serverV: Connection accepted from IP address %s\n",
1734                                                                                                                                client_ip[freeThrd] );
1735
1736    /* Start the Server thread
1737     **************************/
1738    if (Debug > 1)
1739      logit("","Starting ServerThread %d \n",freeThrd);
1740
1741    if ( StartThreadWithArg(  ServerThread, &freeThrd, (unsigned)THREAD_STACK, &tidSockSrv[freeThrd] )  == -1 )
1742    {
1743      logit( "e", "wave_serverV: Error starting  SocketSender thread. Exiting.\n" );
1744      closesocket_ew( ServerThreadInfo[freeThrd].ActiveSocket,
1745                      SOCKET_CLOSE_IMMEDIATELY_EW );
1746      goto HeavyRestart;
1747    }
1748    /* Wait for it to wake up */
1749    /**************************/
1750    /* otherwise we'll see it as idle on the next pass through this loop */
1751    if (Debug > 1)
1752      logit("","ServerMgr waiting for thread %d to come alive\n",freeThrd);
1753
1754
1755    { /* Begin of block to check for thread start status */
1756      time_t WaitForThreadStartTime, WaitForThreadCurrTime;
1757      time(&WaitForThreadStartTime);
1758      while( ServerThreadInfo[freeThrd].Status == SERVER_THREAD_IDLE
1759             && (WaitForThreadStartTime + ACCEPTABLE_TIME_TO_WAIT_FOR_SERVER_THREAD_TO_START > time(&WaitForThreadCurrTime)) )
1760      {
1761        sleep_ew(1);
1762      }
1763      if(WaitForThreadStartTime + ACCEPTABLE_TIME_TO_WAIT_FOR_SERVER_THREAD_TO_START
1764         < time(&WaitForThreadCurrTime))
1765      {
1766        logit("et","Thread %d never went to busy.  Marking thread as dead, and continuing.\n",freeThrd);
1767        ServerThreadInfo[freeThrd].Status = SERVER_THREAD_ERROR;
1768        /* We now have this confused thread running around out there,
1769           detached from normal life.  We should probably do something
1770           besides wave goodbye.
1771        */
1772      }
1773    }  /* End of block to check for thread start status */
1774  }
1775  closesocket_ew(PassiveSocket, SOCKET_CLOSE_GRACEFULLY_EW);
1776}                    /*** end of ServerMgr thread ***/
1777
1778/*---------------------------------------------------------------------------*/
1779
1780
1781/************************** IndexMgr Thread *******************************
1782Periodically writes the current TANK structures, and memory indexes to disk.
1783******************************************************************************/
1784thr_ret IndexMgr( void *dummy )
1785{
1786
1787
1788  TANKList * pMyTANKList;
1789  unsigned int i;
1790  time_t CurrentTime, LastTANKListWrite, LastTime, LastQueueReport;
1791  time_t * pFirstTimeStamp, * pSecondTimeStamp;
1792  unsigned int TANKsSize,TANKListBufferSize;
1793  char * pTANKsInBuffer, *pTANKListBuffer;
1794  int * pNumOfTANKs;
1795
1796  /* Create a buffer to store our personal snapshot of the
1797     tank buffers.  The idea is to frequently take snapshots
1798     of the TANKs and save them to disk without slowing down
1799     the main threads tank update mechanism
1800  */
1801  if((pMyTANKList=CreatePersonalTANKBuffer(pConfigTankList)) == NULL)
1802  {
1803    logit("et","wave_serverV:IndexMgr():  failed to create tank buffer.  Exiting\n");
1804    exit(-1);
1805  }
1806
1807  RequestSpecificMutex(&IndexTerminationMutex);
1808
1809  /* Before starting, write a current copy of the TANKs to
1810     the TANKs file.
1811  */
1812  time(&LastTANKListWrite);
1813  /* Note: GetTANKSs uses tank mutexes here (last argument is TRUE). If
1814   * we get the terminate flag in the middle of this call to GetTANKS,
1815   * it is possible that a Server Thread could get killed holding a
1816   * tank mutex. This would block GetTANKS forever until wave_serverV gets
1817   * killed by startstop. PNL 1/10/00 */
1818  if(GetTANKs(Tanks,pMyTANKList->pFirstTS,pMyTANKList->NumOfTS,TRUE))
1819  {
1820    logit("et","wave_serverV:IndexMgr():  failed to get copy of tank structures.  Exiting\n");
1821    exit(-1);
1822  }
1823
1824  TANKListBufferSize = sizeof(time_t) + sizeof(int)
1825    + (pMyTANKList->NumOfTS * sizeof(TANK))
1826    + sizeof(time_t);
1827  if((pTANKListBuffer=malloc(TANKListBufferSize)) == NULL)
1828  {
1829    logit("et","wave_serverV:IndexMgr():  failed to malloc tank list buffer.  Exiting\n");
1830    exit(-1);
1831  }
1832
1833  /* Write the number of tanks to the correct position in the buffer.
1834     The buffer should look like:
1835     TimeStamp1 NumOfTanks <TANKs> TimeStamp2
1836  */
1837  pNumOfTANKs=(int *)(pTANKListBuffer + sizeof(time_t));
1838  *pNumOfTANKs=pMyTANKList->NumOfTS;
1839
1840  /* Create Shortcuts to the TimeStamp and TANKs locations */
1841  pFirstTimeStamp=(time_t *)pTANKListBuffer;
1842  pTANKsInBuffer=pTANKListBuffer+sizeof(time_t)+sizeof(int);
1843  pSecondTimeStamp=(time_t *)(pTANKListBuffer+TANKListBufferSize-sizeof(time_t));
1844  TANKsSize=pMyTANKList->NumOfTS * sizeof(TANK);
1845
1846  /*WriteTANKList()*/
1847  time(pFirstTimeStamp);
1848  memcpy(pTANKsInBuffer,pMyTANKList->pFirstTS,TANKsSize);
1849  memcpy(pSecondTimeStamp, pFirstTimeStamp, sizeof(time_t));
1850  if(WriteRFile(pMyTANKList->pTSFile,0,TANKListBufferSize,pTANKListBuffer,1) != 1)
1851  {
1852    logit("et","wave_serverV:IndexMgr():  failed to write tank list to file.  Exiting\n");
1853    exit(-1);
1854  }
1855
1856  CurrentTime=LastTANKListWrite;
1857
1858  {
1859    LastQueueReport=CurrentTime;
1860
1861    if (Debug > 0)
1862       logit("et","Queue Report: High Mark %d, Low Mark %d\n",
1863                                                                QueueHighWaterMark,QueueLowWaterMark);
1864
1865    QueueLowWaterMark=QueueHighWaterMark;
1866    QueueHighWaterMark=0;
1867  }
1868
1869  while(!terminate)
1870  {
1871    LastTime=CurrentTime;
1872
1873    time(&CurrentTime);
1874    if(CurrentTime-LastQueueReport >= QueueReportInterval)
1875    {
1876      LastQueueReport=CurrentTime;
1877
1878      if (Debug > 0)
1879          logit("et","Queue Report: High Mark %d, Low Mark %d\n",
1880                                                                        QueueHighWaterMark,QueueLowWaterMark);
1881
1882      QueueLowWaterMark=QueueHighWaterMark;
1883      QueueHighWaterMark=0;
1884
1885      /* Do A Server Thread Status Report */
1886      if (Debug > 0)
1887          ReportServerThreadStatus();
1888    }
1889    if(LastTime==CurrentTime)
1890    {
1891      sleep_ew(1000);
1892      continue;
1893    }
1894    /* else */
1895    if(CurrentTime-TankStructUpdate >= LastTANKListWrite)
1896    {
1897      LastTANKListWrite=CurrentTime;
1898      if(GetTANKs(Tanks,pMyTANKList->pFirstTS,pMyTANKList->NumOfTS,TRUE))
1899      {
1900        logit("et","wave_serverV:IndexMgr():  failed to get copy of tank structures.  Exiting\n");
1901        exit(-1);
1902      }
1903
1904      /*WriteTANKList()*/
1905      *pFirstTimeStamp=CurrentTime;
1906      memcpy(pTANKsInBuffer,pMyTANKList->pFirstTS,TANKsSize);
1907      *pSecondTimeStamp=*pFirstTimeStamp;
1908      if(WriteRFile(pMyTANKList->pTSFile,0,TANKListBufferSize,pTANKListBuffer,1) != 1)
1909      {
1910        logit("et","wave_serverV:IndexMgr():  failed to write tank list to file.  Exiting\n");
1911        exit(-1);
1912      }
1913    }
1914
1915    for(i=0;i<(pMyTANKList->NumOfTS);i++)
1916    {
1917      if(CurrentTime-IndexUpdate >= Tanks[i].lastIndexWrite)
1918      {
1919        if(WriteLIndex(&(Tanks[i]),1,CurrentTime))
1920        {
1921          logit("t","WriteLIndex() failed for %s\n", Tanks[i].tankName);
1922        }
1923      }
1924      if(terminate) break;
1925    }  /* End for loop through all tank indexes */
1926
1927  }  /* End: while !terminated */
1928
1929  /* We got terminated.  Save the women, children,
1930     and indexes first, and then save the TANKList file
1931  */
1932
1933  if (Debug > 1)
1934     logit("t","IndexMgr: Got terminate flag!\n");
1935
1936  time(&CurrentTime);
1937  /* Saving indexes
1938   ********************/
1939  for(i=0;i<(pMyTANKList->NumOfTS);i++)
1940  {
1941    if (Debug > 1)
1942       logit("t","IndexMgr: Writing Tank %s!\n",Tanks[i].tankName);
1943
1944    WriteLIndex(&(Tanks[i]),1,CurrentTime);
1945  }
1946
1947  /* Getting and Saving TANKList.  Copy it into
1948     the Tank list buffer, so that we can execute a single write..
1949     Do NOT use mutexes.
1950  ********************/
1951  if (Debug > 2)
1952    logit("t","IndexMgr: Getting Tanks!\n");
1953
1954  GetTANKs(Tanks,pMyTANKList->pFirstTS,pMyTANKList->NumOfTS,FALSE);
1955  /*WriteTANKList()*/
1956
1957  time(pFirstTimeStamp);
1958  memcpy(pTANKsInBuffer,pMyTANKList->pFirstTS,TANKsSize);
1959  memcpy(pSecondTimeStamp, pFirstTimeStamp, sizeof(time_t));
1960
1961  if (Debug > 2)
1962     logit("t","IndexMgr: Writing TANKlist to file!\n");
1963
1964  WriteRFile(pMyTANKList->pTSFile,0,TANKListBufferSize,pTANKListBuffer,1);
1965
1966  /* HandleAllFinishingTouches
1967   **********************/
1968  if (Debug > 2)
1969     logit("t","IndexMgr: Cleaning up!\n");
1970
1971  CleanupIndexThread(pMyTANKList);
1972
1973  /* Allow the main thread to proceed
1974   **********************/
1975   if (Debug > 2)
1976     logit("t","IndexMgr: Releasing IndexTerminationMutex!\n");
1977
1978  ReleaseSpecificMutex(&IndexTerminationMutex);
1979}                    /*** end of IndexMgr thread ***/
1980/*---------------------------------------------------------------------------*/
1981
1982#define NCOMMAND 13
1983/***********************************************************************
1984 * wave_serverV_config()  processes command file using kom.c functions  *
1985 *                       exits if any errors are encountered           *
1986 ***********************************************************************/
1987void wave_serverV_config(char *configfile)
1988{
1989  int      ncommand = NCOMMAND; /* # of required commands to process   */
1990  char     init[NCOMMAND]; /* init flags, one for each required command */
1991  int      nmiss;      /* number of required commands that were missed   */
1992  char    *comm;
1993  char    *str;
1994  int      nfiles;
1995  int      success;
1996  int      i;
1997  char     TankStructFile[FILENAMELEN],TankStructFile2[FILENAMELEN];
1998  int      RedundantTankStructFiles=0;
1999  int      RedundantIndexFiles=0;
2000
2001
2002  /* Initialization */
2003  TankStructFile[0]=0;
2004  TankStructFile2[0]=0;
2005
2006  SecondsBetweenQueueErrorReports=60;  /* default value of 60 sec. */
2007
2008  /* Set to zero one init flag for each required command
2009   *****************************************************/
2010  for( i=0; i<ncommand; i++ )  init[i] = 0;
2011
2012  /* Open the main configuration file
2013   **********************************/
2014  nfiles = k_open( configfile );
2015  if ( nfiles == 0 ) {
2016    logit ("e",
2017             "wave_serverV: Error opening command file <%s>; exiting!\n",
2018             configfile );
2019    exit( -1 );
2020  }
2021
2022  /* Process all command files
2023   ***************************/
2024  while(nfiles > 0)   /* While there are command files open */
2025  {
2026    while(k_rd())        /* Read next line from active file  */
2027    {
2028      comm = k_str();         /* Get the first token from line */
2029
2030      /* Ignore blank lines & comments
2031       *******************************/
2032      if( !comm )           continue;
2033      if( comm[0] == '#' )  continue;
2034
2035      /* Open a nested configuration file
2036       **********************************/
2037      if( comm[0] == '@' )
2038      {
2039        success = nfiles+1;
2040        nfiles  = k_open(&comm[1]);
2041        if ( nfiles != success ) {
2042          logit ("e",
2043                   "wave_serverV: Error opening command file <%s>; exiting!\n",
2044                   &comm[1] );
2045          exit( -1 );
2046        }
2047        continue;
2048      }
2049
2050      /* Process anything else as a command
2051       ************************************/
2052      /* Read the transport ring name
2053       ******************************/
2054      /*0*/    if( k_its( "RingName" ) )
2055      {
2056        if ( (str=k_str()) ) {
2057          strncpy(RingName,str,20);
2058          if ( (RingKey = GetKey(str)) == -1 ) {
2059            logit ("e",
2060                     "wave_serverV: Invalid ring name <%s>; exiting!\n",
2061                     str );
2062            exit( -1 );
2063          }
2064        }
2065        init[0] = 1;
2066      }
2067      /* Read the log file switch
2068       **************************/
2069      /*1*/    else if( k_its( "LogFile" ) )
2070      {
2071        LogSwitch = k_int();
2072        init[1] = 1;
2073      }
2074      /* Read wave_server's module id
2075       ******************************/
2076      /*2*/    else if( k_its( "MyModuleId" ) )
2077      {
2078        if ( (str=k_str()) ) {
2079          if ( GetModId( str, &MyModId ) != 0 ) {
2080            logit ("e",
2081                     "wave_serverV: Invalid module name <%s>; exiting!\n",
2082                     str );
2083            exit( -1 );
2084          }
2085        }
2086        init[2] = 1;
2087      }
2088
2089      /* Read the port for requests and replies
2090       ****************************************/
2091      /*3*/    else if( k_its( "ServerPort" ) )
2092      {
2093        ServerPort = k_int();
2094        init[3] = 1;
2095      }
2096      /* Read the Internet address of wave_server machine
2097       ***************************************************/
2098      /*4*/    else if( k_its( "ServerIPAdr" ) )
2099      {
2100        if ( (str = k_str()))
2101        {
2102          if (strlen(str) < IPADDRLEN)
2103          {
2104            strcpy( ServerIPAdr, str );
2105            init[4] = 1;
2106          }
2107          else
2108          {
2109            logit ("e", "ServerIPAdr name too long, max %d\n", IPADDRLEN-1);
2110            exit( -1 );
2111          }
2112        }
2113
2114      }
2115      /* Read heartbeat interval (seconds)
2116       ***********************************/
2117      /*5*/    else if( k_its("HeartBeatInt") )
2118      {
2119        HeartBeatInt = k_long();
2120        init[5] = 1;
2121      }
2122
2123      /* Read gap threshhold
2124       ***********************************/
2125      /*6*/    else if( k_its("GapThresh") )
2126      {
2127        GapThresh = k_val();
2128        init[6] = 1;
2129      }
2130
2131      /* Read index update duration
2132       **************************************/
2133      /*7*/    else if( k_its("IndexUpdate") )
2134      {
2135        IndexUpdate = k_int();
2136        init[7] = 1;
2137      }
2138
2139      /* Read tank struct update duration
2140       **************************************/
2141      /*8*/    else if( k_its("TankStructUpdate") )
2142      {
2143        TankStructUpdate = k_int();
2144        init[8] = 1;
2145      }
2146
2147      /* Maximum number of messages in outgoing circular buffer
2148       ********************************************************/
2149      /*9*/   else if( k_its("InputQueueLen") )
2150      {
2151        InputQueueLen = k_long();
2152        init[9] = 1;
2153      }
2154
2155      /* Timeout in milliseconds for IP Socket routines
2156       ***********************************/
2157      /*10*/   else if(k_its("SocketTimeout") )
2158      {
2159        SocketTimeoutLength = k_int();
2160        init[10]=1;
2161      }
2162
2163      /* Location of TankStruct File
2164       ***********************************/
2165      /*11*/   else if(k_its("TankStructFile") )
2166      {
2167        if(str=k_str())
2168        {
2169          if (strlen(str) < FILENAMELEN)
2170          {
2171            strcpy(TankStructFile, str);
2172            init[11]=1;
2173          }
2174          else
2175          {
2176            logit ("e", "wave_serverV: TankStructFile name too long, max is %d\n",
2177                    FILENAMELEN-1);
2178            exit(-1);
2179          }
2180        }
2181        else
2182        {
2183          logit ("e", "wave_serverV: TankStructFile name missing\n");
2184          exit( -1 );
2185        }
2186      }
2187
2188      /* Read Tank descriptor lines
2189       ****************************/
2190      /*12*/    else if( k_its("Tank") )
2191      {
2192        if( nTanks == 0)
2193        {
2194          Tanks=(TANK *)malloc(MAX_TANKS * sizeof(TANK));
2195          memset(Tanks,0,MAX_TANKS * sizeof(TANK));
2196        }
2197
2198        if( nTanks >= MAX_TANKS)
2199        {
2200          logit ("e", "wave_serverV: More than %d Tank descriptor lines. Exit\n",MAX_TANKS);
2201          exit(-1);
2202          logit ("e", "wave_serverV: Too many tanks in config file.  Maximum of %d Tanks permitted. Exit\n"
2203                ,MAX_TANKS);
2204          exit(-1);
2205
2206        }
2207        if(str=k_str()) strncpy(Tanks[nTanks].sta, str, TRACE2_STA_LEN);
2208        else    {
2209          logit ("e", "wave_serverV: bad station name\n");
2210          exit(-1);
2211        }
2212        if(str=k_str()) strncpy(Tanks[nTanks].chan, str, TRACE2_CHAN_LEN);
2213        else    {
2214          logit ("e", "wave_serverV: bad component name\n");
2215          exit(-1);
2216        }
2217        if(str=k_str()) strncpy(Tanks[nTanks].net, str, TRACE2_NET_LEN);
2218        else    {
2219          logit ("e", "wave_serverV: bad network name\n");
2220          exit(-1);
2221        }
2222        if(str=k_str()) strncpy(Tanks[nTanks].loc, str, TRACE2_LOC_LEN);
2223        else    {
2224          logit ("e", "wave_serverV: bad location name\n");
2225          exit(-1);
2226        }
2227
2228        Tanks[nTanks].recSize = k_int(); /* we'll chop the tank into such slots */
2229        if ( Tanks[nTanks].recSize % 4 != 0 ) {
2230          logit ("e", "wave_serverV: record size for <%s><%s><%s><%s> not a multiple of 4\n",
2231                 Tanks[nTanks].sta, Tanks[nTanks].chan, Tanks[nTanks].net,
2232                 Tanks[nTanks].loc);
2233          exit(-1);
2234        }
2235        if ( Tanks[nTanks].recSize > MAX_TRACEBUF_SIZ ) {
2236          logit ("e", "wave_serverV: record size for <%s><%s><%s><%s> is larger than MAX_TRACEBUF_SIZ <%d>\n",
2237                 Tanks[nTanks].sta, Tanks[nTanks].chan, Tanks[nTanks].net,
2238                 Tanks[nTanks].loc, MAX_TRACEBUF_SIZ );
2239          exit(-1);
2240        }
2241
2242        if( (str=k_str()) )
2243        {
2244          if( GetInst( str, &Tanks[nTanks].logo.instid ) != 0 )
2245          {
2246            logit ("e", "wave_serverV: Invalid installation name <%s>", str );
2247            exit( -1 );
2248          }
2249        }
2250        if( (str=k_str()) )
2251        {
2252          if( GetModId( str, &Tanks[nTanks].logo.mod ) != 0 )
2253          {
2254            logit ("e", "wave_serverV: Invalid module name <%s>", str );
2255            logit ("e", " in <GetWavesFrom> cmd; exiting!\n" ); exit( -1 );
2256          }
2257        }
2258
2259        if( GetType( "TYPE_TRACEBUF2", &Tanks[nTanks].logo.type ) != 0 )
2260        {
2261          logit ("e", "wave_serverV: PANIC: TYPE_TRACEBUF2 not recognized\n" );
2262          exit( -1 );
2263        }
2264
2265        Tanks[nTanks].tankSize = k_int()*1000000 ; /* user gives us tank size in megabytes */
2266        if(Tanks[nTanks].tankSize > MAX_TANK_SIZE)
2267        {
2268          if(bTruncateTanksTo1GBAndContinue)
2269          {
2270            logit("e", "wave_serverV: WARNING:  Tanksize for (%s:%s:%s:%s) is too large: %d bytes.\n"
2271                       "Tank truncated to maximum allowable size: %d bytes.\n",
2272                  Tanks[nTanks].tankSize,
2273                  Tanks[nTanks].sta, Tanks[nTanks].chan, Tanks[nTanks].net, Tanks[nTanks].loc,
2274                  MAX_TANK_SIZE);
2275
2276             /* limit tank size to largest multiple of recSize that is smaller than MAX_TANK_SIZE */
2277             Tanks[nTanks].tankSize =   (MAX_TANK_SIZE / Tanks[nTanks].recSize)
2278                                      * Tanks[nTanks].recSize ;
2279          }
2280          else
2281          {
2282            logit("et", "wave_serverV: ERROR:  Tanksize for (%s:%s:%s:%s) is too large: %d bytes.\n"
2283                       "Tanks must be <=  %d bytes.  EXITING!\n",
2284                  Tanks[nTanks].tankSize,
2285                  Tanks[nTanks].sta, Tanks[nTanks].chan, Tanks[nTanks].net, Tanks[nTanks].loc,
2286                  MAX_TANK_SIZE);
2287            exit(-1);
2288          }
2289        }
2290        else {
2291             /* RL set tanksize to multiple of recsize to avoid ReadBlockData errors */
2292             Tanks[nTanks].tankSize = (Tanks[nTanks].tankSize / Tanks[nTanks].recSize) * Tanks[nTanks].recSize;
2293        }
2294
2295        Tanks[nTanks].indxMaxChnks = k_int(); /*number of segments, not byte count */
2296        Tanks[nTanks].nRec = Tanks[nTanks].tankSize / Tanks[nTanks].recSize;  /* how many records can we store */
2297        Tanks[nTanks].tankSize = Tanks[nTanks].recSize * Tanks[nTanks].nRec;
2298        Tanks[nTanks].datatype[0]=0;
2299
2300
2301        /* When do we open FILE Ptrs, maybe in ConfigTank() */
2302
2303        if(str=k_str())
2304        {
2305          if (strlen(str) < MAX_TANK_NAME)
2306          {
2307            strncpy(Tanks[nTanks].tankName, str,MAX_TANK_NAME);
2308          }
2309          else
2310          {
2311            logit ("e", "wave_serverV: tank name too long; max is %d\n",
2312                    MAX_TANK_NAME-1);
2313            exit( -1 );
2314          }
2315        }
2316        else
2317        {
2318          logit ("e", "wave_serverV: bad tank name\n");
2319          exit(-1);
2320        }
2321
2322        init[12] = 1;
2323        nTanks++;
2324      }
2325
2326      /* Read debug flag - optional
2327       ****************************/
2328      else if( k_its("Debug") ){
2329        Debug =  k_int();
2330      }
2331
2332      /* Optional cmd: ClientTimeout */
2333      else if(k_its("ClientTimeout") ) {
2334        ClientTimeout = k_int();
2335        if (ClientTimeout <= 0 && ClientTimeout != -1)
2336        {
2337          logit ("e", "wave_serverV: bad ClientTimeout value <%d>, using -1\n",
2338                  ClientTimeout);
2339          ClientTimeout = -1;
2340        }
2341      }
2342
2343      /* Optional cmd: Turn on socket debug logging -> BIG LOG FILES!!
2344       ***********************************/
2345      else if(k_its("SocketDebug") ) {
2346        SOCKET_ewDebug = k_int();
2347      }
2348
2349      /* Optional cmd: RedundantTankStructFiles
2350         Should we use redundant files for saving Tank Structs
2351      ***********************************/
2352      else if(k_its("RedundantTankStructFiles") )
2353      {
2354        RedundantTankStructFiles=k_int();
2355      }
2356
2357      /* Optional cmd: RedundantIndexFiles
2358         Should we use redundant files for saving Indexes
2359      ***********************************/
2360      else if(k_its("RedundantIndexFiles") )
2361      {
2362        RedundantIndexFiles=k_int();
2363      }
2364
2365
2366      /* Optional:  Location of second TankStruct File
2367         for use only with redundant tank struct files
2368      ***********************************/
2369      else if(k_its("TankStructFile2") )
2370      {
2371        if(str=k_str())
2372        {
2373          if (strlen(str) < FILENAMELEN)
2374          {
2375            strcpy(TankStructFile2, str);
2376          }
2377          else
2378          {
2379            logit ("e", "wave_serverV: TankStructFile2 name too long, max is %d\n",
2380                    FILENAMELEN-1);
2381            exit( -1 );
2382          }
2383        }
2384        else
2385        {
2386          logit ("e", "wave_serverV: bad TankStructFile2 name\n");
2387        }
2388      }
2389
2390      /* Optional cmd: PleaseContinue
2391         Should we continue on Tank failures,
2392         providing that some tanks are still functioning
2393         (Do your best)
2394      ***********************************/
2395      else if(k_its("PleaseContinue") )
2396      {
2397        PleaseContinue=k_int();
2398      }
2399
2400      /* Optional cmd: MaxMsgSize
2401         In case trace_buf messages could be larger than any in this config file
2402      ********************************/
2403      else if (k_its("MaxMsgSize") )
2404      {
2405        MaxMsgSiz=k_int();
2406      }
2407
2408      /* Optional cmd: ReCreateBadTanks
2409         In case the operator desires to have bad tanks re-created
2410         from scratch.
2411      ********************************/
2412      else if (k_its("ReCreateBadTanks") )
2413      {
2414        ReCreateBadTanks=k_int();
2415      }
2416
2417
2418      /* Optional cmd: SecondsBetweenQueueErrorReports
2419         To set the minimum period of time between error reports
2420         to statmgr due to the internal message queue being lapped,
2421         and thus messages being lost.
2422      ********************************/
2423      else if (k_its("SecondsBetweenQueueErrorReports") )
2424      {
2425        SecondsBetweenQueueErrorReports=k_int();
2426      }
2427
2428
2429      /* Optional cmd: MaxServerThreads
2430         To set the maximum number of server threads to be created
2431         to handle client requests.
2432      ********************************/
2433      else if (k_its("MaxServerThreads") )
2434      {
2435        MaxServerThreads=k_int();
2436      }
2437
2438
2439      /* Optional cmd: QueueReportInterval
2440         To set the minimum number of seconds between
2441         reports on internal queue high and low water marks.
2442      ********************************/
2443      else if (k_its("QueueReportInterval") )
2444      {
2445        QueueReportInterval=k_int();
2446      }
2447
2448
2449      /* Optional cmd: AbortOnSingleTankFailure
2450         Should we abort when any single tank fails (sets bAbortOnSingleTankFailure)
2451      ***********************************/
2452      else if(k_its("AbortOnSingleTankFailure") )
2453      {
2454        bAbortOnSingleTankFailure=k_int();
2455      }
2456
2457      /* Optional cmd: TruncateTanksTo1GBAndContinue
2458         If present, tanks larger than MAX_TANK_SIZE are truncated (with a warning) instead of causing an exit
2459      ***********************************/
2460      else if(k_its("TruncateTanksTo1GBAndContinue") )
2461      {
2462        bTruncateTanksTo1GBAndContinue=TRUE;
2463      }
2464
2465          /* ronb040307 - Optional cmd: UsePacketSyncDb
2466           Use packet db to store/retrieve out of order Packets
2467           ********************************/
2468          else if(k_its("UsePacketSyncDb")) {
2469                bUsePacketSyncDb = k_int();
2470          }
2471         
2472          /* get database file name from config file ronb040307*/
2473          else if(k_its("PacketSyncDbFile")) {
2474                strncpy(sPacketSyncDbFile, k_str(), TB_MAX_DB_NAME);
2475          }
2476
2477          /* get priority for out of sync data overlapping tank data ronb041207*/
2478          else if(k_its("TankOverSyncDbData")) {
2479                bTankOverSyncDbData = k_int();
2480          }
2481
2482          /* get number of epoch seconds worth of data to retain in
2483          database. ronb041207*/
2484          else if(k_its("PurgePacketSyncDb")) {
2485                bPurgePacketSyncDb = k_int();
2486          }
2487 
2488    /* Command is not recognized
2489       ****************************/
2490      else
2491      {
2492        logit ("e", "wave_serverV: <%s> unknown command in <%s>.\n",
2493                comm, configfile );
2494        continue;
2495      }
2496
2497
2498
2499      /* See if there were any errors processing the command
2500       *****************************************************/
2501      if( k_err() ) {
2502        logit ("e",
2503                 "wave_serverV: Bad <%s> command in <%s>; exiting!\n",
2504                 comm, configfile );
2505        exit( -1 );
2506      }
2507    }
2508    nfiles = k_close();
2509  }
2510  /* After all files are closed, check init flags for missed commands
2511   ******************************************************************/
2512  nmiss = 0;
2513  for ( i=0; i < ncommand; i++ )  if( !init[i] ) nmiss++;
2514  if ( nmiss ) {
2515    logit ("e", "wave_serverV: ERROR, no " );
2516    if ( !init[0] )  logit ("e", "<RingName> "     );
2517    if ( !init[1] )  logit ("e", "<LogFile> "      );
2518    if ( !init[2] )  logit ("e", "<MyModuleId> "   );
2519    if ( !init[3] )  logit ("e", "<ServerPort> "   );
2520    if ( !init[4] )  logit ("e", "<ServerIPAdr> "  );
2521    if ( !init[5] )  logit ("e", "<HeartBeatInt> " );
2522    if ( !init[6] )  logit ("e", "<GapThresh> "    );
2523    if ( !init[7] )  logit ("e", "<IndexUpdate> "  );
2524    if ( !init[8] )  logit ("e", "<TankStructUpdate> "   );
2525    if ( !init[9] )  logit ("e", "<InputQueueLen> "      );
2526    if ( !init[10])  logit ("e", "<SocketTimeoutLength> ");
2527    if ( !init[11])  logit ("e", "<TankStructFile> "     );
2528    if ( !init[12])  logit ("e", "<Tank> "               );
2529    logit ("e", "command(s) in <%s>; exiting!\n", configfile );
2530    exit( -1 );
2531  }
2532
2533  /* Real quick: define duties of config relative to indexing:
2534     read in the config parameters.
2535     When we find the number of tanks, malloc() a list for the tanks, and
2536     then as the tanks are read in, add each tank to the list.  Call
2537     InitTankList() to create a TANKFilePtr for the list, to copy the location
2538     of the tank array, to indicate redundant index or TANKstruct files, to set
2539     the number of tanks.
2540
2541     We will set the file ptrs for each tank index when we config each tank
2542  */
2543
2544  if(InitTankList(&pConfigTankList,Tanks,
2545                  RedundantTankStructFiles,
2546                  RedundantIndexFiles,nTanks,
2547                  TankStructFile,TankStructFile2) )
2548  {
2549    /* This is not good!!, we could not init the list.
2550       Quit and go home */
2551    logit ("e", "ERROR:  InitTankList failed!!!\nFailure Terminal, Exiting\n");
2552    exit(-1);
2553  }
2554
2555  /* At this point, we have a working configured tank list, generated from
2556     the config file, with the exception that no files are open. DK
2557  */
2558
2559  return;
2560}
2561/*---------------------------------------------------------------------------*/
2562/***************************************************************************
2563 *  wave_serverV_lookup( )  Look up important info from earthworm.h tables  *
2564 ***************************************************************************/
2565void wave_serverV_lookup( void )
2566{
2567  if ( GetLocalInst( &InstId ) != 0 ) {
2568    fprintf( stderr,
2569             "wave_serverV: error getting local installation id; exiting!\n" );
2570    exit( -1 );
2571  }
2572  if( GetType( "TYPE_HEARTBEAT", &TypeHeartBeat ) != 0 ) {
2573    fprintf( stderr,
2574             "wave_serverV: Invalid message type <TYPE_HEARTBEAT>; exiting!\n" );
2575    exit( -1 );
2576  }
2577  if( GetType( "TYPE_ERROR", &TypeError ) != 0 ) {
2578    fprintf( stderr,
2579             "wave_serverV: Invalid message type <TYPE_ERROR>; exiting!\n" );
2580    exit( -1 );
2581  }
2582  if( GetType( "TYPE_TRACEBUF2", &TypeWaveform ) != 0 ) {
2583    fprintf( stderr,
2584             "wave_serverV: Invalid message type <TYPE_TRACEBUF2>; exiting!\n" );
2585    exit( -1 );
2586  }
2587  return;
2588}
2589/*---------------------------------------------------------------------------*/
2590
2591/******************************************************************************
2592 * wave_serverV_status() builds a heartbeat or error message & puts it into    *
2593 *                     shared memory.  Writes errors to log file & screen.    *
2594 ******************************************************************************/
2595void wave_serverV_status( unsigned char type, short ierr, char *note )
2596{
2597  MSG_LOGO    logo;
2598  char        msg[256];
2599  long        size;
2600  time_t      t;
2601
2602  /* Build the message
2603   *******************/
2604  logo.instid = InstId;
2605  logo.mod    = MyModId;
2606  logo.type   = type;
2607
2608  time( &t );
2609
2610  if( type == TypeHeartBeat )
2611  {
2612    sprintf( msg, "%ld %ld\n\0", (long) t, (long) myPid);
2613  }
2614  else if( type == TypeError )
2615  {
2616    sprintf( msg, "%ld %hd %s\n\0", (long) t, ierr, note);
2617    logit( "et", "wave_serverV: %s\n", note );
2618  }
2619
2620  size = strlen( msg );   /* don't include the null byte in the message */
2621
2622  /* Write the message to shared memory
2623   ************************************/
2624  if( tport_putmsg( &Region, &logo, size, msg ) != PUT_OK )
2625  {
2626    if( type == TypeHeartBeat ) {
2627      logit("et","wave_serverV:  Error sending heartbeat.\n" );
2628    }
2629    else if( type == TypeError ) {
2630      logit("et","wave_serverV:  Error sending error:%d.\n", ierr );
2631    }
2632  }
2633
2634  return;
2635}
2636
2637
2638/*-----------------------------------------------------------------------*/
2639/***********************************************************************
2640 * UpdateIndex() Updates the memmory index list with a new sample of   *
2641 *  trace data.                                                        *
2642 ***********************************************************************/
2643int UpdateIndex(TANK * pTSPtr, TRACE2_HEADER * pCurrentRecord,
2644                unsigned int CurrentOffset, int CheckOverwrite)
2645{
2646  /*
2647     Purpose:
2648     Updates the memmory index list with a new sample of trace data.
2649
2650     Arguments:
2651         pTSPtr: pointer to the TANK structure we are dealing with
2652         pCurrentRecord: pointer to the trace record that was just
2653            copied into the tank
2654         CurrentOffset: the current offset to the tank file, updated after
2655            the current record was inserted.
2656         CheckOverwrite: flag to update the index for the old record that
2657            was overwritten by the current record. This check is needed
2658            when wave_serverV is running; unneccesary when UpdateIndex is
2659            called during wave_serverV startup.
2660
2661     Return Values:
2662     0: Successful update.
2663     -1: Error occured during update.
2664
2665     Functions Called:
2666     (not analyzed)
2667
2668     Additional Remarks:
2669     Apologia: we resort to making a special case of the first write
2670     (and the first pass)to keep the initialization logic from becoming
2671     too baroque and hard to debug. We gotta ship this
2672  */
2673
2674  char scrMsg[MAX_TRACEBUF_SIZ];
2675  static char * MyFuncName = "UpdateIndex()";
2676  int RetVal;
2677  DATA_CHUNK *dc;
2678
2679
2680  if(Debug > 2)
2681  {
2682    logit("t","Entering %s\n",MyFuncName);
2683    logit("et", "S %s,C %s,N %s,L %s,StartTime %f,EndTime %f,CurrentOffset %u\n",
2684          pCurrentRecord->sta,pCurrentRecord->chan,pCurrentRecord->net,
2685          pCurrentRecord->loc,
2686          pCurrentRecord->starttime,pCurrentRecord->endtime,CurrentOffset
2687          );
2688  }
2689
2690  /* Special case: first write ever
2691   ********************************/
2692  if ( pTSPtr->firstWrite == 1 )
2693  {
2694    /* CHECK:  This assumes that initialization of the index sets
2695       IndexYoungest, and probably IndexOldest, to valid places, with
2696       memory allocated at those places  DK
2697    */
2698    if(Debug > 2)
2699      logit("et","UpdateIndex: First Write\n");
2700
2701    dc=IndexYoungest(pTSPtr);
2702    dc->tStart = pCurrentRecord->starttime;
2703    dc->tEnd   = pCurrentRecord->endtime;
2704
2705    /* First write is always at beginning */
2706    IndexYoungest(pTSPtr)->offset = 0;
2707
2708    /* Move the data type, pin number, and sampling rate */
2709    strncpy(pTSPtr->datatype,  pCurrentRecord->datatype,  3);
2710    pTSPtr->samprate   = pCurrentRecord->samprate;
2711    pTSPtr->pin   = pCurrentRecord->pinno;
2712
2713    pTSPtr->firstWrite =0; /* no longer the first write */
2714
2715    /* All done. Release this tank */
2716  } /* End if first write */
2717  else  /* Not first write */
2718  {
2719
2720    /* Update index's newest time
2721     *****************************/
2722    /* philosophy: we just wrote a bunch of data points to the tank. If
2723       the oldest of those is  more than one and a half sample intervals
2724       (looks like 1.0 now) younger than the youngest we had before this
2725       write, we declare a gap has occurred. We create a new chunk in the
2726       index. We assume the insertion point has not been updated yet
2727    */
2728
2729    if(Debug > 2)
2730      logit("et","UpdateIndex: Normal Write\n");
2731
2732    /* Is there a Gap
2733     ********************************/
2734    if( IndexYoungest(pTSPtr)->tEnd +
2735        GapThresh*(1.0/(pCurrentRecord->samprate)) <
2736        pCurrentRecord->starttime )
2737    {
2738      /* Add a new index entry
2739       ********************************/
2740      if(Debug > 2)
2741        logit("et","UpdateIndex:  adding index, start,finish: %u,%u\n",
2742              pTSPtr->indxStart,pTSPtr->indxFinish);
2743
2744      if(( RetVal = IndexAdd( pTSPtr, pCurrentRecord->starttime,
2745                              pCurrentRecord->endtime,
2746                              CurrentOffset            )) != 0)
2747      {
2748        /* Davidk 9/8/98 */
2749        /*  We have a problem, either overwrote an index, or
2750            something really serious happened. */
2751        if(RetVal > 0)
2752        {
2753          /* we're out of index entires*/
2754          if(pTSPtr->lappedIndexEntries == 0)
2755          {
2756            sprintf(Text,"Ran out of free indexes.  Overwriting "
2757                    "oldest indexes in tank: %s,%s,%s,%s\n",
2758                    pTSPtr->sta,pTSPtr->chan,pTSPtr->net,pTSPtr->loc);
2759            wave_serverV_status(TypeError, ERR_OVERWRITE_INDEX, Text);
2760          }
2761          pTSPtr->lappedIndexEntries++;
2762          /* goto abortUpdateIndex; */
2763          /* Overwritten index entries is a problem handled
2764             internally by this function.  It provides no
2765             notice to its caller that something has gone
2766             wrong.  This is to prevent adverse logging.
2767             When an index is overwritten, everything still
2768             completes successfuly with the transaction, the
2769             problem is that old data has now been overwritten.
2770             Thus the update did not fail, it just didn't go
2771             as well as planned.  DavidK 9/8/98
2772          */
2773        }
2774        else
2775        {
2776          /* This is bad.  Something internal went kaboom.
2777             Something is corrupted in the tank.  Scream
2778             for help!!!!! and then abort.
2779          **********************************************/
2780          sprintf(Text,"Corruption problems in memory tank structure"
2781                  "for tank: %s,%s,%s,%s\n",
2782                  pTSPtr->sta,pTSPtr->chan,pTSPtr->net,pTSPtr->loc);
2783          wave_serverV_status(TypeError, ERR_TANK_CORRUPTION, Text);
2784          goto abortUpdateIndex;
2785        }
2786      }
2787
2788      else /* IndexAdd() completed successfully */
2789      {
2790        if(pTSPtr->lappedIndexEntries != 0)
2791        {
2792          sprintf(Text,"wave_serverV:  Found Free Index.  No longer overwriting"
2793                  " free indexes.  Overwrote %u "
2794                  "indexes in tank: %s,%s,%s,%s\n",
2795                  pTSPtr->lappedIndexEntries,
2796                  pTSPtr->sta,pTSPtr->chan,pTSPtr->net,pTSPtr->loc);
2797          wave_serverV_status(TypeError, ERR_RECOVER_OVERWRITE_INDEX, Text);
2798          pTSPtr->lappedIndexEntries=0;
2799        }
2800      }
2801
2802      if(Debug > 2)
2803        logit("et","UpdateIndex:  after adding index, start,finish: %u,%u\n",
2804              pTSPtr->indxStart,pTSPtr->indxFinish);
2805
2806    }
2807    else  /* No Gap */
2808      /* if a new chunk was not created by this write, we update the index by re-writing the
2809         end time of the youngest chunk */
2810    {
2811      IndexYoungest(pTSPtr)->tEnd = pCurrentRecord->endtime;
2812      if(Debug > 2)
2813        logit("et","UpdateIndex:  extending index, new endtime %f,%f\n",
2814              pCurrentRecord->endtime,IndexYoungest(pTSPtr)->tEnd);
2815
2816    }
2817
2818    if( pTSPtr->firstPass != 1 && CheckOverwrite)
2819    {
2820      /* Update index's oldest chunk.
2821         Concern is whether we just
2822         wiped out an old chunk
2823      ******************************/
2824
2825      /* for now, brute force: read it out of the file. If that's too
2826         much disc action, we could compute it from the index - somehow,
2827         some day
2828      */
2829
2830      if(Debug > 2)
2831        logit("et","Update Index: Not first pass, doing other junk\n");
2832
2833
2834      /* seek to the current position.  This is neccessary between reads and writes */
2835      if( fseek( pTSPtr->tfp, pTSPtr->inPtOffset, SEEK_SET ) != 0 )
2836      {
2837        logit( "e", "wave_serverV: error on fseek on tank %s; exiting!\n",pTSPtr->tankName );
2838        goto abortUpdateIndex;
2839      }
2840      if( fread( (void *)scrMsg, pTSPtr->recSize, 1, pTSPtr->tfp ) != 1 )
2841      {
2842        logit( "e", "wave_serverV: error on fread tank %s; exiting!\n",pTSPtr->tankName );
2843        goto abortUpdateIndex;
2844      }
2845      /* We peeked ahead into the tank, and picked up the oldest message into scrMsg */
2846
2847      /* If we just wiped out a chunk, get rid of its index entry
2848       ***********************************************************/
2849      if( ((TRACE2_HEADER*)scrMsg)->starttime > IndexOldest(pTSPtr)->tEnd )
2850      {
2851        IndexDel(pTSPtr,IndexOldest(pTSPtr));
2852
2853        if (Debug > 2)
2854          logit("","After deletion:\n");
2855      }
2856      else
2857      {
2858        IndexOldest(pTSPtr)->tStart = ((TRACE2_HEADER*)scrMsg)->starttime;
2859        IndexOldest(pTSPtr)->offset = pTSPtr->inPtOffset;
2860      }
2861    } /* End !FirstPass */
2862  }  /* End else !FirstWrite */
2863
2864  if(Debug > 2)
2865    logit("t","Exiting %s\n",MyFuncName);
2866
2867  return(0);
2868
2869 abortUpdateIndex:
2870  {
2871    if(Debug > 2)
2872      logit("t","Exiting %s\n",MyFuncName);
2873  }
2874  return(-1);
2875}  /* End UpdateIndex */
2876
2877
2878/*****************************************************************************/
2879/* int ReportServerThreadStatus() */
2880/*****************************************************************************/
2881int ReportServerThreadStatus(void)
2882{
2883  int i;
2884  char StatusString[8];
2885
2886  logit("","  Clnts Accp    Clnts Prcsd   Reqs Prcsd    Errors    Status  \n");
2887  logit("","==============================================================\n");
2888  for(i=0;i<MaxServerThreads;i++)
2889  {
2890    switch (ServerThreadInfo[i].Status)
2891    {
2892    case SERVER_THREAD_IDLE:      strcpy(StatusString," IDLE "); break;
2893    case SERVER_THREAD_COMPLETED: strcpy(StatusString,"CMPLTD"); break;
2894    case SERVER_THREAD_ERROR:     strcpy(StatusString," ERROR"); break;
2895    case SERVER_THREAD_WAITING:   strcpy(StatusString,"WTG4CL"); break;
2896
2897    default :
2898      {
2899        if(!memcmp(&(ServerThreadInfo[i]),&(ServerThreadInfoPrevious[i]),
2900                   sizeof(ServerThreadInfoStruct))
2901           )
2902        {
2903          /* We are busy and nothing has changed since the last check.
2904             We are not waiting for the client.
2905             I think the thread is hung!!!!
2906          */
2907          strcpy(StatusString," HUNG ");
2908          logit("et","ServerThread %d appears to be hung in state %d.\n",
2909                i,ServerThreadInfo[i].Status);
2910        }
2911        else
2912        {
2913          strcpy(StatusString," BUSY ");
2914        }
2915      }
2916    } /* End of Switch() */
2917
2918    logit("",
2919          "%2d  %6d         %6d       %6d      %6d    %s\n",i,
2920          ServerThreadInfo[i].ClientsAccepted,
2921          ServerThreadInfo[i].ClientsProcessed,
2922          ServerThreadInfo[i].RequestsProcessed,
2923          ServerThreadInfo[i].Errors,
2924          StatusString);
2925  }  /* end for (each server thread) loop */
2926
2927  /* Copy the current ServerThreadInfo to ServerThreadInfoPrevious. */
2928  memcpy(ServerThreadInfoPrevious,ServerThreadInfo,
2929         MaxServerThreads * sizeof(ServerThreadInfoStruct));
2930
2931  return(0);
2932}  /* ReportServerThreadStatus */
2933
2934TANK * FindSCNL(TANK * Tanks, int nTanks, char * sta, char * chan, char * net,
2935                char * loc)
2936{
2937  TANK TempTank;
2938
2939  /* Use CRTLIB func bsearch() to
2940     find SCN.  Davidk 10/5/98
2941  **********************************/
2942
2943  strcpy(TempTank.sta,sta);
2944  strcpy(TempTank.chan,chan);
2945  strcpy(TempTank.net,net);
2946  strcpy(TempTank.loc,loc);
2947
2948  return((TANK *) bsearch(&TempTank, Tanks, nTanks,
2949                          sizeof(TANK), CompareTankSCNLs));
2950}  /* End FindSCNL() */
2951
2952/**************************************************************
2953 *  signal_hdlr: sets the terminate flag for orderly shutdown *
2954 **************************************************************/
2955void signal_hdlr(int signum)
2956{
2957  terminate = 1;                 /* set flag to terminate program */
2958}
2959
2960
2961int wave_serverV_c_init()
2962{
2963  logit("t","wave_serverV.c:Version %s\n", WSV_VERSION);
2964  return(0);
2965}
2966
2967int IssueIOStatusError(char * szFunction, int iOffset,
2968                       int iRecordSize, char * szTankName)
2969{
2970  sprintf(Text, "%s failed for tank [%s], with offset[%d] "
2971          "and record size[%d]! errno[%d]\nError: %s\n",
2972          szFunction, szTankName, iOffset,
2973          iRecordSize, errno, strerror(errno));
2974
2975  wave_serverV_status( TypeError, ERR_FILE_IO, Text );
2976  return(0);
2977}
2978
2979int TraceBufIsValid(TANK * pTSPtr, TRACE2_HEADER * pCurrentRecord)
2980{
2981    /* check that time goes forward within the tracebuf */
2982    if(pCurrentRecord->starttime > pCurrentRecord->endtime) {
2983        if (Debug > 1)
2984            logit("e", "TraceBufIsValid: TB start time not before end time\n");
2985        return(FALSE);
2986    }
2987
2988    /* check that there are samples in the tracebuf */
2989    if(pCurrentRecord->nsamp <= 0) {
2990        if (Debug > 1)
2991            logit("e", "TraceBufIsValid: no samples\n");
2992        return(FALSE);
2993    }
2994    /* added check for datatype! */
2995    if (strcmp(pCurrentRecord->datatype, "i4") == 0 ||
2996        strcmp(pCurrentRecord->datatype, "i2") == 0 ||
2997        strcmp(pCurrentRecord->datatype, "s2") == 0 ||
2998        strcmp(pCurrentRecord->datatype, "s4") == 0 ) {
2999        /* do nothing here for now, its good */
3000    } else {
3001        logit("e", "TraceBufIsValid: bad datatype found in packet '%s'\n", pCurrentRecord->datatype);
3002        return(FALSE);
3003    }
3004
3005    /* if the tracebuf internals look OK, and this is the first tracebuf,
3006       then send it on it's merry way. */
3007    if ( pTSPtr->firstWrite == 1 )
3008        return(TRUE);
3009
3010    /* check that time goes forward between the previous tracebuf and this
3011       one */
3012
3013    if((pCurrentRecord->starttime < IndexYoungest(pTSPtr)->tEnd) && !bUsePacketSyncDb) {
3014        if (Debug > 1)
3015              logit("e", "TraceBufIsValid: packet time not advancing\n");
3016            return(FALSE);
3017     }
3018
3019    /* check samprate ???? */
3020
3021    /* it passed all the tests, so pass it on. */
3022    return(TRUE);
3023
3024}  /* end TraceBufIsValid() */
Note: See TracBrowser for help on using the repository browser.