Ingesting a folder of split CSVs from database table.
Using Pandas and xargs we import a table exported as multiple csvs in parallel to multiple tables before rejoining them using a union.
At work I had to import a bunch of tables that ended up being about 98 million rows, the client had given them to us as a compressed 7zip file. I provisioned a pretty chunky VM and use the following to upload them into our Postgresql database.
Here is the script, it’s limited to a single thread so I was having the CSV’s already split up actually sort of helped here.
import sys, getopt, os
import pandas as pd
from sqlalchemy import create_engine
database_url = 'postgresql://user:pass@test:5432/db_name'
def main(csv_file_path):
# Create a database engine using SQLAlchemy
engine = create_engine(database_url)
# Read the CSV file into a pandas DataFrame
df = pd.read_csv(csv_file_path)
head, csv_file = os.path.split(csv_file_path)
print (f'Processing file {csv_file}')
# Infer column types and create the table in PostgreSQL
df.to_sql(name=csv_file[:-4], con=engine, index=False, if_exists='replace', chunksize=1000, method="multi")
if __name__ == "__main__":
csv_file = sys.argv[1:][0]
main(csv_file)
With this I could run this script to call the python script for each file in the extracted CSV directory. The ‘-P’ argument should be the max CPU cores you have and will limit the active amount of jobs.
ls ../EXTRACTED_CSV_FOLDER/*.csv | xargs -I {} -P 16 python single_file.py {}
Lastly in the database we can join the tables into one with the following
REATE TABLE new_table as
SELECT * FROM table1
UNION
SELECT * FROM table2;