Checking your Stocks with Alphavantage and Python

If you have not yet heard of Alphavantage, it is a stock market API that returns many kinds of stock time series as JSON. You can obtain a key for free, and you can keep using the API as long as you stay within 5 API requests per minute and 500 API requests per day. Let’s learn how to use Alphavantage and Python to build a stock analysis application.

To be honest, this code may seem a bit convoluted because it was cut out of a larger project. That project was decommissioned after I moved to a different API service. At its peak, it was able to forecast daily price movements worth acting on and notify me of them. After my broker started offering a real-time API, I decided to stop using this one.

Getting Started

  1. Obtain a key from AlphaVantage.
  2. Open a new project in PyCharm.

Handling AlphaVantage

First, let's build a stock handler that makes it easy to call AlphaVantage and handle the JSON data.

StockHandler.py

import numpy as np
import json
import requests


key = 'your key here'
#Alphavantage specific variables
interval = {1:'1min', 5:'5min', 15:'15min', 30:'30min', 60:'60min', 'daily':'Daily', 'month': 'Monthly Time Series'} #min allowed: 1, 5, 15, 30, 60 > Daily: 'Daily'
function = {'intra' :'TIME_SERIES_INTRADAY', 'daily': 'TIME_SERIES_DAILY', 'month': 'TIME_SERIES_MONTHLY'}
i_type = {'intra': 0, 'none': 1}
#END Alphavantage specific variables

# A simple function that allows for easier transition between AlphaVantage's "intraday" 1,5,15,30,60 minute time series,
# and their "Daily" series
def alphavantage_time_series_url(function, ticker, interval, i_type):

    url = 'https://www.alphavantage.co/query?function=' + function

    switcher = {
        0: url + '&symbol=' + ticker + '&interval=' + interval + '&outputsize=full&apikey=' + key,
        1: url + '&symbol=' + ticker + '&apikey=' + key,
        2: url + '&symbol=' + ticker + '&interval=1min&apikey=' + key
    }
    return switcher.get(i_type, "Invalid type")

def load_data(function, ticker, interval, i_type):
    url = alphavantage_time_series_url(function, ticker, interval, i_type)
    # All query parameters are already part of the URL.
    r = requests.get(url=url)
    # r.json() returns the parsed response as a dict.
    json_obj = r.json()

    # Monthly and VWAP responses use the interval string itself as the top-level key;
    # intraday and daily responses wrap it as "Time Series (<interval>)".
    try:
        if interval == 'Monthly Time Series' or interval == 'Technical Analysis: VWAP':
            dataset = json_obj[interval]
        else:
            dataset = json_obj["Time Series (" + interval + ")"]
    except KeyError:
        # The expected key is missing, e.g. when the API returns a message instead of data.
        return None, None

    x = []
    ox = []
    y = []
    z = []
    count = 0
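    for timestamp in dataset:
        x.append(count)
        count += 1
        ox.append(timestamp)
        entry = dataset[timestamp]
        try:
            # Price series: use the midpoint of the high and the low, plus the volume.
            y.append(average([float(entry["2. high"]), float(entry["3. low"])]))
            z.append(float(entry["5. volume"]))
        except KeyError:
            # VWAP series carry a single "VWAP" value per timestamp and no volume.
            y.append(float(entry["VWAP"]))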

    if len(y) % 2 != 0:
        y.pop(0)
        x.pop(0)
        try:
            z.pop(0)
        except:
            z = None

    #deviation = Reverse(cut_mean_from_elements(y))
    #zdev = Reverse(cut_mean_from_elements(z))

    deviation = Reverse(y)
    try:
        zdev = Reverse(z)
        np_z = np.array(zdev).reshape(len(z), 1)
    except:
        np_z = None

    np_x = np.array(x).reshape(len(x), 1)
    np_y = np.array(deviation).reshape(len(y), 1)

    #print(ox)
    return {'x': np_x, 'y': np_y, 'z': np_z}, {'ox': Reverse(ox), 'y': y}

def get_quotes(symbols):
    url = 'https://www.alphavantage.co/query?function=BATCH_STOCK_QUOTES&apikey=' + key + '&symbols=' + symbols
    r = requests.get(url=url)
    print(r)
    # r.json() returns the parsed response as a dict; the quotes live under "Stock Quotes".
    json_obj = r.json()

    dataset = json_obj["Stock Quotes"]

    return dataset

def Reverse(lst):
    return [ele for ele in reversed(lst)]


def average(set):
    count = len(set)
    total = 0
    for part in set:
        total += part

    return total / count


def cut_mean_from_elements(set):
    mean = average(set)
    deviation = []
    for element in set:
        deviation.append(element - mean)

    return deviation

Please put your key in the ‘key’ variable.
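
To make the parsing in load_data concrete, here is a minimal sketch that runs the same midpoint-and-volume extraction on a hand-written payload instead of a live response. The payload values and the function name parse_series are made up for illustration; only the key names ("Time Series (Daily)", "2. high", "3. low", "5. volume") are taken from the handler above.

```python
# Hypothetical payload in the shape load_data expects; the numbers are invented.
sample = {
    "Time Series (Daily)": {
        "2020-01-03": {"2. high": "12.0", "3. low": "10.0", "5. volume": "1500"},
        "2020-01-02": {"2. high": "11.0", "3. low": "9.0", "5. volume": "1200"},
    }
}


def parse_series(payload, interval="Daily"):
    """Return (dates, midpoints, volumes), oldest entry first, or None if the key is missing."""
    dataset = payload.get("Time Series (" + interval + ")")
    if dataset is None:
        return None
    dates, mids, volumes = [], [], []
    # Sorting ISO date strings puts them in chronological order.
    for stamp in sorted(dataset):
        entry = dataset[stamp]
        high, low = float(entry["2. high"]), float(entry["3. low"])
        dates.append(stamp)
        mids.append((high + low) / 2)
        volumes.append(float(entry["5. volume"]))
    return dates, mids, volumes


dates, mids, volumes = parse_series(sample)
print(dates, mids, volumes)
```

The handler calls Reverse for the same ordering purpose; sorting the date strings is simply an alternative that does not depend on the order of the payload.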

Building Analyzers

Once we get the data we need to do something with it. Here I have built three different files.

DataManipulate.py

A small helper to use wherever it is needed. It splits a list into consecutive chunks of at most n items, which the services below use to cut a series into subsets and to batch tickers.

def pieces(l, n):
    n = max(1, n)
    return (l[i:i + n] for i in range(0, len(l), n))
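
As a quick illustration of what pieces produces, the sketch below repeats the helper and applies it to a short, arbitrary list of tickers. The batch size of 2 is chosen only to keep the output small.

```python
def pieces(l, n):
    n = max(1, n)
    return (l[i:i + n] for i in range(0, len(l), n))


tickers = ["AAPL", "XOM", "MSFT", "IBM", "CVX"]

# pieces returns a generator, so wrap it in list() to index or count the chunks.
batches = list(pieces(tickers, 2))
print(batches)

# Each batch can be joined into the comma-separated string a quote request takes.
print([",".join(batch) for batch in batches])
```

The last chunk is shorter when the list length is not a multiple of n, and max(1, n) guards against a chunk size of zero.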

LinearRegression.py

The simplest stock visualization is a trend line. We will use linear regression to get one.

import numpy as np
from sklearn.linear_model import LinearRegression
import Plotting.PlotFunctions as plt
from sklearn.svm import SVR
from sklearn.model_selection import GridSearchCV
import sklearn.metrics as metrics

class LR:

    model = None
    vmodel = None
    x = None
    y = None
    z = None
    accuracies = []


    def __init__(self, data, t):
        self.x = np.array(data['x']).reshape((-1, 1))
        self.y = np.array(data['y'])
        self.z = np.array(data['z'])
        self.ticker = t

        self.model = LinearRegression()
        self.vmodel = LinearRegression()
        self.model.fit(self.x, self.y)
        try:
            self.vmodel.fit(self.x, self.z)
        except:
            self.vmodel = None

    def get_trend(self):
        return self.model.coef_, self.model.intercept_


    def predict(self, x):
        return self.model.predict(x)

    def pieces(self, l, n):
        n = max(1, n)
        return (l[i:i + n] for i in range(0, len(l), n))

    def regressorOp(self, x, y):
        """
        This will optimize the parameters for the algo
        """
        regr_rbf = SVR(kernel="rbf", verbose=True, cache_size=1024)
        C = [10000, 6000, 5000, 1000, 10, 1]
        gamma = [0.005, 0.004, 0.003, 0.002, 0.001, 1, 2, 3, 4]
        epsilon = [0.1, 0.01, 0.05, 1]
        parameters = {"C": C, "gamma": gamma, "epsilon": epsilon}

        gs = GridSearchCV(regr_rbf, parameters, scoring="neg_mean_squared_error", cv=2)
        gs.fit(x, y)

        #print("Best Estimator:\n", gs.best_estimator_)
        #print("Type: ", type(gs.best_estimator_))
        return gs.best_estimator_

    def view_trend_for_subset(self, sets):
        # Data is deviation, and originalData is true timeseries
        # (data.x = 0,1,2,...,n | originalData.x = 01-01-2020, ..., 02-01-2020, ..., n
        xPieces = self.pieces(self.x, int(len(self.x) / sets))
        yPieces = self.pieces(self.y, int(len(self.y) / sets))
        try:
            zPieces = self.pieces(self.z, int(len(self.z) / sets))
        except:
            zPieces = []
        xPieces = list(xPieces)
        yPieces = list(yPieces)
        zPieces = list(zPieces)

        xParts = []
        yParts = []
        zParts = []
        mParts = []
        bParts = []
        predictions = []
        # print(xPieces[3], yPieces[3], "\n",  xPieces[4], yPieces[4])
        for sub in range(sets):
            # zPieces is empty when no volume data is available.
            z = zPieces[sub] if sub < len(zPieces) else None
            subdata = {'x': xPieces[sub], 'y': yPieces[sub], 'z': z}
            #print(subdata)
            self.model = LinearRegression()
            self.model.fit(subdata['x'], subdata['y'])
            #self.vmodel = SVR(kernel='poly', gamma='auto')
            #self.vmodel.fit(subdata['x'], subdata['z'].ravel())

            # Test for None explicitly; comparing a NumPy array to a sentinel value
            # gives an elementwise result that cannot be used directly in an if.
            if z is not None:
                clf = self.regressorOp(subdata['x'], subdata['z'].ravel())
                clf.fit(subdata['x'], subdata['z'].ravel())

                y_pred = clf.predict(subdata['x'])

                accuracy = clf.score(subdata['x'], subdata['z'].ravel())
                variance = metrics.explained_variance_score(subdata['z'], y_pred)
                self.accuracies.append(accuracy)
                prediction = (clf.predict(subdata['x']))
                predictions.append(prediction)

            #print("prediction: ", prediction)

            xParts.append(subdata['x'])
            yParts.append(subdata['y'])
            zParts.append(subdata['z'])
            mParts.append(self.model.coef_)
            bParts.append(self.model.intercept_)

            #plt.plot_all(subdata['x'], {1: subdata['y'], 2: subdata['z']}, self.model.coef_, self.model.intercept_, prediction, self.ticker, 0)

            #print("PARAMS: ", clf.get_params(True))
        return mParts, bParts, xParts, yParts, zParts, predictions
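
For intuition about the slope and intercept that get_trend returns, a least-squares line can also be computed by hand. The sketch below uses the closed-form formulas in plain Python rather than scikit-learn, so it is an illustration of the idea and not the class above; the function name fit_line and the price values are invented.

```python
def fit_line(xs, ys):
    """Ordinary least squares for y = m*x + b, returned as (m, b)."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Covariance of x and y, and variance of x (both without the 1/n factor).
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    m = sxy / sxx
    b = mean_y - m * mean_x
    return m, b


# Invented prices that rise by roughly 0.5 per step.
xs = [0, 1, 2, 3, 4, 5]
ys = [10.0, 10.4, 11.1, 11.5, 12.0, 12.4]
m, b = fit_line(xs, ys)
print("slope:", round(m, 3), "intercept:", round(b, 3))
```

A positive slope means the series trends upward over the window, which is the quantity the watch list later compares against a minimum slope.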

GradientDescent.py

Another useful, slightly more complex approach is to fit the same kind of straight-line trend iteratively with gradient descent.

import numpy as np
import scipy.linalg

def gradient_descent(X, Y, w, c, it, L):
    n = len(X)
    Y_pred = w * X + c  # The current predicted value of Y
    D_m = (-2 / n) * sum(X * (Y - Y_pred))  # Derivative wrt m
    D_c = (-2 / n) * sum(Y - Y_pred)  # Derivative wrt c
    m = w - L * D_m  # Update m
    c = c - L * D_c  # Update c

    return m, c


def batch_gradient_descent(x, y, w, eta):
    derivative = np.sum([-(y[d]-np.dot(w.T.copy(),x[d,:]))*(x[d,:]).reshape(np.shape(w)) for d in range(len(x))],axis=0)
    return eta*(1/len(x))*derivative


def mini_batch_gradient_descent(x, y, w, eta, batch):
    gradient_sum = np.zeros(shape=np.shape(w))
    for b in range(batch):
        choice = np.random.choice(list(range(len(x))))
        gradient_sum += -(y[choice]-np.dot(w.T,x[choice,:]))*x[choice,:].reshape(np.shape(w))
    # Return only after the whole batch has been accumulated.
    return eta*(1/batch)*gradient_sum


# initialize variables
def trend_for(data, it, L, m, b):
    x = data['x']
    y = data['y']
    w = np.random.normal(size=(np.shape(x)[1], 1))
    c = 0.0
    # Update w
    w_s = []
    Error = []

    for i in range(it):
        # Calculate error
        error = (1 / 2) * np.sum([(y[i] - (b + np.dot(w.T, x[i, :]))) ** 2 for i in range(len(x))])
        #print(error)
        Error.append(error)
        if error == float('inf'):
            print('INFINITY')
            return x, y, m, b, Error

        m, b = gradient_descent(x, y, w, c, it, L)
        w = m
        c = b

        w_s.append(w.copy())

    return x, y, m, b, Error
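
The update rule in gradient_descent can be tried in isolation. The following is a minimal sketch in plain Python, assuming a one-dimensional series and the same mean squared error derivatives as above; the names gradient_step and fit, the data, and the two learning rates are chosen only for illustration.

```python
def gradient_step(xs, ys, m, c, rate):
    """One gradient descent update for the mean squared error of y = m*x + c."""
    n = len(xs)
    residuals = [y - (m * x + c) for x, y in zip(xs, ys)]
    d_m = (-2 / n) * sum(x * r for x, r in zip(xs, residuals))  # derivative wrt m
    d_c = (-2 / n) * sum(residuals)                              # derivative wrt c
    return m - rate * d_m, c - rate * d_c


def fit(xs, ys, rate, iterations):
    """Start from m = c = 0 and apply the update a fixed number of times."""
    m, c = 0.0, 0.0
    for _ in range(iterations):
        m, c = gradient_step(xs, ys, m, c, rate)
    return m, c


xs = [0, 1, 2, 3, 4]
ys = [1, 3, 5, 7, 9]  # exactly y = 2x + 1

m, c = fit(xs, ys, rate=0.05, iterations=2000)
print("small rate:", round(m, 4), round(c, 4))

# A rate that is too large overshoots on every step and the estimate grows without bound.
m_bad, c_bad = fit(xs, ys, rate=0.5, iterations=50)
print("large rate:", m_bad, c_bad)
```

The second call shows why trend_for checks for an infinite error: with x values that count up from 0, the largest usable learning rate shrinks as the series gets longer.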


Let's Plot Something

We can fetch all of this data and build trends for it, but now we need to visualize it. Let's plot it.

PlotFunctions.py

import numpy as np
import matplotlib.pyplot as plt


def plot_function(x, y, w,b, Error, st, iterations):
    if len(Error) == iterations:
        fig, ax = plt.subplots(nrows=1,ncols=2,figsize=(40,10))
        ax[0].plot(x, ((x * w) + b), c='lightgreen', linewidth=3, zorder=0)
        ax[0].plot(x, y)
        ax[0].set_title(st)
        ax[1].scatter(range(iterations), Error)
        ax[1].set_title('Error')
        plt.show()
    else:
        print('Plotting Error: Iterations and Errors are different sizes')

def plot_all(x, set, w,b, vw, st, iterations):

    fig, ax = plt.subplots(nrows=1,ncols=2,figsize=(40,10))
    # Evaluate the trend line w*x + b at every x and shape it as a column to match x.
    wb = [((xi * w) + b) for xi in x]
    wb = np.array(wb).reshape(len(x), 1)
    print(wb.shape, " >> ", x.shape)
    ax[0].plot(x, wb, c='lightgreen', linewidth=3, zorder=0)
    ax[0].plot(x, set[1])
    ax[0].set_title(st)
    try:
        ax[1].plot(x, set[2])
        ax[1].set_title('Volume')
        ax[1].plot(x, vw)
    except Exception:
        # Volume or its prediction may be missing; leave the second panel as it is.
        pass
    plt.show()

Building an Alphavantage Service

We have now built a bunch of pieces, but no one wants to wire all of this together every time they want to look at a stock. Let's build a service that lets us call one function and run the entire process in one go.

AlphavantageService.py

import GradientDescent.GD as gd
import Plotting.PlotFunctions as plt
import JsonHandler.StockHandler as sh
import Services.DataManipulate as dm
import LinearRegression.LR as lr

class AlphavantageService:
    data = None
    originalData = None
    ticker = None
    iterations = None
    L = None
    x = None
    y = None
    m = 0.0
    b = 0.0
    Error = None
    DataSet = True

    def __init__(self, t, i, l, stype, mins):
        self.ticker = t
        self.iterations = i
        self.L = l
        if stype == 0:
            self.data, self.originalData = sh.load_data(sh.function['daily'], t, sh.interval['daily'], sh.i_type['none'])
        elif stype == 1:
            try:
                self.data, self.originalData = sh.load_data(sh.function['intra'], t, sh.interval[mins],
                                                        sh.i_type['intra'])
            except:
                self.DataSet = False
        elif stype == 2:
            self.data, self.originalData = sh.load_data(sh.function['month'], t, sh.interval['month'],
                                                        sh.i_type['none'])
        elif stype == 3:
            self.data, self.originalData = sh.load_data('VWAP', t, 'Technical Analysis: VWAP',
                                                        2)

    def get_data(self):
        return self.data

    def set_data_switch(self, val):
        self.DataSet = val

    def originalX(self):
        return self.originalData['ox']


    def view_trend_for(self, l):
        self.L = l
        # Data is deviation, and originalData is true timeseries
        # (data.x = 0,1,2,...,n | originalData.x = 01-01-2020, ..., 02-01-2020, ..., n
        self.x, self.y, self.m, self.b, self.Error = gd.trend_for(self.data, self.iterations, self.L, self.m, self.b)
        return self.m, self.b, self.Error[len(self.Error) - 1] / len(self.x)

    def gradient_descent(self, l, sets, ticker):
        self.L = l
        # Data is deviation, and originalData is true timeseries
        # (data.x = 0,1,2,...,n | originalData.x = 01-01-2020, ..., 02-01-2020, ..., n
        xPieces = dm.pieces(self.data['x'], int(len(self.data['x']) / sets))
        yPieces = dm.pieces(self.data['y'], int(len(self.data['y']) / sets))

        xPieces = list(xPieces)
        yPieces = list(yPieces)
        xParts = []
        yParts = []
        mParts = []
        bParts = []
        # print(xPieces[3], yPieces[3], "\n",  xPieces[4], yPieces[4])
        for sub in range(sets):
            subdata = {'x': xPieces[sub], 'y': yPieces[sub]}
            xPart, yPart, mPart, bPart, ErrorPart = gd.trend_for(subdata, self.iterations, self.L, 0.0, 0.0)
            xParts.append(xPart)
            yParts.append(yPart)
            mParts.append(mPart)
            bParts.append(bPart)

            plt.plot_function(self.originalData['ox'], yPart, mPart, bPart, ErrorPart, self.ticker, self.iterations)

        return mParts, bParts, 0

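    def linear_regression(self, sets):
        # view_trend_for_subset is a method of the LR class, so build an instance first.
        return lr.LR(self.data, self.ticker).view_trend_for_subset(sets)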

    def plot_trend(self, acceptedError, error):
        # Plot the predicted function and the Error
        if error < acceptedError:
            plt.plot_function(self.x, self.y, self.m, self.b, self.Error, self.ticker, self.iterations)

    @staticmethod
    def get_daily_trend_for(ticker, iterations, L):
        data, originalData = sh.load_data(sh.function['daily'], ticker, sh.interval['daily'], sh.i_type['none'])
        # trend_for also expects the starting slope and intercept.
        x, y, m, b, Error = gd.trend_for(data, iterations, L, 0.0, 0.0)
        return m, b, Error[len(Error) - 1]

    @staticmethod
    def get_trendline_for(ticker, iterations, L):
        data, originalData = sh.load_data(sh.function['intra'], ticker, sh.interval[60], sh.i_type['intra'])
        # trend_for also expects the starting slope and intercept.
        x, y, m, b, Error = gd.trend_for(data, iterations, L, 0.0, 0.0)
        return m, b, Error[len(Error) - 1]

Build for Future Enhancements

This is one API. The next article in this series will take this project and tie in another free API service that has different data sets. Let's build a service that we call once and that manages Alphavantage as well as other APIs.

The service we build will have two key functions. It will have a deep analysis function, and it will also build a watch list.

Watch List

If you want to scan through a large group of tickers, and find stocks that fit your criteria, we can build a function to do this.

What this means is we can use a flat file with tickers, and a reader to read the tickers.

tickers.txt

AAPL XOM MSFT BAC^I IBM CVX GE WMT T JNJ PG WFC KO PFE JPM GOOG PM VOD ORCL INTC MRK VZ

Then, something to read the file

FlatDataReader.py

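def read(file):
    tickers = []
    with open(file) as f:
        for line in f:
            # split() handles one ticker per line as well as several separated by spaces,
            # and drops the trailing newline and any blank lines.
            tickers.extend(line.split())

    return tickers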

Our Final Service

StockService.py

import LinearRegression.LR as lr
import Services.AlphavantageService as alphavantage
import numpy as np
from sklearn.linear_model import LinearRegression
import Plotting.PlotFunctions as plt
import JsonHandler.StockHandler as sh
import FlatData.FlatDataReader as fdr
import Services.DataManipulate as dm
import time

#Run: Used to set which tickers you want, some Ai Settings, and what type of dataset you want to work with

#Ai settings
iterations = 20000
L = 0.1
errortolerance = 20
trials = 1000 #set a trial limit
Error = []
#END Ai settings

#OK Now lets loop through and find the best Learning rate
not_found = True


def deep_analysis_of_stock(ticker):

    # cut data into many components
    # First lets Analyze longest trend
    stock_service = alphavantage.AlphavantageService(ticker, 10, 1, 3, 0)
    lineReg = lr.LR(stock_service.get_data(), ticker)
    mParts, bParts, xParts, yParts, zParts, prediction = lineReg.view_trend_for_subset(2)

    for i in range(len(xParts)):
        print(xParts[i].shape, " >> ", yParts[i].shape)
        print('ticker: ', ticker, 'm: ', mParts[i], 'b: ', bParts[i])
        plt.plot_all(xParts[i], {1: yParts[i], 2: zParts[i]},
                     mParts[i], bParts[i], None, ticker, 0)

    return mParts, bParts, xParts, yParts, zParts, prediction


def get_watchlist(min_price, max_price, wait, min_slope):

    tickers = fdr.read("C:\\Path\\To\\tickers.txt")
    ticker_batches = dm.pieces(tickers, 750)
    ticker_batches = list(ticker_batches)
    print('Batches: ', len(ticker_batches))

    in_budget_stocks = []
    for i in ticker_batches:
        print("batchSize: ", len(i))
        batchString = ','.join(i)
        print('batch: ', batchString)

        data = sh.get_quotes(batchString)

        for line in data:
            if max_price > float(line['2. price']) > min_price:
                print("price: ", line['2. price'], "symbol: ", line['1. symbol'])
                in_budget_stocks.append(line)
        time.sleep(wait)  # Throttle API Calls

    stock_data = []
    for stock in in_budget_stocks:
        print('Reviewing Symbol: ', stock['1. symbol'])
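        stock_service = None
        try:
            stock_service = alphavantage.AlphavantageService(stock['1. symbol'], iterations, L, 0, 15)
        except Exception:
            # Construction failed, so there is no service object to update; skip this symbol.
            print('Could not load data for: ', stock['1. symbol'])
            continue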
        if stock_service is None or stock_service.get_data() is None:
            print('No Data Available for: ', stock['1. symbol'])
            continue
        lineReg = lr.LR(stock_service.get_data(), stock['1. symbol'] + " From " + stock_service.originalX()[0] + " TO "
                        + stock_service.originalX()[len(stock_service.originalX()) - 1])

        mParts, bParts, xParts, yParts, zParts, prediction = lineReg.view_trend_for_subset(1)

        analysis = {'ticker': stock['1. symbol'], 'mParts': mParts, 'bParts': bParts, 'xParts': xParts, 'yParts': yParts,
                    'zParts': zParts, 'pz': prediction}

        stock_data.append(analysis)
        time.sleep(wait)

    watch_list = []

    for analysis in stock_data:
        for i in range(len(analysis['xParts'])):
            print('ticker: ', analysis['ticker'], 'm: ', analysis['mParts'][i], 'b: ', analysis['bParts'][i])
            if analysis['mParts'][i] > min_slope:
                watch_list.append(analysis)
                plt.plot_all(analysis['xParts'][i], {1: analysis['yParts'][i], 2: analysis['zParts'][i]},
                             analysis['mParts'][i], analysis['bParts'][i], analysis['pz'][i], analysis['ticker'], 0)


    ##print('WatchList: ', watch_list)
    return watch_list
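
The price filter at the start of get_watchlist can be exercised on its own, without calling the API. This is a minimal sketch with invented quotes; the function name in_budget and the symbols are hypothetical, and only the key names "1. symbol" and "2. price" follow the code above.

```python
def in_budget(quotes, min_price, max_price):
    """Keep quotes whose price lies strictly between min_price and max_price."""
    # Prices arrive as strings, so convert before comparing.
    return [q for q in quotes if min_price < float(q["2. price"]) < max_price]


# Invented quotes in the shape get_watchlist reads.
quotes = [
    {"1. symbol": "AAA", "2. price": "99.50"},
    {"1. symbol": "BBB", "2. price": "102.25"},
    {"1. symbol": "CCC", "2. price": "105.00"},
]

selected = in_budget(quotes, 100, 105)
print([q["1. symbol"] for q in selected])
```

Both bounds are exclusive, matching the chained comparison in get_watchlist, so a stock priced exactly at the maximum is left out.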

And Finally, Use your Stock Service

Run.py

import StockService as ss

wl = ss.get_watchlist(100, 105, 15, 0.1)
print(wl)
# Now, once we get our watch list, we can do further analysis.
for analysis in wl:
    print('Deep Analysis', ss.deep_analysis_of_stock(analysis['ticker']))
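
The wait of 15 seconds passed to get_watchlist above can be checked against the limits quoted in the introduction (5 requests per minute, 500 per day). The sketch below is only back-of-the-envelope arithmetic; the helper names are hypothetical, and the call count assumes the worst case in which every ticker passes the price filter.

```python
import math


def min_wait_seconds(requests_per_minute):
    """Smallest pause between calls that stays within a per-minute limit."""
    return 60 / requests_per_minute


def calls_needed(symbol_count, batch_size):
    """One quote call per batch plus, at worst, one time-series call per symbol."""
    return math.ceil(symbol_count / batch_size) + symbol_count


wait = min_wait_seconds(5)
total = calls_needed(22, 750)  # 22 tickers in tickers.txt, batches of 750

print("minimum wait between calls:", wait, "seconds")
print("worst-case calls for one watch list run:", total)
print("fits in the daily limit:", total <= 500)
```

Any wait at or above the computed minimum keeps a single-threaded run within the per-minute limit; the daily limit is what caps how many tickers one run can review.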

Conclusion

This code is intended to show how to use AlphaVantage and Python to begin building a stock analysis tool. The tool will almost surely not help you make decisions about your stock picks.

It will, however, give you a great head start on building a stock service using AlphaVantage and Python.
