2016-10-03 40 views
10

Yeryüzündeki insanlara merhaba! Spark görevlerini planlamak ve çalıştırmak için Airflow kullanıyorum. Tüm bu zamana kadar bulduğum Airflow'un yönetebileceği python DAG'leri.
DAG örnek:Hava akışında Spark kodu nasıl çalıştırılır?

spark_count_lines.py 
import logging 

from airflow import DAG 
from airflow.operators import PythonOperator 

from datetime import datetime 

args = { 
    'owner': 'airflow' 
    , 'start_date': datetime(2016, 4, 17) 
    , 'provide_context': True 
} 

dag = DAG(
    'spark_count_lines' 
    , start_date = datetime(2016, 4, 17) 
    , schedule_interval = '@hourly' 
    , default_args = args 
) 

def run_spark(**kwargs): 
    import pyspark 
    sc = pyspark.SparkContext() 
    df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt') 
    logging.info('Number of lines in people.txt = {0}'.format(df.count())) 
    sc.stop() 

t_main = PythonOperator(
    task_id = 'call_spark' 
    , dag = dag 
    , python_callable = run_spark 
) 

Sorun Python kodunda iyi değilim ve Java ile yazılmış bazı görevleri olması. Benim sorum Python DAG içinde Spark Java kavanoz çalıştırmak nasıl? Ya da belki başka bir yolu var mı? Kıvılcım gönderdim: http://spark.apache.org/docs/latest/submitting-applications.html
Ama her şeyi nasıl birbirine bağlayacağımı bilmiyorum. Belki birisi daha önce kullandı ve çalışma örneği var. Zaman ayırdığın için teşekkürler!

cevap

9

BashOperator kullanabilmeniz gerekir.

from airflow.operators.bash_operator import BashOperator 

import os 
import sys 

seti gerekli yolları:

os.environ['SPARK_HOME'] = '/path/to/spark/root' 
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin')) 

ve eklemek operatörü: Kolayca bu uzatabilirsiniz

spark_task = BashOperator(
    task_id='spark_java', 
    bash_command='spark-submit --class {{ params.class }} {{ params.jar }}', 
    params={'class': 'MainClassName', 'jar': '/path/to/your.jar'}, 
    dag=dag 
) 

olduğu gibi kodunuzun geri kalanı, ithalat gerekli sınıf ve sistem paketleri tutulması Jinja şablonlarını kullanarak ek argümanlar sağlamak.

Sen elbette, örneğin sizin durumunuzda uygun bir şablonla bash_command değiştirerek olmayan Spark senaryo için bu ayarlayabilirsiniz:

bash_command = 'java -jar {{ params.jar }}' 

ve params ayarlanması. 1.8 sürümü (serbest bugün) itibariyle

6

Hava akışı,

sahiptir

SparkSQLHook kodu - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py

SparkSubmitHook kodu - Bu iki yeni Spark operatörleri/kancalar itibariyle "katkı" dalında olduğu https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py

Bildirimi 1.8 sürümü yani (iyi) belgelenmiştir.

Bu yüzden SparkSubmitOperator'ı, java kodunuzu Spark yürütme için göndermek için kullanabilirsiniz.

+0

SparkSQLOperator, ihtiyacım olan şey gibi görünüyor - ancak, bağlantı dizesinin neye benzemesi gerektiğini bilmediğim için işe yaramayacağım - bu konuda bana yardımcı olabilecek herhangi bir belge var mı? –

+0

Bunu yapmazsanız - bağlantı varsayılan olarak idam moduna geçer - bkz. Https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py#L33 – Tagar

İlgili konular