- set HTTP User-Agent header to check_otp/VERSION
- added SSL validation options (--no-ssl-validation, --cacert, --capath) to deal with SSL validation issues
This commit is contained in:
@@ -2,10 +2,10 @@
|
||||
#
|
||||
# check_otp - Nagios check plugin for LinOTP/PrivacyIDEA OTP validation
|
||||
#
|
||||
# Version 1.0, latest version, documentation and bugtracker available at:
|
||||
# Version 1.1, latest version, documentation and bugtracker available at:
|
||||
# https://gitlab.lindenaar.net/scripts/nagios-plugins
|
||||
#
|
||||
# Copyright (c) 2016 Frederik Lindenaar
|
||||
# Copyright (c) 2018 Frederik Lindenaar
|
||||
#
|
||||
# This script is free software: you can redistribute and/or modify it under the
|
||||
# terms of version 3 of the GNU General Public License as published by the Free
|
||||
@@ -25,14 +25,18 @@ from hashlib import sha1
|
||||
from getpass import getpass
|
||||
from urllib import urlencode
|
||||
from urllib2 import Request, HTTPError, URLError, urlopen
|
||||
from ssl import CertificateError, \
|
||||
create_default_context as create_default_SSL_context, \
|
||||
_create_unverified_context as create_unverified_SSL_context
|
||||
from base64 import b16decode, b32decode, standard_b64decode
|
||||
from argparse import ArgumentParser as StandardArgumentParser, FileType, \
|
||||
_StoreAction as StoreAction, _StoreConstAction as StoreConstAction
|
||||
|
||||
# Constants (no need to change but allows for easy customization)
|
||||
VERSION="1.0"
|
||||
VERSION="1.1"
|
||||
PROG_NAME=os.path.splitext(os.path.basename(__file__))[0]
|
||||
PROG_VERSION=PROG_NAME + ' ' + VERSION
|
||||
HTTP_AGENT=PROG_NAME + '/' + VERSION
|
||||
URL_API_SUFFIX='/validate/check'
|
||||
ENV_VAR_USER='USER_NAME'
|
||||
ENV_VAR_PWD='USER_PASSWORD'
|
||||
@@ -222,8 +226,18 @@ def parse_args():
|
||||
help='port number to connect to (only used with -H)')
|
||||
parser.add_argument('-P', '--path',
|
||||
help='URL path to be used (only used with -H)')
|
||||
|
||||
parser.add_argument('-S', '--no-ssl', action='store_true',
|
||||
help='connect WITHOUT SSL (only used with -H)')
|
||||
parser.add_argument('--no-ssl-validation',
|
||||
action='store_const', dest='sslcontext',
|
||||
const=create_unverified_SSL_context(),
|
||||
default=create_default_SSL_context(),
|
||||
help='Do not validate server SSL certificate (DANGEROUS)')
|
||||
parser.add_argument('--cacert',
|
||||
help='CA Certificate file for SSL server validation')
|
||||
parser.add_argument('--capath',
|
||||
help='CA Certificate directory for SSL server validation')
|
||||
|
||||
parser.add_argument('-n', '--name', default=envvar(ENV_VAR_NAME, PROG_NAME),
|
||||
help="name in authentication request for logging "
|
||||
@@ -319,6 +333,15 @@ def parse_args():
|
||||
if not isempty(args.path):
|
||||
args.url+= args.path if args.path[0]=='/' else '/' + args.path
|
||||
|
||||
# Pass-on CA file/path to SSLContext if provided on the command line
|
||||
if not isempty(args.cacert):
|
||||
try:
|
||||
args.sslcontext.load_verify_locations(cafile=args.cacert)
|
||||
except IOError as e:
|
||||
args.cmdparser.error("cafile '%s': %s" % (args.cacert, str(e)))
|
||||
elif not isempty(args.capath):
|
||||
args.sslcontext.load_verify_locations(capath=args.capath)
|
||||
|
||||
# We should now be ready to authenticate, fail if that's not the case
|
||||
if args.func == Password:
|
||||
if isempty(args.login) and isempty(args.serial) \
|
||||
@@ -336,7 +359,7 @@ def parse_args():
|
||||
return args
|
||||
|
||||
|
||||
def checkotp(url, subject, secret, isserial=False, nas=None):
|
||||
def checkotp(url, subject, secret, isserial=False, nas=None, sslcontext=create_default_SSL_context()):
|
||||
"""Check a subject (user or token) with secret against PrivacyIDEA / LinOTP.
|
||||
|
||||
Args:
|
||||
@@ -363,7 +386,9 @@ def checkotp(url, subject, secret, isserial=False, nas=None):
|
||||
v if k!='pass' else '***MASKED***') for k,v in params.iteritems()]))
|
||||
|
||||
# Perform the API authentication request
|
||||
response = json.load(urlopen(Request(url, data=urlencode(params))))
|
||||
response = json.load(urlopen(Request(url, data=urlencode(params),
|
||||
headers={'User-Agent':HTTP_AGENT}),
|
||||
context=sslcontext))
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logger.debug('result: %s', json.dumps(response, indent=4))
|
||||
|
||||
@@ -407,10 +432,10 @@ if __name__ == '__main__':
|
||||
response=checkotp(args.url,
|
||||
args.login if isempty(args.serial) else args.serial,
|
||||
secret, isserial=not isempty(args.serial),
|
||||
nas=args.name)
|
||||
nas=args.name, sslcontext=args.sslcontext)
|
||||
endtime = time()
|
||||
|
||||
except (HTTPError, URLError) as e:
|
||||
except (HTTPError, URLError, CertificateError) as e:
|
||||
nagios_exit(NAGIOS_CRITICAL,'%s request failed: %s' % (message, e))
|
||||
|
||||
except (KeyboardInterrupt, EOFError) as e:
|
||||
@@ -478,3 +503,4 @@ if __name__ == '__main__':
|
||||
|
||||
nagios_exit(nagiosresult, message, [
|
||||
('time', [ elapse, args.warn, args.critical, 0, None ])])
|
||||
|
||||
|
||||
Reference in New Issue
Block a user