Merge pull request #208 from pimoroni/picowireless-ppwhttp
PicoWireless: move HTTP code to ppwhttp library
This commit is contained in:
commit
3c57cbcdef
|
@ -0,0 +1 @@
|
|||
secrets.py
|
|
@ -1,119 +1,60 @@
|
|||
import time
|
||||
import picowireless
|
||||
import json
|
||||
try:
|
||||
import xmltok # https://pypi.org/project/micropython-xmltok/
|
||||
import io
|
||||
except ImportError:
|
||||
xmltok = None
|
||||
from micropython import const
|
||||
|
||||
WIFI_SSID = "Your SSID here!"
|
||||
WIFI_PASS = "Your PSK here!"
|
||||
try:
|
||||
import ppwhttp
|
||||
except ImportError:
|
||||
raise RuntimeError("Cannot find ppwhttp. Have you copied ppwhttp.py to your Pico?")
|
||||
|
||||
CLOUDFLARE_DNS = (1, 1, 1, 1)
|
||||
GOOGLE_DNS = (8, 8, 8, 8)
|
||||
USE_DNS = CLOUDFLARE_DNS
|
||||
|
||||
TCP_MODE = const(0)
|
||||
HTTP_REQUEST_DELAY = const(30)
|
||||
HTTP_PORT = 80
|
||||
HTTP_REQUEST_DELAY = const(60)
|
||||
HTTP_REQUEST_PORT = const(80)
|
||||
HTTP_REQUEST_HOST = "api.thingspeak.com"
|
||||
HTTP_REQUEST_PATH = "/channels/1417/field/2/last.txt"
|
||||
|
||||
|
||||
def connect(host_address, port, client_sock, timeout=1000):
|
||||
picowireless.client_start(host_address, port, client_sock, TCP_MODE)
|
||||
|
||||
t_start = time.time()
|
||||
timeout /= 1000.0
|
||||
|
||||
while time.time() - t_start < timeout:
|
||||
state = picowireless.get_client_state(client_sock)
|
||||
if state == 4:
|
||||
return True
|
||||
time.sleep(1.0)
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def http_request(client_sock, host_address, port, request_host, request_path, handler, timeout=5000):
|
||||
print("Connecting to {1}.{2}.{3}.{4}:{0}...".format(port, *host_address))
|
||||
if not connect(host_address, port, client_sock):
|
||||
print("Connection failed!")
|
||||
return False
|
||||
print("Connected!")
|
||||
|
||||
http_request = """GET {} HTTP/1.1
|
||||
Host: {}
|
||||
Connection: close
|
||||
|
||||
""".format(request_path, request_host).replace("\n", "\r\n")
|
||||
|
||||
picowireless.send_data(client_sock, http_request)
|
||||
|
||||
t_start = time.time()
|
||||
|
||||
while True:
|
||||
if time.time() - t_start > timeout:
|
||||
picowireless.client_stop(client_sock)
|
||||
print("HTTP request to {}:{} timed out...".format(host_address, port))
|
||||
return False
|
||||
|
||||
avail_length = picowireless.avail_data(client_sock)
|
||||
if avail_length > 0:
|
||||
break
|
||||
|
||||
print("Got response: {} bytes".format(avail_length))
|
||||
|
||||
response = b""
|
||||
|
||||
while len(response) < avail_length:
|
||||
data = picowireless.get_data_buf(client_sock)
|
||||
response += data
|
||||
|
||||
response = response.decode("utf-8")
|
||||
|
||||
head, body = response.split("\r\n\r\n", 1)
|
||||
dhead = {}
|
||||
|
||||
for line in head.split("\r\n")[1:]:
|
||||
key, value = line.split(": ", 1)
|
||||
dhead[key] = value
|
||||
|
||||
handler(dhead, body)
|
||||
|
||||
picowireless.client_stop(client_sock)
|
||||
|
||||
|
||||
picowireless.init()
|
||||
|
||||
print("Connecting to {}...".format(WIFI_SSID))
|
||||
picowireless.wifi_set_passphrase(WIFI_SSID, WIFI_PASS)
|
||||
|
||||
while True:
|
||||
if picowireless.get_connection_status() == 3:
|
||||
break
|
||||
print("Connected!")
|
||||
ppwhttp.start_wifi()
|
||||
ppwhttp.set_dns(ppwhttp.GOOGLE_DNS)
|
||||
|
||||
# Get our own local IP!
|
||||
my_ip = picowireless.get_ip_address()
|
||||
my_ip = ppwhttp.get_ip_address()
|
||||
print("Local IP: {}.{}.{}.{}".format(*my_ip))
|
||||
|
||||
# Resolve and cache the IP address
|
||||
picowireless.set_dns(USE_DNS)
|
||||
http_address = picowireless.get_host_by_name(HTTP_REQUEST_HOST)
|
||||
print("Resolved {} to {}.{}.{}.{}".format(HTTP_REQUEST_HOST, *http_address))
|
||||
|
||||
client_sock = picowireless.get_socket()
|
||||
|
||||
|
||||
def handler(head, body):
|
||||
if head["Status"] == "200 OK":
|
||||
color = body[1:]
|
||||
if HTTP_REQUEST_PATH.endswith(".json"):
|
||||
# Parse as JSON
|
||||
data = json.loads(body)
|
||||
color = data['field2'][1:]
|
||||
|
||||
elif HTTP_REQUEST_PATH.endswith(".xml") and xmltok is not None:
|
||||
# Parse as XML
|
||||
data = xmltok.tokenize(io.StringIO(body))
|
||||
color = xmltok.text_of(data, "field2")[1:]
|
||||
|
||||
elif HTTP_REQUEST_PATH.endswith(".txt"):
|
||||
# Parse as plain text
|
||||
color = body[1:]
|
||||
|
||||
else:
|
||||
print("Unable to parse API response!")
|
||||
return
|
||||
r = int(color[0:2], 16)
|
||||
g = int(color[2:4], 16)
|
||||
b = int(color[4:6], 16)
|
||||
picowireless.set_led(r, g, b)
|
||||
ppwhttp.set_led(r, g, b)
|
||||
print("Set LED to {} {} {}".format(r, g, b))
|
||||
else:
|
||||
print("Error: {}".format(head["Status"]))
|
||||
|
||||
|
||||
while True:
|
||||
http_request(client_sock, http_address, HTTP_PORT, HTTP_REQUEST_HOST, HTTP_REQUEST_PATH, handler)
|
||||
time.sleep(60.0)
|
||||
ppwhttp.http_request(HTTP_REQUEST_HOST, HTTP_REQUEST_PORT, HTTP_REQUEST_HOST, HTTP_REQUEST_PATH, handler)
|
||||
time.sleep(HTTP_REQUEST_DELAY)
|
||||
|
|
|
@ -0,0 +1,123 @@
|
|||
import time
|
||||
import plasma
|
||||
|
||||
try:
|
||||
import ppwhttp
|
||||
except ImportError:
|
||||
raise RuntimeError("Cannot find ppwhttp. Have you copied ppwhttp.py to your Pico?")
|
||||
|
||||
"""
|
||||
This example uses the Plasma WS2812 LED library to drive a string of LEDs alongside the built-in RGB LED.
|
||||
You should wire your LEDs to VBUS/GND and connect the data pin to pin 27 (unused by Pico Wireless).
|
||||
|
||||
Go to: https://<address>:<port>/set_led/<index> to set a single LED
|
||||
"""
|
||||
|
||||
NUM_LEDS = 30 # Number of connected LEDs
|
||||
LED_PIN = 27 # LED data pin (27 is unused by Pico Wireless)
|
||||
LED_PIO = 0 # Hardware PIO (0 or 1)
|
||||
LED_SM = 0 # PIO State-Machine (0 to 3)
|
||||
|
||||
|
||||
r = 0
|
||||
g = 0
|
||||
b = 0
|
||||
|
||||
led_strip = plasma.WS2812(NUM_LEDS, LED_PIO, LED_SM, LED_PIN)
|
||||
|
||||
|
||||
# Edit your routes here
|
||||
# Nothing fancy is supported, just plain ol' URLs and GET/POST methods
|
||||
@ppwhttp.route("/", methods=["GET", "POST"])
|
||||
def get_home(method, url, data):
|
||||
if method == "POST":
|
||||
global r, g, b
|
||||
r = int(data.get("r", 0))
|
||||
g = int(data.get("g", 0))
|
||||
b = int(data.get("b", 0))
|
||||
ppwhttp.set_led(r, g, b)
|
||||
for i in range(NUM_LEDS):
|
||||
led_strip.set_rgb(i, r, g, b)
|
||||
print("Set LED to {} {} {}".format(r, g, b))
|
||||
|
||||
return """<form method="post" action="/">
|
||||
<input id="r" name="r" type="number" value="{r}" />
|
||||
<input name="g" type="number" value="{g}" />
|
||||
<input name="b" type="number" value="{b}" />
|
||||
<input type="submit" value="Set LED" />
|
||||
</form>""".format(r=r, g=g, b=b)
|
||||
|
||||
|
||||
# This wildcard route allows us to visit eg `/set_led/<index>`
|
||||
# to get/set the state of LED <index>
|
||||
# You should *probably* not modify state with GET, even though you can
|
||||
# so we use a form and POST to handle changing things.
|
||||
@ppwhttp.route("/set_led/<int:index>", methods=["GET", "POST"])
|
||||
def set_led(method, url, data):
|
||||
i = int(data.get("index", 0))
|
||||
|
||||
if method == "POST":
|
||||
r = int(data.get("r", 0))
|
||||
g = int(data.get("g", 0))
|
||||
b = int(data.get("b", 0))
|
||||
led_strip.set_rgb(i, r, g, b)
|
||||
print("Set LED to {} {} {}".format(r, g, b))
|
||||
else:
|
||||
# TODO Fix WS2812 / APA102 get methods to correct for colour order/alignment
|
||||
r, g, b, w = led_strip.get(i)
|
||||
r = int(r)
|
||||
g = int(g)
|
||||
b = int(b)
|
||||
|
||||
return """LED: {i}<br /><form method="post" action="/set_led/{i}">
|
||||
<input id="r" name="r" type="number" value="{r}" />
|
||||
<input name="g" type="number" value="{g}" />
|
||||
<input name="b" type="number" value="{b}" />
|
||||
<input type="submit" value="Set LED" />
|
||||
</form>""".format(i=i, r=r, g=g, b=b)
|
||||
|
||||
|
||||
# This wildcard route allows us to visit eg `/get_led/<index>`
|
||||
# to get the state of LED <index>
|
||||
@ppwhttp.route("/get_led/<int:index>", methods="GET")
|
||||
def get_led(method, url, data):
|
||||
i = data.get("index", 0)
|
||||
# TODO Fix WS2812 / APA102 get methods to correct for colour order/alignment
|
||||
r, g, b, w = led_strip.get(i)
|
||||
return "LED: {}<br />R: {:0.0f}<br />G: {:0.0f}<br />B: {:0.0f}".format(i, r, g, b)
|
||||
|
||||
|
||||
@ppwhttp.route("/test", methods="GET")
|
||||
def get_test(method, url):
|
||||
return "Hello World!"
|
||||
|
||||
|
||||
ppwhttp.start_wifi()
|
||||
|
||||
led_strip.start()
|
||||
|
||||
server_sock = ppwhttp.start_server()
|
||||
while True:
|
||||
ppwhttp.handle_http_request(server_sock)
|
||||
time.sleep(0.01)
|
||||
|
||||
|
||||
# Whoa there! Did you know you could run the server polling loop
|
||||
# on Pico's *other* core!? Here's how:
|
||||
#
|
||||
# import _thread
|
||||
#
|
||||
# def server_loop_forever():
|
||||
# # Start a server and continuously poll for HTTP requests
|
||||
# server_sock = ppwhttp.start_server()
|
||||
# while True:
|
||||
# ppwhttp.handle_http_request(server_sock)
|
||||
# time.sleep(0.01)
|
||||
#
|
||||
# Handle the server polling loop on the other core!
|
||||
# _thread.start_new_thread(server_loop_forever, ())
|
||||
#
|
||||
# # Your very own main loop for fun and profit!
|
||||
# while True:
|
||||
# print("Colour: {} {} {}".format(r, g, b))
|
||||
# time.sleep(5.0)
|
|
@ -0,0 +1,334 @@
|
|||
"""Pimoroni Pico Wireless HTTP
|
||||
|
||||
A super-simple HTTP server library for Pico Wireless.
|
||||
"""
|
||||
import time
|
||||
import picowireless
|
||||
|
||||
from micropython import const
|
||||
|
||||
try:
|
||||
from secrets import WIFI_SSID, WIFI_PASS
|
||||
except ImportError:
|
||||
WIFI_SSID = None
|
||||
WIFI_PASS = None
|
||||
|
||||
|
||||
TCP_CLOSED = const(0)
|
||||
TCP_LISTEN = const(1)
|
||||
|
||||
TCP_MODE = const(0)
|
||||
UDP_MODE = const(1)
|
||||
TLS_MODE = const(2)
|
||||
UDP_MULTICAST_MODE = const(3)
|
||||
TLS_BEARSSL_MODE = const(4)
|
||||
|
||||
TCP_STATE_CLOSED = const(0)
|
||||
TCP_STATE_LISTEN = const(1)
|
||||
TCP_STATE_SYN_SENT = const(2)
|
||||
TCP_STATE_SVN_RCVD = const(3)
|
||||
TCP_STATE_ESTABLISHED = const(4)
|
||||
TCP_STATE_FIN_WAIT_1 = const(5)
|
||||
TCP_STATE_FIN_WAIT_2 = const(6)
|
||||
TCP_STATE_CLOSE_WAIT = const(7)
|
||||
TCP_STATE_CLOSING = const(8)
|
||||
TCP_STATE_LAST_ACK = const(9)
|
||||
TCP_STATE_TIME_WAIT = const(10)
|
||||
|
||||
CLOUDFLARE_DNS = (1, 1, 1, 1)
|
||||
GOOGLE_DNS = (8, 8, 8, 8)
|
||||
|
||||
DEFAULT_HTTP_PORT = const(80)
|
||||
|
||||
|
||||
routes = {}
|
||||
sockets = []
|
||||
hosts = {}
|
||||
|
||||
|
||||
def set_led(r, g, b):
|
||||
"""Set """
|
||||
picowireless.set_led(r, g, b)
|
||||
|
||||
|
||||
def get_socket(force_new=False):
|
||||
global sockets
|
||||
if force_new or len(sockets) == 0:
|
||||
socket = picowireless.get_socket()
|
||||
sockets.append(socket)
|
||||
return socket
|
||||
return sockets[0]
|
||||
|
||||
|
||||
def get_ip_address():
|
||||
return picowireless.get_ip_address()
|
||||
|
||||
|
||||
def set_dns(dns):
|
||||
picowireless.set_dns(dns)
|
||||
|
||||
|
||||
def get_host_by_name(hostname, no_cache=False):
|
||||
# Already an IP
|
||||
if type(hostname) is tuple and len(hostname) == 4:
|
||||
return hostname
|
||||
|
||||
# Get from cached hosts
|
||||
if hostname in hosts and not no_cache:
|
||||
return hosts[hostname]
|
||||
|
||||
ip = picowireless.get_host_by_name(hostname)
|
||||
hosts[hostname] = ip
|
||||
return ip
|
||||
|
||||
|
||||
def start_wifi(wifi_ssid=WIFI_SSID, wifi_pass=WIFI_PASS):
|
||||
if wifi_ssid is None or wifi_pass is None:
|
||||
raise RuntimeError("WiFi SSID/PASS required. Set them in secrets.py and copy it to your Pico, or pass them as arguments.")
|
||||
picowireless.init()
|
||||
|
||||
print("Connecting to {}...".format(wifi_ssid))
|
||||
picowireless.wifi_set_passphrase(wifi_ssid, wifi_pass)
|
||||
|
||||
while True:
|
||||
if picowireless.get_connection_status() == 3:
|
||||
break
|
||||
print("Connected!")
|
||||
|
||||
|
||||
def start_server(http_port=DEFAULT_HTTP_PORT, timeout=5000):
|
||||
my_ip = picowireless.get_ip_address()
|
||||
print("Starting server...")
|
||||
server_sock = picowireless.get_socket()
|
||||
picowireless.server_start(http_port, server_sock, 0)
|
||||
|
||||
t_start = time.ticks_ms()
|
||||
|
||||
while time.ticks_ms() - t_start < timeout:
|
||||
state = picowireless.get_server_state(server_sock)
|
||||
if state == TCP_LISTEN:
|
||||
print("Server listening on {1}.{2}.{3}.{4}:{0}".format(http_port, *my_ip))
|
||||
return server_sock
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def connect_to_server(host_address, port, client_sock, timeout=5000, connection_mode=TCP_MODE):
|
||||
if connection_mode in (TLS_MODE, TLS_BEARSSL_MODE):
|
||||
print("Connecting to {1}:{0}...".format(port, host_address))
|
||||
picowireless.client_start(host_address, (0, 0, 0, 0), port, client_sock, connection_mode)
|
||||
else:
|
||||
host_address = get_host_by_name(host_address)
|
||||
print("Connecting to {1}.{2}.{3}.{4}:{0}...".format(port, *host_address))
|
||||
picowireless.client_start(host_address, port, client_sock, connection_mode)
|
||||
|
||||
t_start = time.ticks_ms()
|
||||
|
||||
while time.ticks_ms() - t_start < timeout:
|
||||
state = picowireless.get_client_state(client_sock)
|
||||
if state == TCP_STATE_ESTABLISHED:
|
||||
print("Connected!")
|
||||
return True
|
||||
if state == TCP_STATE_CLOSED:
|
||||
print("Connection failed!")
|
||||
return False
|
||||
print(state)
|
||||
time.sleep(0.5)
|
||||
|
||||
print("Connection timeout!")
|
||||
return False
|
||||
|
||||
|
||||
def http_request(host_address, port, request_host, request_path, handler, timeout=5000, client_sock=None, connection_mode=TCP_MODE):
|
||||
if client_sock is None:
|
||||
client_sock = get_socket()
|
||||
|
||||
if not connect_to_server(host_address, port, client_sock, connection_mode=connection_mode):
|
||||
return False
|
||||
|
||||
http_request = """GET {} HTTP/1.1
|
||||
Host: {}
|
||||
Connection: close
|
||||
|
||||
""".format(request_path, request_host).replace("\n", "\r\n")
|
||||
|
||||
picowireless.send_data(client_sock, http_request)
|
||||
|
||||
t_start = time.ticks_ms()
|
||||
|
||||
while True:
|
||||
if time.ticks_ms() - t_start > timeout:
|
||||
picowireless.client_stop(client_sock)
|
||||
print("HTTP request to {}:{} timed out...".format(host_address, port))
|
||||
return False
|
||||
|
||||
avail_length = picowireless.avail_data(client_sock)
|
||||
if avail_length > 0:
|
||||
break
|
||||
|
||||
print("Got response: {} bytes".format(avail_length))
|
||||
|
||||
response = b""
|
||||
|
||||
while len(response) < avail_length:
|
||||
data = picowireless.get_data_buf(client_sock)
|
||||
response += data
|
||||
|
||||
head, body = response.split(b"\r\n\r\n", 1)
|
||||
head = head.decode("utf-8")
|
||||
dhead = {}
|
||||
|
||||
for line in head.split("\r\n")[1:]:
|
||||
key, value = line.split(": ", 1)
|
||||
dhead[key] = value
|
||||
|
||||
encoding = "iso-8869-1"
|
||||
content_type = "application/octet-stream"
|
||||
|
||||
if "Content-Type" in dhead:
|
||||
ctype = dhead["Content-Type"].split("; ")
|
||||
content_type = ctype[0].lower()
|
||||
for c in ctype:
|
||||
if c.startswith("encoding="):
|
||||
encoding = c[9:]
|
||||
|
||||
# Handle JSON content type, this is prefixed with a length
|
||||
# which we'll parse and use to truncate the body
|
||||
if content_type == "application/json":
|
||||
if not body.startswith(b"{"):
|
||||
length, body = body.split(b"\r\n", 1)
|
||||
length = int(length, 16)
|
||||
body = body[:length]
|
||||
|
||||
body = body.decode(encoding)
|
||||
|
||||
handler(dhead, body)
|
||||
|
||||
picowireless.client_stop(client_sock)
|
||||
|
||||
|
||||
def find_route(route, url, method, data):
|
||||
if len(url) > 0:
|
||||
for key, value in route.items():
|
||||
if key == url[0]:
|
||||
return find_route(route[url[0]], url[1:], method, data)
|
||||
|
||||
elif key.startswith("<") and key.endswith(">"):
|
||||
key = key[1:-1]
|
||||
if ":" in key:
|
||||
dtype, key = key.split(":")
|
||||
else:
|
||||
dtype = "str"
|
||||
|
||||
if dtype == "int":
|
||||
try:
|
||||
data[key] = int(url[0])
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
else:
|
||||
data[key] = url[0]
|
||||
|
||||
return find_route(value, url[1:], method, data)
|
||||
|
||||
return None, None
|
||||
|
||||
if method in route:
|
||||
return route[method], data
|
||||
|
||||
return None, None
|
||||
|
||||
|
||||
def handle_http_request(server_sock, timeout=5000):
|
||||
t_start = time.ticks_ms()
|
||||
|
||||
client_sock = picowireless.avail_server(server_sock)
|
||||
if client_sock in [server_sock, 255, -1]:
|
||||
return False
|
||||
|
||||
print("Client connected!")
|
||||
|
||||
avail_length = picowireless.avail_data(client_sock)
|
||||
if avail_length == 0:
|
||||
picowireless.client_stop(client_sock)
|
||||
return False
|
||||
|
||||
request = b""
|
||||
|
||||
while len(request) < avail_length:
|
||||
data = picowireless.get_data_buf(client_sock)
|
||||
request += data
|
||||
if time.ticks_ms() - t_start > timeout:
|
||||
print("Client timed out getting data!")
|
||||
picowireless.client_stop(client_sock)
|
||||
return False
|
||||
|
||||
request = request.decode("utf-8")
|
||||
|
||||
if len(request) > 0:
|
||||
head, body = request.split("\r\n\r\n", 1)
|
||||
dhead = {}
|
||||
|
||||
for line in head.split("\r\n")[1:]:
|
||||
key, value = line.split(": ", 1)
|
||||
dhead[key] = value
|
||||
|
||||
method, url, _ = head.split("\r\n", 1)[0].split(" ")
|
||||
|
||||
print("Serving {} on {}...".format(method, url))
|
||||
|
||||
response = None
|
||||
|
||||
data = {}
|
||||
|
||||
if url.startswith("/"):
|
||||
url = url[1:]
|
||||
url = url.split("/")
|
||||
handler, data = find_route(routes, url, method, data)
|
||||
|
||||
# Dispatch the request to the relevant route
|
||||
if callable(handler):
|
||||
if method == "POST":
|
||||
for var in body.split("&"):
|
||||
key, value = var.split("=")
|
||||
data[key] = value
|
||||
|
||||
if data == {}:
|
||||
response = handler(method, url)
|
||||
else:
|
||||
response = handler(method, url, data)
|
||||
|
||||
if response is not None:
|
||||
response = "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nContent-Type: text/html\r\n\r\n".format(len(response)) + response
|
||||
picowireless.send_data(client_sock, response)
|
||||
picowireless.client_stop(client_sock)
|
||||
print("Success! Sending 200 OK")
|
||||
return True
|
||||
else:
|
||||
picowireless.send_data(client_sock, "HTTP/1.1 501 Not Implemented\r\nContent-Length: 19\r\n\r\n501 Not Implemented")
|
||||
picowireless.client_stop(client_sock)
|
||||
print("Unhandled Request! Sending 501 OK")
|
||||
return False
|
||||
|
||||
|
||||
def route(url, methods="GET"):
|
||||
if type(methods) is str:
|
||||
methods = [methods]
|
||||
|
||||
if url.startswith("/"):
|
||||
url = url[1:]
|
||||
|
||||
url = url.split("/")
|
||||
|
||||
def decorate(handler):
|
||||
route = routes
|
||||
for part in url:
|
||||
if part not in route:
|
||||
route[part] = {}
|
||||
|
||||
route = route[part]
|
||||
|
||||
for method in methods:
|
||||
route[method] = handler
|
||||
|
||||
return decorate
|
|
@ -1,143 +1,26 @@
|
|||
import time
|
||||
import picowireless
|
||||
from micropython import const
|
||||
|
||||
WIFI_SSID = "your SSID here!"
|
||||
WIFI_PASS = "Your PSK here!"
|
||||
try:
|
||||
import ppwhttp
|
||||
except ImportError:
|
||||
raise RuntimeError("Cannot find ppwhttp. Have you copied ppwhttp.py to your Pico?")
|
||||
|
||||
TCP_CLOSED = 0
|
||||
TCP_LISTEN = 1
|
||||
|
||||
CLOUDFLARE_DNS = (1, 1, 1, 1)
|
||||
GOOGLE_DNS = (8, 8, 8, 8)
|
||||
|
||||
TCP_MODE = const(0)
|
||||
HTTP_REQUEST_DELAY = const(30)
|
||||
HTTP_PORT = 80
|
||||
|
||||
routes = {}
|
||||
|
||||
r = 0
|
||||
g = 0
|
||||
b = 0
|
||||
|
||||
|
||||
def start_wifi():
|
||||
picowireless.init()
|
||||
|
||||
print("Connecting to {}...".format(WIFI_SSID))
|
||||
picowireless.wifi_set_passphrase(WIFI_SSID, WIFI_PASS)
|
||||
|
||||
while True:
|
||||
if picowireless.get_connection_status() == 3:
|
||||
break
|
||||
print("Connected!")
|
||||
|
||||
|
||||
def start_server(http_port, timeout=1.0):
|
||||
my_ip = picowireless.get_ip_address()
|
||||
print("Starting server...")
|
||||
server_sock = picowireless.get_socket()
|
||||
picowireless.server_start(http_port, server_sock, 0)
|
||||
|
||||
t_start = time.time()
|
||||
|
||||
while time.time() - t_start < timeout:
|
||||
state = picowireless.get_server_state(server_sock)
|
||||
if state == TCP_LISTEN:
|
||||
print("Server listening on {1}.{2}.{3}.{4}:{0}".format(http_port, *my_ip))
|
||||
return server_sock
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def handle_http_request(server_sock, timeout=1.0):
|
||||
t_start = time.time()
|
||||
|
||||
client_sock = picowireless.avail_server(server_sock)
|
||||
if client_sock in [server_sock, 255, -1]:
|
||||
return False
|
||||
|
||||
print("Client connected!")
|
||||
|
||||
avail_length = picowireless.avail_data(client_sock)
|
||||
if avail_length == 0:
|
||||
picowireless.client_stop(client_sock)
|
||||
return False
|
||||
|
||||
request = b""
|
||||
|
||||
while len(request) < avail_length:
|
||||
data = picowireless.get_data_buf(client_sock)
|
||||
request += data
|
||||
if time.time() - t_start > timeout:
|
||||
print("Client timed out getting data!")
|
||||
picowireless.client_stop(client_sock)
|
||||
return False
|
||||
|
||||
request = request.decode("utf-8")
|
||||
|
||||
if len(request) > 0:
|
||||
head, body = request.split("\r\n\r\n", 1)
|
||||
dhead = {}
|
||||
|
||||
for line in head.split("\r\n")[1:]:
|
||||
key, value = line.split(": ", 1)
|
||||
dhead[key] = value
|
||||
|
||||
method, url, _ = head.split("\r\n", 1)[0].split(" ")
|
||||
|
||||
print("Serving {} on {}...".format(method, url))
|
||||
|
||||
response = None
|
||||
|
||||
# Dispatch the request to the relevant route
|
||||
if url in routes and method in routes[url] and callable(routes[url][method]):
|
||||
if method == "POST":
|
||||
data = {}
|
||||
for var in body.split("&"):
|
||||
key, value = var.split("=")
|
||||
data[key] = value
|
||||
response = routes[url][method](method, url, data)
|
||||
else:
|
||||
response = routes[url][method](method, url)
|
||||
|
||||
if response is not None:
|
||||
response = "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nContent-Type: text/html\r\n\r\n".format(len(response)) + response
|
||||
picowireless.send_data(client_sock, response)
|
||||
picowireless.client_stop(client_sock)
|
||||
print("Success! Sending 200 OK")
|
||||
return True
|
||||
else:
|
||||
picowireless.send_data(client_sock, "HTTP/1.1 501 Not Implemented\r\nContent-Length: 19\r\n\r\n501 Not Implemented")
|
||||
picowireless.client_stop(client_sock)
|
||||
print("Unhandled Request! Sending 501 OK")
|
||||
return False
|
||||
|
||||
|
||||
def route(url, methods="GET"):
|
||||
if type(methods) is str:
|
||||
methods = [methods]
|
||||
|
||||
def decorate(handler):
|
||||
for method in methods:
|
||||
if url not in routes:
|
||||
routes[url] = {}
|
||||
routes[url][method] = handler
|
||||
|
||||
return decorate
|
||||
|
||||
|
||||
# Edit your routes here
|
||||
# Nothing fancy is supported, just plain ol' URLs and GET/POST methods
|
||||
@route("/", methods=["GET", "POST"])
|
||||
@ppwhttp.route("/", methods=["GET", "POST"])
|
||||
def get_home(method, url, data=None):
|
||||
if method == "POST":
|
||||
global r, g, b
|
||||
r = int(data.get("r", 0))
|
||||
g = int(data.get("g", 0))
|
||||
b = int(data.get("b", 0))
|
||||
picowireless.set_led(r, g, b)
|
||||
ppwhttp.set_led(r, g, b)
|
||||
print("Set LED to {} {} {}".format(r, g, b))
|
||||
|
||||
return """<form method="post" action="/">
|
||||
|
@ -148,16 +31,16 @@ def get_home(method, url, data=None):
|
|||
</form>""".format(r=r, g=g, b=b)
|
||||
|
||||
|
||||
@route("/test", methods="GET")
|
||||
@ppwhttp.route("/test", methods="GET")
|
||||
def get_test(method, url):
|
||||
return "Hello World!"
|
||||
|
||||
|
||||
start_wifi()
|
||||
ppwhttp.start_wifi()
|
||||
|
||||
server_sock = start_server(HTTP_PORT)
|
||||
server_sock = ppwhttp.start_server()
|
||||
while True:
|
||||
handle_http_request(server_sock)
|
||||
ppwhttp.handle_http_request(server_sock)
|
||||
time.sleep(0.01)
|
||||
|
||||
|
||||
|
@ -168,9 +51,9 @@ while True:
|
|||
#
|
||||
# def server_loop_forever():
|
||||
# # Start a server and continuously poll for HTTP requests
|
||||
# server_sock = start_server(HTTP_PORT)
|
||||
# server_sock = ppwhttp.start_server()
|
||||
# while True:
|
||||
# handle_http_request(server_sock)
|
||||
# ppwhttp.handle_http_request(server_sock)
|
||||
# time.sleep(0.01)
|
||||
#
|
||||
# Handle the server polling loop on the other core!
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
import picowireless
|
||||
import time
|
||||
|
||||
picowireless.init()
|
||||
|
||||
picowireless.start_scan_networks()
|
||||
|
||||
while True:
|
||||
networks = picowireless.get_scan_networks()
|
||||
print("Found {} network(s)...".format(networks))
|
||||
for network in range(networks):
|
||||
ssid = picowireless.get_ssid_networks(network)
|
||||
print("{}: {}".format(network, ssid))
|
||||
print("\n")
|
||||
time.sleep(10)
|
|
@ -0,0 +1,2 @@
|
|||
WIFI_SSID = "your SSID here!"
|
||||
WIFI_PASS = "Your PSK here!"
|
|
@ -370,8 +370,7 @@ mp_obj_t picowireless_get_ssid_networks(size_t n_args, const mp_obj_t *pos_args,
|
|||
uint8_t network_item = args[ARG_network_item].u_int;
|
||||
const char* ssid = wireless->get_ssid_networks(network_item);
|
||||
if(ssid != nullptr) {
|
||||
std::string str_ssid(ssid, WL_SSID_MAX_LENGTH);
|
||||
return mp_obj_new_str(str_ssid.c_str(), str_ssid.length());
|
||||
return mp_obj_new_str(ssid, strnlen(ssid, WL_SSID_MAX_LENGTH));
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -516,8 +515,7 @@ mp_obj_t picowireless_get_host_by_name(size_t n_args, const mp_obj_t *pos_args,
|
|||
mp_obj_t picowireless_get_fw_version() {
|
||||
if(wireless != nullptr) {
|
||||
const char* fw_ver = wireless->get_fw_version();
|
||||
std::string str_fw_ver(fw_ver, WL_FW_VER_LENGTH);
|
||||
return mp_obj_new_str(str_fw_ver.c_str(), str_fw_ver.length());
|
||||
return mp_obj_new_str(fw_ver, strnlen(fw_ver, WL_FW_VER_LENGTH));
|
||||
}
|
||||
else
|
||||
mp_raise_msg(&mp_type_RuntimeError, NOT_INITIALISED_MSG);
|
||||
|
|
Loading…
Reference in New Issue