import numpy as np import matplotlib.pyplot as plt from api import fetch_chart_data_yahoo, pull_last_from_file import warnings from datetime import timedelta, datetime import concurrent.futures def main(): # take the fft of sin(t) for t in [0, 2*pi] dt = 1 t = np.arange(0, 100, dt) y = np.sin(t) y += np.min(y) y_fft = np.fft.fftshift((np.fft.fft(y))) freqs = ( 1 / dt ) * np.linspace(-1/2, 1/2, len(y)) # graph the original and the fft plt.subplot(2, 1, 1) plt.plot(t, y) plt.title("Original Signal") plt.subplot(2, 1, 2) plt.plot(freqs, np.abs(y_fft)) plt.title("FFT of Signal") plt.show() # introduce new variable gamma to sweep over range of convergence gamma = np.linspace(-.05, .25, 100) for g in gamma: y_damped = y * np.exp(-g * t) y_fft_damped = np.fft.fftshift((np.fft.fft(y_damped))) freqs = (1 / dt) * np.linspace(-1/2, 1/2, len(y_damped)) #plt.plot(freqs, np.abs(y_fft_damped), label=f"Gamma: {g:.2f}") # print what the limit approaches print(f"Limit at gamma={g:.2f}, x -> infinity: {np.real(y_fft_damped[-1])}") # make a numpy function accordig to data timeseries data = fetch_chart_data_yahoo('AAPL') print(data.keys()) y_test = np.array(data['prices']) y_test += np.min(y_test) t_test = np.arange(0, len(data['prices']), 1) print(y_test.shape, t_test.shape) gamma = find_gamma_where_area_changes_signs(y_test, t_test) print(gamma) # Example: Add more S&P 500 stocks to the list deltatime= timedelta(days=8) interval = '1m' stock_list = [ 'AAPL', 'MSFT', 'GOOGL', 'AMZN', 'NVDA', 'TSLA', 'NFLX', 'PLTR', 'META', 'JPM', 'V', 'UNH', 'HD', 'MA', 'PG', 'LLY', 'AVGO', 'XOM', 'COST', 'MRK', 'ABBV', 'PEP', 'CVX', 'ADBE', 'WMT', 'CRM', 'ACN', 'MCD', 'DHR', 'AMD', 'TXN', 'LIN', 'NEE', 'UNP', 'HON', 'AMAT', 'LOW', 'QCOM', 'INTC', 'TMO', 'COP', 'BKNG', 'SPGI', 'GS', 'ISRG', 'NOW', 'BLK', 'AXP', 'DE', 'CAT', 'LMT', 'MDT', 'SYK', 'C', 'AMGN', 'ELV', 'SCHW', 'CB', 'PGR', 'VRTX', 'REGN', 'CI', 'ADP', 'GILD', 'MO', 'SO', 'DUK', 'MMC', 'TGT', 'FISV', 'BSX', 'PNC', 'BDX', 'ITW', 'NSC', 'CME', 'AON', 'ETN', 'ECL', 'EMR', 'AIG', 'HCA', 'PSA', 'APD', 'ORLY', 'SHW', 'SRE', 'MCO', 'ROST', 'KMB', 'WELL', 'TRV', 'STZ', 'PAYX', 'VLO', 'WMB', 'MTD', 'F', 'GM' ] def gamma_worker(ticker): data = fetch_chart_data_yahoo(ticker, interval, period_length=deltatime) # print the first and last date and price of this ticker start_date = datetime.fromtimestamp(data['timestamps'][0]).strftime('%Y-%m-%d %H:%M:%S') end_date = datetime.fromtimestamp(data['timestamps'][-1]).strftime('%Y-%m-%d %H:%M:%S') print(f"{ticker}: {start_date} {data['prices'][0]} {end_date} {data['prices'][-1]}") # normalize the data using min-max scaling min_price = np.min(data['prices']) max_price = np.max(data['prices']) normalized_prices = (data['prices'] - min_price) / (max_price - min_price) if max_price > min_price else data['prices'] # for now, set normalized prices to data['prices'] normalized_prices = data['prices'] return ticker, find_gamma_where_area_changes_signs(normalized_prices, np.arange(0, len(normalized_prices), 1)), (data['prices'][-1] - data['prices'][0]) / data['prices'][0] with concurrent.futures.ThreadPoolExecutor() as executor: results = list(executor.map(gamma_worker, stock_list)) keys = [r[0] for r in results] value = [r[1] for r in results] percent_change = [r[2] for r in results] gamma_map = dict(zip(keys, value)) percent_change_map = dict(zip(keys, percent_change)) # gamma_map = {} # print(results) # for ticker in stock_list: # print(ticker) # data = fetch_chart_data_yahoo(ticker) # gamma_map[ticker] = find_gamma_where_area_changes_signs(data['prices'], np.arange(0, len(data['prices']), 1)) spy_data = fetch_chart_data_yahoo('SPY', interval, period_length=deltatime) gamma_spy = find_gamma_where_area_changes_signs(spy_data['prices'], np.arange(0, len(spy_data['prices']), 1)) distribution_std = np.std([gamma_map[ticker] for ticker in stock_list]) # make a normal distrubiton where gamma spy is the mean normal_dist = np.random.normal(loc=gamma_spy, scale=distribution_std, size=500) # for each stock in the gamma map, determine a p-value to see who is under and over performing p_values = {} for ticker in stock_list: p_values[ticker] = np.sum(normal_dist < gamma_map[ticker]) / len(normal_dist) # pretty print on each line the pvalues for ticker in stock_list: print(f"{ticker}: {p_values[ticker]}") # print the statistically significant stocks with an alpha of 0.05 alpha = 0.10 print(f"Statistically significant stocks (alpha={alpha}):") for ticker in stock_list: if p_values[ticker] < alpha or p_values[ticker] > 1 - alpha: # determine their percent gain/loss over this time period from the prices percent_change = percent_change_map[ticker] * 100 print(f" - {ticker}: {p_values[ticker]} ({percent_change:.2f}%)") # data = pull_last_from_file() # y = np.array(data['prices']) # t = np.array(data['timestamps']) # y += np.min(y) # print(len(y), len(t), len(y) == len(t)) # # gamma_low = low_where_no_overflow(y, t) # gamma_low = 0.00000001 # F, dF = compute_laplacian_transform(gamma_low, y, t) # print(f"gamma_low: {gamma_low}\n") # print(F) # print(dF) # print(t) # print(np.argwhere(np.abs(F) < .0001)) # print(np.argwhere(np.abs(dF) < 1)) # prev_sign = dF[0] # swings = [] # for i, v in enumerate(dF): # if i == 0: continue # if np.sign(prev_sign) * np.sign(v) < 0: # # print(f'sign_change found in dF: i = {i}, prev_value = {dF[i-1]}, value = {v}, swing = {v - dF[i-1]}') # swings.append(v - dF[i-1]) # prev_sign = np.sign(v) # s_avg = np.average(swings) # s_std = np.std(swings) # s_max = np.max(swings) # s_min = np.min(swings) # s_cnt = len(swings) # print(f'swings: [avg: {s_avg}, std: {s_std}, max: {s_max}, min: {s_min}, count: {s_cnt}]') # t_scores = (swings - s_avg) / s_std # # print(np.sort(t_scores)) # normal_s_dist = np.random.normal(loc=s_avg, scale=s_std, size=1000000) # p_values = [] # for t in t_scores: # p_values.append(np.sum(normal_s_dist < t) / 1000000) # p_values = np.array(p_values) # print("p_values: ", p_values) # print(np.argwhere(np.abs(p_values) < .25)) def compute_laplacian_transform(gamma, y, t): # compute the laplacian transform for a given gamma # check to see if -gamma * t is too large and will throw an overflow y_damped = y * np.exp(-gamma * t) y_fft_damped = np.fft.fftshift((np.fft.fft(y_damped))) area = np.real((y_fft_damped)) # such that s = gamma + iw # dy/dt = s * y(s) - y(0) # is the space of these wavelengths # dt = t[1] - t[0] # i need to determine what the frequencies are here... they seem like they should be on the unit circle # wi = ( 1.j / dt ) * np.linspace(-1, 1, len(y)) s = t * 1.j + gamma # print('s___\n', s) dy_dt = s * y_fft_damped - y_fft_damped[0] da = np.real(dy_dt) return area, da def compute_laplacian_transform_convergence(gamma, y, t): # compute the laplacian transform for a given gamma # check to see if -gamma * t is too large and will throw an overflow y_damped = y * np.exp(-gamma * t) y_fft_damped = np.fft.fftshift((np.fft.fft(y_damped))) area = np.real(y_fft_damped[-1]) return area def low_where_no_overflow(y, t): # setup a raise overflow exepction when warnings.filterwarnings("error", category=RuntimeWarning) low = -7 while True: try: compute_laplacian_transform_convergence(low, y, t) return low except RuntimeWarning: low += .1 def find_gamma_where_area_changes_signs(y, t): # start at gamma = 100 # do a binary search over the gamma values # set high to the max float low = low_where_no_overflow(y, t) low_area = compute_laplacian_transform_convergence(low, y, t) high = 100000000 high_area = compute_laplacian_transform_convergence(high, y, t) found = False # print(f"Log: starting with high={high} and low={low}, high_area={high_area}, low_area={low_area}.") iters = 0 while not found: if iters > 10000: raise Exception("ERROR: find_gamma_where_area_changes_signs did not converge after 10,000 iterations.") iters += 1 mid = low * 0.5 + high * 0.5 # if there is no sign change between high and low return mid with a warning if np.sign(high_area) * np.sign(low_area) > 0: print(f"Warning: No sign change between high={high} and low={low}, high_area={high_area}, low_area={low_area}. Returning mid={mid}.") return mid # compute laplacian transform at this gamma mid_area = compute_laplacian_transform_convergence(mid, y, t) # if there is a sign change between mid and high, then set low to mid if np.sign(high_area) * np.sign(mid_area) <= 0: low = mid low_area = mid_area # if low and area are ,001 apart, then we have found the root if abs(low - high) < .000000001: found = True # print('branch1') continue # if there is a sign change between mid and low, then set high to mid if np.sign(low_area) * np.sign(mid_area) <= 0: high = mid high_area = mid_area # if high and area are one apart, then we have found the root if abs(high - low) < .000000001: found = True # print('branch2') continue # print('branch3', low_area, high_area) # return the gamma value where the area changes sign # print(f"Log: found gamma={low} after {iters} iterations.") return low if __name__ == "__main__": main()