--- ibx/branches/journaling/fbintf/client/FBAttachment.pas 2021/12/07 13:27:39 362 +++ ibx/branches/journaling/fbintf/client/FBAttachment.pas 2021/12/07 13:30:05 363 @@ -39,7 +39,7 @@ interface uses Classes, SysUtils, {$IFDEF WINDOWS} windows, {$ENDIF} IB, FBParamBlock, - FBActivityMonitor, FBClientAPI; + FBActivityMonitor, FBClientAPI, IBUtils; const DefaultMaxInlineBlobLimit = 8192; @@ -53,9 +53,84 @@ type AllowReverseLookup: boolean; {used to ensure that lookup of CP_UTF* does not return UNICODE_FSS} end; + { Database Journalling. + + This class is intended to support a client side journal of all database + updates, inserts and deletes made by the client during a session. It also records + the transaction each update was made under. + + The database schema is required to include a control table "IBX$JOURNALS" and + an SQL Sequence IBX$SESSIONS. These are created by the class when the + database is opened, if they are not already present. However, it is recommended + that they are created as an orginal part of the database schema in order to + unnecessarily avoid each user being given sufficient priviledge to create tables + and Sequences. + + Syntax: + + Transaction Start: + *S:,,,:,:, + + Transaction Commit: + *C:,, + + Transaction Commit retaining : + *c:,, + + Transaction Rollback: + *R:,, + + Transaction Rollback retaining: + *r:,, + + Update/Insert/Delete + *Q:,,,: + + } + + { TFBJournaling } + + TFBJournaling = class(TActivityHandler, IJournallingHook) + private + {Logfile} + const sQueryJournal = '*Q:''%s'',%d,%d,%d:%s' + LineEnding; + const sTransStartJnl = '*S:''%s'',%d,%d,%d:%s,%d:%s,%d' + LineEnding; + const sTransCommitJnl = '*C:''%s'',%d,%d' + LineEnding; + const sTransCommitRetJnl = '*c:''%s'',%d,%d,%d' + LineEnding; + const sTransRollBackJnl = '*R:''%s'',%d,%d' + LineEnding; + const sTransRollBackRetJnl = '*r:''%s'',%d,%d,%d' + LineEnding; + private + FOptions: TJournalOptions; + FJournalFilePath: string; + FJournalFileStream: TStream; + FSessionID: integer; + FDoNotJournal: boolean; + function GetDateTimeFmt: AnsiString; + protected + procedure EndSession(RetainJournal: boolean); + function GetAttachment: IAttachment; virtual; abstract; + public + {IAttachment} + procedure Disconnect(Force: boolean=false); virtual; + public + {IJournallingHook} + procedure TransactionStart(Tr: ITransaction); + function TransactionEnd( TransactionID: integer; Action: TTransactionAction): boolean; + procedure TransactionRetained(Tr: ITransaction; OldTransactionID: integer; + Action: TTransactionAction); + procedure ExecQuery(Stmt: IStatement); + public + {Client side Journaling} + function JournalingActive: boolean; + function GetJournalOptions: TJournalOptions; + function StartJournaling(aJournalLogFile: AnsiString): integer; overload; + function StartJournaling(aJournalLogFile: AnsiString; Options: TJournalOptions): integer; overload; + procedure StopJournaling(RetainJournal: boolean); + end; + { TFBAttachment } - TFBAttachment = class(TActivityHandler) + TFBAttachment = class(TFBJournaling) private FDPB: IDPB; FFirebirdAPI: IFirebirdAPI; @@ -90,9 +165,11 @@ type function getDPB: IDPB; function AllocateBPB: IBPB; function AllocateDIRB: IDIRB; - function StartTransaction(TPB: array of byte; DefaultCompletion: TTransactionCompletion): ITransaction; overload; virtual; abstract; - function StartTransaction(TPB: ITPB; DefaultCompletion: TTransactionCompletion): ITransaction; overload; virtual; abstract; - procedure Disconnect(Force: boolean=false); virtual; abstract; + function StartTransaction(TPB: array of byte; + DefaultCompletion: TTransactionCompletion; + aName: AnsiString=''): ITransaction; overload; virtual; abstract; + function StartTransaction(TPB: ITPB; DefaultCompletion: TTransactionCompletion; + aName: AnsiString=''): ITransaction; overload; virtual; abstract; procedure ExecImmediate(transaction: ITransaction; sql: AnsiString; aSQLDialect: integer); overload; virtual; abstract; procedure ExecImmediate(TPB: array of byte; sql: AnsiString; aSQLDialect: integer); overload; procedure ExecImmediate(transaction: ITransaction; sql: AnsiString); overload; @@ -156,6 +233,7 @@ type function GetDBInformation(Requests: array of byte): IDBInformation; overload; function GetDBInformation(Request: byte): IDBInformation; overload; function GetDBInformation(Requests: IDIRB): IDBInformation; overload; + function GetAttachmentID: integer; function GetConnectString: AnsiString; function GetRemoteProtocol: AnsiString; function GetAuthenticationMethod: AnsiString; @@ -166,6 +244,7 @@ type function GetInlineBlobLimit: integer; procedure SetInlineBlobLimit(limit: integer); function HasBatchMode: boolean; virtual; + function HasTable(aTableName: AnsiString): boolean; public {Character Sets} @@ -214,7 +293,31 @@ type implementation -uses FBMessages, IBUtils, FBTransaction {$IFDEF HASREQEX}, RegExpr{$ENDIF}; +uses FBMessages, IBErrorCodes, FBTransaction {$IFDEF HASREQEX}, RegExpr{$ENDIF}; + +const + {Journaling} + sJournalTableName = 'IBX$JOURNALS'; + sSequenceName = 'IBX$SESSIONS'; + + sqlCreateJournalTable = + 'Create Table ' + sJournalTableName + '(' + + ' IBX$SessionID Integer not null, '+ + ' IBX$TransactionID Integer not null, '+ + ' IBX$OldTransactionID Integer, '+ + ' IBX$USER VarChar(32) Default CURRENT_USER, '+ + ' IBX$CREATED TIMESTAMP Default CURRENT_TIMESTAMP, '+ + ' Primary Key(IBX$SessionID,IBX$TransactionID)' + + ')'; + + sqlCreateSequence = 'CREATE SEQUENCE ' + sSequenceName; + + sqlGetNextSessionID = 'Select Gen_ID(' + sSequenceName + ',1) as SessionID From RDB$DATABASE'; + + sqlRecordJournalEntry = 'Insert into ' + sJournalTableName + '(IBX$SessionID,IBX$TransactionID,IBX$OldTransactionID) '+ + 'Values(?,?,?)'; + + sqlCleanUpSession = 'Delete From ' + sJournalTableName + ' Where IBX$SessionID = ?'; const CharSetMap: array [0..69] of TCharsetMap = ( @@ -391,6 +494,264 @@ const 'decfloat_traps' ); +type + + { TQueryProcessor } + + TQueryProcessor=class(TSQLTokeniser) + private + FInString: AnsiString; + FIndex: integer; + FStmt: IStatement; + function DoExecute: AnsiString; + function GetParamValue(ParamIndex: integer): AnsiString; + protected + function GetChar: AnsiChar; override; + public + class function Execute(Stmt: IStatement): AnsiString; + end; + + { TQueryProcessor } + +function TQueryProcessor.DoExecute: AnsiString; +var token: TSQLTokens; + ParamIndex: integer; +begin + Result := ''; + ParamIndex := 0; + + while not EOF do + begin + token := GetNextToken; + case token of + sqltPlaceHolder: + begin + Result := Result + GetParamValue(ParamIndex); + Inc(ParamIndex); + end; + else + Result := Result + TokenText; + end; + end; +end; + +function TQueryProcessor.GetParamValue(ParamIndex: integer): AnsiString; +begin + with FStmt.SQLParams[ParamIndex] do + begin + if IsNull then + Result := 'NULL' + else + case GetSQLType of + SQL_BLOB: + if getSubType = 1 then {string} + Result := '''' + SQLSafeString(GetAsString) + '''' + else + Result := TSQLXMLReader.FormatBlob(GetAsString,getSubType); + + SQL_ARRAY: + Result := TSQLXMLReader.FormatArray(getAsArray); + + SQL_VARYING, + SQL_TEXT, + SQL_TIMESTAMP, + SQL_TYPE_DATE, + SQL_TYPE_TIME, + SQL_TIMESTAMP_TZ_EX, + SQL_TIME_TZ_EX, + SQL_TIMESTAMP_TZ, + SQL_TIME_TZ: + Result := '''' + SQLSafeString(GetAsString) + ''''; + else + Result := GetAsString; + end; + end; +end; + +function TQueryProcessor.GetChar: AnsiChar; +begin + if FIndex <= Length(FInString) then + begin + Result := FInString[FIndex]; + Inc(FIndex); + end + else + Result := #0; +end; + +class function TQueryProcessor.Execute(Stmt: IStatement): AnsiString; +begin + if not Stmt.IsPrepared then + IBError(ibxeSQLClosed,[]); + with self.Create do + try + FStmt := Stmt; + FInString := Stmt.GetProcessedSQLText; + FIndex := 1; + Result := Trim(DoExecute); + finally + Free; + end; +end; + +{ TFBJournaling } + +function TFBJournaling.GetDateTimeFmt: AnsiString; +begin + {$IF declared(DefaultFormatSettings)} + with DefaultFormatSettings do + {$ELSE} + {$IF declared(FormatSettings)} + with FormatSettings do + {$IFEND} + {$IFEND} + Result := ShortDateFormat + ' ' + LongTimeFormat + '.zzzz' +end; + +procedure TFBJournaling.EndSession(RetainJournal: boolean); +begin + if JournalingActive then + begin + FreeAndNil(FJournalFileStream); + if not RetainJournal then + try + GetAttachment.ExecuteSQL([isc_tpb_write,isc_tpb_wait,isc_tpb_consistency], + sqlCleanUpSession,[FSessionID]); + sysutils.DeleteFile(FJournalFilePath); + except On E: EIBInterBaseError do + if E.IBErrorCode <> isc_lost_db_connection then + raise; + {ignore - do not delete journal if database gone away} + end; + FSessionID := -1; + end; +end; + +procedure TFBJournaling.Disconnect(Force: boolean); +begin + if JournalingActive then + EndSession(Force); +end; + +procedure TFBJournaling.TransactionStart(Tr: ITransaction); +var LogEntry: AnsiString; + TPBText: AnsiString; +begin + FDoNotJournal := true; + try + GetAttachment.ExecuteSQL(Tr,sqlRecordJournalEntry,[FSessionID,Tr.GetTransactionID,NULL]); + finally + FDoNotJournal := false; + end; + TPBText := Tr.getTPB.AsText; + LogEntry := Format(sTransStartJnl,[FBFormatDateTime(GetDateTimeFmt,Now), + FSessionID, + Tr.GetTransactionID, + Length(Tr.TransactionName), + Tr.TransactionName, + Length(TPBText),TPBText, + ord(tr.GetDefaultCompletion)]); + if assigned(FJournalFileStream) then + FJournalFileStream.Write(LogEntry[1],Length(LogEntry)); +end; + +function TFBJournaling.TransactionEnd(TransactionID: integer; + Action: TTransactionAction): boolean; + +var LogEntry: AnsiString; +begin + Result := false; + case Action of + TARollback: + begin + LogEntry := Format(sTransRollbackJnl,[FBFormatDateTime(GetDateTimeFmt,Now),FSessionID,TransactionID]); + Result := true; + end; + TACommit: + begin + LogEntry := Format(sTransCommitJnl,[FBFormatDateTime(GetDateTimeFmt,Now),FSessionID,TransactionID]); + Result := true; + end; + end; + if assigned(FJournalFileStream) then + FJournalFileStream.Write(LogEntry[1],Length(LogEntry)); +end; + +procedure TFBJournaling.TransactionRetained(Tr: ITransaction; + OldTransactionID: integer; Action: TTransactionAction); +var LogEntry: AnsiString; +begin + case Action of + TACommitRetaining: + LogEntry := Format(sTransCommitRetJnl,[FBFormatDateTime(GetDateTimeFmt,Now), + FSessionID,Tr.GetTransactionID,OldTransactionID]); + TARollbackRetaining: + LogEntry := Format(sTransRollbackRetJnl,[FBFormatDateTime(GetDateTimeFmt,Now), + FSessionID,Tr.GetTransactionID,OldTransactionID]); + end; + if assigned(FJournalFileStream) then + FJournalFileStream.Write(LogEntry[1],Length(LogEntry)); + + FDoNotJournal := true; + try + GetAttachment.ExecuteSQL(Tr,sqlRecordJournalEntry,[FSessionID,Tr.GetTransactionID,OldTransactionID]); + finally + FDoNotJournal := false; + end; +end; + +procedure TFBJournaling.ExecQuery(Stmt: IStatement); +var SQL: AnsiString; + LogEntry: AnsiString; +begin + SQL := TQueryProcessor.Execute(Stmt); + LogEntry := Format(sQueryJournal,[FBFormatDateTime(GetDateTimeFmt,Now), + FSessionID, + Stmt.GetTransaction.GetTransactionID, + Length(SQL),SQL]); + if assigned(FJournalFileStream) then + FJournalFileStream.Write(LogEntry[1],Length(LogEntry)); +end; + +function TFBJournaling.JournalingActive: boolean; +begin + Result := (FJournalFileStream <> nil) and not FDoNotJournal; +end; + +function TFBJournaling.GetJournalOptions: TJournalOptions; +begin + Result := FOptions; +end; + +function TFBJournaling.StartJournaling(aJournalLogFile: AnsiString): integer; +begin + Result := StartJournaling(aJournalLogFile,[joReadWriteTransactions,joModifyQueries]); +end; + +function TFBJournaling.StartJournaling(aJournalLogFile: AnsiString; + Options: TJournalOptions): integer; +begin + FOptions := Options; + with GetAttachment do + begin + if not HasTable(sJournalTableName) then + begin + ExecImmediate([isc_tpb_write,isc_tpb_wait,isc_tpb_consistency],sqlCreateJournalTable); + ExecImmediate([isc_tpb_write,isc_tpb_wait,isc_tpb_consistency],sqlCreateSequence); + end; + FSessionID := OpenCursorAtStart(sqlGetNextSessionID)[0].AsInteger; + end; + FJournalFilePath := aJournalLogFile; + FJournalFileStream := TFileStream.Create(FJournalFilePath,fmCreate); + Result := FSessionID; +end; + +procedure TFBJournaling.StopJournaling(RetainJournal: boolean); +begin + EndSession(RetainJournal); +end; + + { TFBAttachment } @@ -889,6 +1250,16 @@ begin Result := GetDBInfo(getBuffer,getDataLength); end; +function TFBAttachment.GetAttachmentID: integer; +var Info: IDBInformation; +begin + Info := GetDBInformation(isc_info_attachment_id); + if (Info.Count > 0) and (Info[0].getItemType = isc_info_attachment_id) then + Result := Info[0].getAsInteger + else + Result := -1; +end; + function TFBAttachment.GetConnectString: AnsiString; begin Result := FDatabaseName; @@ -942,6 +1313,13 @@ begin Result := false; end; +function TFBAttachment.HasTable(aTableName: AnsiString): boolean; +begin + Result := OpenCursorAtStart( + 'Select count(*) From RDB$RELATIONS Where RDB$RELATION_NAME = ?', + [aTableName])[0].AsInteger > 0; +end; + function TFBAttachment.HasDefaultCharSet: boolean; begin Result := FHasDefaultCharSet