PureBasic Forum
http://forums.purebasic.com/english/

[Module] Load-Balanced Worker Threads
http://forums.purebasic.com/english/viewtopic.php?f=12&t=72247

Author:  Env [ Fri Feb 08, 2019 12:00 am ]
Post subject:  [Module] Load-Balanced Worker Threads

Hi folks,

Not sure if this will be useful to anyone, but I've put together a module for controlling worker threads. You specify a maximum number of threads to run, and it distributes added tasks across the workers' task queues, giving each new task to whichever worker has the least to do at that moment.

Furthermore, each task can carry a Task ID (any number you want, which is passed to the thread), a Task Type (again, any number you want; it's there because one procedure might be responsible for more than one type of task), and a pointer to data (as with conventional threads).
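
A minimal sketch of how one handler might use those three values. It assumes the module from this post is saved next to the program as "WorkerThreads.pbi" (a made-up filename) and that ThreadSafe is enabled. The #TaskType_ constants, the sJob structure and JobHandler are hypothetical names for illustration, not part of the module.

```purebasic
; Assumption: the module from this post is saved as "WorkerThreads.pbi" (hypothetical filename).
XIncludeFile "WorkerThreads.pbi"

; Hypothetical task types, for illustration only.
Enumeration
  #TaskType_Square
  #TaskType_Negate
EndEnumeration

; Hypothetical payload that the UserData pointer refers to.
Structure sJob
  Value.i
  Result.i
EndStructure

; One handler serving two task types.
Procedure JobHandler(*Task.WorkerThreads::sTaskInfo)
  Protected *Job.sJob = *Task\UserData   ; the pointer that was given to AddTask()
  Protected v.i = *Job\Value
  Select *Task\Type
    Case #TaskType_Square
      *Job\Result = v * v
    Case #TaskType_Negate
      *Job\Result = -v
  EndSelect
EndProcedure

Define Square.sJob, Negate.sJob
Square\Value = 7
Negate\Value = 7

WorkerThreads::SetMaximumThreads(2)
WorkerThreads::AddTask(1, #TaskType_Square, @Square, @JobHandler())
WorkerThreads::AddTask(2, #TaskType_Negate, @Negate, @JobHandler())

; Keep calling Update() until both tasks have been started and cleaned up.
While WorkerThreads::TasksRemaining() > 0
  WorkerThreads::Update()
  Delay(1)
Wend

Debug Square\Result   ; 49
Debug Negate\Result   ; -7
```

The payload has to stay alive until its task has finished, which is why the two sJob variables here are defined in the main scope rather than inside a procedure that returns early.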

It should be straightforward and cross-platform. If you see areas that could be improved, please feel free to share them :)

P.S. The multiple queues approach is for future planned features

Example Included
Code:
; ====================================================================================================
; Title:        Load-Balanced Worker Threads Module
; Description:  Distribute tasks across multiple workers.
; Author:       Michael R. King (Env)
; License:      MIT
; Revision:     5

; If you like it, feel free to use it, if you really like it then you can buy me coffee :)
; https://ko-fi.com/mikerking
; ====================================================================================================

; Changelog
; Revision 2: Corrected TaskData to *TaskData in task creation.  Compiler ThreadSafe checking
; Revision 3: Added task cost to balance tasks more effectively
; Revision 4: Task handling events added
; Revision 5: Clearing pending tasks & Terminate all added

CompilerIf #PB_Compiler_Thread = #False
  CompilerError "Please compile with ThreadSafe enabled."
CompilerEndIf

DeclareModule WorkerThreads
 
  #MaximumEvents = 1000
 
  Structure sTaskInfo ; The structure of data to be passed to the worker procedure
    WorkerID.i
    ID.i
    Type.i
    *UserData
  EndStructure
 
  Enumeration BalanceMethod
    #BalanceMethod_QueueSize
    #BalanceMethod_QueueCost
  EndEnumeration
 
  Enumeration TaskCost
    #TaskCost_Minimal
    #TaskCost_Medium
    #TaskCost_High
    #TaskCost_Heavy
  EndEnumeration
 
  Enumeration WorkerEvent
    #Event_None
    #Event_TaskStarted
    #Event_TaskEnded
  EndEnumeration
 
  Prototype pTaskHandler(*TaskData.sTaskInfo)
 
  Declare SetBalanceMethod(Method = #BalanceMethod_QueueSize)                                   ;- Set how the tasks should be distributed.
  Declare SetMaximumThreads(Maximum)                                                            ;- Set the Maximum number of concurrent worker threads.
  Declare AddTask(ID, Type, *TaskData, *TaskHandler.pTaskHandler, TaskCost = #TaskCost_Medium)  ;- Create a task and add it to the queue.
  Declare Update()                                                                              ;- Process the queue.
  Declare TasksRemaining()                                                                      ;- Return how many tasks remain in the queue.
 
  Declare TrackEvents(State = #True)                                                            ;- Turn on/off tracking of worker events.
  Declare GetEvent()                                                                            ;- Get the next event in the events queue.
  Declare.i EventTaskID()                                                                       ;- Get the corresponding task ID of the currently pulled event.
  Declare ClearPending()                                                                        ;- Clear Pending Tasks
  Declare TerminateAll()                                                                        ;- Terminate any running tasks and clear the queue.
 
EndDeclareModule

Module WorkerThreads
 
  Structure sTask
    taskInfo.sTaskInfo
    cost.a
    *thread
    *handler
    pending.a
  EndStructure
 
  Structure sWorker
    workerIndex.i
    List tasks.sTask()
  EndStructure
 
  Structure sWorkerEvent
    eventType.i
    taskID.i
  EndStructure
 
  Structure sWorkerThreads
    *mutex
    maxThreads.i
    balanceMethod.a
    List worker.sWorker()
    currentEvent.sWorkerEvent
    trackEvents.a
    List eventQueue.sWorkerEvent()
  EndStructure
 
  Global gWorkerThreads.sWorkerThreads
  With gWorkerThreads
    \balanceMethod = #BalanceMethod_QueueSize
    \mutex = CreateMutex()
    \maxThreads = 1
  EndWith
 
  Procedure SetBalanceMethod(Method = #BalanceMethod_QueueSize)
    With gWorkerThreads
      \balanceMethod = Method
    EndWith
  EndProcedure
 
  Procedure SetMaximumThreads(Maximum)
    If Maximum > 0
      gWorkerThreads\maxThreads = Maximum
    EndIf
  EndProcedure
 
  Procedure GetWorkerTotalCost()
    Protected.i cost
    With gWorkerThreads\worker()
      ForEach \tasks()
        cost = cost + \tasks()\cost
      Next
    EndWith
    ProcedureReturn cost
  EndProcedure
 
  Procedure AddEvent(evtType, evtTaskID)
    With gWorkerThreads
      If \trackEvents = #True
        If ListSize(\eventQueue()) < #MaximumEvents
          LastElement(\eventQueue())
          AddElement(\eventQueue())
          \eventQueue()\eventType = evtType
          \eventQueue()\taskID = evtTaskID
        EndIf
      EndIf
    EndWith     
  EndProcedure
 
  Procedure TrackEvents(State = #True)
    If State
      gWorkerThreads\trackEvents = #True
    Else
      gWorkerThreads\trackEvents = #False
    EndIf
  EndProcedure
 
  Procedure GetEvent()
    With gWorkerThreads
      LockMutex(\mutex)
      If ListSize(\eventQueue()) > 0
        FirstElement(\eventQueue())
        \currentEvent\eventType = \eventQueue()\eventType
        \currentEvent\taskID = \eventQueue()\taskID
        DeleteElement(\eventQueue())
      Else
        \currentEvent\eventType = #Event_None
        \currentEvent\taskID = 0
      EndIf
      UnlockMutex(\mutex)
      ProcedureReturn \currentEvent\eventType
    EndWith
  EndProcedure
 
  Procedure.i EventTaskID()
    With gWorkerThreads
      ProcedureReturn \currentEvent\taskID
    EndWith
  EndProcedure
 
  Procedure AddTask(ID, Type, *TaskData, *TaskHandler.pTaskHandler, TaskCost = #TaskCost_Medium)
    With gWorkerThreads
      LockMutex(\mutex)
      If ListSize(\worker()) < \maxThreads
        LastElement(\worker()) ; Always append, so the workerIndex of existing workers stays valid
        AddElement(\worker())
        \worker()\workerIndex = ListIndex(\worker())
        LastElement(\worker()\tasks())
        AddElement(\worker()\tasks())
        \worker()\tasks()\taskInfo\WorkerID = \worker()\workerIndex
        \worker()\tasks()\taskInfo\ID = ID
        \worker()\tasks()\taskInfo\Type = Type
        \worker()\tasks()\taskInfo\UserData = *TaskData
        \worker()\tasks()\cost = TaskCost
        \worker()\tasks()\pending = #True
        \worker()\tasks()\handler = *TaskHandler
      Else
        Protected ix.i, minCount.i = 1000000, bestWorker.i, cost.i, workers.i
        If \maxThreads < ListSize(\worker())
          workers = \maxThreads
        Else
          workers = ListSize(\worker())
        EndIf       
        For ix = 0 To workers - 1
          SelectElement(\worker(), ix)
          Select \balanceMethod
            Case #BalanceMethod_QueueSize
              If ListSize(\worker()\tasks()) < minCount
                minCount = ListSize(\worker()\tasks())
                bestWorker = ix
              EndIf
            Case #BalanceMethod_QueueCost
              cost = GetWorkerTotalCost()
              If cost < minCount
                minCount = cost
                bestWorker = ix
              EndIf
          EndSelect
        Next
        SelectElement(\worker(), bestWorker)
        LastElement(\worker()\tasks())
        AddElement(\worker()\tasks())
        \worker()\tasks()\taskInfo\WorkerID = \worker()\workerIndex
        \worker()\tasks()\taskInfo\ID = ID
        \worker()\tasks()\taskInfo\Type = Type
        \worker()\tasks()\taskInfo\UserData = *TaskData
        \worker()\tasks()\cost = TaskCost
        \worker()\tasks()\pending = #True
        \worker()\tasks()\handler = *TaskHandler       
      EndIf
      UnlockMutex(\mutex)
    EndWith
  EndProcedure
 
  Procedure Update()
    With gWorkerThreads
      LockMutex(\mutex)
      ForEach \worker()
        If ListSize(\worker()\tasks()) > 0
          FirstElement(\worker()\tasks())
          If \worker()\tasks()\pending
            \worker()\tasks()\pending = #False
            AddEvent(#Event_TaskStarted, \worker()\tasks()\taskInfo\ID)
            \worker()\tasks()\thread = CreateThread(\worker()\tasks()\handler, @\worker()\tasks()\taskInfo)
          Else
            If IsThread(\worker()\tasks()\thread) = 0
              AddEvent(#Event_TaskEnded, \worker()\tasks()\taskInfo\ID)
              DeleteElement(\worker()\tasks())
            EndIf
          EndIf
        EndIf
      Next
      UnlockMutex(\mutex)
    EndWith
  EndProcedure
 
  Procedure TasksRemaining()
    Protected.i count
    With gWorkerThreads
      LockMutex(\mutex)
      ForEach \worker()
        count = count + ListSize(\worker()\tasks())
      Next
      UnlockMutex(\mutex)
    EndWith
    ProcedureReturn count
  EndProcedure
 
  Procedure ClearPending()
    With gWorkerThreads
      LockMutex(\mutex)
      ForEach \worker()
        ForEach \worker()\tasks()
          If \worker()\tasks()\pending
            DeleteElement(\worker()\tasks())
          EndIf
        Next
      Next
      UnlockMutex(\mutex)
    EndWith
  EndProcedure
 
  Procedure TerminateAll()
    With gWorkerThreads
      LockMutex(\mutex)
      ForEach \worker()
        ForEach \worker()\tasks()
          If \worker()\tasks()\pending = #False
            If IsThread(\worker()\tasks()\thread)
              KillThread(\worker()\tasks()\thread)
            EndIf
          EndIf
        Next
        ClearList(\worker()\tasks())
      Next
      UnlockMutex(\mutex)
    EndWith
  EndProcedure
 
EndModule

CompilerIf #PB_Compiler_IsMainFile
 
  ; ========= EXAMPLE =========
 
  ; Make sure only 10 threads run at the same time (It will be 1 by default)
  WorkerThreads::SetMaximumThreads(10)
 
  ; Define a Worker procedure
  Procedure TestWorker(*Worker.WorkerThreads::sTaskInfo)
    With *Worker
      Debug "Task " + Str(\ID) + " started on worker " + Str(\WorkerID)
     
      Select *Worker\Type
        Case WorkerThreads::#TaskCost_Minimal
          Delay(Random(500))
        Case WorkerThreads::#TaskCost_Medium
          Delay(Random(1000))
        Case WorkerThreads::#TaskCost_High
          Delay(Random(2000))
        Case WorkerThreads::#TaskCost_Heavy
          Delay(Random(3000))
      EndSelect
     
      Debug "Task " + Str(\ID) + " finished on worker " + Str(\WorkerID) + ". Total tasks remaining: " + Str(WorkerThreads::TasksRemaining() - 1)
    EndWith
  EndProcedure
 
  ; Distribute tasks across workers depending on cost, not worker queue size
  WorkerThreads::SetBalanceMethod(WorkerThreads::#BalanceMethod_QueueCost)
 
  ; Create some tasks to queue, setting a random task cost.
  Define.i W, Cost
  For W = 1 To 40
    Cost = Random(WorkerThreads::#TaskCost_Heavy)
    WorkerThreads::AddTask(W, Cost, #Null, @TestWorker(), Cost)
  Next
 
  ; Enable Event Tracking
  WorkerThreads::TrackEvents(#True)
 
  ; Main loop - Handle until no tasks remain
  Define.i Event
  While WorkerThreads::TasksRemaining() > 0
    WorkerThreads::Update()
   
    ; Event Handling - drain the whole queue each pass so no events are left over when the loop exits
    Repeat
      Event = WorkerThreads::GetEvent()
      Select Event
        Case WorkerThreads::#Event_TaskStarted
          Debug "[Event] Task " + Str(WorkerThreads::EventTaskID()) + " has started."
        Case WorkerThreads::#Event_TaskEnded
          Debug "[Event] Task " + Str(WorkerThreads::EventTaskID()) + " has ended."
      EndSelect
    Until Event = WorkerThreads::#Event_None
   
    Delay(1)
  Wend
 
CompilerEndIf

Author:  Little John [ Fri Feb 08, 2019 12:09 pm ]
Post subject:  Re: [Module] Load-Balanced Worker Threads

Thank you for the code!

If you write EnableExplicit directly after the line
Code:
Module WorkerThreads
then you'll find 2 bugs. :-)

Author:  Env [ Fri Feb 08, 2019 12:31 pm ]
Post subject:  Re: [Module] Load-Balanced Worker Threads

Little John wrote:
Thank you for the code!

If you write EnableExplicit directly after the line
Code:
Module WorkerThreads
then you'll find 2 bugs. :-)



Good catch, and a schoolboy error on my part. Code updated :)

Author:  Env [ Fri Feb 08, 2019 1:48 pm ]
Post subject:  Re: [Module] Load-Balanced Worker Threads

Update (Revision 3)

Added the ability to specify how costly a task is expected to be (the TaskCost parameter of AddTask, from #TaskCost_Minimal to #TaskCost_Heavy).

Added the ability to define how to balance tasks across the workers:
    #BalanceMethod_QueueSize - Will balance tasks equally across the worker queues by quantity.
    #BalanceMethod_QueueCost - Will balance tasks equally across the worker queues by the total Task Cost of all tasks in each queue.
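
To make the difference between the two methods concrete, here is a standalone sketch that does not use the module at all. It only mirrors the selection rule (pick the queue with the smallest count, or the smallest summed cost) on three made-up queues. sQueue, Fill, TotalCost and PickQueue are hypothetical names, so treat it as an illustration under those assumptions rather than the module's actual code.

```purebasic
; Standalone illustration: each hypothetical queue holds the cost values of its waiting tasks.
Structure sQueue
  List cost.i()
EndStructure

; Fill a queue from a comma-separated list of costs.
Procedure Fill(*Q.sQueue, Costs.s)
  Protected i
  For i = 1 To CountString(Costs, ",") + 1
    AddElement(*Q\cost())
    *Q\cost() = Val(StringField(Costs, i, ","))
  Next
EndProcedure

; Sum of all task costs in one queue.
Procedure TotalCost(*Q.sQueue)
  Protected sum
  ForEach *Q\cost()
    sum + *Q\cost()
  Next
  ProcedureReturn sum
EndProcedure

; Return the index of the least-loaded queue, by task count or by summed cost.
Procedure PickQueue(Array Q.sQueue(1), ByCost)
  Protected i, load, best = 0, bestLoad = 1000000
  For i = 0 To ArraySize(Q())
    If ByCost
      load = TotalCost(@Q(i))
    Else
      load = ListSize(Q(i)\cost())
    EndIf
    If load < bestLoad   ; strict comparison: the first queue wins a tie
      bestLoad = load
      best = i
    EndIf
  Next
  ProcedureReturn best
EndProcedure

Dim Queue.sQueue(2)
Fill(@Queue(0), "3,3")     ; 2 tasks, total cost 6
Fill(@Queue(1), "0,0,0")   ; 3 tasks, total cost 0
Fill(@Queue(2), "1,1")     ; 2 tasks, total cost 2

Debug PickQueue(Queue(), #False)  ; 0: by size, the first of the two shortest queues
Debug PickQueue(Queue(), #True)   ; 1: by cost, the longest queue has the cheapest total
```

One thing the sketch highlights: #TaskCost_Minimal is the first entry of its enumeration and so has the value 0. As far as I can tell from the listing, a queue holding only minimal-cost tasks therefore always looks empty under #BalanceMethod_QueueCost.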

Author:  blueb [ Fri Feb 08, 2019 1:56 pm ]
Post subject:  Re: [Module] Load-Balanced Worker Threads

Thanks Michael,

Nice Code.

Any reason 'why' you limited the threads to 5 :?:

Testing, I did a little experiment and could not find a problem with a higher threadcount.

I added the code below and found no issues ( I have a maximum of 32 CPU cores available )

Code:
; Return a CPU count: Choice = 1 gives the total number of CPUs in the system,
; anything else gives the number of CPUs available to this process.
Procedure GetCores(Choice = 0)
  If Choice = 1
    ProcedureReturn CountCPUs(#PB_System_CPUs)
  Else
    ProcedureReturn CountCPUs(#PB_System_ProcessCPUs)
  EndIf
EndProcedure
Debug GetCores()
Debug GetCores(0)
Debug GetCores(1)
; Make sure only 5 threads run at the same time (It will be 1 by default)
;WorkerThreads::SetMaximumThreads(5)
WorkerThreads::SetMaximumThreads(GetCores())


Thanks again.

Author:  Env [ Fri Feb 08, 2019 2:02 pm ]
Post subject:  Re: [Module] Load-Balanced Worker Threads

blueb wrote:
Thanks Michael,

Nice Code.

Any reason 'why' you limited the threads to 5 :?:

Testing, I did a little experiment and could not find a problem with a higher threadcount.

I added the code below and found no issues ( I have a maximum of 32 CPU cores available )

Code:
Procedure GetCores(Choice=0)
   If Choice = 1
      ProcedureReturn CountCPUs(#PB_System_CPUs)
   Else
      ProcedureReturn CountCPUs(#PB_System_ProcessCPUs)
   EndIf
EndProcedure
Debug GetCores()
Debug GetCores(0) 
Debug GetCores(1)
   ; Make sure only 5 threads run at the same time (It will be 1 by default)
   ;WorkerThreads::SetMaximumThreads(5)
  WorkerThreads::SetMaximumThreads(GetCores())


Thanks again.


Hi Blueb :)

No specific reason - purely for example purposes :)

Nice example! And yes, I've tested with 100 threads with 400 queued tasks and found no issue, so it really is down to the end-user how wild they want to get

Author:  davido [ Fri Feb 08, 2019 2:55 pm ]
Post subject:  Re: [Module] Load-Balanced Worker Threads

@Env,
I always have trouble with threads.
I'll feel a lot more confident, now. :D
Excellent!
Thanks for sharing.

Author:  Env [ Fri Feb 08, 2019 3:03 pm ]
Post subject:  Re: [Module] Load-Balanced Worker Threads

davido wrote:
@Env,
I always have trouble with threads.
I'll feel a lot more confident, now. :D
Excellent!
Thanks for sharing.


Great to hear! Threads have always been a bit of a scary territory for me when making anything, which is a reason why I wanted to create a drop-in solution to be used across applications to make life easier haha.

Update (Revision 4)

Added an events queue that can be read from the main thread to track when a task (identified by its specified ID) has started and ended.
Event tracking can be activated by using
Code:
WorkerThreads::TrackEvents(#True)


NB: Update() still needs to be called in your main loop regardless of whether you call GetEvent().
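
A rough sketch of how Update() and GetEvent() might be combined in a main loop. It assumes the module from the first post is saved as "WorkerThreads.pbi" (a made-up filename) and that ThreadSafe is enabled. Sleeper and the counters are hypothetical names for illustration.

```purebasic
; Assumption: the module from the first post is saved as "WorkerThreads.pbi" (hypothetical filename).
XIncludeFile "WorkerThreads.pbi"

; Hypothetical worker that just sleeps briefly.
Procedure Sleeper(*Task.WorkerThreads::sTaskInfo)
  Delay(50)
EndProcedure

WorkerThreads::SetMaximumThreads(2)
WorkerThreads::TrackEvents(#True)   ; enable tracking before the first Update()

Define i, Event, Started, Ended
For i = 1 To 4
  WorkerThreads::AddTask(i, 0, #Null, @Sleeper())
Next

While WorkerThreads::TasksRemaining() > 0
  WorkerThreads::Update()           ; events are only queued inside Update()
  Repeat                            ; GetEvent() hands back one event per call, so loop until empty
    Event = WorkerThreads::GetEvent()
    Select Event
      Case WorkerThreads::#Event_TaskStarted
        Started + 1
      Case WorkerThreads::#Event_TaskEnded
        Ended + 1
        Debug "Task " + Str(WorkerThreads::EventTaskID()) + " is done"
    EndSelect
  Until Event = WorkerThreads::#Event_None
  Delay(1)
Wend

Debug "Started: " + Str(Started) + ", ended: " + Str(Ended)   ; Started: 4, ended: 4
```

Draining the queue on every pass matters for two reasons visible in the listing: a single Update() can queue one event per worker, and events beyond #MaximumEvents are silently dropped if nothing reads them.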

Author:  Env [ Tue Feb 12, 2019 1:02 pm ]
Post subject:  Re: [Module] Load-Balanced Worker Threads

Update (Revision 5)

    - Added method 'ClearPending()' - Will remove any pending tasks from all the worker queues.
    - Added method 'TerminateAll()' - Will kill any running threads, and then clear the queues.
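
A small sketch of what ClearPending() does to the queues, assuming the module from the first post is saved as "WorkerThreads.pbi" (a made-up filename) and ThreadSafe is enabled. SlowTask is a hypothetical worker used only for illustration.

```purebasic
; Assumption: the module from the first post is saved as "WorkerThreads.pbi" (hypothetical filename).
XIncludeFile "WorkerThreads.pbi"

; Hypothetical worker that takes a while to finish.
Procedure SlowTask(*Task.WorkerThreads::sTaskInfo)
  Delay(200)
EndProcedure

WorkerThreads::SetMaximumThreads(2)

Define i
For i = 1 To 6
  WorkerThreads::AddTask(i, 0, #Null, @SlowTask())
Next

WorkerThreads::Update()                     ; starts the first task on each of the 2 workers
Debug WorkerThreads::TasksRemaining()       ; 6: 2 running + 4 pending

WorkerThreads::ClearPending()               ; drops the 4 tasks that have not started yet
Debug WorkerThreads::TasksRemaining()       ; 2: the running tasks are left alone

While WorkerThreads::TasksRemaining() > 0   ; let the running tasks finish normally
  WorkerThreads::Update()
  Delay(1)
Wend
```

Where possible, ClearPending() followed by a normal wait like this is probably the gentler option. TerminateAll() relies on KillThread(), which stops a thread immediately without giving it a chance to clean up, so it is best kept for emergencies.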

All times are UTC + 1 hour