Oracle 流stream将删除的数据保存

发布于:2024-05-17 ⋅ 阅读:(175) ⋅ 点赞:(0)

Oracle 流stream将删除的数据保存


--实验的目的是捕获hr.employees表的删除行,将删除行插入到emp_del表中。
--设置初始化参数

AQ_TM_PROCESSES=1
COMPATIBLE=9.2.0
LOG_PARALLELISM=1


--查看数据库的名称,我的为ora9,将以下的ora9全部替换为你的数据库名称
--数据库为归档模式
--建立表emp_del,用于存放EMPLOYEES的删除数据

conn hr/hr
CREATE TABLE emp_del( 
  employee_id    NUMBER(6), 
  first_name     VARCHAR2(20), 
  last_name      VARCHAR2(25), 
  email          VARCHAR2(25), 
  phone_number   VARCHAR2(20), 
  hire_date      DATE, 
  job_id         VARCHAR2(10), 
  salary         NUMBER(8,2), 
  commission_pct NUMBER(2,2), 
  manager_id     NUMBER(6), 
  department_id  NUMBER(4),
  timestamp      DATE);

CREATE UNIQUE INDEX emp_del_id_pk ON emp_del (employee_id);

ALTER TABLE emp_del ADD (CONSTRAINT emp_del_id_pk PRIMARY KEY (employee_id));


--建立管理用户,设定默认表空间,授权

conn / as sysdba
drop user strmadmin cascade;
GRANT CONNECT, RESOURCE, SELECT_CATALOG_ROLE 
  TO strmadmin IDENTIFIED BY strmadmin;
ALTER USER strmadmin DEFAULT TABLESPACE users;

GRANT ALL ON hr.emp_del  TO strmadmin;

GRANT EXECUTE ON DBMS_APPLY_ADM        TO strmadmin;
GRANT EXECUTE ON DBMS_AQ               TO strmadmin;
GRANT EXECUTE ON DBMS_AQADM            TO strmadmin;
GRANT EXECUTE ON DBMS_CAPTURE_ADM      TO strmadmin;
GRANT EXECUTE ON DBMS_FLASHBACK        TO strmadmin;
GRANT EXECUTE ON DBMS_STREAMS_ADM      TO strmadmin;

BEGIN 
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ, 
    grantee      => 'strmadmin', 
    grant_option => FALSE);
END;
/

BEGIN 
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.CREATE_RULE_OBJ, 
    grantee      => 'strmadmin', 
    grant_option => FALSE);
END;
/

--建立流队列,名称叫streams_queue ,用于存储捕获的变化

CONNECT strmadmin/strmadmin
EXEC DBMS_STREAMS_ADM.SET_UP_QUEUE();

--配置logmnr使用的表空间,我们就用tools

conn / as sysdba
EXECUTE DBMS_LOGMNR_D.SET_TABLESPACE('TOOLS');

--增强日志的模式

ALTER TABLE hr.employees  ADD SUPPLEMENTAL LOG GROUP log_group_employees_pk
  (employee_id) ALWAYS;


--配置捕获程序

CONNECT strmadmin/strmadmin
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name     => 'hr.employees',   
    streams_type   => 'capture',
    streams_name   => 'capture_emp',
    queue_name     => 'strmadmin.streams_queue',
    include_dml    =>  true,
    include_ddl    =>  false);
END;
/


--设置scn

DECLARE
  iscn  NUMBER;         -- Variable to hold instantiation SCN value
BEGIN
  iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
  DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN(
    source_object_name    => 'hr.employees',
    source_database_name  => 'ora9',
    instantiation_scn     => iscn);
END;
/

--配置叫emp_agent的代理程序

BEGIN
  DBMS_AQADM.DROP_AQ_AGENT(
     agent_name => 'emp_agent');
END;
/

BEGIN
  DBMS_AQADM.CREATE_AQ_AGENT(
     agent_name => 'emp_agent');
  DBMS_AQADM.ENABLE_DB_ACCESS(
    agent_name  => 'emp_agent',
    db_username => 'strmadmin');
END;
/

--建立队列订户

DECLARE
  subscriber SYS.AQ$_AGENT;
BEGIN
  subscriber :=  SYS.AQ$_AGENT('emp_agent', NULL, NULL);  
  SYS.DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name          =>  'strmadmin.streams_queue',
    subscriber                  =>  subscriber,
    rule                =>  NULL,
    transformation      =>  NULL);
END;
/

--建立存储过程enq_row_lcr 

CREATE OR REPLACE PROCEDURE enq_row_lcr(in_any IN SYS.ANYDATA) IS
  enqopt       DBMS_AQ.ENQUEUE_OPTIONS_T;
  mprop        DBMS_AQ.MESSAGE_PROPERTIES_T;
  recipients   DBMS_AQ.AQ$_RECIPIENT_LIST_T;
  enq_eventid  RAW(16);
BEGIN
  mprop.SENDER_ID := SYS.AQ$_AGENT(
    name     => 'emp_agent',
    address  => NULL,
    protocol => NULL);
  recipients(1) := SYS.AQ$_AGENT(
    name     => 'emp_agent',
    address  => NULL,
    protocol => NULL);
  mprop.RECIPIENT_LIST := recipients;
  DBMS_AQ.ENQUEUE(
    queue_name         => 'strmadmin.streams_queue',
    enqueue_options    => enqopt,
    message_properties => mprop,
    payload            => in_any,
    msgid              => enq_eventid);
END;
/

--建立DML处理存储过程

CREATE OR REPLACE PROCEDURE emp_dml_handler(in_any IN SYS.ANYDATA) IS
  lcr          SYS.LCR$_ROW_RECORD;
  rc           PLS_INTEGER;
  command      VARCHAR2(10);
  old_values   SYS.LCR$_ROW_LIST;
BEGIN
  -- Re-enqueue the row LCR for explicit dequeue by another application
  enq_row_lcr(in_any);
  -- Access the LCR
  rc := in_any.GETOBJECT(lcr);
  -- Get the object command type
  command := lcr.GET_COMMAND_TYPE();
  -- Check for DELETE command on the hr.employees table
  IF command = 'DELETE' THEN
    -- Set the command_type in the row LCR to INSERT
    lcr.SET_COMMAND_TYPE('INSERT');
    -- Set the object_name in the row LCR to EMP_DEL
    lcr.SET_OBJECT_NAME('EMP_DEL');
    -- Get the old values in the row LCR
    old_values := lcr.GET_VALUES('old');
    -- Set the old values in the row LCR to the new values in the row LCR
    lcr.SET_VALUES('new', old_values);
    -- Set the old values in the row LCR to NULL
    lcr.SET_VALUES('old', NULL);
    -- Add a SYSDATE value for the timestamp column
    lcr.ADD_COLUMN('new', 'TIMESTAMP', SYS.AnyData.ConvertDate(SYSDATE));
    --  Apply the row LCR as an INSERT into the EMP_DEL table
    lcr.EXECUTE(true);
  END IF;
END;
/

--配置DML管理者,为hr.employees

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'hr.employees',
    object_type         => 'TABLE',
    operation_name      => 'INSERT',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL);
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'hr.employees',
    object_type         => 'TABLE',
    operation_name      => 'UPDATE',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL);
END;
/

BEGIN
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name         => 'hr.employees',
    object_type         => 'TABLE',
    operation_name      => 'DELETE',
    error_handler       => false,
    user_procedure      => 'strmadmin.emp_dml_handler',
    apply_database_link => NULL);
END;
/

--建立存储过程为出列和再入列事件

CREATE OR REPLACE PROCEDURE emp_dq (consumer IN VARCHAR2) AS
  deqopt       DBMS_AQ.DEQUEUE_OPTIONS_T;
  mprop        DBMS_AQ.MESSAGE_PROPERTIES_T;
  msgid        RAW(16);
  payload      SYS.AnyData;
  new_messages BOOLEAN := TRUE;
  row_lcr      SYS.LCR$_ROW_RECORD;
  tc           pls_integer;
  next_trans   EXCEPTION;
  no_messages  EXCEPTION; 
  pragma exception_init (next_trans, -25235);
  pragma exception_init (no_messages, -25228);
BEGIN
  deqopt.consumer_name := consumer;
  deqopt.wait := 1;
  WHILE (new_messages) LOOP
    BEGIN
      DBMS_AQ.DEQUEUE(
        queue_name          =>  'strmadmin.streams_queue',
        dequeue_options     =>  deqopt,
        message_properties  =>  mprop,
        payload             =>  payload,
        msgid               =>  msgid);
      COMMIT;
      deqopt.navigation := DBMS_AQ.NEXT;
      IF (payload.GetTypeName = 'SYS.LCR$_ROW_RECORD') THEN
        tc := payload.GetObject(row_lcr);   
        DBMS_OUTPUT.PUT_LINE(row_lcr.GET_COMMAND_TYPE || ' row LCR dequeued');
      END IF;                       
      EXCEPTION
        WHEN next_trans THEN
        deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
        WHEN no_messages THEN
          new_messages  := FALSE;
          DBMS_OUTPUT.PUT_LINE('No more events');
     END;
  END LOOP; 
END;
/

--配置应用程序

BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name      => 'hr.employees',
    streams_type    => 'apply', 
    streams_name    => 'apply_emp',
    queue_name      => 'strmadmin.streams_queue',
    include_dml     =>  true,
    include_ddl     =>  false,
    source_database => 'ora9');
END;
/

--启动应用程序

BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name  => 'apply_emp', 
    parameter   => 'disable_on_error', 
    value       => 'n');
END;
/
 
BEGIN
  DBMS_APPLY_ADM.START_APPLY(
    apply_name  => 'apply_emp');
END;
/

--启动捕获程序

BEGIN
  DBMS_CAPTURE_ADM.START_CAPTURE(
    capture_name  => 'capture_emp');
END;
/

--对hr.employees进行插入,删除和修改
conn hr/hr
INSERT INTO hr.employees values(207, 'JOHN', 'SMITH', 'JSMITH@MYCOMPANY.COM', 
  NULL, '07-JUN-94', 'AC_ACCOUNT', 777, NULL, NULL, 110);
COMMIT;

UPDATE hr.employees SET salary=5999 WHERE employee_id=206;
COMMIT;

DELETE FROM hr.employees WHERE employee_id=207;
COMMIT;


CONNECT strmadmin/strmadmin
SELECT * FROM hr.emp_del;

SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$STREAMS_QUEUE_TABLE;

EXEC emp_dq('emp_agent');

SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$STREAMS_QUEUE_TABLE;

--显示应用程序的错误

COLUMN APPLY_NAME HEADING 'Apply|Process|Name' FORMAT A8
COLUMN SOURCE_DATABASE HEADING 'Source|Database' FORMAT A8
COLUMN LOCAL_TRANSACTION_ID HEADING 'Local|Transaction|ID' FORMAT A11
COLUMN ERROR_MESSAGE HEADING 'Error Message' FORMAT A50

SELECT APPLY_NAME, SOURCE_DATABASE, LOCAL_TRANSACTION_ID, ERROR_MESSAGE
  FROM DBA_APPLY_ERROR;



网站公告

今日签到

点亮在社区的每一天
去签到