added first implementation of check_otp supporting HOTP and TOTP checks against PrivacyIDEA/LinOTP

This commit is contained in:
2016-10-10 11:27:24 +02:00
parent ed63fa9140
commit d9efd3954e
2 changed files with 666 additions and 11 deletions

195
README.md
View File

@@ -1,7 +1,7 @@
nagios-plugins
==============
This repository contains my collection of modified and custom written check
plugins and scripts for [Nagios](http://www.nagios.org).
This repository contains my small collection of modified and custom written
nagios check plugins and scripts for [Nagios](http://www.nagios.org).
Most of these are very custom solutions or modified versions of standard plugins
so distributing them through [NagiosExchange](https://exchange.nagios.org/) is
@@ -12,12 +12,23 @@ encounter any issues or require changes.
The latest versions, documentation and bugtracker available on my
[GitLab instance](https://gitlab.lindenaar.net/scripts/privacyidea-checkotp)
Copyright (c) 2015 Frederik Lindenaar. free for distribution under the GNU
License, see [below](#license)
Copyright (c) 2015 - 2016 Frederik Lindenaar. free for distribution under
the GNU General Public License, see [below](#license)
contents
========
This repository contains the following scripts:
* [check_memory](#check_memory)
patched version of nagios-plugins check_memory script for Linux procps v3.3+
* [check_multiple_host_addresses](#host_addresses)
monitor multi-home and dual-stack (i.e. ipv4 and ipv6) servers.
* [check_otp](#check_otp)
plugin to monitor PrivacyIDEA (and LinOTP) OTP validation
* [nagiosstatus](#nagiosstatus)
CGI-BIN script to report the status of nagios (to monitor nagios itself)
plugins/check_memory
--------------------
<a name=check_memory>plugins/check_memory</a>
---------------------------------------------
Nagios check script to monitor the memory on Linux systems. Due to changes in
the output of procps v3.3 (the changelog refers to it as modernizing it), it's
output changed and breaks the the check_memory script as shipped with many linux
@@ -26,8 +37,8 @@ is indifferent of which version of procps (to date) is used. No other changes
were made to the script.
plugins/plugins/check_multiple_host_addresses
---------------------------------------------
<a name=host_addresses>plugins/check_multiple_host_addresses</a>
----------------------------------------------------------------
This script is a first attempt to monitor multi-home and dual-stack (i.e. ipv4
and ipv6) servers. In my setup a server should only considered availble if it is
available on all of its primary addresses (i.e. both ipv4 and ipv6). It uses the
@@ -38,6 +49,7 @@ this solution as well.
Installation is straightforward, after installing the script on your server, add
the following to your `commands.cmd` configuration file to make it available:
~~~
# 'check-host-alive' command definition for multi-homed/dual-stack servers
define command{
@@ -45,8 +57,10 @@ the following to your `commands.cmd` configuration file to make it available:
command_line [install_path]/plugins/check_multiplehost_addresses '$HOSTADDRESS$' '$_HOSTADDRESS6$'
}
~~~
The example above assumes that the IPv6 address of the host is provided as part
of the host configuration, i.e.:
~~~
define host {
...
@@ -55,13 +69,172 @@ of the host configuration, i.e.:
...
}
~~~
To use the script either add ` check_command check-addresses-alive`
To use the script either add `check_command check-addresses-alive`
to the specific hosts that should use the check or to the generic host used as
template.
cgi-bin/nagiosstatus.sh
-----------------------
<a name=check_otp>plugins/check_otp</a>
---------------------------------------
Plugin (check) to monitor OTP validation, currently implemented for PrivacyIDEA
(and LinOTP). The check can validate a provided password/secret or calculate an
HOTP or TOTP value and use that to validate (with or without a password). Other
methods and interfaces can be plugged in easily (please raise a request or
provide a patch).
Please run `check_otp -h` for an actual overview of the available options. The
script currently supports 3 modes of operation:
* password - simply authenticate with the provided secret (no calculations)
* totp - calculate the TOTP code using a key and current time
* hotp - calculate the HOTP code using a key and a count (automatically
increments the count in case a count file is used)
Generic parameters (connection parameters, critical/warning thresholds, etc.)
should be provided before the mode of operation is specified, mode-specific
parameters should follow the mode selected. Keys, passwords and HOTP counts can
be read from a file as well. Checks can be performed based on token
serial or a login and a password (only mandatory for password authentication).
HOTP/TOTP modes require a Base16/32/64 encoded key provided on the command-line
or in a file. The generated HOTP/TOTP value is appended to the password/secret
(if provided), the order can be changed with the `-m` command line parameter.
Installation for is straightforward, after installing the script on the server
add the following to your Nagios `commands.cmd` configuration file:
~~~
# 'check_totp_serial' command definition to test TOTP based on token serial (no password)
# parameters: token serial (ARG1), key (ARG2), additional parameters in ARG3
define command {
command_name check_totp_serial
command_line [install_path]/plugins/check_otp -H $HOSTNAME$ -w 3 -c 8 -P /token totp -s $ARG1$ -k $ARG2$ $ARG3$
}
# 'check_totp_serial' command definition to test TOTP based on token serial and password
# parameters: token serial (ARG1), key (ARG2), password (ARG3), additional parameters in ARG4
define command {
command_name check_totp_serial_pwd
command_line [install_path]/plugins/check_otp -H $HOSTNAME$ -w 3 -c 8 -P /token totp -s $ARG1$ -k $ARG2$ -p $ARG3$ $ARG4$
}
# 'check_totp_login' command definition to test TOTP based on login and password
# parameters: login (ARG1), key (ARG2), password (ARG3), additional parameters in ARG4
define command {
command_name check_totp_login
command_line [install_path]/plugins/check_otp -H $HOSTNAME$ -w 3 -c 8 -P /token totp -l $ARG1$ -k $ARG2$ -p $ARG3$ $ARG4$
}
# 'check_totp_serial_dir' command definition to test TOTP based on token serial
# parameters: directory (ARG1), token serial (ARG2) additional parameters in ARG3
define command {
command_name check_totp_serial_dir
command_line [install_path]/plugins/check_otp -H $HOSTNAME$ -w 3 -c 8 -P /token totp -s $ARG2$ -K $ARG1$/$ARG2$.key $ARG3$
}
# 'check_totp_serial_dir_pwd' command definition to test TOTP based on token serial and password
# parameters: directory (ARG1), token serial (ARG2), additional parameters in ARG3
define command {
command_name check_totp_serial_dir_pwd
command_line [install_path]/plugins/check_otp -H $HOSTNAME$ -w 3 -c 8 -P /token totp -s $ARG2$ -K $ARG1$/$ARG2$.key -P $ARG1$/$ARG2$.pwd $ARG3$
}
# 'check_totp_login_dir' command definition to test TOTP based on login
# parameters: directory (ARG1), login (ARG2), additional parameters in ARG3
define command {
command_name check_totp_login_dir
command_line [install_path]/plugins/check_otp -H $HOSTNAME$ -w 3 -c 8 -P /token totp -l $ARG2$ -K $ARG1$/$ARG2$.key $ARG3$
}
# 'check_totp_login_dir_pwd' command definition to test TOTP based on login and password
# parameters: directory (ARG1), login (ARG2) additional parameters in ARG3
define command {
command_name check_totp_login_dir_pwd
command_line [install_path]/plugins/check_otp -H $HOSTNAME$ -w 3 -c 8 -P /token totp -l $ARG2$ -K $ARG1$/$ARG2$.key -P $ARG1$/$ARG2$.pwd $ARG3$
}
# 'check_hotp_serial_dir' command definition to test HOTP based on token serial
# parameters: directory (ARG1), token serial (ARG2), additional parameters in ARG3
define command {
command_name check_hotp_serial_dir
command_line [install_path]/plugins/check_otp -H $HOSTNAME$ -w 3 -c 8 -P /token hotp -s $ARG2$ -K $ARG1$/$ARG2$.key -C $ARG1$/$ARG2$.count $ARG3$
}
# 'check_hotp_serial_dir_pwd' command definition to test HOTP based on token serial and password
# parameters: directory (ARG1), token serial (ARG2), additional parameters in ARG3
define command {
command_name check_hotp_serial_dir_pwd
command_line [install_path]/plugins/check_otp -H $HOSTNAME$ -w 3 -c 8 -P /token hotp -s $ARG2$ -K $ARG1$/$ARG2$.key -C $ARG1$/$ARG2$.count -P $ARG1$/$ARG2$.pwd $ARG3$
}
# 'check_hotp_login_dir' command definition to test HOTP based on login
# parameters: directory (ARG1), login (ARG2), additional parameters in ARG3
define command {
command_name check_hotp_login_dir
command_line [install_path]/plugins/check_otp -H $HOSTNAME$ -w 3 -c 8 -P /token hotp -l $ARG2$ -K $ARG1$/$ARG2$.key -C $ARG1$/$ARG2$.count $ARG3$
}
# 'check_hotp_login_dir_pwd' command definition to test HOTP based on login and password
# parameters: directory (ARG1), login (ARG2), additional parameters in ARG3
define command {
command_name check_hotp_login_dir_pwd
command_line [install_path]/plugins/check_otp -H $HOSTNAME$ -w 3 -c 8 -P /token hotp -l $ARG2$ -K $ARG1$/$ARG2$.key -C $ARG1$/$ARG2$.count -P $ARG1$/$ARG2$.pwd $ARG3$
}
~~~
Please check / adjust the following:
* replace `[install_path]/plugins` with the location of the script
* assumption is that the `$HOSTNAME$` can be used for an SSL connection (and
that the certificate is valid for this host, use the -u parameter and an
URL if this is not the case)
* path on the server is assumed to be /token (API endpoints will be added)
* check the thresholds for Warning (3s) and Critical (8s), adjust if needed
The `dir` and `dir_pwd` commands allow to store all sensitive data for tokens in
a folder and hence only require a folder name and token serial or login. This
expects the folder specified to contain the following files:
* [serial/login].key - HOTP/TOTP key in Base16/32/64 format on first line
* [serial/login].pwd - password (only first line is used)
* [serial/login].count - numeric HOTP count on first line, autoincremented
Please note that required files must exist or the check will fail with an error.
To use the it define a service check like below:
~~~
# check that TOTP authentication is working for token serial and provided key
define service {
host hostname.mydomain.tld
service_description Check TOTP Authentication
check_command check_totp_serial!TOTP0001234X!82f37371367b7e8aafb320b2d9b2721f66bbf161
use generic-service
}
# check that TOTP authentication is working for token serial and info from folder
define service {
host hostname.mydomain.tld
service_description Check TOTP Authentication
check_command check_totp_serial_dir!/etc/nagios3/tokeninfo!TOTP0001234X
use generic-service
}
# check that HOTP authentication is working for token serial and info from folder
define service {
host hostname.mydomain.tld
service_description Check TOTP Authentication
check_command check_hotp_serial_dir!/etc/nagios3/tokeninfo!HOTP0004321Y
use generic-service
}
~~~
<a name=nagiosstatus>cgi-bin/nagiosstatus.sh</a>
------------------------------------------------
Very simplistic CGI-BIN script that checkes whether nagios is still running and
still updating its status. It wil always return an HTTP Status 200 (OK) and a
simple text page with one of the following texts:

482
plugins/check_otp Executable file
View File

@@ -0,0 +1,482 @@
#! /usr/bin/env python
#
# check_otp - Nagios check plugin for LinOTP/PrivacyIDEA OTP validation
#
# Version 1.0, latest version, documentation and bugtracker available at:
# https://gitlab.lindenaar.net/scripts/nagios-plugins
#
# Copyright (c) 2016 Frederik Lindenaar
#
# This script is free software: you can redistribute and/or modify it under the
# terms of version 3 of the GNU General Public License as published by the Free
# Software Foundation, or (at your option) any later version of the license.
#
# This script is distributed in the hope that it will be useful but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, visit <http://www.gnu.org/licenses/> to download it.
import sys, os, logging, socket, hmac, json
from time import time
from struct import pack
from hashlib import sha1
from getpass import getpass
from urllib import urlencode
from urllib2 import Request, HTTPError, URLError, urlopen
from base64 import b16decode, b32decode, standard_b64decode
from argparse import ArgumentParser as StandardArgumentParser, FileType, \
_StoreAction as StoreAction, _StoreConstAction as StoreConstAction
# Constants (no need to change but allows for easy customization)
VERSION="1.0"
PROG_NAME=os.path.splitext(os.path.basename(__file__))[0]
PROG_VERSION=PROG_NAME + ' ' + VERSION
URL_API_SUFFIX='/validate/check'
ENV_VAR_USER='USER_NAME'
ENV_VAR_PWD='USER_PASSWORD'
ENV_VAR_SERIAL='TOKEN_SERIAL'
ENV_VAR_KEY='TOKEN_KEY'
ENV_VAR_NAME='CHECK_NAME'
OTP_DIGITS=6
OTP_TOTP_WINDOW=30
OTP_DIGEST_ALG=sha1
LOG_FORMAT='%(levelname)s - %(message)s'
LOG_FORMAT_FILE='%(asctime)s - ' + LOG_FORMAT
LOGGING_NONE=logging.CRITICAL + 10
NAGIOS_OK = ( 'OK', 0)
NAGIOS_WARN = ( 'WARNING', 1)
NAGIOS_CRITICAL = ( 'CRITICAL', 2 )
NAGIOS_UNKNOWN = ( 'UNKNOWN', 3 )
# Setup logging
logging.basicConfig(format=LOG_FORMAT)
logging.addLevelName(LOGGING_NONE, 'NONE')
logger = logging.getLogger(PROG_NAME)
logger.setLevel(logging.CRITICAL)
###################[ minimalistic HOTP/TOTP implementation ]###################
# based on Yoav Aner's code: http://blog.gingerlime.com/2010/once-upon-a-time #
# latest version: https://github.com/gingerlime/hotpie/blob/master/hotpie.py #
def HOTP(K, C, digits=OTP_DIGITS, digest=OTP_DIGEST_ALG):
"""Calculate the HOTP value for Key K and count C, returns OTP value"""
digest = hmac.new(key=K, msg=pack(b"!Q", C), digestmod=digest).hexdigest()
offset = int(digest[-1], 16)
return str(int(digest[(offset<<1):((offset<<1)+8)],16)&0x7fffffff)[-digits:]
def TOTP(K, C=None, d=OTP_DIGITS, win=OTP_TOTP_WINDOW, dg=OTP_DIGEST_ALG):
"""Calculate the TOTP value for Key K (using HOTP), returns OTP value"""
return HOTP(K, int((time() if C is None else C) / win), digits=d, digest=dg)
###############################################################################
def Password(K, C):
"""Dummy routine to represent Password authentication (without OTP)"""
return None
###################[ patch to force urllib on ipv4 / ipv6 ]###################
# Support functions to allow forcing urllib2.urlopen() IPv4 or IPv6 connections
# based on http://stackoverflow.com/questions/2014534/force-python-mechanize-urllib2-to-only-use-a-requests/6319043#6319043
# the trick is to wrap the original socket.getaddrinfo and enforce the family
socket.origGetAddrInfo = socket.getaddrinfo
socket.getAddrInfoFamily = 0
def getAddrInfoWrapper(host, port, family=0, socktype=0, proto=0, flags=0):
"""wrapper for socket.getaddrinfo() to connect only to a specific family"""
if family == 0:
family = socket.getAddrInfoFamily
result=socket.origGetAddrInfo(host, port, family, socktype, proto, flags)
logger.debug('connecting over IPv4 to %s:%d' if family==socket.AF_INET else
'connecting over IPv6 to %s:%d' if family==socket.AF_INET6 else
'connecting to %s:%d' ,result[0][4][0],result[0][4][1])
return result
socket.getaddrinfo = getAddrInfoWrapper
###############################################################################
################[ wrapper to stop ArgumentParser from exiting ]################
# based on http://stackoverflow.com/questions/14728376/i-want-python-argparse-to-throw-an-exception-rather-than-usage/14728477#14728477
# the only way to do this is overriding the error method and throw and Exception
class ArgumentParserError(Exception): pass
class ArgumentParser(StandardArgumentParser):
"""ArgumentParser not exiting with non-Nagios format message upon errors"""
def error(self, message):
raise ArgumentParserError(message)
##################[ Action to immediately set the log level ]##################
class SetSocketAddrFamily(StoreConstAction):
"""ArgumentParser action to set socket.getAddrInfo() Addr Family to const"""
def __call__(self, parser, namespace, values, option_string=None):
socket.getAddrInfoFamily = self.const
##################[ Action to immediately set the log level ]##################
class SetLogLevel(StoreConstAction):
"""ArgumentParser action to set log level to provided const value"""
def __call__(self, parser, namespace, values, option_string=None):
logging.getLogger(PROG_NAME).setLevel(self.const)
####################[ Action to immediately log to a file ]####################
class SetLogFile(StoreAction):
"""ArgumentParser action to log to file (sets up FileHandler accordingly)"""
def __call__(self, parser, namespace, values, option_string=None):
super(SetLogFile, self).__call__(parser,namespace,values,option_string)
formatter = logging.Formatter(LOG_FORMAT_FILE)
handler = logging.FileHandler(values)
handler.setFormatter(formatter)
logger = logging.getLogger(PROG_NAME)
logger.propagate = False
logger.addHandler(handler)
####################[ Action to load contents from a file ]####################
class LoadFromFile(StoreAction):
"""ArgumentParser action to load file contents into another variable """
def __init__(self,option_strings,dest,nargs=None,const=None,default=None,
type=None,choices=None,required=False,help=None,metavar=None):
if not isinstance(type, FileLoadType):
raise ArgumentParserError('LoadFromFile action option %s requires '
'type FileLoadType (got %s)' % (option_strings, type))
super(LoadFromFile, self).__init__(option_strings,dest,nargs,const,
default,type,choices,required,help,metavar)
def __call__(self, parser, namespace, values, option_string=None):
super(LoadFromFile,self).__call__(parser,namespace,values,option_string)
logger.info('reading %s from %s', self.type.name, values.name)
try:
content = values.readline().strip()
if self.type.close:
values.close()
setattr(namespace, self.type.name, self.type.type(content) \
if not isempty(content) else self.type.type())
except (IOError, ValueError) as e:
raise ArgumentParserError('cannot read %s from %s: %s' %
(self.type.name, values.name, e))
####################[ Enhanced FileType for LoadFromFile ]####################
class FileLoadType(FileType):
"""ArgumentParser FileType extension storing data needed by LoadFromFile"""
def __init__(self, name, valuetype=str, mode='r', close=None, bufsize=-1):
self.name = name
self.type = valuetype
self.close = mode == 'r' if close is None else close
super(FileLoadType, self).__init__(mode, bufsize)
##############[ Action to prompt for password if value is empty ]##############
class PasswordPrompt(StoreAction):
"""ArgumentParser action to prompt for password when empty (and store it)"""
def __call__(self, parser, namespace, values, option_string=None):
pwd = getpass('Please enter password: ') if values is None else values
super(PasswordPrompt, self).__call__(parser,namespace,pwd,option_string)
###############################################################################
def isempty(string):
"""Checks whether string 'str' provided is unset or empty"""
return string is None or len(string) == 0
def envvar(name, default=None):
"""Returns value of environment var 'name', or value of default otherwise"""
return os.environ.get(name, default)
def base16_32_64(string=None):
"""convert encoded string to binary, supports base16, base32 and base64
Args:
string (str) : base 16/32/64 encoded string
Returns:
binary version of string, provided it is base 16/32/64 encoded
"""
if not isempty(string):
try:
encoding, value = ('base16', b16decode(string, True))
except TypeError:
try:
encoding, value = ('base32', b32decode(string, False))
except TypeError:
encoding, value = ('base64', standard_b64decode(string))
logger.debug("converted %s encoded key '%s' to %d bytes of binary data",
encoding, string, len(value))
return value
def parse_args():
"""Parse command line and get parameters from environment, if present"""
# Setup argument parser, the workhorse gluing it all together
parser = ArgumentParser(
description='Nagios check for OTP validation against LinOTP/PrivacyIDEA'
)
parser.add_argument('-V', '--version',action="version",version=PROG_VERSION)
pgroup = parser.add_mutually_exclusive_group(required=True)
pgroup.add_argument('-u', '--url',
help='URL to check OTP authentication against')
pgroup.add_argument('-H', '--host',
help='hostname to test against (to construct URL)')
parser.add_argument('-p', '--port', type=int,
help='port number to connect to (only used with -H)')
parser.add_argument('-P', '--path',
help='URL path to be used (only used with -H)')
parser.add_argument('-S', '--no-ssl', action='store_true',
help='connect WITHOUT SSL (only used with -H)')
parser.add_argument('-n', '--name', default=envvar(ENV_VAR_NAME, PROG_NAME),
help="name in authentication request for logging "
"(defaults to '%s')" % PROG_NAME)
parser.add_argument('-w', '--warn', type=float,
help='Response time for warning status (seconds)')
parser.add_argument('-c','--critical', type=float,
help='Response time for critical status (seconds)')
pgroup = parser.add_mutually_exclusive_group(required=False)
pgroup.add_argument('-4', '--ipv4', action=SetSocketAddrFamily,
const=socket.AF_INET, help='connect using IPv4')
pgroup.add_argument('-6', '--ipv6', action=SetSocketAddrFamily,
const=socket.AF_INET6,help='connect using IPv6')
pgroup = parser.add_mutually_exclusive_group(required=False)
pgroup.add_argument('-q', '--quiet', default=logging.CRITICAL,
action=SetLogLevel, const=LOGGING_NONE,
help='quiet (no output, only exit with exit code)')
pgroup.add_argument('-v', '--verbose', help='more verbose output',
action=SetLogLevel, const=logging.INFO)
pgroup.add_argument('-d', '--debug', help='debug output (more verbose)',
action=SetLogLevel, const=logging.DEBUG)
parser.add_argument('-l', '--logfile', action=SetLogFile,
help='send logging output to logfile')
commonparser = ArgumentParser(add_help=False)
pgroup = commonparser.add_mutually_exclusive_group(required=False)
pgroup.add_argument('-l', '--login', default=envvar(ENV_VAR_USER),
help='username to login with, can only be omitted when '
'%s is set or -s/--serial is used' % ENV_VAR_USER)
pgroup.add_argument('-s', '--serial', default=envvar(ENV_VAR_SERIAL),
help='token serial to use, can only be omitted when %s '
'is set or -l/--login is used' % ENV_VAR_SERIAL)
pgroup = commonparser.add_mutually_exclusive_group(required=False)
pgroup.add_argument('-p', '--password', nargs='?', action=PasswordPrompt,
default=envvar(ENV_VAR_PWD),
help='password or OTP+PIN to authenticate, uses env. '
'var %s if not present and prompts when PASSWORD '
'omitted (use "" for empty password)'%ENV_VAR_PWD)
pgroup.add_argument('-P', '--passwordfile',
action=LoadFromFile,type=FileLoadType('password'),
help='read password/authentication secret from file'),
subparser = parser.add_subparsers(title='Implemented test modes / checks')
cmdparser = subparser.add_parser('password', parents=[commonparser],
help='perform test with provided secret')
cmdparser.set_defaults(func=Password, cmdparser=cmdparser)
otpparser = ArgumentParser(add_help=False)
pgroup = otpparser.add_mutually_exclusive_group(required=False)
pgroup.add_argument('-k', '--key', default=envvar(ENV_VAR_KEY),
type=base16_32_64, help='HOTP/TOTP key, can only be'
'omitted if %s is set or -K is used' % ENV_VAR_KEY)
pgroup.add_argument('-K', '--keyfile', action=LoadFromFile,
type=FileLoadType('key', base16_32_64),
help='read HOTP key from the file specified'),
otpparser.add_argument('-m','--merge', choices=['pwdOTP', 'OTPpwd' ],
default='pwdOTP', help='how to merge password and OTP')
cmdparser = subparser.add_parser('hotp', parents=[commonparser, otpparser],
help='perform HOTP check using provided key and count')
pgroup = cmdparser.add_mutually_exclusive_group(required=True)
pgroup.add_argument('-c', '--count', type=int,
help='count to be used to calculate the HTOP value')
pgroup.add_argument('-C', '--countfile',type=FileLoadType('count',int,'r+'),
action=LoadFromFile,
help='read HOTP count from file and update it')
cmdparser.add_argument('-i', '--increment', type=int, default=2,
help='increment value for count (default=1)')
cmdparser.set_defaults(func=HOTP, cmdparser=cmdparser)
cmdparser = subparser.add_parser('totp',parents=[commonparser, otpparser],
help='perform TOTP test using provided key')
cmdparser.set_defaults(func=TOTP, cmdparser=cmdparser)
# parse arguments and post-process command line options
args = parser.parse_args()
# Generate the URL if not provided on the command line
if isempty(args.url):
args.url = 'http://' if args.no_ssl else 'https://'
args.url+= args.host
if not isempty(args.port):
args.url+= ':' + args.port
if not isempty(args.path):
args.url+= args.path if args.path[0]=='/' else '/' + args.path
# We should now be ready to authenticate, fail if that's not the case
if args.func == Password:
if isempty(args.login) and isempty(args.serial) \
or isempty(args.password):
args.cmdparser.error('user/serial and a secret are required!')
elif args.func == HOTP or args.func == TOTP:
if isempty(args.login) and isempty(args.serial) or isempty(args.key):
args.cmdparser.error('user/serial and a key are required!')
else:
args.cmdparser.error("BUG: mode %s is not supported"%args.func.__name__)
# if we got here all seems OK
return args
def checkotp(url, subject, secret, isserial=False, nas=None):
"""Check a subject (user or token) with secret against PrivacyIDEA / LinOTP.
Args:
url (str) : URL to connect to, URL_API_SUFFIX is added if missing
subject (str) : subject to authenticate (user or a token serial)
secret (str) : secret (password+OTP) to authenticate with
isserial (bool): True if subject is a token serial (optional)
nas (str) : string to pass-on as the nas string (optional)
Returns:
The result response from the PrivacyIDEA server (mapping object)
"""
# Complete (fix) URL
if not url.endswith(URL_API_SUFFIX):
url += URL_API_SUFFIX[1:] if url[-1] == '/' else URL_API_SUFFIX
logger.info('connecting to %s', url)
# Prepare the parameters
params = { 'pass': secret, 'serial' if isserial else 'user': subject }
if not isempty(nas):
params['nas'] = nas
if logger.isEnabledFor(logging.DEBUG):
logger.debug('HTTP request parameters: %s',
', '.join(map(lambda (k,v): '%s="%s"' % (k, v if k!='pass'
else '***MASKED***'), params.iteritems())))
# Perform the API authentication request
response = json.load(urlopen(Request(url, data=urlencode(params))))
if logger.isEnabledFor(logging.DEBUG):
logger.debug('result: %s', json.dumps(response, indent=4))
return response
def nagios_exit(status, message, data=None):
"""exit 'nagios-style', print status and message followed by the data"""
if logger.isEnabledFor(logging.CRITICAL):
if data is not None and len(data) > 0:
perfdata=map(lambda (k,v): "'%s'=%s" %(k,v if not isinstance(v,list)
else ';'.join(map(lambda x:'' if x is None else str(x),v)))
,data.iteritems())
perfstr = ' | ' + ' '.join(perfdata)
else:
perfstr = ''
print 'OTP %s: %s%s' % (status[0], message, perfstr)
sys.exit(status[1])
if __name__ == '__main__':
try:
args = parse_args()
except ArgumentParserError as e:
nagios_exit(NAGIOS_UNKNOWN,'error with setup: ' + e.message)
except (KeyboardInterrupt, EOFError) as e:
print
nagios_exit(NAGIOS_UNKNOWN,'initialization aborted')
message = args.func.__name__ + ' authentication'
if 'key' in args:
secret = args.func(args.key, args.count if 'count' in args else None)
if not isempty(args.password):
secret = (secret + args.password) if args.merge=='OTPpwd' \
else (args.password + secret)
else:
secret = args.password
try:
starttime = time()
response=checkotp(args.url,
args.login if isempty(args.serial) else args.serial,
secret, isserial=not isempty(args.serial),
nas=args.name)
endtime = time()
except (HTTPError, URLError) as e:
nagios_exit(NAGIOS_CRITICAL,'%s request failed: %s' % (message, e))
except (KeyboardInterrupt, EOFError) as e:
nagios_exit(NAGIOS_UNKNOWN,'%s request aborted' % message)
resultdata = response.get('result')
if resultdata is None or not resultdata['status']:
nagios_exit(NAGIOS_CRITICAL,'%s request processing failed' % message)
authenticated = resultdata['status'] and resultdata['value']
detaildata = response.get('detail')
elapse = endtime-starttime
if logger.isEnabledFor(logging.INFO):
logger.info('Got response from : %s', response.get('version'))
logger.info('Processing time : %.2fs', elapse)
logger.info('Got valid result : %s', resultdata.get('status'))
logger.info('Authenticated : %s', authenticated)
for field in 'message', 'type', 'serial':
if field in detaildata:
logger.info('Token %-12s: %s', field, detaildata.get(field))
errmsgs = []
if authenticated and 'countfile' in args and args.increment > 0:
fname = args.countfile.name
nwcnt = args.count + args.increment
logger.info('updating count in %s from %d to %d',fname,args.count,nwcnt)
try:
with args.countfile as countfile:
countfile.seek(0)
countfile.write(str(nwcnt))
countfile.truncate()
except IOError as e:
errmsgs.append('unable to update countfile %s: %s' % (fname, e))
logger.critical(errmsgs[-1])
for k, src, g in ('serial',detaildata,' of '),('version',response,' with '):
if k in src:
message += g + src.get(k)
if authenticated:
nagiosresult = NAGIOS_OK
message += ' succeeded'
else:
nagiosresult = NAGIOS_CRITICAL
message += ' failed'
if args.critical is not None and elapse > args.critical:
errmsgs.append('took too long (%.2fs > %.2fs)' % (elapse,args.critical))
logger.critical('response %s', errmsgs[-1])
elif args.warn is not None and elapse > args.warn:
if nagiosresult != NAGIOS_CRITICAL:
nagiosresult = NAGIOS_WARN
errmsgs.append('is slow (%.2fs > %.2fs)' % (elapse, args.warn))
logger.warn('response time %s', errmsgs[-1])
else:
message+= ' in %.1fs' % elapse
logger.info('response completed in %.1fs', elapse)
if len(errmsgs) > 0:
message+= ' and ' if nagiosresult == NAGIOS_CRITICAL else ' but '
message+= ' and '.join(errmsgs)
if nagiosresult == NAGIOS_OK:
nagiosresult = NAGIOS_CRITICAL
if 'message' in detaildata:
message += ': ' + detaildata.get('message')
nagios_exit(nagiosresult, message, {
'time': [ elapse, args.warn, args.critical, 0, None]})