with Unchecked_Deallocation;
with Ada.Streams;
with System.RPC.Net_Trace;
with System.RPC.Garlic;
with System.RPC.Streams;
pragma Elaborate (System.RPC.Garlic);
--  Body of System.RPC.  The code below performs remote calls through the
--  Garlic layer (System.RPC.Garlic) and builds/reads message headers with
--  the helpers of System.RPC.Streams.
package body System.RPC is
--  "use type" clauses: only the primitive operators of these types become
--  directly visible; the type names themselves are not made use-visible.
use type Ada.Streams.Stream_Element_Count;
use type Ada.Streams.Stream_Element_Offset;
use type Garlic.Protocol_Access;
use type Garlic.Lock_Method;
--  Largest magnitude a message identifier may take.
Max_Of_Message_Id : constant := 127;
--  Signed identifier written into every message header.  In this body a
--  positive value identifies a request, 0 is what Do_APC sends (the
--  anonymous task sends no reply for it) and a negative value is treated
--  as a reply by task Environnement.
subtype Message_Id_Type is
Integer range -Max_Of_Message_Id .. Max_Of_Message_Id;
--  Identifiers handed out by Dispatcher.New_Request.
subtype Request_Id_Type is Message_Id_Type range 1 .. Max_Of_Message_Id;
--  One length value per request identifier.
--  NOTE(review): not referenced in the visible part of this body.
type Message_Length_Per_Request is array (Request_Id_Type)
of Ada.Streams.Stream_Element_Count;
--  Discriminant used for every header stream: the sum of the two initial
--  sizes reported by Streams (presumably the room needed for the id and
--  the length field of a header -- TODO confirm against Streams).
Header_Size : Ada.Streams.Stream_Element_Count :=
Streams.Get_Integer_Initial_Size +
Streams.Get_SEC_Initial_Size;
--  Raised by Next_Node when asked to advance from a null node.
Stream_Error : exception;
--  Receiver stored by Establish_RPC_Receiver; it is called to execute
--  both local calls and calls received from other partitions.
Partition_RPC_Receiver : RPC_Receiver;
--  Incomplete declaration so that the task type below can refer to the
--  node that owns it; the full type follows the task declarations.
type Anonymous_Task_Node;
type Anonymous_Task_Node_Access is access Anonymous_Task_Node;
--  Task type serving one incoming request at a time.  Self designates
--  the node holding this task; the task body passes it back to
--  Garbage_Collector.Deallocate once a request has been served.
task type Anonymous_Task_Type (Self : Anonymous_Task_Node_Access) is
--  Hands the task the data of the request it has to serve: the message
--  id, the calling partition, the sizes used for the parameter and
--  result streams, and the protocol to communicate through.
entry Start
(Message_Id : in Message_Id_Type;
Partition : in Partition_ID;
Params_Size : in Ada.Streams.Stream_Element_Count;
Result_Size : in Ada.Streams.Stream_Element_Count;
Protocol : in Garlic.Protocol_Access);
end Anonymous_Task_Type;
type Anonymous_Task_Access is access Anonymous_Task_Type;
--  Head/tail pair over a chain of nodes.
--  NOTE(review): not referenced in the visible part of this body.
type Anonymous_Task_List is record
Head : Anonymous_Task_Node_Access;
Tail : Anonymous_Task_Node_Access;
end record;
--  Singly linked node: the task it owns and the next node in the chain.
type Anonymous_Task_Node is record
Element : Anonymous_Task_Access;
Next : Anonymous_Task_Node_Access;
end record;
--  Protected pool of anonymous task nodes available for reuse.
protected Garbage_Collector is
--  Returns a node in Item: one taken from the pool when it is not empty,
--  otherwise a newly created node together with a new task.
procedure Allocate
(Item : out Anonymous_Task_Node_Access);
--  Puts Item back at the front of the pool.
procedure Deallocate
(Item : in out Anonymous_Task_Node_Access);
private
--  First node of the chain of reusable nodes (null when empty).
Anonymous_List : Anonymous_Task_Node_Access;
end Garbage_Collector;
--  Task that hands out request identifiers and passes the length of a
--  received reply to the caller waiting for it.
task Dispatcher is
--  Returns the next request identifier.
entry New_Request (Request : out Request_Id_Type);
--  Entry family indexed by request id: the caller of a remote call
--  queues here until its reply arrives and then gets the reply length.
entry Wait_On (Request_Id_Type)
(Length : out Ada.Streams.Stream_Element_Count);
--  Called by task Environnement when a reply for Request has been
--  received; Length is the size announced in the reply header.
entry Wake_Up
(Request : in Request_Id_Type;
Length : in Ada.Streams.Stream_Element_Count);
end Dispatcher;
--  Receiving task of the partition: reads each incoming message header
--  and routes the message (see the task body).
task Environnement is
--  Called at the end of this package's elaboration; the task does not
--  start receiving before this rendezvous.
entry Start;
end Environnement;
--  Latch recording whether Establish_RPC_Receiver has been called.
protected Partition_Receiver is
--  Blocks the caller until Set has been called.
entry Is_Set;
--  Opens the latch; it is never closed again in this body.
procedure Set;
private
Was_Set : Boolean := False;
end Partition_Receiver;
--  Trace categories used by this package; each one is bound to a
--  one-letter name in the package's elaboration statements.
type Debug_Level is
(D_Elaborate, D_Communication, D_Debug, D_Exception);
--  Trace facility instantiated for this package with the banner "RPC : ".
package Debugging is new System.RPC.Net_Trace (Debug_Level, "RPC : ");
--  Short name for Debugging.Debug, used throughout this body.
procedure D
(Flag : in Debug_Level; Info : in String) renames Debugging.Debug;
--  Implementation of the latch guarding Partition_RPC_Receiver.
protected body Partition_Receiver is

   --  The entry does no work of its own: the barrier alone holds callers
   --  back until Set has been called.
   entry Is_Set when Was_Set is
   begin
      null;
   end Is_Set;

   --  Open the barrier of Is_Set.
   procedure Set is
   begin
      Was_Set := True;
   end Set;

end Partition_Receiver;
--  Return in Index the first packet node of Stream (Stream.Extra.Head).
--  Any exception is traced under D_Exception and then re-raised.
procedure Head_Node
  (Index  : out Packet_Node_Access;
   Stream : Params_Stream_Type)
is
begin
   Index := Stream.Extra.Head;

exception
   when others =>
      D (D_Exception, "exception in Head_Node");
      raise;
end Head_Node;
--  Return in Index the last packet node of Stream (Stream.Extra.Tail).
--  Any exception is traced under D_Exception and then re-raised.
procedure Tail_Node
  (Index  : out Packet_Node_Access;
   Stream : Params_Stream_Type)
is
begin
   Index := Stream.Extra.Tail;

exception
   when others =>
      D (D_Exception, "exception in Tail_Node");
      raise;
end Tail_Node;
--  True when Index designates no packet node.
--  Any exception is traced under D_Exception and then re-raised.
function Null_Node (Index : in Packet_Node_Access) return Boolean is
begin
   return Index = null;

exception
   when others =>
      D (D_Exception, "exception in Null_Node");
      raise;
end Null_Node;
--  Remove the first packet node of Stream and release its storage.
--  When the removed node was the only one, the tail is reset to null as
--  well so that the list is left empty.
--
--  Stream : stream whose node list (Stream.Extra) is shortened.
--
--  Any exception, including the Constraint_Error raised when the list
--  has no head node, is traced under D_Exception and then re-raised.
procedure Delete_Head_Node (Stream : in out Params_Stream_Type) is
   procedure Free is
     new Unchecked_Deallocation (Packet_Node, Packet_Node_Access);

   --  Node following the head; becomes the new head.
   Successor : Packet_Node_Access;
begin
   --  The head is dereferenced here, inside the handled statements, and
   --  not in the declarative part: an exception raised while elaborating
   --  declarations is not covered by this procedure's own handler, so a
   --  failure on an empty list would otherwise go untraced.
   Successor := Stream.Extra.Head.Next;

   Free (Stream.Extra.Head);
   Stream.Extra.Head := Successor;

   if Stream.Extra.Head = null then
      Stream.Extra.Tail := null;
   end if;

exception
   when others =>
      D (D_Exception, "exception in Delete_Head_Node");
      raise;
end Delete_Head_Node;
--  Advance Node to its successor in the packet list.
--  Raises Stream_Error when Node is null on entry.  Any exception is
--  traced under D_Exception and then re-raised.
procedure Next_Node (Node : in out Packet_Node_Access) is
begin
   if Node /= null then
      Node := Node.Next;
   else
      raise Stream_Error;
   end if;

exception
   when others =>
      D (D_Exception, "exception in Next_Node");
      raise;
end Next_Node;
--  Append a freshly allocated packet node at the end of Stream's list.
--  On an empty list the new node becomes both head and tail.
--  Any exception is traced under D_Exception and then re-raised.
procedure Append_New_Node (Stream : in out Params_Stream_Type) is
   Last_Node : Packet_Node_Access;
   Fresh     : Packet_Node_Access;
   Was_Empty : Boolean;
begin
   Tail_Node (Last_Node, Stream);
   Was_Empty := Null_Node (Last_Node);

   Fresh := new Packet_Node;

   if Was_Empty then
      Stream.Extra.Head := Fresh;
   else
      Stream.Extra.Tail.Next := Fresh;
   end if;

   --  In both cases the new node is now the last one.
   Stream.Extra.Tail := Fresh;

exception
   when others =>
      D (D_Exception, "exception in Append_New_Node");
      raise;
end Append_New_Node;
--  Read is implemented by System.RPC.Streams.Read (renaming).
--  NOTE(review): presumably the completion of the Read primitive declared
--  for Params_Stream_Type in the package specification -- confirm.
procedure Read
(Stream : in out Params_Stream_Type;
Item : out Ada.Streams.Stream_Element_Array;
Last : out Ada.Streams.Stream_Element_Offset)
renames System.RPC.Streams.Read;
--  Write is implemented by System.RPC.Streams.Write (renaming).
--  NOTE(review): presumably the completion of the Write primitive declared
--  for Params_Stream_Type in the package specification -- confirm.
procedure Write
(Stream : in out Params_Stream_Type;
Item : in Ada.Streams.Stream_Element_Array)
renames System.RPC.Streams.Write;
--  Pool of anonymous task nodes kept as a singly linked free list whose
--  first element is Anonymous_List.
protected body Garbage_Collector is

   --  Hand out a node: reuse the first pooled one when available,
   --  otherwise create a node and the task attached to it.
   --  Any exception is traced under D_Exception and then re-raised.
   --
   --  NOTE(review): the "else" branch creates a task inside a protected
   --  action; the Ada RM (9.5.1) lists task creation among the
   --  potentially blocking operations.  Left unchanged here because
   --  moving it would alter the protected interface.
   procedure Allocate (Item : out Anonymous_Task_Node_Access) is
      Node   : Anonymous_Task_Node_Access;
      Worker : Anonymous_Task_Access;
   begin
      if Anonymous_List /= null then
         --  Unlink the first pooled node.
         Node           := Anonymous_List;
         Anonymous_List := Anonymous_List.Next;
         Node.Next      := null;
      else
         --  The task receives its own node as discriminant so that it
         --  can give the node back once its work is done.
         Node     := new Anonymous_Task_Node;
         Worker   := new Anonymous_Task_Type (Node);
         Node.all := (Element => Worker, Next => null);
      end if;

      Item := Node;

   exception
      when others =>
         D (D_Exception, "exception in Allocate (Anonymous Task)");
         raise;
   end Allocate;

   --  Push Item onto the front of the free list.
   --  Any exception is traced under D_Exception and then re-raised.
   procedure Deallocate (Item : in out Anonymous_Task_Node_Access) is
   begin
      Item.Next      := Anonymous_List;
      Anonymous_List := Item;

   exception
      when others =>
         D (D_Exception, "exception in Deallocate (Anonymous Task)");
         raise;
   end Deallocate;

end Garbage_Collector;
--  Perform a synchronous remote procedure call.
--
--  Partition : partition that must execute the call.
--  Params    : marshalled parameters of the call.
--  Result    : stream that receives the marshalled result.
--
--  When Partition is the local partition, the registered receiver is
--  called directly (after waiting until one has been registered).
--  Otherwise a header (request id, Result.Initial_Size) and the
--  parameters are sent, the caller is suspended on the dispatcher until
--  the reply arrives, and the reply is then read into Result.
--  Any exception is traced under D_Exception and then re-raised.
procedure Do_RPC
  (Partition : Partition_ID;
   Params    : access Params_Stream_Type;
   Result    : access Params_Stream_Type)
is
   Protocol     : Protocol_Access;
   Request      : Request_Id_Type;
   Header       : aliased Params_Stream_Type (Header_Size);
   Reply_Length : Ada.Streams.Stream_Element_Count;
begin
   if Partition = Garlic.Get_My_Partition_ID then

      --  Local call: no communication involved.
      Partition_Receiver.Is_Set;
      D (D_Debug, "Do_RPC - Locally");
      Partition_RPC_Receiver.all (Params, Result);

   else

      Dispatcher.New_Request (Request);

      --  Header = request id followed by the initial size of Result.
      D (D_Debug, "Do_RPC - Build header");
      Streams.Allocate (Header);
      Streams.Integer_Write_Attribute (Header'Access, Request);
      Streams.SEC_Write_Attribute (Header'Access, Result.Initial_Size);

      --  The announced size covers header and parameters together.
      D (D_Communication,
         "Do_RPC - Lookup for protocol to talk to partition" &
         Partition_ID'Image (Partition));
      Garlic.Initiate_Send
        (Partition,
         Streams.Get_Stream_Size (Header'Access) +
           Streams.Get_Stream_Size (Params),
         Protocol,
         Garlic.Remote_Call);

      D (D_Communication,
         "Do_RPC - Send Header to partition" &
         Partition_ID'Image (Partition));
      Garlic.Send (Protocol.all, Partition, Header'Access);
      Streams.Deallocate (Header);

      D (D_Communication,
         "Do_RPC - Send Params to partition" &
         Partition_ID'Image (Partition));
      Garlic.Send (Protocol.all, Partition, Params);
      Garlic.Complete_Send (Protocol.all, Partition);

      --  Wait until the dispatcher is told that the reply to this
      --  request has arrived; it also delivers the reply length.
      D (D_Debug, "Do_RPC - Suspend");
      Dispatcher.Wait_On (Request) (Reply_Length);
      D (D_Debug, "Do_RPC - Resume");

      declare
         Reply : aliased Params_Stream_Type (Reply_Length);
      begin
         Streams.Allocate (Reply);

         D (D_Communication,
            "Do_RPC - Receive Result from partition" &
            Partition_ID'Image (Partition));
         Garlic.Receive (Protocol.all, Partition, Reply'Access);
         Garlic.Complete_Receive (Protocol.all, Partition);

         --  Result takes over the Initial component of the received
         --  stream; Reply itself is not deallocated here (presumably
         --  because its contents now belong to Result -- confirm).
         D (D_Debug, "Do_RPC - Reconstruct Result");
         Streams.Deallocate (Result.all);
         Result.Initial := Reply.Initial;
         Streams.Dump ("|||", Result.all);
      end;

   end if;

exception
   when others =>
      D (D_Exception, "exception in Do_RPC");
      raise;
end Do_RPC;
--  Perform an asynchronous procedure call: the parameters are sent (or
--  the receiver is called locally) and no result is waited for.
--
--  Partition : partition that must execute the call.
--  Params    : marshalled parameters of the call.
--
--  The header carries message id 0 and a result size of 0.
--  Any exception is traced under D_Exception and then re-raised.
procedure Do_APC
  (Partition : Partition_ID;
   Params    : access Params_Stream_Type)
is
   --  Id 0: the serving task sends no reply for it.
   Message_Id : constant Message_Id_Type := 0;
   Protocol   : Protocol_Access;
   Header     : aliased Params_Stream_Type (Header_Size);
begin
   if Partition = Garlic.Get_My_Partition_ID then

      --  Local call: run the receiver with a throw-away result stream.
      declare
         Result : aliased Params_Stream_Type (0);
      begin
         Partition_Receiver.Is_Set;
         D (D_Debug, "Do_APC - Locally");
         Partition_RPC_Receiver.all (Params, Result'Access);
      end;

   else

      D (D_Debug, "Do_APC - Build Header");
      Streams.Allocate (Header);
      Streams.Integer_Write_Attribute (Header'Access, Integer (Message_Id));
      Streams.SEC_Write_Attribute (Header'Access, 0);

      --  The announced size covers header and parameters together.
      D (D_Communication,
         "Do_APC - Lookup for protocol to talk to partition" &
         Partition_ID'Image (Partition));
      Garlic.Initiate_Send
        (Partition,
         Streams.Get_Stream_Size (Header'Access) +
           Streams.Get_Stream_Size (Params),
         Protocol,
         Garlic.Remote_Call);

      D (D_Communication,
         "Do_APC - Send Header to partition" &
         Partition_ID'Image (Partition));
      Garlic.Send (Protocol.all, Partition, Header'Access);
      Streams.Deallocate (Header);

      D (D_Communication,
         "Do_APC - Send Params to partition" &
         Partition_ID'Image (Partition));
      Garlic.Send (Protocol.all, Partition, Params);
      Garlic.Complete_Send (Protocol.all, Partition);

   end if;

exception
   when others =>
      D (D_Exception, "exception in Do_APC");
      raise;
end Do_APC;
--  Register Receiver as the routine executing calls on this partition
--  and release every task blocked on Partition_Receiver.Is_Set.
--
--  Partition : not used by this implementation.
--  Receiver  : routine stored in Partition_RPC_Receiver.
--
--  Any exception is traced under D_Exception and then re-raised.
procedure Establish_RPC_Receiver
  (Partition : in Partition_ID;
   Receiver  : in RPC_Receiver)
is
begin
   --  Store the receiver before opening the latch so that released
   --  callers find it in place.
   Partition_RPC_Receiver := Receiver;
   Partition_Receiver.Set;

   D (D_Elaborate, "Partition_Receiver is set");

exception
   when others =>
      D (D_Exception, "exception in Establish_RPC_Receiver");
      raise;
end Establish_RPC_Receiver;
--  Server loop of the dispatcher.  Each iteration either hands out a
--  request identifier, or records an incoming reply and passes its length
--  to the caller queued on the matching member of the Wait_On family.
--  The task terminates through the "terminate" alternative; any exception
--  is traced under D_Exception and then re-raised.
task body Dispatcher is
   --  Identifier returned by the next New_Request.
   Next_Id : Request_Id_Type := Request_Id_Type'First;

   --  Request and length recorded by the latest Wake_Up.
   Woken_Id     : Request_Id_Type := Request_Id_Type'First;
   Woken_Length : Ada.Streams.Stream_Element_Count;
begin
   loop
      select
         accept New_Request (Request : out Request_Id_Type) do
            Request := Next_Id;

            --  Identifiers are handed out cyclically.
            if Next_Id /= Request_Id_Type'Last then
               Next_Id := Next_Id + 1;
            else
               Next_Id := Request_Id_Type'First;
            end if;
         end New_Request;

      or
         accept Wake_Up
           (Request : Request_Id_Type;
            Length  : Ada.Streams.Stream_Element_Count)
         do
            Woken_Id     := Request;
            Woken_Length := Length;
         end Wake_Up;

         --  Serve only the caller waiting for this request.  If none
         --  shows up within one second the recorded length is dropped
         --  and the main loop resumes.
         select
            accept Wait_On (Woken_Id)
              (Length : out Ada.Streams.Stream_Element_Count)
            do
               Length := Woken_Length;
            end Wait_On;
         or
            delay 1.0;
         end select;

      or
         terminate;
      end select;
   end loop;

exception
   when others =>
      D (D_Exception, "exception in Dispatcher body");
      raise;
end Dispatcher;
--  Body of the tasks serving incoming calls.  Each iteration waits for
--  Start, receives the parameters, runs the partition's receiver, sends
--  the result back unless the message id is 0, and finally returns the
--  task's node to Garbage_Collector.
--  Any exception is traced under D_Exception and then re-raised, which
--  ends the task.
task body Anonymous_Task_Type is
   --  Variable copy of the discriminant: Garbage_Collector.Deallocate
   --  takes an "in out" parameter.
   My_Node : Anonymous_Task_Node_Access := Self;

   --  Data of the request being served, copied during the rendezvous.
   Call_Id       : Message_Id_Type;
   Caller        : Partition_ID;
   Params_Length : Ada.Streams.Stream_Element_Count;
   Result_Length : Ada.Streams.Stream_Element_Count;
   Link          : Protocol_Access;
begin
   loop
      select
         accept Start
           (Message_Id  : in Message_Id_Type;
            Partition   : in Partition_ID;
            Params_Size : in Ada.Streams.Stream_Element_Count;
            Result_Size : in Ada.Streams.Stream_Element_Count;
            Protocol    : in Protocol_Access)
         do
            Call_Id       := Message_Id;
            Caller        := Partition;
            Params_Length := Params_Size;
            Result_Length := Result_Size;
            Link          := Protocol;
         end Start;
      or
         terminate;
      end select;

      declare
         Args         : aliased Params_Stream_Type (Params_Length);
         Reply        : aliased Params_Stream_Type (Result_Length);
         Reply_Header : aliased Params_Stream_Type (Header_Size);
      begin
         D (D_Communication,
            "Anonymous Task - Receive Params from partition" &
            Partition_ID'Image (Caller));
         Garlic.Receive (Link.all, Caller, Args'Access);
         Garlic.Complete_Receive (Link.all, Caller);

         --  Do not call the receiver before one has been registered.
         Partition_Receiver.Is_Set;
         D (D_Debug,
            "Anonymous Task - Perform Partition_RPC_Receiver for request" &
            Message_Id_Type'Image (Call_Id));
         Partition_RPC_Receiver.all (Args'Access, Reply'Access);

         if Call_Id /= 0 then
            --  A reply carries the negated request id and the size of
            --  the result stream.
            D (D_Debug, "Anonymous Task - Build Header");
            Streams.Allocate (Reply_Header);
            Streams.Integer_Write_Attribute
              (Reply_Header'Access, Integer (-Call_Id));
            Streams.SEC_Write_Attribute
              (Reply_Header'Access,
               Streams.Get_Stream_Size (Reply'Access));

            D (D_Communication,
               "Anonymous Task - Lookup for protocol talk to partition" &
               Partition_ID'Image (Caller));
            Garlic.Initiate_Send
              (Caller,
               Streams.Get_Stream_Size (Reply_Header'Access) +
                 Streams.Get_Stream_Size (Reply'Access),
               Link,
               Garlic.Remote_Call);

            D (D_Communication,
               "Anonymous Task - Send Header to partition" &
               Partition_ID'Image (Caller));
            Garlic.Send (Link.all, Caller, Reply_Header'Access);

            D (D_Communication,
               "Anonymous Task - Send Result to partition" &
               Partition_ID'Image (Caller));
            Garlic.Send (Link.all, Caller, Reply'Access);
            Garlic.Complete_Send (Link.all, Caller);

            Streams.Deallocate (Reply_Header);
         end if;

         Streams.Deallocate (Args);
         Streams.Deallocate (Reply);
      end;

      --  Request served: make this task available again.
      Garbage_Collector.Deallocate (My_Node);
   end loop;

exception
   when others =>
      D (D_Exception, "exception in Anonymous_Task_Type body");
      raise;
end Anonymous_Task_Type;
--  Receiving loop of the partition.  After the Start rendezvous, each
--  iteration receives one message header and then either wakes up the
--  caller waiting for a reply (negative id) or hands the request to an
--  anonymous task (id >= 0).
--  Raises Program_Error when data remains in the header after both of
--  its fields have been read.  Any exception is traced under D_Exception
--  and then re-raised, which ends the task.
task body Environnement is
   Sender      : Partition_ID;
   Remaining   : Ada.Streams.Stream_Element_Count;
   Result_Size : Ada.Streams.Stream_Element_Count;
   Message_Id  : Message_Id_Type;
   Header      : aliased Params_Stream_Type (Header_Size);
   Protocol    : Protocol_Access;
   Worker_Node : Anonymous_Task_Node_Access;
begin
   accept Start;
   D (D_Elaborate, "Environment task elaborated");

   loop
      Streams.Allocate (Header);

      --  Initiate_Send's counterpart: yields the sending partition, the
      --  announced message size and the protocol to read from.
      Garlic.Initiate_Receive
        (Sender,
         Remaining,
         Protocol,
         Garlic.Remote_Call);
      D (D_Communication,
         "Environment task - Receive protocol to talk to active partition" &
         Partition_ID'Image (Sender));

      D (D_Communication,
         "Environment task - Receive Header from partition" &
         Partition_ID'Image (Sender));
      Garlic.Receive (Protocol.all, Sender, Header'Access);

      --  What is left of the message after the header.
      Remaining := Remaining - Streams.Get_Stream_Size (Header'Access);

      Streams.Integer_Read_Attribute (Header'Access, Message_Id);
      Streams.SEC_Read_Attribute (Header'Access, Result_Size);

      --  Both fields read: the header must now be exhausted.
      if Streams.Get_Stream_Size (Header'Access) /= 0 then
         D (D_Exception, "Header is not empty");
         raise Program_Error;
      end if;

      if Message_Id >= 0 then
         --  Request: a pooled task receives and executes it.
         D (D_Debug,
            "Environment Task - Receive Request from partition" &
            Partition_ID'Image (Sender));
         Garbage_Collector.Allocate (Worker_Node);
         Worker_Node.Element.Start
           (Message_Id,
            Sender,
            Remaining,
            Result_Size,
            Protocol);
      else
         --  Reply: its id is the negated id of the original request.
         D (D_Debug,
            "Environment Task - Receive Reply from partition" &
            Partition_ID'Image (Sender));
         Dispatcher.Wake_Up (-Message_Id, Result_Size);
      end if;

      Streams.Deallocate (Header);
   end loop;

exception
   when others =>
      D (D_Exception, "exception in Environment");
      raise;
end Environnement;
--  Package elaboration: configure tracing, then release the receiving
--  task.
begin
--  Name of the environment variable handed to the trace package
--  (presumably consulted to decide which categories are traced -- see
--  System.RPC.Net_Trace).
Debugging.Set_Environment_Variable ("RPC");
--  Bind a one-letter name to each trace category.
Debugging.Set_Debugging_Name ("D", D_Debug);
Debugging.Set_Debugging_Name ("E", D_Exception);
Debugging.Set_Debugging_Name ("C", D_Communication);
Debugging.Set_Debugging_Name ("Z", D_Elaborate);
D (D_Elaborate, "To be elaborated");
--  Rendezvous with task Environnement, which starts receiving messages
--  only after this call.
Environnement.Start;
D (D_Elaborate, "ELABORATED");
end System.RPC;