How to ingest premium market data with Zipline Reloaded

December 18, 2023

This article explains how to build the two Python scripts you need to create a custom Zipline Reloaded data bundle from premium market data.

Step 1: Subscribe to premium data

By now you should have an account with Nasdaq Data Link. If not, head over to https://data.nasdaq.com and set one up.

You’re looking for QuoteMedia End of Day US Stock Prices. This product offers end-of-day prices, dividends, adjustments and splits for US publicly traded stocks, with history back to 1996. Prices are provided both adjusted and unadjusted, and the product covers all stocks with a primary listing on NASDAQ, AMEX, NYSE, and ARCA.

You can find the page to subscribe here: https://data.nasdaq.com/databases/EOD/data

Once subscribed, you’ll be able to use it through your API key.
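
The bundle code in Step 3 reads your key from the DATALINK_API_KEY environment variable. As a rough sketch, the hypothetical helper below (not part of Zipline) performs the same check the bundle does, so you can confirm the variable is visible to Python before starting a long ingest. Unlike the bundle, it also rejects an empty string.

```python
import os
from typing import Mapping, Optional


def require_api_key(environ: Optional[Mapping[str, str]] = None) -> str:
    """Return the Nasdaq Data Link key, mirroring the bundle's own check.

    `require_api_key` is a hypothetical helper for illustration only.
    """
    environ = os.environ if environ is None else environ
    api_key = environ.get("DATALINK_API_KEY")
    if not api_key:  # missing or empty
        raise ValueError(
            "Please set your DATALINK_API_KEY environment variable and retry."
        )
    return api_key
```

On macOS or Linux you can set the variable for the current shell session with export DATALINK_API_KEY=your_key; in the Windows Command Prompt the equivalent is set DATALINK_API_KEY=your_key.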

Step 2: Create/Edit extension.py

Next, we’ll create the two files Zipline needs to build the bundle, starting with extension.py.

For Windows users

The extension.py file lives in the .zipline directory and registers the custom data bundle with Zipline.

  1. Open the File Explorer and navigate to your home directory. You should find the .zipline folder there. If you're not sure where your home directory is, it's usually C:\Users\[YourUsername].
  2. Open the .zipline folder.
  3. Right-click within the folder, select New, then choose Text Document. Rename the newly created file to extension.py. Make sure you change the file extension from .txt to .py.

Note: If you can't see file extensions in your File Explorer, you'll need to enable them. To do this, click on the View tab in File Explorer, and then check the box for File name extensions.

For Mac/Linux/Unix users

  1. Open Terminal: On a Mac, search for "Terminal" with Spotlight (Cmd + Space) or open it from Applications > Utilities. On Linux, open your distribution's terminal application.
  2. Navigate to the .zipline directory: By default, the terminal opens in your home directory. To make sure you start from your home directory and then move into .zipline, run:
       cd ~
       cd .zipline
  3. Create or edit the extension.py file:
    • If the file doesn't exist: Create it with the touch command, then open it in the text editor of your choice:
        touch extension.py
    • If the file already exists: Simply open it with a text editor.
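
If you prefer, you can create the folder and both empty files from Python instead, on any operating system including Windows. The ensure_zipline_files helper below is a hypothetical sketch, not part of Zipline, and it assumes your Zipline root is the default .zipline folder in your home directory.

```python
from pathlib import Path
from typing import List, Optional


def ensure_zipline_files(home: Optional[Path] = None) -> List[Path]:
    """Create .zipline and the two empty bundle files if they are missing.

    Hypothetical helper for illustration; existing files are left untouched.
    """
    home = Path.home() if home is None else home
    zipline_dir = home / ".zipline"
    zipline_dir.mkdir(exist_ok=True)  # no error if the folder already exists
    created = []
    for name in ("extension.py", "daily_us_equities.py"):
        path = zipline_dir / name
        if not path.exists():
            path.touch()  # creates an empty file, like `touch` in a terminal
            created.append(path)
    return created
```

Calling ensure_zipline_files() with no arguments uses your real home directory and returns the files it created, or an empty list if both already existed.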

For all users

Open the file in your editor and add the following content:

```python
import sys
from pathlib import Path

# sys.path entries must be strings, so convert the Path before appending it.
sys.path.append(str(Path("~", ".zipline").expanduser()))

from zipline.data.bundles import register

from daily_us_equities import daily_us_equities_bundle

register("quotemedia", daily_us_equities_bundle, calendar_name="XNYS")
```

Save and close the file.
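
Python's path-based import machinery ignores sys.path entries that are not strings, so the .zipline directory should be added as str(...) rather than as a pathlib.Path object. The sketch below demonstrates the pattern with a temporary folder standing in for ~/.zipline and a hypothetical module named demo_bundle_module standing in for daily_us_equities; import_from_dir is likewise an illustrative helper, not a Zipline function.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def import_from_dir(directory: Path, module_name: str):
    """Add `directory` to sys.path as a string, then import `module_name`."""
    directory_str = str(directory.expanduser())
    if directory_str not in sys.path:  # avoid duplicate entries
        sys.path.append(directory_str)
    importlib.invalidate_caches()  # pick up files created at runtime
    return importlib.import_module(module_name)


# Demo: write a tiny module into a temp folder standing in for ~/.zipline.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "demo_bundle_module.py").write_text("NAME = 'quotemedia'\n")
    module = import_from_dir(Path(tmp), "demo_bundle_module")
    print(module.NAME)
```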

Step 3: Create the code to build the bundle

Use the same steps to create a file called daily_us_equities.py in the .zipline directory, next to extension.py.

In the file, add the following code exactly as is (do not alter!):

```python
import logging
import time
from io import BytesIO
from urllib.parse import urlencode
from zipfile import ZipFile

import pandas as pd
import requests
from click import progressbar

# Standard-library logger; INFO messages appear only if logging is
# configured to show them.
log = logging.getLogger(__name__)

DATA_START_DATE = "2000-01-01"
ONE_MEGABYTE = 1024 * 1024
DATALINK_DATA_URL = "https://data.nasdaq.com/api/v3/datatables/QUOTEMEDIA/PRICES"
MAX_DOWNLOAD_TRIES = 5
RETRY_WAIT_SECONDS = 10

# Columns requested from the QUOTEMEDIA/PRICES table.
COLUMNS = [
    "ticker",
    "date",
    "open",
    "high",
    "low",
    "close",
    "volume",
    "dividend",
    "split",
]


def format_metadata_url(api_key):
    """Build the bulk-export query URL for the QuoteMedia PRICES table."""
    query_params = [
        ("date.gte", DATA_START_DATE),
        ("api_key", api_key),
        ("qopts.export", "true"),
        ("qopts.columns", ",".join(COLUMNS)),
    ]
    return f"{DATALINK_DATA_URL}?{urlencode(query_params)}"


def fetch_download_link(table_url, max_download_tries=MAX_DOWNLOAD_TRIES):
    """Poll Nasdaq Data Link until the bulk export is ready; return its link."""
    log.info("Attempting to fetch download link...")

    status = None
    for attempt in range(1, max_download_tries + 1):
        log.info(f"Fetching download link (attempt {attempt} of {max_download_tries})...")
        try:
            resp = requests.get(table_url)
            resp.raise_for_status()
            file_info = resp.json()["datatable_bulk_download"]["file"]
        except (requests.RequestException, ValueError, KeyError) as exc:
            # Network/HTTP error, invalid JSON or unexpected payload: retry.
            log.info(f"Failed to get download link from Nasdaq Data Link: {exc}")
        else:
            status = file_info["status"]
            if status == "fresh":
                link = file_info["link"]
                log.info(f"Status is {status}. Returning download link: {link}")
                return link
            log.info(f"Status is {status}.")

        if attempt < max_download_tries:
            log.info(f"Retrying in {RETRY_WAIT_SECONDS} seconds...")
            time.sleep(RETRY_WAIT_SECONDS)

    raise RuntimeError(
        f"Download link not ready after {max_download_tries} tries "
        f"(last status: {status})."
    )


def load_data_table(file, index_col=None):
    """Load the single CSV inside the Nasdaq Data Link export ZIP file."""
    with ZipFile(file) as zip_file:
        file_names = zip_file.namelist()
        if len(file_names) != 1:
            raise ValueError(f"Expected a single file in the archive, found {file_names}.")
        eod_prices = file_names.pop()
        with zip_file.open(eod_prices) as table_file:
            log.info("Parsing raw data.")
            data_table = pd.read_csv(
                table_file,
                header=0,
                names=COLUMNS,
                usecols=COLUMNS,
                parse_dates=["date"],
                index_col=index_col,
            ).rename(
                columns={
                    "ticker": "symbol",
                    "dividend": "ex_dividend",
                    "split": "split_ratio",
                }
            )

    return data_table


def fetch_data_table(api_key):
    """Fetch the QuoteMedia PRICES data table from Nasdaq Data Link."""
    log.info("Fetching data table...")

    table_url = format_metadata_url(api_key)
    download_link = fetch_download_link(table_url)
    raw_file = download_with_progress(download_link, chunk_size=ONE_MEGABYTE)

    return load_data_table(file=raw_file)


def gen_asset_metadata(data, show_progress):
    """Build one metadata row per symbol with its first and last trading dates."""
    if show_progress:
        log.info("Generating asset metadata.")

    data = data.groupby(by="symbol").agg({"date": ["min", "max"]})
    data.reset_index(inplace=True)
    data["start_date"] = data.date.min(axis=1)
    data["end_date"] = data.date.max(axis=1)
    del data["date"]
    data.columns = data.columns.get_level_values(0)

    data["exchange"] = "QUOTEMEDIA"
    data["auto_close_date"] = data["end_date"].values + pd.Timedelta(days=1)
    return data


def parse_splits(data, show_progress):
    """Convert split rows into Zipline's split adjustment format."""
    if show_progress:
        log.info("Parsing split data.")

    # Work on a copy so pandas doesn't warn about modifying a slice.
    data = data.copy()
    data["split_ratio"] = 1.0 / data.split_ratio
    return data.rename(columns={"split_ratio": "ratio", "date": "effective_date"})


def parse_dividends(data, show_progress):
    """Convert dividend rows into Zipline's dividend adjustment format."""
    if show_progress:
        log.info("Parsing dividend data.")

    data = data.copy()
    # Only the ex-date is requested from the source, so the other dates stay empty.
    data["record_date"] = data["declared_date"] = data["pay_date"] = pd.NaT
    return data.rename(columns={"ex_dividend": "amount", "date": "ex_date"})


def parse_pricing_and_vol(data, sessions, symbol_map):
    """Yield (sid, daily bars) pairs aligned to the calendar's sessions."""
    for asset_id, symbol in symbol_map.items():
        asset_data = (
            data.xs(symbol, level=1).reindex(sessions.tz_localize(None)).fillna(0.0)
        )
        yield asset_id, asset_data


def daily_us_equities_bundle(
    environ,
    asset_db_writer,
    minute_bar_writer,
    daily_bar_writer,
    adjustment_writer,
    calendar,
    start_session,
    end_session,
    cache,
    show_progress,
    output_dir,
):
    """
    daily_us_equities_bundle builds a daily dataset using Quotemedia
    end of day equities data. For more information on the Quotemedia
    data see here: https://data.nasdaq.com/databases/EOD
    """
    api_key = environ.get("DATALINK_API_KEY")
    if api_key is None:
        raise ValueError(
            "Please set your DATALINK_API_KEY environment variable and retry."
        )

    raw_data = fetch_data_table(api_key)

    start_session, end_session = raw_data.date.min(), raw_data.date.max()
    asset_metadata = gen_asset_metadata(raw_data[["symbol", "date"]], show_progress)

    exchanges = pd.DataFrame(
        data=[["QUOTEMEDIA", "QUOTEMEDIA", "US"]],
        columns=["exchange", "canonical_name", "country_code"],
    )
    asset_db_writer.write(equities=asset_metadata, exchanges=exchanges)

    symbol_map = asset_metadata.symbol
    sessions = calendar.sessions_in_range(start_session, end_session)

    raw_data.set_index(["date", "symbol"], inplace=True)
    daily_bar_writer.write(
        parse_pricing_and_vol(raw_data, sessions, symbol_map),
        show_progress=show_progress,
    )

    raw_data.reset_index(inplace=True)
    raw_data["symbol"] = raw_data["symbol"].astype("category")
    raw_data["sid"] = raw_data.symbol.cat.codes
    adjustment_writer.write(
        splits=parse_splits(
            raw_data[["sid", "date", "split_ratio"]].loc[raw_data.split_ratio != 1],
            show_progress=show_progress,
        ),
        dividends=parse_dividends(
            raw_data[["sid", "date", "ex_dividend"]].loc[raw_data.ex_dividend != 0],
            show_progress=show_progress,
        ),
    )


def download_with_progress(url, chunk_size, **progress_kwargs):
    """
    Download streaming data from a URL, printing progress information to the
    terminal.

    Parameters
    ----------
    url : str
        A URL that can be understood by ``requests.get``.
    chunk_size : int
        Number of bytes to read at a time from requests.
    **progress_kwargs
        Forwarded to click.progressbar.

    Returns
    -------
    data : BytesIO
        A BytesIO containing the downloaded data.
    """
    resp = requests.get(url, stream=True)
    resp.raise_for_status()

    total_size = int(resp.headers["content-length"])
    data = BytesIO()
    with progressbar(length=total_size, **progress_kwargs) as pbar:
        for chunk in resp.iter_content(chunk_size=chunk_size):
            data.write(chunk)
            pbar.update(len(chunk))

    data.seek(0)
    return data
```

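The format_metadata_url function builds the bulk-export query URL for Nasdaq Data Link from your API key. It requests only rows dated on or after DATA_START_DATE, asks for the result as a bulk export (qopts.export=true), and limits the output to the columns the bundle needs: ticker, date, open, high, low, close, volume, dividend and split.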
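
To see exactly what gets sent, the sketch below rebuilds the same query with the standard library. build_export_url is a hypothetical variant of format_metadata_url that takes the start date as a parameter; YOUR_KEY is a placeholder, not a real key.

```python
from urllib.parse import urlencode

DATALINK_DATA_URL = "https://data.nasdaq.com/api/v3/datatables/QUOTEMEDIA/PRICES"
COLUMNS = ["ticker", "date", "open", "high", "low", "close", "volume", "dividend", "split"]


def build_export_url(api_key: str, start_date: str = "2000-01-01") -> str:
    """Same query as format_metadata_url, with the start date as a parameter."""
    query_params = [
        ("date.gte", start_date),  # only rows on or after start_date
        ("api_key", api_key),
        ("qopts.export", "true"),  # request a bulk export instead of pages
        ("qopts.columns", ",".join(COLUMNS)),  # restrict to the columns we use
    ]
    return f"{DATALINK_DATA_URL}?{urlencode(query_params)}"


print(build_export_url("YOUR_KEY"))
```

urlencode percent-encodes the commas in the column list as %2C, which is standard URL encoding for a query value.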

The fetch_download_link function retrieves the download link for that export. The export file may not be ready immediately, so the function polls the endpoint until the file's status is "fresh", waiting 10 seconds between attempts and stopping after MAX_DOWNLOAD_TRIES attempts.
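
The polling pattern can be sketched independently of the network. In the hypothetical poll_for_link below, fetch_payload stands in for the requests.get(...) call followed by raise_for_status() and .json(), and sleep is injectable so the loop can be exercised offline. The payload keys (datatable_bulk_download, file, status, link) are the ones the bundle code reads; any status other than "fresh" is treated as "not ready yet".

```python
import time
from typing import Callable, Optional


def poll_for_link(
    fetch_payload: Callable[[], dict],
    max_tries: int = 5,
    wait_seconds: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the bulk-download file is "fresh", then return its link."""
    last_status: Optional[str] = None
    for attempt in range(1, max_tries + 1):
        try:
            file_info = fetch_payload()["datatable_bulk_download"]["file"]
        except (KeyError, TypeError, ValueError, OSError) as exc:
            # Bad payload or I/O error (requests' exceptions subclass OSError).
            print(f"Attempt {attempt} failed: {exc!r}")
        else:
            last_status = file_info["status"]
            if last_status == "fresh":
                return file_info["link"]
            print(f"Attempt {attempt}: status is {last_status}")
        if attempt < max_tries:
            sleep(wait_seconds)  # give the server time to build the export
    raise RuntimeError(
        f"Download link not ready after {max_tries} tries (last status: {last_status})"
    )
```

Counting attempts explicitly and raising at the end means a stuck export fails loudly instead of looping forever or returning None.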

load_data_table extracts the data from the downloaded ZIP file. It expects the archive to contain a single CSV file, reads it into a pandas DataFrame, and renames ticker, dividend and split to symbol, ex_dividend and split_ratio, the names Zipline and the rest of the bundle code use.
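
The sketch below reproduces load_data_table's approach on a tiny in-memory archive, so you can see the single-file check and the column renaming without downloading anything. The archive member name and the rows are made up, and read_single_csv_zip is a hypothetical name. It raises a ValueError instead of using assert, because Python strips assert statements when run with the -O flag.

```python
from io import BytesIO
from zipfile import ZipFile

import pandas as pd


def read_single_csv_zip(file) -> pd.DataFrame:
    """Read the only CSV in a ZIP archive and rename columns for Zipline."""
    with ZipFile(file) as zip_file:
        names = zip_file.namelist()
        if len(names) != 1:
            raise ValueError(f"Expected a single file, found {names}")
        with zip_file.open(names[0]) as table_file:
            table = pd.read_csv(table_file, parse_dates=["date"])
    return table.rename(
        columns={"ticker": "symbol", "dividend": "ex_dividend", "split": "split_ratio"}
    )


# Tiny in-memory archive with made-up rows, standing in for the real export.
buffer = BytesIO()
with ZipFile(buffer, "w") as zf:
    zf.writestr(
        "prices.csv",
        "ticker,date,open,high,low,close,volume,dividend,split\n"
        "ABC,2023-01-03,10.0,11.0,9.5,10.5,1000,0.0,1.0\n",
    )
buffer.seek(0)
print(read_single_csv_zip(buffer).columns.tolist())
```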

The fetch_data_table function ties these steps together: it builds the query URL, waits for the download link, downloads the file and loads it into a DataFrame.

The next functions, gen_asset_metadata, parse_splits, parse_dividends and parse_pricing_and_vol, transform the raw data into the structures Zipline's writers expect: one asset metadata row per symbol (with its first and last trading dates), split and dividend adjustment tables, and a series of daily pricing and volume bars for each asset, aligned to the trading calendar.
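
To make these transformations concrete, the sketch below runs equivalent steps on a few made-up rows. It uses pandas named aggregation as a more compact alternative to the groupby, agg and column-flattening steps in gen_asset_metadata, and keys the adjustments by symbol rather than sid to keep the output readable. It assumes the source's split column holds new shares per old share (2.0 for a 2-for-1 split), so the inversion in parse_splits turns it into 0.5, a multiplier that scales pre-split prices down.

```python
import pandas as pd

# Made-up rows in the shape load_data_table returns (subset of columns).
raw = pd.DataFrame(
    {
        "symbol": ["ABC", "ABC", "ABC", "XYZ", "XYZ"],
        "date": pd.to_datetime(
            ["2023-01-03", "2023-01-04", "2023-01-05", "2023-01-04", "2023-01-05"]
        ),
        "ex_dividend": [0.0, 0.25, 0.0, 0.0, 0.0],
        "split_ratio": [1.0, 1.0, 1.0, 1.0, 2.0],
    }
)

# Asset metadata: first and last trading date per symbol.
metadata = (
    raw.groupby("symbol")["date"]
    .agg(start_date="min", end_date="max")
    .reset_index()
)
metadata["auto_close_date"] = metadata["end_date"] + pd.Timedelta(days=1)

# Splits: keep rows with a split and invert the ratio, as parse_splits does.
splits = raw.loc[raw["split_ratio"] != 1, ["symbol", "date", "split_ratio"]].copy()
splits["ratio"] = 1.0 / splits["split_ratio"]

# Dividends: keep rows with a non-zero cash amount.
dividends = raw.loc[raw["ex_dividend"] != 0, ["symbol", "date", "ex_dividend"]]

print(metadata)
print(splits[["symbol", "date", "ratio"]])
print(dividends)
```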

The core function, daily_us_equities_bundle, ties everything together to fetch and prepare the QuoteMedia End of Day US Stock Prices dataset for Zipline. It checks for the DATALINK_API_KEY environment variable, fetches the raw data table, processes it, and writes the asset metadata, daily bars and adjustments to disk through Zipline's writers. Because extension.py registers it under the name quotemedia, Zipline calls this function when you run zipline ingest -b quotemedia.
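
One detail in daily_us_equities_bundle deserves a closer look: the sids must agree between the asset metadata, the daily bars and the adjustments. The metadata rows come from a groupby on symbol, which sorts the symbols, so row i gets sid i; the adjustment tables use category codes, which are also assigned in sorted order. The toy sketch below, on made-up rows, checks that the two numberings line up and shows the reindex-and-fill step parse_pricing_and_vol applies to each asset. The session dates are a hard-coded stand-in for calendar.sessions_in_range.

```python
import pandas as pd

# Made-up rows, deliberately unsorted, in the shape of the bundle's raw_data.
raw = pd.DataFrame(
    {
        "symbol": ["XYZ", "ABC", "XYZ", "ABC"],
        "date": pd.to_datetime(
            ["2023-01-03", "2023-01-03", "2023-01-05", "2023-01-04"]
        ),
        "close": [20.0, 10.0, 21.0, 11.0],
    }
)

# sid = row position in the (sorted) groupby result, as in gen_asset_metadata.
symbol_map = raw.groupby("symbol")["date"].min().reset_index()["symbol"]

# sid = category code, as used for the split and dividend tables.
categories = raw["symbol"].astype("category")
sid_by_symbol = dict(zip(categories, categories.cat.codes))
assert all(sid_by_symbol[symbol] == sid for sid, symbol in symbol_map.items())

# Hard-coded stand-in for calendar.sessions_in_range(start_session, end_session).
sessions = pd.DatetimeIndex(["2023-01-03", "2023-01-04", "2023-01-05"])

indexed = raw.set_index(["date", "symbol"]).sort_index()
aligned = {}
for sid, symbol in symbol_map.items():
    # One symbol's rows, aligned to every session; missing sessions become 0.0.
    bars = indexed.xs(symbol, level=1).reindex(sessions).fillna(0.0)
    aligned[symbol] = (sid, bars["close"].tolist())

print(aligned)
```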

Lastly, the download_with_progress function performs the actual download. It streams the file in chunks, shows a progress bar while it downloads, and returns the data as a BytesIO object that load_data_table can open directly.
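
The core of download_with_progress is a loop that appends each streamed chunk to a BytesIO buffer and reports its size to the progress bar. The hypothetical collect_chunks function below isolates that loop so it can run without a network connection: a plain list of bytes stands in for resp.iter_content(chunk_size=...), and a callback stands in for pbar.update.

```python
from io import BytesIO
from typing import Callable, Iterable


def collect_chunks(
    chunks: Iterable[bytes],
    on_progress: Callable[[int], None] = lambda n: None,
) -> BytesIO:
    """Write byte chunks into an in-memory buffer, reporting each chunk's size."""
    data = BytesIO()
    for chunk in chunks:
        if chunk:  # ignore any empty chunks
            data.write(chunk)
            on_progress(len(chunk))
    data.seek(0)  # rewind so the caller (e.g. ZipFile) reads from the start
    return data
```

Rewinding with seek(0) before returning matters: without it, the next reader would start at the end of the buffer and see no data.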