-Restructured internals
-Added Services
Simple queued state machine actor framework for PureBasic. Each actor is spawned in its own thread and has its own data or interfaces to work on. Actors talk to each other by adding states to another actor's queue. This may be hard to wrap your head around at first, at least for most LabVIEW "Developers". I have known a few "Developers" who didn't have the sense God gave a baby duck. See https://en.wikipedia.org/wiki/Actor_model for more information. Edit: I think https://www.brianstorti.com/the-actor-model/ does a good job of explaining the actor model.
Features
Dynamic registration of actors
Queued state machine architecture similar to the ones commonly seen in LabVIEW
Simple multithreaded development
Services with subscribers
Improvements needed
Distributed Actor Framework - Adds a master message handler that actors can register with. Local messages are still handled locally; only remote messages hit the master. This allows talking between local applications or across the internet. As long as the message format is kept the same, applications written in other languages can join in. My plan is to keep this compatible with the distributed framework I have already written in LabVIEW.
Delayed messaging - Hold a message until time x
Cleaner way of sending data, something better than allocating memory and having GetNextState free it after it has been used.
Examples and documentation!
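As a rough illustration of the delayed messaging item above, one possible stopgap with the current API is a helper thread that sleeps and then calls Actor::AddState. This is only a sketch and not the planned implementation: DelayedState, DelayedSender and AddStateDelayed are hypothetical names, it waits for a duration in milliseconds rather than until an absolute time, and it assumes the module listed further down is saved as ActorFrameWork.pbi and compiled with the threadsafe option.

```purebasic
; Sketch only. Assumes ActorFrameWork.pbi sits next to this file and that
; "Create threadsafe executable" is enabled in the compiler options.
XIncludeFile "ActorFrameWork.pbi"

Structure DelayedState          ; hypothetical helper type, not part of the framework
  Actor.s                       ; target actor name
  State.s                       ; state to queue once the delay has passed
  DelayMs.i                     ; how long to wait, in milliseconds
EndStructure

; Helper thread: sleeps, queues the state on the target actor, then cleans up.
; If the target actor is not registered at that point, AddState returns 1 and
; the message is silently dropped.
Procedure DelayedSender(*Request.DelayedState)
  Delay(*Request\DelayMs)
  Actor::AddState(*Request\Actor, *Request\State)
  FreeStructure(*Request)
EndProcedure

; Hypothetical wrapper. Returns 0 if the helper thread was started, 1 otherwise.
Procedure.i AddStateDelayed(Actor.s, State.s, DelayMs.i)
  Protected *Request.DelayedState = AllocateStructure(DelayedState)
  If *Request = #Null
    ProcedureReturn 1
  EndIf
  *Request\Actor = Actor
  *Request\State = State
  *Request\DelayMs = DelayMs
  If CreateThread(@DelayedSender(), *Request) = 0
    ; Thread could not be created, so the request is released here instead
    FreeStructure(*Request)
    ProcedureReturn 1
  EndIf
  ProcedureReturn 0
EndProcedure
```

A call such as AddStateDelayed("Main", "Exit", 5000) would then queue Exit on Main roughly five seconds later. It costs one thread per pending message, which is fine for a handful of timers but not for many.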
The main actor is called directly in your program or via WaitThread(CreateThread(@Console(), 0)). All other actors are started before the main actor with CreateThread. When sharing data between actors, the memory used to hold the data is freed on the receiving actor's next call to GetNextState.
Each actor has its own queue telling it which code to execute. If the queue is empty, GetNextState gives you the idle state, often just called "Idle". Idle contains a delay so the actor does not eat all your resources. All actors should have an "Exit" case so they can be shut down via AddState_Global("Exit").
Typical actor setup
Code:
Procedure Main(*Input)
  Protected.Actor::State State           ; Holds next state information
  Protected Actor.s = "Main"             ; Easier to handle the actor name in a string
  Actor::Register(Actor, "Idle")         ; Registers the actor "Main" with default state Idle
  Actor::AddState(Actor, "Setup")        ; Makes sure Setup is the first case called
  Repeat
    Actor::GetNextState(Actor, @State)
    Select State\State
      Case "Setup"
        ;Setup code here
      Case "Idle"
        ;Idle case, put delay here. Call events case here if handling events.
        Delay(10)
      Case "Exit"
        ;Called to exit actor
        Break
      Default
        ;Should never get here; something is wrong. Display message and quit
        Actor::AddState_Global("Exit")
        Debug "[ERROR] ACTOR: "+ Actor +" STATE: "+ State\State
        Break
    EndSelect
  ForEver
  Actor::Remove(Actor)
EndProcedure
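To make the data-passing rule concrete, here is a minimal sketch of one actor receiving a block of memory from the main program. The Reading structure, the "Logger" actor and its "Log" state are made up for illustration, and the sketch assumes the module below is saved as ActorFrameWork.pbi next to it and compiled with the threadsafe option.

```purebasic
; Sketch only. Assumes ActorFrameWork.pbi sits next to this file and that
; "Create threadsafe executable" is enabled in the compiler options.
XIncludeFile "ActorFrameWork.pbi"

Structure Reading               ; hypothetical payload; numeric fields only,
  Value.i                       ; because FreeMemory does not release strings
EndStructure

Procedure Logger(*Input)
  Protected State.Actor::State
  Protected *Sample.Reading
  Protected Actor.s = "Logger"
  Actor::Register(Actor, "Idle")
  Repeat
    Actor::GetNextState(Actor, @State)
    Select State\State
      Case "Log"
        ; State\Data stays valid until this actor's next GetNextState call
        *Sample = State\Data
        If *Sample <> #Null
          Debug "Logger received " + Str(*Sample\Value)
        EndIf
        ; Never FreeMemory(State\Data) here; GetNextState releases it
      Case "Idle"
        Delay(10)
      Case "Exit"
        Break
    EndSelect
  ForEver
  Actor::Remove(Actor)
EndProcedure

Actor::InitActor()
Define Thread.i = CreateThread(@Logger(), 0)

If Thread <> 0
  Define *Sample.Reading = AllocateMemory(SizeOf(Reading))
  If *Sample <> #Null
    *Sample\Value = 42
    ; AddState returns non-zero until Logger has registered. On failure the
    ; caller still owns the memory, so it is safe to retry with the same pointer.
    While Actor::AddState("Logger", "Log", *Sample) <> 0
      Delay(10)
    Wend
    ; From here on the framework owns the memory; do not use or free *Sample
  EndIf

  Actor::AddState_Global("Exit")  ; queued behind "Log", so "Log" is handled first
  WaitThread(Thread)
EndIf
```

The sender allocates, fills and hands over the block, and never touches it again after a successful AddState. The receiver only reads it while handling the state.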
Code:
; Title : Actor
; Created: 23/12/2019
; Author : CopperD
; Credits:
; File : ActorFrameWork.pbi
; Info : Actor Framework for PureBasic. Early release for public testing.
; v0.2 : New features / internal rework
; Added: ---- 28/12/2019----------------------
; Internal restructure
; Single Mutex
; ActorManager control function (Private)
; Accessed via public functions
; Create Service Added
; Subscribe Added
; Publish Added
;
; Added: ---- 23/12/2019----------------------
; Initial Release
; --------------------------------------------
;--------------------------------------------
;-Notes
;-MUST ENABLE THREADSAFE EXECUTABLE
;-NEVER FreeMemory on *Data
; Data pointers passed via AddState are released with FreeMemory by the receiver's next GetNextState call
; Copy data you want to pass into a new memory allocation made with AllocateMemory
;
;--------------------------------------------
DeclareModule Actor
Structure State
State.s
*Data
EndStructure
Declare InitActor()
Declare.i Register(Actor.s, IdleState.s="Idle")
Declare.i Remove(Actor.s)
Declare.i AddState(Actor.s, State.s, *Data=#Null)
Declare.i AddState_Global(State.s, *Data=#Null)
Declare.i GetNextState(Actor.s, *State.State)
Declare.i RegisterService(Actor.s, Service.s)
Declare.i SubscribeService(Service.s, Actor.s, State.s)
Declare.i PublishService(Service.s, *Data=#Null)
EndDeclareModule
Module Actor
EnableExplicit
Structure ServiceList
Actor.s
Service.s
EndStructure
Structure Subscriber
Actor.s
State.s
EndStructure
Structure ServiceMap
Service.S
Map Subscribers.Subscriber()
EndStructure
Structure ActorData
Name.s
IdleState.s
List Queue.State()
Map Services.ServiceMap()
*CleanUp
EndStructure
Structure Actor_Internal
*Mutex
Map Actors.ActorData()
Map ServiceListing.ServiceList()
EndStructure
Global Internal.Actor_Internal
Enumeration Mode
#Actor_Init
#Actor_Register
#Actor_Add
#Actor_AddGlobal
#Actor_Get
#Actor_Remove
#Actor_Service
#Actor_Subscribe
#Actor_Publish
EndEnumeration
;--------------------------------------------
;-Public Functions
;--------------------------------------------
Declare.i ActorManager(Mode, S1.S=#Null$, S2.S=#Null$, S3.s=#Null$, *Data=#Null)
; --------
; Initialize Actor Module
;
; Cannot fail
;
; --------
Procedure InitActor()
ActorManager(#Actor_Init)
EndProcedure
; --------
; Register
;
; Registers a new actor. Returns 1 on error, 0 on success
; Will fail if the actor is already registered or the actor name is null
;
; Inputs
; Actor: Actor Name
; IdleState: State to go to when nothing is on the queue. Default is Idle
;
; --------
Procedure.i Register(Actor.s, IdleState.s="Idle")
ProcedureReturn ActorManager(#Actor_Register, Actor, IdleState)
EndProcedure
; ---------
; Remove
;
; Removes an actor. Returns 1 on error, 0 on success
; Will fail if the actor name is null or the actor is not found
;
; Inputs
; Actor: Actor Name
;
; ---------
Procedure.i Remove(Actor.s)
ProcedureReturn ActorManager(#Actor_Remove, Actor)
EndProcedure
; ---------
; AddState
;
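; Adds a state to an actor. Returns 1 on error, 0 on success
; Will fail if the state or actor name is null, or the actor is not found
; Will fail if the queue item cannot be allocated
;
; Inputs
; Actor: Actor Name
; State: State being added to the actor named above
; *Data: Optional pointer to AllocateMemory data passed along with the state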
;
; ---------
Procedure.i AddState(Actor.s, State.s, *Data=#Null)
ProcedureReturn ActorManager(#Actor_Add, Actor, State, "", *Data)
EndProcedure
; ---------
; AddState_Global
;
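; Adds a state to all actors. Returns 1 on error, 0 on success
; Will fail if the state is empty
;
;
; Inputs
; State: State to add to all actors
; *Data: Optional pointer to AllocateMemory data; each actor gets its own copy and the original is freed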
;
; ---------
Procedure.i AddState_Global(State.s, *Data=#Null)
ProcedureReturn ActorManager(#Actor_AddGlobal, State, "", "", *Data)
EndProcedure
; ---------
; GetNextState
;
; Gets the next state from the queue, or the idle state if the queue is empty
; Frees the data pointer handed out by the previous call
; Will fail if the actor name is null or the actor is not found
;
; Inputs
; Actor: Actor Name
; *State: Pointer to the actor's local copy of state
; ---------
Procedure.i GetNextState(Actor.s, *State.State)
ProcedureReturn ActorManager(#Actor_Get, Actor, "", "", *State)
EndProcedure
; ---------
; Register Service
;
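; Adds a service to an actor. Returns 1 on error, 0 on success
; Will fail if actor or service is null, the actor is not found, or the service already exists
;
; Inputs
; Actor: Actor Name
; Service: Name of the service being added to the actor named above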
;
; ---------
Procedure.i RegisterService(Actor.s, Service.s)
ProcedureReturn ActorManager(#Actor_Service, Actor, Service)
EndProcedure
; ---------
; Subscribe Service
;
; Subscribes an actor to a service
; Will fail if any input is null or the service is not found
;
; Inputs
; Service: Service we want to subscribe to
; Actor: Actor being added to subscription
; State: Actor's state that will handle published messages
;
; ---------
Procedure.i SubscribeService(Service.s, Actor.s, State.s)
ProcedureReturn ActorManager(#Actor_Subscribe, Service, Actor, State)
EndProcedure
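; ---------
; Publish Service
;
; Queues the subscribed state on every actor subscribed to a service
; Will fail if the service is null or not found
; Also returns 1 when the service has no subscribers
;
; Inputs
; Service: Service being published
; *Data: Optional data pointer passed along with the state
;
; ---------
Procedure.i PublishService(Service.s, *Data=#Null)
ProcedureReturn ActorManager(#Actor_Publish, Service, "", "", *Data)
EndProcedure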
;--------------------------------------------
;-Private Functions
;--------------------------------------------
; ---------
; ActorManager
; Handles all internal data privately. Calls are serialized by a single mutex.
Procedure.i ActorManager(Mode, S1.S=#Null$, S2.S=#Null$, S3.s=#Null$, *Data=#Null)
;Create Mutex if not created
If Internal\Mutex = #Null
Internal\Mutex = CreateMutex()
EndIf
;Wait here
LockMutex(Internal\Mutex)
;Default return value: anything <> 0 is an error
Protected Error.i = 1
Protected Test.i = 0
Select Mode
; ---------------------------------------------------------------------------------
Case #Actor_Init
;Nothing required. Calling this once before starting threads lets the mutex be created safely.
Error = 0
; ---------------------------------------------------------------------------------
Case #Actor_Register
;S1 passes in Actor name and S2 brings in the Idle State
If S1 <> #Null$
;Make sure we don't already have this actor registered
If FindMapElement(Internal\Actors(), S1) = #Null
;Make sure we can allocate the new map
Test = AddMapElement(Internal\Actors(), S1, #PB_Map_ElementCheck)
If Test <> 0
Internal\Actors()\Name = S1
Internal\Actors()\IdleState = S2
Internal\Actors()\CleanUp = #Null
Error = 0
Else
;Could not allocate actor map memory
Error = 1
EndIf
Else
;Actor Already Exists
Error = 1
EndIf
Else
;Actor Name not given
Error = 1
EndIf
; ---------------------------------------------------------------------------------
Case #Actor_Add
;AddState: S1 Actor Name, S2 State, and Data if needed
If S2 <> #Null$ And S1 <> #Null$
;Make sure actor exists
If FindMapElement(Internal\Actors(), S1) <> #Null
FirstElement(Internal\Actors()\Queue())
Protected *Queuelist.State = InsertElement(Internal\Actors()\Queue())
;Make sure we can add queue
If *Queuelist <> #Null
Error = 0
*Queuelist\State = S2
*Queuelist\Data = *Data
Else
;Could not add queue item
Error = 1
EndIf
Else
;Actor not found
Error = 1
EndIf
Else
; Actor name or state missing
Error = 1
EndIf
; ---------------------------------------------------------------------------------
Case #Actor_AddGlobal
; S1 State, and Data if needed
; Check that we have a valid state
If S1 <> #Null$
ForEach Internal\Actors()
FirstElement(Internal\Actors()\Queue())
Protected *QueuelistGlobal.State = InsertElement(Internal\Actors()\Queue())
;Check that the queue was able to add the element
If *QueuelistGlobal <> #Null
Error = 0
*QueuelistGlobal\State = S1
;Are we passing data to the actors?
If *Data <> #Null
;Create a new copy for each actor.
Protected Size.i = MemorySize(*Data)
Protected *Temp = AllocateMemory(Size)
If *Temp <> #Null
;CopyMemory has no return value, so store the copy's own pointer
CopyMemory(*Data, *Temp, Size)
*QueuelistGlobal\Data = *Temp
Else
;Could not allocate a copy of data
Error = 1
Break
EndIf
EndIf
Else
;Could not allocate queue item
Error = 1
Break
EndIf
Next
Else
;No state
Error = 1
EndIf
;Are we passing data to the actors?
If *Data <> #Null
;Original is no longer needed as all actors have own copy
FreeMemory(*Data)
EndIf
; ---------------------------------------------------------------------------------
Case #Actor_Get
;S1 is actor *Data is state
;Check S1 is not null
If S1 <> #Null$
;Check Actor exists
If FindMapElement(Internal\Actors(), S1) <> #Null
Error = 0
;Do we have memory to free from last state?
If Internal\Actors()\CleanUp <> #Null
FreeMemory(Internal\Actors()\CleanUp)
Internal\Actors()\CleanUp = #Null
EndIf
;Do we have an item waiting on the queue
Protected *TempData.State = *Data
If LastElement(Internal\Actors()\Queue()) <> 0
*TempData.State\State = Internal\Actors()\Queue()\State
;Copy pointer to data in state that is passed out and to cleanup so it is freed on next Get
Protected *TempGet = Internal\Actors()\Queue()\Data
*TempData.State\Data = *TempGet
Internal\Actors()\CleanUp = *TempGet
;Delete item on queue
DeleteElement(Internal\Actors()\Queue())
Else
;Nothing found on queue copy Idle state
*TempData.State\State = Internal\Actors()\IdleState
*TempData.State\Data = #Null
EndIf
Else
;Actor not found
Error = 1
EndIf
Else
;Actor name null
EndIf
; ---------------------------------------------------------------------------------
Case #Actor_Remove
;S1 brings in actor name
If S1 <> #Null$
;Make sure the actor exists
If FindMapElement(Internal\Actors(), S1) <> #Null
;Check if actor has any services listed and remove them
ForEach Internal\Actors()\Services()
DeleteMapElement(Internal\ServiceListing(), Internal\Actors()\Services()\Service)
Next
;Free actor's queue and service map
FreeList(Internal\Actors()\Queue())
FreeMap(Internal\Actors()\Services())
DeleteMapElement(Internal\Actors(), S1)
Error = 0
Else
;Actor not found
Error = 1
EndIf
Else
;Null actor string
Error = 1
EndIf
; ---------------------------------------------------------------------------------
Case #Actor_Service
;Register Service S1 Actor S2 Service
;Make sure service does not exist
If FindMapElement(Internal\ServiceListing(), S2) = #Null
;Check for null string inputs
If S1 <> #Null$ And S2 <> #Null$
;Actor exists
If FindMapElement(Internal\Actors(),S1) <> #Null
;Add listing
If AddMapElement(Internal\ServiceListing(), S2) <> #Null
Internal\ServiceListing()\Actor = S1
Internal\ServiceListing()\Service = S2
;Add to actor services
If AddMapElement(Internal\Actors()\Services(), S2) <> #Null
Internal\Actors()\Services()\Service = S2
Error = 0
Else
;Couldn't add to Actor Services
Error = 1
EndIf
Else
;Couldn't add to ServiceListing
Error = 1
EndIf
Else
;Actor not found
Error = 1
EndIf
Else
;Actor or Service null
Error = 1
EndIf
Else
;Service exists
Error = 1
EndIf
; ---------------------------------------------------------------------------------
Case #Actor_Subscribe
; S1 Service S2 Actor S3 State
If S1 <> #Null$ And S2 <> #Null$ And S3 <> #Null$
;Find service
If FindMapElement(Internal\ServiceListing(), S1) <> #Null
FindMapElement(Internal\Actors(), Internal\ServiceListing()\Actor)
FindMapElement(Internal\Actors()\Services(), S1)
If AddMapElement(Internal\Actors()\Services()\Subscribers(), S2) <> #Null
Internal\Actors()\Services()\Subscribers()\Actor = S2
Internal\Actors()\Services()\Subscribers()\State = S3
Error = 0
Else
;Couldn't add to actor service subscriber map
Error = 1
EndIf
Else
;Service not found
Error = 1
EndIf
Else
;Input has null string
Error = 1
EndIf
; ---------------------------------------------------------------------------------
Case #Actor_Publish
; S1 Service *Data
If S1 <> #Null$
If FindMapElement(Internal\ServiceListing(), S1) <> #Null
Protected ServiceActor.s = Internal\ServiceListing()\Actor
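ForEach Internal\Actors(ServiceActor)\Services(S1)\Subscribers()
Protected ReceiverActor.s = Internal\Actors(ServiceActor)\Services(S1)\Subscribers()\Actor
Protected ReceiverState.s = Internal\Actors(ServiceActor)\Services(S1)\Subscribers()\State
If FindMapElement(Internal\Actors(), ReceiverActor) <> #Null
FirstElement(Internal\Actors()\Queue())
Protected *QueuelistPublish.State = InsertElement(Internal\Actors()\Queue())
;Make sure we can add queue
If *QueuelistPublish <> #Null
Error = 0
*QueuelistPublish\State = ReceiverState
;Are we passing data to the subscribers?
If *Data <> #Null
;Create a new copy for each subscriber, otherwise every GetNextState would free the same pointer
Protected PublishSize.i = MemorySize(*Data)
Protected *PublishCopy = AllocateMemory(PublishSize)
If *PublishCopy <> #Null
CopyMemory(*Data, *PublishCopy, PublishSize)
*QueuelistPublish\Data = *PublishCopy
Else
;Could not allocate a copy of data
Error = 1
EndIf
EndIf
Else
;Could not add queue item
Error = 1
EndIf
Else
;Actor not found. Not a major issue, it could have been removed
EndIf
Next
;On success the original is no longer needed as each subscriber has its own copy.
;On error the caller still owns *Data.
If Error = 0 And *Data <> #Null
FreeMemory(*Data)
EndIf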
Else
;Service not found
Error = 1
EndIf
Else
;Service was Null
Error = 1
EndIf
; ---------------------------------------------------------------------------------
EndSelect
UnlockMutex(Internal\Mutex)
ProcedureReturn Error
EndProcedure
EndModule
;--------------------------------------------
;-Self Test / Basic Example
;--------------------------------------------
CompilerIf #PB_Compiler_IsMainFile
Procedure Main(*Input)
Protected.Actor::State State ; Holds next State information
Protected Actor.s = "Main" ; Easier to Handle the actor name in a string
Actor::Register(Actor, "Idle") ; Registers the Actor "Main" with default state Idle
Actor::AddState(Actor, "Setup") ; Makes sure setup is the first called case
Repeat
Actor::GetNextState(Actor, @State)
Debug "Actor: "+Actor+" State: "+State\State
Select State\State
Case "Setup"
;Setup code here
Debug "Creating Services"
Actor::RegisterService(Actor, "Service1")
Actor::RegisterService(Actor, "Service2")
Case "Idle"
;Idle case, put delay here. Call events case here if handling events.
Delay(500)
Actor::PublishService("Service1")
Actor::PublishService("Service2")
Case "Exit"
;Called to exit actor
Break
Default
;Should never get here; something is wrong. Display message and quit
Actor::AddState_Global("Exit")
Debug "[ERROR] ACTOR: "+ Actor +" STATE: "+ State\State
Break
EndSelect
ForEver
Actor::Remove(Actor)
EndProcedure
Procedure ActorA(*Input)
Protected.Actor::State State
Protected Actor.s = "Actor1"
Actor::Register(Actor, "Idle")
Actor::AddState(Actor, "Setup")
Repeat
Actor::GetNextState(Actor, @State)
Debug "Actor: "+Actor+" State: "+State\State
Select State\State
Case "Setup"
;Setup code here
While Actor::SubscribeService("Service1", Actor, "State1") <> 0
Delay(100)
Debug "Waiting for service"
Wend
Case "Idle"
;Idle case, put delay here. Call events case here if handling events.
Delay(100)
Case "State1"
Debug "ActorA: Service1 MSG"
Case "Exit"
;Called to exit actor
Break
Default
;Should never get here; something is wrong. Display message and quit
Actor::AddState_Global("Exit")
Debug "[ERROR] ACTOR: "+ Actor +" STATE: "+ State\State
Break
EndSelect
ForEver
Actor::Remove(Actor)
EndProcedure
Procedure ActorB(*Input)
Protected.Actor::State State
Protected Actor.s = "Actor2"
Protected Count.i = 0
Actor::Register(Actor, "Idle")
Actor::AddState(Actor, "Setup")
Repeat
Actor::GetNextState(Actor, @State)
Debug "Actor: "+Actor+" State: "+State\State
Select State\State
Case "Setup"
;Setup code here
While Actor::SubscribeService("Service1", Actor, "State1") <> 0
Delay(100)
Debug "Waiting for service"
Wend
While Actor::SubscribeService("Service2", Actor, "State2") <> 0
Delay(100)
Debug "Waiting for service"
Wend
Case "Idle"
;Idle case, put delay here. Call events case here if handling events.
Delay(100)
Case "State1"
Debug "ActorB: Service1 MSG"
Case "State2"
Debug "ActorB: Service2 MSG"
Count + 1
If Count = 10
Actor::AddState_Global("Exit")
EndIf
Case "Exit"
;Called to exit actor
Break
Default
;Should never get here; something is wrong. Display message and quit
Actor::AddState_Global("Exit")
Debug "[ERROR] ACTOR: "+ Actor +" STATE: "+ State\State
Break
EndSelect
ForEver
Actor::Remove(Actor)
EndProcedure
Actor::InitActor()
CreateThread(@ActorA(), 0)
CreateThread(@ActorB(), 0)
Main(0)
Delay(500)
CompilerEndIf