Real-time Streaming of Options Trades Data

In today’s tutorial we investigate how to use ThetaData’s API to access real-time streaming and historical trade-level options data for your own analysis.

import os
import sys
import pickle
import numpy as np
import pandas as pd

from datetime import timedelta, datetime, date
from thetadata import ThetaClient, OptionReqType, OptionRight, DateRange, DataType, StockReqType
from thetadata import MessageType, TradeCondition
from thetadata import StreamMsg, StreamMsgType

ThetaData API account details. I have saved mine as environment variables, and I recommend you do the same.

your_username = os.environ['thetadata_username']
your_password = os.environ['thetadata_password']
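If either variable is unset, the lookups above fail with a bare KeyError. A minimal sketch of a friendlier lookup follows. The helper name `read_credentials` is hypothetical and is not part of the thetadata package; only the two variable names come from this tutorial.

```python
import os


def read_credentials(env=None):
    """Return (username, password) from the environment, or raise a clear error.

    `read_credentials` is an illustrative helper, not a thetadata function.
    """
    env = os.environ if env is None else env
    names = ("thetadata_username", "thetadata_password")
    # Treat unset and empty values the same way.
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variable(s): " + ", ".join(missing))
    return env[names[0]], env[names[1]]
```

With this in place, `your_username, your_password = read_credentials()` would replace the two lookups.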

Real-time Intraday Streaming

The first step is to create a callback function, which is called every time a new message arrives from a given stream. It can be tailored to print, append, or analyze the trade and open-interest data as you see fit. For now we will just print the trade-level information: the contract specifics, the trade details, and the last bid/ask quote at the time of the trade.

# User generated method that gets called each time a message from the stream arrives.
def callback(msg: StreamMsg):
    if msg.type == StreamMsgType.TRADE:
        print('---------------------------------------------------------------------------')
        print('con:                         ' + msg.contract.to_string())
        print('trade:                       ' + msg.trade.to_string())
        print('last quote at time of trade: ' + msg.quote.to_string())
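The callback can append just as easily as it prints. Below is a minimal sketch of a callback that keeps only the most recent trade messages in memory. `make_collector` is a hypothetical helper, and the messages are plain stand-in objects so the sketch runs without a live stream; in real use you would pass `StreamMsgType.TRADE` as `trade_type`.

```python
from collections import deque
from types import SimpleNamespace


def make_collector(trade_type, maxlen=10_000):
    """Build a stream callback that keeps the most recent trade messages.

    Illustrative helper; pass StreamMsgType.TRADE as `trade_type` in real use.
    """
    buffer = deque(maxlen=maxlen)  # oldest messages drop off automatically

    def callback(msg):
        # Ignore everything except trade messages.
        if msg.type == trade_type:
            buffer.append(msg)

    return callback, buffer


# Stand-in messages so the sketch runs without ThetaData.
callback, recent = make_collector(trade_type="TRADE", maxlen=2)
for i, kind in enumerate(["TRADE", "QUOTE", "TRADE", "TRADE"]):
    callback(SimpleNamespace(type=kind, seq=i))
print([m.seq for m in recent])  # [2, 3]
```

The bounded deque is a design choice: on a full-market stream an unbounded list would grow for as long as the stream stays open.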

Now we need to create a ThetaClient instance, connect the callback function to our stream, and then call req_full_trade_stream_opt(), which subscribes us to every option trade.

client = ThetaClient(username=your_username, passwd=your_password)
client.connect_stream(callback)
client.req_full_trade_stream_opt()  # Subscribes to every option trade.

To unsubscribe from the full trade stream, call remove_full_trade_stream_opt(); to close the stream altogether, call close_stream().

client.remove_full_trade_stream_opt()  # Unsubscribes from the full option trade stream.
client.close_stream()
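If the script is interrupted between subscribing and unsubscribing, the cleanup calls never run. One way to guard against that is a small context manager. This is a sketch only: `full_trade_stream` is a hypothetical wrapper around the four client calls shown in this tutorial, and the recording client below is a stand-in so the example runs without ThetaData.

```python
from contextlib import contextmanager


@contextmanager
def full_trade_stream(client, callback):
    """Subscribe on entry; always unsubscribe and close on exit."""
    client.connect_stream(callback)
    client.req_full_trade_stream_opt()
    try:
        yield client
    finally:
        # Runs on normal exit and when an exception propagates.
        client.remove_full_trade_stream_opt()
        client.close_stream()


class _RecordingClient:
    """Stand-in that records method names instead of contacting ThetaData."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        # Any unknown method call is simply recorded by name.
        return lambda *args, **kwargs: self.calls.append(name)


demo = _RecordingClient()
try:
    with full_trade_stream(demo, print):
        raise RuntimeError("simulated failure while streaming")
except RuntimeError:
    pass
print(demo.calls)
```

With a real client, the body of the `with` block would typically just wait (for example with `time.sleep`) while the callback handles messages.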

Following Contracts for a Specific Underlying

Get AMZN Option Expiry Dates

Our first goal is to get all expirations for a specific underlying, such as Amazon (root ticker AMZN). We then request from the API all contracts for each expiry date.

def get_expirations(root_ticker) -> pd.Series:
    """Request expirations from a particular options root"""
    # Create a ThetaClient
    client = ThetaClient(username=your_username, passwd=your_password, jvm_mem=4, timeout=15)

    # Connect to the Terminal
    with client.connect():

        # Make the request
        data = client.get_expirations(
            root=root_ticker,
        )

    return data

root_ticker = 'AMZN'
expirations = get_expirations(root_ticker)
expirations

Market-Makers are not forced to show Quotes on all options!

Each exchange publishes rules that market makers must abide by; NASDAQ, where AMZN trades, is one example.

Specifically, the obligations of a Competitive Market Maker differ considerably from those of a Primary Market Maker for a particular options series. The difference shows up in whether they must post two-sided quotes on non-standard options, such as weekly or quarterly expiries and adjusted options.

To be safe, we will only return option contracts with ‘standard’ expiries. Some of these are listed with the third Friday of the month as the expiry date, while in the past the expiry was recorded as the Saturday following the third Friday. We therefore take the intersection of the expiries that ThetaData has options data for and the monthly third-Friday dates. Note that the code below builds the third Fridays from 24 January 2023 to 31 December 2024 only, and does not add the following Saturdays.

trading_days = pd.date_range(start=datetime(2023,1,24),end=datetime(2024,12,31),freq='B')
# The third friday in every month
contracts = pd.date_range(start=datetime(2023,1,24),end=datetime(2024,12,31),freq='WOM-3FRI')
# Find contract expiries that match with ThetaData expiries 
mth_expirations = [exp for exp in expirations if exp in contracts]
# Convert from python list to pandas datetime
mth_expirations = pd.to_datetime(pd.Series(mth_expirations))

mth_expirations
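The filter above matches third Fridays only. If Saturday-dated expiries matter for older data, the candidate dates can be computed directly. The sketch below uses only the standard library as a cross-check on the 'WOM-3FRI' range; `third_friday` and `standard_expiries` are hypothetical helper names.

```python
import calendar
from datetime import date, timedelta


def third_friday(year, month):
    """Return the third Friday of the given month."""
    first_weekday = date(year, month, 1).weekday()  # Monday == 0
    days_to_friday = (calendar.FRIDAY - first_weekday) % 7
    # First Friday is day 1 + days_to_friday; the third is 14 days later.
    return date(year, month, 1 + days_to_friday + 14)


def standard_expiries(year, month):
    """Third Friday plus the Saturday after it (the older recording convention)."""
    friday = third_friday(year, month)
    return {friday, friday + timedelta(days=1)}


print(third_friday(2023, 2))  # 2023-02-17
```

An expiry would then count as standard when `exp in standard_expiries(exp.year, exp.month)`, assuming `exp` is a `datetime.date`.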

Get all Strikes for each AMZN Option Expiry

Let’s get all the strikes for each option expiry, build them into a dictionary, and pickle the result for future use.

def get_strikes(root_ticker, expiration_dates) -> dict:
    """Request strikes for each expiry of an options root; returns {expiry: strikes}"""
    # Create a ThetaClient
    client = ThetaClient(username=your_username, passwd=your_password, jvm_mem=4, timeout=15)
    
    all_strikes = {}

    # Connect to the Terminal
    with client.connect():
        
        for exp_date in expiration_dates:
        
            # Make the request
            data = client.get_strikes(
                root=root_ticker,
                exp=exp_date
            )
            
            all_strikes[exp_date] = pd.to_numeric(data)
            

    return all_strikes


root_ticker = 'AMZN'

all_strikes = get_strikes(root_ticker, mth_expirations)

with open('strikes.pkl', 'wb') as f:
    pickle.dump(all_strikes, f)

To use this dictionary in the future, we can load it back from the pickle file.

with open('strikes.pkl', 'rb') as f:
    all_strikes = pickle.load(f)
    
print("Option Contract: ", mth_expirations[0])
print("AMZN Strike ", all_strikes[mth_expirations[0]][13])
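The save and load steps can be folded into one "load if cached, otherwise build" helper. This is a sketch under assumptions: `load_or_build` is a hypothetical name, and the toy `build` function stands in for a call such as `get_strikes(root_ticker, mth_expirations)`. Only unpickle files you created yourself, since loading a pickle can execute arbitrary code.

```python
import pickle
import tempfile
from pathlib import Path


def load_or_build(path, build):
    """Load a pickled object from `path`, or call `build()` and cache the result."""
    path = Path(path)
    if path.exists():
        with path.open("rb") as f:
            return pickle.load(f)
    result = build()
    with path.open("wb") as f:
        pickle.dump(result, f)
    return result


# Demo with a throwaway directory and a toy builder in place of get_strikes().
with tempfile.TemporaryDirectory() as tmp:
    cache = Path(tmp) / "strikes.pkl"
    calls = []

    def build():
        calls.append(1)
        return {"2023-02-17": [95.0, 100.0]}

    first = load_or_build(cache, build)   # builds and writes the cache
    second = load_or_build(cache, build)  # reads the cache, no rebuild

print(first == second, len(calls))  # True 1
```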

Real-time Streaming of AMZN Contracts

Now we can create a ThetaData client and connect the same callback function, which prints each trade and the last quote at the time of the trade to the console. We then add every AMZN contract by looping through our dictionary over all expiry, strike, and option type combinations.

client = ThetaClient(username=your_username, passwd=your_password)
client.connect_stream(callback)

root_ticker = 'AMZN'
opt_types=["P", "C"]
for opt_type in opt_types:
    for expiry in mth_expirations:
        strikes = all_strikes[expiry]
        for strike in strikes:
            # add specific contract to required trade stream using req_trade_stream_opt()
            client.req_trade_stream_opt(root_ticker, expiry.date(), strike, OptionRight.CALL if opt_type=="C" else OptionRight.PUT)

To close the stream, we can then call client.close_stream().
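The nested loop sends one subscription request per contract, so it can be useful to know how many requests it will make before running it. A minimal sketch, using toy data in place of `all_strikes`; `enumerate_contracts` is a hypothetical helper.

```python
from itertools import product


def enumerate_contracts(strikes_by_expiry, rights=("P", "C")):
    """Yield one (expiry, strike, right) tuple per contract to subscribe to."""
    for expiry, strikes in strikes_by_expiry.items():
        for strike, right in product(strikes, rights):
            yield expiry, strike, right


# Toy data standing in for `all_strikes`.
toy_strikes = {"2023-02-17": [95.0, 100.0], "2023-03-17": [100.0]}
contracts = list(enumerate_contracts(toy_strikes))
print(len(contracts))  # 6
```

The same generator could drive the subscription loop itself, with each tuple passed on to `req_trade_stream_opt()`.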

Combine Real-time Data to Pandas DataFrame

Printing trade information to the console is all well and good, but we would like to store it and do something with it. So let’s combine this real-time data into pandas DataFrames. First we create a global dictionary that holds one entry for each individual option contract we subscribe to with ThetaData.

For each individual contract we then create an empty pandas DataFrame with the columns we are interested in storing. For the moment that is trade data: ms_of_day, sequence, size, condition, price, and date.

This happens to align with the historical data calls from the ThetaData API, which will be useful for analysis later.

# Module-level dictionary shared with the streaming callback
trades_data = {}

opt_types = ["P", "C"]
for expiry in mth_expirations:
    trades_data[expiry] = {}
    strikes = all_strikes[expiry]
    for strike in strikes:
        trades_data[expiry][strike] = {}
        for opt_type in opt_types:
            trades_data[expiry][strike][opt_type] = pd.DataFrame(
                columns = ['ms_of_day','sequence','size','condition','price','date']
            )

Now that we have created an empty dataframe for each contract, it’s time to create a callback function that will update the pandas dataframe of a contract if there has been a new trade.

def build_trades_data(msg: StreamMsg):
    """Append each incoming trade to the DataFrame of its contract."""
    if msg.type == StreamMsgType.TRADE:
        print('---------------------------------------------------------------------------')
        print('trade:                       ' + msg.trade.to_string())
        # Set up expiry, strike and opt_type for easy reference in dataframe
        expiry = datetime(msg.contract.exp.year, msg.contract.exp.month, msg.contract.exp.day)
        strike = msg.contract.strike
        opt_type = "C" if msg.contract.isCall else "P"

        # One-row frame for the new trade, indexed by its sequence number
        new_row = pd.DataFrame({
            'ms_of_day': msg.trade.ms_of_day,
            'sequence': msg.trade.sequence,
            'size': msg.trade.size,
            'condition': str(msg.trade.condition).replace('TradeCondition.', ''),
            'price': msg.trade.price,
            'date': msg.trade.date,
        }, index=[msg.trade.sequence])

        # Append the row to the frame stored for this contract
        trades_data[expiry][strike][opt_type] = pd.concat(
            [trades_data[expiry][strike][opt_type], new_row],
            ignore_index=False,
        )
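Each pd.concat call builds a new DataFrame, so the cost of storing one trade grows with the number of trades already held for that contract. An alternative is to buffer rows in plain lists and build a DataFrame only when it is needed. The sketch below shows the buffering half with the standard library; `TradeBuffer` is a hypothetical class and the values are made up for illustration.

```python
from collections import defaultdict


class TradeBuffer:
    """Accumulate trade rows per contract in plain lists (illustrative only)."""

    def __init__(self):
        self._rows = defaultdict(list)

    def add(self, expiry, strike, opt_type, **fields):
        # Appending to a list is cheap; no existing rows are copied.
        self._rows[(expiry, strike, opt_type)].append(fields)

    def rows(self, expiry, strike, opt_type):
        """Return a copy of the rows stored for one contract."""
        return list(self._rows.get((expiry, strike, opt_type), []))


buffer = TradeBuffer()
buffer.add("2023-02-17", 100.0, "C", ms_of_day=34_200_500, size=3, price=2.15)
buffer.add("2023-02-17", 100.0, "C", ms_of_day=34_201_000, size=1, price=2.20)
print(len(buffer.rows("2023-02-17", 100.0, "C")))  # 2
```

When a frame is needed, `pd.DataFrame(buffer.rows(expiry, strike, opt_type))` turns the list of dictionaries into one row per trade.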

Let’s then create the same ThetaData client, connect the new callback function, build_trades_data, and add all the AMZN contracts we are interested in!

client = ThetaClient(username=your_username, passwd=your_password)
client.connect_stream(build_trades_data)
# add specific contract to required trade stream using req_trade_stream_opt()
root_ticker = 'AMZN'
opt_types=["P", "C"]
for opt_type in opt_types:
    for expiry in mth_expirations:
        strikes = all_strikes[expiry]
        for strike in strikes:
            client.req_trade_stream_opt(root_ticker, expiry.date(), strike, OptionRight.CALL if opt_type=="C" else OptionRight.PUT)

There you have it: we are continuously storing the latest intraday trades data for all options contracts on a particular underlying. We can now perform intraday analysis in real time.
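As a first taste of such analysis, the sketch below computes a volume-weighted average price and formats a trade time. It assumes that ms_of_day counts milliseconds since midnight, which the field name suggests but this tutorial does not confirm. Both helper names are hypothetical and the numbers are made up.

```python
def ms_to_clock(ms_of_day):
    """Format milliseconds since midnight as HH:MM:SS.mmm."""
    total_seconds, millis = divmod(int(ms_of_day), 1000)
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{millis:03d}"


def vwap(trades):
    """Volume-weighted average price over (price, size) pairs; None if no volume."""
    total_size = sum(size for _, size in trades)
    if total_size == 0:
        return None
    return sum(price * size for price, size in trades) / total_size


print(ms_to_clock(34_200_500))        # 09:30:00.500
print(vwap([(2.0, 1), (3.0, 3)]))     # 2.75
```

For a stored contract, the pairs could come from `zip(df['price'], df['size'])`, where `df` is one of the frames in trades_data.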