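How can Python talk to a Java JVM? The simplest way is RPC. The JVM acts as the RPC server and the Python app acts as the RPC client. The JVM opens a socket port to provide the service, and the Python app only needs to call the client interface that py4j provides. (Note that JavaGateway() only connects; it does not start a JVM, so a Java program must already be running.)
Below is a brief introduction to installing and using py4j.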
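To make the server/client split concrete without a JVM, here is a minimal sketch that uses Python's standard-library xmlrpc as a stand-in. It is only an analogy: py4j speaks its own protocol over the socket, and the helper names start_server and call_addition are hypothetical. AdditionApplication mirrors the Java class used below.

```python
"""Analogy only: xmlrpc stands in for py4j's own socket protocol."""
import threading
from xmlrpc.client import ServerProxy
from xmlrpc.server import SimpleXMLRPCServer


class AdditionApplication:
    """Plays the role of the Java entry point (the RPC server object)."""

    def addition(self, first, second):
        return first + second


def start_server():
    # Port 0 lets the OS pick a free port; py4j's GatewayServer uses 25333 by default.
    server = SimpleXMLRPCServer(("127.0.0.1", 0), logRequests=False)
    server.register_instance(AdditionApplication())
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def call_addition(port, a, b):
    # The client only needs the address and the method name.
    with ServerProxy(f"http://127.0.0.1:{port}/") as proxy:
        return proxy.addition(a, b)


if __name__ == "__main__":
    server = start_server()
    print(call_addition(server.server_address[1], 2, 7))  # 9
    server.shutdown()
    server.server_close()
```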
Install
- conda install py4j
- Locate the py4j jar: ${HOME}/anaconda3/share/py4j/py4j0.10.7.jar
Run the Java program
touch AdditionApplication.java
import py4j.GatewayServer;
public class AdditionApplication {
    public int addition(int first, int second) {
        return first + second;
    }
    public static void main(String[] args) {
        // The familiar RPC registration step: app becomes the gateway's entry point
        AdditionApplication app = new AdditionApplication();
        GatewayServer server = new GatewayServer(app);
        server.start();
    }
}
Compile and run AdditionApplication.java
javac -cp ${HOME}/anaconda3/share/py4j/py4j0.10.7.jar AdditionApplication.java
java -cp ${HOME}/anaconda3/share/py4j/py4j0.10.7.jar AdditionApplication
jps  # list running Java processes
lsof -p <pid>  # you should see a socket on port 25333 (LISTEN)
Run Python
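If lsof is not available, the same check can be done from Python. This is a small sketch assuming the gateway listens on localhost at py4j's default port 25333. is_listening is a hypothetical helper, not part of py4j.

```python
import socket

DEFAULT_PY4J_PORT = 25333  # GatewayServer's default port, as seen with lsof


def is_listening(port, host="127.0.0.1", timeout=1.0):
    """Return True if something accepts TCP connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, unreachable...
        return False


if __name__ == "__main__":
    print(is_listening(DEFAULT_PY4J_PORT))
```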
>>> from py4j.java_gateway import JavaGateway
>>> gateway = JavaGateway() # connect to the JVM
>>> random = gateway.jvm.java.util.Random() # create a java.util.Random instance
>>> number1 = random.nextInt(10) # call the Random.nextInt method
>>> number2 = random.nextInt(10)
>>> print(number1, number2)
2 7
>>> addition_app = gateway.entry_point # get the AdditionApplication instance
>>> gateway.help(gateway.jvm.AdditionApplication)
>>> value = addition_app.addition(number1, number2) # call the addition method
>>> print(value)
9
>>> gateway.help(addition_app) # help
>>> from py4j.java_gateway import java_import
>>> java_import(gateway.jvm,'java.util.*')
>>> jList = gateway.jvm.ArrayList()
>>> gateway.help(jList)
>>> addition_app
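Expressions such as gateway.jvm.java.util.Random() and the short names enabled by java_import work because gateway.jvm (a py4j JVMView) turns attribute access into a fully qualified class name. The toy below imitates that idea with Python modules so it runs without a JVM. ToyView and toy_import are hypothetical names; the real JVMView resolves names on the Java side, not with importlib.

```python
import importlib


class ToyView:
    """Hypothetical analogue of py4j's JVMView, not its implementation.

    Attribute access builds a dotted name; calling the result imports the
    module part and instantiates the class part.
    """

    def __init__(self, path="", imports=None):
        self._path = path
        # Shared list of registered patterns, like java_import entries.
        self._imports = [] if imports is None else imports

    def __getattr__(self, name):
        if name.startswith("_"):
            raise AttributeError(name)
        if not self._path:
            # At the root, try short names registered with toy_import first.
            resolved = self._resolve_short_name(name)
            if resolved is not None:
                return ToyView(resolved, self._imports)
        path = f"{self._path}.{name}" if self._path else name
        return ToyView(path, self._imports)

    def _resolve_short_name(self, name):
        for pattern in self._imports:
            package, _, last = pattern.rpartition(".")
            if last == name:
                return pattern
            if last == "*" and hasattr(importlib.import_module(package), name):
                return f"{package}.{name}"
        return None

    def __call__(self, *args, **kwargs):
        module_name, _, class_name = self._path.rpartition(".")
        cls = getattr(importlib.import_module(module_name), class_name)
        return cls(*args, **kwargs)


def toy_import(view, pattern):
    """Register 'package.Class' or 'package.*', like java_import(gateway.jvm, ...)."""
    view._imports.append(pattern)


if __name__ == "__main__":
    jvm = ToyView()
    od = jvm.collections.OrderedDict()   # fully qualified, like jvm.java.util.Random()
    toy_import(jvm, "collections.*")     # like java_import(gateway.jvm, 'java.util.*')
    dq = jvm.deque([1, 2])               # the short name now resolves
    print(type(od).__name__, list(dq))   # OrderedDict [1, 2]
```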
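Every object created by the Gateway Server carries a unique object id generated by the server, and the server keeps all the objects it has created in a Map. When the Python client wants to operate on a remote object, it sends the object id together with the command and its arguments to the server; the server looks up the object by its id and then executes the command via reflection.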
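The id-table-plus-reflection scheme can be modelled in a few lines. This is a toy, not py4j's implementation: ToyGatewayServer, the made-up "o0"-style ids and the rule that only primitives are returned by value are assumptions for illustration, with Python's getattr standing in for Java reflection.

```python
import itertools


class ToyGatewayServer:
    """Toy model of the object table described above (not py4j's code)."""

    def __init__(self, entry_point):
        self._objects = {}                  # object id -> object, the "Map"
        self._counter = itertools.count()
        self.entry_point_id = self.register(entry_point)

    def register(self, obj):
        obj_id = f"o{next(self._counter)}"  # made-up id format
        self._objects[obj_id] = obj
        return obj_id

    def invoke(self, obj_id, method_name, *args):
        target = self._objects[obj_id]          # find the object by id
        method = getattr(target, method_name)   # reflective lookup by name
        result = method(*args)
        # Assumption: primitives travel back by value; anything else stays on
        # the server and the client receives a new id (a remote reference).
        if result is None or isinstance(result, (bool, int, float, str)):
            return result
        return self.register(result)


class AdditionApplication:
    def addition(self, first, second):
        return first + second

    def new_list(self):
        return []


if __name__ == "__main__":
    server = ToyGatewayServer(AdditionApplication())
    print(server.invoke(server.entry_point_id, "addition", 2, 7))  # 9
    list_id = server.invoke(server.entry_point_id, "new_list")
    server.invoke(list_id, "append", 3)
    print(list_id, server.invoke(list_id, "__len__"))              # o1 1
```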
Py4J Memory model
Every time a Java object is sent to the Python side, a reference to the object is kept on the Java side (in the Gateway class). Once the object is garbage collected on the Python VM (reference count == 0), the reference is removed on the Java VM: if this was the last reference, the object will likely be garbage collected too. When a gateway is shut down, the remaining references are also removed on the Java VM.
Because Java objects on the Python side are involved in a circular reference (JavaObject and JavaMember reference each other), these objects are not immediately garbage collected once the last reference to the object is removed (but they are guaranteed to be eventually collected if the Python garbage collector runs before the Python program exits).
In doubt, users can always call the detach function on the Python gateway to explicitly delete a reference on the Java side. A call to gc.collect() also usually works.
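The interaction between the cycle and gc.collect() can be reproduced in plain Python. In this hypothetical sketch, ToyJavaSide stands for the references the Gateway keeps on the Java side, ToyJavaObject and ToyJavaMember imitate the JavaObject/JavaMember cycle, weakref.finalize plays the role of the "remove the Java-side reference" message, and detach() imitates the explicit release. None of these classes are py4j's.

```python
import gc
import weakref


class ToyJavaSide:
    """Stands in for the references the Gateway keeps on the Java side."""

    def __init__(self):
        self.references = set()

    def release(self, obj_id):
        self.references.discard(obj_id)


class ToyJavaMember:
    def __init__(self, owner, name):
        self.owner = owner   # member -> object
        self.name = name


class ToyJavaObject:
    """Python-side proxy that, like JavaObject, sits in a reference cycle."""

    def __init__(self, obj_id, java_side):
        self.obj_id = obj_id
        java_side.references.add(obj_id)
        self.member = ToyJavaMember(self, "toString")   # object -> member
        # When Python reclaims the proxy, tell the "Java side" to drop its ref.
        self._finalizer = weakref.finalize(self, java_side.release, obj_id)

    def detach(self):
        """Explicit release, in the spirit of gateway.detach(java_object)."""
        self._finalizer()


if __name__ == "__main__":
    gc.disable()                     # keep the automatic collector out of the demo
    java = ToyJavaSide()
    proxy = ToyJavaObject("o1", java)
    del proxy
    print("o1" in java.references)   # True: the cycle keeps the proxy alive
    gc.collect()
    print("o1" in java.references)   # False: the cycle collector reclaimed it
    other = ToyJavaObject("o2", java)
    other.detach()
    print("o2" in java.references)   # False: released explicitly
    gc.enable()
```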
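PySpark and py4j
In PySpark the Python SparkSession is a wrapper: the actual SparkSession lives in the JVM and is reached through py4j (self._jvm, self._jsparkSession).
class SparkSession(object):
    @ignore_unicode_prefix
    def __init__(self, sparkContext, jsparkSession=None):
        from pyspark.sql.context import SQLContext
        self._sc = sparkContext
        self._jsc = self._sc._jsc    # py4j proxy of the JavaSparkContext
        self._jvm = self._sc._jvm    # py4j view of the JVM (gateway.jvm)
        if jsparkSession is None:
            # create the JVM-side SparkSession through py4j
            jsparkSession = self._jvm.SparkSession(self._jsc.sc())
        self._jsparkSession = jsparkSession
        self._jwrapped = self._jsparkSession.sqlContext()
        self._wrapped = SQLContext(self._sc, self, self._jwrapped)
        _monkey_patch_RDD(self)
        install_exception_handler()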
How a SparkSession is created
SparkSession.builder.getOrCreate()
def getOrCreate(self):
    with self._lock:
        from pyspark.context import SparkContext
        from pyspark.conf import SparkConf
        session = SparkSession._instantiatedSession
        if session is None or session._sc._jsc is None:
            sparkConf = SparkConf()
            for key, value in self._options.items():
                sparkConf.set(key, value)
            sc = SparkContext.getOrCreate(sparkConf)
            # This SparkContext may be an existing one.
            for key, value in self._options.items():
                # we need to propagate the confs
                # before we create the SparkSession. Otherwise, confs like
                # warehouse path and metastore url will not be set correctly (
                # these confs cannot be changed once the SparkSession is created).
                sc._conf.set(key, value)
            session = SparkSession(sc)
        for key, value in self._options.items():
            session._jsparkSession.sessionState().conf().setConfString(key, value)
        for key, value in self._options.items():
            session.sparkContext._conf.set(key, value)
        return session
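The structure of getOrCreate can be summarised with a simplified model. It has a shared lock, a cached session, and options applied before creation and again afterwards. ToySession and ToyBuilder are hypothetical; conf is a plain dict, and a "stopped" flag replaces the check on session._sc._jsc.

```python
import threading


class ToySession:
    """Hypothetical stand-in for SparkSession; conf is a plain dict."""

    _instantiated = None

    def __init__(self, conf):
        self.conf = dict(conf)
        self.stopped = False


class ToyBuilder:
    _lock = threading.RLock()   # shared by all builders, like the Builder's _lock

    def __init__(self):
        self._options = {}

    def config(self, key, value):
        self._options[key] = value
        return self

    def getOrCreate(self):
        with self._lock:
            session = ToySession._instantiated
            if session is None or session.stopped:
                # Options must reach the conf before creation, since some
                # settings cannot change once the session exists.
                session = ToySession(self._options)
                ToySession._instantiated = session
            # Whether new or reused, the options are applied to the session.
            session.conf.update(self._options)
            return session


if __name__ == "__main__":
    a = ToyBuilder().config("spark.app.name", "demo").getOrCreate()
    b = ToyBuilder().config("spark.sql.shuffle.partitions", "8").getOrCreate()
    print(a is b)   # True
    print(a.conf)   # {'spark.app.name': 'demo', 'spark.sql.shuffle.partitions': '8'}
```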
Getting the gateway from spark._jvm
>>> from py4j.java_gateway import JavaGateway
>>> g = JavaGateway(spark._jvm._gateway_client)
>>> g.help(spark._jsc)