Make sure that you compile the test code in thread-safe mode.
This library came about as a result of this thread: viewtopic.php?f=13&t=75872
Code: Select all
; Use thread-safe mode, if required.
; CPU counts as reported by CountCPUs(): all CPUs in the system, and the CPUs available to this process.
Global G_num_system_cpu = CountCPUs( #PB_System_CPUs )
Global G_num_process_cpu = CountCPUs( #PB_System_ProcessCPUs )
;Debug "System CPUs: " + G_num_system_cpu
;Debug "Process CPUs: " + G_num_process_cpu
; Highest worker thread index. One worker is created per process CPU, indexed 0 .. G_max_process_cpu.
Global G_max_process_cpu = G_num_process_cpu - 1
; The highest positive long integer. Used as the initial "smallest queue" value when searching for the least loaded worker.
#MAX_POSITIVE = 2147483647 ; The highest positive long integer.
; Signature of a job function: it receives a pointer to its job data and returns an integer.
; The worker thread ignores the returned value.
Prototype.i ProtoCallFunc( *struct )
; One queued job: the function to call and the data pointer that is passed to it.
Structure THREAD_JOB_STRUCT
DoJob.ProtoCallFunc
*job_struct
EndStructure
;Global NewList JobList.THREAD_JOB_STRUCT() ; A list used to add an entire joblist to a worker thread.
; Per-worker state. One instance per worker thread lives in 'WorkerThreadArray()'.
Structure THREAD_STRUCT
; Index of this worker in 'WorkerThreadArray()'. Posted as the event data of '#EVENT_THREAD_NOJOBS'.
id.i
; Value returned by CreateThread() for this worker. Used with PauseThread() / ResumeThread().
thread_handle.i
; Mutex created by CreateMutex() in 'SetupWorkerThreads()'. It is locked, unlocked and freed by the worker thread itself.
mutex.i
; One of the '#JOBCOM_...' constants. Written by the controlling code, read by the worker thread.
command.b
; One of the '#JOBSTATUS_...' constants. Written by the worker thread.
status.b
; The job queue of this worker.
List JobList.THREAD_JOB_STRUCT()
EndStructure
; One entry per process CPU (indexes 0 .. G_max_process_cpu).
Global Dim WorkerThreadArray.THREAD_STRUCT( G_max_process_cpu )
; Custom events.
Enumeration #PB_Event_FirstCustomValue
#EVENT_THREAD_NOJOBS ; Posted by a worker thread when it has run out of jobs or has flushed its queue. The event data holds the worker's 'id'.
EndEnumeration
; Worker thread command constants (stored in THREAD_STRUCT\command).
Enumeration
#JOBCOM_NULL ; No command. Since this is zero it will be the command set by default on thread startup. 'WorkerThread()' has no dedicated Case for it. The thread handle should not be used in this case, as it may not have been set yet.
#JOBCOM_PAUSE ; This command pauses the thread.
#JOBCOM_RUN ; This command allows the job queue to be processed.
#JOBCOM_FLUSH ; This command clears the job queue without running the remaining jobs. This may be problematic in some cases, so use with care. Once the queue has been flushed the thread is paused with a '#JOBSTATUS_NOJOBS' status.
#JOBCOM_END ; This command ends the thread.
EndEnumeration
; Worker thread status constants (stored in THREAD_STRUCT\status).
Enumeration
#JOBSTATUS_NULL ; No status has been set yet. Since this is zero it will be the status set by default on thread startup when the thread handle may not have been set yet.
#JOBSTATUS_RUNNING ; This status signifies that the thread is currently running and processing jobs.
#JOBSTATUS_PAUSED ; This status signifies that the thread is currently paused, but may have jobs in the queue. This is set before the thread is actually paused, so a delay might be needed between checking the paused state and proceeding.
#JOBSTATUS_ENDED ; This status signifies that the thread procedure is about to return. It is set after the thread has released and freed its mutex.
#JOBSTATUS_NOJOBS ; This status signifies that the thread has no jobs in the queue. The thread will be paused in this case, so treat it the same as '#JOBSTATUS_PAUSED'.
EndEnumeration
Procedure WorkerThread( *thread_data.THREAD_STRUCT )
  ; Worker thread entry point. Started by CreateThread() with a pointer to this worker's THREAD_STRUCT.
  ;
  ; The thread pauses itself immediately and then, each time it is resumed, acts on the value of
  ; '*thread_data\command' (one of the '#JOBCOM_...' constants) and reports what it is doing
  ; through '*thread_data\status' (one of the '#JOBSTATUS_...' constants).
  ;
  ; The mutex is locked for the whole lifetime of the thread and is unlocked and freed when the
  ; '#JOBCOM_END' command is received.

  LockMutex( *thread_data\mutex )

  ; The thread can start running before CreateThread() has returned and its result has been stored
  ; in 'thread_handle'. Wait for the handle so that the PauseThread() calls below never receive zero.
  While *thread_data\thread_handle = 0
    Delay( 1 )
  Wend

  *thread_data\status = #JOBSTATUS_NOJOBS
  PauseThread( *thread_data\thread_handle )

  Repeat

    Select *thread_data\command

      Case #JOBCOM_END
        UnlockMutex( *thread_data\mutex )
        FreeMutex( *thread_data\mutex )
        *thread_data\status = #JOBSTATUS_ENDED ; Set last: 'EndWorkerThreads()' polls for this value.
        ProcedureReturn

      Case #JOBCOM_FLUSH
        ClearList( *thread_data\JobList() )
        *thread_data\status = #JOBSTATUS_NOJOBS
        ; Signal via the event system that this thread has run out of jobs to do. Returns the thread's array index value via EventData().
        ; For programs that don't have an event queue via windows or networking, a custom event system will probably need to be used.
        PostEvent( #EVENT_THREAD_NOJOBS, 0, 0, 0, *thread_data\id )
        PauseThread( *thread_data\thread_handle )

      Case #JOBCOM_PAUSE ; Pause the thread. Use 'ResumeWorkerThread()' or 'ResumeAllWorkerThreads()' to start it again.
        *thread_data\status = #JOBSTATUS_PAUSED
        PauseThread( *thread_data\thread_handle )

      Case #JOBCOM_RUN ; Run the thread. After the thread has run out of jobs and paused, use 'ResumeThread()' to start it up again after adding more jobs and setting this command.
        ended = 1
        *thread_data\status = #JOBSTATUS_RUNNING
        ForEach *thread_data\JobList()
          *thread_data\JobList()\DoJob( *thread_data\JobList()\job_struct )
          DeleteElement( *thread_data\JobList() ) ; Delete this completed job from the job list.
          ; Break out of the loop to process a new command. The 'ended' flag stops the new command from being ignored if the end of the job list is reached first.
          If *thread_data\command <> #JOBCOM_RUN
            ended = 0
            Break
          EndIf
        Next
        If ended
          If ListSize( *thread_data\JobList() ) = 0
            *thread_data\status = #JOBSTATUS_NOJOBS
            ; Signal via the event system that this thread has run out of jobs to do. Returns the thread's array index value via EventData().
            ; For programs that don't have an event queue via windows or networking, a custom event system will probably need to be used.
            PostEvent( #EVENT_THREAD_NOJOBS, 0, 0, 0, *thread_data\id )
            PauseThread( *thread_data\thread_handle )
          EndIf
        EndIf

      Default
        ; '#JOBCOM_NULL' (or any unknown value): there is nothing to do. Yield the CPU instead of
        ; spinning through the loop at full speed until a real command arrives.
        Delay( 1 )

    EndSelect

  ForEver

EndProcedure
; == INTERFACE FUNCTION ==
Procedure SetupWorkerThreads()
  ; Creates one worker thread for every entry of 'WorkerThreadArray()'.
  ; Each thread pauses itself on startup with a '#JOBSTATUS_NOJOBS' status. Queue some jobs for it
  ; and then call one of the resume functions to start it running.
  Protected slot.i
  Protected *worker.THREAD_STRUCT

  For slot = 0 To G_max_process_cpu
    *worker = @WorkerThreadArray( slot )
    *worker\id = slot
    *worker\mutex = CreateMutex()
    ; The thread receives the address of its own array entry as its parameter.
    *worker\thread_handle = CreateThread( @WorkerThread(), *worker )
  Next

  ; Delay to give the threads time to pause.
  Delay( 20 )
EndProcedure
Procedure EndWorkerThreads()
  ; Ends all the worker threads and returns once every one of them reports '#JOBSTATUS_ENDED'.
  ; This is normally called before shutting down the program, though it may not be needed.
  ; A worker only sees the command between jobs, so this waits for any job in progress to finish.

  ; Send the end command to all the worker threads.
  For thread_index = 0 To G_max_process_cpu
    WorkerThreadArray( thread_index )\command = #JOBCOM_END
    ResumeThread( WorkerThreadArray( thread_index )\thread_handle ) ; Resume any paused threads so that they can see the command.
  Next

  Delay( 20 ) ; Delay to be on the safe side.

  ; Check that all the worker threads have ended safely.
  Repeat
    running_count = G_num_process_cpu
    For thread_index = 0 To G_max_process_cpu
      If WorkerThreadArray( thread_index )\status = #JOBSTATUS_ENDED
        running_count - 1
      EndIf
    Next
    ; Sleep between polls so that this loop does not take CPU time away from the workers it is waiting for.
    If running_count > 0
      Delay( 1 )
    EndIf
  Until running_count = 0

  Delay( 20 ) ; Delay to be on the safe side.
EndProcedure
Procedure PauseWorkerThread( thread_index )
  ; Asks the worker with the given index to pause by setting its command to '#JOBCOM_PAUSE'.
  ; Only the command is written here; the worker pauses itself when it next reads the command.
  Protected *worker.THREAD_STRUCT = @WorkerThreadArray( thread_index )
  *worker\command = #JOBCOM_PAUSE
EndProcedure
Procedure PauseAllWorkerThreads()
  ; Asks every worker thread to pause by setting its command to '#JOBCOM_PAUSE'.
  Protected slot.i
  For slot = 0 To G_max_process_cpu
    PauseWorkerThread( slot )
  Next
EndProcedure
Procedure ResumeWorkerThread( thread_index )
  ; Sets the command of the worker with the given index to '#JOBCOM_RUN' and resumes its thread.
  With WorkerThreadArray( thread_index )
    \command = #JOBCOM_RUN
    ResumeThread( \thread_handle )
  EndWith
EndProcedure
Procedure ResumeAllWorkerThreads()
  ; Sets the command of every worker to '#JOBCOM_RUN' and resumes its thread, in index order.
  Protected slot.i
  For slot = 0 To G_max_process_cpu
    ResumeWorkerThread( slot )
  Next
EndProcedure
Procedure AddJobToAnyWorkerThread( *job_function, *job_structure )
  ; Adds the specified job function and data structure as a job for the worker thread with the fewest jobs in its queue.
  ; *job_function  - Address of a procedure matching the 'ProtoCallFunc' prototype.
  ; *job_structure - Pointer that is passed to the job function when the job is run.
  ; When several workers have the same queue size, the one with the lowest index is chosen.

  ; Find the worker thread with the least jobs in its queue.
  smallest_que = #MAX_POSITIVE
  smallest_que_thread_index = 0
  For thread_index = 0 To G_max_process_cpu
    size = ListSize( WorkerThreadArray( thread_index )\JobList() )
    If size < smallest_que
      smallest_que = size
      smallest_que_thread_index = thread_index
    EndIf
  Next

  ; Temporarily pause the thread and add the job to the thread's queue.
  *thread_data.THREAD_STRUCT = @WorkerThreadArray( smallest_que_thread_index )
  PauseThread( *thread_data\thread_handle )

  ; AddElement() inserts after the list's current element and makes the new element current.
  ; The worker's ForEach / DeleteElement() loop relies on that same current element, so remember
  ; the position, append the job at the end of the queue and then restore the position.
  PushListPosition( *thread_data\JobList() )
  LastElement( *thread_data\JobList() )
  AddElement( *thread_data\JobList() )
  *thread_data\JobList()\DoJob = *job_function
  *thread_data\JobList()\job_struct = *job_structure
  PopListPosition( *thread_data\JobList() )

  ResumeThread( *thread_data\thread_handle )
EndProcedure
Procedure AddJobToWorkerThread( thread_index, *job_function, *job_structure )
  ; Adds the specified job function and data structure as a job for the worker thread with the specified index.
  ; thread_index   - Index into 'WorkerThreadArray()' (0 .. G_max_process_cpu). It is not range checked.
  ; *job_function  - Address of a procedure matching the 'ProtoCallFunc' prototype.
  ; *job_structure - Pointer that is passed to the job function when the job is run.

  ; Temporarily pause the thread and add the job to the thread's queue.
  *thread_data.THREAD_STRUCT = @WorkerThreadArray( thread_index )
  PauseThread( *thread_data\thread_handle )

  ; AddElement() inserts after the list's current element and makes the new element current.
  ; The worker's ForEach / DeleteElement() loop relies on that same current element, so remember
  ; the position, append the job at the end of the queue and then restore the position.
  PushListPosition( *thread_data\JobList() )
  LastElement( *thread_data\JobList() )
  AddElement( *thread_data\JobList() )
  *thread_data\JobList()\DoJob = *job_function
  *thread_data\JobList()\job_struct = *job_structure
  PopListPosition( *thread_data\JobList() )

  ResumeThread( *thread_data\thread_handle )
EndProcedure
Procedure AddJobListToWorkerThread( thread_index, List NewJobList.THREAD_JOB_STRUCT(), clear_list = 1 )
  ; Adds the specified list of job functions and data structures as jobs for the worker thread with the specified index.
  ; thread_index - Index into 'WorkerThreadArray()' (0 .. G_max_process_cpu). It is not range checked.
  ; NewJobList() - The jobs to add, in order. The list must use the 'THREAD_JOB_STRUCT' structure.
  ; clear_list   - 0 = The list of jobs to be added is not cleared after it is added. 1 = The list is cleared.

  ; Temporarily pause the thread and add the jobs to the thread's queue.
  *thread_data.THREAD_STRUCT = @WorkerThreadArray( thread_index )
  PauseThread( *thread_data\thread_handle )

  ; AddElement() inserts after the list's current element and makes the new element current.
  ; The worker's ForEach / DeleteElement() loop relies on that same current element, so remember
  ; the position, append the jobs at the end of the queue and then restore the position.
  PushListPosition( *thread_data\JobList() )
  LastElement( *thread_data\JobList() )
  ForEach NewJobList()
    AddElement( *thread_data\JobList() ) ; Each new job lands after the one added before it.
    *thread_data\JobList()\DoJob = NewJobList()\DoJob
    *thread_data\JobList()\job_struct = NewJobList()\job_struct
  Next
  PopListPosition( *thread_data\JobList() )

  ; Ensure that the list of jobs to be added is cleared after use, if specified.
  If clear_list
    ClearList( NewJobList() )
  EndIf

  ResumeThread( *thread_data\thread_handle )
EndProcedure
; == TEST ==
; *** Set the compiler options to 'thread-safe' or this test code won't work correctly. ***
; Gadget number of the canvas that the test jobs draw on.
#CANVAS = 0
; Data for one test job: the inclusive x and y ranges of the pixels that 'TestJob()' plots.
Structure TEST_STRUCT
start_x.i
end_x.i
start_y.i
end_y.i
EndStructure
; Holds the job data. The address of each element is handed to a worker thread as the job's data pointer.
Global NewList TestList.TEST_STRUCT()
Procedure.i TestJob( *job_struct.TEST_STRUCT )
  ; Test job: plots every pixel of the rectangle start_x..end_x by start_y..end_y in black on the canvas.
  ; *job_struct - The rectangle to fill. Both ranges are inclusive.
  ; No value is returned explicitly; the worker thread ignores the result.

  ; StartDrawing() returns zero when drawing is not possible. Skip the job in that case instead of
  ; calling Plot() and StopDrawing() without a drawing context.
  If StartDrawing( CanvasOutput( #CANVAS ) )
    For x = *job_struct\start_x To *job_struct\end_x
      For y = *job_struct\start_y To *job_struct\end_y
        Plot( x, y, #Black )
        ;Delay( 20 )
      Next
    Next
    StopDrawing()
  EndIf
EndProcedure
; Counts down each time a worker posts '#EVENT_THREAD_NOJOBS'. Starts at the number of worker threads.
Global G_num_worker_threads_with_jobs = G_num_process_cpu

; Create the worker threads before the window is opened.
SetupWorkerThreads()

; Quit straight away if the test window cannot be opened.
If OpenWindow( 0, 0, 0, 601, 601, "Test", #PB_Window_SystemMenu | #PB_Window_ScreenCentered ) = 0
  End
EndIf

; The canvas fills the whole window; the test jobs draw on it.
CanvasGadget( #CANVAS, 0, 0, 601, 601 )
; *** Three different ways to add jobs to the joblist of each thread. Comment out two of them to test the one left uncommented.
; ; Add jobs using 'AddJobToAnyWorkerThread()'.
; For x = 0 To 600 Step 10
; AddElement( TestList() )
; TestList()\start_x = x
; TestList()\end_x = x
; TestList()\start_y = 0
; TestList()\end_y = 600
; AddJobToAnyWorkerThread( @TestJob(), @TestList() )
; Next
; For y = 0 To 600 Step 10
; AddElement( TestList() )
; TestList()\start_x = 0
; TestList()\end_x = 600
; TestList()\start_y = y
; TestList()\end_y = y
; AddJobToAnyWorkerThread( @TestJob(), @TestList() )
; Next
; ; Add jobs using 'AddJobToWorkerThread()'.
; thread_index = 0
; For x = 0 To 600 Step 10
; AddElement( TestList() )
; TestList()\start_x = x
; TestList()\end_x = x
; TestList()\start_y = 0
; TestList()\end_y = 600
; AddJobToWorkerThread( thread_index, @TestJob(), @TestList() )
; thread_index + 1 : If thread_index > G_max_process_cpu : thread_index = 0 : EndIf
; Next
; For y = 0 To 600 Step 10
; AddElement( TestList() )
; TestList()\start_x = 0
; TestList()\end_x = 600
; TestList()\start_y = y
; TestList()\end_y = y
; AddJobToWorkerThread( thread_index, @TestJob(), @TestList() )
; thread_index + 1 : If thread_index > G_max_process_cpu : thread_index = 0 : EndIf
; Next
; Add jobs using 'AddJobListToWorkerThread()'.
; One private job list is built per worker thread and then handed over in a single call each.
Structure TEST_JOBLIST_STRUCT
  List JobList.THREAD_JOB_STRUCT() ; A list used to add an entire joblist to a worker thread.
EndStructure

Global Dim TestJobArray.TEST_JOBLIST_STRUCT( G_max_process_cpu )

thread_index = 0

; Vertical lines: one job per column, every 10 pixels, dealt out to the workers in turn.
For x = 0 To 600 Step 10
  AddElement( TestList() )
  With TestList()
    \start_x = x
    \end_x = x
    \start_y = 0
    \end_y = 600
  EndWith
  AddElement( TestJobArray( thread_index )\JobList() )
  With TestJobArray( thread_index )\JobList()
    \DoJob = @TestJob()
    \job_struct = @TestList()
  EndWith
  ; Move on to the next worker, wrapping around after the last one.
  thread_index + 1
  If thread_index > G_max_process_cpu
    thread_index = 0
  EndIf
Next

; Horizontal lines: one job per row, every 10 pixels, continuing with the next worker in turn.
For y = 0 To 600 Step 10
  AddElement( TestList() )
  With TestList()
    \start_x = 0
    \end_x = 600
    \start_y = y
    \end_y = y
  EndWith
  AddElement( TestJobArray( thread_index )\JobList() )
  With TestJobArray( thread_index )\JobList()
    \DoJob = @TestJob()
    \job_struct = @TestList()
  EndWith
  ; Move on to the next worker, wrapping around after the last one.
  thread_index + 1
  If thread_index > G_max_process_cpu
    thread_index = 0
  EndIf
Next

; Hand each prepared job list to its worker thread. The source lists are cleared by default.
For thread_index = 0 To G_max_process_cpu
  AddJobListToWorkerThread( thread_index, TestJobArray( thread_index )\JobList() )
Next
; ***
; Start all the workers on their queues.
ResumeAllWorkerThreads()

; Main event loop. Runs until the window is closed.
Repeat
  event = WaitWindowEvent()

  If event = #PB_Event_CloseWindow
    ; Stop the workers before the program ends.
    EndWorkerThreads()
    End

  ElseIf event = #EVENT_THREAD_NOJOBS
    ; A worker has emptied its queue. EventData() holds the index of that worker.
    G_num_worker_threads_with_jobs - 1
    Debug "Thread that finished all jobs: " + Str( EventData() )
    Debug "Num threads with jobs in que: " + Str( G_num_worker_threads_with_jobs )

  EndIf
  ; All other events, including #PB_Event_Gadget, need no handling in this test.
ForEver