If you research how to connect to a database from Python, many examples use the pyodbc library, which, as its name suggests, creates a connection to any ODBC-compatible database. On its own, however, pyodbc doesn't give us easy bi-directional transfer of Pandas DataFrames: data can be pulled into a DataFrame with a raw query, but a DataFrame's `to_sql` upload method expects a SQLAlchemy connectable. To allow for simple, bi-directional database transactions, we use pyodbc along with sqlalchemy, a Python SQL toolkit and Object Relational Mapper that gives application developers the full power and flexibility of SQL. With pyodbc and sqlalchemy together, it becomes possible to retrieve and upload data from Pandas DataFrames with relative ease. Let's assume we're interested in connecting to a SQL Server database on some server. A connection using sqlalchemy is created as follows:
""" Creating a database connection with sqlalchemy. """ import pandas as pd import sqlalchemy DRIVER = "SQL Server" SERVER = "SERVER" DATABASE = "DATABASE" # Create connection uri. conn_uri = f"mssql+pyodbc://{SERVER}/{DATABASE}?driver={DRIVER}".replace(" ", "+") # Initialize connection. conn = sqlalchemy.create_engine(conn_uri)
A few points to highlight:
- `conn_uri` is a string that contains the information needed to connect to our database. The prefix `mssql+pyodbc://` indicates that we're targeting a SQL Server database via the pyodbc connector. Also, if we weren't using Windows authentication, or were working with a different RDBMS, it would be necessary to change `conn_uri`. For example, an Oracle connection uri would be specified as `oracle://[USERNAME]:[PASSWORD]@[DATABASE]`.
- Also in `conn_uri`, within the format substitution, whitespace in `DRIVER` is replaced with `+`. This is consistent with how whitespace is encoded for web addresses (an alternative that handles this encoding automatically is sketched after this list).
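As an aside, newer versions of SQLAlchemy (1.4+) expose a `URL.create` helper that builds the connection URI and takes care of the whitespace encoding for us. A minimal sketch, assuming the same placeholder SERVER/DATABASE values as above:

```python
"""
Sketch: building the connection URI with SQLAlchemy's URL helper
(assumes SQLAlchemy >= 1.4 and the same placeholder values as above).
"""
import sqlalchemy
from sqlalchemy.engine import URL

SERVER = "SERVER"
DATABASE = "DATABASE"

# URL.create percent-encodes the query parameters, so the space in
# "SQL Server" is handled without a manual .replace(" ", "+").
conn_url = URL.create(
    "mssql+pyodbc",
    host=SERVER,
    database=DATABASE,
    query={"driver": "SQL Server"},
)

conn = sqlalchemy.create_engine(conn_url)
```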
Next, to query the French Motor Third-Party Liability Claims sample dataset stored in the table SAMPLE_TABLE, use the `read_sql` function. I've included the connection initialization logic for convenience:
""" Reading database data into Pandas DataFrame. """ import pandas as pd import sqlalchemy DRIVER = "SQL Server" SERVER = "SERVER" DATABASE = "DATABASE" # Create connection uri. conn_uri = f"mssql+pyodbc://{SERVER}/{DATABASE}?driver={DRIVER}".replace(" ", "+") # Initialize connection. conn = sqlalchemy.create_engine(conn_uri) # Create query. SQL = "SELECT * FROM SAMPLE_TABLE" df = pd.read_sql(SQL, con=conn)
Instead of passing a query to `pd.read_sql`, the table name could have been provided. `pd.read_sql` is a convenience wrapper around `read_sql_table` and `read_sql_query` that delegates to the appropriate function depending on the input: it dispatches to `read_sql_table` if the input is a table name, and to `read_sql_query` if the input is a query. Refer to the documentation for more information.
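As a quick sketch, both of the following calls return the same DataFrame, assuming `conn` is the engine created above and SAMPLE_TABLE exists in the target database:

```python
# Passing a query dispatches to read_sql_query under the hood.
df_from_query = pd.read_sql("SELECT * FROM SAMPLE_TABLE", con=conn)

# Passing a bare table name dispatches to read_sql_table instead.
df_from_table = pd.read_sql("SAMPLE_TABLE", con=conn)

# The underlying functions can also be called directly.
df_q = pd.read_sql_query("SELECT * FROM SAMPLE_TABLE", con=conn)
df_t = pd.read_sql_table("SAMPLE_TABLE", con=conn)
```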
Let’s assume SAMPLE_TABLE represents the French Motor Third-Party Liability Claims dataset available here. Inspecting the first 10 records of the dataset yields:
```
   IDPOL  CLAIMNB  EXPOSURE AREA  VEHPOWER  VEHAGE  DRIVAGE  BONUSMALUS VEHBRAND     VEHGAS  DENSITY REGION
0   1290        1   0.66000  'B'         7       0       28          60    'B12'  'Regular'       52  'R72'
1   1292        1   0.12000  'B'         7       0       28          60    'B12'  'Regular'       52  'R72'
2   1295        1   0.08000  'E'         5       0       36          50    'B12'  'Regular'     3274  'R11'
3   1296        1   0.50000  'E'         5       0       36          50    'B12'  'Regular'     3274  'R11'
4   1297        1   0.20000  'E'         5       0       36          50    'B12'  'Regular'     3274  'R11'
5   1299        1   0.74000  'D'         6       0       76          50    'B12'  'Regular'      543  'R91'
6   1301        1   0.05000  'D'         6       0       76          50    'B12'  'Regular'      543  'R91'
7   1303        1   0.03000  'B'        11       0       39          50    'B12'   'Diesel'       55  'R52'
8   1304        1   0.76000  'B'        11       0       39          50    'B12'   'Diesel'       55  'R52'
9   1306        1   0.49000  'E'        10       0       38          50    'B12'  'Regular'     2715  'R93'
```
Iterative Data Retrieval
When working with large datasets, it may be inefficient to retrieve the entire dataset in a single pass. Pandas provides functionality to retrieve data in `chunksize`-record blocks, which keeps memory usage in check for large result sets. In the following example, the same French Motor Third-Party Liability Claims sample dataset is retrieved in 20,000-record blocks. The only change in the call to `read_sql` is the inclusion of `chunksize`, which specifies the maximum number of records to retrieve for a given iteration. We assume `conn` has already been initialized:
""" Using `read_sql`'s *chunksize* parameter for iterative retrieval. """ CHUNKSIZE = 20000 SQL = "SELECT * FROM SAMPLE_TABLE" dfiter = pd.read_sql(SQL, con=conn, chunksize=CHUNKSIZE) df = pd.concat([dd for dd in dfiter])
- `CHUNKSIZE` specifies the maximum number of records to retrieve at each iteration.
- `dfiter` is a reference to the data targeted in our query. `dfiter` is not a DataFrame; rather, it is a generator, a Python object which makes it easy to create iterators. Generators yield values lazily, so they are particularly memory efficient.
- `df = pd.concat([dd for dd in dfiter])` can be decomposed into two parts: First, `[dd for dd in dfiter]` is a list comprehension, a very powerful tool that works similarly to a flattened for loop. If we bound `[dd for dd in dfiter]` to a variable directly, the result would be a list of 34 DataFrames, each having no more than 20,000 records. Second, `pd.concat` takes the list of DataFrames and performs a row-wise concatenation, resulting in a single DataFrame with 678,013 records. `pd.concat` is akin to the SQL `UNION` operator. The final result, `df`, is a DataFrame having 678,013 rows and 12 columns. If the combined DataFrame isn't actually needed, the chunks can also be processed one at a time, as sketched after this list.
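For instance, here's a sketch that accumulates aggregate EXPOSURE by AREA one chunk at a time, so no more than `CHUNKSIZE` records are ever held in memory at once (column names assume the sample dataset shown earlier):

```python
"""
Sketch: chunk-at-a-time aggregation, so at most CHUNKSIZE records
are in memory at any point. Column names assume the sample dataset above.
"""
CHUNKSIZE = 20000
SQL = "SELECT * FROM SAMPLE_TABLE"

dfiter = pd.read_sql(SQL, con=conn, chunksize=CHUNKSIZE)

# Partial EXPOSURE sums by AREA, one small Series per chunk.
partials = [dd.groupby("AREA")["EXPOSURE"].sum() for dd in dfiter]

# Combine the partial sums into a single per-AREA total.
exposure_by_area = pd.concat(partials).groupby(level=0).sum()
```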
Exporting Results to File
Instead of reading the data into memory, it may be necessary to retrieve the dataset, then write the results to file for later analysis. This can be accomplished in an iterative fashion so that no more than `CHUNKSIZE` records are in memory at any point in time. Results will be saved to a .csv file named "FREMTPL.csv", written in 100,000-record blocks:
""" Writing queried results to file. """ import time CHUNKSIZE = 100000 CSV_PATH = "FREMTPL.csv" SQL = "SELECT * FROM SAMPLE_TABLE" dfiter = pd.read_sql(SQL, conn, chunksize=CHUNKSIZE) t_i = time.time() trkr, nbrrecs = 0, 0 with open(CSV_PATH, "w", encoding="utf-8", newline="") as fcsv: for df in dfiter: fcsv.write(df.to_csv(header=nbrrecs==0, index=False, mode="a")) nbrrecs+=df.shape[0] print("Retrieved records {}-{}".format((trkr * CHUNKSIZE) + 1, nbrrecs)) trkr+=1 t_tot = time.time() - t_i retrieval_rate = nbrrecs / t_tot print( f"Retrieved {nbrrecs} records in {t_tot:.0f} seconds ({retrieval_rate:.0f} recs/sec.)." )
Executing the code above produces the following output:
```
Retrieved records 1-100000
Retrieved records 100001-200000
Retrieved records 200001-300000
Retrieved records 300001-400000
Retrieved records 400001-500000
Retrieved records 500001-600000
Retrieved records 600001-678013
Retrieved 678013 records in 20 seconds (33370 recs/sec.).
```
Exporting Data
In order to export a DataFrame into a database, we leverage the DataFrame's `to_sql` method. We provide the name of the table we wish to upload data into, along with a connection object and what action to take if the table already exists. `if_exists` can be one of:
- “fail”: Raise a `ValueError`.
- “replace”: Drop the table before inserting new values.
- “append”: Insert new values to the existing table.
As a simple transformation, we determine aggregate EXPOSURE by AREA, append a timestamp, then export the result as “SAMPLE_AREA_SUMM”. If the table already exists, we want the export to fail:
""" Summary of aggregate EXPOSURE by AREA based on the French Motor Third-Party Liability Claims sample dataset. """ import datetime # Compute aggregate EXPOSURE by AREA. dfsumm = df.groupby("AREA", as_index=False)["EXPOSURE"].sum() # Append timestamp. timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") dfsumm["TIMESTAMP"] = timestamp # Export results. dfsumm.to_sql("SAMPLE_AREA_SUMM", con=conn, if_exists="fail")
If the table already exists, an error like the following will be generated:
```
ValueError: Table 'SAMPLE_AREA_SUMM' already exists.
```
Otherwise, no output will be generated.
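If stopping on an existing table isn't the desired behavior, one option (shown here as a sketch, not the only approach) is to catch the `ValueError` and append to the existing table instead:

```python
# Sketch: fall back to appending when SAMPLE_AREA_SUMM already exists.
try:
    dfsumm.to_sql("SAMPLE_AREA_SUMM", con=conn, if_exists="fail")
except ValueError:
    dfsumm.to_sql("SAMPLE_AREA_SUMM", con=conn, if_exists="append")
```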
Writing Queried Data to Compressed Format
Next we demonstrate how data can be queried iteratively and written directly to a compressed file format. This is especially useful when working with very large datasets, or when the data exceeds available system resources. Another reason to save datasets in compressed format is that Pandas can read compressed files just as easily as uncompressed CSVs. Once read into memory, the dataset will expand to its full uncompressed size, but by writing data to a compressed format we reduce our overall storage footprint. Here's the code to do it:
```python
import gzip
import time

import pandas as pd
import sqlalchemy

DRIVER = "SQL Server"
SERVER = "SERVER"
DATABASE = "DATABASE"
CHUNKSIZE = 100000
DATA_PATH = "COMPRESSED-SAMPLE-TABLE.csv.gz"

# Create connection uri.
conn_uri = f"mssql+pyodbc://{SERVER}/{DATABASE}?driver={DRIVER}".replace(" ", "+")

# Initialize connection.
conn = sqlalchemy.create_engine(conn_uri)

SQL = "SELECT * FROM SAMPLE_TABLE"
dfiter = pd.read_sql(SQL, con=conn, chunksize=CHUNKSIZE)

t_i = time.time()
trkr, nbrrecs = 0, 0

with gzip.open(DATA_PATH, "wb") as fgz:
    for df in dfiter:
        # Write the header row for the first chunk only.
        fgz.write(df.to_csv(header=nbrrecs == 0, index=False).encode("utf-8"))
        nbrrecs += df.shape[0]
        print("Retrieved records {}-{}".format((trkr * CHUNKSIZE) + 1, nbrrecs))
        trkr += 1

t_tot = time.time() - t_i
retrieval_rate = nbrrecs / t_tot

print(
    "Retrieved {} records in {:.0f} seconds ({:.0f} recs/sec.).".format(
        nbrrecs, t_tot, retrieval_rate
    )
)
```
The only expression requiring explanation is within `df.to_csv`, where `header=nbrrecs==0` is specified. This ensures that headers are written for the first batch of records only, and ignored for subsequent batches (100,000-record chunks are read in at each iteration).
To read the compressed file back into Pandas, use the `pd.read_csv` function, specifying the compression type (in this example we used “gzip”; other options are “zip”, “bz2” or “xz”):
```
In [1]: df = pd.read_csv(DATA_PATH, compression="gzip")

In [2]: df.shape
Out[2]: (678013, 12)
```
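Because `read_csv` defaults to `compression="infer"`, recent Pandas versions will also deduce the compression scheme from the file extension, so the argument can typically be omitted. The same idea works in reverse for a one-shot (non-chunked) export, where `to_csv` can write the compressed file directly. A small sketch (the copy filename is purely illustrative):

```python
# compression defaults to "infer", so the .gz extension is enough.
df = pd.read_csv("COMPRESSED-SAMPLE-TABLE.csv.gz")

# For a one-shot export, to_csv can compress on the way out.
df.to_csv("COMPRESSED-SAMPLE-TABLE-COPY.csv.gz", index=False, compression="gzip")
```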