A Real-World Scrapy Tutorial: A Python Lego Shop Restock Bot

Lately I noticed that many products are temporarily unavailable in the German Lego shop. The Lego shop's own product monitoring feature only lets you get notified about specific products; there is no way to be informed about all products that are back in stock. Therefore I decided to write my own scraping and monitoring tool that delivers these updates regularly, using Python and the Scrapy framework.

The tool crawls products from the Lego shop page and stores them in a database. It regularly (e.g. every 2 hours) updates the availability of the Lego shop products in the database and publishes notifications in a Telegram channel.

Since there are only a few tutorials online on how to set up a Scrapy project end-to-end, I will demonstrate all the needed steps in this guide using the real-world Lego shop example. I will focus on the methods rather than explaining every single line of code. For all the details, please refer to the Scrapy documentation or to the common questions asked on Stack Overflow.

This guide is split into six parts:

  1. Setup Python and the Scrapy Project
  2. Crawl Lego Products using a Scrapy Spider
  3. Save the Products to a Database
  4. Scrape the Availability of Lego Products
  5. Send Notifications using a Telegram Bot
  6. Deploying to a DigitalOcean Droplet

The full source code for this guide can be found on GitHub: chrizog/lego-restock-bot.

Setup Python and the Scrapy Project

As a first step, create a new folder on your machine, clone this guide's repository and set up a new Python virtual environment.

mkdir lego-project && cd lego-project
git clone https://github.com/chrizog/lego-restock-bot.git
python3 -m venv venv_lego

Activate the virtual environment and install all needed dependencies, including Scrapy, from requirements.txt.

source venv_lego/bin/activate
pip3 install -r requirements.txt

Your Python setup is done. In the given repository the Scrapy project has already been created with the scrapy startproject command. If you are starting a project from scratch, execute:

scrapy startproject lego

Your folder structure should look something like this now:

├── lego-project
│   ├── venv_lego
│   │   ├── bin
│   │   ├── include
│   │   ├── lib
│   │   └── pyvenv.cfg
│   └── lego-restock-bot
│       ├── README.md
│       ├── lego
│       │   ├── __init__.py
│       │   ├── database.py
│       │   ├── evaluate_availability.py
│       │   ├── items.py
│       │   ├── middlewares.py
│       │   ├── pipelines.py
│       │   ├── settings.py
│       │   ├── spiders
│       │   └── telegram_message.py
│       ├── requirements.txt
│       ├── scrapy.cfg
│       ├── test.py
│       └── update_availability.sh

Crawl Lego Products using a Scrapy Spider

Setup the first Spider

In the first step we want to find as many Lego products as possible from the Lego shop page and store them. For that purpose a new Scrapy spider is put in a new file called lego/spiders/product_spider.py.

A spider is a class that defines how a site is scraped. It parses responses and extracts data using CSS selectors or XPath expressions. Items are populated from the extracted data. It's also possible to extract links from the response and yield additional requests to follow (https://docs.scrapy.org/en/latest/topics/spiders.html).

In product_spider.py a new class LegoProductSpider is created that inherits from scrapy.Spider:

import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.loader import ItemLoader

from lego.items import LegoItem


class LegoProductSpider(scrapy.Spider):
    """A scrapy spider to search for Lego products in the Lego shop page"""

    name = "products"
    custom_settings = {
        "ITEM_PIPELINES": {
            "lego.pipelines.DuplicatesPipeline": 200,
            "lego.pipelines.LegoPipeline": 300,
        }
    }

    def __init__(self, name=None, **kwargs):
        super().__init__(name=name, **kwargs)
        self.link_extractor = LinkExtractor(
            allow=[r".*lego\.com/de-de.*"],
            deny=[r".*lego\.com(.*)(\.\w{1,3})$", r".*@lego\.com.*"],
        )

    def start_requests(self):
        urls = ["https://www.lego.com/de-de/themes"]
        for url in urls:
            yield scrapy.Request(url=url, callback=self.parse)

    def parse(self, response: scrapy.http.Response, **kwargs):
        page = response.url
        self.log(page)
        if "de-de/product" in page:
            if response.css(".eqJexe .hlipzx::text").get():
                self.log("############# Found product page: " + page + " #############")

                if not "Altes Produkt" in response.css(".ejRirH .hlipzx::text").get():
                    loader = ItemLoader(item=LegoItem(), response=response)
                    loader.add_css("name", ".eqJexe .hlipzx::text")
                    loader.add_css("price", ".eGdbAY::text")
                    loader.add_css(
                        "product_id",
                        ".ProductDetailsstyles__ProductID-sc-16lgx7x-10.bIKuiP::text",
                    )
                    loader.add_css("availability", ".ejRirH .hlipzx::text")
                    loader.add_value("url", response.url)
                    yield loader.load_item()

        for next_page in self.link_extractor.extract_links(response):
            yield response.follow(next_page, self.parse)

The name of the spider can simply be set as a class variable. It is used later to run the spider.

In the start_requests method, you define the start URLs and yield a scrapy Request for every one of them. In this case, we only start with https://www.lego.com/de-de/themes, which is the shop's product categories page. It's a good starting point because it leads directly to all the different products. However, you could also use the homepage or additional URLs.

The generated requests will trigger the parse method of the class. In the parse method, you extract information from the response and yield items or additional requests to follow. The logic is simple: if we are on a Lego product page, we extract the information about the product and yield a new LegoItem. Additionally, all links on the page are extracted and followed, which leads to further requests being scheduled by the Scrapy framework.

Finding CSS selectors using Chrome and the SelectorGadget extension

We can easily check whether we are on a Lego product page by looking at the URL of the response: if it contains "de-de/product", we know we are on a product page. We can also check for the existence of CSS selectors that are specific to product pages.

One possibility for finding CSS selectors is to use the Chrome browser, open the developer tools (Option + ⌘ + J on macOS) and inspect the HTML of the elements you want to extract data from. This works, but it can be quite cumbersome.

A really helpful Chrome extension is SelectorGadget. With this extension you simply click on a page element for which you want a matching selector. SelectorGadget then generates a minimal CSS selector for that element in the box at the bottom right and highlights everything else that is matched by the selector.

In this screenshot I am generating the CSS selector for the product price in Euro. The CSS selector for the price is .eGdbAY which we will use to extract the string "399,99 €" from the page.

Using SelectorGadget to generate Scrapy CSS selectors

Testing CSS selectors using the Scrapy shell

After figuring out a CSS selector with SelectorGadget, we want to test whether the selector really works on the page. To avoid running the whole project just for debugging selectors, Scrapy provides the Scrapy shell:

"The Scrapy shell is an interactive shell where you can try and debug your scraping code very quickly, without having to run the spider." (taken from Scrapy shell)

Use the Scrapy shell for the Lego Hogwarts Castle product page and test the price CSS selector. Open up a terminal and run:

scrapy shell https://www.lego.com/de-de/product/hogwarts-castle-71043

In the interactive shell you have the response object available on which you can test the CSS selectors. With ::text you can additionally extract the text content of the selected element:

2021-10-14 14:09:14 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://www.lego.com/robots.txt> (referer: None)
2021-10-14 14:09:15 [scrapy.core.engine] DEBUG: Crawled (200) <GET https://www.lego.com/de-de/product/hogwarts-castle-71043> (referer: None)
[s] Available Scrapy objects:
[s]   scrapy     scrapy module (contains scrapy.Request, scrapy.Selector, etc)
[s]   crawler    <scrapy.crawler.Crawler object at 0x1062f3a30>
[s]   item       {}
[s]   request    <GET https://www.lego.com/de-de/product/hogwarts-castle-71043>
[s]   response   <200 https://www.lego.com/de-de/product/hogwarts-castle-71043>
[s]   settings   <scrapy.settings.Settings object at 0x1062f39d0>
[s]   spider     <DefaultSpider 'default' at 0x106e9a550>
[s] Useful shortcuts:
[s]   fetch(url[, redirect=True]) Fetch URL and update local objects (by default, redirects are followed)
[s]   fetch(req)                  Fetch a scrapy.Request and update local objects 
[s]   shelp()           Shell help (print this help)
[s]   view(response)    View response in a browser
>>> response.css(".eGdbAY::text").get()
'399,99\xa0€'
>>> 

Since we now know how to generate and test CSS selectors, we can do the same for the other selectors: product name, availability text and Lego item number.
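For example, still inside the same shell session, the other selectors used by the spider can be tested in exactly the same way (these are the selectors from LegoProductSpider; the returned output is omitted here):

>>> # product name
>>> response.css(".eqJexe .hlipzx::text").get()
>>> # availability text
>>> response.css(".ejRirH .hlipzx::text").get()
>>> # Lego item number
>>> response.css(".ProductDetailsstyles__ProductID-sc-16lgx7x-10.bIKuiP::text").get()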

Extracting Items

Inside the parse method, the CSS selectors are used to extract information and generate a Scrapy item. If you take a look at the Scrapy architecture, items are the objects generated by a spider and sent to the item pipelines. In the pipelines, the items are processed sequentially and, for example, stored in a database. The pipelines are explained later.

The architecture can be found under: Scrapy Architecture Overview

Items are simply Python objects defining key-value pairs. The key-value pairs are defined by declaring Fields inside an Item. The Field objects allow you to specify metadata, e.g. a serializer, or to add pre- and postprocessing to each Field. In this way you can keep the pre- and postprocessing out of your Spider class and reuse the processing code more easily in different Items.

For the Lego products, five fields shall be extracted:

  • name
  • price
  • product number
  • availability
  • shop URL

Inside lego/items.py a LegoItem class is declared that inherits from scrapy.Item. The item class has five fields, each with input and output processors. For example, the price is extracted by the spider as a string that looks like "19,99\xa0€". Later, the price shall be stored as an integer value in cents. For that reason you can implement a process_price function that is used as the input processor for the price field of the item. Such functions can be reused in other Items and Fields, and chaining different processors is also possible.

The full source can be seen here.

import scrapy
from itemloaders.processors import MapCompose, TakeFirst


def process_price(price_text: str) -> int:
    """Convert the incoming price string to an integer in cents
    Example:
        in: 19,99\xa0€
        out: 1999
    """
    try:
        splitted = price_text.split("\xa0")
        price = float(splitted[0].replace(",", "."))
        price = int(price * 100)
        return price
    except:  # pylint: disable=bare-except
        return 0
...

class LegoItem(scrapy.Item):
    """Model for a Lego scrapy Item

    Fields:
        - name
        - price: in cents
        - product_id: Lego product ID
        - availability: integer representing the availability status
        - url: url to lego product page
    """

    name = scrapy.Field(output_processor=TakeFirst())
    price = scrapy.Field(
        input_processor=MapCompose(process_price), output_processor=TakeFirst()
    )
    product_id = scrapy.Field(
        input_processor=MapCompose(str_to_int), output_processor=TakeFirst()
    )
    availability = scrapy.Field(
        input_processor=MapCompose(availability_str_to_int),
        output_processor=TakeFirst(),
    )
    url = scrapy.Field(output_processor=TakeFirst())

Inside the LegoProductSpider we can import the LegoItem class and populate LegoItem objects. A very convenient mechanism for this is Scrapy's ItemLoader API. You first instantiate an ItemLoader with an item and the response. Afterwards, you can collect the values into the item by using CSS selectors with the add_css method.

Following Links

In the previous sections, data was extracted from Lego product pages and put into items, which will later be processed and stored in item pipelines. However, link-following functionality is also needed to discover new product pages.

Links could be retrieved with CSS selectors like

response.css('a::attr(href)').getall()

but the issue is that you would also extract e-mail addresses, follow subdomains outside the shop, or follow links to the shops of other countries.

An easier way to extract links is Scrapy's LinkExtractor class. It allows you to define regular expressions for links that are allowed or denied. In this way you can deny pages of other countries, other domains and e-mail links, and only allow the German shop pages.

The LinkExtractor is instantiated in the spider and used with the extract_links function:

    def __init__(self, name=None, **kwargs):
        super().__init__(name=name, **kwargs)
        self.link_extractor = LinkExtractor(
            allow=[r".*lego\.com/de-de.*"],
            deny=[r".*lego\.com(.*)(\.\w{1,3})$", r".*@lego\.com.*"],
        )

    def parse(self, response: scrapy.http.Response, **kwargs):
        ...
        for next_page in self.link_extractor.extract_links(response):
            yield response.follow(next_page, self.parse)    

Save the Products to a Database

The next step is to save the crawled Lego product items to a database. For the Lego products, a SQLite database is used.

As shown in the Scrapy architecture above, items are sent from the spider to the item pipelines. Item pipelines receive the item in their process_item function and perform an action on it, e.g. storing it in a database or checking for duplicate items. You can also chain multiple pipelines, so that you first check for duplicates in one pipeline and store the item in the database in the subsequent pipeline.

Setup the database

For communicating with the SQLite database, SQLAlchemy is used. It's a Python SQL toolkit and Object Relational Mapper (ORM). No SQL statements are needed inside the Python code itself, and an object-oriented approach can be used to perform operations on the database.

In a file called lego/database.py two tables are defined:

  • Product
  • Availability

In the Product table, all crawled Lego products are stored with name, price, product number, etc. In the Availability table, each entry holds the current timestamp and the availability status of a Product, referencing the Product's primary key via a ForeignKey.
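A minimal sketch of how these two tables could be declared with SQLAlchemy's declarative API (the column names here are assumptions for illustration; see database.py in the repository for the actual definitions):

from datetime import datetime

from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Product(Base):
    """One row per crawled Lego product."""

    __tablename__ = "product"

    product_id = Column(Integer, primary_key=True)  # the Lego item number
    name = Column(String)
    price = Column(Integer)  # price in cents
    url = Column(String)


class Availability(Base):
    """One row per availability check of a product."""

    __tablename__ = "availability"

    id = Column(Integer, primary_key=True)
    timestamp = Column(DateTime, default=datetime.utcnow)  # defaults to the current time
    product_id = Column(Integer, ForeignKey("product.product_id"))
    availability = Column(Integer)  # integer-encoded availability status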

Furthermore, additional functions for database operations, such as adding and querying products, go into the database.py module. I highly recommend developing all of these functions with unit tests, as sketched below. That way, the database code can be prepared and debugged without running the whole Scrapy chain. The unit tests simply go into a test.py in the root of the repository.
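A hedged sketch of what such a test could look like, using an in-memory SQLite database so the real product database stays untouched. It assumes that add_product, does_product_exist and create_table take an SQLAlchemy engine as an argument, as they do in the pipelines shown further below:

import unittest

from sqlalchemy import create_engine

from lego.database import Product, add_product, create_table, does_product_exist


class TestDatabase(unittest.TestCase):
    def setUp(self):
        # A fresh in-memory database for every test
        self.engine = create_engine("sqlite:///:memory:")
        create_table(self.engine)

    def test_add_and_find_product(self):
        product = Product()
        product.product_id = 71043
        product.name = "Schloss Hogwarts"
        product.price = 39999  # 399,99 € in cents
        product.url = "https://www.lego.com/de-de/product/hogwarts-castle-71043"

        add_product(product, self.engine)
        self.assertTrue(does_product_exist(71043, self.engine))


if __name__ == "__main__":
    unittest.main()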

The SQLAlchemy connection string can be put into settings.py and retrieved with the Scrapy utility function get_project_settings:

from scrapy.utils.project import get_project_settings

get_project_settings().get("CONNECTION_STRING")
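The corresponding entry in settings.py is a single line. The path used here is an assumption that matches the database location mentioned further below:

# settings.py
CONNECTION_STRING = "sqlite:///lego/scrapy_products.db"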

For the full source code, see database.py.

Implement the Item pipeline

In lego/pipelines.py we create two classes: LegoPipeline and DuplicatesPipeline. By implementing a process_item method in each class, they can be used as Scrapy item pipelines. The process_item method is called for every item populated by the spider and must either return the item (so it is processed further) or drop it by raising a DropItem exception. If the item is dropped, it is no longer processed by subsequent pipelines.

In the process_item method of DuplicatesPipeline, it is checked whether the Lego product already exists in our database. If that's the case, the Lego product shall not be added to the database again and a DropItem exception is raised. If it does not exist in the database yet, the item is returned, which means it will be processed further.

In the process_item method of LegoPipeline, we populate a new Product object for SQLAlchemy and collect the values from the Scrapy item (LegoItem). Afterwards, the product is added to the database.

from scrapy.exceptions import DropItem

from lego.database import Product, add_product, create_table, db_connect, does_product_exist
from lego.items import LegoItem


class DuplicatesPipeline:
    """Pipeline for dropping duplicate lego products"""

    def __init__(self):
        """Create tables if they don't exist yet"""
        self.engine = db_connect()
        create_table(self.engine)

    def process_item(self, item: LegoItem, spider):
        """Pipeline process function.
        Drop items that already exist in the database
        """
        if does_product_exist(item["product_id"], self.engine):
            raise DropItem(f"Duplicate item found: {item['name']}")
        return item

class LegoPipeline:
    """Pipeline for adding new lego products to the database"""

    def __init__(self) -> None:
        """Create tables if they don't exist yet"""
        self.engine = db_connect()
        create_table(self.engine)

    def process_item(self, item: LegoItem, spider):
        """Pipeline process function.
        Save products in the database
        """
        product = Product()
        product.name = item["name"]
        product.price = item["price"]
        product.product_id = item["product_id"]
        product.url = item["url"]
        add_product(product, self.engine)
        return item

The pipelines need to be activated and put into an order. This can be done in settings.py as a global setting. However, later we will implement a second, different spider, and we will need different pipelines for different spiders. For that purpose you can specify custom settings inside a spider. In the LegoProductSpider it looks like this:

class LegoProductSpider(scrapy.Spider):
    """A scrapy spider to search for Lego products in the Lego shop page"""

    name = "products"
    custom_settings = {
        "ITEM_PIPELINES": {
            "lego.pipelines.DuplicatesPipeline": 200,
            "lego.pipelines.LegoPipeline": 300,
        }
    }
    ...

The integer value assigned to each pipeline defines the order in which the pipelines run, from lower values to higher values: the DuplicatesPipeline runs before the LegoPipeline.

Run the Spider and Pipelines

Finally, the whole processing chain can be started. To start scheduling the requests and processing and storing the items, the scrapy crawl command is used.

scrapy crawl products

products is the name declared in the LegoProductSpider class. After following some links, starting from the categories page, product pages are found, items are populated and added to the database, e.g.:

{'availability': 2,
 'name': 'Ferrari F8 Tributo',
 'price': 1998,
 'product_id': 76895,
 'url': 'https://www.lego.com/de-de/product/ferrari-f8-tributo-76895?page=3'}

Since the products do not change that often, it is sufficient to run this spider occasionally to search for new products.

Inspect the database

The SQLite database is created under lego/scrapy_products.db. For inspection the DB Browser for SQLite (https://sqlitebrowser.org) can be used:

Using DB Browser for SQLite to inspect the crawled products

Scrape the Availability of Lego Products

Spider and pipeline implementation

After crawling the Lego products, the next step is to regularly check their availability. For this, we create a second spider in lego/spiders/product_spider.py. Optionally, it can be put into a separate file in the spiders folder.

The AvailabilitySpider behaves slightly differently from the LegoProductSpider: it won't follow any links in the responses. Instead, it initially reads all product URLs from the database and uses them as start URLs. From the responses of these product pages the information is extracted in the same way as in the LegoProductSpider and LegoItems are populated. Afterwards, new entries are added to the Availability table in the database. Each entry consists of

  • the current timestamp
  • the Lego product id
  • the availability represented as an integer value.

Since no links are followed in the new spider, it runs quite fast even for around 1000 product pages.

The AvailabilitySpider looks like the following. The parse method is similar to the one in LegoProductSpider. In start_requests, all product URLs are queried from the existing database and requests are yielded for them.

class AvailabilitySpider(scrapy.Spider):
    """A scrapy spider to extract the availabilities of crawled Lego products"""

    name = "availability"
    custom_settings = {
        "ITEM_PIPELINES": {
            "lego.pipelines.AvailabilityPipeline": 200,
            "lego.pipelines.UpdatePricePipeline": 300,
        }
    }

    def __init__(self, name=None, **kwargs):
        super().__init__(name=name, **kwargs)

    def start_requests(self):
        urls = load_product_urls(db_connect())
        for url in urls:
            yield scrapy.Request(url=url, callback=self.parse)

    def parse(self, response: scrapy.http.Response, **kwargs):
        ...

In the AvailabilityPipeline's process_item method, it is checked whether a product with the given ID exists in the database. If it exists, a new entry is added to the Availability table. The timestamp does not need to be set, since the default value in the SQLite database is configured to be the current time.

class AvailabilityPipeline:
    """Pipeline for adding an availability entry for a product to the database"""

    def __init__(self) -> None:
        """Create tables if they don't exist yet"""
        self.engine = db_connect()
        create_table(self.engine)

    def process_item(self, item: LegoItem, spider):
        """Pipeline process function.
        Drops the item if product does not exist in the database.
        Adds an entry to the Availability table for the product
        """
        if not does_product_exist(item["product_id"], self.engine):
            raise DropItem(f"Product does not exist in database: {item['name']}")

        availability = Availability()
        availability.product_id = item["product_id"]
        availability.availability = item["availability"]
        add_availability(availability, self.engine)
        return item

Additionally an UpdatePricePipeline is implemented, which updates the price of existing products in the database, so the information is always up-to-date.
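A minimal sketch of what this pipeline could look like, assuming a database helper named update_product_price (the helper name is hypothetical; the actual implementation is in lego/pipelines.py and lego/database.py):

class UpdatePricePipeline:
    """Pipeline for updating the stored price of existing products (sketch)"""

    def __init__(self) -> None:
        """Create tables if they don't exist yet"""
        self.engine = db_connect()
        create_table(self.engine)

    def process_item(self, item: LegoItem, spider):
        if does_product_exist(item["product_id"], self.engine):
            # Hypothetical helper that writes the latest crawled price
            # for the given product to the database
            update_product_price(item["product_id"], item["price"], self.engine)
        return item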

Scraping the availability status can be run with:

scrapy crawl availability

Evaluation and notification script

Since all the information is now extracted with Scrapy and stored in the database, a Python script evaluate_availability.py is implemented that reads the data from the database and evaluates it. For each product, it checks whether the latest availability changed from "Not available" to "Available". In that case, a Telegram message is created and sent.

Here are the most important snippets of the script. Initially all products are loaded. For each product the Availability entries are queried (ascending by time) and the two newest entries are compared. If there's a change in the status, the message is sent:

if __name__ == "__main__":
    ...
    product_ids = load_product_ids(engine)

    for product_id in product_ids:
        availabilites = get_availabilities(product_id, engine)

        if len(availabilites) > 1:
            last_availability = availabilites[-1]["availability"]
            n_last_availability = availabilites[-2]["availability"]

            if not last_availability == n_last_availability:
                old_status = availability_int_to_str(n_last_availability)
                new_status = availability_int_to_str(last_availability)
                product_name = get_product_name(product_id, engine)
                
                ...
                telegram_bot.send_html_message_to_channel(message)

The full source of the script can be found here.

To receive regular updates on the Lego products, the Scrapy spider and the evaluation script need to be combined. I simply put them into one bash script for a more convenient interface. Later it will be scheduled on DigitalOcean using cron:

#!/bin/bash
VENV_PATH="<path to virtual env>"
SCRAPY_PROJECT_ROOT_DIR="<path to scrapy project>"
cd $SCRAPY_PROJECT_ROOT_DIR
source "$VENV_PATH/bin/activate"
scrapy crawl availability
python3 lego/evaluate_availability.py

Send Notifications using a Telegram Bot

This section summarizes the steps for implementing a simple notification bot with Telegram.

A Telegram bot in combination with a public channel is used. The bot needs to be configured as an administrator of the channel so it can publish messages there.

  1. Create a new Telegram bot: In Telegram, write "/newbot" to the BotFather. The BotFather will ask you for the name and username of your bot. Afterwards you will receive a token to access the HTTP API. Note down your token. For the Lego bot, it is put into a .env file so that it does not end up in the Python source code.
  2. Create a public Telegram channel: In the Telegram app, click on the Edit icon and create a new public channel. Afterwards edit the channel and make your own Telegram bot one of the administrators.
  3. Figure out the channel ID: The Telegram API needs the ID of the channel it should write to. To figure out the ID of the newly created channel, simply copy the invitation link of your channel and send it to the bot called "IDBot" (@username_to_id_bot). It will reply with the channel ID.
  4. Use the Python API: Afterwards you can use the python-telegram-bot package to send messages like so:
import telegram
...
bot = telegram.Bot(token=token)
...
bot.send_message(text=message, chat_id=channel_id, parse_mode="html")

The full source can be seen here.
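The send_html_message_to_channel call used in evaluate_availability.py can be a thin wrapper around this API. Here is a sketch, assuming the token and the channel ID are read from the .env file with python-dotenv (the environment variable names and the class name are assumptions; the actual code lives in lego/telegram_message.py):

import os

import telegram
from dotenv import load_dotenv


class TelegramChannelBot:
    """Small wrapper around python-telegram-bot for posting to one channel (sketch)"""

    def __init__(self):
        load_dotenv()  # loads the variables defined in the .env file
        self.bot = telegram.Bot(token=os.environ["TELEGRAM_TOKEN"])
        self.channel_id = os.environ["TELEGRAM_CHANNEL_ID"]

    def send_html_message_to_channel(self, message: str):
        # parse_mode="html" allows simple formatting like <b>bold</b> in the notification
        self.bot.send_message(text=message, chat_id=self.channel_id, parse_mode="html")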

Deploying to a DigitalOcean Droplet

In order to get regular notifications from the bot, the whole project is deployed to a DigitalOcean Ubuntu 18 droplet (DigitalOcean's virtual machines).

Setting up the project

After setting up a DigitalOcean account and Ubuntu droplet, login to your machine via SSH.

Clone the repository from GitHub, create the virtual environment and install all needed Python dependencies:

git clone https://github.com/chrizog/lego-restock-bot.git
python3 -m venv venv_lego
source venv_lego/bin/activate
pip3 install -r requirements.txt

Afterwards the LegoProductSpider can be run once in order to create the database and crawl Lego shop products. Optionally upload your existing SQLite database to DigitalOcean.

scrapy crawl products

Cronjob

As described above, the AvailabilitySpider and the evaluation script, combined into a single bash script, need to be run regularly. For the regular execution, a cron job is created that runs every two hours. On the DigitalOcean droplet use:

crontab -e

and add the following line to the end:

0 */2 * * * /<your path>/update_availability.sh

The crontab schedule expression 0 */2 * * * runs the script at minute 0 of every second hour. For more crontab expressions you can use crontab guru, which provides examples and explanations of the expressions.

For products that were out of stock and are now available again, you will receive messages like this every two hours:

Screenshot Telegram bot using Scrapy

I hope you liked this guide and perhaps got some inspiration for your own Scrapy project!