💾 Archived View for gmi.noulin.net › gitRepositories › heartbeat › file › heartbeat.c.gmi captured on 2024-06-20 at 11:55:52. Gemini links have been rewritten to link to archived content
-=-=-=-=-=-=-
heartbeat.c (41009B)
1 #! /usr/bin/env sheepy 2 /* or direct path to sheepy: #! /usr/local/bin/sheepy */ 3 4 /* Libsheepy documentation: https://spartatek.se/libsheepy/ */ 5 #include <unistd.h> // for daemon 6 #include "libsheepyObject.h" 7 #include "shpPackages/short/short.h" 8 #include "shpPackages/termbox/termbox.h" 9 10 // In monitor, do not clear port state when c is pressed 11 // >> when port state is on, a service is not running and it needs to be fixed 12 13 /* 14 Commands: 15 no command: load config.yml 16 probe and send messages to bridge, forward messages to bridge 17 if logger, store messages, forward messages to monitor 18 config: generate configuration yml files for each machine 19 copy configuration to all machines 20 copy heartbeat executable 21 daemon: run as daemon 22 monitor: display state 23 24 25 msgt is the packets sent with udp from the agents 26 dbt is the biggest possible packet, it is written in the db files when there is a state 27 change (event). 28 */ 29 30 31 // cfgFile is the main configuration file written by the user 32 // it used in the config command to generate agentCfgFile for each machine 33 #define cfgFile "heartbeatConfig.yml" 34 35 // cleartime is used to ignore events that happened before cleartime 36 // press c to set cleartime to now 37 #define monitorCleartimeFile "cleartime.json" 38 39 // when running as a daemon, expanding ~/ doesn't work 40 #define uHome "/root/" 41 // monitor user home, monitor doesn't run as a daemon and 42 // can run as a normal user 43 #define muHome "~/" 44 #define home ".heartbeat" 45 46 // agentCfgFile is the default configuration file 47 #define agentCfgPath home"/config.yml" 48 #define agentCfgFile uHome agentCfgPath 49 #define binPath "bin/" 50 #define BIN uHome binPath 51 52 // keys 53 #define SECRET_KEY "secret.bin" 54 #define PUBLIC_KEY "public.bin" 55 #define LOGGER_PUBLIC_KEY "loggerPublic.bin" 56 57 // counter filename for encryption to avoid replay attacks 58 // using the clock fails (as in previous commit) because on some VMs, the clock jumps 59 #define counterFilename "counter.bin" 60 61 #define defaultPeriod 1 62 // TODO #define defaultPeriod 120 63 64 // 10mn timeout 65 #define agentTimeOut 8 66 //TODO #define agentTimeOut 600 67 68 int argc; char **argv; 69 70 /* enable/disable logging */ 71 /* #undef pLog */ 72 /* #define pLog(...) */ 73 74 void printHelp(void); 75 // generate configurations and setup script 76 void config(smallJsont *cfg); 77 // run commands with ssh on agents 78 void runcommand(void); 79 // show state 80 void monitor(void); 81 // send state to logger, as logger collect packets from agents 82 void probe(char *cfgfile, char *secretFile, char *publicFile, char *loggerPublicFile); 83 84 /* process arguments on command line 85 */ 86 int main(int ARGC, char** ARGV) { 87 88 argc = ARGC; argv = ARGV; 89 90 //initLibsheepy(ARGV[0]); 91 // don't initialize threads 92 // there are issues with tpool waiting for threads to finish forever 93 initLibsheepyF(ARGV[0], null); 94 setLogMode(LOG_VERBOSE); 95 //setLogMode(LOG_FUNC); 96 //openProgLogFile(); 97 setLogSymbols(LOG_UTF8); 98 //disableLibsheepyErrorLogs; 99 100 if (argc == 1) { 101 runProbes:; 102 // no arguments 103 cleanCharP(p) = expandHome(agentCfgFile); 104 if (!isPath(p)) { 105 logE("Missing configuration: %s", p); 106 ret 1; 107 } 108 cleanCharP(sf) = expandHome(uHome home "/" SECRET_KEY); 109 if (!isPath(sf)) { 110 logE("Missing configuration: %s", sf); 111 ret 1; 112 } 113 cleanCharP(pf) = expandHome(uHome home "/" PUBLIC_KEY); 114 if (!isPath(pf)) { 115 logE("Missing configuration: %s", pf); 116 ret 1; 117 } 118 cleanCharP(lf) = expandHome(uHome home "/" LOGGER_PUBLIC_KEY); 119 if (!isPath(lf)) { 120 freen(lf); 121 } 122 probe(p, sf, pf, lf); 123 ret 0; 124 } 125 elif (eqG(argv[1], "daemon")) { 126 cleanCharP(h) = expandHome(uHome home "/heartbeat.log"); 127 setLogFile(h); 128 setLogStdout(no); 129 #define KEEP_CWD 1 130 #define STDIO_DEV_NULL 0 131 daemon(KEEP_CWD, STDIO_DEV_NULL); 132 goto runProbes; 133 } 134 elif (eqG(argv[1], "monitor")) { 135 monitor(); 136 } 137 elif (eqG(argv[1], "config")) { 138 cleanAllocateSmallJson(cfg); 139 smallJsont *r = readFileG(cfg, cfgFile); 140 141 if (!r) { 142 logE("Missing "cfgFile); 143 ret 1; 144 } 145 config(cfg); 146 } 147 elif (eqG(argv[1], "command")) { 148 // run command on all machines 149 if (argc != 3) { 150 logE("Missing or too many arguments"); 151 printHelp(); 152 ret 1; 153 } 154 runcommand(); 155 } 156 elif ( eqG(argv[1], "help") 157 or eqG(argv[1], "-h") 158 or eqG(argv[1], "--help") 159 or eqG(argv[1], "-?") 160 or eqG(argv[1], "?")) { 161 printHelp(); 162 } 163 else { 164 logE("Invalid command: %s", argv[1]); 165 printHelp(); 166 ret 1; 167 } 168 169 ret 0; 170 } 171 172 void printHelp(void) { 173 puts(BLD GRN"Help"RST); 174 TODO("write help"); 175 } 176 177 #include "sel.h" 178 179 /* generate setup for all agents 180 */ 181 void config(smallJsont *cfg) { 182 183 //lv(cfg); 184 if (not selInit()) { 185 logE("selInit failed"); 186 XFailure; 187 } 188 189 // generate agent configs: agentNameConfig.yml 190 // add shell commands to copyScript 191 192 // generate agent configs: agentNameConfig.yml 193 u16 port = u$(cfg, "port"); 194 if (!port) { 195 logE("Missing port."); 196 XFailure; 197 } 198 delElemG(cfg, "port"); 199 200 // add shell commands to copyScript 201 cleanAllocateSmallArray(copyScript); 202 203 const char *loggerName = null; 204 char *loggerTransfer = null; 205 char *loggerPublicKey = null; 206 cleanAllocateSmallArray(keyFilenames); 207 208 // id is stored in the agent config and sent in the messages 209 // it is the index of the machine in the list 210 u32 id = 0; 211 iter(cfg, D) { 212 cast(smallDictt*, d, D); 213 //logD("key %s", iK(cfg)); 214 215 // generate configuration file for agent 216 cleanCharP(thisCfgFile) = catS(iK(cfg), "Config.yml"); 217 //lv(thisCfgFile); 218 219 cleanAllocateSmallJson(thisCfg); 220 cleanFinishSmallJsonP(agentCfg) = allocG(rtSmallJsont); 221 setG(thisCfg, "port", (u32)port); 222 223 // populate agent config 224 setG(agentCfg, "id", id); 225 226 if (hasG(d, "logger")) { 227 setG(agentCfg, "logger", TRUE); 228 // TODO("add monitor ip and port in logger agent"); 229 loggerName = iK(cfg); 230 } 231 232 // set bridge address 233 if (hasG(d, "bridge")) { 234 cleanCharP(key) = formatS("\"%s\".\"address\"", $(d, "bridge")); 235 char *address = $(cfg, key); 236 //lv(key); 237 //lv(address); 238 setG(agentCfg, "bridge", address); 239 } 240 241 if (hasG(d, "probes")) { 242 setNFreeG(agentCfg, "probes", getNDupG(d, rtSmallArrayt, "probes")); 243 } 244 245 setG(thisCfg, iK(cfg), agentCfg); 246 //lv(thisCfg); 247 writeFileG(thisCfg, thisCfgFile); 248 249 // generate keys 250 cleanCharP(secretFilename) = catS(iK(cfg), "Secret.bin"); 251 cleanCharP(publicFilename) = catS(iK(cfg), "Public.bin"); 252 pushG(keyFilenames, secretFilename); 253 pushG(keyFilenames, publicFilename); 254 keyst keys = init0Var; 255 newKeysBuf(&keys); 256 pError0(writeFile(secretFilename, keys.secretKey, sizeof(keys.secretKey))); 257 pError0(writeFile(publicFilename, keys.publicKey, sizeof(keys.publicKey))); 258 259 // generate commands to setup the agents 260 pushNFreeG(copyScript, 261 formatS("# %s", iK(cfg))); 262 if (!$(d, "transfers")) { 263 // ssh/scp 264 pushNFreeG(copyScript, 265 formatS("ssh %s mkdir "home, iK(cfg))); 266 if (hasG(d, "logger")) { 267 pushNFreeG(copyScript, 268 formatS("scp "cfgFile" %s:" home "/" cfgFile, iK(cfg))); 269 loggerTransfer = "scp"; 270 loggerPublicKey = $(keyFilenames, -1); 271 } 272 else { 273 if (!loggerPublicKey) { 274 logC("Put logger configuration first in the `"cfgFile"` yml file."); 275 XFailure; 276 } 277 // copy logger public key to agent 278 pushNFreeG(copyScript, 279 formatS("scp %s %s:" home "/" LOGGER_PUBLIC_KEY, loggerPublicKey, iK(cfg))); 280 } 281 pushNFreeG(copyScript, 282 formatS("scp %s %s:" home "/" SECRET_KEY, secretFilename, iK(cfg))); 283 pushNFreeG(copyScript, 284 formatS("scp %s %s:" home "/" PUBLIC_KEY, publicFilename, iK(cfg))); 285 pushNFreeG(copyScript, 286 formatS("scp %s %s:" agentCfgPath, thisCfgFile, iK(cfg))); 287 pushNFreeG(copyScript, 288 formatS("scp heartbeat %s:" binPath, iK(cfg))); 289 pushNFreeG(copyScript, 290 formatS("scp heartbeat.service %s:/etc/systemd/system/", iK(cfg))); 291 pushNFreeG(copyScript, 292 formatS("ssh %s \"systemctl daemon-reload ; systemctl enable heartbeat ; systemctl start heartbeat\"", iK(cfg))); 293 } 294 elif (eqG($(d, "transfers"), "copy")) { 295 // cp same machine 296 pushG(copyScript, 297 "mkdir "uHome home); 298 if (hasG(d, "logger")) { 299 pushG(copyScript, 300 "cp "cfgFile" "uHome home "/" cfgFile); 301 loggerTransfer = "ssh"; 302 loggerPublicKey = $(keyFilenames, -1); 303 } 304 else { 305 if (!loggerPublicKey) { 306 logC("Put logger configuration first in the `"cfgFile"` yml file."); 307 XFailure; 308 } 309 // copy logger public key to agent 310 pushNFreeG(copyScript, 311 formatS("cp %s " uHome home "/" LOGGER_PUBLIC_KEY, loggerPublicKey)); 312 } 313 pushNFreeG(copyScript, 314 formatS("cp %s " uHome home "/" SECRET_KEY, secretFilename)); 315 pushNFreeG(copyScript, 316 formatS("cp %s " uHome home "/" PUBLIC_KEY, publicFilename)); 317 pushNFreeG(copyScript, 318 formatS("cp %s " agentCfgFile, thisCfgFile)); 319 pushNFreeG(copyScript, 320 formatS("cp heartbeat " BIN)); 321 pushG(copyScript, 322 "cp heartbeat.service /etc/systemd/system/"); 323 pushG(copyScript, 324 "systemctl daemon-reload ; systemctl enable heartbeat ; systemctl start heartbeat"); 325 } 326 inc id; 327 } 328 329 // copy all keys to logger 330 // logger needs to know the public key for all agents 331 iter(keyFilenames, F) { 332 pushNFreeG(copyScript, 333 eqG(loggerTransfer, "scp") ? formatS("scp %s %s:" home "/", ssGet(F), loggerName) : 334 formatS("cp %s " uHome home "/", ssGet(F))); 335 } 336 337 logG(copyScript); 338 } 339 340 void runcommand(void) { 341 cleanAllocateSmallJson(cfg); 342 cleanCharP(p) = expandHome(uHome home "/" cfgFile); 343 smallJsont *r = readFileG(cfg, p); 344 if (!r) { 345 logE("Missing %s", p); 346 XFailure; 347 } 348 delElemG(cfg, "port"); 349 350 iter(cfg, D) { 351 cast(smallDictt*, d, D); 352 if (hasG(d, "logger")) continue; 353 // no, can kill itself with pkill - logSystem(argv[2]); 354 //else { 355 logSystemf("ssh %s \"%s\"", iK(cfg)/*agent*/, argv[2]); 356 //} 357 } 358 } 359 360 #include <sys/types.h> 361 #include <sys/socket.h> 362 #include <netinet/in.h> 363 #include <netdb.h> 364 #include <sys/stat.h> 365 #include <fcntl.h> 366 #include <arpa/inet.h> // for inet_ntoa 367 368 void monitor(void) { 369 // load config 370 // find logger config 371 // create monitor socket 372 // events older than clearTime are not displayed 373 // load cleartime 374 // setup terminal 375 // infinite loop 376 377 cleanAllocateSmallJson(cfg); 378 cleanCharP(p) = expandHome(muHome home "/" cfgFile); 379 smallJsont *r = readFileG(cfg, p); 380 if (!r) { 381 logE("Missing %s", p); 382 XFailure; 383 } 384 delElemG(cfg, "port"); 385 386 // find logger config 387 const char *loggerName = null; 388 iter(cfg, D) { 389 cast(smallDictt*, d, D); 390 if (hasG(d, "logger")) { 391 loggerName = iK(cfg); 392 break; 393 } 394 } 395 // create monitor socket 396 cleanFinishSmallDictP(logger) = getG(cfg, rtSmallDictt, loggerName); 397 u16 port = u$(logger, "monitorPort"); 398 399 int sock; 400 struct sockaddr_in server; 401 struct sockaddr_in client; 402 char buf[65536]; 403 404 sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); 405 if (sock < 0){ 406 logE("Failed to create server socket: %s", strerror(errno)); 407 XFailure; 408 } 409 410 server.sin_family = AF_INET; 411 server.sin_addr.s_addr = INADDR_ANY; 412 server.sin_port = htons(port); 413 414 if (bind(sock, (struct sockaddr *) &server, sizeof(server))){ 415 logE("bind failed: %s, port %d", strerror(errno), port); 416 XFailure; 417 } 418 419 // events older than clearTime are not displayed 420 time_t clearTime = 0; 421 422 // load cleartime 423 cleanAllocateSmallJson(ct); 424 cleanCharP(ctPath) = expandHome(muHome home "/" monitorCleartimeFile); 425 if (isPath(ctPath)) { 426 readFileG(ct, ctPath); 427 clearTime = getTopG(ct, rtU64); 428 } 429 430 // setup terminal 431 int R = tb_init(); 432 if (R) { 433 logE("tb_init() failed with error code %d\n", R); 434 XFailure; 435 } 436 437 tb_select_output_mode(TB_OUTPUT_TRUECOLOR); 438 439 // infinite loop 440 cleanAllocateSmallJson(state); 441 442 tb_stringf(0, 0, TB_WHITE, TB_DEFAULT, "%s", "No state received from logger yet..."); 443 tb_present(); 444 445 // when the first udp packet arrives, the message above is erased 446 bool gotAMessage = no; 447 448 struct tb_event ev; 449 while (tb_poll_event(&ev, sock)) { 450 switch (ev.type) { 451 case TB_EVENT_KEY: 452 switch (ev.key) { 453 case TB_KEY_ESC: 454 goto done; 455 } 456 if (ev.ch == 'q') goto done; 457 elif (ev.ch == 'c') { 458 clearTime = getCurrentUnixTime(); 459 setTopG(ct, clearTime); 460 writeFileG(ct, ctPath); 461 } 462 break; 463 case TB_EVENT_RESIZE: 464 //draw_all(); 465 break; 466 case TB_EVENT_SOCKET: 467 if (!gotAMessage) { 468 // erase default message 469 tb_stringf(0, 0, TB_WHITE, TB_DEFAULT, "%s", "Agent State Last change Boot Boot time Probes"); 470 gotAMessage = yes; 471 } 472 pError0(ZEROVAR(buf)); 473 socklen_t addr_size = sizeof(client); 474 ssize_t r = recvfrom(sock, buf, sizeof(buf), 0, (struct sockaddr *) &client, &addr_size); 475 if (r == -1) { 476 logE("recvfrom: %s", strerror(errno)); 477 continue; 478 } 479 parseG(state, buf); 480 u16 line = 1; 481 iter(state, D) { 482 if (!isOSmallDict(D)) continue; 483 cast(smallDictt*, d, D); 484 u16 x = 0; 485 tb_stringf(x, line, TB_WHITE, TB_DEFAULT, "%20s", iK(state) /*agent name*/); 486 x += 21; 487 u32 color, bgcolor; 488 if (eqG($(d, "state"), "alive")) { 489 color = TB_WHITE; 490 bgcolor = TB_BLACK; 491 } 492 elif (eqG($(d, "state"), "init")) { 493 color = TB_GREEN; 494 bgcolor = TB_BLACK; 495 } 496 elif (eqG($(d, "state"), "down")) { 497 color = TB_WHITE; 498 bgcolor = TB_RED; 499 } 500 else { 501 color = TB_BLACK; 502 bgcolor = TB_RED; 503 } 504 tb_stringf(x, line, color, bgcolor, "%5s", $(d, "state")); 505 x += 6; 506 cleanCharP(lastStateChange) = timeToYMDS(u$(d, "last")); 507 tb_stringf(x, line, TB_WHITE, TB_DEFAULT, "%19s", lastStateChange); 508 x += 20; 509 //tb_stringf(47, line, TB_WHITE, TB_DEFAULT, "%d", u$(d, "mId")); 510 char *rebootorNet = ""; 511 cleanCharP(lastBoot) = null; 512 if (getG(d, rtBool, "rebooted") and u$(d, "lastBoot") > clearTime) { 513 color = TB_WHITE; 514 bgcolor = TB_RED; 515 rebootorNet = "rebooted"; 516 lastBoot = timeToYMDS(u$(d, "lastBoot")); 517 } 518 elif (getG(d, rtBool, "net") and u$(d, "lastNet") > clearTime) { 519 u64 t = u$(d, "lastNet"); 520 // check network issues 521 color = TB_WHITE; 522 bgcolor = TB_RED; 523 rebootorNet = "network"; 524 lastBoot = timeToYMDS(u$(d, "lastNet")); 525 } 526 else { 527 color = TB_WHITE; 528 bgcolor = TB_BLACK; 529 lastBoot = timeToYMDS(u$(d, "lastBoot")); 530 } 531 tb_stringf(x, line, color, bgcolor, "%8s", rebootorNet); 532 x += 19; 533 tb_stringf(x, line, TB_WHITE, TB_DEFAULT, "%19s", lastBoot); 534 x += 20; 535 cleanFinishSmallArrayP(probes) = getG(d, rtSmallArrayt, "probes"); 536 if (probes) { 537 iter(probes, P) { 538 cast(smallDictt*,p,P); 539 if (getG(p, rtBool, "state")) { 540 // service down 541 color = TB_WHITE; 542 bgcolor = TB_RED; 543 } 544 else { 545 // service up 546 color = TB_WHITE; 547 bgcolor = TB_BLACK; 548 } 549 tb_stringf(x, line, color, bgcolor, "port %5d", u$(p, "port")); 550 x += 11; 551 } 552 } 553 inc line; 554 } 555 break; 556 } 557 tb_present(); 558 } 559 560 done: 561 tb_shutdown(); 562 } 563 564 565 void sendMail(char *cmd); 566 567 void setTimeout(int bridgesock, u32 period, u32 usec) { 568 struct timeval timeout = {.tv_sec = period, .tv_usec = usec}; 569 if (setsockopt(bridgesock, SOL_SOCKET, SO_RCVTIMEO, (char *) &timeout, sizeof(timeout)) < 0) { 570 logE("Could not set receive socket time out: %s", strerror(errno)); 571 close(bridgesock); 572 XFailure; 573 } 574 if (setsockopt(bridgesock, SOL_SOCKET, SO_SNDTIMEO, (char *) &timeout, sizeof(timeout)) < 0) { 575 logE("Could not set send socket time out: %s", strerror(errno)); 576 close(bridgesock); 577 XFailure; 578 } 579 } 580 581 /* agent, bridge and logger 582 */ 583 void probe(char *cfgfile, char *secretFile, char *publicFile, char *loggerPublicFile) { 584 585 // load agent config 586 // get port and agent name 587 // setup agent 588 // setup logger 589 // create client socket 590 // create socket for brigde (only if not logger) 591 // start server or bridge for the agents 592 // message structures 593 // infinite loop 594 // check probes 595 // send message 596 // this is the logger 597 // write messages directly 598 // update logger state and send mail if changed 599 // set remote agents down if there hasn't been a message in a while 600 // send mail 601 // bridge messages from others 602 // when agent is logger, store message 603 // update state for this agent 604 // send mail when agent rebooted 605 // send mail when service is down 606 // send mail 607 // when agent is not logger, forward message 608 609 if (not selInit()) { 610 logE("selInit failed"); 611 XFailure; 612 } 613 614 u64 counter = 0; 615 cleanCharP(counterfn) = expandHome(uHome home counterFilename); 616 if (isPath(counterfn)) { 617 // load counter value from disk 618 pError(bLReadFile(counterfn, &counter, sizeof(counter))); 619 } 620 621 keyst keys = init0Var; 622 623 // load keys 624 pError(bLReadFile(secretFile, keys.secretKey, sizeof(keys.secretKey))); 625 pError(bLReadFile(publicFile, keys.publicKey, sizeof(keys.publicKey))); 626 if (loggerPublicFile) { 627 pError(bLReadFile(loggerPublicFile, keys.remotePublicKey, sizeof(keys.remotePublicKey))); 628 } 629 630 // complete configuration in logger 631 cleanSmallJsonP(completeCfg) = null; 632 633 // load agent config 634 cleanAllocateSmallJson(cfg); 635 readFileG(cfg, cfgfile); 636 //lv(cfg); 637 638 // get port and agent name 639 u16 port = u$(cfg, "port"); 640 if (!port) { 641 logE("Missing port."); 642 XFailure; 643 } 644 delElemG(cfg, "port"); 645 cleanListP(names) = keysG(cfg); 646 //lv(names); 647 if (lenG(names) != 1) { 648 logE("There should only and at least one agent configured: %d", lenG(names)); 649 XFailure; 650 } 651 652 // this agent name in the configuration 653 char *name = names[0]; 654 655 // setup agent 656 cleanFinishSmallDictP(agent) = getG(cfg, rtSmallDictt, name); 657 //lv(agent); 658 if (!agent) { 659 logE("Agent object has wrong type, should be smallDict."); 660 XFailure; 661 } 662 663 u32 id = u$(agent, "id"); 664 bool logger = hasG(agent, "logger"); 665 u16 monitorPort = 0; 666 if (logger) { 667 // load complete configuration to get monitorPort 668 completeCfg = allocG(rtSmallJsont); 669 cleanCharP(p) = expandHome(uHome home "/" cfgFile); 670 smallJsont *r = readFileG(completeCfg, p); 671 if (!r) { 672 logE("Missing %s", p); 673 XFailure; 674 } 675 delElemG(completeCfg, "port"); 676 677 // find logger config 678 const char *loggerName = null; 679 iter(completeCfg, D) { 680 cast(smallDictt*, d, D); 681 if (hasG(d, "logger")) { 682 loggerName = iK(completeCfg); 683 break; 684 } 685 } 686 cleanFinishSmallDictP(logger) = getG(completeCfg, rtSmallDictt, loggerName); 687 688 // set monitor port 689 monitorPort = u$(logger, "monitorPort"); 690 if (!monitorPort) { 691 logE("monitorPort missing in logger configuration"); 692 XFailure; 693 } 694 } 695 char *dst = logger ? "localhost" /*TODO add monitorIp*/ : $(agent, "bridge"); 696 if (!dst) { 697 logE("Missing bridge address."); 698 XFailure; 699 } 700 cleanFinishSmallArrayP(probes) = getG(agent, rtSmallArrayt, "probes"); 701 if (probes and !lenG(probes)) { 702 finishG(probes); 703 probes = null; 704 } 705 u32 period = hasG(agent, "period") ? u$(agent, "period") : defaultPeriod; 706 /* lv(id); */ 707 /* lv(logger); */ 708 /* lv(dst); */ 709 //lv(probes); 710 //lv(period); 711 712 // setup logger 713 // agent names 714 cleanListP(agents) = null; 715 // agent db filenames 716 cleanListP(agentdbs) = null; 717 // when logger was started 718 u64 startTime = 0; 719 cleanCharP(mailSubject) = null; 720 721 if (logger) { 722 startTime = getCurrentUnixTime(); 723 724 // create filename with path in ~/.heartbeat/ 725 agents = keysG(completeCfg); 726 agentdbs = keysG(completeCfg); 727 cleanCharP(h) = expandHome(uHome home "/"); 728 forEachCharP(agentdbs, ag) { 729 pErrorNULL(iPrependS(ag, h)); 730 pErrorNULL(iAppendS(ag, ".bin")); 731 } 732 //lv(agentdbs); 733 734 // mail setup 735 pError0(chDir(h)); 736 mailSubject = catS("Heatbeat on ", name); 737 } 738 // open log files and load agent public keys 739 int agentf[lenG(agents)]; 740 u8 agentPublicKeys[lenG(agents)][sizeof(keys.remotePublicKey)]; 741 if (logger) { 742 arange(i, agentf) { 743 agentf[i] = open(agentdbs[i], O_APPEND|O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR); 744 if (agentf[i] < 0) { 745 logE("Failed to open %s: %s", agentdbs[i], strerror(errno)); 746 XFailure; 747 } 748 cleanCharP(publicFilename) = catS(uHome home "/", agents[i], "Public.bin"); 749 cleanCharP(pfn) = expandHome(publicFilename); 750 pError(bLReadFile(pfn, agentPublicKeys[i], sizeof(agentPublicKeys[i]))); 751 } 752 // setup objects for monitor 753 setG(completeCfg, "startTime", startTime); 754 iter(completeCfg, D) { 755 if (!isOSmallDict(D)) continue; 756 cast(smallDictt*,d,D); 757 setG(d, "state", "unknown"); // network state 758 setG(d, "last", startTime); // network state change 759 setG(d, "mId", 0); // last message id, if next mId under last one, the agent rebooted 760 setG(d, "rebooted", FALSE); 761 setG(d, "lastBoot", startTime); 762 setG(d, "time", 0); // last incoming message time 763 setG(d, "count", 0); // packet counter from agent to avoid replay attacks 764 setG(d, "c", 0); // packet counter under a period 765 setG(d, "mono", 0); // current period start time 766 setG(d, "net", FALSE); // there is a network issue, when agent goes from down to alive and next mId is higher than last mId 767 setG(d, "lastNet", startTime); 768 cleanFinishSmallArrayP(probes) = getG(d, rtSmallArrayt, "probes"); 769 if (probes) { 770 iter(probes, P) { 771 cast(smallDictt*,p,P); 772 setG(p, "state", FALSE); 773 setG(p, "last", startTime); 774 setPG(probes, iI(probes), p); 775 } 776 } 777 setPG(completeCfg, iK(completeCfg), d); 778 } 779 } 780 781 782 // create client socket 783 // sock is client socket or monitor socket on logger 784 int sock = -1; 785 struct sockaddr_in server; 786 struct hostent *hp; 787 788 sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); 789 if (sock < 0) { 790 logE("Failed to create socket: %s", strerror(errno)); 791 XFailure; 792 } 793 794 server.sin_family = AF_INET; 795 796 hp = gethostbyname(dst); 797 if (hp==0) { 798 logE("gethostbyname failed: %s", strerror(errno)); 799 close(sock); 800 XFailure; 801 } 802 803 memcpy(&server.sin_addr, hp->h_addr, hp->h_length); 804 server.sin_port = htons(logger ? monitorPort : port); 805 806 // create socket for brigde (only if not logger) 807 // start server or bridge for the agents 808 int bridgesock; 809 struct sockaddr_in bridge; 810 struct sockaddr_in client; 811 812 bridgesock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); 813 if (bridgesock < 0){ 814 logE("Failed to create bridge socket: %s", strerror(errno)); 815 XFailure; 816 } 817 818 bridge.sin_family = AF_INET; 819 bridge.sin_addr.s_addr = INADDR_ANY; 820 bridge.sin_port = htons(port); 821 822 if (bind(bridgesock, (struct sockaddr *) &bridge, sizeof(bridge))){ 823 logE("Bridge bind failed: %s, port %d", strerror(errno), port); 824 XFailure; 825 } 826 827 // packet structure 828 typ struct PACKED { 829 u8 nonce[crypto_box_NONCEBYTES]; 830 u32 id; 831 i32 len; 832 u8 buf[64*1024]; // buf holds the encrypted data (it is slightly bigger than the payload) 833 } packett; 834 // message structures 835 typ struct PACKED { 836 u64 messageId; 837 u64 status; 838 u32 info; 839 } msgt; 840 typ struct PACKED { 841 u64 count; 842 msgt m; 843 } payloadt; 844 typ struct PACKED { 845 u64 time; 846 u32 id; 847 msgt m; 848 u32 levels[32]; 849 } dbt; 850 packett packet = {.id = id}; // packet to send, id is the agent id 851 packett data = init0Var; // received packet 852 msgt msg = init0Var; // message to send 853 dbt record = init0Var; // record to write on disk in logger 854 //lv(sizeof(msg)); 855 856 // infinite loop 857 rangeInf(messageId) { 858 //lv(messageId); 859 860 u64 time = getMonotonicTime(); 861 u64 nextTime = time + (u64)period * 1000000000UL; 862 863 msg.messageId = messageId; 864 msg.status = 0; 865 msg.info = 0; 866 867 // check probes 868 if (probes) { 869 iter(probes, P) { 870 cast(smallDictt*,p,P); 871 if (not isOSmallDict(p)) { 872 logE("Invalid probe at index %d", iI(probes)); 873 XFailure; 874 } 875 // check all used ports tcp and tcp6 876 staticBitsetT(usedPortst, u64 , 65536); 877 usedPortst usedPorts; 878 staticBitsetInit(&usedPorts); 879 cleanAllocateSmallArray(tcp); 880 readFileG(tcp, "/proc/net/tcp"); 881 delElemG(tcp, 0); 882 iter(tcp, L) { 883 castS(l, L); 884 // 0: 00000000:0016 00000000:0000 0A 00000000:00000000 00:00000000 00000000 0 0 14737 1 ffff8d88f5265800 100 0 0 10 0 885 cleanSmallArrayP(ll) = splitG(l, ' '); 886 compactG(ll); 887 //logVarG(ll); 888 char *localAddr = $(ll, 1); 889 char *portStart = localAddr + 8; 890 *(portStart-1) = '0'; 891 *portStart = 'x'; 892 //logVarG(localAddr); 893 u16 port = parseHex(portStart-1); 894 //logVarG(port); 895 staticBitset1(&usedPorts, port); 896 } 897 emptyG(tcp); 898 readFileG(tcp, "/proc/net/tcp6"); 899 delElemG(tcp, 0); 900 iter(tcp, L) { 901 castS(l, L); 902 // 0: 00000000000000000000000000000000:0050 00000000000000000000000000000000:0000 0A 00000000:00000000 00:00000000 00000000 0 0 12592 1 ffff8d88f6b7c940 100 0 0 10 0 903 cleanSmallArrayP(ll) = splitG(l, ' '); 904 compactG(ll); 905 //logVarG(ll); 906 char *localAddr = $(ll, 1); 907 char *portStart = localAddr + 32; 908 // make sure port string starts with 0x, it should be 0xABCD 909 *(portStart-1) = '0'; 910 *portStart = 'x'; 911 //logVarG(localAddr); 912 u16 port = parseHex(portStart-1); 913 //logVarG(port); 914 staticBitset1(&usedPorts, port); 915 } 916 if (hasG(p, "port")) { 917 if (not staticBitsetGet(&usedPorts, u$(p, "port"))) 918 msg.status |= 1UL << iI(probes); 919 } 920 elif (hasG(p, "if")) { 921 // TODO("check if network interface is down"); 922 } 923 } 924 } // if probes 925 926 //logD("status %x", msg.status); 927 928 if (not logger) { 929 // send message 930 // encrypt message 931 randombytes_buf(keys.nonce, sizeof(keys.nonce)); 932 memcpy(packet.nonce, keys.nonce, sizeof(keys.nonce)); 933 payloadt payload; 934 // set counter in encrypted message to avoid replay attacks 935 payload.count = counter++; 936 pError0(writeFile(counterfn, &counter, sizeof(counter))); 937 payload.m = msg; 938 packet.len = selPublicEncrypt(packet.buf, sizeof(packet.buf), (u8*)&payload, sizeof(payload), &keys); 939 // send message 940 if (sendto(sock, &packet, packet.len + sizeof(packet.nonce) + sizeof(packet.id) + sizeof(packet.len), 0, (const struct sockaddr *)&server, sizeof(server)) < 0) { 941 logE("send failed: %s", strerror(errno)); 942 close(sock); 943 XFailure; 944 } 945 } 946 else { 947 // this is the logger 948 // write messages directly 949 bool saveEvent = no; 950 record.m = msg; 951 record.time = getCurrentUnixTime(); 952 cleanFinishSmallDictP(logAgent) = getG(completeCfg, rtSmallDictt, name); 953 setG(logAgent, "time", record.time); 954 char *newstate = messageId ? "alive" : "init"; 955 if (!eqG($(logAgent, "state"), newstate)) { 956 saveEvent = yes; 957 /* logD("%s is %s, previous state was %s", */ 958 /* name#<{(|agent name|)}>#, */ 959 /* newstate, */ 960 /* $(logAgent,"state")); */ 961 setG(logAgent, "state", newstate); 962 setG(logAgent, "last", record.time); 963 } 964 965 cleanAllocateSmallArray(mailMsg); 966 967 // update logger state and send mail if changed 968 if (probes) { 969 iter(probes, P) { 970 cast(smallDictt*,p,P); 971 bool newstate = msg.status & (1UL << iI(probes)); 972 if (getG(p, rtBool, "state") != newstate) { 973 saveEvent = yes; 974 setG(p, "state", newstate); 975 setG(p, "last", record.time); 976 // send mail when service is down 977 char *state = newstate ? "down" : "up"; 978 char *result = newstate ? "failed" : "ok"; 979 if (hasG(p, "port")) { 980 cleanCharP(s) = formatS("Service on port %d running in %s is %s", u$(p, "port"), name, state); 981 pushG(mailMsg, s); 982 logW("%s", s); 983 } 984 elif (hasG(p, "if")) { 985 cleanCharP(s) = formatS("Network interface %s in %s is down", u$(p, "if"), name); 986 pushG(mailMsg, s); 987 logW("%s", s); 988 } 989 elif (hasG(p, "cmd")) { 990 cleanCharP(s) = formatS("'%s' running in %s is %s", u$(p, "cmd"), name, result); 991 pushG(mailMsg, s); 992 logW("%s", s); 993 } 994 } 995 } 996 } // if probes 997 998 if (saveEvent) { 999 write(agentf[id], &record, sizeof(record)); 1000 } 1001 1002 // set remote agents down if there hasn't been a message in a while 1003 iter(completeCfg, D) { 1004 if (!isOSmallDict(D)) continue; 1005 cast(smallDictt*,d,D); 1006 if (eqG($(d, "state"), "down") or eqG($(d, "state"), "unknown")) continue; 1007 u64 timeout = u$(d, "timeout") ? u$(d, "timeout") : agentTimeOut; 1008 if (getCurrentUnixTime() - u$(d, "time") < timeout) continue; 1009 // send mail when agent is down 1010 cleanCharP(s) = formatS("%s is down, previous state was %s", iK(completeCfg)/*agent name*/, $(d,"state")); 1011 pushG(mailMsg, s); 1012 logW("%s", s); 1013 setG(d, "state", "down"); 1014 setG(d, "last", getCurrentUnixTime()); 1015 } 1016 1017 // send mail 1018 if (lenG(mailMsg)) { 1019 pError0(writeFileG(mailMsg, "mail.txt")); 1020 cleanFinishSmallArrayP(mails) = getG(logAgent, rtSmallArrayt, "mails"); 1021 char *email = $(mails, 0); 1022 cleanCharP(mutt) = formatS("mutt -s \"%s\" %s < mail.txt", mailSubject, email); 1023 sendMail(mutt); 1024 } 1025 1026 // send to monitor 1027 //lv(completeCfg); 1028 cleanCharP(monitorData) = toStringG(completeCfg); 1029 if (sendto(sock, monitorData, lenG(monitorData)+1, 0, (const struct sockaddr *)&server, sizeof(server)) < 0) { 1030 logE("monitor send failed: %s", strerror(errno)); 1031 close(sock); 1032 XFailure; 1033 } 1034 } 1035 1036 // bridge messages from others 1037 // set timeout to period time, to sleep enough time 1038 setTimeout(bridgesock, period, 0); 1039 listenToOthers:; 1040 socklen_t addr_size = sizeof(client); 1041 memset(&data, 0, sizeof(data)); 1042 ssize_t r = recvfrom(bridgesock, &data, sizeof(data), 0, (struct sockaddr *) &client, &addr_size); 1043 if (r == -1) { 1044 // timeout 1045 goto cont; 1046 } 1047 elif (r != data.len + sizeof(data.nonce) + sizeof(data.id) + sizeof(data.len)) { 1048 char *ip = inet_ntoa((client).sin_addr); 1049 logE("Dropped packet with id %d from ip %s. Incorrect size, received %d and expected %d", data.id, ip, r, data.len + sizeof(data.nonce) + sizeof(data.id) + sizeof(data.len)); 1050 goto cont; 1051 } 1052 else { 1053 // got a message 1054 // forward message when acting as a bridge 1055 //lv(r); 1056 if (logger) { 1057 // when agent is logger, store message 1058 //logD("store message"); 1059 if (data.id >= ARRAY_SIZE(agentf)) { 1060 char *ip = inet_ntoa((client).sin_addr); 1061 logE("Invalid id: %d, ip %s, packet size %d", data.id, ip, r); 1062 // drop packet 1063 } 1064 elif (data.id == id) { 1065 char *ip = inet_ntoa((client).sin_addr); 1066 logE("Invalid id, the logger doesn't send packets, ip %s", ip); 1067 // drop packet 1068 } 1069 else { 1070 // agent id is valid 1071 // decrypt message 1072 // copy agent public key to keys.remotePublicKey 1073 memcpy(keys.remotePublicKey, agentPublicKeys[data.id], sizeof(keys.remotePublicKey)); 1074 1075 // copy nonce 1076 memcpy(keys.nonce, data.nonce, sizeof(keys.nonce)); 1077 1078 payloadt payload; 1079 int len = selPublicDecrypt((u8*)&payload, sizeof(payload), (u8*)&data.buf, data.len, &keys); 1080 1081 if (!len) { 1082 char *ip = inet_ntoa((client).sin_addr); 1083 logE("Failed to decrypt, ip %s, agent %s", ip, agents[data.id]); 1084 // drop packet 1085 goto handleEventLoopSleep; 1086 } 1087 1088 // update state for this agent 1089 cleanFinishSmallDictP(agent) = getG(completeCfg, rtSmallDictt, agents[data.id]); 1090 1091 // packet is valid only if the counter is higher than the previous one 1092 if (u$(agent, "count") and payload.count <= u$(agent, "count")) { 1093 // packet counter is already initialized 1094 char *ip = inet_ntoa((client).sin_addr); 1095 logW("Dropping packet. Wrong remote packet count %"PRIu64" local %"PRIu64" diff %"PRIi64", ip %s, agent %s", payload.count, u$(agent, "count"), (i64)u$(agent, "count") - (i64)payload.count, ip, agents[data.id]); 1096 goto handleEventLoopSleep; 1097 } 1098 setG(agent, "count", payload.count); 1099 1100 // message is decrypted 1101 msgt *m = &payload.m; 1102 /* logD("got message from id %d, agent %s", data.id, agents[data.id]); */ 1103 /* logD("with message id %d", m->messageId); */ 1104 /* logD("and status %x", m->status); */ 1105 1106 bool saveEvent = no; 1107 record.m = *m; 1108 record.time = getCurrentUnixTime(); 1109 1110 cleanAllocateSmallArray(mailMsg); 1111 1112 // count packets in each period, if more than 2 packets arrive during a period, 1113 // something is wrong. 1114 // Don't measure time between packet because 1115 // they don't arrive regurlarly, some packets are delayed 1116 /* TODO use specific period for agent */ 1117 u64 expectedNextTime = getG(agent, rtU64, "mono") + (u64)period * 1000000000UL; 1118 time = getMonotonicTime(); 1119 if (time > expectedNextTime) { 1120 // reset packet counter 1121 setG(agent, "c", 0); 1122 setG(agent, "mono", time); 1123 } 1124 else { 1125 var c = getG(agent, rtU64P, "c"); 1126 if (c) { 1127 inc *c; 1128 if (*c > 2) { 1129 char *ip = inet_ntoa((client).sin_addr); 1130 logW("Too many packets from agent %s and ip %s, drop packet.", agents[data.id], ip); 1131 goto cont; 1132 } 1133 } 1134 } 1135 bool loggerRunning = u$(agent, "time"); 1136 setG(agent, "time", record.time); 1137 char *newstate = m->messageId ? "alive" : "init"; 1138 // last message id, if next mId under last one, the agent rebooted 1139 if (m->messageId < u$(agent,"mId")) { 1140 saveEvent = yes; 1141 setG(agent, "rebooted", TRUE); 1142 setG(agent, "lastBoot", record.time); 1143 // send mail when agent rebooted 1144 cleanCharP(s) = formatS("%s rebooted", agents[data.id]); 1145 pushG(mailMsg, s); 1146 logW("%s", s); 1147 } 1148 elif ( eqG($(agent, "state"), "down") 1149 and eqG(newstate, "alive") 1150 and loggerRunning) { 1151 // there is a network issue when the agent was down 1152 // and messageId is higher than the last received messageId 1153 // (it didn't reboot) 1154 // when starting logger, agent state is down 1155 // if the agents started running before logger, the agent states are alive 1156 // in that case mails are not sent because loggerRunning is false 1157 saveEvent = yes; 1158 setG(agent, "net", TRUE); 1159 setG(agent, "lastNet", record.time); 1160 // send mail when agent has network issues 1161 cleanCharP(s) = formatS("%s has network issues", agents[data.id]); 1162 pushG(mailMsg, s); 1163 logW("%s", s); 1164 } 1165 setG(agent, "mId", m->messageId); 1166 // update state 1167 if (!eqG($(agent, "state"), newstate)) { 1168 saveEvent = yes; 1169 setG(agent, "state", newstate); 1170 setG(agent, "last", record.time); 1171 } 1172 cleanFinishSmallArrayP(probes) = getG(agent, rtSmallArrayt, "probes"); 1173 if (probes) { 1174 iter(probes, P) { 1175 cast(smallDictt*,p,P); 1176 bool newstate = m->status & (1UL << iI(probes)); 1177 if (getG(p, rtBool, "state") != newstate) { 1178 saveEvent = yes; 1179 setG(p, "state", newstate); 1180 setG(p, "last", record.time); 1181 // send mail when service is down 1182 char *state = newstate ? "down" : "up"; 1183 char *result = newstate ? "failed" : "ok"; 1184 if (hasG(p, "port")) { 1185 cleanCharP(s) = formatS("Service on port %d running in %s is %s", u$(p, "port"), agents[data.id], state); 1186 pushG(mailMsg, s); 1187 logW("%s", s); 1188 } 1189 elif (hasG(p, "if")) { 1190 cleanCharP(s) = formatS("Network interface %s in %s is down", u$(p, "if"), agents[data.id]); 1191 pushG(mailMsg, s); 1192 logW("%s", s); 1193 } 1194 elif (hasG(p, "cmd")) { 1195 cleanCharP(s) = formatS("'%s' running in %s is %s", u$(p, "cmd"), agents[data.id], result); 1196 pushG(mailMsg, s); 1197 logW("%s", s); 1198 } 1199 } 1200 } 1201 } // if probes 1202 1203 if (saveEvent) { 1204 write(agentf[data.id], &record, sizeof(record)); 1205 } 1206 1207 // send mail 1208 if (lenG(mailMsg)) { 1209 pError0(writeFileG(mailMsg, "mail.txt")); 1210 cleanFinishSmallArrayP(mails) = getG(agent, rtSmallArrayt, "mails"); 1211 char *email = $(mails, 0); 1212 cleanCharP(mutt) = formatS("mutt -s \"%s\" %s < mail.txt", mailSubject, email); 1213 sendMail(mutt); 1214 } 1215 1216 // disable this, send to monitor only once for every period 1217 // here it updates monitor for every packet received 1218 /* // send to monitor */ 1219 /* //lv(completeCfg); */ 1220 /* cleanCharP(monitorData) = toStringG(completeCfg); */ 1221 /* if (sendto(sock, monitorData, lenG(monitorData)+1, 0, (const struct sockaddr *)&server, sizeof(server)) < 0) { */ 1222 /* logE("monitor send failed: %s", strerror(errno)); */ 1223 /* close(sock); */ 1224 /* XFailure; */ 1225 /* } */ 1226 } 1227 } 1228 else { 1229 // when agent is not logger, forward message 1230 //logD("forward message"); 1231 if (sendto(sock, &data, r, 0, (const struct sockaddr *)&server, sizeof(server)) < 0) { 1232 logE("send failed: %s", strerror(errno)); 1233 close(sock); 1234 XFailure; 1235 } 1236 } 1237 handleEventLoopSleep: 1238 time = getMonotonicTime(); 1239 if (time < nextTime) { 1240 // adjust timeout to have period delay between the sent messages from this agent 1241 u64 delta = nextTime - time; 1242 u64 sec = delta / 1000000000UL; 1243 u64 usec = (delta - sec * 1000000000UL) / 1000; 1244 /* lv(sec); */ 1245 /* lv(usec); */ 1246 setTimeout(bridgesock, sec, usec); 1247 goto listenToOthers; 1248 } 1249 } 1250 1251 cont:; 1252 } 1253 1254 // close files 1255 if (logger) { 1256 arange(i, agentf) { 1257 close(agentf[i]); 1258 } 1259 } 1260 close(sock); 1261 } 1262 1263 /* send mail in another process 1264 * fork before sending mail to not block the event loop 1265 */ 1266 void sendMail(char *cmd) { 1267 pid_t pid; 1268 1269 pid = fork(); 1270 switch (pid) { 1271 case -1: 1272 logE("spawnProc error: %s", strerror(errno)); 1273 XFAILURE 1274 case 0:; 1275 loop(5) { 1276 var r = logSystem(cmd); 1277 if (!r) break; 1278 sleep(5); 1279 } 1280 XSUCCESS; 1281 default: 1282 break; 1283 } 1284 1285 return; 1286 } 1287 // vim: set expandtab ts=2 sw=2: