tests/extmod: Add coverage tests for select module.
Signed-off-by: Damien George <damien@micropython.org>
This commit is contained in:
parent
3f417e8943
commit
6b78a1bf00
|
@ -0,0 +1,55 @@
|
|||
# Test select.ipoll().
|
||||
|
||||
try:
|
||||
import socket, select
|
||||
except ImportError:
|
||||
print("SKIP")
|
||||
raise SystemExit
|
||||
|
||||
|
||||
def print_poll_output(lst):
|
||||
print([(type(obj), flags) for obj, flags in lst])
|
||||
|
||||
|
||||
poller = select.poll()
|
||||
|
||||
# Use a new UDP socket for tests, which should be writable but not readable.
|
||||
try:
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
s.bind(socket.getaddrinfo("127.0.0.1", 8000)[0][-1])
|
||||
except OSError:
|
||||
print("SKIP")
|
||||
raise SystemExit
|
||||
|
||||
poller.register(s)
|
||||
|
||||
# Basic polling.
|
||||
print_poll_output(poller.ipoll(0))
|
||||
|
||||
# Pass in flags=1 for one-shot behaviour.
|
||||
print_poll_output(poller.ipoll(0, 1))
|
||||
|
||||
# Socket should be deregistered and poll should return nothing.
|
||||
print_poll_output(poller.ipoll(0))
|
||||
|
||||
# Create a second socket.
|
||||
s2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
s2.bind(socket.getaddrinfo("127.0.0.1", 8001)[0][-1])
|
||||
|
||||
# Register both sockets (to reset the first one).
|
||||
poller.register(s)
|
||||
poller.register(s2)
|
||||
|
||||
# Basic polling with two sockets.
|
||||
print_poll_output(poller.ipoll(0))
|
||||
|
||||
# Unregister the first socket, to test polling the remaining one.
|
||||
poller.unregister(s)
|
||||
print_poll_output(poller.ipoll(0))
|
||||
|
||||
# Unregister the second socket, to test polling none.
|
||||
poller.unregister(s2)
|
||||
print_poll_output(poller.ipoll(0))
|
||||
|
||||
s2.close()
|
||||
s.close()
|
|
@ -0,0 +1,6 @@
|
|||
[(<class 'socket'>, 4)]
|
||||
[(<class 'socket'>, 4)]
|
||||
[]
|
||||
[(<class 'socket'>, 4), (<class 'socket'>, 4)]
|
||||
[(<class 'socket'>, 4)]
|
||||
[]
|
|
@ -16,6 +16,7 @@ poller.register(s)
|
|||
# "Registering a file descriptor that’s already registered is not an error,
|
||||
# and has the same effect as registering the descriptor exactly once."
|
||||
poller.register(s)
|
||||
poller.register(s, select.POLLIN | select.POLLOUT)
|
||||
|
||||
# 2 args are mandatory unlike register()
|
||||
try:
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
# Test custom pollable objects implemented in Python.
|
||||
|
||||
from micropython import const
|
||||
|
||||
try:
|
||||
import socket, select, io
|
||||
except ImportError:
|
||||
print("SKIP")
|
||||
raise SystemExit
|
||||
|
||||
_MP_STREAM_POLL = const(3)
|
||||
_MP_STREAM_GET_FILENO = const(10)
|
||||
|
||||
_MP_STREAM_POLL_RD = const(0x0001)
|
||||
_MP_STREAM_POLL_WR = const(0x0004)
|
||||
|
||||
|
||||
def print_poll_output(lst):
|
||||
print([(type(obj), flags) for obj, flags in lst])
|
||||
|
||||
|
||||
class CustomPollable(io.IOBase):
|
||||
def __init__(self):
|
||||
self.poll_state = 0
|
||||
|
||||
def ioctl(self, cmd, arg):
|
||||
if cmd == _MP_STREAM_GET_FILENO:
|
||||
# Bare-metal ports don't call this ioctl, so don't print it.
|
||||
return -1
|
||||
|
||||
print("CustomPollable.ioctl", cmd, arg)
|
||||
if cmd == _MP_STREAM_POLL:
|
||||
if self.poll_state == "delay_rd":
|
||||
self.poll_state = _MP_STREAM_POLL_RD
|
||||
return 0
|
||||
elif self.poll_state < 0:
|
||||
return self.poll_state
|
||||
else:
|
||||
return self.poll_state & arg
|
||||
|
||||
|
||||
poller = select.poll()
|
||||
|
||||
# Use a new UDP socket for tests, which should be writable but not readable.
|
||||
try:
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
s.bind(socket.getaddrinfo("127.0.0.1", 8000)[0][-1])
|
||||
except OSError:
|
||||
print("SKIP")
|
||||
raise SystemExit
|
||||
|
||||
x = CustomPollable()
|
||||
|
||||
# Register both a file-descriptor-based object and a custom pure-Python object.
|
||||
poller.register(s)
|
||||
poller.register(x)
|
||||
|
||||
# Modify the flags for the custom object.
|
||||
poller.modify(x, select.POLLIN)
|
||||
|
||||
# Test polling.
|
||||
print_poll_output(poller.poll(0))
|
||||
x.poll_state = _MP_STREAM_POLL_WR
|
||||
print_poll_output(poller.poll(0))
|
||||
x.poll_state = _MP_STREAM_POLL_RD
|
||||
print_poll_output(poller.poll(0))
|
||||
|
||||
# The custom object becomes readable only after being polled.
|
||||
poller.modify(s, select.POLLIN)
|
||||
x.poll_state = "delay_rd"
|
||||
print_poll_output(poller.poll())
|
||||
|
||||
# The custom object returns an error.
|
||||
x.poll_state = -1000
|
||||
try:
|
||||
poller.poll(0)
|
||||
except OSError as er:
|
||||
print("OSError", er.errno)
|
||||
|
||||
poller.unregister(x)
|
||||
poller.unregister(s)
|
||||
|
||||
s.close()
|
|
@ -0,0 +1,11 @@
|
|||
CustomPollable.ioctl 3 1
|
||||
[(<class 'socket'>, 4)]
|
||||
CustomPollable.ioctl 3 1
|
||||
[(<class 'socket'>, 4)]
|
||||
CustomPollable.ioctl 3 1
|
||||
[(<class 'socket'>, 4), (<class 'CustomPollable'>, 1)]
|
||||
CustomPollable.ioctl 3 1
|
||||
CustomPollable.ioctl 3 1
|
||||
[(<class 'CustomPollable'>, 1)]
|
||||
CustomPollable.ioctl 3 1
|
||||
OSError 1000
|
|
@ -0,0 +1,47 @@
|
|||
# Test interruption of select.poll by EINTR signal, when
|
||||
# MICROPY_PY_SELECT_POSIX_OPTIMISATIONS is enabled.
|
||||
|
||||
try:
|
||||
import time, gc, select, socket, _thread
|
||||
|
||||
time.time_ns # Check for time_ns on MicroPython
|
||||
select.poll # Raises AttributeError for CPython implementations without poll()
|
||||
except (ImportError, AttributeError):
|
||||
print("SKIP")
|
||||
raise SystemExit
|
||||
|
||||
|
||||
def thread_main():
|
||||
lock.acquire()
|
||||
time.sleep(0.2)
|
||||
print("thread gc start")
|
||||
# The unix gc.collect() implementation will raise EINTR on other threads.
|
||||
# Could possibly use _thread._interrupt_main() instead if MicroPython had it.
|
||||
gc.collect()
|
||||
print("thread gc end")
|
||||
|
||||
|
||||
# Start a thread to interrupt the main thread during its call to poll.
|
||||
lock = _thread.allocate_lock()
|
||||
lock.acquire()
|
||||
_thread.start_new_thread(thread_main, ())
|
||||
|
||||
# Use a new UDP socket for tests, which should be writable but not readable.
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
s.bind(socket.getaddrinfo("127.0.0.1", 8000)[0][-1])
|
||||
|
||||
# Create the poller object.
|
||||
poller = select.poll()
|
||||
poller.register(s, select.POLLIN)
|
||||
|
||||
# Poll on the UDP socket for a set timeout, which should be reached.
|
||||
print("poll")
|
||||
lock.release()
|
||||
t0 = time.time_ns()
|
||||
result = poller.poll(400)
|
||||
dt_ms = (time.time_ns() - t0) / 1e6
|
||||
print("result:", result)
|
||||
print("dt in range:", 380 <= dt_ms <= 500)
|
||||
|
||||
# Clean up.
|
||||
s.close()
|
|
@ -0,0 +1,44 @@
|
|||
# Test select.poll in combination with file descriptors.
|
||||
|
||||
try:
|
||||
import select, errno
|
||||
|
||||
select.poll # Raises AttributeError for CPython implementations without poll()
|
||||
except (ImportError, AttributeError):
|
||||
print("SKIP")
|
||||
raise SystemExit
|
||||
|
||||
# Check that poll supports registering file descriptors (integers).
|
||||
try:
|
||||
select.poll().register(0)
|
||||
except OSError:
|
||||
print("SKIP")
|
||||
raise SystemExit
|
||||
|
||||
# Register invalid file descriptor.
|
||||
try:
|
||||
select.poll().register(-1)
|
||||
except ValueError:
|
||||
print("ValueError")
|
||||
|
||||
# Test polling stdout, it should be writable.
|
||||
poller = select.poll()
|
||||
poller.register(1)
|
||||
poller.modify(1, select.POLLOUT)
|
||||
print(poller.poll())
|
||||
|
||||
# Unregister then re-register.
|
||||
poller.unregister(1)
|
||||
poller.register(1, select.POLLIN)
|
||||
|
||||
# Poll for input, should return an empty list.
|
||||
print(poller.poll(0))
|
||||
|
||||
# Test registering a very large number of file descriptors.
|
||||
poller = select.poll()
|
||||
for fd in range(6000):
|
||||
poller.register(fd)
|
||||
try:
|
||||
poller.poll()
|
||||
except OSError as er:
|
||||
print(er.errno == errno.EINVAL)
|
Loading…
Reference in New Issue