Files
TheOracle/producers/sp500.py
2025-08-30 22:32:21 -05:00

58 lines
1.9 KiB
Python

import asyncio
from tastytrade.dxfeed import Quote, Greeks
from datetime import date, timedelta
from tastytrade import instruments,DXLinkStreamer
from loguru import logger
from tastytrade.instruments import get_option_chain
import json
from utils import create_mqttc, load_config, parse_greek_event
market_subs_list = ["SPX"] # your subscription list, format per tastytrade docs
config = load_config()
async def get_spx_market(session):
mqttc = create_mqttc(config)
async with DXLinkStreamer(session) as streamer:
await streamer.subscribe(Quote, market_subs_list)
logger.info("Subscribed, streaming quotes... Press Ctrl+C to stop.")
try:
while True:
quote = await streamer.get_event(Quote)
mqttc.publish("SP500",str((quote.ask_price+quote.bid_price)/2))
# or process/yield the quote as needed
except asyncio.CancelledError:
logger.info("Streaming cancelled.")
async def stream_spx_greeks(session, days_out=30):
mqttc = create_mqttc(config)
# Fetch option chain (blocking → thread)
chain = await asyncio.to_thread(get_option_chain, session, "SPX")
cutoff = date.today() + timedelta(days=days_out)
option_symbols = [
opt.streamer_symbol
for exp_date, opts in chain.items()
if exp_date <= cutoff
for opt in opts
]
logger.info(f"Subscribing to {len(option_symbols)} SPX option contracts...")
async with DXLinkStreamer(session) as streamer:
# Here we pass the list as a single argument (no unpacking, no loop)
await streamer.subscribe(Greeks, option_symbols)
async for greek_event in streamer.listen(Greeks):
logger.info(f"Greek Event: {greek_event}")
mqttc.publish("SP500/oc",json.dumps(parse_greek_event(greek_event)))
def create_IV_histogram(mqttc):
mqttc.subscribe([("SP500",0),("SP500/oc",0)])