The Walking Stick for The Old Scheduler: The Assignator
Here it helps to understand how queues and shared dicts or lists in memory work. The manager from the multiprocessing library lets you create these special objects.
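As a rough illustration of those shared objects, the sketch below creates a queue, a dict and a list through `multiprocessing.Manager` and lets a child process fill them. The names `worker` and `run_demo` and the ids `STA1`..`STA3` are made up for this example; `queue_tasks`, `status_tasks` and `sta_assigned` only borrow the names used in the listing on this page and are not the library's actual setup code.

```python
from multiprocessing import Manager, Process


def worker(queue_tasks, status_tasks, sta_assigned):
    """Drain the shared queue and record every id in the shared containers."""
    while not queue_tasks.empty():
        ids = queue_tasks.get()
        status_tasks[ids] = True   # visible to the parent through the manager
        sta_assigned.append(ids)


def run_demo():
    """Create manager-backed objects, hand them to a child process, read them back."""
    with Manager() as manager:
        queue_tasks = manager.Queue()    # proxy to a queue living in the manager process
        status_tasks = manager.dict()    # shared dict proxy
        sta_assigned = manager.list()    # shared list proxy
        for ids in ("STA1", "STA2", "STA3"):   # hypothetical station ids
            queue_tasks.put(ids)
        proc = Process(target=worker,
                       args=(queue_tasks, status_tasks, sta_assigned))
        proc.start()
        proc.join()
        # Copy into plain containers before the manager shuts down
        return dict(status_tasks), list(sta_assigned)
```

Call `run_demo()` from under an `if __name__ == "__main__":` guard, since the manager and the child are separate processes. It returns the dict and list as filled in by the child process.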
The TaskAssignator class works with an instance of a TaskScheduler subclass that you implement for your specific goal.
This class handles the list of incoming new data and assigns each task to a specific worker. It also registers the assignment for later control. Simple but effective.
You can also control whether to activate only a group (for testing purposes) or the whole list of data, using the 'GROUP' and 'ALL' values of dt_status.
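The assignment rule described above can be sketched as a plain function, independent of the real classes. This is a simplified stand-in, not the TaskAssignator implementation: `assign_tasks` is a hypothetical name, the station codes are invented, and the parameters mirror the attributes seen in the listing (`stations`, `proc_tasks`, `lnproc`, `sta_assigned`, `dt_status`, `dt_group`).

```python
def assign_tasks(new_ids, stations, proc_tasks, lnproc, sta_assigned,
                 dt_status="ALL", dt_group=()):
    """Assign each new id to the first worker that still has a free slot.

    Returns a list of (task id, worker id) pairs, one per assignment made.
    """
    log = []
    for ids in new_ids:
        # Skip unknown stations and ids that already have a worker
        if ids not in stations or ids in sta_assigned:
            continue
        if dt_status == "GROUP":
            allowed = stations[ids]["code"] in dt_group
        else:
            allowed = dt_status == "ALL"   # any other status assigns nothing
        if not allowed:
            continue
        for ipt, assigned in proc_tasks.items():
            if len(assigned) < lnproc:     # worker ipt has capacity left
                assigned.append(ids)
                sta_assigned.append(ids)
                log.append((ids, ipt))
                break
    return log


# Hypothetical data: three stations, two workers with one slot each
stations = {"A": {"code": "VALN"}, "B": {"code": "TRPD"}, "C": {"code": "QTAY"}}
all_log = assign_tasks(["A", "B", "C"], stations, {"p0": [], "p1": []}, 1, [])
group_log = assign_tasks(["A", "B", "C"], stations, {"p0": [], "p1": []}, 1, [],
                         dt_status="GROUP", dt_group=["TRPD"])
```

With 'ALL', stations are taken in arrival order until the workers are full. With 'GROUP', only stations whose code is in `dt_group` are considered, which is convenient when testing with a small subset.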
    # Method of TaskAssignator; the module needs "import asyncio".
    async def new_process(self, queue_tasks):
        """
        Activate a process with new station tasks.

        Check queue_tasks every ``ts`` seconds for new
        stations or tasks to add.

        Args:
            :param queue_tasks: a queue to put task ids
        """
        await asyncio.sleep(self.ts)
        scheduler = self.scheduler
        sta_assigned = self.sta_assigned
        dt_status = self.dt_status
        dt_group = self.dt_group
        try:
            if not queue_tasks.empty():
                for _ in range(queue_tasks.qsize()):
                    ids = queue_tasks.get()
                    # Mark the task as active and initialized
                    scheduler.status_tasks[ids] = True
                    scheduler.sta_init[ids] = True
                    if ids in scheduler.stations.keys():
                        for ipt in scheduler.proc_tasks.keys():
                            # Worker ipt has a free slot and ids is still unassigned
                            if (len(scheduler.proc_tasks[ipt]) < scheduler.lnproc
                                    and ids not in sta_assigned):
                                if dt_status == 'GROUP':
                                    # Only stations whose code is in the test group
                                    if scheduler.stations[ids]['code'] in dt_group:
                                        scheduler.add_task(ids, ipt)
                                        scheduler.set_init(ids)
                                        sta_assigned.append(ids)
                                        ans = "TASK %s ADDED TO %s" % (ids, ipt)
                                elif dt_status == 'ALL':
                                    scheduler.add_task(ids, ipt)
                                    scheduler.set_init(ids)
                                    sta_assigned.append(ids)
                                    ans = "TASK %s ADDED TO %s" % (ids, ipt)
                    # One task_done() per get()
                    queue_tasks.task_done()
        except Exception as err:
            bprint("Error assigning tasks to processor: %s" % err)
            raise
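The "sleep, then drain the queue" pattern used by new_process can be tried in isolation. The sketch below is a simplified variant, not the library code: `poll_queue` and `main` are hypothetical names, a plain `queue.Queue` stands in for the manager queue, and `get_nowait()` replaces the `empty()`/`qsize()` checks, which Python documents as only approximate when several producers or consumers are active.

```python
import asyncio
import queue


async def poll_queue(queue_tasks, seen, ts=0.01):
    """Sleep ts seconds, then drain whatever ids are waiting right now."""
    await asyncio.sleep(ts)
    while True:
        try:
            ids = queue_tasks.get_nowait()
        except queue.Empty:
            break                      # nothing left for this round
        seen.append(ids)
        queue_tasks.task_done()        # one task_done() per successful get


async def main(rounds=3):
    """Put one hypothetical id per round and poll the queue after each."""
    queue_tasks = queue.Queue()
    seen = []
    for n in range(rounds):
        queue_tasks.put(f"STA{n}")
        await poll_queue(queue_tasks, seen)
    return seen


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because each round awaits `asyncio.sleep`, other coroutines on the same event loop keep running between polls.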