micropython/extmod/webrepl/webrepl.py

83 lines
2.1 KiB
Python

# This module should be imported from REPL, not run from command line.
import socket
import uos
import network
import uwebsocket
import websocket_helper
import _webrepl
listen_s = None
client_s = None
def setup_conn(port, accept_handler):
global listen_s
listen_s = socket.socket()
listen_s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ai = socket.getaddrinfo("0.0.0.0", port)
addr = ai[0][4]
listen_s.bind(addr)
listen_s.listen(1)
if accept_handler:
listen_s.setsockopt(socket.SOL_SOCKET, 20, accept_handler)
for i in (network.AP_IF, network.STA_IF):
iface = network.WLAN(i)
if iface.active():
print("WebREPL daemon started on ws://%s:%d" % (iface.ifconfig()[0], port))
return listen_s
def accept_conn(listen_sock):
global client_s
cl, remote_addr = listen_sock.accept()
prev = uos.dupterm(None)
uos.dupterm(prev)
if prev:
print("\nConcurrent WebREPL connection from", remote_addr, "rejected")
cl.close()
return
print("\nWebREPL connection from:", remote_addr)
client_s = cl
websocket_helper.server_handshake(cl)
ws = uwebsocket.websocket(cl, True)
ws = _webrepl._webrepl(ws)
cl.setblocking(False)
# notify REPL on socket incoming data (ESP32/ESP8266-only)
if hasattr(uos, "dupterm_notify"):
cl.setsockopt(socket.SOL_SOCKET, 20, uos.dupterm_notify)
uos.dupterm(ws)
def stop():
global listen_s, client_s
uos.dupterm(None)
if client_s:
client_s.close()
if listen_s:
listen_s.close()
def start(port=8266, password=None):
stop()
if password is None:
try:
import webrepl_cfg
_webrepl.password(webrepl_cfg.PASS)
setup_conn(port, accept_conn)
print("Started webrepl in normal mode")
except:
print("WebREPL is not configured, run 'import webrepl_setup'")
else:
_webrepl.password(password)
setup_conn(port, accept_conn)
print("Started webrepl in manual override mode")
def start_foreground(port=8266):
stop()
s = setup_conn(port, None)
accept_conn(s)