Welcome to mirror list, hosted at ThFree Co, Russian Federation.

console.py « klippy - github.com/Klipper3d/klipper.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 4782702a1bdd52f0cbb3297164da16187d85e405 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#!/usr/bin/env python
# Script to implement a test console with firmware over serial port
#
# Copyright (C) 2016  Kevin O'Connor <kevin@koconnor.net>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import sys, optparse, os, re, logging

import reactor, serialhdl, pins, util, msgproto

# Matches "{...}" spans in a typed line; the text between the braces is
# captured in the "eval" group (a closing brace ends the span, so spans
# cannot nest).
re_eval = re.compile(r'\{(?P<eval>[^}]*)\}')

class KeyboardReader:
    """Read command lines from stdin and send them to the firmware.

    Each complete line typed on stdin is stripped of "#" comments, has its
    "{expression}" spans evaluated, has pin names mapped, and is then either
    handled locally (see `local_commands`) or encoded with the serial
    connection's message parser and sent.
    """
    def __init__(self, ser, reactor):
        """Put stdin in non-blocking mode and register it with the reactor.

        ser: serial connection object; this class reads its `msgparser`,
            `last_ack_time` and `last_ack_clock` attributes and calls its
            `send()` method.
        reactor: event loop object; `register_fd(fd, callback)` is called
            on it so that `process_kbd` runs when stdin has data.
        """
        self.ser = ser
        self.reactor = reactor
        self.fd = sys.stdin.fileno()
        util.set_nonblock(self.fd)
        # Pin mapping; built lazily once the MCU type is known.
        self.pins = None
        # Input received so far that has not yet been terminated by "\n".
        self.data = ""
        self.reactor.register_fd(self.fd, self.process_kbd)
        # Commands handled by the console itself instead of being sent.
        self.local_commands = { "PINS": self.set_pin_map }
        # Namespace handed to eval() for "{expression}" spans.
        self.eval_globals = {}
    def update_evals(self, eventtime):
        """Refresh the `freq` and `clock` values available to "{...}" spans.

        `clock` is extrapolated from the last acknowledged clock using the
        time elapsed since `last_ack_time`; 'CLOCK_FREQ' defaults to 1 when
        the firmware configuration does not provide it.
        """
        f = self.ser.msgparser.config.get('CLOCK_FREQ', 1)
        c = (eventtime - self.ser.last_ack_time) * f + self.ser.last_ack_clock
        self.eval_globals['freq'] = f
        self.eval_globals['clock'] = int(c)
    def set_pin_map(self, parts):
        """Handle the local "PINS <arg>" command by replacing the pin map.

        parts: the whitespace-split command line; parts[1] is passed to
            pins.map_pins() together with the configured MCU type.
        """
        if len(parts) < 2:
            # A bare "PINS" used to raise IndexError out of the reactor
            # callback; report it like any other input error instead.
            print("Error: PINS requires an argument")
            return
        mcu = self.ser.msgparser.config['MCU']
        self.pins = pins.map_pins(parts[1], mcu)
    def lookup_pin(self, value):
        """Return the mapped pin for `value`, building the map on first use.

        Raises KeyError if `value` is not present in the pin map.
        """
        if self.pins is None:
            self.pins = pins.mcu_to_pins(self.ser.msgparser.config['MCU'])
        return self.pins[value]
    def translate(self, line, eventtime):
        """Convert one typed line into an encoded firmware message.

        Returns the value of msgparser.create_command(), or None when the
        line could not be evaluated, a pin could not be mapped, the line
        was a local command, or the message parser rejected it.
        """
        evalparts = re_eval.split(line)
        if len(evalparts) > 1:
            self.update_evals(eventtime)
            # NOTE(review): eval() executes whatever is typed between the
            # braces.  The text comes from this process's own stdin, so it
            # is presumably operator-trusted - confirm before feeding this
            # console from any other source.
            try:
                # Odd indexes of the split hold the captured expressions.
                for i in range(1, len(evalparts), 2):
                    evalparts[i] = str(eval(evalparts[i], self.eval_globals))
            except Exception:
                # Not a bare except: Ctrl-C / SystemExit must propagate.
                print("Unable to evaluate:  %s" % (line,))
                return None
            line = ''.join(evalparts)
            print("Eval: %s" % (line,))
        if self.pins is None and self.ser.msgparser.config:
            self.pins = pins.mcu_to_pins(self.ser.msgparser.config['MCU'])
        if self.pins is not None:
            try:
                line = pins.update_command(line, self.pins).strip()
            except Exception:
                print("Unable to map pin:  %s" % (line,))
                return None
        if line:
            parts = line.split()
            if parts[0] in self.local_commands:
                self.local_commands[parts[0]](parts)
                return None
        try:
            msg = self.ser.msgparser.create_command(line)
        except msgproto.error as e:
            print("Error: %s" % (e,))
            return None
        return msg
    def process_kbd(self, eventtime):
        """Reactor callback: read pending stdin data and send whole lines.

        Text after the last newline is kept in `self.data` until the rest
        of that line arrives.
        """
        # NOTE(review): os.read() returns an empty string at end-of-file;
        # nothing here stops the reactor from polling stdin afterwards -
        # confirm how EOF should be handled.
        self.data += os.read(self.fd, 4096)

        kbdlines = self.data.split('\n')
        for line in kbdlines[:-1]:
            line = line.strip()
            cpos = line.find('#')
            if cpos >= 0:
                # Drop the comment; skip lines that were only a comment.
                line = line[:cpos]
                if not line:
                    continue
            msg = self.translate(line.strip(), eventtime)
            if msg is None:
                continue
            self.ser.send(msg)
        self.data = kbdlines[-1]

def main():
    """Parse the command line, connect to the firmware and run the console.

    Expects exactly two positional arguments: the serial device and the
    baud rate.  A wrong argument count or a non-integer baud rate is
    reported through the option parser (usage message, exit status 2)
    instead of an unhandled ValueError traceback.
    """
    usage = "%prog [options] <serialdevice> <baud>"
    opts = optparse.OptionParser(usage)
    options, args = opts.parse_args()
    if len(args) != 2:
        opts.error("Incorrect number of arguments")
    serialport, baud = args
    try:
        baud = int(baud)
    except ValueError:
        opts.error("Invalid baud rate: %s" % (baud,))

    logging.basicConfig(level=logging.DEBUG)
    r = reactor.Reactor()
    ser = serialhdl.SerialReader(r, serialport, baud)
    ser.connect()
    # Constructing the reader registers stdin with the reactor.
    kbd = KeyboardReader(ser, r)
    try:
        r.run()
    except KeyboardInterrupt:
        # Leave the shell prompt on a fresh line after Ctrl-C.
        sys.stdout.write("\n")

# Run the console only when this file is executed as a script.
if __name__ == '__main__':
    main()