# -*- coding: utf-8 -*-
"""
TODO: given a list of symbols, for each stock, plot
Subplot1:
    1. price
    2. 50 and 200 day SMA lines
    3. bollinger band (200 day)
Subplot2
    RSI
Subplot3
    MACD
TODO: validate the plots with online resource

Updated on Mon Sep. 30, 2024
    Use one as buy/sell trigger and verify a bag of indicators to make the
    final decision. Could also come up with a value that ties to the
    trading volume.

@author: thomwang
"""

import pandas as pd
import numpy as np
import datetime as dt
import plotly.express as px
from plotly.subplots import make_subplots
from dash import Dash, html, dcc, callback, Output, Input, State, no_update, ctx
from waitress import serve
from flask_caching import Cache
from dash.exceptions import PreventUpdate
from dash_auth import BasicAuth
import yahoo_fin.stock_info as si
from dotenv import load_dotenv
import os
from sec_cik_mapper import StockMapper, MutualFundMapper
from subroutines import *

pd.options.mode.chained_assignment = None  # default='warn'
load_dotenv()

LB_YEAR = 3     # years of stock data to retrieve
PLT_YEAR = 2.7  # number of years data to plot
LB_TRIGGER = 5  # days to lookback for triggering events

# Thresholds used by the trigger checks in intelligent_loop_plots().
RSI_OVERBOUGHT = 70
RSI_OVERSOLD = 30
# NOTE(review): the original comment said "3% range" but the code has always
# compared against 0.02; the value is kept as-is -- confirm which is intended.
MACD_NORM_THRESHOLD = 0.02
HIVOL_WINDOW = 50     # rows used for the volume mean / std baseline
HIVOL_NUM_STD = 2     # standard deviations above the mean that count as "high"


def _outside_band(series, low, high):
    """Return True if any value of *series* is <= *low* or >= *high*."""
    return bool((series >= high).any() or (series <= low).any())


def _sign_flipped(series):
    """Return True if consecutive values of *series* jump between - and +.

    The sign of each value is differenced; only a direct -1 -> +1 or
    +1 -> -1 transition (absolute difference of 2) counts. A step that lands
    exactly on zero yields a difference of 1 and is not reported.
    """
    return bool((np.sign(series).diff().abs() > 1).any())


def intelligent_loop_plots(sym: str, stk_data):
    """Compute indicators for one ticker and report which ones triggered.

    Trigger checks only look at the last ``LB_TRIGGER`` rows of each series.

    Args:
        sym (str): ticker; used as the name of the price column.
        stk_data (DataFrame): stock data with "adjclose" and "volume" columns.

    Returns:
        DataFrame: price plus Bollinger bands, SMA200, SMA50 and VWMA50
        DataFrame: volume (single column ``_VOL``)
        DataFrame: macd (``_MACD``, ``_SIG``, ``_HIST``)
        DataFrame: rsi (``_RSI``, ``_VoRSI``)
        str: detected indicators, formatted like ``"[RSI, MACD, ]"``
            (``"[]"`` when nothing triggered)
    """
    price = stk_data["adjclose"]
    vol = stk_data["volume"]

    # Security comes from the star-import of `subroutines`.
    stk = Security(sym, price, vol)
    rsi = stk.rsi()
    vorsi = stk.volume_rsi()
    macd, macd_sig, macd_hist, norm_hist = stk.macd()
    sma50 = stk.sma(50)
    vwma50 = stk.vwma(50)
    sma200 = stk.sma(200)
    bol_low, bol_up = stk.bollinger(200)

    price_tail = price.tail(LB_TRIGGER)
    sma50_tail = sma50.tail(LB_TRIGGER)
    sma200_tail = sma200.tail(LB_TRIGGER)

    triggered = []  # labels, in the order they are reported in the title

    # RSI outside window (over bought / over sold)
    if _outside_band(rsi.tail(LB_TRIGGER), RSI_OVERSOLD, RSI_OVERBOUGHT):
        triggered.append('RSI')

    # VoRSI outside window (over bought / over sold)
    if _outside_band(vorsi.tail(LB_TRIGGER), RSI_OVERSOLD, RSI_OVERBOUGHT):
        triggered.append('VoRSI')

    # Normalized MACD histogram outside the normal range
    if (norm_hist.tail(LB_TRIGGER).abs() >= MACD_NORM_THRESHOLD).any():
        triggered.append('MACD/R')

    # MACD histogram zero crossing
    if _sign_flipped(macd_hist.tail(LB_TRIGGER)):
        triggered.append('MACD')

    # Stock price crosses SMA50
    if _sign_flipped(sma50_tail - price_tail):
        triggered.append('SMA50')

    # Death cross or golden cross - SMA50 vs SMA200
    if _sign_flipped(sma50_tail - sma200_tail.values):
        triggered.append('Golden/Death')

    # Price outside bollinger band or touching it
    above_upper = price_tail - bol_up.tail(LB_TRIGGER).values
    below_lower = price_tail - bol_low.tail(LB_TRIGGER).values
    if (above_upper >= 0).any() or (below_lower <= 0).any():
        triggered.append('Bollinger')

    # Price crosses 200 day moving average
    if _sign_flipped(price_tail - sma200_tail.values):
        triggered.append('SMA200')

    # Large trading volume trigger: any recent day above mean + N * std
    vol_baseline = vol.tail(HIVOL_WINDOW)
    hi_vol_limit = vol_baseline.mean() + HIVOL_NUM_STD * vol_baseline.std()
    if ((vol.tail(LB_TRIGGER) - hi_vol_limit) > 0).any():
        triggered.append('HiVol')

    # Every label keeps its trailing ", " so the string format is unchanged.
    plot_indicator = "[" + "".join(f"{label}, " for label in triggered) + "]"

    # Blank out the lower band where it sits disproportionately far below
    # SMA200 compared with the upper band; this limits the low bound when
    # the price axis is plotted in log scale.
    too_low = sma200.divide(bol_low) > bol_up.divide(sma200).mul(3)
    bol_low_plot = bol_low.mask(too_low)

    # basic price info
    data = price.copy().to_frame(sym)
    data = data.join(bol_low_plot.rename('_BOL200L'))
    data = data.join(bol_up.rename('_BOL200U'))
    data = data.join(sma200.rename('_SMA200'))
    data = data.join(sma50.rename('_SMA50'))
    # NOTE(review): label reads "_WVMA50" although the series is vwma50;
    # kept because it is the legend text currently shown.
    data = data.join(vwma50.rename('_WVMA50'))

    macd_frame = macd.to_frame('_MACD').join(macd_sig.rename('_SIG'))
    macd_frame = macd_frame.join(macd_hist.rename('_HIST'))

    rsi_frame = rsi.to_frame('_RSI').join(vorsi.rename('_VoRSI'))

    return data, vol.to_frame('_VOL'), macd_frame, rsi_frame, plot_indicator


# Credentials come from the environment (.env is loaded above); a missing
# variable raises KeyError at import time.
VALID_USERNAME_PASSWORD_PAIRS = {
    os.environ['USER1']: os.environ['PASS1'],
    os.environ['USER2']: os.environ['PASS2'],
}


class Auth:
    """Login check for BasicAuth that remembers the last accepted username.

    NOTE(review): a single module-level instance is shared by every request,
    and the server below runs with several threads, so ``username`` reflects
    whoever authenticated most recently -- confirm this is acceptable when
    more than one user is active at the same time.
    """

    def __init__(self) -> None:
        self.username = None

    def user_login(self, username, password):
        """Return True and record *username* if the hashed password matches."""
        # hash_password comes from the star-import of `subroutines`; the
        # stored values are compared against its output.
        if VALID_USERNAME_PASSWORD_PAIRS.get(username) == hash_password(password):
            self.username = username
            return True
        return False


auth = Auth()

# Initialize the app
app = Dash(__name__)
BasicAuth(
    app,
    auth_func=auth.user_login,
    secret_key=os.environ['SECRET_KEY'],
)

CACHE_CONFIG = {'CACHE_TYPE': 'SimpleCache'}
cache = Cache()
cache.init_app(app.server, config=CACHE_CONFIG)


@cache.memoize(timeout=14400)  # cache timeout set to 4 hours
def fetch_stk_data(sym, sd):
    """Download adjusted close and volume for *sym* starting at *sd*.

    The result is memoized on ``(sym, sd)``, so callers must pass a start
    date that is stable between calls for the cache to be effective.
    """
    return si.get_data(sym, start_date=sd)[["adjclose", "volume"]]


# Shared style of the three toolbar buttons.
_BUTTON_STYLE = {
    'font-size': '12px',
    'width': '80px',
    'display': 'inline-block',
    'margin-bottom': '10px',
    'margin-right': '5px',
    'height': '36px',
    'verticalAlign': 'top',
}

# App layout
app.layout = [
    html.Div(
        [
            html.Button('Load', id="reload-button", n_clicks=0,
                        style=_BUTTON_STYLE),
            html.Div(
                [
                    html.Label('Add ticker:'),
                    dcc.Input(id='input-ticker', type='text', maxLength=5,
                              debounce=True,
                              style={'height': '31px', 'width': '50px'}),
                ],
                style={'width': '150px', 'text-align': 'center'},
            ),
            html.Button('Remove', id="remove-button", n_clicks=0,
                        style=_BUTTON_STYLE),
            html.Div(
                [
                    dcc.Dropdown(id='symbols_dropdown_list',),
                ],
                style={'width': '330px', 'text-align': 'center'},
            ),
            html.Button('Auto Play', id="start-button", n_clicks=0,
                        style=_BUTTON_STYLE),
        ],
        style={'display': 'flex', 'justify-content': 'center'},
    ),
    dcc.Graph(
        figure={},
        id='controls-and-graph',
        style={'height': '85vh'},
    ),
    dcc.Interval(
        id='interval-component',
        interval=3*1000,  # in milliseconds
        n_intervals=0,
        max_intervals=1,
        disabled=True,
    ),
    dcc.Store(id="signaling"),  # to pass the input ticker and signal reload
    dcc.Store(id="del_sig"),    # to signal reload only after delete a ticker
]

# Space bar toggles Auto Play / Pause by clicking the start button.
# The handler previously called the misspelled `stopPropogation`, which
# raised a TypeError in the browser on every space key release.
app.clientside_callback(
    """
    function(id) {
        document.addEventListener("keyup", function(event) {
            if (event.key == ' ') {
                document.getElementById('start-button').click();
                event.stopPropagation();
            }
        });
        return window.dash_clientside.no_update
    }
    """,
    Output("start-button", "id"),
    Input("start-button", "id")
)


# delete ticker button callback
@callback(Output('del_sig', 'data'),
          Input("remove-button", "n_clicks"),
          State("symbols_dropdown_list", "value")
          )
def remove_ticker(n, sym_raw: str):
    """Remove the selected ticker from the user's list and signal a reload.

    Returns the click count (a changing value, so the reload callback fires)
    or ``no_update`` when there was no click or nothing is selected.
    """
    # sym_raw is None while the dropdown has no selection, so test for
    # truthiness instead of calling len() on it.
    if n and sym_raw:
        sym = sym_raw.split()[0]
        remove_from_db(auth.username, sym.upper())
        return n
    return no_update


# ticker input callback
@callback(Output('signaling', component_property='data'),
          Input('input-ticker', component_property='value'))
def update_tickers(ticker: str):
    """Add a typed ticker to the user's list if it is a known symbol.

    The ticker is first looked up as a stock, then as a mutual fund. Returns
    the upper-cased ticker when it was stored, otherwise ``no_update``.
    """
    if not ticker:
        return no_update

    ticker_upper = ticker.strip().upper()
    if not ticker_upper:
        return no_update

    long_name = StockMapper().ticker_to_company_name.get(ticker_upper)
    if long_name:
        insert_into_db(auth.username, ticker_upper, long_name)
        return ticker_upper

    series_id = MutualFundMapper().ticker_to_series_id.get(ticker_upper)
    if series_id:
        insert_into_db(auth.username, ticker_upper,
                       'Mutual Fund - ' + series_id)
        return ticker_upper

    return no_update


# start / stop button callback
@callback(Output('interval-component', 'disabled'),
          Output("start-button", "children"),
          Output('symbols_dropdown_list', 'disabled'),
          Input("start-button", "n_clicks"),
          State("start-button", "children"),
          )
def start_cycle(n, value):
    """Toggle auto play: flip the timer, the button label and the dropdown."""
    if not n:
        return no_update
    if value == "Pause":
        # currently playing -> stop the timer and re-enable the dropdown
        return True, "Auto Play", False
    return False, "Pause", True


# clear input
@callback(Output('input-ticker', component_property='value'),
          Input("signaling", "data"),
          )
def clear_input(d):
    """Empty the ticker input box whenever the signaling store changes."""
    return ''


# reload button callback
@callback(Output('symbols_dropdown_list', 'options'),
          Output('interval-component', 'n_intervals'),
          Output('interval-component', 'max_intervals'),
          Input("reload-button", "n_clicks"),
          Input("signaling", "data"),
          Input("del_sig", "data"),
          )
def reload_syms(n, s, d):
    """Rebuild the dropdown options from the user's watchlist.

    Also resets the interval counter and allows two intervals per symbol.
    The arguments only act as triggers; their values are not used.
    """
    watchlist = get_watchlist(auth.username)
    # option text is "<first column> - <second column>"
    symbols = (watchlist.iloc[:, 0] + " - " + watchlist.iloc[:, 1]).tolist()
    return symbols, 0, 2*len(symbols)


# interval callback
@callback(Output('symbols_dropdown_list', 'value'),
          Input("signaling", "data"),
          Input("interval-component", "n_intervals"),
          State('symbols_dropdown_list', 'options'),
          )
def cycle_syms(tick_input, n, syms):
    """Pick the dropdown value: next symbol on a timer tick, or the new ticker.

    On an interval tick the options are cycled by index. When a ticker was
    just added, the option starting with "<ticker> -" is selected if present.
    """
    if not syms:
        return no_update

    triggered_id = ctx.triggered_id
    if triggered_id == "interval-component":
        return syms[n % len(syms)]
    if triggered_id == "signaling" and tick_input:
        prefix = tick_input + ' -'
        return next((opt for opt in syms if opt.startswith(prefix)), no_update)
    return no_update


# dropdown callback
@callback(
    Output(component_id='controls-and-graph', component_property='figure'),
    Input(component_id='symbols_dropdown_list', component_property='value'),
)
def update_graph(col_chosen):
    """Build the three-row figure (price/volume, MACD, RSI) for the selection.

    Raises:
        PreventUpdate: when nothing is selected in the dropdown.
    """
    if not col_chosen:
        raise PreventUpdate

    sym = col_chosen.split()[0]

    # Truncate "now" to midnight: fetch_stk_data is memoized on its
    # arguments, and a start date carrying the current time of day would
    # produce a new cache key (and a new download) on every call.
    end_date = dt.datetime.combine(dt.date.today(), dt.time.min)
    start_date = end_date - dt.timedelta(days=round(365 * LB_YEAR))

    # NOTE(review): this is a calendar-day count that is applied below as a
    # number of rows; confirm that trimming by row count is intended.
    days_to_drop = max(round(365 * (LB_YEAR - PLT_YEAR)), 0)

    stk_data = fetch_stk_data(sym, start_date)  # cached function
    data, vol, macd, rsi, plot_ind = intelligent_loop_plots(sym, stk_data)

    # The extra leading rows are fetched in addition to the plotted window
    # and are trimmed off here before plotting.
    data = data.iloc[days_to_drop:]
    vol = vol.iloc[days_to_drop:]
    macd = macd.iloc[days_to_drop:]
    rsi = rsi.iloc[days_to_drop:]

    fig = make_subplots(
        rows=3, cols=1,
        shared_xaxes=True,
        row_heights=[0.6, 0.2, 0.2],
        vertical_spacing=0.02,
        specs=[[{"secondary_y": True}],
               [{"secondary_y": False}],
               [{"secondary_y": False}]],
    )

    price_line = px.line(
        data,
        x=data.index,
        y=data.columns.to_list(),
    )
    volume_line = px.bar(
        vol,
        x=vol.index,
        y=vol.columns.to_list(),
    )
    macd_line = px.line(
        macd,
        x=macd.index,
        y=['_MACD', '_SIG'],
    )

    # Split the histogram into its positive and negative parts so each can
    # be filled to zero in its own colour.
    macd_neg = macd.copy()
    macd_pos = macd.copy()
    macd_neg[macd_neg > 0] = 0
    macd_pos[macd_pos < 0] = 0

    macd_hist_pos = px.line(
        macd_pos,
        x=macd_pos.index,
        y=['_HIST'],
    )
    macd_hist_pos.update_traces(fill='tozeroy',
                                line_color='rgba(0,100,0,0.5)',
                                showlegend=False)
    macd_hist_neg = px.line(
        macd_neg,
        x=macd_neg.index,
        y=['_HIST'],
    )
    macd_hist_neg.update_traces(fill='tozeroy',
                                line_color='rgba(100,0,0,0.5)',
                                showlegend=False)

    rsi_line = px.line(
        rsi,
        x=rsi.index,
        y=['_RSI', '_VoRSI'],
    )

    # Price traces go on the secondary axis of row 1, volume on the primary.
    # The flags are derived from the trace counts so they stay correct if
    # the set of price columns changes.
    secondary_flags = ([True] * len(price_line.data)
                       + [False] * len(volume_line.data))
    fig.add_traces(price_line.data + volume_line.data, rows=1, cols=1,
                   secondary_ys=secondary_flags)
    fig.add_traces(macd_line.data + macd_hist_pos.data + macd_hist_neg.data,
                   rows=2, cols=1)
    fig.add_traces(rsi_line.data, rows=3, cols=1)

    fig.layout.yaxis.title = "Volume"
    fig.layout.yaxis2.title = "Price ($)"
    fig.layout.yaxis2.type = "log"
    fig.layout.yaxis3.title = "MACD"
    fig.layout.yaxis4.title = "RSI/VoRSI"
    fig.layout.yaxis.griddash = "dash"
    fig.update_layout(
        title_text=sym + ' - ' + plot_ind,
        title_x=0.5,
        margin=dict(l=30, r=20, t=50, b=20),
    )
    return fig


if __name__ == "__main__":
    # using production quality WSGI server Waitress
    # (for local debugging use app.run(debug=True) instead)
    serve(app.server, host="0.0.0.0", port=8050, threads=7)