source: trunk/src/archiving/wave_serverV/wave_serverV.c @ 3010

Revision 3010, 104.7 KB checked in by paulf, 12 years ago (diff)

minor change to add in version number as #define, and update ver so it is clear from the logs which is running

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1
2/*
3 *   THIS FILE IS UNDER RCS - DO NOT MODIFY UNLESS YOU HAVE
4 *   CHECKED IT OUT USING THE COMMAND CHECKOUT.
5 *
6 *    $Id$
7 *
8 *    Revision history:
9 *     $Log$
10 *     Revision 1.38  2007/05/31 16:11:44  paulf
11 *     minor change to add in version number as #define, and update ver so it is clear from the logs which is running
12 *
13 *     Revision 1.37  2007/05/29 13:42:37  paulf
14 *     patched some checks in checking data_type and also some memory alignment issues in calls to TraceBufIsValid()
15 *
16 *     Revision 1.36  2007/03/28 18:02:34  paulf
17 *     fixed flags for MACOSX and LINUX
18 *
19 *     Revision 1.35  2007/02/26 14:47:38  paulf
20 *     made sure time_t are casted to long for heartbeat sprintf()
21 *
22 *     Revision 1.34  2007/02/23 15:24:33  paulf
23 *     fixed long to time_t declaration
24 *
25 *     Revision 1.33  2007/02/19 20:45:38  stefan
26 *     tweak on last revision
27 *
28 *     Revision 1.32  2007/02/19 20:43:36  stefan
29 *     patch for gcc 3.3.6 else if defined structure per Matt.VanDeWerken@csiro.au
30 *
31 *     Revision 1.31  2006/10/23 21:20:29  paulf
32 *     updated for linux makes
33 *
34 *     Revision 1.30  2006/09/05 13:46:52  hal
35 *     added _LINUX ifdefs so that signals under linux are handled the same as under solaris
36 *
37 *     Revision 1.29  2006/07/20 16:18:29  stefan
38 *     ifdef fixes for Linux
39 *
40 *     Revision 1.28  2006/07/11 22:28:53  stefan
41 *     The problem was that the size of the tankfile is not an integer multiple
42 *     of the packet size and so occasionally there is a mess up at the end of
43 *     the file (mine were 3 in this example).  The result was that the data
44 *     requested from the server was not returned and an event file failed to
45 *     be written.
46 *     [Fix was to add an else to wave_serverV_config]
47 *     Richard R Luckett, BGS
48 *
49 *     Revision 1.27  2006/03/27 17:16:49  davek
50 *     Updated the wave_serverV version and timestamp.
51 *
52 *     Revision 1.26  2006/03/27 17:12:07  davek
53 *     Added code to check the return value from call to WaveMsg2MakeLocal(),
54 *     and reject packets for which WaveMsg2MakeLocal() reported an error.
55 *
56 *     Changed a >= to a > comparison in the packet filtering logic that was causing
57 *     wave_serverV to reject tracebufs that only contained 1 sample.
58 *
59 *     Revision 1.25  2005/07/21 21:03:41  friberg
60 *     added in one _LINUX ifdef directive for sin_addr struct
61 *
62 *     Revision 1.24  2005/04/21 22:48:46  davidk
63 *     Updated the version timestamp.  (It had not been properly updated in some time.)
64 *     Current version supports SCNL menu protocol adjustment.
65 *
66 *     Revision 1.23  2005/04/07 15:23:48  davidk
67 *     Separated the signal handling logic for Solaris(UNIX) and Windows, due
68 *     to problems using the UNIX logic in wave servers started from a
69 *     Windows Service.
70 *     Wave server should behave the same as before except when started from
71 *     a service.  Now the service based wave server should not shutdown when
72 *     users log in/out.
73 *     Added call to WIN32 API SetConsoleCtrlHandler() to setup a windows signal
74 *     handler to handle close messages, while using the default service logic to
75 *     ignore logoff messages.
76 *
77 *     Revision 1.22  2005/03/17 17:37:39  davidk
78 *     Changes to enforce a maximum tanksize of 1GB.
79 *     Added code to enforce 1GB limits on wave_server tanks.
80 *     If TruncateTanksTo1GBAndContinue is set in the config file,
81 *     then tanks listed in the config file at >= 1GB will be truncated to
82 *     just under 1GB, and wave server operation will continue normally.
83 *     If TruncateTanksTo1GBAndContinue is not set,
84 *     wave server will issue an Error and EXIT if it encounters a tank
85 *     >= 1GB in the config file.
86 *     This change was made because a bug was discovered that limits
87 *     the safe limit of a wave server tank file to 1GB.
88 *
89 *     Revision 1.21  2004/09/14 16:23:33  davidk
90 *     Fixed bug in main() function, where bAbortMainLoop flag was never
91 *     initialized.  Initialized bAbortMainLoop to FALSE.
92 *     Update wave_serverV.c timestamp.
93 *
94 *     Revision 1.20  2004/06/10 23:14:09  lombard
95 *     Fixed logging of invalid packets
96 *     Fixed compareSCNL function.
97 *
98 *     Revision 1.19  2004/06/09 21:35:58  lombard
99 *     Added code to validate a trace_buf packet before it gets into the tanks.
100 *     Added comments about GETPIN request not working correctly.
101 *
102 *     Revision 1.18  2004/05/18 22:30:19  lombard
103 *     Modified for location code
104 *
105 *     Revision 1.17  2003/04/15 18:48:27  dhanych
106 *     replaced commented changes with just comments
107 *
108 *     Revision 1.16  2003/04/14 19:13:58  dhanych
109 *     made changes to fix multi-home bug (removed gethomebyaddr)
110 *
111 *     Revision 1.15  2001/10/03 21:17:02  patton
112 *     Made logit changes due to new logit code.
113 *     Disk logging is now hardcoded to on
114 *     during config file reading, and set
115 *     to the user's choice afterwards.
116 *     JMP 10/3/2001.
117 *
118 *     Revision 1.14  2001/08/10 21:49:09  dietz
119 *     changed tport_getmsg to tport_copyfrom so we can distinguish between
120 *     sequence gaps and truly missed msgs in the transport ring.
121 *
122 *     Revision 1.13  2001/06/29 22:23:59  lucky
123 *     Implemented multi-level debug scheme where the user can specify how detailed
124 *     (and large) the log files should be. If no Debug is specified, only
125 *     errors are reported and logged.
126 *
127 *     Revision 1.12  2001/06/18 23:43:46  alex
128 *     *** empty log message ***
129 *
130 *     Revision 1.11  2001/05/08 00:09:47  dietz
131 *     Changed to shut down gracefully if the transport flag is
132 *     set to TERMINATE or MyPid.
133 *
134 *     Revision 1.10  2001/01/24 00:01:06  dietz
135 *     *** empty log message ***
136 *
137 *     Revision 1.9  2001/01/20 02:26:39  davidk
138 *     resolved a bug where multiple confusing timestamps were issued for wave_serverV.c
139 *
140 *     Revision 1.8  2001/01/18 02:26:59  davidk
141 *     Added ability to issue status messages from server_thread.c,
142 *     which entailed the following changes:
143 *     1. Changed status message constants and vars from static to
144 *     full globals so that status messages could be issued from
145 *     other files.
146 *     2. Moved #define constants for status message types to wave_serverV.h
147 *     so that they can be used by all .c files.
148 *     3. Moved wave_serverV_status() prototype to wave_serverV.h so
149 *     that other .c files can issue status messages.
150 *
151 *     Changed the timestamp.
152 *
153 *     Revision 1.6  2001/01/08 22:06:07  davidk
154 *     Modified portion of wave_server that handles client requests,
155 *     so that it replies to client when an error occurs while handling
156 *     the request.(Before, no response was given to the client when an
157 *     error occurred.)  Added flags FC,FN, and FB.  Moved the wave_server
158 *     version logging into a separate function, such as the one used in
159 *     serve_trace.c.
160 *
161 *     Revision 1.5  2000/07/24 18:46:58  lucky
162 *     Implemented global limits to module, installation, ring, and message type strings.
163 *
164 *     Revision 1.4  2000/07/08 19:01:07  lombard
165 *     Numerous bug fixes from Chris Wood
166 *
167 *     Revision 1.3  2000/07/08 16:56:21  lombard
168 *     clean up logging for server thread status
169 *
170 *     Revision 1.2  2000/06/28 23:45:27  lombard
171 *     added signal handler for graceful shutdowns; numerous bug fixes
172 *     See README.changes for complete list
173 *
174 *     Revision 1.1  2000/02/14 19:58:27  lucky
175 *     Initial revision
176 *
177 *
178 */
179
180  /*
181   wave_serverV.c:
182
183*****************************************************************************
184   Warning:  This file has been victim to multiple Global replace executions,
185   changing "wave_serverIII" to "wave_serverIV" and then to "wave_serverV".
186   As a result some of the documentation will read a little funny, such as:
187   "Wave_serverV represents an upgrade to the Wave_serverV code with
188   the following mods:".  The sentence originally looked like "Wave_serverIV
189   represents an upgrade to the Wave_serverIII code with the following mods:"
190*****************************************************************************
191
192MOTIVATION
193wave_serverV is being written for the following reason:
194 Provide a means of saving indexes and tank structures to disk, such that
195 recovery from a crash becomes a simple, quick, turnkey experience, while
196 keeping the wave_server performance close to that of wave_serverIV without
197 index updates.  Same low performance price, now with improved crash recovery
198 and racing stripes.  DK
199
200
201MOTIVATION:
202wave_serverV is being written for the following reasons:
203* Large storage space per trace: We have to store several days worth of stuff
204  from 600+ stations.
205* Concurrently serve a number of clients without ever overloading the system.
206* Support the new trace format, including variable sample size, rate, and
207  variable message length.
208* Support interrupted data, either short telemetry dropouts or segmented data
209  with weeks between segments.
210* Be able to survive being turned on and off: don't lose the data you had,
211  and become operational quickly after coming up. (Older wave servers took a
212  long time reading their tank files on restart)
213* Have an extensible protocol for future clients.
214
215HISTORY:
216The original wave server was written by Will in a rather spectacularly short
217time: We came in on a Monday to find Will comatose, all waste cans full of
218espresso cups, and a working wave server. The motivation was to support
219Alaska's interest in writing trace data into DataScope, and to provide a
220playback facility for testing real-time algorithms.
221
222Lynn then proceeded to enhance the thing a number of ways. Kent produced a
223variation, introducing the idea of segmenting the tank into one partition for
224each trace and feeding it demuxed data. Things then diverged, based on some
225misunderstanding of what code from Alaska could be integrated into the
226Earthworm release. This is an effort to meet Menlo Park requirements, meet
227Tsunami needs, and bring things back together. Several authors were involved
228in wave_serverV: Alex wrote the main thread (wave_serverV.c); Mac McKenzie
229wrote the parser of the client thread (server_thread.c). Eureka Young wrote
230the code to send trace data (serve_trace.c).
231
232LEFT TO DO:
2331. Separate thread to pull trace data messages from public ring. (done)
2342. Compute oldest time in tank rather than reading it.
2353. Index compaction when out of entries
2364. Security scheme to limit access to blessed clients.
2375. Maybe compute actual digitization rate rather than nominal.
238
239PROGRAM STRUCTURE:
240
241Based on wave_server.c. It deals only with TYPE_TRACEBUF2 messages. It operates
242a disk tank for each pin it's told to serve. Each tank is a circular disk
243file, an associated TANK structure, and an index (list in memory).  The idea
244is that wave_serverV can deal reasonably with fragmentary data
245"CHUNKs". Chunks are arbitrarily long sequences of TYPE_TRACEBUF2 messages
246without breaks (that is, there are no time gaps longer than 1.5 times the
247stated sampling period). Such chunks can be separated by any amount of
248time. Thus, telemetry interruptions as well as triggered event data can be
249handled.
250
251        The tank file is thought of as being divided into 'records' of
252specified length (from the parameter file).  Each write of a TYPE_TRACEBUF2
253message is placed into such a 'record'. The number of records in the tank is
254computed from the parameters in the config file.
255
256        The TANK structure holds various pointers describing the tank file and
257the associated index. This lives in memory, and is the 'living' set of
258pointers for the tank. There is one TANK structure for each tank maintained.
259Notably, included here is the current value of the insertion point into the
260tank file, and a pointer to the start of the index.
261
262        The index is a memory-based list of CHUNK descriptors. At startup, an
263array of such descriptors is allocated and initialized. The size of this array
264is read from the configuration file. Each such descriptor contains the chunk
265start time, end time, and offset into the tank file where the beginning of the
266chunk lives; (and of course, a pointer to the next element of the list){This
267is now inherited from the array form, instead of being separately maintained.
268
269
270The scheme (not implemented), is to perform index squeezing: If the number of
271data chunks should exceed the available chunk descriptors, the smallest chunks
272will be removed from the index, and thus be no longer available to
273servers. The motivation is to survive observed telemetry drop-outs with DST's,
274where a large number of very small chunks are produced for some period of
275time. The motivation is to preserve older, long chunks, and sacrifice what is
276probably useless data. Note that it is possible to recover such de-indexed
277chunks by brute force reading of the tank file.  When wave_serverV shuts down,
278either by request or due to error, it writes the TANK structure and index
279array to a companion file to the tank. The name of this companion file is
280created by appending an extension (currently ".inx") to the tank file name. On
281startup, wave_serverV reads its configuration file, which tells it what tanks
282to operate. It then sees if the specified tank files exist already. If so, an
283attempt is made to open the associated companion file, and to read the TANK
284structure and index from there. A check is made to verify that the TANK
285structure from disk is consistent with what was read from the configuration
286file. wave_serverV exits with horror if there is a mismatch.
287
288Wave_serverV Improvements (over wave_serverIV)
289With wave_serverV, the indexes are periodically written to disk at a frequency
290determined by the config file.  The TANK structures, which include the
291insertion point for each tank file, are also saved to disk periodically, at a
292frequency determined by another param in the config file.  If a crash occurs,
293then on restart, wave_server attempts to rebuild the indexes using the
294insertion point stored in the TANK structures file, and the indexes, stored in
295the index files, for each tank.  On startup, the Tank file is considered to be
296only as up to date as the insertion point that is stored on disk.  Therefore,
297the insertion point should be saved often.  The index files on disk are not
298required to rebuild indexes, but large tanks can require long index rebuild
299times, if they must be built from scratch.
300
301wave_serverV also introduces the use of redundant index and tank structures
302files.  The redundant files are used to prevent file corruption errors from
303disabling the crash recovery process.  The theory behind the redundant files
304is that if a system suddenly crashes due to a power outage, or other problem,
305and a vital file is being written, then that file could become corrupt, and
306would be unusable.  By using alternating redundant files, the recovery files
307are better protected, because only one file can be written to at a time, and
308thus only one could be corrupted by an interruption during writing.
309
310
311THREAD STRUCTURE
312
313        Wave_serverV consists of four fulltime threads: The main thread reads
314the configuration file, sets things up, and starts a thread which picks up
315messages of interest and stuffs them into a memory-based queue. It then drops
316into a working loop which picks up trace data messages from the queue and
317writes them into the appropriate tank. This thread also maintains the index
318lists and keeps the variables in the TANK structure up to date. It also starts
319a server manager thread which listens for connect requests over the
320network. When a connect request from some client is detected, this thread
321starts a copy of the server thread, and gives it the socket descriptor to the
322client. This server thread is responsible for getting client requests and
323providing the replies. This server thread quits if the client breaks the
324socket connection, or if some horrible error is detected. Otherwise the server
325thread keeps dealing with the client indefinitely. The server manager thread
326will start up to some fixed number (coded-in constant) of server threads to
327prevent overloading the machine.  The final thread is the index manager.
328Started by main, it intermittently writes the contents of the index lists and
329tank structures to disk, to enable recovery from a crash.
330
331THREAD CONCURRENCY AND TANK FILE IO:
332
333        The tank file is used by both the main writing thread and the reading
334server threads. Interlock is as follows: There is one mutex for each tank. The
335main writing thread takes it on itself to interlock each tank file operation
336and each index operation with a mutex, save the original offset of the tank
337file, do the io operation, restore the offset and release the mutex. The
338server threads must wait for, hold, and release the mutex whenever they look at
339the tank file or the index list.  The index manager thread must also deal with
340mutexes when interacting with tank data.
341
342PROTOCOL:
343Notes:
344        * The expectation is that additional protocols will be added as
345          needed. Specifically, a 'regurgitate raw' should be defined soon to
346          provide a play-back capability.
347
348        * A value of 'nada' for fill-value indicates no fill which is not
349          supported in the first version. We don't even know how we'd handle
350          that...
351
352        * <s><c><n><l> is short-hand for site code, channel code, network and
353          location id.
354
355        * <flags> :: F | F<letter> ... <letter>
356
357    Currently Supported Flags:
358
359    R  All requested data is right of the tank (after the tank).
360    L  All requested data is left of the tank (before the tank).
361    G  All requested data is in a gap in the tank.
362    B  The Client's request was bad.  It contained incorrect syntax.
363    C  The tank from which the client requested data is corrupt.
364       (Data may or may not be available for other time intervals
365        in the tank.  WARNING!  This flag indicates that the tank
366        is corrupt, this means data from other time intervals may
367        be inaccurate)
368    N  The requested channel(tank) was not found in this wave_server.
369    U  An unknown error occurred.
370    DK 010501  Added additional flags: FB, FC, FN, FU
371
372        * <datatype> is two character code ala CSS.  Initially, we will
373          support i2, i4, s2, and s4. i for Intel; s for Sparc. 2 meaning
374          two-bytes per integer, 4 ...
375
376        * Replies are ascii, including trace data.
377
378        * the <request id> was added later to permit the wave viewer to keep
379          track of which reply belongs to which query. The idea is that the
380          client sends this as part of its request. To us it's an arbitrary
381          ascii string.  We simply echo this string as the first thing in our
382          reply. We don't care what it is. The motivation is that the
383          wave viewer would occasionally get confused as to which reply belonged
384          to which of its requests, and it would sit there, listening for a
385          reply which never came.
386
387                                *** NOTE NOTE NOTE ***
388          The request id is echoed as a fixed length 12 character string, null
389          padded on the right if required.
390
391
392Description of Requests and Responses
393
394MENU: <request id>
395        returns one line for each tank it has:
396        <request id>
397        pin#  <s><c><n><l> <starttime> <endtime>  <datatype>
398          .      .       .         .          .
399        pin#  <s><c><n><l> <starttime> <endtime>  <datatype>
400        \n
401
402MENUPIN: <request id>  <pin#>
403        returns as above, but only for specified pin number:
404        <request id> <pin#>  <s><c><n><l>  <starttime>  <endtime>  <datatype> <\n>
405
406MENUSCN: <request id>  <s><c><n><l>
407        returns as above, but for specified <s><c><n><l> name:
408        <request id> <pin#>  <s><c><n><l>  <starttime>  <endtime>  <datatype> <\n>
409
410GETPIN:  <request id> <pin#> <starttime>  <endtime> <fill-value>
411        returns trace data for specified pin and time interval. Gaps filled
412         with <fill-value>.  <request id> <pin#> <s><c><n><l> F <datatype>
413         <starttime> <sampling rate> sample(1) sample(2)... sample(nsamples)
414         <\n> {the samples are ascii}
415
416        If the requested time is older than anything in the tank, the reply
417        is: <request id> <pin#> <s><c><n><l> FL <datatype> <oldest time in tank>
418        <sampling rate> \n for the case when the requested interval is younger
419        than anything in the tank, the reply is <request id> <pin#> <s><c><n><l>
420        FR <datatype> <youngest time in tank> <sampling rate> \n
421
422        NOTE: the GETPIN request has never worked in wave_serverV. As pin
423        numbers fade into the distance, it is unlikely that this request will
424        ever be supported.
425
426GETSCN: <request id> <s><c><n><l> <starttime>  <endtime> <fill-value>
427        returns as above, but for specified scn name.
428        <request id> <pin#> <s><c><n><l> F <datatype> <starttime> <sampling-rate >
429                 sample(1) sample(2)... sample(nsamples) <\n>
430
431
432GETSCNRAW: <request id> <s><c><n><l> <starttime>  <endtime>
433        returns trace data in the original binary form in which it was put
434        into the tank. Whole messages will be supplied, so that the actual
435        starttime may be older than requested, and the end time may be younger
436        than requested. The reply is part ascii, terminated by a "\n",
437        followed by binary messages:
438
439        <request id> <pin#> <s><c><n><l> F <datatype> <starttime> <endtime>
440        <bytes of binary data to follow> \n The line above is all in
441        ascii. All below is binary, byte order as found in the tank:
442        <trace_buf msg> ... <trace_buf msg>
443
444        If the requested time is older than anything in the tank, the reply is:
445         <request id> <pin#> <s><c><n><l> FL <datatype> <oldest time in tank> \n
446
447        For the case when the requested interval is younger than anything in
448        the tank, the reply is:
449        <request id> <pin#> <s><c><n><l> FR <datatype> <youngest time in tank> \n
450
451        For the case when the requested interval falls completely in a gap,
452        the reply is:
453         <request id> <pin#> <s><c><n><l> FG <datatype> \n
454
455*/
456
457
458#include <stdio.h>
459#include <stdlib.h>
460#include <math.h>
461#include <signal.h>
462#include <string.h>
463#include <time.h>
464#include <errno.h>
465#include <data_buf.h>
466#include <trace_buf.h>
467#include <earthworm.h>
468#include <kom.h>
469#include <swap.h>
470#include <transport.h>
471#include <mem_circ_queue.h>
472#include "wave_serverV.h"
473
474/* Function Prototypes
475 *********************/
476void    wave_serverV_config( char * );
477void    wave_serverV_lookup( void );
478void    LogTank( TANK* );
479int     SocketInit( int, char * );
480thr_ret SocketServe( void * );
481double  get_starttime( char *, unsigned char );
482int     ReportServerThreadStatus(void);
483void    signal_hdlr(int signum);
484int     TraceBufIsValid(TANK * pTSPtr, TRACE2_HEADER * pCurrentRecord);
485
486#define IPADDRLEN 20
487#define FILENAMELEN 80
488
489/* Global socket things
490***********************/
491int            PassiveSocket;     /* Socket descriptor; passive socket     */
492static char    ServerIPAdr[IPADDRLEN];  /* IP address of wave_server machine */
493static int     ServerPort;        /* Server port for requests & replies    */
494
495/* Pointers to things and actual allocations
496********************************************/
497TANK *  Tanks=0;                /* Pointer to the array of configured tank descriptors */
498MSG_LOGO        ServeLogo[MAX_TANKS];   /* worst possible case is each pin from a different logo */
499mutex_t         QueueMutex;             /* mutex handles. We suspect they're long integers. */
500                                                /* The last one is for the input queue */
501int             NLogos;                         /* number of distinct logos to deal with */
502
503/* Parameters to be read from a configuration file (defaults given)
504*******************************************************************/
505static char          RingName[MAX_RING_STR];         /* name of transport ring for i/o    */
506static long          RingKey;              /* key to transport ring to read from */
507static unsigned char MyModId;              /* wave_server's module id            */
508static int           LogSwitch;            /* Log-to-disk switch                 */
509double               GapThresh;            /* Theshhold factor for gap declaration */
510static int           IndexUpdate;            /* should we write the index after each tank write. */
511int                  SocketTimeoutLength;  /* Length of Timeouts on SOCKET_ew calls */
512int                  SOCKET_ewDebug=0;     /* Set to 1 for socket debugging */
513int                  ClientTimeout = -1;   /* Time to wait for any activity
514                                              from client; -1: wait forever */
515static int           TankStructUpdate;     /* Seconds between TS disk updates */
516static int           IndexUpdate;          /* Seconds between index disk updates */
517int                  PleaseContinue=0;     /* Indicates whether to continue or not after a tank has failed. */
518int                  ReCreateBadTanks=0;   /* Indicates whether a tank should be born-again from scratch
519                                              if it is found to be bad during initialization. */
520int                  SecondsBetweenQueueErrorReports;
521                                           /* Indicates the minimum amount of time between error reports
522                                              to statmgr, related to internal message queue lapping. */
523int                  MaxServerThreads=10; /* largest number of  server threads we'll start up */
524int                  bAbortOnSingleTankFailure=TRUE;
525                                          /* set to true in config file if you wish wave_server
526                                             to continue if there is an I/O error on a single
527                                             tank file. DK 01/08/01      */
528int                  bTruncateTanksTo1GBAndContinue=FALSE;
529                                          /* set to true in config file if you wish wave_server
530                                             to truncate tanks that are listed in the config file at
531                                             over 1GB, down to 1GB and continue.  Default behavior
532                                             is to complain and exit if a >1GB tank is given.DK 2005/03/17  */
533
534
535/*  I am not sure if SocketTimeoutLength should be volatile.  It should only
536    be modified by the main thread, but it is read in other threads.  I am
537    leaving it as a normal global int, but based on the problems that Alex
538    had, I would be cautious of it.  DavidK
539*/
540
541
542/* Server thread stuff
543**********************/
544#define THREAD_STACK  8096
545static unsigned int *  tidSockSrv;       /* id of socket-serving threads */
546static unsigned int   tidServerMgr;                     /* id of dispatcher of socket-serving threads */
547ServerThreadInfoStruct * ServerThreadInfo;
548ServerThreadInfoStruct * ServerThreadInfoPrevious;
549
550thr_ret ServerThread( void*  );
551thr_ret ServerMgr( void*  );
552
553
554/* Message stacker thread stuff
555*******************************/
556thr_ret MessageStacker( void * );
557int     MessageStackerStatus;                           /* as above, but for message stacking thread */
558static unsigned int   tidStacker;
559static int     InputQueueLen;   /* max messages in input buffer */
560QUEUE OutQueue;                 /* from queue.h, queue.c; sets up linked */
561                                /*   list via malloc and free */
562static int QueueHighWaterMark,QueueLowWaterMark;  /* For polling queue depth */
563
564
565/* IndexMgr thread stuff
566************************/
567thr_ret IndexMgr( void *);
568static unsigned int tidIndexMgr;
569static mutex_t IndexTerminationMutex;
570int terminate=0;   /* Changed to global to server threads: PNL 6/20/00 */
571static int QueueReportInterval=30;
572/* Heartbeat Stuff
573******************/
574time_t        timeLastBeat;             /* system time of last heartbeat sent */
575static long   HeartBeatInt;             /* seconds between heartbeats */
576
577/* Look up from earthworm.h
578 **************************/
579unsigned char InstId;         /* local installation id      */
580unsigned char TypeHeartBeat;
581unsigned char TypeError;
582unsigned char TypeWaveform;
583
584/* Other globals
585 ***************/
586static SHM_INFO       Region;       /* Info structure for shared memory */
587volatile        int   nTanks;       /* number of tanks in operation     */
588volatile        int   Debug =0;     /* Can be set to one by an optional configuration file command */
589static volatile int   MaxMsgSiz =0; /* the largest message expected */
590pid_t                 myPid;        /* for restarts by startstop */
591
592/**************************/
593/*  Global TANK and TANKList Variables */
594TANKList * pConfigTankList;
595TANKList * pTANKList;
596TANK * pTSPtr;
597TANK * pTSTemp;
598/**************************/
599
600
601static char  Text[250];        /* string for log/error messages          */
602
603int serve_trace_c_init();  /* used by serve_trace.c to report it's
604                              internal-internal-internal-version.
605                              Added by davidk, due to the number of
606                              serve trace.c versions being put out.
607                           */
608
609int index_util_c_init();   /* same as serve_trace_c_init(), except for
610                              index_util.c.
611                           */
612
613int wave_serverV_c_init();   /* same as serve_trace_c_init(), except for
614                              wave_serverV.c.
615                           */
616int server_thread_c_init();   /* same as serve_trace_c_init(), except for
617                              server_thread.c.
618                           */
619
620/* Put this here to avoid having to ifdef a prototype. */
621#ifdef _WINNT
622BOOL CtrlHandler( DWORD fdwCtrlType )
623{
624  switch( fdwCtrlType )
625  {
626    // Handle the CTRL-C signal.
627    case CTRL_C_EVENT:
628    case CTRL_CLOSE_EVENT:
629    case CTRL_BREAK_EVENT:
630      signal_hdlr(0);
631      return(TRUE);
632
633    default:
634      return FALSE;
635  }
636}
637#endif /* _WINNT */
638
639
640#define WSV_VERSION "5.1.37 - 2007-05-31"
641
642main( int argc, char **argv )
643{
644  time_t        timeNow;        /* current system time                      */
645  int           result;
646  int           i,j;
647  char*              deQMsg;         /* for picking messages from the memory queue   */
648  char*         inMsg;          /* naked message from queue after remvoing "t"  */
649  MSG_LOGO      logo;           /* of the incoming message                      */
650  char*         scrMsg;         /* scratch pad: like figuring out who's nexting */
651  long          msgInLen;       /* length of retrieved message                  */
652  int           t;              /* running index over tanks.                    */
653  int           grabit;         /* silly little flag for extracting unique logos */
654  unsigned int  tmpOffset;
655  time_t        CurrentTime;
656  int           bAbortMainLoop;  /* flag to abort the main loop */
657  int           flag;            /* transport ring flag value */
658  int           rc;
659
660  /* Variables to handle recording of File I/O errors */
661  int iFileOffset, iFileBlockSize;
662  char * szFileTankName;
663  char szFileFunction[15];
664  int  bIssueIOStatusError = FALSE;
665
666
667  TRACE2_HEADER *trh;   /* paulf addition 1/07/1999 mem alignment bug fix*/
668
669  TracePacket   Tpkt;   /* paulf addition 5/24/2007 another mem alignment fix!
670                                        ironic no? */
671  /* Catch broken socket signals
672   ******************************/
673#if defined(_SOLARIS) || defined(_LINUX) || defined(_MACOSX)
674  (void)sigignore(SIGPIPE);
675#endif
676
677  /* Initialize name of log-file & open it
678     force disk logging until after config
679     file is read.  JMP 10/3/2001
680    ***************************************/
681  logit_init( argv[1], 0, 256, 1 );
682
683  /* Not enough arguments
684   ********************/
685  if ( argc != 2 )
686  {
687    fprintf( stderr, "Usage: wave_serverV <configfile>\n" );
688    exit( -1 );
689  }
690
691  /* debug: set up periodic dump logic
692   ************************************/
693
694
695  /*
696     wave_server_V_config() needs to set the following variables
697     pConfigTankList: a pointer to a list of tank structs created
698     from the config file, including a pointer to the TANK
699     file for the waveserver.
700     PleaseContinue: a flag indicating weather waveserver should
701     go down in flames if a single tank fails, or whether it
702     should please continue until an error occurs that renders
703     all tanks useless.
704  ***********************/
705  wave_serverV_config( argv[1] );
706  wave_serverV_lookup( );
707
708  /* Reset logging to desired level
709   ***************************************/
710  logit_init( argv[1], 0, 256, LogSwitch );
711  if (Debug > 1)
712    logit( "" , "NOTE: This is the post-v5.2 patch to not write the empty tank messages\n");
713
714  logit( "" , "wave_serverV: Read command file <%s>\n", argv[1] );
715
716  /* Give serve_trace.c a chance to init.
717     this is probably just a version stamp, but
718     who knows?  Also index_util.c
719   *********************************************/
720  wave_serverV_c_init();
721  serve_trace_c_init();
722  index_util_c_init();
723  server_thread_c_init();
724
725
726/* Get process ID for heartbeat messages */
727  myPid = getpid();
728  if( myPid == -1 )
729  {
730    logit("e","wave_serverV: Cannot get pid. Exiting.\n");
731    exit (-1);
732  }
733
734   /* Attach to transport ring (Must already exist)
735   *********************************************/
736  tport_attach( &Region, RingKey );
737
738  /* Right now, everything we have is in pConfigTankList, it contains all
739   * tank info and filenames from the config file, but no valid file pointers..
740   */
741     /* Start of new Initialization */
742  if((pTANKList=malloc(sizeof(TANKList))) == NULL)
743  {
744    logit("et","wave_serverV:  malloc() failed for tank list.  Exiting\n");
745    exit(-1);
746  }
747
748  /* Copy the header information from the ConfigTankList to the Tank Structure
749     file list.
750   ***********************/
751  *pTANKList=*pConfigTankList;
752
753  /* Retrieve the TANK structs from disk */
754  if(GetLatestTankStructures(pTANKList) == -1)
755  {
756    logit("et","Failed in call to GetLatestTankStructures().  Exiting\n");
757    exit( -1 );
758  }
759
760  /* We now have pTANKList, which is a list of TANK structures
761     retrieved from the TANK file(s), and pConfigTankList, a list of TANK
762     structures retrieved from the config file.  We need to merge the two.
763     Use the tanks in pTANKList to configure the tanks in the pConfigTankList
764  */
765
766  time(&CurrentTime);
767  /* For each tank in the list retrieved from disk */
768  for(pTSPtr=pTANKList->pFirstTS;
769      pTSPtr != pTANKList->pFirstTS + (pTANKList->NumOfTS);
770      /*Not EOL*/
771      pTSPtr= pTSPtr+1  /* move to next struct */)
772  {
773
774    /* Configure the tank if it's in the config list, with the info from the
775       tank struct file.  ConfigTANK() returns a ptr to the just configured
776       tank in the Config Tank List.
777    */
778    if( (pTSTemp=ConfigTANK(pTSPtr,pConfigTankList)) != (TANK *) NULL )
779    {
780      /* Tank is in the list, so open it, and Index files, update the
781         living index, and write a copy of the living index to file.
782      */
783
784      if(OpenTankFile(pTSTemp) < 0)
785      {
786        if(!ReCreateBadTanks) MarkTankAsBad(pTSTemp); else pTSTemp->isConfigured=0;
787        continue;
788      }
789      if(OpenIndexFile(pTSTemp,pConfigTankList->redundantIndexFiles,0) < 0)
790      {
791        if(!ReCreateBadTanks) MarkTankAsBad(pTSTemp); else pTSTemp->isConfigured=0;
792        continue;
793      }
794      if(BuildLIndex(pTSTemp) < 0)
795      {
796        if(!ReCreateBadTanks) MarkTankAsBad(pTSTemp); else pTSTemp->isConfigured=0;
797        continue;
798      }
799      if(WriteLIndex(pTSTemp,0,CurrentTime) < 0)
800      {
801        if(!ReCreateBadTanks) MarkTankAsBad(pTSTemp); else pTSTemp->isConfigured=0;
802        continue;
803      }
804    }
805    else
806    {
807      logit("t","Tank %s,%s,%s,%s found in %s, but not listed in config file\n",
808            pTSPtr->sta,pTSPtr->chan,pTSPtr->net,pTSPtr->loc,
809            GetRedundantFileName(pTANKList->pTSFile));
810    }
811  }  /* End for each tank from Tank structures file */
812
813  /* We don't need pTANKList anymore, since we've pilfered anything useful
814     in it while configuring pConfigTankList, our new hero */
815  if(pTANKList->pFirstTS)
816  {
817    free(pTANKList->pFirstTS);
818  }
819  free(pTANKList);
820
821  /* for each tank in the list read from the config file */
822  for(pTSPtr=pConfigTankList->pFirstTS;
823      pTSPtr != (pConfigTankList->pFirstTS) + (pConfigTankList->NumOfTS);/*!EOL*/
824      pTSPtr= pTSPtr+1  /* move to next struct */)
825  {
826    if(!pTSPtr->isConfigured)
827      /* Tank is marked as configured during BuildLIndex() */
828    {
829      if(CreateTankFile(pTSPtr) < 0)
830      {
831        MarkTankAsBad(pTSPtr);
832        continue;
833      }
834      if(OpenIndexFile(pTSPtr,pConfigTankList->redundantIndexFiles,1) < 0)
835      {
836        MarkTankAsBad(pTSPtr);
837        continue;
838      }
839      if(BuildLIndex(pTSPtr) < 0)
840      {
841        MarkTankAsBad(pTSPtr);
842        continue;
843      }
844      if(WriteLIndex(pTSPtr,0,CurrentTime) < 0)
845      {
846        MarkTankAsBad(pTSPtr);
847        continue;
848      }
849      pTSPtr->isConfigured = 1;
850    }  /* End if !Tank Already Configured (for better or worse) */
851  }  /* End for tank in configTankList */
852
853  /* Throw out the bad tanks, so that we are not carrying excess baggage.
854     If PleaseContinue is not set, and a bad tank is found,
855     RemoveBadTanksFromList() will exit() wave_server
856  */
857  if(RemoveBadTanksFromList(pConfigTankList))
858  {
859    logit("et","wave_server: RemoveBadTanksFromList() failed.  Exiting\n");
860    exit(-1);
861  }
862
863  /* Set the nTanks and Tank variables used throughout the program */
864  nTanks=pConfigTankList->NumOfTS;
865  Tanks=pConfigTankList->pFirstTS;
866
867  /* All tanks now configured
868     List of tanks is in pConfigTankList
869   **********************************/
870
871  /* Sort tanks using CRTLIB func qsort().
872     Davidk 10/5/98
873   **********************************/
874  qsort(Tanks,nTanks,sizeof(TANK), CompareTankSCNLs);
875
876  /* Turn Socket level debugging On/Off
877   **********************************/
878  setSocket_ewDebug(SOCKET_ewDebug);
879
880  /* Create one mutex for each tank
881    ********************************/
882  for(i=0;i<nTanks;i++)
883  {
884    CreateSpecificMutex(&(Tanks[i].mutex));
885  }
886
887  /* Create a mutex fot IndexMgr Termination */
888  CreateSpecificMutex(&(IndexTerminationMutex));
889
890
891  /* Allocate message buffers
892  **************************/
893  for (i=0;i<nTanks;i++)       /* find the largest record size in any tank */
894    if(Tanks[i].recSize > MaxMsgSiz) MaxMsgSiz=Tanks[i].recSize;
895
896  if(Debug > 1)
897       logit("","MaxMsgSiz: %d\n",MaxMsgSiz);
898
899  deQMsg = (char*)malloc( MaxMsgSiz+sizeof(int) );
900  if ( deQMsg == (char *) NULL )
901  {
902    logit( "e","wave_serverV: Error allocating inMsg buffer; exiting!\n" );
903    exit( -1 );
904  }
905  inMsg = deQMsg + sizeof(int);   /* recall that the tank number is pasted on the front of the
906                                     queued message by the stacker thread */
907  scrMsg = (char*)malloc( MaxMsgSiz );
908  if ( scrMsg == (char *) NULL )
909  {
910    logit( "e","wave_serverV: Error allocating scrMsg buffer; exiting!\n" );
911    exit( -1 );
912  }
913
914  /* Create a mutex for the queue
915  ******************************/
916  CreateSpecificMutex( &QueueMutex );
917
918  /* Initialize the message queue
919  *******************************/
920  initqueue ( &OutQueue, (unsigned long)InputQueueLen,(unsigned long)(MaxMsgSiz+sizeof(int)) );
921
922  /* Load array of logos to get
923    ****************************/
924  /* the deal here is to construct an array of distinct logos to be gotten. The Tank structures
925     contain logos for each tank, but many of them are likely the same. No point loading down
926     tport with all that */
927  for (i=0;i<nTanks;i++)  ServeLogo[i].instid = 0; /* clear  array */
928  ServeLogo[0]=Tanks[0].logo; /* hand load the first logo. */
929  NLogos=1;  /* Number of entries in ServeLogo */
930
931  for (i=0;i<nTanks;i++)
932  {
933    grabit=1;
934    for(j=0; j<NLogos; j++) /* look at all the logos we have so far */
935    {
936      if ( Tanks[i].logo.instid == ServeLogo[j].instid &&
937           Tanks[i].logo.mod    == ServeLogo[j].mod    &&
938           Tanks[i].logo.type   == ServeLogo[j].type     )  /* then it's old hat */
939      {
940        grabit=0;
941      }
942    }
943    if( grabit == 1)
944    {
945      ServeLogo[NLogos++]=Tanks[i].logo;
946      printf("ServeLogo: %d %d %d \n",ServeLogo[NLogos-1].instid,
947             ServeLogo[NLogos-1].mod,
948             ServeLogo[NLogos-1].type);
949    }
950  }  /* for < NLogos */
951  if(Debug > 1)
952  {
953    logit("","Acquiring: %d logos:\n",NLogos);
954    for(i=0;i<NLogos;i++)
955      logit("","Logo:  %d %d %d \n",ServeLogo[i].instid,ServeLogo[i].mod,ServeLogo[i].type);
956  }
957
958  /* Set up the signal handler so we can shut down gracefully */
959#if defined(_SOLARIS) || defined(_LINUX) || defined(_MACOSX)
960  signal(SIGINT, signal_hdlr);     /* <Ctrl-C> interrupt */
961  signal(SIGTERM, signal_hdlr);    /* program termination request */
962 #ifdef SIGBREAK
963  signal(SIGBREAK, signal_hdlr);   /* keyboard break */
964 #endif  /* SIGBREAK */
965  signal(SIGABRT, signal_hdlr);    /* abnormal termination */
966#else
967#ifdef _WINNT
968  SetConsoleCtrlHandler( (PHANDLER_ROUTINE) CtrlHandler, TRUE ); 
969#endif _WINNT
970#endif _SOLARIS
971
972  /* Create a server thread info array before starting any threads.  The info array
973     information is set in the serverthread and servermgr, but is read by the
974     indexmgr thread.  */
975  ServerThreadInfo=(ServerThreadInfoStruct *)
976    malloc(MaxServerThreads * sizeof(ServerThreadInfoStruct));
977  memset(ServerThreadInfo,0,MaxServerThreads * sizeof(ServerThreadInfoStruct));
978  /* Create a previous array for comparisons done by ReportServerThreadStatus() */
979  ServerThreadInfoPrevious=(ServerThreadInfoStruct *)
980    malloc(MaxServerThreads * sizeof(ServerThreadInfoStruct));
981  memset(ServerThreadInfoPrevious,0,MaxServerThreads * sizeof(ServerThreadInfoStruct));
982
983
984  /* Start the Server Manager Thread */
985  /***********************************/
986  if (StartThread(ServerMgr, (unsigned)THREAD_STACK, &tidServerMgr) == -1)
987  {
988    logit("e", "wave_serverV: error starting ServerMgr. Exiting.\n");
989    exit(-1);
990  }
991
992
993  /* Start the Index Manager Thread */
994  /***********************************/
995  if (StartThread(IndexMgr, (unsigned)THREAD_STACK, &tidIndexMgr) == -1)
996  {
997    logit("e", "wave_serverV: error starting IndexMgr. Exiting.\n");
998    exit(-1);
999  }
1000
1001
1002  /* Start the message stacking Thread */
1003  /*************************************/
1004
1005  MessageStackerStatus =-1; /* it should announce life to us */
1006  if (StartThread(MessageStacker, (unsigned)THREAD_STACK, &tidStacker) == -1)
1007  {
1008    logit("e", "wave_serverV: error starting MessageStacker. Exiting.\n");
1009    exit(-1);
1010  }
1011
1012  /* Initialize bAbortMainLoop flag to FALSE */
1013  /*******************************************/
1014  bAbortMainLoop = FALSE;
1015
1016  /* Working loop: get messages from ring, and plop into tank
1017   ***********************************************************/
1018  while(!terminate)
1019    /***********************************
1020      start of loop over messages ***/
1021  {
1022
1023    /* Get message from queue
1024     *************************/
1025    RequestSpecificMutex( &QueueMutex );
1026    result=dequeue( &OutQueue, deQMsg, &msgInLen, &logo);
1027    if(QueueLowWaterMark > OutQueue.NumOfElements)
1028      QueueLowWaterMark = OutQueue.NumOfElements;
1029    ReleaseSpecificMutex( &QueueMutex );
1030
1031    /* See if it's time to stop */
1032    flag = tport_getflag(&Region);
1033    if( flag == TERMINATE  ||  flag == myPid )  break;
1034
1035    if( result < 0 ) /* -1 means empty queue */
1036    {
1037      /* Send wave_server's heartbeat */
1038      if( time(&timeNow)-timeLastBeat >= HeartBeatInt )
1039      {
1040        timeLastBeat = timeNow;
1041        wave_serverV_status( TypeHeartBeat, 0, "" );
1042      }
1043
1044      sleep_ew(100);
1045      continue;
1046    }
1047
1048    /* Extract the tank number
1049     *************************/
1050    /* Recall, it was pasted as an int on the front of the message by the MessageStacker thread */
1051    t = *((int*)deQMsg);
1052    if(Debug > 1)
1053       logit("","From queue: msg of %ld bytes for tank %d\n",msgInLen-sizeof(int),t);
1054
1055    msgInLen = msgInLen -sizeof(int);  /* correct message length */
1056
1057
1058    /* Swap the message to local notation AND MEMORY ALIGN!!!
1059     *************************************/
1060    memcpy((void *)&Tpkt, (void *)inMsg, msgInLen);
1061    trh = (TRACE2_HEADER *) &Tpkt;
1062
1063    /*  from here on in, only use Tpkt to refer to the msg,
1064        or trh to the header, this is mem aligned
1065        *************************************************/
1066    rc = WaveMsg2MakeLocal( trh ); 
1067
1068    if(rc < 0)
1069    {
1070      logit("et","WARNING: WaveMsg2MakeLocal() rejected tracebuf.  Discarding: (%s.%s.%s.%s)\n",
1071        trh->sta, trh->chan, trh->net, trh->loc);
1072      logit("e", "\t%.2f - %.2f nsamp=%d samprate=%6.2f endtime=%.2f. datatype=[%s]\n",
1073        trh->starttime, trh->endtime, trh->nsamp, trh->samprate,
1074        IndexYoungest(&Tanks[t])->tEnd, trh->datatype);
1075      goto ReleaseMutex;
1076    }
1077
1078
1079    /* Grab the tank file
1080     ********************/
1081    RequestSpecificMutex( &(Tanks[t].mutex) );
1082
1083    if(!TraceBufIsValid(&Tanks[t],trh))
1084    {
1085        logit("et","WARNING: Tracebuf seems invalid.  Discarding: (%s.%s.%s.%s)\n",
1086              trh->sta, trh->chan, trh->net, trh->loc);
1087        logit("e", "\t%.2f - %.2f nsamp=%d samprate=%6.2f endtime=%.2f. datatype=[%s]\n",
1088              trh->starttime, trh->endtime, trh->nsamp, trh->samprate,
1089              IndexYoungest(&Tanks[t])->tEnd, trh->datatype);
1090        goto ReleaseMutex;
1091    }
1092
1093    /* Seek to insertion point
1094     ********************/
1095    if ( fseek( Tanks[t].tfp, Tanks[t].inPtOffset, SEEK_SET ) != 0 )  /* move to insertion point */
1096    {
1097      logit( "et", "wave_serverV: Error on fseek in tank [%s].\n",Tanks[t].tankName );
1098      strncpy(szFileFunction,"fseek",sizeof(szFileFunction));
1099      szFileFunction[sizeof(szFileFunction)]= 0;
1100      iFileOffset     = Tanks[t].inPtOffset;
1101      iFileBlockSize  = 0;
1102      szFileTankName  = Tanks[t].tankName;   /* don't copy just point */
1103      if(bAbortOnSingleTankFailure)
1104        goto abortAndReleaseMutex;
1105      else
1106        goto ReleaseMutex;
1107    }
1108    /* Be sure message is not too big */
1109    if ( msgInLen > Tanks[t].recSize )
1110    {
1111      sprintf( Text, "tank %s.%s.%s.%s msg too big (%d > %d).\n",
1112               Tanks[t].sta, Tanks[t].chan, Tanks[t].net, Tanks[t].loc,
1113               msgInLen, Tanks[t].recSize);
1114      wave_serverV_status(TypeError, ERR_TOOBIG, Text);
1115      goto ReleaseMutex;
1116    }
1117    /* Write message to Tank file */
1118    if ( (long) fwrite( (void*)&Tpkt, Tanks[t].recSize, 1, Tanks[t].tfp ) != 1 )
1119    {               /* NOTE: we always write a FULL record */
1120      logit( "et", "wave_serverV: Error writing to tank %s; exiting!\n",Tanks[t].tankName );
1121      strncpy(szFileFunction,"fwrite",sizeof(szFileFunction));
1122      szFileFunction[sizeof(szFileFunction)]= 0;
1123      iFileOffset     = Tanks[t].inPtOffset;
1124      iFileBlockSize  = Tanks[t].recSize;
1125      szFileTankName  = Tanks[t].tankName;   /* don't copy just point.  The original should be
1126                                                around for a while, and we aren't modifying */
1127      bIssueIOStatusError = TRUE;
1128      if(bAbortOnSingleTankFailure)
1129        goto abortAndReleaseMutex;
1130      else
1131        goto ReleaseMutex;
1132    }
1133    /* Update the insertion point
1134     *****************************/
1135    tmpOffset=Tanks[t].inPtOffset;
1136    if ( ftell(Tanks[t].tfp) != Tanks[t].inPtOffset + Tanks[t].recSize )
1137    {
1138      logit("et"," wave_serverV: bad offset %ld from ftell updating insertion point on %s\n",
1139            Tanks[t].inPtOffset,Tanks[t].tankName);
1140      logit("et"," wave_serverV:  ftell(Tanks[t].tfp): %ld\n",  ftell(Tanks[t].tfp));
1141      strncpy(szFileFunction,"ftell",sizeof(szFileFunction));
1142      szFileFunction[sizeof(szFileFunction)]= 0;
1143      iFileOffset     = Tanks[t].inPtOffset;
1144      iFileBlockSize  = Tanks[t].recSize;
1145      szFileTankName  = Tanks[t].tankName;   /* don't copy just point */
1146      if(bAbortOnSingleTankFailure)
1147        goto abortAndReleaseMutex;
1148      else
1149        goto ReleaseMutex;
1150    }
1151    Tanks[t].inPtOffset = Tanks[t].inPtOffset + Tanks[t].recSize;
1152
1153    /* Check for wrap of tank
1154     *****************************/
1155    /* Davidk 4/9/98, changed this comparison from > to >=, with the mindset
1156       that once the offset point hits the tank length, it should not write anymore
1157       records, it should instead wrap back to the beginning of the tank.  If it waits
1158       until the offset is > than the tanksize, then if everything is on the up and up,
1159       it should write 1 record passed the logical end of the tank, because the last
1160       record will be written starting at inPtOffset == tanksize, since tanksize is
1161       a multiple of record size.  Records should be written at (0,+rsz,0+2*rsz,..,(tsz/rsz)*rsz),
1162       where the last write in the sequence, (tsz/rsz)*rsz = tsz.
1163    */
1164    if ( Tanks[t].inPtOffset >= Tanks[t].tankSize )
1165    {
1166      Tanks[t].inPtOffset=0;
1167      Tanks[t].firstPass=0; /* we've been here before */
1168    }
1169
1170    if(UpdateIndex(&Tanks[t],trh,tmpOffset, TRUE))
1171    {
1172      strncpy(szFileFunction,"UpdateIndex",sizeof(szFileFunction));
1173      szFileFunction[sizeof(szFileFunction)]= 0;
1174      iFileOffset     = Tanks[t].inPtOffset;
1175      iFileBlockSize  = Tanks[t].recSize;
1176      szFileTankName  = Tanks[t].tankName;   /* don't copy just point */
1177      if(bAbortOnSingleTankFailure)
1178        goto abortAndReleaseMutex;
1179      else
1180        goto ReleaseMutex;
1181    }
1182
1183    ReleaseSpecificMutex( &(Tanks[t].mutex) );
1184    continue;
1185
1186  abortAndReleaseMutex:
1187    bAbortMainLoop=1;
1188
1189  ReleaseMutex:
1190    ReleaseSpecificMutex( &(Tanks[t].mutex) );
1191    if(bIssueIOStatusError)
1192    {
1193      IssueIOStatusError(szFileFunction,iFileOffset,iFileBlockSize,szFileTankName);
1194      bIssueIOStatusError = FALSE;
1195    }
1196
1197    if(bAbortMainLoop)
1198    {
1199      break;
1200    }
1201  } /*** end of working loop over messages ***/
1202
1203
1204
1205  /* We're here either due to some error, or we're supposed to shut down.
1206   **************************************************************************/
1207  logit ("et","wave_serverV: Shutdown initiated.\n");
1208
1209  /* Tell the other threads that it is time to shutdown */
1210  terminate=1;
1211
1212  for (i=0;i<nTanks;i++)
1213  {
1214    if (Debug > 1)
1215       logit("et","wave_serverV: Closing tank file %s.\n",Tanks[i].tankName);
1216
1217    if( fclose(Tanks[i].tfp) != 0)
1218      logit(""," Error on fclose on tank file %s\n", Tanks[i].tankName);
1219    /* End of comment */
1220  }
1221
1222  /* At startup, the IndexMgr thread grabs the IndexTerminationMutex,
1223      periodically, it checks to terminate, to see if it is 1, meaning
1224      it should shutdown.  Upon seeing the terminate flag, it writes
1225      the current indexes to disk, and then writes the TANK structures
1226      to disk.  Then it closes all file pointers, and finally releases
1227      the IndexTerminationMutex, to indicate to the main thread that it
1228      has completed.
1229   */
1230
1231  if (Debug > 2)
1232     logit("et","wave_serverV:main(): Waiting for IndexTerminationMutex\n");
1233
1234  RequestSpecificMutex(&IndexTerminationMutex);
1235
1236   /* Just to be clean, release the mutex */
1237  if (Debug > 2)
1238     logit("et","wave_serverV:main(): Got IndexTerminationMutex\n");
1239
1240  ReleaseSpecificMutex(&IndexTerminationMutex);
1241
1242  if (Debug > 2)
1243     logit("et","wave_serverV:main(): Released IndexTerminationMutex\n");
1244
1245  /* It is now OK to terminate */
1246
1247  if (Debug > 1)
1248    logit("et","wave_server: Detaching from shared memory.\n");
1249
1250  tport_detach( &Region );
1251  logit("et","wave_serverV: Shutdown complete. Indexes saved to disk. Exiting\n");
1252  exit(0);
1253  return(0);
1254}  /* End of main() */
1255/*---------------------------------------------------------------------------*/
1256
1257
1258/********************** Message Stacking Thread *******************
1259 *           Move messages from transport to memory queue         *
1260 ******************************************************************/
1261thr_ret MessageStacker( void *dummy )
1262{
1263  static char        *msg;            /* retrieved message               */
1264  static int          res;
1265  static long         msgInLen;       /* size of retrieved message             */
1266  static MSG_LOGO     reclogo;        /* logo of retrieved message             */
1267  static int          ret;
1268  static int          t;
1269  static int             sayItOnce=0;
1270  static int          NumOfTimesQueueLapped=0;
1271  static time_t       tLastQLComplaint;
1272  static int          NumLastReportedQueueLapped=0;
1273  static TANK        *pTempTank;
1274  unsigned char       seq;
1275
1276
1277  /* Set time of last queue lapped complaint to now, that way there is an X second
1278     buffer during startup.
1279  *******************************************/
1280  time(&tLastQLComplaint);
1281
1282  /* Allocate space for input/output messages
1283   *******************************************/
1284  /* note that we allocate an int's worth at the start of the message: this
1285     will contain the tank number this message is headed for. To save doing
1286     a second set of string compares when the message is pulled from the queue */
1287  if ( ( msg = (char *) malloc(sizeof(int)+MaxMsgSiz) ) == (char *) NULL )
1288  {
1289    logit( "e", "wave_serverV MessageStacker: error allocating msg; exiting!\n" );
1290    goto error;
1291  }
1292
1293  /* Tell the main thread we're ok
1294   ********************************/
1295  MessageStackerStatus=0;
1296
1297  /* Flush the transport ring
1298   ************************/
1299  while ( tport_copyfrom( &Region, ServeLogo, (short)NLogos, &reclogo,
1300                        &msgInLen, msg+sizeof(int), MaxMsgSiz, &seq ) != GET_NONE );
1301
1302  /* Loop (almost) forever getting messages from transport
1303  ***********************************************/
1304  while( !terminate )
1305  {
1306    /* Get a message from transport ring
1307     ************************************/
1308    res = tport_copyfrom( &Region, ServeLogo, (short)NLogos, &reclogo,
1309                        &msgInLen, msg+sizeof(int), MaxMsgSiz, &seq );
1310
1311    if( res == GET_NONE ) {sleep_ew(100); continue;} /*wait if no messages for us */
1312
1313    /* Check return code; report errors
1314     ***********************************/
1315    if( res != GET_OK )
1316    {
1317      if( res==GET_TOOBIG )
1318      {
1319        if(sayItOnce==0)
1320        {
1321          sprintf( Text, "tport msg[%ld] i%d m%d t%d too big. Max is %ld",
1322                   msgInLen, (int) reclogo.instid, (int) reclogo.mod,
1323                   (int)reclogo.type, MaxMsgSiz );
1324          wave_serverV_status( TypeError, ERR_TOOBIG, Text );
1325          sayItOnce =1;
1326        }
1327        continue;
1328      }
1329      else if( res==GET_MISS_LAPPED )
1330      {
1331        sprintf( Text, "missed msgs from i%d m%d t%d sq%d in %s (overwritten)",
1332                 (int) reclogo.instid, (int) reclogo.mod, (int)reclogo.type,
1333                 (int) seq, RingName );
1334        wave_serverV_status( TypeError, ERR_MISSMSG, Text );
1335      }
1336      else if( res==GET_MISS_SEQGAP )
1337      {
1338        sprintf( Text, "saw sequence gap before i%d m%d t%d sq%d in %s",
1339                 (int) reclogo.instid, (int) reclogo.mod, (int)reclogo.type,
1340                 (int) seq, RingName );
1341        wave_serverV_status( TypeError, ERR_MISSMSG, Text );
1342      }
1343      else if( res==GET_NOTRACK )
1344      {
1345        sprintf( Text, "no tracking for logo i%d m%d t%d in %s",
1346                 (int) reclogo.instid, (int) reclogo.mod, (int)reclogo.type,
1347                 RingName );
1348        wave_serverV_status( TypeError, ERR_NOTRACK, Text );
1349      }
1350    }  /* end if res ! GET_OK */
1351
1352    /* See if we want this SCN name
1353     *******************************/
1354
1355    pTempTank=FindSCNL(Tanks,nTanks,
1356                       ((TRACE2_HEADER*)(msg+sizeof(int)))->sta,
1357                       ((TRACE2_HEADER*)(msg+sizeof(int)))->chan,
1358                       ((TRACE2_HEADER*)(msg+sizeof(int)))->net,
1359                       ((TRACE2_HEADER*)(msg+sizeof(int)))->loc);
1360    if(pTempTank)
1361    {
1362      t=(pTempTank-Tanks);  /* both are TANK *'s so sizeof(TANK) is
1363                               already factored in */
1364    }
1365    else
1366    {
1367      continue;
1368    }
1369
1370
1371    /* This message is headed for Tanks "t" */
1372    /***************************************/
1373    /* stick the tank number as an int at the front of the message */
1374    *((int*)msg) = t;
1375    RequestSpecificMutex( &QueueMutex );
1376    ret=enqueue( &OutQueue, msg, msgInLen+sizeof(int), reclogo );
1377    if(QueueHighWaterMark < OutQueue.NumOfElements)
1378      QueueHighWaterMark = OutQueue.NumOfElements;
1379    ReleaseSpecificMutex( &QueueMutex );
1380    if ( ret!= 0 )
1381    {
1382      if (ret==-2)  /* Serious: quit */
1383      {
1384        sprintf(Text,"internal queue error. Terminating.");
1385        wave_serverV_status( TypeError, ERR_QUEUE, Text );
1386        goto error;
1387      }
1388      if (ret==-1)
1389      {
1390        sprintf(Text,"queue cannot allocate memory. Lost message.");
1391        wave_serverV_status( TypeError, ERR_QUEUE, Text );
1392        continue;
1393      }
1394      if (ret==-3)
1395      {
1396        /* sprintf(Text,"Circular queue lapped. Message lost.");
1397           wave_serverV_status( TypeError, ERR_QUEUE, Text );
1398        */
1399        /* Queue is lapped too often to be logged to screen.  Log
1400           circular queue laps to logfile.  Maybe queue laps should
1401           not be logged at all.
1402        */
1403
1404        /* What we want to do is to selectively log queue laps to the screen and statmgr.
1405           The selection that we want to log is, the first occurance of queue laps, and
1406           the number of queue laps over X seconds, as long as the number is > 1.  To do
1407           this, we need to keep a running total of queue lapped messages, a timer indicating
1408           the last time we complained, and a config file value that indicates how often to
1409           complain.
1410        */
1411        NumOfTimesQueueLapped++;
1412        if(time(NULL) - tLastQLComplaint >= SecondsBetweenQueueErrorReports)
1413        {
1414          /* Complain to stat_mgr and to screen */
1415          sprintf(Text,"Circular queue lapped.  %d messages lost.",
1416                  NumOfTimesQueueLapped-NumLastReportedQueueLapped);
1417          NumLastReportedQueueLapped=NumOfTimesQueueLapped;
1418          wave_serverV_status( TypeError, ERR_QUEUE, Text );
1419          time(&tLastQLComplaint);
1420        }
1421
1422        continue;
1423      }
1424    }
1425
1426    if(Debug > 1)
1427       logit("", "queued msg len:%ld for tank %d\n",msgInLen,*((int*)msg) );
1428  }
1429  /* we're quitting
1430   *****************/
1431 error:
1432  MessageStackerStatus=-1; /* file a complaint to the main thread */
1433  KillSelfThread(); /* main thread will restart us */
1434}
1435
1436
1437/*---------------------------------------------------------------------------*/
1438/************************** ServerMgr Thread *******************************
1439                Sets up the passive socket and listens for connect requests.
1440                Starts Server threads, gives them the active sockets, and
1441                wishes them luck. Keeps track of active threads, and does not
1442                start too many threads.
1443******************************************************************************/
1444thr_ret ServerMgr( void *dummy )
1445{
1446  int                  on = 1;
1447  int                  clientLen;
1448#ifdef TO_BE_USED_IN_FUTURE
1449  char                 sysname[40];                    /* system name; filled by getsysname_ew  */
1450#endif
1451  typedef char char16[16];
1452  char16 * client_ip;  /* IP address of client from inet_ntoa   */
1453  struct sockaddr_in   skt;
1454  struct sockaddr_in * client;
1455  int                  freeThrd;
1456  int                  i;
1457
1458
1459
1460#define ACCEPTABLE_TIME_TO_WAIT_FOR_SERVER_THREAD_TO_START 15
1461#ifndef SocketSysInit_after_gethostbyaddr
1462
1463  if (Debug > 2)
1464     logit("et","wave_serverV: SocketSysInit...\n");
1465
1466  /* Initialize the socket system
1467   ******************************/
1468  SocketSysInit();
1469
1470#endif
1471
1472  if (Debug > 2)
1473    logit("", "Wave_serverV: Beginning ServerMgr.\n");
1474
1475
1476  /* Allocate data for variables based on MaxServerThreads, which is defined
1477     in the config file, and meta-initialize any data neccessary.
1478  ******************************************************************/
1479  tidSockSrv=(unsigned int *) malloc(MaxServerThreads * sizeof(int));
1480
1481  /*ActiveSocket=(int *) malloc(MaxServerThreads * sizeof(int));*/
1482  client_ip=malloc(MaxServerThreads * sizeof(char)* 16);
1483  client=(struct sockaddr_in *)malloc(MaxServerThreads * sizeof(struct sockaddr_in));
1484
1485  /* End of malloc and initialization.
1486   *******************************************************************/
1487
1488  /* Initialize all ServerThread Info
1489   **************************************/
1490  for(i=0; i< MaxServerThreads; i++)
1491  {
1492    ServerThreadInfo[i].Status=SERVER_THREAD_IDLE; /* Set status to idle */
1493    ServerThreadInfo[i].ActiveSocket=0; /* Set all Sockets to NULL */
1494    ServerThreadInfo[i].ClientsAccepted=0;
1495    ServerThreadInfo[i].ClientsProcessed=0;
1496    ServerThreadInfo[i].RequestsProcessed=0;
1497    ServerThreadInfo[i].Errors=0;
1498  }
1499
1500  /* 2003.04.14 dbh -- removed inet_addr() and gethostbyaddr() here */
1501
1502  /************************/
1503 HeavyRestart:
1504  /************************/
1505  /* Come here after something broke the passive socket. This should not happen, but it does.
1506     In that case, we're going to re-initialize the socket system, probably drop all clients,
1507     but start the service all over again.
1508  */
1509  if (Debug > 2)
1510    logit("et","wave_serverV: (Re)starting socket system\n");
1511
1512#ifdef SocketSysInit_after_gethostbyaddr
1513
1514  if (Debug > 2)
1515     logit("et","wave_serverV: SocketSysInit...\n");
1516
1517  /* Initialize the socket system
1518   ******************************/
1519  SocketSysInit();
1520
1521#endif
1522
1523  if ( ( PassiveSocket = socket_ew( PF_INET, SOCK_STREAM, 0) ) == -1 )
1524  {
1525    logit( "et", "wave_serverV: Error opening socket. Exiting.\n" );
1526    exit(-1);
1527  }
1528
1529  /* Fill in server's socket address structure
1530   *******************************************/
1531  memset( (char *) &skt, '\0', sizeof(skt) );
1532  /* 2003.04.14 dbh -- formerly used memcpy to set skt.sin_addr from gethostbyaddr() result */
1533
1534#if defined(_LINUX) || defined(_MACOSX)
1535  /* the S_un substruct is not something linux knows about */
1536  if ((int)(skt.sin_addr.s_addr = inet_addr(ServerIPAdr)) == INADDR_NONE)
1537#else
1538  if ((int)(skt.sin_addr.S_un.S_addr = inet_addr(ServerIPAdr)) == INADDR_NONE)
1539#endif
1540  {
1541      logit( "e", "wave_serverV: inet_addr failed for ServerIPAdr <%s>; exiting!\n",ServerIPAdr );
1542      exit( -1 );
1543  }
1544
1545  skt.sin_family = AF_INET;
1546  skt.sin_port   = htons( (short)ServerPort );
1547
1548  /* Allows the server to be stopped and restarted
1549   *********************************************/
1550  on=1;
1551  if(setsockopt( PassiveSocket, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(char *) )!=0)
1552  {
1553    logit( "et", "wave_serverV: Error on setsockopt. Exiting.\n" );
1554    perror("Export_generic setsockopt");
1555    closesocket_ew( PassiveSocket, SOCKET_CLOSE_IMMEDIATELY_EW);
1556    goto HeavyRestart;
1557  }
1558
1559  /* Bind socket to a name
1560   ************************/
1561  if ( bind_ew( PassiveSocket, (struct sockaddr *) &skt, sizeof(skt)) )
1562  {
1563    sprintf(Text,"wave_serverV: error binding socket; Exiting.\n");
1564    perror("wave_serverV bind error");
1565    logit("et","%s",Text );
1566    closesocket_ew( PassiveSocket, SOCKET_CLOSE_IMMEDIATELY_EW);
1567    exit (-1);
1568  }
1569  /* and start listening */
1570  if ( listen_ew( PassiveSocket, MaxServerThreads) )
1571  {
1572    logit("et","wave_serverV: socket listen error; exiting!\n" );
1573    closesocket_ew( PassiveSocket, SOCKET_CLOSE_IMMEDIATELY_EW);
1574    exit(-1);
1575  }
1576
1577  while (!terminate)
1578  {
1579    /* Prepare for connect requests
1580     *******************************/
1581    /* Find an idle thread
1582     **********************/
1583    while(1)
1584    {
1585      for(freeThrd=0; freeThrd<MaxServerThreads; freeThrd++)
1586      {
1587        if (Debug > 2)
1588          logit("","ServerMgr: ServerThreadStatus[%d]=%d\n",
1589                freeThrd,ServerThreadInfo[freeThrd].Status);
1590
1591        if (ServerThreadInfo[freeThrd].Status == SERVER_THREAD_IDLE
1592            || ServerThreadInfo[freeThrd].Status == SERVER_THREAD_COMPLETED
1593            || ServerThreadInfo[freeThrd].Status == SERVER_THREAD_ERROR)
1594          goto gotthrd;
1595        /* As long as the thread is not busy, grab it. */
1596      }
1597      logit("et","Cannot accept client: out of server threads. Waiting.\n");
1598      sleep_ew(WAIT_FOR_SERVER_THREAD); /* get in line and wait */
1599    }
1600  gotthrd:
1601
1602    /*  First Check thread state */
1603    switch(ServerThreadInfo[freeThrd].Status)
1604    {
1605    case SERVER_THREAD_COMPLETED:
1606      {
1607        /* Everything is OK, the thread has completed a previous task */
1608        ServerThreadInfo[freeThrd].Status=SERVER_THREAD_IDLE;
1609      }  /* fall through to next case... */
1610    case SERVER_THREAD_IDLE:
1611      {
1612        /* The thread is ready, break */
1613        break;
1614      }
1615    case SERVER_THREAD_ERROR:
1616      {
1617        /* The thread died on a previous task.  Do whatever cleanup
1618           that is neccessary.  Proclaim senseless thread death!
1619        */
1620        logit("et","Wave_serverV:Server thread %d was found to be dead.  Recycling.\n\t%s\n",
1621              freeThrd,"This is a VERY BAD THING!");
1622        ServerThreadInfo[freeThrd].Status=SERVER_THREAD_IDLE;
1623        break;
1624      }
1625    default:
1626      {
1627        /* The thread is still busy.  We shouldn't be here,
1628           self-destruct
1629        */
1630        logit("et","Wave_serverV:Thread %d was selected as free, "
1631              "but status shows that it is still busy.\n", freeThrd);
1632        goto HeavyRestart;
1633      }
1634    }
1635
1636    /* Accept a connection (blocking)
1637     *********************************/
1638    clientLen = sizeof( client[freeThrd] );
1639
1640    if (Debug > 1)
1641      logit( "","wave_serverV: Waiting for new connection for thread %d.\n",
1642             freeThrd );
1643
1644    if( (ServerThreadInfo[freeThrd].ActiveSocket =
1645         accept_ew( PassiveSocket, (struct sockaddr*) &client[freeThrd],
1646                    &clientLen, -1 /*SocketTimeoutLength*/) ) == INVALID_SOCKET )
1647    {
1648      logit("et","wave_serverV: error on accept. Exiting\n" );
1649      closesocket_ew( ServerThreadInfo[freeThrd].ActiveSocket,
1650                      SOCKET_CLOSE_IMMEDIATELY_EW );
1651      goto HeavyRestart;
1652    }
1653    strcpy( client_ip[freeThrd], inet_ntoa(client[freeThrd].sin_addr) );
1654
1655    if (Debug > 1)
1656      logit("et", "wave_serverV: Connection accepted from IP address %s\n",
1657                                                                                                                                client_ip[freeThrd] );
1658
1659    /* Start the Server thread
1660     **************************/
1661    if (Debug > 1)
1662      logit("","Starting ServerThread %d \n",freeThrd);
1663
1664    if ( StartThreadWithArg(  ServerThread, &freeThrd, (unsigned)THREAD_STACK, &tidSockSrv[freeThrd] )  == -1 )
1665    {
1666      logit( "e", "wave_serverV: Error starting  SocketSender thread. Exiting.\n" );
1667      closesocket_ew( ServerThreadInfo[freeThrd].ActiveSocket,
1668                      SOCKET_CLOSE_IMMEDIATELY_EW );
1669      goto HeavyRestart;
1670    }
1671    /* Wait for it to wake up */
1672    /**************************/
1673    /* otherwise we'll see it as idle on the next pass through this loop */
1674    if (Debug > 1)
1675      logit("","ServerMgr waiting for thread %d to come alive\n",freeThrd);
1676
1677
1678    { /* Begin of block to check for thread start status */
1679      time_t WaitForThreadStartTime, WaitForThreadCurrTime;
1680      time(&WaitForThreadStartTime);
1681      while( ServerThreadInfo[freeThrd].Status == SERVER_THREAD_IDLE
1682             && (WaitForThreadStartTime + ACCEPTABLE_TIME_TO_WAIT_FOR_SERVER_THREAD_TO_START > time(&WaitForThreadCurrTime)) )
1683      {
1684        sleep_ew(1);
1685      }
1686      if(WaitForThreadStartTime + ACCEPTABLE_TIME_TO_WAIT_FOR_SERVER_THREAD_TO_START
1687         < time(&WaitForThreadCurrTime))
1688      {
1689        logit("et","Thread %d never went to busy.  Marking thread as dead, and continuing.\n",freeThrd);
1690        ServerThreadInfo[freeThrd].Status = SERVER_THREAD_ERROR;
1691        /* We now have this confused thread running around out there,
1692           detached from normal life.  We should probably do something
1693           besides wave goodbye.
1694        */
1695      }
1696    }  /* End of block to check for thread start status */
1697  }
1698  closesocket_ew(PassiveSocket, SOCKET_CLOSE_GRACEFULLY_EW);
1699}                    /*** end of ServerMgr thread ***/
1700
1701/*---------------------------------------------------------------------------*/
1702
1703
1704/************************** IndexMgr Thread *******************************
1705Periodically writes the current TANK structures, and memory indexes to disk.
1706******************************************************************************/
1707thr_ret IndexMgr( void *dummy )
1708{
1709
1710
1711  TANKList * pMyTANKList;
1712  unsigned int i;
1713  time_t CurrentTime, LastTANKListWrite, LastTime, LastQueueReport;
1714  time_t * pFirstTimeStamp, * pSecondTimeStamp;
1715  unsigned int TANKsSize,TANKListBufferSize;
1716  char * pTANKsInBuffer, *pTANKListBuffer;
1717  int * pNumOfTANKs;
1718
1719  /* Create a buffer to store our personal snapshot of the
1720     tank buffers.  The idea is to frequently take snapshots
1721     of the TANKs and save them to disk without slowing down
1722     the main threads tank update mechanism
1723  */
1724  if((pMyTANKList=CreatePersonalTANKBuffer(pConfigTankList)) == NULL)
1725  {
1726    logit("et","wave_serverV:IndexMgr():  failed to create tank buffer.  Exiting\n");
1727    exit(-1);
1728  }
1729
1730  RequestSpecificMutex(&IndexTerminationMutex);
1731
1732  /* Before starting, write a current copy of the TANKs to
1733     the TANKs file.
1734  */
1735  time(&LastTANKListWrite);
1736  /* Note: GetTANKSs uses tank mutexes here (last argument is TRUE). If
1737   * we get the terminate flag in the middle of this call to GetTANKS,
1738   * it is possible that a Server Thread could get killed holding a
1739   * tank mutex. This would block GetTANKS forever until wave_serverV gets
1740   * killed by startstop. PNL 1/10/00 */
1741  if(GetTANKs(Tanks,pMyTANKList->pFirstTS,pMyTANKList->NumOfTS,TRUE))
1742  {
1743    logit("et","wave_serverV:IndexMgr():  failed to get copy of tank structures.  Exiting\n");
1744    exit(-1);
1745  }
1746
1747  TANKListBufferSize = sizeof(time_t) + sizeof(int)
1748    + (pMyTANKList->NumOfTS * sizeof(TANK))
1749    + sizeof(time_t);
1750  if((pTANKListBuffer=malloc(TANKListBufferSize)) == NULL)
1751  {
1752    logit("et","wave_serverV:IndexMgr():  failed to malloc tank list buffer.  Exiting\n");
1753    exit(-1);
1754  }
1755
1756  /* Write the number of tanks to the correct position in the buffer.
1757     The buffer should look like:
1758     TimeStamp1 NumOfTanks <TANKs> TimeStamp2
1759  */
1760  pNumOfTANKs=(int *)(pTANKListBuffer + sizeof(time_t));
1761  *pNumOfTANKs=pMyTANKList->NumOfTS;
1762
1763  /* Create Shortcuts to the TimeStamp and TANKs locations */
1764  pFirstTimeStamp=(time_t *)pTANKListBuffer;
1765  pTANKsInBuffer=pTANKListBuffer+sizeof(time_t)+sizeof(int);
1766  pSecondTimeStamp=(time_t *)(pTANKListBuffer+TANKListBufferSize-sizeof(time_t));
1767  TANKsSize=pMyTANKList->NumOfTS * sizeof(TANK);
1768
1769  /*WriteTANKList()*/
1770  time(pFirstTimeStamp);
1771  memcpy(pTANKsInBuffer,pMyTANKList->pFirstTS,TANKsSize);
1772  memcpy(pSecondTimeStamp, pFirstTimeStamp, sizeof(time_t));
1773  if(WriteRFile(pMyTANKList->pTSFile,0,TANKListBufferSize,pTANKListBuffer,1) != 1)
1774  {
1775    logit("et","wave_serverV:IndexMgr():  failed to write tank list to file.  Exiting\n");
1776    exit(-1);
1777  }
1778
1779  CurrentTime=LastTANKListWrite;
1780
1781  {
1782    LastQueueReport=CurrentTime;
1783
1784    if (Debug > 0)
1785       logit("et","Queue Report: High Mark %d, Low Mark %d\n",
1786                                                                QueueHighWaterMark,QueueLowWaterMark);
1787
1788    QueueLowWaterMark=QueueHighWaterMark;
1789    QueueHighWaterMark=0;
1790  }
1791
1792  while(!terminate)
1793  {
1794    LastTime=CurrentTime;
1795
1796    time(&CurrentTime);
1797    if(CurrentTime-LastQueueReport >= QueueReportInterval)
1798    {
1799      LastQueueReport=CurrentTime;
1800
1801      if (Debug > 0)
1802          logit("et","Queue Report: High Mark %d, Low Mark %d\n",
1803                                                                        QueueHighWaterMark,QueueLowWaterMark);
1804
1805      QueueLowWaterMark=QueueHighWaterMark;
1806      QueueHighWaterMark=0;
1807
1808      /* Do A Server Thread Status Report */
1809      if (Debug > 0)
1810          ReportServerThreadStatus();
1811    }
1812    if(LastTime==CurrentTime)
1813    {
1814      sleep_ew(1000);
1815      continue;
1816    }
1817    /* else */
1818    if(CurrentTime-TankStructUpdate >= LastTANKListWrite)
1819    {
1820      LastTANKListWrite=CurrentTime;
1821      if(GetTANKs(Tanks,pMyTANKList->pFirstTS,pMyTANKList->NumOfTS,TRUE))
1822      {
1823        logit("et","wave_serverV:IndexMgr():  failed to get copy of tank structures.  Exiting\n");
1824        exit(-1);
1825      }
1826
1827      /*WriteTANKList()*/
1828      *pFirstTimeStamp=CurrentTime;
1829      memcpy(pTANKsInBuffer,pMyTANKList->pFirstTS,TANKsSize);
1830      *pSecondTimeStamp=*pFirstTimeStamp;
1831      if(WriteRFile(pMyTANKList->pTSFile,0,TANKListBufferSize,pTANKListBuffer,1) != 1)
1832      {
1833        logit("et","wave_serverV:IndexMgr():  failed to write tank list to file.  Exiting\n");
1834        exit(-1);
1835      }
1836    }
1837
1838    for(i=0;i<(pMyTANKList->NumOfTS);i++)
1839    {
1840      if(CurrentTime-IndexUpdate >= Tanks[i].lastIndexWrite)
1841      {
1842        if(WriteLIndex(&(Tanks[i]),1,CurrentTime))
1843        {
1844          logit("t","WriteLIndex() failed for %s\n", Tanks[i].tankName);
1845        }
1846      }
1847      if(terminate) break;
1848    }  /* End for loop through all tank indexes */
1849
1850  }  /* End: while !terminated */
1851
1852  /* We got terminated.  Save the women, children,
1853     and indexes first, and then save the TANKList file
1854  */
1855
1856  if (Debug > 1)
1857     logit("t","IndexMgr: Got terminate flag!\n");
1858
1859  time(&CurrentTime);
1860  /* Saving indexes
1861   ********************/
1862  for(i=0;i<(pMyTANKList->NumOfTS);i++)
1863  {
1864    if (Debug > 1)
1865       logit("t","IndexMgr: Writing Tank %s!\n",Tanks[i].tankName);
1866
1867    WriteLIndex(&(Tanks[i]),1,CurrentTime);
1868  }
1869
1870  /* Getting and Saving TANKList.  Copy it into
1871     the Tank list buffer, so that we can execute a single write..
1872     Do NOT use mutexes.
1873  ********************/
1874  if (Debug > 2)
1875    logit("t","IndexMgr: Getting Tanks!\n");
1876
1877  GetTANKs(Tanks,pMyTANKList->pFirstTS,pMyTANKList->NumOfTS,FALSE);
1878  /*WriteTANKList()*/
1879
1880  time(pFirstTimeStamp);
1881  memcpy(pTANKsInBuffer,pMyTANKList->pFirstTS,TANKsSize);
1882  memcpy(pSecondTimeStamp, pFirstTimeStamp, sizeof(time_t));
1883
1884  if (Debug > 2)
1885     logit("t","IndexMgr: Writing TANKlist to file!\n");
1886
1887  WriteRFile(pMyTANKList->pTSFile,0,TANKListBufferSize,pTANKListBuffer,1);
1888
1889  /* HandleAllFinishingTouches
1890   **********************/
1891  if (Debug > 2)
1892     logit("t","IndexMgr: Cleaning up!\n");
1893
1894  CleanupIndexThread(pMyTANKList);
1895
1896  /* Allow the main thread to proceed
1897   **********************/
1898   if (Debug > 2)
1899     logit("t","IndexMgr: Releasing IndexTerminationMutex!\n");
1900
1901  ReleaseSpecificMutex(&IndexTerminationMutex);
1902}                    /*** end of IndexMgr thread ***/
1903/*---------------------------------------------------------------------------*/
1904
1905#define NCOMMAND 13
1906/***********************************************************************
1907 * wave_serverV_config()  processes command file using kom.c functions  *
1908 *                       exits if any errors are encountered           *
1909 ***********************************************************************/
1910void wave_serverV_config(char *configfile)
1911{
1912  int      ncommand = NCOMMAND; /* # of required commands to process   */
1913  char     init[NCOMMAND]; /* init flags, one for each required command */
1914  int      nmiss;      /* number of required commands that were missed   */
1915  char    *comm;
1916  char    *str;
1917  int      nfiles;
1918  int      success;
1919  int      i;
1920  char     TankStructFile[FILENAMELEN],TankStructFile2[FILENAMELEN];
1921  int      RedundantTankStructFiles=0;
1922  int      RedundantIndexFiles=0;
1923
1924
1925  /* Initialization */
1926  TankStructFile[0]=0;
1927  TankStructFile2[0]=0;
1928
1929  SecondsBetweenQueueErrorReports=60;  /* default value of 60 sec. */
1930
1931  /* Set to zero one init flag for each required command
1932   *****************************************************/
1933  for( i=0; i<ncommand; i++ )  init[i] = 0;
1934
1935  /* Open the main configuration file
1936   **********************************/
1937  nfiles = k_open( configfile );
1938  if ( nfiles == 0 ) {
1939    logit ("e",
1940             "wave_serverV: Error opening command file <%s>; exiting!\n",
1941             configfile );
1942    exit( -1 );
1943  }
1944
1945  /* Process all command files
1946   ***************************/
1947  while(nfiles > 0)   /* While there are command files open */
1948  {
1949    while(k_rd())        /* Read next line from active file  */
1950    {
1951      comm = k_str();         /* Get the first token from line */
1952
1953      /* Ignore blank lines & comments
1954       *******************************/
1955      if( !comm )           continue;
1956      if( comm[0] == '#' )  continue;
1957
1958      /* Open a nested configuration file
1959       **********************************/
1960      if( comm[0] == '@' )
1961      {
1962        success = nfiles+1;
1963        nfiles  = k_open(&comm[1]);
1964        if ( nfiles != success ) {
1965          logit ("e",
1966                   "wave_serverV: Error opening command file <%s>; exiting!\n",
1967                   &comm[1] );
1968          exit( -1 );
1969        }
1970        continue;
1971      }
1972
1973      /* Process anything else as a command
1974       ************************************/
1975      /* Read the transport ring name
1976       ******************************/
1977      /*0*/    if( k_its( "RingName" ) )
1978      {
1979        if ( (str=k_str()) ) {
1980          strncpy(RingName,str,20);
1981          if ( (RingKey = GetKey(str)) == -1 ) {
1982            logit ("e",
1983                     "wave_serverV: Invalid ring name <%s>; exiting!\n",
1984                     str );
1985            exit( -1 );
1986          }
1987        }
1988        init[0] = 1;
1989      }
1990      /* Read the log file switch
1991       **************************/
1992      /*1*/    else if( k_its( "LogFile" ) )
1993      {
1994        LogSwitch = k_int();
1995        init[1] = 1;
1996      }
1997      /* Read wave_server's module id
1998       ******************************/
1999      /*2*/    else if( k_its( "MyModuleId" ) )
2000      {
2001        if ( (str=k_str()) ) {
2002          if ( GetModId( str, &MyModId ) != 0 ) {
2003            logit ("e",
2004                     "wave_serverV: Invalid module name <%s>; exiting!\n",
2005                     str );
2006            exit( -1 );
2007          }
2008        }
2009        init[2] = 1;
2010      }
2011
2012      /* Read the port for requests and replies
2013       ****************************************/
2014      /*3*/    else if( k_its( "ServerPort" ) )
2015      {
2016        ServerPort = k_int();
2017        init[3] = 1;
2018      }
2019      /* Read the Internet address of wave_server machine
2020       ***************************************************/
2021      /*4*/    else if( k_its( "ServerIPAdr" ) )
2022      {
2023        if ( (str = k_str()))
2024        {
2025          if (strlen(str) < IPADDRLEN)
2026          {
2027            strcpy( ServerIPAdr, str );
2028            init[4] = 1;
2029          }
2030          else
2031          {
2032            logit ("e", "ServerIPAdr name too long, max %d\n", IPADDRLEN-1);
2033            exit( -1 );
2034          }
2035        }
2036
2037      }
2038      /* Read heartbeat interval (seconds)
2039       ***********************************/
2040      /*5*/    else if( k_its("HeartBeatInt") )
2041      {
2042        HeartBeatInt = k_long();
2043        init[5] = 1;
2044      }
2045
2046      /* Read gap threshhold
2047       ***********************************/
2048      /*6*/    else if( k_its("GapThresh") )
2049      {
2050        GapThresh = k_val();
2051        init[6] = 1;
2052      }
2053
2054      /* Read index update duration
2055       **************************************/
2056      /*7*/    else if( k_its("IndexUpdate") )
2057      {
2058        IndexUpdate = k_int();
2059        init[7] = 1;
2060      }
2061
2062      /* Read tank struct update duration
2063       **************************************/
2064      /*8*/    else if( k_its("TankStructUpdate") )
2065      {
2066        TankStructUpdate = k_int();
2067        init[8] = 1;
2068      }
2069
2070      /* Maximum number of messages in outgoing circular buffer
2071       ********************************************************/
2072      /*9*/   else if( k_its("InputQueueLen") )
2073      {
2074        InputQueueLen = k_long();
2075        init[9] = 1;
2076      }
2077
2078      /* Timeout in milliseconds for IP Socket routines
2079       ***********************************/
2080      /*10*/   else if(k_its("SocketTimeout") )
2081      {
2082        SocketTimeoutLength = k_int();
2083        init[10]=1;
2084      }
2085
2086      /* Location of TankStruct File
2087       ***********************************/
2088      /*11*/   else if(k_its("TankStructFile") )
2089      {
2090        if(str=k_str())
2091        {
2092          if (strlen(str) < FILENAMELEN)
2093          {
2094            strcpy(TankStructFile, str);
2095            init[11]=1;
2096          }
2097          else
2098          {
2099            logit ("e", "wave_serverV: TankStructFile name too long, max is %d\n",
2100                    FILENAMELEN-1);
2101            exit(-1);
2102          }
2103        }
2104        else
2105        {
2106          logit ("e", "wave_serverV: TankStructFile name missing\n");
2107          exit( -1 );
2108        }
2109      }
2110
2111      /* Read Tank descriptor lines
2112       ****************************/
2113      /*12*/    else if( k_its("Tank") )
2114      {
2115        if( nTanks == 0)
2116        {
2117          Tanks=(TANK *)malloc(MAX_TANKS * sizeof(TANK));
2118          memset(Tanks,0,MAX_TANKS * sizeof(TANK));
2119        }
2120
2121        if( nTanks >= MAX_TANKS)
2122        {
2123          logit ("e", "wave_serverV: More than %d Tank descriptor lines. Exit\n",MAX_TANKS);
2124          exit(-1);
2125          logit ("e", "wave_serverV: Too many tanks in config file.  Maximum of %d Tanks permitted. Exit\n"
2126                ,MAX_TANKS);
2127          exit(-1);
2128
2129        }
2130        if(str=k_str()) strncpy(Tanks[nTanks].sta, str, TRACE2_STA_LEN);
2131        else    {
2132          logit ("e", "wave_serverV: bad station name\n");
2133          exit(-1);
2134        }
2135        if(str=k_str()) strncpy(Tanks[nTanks].chan, str, TRACE2_CHAN_LEN);
2136        else    {
2137          logit ("e", "wave_serverV: bad component name\n");
2138          exit(-1);
2139        }
2140        if(str=k_str()) strncpy(Tanks[nTanks].net, str, TRACE2_NET_LEN);
2141        else    {
2142          logit ("e", "wave_serverV: bad network name\n");
2143          exit(-1);
2144        }
2145        if(str=k_str()) strncpy(Tanks[nTanks].loc, str, TRACE2_LOC_LEN);
2146        else    {
2147          logit ("e", "wave_serverV: bad location name\n");
2148          exit(-1);
2149        }
2150
2151        Tanks[nTanks].recSize = k_int(); /* we'll chop the tank into such slots */
2152        if ( Tanks[nTanks].recSize % 4 != 0 ) {
2153          logit ("e", "wave_serverV: record size for <%s><%s><%s><%s> not a multiple of 4\n",
2154                 Tanks[nTanks].sta, Tanks[nTanks].chan, Tanks[nTanks].net,
2155                 Tanks[nTanks].loc);
2156          exit(-1);
2157        }
2158        if ( Tanks[nTanks].recSize > MAX_TRACEBUF_SIZ ) {
2159          logit ("e", "wave_serverV: record size for <%s><%s><%s><%s> is larger than MAX_TRACEBUF_SIZ <%d>\n",
2160                 Tanks[nTanks].sta, Tanks[nTanks].chan, Tanks[nTanks].net,
2161                 Tanks[nTanks].loc, MAX_TRACEBUF_SIZ );
2162          exit(-1);
2163        }
2164
2165        if( (str=k_str()) )
2166        {
2167          if( GetInst( str, &Tanks[nTanks].logo.instid ) != 0 )
2168          {
2169            logit ("e", "wave_serverV: Invalid installation name <%s>", str );
2170            exit( -1 );
2171          }
2172        }
2173        if( (str=k_str()) )
2174        {
2175          if( GetModId( str, &Tanks[nTanks].logo.mod ) != 0 )
2176          {
2177            logit ("e", "wave_serverV: Invalid module name <%s>", str );
2178            logit ("e", " in <GetWavesFrom> cmd; exiting!\n" ); exit( -1 );
2179          }
2180        }
2181
2182        if( GetType( "TYPE_TRACEBUF2", &Tanks[nTanks].logo.type ) != 0 )
2183        {
2184          logit ("e", "wave_serverV: PANIC: TYPE_TRACEBUF2 not recognized\n" );
2185          exit( -1 );
2186        }
2187
2188        Tanks[nTanks].tankSize = k_int()*1000000 ; /* user gives us tank size in megabytes */
2189        if(Tanks[nTanks].tankSize > MAX_TANK_SIZE)
2190        {
2191          if(bTruncateTanksTo1GBAndContinue)
2192          {
2193            logit("e", "wave_serverV: WARNING:  Tanksize for (%s:%s:%s:%s) is too large: %d bytes.\n"
2194                       "Tank truncated to maximum allowable size: %d bytes.\n",
2195                  Tanks[nTanks].tankSize,
2196                  Tanks[nTanks].sta, Tanks[nTanks].chan, Tanks[nTanks].net, Tanks[nTanks].loc,
2197                  MAX_TANK_SIZE);
2198
2199             /* limit tank size to largest multiple of recSize that is smaller than MAX_TANK_SIZE */
2200             Tanks[nTanks].tankSize =   (MAX_TANK_SIZE / Tanks[nTanks].recSize)
2201                                      * Tanks[nTanks].recSize ;
2202          }
2203          else
2204          {
2205            logit("et", "wave_serverV: ERROR:  Tanksize for (%s:%s:%s:%s) is too large: %d bytes.\n"
2206                       "Tanks must be <=  %d bytes.  EXITING!\n",
2207                  Tanks[nTanks].tankSize,
2208                  Tanks[nTanks].sta, Tanks[nTanks].chan, Tanks[nTanks].net, Tanks[nTanks].loc,
2209                  MAX_TANK_SIZE);
2210            exit(-1);
2211          }
2212        }
2213        else {
2214             /* RL set tanksize to multiple of recsize to avoid ReadBlockData errors */
2215             Tanks[nTanks].tankSize = (Tanks[nTanks].tankSize / Tanks[nTanks].recSize) * Tanks[nTanks].recSize;
2216        }
2217
2218        Tanks[nTanks].indxMaxChnks = k_int(); /*number of segments, not byte count */
2219        Tanks[nTanks].nRec = Tanks[nTanks].tankSize / Tanks[nTanks].recSize;  /* how many records can we store */
2220        Tanks[nTanks].tankSize = Tanks[nTanks].recSize * Tanks[nTanks].nRec;
2221        Tanks[nTanks].datatype[0]=0;
2222
2223
2224        /* When do we open FILE Ptrs, maybe in ConfigTank() */
2225
2226        if(str=k_str())
2227        {
2228          if (strlen(str) < MAX_TANK_NAME)
2229          {
2230            strncpy(Tanks[nTanks].tankName, str,MAX_TANK_NAME);
2231          }
2232          else
2233          {
2234            logit ("e", "wave_serverV: tank name too long; max is %d\n",
2235                    MAX_TANK_NAME-1);
2236            exit( -1 );
2237          }
2238        }
2239        else
2240        {
2241          logit ("e", "wave_serverV: bad tank name\n");
2242          exit(-1);
2243        }
2244
2245        init[12] = 1;
2246        nTanks++;
2247      }
2248
2249      /* Read debug flag - optional
2250       ****************************/
2251      else if( k_its("Debug") ){
2252        Debug =  k_int();
2253      }
2254
2255      /* Optional cmd: ClientTimeout */
2256      else if(k_its("ClientTimeout") ) {
2257        ClientTimeout = k_int();
2258        if (ClientTimeout <= 0 && ClientTimeout != -1)
2259        {
2260          logit ("e", "wave_serverV: bad ClientTimeout value <%d>, using -1\n",
2261                  ClientTimeout);
2262          ClientTimeout = -1;
2263        }
2264      }
2265
2266      /* Optional cmd: Turn on socket debug logging -> BIG LOG FILES!!
2267       ***********************************/
2268      else if(k_its("SocketDebug") ) {
2269        SOCKET_ewDebug = k_int();
2270      }
2271
2272      /* Optional cmd: RedundantTankStructFiles
2273         Should we use redundant files for saving Tank Structs
2274      ***********************************/
2275      else if(k_its("RedundantTankStructFiles") )
2276      {
2277        RedundantTankStructFiles=k_int();
2278      }
2279
2280      /* Optional cmd: RedundantIndexFiles
2281         Should we use redundant files for saving Indexes
2282      ***********************************/
2283      else if(k_its("RedundantIndexFiles") )
2284      {
2285        RedundantIndexFiles=k_int();
2286      }
2287
2288
2289      /* Optional:  Location of second TankStruct File
2290         for use only with redundant tank struct files
2291      ***********************************/
2292      else if(k_its("TankStructFile2") )
2293      {
2294        if(str=k_str())
2295        {
2296          if (strlen(str) < FILENAMELEN)
2297          {
2298            strcpy(TankStructFile2, str);
2299          }
2300          else
2301          {
2302            logit ("e", "wave_serverV: TankStructFile2 name too long, max is %d\n",
2303                    FILENAMELEN-1);
2304            exit( -1 );
2305          }
2306        }
2307        else
2308        {
2309          logit ("e", "wave_serverV: bad TankStructFile2 name\n");
2310        }
2311      }
2312
2313      /* Optional cmd: PleaseContinue
2314         Should we continue on Tank failures,
2315         providing that some tanks are still functioning
2316         (Do your best)
2317      ***********************************/
2318      else if(k_its("PleaseContinue") )
2319      {
2320        PleaseContinue=k_int();
2321      }
2322
2323      /* Optional cmd: MaxMsgSize
2324         In case trace_buf messages could be larger than any in this config file
2325      ********************************/
2326      else if (k_its("MaxMsgSize") )
2327      {
2328        MaxMsgSiz=k_int();
2329      }
2330
2331      /* Optional cmd: ReCreateBadTanks
2332         In case the operator desires to have bad tanks re-created
2333         from scratch.
2334      ********************************/
2335      else if (k_its("ReCreateBadTanks") )
2336      {
2337        ReCreateBadTanks=k_int();
2338      }
2339
2340
2341      /* Optional cmd: SecondsBetweenQueueErrorReports
2342         To set the minimum period of time between error reports
2343         to statmgr due to the internal message queue being lapped,
2344         and thus messages being lost.
2345      ********************************/
2346      else if (k_its("SecondsBetweenQueueErrorReports") )
2347      {
2348        SecondsBetweenQueueErrorReports=k_int();
2349      }
2350
2351
2352      /* Optional cmd: MaxServerThreads
2353         To set the maximum number of server threads to be created
2354         to handle client requests.
2355      ********************************/
2356      else if (k_its("MaxServerThreads") )
2357      {
2358        MaxServerThreads=k_int();
2359      }
2360
2361
2362      /* Optional cmd: QueueReportInterval
2363         To set the minimum number of seconds between
2364         reports on internal queue high and low water marks.
2365      ********************************/
2366      else if (k_its("QueueReportInterval") )
2367      {
2368        QueueReportInterval=k_int();
2369      }
2370
2371
      /* Optional cmd: AbortOnSingleTankFailure
         Integer flag stored in bAbortOnSingleTankFailure; presumably nonzero
         means give up when any single tank fails (compare PleaseContinue)
      ***********************************/
2375      else if(k_its("AbortOnSingleTankFailure") )
2376      {
2377        bAbortOnSingleTankFailure=k_int();
2378      }
2379
      /* Optional cmd: TruncateTanksTo1GBAndContinue
         Takes no argument; its presence sets bTruncateTanksTo1GBAndContinue
         to TRUE.
      ***********************************/
2383      else if(k_its("TruncateTanksTo1GBAndContinue") )
2384      {
2385        bTruncateTanksTo1GBAndContinue=TRUE;
2386      }
2387
2388      /* Command is not recognized
2389       ****************************/
2390      else
2391      {
2392        logit ("e", "wave_serverV: <%s> unknown command in <%s>.\n",
2393                comm, configfile );
2394        continue;
2395      }
2396
2397
2398
2399      /* See if there were any errors processing the command
2400       *****************************************************/
2401      if( k_err() ) {
2402        logit ("e",
2403                 "wave_serverV: Bad <%s> command in <%s>; exiting!\n",
2404                 comm, configfile );
2405        exit( -1 );
2406      }
2407    }
2408    nfiles = k_close();
2409  }
2410  /* After all files are closed, check init flags for missed commands
2411   ******************************************************************/
2412  nmiss = 0;
2413  for ( i=0; i < ncommand; i++ )  if( !init[i] ) nmiss++;
2414  if ( nmiss ) {
2415    logit ("e", "wave_serverV: ERROR, no " );
2416    if ( !init[0] )  logit ("e", "<RingName> "     );
2417    if ( !init[1] )  logit ("e", "<LogFile> "      );
2418    if ( !init[2] )  logit ("e", "<MyModuleId> "   );
2419    if ( !init[3] )  logit ("e", "<ServerPort> "   );
2420    if ( !init[4] )  logit ("e", "<ServerIPAdr> "  );
2421    if ( !init[5] )  logit ("e", "<HeartBeatInt> " );
2422    if ( !init[6] )  logit ("e", "<GapThresh> "    );
2423    if ( !init[7] )  logit ("e", "<IndexUpdate> "  );
2424    if ( !init[8] )  logit ("e", "<TankStructUpdate> "   );
2425    if ( !init[9] )  logit ("e", "<InputQueueLen> "      );
2426    if ( !init[10])  logit ("e", "<SocketTimeoutLength> ");
2427    if ( !init[11])  logit ("e", "<TankStructFile> "     );
2428    if ( !init[12])  logit ("e", "<Tank> "               );
2429    logit ("e", "command(s) in <%s>; exiting!\n", configfile );
2430    exit( -1 );
2431  }
2432
2433  /* Real quick: define duties of config relative to indexing:
2434     read in the config parameters.
2435     When we find the number of tanks, malloc() a list for the tanks, and
2436     then as the tanks are read in, add each tank to the list.  Call
2437     InitTankList() to create a TANKFilePtr for the list, to copy the location
2438     of the tank array, to indicate redundant index or TANKstruct files, to set
2439     the number of tanks.
2440
2441     We will set the file ptrs for each tank index when we config each tank
2442  */
2443
2444  if(InitTankList(&pConfigTankList,Tanks,
2445                  RedundantTankStructFiles,
2446                  RedundantIndexFiles,nTanks,
2447                  TankStructFile,TankStructFile2) )
2448  {
2449    /* This is not good!!, we could not init the list.
2450       Quit and go home */
2451    logit ("e", "ERROR:  InitTankList failed!!!\nFailure Terminal, Exiting\n");
2452    exit(-1);
2453  }
2454
2455  /* At this point, we have a working configured tank list, generated from
2456     the config file, with the exception that no files are open. DK
2457  */
2458
2459  return;
2460}
2461/*---------------------------------------------------------------------------*/
2462/***************************************************************************
2463 *  wave_serverV_lookup( )  Look up important info from earthworm.h tables  *
2464 ***************************************************************************/
2465void wave_serverV_lookup( void )
2466{
2467  if ( GetLocalInst( &InstId ) != 0 ) {
2468    fprintf( stderr,
2469             "wave_serverV: error getting local installation id; exiting!\n" );
2470    exit( -1 );
2471  }
2472  if( GetType( "TYPE_HEARTBEAT", &TypeHeartBeat ) != 0 ) {
2473    fprintf( stderr,
2474             "wave_serverV: Invalid message type <TYPE_HEARTBEAT>; exiting!\n" );
2475    exit( -1 );
2476  }
2477  if( GetType( "TYPE_ERROR", &TypeError ) != 0 ) {
2478    fprintf( stderr,
2479             "wave_serverV: Invalid message type <TYPE_ERROR>; exiting!\n" );
2480    exit( -1 );
2481  }
2482  if( GetType( "TYPE_TRACEBUF2", &TypeWaveform ) != 0 ) {
2483    fprintf( stderr,
2484             "wave_serverV: Invalid message type <TYPE_TRACEBUF2>; exiting!\n" );
2485    exit( -1 );
2486  }
2487  return;
2488}
2489/*---------------------------------------------------------------------------*/
2490
2491/******************************************************************************
2492 * wave_serverV_status() builds a heartbeat or error message & puts it into    *
2493 *                     shared memory.  Writes errors to log file & screen.    *
2494 ******************************************************************************/
2495void wave_serverV_status( unsigned char type, short ierr, char *note )
2496{
2497  MSG_LOGO    logo;
2498  char        msg[256];
2499  long        size;
2500  time_t      t;
2501
2502  /* Build the message
2503   *******************/
2504  logo.instid = InstId;
2505  logo.mod    = MyModId;
2506  logo.type   = type;
2507
2508  time( &t );
2509
2510  if( type == TypeHeartBeat )
2511  {
2512    sprintf( msg, "%ld %ld\n\0", (long) t, (long) myPid);
2513  }
2514  else if( type == TypeError )
2515  {
2516    sprintf( msg, "%ld %hd %s\n\0", (long) t, ierr, note);
2517    logit( "et", "wave_serverV: %s\n", note );
2518  }
2519
2520  size = strlen( msg );   /* don't include the null byte in the message */
2521
2522  /* Write the message to shared memory
2523   ************************************/
2524  if( tport_putmsg( &Region, &logo, size, msg ) != PUT_OK )
2525  {
2526    if( type == TypeHeartBeat ) {
2527      logit("et","wave_serverV:  Error sending heartbeat.\n" );
2528    }
2529    else if( type == TypeError ) {
2530      logit("et","wave_serverV:  Error sending error:%d.\n", ierr );
2531    }
2532  }
2533
2534  return;
2535}
2536
2537
2538/*-----------------------------------------------------------------------*/
2539/***********************************************************************
2540 * UpdateIndex() Updates the memmory index list with a new sample of   *
2541 *  trace data.                                                        *
2542 ***********************************************************************/
2543int UpdateIndex(TANK * pTSPtr, TRACE2_HEADER * pCurrentRecord,
2544                unsigned int CurrentOffset, int CheckOverwrite)
2545{
2546  /*
2547     Purpose:
2548     Updates the memmory index list with a new sample of trace data.
2549
2550     Arguments:
2551         pTSPtr: pointer to the TANK structure we are dealing with
2552         pCurrentRecord: pointer to the trace record that was just
2553            copied into the tank
2554         CurrentOffset: the current offset to the tank file, updated after
2555            the current record was inserted.
2556         CheckOverwrite: flag to update the index for the old record that
2557            was overwritten by the current record. This check is needed
2558            when wave_serverV is running; unneccesary when UpdateIndex is
2559            called during wave_serverV startup.
2560
2561     Return Values:
2562     0: Successful update.
2563     -1: Error occured during update.
2564
2565     Functions Called:
2566     (not analyzed)
2567
2568     Additional Remarks:
2569     Apologia: we resort to making a special case of the first write
2570     (and the first pass)to keep the initialization logic from becoming
2571     too baroque and hard to debug. We gotta ship this
2572  */
2573
2574  char scrMsg[MAX_TRACEBUF_SIZ];
2575  static char * MyFuncName = "UpdateIndex()";
2576  int RetVal;
2577  DATA_CHUNK *dc;
2578
2579
2580  if(Debug > 2)
2581  {
2582    logit("t","Entering %s\n",MyFuncName);
2583    logit("et", "S %s,C %s,N %s,L %s,StartTime %f,EndTime %f,CurrentOffset %u\n",
2584          pCurrentRecord->sta,pCurrentRecord->chan,pCurrentRecord->net,
2585          pCurrentRecord->loc,
2586          pCurrentRecord->starttime,pCurrentRecord->endtime,CurrentOffset
2587          );
2588  }
2589
2590  /* Special case: first write ever
2591   ********************************/
2592  if ( pTSPtr->firstWrite == 1 )
2593  {
2594    /* CHECK:  This assumes that initialization of the index sets
2595       IndexYoungest, and probably IndexOldest, to valid places, with
2596       memory allocated at those places  DK
2597    */
2598    if(Debug > 2)
2599      logit("et","UpdateIndex: First Write\n");
2600
2601    dc=IndexYoungest(pTSPtr);
2602    dc->tStart = pCurrentRecord->starttime;
2603    dc->tEnd   = pCurrentRecord->endtime;
2604
2605    /* First write is always at beginning */
2606    IndexYoungest(pTSPtr)->offset = 0;
2607
2608    /* Move the data type, pin number, and sampling rate */
2609    strncpy(pTSPtr->datatype,  pCurrentRecord->datatype,  3);
2610    pTSPtr->samprate   = pCurrentRecord->samprate;
2611    pTSPtr->pin   = pCurrentRecord->pinno;
2612
2613    pTSPtr->firstWrite =0; /* no longer the first write */
2614
2615    /* All done. Release this tank */
2616  } /* End if first write */
2617  else  /* Not first write */
2618  {
2619
2620    /* Update index's newest time
2621     *****************************/
2622    /* philosophy: we just wrote a bunch of data points to the tank. If
2623       the oldest of those is  more than one and a half sample intervals
2624       (looks like 1.0 now) younger than the youngest we had before this
2625       write, we declare a gap has occurred. We create a new chunk in the
2626       index. We assume the insertion point has not been updated yet
2627    */
2628
2629    if(Debug > 2)
2630      logit("et","UpdateIndex: Normal Write\n");
2631
2632    /* Is there a Gap
2633     ********************************/
2634    if( IndexYoungest(pTSPtr)->tEnd +
2635        GapThresh*(1.0/(pCurrentRecord->samprate)) <
2636        pCurrentRecord->starttime )
2637    {
2638      /* Add a new index entry
2639       ********************************/
2640      if(Debug > 2)
2641        logit("et","UpdateIndex:  adding index, start,finish: %u,%u\n",
2642              pTSPtr->indxStart,pTSPtr->indxFinish);
2643
2644      if(( RetVal = IndexAdd( pTSPtr, pCurrentRecord->starttime,
2645                              pCurrentRecord->endtime,
2646                              CurrentOffset            )) != 0)
2647      {
2648        /* Davidk 9/8/98 */
2649        /*  We have a problem, either overwrote an index, or
2650            something really serious happened. */
2651        if(RetVal > 0)
2652        {
2653          /* we're out of index entires*/
2654          if(pTSPtr->lappedIndexEntries == 0)
2655          {
2656            sprintf(Text,"Ran out of free indexes.  Overwriting "
2657                    "oldest indexes in tank: %s,%s,%s,%s\n",
2658                    pTSPtr->sta,pTSPtr->chan,pTSPtr->net,pTSPtr->loc);
2659            wave_serverV_status(TypeError, ERR_OVERWRITE_INDEX, Text);
2660          }
2661          pTSPtr->lappedIndexEntries++;
2662          /* goto abortUpdateIndex; */
2663          /* Overwritten index entries is a problem handled
2664             internally by this function.  It provides no
2665             notice to its caller that something has gone
2666             wrong.  This is to prevent adverse logging.
2667             When an index is overwritten, everything still
2668             completes successfuly with the transaction, the
2669             problem is that old data has now been overwritten.
2670             Thus the update did not fail, it just didn't go
2671             as well as planned.  DavidK 9/8/98
2672          */
2673        }
2674        else
2675        {
2676          /* This is bad.  Something internal went kaboom.
2677             Something is corrupted in the tank.  Scream
2678             for help!!!!! and then abort.
2679          **********************************************/
2680          sprintf(Text,"Corruption problems in memory tank structure"
2681                  "for tank: %s,%s,%s,%s\n",
2682                  pTSPtr->sta,pTSPtr->chan,pTSPtr->net,pTSPtr->loc);
2683          wave_serverV_status(TypeError, ERR_TANK_CORRUPTION, Text);
2684          goto abortUpdateIndex;
2685        }
2686      }
2687
2688      else /* IndexAdd() completed successfully */
2689      {
2690        if(pTSPtr->lappedIndexEntries != 0)
2691        {
2692          sprintf(Text,"wave_serverV:  Found Free Index.  No longer overwriting"
2693                  " free indexes.  Overwrote %u "
2694                  "indexes in tank: %s,%s,%s,%s\n",
2695                  pTSPtr->lappedIndexEntries,
2696                  pTSPtr->sta,pTSPtr->chan,pTSPtr->net,pTSPtr->loc);
2697          wave_serverV_status(TypeError, ERR_RECOVER_OVERWRITE_INDEX, Text);
2698          pTSPtr->lappedIndexEntries=0;
2699        }
2700      }
2701
2702      if(Debug > 2)
2703        logit("et","UpdateIndex:  after adding index, start,finish: %u,%u\n",
2704              pTSPtr->indxStart,pTSPtr->indxFinish);
2705
2706    }
2707    else  /* No Gap */
2708      /* if a new chunk was not created by this write, we update the index by re-writing the
2709         end time of the youngest chunk */
2710    {
2711      IndexYoungest(pTSPtr)->tEnd = pCurrentRecord->endtime;
2712      if(Debug > 2)
2713        logit("et","UpdateIndex:  extending index, new endtime %f,%f\n",
2714              pCurrentRecord->endtime,IndexYoungest(pTSPtr)->tEnd);
2715
2716    }
2717
2718    if( pTSPtr->firstPass != 1 && CheckOverwrite)
2719    {
2720      /* Update index's oldest chunk.
2721         Concern is whether we just
2722         wiped out an old chunk
2723      ******************************/
2724
2725      /* for now, brute force: read it out of the file. If that's too
2726         much disc action, we could compute it from the index - somehow,
2727         some day
2728      */
2729
2730      if(Debug > 2)
2731        logit("et","Update Index: Not first pass, doing other junk\n");
2732
2733
2734      /* seek to the current position.  This is neccessary between reads and writes */
2735      if( fseek( pTSPtr->tfp, pTSPtr->inPtOffset, SEEK_SET ) != 0 )
2736      {
2737        logit( "e", "wave_serverV: error on fseek on tank %s; exiting!\n",pTSPtr->tankName );
2738        goto abortUpdateIndex;
2739      }
2740      if( fread( (void *)scrMsg, pTSPtr->recSize, 1, pTSPtr->tfp ) != 1 )
2741      {
2742        logit( "e", "wave_serverV: error on fread tank %s; exiting!\n",pTSPtr->tankName );
2743        goto abortUpdateIndex;
2744      }
2745      /* We peeked ahead into the tank, and picked up the oldest message into scrMsg */
2746
2747      /* If we just wiped out a chunk, get rid of its index entry
2748       ***********************************************************/
2749      if( ((TRACE2_HEADER*)scrMsg)->starttime > IndexOldest(pTSPtr)->tEnd )
2750      {
2751        IndexDel(pTSPtr,IndexOldest(pTSPtr));
2752
2753        if (Debug > 2)
2754          logit("","After deletion:\n");
2755      }
2756      else
2757      {
2758        IndexOldest(pTSPtr)->tStart = ((TRACE2_HEADER*)scrMsg)->starttime;
2759        IndexOldest(pTSPtr)->offset = pTSPtr->inPtOffset;
2760      }
2761    } /* End !FirstPass */
2762  }  /* End else !FirstWrite */
2763
2764  if(Debug > 2)
2765    logit("t","Exiting %s\n",MyFuncName);
2766
2767  return(0);
2768
2769 abortUpdateIndex:
2770  {
2771    if(Debug > 2)
2772      logit("t","Exiting %s\n",MyFuncName);
2773  }
2774  return(-1);
2775}  /* End UpdateIndex */
2776
2777
2778/*****************************************************************************/
2779/* int ReportServerThreadStatus() */
2780/*****************************************************************************/
2781int ReportServerThreadStatus(void)
2782{
2783  int i;
2784  char StatusString[8];
2785
2786  logit("","  Clnts Accp    Clnts Prcsd   Reqs Prcsd    Errors    Status  \n");
2787  logit("","==============================================================\n");
2788  for(i=0;i<MaxServerThreads;i++)
2789  {
2790    switch (ServerThreadInfo[i].Status)
2791    {
2792    case SERVER_THREAD_IDLE:      strcpy(StatusString," IDLE "); break;
2793    case SERVER_THREAD_COMPLETED: strcpy(StatusString,"CMPLTD"); break;
2794    case SERVER_THREAD_ERROR:     strcpy(StatusString," ERROR"); break;
2795    case SERVER_THREAD_WAITING:   strcpy(StatusString,"WTG4CL"); break;
2796
2797    default :
2798      {
2799        if(!memcmp(&(ServerThreadInfo[i]),&(ServerThreadInfoPrevious[i]),
2800                   sizeof(ServerThreadInfoStruct))
2801           )
2802        {
2803          /* We are busy and nothing has changed since the last check.
2804             We are not waiting for the client.
2805             I think the thread is hung!!!!
2806          */
2807          strcpy(StatusString," HUNG ");
2808          logit("et","ServerThread %d appears to be hung in state %d.\n",
2809                i,ServerThreadInfo[i].Status);
2810        }
2811        else
2812        {
2813          strcpy(StatusString," BUSY ");
2814        }
2815      }
2816    } /* End of Switch() */
2817
2818    logit("",
2819          "%2d  %6d         %6d       %6d      %6d    %s\n",i,
2820          ServerThreadInfo[i].ClientsAccepted,
2821          ServerThreadInfo[i].ClientsProcessed,
2822          ServerThreadInfo[i].RequestsProcessed,
2823          ServerThreadInfo[i].Errors,
2824          StatusString);
2825  }  /* end for (each server thread) loop */
2826
2827  /* Copy the current ServerThreadInfo to ServerThreadInfoPrevious. */
2828  memcpy(ServerThreadInfoPrevious,ServerThreadInfo,
2829         MaxServerThreads * sizeof(ServerThreadInfoStruct));
2830
2831  return(0);
2832}  /* ReportServerThreadStatus */
2833
2834TANK * FindSCNL(TANK * Tanks, int nTanks, char * sta, char * chan, char * net,
2835                char * loc)
2836{
2837  TANK TempTank;
2838
2839  /* Use CRTLIB func bsearch() to
2840     find SCN.  Davidk 10/5/98
2841  **********************************/
2842
2843  strcpy(TempTank.sta,sta);
2844  strcpy(TempTank.chan,chan);
2845  strcpy(TempTank.net,net);
2846  strcpy(TempTank.loc,loc);
2847
2848  return((TANK *) bsearch(&TempTank, Tanks, nTanks,
2849                          sizeof(TANK), CompareTankSCNLs));
2850}  /* End FindSCNL() */
2851
2852/**************************************************************
2853 *  signal_hdlr: sets the terminate flag for orderly shutdown *
2854 **************************************************************/
2855void signal_hdlr(int signum)
2856{
2857  terminate = 1;                 /* set flag to terminate program */
2858}
2859
2860
2861int wave_serverV_c_init()
2862{
2863  logit("t","wave_serverV.c:Version %s\n", WSV_VERSION);
2864  return(0);
2865}
2866
2867int IssueIOStatusError(char * szFunction, int iOffset,
2868                       int iRecordSize, char * szTankName)
2869{
2870  sprintf(Text, "%s failed for tank [%s], with offset[%d] "
2871          "and record size[%d]! errno[%d]\nError: %s\n",
2872          szFunction, szTankName, iOffset,
2873          iRecordSize, errno, strerror(errno));
2874
2875  wave_serverV_status( TypeError, ERR_FILE_IO, Text );
2876  return(0);
2877}
2878
2879int TraceBufIsValid(TANK * pTSPtr, TRACE2_HEADER * pCurrentRecord)
2880{
2881    /* check that time goes forward within the tracebuf */
2882    if(pCurrentRecord->starttime > pCurrentRecord->endtime) {
2883        if (Debug > 1)
2884            logit("e", "TraceBufIsValid: TB start time not before end time\n");
2885        return(FALSE);
2886    }
2887
2888    /* check that there are samples in the tracebuf */
2889    if(pCurrentRecord->nsamp <= 0) {
2890        if (Debug > 1)
2891            logit("e", "TraceBufIsValid: no samples\n");
2892        return(FALSE);
2893    }
2894    /* added check for datatype! */
2895    if (strcmp(pCurrentRecord->datatype, "i4") == 0 ||
2896        strcmp(pCurrentRecord->datatype, "i2") == 0 ||
2897        strcmp(pCurrentRecord->datatype, "s2") == 0 ||
2898        strcmp(pCurrentRecord->datatype, "s4") == 0 ) {
2899        /* do nothing here for now, its good */
2900    } else {
2901        return(FALSE);
2902    }
2903
2904    /* check samprate ???? */
2905
2906    /* if the tracebuf internals look OK, and this is the first tracebuf,
2907       then send it on it's merry way. */
2908    if ( pTSPtr->firstWrite == 1 )
2909        return(TRUE);
2910
2911        /* WARNING, any checks after this point might not be hit for first write! */
2912
2913    /* check that time goes forward between the previous tracebuf and this
2914       one */
2915    if(pCurrentRecord->starttime < IndexYoungest(pTSPtr)->tEnd) {
2916        if (Debug > 1)
2917            logit("e", "TraceBufIsValid: packet time not advancing\n");
2918        return(FALSE);
2919    }
2920
2921
2922    /* it passed all the tests, so pass it on. */
2923    return(TRUE);
2924
2925}  /* end TraceBufIsValid() */
Note: See TracBrowser for help on using the repository browser.