68 lines
2.1 KiB
Python
68 lines
2.1 KiB
Python
from typing import NamedTuple
|
|
import tomllib
|
|
import tastytrade
|
|
import re
|
|
import paho.mqtt.client as mqtt
|
|
from datetime import datetime
|
|
from loguru import logger
|
|
def dict_to_namedtuple(name, dictionary):
|
|
fields = {}
|
|
for key, value in dictionary.items():
|
|
if isinstance(value, dict):
|
|
fields[key] = dict_to_namedtuple(key.capitalize(), value)
|
|
else:
|
|
fields[key] = value
|
|
return NamedTuple(name, [(k, type(v)) for k, v in fields.items()])(**fields)
|
|
|
|
|
|
def load_config():
|
|
with open("creds.toml", "rb") as f:
|
|
config_dict = tomllib.load(f)
|
|
config = dict_to_namedtuple("Config", config_dict)
|
|
return config
|
|
|
|
def parse_event_symbol(symbol: str):
|
|
symbol = symbol.lstrip('.')
|
|
pattern = r"([A-Z]+)(\d{6})([CP])(\d+)"
|
|
match = re.match(pattern, symbol)
|
|
if not match:
|
|
return None
|
|
underlying, exp_str, opt_type, strike_str = match.groups()
|
|
expiration = datetime.strptime(exp_str, "%y%m%d").date()
|
|
strike = int(strike_str) / 10
|
|
return {
|
|
"underlying": underlying,
|
|
"expiration": expiration.isoformat(),
|
|
"option_type": opt_type,
|
|
"strike": strike,
|
|
}
|
|
def create_mqttc(config):
|
|
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
|
|
mqttc.username_pw_set(username=config.mqtt.user,password=config.mqtt.password)
|
|
mqttc.connect("proliant.lan",port=1883)
|
|
logger.info("MQTT client instantiated")
|
|
return mqttc
|
|
|
|
def parse_greek_event(greek_event):
|
|
# parse symbol details
|
|
parsed_symbol = parse_event_symbol(greek_event.event_symbol)
|
|
|
|
# combine everything into one dict
|
|
event_dict = {
|
|
**parsed_symbol,
|
|
"event_time": greek_event.event_time,
|
|
"event_flags": greek_event.event_flags,
|
|
"index": greek_event.index,
|
|
"time": greek_event.time,
|
|
"sequence": greek_event.sequence,
|
|
"price": float(greek_event.price),
|
|
"volatility": float(greek_event.volatility),
|
|
"delta": float(greek_event.delta),
|
|
"gamma": float(greek_event.gamma),
|
|
"theta": float(greek_event.theta),
|
|
"rho": float(greek_event.rho),
|
|
"vega": float(greek_event.vega),
|
|
}
|
|
|
|
return event_dict
|