summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYves Fischer <yvesf-git@xapek.org>2016-12-28 23:27:59 +0100
committerYves Fischer <yvesf-git@xapek.org>2016-12-29 03:19:09 +0100
commit4e07f8306c855be768d435baac68f14a6bca547b (patch)
treece7b13e8dd6bfeeece1593d9de5cb89bbb845b08
parentba2960846e52e9862a7d0aca69c33d29610276bb (diff)
downloadmp-tool-4e07f8306c855be768d435baac68f14a6bca547b.tar.gz
mp-tool-4e07f8306c855be768d435baac68f14a6bca547b.zip
Add serial communication
-rwxr-xr-xmp-tool135
-rw-r--r--mp_tool/__init__.py117
-rw-r--r--mp_tool/serial.py135
-rw-r--r--mp_tool/util.py65
-rw-r--r--mp_tool/web.py117
-rw-r--r--setup.py2
6 files changed, 383 insertions, 188 deletions
diff --git a/mp-tool b/mp-tool
index b990e27..1419089 100755
--- a/mp-tool
+++ b/mp-tool
@@ -1,88 +1,69 @@
#!/usr/bin/env python3
-import mp_tool
-
-import argparse
-import sys
-
-
-class HelpAction(argparse._HelpAction):
- def __call__(self, parser, namespace, values, option_string=None):
- formatter = parser._get_formatter()
- formatter.add_usage(parser.usage,
- parser._actions,
- parser._mutually_exclusive_groups)
-
- formatter.start_section(parser._optionals.title)
- formatter.add_text(parser._optionals.description)
- formatter.add_arguments(parser._optionals._group_actions)
- formatter.end_section()
-
- subparsers_actions = [
- action for action in parser._actions
- if isinstance(action, argparse._SubParsersAction)]
-
- for subparsers_action in subparsers_actions:
- # get all subparsers and print help
- subparsers = subparsers_action.choices
- for subaction in subparsers_action._get_subactions():
- subparser = subparsers[subaction.dest]
- usage = formatter._format_actions_usage(subparser._actions, [])
- usage_parent = formatter._format_actions_usage(filter(
- lambda a: not (isinstance(a, HelpAction) or isinstance(a, argparse._SubParsersAction)),
- parser._actions), [])
- formatter.start_section("{} {} {} {}".format(formatter._prog,
- usage_parent,
- subaction.dest,
- usage))
- formatter.add_text(subaction.help)
- formatter.add_arguments(subparser._positionals._group_actions)
- formatter.add_arguments(filter(lambda a: not isinstance(a, argparse._HelpAction),
- subparser._optionals._group_actions))
- formatter.end_section()
-
- print(formatter.format_help())
- parser.exit(0)
+from mp_tool.util import HelpAction, format_subcommands_help, get_default_serial_port
+from argparse import ArgumentParser
+from importlib import import_module
+from sys import argv
+import traceback
if __name__ == '__main__':
- parser = argparse.ArgumentParser(add_help=False)
+ default_port = get_default_serial_port()
+
+ parser = ArgumentParser(add_help=False)
parser.add_argument("--help", action=HelpAction, help="Display full help")
- parser.add_argument("--password", action='store', nargs='?')
- parser.add_argument("WEBSOCKET", action='store', nargs=1,
- help="Websocket address (e.g. ws://ESP_E1278E:8266)")
subparsers = parser.add_subparsers()
- parser_eval = subparsers.add_parser("eval", help="Eval python code remotely")
- parser_eval.set_defaults(func=mp_tool.do_eval)
- parser_eval.add_argument("CODE", action='store', nargs=1, help="Code to execute")
-
- parser_repl = subparsers.add_parser("repl", help="Start interactive REPL")
- parser_repl.set_defaults(func=mp_tool.do_repl)
-
- parser_put = subparsers.add_parser("put", help="Send file to remote")
- parser_put.set_defaults(func=mp_tool.do_put)
-
- parser_get = subparsers.add_parser("get", help="Load file from remote")
- parser_put.set_defaults(func=mp_tool.do_get)
-
- args = parser.parse_args(sys.argv[1:])
+ parser_eval = subparsers.add_parser("eval", help="Eval python code over websocket")
+ parser_eval.set_defaults(func=lambda a: import_module('mp_tool.web').eval(a.WEBSOCKET, a.password, a.CODE))
+ parser_eval.add_argument("WEBSOCKET", help="Websocket address (e.g. ws://ESP_E1278E:8266)")
+ parser_eval.add_argument("CODE", help="Code to execute")
+ parser_eval.add_argument("--password")
+
+ parser_eval_serial = subparsers.add_parser("eval-serial", help="Eval python code over serial")
+ parser_eval_serial.set_defaults(func=lambda a: import_module('mp_tool.serial').eval(a.port, a.CODE))
+ parser_eval_serial.add_argument("--port", default=default_port, help="Serial port, default " + default_port)
+ parser_eval_serial.add_argument("CODE", help="Code to execute")
+
+ parser_repl = subparsers.add_parser("repl", help="Interactive REPL over websocket")
+ parser_repl.set_defaults(func=lambda a: import_module('mp_tool.web').repl(a.WEBSOCKET, a.password))
+ parser_repl.add_argument("WEBSOCKET", help="Websocket address (e.g. ws://ESP_E1278E:8266)")
+ parser_repl.add_argument("--password")
+
+ parser_put = subparsers.add_parser("put", help="Send file over websocket")
+ parser_put.set_defaults(func=lambda a: import_module('mp_tool.web').put(a.WEBSOCKET, a.password, a.CODE))
+ parser_put.add_argument("WEBSOCKET", help="Websocket address (e.g. ws://ESP_E1278E:8266)")
+ parser_put.add_argument("--password")
+
+ parser_put_serial = subparsers.add_parser("put-serial", help="Send file over serial")
+ parser_put_serial.set_defaults(func=lambda a: import_module('mp_tool.serial').put(a.port, a.FILE, a.TARGET))
+ parser_put_serial.add_argument("--port", default=default_port, help="Serial port, default " + default_port)
+ parser_put_serial.add_argument("FILE", help="Filename")
+ parser_put_serial.add_argument("TARGET", nargs='?', help="remote target path/filename")
+
+ parser_get = subparsers.add_parser("get", help="Load file over websocket")
+ parser_get.set_defaults(func=lambda a: import_module('mp_tool.web').get(a.WEBSOCKET, a.password))
+ parser_get.add_argument("WEBSOCKET", help="Websocket address (e.g. ws://ESP_E1278E:8266)")
+ parser_get.add_argument("--password")
+
+ parser_get_serial = subparsers.add_parser("get-serial", help="Get file over serial")
+ parser_get_serial.set_defaults(func=lambda a: import_module('mp_tool.serial').get(a.port, a.FILE, a.TARGET))
+ parser_get_serial.add_argument("--port", default=default_port, help="Serial port, default " + default_port)
+ parser_get_serial.add_argument("FILE", help="Filename")
+ parser_get_serial.add_argument("TARGET", nargs='?', help="local target path/filename")
+
+ parser_ls_serial = subparsers.add_parser("ls-serial", help="List files over serial")
+ parser_ls_serial.set_defaults(func=lambda a: import_module('mp_tool.serial').ls(a.port, a.DIR))
+ parser_ls_serial.add_argument("--port", default=default_port, help="Serial port, default " + default_port)
+ parser_ls_serial.add_argument("DIR", default=".", nargs='?', help="List files in this directory (Default '.')")
+
+ args = parser.parse_args(argv[1:])
if 'func' in args:
- args.func(args)
+ try:
+ args.func(args)
+ except Exception as e:
+ print("Call failed with arguments: " + str(args))
+ parser.exit(1, traceback.format_exc())
else:
- subparsers_actions = [
- action for action in parser._actions
- if isinstance(action, argparse._SubParsersAction)]
-
- formatter = parser._get_formatter()
- formatter.add_usage(parser.usage,
- parser._actions,
- parser._mutually_exclusive_groups)
-
- formatter.start_section("Choose subcommand")
- for subparsers_action in subparsers_actions:
- formatter.add_argument(subparsers_action)
- formatter.end_section()
-
- print(formatter.format_help())
+ print(format_subcommands_help(parser))
parser.exit(0)
diff --git a/mp_tool/__init__.py b/mp_tool/__init__.py
index 481ba39..77b8820 100644
--- a/mp_tool/__init__.py
+++ b/mp_tool/__init__.py
@@ -1,112 +1,9 @@
-import websocket
-import tty
-import termios
-from threading import Thread
-from sys import stdout, stdin
-from copy import copy
+class Constants:
+ ENTER_RAW_MODE = b'\x01' # CTRL-A
+ ENTER_REPL_MODE = b'\x02' # CTRL-B
+ INTERRUPT = b'\x03' # CTRL-C
+ CTRL_D = b'\x04' # CTRL-D
+ MARKER_BEGIN = b'>>>>>>>>>>'
+ MARKER_END = b'<<<<<<<<<<'
-
-def connect_and_auth(url, password) -> websocket.WebSocket:
- ws = websocket.create_connection(url, timeout=0.5)
- frame = ws.recv_frame()
-
- if frame.data != b"Password: ":
- raise Exception("Unexpected response: {}".format(frame.data))
- stdout.write(frame.data.decode('utf-8'))
- ws.send(password + "\n")
-
- frame = ws.recv_frame()
- if frame.data.strip() != b"WebREPL connected\r\n>>>":
- raise Exception("Unexpected response: {}".format(frame.data))
- return ws
-
-
-def do_eval(args):
- ws = connect_and_auth(args.WEBSOCKET[0], args.password)
- ws.send("\x02")
- stdout.write(read_until_eval_or_timeout(ws))
- ws.send(args.CODE[0] + "\r\n")
-
- result = read_until_eval_or_timeout(ws)
- stdout.write(result[:-6])
- print("")
- ws.close()
-
-
-def read_until_eval_or_timeout(ws: websocket.WebSocket):
- buf = ""
- while not buf.endswith("\r\n>>> "):
- buf += ws.recv()
- return buf
-
-
-class Reader(Thread):
- def __init__(self, ws):
- Thread.__init__(self)
- self.ws = ws
- self.stop = False
-
- def run(self):
- while True:
- try:
- frame = self.ws.recv_frame()
- stdout.write(frame.data.decode('utf-8'))
- stdout.flush()
- except Exception as e:
- if self.stop:
- break
-
-
-def set_tty_raw_mode(fd):
- saved_mode = termios.tcgetattr(fd)
-
- new_mode = copy(saved_mode)
- new_mode[tty.LFLAG] = new_mode[tty.LFLAG] & ~termios.ECHO
- new_mode[tty.CC][tty.VMIN] = 1
- new_mode[tty.CC][tty.VTIME] = 0
- set_tty_mode(fd, new_mode)
-
- return saved_mode
-
-
-def set_tty_mode(fd, mode):
- termios.tcsetattr(fd, termios.TCSAFLUSH, mode)
-
-
-def do_repl(args):
- print("Type ^[ CTRL-] or CTRL-D to quit")
- ws = connect_and_auth(args.WEBSOCKET[0], args.password)
- ws.send("\x02")
-
- reader = Reader(ws)
- reader.start()
-
- saved_tty_mode = set_tty_raw_mode(stdin.fileno())
- try:
- tty.setraw(stdin.fileno())
- while True:
- try:
- in_char = stdin.read(1)
- if in_char == "\x1d" or in_char == "\x04": # escape char 'Ctrl-]' or CTRL-C
- break
- else:
- ws.send(in_char)
- except KeyboardInterrupt:
- break
- except Exception as _:
- pass
-
- reader.stop = True
- ws.close()
-
- set_tty_mode(stdin.fileno(), saved_tty_mode)
- print("")
-
-
-def do_put(args):
- raise NotImplementedError()
-
-
-def do_get(args):
- raise NotImplementedError()
diff --git a/mp_tool/serial.py b/mp_tool/serial.py
new file mode 100644
index 0000000..c1838f2
--- /dev/null
+++ b/mp_tool/serial.py
@@ -0,0 +1,135 @@
+"""
+Implementation of commands that run against the serial interface of micropython
+"""
+from . import Constants
+
+try:
+ import serial
+except ImportError:
+ print("Could not find pyserial library")
+ raise
+
+import os
+
+
+def eval(port: str, code: str):
+ with serial.Serial(port=port, baudrate=115200) as fh:
+ fh.write(Constants.INTERRUPT)
+ fh.write(Constants.ENTER_REPL_MODE)
+ _, _ = fh.readline(), fh.readline()
+ print(fh.readline().decode('utf-8').strip())
+ print(fh.readline().decode('utf-8').strip())
+
+ fh.write(code.encode('utf-8') + b"\r\n")
+ fh.flush()
+
+ buf = fh.read(1)
+ i = 0
+ while not buf.endswith(b"\r\n>>> "):
+ buf += fh.read(1)
+ i += 1
+ if i > 300:
+ raise Exception("Exceed number of bytes while seeking for end of output")
+ print(buf.decode('utf-8')[:-6])
+
+
+def ls(port: str, directory: str):
+ with serial.Serial(port=port, baudrate=115200) as fh:
+ fh.write(Constants.INTERRUPT) # ctrl-c interrupt
+ fh.write(Constants.ENTER_RAW_MODE) # ctrl-a raw-mode
+ fh.write(b"import os\r\n")
+ fh.write(b"print()\r\n")
+ fh.write(b"print('" + Constants.MARKER_BEGIN + b"')\r\n")
+ fh.write(b"try:")
+ fh.write(b" print('\\n'.join(os.listdir(" + repr(directory).encode('utf-8') + b")))\r\n")
+ fh.write(b"except OSError as e:\r\n")
+ fh.write(b" print(str(e))\r\n")
+ fh.write(b"print('" + Constants.MARKER_END + b"')\r\n")
+ fh.write(b"print()\r\n")
+ fh.write(Constants.CTRL_D)
+ fh.flush()
+ fh.reset_input_buffer()
+
+ _line_ok = fh.readline()
+ if fh.readline().strip() != Constants.MARKER_BEGIN:
+ raise Exception('Failed to find begin marker')
+
+ line = fh.readline()
+ while line.strip() != Constants.MARKER_END:
+ print(line.strip().decode('utf-8'))
+ line = fh.readline()
+
+ fh.write(Constants.ENTER_REPL_MODE)
+
+
+def get(port: str, remote_filename: str, target: str):
+ if target:
+ if os.path.isdir(target):
+ local_filename = os.path.join(target, os.path.basename(remote_filename))
+ else:
+ local_filename = target
+ else:
+ local_filename = os.path.basename(remote_filename)
+
+ with serial.Serial(port=port, baudrate=115200) as fh:
+ fh.write(Constants.INTERRUPT) # ctrl-c interrupt
+ fh.write(Constants.ENTER_RAW_MODE) # ctrl-a raw-mode
+ fh.write(b"import sys\r\n")
+ fh.write(b"import os\r\n")
+ fh.write(b"print()\r\n")
+ fh.write(b"print('" + Constants.MARKER_BEGIN + b"')\r\n")
+ fh.write(b"try:\r\n")
+ fh.write(" print(os.stat({})[6])\r\n".format(repr(remote_filename)).encode('utf-8'))
+ fh.write(b"except OSError:\r\n")
+ fh.write(b" print('-1')\r\n")
+ fh.write(b"print('" + Constants.MARKER_END + b"')\r\n")
+ fh.write("with open({}, 'rb') as fh:\r\n".format(repr(remote_filename)).encode('utf-8'))
+ # use sys.stdout.buffer to avoid cr to crlf conversion
+ fh.write(b" sys.stdout.buffer.write(fh.read())\r\n")
+ fh.write(b"print()\r\n")
+ fh.write(Constants.CTRL_D)
+ fh.flush()
+ fh.reset_input_buffer()
+
+ _line_ok = fh.readline()
+
+ if fh.readline().strip() != Constants.MARKER_BEGIN:
+ raise Exception('Failed to find begin marker')
+
+ length = int(fh.readline().strip().decode('utf-8'))
+ if fh.readline().strip() != Constants.MARKER_END:
+ raise Exception("Failed to read end marker value")
+
+ if length == -1:
+ raise Exception("Failed to read file {}".format(remote_filename))
+
+ print("File length: {}".format(length))
+
+ with open(local_filename, 'wb') as fh_out:
+ bytes_processed = fh_out.write(fh.read(length))
+ print("{} bytes written to {}".format(bytes_processed, local_filename))
+
+ fh.write(Constants.ENTER_REPL_MODE)
+
+
+def put(port: str, local_filename: str, target: str):
+ if target:
+ remote_filename = os.path.join(target, local_filename)
+ else:
+ remote_filename = os.path.basename(local_filename)
+
+ with open(local_filename, 'br') as file_fh:
+ data = file_fh.read()
+
+ with serial.Serial(port=port, baudrate=115200) as fh:
+ fh.write(Constants.INTERRUPT) # ctrl-c interrupt
+ fh.write(Constants.ENTER_RAW_MODE) # ctrl-a raw-mode
+ fh.write(b"import sys\r\n")
+ fh.write("with open({}, 'wb') as fh:\r\n".format(repr(remote_filename)).encode('utf-8'))
+ if len(data) == 0:
+ fh.write(b" pass\r\n")
+ else:
+ fh.write(" fh.write(sys.stdin.buffer.read({}))\r\n".format(len(data)).encode('utf-8'))
+ fh.write(Constants.CTRL_D)
+ fh.write(data)
+ fh.write(Constants.ENTER_REPL_MODE)
diff --git a/mp_tool/util.py b/mp_tool/util.py
new file mode 100644
index 0000000..7e3f610
--- /dev/null
+++ b/mp_tool/util.py
@@ -0,0 +1,65 @@
+import argparse
+import platform
+
+class HelpAction(argparse._HelpAction):
+ def __call__(self, parser, namespace, values, option_string=None):
+ formatter = parser._get_formatter()
+ formatter.add_usage(parser.usage,
+ parser._actions,
+ parser._mutually_exclusive_groups)
+
+ formatter.start_section(parser._optionals.title)
+ formatter.add_text(parser._optionals.description)
+ formatter.add_arguments(parser._optionals._group_actions)
+ formatter.end_section()
+
+ subparsers_actions = [
+ action for action in parser._actions
+ if isinstance(action, argparse._SubParsersAction)]
+
+ for subparsers_action in subparsers_actions:
+ # get all subparsers and print help
+ subparsers = subparsers_action.choices
+ for subaction in subparsers_action._get_subactions():
+ subparser = subparsers[subaction.dest]
+ usage = formatter._format_actions_usage(subparser._actions, [])
+ usage_parent = formatter._format_actions_usage(filter(
+ lambda a: not (isinstance(a, HelpAction) or isinstance(a, argparse._SubParsersAction)),
+ parser._actions), [])
+ formatter.start_section("{} {} {} {}".format(formatter._prog,
+ usage_parent,
+ subaction.dest,
+ usage))
+ formatter.add_text(subaction.help)
+ formatter.add_arguments(subparser._positionals._group_actions)
+ formatter.add_arguments(filter(lambda a: not isinstance(a, argparse._HelpAction),
+ subparser._optionals._group_actions))
+ formatter.end_section()
+
+ print(formatter.format_help())
+ parser.exit(0)
+
+
+def format_subcommands_help(parser: argparse.ArgumentParser):
+ subparsers_actions = [
+ action for action in parser._actions
+ if isinstance(action, argparse._SubParsersAction)]
+
+ formatter = parser._get_formatter()
+ formatter.add_usage(parser.usage,
+ parser._actions,
+ parser._mutually_exclusive_groups)
+
+ formatter.start_section("Choose subcommand")
+ for subparsers_action in subparsers_actions:
+ formatter.add_argument(subparsers_action)
+ formatter.end_section()
+
+ return formatter.format_help()
+
+
+def get_default_serial_port() -> str:
+ if platform.system() == "Windows":
+ return 'COM3'
+ else:
+ return '/dev/ttyUSB0'
diff --git a/mp_tool/web.py b/mp_tool/web.py
new file mode 100644
index 0000000..6dcccfb
--- /dev/null
+++ b/mp_tool/web.py
@@ -0,0 +1,117 @@
+"""
+Implementation of commands that run against the websocket interface of micropython
+"""
+from . import Constants
+
+import websocket
+
+import tty
+import termios
+from threading import Thread
+from sys import stdout, stdin
+from copy import copy
+
+
+def get(url: str, password: str):
+ raise NotImplementedError()
+
+
+def put(url: str, password: str):
+ raise NotImplementedError()
+
+
+def connect_and_auth(url, password) -> websocket.WebSocket:
+ ws = websocket.create_connection(url, timeout=0.5)
+ frame = ws.recv_frame()
+
+ if frame.data != b"Password: ":
+ raise Exception("Unexpected response: {}".format(frame.data))
+ stdout.write(frame.data.decode('utf-8'))
+ ws.send(password + "\n")
+
+ frame = ws.recv_frame()
+ if frame.data.strip() != b"WebREPL connected\r\n>>>":
+ raise Exception("Unexpected response: {}".format(frame.data))
+ return ws
+
+
+def eval(url: str, password: str, code: str):
+ ws = connect_and_auth(url, password)
+ ws.send(Constants.ENTER_REPL_MODE)
+ stdout.write(read_until_eval_or_timeout(ws))
+ ws.send(code + "\r\n")
+
+ result = read_until_eval_or_timeout(ws)
+ stdout.write(result[:-6])
+ print("")
+ ws.close()
+
+
+def read_until_eval_or_timeout(ws: websocket.WebSocket):
+ buf = ""
+ while not buf.endswith("\r\n>>> "):
+ buf += ws.recv()
+ return buf
+
+
+class Reader(Thread):
+ def __init__(self, ws):
+ Thread.__init__(self)
+ self.ws = ws
+ self.stop = False
+
+ def run(self):
+ while True:
+ try:
+ frame = self.ws.recv_frame()
+ stdout.write(frame.data.decode('utf-8'))
+ stdout.flush()
+ except Exception as e:
+ if self.stop:
+ break
+
+
+def set_tty_raw_mode(fd):
+ saved_mode = termios.tcgetattr(fd)
+
+ new_mode = copy(saved_mode)
+ new_mode[tty.LFLAG] = new_mode[tty.LFLAG] & ~termios.ECHO
+ new_mode[tty.CC][tty.VMIN] = 1
+ new_mode[tty.CC][tty.VTIME] = 0
+ set_tty_mode(fd, new_mode)
+
+ return saved_mode
+
+
+def set_tty_mode(fd, mode):
+ termios.tcsetattr(fd, termios.TCSAFLUSH, mode)
+
+
+def repl(url: str, password: str):
+ print("Type ^[ CTRL-] or CTRL-D to quit")
+ ws = connect_and_auth(url, password)
+ ws.send("\x02")
+
+ reader = Reader(ws)
+ reader.start()
+
+ saved_tty_mode = set_tty_raw_mode(stdin.fileno())
+ try:
+ tty.setraw(stdin.fileno())
+ while True:
+ try:
+ in_char = stdin.read(1)
+ if in_char == "\x1d" or in_char == "\x04": # escape char 'Ctrl-]' or CTRL-C
+ break
+ else:
+ ws.send(in_char)
+ except KeyboardInterrupt:
+ break
+ except Exception as _:
+ pass
+
+ reader.stop = True
+ ws.close()
+
+ set_tty_mode(stdin.fileno(), saved_tty_mode)
+ print("")
diff --git a/setup.py b/setup.py
index facc0a5..886c2d6 100644
--- a/setup.py
+++ b/setup.py
@@ -10,7 +10,7 @@ setup(name='mp-tool',
packages=['mp_tool'],
scripts=['mp-tool'],
url='https://example.com/',
- install_requires=['websocket_client==0.40.0'],
+ install_requires=['websocket_client==0.40.0','pyserial==3.2.1'],
tests_require=[],
classifiers=[
"Programming Language :: Python",