Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/mRemoteNG/PuTTYNG.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/authplugin-example.py')
-rwxr-xr-xcontrib/authplugin-example.py287
1 files changed, 287 insertions, 0 deletions
diff --git a/contrib/authplugin-example.py b/contrib/authplugin-example.py
new file mode 100755
index 00000000..395bd2c8
--- /dev/null
+++ b/contrib/authplugin-example.py
@@ -0,0 +1,287 @@
+#!/usr/bin/env python3
+
+# This is a demonstration example of how to write a
+# keyboard-interactive authentication helper plugin using PuTTY's
+# protocol for involving it in SSH connection setup.
+
+import io
+import os
+import struct
+import sys
+
+# Exception class we'll use to get a clean exit on EOF.
+class PluginEOF(Exception): pass
+
+# ----------------------------------------------------------------------
+#
+# Marshalling and unmarshalling routines to write and read the
+# necessary SSH data types to/from a binary file handle (which can
+# include an io.BytesIO if you need to encode/decode in-process).
+#
+# Error handling is a totally ad-hoc mixture of 'assert' and just
+# assuming things will have the right type, or be the right length of
+# tuple, or be valid UTF-8. So it should be _robust_, in the sense
+# that you'll get a Python exception if anything fails. But no
+# sensible error reporting or recovery is implemented.
+#
+# That should be good enough, because PuTTY will log the plugin's
+# standard error in its Event Log, so if the plugin crashes, you'll be
+# able to retrieve the traceback.
+
+def wr_byte(fh, b):
+ assert 0 <= b < 0x100
+ fh.write(bytes([b]))
+
+def wr_boolean(fh, b):
+ wr_byte(fh, 1 if b else 0)
+
+def wr_uint32(fh, u):
+ assert 0 <= u < 0x100000000
+ fh.write(struct.pack(">I", u))
+
+def wr_string(fh, s):
+ wr_uint32(fh, len(s))
+ fh.write(s)
+
+def wr_string_utf8(fh, s):
+ wr_string(fh, s.encode("UTF-8"))
+
+def rd_n(fh, n):
+ data = fh.read(n)
+ if len(data) < n:
+ raise PluginEOF()
+ return data
+
+def rd_byte(fh):
+ return rd_n(fh, 1)[0]
+
+def rd_boolean(fh):
+ return rd_byte(fh) != 0
+
+def rd_uint32(fh):
+ return struct.unpack(">I", rd_n(fh, 4))[0]
+
+def rd_string(fh):
+ length = rd_uint32(fh)
+ return rd_n(fh, length)
+
+def rd_string_utf8(fh):
+ return rd_string(fh).decode("UTF-8")
+
+# ----------------------------------------------------------------------
+#
+# Protocol definitions.
+
+our_max_version = 2
+
+PLUGIN_INIT = 1
+PLUGIN_INIT_RESPONSE = 2
+PLUGIN_PROTOCOL = 3
+PLUGIN_PROTOCOL_ACCEPT = 4
+PLUGIN_PROTOCOL_REJECT = 5
+PLUGIN_AUTH_SUCCESS = 6
+PLUGIN_AUTH_FAILURE = 7
+PLUGIN_INIT_FAILURE = 8
+PLUGIN_KI_SERVER_REQUEST = 20
+PLUGIN_KI_SERVER_RESPONSE = 21
+PLUGIN_KI_USER_REQUEST = 22
+PLUGIN_KI_USER_RESPONSE = 23
+
+# ----------------------------------------------------------------------
+#
+# Classes to make it easy to construct and receive messages.
+#
+# OutMessage is constructed with the message type; then you use the
+# wr_foo() routines to add fields to it, and finally call its send()
+# method.
+#
+# InMessage is constructed via the expect() class method, to which you
+# give a list of message types you expect to see one of at this stage.
+# Once you've got one, you can rd_foo() fields from it.
+
+class OutMessage:
+ def __init__(self, msgtype):
+ self.buf = io.BytesIO()
+ wr_byte(self.buf, msgtype)
+ self.write = self.buf.write
+
+ def send(self, fh=sys.stdout.buffer):
+ wr_string(fh, self.buf.getvalue())
+ fh.flush()
+
+class InMessage:
+ @classmethod
+ def expect(cls, expected_types, fh=sys.stdin.buffer):
+ self = cls()
+ self.buf = io.BytesIO(rd_string(fh))
+ self.msgtype = rd_byte(self.buf)
+ self.read = self.buf.read
+
+ if self.msgtype not in expected_types:
+ raise ValueError("received packet type {:d}, expected {}".format(
+ self.msgtype, ",".join(map("{:d}".format,
+ sorted(expected_types)))))
+ return self
+
+# ----------------------------------------------------------------------
+#
+# The main implementation of the protocol.
+
+def protocol():
+ # Start by expecting PLUGIN_INIT.
+ msg = InMessage.expect({PLUGIN_INIT})
+ their_version = rd_uint32(msg)
+ hostname = rd_string_utf8(msg)
+ port = rd_uint32(msg)
+ username = rd_string_utf8(msg)
+ print(f"Got hostname {hostname!r}, port {port!r}", file=sys.stderr)
+
+ # Decide which protocol version we're speaking.
+ version = min(their_version, our_max_version)
+ assert version != 0, "Protocol version 0 does not exist"
+
+ if "TESTPLUGIN_INIT_FAIL" in os.environ:
+ # Test the plugin failing at startup time.
+ msg = OutMessage(PLUGIN_INIT_FAILURE)
+ wr_string_utf8(msg, os.environ["TESTPLUGIN_INIT_FAIL"])
+ msg.send()
+ return
+
+ # Send INIT_RESPONSE, with our protocol version and an overridden
+ # username.
+ #
+ # By default this test plugin doesn't override the username, but
+ # you can make it do so by setting TESTPLUGIN_USERNAME in the
+ # environment.
+ msg = OutMessage(PLUGIN_INIT_RESPONSE)
+ wr_uint32(msg, version)
+ wr_string_utf8(msg, os.environ.get("TESTPLUGIN_USERNAME", ""))
+ msg.send()
+
+ # Outer loop run once per authentication protocol.
+ while True:
+ # Expect a message telling us what the protocol is.
+ msg = InMessage.expect({PLUGIN_PROTOCOL})
+ method = rd_string(msg)
+
+ if "TESTPLUGIN_PROTO_REJECT" in os.environ:
+ # Test the plugin failing at PLUGIN_PROTOCOL time.
+ msg = OutMessage(PLUGIN_PROTOCOL_REJECT)
+ wr_string_utf8(msg, os.environ["TESTPLUGIN_PROTO_REJECT"])
+ msg.send()
+ continue
+
+ # We only support keyboard-interactive. If we supported other
+ # auth methods, this would be the place to add further clauses
+ # to this if statement for them.
+ if method == b"keyboard-interactive":
+ msg = OutMessage(PLUGIN_PROTOCOL_ACCEPT)
+ msg.send()
+
+ # Inner loop run once per keyboard-interactive exchange
+ # with the SSH server.
+ while True:
+ # Expect a set of prompts from the server, or
+ # terminate the loop on SUCCESS or FAILURE.
+ #
+ # (We could also respond to SUCCESS or FAILURE by
+ # updating caches of our own, if we had any that were
+ # useful.)
+ msg = InMessage.expect({PLUGIN_KI_SERVER_REQUEST,
+ PLUGIN_AUTH_SUCCESS,
+ PLUGIN_AUTH_FAILURE})
+ if (msg.msgtype == PLUGIN_AUTH_SUCCESS or
+ msg.msgtype == PLUGIN_AUTH_FAILURE):
+ break
+
+ # If we didn't just break, we're sitting on a
+ # PLUGIN_KI_SERVER_REQUEST message. Get all its bits
+ # and pieces out.
+ name = rd_string_utf8(msg)
+ instructions = rd_string_utf8(msg)
+ language = rd_string(msg)
+ nprompts = rd_uint32(msg)
+ prompts = []
+ for i in range(nprompts):
+ prompt = rd_string_utf8(msg)
+ echo = rd_boolean(msg)
+ prompts.append((prompt, echo))
+
+ # Actually make up some answers for the prompts. This
+ # is the part that a non-example implementation would
+ # do very differently, of course!
+ #
+ # Here, we answer "foo" to every prompt, except that
+ # if there are exactly two prompts in the packet then
+ # we answer "stoat" to the first and "weasel" to the
+ # second.
+ #
+ # (These answers are consistent with the ones required
+ # by PuTTY's test SSH server Uppity in its own
+ # keyboard-interactive test implementation: that
+ # presents a two-prompt packet and expects
+ # "stoat","weasel" as the answers, and then presents a
+ # zero-prompt packet. So this test plugin will get you
+ # through Uppity's k-i in a one-touch manner. The
+ # "foo" in this code isn't used by Uppity at all; I
+ # just include it because I had to have _some_
+ # handling for the else clause.)
+ #
+ # If TESTPLUGIN_PROMPTS is set in the environment, we
+ # ask the user questions of our own by sending them
+ # back to PuTTY as USER_REQUEST messages.
+ if nprompts == 2:
+ if "TESTPLUGIN_PROMPTS" in os.environ:
+ for i in range(2):
+ # Make up some questions to ask.
+ msg = OutMessage(PLUGIN_KI_USER_REQUEST)
+ wr_string_utf8(
+ msg, "Plugin request #{:d} (name)".format(i))
+ wr_string_utf8(
+ msg, "Plugin request #{:d} (instructions)"
+ .format(i))
+ wr_string(msg, b"")
+ wr_uint32(msg, 2)
+ wr_string_utf8(msg, "Prompt 1 of 2 (echo): ")
+ wr_boolean(msg, True)
+ wr_string_utf8(msg, "Prompt 2 of 2 (no echo): ")
+ wr_boolean(msg, False)
+ msg.send()
+
+ # Expect the answers.
+ msg = InMessage.expect({PLUGIN_KI_USER_RESPONSE})
+ user_nprompts = rd_uint32(msg)
+ assert user_nprompts == 2, (
+ "Should match what we just sent")
+ for i in range(nprompts):
+ user_response = rd_string_utf8(msg)
+ # We don't actually check these
+ # responses for anything.
+
+ answers = ["stoat", "weasel"]
+
+ else:
+ answers = ["foo"] * nprompts
+
+ # Send the answers to the SSH server's questions.
+ msg = OutMessage(PLUGIN_KI_SERVER_RESPONSE)
+ wr_uint32(msg, len(answers))
+ for answer in answers:
+ wr_string_utf8(msg, answer)
+ msg.send()
+
+ else:
+ # Default handler if we don't speak the offered protocol
+ # at all.
+ msg = OutMessage(PLUGIN_PROTOCOL_REJECT)
+ wr_string_utf8(msg, "")
+ msg.send()
+
+# Demonstration write to stderr, to prove that it shows up in PuTTY's
+# Event Log.
+print("Hello from test plugin's stderr", file=sys.stderr)
+
+try:
+ protocol()
+except PluginEOF:
+ pass