Changeset 8165


Ignore:
Timestamp:
06/24/20 09:37:59 (2 weeks ago)
Author:
paulf
Message:

incorporated Stuart Weinstein MaxLatency? option, and added a version number to this EW module

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/data_exchange/coaxtoring/coaxtoring.c

    r7787 r8165  
    9999#include "kom.h" 
    100100#include "transport.h" 
     101#include "trace_buf.h" 
    101102#include "ew_packet.h" 
     103 
     104/* paulf introduced a version to this program in 2020! */ 
     105#define VERSION_STR "2.0.0 - 2020-06-24" 
    102106 
    103107DECLARE_SPECIFIC_SEMAPHORE_EW(sema_MessageReady); 
     
    162166int           RcvBufSize;        /* Size of IP receive buffer */ 
    163167static unsigned char MyModuleId; /* Module id for this instance of coaxtoring */ 
     168double        MaxLatency=0.0;    /* if set reject packets with data later than this */ 
     169int           skipped=0; 
     170 
    164171 
    165172int main( int argc, char *argv[] ) 
     
    172179   ew_thread_t tidMsgIn;            /* Thread id of the MsgIn thread */ 
    173180   ew_thread_t tidKill;             /* Thread id of the KillProcess thread */ 
     181   unsigned int type; 
     182   char  tempd[8]; 
     183   double starttime,now,latency; 
     184   time_t  t; 
     185   struct tm  *gm; 
     186   TRACE2_HEADER *ewPacket; 
    174187 
    175188/* Initialize name of log-file & open it 
     
    181194   if ( argc != 2 ) 
    182195   { 
    183         printf( "Usage: coaxtoring <configfile>\n" ); 
     196        printf( "Usage: coaxtoring configfile.d\n" ); 
     197        printf( "Version: %s\n", VERSION_STR ); 
    184198        return -1; 
    185199   } 
     
    188202   *****************************/ 
    189203   coaxtoring_config( argv[1] ); 
    190    logit( "" , "%s: Read command file <%s>\n", argv[0], argv[1] ); 
     204   logit( "t" , "%s: Read command file <%s>\nVersion %s", argv[0], argv[1], VERSION_STR ); 
    191205 
    192206/* Look up other important info from earthworm.h tables 
     
    299313/* Send message to transport ring buffer 
    300314   *************************************/ 
    301       if ( pMsgOut->logo.mod == MyModuleId ) 
     315      type = (unsigned short) pMsgOut->logo.type; 
     316 
     317      if ( MaxLatency && type == 19 ) {   /*Is Packet TRACE BUF2 and MaxLatency set?*/ 
     318         ewPacket = (TRACE2_HEADER*) (pMsgOut->start); 
     319         if (pMsgOut->start[57] =='s' || pMsgOut->start[57] =='t') { 
     320             /* BYTE SWAP TO CORRECT ENDIANNESS for Intel systems only*/ 
     321               tempd[0]=pMsgOut->start[15]; 
     322               tempd[1]=pMsgOut->start[14]; 
     323               tempd[2]=pMsgOut->start[13]; 
     324               tempd[3]=pMsgOut->start[12]; 
     325               tempd[4]=pMsgOut->start[11]; 
     326               tempd[5]=pMsgOut->start[10]; 
     327               tempd[6]=pMsgOut->start[9]; 
     328               tempd[7]=pMsgOut->start[8]; 
     329           memcpy(&starttime, tempd, sizeof(double)); 
     330         } else { 
     331           memcpy(&starttime, &(pMsgOut->start[8]), sizeof(double)); 
     332         } 
     333 
     334         t = time(NULL); 
     335         gm = gmtime(&t); 
     336         now = (double) t; 
     337 
     338         latency = now - starttime; 
     339         if (MaxLatency && ( latency  > MaxLatency) ) { 
     340                logit("et", "coaxtoring: Latent Packet for %5s.%3s.%2s.%2s %lf secs\n", 
     341                     ewPacket->sta, ewPacket->chan, ewPacket->net, ewPacket->loc, latency); 
     342                  pMsgOut->status = BUFFER_UNUSED; 
     343                skipped ++;   /* Keep track of number of consecutive skipped packetss*/ 
     344                continue;   
     345         } 
     346      } 
     347 
     348      if ( pMsgOut->logo.mod == MyModuleId ) { 
    302349         res = tport_putmsg( &region, &pMsgOut->logo, pMsgOut->length, 
    303350                              (char *)pMsgOut->start ); 
    304       else 
    305          res = tport_copyto( &region, &pMsgOut->logo, pMsgOut->length, 
     351      } else { 
     352          if (skipped) {  /* Correct cSeqNum for skipped messages */ 
     353             res = tport_copyto( &region, &pMsgOut->logo, pMsgOut->length, 
     354                               (char *)pMsgOut->start, (pMsgOut->cSeqNum - skipped) ); 
     355          } else { 
     356             skipped =0; 
     357             res = tport_copyto( &region, &pMsgOut->logo, pMsgOut->length, 
    306358                              (char *)pMsgOut->start, pMsgOut->cSeqNum ); 
     359          } 
     360      } 
    307361 
    308362      if ( res != PUT_OK ) 
     
    695749             init[10] = 1; 
    696750          } 
     751        /* Optional Command:  Max Latecny Limit */ 
     752          else if ( k_its ("MaxLatency")) 
     753          { 
     754              if ( (str = k_str ()) ) 
     755                { 
     756                  MaxLatency  = (double)  atoi (str) ; 
     757                } 
     758              if ( !str || MaxLatency < 0 ) 
     759                { 
     760                  fprintf(stderr, "MaxLatency is unspecified or negative, quiting\n"); 
     761                  logit("e", "MaxLatency is unspecified or negative, quiting\n", com, configfile); 
     762                  exit( -1 ); 
     763                } 
     764          } 
    697765          else 
    698766          { 
     
    754822   logit( "",  "BufferReportInt: %d\n", BufferReportInt ); 
    755823   logit( "",  "RcvBufSize:      %d\n", RcvBufSize ); 
     824   logit( "",  "MaxLatency:      %f\n", MaxLatency ); 
    756825   return; 
    757826} 
Note: See TracChangeset for help on using the changeset viewer.