2121import pandas as pd
2222
2323import backtrader as bt
24- from alpaca_trade_api .entity import Aggs
2524from backtrader .metabase import MetaParams
2625from backtrader .utils .py3 import queue , with_metaclass
2726
@@ -167,6 +166,7 @@ async def on_trade(self, msg):
167166
168167class MetaSingleton (MetaParams ):
169168 '''Metaclass to make a metaclassed class a singleton'''
169+
170170 def __init__ (cls , name , bases , dct ):
171171 super (MetaSingleton , cls ).__init__ (name , bases , dct )
172172 cls ._singleton = None
@@ -285,11 +285,11 @@ def get_notifications(self):
285285
286286 # Alpaca supported granularities
287287 _GRANULARITIES = {
288- (bt .TimeFrame .Minutes , 1 ): '1Min' ,
289- (bt .TimeFrame .Minutes , 5 ): '5Min' ,
288+ (bt .TimeFrame .Minutes , 1 ): '1Min' ,
289+ (bt .TimeFrame .Minutes , 5 ): '5Min' ,
290290 (bt .TimeFrame .Minutes , 15 ): '15Min' ,
291291 (bt .TimeFrame .Minutes , 60 ): '1H' ,
292- (bt .TimeFrame .Days , 1 ): '1D' ,
292+ (bt .TimeFrame .Days , 1 ): '1D' ,
293293 }
294294
295295 def get_positions (self ):
@@ -489,6 +489,7 @@ def get_aggs_from_alpaca(self,
489489 but we need to manipulate it to be able to work with it
490490 smoothly
491491 """
492+
492493 def _granularity_to_timeframe (granularity ):
493494 if granularity in [Granularity .Minute , Granularity .Ticks ]:
494495 timeframe = TimeFrame .Minute
@@ -573,19 +574,6 @@ def _resample(df):
573574 else :
574575 return df
575576
576- # def _back_to_aggs(df):
577- # response = []
578- # for i, v in df.iterrows():
579- # response.append({
580- # "o": v.open,
581- # "h": v.high,
582- # "l": v.low,
583- # "c": v.close,
584- # "v": v.volume,
585- # "t": i.timestamp() * 1000,
586- # })
587- # return Aggs({"results": response})
588-
589577 if not start :
590578 timeframe = _granularity_to_timeframe (granularity )
591579 start = end - timedelta (days = 1 )
@@ -608,11 +596,11 @@ def _resample(df):
608596 def streaming_prices (self ,
609597 dataname , timeframe , tmout = None , data_feed = 'iex' ):
610598 q = queue .Queue ()
611- kwargs = {'q' : q ,
612- 'dataname' : dataname ,
599+ kwargs = {'q' : q ,
600+ 'dataname' : dataname ,
613601 'timeframe' : timeframe ,
614602 'data_feed' : data_feed ,
615- 'tmout' : tmout }
603+ 'tmout' : tmout }
616604 t = threading .Thread (target = self ._t_streaming_prices , kwargs = kwargs )
617605 t .daemon = True
618606 t .start ()
@@ -629,7 +617,6 @@ def _t_streaming_prices(self, dataname, timeframe, q, tmout, data_feed):
629617 else :
630618 method = StreamingMethod .MinuteAgg
631619
632-
633620 streamer = Streamer (q ,
634621 api_key = self .p .key_id ,
635622 api_secret = self .p .secret_key ,
@@ -648,9 +635,9 @@ def get_value(self):
648635 return self ._value
649636
650637 _ORDEREXECS = {
651- bt .Order .Market : 'market' ,
652- bt .Order .Limit : 'limit' ,
653- bt .Order .Stop : 'stop' ,
638+ bt .Order .Market : 'market' ,
639+ bt .Order .Limit : 'limit' ,
640+ bt .Order .Stop : 'stop' ,
654641 bt .Order .StopLimit : 'stop_limit' ,
655642 bt .Order .StopTrail : 'trailing_stop' ,
656643 }
@@ -760,6 +747,7 @@ def _check_if_transaction_occurred(order_id):
760747 if trans is None :
761748 break
762749 self ._process_transaction (order_id , trans )
750+
763751 while True :
764752 try :
765753 if self .q_ordercreate .empty ():
@@ -847,7 +835,7 @@ def _transaction(self, trans):
847835 self ._transpend [oid ].append (trans )
848836 self ._process_transaction (oid , trans )
849837
850- _X_ORDER_FILLED = ('partially_filled' , 'filled' , )
838+ _X_ORDER_FILLED = ('partially_filled' , 'filled' ,)
851839
852840 def _process_transaction (self , oid , trans ):
853841 try :
0 commit comments