Friday, August 15, 2008

Simple Oracle Streams Example

# 1. Create the strmadmin user on both databases.

-- Dedicated tablespace and administrator account for Streams.
-- NOTE(review): CREATE TABLESPACE has no DATAFILE clause, so this presumably
-- relies on Oracle Managed Files being configured -- confirm before running.
CREATE TABLESPACE streams;

CREATE USER strmadmin
  IDENTIFIED BY strmadmin
  DEFAULT TABLESPACE streams
  TEMPORARY TABLESPACE temp
  QUOTA UNLIMITED ON streams;

-- Roles for the administrator account, followed by dictionary read access.
GRANT
  CONNECT,
  RESOURCE,
  DBA
TO strmadmin;
GRANT SELECT_CATALOG_ROLE
  TO strmadmin;
GRANT SELECT ANY DICTIONARY
  TO strmadmin;

# 2. Create the target table dwh.stream_test in both databases.

-- Tablespace and owning schema for the replicated table.
CREATE TABLESPACE dwh;

CREATE USER dwh
  IDENTIFIED BY dwh
  DEFAULT TABLESPACE dwh
  TEMPORARY TABLESPACE temp
  QUOTA UNLIMITED ON dwh;

GRANT CREATE SESSION, CREATE TABLE
  TO dwh;

-- The remaining statements run as the schema owner.
CONNECT dwh/dwh

CREATE TABLE stream_test (
  test_id INTEGER PRIMARY KEY,
  test    VARCHAR2(20)
);

-- Log the primary-key and unique-key columns of this table supplementally.
ALTER TABLE stream_test
  ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE) COLUMNS;

-- Give the Streams administrator full access to the table.
GRANT ALL
  ON stream_test
  TO strmadmin;

# 3. Grant the Streams administrator privileges on both databases

-- Grant the Streams administrator privileges to strmadmin on stream1.
CONNECT strmadmin/strmadmin@stream1
BEGIN
  DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE(
    grantee          => 'strmadmin',
    grant_privileges => TRUE
  );
END;
/

-- Same grant on stream2.
CONNECT strmadmin/strmadmin@stream2
BEGIN
  DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE(
    grantee          => 'strmadmin',
    grant_privileges => TRUE
  );
END;
/

# 4. Create the Streams queue and the database links.

-- Source database (stream1): create the default Streams queue in the
-- strmadmin schema and a database link pointing at the target.
-- NOTE(review): the link names match the TNS aliases; if GLOBAL_NAMES is TRUE
-- they must also match the remote databases' global names -- confirm.
CONNECT strmadmin/strmadmin@stream1
EXEC DBMS_STREAMS_ADM.SET_UP_QUEUE();
CREATE DATABASE LINK stream2
  CONNECT TO strmadmin IDENTIFIED BY strmadmin
  USING 'stream2';

-- Target database (stream2): the queue must exist here as well, because the
-- propagation delivers into strmadmin.streams_queue@STREAM2 and the apply
-- process reads from strmadmin.streams_queue on this database.
CONNECT strmadmin/strmadmin@stream2
EXEC DBMS_STREAMS_ADM.SET_UP_QUEUE();
CREATE DATABASE LINK stream1
  CONNECT TO strmadmin IDENTIFIED BY strmadmin
  USING 'stream1';

# 5. create the capture process

-- On stream1: add capture rules for dwh.stream_test (DML and DDL) to the
-- capture process capture_stream, enqueuing into strmadmin.streams_queue.
CONNECT strmadmin/strmadmin@stream1
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name     => 'dwh.stream_test',
    streams_type   => 'capture',
    streams_name   => 'capture_stream',
    queue_name     => 'strmadmin.streams_queue',
    include_dml    => TRUE,
    include_ddl    => TRUE,
    inclusion_rule => TRUE
  );
END;
/

# 6. create the propagation process

-- On stream1: add propagation rules that forward changes to dwh.stream_test
-- from the local queue to the queue on STREAM2 (via the database link).
CONNECT strmadmin/strmadmin@stream1
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
    table_name             => 'dwh.stream_test',
    streams_name           => 'STREAM1_to_STREAM2',
    source_queue_name      => 'strmadmin.streams_queue',
    destination_queue_name => 'strmadmin.streams_queue@STREAM2',
    include_dml            => TRUE,
    include_ddl            => TRUE,
    source_database        => 'STREAM1',
    inclusion_rule         => TRUE
  );
END;
/

# 7. Set the instantiation SCN on the target database

-- On stream1: read the current SCN of the source database and register it,
-- through the STREAM2 database link, as the instantiation SCN of
-- dwh.stream_test on the target.
CONNECT strmadmin/strmadmin@stream1
DECLARE
  v_current_scn NUMBER := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER();
BEGIN
  DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN@STREAM2(
    source_object_name   => 'dwh.stream_test',
    source_database_name => 'STREAM1',
    instantiation_scn    => v_current_scn
  );
END;
/

# 8. create the apply rule

-- On stream2: add apply rules for dwh.stream_test (DML and DDL originating
-- from STREAM1) to the apply process apply_stream, reading from the local
-- strmadmin.streams_queue.
CONNECT strmadmin/strmadmin@stream2
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name      => 'dwh.stream_test',
    streams_type    => 'apply',
    streams_name    => 'apply_stream',
    queue_name      => 'strmadmin.streams_queue',
    include_dml     => TRUE,
    include_ddl     => TRUE,
    source_database => 'STREAM1',
    inclusion_rule  => TRUE
  );
END;
/

# 9. start the capture process

-- On stream1: start the capture process named capture_stream.
CONNECT strmadmin/strmadmin@stream1
EXEC DBMS_CAPTURE_ADM.START_CAPTURE(capture_name => 'capture_stream');

# 10. start the apply process

-- On stream2: set disable_on_error to 'n' for apply_stream, then start it.
-- The two calls are kept in separate blocks so each runs independently.
CONNECT strmadmin/strmadmin@stream2
BEGIN
  DBMS_APPLY_ADM.SET_PARAMETER(
    apply_name => 'apply_stream',
    parameter  => 'disable_on_error',
    value      => 'n'
  );
END;
/
BEGIN
  DBMS_APPLY_ADM.START_APPLY(
    apply_name => 'apply_stream'
  );
END;
/

# 11. create a test transaction

-- On stream1, as the table owner: insert one test row.
CONNECT dwh/dwh@stream1
-- Explicit column list so the statement does not depend on column order.
INSERT INTO stream_test (test_id, test)
VALUES (1, 'test1');
-- Commit explicitly instead of relying on the implicit commit performed by
-- the next SQL*Plus CONNECT, so the transaction is complete at this point.
COMMIT;

# 12. check if streams is working

-- On stream2, as the table owner: list the contents of the target table.
-- NOTE(review): replication is asynchronous, so the row may presumably take
-- a short while to appear -- re-run the query if it returns nothing at first.
CONNECT dwh/dwh@stream2
SELECT
  test_id,
  test
FROM stream_test
ORDER BY test_id;