Changeset 3164


Ignore:
Timestamp:
11/30/07 13:40:16 (12 years ago)
Author:
paulf
Message:

sqlite stuff added

Location:
trunk/src/archiving/wave_serverV
Files:
2 added
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/archiving/wave_serverV/makefile.nt

    r2513 r3164  
    88#    Revision history: 
    99#     $Log$ 
     10#     Revision 1.4  2007/11/30 18:40:16  paulf 
     11#     sqlite stuff added 
     12# 
    1013#     Revision 1.3  2006/11/21 23:03:09  stefan 
    1114#     GLOBALFLAGS for c compiler from ew_nt.cmd 
     
    2932APP = wave_serverV 
    3033O = $(APP).obj index.obj server_thread.obj serve_trace.obj index_util.obj \ 
    31     compare.obj 
     34    compare.obj tb_packet_db.obj 
    3235 
    3336B = $(EW_HOME)\$(EW_VERSION)\bin 
     
    4144   $L\getutil.obj $L\kom.obj $L\logit_mt.obj $L\sema_ew.obj $L\mem_circ_queue.obj \ 
    4245   $L\sleep_ew.obj $L\socket_ew.obj $L\socket_ew_common.obj $L\swap.obj \ 
     46   $L\sqlite3.obj \ 
    4347   $L\threads_ew.obj $L\time_ew.obj $L\transport.obj  -out:$B\$(APP).exe 
    4448 
  • trunk/src/archiving/wave_serverV/serve_trace.c

    r2883 r3164  
    88 *    Revision history: 
    99 *     $Log$ 
     10 *     Revision 1.15  2007/11/30 18:40:16  paulf 
     11 *     sqlite stuff added 
     12 * 
    1013 *     Revision 1.14  2007/03/28 18:02:34  paulf 
    1114 *     fixed flags for MACOSX and LINUX 
     
    8992#include        "wave_serverV.h" 
    9093#include        "server_thread.h" 
    91  
     94#include                "tb_packet_db.h"                /* ronb041007 - used to read back out-of-sync packets*/ 
     95    
    9296 
    9397#define FLAG_L    0 
     
    129133 
    130134 
     135extern int bUsePacketSyncDb;    /* config flag to use packet db to sync data. ronb011007*/ 
     136extern int bTankOverSyncDbData; /* config buffer precedence. */ 
     137extern TBDB_STORE* pPacketDb;   /* ref to OutOfSync Packet Db. ronb041007*/ 
    131138 
    132139int _writeTraceDataRaw( int, TANK*, CLIENT_MSG* ); 
     
    157164                       double tE, int binSize, int flagNewLine); 
    158165static int SendStrData( int soc, char* str ); 
    159  
     166static int MergeAsyncPackets( 
     167     TANK* t, 
     168        char** data, 
     169        long* datasize, 
     170        char* AsyncData, 
     171        long AsyncDataSize, 
     172        long AsyncPacketCount 
     173); 
    160174 
    161175 
     
    245259  int mutexLocked = 0; 
    246260  long int saveOff = 0;      /* saving offset of file */ 
    247  
     261  long AsyncDataSize; 
     262  long AsyncPacketCount= 0; 
     263  long AsyncPageSize; 
     264  char* AsyncData; 
     265  
    248266  if ( _fDBG == -1 ) 
    249267    ewInitDebugFlag( _pMODULE, &_fDBG ); 
     
    274292    goto abort; 
    275293     
     294   /* Check to see if we have async packets in requested range.*/ 
     295   if(bUsePacketSyncDb) { 
     296        tbdb_get_all_packets(pPacketDb,t->sta, t->net,  
     297                t->chan, t->loc, tS, tE, (void*) &AsyncData, &AsyncDataSize,  
     298                &AsyncPageSize, &AsyncPacketCount); 
     299   } 
     300  
    276301  if ( _fDBG & _fSUB ) 
    277302  { 
     
    315340       ( acttS + DBL_FLOAT_EPSILON <= acttE) ) 
    316341  { 
    317     if ( CopyReqData( t, oS, oE, (char**) &data, &datasize ) == R_FAIL ) 
    318       goto abort; 
     342     if ( (CopyReqData( t, oS, oE, (char**) &data, &datasize ) == R_FAIL) && !AsyncPacketCount ) 
     343       goto abort; 
    319344  } 
    320345     
     346  /* resync packets if we have any*/ 
     347  if(AsyncPacketCount && AsyncData) { 
     348          MergeAsyncPackets(t, &data, &datasize, AsyncData, AsyncDataSize, AsyncPacketCount); 
     349          free(AsyncData); 
     350  } 
    321351  ReleaseSpecificMutex(&(t->mutex)); 
    322352  mutexLocked = 0; 
     
    385415  int mutexLocked = 0; 
    386416  long int saveOff = 0;      /* saving offset of file */ 
     417  long AsyncDataSize; 
     418  long AsyncPacketCount= 0; 
     419  long AsyncPageSize; 
     420  char* AsyncData; 
    387421 
    388422  if ( _fDBG == -1 ) 
     
    397431  if ( tS + DBL_FLOAT_EPSILON >= tE ) goto skip; 
    398432 
    399   if ( _fDBG & _fSUB ) 
     433   if ( _fDBG & _fSUB ) 
    400434    logit( "","\n================================\n" ); 
    401435 
     
    415449    goto abort; 
    416450     
     451  /* Check to see if we have async packets in requested range.*/ 
     452  if(bUsePacketSyncDb) { 
     453        tbdb_get_all_packets(pPacketDb,t->sta, t->net,  
     454                t->chan, t->loc, tS, tE, (void*) &AsyncData, &AsyncDataSize,  
     455                &AsyncPageSize, &AsyncPacketCount); 
     456  } 
     457 
    417458  if ( _fDBG & _fSUB ) 
    418459  { 
     
    447488 
    448489  if ( ( acttS < tE && tS < acttE ) && 
    449        ( acttS + DBL_FLOAT_EPSILON > acttE ) ) 
     490       ( acttS + DBL_FLOAT_EPSILON > acttE ) && !AsyncPacketCount) 
    450491  { 
    451492    logit( "","_writeTraceDataAscii(): warning! acttS[%f] acttE[%f], no data available for request in tank %s\n", 
     
    456497       ( acttS + DBL_FLOAT_EPSILON <= acttE) ) 
    457498  { 
    458     if ( CopyReqData( t, oS, oE, (char**) &data, &datasize ) == R_FAIL ) 
    459       goto abort; 
     499    if ( (CopyReqData( t, oS, oE, (char**) &data, &datasize ) == R_FAIL) && !AsyncPacketCount) 
     500       goto abort; 
    460501  } 
    461502     
     503 
     504  /* resync packets if we have any*/ 
     505  if(AsyncPacketCount && AsyncData) { 
     506          MergeAsyncPackets(t, &data, &datasize, AsyncData, AsyncDataSize, AsyncPacketCount); 
     507          free(AsyncData); 
     508  } 
     509 
    462510  ReleaseSpecificMutex(&(t->mutex)); 
    463511  mutexLocked = 0; 
     
    23762424  return(0); 
    23772425} 
     2426/****************************************************** 
     2427  MergeAsyncPackets - used to resync packets written to  
     2428  db. 
     2429  
     2430*******************************************************/ 
     2431int MergeAsyncPackets( 
     2432     TANK* t, 
     2433        char** data, 
     2434        long* datasize, 
     2435        char* AsyncData, 
     2436        long AsyncDataSize, 
     2437        long AsyncPacketCount 
     2438 ) { 
     2439 
     2440        char* pData; 
     2441        char* pDataEnd; 
     2442        char* pLast; 
     2443        char* pAsync; 
     2444        char* pAsyncEnd; 
     2445        char* pMerge; 
     2446        char* p; 
     2447        long MergeSize; 
     2448        TRACE2_HEADER* pTb; 
     2449        int rc = FALSE; 
     2450 
     2451   /* merge async data if any. It would be better to include this 
     2452   someplace lower for performance reasons but since this should be 
     2453   a rare event so we'll optimize the merge here so all the other code 
     2454   that examines the tank data on the way out include the async packets as well.*/ 
     2455   if(AsyncPacketCount) { 
     2456  
     2457          if(*datasize && *data) { 
     2458  
     2459  
     2460                  logit( "","MergeAsycPackets(): merging async packet data.\n" ); 
     2461  
     2462                  /* reallocate data for combined buffers */ 
     2463                  MergeSize = *datasize + (AsyncPacketCount * t->recSize); 
     2464                  if(p = pMerge = (char*) malloc(MergeSize)) { 
     2465  
     2466                          /* init buf*/ 
     2467                          memset(pMerge, 0, MergeSize); 
     2468 
     2469                          /* walk through tank data*/ 
     2470                          pLast = pData = *data; 
     2471                          pDataEnd = pData + *datasize; 
     2472                          pAsync = AsyncData; 
     2473                          pAsyncEnd = pAsync + AsyncDataSize; 
     2474                          while(pData < pDataEnd && pAsync < pAsyncEnd) { 
     2475                                  /**/ 
     2476                                  pTb = (TRACE2_HEADER*) (pAsync + sizeof(long)); 
     2477                                  if(pTb->starttime < ((TRACE2_HEADER*)pData)->starttime) { 
     2478                                          /*copy any data up to this tank record */ 
     2479                                          if(pData - pLast) { 
     2480                                                  memcpy(p, pLast, pData - pLast); 
     2481                                                  p += pData - pLast; 
     2482                                                  pLast = pData; 
     2483                                          } 
     2484                                          /* copy async data */ 
     2485                                          memcpy(p, pTb, *((long*)pAsync)); 
     2486                                          p += t->recSize; 
     2487                                          pAsync += *((long*)pAsync) + sizeof(long); 
     2488                                  } 
     2489                                  pData += t->recSize; 
     2490                          } 
     2491                          /* write any leftover. Should only be tankdata because the query shouldn't 
     2492                          return any records beyond tE drom db.*/ 
     2493                          if(pDataEnd - pLast) { 
     2494  
     2495                                  memcpy(p, pLast, pDataEnd - pLast);  
     2496                          } 
     2497                          /* free old data*/ 
     2498                          FreeReqData(*data); 
     2499                          /* updata data pointer and size and fall through to old code. */ 
     2500                          *data = pMerge; 
     2501                          *datasize = MergeSize; 
     2502                          rc = TRUE; 
     2503                  } else { 
     2504                          logit( "e","MergeAsyncPackets(): Error reallocating packet buffer for async packet data.\n" ); 
     2505                  } 
     2506  
     2507          } else { 
     2508              
     2509                 /* just async data. Normalize buffer (make sure packets are recsize. 
     2510                 this too should be a rare occasion also but seemed possible. */ 
     2511             MergeSize = AsyncPacketCount * t->recSize; 
     2512                 if(pMerge = (char*) malloc(MergeSize)) { 
     2513  
     2514                         memset(pMerge, 0, MergeSize); 
     2515                         pAsync = AsyncData; 
     2516                         pAsyncEnd = pAsync + AsyncDataSize; 
     2517                         p = pMerge; 
     2518                         while(pAsync < pAsyncEnd) { 
     2519                                memcpy(p, pAsync + sizeof(long), *((long*)pAsync)); 
     2520                                pAsync += *((long*)pAsync) + sizeof(long); 
     2521                                p += t->recSize; 
     2522                         } 
     2523                         *data = pMerge; 
     2524                         *datasize = MergeSize; 
     2525                         rc = TRUE; 
     2526                 } else { 
     2527                          logit( "e","MergeAsyncPackets(): Error allocating packet normalization buffer for async packet data.\n" ); 
     2528                 } 
     2529  
     2530  
     2531          } 
     2532   } 
     2533   return rc;  
     2534 } 
  • trunk/src/archiving/wave_serverV/wave_serverV.c

    r3010 r3164  
    88 *    Revision history: 
    99 *     $Log$ 
     10 *     Revision 1.39  2007/11/30 18:40:16  paulf 
     11 *     sqlite stuff added 
     12 * 
    1013 *     Revision 1.38  2007/05/31 16:11:44  paulf 
    1114 *     minor change to add in version number as #define, and update ver so it is clear from the logs which is running 
     
    471474#include <mem_circ_queue.h> 
    472475#include "wave_serverV.h" 
     476#include "tb_packet_db.h" 
    473477 
    474478/* Function Prototypes 
     
    532536                                             is to complain and exit if a >1GB tank is given.DK 2005/03/17  */ 
    533537 
     538int                                     bUsePacketSyncDb = 0;    /* Set to 1 in config file if we should use tb_packet_db module to  
     539                                                                                     store/retrieve out of sync packets ronb040307 */ 
     540 
     541char                            sPacketSyncDbFile[TB_MAX_DB_NAME];      /* name of db file holding sync data ronb041307 */               
     542 
     543int                                     bTankOverSyncDbData = 0;                /* how to resync data or which has priority. ronb041607*/ 
     544 
     545int                             bPurgePacketSyncDb = 1;        /* On initialization of the db should we purge all 
     546                                                                                                   packets(1) or leave alone (0). Note that the db will be 
     547                                                                                                   purged of records older than the tank file every nPurgeTankFreq 
     548                                                                                                   Heartbeats. 
     549                                                                                                        ronb052007 */ 
    534550 
    535551/*  I am not sure if SocketTimeoutLength should be volatile.  It should only 
     
    598614/**************************/ 
    599615 
     616TBDB_STORE      *pPacketDb =NULL;       /* ponter to TBDB_STORE for temp packet storage*/ 
     617 
     618unsigned int nPurgeDbFreq = 20; /* purge db of records older than oldest tank trace_buf 
     619                                                                every nPuregeDbFreq Heartbeat.*/ 
    600620 
    601621static char  Text[250];        /* string for log/error messages          */ 
     
    638658 
    639659 
    640 #define WSV_VERSION "5.1.37 - 2007-05-31"  
     660#define WSV_VERSION "5.1.38 - 2007-11-28 sqlite" 
    641661 
    642662main( int argc, char **argv ) 
     
    663683  char szFileFunction[15]; 
    664684  int  bIssueIOStatusError = FALSE; 
    665  
     685  unsigned int nPurgeDbCount = 0; 
     686  double PurgeTime = 0.0; 
     687  double LastPurgeTime = 0.0; 
    666688 
    667689  TRACE2_HEADER *trh;   /* paulf addition 1/07/1999 mem alignment bug fix*/ 
    668690 
    669691  TracePacket   Tpkt;   /* paulf addition 5/24/2007 another mem alignment fix! 
    670                                         ironic no? */ 
     692                                        ironic no? */ 
     693 
    671694  /* Catch broken socket signals 
    672695   ******************************/ 
     
    675698#endif 
    676699 
     700  /* Not enough arguments 
     701   ********************/ 
     702  if ( argc != 2 ) 
     703  { 
     704    fprintf( stderr, "Usage: wave_serverV <configfile>\n" ); 
     705    fprintf( stderr, "Version: %s\n", WSV_VERSION ); 
     706    exit( -1 ); 
     707  } 
     708 
    677709  /* Initialize name of log-file & open it 
    678710     force disk logging until after config 
     
    681713  logit_init( argv[1], 0, 256, 1 ); 
    682714 
    683   /* Not enough arguments 
    684    ********************/ 
    685   if ( argc != 2 ) 
    686   { 
    687     fprintf( stderr, "Usage: wave_serverV <configfile>\n" ); 
    688     exit( -1 ); 
    689   } 
    690715 
    691716  /* debug: set up periodic dump logic 
     
    732757  } 
    733758 
    734    /* Attach to transport ring (Must already exist) 
     759  /* initialize packet db ronb 041207*/ 
     760  if(bUsePacketSyncDb) { 
     761          if(tbdb_open(&pPacketDb, sPacketSyncDbFile)) { 
     762                 /* delete packets based on ldSyncRetentionTime config setting */ 
     763                  if(bPurgePacketSyncDb) { 
     764                          tbdb_purge_all(pPacketDb); 
     765                  } 
     766          }     else { 
     767                  /*error opening packet db*/ 
     768                  logit("e","wave_serverV: failed to open packet db.\n"); 
     769          } 
     770   } 
     771  
     772 /* Attach to transport ring (Must already exist) 
    735773   *********************************************/ 
    736774  tport_attach( &Region, RingKey ); 
     
    10401078        timeLastBeat = timeNow; 
    10411079        wave_serverV_status( TypeHeartBeat, 0, "" ); 
     1080 
     1081                /* check for db purge if using db at all*/ 
     1082                if(bUsePacketSyncDb && !(++nPurgeDbCount % nPurgeDbFreq)) { 
     1083                        /* get oldest tank record*/ 
     1084                        PurgeTime = IndexOldest(&Tanks[0])->tStart; 
     1085                        for(i=1; i < nTanks; i++) { 
     1086                                if(IndexOldest(&Tanks[i])->tStart < PurgeTime) { 
     1087                                        PurgeTime = IndexOldest(&Tanks[i])->tStart; 
     1088                                } 
     1089                        } 
     1090                        /* if needs purging then purge*/ 
     1091                        if(LastPurgeTime != PurgeTime) { 
     1092                                tbdb_purge_prior_packets(pPacketDb, PurgeTime); 
     1093                                LastPurgeTime = PurgeTime; 
     1094                        } 
     1095                } 
    10421096      } 
    10431097 
     
    10611115    trh = (TRACE2_HEADER *) &Tpkt; 
    10621116 
    1063     /*  from here on in, only use Tpkt to refer to the msg,  
    1064         or trh to the header, this is mem aligned  
    1065         *************************************************/ 
    1066     rc = WaveMsg2MakeLocal( trh );  
     1117    /*  from here on in, only use Tpkt to refer to the msg, 
     1118        or trh to the header, this is mem aligned 
     1119        *************************************************/ 
     1120    rc = WaveMsg2MakeLocal( trh ); 
    10671121 
    10681122    if(rc < 0) 
     
    10711125        trh->sta, trh->chan, trh->net, trh->loc); 
    10721126      logit("e", "\t%.2f - %.2f nsamp=%d samprate=%6.2f endtime=%.2f. datatype=[%s]\n", 
    1073         trh->starttime, trh->endtime, trh->nsamp, trh->samprate, 
     1127        trh->starttime, trh->endtime, trh->nsamp, trh->samprate,  
    10741128        IndexYoungest(&Tanks[t])->tEnd, trh->datatype); 
    10751129      goto ReleaseMutex; 
    10761130    } 
    1077  
    10781131 
    10791132    /* Grab the tank file 
     
    10911144    } 
    10921145 
    1093     /* Seek to insertion point 
     1146        /* ronb 040507 - check if incoming tracebuf overlaps tank. 
     1147        if it does then save overlapped chunk and write portion that 
     1148        doesn't overlap to tank*/ 
     1149        if((trh->starttime < IndexYoungest(&Tanks[t])->tEnd) && bUsePacketSyncDb) { 
     1150  
     1151                /* tracebuf partially overlaps. write to db.*/ 
     1152                logit("e", "WARNING: Trace overlaps tankdata saving to db.\n"); 
     1153                 
     1154                /* currently write entire trace buffer to packet db.*/ 
     1155                if(tbdb_put_packet(pPacketDb, &trh, inMsg, msgInLen) != TBDB_OK) { 
     1156                        logit("e", "ERROR: failed to write async packet to db\n");       
     1157                } 
     1158                goto ReleaseMutex; 
     1159        } 
     1160 
     1161         /* Seek to insertion point 
    10941162     ********************/ 
    10951163    if ( fseek( Tanks[t].tfp, Tanks[t].inPtOffset, SEEK_SET ) != 0 )  /* move to insertion point */ 
     
    12481316    logit("et","wave_server: Detaching from shared memory.\n"); 
    12491317 
     1318  
    12501319  tport_detach( &Region ); 
    1251   logit("et","wave_serverV: Shutdown complete. Indexes saved to disk. Exiting\n"); 
     1320   
     1321  /* close packet db if its open. ronb041207 
     1322        NOTE: this should be null if bUsePacketSyncDb = 0*/ 
     1323   if(pPacketDb) { 
     1324                tbdb_close(&pPacketDb); 
     1325                pPacketDb = NULL; 
     1326   } 
     1327 
     1328   logit("et","wave_serverV: Shutdown complete. Indexes saved to disk. Exiting\n"); 
    12521329  exit(0); 
    12531330  return(0); 
     
    23862463      } 
    23872464 
    2388       /* Command is not recognized 
     2465          /* ronb040307 - Optional cmd: UseSequenceDb 
     2466           Use packet db to store/retrieve out of order Packets 
     2467           ********************************/ 
     2468          else if(k_its("UsePacketSyncDb")) { 
     2469                bUsePacketSyncDb = k_int(); 
     2470          } 
     2471           
     2472          /* get database file name from config file ronb040307*/ 
     2473          else if(k_its("PacketSyncDbFile")) { 
     2474                strncpy(sPacketSyncDbFile, k_str(), TB_MAX_DB_NAME); 
     2475          } 
     2476 
     2477          /* get priority for out of sync data overlapping tank data ronb041207*/ 
     2478          else if(k_its("TankOverSyncDbData")) { 
     2479                bTankOverSyncDbData = k_int(); 
     2480          } 
     2481 
     2482          /* get number of epoch seconds worth of data to retain in 
     2483          database. ronb041207*/ 
     2484          else if(k_its("PurgePacketSyncDb")) { 
     2485                bPurgePacketSyncDb = k_int(); 
     2486          } 
     2487  
     2488    /* Command is not recognized 
    23892489       ****************************/ 
    23902490      else 
     
    28972997        strcmp(pCurrentRecord->datatype, "s2") == 0 || 
    28982998        strcmp(pCurrentRecord->datatype, "s4") == 0 ) { 
    2899         /* do nothing here for now, its good */ 
     2999        /* do nothing here for now, its good */ 
    29003000    } else { 
    2901         return(FALSE); 
    2902     } 
    2903  
    2904     /* check samprate ???? */ 
     3001        logit("e", "TraceBufIsValid: bad datatype found in packet '%s'\n", pCurrentRecord->datatype); 
     3002        return(FALSE); 
     3003    } 
    29053004 
    29063005    /* if the tracebuf internals look OK, and this is the first tracebuf, 
     
    29093008        return(TRUE); 
    29103009 
    2911         /* WARNING, any checks after this point might not be hit for first write! */ 
    2912  
    29133010    /* check that time goes forward between the previous tracebuf and this 
    29143011       one */ 
    2915     if(pCurrentRecord->starttime < IndexYoungest(pTSPtr)->tEnd) { 
    2916         if (Debug > 1) 
    2917             logit("e", "TraceBufIsValid: packet time not advancing\n"); 
    2918         return(FALSE); 
    2919     } 
    2920  
     3012 
     3013    if((pCurrentRecord->starttime < IndexYoungest(pTSPtr)->tEnd) && !bUsePacketSyncDb) { 
     3014        if (Debug > 1) 
     3015              logit("e", "TraceBufIsValid: packet time not advancing\n"); 
     3016            return(FALSE); 
     3017     } 
     3018 
     3019    /* check samprate ???? */ 
    29213020 
    29223021    /* it passed all the tests, so pass it on. */ 
Note: See TracChangeset for help on using the changeset viewer.