heartbeat

Log

Files

Refs

README

heartbeat.c (41009B)

     1 #! /usr/bin/env sheepy
     2 /* or direct path to sheepy: #! /usr/local/bin/sheepy */
     3 
     4 /* Libsheepy documentation: https://spartatek.se/libsheepy/ */
     5 #include <unistd.h> // for daemon
     6 #include "libsheepyObject.h"
     7 #include "shpPackages/short/short.h"
     8 #include "shpPackages/termbox/termbox.h"
     9 
    10 // In monitor, do not clear port state when c is pressed
    11 // >> when port state is on, a service is not running and it needs to be fixed
    12 
    13 /*
    14 Commands:
    15 no command: load config.yml
    16             probe and send messages to bridge, forward messages to bridge
    17             if logger, store messages, forward messages to monitor
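config: generate a configuration yml file and a key pair for each machine,
        and log the shell commands that copy the configuration, the keys and
        the heartbeat executable to all machines (the commands are not run)
daemon: run as daemon
monitor: display state
command CMD: run CMD with ssh on all machines except the logger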
    23 
    24 
msgt is the message sent in UDP packets by the agents.
dbt is the biggest possible packet (record); it is written to the db files when there
is a state change (event).
    28 */
    29 
    30 
// cfgFile is the main configuration file written by the user
// it is used by the config command to generate agentCfgFile for each machine
#define cfgFile "heartbeatConfig.yml"

// cleartime is used to ignore events that happened before cleartime
// press c in the monitor to set cleartime to now
#define monitorCleartimeFile "cleartime.json"

// when running as a daemon, expanding ~/ doesn't work
#define uHome "/root/"
// monitor user home, monitor doesn't run as a daemon and
// can run as a normal user
#define muHome "~/"
// directory, relative to the home, holding the configuration, the keys and heartbeat.log
#define home ".heartbeat"

// agentCfgFile is the default configuration file
#define agentCfgPath home"/config.yml"
#define agentCfgFile uHome agentCfgPath
// directory, relative to the home, where the heartbeat executable is installed
#define binPath "bin/"
#define BIN uHome binPath

// keys
#define SECRET_KEY "secret.bin"
#define PUBLIC_KEY "public.bin"
#define LOGGER_PUBLIC_KEY "loggerPublic.bin"

// counter filename for encryption to avoid replay attacks
// using the clock fails (as in previous commit) because on some VMs, the clock jumps
// NOTE(review): probe() builds the path as uHome home counterFilename without a "/"
// separator, so the file is /root/.heartbeatcounter.bin - confirm this is intended
#define counterFilename "counter.bin"

// probe period in seconds, used when the agent config has no "period"
// NOTE(review): 1 looks like a test value, the TODO below gives the intended default
#define defaultPeriod 1
// TODO #define defaultPeriod 120

// agent timeout, presumably in seconds (the intended 10mn timeout is 600, see the TODO)
// NOTE(review): the current value 8 looks like a test value
#define agentTimeOut 8
//TODO #define agentTimeOut 600

// copies of the arguments given to main, runcommand uses argv[2]
int argc; char **argv;
    69 
/* enable/disable logging */
/* #undef pLog */
/* #define pLog(...) */

// print the command line help
void printHelp(void);
// generate configurations, keys and the setup script from cfg (the parsed cfgFile)
void config(smallJsont *cfg);
// run argv[2] with ssh on the agents
void runcommand(void);
// show the agent states received from the logger
void monitor(void);
// run the agent: check the probes and send the state to the bridge/logger,
// the logger also collects the packets from the agents
// loggerPublicFile is NULL when the logger public key file is missing (see main)
void probe(char *cfgfile, char *secretFile, char *publicFile, char *loggerPublicFile);
    83 
    84 /* process arguments on command line
    85  */
    86 int main(int ARGC, char** ARGV) {
    87 
    88   argc = ARGC; argv = ARGV;
    89 
    90   //initLibsheepy(ARGV[0]);
    91   // don't initialize threads
    92   // there are issues with tpool waiting for threads to finish forever
    93   initLibsheepyF(ARGV[0], null);
    94   setLogMode(LOG_VERBOSE);
    95   //setLogMode(LOG_FUNC);
    96   //openProgLogFile();
    97   setLogSymbols(LOG_UTF8);
    98   //disableLibsheepyErrorLogs;
    99 
   100   if (argc == 1) {
   101     runProbes:;
   102     // no arguments
   103     cleanCharP(p) = expandHome(agentCfgFile);
   104     if (!isPath(p)) {
   105       logE("Missing configuration: %s", p);
   106       ret 1;
   107     }
   108     cleanCharP(sf) = expandHome(uHome home "/" SECRET_KEY);
   109     if (!isPath(sf)) {
   110       logE("Missing configuration: %s", sf);
   111       ret 1;
   112     }
   113     cleanCharP(pf) = expandHome(uHome home "/" PUBLIC_KEY);
   114     if (!isPath(pf)) {
   115       logE("Missing configuration: %s", pf);
   116       ret 1;
   117     }
   118     cleanCharP(lf) = expandHome(uHome home "/" LOGGER_PUBLIC_KEY);
   119     if (!isPath(lf)) {
   120       freen(lf);
   121     }
   122     probe(p, sf, pf, lf);
   123     ret 0;
   124   }
   125   elif (eqG(argv[1], "daemon")) {
   126     cleanCharP(h) = expandHome(uHome home "/heartbeat.log");
   127     setLogFile(h);
   128     setLogStdout(no);
   129     #define KEEP_CWD 1
   130     #define STDIO_DEV_NULL 0
   131     daemon(KEEP_CWD, STDIO_DEV_NULL);
   132     goto runProbes;
   133   }
   134   elif (eqG(argv[1], "monitor")) {
   135     monitor();
   136   }
   137   elif (eqG(argv[1], "config")) {
   138     cleanAllocateSmallJson(cfg);
   139     smallJsont *r = readFileG(cfg, cfgFile);
   140 
   141     if (!r) {
   142       logE("Missing "cfgFile);
   143       ret 1;
   144     }
   145     config(cfg);
   146   }
   147   elif (eqG(argv[1], "command")) {
   148     // run command on all machines
   149     if (argc != 3) {
   150       logE("Missing or too many arguments");
   151       printHelp();
   152       ret 1;
   153     }
   154     runcommand();
   155   }
   156   elif (   eqG(argv[1], "help")
   157         or eqG(argv[1], "-h")
   158         or eqG(argv[1], "--help")
   159         or eqG(argv[1], "-?")
   160         or eqG(argv[1], "?")) {
   161     printHelp();
   162   }
   163   else {
   164     logE("Invalid command: %s", argv[1]);
   165     printHelp();
   166     ret 1;
   167   }
   168 
   169   ret 0;
   170 }
   171 
   172 void printHelp(void) {
   173   puts(BLD GRN"Help"RST);
   174   TODO("write help");
   175 }
   176 
   177 #include "sel.h"
   178 
   179 /* generate setup for all agents
   180  */
   181 void config(smallJsont *cfg) {
   182 
   183   //lv(cfg);
   184   if (not selInit()) {
   185     logE("selInit failed");
   186     XFailure;
   187   }
   188 
   189   // generate agent configs: agentNameConfig.yml
   190   // add shell commands to copyScript
   191 
   192   // generate agent configs: agentNameConfig.yml
   193   u16 port = u$(cfg, "port");
   194   if (!port) {
   195     logE("Missing port.");
   196     XFailure;
   197   }
   198   delElemG(cfg, "port");
   199 
   200   // add shell commands to copyScript
   201   cleanAllocateSmallArray(copyScript);
   202 
   203   const char *loggerName = null;
   204   char *loggerTransfer   = null;
   205   char *loggerPublicKey  = null;
   206   cleanAllocateSmallArray(keyFilenames);
   207 
   208   // id is stored in the agent config and sent in the messages
   209   // it is the index of the machine in the list
   210   u32 id = 0;
   211   iter(cfg, D) {
   212     cast(smallDictt*, d, D);
   213     //logD("key %s", iK(cfg));
   214 
   215     // generate configuration file for agent
   216     cleanCharP(thisCfgFile) = catS(iK(cfg), "Config.yml");
   217     //lv(thisCfgFile);
   218 
   219     cleanAllocateSmallJson(thisCfg);
   220     cleanFinishSmallJsonP(agentCfg) = allocG(rtSmallJsont);
   221     setG(thisCfg, "port", (u32)port);
   222 
   223     // populate agent config
   224     setG(agentCfg, "id", id);
   225 
   226     if (hasG(d, "logger")) {
   227       setG(agentCfg, "logger", TRUE);
   228       // TODO("add monitor ip and port in logger agent");
   229       loggerName = iK(cfg);
   230     }
   231 
   232     // set bridge address
   233     if (hasG(d, "bridge")) {
   234         cleanCharP(key) = formatS("\"%s\".\"address\"", $(d, "bridge"));
   235         char *address = $(cfg, key);
   236         //lv(key);
   237         //lv(address);
   238         setG(agentCfg, "bridge", address);
   239     }
   240 
   241     if (hasG(d, "probes")) {
   242       setNFreeG(agentCfg, "probes", getNDupG(d, rtSmallArrayt, "probes"));
   243     }
   244 
   245     setG(thisCfg, iK(cfg), agentCfg);
   246     //lv(thisCfg);
   247     writeFileG(thisCfg, thisCfgFile);
   248 
   249     // generate keys
   250     cleanCharP(secretFilename) = catS(iK(cfg), "Secret.bin");
   251     cleanCharP(publicFilename) = catS(iK(cfg), "Public.bin");
   252     pushG(keyFilenames, secretFilename);
   253     pushG(keyFilenames, publicFilename);
   254     keyst keys                 = init0Var;
   255     newKeysBuf(&keys);
   256     pError0(writeFile(secretFilename, keys.secretKey, sizeof(keys.secretKey)));
   257     pError0(writeFile(publicFilename, keys.publicKey, sizeof(keys.publicKey)));
   258 
   259     // generate commands to setup the agents
   260     pushNFreeG(copyScript,
   261         formatS("# %s", iK(cfg)));
   262     if (!$(d, "transfers")) {
   263       // ssh/scp
   264       pushNFreeG(copyScript,
   265           formatS("ssh %s mkdir "home, iK(cfg)));
   266       if (hasG(d, "logger")) {
   267         pushNFreeG(copyScript,
   268             formatS("scp "cfgFile" %s:" home "/" cfgFile, iK(cfg)));
   269         loggerTransfer  = "scp";
   270         loggerPublicKey = $(keyFilenames, -1);
   271       }
   272       else {
   273         if (!loggerPublicKey) {
   274           logC("Put logger configuration first in the `"cfgFile"` yml file.");
   275           XFailure;
   276         }
   277         // copy logger public key to agent
   278         pushNFreeG(copyScript,
   279             formatS("scp %s %s:" home "/" LOGGER_PUBLIC_KEY, loggerPublicKey, iK(cfg)));
   280       }
   281       pushNFreeG(copyScript,
   282           formatS("scp %s %s:" home "/" SECRET_KEY, secretFilename, iK(cfg)));
   283       pushNFreeG(copyScript,
   284           formatS("scp %s %s:" home "/" PUBLIC_KEY, publicFilename, iK(cfg)));
   285       pushNFreeG(copyScript,
   286           formatS("scp %s %s:" agentCfgPath, thisCfgFile, iK(cfg)));
   287       pushNFreeG(copyScript,
   288           formatS("scp heartbeat %s:" binPath, iK(cfg)));
   289       pushNFreeG(copyScript,
   290           formatS("scp heartbeat.service %s:/etc/systemd/system/", iK(cfg)));
   291       pushNFreeG(copyScript,
   292           formatS("ssh %s \"systemctl daemon-reload ; systemctl enable heartbeat ; systemctl start heartbeat\"", iK(cfg)));
   293     }
   294     elif (eqG($(d, "transfers"), "copy")) {
   295       // cp same machine
   296       pushG(copyScript,
   297           "mkdir "uHome home);
   298       if (hasG(d, "logger")) {
   299         pushG(copyScript,
   300             "cp "cfgFile" "uHome home "/" cfgFile);
   301         loggerTransfer  = "ssh";
   302         loggerPublicKey = $(keyFilenames, -1);
   303       }
   304       else {
   305         if (!loggerPublicKey) {
   306           logC("Put logger configuration first in the `"cfgFile"` yml file.");
   307           XFailure;
   308         }
   309         // copy logger public key to agent
   310         pushNFreeG(copyScript,
   311             formatS("cp %s " uHome home "/" LOGGER_PUBLIC_KEY, loggerPublicKey));
   312       }
   313       pushNFreeG(copyScript,
   314           formatS("cp %s " uHome home "/" SECRET_KEY, secretFilename));
   315       pushNFreeG(copyScript,
   316           formatS("cp %s " uHome home "/" PUBLIC_KEY, publicFilename));
   317       pushNFreeG(copyScript,
   318           formatS("cp %s " agentCfgFile, thisCfgFile));
   319       pushNFreeG(copyScript,
   320           formatS("cp heartbeat " BIN));
   321       pushG(copyScript,
   322           "cp heartbeat.service /etc/systemd/system/");
   323       pushG(copyScript,
   324           "systemctl daemon-reload ; systemctl enable heartbeat ; systemctl start heartbeat");
   325     }
   326     inc id;
   327   }
   328 
   329   // copy all keys to logger
   330   // logger needs to know the public key for all agents
   331   iter(keyFilenames, F) {
   332     pushNFreeG(copyScript,
   333         eqG(loggerTransfer, "scp") ? formatS("scp %s %s:" home "/", ssGet(F), loggerName) :
   334                                      formatS("cp %s " uHome home "/", ssGet(F)));
   335   }
   336 
   337   logG(copyScript);
   338 }
   339 
   340 void runcommand(void) {
   341   cleanAllocateSmallJson(cfg);
   342   cleanCharP(p) = expandHome(uHome home "/" cfgFile);
   343   smallJsont *r = readFileG(cfg, p);
   344   if (!r) {
   345     logE("Missing %s", p);
   346     XFailure;
   347   }
   348   delElemG(cfg, "port");
   349 
   350   iter(cfg, D) {
   351     cast(smallDictt*, d, D);
   352     if (hasG(d, "logger")) continue;
   353       // no, can kill itself with pkill - logSystem(argv[2]);
   354     //else {
   355     logSystemf("ssh %s \"%s\"", iK(cfg)/*agent*/, argv[2]);
   356     //}
   357   }
   358 }
   359 
   360 #include <sys/types.h>
   361 #include <sys/socket.h>
   362 #include <netinet/in.h>
   363 #include <netdb.h>
   364 #include <sys/stat.h>
   365 #include <fcntl.h>
   366 #include <arpa/inet.h> // for inet_ntoa
   367 
   368 void monitor(void) {
   369   // load config
   370   // find logger config
   371   // create monitor socket
   372   // events older than clearTime are not displayed
   373   // load cleartime
   374   // setup terminal
   375   // infinite loop
   376 
   377   cleanAllocateSmallJson(cfg);
   378   cleanCharP(p) = expandHome(muHome home "/" cfgFile);
   379   smallJsont *r = readFileG(cfg, p);
   380   if (!r) {
   381     logE("Missing %s", p);
   382     XFailure;
   383   }
   384   delElemG(cfg, "port");
   385 
   386   // find logger config
   387   const char *loggerName = null;
   388   iter(cfg, D) {
   389     cast(smallDictt*, d, D);
   390     if (hasG(d, "logger")) {
   391       loggerName = iK(cfg);
   392       break;
   393     }
   394   }
   395   // create monitor socket
   396   cleanFinishSmallDictP(logger) = getG(cfg, rtSmallDictt, loggerName);
   397   u16 port = u$(logger, "monitorPort");
   398 
   399   int sock;
   400   struct sockaddr_in server;
   401   struct sockaddr_in client;
   402   char buf[65536];
   403 
   404   sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
   405   if (sock < 0){
   406     logE("Failed to create server socket: %s", strerror(errno));
   407     XFailure;
   408   }
   409 
   410   server.sin_family = AF_INET;
   411   server.sin_addr.s_addr = INADDR_ANY;
   412   server.sin_port = htons(port);
   413 
   414   if (bind(sock, (struct sockaddr *) &server, sizeof(server))){
   415       logE("bind failed: %s, port %d", strerror(errno), port);
   416       XFailure;
   417   }
   418 
   419   // events older than clearTime are not displayed
   420   time_t clearTime = 0;
   421 
   422   // load cleartime
   423   cleanAllocateSmallJson(ct);
   424   cleanCharP(ctPath) = expandHome(muHome home "/" monitorCleartimeFile);
   425   if (isPath(ctPath)) {
   426     readFileG(ct, ctPath);
   427     clearTime = getTopG(ct, rtU64);
   428   }
   429 
   430   // setup terminal
   431   int R = tb_init();
   432   if (R) {
   433     logE("tb_init() failed with error code %d\n", R);
   434     XFailure;
   435   }
   436 
   437   tb_select_output_mode(TB_OUTPUT_TRUECOLOR);
   438 
   439   // infinite loop
   440   cleanAllocateSmallJson(state);
   441 
   442   tb_stringf(0, 0, TB_WHITE, TB_DEFAULT, "%s", "No state received from logger yet...");
   443   tb_present();
   444 
   445   // when the first udp packet arrives, the message above is erased
   446   bool gotAMessage = no;
   447 
   448   struct tb_event ev;
   449   while (tb_poll_event(&ev, sock)) {
   450     switch (ev.type) {
   451       case TB_EVENT_KEY:
   452         switch (ev.key) {
   453           case TB_KEY_ESC:
   454             goto done;
   455         }
   456         if   (ev.ch == 'q') goto done;
   457         elif (ev.ch == 'c') {
   458           clearTime = getCurrentUnixTime();
   459           setTopG(ct, clearTime);
   460           writeFileG(ct, ctPath);
   461         }
   462         break;
   463       case TB_EVENT_RESIZE:
   464         //draw_all();
   465         break;
   466       case TB_EVENT_SOCKET:
   467         if (!gotAMessage) {
   468           // erase default message
   469           tb_stringf(0, 0, TB_WHITE, TB_DEFAULT, "%s", "Agent                State Last change         Boot     Boot time                     Probes");
   470           gotAMessage = yes;
   471         }
   472         pError0(ZEROVAR(buf));
   473         socklen_t addr_size = sizeof(client);
   474         ssize_t r = recvfrom(sock, buf, sizeof(buf), 0, (struct sockaddr *) &client, &addr_size);
   475         if (r == -1) {
   476           logE("recvfrom: %s", strerror(errno));
   477           continue;
   478         }
   479         parseG(state, buf);
   480         u16 line = 1;
   481         iter(state, D) {
   482           if (!isOSmallDict(D)) continue;
   483           cast(smallDictt*, d, D);
   484           u16 x = 0;
   485           tb_stringf(x, line, TB_WHITE, TB_DEFAULT, "%20s", iK(state) /*agent name*/);
   486           x += 21;
   487           u32 color, bgcolor;
   488           if (eqG($(d, "state"), "alive")) {
   489             color   = TB_WHITE;
   490             bgcolor = TB_BLACK;
   491           }
   492           elif (eqG($(d, "state"), "init")) {
   493             color   = TB_GREEN;
   494             bgcolor = TB_BLACK;
   495           }
   496           elif (eqG($(d, "state"), "down")) {
   497             color   = TB_WHITE;
   498             bgcolor = TB_RED;
   499           }
   500           else {
   501             color   = TB_BLACK;
   502             bgcolor = TB_RED;
   503           }
   504           tb_stringf(x, line, color, bgcolor, "%5s", $(d, "state"));
   505           x += 6;
   506           cleanCharP(lastStateChange) = timeToYMDS(u$(d, "last"));
   507           tb_stringf(x, line, TB_WHITE, TB_DEFAULT, "%19s", lastStateChange);
   508           x += 20;
   509           //tb_stringf(47, line, TB_WHITE, TB_DEFAULT, "%d", u$(d, "mId"));
   510           char *rebootorNet    = "";
   511           cleanCharP(lastBoot) = null;
   512           if (getG(d, rtBool, "rebooted") and u$(d, "lastBoot") > clearTime) {
   513             color   = TB_WHITE;
   514             bgcolor = TB_RED;
   515             rebootorNet = "rebooted";
   516             lastBoot = timeToYMDS(u$(d, "lastBoot"));
   517           }
   518           elif (getG(d, rtBool, "net") and u$(d, "lastNet") > clearTime) {
   519             u64 t = u$(d, "lastNet");
   520             // check network issues
   521               color   = TB_WHITE;
   522               bgcolor = TB_RED;
   523               rebootorNet = "network";
   524               lastBoot = timeToYMDS(u$(d, "lastNet"));
   525           }
   526           else {
   527             color   = TB_WHITE;
   528             bgcolor = TB_BLACK;
   529             lastBoot = timeToYMDS(u$(d, "lastBoot"));
   530           }
   531           tb_stringf(x, line, color, bgcolor, "%8s", rebootorNet);
   532           x += 19;
   533           tb_stringf(x, line, TB_WHITE, TB_DEFAULT, "%19s", lastBoot);
   534           x += 20;
   535           cleanFinishSmallArrayP(probes) = getG(d, rtSmallArrayt, "probes");
   536           if (probes) {
   537             iter(probes, P) {
   538               cast(smallDictt*,p,P);
   539               if (getG(p, rtBool, "state")) {
   540                 // service down
   541                 color   = TB_WHITE;
   542                 bgcolor = TB_RED;
   543               }
   544               else {
   545                 // service up
   546                 color   = TB_WHITE;
   547                 bgcolor = TB_BLACK;
   548               }
   549               tb_stringf(x, line, color, bgcolor, "port %5d", u$(p, "port"));
   550               x += 11;
   551             }
   552           }
   553           inc line;
   554         }
   555         break;
   556     }
   557     tb_present();
   558   }
   559 
   560   done:
   561   tb_shutdown();
   562 }
   563 
   564 
   565 void sendMail(char *cmd);
   566 
   567 void setTimeout(int bridgesock, u32 period, u32 usec) {
   568   struct timeval timeout = {.tv_sec = period, .tv_usec = usec};
   569   if (setsockopt(bridgesock, SOL_SOCKET, SO_RCVTIMEO, (char *) &timeout, sizeof(timeout)) < 0) {
   570     logE("Could not set receive socket time out: %s", strerror(errno));
   571     close(bridgesock);
   572     XFailure;
   573   }
   574   if (setsockopt(bridgesock, SOL_SOCKET, SO_SNDTIMEO, (char *) &timeout, sizeof(timeout)) < 0) {
   575     logE("Could not set send socket time out: %s", strerror(errno));
   576     close(bridgesock);
   577     XFailure;
   578   }
   579 }
   580 
   581 /* agent, bridge and logger
   582  */
   583 void probe(char *cfgfile, char *secretFile, char *publicFile, char *loggerPublicFile) {
   584 
   585   // load agent config
   586   // get port and agent name
   587   // setup agent
   588   // setup logger
   589   // create client socket
   590   // create socket for brigde (only if not logger)
   591   // start server or bridge for the agents
   592   // message structures
   593   // infinite loop
   594     // check probes
   595       // send message
   596       // this is the logger
   597       // write messages directly
   598       // update logger state and send mail if changed
   599       // set remote agents down if there hasn't been a message in a while
   600       // send mail
   601     // bridge messages from others
   602         // when agent is logger, store message
   603           // update state for this agent
   604               // send mail when agent rebooted
   605               // send mail when service is down
   606           // send mail
   607         // when agent is not logger, forward message
   608 
   609   if (not selInit()) {
   610     logE("selInit failed");
   611     XFailure;
   612   }
   613 
   614   u64 counter = 0;
   615   cleanCharP(counterfn) = expandHome(uHome home counterFilename);
   616   if (isPath(counterfn)) {
   617     // load counter value from disk
   618     pError(bLReadFile(counterfn, &counter, sizeof(counter)));
   619   }
   620 
   621   keyst keys = init0Var;
   622 
   623   // load keys
   624   pError(bLReadFile(secretFile, keys.secretKey, sizeof(keys.secretKey)));
   625   pError(bLReadFile(publicFile, keys.publicKey, sizeof(keys.publicKey)));
   626   if (loggerPublicFile) {
   627     pError(bLReadFile(loggerPublicFile, keys.remotePublicKey, sizeof(keys.remotePublicKey)));
   628   }
   629 
   630   // complete configuration in logger
   631   cleanSmallJsonP(completeCfg) = null;
   632 
   633   // load agent config
   634   cleanAllocateSmallJson(cfg);
   635   readFileG(cfg, cfgfile);
   636   //lv(cfg);
   637 
   638   // get port and agent name
   639   u16 port         = u$(cfg, "port");
   640   if (!port) {
   641     logE("Missing port.");
   642     XFailure;
   643   }
   644   delElemG(cfg, "port");
   645   cleanListP(names) = keysG(cfg);
   646   //lv(names);
   647   if (lenG(names) != 1) {
   648     logE("There should only and at least one agent configured: %d", lenG(names));
   649     XFailure;
   650   }
   651 
   652   // this agent name in the configuration
   653   char *name = names[0];
   654 
   655   // setup agent
   656   cleanFinishSmallDictP(agent) = getG(cfg, rtSmallDictt, name);
   657   //lv(agent);
   658   if (!agent) {
   659     logE("Agent object has wrong type, should be smallDict.");
   660     XFailure;
   661   }
   662 
   663   u32 id           = u$(agent, "id");
   664   bool logger      = hasG(agent, "logger");
   665   u16  monitorPort = 0;
   666   if (logger) {
   667     // load complete configuration to get monitorPort
   668     completeCfg   = allocG(rtSmallJsont);
   669     cleanCharP(p) = expandHome(uHome home "/" cfgFile);
   670     smallJsont *r = readFileG(completeCfg, p);
   671     if (!r) {
   672       logE("Missing %s", p);
   673       XFailure;
   674     }
   675     delElemG(completeCfg, "port");
   676 
   677     // find logger config
   678     const char *loggerName = null;
   679     iter(completeCfg, D) {
   680       cast(smallDictt*, d, D);
   681       if (hasG(d, "logger")) {
   682         loggerName = iK(completeCfg);
   683         break;
   684       }
   685     }
   686     cleanFinishSmallDictP(logger) = getG(completeCfg, rtSmallDictt, loggerName);
   687 
   688     // set monitor port
   689     monitorPort = u$(logger, "monitorPort");
   690     if (!monitorPort) {
   691       logE("monitorPort missing in logger configuration");
   692       XFailure;
   693     }
   694   }
   695   char *dst   = logger ? "localhost" /*TODO add monitorIp*/ : $(agent, "bridge");
   696   if (!dst) {
   697     logE("Missing bridge address.");
   698     XFailure;
   699   }
   700   cleanFinishSmallArrayP(probes) = getG(agent, rtSmallArrayt, "probes");
   701   if (probes and !lenG(probes)) {
   702     finishG(probes);
   703     probes = null;
   704   }
   705   u32 period = hasG(agent, "period") ? u$(agent, "period") : defaultPeriod;
   706   /* lv(id); */
   707   /* lv(logger); */
   708   /* lv(dst); */
   709   //lv(probes);
   710   //lv(period);
   711 
   712   // setup logger
   713   // agent names
   714   cleanListP(agents)           = null;
   715   // agent db filenames
   716   cleanListP(agentdbs)         = null;
   717   // when logger was started
   718   u64 startTime                = 0;
   719   cleanCharP(mailSubject)      = null;
   720 
   721   if (logger) {
   722     startTime     = getCurrentUnixTime();
   723 
   724     // create filename with path in ~/.heartbeat/
   725     agents        = keysG(completeCfg);
   726     agentdbs      = keysG(completeCfg);
   727     cleanCharP(h) = expandHome(uHome home "/");
   728     forEachCharP(agentdbs, ag) {
   729       pErrorNULL(iPrependS(ag, h));
   730       pErrorNULL(iAppendS(ag, ".bin"));
   731     }
   732     //lv(agentdbs);
   733 
   734     // mail setup
   735     pError0(chDir(h));
   736     mailSubject   = catS("Heatbeat on ", name);
   737   }
   738   // open log files and load agent public keys
   739   int agentf[lenG(agents)];
   740   u8 agentPublicKeys[lenG(agents)][sizeof(keys.remotePublicKey)];
   741   if (logger) {
   742     arange(i, agentf) {
   743       agentf[i] = open(agentdbs[i], O_APPEND|O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR);
   744       if (agentf[i] < 0) {
   745         logE("Failed to open %s: %s", agentdbs[i], strerror(errno));
   746         XFailure;
   747       }
   748       cleanCharP(publicFilename) = catS(uHome home "/", agents[i], "Public.bin");
   749       cleanCharP(pfn) = expandHome(publicFilename);
   750       pError(bLReadFile(pfn, agentPublicKeys[i], sizeof(agentPublicKeys[i])));
   751     }
   752     // setup objects for monitor
   753     setG(completeCfg, "startTime", startTime);
   754     iter(completeCfg, D) {
   755       if (!isOSmallDict(D)) continue;
   756       cast(smallDictt*,d,D);
   757       setG(d, "state", "unknown");   // network state
   758       setG(d, "last", startTime); // network state change
   759       setG(d, "mId", 0);          // last message id, if next mId under last one, the agent rebooted
   760       setG(d, "rebooted", FALSE);
   761       setG(d, "lastBoot", startTime);
   762       setG(d, "time", 0);         // last incoming message time
   763       setG(d, "count", 0);        // packet counter from agent to avoid replay attacks
   764       setG(d, "c", 0);            // packet counter under a period
   765       setG(d, "mono", 0);         // current period start time
   766       setG(d, "net", FALSE);      // there is a network issue, when agent goes from down to alive and next mId is higher than last mId
   767       setG(d, "lastNet", startTime);
   768       cleanFinishSmallArrayP(probes) = getG(d, rtSmallArrayt, "probes");
   769       if (probes) {
   770         iter(probes, P) {
   771           cast(smallDictt*,p,P);
   772           setG(p, "state", FALSE);
   773           setG(p, "last", startTime);
   774           setPG(probes, iI(probes), p);
   775         }
   776       }
   777       setPG(completeCfg, iK(completeCfg), d);
   778     }
   779   }
   780 
   781 
   782   // create client socket
   783   // sock is client socket or monitor socket on logger
   784   int sock = -1;
   785   struct sockaddr_in server;
   786   struct hostent *hp;
   787 
   788   sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
   789   if (sock < 0) {
   790     logE("Failed to create socket: %s", strerror(errno));
   791     XFailure;
   792   }
   793 
   794   server.sin_family = AF_INET;
   795 
   796   hp = gethostbyname(dst);
   797   if (hp==0) {
   798     logE("gethostbyname failed: %s", strerror(errno));
   799     close(sock);
   800     XFailure;
   801   }
   802 
   803   memcpy(&server.sin_addr, hp->h_addr, hp->h_length);
   804   server.sin_port = htons(logger ? monitorPort : port);
   805 
   806   // create socket for brigde (only if not logger)
   807   // start server or bridge for the agents
   808   int bridgesock;
   809   struct sockaddr_in bridge;
   810   struct sockaddr_in client;
   811 
   812   bridgesock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
   813   if (bridgesock < 0){
   814     logE("Failed to create bridge socket: %s", strerror(errno));
   815     XFailure;
   816   }
   817 
   818   bridge.sin_family = AF_INET;
   819   bridge.sin_addr.s_addr = INADDR_ANY;
   820   bridge.sin_port = htons(port);
   821 
   822   if (bind(bridgesock, (struct sockaddr *) &bridge, sizeof(bridge))){
   823       logE("Bridge bind failed: %s, port %d", strerror(errno), port);
   824       XFailure;
   825   }
   826 
   827   // packet structure
   828   typ struct PACKED {
   829     u8 nonce[crypto_box_NONCEBYTES];
   830     u32 id;
   831     i32 len;
   832     u8 buf[64*1024]; // buf holds the encrypted data (it is slightly bigger than the payload)
   833   } packett;
   834   // message structures
   835   typ struct PACKED {
   836     u64 messageId;
   837     u64 status;
   838     u32 info;
   839   } msgt;
   840   typ struct PACKED {
   841     u64 count;
   842     msgt m;
   843   } payloadt;
   844   typ struct PACKED {
   845     u64 time;
   846     u32 id;
   847     msgt m;
   848     u32 levels[32];
   849   } dbt;
   850   packett packet = {.id = id}; // packet to send, id is the agent id
   851   packett data   = init0Var;   // received packet
   852   msgt msg       = init0Var;   // message to send
   853   dbt record     = init0Var;   // record to write on disk in logger
   854   //lv(sizeof(msg));
   855 
   856   // infinite loop
   857   rangeInf(messageId) {
   858     //lv(messageId);
   859 
   860     u64 time     = getMonotonicTime();
   861     u64 nextTime = time + (u64)period * 1000000000UL;
   862 
   863     msg.messageId = messageId;
   864     msg.status    = 0;
   865     msg.info      = 0;
   866 
   867     // check probes
   868     if (probes) {
   869       iter(probes, P) {
   870         cast(smallDictt*,p,P);
   871         if (not isOSmallDict(p)) {
   872           logE("Invalid probe at index %d", iI(probes));
   873           XFailure;
   874         }
   875         // check all used ports tcp and tcp6
   876         staticBitsetT(usedPortst, u64 , 65536);
   877         usedPortst usedPorts;
   878         staticBitsetInit(&usedPorts);
   879         cleanAllocateSmallArray(tcp);
   880         readFileG(tcp, "/proc/net/tcp");
   881         delElemG(tcp, 0);
   882         iter(tcp, L) {
   883           castS(l, L);
   884           //    0: 00000000:0016 00000000:0000 0A 00000000:00000000 00:00000000 00000000     0        0 14737 1 ffff8d88f5265800 100 0 0 10 0
   885           cleanSmallArrayP(ll) = splitG(l, ' ');
   886           compactG(ll);
   887           //logVarG(ll);
   888           char *localAddr = $(ll, 1);
   889           char *portStart = localAddr + 8;
   890           *(portStart-1)  = '0';
   891           *portStart      = 'x';
   892           //logVarG(localAddr);
   893           u16 port        = parseHex(portStart-1);
   894           //logVarG(port);
   895           staticBitset1(&usedPorts, port);
   896         }
   897         emptyG(tcp);
   898         readFileG(tcp, "/proc/net/tcp6");
   899         delElemG(tcp, 0);
   900         iter(tcp, L) {
   901           castS(l, L);
   902           //    0: 00000000000000000000000000000000:0050 00000000000000000000000000000000:0000 0A 00000000:00000000 00:00000000 00000000     0        0 12592 1 ffff8d88f6b7c940 100 0 0 10 0
   903           cleanSmallArrayP(ll) = splitG(l, ' ');
   904           compactG(ll);
   905           //logVarG(ll);
   906           char *localAddr = $(ll, 1);
   907           char *portStart = localAddr + 32;
   908           // make sure port string starts with 0x, it should be 0xABCD
   909           *(portStart-1)  = '0';
   910           *portStart      = 'x';
   911           //logVarG(localAddr);
   912           u16 port        = parseHex(portStart-1);
   913           //logVarG(port);
   914           staticBitset1(&usedPorts, port);
   915         }
   916         if (hasG(p, "port")) {
   917           if (not staticBitsetGet(&usedPorts, u$(p, "port")))
   918             msg.status |= 1UL << iI(probes);
   919         }
   920         elif (hasG(p, "if")) {
   921           // TODO("check if network interface is down");
   922         }
   923       }
   924     } // if probes
   925 
   926     //logD("status %x", msg.status);
   927 
   928     if (not logger) {
   929       // send message
   930       // encrypt message
   931       randombytes_buf(keys.nonce, sizeof(keys.nonce));
   932       memcpy(packet.nonce, keys.nonce, sizeof(keys.nonce));
   933       payloadt payload;
   934       // set counter in encrypted message to avoid replay attacks
   935       payload.count = counter++;
   936       pError0(writeFile(counterfn, &counter, sizeof(counter)));
   937       payload.m    = msg;
   938       packet.len   = selPublicEncrypt(packet.buf, sizeof(packet.buf), (u8*)&payload, sizeof(payload), &keys);
   939       // send message
   940       if (sendto(sock, &packet, packet.len + sizeof(packet.nonce) + sizeof(packet.id) + sizeof(packet.len), 0, (const struct sockaddr *)&server, sizeof(server)) < 0) {
   941         logE("send failed: %s", strerror(errno));
   942         close(sock);
   943         XFailure;
   944       }
   945     }
   946     else {
   947       // this is the logger
   948       // write messages directly
   949       bool saveEvent = no;
   950       record.m       = msg;
   951       record.time    = getCurrentUnixTime();
   952       cleanFinishSmallDictP(logAgent) = getG(completeCfg, rtSmallDictt, name);
   953       setG(logAgent, "time", record.time);
   954       char *newstate = messageId ? "alive" : "init";
   955       if (!eqG($(logAgent, "state"), newstate)) {
   956         saveEvent = yes;
   957         /* logD("%s is %s, previous state was %s", */
   958         /*     name#<{(|agent name|)}>#, */
   959         /*     newstate, */
   960         /*     $(logAgent,"state")); */
   961         setG(logAgent, "state", newstate);
   962         setG(logAgent, "last", record.time);
   963       }
   964 
   965       cleanAllocateSmallArray(mailMsg);
   966 
   967       // update logger state and send mail if changed
   968       if (probes) {
   969         iter(probes, P) {
   970           cast(smallDictt*,p,P);
   971           bool newstate = msg.status & (1UL << iI(probes));
   972           if (getG(p, rtBool, "state") != newstate) {
   973             saveEvent = yes;
   974             setG(p, "state", newstate);
   975             setG(p, "last", record.time);
   976             // send mail when service is down
   977             char *state = newstate ? "down" : "up";
   978             char *result = newstate ? "failed" : "ok";
   979             if (hasG(p, "port")) {
   980               cleanCharP(s) = formatS("Service on port %d running in %s is %s", u$(p, "port"), name, state);
   981               pushG(mailMsg, s);
   982               logW("%s", s);
   983             }
   984             elif (hasG(p, "if")) {
   985               cleanCharP(s) = formatS("Network interface %s in %s is down", u$(p, "if"), name);
   986               pushG(mailMsg, s);
   987               logW("%s", s);
   988             }
   989             elif (hasG(p, "cmd")) {
   990               cleanCharP(s) = formatS("'%s' running in %s is %s", u$(p, "cmd"), name, result);
   991               pushG(mailMsg, s);
   992               logW("%s", s);
   993             }
   994           }
   995         }
   996       } // if probes
   997 
   998       if (saveEvent) {
   999         write(agentf[id], &record, sizeof(record));
  1000       }
  1001 
  1002       // set remote agents down if there hasn't been a message in a while
  1003       iter(completeCfg, D) {
  1004         if (!isOSmallDict(D)) continue;
  1005         cast(smallDictt*,d,D);
  1006         if (eqG($(d, "state"), "down") or eqG($(d, "state"), "unknown")) continue;
  1007         u64 timeout = u$(d, "timeout") ? u$(d, "timeout") : agentTimeOut;
  1008         if (getCurrentUnixTime() - u$(d, "time") < timeout) continue;
  1009         // send mail when agent is down
  1010         cleanCharP(s) = formatS("%s is down, previous state was %s", iK(completeCfg)/*agent name*/, $(d,"state"));
  1011         pushG(mailMsg, s);
  1012         logW("%s", s);
  1013         setG(d, "state", "down");
  1014         setG(d, "last", getCurrentUnixTime());
  1015       }
  1016 
  1017       // send mail
  1018       if (lenG(mailMsg)) {
  1019         pError0(writeFileG(mailMsg, "mail.txt"));
  1020         cleanFinishSmallArrayP(mails) = getG(logAgent, rtSmallArrayt, "mails");
  1021         char *email                   = $(mails, 0);
  1022         cleanCharP(mutt) = formatS("mutt -s \"%s\" %s < mail.txt", mailSubject, email);
  1023         sendMail(mutt);
  1024       }
  1025 
  1026       // send to monitor
  1027       //lv(completeCfg);
  1028       cleanCharP(monitorData) = toStringG(completeCfg);
  1029       if (sendto(sock, monitorData, lenG(monitorData)+1, 0, (const struct sockaddr *)&server, sizeof(server)) < 0) {
  1030         logE("monitor send failed: %s", strerror(errno));
  1031         close(sock);
  1032         XFailure;
  1033       }
  1034     }
  1035 
  1036     // bridge messages from others
  1037     // set timeout to period time, to sleep enough time
  1038     setTimeout(bridgesock, period, 0);
  1039     listenToOthers:;
  1040     socklen_t addr_size = sizeof(client);
  1041     memset(&data, 0, sizeof(data));
  1042     ssize_t r = recvfrom(bridgesock, &data, sizeof(data), 0, (struct sockaddr *) &client, &addr_size);
  1043     if (r == -1) {
  1044       // timeout
  1045       goto cont;
  1046     }
  1047     elif (r != data.len + sizeof(data.nonce) + sizeof(data.id) + sizeof(data.len)) {
  1048       char *ip = inet_ntoa((client).sin_addr);
  1049       logE("Dropped packet with id %d from ip %s. Incorrect size, received %d and expected %d", data.id, ip, r, data.len + sizeof(data.nonce) + sizeof(data.id) + sizeof(data.len));
  1050       goto cont;
  1051     }
  1052     else {
  1053       // got a message
  1054       // forward message when acting as a bridge
  1055       //lv(r);
  1056       if (logger) {
  1057         // when agent is logger, store message
  1058         //logD("store message");
  1059         if (data.id >= ARRAY_SIZE(agentf)) {
  1060           char *ip = inet_ntoa((client).sin_addr);
  1061           logE("Invalid id: %d, ip %s, packet size %d", data.id, ip, r);
  1062           // drop packet
  1063         }
  1064         elif (data.id == id) {
  1065           char *ip = inet_ntoa((client).sin_addr);
  1066           logE("Invalid id, the logger doesn't send packets, ip %s", ip);
  1067           // drop packet
  1068         }
  1069         else {
  1070           // agent id is valid
  1071           // decrypt message
  1072           // copy agent public key to keys.remotePublicKey
  1073           memcpy(keys.remotePublicKey, agentPublicKeys[data.id], sizeof(keys.remotePublicKey));
  1074 
  1075           // copy nonce
  1076           memcpy(keys.nonce, data.nonce, sizeof(keys.nonce));
  1077 
  1078           payloadt payload;
  1079           int len = selPublicDecrypt((u8*)&payload, sizeof(payload), (u8*)&data.buf, data.len, &keys);
  1080 
  1081           if (!len) {
  1082             char *ip = inet_ntoa((client).sin_addr);
  1083             logE("Failed to decrypt, ip %s, agent %s", ip, agents[data.id]);
  1084             // drop packet
  1085             goto handleEventLoopSleep;
  1086           }
  1087 
  1088           // update state for this agent
  1089           cleanFinishSmallDictP(agent) = getG(completeCfg, rtSmallDictt, agents[data.id]);
  1090 
  1091           // packet is valid only if the counter is higher than the previous one
  1092           if (u$(agent, "count") and payload.count <= u$(agent, "count")) {
  1093             // packet counter is already initialized
  1094             char *ip = inet_ntoa((client).sin_addr);
  1095             logW("Dropping packet. Wrong remote packet count %"PRIu64" local %"PRIu64" diff %"PRIi64", ip %s, agent %s", payload.count, u$(agent, "count"), (i64)u$(agent, "count") - (i64)payload.count, ip, agents[data.id]);
  1096             goto handleEventLoopSleep;
  1097           }
  1098           setG(agent, "count", payload.count);
  1099 
  1100           // message is decrypted
  1101           msgt *m = &payload.m;
  1102           /* logD("got message from id %d, agent %s", data.id, agents[data.id]); */
  1103           /* logD("with message id %d", m->messageId); */
  1104           /* logD("and status %x", m->status); */
  1105 
  1106           bool saveEvent = no;
  1107           record.m       = *m;
  1108           record.time    = getCurrentUnixTime();
  1109 
  1110           cleanAllocateSmallArray(mailMsg);
  1111 
  1112           // count packets in each period, if more than 2 packets arrive during a period,
  1113           // something is wrong.
  1114           // Don't measure time between packet because
  1115           // they don't arrive regurlarly, some packets are delayed
  1116           /* TODO use specific period for agent */
  1117           u64 expectedNextTime = getG(agent, rtU64, "mono") + (u64)period * 1000000000UL;
  1118           time                 = getMonotonicTime();
  1119           if (time > expectedNextTime) {
  1120             // reset packet counter
  1121             setG(agent, "c", 0);
  1122             setG(agent, "mono", time);
  1123           }
  1124           else {
  1125             var c = getG(agent, rtU64P, "c");
  1126             if (c) {
  1127               inc *c;
  1128               if (*c > 2) {
  1129                 char *ip = inet_ntoa((client).sin_addr);
  1130                 logW("Too many packets from agent %s and ip %s, drop packet.", agents[data.id], ip);
  1131                 goto cont;
  1132               }
  1133             }
  1134           }
  1135           bool loggerRunning = u$(agent, "time");
  1136           setG(agent, "time", record.time);
  1137           char *newstate = m->messageId ? "alive" : "init";
  1138           // last message id, if next mId under last one, the agent rebooted
  1139           if (m->messageId < u$(agent,"mId")) {
  1140             saveEvent = yes;
  1141             setG(agent, "rebooted", TRUE);
  1142             setG(agent, "lastBoot", record.time);
  1143             // send mail when agent rebooted
  1144             cleanCharP(s) = formatS("%s rebooted", agents[data.id]);
  1145             pushG(mailMsg, s);
  1146             logW("%s", s);
  1147           }
  1148           elif (    eqG($(agent, "state"), "down")
  1149                 and eqG(newstate, "alive")
  1150                 and loggerRunning) {
  1151             // there is a network issue when the agent was down
  1152             // and messageId is higher than the last received messageId
  1153             // (it didn't reboot)
  1154             // when starting logger, agent state is down
  1155             // if the agents started running before logger, the agent states are alive
  1156             // in that case mails are not sent because loggerRunning is false
  1157             saveEvent = yes;
  1158             setG(agent, "net", TRUE);
  1159             setG(agent, "lastNet", record.time);
  1160             // send mail when agent has network issues
  1161             cleanCharP(s) = formatS("%s has network issues", agents[data.id]);
  1162             pushG(mailMsg, s);
  1163             logW("%s", s);
  1164           }
  1165           setG(agent, "mId", m->messageId);
  1166           // update state
  1167           if (!eqG($(agent, "state"), newstate)) {
  1168             saveEvent = yes;
  1169             setG(agent, "state", newstate);
  1170             setG(agent, "last", record.time);
  1171           }
  1172           cleanFinishSmallArrayP(probes) = getG(agent, rtSmallArrayt, "probes");
  1173           if (probes) {
  1174             iter(probes, P) {
  1175               cast(smallDictt*,p,P);
  1176               bool newstate = m->status & (1UL << iI(probes));
  1177               if (getG(p, rtBool, "state") != newstate) {
  1178                 saveEvent = yes;
  1179                 setG(p, "state", newstate);
  1180                 setG(p, "last", record.time);
  1181                 // send mail when service is down
  1182                 char *state = newstate ? "down" : "up";
  1183                 char *result = newstate ? "failed" : "ok";
  1184                 if (hasG(p, "port")) {
  1185                   cleanCharP(s) = formatS("Service on port %d running in %s is %s", u$(p, "port"), agents[data.id], state);
  1186                   pushG(mailMsg, s);
  1187                   logW("%s", s);
  1188                 }
  1189                 elif (hasG(p, "if")) {
  1190                   cleanCharP(s) = formatS("Network interface %s in %s is down", u$(p, "if"), agents[data.id]);
  1191                   pushG(mailMsg, s);
  1192                   logW("%s", s);
  1193                 }
  1194                 elif (hasG(p, "cmd")) {
  1195                   cleanCharP(s) = formatS("'%s' running in %s is %s", u$(p, "cmd"), agents[data.id], result);
  1196                   pushG(mailMsg, s);
  1197                   logW("%s", s);
  1198                 }
  1199               }
  1200             }
  1201           } // if probes
  1202 
  1203           if (saveEvent) {
  1204             write(agentf[data.id], &record, sizeof(record));
  1205           }
  1206 
  1207           // send mail
  1208           if (lenG(mailMsg)) {
  1209             pError0(writeFileG(mailMsg, "mail.txt"));
  1210             cleanFinishSmallArrayP(mails) = getG(agent, rtSmallArrayt, "mails");
  1211             char *email                   = $(mails, 0);
  1212             cleanCharP(mutt) = formatS("mutt -s \"%s\" %s < mail.txt", mailSubject, email);
  1213             sendMail(mutt);
  1214           }
  1215 
  1216           // disable this, send to monitor only once for every period
  1217           // here it updates monitor for every packet received
  1218           /* // send to monitor */
  1219           /* //lv(completeCfg); */
  1220           /* cleanCharP(monitorData) = toStringG(completeCfg); */
  1221           /* if (sendto(sock, monitorData, lenG(monitorData)+1, 0, (const struct sockaddr *)&server, sizeof(server)) < 0) { */
  1222           /*   logE("monitor send failed: %s", strerror(errno)); */
  1223           /*   close(sock); */
  1224           /*   XFailure; */
  1225           /* } */
  1226         }
  1227       }
  1228       else {
  1229         // when agent is not logger, forward message
  1230         //logD("forward message");
  1231         if (sendto(sock, &data, r, 0, (const struct sockaddr *)&server, sizeof(server)) < 0) {
  1232           logE("send failed: %s", strerror(errno));
  1233           close(sock);
  1234           XFailure;
  1235         }
  1236       }
  1237       handleEventLoopSleep:
  1238       time = getMonotonicTime();
  1239       if (time < nextTime) {
  1240         // adjust timeout to have period delay between the sent messages from this agent
  1241         u64 delta = nextTime - time;
  1242         u64 sec   = delta / 1000000000UL;
  1243         u64 usec  = (delta - sec * 1000000000UL) / 1000;
  1244         /* lv(sec); */
  1245         /* lv(usec); */
  1246         setTimeout(bridgesock, sec, usec);
  1247         goto listenToOthers;
  1248       }
  1249     }
  1250 
  1251     cont:;
  1252   }
  1253 
  1254   // close files
  1255   if (logger) {
  1256     arange(i, agentf) {
  1257       close(agentf[i]);
  1258     }
  1259   }
  1260   close(sock);
  1261 }
  1262 
  1263 /* send mail in another process
  1264  * fork before sending mail to not block the event loop
  1265  */
  1266 void sendMail(char *cmd) {
  1267   pid_t pid;
  1268 
  1269   pid = fork();
  1270   switch (pid) {
  1271     case -1:
  1272       logE("spawnProc error: %s", strerror(errno));
  1273       XFAILURE
  1274     case 0:;
  1275       loop(5) {
  1276         var r = logSystem(cmd);
  1277         if (!r) break;
  1278         sleep(5);
  1279       }
  1280       XSUCCESS;
  1281     default:
  1282       break;
  1283   }
  1284 
  1285   return;
  1286 }
  1287 // vim: set expandtab ts=2 sw=2: