@@ -18,6 +18,7 @@ import {
1818
1919import semanticTypes from './semantic-types' ;
2020import { AnyIterable } from './types' ;
21+ import { allowAbort , ALLOW_ABORT_INTERVAL_COUNT } from './util' ;
2122
2223type TypeCastMap = {
2324 Array : unknown [ ] ;
@@ -484,6 +485,10 @@ export class SchemaAnalyzer {
484485 fields : [ ]
485486 } ;
486487
488+ // Incremented each time a field or type is analyzed.
489+ // Used to periodically check whether the analysis should be aborted.
490+ fieldAndTypeAnalysisCounter = 0 ;
491+
487492 constructor ( options ?: SchemaParseOptions ) {
488493 // Set default options.
489494 this . options = { ...defaultSchemaParseOptions , ...options } ;
@@ -512,6 +517,13 @@ export class SchemaAnalyzer {
512517 }
513518 }
514519
/**
 * Gives the analysis a periodic opportunity to be aborted.
 *
 * Each call increments `fieldAndTypeAnalysisCounter`. On the first call, and
 * then once every `ALLOW_ABORT_INTERVAL_COUNT` calls, `allowAbort()` is
 * invoked and awaited.
 *
 * @returns A promise that settles once `allowAbort()` has settled, or
 *   resolves right away when this call does not land on the interval.
 */
async allowAbortDuringAnalysis(): Promise<void> {
  const callIndex = this.fieldAndTypeAnalysisCounter++;
  if (callIndex % ALLOW_ABORT_INTERVAL_COUNT !== 0) return;

  // Callers `await` this method, so the result of `allowAbort()` must be
  // awaited here too; otherwise it would be a floating promise whose
  // rejection never reaches the analysis loop.
  // NOTE(review): `allowAbort` comes from './util' and is not visible here —
  // presumably it returns a promise that yields to the event loop; confirm.
  await allowAbort();
}
526+
515527 increaseFieldCount ( ) {
516528 if ( ! this . options . distinctFieldsAbortThreshold ) return ;
517529 this . fieldsCount ++ ;
@@ -531,14 +543,15 @@ export class SchemaAnalyzer {
531543 return returnValue ;
532544 }
533545
534- analyzeDoc ( doc : Document ) {
546+ async analyzeDoc ( doc : Document ) {
535547 this . finalized = false ;
536548 /**
537549 * Takes a field value, determines the correct type, handles recursion into
538550 * nested arrays and documents, and passes the value down to `addToValue`.
539551 * Note: This mutates the `schema` argument.
540552 */
541- const addToType = ( path : string [ ] , value : BSONValue , schema : SchemaAnalysisFieldTypes ) => {
553+ const addToType = async ( path : string [ ] , value : BSONValue , schema : SchemaAnalysisFieldTypes ) => {
554+ await this . allowAbortDuringAnalysis ( ) ;
542555 const bsonType = getBSONType ( value ) ;
543556 // If semantic type detection is enabled, the type is the semantic type
544557 // or the original bson type if no semantic type was detected. If disabled,
@@ -560,13 +573,16 @@ export class SchemaAnalyzer {
560573 type . types = type . types ?? Object . create ( null ) ;
561574 type . lengths = type . lengths ?? [ ] ;
562575 type . lengths . push ( ( value as BSONValue [ ] ) . length ) ;
563- ( value as BSONValue [ ] ) . forEach ( ( v : BSONValue ) => addToType ( path , v , type . types ) ) ;
576+ for ( const v of ( value as BSONValue [ ] ) ) {
577+ await addToType ( path , v , type . types ) ;
578+ }
564579 } else if ( isDocumentType ( type ) ) {
565580 // Recurse into nested documents by calling `addToField` for all sub-fields.
566581 type . fields = type . fields ?? Object . create ( null ) ;
567- Object . entries ( value as Document ) . forEach (
568- ( [ fieldName , v ] ) => addToField ( fieldName , [ ...path , fieldName ] , v , type . fields )
569- ) ;
582+
583+ for ( const [ fieldName , v ] of Object . entries ( value as Document ) ) {
584+ await addToField ( fieldName , [ ...path , fieldName ] , v , type . fields ) ;
585+ }
570586 } else if ( this . options . storeValues && ! isNullType ( type ) ) {
571587 // When the `storeValues` option is enabled, store some example values.
572588 if ( ! type . values ) {
@@ -584,7 +600,8 @@ export class SchemaAnalyzer {
584600 * Handles a field from a document. Passes the value to `addToType`.
585601 * Note: This mutates the `schema` argument.
586602 */
587- const addToField = ( fieldName : string , path : string [ ] , value : BSONValue , schema : SchemaAnalysisFieldsMap ) => {
603+ const addToField = async ( fieldName : string , path : string [ ] , value : BSONValue , schema : SchemaAnalysisFieldsMap ) => {
604+ await this . allowAbortDuringAnalysis ( ) ;
588605 if ( ! schema [ fieldName ] ) {
589606 schema [ fieldName ] = {
590607 name : fieldName ,
@@ -597,11 +614,11 @@ export class SchemaAnalyzer {
597614 const field = schema [ fieldName ] ;
598615
599616 field . count ++ ;
600- addToType ( path , value , field . types ) ;
617+ await addToType ( path , value , field . types ) ;
601618 } ;
602619
603620 for ( const key of Object . keys ( doc ) ) {
604- addToField ( key , [ key ] , doc [ key ] , this . schemaAnalysisRoot . fields ) ;
621+ await addToField ( key , [ key ] , doc [ key ] , this . schemaAnalysisRoot . fields ) ;
605622 }
606623 this . schemaAnalysisRoot . count += 1 ;
607624 }
@@ -652,7 +669,7 @@ export async function getCompletedSchemaAnalyzer(
652669 const analyzer = new SchemaAnalyzer ( options ) ;
653670 for await ( const doc of verifyStreamSource ( source ) ) {
654671 if ( options ?. signal ?. aborted ) throw options . signal . reason ;
655- analyzer . analyzeDoc ( doc ) ;
672+ await analyzer . analyzeDoc ( doc ) ;
656673 }
657674 return analyzer ;
658675}
0 commit comments