2016-04-19 25 views
19

değişkenine erişmeniz gerekiyor Bu forumda gerçekten yeni bir kullanıcıyım. Ama şirketimiz için bir süredir hava akışıyla oynuyordum. Üzgünüm bu soru gerçekten aptalca geliyorsa.Hava akışında execution_date:

BashOperators grubunu kullanarak bir boru hattı yazıyorum.

from airflow import DAG 
from airflow.operators import BashOperator, PythonOperator 
from dateutil import tz 
import datetime 

datetime_obj = datetime.datetime 

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()), 
    'email': ['[email protected]'], 
    'email_on_failure': True, 
    'email_on_retry': False, 
    'retries': 2, 
    'retry_delay': datetime.timedelta(minutes=5), 
} 


current_datetime = datetime_obj.now(tz=tz.tzlocal()) 

dag = DAG(
    'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60)) 

curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"' 


t1 = BashOperator(
    task_id='rest-api-1', 
    bash_command=curl_cmd, 
    dag=dag) 

ben current_datetime= datetime_obj.now(tz=tz.tzlocal()) yapıyorum fark ederseniz: Temelde, her Görev için, ben sadece 'curl'

Bu benim boru hattı (çok basitleştirilmiş sürümü) neye benzediği kullanarak REST API aramak istediğiniz yerine benim burada istiyorum olduğunu 'execution_date' ı doğrudan 'execution_date' kullanmak ve bir değişkene atamak nasıl

i n python dosyam?

Bu genel erişim yayma sorununu yaşıyorum. Herhangi bir yardım gerçekten takdir edilecektir.

Teşekkür

cevap

13

PythonOperator yapıcı bir 'provide_context' parametre alır (https://pythonhosted.org/airflow/code.html bakınız). True ise, bir dizi parametreyi python_callable'a kwargs yoluyla geçirir. kwargs ['execution_date'] istediğin şey, inanıyorum. Böyle

şey:

def python_method(ds, **kwargs): 
    Variable.set('execution_date', kwargs['execution_date']) 
    return 

doit = PythonOperator(
    task_id='doit', 
    provide_context=True, 
    python_callable=python_method, 
    dag=dag) 

Ben BashOperator ile bunu nasıl emin değilim ama bu konu ile başlayabilir: https://github.com/airbnb/airflow/issues/775

+2

Teşekkür yürütmek() yöntemi içinde olmalıdır. Bu yaklaşımla, bir pythonOperator örneğidir, yani cost_datetime = 'execution_date' ayarlayacağım ve döndüreceğim kwargs ['execution_date'] 'i kullanmamı sağlayan allow_context = true ile bir görev t1'im olacaktır. Sonra görevimi oluşturuyorum t2: BashOperator: içinde (XCOM kullanarak) çekip değişkenlerimi kullanacağım. Yani görüyorsunuz, 2 görev oluşturmalıyım. hangi seksi değil;) Eminim (ve umarım haklıyım) PythonOperator kullanmadan doğrudan python koduna 'execution_date' erişebileceğim bir yol var. Ama nasıl yapılacağını anlayamıyorum :( – Roger

+0

bu yaklaşım benim için çözdü! – Nico

13

BashOperator bash_command argüman bir şablondur. Herhangi bir şablonda execution_date öğesine execution_date değişkenini kullanarak datetime nesnesi olarak erişebilirsiniz. Şablonda, onu işlemek için herhangi bir jinja2 yöntemini kullanabilirsiniz. Örneğin, BashOperator bash_command dizesi olarak aşağıdaki kullanma: sadece yapıldıkları tarihten dize eşdeğer istiyorsanız

# pass in the first of the current month 
some_command.sh {{ execution_date.replace(day=1) }} 

# last day of previous month 
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }} 

, ds (tire olmadan bir datestamp (YYYY-AA-GG), ds-nodash döner aynı dönecektir YYYYMMDD, vb. Makrolar hakkında daha fazla bilgi için Api Docs numaralı telefonu kullanabilirsiniz.

+0

Bu, doğru cevaptır.Görenin tamamlanmış bir sürümünü göstermek için onu düzenlerim, örneğin, 't1 = BashOperator ( task_id = 'rest-api-1', bash_command = 'curl -XPOST' '+ hostname +': 8000/run? st = {{execution_date}} "', dag = dag)' – Davos

0

execution_date, (datetime.datetime)

{{ execution_date }} 
1

Sana bir görev hesabı dışındaki hava akışı bağlamdan değerlerle değişkenleri atayamazsınız düşünüyorum, bunlar çalışma anında kullanılabilir.

  • Önce dag dosya yorumlanır ve ayrıştırılır: Temelde bir dag hava akımı yüklenen ve yürütülür 2 farklı adımlar vardır. Çalışmak ve derlemek zorundadır ve görev tanımları doğru olmalıdır (sözdizimi hatası veya hiçbir şey). Bu adımda, bazı değerleri doldurmak için işlev çağrıları yaparsanız, bu işlevler hava akışı içeriğine erişemez (örneğin, yürütme tarihi, örneğin, bir miktar geri doldurma işlemi yapıyorsanız).

  • İkinci adım dag uygulamasıdır.Sadece bu ikinci adımda, hava akışı (execution_date, ds, etc...) tarafından sağlanan değişkenler, sistemin çalıştırılmasıyla ilgili oldukları için kullanılabilir.

Yani Hava akımı bağlamı kullanarak küresel değişkenleri başlatılamıyor, ancak, Hava akımı aynı etkiyi elde etmek için size birden mekanizmalarını verir: sizin komuta jinja şablonunu kullanarak

  1. (o olabilir kodda veya dosyada bir dizede, her ikisi de işlenecektir). Kullanılabilir şablonların listesi: https://airflow.apache.org/code.html#default-variables. Bazı işlevlerin, özellikle günler arası delta ve tarih biçimlendirmesi için kullanılabilir olduğunu unutmayın.

  2. İçeriği ilettiğiniz bir PythonOperator kullanma (provide_context bağımsız değişkeni ile). Bu, aynı şablona kwargs['<variable_name'] sözdizimi ile erişmenizi sağlayacaktır. Buna ihtiyacınız varsa, bir PythonOperator'dan bir değer döndürebilirsiniz, bu herhangi bir şablonda daha sonra kullanabileceğiniz bir XCOM değişkeninde saklanır. XCOM değişkenlerine erişim, bu sözdizimini kullanır:

  3. Kendi operatörünüzü yazarsanız, dict context ile hava akımı değişkenlerine erişebilirsiniz.

+1

Teknik olarak 3 yol var. jinja şablonunu kullanarak, bir python_callable içindeki kwargs kullanarak veya bir operatördeki context ['execution_date'] 'i kullanarak. Bu cevabı tamamen kaldırmak veya en azından en iyisi silmek için en iyisi – Davos

+1

Bu cevabı yazdığımdan beri hava akışı hakkında çok şey öğrendim, daha doğru ve hassas hale getirmek için düzenledim! – Babcool

+0

İlk özet durumunuzu yapmak için bazı küçük düzenlemeler yaptım. Aşağıdaki 2 puan ile tutarlı. Ekstra puan için daha fazla kod örneği ekleyebilmenize rağmen, bu yanıtın doğru olduğunu düşünüyorum. – Davos

1
def execute(self, context): 
    execution_date = context.get("execution_date") 

Bu Operatör

+0

Özel bir operatör oluşturuyorsanız, muhtemelen bu ne istediğinizi. –

İlgili konular