It is currently Wed Oct 28, 2020 7:17 pm

All times are UTC + 1 hour




Post new topic Reply to topic  [ 1 post ] 
Author Message
 Post subject: Thread pooling library
PostPosted: Fri Aug 28, 2020 9:41 am 
Offline
User
User

Joined: Mon Nov 03, 2003 5:34 am
Posts: 32
This is a small generic thread-pooling library that I made as an experiment. See the bottom of the code for example usage.

Make sure that you compile the test code in thread-safe mode.

This library came about as a result of this thread: viewtopic.php?f=13&t=75872

Code:

; Compile in thread-safe mode. The test code at the bottom of this file will not work correctly without it.


; CPU counts as reported by CountCPUs(): one for the whole system and one for this process.
Global G_num_system_cpu = CountCPUs( #PB_System_CPUs )
Global G_num_process_cpu = CountCPUs( #PB_System_ProcessCPUs )

;Debug "System CPUs: " + G_num_system_cpu
;Debug "Process CPUs: " + G_num_process_cpu

; Highest worker-thread index. It is used as the upper bound of 'WorkerThreadArray' and of every loop over the workers, so there is one worker per process CPU.
Global G_max_process_cpu = G_num_process_cpu - 1

#MAX_POSITIVE = 2147483647 ; The highest positive long integer. Used as the starting value when searching for the shortest job queue.

; Signature of a job function: it receives one pointer (the job's data) and returns an integer.
Prototype.i ProtoCallFunc( *struct )


; One queued job: the function to call and the pointer that is passed to it.
Structure THREAD_JOB_STRUCT
DoJob.ProtoCallFunc ; The job function. The worker thread calls it with 'job_struct' as its only argument.
*job_struct ; Pointer to the job's data. The library only passes it on; it never reads what it points to.
EndStructure

;Global NewList JobList.THREAD_JOB_STRUCT() ; A list used to add an entire joblist to a worker thread.


; The state of one worker thread. One instance per worker is stored in 'WorkerThreadArray'.
Structure THREAD_STRUCT
id.i ; Index of this worker in 'WorkerThreadArray'. Sent as the event data of '#EVENT_THREAD_NOJOBS'.
thread_handle.i ; The value returned by CreateThread(). Used with PauseThread() and ResumeThread().
mutex.i ; Mutex created in 'SetupWorkerThreads()'. The worker locks it on startup and unlocks and frees it when it ends.
command.b ; One of the '#JOBCOM_' constants. Written by the interface functions and read by the worker.
status.b ; One of the '#JOBSTATUS_' constants. Written by the worker.
List JobList.THREAD_JOB_STRUCT() ; The jobs waiting to be run by this worker.
EndStructure

; One slot per worker thread, indexed from 0 to 'G_max_process_cpu'.
Global Dim WorkerThreadArray.THREAD_STRUCT( G_max_process_cpu )

; Custom events.
Enumeration #PB_Event_FirstCustomValue
  #EVENT_THREAD_NOJOBS ; An event that is posted when a worker thread has run out of jobs. The event data holds the worker's array index.
EndEnumeration



; Worker thread command constants.
Enumeration
#JOBCOM_NULL ; This command causes the thread to do nothing. Since this is zero it will be the command set by default on thread startup. The thread handle should not be used in this case, as it may not have been set yet.
#JOBCOM_PAUSE ; This command pauses the thread.
#JOBCOM_RUN ; This command allows the job queue to be processed.
#JOBCOM_FLUSH ; This command flushes the job queue. This may be problematic in some cases, so use with care. Once the queue has been flushed the thread will be paused with a '#JOBSTATUS_NOJOBS' status.
#JOBCOM_END ; This command ends the thread.
EndEnumeration


; Worker thread status constants.
Enumeration
#JOBSTATUS_NULL ; This status indicates that the thread loop is running but has nothing to do. Since this is zero it will be the status set by default on thread startup when the thread handle may not have been set yet.
#JOBSTATUS_RUNNING ; This status signifies that the thread is currently running and processing jobs.
#JOBSTATUS_PAUSED ; This status signifies that the thread is currently paused, but may have jobs in the queue. This is set before the thread is actually paused, so a delay might be needed between checking the paused state and proceeding.
#JOBSTATUS_ENDED ; This status signifies that the thread has ended and can be safely shut down.
#JOBSTATUS_NOJOBS ; This status signifies that the thread has no jobs in the queue. The thread will be paused in this case, so treat it the same as '#JOBSTATUS_PAUSED'.
EndEnumeration





Procedure WorkerThread( *thread_data.THREAD_STRUCT )
; The body of every worker thread.
; *thread_data - Pointer to this worker's entry in 'WorkerThreadArray'.
;
; The thread pauses itself straight after startup with a '#JOBSTATUS_NOJOBS' status. Each time it is resumed it reads
; '*thread_data\command' and acts on it, until '#JOBCOM_END' makes it return.

Protected ended

LockMutex( *thread_data\mutex )
*thread_data\status = #JOBSTATUS_NOJOBS

; This procedure can start running before the creator has stored the result of CreateThread() in 'thread_handle'.
; Wait for the handle so that PauseThread() is never called with a null thread.
While *thread_data\thread_handle = 0
   Delay( 1 )
Wend

PauseThread( *thread_data\thread_handle )
Repeat
   Select *thread_data\command
      Case #JOBCOM_END
         UnlockMutex( *thread_data\mutex )
         FreeMutex( *thread_data\mutex )
         *thread_data\status = #JOBSTATUS_ENDED ; Set last: 'EndWorkerThreads()' polls for this status.
         ProcedureReturn
         
      Case #JOBCOM_FLUSH
         ClearList( *thread_data\JobList() )
         *thread_data\status = #JOBSTATUS_NOJOBS
         
         ; Signal via the event system that this thread has run out of jobs to do. The thread's array index is passed as the event data.
         ; For programs that don't have an event queue via windows or networking, a custom event system will probably need to be used.
         PostEvent( #EVENT_THREAD_NOJOBS, 0, 0, 0, *thread_data\id )
         
         PauseThread( *thread_data\thread_handle )
         
      Case #JOBCOM_PAUSE ; Pause the thread. Use 'ResumeWorkerThread()' or 'ResumeAllWorkerThreads()' to start it again.
         *thread_data\status = #JOBSTATUS_PAUSED
         PauseThread( *thread_data\thread_handle )
         
      Case #JOBCOM_RUN ; Run the job queue. After the thread has run out of jobs and paused, add more jobs and use 'ResumeWorkerThread()' to start it up again.
         ended = 1
         *thread_data\status = #JOBSTATUS_RUNNING
         ForEach *thread_data\JobList()
            *thread_data\JobList()\DoJob( *thread_data\JobList()\job_struct )
            DeleteElement( *thread_data\JobList() ) ; Delete this completed job from the job list.
            ; Break out of the loop to process a new command. Clearing 'ended' stops the new command from being
            ; ignored if the end of the job list is reached at the same time.
            If *thread_data\command <> #JOBCOM_RUN
               ended = 0
               Break
            EndIf
         Next
         If ended
            If ListSize( *thread_data\JobList() ) = 0
               *thread_data\status = #JOBSTATUS_NOJOBS
               
               ; Signal via the event system that this thread has run out of jobs to do. The thread's array index is passed as the event data.
               ; For programs that don't have an event queue via windows or networking, a custom event system will probably need to be used.
               PostEvent( #EVENT_THREAD_NOJOBS, 0, 0, 0, *thread_data\id )
               
               PauseThread( *thread_data\thread_handle )
            EndIf
         EndIf
         
      Default ; '#JOBCOM_NULL' or an unknown command: there is nothing to do, so sleep briefly instead of spinning at full speed.
         Delay( 1 )
   EndSelect
ForEver
EndProcedure


; == INTERFACE FUNCTIONS ==


Procedure SetupWorkerThreads()
; Starts one worker thread for every slot of 'WorkerThreadArray'.
; Every worker pauses itself on startup with a '#JOBSTATUS_NOJOBS' status. Queue some jobs for it and then call one of the resume functions to set it running.

Protected slot

For slot = 0 To G_max_process_cpu
   With WorkerThreadArray( slot )
      \id = slot
      \mutex = CreateMutex()
      \thread_handle = CreateThread( @WorkerThread(), @WorkerThreadArray( slot ) )
   EndWith
Next

Delay( 20 ) ; Give the new threads time to reach their initial pause.
EndProcedure


Procedure EndWorkerThreads()
; Ends all the worker threads and returns once every one of them reports '#JOBSTATUS_ENDED'.
; This is normally called before shutting down the program, though it may not be needed.
; A worker only sees the end command after the job it is currently running has returned.

Protected thread_index, running_count

; Tell every worker to end.
For thread_index = 0 To G_max_process_cpu
   WorkerThreadArray( thread_index )\command = #JOBCOM_END
   ResumeThread( WorkerThreadArray( thread_index )\thread_handle ) ; Resume any paused threads so that they see the command.
Next

Delay( 20 ) ; Delay to be on the safe side.

; Wait until all the worker threads have ended.
Repeat
   running_count = 0
   For thread_index = 0 To G_max_process_cpu
      If WorkerThreadArray( thread_index )\status <> #JOBSTATUS_ENDED : running_count + 1 : EndIf
   Next
   ; Sleep between polls so that this loop does not use a whole CPU core while the workers finish.
   If running_count > 0 : Delay( 1 ) : EndIf
Until running_count = 0

Delay( 20 ) ; Delay to be on the safe side.
EndProcedure


Procedure PauseWorkerThread( thread_index )
; Sets the pause command on the worker thread with the specified index.
; The worker pauses itself when it next reads its command, which happens after the job it is currently running has finished.

Protected *worker.THREAD_STRUCT = @WorkerThreadArray( thread_index )

*worker\command = #JOBCOM_PAUSE
EndProcedure


Procedure PauseAllWorkerThreads()
; Sets the pause command on every worker thread.

Protected slot

For slot = 0 To G_max_process_cpu
   PauseWorkerThread( slot )
Next
EndProcedure


Procedure ResumeWorkerThread( thread_index )
; Sets the run command on the worker thread with the specified index and then resumes it, so that it starts processing its job queue.

With WorkerThreadArray( thread_index )
   \command = #JOBCOM_RUN
   ResumeThread( \thread_handle )
EndWith
EndProcedure


Procedure ResumeAllWorkerThreads()
; Sets the run command on every worker thread and resumes it, in index order.

Protected slot

For slot = 0 To G_max_process_cpu
   ResumeWorkerThread( slot )
Next
EndProcedure


Procedure AddJobToAnyWorkerThread( *job_function, *job_structure )
; Queues a job on whichever worker thread currently has the fewest jobs waiting. If several workers tie, the one with the lowest index is used.
; *job_function - Address of the job function. It must match the 'ProtoCallFunc' prototype.
; *job_structure - Pointer that is passed to the job function when the job is run.

Protected candidate, queue_length
Protected shortest_length = #MAX_POSITIVE
Protected chosen_index = 0
Protected *worker.THREAD_STRUCT

; Look for the shortest job queue.
For candidate = 0 To G_max_process_cpu
   queue_length = ListSize( WorkerThreadArray( candidate )\JobList() )
   If queue_length < shortest_length
      shortest_length = queue_length
      chosen_index = candidate
   EndIf
Next

; Hold the chosen worker paused while the new job is added to its queue.
; NOTE(review): the worker is suspended at whatever point it has reached, and no mutex protects the list - confirm that this is safe.
*worker = @WorkerThreadArray( chosen_index )
PauseThread( *worker\thread_handle )
AddElement( *worker\JobList() )
*worker\JobList()\DoJob = *job_function
*worker\JobList()\job_struct = *job_structure
ResumeThread( *worker\thread_handle )
EndProcedure


Procedure AddJobToWorkerThread( thread_index, *job_function, *job_structure )
; Queues a job on the worker thread with the specified index.
; thread_index - Index of the worker in 'WorkerThreadArray'.
; *job_function - Address of the job function. It must match the 'ProtoCallFunc' prototype.
; *job_structure - Pointer that is passed to the job function when the job is run.

Protected *worker.THREAD_STRUCT = @WorkerThreadArray( thread_index )

; Hold the worker paused while the new job is added to its queue.
; NOTE(review): the worker is suspended at whatever point it has reached, and no mutex protects the list - confirm that this is safe.
PauseThread( *worker\thread_handle )
AddElement( *worker\JobList() )
*worker\JobList()\DoJob = *job_function
*worker\JobList()\job_struct = *job_structure
ResumeThread( *worker\thread_handle )
EndProcedure


Procedure AddJobListToWorkerThread( thread_index, List NewJobList.THREAD_JOB_STRUCT(), clear_list = 1 )
; Copies every job in the specified list into the queue of the worker thread with the specified index.
; thread_index - Index of the worker in 'WorkerThreadArray'.
; NewJobList() - The jobs to add. The list must use the 'THREAD_JOB_STRUCT' structure.
; clear_list - 0 = The list of jobs to be added is left as it is after it has been copied. 1 = The list is cleared (the default).

Protected *worker.THREAD_STRUCT = @WorkerThreadArray( thread_index )

; Hold the worker paused while the jobs are copied into its queue.
; NOTE(review): the worker is suspended at whatever point it has reached, and no mutex protects the list - confirm that this is safe.
PauseThread( *worker\thread_handle )

ForEach NewJobList()
   AddElement( *worker\JobList() )
   *worker\JobList()\DoJob = NewJobList()\DoJob
   *worker\JobList()\job_struct = NewJobList()\job_struct
Next

; Empty the caller's list after use, if specified.
If clear_list
   ClearList( NewJobList() )
EndIf

ResumeThread( *worker\thread_handle )
EndProcedure


; == TEST ==

; *** Set the compiler options to 'thread-safe' or this test code won't work correctly. ***



#CANVAS = 0 ; Gadget number of the canvas that the test jobs draw on.


; The data of one test job: an inclusive rectangle of pixels to plot.
Structure TEST_STRUCT
start_x.i ; First x coordinate to plot.
end_x.i ; Last x coordinate to plot.
start_y.i ; First y coordinate to plot.
end_y.i ; Last y coordinate to plot.
EndStructure

; Holds the data of every test job. The queued jobs store pointers to elements of this list.
Global NewList TestList.TEST_STRUCT()

Procedure.i TestJob( *job_struct.TEST_STRUCT )
; Test job: plots every pixel of the rectangle described by '*job_struct' in black on the '#CANVAS' gadget.
; *job_struct - The inclusive x and y ranges to plot.
; No value is explicitly returned, and the worker thread ignores the result.

Protected x, y

; StartDrawing() returns zero if drawing is not possible. Skip the job in that case rather than plotting without a valid output.
If StartDrawing( CanvasOutput( #CANVAS ) )
   For x = *job_struct\start_x To *job_struct\end_x
      For y = *job_struct\start_y To *job_struct\end_y
         Plot( x, y, #Black )
         ;Delay( 20 )
      Next
   Next
   StopDrawing()
EndIf
EndProcedure


; Counter that the event loop decrements each time a '#EVENT_THREAD_NOJOBS' event arrives. It starts at the number of process CPUs.
Global G_num_worker_threads_with_jobs = G_num_process_cpu

; Create the worker threads. They start out paused.
SetupWorkerThreads()

; Open the test window and fill it with the canvas that the jobs draw on. End the program if the window cannot be opened.
If OpenWindow( 0, 0, 0, 601, 601, "Test", #PB_Window_SystemMenu | #PB_Window_ScreenCentered ) = 0 : End : EndIf
CanvasGadget( #CANVAS, 0, 0, 601, 601 )

; *** Three different ways to add jobs to the joblist of each thread. Comment out two of them to test the one left uncommented.

; ; Add jobs using 'AddJobToAnyWorkerThread()'.
; For x = 0 To 600 Step 10
;    AddElement( TestList() )
;    TestList()\start_x = x
;    TestList()\end_x = x
;    TestList()\start_y = 0
;    TestList()\end_y = 600
;    AddJobToAnyWorkerThread( @TestJob(), @TestList() )
; Next
; For y = 0 To 600 Step 10
;    AddElement( TestList() )
;    TestList()\start_x = 0
;    TestList()\end_x = 600
;    TestList()\start_y = y
;    TestList()\end_y = y
;    AddJobToAnyWorkerThread( @TestJob(), @TestList() )
; Next

; ; Add jobs using 'AddJobToWorkerThread()'.
; thread_index = 0
; For x = 0 To 600 Step 10
;    AddElement( TestList() )
;    TestList()\start_x = x
;    TestList()\end_x = x
;    TestList()\start_y = 0
;    TestList()\end_y = 600
;    AddJobToWorkerThread( thread_index, @TestJob(), @TestList() )
;    thread_index + 1 : If thread_index > G_max_process_cpu : thread_index = 0 : EndIf
; Next
; For y = 0 To 600 Step 10
;    AddElement( TestList() )
;    TestList()\start_x = 0
;    TestList()\end_x = 600
;    TestList()\start_y = y
;    TestList()\end_y = y
;    AddJobToWorkerThread( thread_index, @TestJob(), @TestList() )
;    thread_index + 1 : If thread_index > G_max_process_cpu : thread_index = 0 : EndIf
; Next

; Add jobs using 'AddJobListToWorkerThread()'.
Structure TEST_JOBLIST_STRUCT
   List JobList.THREAD_JOB_STRUCT() ; The jobs collected for one worker thread, handed over in a single call.
EndStructure
Global Dim TestJobArray.TEST_JOBLIST_STRUCT( G_max_process_cpu )

target_thread = 0

; One job per vertical line, every 10 pixels, dealt out to the workers in turn.
For column = 0 To 600 Step 10
   AddElement( TestList() )
   With TestList()
      \start_x = column
      \end_x = column
      \start_y = 0
      \end_y = 600
   EndWith
   
   AddElement( TestJobArray( target_thread )\JobList() )
   TestJobArray( target_thread )\JobList()\DoJob = @TestJob()
   TestJobArray( target_thread )\JobList()\job_struct = @TestList()
   
   ; Move on to the next worker, wrapping around after the last one.
   target_thread + 1
   If target_thread > G_max_process_cpu
      target_thread = 0
   EndIf
Next

; One job per horizontal line, every 10 pixels, carrying on from the worker that the previous loop stopped at.
For row = 0 To 600 Step 10
   AddElement( TestList() )
   With TestList()
      \start_x = 0
      \end_x = 600
      \start_y = row
      \end_y = row
   EndWith
   
   AddElement( TestJobArray( target_thread )\JobList() )
   TestJobArray( target_thread )\JobList()\DoJob = @TestJob()
   TestJobArray( target_thread )\JobList()\job_struct = @TestList()
   
   ; Move on to the next worker, wrapping around after the last one.
   target_thread + 1
   If target_thread > G_max_process_cpu
      target_thread = 0
   EndIf
Next

; Hand each collected job list to its worker. The collected lists are cleared by the default 'clear_list' setting.
For target_thread = 0 To G_max_process_cpu
   AddJobListToWorkerThread( target_thread, TestJobArray( target_thread )\JobList() )
Next

; ***

; Start every worker processing its job queue.
ResumeAllWorkerThreads()

; Main event loop: runs until the window is closed.
Repeat
   window_event = WaitWindowEvent()
   
   If window_event = #PB_Event_CloseWindow
      ; Stop the workers cleanly before the program ends.
      EndWorkerThreads()
      End
      
   ElseIf window_event = #EVENT_THREAD_NOJOBS
      ; A worker has emptied its queue. Its array index arrives as the event data.
      G_num_worker_threads_with_jobs - 1
      Debug "Thread that finished all jobs: " + Str( EventData() )
      Debug "Num threads with jobs in que: " + Str( G_num_worker_threads_with_jobs )
      
   EndIf
   ; Gadget events and all other events are ignored.
ForEver



Top
 Profile  
Reply with quote  
Display posts from previous:  Sort by  
Post new topic Reply to topic  [ 1 post ] 

All times are UTC + 1 hour


Who is online

Users browsing this forum: No registered users and 25 guests


You cannot post new topics in this forum
You cannot reply to topics in this forum
You cannot edit your posts in this forum
You cannot delete your posts in this forum

Search for:
Jump to:  

 


Powered by phpBB © 2008 phpBB Group
subSilver+ theme by Canver Software, sponsor Sanal Modifiye