ParallelFor - Use your cores

Share your advanced PureBasic knowledge/code with the community.
remi_meier
Enthusiast
Posts: 468
Joined: Sat Dec 20, 2003 6:19 pm
Location: Switzerland

ParallelFor - Use your cores

Post by remi_meier »

It's something I wanted to do a long time ago, but forgot about it :lol:
Anyway, this example is not really finished and I will surely advance it further,
but you will get the idea :wink:

So, I wanted an easy method of using all my CPU cores to do complex calculations.
I didn't really test it yet, but if I didn't make a big mistake (possible,
because I'm in a hurry right now :lol: ), it really is twice as fast now.

Ok, let's see:
First example, the standard way: two For loops to calculate an array of values.
I run it for 2 seconds and count how many passes it completes:

Code: Select all

DisableDebugger
;- EXAMPLE


maxX = 1400
maxY = 1050
Global Dim a.d(maxX, maxY)

Delay(100)
time = ElapsedMilliseconds()
counter = 0

Repeat
  
  For y = 0 To maxY
    For x = 0 To maxX
      c.l = (Int(y) ! Int(x)) & $FF
      a(x, y) = Pow(c, x)
    Next
  Next
  
  counter + 1
Until ElapsedMilliseconds() - time > 2000


MessageRequester("Fertig", "Durchläufe: "+Str(counter))
There I get 4 runs :roll:

Ok, now let's try the ParallelFor:

Code: Select all


DisableDebugger
EnableExplicit
CompilerIf #PB_Compiler_Thread = 0
  CompilerError "In order to use ParallelFor, you have to compile as thread safe."
CompilerEndIf

; a procedure to get the number of CPUs/Cores
; somebody could add the windows version ;)
Procedure.l GetCPUCount()
  CompilerIf #PB_Compiler_OS = #PB_OS_Linux
  Protected file.l, count.l, s.s, dir.l
  
  If FileSize("/proc/cpuinfo") <> -1
    file = ReadFile(#PB_Any, "/proc/cpuinfo")
    
    If IsFile(file)
      count = 0
      While Not Eof(file)
        s.s = ReadString(file)
        If Left(s, 9) = "processor"
          count + 1
        EndIf
      Wend
      
      CloseFile(file)
    EndIf
    
  Else
    dir = ExamineDirectory(#PB_Any, "/proc/acpi/processor", "")
    count = 0
    If IsDirectory(dir)
      While NextDirectoryEntry(dir)
        If Left(DirectoryEntryName(dir), 3) = "CPU"
          count + 1
        EndIf
      Wend
      FinishDirectory(dir)
    EndIf
  EndIf
  
  ProcedureReturn count
  CompilerElse
  ProcedureReturn 2
  CompilerEndIf
EndProcedure

; this is the prototype for the callback procedure. If the callback returns #True,
; the thread terminates early.
; i.l = the variable that will be incremented like in "For i = 80 to 110"
; *dataPtr = a value you can specify that will be given to each callback 
Prototype.l ParallelLoopFunc(i.l, *dataPtr)

; internal structures
Structure PARALLELLOOP
  i1.l                    ; starting number
  i2.l                    ; finishing number
  loop.ParallelLoopFunc   ; callback prototype
  *dataPtr                ; a value given to the callback
EndStructure
Structure PARALLELTHREAD  ; a structure for ParallelForLinear()
  id.l                ; ThreadID
  info.PARALLELLOOP   ; Thread-Data for running the callback
EndStructure

; This is the actual thread:
Procedure _ParallelFor_Thread(*info.PARALLELLOOP)
  Protected i.l
  
  ; in *info, all the information is stored, like
  ; start and stop value for 'i'
  ; or the data to be given to the callback
  With *info
    
    ; now run the callback with 'i' as the first
    ; parameter exactly (\i2 - \i1 + 1) times
    For i = \i1 To \i2
      ; call the Callback with 'i' and '*dataPtr'
      ; and check, if it returns #True.
      If \loop(i, \dataPtr)
        ; if returned #true, break and therefore
        ; terminate the thread
        Break
      EndIf
    Next
    
  EndWith
  
  ; I have finished my job -> terminate thread
EndProcedure

; ParallelFor() is a procedure that will create 2 threads:
; i1 = thread1, start value
; i2 = thread1, stop value
; j1 = thread2, start value
; j2 = thread2, stop value
; loopFunc = pointer to the callback function
; *dataPtr = a value given to each callback function
Procedure ParallelFor(i1.l, i2.l, j1.l, j2.l, loopFunc.ParallelLoopFunc, *dataPtr)
  Protected *th1, *th2
  Protected info1.PARALLELLOOP, info2.PARALLELLOOP
  
  ; these requirements have to be fulfilled
  If loopFunc And i1 <= i2 And j1 <= j2
    ; create the sub-job for thread 1:
    With info1
      \i1 = i1              ; start
      \i2 = i2              ; stop
      \loop = loopFunc
      \dataPtr = *dataPtr
    EndWith
    
    *th1 = CreateThread(@_ParallelFor_Thread(), @info1)
    
    
    If *th1
      ; if thread 1 has been created, create thread 2
      With info2
        \i1 = j1
        \i2 = j2
        \loop = loopFunc
        \dataPtr = *dataPtr
      EndWith
      
      *th2 = CreateThread(@_ParallelFor_Thread(), @info2)
      
      ; and now wait for both threads to terminate
      If *th2
        WaitThread(*th1)
        WaitThread(*th2)
      Else
        WaitThread(*th1)
      EndIf 
    EndIf
    
  EndIf
  
EndProcedure

; ParallelForLinear() is a procedure that will create 1 or more threads
; and split the job into equal sub-jobs/threads
; i1 = start value
; i2 = stop value    - be sure that i2 is significantly larger than i1
; loopFunc = pointer to the callback function
; *dataPtr = a value given to each callback function
; cores = here you can specify how many threads shall be created. You can
;         use GetCPUCount() here to use all the cores.
Procedure ParallelForLinear(i1.l, i2.l, loopFunc.ParallelLoopFunc, *dataPtr, cores.l = 2)
  Protected NewList threads.PARALLELTHREAD()
  Protected z.l, stepSize.l, j.l, j2.l
  
  If loopFunc And i1 <= i2 And cores >= 1
    ; stepSize is the range every thread has to process
    stepSize = (i2 - i1) / cores
    j = i1    ; first thread starts with i1
    
    ; for each core, create a job
    For z = 1 To cores
      ; the 'stop' for the thread will be start+stepSize:
      j2 = j + stepSize
      If j2 > i2
        j2 = i2
        z = cores  ; stop the For loop, nothing left for the other cores
      EndIf
      If j > i2
        ; just to be on the safe side. not tested that much
        Break
      EndIf
      
      ; create a new thread/sub-job
      AddElement(threads())
      With threads()\info
        \i1 = j
        \i2 = j2
        \loop = loopFunc
        \dataPtr = *dataPtr
      EndWith
      
      j + stepSize + 1   ; next thread will continue with the work
    Next
    
    ; now we start all the threads
    ; only now, because I don't want to find out what happens if
    ; the threads access the LinkedList while I'm still adding
    ; new elements
    ForEach threads()
      threads()\id = CreateThread(@_ParallelFor_Thread(), @threads()\info)
      
      If threads()\id = 0
        ; if something went wrong, break up
        Break
      EndIf
    Next
    
    ; wait for each thread to stop
    ForEach threads()
      WaitThread(threads()\id)
    Next
    
  EndIf
  
EndProcedure



;- HOWTO:
; In general:
; If you have a BIG For-Loop (BIG means either many, many iterations, a big
; loop body or, in general, looong execution times, so that the overhead
; produced by starting threads etc. doesn't matter), then you just have
; to follow these few steps:
; - Be sure that no iteration of the loop depends on another iteration,
;   i.e. you cannot use the result of a calculation that was done in an
;   earlier iteration.
; - Be sure that you are able to synchronize the loop body so that
;   threads can safely do their work -> thread safety!
; - Then just replace a for-loop like
;   For x = 50 To 50000
;   with
;   ParallelForLinear(50, 50000, @LoopBody(), @myData, GetCPUCount())
; - Encapsulate the loop body into a LoopBody() procedure that follows the
;   definition of the prototype ParallelLoopFunc (see the sketch below)
; and you should be done.
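; A minimal sketch of such a loop body (the names LoopBody and myData are
; only placeholders, they are not part of the code above):
;
;   Procedure.l LoopBody(i.l, *myData)
;     ; do the work for iteration 'i' here - must be thread safe!
;     ProcedureReturn #False   ; return #True to abort the whole thread early
;   EndProcedure
;
;   ParallelForLinear(50, 50000, @LoopBody(), @myData, GetCPUCount())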







;- EXAMPLE
DisableExplicit

maxX = 1400
maxY = 1050
Global Dim a.d(maxX, maxY)

Delay(100)
time = ElapsedMilliseconds()
counter = 0


Procedure InnerLoop(y.l, maxX.l)   ; one call calculates one row 'y'
  For x = 0 To maxX
    c.l = (Int(y) ! Int(x)) & $FF
    a(x, y) = Pow(c, x)
  Next
  ProcedureReturn #False           ; returning #True would abort the thread
EndProcedure

Repeat
  
  ParallelForLinear(0, maxY, @InnerLoop(), maxX, GetCPUCount())
;   For y = 0 To maxY
;     For x = 0 To maxX
;       c.l = (Int(y) ! Int(x)) & $FF
;       a(x, y) = Pow(c, x)
;     Next
;   Next
  
  counter + 1
Until ElapsedMilliseconds() - time > 2000



MessageRequester("Fertig", "Durchläufe: "+Str(counter))
Ok, cool, 8 runs :D


Yeah, it doesn't scale up to 4 or more cores, but let's wait'n'see :wink:


Edit: Added comments and an alpha version of the linear scaling version
Last edited by remi_meier on Fri Feb 01, 2008 9:08 pm, edited 1 time in total.
Athlon64 3700+, 1024MB Ram, Radeon X1600
Rescator
Addict
Posts: 1769
Joined: Sat Feb 19, 2005 5:05 pm
Location: Norway

Post by Rescator »

Nice idea.
But this depends entirely on what you are doing.

Code: Select all

  ParallelFor(0, Int(maxY / 2), Int(maxY / 2) + 1, maxY, @InnerLoop(), maxX)
is not the same as

Code: Select all

   For y = 0 To maxY
     For x = 0 To maxX
       c.l = (Int(y) ! Int(x)) & $FF
       a(x, y) = Pow(c, x)
     Next
   Next
The parallel one handles both halves at the same time; either thread could finish before the other.

The "normal" one is sequential so no race conditions can occur.
I may be missing something here though.

But a better way would be to have the "first thread" calculate the "top half" of a screen and the "second thread" the "second half" of the screen.
Then when both threads are done, display the new screen.
I haven't studied what you do exactly, so if you are making thread1 do each odd line and thread2 do each even line then you are doing it correctly (albeit differently).
If you are not doing that then the result will be a mess.

PS! The Windows API also lets you mess with CPU and thread affinity masks.
But for one-offs like this it may just be best to let the OS place the threads on whichever CPUs it wants, as one CPU may already be very busy with something else, so pinning one of the threads to it could make things even slower.
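For example, something like this (an untested sketch, Windows only; the procedure name is just illustrative) would pin the calling thread to one core:

Code: Select all

; Untested sketch, Windows only: pin the calling thread to one core.
; Call it at the top of a thread procedure; 'core' is zero-based.
Procedure PinCurrentThreadToCore(core.l)
  SetThreadAffinityMask_(GetCurrentThread_(), 1 << core)
EndProcedure
So thread 1 could call PinCurrentThreadToCore(0) and thread 2 PinCurrentThreadToCore(1), but as said, letting the OS decide is usually fine.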


EDIT:
PS! Cutting a screen in two should be faster than alternating lines.
The most used way to use multiple GPUs to render a screen is to split the screen into two or more parts (all SLI does this, I think?) rather than alternating lines.
Of course, alternating-line threads may be better suited to stuff like de-interlacing etc., though. It really depends on the intended use. Just be careful about race conditions.
Dreamland Fantasy
Enthusiast
Posts: 335
Joined: Fri Jun 11, 2004 9:35 pm
Location: Glasgow, UK

Post by Dreamland Fantasy »

As Rescator says you should be wary of "race" conditions whereby one thread can finish before another.

Some algorithms will re-evaluate what is still to be done when a thread has completed and redistribute the workload accordingly.
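
A very rough sketch of that idea (untested; all the names are just illustrative): instead of giving every thread a fixed range, each thread pulls the next index from a shared counter until the work is gone, so a thread that finishes early simply grabs another row:

Code: Select all

; Rough illustrative sketch only (compile thread safe): a shared counter as work queue
Global WorkMutex = CreateMutex()
Global NextY.l   = 0
Global LastY.l   = 1050

Procedure WorkerThread(dummy)
  Protected y.l
  Repeat
    LockMutex(WorkMutex)        ; protect the shared counter
    y = NextY
    NextY + 1
    UnlockMutex(WorkMutex)
    If y > LastY
      Break                     ; nothing left to do
    EndIf
    ; ... process row 'y' here ...
  ForEver
EndProcedure

Thread1 = CreateThread(@WorkerThread(), 0)
Thread2 = CreateThread(@WorkerThread(), 0)
WaitThread(Thread1)
WaitThread(Thread2)
This way the workload balances itself, at the price of one mutex lock per iteration.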

Kind regards,

Francis.
remi_meier
Enthusiast
Posts: 468
Joined: Sat Dec 20, 2003 6:19 pm
Location: Switzerland

Post by remi_meier »

Rescator wrote:Nice idea.
But this depends entirely on what you are doing.
Correct
Rescator wrote:

Code: Select all

  ParallelFor(0, Int(maxY / 2), Int(maxY / 2) + 1, maxY, @InnerLoop(), maxX)
is not the same as

Code: Select all

   For y = 0 To maxY
     For x = 0 To maxX
       c.l = (Int(y) ! Int(x)) & $FF
       a(x, y) = Pow(c, x)
     Next
   Next
The parallel one handles both halves at the same time; either thread could finish before the other.
Actually, in this case, it is the same. It's correct that they will not be executed
sequentially, but that's the idea of it. If you take a closer look, you'll see that
there are no race conditions here.
Rescator wrote:But a better way would be to have the "first thread" calculate the "top half" of a screen and the "second thread" the "second half" of the screen.
Then when both threads are done, display the new screen.
That's actually what it does. But it's up to you: you have your i1, i2, j1, j2,
so you can split up a for-loop as you like.
Rescator wrote:I haven't studied what you do exactly, so if you are making thread1 do each odd line and thread2 do each even line then you are doing it correctly (albeit differently).
That would be quite odd, wouldn't it? :wink: You should study the code.
Rescator wrote:PS! The Windows API also lets you mess with CPU and thread affinity masks.
But for one-offs like this it may just be best to let the OS place the threads on whichever CPUs it wants, as one CPU may already be very busy with something else, so pinning one of the threads to it could make things even slower.
I'm aware of that, but I'm on Linux and there is surely a way that somebody
could improve the code above (actually, I already did a dynamic scaling
version, but I'll test it a bit more).
Rescator wrote:PS! Cutting a screen in two should be faster than alternating lines.
The most used way to use multiple GPUs to render a screen is to split the screen into two or more parts (all SLI does this, I think?) rather than alternating lines.
Of course, alternating-line threads may be better suited to stuff like de-interlacing etc., though. It really depends on the intended use. Just be careful about race conditions.
Of course that's what it does...


Anyway: you're missing the whole point of it! I know that it is essential
that one writes to the graphics buffer SEQUENTIALLY! It would cost a lot of
speed if one didn't. The whole purpose of the code above is the CALCULATIONS
that one has to do. If that means for you to calculate the pixels of a screen, do
so and buffer it yourself, and THEN write to the screen.

Actually, I think of using it more for things like matrix calculations (of course
SSE would be the first step for this, but one can speed it up like this even
more).
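
Just to sketch what I mean (untested; it uses the ParallelForLinear() and GetCPUCount() from my first post, and all the names here are only illustrative): every row of a matrix-vector product is independent, so the outer loop can be split over the cores:

Code: Select all

; Untested sketch: r = M * v, one callback call per row of M
#N = 512
Global Dim M.d(#N - 1, #N - 1)
Global Dim v.d(#N - 1)
Global Dim r.d(#N - 1)

Procedure.l MulRow(row.l, *dummy)
  Protected col.l, sum.d
  For col = 0 To #N - 1
    sum + M(row, col) * v(col)
  Next
  r(row) = sum          ; each thread writes only its own rows -> no race condition
  ProcedureReturn #False
EndProcedure

ParallelForLinear(0, #N - 1, @MulRow(), 0, GetCPUCount())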

And you're right, I should have written about the race conditions. I actually
thought I did in a comment in the code, but it must have gotten lost while
testing :roll:



@Dreamland Fantasy:
Some algorithms will re-evaluate what is still to be done when a thread has completed and redistribute the workload accordingly.
Yeah, I'm aware of that, but you actually missed my goal. I wanted an easy
way of splitting up calculations into threads. It just has to be limited so that
it can be used easily. An algorithm like yours would require corrections and
complex adjustments to each problem you're trying to solve. And this certainly
wouldn't be easy and reusable.
Athlon64 3700+, 1024MB Ram, Radeon X1600
Rescator
Addict
Posts: 1769
Joined: Sat Feb 19, 2005 5:05 pm
Location: Norway

Post by Rescator »

remi_meier wrote:And you're right, I should have written about the race conditions. I actually
thought I did in a comment in the code, but it must have gotten lost while
testing :roll:
Heh! I could have taken the time to trace what was plotted where but... Learning other people's code is a pain at times :P Especially with no comments.

A few comment lines pointing out what you do where and in what order would be great for those learning threading.

Me, being old and stubborn would proll'y roll my own from scratch despite the extra work :P
remi_meier
Enthusiast
Posts: 468
Joined: Sat Dec 20, 2003 6:19 pm
Location: Switzerland

Post by remi_meier »

You're right, there were no comments, now there are hopefully plenty :wink:

There is also the linear scaling version for testing. I'm not totally sure about
this one yet.
Athlon64 3700+, 1024MB Ram, Radeon X1600
freak
PureBasic Team
Posts: 5929
Joined: Fri Apr 25, 2003 5:21 pm
Location: Germany

Post by freak »

Very interesting.

I have been experimenting with this for a bit and came up with a slightly different
approach. The idea is this:

- Create a set of "worker threads" on startup which are initially sleeping.
- ParallelFor() wakes up a worker thread with semaphores and dispatches the work to it.
- Then it waits for the next thread to be available to dispatch work to.

It requires some inter-thread communication and synchronisation to do this, but the
benefit is that the overhead is much smaller than creating the threads each time.
This allows ParallelFor() to dispatch one loop iteration (= one procedure call)
to a thread at a time, which makes it easier to scale up to more processor cores.
Also, the situation where one thread finishes much earlier than the other (= wasted resources) should be less frequent.

Ok. Some code now:

This is common code used by both examples below.
(I will probably add these semaphore commands to PB 4.30 for all OS.)

Code: Select all

DisableDebugger
EnableExplicit

CompilerIf #PB_Compiler_Thread = 0
  CompilerError "In order to use ParallelFor, you have to compile as thread safe."
CompilerEndIf


; Platform specific code
;
CompilerSelect #PB_Compiler_OS 
CompilerCase #PB_OS_Windows

  Macro CreateSemaphore(Initial = 0, Max = 1)
    CreateSemaphore_(#Null, Initial, Max, #Null)
  EndMacro
  
  Macro FreeSemaphore(Sem)
    CloseHandle_(Sem)
  EndMacro
  
  Macro SignalSemaphore(Sem, Inc = 1)
    ReleaseSemaphore_(Sem, Inc, #Null)
  EndMacro
  
  Macro WaitSemaphore(Sem)
    WaitForSingleObject_(Sem, #INFINITE)
  EndMacro
  
  Procedure GetCPUCount()
    Protected Count, i, ProcessMask, SystemMask
  
    If GetProcessAffinityMask_(GetCurrentProcess_(), @ProcessMask, @SystemMask)
      For i = 0 To 31
        If ProcessMask & (1<<i)
          Count + 1
        EndIf
      Next i
    EndIf
    
    If Count = 0
      ProcedureReturn 1
    Else
      ProcedureReturn Count
    EndIf
  EndProcedure

CompilerCase #PB_OS_Linux

  ; Bits/pthreadtypes.h
  #__SIZEOF_PTHREAD_MUTEX_T = 24
  #__SIZEOF_PTHREAD_COND_T  = 48
  
  Structure pthread_mutex_t
    opaque.b[#__SIZEOF_PTHREAD_MUTEX_T]
  EndStructure
  
  Structure pthread_cond_t
    opaque.b[#__SIZEOF_PTHREAD_COND_T]
  EndStructure

  Structure Semaphore
    Count.l
    Max.l
    Mutex.pthread_mutex_t
    Cond.pthread_cond_t
  EndStructure
  
  ImportC "-lpthread"
    pthread_cond_init_(*cond, *attr)  As "pthread_cond_init"
    pthread_cond_destroy_(*cond)      As "pthread_cond_destroy"
    pthread_cond_broadcast_(*cond)    As "pthread_cond_broadcast"
    pthread_cond_wait_(*cond, *mutex) As "pthread_cond_wait"
  EndImport

  Procedure CreateSemaphore(Initial = 0, Max = 1)
    Protected *Sem.Semaphore = AllocateMemory(SizeOf(Semaphore))
    If *Sem
      *Sem\Count = Initial
      *Sem\Max   = Max
      pthread_mutex_init_(@*Sem\Mutex, #Null)
      pthread_cond_init_(@*Sem\Cond, #Null)     
    EndIf
    
    ProcedureReturn *Sem
  EndProcedure
  
  Procedure FreeSemaphore(*Sem.Semaphore)
    pthread_mutex_destroy_(@*Sem\Mutex)
    pthread_cond_destroy_(@*Sem\Cond)
    FreeMemory(*Sem)
  EndProcedure
  
  Procedure SignalSemaphore(*Sem.Semaphore, Inc = 1)
    pthread_mutex_lock_(@*Sem\Mutex)
    
    If *Sem\Count + Inc <= *Sem\Max
 
      ; While we are below 0 (=locked), we must signal the condition
      ; variable for each increase to release a waiting thread       
      While *Sem\Count < 0 And Inc > 0
        *Sem\Count + 1
        Inc - 1
        pthread_cond_broadcast_(@*Sem\Cond)
      Wend
      
      ; Once Count >= 0, we just increase the count as no thread is waiting      
      *Sem\Count + Inc
    EndIf

    pthread_mutex_unlock_(@*Sem\Mutex)
  EndProcedure
  
  Procedure WaitSemaphore(*Sem.Semaphore)
    pthread_mutex_lock_(@*Sem\Mutex)
    
    *Sem\Count - 1   
    If *Sem\Count < 0
      ; wait only if the count fell below 0
      pthread_cond_wait_(@*Sem\Cond, @*Sem\Mutex) ; unlocks mutex while it waits.
    EndIf

    pthread_mutex_unlock_(@*Sem\Mutex)
  EndProcedure
  
  Procedure GetCPUCount()
    ;
    ; Code by remi_meier
    ;
    Protected file.l, count.l, s.s, dir.l
   
    If FileSize("/proc/cpuinfo") <> -1
      file = ReadFile(#PB_Any, "/proc/cpuinfo")
     
      If IsFile(file)
        count = 0
        While Not Eof(file)
          s.s = ReadString(file)
          If Left(s, 9) = "processor"
            count + 1
          EndIf
        Wend
       
        CloseFile(file)
      EndIf
     
    Else
      dir = ExamineDirectory(#PB_Any, "/proc/acpi/processor", "")
      count = 0
      If IsDirectory(dir)
        While NextDirectoryEntry(dir)
          If Left(DirectoryEntryName(dir), 3) = "CPU"
            count + 1
          EndIf
        Wend
        FinishDirectory(dir)
      EndIf
    EndIf
   
    ProcedureReturn count 
  EndProcedure

CompilerDefault
  CompilerError "Not implemented."
  
CompilerEndSelect
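Just to show how these semaphore commands hand control from one thread to another, here is a minimal sketch (not part of the examples; it assumes the common code above is saved as parallel_common.pb, like the examples below do, and is compiled thread safe):

Code: Select all

XIncludeFile "parallel_common.pb"

Define Ready = CreateSemaphore(0, 1)   ; count 0 = "locked", so waiting on it blocks

Procedure Waiter(Sem)
  WaitSemaphore(Sem)                   ; blocks here until the main code signals
EndProcedure

Define t = CreateThread(@Waiter(), Ready)
Delay(500)
SignalSemaphore(Ready)                 ; wakes the waiting thread
WaitThread(t)
MessageRequester("Semaphore", "Handoff worked")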
First example: The ParallelFor() with the changed system:

Code: Select all

XIncludeFile "parallel_common.pb"

Prototype ParallelLoopFunc(Value, *dataPtr)

Global StartSemaphore, StartedSemaphore, RunningSemaphore
Global CurrentValue, *CurrentData, CurrentLoop.ParallelLoopFunc
Global ProcessorCount

Procedure WorkerThread(dummy)
  Protected Value, *dataPtr, Loop.ParallelLoopFunc

  Repeat
    WaitSemaphore(StartSemaphore)     ; wait for the "wakeup" semaphore
    WaitSemaphore(RunningSemaphore)   ; this one should be immediately available (allows to later know when the work is complete)
      Value    = CurrentValue         ; copy the values locally
      *dataPtr = *CurrentData
      Loop     = CurrentLoop
    SignalSemaphore(StartedSemaphore) ; signal the "startup-complete" semaphore so the next thread can be started
      Loop(Value, *dataPtr)           ; execute loop
    SignalSemaphore(RunningSemaphore) ; signal completion of the job
  ForEver
EndProcedure

Procedure ParallelFor(i1, i2, Loop.ParallelLoopFunc, *dataPtr = 0)
  Protected i

  *CurrentData = *dataPtr
  CurrentLoop  = Loop
  
  ; release one thread at a time so they can access the "CurrentValue"
  ; only the thread startup is serialized. the Loop() is run in parallel
  ;
  For CurrentValue = i1 To i2
    SignalSemaphore(StartSemaphore)
    WaitSemaphore(StartedSemaphore)
  Next CurrentValue
  
  ; wait for all worker threads to complete their work
  ;
  For i = 1 To ProcessorCount
    WaitSemaphore(RunningSemaphore)
  Next i
  
  ; reset the running count semaphore back to the start value
  ;
  SignalSemaphore(RunningSemaphore, ProcessorCount)
EndProcedure

; Initialize
;
ProcessorCount   = GetCPUCount()
StartSemaphore   = CreateSemaphore(0, 1)
StartedSemaphore = CreateSemaphore(0, 1)
RunningSemaphore = CreateSemaphore(ProcessorCount, ProcessorCount)

Define i
For i = 1 To ProcessorCount
  CreateThread(@WorkerThread(), 0)
Next i

; --------------------------------------------------------------------
; --------------------------------------------------------------------

;- EXAMPLE
DisableExplicit

maxX = 1400
maxY = 1050
Global Dim a.d(maxX, maxY)

Delay(100)
time = ElapsedMilliseconds()
counter = 0


Procedure InnerLoop(y.l, maxX.l)
  ; access has to be synchronized
  For x = 0 To maxX
    c.l = (Int(y) ! Int(x)) & $FF
    a(x, y) = Pow(c, x)
  Next
EndProcedure

Repeat
 
  ParallelFor(0, maxY, @InnerLoop(), maxX)
;   For y = 0 To maxY
;     For x = 0 To maxX
;       c.l = (Int(y) ! Int(x)) & $FF
;       a(x, y) = Pow(c, x)
;     Next
;   Next
 
  counter + 1
Until ElapsedMilliseconds() - time > 2000



MessageRequester("Fertig", "Durchläufe: "+Str(counter))
With some tricks, it's even possible to execute a generic procedure
call in a separate thread. Let's call this a "parallel procedure call":

This code scales very well: if there is only one processor available,
the procedure is called directly without any overhead, and if there
are multiple processors/cores, it uses as many threads as there
are available cores.

It's also very flexible, as you can mix different procedures in the
"parallel" loop without problems.

Code: Select all

XIncludeFile "parallel_common.pb"


Global StartStackOffset, EndStackOffset, StackCallSize
Global ParallelFunction, ParallelReturn
Global StartSemaphore, StartedSemaphore, RunningSemaphore
Global ProcessorCount, ParallelMutex

Procedure WorkerThread(dummy)
  Protected StackOffset

  Repeat
    WaitSemaphore(StartSemaphore)   ; wait for "wakeup"
    WaitSemaphore(RunningSemaphore) ; should be available immediately (marks this thread as "working")       
      ; copy the stack frame from the "caller" to this thread     
      !MOV [p.v_StackOffset], dword esp
      CopyMemory(EndStackOffset, StackOffset-StackCallSize, StackCallSize)
      !SUB esp, dword [v_StackCallSize]
      
      ; save the function pointer on the local stack as well
      !PUSH dword [v_ParallelFunction]
    SignalSemaphore(StartedSemaphore) ; allow next thread to start up
      !POP eax   ; get function pointer
      !CALL eax  ; call the function (must be stdcall!)
    SignalSemaphore(RunningSemaphore) ; job done
  ForEver
EndProcedure

; This is the "stub"-code that is called by the CallParallel() instead of the
; real function (through a prototype). It must determine the stack frame size
; and store the needed information in the global variables.
; Then it signals the "wakeup"-semaphore and waits for the "started" one
; before returning.
;
Goto EndParallelStub
ParallelStub:
  !POP dword [v_ParallelReturn]
  !MOV [v_EndStackOffset], esp
  StackCallSize = StartStackOffset - EndStackOffset
  SignalSemaphore(StartSemaphore)
  WaitSemaphore(StartedSemaphore)
  !PUSH dword [v_ParallelReturn]
  !RET  ; The stubs are PrototypeC, so no stack cleanup done here
EndParallelStub:

; To allow the CallParallel() to work, we need a prototype with the same
; arguments as the real procedure but which points to the stub code
; above 
Macro DeclareParallel(Name, Arguments)
  PrototypeC Name#_Prototype#Arguments  ; must be prototype C and the procedure must NOT be procedureC!
  Global     Name#_Caller.Name#_Prototype = ?ParallelStub
EndMacro

; Call a function in parallel (only if multiple processors are available)
;
Macro CallParallel(Name, Arguments)
  If ProcessorCount > 1 And (__ParallelLocked Or TryLockMutex(ParallelMutex))
    __ParallelLocked = 1 ; MUST NOT be global, so every thread gets its own copy of this variable!
    ParallelFunction = @Name()
    !MOV [v_StartStackOffset], esp
    Name#_Caller#Arguments    
  Else
    Name#Arguments
  EndIf
EndMacro

; Waits for all running jobs to complete
;
Procedure __FinishParallel()
  Protected i
  For i = 1 To ProcessorCount
    WaitSemaphore(RunningSemaphore)
  Next i
  SignalSemaphore(RunningSemaphore, ProcessorCount)
EndProcedure

; Wrapper for the above function to also unlock the
; mutex protection against multiple thread access
;
Macro FinishParallel()
  If __ParallelLocked
    __FinishParallel()
    __ParallelLocked = 0
    UnlockMutex(ParallelMutex)
  EndIf
EndMacro


; Initializes the parallel code.
; 'MinThreads' can be used to set the number of worker threads higher than
; the actual processor/core count. (for testing mainly)
;
Procedure InitParallel(MinThreads = 1)
  Protected i

  ProcessorCount = GetCPUCount()

  If ProcessorCount < MinThreads
    ProcessorCount = MinThreads
  EndIf
    
  If ProcessorCount > 1
    ParallelMutex    = CreateMutex()
    StartSemaphore   = CreateSemaphore(0, 1)
    StartedSemaphore = CreateSemaphore(0, 1)
    RunningSemaphore = CreateSemaphore(ProcessorCount, ProcessorCount)
    
    For i = 1 To ProcessorCount
      CreateThread(@WorkerThread(), 0)
    Next i
  EndIf
EndProcedure

; Usage:
;
; InitParallel([MinThreads]) 
;  - startup the parallel call system
;
; DeclareParallel(<procedurename>, <arguments>)
;  - declare a procedure for parallel calls.
;  - <arguments> must include the ()
;  - example: DeclareParallel(proc, (a.l, b.l))
;  NOTE: The procedure may not have string parameters, as these would
;    not be copied correctly to the thread before execution.
;
; CallParallel(<procedurename>, <arguments>)
;  - calls a procedure in parallel
;  - <arguments> must be in () again
;  - example: CallParallel(proc, (1, 3))
;
; FinishParallel()
;  - waits for any running parallel calls to finish
;  NOTE: This must be called after a parallel loop, or another thread
;    will not be able to execute parallel calls again
;    (they will all be done sequentially due to the mutex protection)

; --------------------------------------------------------------------
; --------------------------------------------------------------------

;- EXAMPLE
DisableExplicit

;InitParallel(2)
InitParallel() ; initialize

maxX = 1400
maxY = 1050
Global Dim a.d(maxX, maxY)
Global Dim b.d(maxX, maxY)

Delay(100)
time = ElapsedMilliseconds()
counter = 0

Repeat
 
  For y = 0 To maxY
    For x = 0 To maxX
      c.l = (Int(y) ! Int(x)) & $FF
      a(x, y) = Pow(c, x)
    Next
  Next
 
  counter + 1
Until ElapsedMilliseconds() - time > 2000


DeclareParallel(InnerLoop, (y.l, maxX.l)) ; declare the parallel call

Procedure InnerLoop(y.l, maxX.l)
  ; access has to be synchronized
  For x = 0 To maxX
    c.l = (Int(y) ! Int(x)) & $FF
    b(x, y) = Pow(c, x)
  Next
EndProcedure

Delay(100)
time = ElapsedMilliseconds()
counter2 = 0
Repeat
 
  For y = 0 To maxY
    CallParallel(InnerLoop, (y, maxX)) ; call in parallel
  Next
  FinishParallel() ; wait for all calls to complete
 
  counter2 + 1
Until ElapsedMilliseconds() - time > 2000

equal = 1
For y = 0 To maxY
  For x = 0 To maxX
    If a(x, y) <> b(x, y)
      equal = 0
      Break 2
    EndIf
  Next
Next

MessageRequester("Fertig", "Durchläufe seriell: "+Str(counter)+Chr(13)+"Durchläufe parallel: "+Str(counter2)+Chr(13)+"Ergebnis gleich?: "+Str(equal))
This is fun to experiment with. Who knows, maybe something like this
could be added natively to PB. Would make a very cool feature imho.

Could look somewhat like this:

Code: Select all

For i = 1 To 1000
  Parallel MyProcedure(a, b)
Next i

WaitParallel() ; synchronize again
Btw, I have no dual-core system available yet (will get a quad-core soon),
so I am interested in some results from people who do.
(I get good results even on a single core when putting a Delay() in the procedure,
which suggests that the parallelism is working, but of course that is no real test.)
quidquid Latine dictum sit altum videtur
Comtois
Addict
Posts: 1429
Joined: Tue Aug 19, 2003 11:36 am
Location: Doubs - France

Post by Comtois »

freak wrote:This is fun to experiment with. Who knows, maybe something like this
could be added natively to PB. Would make a very cool feature imho.

Could look somewhat like this:

Code: Select all

For i = 1 To 1000
  Parallel MyProcedure(a, b)
Next i

WaitParallel() ; synchronize again
Ah yes! Just do it :)

It could be very nice.
Please correct my english
http://purebasic.developpez.com/
remi_meier
Enthusiast
Posts: 468
Joined: Sat Dec 20, 2003 6:19 pm
Location: Switzerland

Post by remi_meier »

Now that's cool :D

I also thought about that thread-start overhead but didn't try out anything
yet. I did some tests, but couldn't do many because there seems to be a
problem under Linux that sometimes locks up the examples you created,
and sometimes they just run fine... I will have to study them a bit.

Well, the first 2 runs of your ParallelFor() both showed 7 runs while my method
got 8. Now, this test is surely too small to say much (as I said, testing
was a bit boring while always having to Try-Kill-Try-Kill-Works :D ), but anyway
there is one point where I'm not sure how much it impacts the speed of the two
methods:
AFAIK all cores (and of course CPUs) have their own cache. When splitting
the work in half for each core, the memory access is sequential, but when
a thread continues with bits that are not contiguous in memory, the CPU
could have problems deciding which memory has to be cached in order to
get the best possible speed.
It would be very interesting to test whether the time gap between one thread's
end and the other thread's last bits of work matters more than sequential
memory access, or the other way around.
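
To make that concrete (just an illustration, not a benchmark), this is how the rows would be assigned to 2 threads by the two schemes:

Code: Select all

; Illustration only: which of 2 threads gets which row
maxY = 9

; contiguous halves (ParallelFor / ParallelForLinear style)
For y = 0 To maxY
  If y <= maxY / 2
    Debug "half split:  thread 1 gets row " + Str(y)
  Else
    Debug "half split:  thread 2 gets row " + Str(y)
  EndIf
Next

; one row per dispatch (freak's version hands out single rows; shown as simple alternation)
For y = 0 To maxY
  Debug "interleaved: thread " + Str(y % 2 + 1) + " gets row " + Str(y)
Next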

In the CallParallel() example, I sometimes got equal times for the serial and
parallel methods, but also saw that if this was the case, only one core was
used. If both cores were used, I got the 7 runs again.


I will do some more tests, but I'm not sure when.

Code: Select all

For i = 1 To 1000 
  Parallel MyProcedure(a, b) 
Next i 

WaitParallel() ; synchronize again
That would definitely be cool!
Athlon64 3700+, 1024MB Ram, Radeon X1600
remi_meier
Enthusiast
Posts: 468
Joined: Sat Dec 20, 2003 6:19 pm
Location: Switzerland

Post by remi_meier »

Quite odd:
This code sometimes runs through, sometimes it locks up after a few "runned"
outputs. This shows that the deadlock happens within

Code: Select all

  For CurrentValue = i1 To i2
    SignalSemaphore(StartSemaphore) 
    WaitSemaphore(StartedSemaphore) 
  Next CurrentValue
(or the thread). I tried to debug all the pthread_ functions within SignalSemaphore()
and WaitSemaphore(), but all returned 0 (success) and one probably doesn't
return (can't test that, because every "Debug" helps the code to run properly,
and it only happens with a 'maxY' that is high enough).
The threads and the semaphores are created successfully, IIRC.

I didn't have the time to look through the semaphore functions though.

Here is the code:

Code: Select all


XIncludeFile "parallel_common.pb"


Global StartSemaphore, StartedSemaphore

Procedure WorkerThread(dummy) 
  Repeat 
    WaitSemaphore(StartSemaphore)     ; wait for "weakup" - semaphore 
    SignalSemaphore(StartedSemaphore) ; signal the "startup-complete" semaphore so the next thread can be started 
  ForEver 
EndProcedure 

Procedure ParallelFor(i1, i2)
  Debug 1 
  For CurrentValue = i1 To i2
    SignalSemaphore(StartSemaphore) 
    WaitSemaphore(StartedSemaphore) 
  Next CurrentValue 
  Debug "runned"
  
EndProcedure 


ProcessorCount   = GetCPUCount()
StartSemaphore   = CreateSemaphore(0, 1) 
StartedSemaphore = CreateSemaphore(0, 1) 

Define i 
For i = 1 To ProcessorCount 
  CreateThread(@WorkerThread(), 0) 
Next i 



maxY = 5050

time = ElapsedMilliseconds()
Repeat
  ParallelFor(0, maxY)
  counter + 1
Until ElapsedMilliseconds() - time > 2000

MessageRequester("Fertig", "Durchläufe: "+Str(counter))
Athlon64 3700+, 1024MB Ram, Radeon X1600
freak
PureBasic Team
Posts: 5929
Joined: Fri Apr 25, 2003 5:21 pm
Location: Germany

Post by freak »

I think it's the Linux semaphore code. I did not test that very much.

After some reading of the POSIX thread docs, a wait on a condition variable
can return even when the condition was not really signaled (a "spurious wakeup"). This is very odd imho.
I will try to adjust the code to work around this tomorrow.
quidquid Latine dictum sit altum videtur
freak
PureBasic Team
Posts: 5929
Joined: Fri Apr 25, 2003 5:21 pm
Location: Germany

Post by freak »

Ok, I misunderstood some stuff in the condition variable docs.
Here is a new try at the semaphore commands:

Code: Select all

  #__SIZEOF_PTHREAD_MUTEX_T = 24
  #__SIZEOF_PTHREAD_COND_T  = 48
  
  Structure pthread_mutex_t
    opaque.b[#__SIZEOF_PTHREAD_MUTEX_T]
  EndStructure
  
  Structure pthread_cond_t
    opaque.b[#__SIZEOF_PTHREAD_COND_T]
  EndStructure

  Structure Semaphore
    Count.l
    Max.l
    Release.l
    Mutex.pthread_mutex_t
    Cond.pthread_cond_t
  EndStructure
  
  ImportC "-lpthread"
    pthread_cond_init_(*cond, *attr)  As "pthread_cond_init"
    pthread_cond_destroy_(*cond)      As "pthread_cond_destroy"
    pthread_cond_signal_(*cond)       As "pthread_cond_signal"
    pthread_cond_broadcast_(*cond)    As "pthread_cond_broadcast"
    pthread_cond_wait_(*cond, *mutex) As "pthread_cond_wait"
  EndImport

  Procedure CreateSemaphore(Initial = 0, Max = 1)
    Protected *Sem.Semaphore = AllocateMemory(SizeOf(Semaphore))
    If *Sem And Initial >= 0 And Initial <= Max
      *Sem\Count   = Initial
      *Sem\Max     = Max
      *Sem\Release = 0
      pthread_mutex_init_(@*Sem\Mutex, #Null)
      pthread_cond_init_(@*Sem\Cond, #Null)     
    EndIf
    
    ProcedureReturn *Sem
  EndProcedure
  
  Procedure FreeSemaphore(*Sem.Semaphore)
    pthread_mutex_destroy_(@*Sem\Mutex)
    pthread_cond_destroy_(@*Sem\Cond)
    FreeMemory(*Sem)
  EndProcedure
  
  Procedure SignalSemaphore(*Sem.Semaphore, Inc = 1)
    Protected Result = 0, ToRelease
    pthread_mutex_lock_(@*Sem\Mutex)
    
    If *Sem\Count + Inc <= *Sem\Max And Inc > 0
      Result = 1
      
      ; Control how many waiting threads will continue running,
      ; as threads can also wakeup spontaneously
      ;
      If *Sem\Count >= 0
        ToRelease = 0
      ElseIf *Sem\Count + Inc <= 0
        ToRelease = Inc
      Else
        ToRelease = - *Sem\Count
      EndIf

      *Sem\Count   + Inc
      *Sem\Release + ToRelease 
      
      If ToRelease = 1
        ; wakeup 1 waiting thread (could be more in some conditions according to the doc)
        pthread_cond_signal_(@*Sem\Cond)
      ElseIf ToRelease > 1
        ; wakeup all waiting threads. The *Sem\Release controls how many start running
        pthread_cond_broadcast_(@*Sem\Cond)
      EndIf
    EndIf

    pthread_mutex_unlock_(@*Sem\Mutex)
    ProcedureReturn Result
  EndProcedure
  
  Procedure WaitSemaphore(*Sem.Semaphore)
    pthread_mutex_lock_(@*Sem\Mutex)
    
    *Sem\Count - 1   
    
    If *Sem\Count < 0 ; if count < 0 then we must block
    
      ; The docs state that threads can return from a wait spontaneously, so
      ; the *Sem\Release is used to control when a thread actually continues running
      ; It contains the number of threads to wakeup, so count it down.
      While *Sem\Release = 0
        pthread_cond_wait_(@*Sem\Cond, @*Sem\Mutex) ; unlocks mutex while it waits.
      Wend
      
      *Sem\Release - 1
    EndIf

    pthread_mutex_unlock_(@*Sem\Mutex)
  EndProcedure
I cannot reproduce your problem though, so I don't know if this fixes it.
(I run Linux only in a VM; maybe that has an effect.)
quidquid Latine dictum sit altum videtur
remi_meier
Enthusiast
Posts: 468
Joined: Sat Dec 20, 2003 6:19 pm
Location: Switzerland

Post by remi_meier »

Seems to work :)
Ok, I've run some tests now (time: 10 seconds each):

maxX = 400, maxY = 50
(problem: the threads aren't working long enough, so most of the time both stay on one core)
- My PFor: actually never ran on both cores: ~1600 runs
- Freak's PCall: ~2400 or ~1500 (~50% of the tests ran on both cores)
- Freak's PFor: ~2450 (100% of the tests ran on both cores)

maxX = 2400, maxY = 2050
- My PFor: 11 runs, always
- Freak's PCall: 10, always
- Freak's PFor: 9, always

maxX = 4400, maxY = 2050
- My PFor: 6 runs, always
- Freak's PCall: 5, always
- Freak's PFor: 5, always

It's interesting that the solution with 2 resident threads is much more stable
and therefore also boosts speed with small loops that are not that time
consuming.
It seems that a 90% speed increase is certainly realistic for calculations on
big datasets. And it can be that simple :D

Eagerly awaiting Quad-Core tests!
Athlon64 3700+, 1024MB Ram, Radeon X1600
Rinzwind
Enthusiast
Posts: 636
Joined: Wed Mar 11, 2009 4:06 pm
Location: NL

Re: ParallelFor - Use your cores

Post by Rinzwind »

Another example where cool ideas for PB extensions end up lying around.
Anyway, any update for x64?