Welcome to mirror list, hosted at ThFree Co, Russian Federation.

authplugin-example.py « contrib - github.com/mRemoteNG/PuTTYNG.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 395bd2c81f7aae3d1f5de4eb23c0b9d2c916c8d5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
#!/usr/bin/env python3

# This is a demonstration example of how to write a
# keyboard-interactive authentication helper plugin using PuTTY's
# protocol for involving it in SSH connection setup.

import io
import os
import struct
import sys

# Exception class we'll use to get a clean exit on EOF.
class PluginEOF(Exception):
    """Signals that a read returned fewer bytes than were asked for."""

# ----------------------------------------------------------------------
#
# Marshalling and unmarshalling routines to write and read the
# necessary SSH data types to/from a binary file handle (which can
# include an io.BytesIO if you need to encode/decode in-process).
#
# Error handling is a totally ad-hoc mixture of 'assert' and just
# assuming things will have the right type, or be the right length of
# tuple, or be valid UTF-8. So it should be _robust_, in the sense
# that you'll get a Python exception if anything fails. But no
# sensible error reporting or recovery is implemented.
#
# That should be good enough, because PuTTY will log the plugin's
# standard error in its Event Log, so if the plugin crashes, you'll be
# able to retrieve the traceback.

def wr_byte(fh, b):
    """Write the integer b, which must be in 0..255, to fh as one byte."""
    assert 0 <= b < 256
    octet = bytes((b,))
    fh.write(octet)

def wr_boolean(fh, b):
    """Write b's truth value to fh as one byte: 1 if truthy, otherwise 0."""
    flag = int(bool(b))
    wr_byte(fh, flag)

def wr_uint32(fh, u):
    """Write u to fh as a 4-byte big-endian unsigned integer."""
    assert 0 <= u < 2**32
    encoded = struct.pack(">I", u)
    fh.write(encoded)

def wr_string(fh, s):
    """Write the bytes-like s to fh, preceded by its length as a uint32."""
    length = len(s)
    wr_uint32(fh, length)
    fh.write(s)

def wr_string_utf8(fh, s):
    """Encode the text s as UTF-8 and write it to fh via wr_string()."""
    payload = s.encode("UTF-8")
    wr_string(fh, payload)

def rd_n(fh, n):
    """Return the result of fh.read(n).

    Raises PluginEOF if the read comes back with fewer than n bytes.
    """
    chunk = fh.read(n)
    if len(chunk) >= n:
        return chunk
    raise PluginEOF()

def rd_byte(fh):
    """Read one byte from fh and return it as an integer."""
    octet = rd_n(fh, 1)
    return octet[0]

def rd_boolean(fh):
    """Read one byte from fh; return False if it is zero, True otherwise."""
    return bool(rd_byte(fh))

def rd_uint32(fh):
    """Read 4 bytes from fh and decode them as a big-endian unsigned int."""
    (value,) = struct.unpack(">I", rd_n(fh, 4))
    return value

def rd_string(fh):
    """Read a uint32 length from fh, then return that many bytes."""
    return rd_n(fh, rd_uint32(fh))

def rd_string_utf8(fh):
    """Read a length-prefixed string from fh and decode it as UTF-8 text."""
    raw = rd_string(fh)
    return raw.decode("UTF-8")

# ----------------------------------------------------------------------
#
# Protocol definitions.

# Highest protocol version this plugin can speak. protocol() settles
# on min(our_max_version, the version received in PLUGIN_INIT).
our_max_version = 2

# Message type codes. Each message's payload starts with one of these
# as a single byte (see OutMessage / InMessage). As used by protocol()
# in this file, "in" means read from stdin and "out" means written to
# stdout:
#
#   PLUGIN_INIT                in   version, hostname, port, username
#   PLUGIN_INIT_RESPONSE       out  chosen version, username override
#   PLUGIN_PROTOCOL            in   name of the authentication method
#   PLUGIN_PROTOCOL_ACCEPT     out  (no fields)
#   PLUGIN_PROTOCOL_REJECT     out  one UTF-8 string (may be empty)
#   PLUGIN_AUTH_SUCCESS        in   ends the current k-i exchange loop
#   PLUGIN_AUTH_FAILURE        in   ends the current k-i exchange loop
#   PLUGIN_INIT_FAILURE        out  one UTF-8 string, sent in place of
#                                   PLUGIN_INIT_RESPONSE
#   PLUGIN_KI_SERVER_REQUEST   in   name, instructions, language,
#                                   count, then (prompt, echo) pairs
#   PLUGIN_KI_SERVER_RESPONSE  out  count, then one answer per prompt
#   PLUGIN_KI_USER_REQUEST     out  same layout as KI_SERVER_REQUEST
#   PLUGIN_KI_USER_RESPONSE    in   count, then one response each
PLUGIN_INIT                 =  1
PLUGIN_INIT_RESPONSE        =  2
PLUGIN_PROTOCOL             =  3
PLUGIN_PROTOCOL_ACCEPT      =  4
PLUGIN_PROTOCOL_REJECT      =  5
PLUGIN_AUTH_SUCCESS         =  6
PLUGIN_AUTH_FAILURE         =  7
PLUGIN_INIT_FAILURE         =  8
PLUGIN_KI_SERVER_REQUEST    = 20
PLUGIN_KI_SERVER_RESPONSE   = 21
PLUGIN_KI_USER_REQUEST      = 22
PLUGIN_KI_USER_RESPONSE     = 23

# ----------------------------------------------------------------------
#
# Classes to make it easy to construct and receive messages.
#
# OutMessage is constructed with the message type; then you use the
# wr_foo() routines to add fields to it, and finally call its send()
# method.
#
# InMessage is constructed via the expect() class method, to which you
# give the set of message types that are acceptable at this stage.
# Once you've got one, you can rd_foo() fields from it.

class OutMessage:
    """An outgoing plugin-protocol message under construction.

    Create it with the message type code, append fields by passing the
    object itself to the wr_foo() routines (it exposes a write() method
    for exactly that purpose), and finally call send().
    """

    def __init__(self, msgtype):
        """Start a new message whose first payload byte is msgtype."""
        # The payload is accumulated in memory so that send() can
        # prefix it with its total length.
        self.buf = io.BytesIO()
        wr_byte(self.buf, msgtype)
        # Borrow the buffer's write() so wr_foo(msg, ...) appends here.
        self.write = self.buf.write

    def send(self, fh=None):
        """Write the message to fh as one length-prefixed string, and flush.

        fh is a binary stream. If omitted, sys.stdout.buffer is used; it
        is looked up at call time rather than when the class is defined,
        so defining the class does not require sys.stdout to have a
        .buffer, and a later replacement of sys.stdout is honoured.
        """
        if fh is None:
            fh = sys.stdout.buffer
        wr_string(fh, self.buf.getvalue())
        fh.flush()

class InMessage:
    """A received plugin-protocol message, ready to have fields read.

    Instances are made by the expect() class method. The object exposes
    the payload buffer's read() method, so it can be passed directly to
    the rd_foo() routines to pull out successive fields.
    """

    @classmethod
    def expect(cls, expected_types, fh=None):
        """Read one message from fh and check that its type is expected.

        expected_types is a collection of acceptable message type codes.
        fh is the binary stream to read from. If omitted,
        sys.stdin.buffer is used; it is looked up at call time rather
        than when the class is defined, so defining the class does not
        require sys.stdin to have a .buffer, and a later replacement of
        sys.stdin is honoured.

        Returns the new InMessage with its msgtype attribute set and its
        read position just past the type byte. Raises ValueError if the
        type is not in expected_types, and PluginEOF (from the rd_foo()
        routines) if the input runs out early.
        """
        if fh is None:
            fh = sys.stdin.buffer

        self = cls()
        # Pull the whole length-prefixed message off the stream, then
        # parse it from an in-memory buffer.
        self.buf = io.BytesIO(rd_string(fh))
        self.msgtype = rd_byte(self.buf)
        self.read = self.buf.read

        if self.msgtype not in expected_types:
            raise ValueError("received packet type {:d}, expected {}".format(
                self.msgtype, ",".join(map("{:d}".format,
                                           sorted(expected_types)))))
        return self

# ----------------------------------------------------------------------
#
# The main implementation of the protocol.

def _ask_user_test_prompts():
    """Exercise the USER_REQUEST round trip with made-up questions.

    Sends two PLUGIN_KI_USER_REQUEST messages, each carrying two
    prompts, and after each one reads back the matching
    PLUGIN_KI_USER_RESPONSE. The responses are parsed but not checked
    for anything.
    """
    for request_index in range(2):
        # Make up some questions to ask.
        msg = OutMessage(PLUGIN_KI_USER_REQUEST)
        wr_string_utf8(
            msg, "Plugin request #{:d} (name)".format(request_index))
        wr_string_utf8(
            msg, "Plugin request #{:d} (instructions)".format(request_index))
        # Empty third field; presumably the language tag, mirroring the
        # layout read from PLUGIN_KI_SERVER_REQUEST.
        wr_string(msg, b"")
        wr_uint32(msg, 2)
        wr_string_utf8(msg, "Prompt 1 of 2 (echo): ")
        wr_boolean(msg, True)
        wr_string_utf8(msg, "Prompt 2 of 2 (no echo): ")
        wr_boolean(msg, False)
        msg.send()

        # Expect the answers.
        msg = InMessage.expect({PLUGIN_KI_USER_RESPONSE})
        user_nprompts = rd_uint32(msg)
        assert user_nprompts == 2, (
            "Should match what we just sent")
        # Read as many responses as this message says it contains (not
        # the server's prompt count, which only happens to be equal).
        for _ in range(user_nprompts):
            # We don't actually check these responses for anything.
            rd_string_utf8(msg)


def _keyboard_interactive():
    """Handle keyboard-interactive exchanges until auth finishes.

    Runs once per exchange with the SSH server, and returns when
    PLUGIN_AUTH_SUCCESS or PLUGIN_AUTH_FAILURE is received.
    """
    while True:
        # Expect a set of prompts from the server, or terminate the
        # loop on SUCCESS or FAILURE.
        #
        # (We could also respond to SUCCESS or FAILURE by updating
        # caches of our own, if we had any that were useful.)
        msg = InMessage.expect({PLUGIN_KI_SERVER_REQUEST,
                                PLUGIN_AUTH_SUCCESS,
                                PLUGIN_AUTH_FAILURE})
        if msg.msgtype in (PLUGIN_AUTH_SUCCESS, PLUGIN_AUTH_FAILURE):
            return

        # If we didn't just return, we're sitting on a
        # PLUGIN_KI_SERVER_REQUEST message. Get all its bits and
        # pieces out. (This example only uses the prompt count; the
        # other fields are extracted to show where they live.)
        name = rd_string_utf8(msg)
        instructions = rd_string_utf8(msg)
        language = rd_string(msg)
        nprompts = rd_uint32(msg)
        prompts = []
        for _ in range(nprompts):
            prompt = rd_string_utf8(msg)
            echo = rd_boolean(msg)
            prompts.append((prompt, echo))

        # Actually make up some answers for the prompts. This is the
        # part that a non-example implementation would do very
        # differently, of course!
        #
        # Here, we answer "foo" to every prompt, except that if there
        # are exactly two prompts in the packet then we answer "stoat"
        # to the first and "weasel" to the second.
        #
        # (These answers are consistent with the ones required by
        # PuTTY's test SSH server Uppity in its own
        # keyboard-interactive test implementation: that presents a
        # two-prompt packet and expects "stoat","weasel" as the
        # answers, and then presents a zero-prompt packet. So this
        # test plugin will get you through Uppity's k-i in a one-touch
        # manner. The "foo" in this code isn't used by Uppity at all;
        # I just include it because I had to have _some_ handling for
        # the else clause.)
        #
        # If TESTPLUGIN_PROMPTS is set in the environment, we ask the
        # user questions of our own by sending them back to PuTTY as
        # USER_REQUEST messages.
        if nprompts == 2:
            if "TESTPLUGIN_PROMPTS" in os.environ:
                _ask_user_test_prompts()
            answers = ["stoat", "weasel"]
        else:
            answers = ["foo"] * nprompts

        # Send the answers to the SSH server's questions.
        msg = OutMessage(PLUGIN_KI_SERVER_RESPONSE)
        wr_uint32(msg, len(answers))
        for answer in answers:
            wr_string_utf8(msg, answer)
        msg.send()


def protocol():
    """Run the plugin side of the protocol on stdin/stdout.

    Handles the PLUGIN_INIT handshake, then loops handling one
    PLUGIN_PROTOCOL offer at a time. Returns early only if
    TESTPLUGIN_INIT_FAIL is set in the environment; otherwise it keeps
    going until a read raises PluginEOF (or some other error occurs).

    Environment variables consulted: TESTPLUGIN_INIT_FAIL,
    TESTPLUGIN_USERNAME, TESTPLUGIN_PROTO_REJECT and (during
    keyboard-interactive) TESTPLUGIN_PROMPTS.
    """
    # Start by expecting PLUGIN_INIT.
    msg = InMessage.expect({PLUGIN_INIT})
    their_version = rd_uint32(msg)
    hostname = rd_string_utf8(msg)
    port = rd_uint32(msg)
    username = rd_string_utf8(msg)  # parsed, but unused by this example
    print(f"Got hostname {hostname!r}, port {port!r}", file=sys.stderr)

    # Decide which protocol version we're speaking.
    version = min(their_version, our_max_version)
    assert version != 0, "Protocol version 0 does not exist"

    if "TESTPLUGIN_INIT_FAIL" in os.environ:
        # Test the plugin failing at startup time.
        msg = OutMessage(PLUGIN_INIT_FAILURE)
        wr_string_utf8(msg, os.environ["TESTPLUGIN_INIT_FAIL"])
        msg.send()
        return

    # Send INIT_RESPONSE, with our protocol version and an overridden
    # username.
    #
    # By default this test plugin doesn't override the username, but
    # you can make it do so by setting TESTPLUGIN_USERNAME in the
    # environment.
    msg = OutMessage(PLUGIN_INIT_RESPONSE)
    wr_uint32(msg, version)
    wr_string_utf8(msg, os.environ.get("TESTPLUGIN_USERNAME", ""))
    msg.send()

    # Outer loop run once per authentication protocol.
    while True:
        # Expect a message telling us what the protocol is.
        msg = InMessage.expect({PLUGIN_PROTOCOL})
        method = rd_string(msg)

        if "TESTPLUGIN_PROTO_REJECT" in os.environ:
            # Test the plugin failing at PLUGIN_PROTOCOL time.
            msg = OutMessage(PLUGIN_PROTOCOL_REJECT)
            wr_string_utf8(msg, os.environ["TESTPLUGIN_PROTO_REJECT"])
            msg.send()
            continue

        # We only support keyboard-interactive. If we supported other
        # auth methods, this would be the place to add further clauses
        # to this if statement for them.
        if method == b"keyboard-interactive":
            msg = OutMessage(PLUGIN_PROTOCOL_ACCEPT)
            msg.send()
            _keyboard_interactive()
        else:
            # Default handler if we don't speak the offered protocol
            # at all.
            msg = OutMessage(PLUGIN_PROTOCOL_REJECT)
            wr_string_utf8(msg, "")
            msg.send()

# Demonstration write to stderr, to prove that it shows up in PuTTY's
# Event Log.
print("Hello from test plugin's stderr", file=sys.stderr)

# Run the protocol until the input runs out. PluginEOF is what rd_n()
# raises when a read comes up short, and it is swallowed here so that
# the script finishes without a traceback. Any other exception is left
# to propagate, which puts its traceback on stderr.
try:
    protocol()
except PluginEOF:
    pass