# Welcome to mirror list, hosted at ThFree Co, Russian Federation.

# test_dnsviz_grok_run.py « tests - github.com/dnsviz/dnsviz.git - Unnamed repository; edit this file 'description' to name the repository.
# summary | refs | log | tree | commit | diff
# blob: c4d0933ab1b372b3bc0053e0b4d78c1758218d90 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import gzip
import io
import os
import shutil
import subprocess
import tempfile
import unittest

# Directory containing this test module; the fixture archives live in its
# ``data`` subdirectory.
DATA_DIR = os.path.dirname(__file__)


def _fixture(filename):
    """Return the path of ``filename`` inside the ``data`` subdirectory."""
    return os.path.join(DATA_DIR, 'data', filename)


# Gzip-compressed (``.json.gz``) fixture archives used by the tests.
EXAMPLE_AUTHORITATIVE = _fixture('example-authoritative.json.gz')
EXAMPLE_RECURSIVE = _fixture('example-recursive.json.gz')
ROOT_AUTHORITATIVE = _fixture('root-authoritative.json.gz')
ROOT_RECURSIVE = _fixture('root-recursive.json.gz')

class DNSGrokRunTestCase(unittest.TestCase):
    """Smoke tests that run ``dnsviz grok`` as a subprocess.

    The executable is expected at ``bin/dnsviz`` under the working directory
    the tests are started from.  Each test only checks the exit status of the
    command (and, for the output test, that the requested file was created);
    the content written to stdout is captured in a scratch file and is not
    inspected.
    """

    def setUp(self):
        """Create the scratch files and directory used by the tests.

        Every resource is registered with ``addCleanup`` as soon as it
        exists, so it is released even when a later step of ``setUp`` raises
        (``tearDown`` would not run in that case).
        """
        self.devnull = io.open(os.devnull, 'wb')
        self.addCleanup(self.devnull.close)

        self.current_cwd = os.getcwd()
        self.dnsviz_bin = os.path.join(self.current_cwd, 'bin', 'dnsviz')

        tk = 'example.com. IN DNSKEY 256 3 7 AwEAAZ2YEuBl4X58v1CezDfZjT1viYn5kY3MF3lSDjvHjMZ6gJlYt4Qq oIdpChifmeJldEX9/wPc04Tg7MlEfV3m0x2j80dMyObM0FZTxzMgbTFk Zs0AWrDXELieGkFZv1FB9YoxSX2XqvpFxwvPyyszUtCy/c5hrb6vfKRB Jh+qIO+NsNrl6O8NiYjWWNjdiFw+c2BxzpArQoaA+rcoyDYwH4xGpvTw YLnE9HmkwTSQuwASkgWgX3KgTmsDEw4I0P5Tk+wvmNnaqDhmFMHJK5Oh 92wUX+ppxxSgUx4UIJmftzi7sCg0qekIYUf99Dkn7OlC8X0rjj+xO4cD hbTjGkxmsD0='
        self.tk_file = self._make_temp_file(tk.encode('utf-8'))

        # Uncompressed copies of the example fixtures, for passing by path.
        self.example_auth_out = self._make_temp_file(self._read_gzip(EXAMPLE_AUTHORITATIVE))
        self.example_rec_out = self._make_temp_file(self._read_gzip(EXAMPLE_RECURSIVE))

        self.names_file = self._make_temp_file('example.com\nexample.net\n'.encode('utf-8'))

        # Empty scratch file that receives the stdout of each invocation.
        self.output = self._make_temp_file()

        self.run_cwd = tempfile.mkdtemp(prefix='dnsviz')
        self.addCleanup(shutil.rmtree, self.run_cwd)

    def _make_temp_file(self, content=b''):
        """Write ``content`` to a new persistent temp file; return the closed file object.

        The returned object is only useful for its ``name`` attribute.  The
        file is removed automatically when the test finishes.
        """
        with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as fh:
            # Register removal before writing so a failed write cannot leak the file.
            self.addCleanup(os.remove, fh.name)
            fh.write(content)
        return fh

    @staticmethod
    def _read_gzip(path):
        """Return the decompressed bytes of the gzip file at ``path``."""
        with gzip.open(path, 'rb') as fh:
            return fh.read()

    def _run_grok(self, args=(), stdin_data=None, cwd=None):
        """Run ``dnsviz grok`` with ``args`` and return its exit status.

        stdout is redirected to (and truncates) the scratch output file.
        When ``stdin_data`` is not None it is fed to the process through a
        pipe; otherwise stdin is inherited.  ``cwd`` sets the working
        directory of the child process.
        """
        cmd = [self.dnsviz_bin, 'grok'] + list(args)
        with io.open(self.output.name, 'wb') as fh_out:
            if stdin_data is None:
                return subprocess.call(cmd, cwd=cwd, stdout=fh_out)
            p = subprocess.Popen(cmd, cwd=cwd, stdin=subprocess.PIPE, stdout=fh_out)
            p.communicate(stdin_data)
            return p.returncode

    def test_dnsviz_grok_input(self):
        """Input is accepted from implicit stdin, from ``-r -`` and from ``-r <file>``."""
        auth_data = self._read_gzip(EXAMPLE_AUTHORITATIVE)

        self.assertEqual(self._run_grok(stdin_data=auth_data), 0)
        self.assertEqual(self._run_grok(['-r', '-'], stdin_data=auth_data), 0)
        self.assertEqual(self._run_grok(['-r', self.example_auth_out.name]), 0)

    def test_dnsviz_grok_names_input(self):
        """The names list is accepted from ``-f <file>`` and from ``-f -`` (stdin)."""
        auth_path = self.example_auth_out.name

        self.assertEqual(self._run_grok(['-r', auth_path, '-f', self.names_file.name]), 0)

        with io.open(self.names_file.name, 'rb') as fh_in:
            names = fh_in.read()
        self.assertEqual(self._run_grok(['-r', auth_path, '-f', '-'], stdin_data=names), 0)

    # FIXME: this test was disabled (body commented out) in the original
    # code; it is reported as skipped instead of silently passing.
    @unittest.skip('FIXME: trusted-key (-t) input test is disabled')
    def test_dnsviz_grok_tk_input(self):
        """The ``-t`` input is accepted from ``-t <file>`` and from ``-t -`` (stdin)."""
        auth_path = self.example_auth_out.name

        self.assertEqual(self._run_grok(['-r', auth_path, '-t', self.tk_file.name]), 0)

        with io.open(self.tk_file.name, 'rb') as fh_in:
            tk_data = fh_in.read()
        self.assertEqual(self._run_grok(['-r', auth_path, '-t', '-'], stdin_data=tk_data), 0)

    def test_dnsviz_grok_output(self):
        """Output works by default, with ``-o -`` and with ``-o <file>``."""
        auth_path = self.example_auth_out.name

        self.assertEqual(self._run_grok(['-r', auth_path], cwd=self.run_cwd), 0)
        self.assertEqual(self._run_grok(['-r', auth_path, '-o', '-'], cwd=self.run_cwd), 0)

        # A relative output name must be created inside the child's working directory.
        self.assertEqual(self._run_grok(['-r', auth_path, '-o', 'all.json'], cwd=self.run_cwd), 0)
        self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'all.json')))

    def test_dnsviz_grok_input_auth(self):
        """The authoritative fixtures (example and root) are accepted on stdin."""
        for fixture in (EXAMPLE_AUTHORITATIVE, ROOT_AUTHORITATIVE):
            self.assertEqual(self._run_grok(stdin_data=self._read_gzip(fixture)), 0, fixture)

    def test_dnsviz_grok_input_rec(self):
        """The recursive fixtures (example and root) are accepted on stdin."""
        for fixture in (EXAMPLE_RECURSIVE, ROOT_RECURSIVE):
            self.assertEqual(self._run_grok(stdin_data=self._read_gzip(fixture)), 0, fixture)

# Run the test suite when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()