<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    Decode360's Blog

    業精于勤而荒于嬉 QQ:150355677 MSN:decode360@hotmail.com

      BlogJava :: 首頁 :: 新隨筆 :: 聯系 ::  :: 管理 ::
      397 隨筆 :: 33 文章 :: 29 評論 :: 0 Trackbacks
    Oracle 10g CDC
    ?

    ??? 從Oracle9i開始,Oracle引入了CDC技術來實現對變化數據的捕獲。在Oracle9i中CDC只支持同步的數據捕獲(synchronous change capture),源數據的變化被實時的捕獲,捕獲的過程和源數據是同一個事務。它的實現需要源數據支持trigger,所以這種同步的技術會給數據源帶來性能的問題。這是CDC在Oracle9i的一個缺陷(在Oracle10g中已經改進)

    ??? 在CDC中變化的數據被保存在一個變化表(change table)中,使用者通過訂閱的方式(subscribe)生成一個視圖(subscribe view)并且通過這個視圖來得到這些變化的數據,一個change table可以有多個訂閱者,也就可以有多個subscribe view。每個subscribe view可以從change table提取自己所關心的不同的columns 和rows。

    ??? 在Oracle10g中,CDC開始支持異步方式來捕獲變化的數據。在Oracle10g中利用redo log來實現CDC。Redo log是一個記錄所有數據庫變化的獨立的文件。CDC的異步方式是非入侵方式(noninvasive)的實現,對于變化數據的捕獲就不需要數據庫加入trigger了。
    ?
    ?
    Oracle的redo log有兩種類型:
    ?
    ● On-line redo log
    ● Archive redo log

    這種從redo log中讀取數據變化的方式叫做“日志挖掘”(mining the logs),這種方式使捕獲數據的操作從源數據的事務中分離出來,減輕了trigger技術給數據源帶來的性能問題(是減輕不是消除!)
    ?

    在Oracle10g中,異步CDC有兩種實現方式:
    ?
    ● HOTLOG
    ?
    這種方式利用On-line redo log。變化的數據從on-line redo log中獲取,然后通過(Oracle Streams技術)把獲取的變化保存在本地的變化表中(變化表還是在數據源上)。然后還需要其他方式把這些數據移到數據倉庫中。

    AUTOLOG
    ?
    這種方式與HOTLOG不同的是數據獲取的位置不同。AUTOLOG方式利用Log Transport Services技術在不同數據庫之間轉換數據時進行變化數據獲取操作的。如果目標數據庫正好是數據倉庫的話,這種方式就比HOTLOG方式減少了一次數據遷移的環節。
    ?
    Log Transport Services技術是標準的日志遷移技術,比如Oracle的Oracle Data Guard。
    ?

    Latency of change:
    ?
    從發生變化的數據源事務提交到利用CDC發現變化的數據,并且把變化的數據移植到變化表之間的時間間隔。HOTLOG方式的時間滯后要比AUTOLOG方式小。
    ?

    Oracle Streams

    Oracle Streams技術可以在多個數據庫之間進行數據備份,它通過在數據隊列里面記錄變化的數據。Oracle Streams技術也是利用數據庫的redo log來解析變化的數據的。Oracle Streams除了進行數據備份外,還可以用來作為數據倉庫的增量抽取。
    ?
    Oracle Streams與CDC技術比較,可以用一個形象的比喻來形容:
    ?
    把Oracle Streams比作磚、瓦;
    把CDC看作是用磚、瓦蓋成的大廈;
    ?
    Oracle Streams可以用來實現CDC,它們是輔助的關系,不是真正競爭的關系。
    ?
    ?
    ?
    ?
    ?
    ?
    ?
    上面是CDC的簡單介紹,下面轉一篇應用(但是怎么看都不太防舒服)
    *****************************************************************************************
    Oracle 10g CDC Test
    ?
    ?
    怎樣使用oracle 10G CDC(Capturing Changed Data)

    1)create the user on the source DB and target DB.

    sqlplus "/? as sysdba"
    connect system/manager

    set serveroutput on size 100000
    set linesize 2000
    create or replace package etl_util AUTHID CURRENT_USER
    IS
    ? FUNCTION get_col_names(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2;
    ? FUNCTION get_cols_definition(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2;
    ? FUNCTION get_modified_col_names(schema IN VARCHAR2, table_name IN VARCHAR2, source_colmap$ IN RAW)? RETURN VARCHAR2;
    END etl_util;
    /
    show errors
    ?
    create or replace package body etl_util
    IS
    -- ******************************************************************************
    -- PUBLIC FUNCTION get_col_names
    -- ******************************************************************************
    ? FUNCTION get_col_names(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2???
    ??? IS
    ? BEGIN
    ??? DECLARE
    ????? v_col_nm VARCHAR2(100);
    ????? v_ret VARCHAR2(32767);
    ????? v_work_str VARCHAR2(32767);
    ?
    ????? CURSOR c_tab_col(p_tbl_nm VARCHAR2, p_schm VARCHAR2) IS???????
    ??????????? SELECT column_name FROM all_tab_columns
    ??????????? WHERE owner = UPPER(TRIM(p_schm))
    ??????????? AND table_name = UPPER(TRIM(p_tbl_nm))
    ??????????? ORDER BY column_name;
    ?
    ? BEGIN
    ? OPEN c_tab_col(p_table_name, p_schema);
    ? v_ret := '';
    ? v_work_str := ','||LOWER(REPLACE(p_skip_columns,' ','')) || ',';
    ?
    ? LOOP
    ????? FETCH c_tab_col INTO v_col_nm;
    ????? EXIT WHEN c_tab_col%NOTFOUND;
    ??? IF INSTR(v_work_str,','||LOWER(v_col_nm)) = 0 THEN
    ??????? v_ret := v_ret || ',' || p_pref ||LOWER(v_col_nm);
    ??? END IF;
    ? END LOOP;
    ? CLOSE c_tab_col;
    ? IF (v_ret IS NOT NULL) THEN v_ret := SUBSTR(v_ret, 2);
    ? END IF;
    ? IF (v_ret IS NULL) THEN
    ????? raise_application_error(-20201,'Table ' || p_schema||'.'||p_table_name
    ??????? || ' Does not exist or has no columns');
    ? END IF;
    ?
    ? RETURN v_ret;
    ? END;
    ? END;
    -- ******************************************************************************
    -- END OF PUBLIC FUNCTION get_col_names
    -- ******************************************************************************
    ?
    -- ******************************************************************************
    -- PUBLIC FUNCTION get_cols_definition
    -- ******************************************************************************
    ? FUNCTION get_cols_definition(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2???
    ??? IS
    ? BEGIN
    ??? DECLARE
    ????? v_col_nm VARCHAR2(100);
    ????? v_col_def VARCHAR2(300);
    ????? v_ret VARCHAR2(32767);
    ????? v_work_str VARCHAR2(32767);
    ????? CURSOR c_tab_col(p_tbl_nm VARCHAR2, p_schm VARCHAR2) IS???????
    ??????? SELECT column_name,
    ??????????? RPAD(column_name,53) ||
    ??????? decode(data_type
    ??????????? ,'NUMBER','NUMBER('||TO_CHAR(data_precision)||
    ??????????????? decode(data_scale,0,'',','||data_scale)||')'
    ??????????? ,'CHAR','CHAR('||TO_CHAR(DATA_LENGTH)||')'
    ??????????? ,'VARCHAR2','VARCHAR2('||TO_CHAR(DATA_LENGTH)||')'
    ??????????? ,'DATE','DATE'
    ??????????? ,data_type) def
    ??????? from all_tab_columns
    ??????? where table_name = upper(trim(p_tbl_nm))
    ??????????? AND owner = upper(trim(p_schm))
    ??????? order by owner, table_name, column_name
    ??????? ;
    ? BEGIN
    ? OPEN c_tab_col(p_table_name, p_schema);
    ? v_ret := '';
    ? v_work_str := ','||LOWER(REPLACE(p_skip_columns,' ','')) || ',';
    ?
    ? LOOP
    ????? FETCH c_tab_col INTO v_col_nm, v_col_def;
    ????? EXIT WHEN c_tab_col%NOTFOUND;
    ??? IF INSTR(v_work_str,','||LOWER(v_col_nm)) = 0 THEN
    ??????? v_ret := v_ret || ', ' || p_pref ||LOWER(v_col_def);
    ??? END IF;
    ? END LOOP;
    ? CLOSE c_tab_col;
    ? IF (v_ret IS NOT NULL) THEN v_ret := SUBSTR(v_ret, 2); END IF;
    ? IF (v_ret IS NULL) THEN
    ????? raise_application_error(-20201,'Table ' || p_schema||'.'||p_table_name
    ??????? || ' Does not exist or has no columns');
    ? END IF;
    ?
    ? RETURN v_ret;
    ? exception
    ????????? when others
    ??????????? then dbms_output.put_line(SUBSTR('Error in get_cols_definition ' || sqlerrm,1,220));
    ? return 'error ' || sqlerrm ;
    ? END;
    ? END;
    -- ******************************************************************************
    -- END OF PUBLIC FUNCTION get_cols_definition
    -- ******************************************************************************
    ?
    -- ******************************************************************************
    -- PUBLIC FUNCTION get_modified_col_names
    -- ******************************************************************************
    ? FUNCTION get_modified_col_names(schema IN VARCHAR2, table_name IN VARCHAR2, source_colmap$ IN RAW) RETURN VARCHAR2
    ??? IS
    ? BEGIN
    ??? DECLARE
    ????? v_col_nm VARCHAR2(100);
    ????? v_col_pos number;
    ????? v_ret VARCHAR2(32767);
    ????? v_work_str VARCHAR2(32767);
    ??? v_byte_nr INTEGER;
    ??? v_col_map VARCHAR2(32767);
    ??? i INTEGER; j INTEGER;k INTEGER;
    ??? v_val VARCHAR2(5);
    ????? CURSOR c_tab_col(p_tbl_nm VARCHAR2, p_schm VARCHAR2) IS???????
    ??????? SELECT column_name, POWER(2,MOD(column_id,8)) col_pos, 1 + floor((column_id)/8) byte_nr
    ??????????? FROM all_tab_cols
    ??????????? WHERE table_name=UPPER(p_tbl_nm)
    ??????????????? AND OWNER=UPPER(p_schm)
    ??????????? ORDER BY column_id;???????
    ? BEGIN
    ????? SELECT dump(source_colmap$,10) INTO v_col_map FROM DUAL;
    ????? i:=INSTR(v_col_map,':');
    ????? v_col_map:=','||SUBSTR(v_col_map,i + 2)||',';
    --??? dbms_output.put_line('v_col_map='||v_col_map);
    ????? OPEN c_tab_col(table_name, schema);
    ????? v_ret := '';
    ????? LOOP
    --??? dbms_output.put_line('--1');
    ????????? FETCH c_tab_col INTO v_col_nm, v_col_pos, v_byte_nr;
    ????????? EXIT WHEN c_tab_col%NOTFOUND;
    --??? dbms_output.put_line('byte_nr='||to_char(v_byte_nr));
    ????????? i:=INSTR(v_col_map,',',1,v_byte_nr);
    ????????? j:=INSTR(v_col_map,',',1,v_byte_nr + 1);
    ????????? IF ((i=0) OR (j = 0)) THEN
    ????????????? EXIT;
    ????????? END IF;
    ????????? v_val := SUBSTR(v_col_map, i + 1, j - i - 1 );
    ????????? k := BITAND(TO_NUMBER(v_val),v_col_pos);
    ????????? IF (k>0) THEN
    ????????????? v_ret := v_ret || ',' || v_col_nm;
    ????????? END IF;
    --??? DBMS_OUTPUT.put_line('col='||v_col_nm ||' byte='||TO_CHAR(v_byte_nr)
    --????????? || ' i=' || TO_CHAR(i) || ' j='||TO_CHAR(j)
    --????????? ||' val='||v_val||' col_pos='||TO_CHAR(v_col_pos)||'res='||TO_CHAR(k));
    ????? END LOOP;
    ????? CLOSE c_tab_col;
    ????? IF (v_ret IS NOT NULL) THEN
    ????????? v_ret := SUBSTR(v_ret,2);
    ????? END IF;
    ????? RETURN v_ret;
    ? EXCEPTION
    ??????? WHEN OTHERS
    ??????????? THEN dbms_output.put_line(SUBSTR('Error in get_modifed_col_names: ',1,220));
    ??????????????? dbms_output.put_line(SUBSTR(sqlerrm,1,240));
    ????? return '';
    ? END;
    ? END;
    -- ******************************************************************************
    -- END OF PUBLIC FUNCTION get_modified_col_names
    -- ******************************************************************************
    ?
    END etl_util;
    /
    show errors

    grant execute on etl_util to public;
    DROP PUBLIC SYNONYM etl_util;
    CREATE PUBLIC SYNONYM etl_util FOR etl_util;
    --select etl_util.get_modified_col_names(schema1 => 'SCOTT'
    --,table_name => 'EMP'
    --,source_colmap$ => HEXTORAW('C000')) FROM dual;
    --
    BEGIN
    declare v_res VARCHAR2(1000);
    BEGIN
    ??? v_res := etl_util.get_modified_col_names('test', 'philip_test', HEXTORAW('0601'));
    ??? dbms_output.put_line('res='||v_res);
    END;
    END;
    /

    2)create the user

    connect system/manager
    DROP USER cdc_publisher CASCADE;
    CREATE USER cdc_publisher IDENTIFIED BY pass;
    GRANT EXECUTE_CATALOG_ROLE to cdc_publisher;
    GRANT SELECT_CATALOG_ROLE to cdc_publisher;
    ?
    3)grant the priviledge

    connect test/pass
    GRANT SELECT on PHILIP_TEST to cdc_publisher;
    GRANT SELECT on PHILIP_TEST2 to cdc_publisher;
    --the PHILIP_TEST2? is under? user?? "test"

    4)Create Change Tables
    --connect cdc_publisher/pass@sde223_dvweb
    set serveroutput on size 1000000
    set linesize 120
    ?
    /* Login as a publisher to run this */
    BEGIN
    ??? DECLARE work_sql VARCHAR2(32767);
    ??????? v_publisher_id VARCHAR2(20)? := 'cdc_publisher';
    ??????? v_source_schema VARCHAR2(20) := 'test';
    ??????? v_source_table VARCHAR2(100);
    ??????? v_cdc_table VARCHAR2(100);
    ??????? CURSOR c_tables IS
    ??????????? SELECT 'PHILIP_TEST' table_name FROM dual
    ??????????????? UNION
    ??????????? SELECT 'PHILIP_TEST2' table_name FROM dual
    ??????? ;
    BEGIN
    FOR tables IN c_tables LOOP
    ? v_source_table := tables.table_name;
    ? v_cdc_table := 'rep_'||v_source_table;
    ? work_sql := etl_util.get_cols_definition(
    ??????? p_schema=>v_source_schema,
    ??????? p_table_name =>v_source_table,
    ??????? p_skip_columns => NULL,
    ??????? p_pref => NULL);
    ??? DBMS_output.put_line('work_sql'||work_sql);
    ? DBMS_LOGMNR_CDC_PUBLISH.CREATE_CHANGE_TABLE (
    ??? OWNER => v_publisher_id, SOURCE_SCHEMA => v_source_schema
    ??? ,CHANGE_SET_NAME => 'SYNC_SET', CAPTURE_VALUES => 'new'--only Captures the changed values from the source table
    ??? ,RS_ID => 'n', ROW_ID => 'n', USER_ID => 'y', TIMESTAMP => 'y'
    ??? ,OBJECT_ID => 'n' -- leave it as 'N' or you will have "table has no columns" error
    ??? ,SOURCE_COLMAP => 'n', TARGET_COLMAP => 'n', OPTIONS_STRING => null
    ??? ,SOURCE_TABLE => v_source_table, CHANGE_TABLE_NAME => v_cdc_table
    ??? ,COLUMN_TYPE_LIST => work_sql);
    ? DBMS_output.put_line('Change table '||v_cdc_table ||' was created successfully');
    END LOOP;
    ?
    EXCEPTION
    ??? WHEN OTHERS THEN
    ??? DBMS_output.put_line('Error start ********************************************');
    ??? DBMS_output.put_line('Error during change table ' || v_cdc_table || ' creation:');
    ??? DBMS_output.put_line(to_char(sqlcode)|| ' ' || sqlerrm);
    ??? DBMS_output.put_line('Error end? ********************************************');
    END;
    END;
    /

    5)when we change the "test.PHILIP_TEST" and "test.PHILIP_TEST2" ,under cdc_publisher , the table "cdc_philip_test will recorde this operation and the value at once.
    ?
    ?
    *****************************************************************************************
    ?
    ?
    ?
    ?
    ?
    另附一篇比較詳細的CDC原理研究:
    *****************************************************************************************
    oracle學習--CDC 研究(1)
    http://hi.baidu.com/ybgba/blog/item/898d93366f79c8d5a2cc2b01.html
    ?
    oracle學習--CDC 研究(2)
    http://hi.baidu.com/ybgba/blog/item/988d113d7c18ddcf9e3d6201.html
    ?
    ?
    ?
    posted on 2009-06-15 19:49 decode360 閱讀(1537) 評論(0)  編輯  收藏 所屬分類: 10.DB_Tools
    主站蜘蛛池模板: 最近中文字幕免费完整| 99久久免费观看| 日本免费v片一二三区| 亚洲欧美日韩自偷自拍| 日韩视频在线免费| 国产大陆亚洲精品国产| 亚洲日本中文字幕一区二区三区| 日韩一级片免费观看| 亚洲人妻av伦理| 在线观看免费视频网站色| 久久青青草原亚洲AV无码麻豆| 午夜不卡久久精品无码免费| 久久久久亚洲AV无码观看| 国产一卡二卡四卡免费| 亚洲欧美不卡高清在线| 免费看男女下面日出水视频| 国产高清视频免费在线观看| 亚洲处破女AV日韩精品| 国产成人精品免费午夜app| 亚洲中文字幕精品久久| 免费一级毛片在播放视频| 东北美女野外bbwbbw免费| 久久精品国产亚洲AV麻豆~| 亚洲毛片在线免费观看| 亚洲avav天堂av在线网毛片| 久久亚洲国产成人影院网站| 香港a毛片免费观看| 亚洲性色AV日韩在线观看| 免费在线观看a级毛片| 91视频免费网站| 亚洲日韩国产精品无码av| 国产在线观看免费完整版中文版 | 亚洲黄色免费观看| 亚洲午夜理论片在线观看| 亚洲精品美女久久久久99小说| 黄网站色视频免费在线观看的a站最新| 亚洲国产成人在线视频| 日韩免费观看视频| 无码A级毛片免费视频内谢| 亚洲精品天堂成人片AV在线播放| 国产亚洲色婷婷久久99精品91|