Creating a Python Package with Poetry for Beginners Part 3
Intro
This is the third part of a blog series. In the previous posts we have addressed: creating a package with Poetry, managing our development environment and adding a function in part one; and package documentation, testing and how to publish to PyPI in part two.
In those previous posts, I
developed a function for summarising the successes (and failures) of the teams in a fantasy football league. That function makes various API calls which in theory could all be made in parallel to speed up
the runtime.
In this blog I aim to parallelise the function get_season_league which I wrote in the
first blog.
Starting Function
Here is the function written in part one:
import requests
import pandas as pd
import json

def get_season_league(league_id="485842"):
    api_url = "https://fantasy.premierleague.com/api/"
    url = api_url + "leagues-classic/" + league_id + "/standings/"
    response = requests.get(url)
    data = json.loads(response.text)
    league = pd.DataFrame(data['standings']['results'])
    df = pd.DataFrame([])
    for index, row in league.iterrows():
        player_query = api_url + "entry/" + str(row['entry']) + "/history"
        player_response = requests.get(player_query)
        player_data = json.loads(player_response.text)
        player_df = pd.DataFrame({
            'name': row['player_name'],
            'team_name': row['entry_name'],
            'event': pd.json_normalize(player_data['current'])['event'],
            'points': pd.json_normalize(player_data['current'])['total_points']
        })
        df = pd.concat([df, player_df])
    return df
The logic is as follows:
- Query the API to get current league data
- Loop over each member of the league:
  - Query the API for the individual player
- Return the relevant data
As currently written, the loop behaves like any normal for loop: the current iteration must finish before the next one starts. But with this code there is no reason to wait for the previous API call to complete, as the player queries don’t depend on one another. In theory we could run all of the individual player queries at once and the function would be a lot faster.
Measuring function calls in Python
We can measure how long a piece of Python code takes to run using the time package. For example, measuring my get_season_league function:
import time
from get_league import get_season_league
start_time = time.time()
x = get_season_league()
print("--- %s seconds ---" % (time.time() - start_time))
My function was taking ~3.5 seconds for the default league, which has 13 players and 11 completed game weeks. That is an average of 0.27 seconds per player (including the single original API call).
I also tested it for a larger league of 50 people, and it seems to take ~13 seconds, but with more variance. This is a similar 0.26 seconds per player.
This is why I want to parallelise the function: if the non-dependent API calls could all be made at once, or at least several at a time, it could be sped up massively. For example, for the league of 50, taking the time per player as 0.26 seconds, running two processes at once could bring it to ~6.5 seconds, and 4 processes to ~3.25 seconds. These values are approximate, but hopefully you can see the value of splitting up the parallelisable parts of the workload.
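To make that back-of-the-envelope arithmetic concrete, here is a tiny sketch using the measured figures above:

# Rough estimate: the sequential per-player cost split evenly across workers
per_player_seconds = 0.26  # measured average from the league of 50
n_players = 50

for workers in (1, 2, 4, 10):
    estimate = per_player_seconds * n_players / workers
    print(f"{workers} worker(s): ~{estimate:.2f} seconds")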
Optimising the Function
Before starting on the asynchronous side, there are a few things we can address first.
iterrows() Alternative
The iterrows() function is pretty inefficient for this use case (and generally as well). This blog explains it well, and why there are better alternatives like itertuples; a quick example of that is shown after the snippet below. However, I am just going to loop over a zip of the values I need.
# Old:
for index, row in league.iterrows():
    player_id = row['entry']
    player_name = row['player_name']
    team_name = row['entry_name']

# New:
for player_id, player_name, team_name in zip(
    league['entry'], league['player_name'], league['entry_name']
):
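For comparison, the itertuples alternative mentioned above would look something like this (a minimal sketch; the attribute names come from the DataFrame columns):

# itertuples yields lightweight namedtuples, avoiding iterrows' per-row Series overhead
for row in league.itertuples(index=False):
    player_id = row.entry
    player_name = row.player_name
    team_name = row.entry_name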
Concatenating DataFrames
Another area where I could improve the function is switching away from concatenating DataFrames within the for loop, towards either concatenating once at the end or creating a list of dictionaries and converting it to a DataFrame at the end.
The reason for this is the way pandas handles DataFrame memory allocation; there is more detail in this Saturn Cloud blog.
# Old:
df = pd.DataFrame([])
for index, row in league.iterrows():
    player_query = api_url + "entry/" + str(row['entry']) + "/history"
    player_response = requests.get(player_query)
    player_data = json.loads(player_response.text)
    player_df = pd.DataFrame({
        'name': row['player_name'],
        'team_name': row['entry_name'],
        'event': pd.json_normalize(player_data['current'])['event'],
        'points': pd.json_normalize(player_data['current'])['total_points']
    })
    df = pd.concat([df, player_df])
return df

# New:
list_to_df = []
for player_id, player_name, team_name in zip(
    league["entry"], league["player_name"], league["entry_name"]
):
    player_query = api_url + "entry/" + str(player_id) + "/history"
    player_response = requests.get(player_query)
    player_data = json.loads(player_response.text)
    player_df = pd.DataFrame({
        'name': player_name,
        'team_name': team_name,
        'event': pd.json_normalize(player_data['current'])['event'],
        'points': pd.json_normalize(player_data['current'])['total_points']
    })
    list_to_df.append(player_df)
df = pd.concat(list_to_df, ignore_index=True)
return df
These changes do seem to have sped up the function by a few seconds (for the league of 50), but the bulk of the time is taken by the API queries, so these best practices aren’t going to speed it up too much. They are worth implementing nevertheless.
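The other option mentioned above, building a list of dictionaries and converting once at the end, would look roughly like this (a sketch that slots into the same function, flattening each game week into one row):

# Collect plain dicts in the loop, then build the DataFrame once at the end
rows = []
for player_id, player_name, team_name in zip(
    league["entry"], league["player_name"], league["entry_name"]
):
    player_response = requests.get(api_url + "entry/" + str(player_id) + "/history")
    player_data = json.loads(player_response.text)
    # 'current' is a list of per-game-week dicts in the API response
    for gw in player_data['current']:
        rows.append({
            'name': player_name,
            'team_name': team_name,
            'event': gw['event'],
            'points': gw['total_points'],
        })
df = pd.DataFrame(rows)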
Asynchronising the Code
Before I start on this section I will give a brief background on asynchronous programming, but if you want more detail please read this blog.
There are two main routes we can go down here:
- concurrent.futures.ThreadPoolExecutor will use multiple threads, so the code in each thread is technically still synchronous; the threads simply run at the same time. This will be easier to implement with the current code, but the time gains won’t scale as far as the alternative, and each worker thread adds some resource overhead.
- asyncio will use single-threaded multi-tasking: truly asynchronous code. The syntax is more complex and doesn’t integrate very well with my current function; for example, I would need to replace requests with aiohttp. This would definitely be the better option if I was making lots of API calls, but on a smaller scale the gains wouldn’t be as significant. A brief sketch of this route follows below.
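To give a flavour of the asyncio route, here is a minimal sketch of what the per-player fetching could look like with aiohttp (not the approach I’ll take, and the aiohttp dependency is assumed to be installed):

import asyncio
import aiohttp

async def fetch_player(session, api_url, player_id):
    # Each await hands control back to the event loop, so other requests can progress
    async with session.get(api_url + "entry/" + str(player_id) + "/history") as response:
        return await response.json()

async def fetch_all(api_url, player_ids):
    # One shared session; gather schedules every request concurrently
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_player(session, api_url, pid) for pid in player_ids]
        return await asyncio.gather(*tasks)

# Example usage (player IDs are placeholders):
# results = asyncio.run(fetch_all("https://fantasy.premierleague.com/api/", [123456, 234567]))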
concurrent.futures.ThreadPoolExecutor
For this blog I will be going with concurrent.futures.ThreadPoolExecutor, as it integrates nicely with my existing code and the bigger gains from asyncio won’t really suit my use case.
The first thing I need to do (which could’ve been done earlier) is extract the per-player logic into a separate function. This function will take a player’s details, then use the player ID to query the API and grab the player’s season data. It will then neatly return it as a DataFrame.
def get_player_data(player_info, api_url):
    """Fetch data for a single player and return as DataFrame"""
    player_id = player_info['entry']
    player_name = player_info['player_name']
    team_name = player_info['entry_name']
    player_query = api_url + "entry/" + str(player_id) + "/history"
    player_response = requests.get(player_query)
    player_data = json.loads(player_response.text)
    # Create DataFrame for this player
    player_df = pd.DataFrame({
        'name': player_name,
        'team_name': team_name,
        'event': pd.json_normalize(player_data['current'])['event'],
        'points': pd.json_normalize(player_data['current'])['total_points']
    })
    return player_df
I will also need to adapt how I iterate over the player data. I know I’ve already switched from iterrows to a for loop over a zip of the relevant data, but the new function will use a different method of iteration. So I am creating a list of ‘records’ dictionaries of the relevant data, which I can then pass directly to my new get_player_data function.
players = league[['entry', 'player_name', 'entry_name']].to_dict('records')
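For illustration, to_dict('records') produces one dictionary per row, so players ends up shaped like this (the values here are made up, not real league data):

# Hypothetical example of the records structure
players = [
    {'entry': 123456, 'player_name': 'Jane Smith', 'entry_name': 'Jane FC'},
    {'entry': 234567, 'player_name': 'John Doe', 'entry_name': 'Doe United'},
]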
Next comes the ThreadPoolExecutor; this is what allows us to run multiple API calls at once. It lets us create worker threads and send work to them. I will first initialise an empty list to write my player DataFrames to. Then I’ll use ThreadPoolExecutor(max_workers=10) to create 10 workers that we can send work to (I am using 10 as an example; this will be an argument the user can change in the final function). executor is the object used to send work to the new threads, and I can use executor.map to map get_player_data over the players list and save the output to our initialised list. Helpfully, executor.map returns results in the same order as its inputs.
from concurrent.futures import ThreadPoolExecutor

def get_season_league(league_id="485842"):
    # ...
    player_dfs = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = executor.map(get_player_data, players)
        player_dfs = list(results)
Finally, we use the change mentioned above of a single pd.concat, so we only run it once rather than n times.
df = pd.concat(player_dfs, ignore_index=True)
So our final function will look like this (imports included for completeness), with get_player_data defined inside get_season_league so that api_url is available:
import requests
import pandas as pd
import json
from concurrent.futures import ThreadPoolExecutor

def get_season_league(league_id="485842", max_workers=10):
    api_url = "https://fantasy.premierleague.com/api/"
    url = api_url + "leagues-classic/" + league_id + "/standings/"
    response = requests.get(url)
    data = json.loads(response.text)
    league = pd.DataFrame(data['standings']['results'])

    def get_player_data(player_info):
        """Fetch data for a single player and return as DataFrame"""
        player_id = player_info['entry']
        player_name = player_info['player_name']
        team_name = player_info['entry_name']
        player_query = api_url + "entry/" + str(player_id) + "/history"
        player_response = requests.get(player_query)
        player_data = json.loads(player_response.text)
        # Create DataFrame for this player
        player_df = pd.DataFrame({
            'name': player_name,
            'team_name': team_name,
            'event': pd.json_normalize(player_data['current'])['event'],
            'points': pd.json_normalize(player_data['current'])['total_points']
        })
        return player_df

    players = league[['entry', 'player_name', 'entry_name']].to_dict('records')

    player_dfs = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = executor.map(get_player_data, players)
        player_dfs = list(results)

    df = pd.concat(player_dfs, ignore_index=True)
    return df
When I run the function on the league of 50, it now takes ~1.5 seconds rather than the original ~13 seconds.
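If you want to see how the worker count affects the runtime, here is a quick sketch reusing the timing approach from earlier (actual numbers will vary with network conditions):

import time
from get_league import get_season_league

# Compare runtimes for a few pool sizes
for workers in (1, 2, 4, 10):
    start = time.perf_counter()
    get_season_league(max_workers=workers)
    print(f"{workers} workers: {time.perf_counter() - start:.2f} seconds")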
Summary
So we’ve optimised the function to a good degree, using a few adjustments to the original function and then multiple threads to run API calls at the same time. There are still some things left on the table, like using asyncio instead, or even executor.submit() to have more control over the individual player queries (handling errors etc.; a brief sketch of that pattern is below). So perhaps in a future blog we will look at speeding the function up a little bit more.
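For a taste of the executor.submit route, here is a minimal sketch (assuming the same get_player_data and players as above). submit returns a Future per task, which lets us handle each result, and any error, individually:

from concurrent.futures import ThreadPoolExecutor, as_completed

player_dfs = []
with ThreadPoolExecutor(max_workers=10) as executor:
    # One Future per player, so each query can succeed or fail independently
    futures = {executor.submit(get_player_data, p): p for p in players}
    for future in as_completed(futures):
        player = futures[future]
        try:
            player_dfs.append(future.result())
        except Exception as exc:
            print(f"Query failed for {player['player_name']}: {exc}")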