ViewVC Help
View File | Revision Log | Show Annotations | Download File | View Changeset | Root Listing
root/public/ibx/trunk/runtime/sv5ipc.inc
Revision: 45
Committed: Tue Dec 6 10:33:46 2016 UTC (7 years, 11 months ago) by tony
File size: 19488 byte(s)
Log Message:
Committing updates for Release R2-0-0

File Contents

# Content
1 {Used by ISQLMonitor and implements System V IPC}
2
3 const
4 IPCFileName: string = 'FB.SQL.MONITOR1_0';
5 cNumberOfSemaphores = 10;
6 cMutexSemaphore = 0;
7 cMonitorCounter = 1;
8 cReadReadyEventSemaphore = 2;
9 cReadFinishedEventSemaphore = 4;
10 cDataAvailableEventSemaphore = 6;
11 cWriterBusyEventSemaphore = 8;
12 cDefaultTimeout = 1; { 1 seconds }
13
14 {$IF FPC_FULLVERSION = 30000 }
15 {Fix regression in FPC 3.0.0 ipc.pp unit. Expected to be fixed in fpc 3.0.2}
16 {$IF defined(darwin) }
17 SEM_GETNCNT = 3; { Return the value of sempid (READ) }
18 SEM_GETPID = 4; { Return the value of semval (READ) }
19 SEM_GETVAL = 5; { Return semvals into arg.array (READ) }
20 SEM_GETALL = 6; { Return the value of semzcnt (READ) }
21 SEM_GETZCNT = 7; { Set the value of semval to arg.val (ALTER) }
22 SEM_SETVAL = 8; { Set semvals from arg.array (ALTER) }
23 SEM_SETALL = 9;
24 {$ENDIF}
25 {$ENDIF}
26
27 {
28 The call to semctl in ipc is broken in FPC release 2.4.2 and earlier. Hence
29 need to replace with a working libc call. semtimedop is not present in 2.4.2 and earlier.
30 }
31
32 function GetLastErrno: cint;
33 begin
34 Result := fpgetErrno
35 end;
36
37 type
38 TGlobalInterface = class;
39 {Interprocess Communication Objects. All platform dependent IPC is abstracted
40 into this set of objects }
41
42 { TIpcCommon }
43
44 TIpcCommon = class
45 private
46 function GetSa: PSecurityAttributes;
47 protected
48 FInitialiser: boolean; static;
49 FSemaphoreSetID: cint; static;
50 FSharedMemoryID: cint; static;
51 function sem_op(SemNum, op: integer; flags: cshort = 0): cint;
52 function sem_timedop(SemNum, op: integer; timeout_secs: integer; flags: cshort = 0): cint;
53 function GetSemValue(SemNum: integer): cint;
54 procedure SemInit(SemNum, AValue: cint);
55 public
56 property Sa : PSecurityAttributes read GetSa;
57 end;
58
59 { TSharedMemory }
60
61 {
62 The shared memory segment is used for interprocess communication and
63 holds both a message buffer and a number of shared variables. Shared
64 memory is allocated to each shared variable using the Allocate function.
65 An underlying assumption is that each process using the shared memory
66 calls "Allocate" in the same order and for the same memory sizes.
67
68 Linux:
69
70 The Linux implementation uses Linux shared memory. IPC_PRIVATE is used
71 to allocate the memory and the resulting memory id is written to a
72 well known file. By default this is in the current user's home directory,
73 but this can be over-ridden to specify a globally unique filename.
74
75 Access to the shared memory is restricted to the current user/group.
76 Note that the Linux semaphore set is also created with the shared memory.
77 }
78
79 TSharedMemory = class(TIpcCommon)
80 private
81 FBuffer: PChar;
82 FLastAllocationSize: integer;
83 FUnused: integer;
84 FBufptr: PChar;
85 procedure DropSharedMemory;
86 procedure GetSharedMemory(MemSize: integer);
87 public
88 constructor Create(MemSize: integer);
89 destructor Destroy; override;
90 function Allocate(Size: integer): PChar;
91 property LastAllocationSize: integer read FLastAllocationSize;
92 end;
93
94 {TMutex}
95
96 TMutex = class(TIpcCommon)
97 private
98 FMutexSemaphore: cint;
99 FLockCount: integer;
100 public
101 constructor Create(SemNumber: cint);
102 procedure Lock;
103 procedure Unlock;
104 end;
105
106 { TSingleLockGate }
107
108 {
109 A single lock gate is either open or closed. When open, any thread can pass
110 through it while, when closed, all threads are blocked as they try to pass
111 through the gate. When the gate is opened, all blocked threads are resumed.
112
113 There is an implementation assumption that only one writer thread at
114 a time (i.e. the thread which locks or unlocks the gate) can have access to
115 it at any one time. I.e. an external Mutex prevents race conditions.
116
117 Linux:
118
119 In the Linux implementation, the gate is implemented by a semaphore
120 and a share memory integer used as a bi-state variable. When the gate
121 is open, the bi-state variable is non-zero. It is set to zero when closed.
122 Another shared memory integer is used to count the number of waiting
123 threads, and a second semaphore is used to protect access to this.
124
125 The event semaphore is initialised to zero. When a thread passes through the gate
126 it checks the state. If open, the thread continues. If closed then it
127 increments the count of waiting threads and then decrements the semaphore
128 and hence enters an indefinite wait state.
129
130 When the gate is locked, the state is set to zero. When unlocked, the state
131 is set to one and the semaphore incremented by the number of waiting threads,
132 which itself is then zeroed.
133
134 Always initialised to the Unlocked state
135 }
136
137 TSingleLockGate = class(TIpcCommon)
138 private
139 FOwner: TGlobalInterface;
140 FSemaphore: cint;
141 FMutex: cint;
142 FSignalledState: PInteger;
143 FWaitingThreads: PInteger;
144 function GetWaitingThreads: integer;
145 public
146 constructor Create(SemNum: cint; AOwner: TGlobalInterface);
147 property WaitingThreads: integer read GetWaitingThreads;
148 public
149 procedure PassthroughGate;
150 procedure Unlock;
151 procedure Lock;
152 end;
153
154 { TMultilockGate }
155
156 { This type of Gate is used where several reader threads must pass
157 through the gate before it can be opened for a writer thread.
158
159 The reader threads register their interest by each locking the gate.
160 The writer thread then waits on the locked gate until all the reader
161 threads have separately unlocked the gate.
162
163 There is an underlying assumption of a single writer. A Mutex must
164 be used to control access to the gate from the writer side if this
165 assumption is invalid.
166
167 Linux:
168
169 The Linux implementation uses a single semaphore to implement the gate,
170 which is initialised to 1 (unlocked), and a count of the number of
171 threads that have locked the gate (LockCount). A mutex semaphore
172 protects access to the LockCount. When the gate is locked, the lockcount
173 is incremented and, if the LockCount was originally zero, the semaphore is
174 set to zero (Gate Closed).
175
176 Unlocking the gate, is the reverse. The LockCount is decremented and, if it
177 reaches zero, the semaphore is set to one (Gate Opened).
178
179 When a writer passes through the gate, it checks the LockCount, if zero it
180 proceeds to pass through the gate. Otherwise it decrements and waits on the
181 semaphore. When the writer resumes, it increments the semaphore in order
182 to return it to its unlocked state. The wait is a timed wait, as there is
183 a risk that a reader thread may terminate while the gate is locked. If the
184 LockCount is non-zero, it is decremented and the writer returns to wait on
185 the gate.
186
187 Always initialised to the Unlocked state
188 }
189
190 TMultilockGate = class(TIpcCommon)
191 private
192 FOnGateTimeout: TNotifyEvent;
193 FOwner: TGlobalInterface;
194 FSemaphore: cint;
195 FMutex: cint;
196 FLockCount: PInteger;
197 function GetLockCount: integer;
198 public
199 constructor Create(SemNum: cint; AOwner: TGlobalInterface);
200 procedure Lock;
201 procedure Unlock;
202 procedure PassthroughGate;
203 property LockCount: integer read GetLockCount;
204 property OnGateTimeout: TNotifyEvent read FOnGateTimeout write FOnGateTimeout;
205 end;
206
207 { TGlobalInterface }
208
209 TGlobalInterface = class(TIpcCommon)
210 private
211 FMaxBufferSize: integer;
212 FSharedMemory: TSharedMemory;
213 FWriteLock: TMutex;
214 FBuffer: PChar;
215 FTraceDataType,
216 FBufferSize: PInteger;
217 FTimeStamp: PDateTime;
218 FReadReadyEvent: TMultiLockGate;
219 FReadFinishedEvent: TMultiLockGate;
220 FDataAvailableEvent: TSingleLockGate;
221 FWriterBusyEvent: TSingleLockGate;
222 function GetMonitorCount: integer;
223 public
224 constructor Create;
225 destructor Destroy; override;
226 procedure IncMonitorCount;
227 procedure DecMonitorCount;
228 procedure SendTrace(TraceObject: TTraceObject);
229 procedure ReceiveTrace(TraceObject: TTraceObject);
230 property DataAvailableEvent: TSingleLockGate read FDataAvailableEvent;
231 property WriterBusyEvent: TSingleLockGate read FWriterBusyEvent;
232 property ReadReadyEvent: TMultiLockGate read FReadReadyEvent;
233 property ReadFinishedEvent: TMultiLockGate read FReadFinishedEvent;
234 property WriteLock: TMutex read FWriteLock;
235 property MonitorCount: integer read GetMonitorCount;
236 property SharedMemory: TSharedMemory read FSharedMemory;
237 property MaxBufferSize: integer read FMaxBufferSize;
238 end;
239
240 { TSharedMemory }
241
242 procedure TSharedMemory.GetSharedMemory(MemSize: integer);
243 var F: cint;
244 begin
245 {Get the Shared Memory and Semaphore IDs from the Global File if it exists
246 or create them and the file otherwise }
247
248 repeat
249 F := fpOpen(IPCFileName, O_WrOnly or O_Creat or O_Excl);
250 if F < 0 then
251 begin
252 if fpgetErrno = ESysEEXIST {EEXIST} then
253 begin
254 { looks like it already exists}
255 Sleep(100);
256 F := fpOpen(IPCFileName,O_RdOnly);
257 if (F < 0) and (fpgetErrno = ESysENOENT {ENOENT}) then
258 {probably just got deleted }
259 else
260 if F < 0 then
261 IBError(ibxeCannotCreateSharedResource,['Error accessing IPC File - ' +
262 StrError(fpgetErrno)]);
263 end
264 else
265 IBError(ibxeCannotCreateSharedResource,['Error creating IPC File - ' +
266 StrError(fpgetErrno)]);
267 end
268 else
269 FInitialiser := true
270 until F >= 0;
271
272 if FInitialiser then
273 begin
274 FSharedMemoryID := shmget(IPC_PRIVATE,MemSize, IPC_CREAT or
275 S_IRUSR or S_IWUSR or S_IRGRP or S_IWGRP);
276 if FSharedMemoryID < 0 then
277 IBError(ibxeCannotCreateSharedResource,['Cannot create shared memory segment - ' +
278 StrError(fpgetErrno)]);
279
280 FSemaphoreSetID := semget(IPC_PRIVATE, cNumberOfSemaphores,IPC_CREAT or
281 S_IRUSR or S_IWUSR or S_IRGRP or S_IWGRP);
282 if FSemaphoreSetID < 0 then
283 IBError(ibxeCannotCreateSharedResource,['Cannot create shared semaphore set - ' +
284 StrError(fpgetErrno)]);
285
286 fpWrite(F,FSharedMemoryID,sizeof(FSharedMemoryID));
287 fpWrite(F,FSemaphoreSetID,sizeof(FSemaphoreSetID));
288 end
289 else
290 begin
291 fpRead(F,FSharedMemoryID,sizeof(FSharedMemoryID));
292 fpRead(F,FSemaphoreSetID,sizeof(FSemaphoreSetID));
293 if GetSemValue(cMonitorCounter) = 0 then
294 begin
295 FInitialiser := true;
296 //writeln('Opened file and is initialiser');
297 end
298 end;
299 fpClose(F);
300 end;
301
302 procedure TSharedMemory.DropSharedMemory;
303 var ds: TShmid_ds;
304 arg: tsemun;
305 begin
306 if shmctl(FSharedMemoryID,IPC_STAT,@ds) < 0 then
307 IBError(ibxeSV5APIError,['Error getting shared memory info' + strError(fpgetErrno)]);
308 if ds.shm_nattch = 0 then { we are the last one out - so, turn off the lights }
309 begin
310 shmctl(FSharedMemoryID,IPC_RMID,nil);
311 semctl(FSemaphoreSetID,0,IPC_RMID,arg);
312 DeleteFile(IPCFileName);
313 end;
314 end;
315
316 constructor TSharedMemory.Create(MemSize: integer);
317 begin
318 inherited Create;
319 FInitialiser := false;
320 GetSharedMemory(MemSize);
321 FBuffer := shmat(FSharedMemoryID,nil,0);
322 if PtrInt(FBuffer) = -1 then
323 IBError(ibxeCannotCreateSharedResource,[StrError(Errno)]);
324 FBufPtr := FBuffer;
325 FUnused := MemSize
326 end;
327
328 destructor TSharedMemory.Destroy;
329 begin
330 shmdt(FBuffer);
331 DropSharedMemory;
332 inherited Destroy;
333 end;
334
335 function TSharedMemory.Allocate(Size: integer): PChar;
336 begin
337 if Size > FUnused then
338 IBError(ibxeCannotCreateSharedResource, ['Not enough shared memory']);
339 Result := FBufPtr;
340
341 if Size = 0 then
342 begin
343 FLastAllocationSize := FUnused;
344 FUnused := 0
345 end
346 else
347 begin
348 FLastAllocationSize := Size;
349 Dec(FUnused,Size);
350 end;
351 Inc(FBufPtr,Size)
352 end;
353
354 { TIpcCommon }
355
356 function TIpcCommon.GetSa: PSecurityAttributes;
357 begin
358 Result := nil
359 end;
360
361 function TIpcCommon.sem_op(SemNum, op: integer; flags: cshort): cint;
362 var sembuf: TSEMbuf;
363 begin
364 sembuf.sem_num := SemNum;
365 sembuf.sem_op:= op;
366 sembuf.sem_flg := flags or SEM_UNDO;
367 Result := semop(FSemaphoreSetID,@sembuf,1);
368 end;
369
370 function TIpcCommon.sem_timedop(SemNum, op: integer; timeout_secs: integer;
371 flags: cshort): cint;
372 var sembuf: TSEMbuf;
373 timeout: TimeSpec;
374 begin
375 sembuf.sem_num := SemNum;
376 sembuf.sem_op:= op;
377 sembuf.sem_flg := flags or SEM_UNDO;
378 timeout.tv_sec := timeout_secs;
379 timeout.tv_nsec := 0;
380 {$IFDEF HAS_SEMTIMEDOP}
381 Result := semtimedop(FSemaphoreSetID,@sembuf,1,@timeout);
382 {$ELSE}
383 Result := semop(FSemaphoreSetID,@sembuf,1); {May hang on race condition}
384 {$ENDIF}
385 end;
386
387 function TIpcCommon.GetSemValue(SemNum: integer): cint;
388 var args :TSEMun;
389 begin
390 Result := semctl(FSemaphoreSetID,SemNum,SEM_GETVAL,args);
391 if Result < 0 then
392 IBError(ibxeSV5APIError,['GetSemValue: '+strError(GetLastErrno)]);
393 end;
394
395 procedure TIpcCommon.SemInit(SemNum, AValue: cint);
396 var args :TSEMun;
397 begin
398 //writeln('Initialising ',SemNum,' to ',AValue);
399 args.val := AValue;
400 if semctl(FSemaphoreSetID,SemNum,SEM_SETVAL,args) < 0 then
401 IBError(ibxeCannotCreateSharedResource,['Unable to initialise Semaphone ' +
402 IntToStr(SemNum) + '- ' + StrError(GetLastErrno)]);
403
404 end;
405
406 { TMutex }
407
408 constructor TMutex.Create(SemNumber: cint);
409 begin
410 inherited Create;
411 FMutexSemaphore := SemNumber;
412 if FInitialiser then
413 SemInit(FMutexSemaphore,1)
414 end;
415
416 { Obtain ownership of the Mutex and prevent other threads from accessing protected resource }
417
418 procedure TMutex.Lock;
419 begin
420 //writeln('Lock: Entering Mutex ',FMutexSemaphore,' LockCount=',FLockCount,' State = ',GetSemValue(FMutexSemaphore));
421 if FLockCount = 0 then
422 sem_op(FMutexSemaphore,-1);
423 Inc(FLockCount);
424 //writeln('Lock: Mutex Exit');
425 end;
426
427 {Give up ownership of the Mutex and allow other threads access }
428
429 procedure TMutex.Unlock;
430 begin
431 //writeln('UnLock: Entering Mutex, LockCount=',FLockCount);
432 if FLockCount = 0 then Exit;
433 Dec(FLockCount);
434 if FLockCount = 0 then
435 sem_op(FMutexSemaphore,1);
436 //writeln('UnLock: Mutex Exit',' State = ',GetSemValue(FMutexSemaphore));
437 end;
438
439 { TSingleLockGate }
440
441 function TSingleLockGate.GetWaitingThreads: integer;
442 begin
443 Result := FWaitingThreads^
444 end;
445
446 constructor TSingleLockGate.Create(SemNum: cint; AOwner: TGlobalInterface);
447 begin
448 inherited Create;
449 FOwner := AOwner;
450 FSignalledState := PInteger(FOwner.SharedMemory.Allocate(sizeof(FSignalledState)));
451 FWaitingThreads := PInteger(FOwner.SharedMemory.Allocate(sizeof(FWaitingThreads)));
452 FSemaphore := SemNum;
453 FMutex := SemNum + 1;
454 if FInitialiser then
455 begin
456 FSignalledState^ := 1;
457 FWaitingThreads^ := 0;
458 SemInit(FSemaphore,0);
459 SemInit(FMutex,1);
460 end;
461 end;
462
463 procedure TSingleLockGate.PassthroughGate;
464 begin
465 if FSignalledState^ = 0 then
466 begin
467 sem_op(FMutex,-1,0); //Acquire Mutex
468 Inc(FWaitingThreads^);
469 sem_op(FMutex,1,0); //Release Mutex
470 //writeln(ClassName + ': Wait State Entered ',FSemaphore,' = ',GetSemValue(FSemaphore));
471 sem_op(FSemaphore,-1,0); //Enter Wait
472 //writeln(ClassName + ': Wait State Ends ',FSemaphore);
473 end;
474 end;
475
476 procedure TSingleLockGate.Unlock;
477 begin
478 if FSignalledState^ = 0 then
479 begin
480 FSignalledState^ := 1;
481 sem_op(FMutex,-1,0); //Acquire Mutex
482 //writeln(ClassName + ': Unlocking' ,FSemaphore);
483 sem_op(FSemaphore,FWaitingThreads^,0);
484 FWaitingThreads^ := 0;
485 sem_op(FMutex,1,0); //Release Mutex
486 end;
487 end;
488
489 procedure TSingleLockGate.Lock;
490 begin
491 if FSignalledState^ = 1 then
492 begin
493 //writeln(ClassName + ': Locking Gate ',FSemaphore);
494 SemInit(FSemaphore,0);
495 FSignalledState^ := 0;
496 end;
497 end;
498
499 { TMultilockGate }
500
501 constructor TMultilockGate.Create(SemNum: cint; AOwner: TGlobalInterface);
502 begin
503 inherited Create;
504 FOwner := AOwner;
505 FSemaphore := SemNum;
506 FMutex := SemNum + 1;
507 FLockCount := PInteger(FOwner.SharedMemory.Allocate(sizeof(FLockCount)));
508 if FInitialiser then
509 begin
510 FLockCount^ := 0;
511 SemInit(FSemaphore,1);
512 SemInit(FMutex,1);
513 end;
514 end;
515
516 function TMultilockGate.GetLockCount: integer;
517 begin
518 Result := FLockCount^
519 end;
520
521 procedure TMultilockGate.Lock;
522 begin
523 sem_op(FMutex,-1,0); //Acquire Mutex
524 if FLockCount^ = 0 then
525 begin
526 //writeln(ClassName,': Locking ',FSemaphore);
527 SemInit(FSemaphore,0);
528 end;
529 Inc(FLockCount^);
530 sem_op(FMutex,1,0); //Release Mutex
531 end;
532
533 procedure TMultilockGate.Unlock;
534 begin
535 sem_op(FMutex,-1,0); //Acquire Mutex
536 Dec(FLockCount^);
537 if FLockCount^ <= 0 then
538 begin
539 //writeln(ClassName,': UnLocking ',FSemaphore);
540 SemInit(FSemaphore,1);
541 FLockCount^ := 0
542 end;
543 sem_op(FMutex,1,0); //Release Mutex
544 end;
545
546 procedure TMultilockGate.PassthroughGate;
547 begin
548 if FLockCount^ = 0 then
549 Exit;
550 //writeln(ClassName,': Waiting on ',FSemaphore);
551 while sem_timedop(FSemaphore,-1,cDefaultTimeout) < 0 do
552 {looks like we lost a reader}
553 begin
554 if FLockCount^ > 0 then
555 begin
556 UnLock;
557 if assigned(FOnGateTimeout) then
558 OnGateTimeout(self)
559 end
560 end;
561 sem_op(FSemaphore,1);
562 //writeln(ClassName,': Wait done on ',FSemaphore);
563 end;
564
565
566 { TGlobalInterface }
567
568 function TGlobalInterface.GetMonitorCount: integer;
569 begin
570 Result := GetSemValue(cMonitorCounter)
571 end;
572
573 constructor TGlobalInterface.Create;
574 begin
575 inherited Create;
576 FSharedMemory := TSharedMemory.Create(cMonitorHookSize);
577
578 FWriteLock := TMutex.Create(cMutexSemaphore);
579
580 FDataAvailableEvent := TSingleLockGate.Create(cDataAvailableEventSemaphore,self);
581 FWriterBusyEvent := TSingleLockGate.Create(cWriterBusyEventSemaphore,self);
582 FReadReadyEvent := TMultiLockGate.Create(cReadReadyEventSemaphore,self);
583 FReadFinishedEvent := TMultiLockGate.Create(cReadFinishedEventSemaphore,self);
584
585 if FInitialiser then
586 SemInit(cMonitorCounter,0);
587 FTraceDataType := PInteger(FSharedMemory.Allocate(sizeof(Integer)));
588 FTimeStamp := PDateTime(FSharedMemory.Allocate(sizeof(TDateTime)));
589 FBufferSize := PInteger(FSharedMemory.Allocate(sizeof(Integer)));
590 FBuffer := FSharedMemory.Allocate(0); //All remaining
591 FMaxBufferSize := FSharedMemory.LastAllocationSize;
592
593 if FInitialiser then
594 begin
595 FBufferSize^ := 0;
596 FDataAvailableEvent.Lock
597 end;
598 end;
599
600 destructor TGlobalInterface.Destroy;
601 begin
602 if assigned(FWriteLock) then FWriteLock.Free;
603 if assigned(FDataAvailableEvent) then FDataAvailableEvent.Free;
604 if assigned(FWriterBusyEvent) then FWriterBusyEvent.Free;
605 if assigned(FReadReadyEvent) then FReadReadyEvent.Free;
606 if assigned(FReadFinishedEvent) then FReadFinishedEvent.Free;
607 if assigned(FSharedMemory) then FSharedMemory.Free;
608 inherited Destroy;
609 end;
610
611 procedure TGlobalInterface.IncMonitorCount;
612 begin
613 sem_op(cMonitorCounter,1);
614 end;
615
616 procedure TGlobalInterface.DecMonitorCount;
617 begin
618 sem_op(cMonitorCounter,-1,IPC_NOWAIT);
619 end;
620
621 procedure TGlobalInterface.SendTrace(TraceObject: TTraceObject);
622 begin
623 FTraceDataType^ := Integer(TraceObject.FDataType);
624 FTimeStamp^ := TraceObject.FTimeStamp;
625 FBufferSize^ := Min(Length(TraceObject.FMsg), MaxBufferSize);
626 Move(TraceObject.FMsg[1], FBuffer^, FBufferSize^);
627 end;
628
629 procedure TGlobalInterface.ReceiveTrace(TraceObject: TTraceObject);
630 begin
631 SetString(TraceObject.FMsg, FBuffer, FBufferSize^);
632 TraceObject.FDataType := TTraceFlag(FTraceDataType^);
633 TraceObject.FTimeStamp := TDateTime(FTimeStamp^);
634 end;
635
636
637