Parametreyi Airflow'taki bağımlı görevlere aktarmanın yolu nedir? Birçok bashes dosyası var ve bu yaklaşımı hava akışına geçirmeye çalışıyorum, ancak bazı özellikleri görevler arasında nasıl geçeceğimi bilmiyorum.Hava akışı parametreleri bağımlı göreve geçiş
Bu gerçek bir örnektir: t2 olarak
#sqoop bash template
sqoop_template = """
sqoop job --exec {{params.job}} -- --target-dir {{params.dir}} --outdir /src/
"""
s3_template = """
s3-dist-cp --src= {{params.dir}} "--dest={{params.s3}}
"""
#Task of extraction in EMR
t1 = BashOperator(
task_id='extract_account',
bash_command=sqoop_template,
params={'job': 'job', 'dir': 'hdfs:///account/' + time.now().strftime("%Y-%m-%d-%H-%M-%S")},
dag=dag)
#Task to upload in s3 backup.
t2 = BashOperator(
task_id='s3_upload',
bash_command=s3_template,
params={}, #here i need the dir name created in t1
depends_on_past=True
)
t2.set_upstream(t1)
i t1 oluşturulan dir name erişmek gerekir.
gelişmeler bekliyoruz yüzden değil kesin çözüm Çözüm
#Execute a valid job sqoop
def sqoop_import(table_name, job_name):
s3, hdfs = dirpath(table_name)
sqoop_job = job_default_config(job_name, hdfs)
#call(sqoop_job)
return {'hdfs_dir': hdfs, 's3_dir': s3}
def s3_upload(**context):
hdfs = context['task_instance'].xcom_pull(task_ids='sqoop_import')['hdfs_dir']
s3 = context['task_instance'].xcom_pull(task_ids='sqoop_import')['s3_dir']
s3_cpdist_job = ["s3-dist-cp", "--src=%s" % (hdfs), "--dest=%s" % (s3)]
#call(s3_cpdist_job)
return {'s3_dir': s3} #context['task_instance'].xcom_pull(task_ids='sqoop_import')
def sns_notify(**context):
s3 = context['task_instance'].xcom_pull(task_ids='distcp_s3')['s3_dir']
client = boto3.client('sns')
arn = 'arn:aws:sns:us-east-1:744617668409:pipeline-notification-stg'
response = client.publish(TargetArn=arn, Message=s3)
return response
. Teşekkürler.
Bir çözüm, bence, t1'de oluşturulan özelliklerle bir dosya oluşturmak ve aynı dosyayı t2'de kullanmaktır. –