Source code for wolframclient.utils.asyncio
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import asyncio
import functools
from wolframclient.utils.functional import first, iterate
def run_in_loop(cor, loop=None):
    """Turn the coroutine function *cor* into a blocking callable.

    The returned function forwards its arguments to *cor* and drives the
    resulting coroutine to completion with ``run_until_complete``, returning
    whatever that call returns.

    :param cor: callable producing the awaitable to run.
    :param loop: optional event loop; it is handed to :func:`get_event_loop`
        on every call, which decides which loop is actually used.
    :return: a synchronous wrapper carrying the metadata of *cor*.
    """

    @functools.wraps(cor)
    def blocking_call(*args, **kwargs):
        # The loop is resolved at call time, before the coroutine is created.
        event_loop = get_event_loop(loop)
        return event_loop.run_until_complete(cor(*args, **kwargs))

    return blocking_call
def run_all(args, **opts):
    """Schedule the awaitables described by *args* on the event loop.

    The items produced by ``iterate(*args)`` are collected into a tuple and
    dispatched according to how many there are:

    * no item: the empty tuple is returned unchanged;
    * one item: it is scheduled with ``asyncio.ensure_future`` and the
      resulting future is returned;
    * several items: each one is scheduled with ``asyncio.ensure_future`` and
      a future wrapping ``asyncio.wait`` over all of them is returned.

    :param args: iterable unpacked into ``iterate`` (presumably awaitables or
        nested iterables of awaitables; confirm against ``iterate``).
    :param opts: keyword arguments forwarded to every
        ``asyncio.ensure_future`` call (for example ``loop``).
    :return: a future, or an empty tuple when there is nothing to run.
    """
    pending = tuple(iterate(*args))

    if not pending:
        return pending

    if len(pending) == 1:
        return asyncio.ensure_future(first(pending), **opts)

    # asyncio.wait() refuses bare coroutine objects on Python 3.11+ (and has
    # warned about them since 3.8), so every item is turned into a future
    # first. ensure_future() hands back existing futures untouched.
    tasks = [asyncio.ensure_future(item, **opts) for item in pending]
    return asyncio.ensure_future(asyncio.wait(tasks), **opts)
def get_event_loop(loop=None):
    """Return an event loop to work with.

    :param loop: a loop supplied by the caller; when truthy it is returned
        as is.
    :return: *loop* if given, otherwise the loop reported by
        ``asyncio.get_event_loop()``. If that call raises ``RuntimeError``,
        a new loop is created, installed with ``asyncio.set_event_loop`` and
        returned.
    """
    if loop:
        return loop

    try:
        return asyncio.get_event_loop()
    except RuntimeError:
        # No usable loop here: create one and register it as the current loop.
        fresh_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(fresh_loop)
        return fresh_loop
def silence(*exceptions):
    """Build a decorator that swallows the given exception types.

    A function decorated with ``silence(A, B)`` returns its normal result,
    or ``None`` when the call raises ``A`` or ``B``. Any other exception
    propagates unchanged.

    :param exceptions: exception classes to suppress.
    :return: a decorator preserving the metadata of the wrapped function.
    """

    def decorate(fn):
        @functools.wraps(fn)
        def guarded(*args, **kwargs):
            try:
                outcome = fn(*args, **kwargs)
            except exceptions:  # varargs are already a tuple
                return None
            return outcome

        return guarded

    return decorate
# Use the native asyncio.create_task when this Python provides it; otherwise
# fall back to an equivalent built on ensure_future.
try:
    create_task = asyncio.create_task
except AttributeError:

    def create_task(coro):
        """Schedule *coro* on the loop returned by ``asyncio.get_event_loop()``.

        Stand-in for ``asyncio.create_task`` with the same one-argument
        signature, implemented with ``asyncio.ensure_future``.
        """
        return asyncio.ensure_future(coro, loop=asyncio.get_event_loop())