Thinking Out Loud

November 20, 2010

My Change Data Capture Adventure Part 1

Filed under: 11g,CDC,oracle — mdinh @ 4:38 pm

This is my adventure in automating the configuration of Change Data Capture (CDC) using Asynchronous HotLog mode.

Please read the reference materials listed at the end, since not all steps are shown here. For example, setting the database initialization parameters and enabling FORCE LOGGING are not covered.
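
As a rough sketch of the kind of prerequisite check meant here (it is not part of my scripts), the query below reads three columns of V$DATABASE. I am assuming that redo-based capture expects ARCHIVELOG mode; the complete prerequisite list, including the initialization parameters, is in the reference materials.

```sql
-- Run as SYS.
-- LOG_MODE should show ARCHIVELOG and FORCE_LOGGING should show YES.
SELECT log_mode, force_logging, supplemental_log_data_min
  FROM v$database;

-- Only needed when FORCE_LOGGING shows NO.
ALTER DATABASE FORCE LOGGING;
```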

Put simply, CDC has the following components:

  1. Source
  2. Publisher to publish the changed data
  3. Subscriber to subscribe to the changed data

In my example:

  1. Source = SCOTT
  2. Publisher = OPS$ORACLE
  3. Subscriber = SCOTT_SUB

As part of the setup, I created two tables: CLIENTS, which lists the source schemas, and XCDC, which lists the source tables.

OPS$ORACLE@dbtest> select * from CLIENTS;
PK_CLIENT_ID
------------------------------
JOE
JOHN
SCOTT
OPS$ORACLE@dbtest> select * from XCDC;
TABLE_NAME
------------------------------
BADTABLE
DEPT
EMP
OPS$ORACLE@dbtest>

NOTE: BADTABLE is not a valid table; no such table exists in the source schema.

ANOTHER NOTE: This is a multi-tenant data architecture; hence, the EMP and DEPT tables exist for ALL clients.
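
The post shows only the contents of CLIENTS, not its DDL, so the following is a hypothetical sketch of how the two driver tables could be populated. The column width and the constraint name pk_clients are my assumptions; XCDC itself is created further down, in the publisher setup.

```sql
-- Hypothetical DDL for CLIENTS (one row per source schema).
CREATE TABLE clients (pk_client_id VARCHAR2 (30));
ALTER TABLE clients ADD CONSTRAINT pk_clients PRIMARY KEY (pk_client_id) USING INDEX;

INSERT INTO clients (pk_client_id) VALUES ('JOE');
INSERT INTO clients (pk_client_id) VALUES ('JOHN');
INSERT INTO clients (pk_client_id) VALUES ('SCOTT');

-- Run these only after XCDC exists (one row per table to capture in every client schema).
INSERT INTO xcdc (table_name) VALUES ('BADTABLE');
INSERT INTO xcdc (table_name) VALUES ('DEPT');
INSERT INTO xcdc (table_name) VALUES ('EMP');

COMMIT;
```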

******* GRANT PRIVILEGES TO THE PUBLISHER: *******

SYS@dbtest> -- Execute as SYS
SYS@dbtest> /*
SYS@dbtest> CREATE TABLESPACE cdc_data DATAFILE
SYS@dbtest>   '/oracle/oradata/dbtest/cdc_data01.dbf' SIZE 524352K AUTOEXTEND ON NEXT 512M MAXSIZE 16777280K
SYS@dbtest> EXTENT MANAGEMENT LOCAL UNIFORM SIZE 1m
SYS@dbtest> SEGMENT SPACE MANAGEMENT AUTO;
SYS@dbtest> */
SYS@dbtest> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
Database altered.
SYS@dbtest> ALTER USER ops$oracle QUOTA UNLIMITED ON cdc_data;
User altered.
SYS@dbtest> ALTER USER ops$oracle QUOTA UNLIMITED ON sysaux;
User altered.
SYS@dbtest> ALTER USER ops$oracle DEFAULT TABLESPACE cdc_data;
User altered.
SYS@dbtest> -- system privs
SYS@dbtest> GRANT create session TO ops$oracle;
Grant succeeded.
SYS@dbtest> GRANT create table TO ops$oracle;
Grant succeeded.
SYS@dbtest> GRANT create sequence TO ops$oracle;
Grant succeeded.
SYS@dbtest> GRANT create procedure TO ops$oracle;
Grant succeeded.
SYS@dbtest> GRANT create job TO ops$oracle;
Grant succeeded.
SYS@dbtest> -- role privs
SYS@dbtest> GRANT execute_catalog_role TO ops$oracle;
Grant succeeded.
SYS@dbtest> GRANT select_catalog_role TO ops$oracle;
Grant succeeded.
SYS@dbtest> GRANT dba TO ops$oracle;
Grant succeeded.
SYS@dbtest> GRANT execute ON dbms_cdc_publish TO ops$oracle;
Grant succeeded.
SYS@dbtest> -- streams specific priv
SYS@dbtest> execute dbms_streams_auth.grant_admin_privilege(grantee=>'OPS$ORACLE');
PL/SQL procedure successfully completed.
SYS@dbtest> --
SYS@dbtest> connect /
Connected.
OPS$ORACLE@dbtest> CREATE TABLE xcdc (table_name VARCHAR2 (30));
Table created.
OPS$ORACLE@dbtest> ALTER TABLE xcdc ADD CONSTRAINT PK_xcdc PRIMARY KEY(table_name) USING INDEX;
Table altered.
OPS$ORACLE@dbtest> exit

sqlp is just a shell alias:

> alias sqlp
alias sqlp='sqlplus /'

******* ADD SUPPLEMENTAL LOG DATA FOR SOURCE TABLES: *******

> sqlp @add_sup.sql scott

SQL*Plus: Release 11.2.0.2.0 Production on Sat Nov 20 06:02:38 2010

Copyright (c) 1982, 2010, Oracle.  All rights reserved.

Connected to:
Oracle Database 11g Enterprise Edition Release 11.2.0.2.0 - 64bit Production
With the Partitioning, OLAP, Data Mining and Real Application Testing options

OPS$ORACLE@dbtest> define vsource = &1
OPS$ORACLE@dbtest> DECLARE
 2     l_sql   VARCHAR2 (1000);
 3  BEGIN
 4     FOR x IN (
 5            SELECT t.owner, z.table_name
 6              FROM xcdc z, clients c, dba_tables t
 7             WHERE c.pk_client_id = UPPER('&&vsource')
 8               AND c.pk_client_id = t.owner
 9               AND z.table_name   = t.table_name
 10            MINUS
 11            SELECT g.owner, g.table_name
 12              FROM dba_log_groups g, clients c
 13             WHERE     g.log_group_type = 'ALL COLUMN LOGGING'
 14                   AND c.pk_client_id = owner
 15                   AND c.pk_client_id = UPPER('&&vsource')
 16     ) LOOP
 17        l_sql := 'alter table '||x.owner||'.'||x.table_name||' add supplemental log data (all) columns';
 18        EXECUTE IMMEDIATE l_sql;
 19        DBMS_OUTPUT.put_line (l_sql);
 20     END LOOP;
 21  EXCEPTION
 22     WHEN OTHERS
 23     THEN
 24        DBMS_OUTPUT.put_line ('******* ERROR: ' || l_sql);
 25        RAISE;
 26  END;
 27  /
alter table SCOTT.DEPT add supplemental log data (all) columns
alter table SCOTT.EMP add supplemental log data (all) columns

PL/SQL procedure successfully completed.

OPS$ORACLE@dbtest> select owner, table_name from DBA_LOG_GROUPS where owner=UPPER('&vsource');

OWNER                          TABLE_NAME
------------------------------ ------------------------------
SCOTT                          DEPT
SCOTT                          EMP

OPS$ORACLE@dbtest> exit
 

NOTE: BADTABLE was skipped because it is not in DBA_TABLES, so the join against DBA_TABLES filters it out.
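
Because invalid names are skipped silently, it may help to list them explicitly. This is a sketch of one way to do that, reusing the CLIENTS and XCDC tables and the vsource substitution variable from the script above; for the data shown in this post it should report BADTABLE for SCOTT.

```sql
-- XCDC entries that have no matching table in the chosen source schema.
SELECT c.pk_client_id AS owner, z.table_name
  FROM clients c CROSS JOIN xcdc z
 WHERE c.pk_client_id = UPPER('&&vsource')
   AND NOT EXISTS (SELECT 1
                     FROM dba_tables t
                    WHERE t.owner      = c.pk_client_id
                      AND t.table_name = z.table_name)
 ORDER BY z.table_name;
```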

******* PREPARE THE SOURCE TABLES: *******

> sqlp @prepare_tab.sql scott

SQL*Plus: Release 11.2.0.2.0 Production on Sat Nov 20 06:16:27 2010

Copyright (c) 1982, 2010, Oracle.  All rights reserved.

Connected to:
Oracle Database 11g Enterprise Edition Release 11.2.0.2.0 - 64bit Production
With the Partitioning, OLAP, Data Mining and Real Application Testing options

OPS$ORACLE@dbtest> define vsource = &1
OPS$ORACLE@dbtest> DECLARE
 2     l_sql     VARCHAR2 (1000);
 3     l_table   VARCHAR2 (100);
 4  BEGIN
 5     FOR x IN (
 6            SELECT t.owner, z.table_name
 7              FROM xcdc z, clients c, dba_tables t
 8             WHERE c.pk_client_id = UPPER('&&vsource')
 9               AND c.pk_client_id = t.owner
 10               AND z.table_name   = t.table_name
 11            MINUS
 12            SELECT p.table_owner, p.table_name
 13              FROM dba_capture_prepared_tables p
 14              WHERE p.table_owner=UPPER('&&vsource')
 15     ) LOOP
 16        l_table := x.owner||'.'||x.table_name;
 17        DBMS_CAPTURE_ADM.prepare_table_instantiation (l_table);
 18     END LOOP;
 19  EXCEPTION
 20     WHEN OTHERS
 21     THEN
 22        DBMS_OUTPUT.put_line ('******* ERROR: ' || l_table);
 23        RAISE;
 24  END;
 25  /

PL/SQL procedure successfully completed.

OPS$ORACLE@dbtest> select table_owner, table_name from DBA_CAPTURE_PREPARED_TABLES where table_owner=UPPER('&&vsource');

TABLE_OWNER                    TABLE_NAME
------------------------------ ------------------------------
SCOTT                          EMP
SCOTT                          DEPT

OPS$ORACLE@dbtest> exit
 

******* CREATE CHANGE SETS: *******

> sqlp @changeset.sql

SQL*Plus: Release 11.2.0.2.0 Production on Sat Nov 20 06:32:17 2010

Copyright (c) 1982, 2010, Oracle.  All rights reserved.

Connected to:
Oracle Database 11g Enterprise Edition Release 11.2.0.2.0 - 64bit Production
With the Partitioning, OLAP, Data Mining and Real Application Testing options

OPS$ORACLE@dbtest> DECLARE
 2  l_cs VARCHAR2(100);
 3  l_de VARCHAR2(100);
 4  BEGIN
 5    FOR x IN (SELECT pk_client_id FROM clients)
 6    LOOP
 7       l_cs := x.pk_client_id||'_CS';
 8       l_de := x.pk_client_id||'_CHANGE_SET';
 9       dbms_cdc_publish.create_change_set(change_set_name=>l_cs,description=>l_de,change_source_name=>'HOTLOG_SOURCE',stop_on_ddl=>'Y') ;
 10    END LOOP;
 11  END;
 12  /

PL/SQL procedure successfully completed.

OPS$ORACLE@dbtest> set echo off
==============================
SET_NAME                      : SCOTT_CS
CHANGE_SOURCE_NAME            : HOTLOG_SOURCE
BEGIN_DATE                    :
END_DATE                      :
BEGIN_SCN                     :
END_SCN                       :
FRESHNESS_DATE                :
FRESHNESS_SCN                 :
ADVANCE_ENABLED               : N
IGNORE_DDL                    :
CREATED                       : 20-NOV-2010 06:32:41
ROLLBACK_SEGMENT_NAME         :
ADVANCING                     : N
PURGING                       : N
LOWEST_SCN                    : 0
TABLESPACE                    :
CAPTURE_ENABLED               : N
STOP_ON_DDL                   : Y
CAPTURE_ERROR                 : N
CAPTURE_NAME                  : CDC$C_SCOTT_CS
QUEUE_NAME                    : CDC$Q_SCOTT_CS
QUEUE_TABLE_NAME              : CDC$T_SCOTT_CS
APPLY_NAME                    : CDC$A_SCOTT_CS
SET_DESCRIPTION               : SCOTT_CHANGE_SET
PUBLISHER                     : OPS$ORACLE
LOWEST_TIMESTAMP              :
MAP_NAME                      : CDC$M_SCOTT_CS
==============================

PL/SQL procedure successfully completed.
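
The report above comes from a script that is not shown in this post. A shorter check, as a sketch, is to query the change set view directly. I am assuming the 11g view name ALL_CHANGE_SETS here (older releases call it CHANGE_SETS); the column names are taken from the report above.

```sql
-- One row per change set owned by the publisher.
-- CAPTURE_ENABLED stays N until capture is enabled later.
SELECT set_name, change_source_name, capture_enabled, stop_on_ddl, capture_error
  FROM all_change_sets
 WHERE publisher = 'OPS$ORACLE'
 ORDER BY set_name;
```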
 

So far, this has been the easy part. The harder part is calling create_change_table and supplying column_type_list.

I will post again as soon as I have a working model.
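
To make the problem concrete, here is a hand-written sketch of a single create_change_table call for SCOTT.DEPT. It is not the automated version I am after. The change table name SCOTT_DEPT_CT and the control-column choices are illustrative assumptions, and the column types are those of the standard SCOTT demo schema. The column_type_list string is the part that has to be generated per table.

```sql
BEGIN
   DBMS_CDC_PUBLISH.create_change_table (
      owner             => 'OPS$ORACLE',            -- the publisher owns the change table
      change_table_name => 'SCOTT_DEPT_CT',         -- hypothetical name
      change_set_name   => 'SCOTT_CS',
      source_schema     => 'SCOTT',
      source_table      => 'DEPT',
      column_type_list  => 'DEPTNO NUMBER(2), DNAME VARCHAR2(14), LOC VARCHAR2(13)',
      capture_values    => 'both',
      rs_id             => 'y',
      row_id            => 'n',
      user_id           => 'n',
      timestamp         => 'n',
      object_id         => 'n',
      source_colmap     => 'n',                     -- must be 'n' for asynchronous modes
      target_colmap     => 'y',
      options_string    => 'TABLESPACE cdc_data');
END;
/
```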

References:
Database Data Warehousing Guide: Change Data Capture
Comparison Between Oracle Streams and Change Data Capture [ID 727445.1]


4 Comments »

  1. I use CDC for auditing table changes, and I use a spreadsheet to manage the base, audit and view table names, since some base table names are close to 30 characters. I wanted each audit table name to be unique even if it was in a separate schema. Below is the text from the spreadsheet. The lists are built with concatenate functions, and the last entry in each list column is pasted into the generate scripts for the change table and subscription.

    Base Table | Perm Audit Table | Base Table Schema | CDC View | Base Table List | Audit Table List | Schema List | View List
    BASE_TABLE_NAME1 | BASE_TABLE_NAME1_AUD | BASESCHEMA | BASE_TABLE_NAME1_V | 'BASE_TABLE_NAME1' | 'BASE_TABLE_NAME1_AUD' | 'BASESCHEMA' | 'BASE_TABLE_NAME1_V'
    BASE_TABLE_NAME2 | BASE_TABLE_NAME2_AUD | BASESCHEMA | BASE_TABLE_NAME2_V | 'BASE_TABLE_NAME1','BASE_TABLE_NAME2' | 'BASE_TABLE_NAME1_AUD','BASE_TABLE_NAME2_AUD' | 'BASESCHEMA','BASESCHEMA' | 'BASE_TABLE_NAME1_V','BASE_TABLE_NAME2_V'

    Comment by Peter Barnum — July 23, 2014 @ 2:45 pm | Reply

  2. This is the create change table script:

    SET VERIFY OFF
    SET ECHO OFF
    SET FEEDBACK OFF
    SET HEADING OFF
    SET PAGESIZE 0
    SET SERVEROUTPUT ON

    -- Script to generate DBMS_CDC_PUBLISH.CREATE_CHANGE_TABLE calls for a list of tables

    declare
      type tabname_arr is VARRAY(287) of VARCHAR2(32);
      type schema_arr is VARRAY(287) of VARCHAR2(30);  -- wide enough for any schema name
      tabname_set tabname_arr := tabname_arr('BASE_TABLE_NAME1','BASE_TABLE_NAME2');
      audname_set tabname_arr := tabname_arr('BASE_TABLE_NAME1_AUD','BASE_TABLE_NAME2_AUD');
      schema_set schema_arr := schema_arr('BASE_TABLE1_SCHEMA','BASE_TABLE1_SCHEMA');

    begin
      for i in 1..tabname_set.count LOOP
        dbms_output.put_line ('-- ' || tabname_set(i));
        dbms_output.put_line ('BEGIN');
        dbms_output.put_line (' DBMS_CDC_PUBLISH.CREATE_CHANGE_TABLE(');
        dbms_output.put_line (' owner => ''cdcpub'',');
        dbms_output.put_line (' change_table_name => '''||audname_set(i)||''',');
        dbms_output.put_line (' change_set_name => ''APPCHG'',');
        dbms_output.put_line (' source_schema => '''||schema_set(i)||''',');
        dbms_output.put_line (' source_table => '''||tabname_set(i)||''',');

        -- This section puts in the change table column DDL to match the base table
        declare
          colDDL varchar2(128);
          colTypeList varchar2(12000);
          cursor colDDL_cur IS
            select column_name || ' ' || data_type||
                   case
                     when data_type = 'TIMESTAMP(6)' then ''
                     when data_precision is not null and nvl(data_scale,0)>0 then '('||data_precision||','||data_scale||')'
                     when data_precision is not null and nvl(data_scale,0)=0 then '('||data_precision||')'
                     when data_precision is null and data_scale is not null then '(38,'||data_scale||')'
                     when char_length>0 then '('||char_length|| case char_used
                                                                  when 'B' then ' Byte'
                                                                  when 'C' then ' Char'
                                                                  else null
                                                                end||')'
                   end
              from dba_tab_columns
             where table_name = tabname_set(i) and owner = schema_set(i) order by column_id;

        begin
          -- Build a comma-separated "name type" list from the base table's columns
          open colDDL_cur;
          fetch colDDL_cur into colDDL;
          colTypeList:=colDDL;
          loop
            fetch colDDL_cur into colDDL;
            exit when colDDL_cur%NOTFOUND;
            colTypeList:=colTypeList ||', '|| colDDL;
          end loop;
          close colDDL_cur;
          dbms_output.put_line(' column_type_list => '''|| colTypeList||''',');
        end;

        dbms_output.put_line (' capture_values => ''both'',');
        dbms_output.put_line (' rs_id => ''y'',');
        dbms_output.put_line (' row_id => ''y'',');
        dbms_output.put_line (' user_id => ''n'',');
        dbms_output.put_line (' timestamp => ''y'',');
        dbms_output.put_line (' object_id => ''n'',');
        dbms_output.put_line (' source_colmap => ''n'',');
        dbms_output.put_line (' target_colmap => ''n'',');
        dbms_output.put_line (' ddl_markers => ''n'',');
        dbms_output.put_line (' options_string => ''TABLESPACE xxxxxxx'');');
        dbms_output.put_line ('END;');
        dbms_output.put_line ('/');
      end loop;
    end;
    /

    Comment by Peter Barnum — July 23, 2014 @ 2:45 pm | Reply

  3. This is the script to generate the subscription:

    SET VERIFY OFF
    SET ECHO OFF
    SET FEEDBACK OFF
    SET HEADING OFF
    SET PAGESIZE 0
    SET SERVEROUTPUT ON

    -- Script to generate the DBMS_CDC_SUBSCRIBE calls that create and activate a subscription

    declare
      type tabname_arr is VARRAY(287) of VARCHAR2(32);
      type schema_arr is VARRAY(287) of VARCHAR2(30);  -- wide enough for any schema name
      tabname_set tabname_arr := tabname_arr('BASE_TABLE_NAME1','BASE_TABLE_NAME2');
      vwname_set tabname_arr := tabname_arr('BASE_TABLE_NAME1_V','BASE_TABLE_NAME2_V');
      schema_set schema_arr := schema_arr('BASE_TABLE1_SCHEMA','BASE_TABLE1_SCHEMA');

    begin
      -- First, drop subscription if it exists
      dbms_output.put_line ('-- Drop subscription for copying change data if it exists');
      dbms_output.put_line ('BEGIN');
      dbms_output.put_line (' DBMS_CDC_SUBSCRIBE.DROP_SUBSCRIPTION(');
      dbms_output.put_line (' subscription_name => ''APP_CHANGE_DATA'');');
      dbms_output.put_line ('END;');
      dbms_output.put_line ('/');

      -- Next, create the subscription
      dbms_output.put_line ('-- Create subscription for copying change data');
      dbms_output.put_line ('BEGIN');
      dbms_output.put_line (' DBMS_CDC_SUBSCRIBE.CREATE_SUBSCRIPTION(');
      dbms_output.put_line (' change_set_name => ''APPCHG'',');
      dbms_output.put_line (' description => ''Change data for APP'',');
      dbms_output.put_line (' subscription_name => ''APP_CHANGE_DATA'');');
      dbms_output.put_line ('END;');
      dbms_output.put_line ('/');

      -- Then, subscribe to every table in the list
      for i in 1..tabname_set.count LOOP
        dbms_output.put_line ('-- Subscribe for table --> ' || tabname_set(i));
        dbms_output.put_line ('BEGIN');
        dbms_output.put_line (' DBMS_CDC_SUBSCRIBE.SUBSCRIBE(');
        dbms_output.put_line (' subscription_name => ''APP_CHANGE_DATA'',');
        dbms_output.put_line (' source_schema => '''||schema_set(i)||''',');
        dbms_output.put_line (' source_table => '''||tabname_set(i)||''',');
        -- This section puts in the column list
        declare
          colDDL varchar2(128);
          colList varchar2(12000);
          cursor colDDL_cur IS
            select column_name
              from dba_tab_columns
             where table_name = tabname_set(i) and owner = schema_set(i) order by column_id;

        begin
          -- Build a comma-separated list of the base table's column names
          open colDDL_cur;
          fetch colDDL_cur into colDDL;
          colList:=colDDL;
          loop
            fetch colDDL_cur into colDDL;
            exit when colDDL_cur%NOTFOUND;
            colList:=colList ||', '|| colDDL;
          end loop;
          close colDDL_cur;
          dbms_output.put_line(' column_list => '''|| colList||''',');
          dbms_output.put_line(' subscriber_view => '''||vwname_set(i)||''');');
        end;
        dbms_output.put_line ('END;');
        dbms_output.put_line ('/');
      end loop;

      -- Finally, activate subscription
      dbms_output.put_line ('-- Activate change data subscription');
      dbms_output.put_line ('BEGIN');
      dbms_output.put_line (' DBMS_CDC_SUBSCRIBE.ACTIVATE_SUBSCRIPTION(');
      dbms_output.put_line (' subscription_name => ''APP_CHANGE_DATA'');');
      dbms_output.put_line ('END;');
      dbms_output.put_line ('/');

    end;
    /

    Comment by Peter Barnum — July 23, 2014 @ 2:51 pm | Reply
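
Once a subscription like the one generated above is active, the subscriber normally reads change data in a window cycle. The following is a minimal sketch of that cycle, not part of the commenter's scripts. It reuses the placeholder names APP_CHANGE_DATA and BASE_TABLE_NAME1_V from the comment and assumes it runs as the subscriber that owns the subscription.

```sql
-- 1. Move the upper bound of the window forward to include new change data.
BEGIN
   DBMS_CDC_SUBSCRIBE.extend_window (subscription_name => 'APP_CHANGE_DATA');
END;
/

-- 2. Read the change rows currently visible through the subscriber view.
SELECT * FROM base_table_name1_v;

-- 3. After processing, release the rows that were just read.
BEGIN
   DBMS_CDC_SUBSCRIBE.purge_window (subscription_name => 'APP_CHANGE_DATA');
END;
/
```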

