ile bellek taşması ben önemsiz bir kıvılcım programı var. Girişi, bir satırda bir dosyaya böldüm. Yani eminim ki bu geleneksel hafıza baskısı değil.KryoException: Çok küçük girişi
Exception in thread "main" com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 32749568, required: 34359296
at com.esotericsoftware.kryo.io.Output.require(Output.java:138)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at carbonite.serializer$write_map.invoke(serializer.clj:69)
Ben spark.kryoserializer.buffer.mb
ayarlayabilirsiniz, ancak ben sadece sorunu ertelenmesi düşünüyorum. Bunu anlamak isterim.
Program hakkında standart dışı bir şey olduğunu sanmıyorum. Tek bir satırı (rasgele görünüyor) kaldırırsam hata gider. Ben sabit sınırı çeşit isabet gibiyim
görünüyor. Ama Girdi dosyası çok küçük ve benim yaptığım tek operasyonlar Sanıyorum maps
ve reduceByKey
s öngörülebilir olması başka bir şey yukarıda.
Ben Flambo Clojure 0.4.0 kitaplığı kullanıyorum (ama bu, sebep sanmıyorum) ve Core 2.10 Spark.
Burada asgari çalışma örnek. Üzgünüm biraz şifreli ama ben herşeyi yok ettim. İki parçalar halinde bu bölünmüş ve tembel dosya akışı yeniden oluşturursanız
(ns mytest.core
(:require [flambo.conf :as conf])
(:require [flambo.api :as f]))
(def sc (f/spark-context (-> (conf/spark-conf)
(conf/master "local")
(conf/app-name "test")
(conf/set "spark.driver.memory" "1g")
(conf/set "spark.executor.memory" "1g"))))
(defn -main
[& args]
(let [logfile (f/text-file sc "file://tmp/one-line-file")
a (f/map logfile (f/fn [u] nil))
b (f/map logfile (f/fn [u] nil))
c (f/map logfile (f/fn [u] nil))
d (f/map logfile (f/fn [u] nil))
e (f/map logfile (f/fn [u] nil))
g (f/map logfile (f/fn [u] nil))
h (f/map logfile (f/fn [u] nil))
i (f/map logfile (f/fn [u] nil))
j (f/map logfile (f/fn [u] nil))
k (f/map logfile (f/fn [u] nil))
l (f/map logfile (f/fn [u] nil))
m (f/map logfile (f/fn [u] nil))
n (f/map logfile (f/fn [u] nil))
o (f/map logfile (f/fn [u] nil))
p (f/map logfile (f/fn [u] nil))
q (f/map logfile (f/fn [u] nil))
r (f/map logfile (f/fn [u] nil))
s (f/map logfile (f/fn [u] nil))
t (f/map logfile (f/fn [u] nil))
]))
DÜZENLEME
, çalışır:
(defn get-inputs []
(f/text-file sc "file://tmp/one-line-file"))
(defn -main
[& args]
(let [logfile (get-inputs)
a (f/map logfile (f/fn [u] nil))
b (f/map logfile (f/fn [u] nil))
c (f/map logfile (f/fn [u] nil))
d (f/map logfile (f/fn [u] nil))
e (f/map logfile (f/fn [u] nil))
g (f/map logfile (f/fn [u] nil))
h (f/map logfile (f/fn [u] nil))
i (f/map logfile (f/fn [u] nil))])
(let [logfile (get-inputs)
j (f/map logfile (f/fn [u] nil))
k (f/map logfile (f/fn [u] nil))
l (f/map logfile (f/fn [u] nil))
m (f/map logfile (f/fn [u] nil))
n (f/map logfile (f/fn [u] nil))
o (f/map logfile (f/fn [u] nil))
p (f/map logfile (f/fn [u] nil))
q (f/map logfile (f/fn [u] nil))
r (f/map logfile (f/fn [u] nil))
s (f/map logfile (f/fn [u] nil))
t (f/map logfile (f/fn [u] nil))]))
Java'da bu ikisini yaratma eşdeğer olacaktır yerel kapsamlar (örneğin iki ayrı yöntem). Ve get-inputs
sadece yeni oluşturulmuş bir metin dosyası nesnesini döndüren bir yöntemdir.
textFile
yönteminin, birden çok kez (yeniden) okunabilen bir tembel akış oluşturacağını düşündüm, bu nedenle iki örnek çok farklı olmamalıdır. Bunu yapmış
conf.set("spark.kryoserializer.buffer.mb","128")
Teşekkür, ama bu benim soruya cevap vermez: – Joe
Dosya muhtemelen çok büyük olduğu için. Başka bir sebep bellekte/disk birleştirmeye yazılabilir. –