ViewVC Help
View File | Revision Log | Show Annotations | Download File | View Changeset | Root Listing
root/public/ibx/trunk/runtime/nongui/sv5ipc.inc
Revision: 209
Committed: Wed Mar 14 12:48:51 2018 UTC (6 years, 8 months ago) by tony
File size: 19488 byte(s)
Log Message:
Fixes Merged

File Contents

# User Rev Content
1 tony 209 {Used by ISQLMonitor and implements System V IPC}
2    
const
  {Name of the well known file to which the shared memory and semaphore set
   identifiers are written}
  IPCFileName: string = 'FB.SQL.MONITOR1_0';

  {Number of semaphores in the semaphore set, followed by the index of each
   semaphore within the set. Each gate owns a pair of semaphores: the event
   semaphore at the index given here and its mutex at index + 1.}
  cNumberOfSemaphores = 10;
  cMutexSemaphore = 0;
  cMonitorCounter = 1;
  cReadReadyEventSemaphore = 2;
  cReadFinishedEventSemaphore = 4;
  cDataAvailableEventSemaphore = 6;
  cWriterBusyEventSemaphore = 8;
  cDefaultTimeout = 1; { 1 second }

{$IF FPC_FULLVERSION = 30000 }
{Fix regression in FPC 3.0.0 ipc.pp unit. Expected to be fixed in fpc 3.0.2}
{$IF defined(darwin) }
  SEM_GETNCNT = 3; { Return the value of semncnt (READ) }
  SEM_GETPID = 4; { Return the value of sempid (READ) }
  SEM_GETVAL = 5; { Return the value of semval (READ) }
  SEM_GETALL = 6; { Return semvals into arg.array (READ) }
  SEM_GETZCNT = 7; { Return the value of semzcnt (READ) }
  SEM_SETVAL = 8; { Set the value of semval to arg.val (ALTER) }
  SEM_SETALL = 9; { Set semvals from arg.array (ALTER) }
{$ENDIF}
{$ENDIF}
26    
27     {
28     The call to semctl in ipc is broken in FPC release 2.4.2 and earlier. Hence
29     need to replace with a working libc call. semtimedop is not present in 2.4.2 and earlier.
30     }
31    
{Returns the current errno value as reported by fpgetErrno.}
function GetLastErrno: cint;
begin
  GetLastErrno := fpgetErrno;
end;
36    
type
  TGlobalInterface = class;

  {Interprocess Communication Objects. All platform dependent IPC is abstracted
   into this set of objects }

  { TIpcCommon }

  {Common ancestor of the IPC objects. The semaphore set and shared memory
   identifiers are static, i.e. shared by every instance in the process.}
  TIpcCommon = class
  private
    function GetSa: PSecurityAttributes;
  protected
    FInitialiser: boolean; static;   {true when this process must initialise the shared state}
    FSemaphoreSetID: cint; static;   {System V semaphore set identifier}
    FSharedMemoryID: cint; static;   {System V shared memory identifier}
    {set semaphore SemNum to AValue}
    procedure SemInit(SemNum, AValue: cint);
    {read the current value of semaphore SemNum}
    function GetSemValue(SemNum: integer): cint;
    {apply op to semaphore SemNum; SEM_UNDO is always added to flags}
    function sem_op(SemNum, op: integer; flags: cshort = 0): cint;
    {as sem_op, but gives up after timeout_secs where semtimedop is available}
    function sem_timedop(SemNum, op: integer; timeout_secs: integer; flags: cshort = 0): cint;
  public
    property Sa : PSecurityAttributes read GetSa;
  end;
58    
59     { TSharedMemory }
60    
61     {
62     The shared memory segment is used for interprocess communication and
63     holds both a message buffer and a number of shared variables. Shared
64     memory is allocated to each shared variable using the Allocate function.
65     An underlying assumption is that each process using the shared memory
66     calls "Allocate" in the same order and for the same memory sizes.
67    
68     Linux:
69    
70     The Linux implementation uses Linux shared memory. IPC_PRIVATE is used
71     to allocate the memory and the resulting memory id is written to a
72     well known file. By default this is in the current user's home directory,
73     but this can be over-ridden to specify a globally unique filename.
74    
75     Access to the shared memory is restricted to the current user/group.
76     Note that the Linux semaphore set is also created with the shared memory.
77     }
78    
TSharedMemory = class(TIpcCommon)
  private
    FBuffer: PChar;                 {start of the attached segment}
    FBufptr: PChar;                 {start of the next allocation}
    FUnused: integer;               {bytes not yet handed out by Allocate}
    FLastAllocationSize: integer;   {size of the most recent allocation}
    procedure GetSharedMemory(MemSize: integer);
    procedure DropSharedMemory;
  public
    constructor Create(MemSize: integer);
    destructor Destroy; override;
    {Hands out the next Size bytes of the segment; a Size of zero claims
     all of the remaining space.}
    function Allocate(Size: integer): PChar;
    property LastAllocationSize: integer read FLastAllocationSize;
  end;
93    
94     {TMutex}
95    
TMutex = class(TIpcCommon)
  private
    FLockCount: integer;      {nesting depth of Lock calls made through this object}
    FMutexSemaphore: cint;    {index of the semaphore used as the mutex}
  public
    constructor Create(SemNumber: cint);
    procedure Lock;
    procedure Unlock;
  end;
105    
106     { TSingleLockGate }
107    
108     {
109     A single lock gate is either open or closed. When open, any thread can pass
110     through it while, when closed, all threads are blocked as they try to pass
111     through the gate. When the gate is opened, all blocked threads are resumed.
112    
There is an implementation assumption that only one writer thread
(i.e. the thread which locks or unlocks the gate) can have access to
the gate at any one time, i.e. an external Mutex prevents race conditions.
116    
117     Linux:
118    
119     In the Linux implementation, the gate is implemented by a semaphore
and a shared memory integer used as a bi-state variable. When the gate
121     is open, the bi-state variable is non-zero. It is set to zero when closed.
122     Another shared memory integer is used to count the number of waiting
123     threads, and a second semaphore is used to protect access to this.
124    
125     The event semaphore is initialised to zero. When a thread passes through the gate
126     it checks the state. If open, the thread continues. If closed then it
127     increments the count of waiting threads and then decrements the semaphore
128     and hence enters an indefinite wait state.
129    
130     When the gate is locked, the state is set to zero. When unlocked, the state
131     is set to one and the semaphore incremented by the number of waiting threads,
132     which itself is then zeroed.
133    
134     Always initialised to the Unlocked state
135     }
136    
TSingleLockGate = class(TIpcCommon)
  private
    FOwner: TGlobalInterface;
    FSemaphore: cint;             {event semaphore on which blocked threads wait}
    FMutex: cint;                 {semaphore protecting the waiting thread count}
    FSignalledState: PInteger;    {shared memory: non-zero = open, zero = closed}
    FWaitingThreads: PInteger;    {shared memory: number of threads waiting at the gate}
    function GetWaitingThreads: integer;
  public
    constructor Create(SemNum: cint; AOwner: TGlobalInterface);
    procedure PassthroughGate;
    procedure Unlock;
    procedure Lock;
    property WaitingThreads: integer read GetWaitingThreads;
  end;
153    
154     { TMultilockGate }
155    
156     { This type of Gate is used where several reader threads must pass
157     through the gate before it can be opened for a writer thread.
158    
159     The reader threads register their interest by each locking the gate.
160     The writer thread then waits on the locked gate until all the reader
161     threads have separately unlocked the gate.
162    
163     There is an underlying assumption of a single writer. A Mutex must
164     be used to control access to the gate from the writer side if this
165     assumption is invalid.
166    
167     Linux:
168    
169     The Linux implementation uses a single semaphore to implement the gate,
170     which is initialised to 1 (unlocked), and a count of the number of
171     threads that have locked the gate (LockCount). A mutex semaphore
172     protects access to the LockCount. When the gate is locked, the lockcount
173     is incremented and, if the LockCount was originally zero, the semaphore is
174     set to zero (Gate Closed).
175    
Unlocking the gate is the reverse. The LockCount is decremented and, if it
177     reaches zero, the semaphore is set to one (Gate Opened).
178    
179     When a writer passes through the gate, it checks the LockCount, if zero it
180     proceeds to pass through the gate. Otherwise it decrements and waits on the
181     semaphore. When the writer resumes, it increments the semaphore in order
182     to return it to its unlocked state. The wait is a timed wait, as there is
183     a risk that a reader thread may terminate while the gate is locked. If the
184     LockCount is non-zero, it is decremented and the writer returns to wait on
185     the gate.
186    
187     Always initialised to the Unlocked state
188     }
189    
TMultilockGate = class(TIpcCommon)
  private
    FOwner: TGlobalInterface;
    FSemaphore: cint;               {gate semaphore: 1 = open, 0 = closed}
    FMutex: cint;                   {semaphore protecting the lock count}
    FLockCount: PInteger;           {shared memory: number of locks currently held}
    FOnGateTimeout: TNotifyEvent;   {called when a timed wait on the gate expires}
    function GetLockCount: integer;
  public
    constructor Create(SemNum: cint; AOwner: TGlobalInterface);
    procedure Lock;
    procedure Unlock;
    procedure PassthroughGate;
    property LockCount: integer read GetLockCount;
    property OnGateTimeout: TNotifyEvent read FOnGateTimeout write FOnGateTimeout;
  end;
206    
207     { TGlobalInterface }
208    
TGlobalInterface = class(TIpcCommon)
  private
    FSharedMemory: TSharedMemory;
    FWriteLock: TMutex;
    {the gates}
    FReadReadyEvent: TMultiLockGate;
    FReadFinishedEvent: TMultiLockGate;
    FDataAvailableEvent: TSingleLockGate;
    FWriterBusyEvent: TSingleLockGate;
    {pointers into the shared memory segment}
    FTraceDataType: PInteger;
    FBufferSize: PInteger;
    FTimeStamp: PDateTime;
    FBuffer: PChar;
    FMaxBufferSize: integer;    {size of the message buffer at FBuffer}
    function GetMonitorCount: integer;
  public
    constructor Create;
    destructor Destroy; override;
    procedure IncMonitorCount;
    procedure DecMonitorCount;
    {copy a trace object into / out of the shared memory segment}
    procedure SendTrace(TraceObject: TTraceObject);
    procedure ReceiveTrace(TraceObject: TTraceObject);
    property DataAvailableEvent: TSingleLockGate read FDataAvailableEvent;
    property WriterBusyEvent: TSingleLockGate read FWriterBusyEvent;
    property ReadReadyEvent: TMultiLockGate read FReadReadyEvent;
    property ReadFinishedEvent: TMultiLockGate read FReadFinishedEvent;
    property WriteLock: TMutex read FWriteLock;
    property MonitorCount: integer read GetMonitorCount;
    property SharedMemory: TSharedMemory read FSharedMemory;
    property MaxBufferSize: integer read FMaxBufferSize;
  end;
239    
240     { TSharedMemory }
241    
{Obtains the shared memory and semaphore set identifiers.

 The process that manages to create the IPC file exclusively allocates a new
 segment of MemSize bytes and a new semaphore set, and writes both identifiers
 to the file. Every other process reads the identifiers back from the file.
 FInitialiser is set when this process created the file, or when the monitor
 counter semaphore is found to be zero.

 The file descriptor is always closed, short reads and writes are reported
 through IBError, and a creator that fails part way removes whatever it had
 created so that a later attempt can start afresh. This relies on IBError
 raising an exception.}
procedure TSharedMemory.GetSharedMemory(MemSize: integer);
var F: cint;
    CreatedFile: boolean;
    arg: tsemun;
begin
  {Get the Shared Memory and Semaphore IDs from the Global File if it exists
   or create them and the file otherwise }

  CreatedFile := false;
  repeat
    F := fpOpen(IPCFileName, O_WrOnly or O_Creat or O_Excl);
    if F >= 0 then
    begin
      CreatedFile := true;
      FInitialiser := true
    end
    else
    if fpgetErrno = ESysEEXIST {EEXIST} then
    begin
      { looks like it already exists}
      Sleep(100);
      F := fpOpen(IPCFileName,O_RdOnly);
      {ENOENT means it probably just got deleted: go round again and try
       to create it. Anything else is fatal.}
      if (F < 0) and (fpgetErrno <> ESysENOENT {ENOENT}) then
        IBError(ibxeCannotCreateSharedResource,['Error accessing IPC File - ' +
                                           StrError(fpgetErrno)]);
    end
    else
      IBError(ibxeCannotCreateSharedResource,['Error creating IPC File - ' +
                                           StrError(fpgetErrno)]);
  until F >= 0;

  try
    try
      if CreatedFile then
      begin
        {marks "no semaphore set yet" for the clean up code below}
        FSemaphoreSetID := -1;
        FSharedMemoryID := shmget(IPC_PRIVATE,MemSize, IPC_CREAT or
                             S_IRUSR or S_IWUSR or S_IRGRP or S_IWGRP);
        if FSharedMemoryID < 0 then
          IBError(ibxeCannotCreateSharedResource,['Cannot create shared memory segment - ' +
                                               StrError(fpgetErrno)]);

        FSemaphoreSetID := semget(IPC_PRIVATE, cNumberOfSemaphores,IPC_CREAT or
                             S_IRUSR or S_IWUSR or S_IRGRP or S_IWGRP);
        if FSemaphoreSetID < 0 then
          IBError(ibxeCannotCreateSharedResource,['Cannot create shared semaphore set - ' +
                                               StrError(fpgetErrno)]);

        if (fpWrite(F,FSharedMemoryID,sizeof(FSharedMemoryID)) <> sizeof(FSharedMemoryID)) or
           (fpWrite(F,FSemaphoreSetID,sizeof(FSemaphoreSetID)) <> sizeof(FSemaphoreSetID)) then
          IBError(ibxeCannotCreateSharedResource,['Error writing IPC File - ' +
                                               StrError(fpgetErrno)]);
      end
      else
      begin
        {a short read means the file does not (yet) hold both identifiers}
        if (fpRead(F,FSharedMemoryID,sizeof(FSharedMemoryID)) <> sizeof(FSharedMemoryID)) or
           (fpRead(F,FSemaphoreSetID,sizeof(FSemaphoreSetID)) <> sizeof(FSemaphoreSetID)) then
          IBError(ibxeCannotCreateSharedResource,['Error reading IPC File - ' +
                                               StrError(fpgetErrno)]);
        if GetSemValue(cMonitorCounter) = 0 then
        begin
          FInitialiser := true;
          //writeln('Opened file and is initialiser');
        end
      end;
    finally
      fpClose(F);
    end;
  except
    if CreatedFile then
    begin
      {we own the half built resources - remove them before passing on the error}
      if FSharedMemoryID >= 0 then
        shmctl(FSharedMemoryID,IPC_RMID,nil);
      if FSemaphoreSetID >= 0 then
      begin
        arg.val := 0; {not used by IPC_RMID}
        semctl(FSemaphoreSetID,0,IPC_RMID,arg);
      end;
      DeleteFile(IPCFileName);
    end;
    raise;
  end;
end;
301    
{Removes the shared memory segment, the semaphore set and the IPC file when
 no process is attached to the segment any longer. Reports a failure to query
 the segment through IBError.}
procedure TSharedMemory.DropSharedMemory;
var ds: TShmid_ds;
    arg: tsemun;
begin
  if shmctl(FSharedMemoryID,IPC_STAT,@ds) < 0 then
    IBError(ibxeSV5APIError,['Error getting shared memory info - ' + strError(fpgetErrno)]);
  if ds.shm_nattch = 0 then { we are the last one out - so, turn off the lights }
  begin
    arg.val := 0; {not used by IPC_RMID, but avoids passing an uninitialised record}
    shmctl(FSharedMemoryID,IPC_RMID,nil);
    semctl(FSemaphoreSetID,0,IPC_RMID,arg);
    DeleteFile(IPCFileName);
  end;
end;
315    
{Locates or creates the shared memory segment of MemSize bytes, attaches it
 to this process and marks the whole segment as available for allocation.}
constructor TSharedMemory.Create(MemSize: integer);
var Attached: PChar;
begin
  inherited Create;
  FInitialiser := false;
  GetSharedMemory(MemSize);

  Attached := shmat(FSharedMemoryID,nil,0);
  FBuffer := Attached;
  {shmat reports failure by returning -1 cast to a pointer}
  if PtrInt(Attached) = -1 then
    IBError(ibxeCannotCreateSharedResource,[StrError(Errno)]);

  FBufPtr := Attached;
  FUnused := MemSize;
end;
327    
{Detaches the segment from this process and then removes the shared
 resources if no other process is still attached.}
destructor TSharedMemory.Destroy;
begin
  shmdt(FBuffer);      {detach first so that our own attachment is not counted}
  DropSharedMemory;
  inherited Destroy;
end;
334    
{Hands out the next Size bytes of the shared memory segment and returns a
 pointer to them. A Size of zero claims all of the remaining space, and
 LastAllocationSize then reports how much that was.

 Reports an error through IBError when Size is negative or exceeds the
 space that is left. A negative Size would otherwise pass the space check,
 grow the unused count and move the allocation pointer backwards.}
function TSharedMemory.Allocate(Size: integer): PChar;
begin
  if Size < 0 then
    IBError(ibxeCannotCreateSharedResource, ['Invalid shared memory allocation size']);
  if Size > FUnused then
    IBError(ibxeCannotCreateSharedResource, ['Not enough shared memory']);
  Result := FBufPtr;

  if Size = 0 then
  begin
    {all remaining}
    FLastAllocationSize := FUnused;
    FUnused := 0
  end
  else
  begin
    FLastAllocationSize := Size;
    Dec(FUnused,Size);
  end;
  Inc(FBufPtr,Size)
end;
353    
354     { TIpcCommon }
355    
{Getter for the Sa property: always nil in this implementation.}
function TIpcCommon.GetSa: PSecurityAttributes;
begin
  GetSa := nil;
end;
360    
{Applies op to semaphore SemNum of the shared semaphore set and returns the
 result of semop. SEM_UNDO is always added to flags.

 The call is repeated when it is interrupted by a signal (EINTR). Callers
 ignore the result, so an interrupted wait would otherwise be treated as if
 the semaphore operation had completed.}
function TIpcCommon.sem_op(SemNum, op: integer; flags: cshort): cint;
var sembuf: TSEMbuf;
begin
  sembuf.sem_num := SemNum;
  sembuf.sem_op:= op;
  sembuf.sem_flg := flags or SEM_UNDO;
  repeat
    Result := semop(FSemaphoreSetID,@sembuf,1);
  until (Result >= 0) or (fpgetErrno <> ESysEINTR);
end;
369    
{Applies op to semaphore SemNum, waiting for at most timeout_secs seconds
 when HAS_SEMTIMEDOP is defined; otherwise the wait is unbounded. SEM_UNDO
 is always added to flags. Returns the result of the underlying call.

 The call is repeated, with the full timeout, when it is interrupted by a
 signal (EINTR), so that callers which treat a negative result as a timeout
 are not misled by an interruption.}
function TIpcCommon.sem_timedop(SemNum, op: integer; timeout_secs: integer;
  flags: cshort): cint;
var sembuf: TSEMbuf;
    timeout: TimeSpec;
begin
  sembuf.sem_num := SemNum;
  sembuf.sem_op:= op;
  sembuf.sem_flg := flags or SEM_UNDO;
  repeat
    {set up on every pass in case the call updates the timeout}
    timeout.tv_sec := timeout_secs;
    timeout.tv_nsec := 0;
{$IFDEF HAS_SEMTIMEDOP}
    Result := semtimedop(FSemaphoreSetID,@sembuf,1,@timeout);
{$ELSE}
    Result := semop(FSemaphoreSetID,@sembuf,1);    {May hang on race condition}
{$ENDIF}
  until (Result >= 0) or (fpgetErrno <> ESysEINTR);
end;
386    
{Returns the current value of semaphore SemNum; a failure of the underlying
 semctl call is reported through IBError.}
function TIpcCommon.GetSemValue(SemNum: integer): cint;
var Unused: TSEMun;    {required by the call signature only}
begin
  Result := semctl(FSemaphoreSetID,SemNum,SEM_GETVAL,Unused);
  if Result >= 0 then
    Exit;
  IBError(ibxeSV5APIError,['GetSemValue: '+strError(GetLastErrno)]);
end;
394    
{Sets semaphore SemNum of the shared semaphore set to AValue. A failure is
 reported through IBError, naming the semaphore and the system error.}
procedure TIpcCommon.SemInit(SemNum, AValue: cint);
var args :TSEMun;
begin
  //writeln('Initialising ',SemNum,' to ',AValue);
  args.val := AValue;
  if semctl(FSemaphoreSetID,SemNum,SEM_SETVAL,args) < 0 then
    IBError(ibxeCannotCreateSharedResource,['Unable to initialise Semaphore ' +
                                        IntToStr(SemNum) + ' - ' + StrError(GetLastErrno)]);
end;
405    
406     { TMutex }
407    
{Binds the mutex to semaphore SemNumber. The initialising process also sets
 the semaphore to one, i.e. unlocked.}
constructor TMutex.Create(SemNumber: cint);
begin
  inherited Create;
  FMutexSemaphore := SemNumber;
  if not FInitialiser then
    Exit;
  SemInit(FMutexSemaphore,1);
end;
415    
416     { Obtain ownership of the Mutex and prevent other threads from accessing protected resource }
417    
{Takes the mutex. Only the outermost call decrements the semaphore; nested
 calls through the same object just increase the lock count.}
procedure TMutex.Lock;
begin
  if FLockCount = 0 then
    sem_op(FMutexSemaphore,-1);
  FLockCount := FLockCount + 1;
end;
426    
427     {Give up ownership of the Mutex and allow other threads access }
428    
{Undoes one Lock call. The semaphore is incremented again only when the
 outermost lock is given up. Does nothing if the mutex is not locked.}
procedure TMutex.Unlock;
begin
  if FLockCount <> 0 then
  begin
    FLockCount := FLockCount - 1;
    if FLockCount = 0 then
      sem_op(FMutexSemaphore,1);
  end;
end;
438    
439     { TSingleLockGate }
440    
{Getter for WaitingThreads: the waiting thread count held in shared memory.}
function TSingleLockGate.GetWaitingThreads: integer;
begin
  GetWaitingThreads := FWaitingThreads^;
end;
445    
{Creates a gate that uses semaphore SemNum as its event semaphore and
 SemNum + 1 as its mutex, and claims shared memory from AOwner for the gate
 state and the waiting thread count. The initialising process sets the gate
 to the unlocked state.}
constructor TSingleLockGate.Create(SemNum: cint; AOwner: TGlobalInterface);
begin
  inherited Create;
  FOwner := AOwner;
  FSemaphore := SemNum;
  FMutex := SemNum + 1;

  {NOTE(review): the size requested is that of the pointer field rather than
   of the integer it points to. It is left unchanged because every process
   has to make identical allocations.}
  FSignalledState := PInteger(FOwner.SharedMemory.Allocate(sizeof(FSignalledState)));
  FWaitingThreads := PInteger(FOwner.SharedMemory.Allocate(sizeof(FWaitingThreads)));

  if not FInitialiser then
    Exit;

  FSignalledState^ := 1;
  FWaitingThreads^ := 0;
  SemInit(FSemaphore,0);
  SemInit(FMutex,1);
end;
462    
{Returns at once when the gate is open. When it is closed, the caller is
 added to the waiting thread count and then waits on the event semaphore.}
procedure TSingleLockGate.PassthroughGate;
begin
  if FSignalledState^ <> 0 then
    Exit;                                       //Gate is open

  sem_op(FMutex,-1,0);                          //Acquire Mutex
  FWaitingThreads^ := FWaitingThreads^ + 1;
  sem_op(FMutex,1,0);                           //Release Mutex

  sem_op(FSemaphore,-1,0);                      //Enter Wait
end;
475    
{Opens a closed gate: marks it as open and increments the event semaphore
 once for every thread recorded as waiting, then resets that count. Does
 nothing if the gate is already open.

 The semaphore is left alone when no thread is waiting. A semaphore
 operation of zero is not a no-op but a wait for the semaphore to become
 zero, and could block while the mutex is held.}
procedure TSingleLockGate.Unlock;
begin
  if FSignalledState^ = 0 then
  begin
    FSignalledState^ := 1;
    sem_op(FMutex,-1,0); //Acquire Mutex
    //writeln(ClassName + ': Unlocking' ,FSemaphore);
    if FWaitingThreads^ > 0 then
      sem_op(FSemaphore,FWaitingThreads^,0);
    FWaitingThreads^ := 0;
    sem_op(FMutex,1,0); //Release Mutex
  end;
end;
488    
{Closes the gate when its state is one (open): the event semaphore is reset
 to zero and the state is then cleared.}
procedure TSingleLockGate.Lock;
begin
  if FSignalledState^ <> 1 then
    Exit;
  SemInit(FSemaphore,0);
  FSignalledState^ := 0;
end;
498    
499     { TMultilockGate }
500    
{Creates a gate that uses semaphore SemNum as the gate and SemNum + 1 as its
 mutex, and claims shared memory from AOwner for the lock count. The
 initialising process sets the gate to the unlocked state.}
constructor TMultilockGate.Create(SemNum: cint; AOwner: TGlobalInterface);
begin
  inherited Create;
  FOwner := AOwner;
  FSemaphore := SemNum;
  FMutex := SemNum + 1;

  {NOTE(review): the size requested is that of the pointer field rather than
   of the integer it points to. It is left unchanged because every process
   has to make identical allocations.}
  FLockCount := PInteger(FOwner.SharedMemory.Allocate(sizeof(FLockCount)));

  if not FInitialiser then
    Exit;

  FLockCount^ := 0;
  SemInit(FSemaphore,1);
  SemInit(FMutex,1);
end;
515    
{Getter for LockCount: the lock count held in shared memory.}
function TMultilockGate.GetLockCount: integer;
begin
  GetLockCount := FLockCount^;
end;
520    
{Registers one more lock on the gate. The first lock closes the gate by
 setting its semaphore to zero. The mutex is released even if closing the
 gate fails, so that an error does not leave it held.}
procedure TMultilockGate.Lock;
begin
  sem_op(FMutex,-1,0); //Acquire Mutex
  try
    if FLockCount^ = 0 then
    begin
      //writeln(ClassName,': Locking ',FSemaphore);
      SemInit(FSemaphore,0);
    end;
    Inc(FLockCount^);
  finally
    sem_op(FMutex,1,0); //Release Mutex
  end;
end;
532    
{Removes one lock from the gate. When the count reaches zero (or below) the
 gate is opened by setting its semaphore to one and the count is reset to
 zero. The mutex is released even if opening the gate fails, so that an
 error does not leave it held.}
procedure TMultilockGate.Unlock;
begin
  sem_op(FMutex,-1,0); //Acquire Mutex
  try
    Dec(FLockCount^);
    if FLockCount^ <= 0 then
    begin
      //writeln(ClassName,': UnLocking ',FSemaphore);
      SemInit(FSemaphore,1);
      FLockCount^ := 0
    end;
  finally
    sem_op(FMutex,1,0); //Release Mutex
  end;
end;
545    
{Returns at once when no locks are held. Otherwise waits on the gate
 semaphore in steps of cDefaultTimeout seconds. Each time the wait fails
 while locks are still held, one lock is removed and OnGateTimeout is called
 if assigned. After a successful wait the semaphore is incremented again.}
procedure TMultilockGate.PassthroughGate;
begin
  if FLockCount^ <> 0 then
  begin
    while sem_timedop(FSemaphore,-1,cDefaultTimeout) < 0 do
      {looks like we lost a reader}
      if FLockCount^ > 0 then
      begin
        UnLock;
        if assigned(FOnGateTimeout) then
          OnGateTimeout(self)
      end;
    sem_op(FSemaphore,1);      {return the gate to its unlocked state}
  end;
end;
564    
565    
566     { TGlobalInterface }
567    
{Getter for MonitorCount: the current value of the monitor counter semaphore.}
function TGlobalInterface.GetMonitorCount: integer;
begin
  GetMonitorCount := GetSemValue(cMonitorCounter);
end;
572    
{Creates the shared memory segment, the write lock and the four gates, then
 claims shared memory for the trace data type, time stamp, buffer size and
 message buffer. The message buffer takes all the space that remains. The
 initialising process also resets the monitor counter and the buffer size,
 and locks the data available gate.}
constructor TGlobalInterface.Create;
begin
  inherited Create;

  FSharedMemory := TSharedMemory.Create(cMonitorHookSize);
  FWriteLock := TMutex.Create(cMutexSemaphore);

  {the gates claim shared memory as they are created, so this order is
   part of the shared memory layout}
  FDataAvailableEvent := TSingleLockGate.Create(cDataAvailableEventSemaphore,self);
  FWriterBusyEvent := TSingleLockGate.Create(cWriterBusyEventSemaphore,self);
  FReadReadyEvent := TMultiLockGate.Create(cReadReadyEventSemaphore,self);
  FReadFinishedEvent := TMultiLockGate.Create(cReadFinishedEventSemaphore,self);

  if FInitialiser then
    SemInit(cMonitorCounter,0);

  FTraceDataType := PInteger(FSharedMemory.Allocate(sizeof(Integer)));
  FTimeStamp := PDateTime(FSharedMemory.Allocate(sizeof(TDateTime)));
  FBufferSize := PInteger(FSharedMemory.Allocate(sizeof(Integer)));
  FBuffer := FSharedMemory.Allocate(0); //All remaining
  FMaxBufferSize := FSharedMemory.LastAllocationSize;

  if not FInitialiser then
    Exit;

  FBufferSize^ := 0;
  FDataAvailableEvent.Lock;
end;
599    
{Frees the write lock and the gates, and the shared memory last of all.
 Free is safe to call on a reference that was never assigned.}
destructor TGlobalInterface.Destroy;
begin
  FWriteLock.Free;
  FDataAvailableEvent.Free;
  FWriterBusyEvent.Free;
  FReadReadyEvent.Free;
  FReadFinishedEvent.Free;
  FSharedMemory.Free;
  inherited Destroy;
end;
610    
{Adds one to the monitor counter semaphore.}
procedure TGlobalInterface.IncMonitorCount;
const
  cIncrement = 1;
begin
  sem_op(cMonitorCounter,cIncrement);
end;
615    
{Subtracts one from the monitor counter semaphore. IPC_NOWAIT is passed so
 that the call does not wait if the decrement cannot be made at once.}
procedure TGlobalInterface.DecMonitorCount;
const
  cDecrement = -1;
begin
  sem_op(cMonitorCounter,cDecrement,IPC_NOWAIT);
end;
620    
{Copies the data type, time stamp and message of TraceObject into shared
 memory. The message is truncated to MaxBufferSize bytes.

 Nothing is copied for an empty message, as indexing the first character of
 an empty string is not valid.}
procedure TGlobalInterface.SendTrace(TraceObject: TTraceObject);
var Len: integer;
begin
  FTraceDataType^ := Integer(TraceObject.FDataType);
  FTimeStamp^ := TraceObject.FTimeStamp;
  Len := Min(Length(TraceObject.FMsg), MaxBufferSize);
  FBufferSize^ := Len;
  if Len > 0 then
    Move(TraceObject.FMsg[1], FBuffer^, Len);
end;
628    
{Fills TraceObject with the message, data type and time stamp currently held
 in shared memory.

 The message length is read from shared memory, which other processes can
 write to. It is limited to the range 0..MaxBufferSize so that the copy can
 never run past the end of the message buffer.}
procedure TGlobalInterface.ReceiveTrace(TraceObject: TTraceObject);
var Len: integer;
begin
  Len := FBufferSize^;
  if Len < 0 then
    Len := 0
  else
  if Len > MaxBufferSize then
    Len := MaxBufferSize;
  SetString(TraceObject.FMsg, FBuffer, Len);
  TraceObject.FDataType := TTraceFlag(FTraceDataType^);
  TraceObject.FTimeStamp := TDateTime(FTimeStamp^);
end;
635    
636    
637