ViewVC Help
View File | Revision Log | Show Annotations | Download File | View Changeset | Root Listing
root/public/ibx/trunk/runtime/nongui/sv5ipc.inc
(Generate patch)

Comparing ibx/trunk/runtime/nongui/sv5ipc.inc (file contents):
Revision 209 by tony, Wed Mar 14 12:48:51 2018 UTC vs.
Revision 321 by tony, Thu Feb 25 12:10:07 2021 UTC

# Line 1 | Line 1
1 < {Used by ISQLMonitor and implements System V IPC}
1 > {Used by IBIPC and implements System V IPC}
2 >
3 > uses IBMessages, ipc, Errors, baseunix;
4  
5   const
6    IPCFileName: string = 'FB.SQL.MONITOR1_0';
# Line 9 | Line 11 | const
11    cReadFinishedEventSemaphore = 4;
12    cDataAvailableEventSemaphore = 6;
13    cWriterBusyEventSemaphore = 8;
14 <  cDefaultTimeout = 1; { 1 seconds }
14 >  cDefaultTimeout = 10; {seconds }
15  
16   {$IF FPC_FULLVERSION = 30000 }
17   {Fix regression in FPC 3.0.0 ipc.pp unit. Expected to be fixed in fpc 3.0.2}
# Line 24 | Line 26 | SEM_SETALL  = 9;
26   {$ENDIF}
27   {$ENDIF}
28  
27 {
28  The call to semctl in ipc is broken in FPC release 2.4.2 and earlier. Hence
29  need to replace with a working libc call. semtimedop is not present in 2.4.2 and earlier.
30 }
31
29   function GetLastErrno: cint;
30   begin
31    Result := fpgetErrno
32   end;
33  
34   type
38  TGlobalInterface = class;
35    {Interprocess Communication Objects. All platform dependent IPC is abstracted
36     into this set of objects }
37  
38    { TIpcCommon }
39  
40 <  TIpcCommon = class
41 <   private
42 <    function GetSa: PSecurityAttributes;
47 < protected
48 <    FInitialiser: boolean;  static;
49 <    FSemaphoreSetID: cint;  static;
50 <    FSharedMemoryID: cint;  static;
40 >  TIpcCommon = class(TInterfacedObject)
41 >  protected
42 >    class var FSemaphoreSetID: cint;  {Initialised by TSharedMemory. Used by other classes}
43      function sem_op(SemNum, op: integer; flags: cshort = 0): cint;
44      function sem_timedop(SemNum, op: integer; timeout_secs: integer; flags: cshort = 0): cint;
45      function GetSemValue(SemNum: integer): cint;
46      procedure SemInit(SemNum, AValue: cint);
47    public
48 +    function GetSa: PSecurityAttributes;
49      property Sa : PSecurityAttributes read GetSa;
50    end;
51  
# Line 76 | Line 69 | type
69      Note that the Linux semaphore set is also created with the shared memory.
70    }
71  
72 <  TSharedMemory = class(TIpcCommon)
72 >  ISharedMemory = interface
73 >  ['{db77bdd4-233a-4c9c-9212-dd7945e2e57c}']
74 >  function IsInitialiser: boolean;
75 >  function Allocate(Size: integer): PByte;
76 >  function GetLastAllocationSize: integer;
77 >  property LastAllocationSize: integer read GetLastAllocationSize;
78 >  end;
79 >
80 >  TSharedMemory = class(TIpcCommon,ISharedMemory)
81    private
82 <    FBuffer: PChar;
82 >    FSharedMemoryID: cint;
83 >    FBuffer: PByte;
84      FLastAllocationSize: integer;
85      FUnused: integer;
86 <    FBufptr: PChar;
86 >    FBufptr: PByte;
87 >    FIPCFileName: AnsiString;
88 >    FInitialiser: boolean;
89      procedure DropSharedMemory;
90      procedure GetSharedMemory(MemSize: integer);
91    public
92      constructor Create(MemSize: integer);
93      destructor Destroy; override;
94 <    function Allocate(Size: integer): PChar;
95 <    property LastAllocationSize: integer read FLastAllocationSize;
94 >    function Allocate(Size: integer): PByte;
95 >    function GetLastAllocationSize: integer;
96 >    function IsInitialiser: boolean;
97 >    property LastAllocationSize: integer read GetLastAllocationSize;
98    end;
99  
100    {TMutex}
101  
102 <  TMutex = class(TIpcCommon)
102 >  TMutex = class(TIpcCommon,IMutex)
103    private
104      FMutexSemaphore: cint;
105      FLockCount: integer;
106    public
107 <    constructor Create(SemNumber: cint);
107 >    constructor Create(SemNumber: cint; IsInitialiser: boolean);
108      procedure Lock;
109      procedure Unlock;
110    end;
# Line 134 | Line 140 | type
140      Always initialised to the Unlocked state
141    }
142  
143 <  TSingleLockGate = class(TIpcCommon)
143 >  TSingleLockGate = class(TIpcCommon,ISingleLockGate)
144    private
145 <    FOwner: TGlobalInterface;
145 >    FSharedMemory: ISharedMemory;
146      FSemaphore: cint;
147      FMutex: cint;
148      FSignalledState: PInteger;
149      FWaitingThreads: PInteger;
150      function GetWaitingThreads: integer;
151    public
152 <    constructor Create(SemNum: cint; AOwner: TGlobalInterface);
152 >    constructor Create(SemNum: cint; sm: ISharedMemory);
153      property WaitingThreads: integer read GetWaitingThreads;
154    public
155      procedure PassthroughGate;
# Line 187 | Line 193 | type
193      Always initialised to the Unlocked state
194    }
195  
196 <  TMultilockGate = class(TIpcCommon)
196 >  TMultilockGate = class(TIpcCommon,IMultiLockGate)
197    private
198 +    FSharedMemory: ISharedMemory;
199      FOnGateTimeout: TNotifyEvent;
193    FOwner: TGlobalInterface;
200      FSemaphore: cint;
201      FMutex: cint;
202      FLockCount: PInteger;
203      function GetLockCount: integer;
204    public
205 <    constructor Create(SemNum: cint; AOwner: TGlobalInterface);
205 >    constructor Create(SemNum: cint; sm: ISharedMemory);
206      procedure Lock;
207      procedure Unlock;
208      procedure PassthroughGate;
209 +    function GetOnGateTimeout: TNotifyEvent;
210 +    procedure SetOnGateTimeout(AValue: TNotifyEvent);
211      property LockCount: integer read GetLockCount;
212 <    property OnGateTimeout: TNotifyEvent read FOnGateTimeout write FOnGateTimeout;
212 >    property OnGateTimeout: TNotifyEvent read GetOnGateTimeout write SetOnGateTimeout;
213    end;
214  
215 <  { TGlobalInterface }
215 >  { TIPCInterface }
216  
217 <  TGlobalInterface = class(TIpcCommon)
217 >  TIPCInterface = class(TIpcCommon,IIPCInterface)
218    private
219      FMaxBufferSize: integer;
220 <    FSharedMemory: TSharedMemory;
221 <    FWriteLock: TMutex;
222 <    FBuffer: PChar;
220 >    FSharedMemory: ISharedMemory;
221 >    FWriteLock: IMutex;
222 >    FBuffer: PByte;
223      FTraceDataType,
224      FBufferSize: PInteger;
225      FTimeStamp: PDateTime;
226 <    FReadReadyEvent: TMultiLockGate;
227 <    FReadFinishedEvent: TMultiLockGate;
228 <    FDataAvailableEvent: TSingleLockGate;
229 <    FWriterBusyEvent: TSingleLockGate;
230 <    function GetMonitorCount: integer;
226 >    FMsgNumber: PInteger;
227 >    FReadReadyEvent: IMultiLockGate;
228 >    FReadFinishedEvent: IMultiLockGate;
229 >    FDataAvailableEvent: ISingleLockGate;
230 >    FWriterBusyEvent: ISingleLockGate;
231    public
232      constructor Create;
225    destructor Destroy; override;
233      procedure IncMonitorCount;
234      procedure DecMonitorCount;
235      procedure SendTrace(TraceObject: TTraceObject);
236      procedure ReceiveTrace(TraceObject: TTraceObject);
237 <    property DataAvailableEvent: TSingleLockGate read FDataAvailableEvent;
238 <    property WriterBusyEvent: TSingleLockGate read FWriterBusyEvent;
239 <    property ReadReadyEvent: TMultiLockGate read FReadReadyEvent;
240 <    property ReadFinishedEvent: TMultiLockGate read FReadFinishedEvent;
241 <    property WriteLock: TMutex read FWriteLock;
237 >    function GetDataAvailableEvent: ISingleLockGate;
238 >    function GetWriterBusyEvent: ISingleLockGate;
239 >    function GetReadReadyEvent: IMultiLockGate;
240 >    function GetReadFinishedEvent: IMultiLockGate;
241 >    function GetWriteLock: IMutex;
242 >    function GetMonitorCount: integer;
243 >    function GetMaxBufferSize: integer;
244 >    property DataAvailableEvent: ISingleLockGate read GetDataAvailableEvent;
245 >    property WriterBusyEvent: ISingleLockGate read GetWriterBusyEvent;
246 >    property ReadReadyEvent: IMultiLockGate read GetReadReadyEvent;
247 >    property ReadFinishedEvent: IMultiLockGate read GetReadFinishedEvent;
248 >    property WriteLock: IMutex read GetWriteLock;
249      property MonitorCount: integer read GetMonitorCount;
250 <    property SharedMemory: TSharedMemory read FSharedMemory;
237 <    property MaxBufferSize: integer read FMaxBufferSize;
250 >    property MaxBufferSize: integer read GetMaxBufferSize;
251    end;
252  
253   { TSharedMemory }
# Line 242 | Line 255 | type
255   procedure TSharedMemory.GetSharedMemory(MemSize: integer);
256   var F: cint;
257   begin
258 +  if GetEnvironmentVariable('FBSQL_IPCFILENAME') <> '' then
259 +    FIPCFileName := GetEnvironmentVariable('FBSQL_IPCFILENAME')
260 +  else
261 +    FIPCFileName := GetTempDir(true) + IPCFileName + '.' + GetEnvironmentVariable('USER');
262 +
263      {Get the Shared Memory and Semaphore IDs from the Global File if it exists
264       or create them and the file otherwise }
265  
266      repeat
267 <      F := fpOpen(IPCFileName, O_WrOnly or O_Creat or O_Excl);
267 >      F := fpOpen(FIPCFileName, O_WrOnly or O_Creat or O_Excl);
268        if F < 0 then
269        begin
270          if fpgetErrno = ESysEEXIST {EEXIST} then
271          begin
272            { looks like it already exists}
273            Sleep(100);
274 <          F := fpOpen(IPCFileName,O_RdOnly);
274 >          F := fpOpen(FIPCFileName,O_RdOnly);
275            if (F < 0) and (fpgetErrno = ESysENOENT {ENOENT}) then
276              {probably just got deleted }
277            else
# Line 309 | Line 327 | begin
327    begin
328      shmctl(FSharedMemoryID,IPC_RMID,nil);
329      semctl(FSemaphoreSetID,0,IPC_RMID,arg);
330 <    DeleteFile(IPCFileName);
330 >    DeleteFile(FIPCFileName);
331    end;
332   end;
333  
# Line 332 | Line 350 | begin
350    inherited Destroy;
351   end;
352  
353 < function TSharedMemory.Allocate(Size: integer): PChar;
353 > function TSharedMemory.Allocate(Size: integer): PByte;
354   begin
355    if Size > FUnused then
356        IBError(ibxeCannotCreateSharedResource, ['Not enough shared memory']);
# Line 351 | Line 369 | begin
369    Inc(FBufPtr,Size)
370   end;
371  
372 + function TSharedMemory.GetLastAllocationSize: integer;
373 + begin
374 +  Result := FLastAllocationSize;
375 + end;
376 +
377 + function TSharedMemory.IsInitialiser: boolean;
378 + begin
379 +  Result := FInitialiser;
380 + end;
381 +
382   { TIpcCommon }
383  
384   function TIpcCommon.GetSa: PSecurityAttributes;
# Line 377 | Line 405 | begin
405      sembuf.sem_flg := flags or SEM_UNDO;
406      timeout.tv_sec := timeout_secs;
407      timeout.tv_nsec := 0;
408 < {$IFDEF HAS_SEMTIMEDOP}
408 > {$IF declared(semtimedop)}
409      Result := semtimedop(FSemaphoreSetID,@sembuf,1,@timeout);
410   {$ELSE}
411      Result := semop(FSemaphoreSetID,@sembuf,1);    {May hang on race condition}
412 < {$ENDIF}
412 > {$IFEND}
413   end;
414  
415   function TIpcCommon.GetSemValue(SemNum: integer): cint;
# Line 405 | Line 433 | end;
433  
434    { TMutex }
435  
436 < constructor TMutex.Create(SemNumber: cint);
436 > constructor TMutex.Create(SemNumber: cint; IsInitialiser: boolean);
437   begin
438    inherited Create;
439    FMutexSemaphore := SemNumber;
440 <  if FInitialiser then
440 >  if IsInitialiser then
441      SemInit(FMutexSemaphore,1)
442   end;
443  
# Line 443 | Line 471 | begin
471    Result := FWaitingThreads^
472   end;
473  
474 < constructor TSingleLockGate.Create(SemNum: cint; AOwner: TGlobalInterface);
474 > constructor TSingleLockGate.Create(SemNum: cint; sm: ISharedMemory);
475   begin
476    inherited Create;
477 <  FOwner := AOwner;
478 <  FSignalledState := PInteger(FOwner.SharedMemory.Allocate(sizeof(FSignalledState)));
479 <  FWaitingThreads := PInteger(FOwner.SharedMemory.Allocate(sizeof(FWaitingThreads)));
477 >  FSharedMemory := sm;
478 >  FSignalledState := PInteger(FSharedMemory.Allocate(sizeof(FSignalledState)));
479 >  FWaitingThreads := PInteger(FSharedMemory.Allocate(sizeof(FWaitingThreads)));
480    FSemaphore := SemNum;
481    FMutex := SemNum + 1;
482 <  if FInitialiser then
482 >  if FSharedMemory.IsInitialiser then
483    begin
484      FSignalledState^ := 1;
485      FWaitingThreads^ := 0;
# Line 479 | Line 507 | begin
507    begin
508      FSignalledState^ := 1;
509      sem_op(FMutex,-1,0); //Acquire Mutex
510 <    //writeln(ClassName + ': Unlocking' ,FSemaphore);
510 >    {$IFDEF DEBUG}writeln(ClassName + ': Unlocking' ,FSemaphore);{$ENDIF}
511      sem_op(FSemaphore,FWaitingThreads^,0);
512      FWaitingThreads^ := 0;
513      sem_op(FMutex,1,0); //Release Mutex
# Line 490 | Line 518 | procedure TSingleLockGate.Lock;
518   begin
519    if FSignalledState^ = 1 then
520    begin
521 <    //writeln(ClassName + ': Locking Gate ',FSemaphore);
521 >    {$IFDEF DEBUG}writeln(ClassName + ': Locking Gate ',FSemaphore);{$ENDIF}
522      SemInit(FSemaphore,0);
523      FSignalledState^ := 0;
524    end;
# Line 498 | Line 526 | end;
526  
527   { TMultilockGate }
528  
529 < constructor TMultilockGate.Create(SemNum: cint; AOwner: TGlobalInterface);
529 > constructor TMultilockGate.Create(SemNum: cint; sm: ISharedMemory);
530   begin
531    inherited Create;
504  FOwner := AOwner;
532    FSemaphore := SemNum;
533    FMutex := SemNum + 1;
534 <  FLockCount := PInteger(FOwner.SharedMemory.Allocate(sizeof(FLockCount)));
535 <  if FInitialiser then
534 >  FSharedMemory := sm;
535 >  FLockCount := PInteger(FSharedMemory.Allocate(sizeof(FLockCount)));
536 >  if FSharedMemory.IsInitialiser then
537    begin
538      FLockCount^ := 0;
539      SemInit(FSemaphore,1);
# Line 518 | Line 546 | begin
546    Result := FLockCount^
547   end;
548  
549 + function TMultilockGate.GetOnGateTimeout: TNotifyEvent;
550 + begin
551 +  Result := FOnGateTimeout;
552 + end;
553 +
554 + procedure TMultilockGate.SetOnGateTimeout(AValue: TNotifyEvent);
555 + begin
556 +  FOnGateTimeout := AValue;
557 + end;
558 +
559   procedure TMultilockGate.Lock;
560   begin
561      sem_op(FMutex,-1,0); //Acquire Mutex
562      if FLockCount^ = 0 then
563      begin
564 <      //writeln(ClassName,': Locking ',FSemaphore);
564 >      {$IFDEF DEBUG}writeln(ClassName,': Locking ',FSemaphore);{$ENDIF}
565        SemInit(FSemaphore,0);
566      end;
567      Inc(FLockCount^);
# Line 536 | Line 574 | begin
574      Dec(FLockCount^);
575      if FLockCount^ <= 0 then
576      begin
577 <      //writeln(ClassName,': UnLocking ',FSemaphore);
577 >      {$IFDEF DEBUG}writeln(ClassName,': UnLocking ',FSemaphore);{$ENDIF}
578        SemInit(FSemaphore,1);
579        FLockCount^ := 0
580      end;
# Line 547 | Line 585 | procedure TMultilockGate.PassthroughGate
585   begin
586    if FLockCount^ = 0 then
587      Exit;
588 <  //writeln(ClassName,': Waiting on ',FSemaphore);
588 >  {$IFDEF DEBUG}writeln(ClassName,': Waiting on ',FSemaphore);{$ENDIF}
589    while sem_timedop(FSemaphore,-1,cDefaultTimeout) < 0 do
590    {looks like we lost a reader}
591    begin
592 +    {$IFDEF DEBUG}writeln(ClassName,': reader lost timeout');{$ENDIF}
593      if FLockCount^ > 0 then
594      begin
595        UnLock;
# Line 559 | Line 598 | begin
598      end
599    end;
600    sem_op(FSemaphore,1);
601 <  //writeln(ClassName,': Wait done on ',FSemaphore);
601 >  {$IFDEF DEBUG}writeln(ClassName,': Wait done on ',FSemaphore);{$ENDIF}
602   end;
603  
604  
605 < { TGlobalInterface }
605 > { TIPCInterface }
606  
607 < function TGlobalInterface.GetMonitorCount: integer;
607 > function TIPCInterface.GetMonitorCount: integer;
608   begin
609    Result := GetSemValue(cMonitorCounter)
610   end;
611  
612 < constructor TGlobalInterface.Create;
612 > function TIPCInterface.GetMaxBufferSize: integer;
613 > begin
614 >  Result := FMaxBufferSize;
615 > end;
616 >
617 > constructor TIPCInterface.Create;
618   begin
619    inherited Create;
620    FSharedMemory := TSharedMemory.Create(cMonitorHookSize);
621  
622 <  FWriteLock := TMutex.Create(cMutexSemaphore);
622 >  FWriteLock := TMutex.Create(cMutexSemaphore,FSharedMemory.IsInitialiser);
623  
624 <  FDataAvailableEvent := TSingleLockGate.Create(cDataAvailableEventSemaphore,self);
625 <  FWriterBusyEvent := TSingleLockGate.Create(cWriterBusyEventSemaphore,self);
626 <  FReadReadyEvent := TMultiLockGate.Create(cReadReadyEventSemaphore,self);
627 <  FReadFinishedEvent := TMultiLockGate.Create(cReadFinishedEventSemaphore,self);
624 >  FDataAvailableEvent := TSingleLockGate.Create(cDataAvailableEventSemaphore,FSharedMemory);
625 >  FWriterBusyEvent := TSingleLockGate.Create(cWriterBusyEventSemaphore,FSharedMemory);
626 >  FReadReadyEvent := TMultiLockGate.Create(cReadReadyEventSemaphore,FSharedMemory);
627 >  FReadFinishedEvent := TMultiLockGate.Create(cReadFinishedEventSemaphore,FSharedMemory);
628  
629 <  if FInitialiser then
629 >  if FSharedMemory.IsInitialiser then
630      SemInit(cMonitorCounter,0);
631    FTraceDataType := PInteger(FSharedMemory.Allocate(sizeof(Integer)));
632    FTimeStamp := PDateTime(FSharedMemory.Allocate(sizeof(TDateTime)));
633    FBufferSize := PInteger(FSharedMemory.Allocate(sizeof(Integer)));
634 +  FMsgNumber := PInteger(FSharedMemory.Allocate(sizeof(Integer)));
635    FBuffer := FSharedMemory.Allocate(0); //All remaining
636    FMaxBufferSize := FSharedMemory.LastAllocationSize;
637  
638 <  if FInitialiser then
638 >  if FSharedMemory.IsInitialiser then
639    begin
640      FBufferSize^ := 0;
641 <    FDataAvailableEvent.Lock
641 >    FDataAvailableEvent.Lock;
642 >    FMsgNumber^ := 0;
643    end;
644   end;
645  
646 < destructor TGlobalInterface.Destroy;
601 < begin
602 <  if assigned(FWriteLock) then FWriteLock.Free;
603 <  if assigned(FDataAvailableEvent) then FDataAvailableEvent.Free;
604 <  if assigned(FWriterBusyEvent) then FWriterBusyEvent.Free;
605 <  if assigned(FReadReadyEvent) then FReadReadyEvent.Free;
606 <  if assigned(FReadFinishedEvent) then FReadFinishedEvent.Free;
607 <  if assigned(FSharedMemory) then FSharedMemory.Free;
608 <  inherited Destroy;
609 < end;
610 <
611 < procedure TGlobalInterface.IncMonitorCount;
646 > procedure TIPCInterface.IncMonitorCount;
647   begin
648    sem_op(cMonitorCounter,1);
649   end;
650  
651 < procedure TGlobalInterface.DecMonitorCount;
651 > procedure TIPCInterface.DecMonitorCount;
652   begin
653    sem_op(cMonitorCounter,-1,IPC_NOWAIT);
654   end;
655  
656 < procedure TGlobalInterface.SendTrace(TraceObject: TTraceObject);
656 > procedure TIPCInterface.SendTrace(TraceObject: TTraceObject);
657   begin
658    FTraceDataType^ := Integer(TraceObject.FDataType);
659    FTimeStamp^ := TraceObject.FTimeStamp;
660 <  FBufferSize^ := Min(Length(TraceObject.FMsg), MaxBufferSize);
660 >  if Length(TraceObject.FMsg) > MaxBufferSize then
661 >    FBufferSize^ := MaxBufferSize
662 >  else
663 >    FBufferSize^ := Length(TraceObject.FMsg);
664 >  FMsgNumber^ := TraceObject.FMsgNumber;
665    Move(TraceObject.FMsg[1], FBuffer^, FBufferSize^);
666   end;
667  
668 < procedure TGlobalInterface.ReceiveTrace(TraceObject: TTraceObject);
668 > procedure TIPCInterface.ReceiveTrace(TraceObject: TTraceObject);
669   begin
670 <  SetString(TraceObject.FMsg, FBuffer, FBufferSize^);
670 >  SetString(TraceObject.FMsg, PAnsiChar(FBuffer), FBufferSize^);
671    TraceObject.FDataType := TTraceFlag(FTraceDataType^);
672    TraceObject.FTimeStamp := TDateTime(FTimeStamp^);
673 +  TraceObject.FMsgNumber := FMsgNumber^;
674 + end;
675 +
676 + function TIPCInterface.GetDataAvailableEvent: ISingleLockGate;
677 + begin
678 +  Result := FDataAvailableEvent;
679 + end;
680 +
681 + function TIPCInterface.GetWriterBusyEvent: ISingleLockGate;
682 + begin
683 +  Result := FWriterBusyEvent;
684 + end;
685 +
686 + function TIPCInterface.GetReadReadyEvent: IMultiLockGate;
687 + begin
688 +  Result := FReadReadyEvent;
689 + end;
690 +
691 + function TIPCInterface.GetReadFinishedEvent: IMultiLockGate;
692 + begin
693 +  Result := FReadFinishedEvent;
694 + end;
695 +
696 + function TIPCInterface.GetWriteLock: IMutex;
697 + begin
698 +  Result := FWriteLock;
699   end;
700  
701  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines