Martin Richard, alwaysdata
asynctest
Presentation of asynctest and how it helps you test asyncio code.
TestCase and related features
TestCase and the loop
asynctest.TestCase overrides unittest.TestCase
self.loop
TestCase support of coroutines
import asynctest, piedpiper
class Test_MyFeature(asynctest.TestCase):
    """Example test case whose fixtures and test method are coroutines."""

    async def setUp(self):
        ...

    async def tearDown(self):
        ...

    async def test_a_case(self):
        # Start the server, then register its shutdown as a cleanup so it
        # is closed once the test is over.
        self.server = await piedpiper.start_server()
        self.addCleanup(self.server.close_and_wait_closed)
TestCase: advice
setUp() and tearDown(), setUpClass() and tearDownClass() can't be coroutines
import asyncio, asynctest, piedpiper
class Test_MyFeature(asynctest.TestCase):
    """Example test case that runs its tests on a customized event loop."""

    # Don't issue a new loop for each test, use the result of
    # asyncio.get_event_loop()
    use_default_loop = True

    # unittest invokes this hook on the class itself, so it has to be a
    # classmethod to receive ``cls``.
    @classmethod
    def setUpClass(cls):
        # set your customized loop
        asyncio.set_event_loop(piedpiper.get_loop())
import asynctest, asyncio, piedpiper
class Test_MyFeature(asynctest.TestCase):
    """Example test case mixing a generator-based coroutine and a plain test."""

    @asyncio.coroutine
    def test_old_style_coroutine(self):
        # Generator-based ("old style") coroutine test.
        data = yield from piedpiper.compress()
        self.assertIsSmall(data)

    @asynctest.fail_on(unused_loop=False)
    def test_a_feature_without_async(self):
        # assertEqual compares both values; assertTrue would treat the
        # second argument as the failure message and always pass.
        self.assertEqual("middleOut", piedpiper.method())
ClockedTestCase
import asynctest, piedpiper
class Test_PeriodicRefresh(asynctest.ClockedTestCase):
    """Example test case driving a periodic refresh with a controlled clock.

    ``advance()`` is provided by ``ClockedTestCase``, hence the base class.
    """

    async def test_refreshed(self):
        downloader = piedpiper.Downloader()
        self.assertEqual("nb_calls: 0", downloader.data)

        downloader.refresh(5)
        await self.advance(5)
        # 5 seconds after refresh(5) was set, data is updated
        self.assertEqual("nb_calls: 1", downloader.data)

        await self.advance(10)
        # Updated two more times.
        self.assertEqual("nb_calls: 3", downloader.data)
import asynctest, asyncio, piedpiper
@asynctest.fail_on(active_handles=True)
class Test_MyFeature(asynctest.TestCase):
    """Example test case checked with ``fail_on(active_handles=True)``."""

    async def test_with_a_callback(self):
        self.loop.call_later(1, piedpiper.callback)
        # this test will fail, as this callback
        # will not run during the test

    async def test_with_a_cancelled_callback(self):
        handle = self.loop.call_later(1, piedpiper.callback)
        handle.cancel()
        # this test will not fail
async def download(self):
    """Fetch the resource over a stream connection and return its payload.

    The payload is also stored in ``self.data``.

    Raises:
        RuntimeError: if the response code parsed from the headers is
            not 200.
    """
    host, port, query, ssl = self.get_parsed_url()
    reader, writer = await asyncio.open_connection(host, port, ssl=ssl)

    try:
        writer.write(self._build_request(host, query))
        # The header section ends with an empty line, i.e. CRLF CRLF.
        response_headers = await reader.readuntil(b"\r\n\r\n")
        code, payload_size = self._parse_response_headers(response_headers)

        if code != 200:
            raise RuntimeError(
                "Server answered with unsupported code {}".format(code))

        self.data = await reader.read(payload_size)
    finally:
        # Close the connection whether the download succeeded or not.
        writer.close()

    return self.data
def create_mocks(*args, **kwargs):
    """Return a ``(reader, writer)`` pair of stream mocks.

    Arguments are accepted and ignored so the function can be used as a
    mock ``side_effect``, which is called with the same arguments as the
    patched callable.
    """
    reader = asynctest.mock.Mock(asyncio.StreamReader)
    writer = asynctest.mock.Mock(asyncio.StreamWriter)

    # Canned answers for the header read and the payload read.
    reader.read.return_value = b"MiddleOut"
    reader.readuntil.return_value = b"HTTP/1.1 200 OK\r\n..."

    return reader, writer
asynctest.mock.Mock() uses its spec and detects coroutines
asynctest.mock.CoroutineMock()
asynctest.mock.patch
@patch("asyncio.open_connection", side_effect=create_mocks)
async def test_download_resource(self, open_connection_mock):
    # When used as a decorator without an explicit replacement object,
    # patch passes the mock it created as an extra positional argument.
    downloader = ResourceDownloader(
        "http://piedpiper.com/compression")

    payload = await downloader.download()

    self.assertEqual(payload, b"MiddleOut")
@patch decorating a coroutine
async def test_download_resource(self):
with patch("asyncio.open_connection", side_effect=create_mocks):
downloader = ResourceDownloader(
"http://piedpiper.com/compression")
payload = await downloader.download()
self.assertEqual(payload, b"MiddleOut")
But what if a concurrent task must not be affected by the patch?
@patch decorating a coroutine
@patch("asyncio.open_connection",
side_effect=create_mocks, scope=asynctest.LIMITED)
async def test_download_resource(self):
downloader = ResourceDownloader(
"http://piedpiper.com/compression")
payload = await downloader.download()
self.assertEqual(payload, b"MiddleOut")
The patch is disabled when the coroutine (task) yields to the scheduler.
import asyncio, asynctest
class Test_LowLevel(asynctest.TestCase):
    """Example test case exercising loop reader callbacks with a mock socket."""

    async def test_using_selector(self):
        mock_socket = asynctest.selector.SocketMock()
        event = asyncio.Event()

        # The reader callback sets the event once the socket is flagged
        # as ready for reading.
        self.loop.add_reader(mock_socket, event.set)
        asynctest.selector.set_read_ready(mock_socket, self.loop)

        try:
            await asyncio.wait_for(event.wait(), timeout=1)
        finally:
            self.loop.remove_reader(mock_socket)
TestSelector
FileMock(), SSLSocketMock(), ...
TestCase (but not with proactor)
@asynctest.fail_on(active_selector_callbacks=True)
async def test_using_selector(self):
    # Must be a coroutine function: the body awaits the event below.
    mock_socket = asynctest.selector.SocketMock()
    event = asyncio.Event()

    # The reader callback sets the event once the socket is flagged as
    # ready for reading.
    self.loop.add_reader(mock_socket, event.set)
    asynctest.selector.set_read_ready(mock_socket, self.loop)

    try:
        await asyncio.wait_for(event.wait(), timeout=1)
    finally:
        self.loop.remove_reader(mock_socket)
asynctest
pytest-asyncio: fewer features, but you can use asynctest with it
README or CONTRIBUTING file
$ git clone
$ python -m unittest (or nose, pytest, tox, ...)
martius@martiusweb.net
https://marti.us
Martiusweb