2012-03-08 12 views
21

Veritabanı oturumlarımın düzgün bir şekilde nasıl açılıp kapatılacağını anlamakta zorluk çekiyorum, squmchunch belgelerine göre anladım, eğer Scoped_session'ı Oturum nesnemi oluşturmak için kullanırsam ve sonra da Session nesnesini kullanırsam oturum oluşturmak için nesne, threadafe, temelde her iş parçacığı kendi oturumunu alacak ve bununla ilgili herhangi bir sorun olmayacak. Şimdi aşağıdaki örnek çalışıyor, oturumları düzgün bir şekilde kapatıp kapatmayacağını ve doğru bir şekilde izlediğimi (mysql içinde "GÖSTERGE PROSESİ LİSTESİ" ni yürütürken) izleyen sonsuz bir döngüye koydum, bağlantılar büyümeye devam ediyor, kapatmıyor , session.close() işlevini kullanmamıza ve her çalışmanın sonunda scoped_session nesnesini bile kaldırmamıza rağmen. Neyi yanlış yapıyorum? Daha büyük bir uygulamadaki amacım, gereken en az sayıda veritabanı bağlantısını kullanmaktır çünkü mevcut çalışma uygulamam gerekli olduğu her yöntemde yeni bir oturum oluşturur ve geri dönmeden önce kapatır, bu da verimsiz gibi görünür.SQLAlchemy çoklu iş parçacıklı uygulamalarda düzgün oturum işleme

from sqlalchemy import create_engine 
from sqlalchemy.orm import sessionmaker, scoped_session 
from threading import Thread 
from Queue import Queue, Empty as QueueEmpty 
from models import MyModel 


DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname' 


class MTWorker(object): 

    def __init__(self, worker_count=5): 
     self.task_queue = Queue() 
     self.worker_count = worker_count 
     self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) 
     self.DBSession = scoped_session(
      sessionmaker(
       autoflush=True, 
       autocommit=False, 
       bind=self.db_engine 
      ) 
     ) 

    def _worker(self): 
     db_session = self.DBSession() 
     while True: 
      try: 
       task_id = self.task_queue.get(False) 
       try: 
        item = db_session.query(MyModel).filter(MyModel.id == task_id).one() 
        # do something with item 
       except Exception as exc: 
        # if an error occurrs we skip it 
        continue 

       finally: 
        db_session.commit() 
        self.task_queue.task_done() 
      except QueueEmpty: 
       db_session.close() 
       return 

    def start(self): 
     try: 
      db_session = self.DBSession() 
      all_items = db_session.query(MyModel).all() 
      for item in all_items: 
       self.task_queue.put(item.id) 

      for _i in range(self.worker_count): 
       t = Thread(target=self._worker) 
       t.start() 

      self.task_queue.join() 
     finally: 
      db_session.close() 
      self.DBSession.remove() 


if __name__ == '__main__': 
    while True: 
     mt_worker = MTWorker(worker_count=50) 
     mt_worker.start() 

cevap

36

yalnızca (veritabanı başına) süreci bir kez create_engine ve scoped_session arayarak edilmelidir. Her biri kendi bağlantı havuzunu veya oturumlarını (sırasıyla) alır, böylece yalnızca bir havuzu oluşturduğunuzdan emin olmak istersiniz. Sadece küresel bir modül seviyesi yap. Daha preciesly bundan daha da oturumları yönetmek için gerekirse, muhtemelen yapmak için scoped_session

diğer değişiklik kullanarak edilmemelidir bir oturumu sanki doğrudan DBSession kullanmaktır. scoped_session'daki oturum yöntemlerini çağırmak, saydam bir şekilde, gerekirse bir iş parçacığı oturumu oluşturur ve yöntem çağrısını oturumuna iletir. varsayılan olarak 5'tir bağlantı havuzun pool_size değildir

Başka bir şey

farkında olmak. Birçok uygulama için, iyi olduğunu, ancak iş parçacığı sürü oluşturuyorsanız, bunu gerçekten çok yardımcı oldu, parametre

DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname' 
db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) 
DBSession = scoped_session(
    sessionmaker(
     autoflush=True, 
     autocommit=False, 
     bind=db_engine 
    ) 
) 


class MTWorker(object): 

    def __init__(self, worker_count=5): 
     self.task_queue = Queue() 
     self.worker_count = worker_count 
# snip 
+1

bilgiler için teşekkür ederiz bu melodinin gerekebilir. Kral saygılar! – andrean

İlgili konular