diff options
author | Casey Deccio <casey@deccio.net> | 2019-06-28 22:50:15 +0300 |
---|---|---|
committer | Casey Deccio <casey@deccio.net> | 2019-06-28 22:50:15 +0300 |
commit | b0da7694ca4597e03e9dcb0d124318c5de9910ef (patch) | |
tree | 8e06d10a0e43aeecf6a27af1fcc7eb6304f26d09 | |
parent | 6e8451ce2944eef0c64f629fa99f2ef8bcce661f (diff) |
Add status command
-rw-r--r-- | dnsviz/commands/status.py | 353 |
1 files changed, 353 insertions, 0 deletions
diff --git a/dnsviz/commands/status.py b/dnsviz/commands/status.py new file mode 100644 index 0000000..9be4e60 --- /dev/null +++ b/dnsviz/commands/status.py @@ -0,0 +1,353 @@ +#!/usr/bin/env python +# +# This file is a part of DNSViz, a tool suite for DNS/DNSSEC monitoring, +# analysis, and visualization. +# Created by Casey Deccio (casey@deccio.net) +# +# Copyright 2014-2016 VeriSign, Inc. +# +# Copyright 2016-2019 Casey Deccio +# +# DNSViz is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# DNSViz is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with DNSViz. If not, see <http://www.gnu.org/licenses/>. +# + +from __future__ import unicode_literals + +import codecs +import getopt +import io +import json +import logging +import os +import re +import sys + +# minimal support for python2.6 +try: + from collections import OrderedDict +except ImportError: + from ordereddict import OrderedDict + +import dns.exception, dns.name + +from dnsviz.analysis import status as Status +from dnsviz.analysis import TTLAgnosticOfflineDomainNameAnalysis, DNS_RAW_VERSION +from dnsviz.format import humanize_name, latin1_binary_to_string as lb2s +from dnsviz.util import get_trusted_keys, get_default_trusted_keys + +logging.basicConfig(level=logging.WARNING, format='%(message)s') +logger = logging.getLogger() + +def usage(err=None): + if err is not None: + err += '\n\n' + else: + err = '' + sys.stderr.write('''%sUsage: %s %s [options] [domain_name...] + +Print the assessment of diagnostic DNS queries. + +Options: + -f <filename> - Read names from a file. + -r <filename> - Read diagnostic queries from a file. + -t <filename> - Use trusted keys from the designated file. + -a <alg>[,<alg>...] + - Support only the specified DNSSEC algorithm(s). + -d <digest_alg>[,<digest_alg>...] + - Support only the specified DNSSEC digest algorithm(s). + -C - Enforce DNS cookies strictly. + -P - Allow private IP addresses for authoritative DNS servers. + -O - Derive the filename(s) from domain name(s). + -o <filename> - Save the output to the specified file. + -h - Display the usage and exit. +''' % (err, sys.argv[0], __name__.split('.')[-1])) + +def finish_graph(name_objs, filename): + if filename is None: + filename = sys.stdout.fileno() + try: + fh = io.open(filename, 'w', encoding='utf-8') + except IOError as e: + logger.error('%s: "%s"' % (e.strerror, filename)) + sys.exit(3) + + show_colors = fh.isatty() and os.environ.get('TERM', 'dumb') != 'dumb' + + tuples = [] + for name_obj in name_objs: + fh.write(textualize_status_output(name_obj.zone, show_colors)) + +TERM_COLOR_MAP = { + 'BOLD': '\033[1m', + 'RESET': '\033[0m', + Status.SERVER_CHECKLIST_STATUS_OK: '\033[36m', + Status.SERVER_CHECKLIST_STATUS_INDETERMINATE: '\033[37m', + Status.SERVER_CHECKLIST_STATUS_WARNING: '\033[33m', + Status.SERVER_CHECKLIST_STATUS_ERROR: '\033[31m', +} + +STATUS_MAP = { + Status.SERVER_CHECKLIST_STATUS_OK: '.', + Status.SERVER_CHECKLIST_STATUS_INDETERMINATE: '-', + Status.SERVER_CHECKLIST_STATUS_WARNING: '?', + Status.SERVER_CHECKLIST_STATUS_ERROR: '!', +} + +def textualize_status_output(zone_obj, show_color): + s = '' + + name_template = '%(status_color)s%(name)s%(color_reset)s\n' + server_status_template = '[%(status_color)s %(status)s%(color_reset)s] ' + server_row_template = ' [%(status_color)s%(server_index)2d%(color_reset)s] %(server_name)s (%(server)s)\n' + column_header_template = '[%(status_color)s%(server_index)2d%(color_reset)s] ' + server_status_header = '%(indent)s%(status_color)s%(status)s%(color_reset)s: ' + + params = {} + params['status_color'] = '' + params['status_color_rdata'] = '' + + if show_color: + params['color_reset'] = TERM_COLOR_MAP['RESET'] + else: + params['color_reset'] = '' + + params['name'] = humanize_name(zone_obj.name) + if show_color: + params['status_color'] = TERM_COLOR_MAP['BOLD'] + s += name_template % params + + server_list = [(ip, zone_obj.get_ns_name_for_ip(ip)[0]) for ip in zone_obj.get_auth_or_designated_servers()] + server_list = sorted(server_list, key=lambda x: (x[1][0], x[0])) + + i = 1 + if show_color: + params['status_color'] = TERM_COLOR_MAP['BOLD'] + for server, ns_name in server_list: + params['server_index'] = i + params['server_name'] = humanize_name(ns_name[0]) + params['server'] = server + s += server_row_template % params + i += 1 + + indent_size = max([len(status) for status in zone_obj.server_checklist]) + 2 + s += (' ' * indent_size) + ' ' + + if show_color: + params['status_color'] = TERM_COLOR_MAP['BOLD'] + for i in range(len(server_list)): + params['server_index'] = i + 1 + s += column_header_template % params + s += '\n' + + for status in zone_obj.server_checklist: + params['indent'] = ' ' * (indent_size - len(status)) + if show_color: + params['status_color'] = TERM_COLOR_MAP['BOLD'] + params['status'] = status + s += server_status_header % params + for server, ns_name in server_list: + if server in zone_obj.server_checklist[status]: + st = zone_obj.server_checklist[status][server] + if show_color: + params['status_color'] = TERM_COLOR_MAP[st.status] + params['status'] = STATUS_MAP[st.status] + s += server_status_template % params + else: + if show_color: + params['status_color'] = TERM_COLOR_MAP[Status.SERVER_CHECKLIST_STATUS_INDETERMINATE] + params['status'] = STATUS_MAP[Status.SERVER_CHECKLIST_STATUS_INDETERMINATE] + s += server_status_template % params + s += '\n' + + return s + +def main(argv): + try: + try: + opts, args = getopt.getopt(argv[1:], 'f:r:t:a:d:CPOo:h') + except getopt.GetoptError as e: + sys.stderr.write('%s\n' % str(e)) + sys.exit(1) + + # collect trusted keys + trusted_keys = [] + for opt, arg in opts: + if opt == '-t': + try: + with io.open(arg, 'r', encoding='utf-8') as fh: + tk_str = fh.read() + except IOError as e: + logger.error('%s: "%s"' % (e.strerror, arg)) + sys.exit(3) + try: + trusted_keys.extend(get_trusted_keys(tk_str)) + except dns.exception.DNSException: + logger.error('There was an error parsing the trusted keys file: "%s"' % arg) + sys.exit(3) + + opts = dict(opts) + if '-h' in opts: + usage() + sys.exit(0) + + if '-f' in opts and args: + sys.stderr.write('If -f is used, then domain names may not supplied as command line arguments.\n') + sys.exit(1) + + if '-a' in opts: + try: + supported_algs = set([int(x) for x in opts['-a'].split(',')]) + except ValueError: + sys.stderr.write('The list of algorithms was invalid: "%s"\n' % opts['-a']) + sys.exit(1) + else: + supported_algs = None + + if '-d' in opts: + try: + supported_digest_algs = set([int(x) for x in opts['-d'].split(',')]) + except ValueError: + sys.stderr.write('The list of digest algorithms was invalid: "%s"\n' % opts['-d']) + sys.exit(1) + else: + supported_digest_algs = None + + strict_cookies = '-C' in opts + allow_private = '-P' in opts + + if '-o' in opts and '-O' in opts: + sys.stderr.write('The -o and -O options may not be used together.\n') + sys.exit(1) + + if '-r' not in opts or opts['-r'] == '-': + opt_r = sys.stdin.fileno() + else: + opt_r = opts['-r'] + try: + with io.open(opt_r, 'r', encoding='utf-8') as fh: + analysis_str = fh.read() + except IOError as e: + logger.error('%s: "%s"' % (e.strerror, opts.get('-r', '-'))) + sys.exit(3) + if not analysis_str: + if opt_r != sys.stdin.fileno(): + logger.error('No input.') + sys.exit(3) + try: + analysis_structured = json.loads(analysis_str) + except ValueError: + logger.error('There was an error parsing the JSON input: "%s"' % opts.get('-r', '-')) + sys.exit(3) + + # check version + if '_meta._dnsviz.' not in analysis_structured or 'version' not in analysis_structured['_meta._dnsviz.']: + logger.error('No version information in JSON input.') + sys.exit(3) + try: + major_vers, minor_vers = [int(x) for x in str(analysis_structured['_meta._dnsviz.']['version']).split('.', 1)] + except ValueError: + logger.error('Version of JSON input is invalid: %s' % analysis_structured['_meta._dnsviz.']['version']) + sys.exit(3) + # ensure major version is a match and minor version is no greater + # than the current minor version + curr_major_vers, curr_minor_vers = [int(x) for x in str(DNS_RAW_VERSION).split('.', 1)] + if major_vers != curr_major_vers or minor_vers > curr_minor_vers: + logger.error('Version %d.%d of JSON input is incompatible with this software.' % (major_vers, minor_vers)) + sys.exit(3) + + names = OrderedDict() + if '-f' in opts: + if opts['-f'] == '-': + opts['-f'] = sys.stdin.fileno() + try: + f = io.open(opts['-f'], 'r', encoding='utf-8') + except IOError as e: + logger.error('%s: "%s"' % (e.strerror, opts['-f'])) + sys.exit(3) + for line in f: + name = line.strip() + try: + name = dns.name.from_text(name) + except UnicodeDecodeError as e: + logger.error('%s: "%s"' % (e, name)) + except dns.exception.DNSException: + logger.error('The domain name was invalid: "%s"' % name) + else: + if name not in names: + names[name] = None + f.close() + else: + if args: + # python3/python2 dual compatibility + if isinstance(args[0], bytes): + args = [codecs.decode(x, sys.getfilesystemencoding()) for x in args] + else: + try: + args = analysis_structured['_meta._dnsviz.']['names'] + except KeyError: + logger.error('No names found in JSON input!') + sys.exit(3) + for name in args: + try: + name = dns.name.from_text(name) + except UnicodeDecodeError as e: + logger.error('%s: "%s"' % (e, name)) + except dns.exception.DNSException: + logger.error('The domain name was invalid: "%s"' % name) + else: + if name not in names: + names[name] = None + + latest_analysis_date = None + name_objs = [] + cache = {} + for name in names: + name_str = lb2s(name.canonicalize().to_text()) + if name_str not in analysis_structured or analysis_structured[name_str].get('stub', True): + logger.error('The analysis of "%s" was not found in the input.' % lb2s(name.to_text())) + continue + name_obj = TTLAgnosticOfflineDomainNameAnalysis.deserialize(name, analysis_structured, cache, strict_cookies=strict_cookies, allow_private=allow_private) + name_objs.append(name_obj) + + if latest_analysis_date is None or latest_analysis_date > name_obj.analysis_end: + latest_analysis_date = name_obj.analysis_end + + if not name_objs: + sys.exit(4) + + if '-t' not in opts: + trusted_keys = get_default_trusted_keys(latest_analysis_date) + + for name_obj in name_objs: + name_obj.populate_status(trusted_keys, supported_algs=supported_algs, supported_digest_algs=supported_digest_algs) + + if '-O' in opts: + if name_obj.name == dns.name.root: + name = 'root' + else: + name = lb2s(name_obj.name.canonicalize().to_text()).rstrip('.') + finish_graph([name_obj], '%s.txt' % name) + + if '-O' not in opts: + if '-o' not in opts or opts['-o'] == '-': + finish_graph(name_objs, None) + else: + finish_graph(name_objs, opts['-o']) + + except KeyboardInterrupt: + logger.error('Interrupted.') + sys.exit(4) + +if __name__ == "__main__": + main(sys.argv) |