|
47 | 47 | FLOAT = 701 |
48 | 48 | FLOAT_ARRAY = 1022 |
49 | 49 | GEOMETRY = 3000 |
| 50 | +GEOMETRYHEX = 3999 |
50 | 51 | INET = 869 |
51 | 52 | INET_ARRAY = 1041 |
52 | 53 | INT2VECTOR = 22 |
@@ -497,6 +498,82 @@ def array_recv_binary(data: bytes, idx: int, length: int) -> typing.List: |
497 | 498 | return values |
498 | 499 |
|
499 | 500 |
|
| 501 | +ascii_invalid_value: int = 0x7F |
| 502 | + |
| 503 | + |
| 504 | +def hexencoding_lookup_no_case(input_value: int) -> int: |
| 505 | + if input_value == 48: |
| 506 | + return 0x00 |
| 507 | + elif input_value == 49: |
| 508 | + return 0x01 |
| 509 | + elif input_value == 50: |
| 510 | + return 0x02 |
| 511 | + elif input_value == 51: |
| 512 | + return 0x03 |
| 513 | + elif input_value == 52: |
| 514 | + return 0x04 |
| 515 | + elif input_value == 53: |
| 516 | + return 0x05 |
| 517 | + elif input_value == 54: |
| 518 | + return 0x06 |
| 519 | + elif input_value == 55: |
| 520 | + return 0x07 |
| 521 | + elif input_value == 56: |
| 522 | + return 0x08 |
| 523 | + elif input_value == 57: |
| 524 | + return 0x09 |
| 525 | + elif (input_value == 65) or (input_value == 97): |
| 526 | + return 0x0A |
| 527 | + elif (input_value == 66) or (input_value == 98): |
| 528 | + return 0x0B |
| 529 | + elif (input_value == 67) or (input_value == 99): |
| 530 | + return 0x0C |
| 531 | + elif (input_value == 68) or (input_value == 100): |
| 532 | + return 0x0D |
| 533 | + elif (input_value == 69) or (input_value == 101): |
| 534 | + return 0x0E |
| 535 | + elif (input_value == 70) or (input_value == 102): |
| 536 | + return 0x0F |
| 537 | + else: |
| 538 | + return ascii_invalid_value |
| 539 | + |
| 540 | + |
| 541 | +def geometryhex_recv(data: bytes, idx: int, length: int) -> str: |
| 542 | + error_flag: bool = False |
| 543 | + pointer: int = idx |
| 544 | + |
| 545 | + if data is None: |
| 546 | + return "" |
| 547 | + elif length == 0: |
| 548 | + return "" |
| 549 | + else: |
| 550 | + # EWT is always hex encoded |
| 551 | + # check to see if byte is expected length |
| 552 | + if 1 == ((idx + length - pointer) % 2): |
| 553 | + return data[idx : idx + length].hex() |
| 554 | + |
| 555 | + result: bytearray = bytearray((idx + length - pointer) // 2) |
| 556 | + |
| 557 | + i: int = 0 |
| 558 | + while pointer < (idx + length): |
| 559 | + # get the ascii number encoded |
| 560 | + stage: int = hexencoding_lookup_no_case(data[pointer]) << 4 |
| 561 | + # error check |
| 562 | + error_flag = (stage == ascii_invalid_value) | error_flag |
| 563 | + pointer += 1 |
| 564 | + stage2 = hexencoding_lookup_no_case(data[pointer]) |
| 565 | + error_flag = (stage2 == ascii_invalid_value) | error_flag |
| 566 | + pointer += 1 |
| 567 | + |
| 568 | + result[i] = stage | stage2 |
| 569 | + i += 1 |
| 570 | + |
| 571 | + if error_flag: |
| 572 | + return data[idx : idx + length].hex() |
| 573 | + |
| 574 | + return result.hex() |
| 575 | + |
| 576 | + |
500 | 577 | # def inet_in(data: bytes, offset: int, length: int) -> typing.Union[IPv4Address, IPv6Address, IPv4Network, IPv6Network]: |
501 | 578 | # inet_str: str = data[offset: offset + length].decode( |
502 | 579 | # _client_encoding) |
@@ -554,6 +631,7 @@ def array_recv_binary(data: bytes, idx: int, length: int) -> typing.List: |
554 | 631 | # 2275: (FC_BINARY, text_recv), # cstring |
555 | 632 | # 2950: (FC_BINARY, uuid_recv), # uuid |
556 | 633 | GEOMETRY: (FC_TEXT, text_recv), # GEOMETRY |
| 634 | + GEOMETRYHEX: (FC_TEXT, geometryhex_recv), # GEOMETRYHEX |
557 | 635 | # 3802: (FC_TEXT, json_in), # jsonb |
558 | 636 | SUPER: (FC_TEXT, text_recv), # SUPER |
559 | 637 | }, |
|
0 commit comments