481 lines
22 KiB
Python
Executable File
481 lines
22 KiB
Python
Executable File
#! /usr/bin/env python
|
|
#
|
|
# check_otp - Nagios check plugin for LinOTP/PrivacyIDEA OTP validation
|
|
#
|
|
# Version 1.0, latest version, documentation and bugtracker available at:
|
|
# https://gitlab.lindenaar.net/scripts/nagios-plugins
|
|
#
|
|
# Copyright (c) 2016 Frederik Lindenaar
|
|
#
|
|
# This script is free software: you can redistribute and/or modify it under the
|
|
# terms of version 3 of the GNU General Public License as published by the Free
|
|
# Software Foundation, or (at your option) any later version of the license.
|
|
#
|
|
# This script is distributed in the hope that it will be useful but WITHOUT ANY
|
|
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
|
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along with
|
|
# this program. If not, visit <http://www.gnu.org/licenses/> to download it.
|
|
|
|
import sys, os, logging, socket, hmac, json
|
|
from time import time
|
|
from struct import pack
|
|
from hashlib import sha1
|
|
from getpass import getpass
|
|
from urllib import urlencode
|
|
from urllib2 import Request, HTTPError, URLError, urlopen
|
|
from base64 import b16decode, b32decode, standard_b64decode
|
|
from argparse import ArgumentParser as StandardArgumentParser, FileType, \
|
|
_StoreAction as StoreAction, _StoreConstAction as StoreConstAction
|
|
|
|
# Constants (no need to change but allows for easy customization)
|
|
# Program identification (PROG_NAME is the script's file name without extension)
VERSION="1.0"
PROG_NAME=os.path.splitext(os.path.basename(__file__))[0]
PROG_VERSION=PROG_NAME + ' ' + VERSION
# Path appended to the server URL by checkotp() when the URL lacks it
URL_API_SUFFIX='/validate/check'
# Names of the environment variables providing defaults for the command line
ENV_VAR_USER='USER_NAME'
ENV_VAR_PWD='USER_PASSWORD'
ENV_VAR_SERIAL='TOKEN_SERIAL'
ENV_VAR_KEY='TOKEN_KEY'
ENV_VAR_NAME='CHECK_NAME'
# Defaults for HOTP() / TOTP(): OTP length, divisor applied to the time value
# to obtain the TOTP counter, and the digest used for the HMAC
OTP_DIGITS=6
OTP_TOTP_WINDOW=30
OTP_DIGEST_ALG=sha1
# Log formats for the console and (prefixed with a timestamp) for a log file
LOG_FORMAT='%(levelname)s - %(message)s'
LOG_FORMAT_FILE='%(asctime)s - ' + LOG_FORMAT
# Custom level above CRITICAL, selected by -q/--quiet to silence all output
LOGGING_NONE=logging.CRITICAL + 10
# Nagios results as (status label, exit code) pairs, consumed by nagios_exit()
NAGIOS_OK = ( 'OK', 0)
NAGIOS_WARN = ( 'WARNING', 1)
NAGIOS_CRITICAL = ( 'CRITICAL', 2 )
NAGIOS_UNKNOWN = ( 'UNKNOWN', 3 )
|
|
|
|
# Setup logging
|
|
# Install the default handler on the root logger using the console format
logging.basicConfig(format=LOG_FORMAT)
# Give the custom "silent" level a printable name
logging.addLevelName(LOGGING_NONE, 'NONE')
# Script-wide logger; starts at CRITICAL, the -q/-v/-d options change the level
logger = logging.getLogger(PROG_NAME)
logger.setLevel(logging.CRITICAL)
|
|
|
|
###################[ minimalistic HOTP/TOTP implementation ]###################
|
|
# based on Yoav Aner's code: http://blog.gingerlime.com/2010/once-upon-a-time #
|
|
# latest version: https://github.com/gingerlime/hotpie/blob/master/hotpie.py #
|
|
|
|
def HOTP(K, C, digits=OTP_DIGITS, digest=OTP_DIGEST_ALG):
    """Calculate the HOTP value for Key K and count C, returns OTP value

    Args:
        K          : binary (already decoded) shared secret key
        C (int)    : counter value, packed as unsigned 64-bit big-endian
        digits(int): number of digits of the OTP to return
        digest     : digest constructor passed to hmac as digestmod

    Returns:
        the OTP as a string of exactly `digits` characters (zero-padded)
    """
    mac = hmac.new(key=K, msg=pack(b"!Q", C), digestmod=digest).hexdigest()
    # Dynamic truncation: the low nibble of the last byte is a byte offset,
    # doubled here because every byte takes two characters in the hex digest
    start = int(mac[-1], 16) << 1
    # Take 4 bytes (8 hex characters) from the offset and clear the sign bit
    code = int(mac[start:start + 8], 16) & 0x7fffffff
    # Keep the last `digits` decimal digits; pad with leading zeros so a small
    # truncated value still results in an OTP of the full length
    return str(code)[-digits:].zfill(digits)
|
|
|
|
def TOTP(K, C=None, d=OTP_DIGITS, win=OTP_TOTP_WINDOW, dg=OTP_DIGEST_ALG):
    """Return the TOTP value for Key K, computed by HOTP() on a time counter

    The counter is the time value C (the current time when C is None) divided
    by the window `win`; `d` and `dg` are passed to HOTP() as digits / digest.
    """
    if C is None:
        moment = time()
    else:
        moment = C
    counter = int(moment / win)
    return HOTP(K, counter, digits=d, digest=dg)
|
|
|
|
###############################################################################
|
|
|
|
def Password(K, C):
    """Placeholder standing for plain Password authentication (no OTP)

    Accepts the same arguments as HOTP()/TOTP() but ignores them; there is no
    OTP to calculate so the result is always None.
    """
    return
|
|
|
|
###################[ patch to force urllib on ipv4 / ipv6 ]###################
|
|
# Support functions to allow forcing urllib2.urlopen() IPv4 or IPv6 connections
|
|
# based on http://stackoverflow.com/questions/2014534/force-python-mechanize-urllib2-to-only-use-a-requests/6319043#6319043
|
|
# the trick is to wrap the original socket.getaddrinfo and enforce the family
|
|
# Keep a reference to the real resolver and the address family to enforce
# (0 leaves the choice to the resolver) as attributes of the socket module
socket.origGetAddrInfo = socket.getaddrinfo
socket.getAddrInfoFamily = 0


def getAddrInfoWrapper(host, port, family=0, socktype=0, proto=0, flags=0):
    """Replacement for socket.getaddrinfo() enforcing the configured family

    When the caller does not request a family (family == 0) the value of
    socket.getAddrInfoFamily is used instead. The lookup itself is done by
    the original socket.getaddrinfo() and its result is returned unchanged.
    """
    if family == 0:
        family = socket.getAddrInfoFamily
    result = socket.origGetAddrInfo(host, port, family, socktype, proto, flags)
    if family == socket.AF_INET:
        template = 'connecting over IPv4 to %s:%d'
    elif family == socket.AF_INET6:
        template = 'connecting over IPv6 to %s:%d'
    else:
        template = 'connecting to %s:%d'
    # log address and port of the first entry returned by the resolver
    sockaddr = result[0][4]
    logger.debug(template, sockaddr[0], sockaddr[1])
    return result


# from here on every lookup through socket.getaddrinfo uses the wrapper
socket.getaddrinfo = getAddrInfoWrapper
|
|
###############################################################################
|
|
|
|
################[ wrapper to stop ArgumentParser from exiting ]################
|
|
# based on http://stackoverflow.com/questions/14728376/i-want-python-argparse-to-throw-an-exception-rather-than-usage/14728477#14728477
|
|
# the only way to do this is overriding the error method and throw an Exception
|
|
class ArgumentParserError(Exception):
    """Raised by ArgumentParser.error() instead of exiting the program"""


class ArgumentParser(StandardArgumentParser):
    """ArgumentParser reporting errors as exception rather than exiting"""

    def error(self, message):
        """Raise ArgumentParserError carrying the parser's error message"""
        failure = ArgumentParserError(message)
        raise failure
|
|
|
|
#################[ Action to force the socket address family ]##################
|
|
class SetSocketAddrFamily(StoreConstAction):
    """ArgumentParser action storing its const as socket.getAddrInfoFamily"""

    def __call__(self, parser, namespace, values, option_string=None):
        # takes effect immediately; nothing is stored in the namespace
        setattr(socket, 'getAddrInfoFamily', self.const)
|
|
|
|
##################[ Action to immediately set the log level ]##################
|
|
class SetLogLevel(StoreConstAction):
    """ArgumentParser action applying its const as the script's log level"""

    def __call__(self, parser, namespace, values, option_string=None):
        # takes effect immediately; nothing is stored in the namespace
        target = logging.getLogger(PROG_NAME)
        target.setLevel(self.const)
|
|
|
|
####################[ Action to immediately log to a file ]####################
|
|
class SetLogFile(StoreAction):
    """ArgumentParser action redirecting the script's logging to a file"""

    def __call__(self, parser, namespace, values, option_string=None):
        # store the file name in the namespace like a regular option
        super(SetLogFile, self).__call__(parser,namespace,values,option_string)
        filehandler = logging.FileHandler(values)
        filehandler.setFormatter(logging.Formatter(LOG_FORMAT_FILE))
        target = logging.getLogger(PROG_NAME)
        target.addHandler(filehandler)
        # stop records from also reaching the handlers of the root logger
        target.propagate = False
|
|
|
|
####################[ Action to load contents from a file ]####################
|
|
class LoadFromFile(StoreAction):
    """ArgumentParser action reading the first line of a file into an option

    The argument must be declared with a FileLoadType type, which names the
    namespace attribute to set and the conversion to apply to the line read.
    """

    def __init__(self,option_strings,dest,nargs=None,const=None,default=None,
                 type=None,choices=None,required=False,help=None,metavar=None):
        if not isinstance(type, FileLoadType):
            raise ArgumentParserError('LoadFromFile action option %s requires '
                        'type FileLoadType (got %s)' % (option_strings, type))
        super(LoadFromFile, self).__init__(option_strings, dest, nargs=nargs,
                        const=const, default=default, type=type,
                        choices=choices, required=required, help=help,
                        metavar=metavar)

    def __call__(self, parser, namespace, values, option_string=None):
        # the opened file itself is stored under the option's own name
        super(LoadFromFile,self).__call__(parser,namespace,values,option_string)
        loader = self.type
        source = values.name
        logger.info('reading %s from %s', loader.name, source)
        try:
            content = values.readline().strip()
            if loader.close:
                values.close()
            # an empty line yields whatever the converter returns without input
            if isempty(content):
                loaded = loader.type()
            else:
                loaded = loader.type(content)
            setattr(namespace, loader.name, loaded)
        except (IOError, ValueError) as e:
            raise ArgumentParserError('cannot read %s from %s: %s' %
                                      (loader.name, source, e))
|
|
|
|
####################[ Enhanced FileType for LoadFromFile ]####################
|
|
class FileLoadType(FileType):
    """ArgumentParser FileType carrying the settings used by LoadFromFile

    Attributes set: `name` (namespace attribute to fill), `type` (converter
    for the value read) and `close` (whether to close the file after reading,
    which defaults to True only for mode 'r').
    """

    def __init__(self, name, valuetype=str, mode='r', close=None, bufsize=-1):
        if close is None:
            close = (mode == 'r')
        self.name = name
        self.type = valuetype
        self.close = close
        super(FileLoadType, self).__init__(mode, bufsize)
|
|
|
|
##############[ Action to prompt for password if value is empty ]##############
|
|
class PasswordPrompt(StoreAction):
    """ArgumentParser action storing a password, prompting if none is given"""

    def __call__(self, parser, namespace, values, option_string=None):
        # values is None when the option was used without an argument
        if values is None:
            pwd = getpass('Please enter password: ')
        else:
            pwd = values
        super(PasswordPrompt, self).__call__(parser,namespace,pwd,option_string)
|
|
|
|
###############################################################################
|
|
|
|
|
|
def isempty(string):
    """Tell whether 'string' is unset (None) or has a length of zero"""
    if string is None:
        return True
    return len(string) == 0
|
|
|
|
|
|
def envvar(name, default=None):
    """Look up environment variable 'name', falling back to 'default'"""
    try:
        return os.environ[name]
    except KeyError:
        return default
|
|
|
|
|
|
def base16_32_64(string=None):
    """convert encoded string to binary, supports base16, base32 and base64

    The encodings are tried in the order base16 (case-insensitive), base32
    and base64; the first one that decodes the string is used.

    Args:
        string (str) : base 16/32/64 encoded string

    Returns:
        binary version of string, provided it is base 16/32/64 encoded,
        None when string is unset or empty

    Raises:
        ValueError: string could not be decoded with any of the encodings
    """
    if isempty(string):
        return None

    decoders = (
        ('base16', lambda text: b16decode(text, True)),
        ('base32', lambda text: b32decode(text, False)),
        ('base64', standard_b64decode),
    )
    for encoding, decode in decoders:
        try:
            value = decode(string)
        except (TypeError, ValueError):
            # depending on the Python version the decoders report invalid
            # input as TypeError or as a ValueError subclass, accept both
            continue
        # the key is a secret, never write it to the log (only its size)
        logger.debug("converted %s encoded key to %d bytes of binary data",
                     encoding, len(value))
        return value

    # ValueError is handled both by argparse (type=) and by LoadFromFile
    raise ValueError('key is not valid base16, base32 or base64 encoded data')
|
|
|
|
|
|
def parse_args():
    """Parse command line and get parameters from environment, if present

    Returns:
        the argparse namespace with all options; `url` is always set (built
        from host, port and path when not given), `func` is the function of
        the selected mode (Password, HOTP or TOTP) and `cmdparser` the parser
        of that mode

    Raises:
        ArgumentParserError: the command line is invalid or incomplete
    """

    # Setup argument parser, the workhorse gluing it all together
    parser = ArgumentParser(
        description='Nagios check for OTP validation against LinOTP/PrivacyIDEA'
    )
    parser.add_argument('-V', '--version',action="version",version=PROG_VERSION)

    pgroup = parser.add_mutually_exclusive_group(required=True)
    pgroup.add_argument('-u', '--url',
                        help='URL to check OTP authentication against')
    pgroup.add_argument('-H', '--host',
                        help='hostname to test against (to construct URL)')

    parser.add_argument('-p', '--port', type=int,
                        help='port number to connect to (only used with -H)')
    parser.add_argument('-P', '--path',
                        help='URL path to be used (only used with -H)')
    parser.add_argument('-S', '--no-ssl', action='store_true',
                        help='connect WITHOUT SSL (only used with -H)')

    parser.add_argument('-n', '--name', default=envvar(ENV_VAR_NAME, PROG_NAME),
                        help="name in authentication request for logging "
                             "(defaults to '%s')" % PROG_NAME)

    parser.add_argument('-w', '--warn', type=float,
                        help='Response time for warning status (seconds)')
    parser.add_argument('-c','--critical', type=float,
                        help='Response time for critical status (seconds)')

    pgroup = parser.add_mutually_exclusive_group(required=False)
    pgroup.add_argument('-4', '--ipv4', action=SetSocketAddrFamily,
                        const=socket.AF_INET, help='connect using IPv4')
    pgroup.add_argument('-6', '--ipv6', action=SetSocketAddrFamily,
                        const=socket.AF_INET6,help='connect using IPv6')

    pgroup = parser.add_mutually_exclusive_group(required=False)
    pgroup.add_argument('-q', '--quiet', default=logging.CRITICAL,
                        action=SetLogLevel, const=LOGGING_NONE,
                        help='quiet (no output, only exit with exit code)')
    pgroup.add_argument('-v', '--verbose', help='more verbose output',
                        action=SetLogLevel, const=logging.INFO)
    pgroup.add_argument('-d', '--debug', help='debug output (more verbose)',
                        action=SetLogLevel, const=logging.DEBUG)

    parser.add_argument('-l', '--logfile', action=SetLogFile,
                        help='send logging output to logfile')

    # Options shared by all test modes (used as parent of the sub-parsers)
    commonparser = ArgumentParser(add_help=False)
    pgroup = commonparser.add_mutually_exclusive_group(required=False)
    pgroup.add_argument('-l', '--login', default=envvar(ENV_VAR_USER),
                        help='username to login with, can only be omitted when '
                             '%s is set or -s/--serial is used' % ENV_VAR_USER)
    pgroup.add_argument('-s', '--serial', default=envvar(ENV_VAR_SERIAL),
                        help='token serial to use, can only be omitted when %s '
                             'is set or -l/--login is used' % ENV_VAR_SERIAL)

    pgroup = commonparser.add_mutually_exclusive_group(required=False)
    pgroup.add_argument('-p', '--password', nargs='?', action=PasswordPrompt,
                        default=envvar(ENV_VAR_PWD),
                        help='password or OTP+PIN to authenticate, uses env. '
                             'var %s if not present and prompts when PASSWORD '
                             'omitted (use "" for empty password)'%ENV_VAR_PWD)
    pgroup.add_argument('-P', '--passwordfile',
                        action=LoadFromFile,type=FileLoadType('password'),
                        help='read password/authentication secret from file')

    subparser = parser.add_subparsers(title='Implemented test modes / checks')

    cmdparser = subparser.add_parser('password', parents=[commonparser],
                                     help='perform test with provided secret')
    cmdparser.set_defaults(func=Password, cmdparser=cmdparser)

    # Options shared by the OTP based test modes (hotp and totp)
    otpparser = ArgumentParser(add_help=False)
    pgroup = otpparser.add_mutually_exclusive_group(required=False)
    pgroup.add_argument('-k', '--key', default=envvar(ENV_VAR_KEY),
                        type=base16_32_64, help='HOTP/TOTP key, can only be '
                        'omitted if %s is set or -K is used' % ENV_VAR_KEY)
    pgroup.add_argument('-K', '--keyfile', action=LoadFromFile,
                        type=FileLoadType('key', base16_32_64),
                        help='read HOTP key from the file specified')

    otpparser.add_argument('-m','--merge', choices=['pwdOTP', 'OTPpwd' ],
                        default='pwdOTP', help='how to merge password and OTP')

    cmdparser = subparser.add_parser('hotp', parents=[commonparser, otpparser],
                        help='perform HOTP check using provided key and count')

    pgroup = cmdparser.add_mutually_exclusive_group(required=True)
    pgroup.add_argument('-c', '--count', type=int,
                        help='count to be used to calculate the HOTP value')
    pgroup.add_argument('-C', '--countfile',type=FileLoadType('count',int,'r+'),
                        action=LoadFromFile,
                        help='read HOTP count from file and update it')
    # the default must match the value advertised in the help text
    cmdparser.add_argument('-i', '--increment', type=int, default=1,
                        help='increment value for count (default=1)')
    cmdparser.set_defaults(func=HOTP, cmdparser=cmdparser)

    cmdparser = subparser.add_parser('totp',parents=[commonparser, otpparser],
                                     help='perform TOTP test using provided key')
    cmdparser.set_defaults(func=TOTP, cmdparser=cmdparser)

    # parse arguments and post-process command line options
    args = parser.parse_args()

    # Not every argparse version enforces the presence of a sub-command
    func = getattr(args, 'func', None)
    if func is None:
        parser.error('no test mode specified (password, hotp or totp)')

    # Generate the URL if not provided on the command line
    if isempty(args.url):
        args.url = 'http://' if args.no_ssl else 'https://'
        args.url+= args.host
        # port is an int (type=int), so it has no length and needs formatting
        if args.port is not None:
            args.url+= ':%d' % args.port
        if not isempty(args.path):
            args.url+= args.path if args.path[0]=='/' else '/' + args.path

    # We should now be ready to authenticate, fail if that's not the case
    nosubject = isempty(args.login) and isempty(args.serial)
    if func == Password:
        if nosubject or isempty(args.password):
            args.cmdparser.error('user/serial and a secret are required!')

    elif func == HOTP or func == TOTP:
        if nosubject or isempty(args.key):
            args.cmdparser.error('user/serial and a key are required!')

    else:
        args.cmdparser.error("BUG: mode %s is not supported"%func.__name__)

    # if we got here all seems OK
    return args
|
|
|
|
|
|
def checkotp(url, subject, secret, isserial=False, nas=None):
    """Check a subject (user or token) with secret against PrivacyIDEA / LinOTP.

    Args:
        url (str)      : URL to connect to, URL_API_SUFFIX is added if missing
        subject (str)  : subject to authenticate (user or a token serial)
        secret (str)   : secret (password+OTP) to authenticate with
        isserial (bool): True if subject is a token serial (optional)
        nas (str)      : string to pass-on as the nas string (optional)

    Returns:
        The result response from the PrivacyIDEA server (mapping object)

    Raises:
        HTTPError, URLError: raised by urlopen() when the request fails
        ValueError: raised by json.load() when the reply is not valid JSON
    """
    from contextlib import closing

    # Complete (fix) URL, avoiding a double slash when url already ends in one
    if not url.endswith(URL_API_SUFFIX):
        url += URL_API_SUFFIX[1:] if url.endswith('/') else URL_API_SUFFIX
    logger.info('connecting to %s', url)

    # Prepare the parameters
    params = { 'pass': secret, 'serial' if isserial else 'user': subject }
    if not isempty(nas):
        params['nas'] = nas
    if logger.isEnabledFor(logging.DEBUG):
        # the secret is masked so it never ends up in the log
        logger.debug('HTTP request parameters: %s', ', '.join([ '%s=%s' % (k,
                v if k!='pass' else '***MASKED***') for k,v in params.items()]))

    # Perform the API authentication request (passing data makes it a POST),
    # making sure the connection is closed once the reply has been read
    with closing(urlopen(Request(url, data=urlencode(params)))) as reply:
        response = json.load(reply)
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug('result: %s', json.dumps(response, indent=4))

    return response
|
|
|
|
|
|
def nagios_exit(status, message, data=None):
    """Print status, message and performance data, then exit 'nagios-style'

    Args:
        status  : (label, exit code) pair, e.g. one of the NAGIOS_* constants
        message : text printed after the status label
        data    : optional sequence of (name, value) pairs for the performance
                  data; a list value is joined with ';' (None becomes empty)

    Nothing is printed unless the logger is enabled for CRITICAL, the script
    always exits with the exit code of status.
    """
    if logger.isEnabledFor(logging.CRITICAL):
        perfdata = ''
        if data is not None and len(data) > 0:
            fields = []
            for label, value in data:
                if isinstance(value, list):
                    value = ';'.join(['' if x is None else str(x)
                                      for x in value])
                fields.append("'%s'=%s" % (label, value))
            perfdata = ' | ' + ' '.join(fields)
        print('OTP %s: %s%s' % (status[0], message, perfdata))
    sys.exit(status[1])
|
|
|
|
|
|
if __name__ == '__main__':
    # Parse the command line, any problem is reported as Nagios UNKNOWN
    try:
        args = parse_args()
    except ArgumentParserError as e:
        nagios_exit(NAGIOS_UNKNOWN,'error with setup: ' + str(e))
    except (KeyboardInterrupt, EOFError) as e:
        # terminate the line of the interrupted password prompt
        print('')
        nagios_exit(NAGIOS_UNKNOWN,'initialization aborted')

    message = args.func.__name__ + ' authentication'

    # Determine the secret: the OTP (merged with the password, if any) for
    # the hotp / totp modes, only the password for the password mode
    if 'key' in args:
        secret = args.func(args.key, getattr(args, 'count', None))
        if not isempty(args.password):
            secret = (secret + args.password) if args.merge=='OTPpwd' \
                                              else (args.password + secret)
    else:
        secret = args.password

    # Perform the authentication request and measure how long it takes
    try:
        starttime = time()
        response=checkotp(args.url,
                          args.login if isempty(args.serial) else args.serial,
                          secret, isserial=not isempty(args.serial),
                          nas=args.name)
        endtime = time()

    except (HTTPError, URLError) as e:
        nagios_exit(NAGIOS_CRITICAL,'%s request failed: %s' % (message, e))

    except ValueError as e:
        # json.load() could not decode the reply
        nagios_exit(NAGIOS_CRITICAL,'%s request returned an invalid response: '
                                    '%s' % (message, e))

    except (KeyboardInterrupt, EOFError) as e:
        nagios_exit(NAGIOS_UNKNOWN,'%s request aborted' % message)

    resultdata = response.get('result')
    if resultdata is None or not resultdata.get('status'):
        nagios_exit(NAGIOS_CRITICAL,'%s request processing failed' % message)

    authenticated = bool(resultdata.get('value'))
    # the reply does not necessarily contain details, avoid failing on None
    detaildata = response.get('detail') or {}
    elapse = endtime-starttime
    if logger.isEnabledFor(logging.INFO):
        logger.info('Got response from : %s', response.get('version'))
        logger.info('Processing time   : %.2fs', elapse)
        logger.info('Got valid result  : %s', resultdata.get('status'))
        logger.info('Authenticated     : %s', authenticated)
        for field in 'message', 'type', 'serial':
            if field in detaildata:
                logger.info('Token %-12s: %s', field, detaildata.get(field))

    # Update the count file after a successful HOTP authentication. The
    # countfile attribute also exists (as None) when -c/--count was used, so
    # its value must be checked rather than its presence
    errmsgs = []
    countfile = getattr(args, 'countfile', None)
    if authenticated and countfile is not None and args.increment > 0:
        fname = countfile.name
        nwcnt = args.count + args.increment
        logger.info('updating count in %s from %d to %d',fname,args.count,nwcnt)
        try:
            with countfile:
                countfile.seek(0)
                countfile.write(str(nwcnt))
                countfile.truncate()
        except IOError as e:
            errmsgs.append('unable to update countfile %s: %s' % (fname, e))
            logger.critical(errmsgs[-1])

    # Build the status message and determine the Nagios result
    for k, src, g in ('serial',detaildata,' of '),('version',response,' with '):
        value = src.get(k)
        if value:
            message += '%s%s' % (g, value)
    if authenticated:
        nagiosresult = NAGIOS_OK
        message += ' succeeded'
    else:
        nagiosresult = NAGIOS_CRITICAL
        message += ' failed'

    if args.critical is not None and elapse > args.critical:
        errmsgs.append('took too long (%.2fs > %.2fs)' % (elapse,args.critical))
        logger.critical('response %s', errmsgs[-1])
    elif args.warn is not None and elapse > args.warn:
        if nagiosresult != NAGIOS_CRITICAL:
            nagiosresult = NAGIOS_WARN
        errmsgs.append('is slow (%.2fs > %.2fs)' % (elapse, args.warn))
        logger.warning('response time %s', errmsgs[-1])
    else:
        message+= ' in %.1fs' % elapse
        logger.info('response completed in %.1fs', elapse)

    if len(errmsgs) > 0:
        message+= ' and ' if nagiosresult == NAGIOS_CRITICAL else ' but '
        message+= ' and '.join(errmsgs)
        # any error not already reflected in the result makes it critical
        if nagiosresult == NAGIOS_OK:
            nagiosresult = NAGIOS_CRITICAL
    if detaildata.get('message'):
        message += ': %s' % detaildata.get('message')

    nagios_exit(nagiosresult, message, [
            ('time', [ elapse, args.warn, args.critical, 0, None ])])
|