Veritabanı oturumlarımın düzgün bir şekilde nasıl açılıp kapatılacağını anlamakta zorluk çekiyorum, squmchunch belgelerine göre anladım, eğer Scoped_session'ı Oturum nesnemi oluşturmak için kullanırsam ve sonra da Session nesnesini kullanırsam oturum oluşturmak için nesne, threadafe, temelde her iş parçacığı kendi oturumunu alacak ve bununla ilgili herhangi bir sorun olmayacak. Şimdi aşağıdaki örnek çalışıyor, oturumları düzgün bir şekilde kapatıp kapatmayacağını ve doğru bir şekilde izlediğimi (mysql içinde "GÖSTERGE PROSESİ LİSTESİ" ni yürütürken) izleyen sonsuz bir döngüye koydum, bağlantılar büyümeye devam ediyor, kapatmıyor , session.close() işlevini kullanmamıza ve her çalışmanın sonunda scoped_session nesnesini bile kaldırmamıza rağmen. Neyi yanlış yapıyorum? Daha büyük bir uygulamadaki amacım, gereken en az sayıda veritabanı bağlantısını kullanmaktır çünkü mevcut çalışma uygulamam gerekli olduğu her yöntemde yeni bir oturum oluşturur ve geri dönmeden önce kapatır, bu da verimsiz gibi görünür.SQLAlchemy çoklu iş parçacıklı uygulamalarda düzgün oturum işleme
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from threading import Thread
from Queue import Queue, Empty as QueueEmpty
from models import MyModel
DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname'
class MTWorker(object):
def __init__(self, worker_count=5):
self.task_queue = Queue()
self.worker_count = worker_count
self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
self.DBSession = scoped_session(
sessionmaker(
autoflush=True,
autocommit=False,
bind=self.db_engine
)
)
def _worker(self):
db_session = self.DBSession()
while True:
try:
task_id = self.task_queue.get(False)
try:
item = db_session.query(MyModel).filter(MyModel.id == task_id).one()
# do something with item
except Exception as exc:
# if an error occurrs we skip it
continue
finally:
db_session.commit()
self.task_queue.task_done()
except QueueEmpty:
db_session.close()
return
def start(self):
try:
db_session = self.DBSession()
all_items = db_session.query(MyModel).all()
for item in all_items:
self.task_queue.put(item.id)
for _i in range(self.worker_count):
t = Thread(target=self._worker)
t.start()
self.task_queue.join()
finally:
db_session.close()
self.DBSession.remove()
if __name__ == '__main__':
while True:
mt_worker = MTWorker(worker_count=50)
mt_worker.start()
bilgiler için teşekkür ederiz bu melodinin gerekebilir. Kral saygılar! – andrean