2016-07-25 14 views
6

Parametreyi Airflow'taki bağımlı görevlere aktarmanın yolu nedir? Birçok bashes dosyası var ve bu yaklaşımı hava akışına geçirmeye çalışıyorum, ancak bazı özellikleri görevler arasında nasıl geçeceğimi bilmiyorum.Hava akışı parametreleri bağımlı göreve geçiş

Bu gerçek bir örnektir: t2 olarak

#sqoop bash template 
sqoop_template = """ 
     sqoop job --exec {{params.job}} -- --target-dir {{params.dir}} --outdir /src/ 
    """ 

s3_template = """ 
     s3-dist-cp --src= {{params.dir}} "--dest={{params.s3}} 
    """ 



#Task of extraction in EMR 
t1 = BashOperator(
     task_id='extract_account', 
     bash_command=sqoop_template, 
     params={'job': 'job', 'dir': 'hdfs:///account/' + time.now().strftime("%Y-%m-%d-%H-%M-%S")}, 
     dag=dag) 
#Task to upload in s3 backup. 
t2 = BashOperator(
     task_id='s3_upload', 
     bash_command=s3_template, 
     params={}, #here i need the dir name created in t1 
     depends_on_past=True 
    ) 

t2.set_upstream(t1) 

i t1 oluşturulan dir name erişmek gerekir.

gelişmeler bekliyoruz yüzden değil kesin çözüm Çözüm

#Execute a valid job sqoop 
def sqoop_import(table_name, job_name): 
    s3, hdfs = dirpath(table_name) 
    sqoop_job = job_default_config(job_name, hdfs) 
    #call(sqoop_job) 
    return {'hdfs_dir': hdfs, 's3_dir': s3} 

def s3_upload(**context): 
    hdfs = context['task_instance'].xcom_pull(task_ids='sqoop_import')['hdfs_dir'] 
    s3 = context['task_instance'].xcom_pull(task_ids='sqoop_import')['s3_dir'] 
    s3_cpdist_job = ["s3-dist-cp", "--src=%s" % (hdfs), "--dest=%s" % (s3)] 
    #call(s3_cpdist_job) 
    return {'s3_dir': s3} #context['task_instance'].xcom_pull(task_ids='sqoop_import') 

def sns_notify(**context): 
    s3 = context['task_instance'].xcom_pull(task_ids='distcp_s3')['s3_dir'] 
    client = boto3.client('sns') 
    arn = 'arn:aws:sns:us-east-1:744617668409:pipeline-notification-stg' 
    response = client.publish(TargetArn=arn, Message=s3) 
    return response 

. Teşekkürler.

+0

Bir çözüm, bence, t1'de oluşturulan özelliklerle bir dosya oluşturmak ve aynı dosyayı t2'de kullanmaktır. –

cevap

İlgili konular