Test1

CREATE OR REPLACE PROCEDURE compare_tables_with_multi AS
    -- Purpose:
    --   For every table listed in temp_date_col_mapper that also appears in
    --   leaf_nodes (currently restricted to 'UD_MPI_MPG_ID'), split the table
    --   cr_s_0_x.<table_name> into row-number ranges of chunk_size rows and run
    --   the statement stored in temp_date_col_mapper.merge_stmt_1 once per range
    --   through DBMS_PARALLEL_EXECUTE.
    --
    -- Error handling:
    --   A failure while processing one table is reported through DBMS_OUTPUT and
    --   the loop continues with the next table. The parallel-execute task is
    --   dropped on both the success and the failure path.
    --
    -- NOTE(review): the identifiers concatenated into the dynamic SQL
    --   (table_name, primary_key) come from temp_date_col_mapper; this assumes
    --   that table is trusted configuration - confirm.

    ip_merge_stmt_1     CLOB;            -- statement text read from merge_stmt_1
    ip_tbl_primary_col  VARCHAR2(1000);  -- ORDER BY expression used for ROW_NUMBER()
    merge_stmt          CLOB;            -- anonymous block executed for each chunk
    chunk_sql           VARCHAR2(4000);  -- query producing (start_id, end_id) pairs
    task_name           VARCHAR2(100);
    task_created        BOOLEAN;         -- TRUE while a task exists that must be dropped
    l_try               NUMBER(6);
    l_status            NUMBER(6);
    chunk_size          NUMBER(10) := 20000;

    CURSOR cur_tables IS
        SELECT table_name
        FROM temp_date_col_mapper
        WHERE table_name IN (
            SELECT table_name
            FROM leaf_nodes
            WHERE table_name IN ('UD_MPI_MPG_ID')
        );
BEGIN
    FOR table_rec IN cur_tables LOOP
        task_created := FALSE;

        BEGIN
            -- Get table metadata (only the columns this procedure actually uses).
            SELECT primary_key, merge_stmt_1
            INTO ip_tbl_primary_col, ip_merge_stmt_1
            FROM temp_date_col_mapper
            WHERE table_name = table_rec.table_name;

            -- Task name is unique per table and per second.
            task_name := 'COMPARE_' || table_rec.table_name || '_'
                         || TO_CHAR(SYSDATE, 'YYYYMMDDHH24MISS');

            DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => task_name);
            task_created := TRUE;

            -- One chunk per chunk_size rows: rows 1, chunk_size + 1, ... each
            -- open a range [rn, rn + chunk_size - 1].
            chunk_sql := 'SELECT rn AS start_id, rn + ' || chunk_size || ' - 1 AS end_id '
                         || 'FROM (SELECT ROW_NUMBER() OVER (ORDER BY ' || ip_tbl_primary_col || ') AS rn '
                         || 'FROM cr_s_0_x.' || table_rec.table_name || ') '
                         || 'WHERE MOD(rn - 1, ' || chunk_size || ') = 0';

            DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(
                task_name => task_name,
                sql_stmt  => chunk_sql,
                by_rowid  => FALSE
            );

            -- Anonymous block run for each chunk. The task name is embedded as
            -- literal text: the generated block has no task_name variable, so
            -- referencing the identifier there would not compile.
            -- NOTE(review): assumes merge_stmt_1 ends where ' WHERE rn BETWEEN ...'
            --   is syntactically valid and exposes a column named rn - confirm.
            -- NOTE(review): DBMS_OUTPUT written by the chunk jobs is presumably not
            --   visible in the calling session - confirm.
            merge_stmt := 'DECLARE v_cnt NUMBER; BEGIN '
                          || ip_merge_stmt_1
                          || ' WHERE rn BETWEEN :start_id AND :end_id; '
                          || 'v_cnt := SQL%ROWCOUNT; '
                          || 'DBMS_OUTPUT.PUT_LINE(''Task: ' || task_name
                          || ', Chunk Start ID: '' || :start_id || '', Rows Merged: '' || v_cnt); '
                          || 'COMMIT; END;';

            DBMS_PARALLEL_EXECUTE.RUN_TASK(
                task_name      => task_name,
                sql_stmt       => merge_stmt,
                language_flag  => DBMS_SQL.NATIVE,
                parallel_level => 8
            );

            -- Resume the task up to two times while it has not finished.
            l_try := 0;
            l_status := DBMS_PARALLEL_EXECUTE.TASK_STATUS(task_name);

            WHILE l_try < 2 AND l_status != DBMS_PARALLEL_EXECUTE.FINISHED LOOP
                DBMS_OUTPUT.PUT_LINE('Retrying task: ' || task_name);
                l_try := l_try + 1;
                DBMS_PARALLEL_EXECUTE.RESUME_TASK(task_name);
                l_status := DBMS_PARALLEL_EXECUTE.TASK_STATUS(task_name);
            END LOOP;

            -- Report a task that is still unfinished before its chunk
            -- bookkeeping disappears with the drop below.
            IF l_status != DBMS_PARALLEL_EXECUTE.FINISHED THEN
                DBMS_OUTPUT.PUT_LINE('Task did not finish: ' || task_name
                                     || ' (status ' || l_status || ')');
            END IF;

            DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name);
            task_created := FALSE;

        EXCEPTION
            WHEN OTHERS THEN
                -- Best-effort per table: report and continue with the next one.
                DBMS_OUTPUT.PUT_LINE('Error processing table: ' || table_rec.table_name);
                DBMS_OUTPUT.PUT_LINE('Error: ' || SQLERRM);

                -- Do not leave the task behind when the failure happened after
                -- CREATE_TASK; a failing cleanup must not mask the original error.
                IF task_created THEN
                    BEGIN
                        DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name);
                    EXCEPTION
                        WHEN OTHERS THEN
                            DBMS_OUTPUT.PUT_LINE('Could not drop task: ' || task_name
                                                 || ' - ' || SQLERRM);
                    END;
                END IF;
        END;
    END LOOP;
END;

Apr 23, 2025 - 11:44
 0
Test1

CREATE OR REPLACE PROCEDURE compare_tables_with_multi AS
    -- Purpose:
    --   For every table listed in temp_date_col_mapper that also appears in
    --   leaf_nodes (currently restricted to 'UD_MPI_MPG_ID'), split the table
    --   cr_s_0_x.<table_name> into row-number ranges of chunk_size rows and run
    --   the statement stored in temp_date_col_mapper.merge_stmt_1 once per range
    --   through DBMS_PARALLEL_EXECUTE.
    --
    -- Error handling:
    --   A failure while processing one table is reported through DBMS_OUTPUT and
    --   the loop continues with the next table. The parallel-execute task is
    --   dropped on both the success and the failure path.
    --
    -- NOTE(review): the identifiers concatenated into the dynamic SQL
    --   (table_name, primary_key) come from temp_date_col_mapper; this assumes
    --   that table is trusted configuration - confirm.

    ip_merge_stmt_1     CLOB;            -- statement text read from merge_stmt_1
    ip_tbl_primary_col  VARCHAR2(1000);  -- ORDER BY expression used for ROW_NUMBER()
    merge_stmt          CLOB;            -- anonymous block executed for each chunk
    chunk_sql           VARCHAR2(4000);  -- query producing (start_id, end_id) pairs
    task_name           VARCHAR2(100);
    task_created        BOOLEAN;         -- TRUE while a task exists that must be dropped
    l_try               NUMBER(6);
    l_status            NUMBER(6);
    chunk_size          NUMBER(10) := 20000;

    CURSOR cur_tables IS
        SELECT table_name
        FROM temp_date_col_mapper
        WHERE table_name IN (
            SELECT table_name
            FROM leaf_nodes
            WHERE table_name IN ('UD_MPI_MPG_ID')
        );
BEGIN
    FOR table_rec IN cur_tables LOOP
        task_created := FALSE;

        BEGIN
            -- Get table metadata (only the columns this procedure actually uses).
            SELECT primary_key, merge_stmt_1
            INTO ip_tbl_primary_col, ip_merge_stmt_1
            FROM temp_date_col_mapper
            WHERE table_name = table_rec.table_name;

            -- Task name is unique per table and per second.
            task_name := 'COMPARE_' || table_rec.table_name || '_'
                         || TO_CHAR(SYSDATE, 'YYYYMMDDHH24MISS');

            DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => task_name);
            task_created := TRUE;

            -- One chunk per chunk_size rows: rows 1, chunk_size + 1, ... each
            -- open a range [rn, rn + chunk_size - 1].
            chunk_sql := 'SELECT rn AS start_id, rn + ' || chunk_size || ' - 1 AS end_id '
                         || 'FROM (SELECT ROW_NUMBER() OVER (ORDER BY ' || ip_tbl_primary_col || ') AS rn '
                         || 'FROM cr_s_0_x.' || table_rec.table_name || ') '
                         || 'WHERE MOD(rn - 1, ' || chunk_size || ') = 0';

            DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(
                task_name => task_name,
                sql_stmt  => chunk_sql,
                by_rowid  => FALSE
            );

            -- Anonymous block run for each chunk. The task name is embedded as
            -- literal text: the generated block has no task_name variable, so
            -- referencing the identifier there would not compile.
            -- NOTE(review): assumes merge_stmt_1 ends where ' WHERE rn BETWEEN ...'
            --   is syntactically valid and exposes a column named rn - confirm.
            -- NOTE(review): DBMS_OUTPUT written by the chunk jobs is presumably not
            --   visible in the calling session - confirm.
            merge_stmt := 'DECLARE v_cnt NUMBER; BEGIN '
                          || ip_merge_stmt_1
                          || ' WHERE rn BETWEEN :start_id AND :end_id; '
                          || 'v_cnt := SQL%ROWCOUNT; '
                          || 'DBMS_OUTPUT.PUT_LINE(''Task: ' || task_name
                          || ', Chunk Start ID: '' || :start_id || '', Rows Merged: '' || v_cnt); '
                          || 'COMMIT; END;';

            DBMS_PARALLEL_EXECUTE.RUN_TASK(
                task_name      => task_name,
                sql_stmt       => merge_stmt,
                language_flag  => DBMS_SQL.NATIVE,
                parallel_level => 8
            );

            -- Resume the task up to two times while it has not finished.
            l_try := 0;
            l_status := DBMS_PARALLEL_EXECUTE.TASK_STATUS(task_name);

            WHILE l_try < 2 AND l_status != DBMS_PARALLEL_EXECUTE.FINISHED LOOP
                DBMS_OUTPUT.PUT_LINE('Retrying task: ' || task_name);
                l_try := l_try + 1;
                DBMS_PARALLEL_EXECUTE.RESUME_TASK(task_name);
                l_status := DBMS_PARALLEL_EXECUTE.TASK_STATUS(task_name);
            END LOOP;

            -- Report a task that is still unfinished before its chunk
            -- bookkeeping disappears with the drop below.
            IF l_status != DBMS_PARALLEL_EXECUTE.FINISHED THEN
                DBMS_OUTPUT.PUT_LINE('Task did not finish: ' || task_name
                                     || ' (status ' || l_status || ')');
            END IF;

            DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name);
            task_created := FALSE;

        EXCEPTION
            WHEN OTHERS THEN
                -- Best-effort per table: report and continue with the next one.
                DBMS_OUTPUT.PUT_LINE('Error processing table: ' || table_rec.table_name);
                DBMS_OUTPUT.PUT_LINE('Error: ' || SQLERRM);

                -- Do not leave the task behind when the failure happened after
                -- CREATE_TASK; a failing cleanup must not mask the original error.
                IF task_created THEN
                    BEGIN
                        DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name);
                    EXCEPTION
                        WHEN OTHERS THEN
                            DBMS_OUTPUT.PUT_LINE('Could not drop task: ' || task_name
                                                 || ' - ' || SQLERRM);
                    END;
                END IF;
        END;
    END LOOP;
END;