All times are UTC + 1 hour




 Post subject: [Module] Load-Balanced Worker Threads
PostPosted: Fri Feb 08, 2019 12:00 am 
Joined: Tue Apr 27, 2010 3:20 pm
Posts: 144
Location: Wales, United Kingdom
Hi folks,

Not sure if this will be useful to someone, but I've put together a module for controlling worker threads. You specify a maximum number of threads to run, and it distributes added tasks across the workers' task queues, giving each new task to whichever queue has the least to do at that moment.

Furthermore, for each task you can specify a Task ID (any number you want, which will be passed to the thread), a Task Type (again, any number you want; it's there because one procedure might be responsible for more than one type of task), and a pointer to data (as with conventional threads).

It should be straightforward and cross-platform. If you see areas that could be improved, please feel free to share them :)

P.S. The multiple-queues approach is there for planned future features.

Example Included
Code:
; ====================================================================================================
; Title:        Load-Balanced Worker Threads Module
; Description:  Distribute tasks across multiple workers.
; Author:       Michael R. King (Env)
; License:      MIT
; Revision:     5

; If you like it, feel free to use it, if you really like it then you can buy me coffee :)
; https://ko-fi.com/mikerking
; ====================================================================================================

; Changelog
; Revision 2: Corrected TaskData to *TaskData in task creation.  Compiler ThreadSafe checking
; Revision 3: Added task cost to balance tasks more effectively
; Revision 4: Task handling events added
; Revision 5: Clearing pending tasks & Terminate all added

CompilerIf #PB_Compiler_Thread = #False
  CompilerError "Please compile with ThreadSafe enabled."
CompilerEndIf

DeclareModule WorkerThreads
 
  #MaximumEvents = 1000
 
  Structure sTaskInfo ; The structure of data to be passed to the worker procedure
    WorkerID.i
    ID.i
    Type.i
    *UserData
  EndStructure
 
  Enumeration BalanceMethod
    #BalanceMethod_QueueSize
    #BalanceMethod_QueueCost
  EndEnumeration
 
  Enumeration TaskCost
    #TaskCost_Minimal
    #TaskCost_Medium
    #TaskCost_High
    #TaskCost_Heavy
  EndEnumeration
 
  Enumeration WorkerEvent
    #Event_None
    #Event_TaskStarted
    #Event_TaskEnded
  EndEnumeration
 
  Prototype pTaskHandler(*TaskData.sTaskInfo)
 
  Declare SetBalanceMethod(Method = #BalanceMethod_QueueSize)                                   ;- Set how the tasks should be distributed.
  Declare SetMaximumThreads(Maximum)                                                            ;- Set the Maximum number of concurrent worker threads.
  Declare AddTask(ID, Type, *TaskData, *TaskHandler.pTaskHandler, TaskCost = #TaskCost_Medium)  ;- Create a task and add it to the queue.
  Declare Update()                                                                              ;- Process the queue.
  Declare TasksRemaining()                                                                      ;- Return how many tasks remain in the queue.
 
  Declare TrackEvents(State = #True)                                                            ;- Turn on/off tracking of worker events.
  Declare GetEvent()                                                                            ;- Get the next event in the events queue.
  Declare.i EventTaskID()                                                                       ;- Get the corresponding task ID of the currently pulled event.
  Declare ClearPending()                                                                        ;- Clear Pending Tasks
  Declare TerminateAll()                                                                        ;- Terminate any running tasks and clear the queue.
 
EndDeclareModule

Module WorkerThreads
 
  Structure sTask
    taskInfo.sTaskInfo
    cost.a
    *thread
    *handler
    pending.a
  EndStructure
 
  Structure sWorker
    workerIndex.i
    List tasks.sTask()
  EndStructure
 
  Structure sWorkerEvent
    eventType.i
    taskID.i
  EndStructure
 
  Structure sWorkerThreads
    *mutex
    maxThreads.i
    balanceMethod.a
    List worker.sWorker()
    currentEvent.sWorkerEvent
    trackEvents.a
    List eventQueue.sWorkerEvent()
  EndStructure
 
  Global gWorkerThreads.sWorkerThreads
  With gWorkerThreads
    \balanceMethod = #BalanceMethod_QueueSize
    \mutex = CreateMutex()
    \maxThreads = 1
  EndWith
 
  Procedure SetBalanceMethod(Method = #BalanceMethod_QueueSize)
    With gWorkerThreads
      \balanceMethod = Method
    EndWith
  EndProcedure
 
  Procedure SetMaximumThreads(Maximum)
    If Maximum > 0
      gWorkerThreads\maxThreads = Maximum
    EndIf
  EndProcedure
 
  Procedure GetWorkerTotalCost()
    Protected.i cost
    With gWorkerThreads\worker()
      ForEach \tasks()
        cost = cost + \tasks()\cost
      Next
    EndWith
    ProcedureReturn cost
  EndProcedure
 
  Procedure AddEvent(evtType, evtTaskID)
    With gWorkerThreads
      If \trackEvents = #True
        If ListSize(\eventQueue()) < #MaximumEvents
          LastElement(\eventQueue())
          AddElement(\eventQueue())
          \eventQueue()\eventType = evtType
          \eventQueue()\taskID = evtTaskID
        EndIf
      EndIf
    EndWith     
  EndProcedure
 
  Procedure TrackEvents(State = #True)
    If State
      gWorkerThreads\trackEvents = #True
    Else
      gWorkerThreads\trackEvents = #False
    EndIf
  EndProcedure
 
  Procedure GetEvent()
    With gWorkerThreads
      LockMutex(\mutex)
      If ListSize(\eventQueue()) > 0
        FirstElement(\eventQueue())
        \currentEvent\eventType = \eventQueue()\eventType
        \currentEvent\taskID = \eventQueue()\taskID
        DeleteElement(\eventQueue())
      Else
        \currentEvent\eventType = #Event_None
        \currentEvent\taskID = 0
      EndIf
      UnlockMutex(\mutex)
      ProcedureReturn \currentEvent\eventType
    EndWith
  EndProcedure
 
  Procedure.i EventTaskID()
    With gWorkerThreads
      ProcedureReturn \currentEvent\taskID
    EndWith
  EndProcedure
 
  Procedure AddTask(ID, Type, *TaskData, *TaskHandler.pTaskHandler, TaskCost = #TaskCost_Medium)
    With gWorkerThreads
      LockMutex(\mutex)
      If ListSize(\worker()) < \maxThreads
        AddElement(\worker())
        \worker()\workerIndex = ListIndex(\worker())
        LastElement(\worker()\tasks())
        AddElement(\worker()\tasks())
        \worker()\tasks()\taskInfo\WorkerID = \worker()\workerIndex
        \worker()\tasks()\taskInfo\ID = ID
        \worker()\tasks()\taskInfo\Type = Type
        \worker()\tasks()\taskInfo\UserData = *TaskData
        \worker()\tasks()\cost = TaskCost
        \worker()\tasks()\pending = #True
        \worker()\tasks()\handler = *TaskHandler
      Else
        Protected ix.i, minCount.i = 1000000, bestWorker.i, cost.i, workers.i
        If \maxThreads < ListSize(\worker())
          workers = \maxThreads
        Else
          workers = ListSize(\worker())
        EndIf       
        For ix = 0 To workers - 1
          SelectElement(\worker(), ix)
          Select \balanceMethod
            Case #BalanceMethod_QueueSize
              If ListSize(\worker()\tasks()) < minCount
                minCount = ListSize(\worker()\tasks())
                bestWorker = ix
              EndIf
            Case #BalanceMethod_QueueCost
              cost = GetWorkerTotalCost()
              If cost < minCount
                minCount = cost
                bestWorker = ix
              EndIf
          EndSelect
        Next
        SelectElement(\worker(), bestWorker)
        LastElement(\worker()\tasks())
        AddElement(\worker()\tasks())
        \worker()\tasks()\taskInfo\WorkerID = \worker()\workerIndex
        \worker()\tasks()\taskInfo\ID = ID
        \worker()\tasks()\taskInfo\Type = Type
        \worker()\tasks()\taskInfo\UserData = *TaskData
        \worker()\tasks()\cost = TaskCost
        \worker()\tasks()\pending = #True
        \worker()\tasks()\handler = *TaskHandler       
      EndIf
      UnlockMutex(\mutex)
    EndWith
  EndProcedure
 
  Procedure Update()
    With gWorkerThreads
      LockMutex(\mutex)
      ForEach \worker()
        If ListSize(\worker()\tasks()) > 0
          FirstElement(\worker()\tasks())
          If \worker()\tasks()\pending
            \worker()\tasks()\pending = #False
            AddEvent(#Event_TaskStarted, \worker()\tasks()\taskInfo\ID)
            \worker()\tasks()\thread = CreateThread(\worker()\tasks()\handler, @\worker()\tasks()\taskInfo)
          Else
            If IsThread(\worker()\tasks()\thread) = 0
              AddEvent(#Event_TaskEnded, \worker()\tasks()\taskInfo\ID)
              DeleteElement(\worker()\tasks())
            EndIf
          EndIf
        EndIf
      Next
      UnlockMutex(\mutex)
    EndWith
  EndProcedure
 
  Procedure TasksRemaining()
    Protected.i count
    With gWorkerThreads
      LockMutex(\mutex)
      ForEach \worker()
        count = count + ListSize(\worker()\tasks())
      Next
      UnlockMutex(\mutex)
    EndWith
    ProcedureReturn count
  EndProcedure
 
  Procedure ClearPending()
    With gWorkerThreads
      LockMutex(\mutex)
      ForEach \worker()
        ForEach \worker()\tasks()
          If \worker()\tasks()\pending
            DeleteElement(\worker()\tasks())
          EndIf
        Next
      Next
      UnlockMutex(\mutex)
    EndWith
  EndProcedure
 
  Procedure TerminateAll()
    With gWorkerThreads
      LockMutex(\mutex)
      ForEach \worker()
        ForEach \worker()\tasks()
          If \worker()\tasks()\pending = #False
            If IsThread(\worker()\tasks()\thread)
              KillThread(\worker()\tasks()\thread)
            EndIf
          EndIf
        Next
        ClearList(\worker()\tasks())
      Next
      UnlockMutex(\mutex)
    EndWith
  EndProcedure
 
EndModule

CompilerIf #PB_Compiler_IsMainFile
 
  ; ========= EXAMPLE =========
 
  ; Make sure only 10 threads run at the same time (It will be 1 by default)
  WorkerThreads::SetMaximumThreads(10)
 
  ; Define a Worker procedure
  Procedure TestWorker(*Worker.WorkerThreads::sTaskInfo)
    With *Worker
      Debug "Task " + Str(\ID) + " started on worker " + Str(\WorkerID)
     
      Select *Worker\Type
        Case WorkerThreads::#TaskCost_Minimal
          Delay(Random(500))
        Case WorkerThreads::#TaskCost_Medium
          Delay(Random(1000))
        Case WorkerThreads::#TaskCost_High
          Delay(Random(2000))
        Case WorkerThreads::#TaskCost_Heavy
          Delay(Random(3000))
      EndSelect
     
      Debug "Task " + Str(\ID) + " finished on worker " + Str(\WorkerID) + ". Total tasks remaining: " + Str(WorkerThreads::TasksRemaining() - 1)
    EndWith
  EndProcedure
 
  ; Distribute tasks across workers depending on cost, not worker queue size
  WorkerThreads::SetBalanceMethod(WorkerThreads::#BalanceMethod_QueueCost)
 
  ; Create some tasks to queue, setting a random task cost.
  Define.i W, Cost
  For W = 1 To 40
    Cost = Random(WorkerThreads::#TaskCost_Heavy)
    WorkerThreads::AddTask(W, Cost, #Null, @TestWorker(), Cost)
  Next
 
  ; Enable Event Tracking
  WorkerThreads::TrackEvents(#True)
 
  ; Main loop - Handle until no tasks remain
  While WorkerThreads::TasksRemaining() > 0
    WorkerThreads::Update()
   
    ; Event Handling
    Select WorkerThreads::GetEvent()
      Case WorkerThreads::#Event_TaskStarted
        Debug "[Event] Task " + Str(WorkerThreads::EventTaskID()) + " has started."
      Case WorkerThreads::#Event_TaskEnded
        Debug "[Event] Task " + Str(WorkerThreads::EventTaskID()) + " has ended."
    EndSelect
   
    Delay(1)
  Wend
 
CompilerEndIf
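
The example above passes #Null as the data pointer. As a minimal sketch of the "pointer to data" idea described at the top of this post, the following hands each task its own structure and reads the result back once the queue is empty. It assumes the module is saved as "WorkerThreads.pbi" next to the test file (a made-up file name) and that ThreadSafe is enabled; sJob and SquareJob() are hypothetical names used only for illustration.

```purebasic
; Sketch only: "WorkerThreads.pbi" is an assumed file name for the module above.
EnableExplicit
XIncludeFile "WorkerThreads.pbi"

; Hypothetical payload handed to each task through the data pointer.
Structure sJob
  value.i
  result.i
EndStructure

; Worker procedure: reads its payload via UserData and writes the result back.
Procedure SquareJob(*Task.WorkerThreads::sTaskInfo)
  Protected *job.sJob = *Task\UserData
  Protected v.i
  If *job
    v = *job\value
    *job\result = v * v
  EndIf
EndProcedure

Dim jobs.sJob(3)
Define i.i

WorkerThreads::SetMaximumThreads(2)

; Each task gets the address of its own array element, so no two tasks share data.
For i = 0 To 3
  jobs(i)\value = i + 1
  WorkerThreads::AddTask(i, 0, @jobs(i), @SquareJob())
Next

; Update() only removes a task after its thread has ended,
; so the results are complete once nothing remains.
While WorkerThreads::TasksRemaining() > 0
  WorkerThreads::Update()
  Delay(1)
Wend

For i = 0 To 3
  Debug Str(jobs(i)\value) + " squared is " + Str(jobs(i)\result)
Next
```

The payload has to stay alive until its task has finished, which is why the sketch keeps the jobs in an array at main scope rather than in a local variable.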

_________________
Thanks!


Last edited by Env on Tue Feb 12, 2019 1:28 pm, edited 11 times in total.

 Post subject: Re: [Module] Load-Balanced Worker Threads
PostPosted: Fri Feb 08, 2019 12:09 pm 
Joined: Thu Jun 07, 2007 3:25 pm
Posts: 3715
Location: Berlin, Germany
Thank you for the code!

If you write EnableExplicit directly after the line
Code:
Module WorkerThreads
then you'll find 2 bugs. :-)

_________________
Please excuse my flawed English. My native language is PureBasic.
Search
RSBasic's backups


 Post subject: Re: [Module] Load-Balanced Worker Threads
PostPosted: Fri Feb 08, 2019 12:31 pm 
Joined: Tue Apr 27, 2010 3:20 pm
Posts: 144
Location: Wales, United Kingdom
Little John wrote:
Thank you for the code!

If you write EnableExplicit directly after the line
Code:
Module WorkerThreads
then you'll find 2 bugs. :-)



Good catch, and a schoolboy error on my part. Code updated :)

_________________
Thanks!


 Post subject: Re: [Module] Load-Balanced Worker Threads
PostPosted: Fri Feb 08, 2019 1:48 pm 
Joined: Tue Apr 27, 2010 3:20 pm
Posts: 144
Location: Wales, United Kingdom
Update (Revision 3)

Added the ability to specify a relative cost for each task (#TaskCost_Minimal to #TaskCost_Heavy).

Added the ability to define how to balance tasks across the workers:
    #BalanceMethod_QueueSize - Will balance tasks equally across the worker queues by quantity.
    #BalanceMethod_QueueCost - Will balance tasks equally across the worker queues by the total Task Cost of all tasks in each queue.
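
To illustrate how the two methods can disagree, here is a small stand-alone sketch. It does not use the module at all: plain arrays stand in for the worker queues, the figures are invented, and IndexOfSmallest() is a hypothetical helper. The costs follow the TaskCost enumeration, where Minimal = 0, Medium = 1, High = 2 and Heavy = 3.

```purebasic
; Sketch only: arrays with made-up figures stand in for the worker queues.
EnableExplicit

#Workers = 3

Dim queueSize.i(#Workers - 1)   ; number of tasks waiting on each worker
Dim queueCost.i(#Workers - 1)   ; sum of the task costs waiting on each worker

queueSize(0) = 2 : queueCost(0) = 6   ; two Heavy tasks (3 + 3)
queueSize(1) = 3 : queueCost(1) = 1   ; two Minimal and one Medium (0 + 0 + 1)
queueSize(2) = 4 : queueCost(2) = 4   ; four Medium tasks (1 + 1 + 1 + 1)

; Return the index of the smallest value; the first one wins on a tie.
Procedure.i IndexOfSmallest(Array values.i(1))
  Protected ix.i, best.i = 0
  For ix = 1 To ArraySize(values())
    If values(ix) < values(best)
      best = ix
    EndIf
  Next
  ProcedureReturn best
EndProcedure

Debug "QueueSize picks worker " + Str(IndexOfSmallest(queueSize()))
Debug "QueueCost picks worker " + Str(IndexOfSmallest(queueCost()))
```

With these figures QueueSize picks worker 0 (fewest tasks), while QueueCost picks worker 1 (lowest total cost), even though worker 1 holds more tasks than worker 0.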

_________________
Thanks!


Last edited by Env on Fri Feb 08, 2019 3:44 pm, edited 1 time in total.

 Post subject: Re: [Module] Load-Balanced Worker Threads
PostPosted: Fri Feb 08, 2019 1:56 pm 
Joined: Sat Apr 26, 2003 2:15 pm
Posts: 841
Location: Cuernavaca, Mexico
Thanks Michael,

Nice Code.

Any reason 'why' you limited the threads to 5 :?:

Testing, I did a little experiment and could not find a problem with a higher threadcount.

I added the code below and found no issues ( I have a maximum of 32 CPU cores available )

Code:
Procedure GetCores(Choice=0)
   If Choice = 1
      ProcedureReturn CountCPUs(#PB_System_CPUs)
   Else
      ProcedureReturn CountCPUs(#PB_System_ProcessCPUs)
   EndIf
EndProcedure
Debug GetCores()
Debug GetCores(0) 
Debug GetCores(1)
   ; Make sure only 5 threads run at the same time (It will be 1 by default)
   ;WorkerThreads::SetMaximumThreads(5)
  WorkerThreads::SetMaximumThreads(GetCores())


Thanks again.

_________________
- It was too lonely at the top.

Current Machine: Win 10 Pro 64-bit, Dual Xeon E5-2670, 64 gigs ram, Geforce GTX 1660 Ti w/6 gigs ram


 Post subject: Re: [Module] Load-Balanced Worker Threads
PostPosted: Fri Feb 08, 2019 2:02 pm 
Joined: Tue Apr 27, 2010 3:20 pm
Posts: 144
Location: Wales, United Kingdom
blueb wrote:
Thanks Michael,

Nice Code.

Any reason 'why' you limited the threads to 5 :?:

Testing, I did a little experiment and could not find a problem with a higher threadcount.

I added the code below and found no issues ( I have a maximum of 32 CPU cores available )

Code:
Procedure GetCores(Choice=0)
   If Choice = 1
      ProcedureReturn CountCPUs(#PB_System_CPUs)
   Else
      ProcedureReturn CountCPUs(#PB_System_ProcessCPUs)
   EndIf
EndProcedure
Debug GetCores()
Debug GetCores(0) 
Debug GetCores(1)
   ; Make sure only 5 threads run at the same time (It will be 1 by default)
   ;WorkerThreads::SetMaximumThreads(5)
  WorkerThreads::SetMaximumThreads(GetCores())


Thanks again.


Hi Blueb :)

No specific reason - purely for example purposes :)

Nice example! And yes, I've tested with 100 threads and 400 queued tasks and found no issues, so it really is down to the end-user how wild they want to get.

_________________
Thanks!


 Post subject: Re: [Module] Load-Balanced Worker Threads
PostPosted: Fri Feb 08, 2019 2:55 pm 
Joined: Fri Nov 09, 2012 11:04 pm
Posts: 1710
Location: Uttoxeter, UK
@Env,
I always have trouble with threads.
I'll feel a lot more confident, now. :D
Excellent!
Thanks for sharing.

_________________
DE AA EB


 Post subject: Re: [Module] Load-Balanced Worker Threads
PostPosted: Fri Feb 08, 2019 3:03 pm 
Joined: Tue Apr 27, 2010 3:20 pm
Posts: 144
Location: Wales, United Kingdom
davido wrote:
@Env,
I always have trouble with threads.
I'll feel a lot more confident, now. :D
Excellent!
Thanks for sharing.


Great to hear! Threads have always been a bit of a scary territory for me when making anything, which is a reason why I wanted to create a drop-in solution to be used across applications to make life easier haha.

Update (Revision 4)

Added an events queue, read from the main thread, to track when a task (identified by its ID) has started and ended.
This can be activated by using
Code:
WorkerThreads::TrackEvents(#True)


NB: Update() still needs to be called within your main loop, regardless of whether you call GetEvent().
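
As a rough sketch of how the two calls can sit together in a main loop: the version below drains every queued event after each Update(), and only checks TasksRemaining() afterwards, so the final "ended" events are read before the loop exits. It assumes the module is saved as "WorkerThreads.pbi" (a made-up file name) and compiled with ThreadSafe enabled; ShortJob() is a hypothetical stand-in for real work.

```purebasic
; Sketch only: "WorkerThreads.pbi" is an assumed file name for the module.
EnableExplicit
XIncludeFile "WorkerThreads.pbi"

Procedure ShortJob(*Task.WorkerThreads::sTaskInfo)
  Delay(100)   ; stand-in for real work
EndProcedure

Define i.i, event.i

WorkerThreads::SetMaximumThreads(4)
WorkerThreads::TrackEvents(#True)   ; switch tracking on before the first Update()

For i = 1 To 8
  WorkerThreads::AddTask(i, 0, #Null, @ShortJob())
Next

Repeat
  WorkerThreads::Update()   ; starts pending tasks and removes finished ones

  ; Drain every queued event, not just one per pass.
  Repeat
    event = WorkerThreads::GetEvent()
    Select event
      Case WorkerThreads::#Event_TaskStarted
        Debug "[Event] Task " + Str(WorkerThreads::EventTaskID()) + " has started."
      Case WorkerThreads::#Event_TaskEnded
        Debug "[Event] Task " + Str(WorkerThreads::EventTaskID()) + " has ended."
    EndSelect
  Until event = WorkerThreads::#Event_None

  Delay(1)
Until WorkerThreads::TasksRemaining() = 0
```

Events are only recorded inside Update(), and the queue is capped at #MaximumEvents, so draining it on every pass keeps events from being dropped when many tasks start or end at once.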

_________________
Thanks!


 Post subject: Re: [Module] Load-Balanced Worker Threads
PostPosted: Tue Feb 12, 2019 1:02 pm 
Joined: Tue Apr 27, 2010 3:20 pm
Posts: 144
Location: Wales, United Kingdom
Update (Revision 5)

    - Added method 'ClearPending()' - Will remove any pending tasks from all the worker queues.
    - Added method 'TerminateAll()' - Will kill any running threads, and then clear the queues.
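
A minimal sketch of how the two calls differ, assuming the module is saved as "WorkerThreads.pbi" (a made-up file name) and compiled with ThreadSafe enabled; SlowJob() is a hypothetical worker that simply sleeps. The counts in the comments follow from the default QueueSize balancing, which puts three tasks on each of the two workers.

```purebasic
; Sketch only: "WorkerThreads.pbi" is an assumed file name for the module.
EnableExplicit
XIncludeFile "WorkerThreads.pbi"

Procedure SlowJob(*Task.WorkerThreads::sTaskInfo)
  Delay(2000)   ; long enough that nothing finishes during this sketch
EndProcedure

Define i.i

WorkerThreads::SetMaximumThreads(2)
For i = 1 To 6
  WorkerThreads::AddTask(i, 0, #Null, @SlowJob())
Next

WorkerThreads::Update()                 ; starts the first task in each of the two queues
Debug WorkerThreads::TasksRemaining()   ; 6: two running, four still pending

WorkerThreads::ClearPending()           ; drops only the tasks that have not started
Debug WorkerThreads::TasksRemaining()   ; 2: the running tasks are left alone

WorkerThreads::TerminateAll()           ; kills the running threads and empties the queues
Debug WorkerThreads::TasksRemaining()   ; 0
```

TerminateAll() relies on KillThread(), which stops a thread abruptly, so a task that holds memory, files or other resources gets no chance to clean up. ClearPending() followed by waiting for the running tasks is the gentler option where that matters.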

_________________
Thanks!

