2016-12-22 27 views
6

PostgreSQL ve SQLAlchemy'yi çocuk süreçlerini başlatan bir ana işlemden oluşan bir projede kullanıyorum. Tüm bu işlemler SQLAlchemy ile veritabanına erişir.SQLAlchemy ve birden çok işlemle bağlantı sorunları

Tekrarlanabilir bağlantı hataları yaşıyorum: İlk birkaç alt işlem doğru şekilde çalışıyor, ancak bir süre sonra bir bağlantı hatası ortaya çıkıyor. İşte bir MWCE: benim sistemde

from sqlalchemy.ext.declarative import declarative_base 
from sqlalchemy import Column, Integer, create_engine 
from sqlalchemy.orm import sessionmaker 

DB_URL = 'postgresql://user:[email protected]/database' 

Base = declarative_base() 

class Dummy(Base): 
    __tablename__ = 'dummies' 
    id = Column(Integer, primary_key=True) 
    value = Column(Integer) 

engine = None 
Session = None 
session = None 

def init(): 
    global engine, Session, session 
    engine = create_engine(DB_URL) 
    Base.metadata.create_all(engine) 
    Session = sessionmaker(bind=engine) 
    session = Session() 

def cleanup(): 
    session.close() 
    engine.dispose() 

def target(id): 
    init() 
    try: 
     dummy = session.query(Dummy).get(id) 
     dummy.value += 1 
     session.add(dummy) 
     session.commit() 
    finally: 
     cleanup() 

def main(): 
    init() 
    try: 
     dummy = Dummy(value=1) 
     session.add(dummy) 
     session.commit() 
     p = multiprocessing.Process(target=target, args=(dummy.id,)) 
     p.start() 
     p.join() 
     session.refresh(dummy) 
     assert dummy.value == 2 
    finally: 
     cleanup() 

if __name__ == '__main__': 
    i = 1 
    while True: 
     print(i) 
     main() 
     i += 1 

(PostgreSQL 9.6, SQLAlchemy 1.1.4, psycopg2 2.6.2, Python 2.7, Ubuntu 14.04) bu

1 
2 
3 
4 
5 
6 
7 
8 
9 
10 
11 
Traceback (most recent call last): 
    File "./fork_test.py", line 64, in <module> 
    main() 
    File "./fork_test.py", line 55, in main 
    session.refresh(dummy) 
    File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1422, in refresh 
    only_load_props=attribute_names) is None: 
    File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 223, in load_on_ident 
    return q.one() 
    File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2756, in one 
    ret = self.one_or_none() 
    File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2726, in one_or_none 
    ret = list(self) 
    File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2797, in __iter__ 
    return self._execute_and_instances(context) 
    File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2820, in _execute_and_instances 
    result = conn.execute(querycontext.statement, self._params) 
    File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 945, in execute 
    return meth(self, multiparams, params) 
    File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection 
    return connection._execute_clauseelement(self, multiparams, params) 
    File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement 
    compiled_sql, distilled_params 
    File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context 
    context) 
    File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception 
    exc_info 
    File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause 
    reraise(type(exception), exception, tb=exc_tb, cause=cause) 
    File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context 
    context) 
    File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 469, in do_execute 
    cursor.execute(statement, parameters) 
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly 
    This probably means the server terminated abnormally 
    before or while processing the request. 
[SQL: 'SELECT dummies.id AS dummies_id, dummies.value AS dummies_value \nFROM dummies \nWHERE dummies.id = %(param_1)s'] [parameters: {'param_1': 11074}] 

Bu tekrarlanabilir ve her zaman en çöküyor verir aynı iterasyon.

SQLAlchemy documentation ve elsewhere tarafından önerilen şekilde çataldan sonra yeni bir motor ve oturum oluşturuyorum. İlginçtir, aşağıdaki biraz farklı bir yaklaşım çökmez:

import contextlib 
import multiprocessing 

import sqlalchemy 
from sqlalchemy.ext.declarative import declarative_base 
from sqlalchemy import Column, Integer, create_engine 
from sqlalchemy.orm import sessionmaker 

DB_URL = 'postgresql://user:[email protected]/database' 

Base = declarative_base() 

class Dummy(Base): 
    __tablename__ = 'dummies' 
    id = Column(Integer, primary_key=True) 
    value = Column(Integer) 

@contextlib.contextmanager 
def get_session(): 
    engine = sqlalchemy.create_engine(DB_URL) 
    Base.metadata.create_all(engine) 
    Session = sessionmaker(bind=engine) 
    session = Session() 
    try: 
     yield session 
    finally: 
     session.close() 
     engine.dispose() 

def target(id): 
    with get_session() as session: 
     dummy = session.query(Dummy).get(id) 
     dummy.value += 1 
     session.add(dummy) 
     session.commit() 

def main(): 
    with get_session() as session: 
     dummy = Dummy(value=1) 
     session.add(dummy) 
     session.commit() 
     p = multiprocessing.Process(target=target, args=(dummy.id,)) 
     p.start() 
     p.join() 
     session.refresh(dummy) 
     assert dummy.value == 2 

if __name__ == '__main__': 
    i = 1 
    while True: 
     print(i) 
     main() 
     i += 1 

orijinal kod daha karmaşıktır ve sadece ben anlamak istiyorum ikincisi sürüme yanına açık edilemez yana neden bu eserlerin diğeri doesn 't.

Sadece belirgin fark, kilitleme kodunun motor ve oturum için global değişkenleri kullanmasıdır - bunlar çocuk süreçleri ile yazma üzerine yazılır. Ancak, onları çataldan hemen sonra sıfırladığımdan, bunun nasıl bir sorun olabileceğini anlamıyorum.

Güncelleme

Python 2.7 ve Python 3.4 ikisini de kullanarak son sqlalchemy (1.1.5) ile iki kod parçalarını yeniden koştu. Her iki sonuçta da temelde yukarıda açıklandığı gibi. Bununla birlikte, Python 2.7'de, ilk kod parçasının çarpışması şimdi 13. tekrarda (tekrarlanabilir) olurken, 3.4'te üçüncü yinelemede (tekrarlanabilir şekilde) zaten gerçekleşir. İkinci kod parçası, her iki sürümde sorunsuz çalışır. İşte 3.4 den traceback var: Burada

1 
2 
3 
Traceback (most recent call last): 
    File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context 
    context) 
    File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute 
    cursor.execute(statement, parameters) 
psycopg2.OperationalError: server closed the connection unexpectedly 
    This probably means the server terminated abnormally 
    before or while processing the request. 


The above exception was the direct cause of the following exception: 

Traceback (most recent call last): 
    File "fork_test.py", line 64, in <module> 
    main() 
    File "fork_test.py", line 55, in main 
    session.refresh(dummy) 
    File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 1424, in refresh 
    only_load_props=attribute_names) is None: 
    File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/loading.py", line 223, in load_on_ident 
    return q.one() 
    File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2749, in one 
    ret = self.one_or_none() 
    File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2719, in one_or_none 
    ret = list(self) 
    File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2790, in __iter__ 
    return self._execute_and_instances(context) 
    File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2813, in _execute_and_instances 
    result = conn.execute(querycontext.statement, self._params) 
    File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 945, in execute 
    return meth(self, multiparams, params) 
    File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection 
    return connection._execute_clauseelement(self, multiparams, params) 
    File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement 
    compiled_sql, distilled_params 
    File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context 
    context) 
    File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception 
    exc_info 
    File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause 
    reraise(type(exception), exception, tb=exc_tb, cause=cause) 
    File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 186, in reraise 
    raise value.with_traceback(tb) 
    File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context 
    context) 
    File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute 
    cursor.execute(statement, parameters) 
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly 
    This probably means the server terminated abnormally 
    before or while processing the request. 
[SQL: 'SELECT dummies.id AS dummies_id, dummies.value AS dummies_value \nFROM dummies \nWHERE dummies.id = %(param_1)s'] [parameters: {'param_1': 3397}] 

PostgreSQL günlüğü var (o 2.7'ye için aynı ve 3.4):
2017-01-18 10:59:36 UTC [22429-1] LOG: database system was shut down at 2017-01-18 10:59:35 UTC 
2017-01-18 10:59:36 UTC [22429-2] LOG: MultiXact member wraparound protections are now enabled 
2017-01-18 10:59:36 UTC [22428-1] LOG: database system is ready to accept connections 
2017-01-18 10:59:36 UTC [22433-1] LOG: autovacuum launcher started 
2017-01-18 10:59:36 UTC [22435-1] [unknown]@[unknown] LOG: incomplete startup packet 
2017-01-18 11:00:10 UTC [22466-1] [email protected] LOG: SSL error: decryption failed or bad record mac 
2017-01-18 11:00:10 UTC [22466-2] [email protected] LOG: could not receive data from client: Connection reset by peer 

(Not eksik başlangıç ​​paketinizde is harmless hakkında mesajı)

+0

Hangi sürüm python ile çalışıyorsunuz? –

+0

@ThomasMoreau: "PostgreSQL 9.6, SQLAlchemy 1.1.4, psycopg2 2.6.2, Python 2.7, Ubuntu 14.04";) –

+0

Örnekte gösterdiğiniz gibi aynı bağlantı parametrelerini kullanıyor musunuz? Gerçek kodunuzda özel bir bağlantı havuzu uygulaması var mı? – Nicolai

cevap

9

Ekleme ile "How do I use engines/connections/sessions with Python multiprocessing, or os.fork()?" numaralı alıntı:

ve

However, for the case of a transaction-active Session or Connection being shared, there’s no automatic fix for this; an application needs to ensure a new child process only initiate new Connection objects and transactions, as well as ORM Session objects.

sorunu Connection tutan yok ki canlı küresel session devralan çatallı çocuk süreç kaynaklanıyor. target, init'u çağırdığında, engine ve session numaralı global başvuruların üzerine yazar ve böylece yeniden sayımlarını çocukta 0'a düşürerek, sonuçlandırılmasını zorlar. Örneğin, bir şekilde çocuğunuzdaki miras kalan oturuma başka bir referans oluşturursanız, bunun temizlenmesini engellersiniz - ama bunu yapmayın. main'un birleştirildiği ve her zamanki gibi işletmeye döndükten sonra, şimdi potansiyel olarak sonlandırılmış veya senkronize olmayan bağlantıyı kullanmaya çalışıyor.Bunun neden bir hataya neden olduğu konusunda bir miktar iterasyondan sonra emin değilim.

tek yolu

çatallanmasını önce tüm oturumları kapat

  1. için
  2. Çağrısı engine.dispose()

yapmanız yoludur globalsi kullanarak bu durumun üstesinden gelmek için. Bu, bağlantıların çocuğa sızmasını önleyecektir. Örneğin:

def main(): 
    global session 
    init() 
    try: 
     dummy = Dummy(value=1) 
     session.add(dummy) 
     session.commit() 
     dummy_id = dummy.id 
     # Return the Connection to the pool 
     session.close() 
     # Dispose of it! 
     engine.dispose() 
     # ...or call your cleanup() function, which does the same 
     p = multiprocessing.Process(target=target, args=(dummy_id,)) 
     p.start() 
     p.join() 
     # Start a new session 
     session = Session() 
     dummy = session.query(Dummy).get(dummy_id) 
     assert dummy.value == 2 
    finally: 
     cleanup() 

Kişisel İkinci örnek çocukta tamamlanmasına tetiklemez, ve yüzden sadece ilk olarak kırık olarak olabilir ama yine de oturumun bir kopyasını miras olarak, iş gibi görünüyor ve bağlantısı yerel olarak main'da tanımlanmıştır.

İlgili konular