Güncelleme: Bu benim hatamın muhtemelen Spark ve/veya Hive'u nasıl kurduğumdan kaynaklanıyor gibi görünüyor. Pencere fonksiyonları ile çalışmak bir Databricks (hosted) defterinde oldukça basit görünüyor. Bunu yerel olarak nasıl ayarlayacağımı bulmalıyım.Spark 1.5.2'de HiveContext kullanılarak yapılan bir PySpark DataFrame ürününü nasıl edinebilirim?
Pencere işlevini kullanmam gereken bir Spark DataFrame'i var. * here üzerindeki yönergeleri izlemeye çalıştım, ancak bazı sorunlara rastladım.
benim ortamı oluşturulması: Bir DataFrame içine json açmak için
test_ts = {'adminDistrict': None,
'city': None,
'country': {'code': 'NA', 'name': 'UNKNOWN'},
'data': [{'timestamp': '2005-08-25T00:00:00Z', 'value': 369.89},
{'timestamp': '2005-08-26T00:00:00Z', 'value': 362.44},
{'timestamp': '2005-08-29T00:00:00Z', 'value': 368.3},
{'timestamp': '2005-08-30T00:00:00Z', 'value': 382.6},
{'timestamp': '2005-08-31T00:00:00Z', 'value': 377.84},
{'timestamp': '2005-09-01T00:00:00Z', 'value': 380.74},
{'timestamp': '2005-09-02T00:00:00Z', 'value': 370.33},
{'timestamp': '2005-09-05T00:00:00Z', 'value': 370.33},
{'timestamp': '2005-09-06T00:00:00Z', 'value': 361.5},
{'timestamp': '2005-09-07T00:00:00Z', 'value': 352.79},
{'timestamp': '2005-09-08T00:00:00Z', 'value': 354.3},
{'timestamp': '2005-09-09T00:00:00Z', 'value': 353.0},
{'timestamp': '2005-09-12T00:00:00Z', 'value': 349.35},
{'timestamp': '2005-09-13T00:00:00Z', 'value': 348.82},
{'timestamp': '2005-09-14T00:00:00Z', 'value': 360.24},
{'timestamp': '2005-09-15T00:00:00Z', 'value': 357.61},
{'timestamp': '2005-09-16T00:00:00Z', 'value': 347.14},
{'timestamp': '2005-09-19T00:00:00Z', 'value': 370.0},
{'timestamp': '2005-09-20T00:00:00Z', 'value': 362.82},
{'timestamp': '2005-09-21T00:00:00Z', 'value': 366.11},
{'timestamp': '2005-09-22T00:00:00Z', 'value': 364.46},
{'timestamp': '2005-09-23T00:00:00Z', 'value': 351.8},
{'timestamp': '2005-09-26T00:00:00Z', 'value': 360.74},
{'timestamp': '2005-09-27T00:00:00Z', 'value': 356.63},
{'timestamp': '2005-09-28T00:00:00Z', 'value': 363.64},
{'timestamp': '2005-09-29T00:00:00Z', 'value': 366.05}],
'maxDate': '2015-12-28T00:00:00Z',
'minDate': '2005-08-25T00:00:00Z',
'name': 'S&P GSCI Crude Oil Spot',
'offset': 0,
'resolution': 'DAY',
'sources': ['trf'],
'subtype': 'Index',
'type': 'Commodities',
'uid': 'TRF_INDEX_Z39824_PI'}
bir işlevi:
import os
import sys
import datetime as dt
os.environ["SPARK_HOME"] = '/usr/bin/spark-1.5.2'
os.environ["PYTHONPATH"] = '/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip'
sys.path.append('/usr/bin/spark-1.5.2/python')
sys.path.append('/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip')
import pyspark
sc = pyspark.SparkContext()
hiveContext = pyspark.sql.HiveContext(sc)
sqlContext = pyspark.sql.SQLContext(sc)
from pyspark.sql import Row
from pyspark.sql.functions import struct
from pyspark.sql import DataFrame
from collections import OrderedDict
benim verilerini ayarlama bir dataframe alınıyor
def ts_to_df(ts):
data = []
for line in ts['data']:
data.append((dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), line['value']))
return sc.parallelize(data).toDF(['Date', ts['name'].replace('&', '').replace(' ', '_')])
ve alma İçinde ne bir bakış:
bana bu hata veriyorfrom pyspark.sql.functions import lag, col, lead
from pyspark.sql.window import Window
w = Window().partitionBy().orderBy(col('Date'))
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show()
: Bu bana gösteriyor
test_df = ts_to_df(test_ts)
test_df.show()
: Ben yaptığım hakkında hiçbir fikrim yok ve her şey ters gitmeye başladığı burada
+----------+----------------------+
| Date|SP_GSCI_Crude_Oil_Spot|
+----------+----------------------+
|2005-08-25| 369.89|
|2005-08-26| 362.44|
|2005-08-29| 368.3|
|2005-08-30| 382.6|
|2005-08-31| 377.84|
|2005-09-01| 380.74|
|2005-09-02| 370.33|
|2005-09-05| 370.33|
|2005-09-06| 361.5|
|2005-09-07| 352.79|
|2005-09-08| 354.3|
|2005-09-09| 353.0|
|2005-09-12| 349.35|
|2005-09-13| 348.82|
|2005-09-14| 360.24|
|2005-09-15| 357.61|
|2005-09-16| 347.14|
|2005-09-19| 370.0|
|2005-09-20| 362.82|
|2005-09-21| 366.11|
+----------+----------------------+
Ve geçerli:
Py4JJavaError: An error occurred while calling o59.select. : org.apache.spark.sql.AnalysisException: Could not resolve window function 'lead'. Note that, using window functions currently requires a HiveContext;
Bu yüzden bir HiveContext'e ihtiyacım var gibi görünüyor, değil mi? DataFrame'imi bir HiveContext kullanarak oluşturmam gerekiyor mu?
def ts_to_hive_df(ts):
data = []
for line in ts['data']:
data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(),
ts['name'].replace('&', '').replace(' ', '_'):line['value']})
temp_rdd = sc.parallelize(data).map(lambda x: Row(**x))
return hiveContext.createDataFrame(temp_rdd)
test_df = ts_to_hive_df(test_ts)
test_df.show()
Ama bu bana bu hata veriyor: O zaman HiveContext kullanarak açıkça DataFrame oluşturmaya çalışalım
TypeError: 'JavaPackage' object is not callable
Peki nasıl Pencere işlevleri kullanılır? Bir HiveContext kullanarak DataFrames oluşturmalı mıyım? Eğer öyleyse, bunu nasıl yaparım? Birisi bana yanlış yaptığımı söyleyebilir mi?
* Verilerim arasında boşluk olup olmadığını bilmem gerekiyor. "Tarih" sütununa ve Tarih'e göre sıralanmış her satır için, sonraki satırda ne olduğunu bilmek istiyorum. Günler veya kötü veriler eksikse, o gündeki son günün verilerini kullanmak istiyorum. Bunu yapmanın daha iyi bir yolunu biliyorsanız, bana bildirin. Ama hala bu Pencere işlevlerinin nasıl çalıştığını bilmek istiyorum.
Maalesef olsun. Özel kod eklendi. Umarım bu bizi bir yere götürür. Bir göz attığın için teşekkürler. – Nathaniel
Pekala, yerel olarak kurulu olan Spark (veya Hive?) Aygıtımın nasıl bir şeyle karıştırılacağı gibi görünüyor, çünkü bunu bir DataBricks defterinde çalıştırabilirim. DataBricks kendi HiveContexts veya SQLContexts yapmamızı istemiyor. Orada çalışmak için kendi içeriklerimin oluşturulmasını bıraktım ve hiveContext'i sqlContext ile değiştirerek yukarıdaki ts_to_hive_df işlevini kullandım. Sonunda kendi kurulumumda çalışmam gerekecek. Geri gelip çözdüğümde bir çözüm yazacağım. – Nathaniel
Spark serilerinin Hive desteği olmadan yapıldığı anlaşılıyor. – zero323