Have you ever wanted to track the prices of products on an ecommerce platform but found that no price tracker existed for that specific platform? In this article, I’ll share with you how I built a Price Tracker app for Moroccan ecommerce platforms and hosted it on AWS for free. This simple end-to-end data engineering project includes some UX elements and will teach you about web scraping and how to use some of AWS’s services. You can try the app using this link.
The Context & The Plan:
The main value of a price tracker is that it shows you a product’s price history, so that you can base your purchasing decision on data, among other criteria, and minimize the effect of FOMO and of discounts that are, in some cases, just a form of marketing.
Price trackers exist for all major international ecommerce websites, such as Amazon, eBay, and Alibaba, but they don’t exist for ecommerce platforms in Morocco. The goal of this project is to create a simple price tracker and host it for free. To achieve the latter, I chose the AWS Free Tier, which offers quite generous cloud resources, just enough to bootstrap this kind of project if used efficiently. Learn more about the AWS Free Tier here.
Technical Architecture:
The following picture summarizes the architecture. I chose to spread the workload across a variety of AWS components, which makes more efficient use of the free resources:
The architecture is split into two main sections:
Scraping/ETL: This section is responsible for periodically getting the data, transforming it, and loading it into a Postgres database. I made use of Airflow for scheduling and coordinating tasks written in Python and S3 for an extra backup of the data.
Data Delivery/UX: In classic web app fashion, we have a front-end UI written in JavaScript that calls a REST API. In this case, the API is powered by AWS Lambda, which interacts with our Postgres database in RDS. Making efficient use of resources like this is what made it possible to host the project for free.
Data Model:
The main table, called “Prices,” holds historical price data. We also maintain details about the tracked products in the “products” table, and keep analytics and recommendation data in tables such as “prod_ranking” and “KPI.”
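To make the relationship between the two core tables more concrete, here is a minimal sketch. The column names and types are my assumptions, loosely inferred from the fields the scraper collects, and not the project’s actual schema. It also uses Python’s built-in sqlite3 with an in-memory database instead of Postgres, purely so that it runs anywhere.

```python
import sqlite3

# Hypothetical schema: column names are guesses based on the scraped fields.
SCHEMA = """
CREATE TABLE products (
    id       TEXT PRIMARY KEY,
    name     TEXT NOT NULL,
    category TEXT,
    brand    TEXT,
    href     TEXT
);
CREATE TABLE prices (
    product_id TEXT NOT NULL REFERENCES products(id),
    price      REAL NOT NULL,
    scraped_at TEXT NOT NULL
);
"""


def build_demo_db():
    """Create an in-memory database with one made-up product and three price points."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.execute(
        "INSERT INTO products VALUES (?, ?, ?, ?, ?)",
        ("P1", "Demo phone", "Phones", "Acme", "/demo-phone.html"),
    )
    conn.executemany(
        "INSERT INTO prices VALUES (?, ?, ?)",
        [("P1", 1000.0, "2023-01-01"), ("P1", 900.0, "2023-01-02"), ("P1", 800.0, "2023-01-03")],
    )
    conn.commit()
    return conn


def price_history(conn, product_id):
    """Return (date, price) rows for one product, oldest first."""
    rows = conn.execute(
        "SELECT scraped_at, price FROM prices WHERE product_id = ? ORDER BY scraped_at",
        (product_id,),
    )
    return rows.fetchall()
```

One row per product per scrape in the price table is what lets the UI draw a price history chart from a single query.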
Generating the best deals:
To generate the best deals, we calculate the average price of a particular product and compare it to its actual price today, getting the percent difference. For example, in the picture below:
The product is down 27.62% from its average price. To further enhance recommendations, we prioritize popular products with the highest number of reviews by category.
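The calculation itself is short. Below is a minimal sketch in plain Python, assuming that the average is taken over all recorded prices, including today’s. The function names and the sample numbers are made up for illustration, and the prioritization by review count is left out.

```python
from statistics import mean


def percent_vs_average(prices):
    """Return how far the latest price is from the average, in percent.

    `prices` is a chronological list of recorded prices; the last item is
    today's price. A negative result means the product is cheaper than usual.
    """
    if not prices:
        raise ValueError("at least one price is required")
    average = mean(prices)
    if average == 0:
        raise ValueError("average price is zero")
    return round((prices[-1] - average) / average * 100, 2)


def best_deals(history_by_product, top_n=3):
    """Rank products by the biggest drop below their own average price."""
    scored = [(pid, percent_vs_average(p)) for pid, p in history_by_product.items()]
    return sorted(scored, key=lambda item: item[1])[:top_n]
```

For example, a product recorded at 120, 100, and then 80 has an average of 100, so `percent_vs_average([120, 100, 80])` returns `-20.0`: today’s price is 20% below the average.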
Deep Dive Into Scraping:
The first step is to get URLs of the categories, as shown below:
Now each category has multiple pages, and we use the page number as a variable to navigate and grab products on each page.
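As a small illustration of that pagination logic, the sketch below reads the page count from a “last page” link and builds one URL per page. The example link format and the category URL are assumptions for illustration, as is the choice to number pages from 1.

```python
from urllib.parse import parse_qs, urlparse


def last_page_number(last_page_href, default=1):
    """Extract the page number from a 'last page' link.

    Example input (hypothetical): '/telephones/?page=50#catalog-listing'.
    Falls back to `default` when the link carries no usable page number.
    """
    query = parse_qs(urlparse(last_page_href).query)
    try:
        return int(query["page"][0])
    except (KeyError, ValueError):
        return default


def page_urls(category_url, last_page):
    """Build one URL per results page, numbering pages from 1."""
    return [f"{category_url}?page={page}" for page in range(1, last_page + 1)]
```

Calling `page_urls("https://www.jumia.ma/telephones/", 3)` yields three URLs ending in `?page=1`, `?page=2`, and `?page=3`, each of which can then be scraped independently.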
Below is the full scraping code. It uses Python’s requests library to fetch pages, BeautifulSoup to parse the HTML, and tqdm’s thread_map helper to process pages in parallel threads with a progress bar, which speeds up the task. To learn more about scraping, I’d recommend my article or similar content.
# coding=utf-8
from datetime import datetime

import pandas as pd
import requests
from bs4 import BeautifulSoup
from tqdm import tqdm
from tqdm.contrib.concurrent import thread_map

# One output file per day
outfile = '/opt/airflow/dags/jumia_data' + datetime.today().strftime('%Y-%m-%d') + '.csv'

# Column order of the output CSV
COLUMNS = ['reviewcount', 'img_url', 'id', 'href', 'name', 'category', 'brand', 'price', 'stars',
           'discount', 'boutiqueOfficielle', 'etranger', 'fastDelivery', 'timestamp']


def text_or(article, css_class, default=None):
    """Return the text of the first element with the given class, or a default."""
    tag = article.find(class_=css_class)
    return tag.text if tag else default


def process_article(article):
    """Extract one product card into a flat dict."""
    link = article.find('a')  # most product attributes live on the card's first link
    return {
        'reviewcount': text_or(article, 'rev'),  # fixed: the original tested for the stars element here
        'img_url': article.find(class_='img').get('data-src'),
        'id': link.get('data-id'),
        'href': link.get('href'),
        'name': link.get('data-name'),
        'category': link.get('data-category'),
        'brand': link.get('data-brand'),
        'price': text_or(article, 'prc'),
        'stars': text_or(article, 'stars _s'),
        'discount': text_or(article, 'bdg _dsct _sm', default=False),
        'boutiqueOfficielle': article.find(class_='bdg _mall _xs') is not None,
        'etranger': article.find(class_='bdg _glb _xs') is not None,
        'fastDelivery': article.find(class_='shipp') is not None,
        'timestamp': datetime.today().strftime('%Y-%m-%d %H:%M:%S'),
    }


def process_page(url):
    """Scrape every product card on one results page and append the rows to the CSV."""
    page = requests.get(url)
    soup = BeautifulSoup(page.text, 'html.parser')
    page_data = [process_article(article)
                 for article in soup.find_all(name='article', attrs={'class': 'prd _fb col c-prd'})]
    pd.DataFrame(page_data, columns=COLUMNS).to_csv(
        outfile, mode='a', index=False, header=False, encoding="utf-8")


def process_sub_category(url):
    """Find how many pages a sub-category has, then scrape them in parallel."""
    page = requests.get(url)
    soup = BeautifulSoup(page.text, 'html.parser')
    try:
        last_page = soup.find(name='a', attrs={'class': 'pg', 'aria-label': 'Dernière page'})
        num_pages = int(last_page.get('href').split("page=")[1].split("#")[0])
    except (AttributeError, IndexError, ValueError):
        # No usable "last page" link: treat the sub-category as a single page
        num_pages = 1
    # Start at page 1 (the original started at 0, which requested one extra page)
    urls = [url + '?page=' + str(i) for i in range(1, num_pages + 1)]
    thread_map(process_page, urls, max_workers=32)


def start_scrape():
    """Entry point: write the CSV header, then scrape every sub-category."""
    subCategories = pd.read_csv('/opt/airflow/dags/subCategoriesHrefs.csv')
    # Strip the shipped_from filter from the category URLs
    subCategories.href = subCategories.href.apply(lambda s: str(s).split("?shipped_from=country_local")[0])
    pd.DataFrame(columns=COLUMNS).to_csv(outfile, index=False)
    for subCategory in tqdm(subCategories.href):
        process_sub_category(subCategory)


if __name__ == "__main__":
    start_scrape()
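The speed-up comes from running many page requests in parallel threads: thread_map is essentially a thread pool’s map with a progress bar on top. The sketch below shows the same pattern with only the standard library. The `fake_fetch` function is a stand-in that makes no network calls, and the lock around the shared output is my own precaution for cases where several threads write to the same place, not something taken from the project.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def fake_fetch(url):
    """Stand-in for a real page scrape: no network access, just echoes the URL."""
    return {"url": url, "length": len(url)}


def scrape_all(urls, fetch, max_workers=8):
    """Run `fetch` over all URLs in a thread pool and collect the rows."""
    rows = []
    lock = threading.Lock()

    def worker(url):
        row = fetch(url)
        with lock:  # only one thread at a time touches the shared output
            rows.append(row)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Consuming the iterator re-raises any exception from a worker
        list(pool.map(worker, urls))
    return rows
```

Threads suit this workload because each worker spends most of its time waiting on the network. Note that the rows come back in completion order, not in the order of the input URLs.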
Airflow: A Powerful Task Scheduling Platform:
Airflow is a robust platform that enables users to create and run workflows using Directed Acyclic Graphs (DAGs) and tasks with dependencies and data flows taken into account. With Airflow, users can specify the order of execution and run retries as well as describe what to do with each task, such as fetching data, running analysis, triggering other systems, and more.
One of the most significant advantages of using Airflow is its user-friendly graphical interface, which allows you to track the progress of your tasks in real-time, while also providing built-in retry on failure and integration with most popular databases. Moreover, it stores the execution times and logs, making it incredibly useful for debugging.
To learn more about Airflow, check out the official documentation, which is the best place to get started.
Below is the DAG used in the project, along with the main Python code used to generate it:
import sys
from datetime import timedelta

import airflow
from airflow import DAG
# Legacy import path; in Airflow 2+ the module is airflow.operators.python
from airflow.operators.python_operator import PythonOperator

# Make the task scripts importable from the DAG file
sys.path.insert(1, '/opt/airflow/dags/scripts')
from scrapeJumia import *
from updateProducts import *
from updatePrices import *
from updateProdRanking import *
from updateKpi import *
from uploadS3 import *

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['youremail@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,  # retry a failed task once...
    'retry_delay': timedelta(minutes=1)  # ...after waiting one minute
}

dag_python = DAG(
    dag_id="jumia_python",
    default_args=default_args,
    description='Dag that scrapes from jumia and updates a postgres database in RDS',
    schedule_interval='10 0 * * *',  # every day at 00:10
    catchup=False
)

scrape_jumia = PythonOperator(task_id='scrape_jumia', python_callable=start_scrape, dag=dag_python)
update_products = PythonOperator(task_id='update_products', python_callable=start_update_products, dag=dag_python)
update_prices = PythonOperator(task_id='update_prices', python_callable=start_update_prices, dag=dag_python)
update_prod_ranking = PythonOperator(task_id='update_prod_ranking', python_callable=start_update_prod_ranking, dag=dag_python)
update_kpi = PythonOperator(task_id='update_kpi', python_callable=start_update_kpi, dag=dag_python)
upload_s3 = PythonOperator(task_id='upload_s3', python_callable=start_upload_s3, dag=dag_python)

# Tasks run strictly one after the other, in this order
scrape_jumia >> upload_s3 >> update_products >> update_prices >> update_prod_ranking >> update_kpi
AWS Lambda: A Serverless Backend Solution:
Lambda functions are incredibly flexible and can be used for a wide range of applications. In this project, they serve as a REST API to offload work from the main EC2 server. It’s easy to get started with Lambda: simply choose your preferred language and start a function from scratch, from a container image, or from one of the blueprints AWS provides.
Once you’ve created your function, you’ll need to set it up for your use case. In my experience, this includes setting up “layers,” which allow your function to use external libraries such as pandas and sqlalchemy. You’ll also need to set up the REST API to call the function from the web, enabling CORS (Cross-Origin Resource Sharing) to allow calls from your browser. The AWS documentation does an excellent job of explaining this.
After setting up your Lambda function, you’ll have a function with layers and an API gateway:
To enable your function to communicate with your RDS database, you’ll need to attach it to the same VPC and subnets as your RDS instance, then create a “security group” that allows connections on the Postgres port (5432) and assign it to the function:
Here’s an example of a function that gets product details given a product ID or URL:
import json
from urllib.parse import urlparse

import pandas as pd
from sqlalchemy import create_engine, text

# Created once per container so that warm invocations can reuse it.
# The credentials are placeholders; avoid hard-coding real ones.
engine = create_engine('postgresql://postgres:password!@host:5432/database')


def lambda_handler(event, context):
    body = json.loads(event['body'])
    if 'prod_id' in body:
        # Look the product up by its ID
        sql = text("SELECT * FROM products WHERE id = :value")
        value = str(body['prod_id'])
    else:
        # Look it up by URL: keep only the path, so that both https://www.jumia.ma/...
        # and https://jumia.ma/... match, and drop the query string
        sql = text("SELECT * FROM products WHERE href = :value")
        value = urlparse(body['href']).path
    # Bound parameters instead of string concatenation prevent SQL injection
    product = pd.read_sql(sql, con=engine, params={'value': value})
    records = product.to_dict(orient='records')
    return {
        'headers': {
            'Content-Type': 'application/json',
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Headers': 'Authorization,Content-Type',
            'Access-Control-Allow-Methods': 'GET,POST,OPTIONS',  # fixed: header name is plural
        },
        # Return 404 instead of crashing when no product matches
        'statusCode': 200 if records else 404,
        'body': json.dumps(records[0] if records else {'error': 'product not found'})
    }
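The URL lookup only works if whatever the user pastes is reduced to the same form that is stored in the database. Here is a minimal, standalone sketch of that normalization, assuming hrefs are stored as site-relative paths (which is what the handler’s URL handling suggests). The function name and the sample URLs are hypothetical.

```python
from urllib.parse import urlparse


def normalize_href(url):
    """Reduce a product URL to its site-relative path.

    Drops the scheme, the host (with or without 'www'), the query string,
    and any fragment. Also accepts URLs pasted without a scheme.
    """
    url = url.strip()
    if "://" not in url and not url.startswith("/"):
        # urlparse only recognizes the host when a scheme is present
        url = "https://" + url
    return urlparse(url).path
```

With this helper, `https://www.jumia.ma/demo-phone-123.html?utm=x`, `https://jumia.ma/demo-phone-123.html`, and `www.jumia.ma/demo-phone-123.html#reviews` all map to `/demo-phone-123.html`.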
Conclusion:
Working on this project was an exciting and fulfilling experience, as it has real-world applications for the average person. AWS’s free tier offers a generous package, which makes it a good fit for prototyping. As long as you use it efficiently and stay within the limits, you can host many small projects like this one at no cost.
Thank you for reading this article. I hope you found it informative and learned something new. If you have any questions or would like to discuss further, feel free to reach out on LinkedIn.