Ingesting a folder of split CSVs from a database table.

October 31, 2023

Using Pandas and xargs, we import a table that was exported as multiple CSVs, loading the files in parallel into separate tables before recombining them with a union.

At work I had to import a set of tables that ended up being about 98 million rows; the client had given them to us as a compressed 7zip file. I provisioned a pretty chunky VM and used the following to upload them into our PostgreSQL database.

Here is the script. It's limited to a single thread, so having the CSVs already split up actually helped here.

import sys, os
import pandas as pd
from sqlalchemy import create_engine

database_url = 'postgresql://user:pass@test:5432/db_name'

def main(csv_file_path):
    # Create a database engine using SQLAlchemy
    engine = create_engine(database_url)
    # Read the CSV file into a pandas DataFrame
    df = pd.read_csv(csv_file_path)
    head, csv_file = os.path.split(csv_file_path)
    print(f'Processing file {csv_file}')
    # Use the file name (minus the .csv extension) as the table name,
    # let pandas infer column types, and create the table in PostgreSQL
    table_name = os.path.splitext(csv_file)[0]
    df.to_sql(name=table_name, con=engine, index=False, if_exists='replace', chunksize=1000, method="multi")

if __name__ == "__main__":
    csv_file = sys.argv[1]
    main(csv_file)
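If any individual file is too large to hold in memory, a chunked read is one option. This is a minimal sketch of that variation, assuming the same database_url and file layout as above; the chunk size of 100,000 rows is just an illustrative value.

import sys, os
import pandas as pd
from sqlalchemy import create_engine

database_url = 'postgresql://user:pass@test:5432/db_name'  # assumed, same as above

def main(csv_file_path):
    engine = create_engine(database_url)
    table_name = os.path.splitext(os.path.basename(csv_file_path))[0]
    # Stream the CSV in chunks instead of loading it all at once
    for i, chunk in enumerate(pd.read_csv(csv_file_path, chunksize=100_000)):
        # Replace the table on the first chunk, append the rest
        chunk.to_sql(name=table_name, con=engine, index=False,
                     if_exists='replace' if i == 0 else 'append',
                     chunksize=1000, method='multi')

if __name__ == "__main__":
    main(sys.argv[1])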

With this in place I could run the following command to call the Python script for each file in the extracted CSV directory. The '-P' argument limits how many jobs run at once and should be set to the number of CPU cores you have.

ls ../EXTRACTED_CSV_FOLDER/*.csv | xargs -I {} -P 16 python single_file.py {}
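If you'd rather keep everything in Python, a process pool gives roughly the same behaviour as xargs -P. This is a minimal sketch, assuming the script above is saved as single_file.py so its main function can be imported, and that the folder path matches your extracted CSVs.

# parallel_import.py -- a hypothetical wrapper around single_file.main
import glob
from concurrent.futures import ProcessPoolExecutor

from single_file import main  # assumes the script above is saved as single_file.py

if __name__ == "__main__":
    csv_files = glob.glob('../EXTRACTED_CSV_FOLDER/*.csv')
    # max_workers plays the same role as xargs -P
    with ProcessPoolExecutor(max_workers=16) as pool:
        list(pool.map(main, csv_files))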

Lastly, in the database we can combine the tables into one with the following:

CREATE TABLE new_table AS
  SELECT * FROM table1
    UNION
  SELECT * FROM table2;
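Note that UNION also removes duplicate rows; if the split CSVs don't overlap, UNION ALL is cheaper and keeps every row. With dozens of per-file tables it can also be handy to generate the statement rather than type it out. The sketch below assumes the same database_url as above and a hypothetical list of the table names created by the import script.

from sqlalchemy import create_engine, text

database_url = 'postgresql://user:pass@test:5432/db_name'  # assumed, same as above
table_names = ['table1', 'table2', 'table3']  # hypothetical: the tables created from each CSV

# Build one CREATE TABLE ... AS SELECT with UNION ALL between the parts
selects = ' UNION ALL '.join(f'SELECT * FROM "{name}"' for name in table_names)
sql = f'CREATE TABLE new_table AS {selects};'

engine = create_engine(database_url)
with engine.begin() as conn:
    conn.execute(text(sql))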