Capturing Stock Prices into a Local Database

Goal

My aim is to load daily stock prices into a local database. The first piece, covered here, is automating the load into the SQL database, with the incoming data initially stored as strings.

This post goes over how I'm capturing that data: the API service being used, the SQL Server table structure, and the loading of data from the API call into the database using Python.

Tools and Services Used

  • IEX Cloud Apperate (API for daily price data)
  • Microsoft SQL Server (local staging database)
  • Python (API calls and loading the data)
  • Jupyter Lab (testing the scripts)
  • Windows Task Scheduler (automating the daily load)

API Service Chosen

After looking at a few options, I've chosen to use IEX Cloud's Apperate platform for stock and ETF daily price history. It also has some other nice data sets that I'll be using at some point.

API Call Response Structure

The call retrieves daily prices. It has a few parameters; the ones I typically use are the symbol and the day range.

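For example, a request for the last few days of prices for a single symbol looks like this (your_key is a placeholder for your own API token, and last=5 is just an example value):

https://cloud.iexapis.com/v1/data/CORE/HISTORICAL_PRICES/{symbol}?last=5&token={your_key}

The response comes back as an array of daily price records like the one below.
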
[
  {
    "close": 0.99,
    "fclose": 0.99,
    "fhigh": 0.99,
    "flow": 0.99,
    "fopen": 0.99,
    "fvolume": 0,
    "high": 0.99,
    "low": 0.99,
    "open": 0.99,
    "priceDate": "2023-01-12",
    "symbol": "BTG-AO",
    "uclose": 0.99,
    "uhigh": 0.99,
    "ulow": 0.99,
    "uopen": 0.99,
    "uvolume": 0,
    "volume": 0,
    "id": "HISTORICAL_PRICES",
    "key": "BTG-AO",
    "subkey": "",
    "date": 1673481600000,
    "updated": 1673503202000
  }
]

Database Table Structure

For the purposes of simply loading the data, all of the columns holding the incoming data points are typed as strings. The other columns added are metadata used for traceability.


CREATE TABLE [APPERATE].[Historical_Price_Load]
(
    [HistoricalPriceId] BIGINT IDENTITY(1,1) NOT NULL PRIMARY KEY,
    [index] NVARCHAR(250),
    Ticker NVARCHAR(250),
    Close_ NVARCHAR(250),
    F_Close NVARCHAR(250),
    F_High NVARCHAR(250),
    F_Low NVARCHAR(250),
    F_Open NVARCHAR(250),
    F_Volume NVARCHAR(250),
    High_ NVARCHAR(250),
    Low_ NVARCHAR(250),
    Open_ NVARCHAR(250),
    PriceDate NVARCHAR(250),
    Symbol NVARCHAR(250),
    U_Close NVARCHAR(250),
    U_High NVARCHAR(250),
    U_Low NVARCHAR(250),
    U_Open NVARCHAR(250),
    U_Volume NVARCHAR(250),
    Volume NVARCHAR(250),
    Id NVARCHAR(250),
    Key_ NVARCHAR(250),
    Subkey NVARCHAR(250),
    Date_ NVARCHAR(250),
    Updated NVARCHAR(250),
    EntityId BIGINT NOT NULL,
    Load_Date DATE NOT NULL,
    Load_Time DATETIME2 NOT NULL,
    Source_ NVARCHAR(500) NOT NULL,
    Source_Dataset NVARCHAR(255) NOT NULL,
    Source_API_Call NVARCHAR(2000) NOT NULL
)

Some things to take note of:

  • The APPERATE schema is created because that is the name of the service the data comes from. Grouping all of the tables and procedures associated with that service under the same schema is a simple naming convention that makes those database objects easy to recognize.
  • All of the incoming data from the API call is stored as strings, which reduces the chance of hitting an error when loading the data into the database.
  • This is only the first step of loading the data. It will be converted into the actual data types, like dates and decimals, in another step; a rough sketch of what that conversion could look like follows this list.
  • The remaining columns hold metadata about the data being loaded.
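
As an illustration of that later conversion step, a query along these lines could cast the staged strings into typed values (this is only a sketch; the column selection and target types here are hypothetical, not the actual transformation step):

SELECT
    TRY_CAST(PriceDate AS DATE) AS PriceDate,
    Symbol,
    TRY_CAST(Close_ AS DECIMAL(18, 4)) AS [Close],
    TRY_CAST(Volume AS BIGINT) AS Volume
FROM [APPERATE].[Historical_Price_Load];

TRY_CAST returns NULL instead of raising an error when a value can't be converted, so a handful of bad rows won't break the whole conversion.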


Loading Data with Python

Python is a quick and easy way to set up a script that pulls data from the API call and loads it into a SQL database.


First, I created a Python script to handle the connection to the database. This lives in its own file (sql_db.py, which is imported by the loading script later).


import sqlalchemy as sa
import urllib.parse as ur

def get_sql_engine(db):
    # builds a pyodbc connection string for the requested database;
    # {Computer_Name}, {User_Id} and {User_Password} are placeholders to fill in
    conn_str = ""

    if db == "StockTradingConfig":
        conn_str = ("DRIVER={ODBC Driver 17 for SQL Server};"
                    r"SERVER={Computer_Name}\SQLSERVER2019;"
                    "DATABASE=StockTradingConfig;UID={User_Id};PWD={User_Password}")
    elif db == "StockTradingStage":
        conn_str = ("DRIVER={ODBC Driver 17 for SQL Server};"
                    r"SERVER={Computer_Name}\SQLSERVER2019;"
                    "DATABASE=StockTradingStage;UID={User_Id};PWD={User_Password}")

    params = ur.quote_plus(conn_str)
    return sa.create_engine('mssql+pyodbc:///?odbc_connect={}'.format(params))

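A quick way to sanity check the connection, assuming the file above is saved as sql_db.py (it's imported under that name in the next script) and the staging table from earlier exists, is something like:

import pandas as pd
import sql_db as db

# hypothetical quick check: pull back a few rows to confirm the engine works
engine = db.get_sql_engine("StockTradingStage")
print(pd.read_sql("SELECT TOP 5 * FROM APPERATE.Historical_Price_Load", engine))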

Next is the Python file that makes the API call, does some slight transformation, and loads the data into the SQL database.


import sql_db as db
import requests as req
import pandas as pd
import Config as cf
import time
from datetime import datetime as date

# database name
stage_database = "StockTradingStage"
source = "IEX Cloud Apperate"

# column names that the pandas columns need
# to be changed into to match the database
transformed_columns = [
    "Close_", "F_Close", "F_High",
    "F_Low", "F_Open", "F_volume",
    "High_", "Low_", "Open_",
    "PriceDate", "Symbol", "U_Close",
    "U_High", "U_Low", "U_Open",
    "U_Volume", "Volume", "Id",
    "Key_", "Subkey", "Date_",
    "Updated"
]

# gets list of ETFs like SPY and loads prices for each of those
def load_historical_etf_prices(options):
    etf_frame = cf.get_market_etf_frame()

    for i, row in etf_frame.iterrows():
        entity_id = row['EntityId']
        ticker = row['Ticker']
        print(f"Loading ETF Price Data for Ticker: {ticker}")
        load_historical_price_data(entity_id, ticker, options, False)
        time.sleep(10)

    return None

# gets list of stocks like AAPL and loads prices for each of those
def load_historical_stock_prices(options):
    stock_frame = cf.get_company_frame()

    for i, row in stock_frame.iterrows():
        entity_id = row['EntityId']
        ticker = row['Ticker']
        print(f"Loading Stock Price Data for Ticker: {ticker}")
        load_historical_price_data(entity_id, ticker, options, False)
        time.sleep(10)

    return None


# creates the api call string
def get_historical_price_call_string(symbol, options):
    if 'last' not in options.keys():
        options['last'] = 1

    if 'useRange' not in options.keys():
        options['useRange'] = False

    if 'range' not in options.keys():
        options['range'] = '7d'

    # your_key holds the IEX Cloud API token (defined elsewhere in the project)
    if options['useRange'] == False:
        return (f"https://cloud.iexapis.com/v1/data/CORE/HISTORICAL_PRICES"
                f"/{symbol}?last={options['last']}&token={your_key}")
    elif options['useRange'] == True:
        return (f"https://cloud.iexapis.com/v1/data/CORE/HISTORICAL_PRICES"
                f"/{symbol}?last={options['last']}&range={options['range']}"
                f"&token={your_key}")

    return None

# makes api call for stock price, transforms and loads into database
def load_historical_price_data(entity_id, symbol, options, display = False):
    engine = db.get_sql_engine(stage_database)
    call = get_historical_price_call_string(symbol, options)

    if display == True:
        print(f"DB Engine: {engine} \n")
        print(f"API Call: {call} \n")

    response = req.request("GET", call)

    if display == True:
        print(f"Status Code: {response.status_code} \n")

    # transform to dataframe
    frame = pd.DataFrame(response.json())

    frame.columns = transformed_columns
    frame.insert(0, 'Ticker', f'{symbol}')
    frame['EntityId'] = entity_id
    frame['Load_Date'] = date.now()
    frame['Load_Time'] = date.now()
    frame['Source_'] = source
    frame['Source_Dataset'] = 'core.HISTORICAL_PRICES'
    frame['Source_API_Call'] = call

    # load into database
    frame.to_sql(name = 'Historical_Price_Load',
        con = engine, schema = 'APPERATE', if_exists = 'append')


    return frame




Testing Logic with a Jupyter Lab Notebook

Jupyter Lab is a fantastic tool for data science. It's also great just for testing Python scripts and classes to get them working.

First, a section is created for loading the libraries.

import importlib as imp
import Config as cf
import pandas as pd
import Price_Loads as pl
imp.reload(cf)
imp.reload(pl)

Next, create the variables needed to pass into the method. The entity_id is a reference to a record generated in the local database; it's not passed to the API call, while the ticker is. The date range defaults to '7d' (7 days); in this case I specified all of 2022 for the QQQ ticker.


entity_id, ticker = 53, 'QQQ'
opts = { 'last': 400, 'useRange': True, 'range': '2022' } #'7d' is default


The method below makes the call and loads the data into the database; it also returns the data as a dataframe.

df = pl.load_historical_price_data(entity_id, ticker, opts, True)

Viewing the dataframe.

df.head()





Create Automated Task to do the Upload

The next thing I want to do is create a task that will run the Python script and do the data load automatically.


First, I'm creating a Python file that will run the data load logic.


import Price_Loads as pl

opts = { 'last': 10, 'useRange': True, 'range': '7d'  }

pl.load_historical_etf_prices(opts)


Next, a .bat file needs to be created to execute the Python script above. Notepad can be used to write it and save it as a .bat file. It needs the full file path of the Python executable and the full path of the Python file to run.

"C:/Users/{UserName}/AppData/Local/Microsoft/WindowsApps/python3.9.exe" "A:\GIT\Finance\IEX_Cloud\ETC_Price_Run.py"




Next, in Windows, the Task Scheduler is used for the automation.




I've set the schedule to daily.



Next, the action "Start a program" is selected.



Finally, the Program/script field just needs to be the full file path of the .bat file.

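The same daily task can also be created from a command prompt with schtasks instead of clicking through the GUI; the task name, start time, and .bat path below are example values only:

REM example values: adjust the task name, start time and .bat path to your setup
schtasks /Create /SC DAILY /TN "ETF_Price_Load" /TR "C:\Scripts\Price_Load.bat" /ST 18:00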

Summary

This is the method used to load daily ETF and stock price data into a local SQL database. A future post will go over the step that transforms the loaded data inside the database.



