diff --git a/__pycache__/utils.cpython-313.pyc b/__pycache__/utils.cpython-313.pyc new file mode 100644 index 0000000..d01e6cf Binary files /dev/null and b/__pycache__/utils.cpython-313.pyc differ diff --git a/main.py b/main.py index e5d5a0a..cf8f2df 100644 --- a/main.py +++ b/main.py @@ -1,23 +1,20 @@ -import sys +from utils import load_config, create_mqttc +from tastytrade import OAuthSession +from loguru import logger +from producers.sp500 import get_spx_market, stream_spx_greeks +import asyncio import json -import tomllib -import paho.mqtt.client as paho -import time -from utils import load_config, authenticate - config = load_config() -#client = paho.Client(paho.CallbackAPIVersion.VERSION2) -#client.username_pw_set(username=config.mqtt.username,password=config.mqtt.password) +session = OAuthSession(config.tasty_trade.client_secret,config.tasty_trade.refresh_token) +logger.info("Tasty Trade session instantiated") -#if client.connect("proliant.lan", 1883, 60) != 0: - #print("Couldn't connect to the mqtt broker") - #sys.exit(1) +async def main(): + functions = [get_spx_market, stream_spx_greeks] + tasks = [] + for function in functions: + task = asyncio.create_task(function(session)) + tasks.append(task) + for task in tasks: + await task - -#client.publish("SP500", json.dumps({"value":6349}), 0) -#time.sleep(0.5) - - -#client.disconnect() - -authenticate() +asyncio.run(main()) diff --git a/producers/__pycache__/sp500.cpython-313.pyc b/producers/__pycache__/sp500.cpython-313.pyc new file mode 100644 index 0000000..b0b839a Binary files /dev/null and b/producers/__pycache__/sp500.cpython-313.pyc differ diff --git a/producers/sp500.py b/producers/sp500.py index e69de29..f5f77b3 100644 --- a/producers/sp500.py +++ b/producers/sp500.py @@ -0,0 +1,57 @@ +import asyncio +from tastytrade.dxfeed import Quote, Greeks +from datetime import date, timedelta +from tastytrade import instruments,DXLinkStreamer +from loguru import logger +from tastytrade.instruments import get_option_chain +import json +from utils import create_mqttc, load_config, parse_greek_event +market_subs_list = ["SPX"] # your subscription list, format per tastytrade docs +config = load_config() +async def get_spx_market(session): + mqttc = create_mqttc(config) + async with DXLinkStreamer(session) as streamer: + await streamer.subscribe(Quote, market_subs_list) + logger.info("Subscribed, streaming quotes... Press Ctrl+C to stop.") + try: + while True: + quote = await streamer.get_event(Quote) + mqttc.publish("SP500",str((quote.ask_price+quote.bid_price)/2)) + # or process/yield the quote as needed + except asyncio.CancelledError: + logger.info("Streaming cancelled.") + + + + + +async def stream_spx_greeks(session, days_out=30): + mqttc = create_mqttc(config) + # Fetch option chain (blocking → thread) + chain = await asyncio.to_thread(get_option_chain, session, "SPX") + + cutoff = date.today() + timedelta(days=days_out) + option_symbols = [ + opt.streamer_symbol + for exp_date, opts in chain.items() + if exp_date <= cutoff + for opt in opts + ] + + logger.info(f"Subscribing to {len(option_symbols)} SPX option contracts...") + + async with DXLinkStreamer(session) as streamer: + # Here we pass the list as a single argument (no unpacking, no loop) + await streamer.subscribe(Greeks, option_symbols) + + async for greek_event in streamer.listen(Greeks): + logger.info(f"Greek Event: {greek_event}") + mqttc.publish("SP500/oc",json.dumps(parse_greek_event(greek_event))) + +def create_IV_histogram(mqttc): + mqttc.subscribe([("SP500",0),("SP500/oc",0)]) + + + + + diff --git a/rss-feed-notif/config.toml b/rss-feed-notif/config.toml new file mode 100644 index 0000000..804a177 --- /dev/null +++ b/rss-feed-notif/config.toml @@ -0,0 +1,5 @@ +[mqtt] +user = "mqttuser" +password = "MWkWG42PX8Be5j87Q3Rk" +[rss-feeds] +nyt-feed = 'https://rss.nytimes.com/services/xml/rss/nyt/World.xml' diff --git a/rss-feed-notif/main.py b/rss-feed-notif/main.py new file mode 100644 index 0000000..e69de29 diff --git a/rss-feed-notif/rss-feed.py b/rss-feed-notif/rss-feed.py new file mode 100644 index 0000000..4b763d6 --- /dev/null +++ b/rss-feed-notif/rss-feed.py @@ -0,0 +1,23 @@ +from typing import List +import feedparser +import tomllib +import json +with open("config.toml","rb") as file: + config = tomllib.load(file) +print(config) + +previous_msgs = set() + +def read_rss_feeds(client): + for feed_name,feed_link in config["rss-feeds"].items(): + feed = feedparser.parse(feed_link) + for feed_post in feed.entries: + if feed_post.title in previous_msgs: + continue + else: + previous_msgs.add(feed_post.title) + notif_json = {"title":feed_post.title,"body":feed_post.description,"topic":"overlordian-news-192","link":feed_post.link} + client.publish("notification-pusher",json.dumps(notif_json),0) + + + diff --git a/utils.py b/utils.py index 2695913..a9c47d3 100644 --- a/utils.py +++ b/utils.py @@ -1,5 +1,10 @@ from typing import NamedTuple import tomllib +import tastytrade +import re +import paho.mqtt.client as mqtt +from datetime import datetime +from loguru import logger def dict_to_namedtuple(name, dictionary): fields = {} for key, value in dictionary.items(): @@ -15,75 +20,48 @@ def load_config(): config_dict = tomllib.load(f) config = dict_to_namedtuple("Config", config_dict) return config -import webbrowser -import requests -import threading -from flask import Flask, request -def authenticate(client_id, client_secret, scopes=None, port=5000): - """ - Opens the browser for OAuth authorization and returns token data. - - Args: - client_id (str): Your Tastytrade client ID - client_secret (str): Your Tastytrade client secret - scopes (list[str]): Optional list of scopes (default: None) - port (int): Port for local callback server (default: 5000) - - Returns: - dict: Token response JSON (access_token, refresh_token, etc.) - """ - redirect_uri = f"http://127.0.0.1:{port}/callback" - auth_url = "https://api.tastytrade.com/oauth/authorize" - token_url = "https://api.tastytrade.com/oauth/token" - - app = Flask(__name__) - auth_code_container = {} - - @app.route("/callback") - def callback(): - code = request.args.get("code") - auth_code_container["code"] = code - return "Authorization successful. You may close this window." - - # Start local server in background - def run_server(): - app.run(port=port, debug=False, use_reloader=False) - threading.Thread(target=run_server, daemon=True).start() - - # Build auth URL - params = { - "response_type": "code", - "client_id": client_id, - "redirect_uri": redirect_uri +def parse_event_symbol(symbol: str): + symbol = symbol.lstrip('.') + pattern = r"([A-Z]+)(\d{6})([CP])(\d+)" + match = re.match(pattern, symbol) + if not match: + return None + underlying, exp_str, opt_type, strike_str = match.groups() + expiration = datetime.strptime(exp_str, "%y%m%d").date() + strike = int(strike_str) / 10 + return { + "underlying": underlying, + "expiration": expiration.isoformat(), + "option_type": opt_type, + "strike": strike, } - if scopes: - params["scope"] = " ".join(scopes) +def create_mqttc(config): + mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) + mqttc.username_pw_set(username=config.mqtt.user,password=config.mqtt.password) + mqttc.connect("proliant.lan",port=1883) + logger.info("MQTT client instantiated") + return mqttc - # Open browser - webbrowser.open(f"{auth_url}?{requests.compat.urlencode(params)}") +def parse_greek_event(greek_event): + # parse symbol details + parsed_symbol = parse_event_symbol(greek_event.event_symbol) - # Wait for code - print("Waiting for authorization...") - while "code" not in auth_code_container: - pass - - # Exchange code for token - token_resp = requests.post(token_url, data={ - "grant_type": "authorization_code", - "code": auth_code_container["code"], - "redirect_uri": redirect_uri, - "client_id": client_id, - "client_secret": client_secret - }) - token_resp.raise_for_status() - - return token_resp.json() - -# Example usage -if __name__ == "__main__": - client_id = "YOUR_CLIENT_ID" - client_secret = "YOUR_CLIENT_SECRET" - token_data = authenticate(client_id, client_secret, scopes=["accounts", "orders"]) - print(token_data) + # combine everything into one dict + event_dict = { + **parsed_symbol, + "event_time": greek_event.event_time, + "event_flags": greek_event.event_flags, + "index": greek_event.index, + "time": greek_event.time, + "sequence": greek_event.sequence, + "price": float(greek_event.price), + "volatility": float(greek_event.volatility), + "delta": float(greek_event.delta), + "gamma": float(greek_event.gamma), + "theta": float(greek_event.theta), + "rho": float(greek_event.rho), + "vega": float(greek_event.vega), + } + return event_dict