2015-12-29 16 views
6

Güncelleme: Bu benim hatamın muhtemelen Spark ve/veya Hive'u nasıl kurduğumdan kaynaklanıyor gibi görünüyor. Pencere fonksiyonları ile çalışmak bir Databricks (hosted) defterinde oldukça basit görünüyor. Bunu yerel olarak nasıl ayarlayacağımı bulmalıyım.Spark 1.5.2'de HiveContext kullanılarak yapılan bir PySpark DataFrame ürününü nasıl edinebilirim?

Pencere işlevini kullanmam gereken bir Spark DataFrame'i var. * here üzerindeki yönergeleri izlemeye çalıştım, ancak bazı sorunlara rastladım.

benim ortamı oluşturulması: Bir DataFrame içine json açmak için

test_ts = {'adminDistrict': None, 
'city': None, 
'country': {'code': 'NA', 'name': 'UNKNOWN'}, 
'data': [{'timestamp': '2005-08-25T00:00:00Z', 'value': 369.89}, 
    {'timestamp': '2005-08-26T00:00:00Z', 'value': 362.44}, 
    {'timestamp': '2005-08-29T00:00:00Z', 'value': 368.3}, 
    {'timestamp': '2005-08-30T00:00:00Z', 'value': 382.6}, 
    {'timestamp': '2005-08-31T00:00:00Z', 'value': 377.84}, 
    {'timestamp': '2005-09-01T00:00:00Z', 'value': 380.74}, 
    {'timestamp': '2005-09-02T00:00:00Z', 'value': 370.33}, 
    {'timestamp': '2005-09-05T00:00:00Z', 'value': 370.33}, 
    {'timestamp': '2005-09-06T00:00:00Z', 'value': 361.5}, 
    {'timestamp': '2005-09-07T00:00:00Z', 'value': 352.79}, 
    {'timestamp': '2005-09-08T00:00:00Z', 'value': 354.3}, 
    {'timestamp': '2005-09-09T00:00:00Z', 'value': 353.0}, 
    {'timestamp': '2005-09-12T00:00:00Z', 'value': 349.35}, 
    {'timestamp': '2005-09-13T00:00:00Z', 'value': 348.82}, 
    {'timestamp': '2005-09-14T00:00:00Z', 'value': 360.24}, 
    {'timestamp': '2005-09-15T00:00:00Z', 'value': 357.61}, 
    {'timestamp': '2005-09-16T00:00:00Z', 'value': 347.14}, 
    {'timestamp': '2005-09-19T00:00:00Z', 'value': 370.0}, 
    {'timestamp': '2005-09-20T00:00:00Z', 'value': 362.82}, 
    {'timestamp': '2005-09-21T00:00:00Z', 'value': 366.11}, 
    {'timestamp': '2005-09-22T00:00:00Z', 'value': 364.46}, 
    {'timestamp': '2005-09-23T00:00:00Z', 'value': 351.8}, 
    {'timestamp': '2005-09-26T00:00:00Z', 'value': 360.74}, 
    {'timestamp': '2005-09-27T00:00:00Z', 'value': 356.63}, 
    {'timestamp': '2005-09-28T00:00:00Z', 'value': 363.64}, 
    {'timestamp': '2005-09-29T00:00:00Z', 'value': 366.05}], 
'maxDate': '2015-12-28T00:00:00Z', 
'minDate': '2005-08-25T00:00:00Z', 
'name': 'S&P GSCI Crude Oil Spot', 
'offset': 0, 
'resolution': 'DAY', 
'sources': ['trf'], 
'subtype': 'Index', 
'type': 'Commodities', 
'uid': 'TRF_INDEX_Z39824_PI'} 

bir işlevi:

import os 
import sys 
import datetime as dt 

os.environ["SPARK_HOME"] = '/usr/bin/spark-1.5.2' 
os.environ["PYTHONPATH"] = '/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip' 
sys.path.append('/usr/bin/spark-1.5.2/python') 
sys.path.append('/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip') 

import pyspark 
sc = pyspark.SparkContext() 
hiveContext = pyspark.sql.HiveContext(sc) 
sqlContext = pyspark.sql.SQLContext(sc) 
from pyspark.sql import Row 
from pyspark.sql.functions import struct 
from pyspark.sql import DataFrame 
from collections import OrderedDict 

benim verilerini ayarlama bir dataframe alınıyor

def ts_to_df(ts): 
    data = [] 
    for line in ts['data']: 
     data.append((dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), line['value'])) 
    return sc.parallelize(data).toDF(['Date', ts['name'].replace('&', '').replace(' ', '_')]) 

ve alma İçinde ne bir bakış:

bana bu hata veriyor

from pyspark.sql.functions import lag, col, lead 
from pyspark.sql.window import Window 

w = Window().partitionBy().orderBy(col('Date')) 
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show() 

: Bu bana gösteriyor

test_df = ts_to_df(test_ts) 
test_df.show() 

: Ben yaptığım hakkında hiçbir fikrim yok ve her şey ters gitmeye başladığı burada

+----------+----------------------+ 
|  Date|SP_GSCI_Crude_Oil_Spot| 
+----------+----------------------+ 
|2005-08-25|    369.89| 
|2005-08-26|    362.44| 
|2005-08-29|     368.3| 
|2005-08-30|     382.6| 
|2005-08-31|    377.84| 
|2005-09-01|    380.74| 
|2005-09-02|    370.33| 
|2005-09-05|    370.33| 
|2005-09-06|     361.5| 
|2005-09-07|    352.79| 
|2005-09-08|     354.3| 
|2005-09-09|     353.0| 
|2005-09-12|    349.35| 
|2005-09-13|    348.82| 
|2005-09-14|    360.24| 
|2005-09-15|    357.61| 
|2005-09-16|    347.14| 
|2005-09-19|     370.0| 
|2005-09-20|    362.82| 
|2005-09-21|    366.11| 
+----------+----------------------+ 

Ve geçerli:

Py4JJavaError: An error occurred while calling o59.select. : org.apache.spark.sql.AnalysisException: Could not resolve window function 'lead'. Note that, using window functions currently requires a HiveContext;

Bu yüzden bir HiveContext'e ihtiyacım var gibi görünüyor, değil mi? DataFrame'imi bir HiveContext kullanarak oluşturmam gerekiyor mu?

def ts_to_hive_df(ts): 
    data = [] 
    for line in ts['data']: 
     data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), 
       ts['name'].replace('&', '').replace(' ', '_'):line['value']}) 
    temp_rdd = sc.parallelize(data).map(lambda x: Row(**x)) 
    return hiveContext.createDataFrame(temp_rdd) 

test_df = ts_to_hive_df(test_ts) 
test_df.show() 

Ama bu bana bu hata veriyor: O zaman HiveContext kullanarak açıkça DataFrame oluşturmaya çalışalım

TypeError: 'JavaPackage' object is not callable

Peki nasıl Pencere işlevleri kullanılır? Bir HiveContext kullanarak DataFrames oluşturmalı mıyım? Eğer öyleyse, bunu nasıl yaparım? Birisi bana yanlış yaptığımı söyleyebilir mi?

* Verilerim arasında boşluk olup olmadığını bilmem gerekiyor. "Tarih" sütununa ve Tarih'e göre sıralanmış her satır için, sonraki satırda ne olduğunu bilmek istiyorum. Günler veya kötü veriler eksikse, o gündeki son günün verilerini kullanmak istiyorum. Bunu yapmanın daha iyi bir yolunu biliyorsanız, bana bildirin. Ama hala bu Pencere işlevlerinin nasıl çalıştığını bilmek istiyorum.

+0

Maalesef olsun. Özel kod eklendi. Umarım bu bizi bir yere götürür. Bir göz attığın için teşekkürler. – Nathaniel

+1

Pekala, yerel olarak kurulu olan Spark (veya Hive?) Aygıtımın nasıl bir şeyle karıştırılacağı gibi görünüyor, çünkü bunu bir DataBricks defterinde çalıştırabilirim. DataBricks kendi HiveContexts veya SQLContexts yapmamızı istemiyor. Orada çalışmak için kendi içeriklerimin oluşturulmasını bıraktım ve hiveContext'i sqlContext ile değiştirerek yukarıdaki ts_to_hive_df işlevini kullandım. Sonunda kendi kurulumumda çalışmam gerekecek. Geri gelip çözdüğümde bir çözüm yazacağım. – Nathaniel

+1

Spark serilerinin Hive desteği olmadan yapıldığı anlaşılıyor. – zero323

cevap

0

Bu daha eski bir sorudur ve bu nedenle, muhtemelen Spark'in yeni sürümlerine geçtiğinizden emin olabilirsiniz. Kıvılcım 2.0'ı kendim çalıştırıyorum, bu yüzden hile yapıyor olabilir.

Ancak olası sorunlar: 2 olası sorun. İlk örnekte, her ikisi de aradığınızdan beri, .toDF()'un SQLContext için varsayılan olduğunu düşünüyorum. İkincisi, yeniden düzenlediğinizde, işlev içinde hivecontext'i çağırıyor olabilir miydiniz?

Fonksiyonunuzun dışında çağrılan hivecontext işlevinin olması için ikinci ts_to_df işlevinizi yeniden ayarlayabilirsem, her şey yolunda.

def ts_to_df(ts): 
    data = [] 
    for line in ts['data']: 
     data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), 
       ts['name'].replace('&', '').replace(' ', '_'):line['value']}) 
    return data 

data = ts_to_df(test_ts) 
test_rdd = sc.parallelize(data).map(lambda x: Row(**x)) 
test_df = hiveContext.createDataFrame(test_rdd) 

from pyspark.sql.functions import lag, col, lead 
from pyspark.sql.window import Window 

w = Window().partitionBy().orderBy(col('Date')) 
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show() 

Ben çıktı üretmesi

+----------+ 
| Next_Date| 
+----------+ 
|2005-08-26| 
|2005-08-29| 
|2005-08-30| 
|2005-08-31| 
|2005-09-01| 
|2005-09-02| 
.....