Problem Statement

In many data-driven environments, we often receive data in CSV format from upstream systems. The challenge lies not just in loading this data into a database, but in doing so intelligently:
- How can we insert new records without duplicating existing ones?
- How do we handle updates to existing records?
- How can we track what changed in each load?
- Can we maintain logs for auditing?

This sounds simple, but if it is not handled properly it can lead to duplicate data, inconsistent reports, and untraceable changes.
Different Approaches Explored
Let’s walk through a few conventional approaches and their trade-offs:
Approach 1: Direct Insert into Main Table

Pros: Very simple and fast to implement.
Cons: Cannot detect duplicates. Cannot update existing records. No logging or traceability.

Approach 2: Truncate & Reload

Pros: Easy to implement. Always has the latest data.
Cons: Risk of data loss if the file is incomplete or corrupted. No history or audit trail. Cannot be used for transactional or incremental systems.

Approach 3: Row-by-Row UPSERT Logic in Python

Pros: Accurate handling of insert/update logic.
Cons: Very slow for large files (due to per-row SQL). Error-prone with schema mismatches. Difficult to maintain.

Final Approach (My Solution): Staging → Merge → Log

After exploring the above methods, I designed a robust, scalable, and auditable solution using Python and Oracle SQL. Here's how it works:
Key Features of My Approach
- Staging Table: Load data into a temporary (staging) table first.
- MERGE Statement: Compare the staging and main tables using MERGE (UPSERT) logic.
- Audit Logging: Insert metadata into a log table (RECORD_HIST_TEST) after each run.
- Unique Key Matching: Ensures updates are applied only where records match on the key.
- Timestamped Logging: Every load is traceable by datetime and filename.

Step 1: Read CSV & Normalize

df = pd.read_csv(csv_file_path, encoding='latin1')
df.columns = [col.upper().strip() for col in df.columns]

This uppercases and trims the column names so they match Oracle's identifiers (unquoted Oracle identifiers are stored in uppercase, and the script quotes column names when building SQL, so the names must match exactly).
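One prerequisite worth calling out: the script never creates the staging table; it checks for it and asks you to create it first. As a rough setup sketch, with hypothetical column names and types that would need to mirror the CSV header and the main table:

cur = conn.cursor()
cur.execute("""
    CREATE TABLE STG_PATIENT_VISITS (
        VISIT_ID    NUMBER,
        PATIENT_ID  NUMBER,
        VISIT_DATE  VARCHAR2(20),
        DIAGNOSIS   VARCHAR2(200)
    )
""")
cur.close()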
Step 2: Insert into Staging Table

insert_sql = f'INSERT INTO {staging_table} (...) VALUES (...)'
cur.executemany(insert_sql, data)

executemany() sends all rows to the staging table in a single batch, which is far faster than issuing one INSERT per row.
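The (...) parts above are generated from the DataFrame itself. A condensed sketch of that construction, mirroring the insert_or_merge_data function in the full script below (df, cur, and staging_table are assumed to exist from the earlier steps):

columns = df.columns.tolist()
col_names = ', '.join(f'"{col}"' for col in columns)
placeholders = ', '.join(f':{i+1}' for i in range(len(columns)))
insert_sql = f'INSERT INTO {staging_table} ({col_names}) VALUES ({placeholders})'
data = [tuple(row) for row in df.itertuples(index=False, name=None)]
cur.executemany(insert_sql, data)

Binding by position (:1, :2, ...) lets Oracle prepare the statement once and reuse it for every row in the batch.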
Step 3: Merge into Main Table

MERGE INTO PATIENT_VISITS tgt
USING STG_PATIENT_VISITS stg
ON (tgt.VISIT_ID = stg.VISIT_ID)
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...

This single SQL statement handles both updates and inserts, so only new or updated records are touched.
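To make the elided parts concrete: for a main table keyed on VISIT_ID with hypothetical columns PATIENT_ID, VISIT_DATE, and DIAGNOSIS, the statement the full script generates would look roughly like this (schema and column names are illustrative):

merge_sql = """
    MERGE INTO HEALTH_MART.PATIENT_VISITS tgt
    USING STG_PATIENT_VISITS stg
    ON (tgt."VISIT_ID" = stg."VISIT_ID")
    WHEN MATCHED THEN UPDATE SET
        tgt."PATIENT_ID" = stg."PATIENT_ID",
        tgt."VISIT_DATE" = stg."VISIT_DATE",
        tgt."DIAGNOSIS" = stg."DIAGNOSIS"
    WHEN NOT MATCHED THEN INSERT ("VISIT_ID", "PATIENT_ID", "VISIT_DATE", "DIAGNOSIS")
    VALUES (stg."VISIT_ID", stg."PATIENT_ID", stg."VISIT_DATE", stg."DIAGNOSIS")
"""
cur.execute(merge_sql)

Note that the key column is deliberately excluded from the UPDATE SET list; only the non-key columns are overwritten on a match.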
Step 4: Insert into Log Table

INSERT INTO RECORD_HIST_TEST
    (table_name, schema_name, row_count, run_time, refresh_flag, new_records_count, file_name)
VALUES (:1, :2, :3, :4, :5, :6, :7)

The script compares the current row count with the previous run's count to set a refresh flag, then logs the number of new records, the run timestamp, and the file name.
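The log table itself is assumed to already exist. A plausible layout matching the columns used above (the data types here are my assumptions, not taken from the original schema):

cur = conn.cursor()
cur.execute("""
    CREATE TABLE RECORD_HIST_TEST (
        TABLE_NAME         VARCHAR2(128),
        SCHEMA_NAME        VARCHAR2(128),
        ROW_COUNT          NUMBER,
        RUN_TIME           TIMESTAMP,
        REFRESH_FLAG       CHAR(1),
        NEW_RECORDS_COUNT  NUMBER,
        FILE_NAME          VARCHAR2(255)
    )
""")
cur.close()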
Conclusion

While inserting data from a CSV into a database might seem simple at first, doing it right requires handling edge cases like duplicates, updates, and change tracking. By building a staging + merge + log pipeline in Python, we create a reliable, auditable, and scalable data ingestion framework.
Full Python Script:
import pandas as pd
from db_config import conn
from datetime import datetime
import os
# Configuration
csv_file_path = 'csvfiles/patient_visits.csv'
schema_name = 'HEALTH_MART'
table_name = 'PATIENT_VISITS'
staging_table = 'STG_PATIENT_VISITS'
unique_keys = ['VISIT_ID']  # list of unique keys
run_time = datetime.now()
timestamped_message = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Helper Functions
def table_exists(conn, schema, table):
    # Check whether a table exists in the given schema
    cur = conn.cursor()
    query = """
        SELECT COUNT(*)
        FROM all_tables
        WHERE owner = :1 AND table_name = :2
    """
    cur.execute(query, (schema.upper(), table.upper()))
    exists = cur.fetchone()[0] > 0
    cur.close()
    return exists
def get_table_data(conn, table):
    # Return the current row count of a table
    cur = conn.cursor()
    query = f"SELECT COUNT(*) FROM {table}"
    cur.execute(query)
    count = cur.fetchone()[0]
    cur.close()
    return count
def read_csv_file(csv_file_path):
    print(f"{timestamped_message} - Reading CSV file")
    df = pd.read_csv(csv_file_path, encoding='latin1')
    df.columns = [col.upper().strip() for col in df.columns]
    return df
def insert_or_merge_data(df, conn, schema, table, staging_table, unique_keys):
    cur = conn.cursor()
    df.columns = [col.upper().strip() for col in df.columns]
    unique_keys = [key.upper().strip() for key in unique_keys]
    columns = df.columns.tolist()
    # Clear the staging table so repeated runs do not accumulate duplicate rows
    cur.execute(f"TRUNCATE TABLE {staging_table}")

    # Insert into staging table
    placeholders = ', '.join([f':{i+1}' for i in range(len(columns))])
    col_names = ', '.join([f'"{col}"' for col in columns])
    insert_sql = f'INSERT INTO {staging_table} ({col_names}) VALUES ({placeholders})'
    data = [tuple(row) for row in df.itertuples(index=False, name=None)]
print(f"{timestamped_message} - Inserting {len(data)} rows into staging table {staging_table}") cur.executemany(insert_sql, data)
    # Count new records in staging not in main table
    count_new_sql = f"""
        SELECT COUNT(*) FROM {staging_table} stg
        WHERE NOT EXISTS (
            SELECT 1 FROM {schema}.{table} tgt
            WHERE {' AND '.join([f'tgt."{key}" = stg."{key}"' for key in unique_keys])}
        )
    """
    cur.execute(count_new_sql)
    new_records_count = cur.fetchone()[0]
    print(f"{timestamped_message} - Number of new records: {new_records_count}")
    # Merge data from staging to main table
    merge_on = ' AND '.join([f'tgt."{key}" = stg."{key}"' for key in unique_keys])
    update_set = ', '.join([f'tgt."{col}" = stg."{col}"' for col in columns if col not in unique_keys])
    insert_cols = ', '.join([f'"{col}"' for col in columns])
    insert_vals = ', '.join([f'stg."{col}"' for col in columns])
    merge_sql = f"""
        MERGE INTO {schema}.{table} tgt
        USING {staging_table} stg
        ON ({merge_on})
        WHEN MATCHED THEN UPDATE SET {update_set}
        WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})
    """
print(f"{timestamped_message} - Merging data into {schema}.{table}") cur.execute(merge_sql)
    conn.commit()
    cur.close()
    print(f"{timestamped_message} - Merge completed successfully")
    return new_records_count
def insert_log(conn, table_name, schema_name, row_count, run_time, new_records_count, file_name):
    cur = conn.cursor()
    select_query = """
        SELECT row_count FROM RECORD_HIST_TEST
        WHERE table_name = :1 AND schema_name = :2
        ORDER BY run_time DESC
        FETCH FIRST 1 ROWS ONLY
    """
    cur.execute(select_query, (table_name, schema_name))
    previous = cur.fetchone()
    # Flag the run as a refresh only when the row count changed since the last run
    if previous and previous[0] == row_count:
        refresh_flag = 'N'
    else:
        refresh_flag = 'Y'
    insert_query = """
        INSERT INTO RECORD_HIST_TEST
            (table_name, schema_name, row_count, run_time, refresh_flag, new_records_count, file_name)
        VALUES (:1, :2, :3, :4, :5, :6, :7)
    """
    cur.execute(insert_query, (table_name, schema_name, row_count, run_time, refresh_flag, new_records_count, file_name))
    conn.commit()
    cur.close()
# Main execution
if not table_exists(conn, schema_name, table_name) or not os.path.exists(csv_file_path):
    print(f"{timestamped_message} - Table {table_name} not found in schema {schema_name}, or CSV file {csv_file_path} not found")
else:
    print(f"{timestamped_message} - Table {table_name} found in schema {schema_name}")
    df = read_csv_file(csv_file_path)
    file_name = os.path.basename(csv_file_path)
    if not table_exists(conn, schema_name, staging_table):
        print(f"{timestamped_message} - Staging table {staging_table} does not exist. Please create it first.")
    else:
        new_records_count = insert_or_merge_data(df, conn, schema_name, table_name, staging_table, unique_keys)
        count = get_table_data(conn, table_name)
        print(f"{timestamped_message} - Total rows in {table_name}: {count}")
        insert_log(conn, table_name, schema_name, count, run_time, new_records_count, file_name)
        print(f"{timestamped_message} - Log inserted. New records: {new_records_count}")
conn.close()
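A note on the db_config import: the script expects a module that exposes an open connection named conn, which is not shown above. Assuming the python-oracledb driver, a minimal db_config.py could look like this (user, password, and DSN are placeholders):

# db_config.py -- hypothetical example; swap in your own credentials and DSN
import oracledb

conn = oracledb.connect(
    user="db_user",
    password="db_password",
    dsn="dbhost:1521/ORCLPDB1",
)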
