Why do we need an async while?

If you need to run several coroutines and each one must execute independently of the others, for example to receive stream data from a list of sources where every source emits at a different frequency, you need this.

TaskLoop provides a set of functions and asyncio coroutines that can help you with this job.

You need:

fn1 ->->->->->->->-> |
fn2 ->->->->         |
 .                   |
 .                   |
 .                   |
fnN ->->->->->->     |

You can't get good results with a plain /for/ loop, because each coroutine would have to wait for the previous one to finish. You need free-running tasks.
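To see why a plain for loop falls short, here is a minimal sketch. The names read_source, sequential, poll and independent, and the two periods, are made up for the illustration and are not part of TaskLoop. The sequential version reads every source once per round, so the fast source is held back by the slow one; the task version lets each source run at its own pace.

```python
import asyncio


async def read_source(name, period, log):
    # Hypothetical source: one reading becomes available every `period` seconds.
    await asyncio.sleep(period)
    log.append(name)


async def sequential(rounds):
    # Plain for loop: every round waits for the fast AND the slow source.
    log = []
    for _ in range(rounds):
        for name, period in (("fast", 0.01), ("slow", 0.1)):
            await read_source(name, period, log)
    return log


async def poll(name, period, log):
    # Each source gets its own endless loop, independent of the others.
    while True:
        await read_source(name, period, log)


async def independent(duration):
    log = []
    tasks = [
        asyncio.create_task(poll("fast", 0.01, log)),
        asyncio.create_task(poll("slow", 0.1, log)),
    ]
    await asyncio.sleep(duration)
    for task in tasks:
        task.cancel()
    # Collect the cancellations so no task is left pending.
    await asyncio.gather(*tasks, return_exceptions=True)
    return log


seq_log = asyncio.run(sequential(3))
ind_log = asyncio.run(independent(0.5))
print("sequential :", seq_log.count("fast"), "fast,", seq_log.count("slow"), "slow")
print("independent:", ind_log.count("fast"), "fast,", ind_log.count("slow"), "slow")
```

In the sequential run both sources are read the same number of times, whatever their periods. In the independent run the fast source is read many more times than the slow one.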

So, the model you need for every source coroutine is:

[Figure: coroloop.png]

The implementation involves these steps:

  1. Define the coroutine.
  2. Does the coroutine need some input?
  3. Is the input always the same, or does it change with the result?
  4. Create a coroutine mask that awaits the coroutine and manages the input.
  5. Call the coro-mask in a task generator, which schedules it.
  6. Set a callback for when the task is done.
  7. When the task is done, the callback must be the same task generator.
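The steps above can be sketched as follows. This is a minimal illustration under assumptions, not the library's code: counter, next_args, mask and renew_limited are hypothetical names, and the `seen`/`limit` bookkeeping is added only so that the sketch stops by itself after a few renewals.

```python
import asyncio
import functools


async def counter(n):
    # Steps 1-2: a coroutine that needs one input.
    await asyncio.sleep(0.01)
    return n + 1


def next_args(args, obtained):
    # Step 3: here the input changes with the result.
    return [obtained]


async def mask(coro, args, fargs):
    # Step 4: await the coroutine and build the next input.
    obtained = await coro(*args)
    return fargs(args, obtained)


def renew_limited(loop, coro, fargs, seen, limit, task):
    # Steps 5-7: this function is the done-callback AND the task generator.
    # add_done_callback passes the finished task as the last argument.
    args = task.result()
    seen.append(args[0])
    if len(seen) >= limit:
        loop.stop()  # assumption for the demo: stop after `limit` results
        return
    new_task = loop.create_task(mask(coro, args, fargs))
    new_task.add_done_callback(
        functools.partial(renew_limited, loop, coro, fargs, seen, limit))


loop = asyncio.new_event_loop()
seen = []
first = loop.create_task(mask(counter, [0], next_args))
first.add_done_callback(
    functools.partial(renew_limited, loop, counter, next_args, seen, 3))
loop.run_forever()
loop.close()
print(seen)  # [1, 2, 3]
```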

As a diagram, the system is:

digraph async_while {
    // graph from left to right
    rankdir=LR;
    splines=true;
    node [shape=box];
    args [label="Args input"]
    coro_ob [label="Your Coroutine object"]
    task_0 [label="First task scheduled"]
    renew [label="Renew task when done"]
    task_k [label="Task(k)"]
    coro_mask [label="Coroutine manage your coroutine"]
    coro_ob -> task_0;
    args -> task_0;
    task_0 -> renew [label="Call when done"];
    coro_ob -> renew;
    coro_mask -> renew;
    renew -> task_k [label="Define"];
    task_k -> renew [label="Call when done"];
}

The functions available to implement this scheme are:

Function          Inputs                        Does
coromask          [coro, args, fargs]           awaits coro
renew             [task, coro, fargs, *args]    renews the task
simple_fargs      [input, output]               returns input
simple_fargs_out  [input, output]               returns output
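Going only by the table, the two simple_fargs helpers could look like the sketch below. The bodies are an assumption inferred from the "returns input" and "returns output" descriptions, not the library's source. coromask follows the definition used in this page's example and is repeated so the block runs on its own; double is a hypothetical coroutine. Note that whatever fargs returns is unpacked as the next call's arguments, so with simple_fargs_out the coroutine itself has to return a list or tuple.

```python
import asyncio


def simple_fargs(_in, obtained):
    # Assumed behaviour: ignore the result, so every call gets the same input.
    return _in


def simple_fargs_out(_in, obtained):
    # Assumed behaviour: the result becomes the input of the next call.
    return obtained


async def coromask(coro, args, fargs):
    # Await the coroutine, then let fargs decide the next arguments.
    obtained = await coro(*args)
    return fargs(args, obtained)


async def double(x):
    # Hypothetical coroutine; returns a list so the result can be unpacked.
    return [x * 2]


async def demo():
    same = await coromask(double, [3], simple_fargs)
    fed_back = await coromask(double, [3], simple_fargs_out)
    return same, fed_back


same, fed_back = asyncio.run(demo())
print(same, fed_back)  # [3] [6]
```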

The following code is an example of how to use the taskloop functions. We have two coroutines, define an fargs function for each coroutine, and run everything on an asyncio event loop.

import asyncio
import functools


async def holacoro(v):
    print("Hello %d" % v)
    await asyncio.sleep(1)
    return v + 1


async def sumacoro(*args):
    c = sum(args)
    print("The sum is %d" % c)
    await asyncio.sleep(3)
    return c


# Each fargs function builds the next call's arguments from the
# previous arguments and the value the coroutine returned.
def fargs_holacoro(args, obtained):
    return [obtained]


def fargs_sumacoro(args, obtained):
    result = [args[-1], obtained]
    return result


# Wraps every coroutine: awaits it and returns the next arguments.
async def coromask(coro, args, fargs):
    _in = args
    obtained = await coro(*args)
    result = fargs(_in, obtained)
    return result


# Done-callback: reads the finished task's result and schedules the next task.
# add_done_callback also passes the finished task, which ends up in *args.
def renew(task, coro, fargs, *args):
    result = task.result()
    task = loop.create_task(coromask(coro, result, fargs))
    task.add_done_callback(functools.partial(renew, task, coro, fargs))


# Create the event loop explicitly; recent Python versions deprecate
# relying on asyncio.get_event_loop() to create one.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

args = [1]
task1 = loop.create_task(coromask(holacoro, args, fargs_holacoro))
task1.add_done_callback(functools.partial(renew, task1, holacoro, fargs_holacoro))

args2 = [1, 2]
task2 = loop.create_task(coromask(sumacoro, args2, fargs_sumacoro))
task2.add_done_callback(functools.partial(renew, task2, sumacoro, fargs_sumacoro))

try:
    loop.run_forever()
except KeyboardInterrupt:
    print('Loop stopped')
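Conceptually, each chain of renewed tasks behaves like one while loop around the coroutine, which is where the name "async while" comes from. The sketch below spells that loop out. It is an illustration only: async_while and its max_rounds parameter are hypothetical and exist here just so the loop can end, and the sleep is shortened.

```python
import asyncio


async def async_while(coro, args, fargs, max_rounds=None):
    # Hypothetical equivalent of one renew chain:
    # call, compute the next arguments, call again.
    rounds = 0
    while max_rounds is None or rounds < max_rounds:
        obtained = await coro(*args)
        args = fargs(args, obtained)
        rounds += 1
    return args


async def holacoro(v):
    await asyncio.sleep(0.01)  # shortened from the example above
    return v + 1


def fargs_holacoro(args, obtained):
    return [obtained]


final_args = asyncio.run(
    async_while(holacoro, [1], fargs_holacoro, max_rounds=3))
print(final_args)  # [4]
```

The callback-based version gets the same effect without holding a coroutine frame open: every round is a fresh task, and the done-callback decides what is scheduled next.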

These functions are packaged in the taskloop module of the tasktools package. To call them, import them in the usual Python way:

from tasktools.taskloop import coromask, renew, simple_fargs