diff --git a/micropython/examples/pico_wireless/cheerlights.py b/micropython/examples/pico_wireless/cheerlights.py new file mode 100644 index 00000000..7efe3b81 --- /dev/null +++ b/micropython/examples/pico_wireless/cheerlights.py @@ -0,0 +1,112 @@ +import time +import picowireless +from micropython import const + +WIFI_SSID = "your SSID here!" +WIFI_PASS = "Your PSK here!" + +CLOUDFLARE_DNS = (1, 1, 1, 1) +GOOGLE_DNS = (8, 8, 8, 8) +USE_DNS = CLOUDFLARE_DNS + +TCP_MODE = const(0) +HTTP_REQUEST_DELAY = const(30) +HTTP_PORT = 80 +HTTP_REQUEST_HOST = "api.thingspeak.com" +HTTP_REQUEST_PATH = "/channels/1417/field/2/last.txt" + + +def connect(host_address, port, client_sock, timeout=1000): + picowireless.client_start(host_address, port, client_sock, TCP_MODE) + + t_start = time.time() + timeout /= 1000.0 + + while time.time() - t_start < timeout: + state = picowireless.get_client_state(client_sock) + if state == 4: + return True + time.sleep(1.0) + + return False + + +def http_request(client_sock, host_address, port, request_host, request_path, handler): + print("Connecting to {1}.{2}.{3}.{4}:{0}...".format(port, *host_address)) + if not connect(host_address, port, client_sock): + print("Connection failed!") + return False + print("Connected!") + + http_request = """GET {} HTTP/1.1 +Host: {} +Connection: close + +""".format(request_path, request_host).replace("\n", "\r\n") + + picowireless.send_data(client_sock, http_request) + + while True: + avail_length = picowireless.avail_data(client_sock) + if avail_length > 0: + break + + print("Got response: {} bytes".format(avail_length)) + + response = b"" + + while len(response) < avail_length: + data = picowireless.get_data_buf(client_sock) + response += data + + response = response.decode("utf-8") + + head, body = response.split("\r\n\r\n", 1) + dhead = {} + + for line in head.split("\r\n")[1:]: + key, value = line.split(": ", 1) + dhead[key] = value + + handler(dhead, body) + + picowireless.client_stop(client_sock) + + +picowireless.init() + +print("Connecting to {}...".format(WIFI_SSID)) +picowireless.wifi_set_passphrase(WIFI_SSID, WIFI_PASS) + +while True: + if picowireless.get_connection_status() == 3: + break +print("Connected!") + +# Get our own local IP! +my_ip = picowireless.get_ip_address() +print("Local IP: {}.{}.{}.{}".format(*my_ip)) + +# Resolve and cache the IP address +picowireless.set_dns(USE_DNS) +http_address = picowireless.get_host_by_name(HTTP_REQUEST_HOST) +print("Resolved {} to {}.{}.{}.{}".format(HTTP_REQUEST_HOST, *http_address)) + +client_sock = picowireless.get_socket() + + +def handler(head, body): + if head["Status"] == "200 OK": + color = body[1:] + r = int(color[0:2], 16) + g = int(color[3:4], 16) + b = int(color[5:6], 16) + picowireless.set_led(r, g, b) + print("Set LED to {} {} {}".format(r, g, b)) + else: + print("Error: {}".format(head["Status"])) + + +while True: + http_request(client_sock, http_address, HTTP_PORT, HTTP_REQUEST_HOST, HTTP_REQUEST_PATH, handler) + time.sleep(60.0) diff --git a/micropython/examples/pico_wireless/rgb_http.py b/micropython/examples/pico_wireless/rgb_http.py new file mode 100644 index 00000000..60db461c --- /dev/null +++ b/micropython/examples/pico_wireless/rgb_http.py @@ -0,0 +1,182 @@ +import time +import picowireless +from micropython import const + +WIFI_SSID = "your SSID here!" +WIFI_PASS = "Your PSK here!" + +TCP_CLOSED = 0 +TCP_LISTEN = 1 + +CLOUDFLARE_DNS = (1, 1, 1, 1) +GOOGLE_DNS = (8, 8, 8, 8) + +TCP_MODE = const(0) +HTTP_REQUEST_DELAY = const(30) +HTTP_PORT = 80 + +routes = {} + +r = 0 +g = 0 +b = 0 + + +def start_wifi(): + picowireless.init() + + print("Connecting to {}...".format(WIFI_SSID)) + picowireless.wifi_set_passphrase(WIFI_SSID, WIFI_PASS) + + while True: + if picowireless.get_connection_status() == 3: + break + print("Connected!") + + +def start_server(http_port, timeout=1.0): + my_ip = picowireless.get_ip_address() + print("Starting server...") + server_sock = picowireless.get_socket() + picowireless.server_start(http_port, server_sock, 0) + + t_start = time.time() + + while time.time() - t_start < timeout: + state = picowireless.get_server_state(server_sock) + if state == TCP_LISTEN: + print("Server listening on {1}.{2}.{3}.{4}:{0}".format(http_port, *my_ip)) + return server_sock + + return None + + +def handle_http_request(server_sock, timeout=1.0): + t_start = time.time() + + client_sock = picowireless.avail_server(server_sock) + if client_sock in [server_sock, 255, -1]: + return False + + print("Client connected!") + + avail_length = picowireless.avail_data(client_sock) + if avail_length == 0: + picowireless.client_stop(client_sock) + return False + + request = b"" + + while len(request) < avail_length: + data = picowireless.get_data_buf(client_sock) + request += data + if time.time() - t_start > timeout: + print("Client timed out getting data!") + picowireless.client_stop(client_sock) + return False + + request = request.decode("utf-8") + + if len(request) > 0: + head, body = request.split("\r\n\r\n", 1) + dhead = {} + + for line in head.split("\r\n")[1:]: + key, value = line.split(": ", 1) + dhead[key] = value + + method, url, _ = head.split("\r\n", 1)[0].split(" ") + + print("Serving {} on {}...".format(method, url)) + + response = None + + # Dispatch the request to the relevant route + if url in routes and method in routes[url] and callable(routes[url][method]): + if method == "POST": + data = {} + for var in body.split("&"): + key, value = var.split("=") + data[key] = value + response = routes[url][method](method, url, data) + else: + response = routes[url][method](method, url) + + if response is not None: + response = "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nContent-Type: text/html\r\n\r\n".format(len(response)) + response + picowireless.send_data(client_sock, response) + picowireless.client_stop(client_sock) + print("Success! Sending 200 OK") + return True + else: + picowireless.send_data(client_sock, "HTTP/1.1 501 Not Implemented\r\nContent-Length: 19\r\n\r\n501 Not Implemented") + picowireless.client_stop(client_sock) + print("Unhandled Request! Sending 501 OK") + return False + + +def route(url, methods="GET"): + if type(methods) is str: + methods = [methods] + + def decorate(handler): + for method in methods: + if url not in routes: + routes[url] = {} + routes[url][method] = handler + + return decorate + + +# Edit your routes here +# Nothing fancy is supported, just plain ol' URLs and GET/POST methods +@route("/", methods=["GET", "POST"]) +def get_home(method, url, data=None): + if method == "POST": + global r, g, b + r = int(data.get("r", 0)) + g = int(data.get("g", 0)) + b = int(data.get("b", 0)) + picowireless.set_led(r, g, b) + print("Set LED to {} {} {}".format(r, g, b)) + + return """
""".format(r=r, g=g, b=b) + + +@route("/test", methods="GET") +def get_test(method, url): + return "Hello World!" + + +start_wifi() + +server_sock = start_server(HTTP_PORT) +while True: + handle_http_request(server_sock) + time.sleep(0.01) + + +# Whoa there! Did you know you could run the server polling loop +# on Pico's *other* core!? Here's how: +# +# import _thread +# +# def server_loop_forever(): +# # Start a server and continuously poll for HTTP requests +# server_sock = start_server(HTTP_PORT) +# while True: +# handle_http_request(server_sock) +# time.sleep(0.01) +# +# Handle the server polling loop on the other core! +# _thread.start_new_thread(server_loop_forever, ()) +# +# # Your very own main loop for fun and profit! +# while True: +# print("Colour: {} {} {}".format(r, g, b)) +# time.sleep(5.0)