💾 Archived View for radia.bortzmeyer.org › software › manisha › check_gemini.py captured on 2024-05-12 at 15:21:42.

View Raw

More Information

⬅️ Previous capture (2023-06-14)

-=-=-=-=-=-=-

#!/usr/bin/env python3

"""Monitoring plugin (Nagios-compatible) for watching a Gemini server

The monitoring plugin API is documented at
<https://www.monitoring-plugins.org/doc/guidelines.html>.

"""

import socket
import re
import sys
import datetime
import getopt
import urllib.parse

# https://www.pyopenssl.org/
# https://pyopenssl.readthedocs.io/
import OpenSSL

# https://framagit.org/bortzmeyer/agunua
import Agunua

# Do not touch
# https://www.monitoring-plugins.org/doc/guidelines.html#AEN78
STATE_OK = 0
STATE_WARNING = 1
STATE_CRITICAL = 2
STATE_UNKNOWN = 3
STATE_DEPENDENT = 4

# Can be changed from the command-line
host = None
vhostname = None
path = ""
expect = None
expect_statuscode = None
# "meta" is defined in Gemini protocol specification
expect_meta = None
sni = True
warn_cert = None
crit_cert = None
insecure = False
disable_tofu = False
accept_expired = False
socks = None
port = Agunua.GEMINI_PORT
forceIPv4 = False
forceIPv6 = False
get_content = False
err_message = ""

def error(msg=None):
    if msg is None:
        msg = "Unknown error"
    print("%s CRITICAL: %s" % (host, msg))
    sys.exit(STATE_CRITICAL)

def warning(msg=None):
    if msg is None:
        msg = "Unknown warning"
    print("%s WARNING: %s" % (host, msg))
    sys.exit(STATE_WARNING)

try:    
    try:
        optlist, args = getopt.getopt (sys.argv[1:], "hH:p:V:e:c:C:m:P:s:ghi46xaT")
        for option, value in optlist:
            if option == "-h":
                print("Standard Nagios API options")
                sys.exit(STATE_UNKNOWN)
            elif option == "-H":
                host = value
            elif option == "-V":
                vhostname = value
            elif option == "-e":
                expect = value
            elif option == "-c":
                expect_statuscode = value
            elif option == "-C":
                if value.find(",") < 0:
                    warn_cert = int(value)
                else:
                    (warning_c, critical_c) = value.split(",")
                    warn_cert = int(warning_c)
                    crit_cert = int(critical_c)
            elif option == "-m":
                expect_meta = value
            elif option == "-p":
                path = value
            elif option == "-P":
                port = int(value)
            elif option == "-i":
                insecure = True
            elif option == "-T":
                disable_tofu = True
            elif option == "-x":
                sni = False
            elif option == "-a":
                accept_expired = True
            elif option == "-s":
                match = re.search("^\[?(([\w\.]+)|(([a-fA-F0-9:]+))|([0-9\.]+))\]?:([0-9]+)$", value)
                if not match:
                    print("Socks must be host:port")
                    sys.exit(STATE_UNKNOWN)
                socks = (match.group(1), int(match.group(6)))
            elif option == "-4":
                forceIPv4 = True
            elif option == "-6":
                forceIPv6 = True
            elif option == '-g':
                get_content = True
            else:
                # Should never occur, it is trapped by getopt
                print("Unknown option %s" % option)
                sys.exit(STATE_UNKNOWN)
    except getopt.error as reason:
        print("Option parsing problem %s" % reason)
        sys.exit(STATE_UNKNOWN)
    if host is None:
        print("Host (-H) is necessary")
        sys.exit(STATE_UNKNOWN)
    if vhostname is None:
        vhostname = host
    if socks is not None and vhostname.lower().endswith(".onion"):
        host = vhostname
    if forceIPv4 and forceIPv6:
        print("Force IPv4 or IPv6 but not both")
        sys.exit(STATE_UNKNOWN)        
    if warn_cert is not None and crit_cert is not None and warn_cert <= crit_cert:
        print("Warning threshold must be higher than critical threshold")
        sys.exit(STATE_UNKNOWN)        
    if len(args) > 0:
        print("Too many arguments (\"%s\")" % args)
        sys.exit(STATE_UNKNOWN)

    if port == Agunua.GEMINI_PORT:
        netloc = vhostname
    else:
        netloc = "%s:%i" % (vhostname, port)
    url = urllib.parse.urlunsplit(("gemini", netloc, path, "", ""))
    if disable_tofu:
        tofu_path = ""
    else:
        tofu_path = Agunua.TOFU
    if expect != None:
        get_content = True
    start = datetime.datetime.now()
    result = Agunua.GeminiUri(url, get_content=get_content,
                              connect_to=host, use_socks=socks,
                              insecure=insecure, tofu=tofu_path,
                              accept_expired=accept_expired,
                              send_sni=sni, force_ipv4=forceIPv4,
                              force_ipv6=forceIPv6)
    if not result.network_success:
        error(result.error)
    end = datetime.datetime.now()
    if get_content:
        data = result.payload
    if expect_statuscode is not None:
        if result.status_code != expect_statuscode:
            error("Unexpected status code %s (expected was %s)" % \
                  (result.status_code, expect_statuscode))   
    elif result.status_code == "20": 
        if expect_meta is not None:
            f = result.meta.find(expect_meta)
            if f < 0:
                error("\"%s\" not found in meta (value is \"%s\")" % (expect_meta, result.meta))
        if expect is not None:
            f = data.find(expect)
            if f < 0:
                error("\"%s\" not found in answer" % expect)
    else:
        error("Status code %s: %s" % (result.status_code, result.meta))
    now = datetime.datetime.now()
    if crit_cert is not None and (result.cert_not_after <= now + datetime.timedelta(days=crit_cert)):
        error("Certificate expires in less than %i days" % crit_cert)
    if warn_cert is not None and (result.cert_not_after <= now + datetime.timedelta(days=warn_cert)):
        warning("Certificate expires in less than %i days" % warn_cert)
    latency = end-start
    elapsed = latency.seconds + latency.microseconds/(10**6)
    if get_content:
        size = " - %i bytes" % len(data)
    else:
        size = ""
    print("%s OK - %s%s - %.3f seconds" % (host, "No error", size, elapsed))
    sys.exit(STATE_OK)

except Exception as e:
    exc_type, exc_value, exc_traceback = sys.exc_info()
    print("%s UNKNOWN - Unknown internal error %s: %s" % (host, exc_type, exc_value)) 
    sys.exit(STATE_UNKNOWN)