2

PySpark'ta kullanmak için Scala'da yazılmış bir UDF (veya işlevi) kaydetmek mümkün mü? Ör:PySpark'ta kullanmak için Scala'dan SqlContext'e UDF'yi kaydet

Scala yılında
val mytable = sc.parallelize(1 to 2).toDF("spam") 
mytable.registerTempTable("mytable") 
def addOne(m: Integer): Integer = m + 1 
// Spam: 1, 2 

aşağıdaki artık mümkün:

val UDFaddOne = sqlContext.udf.register("UDFaddOne", addOne _) 
val mybiggertable = mytable.withColumn("moreSpam", UDFaddOne(mytable("spam"))) 
// Spam: 1, 2 
// moreSpam: 2, 3 

Ben

%pyspark 

mytable = sqlContext.table("mytable") 
UDFaddOne = sqlContext.udf("UDFaddOne") # does not work 
mybiggertable = mytable.withColumn("+1", UDFaddOne(mytable("spam"))) # does not work 

Arka Plan gibi PySpark içinde "UDFaddOne" kullanmak istiyorum: Biz bir takımız Geliştiricilerin, bazılarının Scala'da ve bazılarının Python'da kodlanması ve önceden yazılmış işlevleri paylaşmak istiyoruz. Bunu bir kütüphaneye kaydetmek ve ithal etmek de mümkündür.

cevap

3

Bildiğim kadarıyla PySpark, callUDF işlevinin herhangi bir eşdeğerini sağlamaz ve bu nedenle, doğrudan kayıtlı UDF'ye erişmek mümkün değildir. Eğer daha karmaşık iş akışlarını desteklemek için gerekirse bir paket oluşturmak ve eksiksiz Python sarmalayıcıları vermelidir böylece

mytable.withColumn("moreSpam", expr("UDFaddOne({})".format("spam"))) 

## OR 
sqlContext.sql("SELECT *, UDFaddOne(spam) AS moreSpam FROM mytable") 

## OR 
mytable.selectExpr("*", "UDFaddOne(spam) AS moreSpam") 

Bu yaklaşım oldukça sınırlıdır:

burada basit çözüm ham SQL ifadesini kullanmaktır. Bulabilir ve Spark: How to map Python with Scala or Java User Defined Functions?

+0

Bu ve diğer cevaplarınız için teşekkür ederim - önerdiğin gibi çözdüm! – Andarin

3

cevabım örnek UDAF sarıcı edeceğiz aşağıdaki benim için çalıştı (temelde zero323 tarafından sağlanan bağlantının yanı sıra birden fazla yerde bir özet):

scala olarak:

package com.example 
import org.apache.spark.sql.functions.udf 

object udfObj extends Serializable { 
    def createUDF = { 
    udf((x: Int) => x + 1) 
    } 
} 
python

(sc kıvılcım bağlam olduğunu varsayalım Eğer kıvılcım 2.0 kullanıyorsanız, kıvılcım oturumdan bunu elde edebilirsiniz.):

from py4j.java_gateway import java_import 
from pyspark.sql.column import Column 

jvm = sc._gateway.jvm 
java_import(jvm, "com.example") 
def udf_f(col): 
    return Column(jvm.com.example.udfObj.createUDF().apply(col)) 

Ve tabii

s yapmak Ure Scala oluşturulan kavanoz --jars ve --driver sınıf-yolu

Burada ne olur kullanılarak eklenir:

Biz scala UDF döndüren bir seri hale getirilebilir nesnesinin içinde bir işlev oluşturmak (Ben % 100 emin değil Serializable gereklidir, daha karmaşık UDF için benim için gerekliydi, bu yüzden java nesnelerini geçmesi gerekiyordu).

Python'da, dahili jvm'ye erişim (bu özel bir üyedir, bu nedenle gelecekte değiştirilebildiğim halde bunu göremiyorum) ve paketimizi java_import kullanarak içe aktarıyoruz. createUDF işlevine erişiriz ve onu çağırırız. Bu, uygulama yöntemine sahip bir nesne oluşturur (scala'daki işlevler, gerçekleme yöntemiyle birlikte java nesneleridir). Başvuru yönteminin girdisi bir sütundur. Sütunun uygulanmasının sonucu yeni bir sütun olduğundan, Sütun yöntemiyle kullanılabilir hale getirmek için Sütun yöntemiyle sarmamız gerekir.

+0

Bu gerçekten biraz hacky görünüyor ve test kodundan başka bir şekilde kullanacağımdan emin değilim, ama iç işleyişi biraz daha anladım, bu yüzden bunun için teşekkürler! – Andarin

+0

pysaprk 2.1.1, üzerinde çalıştırmak için denedim Şu hatayı alıyorum: 'TypeError: 'Column' nesnesinin 'callable değil' kıvılcım kod tabanı, sütun nesnesini oluşturamaz gibi görünüyor –

+0

Bu temelde anlamına gelir İthalatla ilgili bir sorun var.ya kavanozlar sınıf yolunda değil ya da isim yanlış ya da benzer bir şey. –