[Module] Load-Balanced Worker Threads

Env
Enthusiast
Posts: 151
Joined: Tue Apr 27, 2010 3:20 pm
Location: Wales, United Kingdom

[Module] Load-Balanced Worker Threads

Post by Env »

Hi folks,

Not sure if this will be useful to someone, but I've put together a module that is designed to control worker threads. You can specify a maximum number of threads to run, and it will distribute added tasks across the threads' task queues, giving each new task to whichever queue has the least to do at that moment.

Furthermore, you can specify a Task ID (any number you want, which will be passed to the thread), a Task Type (again, any number you want; it's there because a procedure might be responsible for more than one type of task), and a pointer to data (like conventional threads).
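The Task ID, Task Type and data pointer reach the handler the same way data reaches any PureBasic thread: through a single pointer argument. The standalone sketch below shows that pattern with hypothetical names (sDemoJob, sDemoTaskInfo, DemoHandler); the task-info fields mirror the module's sTaskInfo, but this is an illustration, not the module's own code. It also shows why the passed structure must stay alive until the thread is done with it.

```purebasic
; Hypothetical sketch: hand a thread a pointer to a task-info structure whose
; *UserData points at the caller's data. Names are illustrative only; the
; fields mirror WorkerThreads::sTaskInfo.
EnableExplicit

Structure sDemoJob            ; the data the task works on
  Value.i
  Result.i
EndStructure

Structure sDemoTaskInfo       ; same fields as WorkerThreads::sTaskInfo
  WorkerID.i
  ID.i
  Type.i
  *UserData.sDemoJob
EndStructure

Procedure DemoHandler(*Info.sDemoTaskInfo)
  ; The task type decides what the handler does with the user data
  Select *Info\Type
    Case 0
      *Info\UserData\Result = *Info\UserData\Value * 2
    Case 1
      *Info\UserData\Result = *Info\UserData\Value * *Info\UserData\Value
  EndSelect
EndProcedure

Define Job.sDemoJob, Info.sDemoTaskInfo, DemoThread.i
Job\Value = 7
Info\ID = 1
Info\Type = 1                 ; 1 = square the value
Info\UserData = @Job

; Both structures must stay alive until the thread has finished with them
DemoThread = CreateThread(@DemoHandler(), @Info)
If DemoThread
  WaitThread(DemoThread)
EndIf
Debug "Task " + Str(Info\ID) + " result: " + Str(Job\Result)
```

In the module the same lifetime rule is met because each sTaskInfo lives inside its queue element, which Update() only deletes after the thread has ended.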

It should be straightforward and cross-platform; if you see areas that could be improved, please feel free to share them :)

P.S. The multiple-queues approach is there to support planned future features.
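To make the per-worker queue idea concrete, here is a hypothetical standalone simulation (not the module itself) of the assignment order AddTask() follows with the queue-size method: each of the first MaxThreads tasks gets a brand-new worker, after which each new task joins the shortest queue, with ties going to the lowest index. It assumes no task finishes during the simulation.

```purebasic
; Hypothetical standalone sketch of the assignment order: one queue per worker,
; created on demand up to #MaxThreads, after which each new task joins the
; shortest queue (ties go to the lowest index). No task finishes here.
EnableExplicit

#MaxThreads = 3
#TaskCount  = 7

Global Dim QueueLength.i(#MaxThreads - 1)  ; queued tasks per worker
Global Dim AssignedTo.i(#TaskCount)        ; worker chosen for task 1..#TaskCount
Global WorkerCount.i = 0                   ; workers created so far

Procedure PickWorker()
  Protected ix.i, best.i = 0
  If WorkerCount < #MaxThreads
    ; Below the limit: the task gets a brand-new worker
    WorkerCount = WorkerCount + 1
    ProcedureReturn WorkerCount - 1
  EndIf
  ; At the limit: choose the worker with the fewest queued tasks
  For ix = 1 To WorkerCount - 1
    If QueueLength(ix) < QueueLength(best)
      best = ix
    EndIf
  Next
  ProcedureReturn best
EndProcedure

Define Task.i, Worker.i
For Task = 1 To #TaskCount
  Worker = PickWorker()
  QueueLength(Worker) = QueueLength(Worker) + 1
  AssignedTo(Task) = Worker
  Debug "Task " + Str(Task) + " -> worker " + Str(Worker)
Next
```

With three workers and nothing finishing, tasks 1 to 7 land on workers 0, 1, 2, 0, 1, 2, 0. In the real module, queues shrink as tasks end, so later tasks drift toward whichever workers are finishing fastest.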

Example Included

Code:

; ====================================================================================================
; Title:        Load-Balanced Worker Threads Module
; Description:  Distribute tasks across multiple workers.
; Author:       Michael R. King (Env)
; License:      MIT
; Revision:     5

; If you like it, feel free to use it, if you really like it then you can buy me coffee :)
; https://ko-fi.com/mikerking
; ====================================================================================================

; Changelog
; Revision 2: Corrected TaskData to *TaskData in task creation; added a compiler ThreadSafe check
; Revision 3: Added task cost to balance tasks more effectively
; Revision 4: Task handling events added
; Revision 5: Clearing pending tasks & Terminate all added

CompilerIf #PB_Compiler_Thread = #False
  CompilerError "Please compile with ThreadSafe enabled."
CompilerEndIf

DeclareModule WorkerThreads
  
  #MaximumEvents = 1000
  
  Structure sTaskInfo ; The structure of data to be passed to the worker procedure
    WorkerID.i
    ID.i
    Type.i
    *UserData
  EndStructure
  
  Enumeration BalanceMethod
    #BalanceMethod_QueueSize
    #BalanceMethod_QueueCost
  EndEnumeration
  
  Enumeration TaskCost
    #TaskCost_Minimal
    #TaskCost_Medium
    #TaskCost_High
    #TaskCost_Heavy
  EndEnumeration
  
  Enumeration WorkerEvent
    #Event_None
    #Event_TaskStarted
    #Event_TaskEnded
  EndEnumeration
  
  Prototype pTaskHandler(*TaskData.sTaskInfo)
  
  Declare SetBalanceMethod(Method = #BalanceMethod_QueueSize)                                   ;- Set how the tasks should be distributed.
  Declare SetMaximumThreads(Maximum)                                                            ;- Set the Maximum number of concurrent worker threads.
  Declare AddTask(ID, Type, *TaskData, *TaskHandler.pTaskHandler, TaskCost = #TaskCost_Medium)  ;- Create a task and add it to the queue.
  Declare Update()                                                                              ;- Process the queue.
  Declare TasksRemaining()                                                                      ;- Return how many tasks (pending or running) remain across all queues.
  
  Declare TrackEvents(State = #True)                                                            ;- Turn on/off tracking of worker events.
  Declare GetEvent()                                                                            ;- Get the next event in the events queue.
  Declare.i EventTaskID()                                                                       ;- Get the corresponding task ID of the currently pulled event.
  Declare ClearPending()                                                                        ;- Remove all pending (not yet started) tasks from the queues.
  Declare TerminateAll()                                                                        ;- Terminate any running tasks and clear the queue.
  
EndDeclareModule

Module WorkerThreads
  
  Structure sTask
    taskInfo.sTaskInfo
    cost.a
    *thread
    *handler
    pending.a
  EndStructure
  
  Structure sWorker
    workerIndex.i
    List tasks.sTask()
  EndStructure
  
  Structure sWorkerEvent
    eventType.i
    taskID.i
  EndStructure
  
  Structure sWorkerThreads
    *mutex
    maxThreads.i
    balanceMethod.a
    List worker.sWorker()
    currentEvent.sWorkerEvent
    trackEvents.a
    List eventQueue.sWorkerEvent()
  EndStructure
  
  Global gWorkerThreads.sWorkerThreads
  With gWorkerThreads
    \balanceMethod = #BalanceMethod_QueueSize
    \mutex = CreateMutex()
    \maxThreads = 1
  EndWith
  
  Procedure SetBalanceMethod(Method = #BalanceMethod_QueueSize)
    With gWorkerThreads
      \balanceMethod = Method
    EndWith
  EndProcedure
  
  Procedure SetMaximumThreads(Maximum)
    If Maximum > 0
      gWorkerThreads\maxThreads = Maximum
    EndIf
  EndProcedure
  
  Procedure GetWorkerTotalCost()
    Protected.i cost
    With gWorkerThreads\worker()
      ForEach \tasks()
        cost = cost + \tasks()\cost
      Next
    EndWith
    ProcedureReturn cost
  EndProcedure
  
  Procedure AddEvent(evtType, evtTaskID)
    With gWorkerThreads
      If \trackEvents = #True
        If ListSize(\eventQueue()) < #MaximumEvents
          LastElement(\eventQueue())
          AddElement(\eventQueue())
          \eventQueue()\eventType = evtType
          \eventQueue()\taskID = evtTaskID
        EndIf
      EndIf
    EndWith      
  EndProcedure
  
  Procedure TrackEvents(State = #True)
    If State
      gWorkerThreads\trackEvents = #True
    Else
      gWorkerThreads\trackEvents = #False
    EndIf
  EndProcedure
  
  Procedure GetEvent()
    With gWorkerThreads
      LockMutex(\mutex)
      If ListSize(\eventQueue()) > 0
        FirstElement(\eventQueue())
        \currentEvent\eventType = \eventQueue()\eventType
        \currentEvent\taskID = \eventQueue()\taskID
        DeleteElement(\eventQueue())
      Else
        \currentEvent\eventType = #Event_None
        \currentEvent\taskID = 0
      EndIf
      UnlockMutex(\mutex)
      ProcedureReturn \currentEvent\eventType
    EndWith
  EndProcedure
  
  Procedure EventTaskID()
    With gWorkerThreads
      ProcedureReturn \currentEvent\taskID
    EndWith
  EndProcedure
  
  Procedure AddTask(ID, Type, *TaskData, *TaskHandler.pTaskHandler, TaskCost = #TaskCost_Medium)
    Protected ix.i, minCount.i = 1000000, bestWorker.i, cost.i, workers.i
    With gWorkerThreads
      LockMutex(\mutex)
      If ListSize(\worker()) < \maxThreads
        ; Still below the thread limit: create a new worker at the END of the list
        ; (AddElement inserts after the current element, so move to the last one first)
        LastElement(\worker())
        AddElement(\worker())
        \worker()\workerIndex = ListIndex(\worker())
      Else
        ; Only consider the first maxThreads workers
        If \maxThreads < ListSize(\worker())
          workers = \maxThreads
        Else
          workers = ListSize(\worker())
        EndIf
        ; Pick the least-loaded worker, by task count or by total task cost
        For ix = 0 To workers - 1
          SelectElement(\worker(), ix)
          Select \balanceMethod
            Case #BalanceMethod_QueueSize
              cost = ListSize(\worker()\tasks())
            Case #BalanceMethod_QueueCost
              cost = GetWorkerTotalCost()
          EndSelect
          If cost < minCount
            minCount = cost
            bestWorker = ix
          EndIf
        Next
        SelectElement(\worker(), bestWorker)
      EndIf
      ; Append the task to the selected worker's queue
      LastElement(\worker()\tasks())
      AddElement(\worker()\tasks())
      \worker()\tasks()\taskInfo\WorkerID = \worker()\workerIndex
      \worker()\tasks()\taskInfo\ID = ID
      \worker()\tasks()\taskInfo\Type = Type
      \worker()\tasks()\taskInfo\UserData = *TaskData
      \worker()\tasks()\cost = TaskCost
      \worker()\tasks()\pending = #True
      \worker()\tasks()\handler = *TaskHandler
      UnlockMutex(\mutex)
    EndWith
  EndProcedure
  
  Procedure Update()
    With gWorkerThreads
      LockMutex(\mutex)
      ForEach \worker()
        If ListSize(\worker()\tasks()) > 0
          ; Each worker only runs the task at the head of its queue
          FirstElement(\worker()\tasks())
          If \worker()\tasks()\pending
            ; Not started yet: start it, passing a pointer to its sTaskInfo
            ; (valid until this list element is deleted below)
            \worker()\tasks()\pending = #False
            AddEvent(#Event_TaskStarted, \worker()\tasks()\taskInfo\ID)
            \worker()\tasks()\thread = CreateThread(\worker()\tasks()\handler, @\worker()\tasks()\taskInfo)
          Else
            ; Already started: once its thread has ended, report it and remove it from the queue
            If IsThread(\worker()\tasks()\thread) = 0
              AddEvent(#Event_TaskEnded, \worker()\tasks()\taskInfo\ID)
              DeleteElement(\worker()\tasks())
            EndIf
          EndIf
        EndIf
      Next
      UnlockMutex(\mutex)
    EndWith
  EndProcedure
  
  Procedure TasksRemaining()
    Protected.i count
    With gWorkerThreads
      LockMutex(\mutex)
      ForEach \worker()
        count = count + ListSize(\worker()\tasks())
      Next
      UnlockMutex(\mutex)
    EndWith
    ProcedureReturn count
  EndProcedure
  
  Procedure ClearPending()
    With gWorkerThreads
      LockMutex(\mutex)
      ForEach \worker()
        ForEach \worker()\tasks()
          If \worker()\tasks()\pending
            DeleteElement(\worker()\tasks())
          EndIf
        Next
      Next
      UnlockMutex(\mutex)
    EndWith
  EndProcedure
  
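  Procedure TerminateAll()
    With gWorkerThreads
      LockMutex(\mutex)
      ForEach \worker()
        ForEach \worker()\tasks()
          ; Only started tasks have a thread. KillThread() stops it abruptly, so anything the
          ; handler was holding (allocated memory, locked mutexes, open files) is not released.
          If \worker()\tasks()\pending = #False
            If IsThread(\worker()\tasks()\thread)
              KillThread(\worker()\tasks()\thread)
            EndIf
          EndIf
        Next
        ; Drop every remaining task, pending or killed
        ClearList(\worker()\tasks())
      Next
      UnlockMutex(\mutex)
    EndWith
  EndProcedure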
  
EndModule

CompilerIf #PB_Compiler_IsMainFile
  
  ; ========= EXAMPLE =========
  
  ; Make sure only 10 threads run at the same time (It will be 1 by default)
  WorkerThreads::SetMaximumThreads(10)
  
  ; Define a Worker procedure
  Procedure TestWorker(*Worker.WorkerThreads::sTaskInfo)
    With *Worker
      Debug "Task " + Str(\ID) + " started on worker " + Str(\WorkerID)
      
      Select *Worker\Type
        Case WorkerThreads::#TaskCost_Minimal
          Delay(Random(500))
        Case WorkerThreads::#TaskCost_Medium
          Delay(Random(1000))
        Case WorkerThreads::#TaskCost_High
          Delay(Random(2000))
        Case WorkerThreads::#TaskCost_Heavy
          Delay(Random(3000))
      EndSelect
      
      Debug "Task " + Str(\ID) + " finished on worker " + Str(\WorkerID) + ". Total tasks remaining: " + Str(WorkerThreads::TasksRemaining() - 1)
    EndWith
  EndProcedure
  
  ; Distribute tasks across workers depending on cost, not worker queue size
  WorkerThreads::SetBalanceMethod(WorkerThreads::#BalanceMethod_QueueCost)
  
  ; Create some tasks to queue, setting a random task cost.
  Define.i W, Cost
  For W = 1 To 40
    Cost = Random(WorkerThreads::#TaskCost_Heavy)
    WorkerThreads::AddTask(W, Cost, #Null, @TestWorker(), Cost)
  Next
  
  ; Enable Event Tracking
  WorkerThreads::TrackEvents(#True)
  
  ; Main loop - handle until no tasks remain
  Define Event.i
  While WorkerThreads::TasksRemaining() > 0
    WorkerThreads::Update()
    
    ; Event handling - drain every queued event, so none are left unread when the loop exits
    Repeat
      Event = WorkerThreads::GetEvent()
      Select Event
        Case WorkerThreads::#Event_TaskStarted
          Debug "[Event] Task " + Str(WorkerThreads::EventTaskID()) + " has started."
        Case WorkerThreads::#Event_TaskEnded
          Debug "[Event] Task " + Str(WorkerThreads::EventTaskID()) + " has ended."
      EndSelect
    Until Event = WorkerThreads::#Event_None
    
    Delay(1)
  Wend
  
CompilerEndIf
Last edited by Env on Tue Feb 12, 2019 1:28 pm, edited 11 times in total.
Thanks!
Little John
Addict
Posts: 4519
Joined: Thu Jun 07, 2007 3:25 pm
Location: Berlin, Germany

Re: [Module] Load-Balanced Worker Threads

Post by Little John »

Thank you for the code!

If you write EnableExplicit directly after the line

Code:

Module WorkerThreads
then you'll find 2 bugs. :-)

Re: [Module] Load-Balanced Worker Threads

Post by Env »

Little John wrote: If you write EnableExplicit directly after the line Module WorkerThreads then you'll find 2 bugs. :-)

Good catch, and a schoolboy error on my part. Code updated :)

Re: [Module] Load-Balanced Worker Threads

Post by Env »

Update (Revision 3)

Added the ability to specify a Task Cost (roughly how much time a task will take).

Added the ability to choose how tasks are balanced across the workers:
  • #BalanceMethod_QueueSize - Balances tasks equally across the worker queues by number of tasks.
  • #BalanceMethod_QueueCost - Balances tasks across the worker queues by the total Task Cost of all tasks in each queue.
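The two methods can pick different queues. The hypothetical standalone sketch below (not the module's code) uses cost values from the module's TaskCost enumeration, which starts at 0, so Minimal = 0, Medium = 1, High = 2 and Heavy = 3. One consequence worth knowing: under QueueCost a Minimal task adds nothing to its queue's total. Queue 0 holds one Heavy task and queue 1 holds two Medium tasks.

```purebasic
; Hypothetical standalone sketch: how QueueSize and QueueCost can choose
; different queues. Cost values follow the module's enumeration (starts at 0).
EnableExplicit

Enumeration
  #Demo_Minimal   ; 0
  #Demo_Medium    ; 1
  #Demo_High      ; 2
  #Demo_Heavy     ; 3
EndEnumeration

#Queues = 2
Global Dim QueueTasks.i(#Queues - 1)    ; number of tasks per queue
Global Dim QueueCostSum.i(#Queues - 1)  ; summed task cost per queue

Procedure Enqueue(Queue, Cost)
  QueueTasks(Queue) = QueueTasks(Queue) + 1
  QueueCostSum(Queue) = QueueCostSum(Queue) + Cost
EndProcedure

; Returns the queue with the smallest task count (BySize = #True) or cost sum
; (BySize = #False); ties go to the lowest index, as with a strict "<" comparison.
Procedure PickQueue(BySize)
  Protected ix.i, best.i = 0, load.i, bestLoad.i
  For ix = 0 To #Queues - 1
    If BySize
      load = QueueTasks(ix)
    Else
      load = QueueCostSum(ix)
    EndIf
    If ix = 0 Or load < bestLoad
      best = ix
      bestLoad = load
    EndIf
  Next
  ProcedureReturn best
EndProcedure

Enqueue(0, #Demo_Heavy)    ; queue 0: 1 task, cost 3
Enqueue(1, #Demo_Medium)   ; queue 1: 2 tasks, cost 1 + 1 = 2
Enqueue(1, #Demo_Medium)

Define BySizeChoice.i = PickQueue(#True)
Define ByCostChoice.i = PickQueue(#False)
Debug "QueueSize picks queue " + Str(BySizeChoice)
Debug "QueueCost picks queue " + Str(ByCostChoice)
```

By count, queue 0 wins (1 task against 2); by cost, queue 1 wins (2 against 3), so the next task avoids the queue stuck behind the Heavy job.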
Last edited by Env on Fri Feb 08, 2019 3:44 pm, edited 1 time in total.
blueb
Addict
Posts: 1041
Joined: Sat Apr 26, 2003 2:15 pm
Location: Cuernavaca, Mexico

Re: [Module] Load-Balanced Worker Threads

Post by blueb »

Thanks Michael,

Nice Code.

Any reason 'why' you limited the threads to 5 :?:

Testing, I did a little experiment and could not find a problem with a higher thread count.

I added the code below and found no issues (I have a maximum of 32 CPU cores available).

Code:

; Choice = 1: all CPUs in the system; otherwise: CPUs available to this process
Procedure GetCores(Choice = 0)
  If Choice = 1
    ProcedureReturn CountCPUs(#PB_System_CPUs)
  Else
    ProcedureReturn CountCPUs(#PB_System_ProcessCPUs)
  EndIf
EndProcedure

Debug GetCores()    ; same as GetCores(0)
Debug GetCores(0)
Debug GetCores(1)

; Instead of a fixed limit of 5 threads (the default is 1), use one thread per available CPU
;WorkerThreads::SetMaximumThreads(5)
WorkerThreads::SetMaximumThreads(GetCores())
Thanks again.
- It was too lonely at the top.

System : PB 6.10 Beta 9 (x64) and Win Pro 11 (x64)
Hardware: AMD Ryzen 9 5900X w/64 gigs Ram, AMD RX 6950 XT Graphics w/16gigs Mem

Re: [Module] Load-Balanced Worker Threads

Post by Env »

blueb wrote: Any reason 'why' you limited the threads to 5 :?:

Hi Blueb :)

No specific reason - purely for example purposes :)

Nice example! And yes, I've tested with 100 threads and 400 queued tasks and found no issues, so it really is down to the end user how wild they want to get.
davido
Addict
Posts: 1890
Joined: Fri Nov 09, 2012 11:04 pm
Location: Uttoxeter, UK

Re: [Module] Load-Balanced Worker Threads

Post by davido »

@Env,
I always have trouble with threads.
I'll feel a lot more confident, now. :D
Excellent!
Thanks for sharing.
DE AA EB

Re: [Module] Load-Balanced Worker Threads

Post by Env »

davido wrote: I always have trouble with threads. I'll feel a lot more confident, now. :D

Great to hear! Threads have always been a bit of scary territory for me, which is one reason I wanted to create a drop-in solution I could reuse across applications to make life easier, haha.

Update (Revision 4)

Added an events queue, read from the main thread, that reports when a task (identified by its ID) has started and ended.
This can be activated by using

Code:

WorkerThreads::TrackEvents(#True)
NB: Update() still needs to be called in your main loop, whether or not you call GetEvent().
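The event queue is a capped FIFO that the module fills (under its mutex) and the main loop empties with GetEvent(). The standalone sketch below uses hypothetical names (PushEvent, PopEvent, DemoTask) to show the same producer/consumer idea with a real mutex and real threads, and drains every queued event instead of reading one per loop pass. For brevity it waits for the threads before draining; a real main loop would drain on every iteration. Compile with ThreadSafe enabled, as the module itself requires.

```purebasic
; Hypothetical standalone sketch: a capped, mutex-protected FIFO of events written
; by worker threads and drained by the main thread, similar in spirit to the
; module's AddEvent()/GetEvent() pair. Compile with ThreadSafe enabled.
EnableExplicit

#Demo_EventNone    = 0
#Demo_EventStarted = 1
#Demo_EventEnded   = 2
#Demo_MaxEvents    = 1000

Structure sDemoEvent
  Type.i
  TaskID.i
EndStructure

Global NewList Events.sDemoEvent()
Global EventMutex = CreateMutex()

Procedure PushEvent(Type, TaskID)
  LockMutex(EventMutex)
  If ListSize(Events()) < #Demo_MaxEvents     ; silently drop events once the cap is reached
    LastElement(Events())
    AddElement(Events())
    Events()\Type = Type
    Events()\TaskID = TaskID
  EndIf
  UnlockMutex(EventMutex)
EndProcedure

; Pops the oldest event into *Out and returns its type (#Demo_EventNone when empty)
Procedure PopEvent(*Out.sDemoEvent)
  Protected Type.i = #Demo_EventNone
  LockMutex(EventMutex)
  If FirstElement(Events())
    *Out\Type = Events()\Type
    *Out\TaskID = Events()\TaskID
    Type = Events()\Type
    DeleteElement(Events())
  EndIf
  UnlockMutex(EventMutex)
  ProcedureReturn Type
EndProcedure

Procedure DemoTask(TaskID)
  PushEvent(#Demo_EventStarted, TaskID)
  Delay(10)                                 ; stand-in for real work
  PushEvent(#Demo_EventEnded, TaskID)
EndProcedure

Define Evt.sDemoEvent, Ended.i, i.i
Dim Threads.i(3)
For i = 0 To 3
  Threads(i) = CreateThread(@DemoTask(), i + 1)
Next
For i = 0 To 3
  If Threads(i)
    WaitThread(Threads(i))
  EndIf
Next

; Drain everything that is queued, not just one event
While PopEvent(@Evt) <> #Demo_EventNone
  If Evt\Type = #Demo_EventEnded
    Ended = Ended + 1
  EndIf
  Debug "Event " + Str(Evt\Type) + " for task " + Str(Evt\TaskID)
Wend
```

Draining in a loop matters because several tasks can start or end during a single Update() call, while one GetEvent() call returns only one event.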

Re: [Module] Load-Balanced Worker Threads

Post by Env »

Update (Revision 5)
  • Added method 'ClearPending()' - Removes any pending tasks from all the worker queues.
  • Added method 'TerminateAll()' - Kills any running threads, then clears the queues.
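ClearPending() works by deleting list elements while a ForEach loop walks the list. The hypothetical standalone sketch below (not the module's code) isolates that pattern: after DeleteElement() the current position moves back to the previous element, so ForEach carries on with the element that followed the deleted one. Task 1 plays the part of a task that has already started and therefore survives.

```purebasic
; Hypothetical standalone sketch: removing pending entries from a list while
; iterating over it, the pattern ClearPending() relies on.
EnableExplicit

Structure sDemoTask
  ID.i
  Pending.a   ; #True = queued but not started yet
EndStructure

NewList Tasks.sDemoTask()
Define i.i
For i = 1 To 5
  AddElement(Tasks())
  Tasks()\ID = i
  Tasks()\Pending = Bool(i > 1)   ; pretend task 1 is already running
Next

; DeleteElement() inside ForEach: iteration continues with the next element
ForEach Tasks()
  If Tasks()\Pending
    DeleteElement(Tasks())
  EndIf
Next

Debug "Tasks left: " + Str(ListSize(Tasks()))
```

Only the running task remains, which is why TerminateAll() has to deal with started tasks separately (by killing their threads) before clearing the lists.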
pdwyer
Addict
Posts: 2813
Joined: Tue May 08, 2007 1:27 pm
Location: Chiba, Japan

Re: [Module] Load-Balanced Worker Threads

Post by pdwyer »

Thanks for this!

I have a need for thread queuing and thought I'd check the forums before writing something myself to do this.
I'll give it a go and let you know if I have any feedback.

Looks great so far!
Paul Dwyer

“In nature, it’s not the strongest nor the most intelligent who survives. It’s the most adaptable to change” - Charles Darwin
“If you can't explain it to a six-year old you really don't understand it yourself.” - Albert Einstein