Skip to content
64 changes: 63 additions & 1 deletion alpaca_backtrader_api/alpacabroker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
unicode_literals)

import collections
from curses import has_key
import logging

from backtrader import BrokerBase, Order, BuyOrder, SellOrder
from backtrader.order import Order
from backtrader.utils.py3 import with_metaclass, iteritems
from backtrader.comminfo import CommInfoBase
from backtrader.position import Position
Expand Down Expand Up @@ -55,7 +58,7 @@ class AlpacaBroker(with_metaclass(MetaAlpacaBroker, BrokerBase)):

def __init__(self, **kwargs):
super(AlpacaBroker, self).__init__()

self.logger = logging.getLogger(self.__class__.__name__)
self.o = alpacastore.AlpacaStore(**kwargs)

self.orders = collections.OrderedDict() # orders by order id
Expand Down Expand Up @@ -102,6 +105,32 @@ def start(self):
self.startingvalue = self.value = self.o.get_value()
self.positions = self.update_positions()


_ORDEREXECS = {
'market': Order.Market,
'limit': Order.Limit,
'stop': Order.Stop,
'stop_limit': Order.StopLimit,
'trailing_stop': Order.StopTrail
}

_ORDERSTATUS = {
'new': Order.Created,
'accepted': Order.Accepted,
'accepted_for_bidding': Order.Accepted,
'canceled': Order.Canceled,
'expired': Order.Expired,
'filled': Order.Completed,
'partially_filled': Order.Partial,
'pending_cancel': Order.Partial,
'pending_replace': Order.Partial,
'rejected': Order.Rejected,
'suspended': Order.Rejected,
'stopped': Order.Completed,
'calculated': Order.Partial
}


def data_started(self, data):
pos = self.getposition(data)

Expand Down Expand Up @@ -137,6 +166,37 @@ def data_started(self, data):
order.completed()
self.notify(order)

alpaca_orders = self.o.get_orders()
alpaca_orders = {o.symbol: o for o in alpaca_orders}
o = alpaca_orders.get(data._name, None)
if o is not None:
self.logger.debug(f"Got open order: {o}")
exectype = self._ORDEREXECS[o.order_type]
status = self._ORDERSTATUS[o.status]
price = o.stop_price if o.stop_price is not None else o.limit_price
if o.side == "buy":
order = BuyOrder(data=data,
size = float(o.qty) if o.qty is not None else o.qty,
price = float(price) if price is not None else None,
exectype=exectype,
simulated=True
)
if o.side == "sell":
order = SellOrder(data=data,
size = float(o.qty) if o.qty is not None else o.qty,
price = float(price) if price is not None else None,
exectype=exectype,
simulated=True
)
order.status = status
order.tradeid = o.id
# icky, this is leaky, can we use the "create order" function with a "fake" order?
self.o._orders[order.ref] = o.id
self.o._ordersrev[o.id] = order.ref # maps ids to backtrader order
self.orders[order.ref] = order
self.notify(order)


def stop(self):
super(AlpacaBroker, self).stop()
self.o.stop()
Expand Down Expand Up @@ -324,8 +384,10 @@ def sell(self, owner, data,

def cancel(self, order):
if not self.orders.get(order.ref, False):
self.logger.warning(f"Cannot cancel unknown order: {order.ref}")
return
if order.status == Order.Cancelled: # already cancelled
self.logger.warning(f"Order {order.ref} already canceled!")
return

return self.o.order_cancel(order)
Expand Down
9 changes: 8 additions & 1 deletion alpaca_backtrader_api/alpacadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
unicode_literals)

from datetime import timedelta
import logging
import pandas as pd
from backtrader.feed import DataBase
from backtrader import date2num, num2date
Expand Down Expand Up @@ -159,6 +160,7 @@ def __init__(self, **kwargs):
self._candleFormat = 'bidask' if self.p.bidask else 'midpoint'
self._timeframe = self.p.timeframe
self.do_qcheck(True, 0)
self.logger = logging.getLogger(self.__class__.__name__)
if self._timeframe not in [bt.TimeFrame.Ticks,
bt.TimeFrame.Minutes,
bt.TimeFrame.Days]:
Expand All @@ -179,6 +181,7 @@ def start(self):
contractdetails if it exists
"""
super(AlpacaData, self).start()
self.logger.info("Starting data feed: %s" % self.p.dataname)

# Create attributes as soon as possible
self._statelivereconn = False # if reconnecting in live state
Expand Down Expand Up @@ -268,9 +271,10 @@ def _load(self):
if self._state == self._ST_LIVE:
try:
msg = (self._storedmsg.pop(None, None) or
self.qlive.get(timeout=self._qcheck))
self.qlive.get(timeout=self.p.qcheck))
except queue.Empty:
return None # indicate timeout situation

if msg is None: # Conn broken during historical/backfilling
self.put_notification(self.CONNBROKEN)
# Try to reconnect
Expand Down Expand Up @@ -404,6 +408,7 @@ def _load(self):

def _load_tick(self, msg):
dtobj = pd.Timestamp(msg['time'], unit='ns')
self.logger.debug("Loading tick at: %s %s" % (dtobj, msg))
dt = date2num(dtobj)
if dt <= self.lines.datetime[-1]:
return False # time already seen
Expand All @@ -428,6 +433,7 @@ def _load_tick(self, msg):

def _load_agg(self, msg):
dtobj = pd.Timestamp(msg['time'], unit='ns')
self.logger.debug("Loading agg at: %s %s" % (dtobj, msg))
dt = date2num(dtobj)
if dt <= self.lines.datetime[-1]:
return False # time already seen
Expand All @@ -443,6 +449,7 @@ def _load_agg(self, msg):

def _load_history(self, msg):
dtobj = msg['time'].to_pydatetime()
self.logger.debug("Loading history at: %s %s" % (dtobj, msg))
dt = date2num(dtobj)
if dt <= self.lines.datetime[-1]:
return False # time already seen
Expand Down
Loading