55import asyncio
66from typing import List
77from abc import ABC , abstractmethod
8- import queue
8+ from datetime import datetime
99
1010from requests .models import HTTPError
1111
@@ -110,19 +110,25 @@ def __init__(self,object:SalesforceObject,connection:BulkAPIConnection) -> None:
110110 self .object = object
111111 self .id = None
112112
113- def status (self ):
114- req = requests .get (f"{ self .connection .instance_url } /services/data/{ self .connection .settings .api_version } /jobs/query/{ self .id } " ,headers = self .connection .headers )
113+ async def status (self ):
114+ loop = asyncio .get_event_loop ()
115+ f3 = loop .run_in_executor (None ,lambda :requests .get (f"{ self .connection .instance_url } /services/data/{ self .connection .settings .api_version } /jobs/query/{ self .id } " ,headers = self .connection .headers ))
116+ req = await f3
115117 req .raise_for_status ()
116118 return req .json ()['state' ]
117119
118120
119121 async def start (self ):
120122 print (f'Starting job for { self .object .name } ' )
121- req = requests .post (
123+
124+ loop = asyncio .get_event_loop ()
125+ f1 = loop .run_in_executor (None ,lambda : requests .post (
122126 f"{ self .connection .instance_url } /services/data/{ self .connection .settings .api_version } /jobs/query" ,
123127 data = json .dumps (self .body ),
124128 headers = self .connection .headers
125- )
129+ ))
130+ req = await f1
131+
126132 try :
127133 req .raise_for_status ()
128134 except HTTPError as e :
@@ -136,11 +142,21 @@ async def start(self):
136142 raise
137143
138144 self .id = req .json ()['id' ]
145+ delay_iter = iter ([1 ,1 ,10 ,30 ,60 ])
146+
139147 while True :
140- await asyncio .sleep (1 )
141- status = self .status ()
148+ try :
149+ delay = next (delay_iter )
150+ except StopIteration :
151+ delay = delay
152+
153+ await asyncio .sleep (delay )
154+
155+ status = await self .status ()
156+ print (f'{ self .object .name } : { status } ' )
157+
142158 if status == 'JobComplete' :
143- self .on_complete (f"{ self .connection .instance_url } /services/data/{ self .connection .settings .api_version } /jobs/query/{ self .id } /results" ,self )
159+ await self .on_complete (f"{ self .connection .instance_url } /services/data/{ self .connection .settings .api_version } /jobs/query/{ self .id } /results" ,self )
144160 print (f'Finished job for { self .object .name } ' )
145161 break
146162 return 0
@@ -187,13 +203,18 @@ def __init__(self,result_url:str,job:BulkAPIJob) -> None:
187203 self .batch_number = 0
188204 self .job = job
189205
190- def fetch (self ):
191- result = requests .get (f"{ self .result_url } ?maxRecords=10000" ,headers = self .job .connection .headers )
206+ async def fetch (self ):
207+ loop = asyncio .get_event_loop ()
208+ f1 = loop .run_in_executor (None ,lambda : requests .get (f"{ self .result_url } ?maxRecords=50000" ,headers = self .job .connection .headers ))
209+
210+ self .datetime_start_fetch = datetime .now ()
211+ result = await f1
192212 self .handle (result )
193213 while 'sforce-locator' in result .headers .keys ():
194214 if (result .headers ['sforce-locator' ]!= 'NA' ) & (result .headers ['sforce-locator' ]!= 'null' ):
195215 self .batch_number += 1
196- result = requests .get (f"{ self .result_url } ?locator={ result .headers ['sforce-locator' ]} &maxRecords=10000" ,headers = self .job .connection .headers )
216+ f2 = loop .run_in_executor (None ,lambda : requests .get (f"{ self .result_url } ?locator={ result .headers ['sforce-locator' ]} &maxRecords=50000" ,headers = self .job .connection .headers ))
217+ result = await f2
197218 self .handle (result )
198219 else :
199220 break
@@ -205,10 +226,10 @@ def handle(self,data):
205226
206227class JobCompleteEvent (List [BulkAPIResultHandler ]):
207228
208- def __call__ (self , * args , ** kwargs ):
229+ async def __call__ (self , * args , ** kwargs ):
209230 for c in self :
210231 i = c (* args ,* kwargs )
211- i .fetch ()
232+ await i .fetch ()
212233
213234 def __repr__ (self ):
214235 return "Event(%s)" % list .__repr__ (self )
@@ -230,5 +251,4 @@ async def run_all(self):
230251 )
231252 else :
232253 break
233-
234-
254+
0 commit comments