Skip to content

Commit c88b606

Browse files
ChinmayBansal, Amnah199, davidsbatista
authored
feat: add delete_all_documents() to MongoDB DocumentStore (#2401)
* feat: add delete_all_documents() to MongoDB DocumentStore
* add functionality to recreate a collection

Co-authored-by: Amna Mubashar <amnahkhan.ak@gmail.com>
Co-authored-by: David S. Batista <dsbatista@gmail.com>
1 parent 5758a79 commit c88b606

File tree

3 files changed

+154
-0
lines changed

3 files changed

+154
-0
lines changed

integrations/mongodb_atlas/src/haystack_integrations/document_stores/mongodb_atlas/document_store.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,108 @@ async def delete_documents_async(self, document_ids: List[str]) -> None:
423423
return
424424
await self._collection_async.delete_many(filter={"id": {"$in": document_ids}})
425425

426+
def delete_all_documents(self, *, recreate_collection: bool = False) -> None:
    """
    Removes every document held by the document store.

    :param recreate_collection: When True, the collection is dropped and created again using the options
        reported by `list_collections` and the indexes reported by `list_indexes` (the default `_id_`
        index is skipped). When False, the documents are removed with a single `delete_many({})` call
        and the collection itself is left in place.
        Recreating the collection is faster for very large collections.
    :raises DocumentStoreError: If any step of the deletion or recreation fails; the original exception
        is chained as the cause.

    NOTE(review): only indexes returned by `list_indexes` are re-created. Atlas Search / vector search
    indexes are presumably not among them - confirm before relying on `recreate_collection=True`.
    """
    self._ensure_connection_setup()
    assert self._collection is not None
    assert self._connection is not None

    try:
        if not recreate_collection:
            # Plain wipe: the collection itself is never dropped on this path.
            outcome = self._collection.delete_many({})
            logger.info(
                "Deleted {n_docs} documents from collection '{collection}'.",
                n_docs=outcome.deleted_count,
                collection=self.collection_name,
            )
            return

        db = self._connection[self.database_name]

        # Capture the creation options while the collection still exists.
        matching = db.list_collections(filter={"name": self.collection_name})
        creation_options = next(matching, {}).get("options", {})

        # Capture every index definition except the default _id index.
        preserved = [spec for spec in self._collection.list_indexes() if spec["name"] != "_id_"]

        # From here on the operation is destructive.
        self._collection.drop()
        db.create_collection(self.collection_name, **creation_options)

        for spec in preserved:
            key_pairs = list(spec["key"].items())
            # "key" is passed positionally; "v" and "ns" are left out of the forwarded options.
            extra = {opt: val for opt, val in spec.items() if opt not in ("key", "v", "ns")}
            self._collection.create_index(key_pairs, **extra)

        logger.info(
            "Collection '{collection}' recreated with original configuration.",
            collection=self.collection_name,
        )
    except Exception as err:
        msg = f"Failed to delete all documents from MongoDB Atlas: {err!s}"
        raise DocumentStoreError(msg) from err
475+
476+
async def delete_all_documents_async(self, *, recreate_collection: bool = False) -> None:
    """
    Asynchronously removes every document held by the document store.

    :param recreate_collection: When True, the collection is dropped and created again using the options
        reported by `list_collections` and the indexes reported by `list_indexes` (the default `_id_`
        index is skipped). When False, the documents are removed with a single `delete_many({})` call
        and the collection itself is left in place.
        Recreating the collection is faster for very large collections.
    :raises DocumentStoreError: If any step of the deletion or recreation fails; the original exception
        is chained as the cause.

    NOTE(review): only indexes returned by `list_indexes` are re-created. Atlas Search / vector search
    indexes are presumably not among them - confirm before relying on `recreate_collection=True`.
    """
    await self._ensure_connection_setup_async()
    assert self._collection_async is not None
    assert self._connection_async is not None

    try:
        if not recreate_collection:
            # Plain wipe: the collection itself is never dropped on this path.
            outcome = await self._collection_async.delete_many({})
            logger.info(
                "Deleted {n_docs} documents from collection '{collection}'.",
                n_docs=outcome.deleted_count,
                collection=self.collection_name,
            )
            return

        db = self._connection_async[self.database_name]

        # Capture the creation options while the collection still exists.
        info_cursor = await db.list_collections(filter={"name": self.collection_name})
        first_match = await info_cursor.to_list(length=1)
        creation_options = {}
        if first_match:
            creation_options = first_match[0].get("options", {})

        # Capture every index definition except the default _id index.
        index_cursor = await self._collection_async.list_indexes()
        all_specs = await index_cursor.to_list(length=None)
        preserved = [spec for spec in all_specs if spec["name"] != "_id_"]

        # From here on the operation is destructive.
        await self._collection_async.drop()
        await db.create_collection(self.collection_name, **creation_options)

        for spec in preserved:
            key_pairs = list(spec["key"].items())
            # "key" is passed positionally; "v" and "ns" are left out of the forwarded options.
            extra = {opt: val for opt, val in spec.items() if opt not in ("key", "v", "ns")}
            await self._collection_async.create_index(key_pairs, **extra)

        logger.info(
            "Collection '{collection}' recreated with original configuration.",
            collection=self.collection_name,
        )
    except Exception as err:
        msg = f"Failed to delete all documents from MongoDB Atlas: {err!s}"
        raise DocumentStoreError(msg) from err
527+
426528
def _embedding_retrieval(
427529
self,
428530
query_embedding: List[float],

integrations/mongodb_atlas/tests/test_document_store.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,3 +333,29 @@ def test_custom_content_field(self):
333333

334334
finally:
335335
database[collection_name].drop()
336+
337+
def test_delete_all_documents(self, document_store: MongoDBAtlasDocumentStore):
    """Two written documents are gone after a default `delete_all_documents()` call."""
    document_store.write_documents(
        [
            Document(id="1", content="first doc"),
            Document(id="2", content="second doc"),
        ]
    )
    assert document_store.count_documents() == 2

    document_store.delete_all_documents()

    assert document_store.count_documents() == 0
343+
344+
def test_delete_all_documents_empty_collection(self, document_store: MongoDBAtlasDocumentStore):
    """Calling `delete_all_documents()` on a store with no documents leaves the count at zero."""
    count_before = document_store.count_documents()
    assert count_before == 0

    document_store.delete_all_documents()

    count_after = document_store.count_documents()
    assert count_after == 0
348+
349+
def test_delete_all_documents_with_recreate_collection(self, document_store: MongoDBAtlasDocumentStore):
    """With `recreate_collection=True` the store ends up empty and still accepts new writes."""
    initial_batch = [
        Document(id="1", content="first doc"),
        Document(id="2", content="second doc"),
    ]
    document_store.write_documents(initial_batch)
    assert document_store.count_documents() == 2

    # Drop-and-recreate path instead of the default delete_many path
    document_store.delete_all_documents(recreate_collection=True)
    assert document_store.count_documents() == 0

    # The collection must be usable again after recreation
    document_store.write_documents([Document(id="3", content="third doc")])
    assert document_store.count_documents() == 1

integrations/mongodb_atlas/tests/test_document_store_async.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,3 +127,29 @@ async def test_delete_documents_async(self, document_store: MongoDBAtlasDocument
127127
assert await document_store.count_documents_async() == 1
128128
await document_store.delete_documents_async(document_ids=["1"])
129129
assert await document_store.count_documents_async() == 0
130+
131+
async def test_delete_all_documents_async(self, document_store: MongoDBAtlasDocumentStore):
    """Two written documents are gone after a default `delete_all_documents_async()` call."""
    await document_store.write_documents_async(
        [
            Document(id="1", content="first doc"),
            Document(id="2", content="second doc"),
        ]
    )
    assert await document_store.count_documents_async() == 2

    await document_store.delete_all_documents_async()

    assert await document_store.count_documents_async() == 0
137+
138+
async def test_delete_all_documents_async_empty_collection(self, document_store: MongoDBAtlasDocumentStore):
    """Calling `delete_all_documents_async()` on a store with no documents leaves the count at zero."""
    count_before = await document_store.count_documents_async()
    assert count_before == 0

    await document_store.delete_all_documents_async()

    count_after = await document_store.count_documents_async()
    assert count_after == 0
142+
143+
async def test_delete_all_documents_async_with_recreate_collection(self, document_store: MongoDBAtlasDocumentStore):
    """With `recreate_collection=True` the store ends up empty and still accepts new writes."""
    initial_batch = [
        Document(id="1", content="first doc"),
        Document(id="2", content="second doc"),
    ]
    await document_store.write_documents_async(initial_batch)
    assert await document_store.count_documents_async() == 2

    # Drop-and-recreate path instead of the default delete_many path
    await document_store.delete_all_documents_async(recreate_collection=True)
    assert await document_store.count_documents_async() == 0

    # The collection must be usable again after recreation
    await document_store.write_documents_async([Document(id="3", content="third doc")])
    assert await document_store.count_documents_async() == 1

0 commit comments

Comments
 (0)