Digging into RMI: Remote Objects

Date: 2023-01-11 17:36:50

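Remote objects

What is a remote object?

In this article, any object whose class extends UnicastRemoteObject is called a remote object. (More generally, any object that implements java.rmi.Remote can be exported through the static UnicastRemoteObject.exportObject methods.)

What an object gains by extending UnicastRemoteObject is the ability to publish (export) itself so that it can receive remote calls.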
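As a minimal sketch of that idea, the following defines a remote object whose constructor exports it on an anonymous port. The names Greeter, GreeterImpl and RemoteObjectDemo are hypothetical and exist only for illustration; the object is unexported again at the end so the JVM can exit.

```java
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;

class RemoteObjectDemo {
    // Creates (and thereby exports) the object, calls it locally, then unexports it.
    static String exportCallAndUnexport(String name) {
        try {
            GreeterImpl impl = new GreeterImpl();
            try {
                return impl.greet(name);
            } finally {
                // force = true: unexport even if calls are still in progress
                UnicastRemoteObject.unexportObject(impl, true);
            }
        } catch (RemoteException e) {
            throw new IllegalStateException("export failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(exportCallAndUnexport("rmi"));
    }
}

// Hypothetical remote interface: every remote method declares RemoteException.
interface Greeter extends Remote {
    String greet(String name) throws RemoteException;
}

// Hypothetical remote object: super(0) exports it on an anonymous port.
class GreeterImpl extends UnicastRemoteObject implements Greeter {
    GreeterImpl() throws RemoteException {
        super(0);
    }

    @Override
    public String greet(String name) {
        return "hello, " + name;
    }
}
```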

UnicastRemoteObject initialization

Constructor

The constructor calls exportObject.

/**
 * Creates and exports a new UnicastRemoteObject object using the
 * particular supplied port.
 *
 * <p>The object is exported with a server socket
 * created using the {@link RMISocketFactory} class.
 *
 * @param port the port number on which the remote object receives calls
 * (if <code>port</code> is zero, an anonymous port is chosen)
 * @throws RemoteException if failed to export object
 * @since 1.2
 */
protected UnicastRemoteObject(int port) throws RemoteException
{
    this.port = port;
    exportObject((Remote) this, port);
}

java.rmi.server.UnicastRemoteObject#exportObject(java.rmi.Remote, int)

Its purpose is to make the object able to receive remote calls.

/**
 * Exports the remote object to make it available to receive incoming
 * calls, using the particular supplied port.
 *
 * <p>The object is exported with a server socket
 * created using the {@link RMISocketFactory} class.
 *
 * @param obj the remote object to be exported
 * @param port the port to export the object on
 * @return remote object stub
 * @exception RemoteException if export fails
 * @since 1.2
 */
public static Remote exportObject(Remote obj, int port)
    throws RemoteException
{
    return exportObject(obj, new UnicastServerRef(port));
}

Next, look at sun.rmi.server.UnicastServerRef#UnicastServerRef(int).

public class UnicastServerRef extends UnicastRef implements ServerRef, Dispatcher {

The constructor invoked here:

public UnicastServerRef(int var1) {
    super(new LiveRef(var1));
    this.forceStubUse = false;
    this.hashToMethod_Map = null;
    this.methodCallIDCount = new AtomicInteger(0);
    this.filter = null;
}

The super constructor simply stores the LiveRef in this.ref:

public UnicastRef(LiveRef var1) {
    this.ref = var1;
}

new LiveRef(port) eventually delegates to the three-argument constructor below, with var1 = new ObjID(), var2 = TCPEndpoint.getLocalEndpoint(port) (a TCPEndpoint), and var3 = true.

    public LiveRef(ObjID var1, Endpoint var2, boolean var3) {
        this.ep = var2;
        this.id = var1;
        this.isLocal = var3;
    }

getLocalEndpoint

Let's take a look:

public static TCPEndpoint getLocalEndpoint(int var0) {
    return getLocalEndpoint(var0, (RMIClientSocketFactory)null, (RMIServerSocketFactory)null);
}

public static TCPEndpoint getLocalEndpoint(int var0, RMIClientSocketFactory var1, RMIServerSocketFactory var2) {
    TCPEndpoint var3 = null;
    synchronized(localEndpoints) {
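        //only var0 (the port) carries a value here; the host and both socket factories are null
        TCPEndpoint var5 = new TCPEndpoint((String)null, var0, var1, var2);
        //localEndpoints maps a TCPEndpoint key to a LinkedList of TCPEndpoints
        LinkedList var6 = (LinkedList)localEndpoints.get(var5);
        //re-sample the local host name (java.rmi.server.hostname is honored if it is set)
        String var7 = resampleLocalHost();
        //no entry for this key yet. The lookup relies on TCPEndpoint's overridden equals method, which is frankly ugly.
        if (var6 == null) {
            //host, port, null, null
            var3 = new TCPEndpoint(var7, var0, var1, var2);
            var6 = new LinkedList();
            var6.add(var3);
            var3.listenPort = var0;
            //the transport holds the list that contains this endpoint, and the endpoint holds the transport: a circular reference
            var3.transport = new TCPTransport(var6);
            //key is the TCPEndpoint, value is the LinkedList of TCPEndpoints (not the TCPTransport)
            localEndpoints.put(var5, var6);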
            if (TCPTransport.tcpLog.isLoggable(Log.BRIEF)) {
                TCPTransport.tcpLog.log(Log.BRIEF, "created local endpoint for socket factory " + var2 + " on port " + var0);
            }
        } else {
            //an entry already exists for this key
            synchronized(var6) {
                var3 = (TCPEndpoint)var6.getLast();
                String var9 = var3.host;
                int var10 = var3.port;
                TCPTransport var11 = var3.transport;
                //if the host has changed, create a new TCPEndpoint and append it to the end of the list
                if (var7 != null && !var7.equals(var9)) {
                    if (var10 != 0) {
                        var6.clear();
                    }

                    var3 = new TCPEndpoint(var7, var10, var1, var2);
                    var3.listenPort = var0;
                    var3.transport = var11;
                    var6.add(var3);
                }
            }
        }

        return var3;
    }
}

sun.rmi.transport.tcp.TCPEndpoint constructor

public TCPEndpoint(String var1, int var2, RMIClientSocketFactory var3, RMIServerSocketFactory var4) {
    this.listenPort = -1;
    this.transport = null;
    if (var1 == null) {
        var1 = "";
    }

    this.host = var1;
    this.port = var2;
    this.csf = var3;
    this.ssf = var4;
}

java.rmi.server.UnicastRemoteObject#exportObject(java.rmi.Remote, sun.rmi.server.UnicastServerRef)

This exports the remote object through sref; obj is our remote object.

/**
 * Exports the specified object using the specified server ref.
 */
private static Remote exportObject(Remote obj, UnicastServerRef sref)
    throws RemoteException
{
    // if obj extends UnicastRemoteObject, set its ref.
    if (obj instanceof UnicastRemoteObject) {
        ((UnicastRemoteObject) obj).ref = sref;
    }
    return sref.exportObject(obj, null, false);
}
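The static export path can also be used directly, without extending UnicastRemoteObject. The sketch below (Counter and CounterImpl are hypothetical names) exports a plain Remote implementation on an anonymous port and inspects the returned stub. It assumes no pre-generated CounterImpl_Stub class is on the class path, in which case the stub should be a dynamic proxy backed by RemoteObjectInvocationHandler.

```java
import java.lang.reflect.Proxy;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.server.RemoteObjectInvocationHandler;
import java.rmi.server.UnicastRemoteObject;

class StaticExportDemo {
    // Hypothetical remote interface.
    interface Counter extends Remote {
        int next() throws RemoteException;
    }

    // Hypothetical implementation that does NOT extend UnicastRemoteObject.
    static class CounterImpl implements Counter {
        private int value;

        @Override
        public synchronized int next() {
            return ++value;
        }
    }

    // Exports the object, describes the stub that came back, then unexports it.
    static String describeStub() {
        CounterImpl impl = new CounterImpl();
        try {
            Remote stub = UnicastRemoteObject.exportObject(impl, 0);
            try {
                boolean isProxy = Proxy.isProxyClass(stub.getClass());
                boolean rmiHandler = isProxy
                        && Proxy.getInvocationHandler(stub) instanceof RemoteObjectInvocationHandler;
                return "proxy=" + isProxy
                        + ", rmiHandler=" + rmiHandler
                        + ", implementsCounter=" + (stub instanceof Counter);
            } finally {
                UnicastRemoteObject.unexportObject(impl, true);
            }
        } catch (RemoteException e) {
            throw new IllegalStateException("export failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describeStub());
    }
}
```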

sun.rmi.server.UnicastServerRef#exportObject(java.rmi.Remote, java.lang.Object, boolean)

Let's see how the proxy object is created.

public Remote exportObject(Remote var1, Object var2, boolean var3) throws RemoteException {
    Class var4 = var1.getClass();

    Remote var5;
    try {
        //var5 is the proxy (stub) for the remote object; the Target below stores it as its stub
        //var4 is the class of the remote object
        //getClientRef returns new UnicastRef(this.ref)
        //forceStubUse was initialized to false in the constructor
        var5 = Util.createProxy(var4, this.getClientRef(), this.forceStubUse);
    } catch (IllegalArgumentException var7) {
        throw new ExportException("remote object implements illegal remote interface", var7);
    }
    //check whether var5 is a RemoteStub (a pre-generated static stub)
    if (var5 instanceof RemoteStub) {
        this.setSkeleton(var1);
    }
    //this (the UnicastServerRef) is passed as the dispatcher (disp)
    //the dynamic proxy is passed as the stub
    //var3 is false
    Target var6 = new Target(var1, this, var5, this.ref.getObjID(), var3);
    //export the target
    //this.ref.exportObject calls this.ep.exportObject(var1); ep is the TCPEndpoint created above
    //TCPEndpoint in turn calls this.transport.exportObject(var1)
    //the transport calls listen, which sets up the server socket and its handling logic, then exportCount++
    //the target's exported transport is set to that transport via setExportedTransport
    //the target is stored in ObjectTable: objTable maps getObjectEndpoint -> target, implTable maps getWeakImpl -> target
    //getObjectEndpoint returns new ObjectEndpoint(this.id, this.exportedTransport); id is the ObjID and exportedTransport is the transport set above
    //getWeakImpl returns the WeakRef built in the Target constructor, WeakRef(var1, ObjectTable.reapQueue), where var1 is the Remote we implemented
    this.ref.exportObject(var6);

    //hashToMethod_Maps is a WeakClassHashMap<Map<Long, Method>>
    //on a miss, the hash-to-Method map for var4 is computed and cached
    this.hashToMethod_Map = (Map)hashToMethod_Maps.get(var4);
    return var5;
}
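The keys of hashToMethod_Map are 64-bit method hashes. As a sketch of how such a hash can be derived, assuming the scheme described in the RMI specification (SHA-1 over the method name and descriptor written with writeUTF, keeping the first 8 digest bytes in little-endian order), the helper names below are hypothetical and this is not the JDK's own code:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class MethodHashSketch {
    // JVM type descriptor, e.g. int -> "I", String -> "Ljava/lang/String;".
    static String descriptor(Class<?> c) {
        if (c.isArray()) return c.getName().replace('.', '/'); // e.g. "[I"
        if (!c.isPrimitive()) return "L" + c.getName().replace('.', '/') + ";";
        if (c == void.class) return "V";
        if (c == boolean.class) return "Z";
        if (c == byte.class) return "B";
        if (c == char.class) return "C";
        if (c == short.class) return "S";
        if (c == int.class) return "I";
        if (c == long.class) return "J";
        if (c == float.class) return "F";
        return "D"; // double is the only primitive left
    }

    // Method name followed by its descriptor, e.g. "length()I".
    static String nameAndDescriptor(Method m) {
        StringBuilder sb = new StringBuilder(m.getName()).append('(');
        for (Class<?> p : m.getParameterTypes()) {
            sb.append(descriptor(p));
        }
        return sb.append(')').append(descriptor(m.getReturnType())).toString();
    }

    // First 64 bits of the SHA-1 digest, assembled least significant byte first.
    static long methodHash(Method m) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-1");
            try (DataOutputStream out = new DataOutputStream(
                    new DigestOutputStream(new ByteArrayOutputStream(), md))) {
                out.writeUTF(nameAndDescriptor(m));
            }
            byte[] digest = md.digest();
            long hash = 0;
            for (int i = 0; i < 8; i++) {
                hash += ((long) (digest[i] & 0xFF)) << (i * 8);
            }
            return hash;
        } catch (IOException | NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Convenience lookup that avoids checked exceptions at the call site.
    static Method find(Class<?> owner, String name, Class<?>... params) {
        try {
            return owner.getMethod(name, params);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        Method m = find(String.class, "indexOf", String.class, int.class);
        System.out.println(nameAndDescriptor(m) + " -> " + methodHash(m));
    }
}
```

Because the hash depends only on the name and the signature, client and server can agree on it without exchanging any per-class numbering.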

sun.rmi.transport.Target#Target

Constructor of the Target class that gets exported:

public Target(Remote var1, Dispatcher var2, Remote var3, ObjID var4, boolean var5) {
    this.weakImpl = new WeakRef(var1, ObjectTable.reapQueue);
    this.disp = var2;
    this.stub = var3;
    this.id = var4;
    this.acc = AccessController.getContext();
    ClassLoader var6 = Thread.currentThread().getContextClassLoader();
    ClassLoader var7 = var1.getClass().getClassLoader();
    //true if var7 is var6 itself or one of var6's ancestor loaders
    if (checkLoaderAncestry(var6, var7)) {
        this.ccl = var6;
    } else {
        this.ccl = var7;
    }
    //var5 is false here: the target is not permanent
    this.permanent = var5;
    if (var5) {
        this.pinImpl();
    }

}
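The loader choice in the constructor hinges on an ancestry check. A minimal sketch of such a check follows; the method name isAncestorOrSelf is hypothetical and this is not the JDK's checkLoaderAncestry itself. It walks the parent chain of one loader looking for the other, treating null as the bootstrap loader.

```java
import java.net.URL;
import java.net.URLClassLoader;

class LoaderAncestrySketch {
    // True if 'ancestor' is 'start' itself or appears in start's parent chain.
    // A null ancestor stands for the bootstrap loader, an ancestor of every loader.
    static boolean isAncestorOrSelf(ClassLoader start, ClassLoader ancestor) {
        if (ancestor == null) {
            return true;
        }
        for (ClassLoader cl = start; cl != null; cl = cl.getParent()) {
            if (cl == ancestor) {
                return true;
            }
        }
        return false;
    }

    // Builds a child loader on top of the system loader and runs three checks.
    static String demo() {
        ClassLoader system = ClassLoader.getSystemClassLoader();
        ClassLoader child = new URLClassLoader(new URL[0], system);
        return isAncestorOrSelf(child, system) + ","
                + isAncestorOrSelf(system, child) + ","
                + isAncestorOrSelf(child, null);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

When the thread's context class loader already "sees" everything the implementation's loader sees, keeping the context loader is the less restrictive choice; otherwise the implementation's own loader is used.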

sun.rmi.server.Util#createProxy

public static Remote createProxy(Class<?> var0, RemoteRef var1, boolean var2) throws StubNotFoundException {
    Class var3;
    try {
        //returns var0 if it directly implements a remote interface, otherwise the nearest superclass that does; throws if there is none
        var3 = getRemoteClass(var0);
    } catch (ClassNotFoundException var9) {
        throw new StubNotFoundException("object does not implement a remote interface: " + var0.getName());
    }
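    //var2 is false
    //ignoreStubClasses is read in a static initializer from the system property java.rmi.server.ignoreStubClasses
    //withoutStubs is a Map<Class<?>, Void>, initialized in a static initializer to Collections.synchronizedMap(new WeakHashMap(11))
    //stubClassExists: if withoutStubs does not contain var3, try to load the class named var3.getName() + "_Stub"; return true on success, otherwise record var3 in withoutStubs and return false
    //if withoutStubs already contains var3, return false immediately
    //so a remote object without a pre-generated _Stub class does not take this branch. Mixing the lookup and the caching in one condition makes the logic harder to follow.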
    if (var2 || !ignoreStubClasses && stubClassExists(var3)) {
        return createStub(var3, var1);
    } else {
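        //dynamic proxy for the remote object
        final ClassLoader var4 = var0.getClassLoader();
        //only the remote interfaces (those extending Remote) implemented by the class are proxied; every method of those interfaces goes through the proxy
        final Class[] var5 = getRemoteInterfaces(var0);
        //var1 is the UnicastRef (the remote ref)
        //the invocation handler: it extends RemoteObject and implements InvocationHandler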
        final RemoteObjectInvocationHandler var6 = new RemoteObjectInvocationHandler(var1);

        try {
            return (Remote)AccessController.doPrivileged(new PrivilegedAction<Remote>() {
                public Remote run() {
                    //textbook use of a dynamic proxy
                    //calls on the proxy end up in ref.invoke, i.e. UnicastRef.invoke
                    return (Remote)Proxy.newProxyInstance(var4, var5, var6);
                }
            });
        } catch (IllegalArgumentException var8) {
            throw new StubNotFoundException("unable to create proxy", var8);
        }
    }
}
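For readers less familiar with java.lang.reflect.Proxy, here is a small standalone sketch of the same mechanism. The Echo interface and the handler are hypothetical; where RemoteObjectInvocationHandler forwards the call to a remote ref, this handler just builds a string locally.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

class ProxySketch {
    // Hypothetical interface standing in for a remote interface.
    interface Echo {
        String echo(String s);
    }

    // Creates a proxy whose handler intercepts every call made through Echo.
    static Echo newEchoProxy() {
        InvocationHandler handler = (proxy, method, args) ->
                // A real RMI handler would forward method and args to the remote ref here.
                "forwarded " + method.getName() + "(" + args[0] + ")";
        return (Echo) Proxy.newProxyInstance(
                Echo.class.getClassLoader(),
                new Class<?>[] {Echo.class},
                handler);
    }

    public static void main(String[] args) {
        System.out.println(newEchoProxy().echo("hi"));
    }
}
```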

sun.rmi.server.UnicastRef#invoke(java.rmi.Remote, java.lang.reflect.Method, java.lang.Object[], long)

//var1 is the proxy (stub) the method was invoked on
//var2 is the Method being called
//var3 holds the arguments
//var4 is getMethodHash(method)
public Object invoke(Remote var1, Method var2, Object[] var3, long var4) throws Exception {
    if (clientRefLog.isLoggable(Log.VERBOSE)) {
        clientRefLog.log(Log.VERBOSE, "method: " + var2);
    }

    if (clientCallLog.isLoggable(Log.VERBOSE)) {
        this.logClientCall(var1, var2);
    }
    //TCPChannel.newConnection: reuses a free connection or opens a new socket via this.ep.newSocket()
    Connection var6 = this.ref.getChannel().newConnection();
    StreamRemoteCall var7 = null;
    boolean var8 = true;
    boolean var9 = false;

    Object var13;
    try {
        if (clientRefLog.isLoggable(Log.VERBOSE)) {
            clientRefLog.log(Log.VERBOSE, "opnum = " + var4);
        }
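        //writes byte 80 (0x50, the Call opcode) to var6
        //then, through an ObjectOutput, writes the ObjID: the 8-byte objNum followed by its UID (space): 4-byte unique, 8-byte time, 2-byte count
        //then writes the 4-byte operation number -1 and the 8-byte getMethodHash(method)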
        var7 = new StreamRemoteCall(var6, this.ref.getObjID(), -1, var4);

        Object var11;
        try {
            ObjectOutput var10 = var7.getOutputStream();
            this.marshalCustomCallData(var10);
            //parameter types
            var11 = var2.getParameterTypes();

            for(int var12 = 0; var12 < ((Object[])var11).length; ++var12) {
                //parameter type, argument value, output stream
                //writes the serialized argument to var10
                marshalValue((Class)((Object[])var11)[var12], var3[var12], var10);
            }
        } catch (IOException var39) {
            clientRefLog.log(Log.BRIEF, "IOException marshalling arguments: ", var39);
            throw new MarshalException("error marshalling arguments", var39);
        }
        //executeCall is examined below
        var7.executeCall();

        try {
            Class var46 = var2.getReturnType();
            if (var46 == Void.TYPE) {
                var11 = null;
                return var11;
            }
            //read the return value from the input stream
            var11 = var7.getInputStream();
            Object var47 = unmarshalValue(var46, (ObjectInput)var11);
            var9 = true;
            clientRefLog.log(Log.BRIEF, "free connection (reuse = true)");
            this.ref.getChannel().free(var6, true);
            var13 = var47;
        } catch (ClassNotFoundException | IOException var40) {
            ((StreamRemoteCall)var7).discardPendingRefs();
            clientRefLog.log(Log.BRIEF, var40.getClass().getName() + " unmarshalling return: ", var40);
            throw new UnmarshalException("error unmarshalling return", var40);
        } finally {
            try {
                var7.done();
            } catch (IOException var38) {
                var8 = false;
            }

        }
    } catch (RuntimeException var42) {
        if (var7 == null || ((StreamRemoteCall)var7).getServerException() != var42) {
            var8 = false;
        }

        throw var42;
    } catch (RemoteException var43) {
        var8 = false;
        throw var43;
    } catch (Error var44) {
        var8 = false;
        throw var44;
    } finally {
        if (!var9) {
            if (clientRefLog.isLoggable(Log.BRIEF)) {
                clientRefLog.log(Log.BRIEF, "free connection (reuse = " + var8 + ")");
            }

            this.ref.getChannel().free(var6, var8);
        }

    }

    return var13;
}
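The call header described in the comments above can be reproduced with public API only. This is a sketch, not the StreamRemoteCall implementation: it writes the 0x50 opcode to the raw stream, then an ObjID, the operation number -1 and a method hash through an ObjectOutputStream, and reads them back. The class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.rmi.server.ObjID;

class CallHeaderSketch {
    static final int CALL = 0x50; // 80

    // Opcode on the raw stream, then ObjID + op + hash on an object stream.
    static byte[] writeHeader(ObjID id, long methodHash) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            bytes.write(CALL);
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                id.write(out);             // 8-byte objNum + UID (4 + 8 + 2 bytes)
                out.writeInt(-1);          // operation number, -1 when a method hash is used
                out.writeLong(methodHash); // 8-byte method hash
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes a header for a fresh ObjID and checks that every field survives the round trip.
    static boolean roundTrips(long methodHash) {
        ObjID id = new ObjID();
        try {
            ByteArrayInputStream raw = new ByteArrayInputStream(writeHeader(id, methodHash));
            int opcode = raw.read();
            try (ObjectInputStream in = new ObjectInputStream(raw)) {
                ObjID readId = ObjID.read(in);
                int op = in.readInt();
                long hash = in.readLong();
                return opcode == CALL && readId.equals(id) && op == -1 && hash == methodHash;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("round trip ok: " + roundTrips(42L));
    }
}
```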

sun.rmi.transport.StreamRemoteCall#executeCall

public void executeCall() throws Exception {
    DGCAckHandler var2 = null;

    byte var1;
    try {
        if (this.out != null) {
            var2 = this.out.getDGCAckHandler();
        }

        this.releaseOutputStream();
        DataInputStream var3 = new DataInputStream(this.conn.getInputStream());
        //read 1 byte: the transport-level return code, which must be 81 (0x51, Return)
        byte var4 = var3.readByte();
        if (var4 != 81) {
            if (Transport.transportLog.isLoggable(Log.BRIEF)) {
                Transport.transportLog.log(Log.BRIEF, "transport return code invalid: " + var4);
            }

            throw new UnmarshalException("Transport return code invalid");
        }

        this.getInputStream();
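        //read 1 more byte: the return type (1 = normal return, 2 = exceptional return)
        var1 = this.in.readByte();
        //read the UID used for the DGC acknowledgment (a UID, not an ObjID)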
        this.in.readID();
    } catch (UnmarshalException var11) {
        throw var11;
    } catch (IOException var12) {
        throw new UnmarshalException("Error unmarshaling return header", var12);
    } finally {
        if (var2 != null) {
            var2.release();
        }

    }
    //check whether the call failed
    switch (var1) {
        case 1:
            return;
        case 2:
            Object var14;
            try {
                //read one object (the exception thrown on the server)
                var14 = this.in.readObject();
            } catch (Exception var10) {
                this.discardPendingRefs();
                throw new UnmarshalException("Error unmarshaling return", var10);
            }

            if (!(var14 instanceof Exception)) {
                this.discardPendingRefs();
                throw new UnmarshalException("Return type not Exception");
            } else {
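                //exceptionReceivedFromServer always throws, so execution never actually falls through into default
                this.exceptionReceivedFromServer((Exception)var14);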
            }
        default:
            if (Transport.transportLog.isLoggable(Log.BRIEF)) {
                Transport.transportLog.log(Log.BRIEF, "return code invalid: " + var1);
            }

            throw new UnmarshalException("Return code invalid");
    }
}
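The return header that executeCall parses can be mimicked in the same way. The sketch below is an illustration under the assumptions stated in the comments: it builds a reply (0x51, a return-type byte, a UID, then a payload) and decodes it. Unlike executeCall, it reads the payload in both cases to keep the example short; the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.rmi.server.UID;

class ReturnHeaderSketch {
    static final int RETURN = 0x51;            // 81
    static final byte NORMAL_RETURN = 1;
    static final byte EXCEPTIONAL_RETURN = 2;

    // Builds a reply: opcode on the raw stream, then type, UID and payload on an object stream.
    static byte[] reply(byte returnType, Object payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            bytes.write(RETURN);
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeByte(returnType);
                new UID().write(out);    // id used for the DGC acknowledgment
                out.writeObject(payload);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Decodes a reply and describes what it carried.
    static String parse(byte[] reply) {
        try {
            ByteArrayInputStream raw = new ByteArrayInputStream(reply);
            if (raw.read() != RETURN) {
                return "invalid transport return code";
            }
            try (ObjectInputStream in = new ObjectInputStream(raw)) {
                byte returnType = in.readByte();
                UID.read(in);
                Object payload = in.readObject();
                switch (returnType) {
                    case NORMAL_RETURN:
                        return "value: " + payload;
                    case EXCEPTIONAL_RETURN:
                        return "exception: " + ((Exception) payload).getMessage();
                    default:
                        return "invalid return code";
                }
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse(reply(NORMAL_RETURN, "ok")));
        System.out.println(parse(reply(EXCEPTIONAL_RETURN, new IllegalStateException("boom"))));
    }
}
```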

sun.rmi.transport.tcp.TCPTransport#listen

This starts a thread that listens on the server socket and accepts incoming remote requests.

private void listen() throws RemoteException {
    assert Thread.holdsLock(this);

    TCPEndpoint var1 = this.getEndpoint();
    int var2 = var1.getPort();
    if (this.server == null) {
        if (tcpLog.isLoggable(Log.BRIEF)) {
            tcpLog.log(Log.BRIEF, "(port " + var2 + ") create server socket");
        }

        try {
            //the server socket comes from sun.rmi.transport.proxy.RMIMasterSocketFactory, which ultimately creates a ServerSocket
            this.server = var1.newServerSocket();
            //this uses blocking I/O with multiple threads
            //AcceptLoop.run simply calls sun.rmi.transport.tcp.TCPTransport.AcceptLoop#executeAcceptLoop
            Thread var3 = (Thread)AccessController.doPrivileged(new NewThreadAction(new AcceptLoop(this.server), "TCP Accept-" + var2, true));
            var3.start();
        } catch (BindException var4) {
            throw new ExportException("Port already in use: " + var2, var4);
        } catch (IOException var5) {
            throw new ExportException("Listen failed on port: " + var2, var5);
        }
    } else {
        SecurityManager var6 = System.getSecurityManager();
        if (var6 != null) {
            var6.checkListen(var2);
        }
    }

}

sun.rmi.transport.tcp.TCPTransport.AcceptLoop#executeAcceptLoop

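It uses an ExecutorService. Looking at how that executor is initialized, note that:

1. If sun.rmi.transport.tcp.maxConnectionThreads is not set, the maximum number of threads is Integer.MAX_VALUE.

2. The threads that run in it are privileged.

3. The run method submitted to the ExecutorService ends up calling ConnectionHandler.run0, which is where remote calls are actually handled.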

private void executeAcceptLoop() {
    if (TCPTransport.tcpLog.isLoggable(Log.BRIEF)) {
        TCPTransport.tcpLog.log(Log.BRIEF, "listening on port " + TCPTransport.this.getEndpoint().getPort());
    }

    while(true) {
        Socket var1 = null;

        try {
            var1 = this.serverSocket.accept();
            InetAddress var16 = var1.getInetAddress();
            String var3 = var16 != null ? var16.getHostAddress() : "0.0.0.0";

            try {
                //var1 is the accepted (connected) socket, var3 is the client's IP address
                TCPTransport.connectionThreadPool.execute(TCPTransport.this.new ConnectionHandler(var1, var3));
            } catch (RejectedExecutionException var11) {
                TCPTransport.closeSocket(var1);
                TCPTransport.tcpLog.log(Log.BRIEF, "rejected connection from " + var3);
            }
        } catch (Throwable var15) {
            Throwable var2 = var15;

            try {
                if (this.serverSocket.isClosed()) {
                    return;
                }

                try {
                    if (TCPTransport.tcpLog.isLoggable(Level.WARNING)) {
                        TCPTransport.tcpLog.log(Level.WARNING, "accept loop for " + this.serverSocket + " throws", var2);
                    }
                } catch (Throwable var13) {
                }
            } finally {
                if (var1 != null) {
                    TCPTransport.closeSocket(var1);
                }

            }

            if (!(var15 instanceof SecurityException)) {
                try {
                    TCPEndpoint.shedConnectionCaches();
                } catch (Throwable var12) {
                }
            }

            if (!(var15 instanceof Exception) && !(var15 instanceof OutOfMemoryError) && !(var15 instanceof NoClassDefFoundError)) {
                if (var15 instanceof Error) {
                    throw (Error)var15;
                }

                throw new UndeclaredThrowableException(var15);
            }

            if (!this.continueAfterAcceptFailure(var15)) {
                return;
            }
        }
    }
}
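To make the thread-pool notes concrete, here is a sketch of a pool with the shape they suggest: no core threads, a configurable maximum, and direct hand-off of each connection to a thread. The use of ThreadPoolExecutor with a SynchronousQueue and the keep-alive value are assumptions made for illustration, not a copy of the JDK's configuration. With a small maximum, execute rejects work once every thread is busy, which is the RejectedExecutionException case handled in the accept loop above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ConnectionPoolSketch {
    // Assumed shape: 0 core threads, direct hand-off, daemon worker threads.
    static ExecutorService newConnectionPool(int maxThreads, long keepAliveMillis) {
        return new ThreadPoolExecutor(0, maxThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(), r -> {
                    Thread t = new Thread(r, "connection-handler");
                    t.setDaemon(true);
                    return t;
                });
    }

    // With an effectively unbounded maximum, every "connection" gets handled.
    static int handleAll(int connections) {
        ExecutorService pool = newConnectionPool(Integer.MAX_VALUE, 60_000L);
        AtomicInteger handled = new AtomicInteger();
        for (int i = 0; i < connections; i++) {
            pool.execute(handled::incrementAndGet);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }

    // With a maximum of 1, a second task is rejected while the first is still running.
    static boolean rejectsWhenSaturated() {
        ExecutorService pool = newConnectionPool(1, 60_000L);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                try {
                    release.await(); // keeps the only thread busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            try {
                pool.execute(() -> { });
                return false;
            } catch (RejectedExecutionException expected) {
                return true;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("handled: " + handleAll(5));
        System.out.println("rejects when saturated: " + rejectsWhenSaturated());
    }
}
```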

UnicastRemoteObject execution logic

sun.rmi.transport.tcp.TCPTransport.ConnectionHandler#run0

    private void run0() {
        TCPEndpoint var1 = TCPTransport.this.getEndpoint();
        int var2 = var1.getPort();
        TCPTransport.threadConnectionHandler.set(this);

        try {
            this.socket.setTcpNoDelay(true);
        } catch (Exception var31) {
        }

        try {
            if (TCPTransport.connectionReadTimeout > 0) {
                this.socket.setSoTimeout(TCPTransport.connectionReadTimeout);
            }
        } catch (Exception var30) {
        }
        try {
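            //get the socket's input stream (a SocketInputStream)
            InputStream var3 = this.socket.getInputStream();
            //markSupported tells whether the stream supports mark and reset; if not, wrap it in a BufferedInputStream
            Object var4 = var3.markSupported() ? var3 : new BufferedInputStream(var3);
            //mark with a 4-byte read limit: if the first 4 bytes turn out to be "POST", the stream is reset below so the HTTP request can be re-read from the start
            ((InputStream) var4).mark(4);
            //wrap it in a DataInputStream
            DataInputStream var5 = new DataInputStream((InputStream) var4);
            //read 4 bytes
            int var6 = var5.readInt();
            //1347375956 is 0x504F5354, ASCII "POST"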
            if (var6 == 1347375956) {
                if (TCPTransport.disableIncomingHttp) {
                    throw new RemoteException("RMI over HTTP is disabled");
                }

                TCPTransport.tcpLog.log(Log.BRIEF, "decoding HTTP-wrapped call");
                ((InputStream) var4).reset();

                try {
                    this.socket = new HttpReceiveSocket(this.socket, (InputStream) var4, (OutputStream) null);
                    this.remoteHost = "0.0.0.0";
                    var3 = this.socket.getInputStream();
                    var4 = new BufferedInputStream(var3);
                    var5 = new DataInputStream((InputStream) var4);
                    var6 = var5.readInt();
                } catch (IOException var29) {
                    throw new RemoteException("Error HTTP-unwrapping call", var29);
                }
            }
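            //read 2 bytes (the protocol version)
            short var7 = var5.readShort();
            //1246907721 is 0x4A524D49, ASCII "JRMI" (the JRMP magic number); the version must be 2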
            if (var6 == 1246907721 && var7 == 2) {
                OutputStream var8 = this.socket.getOutputStream();
                BufferedOutputStream var9 = new BufferedOutputStream(var8);
                //var10 is the output stream
                DataOutputStream var10 = new DataOutputStream(var9);
                int var11 = this.socket.getPort();
                if (TCPTransport.tcpLog.isLoggable(Log.BRIEF)) {
                    TCPTransport.tcpLog.log(Log.BRIEF, "accepted socket from [" + this.remoteHost + ":" + var11 + "]");
                }
                //read 1 byte: the protocol type
                byte var15 = var5.readByte();
                TCPEndpoint var12;
                TCPChannel var13;
                TCPConnection var14;
                switch (var15) {
                    case 75:
                        //write back 78 (0x4E, ProtocolAck)
                        var10.writeByte(78);
                        if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
                            TCPTransport.tcpLog.log(Log.VERBOSE, "(port " + var2 + ") suggesting " + this.remoteHost + ":" + var11);
                        }
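                        //write back the client's host as seen by the server
                        var10.writeUTF(this.remoteHost);
                        //write back the client's port as seen by the server
                        var10.writeInt(var11);
                        //flush the output
                        var10.flush();
                        //readUTF reads a 2-byte length, then that many bytes, and decodes them into a String (the host the client reports for itself)
                        String var16 = var5.readUTF();
                        //read 4 bytes (the port the client reports for itself)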
                        int var17 = var5.readInt();
                        if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
                            TCPTransport.tcpLog.log(Log.VERBOSE, "(port " + var2 + ") client using " + var16 + ":" + var17);
                        }
                        var12 = new TCPEndpoint(this.remoteHost, this.socket.getLocalPort(), var1.getClientSocketFactory(), var1.getServerSocketFactory());
                        var13 = new TCPChannel(TCPTransport.this, var12);
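                        //var4 and var9 are the input and output of this.socket; they become the TCPConnection's input and output
                        var14 = new TCPConnection(var13, this.socket, (InputStream) var4, var9);
                        //handleMessages reads one byte as an int and switches on 80, 81, 82, 83, 84; it is analyzed in detail below
                        //ClassName.this is how an inner class refers to its enclosing instance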
                        TCPTransport.this.handleMessages(var14, true);
                        return;
                    case 76:
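                        //compared with case 75, nothing is written back to the client, and handleMessages is called with false, so only one message is handled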
                        var12 = new TCPEndpoint(this.remoteHost, this.socket.getLocalPort(), var1.getClientSocketFactory(), var1.getServerSocketFactory());
                        var13 = new TCPChannel(TCPTransport.this, var12);
                        var14 = new TCPConnection(var13, this.socket, (InputStream) var4, var9);
                        TCPTransport.this.handleMessages(var14, false);
                        return;
                    case 77:
                        //write back the endpoint information, as in case 75
                        if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
                            TCPTransport.tcpLog.log(Log.VERBOSE, "(port " + var2 + ") accepting multiplex protocol");
                        }

                        var10.writeByte(78);
                        if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
                            TCPTransport.tcpLog.log(Log.VERBOSE, "(port " + var2 + ") suggesting " + this.remoteHost + ":" + var11);
                        }

                        var10.writeUTF(this.remoteHost);
                        var10.writeInt(var11);
                        var10.flush();
                        var12 = new TCPEndpoint(var5.readUTF(), var5.readInt(), var1.getClientSocketFactory(), var1.getServerSocketFactory());
                        if (TCPTransport.tcpLog.isLoggable(Log.VERBOSE)) {
                            TCPTransport.tcpLog.log(Log.VERBOSE, "(port " + var2 + ") client using " + var12.getHost() + ":" + var12.getPort());
                        }
                        //multiplexing is extra logic compared with the cases above; it is covered separately
                        ConnectionMultiplexer var18;
                        synchronized (TCPTransport.this.channelTable) {
                            var13 = TCPTransport.this.getChannel(var12);
                            //var4 and var8 are the input and output that become the ConnectionMultiplexer's input and output
                            var18 = new ConnectionMultiplexer(var13, (InputStream) var4, var8, false);
                            var13.useMultiplexer(var18);
                        }

                        var18.run();
                        return;
                    default:
                        //write a single byte 79 (0x4F, ProtocolNack)
                        var10.writeByte(79);
                        var10.flush();
                        return;
                }
            }

            TCPTransport.closeSocket(this.socket);
        } catch (IOException var32) {
            TCPTransport.tcpLog.log(Log.BRIEF, "terminated with exception:", var32);
            return;
        } finally {
            TCPTransport.closeSocket(this.socket);
        }
    }
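The magic numbers in run0 are easier to recognize as ASCII. The following sketch decodes them and assembles the 7-byte header a stream-protocol client would send (magic, version 2, protocol byte 0x4b). The class and method names are hypothetical; the labels follow the constants in the JRMP wire protocol.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class MagicSketch {
    // Interprets a big-endian int as four ASCII characters.
    static String asAscii(int magic) {
        return new String(ByteBuffer.allocate(4).putInt(magic).array(), StandardCharsets.US_ASCII);
    }

    // Labels for the protocol bytes handled in the switch.
    static String protocolName(int b) {
        switch (b) {
            case 0x4b: return "StreamProtocol";    // 75
            case 0x4c: return "SingleOpProtocol";  // 76
            case 0x4d: return "MultiplexProtocol"; // 77
            case 0x4e: return "ProtocolAck";       // 78
            case 0x4f: return "ProtocolNack";      // 79
            default:   return "unknown";
        }
    }

    // Hex dump of the header a stream-protocol client sends: magic, version, protocol.
    static String streamHandshakeHex() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(0x4a524d49); // "JRMI"
            out.writeShort(2);        // version
            out.writeByte(0x4b);      // StreamProtocol
            out.flush();
            StringBuilder hex = new StringBuilder();
            for (byte b : bytes.toByteArray()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(asAscii(1347375956));
        System.out.println(asAscii(1246907721));
        System.out.println(protocolName(75));
        System.out.println(streamHandshakeHex());
    }
}
```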

sun.rmi.transport.tcp.TCPTransport#handleMessages

    void handleMessages(Connection var1, boolean var2) {
        int var3 = this.getEndpoint().getPort();

        try {
            DataInputStream var4 = new DataInputStream(var1.getInputStream());
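            //do-while loop: any op without its own case lands in default and throws IOException, which ends the loop and closes the connection, so an unknown op cannot keep this loop spinning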
            do {
                //read one byte
                int var5 = var4.read();
                if (var5 == -1) {
                    if (tcpLog.isLoggable(Log.BRIEF)) {
                        tcpLog.log(Log.BRIEF, "(port " + var3 + ") connection closed");
                    }
                    break;
                }

                if (tcpLog.isLoggable(Log.BRIEF)) {
                    tcpLog.log(Log.BRIEF, "(port " + var3 + ") op = " + var5);
                }
                //dispatch on the op code
                switch (var5) {
                    case 80:
                        StreamRemoteCall var6 = new StreamRemoteCall(var1);
                        if (!this.serviceCall(var6)) {
                            return;
                        }
                        break;
                    case 81:
                    case 83:
                    default:
                        throw new IOException("unknown transport op " + var5);
                    case 82:
                        DataOutputStream var7 = new DataOutputStream(var1.getOutputStream());
                        var7.writeByte(83);
                        var1.releaseOutputStream();
                        break;
                    case 84:
                        DGCAckHandler.received(UID.read(var4));
                }
            } while(var2);
        } catch (IOException var17) {
            if (tcpLog.isLoggable(Log.BRIEF)) {
                tcpLog.log(Log.BRIEF, "(port " + var3 + ") exception: ", var17);
            }
        } finally {
            try {
                var1.close();
            } catch (IOException var16) {
            }

        }

    }
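The loop structure of handleMessages can be sketched in isolation. This simplified version (hypothetical names, ping handling only; calls and DGC acks are left out) shows the two properties worth noticing: the persistent flag decides whether more than one message is read, and an unknown op ends the loop through an IOException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

class MessageLoopSketch {
    static final int PING = 0x52;     // 82
    static final int PING_ACK = 0x53; // 83

    // Reads op bytes and answers each ping with a ping-ack.
    static void handle(InputStream in, OutputStream out, boolean persistent) throws IOException {
        do {
            int op = in.read();
            if (op == -1) {
                break; // connection closed
            }
            switch (op) {
                case PING:
                    out.write(PING_ACK);
                    break;
                default:
                    throw new IOException("unknown transport op " + op);
            }
        } while (persistent);
    }

    // Runs the loop over a byte array and returns the response bytes as hex,
    // with "|error" appended if the loop ended on an unknown op.
    static String run(byte[] input, boolean persistent) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        String suffix = "";
        try {
            handle(new ByteArrayInputStream(input), out, persistent);
        } catch (IOException e) {
            suffix = "|error";
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : out.toByteArray()) {
            hex.append(String.format("%02x", b));
        }
        return hex + suffix;
    }

    public static void main(String[] args) {
        System.out.println(run(new byte[] {0x52, 0x52}, true));
        System.out.println(run(new byte[] {0x52, 0x52}, false));
        System.out.println(run(new byte[] {0x52, 0x7f, 0x52}, true));
    }
}
```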

sun.rmi.transport.Transport#serviceCall

    public boolean serviceCall(final RemoteCall var1) {
        try {
            ObjID var39;
            try {
                //read 8 bytes as objNum, then 4, 8 and 2 bytes (unique, time, count) to build the UID called space, then new ObjID(objNum, space)
                var39 = ObjID.read(var1.getInputStream());
            } catch (IOException var33) {
                throw new MarshalException("unable to read objID", var33);
            }
            //dgcID is a static field: the well-known ObjID of the DGC, i.e. objNum 2 with UID(0, 0, 0)
            Transport var40 = var39.equals(dgcID) ? null : this;
            //getTarget looks the key up in objTable, a static HashMap<ObjectEndpoint, Target>
            //the Target bundles the remote object (held through a WeakRef), its dispatcher and its stub
            Target var5 = ObjectTable.getTarget(new ObjectEndpoint(var39, var40));
            final Remote var37;
            //var5.getImpl() takes the Remote out of the Target's WeakRef and assigns it to var37
            if (var5 != null && (var37 = var5.getImpl()) != null) {
                //take the dispatcher (disp) out of the target
                final Dispatcher var6 = var5.getDispatcher();
                //callCount++
                var5.incrementCallCount();

                boolean var8;
                try {
                    transportLog.log(Log.VERBOSE, "call dispatcher");
                    //returns acc
                    final AccessControlContext var7 = var5.getAccessControlContext();
                    //returns ccl
                    ClassLoader var41 = var5.getContextClassLoader();
                    //the current thread's context class loader, restored in the finally block
                    ClassLoader var9 = Thread.currentThread().getContextClassLoader();

                    try {
                        setContextClassLoader(var41);
                        currentTransport.set(this);

                        try {
                            AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
                                public Void run() throws IOException {
                                    Transport.this.checkAcceptPermission(var7);
                                    //dispatch runs inside the privileged block; the implementation is UnicastServerRef
                                    //var37 is the Remote held by var5
                                    //var1 is the RemoteCall
                                    //expanded in detail below
                                    var6.dispatch(var37, var1);
                                    return null;
                                }
                            }, var7);
                            return true;
                        } catch (PrivilegedActionException var31) {
                            throw (IOException)var31.getException();
                        }
                    } finally {
                        setContextClassLoader(var9);
                        currentTransport.set((Object)null);
                    }
                } catch (IOException var34) {
                    transportLog.log(Log.BRIEF, "exception thrown by dispatcher: ", var34);
                    var8 = false;
                } finally {
                    var5.decrementCallCount();
                }

                return var8;
            }

            throw new NoSuchObjectException("no such object in table");
        } catch (RemoteException var36) {
            RemoteException var2 = var36;
            if (UnicastServerRef.callLog.isLoggable(Log.BRIEF)) {
                String var3 = "";

                try {
                    var3 = "[" + RemoteServer.getClientHost() + "] ";
                } catch (ServerNotActiveException var30) {
                }

                String var4 = var3 + "exception: ";
                UnicastServerRef.callLog.log(Log.BRIEF, var4, var36);
            }

            try {
                ObjectOutput var38 = var1.getResultStream(false);
                UnicastServerRef.clearStackTraces(var2);
                var38.writeObject(var2);
                var1.releaseOutputStream();
            } catch (IOException var29) {
                transportLog.log(Log.BRIEF, "exception thrown marshalling exception: ", var29);
                return false;
            }
        }

        return true;
    }

sun.rmi.server.UnicastServerRef#dispatch

public void dispatch(Remote var1, RemoteCall var2) throws IOException {
    try {
        int var3;
        ObjectInput var41;
        try {
            var41 = var2.getInputStream();
            //read 4 bytes: the operation number (negative when the method hash is used)
            var3 = var41.readInt();
        } catch (Exception var38) {
            throw new UnmarshalException("error unmarshalling call header", var38);
        }
        //not taken when calling our remote object: skel is only set when the stub is a static RemoteStub
        if (this.skel != null) {
            this.oldDispatch(var1, var2, var3);
            return;
        }

        if (var3 >= 0) {
            throw new UnmarshalException("skeleton class not found but required for client version");
        }

        long var4;
        try {
            //read 8 bytes: the method hash
            var4 = var41.readLong();
        } catch (Exception var37) {
            throw new UnmarshalException("error unmarshalling call header", var37);
        }
        //cast the input stream to MarshalInputStream
//Provides a MarshalInputStream that uses a caller provided ClassLoader to resolve classes during objects deserialization.
        MarshalInputStream var7 = (MarshalInputStream)var41;
        var7.skipDefaultResolveClass();
        // Look up the target method by its hash (hashToMethod_Map is worth revisiting later)
        Method var42 = (Method)this.hashToMethod_Map.get(var4);
        if (var42 == null) {
            throw new UnmarshalException("unrecognized method hash: method not supported by remote object");
        }
        
        this.logCall(var1, var42);
        Object[] var9 = null;

        try {
            // Unmarshal (deserialize) the call parameters
            this.unmarshalCustomCallData(var41);
            var9 = this.unmarshalParameters(var1, var42, var7);
        } catch (AccessException var34) {
            ((StreamRemoteCall)var2).discardPendingRefs();
            throw var34;
        } catch (ClassNotFoundException | IOException var35) {
            ((StreamRemoteCall)var2).discardPendingRefs();
            throw new UnmarshalException("error unmarshalling arguments", var35);
        } finally {
            var2.releaseInputStream();
        }

        Object var10;
        try {
            // Reflectively invoke method var42 on var1 with arguments var9; var10 holds the return value
            var10 = var42.invoke(var1, var9);
        } catch (InvocationTargetException var33) {
            throw var33.getTargetException();
        }

        try {
            ObjectOutput var11 = var2.getResultStream(true);
            Class var12 = var42.getReturnType();
            if (var12 != Void.TYPE) {
                // var12 is the return type; marshal (serialize) var10 and write it to var11
                marshalValue(var12, var10, var11);
            }
        } catch (IOException var32) {
            throw new MarshalException("error marshalling return", var32);
        }
    } catch (Throwable var39) {
        Object var6 = var39;
        this.logCallException(var39);
        ObjectOutput var8 = var2.getResultStream(false);
        if (var39 instanceof Error) {
            var6 = new ServerError("Error occurred in server thread", (Error)var39);
        } else if (var39 instanceof RemoteException) {
            var6 = new ServerException("RemoteException occurred in server thread", (Exception)var39);
        }

        if (suppressStackTraces) {
            clearStackTraces((Throwable)var6);
        }

        var8.writeObject(var6);
        if (var39 instanceof AccessException) {
            throw new IOException("Connection is not reusable", var39);
        }
    } finally {
        var2.releaseInputStream();
        var2.releaseOutputStream();
    }

}
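
The 8-byte value that dispatch reads with readLong is the key into hashToMethod_Map. As a rough sketch, the snippet below assumes the hashing scheme of the JDK's sun.rmi.server.Util.computeMethodHash: SHA-1 over the writeUTF encoding of the method name plus its JVM descriptor, with the first 8 digest bytes taken least-significant byte first. The method string sayHello()Ljava/lang/String; and both helper names are hypothetical and only serve the illustration.

```python
import hashlib
import struct


def method_hash(name_and_descriptor: str) -> int:
    """Return a signed 64-bit hash for a string such as 'sayHello()Ljava/lang/String;'."""
    # For plain ASCII, UTF-8 is identical to Java's modified UTF-8.
    data = name_and_descriptor.encode("utf-8")
    # DataOutput.writeUTF prefixes the bytes with a 2-byte big-endian length.
    digest = hashlib.sha1(struct.pack(">H", len(data)) + data).digest()
    # First 8 digest bytes, least-significant byte first, as a Java long (assumed scheme).
    return int.from_bytes(digest[:8], byteorder="little", signed=True)


def encode_hash(value: int) -> bytes:
    """Encode the hash the way writeLong would: 8 bytes, big-endian."""
    return struct.pack(">q", value)


# Hypothetical remote method: String sayHello()
h = method_hash("sayHello()Ljava/lang/String;")
print(h, encode_hash(h).hex())
```

If the hash sent by a client has no entry in the map, dispatch throws the "unrecognized method hash" UnmarshalException shown above.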

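Summary

1. A remote object is wrapped in a proxy object, which is then wrapped in a Target and exported.

2. The proxy only exposes the interfaces that extend Remote; other methods of the class are not proxied.

3. Exporting means starting to listen on the TCPEndpoint and waiting for incoming calls to dispatch.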
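
The three points above can be pictured with a toy model. This is not the JDK's ObjectTable or Target, only a simplified stand-in: ToyObjectTable, Greeter and the method names are all hypothetical, and method names are used as keys where the JDK uses a 64-bit hash.

```python
import itertools


class ToyObjectTable:
    """Toy stand-in for the export/dispatch flow: object id -> {method name -> bound method}."""

    def __init__(self):
        self._ids = itertools.count(1)
        self._targets = {}

    def export(self, impl, remote_methods):
        # Only the methods listed as "remote" become reachable, like a proxy
        # that covers nothing but the remote interfaces.
        obj_id = next(self._ids)
        self._targets[obj_id] = {name: getattr(impl, name) for name in remote_methods}
        return obj_id

    def dispatch(self, obj_id, method, args):
        target = self._targets.get(obj_id)
        if target is None:
            raise LookupError("no such object in table")
        if method not in target:
            raise LookupError("unrecognized method")
        return target[method](*args)


class Greeter:
    def say_hello(self, name):
        return "hello " + name

    def internal(self):
        return "not remote"


table = ToyObjectTable()
oid = table.export(Greeter(), ["say_hello"])
print(table.dispatch(oid, "say_hello", ["rmi"]))  # prints "hello rmi"
```

Calling table.dispatch(oid, "internal", []) raises LookupError, which mirrors how a method outside the remote interface cannot be reached through the exported object.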

Sending a message directly to the remote object

import socket
import struct

ip_port = ('10.43.234.130', 10999)

# Handshake header: magic 0x4a524d49 ("JRMI", 4 bytes), version 2 (2 bytes),
# protocol byte 77 (0x4d), all big-endian
payload = struct.pack(">IHB", 1246907721, 2, 77)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect(ip_port)
    s.sendall(payload)
    rcv = s.recv(1024)

# first byte of the reply
print(rcv[0])
# length of the client host string (2 bytes, big-endian)
print(int.from_bytes(rcv[1:3], byteorder="big"))
# client host as seen by the server
print(rcv[3:-4].decode("utf-8"))
# client port as seen by the server (4 bytes, big-endian)
print(int.from_bytes(rcv[-4:], byteorder="big"))

Return value
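
The reply layout that the script slices by hand can be made explicit with a small parser. This is a sketch under the assumption that the server answers with one reply byte (0x4e, taken here to be TransportConstants.ProtocolAck), a writeUTF string (2-byte length plus bytes) and a 4-byte port. The sample bytes are hand-built for the example, not captured from a real server, and parse_handshake_reply is a hypothetical helper.

```python
import struct

PROTOCOL_ACK = 0x4E  # assumed value of TransportConstants.ProtocolAck


def parse_handshake_reply(data: bytes):
    """Split a reply into (reply_byte, host, port); raise ValueError if it is truncated."""
    if len(data) < 3:
        raise ValueError("reply too short")
    reply_byte = data[0]
    (host_len,) = struct.unpack_from(">H", data, 1)  # writeUTF length prefix
    end = 3 + host_len
    if len(data) < end + 4:
        raise ValueError("reply truncated")
    host = data[3:end].decode("utf-8")
    (port,) = struct.unpack_from(">i", data, end)    # writeInt, big-endian
    return reply_byte, host, port


# Hand-built sample reply (illustrative values only)
sample = bytes([PROTOCOL_ACK]) + struct.pack(">H", 9) + b"127.0.0.1" + struct.pack(">i", 51234)
print(parse_handshake_reply(sample))  # prints (78, '127.0.0.1', 51234)
```

Parsing by the length prefix rather than slicing from the end keeps the host and port correct even if recv returns extra bytes after the port.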

From: https://www.cnblogs.com/wqkant/p/17044435.html
