22import json
33from typing import Any
44from typing import Dict
5+ from typing import List
56from typing import Optional
7+ from typing import Tuple
68from typing import Union
79
810from ddtrace ._trace .span import Span
1517from ddtrace .appsec ._http_utils import extract_cookies_from_headers
1618from ddtrace .appsec ._http_utils import normalize_headers
1719from ddtrace .appsec ._http_utils import parse_http_body
20+ from ddtrace .appsec ._utils import Block_config
1821from ddtrace .contrib import trace_utils
1922from ddtrace .contrib .internal .trace_utils_base import _get_request_header_user_agent
2023from ddtrace .contrib .internal .trace_utils_base import _set_url_tag
@@ -292,24 +295,23 @@ def _on_grpc_server_data(headers, request_message, method, metadata):
292295 set_waf_address (SPAN_DATA_NAMES .GRPC_SERVER_REQUEST_METADATA , dict (metadata ))
293296
294297
295- def _wsgi_make_block_content (ctx , construct_url ):
298+ def _wsgi_make_block_content (ctx , construct_url ) -> Tuple [ int , List [ Tuple [ str , str ]], bytes ] :
296299 middleware = ctx .get_item ("middleware" )
297300 req_span = ctx .get_item ("req_span" )
298301 headers = ctx .get_item ("headers" )
299302 environ = ctx .get_item ("environ" )
300303 if req_span is None :
301304 raise ValueError ("request span not found" )
302- block_config = get_blocked ()
303- desired_type = block_config .get ("type" , "auto" )
305+ block_config : Block_config = get_blocked () or Block_config ()
304306 ctype = None
305- if desired_type == "none" :
306- content = ""
307- resp_headers = [("content-type" , "text/plain; charset=utf-8" ), ("location" , block_config .get ( " location" , "" ) )]
307+ if block_config . type == "none" :
308+ content = b ""
309+ resp_headers = [("content-type" , "text/plain; charset=utf-8" ), ("location" , block_config .location )]
308310 else :
309- ctype = block_config .get ( "content-type" , "application/json" )
310- content = http_utils ._get_blocked_template (ctype ).encode ("UTF-8" )
311+ ctype = block_config .content_type
312+ content = http_utils ._get_blocked_template (ctype , block_config . block_id ).encode ("UTF-8" )
311313 resp_headers = [("content-type" , ctype )]
312- status = block_config .get ( " status_code" , 403 )
314+ status = block_config .status_code
313315 try :
314316 req_span ._set_tag_str (RESPONSE_HEADERS + ".content-length" , str (len (content )))
315317 if ctype is not None :
@@ -332,28 +334,26 @@ def _wsgi_make_block_content(ctx, construct_url):
332334 return status , resp_headers , content
333335
334336
335- def _asgi_make_block_content (ctx , url ):
337+ def _asgi_make_block_content (ctx , url ) -> Tuple [ int , List [ Tuple [ bytes , bytes ]], bytes ] :
336338 middleware = ctx .get_item ("middleware" )
337339 req_span = ctx .get_item ("req_span" )
338340 headers = ctx .get_item ("headers" )
339341 environ = ctx .get_item ("environ" )
340342 if req_span is None :
341343 raise ValueError ("request span not found" )
342- block_config = get_blocked ()
343- desired_type = block_config .get ("type" , "auto" )
344+ block_config = get_blocked () or Block_config ()
344345 ctype = None
345- if desired_type == "none" :
346- content = ""
346+ if block_config . type == "none" :
347+ content = b ""
347348 resp_headers = [
348349 (b"content-type" , b"text/plain; charset=utf-8" ),
349- (b"location" , block_config .get ( " location" , "" ) .encode ()),
350+ (b"location" , block_config .location .encode ()),
350351 ]
351352 else :
352- ctype = block_config .get ("content-type" , "application/json" )
353- content = http_utils ._get_blocked_template (ctype ).encode ("UTF-8" )
353+ content = http_utils ._get_blocked_template (block_config .content_type , block_config .block_id ).encode ("UTF-8" )
354354 # ctype = f"{ctype}; charset=utf-8" can be considered at some point
355- resp_headers = [(b"content-type" , ctype .encode ())]
356- status = block_config .get ( " status_code" , 403 )
355+ resp_headers = [(b"content-type" , block_config . content_type .encode ())]
356+ status = block_config .status_code
357357 try :
358358 req_span ._set_tag_str (RESPONSE_HEADERS + ".content-length" , str (len (content )))
359359 if ctype is not None :
0 commit comments