1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
import gzip
import io
import os
import shutil
import subprocess
import tempfile
import unittest
# Directory containing this test module; fixtures live in its "data"
# subdirectory.
DATA_DIR = os.path.dirname(__file__)


def _fixture_path(filename):
    """Return the path of *filename* inside the ``data`` fixture directory."""
    return os.path.join(DATA_DIR, 'data', filename)


# Gzip-compressed JSON fixtures fed to ``dnsviz grok`` by the tests below.
EXAMPLE_AUTHORITATIVE = _fixture_path('example-authoritative.json.gz')
EXAMPLE_RECURSIVE = _fixture_path('example-recursive.json.gz')
ROOT_AUTHORITATIVE = _fixture_path('root-authoritative.json.gz')
ROOT_RECURSIVE = _fixture_path('root-recursive.json.gz')
class DNSGrokRunTestCase(unittest.TestCase):
    """Smoke tests for the ``dnsviz grok`` command.

    Every test launches ``bin/dnsviz grok`` (located relative to the working
    directory the tests are started from) as a subprocess, with its stdout
    redirected to a scratch file, and asserts that it exits with status 0.

    All scratch resources are released through ``addCleanup`` rather than a
    ``tearDown`` override, so they are removed even if ``setUp`` itself fails
    part-way through.
    """

    def setUp(self):
        """Create the scratch files and the scratch directory used by the tests."""
        # os.devnull instead of a hard-coded '/dev/null' keeps this portable.
        self.devnull = io.open(os.devnull, 'wb')
        self.addCleanup(self.devnull.close)

        self.current_cwd = os.getcwd()
        # Absolute path, so it stays valid when a test runs with a different cwd.
        self.dnsviz_bin = os.path.join(self.current_cwd, 'bin', 'dnsviz')

        tk = 'example.com. IN DNSKEY 256 3 7 AwEAAZ2YEuBl4X58v1CezDfZjT1viYn5kY3MF3lSDjvHjMZ6gJlYt4Qq oIdpChifmeJldEX9/wPc04Tg7MlEfV3m0x2j80dMyObM0FZTxzMgbTFk Zs0AWrDXELieGkFZv1FB9YoxSX2XqvpFxwvPyyszUtCy/c5hrb6vfKRB Jh+qIO+NsNrl6O8NiYjWWNjdiFw+c2BxzpArQoaA+rcoyDYwH4xGpvTw YLnE9HmkwTSQuwASkgWgX3KgTmsDEw4I0P5Tk+wvmNnaqDhmFMHJK5Oh 92wUX+ppxxSgUx4UIJmftzi7sCg0qekIYUf99Dkn7OlC8X0rjj+xO4cD hbTjGkxmsD0='
        self.tk_file = self._write_temp(tk.encode('utf-8'))

        # Uncompressed copies of the example fixtures, for passing by file name.
        self.example_auth_out = self._write_temp(self._read_gzip(EXAMPLE_AUTHORITATIVE))
        self.example_rec_out = self._write_temp(self._read_gzip(EXAMPLE_RECURSIVE))

        self.names_file = self._write_temp('example.com\nexample.net\n'.encode('utf-8'))

        # Empty scratch file that receives the subprocess's stdout.
        self.output = self._write_temp(b'')

        self.run_cwd = tempfile.mkdtemp(prefix='dnsviz')
        self.addCleanup(shutil.rmtree, self.run_cwd)

    @staticmethod
    def _read_gzip(path):
        """Return the decompressed contents of the gzip file at *path* as bytes."""
        with gzip.open(path, 'rb') as fh:
            return fh.read()

    def _write_temp(self, data):
        """Write *data* (bytes) to a new temporary file and return the closed file object.

        The file is not deleted on close; its removal is registered with
        ``addCleanup``. Callers use the returned object's ``.name``.
        """
        with tempfile.NamedTemporaryFile('wb', prefix='dnsviz', delete=False) as fh:
            # Register removal first so the file is cleaned up even if the
            # write below raises.
            self.addCleanup(os.remove, fh.name)
            fh.write(data)
        return fh

    def _run_grok(self, args=(), stdin_data=None, cwd=None):
        """Run ``dnsviz grok`` followed by *args* and return its exit status.

        stdout is written to ``self.output`` (truncated on every call). When
        *stdin_data* is not None it is piped to the process's stdin; otherwise
        stdin is inherited from the test process. *cwd*, if given, is the
        working directory of the subprocess.
        """
        cmd = [self.dnsviz_bin, 'grok'] + list(args)
        with io.open(self.output.name, 'wb') as fh_out:
            if stdin_data is None:
                return subprocess.call(cmd, cwd=cwd, stdout=fh_out)
            proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=fh_out, cwd=cwd)
            proc.communicate(stdin_data)
            return proc.returncode

    def test_dnsviz_grok_input(self):
        """Input is accepted from implicit stdin, from ``-r -`` and from ``-r FILE``."""
        data = self._read_gzip(EXAMPLE_AUTHORITATIVE)
        self.assertEqual(self._run_grok(stdin_data=data), 0)
        self.assertEqual(self._run_grok(['-r', '-'], stdin_data=data), 0)
        self.assertEqual(self._run_grok(['-r', self.example_auth_out.name]), 0)

    def test_dnsviz_grok_names_input(self):
        """The names list is accepted from ``-f FILE`` and from ``-f -`` (stdin)."""
        base = ['-r', self.example_auth_out.name]
        self.assertEqual(self._run_grok(base + ['-f', self.names_file.name]), 0)
        with io.open(self.names_file.name, 'rb') as fh_in:
            names = fh_in.read()
        self.assertEqual(self._run_grok(base + ['-f', '-'], stdin_data=names), 0)

    # Skipped explicitly instead of being an empty test that silently passes;
    # the body is kept live so it stays syntax-checked until the FIXME is
    # resolved.
    @unittest.skip('FIXME: trusted-key (-t) input test is disabled')
    def test_dnsviz_grok_tk_input(self):
        """The trusted key is accepted from ``-t FILE`` and from ``-t -`` (stdin)."""
        base = ['-r', self.example_auth_out.name]
        self.assertEqual(self._run_grok(base + ['-t', self.tk_file.name]), 0)
        with io.open(self.tk_file.name, 'rb') as fh_in:
            trusted_key = fh_in.read()
        self.assertEqual(self._run_grok(base + ['-t', '-'], stdin_data=trusted_key), 0)

    def test_dnsviz_grok_output(self):
        """Output goes to stdout by default and with ``-o -``, or to the file given with ``-o``."""
        base = ['-r', self.example_auth_out.name]
        self.assertEqual(self._run_grok(base, cwd=self.run_cwd), 0)
        self.assertEqual(self._run_grok(base + ['-o', '-'], cwd=self.run_cwd), 0)
        self.assertEqual(self._run_grok(base + ['-o', 'all.json'], cwd=self.run_cwd), 0)
        # The relative output name is resolved against the subprocess's cwd.
        self.assertTrue(os.path.exists(os.path.join(self.run_cwd, 'all.json')))

    def test_dnsviz_grok_input_auth(self):
        """The authoritative fixtures (example and root) are accepted on stdin."""
        for fixture in (EXAMPLE_AUTHORITATIVE, ROOT_AUTHORITATIVE):
            self.assertEqual(self._run_grok(stdin_data=self._read_gzip(fixture)), 0)

    def test_dnsviz_grok_input_rec(self):
        """The recursive fixtures (example and root) are accepted on stdin."""
        for fixture in (EXAMPLE_RECURSIVE, ROOT_RECURSIVE):
            self.assertEqual(self._run_grok(stdin_data=self._read_gzip(fixture)), 0)
# Run the test suite when this module is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
|