ViewVC Help
View File | Revision Log | Show Annotations | Download File | View Changeset | Root Listing
root/public/ibx/trunk/runtime/sv5ipc.inc
Revision: 7
Committed: Sun Aug 5 18:28:19 2012 UTC (12 years, 3 months ago) by tony
File size: 19667 byte(s)
Log Message:
Committing updates for Release R1-0-0

File Contents

# Content
1 {Used by ISQLMonitor and implements System V IPC}
2
3 const
4 IPCFileName: string = 'FB.SQL.MONITOR1_0';
5 cNumberOfSemaphores = 10;
6 cMutexSemaphore = 0;
7 cMonitorCounter = 1;
8 cReadReadyEventSemaphore = 2;
9 cReadFinishedEventSemaphore = 4;
10 cDataAvailableEventSemaphore = 6;
11 cWriterBusyEventSemaphore = 8;
12 cDefaultTimeout = 1; { 1 seconds }
13
14 {
15 The call to semctl in ipc is broken in FPC release 2.4.2 and earlier. Hence
16 need to replace with a working libc call. semtimedop is not present in 2.4.2 and earlier.
17 }
18
19 {$IF FPC_FULLVERSION <= 20402 }
20 Function real_semctl(semid:cint; semnum:cint; cmd:cint): cint; cdecl; varargs; external clib name 'semctl';
21
22 Function semctl(semid:cint; semnum:cint; cmd:cint; var arg: tsemun): cint;
23 begin
24 semctl := real_semctl(semid,semnum,cmd,pointer(arg));
25 end;
26 {$IFDEF HAS_SEMTIMEDOP}
27 Function semtimedop(semid:cint; sops: psembuf; nsops: cuint; timeOut: ptimespec): cint; cdecl; external clib name 'semtimedop';
28 {$ENDIF}
29 Function semget(key:Tkey; nsems:cint; semflg:cint): cint; cdecl; external clib name 'semget';
30 Function semop(semid:cint; sops: psembuf; nsops: cuint): cint; cdecl; external clib name 'semop';
31
32 function GetLastErrno: cint;
33 begin
34 Result := fpgetCerrno
35 end;
36 {$ELSE}
37 function GetLastErrno: cint;
38 begin
39 Result := fpgetErrno
40 end;
41
42 {$ENDIF}
43
44 type
45 TGlobalInterface = class;
46 {Interprocess Communication Objects. All platform dependent IPC is abstracted
47 into this set of objects }
48
49 { TIpcCommon }
50
51 TIpcCommon = class
52 private
53 function GetSa: PSecurityAttributes;
54 protected
55 FInitialiser: boolean; static;
56 FSemaphoreSetID: cint; static;
57 FSharedMemoryID: cint; static;
58 function sem_op(SemNum, op: integer; flags: cshort = 0): cint;
59 function sem_timedop(SemNum, op: integer; timeout_secs: integer; flags: cshort = 0): cint;
60 function GetSemValue(SemNum: integer): cint;
61 procedure SemInit(SemNum, AValue: cint);
62 public
63 property Sa : PSecurityAttributes read GetSa;
64 end;
65
66 { TSharedMemory }
67
68 {
69 The shared memory segment is used for interprocess communication and
70 holds both a message buffer and a number of shared variables. Shared
71 memory is allocated to each shared variable using the Allocate function.
72 An underlying assumption is that each process using the shared memory
73 calls "Allocate" in the same order and for the same memory sizes.
74
75 Linux:
76
77 The Linux implementation uses Linux shared memory. IPC_PRIVATE is used
78 to allocate the memory and the resulting memory id is written to a
79 well known file. By default this is in the current user's home directory,
80 but this can be over-ridden to specify a globally unique filename.
81
82 Access to the shared memory is restricted to the current user/group.
83 Note that the Linux semaphore set is also created with the shared memory.
84 }
85
86 TSharedMemory = class(TIpcCommon)
87 private
88 FBuffer: PChar;
89 FLastAllocationSize: integer;
90 FUnused: integer;
91 FBufptr: PChar;
92 procedure DropSharedMemory;
93 procedure GetSharedMemory(MemSize: integer);
94 public
95 constructor Create(MemSize: integer);
96 destructor Destroy; override;
97 function Allocate(Size: integer): PChar;
98 property LastAllocationSize: integer read FLastAllocationSize;
99 end;
100
101 {TMutex}
102
103 TMutex = class(TIpcCommon)
104 private
105 FMutexSemaphore: cint;
106 FLockCount: integer;
107 public
108 constructor Create(SemNumber: cint);
109 procedure Lock;
110 procedure Unlock;
111 end;
112
113 { TSingleLockGate }
114
115 {
116 A single lock gate is either open or closed. When open, any thread can pass
117 through it while, when closed, all threads are blocked as they try to pass
118 through the gate. When the gate is opened, all blocked threads are resumed.
119
120 There is an implementation assumption that only one writer thread at
121 a time (i.e. the thread which locks or unlocks the gate) can have access to
122 it at any one time. I.e. an external Mutex prevents race conditions.
123
124 Linux:
125
126 In the Linux implementation, the gate is implemented by a semaphore
127 and a share memory integer used as a bi-state variable. When the gate
128 is open, the bi-state variable is non-zero. It is set to zero when closed.
129 Another shared memory integer is used to count the number of waiting
130 threads, and a second semaphore is used to protect access to this.
131
132 The event semaphore is initialised to zero. When a thread passes through the gate
133 it checks the state. If open, the thread continues. If closed then it
134 increments the count of waiting threads and then decrements the semaphore
135 and hence enters an indefinite wait state.
136
137 When the gate is locked, the state is set to zero. When unlocked, the state
138 is set to one and the semaphore incremented by the number of waiting threads,
139 which itself is then zeroed.
140
141 Always initialised to the Unlocked state
142 }
143
144 TSingleLockGate = class(TIpcCommon)
145 private
146 FOwner: TGlobalInterface;
147 FSemaphore: cint;
148 FMutex: cint;
149 FSignalledState: PInteger;
150 FWaitingThreads: PInteger;
151 function GetWaitingThreads: integer;
152 public
153 constructor Create(SemNum: cint; AOwner: TGlobalInterface);
154 property WaitingThreads: integer read GetWaitingThreads;
155 public
156 procedure PassthroughGate;
157 procedure Unlock;
158 procedure Lock;
159 end;
160
161 { TMultilockGate }
162
163 { This type of Gate is used where several reader threads must pass
164 through the gate before it can be opened for a writer thread.
165
166 The reader threads register their interest by each locking the gate.
167 The writer thread then waits on the locked gate until all the reader
168 threads have separately unlocked the gate.
169
170 There is an underlying assumption of a single writer. A Mutex must
171 be used to control access to the gate from the writer side if this
172 assumption is invalid.
173
174 Linux:
175
176 The Linux implementation uses a single semaphore to implement the gate,
177 which is initialised to 1 (unlocked), and a count of the number of
178 threads that have locked the gate (LockCount). A mutex semaphore
179 protects access to the LockCount. When the gate is locked, the lockcount
180 is incremented and, if the LockCount was originally zero, the semaphore is
181 set to zero (Gate Closed).
182
183 Unlocking the gate, is the reverse. The LockCount is decremented and, if it
184 reaches zero, the semaphore is set to one (Gate Opened).
185
186 When a writer passes through the gate, it checks the LockCount, if zero it
187 proceeds to pass through the gate. Otherwise it decrements and waits on the
188 semaphore. When the writer resumes, it increments the semaphore in order
189 to return it to its unlocked state. The wait is a timed wait, as there is
190 a risk that a reader thread may terminate while the gate is locked. If the
191 LockCount is non-zero, it is decremented and the writer returns to wait on
192 the gate.
193
194 Always initialised to the Unlocked state
195 }
196
197 TMultilockGate = class(TIpcCommon)
198 private
199 FOnGateTimeout: TNotifyEvent;
200 FOwner: TGlobalInterface;
201 FSemaphore: cint;
202 FMutex: cint;
203 FLockCount: PInteger;
204 function GetLockCount: integer;
205 public
206 constructor Create(SemNum: cint; AOwner: TGlobalInterface);
207 procedure Lock;
208 procedure Unlock;
209 procedure PassthroughGate;
210 property LockCount: integer read GetLockCount;
211 property OnGateTimeout: TNotifyEvent read FOnGateTimeout write FOnGateTimeout;
212 end;
213
214 { TGlobalInterface }
215
216 TGlobalInterface = class(TIpcCommon)
217 private
218 FMaxBufferSize: integer;
219 FSharedMemory: TSharedMemory;
220 FWriteLock: TMutex;
221 FBuffer: PChar;
222 FTraceDataType,
223 FBufferSize: PInteger;
224 FTimeStamp: PDateTime;
225 FReadReadyEvent: TMultiLockGate;
226 FReadFinishedEvent: TMultiLockGate;
227 FDataAvailableEvent: TSingleLockGate;
228 FWriterBusyEvent: TSingleLockGate;
229 function GetMonitorCount: integer;
230 public
231 constructor Create;
232 destructor Destroy; override;
233 procedure IncMonitorCount;
234 procedure DecMonitorCount;
235 procedure SendTrace(TraceObject: TTraceObject);
236 procedure ReceiveTrace(TraceObject: TTraceObject);
237 property DataAvailableEvent: TSingleLockGate read FDataAvailableEvent;
238 property WriterBusyEvent: TSingleLockGate read FWriterBusyEvent;
239 property ReadReadyEvent: TMultiLockGate read FReadReadyEvent;
240 property ReadFinishedEvent: TMultiLockGate read FReadFinishedEvent;
241 property WriteLock: TMutex read FWriteLock;
242 property MonitorCount: integer read GetMonitorCount;
243 property SharedMemory: TSharedMemory read FSharedMemory;
244 property MaxBufferSize: integer read FMaxBufferSize;
245 end;
246
247 { TSharedMemory }
248
249 procedure TSharedMemory.GetSharedMemory(MemSize: integer);
250 var F: cint;
251 begin
252 {Get the Shared Memory and Semaphore IDs from the Global File if it exists
253 or create them and the file otherwise }
254
255 repeat
256 F := fpOpen(IPCFileName, O_WrOnly or O_Creat or O_Excl);
257 if F < 0 then
258 begin
259 if fpgetErrno = 17 {EEXIST} then
260 begin
261 { looks like it already exists}
262 Sleep(100);
263 F := fpOpen(IPCFileName,O_RdOnly);
264 if (F < 0) and (fpgetErrno = 2 {ENOENT}) then
265 {probably just got deleted }
266 else
267 if F < 0 then
268 IBError(ibxeCannotCreateSharedResource,['Error accessing IPC File - ' +
269 StrError(fpgetErrno)]);
270 end
271 else
272 IBError(ibxeCannotCreateSharedResource,['Error creating IPC File - ' +
273 StrError(fpgetErrno)]);
274 end
275 else
276 FInitialiser := true
277 until F >= 0;
278
279 if FInitialiser then
280 begin
281 FSharedMemoryID := shmget(IPC_PRIVATE,MemSize, IPC_CREAT or
282 S_IRUSR or S_IWUSR or S_IRGRP or S_IWGRP);
283 if FSharedMemoryID < 0 then
284 IBError(ibxeCannotCreateSharedResource,['Cannot create shared memory segment - ' +
285 StrError(fpgetErrno)]);
286
287 FSemaphoreSetID := semget(IPC_PRIVATE, cNumberOfSemaphores,IPC_CREAT or
288 S_IRUSR or S_IWUSR or S_IRGRP or S_IWGRP);
289 if FSemaphoreSetID < 0 then
290 IBError(ibxeCannotCreateSharedResource,['Cannot create shared semaphore set - ' +
291 StrError(fpgetErrno)]);
292
293 fpWrite(F,FSharedMemoryID,sizeof(FSharedMemoryID));
294 fpWrite(F,FSemaphoreSetID,sizeof(FSemaphoreSetID));
295 end
296 else
297 begin
298 fpRead(F,FSharedMemoryID,sizeof(FSharedMemoryID));
299 fpRead(F,FSemaphoreSetID,sizeof(FSemaphoreSetID));
300 if GetSemValue(cMonitorCounter) = 0 then
301 begin
302 FInitialiser := true;
303 //writeln('Opened file and is initialiser');
304 end
305 end;
306 fpClose(F);
307 end;
308
309 procedure TSharedMemory.DropSharedMemory;
310 var ds: TShmid_ds;
311 arg: tsemun;
312 begin
313 if shmctl(FSharedMemoryID,IPC_STAT,@ds) < 0 then
314 IBError(ibxeSV5APIError,['Error getting shared memory info' + strError(fpgetErrno)]);
315 if ds.shm_nattch = 0 then { we are the last one out - so, turn off the lights }
316 begin
317 shmctl(FSharedMemoryID,IPC_RMID,nil);
318 semctl(FSemaphoreSetID,0,IPC_RMID,arg);
319 DeleteFile(IPCFileName);
320 end;
321 end;
322
323 constructor TSharedMemory.Create(MemSize: integer);
324 begin
325 inherited Create;
326 FInitialiser := false;
327 GetSharedMemory(MemSize);
328 FBuffer := shmat(FSharedMemoryID,nil,0);
329 if PtrInt(FBuffer) = -1 then
330 IBError(ibxeCannotCreateSharedResource,[StrError(Errno)]);
331 FBufPtr := FBuffer;
332 FUnused := MemSize
333 end;
334
335 destructor TSharedMemory.Destroy;
336 begin
337 shmdt(FBuffer);
338 DropSharedMemory;
339 inherited Destroy;
340 end;
341
342 function TSharedMemory.Allocate(Size: integer): PChar;
343 begin
344 if Size > FUnused then
345 IBError(ibxeCannotCreateSharedResource, ['Not enough shared memory']);
346 Result := FBufPtr;
347
348 if Size = 0 then
349 begin
350 FLastAllocationSize := FUnused;
351 FUnused := 0
352 end
353 else
354 begin
355 FLastAllocationSize := Size;
356 Dec(FUnused,Size);
357 end;
358 Inc(FBufPtr,Size)
359 end;
360
361 { TIpcCommon }
362
363 function TIpcCommon.GetSa: PSecurityAttributes;
364 begin
365 Result := nil
366 end;
367
368 function TIpcCommon.sem_op(SemNum, op: integer; flags: cshort): cint;
369 var sembuf: TSEMbuf;
370 begin
371 sembuf.sem_num := SemNum;
372 sembuf.sem_op:= op;
373 sembuf.sem_flg := flags or SEM_UNDO;
374 Result := semop(FSemaphoreSetID,@sembuf,1);
375 end;
376
377 function TIpcCommon.sem_timedop(SemNum, op: integer; timeout_secs: integer;
378 flags: cshort): cint;
379 var sembuf: TSEMbuf;
380 timeout: TimeSpec;
381 begin
382 sembuf.sem_num := SemNum;
383 sembuf.sem_op:= op;
384 sembuf.sem_flg := flags or SEM_UNDO;
385 timeout.tv_sec := timeout_secs;
386 timeout.tv_nsec := 0;
387 {$IFDEF HAS_SEMTIMEDOP}
388 Result := semtimedop(FSemaphoreSetID,@sembuf,1,@timeout);
389 {$ELSE}
390 Result := semop(FSemaphoreSetID,@sembuf,1); {May hang on race condition}
391 {$ENDIF}
392 end;
393
394 function TIpcCommon.GetSemValue(SemNum: integer): cint;
395 var args :TSEMun;
396 begin
397 Result := semctl(FSemaphoreSetID,SemNum,SEM_GETVAL,args);
398 if Result < 0 then
399 IBError(ibxeSV5APIError,['GetSemValue: '+strError(GetLastErrno)]);
400 end;
401
402 procedure TIpcCommon.SemInit(SemNum, AValue: cint);
403 var args :TSEMun;
404 begin
405 //writeln('Initialising ',SemNum,' to ',AValue);
406 args.val := AValue;
407 if semctl(FSemaphoreSetID,SemNum,SEM_SETVAL,args) < 0 then
408 IBError(ibxeCannotCreateSharedResource,['Unable to initialise Semaphone ' +
409 IntToStr(SemNum) + '- ' + StrError(GetLastErrno)]);
410
411 end;
412
413 { TMutex }
414
415 constructor TMutex.Create(SemNumber: cint);
416 begin
417 inherited Create;
418 FMutexSemaphore := SemNumber;
419 if FInitialiser then
420 SemInit(FMutexSemaphore,1)
421 end;
422
423 { Obtain ownership of the Mutex and prevent other threads from accessing protected resource }
424
425 procedure TMutex.Lock;
426 begin
427 //writeln('Lock: Entering Mutex ',FMutexSemaphore,' LockCount=',FLockCount,' State = ',GetSemValue(FMutexSemaphore));
428 if FLockCount = 0 then
429 sem_op(FMutexSemaphore,-1);
430 Inc(FLockCount);
431 //writeln('Lock: Mutex Exit');
432 end;
433
434 {Give up ownership of the Mutex and allow other threads access }
435
436 procedure TMutex.Unlock;
437 begin
438 //writeln('UnLock: Entering Mutex, LockCount=',FLockCount);
439 if FLockCount = 0 then Exit;
440 Dec(FLockCount);
441 if FLockCount = 0 then
442 sem_op(FMutexSemaphore,1);
443 //writeln('UnLock: Mutex Exit',' State = ',GetSemValue(FMutexSemaphore));
444 end;
445
446 { TSingleLockGate }
447
448 function TSingleLockGate.GetWaitingThreads: integer;
449 begin
450 Result := FWaitingThreads^
451 end;
452
453 constructor TSingleLockGate.Create(SemNum: cint; AOwner: TGlobalInterface);
454 begin
455 inherited Create;
456 FOwner := AOwner;
457 FSignalledState := PInteger(FOwner.SharedMemory.Allocate(sizeof(FSignalledState)));
458 FWaitingThreads := PInteger(FOwner.SharedMemory.Allocate(sizeof(FWaitingThreads)));
459 FSemaphore := SemNum;
460 FMutex := SemNum + 1;
461 if FInitialiser then
462 begin
463 FSignalledState^ := 1;
464 FWaitingThreads^ := 0;
465 SemInit(FSemaphore,0);
466 SemInit(FMutex,1);
467 end;
468 end;
469
470 procedure TSingleLockGate.PassthroughGate;
471 begin
472 if FSignalledState^ = 0 then
473 begin
474 sem_op(FMutex,-1,0); //Acquire Mutex
475 Inc(FWaitingThreads^);
476 sem_op(FMutex,1,0); //Release Mutex
477 //writeln(ClassName + ': Wait State Entered ',FSemaphore,' = ',GetSemValue(FSemaphore));
478 sem_op(FSemaphore,-1,0); //Enter Wait
479 //writeln(ClassName + ': Wait State Ends ',FSemaphore);
480 end;
481 end;
482
483 procedure TSingleLockGate.Unlock;
484 begin
485 if FSignalledState^ = 0 then
486 begin
487 FSignalledState^ := 1;
488 sem_op(FMutex,-1,0); //Acquire Mutex
489 //writeln(ClassName + ': Unlocking' ,FSemaphore);
490 sem_op(FSemaphore,FWaitingThreads^,0);
491 FWaitingThreads^ := 0;
492 sem_op(FMutex,1,0); //Release Mutex
493 end;
494 end;
495
496 procedure TSingleLockGate.Lock;
497 begin
498 if FSignalledState^ = 1 then
499 begin
500 //writeln(ClassName + ': Locking Gate ',FSemaphore);
501 SemInit(FSemaphore,0);
502 FSignalledState^ := 0;
503 end;
504 end;
505
506 { TMultilockGate }
507
508 constructor TMultilockGate.Create(SemNum: cint; AOwner: TGlobalInterface);
509 begin
510 inherited Create;
511 FOwner := AOwner;
512 FSemaphore := SemNum;
513 FMutex := SemNum + 1;
514 FLockCount := PInteger(FOwner.SharedMemory.Allocate(sizeof(FLockCount)));
515 if FInitialiser then
516 begin
517 FLockCount^ := 0;
518 SemInit(FSemaphore,1);
519 SemInit(FMutex,1);
520 end;
521 end;
522
523 function TMultilockGate.GetLockCount: integer;
524 begin
525 Result := FLockCount^
526 end;
527
528 procedure TMultilockGate.Lock;
529 begin
530 sem_op(FMutex,-1,0); //Acquire Mutex
531 if FLockCount^ = 0 then
532 begin
533 //writeln(ClassName,': Locking ',FSemaphore);
534 SemInit(FSemaphore,0);
535 end;
536 Inc(FLockCount^);
537 sem_op(FMutex,1,0); //Release Mutex
538 end;
539
540 procedure TMultilockGate.Unlock;
541 begin
542 sem_op(FMutex,-1,0); //Acquire Mutex
543 Dec(FLockCount^);
544 if FLockCount^ <= 0 then
545 begin
546 //writeln(ClassName,': UnLocking ',FSemaphore);
547 SemInit(FSemaphore,1);
548 FLockCount^ := 0
549 end;
550 sem_op(FMutex,1,0); //Release Mutex
551 end;
552
553 procedure TMultilockGate.PassthroughGate;
554 begin
555 if FLockCount^ = 0 then
556 Exit;
557 //writeln(ClassName,': Waiting on ',FSemaphore);
558 while sem_timedop(FSemaphore,-1,cDefaultTimeout) < 0 do
559 {looks like we lost a reader}
560 begin
561 if FLockCount^ > 0 then
562 begin
563 UnLock;
564 if assigned(FOnGateTimeout) then
565 OnGateTimeout(self)
566 end
567 end;
568 sem_op(FSemaphore,1);
569 //writeln(ClassName,': Wait done on ',FSemaphore);
570 end;
571
572
573 { TGlobalInterface }
574
575 function TGlobalInterface.GetMonitorCount: integer;
576 begin
577 Result := GetSemValue(cMonitorCounter)
578 end;
579
580 constructor TGlobalInterface.Create;
581 begin
582 inherited Create;
583 FSharedMemory := TSharedMemory.Create(cMonitorHookSize);
584
585 FWriteLock := TMutex.Create(cMutexSemaphore);
586
587 FDataAvailableEvent := TSingleLockGate.Create(cDataAvailableEventSemaphore,self);
588 FWriterBusyEvent := TSingleLockGate.Create(cWriterBusyEventSemaphore,self);
589 FReadReadyEvent := TMultiLockGate.Create(cReadReadyEventSemaphore,self);
590 FReadFinishedEvent := TMultiLockGate.Create(cReadFinishedEventSemaphore,self);
591
592 if FInitialiser then
593 SemInit(cMonitorCounter,0);
594 FTraceDataType := PInteger(FSharedMemory.Allocate(sizeof(Integer)));
595 FTimeStamp := PDateTime(FSharedMemory.Allocate(sizeof(TDateTime)));
596 FBufferSize := PInteger(FSharedMemory.Allocate(sizeof(Integer)));
597 FBuffer := FSharedMemory.Allocate(0); //All remaining
598 FMaxBufferSize := FSharedMemory.LastAllocationSize;
599
600 if FInitialiser then
601 begin
602 FBufferSize^ := 0;
603 FDataAvailableEvent.Lock
604 end;
605 end;
606
607 destructor TGlobalInterface.Destroy;
608 begin
609 if assigned(FWriteLock) then FWriteLock.Free;
610 if assigned(FDataAvailableEvent) then FDataAvailableEvent.Free;
611 if assigned(FWriterBusyEvent) then FWriterBusyEvent.Free;
612 if assigned(FReadReadyEvent) then FReadReadyEvent.Free;
613 if assigned(FReadFinishedEvent) then FReadFinishedEvent.Free;
614 if assigned(FSharedMemory) then FSharedMemory.Free;
615 inherited Destroy;
616 end;
617
618 procedure TGlobalInterface.IncMonitorCount;
619 begin
620 sem_op(cMonitorCounter,1);
621 end;
622
623 procedure TGlobalInterface.DecMonitorCount;
624 begin
625 sem_op(cMonitorCounter,-1,IPC_NOWAIT);
626 end;
627
628 procedure TGlobalInterface.SendTrace(TraceObject: TTraceObject);
629 begin
630 FTraceDataType^ := Integer(TraceObject.FDataType);
631 FTimeStamp^ := TraceObject.FTimeStamp;
632 FBufferSize^ := Min(Length(TraceObject.FMsg), MaxBufferSize);
633 Move(TraceObject.FMsg[1], FBuffer^, FBufferSize^);
634 end;
635
636 procedure TGlobalInterface.ReceiveTrace(TraceObject: TTraceObject);
637 begin
638 SetString(TraceObject.FMsg, FBuffer, FBufferSize^);
639 TraceObject.FDataType := TTraceFlag(FTraceDataType^);
640 TraceObject.FTimeStamp := TDateTime(FTimeStamp^);
641 end;
642
643
644