diff --git a/alpaca_backtrader_api/alpacabroker.py b/alpaca_backtrader_api/alpacabroker.py index 2a3d781e..bd8f815b 100644 --- a/alpaca_backtrader_api/alpacabroker.py +++ b/alpaca_backtrader_api/alpacabroker.py @@ -2,8 +2,11 @@ unicode_literals) import collections +from curses import has_key +import logging from backtrader import BrokerBase, Order, BuyOrder, SellOrder +from backtrader.order import Order from backtrader.utils.py3 import with_metaclass, iteritems from backtrader.comminfo import CommInfoBase from backtrader.position import Position @@ -55,7 +58,7 @@ class AlpacaBroker(with_metaclass(MetaAlpacaBroker, BrokerBase)): def __init__(self, **kwargs): super(AlpacaBroker, self).__init__() - + self.logger = logging.getLogger(self.__class__.__name__) self.o = alpacastore.AlpacaStore(**kwargs) self.orders = collections.OrderedDict() # orders by order id @@ -102,6 +105,32 @@ def start(self): self.startingvalue = self.value = self.o.get_value() self.positions = self.update_positions() + + _ORDEREXECS = { + 'market': Order.Market, + 'limit': Order.Limit, + 'stop': Order.Stop, + 'stop_limit': Order.StopLimit, + 'trailing_stop': Order.StopTrail + } + + _ORDERSTATUS = { + 'new': Order.Created, + 'accepted': Order.Accepted, + 'accepted_for_bidding': Order.Accepted, + 'canceled': Order.Canceled, + 'expired': Order.Expired, + 'filled': Order.Completed, + 'partially_filled': Order.Partial, + 'pending_cancel': Order.Partial, + 'pending_replace': Order.Partial, + 'rejected': Order.Rejected, + 'suspended': Order.Rejected, + 'stopped': Order.Completed, + 'calculated': Order.Partial + } + + def data_started(self, data): pos = self.getposition(data) @@ -137,6 +166,37 @@ def data_started(self, data): order.completed() self.notify(order) + alpaca_orders = self.o.get_orders() + alpaca_orders = {o.symbol: o for o in alpaca_orders} + o = alpaca_orders.get(data._name, None) + if o is not None: + self.logger.debug(f"Got open order: {o}") + exectype = self._ORDEREXECS[o.order_type] + status = self._ORDERSTATUS[o.status] + price = o.stop_price if o.stop_price is not None else o.limit_price + if o.side == "buy": + order = BuyOrder(data=data, + size = float(o.qty) if o.qty is not None else o.qty, + price = float(price) if price is not None else None, + exectype=exectype, + simulated=True + ) + if o.side == "sell": + order = SellOrder(data=data, + size = float(o.qty) if o.qty is not None else o.qty, + price = float(price) if price is not None else None, + exectype=exectype, + simulated=True + ) + order.status = status + order.tradeid = o.id + # icky, this is leaky, can we use the "create order" function with a "fake" order? + self.o._orders[order.ref] = o.id + self.o._ordersrev[o.id] = order.ref # maps ids to backtrader order + self.orders[order.ref] = order + self.notify(order) + + def stop(self): super(AlpacaBroker, self).stop() self.o.stop() @@ -324,8 +384,10 @@ def sell(self, owner, data, def cancel(self, order): if not self.orders.get(order.ref, False): + self.logger.warning(f"Cannot cancel unknown order: {order.ref}") return if order.status == Order.Cancelled: # already cancelled + self.logger.warning(f"Order {order.ref} already canceled!") return return self.o.order_cancel(order) diff --git a/alpaca_backtrader_api/alpacadata.py b/alpaca_backtrader_api/alpacadata.py index ea345342..fd334c4e 100644 --- a/alpaca_backtrader_api/alpacadata.py +++ b/alpaca_backtrader_api/alpacadata.py @@ -2,6 +2,7 @@ unicode_literals) from datetime import timedelta +import logging import pandas as pd from backtrader.feed import DataBase from backtrader import date2num, num2date @@ -159,6 +160,7 @@ def __init__(self, **kwargs): self._candleFormat = 'bidask' if self.p.bidask else 'midpoint' self._timeframe = self.p.timeframe self.do_qcheck(True, 0) + self.logger = logging.getLogger(self.__class__.__name__) if self._timeframe not in [bt.TimeFrame.Ticks, bt.TimeFrame.Minutes, bt.TimeFrame.Days]: @@ -179,6 +181,7 @@ def start(self): contractdetails if it exists """ super(AlpacaData, self).start() + self.logger.info("Starting data feed: %s" % self.p.dataname) # Create attributes as soon as possible self._statelivereconn = False # if reconnecting in live state @@ -268,9 +271,10 @@ def _load(self): if self._state == self._ST_LIVE: try: msg = (self._storedmsg.pop(None, None) or - self.qlive.get(timeout=self._qcheck)) + self.qlive.get(timeout=self.p.qcheck)) except queue.Empty: return None # indicate timeout situation + if msg is None: # Conn broken during historical/backfilling self.put_notification(self.CONNBROKEN) # Try to reconnect @@ -404,6 +408,7 @@ def _load(self): def _load_tick(self, msg): dtobj = pd.Timestamp(msg['time'], unit='ns') + self.logger.debug("Loading tick at: %s %s" % (dtobj, msg)) dt = date2num(dtobj) if dt <= self.lines.datetime[-1]: return False # time already seen @@ -428,6 +433,7 @@ def _load_tick(self, msg): def _load_agg(self, msg): dtobj = pd.Timestamp(msg['time'], unit='ns') + self.logger.debug("Loading agg at: %s %s" % (dtobj, msg)) dt = date2num(dtobj) if dt <= self.lines.datetime[-1]: return False # time already seen @@ -443,6 +449,7 @@ def _load_agg(self, msg): def _load_history(self, msg): dtobj = msg['time'].to_pydatetime() + self.logger.debug("Loading history at: %s %s" % (dtobj, msg)) dt = date2num(dtobj) if dt <= self.lines.datetime[-1]: return False # time already seen diff --git a/alpaca_backtrader_api/alpacastore.py b/alpaca_backtrader_api/alpacastore.py index b9f0ffca..137a671a 100644 --- a/alpaca_backtrader_api/alpacastore.py +++ b/alpaca_backtrader_api/alpacastore.py @@ -1,5 +1,6 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) +import logging import os import collections import time @@ -24,8 +25,6 @@ from backtrader.metabase import MetaParams from backtrader.utils.py3 import queue, with_metaclass -NY = 'America/New_York' - # Extend the exceptions to support extra cases class AlpacaError(Exception): @@ -125,6 +124,7 @@ def __init__( except RuntimeError: asyncio.set_event_loop(asyncio.new_event_loop()) + self.logger = logging.getLogger(self.__class__.__name__) self.conn = Stream(api_key, api_secret, base_url, @@ -135,6 +135,7 @@ def __init__( self.q = q def run(self): + self.logger.info("Starting streamer for: %s %s" % (self.instrument, self.method.name)) if self.method == StreamingMethod.AccountUpdate: self.conn.subscribe_trade_updates(self.on_trade) elif self.method == StreamingMethod.MinuteAgg: @@ -152,16 +153,20 @@ async def on_listen(self, conn, stream, msg): async def on_quotes(self, msg): msg._raw['time'] = msg.timestamp + self.logger.debug("Got: %s" % msg) self.q.put(msg._raw) async def on_agg_min(self, msg): msg._raw['time'] = msg.timestamp + self.logger.debug("Got: %s" % msg) self.q.put(msg._raw) async def on_account(self, msg): + self.logger.debug("Got: %s" % msg) self.q.put(msg) async def on_trade(self, msg): + self.logger.debug("Got: %s" % msg) self.q.put(msg) @@ -193,6 +198,9 @@ class AlpacaStore(with_metaclass(MetaSingleton, object)): - ``account_tmout`` (default: ``10.0``): refresh period for account value/cash refresh + + - ``order_tmout`` (default: ``0.05``): how often the order creation queue + is checked within _t_create_order ''' BrokerCls = None # broker class will autoregister @@ -203,6 +211,7 @@ class AlpacaStore(with_metaclass(MetaSingleton, object)): ('secret_key', ''), ('paper', False), ('account_tmout', 10.0), # account balance refresh timeout + ('order_tmout', 0.05), ('api_version', None) ) @@ -224,6 +233,7 @@ def getbroker(cls, *args, **kwargs): def __init__(self): super(AlpacaStore, self).__init__() + self.logger = logging.getLogger(self.__class__.__name__) self.notifs = collections.deque() # store notifications for cerebro @@ -293,6 +303,17 @@ def get_notifications(self): (bt.TimeFrame.Days, 1): '1D', } + def get_orders(self): + try: + orders = self.oapi.list_orders() + except (AlpacaError, AlpacaRequestError,): + return [] + if orders: + if 'code' in orders[0]._raw: + return [] + return orders + + def get_positions(self): try: positions = self.oapi.list_positions() @@ -412,17 +433,18 @@ def _t_candles(self, dataname, dtbegin, dtend, timeframe, compression, # don't use dt.replace. use localize # (https://stackoverflow.com/a/1592837/2739124) - cdl = cdl.loc[ - pytz.timezone(NY).localize(dtbegin) if - not dtbegin.tzname() else dtbegin: - pytz.timezone(NY).localize(dtend) if - not dtend.tzname() else dtend - ].dropna(subset=['high']) - records = cdl.reset_index().to_dict('records') - for r in records: - r['time'] = r['timestamp'] - q.put(r) - q.put({}) # end of transmission + if not cdl.empty: + cdl = cdl.loc[ + pytz.utc.localize(dtbegin) if + not dtbegin.tzname() else dtbegin: + pytz.utc.localize(dtend) if + not dtend.tzname() else dtend + ].dropna(subset=['high']) + records = cdl.reset_index().to_dict('records') + for r in records: + r['time'] = r['timestamp'] + q.put(r) + q.put({}) # end of transmission def _make_sure_dates_are_initialized_properly(self, dtbegin, dtend, granularity): @@ -439,14 +461,14 @@ def _make_sure_dates_are_initialized_properly(self, dtbegin, dtend, :return: """ if not dtend: - dtend = pd.Timestamp('now', tz=NY) + dtend = pd.Timestamp('now', tz=pytz.utc) else: - dtend = pd.Timestamp(pytz.timezone('UTC').localize(dtend)) if \ + dtend = pd.Timestamp(pytz.utc.localize(dtend)) if \ not dtend.tzname() else dtend if granularity == Granularity.Minute: calendar = exchange_calendars.get_calendar(name='NYSE') while not calendar.is_open_on_minute(dtend.ceil(freq='T')): - dtend = dtend.replace(hour=15, + dtend = dtend.replace(hour=20, minute=59, second=0, microsecond=0) @@ -456,13 +478,13 @@ def _make_sure_dates_are_initialized_properly(self, dtbegin, dtend, delta = timedelta(days=days) dtbegin = dtend - delta else: - dtbegin = pd.Timestamp(pytz.timezone('UTC').localize(dtbegin)) if \ + dtbegin = pd.Timestamp(pytz.utc.localize(dtbegin)) if \ not dtbegin.tzname() else dtbegin while dtbegin > dtend: # if we start the script during market hours we could get this # situation. this resolves that. dtbegin -= timedelta(days=1) - return dtbegin.astimezone(NY), dtend.astimezone(NY) + return dtbegin.astimezone(pytz.utc), dtend.astimezone(pytz.utc) def get_aggs_from_alpaca(self, dataname, @@ -506,51 +528,34 @@ def _granularity_to_timeframe(granularity): timeframe = TimeFrame.Day return timeframe - def _iterate_api_calls(): + def _get_bars(): """ - you could get max 1000 samples from the server. if we need more - than that we need to do several api calls. - - currently the alpaca api supports also 5Min and 15Min so we could - optimize server communication time by addressing timeframes """ - got_all = False - curr = end - response = pd.DataFrame() - while not got_all: - timeframe = _granularity_to_timeframe(granularity) - r = self.oapi.get_bars(dataname, - timeframe, - start.isoformat(), - curr.isoformat()) - if r: - earliest_sample = r[0].t - response = pd.concat([r.df, response], axis=0) - if earliest_sample <= (pytz.timezone(NY).localize( - start) if not start.tzname() else start): - got_all = True - else: - delta = timedelta(days=1) if granularity == "day" \ - else timedelta(minutes=1) - curr = earliest_sample - delta - else: - # no more data is available, let's return what we have - break - return response + timeframe = _granularity_to_timeframe(granularity) + self.logger.debug(f"Getting bars for: {dataname} from: {start} to {end} by {compression} {granularity}") + response = self.oapi.get_bars(dataname, + timeframe, + start.isoformat(), + end.isoformat()) + return response.df def _clear_out_of_market_hours(df): """ - only interested in samples between 9:30, 16:00 NY time + only interested in samples between 9:30, 16:00 NY time, which is 14:30 to 21:00 UTC """ - return df.between_time("09:30", "16:00") + if df.empty: + return df + return df.between_time("14:30", "21:00") def _drop_early_samples(df): """ - samples from server don't start at 9:30 NY time + samples from server don't start at 9:30 NY time (14:30 UTC) let's drop earliest samples """ + if df.empty: + return df for i, b in df.iterrows(): - if i.time() >= dtime(9, 30): + if i.time() >= dtime(14, 30): return df[i:] def _resample(df): @@ -558,6 +563,8 @@ def _resample(df): samples returned with certain window size (1 day, 1 minute) user may want to work with different window size (5min) """ + if df.empty: + return df if granularity == Granularity.Minute: sample_size = f"{compression}Min" @@ -573,27 +580,31 @@ def _resample(df): ]) ) if granularity == Granularity.Minute: - return df.between_time("09:30", "16:00") + return _clear_out_of_market_hours(df) else: return df if not start: - timeframe = _granularity_to_timeframe(granularity) start = end - timedelta(days=1) - response = self.oapi.get_bars(dataname, - timeframe, start, end)._raw - else: - response = _iterate_api_calls() - cdl = response - if granularity == Granularity.Minute: - cdl = _clear_out_of_market_hours(cdl) - cdl = _drop_early_samples(cdl) - if compression != 1: - response = _resample(cdl) - else: - response = cdl + response = _get_bars() + if response.empty: #for free accounts you cannot request the last 15 minutes of data, this results as an empty response + end = end - timedelta(minutes=15) + self.logger.debug(f"Got empty response! Trying bars for: {dataname} from: {start} to {end} by {compression} {granularity}") + response = _get_bars() + + self.logger.debug(f"Got: {response}") + if not response.empty: + cdl = response + if granularity == Granularity.Minute: + cdl = _clear_out_of_market_hours(cdl) + cdl = _drop_early_samples(cdl) + if compression != 1: + response = _resample(cdl) + else: + response = cdl response = response.dropna() response = response[~response.index.duplicated()] + self.logger.debug(f"Response: {response}") return response def streaming_prices(self, @@ -753,13 +764,15 @@ def _check_if_transaction_occurred(order_id): while True: try: - if self.q_ordercreate.empty(): + try: + msg = self.q_ordercreate.get(timeout=self.p.order_tmout) + except queue.Empty: continue - msg = self.q_ordercreate.get() if msg is None: continue oref, okwargs = msg try: + self.logger.debug(f"Submitting order: {okwargs}") o = self.oapi.submit_order(**okwargs) except Exception as e: self.put_notification(e) @@ -767,14 +780,15 @@ def _check_if_transaction_occurred(order_id): continue try: oid = o.id - except Exception: + except Exception as e: if 'code' in o._raw: + desc = o.description if hasattr(o, "description") else '' self.put_notification(f"error submitting order " f"code: {o.code}. msg: " - f"{o.message}") + f"{o.message}, desc: {desc}") else: self.put_notification( - "General error from the Alpaca server") + f"General error from the Alpaca server: {e}") self.broker._reject(oref) continue @@ -809,8 +823,10 @@ def _t_order_cancel(self): oid = self._orders.get(oref, None) if oid is None: + self.logger.warning(f"Cannot cancel unknown order: {oid}") continue # the order is no longer there try: + self.logger.debug(f"Canceling order: {oid}") self.oapi.cancel_order(oid) except Exception as e: self.put_notification(