python - How to use Celery for inserting data into MongoDB using MongoEngine
I am trying to use Celery to insert a large amount of data into my MongoDB database, but I have a problem with concurrency. If I send more than one task to Celery at a time, only part of the data is inserted into MongoDB and the rest is missing. I think this is because MongoDB locks the database while an insert is running, but I need a solution that lets me send several tasks of the same type to insert data into the database, for example one that checks whether the database is locked and waits for it to unlock. Here is part of my code:
    @celery.task(name='celery_tasks.add_book_product')
    def add_book_product(product_dict, store_id):
        connect(DefaultConfig.MONGODB_DB, host=DefaultConfig.MONGODB_HOST)
        try:
            store_obj = Store.objects.get(pk=store_id)
            book = Books.objects.get(pk=product_dict['rawbook'])
            try:
                # Update the product if this store already has one for the book.
                product_obj = Product.objects.get(store=store_obj, related_book=book, kind='book')
                print("product {} found for store {}".format(product_obj.id, store_obj.id))
                product_obj.count = int(product_dict['count'])
                product_obj.buy_price = int(product_dict['buy_book'])
                product_obj.sell_price = int(product_dict['sell_book'])
                product_obj.save()
            except (DoesNotExist, ValidationError):
                # Otherwise create the product and attach it to the store.
                product_obj = Product(store=store_obj,
                                      related_book=book,
                                      kind='book',
                                      count=int(product_dict['count']),
                                      buy_price=int(product_dict['buy_book']),
                                      sell_price=int(product_dict['sell_book']),
                                      name=book.name_fa)
                product_obj.save()
                print("Appending book to store obj ...")
                store_obj.products.append(product_obj)
                store_obj.save()
                print("Appended to store obj")
            return "product {} saved for store {}".format(product_obj.id, store_obj.id)
        except (DoesNotExist, ValidationError):
            traceback.print_exc()
            return "Product with raw book {} does not exist.".format(product_dict['rawbook'])
By default, Celery uses multiprocessing to execute tasks concurrently. But there are two ways to make sure that only one task is executed at any given time.
Solution 1: When you start a Celery worker with

    celery -A your_app worker -l info

the default concurrency is equal to the number of cores in your machine. So if you start a worker like this

    celery -A your_app worker -l info -c 1

then it runs only one task at a time. If you have other tasks that need to be executed, you can start a new queue and assign a dedicated worker to it.
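The separate-queue idea could be sketched as a small Celery configuration module. The module name `celeryconfig.py` and the queue name `mongo_writes` are assumptions made for illustration; `task_routes` is a standard Celery setting, and the task name is the one from the question.

```python
# celeryconfig.py (hypothetical module name), loaded in your app with
# app.config_from_object("celeryconfig").

# Route the MongoDB insert task to its own queue.
# "mongo_writes" is a made-up queue name; any name works as long as the
# worker started for this queue uses the same one.
task_routes = {
    "celery_tasks.add_book_product": {"queue": "mongo_writes"},
}
```

A worker limited to one process can then consume only that queue, for example `celery -A your_app worker -l info -c 1 -Q mongo_writes`, while a second worker started without `-c 1` handles the remaining tasks concurrently.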
Solution 2: This one is a bit more complicated. You have to use a lock in your task, something like this.
    if acquire_lock():
        try:
            pass  # do something
        finally:
            release_lock()
    return

You can read more about this.
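One possible way to fill in `acquire_lock` and `release_lock` is a lock file, sketched below using only the standard library. This is an assumption for illustration, not what the answer prescribes: the lock path and the `insert_exclusively` helper are hypothetical, and a file lock only protects workers running on the same machine. Workers spread over several hosts would need a shared store instead.

```python
import os
import tempfile

# Hypothetical lock file location; every worker process on this machine
# must use the same path for the lock to have any effect.
LOCK_PATH = os.path.join(tempfile.gettempdir(), "add_book_product.lock")


def acquire_lock(path=LOCK_PATH):
    """Try to take the lock; return True on success, False if it is held."""
    try:
        # O_CREAT | O_EXCL makes creation atomic: it fails if the file exists.
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    os.close(fd)
    return True


def release_lock(path=LOCK_PATH):
    """Release the lock by removing the lock file."""
    os.remove(path)


def insert_exclusively(do_insert, path=LOCK_PATH):
    """Run do_insert() only if no other process currently holds the lock."""
    if acquire_lock(path):
        try:
            return do_insert()
        finally:
            release_lock(path)
    return None  # lock was busy; the caller decides whether to retry
```

Inside a task, the busy case could be handled by retrying later, for instance with `self.retry(countdown=...)` on a task declared with `bind=True`. Note that a worker that crashes while holding the lock leaves the file behind, so a real setup would also need some form of expiry.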