# -*- coding: utf-8 -*-
# Copyright 2016, 2025 Juca Crispim <juca@poraodojuca.dev>
# This file is part of mongomotor.
# mongomotor is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# mongomotor is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with mongomotor. If not, see <http://www.gnu.org/licenses/>.
from bson.code import Code
from bson import SON
import os
import re
from mongoengine import DENY, CASCADE, NULLIFY, PULL
from mongoengine.common import _import_class
from mongoengine.connection import get_db
from mongoengine.context_managers import (
    set_write_concern,
    set_read_write_concern,
)
from mongoengine.errors import (
    OperationError,
    BulkWriteError,
    NotUniqueError,
    LookUpError,
)
from mongoengine.queryset import transform
from mongoengine.queryset.queryset import QuerySet as MEQuerySet
import pymongo
from pymongo import ReturnDocument
from mongomotor import signals
# for tests
TEST_ENV = os.environ.get('MONGOMOTOR_TEST_ENV')
class QuerySet(MEQuerySet):
    def __repr__(self):  # pragma: no cover
        return self.__class__.__name__
    def __len__(self):
        raise TypeError('len() is not supported. Use count()')
    def _iter_results(self):
        try:
            return super()._iter_results()
        except StopIteration:
            raise StopAsyncIteration
    def __getitem__(self, index):
        skip = self._skip or 0
        # If we received a slice we return a queryset and, as we will not
        # touch the db now, we do not need a future here.
        if isinstance(index, slice):
            start = index.start or 0
            query = self.clone()
            query = query.skip(start + skip)
            if index.stop is not None:
                query = query.limit(index.stop - start)
            return query
        else:
            query = self.clone()
            query = query.skip(index + skip).limit(1)
            return query.first()
    def __aiter__(self):
        return self
    async def __anext__(self):
        # The ``else`` clause runs only when the cursor is exhausted,
        # ie. the loop body never reached its ``return``.
        async for doc in self._cursor:
            mm_doc = self._document._from_son(
                doc,
                _auto_dereference=self._auto_dereference)
            return mm_doc
        else:
            raise StopAsyncIteration()
    async def get(self, *q_objs, **query):
        """Retrieve the matching object, raising
        :class:`~mongoengine.queryset.MultipleObjectsReturned` or
        `DocumentName.MultipleObjectsReturned` if multiple results are
        found, and :class:`~mongoengine.queryset.DoesNotExist` or
        `DocumentName.DoesNotExist` if no results are found.
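
        Example (a minimal sketch; ``User`` is a hypothetical Document)::

            # assumes a User Document with an ``email`` field
            user = await User.objects.get(email='ana@example.com')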
        """
        queryset = self.clone()
        queryset = queryset.order_by().limit(2)
        queryset = queryset.filter(*q_objs, **query)
        docs = await queryset.to_list(length=2)
        if len(docs) < 1:
            msg = ("%s matching query does not exist."
                   % queryset._document._class_name)
            raise queryset._document.DoesNotExist(msg)
        elif len(docs) > 1:
            msg = 'More than 1 item returned'
            raise queryset._document.MultipleObjectsReturned(msg)
        return docs[0] 
    async def first(self):
        """Retrieve the first object matching the query.
        """
        queryset = self.clone()
        result = await queryset.limit(1).to_list()
        try:
            return result[0]
        except IndexError:
            return None 
    async def count(self, with_limit_and_skip=True):
        """Counts the documents in the queryset.
        :param with_limit_and_skip: Indicates if limit and skip applied to
          the queryset should be taken into account.
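
        Example (a minimal sketch; ``Page`` is a hypothetical Document)::

            # counts only published pages
            total = await Page.objects(published=True).count()
        """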
        if (self._limit == 0 and with_limit_and_skip) or self._none:
            return 0
        kw = {}
        if with_limit_and_skip and self._limit:
            kw['limit'] = self._limit
        if with_limit_and_skip and self._skip:
            kw['skip'] = self._skip
        return await self._collection.count_documents(self._query, **kw) 
    async def insert(
        self, doc_or_docs, load_bulk=True, write_concern=None,
        signal_kwargs=None
    ):
        """bulk insert documents
        :param doc_or_docs: a document or list of documents to be inserted
        :param load_bulk (optional): If True returns the list of document
            instances
        :param write_concern: Extra keyword arguments are passed down to
                :meth:`~pymongo.collection.Collection.insert`
                which will be used as options for the resultant
                ``getLastError`` command.  For example,
                ``insert(..., {w: 2, fsync: True})`` will wait until at least
                two servers have recorded the write and will force an fsync on
                each server being written to.
        :param signal_kwargs: (optional) kwargs dictionary to be passed to
            the signal calls.
        By default returns document instances, set ``load_bulk`` to False to
        return just ``ObjectIds``
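
        Example (a minimal sketch; ``User`` is a hypothetical Document)::

            # inserts two documents and returns their instances
            users = await User.objects.insert(
                [User(name='a'), User(name='b')])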
        """
        Document = _import_class("Document")
        if write_concern is None:
            write_concern = {}
        docs = doc_or_docs
        return_one = False
        if isinstance(docs, Document) or issubclass(docs.__class__, Document):
            return_one = True
            docs = [docs]
        for doc in docs:
            if not isinstance(doc, self._document):
                msg = "Some documents inserted aren't instances of %s" % str(
                    self._document
                )
                raise OperationError(msg)
            if doc.pk and not doc._created:
                msg = "Some documents have ObjectIds, use doc.update() instead"
                raise OperationError(msg)
        signal_kwargs = signal_kwargs or {}
        signals.pre_bulk_insert.send(
            self._document, documents=docs, **signal_kwargs)
        raw = [doc.to_mongo() for doc in docs]
        with set_write_concern(self._collection, write_concern) as collection:
            insert_func = collection.insert_many
            if return_one:
                raw = raw[0]
                insert_func = collection.insert_one
        try:
            inserted_result = await insert_func(raw)
            ids = (
                [inserted_result.inserted_id]
                if return_one
                else inserted_result.inserted_ids
            )
        except pymongo.errors.DuplicateKeyError as err:
            message = "Could not save document (%s)"
            raise NotUniqueError(message % err)
        except pymongo.errors.BulkWriteError as err:
            # Inserting documents that already have an _id field can
            # incur a huge performance penalty or raise an error.
            message = "Bulk write error: (%s)"
            raise BulkWriteError(message % err.details)
        except pymongo.errors.OperationFailure as err:
            message = "Could not save document (%s)"
            if re.match("^E1100[01] duplicate key", str(err)):
                # E11000 - duplicate key error index
                # E11001 - duplicate key on update
                message = "Tried to save duplicate unique keys (%s)"
                raise NotUniqueError(message % err)
            raise OperationError(message % err)
        # Apply inserted_ids to documents
        for doc, doc_id in zip(docs, ids):
            doc.pk = doc_id
        if not load_bulk:
            signals.post_bulk_insert.send(
                self._document, documents=docs, loaded=False, **signal_kwargs
            )
            return ids[0] if return_one else ids
        documents = await self.in_bulk(ids)
        results = [documents.get(obj_id) for obj_id in ids]
        signals.post_bulk_insert.send(
            self._document, documents=results, loaded=True, **signal_kwargs
        )
        return results[0] if return_one else results 
    async def update(
        self,
        upsert=False,
        multi=True,
        write_concern=None,
        read_concern=None,
        full_result=False,
        array_filters=None,
        **update,
    ):
        """Perform an atomic update on the fields matched by the query.
        :param upsert: insert if document doesn't exist (default ``False``)
        :param multi: Update multiple documents.
        :param write_concern: Extra keyword arguments are passed down which
            will be used as options for the resultant
            ``getLastError`` command.  For example,
            ``save(..., write_concern={w: 2, fsync: True}, ...)`` will
            wait until at least two servers have recorded the write and
            will force an fsync on the primary server.
        :param read_concern: Override the read concern for the operation.
        :param full_result: Return the associated ``pymongo.UpdateResult``
            rather than just the number of updated items.
        :param array_filters: A list of filters specifying to which array
            elements an update should apply.
        :param update: Django-style update keyword arguments
        :returns: the number of updated documents (unless ``full_result``
            is True)
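
        Example (a minimal sketch; ``Page`` is a hypothetical Document)::

            # marks all of ana's pages as published
            n = await Page.objects(author='ana').update(
                set__published=True)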
        """
        if not update and not upsert:
            raise OperationError("No update parameters, would remove data")
        if write_concern is None:
            write_concern = {}
        if self._none or self._empty:
            return 0
        queryset = self.clone()
        query = queryset._query
        if "__raw__" in update and isinstance(
            update["__raw__"], list
        ):  # Case of Update with Aggregation Pipeline
            update = [
                transform.update(queryset._document, **{"__raw__": u})
                for u in update["__raw__"]
            ]
        else:
            update = transform.update(queryset._document, **update)
        # If doing an atomic upsert on an inheritable class
        # then ensure we add _cls to the update operation
        if upsert and "_cls" in query:
            if "$set" in update:
                update["$set"]["_cls"] = queryset._document._class_name
            else:
                update["$set"] = {"_cls": queryset._document._class_name}
        try:
            with set_read_write_concern(
                queryset._collection, write_concern, read_concern
            ) as collection:
                update_func = collection.update_one
                if multi:
                    update_func = collection.update_many
                result = await update_func(
                    query, update, upsert=upsert, array_filters=array_filters
                )
            if full_result:
                return result
            elif result.raw_result:
                return result.raw_result["n"]
        except pymongo.errors.DuplicateKeyError as err:
            raise NotUniqueError("Update failed (%s)" % err)
        except pymongo.errors.OperationFailure as err:
            if str(err) == "multi not coded yet":
                message = "update() method requires MongoDB 1.1.3+"
                raise OperationError(message)
            raise OperationError("Update failed (%s)" % err) 
    async def in_bulk(self, object_ids):
        """Retrieve a set of documents by their ids.
        :param object_ids: a list or tuple of ObjectId's
        :rtype: dict of ObjectId's as keys and collection-specific
                Document subclasses as values.
        """
        doc_map = {}
        docs = self._collection.find(
            {"_id": {"$in": object_ids}}, **self._cursor_args)
        if self._scalar:
            async for doc in docs:
                doc_map[doc["_id"]] = self._get_scalar(
                    self._document._from_son(doc))
        elif self._as_pymongo:
            async for doc in docs:
                doc_map[doc["_id"]] = doc
        else:
            async for doc in docs:
                doc_map[doc["_id"]] = self._document._from_son(
                    doc,
                    _auto_dereference=self._auto_dereference,
                )
        return doc_map 
    async def delete(self, write_concern=None, _from_doc_delete=False,
                     cascade_refs=None):
        """Deletes the documents matched by the query.
        :param write_concern: Extra keyword arguments are passed down which
            will be used as options for the resultant
            ``getLastError`` command.  For example,
            ``save(..., write_concern={w: 2, fsync: True}, ...)`` will
            wait until at least two servers have recorded the write and
            will force an fsync on the primary server.
        :param _from_doc_delete: True when called from document delete
          therefore signals will have been triggered so don't loop.
        :returns: the number of deleted documents
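
        Example (a minimal sketch; ``Page`` is a hypothetical Document)::

            # removes all unpublished pages
            n = await Page.objects(published=False).delete()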
        """
        queryset = self.clone()
        doc = queryset._document
        if write_concern is None:
            write_concern = {}
        # Handle deletes where skips or limits have been applied or
        # there is an untriggered delete signal
        has_delete_signal = (
            signals.pre_delete.has_receivers_for(self._document) or
            signals.post_delete.has_receivers_for(self._document))
        call_document_delete = (queryset._skip or queryset._limit or
                                has_delete_signal) and not _from_doc_delete
        if call_document_delete:
            r = await self._document_delete(queryset, write_concern)
            return r
        await self._check_delete_rules(doc, queryset, cascade_refs,
                                       write_concern)
        # delete_many does not accept write concern options as keyword
        # arguments, so apply them through the collection's write concern.
        with set_write_concern(queryset._collection,
                               write_concern) as collection:
            r = await collection.delete_many(queryset._query)
        return r.deleted_count
    async def upsert_one(self, write_concern=None, **update):
        """Overwrite or add the first document matched by the query.
        :param write_concern: Extra keyword arguments are passed down which
            will be used as options for the resultant
            ``getLastError`` command.  For example,
            ``save(..., write_concern={w: 2, fsync: True}, ...)`` will
            wait until at least two servers have recorded the write and
            will force an fsync on the primary server.
        :param update: Django-style update keyword arguments
        :returns: the new or overwritten document
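
        Example (a minimal sketch; ``Page`` is a hypothetical Document)::

            # updates the matched page or inserts a new one
            page = await Page.objects(slug='home').upsert_one(
                set__title='Home')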
        """
        result = await self.update(multi=False, upsert=True,
                                   write_concern=write_concern,
                                   full_result=True, **update)
        result = result.raw_result
        if result['updatedExisting']:
            doc = await self.first()
        else:
            doc = await self._document.objects.with_id(
                result['upserted'])
        return doc 
    async def to_list(self, length=100):
        """Returns a list of the current documents in the queryset.
        :param length: maximum number of documents to return for this call.
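
        Example (a minimal sketch; ``Page`` is a hypothetical Document)::

            # fetches up to 50 published pages at once
            pages = await Page.objects(published=True).to_list(length=50)
        """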
        cursor = self._cursor
        docs_list = await cursor.to_list(length)
        final_list = [self._document._from_son(
            d, _auto_dereference=self._auto_dereference)
            for d in docs_list]
        return final_list 
    async def item_frequencies(self, field, normalize=False):
        """Returns a dictionary of all items present in a field across
        the whole queried set of documents, and their corresponding frequency.
        This is useful for generating tag clouds, or searching documents.
        .. note::
            Can only do direct simple mappings and cannot map across
            :class:`~mongomotor.fields.ReferenceField` or
            :class:`~mongomotor.fields.GenericReferenceField`; for more
            complex counting a manual aggregation call would be required.
        If the field is a :class:`~mongomotor.fields.ListField`,
        the items within each list will be counted individually.
        :param field: the field to use
        :param normalize: normalize the results so they add to 1.0
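
        Example (a minimal sketch; ``Post`` is a hypothetical Document)::

            # frequency of each tag across all posts, as fractions
            freqs = await Post.objects.item_frequencies(
                'tags', normalize=True)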
        """
        cursor = await self._document._get_collection().aggregate([
            {'$match': self._query},
            {'$unwind': f'${field}'},
            {'$group': {'_id': '$' + field, 'total': {'$sum': 1}}}
        ])
        freqs = {}
        async for doc in cursor:
            freqs[doc['_id']] = doc['total']
        if normalize:
            count = sum(freqs.values())
            freqs = {k: float(v) / count for k, v in freqs.items()}
        return freqs 
    async def average(self, field):
        """Average over the values of the specified field.
        :param field: the field to average over; use dot-notation to refer to
            embedded document fields
        This method uses the aggregation framework instead of map-reduce,
        which makes it more performant than a map-reduce based average.
        """
        cursor = await self._document._get_collection().aggregate([
            {'$match': self._query},
            {'$group': {'_id': 'avg', 'total': {'$avg': '$' + field}}}
        ])
        avg = 0
        async for doc in cursor:
            avg = doc['total']
            break
        return avg 
    async def aggregate(self, pipeline, **kwargs):
        """Perform an aggregate function based on your queryset params
        :param pipeline: list of aggregation commands,
            see: https://www.mongodb.com/docs/manual/core/aggregation-pipeline/
        :param kwargs: (optional) kwargs dictionary to be passed to pymongo's
            aggregate call.
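
        Example (a minimal sketch; ``Page`` is a hypothetical Document)::

            # counts published pages per author
            cursor = await Page.objects(published=True).aggregate(
                [{'$group': {'_id': '$author', 'total': {'$sum': 1}}}])
            async for doc in cursor:
                print(doc)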
        """
        user_pipeline = pipeline
        initial_pipeline = []
        if self._none or self._empty:
            initial_pipeline.append({"$limit": 1})
            initial_pipeline.append({"$match": {"$expr": False}})
        if self._query:
            initial_pipeline.append({"$match": self._query})
        if self._ordering:
            initial_pipeline.append({"$sort": dict(self._ordering)})
        if self._limit is not None:
            # As per MongoDB Documentation
            # (https://www.mongodb.com/docs/manual/reference/operator/aggregation/limit/),
            # keeping limit stage right after sort stage is more efficient.
            # But this leads to wrong set of documents
            # for a skip stage that might succeed these. So we need to maintain
            # more documents in memory in such a case
            # (https://stackoverflow.com/a/24161461).
            initial_pipeline.append(
                {"$limit": self._limit + (self._skip or 0)})
        if self._skip is not None:
            initial_pipeline.append({"$skip": self._skip})
        final_pipeline = initial_pipeline + user_pipeline
        collection = self._collection
        if self._read_preference is not None or self._read_concern is not None:
            collection = self._collection.with_options(
                read_preference=self._read_preference,
                read_concern=self._read_concern
            )
        return await collection.aggregate(final_pipeline, cursor={}, **kwargs) 
    async def map_reduce(
        self, map_f, reduce_f, output, finalize_f=None, limit=None, scope=None
    ):
        """Perform a map/reduce query using the current query spec
        and ordering. While ``map_reduce`` respects ``QuerySet`` chaining,
        it must be the last call made, as it does not return a malleable
        ``QuerySet``.
        See the :meth:`~mongoengine.tests.QuerySetTest.test_map_reduce`
        and :meth:`~mongoengine.tests.QuerySetTest.test_map_advanced`
        tests in ``tests.queryset.QuerySetTest`` for usage examples.
        :param map_f: map function, as :class:`~bson.code.Code` or string
        :param reduce_f: reduce function, as
                         :class:`~bson.code.Code` or string
        :param output: output collection name, if set to 'inline' will return
           the results inline. This can also be a dictionary containing output
           options see:
           https://www.mongodb.com/docs/manual/reference/command/mapReduce/#mongodb-dbcommand-dbcmd.mapReduce
        :param finalize_f: finalize function, an optional function that
                           performs any post-reduction processing.
        :param scope: values to insert into map/reduce global scope. Optional.
        :param limit: number of objects from current query to provide
                      to map/reduce method
        Returns an iterator yielding
        :class:`~mongoengine.document.MapReduceDocument`.
        """
        queryset = self.clone()
        MapReduceDocument = _import_class("MapReduceDocument")
        map_f_scope = {}
        if isinstance(map_f, Code):
            map_f_scope = map_f.scope
            map_f = str(map_f)
        map_f = Code(queryset._sub_js_fields(map_f), map_f_scope or None)
        reduce_f_scope = {}
        if isinstance(reduce_f, Code):
            reduce_f_scope = reduce_f.scope
            reduce_f = str(reduce_f)
        reduce_f_code = queryset._sub_js_fields(reduce_f)
        reduce_f = Code(reduce_f_code, reduce_f_scope or None)
        mr_args, inline = self._get_map_reduce_args(
            queryset, finalize_f, scope, limit, output)
        db = queryset._document._get_db()
        result = await db.command(
            {
                "mapReduce": queryset._document._get_collection_name(),
                "map": map_f,
                "reduce": reduce_f,
                **mr_args,
            }
        )
        if inline:
            docs = result["results"]
        else:
            if isinstance(result["result"], str):
                cursor = db[result["result"]].find()
            else:
                info = result["result"]
                cursor = db.client[info["db"]][info["collection"]].find()
            if queryset._ordering:
                # Sort on the cursor: ``list.sort`` would not accept a
                # MongoDB ordering spec and returns None.
                cursor = cursor.sort(queryset._ordering)
            docs = await cursor.to_list()
        for doc in docs:
            yield MapReduceDocument(
                queryset._document, queryset._collection, doc["_id"],
                doc["value"]
            ) 
    async def sum(self, field):
        """Sum over the values of the specified field.
        :param field: the field to sum over; use dot-notation to refer to
            embedded document fields
        This method uses the aggregation framework instead of map-reduce,
        which makes it more performant than a map-reduce based sum.
        """
        cursor = await self._document._get_collection().aggregate([
            {'$match': self._query},
            {'$group': {'_id': 'sum', 'total': {'$sum': '$' + field}}}
        ])
        r = 0
        async for doc in cursor:
            r = doc['total']
            break
        return r 
    async def distinct(self, field):
        """Return a list of distinct values for a given field.
        :param field: the field to select distinct values from
        .. note:: This is a command and won't take ordering or limit into
           account.
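
        Example (a minimal sketch; ``Post`` is a hypothetical Document)::

            # unique tags among published posts
            tags = await Post.objects(published=True).distinct('tags')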
        """
        queryset = self.clone()
        try:
            field = self._fields_to_dbfields([field]).pop()
        except LookUpError:
            pass
        raw_values = await queryset._cursor.distinct(field)
        if not self._auto_dereference:
            return raw_values
        distinct = await self._dereference(
            raw_values, 1, name=field, instance=self._document)
        doc_field = self._document._fields.get(field.split(".", 1)[0])
        instance = None
        # We may need to cast to the correct type eg.
        # ListField(EmbeddedDocumentField)
        EmbeddedDocumentField = _import_class("EmbeddedDocumentField")
        ListField = _import_class("ListField")
        GenericEmbeddedDocumentField = _import_class(
            "GenericEmbeddedDocumentField")
        if isinstance(doc_field, ListField):
            doc_field = getattr(doc_field, "field", doc_field)
        if isinstance(doc_field, (EmbeddedDocumentField,
                                  GenericEmbeddedDocumentField)):
            instance = getattr(doc_field, "document_type", None)
        # handle distinct on subdocuments
        if "." in field:
            for field_part in field.split(".")[1:]:
                # if looping on embedded document, get the document
                # type instance
                if instance and isinstance(
                    doc_field, (EmbeddedDocumentField,
                                GenericEmbeddedDocumentField)
                ):
                    doc_field = instance
                # now get the subdocument
                doc_field = getattr(doc_field, field_part, doc_field)
                # We may need to cast to the correct type eg.
                # ListField(EmbeddedDocumentField)
                if isinstance(doc_field, ListField):
                    doc_field = getattr(doc_field, "field", doc_field)
                if isinstance(
                    doc_field, (EmbeddedDocumentField,
                                GenericEmbeddedDocumentField)
                ):
                    instance = getattr(doc_field, "document_type", None)
        if instance and isinstance(
            doc_field, (EmbeddedDocumentField, GenericEmbeddedDocumentField)
        ):
            distinct = [instance(**doc) for doc in distinct]
        return distinct 
    async def modify(
        self,
        upsert=False,
        remove=False,
        new=False,
        array_filters=None,
        **update,
    ):
        """Update and return the updated document.
        Returns either the document before or after modification, based on
        the `new` parameter. If no documents match the query and `upsert`
        is false, returns ``None``. If upserting and `new` is false,
        returns ``None``.
        :param upsert: insert if document doesn't exist (default ``False``)
        :param remove: remove rather than updating (default ``False``)
        :param new: return the updated rather than the original document
            (default ``False``)
        :param array_filters: A list of filters specifying to which array
            elements an update should apply.
        :param update: Django-style update keyword arguments
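
        Example (a minimal sketch; ``Page`` is a hypothetical Document)::

            # atomically increments the view count, returning the
            # updated document
            page = await Page.objects(slug='home').modify(
                new=True, inc__views=1)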
        """
        if remove and new:
            raise OperationError("Conflicting parameters: remove and new")
        if not update and not upsert and not remove:
            raise OperationError(
                "No update parameters, must either update or remove")
        if self._none or self._empty:
            return None
        queryset = self.clone()
        query = queryset._query
        if not remove:
            update = transform.update(queryset._document, **update)
        sort = queryset._ordering
        try:
            if remove:
                result = await queryset._collection.find_one_and_delete(
                    query, sort=sort, **self._cursor_args
                )
            else:
                if new:
                    return_doc = ReturnDocument.AFTER
                else:
                    return_doc = ReturnDocument.BEFORE
                result = await queryset._collection.find_one_and_update(
                    query,
                    update,
                    upsert=upsert,
                    sort=sort,
                    return_document=return_doc,
                    array_filters=array_filters,
                    **self._cursor_args,
                )
        except pymongo.errors.DuplicateKeyError as err:
            raise NotUniqueError("Update failed (%s)" % err)
        except pymongo.errors.OperationFailure as err:
            raise OperationError("Update failed (%s)" % err)
        if result is not None:
            result = self._document._from_son(result)
        return result 
    async def explain(self):
        """Return an explain plan record for the
        :class:`~mongoengine.queryset.QuerySet` cursor.
        """
        return await self._cursor.explain() 
    @property
    def fetch_next(self):
        """Proxies ``fetch_next`` on the underlying Motor cursor."""
        return self._cursor.fetch_next
    def next_object(self):
        raw = self._cursor.next_object()
        return self._document._from_son(
            raw, _auto_dereference=self._auto_dereference) 
    def no_cache(self):
        """Convert to a non-caching queryset
        """
        if self._result_cache is not None:
            raise OperationError('QuerySet already cached')
        return self._clone_into(QuerySetNoCache(self._document,
                                                self._collection)) 
    def _get_code(self, func):
        f_scope = {}
        if isinstance(func, Code):
            f_scope = func.scope
            func = str(func)
        func = Code(self._sub_js_fields(func), f_scope)
        return func
    async def _check_delete_rules(self, doc, queryset, cascade_refs,
                                  write_concern):
        """Checks the delete rules for documents being deleted in a queryset.
        Raises an exception if any document has a DENY rule."""
        delete_rules = doc._meta.get('delete_rules') or {}
        # Check for DENY rules before actually deleting/nullifying any other
        # references
        delete_rules = delete_rules.copy()
        fields = [d async for d in self]
        for rule_entry in delete_rules:
            document_cls, field_name = rule_entry
            if document_cls._meta.get('abstract'):
                continue
            rule = doc._meta['delete_rules'][rule_entry]
            if rule == DENY:
                # count() is a coroutine here, so it must be awaited
                # before the comparison; use the materialized documents
                # rather than the unevaluated queryset for the __in query.
                refs_count = await document_cls.objects(
                    **{field_name + '__in': fields}).count()
                if refs_count > 0:
                    msg = ("Could not delete document (%s.%s refers to it)"
                           % (document_cls.__name__, field_name))
                    raise OperationError(msg)
        if not delete_rules:
            return
        r = None
        for rule_entry in delete_rules:
            document_cls, field_name = rule_entry
            if document_cls._meta.get('abstract'):
                continue
            rule = doc._meta['delete_rules'][rule_entry]
            if rule == CASCADE:
                cascade_refs = set() if cascade_refs is None else cascade_refs
                async for ref in queryset:
                    cascade_refs.add(ref.id)
                ref_q = document_cls.objects(**{field_name + '__in': fields,
                                                'id__nin': cascade_refs})
                count = await ref_q.count()
                if count > 0:
                    r = await ref_q.delete(write_concern=write_concern,
                                           cascade_refs=cascade_refs)
            elif rule in (NULLIFY, PULL):
                if rule == NULLIFY:
                    updatekw = {'unset__%s' % field_name: 1}
                else:
                    updatekw = {'pull_all__%s' % field_name: fields}
                r = await document_cls.objects(
                    **{field_name + '__in': fields}).update(
                        write_concern=write_concern, **updatekw)
        return r
    async def _document_delete(self, queryset, write_concern):
        """Delete the documents in queryset by calling the document's delete
        method."""
        cnt = 0
        async for doc in queryset:
            await doc.delete(**write_concern)
            cnt += 1
        return cnt
    def _get_map_reduce_args(self, queryset, finalize_f, scope, limit, output):
        mr_args = {"query": queryset._query}
        if finalize_f:
            finalize_f_scope = {}
            if isinstance(finalize_f, Code):
                finalize_f_scope = finalize_f.scope
                finalize_f = str(finalize_f)
            finalize_f_code = queryset._sub_js_fields(finalize_f)
            finalize_f = Code(finalize_f_code, finalize_f_scope or None)
            mr_args["finalize"] = finalize_f
        if scope:
            mr_args["scope"] = scope
        if limit:
            mr_args["limit"] = limit
        if output == "inline" and not queryset._ordering:
            inline = True
            mr_args["out"] = {"inline": 1}
        else:
            inline = False
            if isinstance(output, str):
                mr_args["out"] = output
            elif isinstance(output, dict):
                ordered_output = []
                for part in ("replace", "merge", "reduce"):
                    value = output.get(part)
                    if value:
                        ordered_output.append((part, value))
                        break
                else:
                    raise OperationError("actionData not specified for output")
                db_alias = output.get("db_alias")
                remaining_args = ["db", "sharded", "nonAtomic"]
                if db_alias:
                    ordered_output.append(("db", get_db(db_alias).name))
                    del remaining_args[0]
                for part in remaining_args:
                    value = output.get(part)
                    if value:
                        ordered_output.append((part, value))
                mr_args["out"] = SON(ordered_output)
        return mr_args, inline 
class QuerySetNoCache(QuerySet):
    """A non caching QuerySet"""
    def cache(self):
        """Convert to a caching queryset
        """
        return self._clone_into(QuerySet(self._document, self._collection)) 
    def __iter__(self):
        queryset = self
        if queryset._iter:
            queryset = self.clone()
        queryset.rewind()
        return queryset