import asyncio from tastytrade.dxfeed import Quote, Greeks from datetime import date, timedelta from tastytrade import instruments,DXLinkStreamer from loguru import logger from tastytrade.instruments import get_option_chain import json from utils import create_mqttc, load_config, parse_greek_event market_subs_list = ["SPX"] # your subscription list, format per tastytrade docs config = load_config() async def get_spx_market(session): mqttc = create_mqttc(config) async with DXLinkStreamer(session) as streamer: await streamer.subscribe(Quote, market_subs_list) logger.info("Subscribed, streaming quotes... Press Ctrl+C to stop.") try: while True: quote = await streamer.get_event(Quote) mqttc.publish("SP500",str((quote.ask_price+quote.bid_price)/2)) # or process/yield the quote as needed except asyncio.CancelledError: logger.info("Streaming cancelled.") async def stream_spx_greeks(session, days_out=30): mqttc = create_mqttc(config) # Fetch option chain (blocking → thread) chain = await asyncio.to_thread(get_option_chain, session, "SPX") cutoff = date.today() + timedelta(days=days_out) option_symbols = [ opt.streamer_symbol for exp_date, opts in chain.items() if exp_date <= cutoff for opt in opts ] logger.info(f"Subscribing to {len(option_symbols)} SPX option contracts...") async with DXLinkStreamer(session) as streamer: # Here we pass the list as a single argument (no unpacking, no loop) await streamer.subscribe(Greeks, option_symbols) async for greek_event in streamer.listen(Greeks): logger.info(f"Greek Event: {greek_event}") mqttc.publish("SP500/oc",json.dumps(parse_greek_event(greek_event))) def create_IV_histogram(mqttc): mqttc.subscribe([("SP500",0),("SP500/oc",0)])