Thread pooling library

Share your advanced PureBasic knowledge/code with the community.
Axeman
User
User
Posts: 89
Joined: Mon Nov 03, 2003 5:34 am

Thread pooling library

Post by Axeman »

This is a small generic thread-pooling library that I made as an experiment. See the bottom of the code for example usage.

Make sure that you compile the test code in thread-safe mode.

This library came about as a result of this thread: viewtopic.php?f=13&t=75872

Code: Select all


; Use thread-safe mode, if required.


Global G_num_system_cpu = CountCPUs( #PB_System_CPUs )
Global G_num_process_cpu = CountCPUs( #PB_System_ProcessCPUs )

;Debug "System CPUs: " + G_num_system_cpu
;Debug "Process CPUs: " + G_num_process_cpu

Global G_max_process_cpu = G_num_process_cpu - 1

#MAX_POSITIVE = 2147483647 ; The highest positive long integer.

Prototype.i ProtoCallFunc( *struct ) 


Structure THREAD_JOB_STRUCT
DoJob.ProtoCallFunc
*job_struct
EndStructure

;Global NewList JobList.THREAD_JOB_STRUCT() ; A list used to add an entire joblist to a worker thread.


Structure THREAD_STRUCT
id.i
thread_handle.i
mutex.i
command.b
status.b
List JobList.THREAD_JOB_STRUCT()
EndStructure

Global Dim WorkerThreadArray.THREAD_STRUCT( G_max_process_cpu )

; Custom events.
Enumeration #PB_Event_FirstCustomValue
  #EVENT_THREAD_NOJOBS ; An event that can be posted when a worker thread has run out of jobs.
EndEnumeration



; Worker thread command constants.
Enumeration
#JOBCOM_NULL ; This command causes the thread to do nothing. Since this is zero it will be the command set by default on thread startup. The thread handle should not be used in this case, as it may not have been set yet.
#JOBCOM_PAUSE ; This command pauses the thread.
#JOBCOM_RUN ; This command allows the job que to be processed.
#JOBCOM_FLUSH ; This command flushes the job que. This may be problematic in some cases, so use with care. Once the que has been flushed it will be paused with a '#JOBSTATUS_NOJOBS' status.
#JOBCOM_END ; This command ends the thread.
EndEnumeration


; Worker thread status constants.
Enumeration
#JOBSTATUS_NULL ; This status indicates that the thread loop is running but has nothing to do. Since this is zero it will be the status set by default on thread startup when the thread handle may not have been set yet.
#JOBSTATUS_RUNNING ; This status signifies that the thread is currently running and processing jobs.
#JOBSTATUS_PAUSED ; This status signifies that the thread is currently paused, but may have jobs in the que. This is set before the thread is actually paused, so a delay might be needed between checking the paused state and proceeding.
#JOBSTATUS_ENDED ; This status signifies that the thread has ended and can be safely shut down.
#JOBSTATUS_NOJOBS ; This status signifies that the thread has no jobs in the que. The thread will be paused in this case, so treat it the same as '#JOBSTATUS_PAUSED'.
EndEnumeration





Procedure WorkerThread( *thread_data.THREAD_STRUCT )
LockMutex( *thread_data\mutex )
*thread_data\status = #JOBSTATUS_NOJOBS
PauseThread( *thread_data\thread_handle )
Repeat
	Select *thread_data\command
		Case #JOBCOM_END
			UnlockMutex( *thread_data\mutex )
			FreeMutex( *thread_data\mutex )
			*thread_data\status = #JOBSTATUS_ENDED
			ProcedureReturn
			
		Case #JOBCOM_FLUSH
			ClearList( *thread_data\JobList() )
			*thread_data\status = #JOBSTATUS_NOJOBS
			
			; Signal via the event system that this thread has run out of jobs to do. Returns the thread's array index value via EventData().
			; For programs that don't have an event que via windows or networking, a custom event system will probably need to be used.
			PostEvent( #EVENT_THREAD_NOJOBS, 0, 0, 0, *thread_data\id )
			
			PauseThread( *thread_data\thread_handle )
			
		Case #JOBCOM_PAUSE ; Pause the thread. Use 'ResumeWorkerThread()' or 'ResumeAllWorkerThreads()' to start it again.
			*thread_data\status = #JOBSTATUS_PAUSED
			PauseThread( *thread_data\thread_handle )
			
		Case #JOBCOM_RUN ; Run the thread. After the thread has run out of jobs and paused, use 'ResumeThread()' to start it up again after adding more jobs and setting this command.
			ended = 1
			*thread_data\status = #JOBSTATUS_RUNNING
			ForEach *thread_data\JobList()
				*thread_data\JobList()\DoJob( *thread_data\JobList()\job_struct )
				DeleteElement( *thread_data\JobList() ) ; Delete this completed job from the job list.
				If *thread_data\command <> #JOBCOM_RUN : ended = 0 : Break : EndIf ; Break out of the loop to process a new command. The 'ended' flag stops the new command from being ignored if the end of the job list is reached first.
			Next
			If ended
				If ListSize( *thread_data\JobList() ) = 0
					*thread_data\status = #JOBSTATUS_NOJOBS
					
					; Signal via the event system that this thread has run out of jobs to do. Returns the thread's array index value via EventData().
					; For programs that don't have an event que via windows or networking, a custom event system will probably need to be used.
					PostEvent( #EVENT_THREAD_NOJOBS, 0, 0, 0, *thread_data\id )
					
					PauseThread( *thread_data\thread_handle )
				EndIf
			EndIf
	EndSelect
ForEver
EndProcedure


; == INTERFACE FUNCTION ==


Procedure SetupWorkerThreads()
; Creates all the worker threads.
; Initially each thread will be paused with a '#JOBSTATUS_NOJOBS' status. Give it a que of jobs to do and then use the resume functions to start it running.

For thread_index = 0 To G_max_process_cpu
	WorkerThreadArray( thread_index )\id = thread_index
	WorkerThreadArray( thread_index )\mutex = CreateMutex()
	WorkerThreadArray( thread_index )\thread_handle = CreateThread( @WorkerThread(), @WorkerThreadArray( thread_index ) )
Next
Delay( 20 ) ; Delay to give the threads time to pause.
EndProcedure


Procedure EndWorkerThreads()
; Ends all the worker threads and returns once they have stopped running. This is normally called before shutting down the program, though it may not be needed.

; End all the worker threads.
For thread_index = 0 To G_max_process_cpu
	WorkerThreadArray( thread_index )\command = #JOBCOM_END
	ResumeThread( WorkerThreadArray( thread_index )\thread_handle ) ; Resume any paused threads.
Next

Delay( 20 ) ; Delay to be on the safe side.

; Check that all the worker threads have ended safely.
Repeat
	running_count = G_num_process_cpu
	For thread_index = 0 To G_max_process_cpu
		If WorkerThreadArray( thread_index )\status = #JOBSTATUS_ENDED : running_count - 1 : EndIf
	Next
Until running_count = 0

Delay( 20 ) ; Delay to be on the safe side.
EndProcedure


Procedure PauseWorkerThread( thread_index )
WorkerThreadArray( thread_index )\command = #JOBCOM_PAUSE
EndProcedure


Procedure PauseAllWorkerThreads()
For thread_index = 0 To G_max_process_cpu
	WorkerThreadArray( thread_index )\command = #JOBCOM_PAUSE
Next
EndProcedure


Procedure ResumeWorkerThread( thread_index )
WorkerThreadArray( thread_index )\command = #JOBCOM_RUN
ResumeThread( WorkerThreadArray( thread_index )\thread_handle )
EndProcedure


Procedure ResumeAllWorkerThreads()
For thread_index = 0 To G_max_process_cpu
	WorkerThreadArray( thread_index )\command = #JOBCOM_RUN
	ResumeThread( WorkerThreadArray( thread_index )\thread_handle )
Next
EndProcedure


Procedure AddJobToAnyWorkerThread( *job_function, *job_structure )
; Adds the specified job function and data structure as a job for the worker thread with the fewest jobs in its que.

; Find the worker thread with the least jobs in its que.
smallest_que = #MAX_POSITIVE
smallest_que_thread_index = 0
For thread_index = 0 To G_max_process_cpu
	size = ListSize( WorkerThreadArray( thread_index )\JobList() )
	If size < smallest_que : smallest_que = size : smallest_que_thread_index = thread_index : EndIf
Next
; Temporarily pause the thread and add the job to the thread's que.
*thread_data.THREAD_STRUCT = @WorkerThreadArray( smallest_que_thread_index )
PauseThread( *thread_data\thread_handle )
AddElement( *thread_data\JobList() )
*thread_data\JobList()\DoJob = *job_function
*thread_data\JobList()\job_struct = *job_structure
ResumeThread( *thread_data\thread_handle )
EndProcedure


Procedure AddJobToWorkerThread( thread_index, *job_function, *job_structure )
; Adds the specified job function and data structure as a job for the worker thread with the specified index.

; Temporarily pause the thread and add the job to the thread's que.
*thread_data.THREAD_STRUCT = @WorkerThreadArray( thread_index )
PauseThread( *thread_data\thread_handle )
AddElement( *thread_data\JobList() )
*thread_data\JobList()\DoJob = *job_function
*thread_data\JobList()\job_struct = *job_structure
ResumeThread( *thread_data\thread_handle )
EndProcedure


Procedure AddJobListToWorkerThread( thread_index, List NewJobList.THREAD_JOB_STRUCT(), clear_list = 1 )
; Adds the specified list of job functions and data structures as jobs for the worker thread with the specified index. The list must use the 'THREAD_JOB_STRUCT' structure.
; clear_list - 0 = The the list of jobs to be added is not cleared after it is added. 1 = The list is cleared.

; Temporarily pause the thread and add the jobs to the thread's que.
*thread_data.THREAD_STRUCT = @WorkerThreadArray( thread_index )
PauseThread( *thread_data\thread_handle )
ForEach NewJobList()
	AddElement( *thread_data\JobList() )
	*thread_data\JobList()\DoJob = NewJobList()\DoJob
	*thread_data\JobList()\job_struct = NewJobList()\job_struct
Next
If clear_list : ClearList( NewJobList() ) : EndIf ; Ensure that the list of jobs to be added is cleared after use, if specified.
ResumeThread( *thread_data\thread_handle )
EndProcedure


; == TEST ==

; *** Set the compiler options to 'thread-safe' or this test code won't work correctly. ***



#CANVAS = 0


Structure TEST_STRUCT
start_x.i
end_x.i
start_y.i
end_y.i
EndStructure

Global NewList TestList.TEST_STRUCT()

Procedure.i TestJob( *job_struct.TEST_STRUCT )
StartDrawing( CanvasOutput( #CANVAS ) )
For x = *job_struct\start_x To *job_struct\end_x
	For y = *job_struct\start_y To *job_struct\end_y
		Plot( x, y, #Black )
		;Delay( 20 )
	Next
Next
StopDrawing()
EndProcedure


Global G_num_worker_threads_with_jobs = G_num_process_cpu

SetupWorkerThreads()

If OpenWindow( 0, 0, 0, 601, 601, "Test", #PB_Window_SystemMenu | #PB_Window_ScreenCentered ) = 0 : End : EndIf
CanvasGadget( #CANVAS, 0, 0, 601, 601 )

; *** Three different ways to add jobs to the joblist of each thread. Comment out two of them to test the one left uncommented.

; ; Add jobs using 'AddJobToAnyWorkerThread()'.
; For x = 0 To 600 Step 10
; 	AddElement( TestList() )
; 	TestList()\start_x = x
; 	TestList()\end_x = x
; 	TestList()\start_y = 0
; 	TestList()\end_y = 600
; 	AddJobToAnyWorkerThread( @TestJob(), @TestList() )
; Next
; For y = 0 To 600 Step 10
; 	AddElement( TestList() )
; 	TestList()\start_x = 0
; 	TestList()\end_x = 600
; 	TestList()\start_y = y
; 	TestList()\end_y = y
; 	AddJobToAnyWorkerThread( @TestJob(), @TestList() )
; Next

; ; Add jobs using 'AddJobToWorkerThread()'.
; thread_index = 0
; For x = 0 To 600 Step 10
; 	AddElement( TestList() )
; 	TestList()\start_x = x
; 	TestList()\end_x = x
; 	TestList()\start_y = 0
; 	TestList()\end_y = 600
; 	AddJobToWorkerThread( thread_index, @TestJob(), @TestList() )
; 	thread_index + 1 : If thread_index > G_max_process_cpu : thread_index = 0 : EndIf
; Next
; For y = 0 To 600 Step 10
; 	AddElement( TestList() )
; 	TestList()\start_x = 0
; 	TestList()\end_x = 600
; 	TestList()\start_y = y
; 	TestList()\end_y = y
; 	AddJobToWorkerThread( thread_index, @TestJob(), @TestList() )
; 	thread_index + 1 : If thread_index > G_max_process_cpu : thread_index = 0 : EndIf
; Next

; Add jobs using 'AddJobListToWorkerThread()'.
Structure TEST_JOBLIST_STRUCT
	List JobList.THREAD_JOB_STRUCT() ; A list used to add an entire joblist to a worker thread.
EndStructure
Global Dim TestJobArray.TEST_JOBLIST_STRUCT( G_max_process_cpu )
thread_index = 0
For x = 0 To 600 Step 10
	AddElement( TestList() )
	TestList()\start_x = x
	TestList()\end_x = x
	TestList()\start_y = 0
	TestList()\end_y = 600
	AddElement( TestJobArray( thread_index )\JobList() )
	TestJobArray( thread_index )\JobList()\DoJob = @TestJob()
	TestJobArray( thread_index )\JobList()\job_struct = @TestList()
	thread_index + 1 : If thread_index > G_max_process_cpu : thread_index = 0 : EndIf
Next
For y = 0 To 600 Step 10
	AddElement( TestList() )
	TestList()\start_x = 0
	TestList()\end_x = 600
	TestList()\start_y = y
	TestList()\end_y = y
	AddElement( TestJobArray( thread_index )\JobList() )
	TestJobArray( thread_index )\JobList()\DoJob = @TestJob()
	TestJobArray( thread_index )\JobList()\job_struct = @TestList()
	thread_index + 1 : If thread_index > G_max_process_cpu : thread_index = 0 : EndIf
Next
For thread_index = 0 To G_max_process_cpu
	AddJobListToWorkerThread( thread_index, TestJobArray( thread_index )\JobList() )
Next

; ***

ResumeAllWorkerThreads()

Repeat
	Select WaitWindowEvent()
		
		Case #PB_Event_CloseWindow
			EndWorkerThreads()
			End
		
		Case #PB_Event_Gadget
		
		Case #EVENT_THREAD_NOJOBS
			G_num_worker_threads_with_jobs - 1
			Debug "Thread that finished all jobs: " + Str( EventData() )
			Debug "Num threads with jobs in que: " + Str( G_num_worker_threads_with_jobs )
		
	EndSelect
ForEver