Test1
CREATE OR REPLACE PROCEDURE compare_tables_with_multi AS sql_stmt VARCHAR2(10000); merge_stmt CLOB; ip_merge_stmt_1 CLOB; ip_merge_stmt_2 CLOB; set_clause VARCHAR2(10000); insert_clause_dst VARCHAR2(10000); insert_clause_src VARCHAR2(10000); ip_tbl_last_update_col VARCHAR2(1000); ip_tbl_create_date_col VARCHAR2(1000); ip_tbl_primary_col VARCHAR2(1000); resultset tbl_nmbr; insert_rows_affected NUMBER(10); v_merge_count NUMBER := 0; chunk_sql VARCHAR2(4000); task_name VARCHAR2(100); L_TRY NUMBER(6); L_STATUS NUMBER(6); chunk_size NUMBER(10) := 20000; CURSOR cur_tables IS SELECT * FROM temp_date_col_mapper WHERE table_name IN ( SELECT table_name FROM leaf_nodes WHERE table_name IN ('UD_MPI_MPG_ID') ); BEGIN FOR table_rec IN cur_tables LOOP BEGIN -- Get table metadata SELECT last_update_date, create_date, primary_key, merge_stmt_1, merge_stmt_2 INTO ip_tbl_last_update_col, ip_tbl_create_date_col, ip_tbl_primary_col, ip_merge_stmt_1, ip_merge_stmt_2 FROM temp_date_col_mapper WHERE table_name = table_rec.table_name; -- Define task name task_name := 'COMPARE_' || table_rec.TABLE_NAME || '_' || TO_CHAR(SYSDATE, 'YYYYMMDDHH24MISS'); -- Create task DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => task_name); -- Chunk SQL based on ROW_NUMBER() chunk_sql := 'SELECT rn AS start_id, rn + ' || chunk_size || ' - 1 AS end_id ' || 'FROM (SELECT ROW_NUMBER() OVER (ORDER BY ' || ip_tbl_primary_col || ') AS rn ' || 'FROM cr_s_0_x.' || table_rec.TABLE_NAME || ') ' || 'WHERE MOD(rn - 1, ' || chunk_size || ') = 0'; -- Create chunks DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL( task_name => task_name, sql_stmt => chunk_sql, by_rowid => FALSE ); -- Define MERGE dynamic block that prints merged count merge_stmt := ' DECLARE v_cnt NUMBER; BEGIN ' || ip_merge_stmt_1 || ' WHERE rn BETWEEN :start_id AND :end_id; ' || ' v_cnt := SQL%ROWCOUNT; DBMS_OUTPUT.PUT_LINE(''Task: ' || ''' || task_name || ''' || ', Chunk Start ID: '' || :start_id || '', Rows Merged: '' || v_cnt); COMMIT; END;'; -- Run merge in parallel DBMS_PARALLEL_EXECUTE.RUN_TASK( task_name => task_name, sql_stmt => merge_stmt, language_flag => DBMS_SQL.NATIVE, parallel_level => 8 ); -- Handle retries if needed L_TRY := 0; L_STATUS := DBMS_PARALLEL_EXECUTE.TASK_STATUS(task_name); WHILE L_TRY < 2 AND L_STATUS != DBMS_PARALLEL_EXECUTE.FINISHED LOOP DBMS_OUTPUT.PUT_LINE('Retrying task: ' || task_name); L_TRY := L_TRY + 1; DBMS_PARALLEL_EXECUTE.RESUME_TASK(task_name); L_STATUS := DBMS_PARALLEL_EXECUTE.TASK_STATUS(task_name); END LOOP; -- Clean up DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name); EXCEPTION WHEN OTHERS THEN DBMS_OUTPUT.PUT_LINE('Error processing table: ' || table_rec.TABLE_NAME); DBMS_OUTPUT.PUT_LINE('Error: ' || SQLERRM); END; END LOOP; END;

CREATE OR REPLACE PROCEDURE compare_tables_with_multi AS
sql_stmt VARCHAR2(10000);
merge_stmt CLOB;
ip_merge_stmt_1 CLOB;
ip_merge_stmt_2 CLOB;
set_clause VARCHAR2(10000);
insert_clause_dst VARCHAR2(10000);
insert_clause_src VARCHAR2(10000);
ip_tbl_last_update_col VARCHAR2(1000);
ip_tbl_create_date_col VARCHAR2(1000);
ip_tbl_primary_col VARCHAR2(1000);
resultset tbl_nmbr;
insert_rows_affected NUMBER(10);
v_merge_count NUMBER := 0;
chunk_sql VARCHAR2(4000);
task_name VARCHAR2(100);
L_TRY NUMBER(6);
L_STATUS NUMBER(6);
chunk_size NUMBER(10) := 20000;
CURSOR cur_tables IS
SELECT *
FROM temp_date_col_mapper
WHERE table_name IN (
SELECT table_name
FROM leaf_nodes
WHERE table_name IN ('UD_MPI_MPG_ID')
);
BEGIN
FOR table_rec IN cur_tables LOOP
BEGIN
-- Get table metadata
SELECT last_update_date, create_date, primary_key, merge_stmt_1, merge_stmt_2
INTO ip_tbl_last_update_col, ip_tbl_create_date_col, ip_tbl_primary_col, ip_merge_stmt_1, ip_merge_stmt_2
FROM temp_date_col_mapper
WHERE table_name = table_rec.table_name;
-- Define task name
task_name := 'COMPARE_' || table_rec.TABLE_NAME || '_' || TO_CHAR(SYSDATE, 'YYYYMMDDHH24MISS');
-- Create task
DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => task_name);
-- Chunk SQL based on ROW_NUMBER()
chunk_sql := 'SELECT rn AS start_id, rn + ' || chunk_size || ' - 1 AS end_id ' ||
'FROM (SELECT ROW_NUMBER() OVER (ORDER BY ' || ip_tbl_primary_col || ') AS rn ' ||
'FROM cr_s_0_x.' || table_rec.TABLE_NAME || ') ' ||
'WHERE MOD(rn - 1, ' || chunk_size || ') = 0';
-- Create chunks
DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(
task_name => task_name,
sql_stmt => chunk_sql,
by_rowid => FALSE
);
-- Define MERGE dynamic block that prints merged count
merge_stmt := '
DECLARE
v_cnt NUMBER;
BEGIN
' || ip_merge_stmt_1 || ' WHERE rn BETWEEN :start_id AND :end_id; ' || '
v_cnt := SQL%ROWCOUNT;
DBMS_OUTPUT.PUT_LINE(''Task: ' || ''' || task_name || ''' || ', Chunk Start ID: '' || :start_id || '', Rows Merged: '' || v_cnt);
COMMIT;
END;';
-- Run merge in parallel
DBMS_PARALLEL_EXECUTE.RUN_TASK(
task_name => task_name,
sql_stmt => merge_stmt,
language_flag => DBMS_SQL.NATIVE,
parallel_level => 8
);
-- Handle retries if needed
L_TRY := 0;
L_STATUS := DBMS_PARALLEL_EXECUTE.TASK_STATUS(task_name);
WHILE L_TRY < 2 AND L_STATUS != DBMS_PARALLEL_EXECUTE.FINISHED LOOP
DBMS_OUTPUT.PUT_LINE('Retrying task: ' || task_name);
L_TRY := L_TRY + 1;
DBMS_PARALLEL_EXECUTE.RESUME_TASK(task_name);
L_STATUS := DBMS_PARALLEL_EXECUTE.TASK_STATUS(task_name);
END LOOP;
-- Clean up
DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name);
EXCEPTION
WHEN OTHERS THEN
DBMS_OUTPUT.PUT_LINE('Error processing table: ' || table_rec.TABLE_NAME);
DBMS_OUTPUT.PUT_LINE('Error: ' || SQLERRM);
END;
END LOOP;
END;