StockChartingDash/indicators.py
2024-10-07 23:52:40 -07:00

452 lines
15 KiB
Python

# -*- coding: utf-8 -*-
"""
TODO: given a list of symbols, for each stock, plot
Subplot1:
1. price
2. 50 and 200 day SMA lines
3. bollinger band (200 day)
Subplot2
RSI
Subplot3
MACD
TODO: validate the plots with online resource
Updated on Mon Sep. 30, 2024
Use one as buy/sell trigger and verify a bag of indicators to make final decision.
Could also come up with a value that ties to the trading volume.
@author: thomwang
"""
import pandas as pd
import numpy as np
import datetime as dt
import plotly.express as px
from plotly.subplots import make_subplots
from dash import Dash, html, dcc, callback, Output, Input, State, no_update, ctx
from waitress import serve
from flask_caching import Cache
from dash.exceptions import PreventUpdate
from dash_auth import BasicAuth
import yahoo_fin.stock_info as si
from dotenv import load_dotenv
import os
from sec_cik_mapper import StockMapper, MutualFundMapper
from subroutines import *
# Turn off pandas' chained-assignment warning for the whole module.
pd.options.mode.chained_assignment = None # default='warn'
# Load a .env file (if any) into the environment; os.environ is read further
# down for the login pairs and the session secret key.
load_dotenv()
# Window of history requested from the data source, in years.
LB_YEAR = 3 # years of stock data to retrieve
# Portion of that history that is actually drawn, in years.
PLT_YEAR = 2.7 # number of years data to plot
# Number of most recent rows (.tail) inspected when looking for trigger events.
LB_TRIGGER = 5 # days to lookback for triggering events
def intelligent_loop_plots(sym : str, stk_data):
    """Compute the indicators for one ticker and flag recent trigger events.

    Args:
        sym (str): ticker; also used as the name of the price column
        stk_data (DataFrame): stock data with "adjclose" and "volume" columns

    Returns:
        DataFrame: price with Bollinger bands, SMA200, SMA50 and VWMA50
        DataFrame: volume (column '_VOL')
        DataFrame: MACD line, signal line and histogram
        DataFrame: RSI and volume RSI
        str: bracketed, comma separated names of the events detected in the
            last LB_TRIGGER rows, e.g. "[RSI, HiVol, ]" ("[]" when none)
    """
    def recent(series):
        # Only the last LB_TRIGGER rows are examined for events.
        return series.tail(LB_TRIGGER)

    def sign_flipped(gap):
        # A jump of the sign from -1 to +1 (or back) gives a diff of +/-2.
        return (abs(np.sign(gap).diff()) > 1).any()

    close = stk_data["adjclose"]
    volume = stk_data["volume"]
    security = Security(sym, close, volume)

    rsi = security.rsi()
    vorsi = security.volume_rsi()
    macd, macd_sig, macd_hist, norm_hist = security.macd()
    sma50 = security.sma(50)
    vwma50 = security.vwma(50)
    sma200 = security.sma(200)
    bol_low, bol_up = security.bollinger(200)

    close_recent = recent(close)
    sma50_recent = recent(sma50)
    sma200_recent = recent(sma200)
    events = []

    # RSI outside the 30..70 window (over bought / over sold)
    rsi_recent = recent(rsi)
    if ((rsi_recent >= 70) | (rsi_recent <= 30)).any():
        events.append('RSI')

    # Volume RSI outside the 30..70 window
    vorsi_recent = recent(vorsi)
    if ((vorsi_recent >= 70) | (vorsi_recent <= 30)).any():
        events.append('VoRSI')

    # Normalized MACD histogram magnitude of at least 0.02
    if (abs(recent(norm_hist)) >= 0.02).any():
        events.append('MACD/R')

    # MACD histogram zero crossing
    if sign_flipped(recent(macd_hist)):
        events.append('MACD')

    # Stock price crosses SMA50 (index-aligned subtraction)
    if sign_flipped(sma50_recent - close_recent):
        events.append('SMA50')

    # Death cross or golden cross - SMA50 vs SMA200
    if sign_flipped(sma50_recent - sma200_recent.values):
        events.append('Golden/Death')

    # Price touching or outside either Bollinger band
    over_upper = close_recent - recent(bol_up).values
    under_lower = close_recent - recent(bol_low).values
    if (over_upper >= 0).any() or (under_lower <= 0).any():
        events.append('Bollinger')

    # Price crosses the 200 day moving average
    if sign_flipped(close_recent - sma200_recent.values):
        events.append('SMA200')

    # Volume more than two standard deviations above the 50-row mean
    last50 = volume.tail(50)
    if ((recent(volume) - last50.mean() - 2 * last50.std()) > 0).any():
        events.append("HiVol")

    # Every name is followed by ", " (including the last one).
    plot_indicator = "[" + "".join(name + ", " for name in events) + "]"

    # Blank the lower band where SMA200/lower exceeds 3x upper/SMA200, to
    # limit the low bound when plotting in log scale.
    too_low = sma200.divide(bol_low) > bol_up.divide(sma200).mul(3)
    bol_low.loc[too_low] = np.nan

    price_frame = close.copy().to_frame(sym)
    overlays = (
        bol_low.rename('_BOL200L'),
        bol_up.rename('_BOL200U'),
        sma200.rename('_SMA200'),
        sma50.rename('_SMA50'),
        vwma50.rename('_WVMA50'),
    )
    for overlay in overlays:
        price_frame = price_frame.join(overlay)

    macd_frame = (
        macd.to_frame('_MACD')
        .join(macd_sig.rename('_SIG'))
        .join(macd_hist.rename('_HIST'))
    )
    rsi_frame = rsi.to_frame('_RSI').join(vorsi.rename('_VoRSI'))
    volume_frame = volume.to_frame('_VOL')
    return price_frame, volume_frame, macd_frame, rsi_frame, plot_indicator
# Login table built from the environment: user name -> stored credential.
# Auth.user_login compares the stored value with hash_password(password), so
# PASS1/PASS2 presumably hold already-hashed passwords -- TODO confirm.
# A missing variable raises KeyError while the module is being imported.
VALID_USERNAME_PASSWORD_PAIRS = {
os.environ['USER1']:os.environ['PASS1'],
os.environ['USER2']:os.environ['PASS2'],
}
class Auth:
    """Credential checker that remembers who logged in, per worker thread.

    ``user_login`` is handed to BasicAuth as ``auth_func`` and the callbacks
    read ``username`` to select the watchlist. The app is served with several
    worker threads, so a single shared attribute could be overwritten by
    another user's request between the login check and the callback. The
    name is therefore kept in thread-local storage behind a ``username``
    property; reads and writes of ``auth.username`` work as before.

    NOTE(review): this assumes the auth function runs in the same thread
    that afterwards executes the callback for that request -- confirm
    against the dash_auth version in use.
    """

    def __init__(self) -> None:
        # Local import so the class does not depend on a file-level import.
        import threading

        self._local = threading.local()

    @property
    def username(self):
        """Name accepted by the last successful login on this thread, or None."""
        return getattr(self._local, "name", None)

    @username.setter
    def username(self, value) -> None:
        self._local.name = value

    def user_login(self, username, password):
        """Return True and remember ``username`` when the credentials match.

        The stored value for ``username`` is compared with
        ``hash_password(password)``; an unknown user yields False.
        """
        if VALID_USERNAME_PASSWORD_PAIRS.get(username) == hash_password(password):
            self.username = username
            return True
        return False
# Module-wide Auth instance; the callbacks below read auth.username to decide
# whose watchlist to load or modify.
auth = Auth()
# Initialize the app
app = Dash(__name__)
# Put the app behind dash_auth's BasicAuth. Credentials are validated by
# auth.user_login (auth_func) instead of a static username/password dict.
# SECRET_KEY must be present in the environment, otherwise KeyError is raised.
BasicAuth(
app,
# VALID_USERNAME_PASSWORD_PAIRS,
auth_func=auth.user_login,
secret_key=os.environ['SECRET_KEY'],
)
# Cache used by fetch_stk_data (via @cache.memoize), attached to the Flask
# server that backs the Dash app.
CACHE_CONFIG = {'CACHE_TYPE': 'SimpleCache'}
cache = Cache()
cache.init_app(app.server, config=CACHE_CONFIG)
@cache.memoize(timeout=4 * 60 * 60)  # cache timeout set to 4 hours
def fetch_stk_data(sym, sd):
    """Download price history for ``sym`` starting at ``sd``.

    Only the "adjclose" and "volume" columns are returned. Results are
    memoized for four hours, keyed on the ``(sym, sd)`` arguments.
    """
    wanted_columns = ["adjclose", "volume"]
    history = si.get_data(sym, start_date=sd)
    return history[wanted_columns]
# App layout
# One toolbar row (Load, ticker input, Remove, symbol dropdown, Auto Play),
# the chart, an interval timer and two stores used only for signalling
# between callbacks.
app.layout = [
html.Div([
# Reloads the watchlist into the dropdown (input of reload_syms).
html.Button('Load', id="reload-button", n_clicks=0,
style={'font-size': '12px', 'width': '80px', 'display': 'inline-block', 'margin-bottom': '10px', 'margin-right': '5px', 'height':'36px', 'verticalAlign': 'top'}),
# html.Label('Add ticker:', style={'width': '80px', 'display': 'inline-block', 'height':'36px', 'textAlign': 'center', 'adjust': 'right', 'margin-bottom': '10px', 'margin-right': '5px', 'verticalAlign': 'top'}),
html.Div([
html.Label('Add ticker:'),
# Ticker entry box, at most 5 characters; its value feeds update_tickers.
dcc.Input(id='input-ticker', type='text', maxLength=5, debounce=True, style={'height':'31px', 'width':'50px'}),
], style={'width': '150px', 'text-align': 'center'}),
# Removes the symbol currently selected in the dropdown (remove_ticker).
html.Button('Remove', id="remove-button", n_clicks=0,
style={'font-size': '12px', 'width': '80px', 'display': 'inline-block', 'margin-bottom': '10px', 'margin-right': '5px', 'height':'36px', 'verticalAlign': 'top'}),
html.Div([
# Options are filled by reload_syms; the selected value drives update_graph.
dcc.Dropdown(id='symbols_dropdown_list',),
], style={'width': '330px', 'text-align': 'center'}),
# Caption toggles between "Auto Play" and "Pause" in start_cycle.
html.Button('Auto Play', id="start-button", n_clicks=0,
style={'font-size': '12px', 'width': '80px', 'display': 'inline-block', 'margin-bottom': '10px', 'margin-right': '5px', 'height':'36px', 'verticalAlign': 'top'}),
], style={'display':'flex', 'justify-content':'center'}),
# Chart area; the figure is produced by update_graph.
dcc.Graph(
figure={},
id='controls-and-graph',
style={'height':'85vh'}
),
# Timer for auto play: starts disabled, start_cycle toggles `disabled` and
# reload_syms resets n_intervals and sets max_intervals to 2*len(symbols).
dcc.Interval(
id='interval-component',
interval=3*1000, # in milliseconds
n_intervals=0,
max_intervals=1,
disabled=True,
),
dcc.Store(id="signaling"), # to pass the input ticker and signal reload
dcc.Store(id="del_sig") # to signal reload only after delete a ticker
]
# Browser-side hook: releasing the space bar clicks the Auto Play / Pause
# button. The callback is wired to the button's own "id" property only so that
# it runs in the browser; it always returns no_update and merely installs the
# key listener. The handler previously called the misspelled
# event.stopPropogation(), which is not a function and raised a TypeError on
# every space keyup; it now calls event.stopPropagation().
app.clientside_callback(
    """
    function(id) {
        document.addEventListener("keyup", function(event) {
            if (event.key == ' ') {
                document.getElementById('start-button').click()
                event.stopPropagation()
            }
        });
        return window.dash_clientside.no_update
    }
    """,
    Output("start-button", "id"),
    Input("start-button", "id")
)
# delete ticker button callback
@callback(Output('del_sig', 'data'),
          Input("remove-button", "n_clicks"),
          State("symbols_dropdown_list", "value")
          )
def remove_ticker(n, sym_raw : str):
    """Delete the selected symbol from the current user's watchlist.

    Args:
        n: click count of the Remove button (0 / None before any click)
        sym_raw: dropdown value such as "AAPL - Apple Inc."; None when
            nothing is selected

    Returns:
        The click count, stored in 'del_sig' to signal that a reload is
        needed, or no_update when there was nothing to delete.
    """
    # The dropdown value is None until a symbol is chosen, and could be
    # blank; neither case has a ticker to remove.
    parts = sym_raw.split() if n and sym_raw else []
    if not parts:
        return no_update
    # The ticker is the first whitespace-separated token of the label.
    remove_from_db(auth.username, parts[0].upper())
    return n
# ticker input callback
@callback(Output('signaling', component_property='data'),
          Input('input-ticker', component_property='value'))
def update_tickers(ticker : str):
    """Add a typed ticker to the current user's watchlist when it is known.

    The upper-cased ticker is looked up as a stock first and, failing that,
    as a mutual fund. On a hit it is inserted and returned (stored in
    'signaling'); otherwise no_update is returned.
    """
    if not ticker:
        return no_update

    symbol = ticker.upper()

    company = StockMapper().ticker_to_company_name.get(symbol)
    if company:
        insert_into_db(auth.username, symbol, company)
        return symbol

    # Not a stock: fall back to the mutual fund mapping (ticker -> series id).
    series_id = MutualFundMapper().ticker_to_series_id.get(symbol)
    if not series_id:
        return no_update
    insert_into_db(auth.username, symbol, 'Mutual Fund - ' + series_id)
    return symbol
# start / stop button callback
@callback(Output('interval-component', 'disabled'),
          Output("start-button", "children"),
          Output('symbols_dropdown_list', 'disabled'),
          Input("start-button", "n_clicks"),
          State("start-button", "children"),
          )
def start_cycle(n, value):
    """Toggle auto play when the start button is clicked.

    Args:
        n: click count of the button
        value: current caption of the button

    Returns:
        (interval disabled, new caption, dropdown disabled), or no_update
        when the button has not been clicked yet.
    """
    if not n:
        return no_update
    currently_playing = value == "Pause"
    # Playing -> stop the timer and unlock the dropdown; otherwise the reverse.
    return (True, "Auto Play", False) if currently_playing else (False, "Pause", True)
# clear input
@callback(Output('input-ticker', component_property='value'),
          Input("signaling", "data"),
          )
def clear_input(d):
    """Empty the ticker box whenever the 'signaling' store changes.

    The store content ``d`` is not inspected.
    """
    return ""
# reload button callback
@callback(Output('symbols_dropdown_list', 'options'),
          Output('interval-component', 'n_intervals'),
          Output('interval-component', 'max_intervals'),
          Input("reload-button", "n_clicks"),
          Input("signaling", "data"),
          Input("del_sig", "data"),
          )
def reload_syms(n, s, d):
    """Rebuild the dropdown options from the current user's watchlist.

    The three inputs act only as triggers; their values are not examined.

    Returns:
        list of "<column 0> - <column 1>" labels from the watchlist,
        0 to restart the interval counter,
        twice the number of labels as the new interval limit.
    """
    watchlist = get_watchlist(auth.username)
    first_col = watchlist.iloc[:, 0]
    second_col = watchlist.iloc[:, 1]
    labels = (first_col + " - " + second_col).tolist()
    return labels, 0, 2 * len(labels)
# interval callback
@callback(Output('symbols_dropdown_list', 'value'),
          Input("signaling", "data"),
          Input("interval-component", "n_intervals"),
          State('symbols_dropdown_list', 'options'),
          )
def cycle_syms(tick_input, n, syms):
    """Choose the dropdown entry to display.

    On a timer tick the entry at position ``n`` (wrapping around) is picked.
    When a ticker was just entered, the first entry whose label starts with
    "<ticker> -" is picked. In every other case no_update is returned.
    """
    if not syms:
        return no_update

    source = ctx.triggered_id
    if source == "interval-component":
        return syms[n % len(syms)]

    if source == "signaling":
        wanted = tick_input + ' -'
        width = len(wanted)
        for label in syms:
            if label[:width] == wanted:
                return label

    return no_update
# dropdown callback
@callback(
    Output(component_id='controls-and-graph', component_property='figure'),
    Input(component_id='symbols_dropdown_list', component_property='value'),
)
def update_graph(col_chosen):
    """Build the three-row chart for the symbol selected in the dropdown.

    Row 1 shows price with its overlays on a log-scaled secondary axis and
    volume bars on the primary axis, row 2 the MACD lines with the histogram
    as filled areas, row 3 RSI and volume RSI.

    Args:
        col_chosen: dropdown value such as "AAPL - Apple Inc."; the ticker is
            its first whitespace-separated token.

    Returns:
        The plotly figure titled "<ticker> - <detected indicators>".

    Raises:
        PreventUpdate: when nothing (or only whitespace) is selected.
    """
    parts = col_chosen.split() if col_chosen else []
    if not parts:
        raise PreventUpdate
    sym = parts[0]

    # Anchor the window to midnight. fetch_stk_data is memoized on its
    # arguments, and a start date carrying the current time of day would be
    # different on every call, so the cache would never be hit.
    end_date = dt.datetime.combine(dt.date.today(), dt.time.min)
    start_date = end_date - dt.timedelta(days=round(365 * LB_YEAR))
    # First calendar date to draw. The rows are indexed by date, so trimming
    # is done by date rather than by dropping a calendar-day count of rows.
    plot_start = end_date - dt.timedelta(days=round(365 * PLT_YEAR))

    tmp = fetch_stk_data(sym, start_date)  # cached function call
    data, vol, macd, rsi, plot_ind = intelligent_loop_plots(sym, tmp)
    data, vol, macd, rsi = (
        frame.loc[frame.index >= plot_start] for frame in (data, vol, macd, rsi)
    )

    fig = make_subplots(
        rows=3,
        cols=1,
        shared_xaxes=True,
        row_heights=[0.6, 0.2, 0.2],
        vertical_spacing=0.02,
        specs=[[{"secondary_y": True}], [{"secondary_y": False}], [{"secondary_y": False}]],
    )

    price_line = px.line(
        data,
        x=data.index,
        y=data.columns.to_list(),
    )
    volume_line = px.bar(
        vol,
        x=vol.index,
        y=vol.columns.to_list(),
    )
    macd_line = px.line(
        macd,
        x=macd.index,
        y=['_MACD', '_SIG'],
    )

    # Split the histogram into its positive and negative parts so each can be
    # filled towards zero in its own colour.
    hist_pos = macd[['_HIST']].clip(lower=0)
    hist_neg = macd[['_HIST']].clip(upper=0)
    macd_hist_pos = px.line(
        hist_pos,
        x=hist_pos.index,
        y=['_HIST'],
    )
    macd_hist_pos.update_traces(fill='tozeroy', line_color='rgba(0,100,0,0.5)', showlegend=False)
    macd_hist_neg = px.line(
        hist_neg,
        x=hist_neg.index,
        y=['_HIST'],
    )
    macd_hist_neg.update_traces(fill='tozeroy', line_color='rgba(100,0,0,0.5)', showlegend=False)

    rsi_line = px.line(
        rsi,
        x=rsi.index,
        y=['_RSI', '_VoRSI'],
    )

    # Price traces go on the secondary axis, volume bars on the primary one;
    # the flags are derived from the trace counts instead of being hard-coded.
    secondary_flags = [True] * len(price_line.data) + [False] * len(volume_line.data)
    fig.add_traces(price_line.data + volume_line.data, rows=1, cols=1, secondary_ys=secondary_flags)
    fig.add_traces(macd_line.data + macd_hist_pos.data + macd_hist_neg.data, rows=2, cols=1)
    fig.add_traces(rsi_line.data, rows=3, cols=1)

    fig.layout.yaxis.title = "Volume"
    fig.layout.yaxis2.title = "Price ($)"
    fig.layout.yaxis2.type = "log"
    fig.layout.yaxis3.title = "MACD"
    fig.layout.yaxis4.title = "RSI/VoRSI"
    fig.layout.yaxis.griddash = "dash"
    fig.update_layout(
        title_text=sym + ' - ' + plot_ind,
        title_x=0.5,
        margin=dict(l=30, r=20, t=50, b=20),
    )
    return fig
if __name__ == "__main__":
    # Serve through Waitress, a production quality WSGI server, on all
    # interfaces at port 8050 with seven worker threads.
    serve(
        app.server,
        host="0.0.0.0",
        port=8050,
        threads=7,
    )
    # app.run(debug=True)